aion_store/store.rs
//! Event-store traits and single-writer capability.
2
3use aion_core::{
4 Event, RunId, TimerId, WorkflowFilter, WorkflowId, WorkflowStatus, WorkflowSummary,
5};
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8
9use crate::{StoreError, TimerEntry};
10
mod write_capability {
    /// Capability required to append workflow events.
    ///
    /// This token enforces Aion's single-writer durability invariant at the type level: only the
    /// recorder append path may hold write authority for a workflow. `SequenceConflict` remains the
    /// runtime defense-in-depth signal for any internal misuse or future bypass that attempts to
    /// append with a stale head.
    #[derive(Clone, Copy, Debug)]
    pub struct WriteToken {
        // Private unit field: code outside this module cannot build the token with a struct
        // literal and has to go through one of the named constructors below.
        _private: (),
    }

    impl WriteToken {
        /// Constructs a write token for Aion's recorder path.
        ///
        /// `const` so the token can also be produced in constant contexts; the token carries no
        /// data, so construction has no runtime cost either way.
        #[must_use]
        pub const fn recorder() -> Self {
            Self { _private: () }
        }
    }

    /// Constructs a write token for crate-internal callers.
    ///
    /// The returned value is indistinguishable from one produced by [`WriteToken::recorder`]; the
    /// separate, crate-private entry point only keeps non-recorder call sites identifiable by name.
    #[must_use]
    pub(crate) const fn conformance() -> WriteToken {
        WriteToken { _private: () }
    }
}

pub use write_capability::WriteToken;
37
38/// Summary of one concrete run in a workflow's continuation chain.
39#[derive(Clone, Debug, PartialEq, Eq)]
40pub struct RunSummary {
41 /// Concrete run identifier for this chain entry.
42 pub run_id: RunId,
43 /// Parent run that continued as this run, or `None` for the first run.
44 pub parent_run_id: Option<RunId>,
45 /// Status projected from this run's slice of lifecycle events.
46 pub status: WorkflowStatus,
47 /// Timestamp of this run's `WorkflowStarted` event.
48 pub started_at: DateTime<Utc>,
49 /// Timestamp of this run's terminal lifecycle event, when closed.
50 pub closed_at: Option<DateTime<Utc>>,
51}
52
53/// Read and durable-timer contract for Aion event stores.
54#[async_trait]
55pub trait ReadableEventStore: Send + Sync + 'static {
56 /// Reads the complete event history for `workflow_id` in ascending sequence order.
57 ///
58 /// A workflow with no recorded events is observed as an empty history. This includes unknown
59 /// workflow identifiers: because the first append with `expected_seq == 0` creates a workflow
60 /// implicitly, "unknown workflow" and "empty history" are the same observable state for reads.
61 /// This method must not return [`StoreError::NotFound`] for absent workflows.
62 async fn read_history(&self, workflow_id: &WorkflowId) -> Result<Vec<Event>, StoreError>;
63
64 /// Reads the event history for `workflow_id` restricted to events with sequence number
65 /// greater than or equal to `from_seq`, in ascending sequence order.
66 ///
67 /// This is the range-read primitive behind O(delta) WS resume: callers replaying from a
68 /// cursor must not pay for the full history. Semantics:
69 ///
70 /// - `from_seq <= 1` is equivalent to [`Self::read_history`]: sequence numbers start at 1,
71 /// so every recorded event satisfies the bound.
72 /// - `from_seq` beyond the current head returns an empty vector, never an error. Whether a
73 /// beyond-head cursor is *valid* is protocol judgment, not store judgment: the WS resume
74 /// protocol rejects `resume_from_seq > head + 1` as an invalid cursor
75 /// (`ResumeCursorAheadOfHistory`), but it makes that call by comparing the cursor against
76 /// the head it observes — the store only answers which events exist at or after the
77 /// requested sequence.
78 /// - Unknown workflows behave exactly like [`Self::read_history`] for unknown workflows:
79 /// empty history, never [`StoreError::NotFound`], because "unknown workflow" and "empty
80 /// history" are the same observable state for reads.
81 ///
82 /// There is deliberately no default implementation: a read-all-then-filter fallback would
83 /// silently reintroduce O(history) behavior. Every backend must implement this as a real
84 /// range read (for SQL backends, an indexed `seq >= ?` range scan).
85 async fn read_history_from(
86 &self,
87 workflow_id: &WorkflowId,
88 from_seq: u64,
89 ) -> Result<Vec<Event>, StoreError>;
90
91 /// Reads the concrete run chain for `workflow_id` in continuation order.
92 async fn read_run_chain(&self, workflow_id: &WorkflowId)
93 -> Result<Vec<RunSummary>, StoreError>;
94
95 /// Lists every workflow identifier that has at least one event in history.
96 ///
97 /// Unlike [`Self::list_active`], this includes terminal workflows and exists to let projection
98 /// repair jobs reconcile derived indexes against the authoritative event history.
99 async fn list_workflow_ids(&self) -> Result<Vec<WorkflowId>, StoreError>;
100
101 /// Lists workflow identifiers whose projected status is non-terminal.
102 async fn list_active(&self) -> Result<Vec<WorkflowId>, StoreError>;
103
104 /// Returns workflow summaries matching `filter`.
105 async fn query(&self, filter: &WorkflowFilter) -> Result<Vec<WorkflowSummary>, StoreError>;
106
107 /// Persists a durable timer for `workflow_id` that is due at `fire_at`.
108 ///
109 /// Timer scheduling remains on the public store surface because timers are not workflow-history
110 /// appends and are used by the timer subsystem after the recorder has written `TimerStarted`.
111 async fn schedule_timer(
112 &self,
113 workflow_id: &WorkflowId,
114 timer_id: &TimerId,
115 fire_at: DateTime<Utc>,
116 ) -> Result<(), StoreError>;
117
118 /// Returns durable timers whose `fire_at` is less than or equal to `as_of`.
119 async fn expired_timers(&self, as_of: DateTime<Utc>) -> Result<Vec<TimerEntry>, StoreError>;
120}
121
122/// Write authority for appending workflow-history events.
123///
124/// `append` requires a [`WriteToken`], so having an `Arc<dyn EventStore>` or
125/// `Arc<dyn ReadableEventStore>` is not sufficient to write events.
126#[async_trait]
127pub trait WritableEventStore: Send + Sync + 'static {
128 /// Atomically appends `events` to `workflow_id` when the stored history head equals
129 /// `expected_seq`.
130 ///
131 /// Implementations must apply every event in `events` or none of them. If the current stored
132 /// head for `workflow_id` differs from `expected_seq`, this method must return
133 /// [`StoreError::SequenceConflict`] and leave history unchanged. A first append with
134 /// `expected_seq == 0` creates the workflow history implicitly.
135 async fn append(
136 &self,
137 token: WriteToken,
138 workflow_id: &WorkflowId,
139 events: &[Event],
140 expected_seq: u64,
141 ) -> Result<(), StoreError>;
142}
143
144/// Convenience trait for concrete stores that support reads/timers, recorder
145/// writes, and deployed-package persistence.
146///
147/// [`crate::PackageStore`] is part of the contract, not an optional add-on:
148/// runtime-deployed packages share the durability promise of event history
149/// (a recovered run is pinned to a recorded package version, and a backend
150/// that dropped the archive would strand it).
151pub trait EventStore: ReadableEventStore + WritableEventStore + crate::PackageStore {}
152
153impl<T> EventStore for T where
154 T: ReadableEventStore + WritableEventStore + crate::PackageStore + ?Sized
155{
156}
157
158pub(crate) fn conformance_write_token() -> WriteToken {
159 write_capability::conformance()
160}
161
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::{EventStore, ReadableEventStore, WritableEventStore};

    /// Compile-time guard: each store trait must remain usable as `Arc<dyn Trait>`.
    #[test]
    fn event_store_traits_are_object_safe() {
        // Naming the type is the whole check; nothing needs to run. If a trait stopped being
        // usable as a trait object, the corresponding line would fail to compile.
        fn assert_nameable<T>() {}

        assert_nameable::<Option<Arc<dyn ReadableEventStore>>>();
        assert_nameable::<Option<Arc<dyn WritableEventStore>>>();
        assert_nameable::<Option<Arc<dyn EventStore>>>();
    }
}