// asupersync/remote.rs
1//! Remote task execution via named computations.
2//!
3//! This module provides the API for spawning tasks on remote nodes within
4//! Asupersync's distributed structured concurrency model. Key design principles:
5//!
6//! - **No closure shipping**: Remote execution uses *named computations*, not closures.
7//!   The caller specifies a computation name (string) and serialized inputs.
8//! - **Explicit capability**: All remote operations require [`RemoteCap`], a capability
9//!   token held in [`Cx`]. Without it, remote spawning is impossible.
10//! - **Region ownership**: Remote handles are owned by the local region and participate
11//!   in region close/quiescence. Cancellation propagates to remote nodes.
12//! - **Lease-based liveness**: Remote tasks maintain liveness via leases. If a lease
13//!   expires, the local region can escalate (cancel, restart, or fail).
14//!
15//! # Phase 0
16//!
17//! This is the API surface definition. The actual network transport and remote
18//! execution protocol are defined in the remote protocol bead (tmh.1.2). In Phase 0,
19//! `spawn_remote` creates a handle but does not perform real network operations.
20
21use crate::channel::oneshot;
22use crate::cx::Cx;
23use crate::trace::distributed::LogicalTime;
24use crate::types::{Budget, CancelReason, ObligationId, RegionId, TaskId, Time};
25use std::fmt;
26use std::marker::PhantomData;
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use std::time::Duration;
30
31// ---------------------------------------------------------------------------
32// Identifiers
33// ---------------------------------------------------------------------------
34
/// Process-wide monotonic counter backing [`RemoteTaskId::next`]. Starts at 1
/// so 0 remains available as a sentinel. `Relaxed` ordering is sufficient:
/// only uniqueness of the fetched values matters, not cross-thread ordering.
static REMOTE_TASK_COUNTER: AtomicU64 = AtomicU64::new(1);
36
/// Identifier for a remote node in the cluster.
///
/// Node identifiers are opaque: the runtime compares them for equality and
/// renders them for diagnostics, nothing more. Mapping a `NodeId` to an
/// actual network address is the transport layer's job.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct NodeId(String);

impl NodeId {
    /// Builds a node identifier from anything convertible to a `String`.
    #[must_use]
    pub fn new(id: impl Into<String>) -> Self {
        NodeId(id.into())
    }

    /// Borrows the identifier as a string slice.
    #[must_use]
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for NodeId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Node({})", self.as_str())
    }
}
64
65/// A unique identifier for a remote task.
66///
67/// Remote task IDs are separate from local [`TaskId`]s because the remote
68/// task may not have an arena slot in the local runtime. The local proxy
69/// task that owns the [`RemoteHandle`] has a regular `TaskId`.
70#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
71pub struct RemoteTaskId(u64);
72
73impl RemoteTaskId {
74    /// Allocates a new unique remote task ID.
75    #[must_use]
76    pub fn next() -> Self {
77        Self(REMOTE_TASK_COUNTER.fetch_add(1, Ordering::Relaxed))
78    }
79
80    /// Creates a remote task ID from a raw value.
81    #[must_use]
82    pub const fn from_raw(value: u64) -> Self {
83        Self(value)
84    }
85
86    /// Returns the raw numeric identifier.
87    #[must_use]
88    pub const fn raw(self) -> u64 {
89        self.0
90    }
91}
92
93impl fmt::Display for RemoteTaskId {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        write!(f, "RT{}", self.0)
96    }
97}
98
99// ---------------------------------------------------------------------------
100// Named computation
101// ---------------------------------------------------------------------------
102
/// Name of a computation that can be executed on a remote node.
///
/// Named computations are the only way to run code remotely. Compared with
/// closure shipping, this approach:
/// - Keeps the set of remotely-executable operations explicit and auditable
/// - Avoids serialization of arbitrary Rust closures (which is unsound)
/// - Allows remote nodes to validate computation names against a registry
///
/// # Example
///
/// ```
/// use asupersync::remote::ComputationName;
///
/// let name = ComputationName::new("encode_block");
/// assert_eq!(name.as_str(), "encode_block");
/// ```
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ComputationName(String);

impl ComputationName {
    /// Builds a computation name from anything convertible to a `String`.
    #[must_use]
    pub fn new(name: impl Into<String>) -> Self {
        ComputationName(name.into())
    }

    /// Borrows the computation name as a string slice.
    #[must_use]
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for ComputationName {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
141
142// ---------------------------------------------------------------------------
143// Serialized input
144// ---------------------------------------------------------------------------
145
/// Serialized input for a remote computation.
///
/// The caller is responsible for serialization. The runtime treats this as
/// opaque bytes. The remote node deserializes using the computation's
/// expected schema.
///
/// `Default` is equivalent to [`RemoteInput::empty`].
#[derive(Clone, Debug, Default)]
pub struct RemoteInput {
    /// Opaque serialized payload; never interpreted by the local runtime.
    data: Vec<u8>,
}

impl RemoteInput {
    /// Creates a new remote input from raw bytes.
    ///
    /// Generalized to accept anything convertible into `Vec<u8>` (e.g.
    /// `Vec<u8>`, `&[u8]`, `String`, byte arrays); existing callers passing
    /// `Vec<u8>` are unaffected.
    #[must_use]
    pub fn new(data: impl Into<Vec<u8>>) -> Self {
        Self { data: data.into() }
    }

    /// Creates an empty remote input (for computations that take no arguments).
    #[must_use]
    pub fn empty() -> Self {
        Self { data: Vec::new() }
    }

    /// Returns the serialized data.
    #[must_use]
    pub fn data(&self) -> &[u8] {
        &self.data
    }

    /// Consumes self and returns the underlying bytes.
    #[must_use]
    pub fn into_data(self) -> Vec<u8> {
        self.data
    }

    /// Returns the size of the serialized input in bytes.
    #[must_use]
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Returns true if the input is empty.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }
}
193
194// ---------------------------------------------------------------------------
195// RemoteRuntime - High-level transport integration
196// ---------------------------------------------------------------------------
197
/// Abstract interface for the remote runtime (transport + state).
///
/// This trait allows the [`RemoteCap`] to bridge the high-level `spawn_remote`
/// API with the underlying transport (network or virtual harness).
pub trait RemoteRuntime: Send + Sync + fmt::Debug {
    /// Sends a message to the network.
    ///
    /// # Errors
    ///
    /// Returns a [`RemoteError`] if the message cannot be handed to the
    /// transport (e.g. the destination is unreachable).
    fn send_message(
        &self,
        destination: &NodeId,
        envelope: MessageEnvelope<RemoteMessage>,
    ) -> Result<(), RemoteError>;

    /// Registers a pending local task expecting a result.
    ///
    /// The implementation is expected to deliver the eventual remote outcome
    /// (or error) through `tx`; `spawn_remote` keeps the matching receiver in
    /// the returned [`RemoteHandle`].
    fn register_task(
        &self,
        task_id: RemoteTaskId,
        tx: oneshot::Sender<Result<RemoteOutcome, RemoteError>>,
    );

    /// Unregisters a pending local task after spawn failure.
    ///
    /// Implementations that keep a pending-results map should remove the
    /// entry for `task_id`. The default implementation is a no-op.
    fn unregister_task(&self, _task_id: RemoteTaskId) {}
}
223
224// ---------------------------------------------------------------------------
225// RemoteCap — capability token
226// ---------------------------------------------------------------------------
227
228/// Capability token authorizing remote task operations.
229///
230/// `RemoteCap` is the gate for all remote operations. A [`Cx`] without a
231/// `RemoteCap` cannot spawn remote tasks — the call fails at compile time
232/// (via the `spawn_remote` signature requiring `&RemoteCap`) or at runtime
233/// (via `cx.remote()` returning `None`).
234///
235/// # Capability Model
236///
237/// The capability is granted during Cx construction and flows through the
238/// capability context. This ensures:
239///
240/// - Code that doesn't need remote execution never has access to it
241/// - Remote authority can be tested by constructing Cx with/without the cap
242/// - Auditing which code paths can spawn remote work is straightforward
243///
244/// # Configuration
245///
246/// The cap holds optional configuration that governs remote execution policy:
247/// - Default lease duration for remote tasks
248/// - Budget constraints for remote operations
249/// - The transport runtime (if connected)
250///
251/// # Example
252///
253/// ```
254/// use asupersync::remote::RemoteCap;
255///
256/// let cap = RemoteCap::new();
257/// assert_eq!(cap.default_lease().as_secs(), 30);
258/// ```
259#[derive(Clone, Debug)]
260pub struct RemoteCap {
261    /// Default lease duration for remote tasks.
262    default_lease: Duration,
263    /// Budget ceiling for remote tasks (if set, tighter than region budget).
264    remote_budget: Option<Budget>,
265    /// Identity used as the origin node for outbound remote protocol messages.
266    local_node: NodeId,
267    /// The connected remote runtime (transport).
268    runtime: Option<Arc<dyn RemoteRuntime>>,
269}
270
271impl RemoteCap {
272    /// Creates a new `RemoteCap` with default configuration.
273    #[must_use]
274    pub fn new() -> Self {
275        Self {
276            default_lease: Duration::from_secs(30),
277            remote_budget: None,
278            local_node: NodeId::new("local"),
279            runtime: None,
280        }
281    }
282
283    /// Sets the default lease duration for remote tasks.
284    #[must_use]
285    pub fn with_default_lease(mut self, lease: Duration) -> Self {
286        self.default_lease = lease;
287        self
288    }
289
290    /// Sets a budget ceiling for remote tasks.
291    #[must_use]
292    pub fn with_remote_budget(mut self, budget: Budget) -> Self {
293        self.remote_budget = Some(budget);
294        self
295    }
296
297    /// Sets the local node identity used as protocol origin.
298    #[must_use]
299    pub fn with_local_node(mut self, node: NodeId) -> Self {
300        self.local_node = node;
301        self
302    }
303
304    /// Attaches a remote runtime (transport).
305    #[must_use]
306    pub fn with_runtime(mut self, runtime: Arc<dyn RemoteRuntime>) -> Self {
307        self.runtime = Some(runtime);
308        self
309    }
310
311    /// Returns the default lease duration.
312    #[must_use]
313    pub fn default_lease(&self) -> Duration {
314        self.default_lease
315    }
316
317    /// Returns the remote budget ceiling, if configured.
318    #[must_use]
319    pub fn remote_budget(&self) -> Option<&Budget> {
320        self.remote_budget.as_ref()
321    }
322
323    /// Returns the local node identity used for protocol origin metadata.
324    #[must_use]
325    pub fn local_node(&self) -> &NodeId {
326        &self.local_node
327    }
328
329    /// Returns the attached remote runtime, if any.
330    #[must_use]
331    pub fn runtime(&self) -> Option<&Arc<dyn RemoteRuntime>> {
332        self.runtime.as_ref()
333    }
334}
335
336impl Default for RemoteCap {
337    fn default() -> Self {
338        Self::new()
339    }
340}
341
342// ---------------------------------------------------------------------------
343// Remote task state
344// ---------------------------------------------------------------------------
345
/// Lifecycle state of a remote task as observed from the local node.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RemoteTaskState {
    /// Spawn request sent, waiting for acknowledgement from remote node.
    Pending,
    /// Remote node acknowledged the spawn; task is running remotely.
    Running,
    /// Remote task completed successfully.
    Completed,
    /// Remote task failed with an error.
    Failed,
    /// Remote task was cancelled.
    Cancelled,
    /// Lease expired without renewal — remote status unknown.
    LeaseExpired,
}

impl fmt::Display for RemoteTaskState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Labels mirror the variant names exactly.
        let label = match self {
            Self::Pending => "Pending",
            Self::Running => "Running",
            Self::Completed => "Completed",
            Self::Failed => "Failed",
            Self::Cancelled => "Cancelled",
            Self::LeaseExpired => "LeaseExpired",
        };
        f.write_str(label)
    }
}
375
376// ---------------------------------------------------------------------------
377// Errors
378// ---------------------------------------------------------------------------
379
380/// Errors that can occur during remote task operations.
381#[derive(Debug, Clone, PartialEq, Eq)]
382pub enum RemoteError {
383    /// No remote capability available in the context.
384    NoCapability,
385    /// The remote node is unreachable or unknown.
386    NodeUnreachable(String),
387    /// The computation name is not registered on the remote node.
388    UnknownComputation(String),
389    /// The lease expired before the task completed.
390    LeaseExpired,
391    /// The remote task was cancelled.
392    Cancelled(CancelReason),
393    /// The remote task panicked.
394    RemotePanic(String),
395    /// Serialization/deserialization error for inputs or outputs.
396    SerializationError(String),
397    /// Transport-level error.
398    TransportError(String),
399}
400
401impl fmt::Display for RemoteError {
402    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403        match self {
404            Self::NoCapability => write!(f, "remote capability not available"),
405            Self::NodeUnreachable(node) => write!(f, "node unreachable: {node}"),
406            Self::UnknownComputation(name) => {
407                write!(f, "unknown computation: {name}")
408            }
409            Self::LeaseExpired => write!(f, "remote task lease expired"),
410            Self::Cancelled(reason) => write!(f, "remote task cancelled: {reason}"),
411            Self::RemotePanic(msg) => write!(f, "remote task panicked: {msg}"),
412            Self::SerializationError(msg) => write!(f, "serialization error: {msg}"),
413            Self::TransportError(msg) => write!(f, "transport error: {msg}"),
414        }
415    }
416}
417
418impl std::error::Error for RemoteError {}
419
420// ---------------------------------------------------------------------------
421// RemoteHandle
422// ---------------------------------------------------------------------------
423
/// Handle to a remote task, analogous to [`TaskHandle`](crate::runtime::task_handle::TaskHandle).
///
/// `RemoteHandle` is returned by [`spawn_remote`] and provides:
/// - The remote task ID for identification and tracing
/// - The target node and computation name for debugging
/// - `join()` to await the remote result
/// - `abort()` to request cancellation of the remote task
///
/// # Region Ownership
///
/// The `RemoteHandle` is owned by the local region. When the region closes,
/// all remote handles participate in quiescence: the region waits for remote
/// tasks to complete (or escalates via cancellation/lease expiry).
///
/// # Phase 0
///
/// In Phase 0, the handle wraps a oneshot channel. The actual remote protocol
/// (spawn/ack/cancel/result/heartbeat) is defined in tmh.1.2.
pub struct RemoteHandle {
    /// Unique identifier for this remote task.
    remote_task_id: RemoteTaskId,
    /// Local proxy task ID (if registered in the runtime).
    /// Always `None` in Phase 0 (see `spawn_remote`).
    local_task_id: Option<TaskId>,
    /// Target node.
    node: NodeId,
    /// Computation name.
    computation: ComputationName,
    /// Region that owns this remote task.
    owner_region: RegionId,
    /// Receiver for the remote result. A closed channel is surfaced to
    /// callers of `join`/`try_join` as a cancellation error.
    receiver: oneshot::Receiver<Result<RemoteOutcome, RemoteError>>,
    /// Lease duration for this task.
    lease: Duration,
    /// Current observed state.
    state: RemoteTaskState,
}

// Manual impl because `oneshot::Receiver` is not Debug; the receiver field
// is omitted, which `finish_non_exhaustive` makes explicit in the output.
impl fmt::Debug for RemoteHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RemoteHandle")
            .field("remote_task_id", &self.remote_task_id)
            .field("local_task_id", &self.local_task_id)
            .field("node", &self.node)
            .field("computation", &self.computation)
            .field("owner_region", &self.owner_region)
            .field("lease", &self.lease)
            .field("state", &self.state)
            .finish_non_exhaustive()
    }
}
474
475impl RemoteHandle {
476    /// Returns the remote task ID.
477    #[must_use]
478    pub fn remote_task_id(&self) -> RemoteTaskId {
479        self.remote_task_id
480    }
481
482    /// Returns the local proxy task ID, if one was assigned.
483    #[must_use]
484    pub fn local_task_id(&self) -> Option<TaskId> {
485        self.local_task_id
486    }
487
488    /// Returns the target node.
489    #[must_use]
490    pub fn node(&self) -> &NodeId {
491        &self.node
492    }
493
494    /// Returns the computation name.
495    #[must_use]
496    pub fn computation(&self) -> &ComputationName {
497        &self.computation
498    }
499
500    /// Returns the owning region.
501    #[must_use]
502    pub fn owner_region(&self) -> RegionId {
503        self.owner_region
504    }
505
506    /// Returns the lease duration.
507    #[must_use]
508    pub fn lease(&self) -> Duration {
509        self.lease
510    }
511
512    /// Returns the current observed state of the remote task.
513    #[must_use]
514    pub fn state(&self) -> &RemoteTaskState {
515        &self.state
516    }
517
518    /// Returns true if the remote result is ready.
519    #[must_use]
520    pub fn is_finished(&self) -> bool {
521        self.receiver.is_ready()
522    }
523
524    /// Waits for the remote task to complete and returns its result.
525    ///
526    /// This method yields until the remote task completes (or fails/cancels).
527    ///
528    /// # Errors
529    ///
530    /// Returns `RemoteError` if the remote task failed, was cancelled,
531    /// or the lease expired.
532    pub async fn join(&self, cx: &Cx) -> Result<RemoteOutcome, RemoteError> {
533        self.receiver.recv(cx).await.unwrap_or_else(|_| {
534            Err(RemoteError::Cancelled(CancelReason::user(
535                "remote handle channel closed",
536            )))
537        })
538    }
539
540    /// Attempts to get the remote task's result without waiting.
541    ///
542    /// # Returns
543    ///
544    /// - `Ok(Some(result))` if the remote task has completed
545    /// - `Ok(None)` if the remote task is still running
546    /// - `Err(RemoteError)` if the remote task failed
547    pub fn try_join(&self) -> Result<Option<RemoteOutcome>, RemoteError> {
548        match self.receiver.try_recv() {
549            Ok(result) => Ok(Some(result?)),
550            Err(oneshot::TryRecvError::Empty) => Ok(None),
551            Err(oneshot::TryRecvError::Closed) => Err(RemoteError::Cancelled(CancelReason::user(
552                "remote handle channel closed",
553            ))),
554        }
555    }
556
557    /// Requests cancellation of the remote task.
558    ///
559    /// This is a request — the remote node may not stop immediately.
560    /// The cancellation propagates via the remote protocol (Phase 1+).
561    ///
562    /// In Phase 0, this is a no-op since there is no actual remote node.
563    pub fn abort(&self) {
564        // Phase 0: No remote node to notify.
565        // Phase 1+: Send cancel message via transport.
566    }
567}
568
569// ---------------------------------------------------------------------------
570// spawn_remote
571// ---------------------------------------------------------------------------
572
/// Spawns a named computation on a remote node.
///
/// This is the primary entry point for distributed structured concurrency.
/// The caller specifies:
/// - A target [`NodeId`] identifying where to run the computation
/// - A [`ComputationName`] identifying *what* to run (no closure shipping)
/// - A [`RemoteInput`] containing serialized arguments
///
/// The function requires a [`RemoteCap`] from the [`Cx`], ensuring that
/// remote operations are impossible without explicit capability.
///
/// # Region Ownership
///
/// The returned [`RemoteHandle`] is conceptually owned by the region of
/// the calling task. When the region closes, it waits for all remote
/// handles to resolve (or escalates per policy).
///
/// # Phase 0
///
/// In Phase 0, no actual network communication occurs. The handle is
/// created in [`RemoteTaskState::Pending`] state. The remote protocol
/// (spawn/ack/cancel/result/heartbeat) is defined in tmh.1.2.
///
/// # Errors
///
/// Returns [`RemoteError::NoCapability`] if the context does not have
/// a [`RemoteCap`]. With a transport attached, also propagates any error
/// from [`RemoteRuntime::send_message`].
///
/// # Example
///
/// ```ignore
/// use asupersync::remote::{spawn_remote, NodeId, ComputationName, RemoteInput};
///
/// let handle = spawn_remote(
///     &cx,
///     NodeId::new("worker-1"),
///     ComputationName::new("encode_block"),
///     RemoteInput::new(serialized_data),
/// )?;
///
/// let result = handle.join(&cx).await?;
/// if let RemoteOutcome::Success(data) = result {
///     // process data
/// }
/// ```
pub fn spawn_remote(
    cx: &Cx,
    node: NodeId,
    computation: ComputationName,
    input: RemoteInput,
) -> Result<RemoteHandle, RemoteError> {
    // Capability gate: without a RemoteCap in the context, remote spawning
    // is impossible by construction.
    let cap = cx.remote().ok_or(RemoteError::NoCapability)?;

    let remote_task_id = RemoteTaskId::next();
    let region = cx.region_id();
    let lease = cap.default_lease();

    cx.trace("spawn_remote");

    // Create the oneshot channel for result delivery. The receiver ends up
    // in the handle; the sender goes to the transport (or is dropped in the
    // Phase 0 stub below).
    let (tx, rx) = oneshot::channel::<Result<RemoteOutcome, RemoteError>>();

    // If a remote runtime is attached, register the task and send the request.
    // Registration happens BEFORE sending so a fast response cannot arrive
    // for an unknown task.
    if let Some(runtime) = cap.runtime() {
        runtime.register_task(remote_task_id, tx);

        let req = SpawnRequest {
            remote_task_id,
            computation: computation.clone(),
            input,
            lease,
            idempotency_key: IdempotencyKey::generate(cx),
            budget: cap.remote_budget,
            origin_node: cap.local_node().clone(),
            origin_region: region,
            origin_task: cx.task_id(),
        };

        // Envelope timestamps always come from the task logical clock.
        let sender_time = cx.logical_now();

        let envelope = MessageEnvelope::new(
            req.origin_node.clone(),
            sender_time,
            RemoteMessage::SpawnRequest(req),
        );
        if let Err(err) = runtime.send_message(&node, envelope) {
            // Send failed: roll back the registration so the runtime's
            // pending-results map does not leak an entry.
            runtime.unregister_task(remote_task_id);
            return Err(err);
        }
    } else {
        // Phase 0 stub: no transport attached, so `tx` is dropped here.
        // The handle's join/try_join will then observe a closed channel,
        // which they map to RemoteError::Cancelled — a deliberate choice
        // over simulating a hang/timeout.
    }

    Ok(RemoteHandle {
        remote_task_id,
        local_task_id: None,
        node,
        computation,
        owner_region: region,
        receiver: rx,
        lease,
        state: RemoteTaskState::Pending,
    })
}
682
683// ===========================================================================
684// Lease (tmh.2.1)
685// ===========================================================================
686//
687// A Lease is a time-bounded obligation that keeps remote work alive.
688// The holder must renew periodically; expiry triggers cleanup/fencing.
689// Leases are obligations (`ObligationKind::Lease`) and block region close.
690
/// Error type for lease operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LeaseError {
    /// The lease has already expired.
    Expired,
    /// The lease has already been released.
    Released,
    /// The lease obligation could not be created (region closed, limit hit).
    CreationFailed(String),
}

impl fmt::Display for LeaseError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Expired => f.write_str("lease expired"),
            Self::Released => f.write_str("lease already released"),
            Self::CreationFailed(msg) => write!(f, "lease creation failed: {msg}"),
        }
    }
}

impl std::error::Error for LeaseError {}
713
/// A time-bounded obligation that keeps remote work alive.
///
/// Leases are the distributed equivalent of structured ownership. A lease
/// holder must periodically renew the lease; if the lease expires without
/// renewal, the remote side assumes the holder is gone and cleans up.
///
/// # Obligation Integration
///
/// A `Lease` wraps an [`ObligationId`] with `ObligationKind::Lease`. This
/// means the owning region cannot close until the lease is resolved (released
/// or expired). This is how remote tasks participate in region quiescence.
///
/// # Lifecycle
///
/// ```text
/// create() → Active ──renew()──► Active (extended)
///                    │
///                    ├─ release() ──► Released (obligation committed)
///                    │
///                    └─ expires ────► Expired (obligation aborted)
/// ```
///
/// # Example
///
/// ```ignore
/// use asupersync::remote::{Lease, LeaseId};
/// use std::time::Duration;
///
/// let lease = Lease::new(obligation_id, region, task, Duration::from_secs(30), now);
/// assert!(lease.is_active(now));
///
/// // Renew before expiry
/// lease.renew(Duration::from_secs(30), later);
///
/// // Release when done
/// lease.release(even_later);
/// ```
#[derive(Debug)]
pub struct Lease {
    /// The underlying obligation ID (resolved by the caller in RuntimeState;
    /// this struct only tracks lease-local state).
    obligation_id: ObligationId,
    /// Region owning this lease.
    region: RegionId,
    /// Task holding this lease.
    holder: TaskId,
    /// Absolute expiry time (virtual time in lab, wall time in prod).
    expires_at: Time,
    /// Original lease duration (for diagnostics; renewals may use a
    /// different duration).
    initial_duration: Duration,
    /// Current state.
    state: LeaseState,
    /// Number of times this lease has been renewed.
    renewal_count: u32,
}
768
/// State of a lease.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeaseState {
    /// Lease is active and has not expired.
    Active,
    /// Lease has been explicitly released by the holder.
    Released,
    /// Lease expired without renewal.
    Expired,
}

impl fmt::Display for LeaseState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Labels mirror the variant names exactly.
        f.write_str(match self {
            Self::Active => "Active",
            Self::Released => "Released",
            Self::Expired => "Expired",
        })
    }
}
789
790impl Lease {
791    /// Creates a new active lease.
792    ///
793    /// The `obligation_id` should be created via
794    /// `RuntimeState::create_obligation(ObligationKind::Lease, ...)`.
795    #[must_use]
796    pub fn new(
797        obligation_id: ObligationId,
798        region: RegionId,
799        holder: TaskId,
800        duration: Duration,
801        now: Time,
802    ) -> Self {
803        let expires_at = now + duration;
804        Self {
805            obligation_id,
806            region,
807            holder,
808            expires_at,
809            initial_duration: duration,
810            state: LeaseState::Active,
811            renewal_count: 0,
812        }
813    }
814
815    /// Returns the underlying obligation ID.
816    #[must_use]
817    pub fn obligation_id(&self) -> ObligationId {
818        self.obligation_id
819    }
820
821    /// Returns the owning region.
822    #[must_use]
823    pub fn region(&self) -> RegionId {
824        self.region
825    }
826
827    /// Returns the holding task.
828    #[must_use]
829    pub fn holder(&self) -> TaskId {
830        self.holder
831    }
832
833    /// Returns the absolute expiry time.
834    #[must_use]
835    pub fn expires_at(&self) -> Time {
836        self.expires_at
837    }
838
839    /// Returns the initial lease duration.
840    #[must_use]
841    pub fn initial_duration(&self) -> Duration {
842        self.initial_duration
843    }
844
845    /// Returns the current lease state.
846    #[must_use]
847    pub fn state(&self) -> LeaseState {
848        self.state
849    }
850
851    /// Returns the number of times this lease has been renewed.
852    #[must_use]
853    pub fn renewal_count(&self) -> u32 {
854        self.renewal_count
855    }
856
857    /// Returns true if the lease is active (not expired, not released).
858    #[must_use]
859    pub fn is_active(&self, now: Time) -> bool {
860        self.state == LeaseState::Active && now < self.expires_at
861    }
862
863    /// Returns true if the lease has expired (time exceeded without renewal).
864    #[must_use]
865    pub fn is_expired(&self, now: Time) -> bool {
866        self.state == LeaseState::Expired
867            || (self.state == LeaseState::Active && now >= self.expires_at)
868    }
869
870    /// Returns true if the lease has been explicitly released.
871    #[must_use]
872    pub fn is_released(&self) -> bool {
873        self.state == LeaseState::Released
874    }
875
876    /// Returns the remaining time before expiry, or zero if expired.
877    #[must_use]
878    pub fn remaining(&self, now: Time) -> Duration {
879        if now >= self.expires_at {
880            Duration::ZERO
881        } else {
882            let nanos = self.expires_at.duration_since(now);
883            Duration::from_nanos(nanos)
884        }
885    }
886
887    /// Renews the lease by extending the expiry from `now`.
888    ///
889    /// # Errors
890    ///
891    /// Returns `LeaseError::Expired` if the lease has already expired.
892    /// Returns `LeaseError::Released` if the lease was already released.
893    pub fn renew(&mut self, duration: Duration, now: Time) -> Result<(), LeaseError> {
894        match self.state {
895            LeaseState::Released => return Err(LeaseError::Released),
896            LeaseState::Expired => return Err(LeaseError::Expired),
897            LeaseState::Active => {}
898        }
899        if now >= self.expires_at {
900            self.state = LeaseState::Expired;
901            return Err(LeaseError::Expired);
902        }
903        self.expires_at = now + duration;
904        self.renewal_count += 1;
905        Ok(())
906    }
907
908    /// Explicitly releases the lease.
909    ///
910    /// This resolves the underlying obligation as committed (clean release).
911    ///
912    /// # Errors
913    ///
914    /// Returns `LeaseError::Released` if already released.
915    /// Returns `LeaseError::Expired` if already expired.
916    pub fn release(&mut self, now: Time) -> Result<(), LeaseError> {
917        match self.state {
918            LeaseState::Released => return Err(LeaseError::Released),
919            LeaseState::Expired => return Err(LeaseError::Expired),
920            LeaseState::Active => {}
921        }
922        self.state = LeaseState::Released;
923        // The caller is responsible for committing the obligation in RuntimeState.
924        // This method just updates the lease state.
925        let _ = now; // used by caller for obligation commit timestamp
926        Ok(())
927    }
928
929    /// Marks the lease as expired.
930    ///
931    /// Called by the runtime when it detects that the lease has passed its
932    /// expiry time without renewal. The underlying obligation should be
933    /// aborted with `ObligationAbortReason::Cancel`.
934    ///
935    /// # Errors
936    ///
937    /// Returns `LeaseError::Released` if already released.
938    pub fn mark_expired(&mut self) -> Result<(), LeaseError> {
939        match self.state {
940            LeaseState::Released => return Err(LeaseError::Released),
941            LeaseState::Expired => return Ok(()), // idempotent
942            LeaseState::Active => {}
943        }
944        self.state = LeaseState::Expired;
945        Ok(())
946    }
947}
948
949// ===========================================================================
950// Idempotency Store (tmh.2.2)
951// ===========================================================================
952//
953// The remote side uses an IdempotencyStore to deduplicate spawn requests.
954// Each entry maps an IdempotencyKey to its recorded outcome. Entries expire
955// after a configurable TTL to bound memory usage.
956
/// Recorded outcome of a previously-processed idempotent request.
///
/// Stored in [`IdempotencyStore`] so a retried `SpawnRequest` with the same
/// key can be answered from the cache instead of being re-executed.
#[derive(Clone, Debug)]
pub struct IdempotencyRecord {
    /// The key for this record.
    pub key: IdempotencyKey,
    /// The remote task ID assigned to this request.
    pub remote_task_id: RemoteTaskId,
    /// The computation that was requested (compared to detect key conflicts).
    pub computation: ComputationName,
    /// When this record was created.
    pub created_at: Time,
    /// When this record expires (for eviction).
    pub expires_at: Time,
    /// The outcome, if the request has completed; `None` while still running.
    pub outcome: Option<RemoteOutcome>,
}
973
/// Decision from the idempotency store when a request arrives.
#[derive(Clone, Debug)]
pub enum DedupDecision {
    /// New request — not seen before. Proceed with execution.
    New,
    /// Duplicate request — already processed. Return cached result.
    /// Carries the stored record (including any recorded outcome).
    Duplicate(IdempotencyRecord),
    /// Conflict — same key but different parameters. Reject.
    Conflict,
}
984
985/// Store for tracking idempotent request deduplication.
986///
987/// The remote node uses this to ensure exactly-once execution semantics.
988/// When a `SpawnRequest` arrives:
989/// 1. Check the store for the idempotency key
990/// 2. If new: record and execute
991/// 3. If duplicate: return cached ack/result
992/// 4. If conflict (same key, different params): reject
993///
994/// Entries are evicted after their TTL expires.
995///
996/// # Thread Safety
997///
998/// The store is designed for single-threaded use within the deterministic
999/// lab runtime. For production multi-threaded use, wrap in a lock.
pub struct IdempotencyStore {
    /// Map from request key to its recorded (possibly still-pending) outcome.
    entries: std::collections::HashMap<IdempotencyKey, IdempotencyRecord>,
    /// Default TTL for new entries, measured from insertion time.
    default_ttl: Duration,
}
1005
1006impl IdempotencyStore {
1007    /// Creates a new idempotency store with the given default TTL.
1008    #[must_use]
1009    pub fn new(default_ttl: Duration) -> Self {
1010        Self {
1011            entries: std::collections::HashMap::new(),
1012            default_ttl,
1013        }
1014    }
1015
1016    /// Checks whether a request with the given key has been seen before.
1017    ///
1018    /// This does NOT insert the key — call [`record`](Self::record) to do that.
1019    #[must_use]
1020    pub fn check(&self, key: &IdempotencyKey, computation: &ComputationName) -> DedupDecision {
1021        self.entries.get(key).map_or(DedupDecision::New, |record| {
1022            if record.computation == *computation {
1023                DedupDecision::Duplicate(record.clone())
1024            } else {
1025                DedupDecision::Conflict
1026            }
1027        })
1028    }
1029
1030    /// Records a new idempotent request.
1031    ///
1032    /// Returns `true` if the entry was inserted (new key).
1033    /// Returns `false` if the key already existed (no update).
1034    pub fn record(
1035        &mut self,
1036        key: IdempotencyKey,
1037        remote_task_id: RemoteTaskId,
1038        computation: ComputationName,
1039        now: Time,
1040    ) -> bool {
1041        use std::collections::hash_map::Entry;
1042        match self.entries.entry(key) {
1043            Entry::Vacant(e) => {
1044                let expires_at = now + self.default_ttl;
1045                e.insert(IdempotencyRecord {
1046                    key,
1047                    remote_task_id,
1048                    computation,
1049                    created_at: now,
1050                    expires_at,
1051                    outcome: None,
1052                });
1053                true
1054            }
1055            Entry::Occupied(_) => false,
1056        }
1057    }
1058
1059    /// Updates the outcome of a previously-recorded request.
1060    ///
1061    /// Returns `true` if the record was found and updated.
1062    pub fn complete(&mut self, key: &IdempotencyKey, outcome: RemoteOutcome) -> bool {
1063        match self.entries.get_mut(key) {
1064            Some(record) => {
1065                record.outcome = Some(outcome);
1066                true
1067            }
1068            None => false,
1069        }
1070    }
1071
1072    /// Evicts expired entries.
1073    ///
1074    /// Returns the number of entries evicted.
1075    pub fn evict_expired(&mut self, now: Time) -> usize {
1076        let before = self.entries.len();
1077        self.entries.retain(|_, record| now < record.expires_at);
1078        before - self.entries.len()
1079    }
1080
1081    /// Returns the number of entries in the store.
1082    #[must_use]
1083    pub fn len(&self) -> usize {
1084        self.entries.len()
1085    }
1086
1087    /// Returns true if the store is empty.
1088    #[must_use]
1089    pub fn is_empty(&self) -> bool {
1090        self.entries.is_empty()
1091    }
1092}
1093
1094impl fmt::Debug for IdempotencyStore {
1095    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1096        f.debug_struct("IdempotencyStore")
1097            .field("entries", &self.entries.len())
1098            .field("default_ttl", &self.default_ttl)
1099            .finish()
1100    }
1101}
1102
1103// ===========================================================================
1104// Saga Framework (tmh.2.3)
1105// ===========================================================================
1106//
1107// A Saga is a sequence of steps where each step has a forward action and a
1108// compensation. On failure, compensations run in reverse order. This is the
1109// distributed equivalent of structured finalizers.
1110
/// Identifier for a saga step: zero-based index in registration order.
pub type StepIndex = usize;
1113
/// A recorded compensation for a saga step.
///
/// Compensations are stored as boxed closures that take the step output
/// and undo the effect. In Phase 0, compensations are synchronous functions
/// that return a description of what was undone.
///
/// In Phase 1+, compensations will be async and budget-constrained.
struct CompensationEntry {
    /// Index of the step this compensation belongs to.
    step: StepIndex,
    /// Description of the step (for tracing).
    description: String,
    /// The compensation function; consumed exactly once during rollback.
    compensate: Box<dyn FnOnce() -> String + Send>,
}
1129
1130impl fmt::Debug for CompensationEntry {
1131    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1132        f.debug_struct("CompensationEntry")
1133            .field("step", &self.step)
1134            .field("description", &self.description)
1135            .finish_non_exhaustive()
1136    }
1137}
1138
/// State of a saga's lifecycle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SagaState {
    /// Saga is executing forward steps.
    Running,
    /// Saga completed all steps successfully (compensations dropped).
    Completed,
    /// Saga is executing compensations (rolling back).
    Compensating,
    /// Saga finished compensating (all compensations ran).
    Aborted,
}
1151
1152impl fmt::Display for SagaState {
1153    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1154        match self {
1155            Self::Running => write!(f, "Running"),
1156            Self::Completed => write!(f, "Completed"),
1157            Self::Compensating => write!(f, "Compensating"),
1158            Self::Aborted => write!(f, "Aborted"),
1159        }
1160    }
1161}
1162
/// Error from a saga step.
///
/// Returned by [`Saga::step`] after compensations have already run.
#[derive(Debug, Clone)]
pub struct SagaStepError {
    /// Which step failed (zero-based registration index).
    pub step: StepIndex,
    /// Description of the step.
    pub description: String,
    /// The error message.
    pub message: String,
}
1173
1174impl fmt::Display for SagaStepError {
1175    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1176        write!(
1177            f,
1178            "saga step {} ({}) failed: {}",
1179            self.step, self.description, self.message
1180        )
1181    }
1182}
1183
// Marker impl: the step error carries only a message, no nested `source`.
impl std::error::Error for SagaStepError {}
1185
/// A record of a compensation that was executed during saga abort.
#[derive(Debug, Clone)]
pub struct CompensationResult {
    /// The step index that was compensated.
    pub step: StepIndex,
    /// Description of the step.
    pub description: String,
    /// Description of what the compensation did (the closure's return value).
    pub result: String,
}
1196
1197/// Saga: a sequence of steps with structured compensations.
1198///
1199/// Each step has a forward action and a compensation. If any step fails,
1200/// all previously-completed compensations run in reverse order. This is
1201/// the distributed equivalent of structured finalizers.
1202///
1203/// # Design Principles
1204///
1205/// - **Compensations are deterministic**: Given the same inputs, compensations
1206///   produce the same effects. This enables lab testing of failure scenarios.
1207/// - **Reverse order**: Compensations run last-to-first, ensuring that
1208///   later steps' effects are undone before earlier steps'.
1209/// - **Budget-aware**: In Phase 1+, compensations will be budget-constrained
1210///   (they are finalizers, which run under masked cancellation).
1211/// - **Trace-aware**: Each step and compensation emits trace events.
1212///
1213/// # API Pattern
1214///
1215/// The compensation closure captures its own context. The forward action
1216/// returns a value for the caller to use in subsequent steps.
1217///
1218/// ```ignore
1219/// use asupersync::remote::Saga;
1220///
1221/// let mut saga = Saga::new();
1222///
1223/// // Step 1: Create resource — compensation captures what it needs
1224/// let id = "resource-1".to_string();
1225/// let id_for_comp = id.clone();
1226/// saga.step(
1227///     "create resource",
1228///     || Ok(id),
1229///     move || format!("deleted {id_for_comp}"),
1230/// )?;
1231///
1232/// // Step 2: Configure — no value needed for compensation
1233/// saga.step("configure", || Ok(()), || "reset config".into())?;
1234///
1235/// // Complete on success
1236/// saga.complete();
1237/// ```
pub struct Saga {
    /// Current lifecycle state.
    state: SagaState,
    /// Registered compensations (in forward order; executed in reverse).
    compensations: Vec<CompensationEntry>,
    /// Number of completed steps (also the index assigned to the next step).
    completed_steps: StepIndex,
    /// Results from compensation execution (populated only if rolled back).
    compensation_results: Vec<CompensationResult>,
}
1248
1249impl fmt::Debug for Saga {
1250    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1251        f.debug_struct("Saga")
1252            .field("state", &self.state)
1253            .field("completed_steps", &self.completed_steps)
1254            .field("compensations", &self.compensations.len())
1255            .field("compensation_results", &self.compensation_results)
1256            .finish()
1257    }
1258}
1259
1260impl Saga {
1261    /// Creates a new empty saga.
1262    #[must_use]
1263    pub fn new() -> Self {
1264        Self {
1265            state: SagaState::Running,
1266            compensations: Vec::new(),
1267            completed_steps: 0,
1268            compensation_results: Vec::new(),
1269        }
1270    }
1271
1272    /// Returns the current saga state.
1273    #[must_use]
1274    pub fn state(&self) -> SagaState {
1275        self.state
1276    }
1277
1278    /// Returns the number of completed steps.
1279    #[must_use]
1280    pub fn completed_steps(&self) -> StepIndex {
1281        self.completed_steps
1282    }
1283
1284    /// Returns the compensation results (populated after abort).
1285    #[must_use]
1286    pub fn compensation_results(&self) -> &[CompensationResult] {
1287        &self.compensation_results
1288    }
1289
1290    /// Executes a forward step and registers its compensation.
1291    ///
1292    /// The forward action runs immediately. If it succeeds, the compensation
1293    /// closure is registered for potential rollback. If it fails, the saga
1294    /// enters the compensating state and runs all registered compensations
1295    /// in reverse order.
1296    ///
1297    /// The compensation closure should capture whatever context it needs
1298    /// to undo the forward action's effect (e.g., clone the resource ID
1299    /// before passing it to the step).
1300    ///
1301    /// # Errors
1302    ///
1303    /// Returns `SagaStepError` if the forward action fails. In that case,
1304    /// compensations have already been executed before this returns.
1305    pub fn step<T>(
1306        &mut self,
1307        description: &str,
1308        action: impl FnOnce() -> Result<T, String>,
1309        compensate: impl FnOnce() -> String + Send + 'static,
1310    ) -> Result<T, SagaStepError> {
1311        assert_eq!(
1312            self.state,
1313            SagaState::Running,
1314            "cannot add steps to a saga that is not Running"
1315        );
1316
1317        let step_idx = self.completed_steps;
1318
1319        match action() {
1320            Ok(value) => {
1321                self.compensations.push(CompensationEntry {
1322                    step: step_idx,
1323                    description: description.to_string(),
1324                    compensate: Box::new(compensate),
1325                });
1326                self.completed_steps += 1;
1327                Ok(value)
1328            }
1329            Err(msg) => {
1330                let err = SagaStepError {
1331                    step: step_idx,
1332                    description: description.to_string(),
1333                    message: msg,
1334                };
1335                self.run_compensations();
1336                Err(err)
1337            }
1338        }
1339    }
1340
1341    /// Marks the saga as successfully completed.
1342    ///
1343    /// After completion, the registered compensations are dropped (they
1344    /// are no longer needed since all steps succeeded).
1345    ///
1346    /// # Panics
1347    ///
1348    /// Panics if the saga is not in `Running` state.
1349    pub fn complete(&mut self) {
1350        assert_eq!(
1351            self.state,
1352            SagaState::Running,
1353            "can only complete a Running saga"
1354        );
1355        self.state = SagaState::Completed;
1356        self.compensations.clear();
1357    }
1358
1359    /// Explicitly aborts the saga, running compensations in reverse order.
1360    ///
1361    /// This is called when the caller wants to roll back, even if no step
1362    /// has failed. For example, when cancellation is requested.
1363    ///
1364    /// # Panics
1365    ///
1366    /// Panics if the saga is not in `Running` state.
1367    pub fn abort(&mut self) {
1368        assert_eq!(
1369            self.state,
1370            SagaState::Running,
1371            "can only abort a Running saga"
1372        );
1373        self.run_compensations();
1374    }
1375
1376    /// Runs compensations in reverse order.
1377    fn run_compensations(&mut self) {
1378        self.state = SagaState::Compensating;
1379        let compensations: Vec<_> = self.compensations.drain(..).collect();
1380        for entry in compensations.into_iter().rev() {
1381            let result_desc = (entry.compensate)();
1382            self.compensation_results.push(CompensationResult {
1383                step: entry.step,
1384                description: entry.description,
1385                result: result_desc,
1386            });
1387        }
1388        self.state = SagaState::Aborted;
1389    }
1390}
1391
1392impl Default for Saga {
1393    fn default() -> Self {
1394        Self::new()
1395    }
1396}
1397
1398//
1399//   1. SpawnRequest  — originator → remote node
1400//   2. SpawnAck      — remote node → originator
1401//   3. CancelRequest — originator → remote node (or reverse for lease expiry)
1402//   4. ResultDelivery — remote node → originator
1403//   5. LeaseRenewal  — bidirectional heartbeat/renewal
1404//
1405// All messages carry the RemoteTaskId for correlation. The protocol is
1406// idempotent: duplicate SpawnRequests with the same IdempotencyKey are
1407// deduplicated by the remote node.
1408
1409// ---------------------------------------------------------------------------
1410// Idempotency key
1411// ---------------------------------------------------------------------------
1412
1413/// Idempotency key for exactly-once remote spawn semantics.
1414///
1415/// The originator generates a unique key per spawn request. The remote node
1416/// uses this to deduplicate retried requests (e.g., after network partition
1417/// recovery). Keys are 128-bit random values.
1418#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
1419pub struct IdempotencyKey(u128);
1420
1421impl IdempotencyKey {
1422    /// Generates a new random idempotency key from the context's entropy.
1423    #[must_use]
1424    pub fn generate(cx: &Cx) -> Self {
1425        let high = cx.random_u64();
1426        let low = cx.random_u64();
1427        Self((u128::from(high) << 64) | u128::from(low))
1428    }
1429
1430    /// Creates an idempotency key from a raw value (for testing/deserialization).
1431    #[must_use]
1432    pub const fn from_raw(value: u128) -> Self {
1433        Self(value)
1434    }
1435
1436    /// Returns the raw 128-bit value.
1437    #[must_use]
1438    pub const fn raw(self) -> u128 {
1439        self.0
1440    }
1441}
1442
1443impl fmt::Display for IdempotencyKey {
1444    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1445        write!(f, "IK-{:032x}", self.0)
1446    }
1447}
1448
1449// ---------------------------------------------------------------------------
1450// Protocol messages
1451// ---------------------------------------------------------------------------
1452
/// Envelope for protocol messages with logical time metadata.
///
/// The `sender_time` field carries the sender's logical clock snapshot,
/// enabling causal ordering across nodes without relying on wall clocks.
#[derive(Clone, Debug)]
pub struct MessageEnvelope<T> {
    /// Logical identity of the sender.
    pub sender: NodeId,
    /// Logical time at send (sender's clock snapshot).
    pub sender_time: LogicalTime,
    /// The wrapped protocol message.
    pub payload: T,
}
1466
1467impl<T> MessageEnvelope<T> {
1468    /// Creates a new message envelope.
1469    #[must_use]
1470    pub fn new(sender: NodeId, sender_time: LogicalTime, payload: T) -> Self {
1471        Self {
1472            sender,
1473            sender_time,
1474            payload,
1475        }
1476    }
1477}
1478
/// Transport hook for Phase 1+ remote protocol integration.
///
/// Implementations are responsible for framing, handshake, and delivery of
/// `RemoteMessage` envelopes between nodes. The runtime remains transport-agnostic.
pub trait RemoteTransport {
    /// Send a protocol message to a target node.
    ///
    /// Implementations should perform version checks and framing at the
    /// transport layer.
    ///
    /// # Errors
    ///
    /// Returns a `RemoteError` when delivery fails (implementation-defined;
    /// e.g. framing or connectivity problems).
    fn send(
        &mut self,
        to: &NodeId,
        envelope: MessageEnvelope<RemoteMessage>,
    ) -> Result<(), RemoteError>;

    /// Try to receive the next inbound protocol message (non-blocking poll).
    ///
    /// Returns `None` if no message is available.
    fn try_recv(&mut self) -> Option<MessageEnvelope<RemoteMessage>>;
}
1499
/// A message in the remote structured concurrency protocol.
///
/// All protocol messages are tagged with the enum variant for dispatch.
/// Each message carries the `RemoteTaskId` for correlation (see
/// [`RemoteMessage::remote_task_id`]).
#[derive(Clone, Debug)]
pub enum RemoteMessage {
    /// Request to spawn a named computation on a remote node.
    SpawnRequest(SpawnRequest),
    /// Acknowledgement of a spawn request (accepted or rejected).
    SpawnAck(SpawnAck),
    /// Request to cancel a running remote task.
    CancelRequest(CancelRequest),
    /// Delivery of a remote task's terminal result.
    ResultDelivery(ResultDelivery),
    /// Lease renewal / heartbeat for an active remote task.
    LeaseRenewal(LeaseRenewal),
}
1517
1518impl RemoteMessage {
1519    /// Returns the remote task ID associated with this message.
1520    #[must_use]
1521    pub fn remote_task_id(&self) -> RemoteTaskId {
1522        match self {
1523            Self::SpawnRequest(m) => m.remote_task_id,
1524            Self::SpawnAck(m) => m.remote_task_id,
1525            Self::CancelRequest(m) => m.remote_task_id,
1526            Self::ResultDelivery(m) => m.remote_task_id,
1527            Self::LeaseRenewal(m) => m.remote_task_id,
1528        }
1529    }
1530}
1531
1532// ---------------------------------------------------------------------------
1533// SpawnRequest
1534// ---------------------------------------------------------------------------
1535
1536/// Request to spawn a named computation on a remote node.
1537///
1538/// Contains all information needed to start a remote task:
1539/// - What to run (computation name + serialized inputs)
1540/// - Who is asking (origin node, region, task)
1541/// - How long to keep it alive (lease)
1542/// - Deduplication key (idempotency)
1543///
1544/// # Idempotency
1545///
1546/// The `idempotency_key` ensures exactly-once execution. If the remote node
1547/// receives a duplicate SpawnRequest (same key), it returns the existing
1548/// SpawnAck without re-executing.
#[derive(Clone, Debug)]
pub struct SpawnRequest {
    /// Unique identifier for this remote task.
    pub remote_task_id: RemoteTaskId,
    /// Name of the computation to execute (must be registered on the target,
    /// otherwise the spawn is rejected with `UnknownComputation`).
    pub computation: ComputationName,
    /// Serialized input data.
    pub input: RemoteInput,
    /// Requested lease duration.
    pub lease: Duration,
    /// Idempotency key for deduplication of retried requests.
    pub idempotency_key: IdempotencyKey,
    /// Budget constraints for the remote task (optional).
    pub budget: Option<Budget>,
    /// Node that originated the request.
    pub origin_node: NodeId,
    /// Region that owns the remote task on the originator.
    pub origin_region: RegionId,
    /// Task that spawned the remote task on the originator.
    pub origin_task: TaskId,
}
1570
1571// ---------------------------------------------------------------------------
1572// SpawnAck
1573// ---------------------------------------------------------------------------
1574
/// Acknowledgement of a spawn request.
///
/// Sent by the remote node back to the originator to confirm or reject
/// the spawn request.
#[derive(Clone, Debug)]
pub struct SpawnAck {
    /// The remote task ID from the original request.
    pub remote_task_id: RemoteTaskId,
    /// Whether the spawn was accepted or rejected (with a reason).
    pub status: SpawnAckStatus,
    /// The node that will execute the task (may differ from target if redirected).
    pub assigned_node: NodeId,
}
1588
/// Status of a spawn acknowledgement.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SpawnAckStatus {
    /// The remote node accepted the spawn request; task is running.
    Accepted,
    /// The remote node rejected the spawn request, with the reason why.
    Rejected(SpawnRejectReason),
}
1597
/// Reason for rejecting a spawn request.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SpawnRejectReason {
    /// The computation name is not registered on the remote node.
    UnknownComputation,
    /// The remote node is at capacity and cannot accept more tasks.
    CapacityExceeded,
    /// The remote node is shutting down.
    NodeShuttingDown,
    /// The input data is invalid for this computation; payload explains why.
    InvalidInput(String),
    /// The idempotency key was already used with different parameters.
    IdempotencyConflict,
}
1612
1613impl fmt::Display for SpawnRejectReason {
1614    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1615        match self {
1616            Self::UnknownComputation => write!(f, "unknown computation"),
1617            Self::CapacityExceeded => write!(f, "capacity exceeded"),
1618            Self::NodeShuttingDown => write!(f, "node shutting down"),
1619            Self::InvalidInput(msg) => write!(f, "invalid input: {msg}"),
1620            Self::IdempotencyConflict => write!(f, "idempotency conflict"),
1621        }
1622    }
1623}
1624
1625// ---------------------------------------------------------------------------
1626// CancelRequest
1627// ---------------------------------------------------------------------------
1628
1629/// Request to cancel a running remote task.
1630///
1631/// Sent by the originator to request cancellation, or by the remote node
1632/// to propagate a lease-expiry cancellation back.
#[derive(Clone, Debug)]
pub struct CancelRequest {
    /// The remote task ID to cancel.
    pub remote_task_id: RemoteTaskId,
    /// The cancellation reason (propagated to the remote task).
    pub reason: CancelReason,
    /// The node sending the cancel request.
    pub origin_node: NodeId,
}
1642
1643// ---------------------------------------------------------------------------
1644// ResultDelivery
1645// ---------------------------------------------------------------------------
1646
1647/// Delivery of a remote task's terminal result.
1648///
1649/// Sent by the remote node to the originator when the task completes
1650/// (success, failure, cancellation, or panic).
#[derive(Clone, Debug)]
pub struct ResultDelivery {
    /// The remote task ID.
    pub remote_task_id: RemoteTaskId,
    /// The terminal outcome.
    pub outcome: RemoteOutcome,
    /// Wall-clock execution time on the remote node (informational).
    pub execution_time: Duration,
}
1660
1661/// Terminal outcome of a remote task execution.
1662///
1663/// This mirrors the local [`Outcome`](crate::types::Outcome) lattice but
1664/// carries serialized data instead of typed values.
#[derive(Clone, Debug)]
pub enum RemoteOutcome {
    /// The computation completed successfully. Payload is serialized output.
    Success(Vec<u8>),
    /// The computation failed with an application error message.
    Failed(String),
    /// The computation was cancelled; carries the cancellation reason.
    Cancelled(CancelReason),
    /// The computation panicked; payload is the panic message.
    Panicked(String),
}
1676
1677impl RemoteOutcome {
1678    /// Returns the severity level of this outcome.
1679    #[must_use]
1680    pub fn severity(&self) -> crate::types::Severity {
1681        match self {
1682            Self::Success(_) => crate::types::Severity::Ok,
1683            Self::Failed(_) => crate::types::Severity::Err,
1684            Self::Cancelled(_) => crate::types::Severity::Cancelled,
1685            Self::Panicked(_) => crate::types::Severity::Panicked,
1686        }
1687    }
1688
1689    /// Returns true if this outcome represents success.
1690    #[must_use]
1691    pub fn is_success(&self) -> bool {
1692        matches!(self, Self::Success(_))
1693    }
1694}
1695
1696impl fmt::Display for RemoteOutcome {
1697    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1698        match self {
1699            Self::Success(_) => write!(f, "Success"),
1700            Self::Failed(msg) => write!(f, "Failed: {msg}"),
1701            Self::Cancelled(reason) => write!(f, "Cancelled: {reason}"),
1702            Self::Panicked(msg) => write!(f, "Panicked: {msg}"),
1703        }
1704    }
1705}
1706
1707// ---------------------------------------------------------------------------
1708// LeaseRenewal
1709// ---------------------------------------------------------------------------
1710
1711/// Lease renewal / heartbeat for an active remote task.
1712///
1713/// Sent periodically by the remote node to the originator (or vice versa)
1714/// to confirm the task is still alive and extend the lease.
1715///
1716/// If no renewal is received within the lease window, the originator
1717/// transitions the handle to [`RemoteTaskState::LeaseExpired`] and may
1718/// escalate (cancel, retry, or fail the region).
#[derive(Clone, Debug)]
pub struct LeaseRenewal {
    /// The remote task ID.
    pub remote_task_id: RemoteTaskId,
    /// Requested new lease duration ("from now", per the renewal contract).
    pub new_lease: Duration,
    /// Current state of the remote task, as observed by the sender.
    pub current_state: RemoteTaskState,
    /// Node sending the renewal.
    pub node: NodeId,
}
1730
1731// ---------------------------------------------------------------------------
1732// Session-typed protocol states
1733// ---------------------------------------------------------------------------
1734
/// Errors surfaced by the session-typed remote protocol state machine.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RemoteProtocolError {
    /// Message correlated to a different remote task id than this session.
    RemoteTaskIdMismatch {
        /// Task id this session was created for.
        expected: RemoteTaskId,
        /// Task id carried by the offending message.
        got: RemoteTaskId,
    },
    /// Spawn acknowledgement status did not match the expected transition.
    UnexpectedAckStatus {
        /// Human-readable label of the expected status.
        expected: &'static str,
        /// Actual status received.
        got: SpawnAckStatus,
    },
}
1753
1754impl fmt::Display for RemoteProtocolError {
1755    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1756        match self {
1757            Self::RemoteTaskIdMismatch { expected, got } => {
1758                write!(f, "remote task id mismatch: expected {expected}, got {got}")
1759            }
1760            Self::UnexpectedAckStatus { expected, got } => write!(
1761                f,
1762                "unexpected spawn ack status: expected {expected}, got {got:?}"
1763            ),
1764        }
1765    }
1766}
1767
// Marker impl: both variants are fully described by their fields; no `source`.
impl std::error::Error for RemoteProtocolError {}
1769
/// Origin-side session state: prior to sending a spawn request.
///
/// All `Origin*` types are zero-sized markers used as the `S` parameter of
/// `OriginSession<S>`, encoding protocol state at the type level.
#[derive(Debug)]
pub struct OriginInit;
/// Origin-side session state: spawn request sent, awaiting ack.
#[derive(Debug)]
pub struct OriginSpawned;
/// Origin-side session state: remote task running (spawn accepted).
#[derive(Debug)]
pub struct OriginRunning;
/// Origin-side session state: cancellation request sent.
#[derive(Debug)]
pub struct OriginCancelSent;
/// Origin-side session state: lease expired without renewal.
#[derive(Debug)]
pub struct OriginLeaseExpired;
/// Origin-side session state: terminal result received (session over).
#[derive(Debug)]
pub struct OriginCompleted;
/// Origin-side session state: spawn rejected by remote (session over).
#[derive(Debug)]
pub struct OriginRejected;
1791
// Zero-sized typestate markers for the remote (executing) side, mirroring the
// origin-side markers above. Used only as the `S` type parameter of
// `RemoteSession<S>`; never constructed at runtime.

/// Remote-side session state: prior to receiving a spawn request.
#[derive(Debug)]
pub struct RemoteInit;
/// Remote-side session state: spawn request received, awaiting ack response.
#[derive(Debug)]
pub struct RemoteSpawnReceived;
/// Remote-side session state: cancel received before ack was sent.
#[derive(Debug)]
pub struct RemoteCancelPending;
/// Remote-side session state: remote task running.
#[derive(Debug)]
pub struct RemoteRunning;
/// Remote-side session state: cancel received while running.
#[derive(Debug)]
pub struct RemoteCancelReceived;
/// Remote-side session state: terminal result sent.
#[derive(Debug)]
pub struct RemoteCompleted;
/// Remote-side session state: spawn rejected.
#[derive(Debug)]
pub struct RemoteRejected;
1813
/// Session-typed protocol state machine for the originator.
///
/// `S` is a zero-sized typestate marker (e.g. [`OriginInit`], [`OriginRunning`]).
/// Each transition consumes the session and returns it re-parameterized, so
/// out-of-order protocol messages are rejected at compile time.
#[must_use = "OriginSession must be advanced to completion or rejected"]
#[derive(Debug)]
pub struct OriginSession<S> {
    /// Remote task id every message in this session must correlate to.
    remote_task_id: RemoteTaskId,
    /// Typestate marker; occupies no space at runtime.
    _state: PhantomData<S>,
}
1821
1822impl OriginSession<OriginInit> {
1823    /// Creates a new origin-side session for a given remote task id.
1824    pub fn new(remote_task_id: RemoteTaskId) -> Self {
1825        Self {
1826            remote_task_id,
1827            _state: PhantomData,
1828        }
1829    }
1830
1831    /// Send a spawn request, transitioning into `OriginSpawned`.
1832    pub fn send_spawn(
1833        self,
1834        req: &SpawnRequest,
1835    ) -> Result<OriginSession<OriginSpawned>, RemoteProtocolError> {
1836        self.ensure_id(req.remote_task_id)?;
1837        Ok(self.transition())
1838    }
1839}
1840
1841impl<S> OriginSession<S> {
1842    /// Returns the correlated remote task id.
1843    #[must_use]
1844    pub fn remote_task_id(&self) -> RemoteTaskId {
1845        self.remote_task_id
1846    }
1847
1848    fn ensure_id(&self, got: RemoteTaskId) -> Result<(), RemoteProtocolError> {
1849        if self.remote_task_id == got {
1850            Ok(())
1851        } else {
1852            Err(RemoteProtocolError::RemoteTaskIdMismatch {
1853                expected: self.remote_task_id,
1854                got,
1855            })
1856        }
1857    }
1858
1859    fn transition<T>(self) -> OriginSession<T> {
1860        OriginSession {
1861            remote_task_id: self.remote_task_id,
1862            _state: PhantomData,
1863        }
1864    }
1865}
1866
1867/// Outcome of a spawn acknowledgement on the origin side.
1868pub enum OriginAckOutcome {
1869    /// Spawn accepted; session is running.
1870    Accepted(OriginSession<OriginRunning>),
1871    /// Spawn rejected; session ends.
1872    Rejected(OriginSession<OriginRejected>),
1873}
1874
1875impl OriginSession<OriginSpawned> {
1876    /// Receive the spawn acknowledgement and transition to running or rejected.
1877    pub fn recv_spawn_ack(self, ack: &SpawnAck) -> Result<OriginAckOutcome, RemoteProtocolError> {
1878        self.ensure_id(ack.remote_task_id)?;
1879        match ack.status {
1880            SpawnAckStatus::Accepted => Ok(OriginAckOutcome::Accepted(self.transition())),
1881            SpawnAckStatus::Rejected(_) => Ok(OriginAckOutcome::Rejected(self.transition())),
1882        }
1883    }
1884
1885    /// Send a cancellation before receiving the spawn ack.
1886    pub fn send_cancel(
1887        self,
1888        cancel: &CancelRequest,
1889    ) -> Result<OriginSession<OriginCancelSent>, RemoteProtocolError> {
1890        self.ensure_id(cancel.remote_task_id)?;
1891        Ok(self.transition())
1892    }
1893}
1894
1895impl OriginSession<OriginRunning> {
1896    /// Receive a lease renewal while running.
1897    pub fn recv_lease_renewal(self, renewal: &LeaseRenewal) -> Result<Self, RemoteProtocolError> {
1898        self.ensure_id(renewal.remote_task_id)?;
1899        Ok(self)
1900    }
1901
1902    /// Send a cancellation request while running.
1903    pub fn send_cancel(
1904        self,
1905        cancel: &CancelRequest,
1906    ) -> Result<OriginSession<OriginCancelSent>, RemoteProtocolError> {
1907        self.ensure_id(cancel.remote_task_id)?;
1908        Ok(self.transition())
1909    }
1910
1911    /// Receive the terminal result.
1912    pub fn recv_result(
1913        self,
1914        result: &ResultDelivery,
1915    ) -> Result<OriginSession<OriginCompleted>, RemoteProtocolError> {
1916        self.ensure_id(result.remote_task_id)?;
1917        Ok(self.transition())
1918    }
1919
1920    /// Mark the lease as expired without renewal.
1921    pub fn lease_expired(self) -> OriginSession<OriginLeaseExpired> {
1922        self.transition()
1923    }
1924}
1925
1926impl OriginSession<OriginCancelSent> {
1927    /// Receive the terminal result after cancellation.
1928    pub fn recv_result(
1929        self,
1930        result: &ResultDelivery,
1931    ) -> Result<OriginSession<OriginCompleted>, RemoteProtocolError> {
1932        self.ensure_id(result.remote_task_id)?;
1933        Ok(self.transition())
1934    }
1935
1936    /// Accept a lease renewal while waiting for completion.
1937    pub fn recv_lease_renewal(self, renewal: &LeaseRenewal) -> Result<Self, RemoteProtocolError> {
1938        self.ensure_id(renewal.remote_task_id)?;
1939        Ok(self)
1940    }
1941}
1942
1943impl OriginSession<OriginLeaseExpired> {
1944    /// Send a cancellation request after lease expiry.
1945    pub fn send_cancel(
1946        self,
1947        cancel: &CancelRequest,
1948    ) -> Result<OriginSession<OriginCancelSent>, RemoteProtocolError> {
1949        self.ensure_id(cancel.remote_task_id)?;
1950        Ok(self.transition())
1951    }
1952
1953    /// Receive a late terminal result after lease expiry.
1954    pub fn recv_result(
1955        self,
1956        result: &ResultDelivery,
1957    ) -> Result<OriginSession<OriginCompleted>, RemoteProtocolError> {
1958        self.ensure_id(result.remote_task_id)?;
1959        Ok(self.transition())
1960    }
1961}
1962
/// Session-typed protocol state machine for the remote node.
///
/// Mirror of [`OriginSession`] for the executing side: `S` is a zero-sized
/// typestate marker and every transition consumes the session.
#[must_use = "RemoteSession must be advanced to completion or rejected"]
#[derive(Debug)]
pub struct RemoteSession<S> {
    /// Remote task id every message in this session must correlate to.
    remote_task_id: RemoteTaskId,
    /// Typestate marker; occupies no space at runtime.
    _state: PhantomData<S>,
}
1970
1971impl RemoteSession<RemoteInit> {
1972    /// Creates a new remote-side session for a given remote task id.
1973    pub fn new(remote_task_id: RemoteTaskId) -> Self {
1974        Self {
1975            remote_task_id,
1976            _state: PhantomData,
1977        }
1978    }
1979
1980    /// Receive a spawn request.
1981    pub fn recv_spawn(
1982        self,
1983        req: &SpawnRequest,
1984    ) -> Result<RemoteSession<RemoteSpawnReceived>, RemoteProtocolError> {
1985        self.ensure_id(req.remote_task_id)?;
1986        Ok(self.transition())
1987    }
1988}
1989
1990impl<S> RemoteSession<S> {
1991    /// Returns the correlated remote task id.
1992    #[must_use]
1993    pub fn remote_task_id(&self) -> RemoteTaskId {
1994        self.remote_task_id
1995    }
1996
1997    fn ensure_id(&self, got: RemoteTaskId) -> Result<(), RemoteProtocolError> {
1998        if self.remote_task_id == got {
1999            Ok(())
2000        } else {
2001            Err(RemoteProtocolError::RemoteTaskIdMismatch {
2002                expected: self.remote_task_id,
2003                got,
2004            })
2005        }
2006    }
2007
2008    fn transition<T>(self) -> RemoteSession<T> {
2009        RemoteSession {
2010            remote_task_id: self.remote_task_id,
2011            _state: PhantomData,
2012        }
2013    }
2014}
2015
2016impl RemoteSession<RemoteSpawnReceived> {
2017    /// Send an accepted spawn acknowledgement.
2018    pub fn send_ack_accepted(
2019        self,
2020        ack: &SpawnAck,
2021    ) -> Result<RemoteSession<RemoteRunning>, RemoteProtocolError> {
2022        self.ensure_id(ack.remote_task_id)?;
2023        match ack.status {
2024            SpawnAckStatus::Accepted => Ok(self.transition()),
2025            SpawnAckStatus::Rejected(_) => Err(RemoteProtocolError::UnexpectedAckStatus {
2026                expected: "Accepted",
2027                got: ack.status.clone(),
2028            }),
2029        }
2030    }
2031
2032    /// Send a rejected spawn acknowledgement.
2033    pub fn send_ack_rejected(
2034        self,
2035        ack: &SpawnAck,
2036    ) -> Result<RemoteSession<RemoteRejected>, RemoteProtocolError> {
2037        self.ensure_id(ack.remote_task_id)?;
2038        match ack.status {
2039            SpawnAckStatus::Rejected(_) => Ok(self.transition()),
2040            SpawnAckStatus::Accepted => Err(RemoteProtocolError::UnexpectedAckStatus {
2041                expected: "Rejected",
2042                got: ack.status.clone(),
2043            }),
2044        }
2045    }
2046
2047    /// Receive a cancellation before the spawn ack is sent.
2048    pub fn recv_cancel(
2049        self,
2050        cancel: &CancelRequest,
2051    ) -> Result<RemoteSession<RemoteCancelPending>, RemoteProtocolError> {
2052        self.ensure_id(cancel.remote_task_id)?;
2053        Ok(self.transition())
2054    }
2055}
2056
2057impl RemoteSession<RemoteCancelPending> {
2058    /// Send an accepted spawn acknowledgement while a cancel is pending.
2059    pub fn send_ack_accepted(
2060        self,
2061        ack: &SpawnAck,
2062    ) -> Result<RemoteSession<RemoteCancelReceived>, RemoteProtocolError> {
2063        self.ensure_id(ack.remote_task_id)?;
2064        match ack.status {
2065            SpawnAckStatus::Accepted => Ok(self.transition()),
2066            SpawnAckStatus::Rejected(_) => Err(RemoteProtocolError::UnexpectedAckStatus {
2067                expected: "Accepted",
2068                got: ack.status.clone(),
2069            }),
2070        }
2071    }
2072
2073    /// Send a rejected spawn acknowledgement while a cancel is pending.
2074    pub fn send_ack_rejected(
2075        self,
2076        ack: &SpawnAck,
2077    ) -> Result<RemoteSession<RemoteRejected>, RemoteProtocolError> {
2078        self.ensure_id(ack.remote_task_id)?;
2079        match ack.status {
2080            SpawnAckStatus::Rejected(_) => Ok(self.transition()),
2081            SpawnAckStatus::Accepted => Err(RemoteProtocolError::UnexpectedAckStatus {
2082                expected: "Rejected",
2083                got: ack.status.clone(),
2084            }),
2085        }
2086    }
2087}
2088
2089impl RemoteSession<RemoteRunning> {
2090    /// Receive a cancellation while running.
2091    pub fn recv_cancel(
2092        self,
2093        cancel: &CancelRequest,
2094    ) -> Result<RemoteSession<RemoteCancelReceived>, RemoteProtocolError> {
2095        self.ensure_id(cancel.remote_task_id)?;
2096        Ok(self.transition())
2097    }
2098
2099    /// Send a lease renewal heartbeat.
2100    pub fn send_lease_renewal(self, renewal: &LeaseRenewal) -> Result<Self, RemoteProtocolError> {
2101        self.ensure_id(renewal.remote_task_id)?;
2102        Ok(self)
2103    }
2104
2105    /// Send the terminal result.
2106    pub fn send_result(
2107        self,
2108        result: &ResultDelivery,
2109    ) -> Result<RemoteSession<RemoteCompleted>, RemoteProtocolError> {
2110        self.ensure_id(result.remote_task_id)?;
2111        Ok(self.transition())
2112    }
2113}
2114
2115impl RemoteSession<RemoteCancelReceived> {
2116    /// Send the terminal result after cancellation.
2117    pub fn send_result(
2118        self,
2119        result: &ResultDelivery,
2120    ) -> Result<RemoteSession<RemoteCompleted>, RemoteProtocolError> {
2121        self.ensure_id(result.remote_task_id)?;
2122        Ok(self.transition())
2123    }
2124}
2125
2126// ---------------------------------------------------------------------------
2127// Trace events for protocol messages
2128// ---------------------------------------------------------------------------
2129
/// Trace event names for remote protocol messages.
///
/// These are used with `cx.trace()` to emit structured trace events
/// that represent the remote message flow. They enable deterministic
/// replay and debugging of distributed scenarios in the lab runtime.
///
/// All names follow the `remote::<snake_case_event>` convention; the string
/// values are part of the trace format and must stay stable across releases.
pub mod trace_events {
    /// Emitted when a spawn request is created.
    pub const SPAWN_REQUEST_CREATED: &str = "remote::spawn_request_created";
    /// Emitted when a spawn request is sent to the transport.
    pub const SPAWN_REQUEST_SENT: &str = "remote::spawn_request_sent";
    /// Emitted when a spawn ack is received.
    pub const SPAWN_ACK_RECEIVED: &str = "remote::spawn_ack_received";
    /// Emitted when a spawn request is rejected.
    pub const SPAWN_REJECTED: &str = "remote::spawn_rejected";
    /// Emitted when a cancel request is sent.
    pub const CANCEL_SENT: &str = "remote::cancel_sent";
    /// Emitted when a cancel request is received on the remote side.
    pub const CANCEL_RECEIVED: &str = "remote::cancel_received";
    /// Emitted when a result is delivered.
    pub const RESULT_DELIVERED: &str = "remote::result_delivered";
    /// Emitted when a lease renewal is sent.
    pub const LEASE_RENEWAL_SENT: &str = "remote::lease_renewal_sent";
    /// Emitted when a lease renewal is received.
    pub const LEASE_RENEWAL_RECEIVED: &str = "remote::lease_renewal_received";
    /// Emitted when a lease expires without renewal.
    pub const LEASE_EXPIRED: &str = "remote::lease_expired";
}
2157
2158// ---------------------------------------------------------------------------
2159// Tests
2160// ---------------------------------------------------------------------------
2161
2162#[cfg(test)]
2163mod tests {
2164    use super::*;
2165    use std::sync::Mutex;
2166
2167    #[test]
2168    fn node_id_basics() {
2169        let node = NodeId::new("worker-1");
2170        assert_eq!(node.as_str(), "worker-1");
2171        assert_eq!(format!("{node}"), "Node(worker-1)");
2172
2173        let node2 = NodeId::new("worker-1");
2174        assert_eq!(node, node2);
2175
2176        let node3 = NodeId::new("worker-2");
2177        assert_ne!(node, node3);
2178    }
2179
2180    #[test]
2181    fn computation_name_basics() {
2182        let name = ComputationName::new("encode_block");
2183        assert_eq!(name.as_str(), "encode_block");
2184        assert_eq!(format!("{name}"), "encode_block");
2185
2186        let name2 = ComputationName::new("encode_block");
2187        assert_eq!(name, name2);
2188    }
2189
2190    #[test]
2191    fn remote_input_basics() {
2192        let input = RemoteInput::new(vec![1, 2, 3]);
2193        assert_eq!(input.data(), &[1, 2, 3]);
2194        assert_eq!(input.len(), 3);
2195        assert!(!input.is_empty());
2196
2197        let empty = RemoteInput::empty();
2198        assert!(empty.is_empty());
2199        assert_eq!(empty.len(), 0);
2200
2201        let owned = input.into_data();
2202        assert_eq!(owned, vec![1, 2, 3]);
2203    }
2204
2205    #[test]
2206    fn remote_cap_defaults() {
2207        let cap = RemoteCap::new();
2208        assert_eq!(cap.default_lease(), Duration::from_secs(30));
2209        assert!(cap.remote_budget().is_none());
2210        assert_eq!(cap.local_node().as_str(), "local");
2211    }
2212
2213    #[test]
2214    fn remote_cap_builder() {
2215        let cap = RemoteCap::new()
2216            .with_default_lease(Duration::from_mins(1))
2217            .with_remote_budget(Budget::INFINITE)
2218            .with_local_node(NodeId::new("origin-a"));
2219        assert_eq!(cap.default_lease(), Duration::from_mins(1));
2220        assert!(cap.remote_budget().is_some());
2221        assert_eq!(cap.local_node().as_str(), "origin-a");
2222    }
2223
    // Test transport that records every outgoing envelope instead of sending
    // it anywhere; the result channel handed to register_task is dropped.
    #[derive(Debug, Default)]
    struct CaptureRuntime {
        // (destination, envelope) pairs in send order.
        sent: Mutex<Vec<(NodeId, MessageEnvelope<RemoteMessage>)>>,
    }

    impl RemoteRuntime for CaptureRuntime {
        fn send_message(
            &self,
            destination: &NodeId,
            envelope: MessageEnvelope<RemoteMessage>,
        ) -> Result<(), RemoteError> {
            // Record the message and report success; no real I/O happens.
            self.sent
                .lock()
                .expect("capture runtime lock poisoned")
                .push((destination.clone(), envelope));
            Ok(())
        }

        fn register_task(
            &self,
            _task_id: RemoteTaskId,
            _tx: oneshot::Sender<Result<RemoteOutcome, RemoteError>>,
        ) {
            // Intentionally dropped in this capture runtime.
        }
    }
2250
    // Test transport whose send_message always fails, recording which tasks
    // were registered and then unregistered so tests can verify cleanup.
    #[derive(Debug, Default)]
    struct FailingSendRuntime {
        // Task ids passed to register_task, in order.
        registered: Mutex<Vec<RemoteTaskId>>,
        // Task ids passed to unregister_task, in order.
        unregistered: Mutex<Vec<RemoteTaskId>>,
    }

    impl RemoteRuntime for FailingSendRuntime {
        fn send_message(
            &self,
            _destination: &NodeId,
            _envelope: MessageEnvelope<RemoteMessage>,
        ) -> Result<(), RemoteError> {
            // Simulate an unconditional transport failure.
            Err(RemoteError::TransportError("simulated send failure".into()))
        }

        fn register_task(
            &self,
            task_id: RemoteTaskId,
            _tx: oneshot::Sender<Result<RemoteOutcome, RemoteError>>,
        ) {
            self.registered
                .lock()
                .expect("failing runtime lock poisoned")
                .push(task_id);
        }

        fn unregister_task(&self, task_id: RemoteTaskId) {
            self.unregistered
                .lock()
                .expect("failing runtime lock poisoned")
                .push(task_id);
        }
    }
2284
2285    #[test]
2286    fn spawn_remote_uses_cap_local_node_for_origin() {
2287        let runtime = Arc::new(CaptureRuntime::default());
2288        let cap = RemoteCap::new()
2289            .with_local_node(NodeId::new("origin-a"))
2290            .with_runtime(runtime.clone());
2291        let cx: Cx = Cx::for_testing_with_remote(cap);
2292
2293        let _ = spawn_remote(
2294            &cx,
2295            NodeId::new("worker-1"),
2296            ComputationName::new("encode_block"),
2297            RemoteInput::new(vec![1, 2, 3]),
2298        )
2299        .expect("spawn_remote should succeed");
2300
2301        let (destination, envelope) = {
2302            let sent = runtime.sent.lock().expect("capture runtime lock poisoned");
2303            assert_eq!(sent.len(), 1);
2304            sent[0].clone()
2305        };
2306        assert_eq!(destination.as_str(), "worker-1");
2307        assert_eq!(envelope.sender.as_str(), "origin-a");
2308        match &envelope.payload {
2309            RemoteMessage::SpawnRequest(req) => {
2310                assert_eq!(req.origin_node.as_str(), "origin-a");
2311            }
2312            other => unreachable!("expected SpawnRequest, got {other:?}"),
2313        }
2314    }
2315
2316    #[test]
2317    fn spawn_remote_send_failure_unregisters_pending_task() {
2318        let runtime = Arc::new(FailingSendRuntime::default());
2319        let cap = RemoteCap::new().with_runtime(runtime.clone());
2320        let cx: Cx = Cx::for_testing_with_remote(cap);
2321
2322        let err = spawn_remote(
2323            &cx,
2324            NodeId::new("worker-1"),
2325            ComputationName::new("encode_block"),
2326            RemoteInput::new(vec![1, 2, 3]),
2327        )
2328        .expect_err("spawn_remote should fail when send_message fails");
2329        match err {
2330            RemoteError::TransportError(msg) => {
2331                assert!(msg.contains("simulated send failure"));
2332            }
2333            other => unreachable!("expected TransportError, got {other:?}"),
2334        }
2335
2336        let registered = runtime
2337            .registered
2338            .lock()
2339            .expect("failing runtime lock poisoned")
2340            .clone();
2341        let unregistered = runtime
2342            .unregistered
2343            .lock()
2344            .expect("failing runtime lock poisoned")
2345            .clone();
2346
2347        assert_eq!(registered.len(), 1);
2348        assert_eq!(unregistered, registered);
2349    }
2350
2351    #[test]
2352    fn remote_task_id_uniqueness() {
2353        let id1 = RemoteTaskId::next();
2354        let id2 = RemoteTaskId::next();
2355        assert_ne!(id1, id2);
2356        assert!(id2.raw() > id1.raw());
2357    }
2358
2359    #[test]
2360    fn remote_task_state_display() {
2361        assert_eq!(format!("{}", RemoteTaskState::Pending), "Pending");
2362        assert_eq!(format!("{}", RemoteTaskState::Running), "Running");
2363        assert_eq!(format!("{}", RemoteTaskState::Completed), "Completed");
2364        assert_eq!(format!("{}", RemoteTaskState::LeaseExpired), "LeaseExpired");
2365    }
2366
2367    #[test]
2368    fn remote_error_display() {
2369        let err = RemoteError::NoCapability;
2370        assert_eq!(format!("{err}"), "remote capability not available");
2371
2372        let err = RemoteError::NodeUnreachable("worker-9".into());
2373        assert!(format!("{err}").contains("worker-9"));
2374
2375        let err = RemoteError::UnknownComputation("bad_fn".into());
2376        assert!(format!("{err}").contains("bad_fn"));
2377    }
2378
2379    #[test]
2380    fn spawn_remote_without_cap_fails() {
2381        let cx: Cx = Cx::for_testing();
2382        assert!(!cx.has_remote());
2383
2384        let result = spawn_remote(
2385            &cx,
2386            NodeId::new("worker-1"),
2387            ComputationName::new("encode"),
2388            RemoteInput::empty(),
2389        );
2390        assert!(result.is_err());
2391        assert_eq!(result.unwrap_err(), RemoteError::NoCapability);
2392    }
2393
2394    #[test]
2395    fn spawn_remote_with_cap_succeeds() {
2396        let cx: Cx = Cx::for_testing_with_remote(RemoteCap::new());
2397        assert!(cx.has_remote());
2398
2399        let result = spawn_remote(
2400            &cx,
2401            NodeId::new("worker-1"),
2402            ComputationName::new("encode_block"),
2403            RemoteInput::new(vec![42]),
2404        );
2405        assert!(result.is_ok());
2406
2407        let handle = result.unwrap();
2408        assert_eq!(handle.node().as_str(), "worker-1");
2409        assert_eq!(handle.computation().as_str(), "encode_block");
2410        assert_eq!(*handle.state(), RemoteTaskState::Pending);
2411        assert_eq!(handle.lease(), Duration::from_secs(30));
2412        assert!(handle.local_task_id().is_none());
2413    }
2414
2415    #[test]
2416    fn remote_handle_debug() {
2417        let cx: Cx = Cx::for_testing_with_remote(RemoteCap::new());
2418        let handle = spawn_remote(
2419            &cx,
2420            NodeId::new("n1"),
2421            ComputationName::new("compute"),
2422            RemoteInput::empty(),
2423        )
2424        .unwrap();
2425
2426        let debug = format!("{handle:?}");
2427        assert!(debug.contains("RemoteHandle"));
2428        assert!(debug.contains("n1"));
2429        assert!(debug.contains("compute"));
2430    }
2431
2432    #[test]
2433    fn remote_handle_not_finished_initially() {
2434        let cx: Cx = Cx::for_testing_with_remote(RemoteCap::new());
2435        let handle = spawn_remote(
2436            &cx,
2437            NodeId::new("n1"),
2438            ComputationName::new("add"),
2439            RemoteInput::empty(),
2440        )
2441        .unwrap();
2442
2443        // Phase 0: sender is dropped immediately, so the channel is closed
2444        // and is_finished will be true (closed counts as ready).
2445        // In Phase 1+, the transport holds the sender open.
2446        // For now, just verify the method exists and doesn't panic.
2447        let _ = handle.is_finished();
2448    }
2449
2450    #[test]
2451    fn remote_handle_try_join_pending() {
2452        let cx: Cx = Cx::for_testing_with_remote(RemoteCap::new());
2453        let handle = spawn_remote(
2454            &cx,
2455            NodeId::new("n1"),
2456            ComputationName::new("work"),
2457            RemoteInput::empty(),
2458        )
2459        .unwrap();
2460
2461        // Phase 0: sender is dropped, so try_join returns Cancelled.
2462        // In Phase 1+, the transport holds the sender and try_join returns None.
2463        let result = handle.try_join();
2464        // Either None (transport holds sender) or Cancelled (Phase 0 sender dropped)
2465        assert!(result.is_ok() || matches!(result, Err(RemoteError::Cancelled(_))));
2466    }
2467
2468    #[test]
2469    fn remote_handle_abort_no_panic() {
2470        let cx: Cx = Cx::for_testing_with_remote(RemoteCap::new());
2471        let handle = spawn_remote(
2472            &cx,
2473            NodeId::new("n1"),
2474            ComputationName::new("long_task"),
2475            RemoteInput::empty(),
2476        )
2477        .unwrap();
2478
2479        // Phase 0: abort is a no-op, just verify it doesn't panic.
2480        handle.abort();
2481    }
2482
2483    #[test]
2484    fn remote_cap_custom_lease_propagates() {
2485        let cap = RemoteCap::new().with_default_lease(Duration::from_mins(2));
2486        let cx: Cx = Cx::for_testing_with_remote(cap);
2487
2488        let handle = spawn_remote(
2489            &cx,
2490            NodeId::new("n1"),
2491            ComputationName::new("slow"),
2492            RemoteInput::empty(),
2493        )
2494        .unwrap();
2495
2496        assert_eq!(handle.lease(), Duration::from_mins(2));
2497    }
2498
2499    // -----------------------------------------------------------------------
2500    // Protocol tests (tmh.1.2)
2501    // -----------------------------------------------------------------------
2502
2503    #[test]
2504    fn idempotency_key_generate() {
2505        let cx: Cx = Cx::for_testing();
2506        let k1 = IdempotencyKey::generate(&cx);
2507        let k2 = IdempotencyKey::generate(&cx);
2508        // Keys should be unique (with overwhelming probability)
2509        assert_ne!(k1, k2);
2510        assert_ne!(k1.raw(), 0);
2511    }
2512
2513    #[test]
2514    fn idempotency_key_from_raw() {
2515        let key = IdempotencyKey::from_raw(0xDEAD_BEEF);
2516        assert_eq!(key.raw(), 0xDEAD_BEEF);
2517        let display = format!("{key}");
2518        assert!(display.starts_with("IK-"));
2519        assert!(display.contains("deadbeef"));
2520    }
2521
2522    #[test]
2523    fn spawn_request_construction() {
2524        let cx: Cx = Cx::for_testing();
2525        let req = SpawnRequest {
2526            remote_task_id: RemoteTaskId::next(),
2527            computation: ComputationName::new("encode_block"),
2528            input: RemoteInput::new(vec![1, 2, 3]),
2529            lease: Duration::from_mins(1),
2530            idempotency_key: IdempotencyKey::generate(&cx),
2531            budget: None,
2532            origin_node: NodeId::new("origin-1"),
2533            origin_region: cx.region_id(),
2534            origin_task: cx.task_id(),
2535        };
2536
2537        assert_eq!(req.computation.as_str(), "encode_block");
2538        assert_eq!(req.input.len(), 3);
2539        assert_eq!(req.lease, Duration::from_mins(1));
2540        assert_eq!(req.origin_node.as_str(), "origin-1");
2541    }
2542
2543    #[test]
2544    fn spawn_ack_accepted() {
2545        let ack = SpawnAck {
2546            remote_task_id: RemoteTaskId::next(),
2547            status: SpawnAckStatus::Accepted,
2548            assigned_node: NodeId::new("worker-3"),
2549        };
2550        assert_eq!(ack.status, SpawnAckStatus::Accepted);
2551        assert_eq!(ack.assigned_node.as_str(), "worker-3");
2552    }
2553
2554    #[test]
2555    fn spawn_ack_rejected() {
2556        let ack = SpawnAck {
2557            remote_task_id: RemoteTaskId::next(),
2558            status: SpawnAckStatus::Rejected(SpawnRejectReason::CapacityExceeded),
2559            assigned_node: NodeId::new("worker-1"),
2560        };
2561        assert_eq!(
2562            ack.status,
2563            SpawnAckStatus::Rejected(SpawnRejectReason::CapacityExceeded)
2564        );
2565    }
2566
2567    #[test]
2568    fn spawn_reject_reason_display() {
2569        assert_eq!(
2570            format!("{}", SpawnRejectReason::UnknownComputation),
2571            "unknown computation"
2572        );
2573        assert_eq!(
2574            format!("{}", SpawnRejectReason::CapacityExceeded),
2575            "capacity exceeded"
2576        );
2577        assert_eq!(
2578            format!("{}", SpawnRejectReason::NodeShuttingDown),
2579            "node shutting down"
2580        );
2581        assert!(
2582            format!("{}", SpawnRejectReason::InvalidInput("bad data".into())).contains("bad data")
2583        );
2584        assert_eq!(
2585            format!("{}", SpawnRejectReason::IdempotencyConflict),
2586            "idempotency conflict"
2587        );
2588    }
2589
2590    #[test]
2591    fn cancel_request_construction() {
2592        let req = CancelRequest {
2593            remote_task_id: RemoteTaskId::next(),
2594            reason: CancelReason::user("user abort"),
2595            origin_node: NodeId::new("origin-1"),
2596        };
2597        assert_eq!(req.origin_node.as_str(), "origin-1");
2598    }
2599
2600    #[test]
2601    fn result_delivery_success() {
2602        let delivery = ResultDelivery {
2603            remote_task_id: RemoteTaskId::next(),
2604            outcome: RemoteOutcome::Success(vec![42]),
2605            execution_time: Duration::from_millis(150),
2606        };
2607        assert!(delivery.outcome.is_success());
2608        assert_eq!(delivery.outcome.severity(), crate::types::Severity::Ok);
2609        assert_eq!(delivery.execution_time, Duration::from_millis(150));
2610    }
2611
2612    #[test]
2613    fn result_delivery_failure() {
2614        let delivery = ResultDelivery {
2615            remote_task_id: RemoteTaskId::next(),
2616            outcome: RemoteOutcome::Failed("out of memory".into()),
2617            execution_time: Duration::from_secs(5),
2618        };
2619        assert!(!delivery.outcome.is_success());
2620        assert_eq!(delivery.outcome.severity(), crate::types::Severity::Err);
2621    }
2622
2623    #[test]
2624    fn remote_outcome_display() {
2625        assert_eq!(format!("{}", RemoteOutcome::Success(vec![])), "Success");
2626        assert!(format!("{}", RemoteOutcome::Failed("oops".into())).contains("oops"));
2627        assert!(
2628            format!("{}", RemoteOutcome::Cancelled(CancelReason::user("done")))
2629                .contains("Cancelled")
2630        );
2631        assert!(format!("{}", RemoteOutcome::Panicked("boom".into())).contains("boom"));
2632    }
2633
2634    #[test]
2635    fn lease_renewal_construction() {
2636        let renewal = LeaseRenewal {
2637            remote_task_id: RemoteTaskId::next(),
2638            new_lease: Duration::from_secs(30),
2639            current_state: RemoteTaskState::Running,
2640            node: NodeId::new("worker-1"),
2641        };
2642        assert_eq!(renewal.new_lease, Duration::from_secs(30));
2643        assert_eq!(renewal.current_state, RemoteTaskState::Running);
2644    }
2645
2646    #[test]
2647    fn remote_message_task_id_dispatch() {
2648        let rtid = RemoteTaskId::next();
2649        let cx: Cx = Cx::for_testing();
2650
2651        let spawn_msg = RemoteMessage::SpawnRequest(SpawnRequest {
2652            remote_task_id: rtid,
2653            computation: ComputationName::new("test"),
2654            input: RemoteInput::empty(),
2655            lease: Duration::from_secs(30),
2656            idempotency_key: IdempotencyKey::generate(&cx),
2657            budget: None,
2658            origin_node: NodeId::new("n1"),
2659            origin_region: cx.region_id(),
2660            origin_task: cx.task_id(),
2661        });
2662        assert_eq!(spawn_msg.remote_task_id(), rtid);
2663
2664        let ack_msg = RemoteMessage::SpawnAck(SpawnAck {
2665            remote_task_id: rtid,
2666            status: SpawnAckStatus::Accepted,
2667            assigned_node: NodeId::new("n2"),
2668        });
2669        assert_eq!(ack_msg.remote_task_id(), rtid);
2670
2671        let cancel_msg = RemoteMessage::CancelRequest(CancelRequest {
2672            remote_task_id: rtid,
2673            reason: CancelReason::user("test"),
2674            origin_node: NodeId::new("n1"),
2675        });
2676        assert_eq!(cancel_msg.remote_task_id(), rtid);
2677
2678        let result_msg = RemoteMessage::ResultDelivery(ResultDelivery {
2679            remote_task_id: rtid,
2680            outcome: RemoteOutcome::Success(vec![]),
2681            execution_time: Duration::ZERO,
2682        });
2683        assert_eq!(result_msg.remote_task_id(), rtid);
2684
2685        let renewal_msg = RemoteMessage::LeaseRenewal(LeaseRenewal {
2686            remote_task_id: rtid,
2687            new_lease: Duration::from_secs(30),
2688            current_state: RemoteTaskState::Running,
2689            node: NodeId::new("n2"),
2690        });
2691        assert_eq!(renewal_msg.remote_task_id(), rtid);
2692    }
2693
2694    fn test_spawn_request(cx: &Cx, remote_task_id: RemoteTaskId) -> SpawnRequest {
2695        SpawnRequest {
2696            remote_task_id,
2697            computation: ComputationName::new("compute"),
2698            input: RemoteInput::empty(),
2699            lease: Duration::from_secs(30),
2700            idempotency_key: IdempotencyKey::generate(cx),
2701            budget: None,
2702            origin_node: NodeId::new("origin-1"),
2703            origin_region: cx.region_id(),
2704            origin_task: cx.task_id(),
2705        }
2706    }
2707
2708    fn test_ack_accepted(remote_task_id: RemoteTaskId) -> SpawnAck {
2709        SpawnAck {
2710            remote_task_id,
2711            status: SpawnAckStatus::Accepted,
2712            assigned_node: NodeId::new("worker-1"),
2713        }
2714    }
2715
2716    fn test_ack_rejected(remote_task_id: RemoteTaskId) -> SpawnAck {
2717        SpawnAck {
2718            remote_task_id,
2719            status: SpawnAckStatus::Rejected(SpawnRejectReason::CapacityExceeded),
2720            assigned_node: NodeId::new("worker-1"),
2721        }
2722    }
2723
2724    fn test_cancel(remote_task_id: RemoteTaskId) -> CancelRequest {
2725        CancelRequest {
2726            remote_task_id,
2727            reason: CancelReason::user("cancel"),
2728            origin_node: NodeId::new("origin-1"),
2729        }
2730    }
2731
2732    fn test_result(remote_task_id: RemoteTaskId, outcome: RemoteOutcome) -> ResultDelivery {
2733        ResultDelivery {
2734            remote_task_id,
2735            outcome,
2736            execution_time: Duration::ZERO,
2737        }
2738    }
2739
2740    fn test_renewal(remote_task_id: RemoteTaskId) -> LeaseRenewal {
2741        LeaseRenewal {
2742            remote_task_id,
2743            new_lease: Duration::from_secs(10),
2744            current_state: RemoteTaskState::Running,
2745            node: NodeId::new("worker-1"),
2746        }
2747    }
2748
2749    #[test]
2750    fn origin_session_cancel_flow() {
2751        let cx: Cx = Cx::for_testing();
2752        let rtid = RemoteTaskId::next();
2753        let origin = OriginSession::<OriginInit>::new(rtid);
2754        let req = test_spawn_request(&cx, rtid);
2755        let origin = origin.send_spawn(&req).unwrap();
2756        let ack = test_ack_accepted(rtid);
2757        let outcome = origin.recv_spawn_ack(&ack).unwrap();
2758        assert!(matches!(outcome, OriginAckOutcome::Accepted(_)));
2759        let origin = match outcome {
2760            OriginAckOutcome::Accepted(session) => session,
2761            OriginAckOutcome::Rejected(_) => return,
2762        };
2763        let origin = origin.recv_lease_renewal(&test_renewal(rtid)).unwrap();
2764        let origin = origin.send_cancel(&test_cancel(rtid)).unwrap();
2765        let result = test_result(
2766            rtid,
2767            RemoteOutcome::Cancelled(CancelReason::user("cancelled")),
2768        );
2769        let origin = origin.recv_result(&result).unwrap();
2770        assert_eq!(origin.remote_task_id(), rtid);
2771    }
2772
2773    #[test]
2774    fn origin_session_reject_flow() {
2775        let cx: Cx = Cx::for_testing();
2776        let rtid = RemoteTaskId::next();
2777        let origin = OriginSession::<OriginInit>::new(rtid);
2778        let req = test_spawn_request(&cx, rtid);
2779        let origin = origin.send_spawn(&req).unwrap();
2780        let ack = test_ack_rejected(rtid);
2781        let outcome = origin.recv_spawn_ack(&ack).unwrap();
2782        assert!(matches!(outcome, OriginAckOutcome::Rejected(_)));
2783        if let OriginAckOutcome::Rejected(session) = outcome {
2784            assert_eq!(session.remote_task_id(), rtid);
2785        }
2786    }
2787
2788    #[test]
2789    fn remote_session_cancel_before_ack_flow() {
2790        let cx: Cx = Cx::for_testing();
2791        let rtid = RemoteTaskId::next();
2792        let remote = RemoteSession::<RemoteInit>::new(rtid);
2793        let req = test_spawn_request(&cx, rtid);
2794        let remote = remote.recv_spawn(&req).unwrap();
2795        let remote = remote.recv_cancel(&test_cancel(rtid)).unwrap();
2796        let remote = remote.send_ack_accepted(&test_ack_accepted(rtid)).unwrap();
2797        let result = test_result(rtid, RemoteOutcome::Cancelled(CancelReason::user("done")));
2798        let remote = remote.send_result(&result).unwrap();
2799        assert_eq!(remote.remote_task_id(), rtid);
2800    }
2801
2802    #[test]
2803    fn protocol_id_mismatch_is_error() {
2804        let cx: Cx = Cx::for_testing();
2805        let rtid = RemoteTaskId::next();
2806        let origin = OriginSession::<OriginInit>::new(rtid);
2807        let req = test_spawn_request(&cx, RemoteTaskId::next());
2808        let err = origin.send_spawn(&req).unwrap_err();
2809        assert!(matches!(
2810            err,
2811            RemoteProtocolError::RemoteTaskIdMismatch { .. }
2812        ));
2813    }
2814
2815    #[test]
2816    fn protocol_ack_status_mismatch_is_error() {
2817        let cx: Cx = Cx::for_testing();
2818        let rtid = RemoteTaskId::next();
2819        let remote = RemoteSession::<RemoteInit>::new(rtid);
2820        let req = test_spawn_request(&cx, rtid);
2821        let remote = remote.recv_spawn(&req).unwrap();
2822        let ack = test_ack_rejected(rtid);
2823        let err = remote.send_ack_accepted(&ack).unwrap_err();
2824        assert!(matches!(
2825            err,
2826            RemoteProtocolError::UnexpectedAckStatus { .. }
2827        ));
2828    }
2829
2830    #[test]
2831    fn trace_event_names_are_namespaced() {
2832        // Verify all trace events follow the "remote::" namespace convention.
2833        assert!(trace_events::SPAWN_REQUEST_CREATED.starts_with("remote::"));
2834        assert!(trace_events::SPAWN_REQUEST_SENT.starts_with("remote::"));
2835        assert!(trace_events::SPAWN_ACK_RECEIVED.starts_with("remote::"));
2836        assert!(trace_events::SPAWN_REJECTED.starts_with("remote::"));
2837        assert!(trace_events::CANCEL_SENT.starts_with("remote::"));
2838        assert!(trace_events::CANCEL_RECEIVED.starts_with("remote::"));
2839        assert!(trace_events::RESULT_DELIVERED.starts_with("remote::"));
2840        assert!(trace_events::LEASE_RENEWAL_SENT.starts_with("remote::"));
2841        assert!(trace_events::LEASE_RENEWAL_RECEIVED.starts_with("remote::"));
2842        assert!(trace_events::LEASE_EXPIRED.starts_with("remote::"));
2843    }
2844
2845    // -----------------------------------------------------------------------
2846    // Lease tests (tmh.2.1)
2847    // -----------------------------------------------------------------------
2848
    /// Fixed `ObligationId` fixture built via the `new_for_test(0, 0)` constructor.
    fn test_obligation_id() -> ObligationId {
        ObligationId::new_for_test(0, 0)
    }
2852
    /// Fixed `RegionId` fixture built via the `new_for_test(0, 0)` constructor.
    fn test_region_id() -> RegionId {
        RegionId::new_for_test(0, 0)
    }
2856
    /// Fixed `TaskId` fixture built via the `new_for_test(0, 0)` constructor.
    fn test_task_id() -> TaskId {
        TaskId::new_for_test(0, 0)
    }
2860
2861    #[test]
2862    fn lease_creation() {
2863        let now = Time::from_secs(10);
2864        let lease = Lease::new(
2865            test_obligation_id(),
2866            test_region_id(),
2867            test_task_id(),
2868            Duration::from_secs(30),
2869            now,
2870        );
2871        assert!(lease.is_active(now));
2872        assert!(!lease.is_expired(now));
2873        assert!(!lease.is_released());
2874        assert_eq!(lease.renewal_count(), 0);
2875        assert_eq!(lease.initial_duration(), Duration::from_secs(30));
2876        assert_eq!(lease.expires_at(), Time::from_secs(40));
2877    }
2878
2879    #[test]
2880    fn lease_remaining_time() {
2881        let now = Time::from_secs(10);
2882        let lease = Lease::new(
2883            test_obligation_id(),
2884            test_region_id(),
2885            test_task_id(),
2886            Duration::from_secs(30),
2887            now,
2888        );
2889        let remaining = lease.remaining(Time::from_secs(20));
2890        assert_eq!(remaining, Duration::from_secs(20));
2891
2892        // At expiry: zero remaining
2893        let remaining = lease.remaining(Time::from_secs(40));
2894        assert_eq!(remaining, Duration::ZERO);
2895
2896        // Past expiry: zero remaining
2897        let remaining = lease.remaining(Time::from_secs(50));
2898        assert_eq!(remaining, Duration::ZERO);
2899    }
2900
2901    #[test]
2902    fn lease_expiry_detection() {
2903        let now = Time::from_secs(10);
2904        let lease = Lease::new(
2905            test_obligation_id(),
2906            test_region_id(),
2907            test_task_id(),
2908            Duration::from_secs(30),
2909            now,
2910        );
2911
2912        // Before expiry
2913        assert!(!lease.is_expired(Time::from_secs(39)));
2914        assert!(lease.is_active(Time::from_secs(39)));
2915
2916        // At expiry boundary
2917        assert!(lease.is_expired(Time::from_secs(40)));
2918        assert!(!lease.is_active(Time::from_secs(40)));
2919
2920        // After expiry
2921        assert!(lease.is_expired(Time::from_secs(50)));
2922    }
2923
2924    #[test]
2925    fn lease_renew_extends_expiry() {
2926        let now = Time::from_secs(10);
2927        let mut lease = Lease::new(
2928            test_obligation_id(),
2929            test_region_id(),
2930            test_task_id(),
2931            Duration::from_secs(30),
2932            now,
2933        );
2934
2935        // Renew at t=25 for another 30s
2936        let result = lease.renew(Duration::from_secs(30), Time::from_secs(25));
2937        assert!(result.is_ok());
2938        assert_eq!(lease.expires_at(), Time::from_secs(55));
2939        assert_eq!(lease.renewal_count(), 1);
2940
2941        // Renew again at t=50
2942        let result = lease.renew(Duration::from_secs(30), Time::from_secs(50));
2943        assert!(result.is_ok());
2944        assert_eq!(lease.expires_at(), Time::from_secs(80));
2945        assert_eq!(lease.renewal_count(), 2);
2946    }
2947
2948    #[test]
2949    fn lease_renew_after_expiry_fails() {
2950        let now = Time::from_secs(10);
2951        let mut lease = Lease::new(
2952            test_obligation_id(),
2953            test_region_id(),
2954            test_task_id(),
2955            Duration::from_secs(30),
2956            now,
2957        );
2958
2959        // Try to renew after expiry
2960        let result = lease.renew(Duration::from_secs(30), Time::from_secs(50));
2961        assert_eq!(result, Err(LeaseError::Expired));
2962        assert_eq!(lease.state(), LeaseState::Expired);
2963    }
2964
2965    #[test]
2966    fn lease_release() {
2967        let now = Time::from_secs(10);
2968        let mut lease = Lease::new(
2969            test_obligation_id(),
2970            test_region_id(),
2971            test_task_id(),
2972            Duration::from_secs(30),
2973            now,
2974        );
2975
2976        let result = lease.release(Time::from_secs(20));
2977        assert!(result.is_ok());
2978        assert!(lease.is_released());
2979        assert_eq!(lease.state(), LeaseState::Released);
2980    }
2981
2982    #[test]
2983    fn lease_double_release_fails() {
2984        let now = Time::from_secs(10);
2985        let mut lease = Lease::new(
2986            test_obligation_id(),
2987            test_region_id(),
2988            test_task_id(),
2989            Duration::from_secs(30),
2990            now,
2991        );
2992
2993        lease.release(Time::from_secs(20)).unwrap();
2994        let result = lease.release(Time::from_secs(25));
2995        assert_eq!(result, Err(LeaseError::Released));
2996    }
2997
2998    #[test]
2999    fn lease_renew_after_release_fails() {
3000        let now = Time::from_secs(10);
3001        let mut lease = Lease::new(
3002            test_obligation_id(),
3003            test_region_id(),
3004            test_task_id(),
3005            Duration::from_secs(30),
3006            now,
3007        );
3008
3009        lease.release(Time::from_secs(20)).unwrap();
3010        let result = lease.renew(Duration::from_secs(30), Time::from_secs(25));
3011        assert_eq!(result, Err(LeaseError::Released));
3012    }
3013
3014    #[test]
3015    fn lease_mark_expired() {
3016        let now = Time::from_secs(10);
3017        let mut lease = Lease::new(
3018            test_obligation_id(),
3019            test_region_id(),
3020            test_task_id(),
3021            Duration::from_secs(30),
3022            now,
3023        );
3024
3025        let result = lease.mark_expired();
3026        assert!(result.is_ok());
3027        assert_eq!(lease.state(), LeaseState::Expired);
3028
3029        // Idempotent
3030        let result = lease.mark_expired();
3031        assert!(result.is_ok());
3032    }
3033
3034    #[test]
3035    fn lease_mark_expired_after_release_fails() {
3036        let now = Time::from_secs(10);
3037        let mut lease = Lease::new(
3038            test_obligation_id(),
3039            test_region_id(),
3040            test_task_id(),
3041            Duration::from_secs(30),
3042            now,
3043        );
3044
3045        lease.release(Time::from_secs(20)).unwrap();
3046        let result = lease.mark_expired();
3047        assert_eq!(result, Err(LeaseError::Released));
3048    }
3049
3050    #[test]
3051    fn lease_state_display() {
3052        assert_eq!(format!("{}", LeaseState::Active), "Active");
3053        assert_eq!(format!("{}", LeaseState::Released), "Released");
3054        assert_eq!(format!("{}", LeaseState::Expired), "Expired");
3055    }
3056
3057    #[test]
3058    fn lease_error_display() {
3059        assert_eq!(format!("{}", LeaseError::Expired), "lease expired");
3060        assert_eq!(
3061            format!("{}", LeaseError::Released),
3062            "lease already released"
3063        );
3064        assert!(format!("{}", LeaseError::CreationFailed("full".into())).contains("full"));
3065    }
3066
3067    // -----------------------------------------------------------------------
3068    // Idempotency store tests (tmh.2.2)
3069    // -----------------------------------------------------------------------
3070
3071    #[test]
3072    fn idempotency_store_new_request() {
3073        let mut store = IdempotencyStore::new(Duration::from_mins(5));
3074        assert!(store.is_empty());
3075
3076        let key = IdempotencyKey::from_raw(1);
3077        let decision = store.check(&key, &ComputationName::new("encode"));
3078        assert!(matches!(decision, DedupDecision::New));
3079
3080        let inserted = store.record(
3081            key,
3082            RemoteTaskId::next(),
3083            ComputationName::new("encode"),
3084            Time::from_secs(10),
3085        );
3086        assert!(inserted);
3087        assert_eq!(store.len(), 1);
3088    }
3089
3090    #[test]
3091    fn idempotency_store_duplicate_detection() {
3092        let mut store = IdempotencyStore::new(Duration::from_mins(5));
3093        let key = IdempotencyKey::from_raw(42);
3094        let comp = ComputationName::new("encode");
3095
3096        store.record(key, RemoteTaskId::next(), comp.clone(), Time::from_secs(10));
3097
3098        // Same key, same computation → Duplicate
3099        let decision = store.check(&key, &comp);
3100        assert!(matches!(decision, DedupDecision::Duplicate(_)));
3101
3102        // Trying to record again returns false
3103        let inserted = store.record(key, RemoteTaskId::next(), comp, Time::from_secs(20));
3104        assert!(!inserted);
3105        assert_eq!(store.len(), 1);
3106    }
3107
3108    #[test]
3109    fn idempotency_store_conflict_detection() {
3110        let mut store = IdempotencyStore::new(Duration::from_mins(5));
3111        let key = IdempotencyKey::from_raw(42);
3112
3113        store.record(
3114            key,
3115            RemoteTaskId::next(),
3116            ComputationName::new("encode"),
3117            Time::from_secs(10),
3118        );
3119
3120        // Same key, DIFFERENT computation → Conflict
3121        let decision = store.check(&key, &ComputationName::new("decode"));
3122        assert!(matches!(decision, DedupDecision::Conflict));
3123    }
3124
3125    #[test]
3126    fn idempotency_store_complete_outcome() {
3127        let mut store = IdempotencyStore::new(Duration::from_mins(5));
3128        let key = IdempotencyKey::from_raw(99);
3129
3130        store.record(
3131            key,
3132            RemoteTaskId::next(),
3133            ComputationName::new("work"),
3134            Time::from_secs(10),
3135        );
3136
3137        // Complete with success
3138        let updated = store.complete(&key, RemoteOutcome::Success(vec![1, 2, 3]));
3139        assert!(updated);
3140
3141        // Check returns duplicate with outcome
3142        let decision = store.check(&key, &ComputationName::new("work"));
3143        assert!(matches!(decision, DedupDecision::Duplicate(_)));
3144        if let DedupDecision::Duplicate(record) = decision {
3145            assert!(record.outcome.is_some());
3146            assert!(record.outcome.unwrap().is_success());
3147        }
3148    }
3149
3150    #[test]
3151    fn idempotency_store_complete_unknown_key() {
3152        let mut store = IdempotencyStore::new(Duration::from_mins(5));
3153        let key = IdempotencyKey::from_raw(999);
3154
3155        // Complete on unknown key returns false
3156        let updated = store.complete(&key, RemoteOutcome::Failed("oops".into()));
3157        assert!(!updated);
3158    }
3159
3160    #[test]
3161    fn idempotency_store_eviction() {
3162        let mut store = IdempotencyStore::new(Duration::from_mins(1));
3163
3164        // Insert at t=10 (expires at t=70)
3165        store.record(
3166            IdempotencyKey::from_raw(1),
3167            RemoteTaskId::next(),
3168            ComputationName::new("a"),
3169            Time::from_secs(10),
3170        );
3171
3172        // Insert at t=50 (expires at t=110)
3173        store.record(
3174            IdempotencyKey::from_raw(2),
3175            RemoteTaskId::next(),
3176            ComputationName::new("b"),
3177            Time::from_secs(50),
3178        );
3179        assert_eq!(store.len(), 2);
3180
3181        // Evict at t=80: key 1 expired (70), key 2 still live (110)
3182        let evicted = store.evict_expired(Time::from_secs(80));
3183        assert_eq!(evicted, 1);
3184        assert_eq!(store.len(), 1);
3185
3186        // Key 2 is still there
3187        let decision = store.check(&IdempotencyKey::from_raw(2), &ComputationName::new("b"));
3188        assert!(matches!(decision, DedupDecision::Duplicate(_)));
3189
3190        // Key 1 is gone
3191        let decision = store.check(&IdempotencyKey::from_raw(1), &ComputationName::new("a"));
3192        assert!(matches!(decision, DedupDecision::New));
3193    }
3194
3195    #[test]
3196    fn idempotency_store_debug() {
3197        let store = IdempotencyStore::new(Duration::from_mins(1));
3198        let debug = format!("{store:?}");
3199        assert!(debug.contains("IdempotencyStore"));
3200        assert!(debug.contains("entries"));
3201    }
3202
3203    // -----------------------------------------------------------------------
3204    // Saga tests (tmh.2.3)
3205    // -----------------------------------------------------------------------
3206
3207    #[test]
3208    fn saga_successful_completion() {
3209        let mut saga = Saga::new();
3210        assert_eq!(saga.state(), SagaState::Running);
3211        assert_eq!(saga.completed_steps(), 0);
3212
3213        let r1: Result<String, _> = saga.step(
3214            "create resource",
3215            || Ok("resource-1".to_string()),
3216            || "deleted resource-1".to_string(),
3217        );
3218        assert!(r1.is_ok());
3219        assert_eq!(r1.unwrap(), "resource-1");
3220        assert_eq!(saga.completed_steps(), 1);
3221
3222        let r2: Result<(), _> = saga.step("configure", || Ok(()), || "reset config".to_string());
3223        assert!(r2.is_ok());
3224        assert_eq!(saga.completed_steps(), 2);
3225
3226        saga.complete();
3227        assert_eq!(saga.state(), SagaState::Completed);
3228        assert!(saga.compensation_results().is_empty());
3229    }
3230
3231    #[test]
3232    fn saga_step_failure_runs_compensations_reverse() {
3233        use std::sync::{Arc, Mutex};
3234
3235        let order = Arc::new(Mutex::new(Vec::new()));
3236
3237        let o1 = Arc::clone(&order);
3238        let mut saga = Saga::new();
3239
3240        saga.step(
3241            "step-0",
3242            || Ok(()),
3243            move || {
3244                o1.lock().unwrap().push(0);
3245                "comp-0".to_string()
3246            },
3247        )
3248        .unwrap();
3249
3250        let o2 = Arc::clone(&order);
3251        saga.step(
3252            "step-1",
3253            || Ok(()),
3254            move || {
3255                o2.lock().unwrap().push(1);
3256                "comp-1".to_string()
3257            },
3258        )
3259        .unwrap();
3260
3261        let o3 = Arc::clone(&order);
3262        // Step 2 fails
3263        let result: Result<(), SagaStepError> = saga.step(
3264            "step-2",
3265            || Err("boom".to_string()),
3266            move || {
3267                o3.lock().unwrap().push(2);
3268                "comp-2".to_string()
3269            },
3270        );
3271
3272        assert!(result.is_err());
3273        let err = result.unwrap_err();
3274        assert_eq!(err.step, 2);
3275        assert!(err.message.contains("boom"));
3276
3277        // Saga should be aborted
3278        assert_eq!(saga.state(), SagaState::Aborted);
3279
3280        // Compensations should have run in reverse: step-1, step-0
3281        // (step-2 never succeeded, so no compensation for it)
3282        let comps = saga.compensation_results();
3283        assert_eq!(comps.len(), 2);
3284        assert_eq!(comps[0].step, 1); // step-1 first (reverse order)
3285        assert_eq!(comps[1].step, 0); // step-0 second
3286
3287        // Verify execution order: 1 then 0 (reverse)
3288        let executed = order.lock().unwrap().clone();
3289        assert_eq!(executed, vec![1, 0]);
3290    }
3291
3292    #[test]
3293    fn saga_explicit_abort() {
3294        use std::sync::{Arc, Mutex};
3295
3296        let compensated = Arc::new(Mutex::new(Vec::new()));
3297        let mut saga = Saga::new();
3298
3299        let c1 = Arc::clone(&compensated);
3300        saga.step(
3301            "step-0",
3302            || Ok(()),
3303            move || {
3304                c1.lock().unwrap().push("step-0");
3305                "undid step-0".to_string()
3306            },
3307        )
3308        .unwrap();
3309
3310        let c2 = Arc::clone(&compensated);
3311        saga.step(
3312            "step-1",
3313            || Ok(()),
3314            move || {
3315                c2.lock().unwrap().push("step-1");
3316                "undid step-1".to_string()
3317            },
3318        )
3319        .unwrap();
3320
3321        // Explicitly abort (e.g., due to cancellation)
3322        saga.abort();
3323        assert_eq!(saga.state(), SagaState::Aborted);
3324
3325        let comps = saga.compensation_results();
3326        assert_eq!(comps.len(), 2);
3327        assert_eq!(comps[0].description, "step-1"); // reverse order
3328        assert_eq!(comps[1].description, "step-0");
3329
3330        let executed = compensated.lock().unwrap().clone();
3331        assert_eq!(executed, vec!["step-1", "step-0"]);
3332    }
3333
3334    #[test]
3335    fn saga_first_step_failure_no_compensations() {
3336        let mut saga = Saga::new();
3337
3338        // First step fails — nothing to compensate
3339        let result: Result<(), _> = saga.step("fail-step", || Err("bad".to_string()), String::new);
3340        assert!(result.is_err());
3341        assert_eq!(saga.state(), SagaState::Aborted);
3342        assert!(saga.compensation_results().is_empty());
3343    }
3344
3345    #[test]
3346    fn saga_state_display() {
3347        assert_eq!(format!("{}", SagaState::Running), "Running");
3348        assert_eq!(format!("{}", SagaState::Completed), "Completed");
3349        assert_eq!(format!("{}", SagaState::Compensating), "Compensating");
3350        assert_eq!(format!("{}", SagaState::Aborted), "Aborted");
3351    }
3352
3353    #[test]
3354    fn saga_step_error_display() {
3355        let err = SagaStepError {
3356            step: 3,
3357            description: "deploy".to_string(),
3358            message: "timeout".to_string(),
3359        };
3360        let text = format!("{err}");
3361        assert!(text.contains('3'));
3362        assert!(text.contains("deploy"));
3363        assert!(text.contains("timeout"));
3364    }
3365
3366    #[test]
3367    fn saga_debug() {
3368        let saga = Saga::new();
3369        let debug = format!("{saga:?}");
3370        assert!(debug.contains("Saga"));
3371        assert!(debug.contains("Running"));
3372    }
3373
3374    #[test]
3375    fn saga_default_trait() {
3376        let saga = Saga::default();
3377        assert_eq!(saga.state(), SagaState::Running);
3378    }
3379}