1use std::collections::VecDeque;
30use std::fmt;
31
/// Wire-protocol version stamped into every `WorkerEnvelope`; envelopes carrying any
/// other version are rejected by `WorkerEnvelope::validate`.
pub const WORKER_PROTOCOL_VERSION: u32 = 1;

/// Largest accepted job payload, in bytes (256 KiB). Checked for `SpawnJob` payloads
/// and for the payload of a successful `JobCompleted`.
pub const MAX_PAYLOAD_BYTES: usize = 256 * 1024;
37
/// Strategy used to move a job payload across the main-thread/worker boundary.
///
/// Serialized in snake_case (`"structured_clone"`, `"transfer_array_buffer"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkerPayloadTransfer {
    /// Structured-clone strategy; the only one reported by the payload helpers in this file.
    StructuredClone,
    /// Not produced anywhere in this file. Presumably hands over the underlying
    /// buffer instead of copying it — NOTE(review): confirm against the host side.
    TransferArrayBuffer,
}
51
52#[must_use]
53fn replay_hash(
54 message_id: u64,
55 seq_no: u64,
56 decision_seq: u64,
57 seed: u64,
58 issued_at_turn: u64,
59 worker_id: Option<&str>,
60 op: &WorkerOp,
61) -> u64 {
62 let mut hash = message_id
63 .wrapping_mul(0x9E37_79B1_85EB_CA87)
64 .wrapping_add(seq_no.rotate_left(13))
65 ^ decision_seq.rotate_left(23)
66 ^ seed.rotate_left(29)
67 ^ issued_at_turn.rotate_left(47);
68 if let Some(worker_id) = worker_id {
69 for byte in worker_id.as_bytes() {
70 hash = hash.rotate_left(7) ^ u64::from(*byte);
71 hash = hash.wrapping_mul(0x100_0000_01B3);
72 }
73 }
74 for byte in serde_json::to_vec(op).unwrap_or_default() {
75 hash = hash.rotate_left(5) ^ u64::from(byte);
76 hash = hash.wrapping_mul(0x100_0000_01B3);
77 }
78 hash
79}
80
81fn validate_payload_size(size: usize) -> Result<(), WorkerChannelError> {
82 if size > MAX_PAYLOAD_BYTES {
83 return Err(WorkerChannelError::PayloadTooLarge {
84 size,
85 max: MAX_PAYLOAD_BYTES,
86 });
87 }
88 Ok(())
89}
90
/// A versioned, replay-hashed message exchanged between the coordinator and a worker.
///
/// `WorkerEnvelope::validate` checks the version, recomputes the replay hash, and
/// enforces the worker-identity and payload-size rules.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct WorkerEnvelope {
    /// Protocol version; must equal `WORKER_PROTOCOL_VERSION` to validate.
    pub version: u32,
    /// Message identifier (the coordinator allocates these from an increasing counter).
    pub message_id: u64,
    /// Sequence number; inbound envelopes must strictly increase within a worker session.
    pub seq_no: u64,
    /// Decision sequence number; the `new` / `from_worker` constructors default it to `seq_no`.
    pub decision_seq: u64,
    /// Seed folded into the replay hash (the coordinator stamps its own seed here).
    pub seed: u64,
    /// Logical turn at which the envelope was issued.
    pub issued_at_turn: u64,
    /// Worker session identity. It is set on worker-originated envelopes and absent on
    /// main-thread ones. It is omitted from serialized output when `None`, and defaults
    /// to `None` when missing on input.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub worker_id: Option<String>,
    /// Hash of every field except `version` and itself (including `op`); recomputed by `validate`.
    pub replay_hash: u64,
    /// The operation carried by this envelope.
    pub op: WorkerOp,
}
123
124impl WorkerEnvelope {
125 #[must_use]
131 pub fn new(message_id: u64, seq_no: u64, seed: u64, issued_at_turn: u64, op: WorkerOp) -> Self {
132 Self::new_with_decision_seq(message_id, seq_no, seq_no, seed, issued_at_turn, op)
133 }
134
135 #[must_use]
141 pub fn new_with_decision_seq(
142 message_id: u64,
143 seq_no: u64,
144 decision_seq: u64,
145 seed: u64,
146 issued_at_turn: u64,
147 op: WorkerOp,
148 ) -> Self {
149 Self {
150 version: WORKER_PROTOCOL_VERSION,
151 message_id,
152 seq_no,
153 decision_seq,
154 seed,
155 issued_at_turn,
156 worker_id: None,
157 replay_hash: replay_hash(
158 message_id,
159 seq_no,
160 decision_seq,
161 seed,
162 issued_at_turn,
163 None,
164 &op,
165 ),
166 op,
167 }
168 }
169
170 #[must_use]
172 pub fn from_worker(
173 worker_id: impl Into<String>,
174 message_id: u64,
175 seq_no: u64,
176 seed: u64,
177 issued_at_turn: u64,
178 op: WorkerOp,
179 ) -> Self {
180 Self::from_worker_with_decision_seq(
181 worker_id,
182 message_id,
183 seq_no,
184 seq_no,
185 seed,
186 issued_at_turn,
187 op,
188 )
189 }
190
191 #[must_use]
193 pub fn from_worker_with_decision_seq(
194 worker_id: impl Into<String>,
195 message_id: u64,
196 seq_no: u64,
197 decision_seq: u64,
198 seed: u64,
199 issued_at_turn: u64,
200 op: WorkerOp,
201 ) -> Self {
202 let worker_id = worker_id.into();
203 Self {
204 version: WORKER_PROTOCOL_VERSION,
205 message_id,
206 seq_no,
207 decision_seq,
208 seed,
209 issued_at_turn,
210 worker_id: Some(worker_id.clone()),
211 replay_hash: replay_hash(
212 message_id,
213 seq_no,
214 decision_seq,
215 seed,
216 issued_at_turn,
217 Some(worker_id.as_str()),
218 &op,
219 ),
220 op,
221 }
222 }
223
224 pub fn validate(&self) -> Result<(), WorkerChannelError> {
226 if self.version != WORKER_PROTOCOL_VERSION {
227 return Err(WorkerChannelError::VersionMismatch {
228 expected: WORKER_PROTOCOL_VERSION,
229 actual: self.version,
230 });
231 }
232 let expected_replay_hash = replay_hash(
233 self.message_id,
234 self.seq_no,
235 self.decision_seq,
236 self.seed,
237 self.issued_at_turn,
238 self.worker_id.as_deref(),
239 &self.op,
240 );
241 if self.replay_hash != expected_replay_hash {
242 return Err(WorkerChannelError::ReplayHashMismatch {
243 expected: expected_replay_hash,
244 actual: self.replay_hash,
245 });
246 }
247 self.validate_worker_identity()?;
248 match &self.op {
249 WorkerOp::SpawnJob(req) => validate_payload_size(req.payload.len())?,
250 WorkerOp::JobCompleted(JobResult {
251 outcome: JobOutcome::Ok { payload },
252 ..
253 }) => validate_payload_size(payload.len())?,
254 _ => {}
255 }
256 Ok(())
257 }
258
259 fn validate_worker_identity(&self) -> Result<(), WorkerChannelError> {
260 match &self.op {
261 WorkerOp::BootstrapReady { worker_id }
262 | WorkerOp::BootstrapFailed { worker_id, .. } => {
263 let actual = self
264 .worker_id
265 .as_deref()
266 .ok_or(WorkerChannelError::MissingWorkerSessionIdentity)?;
267 if actual != worker_id {
268 return Err(WorkerChannelError::WorkerIdentityMismatch {
269 expected: worker_id.clone(),
270 actual: actual.to_string(),
271 });
272 }
273 }
274 WorkerOp::StatusSnapshot(_)
275 | WorkerOp::JobCompleted(_)
276 | WorkerOp::CancelAcknowledged { .. }
277 | WorkerOp::DrainCompleted { .. }
278 | WorkerOp::FinalizeCompleted { .. }
279 | WorkerOp::ShutdownCompleted
280 | WorkerOp::Diagnostic(_) => {
281 if self.worker_id.is_none() {
282 return Err(WorkerChannelError::MissingWorkerSessionIdentity);
283 }
284 }
285 WorkerOp::SpawnJob(_)
286 | WorkerOp::PollStatus { .. }
287 | WorkerOp::CancelJob { .. }
288 | WorkerOp::DrainJob { .. }
289 | WorkerOp::FinalizeJob { .. }
290 | WorkerOp::ShutdownWorker { .. } => {
291 if let Some(worker_id) = &self.worker_id {
292 return Err(WorkerChannelError::UnexpectedWorkerSessionIdentity(
293 worker_id.clone(),
294 ));
295 }
296 }
297 }
298 Ok(())
299 }
300}
301
/// Operations carried inside a `WorkerEnvelope`.
///
/// Serialized as an internally tagged object whose `type` field names the variant.
/// `WorkerEnvelope::validate` treats two groups differently:
///
/// * Worker-originated operations must carry a worker id: `BootstrapReady`,
///   `BootstrapFailed`, `StatusSnapshot`, `JobCompleted`, `CancelAcknowledged`,
///   `DrainCompleted`, `FinalizeCompleted`, `ShutdownCompleted` and `Diagnostic`.
/// * The remaining variants are main-thread operations and must not carry one.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type")]
pub enum WorkerOp {
    /// Worker → main thread: the worker reports readiness; the coordinator marks it ready.
    BootstrapReady {
        /// Session identity; must equal the envelope's `worker_id`.
        worker_id: String,
    },
    /// Worker → main thread: bootstrap failed; the coordinator fails outstanding jobs.
    BootstrapFailed {
        /// Session identity; must equal the envelope's `worker_id`.
        worker_id: String,
        /// Human-readable failure reason, surfaced in `WorkerChannelError::BootstrapFailed`.
        reason: String,
    },

    /// Main thread → worker: start a job.
    SpawnJob(SpawnJobRequest),
    /// Main thread → worker: ask for the status of a job.
    PollStatus {
        /// Job being polled.
        job_id: u64,
    },
    /// Worker → main thread: non-terminal status report for a job.
    StatusSnapshot(JobStatusSnapshot),
    /// Worker → main thread: the job finished with the given outcome.
    JobCompleted(JobResult),

    /// Main thread → worker: request cancellation of a job.
    CancelJob {
        /// Job to cancel.
        job_id: u64,
        /// Caller-supplied reason, forwarded as-is.
        reason: String,
    },
    /// Worker → main thread: cancellation accepted; the coordinator answers with `DrainJob`.
    CancelAcknowledged {
        /// Job whose cancellation was acknowledged.
        job_id: u64,
    },
    /// Main thread → worker: drain a cancelled job.
    DrainJob {
        /// Job to drain.
        job_id: u64,
    },
    /// Worker → main thread: draining finished; the coordinator answers with `FinalizeJob`.
    DrainCompleted {
        /// Job that finished draining.
        job_id: u64,
    },
    /// Main thread → worker: finalize a drained job.
    FinalizeJob {
        /// Job to finalize.
        job_id: u64,
    },
    /// Worker → main thread: finalization finished; the job becomes `Completed`.
    FinalizeCompleted {
        /// Job that was finalized.
        job_id: u64,
    },

    /// Main thread → worker: shut the worker down.
    ShutdownWorker {
        /// Caller-supplied reason, forwarded as-is.
        reason: String,
    },
    /// Worker → main thread: shutdown finished; the coordinator fails outstanding jobs.
    ShutdownCompleted,

    /// Worker → main thread: diagnostic event; accepted and otherwise ignored by the coordinator.
    Diagnostic(DiagnosticEvent),
}
386
/// Body of a `WorkerOp::SpawnJob` request.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SpawnJobRequest {
    /// Identifier of the job; also the key used by the coordinator to track it.
    pub job_id: u64,
    /// Region identifier; the coordinator also records it on its `TrackedJob`.
    pub region_id: u64,
    /// Task identifier; forwarded to the worker, not interpreted in this file.
    pub task_id: u64,
    /// Obligation identifier; forwarded to the worker, not interpreted in this file.
    pub obligation_id: u64,
    /// Job input bytes; limited to `MAX_PAYLOAD_BYTES`.
    pub payload: Vec<u8>,
}
401
impl SpawnJobRequest {
    /// Transfer strategy for this request's payload; always
    /// `WorkerPayloadTransfer::StructuredClone`.
    #[must_use]
    pub fn payload_transfer(&self) -> WorkerPayloadTransfer {
        WorkerPayloadTransfer::StructuredClone
    }

    /// Always returns `true` (the `payload` field is an owned `Vec<u8>`).
    #[must_use]
    pub fn owned_payload(&self) -> bool {
        true
    }
}
415
/// Body of a `WorkerOp::StatusSnapshot` report from the worker.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JobStatusSnapshot {
    /// Job being reported on.
    pub job_id: u64,
    /// Reported state. The coordinator only accepts `Queued` and `Running` here.
    pub state: JobState,
    /// Optional free-form detail; not interpreted by the coordinator.
    pub detail: Option<String>,
}
433
/// Body of a `WorkerOp::JobCompleted` report from the worker.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JobResult {
    /// Job that finished.
    pub job_id: u64,
    /// How the job finished.
    pub outcome: JobOutcome,
}
442
/// Outcome reported in a `JobResult`.
///
/// Serialized as an internally tagged object whose `status` field names the variant.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "status")]
pub enum JobOutcome {
    /// Success, carrying output bytes (limited to `MAX_PAYLOAD_BYTES` by `validate`).
    Ok {
        /// Output bytes.
        payload: Vec<u8>,
    },
    /// Error outcome carrying an error `code` and a human-readable `message`.
    Err {
        /// Error code.
        code: String,
        /// Human-readable error message.
        message: String,
    },
    /// Cancelled outcome carrying a `reason`.
    Cancelled {
        /// Cancellation reason.
        reason: String,
    },
    /// Panic outcome carrying the panic `message`.
    Panicked {
        /// Panic message.
        message: String,
    },
}
470
471impl JobOutcome {
472 #[must_use]
474 pub fn payload_transfer(&self) -> Option<WorkerPayloadTransfer> {
475 match self {
476 Self::Ok { .. } => Some(WorkerPayloadTransfer::StructuredClone),
477 Self::Err { .. } | Self::Cancelled { .. } | Self::Panicked { .. } => None,
478 }
479 }
480}
481
/// Body of a `WorkerOp::Diagnostic` event. The coordinator in this file accepts these
/// without acting on their contents.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct DiagnosticEvent {
    /// Severity of the event.
    pub level: DiagnosticLevel,
    /// Free-form category label.
    pub category: String,
    /// Human-readable message.
    pub message: String,
    /// Optional extra metadata; its format is not defined in this file.
    pub metadata: Option<String>,
}

/// Severity attached to a `DiagnosticEvent`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum DiagnosticLevel {
    /// Informational.
    Info,
    /// Warning.
    Warn,
    /// Error.
    Error,
}
505
/// Lifecycle state of a job as tracked by the coordinator.
///
/// Allowed moves are defined by `JobState::can_transition_to`; `Completed` and
/// `Failed` are terminal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum JobState {
    /// Initial state of a newly tracked job, before it is queued.
    Created,
    /// A `SpawnJob` envelope has been queued for the worker.
    Queued,
    /// The worker reported the job as running via a status snapshot.
    Running,
    /// The coordinator sent `CancelJob` and awaits `CancelAcknowledged`.
    CancelRequested,
    /// Cancellation was acknowledged; `DrainJob` was sent and `DrainCompleted` is awaited.
    Draining,
    /// Draining finished; `FinalizeJob` was sent and `FinalizeCompleted` is awaited.
    Finalizing,
    /// Terminal. Reached when the worker reports completion (with any `JobOutcome`) or
    /// finishes the finalize handshake.
    Completed,
    /// Terminal. Used when the worker session ends before the job finished (bootstrap
    /// failure, a new session, or shutdown completion).
    Failed,
}
540
541impl fmt::Display for JobState {
542 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
543 match self {
544 Self::Created => write!(f, "created"),
545 Self::Queued => write!(f, "queued"),
546 Self::Running => write!(f, "running"),
547 Self::CancelRequested => write!(f, "cancel_requested"),
548 Self::Draining => write!(f, "draining"),
549 Self::Finalizing => write!(f, "finalizing"),
550 Self::Completed => write!(f, "completed"),
551 Self::Failed => write!(f, "failed"),
552 }
553 }
554}
555
556impl JobState {
557 #[must_use]
559 pub const fn is_terminal(self) -> bool {
560 matches!(self, Self::Completed | Self::Failed)
561 }
562
563 #[must_use]
569 pub const fn allowed_in_status_snapshot(self) -> bool {
570 matches!(self, Self::Queued | Self::Running)
571 }
572
573 #[must_use]
575 pub fn can_transition_to(self, next: Self) -> bool {
576 matches!(
577 (self, next),
578 (Self::Created, Self::Queued | Self::Failed)
579 | (
580 Self::Queued,
581 Self::Running | Self::CancelRequested | Self::Completed | Self::Failed,
582 )
583 | (
584 Self::Running,
585 Self::Completed | Self::CancelRequested | Self::Failed,
586 )
587 | (
588 Self::CancelRequested,
589 Self::Completed | Self::Draining | Self::Failed,
590 )
591 | (Self::Draining, Self::Finalizing | Self::Failed)
592 | (Self::Finalizing, Self::Completed | Self::Failed)
593 )
594 }
595}
596
597#[derive(Debug)]
601pub struct TrackedJob {
602 pub job_id: u64,
604 pub region_id: u64,
606 pub state: JobState,
608 pub last_seq_no: u64,
610}
611
612impl TrackedJob {
613 #[must_use]
615 pub fn new(job_id: u64, region_id: u64) -> Self {
616 Self {
617 job_id,
618 region_id,
619 state: JobState::Created,
620 last_seq_no: 0,
621 }
622 }
623
624 pub fn transition_to(&mut self, next: JobState) -> Result<JobState, WorkerChannelError> {
626 if !self.state.can_transition_to(next) {
627 return Err(WorkerChannelError::InvalidTransition {
628 job_id: self.job_id,
629 from: self.state,
630 to: next,
631 });
632 }
633 let prev = self.state;
634 self.state = next;
635 Ok(prev)
636 }
637}
638
/// Main-thread side of the worker channel.
///
/// Tracks per-job lifecycle state, builds outbound envelopes into a FIFO outbox, and
/// validates and applies envelopes received from the worker.
#[derive(Debug)]
pub struct WorkerCoordinator {
    /// Envelopes waiting to be sent to the worker, oldest first.
    outbox: VecDeque<WorkerEnvelope>,
    /// `seq_no` for the next outbound envelope.
    next_seq: u64,
    /// `decision_seq` for the next outbound envelope.
    next_decision_seq: u64,
    /// `message_id` for the next outbound envelope.
    next_message_id: u64,
    /// Tracked jobs keyed by job id.
    jobs: std::collections::BTreeMap<u64, TrackedJob>,
    /// Seed stamped into every outbound envelope.
    seed: u64,
    /// Logical turn stamped as `issued_at_turn` on outbound envelopes.
    turn: u64,
    /// Highest inbound `seq_no` accepted in the current worker session (0 = none yet).
    last_inbound_seq_no: u64,
    /// Worker id of the current session, set by `BootstrapReady`.
    active_worker_id: Option<String>,
    /// Set by `BootstrapReady`; cleared by bootstrap failure or shutdown completion.
    worker_ready: bool,
    /// Set by `request_shutdown`; cleared when the worker reports a bootstrap result
    /// or shutdown completion.
    shutdown_requested: bool,
}
670
671impl WorkerCoordinator {
672 #[must_use]
674 pub fn new(seed: u64) -> Self {
675 Self {
676 outbox: VecDeque::new(),
677 next_seq: 1,
678 next_decision_seq: 1,
679 next_message_id: 1,
680 jobs: std::collections::BTreeMap::new(),
681 seed,
682 turn: 0,
683 last_inbound_seq_no: 0,
684 active_worker_id: None,
685 worker_ready: false,
686 shutdown_requested: false,
687 }
688 }
689
690 pub fn advance_turn(&mut self) {
692 self.turn += 1;
693 }
694
695 #[must_use]
697 pub fn is_worker_ready(&self) -> bool {
698 self.worker_ready
699 }
700
701 #[must_use]
703 pub fn is_shutdown_requested(&self) -> bool {
704 self.shutdown_requested
705 }
706
707 #[must_use]
709 pub fn inflight_count(&self) -> usize {
710 self.jobs
711 .values()
712 .filter(|j| j.state != JobState::Completed && j.state != JobState::Failed)
713 .count()
714 }
715
716 pub fn spawn_job(
718 &mut self,
719 job_id: u64,
720 region_id: u64,
721 task_id: u64,
722 obligation_id: u64,
723 payload: Vec<u8>,
724 ) -> Result<u64, WorkerChannelError> {
725 if !self.worker_ready {
726 return Err(WorkerChannelError::WorkerNotReady);
727 }
728 if self.shutdown_requested {
729 return Err(WorkerChannelError::ShutdownInProgress);
730 }
731 validate_payload_size(payload.len())?;
732 if self.jobs.contains_key(&job_id) {
733 return Err(WorkerChannelError::DuplicateJobId(job_id));
734 }
735
736 let mut tracked = TrackedJob::new(job_id, region_id);
737 tracked.transition_to(JobState::Queued)?;
738
739 let envelope = self.make_envelope(WorkerOp::SpawnJob(SpawnJobRequest {
740 job_id,
741 region_id,
742 task_id,
743 obligation_id,
744 payload,
745 }));
746 tracked.last_seq_no = envelope.seq_no;
747 self.jobs.insert(job_id, tracked);
748 self.outbox.push_back(envelope);
749 Ok(job_id)
750 }
751
752 pub fn cancel_job(&mut self, job_id: u64, reason: String) -> Result<(), WorkerChannelError> {
754 if self.shutdown_requested {
755 return Err(WorkerChannelError::ShutdownInProgress);
756 }
757 {
758 let job = self
759 .jobs
760 .get_mut(&job_id)
761 .ok_or(WorkerChannelError::UnknownJobId(job_id))?;
762 job.transition_to(JobState::CancelRequested)?;
763 }
764
765 self.enqueue_job_message(job_id, WorkerOp::CancelJob { job_id, reason })
766 }
767
768 pub fn poll_status(&mut self, job_id: u64) -> Result<(), WorkerChannelError> {
770 if self.shutdown_requested {
771 return Err(WorkerChannelError::ShutdownInProgress);
772 }
773 if !self.jobs.contains_key(&job_id) {
774 return Err(WorkerChannelError::UnknownJobId(job_id));
775 }
776 self.enqueue_job_message(job_id, WorkerOp::PollStatus { job_id })
777 }
778
779 pub fn request_shutdown(&mut self, reason: String) -> Result<(), WorkerChannelError> {
781 if self.shutdown_requested {
782 return Err(WorkerChannelError::ShutdownInProgress);
783 }
784 self.shutdown_requested = true;
785 self.discard_outbound_session_messages();
786 let envelope = self.make_envelope(WorkerOp::ShutdownWorker { reason });
787 self.outbox.push_back(envelope);
788 Ok(())
789 }
790
791 pub fn handle_inbound(&mut self, envelope: &WorkerEnvelope) -> Result<(), WorkerChannelError> {
793 self.prepare_inbound(envelope)?;
794 match &envelope.op {
795 WorkerOp::BootstrapReady { worker_id } => {
796 self.active_worker_id = Some(worker_id.clone());
797 self.worker_ready = true;
798 self.shutdown_requested = false;
799 Ok(())
800 }
801 WorkerOp::BootstrapFailed { reason, .. } => {
802 self.fail_nonterminal_jobs();
803 self.discard_outbound_session_messages();
804 self.active_worker_id = None;
809 self.worker_ready = false;
810 self.shutdown_requested = false;
811 Err(WorkerChannelError::BootstrapFailed(reason.clone()))
812 }
813 WorkerOp::StatusSnapshot(snapshot) => self.handle_status_snapshot(snapshot),
814 WorkerOp::JobCompleted(result) => {
815 let job = self
816 .jobs
817 .get_mut(&result.job_id)
818 .ok_or(WorkerChannelError::UnknownJobId(result.job_id))?;
819 if !matches!(
820 job.state,
821 JobState::Queued | JobState::Running | JobState::CancelRequested
822 ) {
823 return Err(WorkerChannelError::UnexpectedCompletionPhase {
824 job_id: result.job_id,
825 state: job.state,
826 });
827 }
828 job.transition_to(JobState::Completed)?;
829 Ok(())
830 }
831 WorkerOp::CancelAcknowledged { job_id } => {
832 if !self.jobs.contains_key(job_id) {
833 return Err(WorkerChannelError::UnknownJobId(*job_id));
834 }
835 if self.shutdown_requested {
839 return Ok(());
840 }
841 {
842 let job = self
843 .jobs
844 .get_mut(job_id)
845 .ok_or(WorkerChannelError::UnknownJobId(*job_id))?;
846 job.transition_to(JobState::Draining)?;
847 }
848 self.enqueue_job_message(*job_id, WorkerOp::DrainJob { job_id: *job_id })
849 }
850 WorkerOp::DrainCompleted { job_id } => {
851 if !self.jobs.contains_key(job_id) {
852 return Err(WorkerChannelError::UnknownJobId(*job_id));
853 }
854 if self.shutdown_requested {
857 return Ok(());
858 }
859 {
860 let job = self
861 .jobs
862 .get_mut(job_id)
863 .ok_or(WorkerChannelError::UnknownJobId(*job_id))?;
864 job.transition_to(JobState::Finalizing)?;
865 }
866 self.enqueue_job_message(*job_id, WorkerOp::FinalizeJob { job_id: *job_id })
867 }
868 WorkerOp::FinalizeCompleted { job_id } => {
869 let job = self
870 .jobs
871 .get_mut(job_id)
872 .ok_or(WorkerChannelError::UnknownJobId(*job_id))?;
873 job.transition_to(JobState::Completed)?;
874 Ok(())
875 }
876 WorkerOp::ShutdownCompleted => {
877 self.fail_nonterminal_jobs();
878 self.discard_outbound_session_messages();
879 self.active_worker_id = None;
880 self.shutdown_requested = false;
881 self.worker_ready = false;
882 Ok(())
883 }
884 WorkerOp::Diagnostic(_) => Ok(()),
885 WorkerOp::SpawnJob(_)
887 | WorkerOp::PollStatus { .. }
888 | WorkerOp::CancelJob { .. }
889 | WorkerOp::DrainJob { .. }
890 | WorkerOp::FinalizeJob { .. }
891 | WorkerOp::ShutdownWorker { .. } => Err(WorkerChannelError::UnexpectedDirection {
892 op: format!("{:?}", std::mem::discriminant(&envelope.op)),
893 }),
894 }
895 }
896
897 #[must_use]
899 pub fn drain_outbox(&mut self) -> Option<WorkerEnvelope> {
900 self.outbox.pop_front()
901 }
902
903 #[must_use]
905 pub fn job_state(&self, job_id: u64) -> Option<JobState> {
906 self.jobs.get(&job_id).map(|j| j.state)
907 }
908
909 fn enqueue_job_message(&mut self, job_id: u64, op: WorkerOp) -> Result<(), WorkerChannelError> {
910 if !self.jobs.contains_key(&job_id) {
911 return Err(WorkerChannelError::UnknownJobId(job_id));
912 }
913 let envelope = self.make_envelope(op);
914 if let Some(job) = self.jobs.get_mut(&job_id) {
915 job.last_seq_no = envelope.seq_no;
916 }
917 self.outbox.push_back(envelope);
918 Ok(())
919 }
920
921 fn handle_status_snapshot(
922 &mut self,
923 snapshot: &JobStatusSnapshot,
924 ) -> Result<(), WorkerChannelError> {
925 if snapshot.state.is_terminal() {
926 return Err(WorkerChannelError::TerminalStatusSnapshot {
927 job_id: snapshot.job_id,
928 state: snapshot.state,
929 });
930 }
931 if !snapshot.state.allowed_in_status_snapshot() {
932 return Err(WorkerChannelError::InvalidStatusSnapshotState {
933 job_id: snapshot.job_id,
934 state: snapshot.state,
935 });
936 }
937 let job = self
938 .jobs
939 .get_mut(&snapshot.job_id)
940 .ok_or(WorkerChannelError::UnknownJobId(snapshot.job_id))?;
941 if job.state != snapshot.state {
942 job.transition_to(snapshot.state)?;
943 }
944 Ok(())
945 }
946
947 fn prepare_inbound(&mut self, envelope: &WorkerEnvelope) -> Result<(), WorkerChannelError> {
948 envelope.validate()?;
949 if self.should_reset_inbound_session(&envelope.op) {
950 self.fail_nonterminal_jobs();
951 self.reset_inbound_session();
952 }
953 self.validate_inbound_worker_session(envelope)?;
954 self.validate_inbound_sequence(envelope.seq_no)?;
955 self.record_inbound_sequence(envelope.seq_no);
956 Ok(())
957 }
958
959 fn should_reset_inbound_session(&self, op: &WorkerOp) -> bool {
960 match op {
961 WorkerOp::BootstrapReady { worker_id }
962 | WorkerOp::BootstrapFailed { worker_id, .. } => {
963 self.active_worker_id.as_deref() != Some(worker_id.as_str())
964 }
965 _ => false,
966 }
967 }
968
969 fn fail_nonterminal_jobs(&mut self) {
970 for job in self.jobs.values_mut() {
971 if job.state.is_terminal() {
972 continue;
973 }
974 let _ = job.transition_to(JobState::Failed);
975 }
976 }
977
978 fn discard_outbound_session_messages(&mut self) {
979 self.outbox.clear();
980 }
981
982 fn reset_inbound_session(&mut self) {
983 self.last_inbound_seq_no = 0;
984 self.discard_outbound_session_messages();
985 }
986
987 fn validate_inbound_sequence(&self, seq_no: u64) -> Result<(), WorkerChannelError> {
988 if seq_no <= self.last_inbound_seq_no {
989 return Err(WorkerChannelError::InboundSequenceNotFresh {
990 last_seen: self.last_inbound_seq_no,
991 actual: seq_no,
992 });
993 }
994 Ok(())
995 }
996
997 fn record_inbound_sequence(&mut self, seq_no: u64) {
998 self.last_inbound_seq_no = seq_no;
999 }
1000
1001 fn validate_inbound_worker_session(
1002 &self,
1003 envelope: &WorkerEnvelope,
1004 ) -> Result<(), WorkerChannelError> {
1005 match &envelope.op {
1006 WorkerOp::BootstrapReady { .. }
1007 | WorkerOp::BootstrapFailed { .. }
1008 | WorkerOp::SpawnJob(_)
1009 | WorkerOp::PollStatus { .. }
1010 | WorkerOp::CancelJob { .. }
1011 | WorkerOp::DrainJob { .. }
1012 | WorkerOp::FinalizeJob { .. }
1013 | WorkerOp::ShutdownWorker { .. } => Ok(()),
1014 _ => {
1015 let actual = envelope
1016 .worker_id
1017 .as_deref()
1018 .ok_or(WorkerChannelError::MissingWorkerSessionIdentity)?;
1019 if self.active_worker_id.as_deref() == Some(actual) {
1020 return Ok(());
1021 }
1022 Err(WorkerChannelError::InboundWorkerSessionMismatch {
1023 expected: self.active_worker_id.clone(),
1024 actual: actual.to_string(),
1025 })
1026 }
1027 }
1028 }
1029
1030 fn make_envelope(&mut self, op: WorkerOp) -> WorkerEnvelope {
1031 let msg_id = self.next_message_id;
1032 let seq = self.next_seq;
1033 let decision_seq = self.next_decision_seq;
1034 self.next_message_id += 1;
1035 self.next_seq += 1;
1036 self.next_decision_seq += 1;
1037 WorkerEnvelope::new_with_decision_seq(msg_id, seq, decision_seq, self.seed, self.turn, op)
1038 }
1039}
1040
/// Errors produced while building, validating or applying worker-channel messages.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkerChannelError {
    /// The envelope's `version` differs from `WORKER_PROTOCOL_VERSION`.
    VersionMismatch {
        /// Version this build speaks.
        expected: u32,
        /// Version carried by the envelope.
        actual: u32,
    },
    /// The stored replay hash differs from the one recomputed from the envelope.
    ReplayHashMismatch {
        /// Recomputed hash.
        expected: u64,
        /// Hash carried by the envelope.
        actual: u64,
    },
    /// An inbound sequence number was not greater than the last accepted one.
    InboundSequenceNotFresh {
        /// Last accepted inbound sequence number.
        last_seen: u64,
        /// Sequence number that was rejected.
        actual: u64,
    },
    /// A worker-originated envelope carried no `worker_id`.
    MissingWorkerSessionIdentity,
    /// A main-thread envelope carried the given `worker_id`.
    UnexpectedWorkerSessionIdentity(String),
    /// A bootstrap operation's worker id differs from the envelope's `worker_id`.
    WorkerIdentityMismatch {
        /// Worker id named inside the operation.
        expected: String,
        /// Worker id carried by the envelope.
        actual: String,
    },
    /// An inbound envelope does not belong to the active worker session.
    InboundWorkerSessionMismatch {
        /// Active session, if any.
        expected: Option<String>,
        /// Worker id carried by the envelope.
        actual: String,
    },
    /// A payload exceeded `MAX_PAYLOAD_BYTES`.
    PayloadTooLarge {
        /// Offending payload size in bytes.
        size: usize,
        /// Configured limit in bytes.
        max: usize,
    },
    /// The job lifecycle does not allow moving from `from` to `to`.
    InvalidTransition {
        /// Job concerned.
        job_id: u64,
        /// State the job was in.
        from: JobState,
        /// Requested state.
        to: JobState,
    },
    /// A status snapshot reported a terminal state.
    TerminalStatusSnapshot {
        /// Job concerned.
        job_id: u64,
        /// Reported terminal state.
        state: JobState,
    },
    /// A status snapshot reported a state the coordinator drives itself.
    InvalidStatusSnapshotState {
        /// Job concerned.
        job_id: u64,
        /// Reported state.
        state: JobState,
    },
    /// A job reported completion in a state where the coordinator does not accept it.
    UnexpectedCompletionPhase {
        /// Job concerned.
        job_id: u64,
        /// State the coordinator had recorded for the job.
        state: JobState,
    },
    /// The worker has not reported `BootstrapReady` yet.
    WorkerNotReady,
    /// A shutdown is already pending.
    ShutdownInProgress,
    /// The job id is already tracked.
    DuplicateJobId(u64),
    /// The job id is not tracked.
    UnknownJobId(u64),
    /// The worker reported a failed bootstrap with this reason.
    BootstrapFailed(String),
    /// A main-thread-only operation arrived as an inbound message; `op` describes it.
    UnexpectedDirection {
        /// Description of the offending operation.
        op: String,
    },
}
1138
1139impl fmt::Display for WorkerChannelError {
1140 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1141 match self {
1142 Self::VersionMismatch { expected, actual } => {
1143 write!(
1144 f,
1145 "protocol version mismatch: expected {expected}, got {actual}"
1146 )
1147 }
1148 Self::ReplayHashMismatch { expected, actual } => {
1149 write!(f, "replay hash mismatch: expected {expected}, got {actual}")
1150 }
1151 Self::InboundSequenceNotFresh { last_seen, actual } => {
1152 write!(
1153 f,
1154 "inbound worker sequence is not fresh: saw {actual} after {last_seen}"
1155 )
1156 }
1157 Self::MissingWorkerSessionIdentity => {
1158 write!(
1159 f,
1160 "worker-originated envelope missing worker session identity"
1161 )
1162 }
1163 Self::UnexpectedWorkerSessionIdentity(worker_id) => {
1164 write!(
1165 f,
1166 "main-thread envelope unexpectedly carried worker session identity {worker_id}"
1167 )
1168 }
1169 Self::WorkerIdentityMismatch { expected, actual } => {
1170 write!(
1171 f,
1172 "worker identity mismatch: envelope carried {actual}, operation expects {expected}"
1173 )
1174 }
1175 Self::InboundWorkerSessionMismatch { expected, actual } => match expected {
1176 Some(expected) => write!(
1177 f,
1178 "inbound worker session mismatch: active session {expected}, got {actual}"
1179 ),
1180 None => write!(
1181 f,
1182 "inbound worker session mismatch: no active session, got {actual}"
1183 ),
1184 },
1185 Self::PayloadTooLarge { size, max } => {
1186 write!(
1187 f,
1188 "payload too large: {size} bytes exceeds {max} byte limit"
1189 )
1190 }
1191 Self::InvalidTransition { job_id, from, to } => {
1192 write!(f, "invalid job {job_id} transition: {from} → {to}")
1193 }
1194 Self::TerminalStatusSnapshot { job_id, state } => {
1195 write!(
1196 f,
1197 "job {job_id} reported terminal state {state} via status snapshot"
1198 )
1199 }
1200 Self::InvalidStatusSnapshotState { job_id, state } => {
1201 write!(
1202 f,
1203 "job {job_id} reported coordinator-owned state {state} via status snapshot"
1204 )
1205 }
1206 Self::UnexpectedCompletionPhase { job_id, state } => {
1207 write!(
1208 f,
1209 "job {job_id} reported completion while coordinator was in {state}"
1210 )
1211 }
1212 Self::WorkerNotReady => write!(f, "worker has not reported bootstrap readiness"),
1213 Self::ShutdownInProgress => write!(f, "shutdown already in progress"),
1214 Self::DuplicateJobId(id) => write!(f, "duplicate job id: {id}"),
1215 Self::UnknownJobId(id) => write!(f, "unknown job id: {id}"),
1216 Self::BootstrapFailed(reason) => write!(f, "worker bootstrap failed: {reason}"),
1217 Self::UnexpectedDirection { op } => {
1218 write!(f, "received outbound-only operation as inbound: {op}")
1219 }
1220 }
1221 }
1222}
1223
1224impl std::error::Error for WorkerChannelError {}
1225
1226#[cfg(test)]
1229mod tests {
1230 use super::*;
1231
1232 fn worker_envelope(
1233 worker_id: &str,
1234 message_id: u64,
1235 seq_no: u64,
1236 issued_at_turn: u64,
1237 op: WorkerOp,
1238 ) -> WorkerEnvelope {
1239 WorkerEnvelope::from_worker(worker_id, message_id, seq_no, 42, issued_at_turn, op)
1240 }
1241
1242 fn test_worker_envelope(
1243 message_id: u64,
1244 seq_no: u64,
1245 issued_at_turn: u64,
1246 op: WorkerOp,
1247 ) -> WorkerEnvelope {
1248 worker_envelope("test-worker-1", message_id, seq_no, issued_at_turn, op)
1249 }
1250
1251 fn bootstrap_ready_envelope(seq: u64) -> WorkerEnvelope {
1252 worker_envelope(
1253 "test-worker-1",
1254 seq,
1255 seq,
1256 0,
1257 WorkerOp::BootstrapReady {
1258 worker_id: "test-worker-1".into(),
1259 },
1260 )
1261 }
1262
1263 fn bootstrap_failed_envelope(seq: u64, worker_id: &str, reason: &str) -> WorkerEnvelope {
1264 worker_envelope(
1265 worker_id,
1266 seq,
1267 seq,
1268 0,
1269 WorkerOp::BootstrapFailed {
1270 worker_id: worker_id.into(),
1271 reason: reason.into(),
1272 },
1273 )
1274 }
1275
1276 #[test]
1277 fn coordinator_rejects_spawn_before_bootstrap() {
1278 let mut coord = WorkerCoordinator::new(42);
1279 let result = coord.spawn_job(1, 100, 200, 300, vec![1, 2, 3]);
1280 assert_eq!(result, Err(WorkerChannelError::WorkerNotReady));
1281 }
1282
1283 #[test]
1284 fn coordinator_accepts_spawn_after_bootstrap() {
1285 let mut coord = WorkerCoordinator::new(42);
1286 coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1287 assert!(coord.is_worker_ready());
1288
1289 let job_id = coord.spawn_job(1, 100, 200, 300, vec![1, 2, 3]).unwrap();
1290 assert_eq!(job_id, 1);
1291 assert_eq!(coord.job_state(1), Some(JobState::Queued));
1292 assert_eq!(coord.inflight_count(), 1);
1293
1294 let msg = coord.drain_outbox().unwrap();
1295 assert!(matches!(msg.op, WorkerOp::SpawnJob(_)));
1296 assert_eq!(msg.version, WORKER_PROTOCOL_VERSION);
1297 assert_eq!(msg.seed, 42);
1298 }
1299
1300 #[test]
1301 fn coordinator_tracks_full_job_lifecycle() {
1302 let mut coord = WorkerCoordinator::new(42);
1303 coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1304 coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1305 let _ = coord.drain_outbox(); let result_env = test_worker_envelope(
1310 2,
1311 2,
1312 1,
1313 WorkerOp::JobCompleted(JobResult {
1314 job_id: 1,
1315 outcome: JobOutcome::Ok { payload: vec![42] },
1316 }),
1317 );
1318 coord.handle_inbound(&result_env).unwrap();
1319 assert_eq!(coord.job_state(1), Some(JobState::Completed));
1320 assert_eq!(coord.inflight_count(), 0);
1321 assert_eq!(
1322 JobOutcome::Ok { payload: vec![42] }.payload_transfer(),
1323 Some(WorkerPayloadTransfer::StructuredClone)
1324 );
1325 }
1326
1327 #[test]
1328 fn coordinator_tracks_cancellation_lifecycle() {
1329 let mut coord = WorkerCoordinator::new(42);
1330 coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1331 coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1332 let _ = coord.drain_outbox();
1333
1334 let running = test_worker_envelope(
1335 2,
1336 2,
1337 1,
1338 WorkerOp::StatusSnapshot(JobStatusSnapshot {
1339 job_id: 1,
1340 state: JobState::Running,
1341 detail: Some("worker accepted job".into()),
1342 }),
1343 );
1344 coord.handle_inbound(&running).unwrap();
1345 assert_eq!(coord.job_state(1), Some(JobState::Running));
1346
1347 coord.cancel_job(1, "test cancel".into()).unwrap();
1349 assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
1350 let cancel_msg = coord.drain_outbox().unwrap();
1351 assert!(matches!(cancel_msg.op, WorkerOp::CancelJob { .. }));
1352
1353 let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
1355 coord.handle_inbound(&ack).unwrap();
1356 assert_eq!(coord.job_state(1), Some(JobState::Draining));
1357 let drain_request = coord.drain_outbox().unwrap();
1358 assert!(matches!(drain_request.op, WorkerOp::DrainJob { job_id: 1 }));
1359
1360 let drain = test_worker_envelope(4, 4, 3, WorkerOp::DrainCompleted { job_id: 1 });
1362 coord.handle_inbound(&drain).unwrap();
1363 assert_eq!(coord.job_state(1), Some(JobState::Finalizing));
1364 let finalize_request = coord.drain_outbox().unwrap();
1365 assert!(matches!(
1366 finalize_request.op,
1367 WorkerOp::FinalizeJob { job_id: 1 }
1368 ));
1369
1370 let finalize = test_worker_envelope(5, 5, 4, WorkerOp::FinalizeCompleted { job_id: 1 });
1372 coord.handle_inbound(&finalize).unwrap();
1373 assert_eq!(coord.job_state(1), Some(JobState::Completed));
1374 }
1375
1376 #[test]
1377 fn coordinator_rejects_terminal_status_snapshot() {
1378 let mut coord = WorkerCoordinator::new(42);
1379 coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1380 coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1381 let _ = coord.drain_outbox();
1382
1383 let snapshot = test_worker_envelope(
1384 2,
1385 2,
1386 1,
1387 WorkerOp::StatusSnapshot(JobStatusSnapshot {
1388 job_id: 1,
1389 state: JobState::Completed,
1390 detail: Some("terminal snapshots must use job_completed".into()),
1391 }),
1392 );
1393 assert!(matches!(
1394 coord.handle_inbound(&snapshot),
1395 Err(WorkerChannelError::TerminalStatusSnapshot {
1396 job_id: 1,
1397 state: JobState::Completed,
1398 })
1399 ));
1400 assert_eq!(coord.job_state(1), Some(JobState::Queued));
1401 }
1402
1403 #[test]
1404 fn coordinator_rejects_status_snapshot_for_protocol_owned_drain_phase() {
1405 let mut coord = WorkerCoordinator::new(42);
1406 coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1407 coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1408 let _ = coord.drain_outbox();
1409
1410 let running = test_worker_envelope(
1411 2,
1412 2,
1413 1,
1414 WorkerOp::StatusSnapshot(JobStatusSnapshot {
1415 job_id: 1,
1416 state: JobState::Running,
1417 detail: None,
1418 }),
1419 );
1420 coord.handle_inbound(&running).unwrap();
1421
1422 coord.cancel_job(1, "test cancel".into()).unwrap();
1423 let _ = coord.drain_outbox();
1424 assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
1425
1426 let draining_snapshot = test_worker_envelope(
1427 3,
1428 3,
1429 2,
1430 WorkerOp::StatusSnapshot(JobStatusSnapshot {
1431 job_id: 1,
1432 state: JobState::Draining,
1433 detail: Some("worker tried to skip cancel_acknowledged".into()),
1434 }),
1435 );
1436 assert!(matches!(
1437 coord.handle_inbound(&draining_snapshot),
1438 Err(WorkerChannelError::InvalidStatusSnapshotState {
1439 job_id: 1,
1440 state: JobState::Draining,
1441 })
1442 ));
1443 assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
1444 }
1445
/// A worker must not use a `StatusSnapshot` to push a draining job into
/// `Finalizing`. `Finalizing` is entered after the worker sends `DrainCompleted`,
/// so a snapshot claiming `Finalizing` is rejected with
/// `InvalidStatusSnapshotState` and the job stays in `Draining`.
#[test]
fn coordinator_rejects_status_snapshot_for_protocol_owned_finalize_phase() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    // Discard the queued SpawnJob.
    let _ = coord.drain_outbox();

    // Worker reports the job as running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: None,
        }),
    );
    coord.handle_inbound(&running).unwrap();

    // Coordinator-initiated cancel; the outbound CancelJob is dropped.
    coord.cancel_job(1, "test cancel".into()).unwrap();
    let _ = coord.drain_outbox();

    // Worker acknowledges the cancel, which moves the job into Draining.
    let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
    coord.handle_inbound(&ack).unwrap();
    let _ = coord.drain_outbox();
    assert_eq!(coord.job_state(1), Some(JobState::Draining));

    // Worker tries to jump to Finalizing without reporting DrainCompleted.
    let finalizing_snapshot = test_worker_envelope(
        4,
        4,
        3,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Finalizing,
            detail: Some("worker tried to skip drain_completed".into()),
        }),
    );
    assert!(matches!(
        coord.handle_inbound(&finalizing_snapshot),
        Err(WorkerChannelError::InvalidStatusSnapshotState {
            job_id: 1,
            state: JobState::Finalizing,
        })
    ));
    // The rejected snapshot must leave the job state untouched.
    assert_eq!(coord.job_state(1), Some(JobState::Draining));
}
1492
/// Drives a job through cancel -> ack -> drain into `Finalizing`, then has the
/// worker send `JobCompleted` directly. The coordinator rejects it with
/// `UnexpectedCompletionPhase` and the job remains in `Finalizing`.
// NOTE(review): the detail string suggests the worker is expected to complete
// a finalize step first; which inbound message that is cannot be seen here.
#[test]
fn coordinator_rejects_job_completed_while_cancellation_protocol_active() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    // Discard the queued SpawnJob.
    let _ = coord.drain_outbox();

    // Worker reports the job as running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: None,
        }),
    );
    coord.handle_inbound(&running).unwrap();

    // Cancel; the outbound CancelJob is dropped.
    coord.cancel_job(1, "test cancel".into()).unwrap();
    let _ = coord.drain_outbox();

    // CancelRequested -> Draining.
    let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
    coord.handle_inbound(&ack).unwrap();
    let _ = coord.drain_outbox();

    // Draining -> Finalizing.
    let drain = test_worker_envelope(4, 4, 3, WorkerOp::DrainCompleted { job_id: 1 });
    coord.handle_inbound(&drain).unwrap();
    let _ = coord.drain_outbox();

    // A completion that arrives while still Finalizing is out of phase.
    let completed = test_worker_envelope(
        5,
        5,
        4,
        WorkerOp::JobCompleted(JobResult {
            job_id: 1,
            outcome: JobOutcome::Cancelled {
                reason: "worker skipped finalize".into(),
            },
        }),
    );
    assert!(matches!(
        coord.handle_inbound(&completed),
        Err(WorkerChannelError::UnexpectedCompletionPhase {
            job_id: 1,
            state: JobState::Finalizing,
        })
    ));
    // The rejected completion must leave the job state untouched.
    assert_eq!(coord.job_state(1), Some(JobState::Finalizing));
}
1543
/// If the worker finishes the job before it sees the coordinator's `CancelJob`,
/// its `JobCompleted` still arrives while the job is `CancelRequested`. That
/// completion is accepted: the job becomes `Completed`, is no longer counted as
/// in flight, and no follow-up message is queued.
#[test]
fn coordinator_accepts_job_completed_racing_with_cancel_request() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    // Discard the queued SpawnJob.
    let _ = coord.drain_outbox();

    // Worker reports the job as running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();

    // Coordinator requests cancellation and emits CancelJob.
    coord.cancel_job(1, "test cancel".into()).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
    let cancel_msg = coord.drain_outbox().unwrap();
    assert!(matches!(cancel_msg.op, WorkerOp::CancelJob { .. }));

    // The worker's successful completion crosses the CancelJob in flight.
    let completed = test_worker_envelope(
        3,
        3,
        2,
        WorkerOp::JobCompleted(JobResult {
            job_id: 1,
            outcome: JobOutcome::Ok { payload: vec![7] },
        }),
    );
    coord.handle_inbound(&completed).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::Completed));
    assert_eq!(coord.inflight_count(), 0);
    // No drain/finalize follow-up is queued once the job has completed.
    assert!(coord.drain_outbox().is_none());
}
1582
/// Checks that the coordinator refuses a worker-driven state change that skips
/// a protocol step.
///
/// Setup: a queued job (never reported as running) is cancelled. That is a
/// valid Queued -> CancelRequested transition, and a `CancelJob` is emitted.
/// The worker then sends a `StatusSnapshot` claiming `Draining`, which bypasses
/// the `CancelAcknowledged` step. That snapshot must be rejected with
/// `InvalidStatusSnapshotState` and must not mutate the job.
///
/// Previously this test only asserted the valid cancel, so despite its name it
/// never observed a rejection.
#[test]
fn coordinator_rejects_invalid_transition() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    // Discard the queued SpawnJob.
    let _ = coord.drain_outbox();

    // Valid: cancelling while still queued.
    coord.cancel_job(1, "cancel before running".into()).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
    let cancel = coord.drain_outbox().unwrap();
    assert!(matches!(cancel.op, WorkerOp::CancelJob { job_id: 1, .. }));

    // Invalid: the worker tries to enter Draining via a snapshot instead of
    // acknowledging the cancel first (inbound seq 2 follows the bootstrap).
    let skipped_ack = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Draining,
            detail: Some("worker tried to skip cancel_acknowledged".into()),
        }),
    );
    assert!(matches!(
        coord.handle_inbound(&skipped_ack),
        Err(WorkerChannelError::InvalidStatusSnapshotState {
            job_id: 1,
            state: JobState::Draining,
        })
    ));
    // The rejected transition must leave the job where it was.
    assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
}
1595
/// A job may be cancelled before the worker has reported it as running. The
/// `SpawnJob` goes out first, then the cancel moves the job to
/// `CancelRequested` and queues a `CancelJob` for the same job id.
#[test]
fn coordinator_allows_cancel_before_running_snapshot() {
    let mut coordinator = WorkerCoordinator::new(42);
    let ready = bootstrap_ready_envelope(1);
    coordinator.handle_inbound(&ready).unwrap();
    coordinator.spawn_job(1, 100, 200, 300, vec![]).unwrap();

    // First outbound message: the spawn itself.
    let first_outbound = coordinator.drain_outbox().unwrap();
    assert!(matches!(first_outbound.op, WorkerOp::SpawnJob(_)));

    // No Running snapshot has arrived, yet cancelling is still permitted.
    coordinator
        .cancel_job(1, "cancel before worker starts".into())
        .unwrap();
    assert_eq!(
        coordinator.job_state(1),
        Some(JobState::CancelRequested)
    );

    // Second outbound message: the cancel request.
    let second_outbound = coordinator.drain_outbox().unwrap();
    assert!(matches!(
        second_outbound.op,
        WorkerOp::CancelJob { job_id: 1, .. }
    ));
}
1613
/// `spawn_job` refuses a payload one byte larger than `MAX_PAYLOAD_BYTES`.
#[test]
fn coordinator_rejects_oversized_payload() {
    // Exactly one byte over the limit.
    let oversized = vec![0u8; MAX_PAYLOAD_BYTES + 1];

    let mut coordinator = WorkerCoordinator::new(42);
    coordinator
        .handle_inbound(&bootstrap_ready_envelope(1))
        .unwrap();

    let outcome = coordinator.spawn_job(1, 100, 200, 300, oversized);
    assert!(matches!(
        outcome,
        Err(WorkerChannelError::PayloadTooLarge { .. })
    ));
}
1625
/// Registering the same job id twice fails with `DuplicateJobId`.
#[test]
fn coordinator_rejects_duplicate_job_id() {
    let mut coordinator = WorkerCoordinator::new(42);
    coordinator
        .handle_inbound(&bootstrap_ready_envelope(1))
        .unwrap();

    // The first registration of job 1 succeeds...
    coordinator.spawn_job(1, 100, 200, 300, vec![]).unwrap();

    // ...and a second one with the same id is refused.
    let second_attempt = coordinator.spawn_job(1, 100, 200, 300, vec![]);
    assert_eq!(
        second_attempt,
        Err(WorkerChannelError::DuplicateJobId(1))
    );
}
1634
/// Walks the full shutdown handshake:
/// 1. `request_shutdown` queues `ShutdownWorker`.
/// 2. New jobs are refused while the shutdown is pending.
/// 3. `ShutdownCompleted` clears the pending flag and marks the worker not
///    ready, so later spawns fail with `WorkerNotReady`.
#[test]
fn coordinator_shutdown_lifecycle() {
    let mut coordinator = WorkerCoordinator::new(42);
    coordinator
        .handle_inbound(&bootstrap_ready_envelope(1))
        .unwrap();

    // Phase 1: request shutdown.
    coordinator.request_shutdown("test shutdown".into()).unwrap();
    assert!(coordinator.is_shutdown_requested());
    {
        let outbound = coordinator.drain_outbox().unwrap();
        assert!(matches!(outbound.op, WorkerOp::ShutdownWorker { .. }));
    }

    // Phase 2: work submitted during shutdown is rejected.
    assert_eq!(
        coordinator.spawn_job(1, 100, 200, 300, vec![]),
        Err(WorkerChannelError::ShutdownInProgress)
    );

    // Phase 3: the worker confirms; the coordinator is idle and not ready.
    coordinator
        .handle_inbound(&test_worker_envelope(2, 2, 1, WorkerOp::ShutdownCompleted))
        .unwrap();
    assert!(!coordinator.is_shutdown_requested());
    assert!(!coordinator.is_worker_ready());
    assert_eq!(
        coordinator.spawn_job(1, 100, 200, 300, vec![]),
        Err(WorkerChannelError::WorkerNotReady)
    );
}
1660
/// A `SpawnJob` still sitting in the outbox when shutdown is requested is
/// discarded. Only `ShutdownWorker` remains to be sent.
#[test]
fn coordinator_request_shutdown_discards_queued_spawn_before_shutdown() {
    let mut coordinator = WorkerCoordinator::new(42);
    coordinator
        .handle_inbound(&bootstrap_ready_envelope(1))
        .unwrap();

    // Queue a spawn but never drain it before shutting down.
    coordinator
        .spawn_job(1, 100, 200, 300, vec![1, 2, 3])
        .unwrap();
    coordinator.request_shutdown("shutdown now".into()).unwrap();

    // Exactly one message survives: the shutdown request.
    let head_op = coordinator.drain_outbox().map(|envelope| envelope.op);
    assert!(matches!(head_op, Some(WorkerOp::ShutdownWorker { .. })));
    assert!(coordinator.drain_outbox().is_none());
}
1673
/// Queued `PollStatus` and `CancelJob` messages that have not been drained yet
/// are discarded when shutdown is requested. Only `ShutdownWorker` remains.
#[test]
fn coordinator_request_shutdown_discards_queued_poll_and_cancel_before_shutdown() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    // Discard the queued SpawnJob so only the messages below are in play.
    let _ = coord.drain_outbox();

    // Worker reports the job as running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();

    // Queue a poll and a cancel without draining, then request shutdown.
    coord.poll_status(1).unwrap();
    coord
        .cancel_job(1, "shutdown supersedes cancel".into())
        .unwrap();
    coord.request_shutdown("shutdown now".into()).unwrap();

    // The poll and cancel were dropped; only the shutdown request is sent.
    let shutdown = coord.drain_outbox().unwrap();
    assert!(matches!(shutdown.op, WorkerOp::ShutdownWorker { .. }));
    assert!(coord.drain_outbox().is_none());
}
1703
/// Once shutdown has been requested, `cancel_job` fails with
/// `ShutdownInProgress`. It leaves the job in `Running` and queues nothing
/// besides the pending `ShutdownWorker`.
#[test]
fn coordinator_rejects_cancel_after_shutdown_requested_without_mutating_job() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    // Discard the queued SpawnJob.
    let _ = coord.drain_outbox();

    // Worker reports the job as running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();

    coord.request_shutdown("shutdown now".into()).unwrap();
    assert_eq!(
        coord.cancel_job(1, "too late".into()),
        Err(WorkerChannelError::ShutdownInProgress)
    );
    // The rejected cancel must not have moved the job to CancelRequested.
    assert_eq!(coord.job_state(1), Some(JobState::Running));

    // Only the shutdown request is pending; no CancelJob was queued.
    let shutdown = coord.drain_outbox().unwrap();
    assert!(matches!(shutdown.op, WorkerOp::ShutdownWorker { .. }));
    assert!(coord.drain_outbox().is_none());
}
1734
/// Once shutdown has been requested, `poll_status` fails with
/// `ShutdownInProgress` and does not queue a `PollStatus` behind the
/// `ShutdownWorker`.
#[test]
fn coordinator_rejects_poll_after_shutdown_requested_without_enqueuing_follow_up() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    // Discard the queued SpawnJob.
    let _ = coord.drain_outbox();

    // Worker reports the job as running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();

    coord.request_shutdown("shutdown now".into()).unwrap();
    assert_eq!(
        coord.poll_status(1),
        Err(WorkerChannelError::ShutdownInProgress)
    );
    assert_eq!(coord.job_state(1), Some(JobState::Running));

    // Only the shutdown request is pending; no PollStatus was queued.
    let shutdown = coord.drain_outbox().unwrap();
    assert!(matches!(shutdown.op, WorkerOp::ShutdownWorker { .. }));
    assert!(coord.drain_outbox().is_none());
}
1765
/// A `CancelAcknowledged` that arrives after shutdown was requested is accepted,
/// but it does not advance the cancellation protocol. The job stays in
/// `CancelRequested` and no `DrainJob` is queued behind `ShutdownWorker`.
#[test]
fn coordinator_shutdown_supersedes_cancel_acknowledged_follow_up() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    // Discard the queued SpawnJob.
    let _ = coord.drain_outbox();

    // Worker reports the job as running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();

    // Start cancelling; CancelJob has already been sent to the worker.
    coord.cancel_job(1, "begin cancel".into()).unwrap();
    let cancel = coord.drain_outbox().unwrap();
    assert!(matches!(cancel.op, WorkerOp::CancelJob { job_id: 1, .. }));
    assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));

    // Shutdown is requested before the worker's ack arrives.
    coord.request_shutdown("shutdown now".into()).unwrap();
    assert!(coord.is_shutdown_requested());

    // The late ack is accepted without error...
    let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
    coord.handle_inbound(&ack).unwrap();

    // ...but the job stays put and no DrainJob is queued behind ShutdownWorker.
    assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
    let shutdown = coord.drain_outbox().unwrap();
    assert!(matches!(shutdown.op, WorkerOp::ShutdownWorker { .. }));
    assert!(coord.drain_outbox().is_none());
}
1801
/// A `DrainCompleted` that arrives after shutdown was requested is accepted,
/// but it does not advance the cancellation protocol. The job stays in
/// `Draining` and no `FinalizeJob` is queued behind `ShutdownWorker`.
#[test]
fn coordinator_shutdown_supersedes_drain_completed_follow_up() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    // Discard the queued SpawnJob.
    let _ = coord.drain_outbox();

    // Worker reports the job as running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();

    // Cancel; the outbound CancelJob is dropped.
    coord.cancel_job(1, "begin cancel".into()).unwrap();
    let _ = coord.drain_outbox();

    // Without a pending shutdown, the ack produces a DrainJob follow-up.
    let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
    coord.handle_inbound(&ack).unwrap();
    let drain = coord.drain_outbox().unwrap();
    assert!(matches!(drain.op, WorkerOp::DrainJob { job_id: 1 }));
    assert_eq!(coord.job_state(1), Some(JobState::Draining));

    // Shutdown is requested before the worker reports the drain finished.
    coord.request_shutdown("shutdown now".into()).unwrap();
    assert!(coord.is_shutdown_requested());

    // The late DrainCompleted is accepted without error...
    let drain_completed = test_worker_envelope(4, 4, 3, WorkerOp::DrainCompleted { job_id: 1 });
    coord.handle_inbound(&drain_completed).unwrap();

    // ...but the job stays Draining and no FinalizeJob is queued.
    assert_eq!(coord.job_state(1), Some(JobState::Draining));
    let shutdown = coord.drain_outbox().unwrap();
    assert!(matches!(shutdown.op, WorkerOp::ShutdownWorker { .. }));
    assert!(coord.drain_outbox().is_none());
}
1841
/// When the worker reports `ShutdownCompleted` while a job is still
/// non-terminal (`Finalizing` here), the coordinator marks the job `Failed`,
/// drops it from the in-flight count, and marks the worker not ready.
#[test]
fn coordinator_marks_nonterminal_job_failed_when_shutdown_completes() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    // Discard the queued SpawnJob.
    let _ = coord.drain_outbox();

    // Worker reports the job as running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker started".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();

    // Cancel -> ack -> drain moves the job into Finalizing.
    coord.cancel_job(1, "shutdown".into()).unwrap();
    let _ = coord.drain_outbox();

    let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
    coord.handle_inbound(&ack).unwrap();
    let _ = coord.drain_outbox();

    let drain = test_worker_envelope(4, 4, 3, WorkerOp::DrainCompleted { job_id: 1 });
    coord.handle_inbound(&drain).unwrap();
    let _ = coord.drain_outbox();
    assert_eq!(coord.job_state(1), Some(JobState::Finalizing));
    assert_eq!(coord.inflight_count(), 1);

    // ShutdownCompleted is accepted here even though this test never called
    // request_shutdown; it fails the outstanding job.
    let shutdown = test_worker_envelope(5, 5, 4, WorkerOp::ShutdownCompleted);
    coord.handle_inbound(&shutdown).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::Failed));
    assert_eq!(coord.inflight_count(), 0);
    assert!(!coord.is_worker_ready());
}
1880
/// `SpawnJob` is a coordinator-to-worker operation. Receiving one on the
/// inbound path is reported as `UnexpectedDirection`.
#[test]
fn coordinator_rejects_wrong_direction_messages() {
    let mut coordinator = WorkerCoordinator::new(42);
    coordinator
        .handle_inbound(&bootstrap_ready_envelope(1))
        .unwrap();

    let request = SpawnJobRequest {
        job_id: 1,
        region_id: 100,
        task_id: 200,
        obligation_id: 300,
        payload: vec![],
    };
    // message_id 2, seq_no 2, seed 42, issued_at_turn 1.
    let misrouted = WorkerEnvelope::new(2, 2, 42, 1, WorkerOp::SpawnJob(request));

    let verdict = coordinator.handle_inbound(&misrouted);
    assert!(matches!(
        verdict,
        Err(WorkerChannelError::UnexpectedDirection { .. })
    ));
}
1906
/// `poll_status` emits a `PollStatus` for the job. The worker's `Running`
/// snapshot reply then becomes the job's recorded state.
#[test]
fn coordinator_polls_status_and_applies_snapshot() {
    let mut coordinator = WorkerCoordinator::new(42);
    coordinator
        .handle_inbound(&bootstrap_ready_envelope(1))
        .unwrap();
    coordinator.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    // The SpawnJob itself is not under test.
    let _ = coordinator.drain_outbox();

    // Outbound: the poll request.
    coordinator.poll_status(1).unwrap();
    let outbound = coordinator.drain_outbox().unwrap();
    assert!(matches!(outbound.op, WorkerOp::PollStatus { job_id: 1 }));

    // Inbound: the worker answers with a Running snapshot.
    let status = JobStatusSnapshot {
        job_id: 1,
        state: JobState::Running,
        detail: None,
    };
    let reply = test_worker_envelope(3, 3, 2, WorkerOp::StatusSnapshot(status));
    coordinator.handle_inbound(&reply).unwrap();

    assert_eq!(coordinator.job_state(1), Some(JobState::Running));
}
1931
/// An inbound envelope whose `seq_no` is not greater than the last accepted one
/// is rejected with `InboundSequenceNotFresh`, even if its message id is new.
/// The high-water mark and the worker's readiness are unaffected.
#[test]
fn coordinator_rejects_duplicate_inbound_sequence() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();

    // Accepted message with seq_no 2 sets the high-water mark.
    let diag = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::Diagnostic(DiagnosticEvent {
            level: DiagnosticLevel::Info,
            category: "lifecycle".into(),
            message: "worker initialized".into(),
            metadata: None,
        }),
    );
    coord.handle_inbound(&diag).unwrap();
    assert_eq!(coord.last_inbound_seq_no, 2);

    // New message_id (3) but reused seq_no (2).
    let duplicate = test_worker_envelope(3, 2, 2, WorkerOp::ShutdownCompleted);
    assert_eq!(
        coord.handle_inbound(&duplicate),
        Err(WorkerChannelError::InboundSequenceNotFresh {
            last_seen: 2,
            actual: 2,
        })
    );
    // The rejected ShutdownCompleted had no effect.
    assert_eq!(coord.last_inbound_seq_no, 2);
    assert!(coord.is_worker_ready());
}
1962
/// After a completed shutdown, a `BootstrapReady` from a new worker id is
/// accepted. It makes the worker ready again and restarts the inbound
/// sequence high-water mark at that worker's `seq_no` (1).
#[test]
fn coordinator_accepts_fresh_bootstrap_after_shutdown_completed() {
    let mut coordinator = WorkerCoordinator::new(42);
    coordinator
        .handle_inbound(&bootstrap_ready_envelope(1))
        .unwrap();

    // Shut the first worker down completely.
    coordinator.request_shutdown("done".into()).unwrap();
    let _ = coordinator.drain_outbox();
    coordinator
        .handle_inbound(&test_worker_envelope(2, 2, 1, WorkerOp::ShutdownCompleted))
        .unwrap();
    assert_eq!(coordinator.last_inbound_seq_no, 2);
    assert!(!coordinator.is_worker_ready());

    // A replacement worker boots with its own sequence starting at 1.
    let replacement = "test-worker-2";
    let hello = worker_envelope(
        replacement,
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: replacement.into(),
        },
    );
    coordinator.handle_inbound(&hello).unwrap();

    assert!(coordinator.is_worker_ready());
    assert_eq!(coordinator.last_inbound_seq_no, 1);
}
1988
/// When a `BootstrapReady` arrives from a different worker id while a job is
/// running on the previous worker, the coordinator adopts the new worker.
/// It resets the inbound high-water mark, marks the orphaned job `Failed`,
/// and removes it from the in-flight count.
#[test]
fn coordinator_marks_running_job_failed_when_worker_instance_changes() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    // Discard the queued SpawnJob.
    let _ = coord.drain_outbox();

    // The first worker reports the job as running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker one accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::Running));
    assert_eq!(coord.inflight_count(), 1);

    // A different worker instance bootstraps with its own seq_no 1.
    let reboot = worker_envelope(
        "test-worker-2",
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "test-worker-2".into(),
        },
    );
    coord.handle_inbound(&reboot).unwrap();
    assert!(coord.is_worker_ready());
    // High-water mark restarts from the new worker's sequence.
    assert_eq!(coord.last_inbound_seq_no, 1);
    // The job owned by the previous worker cannot complete and is failed.
    assert_eq!(coord.job_state(1), Some(JobState::Failed));
    assert_eq!(coord.inflight_count(), 0);
}
2025
/// A `SpawnJob` still queued for the old worker is discarded when a new worker
/// instance bootstraps, and the job is marked `Failed`. Jobs spawned after the
/// reboot are sent normally.
#[test]
fn coordinator_discards_queued_spawn_when_worker_instance_changes() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    // Queue job 1's spawn but do not drain it.
    coord.spawn_job(1, 100, 200, 300, vec![1, 2, 3]).unwrap();

    // A different worker instance bootstraps with its own seq_no 1.
    let reboot = worker_envelope(
        "test-worker-2",
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "test-worker-2".into(),
        },
    );
    coord.handle_inbound(&reboot).unwrap();

    // The stale spawn is gone and job 1 is failed.
    assert_eq!(coord.job_state(1), Some(JobState::Failed));
    assert!(coord.drain_outbox().is_none());

    // New work targets the new worker; its spawn is the next outbound message.
    coord.spawn_job(2, 100, 201, 301, vec![9]).unwrap();
    let fresh_spawn = coord.drain_outbox().unwrap();
    match fresh_spawn.op {
        WorkerOp::SpawnJob(request) => assert_eq!(request.job_id, 2),
        other => panic!("expected fresh spawn after reboot, got {other:?}"),
    }
}
2053
/// A `CancelJob` still queued for the old worker is discarded when a new
/// worker instance bootstraps, and the cancel-requested job is marked `Failed`.
#[test]
fn coordinator_discards_queued_cancel_when_worker_instance_changes() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    // Discard the queued SpawnJob.
    let _ = coord.drain_outbox();

    // Worker reports the job as running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();

    // Queue a CancelJob without draining it.
    coord.cancel_job(1, "worker replaced".into()).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));

    // A different worker instance bootstraps with its own seq_no 1.
    let reboot = worker_envelope(
        "test-worker-2",
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "test-worker-2".into(),
        },
    );
    coord.handle_inbound(&reboot).unwrap();

    // The pending cancel is dropped rather than sent to the new worker.
    assert_eq!(coord.job_state(1), Some(JobState::Failed));
    assert!(coord.drain_outbox().is_none());
}
2090
/// A `FinalizeJob` still queued when the worker reports `ShutdownCompleted` is
/// discarded, and the finalizing job is marked `Failed`.
#[test]
fn coordinator_discards_queued_finalize_when_shutdown_completes() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    // Discard the queued SpawnJob.
    let _ = coord.drain_outbox();

    // Worker reports the job as running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker started".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();

    // Cancel -> ack -> drain moves the job into Finalizing.
    coord.cancel_job(1, "shutdown".into()).unwrap();
    let _ = coord.drain_outbox();

    let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
    coord.handle_inbound(&ack).unwrap();
    let _ = coord.drain_outbox();

    let drain = test_worker_envelope(4, 4, 3, WorkerOp::DrainCompleted { job_id: 1 });
    coord.handle_inbound(&drain).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::Finalizing));

    // DrainCompleted produced a FinalizeJob follow-up; take it out...
    let pending_finalize = coord.drain_outbox().unwrap();
    assert!(matches!(
        pending_finalize.op,
        WorkerOp::FinalizeJob { job_id: 1 }
    ));

    // ...and put one back, so a FinalizeJob is queued when the shutdown lands.
    coord
        .enqueue_job_message(1, WorkerOp::FinalizeJob { job_id: 1 })
        .unwrap();
    let shutdown = test_worker_envelope(5, 5, 4, WorkerOp::ShutdownCompleted);
    coord.handle_inbound(&shutdown).unwrap();

    // The job is failed and the queued FinalizeJob was discarded.
    assert_eq!(coord.job_state(1), Some(JobState::Failed));
    assert!(coord.drain_outbox().is_none());
}
2136
/// A `BootstrapFailed` as the very first message is surfaced as an error but
/// still advances the inbound sequence. A later `BootstrapReady` from a
/// different worker id is then accepted.
#[test]
fn coordinator_accepts_fresh_bootstrap_after_bootstrap_failed() {
    let mut coordinator = WorkerCoordinator::new(42);

    let failure = bootstrap_failed_envelope(1, "failed-worker-1", "synthetic boot failure");
    let verdict = coordinator.handle_inbound(&failure);
    assert_eq!(
        verdict,
        Err(WorkerChannelError::BootstrapFailed(
            "synthetic boot failure".into()
        ))
    );
    // The failure is recorded as seen, but no worker is ready.
    assert_eq!(coordinator.last_inbound_seq_no, 1);
    assert!(!coordinator.is_worker_ready());

    // A second worker boots successfully.
    let retry_worker = "test-worker-2";
    let retry = worker_envelope(
        retry_worker,
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: retry_worker.into(),
        },
    );
    coordinator.handle_inbound(&retry).unwrap();

    assert!(coordinator.is_worker_ready());
    assert_eq!(coordinator.last_inbound_seq_no, 1);
}
2164
/// A `BootstrapFailed` from a new worker id during a live session is processed
/// as a new worker instance. It is surfaced as an error, resets the sequence
/// high-water mark to 1, leaves no ready worker, and fails the job that was
/// running on the previous worker.
#[test]
fn coordinator_accepts_fresh_bootstrap_failed_after_live_session() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    // Discard the queued SpawnJob.
    let _ = coord.drain_outbox();

    // The first worker reports the job as running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker one accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();

    // A different worker id reports a boot failure with seq_no 1.
    let failed = bootstrap_failed_envelope(1, "failed-worker-2", "reboot failed to init");
    assert_eq!(
        coord.handle_inbound(&failed),
        Err(WorkerChannelError::BootstrapFailed(
            "reboot failed to init".into()
        ))
    );
    assert_eq!(coord.last_inbound_seq_no, 1);
    assert!(!coord.is_worker_ready());
    // The previous worker's job is orphaned and failed.
    assert_eq!(coord.job_state(1), Some(JobState::Failed));
    assert_eq!(coord.inflight_count(), 0);
}
2196
/// After `ShutdownCompleted`, the previous inbound high-water mark (2) is kept
/// until a new worker bootstraps. A straggler from the old worker is rejected
/// with `InboundWorkerSessionMismatch` and does not move the mark. The reboot
/// then resets the mark to the new worker's `seq_no` (1).
#[test]
fn coordinator_keeps_prior_high_water_mark_until_rebootstrap() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.request_shutdown("done".into()).unwrap();
    // Discard the queued ShutdownWorker.
    let _ = coord.drain_outbox();

    let shutdown = test_worker_envelope(2, 2, 1, WorkerOp::ShutdownCompleted);
    coord.handle_inbound(&shutdown).unwrap();
    assert_eq!(coord.last_inbound_seq_no, 2);
    assert!(!coord.is_worker_ready());

    // A late message from the old worker ("test-worker-1").
    let stale = test_worker_envelope(
        1,
        1,
        0,
        WorkerOp::Diagnostic(DiagnosticEvent {
            level: DiagnosticLevel::Info,
            category: "lifecycle".into(),
            message: "stale pre-restart message".into(),
            metadata: None,
        }),
    );
    // `expected: None` indicates no worker session is active after shutdown.
    assert_eq!(
        coord.handle_inbound(&stale),
        Err(WorkerChannelError::InboundWorkerSessionMismatch {
            expected: None,
            actual: "test-worker-1".into(),
        })
    );
    assert_eq!(coord.last_inbound_seq_no, 2);

    // A new worker instance resets the high-water mark.
    let reboot = worker_envelope(
        "test-worker-2",
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "test-worker-2".into(),
        },
    );
    coord.handle_inbound(&reboot).unwrap();
    assert!(coord.is_worker_ready());
    assert_eq!(coord.last_inbound_seq_no, 1);
}
2242
/// A `BootstrapReady` replayed by the *same* worker id with an older `seq_no`
/// is not treated as a reboot. It is rejected as a stale sequence number.
#[test]
fn coordinator_rejects_replayed_bootstrap_from_same_worker_session() {
    let mut coordinator = WorkerCoordinator::new(42);

    // Every envelope here comes from the same worker identity.
    let bootstrap_from_stable = |message_id, seq_no, turn| {
        worker_envelope(
            "stable-worker",
            message_id,
            seq_no,
            turn,
            WorkerOp::BootstrapReady {
                worker_id: "stable-worker".into(),
            },
        )
    };

    // Live session at seq_no 5.
    coordinator
        .handle_inbound(&bootstrap_from_stable(5, 5, 0))
        .unwrap();

    // An old bootstrap (seq_no 1) resurfaces.
    let replayed = bootstrap_from_stable(1, 1, 1);
    assert_eq!(
        coordinator.handle_inbound(&replayed),
        Err(WorkerChannelError::InboundSequenceNotFresh {
            last_seen: 5,
            actual: 1,
        })
    );
}
2275
/// A `BootstrapFailed` replayed by the live worker's own id with an old
/// `seq_no` is rejected as stale and does not tear down the ready session.
#[test]
fn coordinator_rejects_replayed_bootstrap_failed_from_same_worker_session() {
    let mut coordinator = WorkerCoordinator::new(42);

    let live_bootstrap = worker_envelope(
        "stable-worker",
        5,
        5,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "stable-worker".into(),
        },
    );
    coordinator.handle_inbound(&live_bootstrap).unwrap();

    let stale_failure = bootstrap_failed_envelope(1, "stable-worker", "stale failure replay");
    let verdict = coordinator.handle_inbound(&stale_failure);
    assert_eq!(
        verdict,
        Err(WorkerChannelError::InboundSequenceNotFresh {
            last_seen: 5,
            actual: 1,
        })
    );

    // The session that was live before the replay is still usable.
    assert!(coordinator.is_worker_ready());
}
2301
/// Sequence freshness applies across message kinds. After a diagnostic with
/// `seq_no` 4, a job completion carrying `seq_no` 3 is rejected, and the job
/// stays `Running`.
#[test]
fn coordinator_rejects_out_of_order_job_message_sequence() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    // Discard the queued SpawnJob.
    let _ = coord.drain_outbox();

    // Worker reports the job as running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: None,
        }),
    );
    coord.handle_inbound(&running).unwrap();

    // A diagnostic skips ahead to seq_no 4 (gaps are accepted here).
    let diag = test_worker_envelope(
        3,
        4,
        2,
        WorkerOp::Diagnostic(DiagnosticEvent {
            level: DiagnosticLevel::Info,
            category: "scheduler".into(),
            message: "worker tick".into(),
            metadata: None,
        }),
    );
    coord.handle_inbound(&diag).unwrap();

    // The completion arrives with an older seq_no (3) and is refused.
    let stale_completion = test_worker_envelope(
        4,
        3,
        3,
        WorkerOp::JobCompleted(JobResult {
            job_id: 1,
            outcome: JobOutcome::Ok { payload: vec![7] },
        }),
    );
    assert_eq!(
        coord.handle_inbound(&stale_completion),
        Err(WorkerChannelError::InboundSequenceNotFresh {
            last_seen: 4,
            actual: 3,
        })
    );
    assert_eq!(coord.last_inbound_seq_no, 4);
    // The rejected completion must not have finished the job.
    assert_eq!(coord.job_state(1), Some(JobState::Running));
}
2353
/// `validate` accepts a well-formed envelope and rejects one whose `version`
/// field no longer matches the protocol version.
#[test]
fn envelope_validates_version() {
    let bootstrap_op = WorkerOp::BootstrapReady {
        worker_id: "w".into(),
    };
    let mut envelope = worker_envelope("w", 1, 1, 0, bootstrap_op);
    assert!(envelope.validate().is_ok());

    // Corrupt only the version field.
    envelope.version = 99;
    let outcome = envelope.validate();
    assert!(matches!(
        outcome,
        Err(WorkerChannelError::VersionMismatch { .. })
    ));
}
2373
/// The replay hash seals the envelope's metadata and op. After changing
/// `decision_seq`, recomputing the hash makes the envelope valid again.
/// Swapping the op afterwards without re-sealing yields `ReplayHashMismatch`.
#[test]
fn envelope_validates_replay_metadata() {
    let mut envelope = worker_envelope(
        "w",
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "w".into(),
        },
    );
    assert!(envelope.validate().is_ok());

    // Change decision_seq and re-seal: still valid.
    envelope.decision_seq = 2;
    let resealed = replay_hash(
        envelope.message_id,
        envelope.seq_no,
        envelope.decision_seq,
        envelope.seed,
        envelope.issued_at_turn,
        envelope.worker_id.as_deref(),
        &envelope.op,
    );
    envelope.replay_hash = resealed;
    assert!(envelope.validate().is_ok());

    // Tamper with the op but keep the old hash.
    envelope.op = WorkerOp::ShutdownCompleted;
    let tampered = envelope.validate();
    assert!(matches!(
        tampered,
        Err(WorkerChannelError::ReplayHashMismatch { .. })
    ));
}
2405
/// `validate` enforces `MAX_PAYLOAD_BYTES` on both outbound spawn requests and
/// inbound job completions.
#[test]
fn envelope_validates_payload_size() {
    // Produces a buffer one byte past the limit.
    let oversized = || vec![0u8; MAX_PAYLOAD_BYTES + 1];

    // Outbound direction: SpawnJob.
    let spawn = WorkerEnvelope::new(
        1,
        1,
        42,
        0,
        WorkerOp::SpawnJob(SpawnJobRequest {
            job_id: 1,
            region_id: 100,
            task_id: 200,
            obligation_id: 300,
            payload: oversized(),
        }),
    );
    assert!(matches!(
        spawn.validate(),
        Err(WorkerChannelError::PayloadTooLarge { .. })
    ));

    // Inbound direction: JobCompleted with an Ok payload.
    let completion = test_worker_envelope(
        2,
        2,
        0,
        WorkerOp::JobCompleted(JobResult {
            job_id: 1,
            outcome: JobOutcome::Ok {
                payload: oversized(),
            },
        }),
    );
    assert!(matches!(
        completion.validate(),
        Err(WorkerChannelError::PayloadTooLarge { .. })
    ));
}
2442
/// Table-driven check of `JobState::can_transition_to`: every pair in
/// `allowed` must be permitted and every pair in `forbidden` must be refused.
#[test]
fn job_state_transitions_are_correct() {
    let allowed = [
        (JobState::Created, JobState::Queued),
        (JobState::Created, JobState::Failed),
        (JobState::Queued, JobState::Running),
        (JobState::Queued, JobState::CancelRequested),
        (JobState::Queued, JobState::Failed),
        (JobState::Running, JobState::Completed),
        (JobState::Running, JobState::CancelRequested),
        (JobState::Running, JobState::Failed),
        (JobState::CancelRequested, JobState::Completed),
        (JobState::CancelRequested, JobState::Draining),
        (JobState::CancelRequested, JobState::Failed),
        (JobState::Draining, JobState::Finalizing),
        (JobState::Draining, JobState::Failed),
        (JobState::Finalizing, JobState::Completed),
        (JobState::Finalizing, JobState::Failed),
        // A queued job may complete without an observed Running phase.
        (JobState::Queued, JobState::Completed),
    ];
    for (from, to) in allowed {
        // Build the label before the call in case the method consumes its arguments.
        let label = format!("{from:?} -> {to:?}");
        assert!(from.can_transition_to(to), "{label}");
    }

    let forbidden = [
        (JobState::Created, JobState::Running),
        (JobState::Created, JobState::Completed),
        (JobState::Draining, JobState::Completed),
        (JobState::Completed, JobState::Running),
    ];
    for (from, to) in forbidden {
        let label = format!("{from:?} -> {to:?}");
        assert!(!from.can_transition_to(to), "{label}");
    }
}
2470
/// A `SpawnJob` envelope survives a JSON encode/decode unchanged.
#[test]
fn envelope_serialization_round_trip() {
    let request = SpawnJobRequest {
        job_id: 1,
        region_id: 100,
        task_id: 200,
        obligation_id: 300,
        payload: vec![1, 2, 3],
    };
    let original = WorkerEnvelope::new(1, 1, 42, 0, WorkerOp::SpawnJob(request));

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded = serde_json::from_str::<WorkerEnvelope>(&encoded).unwrap();

    assert_eq!(original, decoded);
}
2490
/// A worker-originated envelope keeps its `worker_id` through a JSON round
/// trip, and the decoded copy still passes `validate`. `worker_id` is covered
/// by the replay hash, so losing it would break validation.
#[test]
fn worker_envelope_serialization_round_trip_preserves_worker_identity() {
    let event = DiagnosticEvent {
        level: DiagnosticLevel::Info,
        category: "serde".into(),
        message: "round-trip".into(),
        metadata: Some("worker identity must survive serialization".into()),
    };
    let original = worker_envelope("worker-round-trip", 7, 9, 3, WorkerOp::Diagnostic(event));

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded = serde_json::from_str::<WorkerEnvelope>(&encoded).unwrap();

    assert_eq!(original, decoded);
    assert!(decoded.validate().is_ok());
}
2510
2511 #[test]
2512 fn coordinator_sequence_numbers_are_monotonic() {
2513 let mut coord = WorkerCoordinator::new(42);
2514 coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
2515
2516 coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
2517 coord.spawn_job(2, 100, 201, 301, vec![]).unwrap();
2518
2519 let msg1 = coord.drain_outbox().unwrap();
2520 let msg2 = coord.drain_outbox().unwrap();
2521 assert!(msg2.seq_no > msg1.seq_no);
2522 assert!(msg2.message_id > msg1.message_id);
2523 }
2524
2525 #[test]
2526 fn coordinator_emits_independent_decision_sequence() {
2527 let mut coord = WorkerCoordinator::new(42);
2528 coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
2529 coord.next_decision_seq = 19;
2530
2531 coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
2532 coord.spawn_job(2, 100, 201, 301, vec![]).unwrap();
2533
2534 let msg1 = coord.drain_outbox().unwrap();
2535 let msg2 = coord.drain_outbox().unwrap();
2536
2537 assert_eq!(msg1.seq_no, 1);
2538 assert_eq!(msg1.decision_seq, 19);
2539 assert_ne!(msg1.seq_no, msg1.decision_seq);
2540 assert!(msg1.validate().is_ok());
2541
2542 assert_eq!(msg2.seq_no, 2);
2543 assert_eq!(msg2.decision_seq, 20);
2544 assert_ne!(msg2.seq_no, msg2.decision_seq);
2545 assert!(msg2.validate().is_ok());
2546 }
2547
2548 #[test]
2549 fn diagnostic_events_are_accepted() {
2550 let mut coord = WorkerCoordinator::new(42);
2551 coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
2552
2553 let diag = test_worker_envelope(
2554 2,
2555 2,
2556 1,
2557 WorkerOp::Diagnostic(DiagnosticEvent {
2558 level: DiagnosticLevel::Info,
2559 category: "lifecycle".into(),
2560 message: "worker initialized".into(),
2561 metadata: None,
2562 }),
2563 );
2564 assert!(coord.handle_inbound(&diag).is_ok());
2565 }
2566
2567 #[test]
2568 fn coordinator_rejects_stale_old_worker_message_after_rebootstrap() {
2569 let mut coord = WorkerCoordinator::new(42);
2570 coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
2571
2572 let reboot = worker_envelope(
2573 "test-worker-2",
2574 1,
2575 1,
2576 0,
2577 WorkerOp::BootstrapReady {
2578 worker_id: "test-worker-2".into(),
2579 },
2580 );
2581 coord.handle_inbound(&reboot).unwrap();
2582
2583 let stale = test_worker_envelope(
2584 9,
2585 9,
2586 1,
2587 WorkerOp::Diagnostic(DiagnosticEvent {
2588 level: DiagnosticLevel::Warn,
2589 category: "stale".into(),
2590 message: "late message from replaced worker".into(),
2591 metadata: None,
2592 }),
2593 );
2594 assert_eq!(
2595 coord.handle_inbound(&stale),
2596 Err(WorkerChannelError::InboundWorkerSessionMismatch {
2597 expected: Some("test-worker-2".into()),
2598 actual: "test-worker-1".into(),
2599 })
2600 );
2601
2602 let fresh = worker_envelope(
2603 "test-worker-2",
2604 2,
2605 2,
2606 1,
2607 WorkerOp::Diagnostic(DiagnosticEvent {
2608 level: DiagnosticLevel::Info,
2609 category: "fresh".into(),
2610 message: "new worker follow-up".into(),
2611 metadata: None,
2612 }),
2613 );
2614 coord.handle_inbound(&fresh).unwrap();
2615 assert_eq!(coord.last_inbound_seq_no, 2);
2616 }
2617
2618 #[test]
2619 fn envelope_rejects_worker_message_without_session_identity() {
2620 let env = WorkerEnvelope::new(2, 2, 42, 1, WorkerOp::ShutdownCompleted);
2621 assert_eq!(
2622 env.validate(),
2623 Err(WorkerChannelError::MissingWorkerSessionIdentity)
2624 );
2625 }
2626}