awaken_runtime_contract/contract/live_control.rs
1//! Ephemeral live-control commands consumed by the runtime.
2//!
3//! This is intentionally smaller than the server mailbox contract: runtime
4//! only needs to subscribe to commands for the active run and acknowledge
5//! accepted deliveries. Durable queueing, leases, retries, and dispatch state
6//! live in the server mailbox contract.
7
8use std::pin::Pin;
9
10use async_trait::async_trait;
11use futures::Stream;
12use serde::{Deserialize, Serialize};
13use thiserror::Error;
14
15use super::message::Message;
16use super::suspension::ToolCallResume;
17
18#[derive(Debug, Error, Clone, PartialEq, Eq)]
19pub enum LiveControlError {
20 #[error("live control subscribe failed: {0}")]
21 Subscribe(String),
22}
23
24// ── LiveRunCommand ─────────────────────────────────────────────────────────
25
26/// Control command delivered to an active run's owning node (out-of-band
27/// relative to durable dispatch). Consumed by the runtime forwarder attached
28/// to each `RunHandle`; unsubscribed targets silently drop commands (best
29/// effort — steering is ephemeral by design).
30#[derive(Debug, Clone, Serialize, Deserialize)]
31#[non_exhaustive]
32pub enum LiveRunCommand {
33 /// Inject messages into the running agent's next step boundary inbox.
34 Messages(Vec<Message>),
35 /// Wake the owner run to consume already-staged pending messages.
36 PendingBoundaryWake,
37 /// Cooperatively cancel the run (`immediate` cancellation token).
38 Cancel,
39 /// Deliver tool-call resume decisions to the run.
40 Decision(Vec<(String, ToolCallResume)>),
41}
42
43/// Exact live-run target for cross-node ephemeral control.
44///
45/// Thread-only routing is intentionally insufficient for distributed
46/// backends: a stale subscriber for the same thread must not be able to ack a
47/// command intended for a newer run/dispatch.
48#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
49pub struct LiveRunTarget {
50 pub thread_id: String,
51 pub run_id: String,
52 #[serde(default, skip_serializing_if = "Option::is_none")]
53 pub dispatch_id: Option<String>,
54}
55
56impl LiveRunTarget {
57 #[must_use]
58 pub fn new(thread_id: impl Into<String>, run_id: impl Into<String>) -> Self {
59 Self {
60 thread_id: thread_id.into(),
61 run_id: run_id.into(),
62 dispatch_id: None,
63 }
64 }
65
66 #[must_use]
67 pub fn with_dispatch_id(mut self, dispatch_id: impl Into<String>) -> Self {
68 self.dispatch_id = Some(dispatch_id.into());
69 self
70 }
71}
72
/// Delivery receipt that accompanies a `LiveRunCommand`.
///
/// The consumer calls [`LiveCommandReceipt::ack`] **only once the run has
/// really accepted the command** (for example, the inbox channel reported
/// success). Dropping the receipt instead tells the producer that delivery
/// did not complete; the producer's `deliver_live` then resolves as
/// [`LiveDeliveryOutcome::NoSubscriber`], letting the caller fall back to
/// durable dispatch. Producers MUST NOT observe `Delivered` before the
/// receipt has been acknowledged.
pub trait LiveCommandReceipt: Send + Sync {
    /// Confirms that the command reached the live consumer.
    ///
    /// Takes the boxed receipt by value, so it can be acknowledged at most
    /// once. A handle dropped without `ack` is treated by the producer as
    /// non-delivery.
    fn ack(self: Box<Self>);
}
87
88/// Entry yielded by a [`LiveRunCommandStream`]: the command plus the
89/// receipt the consumer must `ack` once the run has received it.
90pub struct LiveRunCommandEntry {
91 pub command: LiveRunCommand,
92 pub receipt: Box<dyn LiveCommandReceipt>,
93}
94
95impl std::fmt::Debug for LiveRunCommandEntry {
96 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97 f.debug_struct("LiveRunCommandEntry")
98 .field("command", &self.command)
99 .finish_non_exhaustive()
100 }
101}
102
103/// Stream of [`LiveRunCommandEntry`] consumed by the owning node's runtime
104/// forwarder.
105pub type LiveRunCommandStream = Pin<Box<dyn Stream<Item = LiveRunCommandEntry> + Send>>;
106
/// Result of a live-control delivery attempt, used by the caller to decide
/// whether the durable queue is needed as a fallback.
///
/// `NoSubscriber` means *no node acknowledged the command*: it was either
/// lost in transit or the run failed to accept it. Callers must read
/// `NoSubscriber` as "did not deliver" and fall back to durable dispatch.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LiveDeliveryOutcome {
    /// The owning run took the command: the forwarder acked after passing it
    /// to the in-process channel.
    Delivered,
    /// There was no subscriber, or the subscriber did not accept within the
    /// producer's timeout. The caller should fall back to durable dispatch.
    NoSubscriber,
}
121
122#[async_trait]
123pub trait LiveRunCommandSource: Send + Sync {
124 async fn open_live_channel_for(
125 &self,
126 target: &LiveRunTarget,
127 ) -> Result<LiveRunCommandStream, LiveControlError>;
128}