1use serde::{Deserialize, Serialize};
4
5#[cfg(target_arch = "wasm32")]
6use crate::tokio::sync::oneshot;
7#[cfg(not(target_arch = "wasm32"))]
8use tokio::sync::oneshot;
9
10use crate::comms::TrustedPeerDescriptor;
11use crate::lifecycle::{RunId, WaitRequestId};
12pub use crate::ops::{OperationId, OperationResult};
13use crate::types::SessionId;
14
15pub const DEFAULT_MAX_COMPLETED: usize = 256;
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
20#[serde(rename_all = "snake_case")]
21pub enum OperationKind {
22 MobMemberChild,
23 BackgroundToolOp,
24}
25
26impl OperationKind {
27 pub fn expects_peer_channel(self) -> bool {
29 matches!(self, Self::MobMemberChild)
30 }
31}
32
33#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
35pub struct OperationSpec {
36 pub id: OperationId,
37 pub kind: OperationKind,
38 pub owner_session_id: SessionId,
44 pub display_name: String,
45 pub source_label: String,
46 pub child_session_id: Option<SessionId>,
47 pub expect_peer_channel: bool,
48}
49
50impl OperationSpec {
51 pub fn owner_bridge_session_id(&self) -> &SessionId {
53 &self.owner_session_id
54 }
55
56 pub fn owner_session_id(&self) -> &SessionId {
59 &self.owner_session_id
60 }
61}
62
63#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
65pub struct OperationPeerHandle {
66 pub peer_name: crate::comms::PeerName,
67 pub trusted_peer: TrustedPeerDescriptor,
68}
69
70#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
72pub struct OperationProgressUpdate {
73 pub message: String,
74 pub percent: Option<f32>,
75}
76
77#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
79#[serde(tag = "outcome_type", rename_all = "snake_case")]
80pub enum OperationTerminalOutcome {
81 Completed(OperationResult),
82 Failed { error: String },
83 Aborted { reason: Option<String> },
84 Cancelled { reason: Option<String> },
85 Retired,
86 Terminated { reason: String },
87}
88
89#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
91#[serde(rename_all = "snake_case")]
92pub enum OperationStatus {
93 Absent,
94 Provisioning,
95 Running,
96 Retiring,
97 Completed,
98 Failed,
99 Aborted,
100 Cancelled,
101 Retired,
102 Terminated,
103}
104
105impl OperationStatus {
106 pub fn is_terminal(self) -> bool {
107 matches!(
108 self,
109 Self::Completed
110 | Self::Failed
111 | Self::Aborted
112 | Self::Cancelled
113 | Self::Retired
114 | Self::Terminated
115 )
116 }
117
118 pub fn allows_terminalization(self) -> bool {
119 matches!(self, Self::Provisioning | Self::Running | Self::Retiring)
120 }
121
122 pub fn as_str(self) -> &'static str {
127 match self {
128 Self::Absent => "absent",
129 Self::Provisioning => "provisioning",
130 Self::Running => "running",
131 Self::Retiring => "retiring",
132 Self::Completed => "completed",
133 Self::Failed => "failed",
134 Self::Aborted => "aborted",
135 Self::Cancelled => "cancelled",
136 Self::Retired => "retired",
137 Self::Terminated => "terminated",
138 }
139 }
140}
141
142#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
144pub struct OperationLifecycleSnapshot {
145 pub id: OperationId,
146 pub kind: OperationKind,
147 pub display_name: String,
148 pub status: OperationStatus,
149 pub peer_ready: bool,
150 pub progress_count: u32,
151 pub watcher_count: u32,
152 pub terminal_outcome: Option<OperationTerminalOutcome>,
153 pub child_session_id: Option<SessionId>,
154 #[serde(skip_serializing_if = "Option::is_none", default)]
156 pub peer_handle: Option<OperationPeerHandle>,
157 #[serde(default)]
159 pub created_at_ms: u64,
160 #[serde(skip_serializing_if = "Option::is_none", default)]
162 pub started_at_ms: Option<u64>,
163 #[serde(skip_serializing_if = "Option::is_none", default)]
165 pub completed_at_ms: Option<u64>,
166 #[serde(skip_serializing_if = "Option::is_none", default)]
168 pub elapsed_ms: Option<u64>,
169}
170
171pub struct OperationCompletionWatch {
173 rx: oneshot::Receiver<OperationTerminalOutcome>,
174}
175
176impl OperationCompletionWatch {
177 pub fn channel() -> (
179 oneshot::Sender<OperationTerminalOutcome>,
180 OperationCompletionWatch,
181 ) {
182 let (tx, rx) = oneshot::channel();
183 (tx, Self { rx })
184 }
185
186 pub async fn wait(self) -> OperationTerminalOutcome {
188 match self.rx.await {
189 Ok(outcome) => outcome,
190 Err(_) => OperationTerminalOutcome::Terminated {
191 reason: "operation completion watch dropped".into(),
192 },
193 }
194 }
195
196 pub fn already_resolved(outcome: OperationTerminalOutcome) -> Self {
198 let (tx, rx) = oneshot::channel();
199 let _ = tx.send(outcome);
200 Self { rx }
201 }
202}
203
204#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
206pub enum OpsLifecycleError {
207 #[error("operation already registered: {0}")]
208 AlreadyRegistered(OperationId),
209 #[error("operation not found: {0}")]
210 NotFound(OperationId),
211 #[error("invalid lifecycle transition for {id}: {status:?} -> {action}")]
212 InvalidTransition {
213 id: OperationId,
214 status: OperationStatus,
215 action: &'static str,
216 },
217 #[error("operation does not expect a peer handoff: {0}")]
218 PeerNotExpected(OperationId),
219 #[error("operation is already peer-ready: {0}")]
220 AlreadyPeerReady(OperationId),
221 #[error("max concurrent operations exceeded (limit: {limit}, active: {active})")]
222 MaxConcurrentExceeded { limit: usize, active: usize },
223 #[error("operation not supported: {0}")]
224 Unsupported(String),
225 #[error("wait_all already active")]
226 WaitAlreadyActive,
227 #[error("wait_all not active for request: {0}")]
228 WaitNotActive(WaitRequestId),
229 #[error("wait_all contains duplicate operation id: {0}")]
230 DuplicateWaitOperation(OperationId),
231 #[error("internal lifecycle registry error: {0}")]
232 Internal(String),
233}
234
235#[derive(Debug)]
242pub struct WaitAllResult {
243 pub outcomes: Vec<(OperationId, OperationTerminalOutcome)>,
245 pub satisfied: WaitAllSatisfied,
247}
248
249#[derive(Debug)]
256pub struct WaitAllSatisfied {
257 pub wait_request_id: WaitRequestId,
259 pub operation_ids: Vec<OperationId>,
261}
262
263pub trait OpsLifecycleRegistry: Send + Sync {
265 fn register_operation(&self, spec: OperationSpec) -> Result<(), OpsLifecycleError>;
266 fn provisioning_succeeded(&self, id: &OperationId) -> Result<(), OpsLifecycleError>;
267 fn provisioning_failed(&self, id: &OperationId, error: String)
268 -> Result<(), OpsLifecycleError>;
269 fn peer_ready(
270 &self,
271 id: &OperationId,
272 peer: OperationPeerHandle,
273 ) -> Result<(), OpsLifecycleError>;
274 fn register_watcher(
275 &self,
276 id: &OperationId,
277 ) -> Result<OperationCompletionWatch, OpsLifecycleError>;
278 fn report_progress(
279 &self,
280 id: &OperationId,
281 update: OperationProgressUpdate,
282 ) -> Result<(), OpsLifecycleError>;
283 fn complete_operation(
284 &self,
285 id: &OperationId,
286 result: OperationResult,
287 ) -> Result<(), OpsLifecycleError>;
288 fn fail_operation(&self, id: &OperationId, error: String) -> Result<(), OpsLifecycleError>;
289 fn abort_provisioning(
290 &self,
291 id: &OperationId,
292 reason: Option<String>,
293 ) -> Result<(), OpsLifecycleError>;
294 fn cancel_operation(
295 &self,
296 id: &OperationId,
297 reason: Option<String>,
298 ) -> Result<(), OpsLifecycleError>;
299 fn request_retire(&self, id: &OperationId) -> Result<(), OpsLifecycleError>;
300 fn mark_retired(&self, id: &OperationId) -> Result<(), OpsLifecycleError>;
301 fn snapshot(&self, id: &OperationId) -> Option<OperationLifecycleSnapshot>;
302 fn list_operations(&self) -> Vec<OperationLifecycleSnapshot>;
303 fn terminate_owner(&self, reason: String) -> Result<(), OpsLifecycleError>;
304
305 fn collect_completed(
307 &self,
308 ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
309 Err(OpsLifecycleError::Unsupported("collect_completed".into()))
310 }
311
312 fn completion_feed(
318 &self,
319 ) -> Option<std::sync::Arc<dyn crate::completion_feed::CompletionFeed>> {
320 None
321 }
322
323 fn wait_all(
330 &self,
331 _run_id: &RunId,
332 _ids: &[OperationId],
333 ) -> std::pin::Pin<
334 Box<dyn std::future::Future<Output = Result<WaitAllResult, OpsLifecycleError>> + Send + '_>,
335 > {
336 Box::pin(std::future::ready(Err(OpsLifecycleError::Unsupported(
337 "wait_all".into(),
338 ))))
339 }
340}
341
342#[cfg(test)]
343#[allow(clippy::unwrap_used, clippy::panic)]
344mod tests {
345 use super::*;
346
347 #[tokio::test]
348 async fn already_resolved_watch_returns_terminal_outcome() {
349 let watch = OperationCompletionWatch::already_resolved(OperationTerminalOutcome::Retired);
350 assert_eq!(watch.wait().await, OperationTerminalOutcome::Retired);
351 }
352
353 #[test]
354 fn operation_kind_peer_expectation_matches_contract() {
355 assert!(OperationKind::MobMemberChild.expects_peer_channel());
356 assert!(!OperationKind::BackgroundToolOp.expects_peer_channel());
357 }
358
359 #[test]
360 fn terminal_statuses_match_contract() {
361 assert!(OperationStatus::Completed.is_terminal());
362 assert!(OperationStatus::Failed.is_terminal());
363 assert!(OperationStatus::Aborted.is_terminal());
364 assert!(OperationStatus::Cancelled.is_terminal());
365 assert!(OperationStatus::Retired.is_terminal());
366 assert!(OperationStatus::Terminated.is_terminal());
367 assert!(!OperationStatus::Running.is_terminal());
368 assert!(OperationStatus::Running.allows_terminalization());
369 assert!(OperationStatus::Retiring.allows_terminalization());
370 assert!(!OperationStatus::Completed.allows_terminalization());
371 }
372}