1use meerkat_core::lifecycle::RunId;
24
25use crate::runtime_state::{RuntimeState, RuntimeStateTransitionError};
26
27#[derive(Debug, Clone, PartialEq, Eq)]
36pub enum RuntimeControlInput {
37 Initialize,
38 AttachExecutor,
39 DetachExecutor,
40 BeginRun {
41 run_id: RunId,
42 },
43 RunCompleted {
44 run_id: RunId,
45 },
46 RunFailed {
47 run_id: RunId,
48 },
49 RunCancelled {
50 run_id: RunId,
51 },
52 SubmitWork {
53 work_id: String,
54 },
55 AdmissionAccepted {
56 work_id: String,
57 handling_mode: HandlingMode,
58 },
59 AdmissionRejected {
60 work_id: String,
61 reason: String,
62 },
63 AdmissionDeduplicated {
64 work_id: String,
65 existing_work_id: String,
66 },
67 RecoverRequested,
68 RecoverySucceeded,
69 RetireRequested,
70 ResetRequested,
71 StopRequested,
72 DestroyRequested,
73 ResumeRequested,
74 ExternalToolDeltaReceived,
75 RecycleRequested,
76 RecycleSucceeded,
77}
78
79#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub enum HandlingMode {
82 Queue,
83 Steer,
84}
85
86#[derive(Debug, Clone, PartialEq, Eq)]
95pub enum RuntimeControlEffect {
96 ResolveAdmission {
97 work_id: String,
98 },
99 SubmitAdmittedIngressEffect {
100 work_id: String,
101 handling_mode: HandlingMode,
102 },
103 SubmitRunPrimitive {
104 run_id: RunId,
105 },
106 SignalWake,
107 SignalImmediateProcess,
108 EmitRuntimeNotice {
109 kind: String,
110 detail: String,
111 },
112 ResolveCompletionAsTerminated {
113 reason: String,
114 },
115 ApplyControlPlaneCommand {
116 command: String,
117 },
118 InitiateRecycle,
119}
120
121#[derive(Debug)]
127pub struct RuntimeControlTransition {
128 pub from_phase: RuntimeState,
130 pub next_phase: RuntimeState,
132 pub effects: Vec<RuntimeControlEffect>,
134}
135
136#[derive(Debug, Clone)]
142struct RuntimeControlFields {
143 current_run_id: Option<RunId>,
144 pre_run_state: Option<RuntimeState>,
145 wake_pending: bool,
146 process_pending: bool,
147}
148
149mod sealed {
154 pub trait Sealed {}
155}
156
157pub trait RuntimeControlMutator: sealed::Sealed {
163 fn apply(
168 &mut self,
169 input: RuntimeControlInput,
170 ) -> Result<RuntimeControlTransition, RuntimeStateTransitionError>;
171}
172
173#[derive(Debug, Clone)]
182pub struct RuntimeControlAuthority {
183 phase: RuntimeState,
185 fields: RuntimeControlFields,
187}
188
189impl sealed::Sealed for RuntimeControlAuthority {}
190
191impl RuntimeControlAuthority {
192 pub fn new() -> Self {
194 Self {
195 phase: RuntimeState::Initializing,
196 fields: RuntimeControlFields {
197 current_run_id: None,
198 pre_run_state: None,
199 wake_pending: false,
200 process_pending: false,
201 },
202 }
203 }
204
205 pub fn from_state(phase: RuntimeState) -> Self {
207 Self {
208 phase,
209 fields: RuntimeControlFields {
210 current_run_id: None,
211 pre_run_state: None,
212 wake_pending: false,
213 process_pending: false,
214 },
215 }
216 }
217
218 pub fn phase(&self) -> RuntimeState {
220 self.phase
221 }
222
223 pub fn current_run_id(&self) -> Option<&RunId> {
225 self.fields.current_run_id.as_ref()
226 }
227
228 pub fn pre_run_state(&self) -> Option<RuntimeState> {
230 self.fields.pre_run_state
231 }
232
233 pub fn wake_pending(&self) -> bool {
235 self.fields.wake_pending
236 }
237
238 pub fn process_pending(&self) -> bool {
240 self.fields.process_pending
241 }
242
243 pub fn is_idle(&self) -> bool {
245 self.phase == RuntimeState::Idle
246 }
247
248 pub fn is_attached(&self) -> bool {
250 self.phase == RuntimeState::Attached
251 }
252
253 pub fn is_running(&self) -> bool {
255 self.phase == RuntimeState::Running
256 }
257
258 pub fn can_process_queue(&self) -> bool {
260 self.phase.can_process_queue()
261 }
262
263 pub fn can_accept(&self, input: &RuntimeControlInput) -> bool {
265 self.evaluate(input).is_ok()
266 }
267
268 fn evaluate(
270 &self,
271 input: &RuntimeControlInput,
272 ) -> Result<
273 (
274 RuntimeState,
275 RuntimeControlFields,
276 Vec<RuntimeControlEffect>,
277 ),
278 RuntimeStateTransitionError,
279 > {
280 use RuntimeControlInput::{
281 AdmissionAccepted, AdmissionDeduplicated, AdmissionRejected, AttachExecutor, BeginRun,
282 DestroyRequested, DetachExecutor, ExternalToolDeltaReceived, Initialize,
283 RecoverRequested, RecoverySucceeded, RecycleRequested, RecycleSucceeded,
284 ResetRequested, ResumeRequested, RetireRequested, RunCancelled, RunCompleted,
285 RunFailed, StopRequested, SubmitWork,
286 };
287 use RuntimeState::{
288 Attached, Destroyed, Idle, Initializing, Recovering, Retired, Running, Stopped,
289 };
290
291 let phase = self.phase;
292 let mut fields = self.fields.clone();
293 let mut effects = Vec::new();
294
295 let next_phase = match (phase, input) {
296 (Initializing, Initialize) => Idle,
298
299 (Idle, AttachExecutor) => Attached,
301
302 (Attached, DetachExecutor) => Idle,
304
305 (Idle, BeginRun { run_id }) => {
307 if fields.current_run_id.is_some() {
308 return Err(RuntimeStateTransitionError {
309 from: phase,
310 to: Running,
311 });
312 }
313 fields.current_run_id = Some(run_id.clone());
314 fields.pre_run_state = Some(Idle);
315 fields.wake_pending = false;
316 fields.process_pending = false;
317 effects.push(RuntimeControlEffect::SubmitRunPrimitive {
318 run_id: run_id.clone(),
319 });
320 Running
321 }
322
323 (Attached, BeginRun { run_id }) => {
325 if fields.current_run_id.is_some() {
326 return Err(RuntimeStateTransitionError {
327 from: phase,
328 to: Running,
329 });
330 }
331 fields.current_run_id = Some(run_id.clone());
332 fields.pre_run_state = Some(Attached);
333 fields.wake_pending = false;
334 fields.process_pending = false;
335 effects.push(RuntimeControlEffect::SubmitRunPrimitive {
336 run_id: run_id.clone(),
337 });
338 Running
339 }
340
341 (Retired, BeginRun { run_id }) => {
343 if fields.current_run_id.is_some() {
344 return Err(RuntimeStateTransitionError {
345 from: phase,
346 to: Running,
347 });
348 }
349 fields.current_run_id = Some(run_id.clone());
350 fields.pre_run_state = Some(Retired);
351 fields.wake_pending = false;
352 fields.process_pending = false;
353 effects.push(RuntimeControlEffect::SubmitRunPrimitive {
354 run_id: run_id.clone(),
355 });
356 Running
357 }
358
359 (Recovering, BeginRun { run_id }) => {
361 if fields.current_run_id.is_some() {
362 return Err(RuntimeStateTransitionError {
363 from: phase,
364 to: Running,
365 });
366 }
367 fields.current_run_id = Some(run_id.clone());
368 fields.pre_run_state = Some(Recovering);
369 fields.wake_pending = false;
370 fields.process_pending = false;
371 effects.push(RuntimeControlEffect::SubmitRunPrimitive {
372 run_id: run_id.clone(),
373 });
374 Running
375 }
376
377 (Running, RunCompleted { run_id }) => {
379 self.validate_run_terminal(run_id, &fields)?;
380 let target = self.resolve_run_return(&fields);
381 fields.current_run_id = None;
382 fields.pre_run_state = None;
383 target
384 }
385 (Retired, RunCompleted { run_id }) => {
387 self.validate_run_terminal(run_id, &fields)?;
388 fields.current_run_id = None;
389 fields.pre_run_state = None;
390 Retired
391 }
392
393 (Running, RunFailed { run_id }) => {
395 self.validate_run_terminal(run_id, &fields)?;
396 let target = self.resolve_run_return(&fields);
397 fields.current_run_id = None;
398 fields.pre_run_state = None;
399 target
400 }
401 (Retired, RunFailed { run_id }) => {
403 self.validate_run_terminal(run_id, &fields)?;
404 fields.current_run_id = None;
405 fields.pre_run_state = None;
406 Retired
407 }
408
409 (Running, RunCancelled { run_id }) => {
411 self.validate_run_terminal(run_id, &fields)?;
412 let target = self.resolve_run_return(&fields);
413 fields.current_run_id = None;
414 fields.pre_run_state = None;
415 target
416 }
417 (Retired, RunCancelled { run_id }) => {
419 self.validate_run_terminal(run_id, &fields)?;
420 fields.current_run_id = None;
421 fields.pre_run_state = None;
422 Retired
423 }
424
425 (Idle, SubmitWork { work_id }) => {
427 effects.push(RuntimeControlEffect::ResolveAdmission {
428 work_id: work_id.clone(),
429 });
430 Idle
431 }
432 (Running, SubmitWork { work_id }) => {
433 effects.push(RuntimeControlEffect::ResolveAdmission {
434 work_id: work_id.clone(),
435 });
436 Running
437 }
438 (Attached, SubmitWork { work_id }) => {
439 effects.push(RuntimeControlEffect::ResolveAdmission {
440 work_id: work_id.clone(),
441 });
442 Attached
443 }
444
445 (
447 Idle,
448 AdmissionAccepted {
449 work_id,
450 handling_mode,
451 },
452 ) => {
453 let process_immediately = *handling_mode == HandlingMode::Steer;
454 fields.wake_pending = true;
456 fields.process_pending = process_immediately;
457 effects.push(RuntimeControlEffect::SubmitAdmittedIngressEffect {
458 work_id: work_id.clone(),
459 handling_mode: *handling_mode,
460 });
461 effects.push(RuntimeControlEffect::SignalWake);
462 if process_immediately {
463 effects.push(RuntimeControlEffect::SignalImmediateProcess);
464 }
465 Idle
466 }
467 (
468 Attached,
469 AdmissionAccepted {
470 work_id,
471 handling_mode,
472 },
473 ) => {
474 let process_immediately = *handling_mode == HandlingMode::Steer;
475 fields.wake_pending = true;
477 fields.process_pending = process_immediately;
478 effects.push(RuntimeControlEffect::SubmitAdmittedIngressEffect {
479 work_id: work_id.clone(),
480 handling_mode: *handling_mode,
481 });
482 effects.push(RuntimeControlEffect::SignalWake);
483 if process_immediately {
484 effects.push(RuntimeControlEffect::SignalImmediateProcess);
485 }
486 Attached
487 }
488 (
489 Running,
490 AdmissionAccepted {
491 work_id,
492 handling_mode,
493 },
494 ) => {
495 let process_immediately = *handling_mode == HandlingMode::Steer;
496 fields.wake_pending = process_immediately;
498 fields.process_pending = process_immediately;
499 effects.push(RuntimeControlEffect::SubmitAdmittedIngressEffect {
500 work_id: work_id.clone(),
501 handling_mode: *handling_mode,
502 });
503 if process_immediately {
504 effects.push(RuntimeControlEffect::SignalWake);
505 effects.push(RuntimeControlEffect::SignalImmediateProcess);
506 }
507 Running
508 }
509
510 (Idle, AdmissionRejected { reason, .. }) => {
512 effects.push(RuntimeControlEffect::EmitRuntimeNotice {
513 kind: "AdmissionRejected".into(),
514 detail: reason.clone(),
515 });
516 Idle
517 }
518 (Running, AdmissionRejected { reason, .. }) => {
519 effects.push(RuntimeControlEffect::EmitRuntimeNotice {
520 kind: "AdmissionRejected".into(),
521 detail: reason.clone(),
522 });
523 Running
524 }
525 (Attached, AdmissionRejected { reason, .. }) => {
526 effects.push(RuntimeControlEffect::EmitRuntimeNotice {
527 kind: "AdmissionRejected".into(),
528 detail: reason.clone(),
529 });
530 Attached
531 }
532
533 (Idle, AdmissionDeduplicated { .. }) => {
535 effects.push(RuntimeControlEffect::EmitRuntimeNotice {
536 kind: "AdmissionDeduplicated".into(),
537 detail: "ExistingInputLinked".into(),
538 });
539 Idle
540 }
541 (Running, AdmissionDeduplicated { .. }) => {
542 effects.push(RuntimeControlEffect::EmitRuntimeNotice {
543 kind: "AdmissionDeduplicated".into(),
544 detail: "ExistingInputLinked".into(),
545 });
546 Running
547 }
548 (Attached, AdmissionDeduplicated { .. }) => {
549 effects.push(RuntimeControlEffect::EmitRuntimeNotice {
550 kind: "AdmissionDeduplicated".into(),
551 detail: "ExistingInputLinked".into(),
552 });
553 Attached
554 }
555
556 (Idle, RecoverRequested) => {
558 fields.pre_run_state = Some(Idle);
559 Recovering
560 }
561 (Running, RecoverRequested) => {
563 fields.current_run_id = None;
564 fields.pre_run_state = Some(Running);
565 Recovering
566 }
567 (Attached, RecoverRequested) => {
569 fields.pre_run_state = Some(Attached);
570 Recovering
571 }
572
573 (Recovering, RecoverySucceeded) => {
575 fields.current_run_id = None;
576 fields.pre_run_state = None;
577 fields.wake_pending = false;
578 fields.process_pending = false;
579 Idle
580 }
581
582 (Idle, RetireRequested) => {
584 effects.push(RuntimeControlEffect::ApplyControlPlaneCommand {
585 command: "Retire".into(),
586 });
587 Retired
588 }
589 (Running, RetireRequested) => {
590 effects.push(RuntimeControlEffect::ApplyControlPlaneCommand {
591 command: "Retire".into(),
592 });
593 Retired
594 }
595 (Attached, RetireRequested) => {
596 effects.push(RuntimeControlEffect::ApplyControlPlaneCommand {
597 command: "Retire".into(),
598 });
599 Retired
600 }
601
602 (Initializing | Idle | Attached | Recovering | Retired, ResetRequested) => {
604 fields.current_run_id = None;
605 fields.pre_run_state = None;
606 fields.wake_pending = false;
607 fields.process_pending = false;
608 effects.push(RuntimeControlEffect::ApplyControlPlaneCommand {
609 command: "Reset".into(),
610 });
611 effects.push(RuntimeControlEffect::ResolveCompletionAsTerminated {
612 reason: "Reset".into(),
613 });
614 Idle
615 }
616
617 (Initializing | Idle | Attached | Running | Recovering | Retired, StopRequested) => {
619 fields.current_run_id = None;
620 fields.pre_run_state = None;
621 effects.push(RuntimeControlEffect::ApplyControlPlaneCommand {
622 command: "Stop".into(),
623 });
624 effects.push(RuntimeControlEffect::ResolveCompletionAsTerminated {
625 reason: "Stopped".into(),
626 });
627 Stopped
628 }
629
630 (
632 Initializing | Idle | Attached | Running | Recovering | Retired | Stopped,
633 DestroyRequested,
634 ) => {
635 fields.current_run_id = None;
636 fields.pre_run_state = None;
637 effects.push(RuntimeControlEffect::ApplyControlPlaneCommand {
638 command: "Destroy".into(),
639 });
640 effects.push(RuntimeControlEffect::ResolveCompletionAsTerminated {
641 reason: "Destroyed".into(),
642 });
643 Destroyed
644 }
645
646 (Recovering, ResumeRequested) => Idle,
648
649 (Idle, ExternalToolDeltaReceived) => {
651 effects.push(RuntimeControlEffect::EmitRuntimeNotice {
652 kind: "ExternalToolDelta".into(),
653 detail: "Received".into(),
654 });
655 Idle
656 }
657 (Running, ExternalToolDeltaReceived) => {
658 effects.push(RuntimeControlEffect::EmitRuntimeNotice {
659 kind: "ExternalToolDelta".into(),
660 detail: "Received".into(),
661 });
662 Running
663 }
664 (Recovering, ExternalToolDeltaReceived) => {
665 effects.push(RuntimeControlEffect::EmitRuntimeNotice {
666 kind: "ExternalToolDelta".into(),
667 detail: "Received".into(),
668 });
669 Recovering
670 }
671 (Retired, ExternalToolDeltaReceived) => {
672 effects.push(RuntimeControlEffect::EmitRuntimeNotice {
673 kind: "ExternalToolDelta".into(),
674 detail: "Received".into(),
675 });
676 Retired
677 }
678 (Attached, ExternalToolDeltaReceived) => {
679 effects.push(RuntimeControlEffect::EmitRuntimeNotice {
680 kind: "ExternalToolDelta".into(),
681 detail: "Received".into(),
682 });
683 Attached
684 }
685
686 (Retired | Idle | Attached, RecycleRequested) => {
688 if fields.current_run_id.is_some() {
689 return Err(RuntimeStateTransitionError {
690 from: phase,
691 to: Recovering,
692 });
693 }
694 fields.pre_run_state = Some(phase);
695 effects.push(RuntimeControlEffect::InitiateRecycle);
696 Recovering
697 }
698
699 (Recovering, RecycleSucceeded) => {
701 fields.current_run_id = None;
702 fields.pre_run_state = None;
703 fields.wake_pending = false;
704 fields.process_pending = false;
705 effects.push(RuntimeControlEffect::EmitRuntimeNotice {
706 kind: "Recycle".into(),
707 detail: "Succeeded".into(),
708 });
709 Idle
710 }
711
712 _ => {
714 return Err(RuntimeStateTransitionError {
715 from: phase,
716 to: self.infer_target(input),
717 });
718 }
719 };
720
721 Ok((next_phase, fields, effects))
722 }
723
724 fn validate_run_terminal(
726 &self,
727 run_id: &RunId,
728 fields: &RuntimeControlFields,
729 ) -> Result<(), RuntimeStateTransitionError> {
730 match &fields.current_run_id {
731 Some(active_id) if active_id == run_id => Ok(()),
732 _ => Err(RuntimeStateTransitionError {
733 from: self.phase,
734 to: self.resolve_run_return(fields),
735 }),
736 }
737 }
738
739 fn resolve_run_return(&self, fields: &RuntimeControlFields) -> RuntimeState {
745 match fields.pre_run_state {
746 Some(RuntimeState::Retired) => RuntimeState::Retired,
747 Some(RuntimeState::Attached) => RuntimeState::Attached,
748 _ => RuntimeState::Idle,
749 }
750 }
751
752 fn infer_target(&self, input: &RuntimeControlInput) -> RuntimeState {
754 use RuntimeControlInput::{
755 AdmissionAccepted, AdmissionDeduplicated, AdmissionRejected, AttachExecutor, BeginRun,
756 DestroyRequested, DetachExecutor, ExternalToolDeltaReceived, Initialize,
757 RecoverRequested, RecoverySucceeded, RecycleRequested, RecycleSucceeded,
758 ResetRequested, ResumeRequested, RetireRequested, RunCancelled, RunCompleted,
759 RunFailed, StopRequested, SubmitWork,
760 };
761 match input {
762 Initialize => RuntimeState::Idle,
763 AttachExecutor => RuntimeState::Attached,
764 DetachExecutor => RuntimeState::Idle,
765 BeginRun { .. } => RuntimeState::Running,
766 RunCompleted { .. } | RunFailed { .. } | RunCancelled { .. } => {
767 self.resolve_run_return(&self.fields)
768 }
769 SubmitWork { .. }
770 | AdmissionAccepted { .. }
771 | AdmissionRejected { .. }
772 | AdmissionDeduplicated { .. }
773 | ExternalToolDeltaReceived => self.phase,
774 RecoverRequested | RecycleRequested => RuntimeState::Recovering,
775 RecoverySucceeded | RecycleSucceeded | ResumeRequested | ResetRequested => {
776 RuntimeState::Idle
777 }
778 RetireRequested => RuntimeState::Retired,
779 StopRequested => RuntimeState::Stopped,
780 DestroyRequested => RuntimeState::Destroyed,
781 }
782 }
783}
784
785impl RuntimeControlMutator for RuntimeControlAuthority {
786 fn apply(
787 &mut self,
788 input: RuntimeControlInput,
789 ) -> Result<RuntimeControlTransition, RuntimeStateTransitionError> {
790 let from_phase = self.phase;
791 let (next_phase, next_fields, effects) = self.evaluate(&input)?;
792
793 self.phase = next_phase;
795 self.fields = next_fields;
796
797 Ok(RuntimeControlTransition {
798 from_phase,
799 next_phase,
800 effects,
801 })
802 }
803}
804
805impl Default for RuntimeControlAuthority {
806 fn default() -> Self {
807 Self::new()
808 }
809}
810
811#[cfg(test)]
816#[allow(
817 clippy::unwrap_used,
818 clippy::expect_used,
819 clippy::redundant_clone,
820 clippy::panic
821)]
822mod tests {
823 use super::*;
824
825 fn make_idle() -> RuntimeControlAuthority {
826 let mut auth = RuntimeControlAuthority::new();
827 auth.apply(RuntimeControlInput::Initialize).unwrap();
828 auth
829 }
830
831 fn make_attached() -> RuntimeControlAuthority {
832 let mut auth = make_idle();
833 auth.apply(RuntimeControlInput::AttachExecutor).unwrap();
834 auth
835 }
836
837 fn make_running_from_idle() -> (RuntimeControlAuthority, RunId) {
838 let mut auth = make_idle();
839 let run_id = RunId::new();
840 auth.apply(RuntimeControlInput::BeginRun {
841 run_id: run_id.clone(),
842 })
843 .unwrap();
844 (auth, run_id)
845 }
846
847 fn make_running_from_attached() -> (RuntimeControlAuthority, RunId) {
848 let mut auth = make_attached();
849 let run_id = RunId::new();
850 auth.apply(RuntimeControlInput::BeginRun {
851 run_id: run_id.clone(),
852 })
853 .unwrap();
854 (auth, run_id)
855 }
856
857 #[test]
860 fn initialize_transitions_to_idle() {
861 let mut auth = RuntimeControlAuthority::new();
862 let t = auth.apply(RuntimeControlInput::Initialize).unwrap();
863 assert_eq!(t.from_phase, RuntimeState::Initializing);
864 assert_eq!(t.next_phase, RuntimeState::Idle);
865 assert_eq!(auth.phase(), RuntimeState::Idle);
866 }
867
868 #[test]
869 fn initialize_rejected_from_idle() {
870 let mut auth = make_idle();
871 assert!(auth.apply(RuntimeControlInput::Initialize).is_err());
872 }
873
874 #[test]
877 fn attach_from_idle() {
878 let mut auth = make_idle();
879 let t = auth.apply(RuntimeControlInput::AttachExecutor).unwrap();
880 assert_eq!(t.next_phase, RuntimeState::Attached);
881 assert!(auth.is_attached());
882 }
883
884 #[test]
885 fn attach_rejected_from_initializing() {
886 let mut auth = RuntimeControlAuthority::new();
887 assert!(auth.apply(RuntimeControlInput::AttachExecutor).is_err());
888 }
889
890 #[test]
891 fn detach_from_attached() {
892 let mut auth = make_attached();
893 let t = auth.apply(RuntimeControlInput::DetachExecutor).unwrap();
894 assert_eq!(t.next_phase, RuntimeState::Idle);
895 assert!(auth.is_idle());
896 }
897
898 #[test]
899 fn detach_rejected_from_idle() {
900 let mut auth = make_idle();
901 assert!(auth.apply(RuntimeControlInput::DetachExecutor).is_err());
902 }
903
904 #[test]
907 fn begin_run_from_idle() {
908 let run_id = RunId::new();
909 let mut auth = make_idle();
910 let t = auth
911 .apply(RuntimeControlInput::BeginRun {
912 run_id: run_id.clone(),
913 })
914 .unwrap();
915 assert_eq!(t.next_phase, RuntimeState::Running);
916 assert_eq!(auth.current_run_id(), Some(&run_id));
917 assert_eq!(auth.pre_run_state(), Some(RuntimeState::Idle));
918 assert_eq!(t.effects.len(), 1);
919 assert!(matches!(
920 &t.effects[0],
921 RuntimeControlEffect::SubmitRunPrimitive { run_id: r } if r == &run_id
922 ));
923 }
924
925 #[test]
926 fn begin_run_from_attached() {
927 let run_id = RunId::new();
928 let mut auth = make_attached();
929 let t = auth
930 .apply(RuntimeControlInput::BeginRun {
931 run_id: run_id.clone(),
932 })
933 .unwrap();
934 assert_eq!(t.next_phase, RuntimeState::Running);
935 assert_eq!(auth.pre_run_state(), Some(RuntimeState::Attached));
936 }
937
938 #[test]
939 fn begin_run_from_retired() {
940 let mut auth = make_idle();
941 auth.apply(RuntimeControlInput::RetireRequested).unwrap();
942 let run_id = RunId::new();
943 let t = auth
944 .apply(RuntimeControlInput::BeginRun {
945 run_id: run_id.clone(),
946 })
947 .unwrap();
948 assert_eq!(t.next_phase, RuntimeState::Running);
949 assert_eq!(auth.pre_run_state(), Some(RuntimeState::Retired));
950 }
951
952 #[test]
953 fn begin_run_rejected_from_running() {
954 let (mut auth, _) = make_running_from_idle();
955 assert!(
956 auth.apply(RuntimeControlInput::BeginRun {
957 run_id: RunId::new()
958 })
959 .is_err()
960 );
961 }
962
963 #[test]
964 fn begin_run_rejected_with_active_run() {
965 let mut auth = make_idle();
967 let run_id = RunId::new();
968 auth.apply(RuntimeControlInput::BeginRun {
969 run_id: run_id.clone(),
970 })
971 .unwrap();
972 assert!(auth.is_running());
973 assert!(
974 auth.apply(RuntimeControlInput::BeginRun {
975 run_id: RunId::new()
976 })
977 .is_err()
978 );
979 }
980
981 #[test]
984 fn run_completed_returns_to_idle() {
985 let (mut auth, run_id) = make_running_from_idle();
986 let t = auth
987 .apply(RuntimeControlInput::RunCompleted {
988 run_id: run_id.clone(),
989 })
990 .unwrap();
991 assert_eq!(t.next_phase, RuntimeState::Idle);
992 assert!(auth.current_run_id().is_none());
993 assert!(auth.pre_run_state().is_none());
994 }
995
996 #[test]
997 fn run_completed_returns_to_attached() {
998 let (mut auth, run_id) = make_running_from_attached();
999 let t = auth
1000 .apply(RuntimeControlInput::RunCompleted {
1001 run_id: run_id.clone(),
1002 })
1003 .unwrap();
1004 assert_eq!(t.next_phase, RuntimeState::Attached);
1005 }
1006
1007 #[test]
1008 fn run_completed_returns_to_retired() {
1009 let mut auth = make_idle();
1010 auth.apply(RuntimeControlInput::RetireRequested).unwrap();
1011 let run_id = RunId::new();
1012 auth.apply(RuntimeControlInput::BeginRun {
1013 run_id: run_id.clone(),
1014 })
1015 .unwrap();
1016 let t = auth
1017 .apply(RuntimeControlInput::RunCompleted {
1018 run_id: run_id.clone(),
1019 })
1020 .unwrap();
1021 assert_eq!(t.next_phase, RuntimeState::Retired);
1022 }
1023
1024 #[test]
1025 fn run_completed_after_retire_during_run() {
1026 let (mut auth, run_id) = make_running_from_attached();
1029 auth.apply(RuntimeControlInput::RetireRequested).unwrap();
1030 assert_eq!(auth.phase(), RuntimeState::Retired);
1031 let t = auth
1032 .apply(RuntimeControlInput::RunCompleted {
1033 run_id: run_id.clone(),
1034 })
1035 .unwrap();
1036 assert_eq!(t.next_phase, RuntimeState::Retired);
1037 }
1038
1039 #[test]
1040 fn run_failed_after_retire_during_run() {
1041 let (mut auth, run_id) = make_running_from_attached();
1042 auth.apply(RuntimeControlInput::RetireRequested).unwrap();
1043 let t = auth
1044 .apply(RuntimeControlInput::RunFailed {
1045 run_id: run_id.clone(),
1046 })
1047 .unwrap();
1048 assert_eq!(t.next_phase, RuntimeState::Retired);
1049 }
1050
1051 #[test]
1052 fn run_cancelled_after_retire_during_run() {
1053 let (mut auth, run_id) = make_running_from_attached();
1054 auth.apply(RuntimeControlInput::RetireRequested).unwrap();
1055 let t = auth
1056 .apply(RuntimeControlInput::RunCancelled {
1057 run_id: run_id.clone(),
1058 })
1059 .unwrap();
1060 assert_eq!(t.next_phase, RuntimeState::Retired);
1061 }
1062
1063 #[test]
1064 fn run_completed_rejected_wrong_run_id() {
1065 let (mut auth, _) = make_running_from_idle();
1066 assert!(
1067 auth.apply(RuntimeControlInput::RunCompleted {
1068 run_id: RunId::new()
1069 })
1070 .is_err()
1071 );
1072 }
1073
1074 #[test]
1077 fn run_failed_returns_to_idle() {
1078 let (mut auth, run_id) = make_running_from_idle();
1079 let t = auth
1080 .apply(RuntimeControlInput::RunFailed {
1081 run_id: run_id.clone(),
1082 })
1083 .unwrap();
1084 assert_eq!(t.next_phase, RuntimeState::Idle);
1085 }
1086
1087 #[test]
1090 fn run_cancelled_returns_to_attached() {
1091 let (mut auth, run_id) = make_running_from_attached();
1092 let t = auth
1093 .apply(RuntimeControlInput::RunCancelled {
1094 run_id: run_id.clone(),
1095 })
1096 .unwrap();
1097 assert_eq!(t.next_phase, RuntimeState::Attached);
1098 }
1099
1100 #[test]
1103 fn submit_work_idle_emits_resolve_admission() {
1104 let mut auth = make_idle();
1105 let t = auth
1106 .apply(RuntimeControlInput::SubmitWork {
1107 work_id: "w1".into(),
1108 })
1109 .unwrap();
1110 assert_eq!(t.next_phase, RuntimeState::Idle);
1111 assert_eq!(t.effects.len(), 1);
1112 assert!(matches!(
1113 &t.effects[0],
1114 RuntimeControlEffect::ResolveAdmission { work_id } if work_id == "w1"
1115 ));
1116 }
1117
1118 #[test]
1119 fn submit_work_running_stays_running() {
1120 let (mut auth, _) = make_running_from_idle();
1121 let t = auth
1122 .apply(RuntimeControlInput::SubmitWork {
1123 work_id: "w1".into(),
1124 })
1125 .unwrap();
1126 assert_eq!(t.next_phase, RuntimeState::Running);
1127 }
1128
1129 #[test]
1130 fn submit_work_attached_stays_attached() {
1131 let mut auth = make_attached();
1132 let t = auth
1133 .apply(RuntimeControlInput::SubmitWork {
1134 work_id: "w1".into(),
1135 })
1136 .unwrap();
1137 assert_eq!(t.next_phase, RuntimeState::Attached);
1138 }
1139
1140 #[test]
1143 fn admission_accepted_idle_queue_signals_wake() {
1144 let mut auth = make_idle();
1145 let t = auth
1146 .apply(RuntimeControlInput::AdmissionAccepted {
1147 work_id: "w1".into(),
1148 handling_mode: HandlingMode::Queue,
1149 })
1150 .unwrap();
1151 assert_eq!(t.next_phase, RuntimeState::Idle);
1152 assert!(auth.wake_pending());
1153 assert!(!auth.process_pending());
1154 assert!(t.effects.contains(&RuntimeControlEffect::SignalWake));
1155 assert!(
1156 !t.effects
1157 .contains(&RuntimeControlEffect::SignalImmediateProcess)
1158 );
1159 }
1160
1161 #[test]
1162 fn admission_accepted_idle_steer_signals_wake_and_process() {
1163 let mut auth = make_idle();
1164 let t = auth
1165 .apply(RuntimeControlInput::AdmissionAccepted {
1166 work_id: "w1".into(),
1167 handling_mode: HandlingMode::Steer,
1168 })
1169 .unwrap();
1170 assert_eq!(t.next_phase, RuntimeState::Idle);
1171 assert!(auth.wake_pending());
1172 assert!(auth.process_pending());
1173 assert!(t.effects.contains(&RuntimeControlEffect::SignalWake));
1174 assert!(
1175 t.effects
1176 .contains(&RuntimeControlEffect::SignalImmediateProcess)
1177 );
1178 }
1179
1180 #[test]
1181 fn admission_accepted_running_queue_no_signals() {
1182 let (mut auth, _) = make_running_from_idle();
1183 let t = auth
1184 .apply(RuntimeControlInput::AdmissionAccepted {
1185 work_id: "w1".into(),
1186 handling_mode: HandlingMode::Queue,
1187 })
1188 .unwrap();
1189 assert_eq!(t.next_phase, RuntimeState::Running);
1190 assert!(!auth.wake_pending());
1191 assert!(!auth.process_pending());
1192 assert!(!t.effects.contains(&RuntimeControlEffect::SignalWake));
1193 }
1194
1195 #[test]
1196 fn admission_accepted_running_steer_signals() {
1197 let (mut auth, _) = make_running_from_idle();
1198 let t = auth
1199 .apply(RuntimeControlInput::AdmissionAccepted {
1200 work_id: "w1".into(),
1201 handling_mode: HandlingMode::Steer,
1202 })
1203 .unwrap();
1204 assert_eq!(t.next_phase, RuntimeState::Running);
1205 assert!(auth.wake_pending());
1206 assert!(auth.process_pending());
1207 assert!(t.effects.contains(&RuntimeControlEffect::SignalWake));
1208 assert!(
1209 t.effects
1210 .contains(&RuntimeControlEffect::SignalImmediateProcess)
1211 );
1212 }
1213
1214 #[test]
1215 fn admission_accepted_attached_queue_signals_wake() {
1216 let mut auth = make_attached();
1217 let t = auth
1218 .apply(RuntimeControlInput::AdmissionAccepted {
1219 work_id: "w1".into(),
1220 handling_mode: HandlingMode::Queue,
1221 })
1222 .unwrap();
1223 assert_eq!(t.next_phase, RuntimeState::Attached);
1224 assert!(auth.wake_pending());
1225 assert!(!auth.process_pending());
1226 }
1227
1228 #[test]
1231 fn admission_rejected_emits_notice() {
1232 let mut auth = make_idle();
1233 let t = auth
1234 .apply(RuntimeControlInput::AdmissionRejected {
1235 work_id: "w1".into(),
1236 reason: "quota".into(),
1237 })
1238 .unwrap();
1239 assert_eq!(t.next_phase, RuntimeState::Idle);
1240 assert!(matches!(
1241 &t.effects[0],
1242 RuntimeControlEffect::EmitRuntimeNotice { kind, detail }
1243 if kind == "AdmissionRejected" && detail == "quota"
1244 ));
1245 }
1246
1247 #[test]
1250 fn admission_deduplicated_emits_notice() {
1251 let mut auth = make_idle();
1252 let t = auth
1253 .apply(RuntimeControlInput::AdmissionDeduplicated {
1254 work_id: "w1".into(),
1255 existing_work_id: "w0".into(),
1256 })
1257 .unwrap();
1258 assert_eq!(t.next_phase, RuntimeState::Idle);
1259 assert!(matches!(
1260 &t.effects[0],
1261 RuntimeControlEffect::EmitRuntimeNotice { kind, .. }
1262 if kind == "AdmissionDeduplicated"
1263 ));
1264 }
1265
1266 #[test]
1269 fn recover_from_idle() {
1270 let mut auth = make_idle();
1271 let t = auth.apply(RuntimeControlInput::RecoverRequested).unwrap();
1272 assert_eq!(t.next_phase, RuntimeState::Recovering);
1273 assert_eq!(auth.pre_run_state(), Some(RuntimeState::Idle));
1274 }
1275
1276 #[test]
1277 fn recover_from_running_clears_run_id() {
1278 let (mut auth, _) = make_running_from_idle();
1279 let t = auth.apply(RuntimeControlInput::RecoverRequested).unwrap();
1280 assert_eq!(t.next_phase, RuntimeState::Recovering);
1281 assert!(auth.current_run_id().is_none());
1282 assert_eq!(auth.pre_run_state(), Some(RuntimeState::Running));
1283 }
1284
1285 #[test]
1286 fn recover_from_attached() {
1287 let mut auth = make_attached();
1288 let t = auth.apply(RuntimeControlInput::RecoverRequested).unwrap();
1289 assert_eq!(t.next_phase, RuntimeState::Recovering);
1290 assert_eq!(auth.pre_run_state(), Some(RuntimeState::Attached));
1291 }
1292
1293 #[test]
1296 fn recovery_succeeded_resets_to_idle() {
1297 let mut auth = make_idle();
1298 auth.apply(RuntimeControlInput::RecoverRequested).unwrap();
1299 let t = auth.apply(RuntimeControlInput::RecoverySucceeded).unwrap();
1300 assert_eq!(t.next_phase, RuntimeState::Idle);
1301 assert!(auth.current_run_id().is_none());
1302 assert!(auth.pre_run_state().is_none());
1303 assert!(!auth.wake_pending());
1304 assert!(!auth.process_pending());
1305 }
1306
1307 #[test]
1310 fn retire_from_idle() {
1311 let mut auth = make_idle();
1312 let t = auth.apply(RuntimeControlInput::RetireRequested).unwrap();
1313 assert_eq!(t.next_phase, RuntimeState::Retired);
1314 assert!(matches!(
1315 &t.effects[0],
1316 RuntimeControlEffect::ApplyControlPlaneCommand { command } if command == "Retire"
1317 ));
1318 }
1319
1320 #[test]
1321 fn retire_from_attached() {
1322 let mut auth = make_attached();
1323 let t = auth.apply(RuntimeControlInput::RetireRequested).unwrap();
1324 assert_eq!(t.next_phase, RuntimeState::Retired);
1325 }
1326
1327 #[test]
1328 fn retire_from_running() {
1329 let (mut auth, _) = make_running_from_idle();
1330 let t = auth.apply(RuntimeControlInput::RetireRequested).unwrap();
1331 assert_eq!(t.next_phase, RuntimeState::Retired);
1332 }
1333
1334 #[test]
1337 fn reset_from_idle() {
1338 let mut auth = make_idle();
1339 let t = auth.apply(RuntimeControlInput::ResetRequested).unwrap();
1340 assert_eq!(t.next_phase, RuntimeState::Idle);
1341 assert!(t.effects.iter().any(|e| matches!(
1342 e,
1343 RuntimeControlEffect::ApplyControlPlaneCommand { command } if command == "Reset"
1344 )));
1345 assert!(t.effects.iter().any(|e| matches!(
1346 e,
1347 RuntimeControlEffect::ResolveCompletionAsTerminated { reason } if reason == "Reset"
1348 )));
1349 }
1350
1351 #[test]
1352 fn reset_from_retired() {
1353 let mut auth = make_idle();
1354 auth.apply(RuntimeControlInput::RetireRequested).unwrap();
1355 let t = auth.apply(RuntimeControlInput::ResetRequested).unwrap();
1356 assert_eq!(t.next_phase, RuntimeState::Idle);
1357 }
1358
1359 #[test]
1360 fn reset_rejected_from_running() {
1361 let (mut auth, _) = make_running_from_idle();
1362 assert!(auth.apply(RuntimeControlInput::ResetRequested).is_err());
1363 }
1364
1365 #[test]
1368 fn stop_from_idle() {
1369 let mut auth = make_idle();
1370 let t = auth.apply(RuntimeControlInput::StopRequested).unwrap();
1371 assert_eq!(t.next_phase, RuntimeState::Stopped);
1372 }
1373
1374 #[test]
1375 fn stop_from_running() {
1376 let (mut auth, _) = make_running_from_idle();
1377 let t = auth.apply(RuntimeControlInput::StopRequested).unwrap();
1378 assert_eq!(t.next_phase, RuntimeState::Stopped);
1379 assert!(auth.current_run_id().is_none());
1380 }
1381
1382 #[test]
1383 fn stop_is_terminal() {
1384 let mut auth = make_idle();
1385 auth.apply(RuntimeControlInput::StopRequested).unwrap();
1386 assert!(auth.apply(RuntimeControlInput::ResetRequested).is_err());
1387 }
1388
1389 #[test]
1392 fn destroy_from_idle() {
1393 let mut auth = make_idle();
1394 let t = auth.apply(RuntimeControlInput::DestroyRequested).unwrap();
1395 assert_eq!(t.next_phase, RuntimeState::Destroyed);
1396 }
1397
1398 #[test]
1399 fn destroy_from_stopped() {
1400 let mut auth = make_idle();
1401 auth.apply(RuntimeControlInput::StopRequested).unwrap();
1402 let t = auth.apply(RuntimeControlInput::DestroyRequested).unwrap();
1403 assert_eq!(t.next_phase, RuntimeState::Destroyed);
1404 }
1405
1406 #[test]
1407 fn destroy_is_terminal() {
1408 let mut auth = make_idle();
1409 auth.apply(RuntimeControlInput::DestroyRequested).unwrap();
1410 assert!(auth.apply(RuntimeControlInput::DestroyRequested).is_err());
1411 }
1412
1413 #[test]
1416 fn resume_from_recovering() {
1417 let mut auth = make_idle();
1418 auth.apply(RuntimeControlInput::RecoverRequested).unwrap();
1419 let t = auth.apply(RuntimeControlInput::ResumeRequested).unwrap();
1420 assert_eq!(t.next_phase, RuntimeState::Idle);
1421 }
1422
1423 #[test]
1424 fn resume_rejected_from_idle() {
1425 let mut auth = make_idle();
1426 assert!(auth.apply(RuntimeControlInput::ResumeRequested).is_err());
1427 }
1428
1429 #[test]
1432 fn external_tool_delta_stays_in_phase() {
1433 for init_phase in [
1434 RuntimeState::Idle,
1435 RuntimeState::Attached,
1436 RuntimeState::Running,
1437 RuntimeState::Recovering,
1438 RuntimeState::Retired,
1439 ] {
1440 let mut auth = RuntimeControlAuthority::from_state(init_phase);
1441 if init_phase == RuntimeState::Running {
1443 auth.fields.current_run_id = Some(RunId::new());
1444 }
1445 let t = auth
1446 .apply(RuntimeControlInput::ExternalToolDeltaReceived)
1447 .unwrap();
1448 assert_eq!(
1449 t.next_phase, init_phase,
1450 "ExternalToolDelta should stay in {init_phase}"
1451 );
1452 }
1453 }
1454
1455 #[test]
1458 fn recycle_from_retired() {
1459 let mut auth = make_idle();
1460 auth.apply(RuntimeControlInput::RetireRequested).unwrap();
1461 let t = auth.apply(RuntimeControlInput::RecycleRequested).unwrap();
1462 assert_eq!(t.next_phase, RuntimeState::Recovering);
1463 assert_eq!(auth.pre_run_state(), Some(RuntimeState::Retired));
1464 assert!(t.effects.contains(&RuntimeControlEffect::InitiateRecycle));
1465 }
1466
1467 #[test]
1468 fn recycle_from_idle() {
1469 let mut auth = make_idle();
1470 let t = auth.apply(RuntimeControlInput::RecycleRequested).unwrap();
1471 assert_eq!(t.next_phase, RuntimeState::Recovering);
1472 assert_eq!(auth.pre_run_state(), Some(RuntimeState::Idle));
1473 }
1474
1475 #[test]
1476 fn recycle_from_attached() {
1477 let mut auth = make_attached();
1478 let t = auth.apply(RuntimeControlInput::RecycleRequested).unwrap();
1479 assert_eq!(t.next_phase, RuntimeState::Recovering);
1480 assert_eq!(auth.pre_run_state(), Some(RuntimeState::Attached));
1481 }
1482
1483 #[test]
1484 fn recycle_rejected_with_active_run() {
1485 let (mut auth, _) = make_running_from_idle();
1486 assert!(auth.apply(RuntimeControlInput::RecycleRequested).is_err());
1489 }
1490
1491 #[test]
1494 fn recycle_succeeded_from_recovering() {
1495 let mut auth = make_idle();
1496 auth.apply(RuntimeControlInput::RecycleRequested).unwrap();
1497 let t = auth.apply(RuntimeControlInput::RecycleSucceeded).unwrap();
1498 assert_eq!(t.next_phase, RuntimeState::Idle);
1499 assert!(auth.current_run_id().is_none());
1500 assert!(auth.pre_run_state().is_none());
1501 assert!(matches!(
1502 &t.effects[0],
1503 RuntimeControlEffect::EmitRuntimeNotice { kind, detail }
1504 if kind == "Recycle" && detail == "Succeeded"
1505 ));
1506 }
1507
1508 #[test]
1511 fn idle_running_idle_cycle() {
1512 let mut auth = make_idle();
1513 for _ in 0..3 {
1514 let run_id = RunId::new();
1515 auth.apply(RuntimeControlInput::BeginRun {
1516 run_id: run_id.clone(),
1517 })
1518 .unwrap();
1519 assert!(auth.is_running());
1520 auth.apply(RuntimeControlInput::RunCompleted {
1521 run_id: run_id.clone(),
1522 })
1523 .unwrap();
1524 assert!(auth.is_idle());
1525 }
1526 }
1527
1528 #[test]
1529 fn attached_running_attached_cycle() {
1530 let mut auth = make_attached();
1531 for _ in 0..3 {
1532 let run_id = RunId::new();
1533 auth.apply(RuntimeControlInput::BeginRun {
1534 run_id: run_id.clone(),
1535 })
1536 .unwrap();
1537 assert!(auth.is_running());
1538 auth.apply(RuntimeControlInput::RunCompleted {
1539 run_id: run_id.clone(),
1540 })
1541 .unwrap();
1542 assert!(auth.is_attached());
1543 }
1544 }
1545
1546 #[test]
1547 fn retired_drain_cycle() {
1548 let mut auth = make_idle();
1549 auth.apply(RuntimeControlInput::RetireRequested).unwrap();
1550 assert!(auth.can_process_queue());
1551
1552 let run_id = RunId::new();
1553 auth.apply(RuntimeControlInput::BeginRun {
1554 run_id: run_id.clone(),
1555 })
1556 .unwrap();
1557 assert!(auth.is_running());
1558
1559 auth.apply(RuntimeControlInput::RunCompleted {
1560 run_id: run_id.clone(),
1561 })
1562 .unwrap();
1563 assert_eq!(auth.phase(), RuntimeState::Retired);
1564 }
1565
1566 #[test]
1567 fn can_accept_probes_without_mutation() {
1568 let auth = make_idle();
1569 assert!(auth.can_accept(&RuntimeControlInput::AttachExecutor));
1570 assert!(!auth.can_accept(&RuntimeControlInput::DetachExecutor));
1571 assert_eq!(auth.phase(), RuntimeState::Idle);
1572 }
1573
1574 #[test]
1577 fn destroy_from_all_non_terminal_phases() {
1578 for phase in [
1579 RuntimeState::Initializing,
1580 RuntimeState::Idle,
1581 RuntimeState::Attached,
1582 RuntimeState::Running,
1583 RuntimeState::Recovering,
1584 RuntimeState::Retired,
1585 RuntimeState::Stopped,
1586 ] {
1587 let mut auth = RuntimeControlAuthority::from_state(phase);
1588 let t = auth
1589 .apply(RuntimeControlInput::DestroyRequested)
1590 .unwrap_or_else(|_| panic!("destroy should work from {phase}"));
1591 assert_eq!(t.next_phase, RuntimeState::Destroyed);
1592 }
1593 }
1594
1595 #[test]
1596 fn destroy_from_destroyed_is_rejected() {
1597 let mut auth = RuntimeControlAuthority::from_state(RuntimeState::Destroyed);
1598 assert!(auth.apply(RuntimeControlInput::DestroyRequested).is_err());
1599 }
1600}