Skip to main content

actionqueue_storage/wal/
event.rs

//! WAL event types for the ActionQueue system.
//!
//! This module defines the append-only event types that make up the Write-Ahead Log (WAL).
//! Each event represents a durable state change in the system and is idempotent across replay.
5
6use actionqueue_core::budget::BudgetDimension;
7use actionqueue_core::ids::{ActorId, AttemptId, LedgerEntryId, RunId, TaskId, TenantId};
8use actionqueue_core::mutation::AttemptResultKind;
9use actionqueue_core::platform::{Capability, Role};
10use actionqueue_core::run::state::RunState;
11use actionqueue_core::subscription::{EventFilter, SubscriptionId};
12use actionqueue_core::task::task_spec::TaskSpec;
13
14/// A uniquely identifiable WAL event.
15#[derive(Debug, Clone, PartialEq, Eq)]
16#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
17pub struct WalEvent {
18    /// Monotonically increasing sequence number for the event.
19    sequence: u64,
20    /// The type and payload of this event.
21    event: WalEventType,
22}
23
24impl WalEvent {
25    /// Creates a new WAL event.
26    pub fn new(sequence: u64, event: WalEventType) -> Self {
27        Self { sequence, event }
28    }
29
30    /// Returns the monotonically increasing sequence number.
31    pub fn sequence(&self) -> u64 {
32        self.sequence
33    }
34
35    /// Returns a reference to the event type and payload.
36    pub fn event(&self) -> &WalEventType {
37        &self.event
38    }
39}
40
41/// The type of WAL event that occurred.
42#[derive(Debug, Clone, PartialEq, Eq)]
43#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
44pub enum WalEventType {
45    /// A new task definition has been persisted.
46    TaskCreated {
47        /// The task specification that was created.
48        task_spec: TaskSpec,
49        /// The timestamp when the task was created.
50        ///
51        /// Unix epoch seconds; source-of-truth for task created_at.
52        timestamp: u64,
53    },
54
55    /// A new run instance has been created for a task.
56    RunCreated {
57        /// The run instance that was created.
58        run_instance: actionqueue_core::run::run_instance::RunInstance,
59    },
60
61    /// A run instance transitioned to a new state.
62    RunStateChanged {
63        /// The run that changed state.
64        run_id: RunId,
65        /// The previous state (before the transition).
66        previous_state: RunState,
67        /// The new state (after the transition).
68        new_state: RunState,
69        /// The timestamp of the state change.
70        timestamp: u64,
71    },
72
73    /// An attempt to execute a run has started.
74    AttemptStarted {
75        /// The run that the attempt belongs to.
76        run_id: RunId,
77        /// The attempt that started.
78        attempt_id: AttemptId,
79        /// The timestamp when the attempt started.
80        timestamp: u64,
81    },
82
83    /// An attempt to execute a run has finished.
84    AttemptFinished {
85        /// The run that the attempt belongs to.
86        run_id: RunId,
87        /// The attempt that finished.
88        attempt_id: AttemptId,
89        /// Canonical attempt result taxonomy.
90        result: AttemptResultKind,
91        /// Optional error message if the attempt failed.
92        error: Option<String>,
93        /// Optional opaque output bytes produced by the handler.
94        ///
95        /// Populated from `HandlerOutput::Success { output }` via the executor
96        /// response chain. Stored in the WAL for recovery and projection queries.
97        // NOTE: #[serde(default)] is inert for postcard (non-self-describing format).
98        // Retained for documentation symmetry with the snapshot model.
99        #[cfg_attr(feature = "serde", serde(default))]
100        output: Option<Vec<u8>>,
101        /// The timestamp when the attempt finished.
102        timestamp: u64,
103    },
104
105    /// A task has been canceled.
106    TaskCanceled {
107        /// The task that was canceled.
108        task_id: TaskId,
109        /// The timestamp when the task was canceled.
110        timestamp: u64,
111    },
112
113    /// A run has been canceled.
114    RunCanceled {
115        /// The run that was canceled.
116        run_id: RunId,
117        /// The timestamp when the run was canceled.
118        timestamp: u64,
119    },
120
121    /// A lease was acquired for a run.
122    LeaseAcquired {
123        /// The run that the lease belongs to.
124        run_id: RunId,
125        /// The worker that acquired the lease.
126        owner: String,
127        /// The expiry timestamp of the lease.
128        expiry: u64,
129        /// The timestamp when the lease was acquired.
130        timestamp: u64,
131    },
132
133    /// A lease heartbeat (renewal) was recorded.
134    LeaseHeartbeat {
135        /// The run that the lease belongs to.
136        run_id: RunId,
137        /// The worker that sent the heartbeat.
138        owner: String,
139        /// The new expiry timestamp after heartbeat.
140        expiry: u64,
141        /// The timestamp when the heartbeat was recorded.
142        timestamp: u64,
143    },
144
145    /// A lease expired (either by time or manual release).
146    LeaseExpired {
147        /// The run that the lease belonged to.
148        run_id: RunId,
149        /// The worker that held the lease.
150        owner: String,
151        /// The expiry timestamp of the lease.
152        expiry: u64,
153        /// The timestamp when the lease expired.
154        timestamp: u64,
155    },
156
157    /// A lease was released before expiry.
158    LeaseReleased {
159        /// The run that the lease belonged to.
160        run_id: RunId,
161        /// The worker that released the lease.
162        owner: String,
163        /// The expiry timestamp of the lease at release.
164        expiry: u64,
165        /// The timestamp when the lease was released.
166        timestamp: u64,
167    },
168
169    /// Engine scheduling and dispatch has been paused.
170    EnginePaused {
171        /// The timestamp when engine pause was recorded.
172        timestamp: u64,
173    },
174
175    /// Engine scheduling and dispatch has been resumed.
176    EngineResumed {
177        /// The timestamp when engine resume was recorded.
178        timestamp: u64,
179    },
180
181    /// Task dependency declarations have been durably recorded.
182    ///
183    /// The named task will not be promoted to Ready until all tasks in
184    /// `depends_on` have reached terminal success.
185    ///
186    /// Dependency satisfaction and failure are **not** WAL events — they are
187    /// ephemeral projections derived at bootstrap from `DependencyDeclared`
188    /// events + run terminal states. This is consistent with the architectural
189    /// principle that in-memory indices are ephemeral projections, not
190    /// independent durable state.
191    DependencyDeclared {
192        /// The task whose promotion is gated.
193        task_id: TaskId,
194        /// The prerequisite task identifiers (must all complete first).
195        depends_on: Vec<TaskId>,
196        /// The timestamp when the dependency was declared.
197        timestamp: u64,
198    },
199
200    /// A run has been suspended (e.g. by budget exhaustion).
201    RunSuspended {
202        /// The run that was suspended.
203        run_id: RunId,
204        /// Optional human-readable suspension reason.
205        reason: Option<String>,
206        /// The timestamp when the run was suspended.
207        timestamp: u64,
208    },
209
210    /// A suspended run has been resumed.
211    RunResumed {
212        /// The run that was resumed.
213        run_id: RunId,
214        /// The timestamp when the run was resumed.
215        timestamp: u64,
216    },
217
218    /// A budget has been allocated for a task dimension.
219    BudgetAllocated {
220        /// The task receiving the budget allocation.
221        task_id: TaskId,
222        /// The budget dimension being allocated.
223        dimension: BudgetDimension,
224        /// The maximum amount allowed before dispatch is blocked.
225        limit: u64,
226        /// The timestamp when the budget was allocated.
227        timestamp: u64,
228    },
229
230    /// Resource consumption has been recorded against a task budget.
231    BudgetConsumed {
232        /// The task whose budget was consumed.
233        task_id: TaskId,
234        /// The budget dimension being consumed.
235        dimension: BudgetDimension,
236        /// The amount consumed in this event.
237        amount: u64,
238        /// The timestamp when consumption was recorded.
239        timestamp: u64,
240    },
241
242    /// Reserved for WAL v4 enum ordering. Exhaustion is a derived projection
243    /// from `BudgetConsumed` events (consumed >= limit). This variant is never
244    /// appended to the WAL on disk but may be synthesized during snapshot
245    /// bootstrap for reducer replay. Must be retained to preserve postcard
246    /// deserialization order.
247    BudgetExhausted {
248        /// The task whose budget was exhausted.
249        task_id: TaskId,
250        /// The exhausted budget dimension.
251        dimension: BudgetDimension,
252        /// The timestamp when exhaustion was recorded.
253        timestamp: u64,
254    },
255
256    /// A task budget has been replenished with a new limit.
257    BudgetReplenished {
258        /// The task whose budget was replenished.
259        task_id: TaskId,
260        /// The budget dimension being replenished.
261        dimension: BudgetDimension,
262        /// The new limit after replenishment.
263        new_limit: u64,
264        /// The timestamp when replenishment was recorded.
265        timestamp: u64,
266    },
267
268    /// An event subscription has been created for a task.
269    SubscriptionCreated {
270        /// The subscription identifier.
271        subscription_id: SubscriptionId,
272        /// The subscribing task.
273        task_id: TaskId,
274        /// The event filter that governs when this subscription fires.
275        filter: EventFilter,
276        /// The timestamp when the subscription was created.
277        timestamp: u64,
278    },
279
280    /// A subscription has been triggered by a matching event.
281    SubscriptionTriggered {
282        /// The subscription that was triggered.
283        subscription_id: SubscriptionId,
284        /// The timestamp when the subscription was triggered.
285        timestamp: u64,
286    },
287
288    /// A subscription has been canceled.
289    SubscriptionCanceled {
290        /// The subscription that was canceled.
291        subscription_id: SubscriptionId,
292        /// The timestamp when the subscription was canceled.
293        timestamp: u64,
294    },
295
296    // ── WAL v5: Actor events (discriminants 23-25) ─────────────────────────
297    /// A remote actor has registered with the hub.
298    ActorRegistered {
299        actor_id: ActorId,
300        identity: String,
301        capabilities: Vec<String>,
302        department: Option<String>,
303        heartbeat_interval_secs: u64,
304        tenant_id: Option<TenantId>,
305        timestamp: u64,
306    },
307
308    /// A remote actor has deregistered (explicit or heartbeat timeout).
309    ActorDeregistered { actor_id: ActorId, timestamp: u64 },
310
311    /// A remote actor sent a heartbeat.
312    ActorHeartbeat { actor_id: ActorId, timestamp: u64 },
313
314    // ── WAL v5: Platform events (discriminants 26-31) ──────────────────────
315    /// An organizational tenant was created.
316    TenantCreated { tenant_id: TenantId, name: String, timestamp: u64 },
317
318    /// A role was assigned to an actor within a tenant.
319    RoleAssigned { actor_id: ActorId, role: Role, tenant_id: TenantId, timestamp: u64 },
320
321    /// A capability was granted to an actor within a tenant.
322    CapabilityGranted {
323        actor_id: ActorId,
324        capability: Capability,
325        tenant_id: TenantId,
326        timestamp: u64,
327    },
328
329    /// A capability grant was revoked.
330    CapabilityRevoked {
331        actor_id: ActorId,
332        capability: Capability,
333        tenant_id: TenantId,
334        timestamp: u64,
335    },
336
337    /// A ledger entry was appended.
338    LedgerEntryAppended {
339        entry_id: LedgerEntryId,
340        tenant_id: TenantId,
341        ledger_key: String,
342        actor_id: Option<ActorId>,
343        payload: Vec<u8>,
344        timestamp: u64,
345    },
346}