Skip to main content

asupersync/net/
worker_channel.rs

//! MessagePort-based coordination utilities for browser main-thread / worker
//! runtime communication.
//!
//! Bead: asupersync-18tbo.3
//!
//! This module provides the typed coordination layer that sits between raw
//! `postMessage` usage and application code. It defines a structured message
//! protocol for:
//!
//! - **Bootstrap readiness**: worker reports initialization status
//! - **Work dispatch**: main thread sends work requests with region/task IDs
//! - **Cancellation**: main thread requests cancellation of in-flight work
//! - **Graceful shutdown**: coordinated worker lifecycle termination
//! - **Diagnostic events**: structured error/status reporting
//!
//! # Design
//!
//! The protocol is defined as Rust types that serialize to JSON for the
//! structured-clone boundary. All messages carry explicit region and
//! sequence metadata so the coordination path remains compatible with
//! Asupersync's structured concurrency and deterministic replay.
//!
//! # Browser Integration
//!
//! On `wasm32` targets, the coordinator and endpoint integrate with the
//! [`BrowserReactor`] through its `register_message_port()` API, delivering
//! events via the reactor's token-based readiness model.
28
29use std::collections::VecDeque;
30use std::fmt;
31
/// Version number stamped into every worker coordination envelope.
///
/// Envelopes carrying any other value fail validation with a version mismatch.
pub const WORKER_PROTOCOL_VERSION: u32 = 1;

/// Upper bound, in bytes, on a single job or result payload (256 KiB, matching policy).
pub const MAX_PAYLOAD_BYTES: usize = 256 * 1024;
37
38/// Payload ownership mode across the non-SAB worker boundary.
39///
40/// The concrete v1 envelope only ships structured-clone semantics for owned
41/// byte payloads. The enum exists so policy, diagnostics, and later browser
42/// wiring can describe transfer rules without implying SharedArrayBuffer.
43#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
44#[serde(rename_all = "snake_case")]
45pub enum WorkerPayloadTransfer {
46    /// Structured-clone the payload across the `postMessage` boundary.
47    StructuredClone,
48    /// Transfer ownership of an `ArrayBuffer`-backed payload.
49    TransferArrayBuffer,
50}
51
52#[must_use]
53fn replay_hash(
54    message_id: u64,
55    seq_no: u64,
56    decision_seq: u64,
57    seed: u64,
58    issued_at_turn: u64,
59    worker_id: Option<&str>,
60    op: &WorkerOp,
61) -> u64 {
62    let mut hash = message_id
63        .wrapping_mul(0x9E37_79B1_85EB_CA87)
64        .wrapping_add(seq_no.rotate_left(13))
65        ^ decision_seq.rotate_left(23)
66        ^ seed.rotate_left(29)
67        ^ issued_at_turn.rotate_left(47);
68    if let Some(worker_id) = worker_id {
69        for byte in worker_id.as_bytes() {
70            hash = hash.rotate_left(7) ^ u64::from(*byte);
71            hash = hash.wrapping_mul(0x100_0000_01B3);
72        }
73    }
74    for byte in serde_json::to_vec(op).unwrap_or_default() {
75        hash = hash.rotate_left(5) ^ u64::from(byte);
76        hash = hash.wrapping_mul(0x100_0000_01B3);
77    }
78    hash
79}
80
81fn validate_payload_size(size: usize) -> Result<(), WorkerChannelError> {
82    if size > MAX_PAYLOAD_BYTES {
83        return Err(WorkerChannelError::PayloadTooLarge {
84            size,
85            max: MAX_PAYLOAD_BYTES,
86        });
87    }
88    Ok(())
89}
90
91// ─── Envelope ────────────────────────────────────────────────────────
92
93/// A typed coordination message exchanged between main thread and worker.
94///
95/// All messages carry sequence metadata for deterministic replay and
96/// region affinity for structured concurrency enforcement.
97#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
98pub struct WorkerEnvelope {
99    /// Protocol version for forward compatibility.
100    pub version: u32,
101    /// Unique message identifier within this coordination session.
102    pub message_id: u64,
103    /// Monotonically increasing sequence number per sender.
104    pub seq_no: u64,
105    /// Deterministic decision sequence used for replay parity.
106    pub decision_seq: u64,
107    /// Deterministic RNG seed for replay (propagated from parent Cx).
108    pub seed: u64,
109    /// Host turn ID at message creation (for deterministic scheduling).
110    pub issued_at_turn: u64,
111    /// Worker runtime instance for worker-originated messages.
112    ///
113    /// Main-thread coordinator messages leave this unset. Worker-originated
114    /// messages must carry the runtime instance id so stale messages from a
115    /// replaced worker cannot poison a fresh inbound session.
116    #[serde(default, skip_serializing_if = "Option::is_none")]
117    pub worker_id: Option<String>,
118    /// Stable replay digest for cross-runtime policy checks.
119    pub replay_hash: u64,
120    /// The coordination operation.
121    pub op: WorkerOp,
122}
123
124impl WorkerEnvelope {
125    /// Create a coordinator-originated envelope with the given operation and sequence metadata.
126    ///
127    /// This constructor is for main-thread/coordinator messages only. Worker-
128    /// originated messages must use [`Self::from_worker`] so the runtime
129    /// instance identity is carried explicitly.
130    #[must_use]
131    pub fn new(message_id: u64, seq_no: u64, seed: u64, issued_at_turn: u64, op: WorkerOp) -> Self {
132        Self::new_with_decision_seq(message_id, seq_no, seq_no, seed, issued_at_turn, op)
133    }
134
135    /// Create a coordinator-originated envelope with explicit transport and decision sequence
136    /// counters.
137    ///
138    /// This constructor intentionally leaves [`Self::worker_id`] unset. Use
139    /// [`Self::from_worker_with_decision_seq`] for worker-originated messages.
140    #[must_use]
141    pub fn new_with_decision_seq(
142        message_id: u64,
143        seq_no: u64,
144        decision_seq: u64,
145        seed: u64,
146        issued_at_turn: u64,
147        op: WorkerOp,
148    ) -> Self {
149        Self {
150            version: WORKER_PROTOCOL_VERSION,
151            message_id,
152            seq_no,
153            decision_seq,
154            seed,
155            issued_at_turn,
156            worker_id: None,
157            replay_hash: replay_hash(
158                message_id,
159                seq_no,
160                decision_seq,
161                seed,
162                issued_at_turn,
163                None,
164                &op,
165            ),
166            op,
167        }
168    }
169
170    /// Create a worker-originated envelope tagged with the worker runtime id.
171    #[must_use]
172    pub fn from_worker(
173        worker_id: impl Into<String>,
174        message_id: u64,
175        seq_no: u64,
176        seed: u64,
177        issued_at_turn: u64,
178        op: WorkerOp,
179    ) -> Self {
180        Self::from_worker_with_decision_seq(
181            worker_id,
182            message_id,
183            seq_no,
184            seq_no,
185            seed,
186            issued_at_turn,
187            op,
188        )
189    }
190
191    /// Create a worker-originated envelope with explicit decision sequence metadata.
192    #[must_use]
193    pub fn from_worker_with_decision_seq(
194        worker_id: impl Into<String>,
195        message_id: u64,
196        seq_no: u64,
197        decision_seq: u64,
198        seed: u64,
199        issued_at_turn: u64,
200        op: WorkerOp,
201    ) -> Self {
202        let worker_id = worker_id.into();
203        Self {
204            version: WORKER_PROTOCOL_VERSION,
205            message_id,
206            seq_no,
207            decision_seq,
208            seed,
209            issued_at_turn,
210            worker_id: Some(worker_id.clone()),
211            replay_hash: replay_hash(
212                message_id,
213                seq_no,
214                decision_seq,
215                seed,
216                issued_at_turn,
217                Some(worker_id.as_str()),
218                &op,
219            ),
220            op,
221        }
222    }
223
224    /// Validate the envelope against protocol constraints.
225    pub fn validate(&self) -> Result<(), WorkerChannelError> {
226        if self.version != WORKER_PROTOCOL_VERSION {
227            return Err(WorkerChannelError::VersionMismatch {
228                expected: WORKER_PROTOCOL_VERSION,
229                actual: self.version,
230            });
231        }
232        let expected_replay_hash = replay_hash(
233            self.message_id,
234            self.seq_no,
235            self.decision_seq,
236            self.seed,
237            self.issued_at_turn,
238            self.worker_id.as_deref(),
239            &self.op,
240        );
241        if self.replay_hash != expected_replay_hash {
242            return Err(WorkerChannelError::ReplayHashMismatch {
243                expected: expected_replay_hash,
244                actual: self.replay_hash,
245            });
246        }
247        self.validate_worker_identity()?;
248        match &self.op {
249            WorkerOp::SpawnJob(req) => validate_payload_size(req.payload.len())?,
250            WorkerOp::JobCompleted(JobResult {
251                outcome: JobOutcome::Ok { payload },
252                ..
253            }) => validate_payload_size(payload.len())?,
254            _ => {}
255        }
256        Ok(())
257    }
258
259    fn validate_worker_identity(&self) -> Result<(), WorkerChannelError> {
260        match &self.op {
261            WorkerOp::BootstrapReady { worker_id }
262            | WorkerOp::BootstrapFailed { worker_id, .. } => {
263                let actual = self
264                    .worker_id
265                    .as_deref()
266                    .ok_or(WorkerChannelError::MissingWorkerSessionIdentity)?;
267                if actual != worker_id {
268                    return Err(WorkerChannelError::WorkerIdentityMismatch {
269                        expected: worker_id.clone(),
270                        actual: actual.to_string(),
271                    });
272                }
273            }
274            WorkerOp::StatusSnapshot(_)
275            | WorkerOp::JobCompleted(_)
276            | WorkerOp::CancelAcknowledged { .. }
277            | WorkerOp::DrainCompleted { .. }
278            | WorkerOp::FinalizeCompleted { .. }
279            | WorkerOp::ShutdownCompleted
280            | WorkerOp::Diagnostic(_) => {
281                if self.worker_id.is_none() {
282                    return Err(WorkerChannelError::MissingWorkerSessionIdentity);
283                }
284            }
285            WorkerOp::SpawnJob(_)
286            | WorkerOp::PollStatus { .. }
287            | WorkerOp::CancelJob { .. }
288            | WorkerOp::DrainJob { .. }
289            | WorkerOp::FinalizeJob { .. }
290            | WorkerOp::ShutdownWorker { .. } => {
291                if let Some(worker_id) = &self.worker_id {
292                    return Err(WorkerChannelError::UnexpectedWorkerSessionIdentity(
293                        worker_id.clone(),
294                    ));
295                }
296            }
297        }
298        Ok(())
299    }
300}
301
302// ─── Operations ──────────────────────────────────────────────────────
303
304/// Worker coordination operations.
305///
306/// These map to the lifecycle messages described in the worker offload
307/// policy: bootstrap, work dispatch, cancellation, shutdown, and
308/// diagnostics.
309#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
310#[serde(tag = "type")]
311pub enum WorkerOp {
312    // ── Bootstrap ────────────────────────────────────────────────
313    /// Worker → main: worker runtime has initialized successfully.
314    BootstrapReady {
315        /// Worker-assigned identifier for this runtime instance.
316        worker_id: String,
317    },
318    /// Worker → main: worker runtime failed to initialize.
319    BootstrapFailed {
320        /// Worker-assigned identifier for this runtime instance.
321        worker_id: String,
322        /// Human-readable failure reason.
323        reason: String,
324    },
325
326    // ── Work dispatch ────────────────────────────────────────────
327    /// Main → worker: spawn a new job inside the worker runtime.
328    SpawnJob(SpawnJobRequest),
329    /// Main → worker: request an explicit status snapshot for an in-flight job.
330    PollStatus {
331        /// Job identifier whose state should be reported.
332        job_id: u64,
333    },
334    /// Worker → main: current state for an in-flight job.
335    StatusSnapshot(JobStatusSnapshot),
336    /// Worker → main: job completed with a result.
337    JobCompleted(JobResult),
338
339    // ── Cancellation ─────────────────────────────────────────────
340    /// Main → worker: request cancellation of a specific job.
341    CancelJob {
342        /// Job identifier to cancel.
343        job_id: u64,
344        /// Cancellation reason.
345        reason: String,
346    },
347    /// Worker → main: cancellation acknowledged, entering drain phase.
348    CancelAcknowledged {
349        /// Job identifier whose cancellation request was acknowledged.
350        job_id: u64,
351    },
352    /// Main → worker: execute bounded drain for a cancelled job.
353    DrainJob {
354        /// Job identifier whose drain phase should execute.
355        job_id: u64,
356    },
357    /// Worker → main: drain phase completed.
358    DrainCompleted {
359        /// Job identifier whose drain phase completed.
360        job_id: u64,
361    },
362    /// Main → worker: execute bounded finalize after drain completion.
363    FinalizeJob {
364        /// Job identifier whose finalize phase should execute.
365        job_id: u64,
366    },
367    /// Worker → main: finalize phase completed.
368    FinalizeCompleted {
369        /// Job identifier whose finalize phase completed.
370        job_id: u64,
371    },
372
373    // ── Shutdown ─────────────────────────────────────────────────
374    /// Main → worker: request graceful shutdown of the worker runtime.
375    ShutdownWorker {
376        /// Reason for the shutdown request.
377        reason: String,
378    },
379    /// Worker → main: shutdown completed, worker is safe to terminate.
380    ShutdownCompleted,
381
382    // ── Diagnostics ──────────────────────────────────────────────
383    /// Worker → main: structured diagnostic event.
384    Diagnostic(DiagnosticEvent),
385}
386
387/// A request to spawn a job inside the worker runtime.
388#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
389pub struct SpawnJobRequest {
390    /// Unique job identifier within this coordination session.
391    pub job_id: u64,
392    /// Region ID that owns this job (for structured concurrency enforcement).
393    pub region_id: u64,
394    /// Task ID within the owning region.
395    pub task_id: u64,
396    /// Obligation ID for the job's commit/abort tracking.
397    pub obligation_id: u64,
398    /// Serialized job payload (must respect MAX_PAYLOAD_BYTES).
399    pub payload: Vec<u8>,
400}
401
402impl SpawnJobRequest {
403    /// The concrete v1 non-SAB transport always uses structured-cloned owned bytes.
404    #[must_use]
405    pub fn payload_transfer(&self) -> WorkerPayloadTransfer {
406        WorkerPayloadTransfer::StructuredClone
407    }
408
409    /// Whether the request owns the payload it is sending across the boundary.
410    #[must_use]
411    pub fn owned_payload(&self) -> bool {
412        true
413    }
414}
415
416/// Explicit non-terminal status report for `poll_status` requests.
417#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
418pub struct JobStatusSnapshot {
419    /// Job identifier being reported.
420    pub job_id: u64,
421    /// Current worker-observed non-terminal lifecycle state.
422    ///
423    /// Terminal outcomes must use [`WorkerOp::JobCompleted`] or
424    /// [`WorkerOp::FinalizeCompleted`] so the coordinator does not lose the
425    /// completion semantics attached to those messages. Coordinator-owned
426    /// phases such as `created`, `draining`, and `finalizing` must not be
427    /// reported via `status_snapshot`; they are driven by explicit control
428    /// messages in the cancellation protocol.
429    pub state: JobState,
430    /// Optional human-readable detail for diagnostics.
431    pub detail: Option<String>,
432}
433
434/// The result of a completed job.
435#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
436pub struct JobResult {
437    /// Job identifier.
438    pub job_id: u64,
439    /// Four-valued outcome matching Asupersync's Outcome semantics.
440    pub outcome: JobOutcome,
441}
442
443/// Job completion outcome using the four-valued Asupersync model.
444#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
445#[serde(tag = "status")]
446pub enum JobOutcome {
447    /// Job completed successfully.
448    Ok {
449        /// Serialized result payload.
450        payload: Vec<u8>,
451    },
452    /// Job completed with an application error.
453    Err {
454        /// Error code.
455        code: String,
456        /// Human-readable error message.
457        message: String,
458    },
459    /// Job was cancelled through the cancellation protocol.
460    Cancelled {
461        /// Cancellation reason.
462        reason: String,
463    },
464    /// Job panicked (worker caught the panic).
465    Panicked {
466        /// Panic payload description.
467        message: String,
468    },
469}
470
471impl JobOutcome {
472    /// The concrete v1 result path uses structured-cloned owned bytes.
473    #[must_use]
474    pub fn payload_transfer(&self) -> Option<WorkerPayloadTransfer> {
475        match self {
476            Self::Ok { .. } => Some(WorkerPayloadTransfer::StructuredClone),
477            Self::Err { .. } | Self::Cancelled { .. } | Self::Panicked { .. } => None,
478        }
479    }
480}
481
482/// A structured diagnostic event from the worker.
483#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
484pub struct DiagnosticEvent {
485    /// Severity level.
486    pub level: DiagnosticLevel,
487    /// Diagnostic category.
488    pub category: String,
489    /// Human-readable message.
490    pub message: String,
491    /// Optional structured metadata.
492    pub metadata: Option<String>,
493}
494
495/// Diagnostic severity levels.
496#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
497pub enum DiagnosticLevel {
498    /// Informational (lifecycle transitions, metrics).
499    Info,
500    /// Warning (degraded state, approaching limits).
501    Warn,
502    /// Error (failed operation, requires attention).
503    Error,
504}
505
506// ─── Job State Machine ───────────────────────────────────────────────
507
508/// Job lifecycle state matching the worker offload policy.
509///
510/// Transitions:
511/// ```text
512/// Created → Queued → Running → Completed
513///           ↓          ↓         ↘
514///      CancelRequested → Draining → Finalizing → Completed
515///                      ↘
516///                       Completed (cancel raced with natural completion before ack)
517///
518/// Any non-terminal state may also transition to `Failed` if the worker
519/// session dies or is replaced before the job reaches a terminal outcome.
520/// ```
521#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
522pub enum JobState {
523    /// Job created but not yet dispatched to the worker.
524    Created,
525    /// Job dispatched, waiting for worker to start.
526    Queued,
527    /// Job actively running in the worker.
528    Running,
529    /// Cancellation requested, waiting for acknowledgement.
530    CancelRequested,
531    /// Worker acknowledged cancellation, draining in progress.
532    Draining,
533    /// Drain completed, finalization in progress.
534    Finalizing,
535    /// Job completed (with any outcome).
536    Completed,
537    /// Job failed before completion.
538    Failed,
539}
540
541impl fmt::Display for JobState {
542    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
543        match self {
544            Self::Created => write!(f, "created"),
545            Self::Queued => write!(f, "queued"),
546            Self::Running => write!(f, "running"),
547            Self::CancelRequested => write!(f, "cancel_requested"),
548            Self::Draining => write!(f, "draining"),
549            Self::Finalizing => write!(f, "finalizing"),
550            Self::Completed => write!(f, "completed"),
551            Self::Failed => write!(f, "failed"),
552        }
553    }
554}
555
556impl JobState {
557    /// Whether this state is terminal.
558    #[must_use]
559    pub const fn is_terminal(self) -> bool {
560        matches!(self, Self::Completed | Self::Failed)
561    }
562
563    /// Whether a worker may report this state via `status_snapshot`.
564    ///
565    /// Only states the worker can observe organically are allowed.
566    /// `CancelRequested`, `Draining`, and `Finalizing` are driven by
567    /// the coordinator's explicit cancel protocol messages.
568    #[must_use]
569    pub const fn allowed_in_status_snapshot(self) -> bool {
570        matches!(self, Self::Queued | Self::Running)
571    }
572
573    /// Check whether the given transition is valid.
574    #[must_use]
575    pub fn can_transition_to(self, next: Self) -> bool {
576        matches!(
577            (self, next),
578            (Self::Created, Self::Queued | Self::Failed)
579                | (
580                    Self::Queued,
581                    Self::Running | Self::CancelRequested | Self::Completed | Self::Failed,
582                )
583                | (
584                    Self::Running,
585                    Self::Completed | Self::CancelRequested | Self::Failed,
586                )
587                | (
588                    Self::CancelRequested,
589                    Self::Completed | Self::Draining | Self::Failed,
590                )
591                | (Self::Draining, Self::Finalizing | Self::Failed)
592                | (Self::Finalizing, Self::Completed | Self::Failed)
593        )
594    }
595}
596
597// ─── Tracked Job ─────────────────────────────────────────────────────
598
599/// Main-thread tracking state for an in-flight job.
600#[derive(Debug)]
601pub struct TrackedJob {
602    /// Job identifier.
603    pub job_id: u64,
604    /// Region that owns this job.
605    pub region_id: u64,
606    /// Current lifecycle state.
607    pub state: JobState,
608    /// Sequence number of the last message sent about this job.
609    pub last_seq_no: u64,
610}
611
612impl TrackedJob {
613    /// Create a new tracked job in the Created state.
614    #[must_use]
615    pub fn new(job_id: u64, region_id: u64) -> Self {
616        Self {
617            job_id,
618            region_id,
619            state: JobState::Created,
620            last_seq_no: 0,
621        }
622    }
623
624    /// Attempt a state transition, returning an error on invalid transitions.
625    pub fn transition_to(&mut self, next: JobState) -> Result<JobState, WorkerChannelError> {
626        if !self.state.can_transition_to(next) {
627            return Err(WorkerChannelError::InvalidTransition {
628                job_id: self.job_id,
629                from: self.state,
630                to: next,
631            });
632        }
633        let prev = self.state;
634        self.state = next;
635        Ok(prev)
636    }
637}
638
639// ─── Coordinator (main-thread side) ──────────────────────────────────
640
641/// Main-thread coordinator for worker lifecycle management.
642///
643/// Manages the outbound message queue, tracks in-flight jobs, and enforces
644/// the coordination protocol.
645#[derive(Debug)]
646pub struct WorkerCoordinator {
647    /// Outbound message queue.
648    outbox: VecDeque<WorkerEnvelope>,
649    /// Monotonically increasing sequence number.
650    next_seq: u64,
651    /// Monotonically increasing deterministic decision sequence.
652    next_decision_seq: u64,
653    /// Monotonically increasing message ID.
654    next_message_id: u64,
655    /// Active tracked jobs by job_id.
656    jobs: std::collections::BTreeMap<u64, TrackedJob>,
657    /// Current deterministic seed.
658    seed: u64,
659    /// Current host turn ID.
660    turn: u64,
661    /// Highest inbound worker sequence accepted by this coordinator session.
662    last_inbound_seq_no: u64,
663    /// Worker runtime instance currently associated with the inbound session.
664    active_worker_id: Option<String>,
665    /// Whether the worker has reported bootstrap readiness.
666    worker_ready: bool,
667    /// Whether a shutdown has been requested.
668    shutdown_requested: bool,
669}
670
671impl WorkerCoordinator {
672    /// Create a new coordinator with the given deterministic seed.
673    #[must_use]
674    pub fn new(seed: u64) -> Self {
675        Self {
676            outbox: VecDeque::new(),
677            next_seq: 1,
678            next_decision_seq: 1,
679            next_message_id: 1,
680            jobs: std::collections::BTreeMap::new(),
681            seed,
682            turn: 0,
683            last_inbound_seq_no: 0,
684            active_worker_id: None,
685            worker_ready: false,
686            shutdown_requested: false,
687        }
688    }
689
690    /// Advance the host turn counter.
691    pub fn advance_turn(&mut self) {
692        self.turn += 1;
693    }
694
695    /// Whether the worker has reported bootstrap readiness.
696    #[must_use]
697    pub fn is_worker_ready(&self) -> bool {
698        self.worker_ready
699    }
700
701    /// Whether a shutdown has been requested.
702    #[must_use]
703    pub fn is_shutdown_requested(&self) -> bool {
704        self.shutdown_requested
705    }
706
707    /// Number of in-flight (non-completed) jobs.
708    #[must_use]
709    pub fn inflight_count(&self) -> usize {
710        self.jobs
711            .values()
712            .filter(|j| j.state != JobState::Completed && j.state != JobState::Failed)
713            .count()
714    }
715
716    /// Enqueue a spawn-job message. Returns the job_id.
717    pub fn spawn_job(
718        &mut self,
719        job_id: u64,
720        region_id: u64,
721        task_id: u64,
722        obligation_id: u64,
723        payload: Vec<u8>,
724    ) -> Result<u64, WorkerChannelError> {
725        if !self.worker_ready {
726            return Err(WorkerChannelError::WorkerNotReady);
727        }
728        if self.shutdown_requested {
729            return Err(WorkerChannelError::ShutdownInProgress);
730        }
731        validate_payload_size(payload.len())?;
732        if self.jobs.contains_key(&job_id) {
733            return Err(WorkerChannelError::DuplicateJobId(job_id));
734        }
735
736        let mut tracked = TrackedJob::new(job_id, region_id);
737        tracked.transition_to(JobState::Queued)?;
738
739        let envelope = self.make_envelope(WorkerOp::SpawnJob(SpawnJobRequest {
740            job_id,
741            region_id,
742            task_id,
743            obligation_id,
744            payload,
745        }));
746        tracked.last_seq_no = envelope.seq_no;
747        self.jobs.insert(job_id, tracked);
748        self.outbox.push_back(envelope);
749        Ok(job_id)
750    }
751
752    /// Enqueue a cancel-job message.
753    pub fn cancel_job(&mut self, job_id: u64, reason: String) -> Result<(), WorkerChannelError> {
754        if self.shutdown_requested {
755            return Err(WorkerChannelError::ShutdownInProgress);
756        }
757        {
758            let job = self
759                .jobs
760                .get_mut(&job_id)
761                .ok_or(WorkerChannelError::UnknownJobId(job_id))?;
762            job.transition_to(JobState::CancelRequested)?;
763        }
764
765        self.enqueue_job_message(job_id, WorkerOp::CancelJob { job_id, reason })
766    }
767
768    /// Enqueue an explicit poll-status message for an in-flight job.
769    pub fn poll_status(&mut self, job_id: u64) -> Result<(), WorkerChannelError> {
770        if self.shutdown_requested {
771            return Err(WorkerChannelError::ShutdownInProgress);
772        }
773        if !self.jobs.contains_key(&job_id) {
774            return Err(WorkerChannelError::UnknownJobId(job_id));
775        }
776        self.enqueue_job_message(job_id, WorkerOp::PollStatus { job_id })
777    }
778
779    /// Enqueue a shutdown message.
780    pub fn request_shutdown(&mut self, reason: String) -> Result<(), WorkerChannelError> {
781        if self.shutdown_requested {
782            return Err(WorkerChannelError::ShutdownInProgress);
783        }
784        self.shutdown_requested = true;
785        self.discard_outbound_session_messages();
786        let envelope = self.make_envelope(WorkerOp::ShutdownWorker { reason });
787        self.outbox.push_back(envelope);
788        Ok(())
789    }
790
791    /// Process an inbound message from the worker.
792    pub fn handle_inbound(&mut self, envelope: &WorkerEnvelope) -> Result<(), WorkerChannelError> {
793        self.prepare_inbound(envelope)?;
794        match &envelope.op {
795            WorkerOp::BootstrapReady { worker_id } => {
796                self.active_worker_id = Some(worker_id.clone());
797                self.worker_ready = true;
798                self.shutdown_requested = false;
799                Ok(())
800            }
801            WorkerOp::BootstrapFailed { reason, .. } => {
802                self.fail_nonterminal_jobs();
803                self.discard_outbound_session_messages();
804                // Clear active_worker_id so the failed worker cannot send
805                // follow-up messages (Diagnostic, ShutdownCompleted) that
806                // would pass the session check and potentially interfere
807                // with a replacement worker's bootstrap sequence.
808                self.active_worker_id = None;
809                self.worker_ready = false;
810                self.shutdown_requested = false;
811                Err(WorkerChannelError::BootstrapFailed(reason.clone()))
812            }
813            WorkerOp::StatusSnapshot(snapshot) => self.handle_status_snapshot(snapshot),
814            WorkerOp::JobCompleted(result) => {
815                let job = self
816                    .jobs
817                    .get_mut(&result.job_id)
818                    .ok_or(WorkerChannelError::UnknownJobId(result.job_id))?;
819                if !matches!(
820                    job.state,
821                    JobState::Queued | JobState::Running | JobState::CancelRequested
822                ) {
823                    return Err(WorkerChannelError::UnexpectedCompletionPhase {
824                        job_id: result.job_id,
825                        state: job.state,
826                    });
827                }
828                job.transition_to(JobState::Completed)?;
829                Ok(())
830            }
831            WorkerOp::CancelAcknowledged { job_id } => {
832                if !self.jobs.contains_key(job_id) {
833                    return Err(WorkerChannelError::UnknownJobId(*job_id));
834                }
835                // Once shutdown is in progress, do not emit new job-scoped
836                // control traffic. The pending job will be failed when the
837                // worker reports shutdown completion or the session is reset.
838                if self.shutdown_requested {
839                    return Ok(());
840                }
841                {
842                    let job = self
843                        .jobs
844                        .get_mut(job_id)
845                        .ok_or(WorkerChannelError::UnknownJobId(*job_id))?;
846                    job.transition_to(JobState::Draining)?;
847                }
848                self.enqueue_job_message(*job_id, WorkerOp::DrainJob { job_id: *job_id })
849            }
850            WorkerOp::DrainCompleted { job_id } => {
851                if !self.jobs.contains_key(job_id) {
852                    return Err(WorkerChannelError::UnknownJobId(*job_id));
853                }
854                // Once shutdown is in progress, do not continue the explicit
855                // cancel protocol with new outbound drain/finalize traffic.
856                if self.shutdown_requested {
857                    return Ok(());
858                }
859                {
860                    let job = self
861                        .jobs
862                        .get_mut(job_id)
863                        .ok_or(WorkerChannelError::UnknownJobId(*job_id))?;
864                    job.transition_to(JobState::Finalizing)?;
865                }
866                self.enqueue_job_message(*job_id, WorkerOp::FinalizeJob { job_id: *job_id })
867            }
868            WorkerOp::FinalizeCompleted { job_id } => {
869                let job = self
870                    .jobs
871                    .get_mut(job_id)
872                    .ok_or(WorkerChannelError::UnknownJobId(*job_id))?;
873                job.transition_to(JobState::Completed)?;
874                Ok(())
875            }
876            WorkerOp::ShutdownCompleted => {
877                self.fail_nonterminal_jobs();
878                self.discard_outbound_session_messages();
879                self.active_worker_id = None;
880                self.shutdown_requested = false;
881                self.worker_ready = false;
882                Ok(())
883            }
884            WorkerOp::Diagnostic(_) => Ok(()),
885            // Main-to-worker ops should not be inbound
886            WorkerOp::SpawnJob(_)
887            | WorkerOp::PollStatus { .. }
888            | WorkerOp::CancelJob { .. }
889            | WorkerOp::DrainJob { .. }
890            | WorkerOp::FinalizeJob { .. }
891            | WorkerOp::ShutdownWorker { .. } => Err(WorkerChannelError::UnexpectedDirection {
892                op: format!("{:?}", std::mem::discriminant(&envelope.op)),
893            }),
894        }
895    }
896
    /// Drain the next outbound message, if any.
    ///
    /// Removes and returns the envelope at the front of the outbox, or `None`
    /// when the outbox is empty. At most one envelope is removed per call.
    #[must_use]
    pub fn drain_outbox(&mut self) -> Option<WorkerEnvelope> {
        // `enqueue_job_message` appends with `push_back`, so the front entry is
        // the earliest of the messages it queued.
        self.outbox.pop_front()
    }
902
903    /// Get the current state of a tracked job.
904    #[must_use]
905    pub fn job_state(&self, job_id: u64) -> Option<JobState> {
906        self.jobs.get(&job_id).map(|j| j.state)
907    }
908
909    fn enqueue_job_message(&mut self, job_id: u64, op: WorkerOp) -> Result<(), WorkerChannelError> {
910        if !self.jobs.contains_key(&job_id) {
911            return Err(WorkerChannelError::UnknownJobId(job_id));
912        }
913        let envelope = self.make_envelope(op);
914        if let Some(job) = self.jobs.get_mut(&job_id) {
915            job.last_seq_no = envelope.seq_no;
916        }
917        self.outbox.push_back(envelope);
918        Ok(())
919    }
920
921    fn handle_status_snapshot(
922        &mut self,
923        snapshot: &JobStatusSnapshot,
924    ) -> Result<(), WorkerChannelError> {
925        if snapshot.state.is_terminal() {
926            return Err(WorkerChannelError::TerminalStatusSnapshot {
927                job_id: snapshot.job_id,
928                state: snapshot.state,
929            });
930        }
931        if !snapshot.state.allowed_in_status_snapshot() {
932            return Err(WorkerChannelError::InvalidStatusSnapshotState {
933                job_id: snapshot.job_id,
934                state: snapshot.state,
935            });
936        }
937        let job = self
938            .jobs
939            .get_mut(&snapshot.job_id)
940            .ok_or(WorkerChannelError::UnknownJobId(snapshot.job_id))?;
941        if job.state != snapshot.state {
942            job.transition_to(snapshot.state)?;
943        }
944        Ok(())
945    }
946
947    fn prepare_inbound(&mut self, envelope: &WorkerEnvelope) -> Result<(), WorkerChannelError> {
948        envelope.validate()?;
949        if self.should_reset_inbound_session(&envelope.op) {
950            self.fail_nonterminal_jobs();
951            self.reset_inbound_session();
952        }
953        self.validate_inbound_worker_session(envelope)?;
954        self.validate_inbound_sequence(envelope.seq_no)?;
955        self.record_inbound_sequence(envelope.seq_no);
956        Ok(())
957    }
958
959    fn should_reset_inbound_session(&self, op: &WorkerOp) -> bool {
960        match op {
961            WorkerOp::BootstrapReady { worker_id }
962            | WorkerOp::BootstrapFailed { worker_id, .. } => {
963                self.active_worker_id.as_deref() != Some(worker_id.as_str())
964            }
965            _ => false,
966        }
967    }
968
969    fn fail_nonterminal_jobs(&mut self) {
970        for job in self.jobs.values_mut() {
971            if job.state.is_terminal() {
972                continue;
973            }
974            let _ = job.transition_to(JobState::Failed);
975        }
976    }
977
    /// Drop every envelope still waiting in the outbox.
    ///
    /// Only the queue is emptied; job states and message/sequence counters are
    /// not touched here.
    fn discard_outbound_session_messages(&mut self) {
        self.outbox.clear();
    }
981
982    fn reset_inbound_session(&mut self) {
983        self.last_inbound_seq_no = 0;
984        self.discard_outbound_session_messages();
985    }
986
987    fn validate_inbound_sequence(&self, seq_no: u64) -> Result<(), WorkerChannelError> {
988        if seq_no <= self.last_inbound_seq_no {
989            return Err(WorkerChannelError::InboundSequenceNotFresh {
990                last_seen: self.last_inbound_seq_no,
991                actual: seq_no,
992            });
993        }
994        Ok(())
995    }
996
    /// Remember `seq_no` as the most recent inbound sequence number.
    ///
    /// No ordering check happens here; `prepare_inbound` calls
    /// `validate_inbound_sequence` first and only then records the value.
    fn record_inbound_sequence(&mut self, seq_no: u64) {
        self.last_inbound_seq_no = seq_no;
    }
1000
1001    fn validate_inbound_worker_session(
1002        &self,
1003        envelope: &WorkerEnvelope,
1004    ) -> Result<(), WorkerChannelError> {
1005        match &envelope.op {
1006            WorkerOp::BootstrapReady { .. }
1007            | WorkerOp::BootstrapFailed { .. }
1008            | WorkerOp::SpawnJob(_)
1009            | WorkerOp::PollStatus { .. }
1010            | WorkerOp::CancelJob { .. }
1011            | WorkerOp::DrainJob { .. }
1012            | WorkerOp::FinalizeJob { .. }
1013            | WorkerOp::ShutdownWorker { .. } => Ok(()),
1014            _ => {
1015                let actual = envelope
1016                    .worker_id
1017                    .as_deref()
1018                    .ok_or(WorkerChannelError::MissingWorkerSessionIdentity)?;
1019                if self.active_worker_id.as_deref() == Some(actual) {
1020                    return Ok(());
1021                }
1022                Err(WorkerChannelError::InboundWorkerSessionMismatch {
1023                    expected: self.active_worker_id.clone(),
1024                    actual: actual.to_string(),
1025                })
1026            }
1027        }
1028    }
1029
1030    fn make_envelope(&mut self, op: WorkerOp) -> WorkerEnvelope {
1031        let msg_id = self.next_message_id;
1032        let seq = self.next_seq;
1033        let decision_seq = self.next_decision_seq;
1034        self.next_message_id += 1;
1035        self.next_seq += 1;
1036        self.next_decision_seq += 1;
1037        WorkerEnvelope::new_with_decision_seq(msg_id, seq, decision_seq, self.seed, self.turn, op)
1038    }
1039}
1040
1041// ─── Errors ──────────────────────────────────────────────────────────
1042
/// Errors from the worker coordination channel.
///
/// Variants cover envelope-level problems (version, replay digest, sequence
/// and worker-session identity), payload limits, job state-machine
/// violations, and coordinator lifecycle conditions (readiness, shutdown,
/// bootstrap failure, wrong message direction).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkerChannelError {
    /// Protocol version mismatch.
    VersionMismatch {
        /// Protocol version expected by the receiver.
        expected: u32,
        /// Protocol version provided by the sender.
        actual: u32,
    },
    /// Replay digest does not match the envelope metadata.
    ReplayHashMismatch {
        /// Expected replay digest for the envelope metadata.
        expected: u64,
        /// Actual replay digest carried by the envelope.
        actual: u64,
    },
    /// Inbound worker sequence repeated or regressed relative to coordinator state.
    ///
    /// Raised when an inbound sequence number is less than or equal to the
    /// last one the coordinator recorded.
    InboundSequenceNotFresh {
        /// Highest inbound sequence previously observed by this coordinator.
        last_seen: u64,
        /// Sequence number carried by the rejected envelope.
        actual: u64,
    },
    /// Worker-originated messages must carry the worker runtime instance id.
    MissingWorkerSessionIdentity,
    /// Main-thread-originated messages must not carry a worker runtime instance id.
    ///
    /// The payload is the worker id that was unexpectedly present.
    UnexpectedWorkerSessionIdentity(String),
    /// Envelope worker runtime id disagrees with the operation payload.
    WorkerIdentityMismatch {
        /// Worker id implied by the operation payload.
        expected: String,
        /// Worker id carried by the envelope metadata.
        actual: String,
    },
    /// Inbound message came from a different worker runtime instance than the active session.
    InboundWorkerSessionMismatch {
        /// Active worker runtime id, if any (`None` when no session is active).
        expected: Option<String>,
        /// Worker runtime id carried by the inbound envelope.
        actual: String,
    },
    /// Payload exceeds maximum size.
    PayloadTooLarge {
        /// Serialized payload size in bytes.
        size: usize,
        /// Maximum payload size in bytes permitted by the protocol.
        max: usize,
    },
    /// Invalid job state transition.
    InvalidTransition {
        /// Job identifier whose state transition was rejected.
        job_id: u64,
        /// State observed before the rejected transition.
        from: JobState,
        /// Target state requested by the rejected transition.
        to: JobState,
    },
    /// A status snapshot tried to report a terminal state.
    TerminalStatusSnapshot {
        /// Job identifier carried by the terminal snapshot.
        job_id: u64,
        /// Terminal state that must not be reported via `status_snapshot`.
        state: JobState,
    },
    /// A status snapshot tried to report a state owned by explicit coordinator control flow.
    InvalidStatusSnapshotState {
        /// Job identifier carried by the invalid snapshot.
        job_id: u64,
        /// State that must not be reported via `status_snapshot`.
        state: JobState,
    },
    /// A completion result arrived while the explicit cancellation protocol was active.
    UnexpectedCompletionPhase {
        /// Job identifier whose completion arrived out of phase.
        job_id: u64,
        /// Coordinator state when the completion arrived.
        state: JobState,
    },
    /// Worker has not reported bootstrap readiness.
    WorkerNotReady,
    /// Shutdown is already in progress.
    ShutdownInProgress,
    /// Duplicate job ID (the payload is the offending id).
    DuplicateJobId(u64),
    /// Unknown job ID (the payload is the id that is not tracked).
    UnknownJobId(u64),
    /// Worker bootstrap failed; the payload is the reason reported by the worker.
    BootstrapFailed(String),
    /// Received a message in the wrong direction.
    UnexpectedDirection {
        /// Operation received from the wrong side of the channel.
        op: String,
    },
}
1138
1139impl fmt::Display for WorkerChannelError {
1140    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1141        match self {
1142            Self::VersionMismatch { expected, actual } => {
1143                write!(
1144                    f,
1145                    "protocol version mismatch: expected {expected}, got {actual}"
1146                )
1147            }
1148            Self::ReplayHashMismatch { expected, actual } => {
1149                write!(f, "replay hash mismatch: expected {expected}, got {actual}")
1150            }
1151            Self::InboundSequenceNotFresh { last_seen, actual } => {
1152                write!(
1153                    f,
1154                    "inbound worker sequence is not fresh: saw {actual} after {last_seen}"
1155                )
1156            }
1157            Self::MissingWorkerSessionIdentity => {
1158                write!(
1159                    f,
1160                    "worker-originated envelope missing worker session identity"
1161                )
1162            }
1163            Self::UnexpectedWorkerSessionIdentity(worker_id) => {
1164                write!(
1165                    f,
1166                    "main-thread envelope unexpectedly carried worker session identity {worker_id}"
1167                )
1168            }
1169            Self::WorkerIdentityMismatch { expected, actual } => {
1170                write!(
1171                    f,
1172                    "worker identity mismatch: envelope carried {actual}, operation expects {expected}"
1173                )
1174            }
1175            Self::InboundWorkerSessionMismatch { expected, actual } => match expected {
1176                Some(expected) => write!(
1177                    f,
1178                    "inbound worker session mismatch: active session {expected}, got {actual}"
1179                ),
1180                None => write!(
1181                    f,
1182                    "inbound worker session mismatch: no active session, got {actual}"
1183                ),
1184            },
1185            Self::PayloadTooLarge { size, max } => {
1186                write!(
1187                    f,
1188                    "payload too large: {size} bytes exceeds {max} byte limit"
1189                )
1190            }
1191            Self::InvalidTransition { job_id, from, to } => {
1192                write!(f, "invalid job {job_id} transition: {from} → {to}")
1193            }
1194            Self::TerminalStatusSnapshot { job_id, state } => {
1195                write!(
1196                    f,
1197                    "job {job_id} reported terminal state {state} via status snapshot"
1198                )
1199            }
1200            Self::InvalidStatusSnapshotState { job_id, state } => {
1201                write!(
1202                    f,
1203                    "job {job_id} reported coordinator-owned state {state} via status snapshot"
1204                )
1205            }
1206            Self::UnexpectedCompletionPhase { job_id, state } => {
1207                write!(
1208                    f,
1209                    "job {job_id} reported completion while coordinator was in {state}"
1210                )
1211            }
1212            Self::WorkerNotReady => write!(f, "worker has not reported bootstrap readiness"),
1213            Self::ShutdownInProgress => write!(f, "shutdown already in progress"),
1214            Self::DuplicateJobId(id) => write!(f, "duplicate job id: {id}"),
1215            Self::UnknownJobId(id) => write!(f, "unknown job id: {id}"),
1216            Self::BootstrapFailed(reason) => write!(f, "worker bootstrap failed: {reason}"),
1217            Self::UnexpectedDirection { op } => {
1218                write!(f, "received outbound-only operation as inbound: {op}")
1219            }
1220        }
1221    }
1222}
1223
// Empty impl: every `Error` method keeps its default, so `source()` reports
// no underlying cause for any variant.
impl std::error::Error for WorkerChannelError {}
1225
1226// ─── Tests ───────────────────────────────────────────────────────────
1227
1228#[cfg(test)]
1229mod tests {
1230    use super::*;
1231
1232    fn worker_envelope(
1233        worker_id: &str,
1234        message_id: u64,
1235        seq_no: u64,
1236        issued_at_turn: u64,
1237        op: WorkerOp,
1238    ) -> WorkerEnvelope {
1239        WorkerEnvelope::from_worker(worker_id, message_id, seq_no, 42, issued_at_turn, op)
1240    }
1241
1242    fn test_worker_envelope(
1243        message_id: u64,
1244        seq_no: u64,
1245        issued_at_turn: u64,
1246        op: WorkerOp,
1247    ) -> WorkerEnvelope {
1248        worker_envelope("test-worker-1", message_id, seq_no, issued_at_turn, op)
1249    }
1250
1251    fn bootstrap_ready_envelope(seq: u64) -> WorkerEnvelope {
1252        worker_envelope(
1253            "test-worker-1",
1254            seq,
1255            seq,
1256            0,
1257            WorkerOp::BootstrapReady {
1258                worker_id: "test-worker-1".into(),
1259            },
1260        )
1261    }
1262
1263    fn bootstrap_failed_envelope(seq: u64, worker_id: &str, reason: &str) -> WorkerEnvelope {
1264        worker_envelope(
1265            worker_id,
1266            seq,
1267            seq,
1268            0,
1269            WorkerOp::BootstrapFailed {
1270                worker_id: worker_id.into(),
1271                reason: reason.into(),
1272            },
1273        )
1274    }
1275
1276    #[test]
1277    fn coordinator_rejects_spawn_before_bootstrap() {
1278        let mut coord = WorkerCoordinator::new(42);
1279        let result = coord.spawn_job(1, 100, 200, 300, vec![1, 2, 3]);
1280        assert_eq!(result, Err(WorkerChannelError::WorkerNotReady));
1281    }
1282
1283    #[test]
1284    fn coordinator_accepts_spawn_after_bootstrap() {
1285        let mut coord = WorkerCoordinator::new(42);
1286        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1287        assert!(coord.is_worker_ready());
1288
1289        let job_id = coord.spawn_job(1, 100, 200, 300, vec![1, 2, 3]).unwrap();
1290        assert_eq!(job_id, 1);
1291        assert_eq!(coord.job_state(1), Some(JobState::Queued));
1292        assert_eq!(coord.inflight_count(), 1);
1293
1294        let msg = coord.drain_outbox().unwrap();
1295        assert!(matches!(msg.op, WorkerOp::SpawnJob(_)));
1296        assert_eq!(msg.version, WORKER_PROTOCOL_VERSION);
1297        assert_eq!(msg.seed, 42);
1298    }
1299
1300    #[test]
1301    fn coordinator_tracks_full_job_lifecycle() {
1302        let mut coord = WorkerCoordinator::new(42);
1303        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1304        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1305        let _ = coord.drain_outbox(); // consume spawn message
1306
1307        // Job completed successfully (Queued → Completed is valid for
1308        // fast-completing jobs that skip the Running notification)
1309        let result_env = test_worker_envelope(
1310            2,
1311            2,
1312            1,
1313            WorkerOp::JobCompleted(JobResult {
1314                job_id: 1,
1315                outcome: JobOutcome::Ok { payload: vec![42] },
1316            }),
1317        );
1318        coord.handle_inbound(&result_env).unwrap();
1319        assert_eq!(coord.job_state(1), Some(JobState::Completed));
1320        assert_eq!(coord.inflight_count(), 0);
1321        assert_eq!(
1322            JobOutcome::Ok { payload: vec![42] }.payload_transfer(),
1323            Some(WorkerPayloadTransfer::StructuredClone)
1324        );
1325    }
1326
1327    #[test]
1328    fn coordinator_tracks_cancellation_lifecycle() {
1329        let mut coord = WorkerCoordinator::new(42);
1330        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1331        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1332        let _ = coord.drain_outbox();
1333
1334        let running = test_worker_envelope(
1335            2,
1336            2,
1337            1,
1338            WorkerOp::StatusSnapshot(JobStatusSnapshot {
1339                job_id: 1,
1340                state: JobState::Running,
1341                detail: Some("worker accepted job".into()),
1342            }),
1343        );
1344        coord.handle_inbound(&running).unwrap();
1345        assert_eq!(coord.job_state(1), Some(JobState::Running));
1346
1347        // Request cancellation
1348        coord.cancel_job(1, "test cancel".into()).unwrap();
1349        assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
1350        let cancel_msg = coord.drain_outbox().unwrap();
1351        assert!(matches!(cancel_msg.op, WorkerOp::CancelJob { .. }));
1352
1353        // Worker acknowledges and coordinator emits the explicit drain phase request.
1354        let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
1355        coord.handle_inbound(&ack).unwrap();
1356        assert_eq!(coord.job_state(1), Some(JobState::Draining));
1357        let drain_request = coord.drain_outbox().unwrap();
1358        assert!(matches!(drain_request.op, WorkerOp::DrainJob { job_id: 1 }));
1359
1360        // Worker completes drain and coordinator emits the finalize phase request.
1361        let drain = test_worker_envelope(4, 4, 3, WorkerOp::DrainCompleted { job_id: 1 });
1362        coord.handle_inbound(&drain).unwrap();
1363        assert_eq!(coord.job_state(1), Some(JobState::Finalizing));
1364        let finalize_request = coord.drain_outbox().unwrap();
1365        assert!(matches!(
1366            finalize_request.op,
1367            WorkerOp::FinalizeJob { job_id: 1 }
1368        ));
1369
1370        // Finalize completed
1371        let finalize = test_worker_envelope(5, 5, 4, WorkerOp::FinalizeCompleted { job_id: 1 });
1372        coord.handle_inbound(&finalize).unwrap();
1373        assert_eq!(coord.job_state(1), Some(JobState::Completed));
1374    }
1375
1376    #[test]
1377    fn coordinator_rejects_terminal_status_snapshot() {
1378        let mut coord = WorkerCoordinator::new(42);
1379        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1380        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1381        let _ = coord.drain_outbox();
1382
1383        let snapshot = test_worker_envelope(
1384            2,
1385            2,
1386            1,
1387            WorkerOp::StatusSnapshot(JobStatusSnapshot {
1388                job_id: 1,
1389                state: JobState::Completed,
1390                detail: Some("terminal snapshots must use job_completed".into()),
1391            }),
1392        );
1393        assert!(matches!(
1394            coord.handle_inbound(&snapshot),
1395            Err(WorkerChannelError::TerminalStatusSnapshot {
1396                job_id: 1,
1397                state: JobState::Completed,
1398            })
1399        ));
1400        assert_eq!(coord.job_state(1), Some(JobState::Queued));
1401    }
1402
1403    #[test]
1404    fn coordinator_rejects_status_snapshot_for_protocol_owned_drain_phase() {
1405        let mut coord = WorkerCoordinator::new(42);
1406        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1407        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1408        let _ = coord.drain_outbox();
1409
1410        let running = test_worker_envelope(
1411            2,
1412            2,
1413            1,
1414            WorkerOp::StatusSnapshot(JobStatusSnapshot {
1415                job_id: 1,
1416                state: JobState::Running,
1417                detail: None,
1418            }),
1419        );
1420        coord.handle_inbound(&running).unwrap();
1421
1422        coord.cancel_job(1, "test cancel".into()).unwrap();
1423        let _ = coord.drain_outbox();
1424        assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
1425
1426        let draining_snapshot = test_worker_envelope(
1427            3,
1428            3,
1429            2,
1430            WorkerOp::StatusSnapshot(JobStatusSnapshot {
1431                job_id: 1,
1432                state: JobState::Draining,
1433                detail: Some("worker tried to skip cancel_acknowledged".into()),
1434            }),
1435        );
1436        assert!(matches!(
1437            coord.handle_inbound(&draining_snapshot),
1438            Err(WorkerChannelError::InvalidStatusSnapshotState {
1439                job_id: 1,
1440                state: JobState::Draining,
1441            })
1442        ));
1443        assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
1444    }
1445
1446    #[test]
1447    fn coordinator_rejects_status_snapshot_for_protocol_owned_finalize_phase() {
1448        let mut coord = WorkerCoordinator::new(42);
1449        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1450        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1451        let _ = coord.drain_outbox();
1452
1453        let running = test_worker_envelope(
1454            2,
1455            2,
1456            1,
1457            WorkerOp::StatusSnapshot(JobStatusSnapshot {
1458                job_id: 1,
1459                state: JobState::Running,
1460                detail: None,
1461            }),
1462        );
1463        coord.handle_inbound(&running).unwrap();
1464
1465        coord.cancel_job(1, "test cancel".into()).unwrap();
1466        let _ = coord.drain_outbox();
1467
1468        let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
1469        coord.handle_inbound(&ack).unwrap();
1470        let _ = coord.drain_outbox();
1471        assert_eq!(coord.job_state(1), Some(JobState::Draining));
1472
1473        let finalizing_snapshot = test_worker_envelope(
1474            4,
1475            4,
1476            3,
1477            WorkerOp::StatusSnapshot(JobStatusSnapshot {
1478                job_id: 1,
1479                state: JobState::Finalizing,
1480                detail: Some("worker tried to skip drain_completed".into()),
1481            }),
1482        );
1483        assert!(matches!(
1484            coord.handle_inbound(&finalizing_snapshot),
1485            Err(WorkerChannelError::InvalidStatusSnapshotState {
1486                job_id: 1,
1487                state: JobState::Finalizing,
1488            })
1489        ));
1490        assert_eq!(coord.job_state(1), Some(JobState::Draining));
1491    }
1492
1493    #[test]
1494    fn coordinator_rejects_job_completed_while_cancellation_protocol_active() {
1495        let mut coord = WorkerCoordinator::new(42);
1496        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1497        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1498        let _ = coord.drain_outbox();
1499
1500        let running = test_worker_envelope(
1501            2,
1502            2,
1503            1,
1504            WorkerOp::StatusSnapshot(JobStatusSnapshot {
1505                job_id: 1,
1506                state: JobState::Running,
1507                detail: None,
1508            }),
1509        );
1510        coord.handle_inbound(&running).unwrap();
1511
1512        coord.cancel_job(1, "test cancel".into()).unwrap();
1513        let _ = coord.drain_outbox();
1514
1515        let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
1516        coord.handle_inbound(&ack).unwrap();
1517        let _ = coord.drain_outbox();
1518
1519        let drain = test_worker_envelope(4, 4, 3, WorkerOp::DrainCompleted { job_id: 1 });
1520        coord.handle_inbound(&drain).unwrap();
1521        let _ = coord.drain_outbox();
1522
1523        let completed = test_worker_envelope(
1524            5,
1525            5,
1526            4,
1527            WorkerOp::JobCompleted(JobResult {
1528                job_id: 1,
1529                outcome: JobOutcome::Cancelled {
1530                    reason: "worker skipped finalize".into(),
1531                },
1532            }),
1533        );
1534        assert!(matches!(
1535            coord.handle_inbound(&completed),
1536            Err(WorkerChannelError::UnexpectedCompletionPhase {
1537                job_id: 1,
1538                state: JobState::Finalizing,
1539            })
1540        ));
1541        assert_eq!(coord.job_state(1), Some(JobState::Finalizing));
1542    }
1543
1544    #[test]
1545    fn coordinator_accepts_job_completed_racing_with_cancel_request() {
1546        let mut coord = WorkerCoordinator::new(42);
1547        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1548        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1549        let _ = coord.drain_outbox();
1550
1551        let running = test_worker_envelope(
1552            2,
1553            2,
1554            1,
1555            WorkerOp::StatusSnapshot(JobStatusSnapshot {
1556                job_id: 1,
1557                state: JobState::Running,
1558                detail: Some("worker accepted job".into()),
1559            }),
1560        );
1561        coord.handle_inbound(&running).unwrap();
1562
1563        coord.cancel_job(1, "test cancel".into()).unwrap();
1564        assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
1565        let cancel_msg = coord.drain_outbox().unwrap();
1566        assert!(matches!(cancel_msg.op, WorkerOp::CancelJob { .. }));
1567
1568        let completed = test_worker_envelope(
1569            3,
1570            3,
1571            2,
1572            WorkerOp::JobCompleted(JobResult {
1573                job_id: 1,
1574                outcome: JobOutcome::Ok { payload: vec![7] },
1575            }),
1576        );
1577        coord.handle_inbound(&completed).unwrap();
1578        assert_eq!(coord.job_state(1), Some(JobState::Completed));
1579        assert_eq!(coord.inflight_count(), 0);
1580        assert!(coord.drain_outbox().is_none());
1581    }
1582
    // NOTE(review): despite its name, this test never asserts a rejection or an
    // `InvalidTransition` error. It checks that cancelling a job that is still
    // `Queued` succeeds and queues a `CancelJob` message, which is the same
    // scenario `coordinator_allows_cancel_before_running_snapshot` covers.
    // Consider renaming it or making it exercise a transition the state
    // machine actually refuses.
    #[test]
    fn coordinator_rejects_invalid_transition() {
        let mut coord = WorkerCoordinator::new(42);
        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
        // Discard the queued spawn request so the next drained message is the cancel.
        let _ = coord.drain_outbox();

        coord.cancel_job(1, "cancel before running".into()).unwrap();
        assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
        let cancel = coord.drain_outbox().unwrap();
        assert!(matches!(cancel.op, WorkerOp::CancelJob { job_id: 1, .. }));
    }
1595
1596    #[test]
1597    fn coordinator_allows_cancel_before_running_snapshot() {
1598        let mut coord = WorkerCoordinator::new(42);
1599        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1600        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1601
1602        let spawn = coord.drain_outbox().unwrap();
1603        assert!(matches!(spawn.op, WorkerOp::SpawnJob(_)));
1604
1605        coord
1606            .cancel_job(1, "cancel before worker starts".into())
1607            .unwrap();
1608        assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
1609
1610        let cancel = coord.drain_outbox().unwrap();
1611        assert!(matches!(cancel.op, WorkerOp::CancelJob { job_id: 1, .. }));
1612    }
1613
1614    #[test]
1615    fn coordinator_rejects_oversized_payload() {
1616        let mut coord = WorkerCoordinator::new(42);
1617        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1618        let big_payload = vec![0u8; MAX_PAYLOAD_BYTES + 1];
1619        let result = coord.spawn_job(1, 100, 200, 300, big_payload);
1620        assert!(matches!(
1621            result,
1622            Err(WorkerChannelError::PayloadTooLarge { .. })
1623        ));
1624    }
1625
1626    #[test]
1627    fn coordinator_rejects_duplicate_job_id() {
1628        let mut coord = WorkerCoordinator::new(42);
1629        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1630        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1631        let result = coord.spawn_job(1, 100, 200, 300, vec![]);
1632        assert_eq!(result, Err(WorkerChannelError::DuplicateJobId(1)));
1633    }
1634
1635    #[test]
1636    fn coordinator_shutdown_lifecycle() {
1637        let mut coord = WorkerCoordinator::new(42);
1638        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1639
1640        coord.request_shutdown("test shutdown".into()).unwrap();
1641        assert!(coord.is_shutdown_requested());
1642
1643        let msg = coord.drain_outbox().unwrap();
1644        assert!(matches!(msg.op, WorkerOp::ShutdownWorker { .. }));
1645
1646        // Reject spawn during shutdown
1647        let result = coord.spawn_job(1, 100, 200, 300, vec![]);
1648        assert_eq!(result, Err(WorkerChannelError::ShutdownInProgress));
1649
1650        // Worker completes shutdown
1651        let done = test_worker_envelope(2, 2, 1, WorkerOp::ShutdownCompleted);
1652        coord.handle_inbound(&done).unwrap();
1653        assert!(!coord.is_shutdown_requested());
1654        assert!(!coord.is_worker_ready());
1655        assert_eq!(
1656            coord.spawn_job(1, 100, 200, 300, vec![]),
1657            Err(WorkerChannelError::WorkerNotReady)
1658        );
1659    }
1660
1661    #[test]
1662    fn coordinator_request_shutdown_discards_queued_spawn_before_shutdown() {
1663        let mut coord = WorkerCoordinator::new(42);
1664        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1665        coord.spawn_job(1, 100, 200, 300, vec![1, 2, 3]).unwrap();
1666
1667        coord.request_shutdown("shutdown now".into()).unwrap();
1668
1669        let shutdown = coord.drain_outbox().unwrap();
1670        assert!(matches!(shutdown.op, WorkerOp::ShutdownWorker { .. }));
1671        assert!(coord.drain_outbox().is_none());
1672    }
1673
1674    #[test]
1675    fn coordinator_request_shutdown_discards_queued_poll_and_cancel_before_shutdown() {
1676        let mut coord = WorkerCoordinator::new(42);
1677        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1678        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1679        let _ = coord.drain_outbox();
1680
1681        let running = test_worker_envelope(
1682            2,
1683            2,
1684            1,
1685            WorkerOp::StatusSnapshot(JobStatusSnapshot {
1686                job_id: 1,
1687                state: JobState::Running,
1688                detail: Some("worker accepted job".into()),
1689            }),
1690        );
1691        coord.handle_inbound(&running).unwrap();
1692
1693        coord.poll_status(1).unwrap();
1694        coord
1695            .cancel_job(1, "shutdown supersedes cancel".into())
1696            .unwrap();
1697        coord.request_shutdown("shutdown now".into()).unwrap();
1698
1699        let shutdown = coord.drain_outbox().unwrap();
1700        assert!(matches!(shutdown.op, WorkerOp::ShutdownWorker { .. }));
1701        assert!(coord.drain_outbox().is_none());
1702    }
1703
1704    #[test]
1705    fn coordinator_rejects_cancel_after_shutdown_requested_without_mutating_job() {
1706        let mut coord = WorkerCoordinator::new(42);
1707        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1708        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1709        let _ = coord.drain_outbox();
1710
1711        let running = test_worker_envelope(
1712            2,
1713            2,
1714            1,
1715            WorkerOp::StatusSnapshot(JobStatusSnapshot {
1716                job_id: 1,
1717                state: JobState::Running,
1718                detail: Some("worker accepted job".into()),
1719            }),
1720        );
1721        coord.handle_inbound(&running).unwrap();
1722
1723        coord.request_shutdown("shutdown now".into()).unwrap();
1724        assert_eq!(
1725            coord.cancel_job(1, "too late".into()),
1726            Err(WorkerChannelError::ShutdownInProgress)
1727        );
1728        assert_eq!(coord.job_state(1), Some(JobState::Running));
1729
1730        let shutdown = coord.drain_outbox().unwrap();
1731        assert!(matches!(shutdown.op, WorkerOp::ShutdownWorker { .. }));
1732        assert!(coord.drain_outbox().is_none());
1733    }
1734
1735    #[test]
1736    fn coordinator_rejects_poll_after_shutdown_requested_without_enqueuing_follow_up() {
1737        let mut coord = WorkerCoordinator::new(42);
1738        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1739        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1740        let _ = coord.drain_outbox();
1741
1742        let running = test_worker_envelope(
1743            2,
1744            2,
1745            1,
1746            WorkerOp::StatusSnapshot(JobStatusSnapshot {
1747                job_id: 1,
1748                state: JobState::Running,
1749                detail: Some("worker accepted job".into()),
1750            }),
1751        );
1752        coord.handle_inbound(&running).unwrap();
1753
1754        coord.request_shutdown("shutdown now".into()).unwrap();
1755        assert_eq!(
1756            coord.poll_status(1),
1757            Err(WorkerChannelError::ShutdownInProgress)
1758        );
1759        assert_eq!(coord.job_state(1), Some(JobState::Running));
1760
1761        let shutdown = coord.drain_outbox().unwrap();
1762        assert!(matches!(shutdown.op, WorkerOp::ShutdownWorker { .. }));
1763        assert!(coord.drain_outbox().is_none());
1764    }
1765
1766    #[test]
1767    fn coordinator_shutdown_supersedes_cancel_acknowledged_follow_up() {
1768        let mut coord = WorkerCoordinator::new(42);
1769        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1770        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1771        let _ = coord.drain_outbox();
1772
1773        let running = test_worker_envelope(
1774            2,
1775            2,
1776            1,
1777            WorkerOp::StatusSnapshot(JobStatusSnapshot {
1778                job_id: 1,
1779                state: JobState::Running,
1780                detail: Some("worker accepted job".into()),
1781            }),
1782        );
1783        coord.handle_inbound(&running).unwrap();
1784
1785        coord.cancel_job(1, "begin cancel".into()).unwrap();
1786        let cancel = coord.drain_outbox().unwrap();
1787        assert!(matches!(cancel.op, WorkerOp::CancelJob { job_id: 1, .. }));
1788        assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
1789
1790        coord.request_shutdown("shutdown now".into()).unwrap();
1791        assert!(coord.is_shutdown_requested());
1792
1793        let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
1794        coord.handle_inbound(&ack).unwrap();
1795
1796        assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
1797        let shutdown = coord.drain_outbox().unwrap();
1798        assert!(matches!(shutdown.op, WorkerOp::ShutdownWorker { .. }));
1799        assert!(coord.drain_outbox().is_none());
1800    }
1801
1802    #[test]
1803    fn coordinator_shutdown_supersedes_drain_completed_follow_up() {
1804        let mut coord = WorkerCoordinator::new(42);
1805        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1806        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1807        let _ = coord.drain_outbox();
1808
1809        let running = test_worker_envelope(
1810            2,
1811            2,
1812            1,
1813            WorkerOp::StatusSnapshot(JobStatusSnapshot {
1814                job_id: 1,
1815                state: JobState::Running,
1816                detail: Some("worker accepted job".into()),
1817            }),
1818        );
1819        coord.handle_inbound(&running).unwrap();
1820
1821        coord.cancel_job(1, "begin cancel".into()).unwrap();
1822        let _ = coord.drain_outbox();
1823
1824        let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
1825        coord.handle_inbound(&ack).unwrap();
1826        let drain = coord.drain_outbox().unwrap();
1827        assert!(matches!(drain.op, WorkerOp::DrainJob { job_id: 1 }));
1828        assert_eq!(coord.job_state(1), Some(JobState::Draining));
1829
1830        coord.request_shutdown("shutdown now".into()).unwrap();
1831        assert!(coord.is_shutdown_requested());
1832
1833        let drain_completed = test_worker_envelope(4, 4, 3, WorkerOp::DrainCompleted { job_id: 1 });
1834        coord.handle_inbound(&drain_completed).unwrap();
1835
1836        assert_eq!(coord.job_state(1), Some(JobState::Draining));
1837        let shutdown = coord.drain_outbox().unwrap();
1838        assert!(matches!(shutdown.op, WorkerOp::ShutdownWorker { .. }));
1839        assert!(coord.drain_outbox().is_none());
1840    }
1841
1842    #[test]
1843    fn coordinator_marks_nonterminal_job_failed_when_shutdown_completes() {
1844        let mut coord = WorkerCoordinator::new(42);
1845        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1846        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1847        let _ = coord.drain_outbox();
1848
1849        let running = test_worker_envelope(
1850            2,
1851            2,
1852            1,
1853            WorkerOp::StatusSnapshot(JobStatusSnapshot {
1854                job_id: 1,
1855                state: JobState::Running,
1856                detail: Some("worker started".into()),
1857            }),
1858        );
1859        coord.handle_inbound(&running).unwrap();
1860
1861        coord.cancel_job(1, "shutdown".into()).unwrap();
1862        let _ = coord.drain_outbox();
1863
1864        let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
1865        coord.handle_inbound(&ack).unwrap();
1866        let _ = coord.drain_outbox();
1867
1868        let drain = test_worker_envelope(4, 4, 3, WorkerOp::DrainCompleted { job_id: 1 });
1869        coord.handle_inbound(&drain).unwrap();
1870        let _ = coord.drain_outbox();
1871        assert_eq!(coord.job_state(1), Some(JobState::Finalizing));
1872        assert_eq!(coord.inflight_count(), 1);
1873
1874        let shutdown = test_worker_envelope(5, 5, 4, WorkerOp::ShutdownCompleted);
1875        coord.handle_inbound(&shutdown).unwrap();
1876        assert_eq!(coord.job_state(1), Some(JobState::Failed));
1877        assert_eq!(coord.inflight_count(), 0);
1878        assert!(!coord.is_worker_ready());
1879    }
1880
1881    #[test]
1882    fn coordinator_rejects_wrong_direction_messages() {
1883        let mut coord = WorkerCoordinator::new(42);
1884        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1885
1886        // Worker should not send SpawnJob to coordinator
1887        let bad = WorkerEnvelope::new(
1888            2,
1889            2,
1890            42,
1891            1,
1892            WorkerOp::SpawnJob(SpawnJobRequest {
1893                job_id: 1,
1894                region_id: 100,
1895                task_id: 200,
1896                obligation_id: 300,
1897                payload: vec![],
1898            }),
1899        );
1900        let result = coord.handle_inbound(&bad);
1901        assert!(matches!(
1902            result,
1903            Err(WorkerChannelError::UnexpectedDirection { .. })
1904        ));
1905    }
1906
1907    #[test]
1908    fn coordinator_polls_status_and_applies_snapshot() {
1909        let mut coord = WorkerCoordinator::new(42);
1910        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1911        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1912        let _ = coord.drain_outbox();
1913
1914        coord.poll_status(1).unwrap();
1915        let poll = coord.drain_outbox().unwrap();
1916        assert!(matches!(poll.op, WorkerOp::PollStatus { job_id: 1 }));
1917
1918        let snapshot = test_worker_envelope(
1919            3,
1920            3,
1921            2,
1922            WorkerOp::StatusSnapshot(JobStatusSnapshot {
1923                job_id: 1,
1924                state: JobState::Running,
1925                detail: None,
1926            }),
1927        );
1928        coord.handle_inbound(&snapshot).unwrap();
1929        assert_eq!(coord.job_state(1), Some(JobState::Running));
1930    }
1931
1932    #[test]
1933    fn coordinator_rejects_duplicate_inbound_sequence() {
1934        let mut coord = WorkerCoordinator::new(42);
1935        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1936
1937        let diag = test_worker_envelope(
1938            2,
1939            2,
1940            1,
1941            WorkerOp::Diagnostic(DiagnosticEvent {
1942                level: DiagnosticLevel::Info,
1943                category: "lifecycle".into(),
1944                message: "worker initialized".into(),
1945                metadata: None,
1946            }),
1947        );
1948        coord.handle_inbound(&diag).unwrap();
1949        assert_eq!(coord.last_inbound_seq_no, 2);
1950
1951        let duplicate = test_worker_envelope(3, 2, 2, WorkerOp::ShutdownCompleted);
1952        assert_eq!(
1953            coord.handle_inbound(&duplicate),
1954            Err(WorkerChannelError::InboundSequenceNotFresh {
1955                last_seen: 2,
1956                actual: 2,
1957            })
1958        );
1959        assert_eq!(coord.last_inbound_seq_no, 2);
1960        assert!(coord.is_worker_ready());
1961    }
1962
1963    #[test]
1964    fn coordinator_accepts_fresh_bootstrap_after_shutdown_completed() {
1965        let mut coord = WorkerCoordinator::new(42);
1966        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1967        coord.request_shutdown("done".into()).unwrap();
1968        let _ = coord.drain_outbox();
1969
1970        let shutdown = test_worker_envelope(2, 2, 1, WorkerOp::ShutdownCompleted);
1971        coord.handle_inbound(&shutdown).unwrap();
1972        assert_eq!(coord.last_inbound_seq_no, 2);
1973        assert!(!coord.is_worker_ready());
1974
1975        let reboot = worker_envelope(
1976            "test-worker-2",
1977            1,
1978            1,
1979            0,
1980            WorkerOp::BootstrapReady {
1981                worker_id: "test-worker-2".into(),
1982            },
1983        );
1984        coord.handle_inbound(&reboot).unwrap();
1985        assert!(coord.is_worker_ready());
1986        assert_eq!(coord.last_inbound_seq_no, 1);
1987    }
1988
1989    #[test]
1990    fn coordinator_marks_running_job_failed_when_worker_instance_changes() {
1991        let mut coord = WorkerCoordinator::new(42);
1992        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
1993        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
1994        let _ = coord.drain_outbox();
1995
1996        let running = test_worker_envelope(
1997            2,
1998            2,
1999            1,
2000            WorkerOp::StatusSnapshot(JobStatusSnapshot {
2001                job_id: 1,
2002                state: JobState::Running,
2003                detail: Some("worker one accepted job".into()),
2004            }),
2005        );
2006        coord.handle_inbound(&running).unwrap();
2007        assert_eq!(coord.job_state(1), Some(JobState::Running));
2008        assert_eq!(coord.inflight_count(), 1);
2009
2010        let reboot = worker_envelope(
2011            "test-worker-2",
2012            1,
2013            1,
2014            0,
2015            WorkerOp::BootstrapReady {
2016                worker_id: "test-worker-2".into(),
2017            },
2018        );
2019        coord.handle_inbound(&reboot).unwrap();
2020        assert!(coord.is_worker_ready());
2021        assert_eq!(coord.last_inbound_seq_no, 1);
2022        assert_eq!(coord.job_state(1), Some(JobState::Failed));
2023        assert_eq!(coord.inflight_count(), 0);
2024    }
2025
2026    #[test]
2027    fn coordinator_discards_queued_spawn_when_worker_instance_changes() {
2028        let mut coord = WorkerCoordinator::new(42);
2029        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
2030        coord.spawn_job(1, 100, 200, 300, vec![1, 2, 3]).unwrap();
2031
2032        let reboot = worker_envelope(
2033            "test-worker-2",
2034            1,
2035            1,
2036            0,
2037            WorkerOp::BootstrapReady {
2038                worker_id: "test-worker-2".into(),
2039            },
2040        );
2041        coord.handle_inbound(&reboot).unwrap();
2042
2043        assert_eq!(coord.job_state(1), Some(JobState::Failed));
2044        assert!(coord.drain_outbox().is_none());
2045
2046        coord.spawn_job(2, 100, 201, 301, vec![9]).unwrap();
2047        let fresh_spawn = coord.drain_outbox().unwrap();
2048        match fresh_spawn.op {
2049            WorkerOp::SpawnJob(request) => assert_eq!(request.job_id, 2),
2050            other => panic!("expected fresh spawn after reboot, got {other:?}"),
2051        }
2052    }
2053
2054    #[test]
2055    fn coordinator_discards_queued_cancel_when_worker_instance_changes() {
2056        let mut coord = WorkerCoordinator::new(42);
2057        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
2058        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
2059        let _ = coord.drain_outbox();
2060
2061        let running = test_worker_envelope(
2062            2,
2063            2,
2064            1,
2065            WorkerOp::StatusSnapshot(JobStatusSnapshot {
2066                job_id: 1,
2067                state: JobState::Running,
2068                detail: Some("worker accepted job".into()),
2069            }),
2070        );
2071        coord.handle_inbound(&running).unwrap();
2072
2073        coord.cancel_job(1, "worker replaced".into()).unwrap();
2074        assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
2075
2076        let reboot = worker_envelope(
2077            "test-worker-2",
2078            1,
2079            1,
2080            0,
2081            WorkerOp::BootstrapReady {
2082                worker_id: "test-worker-2".into(),
2083            },
2084        );
2085        coord.handle_inbound(&reboot).unwrap();
2086
2087        assert_eq!(coord.job_state(1), Some(JobState::Failed));
2088        assert!(coord.drain_outbox().is_none());
2089    }
2090
2091    #[test]
2092    fn coordinator_discards_queued_finalize_when_shutdown_completes() {
2093        let mut coord = WorkerCoordinator::new(42);
2094        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
2095        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
2096        let _ = coord.drain_outbox();
2097
2098        let running = test_worker_envelope(
2099            2,
2100            2,
2101            1,
2102            WorkerOp::StatusSnapshot(JobStatusSnapshot {
2103                job_id: 1,
2104                state: JobState::Running,
2105                detail: Some("worker started".into()),
2106            }),
2107        );
2108        coord.handle_inbound(&running).unwrap();
2109
2110        coord.cancel_job(1, "shutdown".into()).unwrap();
2111        let _ = coord.drain_outbox();
2112
2113        let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
2114        coord.handle_inbound(&ack).unwrap();
2115        let _ = coord.drain_outbox();
2116
2117        let drain = test_worker_envelope(4, 4, 3, WorkerOp::DrainCompleted { job_id: 1 });
2118        coord.handle_inbound(&drain).unwrap();
2119        assert_eq!(coord.job_state(1), Some(JobState::Finalizing));
2120
2121        let pending_finalize = coord.drain_outbox().unwrap();
2122        assert!(matches!(
2123            pending_finalize.op,
2124            WorkerOp::FinalizeJob { job_id: 1 }
2125        ));
2126
2127        coord
2128            .enqueue_job_message(1, WorkerOp::FinalizeJob { job_id: 1 })
2129            .unwrap();
2130        let shutdown = test_worker_envelope(5, 5, 4, WorkerOp::ShutdownCompleted);
2131        coord.handle_inbound(&shutdown).unwrap();
2132
2133        assert_eq!(coord.job_state(1), Some(JobState::Failed));
2134        assert!(coord.drain_outbox().is_none());
2135    }
2136
2137    #[test]
2138    fn coordinator_accepts_fresh_bootstrap_after_bootstrap_failed() {
2139        let mut coord = WorkerCoordinator::new(42);
2140
2141        let failed = bootstrap_failed_envelope(1, "failed-worker-1", "synthetic boot failure");
2142        assert_eq!(
2143            coord.handle_inbound(&failed),
2144            Err(WorkerChannelError::BootstrapFailed(
2145                "synthetic boot failure".into()
2146            ))
2147        );
2148        assert_eq!(coord.last_inbound_seq_no, 1);
2149        assert!(!coord.is_worker_ready());
2150
2151        let retry = worker_envelope(
2152            "test-worker-2",
2153            1,
2154            1,
2155            0,
2156            WorkerOp::BootstrapReady {
2157                worker_id: "test-worker-2".into(),
2158            },
2159        );
2160        coord.handle_inbound(&retry).unwrap();
2161        assert!(coord.is_worker_ready());
2162        assert_eq!(coord.last_inbound_seq_no, 1);
2163    }
2164
2165    #[test]
2166    fn coordinator_accepts_fresh_bootstrap_failed_after_live_session() {
2167        let mut coord = WorkerCoordinator::new(42);
2168        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
2169        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
2170        let _ = coord.drain_outbox();
2171
2172        let running = test_worker_envelope(
2173            2,
2174            2,
2175            1,
2176            WorkerOp::StatusSnapshot(JobStatusSnapshot {
2177                job_id: 1,
2178                state: JobState::Running,
2179                detail: Some("worker one accepted job".into()),
2180            }),
2181        );
2182        coord.handle_inbound(&running).unwrap();
2183
2184        let failed = bootstrap_failed_envelope(1, "failed-worker-2", "reboot failed to init");
2185        assert_eq!(
2186            coord.handle_inbound(&failed),
2187            Err(WorkerChannelError::BootstrapFailed(
2188                "reboot failed to init".into()
2189            ))
2190        );
2191        assert_eq!(coord.last_inbound_seq_no, 1);
2192        assert!(!coord.is_worker_ready());
2193        assert_eq!(coord.job_state(1), Some(JobState::Failed));
2194        assert_eq!(coord.inflight_count(), 0);
2195    }
2196
2197    #[test]
2198    fn coordinator_keeps_prior_high_water_mark_until_rebootstrap() {
2199        let mut coord = WorkerCoordinator::new(42);
2200        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
2201        coord.request_shutdown("done".into()).unwrap();
2202        let _ = coord.drain_outbox();
2203
2204        let shutdown = test_worker_envelope(2, 2, 1, WorkerOp::ShutdownCompleted);
2205        coord.handle_inbound(&shutdown).unwrap();
2206        assert_eq!(coord.last_inbound_seq_no, 2);
2207        assert!(!coord.is_worker_ready());
2208
2209        let stale = test_worker_envelope(
2210            1,
2211            1,
2212            0,
2213            WorkerOp::Diagnostic(DiagnosticEvent {
2214                level: DiagnosticLevel::Info,
2215                category: "lifecycle".into(),
2216                message: "stale pre-restart message".into(),
2217                metadata: None,
2218            }),
2219        );
2220        assert_eq!(
2221            coord.handle_inbound(&stale),
2222            Err(WorkerChannelError::InboundWorkerSessionMismatch {
2223                expected: None,
2224                actual: "test-worker-1".into(),
2225            })
2226        );
2227        assert_eq!(coord.last_inbound_seq_no, 2);
2228
2229        let reboot = worker_envelope(
2230            "test-worker-2",
2231            1,
2232            1,
2233            0,
2234            WorkerOp::BootstrapReady {
2235                worker_id: "test-worker-2".into(),
2236            },
2237        );
2238        coord.handle_inbound(&reboot).unwrap();
2239        assert!(coord.is_worker_ready());
2240        assert_eq!(coord.last_inbound_seq_no, 1);
2241    }
2242
2243    #[test]
2244    fn coordinator_rejects_replayed_bootstrap_from_same_worker_session() {
2245        let mut coord = WorkerCoordinator::new(42);
2246
2247        let ready = worker_envelope(
2248            "stable-worker",
2249            5,
2250            5,
2251            0,
2252            WorkerOp::BootstrapReady {
2253                worker_id: "stable-worker".into(),
2254            },
2255        );
2256        coord.handle_inbound(&ready).unwrap();
2257
2258        let replay = worker_envelope(
2259            "stable-worker",
2260            1,
2261            1,
2262            1,
2263            WorkerOp::BootstrapReady {
2264                worker_id: "stable-worker".into(),
2265            },
2266        );
2267        assert_eq!(
2268            coord.handle_inbound(&replay),
2269            Err(WorkerChannelError::InboundSequenceNotFresh {
2270                last_seen: 5,
2271                actual: 1,
2272            })
2273        );
2274    }
2275
2276    #[test]
2277    fn coordinator_rejects_replayed_bootstrap_failed_from_same_worker_session() {
2278        let mut coord = WorkerCoordinator::new(42);
2279
2280        let ready = worker_envelope(
2281            "stable-worker",
2282            5,
2283            5,
2284            0,
2285            WorkerOp::BootstrapReady {
2286                worker_id: "stable-worker".into(),
2287            },
2288        );
2289        coord.handle_inbound(&ready).unwrap();
2290
2291        let replay = bootstrap_failed_envelope(1, "stable-worker", "stale failure replay");
2292        assert_eq!(
2293            coord.handle_inbound(&replay),
2294            Err(WorkerChannelError::InboundSequenceNotFresh {
2295                last_seen: 5,
2296                actual: 1,
2297            })
2298        );
2299        assert!(coord.is_worker_ready());
2300    }
2301
2302    #[test]
2303    fn coordinator_rejects_out_of_order_job_message_sequence() {
2304        let mut coord = WorkerCoordinator::new(42);
2305        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
2306        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
2307        let _ = coord.drain_outbox();
2308
2309        let running = test_worker_envelope(
2310            2,
2311            2,
2312            1,
2313            WorkerOp::StatusSnapshot(JobStatusSnapshot {
2314                job_id: 1,
2315                state: JobState::Running,
2316                detail: None,
2317            }),
2318        );
2319        coord.handle_inbound(&running).unwrap();
2320
2321        let diag = test_worker_envelope(
2322            3,
2323            4,
2324            2,
2325            WorkerOp::Diagnostic(DiagnosticEvent {
2326                level: DiagnosticLevel::Info,
2327                category: "scheduler".into(),
2328                message: "worker tick".into(),
2329                metadata: None,
2330            }),
2331        );
2332        coord.handle_inbound(&diag).unwrap();
2333
2334        let stale_completion = test_worker_envelope(
2335            4,
2336            3,
2337            3,
2338            WorkerOp::JobCompleted(JobResult {
2339                job_id: 1,
2340                outcome: JobOutcome::Ok { payload: vec![7] },
2341            }),
2342        );
2343        assert_eq!(
2344            coord.handle_inbound(&stale_completion),
2345            Err(WorkerChannelError::InboundSequenceNotFresh {
2346                last_seen: 4,
2347                actual: 3,
2348            })
2349        );
2350        assert_eq!(coord.last_inbound_seq_no, 4);
2351        assert_eq!(coord.job_state(1), Some(JobState::Running));
2352    }
2353
2354    #[test]
2355    fn envelope_validates_version() {
2356        let mut env = worker_envelope(
2357            "w",
2358            1,
2359            1,
2360            0,
2361            WorkerOp::BootstrapReady {
2362                worker_id: "w".into(),
2363            },
2364        );
2365        assert!(env.validate().is_ok());
2366
2367        env.version = 99;
2368        assert!(matches!(
2369            env.validate(),
2370            Err(WorkerChannelError::VersionMismatch { .. })
2371        ));
2372    }
2373
2374    #[test]
2375    fn envelope_validates_replay_metadata() {
2376        let mut env = worker_envelope(
2377            "w",
2378            1,
2379            1,
2380            0,
2381            WorkerOp::BootstrapReady {
2382                worker_id: "w".into(),
2383            },
2384        );
2385        assert!(env.validate().is_ok());
2386
2387        env.decision_seq = 2;
2388        env.replay_hash = replay_hash(
2389            env.message_id,
2390            env.seq_no,
2391            env.decision_seq,
2392            env.seed,
2393            env.issued_at_turn,
2394            env.worker_id.as_deref(),
2395            &env.op,
2396        );
2397        assert!(env.validate().is_ok());
2398
2399        env.op = WorkerOp::ShutdownCompleted;
2400        assert!(matches!(
2401            env.validate(),
2402            Err(WorkerChannelError::ReplayHashMismatch { .. })
2403        ));
2404    }
2405
2406    #[test]
2407    fn envelope_validates_payload_size() {
2408        let env = WorkerEnvelope::new(
2409            1,
2410            1,
2411            42,
2412            0,
2413            WorkerOp::SpawnJob(SpawnJobRequest {
2414                job_id: 1,
2415                region_id: 100,
2416                task_id: 200,
2417                obligation_id: 300,
2418                payload: vec![0u8; MAX_PAYLOAD_BYTES + 1],
2419            }),
2420        );
2421        assert!(matches!(
2422            env.validate(),
2423            Err(WorkerChannelError::PayloadTooLarge { .. })
2424        ));
2425
2426        let completed = test_worker_envelope(
2427            2,
2428            2,
2429            0,
2430            WorkerOp::JobCompleted(JobResult {
2431                job_id: 1,
2432                outcome: JobOutcome::Ok {
2433                    payload: vec![0u8; MAX_PAYLOAD_BYTES + 1],
2434                },
2435            }),
2436        );
2437        assert!(matches!(
2438            completed.validate(),
2439            Err(WorkerChannelError::PayloadTooLarge { .. })
2440        ));
2441    }
2442
2443    #[test]
2444    fn job_state_transitions_are_correct() {
2445        // Valid transitions
2446        assert!(JobState::Created.can_transition_to(JobState::Queued));
2447        assert!(JobState::Created.can_transition_to(JobState::Failed));
2448        assert!(JobState::Queued.can_transition_to(JobState::Running));
2449        assert!(JobState::Queued.can_transition_to(JobState::CancelRequested));
2450        assert!(JobState::Queued.can_transition_to(JobState::Failed));
2451        assert!(JobState::Running.can_transition_to(JobState::Completed));
2452        assert!(JobState::Running.can_transition_to(JobState::CancelRequested));
2453        assert!(JobState::Running.can_transition_to(JobState::Failed));
2454        assert!(JobState::CancelRequested.can_transition_to(JobState::Completed));
2455        assert!(JobState::CancelRequested.can_transition_to(JobState::Draining));
2456        assert!(JobState::CancelRequested.can_transition_to(JobState::Failed));
2457        assert!(JobState::Draining.can_transition_to(JobState::Finalizing));
2458        assert!(JobState::Draining.can_transition_to(JobState::Failed));
2459        assert!(JobState::Finalizing.can_transition_to(JobState::Completed));
2460        assert!(JobState::Finalizing.can_transition_to(JobState::Failed));
2461
2462        assert!(JobState::Queued.can_transition_to(JobState::Completed));
2463
2464        // Invalid transitions
2465        assert!(!JobState::Created.can_transition_to(JobState::Running));
2466        assert!(!JobState::Created.can_transition_to(JobState::Completed));
2467        assert!(!JobState::Draining.can_transition_to(JobState::Completed));
2468        assert!(!JobState::Completed.can_transition_to(JobState::Running));
2469    }
2470
2471    #[test]
2472    fn envelope_serialization_round_trip() {
2473        let env = WorkerEnvelope::new(
2474            1,
2475            1,
2476            42,
2477            0,
2478            WorkerOp::SpawnJob(SpawnJobRequest {
2479                job_id: 1,
2480                region_id: 100,
2481                task_id: 200,
2482                obligation_id: 300,
2483                payload: vec![1, 2, 3],
2484            }),
2485        );
2486        let json = serde_json::to_string(&env).unwrap();
2487        let deserialized: WorkerEnvelope = serde_json::from_str(&json).unwrap();
2488        assert_eq!(env, deserialized);
2489    }
2490
2491    #[test]
2492    fn worker_envelope_serialization_round_trip_preserves_worker_identity() {
2493        let env = worker_envelope(
2494            "worker-round-trip",
2495            7,
2496            9,
2497            3,
2498            WorkerOp::Diagnostic(DiagnosticEvent {
2499                level: DiagnosticLevel::Info,
2500                category: "serde".into(),
2501                message: "round-trip".into(),
2502                metadata: Some("worker identity must survive serialization".into()),
2503            }),
2504        );
2505        let json = serde_json::to_string(&env).unwrap();
2506        let deserialized: WorkerEnvelope = serde_json::from_str(&json).unwrap();
2507        assert_eq!(env, deserialized);
2508        assert!(deserialized.validate().is_ok());
2509    }
2510
2511    #[test]
2512    fn coordinator_sequence_numbers_are_monotonic() {
2513        let mut coord = WorkerCoordinator::new(42);
2514        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
2515
2516        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
2517        coord.spawn_job(2, 100, 201, 301, vec![]).unwrap();
2518
2519        let msg1 = coord.drain_outbox().unwrap();
2520        let msg2 = coord.drain_outbox().unwrap();
2521        assert!(msg2.seq_no > msg1.seq_no);
2522        assert!(msg2.message_id > msg1.message_id);
2523    }
2524
2525    #[test]
2526    fn coordinator_emits_independent_decision_sequence() {
2527        let mut coord = WorkerCoordinator::new(42);
2528        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
2529        coord.next_decision_seq = 19;
2530
2531        coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
2532        coord.spawn_job(2, 100, 201, 301, vec![]).unwrap();
2533
2534        let msg1 = coord.drain_outbox().unwrap();
2535        let msg2 = coord.drain_outbox().unwrap();
2536
2537        assert_eq!(msg1.seq_no, 1);
2538        assert_eq!(msg1.decision_seq, 19);
2539        assert_ne!(msg1.seq_no, msg1.decision_seq);
2540        assert!(msg1.validate().is_ok());
2541
2542        assert_eq!(msg2.seq_no, 2);
2543        assert_eq!(msg2.decision_seq, 20);
2544        assert_ne!(msg2.seq_no, msg2.decision_seq);
2545        assert!(msg2.validate().is_ok());
2546    }
2547
2548    #[test]
2549    fn diagnostic_events_are_accepted() {
2550        let mut coord = WorkerCoordinator::new(42);
2551        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
2552
2553        let diag = test_worker_envelope(
2554            2,
2555            2,
2556            1,
2557            WorkerOp::Diagnostic(DiagnosticEvent {
2558                level: DiagnosticLevel::Info,
2559                category: "lifecycle".into(),
2560                message: "worker initialized".into(),
2561                metadata: None,
2562            }),
2563        );
2564        assert!(coord.handle_inbound(&diag).is_ok());
2565    }
2566
2567    #[test]
2568    fn coordinator_rejects_stale_old_worker_message_after_rebootstrap() {
2569        let mut coord = WorkerCoordinator::new(42);
2570        coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
2571
2572        let reboot = worker_envelope(
2573            "test-worker-2",
2574            1,
2575            1,
2576            0,
2577            WorkerOp::BootstrapReady {
2578                worker_id: "test-worker-2".into(),
2579            },
2580        );
2581        coord.handle_inbound(&reboot).unwrap();
2582
2583        let stale = test_worker_envelope(
2584            9,
2585            9,
2586            1,
2587            WorkerOp::Diagnostic(DiagnosticEvent {
2588                level: DiagnosticLevel::Warn,
2589                category: "stale".into(),
2590                message: "late message from replaced worker".into(),
2591                metadata: None,
2592            }),
2593        );
2594        assert_eq!(
2595            coord.handle_inbound(&stale),
2596            Err(WorkerChannelError::InboundWorkerSessionMismatch {
2597                expected: Some("test-worker-2".into()),
2598                actual: "test-worker-1".into(),
2599            })
2600        );
2601
2602        let fresh = worker_envelope(
2603            "test-worker-2",
2604            2,
2605            2,
2606            1,
2607            WorkerOp::Diagnostic(DiagnosticEvent {
2608                level: DiagnosticLevel::Info,
2609                category: "fresh".into(),
2610                message: "new worker follow-up".into(),
2611                metadata: None,
2612            }),
2613        );
2614        coord.handle_inbound(&fresh).unwrap();
2615        assert_eq!(coord.last_inbound_seq_no, 2);
2616    }
2617
2618    #[test]
2619    fn envelope_rejects_worker_message_without_session_identity() {
2620        let env = WorkerEnvelope::new(2, 2, 42, 1, WorkerOp::ShutdownCompleted);
2621        assert_eq!(
2622            env.validate(),
2623            Err(WorkerChannelError::MissingWorkerSessionIdentity)
2624        );
2625    }
2626}