actionqueue_storage/snapshot/
build.rs1use crate::recovery::reducer::ReplayReducer;
4use crate::snapshot::mapping::{validate_snapshot, SnapshotMappingError, SNAPSHOT_SCHEMA_VERSION};
5use crate::snapshot::model::{
6 Snapshot, SnapshotActor, SnapshotAttemptHistoryEntry, SnapshotBudget, SnapshotCapabilityGrant,
7 SnapshotDependencyDeclaration, SnapshotEngineControl, SnapshotLeaseMetadata,
8 SnapshotLedgerEntry, SnapshotMetadata, SnapshotRoleAssignment, SnapshotRun,
9 SnapshotRunStateHistoryEntry, SnapshotSubscription, SnapshotTask, SnapshotTenant,
10};
11
/// Format version written into the `version` field of every snapshot built
/// by `build_snapshot_from_projection`.
///
/// This is separate from `SNAPSHOT_SCHEMA_VERSION`, which that function
/// records in the snapshot's metadata as `schema_version`.
12const SNAPSHOT_FORMAT_VERSION: u32 = 4;
14
15pub fn build_snapshot_from_projection(
21 reducer: &ReplayReducer,
22 timestamp: u64,
23) -> Result<Snapshot, SnapshotMappingError> {
24 let task_count = reducer.task_records().count();
25 let run_count = reducer.run_instances().count();
26 tracing::debug!(task_count, run_count, "building snapshot from projection");
27
28 let tasks: Vec<SnapshotTask> = reducer
29 .task_records()
30 .map(|tr| SnapshotTask {
31 task_spec: tr.task_spec().clone(),
32 created_at: tr.created_at(),
33 updated_at: tr.updated_at(),
34 canceled_at: tr.canceled_at(),
35 })
36 .collect();
37
38 let runs: Vec<SnapshotRun> = reducer
39 .run_instances()
40 .map(|ri| {
41 let run_id = ri.id();
42 let state_history = reducer
43 .get_run_history(&run_id)
44 .map(|h| {
45 h.iter()
46 .map(|entry| SnapshotRunStateHistoryEntry {
47 from: entry.from(),
48 to: entry.to(),
49 timestamp: entry.timestamp(),
50 })
51 .collect()
52 })
53 .unwrap_or_default();
54 let attempts = reducer
55 .get_attempt_history(&run_id)
56 .map(|a| {
57 a.iter()
58 .map(|entry| SnapshotAttemptHistoryEntry {
59 attempt_id: entry.attempt_id(),
60 started_at: entry.started_at(),
61 finished_at: entry.finished_at(),
62 result: entry.result(),
63 error: entry.error().map(|s| s.to_string()),
64 output: entry.output().map(|b| b.to_vec()),
65 })
66 .collect()
67 })
68 .unwrap_or_default();
69 let lease = reducer.get_lease_metadata(&run_id).map(|lm| SnapshotLeaseMetadata {
70 owner: lm.owner().to_string(),
71 expiry: lm.expiry(),
72 acquired_at: lm.acquired_at(),
73 updated_at: lm.updated_at(),
74 });
75
76 SnapshotRun { run_instance: ri.clone(), state_history, attempts, lease }
77 })
78 .collect();
79
80 let dependency_declarations: Vec<SnapshotDependencyDeclaration> = reducer
81 .dependency_declarations()
82 .map(|(task_id, prereqs)| SnapshotDependencyDeclaration {
83 task_id,
84 depends_on: {
85 let mut deps: Vec<actionqueue_core::ids::TaskId> =
86 prereqs.iter().copied().collect();
87 deps.sort_by_key(|id| *id.as_uuid());
88 deps
89 },
90 declared_at: timestamp,
91 })
92 .collect();
93
94 let budgets: Vec<SnapshotBudget> = reducer
95 .budgets()
96 .map(|((task_id, _dimension), record)| SnapshotBudget {
97 task_id: *task_id,
98 dimension: record.dimension,
99 limit: record.limit,
100 consumed: record.consumed,
101 allocated_at: record.allocated_at,
102 exhausted: record.exhausted,
103 })
104 .collect();
105
106 let subscriptions: Vec<SnapshotSubscription> = reducer
107 .subscriptions()
108 .map(|(_id, record)| SnapshotSubscription {
109 subscription_id: record.subscription_id,
110 task_id: record.task_id,
111 filter: record.filter.clone(),
112 created_at: record.created_at,
113 triggered_at: record.triggered_at,
114 canceled_at: record.canceled_at,
115 })
116 .collect();
117
118 let actors: Vec<SnapshotActor> = reducer
119 .actors()
120 .map(|(_, record)| SnapshotActor {
121 actor_id: record.actor_id,
122 identity: record.identity.clone(),
123 capabilities: record.capabilities.clone(),
124 department: record.department.clone(),
125 heartbeat_interval_secs: record.heartbeat_interval_secs,
126 tenant_id: record.tenant_id,
127 registered_at: record.registered_at,
128 last_heartbeat_at: record.last_heartbeat_at,
129 deregistered_at: record.deregistered_at,
130 })
131 .collect();
132
133 let tenants: Vec<SnapshotTenant> = reducer
134 .tenants()
135 .map(|(_, record)| SnapshotTenant {
136 tenant_id: record.tenant_id,
137 name: record.name.clone(),
138 created_at: record.created_at,
139 })
140 .collect();
141
142 let role_assignments: Vec<SnapshotRoleAssignment> = reducer
143 .role_assignments()
144 .map(|r| SnapshotRoleAssignment {
145 actor_id: r.actor_id,
146 role: r.role.clone(),
147 tenant_id: r.tenant_id,
148 assigned_at: r.assigned_at,
149 })
150 .collect();
151
152 let capability_grants: Vec<SnapshotCapabilityGrant> = reducer
153 .capability_grants()
154 .map(|r| SnapshotCapabilityGrant {
155 actor_id: r.actor_id,
156 capability: r.capability.clone(),
157 tenant_id: r.tenant_id,
158 granted_at: r.granted_at,
159 revoked_at: r.revoked_at,
160 })
161 .collect();
162
163 let ledger_entries: Vec<SnapshotLedgerEntry> = reducer
164 .ledger_entries()
165 .map(|r| SnapshotLedgerEntry {
166 entry_id: r.entry_id,
167 tenant_id: r.tenant_id,
168 ledger_key: r.ledger_key.clone(),
169 actor_id: r.actor_id,
170 payload: r.payload.clone(),
171 timestamp: r.timestamp,
172 })
173 .collect();
174
175 let snapshot = Snapshot {
176 version: SNAPSHOT_FORMAT_VERSION,
177 timestamp,
178 metadata: SnapshotMetadata {
179 schema_version: SNAPSHOT_SCHEMA_VERSION,
180 wal_sequence: reducer.latest_sequence(),
181 task_count: tasks.len() as u64,
182 run_count: runs.len() as u64,
183 },
184 tasks,
185 runs,
186 engine: SnapshotEngineControl {
187 paused: reducer.is_engine_paused(),
188 paused_at: reducer.engine_paused_at(),
189 resumed_at: reducer.engine_resumed_at(),
190 },
191 dependency_declarations,
192 budgets,
193 subscriptions,
194 actors,
195 tenants,
196 role_assignments,
197 capability_grants,
198 ledger_entries,
199 };
200
201 validate_snapshot(&snapshot)?;
202 tracing::debug!(
203 wal_sequence = snapshot.metadata.wal_sequence,
204 task_count = snapshot.metadata.task_count,
205 run_count = snapshot.metadata.run_count,
206 "snapshot build complete"
207 );
208 Ok(snapshot)
209}