Skip to main content

actionqueue_storage/mutation/
authority.rs

1//! Storage-owned mutation authority implementation.
2//!
3//! This module implements the D-04 WAL-first authority lane:
4//! validate command -> map event -> append -> durability sync policy -> apply projection.
5
6use actionqueue_core::budget::BudgetDimension;
7use actionqueue_core::ids::{AttemptId, RunId, TaskId};
8use actionqueue_core::mutation::{
9    ActorDeregisterCommand, ActorHeartbeatCommand, ActorRegisterCommand, AppliedMutation,
10    AttemptFinishCommand, AttemptStartCommand, BudgetAllocateCommand, BudgetConsumeCommand,
11    BudgetReplenishCommand, CapabilityGrantCommand, CapabilityRevokeCommand,
12    DependencyDeclareCommand, DurabilityPolicy, EnginePauseCommand, EngineResumeCommand,
13    LeaseAcquireCommand, LeaseExpireCommand, LeaseHeartbeatCommand, LeaseReleaseCommand,
14    LedgerAppendCommand, MutationAuthority, MutationCommand, MutationOutcome, RoleAssignCommand,
15    RunCreateCommand, RunResumeCommand, RunStateTransitionCommand, RunSuspendCommand,
16    SubscriptionCancelCommand, SubscriptionCreateCommand, SubscriptionTriggerCommand,
17    TaskCancelCommand, TaskCreateCommand, TenantCreateCommand,
18};
19use actionqueue_core::run::state::RunState;
20use actionqueue_core::run::transitions::is_valid_transition;
21use actionqueue_core::subscription::SubscriptionId;
22
23use crate::recovery::reducer::ReplayReducer;
24use crate::recovery::reducer::ReplayReducerError;
25use crate::wal::event::{WalEvent, WalEventType};
26use crate::wal::writer::{WalWriter, WalWriterError};
27
28/// Projection behavior required by the mutation authority.
29pub trait MutationProjection {
30    /// Typed projection apply error.
31    type Error;
32
33    /// Returns the latest durable sequence represented by this projection.
34    fn latest_sequence(&self) -> u64;
35
36    /// Returns the current run state for validation, if known.
37    fn run_state(&self, run_id: &RunId) -> Option<RunState>;
38
39    /// Returns true when the task exists in current projection state.
40    fn task_exists(&self, task_id: TaskId) -> bool;
41
42    /// Returns true when the task is already marked canceled in projection state.
43    fn is_task_canceled(&self, task_id: TaskId) -> bool;
44
45    /// Returns true when engine control projection is currently paused.
46    fn is_engine_paused(&self) -> bool;
47
48    /// Returns the active attempt identifier for the run, if one exists.
49    fn active_attempt_id(&self, run_id: &RunId) -> Option<AttemptId>;
50
51    /// Returns the active lease metadata `(owner, expiry)` for the run, if one exists.
52    fn active_lease(&self, run_id: &RunId) -> Option<(String, u64)>;
53
54    /// Returns true when a budget allocation exists for the specified (task, dimension) pair.
55    fn budget_allocation_exists(&self, _task_id: TaskId, _dimension: BudgetDimension) -> bool {
56        false
57    }
58
59    /// Returns true when the specified subscription exists in the projection.
60    fn subscription_exists(&self, _subscription_id: SubscriptionId) -> bool {
61        false
62    }
63
64    /// Returns true when the specified subscription has been canceled.
65    fn is_subscription_canceled(&self, _subscription_id: SubscriptionId) -> bool {
66        false
67    }
68
69    /// Applies a durable event to the in-memory projection.
70    fn apply_event(&mut self, event: &WalEvent) -> Result<(), Self::Error>;
71}
72
73impl MutationProjection for ReplayReducer {
74    type Error = ReplayReducerError;
75
76    fn latest_sequence(&self) -> u64 {
77        ReplayReducer::latest_sequence(self)
78    }
79
80    fn run_state(&self, run_id: &RunId) -> Option<RunState> {
81        self.get_run_state(run_id).copied()
82    }
83
84    fn task_exists(&self, task_id: TaskId) -> bool {
85        self.get_task(&task_id).is_some()
86    }
87
88    fn is_task_canceled(&self, task_id: TaskId) -> bool {
89        ReplayReducer::is_task_canceled(self, task_id)
90    }
91
92    fn is_engine_paused(&self) -> bool {
93        ReplayReducer::is_engine_paused(self)
94    }
95
96    fn active_attempt_id(&self, run_id: &RunId) -> Option<AttemptId> {
97        self.get_run_instance(run_id).and_then(|run| run.current_attempt_id())
98    }
99
100    fn active_lease(&self, run_id: &RunId) -> Option<(String, u64)> {
101        self.get_lease(run_id).cloned()
102    }
103
104    fn budget_allocation_exists(&self, task_id: TaskId, dimension: BudgetDimension) -> bool {
105        ReplayReducer::budget_allocation_exists(self, task_id, dimension)
106    }
107
108    fn subscription_exists(&self, subscription_id: SubscriptionId) -> bool {
109        ReplayReducer::subscription_exists(self, subscription_id)
110    }
111
112    fn is_subscription_canceled(&self, subscription_id: SubscriptionId) -> bool {
113        ReplayReducer::is_subscription_canceled(self, subscription_id)
114    }
115
116    fn apply_event(&mut self, event: &WalEvent) -> Result<(), Self::Error> {
117        self.apply(event)
118    }
119}
120
121/// Storage-owned implementation of the mutation authority lane.
122#[derive(Debug)]
123pub struct StorageMutationAuthority<W: WalWriter, P: MutationProjection> {
124    wal_writer: W,
125    projection: P,
126}
127
128impl<W: WalWriter, P: MutationProjection> StorageMutationAuthority<W, P> {
129    /// Creates a new storage authority with an owned WAL writer and projection.
130    pub fn new(wal_writer: W, projection: P) -> Self {
131        Self { wal_writer, projection }
132    }
133
134    /// Returns the projection used by this authority.
135    pub fn projection(&self) -> &P {
136        &self.projection
137    }
138
139    /// Returns the mutable projection used by this authority.
140    pub fn projection_mut(&mut self) -> &mut P {
141        &mut self.projection
142    }
143
144    /// Decomposes the authority into its owned parts.
145    pub fn into_parts(self) -> (W, P) {
146        (self.wal_writer, self.projection)
147    }
148
149    fn validate_command(
150        &self,
151        command: &MutationCommand,
152    ) -> Result<ValidatedCommand, MutationValidationError> {
153        match command {
154            MutationCommand::TaskCreate(details) => {
155                self.validate_task_create(details)?;
156                Ok(ValidatedCommand::TaskCreate(details.clone()))
157            }
158            MutationCommand::RunCreate(details) => {
159                self.validate_run_create(details)?;
160                Ok(ValidatedCommand::RunCreate(details.clone()))
161            }
162            MutationCommand::RunStateTransition(details) => {
163                self.validate_run_state_transition(*details)?;
164                Ok(ValidatedCommand::RunStateTransition(*details))
165            }
166            MutationCommand::AttemptStart(details) => {
167                self.validate_attempt_start(*details)?;
168                Ok(ValidatedCommand::AttemptStart(*details))
169            }
170            MutationCommand::AttemptFinish(details) => {
171                self.validate_attempt_finish(details)?;
172                Ok(ValidatedCommand::AttemptFinish(details.clone()))
173            }
174            MutationCommand::LeaseAcquire(details) => {
175                self.validate_lease_acquire(details)?;
176                Ok(ValidatedCommand::LeaseAcquire(details.clone()))
177            }
178            MutationCommand::LeaseHeartbeat(details) => {
179                self.validate_lease_heartbeat(details)?;
180                Ok(ValidatedCommand::LeaseHeartbeat(details.clone()))
181            }
182            MutationCommand::LeaseExpire(details) => {
183                self.validate_lease_expire(details)?;
184                Ok(ValidatedCommand::LeaseExpire(details.clone()))
185            }
186            MutationCommand::LeaseRelease(details) => {
187                self.validate_lease_release(details)?;
188                Ok(ValidatedCommand::LeaseRelease(details.clone()))
189            }
190            MutationCommand::EnginePause(details) => {
191                self.validate_engine_pause(*details)?;
192                Ok(ValidatedCommand::EnginePause(*details))
193            }
194            MutationCommand::EngineResume(details) => {
195                self.validate_engine_resume(*details)?;
196                Ok(ValidatedCommand::EngineResume(*details))
197            }
198            MutationCommand::TaskCancel(details) => {
199                self.validate_task_cancel(*details)?;
200                Ok(ValidatedCommand::TaskCancel(*details))
201            }
202            MutationCommand::DependencyDeclare(details) => {
203                self.validate_dependency_declare(details)?;
204                Ok(ValidatedCommand::DependencyDeclare(details.clone()))
205            }
206            MutationCommand::RunSuspend(details) => {
207                self.validate_run_suspend(details)?;
208                Ok(ValidatedCommand::RunSuspend(details.clone()))
209            }
210            MutationCommand::RunResume(details) => {
211                self.validate_run_resume(details)?;
212                Ok(ValidatedCommand::RunResume(*details))
213            }
214            MutationCommand::BudgetAllocate(details) => {
215                self.validate_budget_allocate(details)?;
216                Ok(ValidatedCommand::BudgetAllocate(*details))
217            }
218            MutationCommand::BudgetConsume(details) => {
219                self.validate_budget_consume(details)?;
220                Ok(ValidatedCommand::BudgetConsume(*details))
221            }
222            MutationCommand::BudgetReplenish(details) => {
223                self.validate_budget_replenish(details)?;
224                Ok(ValidatedCommand::BudgetReplenish(*details))
225            }
226            MutationCommand::SubscriptionCreate(details) => {
227                self.validate_subscription_create(details)?;
228                Ok(ValidatedCommand::SubscriptionCreate(details.clone()))
229            }
230            MutationCommand::SubscriptionCancel(details) => {
231                self.validate_subscription_cancel(details)?;
232                Ok(ValidatedCommand::SubscriptionCancel(*details))
233            }
234            MutationCommand::SubscriptionTrigger(details) => {
235                self.validate_sequence(details.sequence())?;
236                Ok(ValidatedCommand::SubscriptionTrigger(*details))
237            }
238            MutationCommand::ActorRegister(details) => {
239                self.validate_sequence(details.sequence())?;
240                Ok(ValidatedCommand::ActorRegister(details.clone()))
241            }
242            MutationCommand::ActorDeregister(details) => {
243                self.validate_sequence(details.sequence())?;
244                Ok(ValidatedCommand::ActorDeregister(*details))
245            }
246            MutationCommand::ActorHeartbeat(details) => {
247                self.validate_sequence(details.sequence())?;
248                Ok(ValidatedCommand::ActorHeartbeat(*details))
249            }
250            MutationCommand::TenantCreate(details) => {
251                self.validate_sequence(details.sequence())?;
252                Ok(ValidatedCommand::TenantCreate(details.clone()))
253            }
254            MutationCommand::RoleAssign(details) => {
255                self.validate_sequence(details.sequence())?;
256                Ok(ValidatedCommand::RoleAssign(details.clone()))
257            }
258            MutationCommand::CapabilityGrant(details) => {
259                self.validate_sequence(details.sequence())?;
260                Ok(ValidatedCommand::CapabilityGrant(details.clone()))
261            }
262            MutationCommand::CapabilityRevoke(details) => {
263                self.validate_sequence(details.sequence())?;
264                Ok(ValidatedCommand::CapabilityRevoke(details.clone()))
265            }
266            MutationCommand::LedgerAppend(details) => {
267                self.validate_sequence(details.sequence())?;
268                Ok(ValidatedCommand::LedgerAppend(details.clone()))
269            }
270        }
271    }
272
273    fn validate_sequence(&self, provided: u64) -> Result<(), MutationValidationError> {
274        let expected_sequence = self
275            .projection
276            .latest_sequence()
277            .checked_add(1)
278            .ok_or(MutationValidationError::SequenceOverflow)?;
279
280        if provided != expected_sequence {
281            tracing::warn!(
282                expected = expected_sequence,
283                provided,
284                "sequence monotonicity violation"
285            );
286            return Err(MutationValidationError::NonMonotonicSequence {
287                expected: expected_sequence,
288                provided,
289            });
290        }
291
292        Ok(())
293    }
294
295    fn validate_task_create(
296        &self,
297        command: &TaskCreateCommand,
298    ) -> Result<(), MutationValidationError> {
299        self.validate_sequence(command.sequence())?;
300
301        if self.projection.task_exists(command.task_spec().id()) {
302            return Err(MutationValidationError::TaskAlreadyExists {
303                task_id: command.task_spec().id(),
304            });
305        }
306
307        Ok(())
308    }
309
310    fn validate_run_create(
311        &self,
312        command: &RunCreateCommand,
313    ) -> Result<(), MutationValidationError> {
314        self.validate_sequence(command.sequence())?;
315
316        if !self.projection.task_exists(command.run_instance().task_id()) {
317            return Err(MutationValidationError::UnknownTask {
318                task_id: command.run_instance().task_id(),
319            });
320        }
321
322        if self.projection.run_state(&command.run_instance().id()).is_some() {
323            return Err(MutationValidationError::RunAlreadyExists {
324                run_id: command.run_instance().id(),
325            });
326        }
327
328        if command.run_instance().state() != RunState::Scheduled {
329            return Err(MutationValidationError::RunCreateRequiresScheduled {
330                run_id: command.run_instance().id(),
331                state: command.run_instance().state(),
332            });
333        }
334
335        Ok(())
336    }
337
338    fn validate_run_state_transition(
339        &self,
340        command: RunStateTransitionCommand,
341    ) -> Result<(), MutationValidationError> {
342        self.validate_sequence(command.sequence())?;
343
344        let observed_state = self
345            .projection
346            .run_state(&command.run_id())
347            .ok_or(MutationValidationError::UnknownRun { run_id: command.run_id() })?;
348
349        if observed_state != command.previous_state() {
350            return Err(MutationValidationError::PreviousStateMismatch {
351                run_id: command.run_id(),
352                expected: command.previous_state(),
353                actual: observed_state,
354            });
355        }
356
357        if !is_valid_transition(command.previous_state(), command.new_state()) {
358            return Err(MutationValidationError::InvalidTransition {
359                run_id: command.run_id(),
360                from: command.previous_state(),
361                to: command.new_state(),
362            });
363        }
364
365        Ok(())
366    }
367
368    fn validate_attempt_start(
369        &self,
370        command: AttemptStartCommand,
371    ) -> Result<(), MutationValidationError> {
372        self.validate_sequence(command.sequence())?;
373
374        let observed_state = self
375            .projection
376            .run_state(&command.run_id())
377            .ok_or(MutationValidationError::AttemptStartUnknownRun { run_id: command.run_id() })?;
378
379        if observed_state != RunState::Running {
380            return Err(MutationValidationError::AttemptStartRequiresRunning {
381                run_id: command.run_id(),
382                state: observed_state,
383            });
384        }
385
386        if let Some(active_attempt_id) = self.projection.active_attempt_id(&command.run_id()) {
387            return Err(MutationValidationError::AttemptStartAlreadyActive {
388                run_id: command.run_id(),
389                active_attempt_id,
390            });
391        }
392
393        Ok(())
394    }
395
396    fn validate_attempt_finish(
397        &self,
398        command: &AttemptFinishCommand,
399    ) -> Result<(), MutationValidationError> {
400        self.validate_sequence(command.sequence())?;
401
402        let observed_state = self
403            .projection
404            .run_state(&command.run_id())
405            .ok_or(MutationValidationError::AttemptFinishUnknownRun { run_id: command.run_id() })?;
406
407        if observed_state != RunState::Running {
408            return Err(MutationValidationError::AttemptFinishRequiresRunning {
409                run_id: command.run_id(),
410                state: observed_state,
411            });
412        }
413
414        let active_attempt_id = self.projection.active_attempt_id(&command.run_id()).ok_or(
415            MutationValidationError::AttemptFinishMissingActive { run_id: command.run_id() },
416        )?;
417
418        if active_attempt_id != command.attempt_id() {
419            return Err(MutationValidationError::AttemptFinishAttemptMismatch {
420                run_id: command.run_id(),
421                expected_attempt_id: active_attempt_id,
422                provided_attempt_id: command.attempt_id(),
423            });
424        }
425
426        Ok(())
427    }
428
429    fn validate_lease_acquire(
430        &self,
431        command: &LeaseAcquireCommand,
432    ) -> Result<(), MutationValidationError> {
433        self.validate_sequence(command.sequence())?;
434
435        let observed_state = self.projection.run_state(&command.run_id()).ok_or(
436            MutationValidationError::LeaseUnknownRun {
437                run_id: command.run_id(),
438                event: LeaseValidationEvent::Acquire,
439            },
440        )?;
441
442        if !matches!(observed_state, RunState::Ready | RunState::Leased) {
443            return Err(MutationValidationError::LeaseInvalidRunState {
444                run_id: command.run_id(),
445                event: LeaseValidationEvent::Acquire,
446                state: observed_state,
447            });
448        }
449
450        if self.projection.active_lease(&command.run_id()).is_some() {
451            return Err(MutationValidationError::LeaseAlreadyActive { run_id: command.run_id() });
452        }
453
454        Ok(())
455    }
456
457    fn validate_lease_heartbeat(
458        &self,
459        command: &LeaseHeartbeatCommand,
460    ) -> Result<(), MutationValidationError> {
461        self.validate_sequence(command.sequence())?;
462
463        let observed_state = self.projection.run_state(&command.run_id()).ok_or(
464            MutationValidationError::LeaseUnknownRun {
465                run_id: command.run_id(),
466                event: LeaseValidationEvent::Heartbeat,
467            },
468        )?;
469
470        if !matches!(observed_state, RunState::Leased | RunState::Running) {
471            return Err(MutationValidationError::LeaseInvalidRunState {
472                run_id: command.run_id(),
473                event: LeaseValidationEvent::Heartbeat,
474                state: observed_state,
475            });
476        }
477
478        let (active_owner, active_expiry) = self.projection.active_lease(&command.run_id()).ok_or(
479            MutationValidationError::LeaseMissingActive {
480                run_id: command.run_id(),
481                event: LeaseValidationEvent::Heartbeat,
482            },
483        )?;
484
485        if active_owner != command.owner() {
486            return Err(MutationValidationError::LeaseOwnerMismatch {
487                run_id: command.run_id(),
488                event: LeaseValidationEvent::Heartbeat,
489            });
490        }
491
492        if command.expiry() < active_expiry {
493            return Err(MutationValidationError::LeaseHeartbeatExpiryRegression {
494                run_id: command.run_id(),
495                previous_expiry: active_expiry,
496                proposed_expiry: command.expiry(),
497            });
498        }
499
500        Ok(())
501    }
502
503    fn validate_lease_expire(
504        &self,
505        command: &LeaseExpireCommand,
506    ) -> Result<(), MutationValidationError> {
507        self.validate_lease_close(LeaseCloseParams {
508            sequence: command.sequence(),
509            run_id: command.run_id(),
510            owner: command.owner(),
511            expiry: command.expiry(),
512            event: LeaseValidationEvent::Expire,
513        })
514    }
515
516    fn validate_lease_release(
517        &self,
518        command: &LeaseReleaseCommand,
519    ) -> Result<(), MutationValidationError> {
520        self.validate_lease_close(LeaseCloseParams {
521            sequence: command.sequence(),
522            run_id: command.run_id(),
523            owner: command.owner(),
524            expiry: command.expiry(),
525            event: LeaseValidationEvent::Release,
526        })
527    }
528
529    fn validate_lease_close(
530        &self,
531        params: LeaseCloseParams<'_>,
532    ) -> Result<(), MutationValidationError> {
533        let LeaseCloseParams { sequence, run_id, owner, expiry, event } = params;
534        self.validate_sequence(sequence)?;
535
536        let observed_state = self
537            .projection
538            .run_state(&run_id)
539            .ok_or(MutationValidationError::LeaseUnknownRun { run_id, event })?;
540
541        if !matches!(observed_state, RunState::Ready | RunState::Leased | RunState::Running) {
542            return Err(MutationValidationError::LeaseInvalidRunState {
543                run_id,
544                event,
545                state: observed_state,
546            });
547        }
548
549        let (active_owner, active_expiry) = self
550            .projection
551            .active_lease(&run_id)
552            .ok_or(MutationValidationError::LeaseMissingActive { run_id, event })?;
553
554        if active_owner != owner {
555            return Err(MutationValidationError::LeaseOwnerMismatch { run_id, event });
556        }
557
558        if active_expiry != expiry {
559            return Err(MutationValidationError::LeaseExpiryMismatch {
560                run_id,
561                event,
562                expected_expiry: active_expiry,
563                provided_expiry: expiry,
564            });
565        }
566
567        Ok(())
568    }
569
570    fn validate_task_cancel(
571        &self,
572        command: TaskCancelCommand,
573    ) -> Result<(), MutationValidationError> {
574        self.validate_sequence(command.sequence())?;
575
576        if !self.projection.task_exists(command.task_id()) {
577            return Err(MutationValidationError::UnknownTask { task_id: command.task_id() });
578        }
579
580        if self.projection.is_task_canceled(command.task_id()) {
581            return Err(MutationValidationError::TaskAlreadyCanceled {
582                task_id: command.task_id(),
583            });
584        }
585
586        Ok(())
587    }
588
589    fn validate_dependency_declare(
590        &self,
591        command: &DependencyDeclareCommand,
592    ) -> Result<(), MutationValidationError> {
593        self.validate_sequence(command.sequence())?;
594
595        // Reject empty dependency lists — they are semantically meaningless.
596        if command.depends_on().is_empty() {
597            return Err(MutationValidationError::EmptyDependencyList {
598                task_id: command.task_id(),
599            });
600        }
601
602        // The task that will be gated must exist in the projection.
603        if !self.projection.task_exists(command.task_id()) {
604            return Err(MutationValidationError::UnknownTask { task_id: command.task_id() });
605        }
606
607        // All prerequisite tasks must also exist.
608        for &prereq in command.depends_on() {
609            if !self.projection.task_exists(prereq) {
610                return Err(MutationValidationError::UnknownTask { task_id: prereq });
611            }
612        }
613
614        Ok(())
615    }
616
617    fn validate_engine_pause(
618        &self,
619        command: EnginePauseCommand,
620    ) -> Result<(), MutationValidationError> {
621        self.validate_sequence(command.sequence())?;
622
623        if self.projection.is_engine_paused() {
624            return Err(MutationValidationError::EngineAlreadyPaused);
625        }
626
627        Ok(())
628    }
629
630    fn validate_engine_resume(
631        &self,
632        command: EngineResumeCommand,
633    ) -> Result<(), MutationValidationError> {
634        self.validate_sequence(command.sequence())?;
635
636        if !self.projection.is_engine_paused() {
637            return Err(MutationValidationError::EngineNotPaused);
638        }
639
640        Ok(())
641    }
642
643    fn validate_run_suspend(
644        &self,
645        command: &RunSuspendCommand,
646    ) -> Result<(), MutationValidationError> {
647        self.validate_sequence(command.sequence())?;
648        let state = self
649            .projection
650            .run_state(&command.run_id())
651            .ok_or(MutationValidationError::UnknownRun { run_id: command.run_id() })?;
652        if state != RunState::Running {
653            return Err(MutationValidationError::RunSuspendRequiresRunning {
654                run_id: command.run_id(),
655                state,
656            });
657        }
658        Ok(())
659    }
660
661    fn validate_run_resume(
662        &self,
663        command: &RunResumeCommand,
664    ) -> Result<(), MutationValidationError> {
665        self.validate_sequence(command.sequence())?;
666        let state = self
667            .projection
668            .run_state(&command.run_id())
669            .ok_or(MutationValidationError::UnknownRun { run_id: command.run_id() })?;
670        if state != RunState::Suspended {
671            return Err(MutationValidationError::RunResumeRequiresSuspended {
672                run_id: command.run_id(),
673                state,
674            });
675        }
676        Ok(())
677    }
678
679    fn validate_budget_allocate(
680        &self,
681        command: &BudgetAllocateCommand,
682    ) -> Result<(), MutationValidationError> {
683        self.validate_sequence(command.sequence())?;
684        if !self.projection.task_exists(command.task_id()) {
685            return Err(MutationValidationError::UnknownTask { task_id: command.task_id() });
686        }
687        Ok(())
688    }
689
690    fn validate_budget_consume(
691        &self,
692        command: &BudgetConsumeCommand,
693    ) -> Result<(), MutationValidationError> {
694        self.validate_sequence(command.sequence())?;
695        if !self.projection.task_exists(command.task_id()) {
696            return Err(MutationValidationError::UnknownTask { task_id: command.task_id() });
697        }
698        Ok(())
699    }
700
701    fn validate_budget_replenish(
702        &self,
703        command: &BudgetReplenishCommand,
704    ) -> Result<(), MutationValidationError> {
705        self.validate_sequence(command.sequence())?;
706        if !self.projection.task_exists(command.task_id()) {
707            return Err(MutationValidationError::UnknownTask { task_id: command.task_id() });
708        }
709        if !self.projection.budget_allocation_exists(command.task_id(), command.dimension()) {
710            return Err(MutationValidationError::BudgetNotAllocated {
711                task_id: command.task_id(),
712                dimension: command.dimension(),
713            });
714        }
715        Ok(())
716    }
717
718    fn validate_subscription_create(
719        &self,
720        command: &SubscriptionCreateCommand,
721    ) -> Result<(), MutationValidationError> {
722        self.validate_sequence(command.sequence())?;
723        if !self.projection.task_exists(command.task_id()) {
724            return Err(MutationValidationError::UnknownTask { task_id: command.task_id() });
725        }
726        if self.projection.subscription_exists(command.subscription_id()) {
727            return Err(MutationValidationError::SubscriptionAlreadyExists {
728                subscription_id: command.subscription_id(),
729            });
730        }
731        Ok(())
732    }
733
734    fn validate_subscription_cancel(
735        &self,
736        command: &SubscriptionCancelCommand,
737    ) -> Result<(), MutationValidationError> {
738        self.validate_sequence(command.sequence())?;
739        if !self.projection.subscription_exists(command.subscription_id()) {
740            return Err(MutationValidationError::UnknownSubscription {
741                subscription_id: command.subscription_id(),
742            });
743        }
744        if self.projection.is_subscription_canceled(command.subscription_id()) {
745            return Err(MutationValidationError::SubscriptionAlreadyCanceled {
746                subscription_id: command.subscription_id(),
747            });
748        }
749        Ok(())
750    }
751
752    fn build_event_and_applied(validated: ValidatedCommand) -> (WalEvent, AppliedMutation) {
753        match validated {
754            ValidatedCommand::TaskCreate(command) => {
755                let task_id = command.task_spec().id();
756                let event = WalEvent::new(
757                    command.sequence(),
758                    WalEventType::TaskCreated {
759                        task_spec: command.task_spec().clone(),
760                        timestamp: command.timestamp(),
761                    },
762                );
763                let applied = AppliedMutation::TaskCreate { task_id };
764                (event, applied)
765            }
766            ValidatedCommand::RunCreate(command) => {
767                let run_id = command.run_instance().id();
768                let task_id = command.run_instance().task_id();
769                let event = WalEvent::new(
770                    command.sequence(),
771                    WalEventType::RunCreated { run_instance: command.run_instance().clone() },
772                );
773                let applied = AppliedMutation::RunCreate { run_id, task_id };
774                (event, applied)
775            }
776            ValidatedCommand::RunStateTransition(command) => {
777                let event = WalEvent::new(
778                    command.sequence(),
779                    WalEventType::RunStateChanged {
780                        run_id: command.run_id(),
781                        previous_state: command.previous_state(),
782                        new_state: command.new_state(),
783                        timestamp: command.timestamp(),
784                    },
785                );
786                let applied = AppliedMutation::RunStateTransition {
787                    run_id: command.run_id(),
788                    previous_state: command.previous_state(),
789                    new_state: command.new_state(),
790                };
791                (event, applied)
792            }
793            ValidatedCommand::AttemptStart(command) => {
794                let event = WalEvent::new(
795                    command.sequence(),
796                    WalEventType::AttemptStarted {
797                        run_id: command.run_id(),
798                        attempt_id: command.attempt_id(),
799                        timestamp: command.timestamp(),
800                    },
801                );
802                let applied = AppliedMutation::AttemptStart {
803                    run_id: command.run_id(),
804                    attempt_id: command.attempt_id(),
805                };
806                (event, applied)
807            }
808            ValidatedCommand::AttemptFinish(command) => {
809                let event = WalEvent::new(
810                    command.sequence(),
811                    WalEventType::AttemptFinished {
812                        run_id: command.run_id(),
813                        attempt_id: command.attempt_id(),
814                        result: command.result(),
815                        error: command.error().map(|s| s.to_string()),
816                        output: command.output().map(|b| b.to_vec()),
817                        timestamp: command.timestamp(),
818                    },
819                );
820                let applied = AppliedMutation::AttemptFinish {
821                    run_id: command.run_id(),
822                    attempt_id: command.attempt_id(),
823                    outcome: command.outcome().clone(),
824                };
825                (event, applied)
826            }
827            ValidatedCommand::LeaseAcquire(command) => {
828                let event = WalEvent::new(
829                    command.sequence(),
830                    WalEventType::LeaseAcquired {
831                        run_id: command.run_id(),
832                        owner: command.owner().to_string(),
833                        expiry: command.expiry(),
834                        timestamp: command.timestamp(),
835                    },
836                );
837                let applied = AppliedMutation::LeaseAcquire {
838                    run_id: command.run_id(),
839                    owner: command.owner().to_string(),
840                    expiry: command.expiry(),
841                };
842                (event, applied)
843            }
844            ValidatedCommand::LeaseHeartbeat(command) => {
845                let event = WalEvent::new(
846                    command.sequence(),
847                    WalEventType::LeaseHeartbeat {
848                        run_id: command.run_id(),
849                        owner: command.owner().to_string(),
850                        expiry: command.expiry(),
851                        timestamp: command.timestamp(),
852                    },
853                );
854                let applied = AppliedMutation::LeaseHeartbeat {
855                    run_id: command.run_id(),
856                    owner: command.owner().to_string(),
857                    expiry: command.expiry(),
858                };
859                (event, applied)
860            }
861            ValidatedCommand::LeaseExpire(command) => {
862                let event = WalEvent::new(
863                    command.sequence(),
864                    WalEventType::LeaseExpired {
865                        run_id: command.run_id(),
866                        owner: command.owner().to_string(),
867                        expiry: command.expiry(),
868                        timestamp: command.timestamp(),
869                    },
870                );
871                let applied = AppliedMutation::LeaseExpire {
872                    run_id: command.run_id(),
873                    owner: command.owner().to_string(),
874                    expiry: command.expiry(),
875                };
876                (event, applied)
877            }
878            ValidatedCommand::LeaseRelease(command) => {
879                let event = WalEvent::new(
880                    command.sequence(),
881                    WalEventType::LeaseReleased {
882                        run_id: command.run_id(),
883                        owner: command.owner().to_string(),
884                        expiry: command.expiry(),
885                        timestamp: command.timestamp(),
886                    },
887                );
888                let applied = AppliedMutation::LeaseRelease {
889                    run_id: command.run_id(),
890                    owner: command.owner().to_string(),
891                    expiry: command.expiry(),
892                };
893                (event, applied)
894            }
895            ValidatedCommand::EnginePause(command) => {
896                let event = WalEvent::new(
897                    command.sequence(),
898                    WalEventType::EnginePaused { timestamp: command.timestamp() },
899                );
900                (event, AppliedMutation::EnginePause)
901            }
902            ValidatedCommand::EngineResume(command) => {
903                let event = WalEvent::new(
904                    command.sequence(),
905                    WalEventType::EngineResumed { timestamp: command.timestamp() },
906                );
907                (event, AppliedMutation::EngineResume)
908            }
909            ValidatedCommand::TaskCancel(command) => {
910                let event = WalEvent::new(
911                    command.sequence(),
912                    WalEventType::TaskCanceled {
913                        task_id: command.task_id(),
914                        timestamp: command.timestamp(),
915                    },
916                );
917                let applied = AppliedMutation::TaskCancel { task_id: command.task_id() };
918                (event, applied)
919            }
920            ValidatedCommand::DependencyDeclare(command) => {
921                let depends_on = command.depends_on().to_vec();
922                let event = WalEvent::new(
923                    command.sequence(),
924                    WalEventType::DependencyDeclared {
925                        task_id: command.task_id(),
926                        depends_on: depends_on.clone(),
927                        timestamp: command.timestamp(),
928                    },
929                );
930                let applied =
931                    AppliedMutation::DependencyDeclare { task_id: command.task_id(), depends_on };
932                (event, applied)
933            }
934            ValidatedCommand::RunSuspend(command) => {
935                let event = WalEvent::new(
936                    command.sequence(),
937                    WalEventType::RunSuspended {
938                        run_id: command.run_id(),
939                        reason: command.reason().map(|s| s.to_string()),
940                        timestamp: command.timestamp(),
941                    },
942                );
943                (event, AppliedMutation::RunSuspend { run_id: command.run_id() })
944            }
945            ValidatedCommand::RunResume(command) => {
946                let event = WalEvent::new(
947                    command.sequence(),
948                    WalEventType::RunResumed {
949                        run_id: command.run_id(),
950                        timestamp: command.timestamp(),
951                    },
952                );
953                (event, AppliedMutation::RunResume { run_id: command.run_id() })
954            }
955            ValidatedCommand::BudgetAllocate(command) => {
956                let event = WalEvent::new(
957                    command.sequence(),
958                    WalEventType::BudgetAllocated {
959                        task_id: command.task_id(),
960                        dimension: command.dimension(),
961                        limit: command.limit(),
962                        timestamp: command.timestamp(),
963                    },
964                );
965                let applied = AppliedMutation::BudgetAllocate {
966                    task_id: command.task_id(),
967                    dimension: command.dimension(),
968                    limit: command.limit(),
969                };
970                (event, applied)
971            }
972            ValidatedCommand::BudgetConsume(command) => {
973                let event = WalEvent::new(
974                    command.sequence(),
975                    WalEventType::BudgetConsumed {
976                        task_id: command.task_id(),
977                        dimension: command.dimension(),
978                        amount: command.amount(),
979                        timestamp: command.timestamp(),
980                    },
981                );
982                let applied = AppliedMutation::BudgetConsume {
983                    task_id: command.task_id(),
984                    dimension: command.dimension(),
985                    amount: command.amount(),
986                };
987                (event, applied)
988            }
989            ValidatedCommand::BudgetReplenish(command) => {
990                let event = WalEvent::new(
991                    command.sequence(),
992                    WalEventType::BudgetReplenished {
993                        task_id: command.task_id(),
994                        dimension: command.dimension(),
995                        new_limit: command.new_limit(),
996                        timestamp: command.timestamp(),
997                    },
998                );
999                let applied = AppliedMutation::BudgetReplenish {
1000                    task_id: command.task_id(),
1001                    dimension: command.dimension(),
1002                    new_limit: command.new_limit(),
1003                };
1004                (event, applied)
1005            }
1006            ValidatedCommand::SubscriptionCreate(command) => {
1007                let event = WalEvent::new(
1008                    command.sequence(),
1009                    WalEventType::SubscriptionCreated {
1010                        subscription_id: command.subscription_id(),
1011                        task_id: command.task_id(),
1012                        filter: command.filter().clone(),
1013                        timestamp: command.timestamp(),
1014                    },
1015                );
1016                let applied = AppliedMutation::SubscriptionCreate {
1017                    subscription_id: command.subscription_id(),
1018                    task_id: command.task_id(),
1019                };
1020                (event, applied)
1021            }
1022            ValidatedCommand::SubscriptionCancel(command) => {
1023                let event = WalEvent::new(
1024                    command.sequence(),
1025                    WalEventType::SubscriptionCanceled {
1026                        subscription_id: command.subscription_id(),
1027                        timestamp: command.timestamp(),
1028                    },
1029                );
1030                (
1031                    event,
1032                    AppliedMutation::SubscriptionCancel {
1033                        subscription_id: command.subscription_id(),
1034                    },
1035                )
1036            }
1037            ValidatedCommand::SubscriptionTrigger(command) => {
1038                let event = WalEvent::new(
1039                    command.sequence(),
1040                    WalEventType::SubscriptionTriggered {
1041                        subscription_id: command.subscription_id(),
1042                        timestamp: command.timestamp(),
1043                    },
1044                );
1045                (
1046                    event,
1047                    AppliedMutation::SubscriptionTrigger {
1048                        subscription_id: command.subscription_id(),
1049                    },
1050                )
1051            }
1052            ValidatedCommand::ActorRegister(command) => {
1053                let reg = command.registration();
1054                let event = WalEvent::new(
1055                    command.sequence(),
1056                    WalEventType::ActorRegistered {
1057                        actor_id: reg.actor_id(),
1058                        identity: reg.identity().to_string(),
1059                        capabilities: reg.capabilities().as_slice().to_vec(),
1060                        department: reg.department().map(|d| d.as_str().to_string()),
1061                        heartbeat_interval_secs: reg.heartbeat_interval_secs(),
1062                        tenant_id: reg.tenant_id(),
1063                        timestamp: command.timestamp(),
1064                    },
1065                );
1066                (event, AppliedMutation::NoOp)
1067            }
1068            ValidatedCommand::ActorDeregister(command) => {
1069                let event = WalEvent::new(
1070                    command.sequence(),
1071                    WalEventType::ActorDeregistered {
1072                        actor_id: command.actor_id(),
1073                        timestamp: command.timestamp(),
1074                    },
1075                );
1076                (event, AppliedMutation::NoOp)
1077            }
1078            ValidatedCommand::ActorHeartbeat(command) => {
1079                let event = WalEvent::new(
1080                    command.sequence(),
1081                    WalEventType::ActorHeartbeat {
1082                        actor_id: command.actor_id(),
1083                        timestamp: command.timestamp(),
1084                    },
1085                );
1086                (event, AppliedMutation::NoOp)
1087            }
1088            ValidatedCommand::TenantCreate(command) => {
1089                let reg = command.registration();
1090                let event = WalEvent::new(
1091                    command.sequence(),
1092                    WalEventType::TenantCreated {
1093                        tenant_id: reg.tenant_id(),
1094                        name: reg.name().to_string(),
1095                        timestamp: command.timestamp(),
1096                    },
1097                );
1098                (event, AppliedMutation::NoOp)
1099            }
1100            ValidatedCommand::RoleAssign(command) => {
1101                let event = WalEvent::new(
1102                    command.sequence(),
1103                    WalEventType::RoleAssigned {
1104                        actor_id: command.actor_id(),
1105                        role: command.role().clone(),
1106                        tenant_id: command.tenant_id(),
1107                        timestamp: command.timestamp(),
1108                    },
1109                );
1110                (event, AppliedMutation::NoOp)
1111            }
1112            ValidatedCommand::CapabilityGrant(command) => {
1113                let event = WalEvent::new(
1114                    command.sequence(),
1115                    WalEventType::CapabilityGranted {
1116                        actor_id: command.actor_id(),
1117                        capability: command.capability().clone(),
1118                        tenant_id: command.tenant_id(),
1119                        timestamp: command.timestamp(),
1120                    },
1121                );
1122                (event, AppliedMutation::NoOp)
1123            }
1124            ValidatedCommand::CapabilityRevoke(command) => {
1125                let event = WalEvent::new(
1126                    command.sequence(),
1127                    WalEventType::CapabilityRevoked {
1128                        actor_id: command.actor_id(),
1129                        capability: command.capability().clone(),
1130                        tenant_id: command.tenant_id(),
1131                        timestamp: command.timestamp(),
1132                    },
1133                );
1134                (event, AppliedMutation::NoOp)
1135            }
1136            ValidatedCommand::LedgerAppend(command) => {
1137                let entry = command.entry();
1138                let event = WalEvent::new(
1139                    command.sequence(),
1140                    WalEventType::LedgerEntryAppended {
1141                        entry_id: entry.entry_id(),
1142                        tenant_id: entry.tenant_id(),
1143                        ledger_key: entry.ledger_key().to_string(),
1144                        actor_id: entry.actor_id(),
1145                        payload: entry.payload().to_vec(),
1146                        timestamp: command.timestamp(),
1147                    },
1148                );
1149                (event, AppliedMutation::NoOp)
1150            }
1151        }
1152    }
1153}
1154
1155impl<W: WalWriter, P: MutationProjection> MutationAuthority for StorageMutationAuthority<W, P> {
1156    type Error = MutationAuthorityError<P::Error>;
1157
1158    fn submit_command(
1159        &mut self,
1160        command: MutationCommand,
1161        durability: DurabilityPolicy,
1162    ) -> Result<MutationOutcome, Self::Error> {
1163        // Stage 1: validate command.
1164        let validated =
1165            self.validate_command(&command).map_err(MutationAuthorityError::Validation)?;
1166
1167        // Stage 2: map validated command to canonical WAL event.
1168        let (event, applied) = Self::build_event_and_applied(validated);
1169
1170        // Stage 3: append WAL event.
1171        self.wal_writer.append(&event).map_err(MutationAuthorityError::Append)?;
1172
1173        // Stage 4: durability sync by policy.
1174        if durability == DurabilityPolicy::Immediate {
1175            if let Err(flush_error) = self.wal_writer.flush() {
1176                return Err(MutationAuthorityError::PartialDurability {
1177                    sequence: event.sequence(),
1178                    flush_error,
1179                });
1180            }
1181        }
1182
1183        // Stage 5: apply in-memory projection.
1184        self.projection.apply_event(&event).map_err(|source| MutationAuthorityError::Apply {
1185            sequence: event.sequence(),
1186            source,
1187        })?;
1188
1189        tracing::debug!(sequence = event.sequence(), "command submitted");
1190        Ok(MutationOutcome::new(event.sequence(), applied))
1191    }
1192}
1193
1194#[derive(Debug, Clone, PartialEq, Eq)]
1195enum ValidatedCommand {
1196    TaskCreate(TaskCreateCommand),
1197    RunCreate(RunCreateCommand),
1198    RunStateTransition(RunStateTransitionCommand),
1199    AttemptStart(AttemptStartCommand),
1200    AttemptFinish(AttemptFinishCommand),
1201    LeaseAcquire(LeaseAcquireCommand),
1202    LeaseHeartbeat(LeaseHeartbeatCommand),
1203    LeaseExpire(LeaseExpireCommand),
1204    LeaseRelease(LeaseReleaseCommand),
1205    EnginePause(EnginePauseCommand),
1206    EngineResume(EngineResumeCommand),
1207    TaskCancel(TaskCancelCommand),
1208    DependencyDeclare(DependencyDeclareCommand),
1209    RunSuspend(RunSuspendCommand),
1210    RunResume(RunResumeCommand),
1211    BudgetAllocate(BudgetAllocateCommand),
1212    BudgetConsume(BudgetConsumeCommand),
1213    BudgetReplenish(BudgetReplenishCommand),
1214    SubscriptionCreate(SubscriptionCreateCommand),
1215    SubscriptionCancel(SubscriptionCancelCommand),
1216    SubscriptionTrigger(SubscriptionTriggerCommand),
1217    ActorRegister(ActorRegisterCommand),
1218    ActorDeregister(ActorDeregisterCommand),
1219    ActorHeartbeat(ActorHeartbeatCommand),
1220    TenantCreate(TenantCreateCommand),
1221    RoleAssign(RoleAssignCommand),
1222    CapabilityGrant(CapabilityGrantCommand),
1223    CapabilityRevoke(CapabilityRevokeCommand),
1224    LedgerAppend(LedgerAppendCommand),
1225}
1226
/// Identifies which lease lifecycle command a typed validation error refers to.
///
/// Carried inside the lease-related `MutationValidationError` variants so a
/// single error shape can describe acquire, heartbeat, expire, and release
/// rejections.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaseValidationEvent {
    /// The rejected command was a lease acquire.
    Acquire,
    /// The rejected command was a lease heartbeat.
    Heartbeat,
    /// The rejected command was a lease expire.
    Expire,
    /// The rejected command was a lease release.
    Release,
}
1239
1240/// Parameters for lease-close validation that group the common fields
1241/// shared by expire and release validation paths.
1242struct LeaseCloseParams<'a> {
1243    sequence: u64,
1244    run_id: RunId,
1245    owner: &'a str,
1246    expiry: u64,
1247    event: LeaseValidationEvent,
1248}
1249
1250/// Typed validation failures from the authority validation stage.
1251#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1252pub enum MutationValidationError {
1253    /// Projection sequence could not be advanced because it overflowed `u64`.
1254    SequenceOverflow,
1255    /// Command sequence is stale or otherwise non-monotonic.
1256    NonMonotonicSequence {
1257        /// The expected next sequence.
1258        expected: u64,
1259        /// The command-provided sequence.
1260        provided: u64,
1261    },
1262    /// Run targeted by the command does not exist in projection state.
1263    UnknownRun {
1264        /// Missing run identifier.
1265        run_id: RunId,
1266    },
1267    /// Task targeted by the command does not exist in projection state.
1268    UnknownTask {
1269        /// Missing task identifier.
1270        task_id: TaskId,
1271    },
1272    /// Task creation rejected because task already exists.
1273    TaskAlreadyExists {
1274        /// Existing task identifier.
1275        task_id: TaskId,
1276    },
1277    /// Task cancellation rejected because task is already canceled.
1278    TaskAlreadyCanceled {
1279        /// Already canceled task identifier.
1280        task_id: TaskId,
1281    },
1282    /// Engine pause command rejected because engine is already paused.
1283    EngineAlreadyPaused,
1284    /// Engine resume command rejected because engine is not paused.
1285    EngineNotPaused,
1286    /// Run creation rejected because run already exists.
1287    RunAlreadyExists {
1288        /// Existing run identifier.
1289        run_id: RunId,
1290    },
1291    /// Run creation requires Scheduled initial state.
1292    RunCreateRequiresScheduled {
1293        /// Target run identifier.
1294        run_id: RunId,
1295        /// Proposed initial state.
1296        state: RunState,
1297    },
1298    /// Command expected previous state does not match observed projection state.
1299    PreviousStateMismatch {
1300        /// Target run identifier.
1301        run_id: RunId,
1302        /// Expected command previous state.
1303        expected: RunState,
1304        /// Observed projection state.
1305        actual: RunState,
1306    },
1307    /// Command requested an invalid lifecycle transition.
1308    InvalidTransition {
1309        /// Target run identifier.
1310        run_id: RunId,
1311        /// Requested source state.
1312        from: RunState,
1313        /// Requested target state.
1314        to: RunState,
1315    },
1316    /// Attempt-start command references a run that is unknown in projection state.
1317    AttemptStartUnknownRun {
1318        /// Missing run identifier.
1319        run_id: RunId,
1320    },
1321    /// Attempt-start command requires the run to be in Running.
1322    AttemptStartRequiresRunning {
1323        /// Target run identifier.
1324        run_id: RunId,
1325        /// Observed projection state.
1326        state: RunState,
1327    },
1328    /// Attempt-start command rejected because the run already has an active attempt.
1329    AttemptStartAlreadyActive {
1330        /// Target run identifier.
1331        run_id: RunId,
1332        /// Already active attempt identifier.
1333        active_attempt_id: AttemptId,
1334    },
1335    /// Attempt-finish command references a run that is unknown in projection state.
1336    AttemptFinishUnknownRun {
1337        /// Missing run identifier.
1338        run_id: RunId,
1339    },
1340    /// Attempt-finish command requires the run to be in Running.
1341    AttemptFinishRequiresRunning {
1342        /// Target run identifier.
1343        run_id: RunId,
1344        /// Observed projection state.
1345        state: RunState,
1346    },
1347    /// Attempt-finish command requires an active attempt to exist.
1348    AttemptFinishMissingActive {
1349        /// Target run identifier.
1350        run_id: RunId,
1351    },
1352    /// Attempt-finish command attempt ID does not match active projection attempt ID.
1353    AttemptFinishAttemptMismatch {
1354        /// Target run identifier.
1355        run_id: RunId,
1356        /// Currently active attempt identifier.
1357        expected_attempt_id: AttemptId,
1358        /// Attempt identifier provided by command.
1359        provided_attempt_id: AttemptId,
1360    },
1361    /// Lease lifecycle command references a run that is unknown in projection state.
1362    LeaseUnknownRun {
1363        /// Missing run identifier.
1364        run_id: RunId,
1365        /// Lease lifecycle command kind.
1366        event: LeaseValidationEvent,
1367    },
1368    /// Lease lifecycle command is invalid for the run's observed lifecycle state.
1369    LeaseInvalidRunState {
1370        /// Target run identifier.
1371        run_id: RunId,
1372        /// Lease lifecycle command kind.
1373        event: LeaseValidationEvent,
1374        /// Observed projection run state.
1375        state: RunState,
1376    },
1377    /// Lease-acquire command rejected because a lease is already active.
1378    LeaseAlreadyActive {
1379        /// Target run identifier.
1380        run_id: RunId,
1381    },
1382    /// Lease lifecycle command requires active lease metadata but none exists.
1383    LeaseMissingActive {
1384        /// Target run identifier.
1385        run_id: RunId,
1386        /// Lease lifecycle command kind.
1387        event: LeaseValidationEvent,
1388    },
1389    /// Lease lifecycle command owner does not match active lease owner.
1390    LeaseOwnerMismatch {
1391        /// Target run identifier.
1392        run_id: RunId,
1393        /// Lease lifecycle command kind.
1394        event: LeaseValidationEvent,
1395    },
1396    /// Lease close command expiry does not match active lease expiry.
1397    LeaseExpiryMismatch {
1398        /// Target run identifier.
1399        run_id: RunId,
1400        /// Lease lifecycle command kind.
1401        event: LeaseValidationEvent,
1402        /// Expected active lease expiry.
1403        expected_expiry: u64,
1404        /// Command-provided expiry.
1405        provided_expiry: u64,
1406    },
1407    /// Lease heartbeat command attempted to regress lease expiry.
1408    LeaseHeartbeatExpiryRegression {
1409        /// Target run identifier.
1410        run_id: RunId,
1411        /// Current active lease expiry.
1412        previous_expiry: u64,
1413        /// Heartbeat-proposed expiry.
1414        proposed_expiry: u64,
1415    },
1416    /// Dependency declaration command contains an empty depends_on list.
1417    EmptyDependencyList {
1418        /// The task whose dependency list was empty.
1419        task_id: TaskId,
1420    },
1421    /// Run suspend command rejected because the run is not in Running state.
1422    RunSuspendRequiresRunning {
1423        /// Target run identifier.
1424        run_id: RunId,
1425        /// Observed projection state.
1426        state: RunState,
1427    },
1428    /// Run resume command rejected because the run is not in Suspended state.
1429    RunResumeRequiresSuspended {
1430        /// Target run identifier.
1431        run_id: RunId,
1432        /// Observed projection state.
1433        state: RunState,
1434    },
1435    /// Budget replenish command rejected because no allocation exists.
1436    BudgetNotAllocated {
1437        /// Task targeted by the command.
1438        task_id: TaskId,
1439        /// Budget dimension that has no allocation.
1440        dimension: BudgetDimension,
1441    },
1442    /// Subscription create command rejected because the subscription already exists.
1443    SubscriptionAlreadyExists {
1444        /// Existing subscription identifier.
1445        subscription_id: SubscriptionId,
1446    },
1447    /// Subscription lifecycle command references a subscription that is unknown.
1448    UnknownSubscription {
1449        /// Missing subscription identifier.
1450        subscription_id: SubscriptionId,
1451    },
1452    /// Subscription cancel command rejected because the subscription is already canceled.
1453    SubscriptionAlreadyCanceled {
1454        /// Already canceled subscription identifier.
1455        subscription_id: SubscriptionId,
1456    },
1457}
1458
1459impl std::fmt::Display for MutationValidationError {
1460    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1461        match self {
1462            MutationValidationError::SequenceOverflow => {
1463                write!(f, "mutation sequence overflow while computing next expected sequence")
1464            }
1465            MutationValidationError::NonMonotonicSequence { expected, provided } => {
1466                write!(
1467                    f,
1468                    "mutation sequence rejected: expected next sequence {expected}, received \
1469                     {provided}"
1470                )
1471            }
1472            MutationValidationError::UnknownRun { run_id } => {
1473                write!(f, "mutation rejected: unknown run {run_id}")
1474            }
1475            MutationValidationError::UnknownTask { task_id } => {
1476                write!(f, "mutation rejected: unknown task {task_id}")
1477            }
1478            MutationValidationError::TaskAlreadyExists { task_id } => {
1479                write!(f, "mutation rejected: task {task_id} already exists")
1480            }
1481            MutationValidationError::TaskAlreadyCanceled { task_id } => {
1482                write!(f, "mutation rejected: task {task_id} already canceled")
1483            }
1484            MutationValidationError::EngineAlreadyPaused => {
1485                write!(f, "mutation rejected: engine already paused")
1486            }
1487            MutationValidationError::EngineNotPaused => {
1488                write!(f, "mutation rejected: engine not paused")
1489            }
1490            MutationValidationError::RunAlreadyExists { run_id } => {
1491                write!(f, "mutation rejected: run {run_id} already exists")
1492            }
1493            MutationValidationError::RunCreateRequiresScheduled { run_id, state } => {
1494                write!(
1495                    f,
1496                    "mutation rejected for run {run_id}: run creation requires Scheduled state, \
1497                     got {state:?}"
1498                )
1499            }
1500            MutationValidationError::PreviousStateMismatch { run_id, expected, actual } => {
1501                write!(
1502                    f,
1503                    "mutation rejected for run {run_id}: previous_state mismatch \
1504                     expected={expected:?} actual={actual:?}"
1505                )
1506            }
1507            MutationValidationError::InvalidTransition { run_id, from, to } => {
1508                write!(
1509                    f,
1510                    "mutation rejected for run {run_id}: invalid transition {from:?} -> {to:?}"
1511                )
1512            }
1513            MutationValidationError::AttemptStartUnknownRun { run_id } => {
1514                write!(f, "attempt-start rejected: unknown run {run_id}")
1515            }
1516            MutationValidationError::AttemptStartRequiresRunning { run_id, state } => {
1517                write!(
1518                    f,
1519                    "attempt-start rejected for run {run_id}: run must be Running, observed \
1520                     {state:?}"
1521                )
1522            }
1523            MutationValidationError::AttemptStartAlreadyActive { run_id, active_attempt_id } => {
1524                write!(
1525                    f,
1526                    "attempt-start rejected for run {run_id}: active attempt {active_attempt_id} \
1527                     already exists"
1528                )
1529            }
1530            MutationValidationError::AttemptFinishUnknownRun { run_id } => {
1531                write!(f, "attempt-finish rejected: unknown run {run_id}")
1532            }
1533            MutationValidationError::AttemptFinishRequiresRunning { run_id, state } => {
1534                write!(
1535                    f,
1536                    "attempt-finish rejected for run {run_id}: run must be Running, observed \
1537                     {state:?}"
1538                )
1539            }
1540            MutationValidationError::AttemptFinishMissingActive { run_id } => {
1541                write!(f, "attempt-finish rejected for run {run_id}: no active attempt present")
1542            }
1543            MutationValidationError::AttemptFinishAttemptMismatch {
1544                run_id,
1545                expected_attempt_id,
1546                provided_attempt_id,
1547            } => {
1548                write!(
1549                    f,
1550                    "attempt-finish rejected for run {run_id}: attempt mismatch \
1551                     expected={expected_attempt_id} provided={provided_attempt_id}"
1552                )
1553            }
1554            MutationValidationError::LeaseUnknownRun { run_id, event } => {
1555                write!(f, "{event} rejected: unknown run {run_id}")
1556            }
1557            MutationValidationError::LeaseInvalidRunState { run_id, event, state } => {
1558                write!(f, "{event} rejected for run {run_id}: invalid state {state:?}")
1559            }
1560            MutationValidationError::LeaseAlreadyActive { run_id } => {
1561                write!(f, "lease acquire rejected for run {run_id}: lease already active")
1562            }
1563            MutationValidationError::LeaseMissingActive { run_id, event } => {
1564                write!(f, "{event} rejected for run {run_id}: missing active lease")
1565            }
1566            MutationValidationError::LeaseOwnerMismatch { run_id, event } => {
1567                write!(f, "{event} rejected for run {run_id}: owner mismatch")
1568            }
1569            MutationValidationError::LeaseExpiryMismatch {
1570                run_id,
1571                event,
1572                expected_expiry,
1573                provided_expiry,
1574            } => {
1575                write!(
1576                    f,
1577                    "{event} rejected for run {run_id}: expiry mismatch \
1578                     expected={expected_expiry} provided={provided_expiry}"
1579                )
1580            }
1581            MutationValidationError::LeaseHeartbeatExpiryRegression {
1582                run_id,
1583                previous_expiry,
1584                proposed_expiry,
1585            } => {
1586                write!(
1587                    f,
1588                    "lease heartbeat rejected for run {run_id}: expiry regression \
1589                     previous={previous_expiry} proposed={proposed_expiry}"
1590                )
1591            }
1592            MutationValidationError::EmptyDependencyList { task_id } => {
1593                write!(
1594                    f,
1595                    "dependency declaration rejected for task {task_id}: depends_on list is empty"
1596                )
1597            }
1598            MutationValidationError::RunSuspendRequiresRunning { run_id, state } => {
1599                write!(
1600                    f,
1601                    "run-suspend rejected for run {run_id}: run must be Running, observed \
1602                     {state:?}"
1603                )
1604            }
1605            MutationValidationError::RunResumeRequiresSuspended { run_id, state } => {
1606                write!(
1607                    f,
1608                    "run-resume rejected for run {run_id}: run must be Suspended, observed \
1609                     {state:?}"
1610                )
1611            }
1612            MutationValidationError::BudgetNotAllocated { task_id, dimension } => {
1613                write!(
1614                    f,
1615                    "budget-replenish rejected for task {task_id}: no allocation exists for \
1616                     dimension {dimension}"
1617                )
1618            }
1619            MutationValidationError::SubscriptionAlreadyExists { subscription_id } => {
1620                write!(
1621                    f,
1622                    "subscription-create rejected: subscription {subscription_id} already exists"
1623                )
1624            }
1625            MutationValidationError::UnknownSubscription { subscription_id } => {
1626                write!(f, "mutation rejected: unknown subscription {subscription_id}")
1627            }
1628            MutationValidationError::SubscriptionAlreadyCanceled { subscription_id } => {
1629                write!(
1630                    f,
1631                    "subscription-cancel rejected: subscription {subscription_id} already canceled"
1632                )
1633            }
1634        }
1635    }
1636}
1637
1638impl std::fmt::Display for LeaseValidationEvent {
1639    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1640        match self {
1641            LeaseValidationEvent::Acquire => write!(f, "lease acquire"),
1642            LeaseValidationEvent::Heartbeat => write!(f, "lease heartbeat"),
1643            LeaseValidationEvent::Expire => write!(f, "lease expire"),
1644            LeaseValidationEvent::Release => write!(f, "lease release"),
1645        }
1646    }
1647}
1648
1649impl std::error::Error for MutationValidationError {}
1650
1651/// Typed stage-aware authority failures.
1652#[derive(Debug, Clone, PartialEq, Eq)]
1653pub enum MutationAuthorityError<ProjectionError> {
1654    /// Validation stage failure.
1655    Validation(MutationValidationError),
1656    /// WAL append stage failure.
1657    Append(WalWriterError),
1658    /// Append succeeded but flush failed. The event MAY be durable on restart
1659    /// (OS page cache) but fsync was not confirmed.
1660    PartialDurability {
1661        /// Sequence that was appended but not confirmed durable.
1662        sequence: u64,
1663        /// Underlying flush error.
1664        flush_error: WalWriterError,
1665    },
1666    /// Projection apply stage failure after append.
1667    Apply {
1668        /// Sequence that was already appended durably.
1669        sequence: u64,
1670        /// Underlying projection apply error.
1671        source: ProjectionError,
1672    },
1673}
1674
1675impl<ProjectionError: std::fmt::Display> std::fmt::Display
1676    for MutationAuthorityError<ProjectionError>
1677{
1678    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1679        match self {
1680            MutationAuthorityError::Validation(error) => {
1681                write!(f, "mutation validation failed: {error}")
1682            }
1683            MutationAuthorityError::Append(error) => {
1684                write!(f, "mutation append stage failed: {error}")
1685            }
1686            MutationAuthorityError::PartialDurability { sequence, flush_error } => {
1687                write!(
1688                    f,
1689                    "mutation partial durability: append succeeded at sequence {sequence} but \
1690                     flush failed: {flush_error}"
1691                )
1692            }
1693            MutationAuthorityError::Apply { sequence, source } => {
1694                write!(
1695                    f,
1696                    "mutation apply stage failed after durable append sequence {sequence}: \
1697                     {source}"
1698                )
1699            }
1700        }
1701    }
1702}
1703
1704impl<ProjectionError: std::error::Error + 'static> std::error::Error
1705    for MutationAuthorityError<ProjectionError>
1706{
1707}
1708
1709#[cfg(test)]
1710mod tests {
1711    use actionqueue_core::ids::{RunId, TaskId};
1712    use actionqueue_core::task::constraints::TaskConstraints;
1713    use actionqueue_core::task::metadata::TaskMetadata;
1714    use actionqueue_core::task::run_policy::RunPolicy;
1715    use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
1716
1717    use super::*;
1718
1719    #[derive(Debug, Default)]
1720    struct ProjectionStub {
1721        latest_sequence: u64,
1722        tasks: std::collections::HashSet<TaskId>,
1723        canceled_tasks: std::collections::HashSet<TaskId>,
1724        engine_paused: bool,
1725        runs: std::collections::HashMap<RunId, RunState>,
1726        active_attempts: std::collections::HashMap<RunId, AttemptId>,
1727        active_leases: std::collections::HashMap<RunId, (String, u64)>,
1728    }
1729
1730    impl MutationProjection for ProjectionStub {
1731        type Error = &'static str;
1732
1733        fn latest_sequence(&self) -> u64 {
1734            self.latest_sequence
1735        }
1736
1737        fn run_state(&self, run_id: &RunId) -> Option<RunState> {
1738            self.runs.get(run_id).copied()
1739        }
1740
1741        fn task_exists(&self, task_id: TaskId) -> bool {
1742            self.tasks.contains(&task_id)
1743        }
1744
1745        fn is_task_canceled(&self, task_id: TaskId) -> bool {
1746            self.canceled_tasks.contains(&task_id)
1747        }
1748
1749        fn is_engine_paused(&self) -> bool {
1750            self.engine_paused
1751        }
1752
1753        fn active_attempt_id(&self, run_id: &RunId) -> Option<AttemptId> {
1754            self.active_attempts.get(run_id).copied()
1755        }
1756
1757        fn active_lease(&self, run_id: &RunId) -> Option<(String, u64)> {
1758            self.active_leases.get(run_id).cloned()
1759        }
1760
1761        fn apply_event(&mut self, event: &WalEvent) -> Result<(), Self::Error> {
1762            self.latest_sequence = event.sequence();
1763            match event.event() {
1764                WalEventType::TaskCreated { task_spec, .. } => {
1765                    self.tasks.insert(task_spec.id());
1766                }
1767                WalEventType::RunCreated { run_instance } => {
1768                    self.runs.insert(run_instance.id(), run_instance.state());
1769                }
1770                WalEventType::RunStateChanged { run_id, new_state, .. } => {
1771                    self.runs.insert(*run_id, *new_state);
1772                }
1773                WalEventType::AttemptStarted { run_id, attempt_id, .. } => {
1774                    self.active_attempts.insert(*run_id, *attempt_id);
1775                }
1776                WalEventType::AttemptFinished { run_id, .. } => {
1777                    self.active_attempts.remove(run_id);
1778                }
1779                WalEventType::LeaseAcquired { run_id, owner, expiry, .. } => {
1780                    self.active_leases.insert(*run_id, (owner.clone(), *expiry));
1781                }
1782                WalEventType::LeaseHeartbeat { run_id, owner, expiry, .. } => {
1783                    self.active_leases.insert(*run_id, (owner.clone(), *expiry));
1784                }
1785                WalEventType::LeaseExpired { run_id, .. }
1786                | WalEventType::LeaseReleased { run_id, .. } => {
1787                    self.active_leases.remove(run_id);
1788                }
1789                WalEventType::TaskCanceled { task_id, .. } => {
1790                    self.canceled_tasks.insert(*task_id);
1791                }
1792                WalEventType::EnginePaused { .. } => {
1793                    self.engine_paused = true;
1794                }
1795                WalEventType::EngineResumed { .. } => {
1796                    self.engine_paused = false;
1797                }
1798                WalEventType::DependencyDeclared { .. } => {
1799                    // Dependency declarations are tracked in the full ReplayReducer,
1800                    // not in the lightweight test projection stub.
1801                }
1802                _ => {
1803                    // Sprint 3+ event types are not tracked in the lightweight test
1804                    // projection stub; they are handled by the full ReplayReducer.
1805                }
1806            }
1807            Ok(())
1808        }
1809    }
1810
1811    #[derive(Debug, Default)]
1812    struct WriterStub {
1813        events: Vec<WalEvent>,
1814        fail_append: bool,
1815        fail_flush: bool,
1816    }
1817
1818    impl WalWriter for WriterStub {
1819        fn append(&mut self, event: &WalEvent) -> Result<(), WalWriterError> {
1820            if self.fail_append {
1821                return Err(WalWriterError::IoError("append failed".to_string()));
1822            }
1823            self.events.push(event.clone());
1824            Ok(())
1825        }
1826
1827        fn flush(&mut self) -> Result<(), WalWriterError> {
1828            if self.fail_flush {
1829                return Err(WalWriterError::IoError("flush failed".to_string()));
1830            }
1831            Ok(())
1832        }
1833
1834        fn close(self) -> Result<(), WalWriterError> {
1835            Ok(())
1836        }
1837    }
1838
1839    fn test_task_spec(task_id: TaskId) -> TaskSpec {
1840        TaskSpec::new(
1841            task_id,
1842            TaskPayload::with_content_type(b"payload".to_vec(), "application/octet-stream"),
1843            RunPolicy::Once,
1844            TaskConstraints::default(),
1845            TaskMetadata::default(),
1846        )
1847        .expect("test task spec should be valid")
1848    }
1849
1850    #[test]
1851    fn submits_task_create_then_run_create_then_transition() {
1852        let task_id = TaskId::new();
1853        let run =
1854            actionqueue_core::run::run_instance::RunInstance::new_scheduled(task_id, 100, 100)
1855                .expect("test run should be valid");
1856
1857        let writer = WriterStub::default();
1858        let projection = ProjectionStub::default();
1859        let mut authority = StorageMutationAuthority::new(writer, projection);
1860
1861        let task_outcome = authority
1862            .submit_command(
1863                MutationCommand::TaskCreate(TaskCreateCommand::new(1, test_task_spec(task_id), 10)),
1864                DurabilityPolicy::Immediate,
1865            )
1866            .expect("task create should succeed");
1867        assert_eq!(task_outcome.sequence(), 1);
1868
1869        let run_outcome = authority
1870            .submit_command(
1871                MutationCommand::RunCreate(RunCreateCommand::new(2, run.clone())),
1872                DurabilityPolicy::Immediate,
1873            )
1874            .expect("run create should succeed");
1875        assert_eq!(run_outcome.sequence(), 2);
1876
1877        let transition_outcome = authority
1878            .submit_command(
1879                MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
1880                    3,
1881                    run.id(),
1882                    RunState::Scheduled,
1883                    RunState::Ready,
1884                    100,
1885                )),
1886                DurabilityPolicy::Immediate,
1887            )
1888            .expect("transition should succeed");
1889        assert_eq!(transition_outcome.sequence(), 3);
1890
1891        let (_writer, projection) = authority.into_parts();
1892        assert!(projection.task_exists(task_id));
1893        assert_eq!(projection.run_state(&run.id()), Some(RunState::Ready));
1894    }
1895
1896    #[test]
1897    fn partial_durability_when_append_ok_flush_err() {
1898        let task_id = TaskId::new();
1899        let writer = WriterStub { fail_flush: true, ..Default::default() };
1900        let projection = ProjectionStub::default();
1901        let mut authority = StorageMutationAuthority::new(writer, projection);
1902
1903        let result = authority.submit_command(
1904            MutationCommand::TaskCreate(TaskCreateCommand::new(1, test_task_spec(task_id), 10)),
1905            DurabilityPolicy::Immediate,
1906        );
1907
1908        match result {
1909            Err(MutationAuthorityError::PartialDurability { sequence, flush_error }) => {
1910                assert_eq!(sequence, 1);
1911                assert_eq!(flush_error, WalWriterError::IoError("flush failed".to_string()));
1912            }
1913            other => panic!("expected PartialDurability, got {other:?}"),
1914        }
1915    }
1916}