use serde::{Deserialize, Serialize};
use std::future::Future;
use std::pin::Pin;
use crate::comms::{PeerAddress, PeerId, TrustedPeerDescriptor};
use crate::lifecycle::{RunId, WaitRequestId};
pub use crate::ops::{OperationId, OperationResult};
use crate::runtime_epoch::EpochCursorState;
use crate::types::SessionId;
/// Default "max completed" limit (256).
///
/// NOTE(review): nothing in this file reads the constant; presumably it is the
/// default cap on how many completed operations a registry retains — confirm
/// against the registry implementation.
pub const DEFAULT_MAX_COMPLETED: usize = 256;
/// Category of an operation tracked through the ops lifecycle.
///
/// Serialized by serde as a snake_case string (for example
/// `"mob_member_child"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationKind {
/// The only kind for which `OperationKind::expects_peer_channel` returns `true`.
MobMemberChild,
/// NOTE(review): presumably a tool call running in the background — confirm.
BackgroundToolOp,
/// NOTE(review): presumably a capacity reservation for background tools — confirm.
BackgroundToolCapacitySlot,
}
impl OperationKind {
    /// Every variant, listed in declaration order.
    pub const ALL: [Self; 3] = [
        Self::MobMemberChild,
        Self::BackgroundToolOp,
        Self::BackgroundToolCapacitySlot,
    ];

    /// Returns the Rust identifier of the variant (`UpperCamelCase`).
    ///
    /// This is distinct from the snake_case name serde uses on the wire.
    pub const fn generated_variant(self) -> &'static str {
        match self {
            Self::MobMemberChild => "MobMemberChild",
            Self::BackgroundToolOp => "BackgroundToolOp",
            Self::BackgroundToolCapacitySlot => "BackgroundToolCapacitySlot",
        }
    }

    /// Returns `true` when operations of this kind expect a peer channel.
    ///
    /// Only [`Self::MobMemberChild`] does. The match is deliberately
    /// exhaustive (no wildcard) so that adding a new kind fails to compile
    /// until its peer expectation is decided explicitly. The method is a
    /// `const fn`, like `generated_variant`, so it can be used in constant
    /// contexts.
    pub const fn expects_peer_channel(self) -> bool {
        match self {
            Self::MobMemberChild => true,
            Self::BackgroundToolOp | Self::BackgroundToolCapacitySlot => false,
        }
    }
}
/// Origin of an operation.
///
/// Serialized as an internally tagged object: the `kind` field holds the
/// snake_case variant name (`"session_child"` or `"backend_peer"`) and the
/// variant's own fields sit alongside it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum OperationSource {
/// Source identified by a session id.
SessionChild {
session_id: SessionId,
},
/// Source identified by a peer id together with that peer's address.
BackendPeer {
peer_id: PeerId,
address: PeerAddress,
},
}
impl OperationSource {
pub fn session_child(session_id: SessionId) -> Self {
Self::SessionChild { session_id }
}
pub fn backend_peer(peer_id: PeerId, address: PeerAddress) -> Self {
Self::BackendPeer { peer_id, address }
}
}
/// Description of an operation, as passed to
/// `OpsLifecycleRegistry::register_operation`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OperationSpec {
/// Identifier of the operation.
pub id: OperationId,
/// Category of the operation.
pub kind: OperationKind,
/// Session recorded as the owner of the operation.
pub owner_session_id: SessionId,
/// Display name for the operation.
pub display_name: String,
/// Free-form label for the operation's source.
pub source_label: String,
/// Structured source, when known. Left out of the serialized form when
/// `None`, and read back as `None` when the field is absent.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub operation_source: Option<OperationSource>,
/// Child session associated with the operation, if any.
pub child_session_id: Option<SessionId>,
/// Whether a peer channel is expected for this operation.
///
/// NOTE(review): stored independently of `kind.expects_peer_channel()`;
/// nothing in this file keeps the two in agreement.
pub expect_peer_channel: bool,
}
/// Peer details handed to `OpsLifecycleRegistry::peer_ready` and exposed
/// through `OperationLifecycleSnapshot::peer_handle`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OperationPeerHandle {
/// Name of the peer.
pub peer_name: crate::comms::PeerName,
/// Trusted-peer descriptor for the same peer.
pub trusted_peer: TrustedPeerDescriptor,
}
/// Progress report passed to `OpsLifecycleRegistry::report_progress`.
///
/// Derives `PartialEq` but not `Eq` because `percent` is a float.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct OperationProgressUpdate {
/// Human-readable progress message.
pub message: String,
/// Optional completion percentage.
///
/// NOTE(review): the scale (0–1 vs 0–100) is not established in this file.
pub percent: Option<f32>,
}
/// Final outcome of an operation.
///
/// Serialized as an internally tagged object: `outcome_type` holds the
/// snake_case variant name and the variant's data sits alongside it.
/// `OperationTerminalOutcome::default()` is [`OperationTerminalOutcome::Retired`]
/// (see the `#[default]` marker below).
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "outcome_type", rename_all = "snake_case")]
pub enum OperationTerminalOutcome {
/// Finished with a result payload.
Completed(OperationResult),
/// Finished with an error message.
Failed {
error: String,
},
/// Aborted, optionally with a reason.
Aborted {
reason: Option<String>,
},
/// Cancelled, optionally with a reason.
Cancelled {
reason: Option<String>,
},
/// Retired; carries no data and is the `Default` value.
#[default]
Retired,
/// Terminated with a mandatory reason.
Terminated {
reason: String,
},
}
/// Lifecycle status of an operation.
///
/// Serialized by serde as a snake_case string; `OperationStatus::as_str`
/// returns the same spelling for each variant.
///
/// NOTE(review): which statuses count as terminal is not defined on this
/// type; see `OpsLifecycleRegistry::classify_operation_terminality`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationStatus {
Absent,
Provisioning,
Running,
Retiring,
Completed,
Failed,
Aborted,
Cancelled,
Retired,
Terminated,
}
impl OperationStatus {
    /// Returns the lowercase label of the status.
    ///
    /// The labels are spelled the same as the snake_case names serde emits
    /// for this enum. Declared `const fn`, like the `generated_variant`
    /// accessors on the sibling enums, so the label can be used in constant
    /// contexts.
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Absent => "absent",
            Self::Provisioning => "provisioning",
            Self::Running => "running",
            Self::Retiring => "retiring",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Aborted => "aborted",
            Self::Cancelled => "cancelled",
            Self::Retired => "retired",
            Self::Terminated => "terminated",
        }
    }
}
/// Public-facing classification of an operation's result.
///
/// Returned by `OpsLifecycleRegistry::classify_operation_public_result` and
/// carried on `OperationLifecycleSnapshot::public_result_class`. Serialized by
/// serde as a snake_case string.
///
/// NOTE(review): how each `OperationStatus` maps onto these classes is decided
/// outside this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationPublicResultClass {
MissingAuthority,
Running,
Completed,
Failed,
Cancelled,
}
impl OperationPublicResultClass {
pub const ALL: [Self; 5] = [
Self::MissingAuthority,
Self::Running,
Self::Completed,
Self::Failed,
Self::Cancelled,
];
pub const fn generated_variant(self) -> &'static str {
match self {
Self::MissingAuthority => "MissingAuthority",
Self::Running => "Running",
Self::Completed => "Completed",
Self::Failed => "Failed",
Self::Cancelled => "Cancelled",
}
}
}
/// Classification returned by
/// `OpsLifecycleRegistry::classify_operation_completion_wake`.
///
/// Serialized by serde as a snake_case string (`"wake"` / `"ignore"`).
///
/// NOTE(review): presumably decides whether an operation's completion should
/// wake its owner — confirm against the implementing registry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationCompletionWakeClass {
Wake,
Ignore,
}
impl OperationCompletionWakeClass {
pub const ALL: [Self; 2] = [Self::Wake, Self::Ignore];
pub const fn generated_variant(self) -> &'static str {
match self {
Self::Wake => "Wake",
Self::Ignore => "Ignore",
}
}
}
/// Lifecycle action that can be applied to an operation.
///
/// Passed to `OpsLifecycleRegistry::classify_operation_transition_idempotence`.
/// Serialized by serde as a snake_case string (for example
/// `"retire_requested"`).
///
/// NOTE(review): which registry method triggers which action is not shown in
/// this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum OperationLifecycleAction {
Start,
Fail,
PeerReady,
ProgressReported,
Complete,
Abort,
Cancel,
RetireRequested,
RetireCompleted,
Terminate,
}
impl OperationLifecycleAction {
pub const ALL: [Self; 10] = [
Self::Start,
Self::Fail,
Self::PeerReady,
Self::ProgressReported,
Self::Complete,
Self::Abort,
Self::Cancel,
Self::RetireRequested,
Self::RetireCompleted,
Self::Terminate,
];
pub const fn generated_variant(self) -> &'static str {
match self {
Self::Start => "Start",
Self::Fail => "Fail",
Self::PeerReady => "PeerReady",
Self::ProgressReported => "ProgressReported",
Self::Complete => "Complete",
Self::Abort => "Abort",
Self::Cancel => "Cancel",
Self::RetireRequested => "RetireRequested",
Self::RetireCompleted => "RetireCompleted",
Self::Terminate => "Terminate",
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct OperationLifecycleSnapshot {
pub id: OperationId,
pub kind: OperationKind,
pub display_name: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub operation_source: Option<OperationSource>,
pub status: OperationStatus,
pub terminal: bool,
pub public_result_class: OperationPublicResultClass,
pub peer_ready: bool,
pub progress_count: u32,
pub watcher_count: u32,
pub terminal_outcome: Option<OperationTerminalOutcome>,
pub child_session_id: Option<SessionId>,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub peer_handle: Option<OperationPeerHandle>,
#[serde(default)]
pub created_at_ms: u64,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub started_at_ms: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub completed_at_ms: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub elapsed_ms: Option<u64>,
}
/// Boxed, pinned future returned by `OpsLifecycleRegistry::register_watcher`.
///
/// It is `Send + 'static` and resolves to the operation's
/// `OperationTerminalOutcome`, or to an `OperationCompletionWatchError` when no
/// outcome could be delivered.
pub type OperationCompletionWatch = Pin<
Box<
dyn Future<Output = Result<OperationTerminalOutcome, OperationCompletionWatchError>>
+ Send
+ 'static,
>,
>;
/// Error produced by an `OperationCompletionWatch` future.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum OperationCompletionWatchError {
/// The completion channel closed before a terminal outcome was delivered.
#[error("operation completion channel closed without an authorized terminal outcome")]
ChannelClosed,
}
/// Error type returned by every fallible `OpsLifecycleRegistry` method.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute.
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum OpsLifecycleError {
/// An operation with this id is already registered.
#[error("operation already registered: {0}")]
AlreadyRegistered(OperationId),
/// No operation with this id is known.
#[error("operation not found: {0}")]
NotFound(OperationId),
/// `action` is not valid for operation `id` while it is in `status`.
/// `status` is rendered with its `Debug` form in the message.
#[error("invalid lifecycle transition for {id}: {status:?} -> {action}")]
InvalidTransition {
id: OperationId,
status: OperationStatus,
action: &'static str,
},
/// A peer handoff was attempted on an operation that does not expect one.
#[error("operation does not expect a peer handoff: {0}")]
PeerNotExpected(OperationId),
/// The operation was already marked peer-ready.
#[error("operation is already peer-ready: {0}")]
AlreadyPeerReady(OperationId),
/// The concurrency limit was exceeded; carries the limit and the active count.
#[error("max concurrent operations exceeded (limit: {limit}, active: {active})")]
MaxConcurrentExceeded { limit: usize, active: usize },
/// The named capability is not supported. The trait's default method
/// bodies return this with the method name as payload.
#[error("operation not supported: {0}")]
Unsupported(String),
/// A `wait_all` is already active.
#[error("wait_all already active")]
WaitAlreadyActive,
/// No `wait_all` is active for the given wait request.
#[error("wait_all not active for request: {0}")]
WaitNotActive(WaitRequestId),
/// The same operation id appeared more than once in a `wait_all` request.
#[error("wait_all contains duplicate operation id: {0}")]
DuplicateWaitOperation(OperationId),
/// Internal registry failure described by the message.
#[error("internal lifecycle registry error: {0}")]
Internal(String),
}
/// Successful output of the future returned by `OpsLifecycleRegistry::wait_all`.
#[derive(Debug)]
pub struct WaitAllResult {
/// Terminal outcome paired with the id of the operation it belongs to.
pub outcomes: Vec<(OperationId, OperationTerminalOutcome)>,
/// Details of the wait request that was satisfied.
pub satisfied: WaitAllSatisfied,
}
/// Identifies a satisfied wait request; carried in `WaitAllResult::satisfied`.
#[derive(Debug)]
pub struct WaitAllSatisfied {
/// Id of the wait request.
pub wait_request_id: WaitRequestId,
/// Run the wait belongs to.
pub run_id: RunId,
/// Operation ids covered by the wait.
pub operation_ids: Vec<OperationId>,
}
/// Selects which consumer's completion cursor is read by
/// `OpsLifecycleRegistry::completion_cursor` or moved by
/// `OpsLifecycleRegistry::advance_completion_cursor`.
///
/// NOTE(review): the precise meaning of each consumer is defined by the
/// implementing registry, not by this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CompletionCursorConsumer {
AgentApplied,
RuntimeObserved,
RuntimeInjected,
}
/// Registry contract for driving and observing operation lifecycles.
///
/// Implementors must be `Send + Sync`. The methods from `register_operation`
/// through `terminate_owner` are required. Every later method ships with a
/// default body: most return [`OpsLifecycleError::Unsupported`] carrying the
/// method's own name, while `completion_feed` and `completion_cursor` default
/// to "nothing available" (`None` / `Ok(None)`).
///
/// NOTE(review): the required methods have no bodies in this file, so their
/// one-line summaries are read off names and signatures only; confirm exact
/// semantics against the implementing registry.
pub trait OpsLifecycleRegistry: Send + Sync {
    /// Registers the operation described by `spec`.
    fn register_operation(&self, spec: OperationSpec) -> Result<(), OpsLifecycleError>;

    /// Reports that provisioning of operation `id` succeeded.
    fn provisioning_succeeded(&self, id: &OperationId) -> Result<(), OpsLifecycleError>;

    /// Reports that provisioning of operation `id` failed with `error`.
    fn provisioning_failed(
        &self,
        id: &OperationId,
        error: String,
    ) -> Result<(), OpsLifecycleError>;

    /// Reports that the peer for operation `id` is ready, supplying its handle.
    fn peer_ready(
        &self,
        id: &OperationId,
        peer: OperationPeerHandle,
    ) -> Result<(), OpsLifecycleError>;

    /// Registers a watcher on operation `id` and returns the completion future.
    fn register_watcher(
        &self,
        id: &OperationId,
    ) -> Result<OperationCompletionWatch, OpsLifecycleError>;

    /// Records a progress update for operation `id`.
    fn report_progress(
        &self,
        id: &OperationId,
        update: OperationProgressUpdate,
    ) -> Result<(), OpsLifecycleError>;

    /// Completes operation `id` with `result`.
    fn complete_operation(
        &self,
        id: &OperationId,
        result: OperationResult,
    ) -> Result<(), OpsLifecycleError>;

    /// Fails operation `id` with `error`.
    fn fail_operation(&self, id: &OperationId, error: String) -> Result<(), OpsLifecycleError>;

    /// Aborts provisioning of operation `id`, optionally recording a reason.
    fn abort_provisioning(
        &self,
        id: &OperationId,
        reason: Option<String>,
    ) -> Result<(), OpsLifecycleError>;

    /// Cancels operation `id`, optionally recording a reason.
    fn cancel_operation(
        &self,
        id: &OperationId,
        reason: Option<String>,
    ) -> Result<(), OpsLifecycleError>;

    /// Requests retirement of operation `id`.
    fn request_retire(&self, id: &OperationId) -> Result<(), OpsLifecycleError>;

    /// Marks operation `id` as retired.
    fn mark_retired(&self, id: &OperationId) -> Result<(), OpsLifecycleError>;

    /// Returns a snapshot of operation `id`, or `Ok(None)` when there is none.
    fn snapshot(
        &self,
        id: &OperationId,
    ) -> Result<Option<OperationLifecycleSnapshot>, OpsLifecycleError>;

    /// Returns snapshots of the operations held by the registry.
    fn list_operations(&self) -> Result<Vec<OperationLifecycleSnapshot>, OpsLifecycleError>;

    /// Terminates on behalf of the owner, recording `reason`.
    fn terminate_owner(&self, reason: String) -> Result<(), OpsLifecycleError>;

    /// Classifies whether `_status` is terminal for operation `_id`.
    ///
    /// Default: `Err(Unsupported("classify_operation_terminality"))`.
    fn classify_operation_terminality(
        &self,
        _id: &OperationId,
        _status: OperationStatus,
    ) -> Result<bool, OpsLifecycleError> {
        let method = String::from("classify_operation_terminality");
        Err(OpsLifecycleError::Unsupported(method))
    }

    /// Classifies the public result of operation `_id`.
    ///
    /// Default: `Err(Unsupported("classify_operation_public_result"))`.
    fn classify_operation_public_result(
        &self,
        _id: &OperationId,
    ) -> Result<OperationPublicResultClass, OpsLifecycleError> {
        let method = String::from("classify_operation_public_result");
        Err(OpsLifecycleError::Unsupported(method))
    }

    /// Classifies the completion-wake behavior for operation `_id` of `_kind`.
    ///
    /// Default: `Err(Unsupported("classify_operation_completion_wake"))`.
    fn classify_operation_completion_wake(
        &self,
        _id: &OperationId,
        _kind: OperationKind,
    ) -> Result<OperationCompletionWakeClass, OpsLifecycleError> {
        let method = String::from("classify_operation_completion_wake");
        Err(OpsLifecycleError::Unsupported(method))
    }

    /// Classifies whether applying `_action` to operation `_id` is idempotent.
    ///
    /// Default: `Err(Unsupported("classify_operation_transition_idempotence"))`.
    fn classify_operation_transition_idempotence(
        &self,
        _id: &OperationId,
        _action: OperationLifecycleAction,
    ) -> Result<bool, OpsLifecycleError> {
        let method = String::from("classify_operation_transition_idempotence");
        Err(OpsLifecycleError::Unsupported(method))
    }

    /// Registers `_spec` subject to an optional concurrency limit.
    ///
    /// Default: `Err(Unsupported("register_operation_with_admission_limit"))`.
    fn register_operation_with_admission_limit(
        &self,
        _spec: OperationSpec,
        _max_concurrent: Option<usize>,
    ) -> Result<(), OpsLifecycleError> {
        let method = String::from("register_operation_with_admission_limit");
        Err(OpsLifecycleError::Unsupported(method))
    }

    /// Collects completed operations together with their terminal outcomes.
    ///
    /// Default: `Err(Unsupported("collect_completed"))`.
    fn collect_completed(
        &self,
    ) -> Result<Vec<(OperationId, OperationTerminalOutcome)>, OpsLifecycleError> {
        let method = String::from("collect_completed");
        Err(OpsLifecycleError::Unsupported(method))
    }

    /// Returns the completion feed, if the registry exposes one.
    ///
    /// Default: `None`.
    fn completion_feed(
        &self,
    ) -> Option<std::sync::Arc<dyn crate::completion_feed::CompletionFeed>> {
        None
    }

    /// Returns the completion cursor stored for `_consumer`, if any.
    ///
    /// Default: `Ok(None)` — unlike most defaults, this one does not report
    /// `Unsupported`.
    fn completion_cursor(
        &self,
        _consumer: CompletionCursorConsumer,
    ) -> Result<Option<crate::completion_feed::CompletionSeq>, OpsLifecycleError> {
        Ok(None)
    }

    /// Advances the completion cursor of `_consumer` and returns a sequence
    /// value.
    ///
    /// Default: `Err(Unsupported("advance_completion_cursor"))`.
    fn advance_completion_cursor(
        &self,
        _consumer: CompletionCursorConsumer,
        _cursor: crate::completion_feed::CompletionSeq,
        _projection: Option<&EpochCursorState>,
    ) -> Result<crate::completion_feed::CompletionSeq, OpsLifecycleError> {
        let method = String::from("advance_completion_cursor");
        Err(OpsLifecycleError::Unsupported(method))
    }

    /// Returns a future that waits on the operations in `_ids` for `_run_id`.
    ///
    /// Default: an already-resolved future yielding
    /// `Err(Unsupported("wait_all"))`. The error is built when the method is
    /// called, not when the future is first polled.
    fn wait_all(
        &self,
        _run_id: &RunId,
        _ids: &[OperationId],
    ) -> Pin<Box<dyn Future<Output = Result<WaitAllResult, OpsLifecycleError>> + Send + '_>> {
        let unsupported = OpsLifecycleError::Unsupported(String::from("wait_all"));
        Box::pin(std::future::ready(Err(unsupported)))
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// Pins, kind by kind, which operation kinds expect a peer channel.
    #[test]
    fn operation_kind_peer_expectation_matches_contract() {
        let contract = [
            (OperationKind::MobMemberChild, true),
            (OperationKind::BackgroundToolOp, false),
            (OperationKind::BackgroundToolCapacitySlot, false),
        ];
        for (kind, expects_peer) in contract {
            assert_eq!(kind.expects_peer_channel(), expects_peer);
        }
    }
}