1use serde::{Deserialize, Serialize};
4
5#[cfg(target_arch = "wasm32")]
6use crate::tokio::sync::oneshot;
7#[cfg(not(target_arch = "wasm32"))]
8use tokio::sync::oneshot;
9
10use crate::comms::TrustedPeerSpec;
11use crate::lifecycle::{RunId, WaitRequestId};
12pub use crate::ops::{OperationId, OperationResult};
13use crate::types::SessionId;
14
15pub const DEFAULT_MAX_COMPLETED: usize = 256;
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
20#[serde(rename_all = "snake_case")]
21pub enum OperationKind {
22 MobMemberChild,
23 BackgroundToolOp,
24}
25
26impl OperationKind {
27 pub fn expects_peer_channel(self) -> bool {
29 matches!(self, Self::MobMemberChild)
30 }
31}
32
33#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
35pub struct OperationSpec {
36 pub id: OperationId,
37 pub kind: OperationKind,
38 pub owner_session_id: SessionId,
39 pub display_name: String,
40 pub source_label: String,
41 pub child_session_id: Option<SessionId>,
42 pub expect_peer_channel: bool,
43}
44
45#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
47pub struct OperationPeerHandle {
48 pub peer_name: String,
49 pub trusted_peer: TrustedPeerSpec,
50}
51
52#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
54pub struct OperationProgressUpdate {
55 pub message: String,
56 pub percent: Option<f32>,
57}
58
59#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
61#[serde(tag = "outcome_type", rename_all = "snake_case")]
62pub enum OperationTerminalOutcome {
63 Completed(OperationResult),
64 Failed { error: String },
65 Aborted { reason: Option<String> },
66 Cancelled { reason: Option<String> },
67 Retired,
68 Terminated { reason: String },
69}
70
71#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
73#[serde(rename_all = "snake_case")]
74pub enum OperationStatus {
75 Absent,
76 Provisioning,
77 Running,
78 Retiring,
79 Completed,
80 Failed,
81 Aborted,
82 Cancelled,
83 Retired,
84 Terminated,
85}
86
87impl OperationStatus {
88 pub fn is_terminal(self) -> bool {
89 matches!(
90 self,
91 Self::Completed
92 | Self::Failed
93 | Self::Aborted
94 | Self::Cancelled
95 | Self::Retired
96 | Self::Terminated
97 )
98 }
99
100 pub fn allows_terminalization(self) -> bool {
101 matches!(self, Self::Provisioning | Self::Running | Self::Retiring)
102 }
103
104 pub fn as_str(self) -> &'static str {
109 match self {
110 Self::Absent => "absent",
111 Self::Provisioning => "provisioning",
112 Self::Running => "running",
113 Self::Retiring => "retiring",
114 Self::Completed => "completed",
115 Self::Failed => "failed",
116 Self::Aborted => "aborted",
117 Self::Cancelled => "cancelled",
118 Self::Retired => "retired",
119 Self::Terminated => "terminated",
120 }
121 }
122}
123
124#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
126pub struct OperationLifecycleSnapshot {
127 pub id: OperationId,
128 pub kind: OperationKind,
129 pub display_name: String,
130 pub status: OperationStatus,
131 pub peer_ready: bool,
132 pub progress_count: u32,
133 pub watcher_count: u32,
134 pub terminal_outcome: Option<OperationTerminalOutcome>,
135 pub child_session_id: Option<SessionId>,
136 #[serde(skip_serializing_if = "Option::is_none", default)]
138 pub peer_handle: Option<OperationPeerHandle>,
139 #[serde(default)]
141 pub created_at_ms: u64,
142 #[serde(skip_serializing_if = "Option::is_none", default)]
144 pub started_at_ms: Option<u64>,
145 #[serde(skip_serializing_if = "Option::is_none", default)]
147 pub completed_at_ms: Option<u64>,
148 #[serde(skip_serializing_if = "Option::is_none", default)]
150 pub elapsed_ms: Option<u64>,
151}
152
153pub struct OperationCompletionWatch {
155 rx: oneshot::Receiver<OperationTerminalOutcome>,
156}
157
158impl OperationCompletionWatch {
159 pub fn channel() -> (
161 oneshot::Sender<OperationTerminalOutcome>,
162 OperationCompletionWatch,
163 ) {
164 let (tx, rx) = oneshot::channel();
165 (tx, Self { rx })
166 }
167
168 pub async fn wait(self) -> OperationTerminalOutcome {
170 match self.rx.await {
171 Ok(outcome) => outcome,
172 Err(_) => OperationTerminalOutcome::Terminated {
173 reason: "operation completion watch dropped".into(),
174 },
175 }
176 }
177
178 pub fn already_resolved(outcome: OperationTerminalOutcome) -> Self {
180 let (tx, rx) = oneshot::channel();
181 let _ = tx.send(outcome);
182 Self { rx }
183 }
184}
185
186#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
188pub enum OpsLifecycleError {
189 #[error("operation already registered: {0}")]
190 AlreadyRegistered(OperationId),
191 #[error("operation not found: {0}")]
192 NotFound(OperationId),
193 #[error("invalid lifecycle transition for {id}: {status:?} -> {action}")]
194 InvalidTransition {
195 id: OperationId,
196 status: OperationStatus,
197 action: &'static str,
198 },
199 #[error("operation does not expect a peer handoff: {0}")]
200 PeerNotExpected(OperationId),
201 #[error("operation is already peer-ready: {0}")]
202 AlreadyPeerReady(OperationId),
203 #[error("max concurrent operations exceeded (limit: {limit}, active: {active})")]
204 MaxConcurrentExceeded { limit: usize, active: usize },
205 #[error("operation not supported: {0}")]
206 Unsupported(String),
207 #[error("wait_all already active")]
208 WaitAlreadyActive,
209 #[error("wait_all not active for request: {0}")]
210 WaitNotActive(WaitRequestId),
211 #[error("wait_all contains duplicate operation id: {0}")]
212 DuplicateWaitOperation(OperationId),
213 #[error("internal lifecycle registry error: {0}")]
214 Internal(String),
215}
216
217#[derive(Debug)]
224pub struct WaitAllResult {
225 pub outcomes: Vec<(OperationId, OperationTerminalOutcome)>,
227 pub satisfied: WaitAllSatisfied,
229}
230
231#[derive(Debug)]
238pub struct WaitAllSatisfied {
239 pub wait_request_id: WaitRequestId,
241 pub operation_ids: Vec<OperationId>,
243}
244
245pub trait OpsLifecycleRegistry: Send + Sync {
247 fn register_operation(&self, spec: OperationSpec) -> Result<(), OpsLifecycleError>;
248 fn provisioning_succeeded(&self, id: &OperationId) -> Result<(), OpsLifecycleError>;
249 fn provisioning_failed(&self, id: &OperationId, error: String)
250 -> Result<(), OpsLifecycleError>;
251 fn peer_ready(
252 &self,
253 id: &OperationId,
254 peer: OperationPeerHandle,
255 ) -> Result<(), OpsLifecycleError>;
256 fn register_watcher(
257 &self,
258 id: &OperationId,
259 ) -> Result<OperationCompletionWatch, OpsLifecycleError>;
260 fn report_progress(
261 &self,
262 id: &OperationId,
263 update: OperationProgressUpdate,
264 ) -> Result<(), OpsLifecycleError>;
265 fn complete_operation(
266 &self,
267 id: &OperationId,
268 result: OperationResult,
269 ) -> Result<(), OpsLifecycleError>;
270 fn fail_operation(&self, id: &OperationId, error: String) -> Result<(), OpsLifecycleError>;
271 fn abort_provisioning(
272 &self,
273 id: &OperationId,
274 reason: Option<String>,
275 ) -> Result<(), OpsLifecycleError>;
276 fn cancel_operation(
277 &self,
278 id: &OperationId,
279 reason: Option<String>,
280 ) -> Result<(), OpsLifecycleError>;
281 fn request_retire(&self, id: &OperationId) -> Result<(), OpsLifecycleError>;
282 fn mark_retired(&self, id: &OperationId) -> Result<(), OpsLifecycleError>;
283 fn snapshot(&self, id: &OperationId) -> Option<OperationLifecycleSnapshot>;
284 fn list_operations(&self) -> Vec<OperationLifecycleSnapshot>;
285 fn terminate_owner(&self, reason: String) -> Result<(), OpsLifecycleError>;
286
287 fn collect_completed(
289 &self,
290 ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
291 Err(OpsLifecycleError::Unsupported("collect_completed".into()))
292 }
293
294 fn completion_feed(
300 &self,
301 ) -> Option<std::sync::Arc<dyn crate::completion_feed::CompletionFeed>> {
302 None
303 }
304
305 fn wait_all(
312 &self,
313 _run_id: &RunId,
314 _ids: &[OperationId],
315 ) -> std::pin::Pin<
316 Box<dyn std::future::Future<Output = Result<WaitAllResult, OpsLifecycleError>> + Send + '_>,
317 > {
318 Box::pin(std::future::ready(Err(OpsLifecycleError::Unsupported(
319 "wait_all".into(),
320 ))))
321 }
322}
323
324#[cfg(test)]
325#[allow(clippy::unwrap_used, clippy::panic)]
326mod tests {
327 use super::*;
328
329 #[tokio::test]
330 async fn already_resolved_watch_returns_terminal_outcome() {
331 let watch = OperationCompletionWatch::already_resolved(OperationTerminalOutcome::Retired);
332 assert_eq!(watch.wait().await, OperationTerminalOutcome::Retired);
333 }
334
335 #[test]
336 fn operation_kind_peer_expectation_matches_contract() {
337 assert!(OperationKind::MobMemberChild.expects_peer_channel());
338 assert!(!OperationKind::BackgroundToolOp.expects_peer_channel());
339 }
340
341 #[test]
342 fn terminal_statuses_match_contract() {
343 assert!(OperationStatus::Completed.is_terminal());
344 assert!(OperationStatus::Failed.is_terminal());
345 assert!(OperationStatus::Aborted.is_terminal());
346 assert!(OperationStatus::Cancelled.is_terminal());
347 assert!(OperationStatus::Retired.is_terminal());
348 assert!(OperationStatus::Terminated.is_terminal());
349 assert!(!OperationStatus::Running.is_terminal());
350 assert!(OperationStatus::Running.allows_terminalization());
351 assert!(OperationStatus::Retiring.allows_terminalization());
352 assert!(!OperationStatus::Completed.allows_terminalization());
353 }
354}