Skip to main content

meerkat_core/
ops_lifecycle.rs

1//! Canonical async-operation lifecycle seam for shared child/background work.
2
3use serde::{Deserialize, Serialize};
4
5#[cfg(target_arch = "wasm32")]
6use crate::tokio::sync::oneshot;
7#[cfg(not(target_arch = "wasm32"))]
8use tokio::sync::oneshot;
9
10use crate::comms::TrustedPeerDescriptor;
11use crate::lifecycle::{RunId, WaitRequestId};
12pub use crate::ops::{OperationId, OperationResult};
13use crate::types::SessionId;
14
/// Default cap on the number of completed operations kept around before
/// eviction starts.
pub const DEFAULT_MAX_COMPLETED: usize = 256;
17
18/// The kind of async operation tracked by the shared lifecycle registry.
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
20#[serde(rename_all = "snake_case")]
21pub enum OperationKind {
22    MobMemberChild,
23    BackgroundToolOp,
24}
25
26impl OperationKind {
27    /// Whether this kind can expose a peer-ready handoff.
28    pub fn expects_peer_channel(self) -> bool {
29        matches!(self, Self::MobMemberChild)
30    }
31}
32
33/// Lifecycle-relevant registration payload for an operation.
34#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
35pub struct OperationSpec {
36    pub id: OperationId,
37    pub kind: OperationKind,
38    /// Compatibility owner binding field.
39    ///
40    /// Under the identity-first Mob regime this carries the canonical bridge
41    /// session binding, even though the stored field name still says
42    /// `owner_session_id`.
43    pub owner_session_id: SessionId,
44    pub display_name: String,
45    pub source_label: String,
46    pub child_session_id: Option<SessionId>,
47    pub expect_peer_channel: bool,
48}
49
50impl OperationSpec {
51    /// Canonical owner bridge binding for this operation.
52    pub fn owner_bridge_session_id(&self) -> &SessionId {
53        &self.owner_session_id
54    }
55
56    /// Compatibility alias retained while older surfaces still speak in
57    /// session-centric terms.
58    pub fn owner_session_id(&self) -> &SessionId {
59        &self.owner_session_id
60    }
61}
62
/// Peer-facing connection handoff surfaced once an operation is ready.
///
/// Passed to `OpsLifecycleRegistry::peer_ready` and echoed back in
/// `OperationLifecycleSnapshot::peer_handle`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OperationPeerHandle {
    /// Name under which the peer can be addressed.
    pub peer_name: crate::comms::PeerName,
    /// Trust descriptor for the peer connection.
    pub trusted_peer: TrustedPeerDescriptor,
}
69
/// Progress update for a long-running async operation.
///
/// Only `PartialEq` (not `Eq`) is derived because `percent` is an `f32`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct OperationProgressUpdate {
    /// Free-form progress message.
    pub message: String,
    /// Optional completion percentage.
    ///
    /// NOTE(review): presumably 0.0–100.0, but no range check happens here.
    pub percent: Option<f32>,
}
76
/// Terminal lifecycle outcome recorded for an operation.
///
/// Serialized as an internally tagged enum: the snake_case variant name is
/// stored under the `outcome_type` key alongside the variant's fields.
///
/// NOTE(review): serde only supports internally tagged newtype variants whose
/// payload serializes as a map/struct, so `Completed(OperationResult)` relies
/// on `OperationResult` being struct-like; confirm in `crate::ops`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "outcome_type", rename_all = "snake_case")]
pub enum OperationTerminalOutcome {
    /// The operation finished with a result.
    Completed(OperationResult),
    /// The operation failed with the given error message.
    Failed { error: String },
    /// The operation was aborted, with an optional reason.
    Aborted { reason: Option<String> },
    /// The operation was cancelled, with an optional reason.
    Cancelled { reason: Option<String> },
    /// The operation was retired.
    Retired,
    /// The operation was terminated with the given reason.
    ///
    /// `OperationCompletionWatch::wait` also yields this variant when its
    /// channel closes without an outcome being sent.
    Terminated { reason: String },
}
88
89/// Current lifecycle status for an operation.
90#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
91#[serde(rename_all = "snake_case")]
92pub enum OperationStatus {
93    Absent,
94    Provisioning,
95    Running,
96    Retiring,
97    Completed,
98    Failed,
99    Aborted,
100    Cancelled,
101    Retired,
102    Terminated,
103}
104
105impl OperationStatus {
106    pub fn is_terminal(self) -> bool {
107        matches!(
108            self,
109            Self::Completed
110                | Self::Failed
111                | Self::Aborted
112                | Self::Cancelled
113                | Self::Retired
114                | Self::Terminated
115        )
116    }
117
118    pub fn allows_terminalization(self) -> bool {
119        matches!(self, Self::Provisioning | Self::Running | Self::Retiring)
120    }
121
122    /// Stable string representation for app-facing surfaces.
123    ///
124    /// Unlike `Debug` format, this is an explicit mapping that won't
125    /// produce uncontrolled strings when new variants are added.
126    pub fn as_str(self) -> &'static str {
127        match self {
128            Self::Absent => "absent",
129            Self::Provisioning => "provisioning",
130            Self::Running => "running",
131            Self::Retiring => "retiring",
132            Self::Completed => "completed",
133            Self::Failed => "failed",
134            Self::Aborted => "aborted",
135            Self::Cancelled => "cancelled",
136            Self::Retired => "retired",
137            Self::Terminated => "terminated",
138        }
139    }
140}
141
/// Public snapshot of one operation's lifecycle state.
///
/// Returned by `OpsLifecycleRegistry::snapshot` and
/// `OpsLifecycleRegistry::list_operations`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OperationLifecycleSnapshot {
    /// Identifier of the operation.
    pub id: OperationId,
    /// Kind of the operation.
    pub kind: OperationKind,
    /// Human-readable name of the operation.
    pub display_name: String,
    /// Lifecycle status at the time of the snapshot.
    pub status: OperationStatus,
    /// Whether a peer handoff has been published.
    pub peer_ready: bool,
    /// NOTE(review): presumably the number of progress updates accepted so
    /// far; confirm against registry implementations.
    pub progress_count: u32,
    /// NOTE(review): presumably the number of registered completion watchers;
    /// confirm against registry implementations.
    pub watcher_count: u32,
    /// Terminal outcome, if one has been recorded.
    pub terminal_outcome: Option<OperationTerminalOutcome>,
    /// Session id of the child, when the operation has one.
    pub child_session_id: Option<SessionId>,
    /// Peer handle info (exposed when peer_ready is true).
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub peer_handle: Option<OperationPeerHandle>,
    /// Wall-clock epoch millis when the operation was registered.
    ///
    /// Deserializes to `0` when the field is missing (`#[serde(default)]`).
    #[serde(default)]
    pub created_at_ms: u64,
    /// Wall-clock epoch millis when provisioning succeeded (entered Running).
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub started_at_ms: Option<u64>,
    /// Wall-clock epoch millis when the operation reached terminal state.
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub completed_at_ms: Option<u64>,
    /// Monotonic elapsed millis from creation to terminal (computed from Instant).
    #[serde(skip_serializing_if = "Option::is_none", default)]
    pub elapsed_ms: Option<u64>,
}
170
171/// One watcher for a terminal lifecycle outcome.
172pub struct OperationCompletionWatch {
173    rx: oneshot::Receiver<OperationTerminalOutcome>,
174}
175
176impl OperationCompletionWatch {
177    /// Create a pending watch and its sender.
178    pub fn channel() -> (
179        oneshot::Sender<OperationTerminalOutcome>,
180        OperationCompletionWatch,
181    ) {
182        let (tx, rx) = oneshot::channel();
183        (tx, Self { rx })
184    }
185
186    /// Await the operation's terminal outcome.
187    pub async fn wait(self) -> OperationTerminalOutcome {
188        match self.rx.await {
189            Ok(outcome) => outcome,
190            Err(_) => OperationTerminalOutcome::Terminated {
191                reason: "operation completion watch dropped".into(),
192            },
193        }
194    }
195
196    /// Create a watch that is already resolved.
197    pub fn already_resolved(outcome: OperationTerminalOutcome) -> Self {
198        let (tx, rx) = oneshot::channel();
199        let _ = tx.send(outcome);
200        Self { rx }
201    }
202}
203
/// Errors returned by the shared lifecycle registry.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum OpsLifecycleError {
    /// An operation with this id is already registered.
    #[error("operation already registered: {0}")]
    AlreadyRegistered(OperationId),
    /// No operation with this id is known to the registry.
    #[error("operation not found: {0}")]
    NotFound(OperationId),
    /// The requested `action` is not valid from the operation's current
    /// `status`.
    ///
    /// NOTE(review): the message renders `status` via `Debug` (e.g. `Running`)
    /// rather than `OperationStatus::as_str` (e.g. `running`).
    #[error("invalid lifecycle transition for {id}: {status:?} -> {action}")]
    InvalidTransition {
        id: OperationId,
        status: OperationStatus,
        action: &'static str,
    },
    /// A peer handoff was offered for an operation that does not expect one.
    #[error("operation does not expect a peer handoff: {0}")]
    PeerNotExpected(OperationId),
    /// A peer handoff was offered for an operation that is already peer-ready.
    #[error("operation is already peer-ready: {0}")]
    AlreadyPeerReady(OperationId),
    /// The concurrency `limit` was reached; `active` is the active count that
    /// was observed.
    #[error("max concurrent operations exceeded (limit: {limit}, active: {active})")]
    MaxConcurrentExceeded { limit: usize, active: usize },
    /// The registry does not implement the named capability. The default
    /// `OpsLifecycleRegistry` methods return this with the method name.
    #[error("operation not supported: {0}")]
    Unsupported(String),
    /// A `wait_all` barrier is already outstanding.
    #[error("wait_all already active")]
    WaitAlreadyActive,
    /// No active `wait_all` matches the given request id.
    #[error("wait_all not active for request: {0}")]
    WaitNotActive(WaitRequestId),
    /// `wait_all` was given the same operation id more than once.
    #[error("wait_all contains duplicate operation id: {0}")]
    DuplicateWaitOperation(OperationId),
    /// Unexpected internal failure inside the registry implementation.
    #[error("internal lifecycle registry error: {0}")]
    Internal(String),
}
234
/// Authority-owned result of `wait_all()`.
///
/// Carries the per-operation outcomes alongside an authority-derived
/// obligation token (`satisfied`). The obligation proves the authority owned
/// the wait request lifecycle and emitted `WaitAllSatisfied` when the tracked
/// barrier set became terminal.
#[derive(Debug)]
pub struct WaitAllResult {
    /// Per-operation terminal outcomes, paired with the operation they
    /// belong to.
    pub outcomes: Vec<(OperationId, OperationTerminalOutcome)>,
    /// Authority-validated obligation token for the ops_barrier_satisfaction protocol.
    pub satisfied: WaitAllSatisfied,
}
248
/// Authority-owned obligation token emitted by the `WaitAllSatisfied` effect.
///
/// Created only by the `OpsLifecycleRegistry::wait_all()` implementation after
/// the authority resolves an outstanding wait request. Core-owned so it can be
/// consumed by the `protocol_ops_barrier_satisfaction` helper without crossing
/// crate boundaries.
///
/// NOTE(review): "created only by `wait_all()`" is a convention. Both fields
/// are `pub`, so the type itself does not prevent construction elsewhere.
#[derive(Debug)]
pub struct WaitAllSatisfied {
    /// The authority-owned wait request that reached satisfaction.
    pub wait_request_id: WaitRequestId,
    /// The operation IDs validated as terminal by the authority.
    pub operation_ids: Vec<OperationId>,
}
262
/// Shared async-operation lifecycle registry.
///
/// Implementations record operations registered via `register_operation`,
/// expose their state as [`OperationLifecycleSnapshot`]s, and hand out
/// [`OperationCompletionWatch`]es for terminal outcomes. Failures are reported
/// as [`OpsLifecycleError`]. `collect_completed`, `completion_feed` and
/// `wait_all` have default implementations that report the capability as
/// unavailable.
pub trait OpsLifecycleRegistry: Send + Sync {
    /// Register a new operation described by `spec`.
    ///
    /// NOTE(review): presumably fails with `AlreadyRegistered` when `spec.id`
    /// is already known; confirm per implementation.
    fn register_operation(&self, spec: OperationSpec) -> Result<(), OpsLifecycleError>;
    /// Record that provisioning succeeded for `id` (the point at which the
    /// snapshot's `started_at_ms` is documented as "entered Running").
    fn provisioning_succeeded(&self, id: &OperationId) -> Result<(), OpsLifecycleError>;
    /// Record that provisioning failed for `id` with `error`.
    fn provisioning_failed(&self, id: &OperationId, error: String)
    -> Result<(), OpsLifecycleError>;
    /// Publish the peer handoff for `id`.
    ///
    /// NOTE(review): the `PeerNotExpected` / `AlreadyPeerReady` error variants
    /// suggest the expected rejections; confirm per implementation.
    fn peer_ready(
        &self,
        id: &OperationId,
        peer: OperationPeerHandle,
    ) -> Result<(), OpsLifecycleError>;
    /// Obtain a watch that resolves with the terminal outcome of `id`.
    fn register_watcher(
        &self,
        id: &OperationId,
    ) -> Result<OperationCompletionWatch, OpsLifecycleError>;
    /// Report a progress update for `id`.
    fn report_progress(
        &self,
        id: &OperationId,
        update: OperationProgressUpdate,
    ) -> Result<(), OpsLifecycleError>;
    /// Record successful completion of `id` with `result`.
    fn complete_operation(
        &self,
        id: &OperationId,
        result: OperationResult,
    ) -> Result<(), OpsLifecycleError>;
    /// Record that `id` failed with `error`.
    fn fail_operation(&self, id: &OperationId, error: String) -> Result<(), OpsLifecycleError>;
    /// Abort provisioning of `id`, with an optional reason.
    fn abort_provisioning(
        &self,
        id: &OperationId,
        reason: Option<String>,
    ) -> Result<(), OpsLifecycleError>;
    /// Cancel `id`, with an optional reason.
    fn cancel_operation(
        &self,
        id: &OperationId,
        reason: Option<String>,
    ) -> Result<(), OpsLifecycleError>;
    /// Ask `id` to retire.
    ///
    /// NOTE(review): presumably moves the operation to `Retiring`; confirm.
    fn request_retire(&self, id: &OperationId) -> Result<(), OpsLifecycleError>;
    /// Record that `id` has finished retiring.
    fn mark_retired(&self, id: &OperationId) -> Result<(), OpsLifecycleError>;
    /// Snapshot of `id`; `None` presumably means the registry has no record
    /// of it.
    fn snapshot(&self, id: &OperationId) -> Option<OperationLifecycleSnapshot>;
    /// Snapshots of all operations the registry currently reports.
    fn list_operations(&self) -> Vec<OperationLifecycleSnapshot>;
    /// Terminate the owner of this registry with `reason`.
    ///
    /// NOTE(review): presumably terminalizes every still-active operation
    /// (e.g. as `Terminated { reason }`); confirm per implementation.
    fn terminate_owner(&self, reason: String) -> Result<(), OpsLifecycleError>;

    /// Drain all completed operations from the registry, returning their outcomes.
    ///
    /// The default implementation returns
    /// `OpsLifecycleError::Unsupported("collect_completed")`.
    fn collect_completed(
        &self,
    ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
        Err(OpsLifecycleError::Unsupported("collect_completed".into()))
    }

    /// Return the canonical completion feed, if this registry supports it.
    ///
    /// Runtime-backed registries return a feed handle that consumers (agent
    /// boundary, idle wake) use for cursor-based completion delivery.
    /// Returns `None` for registries that don't support the feed protocol.
    fn completion_feed(
        &self,
    ) -> Option<std::sync::Arc<dyn crate::completion_feed::CompletionFeed>> {
        None
    }

    /// Register an authority-owned barrier wait and await its completion.
    ///
    /// Returns a [`WaitAllResult`] containing per-operation outcomes and an
    /// authority-owned obligation token. The runtime may host the async future,
    /// but wait completion truth comes from the registry authority emitting
    /// `WaitAllSatisfied`, not from shell watcher timing alone.
    ///
    /// The default implementation returns an already-ready future yielding
    /// `OpsLifecycleError::Unsupported("wait_all")`.
    fn wait_all(
        &self,
        _run_id: &RunId,
        _ids: &[OperationId],
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<WaitAllResult, OpsLifecycleError>> + Send + '_>,
    > {
        Box::pin(std::future::ready(Err(OpsLifecycleError::Unsupported(
            "wait_all".into(),
        ))))
    }
}
341
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// Every `OperationStatus` variant, in declaration order.
    const ALL_STATUSES: [OperationStatus; 10] = [
        OperationStatus::Absent,
        OperationStatus::Provisioning,
        OperationStatus::Running,
        OperationStatus::Retiring,
        OperationStatus::Completed,
        OperationStatus::Failed,
        OperationStatus::Aborted,
        OperationStatus::Cancelled,
        OperationStatus::Retired,
        OperationStatus::Terminated,
    ];

    #[tokio::test]
    async fn already_resolved_watch_returns_terminal_outcome() {
        let watch = OperationCompletionWatch::already_resolved(OperationTerminalOutcome::Retired);
        assert_eq!(watch.wait().await, OperationTerminalOutcome::Retired);
    }

    #[tokio::test]
    async fn channel_watch_resolves_with_sent_outcome() {
        let (tx, watch) = OperationCompletionWatch::channel();
        tx.send(OperationTerminalOutcome::Failed {
            error: "boom".into(),
        })
        .unwrap();
        assert_eq!(
            watch.wait().await,
            OperationTerminalOutcome::Failed {
                error: "boom".into(),
            }
        );
    }

    #[tokio::test]
    async fn dropped_sender_resolves_watch_as_terminated() {
        let (tx, watch) = OperationCompletionWatch::channel();
        drop(tx);
        assert_eq!(
            watch.wait().await,
            OperationTerminalOutcome::Terminated {
                reason: "operation completion watch dropped".into(),
            }
        );
    }

    #[test]
    fn operation_kind_peer_expectation_matches_contract() {
        assert!(OperationKind::MobMemberChild.expects_peer_channel());
        assert!(!OperationKind::BackgroundToolOp.expects_peer_channel());
    }

    #[test]
    fn terminal_statuses_match_contract() {
        assert!(OperationStatus::Completed.is_terminal());
        assert!(OperationStatus::Failed.is_terminal());
        assert!(OperationStatus::Aborted.is_terminal());
        assert!(OperationStatus::Cancelled.is_terminal());
        assert!(OperationStatus::Retired.is_terminal());
        assert!(OperationStatus::Terminated.is_terminal());
        assert!(!OperationStatus::Absent.is_terminal());
        assert!(!OperationStatus::Provisioning.is_terminal());
        assert!(!OperationStatus::Running.is_terminal());
        assert!(!OperationStatus::Retiring.is_terminal());

        assert!(OperationStatus::Provisioning.allows_terminalization());
        assert!(OperationStatus::Running.allows_terminalization());
        assert!(OperationStatus::Retiring.allows_terminalization());
        assert!(!OperationStatus::Absent.allows_terminalization());
        assert!(!OperationStatus::Completed.allows_terminalization());
    }

    #[test]
    fn terminal_and_terminalizable_statuses_are_disjoint() {
        for status in ALL_STATUSES {
            if status.is_terminal() {
                assert!(
                    !status.allows_terminalization(),
                    "{status:?} is terminal yet allows terminalization"
                );
            }
        }
        // `Absent` is the only status that is in neither group.
        let neither: Vec<OperationStatus> = ALL_STATUSES
            .into_iter()
            .filter(|s| !s.is_terminal() && !s.allows_terminalization())
            .collect();
        assert_eq!(neither, vec![OperationStatus::Absent]);
    }

    #[test]
    fn status_strings_are_unique_snake_case() {
        let mut seen = std::collections::HashSet::new();
        for status in ALL_STATUSES {
            let text = status.as_str();
            assert!(seen.insert(text), "duplicate as_str value: {text}");
            assert!(
                !text.is_empty() && text.chars().all(|c| c.is_ascii_lowercase() || c == '_'),
                "as_str value is not snake_case: {text}"
            );
        }
        assert_eq!(OperationStatus::Running.as_str(), "running");
        assert_eq!(OperationStatus::Terminated.as_str(), "terminated");
    }
}