Skip to main content

meerkat_core/
ops_lifecycle.rs

1//! Canonical async-operation lifecycle seam for shared child/background work.
2
3use serde::{Deserialize, Serialize};
4
5#[cfg(target_arch = "wasm32")]
6use crate::tokio::sync::oneshot;
7#[cfg(not(target_arch = "wasm32"))]
8use tokio::sync::oneshot;
9
10use crate::comms::TrustedPeerSpec;
11use crate::lifecycle::{RunId, WaitRequestId};
12pub use crate::ops::{OperationId, OperationResult};
13use crate::types::SessionId;
14
/// Default cap on how many completed operations are kept before eviction starts.
pub const DEFAULT_MAX_COMPLETED: usize = 256;
17
/// The kind of async operation tracked by the shared lifecycle registry.
///
/// Serialized in `snake_case`: `"mob_member_child"` / `"background_tool_op"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationKind {
    /// Mob-member child operation; the only kind for which
    /// `expects_peer_channel()` returns `true`.
    MobMemberChild,
    /// Background tool operation; `expects_peer_channel()` returns `false`.
    BackgroundToolOp,
}
25
26impl OperationKind {
27    /// Whether this kind can expose a peer-ready handoff.
28    pub fn expects_peer_channel(self) -> bool {
29        matches!(self, Self::MobMemberChild)
30    }
31}
32
/// Lifecycle-relevant registration payload for an operation.
///
/// Passed by value to [`OpsLifecycleRegistry::register_operation`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OperationSpec {
    /// Identifier of the operation being registered.
    pub id: OperationId,
    /// Kind of work this operation represents.
    pub kind: OperationKind,
    /// Session that owns the operation.
    pub owner_session_id: SessionId,
    /// Human-readable name for the operation.
    pub display_name: String,
    /// Free-form label for the operation's origin.
    /// NOTE(review): format/meaning not defined here — confirm with producers.
    pub source_label: String,
    /// Child session backing the operation, if any.
    pub child_session_id: Option<SessionId>,
    /// Whether a peer-ready handoff is expected for this operation.
    ///
    /// NOTE(review): stored independently of `kind.expects_peer_channel()`;
    /// nothing in this type keeps the two in agreement — confirm callers
    /// derive it from `kind`.
    pub expect_peer_channel: bool,
}
44
/// Peer-facing connection handoff surfaced once an operation is ready.
///
/// Passed to [`OpsLifecycleRegistry::peer_ready`]; snapshots carry it in
/// `OperationLifecycleSnapshot::peer_handle`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OperationPeerHandle {
    /// Name of the peer being handed off.
    pub peer_name: String,
    /// Peer specification (`TrustedPeerSpec`) for the handed-off peer.
    pub trusted_peer: TrustedPeerSpec,
}
51
/// Progress update for a long-running async operation.
///
/// Passed to [`OpsLifecycleRegistry::report_progress`]. Derives only
/// `PartialEq` (not `Eq`) because `percent` is an `f32`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct OperationProgressUpdate {
    /// Human-readable progress description.
    pub message: String,
    /// Optional completion amount.
    ///
    /// NOTE(review): neither the scale (0–100 vs 0.0–1.0) nor the range is
    /// enforced by this type, and NaN is representable — confirm the
    /// convention with producers.
    pub percent: Option<f32>,
}
58
/// Terminal lifecycle outcome recorded for an operation.
///
/// Serialized with serde's internally tagged representation: the snake_case
/// variant name is written to an `outcome_type` field next to the variant's
/// own fields, e.g. `{"outcome_type": "failed", "error": "..."}` in JSON.
///
/// NOTE(review): with internal tagging, the `Completed` newtype payload must
/// itself serialize as a struct/map for serde to accept it — confirm that
/// holds for `OperationResult`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "outcome_type", rename_all = "snake_case")]
pub enum OperationTerminalOutcome {
    /// Finished successfully with a result.
    Completed(OperationResult),
    /// Finished with an error message.
    Failed { error: String },
    /// Aborted, with an optional reason.
    Aborted { reason: Option<String> },
    /// Cancelled, with an optional reason.
    Cancelled { reason: Option<String> },
    /// Retired; carries no payload.
    Retired,
    /// Terminated with a required reason. `OperationCompletionWatch::wait`
    /// also produces this when its sender is dropped without an outcome.
    Terminated { reason: String },
}
70
/// Current lifecycle status for an operation.
///
/// Serialized in `snake_case`; for these single-word variants that is the
/// lowercase variant name, the same text [`OperationStatus::as_str`] returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationStatus {
    /// Neither terminal nor accepted by `allows_terminalization()`.
    /// NOTE(review): presumably used for ids the registry does not track — confirm.
    Absent,
    /// Provisioning in progress; not yet running.
    Provisioning,
    /// Provisioning succeeded (the moment recorded as `started_at_ms` in snapshots).
    Running,
    /// Retirement in progress; not yet terminal.
    /// NOTE(review): presumably between `request_retire` and `mark_retired`.
    Retiring,
    // Every variant below is terminal (`is_terminal()` returns `true`).
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
    /// Aborted.
    Aborted,
    /// Cancelled.
    Cancelled,
    /// Retired.
    Retired,
    /// Terminated.
    Terminated,
}
86
87impl OperationStatus {
88    pub fn is_terminal(self) -> bool {
89        matches!(
90            self,
91            Self::Completed
92                | Self::Failed
93                | Self::Aborted
94                | Self::Cancelled
95                | Self::Retired
96                | Self::Terminated
97        )
98    }
99
100    pub fn allows_terminalization(self) -> bool {
101        matches!(self, Self::Provisioning | Self::Running | Self::Retiring)
102    }
103
104    /// Stable string representation for app-facing surfaces.
105    ///
106    /// Unlike `Debug` format, this is an explicit mapping that won't
107    /// produce uncontrolled strings when new variants are added.
108    pub fn as_str(self) -> &'static str {
109        match self {
110            Self::Absent => "absent",
111            Self::Provisioning => "provisioning",
112            Self::Running => "running",
113            Self::Retiring => "retiring",
114            Self::Completed => "completed",
115            Self::Failed => "failed",
116            Self::Aborted => "aborted",
117            Self::Cancelled => "cancelled",
118            Self::Retired => "retired",
119            Self::Terminated => "terminated",
120        }
121    }
122}
123
/// Public snapshot of one operation's lifecycle state.
///
/// Returned by [`OpsLifecycleRegistry::snapshot`] and
/// [`OpsLifecycleRegistry::list_operations`]. The optional peer/timing fields
/// are omitted from serialized output when `None` and default to `None` when
/// missing on input, so payloads that omit them still deserialize.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OperationLifecycleSnapshot {
    /// Identifier of the operation.
    pub id: OperationId,
    /// Operation kind.
    pub kind: OperationKind,
    /// Human-readable name of the operation.
    pub display_name: String,
    /// Lifecycle status at the time of the snapshot.
    pub status: OperationStatus,
    /// Whether a peer handoff has been published (see `peer_handle`).
    pub peer_ready: bool,
    /// Count of progress updates.
    /// NOTE(review): presumably accepted `report_progress` calls — confirm.
    pub progress_count: u32,
    /// Count of completion watchers.
    /// NOTE(review): presumably `register_watcher` calls; confirm whether
    /// resolved watchers are subtracted.
    pub watcher_count: u32,
    /// Terminal outcome, if one has been recorded.
    pub terminal_outcome: Option<OperationTerminalOutcome>,
    /// Child session backing the operation, if any.
    pub child_session_id: Option<SessionId>,
    /// Peer handle info (exposed when peer_ready is true).
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub peer_handle: Option<OperationPeerHandle>,
    /// Wall-clock epoch millis when the operation was registered.
    ///
    /// Always serialized; deserializes as `0` when the field is missing.
    #[serde(default)]
    pub created_at_ms: u64,
    /// Wall-clock epoch millis when provisioning succeeded (entered Running).
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub started_at_ms: Option<u64>,
    /// Wall-clock epoch millis when the operation reached terminal state.
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub completed_at_ms: Option<u64>,
    /// Monotonic elapsed millis from creation to terminal (computed from Instant).
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub elapsed_ms: Option<u64>,
}
152
153/// One watcher for a terminal lifecycle outcome.
154pub struct OperationCompletionWatch {
155    rx: oneshot::Receiver<OperationTerminalOutcome>,
156}
157
158impl OperationCompletionWatch {
159    /// Create a pending watch and its sender.
160    pub fn channel() -> (
161        oneshot::Sender<OperationTerminalOutcome>,
162        OperationCompletionWatch,
163    ) {
164        let (tx, rx) = oneshot::channel();
165        (tx, Self { rx })
166    }
167
168    /// Await the operation's terminal outcome.
169    pub async fn wait(self) -> OperationTerminalOutcome {
170        match self.rx.await {
171            Ok(outcome) => outcome,
172            Err(_) => OperationTerminalOutcome::Terminated {
173                reason: "operation completion watch dropped".into(),
174            },
175        }
176    }
177
178    /// Create a watch that is already resolved.
179    pub fn already_resolved(outcome: OperationTerminalOutcome) -> Self {
180        let (tx, rx) = oneshot::channel();
181        let _ = tx.send(outcome);
182        Self { rx }
183    }
184}
185
/// Errors returned by the shared lifecycle registry.
///
/// Derives `Clone + PartialEq + Eq`, so errors can be compared directly.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum OpsLifecycleError {
    /// An operation with this id is already registered.
    #[error("operation already registered: {0}")]
    AlreadyRegistered(OperationId),
    /// No operation with this id is known.
    #[error("operation not found: {0}")]
    NotFound(OperationId),
    /// `action` is not valid from the operation's current `status`.
    ///
    /// NOTE(review): the message renders `status` with `Debug` (e.g. `Running`),
    /// not the stable lowercase label from `OperationStatus::as_str()`.
    #[error("invalid lifecycle transition for {id}: {status:?} -> {action}")]
    InvalidTransition {
        id: OperationId,
        status: OperationStatus,
        action: &'static str,
    },
    /// A peer handoff was offered for an operation that does not expect one.
    #[error("operation does not expect a peer handoff: {0}")]
    PeerNotExpected(OperationId),
    /// A peer handoff was already published for this operation.
    #[error("operation is already peer-ready: {0}")]
    AlreadyPeerReady(OperationId),
    /// The concurrency `limit` would be exceeded; `active` is the current count.
    #[error("max concurrent operations exceeded (limit: {limit}, active: {active})")]
    MaxConcurrentExceeded { limit: usize, active: usize },
    /// The registry does not implement the named capability. The default
    /// bodies of `collect_completed` and `wait_all` return this.
    #[error("operation not supported: {0}")]
    Unsupported(String),
    /// A `wait_all` request is already in progress.
    #[error("wait_all already active")]
    WaitAlreadyActive,
    /// No active `wait_all` matches the given request id.
    #[error("wait_all not active for request: {0}")]
    WaitNotActive(WaitRequestId),
    /// The same operation id appeared more than once in a `wait_all` request.
    #[error("wait_all contains duplicate operation id: {0}")]
    DuplicateWaitOperation(OperationId),
    /// Unexpected internal failure inside the registry.
    #[error("internal lifecycle registry error: {0}")]
    Internal(String),
}
216
/// Authority-owned result of `wait_all()`.
///
/// Carries the per-operation outcomes alongside an authority-derived
/// obligation token (`satisfied`). The obligation proves the authority owned
/// the wait request lifecycle and emitted `WaitAllSatisfied` when the tracked
/// barrier set became terminal.
///
/// Returned by [`OpsLifecycleRegistry::wait_all`]. Neither this type nor its
/// token derives `Clone`, so the token cannot be duplicated by cloning.
///
/// NOTE(review): all fields here and on `WaitAllSatisfied` are `pub`, so a value
/// can also be built outside the registry; "authority-owned" is a convention,
/// not something the type system enforces.
#[derive(Debug)]
pub struct WaitAllResult {
    /// Per-operation terminal outcomes.
    pub outcomes: Vec<(OperationId, OperationTerminalOutcome)>,
    /// Authority-validated obligation token for the ops_barrier_satisfaction protocol.
    pub satisfied: WaitAllSatisfied,
}
230
/// Authority-owned obligation token emitted by the `WaitAllSatisfied` effect.
///
/// Created only by the `OpsLifecycleRegistry::wait_all()` implementation after
/// the authority resolves an outstanding wait request. Core-owned so it can be
/// consumed by the `protocol_ops_barrier_satisfaction` helper without crossing
/// crate boundaries.
///
/// NOTE(review): both fields are `pub` and there is no private field, so any
/// code can construct this value. "Created only by `wait_all()`" is a
/// convention, not a type-level guarantee. The type derives only `Debug`
/// (no `Clone`).
#[derive(Debug)]
pub struct WaitAllSatisfied {
    /// The authority-owned wait request that reached satisfaction.
    pub wait_request_id: WaitRequestId,
    /// The operation IDs validated as terminal by the authority.
    pub operation_ids: Vec<OperationId>,
}
244
245/// Shared async-operation lifecycle registry.
246pub trait OpsLifecycleRegistry: Send + Sync {
247    fn register_operation(&self, spec: OperationSpec) -> Result<(), OpsLifecycleError>;
248    fn provisioning_succeeded(&self, id: &OperationId) -> Result<(), OpsLifecycleError>;
249    fn provisioning_failed(&self, id: &OperationId, error: String)
250    -> Result<(), OpsLifecycleError>;
251    fn peer_ready(
252        &self,
253        id: &OperationId,
254        peer: OperationPeerHandle,
255    ) -> Result<(), OpsLifecycleError>;
256    fn register_watcher(
257        &self,
258        id: &OperationId,
259    ) -> Result<OperationCompletionWatch, OpsLifecycleError>;
260    fn report_progress(
261        &self,
262        id: &OperationId,
263        update: OperationProgressUpdate,
264    ) -> Result<(), OpsLifecycleError>;
265    fn complete_operation(
266        &self,
267        id: &OperationId,
268        result: OperationResult,
269    ) -> Result<(), OpsLifecycleError>;
270    fn fail_operation(&self, id: &OperationId, error: String) -> Result<(), OpsLifecycleError>;
271    fn abort_provisioning(
272        &self,
273        id: &OperationId,
274        reason: Option<String>,
275    ) -> Result<(), OpsLifecycleError>;
276    fn cancel_operation(
277        &self,
278        id: &OperationId,
279        reason: Option<String>,
280    ) -> Result<(), OpsLifecycleError>;
281    fn request_retire(&self, id: &OperationId) -> Result<(), OpsLifecycleError>;
282    fn mark_retired(&self, id: &OperationId) -> Result<(), OpsLifecycleError>;
283    fn snapshot(&self, id: &OperationId) -> Option<OperationLifecycleSnapshot>;
284    fn list_operations(&self) -> Vec<OperationLifecycleSnapshot>;
285    fn terminate_owner(&self, reason: String) -> Result<(), OpsLifecycleError>;
286
287    /// Drain all completed operations from the registry, returning their outcomes.
288    fn collect_completed(
289        &self,
290    ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
291        Err(OpsLifecycleError::Unsupported("collect_completed".into()))
292    }
293
294    /// Return the canonical completion feed, if this registry supports it.
295    ///
296    /// Runtime-backed registries return a feed handle that consumers (agent
297    /// boundary, idle wake) use for cursor-based completion delivery.
298    /// Returns `None` for registries that don't support the feed protocol.
299    fn completion_feed(
300        &self,
301    ) -> Option<std::sync::Arc<dyn crate::completion_feed::CompletionFeed>> {
302        None
303    }
304
305    /// Register an authority-owned barrier wait and await its completion.
306    ///
307    /// Returns a [`WaitAllResult`] containing per-operation outcomes and an
308    /// authority-owned obligation token. The runtime may host the async future,
309    /// but wait completion truth comes from the registry authority emitting
310    /// `WaitAllSatisfied`, not from shell watcher timing alone.
311    fn wait_all(
312        &self,
313        _run_id: &RunId,
314        _ids: &[OperationId],
315    ) -> std::pin::Pin<
316        Box<dyn std::future::Future<Output = Result<WaitAllResult, OpsLifecycleError>> + Send + '_>,
317    > {
318        Box::pin(std::future::ready(Err(OpsLifecycleError::Unsupported(
319            "wait_all".into(),
320        ))))
321    }
322}
323
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// Every `OperationStatus` variant paired with its expected `as_str` label.
    const STATUS_LABELS: [(OperationStatus, &str); 10] = [
        (OperationStatus::Absent, "absent"),
        (OperationStatus::Provisioning, "provisioning"),
        (OperationStatus::Running, "running"),
        (OperationStatus::Retiring, "retiring"),
        (OperationStatus::Completed, "completed"),
        (OperationStatus::Failed, "failed"),
        (OperationStatus::Aborted, "aborted"),
        (OperationStatus::Cancelled, "cancelled"),
        (OperationStatus::Retired, "retired"),
        (OperationStatus::Terminated, "terminated"),
    ];

    #[tokio::test]
    async fn already_resolved_watch_returns_terminal_outcome() {
        let watch = OperationCompletionWatch::already_resolved(OperationTerminalOutcome::Retired);
        assert_eq!(watch.wait().await, OperationTerminalOutcome::Retired);
    }

    #[tokio::test]
    async fn sent_outcome_reaches_watch() {
        let (tx, watch) = OperationCompletionWatch::channel();
        let outcome = OperationTerminalOutcome::Failed {
            error: "boom".into(),
        };
        tx.send(outcome.clone()).unwrap();
        assert_eq!(watch.wait().await, outcome);
    }

    #[tokio::test]
    async fn dropped_sender_resolves_watch_as_terminated() {
        let (tx, watch) = OperationCompletionWatch::channel();
        // Dropping the sender without sending must neither hang nor panic; the
        // watch reports a synthetic `Terminated` outcome instead.
        drop(tx);
        assert_eq!(
            watch.wait().await,
            OperationTerminalOutcome::Terminated {
                reason: "operation completion watch dropped".into(),
            }
        );
    }

    #[test]
    fn operation_kind_peer_expectation_matches_contract() {
        assert!(OperationKind::MobMemberChild.expects_peer_channel());
        assert!(!OperationKind::BackgroundToolOp.expects_peer_channel());
    }

    #[test]
    fn terminal_statuses_match_contract() {
        assert!(OperationStatus::Completed.is_terminal());
        assert!(OperationStatus::Failed.is_terminal());
        assert!(OperationStatus::Aborted.is_terminal());
        assert!(OperationStatus::Cancelled.is_terminal());
        assert!(OperationStatus::Retired.is_terminal());
        assert!(OperationStatus::Terminated.is_terminal());
        assert!(!OperationStatus::Running.is_terminal());
        assert!(OperationStatus::Running.allows_terminalization());
        assert!(OperationStatus::Retiring.allows_terminalization());
        assert!(!OperationStatus::Completed.allows_terminalization());
    }

    #[test]
    fn every_status_is_absent_live_or_terminal() {
        for (status, _) in STATUS_LABELS {
            // A status can never be both terminal and still terminalizable.
            assert!(
                !(status.is_terminal() && status.allows_terminalization()),
                "{status:?}"
            );
            // Every status except `Absent` falls into exactly one bucket.
            assert_eq!(
                status.is_terminal() || status.allows_terminalization(),
                status != OperationStatus::Absent,
                "{status:?}"
            );
        }
    }

    #[test]
    fn status_labels_are_stable() {
        for (status, label) in STATUS_LABELS {
            assert_eq!(status.as_str(), label);
        }
    }
}