actionqueue_storage/wal/event.rs
//! WAL event types for the ActionQueue system.
//!
//! This module defines the append-only event types that make up the Write-Ahead Log (WAL).
//! Each event represents a durable state change in the system and is idempotent across replay.
5
6use actionqueue_core::budget::BudgetDimension;
7use actionqueue_core::ids::{ActorId, AttemptId, LedgerEntryId, RunId, TaskId, TenantId};
8use actionqueue_core::mutation::AttemptResultKind;
9use actionqueue_core::platform::{Capability, Role};
10use actionqueue_core::run::state::RunState;
11use actionqueue_core::subscription::{EventFilter, SubscriptionId};
12use actionqueue_core::task::task_spec::TaskSpec;
13
14/// A uniquely identifiable WAL event.
15#[derive(Debug, Clone, PartialEq, Eq)]
16#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
17pub struct WalEvent {
18 /// Monotonically increasing sequence number for the event.
19 sequence: u64,
20 /// The type and payload of this event.
21 event: WalEventType,
22}
23
24impl WalEvent {
25 /// Creates a new WAL event.
26 pub fn new(sequence: u64, event: WalEventType) -> Self {
27 Self { sequence, event }
28 }
29
30 /// Returns the monotonically increasing sequence number.
31 pub fn sequence(&self) -> u64 {
32 self.sequence
33 }
34
35 /// Returns a reference to the event type and payload.
36 pub fn event(&self) -> &WalEventType {
37 &self.event
38 }
39}
40
41/// The type of WAL event that occurred.
42#[derive(Debug, Clone, PartialEq, Eq)]
43#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
44pub enum WalEventType {
45 /// A new task definition has been persisted.
46 TaskCreated {
47 /// The task specification that was created.
48 task_spec: TaskSpec,
49 /// The timestamp when the task was created.
50 ///
51 /// Unix epoch seconds; source-of-truth for task created_at.
52 timestamp: u64,
53 },
54
55 /// A new run instance has been created for a task.
56 RunCreated {
57 /// The run instance that was created.
58 run_instance: actionqueue_core::run::run_instance::RunInstance,
59 },
60
61 /// A run instance transitioned to a new state.
62 RunStateChanged {
63 /// The run that changed state.
64 run_id: RunId,
65 /// The previous state (before the transition).
66 previous_state: RunState,
67 /// The new state (after the transition).
68 new_state: RunState,
69 /// The timestamp of the state change.
70 timestamp: u64,
71 },
72
73 /// An attempt to execute a run has started.
74 AttemptStarted {
75 /// The run that the attempt belongs to.
76 run_id: RunId,
77 /// The attempt that started.
78 attempt_id: AttemptId,
79 /// The timestamp when the attempt started.
80 timestamp: u64,
81 },
82
83 /// An attempt to execute a run has finished.
84 AttemptFinished {
85 /// The run that the attempt belongs to.
86 run_id: RunId,
87 /// The attempt that finished.
88 attempt_id: AttemptId,
89 /// Canonical attempt result taxonomy.
90 result: AttemptResultKind,
91 /// Optional error message if the attempt failed.
92 error: Option<String>,
93 /// Optional opaque output bytes produced by the handler.
94 ///
95 /// Populated from `HandlerOutput::Success { output }` via the executor
96 /// response chain. Stored in the WAL for recovery and projection queries.
97 // NOTE: #[serde(default)] is inert for postcard (non-self-describing format).
98 // Retained for documentation symmetry with the snapshot model.
99 #[cfg_attr(feature = "serde", serde(default))]
100 output: Option<Vec<u8>>,
101 /// The timestamp when the attempt finished.
102 timestamp: u64,
103 },
104
105 /// A task has been canceled.
106 TaskCanceled {
107 /// The task that was canceled.
108 task_id: TaskId,
109 /// The timestamp when the task was canceled.
110 timestamp: u64,
111 },
112
113 /// A run has been canceled.
114 RunCanceled {
115 /// The run that was canceled.
116 run_id: RunId,
117 /// The timestamp when the run was canceled.
118 timestamp: u64,
119 },
120
121 /// A lease was acquired for a run.
122 LeaseAcquired {
123 /// The run that the lease belongs to.
124 run_id: RunId,
125 /// The worker that acquired the lease.
126 owner: String,
127 /// The expiry timestamp of the lease.
128 expiry: u64,
129 /// The timestamp when the lease was acquired.
130 timestamp: u64,
131 },
132
133 /// A lease heartbeat (renewal) was recorded.
134 LeaseHeartbeat {
135 /// The run that the lease belongs to.
136 run_id: RunId,
137 /// The worker that sent the heartbeat.
138 owner: String,
139 /// The new expiry timestamp after heartbeat.
140 expiry: u64,
141 /// The timestamp when the heartbeat was recorded.
142 timestamp: u64,
143 },
144
145 /// A lease expired (either by time or manual release).
146 LeaseExpired {
147 /// The run that the lease belonged to.
148 run_id: RunId,
149 /// The worker that held the lease.
150 owner: String,
151 /// The expiry timestamp of the lease.
152 expiry: u64,
153 /// The timestamp when the lease expired.
154 timestamp: u64,
155 },
156
157 /// A lease was released before expiry.
158 LeaseReleased {
159 /// The run that the lease belonged to.
160 run_id: RunId,
161 /// The worker that released the lease.
162 owner: String,
163 /// The expiry timestamp of the lease at release.
164 expiry: u64,
165 /// The timestamp when the lease was released.
166 timestamp: u64,
167 },
168
169 /// Engine scheduling and dispatch has been paused.
170 EnginePaused {
171 /// The timestamp when engine pause was recorded.
172 timestamp: u64,
173 },
174
175 /// Engine scheduling and dispatch has been resumed.
176 EngineResumed {
177 /// The timestamp when engine resume was recorded.
178 timestamp: u64,
179 },
180
181 /// Task dependency declarations have been durably recorded.
182 ///
183 /// The named task will not be promoted to Ready until all tasks in
184 /// `depends_on` have reached terminal success.
185 ///
186 /// Dependency satisfaction and failure are **not** WAL events — they are
187 /// ephemeral projections derived at bootstrap from `DependencyDeclared`
188 /// events + run terminal states. This is consistent with the architectural
189 /// principle that in-memory indices are ephemeral projections, not
190 /// independent durable state.
191 DependencyDeclared {
192 /// The task whose promotion is gated.
193 task_id: TaskId,
194 /// The prerequisite task identifiers (must all complete first).
195 depends_on: Vec<TaskId>,
196 /// The timestamp when the dependency was declared.
197 timestamp: u64,
198 },
199
200 /// A run has been suspended (e.g. by budget exhaustion).
201 RunSuspended {
202 /// The run that was suspended.
203 run_id: RunId,
204 /// Optional human-readable suspension reason.
205 reason: Option<String>,
206 /// The timestamp when the run was suspended.
207 timestamp: u64,
208 },
209
210 /// A suspended run has been resumed.
211 RunResumed {
212 /// The run that was resumed.
213 run_id: RunId,
214 /// The timestamp when the run was resumed.
215 timestamp: u64,
216 },
217
218 /// A budget has been allocated for a task dimension.
219 BudgetAllocated {
220 /// The task receiving the budget allocation.
221 task_id: TaskId,
222 /// The budget dimension being allocated.
223 dimension: BudgetDimension,
224 /// The maximum amount allowed before dispatch is blocked.
225 limit: u64,
226 /// The timestamp when the budget was allocated.
227 timestamp: u64,
228 },
229
230 /// Resource consumption has been recorded against a task budget.
231 BudgetConsumed {
232 /// The task whose budget was consumed.
233 task_id: TaskId,
234 /// The budget dimension being consumed.
235 dimension: BudgetDimension,
236 /// The amount consumed in this event.
237 amount: u64,
238 /// The timestamp when consumption was recorded.
239 timestamp: u64,
240 },
241
242 /// Reserved for WAL v4 enum ordering. Exhaustion is a derived projection
243 /// from `BudgetConsumed` events (consumed >= limit). This variant is never
244 /// appended to the WAL on disk but may be synthesized during snapshot
245 /// bootstrap for reducer replay. Must be retained to preserve postcard
246 /// deserialization order.
247 BudgetExhausted {
248 /// The task whose budget was exhausted.
249 task_id: TaskId,
250 /// The exhausted budget dimension.
251 dimension: BudgetDimension,
252 /// The timestamp when exhaustion was recorded.
253 timestamp: u64,
254 },
255
256 /// A task budget has been replenished with a new limit.
257 BudgetReplenished {
258 /// The task whose budget was replenished.
259 task_id: TaskId,
260 /// The budget dimension being replenished.
261 dimension: BudgetDimension,
262 /// The new limit after replenishment.
263 new_limit: u64,
264 /// The timestamp when replenishment was recorded.
265 timestamp: u64,
266 },
267
268 /// An event subscription has been created for a task.
269 SubscriptionCreated {
270 /// The subscription identifier.
271 subscription_id: SubscriptionId,
272 /// The subscribing task.
273 task_id: TaskId,
274 /// The event filter that governs when this subscription fires.
275 filter: EventFilter,
276 /// The timestamp when the subscription was created.
277 timestamp: u64,
278 },
279
280 /// A subscription has been triggered by a matching event.
281 SubscriptionTriggered {
282 /// The subscription that was triggered.
283 subscription_id: SubscriptionId,
284 /// The timestamp when the subscription was triggered.
285 timestamp: u64,
286 },
287
288 /// A subscription has been canceled.
289 SubscriptionCanceled {
290 /// The subscription that was canceled.
291 subscription_id: SubscriptionId,
292 /// The timestamp when the subscription was canceled.
293 timestamp: u64,
294 },
295
296 // ── WAL v5: Actor events (discriminants 23-25) ─────────────────────────
297 /// A remote actor has registered with the hub.
298 ActorRegistered {
299 actor_id: ActorId,
300 identity: String,
301 capabilities: Vec<String>,
302 department: Option<String>,
303 heartbeat_interval_secs: u64,
304 tenant_id: Option<TenantId>,
305 timestamp: u64,
306 },
307
308 /// A remote actor has deregistered (explicit or heartbeat timeout).
309 ActorDeregistered { actor_id: ActorId, timestamp: u64 },
310
311 /// A remote actor sent a heartbeat.
312 ActorHeartbeat { actor_id: ActorId, timestamp: u64 },
313
314 // ── WAL v5: Platform events (discriminants 26-31) ──────────────────────
315 /// An organizational tenant was created.
316 TenantCreated { tenant_id: TenantId, name: String, timestamp: u64 },
317
318 /// A role was assigned to an actor within a tenant.
319 RoleAssigned { actor_id: ActorId, role: Role, tenant_id: TenantId, timestamp: u64 },
320
321 /// A capability was granted to an actor within a tenant.
322 CapabilityGranted {
323 actor_id: ActorId,
324 capability: Capability,
325 tenant_id: TenantId,
326 timestamp: u64,
327 },
328
329 /// A capability grant was revoked.
330 CapabilityRevoked {
331 actor_id: ActorId,
332 capability: Capability,
333 tenant_id: TenantId,
334 timestamp: u64,
335 },
336
337 /// A ledger entry was appended.
338 LedgerEntryAppended {
339 entry_id: LedgerEntryId,
340 tenant_id: TenantId,
341 ledger_key: String,
342 actor_id: Option<ActorId>,
343 payload: Vec<u8>,
344 timestamp: u64,
345 },
346}