use super::flow_frame_engine::FlowFrameLoopStorePlan;
use super::terminalization::{TerminalizationOutcome, TerminalizationTarget};
use super::*;
use crate::machines::mob_machine as mob_dsl;
use crate::run::{MobMachineFlowRunCommand, MobRun, flow_run};
#[cfg(target_arch = "wasm32")]
use crate::tokio;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum MobState {
Creating = 0,
Running = 1,
Stopped = 2,
Completed = 3,
Destroyed = 4,
}
impl MobState {
pub fn as_str(self) -> &'static str {
match self {
Self::Creating => "Creating",
Self::Running => "Running",
Self::Stopped => "Stopped",
Self::Completed => "Completed",
Self::Destroyed => "Destroyed",
}
}
}
impl std::fmt::Display for MobState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MobOrchestratorSnapshot {
pub phase: MobState,
pub coordinator_bound: bool,
pub pending_spawn_count: u32,
pub active_flow_count: u32,
pub topology_revision: u32,
pub supervisor_active: bool,
}
impl Default for MobOrchestratorSnapshot {
fn default() -> Self {
Self {
phase: MobState::Creating,
coordinator_bound: false,
pending_spawn_count: 0,
active_flow_count: 0,
topology_revision: 0,
supervisor_active: false,
}
}
}
#[cfg(test)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct MobLifecycleSnapshot {
pub phase: MobState,
pub active_run_count: u32,
pub cleanup_pending: bool,
}
#[cfg(test)]
impl Default for MobLifecycleSnapshot {
fn default() -> Self {
Self {
phase: MobState::Running,
active_run_count: 0,
cleanup_pending: false,
}
}
}
#[cfg(test)]
#[derive(Debug, Clone, Default)]
pub(crate) struct MobDslT2Snapshot {
pub member_state_markers: std::collections::BTreeMap<
crate::machines::mob_machine::AgentRuntimeId,
crate::machines::mob_machine::MobMemberState,
>,
pub wiring_edges: std::collections::BTreeSet<crate::machines::mob_machine::WiringEdge>,
pub external_peer_edges:
std::collections::BTreeSet<crate::machines::mob_machine::ExternalPeerEdge>,
pub identity_to_runtime: std::collections::BTreeMap<
crate::machines::mob_machine::AgentIdentity,
crate::machines::mob_machine::AgentRuntimeId,
>,
pub member_restore_failures:
std::collections::BTreeMap<crate::machines::mob_machine::AgentIdentity, String>,
pub member_session_bindings: std::collections::BTreeMap<
crate::machines::mob_machine::AgentIdentity,
crate::machines::mob_machine::SessionId,
>,
pub pending_spawn_sessions: std::collections::BTreeMap<
crate::machines::mob_machine::AgentIdentity,
crate::machines::mob_machine::SessionId,
>,
pub pending_session_ingress_detach_runtime_ids:
std::collections::BTreeSet<crate::machines::mob_machine::AgentRuntimeId>,
pub topology_epoch: u64,
}
#[derive(Debug, Clone, Default)]
pub(crate) struct MobStartupKickoffSnapshot {
pub pending_kickoff_member_ids: std::collections::BTreeSet<String>,
pub ready_runtime_ids: std::collections::BTreeSet<String>,
}
#[derive(Debug, Clone, Default)]
pub(crate) struct MobMemberMachineProjection {
pub runtime_id: Option<crate::machines::mob_machine::AgentRuntimeId>,
pub state_marker: Option<crate::machines::mob_machine::MobMemberState>,
pub live_runtime: bool,
pub bound_session_id: Option<crate::machines::mob_machine::SessionId>,
}
pub(super) struct SubmitWorkPayload {
pub runtime_id: AgentRuntimeId,
pub fence_token: FenceToken,
pub work_ref: WorkRef,
pub content: ContentInput,
pub origin: WorkOrigin,
pub handling_mode: meerkat_core::types::HandlingMode,
pub render_metadata: Option<meerkat_core::types::RenderMetadata>,
pub ack_mode: crate::mob_machine::SubmitWorkAckMode,
}
pub(super) enum MobCommand {
Spawn {
spec: Box<super::handle::SpawnMemberSpec>,
spawn_source: super::handle::SpawnSource,
owner_bridge_session_id: Option<SessionId>,
ops_registry: Option<Arc<dyn meerkat_core::ops_lifecycle::OpsLifecycleRegistry>>,
reply_tx: oneshot::Sender<Result<super::handle::MemberSpawnReceipt, MobError>>,
},
SpawnProvisioned {
spawn_ticket: u64,
result: Result<super::handle::MemberSpawnReceipt, MobError>,
},
Retire {
agent_identity: MeerkatId,
reply_tx: oneshot::Sender<Result<(), MobError>>,
},
Respawn {
agent_identity: MeerkatId,
initial_message: Option<ContentInput>,
reply_tx: oneshot::Sender<
Result<super::handle::MemberRespawnReceipt, super::handle::MobRespawnError>,
>,
},
RetireAll {
reply_tx: oneshot::Sender<Result<(), MobError>>,
},
SubmitWork {
payload: Box<SubmitWorkPayload>,
reply_tx: oneshot::Sender<Result<(), MobError>>,
},
SendPeerMessage {
from: MeerkatId,
to: MeerkatId,
content: ContentInput,
handling_mode: meerkat_core::types::HandlingMode,
reply_tx: oneshot::Sender<Result<meerkat_core::comms::SendReceipt, MobError>>,
},
CancelAllWork {
runtime_id: AgentRuntimeId,
fence_token: FenceToken,
reply_tx: oneshot::Sender<Result<(), MobError>>,
},
#[cfg(feature = "runtime-adapter")]
KickoffOutcomeResolved {
agent_identity: MeerkatId,
outcome: meerkat_runtime::completion::CompletionOutcome,
ack_tx: oneshot::Sender<()>,
},
RunFlow {
flow_id: FlowId,
activation_params: serde_json::Value,
scoped_event_tx: Option<tokio::sync::mpsc::Sender<meerkat_core::ScopedAgentEvent>>,
reply_tx: oneshot::Sender<Result<RunId, MobError>>,
},
CancelFlow {
run_id: RunId,
reply_tx: oneshot::Sender<Result<(), MobError>>,
},
FlowStatus {
run_id: RunId,
reply_tx: oneshot::Sender<Result<Option<MobRun>, MobError>>,
},
CommitFlowRunCommand {
run_id: RunId,
command: Box<MobMachineFlowRunCommand>,
context: &'static str,
reply_tx: oneshot::Sender<Result<Option<Vec<flow_run::Effect>>, MobError>>,
},
CommitFlowTerminalization {
run_id: RunId,
flow_id: FlowId,
target: TerminalizationTarget,
command: Box<MobMachineFlowRunCommand>,
context: &'static str,
reply_tx: oneshot::Sender<Result<TerminalizationOutcome, MobError>>,
},
CommitFlowFrameStorePlan {
run_id: RunId,
plan: Box<FlowFrameLoopStorePlan>,
reply_tx: oneshot::Sender<Result<bool, MobError>>,
},
ProjectMachineInput {
input: Box<mob_dsl::MobMachineInput>,
reply_tx: oneshot::Sender<Result<mob_dsl::MobMachineState, MobError>>,
},
PreviewMachineInput {
input: Box<mob_dsl::MobMachineInput>,
reply_tx: oneshot::Sender<Result<mob_dsl::MobMachineState, MobError>>,
},
QueryMachineState {
reply_tx: oneshot::Sender<mob_dsl::MobMachineState>,
},
ProjectMachineSignal {
signal: mob_dsl::MobMachineSignal,
},
FlowFinished {
run_id: RunId,
},
FlowCanceledCleanup {
run_id: RunId,
},
#[cfg(test)]
FlowTrackerCounts {
reply_tx: oneshot::Sender<(usize, usize)>,
},
#[cfg(test)]
OrchestratorSnapshot {
reply_tx: oneshot::Sender<MobOrchestratorSnapshot>,
},
#[cfg(test)]
LifecycleSnapshot {
reply_tx: oneshot::Sender<MobLifecycleSnapshot>,
},
#[cfg(test)]
LifecycleNotificationBurst {
count: usize,
message: String,
reply_tx: oneshot::Sender<Result<(), MobError>>,
},
#[cfg(test)]
DslT2Snapshot {
reply_tx: oneshot::Sender<MobDslT2Snapshot>,
},
StartupKickoffSnapshot {
reply_tx: oneshot::Sender<MobStartupKickoffSnapshot>,
},
ProjectMemberList {
include_retiring: bool,
reply_tx: oneshot::Sender<Vec<super::MobMemberListEntry>>,
},
ProjectMemberStatus {
agent_identity: crate::ids::AgentIdentity,
reply_tx: oneshot::Sender<super::MobMemberSnapshot>,
},
MemberMachineProjection {
agent_identity: crate::ids::AgentIdentity,
reply_tx: oneshot::Sender<MobMemberMachineProjection>,
},
Stop {
reply_tx: oneshot::Sender<Result<(), MobError>>,
},
ResumeLifecycle {
reply_tx: oneshot::Sender<Result<(), MobError>>,
},
Complete {
reply_tx: oneshot::Sender<Result<(), MobError>>,
},
Destroy {
reply_tx: oneshot::Sender<
Result<super::handle::MobDestroyReport, super::handle::MobDestroyError>,
>,
},
Reset {
reply_tx: oneshot::Sender<Result<(), MobError>>,
},
SubscribeAgentEvents {
agent_identity: MeerkatId,
reply_tx: oneshot::Sender<Result<EventStream, MobError>>,
},
SubscribeAllAgentEvents {
reply_tx: oneshot::Sender<Result<Vec<(MeerkatId, EventStream)>, MobError>>,
},
RotateSupervisor {
reply_tx: oneshot::Sender<Result<super::handle::SupervisorRotationReport, MobError>>,
},
PollEvents {
after_cursor: u64,
limit: usize,
reply_tx: oneshot::Sender<Result<Vec<crate::event::MobEvent>, MobError>>,
},
ReplayAllEvents {
reply_tx: oneshot::Sender<Result<Vec<crate::event::MobEvent>, MobError>>,
},
RecordOperatorActionProvenance {
tool_name: String,
authority_context: meerkat_core::service::MobToolAuthorityContext,
reply_tx: oneshot::Sender<Result<(), MobError>>,
},
ForceCancel {
agent_identity: MeerkatId,
reply_tx: oneshot::Sender<Result<(), MobError>>,
},
Wire {
local: MeerkatId,
target: super::handle::PeerTarget,
reply_tx: oneshot::Sender<Result<(), MobError>>,
},
WireMembersBatch {
edges: Vec<(AgentIdentity, AgentIdentity)>,
reply_tx: oneshot::Sender<Result<super::handle::MobWireMembersBatchReport, MobError>>,
},
Unwire {
local: MeerkatId,
target: super::handle::PeerTarget,
reply_tx: oneshot::Sender<Result<(), MobError>>,
},
SetSpawnPolicy {
policy: Option<Arc<dyn super::spawn_policy::SpawnPolicy>>,
reply_tx: oneshot::Sender<Result<(), MobError>>,
},
Shutdown {
reply_tx: oneshot::Sender<Result<(), MobError>>,
},
QueryPhase {
reply_tx: oneshot::Sender<MobState>,
},
}