Skip to main content

awaken_runtime_contract/contract/
live_control.rs

//! Ephemeral live-control commands consumed by the runtime.
//!
//! This is intentionally smaller than the server mailbox contract: the runtime
//! only needs to subscribe to commands for the active run and acknowledge
//! accepted deliveries. Durable queueing, leases, retries, and dispatch state
//! live in the server mailbox contract.
7
8use std::pin::Pin;
9
10use async_trait::async_trait;
11use futures::Stream;
12use serde::{Deserialize, Serialize};
13use thiserror::Error;
14
15use super::message::Message;
16use super::suspension::ToolCallResume;
17
18#[derive(Debug, Error, Clone, PartialEq, Eq)]
19pub enum LiveControlError {
20    #[error("live control subscribe failed: {0}")]
21    Subscribe(String),
22}
23
24// ── LiveRunCommand ─────────────────────────────────────────────────────────
25
26/// Control command delivered to an active run's owning node (out-of-band
27/// relative to durable dispatch). Consumed by the runtime forwarder attached
28/// to each `RunHandle`; unsubscribed targets silently drop commands (best
29/// effort — steering is ephemeral by design).
30#[derive(Debug, Clone, Serialize, Deserialize)]
31#[non_exhaustive]
32pub enum LiveRunCommand {
33    /// Inject messages into the running agent's next step boundary inbox.
34    Messages(Vec<Message>),
35    /// Wake the owner run to consume already-staged pending messages.
36    PendingBoundaryWake,
37    /// Cooperatively cancel the run (`immediate` cancellation token).
38    Cancel,
39    /// Deliver tool-call resume decisions to the run.
40    Decision(Vec<(String, ToolCallResume)>),
41}
42
43/// Exact live-run target for cross-node ephemeral control.
44///
45/// Thread-only routing is intentionally insufficient for distributed
46/// backends: a stale subscriber for the same thread must not be able to ack a
47/// command intended for a newer run/dispatch.
48#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
49pub struct LiveRunTarget {
50    pub thread_id: String,
51    pub run_id: String,
52    #[serde(default, skip_serializing_if = "Option::is_none")]
53    pub dispatch_id: Option<String>,
54}
55
56impl LiveRunTarget {
57    #[must_use]
58    pub fn new(thread_id: impl Into<String>, run_id: impl Into<String>) -> Self {
59        Self {
60            thread_id: thread_id.into(),
61            run_id: run_id.into(),
62            dispatch_id: None,
63        }
64    }
65
66    #[must_use]
67    pub fn with_dispatch_id(mut self, dispatch_id: impl Into<String>) -> Self {
68        self.dispatch_id = Some(dispatch_id.into());
69        self
70    }
71}
72
/// Completion receipt for a delivered `LiveRunCommand`.
///
/// Consumers call [`LiveCommandReceipt::ack`] **only after the run has
/// actually accepted the command** (e.g. the inbox channel returned success).
/// A dropped receipt signals the producer that delivery did not complete, and
/// the producer's `deliver_live` resolves as
/// [`LiveDeliveryOutcome::NoSubscriber`] so the caller can fall back to
/// durable dispatch. Producers MUST NOT observe `Delivered` until the receipt
/// has been acknowledged.
///
/// The `Send + Sync` supertraits let a boxed receipt travel with its command
/// across threads.
pub trait LiveCommandReceipt: Send + Sync {
    /// Confirm the command was handed to the live consumer.
    ///
    /// Takes `Box<Self>` so the receipt is consumed and can be acknowledged
    /// at most once. Dropping the handle without calling `ack` is treated as
    /// non-delivery by the producer.
    fn ack(self: Box<Self>);
}
87
88/// Entry yielded by a [`LiveRunCommandStream`]: the command plus the
89/// receipt the consumer must `ack` once the run has received it.
90pub struct LiveRunCommandEntry {
91    pub command: LiveRunCommand,
92    pub receipt: Box<dyn LiveCommandReceipt>,
93}
94
95impl std::fmt::Debug for LiveRunCommandEntry {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        f.debug_struct("LiveRunCommandEntry")
98            .field("command", &self.command)
99            .finish_non_exhaustive()
100    }
101}
102
103/// Stream of [`LiveRunCommandEntry`] consumed by the owning node's runtime
104/// forwarder.
105pub type LiveRunCommandStream = Pin<Box<dyn Stream<Item = LiveRunCommandEntry> + Send>>;
106
/// Outcome of a live-control delivery call — lets the caller decide whether
/// to fall back to the durable queue.
///
/// `NoSubscriber` means *no node acknowledged the command*: the command was
/// either lost in transit or the run failed to accept it. Callers must treat
/// `NoSubscriber` as "did not deliver" and fall back to durable dispatch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LiveDeliveryOutcome {
    /// The owning run accepted the command (the forwarder acked after handing
    /// the command to the in-process channel).
    Delivered,
    /// No subscriber, or the subscriber failed to accept within the
    /// producer's timeout. The caller should fall back to durable dispatch.
    NoSubscriber,
}
121
122#[async_trait]
123pub trait LiveRunCommandSource: Send + Sync {
124    async fn open_live_channel_for(
125        &self,
126        target: &LiveRunTarget,
127    ) -> Result<LiveRunCommandStream, LiveControlError>;
128}