Skip to main content

aion_store/
store.rs

1//! Event-store traits and single-writer capability.
2
3use aion_core::{
4    Event, RunId, TimerId, WorkflowFilter, WorkflowId, WorkflowStatus, WorkflowSummary,
5};
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8
9use crate::{StoreError, TimerEntry};
10
mod write_capability {
    /// Proof of authority to append events to a workflow history.
    ///
    /// Aion's single-writer durability invariant is expressed in the type system through this
    /// token: write authority for a workflow is meant to be held by the recorder append path
    /// and nothing else. The runtime `SequenceConflict` check stays in place as defense in
    /// depth, signalling any internal misuse or future bypass that tries to append against a
    /// stale head.
    ///
    /// The only field is private, so code outside this module cannot spell the struct literal
    /// and has to obtain a token from one of the constructors below.
    #[derive(Clone, Copy, Debug)]
    pub struct WriteToken {
        _private: (),
    }

    impl WriteToken {
        /// Single place where the struct literal is written; both constructors route here.
        const fn mint() -> Self {
            Self { _private: () }
        }

        /// Builds the write token used by Aion's recorder path.
        #[must_use]
        pub fn recorder() -> Self {
            Self::mint()
        }
    }

    /// Crate-internal constructor yielding a token identical to [`WriteToken::recorder`].
    pub(crate) fn conformance() -> WriteToken {
        WriteToken::mint()
    }
}
35
36pub use write_capability::WriteToken;
37
38/// Summary of one concrete run in a workflow's continuation chain.
39#[derive(Clone, Debug, PartialEq, Eq)]
40pub struct RunSummary {
41    /// Concrete run identifier for this chain entry.
42    pub run_id: RunId,
43    /// Parent run that continued as this run, or `None` for the first run.
44    pub parent_run_id: Option<RunId>,
45    /// Status projected from this run's slice of lifecycle events.
46    pub status: WorkflowStatus,
47    /// Timestamp of this run's `WorkflowStarted` event.
48    pub started_at: DateTime<Utc>,
49    /// Timestamp of this run's terminal lifecycle event, when closed.
50    pub closed_at: Option<DateTime<Utc>>,
51}
52
53/// Read and durable-timer contract for Aion event stores.
54#[async_trait]
55pub trait ReadableEventStore: Send + Sync + 'static {
56    /// Reads the complete event history for `workflow_id` in ascending sequence order.
57    ///
58    /// A workflow with no recorded events is observed as an empty history. This includes unknown
59    /// workflow identifiers: because the first append with `expected_seq == 0` creates a workflow
60    /// implicitly, "unknown workflow" and "empty history" are the same observable state for reads.
61    /// This method must not return [`StoreError::NotFound`] for absent workflows.
62    async fn read_history(&self, workflow_id: &WorkflowId) -> Result<Vec<Event>, StoreError>;
63
64    /// Reads the event history for `workflow_id` restricted to events with sequence number
65    /// greater than or equal to `from_seq`, in ascending sequence order.
66    ///
67    /// This is the range-read primitive behind O(delta) WS resume: callers replaying from a
68    /// cursor must not pay for the full history. Semantics:
69    ///
70    /// - `from_seq <= 1` is equivalent to [`Self::read_history`]: sequence numbers start at 1,
71    ///   so every recorded event satisfies the bound.
72    /// - `from_seq` beyond the current head returns an empty vector, never an error. Whether a
73    ///   beyond-head cursor is *valid* is protocol judgment, not store judgment: the WS resume
74    ///   protocol rejects `resume_from_seq > head + 1` as an invalid cursor
75    ///   (`ResumeCursorAheadOfHistory`), but it makes that call by comparing the cursor against
76    ///   the head it observes — the store only answers which events exist at or after the
77    ///   requested sequence.
78    /// - Unknown workflows behave exactly like [`Self::read_history`] for unknown workflows:
79    ///   empty history, never [`StoreError::NotFound`], because "unknown workflow" and "empty
80    ///   history" are the same observable state for reads.
81    ///
82    /// There is deliberately no default implementation: a read-all-then-filter fallback would
83    /// silently reintroduce O(history) behavior. Every backend must implement this as a real
84    /// range read (for SQL backends, an indexed `seq >= ?` range scan).
85    async fn read_history_from(
86        &self,
87        workflow_id: &WorkflowId,
88        from_seq: u64,
89    ) -> Result<Vec<Event>, StoreError>;
90
91    /// Reads the concrete run chain for `workflow_id` in continuation order.
92    async fn read_run_chain(&self, workflow_id: &WorkflowId)
93    -> Result<Vec<RunSummary>, StoreError>;
94
95    /// Lists every workflow identifier that has at least one event in history.
96    ///
97    /// Unlike [`Self::list_active`], this includes terminal workflows and exists to let projection
98    /// repair jobs reconcile derived indexes against the authoritative event history.
99    async fn list_workflow_ids(&self) -> Result<Vec<WorkflowId>, StoreError>;
100
101    /// Lists workflow identifiers whose projected status is non-terminal.
102    async fn list_active(&self) -> Result<Vec<WorkflowId>, StoreError>;
103
104    /// Returns workflow summaries matching `filter`.
105    async fn query(&self, filter: &WorkflowFilter) -> Result<Vec<WorkflowSummary>, StoreError>;
106
107    /// Persists a durable timer for `workflow_id` that is due at `fire_at`.
108    ///
109    /// Timer scheduling remains on the public store surface because timers are not workflow-history
110    /// appends and are used by the timer subsystem after the recorder has written `TimerStarted`.
111    async fn schedule_timer(
112        &self,
113        workflow_id: &WorkflowId,
114        timer_id: &TimerId,
115        fire_at: DateTime<Utc>,
116    ) -> Result<(), StoreError>;
117
118    /// Returns durable timers whose `fire_at` is less than or equal to `as_of`.
119    async fn expired_timers(&self, as_of: DateTime<Utc>) -> Result<Vec<TimerEntry>, StoreError>;
120}
121
122/// Write authority for appending workflow-history events.
123///
124/// `append` requires a [`WriteToken`], so having an `Arc<dyn EventStore>` or
125/// `Arc<dyn ReadableEventStore>` is not sufficient to write events.
126#[async_trait]
127pub trait WritableEventStore: Send + Sync + 'static {
128    /// Atomically appends `events` to `workflow_id` when the stored history head equals
129    /// `expected_seq`.
130    ///
131    /// Implementations must apply every event in `events` or none of them. If the current stored
132    /// head for `workflow_id` differs from `expected_seq`, this method must return
133    /// [`StoreError::SequenceConflict`] and leave history unchanged. A first append with
134    /// `expected_seq == 0` creates the workflow history implicitly.
135    async fn append(
136        &self,
137        token: WriteToken,
138        workflow_id: &WorkflowId,
139        events: &[Event],
140        expected_seq: u64,
141    ) -> Result<(), StoreError>;
142}
143
144/// Convenience trait for concrete stores that support both reads/timers and recorder writes.
145pub trait EventStore: ReadableEventStore + WritableEventStore {}
146
147impl<T> EventStore for T where T: ReadableEventStore + WritableEventStore + ?Sized {}
148
149pub(crate) fn conformance_write_token() -> WriteToken {
150    write_capability::conformance()
151}
152
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::{EventStore, ReadableEventStore, WritableEventStore};

    /// Accepts an optional shared handle to any `T`, sized or not. Instantiating it with a
    /// `dyn Trait` argument only compiles while that trait can be used as a trait object.
    fn accept_shared<T: ?Sized>(_handle: Option<Arc<T>>) {}

    /// Compile-time guard: each store trait must remain usable as `Arc<dyn Trait>`.
    #[test]
    fn event_store_traits_are_object_safe() {
        accept_shared::<dyn ReadableEventStore>(None);
        accept_shared::<dyn WritableEventStore>(None);
        accept_shared::<dyn EventStore>(None);
    }
}