aion-core 0.2.0

Pure domain model and shared vocabulary for Aion durable workflows.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
//! Workflow history events and their deterministic recording envelope.

use std::collections::HashMap;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

use crate::{
    ActivityError, ActivityId, PackageVersion, Payload, RunId, ScheduleConfig, ScheduleId,
    SearchAttributeValue, TimerId, WorkflowError, WorkflowId,
};

/// Metadata recorded with every workflow history event.
///
/// Each [`Event`] variant stores one of these in its `envelope` field. The fields can be read
/// without matching on the variant through [`Event::envelope`], [`Event::seq`],
/// [`Event::recorded_at`], and [`Event::workflow_id`].
#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq, Eq)]
pub struct EventEnvelope {
    /// Monotonic sequence number within the owning workflow history.
    pub seq: u64,
    /// Recorded UTC timestamp for this event.
    ///
    /// This timestamp is the determinism source for `workflow.now`; replay must use the recorded
    /// value rather than consulting wall-clock time.
    pub recorded_at: DateTime<Utc>,
    /// Workflow history that owns this event.
    pub workflow_id: WorkflowId,
}

/// A recorded workflow history event.
///
/// User data is carried as opaque [`Payload`] values, while failures use the closed workflow and
/// activity error types from this crate.
///
/// Every variant carries an [`EventEnvelope`] in its `envelope` field; [`Event::envelope`]
/// returns it without the caller having to match on the variant.
///
/// # Serialized form
///
/// The enum is adjacently tagged for serde: the variant name is written under the `type` key
/// and the variant's fields are written under the `data` key.
#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq)]
#[serde(tag = "type", content = "data")]
pub enum Event {
    /// A workflow execution started with a type name and input payload.
    WorkflowStarted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Workflow type selected by the caller.
        workflow_type: String,
        /// Opaque workflow input payload.
        input: Payload,
        /// Concrete run identifier started by this event.
        run_id: RunId,
        /// Parent run that continued as this run, when this start is part of a
        /// continue-as-new chain.
        parent_run_id: Option<RunId>,
        /// Package version this run was resolved against at record time.
        ///
        /// Recovery and replay resolve workflow code from this recorded
        /// version; they never re-resolve a "latest" version.
        package_version: PackageVersion,
    },
    /// A workflow execution completed successfully; this terminal event projects to Completed.
    WorkflowCompleted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Opaque workflow result payload.
        result: Payload,
    },
    /// A workflow execution failed terminally; this terminal event projects to Failed.
    WorkflowFailed {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Terminal workflow failure.
        error: WorkflowError,
    },
    /// A workflow execution was cancelled; this terminal event projects to Cancelled.
    WorkflowCancelled {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Human-readable cancellation reason.
        reason: String,
    },
    /// A workflow execution timed out; this terminal event projects to `TimedOut`.
    WorkflowTimedOut {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Descriptor identifying the timeout that elapsed.
        ///
        /// Intentionally stringly-typed: the closed set of timeout kinds is defined by cluster AT
        /// (timers and signals), not by the core event model.
        timeout: String,
    },
    /// A workflow execution continued as a new run; this terminal event projects to
    /// `ContinuedAsNew`.
    WorkflowContinuedAsNew {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Opaque workflow input payload carried into the new run.
        input: Payload,
        /// Workflow type override for the new run, when migration changes the workflow type.
        ///
        /// When absent, the new run uses the current workflow type.
        workflow_type: Option<String>,
        /// Run identifier for the current run that is being continued.
        parent_run_id: RunId,
    },
    /// Workflow search attributes were updated for visibility and query projection.
    SearchAttributesUpdated {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Workflow whose search attributes changed.
        // NOTE(review): the envelope already carries a `workflow_id`; confirm whether the two
        // values are ever allowed to differ.
        workflow_id: WorkflowId,
        /// Updated search attributes keyed by attribute name.
        // NOTE(review): `HashMap` iteration order is unspecified, so the serialized key order of
        // this map may vary between runs; confirm nothing relies on byte-identical output.
        attributes: HashMap<String, SearchAttributeValue>,
    },
    /// An activity was scheduled by workflow code.
    ActivityScheduled {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Deterministic activity identifier derived from the scheduling sequence position.
        activity_id: ActivityId,
        /// Activity type selected by workflow code.
        activity_type: String,
        /// Opaque activity input payload.
        input: Payload,
    },
    /// An activity worker started executing an activity attempt.
    ActivityStarted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Activity being executed.
        activity_id: ActivityId,
    },
    /// An activity completed successfully.
    ActivityCompleted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Activity that produced the result.
        activity_id: ActivityId,
        /// Opaque activity result payload.
        result: Payload,
    },
    /// An activity attempt failed.
    ///
    /// The `attempt` field together with [`ActivityError`]'s retryable or terminal classification
    /// lets replay distinguish a retryable interim failure from a terminal one for the same
    /// [`ActivityId`].
    ActivityFailed {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Activity whose attempt failed.
        activity_id: ActivityId,
        /// Classified activity failure.
        error: ActivityError,
        /// One-based activity attempt number that produced this failure.
        attempt: u32,
    },
    /// An activity was cancelled as an explicit cancellation outcome.
    ActivityCancelled {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Activity that was cancelled.
        activity_id: ActivityId,
    },
    /// A timer was scheduled to fire at a deterministic timestamp.
    TimerStarted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Timer selected by workflow code or assigned by the engine.
        timer_id: TimerId,
        /// UTC timestamp at which the timer becomes eligible to fire.
        fire_at: DateTime<Utc>,
    },
    /// A timer fired.
    TimerFired {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Timer that fired.
        timer_id: TimerId,
    },
    /// A timer was cancelled as an explicit cancellation outcome.
    TimerCancelled {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Timer that was cancelled.
        timer_id: TimerId,
    },
    /// A `with_timeout` operation reached a durable terminal outcome.
    WithTimeoutCompleted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Timer that bounded the operation.
        timer_id: TimerId,
        /// Recorded timeout outcome.
        outcome: WithTimeoutOutcome,
        /// JSON-encoded BEAM term payload for completed operation results.
        // NOTE(review): presumably `None` when `outcome` is `TimedOut`; nothing in this type
        // enforces that pairing — confirm against the code that records this event.
        result: Option<Payload>,
    },
    /// A signal was delivered to the workflow.
    SignalReceived {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Signal name selected by the sender.
        name: String,
        /// Opaque signal payload.
        payload: Payload,
    },
    /// A signal was sent by this workflow to another workflow.
    SignalSent {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Target workflow identifier selected by workflow code.
        target_workflow_id: WorkflowId,
        /// Signal name selected by workflow code.
        name: String,
        /// Opaque signal payload.
        payload: Payload,
    },
    /// A child workflow was started.
    ChildWorkflowStarted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Child workflow identifier.
        child_workflow_id: WorkflowId,
        /// Child workflow type selected by the parent.
        workflow_type: String,
        /// Opaque child workflow input payload.
        input: Payload,
        /// Package version resolved for the child at record time.
        ///
        /// The crash-repair sweep and the child's own start use exactly this
        /// recorded version, so the crash path resolves identically to the
        /// crash-free path.
        package_version: PackageVersion,
    },
    /// A child workflow completed successfully.
    ChildWorkflowCompleted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Child workflow that produced the result.
        child_workflow_id: WorkflowId,
        /// Opaque child workflow result payload.
        result: Payload,
    },
    /// A child workflow failed terminally.
    ChildWorkflowFailed {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Child workflow that failed.
        child_workflow_id: WorkflowId,
        /// Terminal child workflow failure.
        error: WorkflowError,
    },
    /// A child workflow was cancelled as an explicit cancellation outcome.
    ChildWorkflowCancelled {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Child workflow that was cancelled.
        child_workflow_id: WorkflowId,
    },
    /// A schedule resource was created.
    ScheduleCreated {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Schedule resource that was created.
        schedule_id: ScheduleId,
        /// Persisted schedule configuration.
        config: ScheduleConfig,
    },
    /// A schedule resource was updated.
    ScheduleUpdated {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Schedule resource that was updated.
        schedule_id: ScheduleId,
        /// Updated schedule configuration.
        config: ScheduleConfig,
    },
    /// A schedule resource was paused.
    SchedulePaused {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Schedule resource that was paused.
        schedule_id: ScheduleId,
    },
    /// A paused schedule resource was resumed.
    ScheduleResumed {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Schedule resource that was resumed.
        schedule_id: ScheduleId,
    },
    /// A schedule resource was deleted.
    ScheduleDeleted {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Schedule resource that was deleted.
        schedule_id: ScheduleId,
    },
    /// A schedule tick started a workflow execution.
    ScheduleTriggered {
        /// Recording metadata for this event.
        envelope: EventEnvelope,
        /// Schedule resource that fired.
        schedule_id: ScheduleId,
        /// Workflow execution started by the schedule tick.
        workflow_id: WorkflowId,
        /// Run started by the schedule tick.
        run_id: RunId,
    },
}

/// Durable terminal outcome for a `with_timeout` operation.
///
/// Recorded in the `outcome` field of [`Event::WithTimeoutCompleted`]. No serde attributes are
/// applied, so serde's default enum representation is used and each unit variant is serialized
/// as its variant name.
#[derive(Serialize, Deserialize, ts_rs::TS, Clone, Debug, PartialEq, Eq)]
pub enum WithTimeoutOutcome {
    /// The operation closure returned before the deadline.
    OperationCompleted,
    /// The deadline fired before the operation completed.
    TimedOut,
}

impl Event {
    /// Returns the envelope recorded with this event.
    #[must_use]
    pub const fn envelope(&self) -> &EventEnvelope {
        match self {
            Self::WorkflowStarted { envelope, .. }
            | Self::WorkflowCompleted { envelope, .. }
            | Self::WorkflowFailed { envelope, .. }
            | Self::WorkflowCancelled { envelope, .. }
            | Self::WorkflowTimedOut { envelope, .. }
            | Self::WorkflowContinuedAsNew { envelope, .. }
            | Self::SearchAttributesUpdated { envelope, .. }
            | Self::ActivityScheduled { envelope, .. }
            | Self::ActivityStarted { envelope, .. }
            | Self::ActivityCompleted { envelope, .. }
            | Self::ActivityFailed { envelope, .. }
            | Self::ActivityCancelled { envelope, .. }
            | Self::TimerStarted { envelope, .. }
            | Self::TimerFired { envelope, .. }
            | Self::TimerCancelled { envelope, .. }
            | Self::WithTimeoutCompleted { envelope, .. }
            | Self::SignalReceived { envelope, .. }
            | Self::SignalSent { envelope, .. }
            | Self::ChildWorkflowStarted { envelope, .. }
            | Self::ChildWorkflowCompleted { envelope, .. }
            | Self::ChildWorkflowFailed { envelope, .. }
            | Self::ChildWorkflowCancelled { envelope, .. }
            | Self::ScheduleCreated { envelope, .. }
            | Self::ScheduleUpdated { envelope, .. }
            | Self::SchedulePaused { envelope, .. }
            | Self::ScheduleResumed { envelope, .. }
            | Self::ScheduleDeleted { envelope, .. }
            | Self::ScheduleTriggered { envelope, .. } => envelope,
        }
    }

    /// Returns the monotonic sequence number recorded for this event.
    #[must_use]
    pub const fn seq(&self) -> u64 {
        self.envelope().seq
    }

    /// Returns the deterministic recorded timestamp for this event.
    #[must_use]
    pub const fn recorded_at(&self) -> &DateTime<Utc> {
        &self.envelope().recorded_at
    }

    /// Returns the workflow history that owns this event.
    #[must_use]
    pub const fn workflow_id(&self) -> &WorkflowId {
        &self.envelope().workflow_id
    }
}

#[cfg(test)]
mod tests {
    //! Accessor and JSON serialization tests for [`Event`].
    //!
    //! Between them, the round-trip tests construct every `Event` variant once. Envelope
    //! sequence numbers are unique across the fixtures so a failing assertion points at exactly
    //! one event.

    use std::collections::HashMap;

    use chrono::{DateTime, Utc};
    use serde_json::json;

    use super::{Event, EventEnvelope, WithTimeoutOutcome};
    use crate::{
        ActivityError, ActivityErrorKind, ActivityId, CatchUpPolicy, OverlapPolicy, PackageVersion,
        Payload, RunId, ScheduleConfig, ScheduleId, SearchAttributeValue, TimerId, TriggerSpec,
        WorkflowError, WorkflowId,
    };

    /// Fixed package version fixture built from 64 repeated `a` characters.
    fn package_version() -> PackageVersion {
        PackageVersion::new("a".repeat(64))
    }

    /// Fixed timestamp with a non-zero sub-second part, shared by every fixture envelope.
    fn recorded_at() -> DateTime<Utc> {
        DateTime::from_timestamp(1_700_000_000, 123_000_000).unwrap_or_default()
    }

    /// Builds an envelope with the given sequence number, the fixed timestamp, and the nil
    /// workflow id.
    fn envelope(seq: u64) -> EventEnvelope {
        EventEnvelope {
            seq,
            recorded_at: recorded_at(),
            workflow_id: WorkflowId::new(uuid::Uuid::nil()),
        }
    }

    /// Builds a payload from the JSON object `{ "label": label }`.
    fn payload(label: &str) -> Result<Payload, crate::PayloadError> {
        Payload::from_json(&json!({ "label": label }))
    }

    /// Builds a cron schedule configuration whose input payload carries `label`.
    fn schedule_config(label: &str) -> Result<ScheduleConfig, crate::PayloadError> {
        Ok(ScheduleConfig {
            trigger: TriggerSpec::Cron {
                expression: String::from("0 0 * * *"),
            },
            overlap_policy: OverlapPolicy::Skip,
            catch_up_policy: CatchUpPolicy::One,
            workflow_type: String::from("checkout"),
            input: payload(label)?,
            search_attributes: HashMap::from([(
                String::from("aion.namespace"),
                SearchAttributeValue::String(String::from("tenant-a")),
            )]),
        })
    }

    /// Builds a workflow error with the given message and no details.
    fn workflow_error(message: &str) -> WorkflowError {
        WorkflowError {
            message: String::from(message),
            details: None,
        }
    }

    /// Builds an activity error of the given kind with the given message and no details.
    fn activity_error(kind: ActivityErrorKind, message: &str) -> ActivityError {
        ActivityError {
            kind,
            message: String::from(message),
            details: None,
        }
    }

    /// Serializes `event` to JSON, deserializes it again, and asserts the result equals the
    /// original.
    fn round_trip(event: &Event) -> Result<(), serde_json::Error> {
        let json = serde_json::to_string(event)?;
        let decoded = serde_json::from_str::<Event>(&json)?;
        assert_eq!(*event, decoded);
        Ok(())
    }

    #[test]
    fn event_accessors_return_envelope_fields() -> Result<(), Box<dyn std::error::Error>> {
        let workflow_id = WorkflowId::new_v4();
        let recorded_at = recorded_at();
        let envelope = EventEnvelope {
            seq: 17,
            recorded_at,
            workflow_id: workflow_id.clone(),
        };
        let event = Event::WorkflowStarted {
            envelope,
            workflow_type: String::from("checkout"),
            input: payload("input")?,
            run_id: RunId::new(uuid::Uuid::from_u128(1)),
            parent_run_id: None,
            package_version: package_version(),
        };

        assert_eq!(event.seq(), 17);
        assert_eq!(event.recorded_at(), &recorded_at);
        assert_eq!(event.workflow_id(), &workflow_id);
        Ok(())
    }

    #[test]
    fn events_use_adjacent_tagging() -> Result<(), Box<dyn std::error::Error>> {
        // Pins the representation selected by `#[serde(tag = "type", content = "data")]`:
        // variant name under `type`, variant fields under `data`.
        let event = Event::WorkflowCancelled {
            envelope: envelope(30),
            reason: String::from("caller requested cancellation"),
        };
        let encoded = serde_json::to_value(&event)?;

        assert_eq!(encoded.get("type"), Some(&json!("WorkflowCancelled")));
        assert_eq!(
            encoded.pointer("/data/reason"),
            Some(&json!("caller requested cancellation")),
        );
        Ok(())
    }

    #[test]
    fn events_round_trip_through_json() -> Result<(), Box<dyn std::error::Error>> {
        let fire_at = DateTime::from_timestamp(1_700_000_100, 0).unwrap_or_default();
        let events = vec![
            Event::WorkflowStarted {
                envelope: envelope(1),
                workflow_type: String::from("checkout"),
                input: payload("workflow-input")?,
                run_id: RunId::new(uuid::Uuid::from_u128(1)),
                parent_run_id: None,
                package_version: package_version(),
            },
            Event::WorkflowCompleted {
                envelope: envelope(2),
                result: payload("workflow-result")?,
            },
            Event::WorkflowFailed {
                envelope: envelope(3),
                error: workflow_error("workflow failed"),
            },
            Event::WorkflowCancelled {
                envelope: envelope(4),
                reason: String::from("caller requested cancellation"),
            },
            Event::WorkflowTimedOut {
                envelope: envelope(5),
                timeout: String::from("execution"),
            },
            Event::ActivityScheduled {
                envelope: envelope(6),
                activity_id: ActivityId::from_sequence_position(6),
                activity_type: String::from("charge-card"),
                input: payload("activity-input")?,
            },
            Event::ActivityStarted {
                envelope: envelope(7),
                activity_id: ActivityId::from_sequence_position(6),
            },
            Event::ActivityCompleted {
                envelope: envelope(8),
                activity_id: ActivityId::from_sequence_position(6),
                result: payload("activity-result")?,
            },
            Event::ActivityFailed {
                envelope: envelope(9),
                activity_id: ActivityId::from_sequence_position(6),
                error: activity_error(ActivityErrorKind::Retryable, "temporary outage"),
                attempt: 1,
            },
            Event::ActivityCancelled {
                envelope: envelope(10),
                activity_id: ActivityId::from_sequence_position(6),
            },
            Event::TimerStarted {
                envelope: envelope(11),
                timer_id: TimerId::anonymous(11),
                fire_at,
            },
            Event::TimerFired {
                envelope: envelope(12),
                timer_id: TimerId::anonymous(11),
            },
            Event::TimerCancelled {
                envelope: envelope(13),
                timer_id: TimerId::named("reminder")?,
            },
            Event::SignalReceived {
                envelope: envelope(14),
                name: String::from("approve"),
                payload: payload("signal")?,
            },
            Event::SignalSent {
                envelope: envelope(15),
                target_workflow_id: WorkflowId::new(uuid::Uuid::from_u128(5)),
                name: String::from("approve"),
                payload: payload("signal-sent")?,
            },
        ];

        for event in events {
            round_trip(&event)?;
        }
        Ok(())
    }

    #[test]
    fn child_events_round_trip_through_json() -> Result<(), Box<dyn std::error::Error>> {
        let child_workflow_id = WorkflowId::new(uuid::Uuid::from_u128(1));
        let events = vec![
            Event::ChildWorkflowStarted {
                envelope: envelope(16),
                child_workflow_id: child_workflow_id.clone(),
                workflow_type: String::from("fulfillment"),
                input: payload("child-input")?,
                package_version: package_version(),
            },
            Event::ChildWorkflowCompleted {
                envelope: envelope(17),
                child_workflow_id: child_workflow_id.clone(),
                result: payload("child-result")?,
            },
            Event::ChildWorkflowFailed {
                envelope: envelope(18),
                child_workflow_id: child_workflow_id.clone(),
                error: workflow_error("child failed"),
            },
            Event::ChildWorkflowCancelled {
                envelope: envelope(19),
                child_workflow_id,
            },
        ];

        for event in events {
            round_trip(&event)?;
        }
        Ok(())
    }

    #[test]
    fn extended_events_round_trip_through_json() -> Result<(), Box<dyn std::error::Error>> {
        let schedule_id = ScheduleId::new(uuid::Uuid::from_u128(2));
        let triggered_workflow_id = WorkflowId::new(uuid::Uuid::from_u128(3));
        let triggered_run_id = RunId::new(uuid::Uuid::from_u128(4));
        let events = vec![
            Event::WorkflowContinuedAsNew {
                envelope: envelope(20),
                input: payload("continued-input")?,
                workflow_type: Some(String::from("checkout-v2")),
                parent_run_id: RunId::new(uuid::Uuid::from_u128(2)),
            },
            Event::SearchAttributesUpdated {
                envelope: envelope(21),
                workflow_id: WorkflowId::new(uuid::Uuid::nil()),
                attributes: HashMap::from([(
                    String::from("customer_id"),
                    SearchAttributeValue::String(String::from("cust-123")),
                )]),
            },
            Event::ScheduleCreated {
                envelope: envelope(22),
                schedule_id: schedule_id.clone(),
                config: schedule_config("schedule-created")?,
            },
            Event::ScheduleUpdated {
                envelope: envelope(23),
                schedule_id: schedule_id.clone(),
                config: schedule_config("schedule-updated")?,
            },
            Event::SchedulePaused {
                envelope: envelope(24),
                schedule_id: schedule_id.clone(),
            },
            Event::ScheduleResumed {
                envelope: envelope(25),
                schedule_id: schedule_id.clone(),
            },
            Event::ScheduleDeleted {
                envelope: envelope(26),
                schedule_id: schedule_id.clone(),
            },
            Event::ScheduleTriggered {
                envelope: envelope(27),
                schedule_id,
                workflow_id: triggered_workflow_id,
                run_id: triggered_run_id,
            },
        ];

        for event in events {
            round_trip(&event)?;
        }
        Ok(())
    }

    #[test]
    fn with_timeout_events_round_trip_through_json() -> Result<(), Box<dyn std::error::Error>> {
        // Covers both outcomes, with and without a recorded result payload.
        let events = vec![
            Event::WithTimeoutCompleted {
                envelope: envelope(28),
                timer_id: TimerId::anonymous(28),
                outcome: WithTimeoutOutcome::OperationCompleted,
                result: Some(payload("with-timeout-result")?),
            },
            Event::WithTimeoutCompleted {
                envelope: envelope(29),
                timer_id: TimerId::named("reminder")?,
                outcome: WithTimeoutOutcome::TimedOut,
                result: None,
            },
        ];

        for event in events {
            round_trip(&event)?;
        }
        Ok(())
    }
}