Skip to main content

ironflow_engine/notify/
event.rs

1//! Domain events emitted throughout the ironflow lifecycle.
2
3use chrono::{DateTime, Utc};
4use rust_decimal::Decimal;
5use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
8use ironflow_store::models::{RunStatus, StepKind};
9
10/// Output stream for a [`LogLine`](Event::LogLine) event.
11///
12/// # Examples
13///
14/// ```
15/// use ironflow_engine::notify::LogStream;
16///
17/// let stream: LogStream = "stdout".parse().unwrap();
18/// assert_eq!(stream.as_str(), "stdout");
19/// ```
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
21#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
22#[serde(rename_all = "snake_case")]
23pub enum LogStream {
24    /// Standard output.
25    Stdout,
26    /// Standard error.
27    Stderr,
28    /// System-level messages (e.g. step start/stop notifications).
29    System,
30}
31
32impl LogStream {
33    /// Returns the wire-format string for this stream.
34    pub fn as_str(&self) -> &'static str {
35        match self {
36            Self::Stdout => "stdout",
37            Self::Stderr => "stderr",
38            Self::System => "system",
39        }
40    }
41}
42
43impl std::fmt::Display for LogStream {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        f.write_str(self.as_str())
46    }
47}
48
49impl std::str::FromStr for LogStream {
50    type Err = String;
51
52    fn from_str(s: &str) -> Result<Self, Self::Err> {
53        match s {
54            "stdout" => Ok(Self::Stdout),
55            "stderr" => Ok(Self::Stderr),
56            "system" => Ok(Self::System),
57            _ => Err(format!("unknown log stream: {s}")),
58        }
59    }
60}
61
62/// A domain event emitted by the ironflow system.
63///
64/// Covers the full lifecycle: runs, steps, approvals, and authentication.
65/// Subscribers receive these via [`EventPublisher`](super::EventPublisher)
66/// and pattern-match on the variants they care about.
67///
68/// # Examples
69///
70/// ```
71/// use ironflow_engine::notify::Event;
72/// use ironflow_store::models::RunStatus;
73/// use uuid::Uuid;
74///
75/// let event = Event::RunStatusChanged {
76///     run_id: Uuid::now_v7(),
77///     workflow_name: "deploy".to_string(),
78///     from: RunStatus::Running,
79///     to: RunStatus::Completed,
80///     error: None,
81///     cost_usd: rust_decimal::Decimal::ZERO,
82///     duration_ms: 5000,
83///     at: chrono::Utc::now(),
84/// };
85/// ```
86#[derive(Debug, Clone, Serialize, Deserialize)]
87#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
88#[serde(tag = "type", rename_all = "snake_case")]
89pub enum Event {
90    // -- Run lifecycle --
91    /// A new run was created (status: Pending).
92    RunCreated {
93        /// Run identifier.
94        run_id: Uuid,
95        /// Workflow name.
96        workflow_name: String,
97        /// When the run was created.
98        at: DateTime<Utc>,
99    },
100
101    /// A run changed status.
102    RunStatusChanged {
103        /// Run identifier.
104        run_id: Uuid,
105        /// Workflow name.
106        workflow_name: String,
107        /// Previous status.
108        from: RunStatus,
109        /// New status.
110        to: RunStatus,
111        /// Error message (when transitioning to Failed).
112        error: Option<String>,
113        /// Aggregated cost in USD at the time of transition.
114        cost_usd: Decimal,
115        /// Aggregated duration in milliseconds at the time of transition.
116        duration_ms: u64,
117        /// When the transition occurred.
118        at: DateTime<Utc>,
119    },
120
121    /// A run transitioned to [`Failed`](ironflow_store::models::RunStatus::Failed).
122    ///
123    /// This is a convenience event emitted alongside [`RunStatusChanged`](Event::RunStatusChanged)
124    /// when the target status is `Failed`. Subscribe to this instead of
125    /// `RUN_STATUS_CHANGED` when you only care about failures.
126    RunFailed {
127        /// Run identifier.
128        run_id: Uuid,
129        /// Workflow name.
130        workflow_name: String,
131        /// Error message.
132        error: Option<String>,
133        /// Aggregated cost in USD at the time of failure.
134        cost_usd: Decimal,
135        /// Aggregated duration in milliseconds at the time of failure.
136        duration_ms: u64,
137        /// When the failure occurred.
138        at: DateTime<Utc>,
139    },
140
141    // -- Step lifecycle --
142    /// A step completed successfully.
143    StepCompleted {
144        /// Run identifier.
145        run_id: Uuid,
146        /// Step identifier.
147        step_id: Uuid,
148        /// Human-readable step name.
149        step_name: String,
150        /// Step operation kind.
151        #[cfg_attr(feature = "openapi", schema(value_type = String))]
152        kind: StepKind,
153        /// Step duration in milliseconds.
154        duration_ms: u64,
155        /// Step cost in USD.
156        cost_usd: Decimal,
157        /// When the step completed.
158        at: DateTime<Utc>,
159    },
160
161    /// A step failed.
162    StepFailed {
163        /// Run identifier.
164        run_id: Uuid,
165        /// Step identifier.
166        step_id: Uuid,
167        /// Human-readable step name.
168        step_name: String,
169        /// Step operation kind.
170        #[cfg_attr(feature = "openapi", schema(value_type = String))]
171        kind: StepKind,
172        /// Error message.
173        error: String,
174        /// When the step failed.
175        at: DateTime<Utc>,
176    },
177
178    // -- Approval --
179    /// A run is waiting for human approval.
180    ApprovalRequested {
181        /// Run identifier.
182        run_id: Uuid,
183        /// Approval step identifier.
184        step_id: Uuid,
185        /// Message displayed to reviewers.
186        message: String,
187        /// When the approval was requested.
188        at: DateTime<Utc>,
189    },
190
191    /// A run was approved by a human.
192    ApprovalGranted {
193        /// Run identifier.
194        run_id: Uuid,
195        /// User who approved (ID or username).
196        approved_by: String,
197        /// When the approval was granted.
198        at: DateTime<Utc>,
199    },
200
201    /// A run was rejected by a human.
202    ApprovalRejected {
203        /// Run identifier.
204        run_id: Uuid,
205        /// User who rejected (ID or username).
206        rejected_by: String,
207        /// When the rejection occurred.
208        at: DateTime<Utc>,
209    },
210
211    // -- Log streaming --
212    /// A log line emitted during step execution.
213    ///
214    /// Pushed by the worker in real time so that SSE clients can stream
215    /// step output as it happens, without waiting for step completion.
216    LogLine {
217        /// Run identifier.
218        run_id: Uuid,
219        /// Step identifier.
220        step_id: Uuid,
221        /// Human-readable step name.
222        step_name: String,
223        /// Output stream.
224        stream: LogStream,
225        /// The log line content.
226        line: String,
227        /// When the line was emitted.
228        at: DateTime<Utc>,
229    },
230
231    // -- Authentication --
232    /// A user signed in.
233    UserSignedIn {
234        /// User identifier.
235        user_id: Uuid,
236        /// Username.
237        username: String,
238        /// When the sign-in occurred.
239        at: DateTime<Utc>,
240    },
241
242    /// A new user signed up.
243    UserSignedUp {
244        /// User identifier.
245        user_id: Uuid,
246        /// Username.
247        username: String,
248        /// When the sign-up occurred.
249        at: DateTime<Utc>,
250    },
251
252    /// A user signed out.
253    UserSignedOut {
254        /// User identifier.
255        user_id: Uuid,
256        /// When the sign-out occurred.
257        at: DateTime<Utc>,
258    },
259}
260
261impl Event {
262    /// Event type constant for [`RunCreated`](Event::RunCreated).
263    pub const RUN_CREATED: &'static str = "run_created";
264    /// Event type constant for [`RunStatusChanged`](Event::RunStatusChanged).
265    pub const RUN_STATUS_CHANGED: &'static str = "run_status_changed";
266    /// Event type constant for [`RunFailed`](Event::RunFailed).
267    pub const RUN_FAILED: &'static str = "run_failed";
268    /// Event type constant for [`StepCompleted`](Event::StepCompleted).
269    pub const STEP_COMPLETED: &'static str = "step_completed";
270    /// Event type constant for [`StepFailed`](Event::StepFailed).
271    pub const STEP_FAILED: &'static str = "step_failed";
272    /// Event type constant for [`ApprovalRequested`](Event::ApprovalRequested).
273    pub const APPROVAL_REQUESTED: &'static str = "approval_requested";
274    /// Event type constant for [`ApprovalGranted`](Event::ApprovalGranted).
275    pub const APPROVAL_GRANTED: &'static str = "approval_granted";
276    /// Event type constant for [`ApprovalRejected`](Event::ApprovalRejected).
277    pub const APPROVAL_REJECTED: &'static str = "approval_rejected";
278    /// Event type constant for [`LogLine`](Event::LogLine).
279    pub const LOG_LINE: &'static str = "log_line";
280    /// Event type constant for [`UserSignedIn`](Event::UserSignedIn).
281    pub const USER_SIGNED_IN: &'static str = "user_signed_in";
282    /// Event type constant for [`UserSignedUp`](Event::UserSignedUp).
283    pub const USER_SIGNED_UP: &'static str = "user_signed_up";
284    /// Event type constant for [`UserSignedOut`](Event::UserSignedOut).
285    pub const USER_SIGNED_OUT: &'static str = "user_signed_out";
286
287    /// All event types. Pass this to
288    /// [`EventPublisher::subscribe`](super::EventPublisher::subscribe) to
289    /// receive every event.
290    ///
291    /// # Examples
292    ///
293    /// ```no_run
294    /// use ironflow_engine::notify::{Event, EventPublisher, WebhookSubscriber};
295    ///
296    /// let mut publisher = EventPublisher::new();
297    /// publisher.subscribe(
298    ///     WebhookSubscriber::new("https://example.com/all"),
299    ///     Event::ALL,
300    /// );
301    /// ```
302    pub const ALL: &'static [&'static str] = &[
303        Self::RUN_CREATED,
304        Self::RUN_STATUS_CHANGED,
305        Self::RUN_FAILED,
306        Self::STEP_COMPLETED,
307        Self::STEP_FAILED,
308        Self::APPROVAL_REQUESTED,
309        Self::APPROVAL_GRANTED,
310        Self::APPROVAL_REJECTED,
311        Self::LOG_LINE,
312        Self::USER_SIGNED_IN,
313        Self::USER_SIGNED_UP,
314        Self::USER_SIGNED_OUT,
315    ];
316
317    /// Returns the event type as a static string (e.g. `"run_status_changed"`).
318    ///
319    /// Useful for filtering and logging without deserializing.
320    ///
321    /// # Examples
322    ///
323    /// ```
324    /// use ironflow_engine::notify::Event;
325    /// use uuid::Uuid;
326    /// use chrono::Utc;
327    ///
328    /// let event = Event::UserSignedIn {
329    ///     user_id: Uuid::now_v7(),
330    ///     username: "alice".to_string(),
331    ///     at: Utc::now(),
332    /// };
333    /// assert_eq!(event.event_type(), "user_signed_in");
334    /// ```
335    pub fn event_type(&self) -> &'static str {
336        match self {
337            Event::RunCreated { .. } => Self::RUN_CREATED,
338            Event::RunStatusChanged { .. } => Self::RUN_STATUS_CHANGED,
339            Event::RunFailed { .. } => Self::RUN_FAILED,
340            Event::StepCompleted { .. } => Self::STEP_COMPLETED,
341            Event::StepFailed { .. } => Self::STEP_FAILED,
342            Event::ApprovalRequested { .. } => Self::APPROVAL_REQUESTED,
343            Event::ApprovalGranted { .. } => Self::APPROVAL_GRANTED,
344            Event::ApprovalRejected { .. } => Self::APPROVAL_REJECTED,
345            Event::LogLine { .. } => Self::LOG_LINE,
346            Event::UserSignedIn { .. } => Self::USER_SIGNED_IN,
347            Event::UserSignedUp { .. } => Self::USER_SIGNED_UP,
348            Event::UserSignedOut { .. } => Self::USER_SIGNED_OUT,
349        }
350    }
351}
352
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `event` to JSON and parses it back.
    ///
    /// Returns the JSON text together with the deserialized event so that a
    /// test can assert on both the wire format and the decoded value.
    fn roundtrip(event: &Event) -> (String, Event) {
        let json = serde_json::to_string(event).expect("serialize");
        let back: Event = serde_json::from_str(&json).expect("deserialize");
        (json, back)
    }

    #[test]
    fn run_status_changed_serde_roundtrip() {
        let event = Event::RunStatusChanged {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            from: RunStatus::Running,
            to: RunStatus::Completed,
            error: None,
            cost_usd: Decimal::new(42, 2),
            duration_ms: 5000,
            at: Utc::now(),
        };

        let (json, back) = roundtrip(&event);

        assert_eq!(back.event_type(), "run_status_changed");
        assert!(json.contains("\"type\":\"run_status_changed\""));
    }

    #[test]
    fn run_failed_serde_roundtrip() {
        let event = Event::RunFailed {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            error: Some("step crashed".to_string()),
            cost_usd: Decimal::new(10, 2),
            duration_ms: 3000,
            at: Utc::now(),
        };

        let (json, back) = roundtrip(&event);

        assert_eq!(back.event_type(), "run_failed");
        assert!(json.contains("\"type\":\"run_failed\""));
        assert!(json.contains("step crashed"));
    }

    #[test]
    fn user_signed_in_serde_roundtrip() {
        let event = Event::UserSignedIn {
            user_id: Uuid::now_v7(),
            username: "alice".to_string(),
            at: Utc::now(),
        };

        let (json, back) = roundtrip(&event);

        assert_eq!(back.event_type(), "user_signed_in");
        assert!(json.contains("alice"));
    }

    #[test]
    fn step_failed_serde_roundtrip() {
        let event = Event::StepFailed {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            step_name: "build".to_string(),
            kind: StepKind::Shell,
            error: "exit code 1".to_string(),
            at: Utc::now(),
        };

        let (_json, back) = roundtrip(&event);

        assert_eq!(back.event_type(), "step_failed");
    }

    #[test]
    fn approval_requested_serde_roundtrip() {
        let event = Event::ApprovalRequested {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            message: "Deploy to prod?".to_string(),
            at: Utc::now(),
        };

        // Deserialize as well as serialize, so the test really is a roundtrip.
        let (json, back) = roundtrip(&event);

        assert!(json.contains("approval_requested"));
        assert_eq!(back.event_type(), "approval_requested");
    }

    #[test]
    fn log_line_serde_roundtrip() {
        let event = Event::LogLine {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            step_name: "build".to_string(),
            stream: LogStream::Stdout,
            line: "Compiling ironflow v0.1.0".to_string(),
            at: Utc::now(),
        };

        let (json, back) = roundtrip(&event);

        assert_eq!(back.event_type(), "log_line");
        assert!(json.contains("\"type\":\"log_line\""));
        assert!(json.contains("Compiling ironflow"));
    }

    /// `as_str`, `Display`, `FromStr` and serde must all agree on one
    /// spelling per stream.
    #[test]
    fn log_stream_string_and_serde_forms_agree() {
        for &stream in &[LogStream::Stdout, LogStream::Stderr, LogStream::System] {
            let name = stream.as_str();

            assert_eq!(stream.to_string(), name);
            assert_eq!(name.parse::<LogStream>(), Ok(stream));

            let json = serde_json::to_string(&stream).expect("serialize");
            assert_eq!(json, format!("\"{name}\""));

            let back: LogStream = serde_json::from_str(&json).expect("deserialize");
            assert_eq!(back, stream);
        }

        // Parsing is exact: no case folding, no empty input.
        assert_eq!(
            "STDOUT".parse::<LogStream>(),
            Err("unknown log stream: STDOUT".to_string())
        );
        assert!("".parse::<LogStream>().is_err());
    }

    #[test]
    fn event_type_all_variants() {
        let id = Uuid::now_v7();
        let now = Utc::now();

        let cases: Vec<(Event, &str)> = vec![
            (
                Event::RunCreated {
                    run_id: id,
                    workflow_name: "w".to_string(),
                    at: now,
                },
                "run_created",
            ),
            (
                Event::RunStatusChanged {
                    run_id: id,
                    workflow_name: "w".to_string(),
                    from: RunStatus::Pending,
                    to: RunStatus::Running,
                    error: None,
                    cost_usd: Decimal::ZERO,
                    duration_ms: 0,
                    at: now,
                },
                "run_status_changed",
            ),
            (
                Event::RunFailed {
                    run_id: id,
                    workflow_name: "w".to_string(),
                    error: Some("boom".to_string()),
                    cost_usd: Decimal::ZERO,
                    duration_ms: 0,
                    at: now,
                },
                "run_failed",
            ),
            (
                Event::StepCompleted {
                    run_id: id,
                    step_id: id,
                    step_name: "s".to_string(),
                    kind: StepKind::Shell,
                    duration_ms: 0,
                    cost_usd: Decimal::ZERO,
                    at: now,
                },
                "step_completed",
            ),
            (
                Event::StepFailed {
                    run_id: id,
                    step_id: id,
                    step_name: "s".to_string(),
                    kind: StepKind::Shell,
                    error: "err".to_string(),
                    at: now,
                },
                "step_failed",
            ),
            (
                Event::ApprovalRequested {
                    run_id: id,
                    step_id: id,
                    message: "ok?".to_string(),
                    at: now,
                },
                "approval_requested",
            ),
            (
                Event::ApprovalGranted {
                    run_id: id,
                    approved_by: "alice".to_string(),
                    at: now,
                },
                "approval_granted",
            ),
            (
                Event::ApprovalRejected {
                    run_id: id,
                    rejected_by: "bob".to_string(),
                    at: now,
                },
                "approval_rejected",
            ),
            (
                Event::LogLine {
                    run_id: id,
                    step_id: id,
                    step_name: "build".to_string(),
                    stream: LogStream::Stdout,
                    line: "Compiling ironflow v0.1.0".to_string(),
                    at: now,
                },
                "log_line",
            ),
            (
                Event::UserSignedIn {
                    user_id: id,
                    username: "u".to_string(),
                    at: now,
                },
                "user_signed_in",
            ),
            (
                Event::UserSignedUp {
                    user_id: id,
                    username: "u".to_string(),
                    at: now,
                },
                "user_signed_up",
            ),
            (
                Event::UserSignedOut {
                    user_id: id,
                    at: now,
                },
                "user_signed_out",
            ),
        ];

        // One case per advertised event type: a variant added to `ALL`
        // without a case here (or vice versa) trips this check.
        assert_eq!(cases.len(), Event::ALL.len());

        for (event, expected_type) in cases {
            assert_eq!(event.event_type(), expected_type);
            assert!(
                Event::ALL.contains(&expected_type),
                "Event::ALL is missing {expected_type}"
            );

            // The hand-written type constants must stay in sync with the
            // tag serde derives from the variant name.
            let (json, back) = roundtrip(&event);
            assert!(
                json.contains(&format!("\"type\":\"{expected_type}\"")),
                "serialized tag does not match {expected_type}: {json}"
            );
            assert_eq!(back.event_type(), expected_type);
        }
    }
}