Skip to main content

aion_core/
event.rs

1//! Workflow history events and their deterministic recording envelope.
2
3use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7
8use crate::{
9    ActivityError, ActivityId, Payload, RunId, ScheduleConfig, ScheduleId, SearchAttributeValue,
10    TimerId, WorkflowError, WorkflowId,
11};
12
/// Metadata recorded with every workflow history event.
///
/// Every [`Event`] variant embeds exactly one envelope; [`Event::envelope`] and the `seq`,
/// `recorded_at`, and `workflow_id` accessors on [`Event`] read from it.
#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq, Eq)]
pub struct EventEnvelope {
    /// Monotonic sequence number within the owning workflow history.
    pub seq: u64,
    /// Recorded UTC timestamp for this event.
    ///
    /// This timestamp is the determinism source for `workflow.now`; replay must use the recorded
    /// value rather than consulting wall-clock time.
    pub recorded_at: DateTime<Utc>,
    /// Workflow history that owns this event.
    pub workflow_id: WorkflowId,
}
26
/// A recorded workflow history event.
///
/// User data is carried as opaque [`Payload`] values, while failures use the closed workflow and
/// activity error types from this crate.
///
/// Every variant carries an [`EventEnvelope`] in its `envelope` field; use [`Event::envelope`] to
/// read it without matching on the variant.
///
/// # Serialized form
///
/// The enum is adjacently tagged for serde: the variant name is written under the `type` key and
/// the variant's fields under the `data` key. Variant and field names are therefore part of the
/// persisted format.
#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq)]
#[serde(tag = "type", content = "data")]
pub enum Event {
    /// A workflow execution started with a type name and input payload.
    WorkflowStarted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Workflow type selected by the caller.
        workflow_type: String,
        /// Opaque workflow input payload.
        input: Payload,
        /// Concrete run identifier started by this event.
        run_id: RunId,
        /// Parent run that continued as this run, when this start is part of a
        /// continue-as-new chain.
        parent_run_id: Option<RunId>,
    },
    /// A workflow execution completed successfully; this terminal event projects to Completed.
    WorkflowCompleted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Opaque workflow result payload.
        result: Payload,
    },
    /// A workflow execution failed terminally; this terminal event projects to Failed.
    WorkflowFailed {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Terminal workflow failure.
        error: WorkflowError,
    },
    /// A workflow execution was cancelled; this terminal event projects to Cancelled.
    WorkflowCancelled {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Human-readable cancellation reason.
        reason: String,
    },
    /// A workflow execution timed out; this terminal event projects to `TimedOut`.
    WorkflowTimedOut {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Descriptor identifying the timeout that elapsed.
        ///
        /// Intentionally stringly-typed: the closed set of timeout kinds is defined by cluster AT
        /// (timers and signals), not by the core event model.
        timeout: String,
    },
    /// A workflow execution continued as a new run; this terminal event projects to
    /// `ContinuedAsNew`.
    WorkflowContinuedAsNew {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Opaque workflow input payload carried into the new run.
        input: Payload,
        /// Workflow type override for the new run, when migration changes the workflow type.
        ///
        /// When absent, the new run uses the current workflow type.
        workflow_type: Option<String>,
        /// Run identifier for the current run that is being continued.
        parent_run_id: RunId,
    },
    /// Workflow search attributes were updated for visibility and query projection.
    SearchAttributesUpdated {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Workflow whose search attributes changed.
        ///
        /// Stored in addition to `envelope.workflow_id`.
        /// NOTE(review): whether the two identifiers may ever differ is not visible here — confirm.
        workflow_id: WorkflowId,
        /// Updated search attributes keyed by attribute name.
        ///
        /// `HashMap` iteration order is unspecified, so the key order of the serialized map is not
        /// stable between runs; compare decoded values rather than serialized bytes.
        attributes: HashMap<String, SearchAttributeValue>,
    },
    /// An activity was scheduled by workflow code.
    ActivityScheduled {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Deterministic activity identifier derived from the scheduling sequence position.
        activity_id: ActivityId,
        /// Activity type selected by workflow code.
        activity_type: String,
        /// Opaque activity input payload.
        input: Payload,
    },
    /// An activity worker started executing an activity attempt.
    ActivityStarted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Activity being executed.
        activity_id: ActivityId,
    },
    /// An activity completed successfully.
    ActivityCompleted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Activity that produced the result.
        activity_id: ActivityId,
        /// Opaque activity result payload.
        result: Payload,
    },
    /// An activity attempt failed.
    ///
    /// The `attempt` field together with [`ActivityError`]'s retryable or terminal classification
    /// lets replay distinguish a retryable interim failure from a terminal one for the same
    /// [`ActivityId`].
    ActivityFailed {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Activity whose attempt failed.
        activity_id: ActivityId,
        /// Classified activity failure.
        error: ActivityError,
        /// One-based activity attempt number that produced this failure.
        attempt: u32,
    },
    /// An activity was cancelled as an explicit cancellation outcome.
    ActivityCancelled {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Activity that was cancelled.
        activity_id: ActivityId,
    },
    /// A timer was scheduled to fire at a deterministic timestamp.
    TimerStarted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Timer selected by workflow code or assigned by the engine.
        timer_id: TimerId,
        /// UTC timestamp at which the timer becomes eligible to fire.
        fire_at: DateTime<Utc>,
    },
    /// A timer fired.
    TimerFired {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Timer that fired.
        timer_id: TimerId,
    },
    /// A timer was cancelled as an explicit cancellation outcome.
    TimerCancelled {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Timer that was cancelled.
        timer_id: TimerId,
    },
    /// A `with_timeout` operation reached a durable terminal outcome.
    WithTimeoutCompleted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Timer that bounded the operation.
        timer_id: TimerId,
        /// Recorded timeout outcome.
        outcome: WithTimeoutOutcome,
        /// JSON-encoded BEAM term payload for completed operation results.
        ///
        /// NOTE(review): presumably `None` when `outcome` is [`WithTimeoutOutcome::TimedOut`] —
        /// the type does not enforce this; confirm against the recorder.
        result: Option<Payload>,
    },
    /// A signal was delivered to the workflow.
    SignalReceived {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Signal name selected by the sender.
        name: String,
        /// Opaque signal payload.
        payload: Payload,
    },
    /// A signal was sent by this workflow to another workflow.
    SignalSent {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Target workflow identifier selected by workflow code.
        target_workflow_id: WorkflowId,
        /// Signal name selected by workflow code.
        name: String,
        /// Opaque signal payload.
        payload: Payload,
    },
    /// A child workflow was started.
    ChildWorkflowStarted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Child workflow identifier.
        child_workflow_id: WorkflowId,
        /// Child workflow type selected by the parent.
        workflow_type: String,
        /// Opaque child workflow input payload.
        input: Payload,
    },
    /// A child workflow completed successfully.
    ChildWorkflowCompleted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Child workflow that produced the result.
        child_workflow_id: WorkflowId,
        /// Opaque child workflow result payload.
        result: Payload,
    },
    /// A child workflow failed terminally.
    ChildWorkflowFailed {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Child workflow that failed.
        child_workflow_id: WorkflowId,
        /// Terminal child workflow failure.
        error: WorkflowError,
    },
    /// A child workflow was cancelled as an explicit cancellation outcome.
    ChildWorkflowCancelled {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Child workflow that was cancelled.
        child_workflow_id: WorkflowId,
    },
    /// A schedule resource was created.
    ScheduleCreated {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Schedule resource that was created.
        schedule_id: ScheduleId,
        /// Persisted schedule configuration.
        config: ScheduleConfig,
    },
    /// A schedule resource was updated.
    ScheduleUpdated {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Schedule resource that was updated.
        schedule_id: ScheduleId,
        /// Updated schedule configuration.
        config: ScheduleConfig,
    },
    /// A schedule resource was paused.
    SchedulePaused {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Schedule resource that was paused.
        schedule_id: ScheduleId,
    },
    /// A paused schedule resource was resumed.
    ScheduleResumed {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Schedule resource that was resumed.
        schedule_id: ScheduleId,
    },
    /// A schedule resource was deleted.
    ScheduleDeleted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Schedule resource that was deleted.
        schedule_id: ScheduleId,
    },
    /// A schedule tick started a workflow execution.
    ScheduleTriggered {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Schedule resource that fired.
        schedule_id: ScheduleId,
        /// Workflow execution started by the schedule tick.
        workflow_id: WorkflowId,
        /// Run started by the schedule tick.
        run_id: RunId,
    },
}
292
/// Durable terminal outcome for a `with_timeout` operation.
///
/// Recorded in the `outcome` field of [`Event::WithTimeoutCompleted`]. No serde attributes are
/// applied, so with serde's default representation each unit variant is written as its variant
/// name (for example `"TimedOut"` in JSON).
#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq, Eq)]
pub enum WithTimeoutOutcome {
    /// The operation closure returned before the deadline.
    OperationCompleted,
    /// The deadline fired before the operation completed.
    TimedOut,
}
301
302impl Event {
303    /// Returns the envelope recorded with this event.
304    #[must_use]
305    pub const fn envelope(&self) -> &EventEnvelope {
306        match self {
307            Self::WorkflowStarted { envelope, .. }
308            | Self::WorkflowCompleted { envelope, .. }
309            | Self::WorkflowFailed { envelope, .. }
310            | Self::WorkflowCancelled { envelope, .. }
311            | Self::WorkflowTimedOut { envelope, .. }
312            | Self::WorkflowContinuedAsNew { envelope, .. }
313            | Self::SearchAttributesUpdated { envelope, .. }
314            | Self::ActivityScheduled { envelope, .. }
315            | Self::ActivityStarted { envelope, .. }
316            | Self::ActivityCompleted { envelope, .. }
317            | Self::ActivityFailed { envelope, .. }
318            | Self::ActivityCancelled { envelope, .. }
319            | Self::TimerStarted { envelope, .. }
320            | Self::TimerFired { envelope, .. }
321            | Self::TimerCancelled { envelope, .. }
322            | Self::WithTimeoutCompleted { envelope, .. }
323            | Self::SignalReceived { envelope, .. }
324            | Self::SignalSent { envelope, .. }
325            | Self::ChildWorkflowStarted { envelope, .. }
326            | Self::ChildWorkflowCompleted { envelope, .. }
327            | Self::ChildWorkflowFailed { envelope, .. }
328            | Self::ChildWorkflowCancelled { envelope, .. }
329            | Self::ScheduleCreated { envelope, .. }
330            | Self::ScheduleUpdated { envelope, .. }
331            | Self::SchedulePaused { envelope, .. }
332            | Self::ScheduleResumed { envelope, .. }
333            | Self::ScheduleDeleted { envelope, .. }
334            | Self::ScheduleTriggered { envelope, .. } => envelope,
335        }
336    }
337
338    /// Returns the monotonic sequence number recorded for this event.
339    #[must_use]
340    pub const fn seq(&self) -> u64 {
341        self.envelope().seq
342    }
343
344    /// Returns the deterministic recorded timestamp for this event.
345    #[must_use]
346    pub const fn recorded_at(&self) -> &DateTime<Utc> {
347        &self.envelope().recorded_at
348    }
349
350    /// Returns the workflow history that owns this event.
351    #[must_use]
352    pub const fn workflow_id(&self) -> &WorkflowId {
353        &self.envelope().workflow_id
354    }
355}
356
#[cfg(test)]
mod tests {
    //! Round-trip, accessor, and wire-layout tests for [`Event`] and [`EventEnvelope`].

    use std::collections::HashMap;

    use chrono::{DateTime, Utc};
    use serde_json::json;

    use super::{Event, EventEnvelope, WithTimeoutOutcome};
    use crate::{
        ActivityError, ActivityErrorKind, ActivityId, CatchUpPolicy, OverlapPolicy, Payload, RunId,
        ScheduleConfig, ScheduleId, SearchAttributeValue, TimerId, TriggerSpec, WorkflowError,
        WorkflowId,
    };

    /// Fixed timestamp with a sub-second component, shared by every fixture envelope.
    fn recorded_at() -> DateTime<Utc> {
        DateTime::from_timestamp(1_700_000_000, 123_000_000).unwrap_or_default()
    }

    /// Builds a fixture envelope with the given sequence number and the nil workflow id.
    fn envelope(seq: u64) -> EventEnvelope {
        EventEnvelope {
            seq,
            recorded_at: recorded_at(),
            workflow_id: WorkflowId::new(uuid::Uuid::nil()),
        }
    }

    /// Builds a small JSON payload whose `label` field distinguishes fixtures from one another.
    fn payload(label: &str) -> Result<Payload, crate::PayloadError> {
        Payload::from_json(&json!({ "label": label }))
    }

    /// Builds a cron-triggered schedule configuration whose input payload carries `label`.
    fn schedule_config(label: &str) -> Result<ScheduleConfig, crate::PayloadError> {
        Ok(ScheduleConfig {
            trigger: TriggerSpec::Cron {
                expression: String::from("0 0 * * *"),
            },
            overlap_policy: OverlapPolicy::Skip,
            catch_up_policy: CatchUpPolicy::One,
            workflow_type: String::from("checkout"),
            input: payload(label)?,
            search_attributes: HashMap::from([(
                String::from("aion.namespace"),
                SearchAttributeValue::String(String::from("tenant-a")),
            )]),
        })
    }

    /// Builds a workflow error with the given message and no details.
    fn workflow_error(message: &str) -> WorkflowError {
        WorkflowError {
            message: String::from(message),
            details: None,
        }
    }

    /// Builds an activity error of the given kind with the given message and no details.
    fn activity_error(kind: ActivityErrorKind, message: &str) -> ActivityError {
        ActivityError {
            kind,
            message: String::from(message),
            details: None,
        }
    }

    /// Serializes `event` to JSON, decodes it again, and asserts the decoded value is equal.
    fn round_trip(event: &Event) -> Result<(), serde_json::Error> {
        let json = serde_json::to_string(event)?;
        let decoded = serde_json::from_str::<Event>(&json)?;
        assert_eq!(*event, decoded);
        Ok(())
    }

    #[test]
    fn event_accessors_return_envelope_fields() -> Result<(), Box<dyn std::error::Error>> {
        let workflow_id = WorkflowId::new_v4();
        let recorded_at = recorded_at();
        let envelope = EventEnvelope {
            seq: 17,
            recorded_at,
            workflow_id: workflow_id.clone(),
        };
        let event = Event::WorkflowStarted {
            envelope,
            workflow_type: String::from("checkout"),
            input: payload("input")?,
            run_id: RunId::new(uuid::Uuid::from_u128(1)),
            parent_run_id: None,
        };

        assert_eq!(event.seq(), 17);
        assert_eq!(event.recorded_at(), &recorded_at);
        assert_eq!(event.workflow_id(), &workflow_id);
        Ok(())
    }

    #[test]
    fn event_json_uses_type_and_data_keys() -> Result<(), Box<dyn std::error::Error>> {
        let event = Event::WorkflowCancelled {
            envelope: envelope(4),
            reason: String::from("caller requested cancellation"),
        };

        // Pins the adjacently tagged layout declared by `#[serde(tag = "type", content = "data")]`.
        let value = serde_json::to_value(&event)?;
        assert_eq!(value.get("type"), Some(&json!("WorkflowCancelled")));
        assert_eq!(
            value.get("data").and_then(|data| data.get("reason")),
            Some(&json!("caller requested cancellation"))
        );
        Ok(())
    }

    #[test]
    fn events_round_trip_through_json() -> Result<(), Box<dyn std::error::Error>> {
        let child_workflow_id = WorkflowId::new(uuid::Uuid::from_u128(1));
        let fire_at = DateTime::from_timestamp(1_700_000_100, 0).unwrap_or_default();
        // Sequence numbers are kept strictly increasing across both round-trip fixtures so the
        // data matches the monotonic contract documented on `EventEnvelope::seq`.
        let events = vec![
            Event::WorkflowStarted {
                envelope: envelope(1),
                workflow_type: String::from("checkout"),
                input: payload("workflow-input")?,
                run_id: RunId::new(uuid::Uuid::from_u128(1)),
                parent_run_id: None,
            },
            Event::WorkflowCompleted {
                envelope: envelope(2),
                result: payload("workflow-result")?,
            },
            Event::WorkflowFailed {
                envelope: envelope(3),
                error: workflow_error("workflow failed"),
            },
            Event::WorkflowCancelled {
                envelope: envelope(4),
                reason: String::from("caller requested cancellation"),
            },
            Event::WorkflowTimedOut {
                envelope: envelope(5),
                timeout: String::from("execution"),
            },
            Event::ActivityScheduled {
                envelope: envelope(6),
                activity_id: ActivityId::from_sequence_position(6),
                activity_type: String::from("charge-card"),
                input: payload("activity-input")?,
            },
            Event::ActivityStarted {
                envelope: envelope(7),
                activity_id: ActivityId::from_sequence_position(6),
            },
            Event::ActivityCompleted {
                envelope: envelope(8),
                activity_id: ActivityId::from_sequence_position(6),
                result: payload("activity-result")?,
            },
            Event::ActivityFailed {
                envelope: envelope(9),
                activity_id: ActivityId::from_sequence_position(6),
                error: activity_error(ActivityErrorKind::Retryable, "temporary outage"),
                attempt: 1,
            },
            Event::ActivityCancelled {
                envelope: envelope(10),
                activity_id: ActivityId::from_sequence_position(6),
            },
            Event::TimerStarted {
                envelope: envelope(11),
                timer_id: TimerId::anonymous(11),
                fire_at,
            },
            Event::TimerFired {
                envelope: envelope(12),
                timer_id: TimerId::anonymous(11),
            },
            Event::TimerCancelled {
                envelope: envelope(13),
                timer_id: TimerId::named("reminder")?,
            },
            // Both `with_timeout` outcomes, covering the `Some` and `None` result encodings.
            Event::WithTimeoutCompleted {
                envelope: envelope(14),
                timer_id: TimerId::anonymous(14),
                outcome: WithTimeoutOutcome::OperationCompleted,
                result: Some(payload("with-timeout-result")?),
            },
            Event::WithTimeoutCompleted {
                envelope: envelope(15),
                timer_id: TimerId::anonymous(14),
                outcome: WithTimeoutOutcome::TimedOut,
                result: None,
            },
            Event::SignalReceived {
                envelope: envelope(16),
                name: String::from("approve"),
                payload: payload("signal")?,
            },
            Event::SignalSent {
                envelope: envelope(17),
                target_workflow_id: WorkflowId::new(uuid::Uuid::from_u128(5)),
                name: String::from("approve"),
                payload: payload("signal-sent")?,
            },
            Event::ChildWorkflowStarted {
                envelope: envelope(18),
                child_workflow_id: child_workflow_id.clone(),
                workflow_type: String::from("fulfillment"),
                input: payload("child-input")?,
            },
            Event::ChildWorkflowCompleted {
                envelope: envelope(19),
                child_workflow_id: child_workflow_id.clone(),
                result: payload("child-result")?,
            },
            Event::ChildWorkflowFailed {
                envelope: envelope(20),
                child_workflow_id: child_workflow_id.clone(),
                error: workflow_error("child failed"),
            },
            Event::ChildWorkflowCancelled {
                envelope: envelope(21),
                child_workflow_id,
            },
        ];

        for event in events {
            round_trip(&event)?;
        }
        Ok(())
    }

    #[test]
    fn extended_events_round_trip_through_json() -> Result<(), Box<dyn std::error::Error>> {
        let schedule_id = ScheduleId::new(uuid::Uuid::from_u128(2));
        let triggered_workflow_id = WorkflowId::new(uuid::Uuid::from_u128(3));
        let triggered_run_id = RunId::new(uuid::Uuid::from_u128(4));
        let events = vec![
            Event::WorkflowContinuedAsNew {
                envelope: envelope(22),
                input: payload("continued-input")?,
                workflow_type: Some(String::from("checkout-v2")),
                parent_run_id: RunId::new(uuid::Uuid::from_u128(2)),
            },
            Event::SearchAttributesUpdated {
                envelope: envelope(23),
                workflow_id: WorkflowId::new(uuid::Uuid::nil()),
                attributes: HashMap::from([(
                    String::from("customer_id"),
                    SearchAttributeValue::String(String::from("cust-123")),
                )]),
            },
            Event::ScheduleCreated {
                envelope: envelope(24),
                schedule_id: schedule_id.clone(),
                config: schedule_config("schedule-created")?,
            },
            Event::ScheduleUpdated {
                envelope: envelope(25),
                schedule_id: schedule_id.clone(),
                config: schedule_config("schedule-updated")?,
            },
            Event::SchedulePaused {
                envelope: envelope(26),
                schedule_id: schedule_id.clone(),
            },
            Event::ScheduleResumed {
                envelope: envelope(27),
                schedule_id: schedule_id.clone(),
            },
            Event::ScheduleDeleted {
                envelope: envelope(28),
                schedule_id: schedule_id.clone(),
            },
            Event::ScheduleTriggered {
                envelope: envelope(29),
                schedule_id,
                workflow_id: triggered_workflow_id,
                run_id: triggered_run_id,
            },
        ];

        for event in events {
            round_trip(&event)?;
        }
        Ok(())
    }
}