use std::collections::VecDeque;
use std::fmt;
pub const WORKER_PROTOCOL_VERSION: u32 = 1;
pub const MAX_PAYLOAD_BYTES: usize = 262_144;
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WorkerPayloadTransfer {
StructuredClone,
TransferArrayBuffer,
}
#[must_use]
fn replay_hash(
message_id: u64,
seq_no: u64,
decision_seq: u64,
seed: u64,
issued_at_turn: u64,
worker_id: Option<&str>,
op: &WorkerOp,
) -> u64 {
let mut hash = message_id
.wrapping_mul(0x9E37_79B1_85EB_CA87)
.wrapping_add(seq_no.rotate_left(13))
^ decision_seq.rotate_left(23)
^ seed.rotate_left(29)
^ issued_at_turn.rotate_left(47);
if let Some(worker_id) = worker_id {
for byte in worker_id.as_bytes() {
hash = hash.rotate_left(7) ^ u64::from(*byte);
hash = hash.wrapping_mul(0x100_0000_01B3);
}
}
for byte in serde_json::to_vec(op).unwrap_or_default() {
hash = hash.rotate_left(5) ^ u64::from(byte);
hash = hash.wrapping_mul(0x100_0000_01B3);
}
hash
}
fn validate_payload_size(size: usize) -> Result<(), WorkerChannelError> {
if size > MAX_PAYLOAD_BYTES {
return Err(WorkerChannelError::PayloadTooLarge {
size,
max: MAX_PAYLOAD_BYTES,
});
}
Ok(())
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct WorkerEnvelope {
pub version: u32,
pub message_id: u64,
pub seq_no: u64,
pub decision_seq: u64,
pub seed: u64,
pub issued_at_turn: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub worker_id: Option<String>,
pub replay_hash: u64,
pub op: WorkerOp,
}
impl WorkerEnvelope {
#[must_use]
pub fn new(message_id: u64, seq_no: u64, seed: u64, issued_at_turn: u64, op: WorkerOp) -> Self {
Self::new_with_decision_seq(message_id, seq_no, seq_no, seed, issued_at_turn, op)
}
#[must_use]
pub fn new_with_decision_seq(
message_id: u64,
seq_no: u64,
decision_seq: u64,
seed: u64,
issued_at_turn: u64,
op: WorkerOp,
) -> Self {
Self {
version: WORKER_PROTOCOL_VERSION,
message_id,
seq_no,
decision_seq,
seed,
issued_at_turn,
worker_id: None,
replay_hash: replay_hash(
message_id,
seq_no,
decision_seq,
seed,
issued_at_turn,
None,
&op,
),
op,
}
}
#[must_use]
pub fn from_worker(
worker_id: impl Into<String>,
message_id: u64,
seq_no: u64,
seed: u64,
issued_at_turn: u64,
op: WorkerOp,
) -> Self {
Self::from_worker_with_decision_seq(
worker_id,
message_id,
seq_no,
seq_no,
seed,
issued_at_turn,
op,
)
}
#[must_use]
pub fn from_worker_with_decision_seq(
worker_id: impl Into<String>,
message_id: u64,
seq_no: u64,
decision_seq: u64,
seed: u64,
issued_at_turn: u64,
op: WorkerOp,
) -> Self {
let worker_id = worker_id.into();
Self {
version: WORKER_PROTOCOL_VERSION,
message_id,
seq_no,
decision_seq,
seed,
issued_at_turn,
worker_id: Some(worker_id.clone()),
replay_hash: replay_hash(
message_id,
seq_no,
decision_seq,
seed,
issued_at_turn,
Some(worker_id.as_str()),
&op,
),
op,
}
}
pub fn validate(&self) -> Result<(), WorkerChannelError> {
if self.version != WORKER_PROTOCOL_VERSION {
return Err(WorkerChannelError::VersionMismatch {
expected: WORKER_PROTOCOL_VERSION,
actual: self.version,
});
}
let expected_replay_hash = replay_hash(
self.message_id,
self.seq_no,
self.decision_seq,
self.seed,
self.issued_at_turn,
self.worker_id.as_deref(),
&self.op,
);
if self.replay_hash != expected_replay_hash {
return Err(WorkerChannelError::ReplayHashMismatch {
expected: expected_replay_hash,
actual: self.replay_hash,
});
}
self.validate_worker_identity()?;
match &self.op {
WorkerOp::SpawnJob(req) => validate_payload_size(req.payload.len())?,
WorkerOp::JobCompleted(JobResult {
outcome: JobOutcome::Ok { payload },
..
}) => validate_payload_size(payload.len())?,
_ => {}
}
Ok(())
}
fn validate_worker_identity(&self) -> Result<(), WorkerChannelError> {
match &self.op {
WorkerOp::BootstrapReady { worker_id }
| WorkerOp::BootstrapFailed { worker_id, .. } => {
let actual = self
.worker_id
.as_deref()
.ok_or(WorkerChannelError::MissingWorkerSessionIdentity)?;
if actual != worker_id {
return Err(WorkerChannelError::WorkerIdentityMismatch {
expected: worker_id.clone(),
actual: actual.to_string(),
});
}
}
WorkerOp::StatusSnapshot(_)
| WorkerOp::JobCompleted(_)
| WorkerOp::CancelAcknowledged { .. }
| WorkerOp::DrainCompleted { .. }
| WorkerOp::FinalizeCompleted { .. }
| WorkerOp::ShutdownCompleted
| WorkerOp::Diagnostic(_) => {
if self.worker_id.is_none() {
return Err(WorkerChannelError::MissingWorkerSessionIdentity);
}
}
WorkerOp::SpawnJob(_)
| WorkerOp::PollStatus { .. }
| WorkerOp::CancelJob { .. }
| WorkerOp::DrainJob { .. }
| WorkerOp::FinalizeJob { .. }
| WorkerOp::ShutdownWorker { .. } => {
if let Some(worker_id) = &self.worker_id {
return Err(WorkerChannelError::UnexpectedWorkerSessionIdentity(
worker_id.clone(),
));
}
}
}
Ok(())
}
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type")]
pub enum WorkerOp {
BootstrapReady {
worker_id: String,
},
BootstrapFailed {
worker_id: String,
reason: String,
},
SpawnJob(SpawnJobRequest),
PollStatus {
job_id: u64,
},
StatusSnapshot(JobStatusSnapshot),
JobCompleted(JobResult),
CancelJob {
job_id: u64,
reason: String,
},
CancelAcknowledged {
job_id: u64,
},
DrainJob {
job_id: u64,
},
DrainCompleted {
job_id: u64,
},
FinalizeJob {
job_id: u64,
},
FinalizeCompleted {
job_id: u64,
},
ShutdownWorker {
reason: String,
},
ShutdownCompleted,
Diagnostic(DiagnosticEvent),
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct SpawnJobRequest {
pub job_id: u64,
pub region_id: u64,
pub task_id: u64,
pub obligation_id: u64,
pub payload: Vec<u8>,
}
impl SpawnJobRequest {
#[must_use]
pub fn payload_transfer(&self) -> WorkerPayloadTransfer {
WorkerPayloadTransfer::StructuredClone
}
#[must_use]
pub fn owned_payload(&self) -> bool {
true
}
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JobStatusSnapshot {
pub job_id: u64,
pub state: JobState,
pub detail: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct JobResult {
pub job_id: u64,
pub outcome: JobOutcome,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "status")]
pub enum JobOutcome {
Ok {
payload: Vec<u8>,
},
Err {
code: String,
message: String,
},
Cancelled {
reason: String,
},
Panicked {
message: String,
},
}
impl JobOutcome {
#[must_use]
pub fn payload_transfer(&self) -> Option<WorkerPayloadTransfer> {
match self {
Self::Ok { .. } => Some(WorkerPayloadTransfer::StructuredClone),
Self::Err { .. } | Self::Cancelled { .. } | Self::Panicked { .. } => None,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct DiagnosticEvent {
pub level: DiagnosticLevel,
pub category: String,
pub message: String,
pub metadata: Option<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum DiagnosticLevel {
Info,
Warn,
Error,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum JobState {
Created,
Queued,
Running,
CancelRequested,
Draining,
Finalizing,
Completed,
Failed,
}
impl fmt::Display for JobState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Created => write!(f, "created"),
Self::Queued => write!(f, "queued"),
Self::Running => write!(f, "running"),
Self::CancelRequested => write!(f, "cancel_requested"),
Self::Draining => write!(f, "draining"),
Self::Finalizing => write!(f, "finalizing"),
Self::Completed => write!(f, "completed"),
Self::Failed => write!(f, "failed"),
}
}
}
impl JobState {
#[must_use]
pub const fn is_terminal(self) -> bool {
matches!(self, Self::Completed | Self::Failed)
}
#[must_use]
pub const fn allowed_in_status_snapshot(self) -> bool {
matches!(self, Self::Queued | Self::Running)
}
#[must_use]
pub fn can_transition_to(self, next: Self) -> bool {
matches!(
(self, next),
(Self::Created, Self::Queued | Self::Failed)
| (
Self::Queued,
Self::Running | Self::CancelRequested | Self::Completed | Self::Failed,
)
| (
Self::Running,
Self::Completed | Self::CancelRequested | Self::Failed,
)
| (
Self::CancelRequested,
Self::Completed | Self::Draining | Self::Failed,
)
| (Self::Draining, Self::Finalizing | Self::Failed)
| (Self::Finalizing, Self::Completed | Self::Failed)
)
}
}
#[derive(Debug)]
pub struct TrackedJob {
pub job_id: u64,
pub region_id: u64,
pub state: JobState,
pub last_seq_no: u64,
}
impl TrackedJob {
#[must_use]
pub fn new(job_id: u64, region_id: u64) -> Self {
Self {
job_id,
region_id,
state: JobState::Created,
last_seq_no: 0,
}
}
pub fn transition_to(&mut self, next: JobState) -> Result<JobState, WorkerChannelError> {
if !self.state.can_transition_to(next) {
return Err(WorkerChannelError::InvalidTransition {
job_id: self.job_id,
from: self.state,
to: next,
});
}
let prev = self.state;
self.state = next;
Ok(prev)
}
}
#[derive(Debug)]
pub struct WorkerCoordinator {
outbox: VecDeque<WorkerEnvelope>,
next_seq: u64,
next_decision_seq: u64,
next_message_id: u64,
jobs: std::collections::BTreeMap<u64, TrackedJob>,
seed: u64,
turn: u64,
last_inbound_seq_no: u64,
active_worker_id: Option<String>,
worker_ready: bool,
shutdown_requested: bool,
}
impl WorkerCoordinator {
#[must_use]
pub fn new(seed: u64) -> Self {
Self {
outbox: VecDeque::new(),
next_seq: 1,
next_decision_seq: 1,
next_message_id: 1,
jobs: std::collections::BTreeMap::new(),
seed,
turn: 0,
last_inbound_seq_no: 0,
active_worker_id: None,
worker_ready: false,
shutdown_requested: false,
}
}
pub fn advance_turn(&mut self) {
self.turn += 1;
}
#[must_use]
pub fn is_worker_ready(&self) -> bool {
self.worker_ready
}
#[must_use]
pub fn is_shutdown_requested(&self) -> bool {
self.shutdown_requested
}
#[must_use]
pub fn inflight_count(&self) -> usize {
self.jobs
.values()
.filter(|j| j.state != JobState::Completed && j.state != JobState::Failed)
.count()
}
pub fn spawn_job(
&mut self,
job_id: u64,
region_id: u64,
task_id: u64,
obligation_id: u64,
payload: Vec<u8>,
) -> Result<u64, WorkerChannelError> {
if !self.worker_ready {
return Err(WorkerChannelError::WorkerNotReady);
}
if self.shutdown_requested {
return Err(WorkerChannelError::ShutdownInProgress);
}
validate_payload_size(payload.len())?;
if let Some(existing_state) = self.jobs.get(&job_id).map(|job| job.state) {
if existing_state == JobState::Failed {
self.jobs.remove(&job_id);
} else {
return Err(WorkerChannelError::DuplicateJobId(job_id));
}
}
let mut tracked = TrackedJob::new(job_id, region_id);
tracked.transition_to(JobState::Queued)?;
let envelope = self.make_envelope(WorkerOp::SpawnJob(SpawnJobRequest {
job_id,
region_id,
task_id,
obligation_id,
payload,
}));
tracked.last_seq_no = envelope.seq_no;
self.jobs.insert(job_id, tracked);
self.outbox.push_back(envelope);
Ok(job_id)
}
pub fn cancel_job(&mut self, job_id: u64, reason: String) -> Result<(), WorkerChannelError> {
if self.shutdown_requested {
return Err(WorkerChannelError::ShutdownInProgress);
}
{
let job = self
.jobs
.get_mut(&job_id)
.ok_or(WorkerChannelError::UnknownJobId(job_id))?;
job.transition_to(JobState::CancelRequested)?;
}
self.enqueue_job_message(job_id, WorkerOp::CancelJob { job_id, reason })
}
pub fn poll_status(&mut self, job_id: u64) -> Result<(), WorkerChannelError> {
if self.shutdown_requested {
return Err(WorkerChannelError::ShutdownInProgress);
}
let state = self
.jobs
.get(&job_id)
.ok_or(WorkerChannelError::UnknownJobId(job_id))?
.state;
if state.is_terminal() {
return Err(WorkerChannelError::JobNotPollable { job_id, state });
}
self.enqueue_job_message(job_id, WorkerOp::PollStatus { job_id })
}
pub fn request_shutdown(&mut self, reason: String) -> Result<(), WorkerChannelError> {
if self.shutdown_requested {
return Err(WorkerChannelError::ShutdownInProgress);
}
self.shutdown_requested = true;
self.discard_outbound_session_messages();
let envelope = self.make_envelope(WorkerOp::ShutdownWorker { reason });
self.outbox.push_back(envelope);
Ok(())
}
pub fn handle_inbound(&mut self, envelope: &WorkerEnvelope) -> Result<(), WorkerChannelError> {
self.prepare_inbound(envelope)?;
match &envelope.op {
WorkerOp::BootstrapReady { worker_id } => {
self.active_worker_id = Some(worker_id.clone());
self.worker_ready = true;
self.shutdown_requested = false;
Ok(())
}
WorkerOp::BootstrapFailed { reason, .. } => {
self.fail_nonterminal_jobs();
self.discard_outbound_session_messages();
self.active_worker_id = None;
self.worker_ready = false;
self.shutdown_requested = false;
Err(WorkerChannelError::BootstrapFailed(reason.clone()))
}
WorkerOp::StatusSnapshot(snapshot) => self.handle_status_snapshot(snapshot),
WorkerOp::JobCompleted(result) => {
let job = self
.jobs
.get_mut(&result.job_id)
.ok_or(WorkerChannelError::UnknownJobId(result.job_id))?;
if !matches!(
job.state,
JobState::Queued | JobState::Running | JobState::CancelRequested
) {
return Err(WorkerChannelError::UnexpectedCompletionPhase {
job_id: result.job_id,
state: job.state,
});
}
job.transition_to(JobState::Completed)?;
Ok(())
}
WorkerOp::CancelAcknowledged { job_id } => {
if !self.jobs.contains_key(job_id) {
return Err(WorkerChannelError::UnknownJobId(*job_id));
}
if self.shutdown_requested {
return Ok(());
}
{
let job = self
.jobs
.get_mut(job_id)
.ok_or(WorkerChannelError::UnknownJobId(*job_id))?;
job.transition_to(JobState::Draining)?;
}
self.enqueue_job_message(*job_id, WorkerOp::DrainJob { job_id: *job_id })
}
WorkerOp::DrainCompleted { job_id } => {
if !self.jobs.contains_key(job_id) {
return Err(WorkerChannelError::UnknownJobId(*job_id));
}
if self.shutdown_requested {
return Ok(());
}
{
let job = self
.jobs
.get_mut(job_id)
.ok_or(WorkerChannelError::UnknownJobId(*job_id))?;
job.transition_to(JobState::Finalizing)?;
}
self.enqueue_job_message(*job_id, WorkerOp::FinalizeJob { job_id: *job_id })
}
WorkerOp::FinalizeCompleted { job_id } => {
if !self.jobs.contains_key(job_id) {
return Err(WorkerChannelError::UnknownJobId(*job_id));
}
self.remove_job(*job_id);
Ok(())
}
WorkerOp::ShutdownCompleted => {
self.fail_nonterminal_jobs();
self.discard_outbound_session_messages();
self.active_worker_id = None;
self.shutdown_requested = false;
self.worker_ready = false;
Ok(())
}
WorkerOp::Diagnostic(_) => Ok(()),
WorkerOp::SpawnJob(_)
| WorkerOp::PollStatus { .. }
| WorkerOp::CancelJob { .. }
| WorkerOp::DrainJob { .. }
| WorkerOp::FinalizeJob { .. }
| WorkerOp::ShutdownWorker { .. } => Err(WorkerChannelError::UnexpectedDirection {
op: format!("{:?}", std::mem::discriminant(&envelope.op)),
}),
}
}
#[must_use]
pub fn drain_outbox(&mut self) -> Option<WorkerEnvelope> {
self.outbox.pop_front()
}
#[must_use]
pub fn job_state(&self, job_id: u64) -> Option<JobState> {
self.jobs.get(&job_id).map(|j| j.state)
}
pub fn remove_job(&mut self, job_id: u64) -> Option<JobState> {
self.jobs.remove(&job_id).map(|j| j.state)
}
fn enqueue_job_message(&mut self, job_id: u64, op: WorkerOp) -> Result<(), WorkerChannelError> {
if !self.jobs.contains_key(&job_id) {
return Err(WorkerChannelError::UnknownJobId(job_id));
}
let envelope = self.make_envelope(op);
if let Some(job) = self.jobs.get_mut(&job_id) {
job.last_seq_no = envelope.seq_no;
}
self.outbox.push_back(envelope);
Ok(())
}
fn handle_status_snapshot(
&mut self,
snapshot: &JobStatusSnapshot,
) -> Result<(), WorkerChannelError> {
if snapshot.state.is_terminal() {
return Err(WorkerChannelError::TerminalStatusSnapshot {
job_id: snapshot.job_id,
state: snapshot.state,
});
}
if !snapshot.state.allowed_in_status_snapshot() {
return Err(WorkerChannelError::InvalidStatusSnapshotState {
job_id: snapshot.job_id,
state: snapshot.state,
});
}
let job = self
.jobs
.get_mut(&snapshot.job_id)
.ok_or(WorkerChannelError::UnknownJobId(snapshot.job_id))?;
if !matches!(
job.state,
JobState::Created | JobState::Queued | JobState::Running
) {
return Ok(());
}
if job.state != snapshot.state {
job.transition_to(snapshot.state)?;
}
Ok(())
}
fn prepare_inbound(&mut self, envelope: &WorkerEnvelope) -> Result<(), WorkerChannelError> {
envelope.validate()?;
if self.should_reset_inbound_session(&envelope.op) {
self.fail_nonterminal_jobs();
self.reset_inbound_session();
}
self.validate_inbound_worker_session(envelope)?;
self.validate_inbound_sequence(envelope.seq_no)?;
self.record_inbound_sequence(envelope.seq_no);
Ok(())
}
fn should_reset_inbound_session(&self, op: &WorkerOp) -> bool {
match op {
WorkerOp::BootstrapReady { worker_id }
| WorkerOp::BootstrapFailed { worker_id, .. } => {
self.active_worker_id.as_deref() != Some(worker_id.as_str())
}
_ => false,
}
}
fn fail_nonterminal_jobs(&mut self) {
for job in self.jobs.values_mut() {
if job.state.is_terminal() {
continue;
}
let _ = job.transition_to(JobState::Failed);
}
}
fn discard_outbound_session_messages(&mut self) {
self.outbox.clear();
}
fn reset_inbound_session(&mut self) {
self.last_inbound_seq_no = 0;
self.discard_outbound_session_messages();
}
fn validate_inbound_sequence(&self, seq_no: u64) -> Result<(), WorkerChannelError> {
if seq_no <= self.last_inbound_seq_no {
return Err(WorkerChannelError::InboundSequenceNotFresh {
last_seen: self.last_inbound_seq_no,
actual: seq_no,
});
}
Ok(())
}
fn record_inbound_sequence(&mut self, seq_no: u64) {
self.last_inbound_seq_no = seq_no;
}
fn validate_inbound_worker_session(
&self,
envelope: &WorkerEnvelope,
) -> Result<(), WorkerChannelError> {
match &envelope.op {
WorkerOp::BootstrapReady { .. }
| WorkerOp::BootstrapFailed { .. }
| WorkerOp::SpawnJob(_)
| WorkerOp::PollStatus { .. }
| WorkerOp::CancelJob { .. }
| WorkerOp::DrainJob { .. }
| WorkerOp::FinalizeJob { .. }
| WorkerOp::ShutdownWorker { .. } => Ok(()),
_ => {
let actual = envelope
.worker_id
.as_deref()
.ok_or(WorkerChannelError::MissingWorkerSessionIdentity)?;
if self.active_worker_id.as_deref() == Some(actual) {
return Ok(());
}
Err(WorkerChannelError::InboundWorkerSessionMismatch {
expected: self.active_worker_id.clone(),
actual: actual.to_string(),
})
}
}
}
fn make_envelope(&mut self, op: WorkerOp) -> WorkerEnvelope {
let msg_id = self.next_message_id;
let seq = self.next_seq;
let decision_seq = self.next_decision_seq;
self.next_message_id += 1;
self.next_seq += 1;
self.next_decision_seq += 1;
WorkerEnvelope::new_with_decision_seq(msg_id, seq, decision_seq, self.seed, self.turn, op)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkerChannelError {
VersionMismatch {
expected: u32,
actual: u32,
},
ReplayHashMismatch {
expected: u64,
actual: u64,
},
InboundSequenceNotFresh {
last_seen: u64,
actual: u64,
},
MissingWorkerSessionIdentity,
UnexpectedWorkerSessionIdentity(String),
WorkerIdentityMismatch {
expected: String,
actual: String,
},
InboundWorkerSessionMismatch {
expected: Option<String>,
actual: String,
},
PayloadTooLarge {
size: usize,
max: usize,
},
InvalidTransition {
job_id: u64,
from: JobState,
to: JobState,
},
TerminalStatusSnapshot {
job_id: u64,
state: JobState,
},
InvalidStatusSnapshotState {
job_id: u64,
state: JobState,
},
UnexpectedCompletionPhase {
job_id: u64,
state: JobState,
},
WorkerNotReady,
ShutdownInProgress,
DuplicateJobId(u64),
UnknownJobId(u64),
JobNotPollable {
job_id: u64,
state: JobState,
},
BootstrapFailed(String),
UnexpectedDirection {
op: String,
},
}
impl fmt::Display for WorkerChannelError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::VersionMismatch { expected, actual } => {
write!(
f,
"protocol version mismatch: expected {expected}, got {actual}"
)
}
Self::ReplayHashMismatch { expected, actual } => {
write!(f, "replay hash mismatch: expected {expected}, got {actual}")
}
Self::InboundSequenceNotFresh { last_seen, actual } => {
write!(
f,
"inbound worker sequence is not fresh: saw {actual} after {last_seen}"
)
}
Self::MissingWorkerSessionIdentity => {
write!(
f,
"worker-originated envelope missing worker session identity"
)
}
Self::UnexpectedWorkerSessionIdentity(worker_id) => {
write!(
f,
"main-thread envelope unexpectedly carried worker session identity {worker_id}"
)
}
Self::WorkerIdentityMismatch { expected, actual } => {
write!(
f,
"worker identity mismatch: envelope carried {actual}, operation expects {expected}"
)
}
Self::InboundWorkerSessionMismatch { expected, actual } => match expected {
Some(expected) => write!(
f,
"inbound worker session mismatch: active session {expected}, got {actual}"
),
None => write!(
f,
"inbound worker session mismatch: no active session, got {actual}"
),
},
Self::PayloadTooLarge { size, max } => {
write!(
f,
"payload too large: {size} bytes exceeds {max} byte limit"
)
}
Self::InvalidTransition { job_id, from, to } => {
write!(f, "invalid job {job_id} transition: {from} → {to}")
}
Self::TerminalStatusSnapshot { job_id, state } => {
write!(
f,
"job {job_id} reported terminal state {state} via status snapshot"
)
}
Self::InvalidStatusSnapshotState { job_id, state } => {
write!(
f,
"job {job_id} reported coordinator-owned state {state} via status snapshot"
)
}
Self::UnexpectedCompletionPhase { job_id, state } => {
write!(
f,
"job {job_id} reported completion while coordinator was in {state}"
)
}
Self::WorkerNotReady => write!(f, "worker has not reported bootstrap readiness"),
Self::ShutdownInProgress => write!(f, "shutdown already in progress"),
Self::DuplicateJobId(id) => write!(f, "duplicate job id: {id}"),
Self::UnknownJobId(id) => write!(f, "unknown job id: {id}"),
Self::JobNotPollable { job_id, state } => {
write!(f, "job {job_id} is terminal ({state}) and cannot be polled")
}
Self::BootstrapFailed(reason) => write!(f, "worker bootstrap failed: {reason}"),
Self::UnexpectedDirection { op } => {
write!(f, "received outbound-only operation as inbound: {op}")
}
}
}
}
impl std::error::Error for WorkerChannelError {}
#[cfg(test)]
mod tests {
use super::*;
fn worker_envelope(
worker_id: &str,
message_id: u64,
seq_no: u64,
issued_at_turn: u64,
op: WorkerOp,
) -> WorkerEnvelope {
WorkerEnvelope::from_worker(worker_id, message_id, seq_no, 42, issued_at_turn, op)
}
fn test_worker_envelope(
message_id: u64,
seq_no: u64,
issued_at_turn: u64,
op: WorkerOp,
) -> WorkerEnvelope {
worker_envelope("test-worker-1", message_id, seq_no, issued_at_turn, op)
}
fn bootstrap_ready_envelope(seq: u64) -> WorkerEnvelope {
worker_envelope(
"test-worker-1",
seq,
seq,
0,
WorkerOp::BootstrapReady {
worker_id: "test-worker-1".into(),
},
)
}
fn bootstrap_failed_envelope(seq: u64, worker_id: &str, reason: &str) -> WorkerEnvelope {
worker_envelope(
worker_id,
seq,
seq,
0,
WorkerOp::BootstrapFailed {
worker_id: worker_id.into(),
reason: reason.into(),
},
)
}
#[test]
fn coordinator_rejects_spawn_before_bootstrap() {
let mut coord = WorkerCoordinator::new(42);
let result = coord.spawn_job(1, 100, 200, 300, vec![1, 2, 3]);
assert_eq!(result, Err(WorkerChannelError::WorkerNotReady));
}
#[test]
fn coordinator_accepts_spawn_after_bootstrap() {
let mut coord = WorkerCoordinator::new(42);
coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
assert!(coord.is_worker_ready());
let job_id = coord.spawn_job(1, 100, 200, 300, vec![1, 2, 3]).unwrap();
assert_eq!(job_id, 1);
assert_eq!(coord.job_state(1), Some(JobState::Queued));
assert_eq!(coord.inflight_count(), 1);
let msg = coord.drain_outbox().unwrap();
assert!(matches!(msg.op, WorkerOp::SpawnJob(_)));
assert_eq!(msg.version, WORKER_PROTOCOL_VERSION);
assert_eq!(msg.seed, 42);
}
#[test]
fn coordinator_tracks_full_job_lifecycle() {
let mut coord = WorkerCoordinator::new(42);
coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
let _ = coord.drain_outbox();
let result_env = test_worker_envelope(
2,
2,
1,
WorkerOp::JobCompleted(JobResult {
job_id: 1,
outcome: JobOutcome::Ok { payload: vec![42] },
}),
);
coord.handle_inbound(&result_env).unwrap();
assert_eq!(coord.job_state(1), Some(JobState::Completed));
assert_eq!(coord.inflight_count(), 0);
assert_eq!(
JobOutcome::Ok { payload: vec![42] }.payload_transfer(),
Some(WorkerPayloadTransfer::StructuredClone)
);
}
#[test]
fn coordinator_tracks_cancellation_lifecycle() {
let mut coord = WorkerCoordinator::new(42);
coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
let _ = coord.drain_outbox();
let running = test_worker_envelope(
2,
2,
1,
WorkerOp::StatusSnapshot(JobStatusSnapshot {
job_id: 1,
state: JobState::Running,
detail: Some("worker accepted job".into()),
}),
);
coord.handle_inbound(&running).unwrap();
assert_eq!(coord.job_state(1), Some(JobState::Running));
coord.cancel_job(1, "test cancel".into()).unwrap();
assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
let cancel_msg = coord.drain_outbox().unwrap();
assert!(matches!(cancel_msg.op, WorkerOp::CancelJob { .. }));
let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
coord.handle_inbound(&ack).unwrap();
assert_eq!(coord.job_state(1), Some(JobState::Draining));
let drain_request = coord.drain_outbox().unwrap();
assert!(matches!(drain_request.op, WorkerOp::DrainJob { job_id: 1 }));
let drain = test_worker_envelope(4, 4, 3, WorkerOp::DrainCompleted { job_id: 1 });
coord.handle_inbound(&drain).unwrap();
assert_eq!(coord.job_state(1), Some(JobState::Finalizing));
let finalize_request = coord.drain_outbox().unwrap();
assert!(matches!(
finalize_request.op,
WorkerOp::FinalizeJob { job_id: 1 }
));
let finalize = test_worker_envelope(5, 5, 4, WorkerOp::FinalizeCompleted { job_id: 1 });
coord.handle_inbound(&finalize).unwrap();
assert_eq!(coord.job_state(1), Some(JobState::Completed));
}
#[test]
fn coordinator_rejects_terminal_status_snapshot() {
let mut coord = WorkerCoordinator::new(42);
coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
let _ = coord.drain_outbox();
let snapshot = test_worker_envelope(
2,
2,
1,
WorkerOp::StatusSnapshot(JobStatusSnapshot {
job_id: 1,
state: JobState::Completed,
detail: Some("terminal snapshots must use job_completed".into()),
}),
);
assert!(matches!(
coord.handle_inbound(&snapshot),
Err(WorkerChannelError::TerminalStatusSnapshot {
job_id: 1,
state: JobState::Completed,
})
));
assert_eq!(coord.job_state(1), Some(JobState::Queued));
}
#[test]
fn coordinator_rejects_status_snapshot_for_protocol_owned_drain_phase() {
let mut coord = WorkerCoordinator::new(42);
coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
let _ = coord.drain_outbox();
let running = test_worker_envelope(
2,
2,
1,
WorkerOp::StatusSnapshot(JobStatusSnapshot {
job_id: 1,
state: JobState::Running,
detail: None,
}),
);
coord.handle_inbound(&running).unwrap();
coord.cancel_job(1, "test cancel".into()).unwrap();
let _ = coord.drain_outbox();
assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
let draining_snapshot = test_worker_envelope(
3,
3,
2,
WorkerOp::StatusSnapshot(JobStatusSnapshot {
job_id: 1,
state: JobState::Draining,
detail: Some("worker tried to skip cancel_acknowledged".into()),
}),
);
assert!(matches!(
coord.handle_inbound(&draining_snapshot),
Err(WorkerChannelError::InvalidStatusSnapshotState {
job_id: 1,
state: JobState::Draining,
})
));
assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
}
#[test]
fn coordinator_rejects_status_snapshot_for_protocol_owned_finalize_phase() {
let mut coord = WorkerCoordinator::new(42);
coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
let _ = coord.drain_outbox();
let running = test_worker_envelope(
2,
2,
1,
WorkerOp::StatusSnapshot(JobStatusSnapshot {
job_id: 1,
state: JobState::Running,
detail: None,
}),
);
coord.handle_inbound(&running).unwrap();
coord.cancel_job(1, "test cancel".into()).unwrap();
let _ = coord.drain_outbox();
let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
coord.handle_inbound(&ack).unwrap();
let _ = coord.drain_outbox();
assert_eq!(coord.job_state(1), Some(JobState::Draining));
let finalizing_snapshot = test_worker_envelope(
4,
4,
3,
WorkerOp::StatusSnapshot(JobStatusSnapshot {
job_id: 1,
state: JobState::Finalizing,
detail: Some("worker tried to skip drain_completed".into()),
}),
);
assert!(matches!(
coord.handle_inbound(&finalizing_snapshot),
Err(WorkerChannelError::InvalidStatusSnapshotState {
job_id: 1,
state: JobState::Finalizing,
})
));
assert_eq!(coord.job_state(1), Some(JobState::Draining));
}
#[test]
fn coordinator_rejects_job_completed_while_cancellation_protocol_active() {
let mut coord = WorkerCoordinator::new(42);
coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
let _ = coord.drain_outbox();
let running = test_worker_envelope(
2,
2,
1,
WorkerOp::StatusSnapshot(JobStatusSnapshot {
job_id: 1,
state: JobState::Running,
detail: None,
}),
);
coord.handle_inbound(&running).unwrap();
coord.cancel_job(1, "test cancel".into()).unwrap();
let _ = coord.drain_outbox();
let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
coord.handle_inbound(&ack).unwrap();
let _ = coord.drain_outbox();
let drain = test_worker_envelope(4, 4, 3, WorkerOp::DrainCompleted { job_id: 1 });
coord.handle_inbound(&drain).unwrap();
let _ = coord.drain_outbox();
let completed = test_worker_envelope(
5,
5,
4,
WorkerOp::JobCompleted(JobResult {
job_id: 1,
outcome: JobOutcome::Cancelled {
reason: "worker skipped finalize".into(),
},
}),
);
assert!(matches!(
coord.handle_inbound(&completed),
Err(WorkerChannelError::UnexpectedCompletionPhase {
job_id: 1,
state: JobState::Finalizing,
})
));
assert_eq!(coord.job_state(1), Some(JobState::Finalizing));
}
#[test]
fn coordinator_accepts_job_completed_racing_with_cancel_request() {
let mut coord = WorkerCoordinator::new(42);
coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
let _ = coord.drain_outbox();
let running = test_worker_envelope(
2,
2,
1,
WorkerOp::StatusSnapshot(JobStatusSnapshot {
job_id: 1,
state: JobState::Running,
detail: Some("worker accepted job".into()),
}),
);
coord.handle_inbound(&running).unwrap();
coord.cancel_job(1, "test cancel".into()).unwrap();
assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
let cancel_msg = coord.drain_outbox().unwrap();
assert!(matches!(cancel_msg.op, WorkerOp::CancelJob { .. }));
let completed = test_worker_envelope(
3,
3,
2,
WorkerOp::JobCompleted(JobResult {
job_id: 1,
outcome: JobOutcome::Ok { payload: vec![7] },
}),
);
coord.handle_inbound(&completed).unwrap();
assert_eq!(coord.job_state(1), Some(JobState::Completed));
assert_eq!(coord.inflight_count(), 0);
assert!(coord.drain_outbox().is_none());
}
#[test]
fn coordinator_rejects_invalid_transition() {
let mut coord = WorkerCoordinator::new(42);
coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
let _ = coord.drain_outbox();
coord.cancel_job(1, "cancel before running".into()).unwrap();
assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
let cancel = coord.drain_outbox().unwrap();
assert!(matches!(cancel.op, WorkerOp::CancelJob { job_id: 1, .. }));
}
#[test]
fn coordinator_allows_cancel_before_running_snapshot() {
let mut coord = WorkerCoordinator::new(42);
coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
let spawn = coord.drain_outbox().unwrap();
assert!(matches!(spawn.op, WorkerOp::SpawnJob(_)));
coord
.cancel_job(1, "cancel before worker starts".into())
.unwrap();
assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
let cancel = coord.drain_outbox().unwrap();
assert!(matches!(cancel.op, WorkerOp::CancelJob { job_id: 1, .. }));
}
#[test]
fn coordinator_rejects_oversized_payload() {
let mut coord = WorkerCoordinator::new(42);
coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
let big_payload = vec![0u8; MAX_PAYLOAD_BYTES + 1];
let result = coord.spawn_job(1, 100, 200, 300, big_payload);
assert!(matches!(
result,
Err(WorkerChannelError::PayloadTooLarge { .. })
));
}
#[test]
// Spawning the same job id twice while the first attempt is still tracked
// must fail with DuplicateJobId.
fn coordinator_rejects_duplicate_job_id() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    assert!(coord.spawn_job(1, 100, 200, 300, vec![]).is_ok());
    assert_eq!(
        coord.spawn_job(1, 100, 200, 300, vec![]),
        Err(WorkerChannelError::DuplicateJobId(1))
    );
}
#[test]
// Full shutdown lifecycle: request -> ShutdownWorker emitted -> spawns
// rejected while pending -> ShutdownCompleted clears both the shutdown flag
// and worker readiness.
fn coordinator_shutdown_lifecycle() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.request_shutdown("test shutdown".into()).unwrap();
    assert!(coord.is_shutdown_requested());
    let msg = coord.drain_outbox().unwrap();
    assert!(matches!(msg.op, WorkerOp::ShutdownWorker { .. }));
    // While shutdown is pending, new work is refused with ShutdownInProgress.
    let result = coord.spawn_job(1, 100, 200, 300, vec![]);
    assert_eq!(result, Err(WorkerChannelError::ShutdownInProgress));
    let done = test_worker_envelope(2, 2, 1, WorkerOp::ShutdownCompleted);
    coord.handle_inbound(&done).unwrap();
    // After completion the coordinator is idle again but not ready — a fresh
    // bootstrap is needed before work is accepted.
    assert!(!coord.is_shutdown_requested());
    assert!(!coord.is_worker_ready());
    assert_eq!(
        coord.spawn_job(1, 100, 200, 300, vec![]),
        Err(WorkerChannelError::WorkerNotReady)
    );
}
#[test]
// A SpawnJob still sitting in the outbox when shutdown is requested must be
// discarded: the first drained message is ShutdownWorker, not the spawn.
fn coordinator_request_shutdown_discards_queued_spawn_before_shutdown() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![1, 2, 3]).unwrap();
    // The spawn is never drained before the shutdown request.
    coord.request_shutdown("shutdown now".into()).unwrap();
    let shutdown = coord.drain_outbox().unwrap();
    assert!(matches!(shutdown.op, WorkerOp::ShutdownWorker { .. }));
    assert!(coord.drain_outbox().is_none());
}
#[test]
// Queued PollStatus and CancelJob messages that were never drained are
// superseded by a shutdown request: only ShutdownWorker leaves the outbox.
fn coordinator_request_shutdown_discards_queued_poll_and_cancel_before_shutdown() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    let _ = coord.drain_outbox();
    // Worker reports the job Running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();
    // Queue a poll and a cancel without draining either.
    coord.poll_status(1).unwrap();
    coord
        .cancel_job(1, "shutdown supersedes cancel".into())
        .unwrap();
    coord.request_shutdown("shutdown now".into()).unwrap();
    let shutdown = coord.drain_outbox().unwrap();
    assert!(matches!(shutdown.op, WorkerOp::ShutdownWorker { .. }));
    assert!(coord.drain_outbox().is_none());
}
#[test]
// Once shutdown is requested, cancel_job must fail with ShutdownInProgress
// AND must leave the job's state untouched (still Running).
fn coordinator_rejects_cancel_after_shutdown_requested_without_mutating_job() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    let _ = coord.drain_outbox();
    // Worker reports the job Running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();
    coord.request_shutdown("shutdown now".into()).unwrap();
    assert_eq!(
        coord.cancel_job(1, "too late".into()),
        Err(WorkerChannelError::ShutdownInProgress)
    );
    // Rejection must not mutate job state or enqueue anything besides shutdown.
    assert_eq!(coord.job_state(1), Some(JobState::Running));
    let shutdown = coord.drain_outbox().unwrap();
    assert!(matches!(shutdown.op, WorkerOp::ShutdownWorker { .. }));
    assert!(coord.drain_outbox().is_none());
}
#[test]
// Once shutdown is requested, poll_status must fail with ShutdownInProgress
// and must not enqueue a PollStatus message behind the shutdown.
fn coordinator_rejects_poll_after_shutdown_requested_without_enqueuing_follow_up() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    let _ = coord.drain_outbox();
    // Worker reports the job Running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();
    coord.request_shutdown("shutdown now".into()).unwrap();
    assert_eq!(
        coord.poll_status(1),
        Err(WorkerChannelError::ShutdownInProgress)
    );
    assert_eq!(coord.job_state(1), Some(JobState::Running));
    // Only the shutdown message may remain in the outbox.
    let shutdown = coord.drain_outbox().unwrap();
    assert!(matches!(shutdown.op, WorkerOp::ShutdownWorker { .. }));
    assert!(coord.drain_outbox().is_none());
}
#[test]
// A CancelAcknowledged arriving after shutdown was requested must not
// advance the job past CancelRequested (no DrainJob follow-up is queued);
// only ShutdownWorker drains from the outbox.
fn coordinator_shutdown_supersedes_cancel_acknowledged_follow_up() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    let _ = coord.drain_outbox();
    // Worker reports the job Running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();
    coord.cancel_job(1, "begin cancel".into()).unwrap();
    let cancel = coord.drain_outbox().unwrap();
    assert!(matches!(cancel.op, WorkerOp::CancelJob { job_id: 1, .. }));
    assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
    // Shutdown lands between the cancel request and its acknowledgement.
    coord.request_shutdown("shutdown now".into()).unwrap();
    assert!(coord.is_shutdown_requested());
    let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
    coord.handle_inbound(&ack).unwrap();
    // The ack is accepted but the cancel flow does not progress further.
    assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
    let shutdown = coord.drain_outbox().unwrap();
    assert!(matches!(shutdown.op, WorkerOp::ShutdownWorker { .. }));
    assert!(coord.drain_outbox().is_none());
}
#[test]
// A DrainCompleted arriving after shutdown was requested must not advance
// the job past Draining (no FinalizeJob follow-up); only ShutdownWorker
// remains in the outbox.
fn coordinator_shutdown_supersedes_drain_completed_follow_up() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    let _ = coord.drain_outbox();
    // Worker reports the job Running (inbound seq 2).
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();
    coord.cancel_job(1, "begin cancel".into()).unwrap();
    let _ = coord.drain_outbox();
    let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
    coord.handle_inbound(&ack).unwrap();
    // The ack normally enqueues a DrainJob follow-up and moves the job to Draining.
    let drain = coord.drain_outbox().unwrap();
    assert!(matches!(drain.op, WorkerOp::DrainJob { job_id: 1 }));
    assert_eq!(coord.job_state(1), Some(JobState::Draining));
    // Shutdown lands before DrainCompleted arrives.
    coord.request_shutdown("shutdown now".into()).unwrap();
    assert!(coord.is_shutdown_requested());
    let drain_completed = test_worker_envelope(4, 4, 3, WorkerOp::DrainCompleted { job_id: 1 });
    coord.handle_inbound(&drain_completed).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::Draining));
    let shutdown = coord.drain_outbox().unwrap();
    assert!(matches!(shutdown.op, WorkerOp::ShutdownWorker { .. }));
    assert!(coord.drain_outbox().is_none());
}
#[test]
// When ShutdownCompleted arrives while a job is still in a non-terminal
// state (here: Finalizing), the coordinator must mark it Failed and drop it
// from the inflight count.
fn coordinator_marks_nonterminal_job_failed_when_shutdown_completes() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    let _ = coord.drain_outbox();
    // Walk the job through Running -> CancelRequested -> Draining -> Finalizing.
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker started".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();
    coord.cancel_job(1, "shutdown".into()).unwrap();
    let _ = coord.drain_outbox();
    let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
    coord.handle_inbound(&ack).unwrap();
    let _ = coord.drain_outbox();
    let drain = test_worker_envelope(4, 4, 3, WorkerOp::DrainCompleted { job_id: 1 });
    coord.handle_inbound(&drain).unwrap();
    let _ = coord.drain_outbox();
    assert_eq!(coord.job_state(1), Some(JobState::Finalizing));
    assert_eq!(coord.inflight_count(), 1);
    // ShutdownCompleted while the job is still inflight forces it to Failed.
    let shutdown = test_worker_envelope(5, 5, 4, WorkerOp::ShutdownCompleted);
    coord.handle_inbound(&shutdown).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::Failed));
    assert_eq!(coord.inflight_count(), 0);
    assert!(!coord.is_worker_ready());
}
#[test]
// A host-originated op (SpawnJob) arriving on the inbound path must be
// rejected as UnexpectedDirection.
fn coordinator_rejects_wrong_direction_messages() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    // Built via WorkerEnvelope::new (host direction, no worker_id) but fed inbound.
    let bad = WorkerEnvelope::new(
        2,
        2,
        42,
        1,
        WorkerOp::SpawnJob(SpawnJobRequest {
            job_id: 1,
            region_id: 100,
            task_id: 200,
            obligation_id: 300,
            payload: vec![],
        }),
    );
    let result = coord.handle_inbound(&bad);
    assert!(matches!(
        result,
        Err(WorkerChannelError::UnexpectedDirection { .. })
    ));
}
#[test]
// poll_status enqueues a PollStatus message, and the worker's subsequent
// StatusSnapshot is applied to the tracked job state.
fn coordinator_polls_status_and_applies_snapshot() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    let _ = coord.drain_outbox();
    coord.poll_status(1).unwrap();
    let poll = coord.drain_outbox().unwrap();
    assert!(matches!(poll.op, WorkerOp::PollStatus { job_id: 1 }));
    let snapshot = test_worker_envelope(
        3,
        3,
        2,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: None,
        }),
    );
    coord.handle_inbound(&snapshot).unwrap();
    // The snapshot moves the job from Queued to Running.
    assert_eq!(coord.job_state(1), Some(JobState::Running));
}
#[test]
// Polling a job that already reached a terminal state (Completed) must fail
// with JobNotPollable and must not enqueue a PollStatus message.
fn coordinator_rejects_poll_after_job_completed_without_enqueuing_follow_up() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    let _ = coord.drain_outbox();
    let completed = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::JobCompleted(JobResult {
            job_id: 1,
            outcome: JobOutcome::Ok { payload: vec![7] },
        }),
    );
    coord.handle_inbound(&completed).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::Completed));
    assert_eq!(
        coord.poll_status(1),
        Err(WorkerChannelError::JobNotPollable {
            job_id: 1,
            state: JobState::Completed,
        })
    );
    // The rejected poll must leave the outbox empty.
    assert!(coord.drain_outbox().is_none());
}
#[test]
// A re-bootstrap from a new worker instance fails the prior worker's job;
// polling that Failed job is then rejected without queuing a follow-up.
fn coordinator_rejects_poll_after_rebootstrap_fails_prior_job_without_enqueuing_follow_up() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    let _ = coord.drain_outbox();
    // A different worker id bootstrapping marks the old worker's job Failed.
    let reboot = worker_envelope(
        "test-worker-2",
        2,
        2,
        1,
        WorkerOp::BootstrapReady {
            worker_id: "test-worker-2".into(),
        },
    );
    coord.handle_inbound(&reboot).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::Failed));
    assert_eq!(
        coord.poll_status(1),
        Err(WorkerChannelError::JobNotPollable {
            job_id: 1,
            state: JobState::Failed,
        })
    );
    assert!(coord.drain_outbox().is_none());
}
#[test]
// An inbound envelope reusing an already-seen seq_no must be rejected with
// InboundSequenceNotFresh, without advancing the high-water mark or
// disturbing worker readiness.
fn coordinator_rejects_duplicate_inbound_sequence() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    let diag = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::Diagnostic(DiagnosticEvent {
            level: DiagnosticLevel::Info,
            category: "lifecycle".into(),
            message: "worker initialized".into(),
            metadata: None,
        }),
    );
    coord.handle_inbound(&diag).unwrap();
    assert_eq!(coord.last_inbound_seq_no, 2);
    // Fresh message_id (3) but stale seq_no (2) — must be refused.
    let duplicate = test_worker_envelope(3, 2, 2, WorkerOp::ShutdownCompleted);
    assert_eq!(
        coord.handle_inbound(&duplicate),
        Err(WorkerChannelError::InboundSequenceNotFresh {
            last_seen: 2,
            actual: 2,
        })
    );
    assert_eq!(coord.last_inbound_seq_no, 2);
    assert!(coord.is_worker_ready());
}
#[test]
// After a completed shutdown, a brand-new worker instance may bootstrap with
// a restarted sequence space (seq_no back to 1).
fn coordinator_accepts_fresh_bootstrap_after_shutdown_completed() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.request_shutdown("done".into()).unwrap();
    let _ = coord.drain_outbox();
    let shutdown = test_worker_envelope(2, 2, 1, WorkerOp::ShutdownCompleted);
    coord.handle_inbound(&shutdown).unwrap();
    assert_eq!(coord.last_inbound_seq_no, 2);
    assert!(!coord.is_worker_ready());
    // New worker id, sequence numbering restarts from 1.
    let reboot = worker_envelope(
        "test-worker-2",
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "test-worker-2".into(),
        },
    );
    coord.handle_inbound(&reboot).unwrap();
    assert!(coord.is_worker_ready());
    // The inbound high-water mark is reset for the new session.
    assert_eq!(coord.last_inbound_seq_no, 1);
}
#[test]
// A job id whose previous incarnation was force-Failed by shutdown may be
// reused after a fresh bootstrap; the respawn carries the new parameters.
fn coordinator_allows_reusing_failed_job_id_after_shutdown_completed() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![1]).unwrap();
    let _ = coord.drain_outbox();
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker one accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();
    coord.request_shutdown("done".into()).unwrap();
    let _ = coord.drain_outbox();
    let shutdown = test_worker_envelope(3, 3, 2, WorkerOp::ShutdownCompleted);
    coord.handle_inbound(&shutdown).unwrap();
    // ShutdownCompleted failed the still-running job.
    assert_eq!(coord.job_state(1), Some(JobState::Failed));
    let reboot = worker_envelope(
        "test-worker-2",
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "test-worker-2".into(),
        },
    );
    coord.handle_inbound(&reboot).unwrap();
    // Same job id, new task/obligation/payload — must be accepted.
    coord.spawn_job(1, 100, 201, 301, vec![7, 8]).unwrap();
    let respawn = coord.drain_outbox().unwrap();
    match respawn.op {
        WorkerOp::SpawnJob(request) => {
            assert_eq!(request.job_id, 1);
            assert_eq!(request.task_id, 201);
            assert_eq!(request.obligation_id, 301);
            assert_eq!(request.payload, vec![7, 8]);
        }
        other => panic!("expected respawn after shutdown-complete reboot, got {other:?}"),
    }
    assert_eq!(coord.job_state(1), Some(JobState::Queued));
}
#[test]
// A BootstrapReady from a different worker id implies the old worker died:
// its running job is marked Failed and removed from the inflight count.
fn coordinator_marks_running_job_failed_when_worker_instance_changes() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    let _ = coord.drain_outbox();
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker one accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::Running));
    assert_eq!(coord.inflight_count(), 1);
    // Replacement worker bootstraps with a fresh sequence space.
    let reboot = worker_envelope(
        "test-worker-2",
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "test-worker-2".into(),
        },
    );
    coord.handle_inbound(&reboot).unwrap();
    assert!(coord.is_worker_ready());
    assert_eq!(coord.last_inbound_seq_no, 1);
    assert_eq!(coord.job_state(1), Some(JobState::Failed));
    assert_eq!(coord.inflight_count(), 0);
}
#[test]
// A still-queued SpawnJob addressed to the old worker instance must be
// discarded on re-bootstrap; later spawns target the new instance normally.
fn coordinator_discards_queued_spawn_when_worker_instance_changes() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    // Spawn is queued but never drained before the instance change.
    coord.spawn_job(1, 100, 200, 300, vec![1, 2, 3]).unwrap();
    let reboot = worker_envelope(
        "test-worker-2",
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "test-worker-2".into(),
        },
    );
    coord.handle_inbound(&reboot).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::Failed));
    assert!(coord.drain_outbox().is_none());
    // A new job spawned after the reboot flows through as usual.
    coord.spawn_job(2, 100, 201, 301, vec![9]).unwrap();
    let fresh_spawn = coord.drain_outbox().unwrap();
    match fresh_spawn.op {
        WorkerOp::SpawnJob(request) => assert_eq!(request.job_id, 2),
        other => panic!("expected fresh spawn after reboot, got {other:?}"),
    }
}
#[test]
// After an instance change fails the old job, its id may be reused; the
// respawn carries the new task/obligation/payload values.
fn coordinator_allows_reusing_failed_job_id_after_worker_instance_change() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![1, 2, 3]).unwrap();
    let _ = coord.drain_outbox();
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker one accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();
    // Replacement worker bootstraps; the running job is failed.
    let reboot = worker_envelope(
        "test-worker-2",
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "test-worker-2".into(),
        },
    );
    coord.handle_inbound(&reboot).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::Failed));
    coord.spawn_job(1, 100, 201, 301, vec![9]).unwrap();
    let respawn = coord.drain_outbox().unwrap();
    match respawn.op {
        WorkerOp::SpawnJob(request) => {
            assert_eq!(request.job_id, 1);
            assert_eq!(request.task_id, 201);
            assert_eq!(request.obligation_id, 301);
            assert_eq!(request.payload, vec![9]);
        }
        other => panic!("expected respawn after reboot, got {other:?}"),
    }
    assert_eq!(coord.job_state(1), Some(JobState::Queued));
}
#[test]
// A CancelJob still queued for the old worker instance is discarded on
// re-bootstrap, and the cancelled job is marked Failed.
fn coordinator_discards_queued_cancel_when_worker_instance_changes() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    let _ = coord.drain_outbox();
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();
    // The cancel is queued but never drained.
    coord.cancel_job(1, "worker replaced".into()).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::CancelRequested));
    let reboot = worker_envelope(
        "test-worker-2",
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "test-worker-2".into(),
        },
    );
    coord.handle_inbound(&reboot).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::Failed));
    assert!(coord.drain_outbox().is_none());
}
#[test]
// A FinalizeJob message still queued (re-enqueued here via
// enqueue_job_message) must be dropped when ShutdownCompleted arrives, and
// the Finalizing job is marked Failed.
fn coordinator_discards_queued_finalize_when_shutdown_completes() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    let _ = coord.drain_outbox();
    // Walk the job to Finalizing via Running -> cancel -> ack -> drain.
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker started".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();
    coord.cancel_job(1, "shutdown".into()).unwrap();
    let _ = coord.drain_outbox();
    let ack = test_worker_envelope(3, 3, 2, WorkerOp::CancelAcknowledged { job_id: 1 });
    coord.handle_inbound(&ack).unwrap();
    let _ = coord.drain_outbox();
    let drain = test_worker_envelope(4, 4, 3, WorkerOp::DrainCompleted { job_id: 1 });
    coord.handle_inbound(&drain).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::Finalizing));
    let pending_finalize = coord.drain_outbox().unwrap();
    assert!(matches!(
        pending_finalize.op,
        WorkerOp::FinalizeJob { job_id: 1 }
    ));
    // Put a FinalizeJob back in the outbox so shutdown has something to discard.
    coord
        .enqueue_job_message(1, WorkerOp::FinalizeJob { job_id: 1 })
        .unwrap();
    let shutdown = test_worker_envelope(5, 5, 4, WorkerOp::ShutdownCompleted);
    coord.handle_inbound(&shutdown).unwrap();
    assert_eq!(coord.job_state(1), Some(JobState::Failed));
    assert!(coord.drain_outbox().is_none());
}
#[test]
// A BootstrapFailed is surfaced as an error but still records the attempt's
// seq_no; a subsequent bootstrap from a new worker instance succeeds.
fn coordinator_accepts_fresh_bootstrap_after_bootstrap_failed() {
    let mut coord = WorkerCoordinator::new(42);
    let failed = bootstrap_failed_envelope(1, "failed-worker-1", "synthetic boot failure");
    assert_eq!(
        coord.handle_inbound(&failed),
        Err(WorkerChannelError::BootstrapFailed(
            "synthetic boot failure".into()
        ))
    );
    // The failed attempt still advanced the high-water mark to 1.
    assert_eq!(coord.last_inbound_seq_no, 1);
    assert!(!coord.is_worker_ready());
    let retry = worker_envelope(
        "test-worker-2",
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "test-worker-2".into(),
        },
    );
    coord.handle_inbound(&retry).unwrap();
    assert!(coord.is_worker_ready());
    assert_eq!(coord.last_inbound_seq_no, 1);
}
#[test]
// A BootstrapFailed from a NEW worker id while a session is live tears down
// the session: readiness is cleared and the running job is marked Failed.
fn coordinator_accepts_fresh_bootstrap_failed_after_live_session() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    let _ = coord.drain_outbox();
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: Some("worker one accepted job".into()),
        }),
    );
    coord.handle_inbound(&running).unwrap();
    // Replacement worker fails to boot; seq_no restarts at 1 for the new session.
    let failed = bootstrap_failed_envelope(1, "failed-worker-2", "reboot failed to init");
    assert_eq!(
        coord.handle_inbound(&failed),
        Err(WorkerChannelError::BootstrapFailed(
            "reboot failed to init".into()
        ))
    );
    assert_eq!(coord.last_inbound_seq_no, 1);
    assert!(!coord.is_worker_ready());
    assert_eq!(coord.job_state(1), Some(JobState::Failed));
    assert_eq!(coord.inflight_count(), 0);
}
#[test]
// Between ShutdownCompleted and the next bootstrap there is no active
// session: messages from the old worker are rejected as a session mismatch
// (expected: None), the old high-water mark is retained, and only a fresh
// BootstrapReady resets it.
fn coordinator_keeps_prior_high_water_mark_until_rebootstrap() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.request_shutdown("done".into()).unwrap();
    let _ = coord.drain_outbox();
    let shutdown = test_worker_envelope(2, 2, 1, WorkerOp::ShutdownCompleted);
    coord.handle_inbound(&shutdown).unwrap();
    assert_eq!(coord.last_inbound_seq_no, 2);
    assert!(!coord.is_worker_ready());
    // A straggler from the pre-restart worker ("test-worker-1") arrives.
    let stale = test_worker_envelope(
        1,
        1,
        0,
        WorkerOp::Diagnostic(DiagnosticEvent {
            level: DiagnosticLevel::Info,
            category: "lifecycle".into(),
            message: "stale pre-restart message".into(),
            metadata: None,
        }),
    );
    assert_eq!(
        coord.handle_inbound(&stale),
        Err(WorkerChannelError::InboundWorkerSessionMismatch {
            expected: None,
            actual: "test-worker-1".into(),
        })
    );
    assert_eq!(coord.last_inbound_seq_no, 2);
    let reboot = worker_envelope(
        "test-worker-2",
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "test-worker-2".into(),
        },
    );
    coord.handle_inbound(&reboot).unwrap();
    assert!(coord.is_worker_ready());
    // Only the fresh bootstrap resets the sequence high-water mark.
    assert_eq!(coord.last_inbound_seq_no, 1);
}
#[test]
// A BootstrapReady replayed by the SAME worker id with an older seq_no must
// not reset the session — it is rejected as a stale sequence.
fn coordinator_rejects_replayed_bootstrap_from_same_worker_session() {
    let mut coord = WorkerCoordinator::new(42);
    // Initial bootstrap at seq 5 sets the high-water mark.
    let ready = worker_envelope(
        "stable-worker",
        5,
        5,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "stable-worker".into(),
        },
    );
    coord.handle_inbound(&ready).unwrap();
    // Replay at seq 1 from the same worker id is stale, not a new session.
    let replay = worker_envelope(
        "stable-worker",
        1,
        1,
        1,
        WorkerOp::BootstrapReady {
            worker_id: "stable-worker".into(),
        },
    );
    assert_eq!(
        coord.handle_inbound(&replay),
        Err(WorkerChannelError::InboundSequenceNotFresh {
            last_seen: 5,
            actual: 1,
        })
    );
}
#[test]
// A stale BootstrapFailed replay from the current worker session is rejected
// by the sequence check and does NOT tear down worker readiness.
fn coordinator_rejects_replayed_bootstrap_failed_from_same_worker_session() {
    let mut coord = WorkerCoordinator::new(42);
    let ready = worker_envelope(
        "stable-worker",
        5,
        5,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "stable-worker".into(),
        },
    );
    coord.handle_inbound(&ready).unwrap();
    // seq 1 < high-water mark 5, same worker id — stale replay.
    let replay = bootstrap_failed_envelope(1, "stable-worker", "stale failure replay");
    assert_eq!(
        coord.handle_inbound(&replay),
        Err(WorkerChannelError::InboundSequenceNotFresh {
            last_seen: 5,
            actual: 1,
        })
    );
    assert!(coord.is_worker_ready());
}
#[test]
// seq_no must be strictly increasing even across unrelated message kinds: a
// job completion whose seq_no is below the high-water mark (raised by an
// interleaved diagnostic) is refused and the job state stays unchanged.
fn coordinator_rejects_out_of_order_job_message_sequence() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    let _ = coord.drain_outbox();
    let running = test_worker_envelope(
        2,
        2,
        1,
        WorkerOp::StatusSnapshot(JobStatusSnapshot {
            job_id: 1,
            state: JobState::Running,
            detail: None,
        }),
    );
    coord.handle_inbound(&running).unwrap();
    // Diagnostic skips ahead to seq 4, raising the high-water mark.
    let diag = test_worker_envelope(
        3,
        4,
        2,
        WorkerOp::Diagnostic(DiagnosticEvent {
            level: DiagnosticLevel::Info,
            category: "scheduler".into(),
            message: "worker tick".into(),
            metadata: None,
        }),
    );
    coord.handle_inbound(&diag).unwrap();
    // Completion arrives with seq 3 — behind the mark of 4.
    let stale_completion = test_worker_envelope(
        4,
        3,
        3,
        WorkerOp::JobCompleted(JobResult {
            job_id: 1,
            outcome: JobOutcome::Ok { payload: vec![7] },
        }),
    );
    assert_eq!(
        coord.handle_inbound(&stale_completion),
        Err(WorkerChannelError::InboundSequenceNotFresh {
            last_seen: 4,
            actual: 3,
        })
    );
    assert_eq!(coord.last_inbound_seq_no, 4);
    // The stale completion must not have advanced the job.
    assert_eq!(coord.job_state(1), Some(JobState::Running));
}
#[test]
// A freshly constructed envelope passes validation; bumping the protocol
// version must surface a VersionMismatch error.
fn envelope_validates_version() {
    let mut envelope = worker_envelope(
        "w",
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "w".into(),
        },
    );
    assert!(envelope.validate().is_ok());
    envelope.version = 99;
    let outcome = envelope.validate();
    assert!(matches!(
        outcome,
        Err(WorkerChannelError::VersionMismatch { .. })
    ));
}
#[test]
// The replay hash covers all envelope metadata and the op: recomputing it
// after mutating decision_seq keeps the envelope valid, but a later op
// change without recomputation must fail as ReplayHashMismatch.
fn envelope_validates_replay_metadata() {
    let mut env = worker_envelope(
        "w",
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "w".into(),
        },
    );
    assert!(env.validate().is_ok());
    env.decision_seq = 2;
    // Recompute the hash over the mutated metadata — validation passes again.
    env.replay_hash = replay_hash(
        env.message_id,
        env.seq_no,
        env.decision_seq,
        env.seed,
        env.issued_at_turn,
        env.worker_id.as_deref(),
        &env.op,
    );
    assert!(env.validate().is_ok());
    // Changing the op without recomputing invalidates the stored hash.
    env.op = WorkerOp::ShutdownCompleted;
    assert!(matches!(
        env.validate(),
        Err(WorkerChannelError::ReplayHashMismatch { .. })
    ));
}
#[test]
// validate() enforces MAX_PAYLOAD_BYTES for payload-bearing ops in both
// directions: SpawnJob (host -> worker) and JobCompleted (worker -> host).
fn envelope_validates_payload_size() {
    let env = WorkerEnvelope::new(
        1,
        1,
        42,
        0,
        WorkerOp::SpawnJob(SpawnJobRequest {
            job_id: 1,
            region_id: 100,
            task_id: 200,
            obligation_id: 300,
            // One byte over the limit.
            payload: vec![0u8; MAX_PAYLOAD_BYTES + 1],
        }),
    );
    assert!(matches!(
        env.validate(),
        Err(WorkerChannelError::PayloadTooLarge { .. })
    ));
    let completed = test_worker_envelope(
        2,
        2,
        0,
        WorkerOp::JobCompleted(JobResult {
            job_id: 1,
            outcome: JobOutcome::Ok {
                payload: vec![0u8; MAX_PAYLOAD_BYTES + 1],
            },
        }),
    );
    assert!(matches!(
        completed.validate(),
        Err(WorkerChannelError::PayloadTooLarge { .. })
    ));
}
#[test]
// Table-driven check of the job lifecycle graph: every edge listed in
// `allowed` must be permitted, every edge in `forbidden` must be rejected.
// The edge set is identical to the original assertion list.
fn job_state_transitions_are_correct() {
    use JobState::*;
    let allowed = [
        (Created, Queued),
        (Created, Failed),
        (Queued, Running),
        (Queued, CancelRequested),
        (Queued, Failed),
        (Queued, Completed),
        (Running, Completed),
        (Running, CancelRequested),
        (Running, Failed),
        (CancelRequested, Completed),
        (CancelRequested, Draining),
        (CancelRequested, Failed),
        (Draining, Finalizing),
        (Draining, Failed),
        (Finalizing, Completed),
        (Finalizing, Failed),
    ];
    for (from, to) in allowed {
        assert!(from.can_transition_to(to));
    }
    let forbidden = [
        (Created, Running),
        (Created, Completed),
        (Draining, Completed),
        (Completed, Running),
    ];
    for (from, to) in forbidden {
        assert!(!from.can_transition_to(to));
    }
}
#[test]
// A host-direction envelope survives a JSON serialize/deserialize round
// trip with full structural equality.
fn envelope_serialization_round_trip() {
    let env = WorkerEnvelope::new(
        1,
        1,
        42,
        0,
        WorkerOp::SpawnJob(SpawnJobRequest {
            job_id: 1,
            region_id: 100,
            task_id: 200,
            obligation_id: 300,
            payload: vec![1, 2, 3],
        }),
    );
    let json = serde_json::to_string(&env).unwrap();
    let deserialized: WorkerEnvelope = serde_json::from_str(&json).unwrap();
    assert_eq!(env, deserialized);
}
#[test]
// A worker-direction envelope (carrying worker_id, which is
// skip_serializing_if-gated) must survive a JSON round trip intact and
// still validate afterwards.
fn worker_envelope_serialization_round_trip_preserves_worker_identity() {
    let env = worker_envelope(
        "worker-round-trip",
        7,
        9,
        3,
        WorkerOp::Diagnostic(DiagnosticEvent {
            level: DiagnosticLevel::Info,
            category: "serde".into(),
            message: "round-trip".into(),
            metadata: Some("worker identity must survive serialization".into()),
        }),
    );
    let json = serde_json::to_string(&env).unwrap();
    let deserialized: WorkerEnvelope = serde_json::from_str(&json).unwrap();
    assert_eq!(env, deserialized);
    // The replay hash still matches after the round trip.
    assert!(deserialized.validate().is_ok());
}
#[test]
// Two back-to-back spawns must drain from the outbox with strictly
// increasing seq_no and message_id.
fn coordinator_sequence_numbers_are_monotonic() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    coord.spawn_job(2, 100, 201, 301, vec![]).unwrap();
    let first = coord.drain_outbox().unwrap();
    let second = coord.drain_outbox().unwrap();
    assert!(first.seq_no < second.seq_no);
    assert!(first.message_id < second.message_id);
}
#[test]
// decision_seq advances independently of seq_no: seeding it at 19 yields
// outbound messages with seq_no 1,2 but decision_seq 19,20, and both
// envelopes still carry valid replay hashes.
fn coordinator_emits_independent_decision_sequence() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    // Decouple the decision counter from the transport counter.
    coord.next_decision_seq = 19;
    coord.spawn_job(1, 100, 200, 300, vec![]).unwrap();
    coord.spawn_job(2, 100, 201, 301, vec![]).unwrap();
    let msg1 = coord.drain_outbox().unwrap();
    let msg2 = coord.drain_outbox().unwrap();
    assert_eq!(msg1.seq_no, 1);
    assert_eq!(msg1.decision_seq, 19);
    assert_ne!(msg1.seq_no, msg1.decision_seq);
    assert!(msg1.validate().is_ok());
    assert_eq!(msg2.seq_no, 2);
    assert_eq!(msg2.decision_seq, 20);
    assert_ne!(msg2.seq_no, msg2.decision_seq);
    assert!(msg2.validate().is_ok());
}
#[test]
// An inbound Diagnostic event from the active worker session is accepted
// without error.
fn diagnostic_events_are_accepted() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    let event = DiagnosticEvent {
        level: DiagnosticLevel::Info,
        category: "lifecycle".into(),
        message: "worker initialized".into(),
        metadata: None,
    };
    let diag = test_worker_envelope(2, 2, 1, WorkerOp::Diagnostic(event));
    assert!(coord.handle_inbound(&diag).is_ok());
}
#[test]
// After a re-bootstrap to "test-worker-2", a late message from the replaced
// worker is rejected as a session mismatch, while the new worker's
// follow-up at the next seq_no is accepted normally.
fn coordinator_rejects_stale_old_worker_message_after_rebootstrap() {
    let mut coord = WorkerCoordinator::new(42);
    coord.handle_inbound(&bootstrap_ready_envelope(1)).unwrap();
    let reboot = worker_envelope(
        "test-worker-2",
        1,
        1,
        0,
        WorkerOp::BootstrapReady {
            worker_id: "test-worker-2".into(),
        },
    );
    coord.handle_inbound(&reboot).unwrap();
    // test_worker_envelope stamps the ORIGINAL worker id ("test-worker-1"),
    // so this models a straggler from the replaced instance.
    let stale = test_worker_envelope(
        9,
        9,
        1,
        WorkerOp::Diagnostic(DiagnosticEvent {
            level: DiagnosticLevel::Warn,
            category: "stale".into(),
            message: "late message from replaced worker".into(),
            metadata: None,
        }),
    );
    assert_eq!(
        coord.handle_inbound(&stale),
        Err(WorkerChannelError::InboundWorkerSessionMismatch {
            expected: Some("test-worker-2".into()),
            actual: "test-worker-1".into(),
        })
    );
    let fresh = worker_envelope(
        "test-worker-2",
        2,
        2,
        1,
        WorkerOp::Diagnostic(DiagnosticEvent {
            level: DiagnosticLevel::Info,
            category: "fresh".into(),
            message: "new worker follow-up".into(),
            metadata: None,
        }),
    );
    coord.handle_inbound(&fresh).unwrap();
    assert_eq!(coord.last_inbound_seq_no, 2);
}
#[test]
// A worker-originated op (ShutdownCompleted) whose envelope lacks a
// worker_id must fail validation with MissingWorkerSessionIdentity.
fn envelope_rejects_worker_message_without_session_identity() {
    let envelope = WorkerEnvelope::new(2, 2, 42, 1, WorkerOp::ShutdownCompleted);
    let outcome = envelope.validate();
    assert_eq!(
        outcome,
        Err(WorkerChannelError::MissingWorkerSessionIdentity)
    );
}
}