ironflow_engine/notify/event.rs
1//! Domain events emitted throughout the ironflow lifecycle.
2
3use chrono::{DateTime, Utc};
4use rust_decimal::Decimal;
5use serde::{Deserialize, Serialize};
6use uuid::Uuid;
7
8use ironflow_store::models::{RunStatus, StepKind};
9
10/// A domain event emitted by the ironflow system.
11///
12/// Covers the full lifecycle: runs, steps, approvals, and authentication.
13/// Subscribers receive these via [`EventPublisher`](super::EventPublisher)
14/// and pattern-match on the variants they care about.
15///
16/// # Examples
17///
18/// ```
19/// use ironflow_engine::notify::Event;
20/// use ironflow_store::models::RunStatus;
21/// use uuid::Uuid;
22///
23/// let event = Event::RunStatusChanged {
24/// run_id: Uuid::now_v7(),
25/// workflow_name: "deploy".to_string(),
26/// from: RunStatus::Running,
27/// to: RunStatus::Completed,
28/// error: None,
29/// cost_usd: rust_decimal::Decimal::ZERO,
30/// duration_ms: 5000,
31/// at: chrono::Utc::now(),
32/// };
33/// ```
34#[derive(Debug, Clone, Serialize, Deserialize)]
35#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
36#[serde(tag = "type", rename_all = "snake_case")]
37pub enum Event {
38 // -- Run lifecycle --
39 /// A new run was created (status: Pending).
40 RunCreated {
41 /// Run identifier.
42 run_id: Uuid,
43 /// Workflow name.
44 workflow_name: String,
45 /// When the run was created.
46 at: DateTime<Utc>,
47 },
48
49 /// A run changed status.
50 RunStatusChanged {
51 /// Run identifier.
52 run_id: Uuid,
53 /// Workflow name.
54 workflow_name: String,
55 /// Previous status.
56 from: RunStatus,
57 /// New status.
58 to: RunStatus,
59 /// Error message (when transitioning to Failed).
60 error: Option<String>,
61 /// Aggregated cost in USD at the time of transition.
62 cost_usd: Decimal,
63 /// Aggregated duration in milliseconds at the time of transition.
64 duration_ms: u64,
65 /// When the transition occurred.
66 at: DateTime<Utc>,
67 },
68
69 /// A run transitioned to [`Failed`](ironflow_store::models::RunStatus::Failed).
70 ///
71 /// This is a convenience event emitted alongside [`RunStatusChanged`](Event::RunStatusChanged)
72 /// when the target status is `Failed`. Subscribe to this instead of
73 /// `RUN_STATUS_CHANGED` when you only care about failures.
74 RunFailed {
75 /// Run identifier.
76 run_id: Uuid,
77 /// Workflow name.
78 workflow_name: String,
79 /// Error message.
80 error: Option<String>,
81 /// Aggregated cost in USD at the time of failure.
82 cost_usd: Decimal,
83 /// Aggregated duration in milliseconds at the time of failure.
84 duration_ms: u64,
85 /// When the failure occurred.
86 at: DateTime<Utc>,
87 },
88
89 // -- Step lifecycle --
90 /// A step completed successfully.
91 StepCompleted {
92 /// Run identifier.
93 run_id: Uuid,
94 /// Step identifier.
95 step_id: Uuid,
96 /// Human-readable step name.
97 step_name: String,
98 /// Step operation kind.
99 #[cfg_attr(feature = "openapi", schema(value_type = String))]
100 kind: StepKind,
101 /// Step duration in milliseconds.
102 duration_ms: u64,
103 /// Step cost in USD.
104 cost_usd: Decimal,
105 /// When the step completed.
106 at: DateTime<Utc>,
107 },
108
109 /// A step failed.
110 StepFailed {
111 /// Run identifier.
112 run_id: Uuid,
113 /// Step identifier.
114 step_id: Uuid,
115 /// Human-readable step name.
116 step_name: String,
117 /// Step operation kind.
118 #[cfg_attr(feature = "openapi", schema(value_type = String))]
119 kind: StepKind,
120 /// Error message.
121 error: String,
122 /// When the step failed.
123 at: DateTime<Utc>,
124 },
125
126 // -- Approval --
127 /// A run is waiting for human approval.
128 ApprovalRequested {
129 /// Run identifier.
130 run_id: Uuid,
131 /// Approval step identifier.
132 step_id: Uuid,
133 /// Message displayed to reviewers.
134 message: String,
135 /// When the approval was requested.
136 at: DateTime<Utc>,
137 },
138
139 /// A run was approved by a human.
140 ApprovalGranted {
141 /// Run identifier.
142 run_id: Uuid,
143 /// User who approved (ID or username).
144 approved_by: String,
145 /// When the approval was granted.
146 at: DateTime<Utc>,
147 },
148
149 /// A run was rejected by a human.
150 ApprovalRejected {
151 /// Run identifier.
152 run_id: Uuid,
153 /// User who rejected (ID or username).
154 rejected_by: String,
155 /// When the rejection occurred.
156 at: DateTime<Utc>,
157 },
158
159 // -- Authentication --
160 /// A user signed in.
161 UserSignedIn {
162 /// User identifier.
163 user_id: Uuid,
164 /// Username.
165 username: String,
166 /// When the sign-in occurred.
167 at: DateTime<Utc>,
168 },
169
170 /// A new user signed up.
171 UserSignedUp {
172 /// User identifier.
173 user_id: Uuid,
174 /// Username.
175 username: String,
176 /// When the sign-up occurred.
177 at: DateTime<Utc>,
178 },
179
180 /// A user signed out.
181 UserSignedOut {
182 /// User identifier.
183 user_id: Uuid,
184 /// When the sign-out occurred.
185 at: DateTime<Utc>,
186 },
187}
188
189impl Event {
190 /// Event type constant for [`RunCreated`](Event::RunCreated).
191 pub const RUN_CREATED: &'static str = "run_created";
192 /// Event type constant for [`RunStatusChanged`](Event::RunStatusChanged).
193 pub const RUN_STATUS_CHANGED: &'static str = "run_status_changed";
194 /// Event type constant for [`RunFailed`](Event::RunFailed).
195 pub const RUN_FAILED: &'static str = "run_failed";
196 /// Event type constant for [`StepCompleted`](Event::StepCompleted).
197 pub const STEP_COMPLETED: &'static str = "step_completed";
198 /// Event type constant for [`StepFailed`](Event::StepFailed).
199 pub const STEP_FAILED: &'static str = "step_failed";
200 /// Event type constant for [`ApprovalRequested`](Event::ApprovalRequested).
201 pub const APPROVAL_REQUESTED: &'static str = "approval_requested";
202 /// Event type constant for [`ApprovalGranted`](Event::ApprovalGranted).
203 pub const APPROVAL_GRANTED: &'static str = "approval_granted";
204 /// Event type constant for [`ApprovalRejected`](Event::ApprovalRejected).
205 pub const APPROVAL_REJECTED: &'static str = "approval_rejected";
206 /// Event type constant for [`UserSignedIn`](Event::UserSignedIn).
207 pub const USER_SIGNED_IN: &'static str = "user_signed_in";
208 /// Event type constant for [`UserSignedUp`](Event::UserSignedUp).
209 pub const USER_SIGNED_UP: &'static str = "user_signed_up";
210 /// Event type constant for [`UserSignedOut`](Event::UserSignedOut).
211 pub const USER_SIGNED_OUT: &'static str = "user_signed_out";
212
213 /// All event types. Pass this to
214 /// [`EventPublisher::subscribe`](super::EventPublisher::subscribe) to
215 /// receive every event.
216 ///
217 /// # Examples
218 ///
219 /// ```no_run
220 /// use ironflow_engine::notify::{Event, EventPublisher, WebhookSubscriber};
221 ///
222 /// let mut publisher = EventPublisher::new();
223 /// publisher.subscribe(
224 /// WebhookSubscriber::new("https://example.com/all"),
225 /// Event::ALL,
226 /// );
227 /// ```
228 pub const ALL: &'static [&'static str] = &[
229 Self::RUN_CREATED,
230 Self::RUN_STATUS_CHANGED,
231 Self::RUN_FAILED,
232 Self::STEP_COMPLETED,
233 Self::STEP_FAILED,
234 Self::APPROVAL_REQUESTED,
235 Self::APPROVAL_GRANTED,
236 Self::APPROVAL_REJECTED,
237 Self::USER_SIGNED_IN,
238 Self::USER_SIGNED_UP,
239 Self::USER_SIGNED_OUT,
240 ];
241
242 /// Returns the event type as a static string (e.g. `"run_status_changed"`).
243 ///
244 /// Useful for filtering and logging without deserializing.
245 ///
246 /// # Examples
247 ///
248 /// ```
249 /// use ironflow_engine::notify::Event;
250 /// use uuid::Uuid;
251 /// use chrono::Utc;
252 ///
253 /// let event = Event::UserSignedIn {
254 /// user_id: Uuid::now_v7(),
255 /// username: "alice".to_string(),
256 /// at: Utc::now(),
257 /// };
258 /// assert_eq!(event.event_type(), "user_signed_in");
259 /// ```
260 pub fn event_type(&self) -> &'static str {
261 match self {
262 Event::RunCreated { .. } => Self::RUN_CREATED,
263 Event::RunStatusChanged { .. } => Self::RUN_STATUS_CHANGED,
264 Event::RunFailed { .. } => Self::RUN_FAILED,
265 Event::StepCompleted { .. } => Self::STEP_COMPLETED,
266 Event::StepFailed { .. } => Self::STEP_FAILED,
267 Event::ApprovalRequested { .. } => Self::APPROVAL_REQUESTED,
268 Event::ApprovalGranted { .. } => Self::APPROVAL_GRANTED,
269 Event::ApprovalRejected { .. } => Self::APPROVAL_REJECTED,
270 Event::UserSignedIn { .. } => Self::USER_SIGNED_IN,
271 Event::UserSignedUp { .. } => Self::USER_SIGNED_UP,
272 Event::UserSignedOut { .. } => Self::USER_SIGNED_OUT,
273 }
274 }
275}
276
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes `event` to JSON and parses the JSON back into an [`Event`].
    ///
    /// Returns both the JSON text and the re-parsed event so each test can
    /// assert on the wire format as well as on the decoded value.
    fn roundtrip(event: &Event) -> (String, Event) {
        let json = serde_json::to_string(event).expect("serialize");
        let back: Event = serde_json::from_str(&json).expect("deserialize");
        (json, back)
    }

    #[test]
    fn run_status_changed_serde_roundtrip() {
        let event = Event::RunStatusChanged {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            from: RunStatus::Running,
            to: RunStatus::Completed,
            error: None,
            cost_usd: Decimal::new(42, 2),
            duration_ms: 5000,
            at: Utc::now(),
        };

        let (json, back) = roundtrip(&event);

        assert_eq!(back.event_type(), "run_status_changed");
        assert!(json.contains("\"type\":\"run_status_changed\""));
    }

    #[test]
    fn run_failed_serde_roundtrip() {
        let event = Event::RunFailed {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            error: Some("step crashed".to_string()),
            cost_usd: Decimal::new(10, 2),
            duration_ms: 3000,
            at: Utc::now(),
        };

        let (json, back) = roundtrip(&event);

        assert_eq!(back.event_type(), "run_failed");
        assert!(json.contains("\"type\":\"run_failed\""));
        assert!(json.contains("step crashed"));
    }

    #[test]
    fn user_signed_in_serde_roundtrip() {
        let event = Event::UserSignedIn {
            user_id: Uuid::now_v7(),
            username: "alice".to_string(),
            at: Utc::now(),
        };

        let (json, back) = roundtrip(&event);

        assert_eq!(back.event_type(), "user_signed_in");
        assert!(json.contains("alice"));
    }

    #[test]
    fn step_failed_serde_roundtrip() {
        let event = Event::StepFailed {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            step_name: "build".to_string(),
            kind: StepKind::Shell,
            error: "exit code 1".to_string(),
            at: Utc::now(),
        };

        let (json, back) = roundtrip(&event);

        assert_eq!(back.event_type(), "step_failed");
        // Previously the JSON was produced but never inspected.
        assert!(json.contains("\"type\":\"step_failed\""));
        assert!(json.contains("exit code 1"));
    }

    #[test]
    fn approval_requested_serde_roundtrip() {
        let run_id = Uuid::now_v7();
        let event = Event::ApprovalRequested {
            run_id,
            step_id: Uuid::now_v7(),
            message: "Deploy to prod?".to_string(),
            at: Utc::now(),
        };

        let (json, back) = roundtrip(&event);
        assert!(json.contains("approval_requested"));

        // The test name promises a roundtrip, so the decoded value is checked
        // too rather than only the serialized text.
        match back {
            Event::ApprovalRequested {
                run_id: decoded_run_id,
                message,
                ..
            } => {
                assert_eq!(decoded_run_id, run_id);
                assert_eq!(message, "Deploy to prod?");
            }
            other => panic!("unexpected variant after roundtrip: {:?}", other),
        }
    }

    #[test]
    fn event_type_all_variants() {
        let id = Uuid::now_v7();
        let now = Utc::now();

        let cases: Vec<(Event, &str)> = vec![
            (
                Event::RunCreated {
                    run_id: id,
                    workflow_name: "w".to_string(),
                    at: now,
                },
                "run_created",
            ),
            (
                Event::RunStatusChanged {
                    run_id: id,
                    workflow_name: "w".to_string(),
                    from: RunStatus::Pending,
                    to: RunStatus::Running,
                    error: None,
                    cost_usd: Decimal::ZERO,
                    duration_ms: 0,
                    at: now,
                },
                "run_status_changed",
            ),
            (
                Event::RunFailed {
                    run_id: id,
                    workflow_name: "w".to_string(),
                    error: Some("boom".to_string()),
                    cost_usd: Decimal::ZERO,
                    duration_ms: 0,
                    at: now,
                },
                "run_failed",
            ),
            (
                Event::StepCompleted {
                    run_id: id,
                    step_id: id,
                    step_name: "s".to_string(),
                    kind: StepKind::Shell,
                    duration_ms: 0,
                    cost_usd: Decimal::ZERO,
                    at: now,
                },
                "step_completed",
            ),
            (
                Event::StepFailed {
                    run_id: id,
                    step_id: id,
                    step_name: "s".to_string(),
                    kind: StepKind::Shell,
                    error: "err".to_string(),
                    at: now,
                },
                "step_failed",
            ),
            (
                Event::ApprovalRequested {
                    run_id: id,
                    step_id: id,
                    message: "ok?".to_string(),
                    at: now,
                },
                "approval_requested",
            ),
            (
                Event::ApprovalGranted {
                    run_id: id,
                    approved_by: "alice".to_string(),
                    at: now,
                },
                "approval_granted",
            ),
            (
                Event::ApprovalRejected {
                    run_id: id,
                    rejected_by: "bob".to_string(),
                    at: now,
                },
                "approval_rejected",
            ),
            (
                Event::UserSignedIn {
                    user_id: id,
                    username: "u".to_string(),
                    at: now,
                },
                "user_signed_in",
            ),
            (
                Event::UserSignedUp {
                    user_id: id,
                    username: "u".to_string(),
                    at: now,
                },
                "user_signed_up",
            ),
            (
                Event::UserSignedOut {
                    user_id: id,
                    at: now,
                },
                "user_signed_out",
            ),
        ];

        // The table holds one case per variant with distinct type strings, so
        // an equal length plus per-case membership (below) means `ALL` lists
        // exactly the types the variants report.
        assert_eq!(Event::ALL.len(), cases.len());

        for (event, expected_type) in &cases {
            assert_eq!(event.event_type(), *expected_type);
            assert!(
                Event::ALL.contains(expected_type),
                "Event::ALL is missing {}",
                expected_type
            );

            // The hand-written constant must match the tag serde writes,
            // otherwise filtering on `event_type()` and filtering on the
            // serialized `type` field would disagree.
            let value = serde_json::to_value(event).expect("serialize");
            assert_eq!(value["type"].as_str(), Some(*expected_type));
        }
    }
}