Skip to main content

actionqueue_storage/snapshot/
build.rs

//! Builds a Snapshot from the current projection state.
2
3use crate::recovery::reducer::ReplayReducer;
4use crate::snapshot::mapping::{validate_snapshot, SnapshotMappingError, SNAPSHOT_SCHEMA_VERSION};
5use crate::snapshot::model::{
6    Snapshot, SnapshotActor, SnapshotAttemptHistoryEntry, SnapshotBudget, SnapshotCapabilityGrant,
7    SnapshotDependencyDeclaration, SnapshotEngineControl, SnapshotLeaseMetadata,
8    SnapshotLedgerEntry, SnapshotMetadata, SnapshotRoleAssignment, SnapshotRun,
9    SnapshotRunStateHistoryEntry, SnapshotSubscription, SnapshotTask, SnapshotTenant,
10};
11
/// The snapshot format version written by this implementation.
///
/// Stored in the `version` field of every [`Snapshot`] assembled by
/// [`build_snapshot_from_projection`].
const SNAPSHOT_FORMAT_VERSION: u32 = 4;
14
15/// Builds a validated [`Snapshot`] from the current state of a [`ReplayReducer`].
16///
17/// This function extracts all tasks, runs (with history, attempts, and lease
18/// metadata), and engine control state from the reducer and assembles them
19/// into a snapshot. The snapshot is validated before being returned.
20pub fn build_snapshot_from_projection(
21    reducer: &ReplayReducer,
22    timestamp: u64,
23) -> Result<Snapshot, SnapshotMappingError> {
24    let task_count = reducer.task_records().count();
25    let run_count = reducer.run_instances().count();
26    tracing::debug!(task_count, run_count, "building snapshot from projection");
27
28    let tasks: Vec<SnapshotTask> = reducer
29        .task_records()
30        .map(|tr| SnapshotTask {
31            task_spec: tr.task_spec().clone(),
32            created_at: tr.created_at(),
33            updated_at: tr.updated_at(),
34            canceled_at: tr.canceled_at(),
35        })
36        .collect();
37
38    let runs: Vec<SnapshotRun> = reducer
39        .run_instances()
40        .map(|ri| {
41            let run_id = ri.id();
42            let state_history = reducer
43                .get_run_history(&run_id)
44                .map(|h| {
45                    h.iter()
46                        .map(|entry| SnapshotRunStateHistoryEntry {
47                            from: entry.from(),
48                            to: entry.to(),
49                            timestamp: entry.timestamp(),
50                        })
51                        .collect()
52                })
53                .unwrap_or_default();
54            let attempts = reducer
55                .get_attempt_history(&run_id)
56                .map(|a| {
57                    a.iter()
58                        .map(|entry| SnapshotAttemptHistoryEntry {
59                            attempt_id: entry.attempt_id(),
60                            started_at: entry.started_at(),
61                            finished_at: entry.finished_at(),
62                            result: entry.result(),
63                            error: entry.error().map(|s| s.to_string()),
64                            output: entry.output().map(|b| b.to_vec()),
65                        })
66                        .collect()
67                })
68                .unwrap_or_default();
69            let lease = reducer.get_lease_metadata(&run_id).map(|lm| SnapshotLeaseMetadata {
70                owner: lm.owner().to_string(),
71                expiry: lm.expiry(),
72                acquired_at: lm.acquired_at(),
73                updated_at: lm.updated_at(),
74            });
75
76            SnapshotRun { run_instance: ri.clone(), state_history, attempts, lease }
77        })
78        .collect();
79
80    let dependency_declarations: Vec<SnapshotDependencyDeclaration> = reducer
81        .dependency_declarations()
82        .map(|(task_id, prereqs)| SnapshotDependencyDeclaration {
83            task_id,
84            depends_on: {
85                let mut deps: Vec<actionqueue_core::ids::TaskId> =
86                    prereqs.iter().copied().collect();
87                deps.sort_by_key(|id| *id.as_uuid());
88                deps
89            },
90            declared_at: timestamp,
91        })
92        .collect();
93
94    let budgets: Vec<SnapshotBudget> = reducer
95        .budgets()
96        .map(|((task_id, _dimension), record)| SnapshotBudget {
97            task_id: *task_id,
98            dimension: record.dimension,
99            limit: record.limit,
100            consumed: record.consumed,
101            allocated_at: record.allocated_at,
102            exhausted: record.exhausted,
103        })
104        .collect();
105
106    let subscriptions: Vec<SnapshotSubscription> = reducer
107        .subscriptions()
108        .map(|(_id, record)| SnapshotSubscription {
109            subscription_id: record.subscription_id,
110            task_id: record.task_id,
111            filter: record.filter.clone(),
112            created_at: record.created_at,
113            triggered_at: record.triggered_at,
114            canceled_at: record.canceled_at,
115        })
116        .collect();
117
118    let actors: Vec<SnapshotActor> = reducer
119        .actors()
120        .map(|(_, record)| SnapshotActor {
121            actor_id: record.actor_id,
122            identity: record.identity.clone(),
123            capabilities: record.capabilities.clone(),
124            department: record.department.clone(),
125            heartbeat_interval_secs: record.heartbeat_interval_secs,
126            tenant_id: record.tenant_id,
127            registered_at: record.registered_at,
128            last_heartbeat_at: record.last_heartbeat_at,
129            deregistered_at: record.deregistered_at,
130        })
131        .collect();
132
133    let tenants: Vec<SnapshotTenant> = reducer
134        .tenants()
135        .map(|(_, record)| SnapshotTenant {
136            tenant_id: record.tenant_id,
137            name: record.name.clone(),
138            created_at: record.created_at,
139        })
140        .collect();
141
142    let role_assignments: Vec<SnapshotRoleAssignment> = reducer
143        .role_assignments()
144        .map(|r| SnapshotRoleAssignment {
145            actor_id: r.actor_id,
146            role: r.role.clone(),
147            tenant_id: r.tenant_id,
148            assigned_at: r.assigned_at,
149        })
150        .collect();
151
152    let capability_grants: Vec<SnapshotCapabilityGrant> = reducer
153        .capability_grants()
154        .map(|r| SnapshotCapabilityGrant {
155            actor_id: r.actor_id,
156            capability: r.capability.clone(),
157            tenant_id: r.tenant_id,
158            granted_at: r.granted_at,
159            revoked_at: r.revoked_at,
160        })
161        .collect();
162
163    let ledger_entries: Vec<SnapshotLedgerEntry> = reducer
164        .ledger_entries()
165        .map(|r| SnapshotLedgerEntry {
166            entry_id: r.entry_id,
167            tenant_id: r.tenant_id,
168            ledger_key: r.ledger_key.clone(),
169            actor_id: r.actor_id,
170            payload: r.payload.clone(),
171            timestamp: r.timestamp,
172        })
173        .collect();
174
175    let snapshot = Snapshot {
176        version: SNAPSHOT_FORMAT_VERSION,
177        timestamp,
178        metadata: SnapshotMetadata {
179            schema_version: SNAPSHOT_SCHEMA_VERSION,
180            wal_sequence: reducer.latest_sequence(),
181            task_count: tasks.len() as u64,
182            run_count: runs.len() as u64,
183        },
184        tasks,
185        runs,
186        engine: SnapshotEngineControl {
187            paused: reducer.is_engine_paused(),
188            paused_at: reducer.engine_paused_at(),
189            resumed_at: reducer.engine_resumed_at(),
190        },
191        dependency_declarations,
192        budgets,
193        subscriptions,
194        actors,
195        tenants,
196        role_assignments,
197        capability_grants,
198        ledger_entries,
199    };
200
201    validate_snapshot(&snapshot)?;
202    tracing::debug!(
203        wal_sequence = snapshot.metadata.wal_sequence,
204        task_count = snapshot.metadata.task_count,
205        run_count = snapshot.metadata.run_count,
206        "snapshot build complete"
207    );
208    Ok(snapshot)
209}