1use actionqueue_core::budget::BudgetDimension;
7use actionqueue_core::ids::{AttemptId, RunId, TaskId};
8use actionqueue_core::mutation::{
9 ActorDeregisterCommand, ActorHeartbeatCommand, ActorRegisterCommand, AppliedMutation,
10 AttemptFinishCommand, AttemptStartCommand, BudgetAllocateCommand, BudgetConsumeCommand,
11 BudgetReplenishCommand, CapabilityGrantCommand, CapabilityRevokeCommand,
12 DependencyDeclareCommand, DurabilityPolicy, EnginePauseCommand, EngineResumeCommand,
13 LeaseAcquireCommand, LeaseExpireCommand, LeaseHeartbeatCommand, LeaseReleaseCommand,
14 LedgerAppendCommand, MutationAuthority, MutationCommand, MutationOutcome, RoleAssignCommand,
15 RunCreateCommand, RunResumeCommand, RunStateTransitionCommand, RunSuspendCommand,
16 SubscriptionCancelCommand, SubscriptionCreateCommand, SubscriptionTriggerCommand,
17 TaskCancelCommand, TaskCreateCommand, TenantCreateCommand,
18};
19use actionqueue_core::run::state::RunState;
20use actionqueue_core::run::transitions::is_valid_transition;
21use actionqueue_core::subscription::SubscriptionId;
22
23use crate::recovery::reducer::ReplayReducer;
24use crate::recovery::reducer::ReplayReducerError;
25use crate::wal::event::{WalEvent, WalEventType};
26use crate::wal::writer::{WalWriter, WalWriterError};
27
28pub trait MutationProjection {
30 type Error;
32
33 fn latest_sequence(&self) -> u64;
35
36 fn run_state(&self, run_id: &RunId) -> Option<RunState>;
38
39 fn task_exists(&self, task_id: TaskId) -> bool;
41
42 fn is_task_canceled(&self, task_id: TaskId) -> bool;
44
45 fn is_engine_paused(&self) -> bool;
47
48 fn active_attempt_id(&self, run_id: &RunId) -> Option<AttemptId>;
50
51 fn active_lease(&self, run_id: &RunId) -> Option<(String, u64)>;
53
54 fn budget_allocation_exists(&self, _task_id: TaskId, _dimension: BudgetDimension) -> bool {
56 false
57 }
58
59 fn subscription_exists(&self, _subscription_id: SubscriptionId) -> bool {
61 false
62 }
63
64 fn is_subscription_canceled(&self, _subscription_id: SubscriptionId) -> bool {
66 false
67 }
68
69 fn apply_event(&mut self, event: &WalEvent) -> Result<(), Self::Error>;
71}
72
73impl MutationProjection for ReplayReducer {
74 type Error = ReplayReducerError;
75
76 fn latest_sequence(&self) -> u64 {
77 ReplayReducer::latest_sequence(self)
78 }
79
80 fn run_state(&self, run_id: &RunId) -> Option<RunState> {
81 self.get_run_state(run_id).copied()
82 }
83
84 fn task_exists(&self, task_id: TaskId) -> bool {
85 self.get_task(&task_id).is_some()
86 }
87
88 fn is_task_canceled(&self, task_id: TaskId) -> bool {
89 ReplayReducer::is_task_canceled(self, task_id)
90 }
91
92 fn is_engine_paused(&self) -> bool {
93 ReplayReducer::is_engine_paused(self)
94 }
95
96 fn active_attempt_id(&self, run_id: &RunId) -> Option<AttemptId> {
97 self.get_run_instance(run_id).and_then(|run| run.current_attempt_id())
98 }
99
100 fn active_lease(&self, run_id: &RunId) -> Option<(String, u64)> {
101 self.get_lease(run_id).cloned()
102 }
103
104 fn budget_allocation_exists(&self, task_id: TaskId, dimension: BudgetDimension) -> bool {
105 ReplayReducer::budget_allocation_exists(self, task_id, dimension)
106 }
107
108 fn subscription_exists(&self, subscription_id: SubscriptionId) -> bool {
109 ReplayReducer::subscription_exists(self, subscription_id)
110 }
111
112 fn is_subscription_canceled(&self, subscription_id: SubscriptionId) -> bool {
113 ReplayReducer::is_subscription_canceled(self, subscription_id)
114 }
115
116 fn apply_event(&mut self, event: &WalEvent) -> Result<(), Self::Error> {
117 self.apply(event)
118 }
119}
120
121#[derive(Debug)]
123pub struct StorageMutationAuthority<W: WalWriter, P: MutationProjection> {
124 wal_writer: W,
125 projection: P,
126}
127
128impl<W: WalWriter, P: MutationProjection> StorageMutationAuthority<W, P> {
129 pub fn new(wal_writer: W, projection: P) -> Self {
131 Self { wal_writer, projection }
132 }
133
134 pub fn projection(&self) -> &P {
136 &self.projection
137 }
138
139 pub fn projection_mut(&mut self) -> &mut P {
141 &mut self.projection
142 }
143
144 pub fn into_parts(self) -> (W, P) {
146 (self.wal_writer, self.projection)
147 }
148
149 fn validate_command(
150 &self,
151 command: &MutationCommand,
152 ) -> Result<ValidatedCommand, MutationValidationError> {
153 match command {
154 MutationCommand::TaskCreate(details) => {
155 self.validate_task_create(details)?;
156 Ok(ValidatedCommand::TaskCreate(details.clone()))
157 }
158 MutationCommand::RunCreate(details) => {
159 self.validate_run_create(details)?;
160 Ok(ValidatedCommand::RunCreate(details.clone()))
161 }
162 MutationCommand::RunStateTransition(details) => {
163 self.validate_run_state_transition(*details)?;
164 Ok(ValidatedCommand::RunStateTransition(*details))
165 }
166 MutationCommand::AttemptStart(details) => {
167 self.validate_attempt_start(*details)?;
168 Ok(ValidatedCommand::AttemptStart(*details))
169 }
170 MutationCommand::AttemptFinish(details) => {
171 self.validate_attempt_finish(details)?;
172 Ok(ValidatedCommand::AttemptFinish(details.clone()))
173 }
174 MutationCommand::LeaseAcquire(details) => {
175 self.validate_lease_acquire(details)?;
176 Ok(ValidatedCommand::LeaseAcquire(details.clone()))
177 }
178 MutationCommand::LeaseHeartbeat(details) => {
179 self.validate_lease_heartbeat(details)?;
180 Ok(ValidatedCommand::LeaseHeartbeat(details.clone()))
181 }
182 MutationCommand::LeaseExpire(details) => {
183 self.validate_lease_expire(details)?;
184 Ok(ValidatedCommand::LeaseExpire(details.clone()))
185 }
186 MutationCommand::LeaseRelease(details) => {
187 self.validate_lease_release(details)?;
188 Ok(ValidatedCommand::LeaseRelease(details.clone()))
189 }
190 MutationCommand::EnginePause(details) => {
191 self.validate_engine_pause(*details)?;
192 Ok(ValidatedCommand::EnginePause(*details))
193 }
194 MutationCommand::EngineResume(details) => {
195 self.validate_engine_resume(*details)?;
196 Ok(ValidatedCommand::EngineResume(*details))
197 }
198 MutationCommand::TaskCancel(details) => {
199 self.validate_task_cancel(*details)?;
200 Ok(ValidatedCommand::TaskCancel(*details))
201 }
202 MutationCommand::DependencyDeclare(details) => {
203 self.validate_dependency_declare(details)?;
204 Ok(ValidatedCommand::DependencyDeclare(details.clone()))
205 }
206 MutationCommand::RunSuspend(details) => {
207 self.validate_run_suspend(details)?;
208 Ok(ValidatedCommand::RunSuspend(details.clone()))
209 }
210 MutationCommand::RunResume(details) => {
211 self.validate_run_resume(details)?;
212 Ok(ValidatedCommand::RunResume(*details))
213 }
214 MutationCommand::BudgetAllocate(details) => {
215 self.validate_budget_allocate(details)?;
216 Ok(ValidatedCommand::BudgetAllocate(*details))
217 }
218 MutationCommand::BudgetConsume(details) => {
219 self.validate_budget_consume(details)?;
220 Ok(ValidatedCommand::BudgetConsume(*details))
221 }
222 MutationCommand::BudgetReplenish(details) => {
223 self.validate_budget_replenish(details)?;
224 Ok(ValidatedCommand::BudgetReplenish(*details))
225 }
226 MutationCommand::SubscriptionCreate(details) => {
227 self.validate_subscription_create(details)?;
228 Ok(ValidatedCommand::SubscriptionCreate(details.clone()))
229 }
230 MutationCommand::SubscriptionCancel(details) => {
231 self.validate_subscription_cancel(details)?;
232 Ok(ValidatedCommand::SubscriptionCancel(*details))
233 }
234 MutationCommand::SubscriptionTrigger(details) => {
235 self.validate_sequence(details.sequence())?;
236 Ok(ValidatedCommand::SubscriptionTrigger(*details))
237 }
238 MutationCommand::ActorRegister(details) => {
239 self.validate_sequence(details.sequence())?;
240 Ok(ValidatedCommand::ActorRegister(details.clone()))
241 }
242 MutationCommand::ActorDeregister(details) => {
243 self.validate_sequence(details.sequence())?;
244 Ok(ValidatedCommand::ActorDeregister(*details))
245 }
246 MutationCommand::ActorHeartbeat(details) => {
247 self.validate_sequence(details.sequence())?;
248 Ok(ValidatedCommand::ActorHeartbeat(*details))
249 }
250 MutationCommand::TenantCreate(details) => {
251 self.validate_sequence(details.sequence())?;
252 Ok(ValidatedCommand::TenantCreate(details.clone()))
253 }
254 MutationCommand::RoleAssign(details) => {
255 self.validate_sequence(details.sequence())?;
256 Ok(ValidatedCommand::RoleAssign(details.clone()))
257 }
258 MutationCommand::CapabilityGrant(details) => {
259 self.validate_sequence(details.sequence())?;
260 Ok(ValidatedCommand::CapabilityGrant(details.clone()))
261 }
262 MutationCommand::CapabilityRevoke(details) => {
263 self.validate_sequence(details.sequence())?;
264 Ok(ValidatedCommand::CapabilityRevoke(details.clone()))
265 }
266 MutationCommand::LedgerAppend(details) => {
267 self.validate_sequence(details.sequence())?;
268 Ok(ValidatedCommand::LedgerAppend(details.clone()))
269 }
270 }
271 }
272
273 fn validate_sequence(&self, provided: u64) -> Result<(), MutationValidationError> {
274 let expected_sequence = self
275 .projection
276 .latest_sequence()
277 .checked_add(1)
278 .ok_or(MutationValidationError::SequenceOverflow)?;
279
280 if provided != expected_sequence {
281 tracing::warn!(
282 expected = expected_sequence,
283 provided,
284 "sequence monotonicity violation"
285 );
286 return Err(MutationValidationError::NonMonotonicSequence {
287 expected: expected_sequence,
288 provided,
289 });
290 }
291
292 Ok(())
293 }
294
295 fn validate_task_create(
296 &self,
297 command: &TaskCreateCommand,
298 ) -> Result<(), MutationValidationError> {
299 self.validate_sequence(command.sequence())?;
300
301 if self.projection.task_exists(command.task_spec().id()) {
302 return Err(MutationValidationError::TaskAlreadyExists {
303 task_id: command.task_spec().id(),
304 });
305 }
306
307 Ok(())
308 }
309
310 fn validate_run_create(
311 &self,
312 command: &RunCreateCommand,
313 ) -> Result<(), MutationValidationError> {
314 self.validate_sequence(command.sequence())?;
315
316 if !self.projection.task_exists(command.run_instance().task_id()) {
317 return Err(MutationValidationError::UnknownTask {
318 task_id: command.run_instance().task_id(),
319 });
320 }
321
322 if self.projection.run_state(&command.run_instance().id()).is_some() {
323 return Err(MutationValidationError::RunAlreadyExists {
324 run_id: command.run_instance().id(),
325 });
326 }
327
328 if command.run_instance().state() != RunState::Scheduled {
329 return Err(MutationValidationError::RunCreateRequiresScheduled {
330 run_id: command.run_instance().id(),
331 state: command.run_instance().state(),
332 });
333 }
334
335 Ok(())
336 }
337
338 fn validate_run_state_transition(
339 &self,
340 command: RunStateTransitionCommand,
341 ) -> Result<(), MutationValidationError> {
342 self.validate_sequence(command.sequence())?;
343
344 let observed_state = self
345 .projection
346 .run_state(&command.run_id())
347 .ok_or(MutationValidationError::UnknownRun { run_id: command.run_id() })?;
348
349 if observed_state != command.previous_state() {
350 return Err(MutationValidationError::PreviousStateMismatch {
351 run_id: command.run_id(),
352 expected: command.previous_state(),
353 actual: observed_state,
354 });
355 }
356
357 if !is_valid_transition(command.previous_state(), command.new_state()) {
358 return Err(MutationValidationError::InvalidTransition {
359 run_id: command.run_id(),
360 from: command.previous_state(),
361 to: command.new_state(),
362 });
363 }
364
365 Ok(())
366 }
367
368 fn validate_attempt_start(
369 &self,
370 command: AttemptStartCommand,
371 ) -> Result<(), MutationValidationError> {
372 self.validate_sequence(command.sequence())?;
373
374 let observed_state = self
375 .projection
376 .run_state(&command.run_id())
377 .ok_or(MutationValidationError::AttemptStartUnknownRun { run_id: command.run_id() })?;
378
379 if observed_state != RunState::Running {
380 return Err(MutationValidationError::AttemptStartRequiresRunning {
381 run_id: command.run_id(),
382 state: observed_state,
383 });
384 }
385
386 if let Some(active_attempt_id) = self.projection.active_attempt_id(&command.run_id()) {
387 return Err(MutationValidationError::AttemptStartAlreadyActive {
388 run_id: command.run_id(),
389 active_attempt_id,
390 });
391 }
392
393 Ok(())
394 }
395
396 fn validate_attempt_finish(
397 &self,
398 command: &AttemptFinishCommand,
399 ) -> Result<(), MutationValidationError> {
400 self.validate_sequence(command.sequence())?;
401
402 let observed_state = self
403 .projection
404 .run_state(&command.run_id())
405 .ok_or(MutationValidationError::AttemptFinishUnknownRun { run_id: command.run_id() })?;
406
407 if observed_state != RunState::Running {
408 return Err(MutationValidationError::AttemptFinishRequiresRunning {
409 run_id: command.run_id(),
410 state: observed_state,
411 });
412 }
413
414 let active_attempt_id = self.projection.active_attempt_id(&command.run_id()).ok_or(
415 MutationValidationError::AttemptFinishMissingActive { run_id: command.run_id() },
416 )?;
417
418 if active_attempt_id != command.attempt_id() {
419 return Err(MutationValidationError::AttemptFinishAttemptMismatch {
420 run_id: command.run_id(),
421 expected_attempt_id: active_attempt_id,
422 provided_attempt_id: command.attempt_id(),
423 });
424 }
425
426 Ok(())
427 }
428
429 fn validate_lease_acquire(
430 &self,
431 command: &LeaseAcquireCommand,
432 ) -> Result<(), MutationValidationError> {
433 self.validate_sequence(command.sequence())?;
434
435 let observed_state = self.projection.run_state(&command.run_id()).ok_or(
436 MutationValidationError::LeaseUnknownRun {
437 run_id: command.run_id(),
438 event: LeaseValidationEvent::Acquire,
439 },
440 )?;
441
442 if !matches!(observed_state, RunState::Ready | RunState::Leased) {
443 return Err(MutationValidationError::LeaseInvalidRunState {
444 run_id: command.run_id(),
445 event: LeaseValidationEvent::Acquire,
446 state: observed_state,
447 });
448 }
449
450 if self.projection.active_lease(&command.run_id()).is_some() {
451 return Err(MutationValidationError::LeaseAlreadyActive { run_id: command.run_id() });
452 }
453
454 Ok(())
455 }
456
457 fn validate_lease_heartbeat(
458 &self,
459 command: &LeaseHeartbeatCommand,
460 ) -> Result<(), MutationValidationError> {
461 self.validate_sequence(command.sequence())?;
462
463 let observed_state = self.projection.run_state(&command.run_id()).ok_or(
464 MutationValidationError::LeaseUnknownRun {
465 run_id: command.run_id(),
466 event: LeaseValidationEvent::Heartbeat,
467 },
468 )?;
469
470 if !matches!(observed_state, RunState::Leased | RunState::Running) {
471 return Err(MutationValidationError::LeaseInvalidRunState {
472 run_id: command.run_id(),
473 event: LeaseValidationEvent::Heartbeat,
474 state: observed_state,
475 });
476 }
477
478 let (active_owner, active_expiry) = self.projection.active_lease(&command.run_id()).ok_or(
479 MutationValidationError::LeaseMissingActive {
480 run_id: command.run_id(),
481 event: LeaseValidationEvent::Heartbeat,
482 },
483 )?;
484
485 if active_owner != command.owner() {
486 return Err(MutationValidationError::LeaseOwnerMismatch {
487 run_id: command.run_id(),
488 event: LeaseValidationEvent::Heartbeat,
489 });
490 }
491
492 if command.expiry() < active_expiry {
493 return Err(MutationValidationError::LeaseHeartbeatExpiryRegression {
494 run_id: command.run_id(),
495 previous_expiry: active_expiry,
496 proposed_expiry: command.expiry(),
497 });
498 }
499
500 Ok(())
501 }
502
503 fn validate_lease_expire(
504 &self,
505 command: &LeaseExpireCommand,
506 ) -> Result<(), MutationValidationError> {
507 self.validate_lease_close(LeaseCloseParams {
508 sequence: command.sequence(),
509 run_id: command.run_id(),
510 owner: command.owner(),
511 expiry: command.expiry(),
512 event: LeaseValidationEvent::Expire,
513 })
514 }
515
516 fn validate_lease_release(
517 &self,
518 command: &LeaseReleaseCommand,
519 ) -> Result<(), MutationValidationError> {
520 self.validate_lease_close(LeaseCloseParams {
521 sequence: command.sequence(),
522 run_id: command.run_id(),
523 owner: command.owner(),
524 expiry: command.expiry(),
525 event: LeaseValidationEvent::Release,
526 })
527 }
528
529 fn validate_lease_close(
530 &self,
531 params: LeaseCloseParams<'_>,
532 ) -> Result<(), MutationValidationError> {
533 let LeaseCloseParams { sequence, run_id, owner, expiry, event } = params;
534 self.validate_sequence(sequence)?;
535
536 let observed_state = self
537 .projection
538 .run_state(&run_id)
539 .ok_or(MutationValidationError::LeaseUnknownRun { run_id, event })?;
540
541 if !matches!(observed_state, RunState::Ready | RunState::Leased | RunState::Running) {
542 return Err(MutationValidationError::LeaseInvalidRunState {
543 run_id,
544 event,
545 state: observed_state,
546 });
547 }
548
549 let (active_owner, active_expiry) = self
550 .projection
551 .active_lease(&run_id)
552 .ok_or(MutationValidationError::LeaseMissingActive { run_id, event })?;
553
554 if active_owner != owner {
555 return Err(MutationValidationError::LeaseOwnerMismatch { run_id, event });
556 }
557
558 if active_expiry != expiry {
559 return Err(MutationValidationError::LeaseExpiryMismatch {
560 run_id,
561 event,
562 expected_expiry: active_expiry,
563 provided_expiry: expiry,
564 });
565 }
566
567 Ok(())
568 }
569
570 fn validate_task_cancel(
571 &self,
572 command: TaskCancelCommand,
573 ) -> Result<(), MutationValidationError> {
574 self.validate_sequence(command.sequence())?;
575
576 if !self.projection.task_exists(command.task_id()) {
577 return Err(MutationValidationError::UnknownTask { task_id: command.task_id() });
578 }
579
580 if self.projection.is_task_canceled(command.task_id()) {
581 return Err(MutationValidationError::TaskAlreadyCanceled {
582 task_id: command.task_id(),
583 });
584 }
585
586 Ok(())
587 }
588
589 fn validate_dependency_declare(
590 &self,
591 command: &DependencyDeclareCommand,
592 ) -> Result<(), MutationValidationError> {
593 self.validate_sequence(command.sequence())?;
594
595 if command.depends_on().is_empty() {
597 return Err(MutationValidationError::EmptyDependencyList {
598 task_id: command.task_id(),
599 });
600 }
601
602 if !self.projection.task_exists(command.task_id()) {
604 return Err(MutationValidationError::UnknownTask { task_id: command.task_id() });
605 }
606
607 for &prereq in command.depends_on() {
609 if !self.projection.task_exists(prereq) {
610 return Err(MutationValidationError::UnknownTask { task_id: prereq });
611 }
612 }
613
614 Ok(())
615 }
616
617 fn validate_engine_pause(
618 &self,
619 command: EnginePauseCommand,
620 ) -> Result<(), MutationValidationError> {
621 self.validate_sequence(command.sequence())?;
622
623 if self.projection.is_engine_paused() {
624 return Err(MutationValidationError::EngineAlreadyPaused);
625 }
626
627 Ok(())
628 }
629
630 fn validate_engine_resume(
631 &self,
632 command: EngineResumeCommand,
633 ) -> Result<(), MutationValidationError> {
634 self.validate_sequence(command.sequence())?;
635
636 if !self.projection.is_engine_paused() {
637 return Err(MutationValidationError::EngineNotPaused);
638 }
639
640 Ok(())
641 }
642
643 fn validate_run_suspend(
644 &self,
645 command: &RunSuspendCommand,
646 ) -> Result<(), MutationValidationError> {
647 self.validate_sequence(command.sequence())?;
648 let state = self
649 .projection
650 .run_state(&command.run_id())
651 .ok_or(MutationValidationError::UnknownRun { run_id: command.run_id() })?;
652 if state != RunState::Running {
653 return Err(MutationValidationError::RunSuspendRequiresRunning {
654 run_id: command.run_id(),
655 state,
656 });
657 }
658 Ok(())
659 }
660
661 fn validate_run_resume(
662 &self,
663 command: &RunResumeCommand,
664 ) -> Result<(), MutationValidationError> {
665 self.validate_sequence(command.sequence())?;
666 let state = self
667 .projection
668 .run_state(&command.run_id())
669 .ok_or(MutationValidationError::UnknownRun { run_id: command.run_id() })?;
670 if state != RunState::Suspended {
671 return Err(MutationValidationError::RunResumeRequiresSuspended {
672 run_id: command.run_id(),
673 state,
674 });
675 }
676 Ok(())
677 }
678
679 fn validate_budget_allocate(
680 &self,
681 command: &BudgetAllocateCommand,
682 ) -> Result<(), MutationValidationError> {
683 self.validate_sequence(command.sequence())?;
684 if !self.projection.task_exists(command.task_id()) {
685 return Err(MutationValidationError::UnknownTask { task_id: command.task_id() });
686 }
687 Ok(())
688 }
689
690 fn validate_budget_consume(
691 &self,
692 command: &BudgetConsumeCommand,
693 ) -> Result<(), MutationValidationError> {
694 self.validate_sequence(command.sequence())?;
695 if !self.projection.task_exists(command.task_id()) {
696 return Err(MutationValidationError::UnknownTask { task_id: command.task_id() });
697 }
698 Ok(())
699 }
700
701 fn validate_budget_replenish(
702 &self,
703 command: &BudgetReplenishCommand,
704 ) -> Result<(), MutationValidationError> {
705 self.validate_sequence(command.sequence())?;
706 if !self.projection.task_exists(command.task_id()) {
707 return Err(MutationValidationError::UnknownTask { task_id: command.task_id() });
708 }
709 if !self.projection.budget_allocation_exists(command.task_id(), command.dimension()) {
710 return Err(MutationValidationError::BudgetNotAllocated {
711 task_id: command.task_id(),
712 dimension: command.dimension(),
713 });
714 }
715 Ok(())
716 }
717
718 fn validate_subscription_create(
719 &self,
720 command: &SubscriptionCreateCommand,
721 ) -> Result<(), MutationValidationError> {
722 self.validate_sequence(command.sequence())?;
723 if !self.projection.task_exists(command.task_id()) {
724 return Err(MutationValidationError::UnknownTask { task_id: command.task_id() });
725 }
726 if self.projection.subscription_exists(command.subscription_id()) {
727 return Err(MutationValidationError::SubscriptionAlreadyExists {
728 subscription_id: command.subscription_id(),
729 });
730 }
731 Ok(())
732 }
733
734 fn validate_subscription_cancel(
735 &self,
736 command: &SubscriptionCancelCommand,
737 ) -> Result<(), MutationValidationError> {
738 self.validate_sequence(command.sequence())?;
739 if !self.projection.subscription_exists(command.subscription_id()) {
740 return Err(MutationValidationError::UnknownSubscription {
741 subscription_id: command.subscription_id(),
742 });
743 }
744 if self.projection.is_subscription_canceled(command.subscription_id()) {
745 return Err(MutationValidationError::SubscriptionAlreadyCanceled {
746 subscription_id: command.subscription_id(),
747 });
748 }
749 Ok(())
750 }
751
752 fn build_event_and_applied(validated: ValidatedCommand) -> (WalEvent, AppliedMutation) {
753 match validated {
754 ValidatedCommand::TaskCreate(command) => {
755 let task_id = command.task_spec().id();
756 let event = WalEvent::new(
757 command.sequence(),
758 WalEventType::TaskCreated {
759 task_spec: command.task_spec().clone(),
760 timestamp: command.timestamp(),
761 },
762 );
763 let applied = AppliedMutation::TaskCreate { task_id };
764 (event, applied)
765 }
766 ValidatedCommand::RunCreate(command) => {
767 let run_id = command.run_instance().id();
768 let task_id = command.run_instance().task_id();
769 let event = WalEvent::new(
770 command.sequence(),
771 WalEventType::RunCreated { run_instance: command.run_instance().clone() },
772 );
773 let applied = AppliedMutation::RunCreate { run_id, task_id };
774 (event, applied)
775 }
776 ValidatedCommand::RunStateTransition(command) => {
777 let event = WalEvent::new(
778 command.sequence(),
779 WalEventType::RunStateChanged {
780 run_id: command.run_id(),
781 previous_state: command.previous_state(),
782 new_state: command.new_state(),
783 timestamp: command.timestamp(),
784 },
785 );
786 let applied = AppliedMutation::RunStateTransition {
787 run_id: command.run_id(),
788 previous_state: command.previous_state(),
789 new_state: command.new_state(),
790 };
791 (event, applied)
792 }
793 ValidatedCommand::AttemptStart(command) => {
794 let event = WalEvent::new(
795 command.sequence(),
796 WalEventType::AttemptStarted {
797 run_id: command.run_id(),
798 attempt_id: command.attempt_id(),
799 timestamp: command.timestamp(),
800 },
801 );
802 let applied = AppliedMutation::AttemptStart {
803 run_id: command.run_id(),
804 attempt_id: command.attempt_id(),
805 };
806 (event, applied)
807 }
808 ValidatedCommand::AttemptFinish(command) => {
809 let event = WalEvent::new(
810 command.sequence(),
811 WalEventType::AttemptFinished {
812 run_id: command.run_id(),
813 attempt_id: command.attempt_id(),
814 result: command.result(),
815 error: command.error().map(|s| s.to_string()),
816 output: command.output().map(|b| b.to_vec()),
817 timestamp: command.timestamp(),
818 },
819 );
820 let applied = AppliedMutation::AttemptFinish {
821 run_id: command.run_id(),
822 attempt_id: command.attempt_id(),
823 outcome: command.outcome().clone(),
824 };
825 (event, applied)
826 }
827 ValidatedCommand::LeaseAcquire(command) => {
828 let event = WalEvent::new(
829 command.sequence(),
830 WalEventType::LeaseAcquired {
831 run_id: command.run_id(),
832 owner: command.owner().to_string(),
833 expiry: command.expiry(),
834 timestamp: command.timestamp(),
835 },
836 );
837 let applied = AppliedMutation::LeaseAcquire {
838 run_id: command.run_id(),
839 owner: command.owner().to_string(),
840 expiry: command.expiry(),
841 };
842 (event, applied)
843 }
844 ValidatedCommand::LeaseHeartbeat(command) => {
845 let event = WalEvent::new(
846 command.sequence(),
847 WalEventType::LeaseHeartbeat {
848 run_id: command.run_id(),
849 owner: command.owner().to_string(),
850 expiry: command.expiry(),
851 timestamp: command.timestamp(),
852 },
853 );
854 let applied = AppliedMutation::LeaseHeartbeat {
855 run_id: command.run_id(),
856 owner: command.owner().to_string(),
857 expiry: command.expiry(),
858 };
859 (event, applied)
860 }
861 ValidatedCommand::LeaseExpire(command) => {
862 let event = WalEvent::new(
863 command.sequence(),
864 WalEventType::LeaseExpired {
865 run_id: command.run_id(),
866 owner: command.owner().to_string(),
867 expiry: command.expiry(),
868 timestamp: command.timestamp(),
869 },
870 );
871 let applied = AppliedMutation::LeaseExpire {
872 run_id: command.run_id(),
873 owner: command.owner().to_string(),
874 expiry: command.expiry(),
875 };
876 (event, applied)
877 }
878 ValidatedCommand::LeaseRelease(command) => {
879 let event = WalEvent::new(
880 command.sequence(),
881 WalEventType::LeaseReleased {
882 run_id: command.run_id(),
883 owner: command.owner().to_string(),
884 expiry: command.expiry(),
885 timestamp: command.timestamp(),
886 },
887 );
888 let applied = AppliedMutation::LeaseRelease {
889 run_id: command.run_id(),
890 owner: command.owner().to_string(),
891 expiry: command.expiry(),
892 };
893 (event, applied)
894 }
895 ValidatedCommand::EnginePause(command) => {
896 let event = WalEvent::new(
897 command.sequence(),
898 WalEventType::EnginePaused { timestamp: command.timestamp() },
899 );
900 (event, AppliedMutation::EnginePause)
901 }
902 ValidatedCommand::EngineResume(command) => {
903 let event = WalEvent::new(
904 command.sequence(),
905 WalEventType::EngineResumed { timestamp: command.timestamp() },
906 );
907 (event, AppliedMutation::EngineResume)
908 }
909 ValidatedCommand::TaskCancel(command) => {
910 let event = WalEvent::new(
911 command.sequence(),
912 WalEventType::TaskCanceled {
913 task_id: command.task_id(),
914 timestamp: command.timestamp(),
915 },
916 );
917 let applied = AppliedMutation::TaskCancel { task_id: command.task_id() };
918 (event, applied)
919 }
920 ValidatedCommand::DependencyDeclare(command) => {
921 let depends_on = command.depends_on().to_vec();
922 let event = WalEvent::new(
923 command.sequence(),
924 WalEventType::DependencyDeclared {
925 task_id: command.task_id(),
926 depends_on: depends_on.clone(),
927 timestamp: command.timestamp(),
928 },
929 );
930 let applied =
931 AppliedMutation::DependencyDeclare { task_id: command.task_id(), depends_on };
932 (event, applied)
933 }
934 ValidatedCommand::RunSuspend(command) => {
935 let event = WalEvent::new(
936 command.sequence(),
937 WalEventType::RunSuspended {
938 run_id: command.run_id(),
939 reason: command.reason().map(|s| s.to_string()),
940 timestamp: command.timestamp(),
941 },
942 );
943 (event, AppliedMutation::RunSuspend { run_id: command.run_id() })
944 }
945 ValidatedCommand::RunResume(command) => {
946 let event = WalEvent::new(
947 command.sequence(),
948 WalEventType::RunResumed {
949 run_id: command.run_id(),
950 timestamp: command.timestamp(),
951 },
952 );
953 (event, AppliedMutation::RunResume { run_id: command.run_id() })
954 }
955 ValidatedCommand::BudgetAllocate(command) => {
956 let event = WalEvent::new(
957 command.sequence(),
958 WalEventType::BudgetAllocated {
959 task_id: command.task_id(),
960 dimension: command.dimension(),
961 limit: command.limit(),
962 timestamp: command.timestamp(),
963 },
964 );
965 let applied = AppliedMutation::BudgetAllocate {
966 task_id: command.task_id(),
967 dimension: command.dimension(),
968 limit: command.limit(),
969 };
970 (event, applied)
971 }
972 ValidatedCommand::BudgetConsume(command) => {
973 let event = WalEvent::new(
974 command.sequence(),
975 WalEventType::BudgetConsumed {
976 task_id: command.task_id(),
977 dimension: command.dimension(),
978 amount: command.amount(),
979 timestamp: command.timestamp(),
980 },
981 );
982 let applied = AppliedMutation::BudgetConsume {
983 task_id: command.task_id(),
984 dimension: command.dimension(),
985 amount: command.amount(),
986 };
987 (event, applied)
988 }
989 ValidatedCommand::BudgetReplenish(command) => {
990 let event = WalEvent::new(
991 command.sequence(),
992 WalEventType::BudgetReplenished {
993 task_id: command.task_id(),
994 dimension: command.dimension(),
995 new_limit: command.new_limit(),
996 timestamp: command.timestamp(),
997 },
998 );
999 let applied = AppliedMutation::BudgetReplenish {
1000 task_id: command.task_id(),
1001 dimension: command.dimension(),
1002 new_limit: command.new_limit(),
1003 };
1004 (event, applied)
1005 }
1006 ValidatedCommand::SubscriptionCreate(command) => {
1007 let event = WalEvent::new(
1008 command.sequence(),
1009 WalEventType::SubscriptionCreated {
1010 subscription_id: command.subscription_id(),
1011 task_id: command.task_id(),
1012 filter: command.filter().clone(),
1013 timestamp: command.timestamp(),
1014 },
1015 );
1016 let applied = AppliedMutation::SubscriptionCreate {
1017 subscription_id: command.subscription_id(),
1018 task_id: command.task_id(),
1019 };
1020 (event, applied)
1021 }
1022 ValidatedCommand::SubscriptionCancel(command) => {
1023 let event = WalEvent::new(
1024 command.sequence(),
1025 WalEventType::SubscriptionCanceled {
1026 subscription_id: command.subscription_id(),
1027 timestamp: command.timestamp(),
1028 },
1029 );
1030 (
1031 event,
1032 AppliedMutation::SubscriptionCancel {
1033 subscription_id: command.subscription_id(),
1034 },
1035 )
1036 }
1037 ValidatedCommand::SubscriptionTrigger(command) => {
1038 let event = WalEvent::new(
1039 command.sequence(),
1040 WalEventType::SubscriptionTriggered {
1041 subscription_id: command.subscription_id(),
1042 timestamp: command.timestamp(),
1043 },
1044 );
1045 (
1046 event,
1047 AppliedMutation::SubscriptionTrigger {
1048 subscription_id: command.subscription_id(),
1049 },
1050 )
1051 }
1052 ValidatedCommand::ActorRegister(command) => {
1053 let reg = command.registration();
1054 let event = WalEvent::new(
1055 command.sequence(),
1056 WalEventType::ActorRegistered {
1057 actor_id: reg.actor_id(),
1058 identity: reg.identity().to_string(),
1059 capabilities: reg.capabilities().as_slice().to_vec(),
1060 department: reg.department().map(|d| d.as_str().to_string()),
1061 heartbeat_interval_secs: reg.heartbeat_interval_secs(),
1062 tenant_id: reg.tenant_id(),
1063 timestamp: command.timestamp(),
1064 },
1065 );
1066 (event, AppliedMutation::NoOp)
1067 }
1068 ValidatedCommand::ActorDeregister(command) => {
1069 let event = WalEvent::new(
1070 command.sequence(),
1071 WalEventType::ActorDeregistered {
1072 actor_id: command.actor_id(),
1073 timestamp: command.timestamp(),
1074 },
1075 );
1076 (event, AppliedMutation::NoOp)
1077 }
1078 ValidatedCommand::ActorHeartbeat(command) => {
1079 let event = WalEvent::new(
1080 command.sequence(),
1081 WalEventType::ActorHeartbeat {
1082 actor_id: command.actor_id(),
1083 timestamp: command.timestamp(),
1084 },
1085 );
1086 (event, AppliedMutation::NoOp)
1087 }
1088 ValidatedCommand::TenantCreate(command) => {
1089 let reg = command.registration();
1090 let event = WalEvent::new(
1091 command.sequence(),
1092 WalEventType::TenantCreated {
1093 tenant_id: reg.tenant_id(),
1094 name: reg.name().to_string(),
1095 timestamp: command.timestamp(),
1096 },
1097 );
1098 (event, AppliedMutation::NoOp)
1099 }
1100 ValidatedCommand::RoleAssign(command) => {
1101 let event = WalEvent::new(
1102 command.sequence(),
1103 WalEventType::RoleAssigned {
1104 actor_id: command.actor_id(),
1105 role: command.role().clone(),
1106 tenant_id: command.tenant_id(),
1107 timestamp: command.timestamp(),
1108 },
1109 );
1110 (event, AppliedMutation::NoOp)
1111 }
1112 ValidatedCommand::CapabilityGrant(command) => {
1113 let event = WalEvent::new(
1114 command.sequence(),
1115 WalEventType::CapabilityGranted {
1116 actor_id: command.actor_id(),
1117 capability: command.capability().clone(),
1118 tenant_id: command.tenant_id(),
1119 timestamp: command.timestamp(),
1120 },
1121 );
1122 (event, AppliedMutation::NoOp)
1123 }
1124 ValidatedCommand::CapabilityRevoke(command) => {
1125 let event = WalEvent::new(
1126 command.sequence(),
1127 WalEventType::CapabilityRevoked {
1128 actor_id: command.actor_id(),
1129 capability: command.capability().clone(),
1130 tenant_id: command.tenant_id(),
1131 timestamp: command.timestamp(),
1132 },
1133 );
1134 (event, AppliedMutation::NoOp)
1135 }
1136 ValidatedCommand::LedgerAppend(command) => {
1137 let entry = command.entry();
1138 let event = WalEvent::new(
1139 command.sequence(),
1140 WalEventType::LedgerEntryAppended {
1141 entry_id: entry.entry_id(),
1142 tenant_id: entry.tenant_id(),
1143 ledger_key: entry.ledger_key().to_string(),
1144 actor_id: entry.actor_id(),
1145 payload: entry.payload().to_vec(),
1146 timestamp: command.timestamp(),
1147 },
1148 );
1149 (event, AppliedMutation::NoOp)
1150 }
1151 }
1152 }
1153}
1154
1155impl<W: WalWriter, P: MutationProjection> MutationAuthority for StorageMutationAuthority<W, P> {
1156 type Error = MutationAuthorityError<P::Error>;
1157
1158 fn submit_command(
1159 &mut self,
1160 command: MutationCommand,
1161 durability: DurabilityPolicy,
1162 ) -> Result<MutationOutcome, Self::Error> {
1163 let validated =
1165 self.validate_command(&command).map_err(MutationAuthorityError::Validation)?;
1166
1167 let (event, applied) = Self::build_event_and_applied(validated);
1169
1170 self.wal_writer.append(&event).map_err(MutationAuthorityError::Append)?;
1172
1173 if durability == DurabilityPolicy::Immediate {
1175 if let Err(flush_error) = self.wal_writer.flush() {
1176 return Err(MutationAuthorityError::PartialDurability {
1177 sequence: event.sequence(),
1178 flush_error,
1179 });
1180 }
1181 }
1182
1183 self.projection.apply_event(&event).map_err(|source| MutationAuthorityError::Apply {
1185 sequence: event.sequence(),
1186 source,
1187 })?;
1188
1189 tracing::debug!(sequence = event.sequence(), "command submitted");
1190 Ok(MutationOutcome::new(event.sequence(), applied))
1191 }
1192}
1193
/// A mutation command that has passed validation, tagged by command kind.
///
/// `submit_command` obtains a value of this type from `validate_command` and hands it to
/// `build_event_and_applied`, which derives the WAL event and the applied-mutation record.
#[derive(Debug, Clone, PartialEq, Eq)]
enum ValidatedCommand {
    // Task and run creation / state transitions.
    TaskCreate(TaskCreateCommand),
    RunCreate(RunCreateCommand),
    RunStateTransition(RunStateTransitionCommand),
    // Attempt lifecycle.
    AttemptStart(AttemptStartCommand),
    AttemptFinish(AttemptFinishCommand),
    // Lease lifecycle.
    LeaseAcquire(LeaseAcquireCommand),
    LeaseHeartbeat(LeaseHeartbeatCommand),
    LeaseExpire(LeaseExpireCommand),
    LeaseRelease(LeaseReleaseCommand),
    // Engine-wide pause/resume.
    EnginePause(EnginePauseCommand),
    EngineResume(EngineResumeCommand),
    // Task cancellation and dependency declaration.
    TaskCancel(TaskCancelCommand),
    DependencyDeclare(DependencyDeclareCommand),
    // Run suspension.
    RunSuspend(RunSuspendCommand),
    RunResume(RunResumeCommand),
    // Budget accounting.
    BudgetAllocate(BudgetAllocateCommand),
    BudgetConsume(BudgetConsumeCommand),
    BudgetReplenish(BudgetReplenishCommand),
    // Subscriptions.
    SubscriptionCreate(SubscriptionCreateCommand),
    SubscriptionCancel(SubscriptionCancelCommand),
    SubscriptionTrigger(SubscriptionTriggerCommand),
    // Actors, tenants, roles and capabilities.
    ActorRegister(ActorRegisterCommand),
    ActorDeregister(ActorDeregisterCommand),
    ActorHeartbeat(ActorHeartbeatCommand),
    TenantCreate(TenantCreateCommand),
    RoleAssign(RoleAssignCommand),
    CapabilityGrant(CapabilityGrantCommand),
    CapabilityRevoke(CapabilityRevokeCommand),
    // Ledger.
    LedgerAppend(LedgerAppendCommand),
}
1226
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Identifies which lease operation a lease-related [`MutationValidationError`] refers to.
///
/// Its `Display` form ("lease acquire", "lease heartbeat", ...) is used as the prefix of the
/// corresponding rejection messages.
pub enum LeaseValidationEvent {
    /// A lease acquire command.
    Acquire,
    /// A lease heartbeat command.
    Heartbeat,
    /// A lease expire command.
    Expire,
    /// A lease release command.
    Release,
}
1239
/// Borrowed parameter bundle for lease validation.
///
/// NOTE(review): the call sites are outside this view; judging by the name this is presumably
/// shared by the lease-closing paths (expire / release) — confirm against the validator.
struct LeaseCloseParams<'a> {
    /// Sequence number carried by the command.
    sequence: u64,
    /// Run whose lease is being operated on.
    run_id: RunId,
    /// Lease owner supplied by the command.
    owner: &'a str,
    /// Lease expiry supplied by the command.
    expiry: u64,
    /// Which lease operation is being validated (used to label errors).
    event: LeaseValidationEvent,
}
1249
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Reasons a mutation command is rejected during validation, before anything is written.
///
/// The `Display` implementation renders each variant as a human-readable rejection message.
pub enum MutationValidationError {
    /// Computing the next expected sequence number overflowed.
    SequenceOverflow,
    /// The command's sequence is not the next expected one.
    NonMonotonicSequence {
        /// Sequence the authority expected next.
        expected: u64,
        /// Sequence the command carried.
        provided: u64,
    },
    /// The referenced run is unknown.
    UnknownRun {
        /// Run named by the command.
        run_id: RunId,
    },
    /// The referenced task is unknown.
    UnknownTask {
        /// Task named by the command.
        task_id: TaskId,
    },
    /// A task with this identifier already exists.
    TaskAlreadyExists {
        /// Conflicting task identifier.
        task_id: TaskId,
    },
    /// The task has already been canceled.
    TaskAlreadyCanceled {
        /// Task named by the command.
        task_id: TaskId,
    },
    /// The engine is already paused.
    EngineAlreadyPaused,
    /// The engine is not paused.
    EngineNotPaused,
    /// A run with this identifier already exists.
    RunAlreadyExists {
        /// Conflicting run identifier.
        run_id: RunId,
    },
    /// Run creation requires the run to be in the `Scheduled` state.
    RunCreateRequiresScheduled {
        /// Run named by the command.
        run_id: RunId,
        /// State the run was actually in.
        state: RunState,
    },
    /// The `previous_state` in a transition does not match the run's state.
    PreviousStateMismatch {
        /// Run named by the command.
        run_id: RunId,
        /// State reported as `expected` in the rejection message.
        expected: RunState,
        /// State reported as `actual` in the rejection message.
        actual: RunState,
    },
    /// The requested state transition is not valid.
    InvalidTransition {
        /// Run named by the command.
        run_id: RunId,
        /// Source state of the rejected transition.
        from: RunState,
        /// Target state of the rejected transition.
        to: RunState,
    },
    /// Attempt start referenced an unknown run.
    AttemptStartUnknownRun {
        /// Run named by the command.
        run_id: RunId,
    },
    /// Attempt start requires the run to be `Running`.
    AttemptStartRequiresRunning {
        /// Run named by the command.
        run_id: RunId,
        /// State observed for the run.
        state: RunState,
    },
    /// Attempt start was rejected because an attempt is already active.
    AttemptStartAlreadyActive {
        /// Run named by the command.
        run_id: RunId,
        /// Attempt that is currently active.
        active_attempt_id: AttemptId,
    },
    /// Attempt finish referenced an unknown run.
    AttemptFinishUnknownRun {
        /// Run named by the command.
        run_id: RunId,
    },
    /// Attempt finish requires the run to be `Running`.
    AttemptFinishRequiresRunning {
        /// Run named by the command.
        run_id: RunId,
        /// State observed for the run.
        state: RunState,
    },
    /// Attempt finish was rejected because no attempt is active.
    AttemptFinishMissingActive {
        /// Run named by the command.
        run_id: RunId,
    },
    /// Attempt finish named a different attempt than the active one.
    AttemptFinishAttemptMismatch {
        /// Run named by the command.
        run_id: RunId,
        /// Attempt that was expected.
        expected_attempt_id: AttemptId,
        /// Attempt the command provided.
        provided_attempt_id: AttemptId,
    },
    /// A lease operation referenced an unknown run.
    LeaseUnknownRun {
        /// Run named by the command.
        run_id: RunId,
        /// Lease operation that was rejected.
        event: LeaseValidationEvent,
    },
    /// A lease operation was rejected because of the run's state.
    LeaseInvalidRunState {
        /// Run named by the command.
        run_id: RunId,
        /// Lease operation that was rejected.
        event: LeaseValidationEvent,
        /// State the run was in.
        state: RunState,
    },
    /// Lease acquire was rejected because a lease is already active.
    LeaseAlreadyActive {
        /// Run named by the command.
        run_id: RunId,
    },
    /// A lease operation was rejected because no lease is active.
    LeaseMissingActive {
        /// Run named by the command.
        run_id: RunId,
        /// Lease operation that was rejected.
        event: LeaseValidationEvent,
    },
    /// A lease operation was rejected because of an owner mismatch.
    LeaseOwnerMismatch {
        /// Run named by the command.
        run_id: RunId,
        /// Lease operation that was rejected.
        event: LeaseValidationEvent,
    },
    /// A lease operation was rejected because of an expiry mismatch.
    LeaseExpiryMismatch {
        /// Run named by the command.
        run_id: RunId,
        /// Lease operation that was rejected.
        event: LeaseValidationEvent,
        /// Expiry that was expected.
        expected_expiry: u64,
        /// Expiry the command provided.
        provided_expiry: u64,
    },
    /// A lease heartbeat was rejected as an expiry regression.
    LeaseHeartbeatExpiryRegression {
        /// Run named by the command.
        run_id: RunId,
        /// Expiry of the existing lease.
        previous_expiry: u64,
        /// Expiry proposed by the heartbeat.
        proposed_expiry: u64,
    },
    /// A dependency declaration carried an empty `depends_on` list.
    EmptyDependencyList {
        /// Task the declaration was for.
        task_id: TaskId,
    },
    /// Run suspend requires the run to be `Running`.
    RunSuspendRequiresRunning {
        /// Run named by the command.
        run_id: RunId,
        /// State observed for the run.
        state: RunState,
    },
    /// Run resume requires the run to be `Suspended`.
    RunResumeRequiresSuspended {
        /// Run named by the command.
        run_id: RunId,
        /// State observed for the run.
        state: RunState,
    },
    /// No budget allocation exists for the task and dimension (rendered as a
    /// budget-replenish rejection).
    BudgetNotAllocated {
        /// Task named by the command.
        task_id: TaskId,
        /// Budget dimension with no allocation.
        dimension: BudgetDimension,
    },
    /// A subscription with this identifier already exists.
    SubscriptionAlreadyExists {
        /// Conflicting subscription identifier.
        subscription_id: SubscriptionId,
    },
    /// The referenced subscription is unknown.
    UnknownSubscription {
        /// Subscription named by the command.
        subscription_id: SubscriptionId,
    },
    /// The subscription has already been canceled.
    SubscriptionAlreadyCanceled {
        /// Subscription named by the command.
        subscription_id: SubscriptionId,
    },
}
1458
1459impl std::fmt::Display for MutationValidationError {
1460 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1461 match self {
1462 MutationValidationError::SequenceOverflow => {
1463 write!(f, "mutation sequence overflow while computing next expected sequence")
1464 }
1465 MutationValidationError::NonMonotonicSequence { expected, provided } => {
1466 write!(
1467 f,
1468 "mutation sequence rejected: expected next sequence {expected}, received \
1469 {provided}"
1470 )
1471 }
1472 MutationValidationError::UnknownRun { run_id } => {
1473 write!(f, "mutation rejected: unknown run {run_id}")
1474 }
1475 MutationValidationError::UnknownTask { task_id } => {
1476 write!(f, "mutation rejected: unknown task {task_id}")
1477 }
1478 MutationValidationError::TaskAlreadyExists { task_id } => {
1479 write!(f, "mutation rejected: task {task_id} already exists")
1480 }
1481 MutationValidationError::TaskAlreadyCanceled { task_id } => {
1482 write!(f, "mutation rejected: task {task_id} already canceled")
1483 }
1484 MutationValidationError::EngineAlreadyPaused => {
1485 write!(f, "mutation rejected: engine already paused")
1486 }
1487 MutationValidationError::EngineNotPaused => {
1488 write!(f, "mutation rejected: engine not paused")
1489 }
1490 MutationValidationError::RunAlreadyExists { run_id } => {
1491 write!(f, "mutation rejected: run {run_id} already exists")
1492 }
1493 MutationValidationError::RunCreateRequiresScheduled { run_id, state } => {
1494 write!(
1495 f,
1496 "mutation rejected for run {run_id}: run creation requires Scheduled state, \
1497 got {state:?}"
1498 )
1499 }
1500 MutationValidationError::PreviousStateMismatch { run_id, expected, actual } => {
1501 write!(
1502 f,
1503 "mutation rejected for run {run_id}: previous_state mismatch \
1504 expected={expected:?} actual={actual:?}"
1505 )
1506 }
1507 MutationValidationError::InvalidTransition { run_id, from, to } => {
1508 write!(
1509 f,
1510 "mutation rejected for run {run_id}: invalid transition {from:?} -> {to:?}"
1511 )
1512 }
1513 MutationValidationError::AttemptStartUnknownRun { run_id } => {
1514 write!(f, "attempt-start rejected: unknown run {run_id}")
1515 }
1516 MutationValidationError::AttemptStartRequiresRunning { run_id, state } => {
1517 write!(
1518 f,
1519 "attempt-start rejected for run {run_id}: run must be Running, observed \
1520 {state:?}"
1521 )
1522 }
1523 MutationValidationError::AttemptStartAlreadyActive { run_id, active_attempt_id } => {
1524 write!(
1525 f,
1526 "attempt-start rejected for run {run_id}: active attempt {active_attempt_id} \
1527 already exists"
1528 )
1529 }
1530 MutationValidationError::AttemptFinishUnknownRun { run_id } => {
1531 write!(f, "attempt-finish rejected: unknown run {run_id}")
1532 }
1533 MutationValidationError::AttemptFinishRequiresRunning { run_id, state } => {
1534 write!(
1535 f,
1536 "attempt-finish rejected for run {run_id}: run must be Running, observed \
1537 {state:?}"
1538 )
1539 }
1540 MutationValidationError::AttemptFinishMissingActive { run_id } => {
1541 write!(f, "attempt-finish rejected for run {run_id}: no active attempt present")
1542 }
1543 MutationValidationError::AttemptFinishAttemptMismatch {
1544 run_id,
1545 expected_attempt_id,
1546 provided_attempt_id,
1547 } => {
1548 write!(
1549 f,
1550 "attempt-finish rejected for run {run_id}: attempt mismatch \
1551 expected={expected_attempt_id} provided={provided_attempt_id}"
1552 )
1553 }
1554 MutationValidationError::LeaseUnknownRun { run_id, event } => {
1555 write!(f, "{event} rejected: unknown run {run_id}")
1556 }
1557 MutationValidationError::LeaseInvalidRunState { run_id, event, state } => {
1558 write!(f, "{event} rejected for run {run_id}: invalid state {state:?}")
1559 }
1560 MutationValidationError::LeaseAlreadyActive { run_id } => {
1561 write!(f, "lease acquire rejected for run {run_id}: lease already active")
1562 }
1563 MutationValidationError::LeaseMissingActive { run_id, event } => {
1564 write!(f, "{event} rejected for run {run_id}: missing active lease")
1565 }
1566 MutationValidationError::LeaseOwnerMismatch { run_id, event } => {
1567 write!(f, "{event} rejected for run {run_id}: owner mismatch")
1568 }
1569 MutationValidationError::LeaseExpiryMismatch {
1570 run_id,
1571 event,
1572 expected_expiry,
1573 provided_expiry,
1574 } => {
1575 write!(
1576 f,
1577 "{event} rejected for run {run_id}: expiry mismatch \
1578 expected={expected_expiry} provided={provided_expiry}"
1579 )
1580 }
1581 MutationValidationError::LeaseHeartbeatExpiryRegression {
1582 run_id,
1583 previous_expiry,
1584 proposed_expiry,
1585 } => {
1586 write!(
1587 f,
1588 "lease heartbeat rejected for run {run_id}: expiry regression \
1589 previous={previous_expiry} proposed={proposed_expiry}"
1590 )
1591 }
1592 MutationValidationError::EmptyDependencyList { task_id } => {
1593 write!(
1594 f,
1595 "dependency declaration rejected for task {task_id}: depends_on list is empty"
1596 )
1597 }
1598 MutationValidationError::RunSuspendRequiresRunning { run_id, state } => {
1599 write!(
1600 f,
1601 "run-suspend rejected for run {run_id}: run must be Running, observed \
1602 {state:?}"
1603 )
1604 }
1605 MutationValidationError::RunResumeRequiresSuspended { run_id, state } => {
1606 write!(
1607 f,
1608 "run-resume rejected for run {run_id}: run must be Suspended, observed \
1609 {state:?}"
1610 )
1611 }
1612 MutationValidationError::BudgetNotAllocated { task_id, dimension } => {
1613 write!(
1614 f,
1615 "budget-replenish rejected for task {task_id}: no allocation exists for \
1616 dimension {dimension}"
1617 )
1618 }
1619 MutationValidationError::SubscriptionAlreadyExists { subscription_id } => {
1620 write!(
1621 f,
1622 "subscription-create rejected: subscription {subscription_id} already exists"
1623 )
1624 }
1625 MutationValidationError::UnknownSubscription { subscription_id } => {
1626 write!(f, "mutation rejected: unknown subscription {subscription_id}")
1627 }
1628 MutationValidationError::SubscriptionAlreadyCanceled { subscription_id } => {
1629 write!(
1630 f,
1631 "subscription-cancel rejected: subscription {subscription_id} already canceled"
1632 )
1633 }
1634 }
1635 }
1636}
1637
1638impl std::fmt::Display for LeaseValidationEvent {
1639 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1640 match self {
1641 LeaseValidationEvent::Acquire => write!(f, "lease acquire"),
1642 LeaseValidationEvent::Heartbeat => write!(f, "lease heartbeat"),
1643 LeaseValidationEvent::Expire => write!(f, "lease expire"),
1644 LeaseValidationEvent::Release => write!(f, "lease release"),
1645 }
1646 }
1647}
1648
// Marker impl: validation errors carry no underlying cause, so the default `source()` is kept.
impl std::error::Error for MutationValidationError {}
1650
#[derive(Debug, Clone, PartialEq, Eq)]
/// Failure of one stage of `submit_command`, generic over the projection's error type.
pub enum MutationAuthorityError<ProjectionError> {
    /// The command was rejected by validation; nothing was appended.
    Validation(MutationValidationError),
    /// Appending the event to the WAL failed.
    Append(WalWriterError),
    /// The append succeeded but the requested flush failed. `submit_command` returns this
    /// before applying the event to the projection.
    PartialDurability {
        /// Sequence of the event that was appended.
        sequence: u64,
        /// Error returned by the WAL writer's flush.
        flush_error: WalWriterError,
    },
    /// The event was appended (and flushed, if requested) but the projection failed to apply it.
    Apply {
        /// Sequence of the event that was appended.
        sequence: u64,
        /// Error returned by the projection.
        source: ProjectionError,
    },
}
1674
1675impl<ProjectionError: std::fmt::Display> std::fmt::Display
1676 for MutationAuthorityError<ProjectionError>
1677{
1678 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1679 match self {
1680 MutationAuthorityError::Validation(error) => {
1681 write!(f, "mutation validation failed: {error}")
1682 }
1683 MutationAuthorityError::Append(error) => {
1684 write!(f, "mutation append stage failed: {error}")
1685 }
1686 MutationAuthorityError::PartialDurability { sequence, flush_error } => {
1687 write!(
1688 f,
1689 "mutation partial durability: append succeeded at sequence {sequence} but \
1690 flush failed: {flush_error}"
1691 )
1692 }
1693 MutationAuthorityError::Apply { sequence, source } => {
1694 write!(
1695 f,
1696 "mutation apply stage failed after durable append sequence {sequence}: \
1697 {source}"
1698 )
1699 }
1700 }
1701 }
1702}
1703
// Marker impl with the default `source()`: the underlying error is not exposed through the
// error chain here, it is already included in the `Display` message of each variant.
impl<ProjectionError: std::error::Error + 'static> std::error::Error
    for MutationAuthorityError<ProjectionError>
{
}
1708
1709#[cfg(test)]
1710mod tests {
1711 use actionqueue_core::ids::{RunId, TaskId};
1712 use actionqueue_core::task::constraints::TaskConstraints;
1713 use actionqueue_core::task::metadata::TaskMetadata;
1714 use actionqueue_core::task::run_policy::RunPolicy;
1715 use actionqueue_core::task::task_spec::{TaskPayload, TaskSpec};
1716
1717 use super::*;
1718
    /// In-memory projection double used by the authority tests.
    #[derive(Debug, Default)]
    struct ProjectionStub {
        /// Sequence of the most recently applied event.
        latest_sequence: u64,
        /// Tasks seen via `TaskCreated`.
        tasks: std::collections::HashSet<TaskId>,
        /// Tasks seen via `TaskCanceled`.
        canceled_tasks: std::collections::HashSet<TaskId>,
        /// Whether the last pause/resume event left the engine paused.
        engine_paused: bool,
        /// Latest known state per run.
        runs: std::collections::HashMap<RunId, RunState>,
        /// Currently active attempt per run.
        active_attempts: std::collections::HashMap<RunId, AttemptId>,
        /// Currently active lease per run as `(owner, expiry)`.
        active_leases: std::collections::HashMap<RunId, (String, u64)>,
    }
1729
1730 impl MutationProjection for ProjectionStub {
1731 type Error = &'static str;
1732
1733 fn latest_sequence(&self) -> u64 {
1734 self.latest_sequence
1735 }
1736
1737 fn run_state(&self, run_id: &RunId) -> Option<RunState> {
1738 self.runs.get(run_id).copied()
1739 }
1740
1741 fn task_exists(&self, task_id: TaskId) -> bool {
1742 self.tasks.contains(&task_id)
1743 }
1744
1745 fn is_task_canceled(&self, task_id: TaskId) -> bool {
1746 self.canceled_tasks.contains(&task_id)
1747 }
1748
1749 fn is_engine_paused(&self) -> bool {
1750 self.engine_paused
1751 }
1752
1753 fn active_attempt_id(&self, run_id: &RunId) -> Option<AttemptId> {
1754 self.active_attempts.get(run_id).copied()
1755 }
1756
1757 fn active_lease(&self, run_id: &RunId) -> Option<(String, u64)> {
1758 self.active_leases.get(run_id).cloned()
1759 }
1760
1761 fn apply_event(&mut self, event: &WalEvent) -> Result<(), Self::Error> {
1762 self.latest_sequence = event.sequence();
1763 match event.event() {
1764 WalEventType::TaskCreated { task_spec, .. } => {
1765 self.tasks.insert(task_spec.id());
1766 }
1767 WalEventType::RunCreated { run_instance } => {
1768 self.runs.insert(run_instance.id(), run_instance.state());
1769 }
1770 WalEventType::RunStateChanged { run_id, new_state, .. } => {
1771 self.runs.insert(*run_id, *new_state);
1772 }
1773 WalEventType::AttemptStarted { run_id, attempt_id, .. } => {
1774 self.active_attempts.insert(*run_id, *attempt_id);
1775 }
1776 WalEventType::AttemptFinished { run_id, .. } => {
1777 self.active_attempts.remove(run_id);
1778 }
1779 WalEventType::LeaseAcquired { run_id, owner, expiry, .. } => {
1780 self.active_leases.insert(*run_id, (owner.clone(), *expiry));
1781 }
1782 WalEventType::LeaseHeartbeat { run_id, owner, expiry, .. } => {
1783 self.active_leases.insert(*run_id, (owner.clone(), *expiry));
1784 }
1785 WalEventType::LeaseExpired { run_id, .. }
1786 | WalEventType::LeaseReleased { run_id, .. } => {
1787 self.active_leases.remove(run_id);
1788 }
1789 WalEventType::TaskCanceled { task_id, .. } => {
1790 self.canceled_tasks.insert(*task_id);
1791 }
1792 WalEventType::EnginePaused { .. } => {
1793 self.engine_paused = true;
1794 }
1795 WalEventType::EngineResumed { .. } => {
1796 self.engine_paused = false;
1797 }
1798 WalEventType::DependencyDeclared { .. } => {
1799 }
1802 _ => {
1803 }
1806 }
1807 Ok(())
1808 }
1809 }
1810
    /// In-memory WAL writer double with switchable append/flush failures.
    #[derive(Debug, Default)]
    struct WriterStub {
        /// Events accepted by `append`, in order.
        events: Vec<WalEvent>,
        /// When set, `append` fails without recording the event.
        fail_append: bool,
        /// When set, `flush` fails.
        fail_flush: bool,
    }
1817
1818 impl WalWriter for WriterStub {
1819 fn append(&mut self, event: &WalEvent) -> Result<(), WalWriterError> {
1820 if self.fail_append {
1821 return Err(WalWriterError::IoError("append failed".to_string()));
1822 }
1823 self.events.push(event.clone());
1824 Ok(())
1825 }
1826
1827 fn flush(&mut self) -> Result<(), WalWriterError> {
1828 if self.fail_flush {
1829 return Err(WalWriterError::IoError("flush failed".to_string()));
1830 }
1831 Ok(())
1832 }
1833
1834 fn close(self) -> Result<(), WalWriterError> {
1835 Ok(())
1836 }
1837 }
1838
1839 fn test_task_spec(task_id: TaskId) -> TaskSpec {
1840 TaskSpec::new(
1841 task_id,
1842 TaskPayload::with_content_type(b"payload".to_vec(), "application/octet-stream"),
1843 RunPolicy::Once,
1844 TaskConstraints::default(),
1845 TaskMetadata::default(),
1846 )
1847 .expect("test task spec should be valid")
1848 }
1849
1850 #[test]
1851 fn submits_task_create_then_run_create_then_transition() {
1852 let task_id = TaskId::new();
1853 let run =
1854 actionqueue_core::run::run_instance::RunInstance::new_scheduled(task_id, 100, 100)
1855 .expect("test run should be valid");
1856
1857 let writer = WriterStub::default();
1858 let projection = ProjectionStub::default();
1859 let mut authority = StorageMutationAuthority::new(writer, projection);
1860
1861 let task_outcome = authority
1862 .submit_command(
1863 MutationCommand::TaskCreate(TaskCreateCommand::new(1, test_task_spec(task_id), 10)),
1864 DurabilityPolicy::Immediate,
1865 )
1866 .expect("task create should succeed");
1867 assert_eq!(task_outcome.sequence(), 1);
1868
1869 let run_outcome = authority
1870 .submit_command(
1871 MutationCommand::RunCreate(RunCreateCommand::new(2, run.clone())),
1872 DurabilityPolicy::Immediate,
1873 )
1874 .expect("run create should succeed");
1875 assert_eq!(run_outcome.sequence(), 2);
1876
1877 let transition_outcome = authority
1878 .submit_command(
1879 MutationCommand::RunStateTransition(RunStateTransitionCommand::new(
1880 3,
1881 run.id(),
1882 RunState::Scheduled,
1883 RunState::Ready,
1884 100,
1885 )),
1886 DurabilityPolicy::Immediate,
1887 )
1888 .expect("transition should succeed");
1889 assert_eq!(transition_outcome.sequence(), 3);
1890
1891 let (_writer, projection) = authority.into_parts();
1892 assert!(projection.task_exists(task_id));
1893 assert_eq!(projection.run_state(&run.id()), Some(RunState::Ready));
1894 }
1895
1896 #[test]
1897 fn partial_durability_when_append_ok_flush_err() {
1898 let task_id = TaskId::new();
1899 let writer = WriterStub { fail_flush: true, ..Default::default() };
1900 let projection = ProjectionStub::default();
1901 let mut authority = StorageMutationAuthority::new(writer, projection);
1902
1903 let result = authority.submit_command(
1904 MutationCommand::TaskCreate(TaskCreateCommand::new(1, test_task_spec(task_id), 10)),
1905 DurabilityPolicy::Immediate,
1906 );
1907
1908 match result {
1909 Err(MutationAuthorityError::PartialDurability { sequence, flush_error }) => {
1910 assert_eq!(sequence, 1);
1911 assert_eq!(flush_error, WalWriterError::IoError("flush failed".to_string()));
1912 }
1913 other => panic!("expected PartialDurability, got {other:?}"),
1914 }
1915 }
1916}