use super::network::MAX_DUPLICATE_PACKET_DELAY;
use crate::bytes::Bytes;
use crate::cx::Cx;
use crate::lab::network::{Fault, HostId, NetworkConfig, SimulatedNetwork};
use crate::remote::{
CancelRequest, IdempotencyKey, IdempotencyRequestFingerprint, IdempotencyStore, LeaseRenewal,
MessageEnvelope, NodeId, RemoteCap, RemoteError, RemoteMessage, RemoteOutcome, RemoteRuntime,
RemoteTaskId, RemoteTaskState, ResultDelivery, SpawnAck, SpawnAckStatus, SpawnRejectReason,
SpawnRequest,
};
use crate::trace::distributed::{CausalTracker, LogicalTime, VectorClock};
use crate::types::Time;
use parking_lot::Mutex;
use std::collections::{BTreeMap, VecDeque};
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
/// Origin-side bookkeeping for one in-flight remote task: the oneshot sender
/// used to deliver the final outcome plus the last observed task state.
#[derive(Debug)]
struct PendingResultEntry {
    // Completion channel; taken (set to `None`) when a result or rejection
    // is delivered, so the outcome is sent at most once.
    tx: Option<crate::channel::oneshot::Sender<Result<RemoteOutcome, RemoteError>>>,
    // Latest lifecycle state observed for this task.
    state: RemoteTaskState,
}
/// Map from remote task id to its origin-side pending-result entry.
type PendingResultsMap = BTreeMap<RemoteTaskId, PendingResultEntry>;
/// Shared handle to the pending-result map; shared between a `SimNode` and
/// the `VirtualNetworkRuntime` capability it hands out.
type SharedPendingResults = Arc<Mutex<PendingResultsMap>>;
/// A message envelope kept in the harness's side store, referenced from the
/// wire by an 8-byte id (see `encode_message`/`decode_message`).
#[derive(Clone, Debug)]
struct StoredEnvelope {
    envelope: MessageEnvelope<RemoteMessage>,
    // Set on first successful decode; the entry is retained until this
    // simulated time so duplicate packet deliveries can still decode it.
    retain_until: Option<Duration>,
}
/// `RemoteRuntime` implementation backed by the simulated network: outgoing
/// messages are queued in a shared outbox that the harness drains each tick.
#[derive(Debug)]
pub struct VirtualNetworkRuntime {
    // Identity stamped as `origin_node` on outgoing spawn/cancel requests.
    local_node: NodeId,
    // Shared with the owning `SimNode`'s `app_outbox`.
    outbox: Arc<Mutex<VecDeque<(NodeId, RemoteMessage)>>>,
    // Shared with the owning `SimNode`'s pending-result map.
    pending_results: SharedPendingResults,
}
impl RemoteRuntime for VirtualNetworkRuntime {
    /// Queues the envelope's payload for `destination`, stamping this node
    /// as the origin on spawn and cancel requests before enqueueing.
    fn send_message(
        &self,
        destination: &NodeId,
        envelope: MessageEnvelope<RemoteMessage>,
    ) -> Result<(), RemoteError> {
        let mut payload = envelope.payload;
        match &mut payload {
            RemoteMessage::SpawnRequest(req) => req.origin_node = self.local_node.clone(),
            RemoteMessage::CancelRequest(cancel) => cancel.origin_node = self.local_node.clone(),
            // All other message kinds are forwarded untouched.
            _ => {}
        }
        self.outbox.lock().push_back((destination.clone(), payload));
        Ok(())
    }

    /// Registers a fresh pending-result entry for `task_id` in the
    /// `Pending` state, holding `tx` until an outcome arrives.
    fn register_task(
        &self,
        task_id: RemoteTaskId,
        tx: crate::channel::oneshot::Sender<Result<RemoteOutcome, RemoteError>>,
    ) {
        let entry = PendingResultEntry {
            tx: Some(tx),
            state: RemoteTaskState::Pending,
        };
        self.pending_results.lock().insert(task_id, entry);
    }

    /// Reports the last observed state for `task_id`, if still tracked.
    fn observe_task_state(&self, task_id: RemoteTaskId) -> Option<RemoteTaskState> {
        self.pending_results
            .lock()
            .get(&task_id)
            .map(|entry| entry.state)
    }

    /// Drops all origin-side bookkeeping for `task_id`.
    fn clear_task_state(&self, task_id: RemoteTaskId) {
        self.pending_results.lock().remove(&task_id);
    }

    /// Drops all origin-side bookkeeping for `task_id`. In this simulated
    /// runtime this has the same effect as `clear_task_state`.
    fn unregister_task(&self, task_id: RemoteTaskId) {
        self.pending_results.lock().remove(&task_id);
    }
}
/// A simulated process in the distributed harness: handles spawn/cancel/
/// lease traffic, de-duplicates retries via idempotency keys, and records
/// an event log that tests assert against.
#[derive(Debug)]
pub struct SimNode {
    pub node_id: NodeId,
    pub host_id: HostId,
    // Messages produced while handling incoming traffic (acks, results).
    outbox: VecDeque<(NodeId, RemoteMessage)>,
    // Messages produced by application code through `create_cap`'s runtime.
    app_outbox: Arc<Mutex<VecDeque<(NodeId, RemoteMessage)>>>,
    // Tasks currently executing here, keyed by canonical task id.
    running_tasks: BTreeMap<RemoteTaskId, RunningTask>,
    // canonical task id -> retry task ids waiting for the same outcome.
    duplicate_waiters: BTreeMap<RemoteTaskId, Vec<RemoteTaskId>>,
    // retry task id -> canonical task id (reverse of `duplicate_waiters`).
    duplicate_aliases: BTreeMap<RemoteTaskId, RemoteTaskId>,
    // Origin-side result channels; shared with caps handed out by this node.
    pending_results: SharedPendingResults,
    // Spawn de-duplication records, keyed by idempotency key.
    dedup: IdempotencyStore,
    // Vector-clock tracker updated on every send/receive.
    causal: CausalTracker,
    // While true the node drops all messages and performs no work.
    crashed: bool,
    // Chronological record of observable node events.
    event_log: Vec<NodeEvent>,
}
/// A task currently executing on a `SimNode`; work is modelled as a fixed
/// amount of simulated time that drains on each `tick`.
#[derive(Debug, Clone)]
pub struct RunningTask {
    pub task_id: RemoteTaskId,
    pub idempotency_key: IdempotencyKey,
    // Node that issued the spawn; results are delivered back here.
    pub origin: NodeId,
    // Remaining simulated work; the task completes when this reaches zero.
    pub work_remaining: Duration,
    // Set by `handle_cancel`; honoured on the next `tick`.
    pub cancel_requested: bool,
}
/// Observable events appended to a node's log; tests assert on these to
/// verify protocol behavior without inspecting internals.
#[derive(Debug, Clone)]
pub enum NodeEvent {
    /// A spawn request arrived (logged before dedup/accept decisions).
    SpawnReceived {
        task_id: RemoteTaskId,
        from: NodeId,
    },
    /// A new (non-duplicate) spawn was accepted and started.
    SpawnAccepted {
        task_id: RemoteTaskId,
    },
    /// A spawn was refused (e.g. idempotency-key conflict).
    SpawnRejected {
        task_id: RemoteTaskId,
        reason: SpawnRejectReason,
    },
    /// A running task finished successfully.
    TaskCompleted {
        task_id: RemoteTaskId,
    },
    /// A running task was cancelled before completion.
    TaskCancelled {
        task_id: RemoteTaskId,
    },
    /// A cancel request arrived (logged whether or not a task matched).
    CancelReceived {
        task_id: RemoteTaskId,
    },
    /// A lease renewal arrived for a task.
    LeaseRenewed {
        task_id: RemoteTaskId,
    },
    /// A spawn matched an existing idempotency record.
    DuplicateSpawn {
        task_id: RemoteTaskId,
    },
    /// The node crashed, dropping all in-memory state.
    Crashed,
    /// The node came back up after a crash.
    Restarted,
}
impl SimNode {
    /// Creates a node with no running tasks, a fresh idempotency store
    /// (five-minute record TTL), and an empty event log.
    #[must_use]
    pub fn new(node_id: NodeId, host_id: HostId) -> Self {
        Self {
            // Seed the causal tracker with this node's identity before
            // `node_id` is moved into the struct below.
            causal: CausalTracker::new(node_id.clone()),
            node_id,
            host_id,
            outbox: VecDeque::new(),
            app_outbox: Arc::new(Mutex::new(VecDeque::new())),
            running_tasks: BTreeMap::new(),
            duplicate_waiters: BTreeMap::new(),
            duplicate_aliases: BTreeMap::new(),
            pending_results: Arc::new(Mutex::new(BTreeMap::new())),
            dedup: IdempotencyStore::new(Duration::from_mins(5)),
            crashed: false,
            event_log: Vec::new(),
        }
    }

    /// Builds a `RemoteCap` whose runtime shares this node's application
    /// outbox and pending-result map, so spawns made through the cap are
    /// picked up by the harness on the next `drain_outbox`.
    #[must_use]
    pub fn create_cap(&self) -> RemoteCap {
        let runtime = VirtualNetworkRuntime {
            local_node: self.node_id.clone(),
            outbox: self.app_outbox.clone(),
            pending_results: self.pending_results.clone(),
        };
        RemoteCap::new()
            .with_local_node(self.node_id.clone())
            .with_runtime(Arc::new(runtime))
    }

    /// Entry point for every message the simulated network delivers here.
    /// Crashed nodes silently drop traffic.
    pub fn handle_message(&mut self, envelope: MessageEnvelope<RemoteMessage>, now: Time) {
        if self.crashed {
            return;
        }
        // Merge the sender's logical clock before acting on the payload.
        self.record_receive(&envelope.sender_time);
        match envelope.payload {
            RemoteMessage::SpawnRequest(req) => self.handle_spawn(req, now),
            RemoteMessage::SpawnAck(ack) => self.handle_spawn_ack(ack),
            RemoteMessage::CancelRequest(cancel) => self.handle_cancel(&cancel),
            RemoteMessage::ResultDelivery(result) => self.handle_result(result),
            RemoteMessage::LeaseRenewal(renewal) => self.handle_lease_renewal(&renewal),
        }
    }

    /// Advances the causal tracker for a receive: vector clocks are merged;
    /// any other logical-time representation just counts as a local event.
    fn record_receive(&mut self, sender_time: &LogicalTime) {
        match sender_time {
            LogicalTime::Vector(clock) => self.causal.on_receive(clock),
            _ => {
                self.causal.record_local_event();
            }
        }
    }

    /// Processes a spawn request: consults the idempotency store and either
    /// replays the prior decision (duplicate), rejects (conflict), or
    /// starts a new task and acks acceptance.
    fn handle_spawn(&mut self, req: SpawnRequest, now: Time) {
        self.event_log.push(NodeEvent::SpawnReceived {
            task_id: req.remote_task_id,
            from: req.origin_node.clone(),
        });
        // Opportunistically drop expired idempotency records first.
        let _ = self.dedup.evict_expired(now);
        let request = IdempotencyRequestFingerprint::from_spawn_request(&req);
        let dedup = self.dedup.check(&req.idempotency_key, &request, now);
        match dedup {
            crate::remote::DedupDecision::Duplicate(record) => {
                if record.outcome.is_none() {
                    // Canonical task still running: remember this retry's
                    // task id so it gets a copy of the eventual result.
                    self.register_duplicate_alias(record.remote_task_id, req.remote_task_id);
                }
                self.event_log.push(NodeEvent::DuplicateSpawn {
                    task_id: req.remote_task_id,
                });
                // Re-ack acceptance so a retransmitted spawn is not left
                // waiting for an ack that was lost.
                self.outbox.push_back((
                    req.origin_node.clone(),
                    RemoteMessage::SpawnAck(SpawnAck {
                        remote_task_id: req.remote_task_id,
                        status: SpawnAckStatus::Accepted,
                        assigned_node: self.node_id.clone(),
                    }),
                ));
                // If the canonical task already finished, replay its stored
                // outcome immediately under the retry's task id.
                if let Some(outcome) = record.outcome.clone() {
                    self.outbox.push_back((
                        req.origin_node,
                        RemoteMessage::ResultDelivery(ResultDelivery {
                            remote_task_id: req.remote_task_id,
                            outcome,
                            execution_time: Duration::ZERO,
                        }),
                    ));
                }
                return;
            }
            crate::remote::DedupDecision::Conflict => {
                // Same idempotency key, different request fingerprint.
                self.outbox.push_back((
                    req.origin_node.clone(),
                    RemoteMessage::SpawnAck(SpawnAck {
                        remote_task_id: req.remote_task_id,
                        status: SpawnAckStatus::Rejected(SpawnRejectReason::IdempotencyConflict),
                        assigned_node: self.node_id.clone(),
                    }),
                ));
                self.event_log.push(NodeEvent::SpawnRejected {
                    task_id: req.remote_task_id,
                    reason: SpawnRejectReason::IdempotencyConflict,
                });
                return;
            }
            crate::remote::DedupDecision::New => {}
        }
        // Fresh spawn: record it, start the simulated work, ack acceptance.
        self.dedup
            .record(req.idempotency_key, req.remote_task_id, request, now);
        let task = RunningTask {
            task_id: req.remote_task_id,
            idempotency_key: req.idempotency_key,
            origin: req.origin_node.clone(),
            // Every task is modelled as 100 ms of simulated work.
            work_remaining: Duration::from_millis(100),
            cancel_requested: false,
        };
        self.running_tasks.insert(req.remote_task_id, task);
        self.outbox.push_back((
            req.origin_node,
            RemoteMessage::SpawnAck(SpawnAck {
                remote_task_id: req.remote_task_id,
                status: SpawnAckStatus::Accepted,
                assigned_node: self.node_id.clone(),
            }),
        ));
        self.event_log.push(NodeEvent::SpawnAccepted {
            task_id: req.remote_task_id,
        });
    }

    /// Origin-side ack handling: promotes Pending -> Running on acceptance;
    /// on rejection fails the pending handle exactly once. A late rejection
    /// arriving after the task left the Pending state is ignored.
    fn handle_spawn_ack(&self, ack: SpawnAck) {
        let rejected = {
            let mut pending = self.pending_results.lock();
            pending
                .get_mut(&ack.remote_task_id)
                .and_then(|entry| match ack.status {
                    SpawnAckStatus::Accepted => {
                        if entry.state == RemoteTaskState::Pending {
                            entry.state = RemoteTaskState::Running;
                        }
                        None
                    }
                    SpawnAckStatus::Rejected(reason) => {
                        if entry.state != RemoteTaskState::Pending {
                            // Task already running/finished; stale reject.
                            return None;
                        }
                        entry.state = RemoteTaskState::Failed;
                        entry.tx.take().map(|tx| (tx, reason))
                    }
                })
        };
        // Send the error outside the lock to avoid holding it during send.
        if let Some((tx, reason)) = rejected {
            let cx = Cx::for_testing();
            let _ = tx.send(&cx, Err(RemoteError::SpawnRejected(reason)));
        }
    }

    /// Marks the targeted task for cancellation. The cancel's task id is
    /// first resolved through the duplicate-alias map so cancelling a retry
    /// id cancels the canonical running task.
    fn handle_cancel(&mut self, cancel: &CancelRequest) {
        self.event_log.push(NodeEvent::CancelReceived {
            task_id: cancel.remote_task_id,
        });
        let canonical_task_id = self.canonical_task_id(cancel.remote_task_id);
        if let Some(task) = self.running_tasks.get_mut(&canonical_task_id) {
            task.cancel_requested = true;
        }
    }

    /// Origin-side result handling: delivers the outcome to the waiting
    /// oneshot (at most once) and records the terminal state.
    fn handle_result(&self, result: ResultDelivery) {
        let ResultDelivery {
            remote_task_id,
            outcome,
            execution_time: _,
        } = result;
        let tx = {
            let mut pending = self.pending_results.lock();
            pending.get_mut(&remote_task_id).and_then(|entry| {
                // `tx.take()` makes redundant deliveries no-ops while still
                // keeping the terminal state for `observe_task_state`.
                let tx = entry.tx.take()?;
                entry.state = match &outcome {
                    RemoteOutcome::Success(_) => RemoteTaskState::Completed,
                    RemoteOutcome::Cancelled(_) => RemoteTaskState::Cancelled,
                    RemoteOutcome::Failed(_) | RemoteOutcome::Panicked(_) => {
                        RemoteTaskState::Failed
                    }
                };
                Some(tx)
            })
        };
        if let Some(tx) = tx {
            let cx = Cx::for_testing();
            // If the receiver is gone there is nobody left to observe the
            // state either, so drop the whole entry.
            if tx.send(&cx, Ok(outcome)).is_err() {
                self.pending_results.lock().remove(&remote_task_id);
            }
        }
    }

    /// Records a lease renewal. In this simulation renewals are logged
    /// only; no lease clock is maintained per task.
    fn handle_lease_renewal(&mut self, renewal: &LeaseRenewal) {
        self.event_log.push(NodeEvent::LeaseRenewed {
            task_id: renewal.remote_task_id,
        });
    }

    /// Advances every running task by `elapsed` and returns the result
    /// messages to send. Cancellation takes precedence over completion.
    /// Duplicate waiters receive a copy of the outcome under their own
    /// task id.
    ///
    /// NOTE(review): duplicate copies are sent to the canonical task's
    /// origin; if a retry came from a different node it would not receive
    /// its copy — confirm retries are always same-origin.
    pub fn tick(&mut self, elapsed: Duration) -> Vec<(NodeId, RemoteMessage)> {
        if self.crashed {
            return Vec::new();
        }
        let mut completed = Vec::new();
        let mut finalized = Vec::new();
        let mut to_remove = Vec::new();
        for (id, task) in &mut self.running_tasks {
            if task.cancel_requested {
                let outcome =
                    RemoteOutcome::Cancelled(crate::types::CancelReason::user("harness cancel"));
                // Persist the outcome so later duplicate spawns replay it.
                let _ = self.dedup.complete(&task.idempotency_key, outcome.clone());
                finalized.push((*id, task.origin.clone(), outcome));
                self.event_log
                    .push(NodeEvent::TaskCancelled { task_id: *id });
                to_remove.push(*id);
            } else if task.work_remaining <= elapsed {
                let outcome = RemoteOutcome::Success(vec![]);
                let _ = self.dedup.complete(&task.idempotency_key, outcome.clone());
                finalized.push((*id, task.origin.clone(), outcome));
                self.event_log
                    .push(NodeEvent::TaskCompleted { task_id: *id });
                to_remove.push(*id);
            } else {
                task.work_remaining -= elapsed;
            }
        }
        for (task_id, origin, outcome) in finalized {
            completed.push((
                origin.clone(),
                RemoteMessage::ResultDelivery(ResultDelivery {
                    remote_task_id: task_id,
                    outcome: outcome.clone(),
                    execution_time: Duration::ZERO,
                }),
            ));
            // Fan the same outcome out to every retry id that aliased this
            // canonical task, clearing the alias maps as we go.
            for duplicate_task_id in self.take_duplicate_waiters(task_id) {
                completed.push((
                    origin.clone(),
                    RemoteMessage::ResultDelivery(ResultDelivery {
                        remote_task_id: duplicate_task_id,
                        outcome: outcome.clone(),
                        execution_time: Duration::ZERO,
                    }),
                ));
            }
        }
        for id in to_remove {
            self.running_tasks.remove(&id);
        }
        completed
    }

    /// Simulates a crash: all volatile state (tasks, aliases, outboxes) is
    /// lost. The event log and dedup store survive in this struct, but the
    /// node stops processing until `restart`.
    pub fn crash(&mut self) {
        self.crashed = true;
        self.running_tasks.clear();
        self.duplicate_waiters.clear();
        self.duplicate_aliases.clear();
        self.outbox.clear();
        {
            let mut app = self.app_outbox.lock();
            app.clear();
        }
        self.event_log.push(NodeEvent::Crashed);
    }

    /// Brings a crashed node back. The idempotency store is reset —
    /// presumably modelling dedup state that does not survive a crash;
    /// duplicate detection therefore restarts from scratch.
    pub fn restart(&mut self) {
        self.crashed = false;
        self.duplicate_waiters.clear();
        self.duplicate_aliases.clear();
        self.dedup = IdempotencyStore::new(Duration::from_mins(5));
        self.event_log.push(NodeEvent::Restarted);
    }

    /// Records that `duplicate_task_id` is a retry of `canonical_task_id`,
    /// keeping both the forward (waiters) and reverse (aliases) maps in
    /// sync. Self-aliases and repeats are ignored.
    fn register_duplicate_alias(
        &mut self,
        canonical_task_id: RemoteTaskId,
        duplicate_task_id: RemoteTaskId,
    ) {
        if canonical_task_id == duplicate_task_id {
            return;
        }
        let waiters = self.duplicate_waiters.entry(canonical_task_id).or_default();
        if !waiters.contains(&duplicate_task_id) {
            waiters.push(duplicate_task_id);
        }
        self.duplicate_aliases
            .insert(duplicate_task_id, canonical_task_id);
    }

    /// Resolves a possibly-retry task id to its canonical running task id.
    fn canonical_task_id(&self, task_id: RemoteTaskId) -> RemoteTaskId {
        self.duplicate_aliases
            .get(&task_id)
            .copied()
            .unwrap_or(task_id)
    }

    /// Removes and returns all retry ids waiting on `canonical_task_id`,
    /// also clearing their reverse-alias entries.
    fn take_duplicate_waiters(&mut self, canonical_task_id: RemoteTaskId) -> Vec<RemoteTaskId> {
        let waiters = self
            .duplicate_waiters
            .remove(&canonical_task_id)
            .unwrap_or_default();
        for duplicate_task_id in &waiters {
            self.duplicate_aliases.remove(duplicate_task_id);
        }
        waiters
    }

    /// The node's full event log, in order of occurrence.
    #[must_use]
    pub fn events(&self) -> &[NodeEvent] {
        &self.event_log
    }

    /// Number of tasks currently executing on this node.
    #[must_use]
    pub fn running_task_count(&self) -> usize {
        self.running_tasks.len()
    }

    /// Read access to this node's causal (vector-clock) tracker.
    #[must_use]
    pub fn causal_tracker(&self) -> &CausalTracker {
        &self.causal
    }

    /// Drains both the protocol outbox and the application outbox into one
    /// batch of (destination, message) pairs for the harness to route.
    pub fn drain_outbox(&mut self) -> Vec<(NodeId, RemoteMessage)> {
        let mut msgs: Vec<_> = self.outbox.drain(..).collect();
        {
            let mut app = self.app_outbox.lock();
            msgs.extend(app.drain(..));
        }
        msgs
    }
}
/// A scheduled fault: `fault` fires when simulation time reaches `at`
/// (an offset measured from simulation start, not from `run_for` start).
#[derive(Clone, Debug)]
pub struct FaultEvent {
    pub at: Duration,
    pub fault: HarnessFault,
}
/// Faults the harness can inject: raw network faults, whole-node crash or
/// restart, and forced lease expiry of a node's running tasks.
#[derive(Clone, Debug)]
pub enum HarnessFault {
    Network(Fault),
    CrashNode(NodeId),
    RestartNode(NodeId),
    ExpireLeases(NodeId),
}
/// An ordered-on-demand list of scheduled faults, built with `at` and
/// consumed by `DistributedHarness::run_for`.
#[derive(Clone, Debug, Default)]
pub struct FaultScript {
    events: Vec<FaultEvent>,
}
impl FaultScript {
    /// Creates an empty fault script.
    #[must_use]
    pub fn new() -> Self {
        Self { events: Vec::new() }
    }

    /// Builder-style: schedules `fault` to fire at `offset` from the start
    /// of the simulation.
    #[must_use]
    pub fn at(mut self, offset: Duration, fault: HarnessFault) -> Self {
        let event = FaultEvent { at: offset, fault };
        self.events.push(event);
        self
    }

    /// Returns the scheduled events ordered by trigger time. The sort is
    /// stable, so same-time events keep their insertion order.
    #[must_use]
    pub fn sorted_events(&self) -> Vec<&FaultEvent> {
        let mut ordered: Vec<&FaultEvent> = self.events.iter().collect();
        ordered.sort_by_key(|event| event.at);
        ordered
    }
}
/// Deterministic multi-node simulation: routes `RemoteMessage`s between
/// `SimNode`s over a `SimulatedNetwork`, advancing time in fixed ticks and
/// injecting scripted faults.
pub struct DistributedHarness {
    network: SimulatedNetwork,
    nodes: BTreeMap<NodeId, SimNode>,
    // Bidirectional NodeId <-> HostId mapping for network routing.
    node_to_host: BTreeMap<NodeId, HostId>,
    host_to_node: BTreeMap<HostId, NodeId>,
    fault_script: FaultScript,
    // Simulated time elapsed since harness creation.
    sim_time: Duration,
    // Step size for the simulation loop (always >= 1 ns).
    tick: Duration,
    trace: Vec<HarnessTraceEvent>,
    // Next id for the envelope side store (wire payloads carry only ids).
    next_msg_id: u64,
    msg_store: BTreeMap<u64, StoredEnvelope>,
}
/// A timestamped entry in the harness-wide trace.
#[derive(Clone, Debug)]
pub struct HarnessTraceEvent {
    pub time: Duration,
    pub kind: HarnessTraceKind,
}
/// What a trace entry records: message sends/deliveries (by variant name),
/// injected faults (debug-formatted), and task completions.
#[derive(Clone, Debug)]
pub enum HarnessTraceKind {
    MessageSent {
        from: NodeId,
        to: NodeId,
        msg_type: String,
    },
    MessageDelivered {
        from: NodeId,
        to: NodeId,
        msg_type: String,
    },
    FaultInjected(String),
    TaskCompleted {
        node: NodeId,
        task_id: RemoteTaskId,
    },
}
impl DistributedHarness {
    /// Creates a harness over the given network configuration. The tick is
    /// clamped to at least one nanosecond so `run_for` always progresses.
    #[must_use]
    pub fn new(config: NetworkConfig) -> Self {
        let tick = normalized_tick(config.tick_resolution);
        Self {
            network: SimulatedNetwork::new(config),
            nodes: BTreeMap::new(),
            node_to_host: BTreeMap::new(),
            host_to_node: BTreeMap::new(),
            fault_script: FaultScript::new(),
            sim_time: Duration::ZERO,
            tick,
            trace: Vec::new(),
            next_msg_id: 1,
            msg_store: BTreeMap::new(),
        }
    }

    /// Registers a new node (and backing network host) under `name` and
    /// returns its id.
    pub fn add_node(&mut self, name: &str) -> NodeId {
        let node_id = NodeId::new(name);
        let host_id = self.network.add_host(name);
        let sim_node = SimNode::new(node_id.clone(), host_id);
        self.nodes.insert(node_id.clone(), sim_node);
        self.node_to_host.insert(node_id.clone(), host_id);
        self.host_to_node.insert(host_id, node_id.clone());
        node_id
    }

    /// Replaces the fault script consumed by subsequent `run_for` calls.
    pub fn set_fault_script(&mut self, script: FaultScript) {
        self.fault_script = script;
    }

    /// Overrides the simulation step size (clamped to >= 1 ns).
    pub fn set_tick(&mut self, tick: Duration) {
        self.tick = normalized_tick(tick);
    }

    /// Convenience: sends a synthetic spawn request from `origin` to
    /// `target`. The idempotency key is derived from the task id so
    /// re-injecting the same id exercises the duplicate path.
    pub fn inject_spawn(&mut self, origin: &NodeId, target: &NodeId, task_id: RemoteTaskId) {
        let req = SpawnRequest {
            remote_task_id: task_id,
            computation: crate::remote::ComputationName::new("test-computation"),
            input: crate::remote::RemoteInput::new(vec![]),
            lease: Duration::from_secs(30),
            idempotency_key: IdempotencyKey::from_raw(u128::from(task_id.raw())),
            budget: None,
            origin_node: origin.clone(),
            origin_region: crate::types::RegionId::new_for_test(0, 0),
            origin_task: crate::types::TaskId::new_for_test(0, 0),
        };
        let msg = RemoteMessage::SpawnRequest(req);
        self.send_message(origin, target, &msg);
    }

    /// Convenience: sends a cancel request for `task_id` from `origin` to
    /// `target`.
    pub fn inject_cancel(&mut self, origin: &NodeId, target: &NodeId, task_id: RemoteTaskId) {
        let cancel = CancelRequest {
            remote_task_id: task_id,
            reason: crate::types::CancelReason::user("harness cancel"),
            origin_node: origin.clone(),
        };
        let msg = RemoteMessage::CancelRequest(cancel);
        self.send_message(origin, target, &msg);
    }

    /// Routes one message through the simulated network. If the destination
    /// node is unknown, a SpawnRequest fails the sender's pending handle
    /// with `NodeUnreachable` (other message kinds are silently dropped).
    fn send_message(&mut self, from: &NodeId, to: &NodeId, msg: &RemoteMessage) {
        let src = self.node_to_host[from];
        let Some(&dst) = self.node_to_host.get(to) else {
            if let RemoteMessage::SpawnRequest(req) = msg {
                let tx = if let Some(node) = self.nodes.get_mut(from) {
                    let mut pending = node.pending_results.lock();
                    pending.get_mut(&req.remote_task_id).and_then(|entry| {
                        entry.state = RemoteTaskState::Failed;
                        entry.tx.take()
                    })
                } else {
                    None
                };
                if let Some(tx) = tx {
                    let cx = Cx::for_testing();
                    let _ = tx.send(
                        &cx,
                        Err(RemoteError::NodeUnreachable(to.as_str().to_owned())),
                    );
                }
            }
            return;
        };
        let msg_type = msg_type_name(msg);
        self.trace.push(HarnessTraceEvent {
            time: self.sim_time,
            kind: HarnessTraceKind::MessageSent {
                from: from.clone(),
                to: to.clone(),
                msg_type: msg_type.to_string(),
            },
        });
        // Stamp the sender's vector clock; an unknown sender gets an empty
        // clock so the receiver still merges something well-formed.
        let sender_time = self.nodes.get_mut(from).map_or_else(
            || LogicalTime::Vector(VectorClock::new()),
            |node| LogicalTime::Vector(node.causal.on_send()),
        );
        let envelope = MessageEnvelope::new(from.clone(), sender_time, msg.clone());
        // The wire carries only an 8-byte id; the envelope lives in
        // `msg_store` until pruned (see encode/decode below).
        let encoded = self.encode_message(&envelope);
        self.network.send(src, dst, Bytes::from(encoded));
    }

    /// Advances simulation time by `duration`, stepping in `tick`-sized
    /// segments and pausing exactly at each scheduled fault time to inject
    /// it. Faults scheduled exactly at the end boundary (`target`) are
    /// deferred to a later `run_for` call.
    pub fn run_for(&mut self, duration: Duration) {
        let target = self.sim_time.saturating_add(duration);
        let fault_events: Vec<FaultEvent> = self
            .fault_script
            .sorted_events()
            .into_iter()
            .cloned()
            .collect();
        let mut next_fault = 0;
        while self.sim_time < target {
            // Skip faults that are already in the past (fired previously).
            while fault_events
                .get(next_fault)
                .is_some_and(|fe| fe.at < self.sim_time)
            {
                next_fault += 1;
            }
            let step_end = self.sim_time.saturating_add(self.tick).min(target);
            // Stop early at the next fault time if it lands inside the step.
            let next_stop = fault_events
                .get(next_fault)
                .filter(|fe| fe.at < step_end)
                .map_or(step_end, |fe| fe.at);
            if next_stop > self.sim_time {
                self.advance_segment(next_stop.saturating_sub(self.sim_time));
            }
            // Fire every fault due exactly now (strictly before `target`).
            while let Some(fe) = fault_events.get(next_fault) {
                if fe.at != self.sim_time || fe.at >= target {
                    break;
                }
                self.execute_fault(&fe.fault);
                next_fault += 1;
            }
        }
    }

    /// Drains every host's network inbox, decodes payload ids back into
    /// envelopes, traces the deliveries, and dispatches them to nodes at
    /// the current simulated time.
    fn deliver_packets(&mut self) {
        let mut raw_payloads: Vec<(NodeId, Bytes)> = Vec::new();
        // Snapshot (node, host) pairs first so `self.network` can be
        // borrowed mutably in the loop below.
        let node_hosts: Vec<(NodeId, HostId)> = self
            .nodes
            .iter()
            .map(|(node_id, node)| (node_id.clone(), node.host_id))
            .collect();
        for (node_id, host_id) in node_hosts {
            if let Some(packets) = self.network.take_inbox(host_id) {
                for packet in packets {
                    raw_payloads.push((node_id.clone(), packet.payload));
                }
            }
        }
        let mut deliveries: Vec<(NodeId, MessageEnvelope<RemoteMessage>)> = Vec::new();
        for (node_id, payload) in raw_payloads {
            // Unrecognized payloads (pruned or malformed ids) are dropped.
            if let Some(envelope) = self.decode_message(&payload) {
                let src_node = envelope.sender.clone();
                self.trace.push(HarnessTraceEvent {
                    time: self.sim_time,
                    kind: HarnessTraceKind::MessageDelivered {
                        from: src_node,
                        to: node_id.clone(),
                        msg_type: msg_type_name(&envelope.payload).to_string(),
                    },
                });
                deliveries.push((node_id, envelope));
            }
        }
        let now = {
            // Clamp to u64::MAX nanos before the narrowing cast.
            let nanos = self.sim_time.as_nanos().min(u128::from(u64::MAX)) as u64;
            Time::from_nanos(nanos)
        };
        for (node_id, envelope) in deliveries {
            if let Some(node) = self.nodes.get_mut(&node_id) {
                node.handle_message(envelope, now);
            }
        }
    }

    /// Advances every node's task clock by `elapsed`, traces completions,
    /// and routes the resulting `ResultDelivery` messages.
    fn tick_nodes(&mut self, elapsed: Duration) {
        let mut result_messages: Vec<(NodeId, NodeId, RemoteMessage)> = Vec::new();
        for (node_id, node) in &mut self.nodes {
            let completed = node.tick(elapsed);
            for (dest, msg) in completed {
                if let RemoteMessage::ResultDelivery(ref rd) = msg {
                    self.trace.push(HarnessTraceEvent {
                        time: self.sim_time,
                        kind: HarnessTraceKind::TaskCompleted {
                            node: node_id.clone(),
                            task_id: rd.remote_task_id,
                        },
                    });
                }
                result_messages.push((node_id.clone(), dest, msg));
            }
        }
        // Sent after the node loop to avoid borrowing `self.nodes` twice.
        for (from, to, msg) in result_messages {
            self.send_message(&from, &to, &msg);
        }
    }

    /// Stores the envelope under a fresh id and returns the 8-byte
    /// little-endian id as the wire payload. Envelopes never travel over
    /// the simulated network themselves.
    fn encode_message(&mut self, msg: &MessageEnvelope<RemoteMessage>) -> Vec<u8> {
        let id = self.next_msg_id;
        self.next_msg_id = self.next_msg_id.wrapping_add(1);
        self.msg_store.insert(
            id,
            StoredEnvelope {
                envelope: msg.clone(),
                retain_until: None,
            },
        );
        id.to_le_bytes().to_vec()
    }

    /// Looks a wire payload's id back up in the store. The first decode
    /// starts the retention window so duplicate packet deliveries of the
    /// same id still decode until `prune_decoded_messages` expires it.
    fn decode_message(&mut self, payload: &Bytes) -> Option<MessageEnvelope<RemoteMessage>> {
        if payload.len() < 8 {
            return None;
        }
        let id = u64::from_le_bytes(payload[..8].try_into().ok()?);
        let stored = self.msg_store.get_mut(&id)?;
        if stored.retain_until.is_none() {
            stored.retain_until = Some(self.sim_time.saturating_add(MAX_DUPLICATE_PACKET_DELAY));
        }
        Some(stored.envelope.clone())
    }

    /// Drops stored envelopes whose retention window has passed. Entries
    /// never decoded (`retain_until == None`) are kept: their packet may
    /// still be in flight.
    fn prune_decoded_messages(&mut self) {
        let now = self.sim_time;
        self.msg_store.retain(|_, stored| {
            stored
                .retain_until
                .is_none_or(|retain_until| now <= retain_until)
        });
    }

    /// One simulation step: run the network, advance the clock, deliver,
    /// tick tasks, route node outboxes, and prune the envelope store —
    /// in that order.
    fn advance_segment(&mut self, elapsed: Duration) {
        if elapsed.is_zero() {
            return;
        }
        self.network.run_for(elapsed);
        self.sim_time = self.sim_time.saturating_add(elapsed);
        self.deliver_packets();
        self.tick_nodes(elapsed);
        self.flush_outboxes();
        self.prune_decoded_messages();
    }

    /// Drains every node's outbox and routes the messages through the
    /// network.
    fn flush_outboxes(&mut self) {
        let mut outgoing: Vec<(NodeId, NodeId, RemoteMessage)> = Vec::new();
        for (node_id, node) in &mut self.nodes {
            for (dest, msg) in node.drain_outbox() {
                outgoing.push((node_id.clone(), dest, msg));
            }
        }
        for (from, to, msg) in outgoing {
            self.send_message(&from, &to, &msg);
        }
    }

    /// Applies one fault immediately, recording it in the trace. Node
    /// crash/restart faults are mirrored to the network layer as host
    /// faults so in-flight packets are handled consistently.
    fn execute_fault(&mut self, fault: &HarnessFault) {
        self.trace.push(HarnessTraceEvent {
            time: self.sim_time,
            kind: HarnessTraceKind::FaultInjected(format!("{fault:?}")),
        });
        match fault {
            HarnessFault::Network(net_fault) => {
                self.network.inject_fault(net_fault);
            }
            HarnessFault::CrashNode(node_id) => {
                if let Some(node) = self.nodes.get_mut(node_id) {
                    let host = node.host_id;
                    node.crash();
                    self.network.inject_fault(&Fault::HostCrash { host });
                }
            }
            HarnessFault::RestartNode(node_id) => {
                if let Some(node) = self.nodes.get_mut(node_id) {
                    let host = node.host_id;
                    node.restart();
                    self.network.inject_fault(&Fault::HostRestart { host });
                }
            }
            HarnessFault::ExpireLeases(node_id) => {
                if let Some(node) = self.nodes.get_mut(node_id) {
                    // Fail every running task as lease-expired, persist the
                    // outcome for dedup, and notify duplicate waiters too.
                    let task_ids: Vec<RemoteTaskId> = node.running_tasks.keys().copied().collect();
                    for tid in task_ids {
                        if let Some(task) = node.running_tasks.remove(&tid) {
                            let outcome = RemoteOutcome::Failed("lease expired".into());
                            let _ = node.dedup.complete(&task.idempotency_key, outcome.clone());
                            node.outbox.push_back((
                                task.origin.clone(),
                                RemoteMessage::ResultDelivery(ResultDelivery {
                                    remote_task_id: tid,
                                    outcome: outcome.clone(),
                                    execution_time: Duration::ZERO,
                                }),
                            ));
                            for duplicate_task_id in node.take_duplicate_waiters(tid) {
                                node.outbox.push_back((
                                    task.origin.clone(),
                                    RemoteMessage::ResultDelivery(ResultDelivery {
                                        remote_task_id: duplicate_task_id,
                                        outcome: outcome.clone(),
                                        execution_time: Duration::ZERO,
                                    }),
                                ));
                            }
                        }
                    }
                }
            }
        }
    }

    /// Read access to a node by id.
    #[must_use]
    pub fn node(&self, node_id: &NodeId) -> Option<&SimNode> {
        self.nodes.get(node_id)
    }

    /// The harness-wide trace, in order of occurrence.
    #[must_use]
    pub fn trace(&self) -> &[HarnessTraceEvent] {
        &self.trace
    }

    /// Metrics collected by the simulated network.
    #[must_use]
    pub fn network_metrics(&self) -> &crate::lab::network::NetworkMetrics {
        self.network.metrics()
    }

    /// Simulated time elapsed since harness creation.
    #[must_use]
    pub fn sim_time(&self) -> Duration {
        self.sim_time
    }
}
/// Clamps a tick resolution to at least one nanosecond so the simulation
/// loop always makes forward progress.
fn normalized_tick(tick: Duration) -> Duration {
    tick.max(Duration::from_nanos(1))
}
impl fmt::Debug for DistributedHarness {
    /// Compact debug view: simulated time, node ids, and trace length;
    /// the network and message store are elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let node_ids: Vec<_> = self.nodes.keys().collect();
        f.debug_struct("DistributedHarness")
            .field("sim_time", &self.sim_time)
            .field("nodes", &node_ids)
            .field("trace_len", &self.trace.len())
            .finish_non_exhaustive()
    }
}
fn msg_type_name(msg: &RemoteMessage) -> &'static str {
match msg {
RemoteMessage::SpawnRequest(_) => "SpawnRequest",
RemoteMessage::SpawnAck(_) => "SpawnAck",
RemoteMessage::CancelRequest(_) => "CancelRequest",
RemoteMessage::ResultDelivery(_) => "ResultDelivery",
RemoteMessage::LeaseRenewal(_) => "LeaseRenewal",
}
}
#[cfg(test)]
mod tests {
use super::*;
    /// Test helper: inserts a `Pending` entry for `task_id` into the node's
    /// pending-result map and returns the receiving end of its oneshot.
    fn register_pending_result(
        node: &mut SimNode,
        task_id: RemoteTaskId,
    ) -> crate::channel::oneshot::Receiver<Result<RemoteOutcome, RemoteError>> {
        let (tx, rx) = crate::channel::oneshot::channel();
        node.pending_results.lock().insert(
            task_id,
            PendingResultEntry {
                tx: Some(tx),
                state: RemoteTaskState::Pending,
            },
        );
        rx
    }
    /// Test helper: two-node harness ("node-a", "node-b") over local
    /// (low-latency) network conditions.
    fn setup_harness() -> (DistributedHarness, NodeId, NodeId) {
        let config = NetworkConfig {
            default_conditions: crate::lab::network::NetworkConditions::local(),
            ..NetworkConfig::default()
        };
        let mut harness = DistributedHarness::new(config);
        let a = harness.add_node("node-a");
        let b = harness.add_node("node-b");
        (harness, a, b)
    }
#[test]
fn spawn_and_complete_across_nodes() {
let (mut harness, a, b) = setup_harness();
let task_id = RemoteTaskId::next();
harness.inject_spawn(&a, &b, task_id);
harness.run_for(Duration::from_millis(500));
let node_b = harness.node(&b).unwrap();
assert!(
node_b
.events()
.iter()
.any(|e| matches!(e, NodeEvent::SpawnReceived { .. }))
);
assert!(
node_b
.events()
.iter()
.any(|e| matches!(e, NodeEvent::SpawnAccepted { .. }))
);
assert!(
node_b
.events()
.iter()
.any(|e| matches!(e, NodeEvent::TaskCompleted { .. }))
);
}
#[test]
fn duplicate_spawn_resends_ack() {
let (mut harness, a, b) = setup_harness();
let task_id = RemoteTaskId::next();
harness.inject_spawn(&a, &b, task_id);
harness.run_for(Duration::from_millis(10));
harness.inject_spawn(&a, &b, task_id);
harness.run_for(Duration::from_millis(10));
let ack_count = harness
.trace()
.iter()
.filter(|event| {
matches!(
event,
HarnessTraceEvent {
kind: HarnessTraceKind::MessageSent { from, to, msg_type },
..
} if from == &b && to == &a && msg_type == "SpawnAck"
)
})
.count();
assert!(ack_count >= 2);
let node_b = harness.node(&b).unwrap();
assert!(
node_b
.events()
.iter()
.any(|e| matches!(e, NodeEvent::DuplicateSpawn { .. }))
);
assert_eq!(node_b.running_task_count(), 1);
}
#[test]
fn duplicate_spawn_after_completion_resends_result() {
let (mut harness, a, b) = setup_harness();
let task_id = RemoteTaskId::next();
harness.inject_spawn(&a, &b, task_id);
harness.run_for(Duration::from_millis(300));
harness.inject_spawn(&a, &b, task_id);
harness.run_for(Duration::from_millis(20));
let result_count = harness
.trace()
.iter()
.filter(|event| {
matches!(
event,
HarnessTraceEvent {
kind: HarnessTraceKind::MessageSent { from, to, msg_type },
..
} if from == &b && to == &a && msg_type == "ResultDelivery"
)
})
.count();
assert!(result_count >= 2);
}
#[test]
fn duplicate_spawn_with_fresh_task_id_delivers_result_to_retry_handle() {
let (mut harness, a, b) = setup_harness();
let canonical_task_id = RemoteTaskId::from_raw(9001);
let retry_task_id = RemoteTaskId::from_raw(9002);
let key = IdempotencyKey::from_raw(0xfeed_beef);
let mut rx1 =
register_pending_result(harness.nodes.get_mut(&a).unwrap(), canonical_task_id);
let mut rx2 = register_pending_result(harness.nodes.get_mut(&a).unwrap(), retry_task_id);
harness.send_message(
&a,
&b,
&RemoteMessage::SpawnRequest(SpawnRequest {
remote_task_id: canonical_task_id,
computation: crate::remote::ComputationName::new("test-computation"),
input: crate::remote::RemoteInput::new(vec![1, 2, 3]),
lease: Duration::from_secs(30),
idempotency_key: key,
budget: None,
origin_node: a.clone(),
origin_region: crate::types::RegionId::new_for_test(0, 0),
origin_task: crate::types::TaskId::new_for_test(0, 0),
}),
);
harness.run_for(Duration::from_millis(10));
harness.send_message(
&a,
&b,
&RemoteMessage::SpawnRequest(SpawnRequest {
remote_task_id: retry_task_id,
computation: crate::remote::ComputationName::new("test-computation"),
input: crate::remote::RemoteInput::new(vec![1, 2, 3]),
lease: Duration::from_secs(30),
idempotency_key: key,
budget: None,
origin_node: a.clone(),
origin_region: crate::types::RegionId::new_for_test(0, 0),
origin_task: crate::types::TaskId::new_for_test(0, 0),
}),
);
harness.run_for(Duration::from_millis(200));
let origin = harness.node(&a).unwrap();
let pending = origin.pending_results.lock();
assert_eq!(
pending.get(&canonical_task_id).map(|entry| entry.state),
Some(RemoteTaskState::Completed)
);
assert_eq!(
pending.get(&retry_task_id).map(|entry| entry.state),
Some(RemoteTaskState::Completed)
);
drop(pending);
let outcome1 = rx1.try_recv().expect("canonical result");
let outcome2 = rx2.try_recv().expect("retry result");
assert!(matches!(outcome1, Ok(RemoteOutcome::Success(_))));
assert!(matches!(outcome2, Ok(RemoteOutcome::Success(_))));
let remote = harness.node(&b).unwrap();
assert!(
remote.duplicate_waiters.is_empty(),
"duplicate waiter aliases must be cleared once the canonical task completes"
);
assert!(
remote.duplicate_aliases.is_empty(),
"duplicate alias reverse map must be cleared once the canonical task completes"
);
}
#[test]
fn duplicate_cancel_with_fresh_task_id_cancels_canonical_task() {
let (mut harness, a, b) = setup_harness();
let canonical_task_id = RemoteTaskId::from_raw(9101);
let retry_task_id = RemoteTaskId::from_raw(9102);
let key = IdempotencyKey::from_raw(0xcafe_feed);
let mut rx1 =
register_pending_result(harness.nodes.get_mut(&a).unwrap(), canonical_task_id);
let mut rx2 = register_pending_result(harness.nodes.get_mut(&a).unwrap(), retry_task_id);
let make_request = |remote_task_id| SpawnRequest {
remote_task_id,
computation: crate::remote::ComputationName::new("test-computation"),
input: crate::remote::RemoteInput::new(vec![4, 5, 6]),
lease: Duration::from_secs(30),
idempotency_key: key,
budget: None,
origin_node: a.clone(),
origin_region: crate::types::RegionId::new_for_test(0, 0),
origin_task: crate::types::TaskId::new_for_test(0, 0),
};
harness.send_message(
&a,
&b,
&RemoteMessage::SpawnRequest(make_request(canonical_task_id)),
);
harness.run_for(Duration::from_millis(10));
harness.send_message(
&a,
&b,
&RemoteMessage::SpawnRequest(make_request(retry_task_id)),
);
harness.run_for(Duration::from_millis(10));
harness.send_message(
&a,
&b,
&RemoteMessage::CancelRequest(CancelRequest {
remote_task_id: retry_task_id,
reason: crate::types::CancelReason::user("retry cancel"),
origin_node: a.clone(),
}),
);
harness.run_for(Duration::from_millis(200));
let remote = harness.node(&b).unwrap();
assert!(
remote.events().iter().any(|event| matches!(
event,
NodeEvent::TaskCancelled { task_id } if *task_id == canonical_task_id
)),
"duplicate cancel must cancel the canonical running task"
);
let outcome1 = rx1.try_recv().expect("canonical cancelled result");
let outcome2 = rx2.try_recv().expect("retry cancelled result");
assert!(matches!(outcome1, Ok(RemoteOutcome::Cancelled(_))));
assert!(matches!(outcome2, Ok(RemoteOutcome::Cancelled(_))));
}
#[test]
fn cancel_propagates() {
let (mut harness, a, b) = setup_harness();
let task_id = RemoteTaskId::next();
harness.inject_spawn(&a, &b, task_id);
harness.run_for(Duration::from_millis(10));
harness.inject_cancel(&a, &b, task_id);
harness.run_for(Duration::from_millis(200));
let node_b = harness.node(&b).unwrap();
assert!(
node_b
.events()
.iter()
.any(|e| matches!(e, NodeEvent::CancelReceived { .. }))
);
assert!(
node_b
.events()
.iter()
.any(|e| matches!(e, NodeEvent::TaskCancelled { .. }))
);
}
#[test]
fn partition_prevents_delivery() {
let config = NetworkConfig {
default_conditions: crate::lab::network::NetworkConditions::local(),
..NetworkConfig::default()
};
let mut harness = DistributedHarness::new(config);
let a = harness.add_node("node-a");
let b = harness.add_node("node-b");
let host_a = harness.node(&a).unwrap().host_id;
let host_b = harness.node(&b).unwrap().host_id;
harness.execute_fault(&HarnessFault::Network(Fault::Partition {
hosts_a: vec![host_a],
hosts_b: vec![host_b],
}));
let task_id = RemoteTaskId::next();
harness.inject_spawn(&a, &b, task_id);
harness.run_for(Duration::from_millis(100));
let node_b = harness.node(&b).unwrap();
assert!(
!node_b
.events()
.iter()
.any(|e| matches!(e, NodeEvent::SpawnReceived { .. }))
);
}
#[test]
fn node_crash_drops_tasks() {
let (mut harness, a, b) = setup_harness();
let task_id = RemoteTaskId::next();
harness.inject_spawn(&a, &b, task_id);
harness.run_for(Duration::from_millis(10));
harness.execute_fault(&HarnessFault::CrashNode(b.clone()));
let node_b = harness.node(&b).unwrap();
assert!(node_b.crashed);
assert_eq!(node_b.running_task_count(), 0);
assert!(
node_b
.events()
.iter()
.any(|e| matches!(e, NodeEvent::Crashed))
);
}
/// In-flight messages addressed to a node that crashes must be discarded,
/// not queued: after the node restarts, the pre-crash spawn must never be
/// delivered late.
#[test]
fn in_flight_messages_to_crashed_node_do_not_resurface_after_restart() {
    // Fixed 50ms latency keeps the spawn in flight when the crash fires at 10ms.
    let config = NetworkConfig {
        default_conditions: crate::lab::network::NetworkConditions {
            latency: crate::lab::network::LatencyModel::Fixed(Duration::from_millis(50)),
            ..crate::lab::network::NetworkConditions::ideal()
        },
        ..NetworkConfig::default()
    };
    let mut harness = DistributedHarness::new(config);
    let a = harness.add_node("node-a");
    let b = harness.add_node("node-b");
    let task_id = RemoteTaskId::next();
    // Crash at 10ms (while the 50ms delivery is still in flight), restart at
    // 20ms (still before the message's original arrival time).
    harness.set_fault_script(
        FaultScript::new()
            .at(
                Duration::from_millis(10),
                HarnessFault::CrashNode(b.clone()),
            )
            .at(
                Duration::from_millis(20),
                HarnessFault::RestartNode(b.clone()),
            ),
    );
    harness.inject_spawn(&a, &b, task_id);
    harness.run_for(Duration::from_millis(200));
    let events = harness.node(&b).unwrap().events();
    assert!(events.iter().any(|e| matches!(e, NodeEvent::Crashed)));
    assert!(events.iter().any(|e| matches!(e, NodeEvent::Restarted)));
    // The spawn that was in flight at crash time must not surface post-restart.
    assert!(!events.iter().any(
        |e| matches!(e, NodeEvent::SpawnReceived { task_id: seen, .. } if *seen == task_id)
    ));
}
/// Forcing lease expiry on the executing node clears its running tasks.
#[test]
fn lease_expiry_fails_tasks() {
    let (mut harness, a, b) = setup_harness();
    let task_id = RemoteTaskId::next();
    harness.inject_spawn(&a, &b, task_id);
    // Let the task start running before its lease is expired.
    harness.run_for(Duration::from_millis(10));
    harness.execute_fault(&HarnessFault::ExpireLeases(b.clone()));
    assert_eq!(harness.node(&b).unwrap().running_task_count(), 0);
}
/// A `Rejected` spawn ack must move the still-pending remote handle into the
/// `Failed` state and surface the rejection reason through `try_join`.
#[test]
fn rejected_spawn_ack_fails_pending_remote_handle() {
    let (mut harness, a, b) = setup_harness();
    let cap = harness.node(&a).unwrap().create_cap();
    let cx = Cx::for_testing().with_remote_cap(cap);
    let assigned_node = b.clone();
    let mut handle = crate::spawn_remote(
        &cx,
        b,
        crate::ComputationName::new("compute"),
        crate::remote::RemoteInput::empty(),
    )
    .expect("spawn");
    // No simulated time has elapsed, so the handle is still awaiting its ack.
    assert_eq!(handle.state(), RemoteTaskState::Pending);
    // Deliver the rejection directly to the origin node, bypassing the network.
    harness
        .nodes
        .get_mut(&a)
        .expect("origin node")
        .handle_spawn_ack(SpawnAck {
            remote_task_id: handle.remote_task_id(),
            status: SpawnAckStatus::Rejected(SpawnRejectReason::CapacityExceeded),
            assigned_node,
        });
    assert_eq!(handle.state(), RemoteTaskState::Failed);
    let err = handle.try_join().expect_err("rejected spawn");
    assert_eq!(
        err,
        RemoteError::SpawnRejected(SpawnRejectReason::CapacityExceeded)
    );
    // The state stays terminal even after the error has been consumed.
    assert_eq!(handle.state(), RemoteTaskState::Failed);
}
/// A rejection ack that arrives after the task has already been observed
/// running must be ignored: it must not clobber the `Running` handle state,
/// and the task must still run to a successful completion.
#[test]
fn late_rejected_spawn_ack_does_not_clobber_running_remote_handle() {
    let (mut harness, a, b) = setup_harness();
    let cap = harness.node(&a).unwrap().create_cap();
    let cx = Cx::for_testing().with_remote_cap(cap);
    let assigned_node = b.clone();
    let mut handle = crate::spawn_remote(
        &cx,
        b,
        crate::ComputationName::new("compute"),
        crate::remote::RemoteInput::empty(),
    )
    .expect("spawn");
    // Let the accept ack arrive so the handle transitions Pending -> Running.
    harness.run_for(Duration::from_millis(15));
    assert_eq!(handle.state(), RemoteTaskState::Running);
    // Inject a stale, conflicting rejection directly into the origin node.
    harness
        .nodes
        .get_mut(&a)
        .expect("origin node")
        .handle_spawn_ack(SpawnAck {
            remote_task_id: handle.remote_task_id(),
            status: SpawnAckStatus::Rejected(SpawnRejectReason::CapacityExceeded),
            assigned_node,
        });
    // The late rejection is ignored: still running, no result yet.
    assert_eq!(handle.state(), RemoteTaskState::Running);
    assert!(matches!(handle.try_join(), Ok(None)));
    // Let the task finish and deliver its real result.
    harness.run_for(Duration::from_millis(200));
    let outcome = handle
        .try_join()
        .expect("result available")
        .expect("remote outcome");
    assert!(matches!(outcome, RemoteOutcome::Success(_)));
    assert_eq!(handle.state(), RemoteTaskState::Completed);
}
/// Once a handle has observed a terminal outcome, a duplicate (and here
/// conflicting) terminal delivery must not overwrite the completed state,
/// and polling again reports `PolledAfterCompletion`.
#[test]
fn duplicate_terminal_result_does_not_overwrite_completed_state() {
    let (mut harness, a, b) = setup_harness();
    let cap = harness.node(&a).unwrap().create_cap();
    let cx = Cx::for_testing().with_remote_cap(cap);
    let mut handle = crate::spawn_remote(
        &cx,
        b,
        crate::ComputationName::new("compute"),
        crate::remote::RemoteInput::empty(),
    )
    .expect("spawn");
    // Run long enough for the task to complete and the result to come back.
    harness.run_for(Duration::from_millis(200));
    let outcome = handle
        .try_join()
        .expect("result available")
        .expect("remote outcome");
    assert!(matches!(outcome, RemoteOutcome::Success(_)));
    assert_eq!(handle.state(), RemoteTaskState::Completed);
    // Replay a conflicting terminal delivery straight into the origin node.
    harness
        .nodes
        .get_mut(&a)
        .expect("origin node")
        .handle_result(ResultDelivery {
            remote_task_id: handle.remote_task_id(),
            outcome: RemoteOutcome::Cancelled(crate::types::CancelReason::user(
                "protocol violation",
            )),
            execution_time: Duration::ZERO,
        });
    // The duplicate must not demote the already-completed state.
    assert_eq!(handle.state(), RemoteTaskState::Completed);
    assert!(matches!(
        handle.try_join(),
        Err(RemoteError::PolledAfterCompletion)
    ));
}
/// A terminal result that arrives after the caller dropped its handle must
/// still clear the origin node's pending-result bookkeeping, even though the
/// oneshot send observes a disconnected receiver.
#[test]
fn dropped_handle_terminal_delivery_clears_pending_state_after_disconnected_send() {
    let (mut harness, a, b) = setup_harness();
    let cap = harness.node(&a).unwrap().create_cap();
    let cx = Cx::for_testing().with_remote_cap(cap);
    let handle = crate::spawn_remote(
        &cx,
        b,
        crate::ComputationName::new("compute"),
        crate::remote::RemoteInput::empty(),
    )
    .expect("spawn");
    harness.run_for(Duration::from_millis(15));
    let remote_task_id = handle.remote_task_id();
    // Drop the handle first so the delivery's send hits a closed channel.
    drop(handle);
    harness
        .nodes
        .get_mut(&a)
        .expect("origin node")
        .handle_result(ResultDelivery {
            remote_task_id,
            outcome: RemoteOutcome::Success(vec![]),
            execution_time: Duration::ZERO,
        });
    // `contains_key` is the idiomatic membership check (was `get(..).is_none()`).
    assert!(
        !harness
            .nodes
            .get(&a)
            .expect("origin node")
            .pending_results
            .lock()
            .contains_key(&remote_task_id),
        "terminal delivery into a dropped handle should clear pending bookkeeping"
    );
}
/// A fault script fires its scripted crash and restart during the run and
/// leaves the node alive afterwards.
#[test]
fn fault_script_executes_in_order() {
    let mut harness = DistributedHarness::new(NetworkConfig {
        default_conditions: crate::lab::network::NetworkConditions::local(),
        ..NetworkConfig::default()
    });
    let a = harness.add_node("node-a");
    let b = harness.add_node("node-b");
    // Crash at 50ms, restart at 150ms — both inside the 200ms run window.
    harness.set_fault_script(
        FaultScript::new()
            .at(Duration::from_millis(50), HarnessFault::CrashNode(b.clone()))
            .at(Duration::from_millis(150), HarnessFault::RestartNode(b.clone())),
    );
    harness.inject_spawn(&a, &b, RemoteTaskId::next());
    harness.run_for(Duration::from_millis(200));
    let node_b = harness.node(&b).unwrap();
    let events = node_b.events();
    assert!(events.iter().any(|e| matches!(e, NodeEvent::Crashed)));
    assert!(events.iter().any(|e| matches!(e, NodeEvent::Restarted)));
    assert!(!node_b.crashed);
}
/// Two runs of the same seeded scenario must produce identical traces.
#[test]
fn deterministic_replay() {
    // Closure instead of an inner fn: builds a seeded harness, runs one
    // scenario, and renders the trace into comparable strings.
    let run_scenario = || -> Vec<String> {
        let mut harness = DistributedHarness::new(NetworkConfig {
            seed: 42,
            default_conditions: crate::lab::network::NetworkConditions::lan(),
            ..NetworkConfig::default()
        });
        let a = harness.add_node("node-a");
        let b = harness.add_node("node-b");
        harness.inject_spawn(&a, &b, RemoteTaskId::from_raw(1000));
        harness.run_for(Duration::from_millis(300));
        harness
            .trace()
            .iter()
            .map(|e| format!("{:?}:{:?}", e.time, e.kind))
            .collect()
    };
    assert_eq!(run_scenario(), run_scenario(), "Replay should be deterministic");
}
/// After all deliveries settle, both hosts' network inboxes end up empty.
#[test]
fn harness_drains_network_inboxes_after_delivery() {
    let (mut harness, a, b) = setup_harness();
    harness.inject_spawn(&a, &b, RemoteTaskId::next());
    harness.run_for(Duration::from_secs(1));
    // Both endpoints should have had their inboxes fully drained.
    for node in [&a, &b] {
        let host = harness.node(node).unwrap().host_id;
        assert!(harness.network.inbox(host).unwrap().is_empty());
    }
}
/// Replaying the same spawn (same task id, hence same idempotency key) is
/// accepted exactly once and deduplicated exactly once.
#[test]
fn idempotent_spawn_dedup() {
    let (mut harness, a, b) = setup_harness();
    let task_id = RemoteTaskId::next();
    harness.inject_spawn(&a, &b, task_id);
    harness.inject_spawn(&a, &b, task_id);
    harness.run_for(Duration::from_millis(50));
    // Single pass over the event log instead of two filter/count chains.
    let events = harness.node(&b).unwrap().events();
    let mut accepted = 0usize;
    let mut deduplicated = 0usize;
    for event in events.iter() {
        match event {
            NodeEvent::SpawnAccepted { .. } => accepted += 1,
            NodeEvent::DuplicateSpawn { .. } => deduplicated += 1,
            _ => {}
        }
    }
    assert_eq!(accepted, 1);
    assert_eq!(deduplicated, 1);
}
/// A retried spawn carrying the same idempotency key must not re-execute:
/// the node replies `Accepted` echoing the retry's task id, then re-delivers
/// the cached outcome, also addressed to the retry task id.
#[test]
fn duplicate_spawn_reuses_cached_outcome_but_echoes_retry_task_id() {
    let mut node = SimNode::new(NodeId::new("node-b"), HostId::new(1));
    let origin = NodeId::new("node-a");
    let idempotency_key = IdempotencyKey::from_raw(0xD00D);
    let first_task = RemoteTaskId::from_raw(41);
    let retry_task = RemoteTaskId::from_raw(99);
    // Two requests that differ only in their remote task id share one key.
    let make_request = |remote_task_id| SpawnRequest {
        remote_task_id,
        computation: crate::remote::ComputationName::new("test-computation"),
        input: crate::remote::RemoteInput::new(vec![1, 2, 3]),
        lease: Duration::from_secs(30),
        idempotency_key,
        budget: None,
        origin_node: origin.clone(),
        origin_region: crate::types::RegionId::new_for_test(0, 0),
        origin_task: crate::types::TaskId::new_for_test(0, 0),
    };
    node.handle_spawn(make_request(first_task), Time::from_secs(1));
    // Discard the first spawn's outgoing messages, then tick so the task
    // finishes and its outcome is cached before the retry arrives.
    node.outbox.clear();
    let _ = node.tick(Duration::from_millis(100));
    node.handle_spawn(make_request(retry_task), Time::from_secs(2));
    // First outbox message: an Accepted ack carrying the retry's task id.
    let (_, ack) = node.outbox.pop_front().expect("duplicate spawn ack");
    match ack {
        RemoteMessage::SpawnAck(SpawnAck {
            remote_task_id,
            status: SpawnAckStatus::Accepted,
            ..
        }) => assert_eq!(remote_task_id, retry_task),
        other => panic!("unexpected duplicate ack: {other:?}"),
    }
    // Second outbox message: the cached successful result, re-addressed.
    let (_, delivery) = node.outbox.pop_front().expect("cached result delivery");
    match delivery {
        RemoteMessage::ResultDelivery(ResultDelivery {
            remote_task_id,
            outcome,
            ..
        }) => {
            assert_eq!(remote_task_id, retry_task);
            assert!(outcome.is_success());
        }
        other => panic!("unexpected cached result: {other:?}"),
    }
}
/// With 100% packet duplication the spawn arrives twice: the first copy is
/// accepted and the second is deduplicated by the node logic.
#[test]
fn duplicated_network_packets_reach_node_logic() {
    let conditions = crate::lab::network::NetworkConditions {
        packet_duplicate: 1.0,
        ..crate::lab::network::NetworkConditions::local()
    };
    let mut harness = DistributedHarness::new(NetworkConfig {
        default_conditions: conditions,
        ..NetworkConfig::default()
    });
    let a = harness.add_node("node-a");
    let b = harness.add_node("node-b");
    let task_id = RemoteTaskId::next();
    harness.inject_spawn(&a, &b, task_id);
    harness.run_for(Duration::from_millis(50));
    // Tally both outcomes for this task in one pass over the event log.
    let events = harness.node(&b).unwrap().events();
    let (mut accepted, mut duplicates) = (0usize, 0usize);
    for event in events.iter() {
        match event {
            NodeEvent::SpawnAccepted { task_id: seen } if *seen == task_id => accepted += 1,
            NodeEvent::DuplicateSpawn { task_id: seen } if *seen == task_id => duplicates += 1,
            _ => {}
        }
    }
    assert_eq!(accepted, 1);
    assert_eq!(duplicates, 1);
}
/// Dedup entries live for a TTL: a replay before expiry is deduplicated,
/// while a replay after the TTL elapses is accepted as a fresh spawn.
#[test]
fn idempotent_spawn_ttl_expiry_allows_fresh_spawn() {
    // Counts (SpawnAccepted, DuplicateSpawn) events on a node; this was
    // copy-pasted four times in the original.
    fn spawn_counts(node: &SimNode) -> (usize, usize) {
        let accepted = node
            .events()
            .iter()
            .filter(|e| matches!(e, NodeEvent::SpawnAccepted { .. }))
            .count();
        let dedup = node
            .events()
            .iter()
            .filter(|e| matches!(e, NodeEvent::DuplicateSpawn { .. }))
            .count();
        (accepted, dedup)
    }
    let (mut harness, a, b) = setup_harness();
    harness.set_tick(Duration::from_secs(1));
    let task_id = RemoteTaskId::from_raw(7_777);
    harness.inject_spawn(&a, &b, task_id);
    harness.run_for(Duration::from_secs(2));
    // Replay while the dedup entry is still live: must be deduplicated.
    harness.inject_spawn(&a, &b, task_id);
    harness.run_for(Duration::from_secs(2));
    assert_eq!(spawn_counts(harness.node(&b).unwrap()), (1, 1));
    // Let the idempotency TTL elapse, then replay the same task id again.
    harness.run_for(Duration::from_secs(301));
    harness.inject_spawn(&a, &b, task_id);
    harness.run_for(Duration::from_secs(2));
    let (accepted_after, dedup_after) = spawn_counts(harness.node(&b).unwrap());
    assert_eq!(
        accepted_after, 2,
        "expired dedup entry should allow respawn"
    );
    assert_eq!(dedup_after, 1, "only pre-expiry replay should deduplicate");
}
/// The harness adopts the tick resolution supplied in the network config.
#[test]
fn new_harness_uses_configured_tick_resolution() {
    let tick = Duration::from_micros(250);
    let config = NetworkConfig {
        tick_resolution: tick,
        ..NetworkConfig::default()
    };
    assert_eq!(DistributedHarness::new(config).tick, tick);
}
/// A zero tick would stall the simulation loop, so both the constructor and
/// `set_tick` clamp it up to one nanosecond.
#[test]
fn zero_tick_is_clamped_to_one_nanosecond() {
    let config = NetworkConfig {
        tick_resolution: Duration::ZERO,
        ..NetworkConfig::default()
    };
    let mut harness = DistributedHarness::new(config);
    let minimum = Duration::from_nanos(1);
    assert_eq!(harness.tick, minimum);
    harness.set_tick(Duration::ZERO);
    assert_eq!(harness.tick, minimum);
}
/// `run_for` must not overshoot: the last step is truncated so simulated
/// time lands exactly on the requested duration.
#[test]
fn run_for_caps_final_step_to_remaining_duration() {
    let (mut harness, _a, _b) = setup_harness();
    harness.set_tick(Duration::from_millis(1));
    // Request less than one tick; the step must be capped to the remainder.
    let requested = Duration::from_micros(250);
    harness.run_for(requested);
    assert_eq!(harness.sim_time(), requested);
}
/// With a coarse 1s tick, a fault scheduled at 900ms must still execute at
/// its scheduled timestamp (sub-tick precision), not at the tick boundary.
#[test]
fn faults_within_large_tick_execute_at_their_scheduled_time() {
    // 500ms fixed latency: the spawn is delivered before the 900ms crash.
    let config = NetworkConfig {
        default_conditions: crate::lab::network::NetworkConditions {
            latency: crate::lab::network::LatencyModel::Fixed(Duration::from_millis(500)),
            ..crate::lab::network::NetworkConditions::ideal()
        },
        tick_resolution: Duration::from_secs(1),
        ..NetworkConfig::default()
    };
    let mut harness = DistributedHarness::new(config);
    let a = harness.add_node("node-a");
    let b = harness.add_node("node-b");
    let task_id = RemoteTaskId::next();
    harness.set_fault_script(FaultScript::new().at(
        Duration::from_millis(900),
        HarnessFault::CrashNode(b.clone()),
    ));
    harness.inject_spawn(&a, &b, task_id);
    harness.run_for(Duration::from_secs(1));
    let node_b = harness.node(&b).unwrap();
    // Delivery (~500ms) precedes the crash (900ms), so the spawn landed.
    assert!(node_b.events().iter().any(
        |event| matches!(event, NodeEvent::SpawnReceived { task_id: seen, .. } if *seen == task_id)
    ));
    // The trace must stamp the fault at exactly 900ms, not at the 1s tick.
    let fault_times: Vec<_> = harness
        .trace()
        .iter()
        .filter_map(|event| {
            matches!(event.kind, HarnessTraceKind::FaultInjected(_)).then_some(event.time)
        })
        .collect();
    assert_eq!(fault_times, vec![Duration::from_millis(900)]);
}
/// Faults registered out of timestamp order within a single coarse tick are
/// still executed by timestamp: the 100ms crash precedes the 900ms restart.
#[test]
fn same_tick_faults_follow_timestamp_order() {
    let (mut harness, _, b) = setup_harness();
    harness.set_tick(Duration::from_secs(1));
    // Registration order (restart first) is deliberately the reverse of
    // timestamp order.
    let script = FaultScript::new()
        .at(Duration::from_millis(900), HarnessFault::RestartNode(b.clone()))
        .at(Duration::from_millis(100), HarnessFault::CrashNode(b.clone()));
    harness.set_fault_script(script);
    harness.run_for(Duration::from_secs(1));
    let node_b = harness.node(&b).unwrap();
    let events = node_b.events();
    let crash_idx = events
        .iter()
        .position(|e| matches!(e, NodeEvent::Crashed))
        .unwrap();
    let restart_idx = events
        .iter()
        .position(|e| matches!(e, NodeEvent::Restarted))
        .unwrap();
    assert!(crash_idx < restart_idx);
    assert!(!node_b.crashed);
}
/// Healing a partition restores delivery: spawns injected during the
/// partition are lost, but work injected after `Heal` is delivered and runs
/// to completion.
#[test]
fn partition_heal_recovers_delivery() {
    let (mut harness, a, b) = setup_harness();
    let host_a = harness.node(&a).unwrap().host_id;
    let host_b = harness.node(&b).unwrap().host_id;
    harness.execute_fault(&HarnessFault::Network(Fault::Partition {
        hosts_a: vec![host_a],
        hosts_b: vec![host_b],
    }));
    // While partitioned, the spawn never reaches node B.
    let dropped_task = RemoteTaskId::next();
    harness.inject_spawn(&a, &b, dropped_task);
    harness.run_for(Duration::from_millis(100));
    let node_b = harness.node(&b).unwrap();
    assert!(!node_b.events().iter().any(
        |e| matches!(e, NodeEvent::SpawnReceived { task_id, .. } if *task_id == dropped_task)
    ));
    harness.execute_fault(&HarnessFault::Network(Fault::Heal {
        hosts_a: vec![host_a],
        hosts_b: vec![host_b],
    }));
    // After the heal, a fresh spawn is delivered and completes.
    let recovered_task = RemoteTaskId::next();
    harness.inject_spawn(&a, &b, recovered_task);
    harness.run_for(Duration::from_millis(250));
    let node_b = harness.node(&b).unwrap();
    assert!(node_b.events().iter().any(
        |e| matches!(e, NodeEvent::SpawnReceived { task_id, .. } if *task_id == recovered_task)
    ));
    assert!(node_b.events().iter().any(
        |e| matches!(e, NodeEvent::TaskCompleted { task_id } if *task_id == recovered_task)
    ));
    assert_eq!(node_b.running_task_count(), 0);
}
/// A crashed node can be restarted and will then accept and complete
/// freshly injected work.
#[test]
fn crash_restart_recovers_new_tasks() {
    let (mut harness, a, b) = setup_harness();
    harness.inject_spawn(&a, &b, RemoteTaskId::next());
    harness.run_for(Duration::from_millis(10));
    harness.execute_fault(&HarnessFault::CrashNode(b.clone()));
    // Sanity-check the crashed state before attempting recovery.
    {
        let crashed_node = harness.node(&b).unwrap();
        assert!(crashed_node.crashed);
        assert_eq!(crashed_node.running_task_count(), 0);
    }
    harness.execute_fault(&HarnessFault::RestartNode(b.clone()));
    let recovered_task = RemoteTaskId::next();
    harness.inject_spawn(&a, &b, recovered_task);
    harness.run_for(Duration::from_millis(250));
    let node_b = harness.node(&b).unwrap();
    assert!(!node_b.crashed);
    let events = node_b.events();
    assert!(events.iter().any(|e| matches!(e, NodeEvent::Restarted)));
    assert!(events.iter().any(
        |e| matches!(e, NodeEvent::TaskCompleted { task_id } if *task_id == recovered_task)
    ));
    assert_eq!(node_b.running_task_count(), 0);
}
/// Total packet loss on both link directions drops injected work; restoring
/// healthy link conditions lets newly injected work complete.
#[test]
fn message_loss_then_recovery_delivers_new_work() {
    let (mut harness, a, b) = setup_harness();
    let host_a = harness.node(&a).unwrap().host_id;
    let host_b = harness.node(&b).unwrap().host_id;
    // 100% loss in both directions so nothing gets through.
    let mut loss = crate::lab::network::NetworkConditions::local();
    loss.packet_loss = 1.0;
    harness
        .network
        .set_link_conditions(host_a, host_b, loss.clone());
    harness.network.set_link_conditions(host_b, host_a, loss);
    let dropped_task = RemoteTaskId::next();
    harness.inject_spawn(&a, &b, dropped_task);
    harness.run_for(Duration::from_millis(100));
    let node_b = harness.node(&b).unwrap();
    assert!(!node_b.events().iter().any(
        |e| matches!(e, NodeEvent::SpawnReceived { task_id, .. } if *task_id == dropped_task)
    ));
    // The network metrics must account for the dropped packets.
    assert!(harness.network_metrics().packets_dropped > 0);
    // Restore lossless local conditions in both directions.
    let recovered = crate::lab::network::NetworkConditions::local();
    harness
        .network
        .set_link_conditions(host_a, host_b, recovered.clone());
    harness
        .network
        .set_link_conditions(host_b, host_a, recovered);
    let recovered_task = RemoteTaskId::next();
    harness.inject_spawn(&a, &b, recovered_task);
    harness.run_for(Duration::from_millis(250));
    let node_b = harness.node(&b).unwrap();
    assert!(node_b.events().iter().any(
        |e| matches!(e, NodeEvent::TaskCompleted { task_id } if *task_id == recovered_task)
    ));
    assert_eq!(node_b.running_task_count(), 0);
}
/// A message carrying a vector clock far ahead of the receiver must merge
/// into the receiver's causal tracker: afterwards the receiver's entry for
/// the skewed sender is at least the claimed value.
#[test]
fn clock_skew_advances_causal_clock() {
    let (mut harness, a, b) = setup_harness();
    let host_a = harness.node(&a).unwrap().host_id;
    let host_b = harness.node(&b).unwrap().host_id;
    let task_id = RemoteTaskId::next();
    let req = SpawnRequest {
        remote_task_id: task_id,
        computation: crate::remote::ComputationName::new("test-computation"),
        input: crate::remote::RemoteInput::new(vec![]),
        lease: Duration::from_secs(30),
        // Key derived from the task id keeps this spawn unique for dedup.
        idempotency_key: IdempotencyKey::from_raw(u128::from(task_id.raw())),
        budget: None,
        origin_node: a.clone(),
        origin_region: crate::types::RegionId::new_for_test(0, 0),
        origin_task: crate::types::TaskId::new_for_test(0, 0),
    };
    // Hand-build an envelope whose clock claims node A is already at 100.
    let mut skewed = VectorClock::new();
    skewed.set(&a, 100);
    let envelope = MessageEnvelope::new(
        a.clone(),
        LogicalTime::Vector(skewed.clone()),
        RemoteMessage::SpawnRequest(req),
    );
    // Bypass the runtime and push the raw bytes onto the simulated network.
    let encoded = harness.encode_message(&envelope);
    harness.network.send(host_a, host_b, Bytes::from(encoded));
    harness.run_for(Duration::from_millis(250));
    let node_b = harness.node(&b).unwrap();
    assert!(node_b.events().iter().any(
        |e| matches!(e, NodeEvent::SpawnReceived { task_id: seen, .. } if *seen == task_id)
    ));
    let clock = node_b.causal_tracker().current_clock();
    assert!(
        clock.get(&a) >= 100,
        "expected skewed clock to merge into receiver"
    );
    assert_eq!(node_b.running_task_count(), 0);
}
}