aion-rs 0.1.0

Transport-agnostic Aion workflow engine with durability, replay, timers, and supervision.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
//! Engine-facing seam for time, signal, query, child, and concurrency services.
//!
//! AE implements [`EngineHandle`] for the real engine. This AT cluster consumes the seam to resolve
//! workflow residency, deliver already-recorded observations to mailboxes, request linked child
//! workflow starts, arm timer-wheel entries, and route asynchronous-arrival events through the
//! target workflow's single AD Recorder. AT does not manage workflow process lifecycle,
//! supervision, or module loading directly.

use aion_core::{Event, Payload, TimerId, WorkflowError, WorkflowId};

use crate::Pid;
use chrono::{DateTime, Utc};
use tokio::sync::oneshot;

/// Narrow live-process handle used by AT services after AE resolves workflow residency.
///
/// The wrapper intentionally exposes only an opaque process identifier. Real AE implementations can
/// adapt their concrete BEAM process handle into this type without giving AT lifecycle ownership.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct WorkflowProcessHandle {
    pid: u64,
}

impl WorkflowProcessHandle {
    /// Creates a workflow process handle from an opaque process identifier.
    #[must_use]
    pub const fn new(pid: u64) -> Self {
        Self { pid }
    }

    /// Returns the opaque process identifier backing this handle.
    #[must_use]
    pub const fn pid(self) -> u64 {
        self.pid
    }
}

/// AE's answer when AT resolves a logical workflow to a live process.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WorkflowResidency {
    /// The workflow is currently resident and can receive mailbox messages.
    Resident(WorkflowProcessHandle),
    /// The workflow exists durably but has no live process at the moment.
    NonResident,
    /// The workflow is terminal and should not receive live interactions.
    Terminal,
    /// AE has no durable or live workflow for the requested identifier.
    Unknown,
}

/// One-shot reply path carried by read-only query mailbox messages.
///
/// Workflow processes answer query messages at yield points from registered read-only handlers.
/// Queries are distinct from signals, do not mutate deterministic workflow state, and never record
/// events; the reply sender carries either the handler payload or a typed query error.
pub type QueryReplySender = oneshot::Sender<crate::query::service::QueryResult>;

/// Message kinds AT may ask AE to deliver to a workflow process mailbox.
#[derive(Debug)]
pub enum WorkflowMailboxMessage {
    /// A durable timer fired and has been recorded.
    TimerFired {
        /// Timer that fired.
        timer_id: TimerId,
        /// Deterministic fire timestamp carried for service/replay correlation.
        fire_at: DateTime<Utc>,
    },
    /// A durable signal arrived and has been recorded.
    SignalReceived {
        /// Signal name selected by the sender.
        name: String,
        /// Opaque signal payload.
        payload: Payload,
    },
    /// A read-only query request. Query dispatch records no event.
    Query {
        /// Query name selected by the caller.
        name: String,
        /// Opaque query input payload.
        payload: Payload,
        /// One-shot channel for the workflow query handler's reply.
        reply_to: QueryReplySender,
    },
    /// A linked child workflow completed successfully and has been recorded.
    ChildWorkflowCompleted {
        /// Child workflow that produced the result.
        child_workflow_id: WorkflowId,
        /// Spawn correlation token used by collectors.
        correlation: u64,
        /// Opaque child result payload.
        result: Payload,
    },
    /// A linked child workflow failed terminally and has been recorded.
    ChildWorkflowFailed {
        /// Child workflow that failed.
        child_workflow_id: WorkflowId,
        /// Spawn correlation token used by collectors.
        correlation: u64,
        /// Terminal child workflow failure.
        error: WorkflowError,
    },
    /// A linked child workflow was cancelled and has been recorded.
    ChildWorkflowCancelled {
        /// Child workflow that was cancelled.
        child_workflow_id: WorkflowId,
        /// Spawn correlation token used by collectors.
        correlation: u64,
    },
}

/// Request from AT to AE to spawn a child workflow under a parent process.
///
/// The child workflow identifier is pre-allocated by the parent and durably
/// recorded as `ChildWorkflowStarted` in the parent's history *before* this
/// request is issued (record-then-spawn), so a crash between the record and
/// the start leaves a recoverable record instead of an unrecorded orphan.
/// AE must start the child under exactly this identifier. Children are not
/// process-linked to their parents: parent death leaves children running,
/// and awaited terminals are observed through the child-terminal watcher.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChildWorkflowSpawnRequest {
    /// Parent workflow requesting the child execution.
    pub parent_workflow_id: WorkflowId,
    /// Pre-allocated child workflow identifier already recorded by the parent.
    pub child_workflow_id: WorkflowId,
    /// Child workflow type selected by the parent workflow.
    pub workflow_type: String,
    /// Opaque child workflow input payload.
    pub input: Payload,
}

/// AE's result after starting a linked child workflow execution.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ChildWorkflowSpawnResult {
    /// Logical child workflow identifier.
    pub child_workflow_id: WorkflowId,
    /// Live process handle for the linked child execution.
    pub child_process: WorkflowProcessHandle,
}

/// Timer-wheel entry requested by AT for a live workflow process.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TimerWheelEntry {
    /// Workflow process that should receive the timer fire path.
    pub process: WorkflowProcessHandle,
    /// Timer selected by workflow code or assigned by the engine.
    pub timer_id: TimerId,
    /// UTC timestamp at which the wheel should fire.
    pub fire_at: DateTime<Utc>,
}

/// Errors returned by the engine seam.
#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)]
pub enum EngineSeamError {
    /// The target workflow has no current live process.
    #[error("workflow {workflow_id} is not resident")]
    NonResident {
        /// Workflow that had no current live process.
        workflow_id: WorkflowId,
    },

    /// The target workflow is terminal.
    #[error("workflow {workflow_id} is terminal")]
    Terminal {
        /// Terminal workflow identifier.
        workflow_id: WorkflowId,
    },

    /// The target workflow is unknown to AE.
    #[error("workflow {workflow_id} is unknown")]
    UnknownWorkflow {
        /// Unknown workflow identifier.
        workflow_id: WorkflowId,
    },

    /// AE could not deliver a mailbox message.
    #[error("mailbox delivery failed: {reason}")]
    Delivery {
        /// Human-readable delivery failure reason.
        reason: String,
    },

    /// AE could not spawn a linked child workflow.
    #[error("child workflow spawn failed: {reason}")]
    ChildSpawn {
        /// Human-readable child-spawn failure reason.
        reason: String,
    },

    /// AE could not arm or disarm the timer wheel.
    #[error("timer wheel operation failed: {reason}")]
    TimerWheel {
        /// Human-readable timer-wheel failure reason.
        reason: String,
    },

    /// AE could not terminate a linked child process.
    #[error("linked child termination failed: {reason}")]
    ChildTermination {
        /// Human-readable child termination failure reason.
        reason: String,
    },

    /// AD's single Recorder path could not record the event.
    #[error("workflow recorder failed: {reason}")]
    Recorder {
        /// Human-readable recorder failure reason.
        reason: String,
    },
}

/// Engine-facing capabilities consumed by AT services and implemented by AE.
///
/// This trait deliberately does not expose operations that start, supervise, tear down, or load
/// top-level workflow processes. Child spawning, residency resolution, and recording are requests
/// into AE/AD-owned infrastructure. In particular, [`EngineHandle::record_workflow_event`] must
/// route asynchronous-arrival events through the target workflow's single Recorder; AT services must
/// not append directly to the event store.
pub trait EngineHandle: Send + Sync {
    /// Resolves a workflow identifier to its current residency state.
    ///
    /// # Errors
    ///
    /// Returns [`EngineSeamError`] when AE cannot inspect residency for the requested workflow.
    fn resolve_workflow(
        &self,
        workflow_id: &WorkflowId,
    ) -> Result<WorkflowResidency, EngineSeamError>;

    /// Delivers a message to a resident workflow process mailbox.
    ///
    /// # Errors
    ///
    /// Returns [`EngineSeamError`] when AE cannot enqueue the message on the target mailbox.
    fn deliver_workflow_message(
        &self,
        process: WorkflowProcessHandle,
        message: WorkflowMailboxMessage,
    ) -> Result<(), EngineSeamError>;

    /// Requests AE to spawn a child workflow execution linked to the parent process.
    ///
    /// # Errors
    ///
    /// Returns [`EngineSeamError`] when AE rejects or fails the linked child-spawn request.
    fn spawn_child_workflow(
        &self,
        request: ChildWorkflowSpawnRequest,
    ) -> Result<ChildWorkflowSpawnResult, EngineSeamError>;

    /// Terminates a linked child workflow process through AE's process-link boundary.
    ///
    /// # Errors
    ///
    /// Returns [`EngineSeamError`] when AE cannot send the cancellation exit to the linked child.
    fn terminate_linked_child_workflow(
        &self,
        parent_workflow_id: &WorkflowId,
        child_process: WorkflowProcessHandle,
        correlation: u64,
    ) -> Result<(), EngineSeamError>;

    /// Terminates a linked in-VM activity process through AE's process-link boundary.
    ///
    /// # Errors
    ///
    /// Returns [`EngineSeamError`] when AE cannot send the cancellation exit to the linked child.
    fn terminate_linked_activity(
        &self,
        parent_workflow_id: &WorkflowId,
        activity_process: Pid,
        correlation: u64,
    ) -> Result<(), EngineSeamError>;

    /// Arms a timer-wheel entry for a resident workflow process.
    ///
    /// # Errors
    ///
    /// Returns [`EngineSeamError`] when AE cannot register the timer with the live wheel.
    fn arm_timer(&self, entry: TimerWheelEntry) -> Result<(), EngineSeamError>;

    /// Disarms a timer-wheel entry for a resident workflow process.
    ///
    /// # Errors
    ///
    /// Returns [`EngineSeamError`] when AE cannot remove the timer from the live wheel.
    fn disarm_timer(
        &self,
        process: WorkflowProcessHandle,
        timer_id: &TimerId,
    ) -> Result<(), EngineSeamError>;

    /// Records an event through the target workflow's single AD Recorder.
    ///
    /// # Errors
    ///
    /// Returns [`EngineSeamError`] when the target workflow's Recorder cannot append the event.
    fn record_workflow_event(
        &self,
        workflow_id: &WorkflowId,
        event: Event,
    ) -> Result<(), EngineSeamError>;
}

#[cfg(test)]
pub(crate) mod test_support {
    use std::collections::{HashMap, VecDeque};
    use std::sync::Arc;
    use std::sync::{Mutex, MutexGuard};

    use aion_store::{WritableEventStore, WriteToken};

    use super::*;

    /// Operation captured by [`FakeEngineHandle`] in observed order.
    #[derive(Clone, Debug, PartialEq)]
    pub enum FakeEngineOperation {
        /// A mailbox message was delivered.
        Delivered {
            /// Target process handle.
            process: WorkflowProcessHandle,
            /// Delivered message projection.
            message: DeliveredWorkflowMessage,
        },
        /// A child spawn was requested.
        ChildSpawnRequested(ChildWorkflowSpawnRequest),
        /// A timer-wheel entry was armed.
        TimerArmed(TimerWheelEntry),
        /// A timer-wheel entry was disarmed.
        TimerDisarmed {
            /// Target process handle.
            process: WorkflowProcessHandle,
            /// Timer that was disarmed.
            timer_id: TimerId,
        },
        /// A linked child workflow process was terminated.
        LinkedChildWorkflowTerminated {
            /// Parent workflow owning the link.
            parent_workflow_id: WorkflowId,
            /// Linked child workflow process.
            child_process: WorkflowProcessHandle,
            /// Spawn correlation token.
            correlation: u64,
        },
        /// A linked activity process was terminated.
        LinkedActivityTerminated {
            /// Parent workflow owning the link.
            parent_workflow_id: WorkflowId,
            /// Linked activity process.
            activity_process: Pid,
            /// Spawn correlation token.
            correlation: u64,
        },
        /// An event was recorded through the recorder seam.
        EventRecorded {
            /// Workflow whose recorder received the event.
            workflow_id: WorkflowId,
            /// Recorded event.
            event: Event,
        },
    }

    #[derive(Default)]
    struct FakeEngineState {
        residency: HashMap<WorkflowId, WorkflowResidency>,
        delivered: Vec<(WorkflowProcessHandle, DeliveredWorkflowMessage)>,
        delivery_responses: VecDeque<Result<(), EngineSeamError>>,
        child_spawn_responses: VecDeque<Result<ChildWorkflowSpawnResult, EngineSeamError>>,
        armed_timers: Vec<TimerWheelEntry>,
        disarmed_timers: Vec<(WorkflowProcessHandle, TimerId)>,
        recorded_events: Vec<(WorkflowId, Event)>,
        operations: Vec<FakeEngineOperation>,
        recorder_store: Option<Arc<dyn WritableEventStore>>,
        record_responses: VecDeque<Result<(), EngineSeamError>>,
    }

    /// Cloneable projection of delivered mailbox messages for seam tests.
    #[derive(Clone, Debug, PartialEq, Eq)]
    pub enum DeliveredWorkflowMessage {
        /// A timer-fired delivery was observed.
        TimerFired {
            timer_id: TimerId,
            fire_at: DateTime<Utc>,
        },
        /// A signal delivery was observed.
        SignalReceived { name: String, payload: Payload },
        /// A query delivery was observed; the one-shot sender is intentionally not retained.
        Query { name: String, payload: Payload },
        /// A child completion delivery was observed.
        ChildWorkflowCompleted {
            child_workflow_id: WorkflowId,
            correlation: u64,
            result: Payload,
        },
        /// A child failure delivery was observed.
        ChildWorkflowFailed {
            child_workflow_id: WorkflowId,
            correlation: u64,
            error: WorkflowError,
        },
        /// A child cancellation delivery was observed.
        ChildWorkflowCancelled {
            child_workflow_id: WorkflowId,
            correlation: u64,
        },
    }

    impl DeliveredWorkflowMessage {
        pub(crate) fn from_message(message: &WorkflowMailboxMessage) -> Self {
            match message {
                WorkflowMailboxMessage::TimerFired { timer_id, fire_at } => Self::TimerFired {
                    timer_id: timer_id.clone(),
                    fire_at: *fire_at,
                },
                WorkflowMailboxMessage::SignalReceived { name, payload } => Self::SignalReceived {
                    name: name.clone(),
                    payload: payload.clone(),
                },
                WorkflowMailboxMessage::Query {
                    name,
                    payload,
                    reply_to: _,
                } => Self::Query {
                    name: name.clone(),
                    payload: payload.clone(),
                },
                WorkflowMailboxMessage::ChildWorkflowCompleted {
                    child_workflow_id,
                    correlation,
                    result,
                } => Self::ChildWorkflowCompleted {
                    child_workflow_id: child_workflow_id.clone(),
                    correlation: *correlation,
                    result: result.clone(),
                },
                WorkflowMailboxMessage::ChildWorkflowFailed {
                    child_workflow_id,
                    correlation,
                    error,
                } => Self::ChildWorkflowFailed {
                    child_workflow_id: child_workflow_id.clone(),
                    correlation: *correlation,
                    error: error.clone(),
                },
                WorkflowMailboxMessage::ChildWorkflowCancelled {
                    child_workflow_id,
                    correlation,
                } => Self::ChildWorkflowCancelled {
                    child_workflow_id: child_workflow_id.clone(),
                    correlation: *correlation,
                },
            }
        }
    }

    /// Test-only fake implementation of [`EngineHandle`].
    #[derive(Default)]
    pub struct FakeEngineHandle {
        state: Mutex<FakeEngineState>,
    }

    impl FakeEngineHandle {
        /// Creates an empty fake engine handle.
        #[must_use]
        pub fn new() -> Self {
            Self::default()
        }

        /// Creates a fake whose recorder seam appends to the supplied store with event sequencing.
        #[must_use]
        pub fn recording_to(store: Arc<dyn WritableEventStore>) -> Self {
            Self {
                state: Mutex::new(FakeEngineState {
                    recorder_store: Some(store),
                    ..FakeEngineState::default()
                }),
            }
        }

        /// Sets the residency response returned for a workflow.
        ///
        /// # Errors
        ///
        /// Returns [`EngineSeamError::EngineOffline`] if the fake's state lock is poisoned.
        pub fn set_residency(
            &self,
            workflow_id: WorkflowId,
            residency: WorkflowResidency,
        ) -> Result<(), EngineSeamError> {
            self.state()?.residency.insert(workflow_id, residency);
            Ok(())
        }

        /// Queues the next response returned by mailbox-delivery seam calls.
        ///
        /// # Errors
        ///
        /// Returns [`EngineSeamError::EngineOffline`] if the fake's state lock is poisoned.
        pub fn push_delivery_response(
            &self,
            response: Result<(), EngineSeamError>,
        ) -> Result<(), EngineSeamError> {
            self.state()?.delivery_responses.push_back(response);
            Ok(())
        }

        /// Returns a snapshot of seam operations in observed order.
        ///
        /// # Errors
        ///
        /// Returns [`EngineSeamError::EngineOffline`] if the fake's state lock is poisoned.
        pub fn operations(&self) -> Result<Vec<FakeEngineOperation>, EngineSeamError> {
            Ok(self.state()?.operations.clone())
        }

        /// Returns a snapshot of delivered mailbox messages.
        ///
        /// # Errors
        ///
        /// Returns [`EngineSeamError::EngineOffline`] if the fake's state lock is poisoned.
        pub fn delivered_messages(
            &self,
        ) -> Result<Vec<(WorkflowProcessHandle, DeliveredWorkflowMessage)>, EngineSeamError>
        {
            Ok(self.state()?.delivered.clone())
        }

        /// Returns a snapshot of armed timer-wheel entries.
        ///
        /// # Errors
        ///
        /// Returns [`EngineSeamError::EngineOffline`] if the fake's state lock is poisoned.
        pub fn armed_timers(&self) -> Result<Vec<TimerWheelEntry>, EngineSeamError> {
            Ok(self.state()?.armed_timers.clone())
        }

        /// Queues the next child-spawn response returned by the fake.
        ///
        /// # Errors
        ///
        /// Returns [`EngineSeamError::EngineOffline`] if the fake's state lock is poisoned.
        pub fn push_child_spawn_response(
            &self,
            response: Result<ChildWorkflowSpawnResult, EngineSeamError>,
        ) -> Result<(), EngineSeamError> {
            self.state()?.child_spawn_responses.push_back(response);
            Ok(())
        }

        /// Returns events recorded through the fake recorder seam.
        ///
        /// # Errors
        ///
        /// Returns [`EngineSeamError::EngineOffline`] if the fake's state lock is poisoned.
        pub fn recorded_events(&self) -> Result<Vec<(WorkflowId, Event)>, EngineSeamError> {
            Ok(self.state()?.recorded_events.clone())
        }

        fn state(&self) -> Result<MutexGuard<'_, FakeEngineState>, EngineSeamError> {
            self.state.lock().map_err(|_| EngineSeamError::Recorder {
                reason: "fake engine state lock was poisoned".to_owned(),
            })
        }
    }

    impl EngineHandle for FakeEngineHandle {
        fn resolve_workflow(
            &self,
            workflow_id: &WorkflowId,
        ) -> Result<WorkflowResidency, EngineSeamError> {
            Ok(self
                .state()?
                .residency
                .get(workflow_id)
                .copied()
                .unwrap_or(WorkflowResidency::Unknown))
        }

        fn deliver_workflow_message(
            &self,
            process: WorkflowProcessHandle,
            message: WorkflowMailboxMessage,
        ) -> Result<(), EngineSeamError> {
            let mut state = self.state()?;
            if let Some(response) = state.delivery_responses.pop_front() {
                response?;
            }
            let delivered = DeliveredWorkflowMessage::from_message(&message);
            state.delivered.push((process, delivered.clone()));
            state.operations.push(FakeEngineOperation::Delivered {
                process,
                message: delivered,
            });
            Ok(())
        }

        fn spawn_child_workflow(
            &self,
            request: ChildWorkflowSpawnRequest,
        ) -> Result<ChildWorkflowSpawnResult, EngineSeamError> {
            let mut state = self.state()?;
            state
                .operations
                .push(FakeEngineOperation::ChildSpawnRequested(request.clone()));
            if let Some(response) = state.child_spawn_responses.pop_front() {
                response
            } else {
                Err(EngineSeamError::ChildSpawn {
                    reason: "fake child spawn response was not queued".to_owned(),
                })
            }
        }

        fn terminate_linked_child_workflow(
            &self,
            parent_workflow_id: &WorkflowId,
            child_process: WorkflowProcessHandle,
            correlation: u64,
        ) -> Result<(), EngineSeamError> {
            let mut state = self.state()?;
            state
                .operations
                .push(FakeEngineOperation::LinkedChildWorkflowTerminated {
                    parent_workflow_id: parent_workflow_id.clone(),
                    child_process,
                    correlation,
                });
            Ok(())
        }

        fn terminate_linked_activity(
            &self,
            parent_workflow_id: &WorkflowId,
            activity_process: Pid,
            correlation: u64,
        ) -> Result<(), EngineSeamError> {
            let mut state = self.state()?;
            state
                .operations
                .push(FakeEngineOperation::LinkedActivityTerminated {
                    parent_workflow_id: parent_workflow_id.clone(),
                    activity_process,
                    correlation,
                });
            Ok(())
        }

        fn arm_timer(&self, entry: TimerWheelEntry) -> Result<(), EngineSeamError> {
            let mut state = self.state()?;
            state.armed_timers.push(entry.clone());
            state
                .operations
                .push(FakeEngineOperation::TimerArmed(entry));
            Ok(())
        }

        fn disarm_timer(
            &self,
            process: WorkflowProcessHandle,
            timer_id: &TimerId,
        ) -> Result<(), EngineSeamError> {
            let mut state = self.state()?;
            state
                .armed_timers
                .retain(|entry| !(entry.process == process && &entry.timer_id == timer_id));
            state.disarmed_timers.push((process, timer_id.clone()));
            state.operations.push(FakeEngineOperation::TimerDisarmed {
                process,
                timer_id: timer_id.clone(),
            });
            Ok(())
        }

        fn record_workflow_event(
            &self,
            workflow_id: &WorkflowId,
            event: Event,
        ) -> Result<(), EngineSeamError> {
            let mut state = self.state()?;
            if let Some(response) = state.record_responses.pop_front() {
                response?;
            }
            state
                .recorded_events
                .push((workflow_id.clone(), event.clone()));
            let recorder_store = state.recorder_store.clone();
            state.operations.push(FakeEngineOperation::EventRecorded {
                workflow_id: workflow_id.clone(),
                event: event.clone(),
            });
            drop(state);

            if let Some(store) = recorder_store {
                let expected_seq = event.seq().saturating_sub(1);
                futures::executor::block_on(store.append(
                    WriteToken::recorder(),
                    workflow_id,
                    &[event],
                    expected_seq,
                ))
                .map_err(|error| EngineSeamError::Recorder {
                    reason: error.to_string(),
                })?;
            }
            Ok(())
        }
    }
}

#[cfg(test)]
mod tests {
    use aion_core::{ContentType, Payload, WorkflowId};

    use super::test_support::{DeliveredWorkflowMessage, FakeEngineHandle};
    use super::{
        EngineHandle, EngineSeamError, WorkflowMailboxMessage, WorkflowProcessHandle,
        WorkflowResidency,
    };

    #[test]
    fn fake_captures_delivered_message_for_resident_workflow()
    -> Result<(), Box<dyn std::error::Error>> {
        let engine = FakeEngineHandle::new();
        let workflow_id = WorkflowId::new_v4();
        let process = WorkflowProcessHandle::new(42);
        engine.set_residency(workflow_id.clone(), WorkflowResidency::Resident(process))?;

        let resolved = engine.resolve_workflow(&workflow_id)?;
        assert_eq!(resolved, WorkflowResidency::Resident(process));

        let payload = Payload::new(ContentType::Json, b"null".to_vec());
        let message = WorkflowMailboxMessage::SignalReceived {
            name: "wake".to_owned(),
            payload: payload.clone(),
        };
        engine.deliver_workflow_message(process, message)?;

        assert_eq!(
            engine.delivered_messages()?,
            vec![(
                process,
                DeliveredWorkflowMessage::SignalReceived {
                    name: "wake".to_owned(),
                    payload,
                }
            )]
        );
        Ok(())
    }

    #[test]
    fn fake_can_inject_delivery_failure() -> Result<(), Box<dyn std::error::Error>> {
        let engine = FakeEngineHandle::new();
        let process = WorkflowProcessHandle::new(43);
        engine.push_delivery_response(Err(EngineSeamError::Delivery {
            reason: "mailbox unavailable".to_owned(),
        }))?;

        let error = engine
            .deliver_workflow_message(
                process,
                WorkflowMailboxMessage::SignalReceived {
                    name: "wake".to_owned(),
                    payload: Payload::new(ContentType::Json, b"null".to_vec()),
                },
            )
            .err()
            .ok_or_else(|| std::io::Error::other("delivery failure was not returned"))?;

        assert!(matches!(error, EngineSeamError::Delivery { .. }));
        assert!(engine.delivered_messages()?.is_empty());
        assert!(engine.operations()?.is_empty());
        Ok(())
    }
}