Skip to main content

actionqueue_storage/snapshot/
model.rs

//! Snapshot model types for state persistence.
//!
//! This module defines the data structures used for snapshot persistence and recovery.
//! Snapshots provide a point-in-time view of the ActionQueue state that can be used to
//! accelerate recovery by reducing the amount of WAL that needs to be replayed.
6
7use actionqueue_core::budget::BudgetDimension;
8use actionqueue_core::ids::{ActorId, LedgerEntryId, RunId, TaskId, TenantId};
9use actionqueue_core::platform::{Capability, Role};
10use actionqueue_core::run::run_instance::RunInstance as CoreRunInstance;
11use actionqueue_core::subscription::{EventFilter, SubscriptionId};
12use actionqueue_core::task::task_spec::TaskSpec;
13
14use crate::recovery::reducer::{AttemptHistoryEntry, LeaseMetadata, RunStateHistoryEntry};
15
16/// A versioned snapshot of the ActionQueue state.
17///
18/// Snapshots capture the state of the ActionQueue system at a point in time and are used
19/// to accelerate recovery by providing a starting state for WAL replay. The snapshot
20/// format is versioned to support future evolution of the data structures.
21#[derive(Debug, Clone, PartialEq, Eq)]
22#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
23pub struct Snapshot {
24    /// The format version of this snapshot.
25    ///
26    /// This version is used to ensure compatibility between the snapshot writer and
27    /// loader. When the snapshot format changes in a breaking way, this version
28    /// number should be incremented.
29    pub version: u32,
30    /// The timestamp when this snapshot was taken (Unix epoch seconds).
31    ///
32    /// This timestamp represents the point-in-time at which the snapshot was captured.
33    pub timestamp: u64,
34    /// Metadata about the snapshot for versioning and compatibility.
35    pub metadata: SnapshotMetadata,
36    /// The tasks known at the time of the snapshot.
37    pub tasks: Vec<SnapshotTask>,
38    /// The runs known at the time of the snapshot.
39    pub runs: Vec<SnapshotRun>,
40    /// Engine control projection at snapshot time.
41    #[cfg_attr(feature = "serde", serde(default))]
42    pub engine: SnapshotEngineControl,
43    /// Dependency declarations at snapshot time.
44    #[cfg_attr(feature = "serde", serde(default))]
45    pub dependency_declarations: Vec<SnapshotDependencyDeclaration>,
46    /// Budget allocations and consumption records at snapshot time.
47    #[cfg_attr(feature = "serde", serde(default))]
48    pub budgets: Vec<SnapshotBudget>,
49    /// Subscription state records at snapshot time.
50    #[cfg_attr(feature = "serde", serde(default))]
51    pub subscriptions: Vec<SnapshotSubscription>,
52    /// Actor registration records at snapshot time.
53    #[cfg_attr(feature = "serde", serde(default))]
54    pub actors: Vec<SnapshotActor>,
55    /// Tenant records at snapshot time.
56    #[cfg_attr(feature = "serde", serde(default))]
57    pub tenants: Vec<SnapshotTenant>,
58    /// Role assignment records at snapshot time.
59    #[cfg_attr(feature = "serde", serde(default))]
60    pub role_assignments: Vec<SnapshotRoleAssignment>,
61    /// Capability grant records at snapshot time.
62    #[cfg_attr(feature = "serde", serde(default))]
63    pub capability_grants: Vec<SnapshotCapabilityGrant>,
64    /// Ledger entries at snapshot time.
65    #[cfg_attr(feature = "serde", serde(default))]
66    pub ledger_entries: Vec<SnapshotLedgerEntry>,
67}
68
/// Engine control projection as stored in a snapshot.
///
/// The derived `Default` yields an unpaused engine with no recorded pause or
/// resume timestamps.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct SnapshotEngineControl {
    /// `true` while scheduling and dispatch are paused.
    pub paused: bool,
    /// When the most recent pause event happened, or `None` if there was none.
    pub paused_at: Option<u64>,
    /// When the most recent resume event happened, or `None` if there was none.
    pub resumed_at: Option<u64>,
}
80
81/// Snapshot representation of a dependency declaration.
82///
83/// Records that a task's promotion is gated on the successful completion
84/// of all prerequisite tasks listed in `depends_on`.
85#[derive(Debug, Clone, PartialEq, Eq)]
86#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
87pub struct SnapshotDependencyDeclaration {
88    /// The task whose promotion is gated.
89    pub task_id: TaskId,
90    /// The prerequisite task identifiers (must all complete first).
91    pub depends_on: Vec<TaskId>,
92    /// Timestamp when the declaration was captured (snapshot time).
93    pub declared_at: u64,
94}
95
/// Versioning and compatibility metadata carried by a snapshot.
///
/// Kept as its own structure so future snapshot versions can be extended while
/// staying backward compatible with existing data.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct SnapshotMetadata {
    /// Schema version of the snapshot format.
    ///
    /// Incremented on breaking format changes; the snapshot loader reads it to
    /// decide whether the snapshot is compatible.
    pub schema_version: u32,
    /// WAL sequence number at the moment the snapshot was taken.
    ///
    /// It is the last sequence number already applied to the captured state,
    /// so WAL replay should resume at this value plus one.
    pub wal_sequence: u64,
    /// How many tasks the snapshot contains.
    pub task_count: u64,
    /// How many runs the snapshot contains.
    pub run_count: u64,
}
118
119/// A task in the snapshot.
120#[derive(Debug, Clone, PartialEq, Eq)]
121#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
122pub struct SnapshotTask {
123    /// The task specification.
124    pub task_spec: TaskSpec,
125    /// The timestamp when the task was created (Unix epoch seconds).
126    pub created_at: u64,
127    /// The timestamp when the task was last updated (Unix epoch seconds), if any.
128    pub updated_at: Option<u64>,
129    /// The timestamp when the task was canceled (Unix epoch seconds), if canceled.
130    #[cfg_attr(feature = "serde", serde(default))]
131    pub canceled_at: Option<u64>,
132}
133
134/// A run in the snapshot.
135#[derive(Debug, Clone, PartialEq, Eq)]
136#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
137pub struct SnapshotRun {
138    /// The canonical run payload.
139    ///
140    /// Snapshot persistence intentionally embeds the core
141    /// [`RunInstance`](actionqueue_core::run::run_instance::RunInstance) to keep
142    /// run-shape semantics anchored to the single contract source.
143    pub run_instance: CoreRunInstance,
144    /// Deterministic run state history entries.
145    pub state_history: Vec<SnapshotRunStateHistoryEntry>,
146    /// Deterministic attempt lineage entries.
147    pub attempts: Vec<SnapshotAttemptHistoryEntry>,
148    /// Deterministic lease metadata snapshot, if any.
149    pub lease: Option<SnapshotLeaseMetadata>,
150}
151
152impl SnapshotRun {
153    /// Returns the run identifier of the embedded canonical payload.
154    pub fn run_id(&self) -> RunId {
155        self.run_instance.id()
156    }
157}
158
159/// Snapshot representation of a run state history entry.
160#[derive(Debug, Clone, PartialEq, Eq)]
161#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
162pub struct SnapshotRunStateHistoryEntry {
163    /// The previous state, if any.
164    pub from: Option<actionqueue_core::run::state::RunState>,
165    /// The new state recorded by the WAL event.
166    pub to: actionqueue_core::run::state::RunState,
167    /// The timestamp associated with the transition.
168    pub timestamp: u64,
169}
170
171impl From<RunStateHistoryEntry> for SnapshotRunStateHistoryEntry {
172    fn from(entry: RunStateHistoryEntry) -> Self {
173        Self { from: entry.from, to: entry.to, timestamp: entry.timestamp }
174    }
175}
176
177/// Snapshot representation of an attempt history entry.
178#[derive(Debug, Clone, PartialEq, Eq)]
179#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
180pub struct SnapshotAttemptHistoryEntry {
181    /// The attempt identifier.
182    pub attempt_id: actionqueue_core::ids::AttemptId,
183    /// The timestamp when the attempt started.
184    pub started_at: u64,
185    /// The timestamp when the attempt finished, if finished.
186    pub finished_at: Option<u64>,
187    /// Canonical attempt result taxonomy, if finished.
188    pub result: Option<actionqueue_core::mutation::AttemptResultKind>,
189    /// Optional error message when the attempt failed.
190    pub error: Option<String>,
191    /// Optional opaque output bytes produced by the handler on success.
192    #[cfg_attr(feature = "serde", serde(default))]
193    pub output: Option<Vec<u8>>,
194}
195
196impl From<AttemptHistoryEntry> for SnapshotAttemptHistoryEntry {
197    fn from(entry: AttemptHistoryEntry) -> Self {
198        Self {
199            attempt_id: entry.attempt_id,
200            started_at: entry.started_at,
201            finished_at: entry.finished_at,
202            result: entry.result,
203            error: entry.error,
204            output: entry.output,
205        }
206    }
207}
208
/// Lease metadata as stored in a snapshot.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct SnapshotLeaseMetadata {
    /// String naming the lease owner.
    pub owner: String,
    /// Timestamp at which the lease expires.
    pub expiry: u64,
    /// Timestamp at which the lease was acquired.
    pub acquired_at: u64,
    /// Timestamp of the most recent update to the lease.
    pub updated_at: u64,
}
222
223impl From<LeaseMetadata> for SnapshotLeaseMetadata {
224    fn from(metadata: LeaseMetadata) -> Self {
225        Self {
226            owner: metadata.owner,
227            expiry: metadata.expiry,
228            acquired_at: metadata.acquired_at,
229            updated_at: metadata.updated_at,
230        }
231    }
232}
233
234/// Snapshot representation of a budget allocation and consumption record.
235#[derive(Debug, Clone, PartialEq, Eq)]
236#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
237pub struct SnapshotBudget {
238    /// The task whose budget this record covers.
239    pub task_id: actionqueue_core::ids::TaskId,
240    /// The budget dimension.
241    pub dimension: BudgetDimension,
242    /// The maximum amount allowed before dispatch is blocked.
243    pub limit: u64,
244    /// The total amount consumed so far.
245    pub consumed: u64,
246    /// The timestamp when the budget was allocated.
247    pub allocated_at: u64,
248    /// Whether the budget has been exhausted.
249    pub exhausted: bool,
250}
251
252/// Snapshot representation of a subscription state record.
253#[derive(Debug, Clone, PartialEq, Eq)]
254#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
255pub struct SnapshotSubscription {
256    /// The subscription identifier.
257    pub subscription_id: SubscriptionId,
258    /// The subscribing task identifier.
259    pub task_id: actionqueue_core::ids::TaskId,
260    /// The event filter for this subscription.
261    pub filter: EventFilter,
262    /// The timestamp when the subscription was created.
263    pub created_at: u64,
264    /// The timestamp when the subscription was last triggered, if any.
265    pub triggered_at: Option<u64>,
266    /// The timestamp when the subscription was canceled, if any.
267    pub canceled_at: Option<u64>,
268}
269
270/// Snapshot representation of a registered actor.
271#[derive(Debug, Clone, PartialEq, Eq)]
272#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
273pub struct SnapshotActor {
274    pub actor_id: ActorId,
275    pub identity: String,
276    pub capabilities: Vec<String>,
277    pub department: Option<String>,
278    pub heartbeat_interval_secs: u64,
279    pub tenant_id: Option<TenantId>,
280    pub registered_at: u64,
281    pub last_heartbeat_at: Option<u64>,
282    pub deregistered_at: Option<u64>,
283}
284
285/// Snapshot representation of a tenant.
286#[derive(Debug, Clone, PartialEq, Eq)]
287#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
288pub struct SnapshotTenant {
289    pub tenant_id: TenantId,
290    pub name: String,
291    pub created_at: u64,
292}
293
294/// Snapshot representation of a role assignment.
295#[derive(Debug, Clone, PartialEq, Eq)]
296#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
297pub struct SnapshotRoleAssignment {
298    pub actor_id: ActorId,
299    pub role: Role,
300    pub tenant_id: TenantId,
301    pub assigned_at: u64,
302}
303
304/// Snapshot representation of a capability grant.
305#[derive(Debug, Clone, PartialEq, Eq)]
306#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
307pub struct SnapshotCapabilityGrant {
308    pub actor_id: ActorId,
309    pub capability: Capability,
310    pub tenant_id: TenantId,
311    pub granted_at: u64,
312    pub revoked_at: Option<u64>,
313}
314
315/// Snapshot representation of a ledger entry.
316#[derive(Debug, Clone, PartialEq, Eq)]
317#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
318pub struct SnapshotLedgerEntry {
319    pub entry_id: LedgerEntryId,
320    pub tenant_id: TenantId,
321    pub ledger_key: String,
322    pub actor_id: Option<ActorId>,
323    pub payload: Vec<u8>,
324    pub timestamp: u64,
325}