Skip to main content

aion_core/
event.rs

1//! Workflow history events and their deterministic recording envelope.
2
3use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7
8use crate::{
9    ActivityError, ActivityId, PackageVersion, Payload, RunId, ScheduleConfig, ScheduleId,
10    SearchAttributeValue, TimerId, WorkflowError, WorkflowId,
11};
12
13/// Metadata recorded with every workflow history event.
14#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq, Eq)]
15pub struct EventEnvelope {
16    /// Monotonic sequence number within the owning workflow history.
17    pub seq: u64,
18    /// Recorded UTC timestamp for this event.
19    ///
20    /// This timestamp is the determinism source for `workflow.now`; replay must use the recorded
21    /// value rather than consulting wall-clock time.
22    pub recorded_at: DateTime<Utc>,
23    /// Workflow history that owns this event.
24    pub workflow_id: WorkflowId,
25}
26
27/// A recorded workflow history event.
28///
29/// User data is carried as opaque [`Payload`] values, while failures use the closed workflow and
30/// activity error types from this crate.
31#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq)]
32#[serde(tag = "type", content = "data")]
33pub enum Event {
34    /// A workflow execution started with a type name and input payload.
35    WorkflowStarted {
36        /// Recording metadata for this event.
37        envelope: EventEnvelope,
38        /// Workflow type selected by the caller.
39        workflow_type: String,
40        /// Opaque workflow input payload.
41        input: Payload,
42        /// Concrete run identifier started by this event.
43        run_id: RunId,
44        /// Parent run that continued as this run, when this start is part of a
45        /// continue-as-new chain.
46        parent_run_id: Option<RunId>,
47        /// Package version this run was resolved against at record time.
48        ///
49        /// Recovery and replay resolve workflow code from this recorded
50        /// version; they never re-resolve a "latest" version.
51        package_version: PackageVersion,
52    },
53    /// A workflow execution completed successfully; this terminal event projects to Completed.
54    WorkflowCompleted {
55        /// Recording metadata for this event.
56        envelope: EventEnvelope,
57        /// Opaque workflow result payload.
58        result: Payload,
59    },
60    /// A workflow execution failed terminally; this terminal event projects to Failed.
61    WorkflowFailed {
62        /// Recording metadata for this event.
63        envelope: EventEnvelope,
64        /// Terminal workflow failure.
65        error: WorkflowError,
66    },
67    /// A workflow execution was cancelled; this terminal event projects to Cancelled.
68    WorkflowCancelled {
69        /// Recording metadata for this event.
70        envelope: EventEnvelope,
71        /// Human-readable cancellation reason.
72        reason: String,
73    },
74    /// A workflow execution timed out; this terminal event projects to `TimedOut`.
75    WorkflowTimedOut {
76        /// Recording metadata for this event.
77        envelope: EventEnvelope,
78        /// Descriptor identifying the timeout that elapsed.
79        ///
80        /// Intentionally stringly-typed: the closed set of timeout kinds is defined by cluster AT
81        /// (timers and signals), not by the core event model.
82        timeout: String,
83    },
84    /// A workflow execution continued as a new run; this terminal event projects to
85    /// `ContinuedAsNew`.
86    WorkflowContinuedAsNew {
87        /// Recording metadata for this event.
88        envelope: EventEnvelope,
89        /// Opaque workflow input payload carried into the new run.
90        input: Payload,
91        /// Workflow type override for the new run, when migration changes the workflow type.
92        ///
93        /// When absent, the new run uses the current workflow type.
94        workflow_type: Option<String>,
95        /// Run identifier for the current run that is being continued.
96        parent_run_id: RunId,
97    },
98    /// Workflow search attributes were updated for visibility and query projection.
99    SearchAttributesUpdated {
100        /// Recording metadata for this event.
101        envelope: EventEnvelope,
102        /// Workflow whose search attributes changed.
103        workflow_id: WorkflowId,
104        /// Updated search attributes keyed by attribute name.
105        attributes: HashMap<String, SearchAttributeValue>,
106    },
107    /// An activity was scheduled by workflow code.
108    ActivityScheduled {
109        /// Recording metadata for this event.
110        envelope: EventEnvelope,
111        /// Deterministic activity identifier derived from the scheduling sequence position.
112        activity_id: ActivityId,
113        /// Activity type selected by workflow code.
114        activity_type: String,
115        /// Opaque activity input payload.
116        input: Payload,
117    },
118    /// An activity worker started executing an activity attempt.
119    ActivityStarted {
120        /// Recording metadata for this event.
121        envelope: EventEnvelope,
122        /// Activity being executed.
123        activity_id: ActivityId,
124    },
125    /// An activity completed successfully.
126    ActivityCompleted {
127        /// Recording metadata for this event.
128        envelope: EventEnvelope,
129        /// Activity that produced the result.
130        activity_id: ActivityId,
131        /// Opaque activity result payload.
132        result: Payload,
133    },
134    /// An activity attempt failed.
135    ///
136    /// The `attempt` field together with [`ActivityError`]'s retryable or terminal classification
137    /// lets replay distinguish a retryable interim failure from a terminal one for the same
138    /// [`ActivityId`].
139    ActivityFailed {
140        /// Recording metadata for this event.
141        envelope: EventEnvelope,
142        /// Activity whose attempt failed.
143        activity_id: ActivityId,
144        /// Classified activity failure.
145        error: ActivityError,
146        /// One-based activity attempt number that produced this failure.
147        attempt: u32,
148    },
149    /// An activity was cancelled as an explicit cancellation outcome.
150    ActivityCancelled {
151        /// Recording metadata for this event.
152        envelope: EventEnvelope,
153        /// Activity that was cancelled.
154        activity_id: ActivityId,
155    },
156    /// A timer was scheduled to fire at a deterministic timestamp.
157    TimerStarted {
158        /// Recording metadata for this event.
159        envelope: EventEnvelope,
160        /// Timer selected by workflow code or assigned by the engine.
161        timer_id: TimerId,
162        /// UTC timestamp at which the timer becomes eligible to fire.
163        fire_at: DateTime<Utc>,
164    },
165    /// A timer fired.
166    TimerFired {
167        /// Recording metadata for this event.
168        envelope: EventEnvelope,
169        /// Timer that fired.
170        timer_id: TimerId,
171    },
172    /// A timer was cancelled as an explicit cancellation outcome.
173    TimerCancelled {
174        /// Recording metadata for this event.
175        envelope: EventEnvelope,
176        /// Timer that was cancelled.
177        timer_id: TimerId,
178    },
179    /// A `with_timeout` operation reached a durable terminal outcome.
180    WithTimeoutCompleted {
181        /// Recording metadata for this event.
182        envelope: EventEnvelope,
183        /// Timer that bounded the operation.
184        timer_id: TimerId,
185        /// Recorded timeout outcome.
186        outcome: WithTimeoutOutcome,
187        /// JSON-encoded BEAM term payload for completed operation results.
188        result: Option<Payload>,
189    },
190    /// A signal was delivered to the workflow.
191    SignalReceived {
192        /// Recording metadata for this event.
193        envelope: EventEnvelope,
194        /// Signal name selected by the sender.
195        name: String,
196        /// Opaque signal payload.
197        payload: Payload,
198    },
199    /// A signal was sent by this workflow to another workflow.
200    SignalSent {
201        /// Recording metadata for this event.
202        envelope: EventEnvelope,
203        /// Target workflow identifier selected by workflow code.
204        target_workflow_id: WorkflowId,
205        /// Signal name selected by workflow code.
206        name: String,
207        /// Opaque signal payload.
208        payload: Payload,
209    },
210    /// A child workflow was started.
211    ChildWorkflowStarted {
212        /// Recording metadata for this event.
213        envelope: EventEnvelope,
214        /// Child workflow identifier.
215        child_workflow_id: WorkflowId,
216        /// Child workflow type selected by the parent.
217        workflow_type: String,
218        /// Opaque child workflow input payload.
219        input: Payload,
220        /// Package version resolved for the child at record time.
221        ///
222        /// The crash-repair sweep and the child's own start use exactly this
223        /// recorded version, so the crash path resolves identically to the
224        /// crash-free path.
225        package_version: PackageVersion,
226    },
227    /// A child workflow completed successfully.
228    ChildWorkflowCompleted {
229        /// Recording metadata for this event.
230        envelope: EventEnvelope,
231        /// Child workflow that produced the result.
232        child_workflow_id: WorkflowId,
233        /// Opaque child workflow result payload.
234        result: Payload,
235    },
236    /// A child workflow failed terminally.
237    ChildWorkflowFailed {
238        /// Recording metadata for this event.
239        envelope: EventEnvelope,
240        /// Child workflow that failed.
241        child_workflow_id: WorkflowId,
242        /// Terminal child workflow failure.
243        error: WorkflowError,
244    },
245    /// A child workflow was cancelled as an explicit cancellation outcome.
246    ChildWorkflowCancelled {
247        /// Recording metadata for this event.
248        envelope: EventEnvelope,
249        /// Child workflow that was cancelled.
250        child_workflow_id: WorkflowId,
251    },
252    /// A schedule resource was created.
253    ScheduleCreated {
254        /// Recording metadata for this event.
255        envelope: EventEnvelope,
256        /// Schedule resource that was created.
257        schedule_id: ScheduleId,
258        /// Persisted schedule configuration.
259        config: ScheduleConfig,
260    },
261    /// A schedule resource was updated.
262    ScheduleUpdated {
263        /// Recording metadata for this event.
264        envelope: EventEnvelope,
265        /// Schedule resource that was updated.
266        schedule_id: ScheduleId,
267        /// Updated schedule configuration.
268        config: ScheduleConfig,
269    },
270    /// A schedule resource was paused.
271    SchedulePaused {
272        /// Recording metadata for this event.
273        envelope: EventEnvelope,
274        /// Schedule resource that was paused.
275        schedule_id: ScheduleId,
276    },
277    /// A paused schedule resource was resumed.
278    ScheduleResumed {
279        /// Recording metadata for this event.
280        envelope: EventEnvelope,
281        /// Schedule resource that was resumed.
282        schedule_id: ScheduleId,
283    },
284    /// A schedule resource was deleted.
285    ScheduleDeleted {
286        /// Recording metadata for this event.
287        envelope: EventEnvelope,
288        /// Schedule resource that was deleted.
289        schedule_id: ScheduleId,
290    },
291    /// A schedule tick started a workflow execution.
292    ScheduleTriggered {
293        /// Recording metadata for this event.
294        envelope: EventEnvelope,
295        /// Schedule resource that fired.
296        schedule_id: ScheduleId,
297        /// Workflow execution started by the schedule tick.
298        workflow_id: WorkflowId,
299        /// Run started by the schedule tick.
300        run_id: RunId,
301    },
302}
303
304/// Durable terminal outcome for a `with_timeout` operation.
305#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq, Eq)]
306pub enum WithTimeoutOutcome {
307    /// The operation closure returned before the deadline.
308    OperationCompleted,
309    /// The deadline fired before the operation completed.
310    TimedOut,
311}
312
313impl Event {
314    /// Returns the envelope recorded with this event.
315    #[must_use]
316    pub const fn envelope(&self) -> &EventEnvelope {
317        match self {
318            Self::WorkflowStarted { envelope, .. }
319            | Self::WorkflowCompleted { envelope, .. }
320            | Self::WorkflowFailed { envelope, .. }
321            | Self::WorkflowCancelled { envelope, .. }
322            | Self::WorkflowTimedOut { envelope, .. }
323            | Self::WorkflowContinuedAsNew { envelope, .. }
324            | Self::SearchAttributesUpdated { envelope, .. }
325            | Self::ActivityScheduled { envelope, .. }
326            | Self::ActivityStarted { envelope, .. }
327            | Self::ActivityCompleted { envelope, .. }
328            | Self::ActivityFailed { envelope, .. }
329            | Self::ActivityCancelled { envelope, .. }
330            | Self::TimerStarted { envelope, .. }
331            | Self::TimerFired { envelope, .. }
332            | Self::TimerCancelled { envelope, .. }
333            | Self::WithTimeoutCompleted { envelope, .. }
334            | Self::SignalReceived { envelope, .. }
335            | Self::SignalSent { envelope, .. }
336            | Self::ChildWorkflowStarted { envelope, .. }
337            | Self::ChildWorkflowCompleted { envelope, .. }
338            | Self::ChildWorkflowFailed { envelope, .. }
339            | Self::ChildWorkflowCancelled { envelope, .. }
340            | Self::ScheduleCreated { envelope, .. }
341            | Self::ScheduleUpdated { envelope, .. }
342            | Self::SchedulePaused { envelope, .. }
343            | Self::ScheduleResumed { envelope, .. }
344            | Self::ScheduleDeleted { envelope, .. }
345            | Self::ScheduleTriggered { envelope, .. } => envelope,
346        }
347    }
348
349    /// Returns the monotonic sequence number recorded for this event.
350    #[must_use]
351    pub const fn seq(&self) -> u64 {
352        self.envelope().seq
353    }
354
355    /// Returns the deterministic recorded timestamp for this event.
356    #[must_use]
357    pub const fn recorded_at(&self) -> &DateTime<Utc> {
358        &self.envelope().recorded_at
359    }
360
361    /// Returns the workflow history that owns this event.
362    #[must_use]
363    pub const fn workflow_id(&self) -> &WorkflowId {
364        &self.envelope().workflow_id
365    }
366}
367
368#[cfg(test)]
369mod tests {
370    use std::collections::HashMap;
371
372    use chrono::{DateTime, Utc};
373    use serde_json::json;
374
375    use super::{Event, EventEnvelope};
376    use crate::{
377        ActivityError, ActivityErrorKind, ActivityId, CatchUpPolicy, OverlapPolicy, PackageVersion,
378        Payload, RunId, ScheduleConfig, ScheduleId, SearchAttributeValue, TimerId, TriggerSpec,
379        WorkflowError, WorkflowId,
380    };
381
382    fn package_version() -> PackageVersion {
383        PackageVersion::new("a".repeat(64))
384    }
385
386    fn recorded_at() -> DateTime<Utc> {
387        DateTime::from_timestamp(1_700_000_000, 123_000_000).unwrap_or_default()
388    }
389
390    fn envelope(seq: u64) -> EventEnvelope {
391        EventEnvelope {
392            seq,
393            recorded_at: recorded_at(),
394            workflow_id: WorkflowId::new(uuid::Uuid::nil()),
395        }
396    }
397
398    fn payload(label: &str) -> Result<Payload, crate::PayloadError> {
399        Payload::from_json(&json!({ "label": label }))
400    }
401
402    fn schedule_config(label: &str) -> Result<ScheduleConfig, crate::PayloadError> {
403        Ok(ScheduleConfig {
404            trigger: TriggerSpec::Cron {
405                expression: String::from("0 0 * * *"),
406            },
407            overlap_policy: OverlapPolicy::Skip,
408            catch_up_policy: CatchUpPolicy::One,
409            workflow_type: String::from("checkout"),
410            input: payload(label)?,
411            search_attributes: HashMap::from([(
412                String::from("aion.namespace"),
413                crate::SearchAttributeValue::String(String::from("tenant-a")),
414            )]),
415        })
416    }
417
418    fn workflow_error(message: &str) -> WorkflowError {
419        WorkflowError {
420            message: String::from(message),
421            details: None,
422        }
423    }
424
425    fn activity_error(kind: ActivityErrorKind, message: &str) -> ActivityError {
426        ActivityError {
427            kind,
428            message: String::from(message),
429            details: None,
430        }
431    }
432
433    fn round_trip(event: &Event) -> Result<(), serde_json::Error> {
434        let json = serde_json::to_string(event)?;
435        let decoded = serde_json::from_str::<Event>(&json)?;
436        assert_eq!(*event, decoded);
437        Ok(())
438    }
439
440    #[test]
441    fn event_accessors_return_envelope_fields() -> Result<(), Box<dyn std::error::Error>> {
442        let workflow_id = WorkflowId::new_v4();
443        let recorded_at = recorded_at();
444        let envelope = EventEnvelope {
445            seq: 17,
446            recorded_at,
447            workflow_id: workflow_id.clone(),
448        };
449        let event = Event::WorkflowStarted {
450            envelope,
451            workflow_type: String::from("checkout"),
452            input: payload("input")?,
453            run_id: RunId::new(uuid::Uuid::from_u128(1)),
454            parent_run_id: None,
455            package_version: package_version(),
456        };
457
458        assert_eq!(event.seq(), 17);
459        assert_eq!(event.recorded_at(), &recorded_at);
460        assert_eq!(event.workflow_id(), &workflow_id);
461        Ok(())
462    }
463
464    #[test]
465    fn events_round_trip_through_json() -> Result<(), Box<dyn std::error::Error>> {
466        let fire_at = DateTime::from_timestamp(1_700_000_100, 0).unwrap_or_default();
467        let events = vec![
468            Event::WorkflowStarted {
469                envelope: envelope(1),
470                workflow_type: String::from("checkout"),
471                input: payload("workflow-input")?,
472                run_id: RunId::new(uuid::Uuid::from_u128(1)),
473                parent_run_id: None,
474                package_version: package_version(),
475            },
476            Event::WorkflowCompleted {
477                envelope: envelope(2),
478                result: payload("workflow-result")?,
479            },
480            Event::WorkflowFailed {
481                envelope: envelope(3),
482                error: workflow_error("workflow failed"),
483            },
484            Event::WorkflowCancelled {
485                envelope: envelope(4),
486                reason: String::from("caller requested cancellation"),
487            },
488            Event::WorkflowTimedOut {
489                envelope: envelope(5),
490                timeout: String::from("execution"),
491            },
492            Event::ActivityScheduled {
493                envelope: envelope(6),
494                activity_id: ActivityId::from_sequence_position(6),
495                activity_type: String::from("charge-card"),
496                input: payload("activity-input")?,
497            },
498            Event::ActivityStarted {
499                envelope: envelope(7),
500                activity_id: ActivityId::from_sequence_position(6),
501            },
502            Event::ActivityCompleted {
503                envelope: envelope(8),
504                activity_id: ActivityId::from_sequence_position(6),
505                result: payload("activity-result")?,
506            },
507            Event::ActivityFailed {
508                envelope: envelope(9),
509                activity_id: ActivityId::from_sequence_position(6),
510                error: activity_error(ActivityErrorKind::Retryable, "temporary outage"),
511                attempt: 1,
512            },
513            Event::ActivityCancelled {
514                envelope: envelope(10),
515                activity_id: ActivityId::from_sequence_position(6),
516            },
517            Event::TimerStarted {
518                envelope: envelope(11),
519                timer_id: TimerId::anonymous(11),
520                fire_at,
521            },
522            Event::TimerFired {
523                envelope: envelope(12),
524                timer_id: TimerId::anonymous(11),
525            },
526            Event::TimerCancelled {
527                envelope: envelope(13),
528                timer_id: TimerId::named("reminder")?,
529            },
530            Event::SignalReceived {
531                envelope: envelope(14),
532                name: String::from("approve"),
533                payload: payload("signal")?,
534            },
535            Event::SignalSent {
536                envelope: envelope(15),
537                target_workflow_id: WorkflowId::new(uuid::Uuid::from_u128(5)),
538                name: String::from("approve"),
539                payload: payload("signal-sent")?,
540            },
541        ];
542
543        for event in events {
544            round_trip(&event)?;
545        }
546        Ok(())
547    }
548
549    #[test]
550    fn child_events_round_trip_through_json() -> Result<(), Box<dyn std::error::Error>> {
551        let child_workflow_id = WorkflowId::new(uuid::Uuid::from_u128(1));
552        let events = vec![
553            Event::ChildWorkflowStarted {
554                envelope: envelope(16),
555                child_workflow_id: child_workflow_id.clone(),
556                workflow_type: String::from("fulfillment"),
557                input: payload("child-input")?,
558                package_version: package_version(),
559            },
560            Event::ChildWorkflowCompleted {
561                envelope: envelope(16),
562                child_workflow_id: child_workflow_id.clone(),
563                result: payload("child-result")?,
564            },
565            Event::ChildWorkflowFailed {
566                envelope: envelope(17),
567                child_workflow_id: child_workflow_id.clone(),
568                error: workflow_error("child failed"),
569            },
570            Event::ChildWorkflowCancelled {
571                envelope: envelope(18),
572                child_workflow_id,
573            },
574        ];
575
576        for event in events {
577            round_trip(&event)?;
578        }
579        Ok(())
580    }
581
582    #[test]
583    fn extended_events_round_trip_through_json() -> Result<(), Box<dyn std::error::Error>> {
584        let schedule_id = ScheduleId::new(uuid::Uuid::from_u128(2));
585        let triggered_workflow_id = WorkflowId::new(uuid::Uuid::from_u128(3));
586        let triggered_run_id = RunId::new(uuid::Uuid::from_u128(4));
587        let events = vec![
588            Event::WorkflowContinuedAsNew {
589                envelope: envelope(19),
590                input: payload("continued-input")?,
591                workflow_type: Some(String::from("checkout-v2")),
592                parent_run_id: RunId::new(uuid::Uuid::from_u128(2)),
593            },
594            Event::SearchAttributesUpdated {
595                envelope: envelope(20),
596                workflow_id: WorkflowId::new(uuid::Uuid::nil()),
597                attributes: HashMap::from([(
598                    String::from("customer_id"),
599                    SearchAttributeValue::String(String::from("cust-123")),
600                )]),
601            },
602            Event::ScheduleCreated {
603                envelope: envelope(20),
604                schedule_id: schedule_id.clone(),
605                config: schedule_config("schedule-created")?,
606            },
607            Event::ScheduleUpdated {
608                envelope: envelope(21),
609                schedule_id: schedule_id.clone(),
610                config: schedule_config("schedule-updated")?,
611            },
612            Event::SchedulePaused {
613                envelope: envelope(22),
614                schedule_id: schedule_id.clone(),
615            },
616            Event::ScheduleResumed {
617                envelope: envelope(23),
618                schedule_id: schedule_id.clone(),
619            },
620            Event::ScheduleDeleted {
621                envelope: envelope(24),
622                schedule_id: schedule_id.clone(),
623            },
624            Event::ScheduleTriggered {
625                envelope: envelope(25),
626                schedule_id,
627                workflow_id: triggered_workflow_id,
628                run_id: triggered_run_id,
629            },
630        ];
631
632        for event in events {
633            round_trip(&event)?;
634        }
635        Ok(())
636    }
637}