use crate::channel::oneshot;
use crate::cx::Cx;
use crate::trace::distributed::{LogicalClockHandle, LogicalTime};
use crate::types::{Budget, CancelReason, ObligationId, RegionId, TaskId, Time};
use crate::util::det_hash::DetHashMap;
use std::fmt;
use std::marker::PhantomData;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
// Process-wide source of unique `RemoteTaskId`s; starts at 1 so 0 is never handed out.
static REMOTE_TASK_COUNTER: AtomicU64 = AtomicU64::new(1);
/// Opaque identifier of a node in the distributed system.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct NodeId(String);

impl NodeId {
    /// Builds a `NodeId` from anything convertible into an owned `String`.
    #[must_use]
    pub fn new(id: impl Into<String>) -> Self {
        NodeId(id.into())
    }

    /// Borrows the raw identifier string.
    #[must_use]
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for NodeId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Node({})", self.as_str())
    }
}
/// Globally unique id for a remotely spawned task, minted from a process-wide counter.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RemoteTaskId(u64);

impl RemoteTaskId {
    /// Mints the next unique id. Relaxed ordering suffices: only uniqueness matters.
    #[must_use]
    pub fn next() -> Self {
        let id = REMOTE_TASK_COUNTER.fetch_add(1, Ordering::Relaxed);
        RemoteTaskId(id)
    }

    /// Reconstructs an id from its raw value (e.g. when decoding a message).
    #[must_use]
    pub const fn from_raw(value: u64) -> Self {
        RemoteTaskId(value)
    }

    /// Returns the raw numeric value.
    #[must_use]
    pub const fn raw(self) -> u64 {
        self.0
    }
}

impl fmt::Display for RemoteTaskId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "RT{}", self.raw())
    }
}
/// Name of a computation that a remote node knows how to run.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ComputationName(String);

impl ComputationName {
    /// Wraps any string-convertible value as a computation name.
    #[must_use]
    pub fn new(name: impl Into<String>) -> Self {
        ComputationName(name.into())
    }

    /// Borrows the underlying name.
    #[must_use]
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for ComputationName {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.0)
    }
}
/// Opaque serialized payload handed to a remote computation.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RemoteInput {
    data: Vec<u8>,
}

impl RemoteInput {
    /// Wraps an already-serialized payload.
    #[must_use]
    pub fn new(data: Vec<u8>) -> Self {
        Self { data }
    }

    /// A payload carrying no bytes.
    #[must_use]
    pub fn empty() -> Self {
        Self::new(Vec::new())
    }

    /// Borrows the payload bytes.
    #[must_use]
    pub fn data(&self) -> &[u8] {
        self.data.as_slice()
    }

    /// Consumes the input, yielding the owned payload.
    #[must_use]
    pub fn into_data(self) -> Vec<u8> {
        self.data
    }

    /// Number of payload bytes.
    #[must_use]
    pub fn len(&self) -> usize {
        self.data().len()
    }

    /// True when the payload has no bytes.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
/// Pluggable runtime used by `spawn_remote` and `RemoteHandle` to exchange
/// messages with remote nodes. Implementations must be shareable across threads.
pub trait RemoteRuntime: Send + Sync + fmt::Debug {
    /// Delivers an envelope to `destination`.
    fn send_message(
        &self,
        destination: &NodeId,
        envelope: MessageEnvelope<RemoteMessage>,
    ) -> Result<(), RemoteError>;
    /// Registers the oneshot sender that will receive the task's terminal result.
    fn register_task(
        &self,
        task_id: RemoteTaskId,
        tx: oneshot::Sender<Result<RemoteOutcome, RemoteError>>,
    );
    /// Optional best-effort view of a task's current state; default: unknown (`None`).
    fn observe_task_state(&self, _task_id: RemoteTaskId) -> Option<RemoteTaskState> {
        None
    }
    /// Drops any state retained for a task; default: no-op.
    fn clear_task_state(&self, _task_id: RemoteTaskId) {}
    /// Undoes a `register_task`; default: no-op.
    fn unregister_task(&self, _task_id: RemoteTaskId) {}
}
/// Failure mode injected when no real runtime is attached (phase 0).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Phase0RemoteFailure {
    NodeUnreachable,
    NodeDown,
    TransportError(String),
    Timeout,
}

impl Phase0RemoteFailure {
    /// Converts the simulated failure into the `RemoteError` the caller sees,
    /// attaching the target node's name where relevant.
    fn to_remote_error(&self, node: &NodeId) -> RemoteError {
        match self {
            Phase0RemoteFailure::Timeout => RemoteError::Cancelled(CancelReason::timeout()),
            Phase0RemoteFailure::TransportError(message) => {
                RemoteError::TransportError(message.clone())
            }
            Phase0RemoteFailure::NodeUnreachable => {
                RemoteError::NodeUnreachable(String::from(node.as_str()))
            }
            Phase0RemoteFailure::NodeDown => RemoteError::NodeDown(String::from(node.as_str())),
        }
    }
}
/// Retry schedule suggested to callers when a phase-0 failure is injected.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Phase0RetryPolicy {
    pub max_attempts: u32,
    pub initial_backoff: Duration,
    pub max_backoff: Duration,
}

impl Default for Phase0RetryPolicy {
    /// Three attempts with backoff growing from 25ms up to a 100ms cap.
    fn default() -> Self {
        Phase0RetryPolicy {
            max_attempts: 3,
            initial_backoff: Duration::from_millis(25),
            max_backoff: Duration::from_millis(100),
        }
    }
}
/// Phase-0 (no runtime attached) behavior: which failure to inject, the retry
/// schedule suggested to callers, and the simulated timeout.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Phase0SimulationConfig {
    pub failure: Phase0RemoteFailure,
    pub retry: Phase0RetryPolicy,
    pub timeout: Duration,
}

impl Default for Phase0SimulationConfig {
    fn default() -> Self {
        Self {
            // Treat the remote side as unreachable by default.
            failure: Phase0RemoteFailure::NodeUnreachable,
            retry: Phase0RetryPolicy::default(),
            timeout: Duration::from_millis(250),
        }
    }
}
/// Capability describing how (and whether) remote spawning is available.
#[derive(Clone, Debug)]
pub struct RemoteCap {
    default_lease: Duration,
    remote_budget: Option<Budget>,
    local_node: NodeId,
    runtime: Option<Arc<dyn RemoteRuntime>>,
    phase0_simulation: Phase0SimulationConfig,
}

impl RemoteCap {
    /// Creates a capability with a 30-second lease, no budget, a "local" node
    /// id, no runtime attached, and default phase-0 simulation settings.
    #[must_use]
    pub fn new() -> Self {
        RemoteCap {
            default_lease: Duration::from_secs(30),
            remote_budget: None,
            local_node: NodeId::new("local"),
            runtime: None,
            phase0_simulation: Phase0SimulationConfig::default(),
        }
    }

    /// Sets the lease granted to newly spawned remote tasks.
    #[must_use]
    pub fn with_default_lease(self, lease: Duration) -> Self {
        Self {
            default_lease: lease,
            ..self
        }
    }

    /// Attaches a budget carried over to remote executions.
    #[must_use]
    pub fn with_remote_budget(self, budget: Budget) -> Self {
        Self {
            remote_budget: Some(budget),
            ..self
        }
    }

    /// Overrides the identity of the spawning node.
    #[must_use]
    pub fn with_local_node(self, node: NodeId) -> Self {
        Self {
            local_node: node,
            ..self
        }
    }

    /// Attaches a real runtime; without one, spawns use phase-0 simulation.
    #[must_use]
    pub fn with_runtime(self, runtime: Arc<dyn RemoteRuntime>) -> Self {
        Self {
            runtime: Some(runtime),
            ..self
        }
    }

    /// Replaces the whole phase-0 simulation configuration.
    #[must_use]
    pub fn with_phase0_simulation(self, config: Phase0SimulationConfig) -> Self {
        Self {
            phase0_simulation: config,
            ..self
        }
    }

    /// Sets only the injected phase-0 failure mode.
    #[must_use]
    pub fn with_phase0_failure(mut self, failure: Phase0RemoteFailure) -> Self {
        self.phase0_simulation.failure = failure;
        self
    }

    /// Sets only the phase-0 retry policy.
    #[must_use]
    pub fn with_phase0_retry(mut self, retry: Phase0RetryPolicy) -> Self {
        self.phase0_simulation.retry = retry;
        self
    }

    /// Sets only the simulated phase-0 timeout.
    #[must_use]
    pub fn with_phase0_timeout(mut self, timeout: Duration) -> Self {
        self.phase0_simulation.timeout = timeout;
        self
    }

    /// Lease granted to new remote tasks.
    #[must_use]
    pub fn default_lease(&self) -> Duration {
        self.default_lease
    }

    /// Budget forwarded to remote executions, if any.
    #[must_use]
    pub fn remote_budget(&self) -> Option<&Budget> {
        self.remote_budget.as_ref()
    }

    /// Identity of the spawning node.
    #[must_use]
    pub fn local_node(&self) -> &NodeId {
        &self.local_node
    }

    /// Attached runtime, if any.
    #[must_use]
    pub fn runtime(&self) -> Option<&Arc<dyn RemoteRuntime>> {
        self.runtime.as_ref()
    }

    /// Phase-0 simulation settings.
    #[must_use]
    pub fn phase0_simulation(&self) -> &Phase0SimulationConfig {
        &self.phase0_simulation
    }
}

impl Default for RemoteCap {
    /// Equivalent to [`RemoteCap::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Lifecycle of a remotely spawned task as observed from the origin side.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RemoteTaskState {
    Pending,
    Running,
    Completed,
    Failed,
    Cancelled,
    LeaseExpired,
}

impl fmt::Display for RemoteTaskState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Pending => "Pending",
            Self::Running => "Running",
            Self::Completed => "Completed",
            Self::Failed => "Failed",
            Self::Cancelled => "Cancelled",
            Self::LeaseExpired => "LeaseExpired",
        };
        f.write_str(label)
    }
}
/// Errors surfaced to callers of the remote-spawn API.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RemoteError {
    /// The context carries no `RemoteCap`.
    NoCapability,
    /// Named node could not be reached.
    NodeUnreachable(String),
    /// Named node is known to be down.
    NodeDown(String),
    /// The target node does not know the requested computation.
    UnknownComputation(String),
    /// The target node refused the spawn.
    SpawnRejected(SpawnRejectReason),
    /// The task's lease ran out.
    LeaseExpired,
    /// A handle was polled again after its result had been consumed.
    PolledAfterCompletion,
    /// The task (or the wait for it) was cancelled.
    Cancelled(CancelReason),
    /// The remote task panicked; carries the panic message.
    RemotePanic(String),
    /// (De)serialization failed.
    SerializationError(String),
    /// Message transport failed.
    TransportError(String),
}

impl fmt::Display for RemoteError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::NoCapability => write!(f, "remote capability not available"),
            Self::NodeUnreachable(node) => write!(f, "node unreachable: {node}"),
            Self::NodeDown(node) => write!(f, "node down: {node}"),
            Self::UnknownComputation(name) => {
                write!(f, "unknown computation: {name}")
            }
            Self::SpawnRejected(reason) => write!(f, "remote spawn rejected: {reason}"),
            Self::LeaseExpired => write!(f, "remote task lease expired"),
            Self::PolledAfterCompletion => {
                write!(f, "remote handle polled after completion")
            }
            Self::Cancelled(reason) => write!(f, "remote task cancelled: {reason}"),
            Self::RemotePanic(msg) => write!(f, "remote task panicked: {msg}"),
            Self::SerializationError(msg) => write!(f, "serialization error: {msg}"),
            Self::TransportError(msg) => write!(f, "transport error: {msg}"),
        }
    }
}

impl std::error::Error for RemoteError {}
/// Handle to a task running on a remote node.
///
/// Dropping an unfinished handle best-effort cancels the remote task (see `Drop`).
pub struct RemoteHandle {
    remote_task_id: RemoteTaskId,
    // Local mirror task, when one exists.
    local_task_id: Option<TaskId>,
    // Node that issued the spawn (stamped on cancel messages).
    origin_node: NodeId,
    // Node executing the computation (destination of cancel messages).
    node: NodeId,
    computation: ComputationName,
    owner_region: RegionId,
    // None in phase-0 simulation mode.
    runtime: Option<Arc<dyn RemoteRuntime>>,
    // Delivers the task's single terminal result.
    receiver: oneshot::Receiver<Result<RemoteOutcome, RemoteError>>,
    // Logical clock used to stamp outgoing messages.
    sender_clock: LogicalClockHandle,
    lease: Duration,
    // Last known state; may lag the runtime's live view.
    state: RemoteTaskState,
    // Set once a terminal result has been consumed through this handle.
    completed: bool,
}

impl Drop for RemoteHandle {
    /// On drop of an unfinished handle: refresh the cached state from the
    /// runtime, then either request cancellation (task still looks live) or
    /// release runtime bookkeeping (result already buffered, channel closed,
    /// or no runtime attached).
    fn drop(&mut self) {
        if self.completed {
            return;
        }
        let observed_state = self.observed_state();
        self.state = observed_state;
        let should_cancel = self.should_request_cancel();
        if should_cancel {
            self.request_cancel(CancelReason::user("remote handle dropped"));
        } else if self.receiver.is_ready() || self.receiver.is_closed() || self.runtime.is_none() {
            self.clear_runtime_state();
        }
    }
}

impl fmt::Debug for RemoteHandle {
    // NOTE(review): manual impl omits `receiver` and reports the runtime only
    // as present ("attached") or absent.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RemoteHandle")
            .field("remote_task_id", &self.remote_task_id)
            .field("local_task_id", &self.local_task_id)
            .field("origin_node", &self.origin_node)
            .field("node", &self.node)
            .field("computation", &self.computation)
            .field("owner_region", &self.owner_region)
            .field("runtime", &self.runtime.as_ref().map(|_| "attached"))
            .field("sender_clock", &self.sender_clock)
            .field("lease", &self.lease)
            .field("state", &self.state)
            .field("completed", &self.completed)
            .finish_non_exhaustive()
    }
}
impl RemoteHandle {
    /// Maps a terminal result to the `RemoteTaskState` it implies.
    #[inline]
    fn terminal_state_for_result(result: &Result<RemoteOutcome, RemoteError>) -> RemoteTaskState {
        match result {
            Ok(RemoteOutcome::Success(_)) => RemoteTaskState::Completed,
            Ok(RemoteOutcome::Cancelled(_)) | Err(RemoteError::Cancelled(_)) => {
                RemoteTaskState::Cancelled
            }
            Err(RemoteError::LeaseExpired) => RemoteTaskState::LeaseExpired,
            Ok(RemoteOutcome::Failed(_) | RemoteOutcome::Panicked(_)) | Err(_) => {
                RemoteTaskState::Failed
            }
        }
    }

    /// Cancel reason used when the result channel closed under us.
    #[inline]
    fn closed_reason() -> CancelReason {
        CancelReason::user("remote handle channel closed")
    }

    /// Error for the anomalous case: the channel closed even though the task
    /// had already reached a terminal state.
    #[inline]
    fn closed_transport_error(state: RemoteTaskState) -> RemoteError {
        RemoteError::TransportError(format!(
            "remote result channel closed after task reached terminal state {state}"
        ))
    }

    /// Asks the runtime (if any) to drop bookkeeping for this task.
    #[inline]
    fn clear_runtime_state(&self) {
        if let Some(runtime) = &self.runtime {
            runtime.clear_task_state(self.remote_task_id);
        }
    }

    /// Sends a best-effort `CancelRequest` to the executing node; send errors
    /// are deliberately ignored.
    #[inline]
    fn abort_with(&self, origin_node: NodeId, sender_time: LogicalTime, reason: CancelReason) {
        let Some(runtime) = &self.runtime else {
            return;
        };
        let envelope = MessageEnvelope::new(
            origin_node.clone(),
            sender_time,
            RemoteMessage::CancelRequest(CancelRequest {
                remote_task_id: self.remote_task_id,
                reason,
                origin_node,
            }),
        );
        let _ = runtime.send_message(&self.node, envelope);
    }

    /// Cancels from this handle's origin at the current logical time.
    #[inline]
    fn request_cancel(&self, reason: CancelReason) {
        self.abort_with(self.origin_node.clone(), self.sender_clock.tick(), reason);
    }

    /// Live state from the runtime when available, else the cached state.
    #[inline]
    fn observed_state(&self) -> RemoteTaskState {
        self.runtime
            .as_ref()
            .and_then(|runtime| runtime.observe_task_state(self.remote_task_id))
            .unwrap_or(self.state)
    }

    /// True when a terminal result was already consumed or is waiting in the
    /// channel.
    #[inline]
    fn has_buffered_terminal_result(&self) -> bool {
        self.completed || self.receiver.is_ready()
    }

    /// Cancellation is only worth sending while the task still looks live and
    /// no result is buffered.
    #[inline]
    fn should_request_cancel(&self) -> bool {
        if self.has_buffered_terminal_result() {
            return false;
        }
        matches!(
            self.observed_state(),
            RemoteTaskState::Pending | RemoteTaskState::Running
        )
    }

    /// Records a received terminal result: marks the handle completed, caches
    /// the implied state, and releases runtime bookkeeping.
    #[inline]
    fn finish_result(
        &mut self,
        result: Result<RemoteOutcome, RemoteError>,
    ) -> Result<RemoteOutcome, RemoteError> {
        self.completed = true;
        self.state = Self::terminal_state_for_result(&result);
        self.clear_runtime_state();
        result
    }

    /// Handles the channel closing without a value: infers the most plausible
    /// terminal state and synthesizes a matching error.
    #[inline]
    fn finish_closed(&mut self) -> RemoteError {
        self.completed = true;
        let observed_state = self.observed_state();
        self.state = match observed_state {
            // A task that still looked live when its channel died is treated
            // as cancelled.
            RemoteTaskState::Pending | RemoteTaskState::Running => RemoteTaskState::Cancelled,
            terminal => terminal,
        };
        self.clear_runtime_state();
        match observed_state {
            RemoteTaskState::LeaseExpired => RemoteError::LeaseExpired,
            // Terminal state reached but the result was lost: report as a
            // transport failure rather than a cancellation.
            RemoteTaskState::Completed | RemoteTaskState::Failed => {
                Self::closed_transport_error(observed_state)
            }
            _ => RemoteError::Cancelled(Self::closed_reason()),
        }
    }

    /// Id assigned to the remote task at spawn time.
    #[must_use]
    pub fn remote_task_id(&self) -> RemoteTaskId {
        self.remote_task_id
    }

    /// Local task mirroring this remote task, when one exists.
    #[must_use]
    pub fn local_task_id(&self) -> Option<TaskId> {
        self.local_task_id
    }

    /// Node executing the computation.
    #[must_use]
    pub fn node(&self) -> &NodeId {
        &self.node
    }

    /// Name of the computation being run.
    #[must_use]
    pub fn computation(&self) -> &ComputationName {
        &self.computation
    }

    /// Region that owns this handle.
    #[must_use]
    pub fn owner_region(&self) -> RegionId {
        self.owner_region
    }

    /// Lease duration granted to the remote task.
    #[must_use]
    pub fn lease(&self) -> Duration {
        self.lease
    }

    /// Current state: the cached terminal state once completed; otherwise the
    /// runtime's live view, falling back to the cache.
    #[must_use]
    pub fn state(&self) -> RemoteTaskState {
        if self.completed {
            self.state
        } else if let Some(runtime) = &self.runtime {
            runtime
                .observe_task_state(self.remote_task_id)
                .unwrap_or(self.state)
        } else {
            self.state
        }
    }

    /// True once a result has been consumed or is ready to be consumed.
    #[must_use]
    pub fn is_finished(&self) -> bool {
        self.completed || self.receiver.is_ready()
    }

    /// Cancels the task (if it still looks live) and then waits —
    /// uninterruptibly — for its terminal result.
    ///
    /// # Errors
    /// `PolledAfterCompletion` if a result was already consumed; otherwise the
    /// task's own error, or a synthesized one if the channel closed.
    pub async fn close(&mut self, cx: &Cx) -> Result<RemoteOutcome, RemoteError> {
        if self.completed {
            return Err(RemoteError::PolledAfterCompletion);
        }
        if self.should_request_cancel() {
            let reason = cx
                .cancel_reason()
                .unwrap_or_else(|| CancelReason::user("remote handle close"));
            self.request_cancel(reason);
        }
        match self.receiver.recv_uninterruptible().await {
            Ok(result) => self.finish_result(result),
            Err(oneshot::RecvError::Closed) => Err(self.finish_closed()),
            Err(oneshot::RecvError::Cancelled) => {
                unreachable!("RecvUninterruptibleFuture cannot return Cancelled")
            }
            Err(oneshot::RecvError::PolledAfterCompletion) => {
                unreachable!("RemoteHandle::close awaits a fresh uninterruptible recv future")
            }
        }
    }

    /// Waits for the terminal result; unlike `close`, the wait itself is
    /// cancellable. A cancelled wait does NOT mark the handle completed, so a
    /// later `join`/`close` may still obtain the result.
    ///
    /// # Errors
    /// `PolledAfterCompletion` if a result was already consumed; `Cancelled`
    /// when the wait itself was interrupted.
    pub async fn join(&mut self, cx: &Cx) -> Result<RemoteOutcome, RemoteError> {
        if self.completed {
            return Err(RemoteError::PolledAfterCompletion);
        }
        match self.receiver.recv(cx).await {
            Ok(result) => self.finish_result(result),
            Err(oneshot::RecvError::Closed) => Err(self.finish_closed()),
            Err(oneshot::RecvError::Cancelled) => {
                let reason = cx
                    .cancel_reason()
                    .unwrap_or_else(CancelReason::parent_cancelled);
                Err(RemoteError::Cancelled(reason))
            }
            Err(oneshot::RecvError::PolledAfterCompletion) => {
                unreachable!("RemoteHandle::join awaits a fresh oneshot recv future")
            }
        }
    }

    /// Non-blocking poll: `Ok(None)` while the task is still running.
    ///
    /// # Errors
    /// `PolledAfterCompletion` if a result was already consumed; the task's
    /// error if it failed; a synthesized error if the channel closed.
    pub fn try_join(&mut self) -> Result<Option<RemoteOutcome>, RemoteError> {
        if self.completed {
            return Err(RemoteError::PolledAfterCompletion);
        }
        match self.receiver.try_recv() {
            Ok(result) => Ok(Some(self.finish_result(result)?)),
            Err(oneshot::TryRecvError::Empty) => Ok(None),
            Err(oneshot::TryRecvError::Closed) => Err(self.finish_closed()),
        }
    }

    /// Best-effort cancel without consuming the handle. No-op unless `cx`
    /// still carries a remote capability with a runtime and the task looks
    /// live.
    pub fn abort(&self, cx: &Cx) {
        let Some(cap) = cx.remote() else {
            return;
        };
        if cap.runtime().is_none() {
            return;
        }
        if !self.should_request_cancel() {
            return;
        }
        let reason = cx
            .cancel_reason()
            .unwrap_or_else(|| CancelReason::user("remote handle abort"));
        self.request_cancel(reason);
    }
}
/// Spawns `computation` on `node` and returns a handle for the result.
///
/// With a runtime attached, the result channel is registered *before* the
/// spawn message is sent so an early reply cannot be lost; a failed send rolls
/// the registration back. Without a runtime (phase 0), the configured
/// simulated failure is buffered into the handle immediately.
///
/// # Errors
/// `NoCapability` when `cx` has no remote capability; otherwise any send
/// error reported by the runtime.
pub fn spawn_remote(
    cx: &Cx,
    node: NodeId,
    computation: ComputationName,
    input: RemoteInput,
) -> Result<RemoteHandle, RemoteError> {
    let cap = cx.remote().ok_or(RemoteError::NoCapability)?;
    let remote_task_id = RemoteTaskId::next();
    let region = cx.region_id();
    let lease = cap.default_lease();
    let origin_node = cap.local_node().clone();
    let sender_clock = cx.logical_clock_handle();
    cx.trace("spawn_remote");
    let (tx, rx) = oneshot::channel::<Result<RemoteOutcome, RemoteError>>();
    let initial_state = if let Some(runtime) = cap.runtime() {
        // Register first: the remote side may answer before we return.
        runtime.register_task(remote_task_id, tx);
        let req = SpawnRequest {
            remote_task_id,
            computation: computation.clone(),
            input,
            lease,
            idempotency_key: IdempotencyKey::generate(cx),
            budget: cap.remote_budget,
            origin_node: origin_node.clone(),
            origin_region: region,
            origin_task: cx.task_id(),
        };
        let sender_time = cx.logical_tick();
        let envelope = MessageEnvelope::new(
            req.origin_node.clone(),
            sender_time,
            RemoteMessage::SpawnRequest(req),
        );
        if let Err(err) = runtime.send_message(&node, envelope) {
            // Roll back so the runtime holds no dangling sender for this task.
            runtime.unregister_task(remote_task_id);
            return Err(err);
        }
        RemoteTaskState::Pending
    } else {
        // Phase 0: no transport — deliver the configured failure immediately
        // through the handle's own channel.
        let fallback_error = cap.phase0_simulation().failure.to_remote_error(&node);
        tx.send(cx, Err(fallback_error.clone()))
            .expect("fresh remote receiver must accept fallback result");
        RemoteHandle::terminal_state_for_result(&Err(fallback_error))
    };
    Ok(RemoteHandle {
        remote_task_id,
        local_task_id: None,
        origin_node,
        node,
        computation,
        owner_region: region,
        runtime: cap.runtime().cloned(),
        receiver: rx,
        sender_clock,
        lease,
        state: initial_state,
        completed: false,
    })
}
/// Errors from operating on a [`Lease`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LeaseError {
    Expired,
    Released,
    CreationFailed(String),
}

impl fmt::Display for LeaseError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::CreationFailed(msg) => write!(f, "lease creation failed: {msg}"),
            Self::Expired => f.write_str("lease expired"),
            Self::Released => f.write_str("lease already released"),
        }
    }
}

impl std::error::Error for LeaseError {}
/// A time-bounded claim held by a task on behalf of a region.
/// Expiry is evaluated lazily against a caller-supplied `now`.
#[derive(Debug)]
pub struct Lease {
    // Identifier of the associated obligation.
    obligation_id: ObligationId,
    // Region the lease belongs to.
    region: RegionId,
    // Task currently holding the lease.
    holder: TaskId,
    // Absolute expiry deadline.
    expires_at: Time,
    // Duration granted at creation; renewals do not change this.
    initial_duration: Duration,
    // Active / Released / Expired.
    state: LeaseState,
    // Number of successful `renew()` calls so far.
    renewal_count: u32,
}
/// Recorded lifecycle of a [`Lease`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaseState {
    Active,
    Released,
    Expired,
}

impl fmt::Display for LeaseState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Active => "Active",
            Self::Released => "Released",
            Self::Expired => "Expired",
        })
    }
}
impl Lease {
    /// Creates an `Active` lease expiring `duration` after `now`.
    #[must_use]
    pub fn new(
        obligation_id: ObligationId,
        region: RegionId,
        holder: TaskId,
        duration: Duration,
        now: Time,
    ) -> Self {
        let expires_at = now + duration;
        Self {
            obligation_id,
            region,
            holder,
            expires_at,
            initial_duration: duration,
            state: LeaseState::Active,
            renewal_count: 0,
        }
    }

    /// Identifier of the obligation tied to this lease.
    #[must_use]
    pub fn obligation_id(&self) -> ObligationId {
        self.obligation_id
    }

    /// Region the lease belongs to.
    #[must_use]
    pub fn region(&self) -> RegionId {
        self.region
    }

    /// Task holding the lease.
    #[must_use]
    pub fn holder(&self) -> TaskId {
        self.holder
    }

    /// Absolute expiry deadline.
    #[must_use]
    pub fn expires_at(&self) -> Time {
        self.expires_at
    }

    /// Duration granted at creation (renewals do not change this).
    #[must_use]
    pub fn initial_duration(&self) -> Duration {
        self.initial_duration
    }

    /// Recorded state. Expiry is detected lazily, so an `Active` lease may
    /// already be past its deadline — see `is_expired`.
    #[must_use]
    pub fn state(&self) -> LeaseState {
        self.state
    }

    /// Number of successful renewals so far.
    #[must_use]
    pub fn renewal_count(&self) -> u32 {
        self.renewal_count
    }

    /// Active and not yet past the deadline at `now`.
    #[must_use]
    pub fn is_active(&self, now: Time) -> bool {
        self.state == LeaseState::Active && now < self.expires_at
    }

    /// Explicitly expired, or active but past the deadline at `now`.
    #[must_use]
    pub fn is_expired(&self, now: Time) -> bool {
        self.state == LeaseState::Expired
            || (self.state == LeaseState::Active && now >= self.expires_at)
    }

    /// True once the lease was released voluntarily.
    #[must_use]
    pub fn is_released(&self) -> bool {
        self.state == LeaseState::Released
    }

    /// Time left before expiry; `ZERO` for non-active or past-due leases.
    #[must_use]
    pub fn remaining(&self, now: Time) -> Duration {
        if self.state != LeaseState::Active || now >= self.expires_at {
            Duration::ZERO
        } else {
            // NOTE(review): assumes `Time::duration_since` yields a nanosecond
            // count — confirm against the `Time` type's definition.
            let nanos = self.expires_at.duration_since(now);
            Duration::from_nanos(nanos)
        }
    }

    /// Extends the lease to `now + duration` (not stacked on the previous
    /// deadline) and bumps the renewal count.
    ///
    /// # Errors
    /// `Released`/`Expired` for a lease no longer active; renewing a lease
    /// already past its deadline marks it `Expired` and fails.
    pub fn renew(&mut self, duration: Duration, now: Time) -> Result<(), LeaseError> {
        match self.state {
            LeaseState::Released => return Err(LeaseError::Released),
            LeaseState::Expired => return Err(LeaseError::Expired),
            LeaseState::Active => {}
        }
        if now >= self.expires_at {
            // Lazy expiry: the deadline passed before this call.
            self.state = LeaseState::Expired;
            return Err(LeaseError::Expired);
        }
        self.expires_at = now + duration;
        self.renewal_count += 1;
        Ok(())
    }

    /// Voluntarily releases a still-active lease.
    ///
    /// # Errors
    /// `Released`/`Expired` for a lease no longer active; releasing past the
    /// deadline marks it `Expired` and fails.
    pub fn release(&mut self, now: Time) -> Result<(), LeaseError> {
        match self.state {
            LeaseState::Released => return Err(LeaseError::Released),
            LeaseState::Expired => return Err(LeaseError::Expired),
            LeaseState::Active => {}
        }
        if now >= self.expires_at {
            self.state = LeaseState::Expired;
            return Err(LeaseError::Expired);
        }
        self.state = LeaseState::Released;
        Ok(())
    }

    /// Forces the lease into `Expired`. Idempotent for already-expired leases.
    ///
    /// # Errors
    /// `Released` if the lease was already released.
    pub fn mark_expired(&mut self) -> Result<(), LeaseError> {
        match self.state {
            LeaseState::Released => return Err(LeaseError::Released),
            LeaseState::Expired => return Ok(()),
            LeaseState::Active => {}
        }
        self.state = LeaseState::Expired;
        Ok(())
    }
}
/// A stored deduplication entry for one idempotency key.
#[derive(Clone, Debug)]
pub struct IdempotencyRecord {
    pub key: IdempotencyKey,
    pub remote_task_id: RemoteTaskId,
    // Fingerprint of the request originally recorded under this key.
    pub request: IdempotencyRequestFingerprint,
    pub created_at: Time,
    // Record is ignored/evicted once `now` reaches this time.
    pub expires_at: Time,
    // Filled in by `IdempotencyStore::complete` once the task finishes.
    pub outcome: Option<RemoteOutcome>,
}

/// Result of checking an incoming request against the store.
#[derive(Clone, Debug)]
pub enum DedupDecision {
    /// No live record for this key — proceed with a fresh spawn.
    New,
    /// Same key and same request fingerprint seen before; reuse the record.
    Duplicate(IdempotencyRecord),
    /// Same key but a different request fingerprint — reject.
    Conflict,
}
/// The (computation, input) pair used to detect conflicting reuse of a key.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct IdempotencyRequestFingerprint {
    pub computation: ComputationName,
    pub input: RemoteInput,
}

impl IdempotencyRequestFingerprint {
    /// Builds a fingerprint from its parts.
    #[must_use]
    pub fn new(computation: ComputationName, input: RemoteInput) -> Self {
        Self { computation, input }
    }

    /// Extracts the fingerprint of an incoming spawn request.
    #[must_use]
    pub fn from_spawn_request(request: &SpawnRequest) -> Self {
        Self::new(request.computation.clone(), request.input.clone())
    }
}
/// Deduplication table for remote spawn requests, keyed by idempotency key.
pub struct IdempotencyStore {
    // Live records; expired entries are dropped lazily on check/evict.
    entries: DetHashMap<IdempotencyKey, IdempotencyRecord>,
    // Lifetime granted to every new record.
    default_ttl: Duration,
}

impl IdempotencyStore {
    /// Creates an empty store whose records live for `default_ttl`.
    #[must_use]
    pub fn new(default_ttl: Duration) -> Self {
        Self {
            entries: DetHashMap::default(),
            default_ttl,
        }
    }

    /// Classifies an incoming request: `New` (no live record — an expired one
    /// is removed on the way), `Duplicate` (same key, same fingerprint), or
    /// `Conflict` (same key, different fingerprint).
    #[must_use]
    pub fn check(
        &mut self,
        key: &IdempotencyKey,
        request: &IdempotencyRequestFingerprint,
        now: Time,
    ) -> DedupDecision {
        let Some(record) = self.entries.get(key).cloned() else {
            return DedupDecision::New;
        };
        if now >= record.expires_at {
            // Stale record: drop it and treat the request as new.
            let _ = self.entries.remove(key);
            return DedupDecision::New;
        }
        if record.request == *request {
            DedupDecision::Duplicate(record)
        } else {
            DedupDecision::Conflict
        }
    }

    /// Inserts a record for `key`; returns `false` (and changes nothing) if a
    /// record for that key already exists.
    pub fn record(
        &mut self,
        key: IdempotencyKey,
        remote_task_id: RemoteTaskId,
        request: IdempotencyRequestFingerprint,
        now: Time,
    ) -> bool {
        use std::collections::hash_map::Entry;
        match self.entries.entry(key) {
            Entry::Vacant(e) => {
                let expires_at = now + self.default_ttl;
                e.insert(IdempotencyRecord {
                    key,
                    remote_task_id,
                    request,
                    created_at: now,
                    expires_at,
                    outcome: None,
                });
                true
            }
            Entry::Occupied(_) => false,
        }
    }

    /// Attaches the terminal outcome to an existing record; `false` when the
    /// key is unknown.
    pub fn complete(&mut self, key: &IdempotencyKey, outcome: RemoteOutcome) -> bool {
        match self.entries.get_mut(key) {
            Some(record) => {
                record.outcome = Some(outcome);
                true
            }
            None => false,
        }
    }

    /// Drops every record past its deadline; returns how many were removed.
    pub fn evict_expired(&mut self, now: Time) -> usize {
        let before = self.entries.len();
        self.entries.retain(|_, record| now < record.expires_at);
        before - self.entries.len()
    }

    /// Number of stored records (including ones not yet lazily evicted).
    #[must_use]
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// True when the store holds no records.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }
}

impl fmt::Debug for IdempotencyStore {
    // Records may be numerous; report only the count and the TTL.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("IdempotencyStore")
            .field("entries", &self.entries.len())
            .field("default_ttl", &self.default_ttl)
            .finish()
    }
}
/// Zero-based index of a saga step.
pub type StepIndex = usize;

/// A recorded rollback action for one completed saga step.
struct CompensationEntry {
    step: StepIndex,
    description: String,
    compensate: Box<dyn FnOnce() -> String + Send>,
}

impl fmt::Debug for CompensationEntry {
    // The compensation closure cannot be printed; report the rest.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("CompensationEntry")
            .field("step", &self.step)
            .field("description", &self.description)
            .finish_non_exhaustive()
    }
}

/// Lifecycle of a `Saga`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SagaState {
    Running,
    Completed,
    Compensating,
    Aborted,
}

impl fmt::Display for SagaState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            Self::Running => "Running",
            Self::Completed => "Completed",
            Self::Compensating => "Compensating",
            Self::Aborted => "Aborted",
        })
    }
}

/// Describes which saga step failed and why.
#[derive(Debug, Clone)]
pub struct SagaStepError {
    pub step: StepIndex,
    pub description: String,
    pub message: String,
}

impl fmt::Display for SagaStepError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "saga step {} ({}) failed: {}",
            self.step, self.description, self.message
        )
    }
}

impl std::error::Error for SagaStepError {}
/// Outcome reported by one executed compensation.
#[derive(Debug, Clone)]
pub struct CompensationResult {
    pub step: StepIndex,
    pub description: String,
    // String returned by the compensation closure.
    pub result: String,
}

/// Forward-steps-with-rollback helper: each successful step registers a
/// compensation that runs (newest first) if a later step fails or the saga is
/// aborted / dropped while still running.
pub struct Saga {
    state: SagaState,
    // Rollback actions for completed steps, oldest first.
    compensations: Vec<CompensationEntry>,
    completed_steps: StepIndex,
    // Outcomes of compensations that have already run.
    compensation_results: Vec<CompensationResult>,
}

impl fmt::Debug for Saga {
    // Compensation closures cannot be printed; report only their count.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Saga")
            .field("state", &self.state)
            .field("completed_steps", &self.completed_steps)
            .field("compensations", &self.compensations.len())
            .field("compensation_results", &self.compensation_results)
            .finish()
    }
}
impl Saga {
#[must_use]
pub fn new() -> Self {
Self {
state: SagaState::Running,
compensations: Vec::new(),
completed_steps: 0,
compensation_results: Vec::new(),
}
}
#[must_use]
pub fn state(&self) -> SagaState {
self.state
}
#[must_use]
pub fn completed_steps(&self) -> StepIndex {
self.completed_steps
}
#[must_use]
pub fn compensation_results(&self) -> &[CompensationResult] {
&self.compensation_results
}
pub fn step<T>(
&mut self,
description: &str,
action: impl FnOnce() -> Result<T, String>,
compensate: impl FnOnce() -> String + Send + 'static,
) -> Result<T, SagaStepError> {
assert_eq!(
self.state,
SagaState::Running,
"cannot add steps to a saga that is not Running"
);
let step_idx = self.completed_steps;
match action() {
Ok(value) => {
self.compensations.push(CompensationEntry {
step: step_idx,
description: description.to_string(),
compensate: Box::new(compensate),
});
self.completed_steps += 1;
Ok(value)
}
Err(msg) => {
let err = SagaStepError {
step: step_idx,
description: description.to_string(),
message: msg,
};
self.run_compensations();
Err(err)
}
}
}
pub fn complete(&mut self) {
assert_eq!(
self.state,
SagaState::Running,
"can only complete a Running saga"
);
self.state = SagaState::Completed;
self.compensations.clear();
}
pub fn abort(&mut self) {
assert_eq!(
self.state,
SagaState::Running,
"can only abort a Running saga"
);
self.run_compensations();
}
fn run_compensations(&mut self) {
self.state = SagaState::Compensating;
let compensations: Vec<_> = self.compensations.drain(..).collect();
for entry in compensations.into_iter().rev() {
let result_desc = (entry.compensate)();
self.compensation_results.push(CompensationResult {
step: entry.step,
description: entry.description,
result: result_desc,
});
}
self.state = SagaState::Aborted;
}
}
impl Default for Saga {
    /// Equivalent to [`Saga::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl Drop for Saga {
    /// A saga dropped while still `Running` is rolled back automatically.
    /// During panic-unwind the compensations are skipped — running arbitrary
    /// user closures there could double-panic — and the saga is simply marked
    /// `Aborted`.
    fn drop(&mut self) {
        if self.state == SagaState::Running {
            if std::thread::panicking() {
                self.state = SagaState::Aborted;
                return;
            }
            self.run_compensations();
        }
    }
}
/// 128-bit random key used to deduplicate spawn requests.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct IdempotencyKey(u128);

impl IdempotencyKey {
    /// Draws a fresh key from the context's RNG: the first draw fills the
    /// upper 64 bits, the second the lower.
    #[must_use]
    pub fn generate(cx: &Cx) -> Self {
        let upper = u128::from(cx.random_u64()) << 64;
        let lower = u128::from(cx.random_u64());
        IdempotencyKey(upper | lower)
    }

    /// Reconstructs a key from its raw value.
    #[must_use]
    pub const fn from_raw(value: u128) -> Self {
        IdempotencyKey(value)
    }

    /// Returns the raw 128-bit value.
    #[must_use]
    pub const fn raw(self) -> u128 {
        self.0
    }
}

impl fmt::Display for IdempotencyKey {
    // Zero-padded 32-digit hex, e.g. "IK-0000...00ff".
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "IK-{:032x}", self.0)
    }
}
/// A payload stamped with its sender and the sender's logical send time.
#[derive(Clone, Debug)]
pub struct MessageEnvelope<T> {
    pub sender: NodeId,
    pub sender_time: LogicalTime,
    pub payload: T,
}

impl<T> MessageEnvelope<T> {
    /// Wraps `payload` with sender identity and logical timestamp.
    #[must_use]
    pub fn new(sender: NodeId, sender_time: LogicalTime, payload: T) -> Self {
        Self {
            sender,
            sender_time,
            payload,
        }
    }
}

/// Synchronous transport abstraction over which `RemoteMessage`s flow.
pub trait RemoteTransport {
    /// Sends an envelope to node `to`.
    fn send(
        &mut self,
        to: &NodeId,
        envelope: MessageEnvelope<RemoteMessage>,
    ) -> Result<(), RemoteError>;
    /// Non-blocking receive; `None` when no message is queued.
    fn try_recv(&mut self) -> Option<MessageEnvelope<RemoteMessage>>;
}
/// Wire messages exchanged between origin and executing nodes.
#[derive(Clone, Debug)]
pub enum RemoteMessage {
    SpawnRequest(SpawnRequest),
    SpawnAck(SpawnAck),
    CancelRequest(CancelRequest),
    ResultDelivery(ResultDelivery),
    LeaseRenewal(LeaseRenewal),
}

impl RemoteMessage {
    /// The remote task this message refers to (every variant carries one).
    #[must_use]
    pub fn remote_task_id(&self) -> RemoteTaskId {
        match self {
            Self::SpawnRequest(m) => m.remote_task_id,
            Self::SpawnAck(m) => m.remote_task_id,
            Self::CancelRequest(m) => m.remote_task_id,
            Self::ResultDelivery(m) => m.remote_task_id,
            Self::LeaseRenewal(m) => m.remote_task_id,
        }
    }
}
/// Request to run a computation on a remote node.
#[derive(Clone, Debug)]
pub struct SpawnRequest {
    pub remote_task_id: RemoteTaskId,
    pub computation: ComputationName,
    pub input: RemoteInput,
    // Lease granted to the remote execution.
    pub lease: Duration,
    // Lets the receiving node deduplicate retried requests.
    pub idempotency_key: IdempotencyKey,
    // Optional budget carried over to the remote side.
    pub budget: Option<Budget>,
    pub origin_node: NodeId,
    pub origin_region: RegionId,
    pub origin_task: TaskId,
}

/// Acknowledgement for a `SpawnRequest`.
#[derive(Clone, Debug)]
pub struct SpawnAck {
    pub remote_task_id: RemoteTaskId,
    pub status: SpawnAckStatus,
    // Node that actually took the task.
    pub assigned_node: NodeId,
}

/// Accepted, or rejected with a reason.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SpawnAckStatus {
    Accepted,
    Rejected(SpawnRejectReason),
}
/// Why an executing node refused a spawn request.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SpawnRejectReason {
    UnknownComputation,
    CapacityExceeded,
    NodeShuttingDown,
    InvalidInput(String),
    IdempotencyConflict,
}

impl fmt::Display for SpawnRejectReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InvalidInput(msg) => write!(f, "invalid input: {msg}"),
            Self::UnknownComputation => f.write_str("unknown computation"),
            Self::CapacityExceeded => f.write_str("capacity exceeded"),
            Self::NodeShuttingDown => f.write_str("node shutting down"),
            Self::IdempotencyConflict => f.write_str("idempotency conflict"),
        }
    }
}
/// Request to cancel a remote task.
#[derive(Clone, Debug)]
pub struct CancelRequest {
    pub remote_task_id: RemoteTaskId,
    pub reason: CancelReason,
    pub origin_node: NodeId,
}

/// Terminal result of a remote task, delivered back to the origin.
#[derive(Clone, Debug)]
pub struct ResultDelivery {
    pub remote_task_id: RemoteTaskId,
    pub outcome: RemoteOutcome,
    // How long the remote execution took.
    pub execution_time: Duration,
}

/// How a remote task ended.
#[derive(Clone, Debug)]
pub enum RemoteOutcome {
    /// Finished normally; carries the serialized result bytes.
    Success(Vec<u8>),
    /// Finished with an application-level error message.
    Failed(String),
    /// Was cancelled before completing.
    Cancelled(CancelReason),
    /// Panicked on the remote side; carries the panic message.
    Panicked(String),
}

impl RemoteOutcome {
    /// Maps the outcome onto the crate-wide severity scale.
    #[must_use]
    pub fn severity(&self) -> crate::types::Severity {
        match self {
            Self::Success(_) => crate::types::Severity::Ok,
            Self::Failed(_) => crate::types::Severity::Err,
            Self::Cancelled(_) => crate::types::Severity::Cancelled,
            Self::Panicked(_) => crate::types::Severity::Panicked,
        }
    }

    /// True only for `Success`.
    #[must_use]
    pub fn is_success(&self) -> bool {
        matches!(self, Self::Success(_))
    }
}

impl fmt::Display for RemoteOutcome {
    // Success payload bytes are intentionally not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Success(_) => write!(f, "Success"),
            Self::Failed(msg) => write!(f, "Failed: {msg}"),
            Self::Cancelled(reason) => write!(f, "Cancelled: {reason}"),
            Self::Panicked(msg) => write!(f, "Panicked: {msg}"),
        }
    }
}

/// Message extending a remote task's lease.
#[derive(Clone, Debug)]
pub struct LeaseRenewal {
    pub remote_task_id: RemoteTaskId,
    pub new_lease: Duration,
    // Task state as reported alongside the renewal.
    pub current_state: RemoteTaskState,
    pub node: NodeId,
}
/// Violations of the origin/remote session protocol.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RemoteProtocolError {
    /// A message referenced a different task than the session tracks.
    RemoteTaskIdMismatch {
        expected: RemoteTaskId,
        got: RemoteTaskId,
    },
    /// A spawn ack carried a status other than the expected one.
    UnexpectedAckStatus {
        expected: &'static str,
        got: SpawnAckStatus,
    },
}

impl fmt::Display for RemoteProtocolError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::RemoteTaskIdMismatch { expected, got } => {
                write!(f, "remote task id mismatch: expected {expected}, got {got}")
            }
            Self::UnexpectedAckStatus { expected, got } => write!(
                f,
                "unexpected spawn ack status: expected {expected}, got {got:?}"
            ),
        }
    }
}

impl std::error::Error for RemoteProtocolError {}
// Typestate markers for the remote-spawn protocol. NOTE(review): meanings are
// inferred from the names; the remote-side states are not exercised in this
// file — confirm against the remote-side session implementation.
/// Origin: session created, spawn not yet sent.
#[derive(Debug)]
pub struct OriginInit;
/// Origin: spawn request sent, awaiting ack.
#[derive(Debug)]
pub struct OriginSpawned;
/// Origin: spawn accepted, task running remotely.
#[derive(Debug)]
pub struct OriginRunning;
/// Origin: cancel request sent.
#[derive(Debug)]
pub struct OriginCancelSent;
/// Origin: lease expired before a result arrived.
#[derive(Debug)]
pub struct OriginLeaseExpired;
/// Origin: terminal result received.
#[derive(Debug)]
pub struct OriginCompleted;
/// Origin: spawn was rejected.
#[derive(Debug)]
pub struct OriginRejected;
/// Remote: initial state.
#[derive(Debug)]
pub struct RemoteInit;
/// Remote: spawn request received.
#[derive(Debug)]
pub struct RemoteSpawnReceived;
/// Remote: cancel pending before the task started.
#[derive(Debug)]
pub struct RemoteCancelPending;
/// Remote: task executing.
#[derive(Debug)]
pub struct RemoteRunning;
/// Remote: cancel received while running.
#[derive(Debug)]
pub struct RemoteCancelReceived;
/// Remote: task finished.
#[derive(Debug)]
pub struct RemoteCompleted;
/// Remote: spawn rejected.
#[derive(Debug)]
pub struct RemoteRejected;
/// Typestate session tracking the origin side of the remote-spawn protocol.
/// Each transition consumes the session and returns it re-tagged with the
/// next state, so illegal message orders fail to compile.
#[must_use = "OriginSession must be advanced to completion or rejected"]
#[derive(Debug)]
pub struct OriginSession<S> {
    remote_task_id: RemoteTaskId,
    // Compile-time-only state tag.
    _state: PhantomData<S>,
}

impl OriginSession<OriginInit> {
    /// Starts a session for the given remote task id.
    pub fn new(remote_task_id: RemoteTaskId) -> Self {
        Self {
            remote_task_id,
            _state: PhantomData,
        }
    }
    /// Records that the spawn request was sent.
    ///
    /// # Errors
    /// `RemoteTaskIdMismatch` if `req` carries a different task id.
    pub fn send_spawn(
        self,
        req: &SpawnRequest,
    ) -> Result<OriginSession<OriginSpawned>, RemoteProtocolError> {
        self.ensure_id(req.remote_task_id)?;
        Ok(self.transition())
    }
}

impl<S> OriginSession<S> {
    /// The task id this session is bound to.
    #[must_use]
    pub fn remote_task_id(&self) -> RemoteTaskId {
        self.remote_task_id
    }
    /// Guards every transition: the message's id must match the session's.
    fn ensure_id(&self, got: RemoteTaskId) -> Result<(), RemoteProtocolError> {
        if self.remote_task_id == got {
            Ok(())
        } else {
            Err(RemoteProtocolError::RemoteTaskIdMismatch {
                expected: self.remote_task_id,
                got,
            })
        }
    }
    /// Re-tags the session with a new state marker; the task id is preserved.
    fn transition<T>(self) -> OriginSession<T> {
        OriginSession {
            remote_task_id: self.remote_task_id,
            _state: PhantomData,
        }
    }
}
pub enum OriginAckOutcome {
Accepted(OriginSession<OriginRunning>),
Rejected(OriginSession<OriginRejected>),
}
pub enum OriginCancelAckOutcome {
Accepted(OriginSession<OriginCancelSent>),
Rejected(OriginSession<OriginRejected>),
}
impl OriginSession<OriginSpawned> {
    /// Processes the spawn ack, branching on whether the remote accepted.
    ///
    /// # Errors
    /// Fails with a task-id mismatch when the ack targets another session.
    pub fn recv_spawn_ack(self, ack: &SpawnAck) -> Result<OriginAckOutcome, RemoteProtocolError> {
        self.ensure_id(ack.remote_task_id)?;
        Ok(match ack.status {
            SpawnAckStatus::Accepted => OriginAckOutcome::Accepted(self.transition()),
            SpawnAckStatus::Rejected(_) => OriginAckOutcome::Rejected(self.transition()),
        })
    }

    /// Cancels the task before any spawn ack has arrived.
    pub fn send_cancel(
        self,
        cancel: &CancelRequest,
    ) -> Result<OriginSession<OriginCancelSent>, RemoteProtocolError> {
        self.ensure_id(cancel.remote_task_id)
            .map(|()| self.transition())
    }
}
impl OriginSession<OriginRunning> {
    /// Accepts a lease renewal; the session stays in the running state.
    pub fn recv_lease_renewal(self, renewal: &LeaseRenewal) -> Result<Self, RemoteProtocolError> {
        self.ensure_id(renewal.remote_task_id).map(|()| self)
    }

    /// Requests cancellation of the running remote task.
    pub fn send_cancel(
        self,
        cancel: &CancelRequest,
    ) -> Result<OriginSession<OriginCancelSent>, RemoteProtocolError> {
        self.ensure_id(cancel.remote_task_id)
            .map(|()| self.transition())
    }

    /// Accepts the delivered result, completing the session.
    pub fn recv_result(
        self,
        result: &ResultDelivery,
    ) -> Result<OriginSession<OriginCompleted>, RemoteProtocolError> {
        self.ensure_id(result.remote_task_id)
            .map(|()| self.transition())
    }

    /// Marks the lease as expired. No message is validated, so this is
    /// infallible.
    pub fn lease_expired(self) -> OriginSession<OriginLeaseExpired> {
        self.transition()
    }
}
impl OriginSession<OriginCancelSent> {
    /// Handles a spawn ack that raced with the cancel request: an accepted ack
    /// keeps the session waiting in the cancel-sent state, a rejection
    /// terminates it.
    pub fn recv_spawn_ack(
        self,
        ack: &SpawnAck,
    ) -> Result<OriginCancelAckOutcome, RemoteProtocolError> {
        self.ensure_id(ack.remote_task_id)?;
        Ok(match ack.status {
            SpawnAckStatus::Accepted => OriginCancelAckOutcome::Accepted(self),
            SpawnAckStatus::Rejected(_) => OriginCancelAckOutcome::Rejected(self.transition()),
        })
    }

    /// Accepts a result that arrives despite the pending cancel.
    pub fn recv_result(
        self,
        result: &ResultDelivery,
    ) -> Result<OriginSession<OriginCompleted>, RemoteProtocolError> {
        self.ensure_id(result.remote_task_id)
            .map(|()| self.transition())
    }

    /// Accepts a late lease renewal without changing state.
    pub fn recv_lease_renewal(self, renewal: &LeaseRenewal) -> Result<Self, RemoteProtocolError> {
        self.ensure_id(renewal.remote_task_id).map(|()| self)
    }
}
impl OriginSession<OriginLeaseExpired> {
    /// Sends a cancel for a task whose lease has already lapsed.
    pub fn send_cancel(
        self,
        cancel: &CancelRequest,
    ) -> Result<OriginSession<OriginCancelSent>, RemoteProtocolError> {
        self.ensure_id(cancel.remote_task_id)
            .map(|()| self.transition())
    }

    /// Accepts a straggler result delivered after the lease expired.
    pub fn recv_result(
        self,
        result: &ResultDelivery,
    ) -> Result<OriginSession<OriginCompleted>, RemoteProtocolError> {
        self.ensure_id(result.remote_task_id)
            .map(|()| self.transition())
    }
}
/// Remote-side session for one remote task, tracked as a typestate machine.
///
/// `S` is one of the `Remote*` marker types; transitions consume the session,
/// mirroring [`OriginSession`] on the executing node's side of the protocol.
#[must_use = "RemoteSession must be advanced to completion or rejected"]
#[derive(Debug)]
pub struct RemoteSession<S> {
    // Task id every incoming/outgoing message is validated against.
    remote_task_id: RemoteTaskId,
    // Zero-sized typestate tag.
    _state: PhantomData<S>,
}
impl RemoteSession<RemoteInit> {
    /// Creates a fresh remote-side session bound to `remote_task_id`.
    pub fn new(remote_task_id: RemoteTaskId) -> Self {
        Self {
            remote_task_id,
            _state: PhantomData,
        }
    }

    /// Leaves the initial state by receiving the spawn request.
    ///
    /// # Errors
    /// Returns a task-id mismatch when `req` targets a different session.
    pub fn recv_spawn(
        self,
        req: &SpawnRequest,
    ) -> Result<RemoteSession<RemoteSpawnReceived>, RemoteProtocolError> {
        self.ensure_id(req.remote_task_id)
            .map(|()| self.transition())
    }
}
impl<S> RemoteSession<S> {
    /// Task id this session is bound to; stable across every state.
    #[must_use]
    pub fn remote_task_id(&self) -> RemoteTaskId {
        self.remote_task_id
    }

    /// Rejects any message whose task id differs from the session's own.
    fn ensure_id(&self, got: RemoteTaskId) -> Result<(), RemoteProtocolError> {
        if self.remote_task_id != got {
            return Err(RemoteProtocolError::RemoteTaskIdMismatch {
                expected: self.remote_task_id,
                got,
            });
        }
        Ok(())
    }

    /// Re-tags the session with a new typestate, keeping the task id.
    fn transition<T>(self) -> RemoteSession<T> {
        RemoteSession {
            remote_task_id: self.remote_task_id,
            _state: PhantomData,
        }
    }
}
impl RemoteSession<RemoteSpawnReceived> {
    /// Sends an accepting ack, moving the session into the running state.
    ///
    /// # Errors
    /// Fails on a task-id mismatch or when `ack` is not `Accepted`.
    pub fn send_ack_accepted(
        self,
        ack: &SpawnAck,
    ) -> Result<RemoteSession<RemoteRunning>, RemoteProtocolError> {
        self.ensure_id(ack.remote_task_id)?;
        if let SpawnAckStatus::Rejected(_) = ack.status {
            return Err(RemoteProtocolError::UnexpectedAckStatus {
                expected: "Accepted",
                got: ack.status.clone(),
            });
        }
        Ok(self.transition())
    }

    /// Sends a rejecting ack, terminating the session.
    ///
    /// # Errors
    /// Fails on a task-id mismatch or when `ack` is not `Rejected`.
    pub fn send_ack_rejected(
        self,
        ack: &SpawnAck,
    ) -> Result<RemoteSession<RemoteRejected>, RemoteProtocolError> {
        self.ensure_id(ack.remote_task_id)?;
        if let SpawnAckStatus::Accepted = ack.status {
            return Err(RemoteProtocolError::UnexpectedAckStatus {
                expected: "Rejected",
                got: ack.status.clone(),
            });
        }
        Ok(self.transition())
    }

    /// Records a cancel that arrived before any ack was sent.
    pub fn recv_cancel(
        self,
        cancel: &CancelRequest,
    ) -> Result<RemoteSession<RemoteCancelPending>, RemoteProtocolError> {
        self.ensure_id(cancel.remote_task_id)
            .map(|()| self.transition())
    }
}
impl RemoteSession<RemoteCancelPending> {
    /// Acks acceptance even though a cancel is already pending; the session
    /// moves straight to the cancel-received state.
    ///
    /// # Errors
    /// Fails on a task-id mismatch or when `ack` is not `Accepted`.
    pub fn send_ack_accepted(
        self,
        ack: &SpawnAck,
    ) -> Result<RemoteSession<RemoteCancelReceived>, RemoteProtocolError> {
        self.ensure_id(ack.remote_task_id)?;
        if let SpawnAckStatus::Rejected(_) = ack.status {
            return Err(RemoteProtocolError::UnexpectedAckStatus {
                expected: "Accepted",
                got: ack.status.clone(),
            });
        }
        Ok(self.transition())
    }

    /// Acks rejection, terminating the session.
    ///
    /// # Errors
    /// Fails on a task-id mismatch or when `ack` is not `Rejected`.
    pub fn send_ack_rejected(
        self,
        ack: &SpawnAck,
    ) -> Result<RemoteSession<RemoteRejected>, RemoteProtocolError> {
        self.ensure_id(ack.remote_task_id)?;
        if let SpawnAckStatus::Accepted = ack.status {
            return Err(RemoteProtocolError::UnexpectedAckStatus {
                expected: "Rejected",
                got: ack.status.clone(),
            });
        }
        Ok(self.transition())
    }
}
impl RemoteSession<RemoteRunning> {
    /// Records an incoming cancel for the running task.
    pub fn recv_cancel(
        self,
        cancel: &CancelRequest,
    ) -> Result<RemoteSession<RemoteCancelReceived>, RemoteProtocolError> {
        self.ensure_id(cancel.remote_task_id)
            .map(|()| self.transition())
    }

    /// Sends a lease renewal; the session stays running.
    pub fn send_lease_renewal(self, renewal: &LeaseRenewal) -> Result<Self, RemoteProtocolError> {
        self.ensure_id(renewal.remote_task_id).map(|()| self)
    }

    /// Delivers the result, completing the session.
    pub fn send_result(
        self,
        result: &ResultDelivery,
    ) -> Result<RemoteSession<RemoteCompleted>, RemoteProtocolError> {
        self.ensure_id(result.remote_task_id)
            .map(|()| self.transition())
    }
}
impl RemoteSession<RemoteCancelReceived> {
    /// Sends a lease renewal while winding down; the state is unchanged.
    pub fn send_lease_renewal(self, renewal: &LeaseRenewal) -> Result<Self, RemoteProtocolError> {
        self.ensure_id(renewal.remote_task_id).map(|()| self)
    }

    /// Delivers the (possibly cancelled) result, completing the session.
    pub fn send_result(
        self,
        result: &ResultDelivery,
    ) -> Result<RemoteSession<RemoteCompleted>, RemoteProtocolError> {
        self.ensure_id(result.remote_task_id)
            .map(|()| self.transition())
    }
}
/// Canonical event-name constants for the remote spawn protocol's trace
/// stream. Each string mirrors the protocol message or state change it tags.
pub mod trace_events {
    /// Origin created a spawn request.
    pub const SPAWN_REQUEST_CREATED: &str = "remote::spawn_request_created";
    /// Origin sent the spawn request to the remote node.
    pub const SPAWN_REQUEST_SENT: &str = "remote::spawn_request_sent";
    /// Origin received the spawn ack.
    pub const SPAWN_ACK_RECEIVED: &str = "remote::spawn_ack_received";
    /// The spawn was rejected by the remote node.
    pub const SPAWN_REJECTED: &str = "remote::spawn_rejected";
    /// Origin sent a cancel request.
    pub const CANCEL_SENT: &str = "remote::cancel_sent";
    /// Remote node received a cancel request.
    pub const CANCEL_RECEIVED: &str = "remote::cancel_received";
    /// A result was delivered back to the origin.
    pub const RESULT_DELIVERED: &str = "remote::result_delivered";
    /// Remote node sent a lease renewal.
    pub const LEASE_RENEWAL_SENT: &str = "remote::lease_renewal_sent";
    /// Origin received a lease renewal.
    pub const LEASE_RENEWAL_RECEIVED: &str = "remote::lease_renewal_received";
    /// The task lease expired.
    pub const LEASE_EXPIRED: &str = "remote::lease_expired";
}
#[cfg(test)]
mod tests {
use super::*;
use parking_lot::Mutex;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
// Builds a no-op waker for polling futures manually in tests.
fn noop_waker() -> Waker {
    std::task::Waker::noop().clone()
}
// Extracts the raw Lamport counter, panicking on any other logical-time kind.
fn lamport_raw(time: &LogicalTime) -> u64 {
    match time {
        LogicalTime::Lamport(time) => time.raw(),
        other => panic!("expected Lamport logical time, got {other:?}"),
    }
}
// Convenience fingerprint built from a name plus an empty input payload.
fn test_request_fingerprint(name: &str) -> IdempotencyRequestFingerprint {
    IdempotencyRequestFingerprint::new(ComputationName::new(name), RemoteInput::empty())
}
// `NodeId` accessor, Display formatting, and equality semantics.
#[test]
fn node_id_basics() {
    let node = NodeId::new("worker-1");
    assert_eq!(node.as_str(), "worker-1");
    assert_eq!(format!("{node}"), "Node(worker-1)");
    let node2 = NodeId::new("worker-1");
    assert_eq!(node, node2);
    let node3 = NodeId::new("worker-2");
    assert_ne!(node, node3);
}
// `ComputationName` accessor, Display, and equality.
#[test]
fn computation_name_basics() {
    let name = ComputationName::new("encode_block");
    assert_eq!(name.as_str(), "encode_block");
    assert_eq!(format!("{name}"), "encode_block");
    let name2 = ComputationName::new("encode_block");
    assert_eq!(name, name2);
}
// `RemoteInput` construction, length/emptiness, and ownership transfer.
#[test]
fn remote_input_basics() {
    let input = RemoteInput::new(vec![1, 2, 3]);
    assert_eq!(input.data(), &[1, 2, 3]);
    assert_eq!(input.len(), 3);
    assert!(!input.is_empty());
    let empty = RemoteInput::empty();
    assert!(empty.is_empty());
    assert_eq!(empty.len(), 0);
    let owned = input.into_data();
    assert_eq!(owned, vec![1, 2, 3]);
}
// Defaults of a freshly constructed `RemoteCap`.
#[test]
fn remote_cap_defaults() {
    let cap = RemoteCap::new();
    assert_eq!(cap.default_lease(), Duration::from_secs(30));
    assert!(cap.remote_budget().is_none());
    assert_eq!(cap.local_node().as_str(), "local");
    assert_eq!(cap.phase0_simulation(), &Phase0SimulationConfig::default());
}
// Builder-style `with_*` methods override each default independently.
#[test]
fn remote_cap_builder() {
    let cap = RemoteCap::new()
        .with_default_lease(Duration::from_secs(60))
        .with_remote_budget(Budget::INFINITE)
        .with_local_node(NodeId::new("origin-a"))
        .with_phase0_failure(Phase0RemoteFailure::NodeDown)
        .with_phase0_timeout(Duration::from_secs(2));
    assert_eq!(cap.default_lease(), Duration::from_secs(60));
    assert!(cap.remote_budget().is_some());
    assert_eq!(cap.local_node().as_str(), "origin-a");
    assert_eq!(
        cap.phase0_simulation().failure,
        Phase0RemoteFailure::NodeDown
    );
    assert_eq!(cap.phase0_simulation().timeout, Duration::from_secs(2));
}
// Test runtime that records every outgoing envelope; task registration is a
// no-op, so results are never delivered.
#[derive(Debug, Default)]
struct CaptureRuntime {
    sent: Mutex<Vec<(NodeId, MessageEnvelope<RemoteMessage>)>>,
}
impl RemoteRuntime for CaptureRuntime {
    fn send_message(
        &self,
        destination: &NodeId,
        envelope: MessageEnvelope<RemoteMessage>,
    ) -> Result<(), RemoteError> {
        self.sent.lock().push((destination.clone(), envelope));
        Ok(())
    }
    fn register_task(
        &self,
        _task_id: RemoteTaskId,
        _tx: oneshot::Sender<Result<RemoteOutcome, RemoteError>>,
    ) {
    }
}
// Test runtime whose sends always fail, recording which tasks were
// registered and subsequently unregistered during spawn cleanup.
#[derive(Debug, Default)]
struct FailingSendRuntime {
    registered: Mutex<Vec<RemoteTaskId>>,
    unregistered: Mutex<Vec<RemoteTaskId>>,
}
impl RemoteRuntime for FailingSendRuntime {
    fn send_message(
        &self,
        _destination: &NodeId,
        _envelope: MessageEnvelope<RemoteMessage>,
    ) -> Result<(), RemoteError> {
        Err(RemoteError::TransportError("simulated send failure".into()))
    }
    fn register_task(
        &self,
        task_id: RemoteTaskId,
        _tx: oneshot::Sender<Result<RemoteOutcome, RemoteError>>,
    ) {
        self.registered.lock().push(task_id);
    }
    fn unregister_task(&self, task_id: RemoteTaskId) {
        self.unregistered.lock().push(task_id);
    }
}
// Test runtime that models the full remote-task lifecycle: captured
// envelopes, buffered result senders, and an observable per-task state table.
#[derive(Debug, Default)]
struct LifecycleRuntime {
    sent: Mutex<Vec<(NodeId, MessageEnvelope<RemoteMessage>)>>,
    pending: Mutex<BTreeMap<RemoteTaskId, oneshot::Sender<Result<RemoteOutcome, RemoteError>>>>,
    states: Mutex<BTreeMap<RemoteTaskId, RemoteTaskState>>,
}
impl LifecycleRuntime {
    // Overwrites the observable state for a task.
    fn mark_state(&self, task_id: RemoteTaskId, state: RemoteTaskState) {
        self.states.lock().insert(task_id, state);
    }
    // Drops the result sender while leaving the state table untouched,
    // simulating a closed channel with stale bookkeeping.
    fn close_sender_preserving_state(&self, task_id: RemoteTaskId) {
        self.pending.lock().remove(&task_id);
    }
    // Delivers a terminal result: derives the matching terminal state, pushes
    // the result through the pending oneshot, and removes the state again if
    // the receiver side is already gone.
    fn deliver(
        &self,
        cx: &Cx,
        task_id: RemoteTaskId,
        result: Result<RemoteOutcome, RemoteError>,
    ) {
        let state = match &result {
            Ok(RemoteOutcome::Success(_)) => RemoteTaskState::Completed,
            Ok(RemoteOutcome::Cancelled(_)) | Err(RemoteError::Cancelled(_)) => {
                RemoteTaskState::Cancelled
            }
            Err(RemoteError::LeaseExpired) => RemoteTaskState::LeaseExpired,
            Ok(RemoteOutcome::Failed(_) | RemoteOutcome::Panicked(_)) | Err(_) => {
                RemoteTaskState::Failed
            }
        };
        self.states.lock().insert(task_id, state);
        let tx = self
            .pending
            .lock()
            .remove(&task_id)
            .expect("pending remote task");
        if tx.send(cx, result).is_err() {
            self.states.lock().remove(&task_id);
        }
    }
    // Snapshot of every envelope sent so far.
    fn sent_messages(&self) -> Vec<(NodeId, MessageEnvelope<RemoteMessage>)> {
        self.sent.lock().clone()
    }
}
impl RemoteRuntime for LifecycleRuntime {
    fn send_message(
        &self,
        destination: &NodeId,
        envelope: MessageEnvelope<RemoteMessage>,
    ) -> Result<(), RemoteError> {
        self.sent.lock().push((destination.clone(), envelope));
        Ok(())
    }
    fn register_task(
        &self,
        task_id: RemoteTaskId,
        tx: oneshot::Sender<Result<RemoteOutcome, RemoteError>>,
    ) {
        self.pending.lock().insert(task_id, tx);
        self.states.lock().insert(task_id, RemoteTaskState::Pending);
    }
    fn observe_task_state(&self, task_id: RemoteTaskId) -> Option<RemoteTaskState> {
        self.states.lock().get(&task_id).copied()
    }
    fn clear_task_state(&self, task_id: RemoteTaskId) {
        self.pending.lock().remove(&task_id);
        self.states.lock().remove(&task_id);
    }
    fn unregister_task(&self, task_id: RemoteTaskId) {
        self.pending.lock().remove(&task_id);
        self.states.lock().remove(&task_id);
    }
}
// Cap simulating an immediately unreachable node (phase-0 fallback).
fn fast_phase0_cap() -> RemoteCap {
    RemoteCap::new().with_phase0_failure(Phase0RemoteFailure::NodeUnreachable)
}
// Cap simulating a phase-0 timeout failure.
fn timeout_phase0_cap() -> RemoteCap {
    RemoteCap::new().with_phase0_failure(Phase0RemoteFailure::Timeout)
}
// The spawn envelope carries the cap's configured local node as sender/origin.
#[test]
fn spawn_remote_uses_cap_local_node_for_origin() {
    let runtime = Arc::new(CaptureRuntime::default());
    let cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let handle = spawn_remote(
        &cx,
        NodeId::new("worker-1"),
        ComputationName::new("encode_block"),
        RemoteInput::new(vec![1, 2, 3]),
    )
    .expect("spawn_remote should succeed");
    let (destination, envelope) = {
        let sent = runtime.sent.lock();
        assert_eq!(sent.len(), 1);
        sent[0].clone()
    };
    assert_eq!(destination.as_str(), "worker-1");
    assert_eq!(envelope.sender.as_str(), "origin-a");
    // Logical clock must have ticked at least once by send time.
    assert!(lamport_raw(&envelope.sender_time) > 0);
    match &envelope.payload {
        RemoteMessage::SpawnRequest(req) => {
            assert_eq!(req.remote_task_id, handle.remote_task_id());
            assert_eq!(req.origin_node.as_str(), "origin-a");
        }
        other => unreachable!("expected SpawnRequest, got {other:?}"),
    }
}
// Aborting a handle with an attached runtime emits a CancelRequest after the
// spawn, with a later Lamport timestamp.
#[test]
fn remote_handle_abort_with_attached_runtime_sends_cancel_request() {
    let runtime = Arc::new(CaptureRuntime::default());
    let cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let handle = spawn_remote(
        &cx,
        NodeId::new("worker-1"),
        ComputationName::new("encode_block"),
        RemoteInput::new(vec![1, 2, 3]),
    )
    .expect("spawn_remote should succeed");
    handle.abort(&cx);
    let (destination, envelope, spawn_time) = {
        let sent = runtime.sent.lock();
        assert_eq!(sent.len(), 2);
        (
            sent[1].0.clone(),
            sent[1].1.clone(),
            lamport_raw(&sent[0].1.sender_time),
        )
    };
    assert_eq!(destination.as_str(), "worker-1");
    assert_eq!(envelope.sender.as_str(), "origin-a");
    // Cancel must be causally after the spawn.
    assert!(lamport_raw(&envelope.sender_time) > spawn_time);
    match &envelope.payload {
        RemoteMessage::CancelRequest(req) => {
            assert_eq!(req.remote_task_id, handle.remote_task_id());
            assert_eq!(req.origin_node.as_str(), "origin-a");
            assert_eq!(req.reason, CancelReason::user("remote handle abort"));
        }
        other => unreachable!("expected CancelRequest, got {other:?}"),
    }
}
// Abort uses the origin recorded at spawn time, not the aborting caller's cap;
// the cancel reason, however, comes from the aborting context.
#[test]
fn remote_handle_abort_uses_handle_origin_even_with_different_caller_cap() {
    let runtime = Arc::new(CaptureRuntime::default());
    let spawn_cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let spawn_cx: Cx = Cx::for_testing_with_remote(spawn_cap);
    let handle = spawn_remote(
        &spawn_cx,
        NodeId::new("worker-1"),
        ComputationName::new("encode_block"),
        RemoteInput::new(vec![1, 2, 3]),
    )
    .expect("spawn_remote should succeed");
    let abort_cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-b"))
        .with_runtime(runtime.clone());
    let abort_cx: Cx = Cx::for_testing_with_remote(abort_cap);
    let expected_reason = CancelReason::deadline();
    abort_cx.set_cancel_reason(expected_reason.clone());
    handle.abort(&abort_cx);
    let (destination, envelope, spawn_time) = {
        let sent = runtime.sent.lock();
        assert_eq!(sent.len(), 2);
        (
            sent[1].0.clone(),
            sent[1].1.clone(),
            lamport_raw(&sent[0].1.sender_time),
        )
    };
    assert_eq!(destination.as_str(), "worker-1");
    // Sender stays "origin-a" even though the abort cap says "origin-b".
    assert_eq!(envelope.sender.as_str(), "origin-a");
    assert!(lamport_raw(&envelope.sender_time) > spawn_time);
    match &envelope.payload {
        RemoteMessage::CancelRequest(req) => {
            assert_eq!(req.remote_task_id, handle.remote_task_id());
            assert_eq!(req.origin_node.as_str(), "origin-a");
            assert_eq!(req.reason, expected_reason);
        }
        other => unreachable!("expected CancelRequest, got {other:?}"),
    }
}
// A failed send surfaces the transport error and rolls back registration.
#[test]
fn spawn_remote_send_failure_unregisters_pending_task() {
    let runtime = Arc::new(FailingSendRuntime::default());
    let cap = RemoteCap::new().with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let err = spawn_remote(
        &cx,
        NodeId::new("worker-1"),
        ComputationName::new("encode_block"),
        RemoteInput::new(vec![1, 2, 3]),
    )
    .expect_err("spawn_remote should fail when send_message fails");
    match err {
        RemoteError::TransportError(msg) => {
            assert!(msg.contains("simulated send failure"));
        }
        other => unreachable!("expected TransportError, got {other:?}"),
    }
    let registered = runtime.registered.lock().clone();
    let unregistered = runtime.unregistered.lock().clone();
    assert_eq!(registered.len(), 1);
    // Every registered task must have been unregistered on failure.
    assert_eq!(unregistered, registered);
}
// Sequential `next()` calls yield distinct, increasing ids.
#[test]
fn remote_task_id_uniqueness() {
    let id1 = RemoteTaskId::next();
    let id2 = RemoteTaskId::next();
    assert_ne!(id1, id2);
    assert!(id2.raw() > id1.raw());
}
// Display strings for a subset of `RemoteTaskState` variants.
#[test]
fn remote_task_state_display() {
    assert_eq!(format!("{}", RemoteTaskState::Pending), "Pending");
    assert_eq!(format!("{}", RemoteTaskState::Running), "Running");
    assert_eq!(format!("{}", RemoteTaskState::Completed), "Completed");
    assert_eq!(format!("{}", RemoteTaskState::LeaseExpired), "LeaseExpired");
}
// `RemoteError` Display includes the relevant node/computation names.
#[test]
fn remote_error_display() {
    let err = RemoteError::NoCapability;
    assert_eq!(format!("{err}"), "remote capability not available");
    let err = RemoteError::NodeUnreachable("worker-9".into());
    assert!(format!("{err}").contains("worker-9"));
    let err = RemoteError::NodeDown("worker-9".into());
    assert!(format!("{err}").contains("worker-9"));
    let err = RemoteError::UnknownComputation("bad_fn".into());
    assert!(format!("{err}").contains("bad_fn"));
}
// Without a remote cap, spawning fails with `NoCapability`.
#[test]
fn spawn_remote_without_cap_fails() {
    let cx: Cx = Cx::for_testing();
    assert!(!cx.has_remote());
    let result = spawn_remote(
        &cx,
        NodeId::new("worker-1"),
        ComputationName::new("encode"),
        RemoteInput::empty(),
    );
    assert!(result.is_err());
    assert_eq!(result.unwrap_err(), RemoteError::NoCapability);
}
// With a phase-0 cap the spawn succeeds and the simulated failure leaves the
// handle immediately finished in the Failed state.
#[test]
fn spawn_remote_with_cap_succeeds() {
    let cx: Cx = Cx::for_testing_with_remote(fast_phase0_cap());
    assert!(cx.has_remote());
    let result = spawn_remote(
        &cx,
        NodeId::new("worker-1"),
        ComputationName::new("encode_block"),
        RemoteInput::new(vec![42]),
    );
    assert!(result.is_ok());
    let handle = result.unwrap();
    assert_eq!(handle.node().as_str(), "worker-1");
    assert_eq!(handle.computation().as_str(), "encode_block");
    assert_eq!(handle.state(), RemoteTaskState::Failed);
    assert!(handle.is_finished());
    assert_eq!(handle.lease(), Duration::from_secs(30));
    assert!(handle.local_task_id().is_none());
}
// Debug output names the handle type, node, and computation.
#[test]
fn remote_handle_debug() {
    let cx: Cx = Cx::for_testing_with_remote(fast_phase0_cap());
    let handle = spawn_remote(
        &cx,
        NodeId::new("n1"),
        ComputationName::new("compute"),
        RemoteInput::empty(),
    )
    .unwrap();
    let debug = format!("{handle:?}");
    assert!(debug.contains("RemoteHandle"));
    assert!(debug.contains("n1"));
    assert!(debug.contains("compute"));
}
// Phase-0 fallback terminates the handle immediately.
#[test]
fn remote_handle_phase0_fallback_finishes_immediately() {
    let cx: Cx = Cx::for_testing_with_remote(fast_phase0_cap());
    let handle = spawn_remote(
        &cx,
        NodeId::new("n1"),
        ComputationName::new("add"),
        RemoteInput::empty(),
    )
    .unwrap();
    assert!(handle.is_finished());
    assert_eq!(handle.state(), RemoteTaskState::Failed);
}
// try_join surfaces the configured NodeUnreachable error from phase-0.
#[test]
fn remote_handle_try_join_phase0_fallback_returns_configured_error() {
    let cx: Cx = Cx::for_testing_with_remote(fast_phase0_cap());
    let mut handle = spawn_remote(
        &cx,
        NodeId::new("n1"),
        ComputationName::new("work"),
        RemoteInput::empty(),
    )
    .unwrap();
    let err = handle
        .try_join()
        .expect_err("phase-0 fallback should fail explicitly");
    assert!(matches!(err, RemoteError::NodeUnreachable(_)));
    assert_eq!(handle.state(), RemoteTaskState::Failed);
}
// join keeps reporting the terminal Failed state after a NodeDown failure.
#[test]
fn remote_handle_join_updates_terminal_state() {
    let cap = RemoteCap::new().with_phase0_simulation(Phase0SimulationConfig {
        failure: Phase0RemoteFailure::NodeDown,
        retry: Phase0RetryPolicy {
            max_attempts: 2,
            initial_backoff: Duration::from_millis(2),
            max_backoff: Duration::from_millis(2),
        },
        timeout: Duration::from_millis(20),
    });
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let mut handle = spawn_remote(
        &cx,
        NodeId::new("n1"),
        ComputationName::new("join-state"),
        RemoteInput::empty(),
    )
    .expect("spawn");
    assert_eq!(handle.state(), RemoteTaskState::Failed);
    let result = futures_lite::future::block_on(handle.join(&cx));
    assert!(matches!(result, Err(RemoteError::NodeDown(_))));
    assert_eq!(handle.state(), RemoteTaskState::Failed);
    assert!(handle.is_finished());
}
// try_join leaves the handle in the terminal Failed state.
#[test]
fn remote_handle_try_join_updates_terminal_state() {
    let cx: Cx = Cx::for_testing_with_remote(fast_phase0_cap());
    let mut handle = spawn_remote(
        &cx,
        NodeId::new("n1"),
        ComputationName::new("try-join-state"),
        RemoteInput::empty(),
    )
    .expect("spawn");
    let err = handle
        .try_join()
        .expect_err("phase-0 fallback should fail explicitly");
    assert!(matches!(err, RemoteError::NodeUnreachable(_)));
    assert_eq!(handle.state(), RemoteTaskState::Failed);
}
// A simulated phase-0 timeout maps to a Cancelled state with a timeout reason.
#[test]
fn remote_handle_phase0_timeout_maps_to_cancelled_state() {
    let cx: Cx = Cx::for_testing_with_remote(timeout_phase0_cap());
    let mut handle = spawn_remote(
        &cx,
        NodeId::new("n1"),
        ComputationName::new("timeout-state"),
        RemoteInput::empty(),
    )
    .expect("spawn");
    assert_eq!(handle.state(), RemoteTaskState::Cancelled);
    let result = futures_lite::future::block_on(handle.join(&cx));
    assert!(matches!(
        result,
        Err(RemoteError::Cancelled(reason)) if reason.kind == crate::types::CancelKind::Timeout
    ));
    assert_eq!(handle.state(), RemoteTaskState::Cancelled);
}
// The handle mirrors the runtime's state table through the full lifecycle,
// and a consumed terminal result clears the runtime bookkeeping.
#[test]
fn remote_handle_state_observes_runtime_lifecycle() {
    let runtime = Arc::new(LifecycleRuntime::default());
    let cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let mut handle = spawn_remote(
        &cx,
        NodeId::new("worker-1"),
        ComputationName::new("encode_block"),
        RemoteInput::new(vec![1, 2, 3]),
    )
    .expect("spawn_remote should succeed");
    assert_eq!(handle.state(), RemoteTaskState::Pending);
    runtime.mark_state(handle.remote_task_id(), RemoteTaskState::Running);
    assert_eq!(handle.state(), RemoteTaskState::Running);
    runtime.deliver(
        &cx,
        handle.remote_task_id(),
        Ok(RemoteOutcome::Success(vec![9, 9, 9])),
    );
    assert_eq!(handle.state(), RemoteTaskState::Completed);
    let outcome = handle.try_join().expect("result").expect("outcome");
    assert!(matches!(outcome, RemoteOutcome::Success(_)));
    assert!(
        runtime
            .observe_task_state(handle.remote_task_id())
            .is_none()
    );
}
// close() drains an already-buffered terminal result instead of sending a
// late cancel, even if the runtime's state table claims the task is running.
#[test]
fn remote_handle_close_skips_cancel_when_runtime_result_is_already_buffered() {
    let runtime = Arc::new(LifecycleRuntime::default());
    let cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let mut handle = spawn_remote(
        &cx,
        NodeId::new("worker-1"),
        ComputationName::new("encode_block"),
        RemoteInput::new(vec![1, 2, 3]),
    )
    .expect("spawn_remote should succeed");
    runtime.mark_state(handle.remote_task_id(), RemoteTaskState::Running);
    runtime.deliver(
        &cx,
        handle.remote_task_id(),
        Ok(RemoteOutcome::Cancelled(CancelReason::user(
            "closed remotely",
        ))),
    );
    // Stale state after delivery must not trigger a cancel.
    runtime.mark_state(handle.remote_task_id(), RemoteTaskState::Running);
    let outcome = futures_lite::future::block_on(handle.close(&cx)).expect("close");
    assert!(matches!(outcome, RemoteOutcome::Cancelled(_)));
    assert_eq!(handle.state(), RemoteTaskState::Cancelled);
    assert!(
        runtime
            .observe_task_state(handle.remote_task_id())
            .is_none()
    );
    let sent = runtime.sent_messages();
    assert_eq!(
        sent.len(),
        1,
        "ready terminal result should not trigger a late cancel"
    );
}
// close() keeps waiting past the caller's cancellation and only resolves once
// the remote delivers its terminal result; a cancel is still sent upstream.
#[test]
fn remote_handle_close_ignores_caller_cancellation_until_terminal_result_arrives() {
    let runtime = Arc::new(LifecycleRuntime::default());
    let cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    cx.set_cancel_reason(CancelReason::deadline());
    let mut handle = spawn_remote(
        &cx,
        NodeId::new("worker-1"),
        ComputationName::new("encode_block"),
        RemoteInput::new(vec![1, 2, 3]),
    )
    .expect("spawn_remote should succeed");
    let remote_task_id = handle.remote_task_id();
    runtime.mark_state(remote_task_id, RemoteTaskState::Running);
    let waker = noop_waker();
    let mut task_cx = Context::from_waker(&waker);
    let outcome = {
        let mut close = std::pin::pin!(handle.close(&cx));
        // First poll: still waiting on the remote result.
        assert!(matches!(
            std::future::Future::poll(close.as_mut(), &mut task_cx),
            Poll::Pending
        ));
        assert_eq!(
            runtime.observe_task_state(remote_task_id),
            Some(RemoteTaskState::Running)
        );
        runtime.deliver(
            &cx,
            remote_task_id,
            Ok(RemoteOutcome::Cancelled(CancelReason::user(
                "closed remotely",
            ))),
        );
        match std::future::Future::poll(close.as_mut(), &mut task_cx) {
            Poll::Ready(outcome) => outcome,
            Poll::Pending => panic!("close should drain terminal result"),
        }
    };
    assert!(matches!(
        outcome,
        Ok(RemoteOutcome::Cancelled(reason)) if reason == CancelReason::user("closed remotely")
    ));
    assert!(runtime.observe_task_state(remote_task_id).is_none());
    let sent = runtime.sent_messages();
    assert_eq!(sent.len(), 2);
    // The cancel is causally after the spawn.
    assert!(lamport_raw(&sent[1].1.sender_time) > lamport_raw(&sent[0].1.sender_time));
    match &sent[1].1.payload {
        RemoteMessage::CancelRequest(cancel) => {
            assert_eq!(cancel.remote_task_id, remote_task_id);
        }
        other => unreachable!("expected CancelRequest, got {other:?}"),
    }
}
// close() from a context without the remote cap still sends a cancel via the
// runtime attached at spawn time, with the closing context's reason.
#[test]
fn remote_handle_close_with_plain_context_still_requests_cancel_when_runtime_attached() {
    let runtime = Arc::new(LifecycleRuntime::default());
    let spawn_cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let spawn_cx: Cx = Cx::for_testing_with_remote(spawn_cap);
    let close_cx: Cx = Cx::for_testing();
    let expected_reason = CancelReason::deadline();
    close_cx.set_cancel_reason(expected_reason.clone());
    let mut handle = spawn_remote(
        &spawn_cx,
        NodeId::new("worker-1"),
        ComputationName::new("encode_block"),
        RemoteInput::new(vec![1, 2, 3]),
    )
    .expect("spawn_remote should succeed");
    let remote_task_id = handle.remote_task_id();
    runtime.mark_state(remote_task_id, RemoteTaskState::Running);
    let waker = noop_waker();
    let mut task_cx = Context::from_waker(&waker);
    let outcome = {
        let mut close = std::pin::pin!(handle.close(&close_cx));
        assert!(matches!(
            std::future::Future::poll(close.as_mut(), &mut task_cx),
            Poll::Pending
        ));
        let sent = runtime.sent_messages();
        assert_eq!(sent.len(), 2);
        match &sent[1].1.payload {
            RemoteMessage::CancelRequest(cancel) => {
                assert_eq!(cancel.remote_task_id, remote_task_id);
                assert_eq!(cancel.origin_node.as_str(), "origin-a");
                assert_eq!(cancel.reason, expected_reason);
            }
            other => unreachable!("expected CancelRequest, got {other:?}"),
        }
        runtime.deliver(
            &spawn_cx,
            remote_task_id,
            Ok(RemoteOutcome::Cancelled(CancelReason::user(
                "closed remotely",
            ))),
        );
        match std::future::Future::poll(close.as_mut(), &mut task_cx) {
            Poll::Ready(outcome) => outcome,
            Poll::Pending => panic!("close should drain terminal result"),
        }
    };
    assert!(matches!(
        outcome,
        Ok(RemoteOutcome::Cancelled(reason)) if reason == CancelReason::user("closed remotely")
    ));
    assert_eq!(handle.state(), RemoteTaskState::Cancelled);
    assert!(runtime.observe_task_state(remote_task_id).is_none());
}
// A closed result channel with a LeaseExpired runtime state surfaces
// `RemoteError::LeaseExpired` from join.
#[test]
fn remote_handle_join_closed_channel_preserves_runtime_lease_expired_state() {
    let runtime = Arc::new(LifecycleRuntime::default());
    let cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let mut handle = spawn_remote(
        &cx,
        NodeId::new("worker-1"),
        ComputationName::new("encode_block"),
        RemoteInput::new(vec![1, 2, 3]),
    )
    .expect("spawn_remote should succeed");
    let remote_task_id = handle.remote_task_id();
    runtime.mark_state(remote_task_id, RemoteTaskState::LeaseExpired);
    runtime.close_sender_preserving_state(remote_task_id);
    let err = futures_lite::future::block_on(handle.join(&cx))
        .expect_err("closed channel should surface the observed lease-expired state");
    assert_eq!(err, RemoteError::LeaseExpired);
    assert_eq!(handle.state(), RemoteTaskState::LeaseExpired);
    assert!(runtime.observe_task_state(remote_task_id).is_none());
}
// A closed channel with a Failed runtime state maps to a transport error
// naming the observed state.
#[test]
fn remote_handle_join_closed_channel_reports_transport_error_for_failed_state() {
    let runtime = Arc::new(LifecycleRuntime::default());
    let cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let mut handle = spawn_remote(
        &cx,
        NodeId::new("worker-1"),
        ComputationName::new("encode_block"),
        RemoteInput::new(vec![1, 2, 3]),
    )
    .expect("spawn_remote should succeed");
    let remote_task_id = handle.remote_task_id();
    runtime.mark_state(remote_task_id, RemoteTaskState::Failed);
    runtime.close_sender_preserving_state(remote_task_id);
    let err = futures_lite::future::block_on(handle.join(&cx))
        .expect_err("closed terminal failed channel should surface a transport error");
    assert!(matches!(
        err,
        RemoteError::TransportError(msg) if msg.contains("Failed")
    ));
    assert_eq!(handle.state(), RemoteTaskState::Failed);
    assert!(runtime.observe_task_state(remote_task_id).is_none());
}
// Same mapping via try_join for a Completed state whose result was lost.
#[test]
fn remote_handle_try_join_closed_channel_reports_transport_error_for_completed_state() {
    let runtime = Arc::new(LifecycleRuntime::default());
    let cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let mut handle = spawn_remote(
        &cx,
        NodeId::new("worker-1"),
        ComputationName::new("encode_block"),
        RemoteInput::new(vec![1, 2, 3]),
    )
    .expect("spawn_remote should succeed");
    let remote_task_id = handle.remote_task_id();
    runtime.mark_state(remote_task_id, RemoteTaskState::Completed);
    runtime.close_sender_preserving_state(remote_task_id);
    let err = handle
        .try_join()
        .expect_err("closed terminal completed channel should surface a transport error");
    assert!(matches!(
        err,
        RemoteError::TransportError(msg) if msg.contains("Completed")
    ));
    assert_eq!(handle.state(), RemoteTaskState::Completed);
    assert!(runtime.observe_task_state(remote_task_id).is_none());
}
#[test]
fn remote_handle_close_closed_channel_still_requests_cancel_for_live_runtime_task() {
let runtime = Arc::new(LifecycleRuntime::default());
let cap = RemoteCap::new()
.with_local_node(NodeId::new("origin-a"))
.with_runtime(runtime.clone());
let cx: Cx = Cx::for_testing_with_remote(cap);
let mut handle = spawn_remote(
&cx,
NodeId::new("worker-1"),
ComputationName::new("encode_block"),
RemoteInput::new(vec![1, 2, 3]),
)
.expect("spawn_remote should succeed");
let remote_task_id = handle.remote_task_id();
runtime.mark_state(remote_task_id, RemoteTaskState::Running);
runtime.close_sender_preserving_state(remote_task_id);
let err = futures_lite::future::block_on(handle.close(&cx))
.expect_err("closed live channel should still fail the close");
assert_eq!(err, RemoteError::Cancelled(RemoteHandle::closed_reason()));
assert_eq!(handle.state(), RemoteTaskState::Cancelled);
assert!(runtime.observe_task_state(remote_task_id).is_none());
let sent = runtime.sent_messages();
assert_eq!(
sent.len(),
2,
"close should still send a best-effort cancel when the runtime still observes a live remote task"
);
match &sent[1].1.payload {
RemoteMessage::CancelRequest(cancel) => {
assert_eq!(cancel.remote_task_id, remote_task_id);
}
other => unreachable!("expected CancelRequest, got {other:?}"),
}
}
#[test]
fn remote_handle_drop_cancels_live_runtime_task_but_preserves_runtime_state() {
    // Dropping a handle whose remote task is still Running must emit a
    // best-effort CancelRequest while leaving the runtime's bookkeeping
    // intact until the remote side reports a terminal state.
    let runtime = Arc::new(LifecycleRuntime::default());
    let cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let remote_task_id = {
        let handle = spawn_remote(
            &cx,
            NodeId::new("worker-1"),
            ComputationName::new("encode_block"),
            RemoteInput::new(vec![1, 2, 3]),
        )
        .expect("spawn_remote should succeed");
        let remote_task_id = handle.remote_task_id();
        runtime.mark_state(remote_task_id, RemoteTaskState::Running);
        remote_task_id
        // `handle` is dropped here while the task is still live.
    };
    assert_eq!(
        runtime.observe_task_state(remote_task_id),
        Some(RemoteTaskState::Running),
        "dropping a live handle must preserve runtime bookkeeping until the remote lifecycle finishes"
    );
    let sent = runtime.sent_messages();
    assert_eq!(sent.len(), 2, "spawn + best-effort drop cancel");
    // The cancel must be causally ordered after the spawn in Lamport time.
    assert!(lamport_raw(&sent[1].1.sender_time) > lamport_raw(&sent[0].1.sender_time));
    match &sent[1].1.payload {
        RemoteMessage::CancelRequest(cancel) => {
            assert_eq!(cancel.remote_task_id, remote_task_id);
            assert_eq!(cancel.reason, CancelReason::user("remote handle dropped"));
            assert_eq!(cancel.origin_node.as_str(), "origin-a");
        }
        // This branch IS reachable when the test fails, so `panic!` (not
        // `unreachable!`) is the correct macro.
        other => panic!("expected CancelRequest, got {other:?}"),
    }
}
#[test]
fn remote_handle_drop_clears_runtime_state_after_terminal_result_is_observed() {
    // Once a terminal result has been delivered, dropping the handle should
    // clear runtime bookkeeping without emitting an extra cancel message.
    let runtime = Arc::new(LifecycleRuntime::default());
    let cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let rtid = {
        let handle = spawn_remote(
            &cx,
            NodeId::new("worker-1"),
            ComputationName::new("encode_block"),
            RemoteInput::new(vec![1, 2, 3]),
        )
        .expect("spawn_remote should succeed");
        let rtid = handle.remote_task_id();
        runtime.deliver(&cx, rtid, Ok(RemoteOutcome::Success(vec![7, 8, 9])));
        rtid
        // handle dropped here, after the terminal result was buffered
    };
    assert!(
        runtime.observe_task_state(rtid).is_none(),
        "dropping a handle after the runtime observes a terminal result should clear bookkeeping"
    );
    let messages = runtime.sent_messages();
    assert_eq!(messages.len(), 1, "terminal drop should not send an extra cancel");
}
#[test]
fn remote_handle_abort_skips_cancel_when_terminal_result_is_already_buffered() {
    // A buffered terminal result means the task is already done; abort must
    // not send a late CancelRequest even if the observed state looks live.
    let runtime = Arc::new(LifecycleRuntime::default());
    let cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let mut handle = spawn_remote(
        &cx,
        NodeId::new("worker-1"),
        ComputationName::new("encode_block"),
        RemoteInput::new(vec![1, 2, 3]),
    )
    .expect("spawn_remote should succeed");
    let rtid = handle.remote_task_id();
    runtime.deliver(&cx, rtid, Ok(RemoteOutcome::Success(vec![7, 8, 9])));
    // Make the observed state stale relative to the buffered result.
    runtime.mark_state(rtid, RemoteTaskState::Running);
    handle.abort(&cx);
    let messages = runtime.sent_messages();
    assert_eq!(
        messages.len(),
        1,
        "buffered terminal result should suppress late cancel"
    );
    let outcome = handle.try_join().expect("result").expect("outcome");
    assert!(matches!(outcome, RemoteOutcome::Success(_)));
    assert_eq!(handle.state(), RemoteTaskState::Completed);
}
#[test]
fn remote_handle_drop_skips_cancel_when_terminal_result_is_buffered_but_state_is_stale() {
    // Even if the last observed state is a stale `Running`, a buffered
    // terminal result means the task already finished: dropping the handle
    // must clear bookkeeping and must not emit a late cancel.
    let runtime = Arc::new(LifecycleRuntime::default());
    let cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let rtid = {
        let handle = spawn_remote(
            &cx,
            NodeId::new("worker-1"),
            ComputationName::new("encode_block"),
            RemoteInput::new(vec![1, 2, 3]),
        )
        .expect("spawn_remote should succeed");
        let rtid = handle.remote_task_id();
        runtime.deliver(&cx, rtid, Ok(RemoteOutcome::Success(vec![7, 8, 9])));
        runtime.mark_state(rtid, RemoteTaskState::Running);
        rtid
    };
    assert!(
        runtime.observe_task_state(rtid).is_none(),
        "dropping with a buffered terminal result should clear runtime bookkeeping even if observed state was stale"
    );
    let messages = runtime.sent_messages();
    assert_eq!(
        messages.len(),
        1,
        "stale running state must not trigger a late drop cancel"
    );
}
#[test]
fn remote_handle_drop_preserves_terminal_runtime_state_until_sender_settles() {
    // A terminal *state* without a settled result sender must keep runtime
    // bookkeeping alive; it is only cleared once the sender settles (here
    // into an already-dropped receiver).
    let runtime = Arc::new(LifecycleRuntime::default());
    let cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let rtid = {
        let handle = spawn_remote(
            &cx,
            NodeId::new("worker-1"),
            ComputationName::new("encode_block"),
            RemoteInput::new(vec![1, 2, 3]),
        )
        .expect("spawn_remote should succeed");
        let rtid = handle.remote_task_id();
        runtime.mark_state(rtid, RemoteTaskState::Completed);
        rtid
    };
    assert_eq!(
        runtime.observe_task_state(rtid),
        Some(RemoteTaskState::Completed),
        "dropping after the runtime observes a terminal state must preserve bookkeeping until the terminal sender settles"
    );
    runtime.deliver(&cx, rtid, Ok(RemoteOutcome::Success(vec![7, 8, 9])));
    assert!(
        runtime.observe_task_state(rtid).is_none(),
        "once the terminal sender settles into a dropped receiver, runtime bookkeeping should clear"
    );
    let messages = runtime.sent_messages();
    assert_eq!(
        messages.len(),
        1,
        "terminal drop should not emit a late cancel while waiting for sender cleanup"
    );
}
#[test]
fn remote_handle_abort_closed_channel_still_requests_cancel_for_live_runtime_task() {
    // Even with the result sender gone, aborting a live task must still send
    // a best-effort CancelRequest and leave the runtime state untouched.
    let runtime = Arc::new(LifecycleRuntime::default());
    let cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let handle = spawn_remote(
        &cx,
        NodeId::new("worker-1"),
        ComputationName::new("encode_block"),
        RemoteInput::new(vec![1, 2, 3]),
    )
    .expect("spawn_remote should succeed");
    let remote_task_id = handle.remote_task_id();
    runtime.mark_state(remote_task_id, RemoteTaskState::Running);
    runtime.close_sender_preserving_state(remote_task_id);
    handle.abort(&cx);
    let sent = runtime.sent_messages();
    assert_eq!(
        sent.len(),
        2,
        "explicit abort should still fence a live remote task even if the result sender already disappeared"
    );
    match &sent[1].1.payload {
        RemoteMessage::CancelRequest(cancel) => {
            assert_eq!(cancel.remote_task_id, remote_task_id);
        }
        // Reachable on test failure, so `panic!` rather than `unreachable!`.
        other => panic!("expected CancelRequest, got {other:?}"),
    }
    assert_eq!(
        runtime.observe_task_state(remote_task_id),
        Some(RemoteTaskState::Running)
    );
}
#[test]
fn remote_handle_is_finished_stays_false_for_closed_channel_while_runtime_task_is_live() {
    // A closed result channel alone is not "finished" while the runtime still
    // reports the task live; an explicit close then settles the handle into
    // a terminal Cancelled state.
    let runtime = Arc::new(LifecycleRuntime::default());
    let cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let mut handle = spawn_remote(
        &cx,
        NodeId::new("worker-1"),
        ComputationName::new("encode_block"),
        RemoteInput::new(vec![1, 2, 3]),
    )
    .expect("spawn_remote should succeed");
    let rtid = handle.remote_task_id();
    runtime.mark_state(rtid, RemoteTaskState::Running);
    runtime.close_sender_preserving_state(rtid);
    assert!(
        !handle.is_finished(),
        "closed result channel without a buffered terminal result must not look finished"
    );
    assert_eq!(handle.state(), RemoteTaskState::Running);
    let err = futures_lite::future::block_on(handle.close(&cx))
        .expect_err("closed live channel should still fail the close");
    assert_eq!(err, RemoteError::Cancelled(RemoteHandle::closed_reason()));
    assert!(
        handle.is_finished(),
        "close should transition the handle to a terminal state"
    );
    assert_eq!(handle.state(), RemoteTaskState::Cancelled);
    assert!(runtime.observe_task_state(rtid).is_none());
}
#[test]
fn remote_handle_drop_closed_channel_still_requests_cancel_for_live_runtime_task() {
    // Dropping a handle whose result sender already disappeared must still
    // best-effort cancel a task the runtime considers live, and must not
    // clear the runtime's state.
    let runtime = Arc::new(LifecycleRuntime::default());
    let cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let remote_task_id = {
        let handle = spawn_remote(
            &cx,
            NodeId::new("worker-1"),
            ComputationName::new("encode_block"),
            RemoteInput::new(vec![1, 2, 3]),
        )
        .expect("spawn_remote should succeed");
        let remote_task_id = handle.remote_task_id();
        runtime.mark_state(remote_task_id, RemoteTaskState::Running);
        runtime.close_sender_preserving_state(remote_task_id);
        remote_task_id
        // handle dropped here with a closed channel but a live task
    };
    let sent = runtime.sent_messages();
    assert_eq!(
        sent.len(),
        2,
        "dropping a live handle must still request cancel even if the result sender already disappeared"
    );
    match &sent[1].1.payload {
        RemoteMessage::CancelRequest(cancel) => {
            assert_eq!(cancel.remote_task_id, remote_task_id);
        }
        // Reachable on test failure, so `panic!` rather than `unreachable!`.
        other => panic!("expected CancelRequest, got {other:?}"),
    }
    assert_eq!(
        runtime.observe_task_state(remote_task_id),
        Some(RemoteTaskState::Running)
    );
}
#[test]
fn remote_handle_runtime_rejection_fails_closed() {
    // A SpawnRejected delivery is terminal: state becomes Failed, try_join
    // surfaces the rejection, and runtime bookkeeping is cleared.
    let runtime = Arc::new(LifecycleRuntime::default());
    let cap = RemoteCap::new()
        .with_local_node(NodeId::new("origin-a"))
        .with_runtime(runtime.clone());
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let mut handle = spawn_remote(
        &cx,
        NodeId::new("worker-1"),
        ComputationName::new("encode_block"),
        RemoteInput::new(vec![1, 2, 3]),
    )
    .expect("spawn_remote should succeed");
    let rtid = handle.remote_task_id();
    runtime.deliver(
        &cx,
        rtid,
        Err(RemoteError::SpawnRejected(SpawnRejectReason::CapacityExceeded)),
    );
    assert_eq!(handle.state(), RemoteTaskState::Failed);
    let err = handle
        .try_join()
        .expect_err("runtime rejection should surface as terminal error");
    assert_eq!(
        err,
        RemoteError::SpawnRejected(SpawnRejectReason::CapacityExceeded)
    );
    assert!(runtime.observe_task_state(rtid).is_none());
}
#[test]
fn remote_handle_try_join_maps_cancelled_outcome_state() {
    // A buffered Cancelled outcome must move the handle into Cancelled.
    let cx: Cx = Cx::for_testing();
    let (sender, receiver) = oneshot::channel::<Result<RemoteOutcome, RemoteError>>();
    sender
        .send(
            &cx,
            Ok(RemoteOutcome::Cancelled(CancelReason::user(
                "cancelled remotely",
            ))),
        )
        .expect("send outcome");
    let mut handle = RemoteHandle {
        remote_task_id: RemoteTaskId::next(),
        local_task_id: None,
        origin_node: NodeId::new("origin"),
        node: NodeId::new("n1"),
        computation: ComputationName::new("compute"),
        owner_region: cx.region_id(),
        runtime: None,
        receiver,
        sender_clock: cx.logical_clock_handle(),
        lease: Duration::from_secs(30),
        state: RemoteTaskState::Pending,
        completed: false,
    };
    let outcome = handle.try_join().expect("result").expect("outcome");
    assert!(matches!(outcome, RemoteOutcome::Cancelled(_)));
    assert_eq!(handle.state(), RemoteTaskState::Cancelled);
}
#[test]
fn remote_handle_try_join_fails_closed_after_completion() {
    // try_join is single-shot: a second call after a terminal result is a
    // PolledAfterCompletion error rather than a silent re-read.
    let cx: Cx = Cx::for_testing();
    let (sender, receiver) = oneshot::channel::<Result<RemoteOutcome, RemoteError>>();
    sender
        .send(&cx, Ok(RemoteOutcome::Success(Vec::new())))
        .expect("send outcome");
    let mut handle = RemoteHandle {
        remote_task_id: RemoteTaskId::next(),
        local_task_id: None,
        origin_node: NodeId::new("origin"),
        node: NodeId::new("n1"),
        computation: ComputationName::new("compute"),
        owner_region: cx.region_id(),
        runtime: None,
        receiver,
        sender_clock: cx.logical_clock_handle(),
        lease: Duration::from_secs(30),
        state: RemoteTaskState::Pending,
        completed: false,
    };
    let first = handle.try_join().expect("result").expect("outcome");
    assert!(matches!(first, RemoteOutcome::Success(_)));
    assert_eq!(handle.state(), RemoteTaskState::Completed);
    assert!(matches!(
        handle.try_join(),
        Err(RemoteError::PolledAfterCompletion)
    ));
}
#[test]
fn remote_handle_join_fails_closed_after_completion() {
    // join is single-shot as well: awaiting again after a terminal result
    // yields PolledAfterCompletion.
    let cx: Cx = Cx::for_testing();
    let (sender, receiver) = oneshot::channel::<Result<RemoteOutcome, RemoteError>>();
    sender
        .send(&cx, Ok(RemoteOutcome::Success(Vec::new())))
        .expect("send outcome");
    let mut handle = RemoteHandle {
        remote_task_id: RemoteTaskId::next(),
        local_task_id: None,
        origin_node: NodeId::new("origin"),
        node: NodeId::new("n1"),
        computation: ComputationName::new("compute"),
        owner_region: cx.region_id(),
        runtime: None,
        receiver,
        sender_clock: cx.logical_clock_handle(),
        lease: Duration::from_secs(30),
        state: RemoteTaskState::Pending,
        completed: false,
    };
    let first = futures_lite::future::block_on(handle.join(&cx)).expect("first join");
    assert!(matches!(first, RemoteOutcome::Success(_)));
    assert_eq!(handle.state(), RemoteTaskState::Completed);
    let second = futures_lite::future::block_on(handle.join(&cx));
    assert!(matches!(second, Err(RemoteError::PolledAfterCompletion)));
}
#[test]
fn remote_handle_join_uses_caller_cancel_reason_for_cancelled_wait() {
    // A cancelled Cx surfaces the caller's cancel reason from join, and the
    // handle stays joinable (state unchanged, try_join still Ok(None)).
    let cx: Cx = Cx::for_testing();
    // Keep the sender alive so the channel never produces a result.
    let (_tx, receiver) = oneshot::channel::<Result<RemoteOutcome, RemoteError>>();
    let expected = CancelReason::deadline();
    cx.set_cancel_reason(expected.clone());
    let mut handle = RemoteHandle {
        remote_task_id: RemoteTaskId::next(),
        local_task_id: None,
        origin_node: NodeId::new("origin"),
        node: NodeId::new("n1"),
        computation: ComputationName::new("compute"),
        owner_region: cx.region_id(),
        runtime: None,
        receiver,
        sender_clock: cx.logical_clock_handle(),
        lease: Duration::from_secs(30),
        state: RemoteTaskState::Running,
        completed: false,
    };
    let joined = futures_lite::future::block_on(handle.join(&cx));
    assert!(matches!(
        joined,
        Err(RemoteError::Cancelled(reason)) if reason == expected
    ));
    assert_eq!(handle.state(), RemoteTaskState::Running);
    assert!(matches!(handle.try_join(), Ok(None)));
}
#[test]
fn remote_handle_abort_without_attached_runtime_is_noop() {
    // With no lifecycle runtime attached there is nowhere to route a cancel,
    // so abort must simply be a no-op (and must not panic).
    let cx: Cx = Cx::for_testing_with_remote(fast_phase0_cap());
    let handle = spawn_remote(
        &cx,
        NodeId::new("n1"),
        ComputationName::new("long_task"),
        RemoteInput::empty(),
    )
    // Consistent with the sibling tests: a descriptive expect instead of a
    // bare unwrap.
    .expect("spawn_remote should succeed");
    handle.abort(&cx);
}
#[test]
fn remote_cap_custom_lease_propagates() {
    // A cap-level default lease overrides the built-in default on spawned
    // handles.
    let cap = fast_phase0_cap().with_default_lease(Duration::from_secs(120));
    let cx: Cx = Cx::for_testing_with_remote(cap);
    let handle = spawn_remote(
        &cx,
        NodeId::new("n1"),
        ComputationName::new("slow"),
        RemoteInput::empty(),
    )
    // Consistent with the sibling tests: a descriptive expect instead of a
    // bare unwrap.
    .expect("spawn_remote should succeed");
    assert_eq!(handle.lease(), Duration::from_secs(120));
}
#[test]
fn idempotency_key_generate() {
    // Generated keys are unique per call and never the zero sentinel.
    let cx: Cx = Cx::for_testing();
    let first = IdempotencyKey::generate(&cx);
    let second = IdempotencyKey::generate(&cx);
    assert_ne!(first, second);
    assert_ne!(first.raw(), 0);
}
#[test]
fn idempotency_key_from_raw() {
    // from_raw round-trips the value; Display renders an "IK-" hex form.
    let key = IdempotencyKey::from_raw(0xDEAD_BEEF);
    assert_eq!(key.raw(), 0xDEAD_BEEF);
    let rendered = key.to_string();
    assert!(rendered.starts_with("IK-"));
    assert!(rendered.contains("deadbeef"));
}
#[test]
fn spawn_request_construction() {
    // SpawnRequest stores its fields verbatim.
    let cx: Cx = Cx::for_testing();
    let request = SpawnRequest {
        remote_task_id: RemoteTaskId::next(),
        computation: ComputationName::new("encode_block"),
        input: RemoteInput::new(vec![1, 2, 3]),
        lease: Duration::from_secs(60),
        idempotency_key: IdempotencyKey::generate(&cx),
        budget: None,
        origin_node: NodeId::new("origin-1"),
        origin_region: cx.region_id(),
        origin_task: cx.task_id(),
    };
    assert_eq!(request.computation.as_str(), "encode_block");
    assert_eq!(request.input.len(), 3);
    assert_eq!(request.lease, Duration::from_secs(60));
    assert_eq!(request.origin_node.as_str(), "origin-1");
}
#[test]
fn spawn_ack_accepted() {
    // An accepted ack reports its status and assigned node.
    let ack = SpawnAck {
        remote_task_id: RemoteTaskId::next(),
        status: SpawnAckStatus::Accepted,
        assigned_node: NodeId::new("worker-3"),
    };
    assert_eq!(ack.assigned_node.as_str(), "worker-3");
    assert_eq!(ack.status, SpawnAckStatus::Accepted);
}
#[test]
fn spawn_ack_rejected() {
    // A rejected ack carries the rejection reason inside its status.
    let ack = SpawnAck {
        remote_task_id: RemoteTaskId::next(),
        status: SpawnAckStatus::Rejected(SpawnRejectReason::CapacityExceeded),
        assigned_node: NodeId::new("worker-1"),
    };
    let expected = SpawnAckStatus::Rejected(SpawnRejectReason::CapacityExceeded);
    assert_eq!(ack.status, expected);
}
#[test]
fn spawn_reject_reason_display() {
    // Display renderings are stable, human-readable strings. Uses
    // `.to_string()` (same Display impl) instead of the `format!("{}", x)`
    // anti-pattern.
    assert_eq!(
        SpawnRejectReason::UnknownComputation.to_string(),
        "unknown computation"
    );
    assert_eq!(
        SpawnRejectReason::CapacityExceeded.to_string(),
        "capacity exceeded"
    );
    assert_eq!(
        SpawnRejectReason::NodeShuttingDown.to_string(),
        "node shutting down"
    );
    assert!(
        SpawnRejectReason::InvalidInput("bad data".into())
            .to_string()
            .contains("bad data")
    );
    assert_eq!(
        SpawnRejectReason::IdempotencyConflict.to_string(),
        "idempotency conflict"
    );
}
#[test]
fn cancel_request_construction() {
    // CancelRequest stores its origin node verbatim.
    let request = CancelRequest {
        remote_task_id: RemoteTaskId::next(),
        reason: CancelReason::user("user abort"),
        origin_node: NodeId::new("origin-1"),
    };
    assert_eq!(request.origin_node.as_str(), "origin-1");
}
#[test]
fn result_delivery_success() {
    // A successful delivery reports success, Ok severity, and its timing.
    let delivery = ResultDelivery {
        remote_task_id: RemoteTaskId::next(),
        outcome: RemoteOutcome::Success(vec![42]),
        execution_time: Duration::from_millis(150),
    };
    assert_eq!(delivery.execution_time, Duration::from_millis(150));
    assert_eq!(delivery.outcome.severity(), crate::types::Severity::Ok);
    assert!(delivery.outcome.is_success());
}
#[test]
fn result_delivery_failure() {
    // A failed delivery is not a success and maps to Err severity.
    let delivery = ResultDelivery {
        remote_task_id: RemoteTaskId::next(),
        outcome: RemoteOutcome::Failed("out of memory".into()),
        execution_time: Duration::from_secs(5),
    };
    assert_eq!(delivery.outcome.severity(), crate::types::Severity::Err);
    assert!(!delivery.outcome.is_success());
}
#[test]
fn remote_outcome_display() {
    // Display output is stable and embeds variant payloads. Uses
    // `.to_string()` (same Display impl) instead of `format!("{}", x)`.
    assert_eq!(RemoteOutcome::Success(vec![]).to_string(), "Success");
    assert!(RemoteOutcome::Failed("oops".into()).to_string().contains("oops"));
    assert!(
        RemoteOutcome::Cancelled(CancelReason::user("done"))
            .to_string()
            .contains("Cancelled")
    );
    assert!(RemoteOutcome::Panicked("boom".into()).to_string().contains("boom"));
}
#[test]
fn lease_renewal_construction() {
    // LeaseRenewal stores the requested lease and the current task state.
    let renewal = LeaseRenewal {
        remote_task_id: RemoteTaskId::next(),
        new_lease: Duration::from_secs(30),
        current_state: RemoteTaskState::Running,
        node: NodeId::new("worker-1"),
    };
    assert_eq!(renewal.current_state, RemoteTaskState::Running);
    assert_eq!(renewal.new_lease, Duration::from_secs(30));
}
#[test]
fn remote_message_task_id_dispatch() {
    // remote_task_id() must dispatch correctly for every message variant.
    let rtid = RemoteTaskId::next();
    let cx: Cx = Cx::for_testing();
    let messages = vec![
        RemoteMessage::SpawnRequest(SpawnRequest {
            remote_task_id: rtid,
            computation: ComputationName::new("test"),
            input: RemoteInput::empty(),
            lease: Duration::from_secs(30),
            idempotency_key: IdempotencyKey::generate(&cx),
            budget: None,
            origin_node: NodeId::new("n1"),
            origin_region: cx.region_id(),
            origin_task: cx.task_id(),
        }),
        RemoteMessage::SpawnAck(SpawnAck {
            remote_task_id: rtid,
            status: SpawnAckStatus::Accepted,
            assigned_node: NodeId::new("n2"),
        }),
        RemoteMessage::CancelRequest(CancelRequest {
            remote_task_id: rtid,
            reason: CancelReason::user("test"),
            origin_node: NodeId::new("n1"),
        }),
        RemoteMessage::ResultDelivery(ResultDelivery {
            remote_task_id: rtid,
            outcome: RemoteOutcome::Success(vec![]),
            execution_time: Duration::ZERO,
        }),
        RemoteMessage::LeaseRenewal(LeaseRenewal {
            remote_task_id: rtid,
            new_lease: Duration::from_secs(30),
            current_state: RemoteTaskState::Running,
            node: NodeId::new("n2"),
        }),
    ];
    for message in messages {
        assert_eq!(message.remote_task_id(), rtid);
    }
}
/// Builds a baseline `SpawnRequest` keyed to `remote_task_id` for the
/// session-protocol tests, using the test context's region/task identity.
fn test_spawn_request(cx: &Cx, remote_task_id: RemoteTaskId) -> SpawnRequest {
SpawnRequest {
remote_task_id,
computation: ComputationName::new("compute"),
input: RemoteInput::empty(),
lease: Duration::from_secs(30),
idempotency_key: IdempotencyKey::generate(cx),
budget: None,
origin_node: NodeId::new("origin-1"),
origin_region: cx.region_id(),
origin_task: cx.task_id(),
}
}
/// Accepted `SpawnAck` fixture for `remote_task_id`, assigned to "worker-1".
fn test_ack_accepted(remote_task_id: RemoteTaskId) -> SpawnAck {
SpawnAck {
remote_task_id,
status: SpawnAckStatus::Accepted,
assigned_node: NodeId::new("worker-1"),
}
}
/// Rejected `SpawnAck` fixture (capacity exceeded) for `remote_task_id`.
fn test_ack_rejected(remote_task_id: RemoteTaskId) -> SpawnAck {
SpawnAck {
remote_task_id,
status: SpawnAckStatus::Rejected(SpawnRejectReason::CapacityExceeded),
assigned_node: NodeId::new("worker-1"),
}
}
/// User-initiated `CancelRequest` fixture for `remote_task_id`.
fn test_cancel(remote_task_id: RemoteTaskId) -> CancelRequest {
CancelRequest {
remote_task_id,
reason: CancelReason::user("cancel"),
origin_node: NodeId::new("origin-1"),
}
}
/// `ResultDelivery` fixture wrapping `outcome` with a zero execution time.
fn test_result(remote_task_id: RemoteTaskId, outcome: RemoteOutcome) -> ResultDelivery {
ResultDelivery {
remote_task_id,
outcome,
execution_time: Duration::ZERO,
}
}
/// `LeaseRenewal` fixture: 10s renewal from a Running task on "worker-1".
fn test_renewal(remote_task_id: RemoteTaskId) -> LeaseRenewal {
LeaseRenewal {
remote_task_id,
new_lease: Duration::from_secs(10),
current_state: RemoteTaskState::Running,
node: NodeId::new("worker-1"),
}
}
#[test]
fn origin_session_cancel_flow() {
    // Happy-path origin session: spawn -> accepted ack -> lease renewal ->
    // cancel -> cancelled result.
    let cx: Cx = Cx::for_testing();
    let rtid = RemoteTaskId::next();
    let origin = OriginSession::<OriginInit>::new(rtid);
    let req = test_spawn_request(&cx, rtid);
    let origin = origin.send_spawn(&req).unwrap();
    let ack = test_ack_accepted(rtid);
    // A rejected ack is a reachable test failure; panic here instead of the
    // original redundant assert followed by a silent `return` fallback.
    let origin = match origin.recv_spawn_ack(&ack).unwrap() {
        OriginAckOutcome::Accepted(session) => session,
        OriginAckOutcome::Rejected(_) => panic!("expected accepted spawn ack"),
    };
    let origin = origin.recv_lease_renewal(&test_renewal(rtid)).unwrap();
    let origin = origin.send_cancel(&test_cancel(rtid)).unwrap();
    let result = test_result(
        rtid,
        RemoteOutcome::Cancelled(CancelReason::user("cancelled")),
    );
    let origin = origin.recv_result(&result).unwrap();
    assert_eq!(origin.remote_task_id(), rtid);
}
#[test]
fn origin_session_cancel_before_ack_then_late_accept_flow() {
    // Cancel can race ahead of the spawn ack; a late Accepted ack must still
    // lead to a cancelled result.
    let cx: Cx = Cx::for_testing();
    let rtid = RemoteTaskId::next();
    let origin = OriginSession::<OriginInit>::new(rtid);
    let req = test_spawn_request(&cx, rtid);
    let origin = origin.send_spawn(&req).unwrap();
    let origin = origin.send_cancel(&test_cancel(rtid)).unwrap();
    let ack = test_ack_accepted(rtid);
    // A rejection here is a reachable test failure; panic instead of the
    // original assert + silent `return` fallback.
    let origin = match origin.recv_spawn_ack(&ack).unwrap() {
        OriginCancelAckOutcome::Accepted(session) => session,
        OriginCancelAckOutcome::Rejected(_) => panic!("expected accepted spawn ack"),
    };
    let result = test_result(
        rtid,
        RemoteOutcome::Cancelled(CancelReason::user("cancelled")),
    );
    let origin = origin.recv_result(&result).unwrap();
    assert_eq!(origin.remote_task_id(), rtid);
}
#[test]
fn origin_session_cancel_before_ack_then_late_reject_flow() {
    // Cancel racing ahead of a Rejected ack terminates the session.
    let cx: Cx = Cx::for_testing();
    let rtid = RemoteTaskId::next();
    let origin = OriginSession::<OriginInit>::new(rtid);
    let req = test_spawn_request(&cx, rtid);
    let origin = origin.send_spawn(&req).unwrap();
    let origin = origin.send_cancel(&test_cancel(rtid)).unwrap();
    let ack = test_ack_rejected(rtid);
    // The original assert + `if let` silently skipped the id check on an
    // unexpected variant; a match with panic makes the failure explicit.
    let session = match origin.recv_spawn_ack(&ack).unwrap() {
        OriginCancelAckOutcome::Rejected(session) => session,
        OriginCancelAckOutcome::Accepted(_) => panic!("expected rejected spawn ack"),
    };
    assert_eq!(session.remote_task_id(), rtid);
}
#[test]
fn origin_session_reject_flow() {
    // A rejected spawn ack terminates the session in the rejected state.
    let cx: Cx = Cx::for_testing();
    let rtid = RemoteTaskId::next();
    let origin = OriginSession::<OriginInit>::new(rtid);
    let req = test_spawn_request(&cx, rtid);
    let origin = origin.send_spawn(&req).unwrap();
    let ack = test_ack_rejected(rtid);
    // The original assert + `if let` silently skipped the id check on an
    // unexpected variant; a match with panic makes the failure explicit.
    let session = match origin.recv_spawn_ack(&ack).unwrap() {
        OriginAckOutcome::Rejected(session) => session,
        OriginAckOutcome::Accepted(_) => panic!("expected rejected spawn ack"),
    };
    assert_eq!(session.remote_task_id(), rtid);
}
#[test]
fn remote_session_cancel_before_ack_flow() {
    // The remote side may receive a cancel before it has even acked the
    // spawn; it then acks and reports a cancelled result.
    let cx: Cx = Cx::for_testing();
    let rtid = RemoteTaskId::next();
    let session = RemoteSession::<RemoteInit>::new(rtid);
    let session = session.recv_spawn(&test_spawn_request(&cx, rtid)).unwrap();
    let session = session.recv_cancel(&test_cancel(rtid)).unwrap();
    let session = session.send_ack_accepted(&test_ack_accepted(rtid)).unwrap();
    let cancelled = test_result(rtid, RemoteOutcome::Cancelled(CancelReason::user("done")));
    let session = session.send_result(&cancelled).unwrap();
    assert_eq!(session.remote_task_id(), rtid);
}
#[test]
fn remote_session_cancelled_running_flow_allows_renewal_before_result() {
    // After a cancel arrives mid-run, the remote side may still renew its
    // lease before sending the final (cancelled) result.
    let cx: Cx = Cx::for_testing();
    let rtid = RemoteTaskId::next();
    let session = RemoteSession::<RemoteInit>::new(rtid);
    let session = session.recv_spawn(&test_spawn_request(&cx, rtid)).unwrap();
    let session = session.send_ack_accepted(&test_ack_accepted(rtid)).unwrap();
    let session = session.recv_cancel(&test_cancel(rtid)).unwrap();
    let session = session.send_lease_renewal(&test_renewal(rtid)).unwrap();
    let cancelled = test_result(rtid, RemoteOutcome::Cancelled(CancelReason::user("done")));
    let session = session.send_result(&cancelled).unwrap();
    assert_eq!(session.remote_task_id(), rtid);
}
#[test]
fn protocol_id_mismatch_is_error() {
    // Sending a spawn request whose id differs from the session's id fails
    // with RemoteTaskIdMismatch.
    let cx: Cx = Cx::for_testing();
    let session = OriginSession::<OriginInit>::new(RemoteTaskId::next());
    let mismatched = test_spawn_request(&cx, RemoteTaskId::next());
    let err = session.send_spawn(&mismatched).unwrap_err();
    assert!(matches!(
        err,
        RemoteProtocolError::RemoteTaskIdMismatch { .. }
    ));
}
#[test]
fn protocol_ack_status_mismatch_is_error() {
    // Sending an "accepted" ack whose status is Rejected is a protocol error.
    let cx: Cx = Cx::for_testing();
    let rtid = RemoteTaskId::next();
    let session = RemoteSession::<RemoteInit>::new(rtid);
    let session = session.recv_spawn(&test_spawn_request(&cx, rtid)).unwrap();
    let err = session
        .send_ack_accepted(&test_ack_rejected(rtid))
        .unwrap_err();
    assert!(matches!(
        err,
        RemoteProtocolError::UnexpectedAckStatus { .. }
    ));
}
#[test]
fn trace_event_names_are_namespaced() {
    // Every remote trace event constant lives under the `remote::` namespace.
    let names = [
        trace_events::SPAWN_REQUEST_CREATED,
        trace_events::SPAWN_REQUEST_SENT,
        trace_events::SPAWN_ACK_RECEIVED,
        trace_events::SPAWN_REJECTED,
        trace_events::CANCEL_SENT,
        trace_events::CANCEL_RECEIVED,
        trace_events::RESULT_DELIVERED,
        trace_events::LEASE_RENEWAL_SENT,
        trace_events::LEASE_RENEWAL_RECEIVED,
        trace_events::LEASE_EXPIRED,
    ];
    for name in names {
        assert!(name.starts_with("remote::"));
    }
}
/// Deterministic placeholder `ObligationId` for the lease tests below.
fn test_obligation_id() -> ObligationId {
ObligationId::new_for_test(0, 0)
}
/// Deterministic placeholder `RegionId` for the lease tests below.
fn test_region_id() -> RegionId {
RegionId::new_for_test(0, 0)
}
/// Deterministic placeholder `TaskId` for the lease tests below.
fn test_task_id() -> TaskId {
TaskId::new_for_test(0, 0)
}
#[test]
fn lease_creation() {
    // A fresh lease is active, unexpired, unreleased, has zero renewals, and
    // expires at creation time + duration.
    let t0 = Time::from_secs(10);
    let lease = Lease::new(
        test_obligation_id(),
        test_region_id(),
        test_task_id(),
        Duration::from_secs(30),
        t0,
    );
    assert!(lease.is_active(t0));
    assert!(!lease.is_expired(t0));
    assert!(!lease.is_released());
    assert_eq!(lease.renewal_count(), 0);
    assert_eq!(lease.initial_duration(), Duration::from_secs(30));
    assert_eq!(lease.expires_at(), Time::from_secs(40));
}
#[test]
fn lease_remaining_time() {
    // remaining() counts down to the expiry instant and saturates at zero.
    let t0 = Time::from_secs(10);
    let lease = Lease::new(
        test_obligation_id(),
        test_region_id(),
        test_task_id(),
        Duration::from_secs(30),
        t0,
    );
    let cases = [
        (20, Duration::from_secs(20)),
        (40, Duration::ZERO),
        (50, Duration::ZERO),
    ];
    for (at, expected) in cases {
        assert_eq!(lease.remaining(Time::from_secs(at)), expected);
    }
}
#[test]
fn lease_expiry_detection() {
    // Expiry flips exactly at the expiry instant (inclusive).
    let t0 = Time::from_secs(10);
    let lease = Lease::new(
        test_obligation_id(),
        test_region_id(),
        test_task_id(),
        Duration::from_secs(30),
        t0,
    );
    for (at, expired) in [(39, false), (40, true), (50, true)] {
        assert_eq!(lease.is_expired(Time::from_secs(at)), expired);
    }
    assert!(lease.is_active(Time::from_secs(39)));
    assert!(!lease.is_active(Time::from_secs(40)));
}
#[test]
fn lease_renew_extends_expiry() {
    // Each successful renewal pushes expiry out from the renewal instant and
    // bumps the renewal counter.
    let t0 = Time::from_secs(10);
    let mut lease = Lease::new(
        test_obligation_id(),
        test_region_id(),
        test_task_id(),
        Duration::from_secs(30),
        t0,
    );
    assert!(lease.renew(Duration::from_secs(30), Time::from_secs(25)).is_ok());
    assert_eq!(lease.expires_at(), Time::from_secs(55));
    assert_eq!(lease.renewal_count(), 1);
    assert!(lease.renew(Duration::from_secs(30), Time::from_secs(50)).is_ok());
    assert_eq!(lease.expires_at(), Time::from_secs(80));
    assert_eq!(lease.renewal_count(), 2);
}
#[test]
fn lease_renew_after_expiry_fails() {
    // Renewing past the expiry instant fails and flips the state to Expired.
    let t0 = Time::from_secs(10);
    let mut lease = Lease::new(
        test_obligation_id(),
        test_region_id(),
        test_task_id(),
        Duration::from_secs(30),
        t0,
    );
    let renewed = lease.renew(Duration::from_secs(30), Time::from_secs(50));
    assert_eq!(renewed, Err(LeaseError::Expired));
    assert_eq!(lease.state(), LeaseState::Expired);
}
#[test]
fn lease_release() {
    // Releasing an active lease succeeds and moves it to Released.
    let t0 = Time::from_secs(10);
    let mut lease = Lease::new(
        test_obligation_id(),
        test_region_id(),
        test_task_id(),
        Duration::from_secs(30),
        t0,
    );
    assert!(lease.release(Time::from_secs(20)).is_ok());
    assert_eq!(lease.state(), LeaseState::Released);
    assert!(lease.is_released());
}
#[test]
fn lease_remaining_after_release_is_zero_even_before_original_expiry() {
    // Release zeroes the remaining time immediately, not at the old expiry.
    let t0 = Time::from_secs(10);
    let mut lease = Lease::new(
        test_obligation_id(),
        test_region_id(),
        test_task_id(),
        Duration::from_secs(30),
        t0,
    );
    lease.release(Time::from_secs(20)).unwrap();
    for at in [20, 25] {
        assert_eq!(lease.remaining(Time::from_secs(at)), Duration::ZERO);
    }
}
#[test]
fn lease_double_release_fails() {
    // A second release reports LeaseError::Released.
    let t0 = Time::from_secs(10);
    let mut lease = Lease::new(
        test_obligation_id(),
        test_region_id(),
        test_task_id(),
        Duration::from_secs(30),
        t0,
    );
    lease.release(Time::from_secs(20)).unwrap();
    assert_eq!(lease.release(Time::from_secs(25)), Err(LeaseError::Released));
}
#[test]
fn lease_renew_after_release_fails() {
    // Renewal on a released lease reports LeaseError::Released.
    let t0 = Time::from_secs(10);
    let mut lease = Lease::new(
        test_obligation_id(),
        test_region_id(),
        test_task_id(),
        Duration::from_secs(30),
        t0,
    );
    lease.release(Time::from_secs(20)).unwrap();
    assert_eq!(
        lease.renew(Duration::from_secs(30), Time::from_secs(25)),
        Err(LeaseError::Released)
    );
}
#[test]
fn lease_mark_expired() {
    // mark_expired transitions to Expired and is idempotent.
    let t0 = Time::from_secs(10);
    let mut lease = Lease::new(
        test_obligation_id(),
        test_region_id(),
        test_task_id(),
        Duration::from_secs(30),
        t0,
    );
    assert!(lease.mark_expired().is_ok());
    assert_eq!(lease.state(), LeaseState::Expired);
    // A second mark_expired on an already-expired lease is still Ok.
    assert!(lease.mark_expired().is_ok());
}
#[test]
fn lease_remaining_after_mark_expired_is_zero_before_wall_clock_expiry() {
    // A forced expiry zeroes remaining() even before the nominal expiry time.
    let t0 = Time::from_secs(10);
    let mut lease = Lease::new(
        test_obligation_id(),
        test_region_id(),
        test_task_id(),
        Duration::from_secs(30),
        t0,
    );
    lease.mark_expired().unwrap();
    for at in [15, 39] {
        assert_eq!(lease.remaining(Time::from_secs(at)), Duration::ZERO);
    }
}
#[test]
fn lease_mark_expired_after_release_fails() {
    // Released is a final state: mark_expired afterwards is rejected.
    let t0 = Time::from_secs(10);
    let mut lease = Lease::new(
        test_obligation_id(),
        test_region_id(),
        test_task_id(),
        Duration::from_secs(30),
        t0,
    );
    lease.release(Time::from_secs(20)).unwrap();
    assert_eq!(lease.mark_expired(), Err(LeaseError::Released));
}
#[test]
fn lease_state_display() {
    // Each state Display-renders as its variant name. Uses `.to_string()`
    // (same Display impl) instead of the `format!("{}", x)` anti-pattern.
    assert_eq!(LeaseState::Active.to_string(), "Active");
    assert_eq!(LeaseState::Released.to_string(), "Released");
    assert_eq!(LeaseState::Expired.to_string(), "Expired");
}
#[test]
fn lease_error_display() {
    // Error Display strings are stable and embed any payload. Uses
    // `.to_string()` (same Display impl) instead of `format!("{}", x)`.
    assert_eq!(LeaseError::Expired.to_string(), "lease expired");
    assert_eq!(LeaseError::Released.to_string(), "lease already released");
    assert!(LeaseError::CreationFailed("full".into()).to_string().contains("full"));
}
#[test]
fn idempotency_store_new_request() {
    // An unseen key checks as New, and recording it adds exactly one entry.
    let mut store = IdempotencyStore::new(Duration::from_secs(300));
    assert!(store.is_empty());
    let key = IdempotencyKey::from_raw(1);
    let fingerprint = test_request_fingerprint("encode");
    assert!(matches!(
        store.check(&key, &fingerprint, Time::from_secs(10)),
        DedupDecision::New
    ));
    assert!(store.record(key, RemoteTaskId::next(), fingerprint, Time::from_secs(10)));
    assert_eq!(store.len(), 1);
}
#[test]
fn idempotency_store_duplicate_detection() {
    // Re-checking a recorded key with the same fingerprint is a Duplicate,
    // and re-recording it is rejected without growing the store.
    let mut store = IdempotencyStore::new(Duration::from_secs(300));
    let key = IdempotencyKey::from_raw(42);
    let fingerprint = test_request_fingerprint("encode");
    store.record(
        key,
        RemoteTaskId::next(),
        fingerprint.clone(),
        Time::from_secs(10),
    );
    assert!(matches!(
        store.check(&key, &fingerprint, Time::from_secs(20)),
        DedupDecision::Duplicate(_)
    ));
    assert!(!store.record(key, RemoteTaskId::next(), fingerprint, Time::from_secs(20)));
    assert_eq!(store.len(), 1);
}
#[test]
fn idempotency_store_conflict_detection() {
    // The same key with a different request fingerprint is a Conflict.
    let mut store = IdempotencyStore::new(Duration::from_secs(300));
    let key = IdempotencyKey::from_raw(42);
    store.record(
        key,
        RemoteTaskId::next(),
        test_request_fingerprint("encode"),
        Time::from_secs(10),
    );
    let verdict = store.check(
        &key,
        &test_request_fingerprint("decode"),
        Time::from_secs(20),
    );
    assert!(matches!(verdict, DedupDecision::Conflict));
}
#[test]
fn idempotency_store_complete_outcome() {
    // complete() attaches the outcome, which later duplicate checks observe.
    let mut store = IdempotencyStore::new(Duration::from_secs(300));
    let key = IdempotencyKey::from_raw(99);
    store.record(
        key,
        RemoteTaskId::next(),
        test_request_fingerprint("work"),
        Time::from_secs(10),
    );
    assert!(store.complete(&key, RemoteOutcome::Success(vec![1, 2, 3])));
    let decision = store.check(&key, &test_request_fingerprint("work"), Time::from_secs(20));
    assert!(matches!(decision, DedupDecision::Duplicate(_)));
    if let DedupDecision::Duplicate(record) = decision {
        assert!(record.outcome.is_some());
        assert!(record.outcome.unwrap().is_success());
    }
}
#[test]
fn idempotency_store_complete_unknown_key() {
    // Completing a key that was never recorded is a no-op reporting false.
    let mut store = IdempotencyStore::new(Duration::from_secs(300));
    let missing = IdempotencyKey::from_raw(999);
    assert!(!store.complete(&missing, RemoteOutcome::Failed("oops".into())));
}
#[test]
fn idempotency_store_eviction() {
    // TTL is 60s: the entry recorded at t=10 has expired by t=80 and is
    // evicted, while the one recorded at t=50 survives and still dedups.
    let mut store = IdempotencyStore::new(Duration::from_secs(60));
    for (raw, tag, secs) in [(1, "a", 10), (2, "b", 50)] {
        store.record(
            IdempotencyKey::from_raw(raw),
            RemoteTaskId::next(),
            test_request_fingerprint(tag),
            Time::from_secs(secs),
        );
    }
    assert_eq!(store.len(), 2);
    assert_eq!(store.evict_expired(Time::from_secs(80)), 1);
    assert_eq!(store.len(), 1);
    assert!(matches!(
        store.check(
            &IdempotencyKey::from_raw(2),
            &test_request_fingerprint("b"),
            Time::from_secs(80),
        ),
        DedupDecision::Duplicate(_)
    ));
    // The evicted key behaves exactly like one that was never seen.
    assert!(matches!(
        store.check(
            &IdempotencyKey::from_raw(1),
            &test_request_fingerprint("a"),
            Time::from_secs(80),
        ),
        DedupDecision::New
    ));
}
#[test]
fn idempotency_store_check_treats_expired_records_as_new() {
    // A check past the TTL must lazily drop the stale entry rather than
    // report a Conflict against a record that should no longer exist.
    let mut store = IdempotencyStore::new(Duration::from_secs(60));
    let key = IdempotencyKey::from_raw(3);
    store.record(
        key,
        RemoteTaskId::next(),
        test_request_fingerprint("encode"),
        Time::from_secs(10),
    );
    // Different fingerprint on purpose: only a live record could conflict.
    let verdict = store.check(&key, &test_request_fingerprint("decode"), Time::from_secs(80));
    assert!(
        matches!(verdict, DedupDecision::New),
        "expired keys must not survive as stale conflicts"
    );
    assert!(store.is_empty(), "expired entry should be removed lazily");
}
#[test]
fn idempotency_store_debug() {
    // Debug output names the type and exposes its entry map field.
    let rendered = format!("{:?}", IdempotencyStore::new(Duration::from_secs(60)));
    assert!(rendered.contains("IdempotencyStore"));
    assert!(rendered.contains("entries"));
}
#[test]
fn saga_successful_completion() {
    // Happy path: each step succeeds, complete() finalizes the saga, and
    // no compensation ever runs.
    let mut saga = Saga::new();
    assert_eq!(saga.state(), SagaState::Running);
    assert_eq!(saga.completed_steps(), 0);
    let created: Result<String, _> = saga.step(
        "create resource",
        || Ok("resource-1".to_string()),
        || "deleted resource-1".to_string(),
    );
    assert_eq!(created.as_deref().ok(), Some("resource-1"));
    assert_eq!(saga.completed_steps(), 1);
    let configured: Result<(), _> =
        saga.step("configure", || Ok(()), || "reset config".to_string());
    assert!(configured.is_ok());
    assert_eq!(saga.completed_steps(), 2);
    saga.complete();
    assert_eq!(saga.state(), SagaState::Completed);
    assert!(saga.compensation_results().is_empty());
}
#[test]
fn saga_step_failure_runs_compensations_reverse() {
    // A failing step triggers compensations for all prior steps in LIFO
    // order; the failed step's own compensation never runs.
    use std::sync::Arc;
    let history = Arc::new(Mutex::new(Vec::new()));
    let mut saga = Saga::new();
    let log_first = Arc::clone(&history);
    saga.step(
        "step-0",
        || Ok(()),
        move || {
            log_first.lock().push(0);
            "comp-0".to_string()
        },
    )
    .unwrap();
    let log_second = Arc::clone(&history);
    saga.step(
        "step-1",
        || Ok(()),
        move || {
            log_second.lock().push(1);
            "comp-1".to_string()
        },
    )
    .unwrap();
    let log_third = Arc::clone(&history);
    let outcome: Result<(), SagaStepError> = saga.step(
        "step-2",
        || Err("boom".to_string()),
        move || {
            log_third.lock().push(2);
            "comp-2".to_string()
        },
    );
    let err = outcome.expect_err("third step must fail");
    assert_eq!(err.step, 2);
    assert!(err.message.contains("boom"));
    assert_eq!(saga.state(), SagaState::Aborted);
    let comps = saga.compensation_results();
    assert_eq!(comps.len(), 2);
    // Newest-first: step 1 is compensated before step 0.
    assert_eq!(comps[0].step, 1);
    assert_eq!(comps[1].step, 0);
    assert_eq!(*history.lock(), vec![1, 0]);
}
#[test]
fn saga_explicit_abort() {
    // An explicit abort compensates every completed step, newest first.
    use std::sync::Arc;
    let undo_log = Arc::new(Mutex::new(Vec::new()));
    let mut saga = Saga::new();
    let first = Arc::clone(&undo_log);
    saga.step(
        "step-0",
        || Ok(()),
        move || {
            first.lock().push("step-0");
            "undid step-0".to_string()
        },
    )
    .unwrap();
    let second = Arc::clone(&undo_log);
    saga.step(
        "step-1",
        || Ok(()),
        move || {
            second.lock().push("step-1");
            "undid step-1".to_string()
        },
    )
    .unwrap();
    saga.abort();
    assert_eq!(saga.state(), SagaState::Aborted);
    let comps = saga.compensation_results();
    assert_eq!(comps.len(), 2);
    assert_eq!(comps[0].description, "step-1");
    assert_eq!(comps[1].description, "step-0");
    assert_eq!(*undo_log.lock(), vec!["step-1", "step-0"]);
}
#[test]
fn saga_first_step_failure_no_compensations() {
    // Failure on the very first step aborts with nothing to compensate.
    let mut saga = Saga::new();
    let outcome: Result<(), _> = saga.step("fail-step", || Err("bad".to_string()), String::new);
    assert!(outcome.is_err());
    assert_eq!(saga.state(), SagaState::Aborted);
    assert!(saga.compensation_results().is_empty());
}
#[test]
fn saga_drop_during_unwind_skips_compensation_side_effects() {
// Dropping a Saga while a panic is already unwinding must NOT execute the
// registered compensation closures: running arbitrary user code during
// unwind risks a second panic and therefore a process abort.
use std::panic::{self, AssertUnwindSafe};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
// Flag flipped only if a compensation closure actually executes.
let compensated = Arc::new(AtomicBool::new(false));
let compensation_ran = Arc::clone(&compensated);
// The saga lives entirely inside the catch_unwind scope, so the trailing
// panic! unwinds through its Drop impl before control returns here.
let unwind = panic::catch_unwind(AssertUnwindSafe(move || {
let mut saga = Saga::new();
saga.step(
"step-0",
|| Ok(()),
move || {
compensation_ran.store(true, Ordering::SeqCst);
"comp-0".to_string()
},
)
.unwrap();
// Simulate an unrelated failure while the saga holds one completed step.
panic!("outer panic");
}));
assert!(unwind.is_err());
// The compensation closure must have been skipped during unwind.
assert!(
!compensated.load(Ordering::SeqCst),
"drop during unwind must not run compensation closures"
);
}
#[test]
fn saga_drop_during_unwind_with_panicking_compensation_preserves_process() {
// A compensation closure that itself panics while the saga is dropped
// during unwind would cause a double panic (= abort) if Drop ran it.
// This test re-executes the test binary as a child process so the abort,
// if it happened, would be observed as an abnormal exit status rather
// than killing the whole test run.
const CHILD_ENV: &str = "ASUPERSYNC_SAGA_UNWIND_CHILD";
const TEST_NAME: &str =
"remote::tests::saga_drop_during_unwind_with_panicking_compensation_preserves_process";
// Child branch: set up the hazardous saga, then panic. The saga's Drop
// runs during unwind with a compensation closure that would panic.
if std::env::var_os(CHILD_ENV).is_some() {
let mut saga = Saga::new();
saga.step(
"step-0",
|| Ok(()),
|| -> String { panic!("compensation panic during unwind") },
)
.unwrap();
panic!("outer panic");
}
// Parent branch: re-run exactly this test in a child process with the
// env marker set so the child takes the branch above.
let output = std::process::Command::new(std::env::current_exe().unwrap())
.arg("--exact")
.arg(TEST_NAME)
.arg("--nocapture")
.env(CHILD_ENV, "1")
.output()
.expect("spawn child test binary");
// Exit code 101 is the libtest failure code for a normal panic; an abort
// (double panic) would produce a signal/abort status instead.
assert_eq!(
output.status.code(),
Some(101),
"child should fail from the original panic without aborting the process: {:?}",
output.status
);
}
#[test]
fn saga_state_display() {
    // Display renders the bare variant name for every saga state.
    let cases = [
        (SagaState::Running, "Running"),
        (SagaState::Completed, "Completed"),
        (SagaState::Compensating, "Compensating"),
        (SagaState::Aborted, "Aborted"),
    ];
    for (state, expected) in cases {
        assert_eq!(state.to_string(), expected);
    }
}
#[test]
fn saga_step_error_display() {
    // The rendered error mentions the step index, description, and message.
    let err = SagaStepError {
        step: 3,
        description: "deploy".to_string(),
        message: "timeout".to_string(),
    };
    let rendered = err.to_string();
    assert!(rendered.contains('3'));
    assert!(rendered.contains("deploy"));
    assert!(rendered.contains("timeout"));
}
#[test]
fn saga_debug() {
    // Debug output names the type and reflects the initial Running state.
    let rendered = format!("{:?}", Saga::new());
    assert!(rendered.contains("Saga"));
    assert!(rendered.contains("Running"));
}
#[test]
fn saga_default_trait() {
    // Default must agree with Saga::new(): a saga that starts Running.
    assert_eq!(Saga::default().state(), SagaState::Running);
}
#[test]
fn lease_renew_at_exact_expiry_boundary_fails() {
    // Created at t=10 with a 30s term, the lease expires exactly at t=40:
    // a renewal attempted at the boundary fails and latches Expired.
    let created_at = Time::from_secs(10);
    let mut lease = Lease::new(
        test_obligation_id(),
        test_region_id(),
        test_task_id(),
        Duration::from_secs(30),
        created_at,
    );
    assert_eq!(
        lease.renew(Duration::from_secs(30), Time::from_secs(40)),
        Err(LeaseError::Expired)
    );
    assert_eq!(lease.state(), LeaseState::Expired);
    // Once latched Expired, even an in-window renewal time is rejected.
    assert_eq!(
        lease.renew(Duration::from_secs(30), Time::from_secs(35)),
        Err(LeaseError::Expired)
    );
}
#[test]
fn lease_release_at_exact_expiry_boundary_fails() {
    // Releasing exactly at expiry (t=40 for a 30s lease from t=10) must
    // fail with Expired: the lease is already dead, not released.
    let created_at = Time::from_secs(10);
    let mut lease = Lease::new(
        test_obligation_id(),
        test_region_id(),
        test_task_id(),
        Duration::from_secs(30),
        created_at,
    );
    assert_eq!(lease.release(Time::from_secs(40)), Err(LeaseError::Expired));
    assert_eq!(lease.state(), LeaseState::Expired);
    assert!(lease.is_expired(Time::from_secs(40)));
    assert!(!lease.is_released());
}
#[test]
fn lease_zero_duration_immediately_expired() {
    // A Duration::ZERO lease is dead on arrival: expired, inactive,
    // and with no remaining time at its own creation instant.
    let created_at = Time::from_secs(100);
    let lease = Lease::new(
        test_obligation_id(),
        test_region_id(),
        test_task_id(),
        Duration::ZERO,
        created_at,
    );
    assert!(
        lease.is_expired(created_at),
        "zero-duration lease must be expired at creation time"
    );
    assert!(
        !lease.is_active(created_at),
        "zero-duration lease must not be active at creation time"
    );
    assert_eq!(lease.remaining(created_at), Duration::ZERO);
}
#[test]
fn lease_active_and_expired_are_complementary() {
    // At every probe time, exactly one of is_active / is_expired holds —
    // including before creation, at the expiry boundary, and long after.
    let created_at = Time::from_secs(10);
    let lease = Lease::new(
        test_obligation_id(),
        test_region_id(),
        test_task_id(),
        Duration::from_secs(30),
        created_at,
    );
    for t in [0, 5, 10, 20, 39, 40, 41, 100] {
        let probe = Time::from_secs(t);
        let active = lease.is_active(probe);
        let expired = lease.is_expired(probe);
        assert!(
            active != expired,
            "at t={t}: is_active={active}, is_expired={expired} — must be complementary"
        );
    }
}
#[test]
fn lease_release_then_renew_gives_released_error() {
    // Renewal after a successful release must report Released, not Expired.
    let created_at = Time::from_secs(10);
    let mut lease = Lease::new(
        test_obligation_id(),
        test_region_id(),
        test_task_id(),
        Duration::from_secs(30),
        created_at,
    );
    lease.release(Time::from_secs(15)).unwrap();
    assert_eq!(
        lease.renew(Duration::from_secs(30), Time::from_secs(15)),
        Err(LeaseError::Released)
    );
}
#[test]
fn idempotency_store_evicts_completed_entries_on_ttl() {
    // Completion does not pin an entry: once the TTL lapses it is evicted
    // and the key deduplicates as New again.
    let mut store = IdempotencyStore::new(Duration::from_secs(60));
    let key = IdempotencyKey::from_raw(1);
    let fingerprint = test_request_fingerprint("work");
    store.record(key, RemoteTaskId::next(), fingerprint.clone(), Time::from_secs(10));
    store.complete(&key, RemoteOutcome::Success(vec![42]));
    assert_eq!(store.len(), 1);
    assert_eq!(store.evict_expired(Time::from_secs(80)), 1);
    assert!(store.is_empty());
    assert!(matches!(
        store.check(&key, &fingerprint, Time::from_secs(80)),
        DedupDecision::New
    ));
}
#[test]
fn idempotency_store_check_after_failed_returns_duplicate_with_outcome() {
    // A failed completion is still a Duplicate on retry, and the stored
    // record carries the failure outcome (including its message).
    let mut store = IdempotencyStore::new(Duration::from_secs(300));
    let key = IdempotencyKey::from_raw(77);
    let fingerprint = test_request_fingerprint("fragile_op");
    store.record(key, RemoteTaskId::next(), fingerprint.clone(), Time::from_secs(10));
    store.complete(&key, RemoteOutcome::Failed("disk full".into()));
    let DedupDecision::Duplicate(record) = store.check(&key, &fingerprint, Time::from_secs(20))
    else {
        panic!("expected Duplicate with failed outcome recorded");
    };
    let outcome = record.outcome.expect("outcome was recorded");
    assert!(!outcome.is_success());
    assert!(matches!(outcome, RemoteOutcome::Failed(msg) if msg.contains("disk full")));
}
#[test]
fn idempotency_store_complete_overwrites_outcome() {
    // A later completion replaces the earlier one: after a transient
    // failure followed by success, the retry observes the success.
    let mut store = IdempotencyStore::new(Duration::from_secs(300));
    let key = IdempotencyKey::from_raw(88);
    let fingerprint = test_request_fingerprint("retry_op");
    store.record(key, RemoteTaskId::next(), fingerprint.clone(), Time::from_secs(10));
    store.complete(&key, RemoteOutcome::Failed("transient".into()));
    store.complete(&key, RemoteOutcome::Success(vec![1, 2, 3]));
    let DedupDecision::Duplicate(record) = store.check(&key, &fingerprint, Time::from_secs(20))
    else {
        panic!("expected Duplicate with the latest successful outcome");
    };
    assert!(record.outcome.as_ref().is_some_and(RemoteOutcome::is_success));
}
#[test]
fn idempotency_store_same_computation_different_input_conflicts() {
    // Reusing a key with the same computation name but a different payload
    // is a semantic clash, not a retry: the store must report Conflict.
    let mut store = IdempotencyStore::new(Duration::from_secs(300));
    let key = IdempotencyKey::from_raw(0xfeed);
    let original = IdempotencyRequestFingerprint::new(
        ComputationName::new("encode"),
        RemoteInput::new(vec![1, 2, 3]),
    );
    let mut tampered = original.clone();
    tampered.input = RemoteInput::new(vec![9, 9, 9]);
    store.record(key, RemoteTaskId::next(), original, Time::from_secs(10));
    let verdict = store.check(&key, &tampered, Time::from_secs(20));
    assert!(
        matches!(verdict, DedupDecision::Conflict),
        "same key + same computation but different payload must conflict"
    );
}
#[test]
fn idempotency_store_retry_metadata_does_not_trigger_conflict() {
    // A retry may arrive with a fresh task id, lease, budget, and origin;
    // only the logical operation (computation + input + key) feeds the
    // fingerprint, so it must dedup as Duplicate rather than Conflict.
    let mut store = IdempotencyStore::new(Duration::from_secs(300));
    let key = IdempotencyKey::from_raw(0xbeef);
    let original = SpawnRequest {
        remote_task_id: RemoteTaskId::from_raw(7),
        computation: ComputationName::new("encode"),
        input: RemoteInput::new(vec![1, 2, 3]),
        lease: Duration::from_secs(30),
        idempotency_key: key,
        budget: Some(Budget::MINIMAL),
        origin_node: NodeId::new("origin-a"),
        origin_region: RegionId::new_for_test(1, 1),
        origin_task: TaskId::new_for_test(1, 1),
    };
    // Same logical operation, entirely different transport/accounting metadata.
    let retry = SpawnRequest {
        remote_task_id: RemoteTaskId::from_raw(99),
        lease: Duration::from_secs(120),
        budget: Some(Budget::INFINITE),
        origin_node: NodeId::new("origin-b"),
        origin_region: RegionId::new_for_test(2, 2),
        origin_task: TaskId::new_for_test(2, 2),
        ..original.clone()
    };
    let recorded = IdempotencyRequestFingerprint::from_spawn_request(&original);
    assert!(store.record(key, original.remote_task_id, recorded.clone(), Time::from_secs(10)));
    let verdict = store.check(
        &key,
        &IdempotencyRequestFingerprint::from_spawn_request(&retry),
        Time::from_secs(20),
    );
    assert!(
        matches!(verdict, DedupDecision::Duplicate(record) if record.request == recorded),
        "retries must deduplicate on logical operation, not lease/budget/origin metadata"
    );
}
#[test]
#[should_panic(expected = "not Running")]
fn saga_step_after_complete_panics() {
    // Stepping a completed saga is a programming error and must panic.
    let mut finished = Saga::new();
    finished
        .step("step-0", || Ok(()), || "comp-0".to_string())
        .unwrap();
    finished.complete();
    assert_eq!(finished.state(), SagaState::Completed);
    let _: Result<(), _> = finished.step("step-1", || Ok(()), || "comp-1".to_string());
}
#[test]
#[should_panic(expected = "not Running")]
fn saga_step_after_abort_panics() {
    // Stepping an aborted saga is a programming error and must panic.
    let mut aborted = Saga::new();
    aborted
        .step("step-0", || Ok(()), || "comp-0".to_string())
        .unwrap();
    aborted.abort();
    assert_eq!(aborted.state(), SagaState::Aborted);
    let _: Result<(), _> = aborted.step("step-1", || Ok(()), || "comp-1".to_string());
}
#[test]
#[should_panic(expected = "Running")]
fn saga_complete_after_abort_panics() {
    // complete() is only legal while Running; an aborted saga panics.
    let mut aborted = Saga::new();
    aborted
        .step("step-0", || Ok(()), || "comp-0".to_string())
        .unwrap();
    aborted.abort();
    aborted.complete();
}
#[test]
#[should_panic(expected = "Running")]
fn saga_abort_after_complete_panics() {
    // abort() is only legal while Running; a completed saga panics.
    let mut finished = Saga::new();
    finished
        .step("step-0", || Ok(()), || "comp-0".to_string())
        .unwrap();
    finished.complete();
    finished.abort();
}
#[test]
fn saga_empty_complete_is_valid() {
    // Completing with zero steps is allowed and leaves no compensations.
    let mut empty = Saga::new();
    assert_eq!(empty.completed_steps(), 0);
    empty.complete();
    assert_eq!(empty.state(), SagaState::Completed);
    assert!(empty.compensation_results().is_empty());
}
#[test]
fn saga_empty_abort_is_valid() {
    // Aborting with zero steps is allowed and runs no compensations.
    let mut empty = Saga::new();
    empty.abort();
    assert_eq!(empty.state(), SagaState::Aborted);
    assert!(empty.compensation_results().is_empty());
}
#[test]
fn remote_task_id_debug_clone_copy_eq_ord_hash() {
    // Exercise the derived trait surface: Copy, Eq, Ord, Debug, Hash.
    use std::collections::HashSet;
    let id = RemoteTaskId::from_raw(42);
    let copy_one = id;
    let copy_two = id;
    assert_eq!(id, copy_one);
    assert_eq!(id, copy_two);
    assert_ne!(id, RemoteTaskId::from_raw(99));
    assert!(id < RemoteTaskId::from_raw(100));
    assert!(format!("{id:?}").contains("42"));
    let mut seen = HashSet::new();
    seen.insert(id);
    assert!(seen.contains(&copy_one));
}
#[test]
fn idempotency_key_debug_clone_copy_eq_hash() {
    // Exercise the derived trait surface: Copy, Eq, Debug, Hash.
    use std::collections::HashSet;
    let key = IdempotencyKey::from_raw(12345);
    let copy_one = key;
    let copy_two = key;
    assert_eq!(key, copy_one);
    assert_eq!(key, copy_two);
    assert_ne!(key, IdempotencyKey::from_raw(99999));
    assert!(format!("{key:?}").contains("12345"));
    let mut seen = HashSet::new();
    seen.insert(key);
    assert!(seen.contains(&copy_one));
}
#[test]
fn lease_state_debug_clone_copy_eq() {
    // Copies compare equal; distinct variants compare unequal; Debug
    // renders the variant name.
    let active = LeaseState::Active;
    let copy_one = active;
    let copy_two = active;
    assert_eq!(active, copy_one);
    assert_eq!(active, copy_two);
    for other in [LeaseState::Released, LeaseState::Expired] {
        assert_ne!(active, other);
    }
    assert!(format!("{active:?}").contains("Active"));
}
#[test]
fn saga_state_debug_clone_copy_eq() {
    // Copies compare equal; every other variant compares unequal; Debug
    // renders the variant name.
    let running = SagaState::Running;
    let copy_one = running;
    let copy_two = running;
    assert_eq!(running, copy_one);
    assert_eq!(running, copy_two);
    for other in [
        SagaState::Completed,
        SagaState::Compensating,
        SagaState::Aborted,
    ] {
        assert_ne!(running, other);
    }
    assert!(format!("{running:?}").contains("Running"));
}
#[test]
#[allow(clippy::clone_on_copy)]
fn remote_task_state_debug_clone_eq() {
    // A clone agrees with the original; every other variant differs; Debug
    // renders the variant name.
    let pending = RemoteTaskState::Pending;
    let cloned = pending.clone();
    assert_eq!(pending, cloned);
    for other in [
        RemoteTaskState::Running,
        RemoteTaskState::Completed,
        RemoteTaskState::Failed,
        RemoteTaskState::Cancelled,
        RemoteTaskState::LeaseExpired,
    ] {
        assert_ne!(pending, other);
    }
    assert!(format!("{pending:?}").contains("Pending"));
}
#[test]
fn remote_error_debug_clone_eq() {
    // RemoteError supports Clone, PartialEq, and Debug rendering.
    let err = RemoteError::NoCapability;
    assert_eq!(err, err.clone());
    assert_ne!(err, RemoteError::LeaseExpired);
    assert!(format!("{err:?}").contains("NoCapability"));
}
}