use crate::ids::{AgentRuntimeId, FenceToken, FlowId, LoopId, MeerkatId, ProfileName, WorkRef};
use crate::runtime::MobState;
use crate::store::FrameAtomicOperation;
use crate::validate::Diagnostic;
use crate::{MobId, RunId, StepId};
use meerkat_contracts::MobSpawnManyFailureCause;
use meerkat_contracts::wire::supervisor_bridge::{BridgeRejectionCause, BridgeRejectionReply};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MobMemberCapability {
InteractionEventInjector,
}
impl std::fmt::Display for MobMemberCapability {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::InteractionEventInjector => f.write_str("interaction_event_injector"),
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum MobError {
#[error("profile not found: {0}")]
ProfileNotFound(ProfileName),
#[error("mob member not found: {0}")]
MemberNotFound(MeerkatId),
#[error("mob member already exists: {0}")]
MemberAlreadyExists(MeerkatId),
#[error("mob member is not externally addressable: {0}")]
NotExternallyAddressable(MeerkatId),
#[error("invalid state transition: {from} -> {to}")]
InvalidTransition { from: MobState, to: MobState },
#[error("wiring error: {0}")]
WiringError(String),
#[error(
"supervisor rotation incomplete: failed after {rotated_peer_count} remote peer(s) accepted attempted epoch {attempted_epoch}; local authority remains at epoch {previous_epoch}; rollback_succeeded={rollback_succeeded}; pending_authority_recorded={pending_authority_recorded}; pending_authority_process_local={pending_authority_process_local}; failure: {reason}"
)]
SupervisorRotationIncomplete {
previous_epoch: u64,
attempted_epoch: u64,
attempted_public_peer_id: String,
rotated_peer_count: usize,
rollback_succeeded: bool,
pending_authority_recorded: bool,
pending_authority_process_local: bool,
rollback_error: Option<String>,
reason: String,
},
#[error("bridge command rejected ({cause:?}): {reason}")]
BridgeCommandRejected {
cause: BridgeRejectionCause,
reason: String,
},
#[error(
"member {member_id} failed to restore {}: {reason}",
format_member_restore_target(.session_id.as_ref())
)]
MemberRestoreFailed {
member_id: MeerkatId,
session_id: Option<meerkat_core::types::SessionId>,
reason: String,
},
#[error("kickoff wait timed out")]
KickoffWaitTimedOut { pending_member_ids: Vec<MeerkatId> },
#[error("member ready wait timed out")]
ReadyWaitTimedOut { pending_member_ids: Vec<MeerkatId> },
#[error("definition error: {}", format_diagnostics(.0))]
DefinitionError(Vec<Diagnostic>),
#[error("flow not found: {0}")]
FlowNotFound(FlowId),
#[error("flow failed for run {run_id}: {reason}")]
FlowFailed { run_id: RunId, reason: String },
#[error("run not found: {0}")]
RunNotFound(RunId),
#[error("run canceled: {0}")]
RunCanceled(RunId),
#[error("flow turn timed out")]
FlowTurnTimedOut,
#[error(
"loop '{loop_id}' would exceed max_frame_depth={max_frame_depth} (current depth={current_depth})"
)]
FrameDepthLimitExceeded {
loop_id: LoopId,
max_frame_depth: u32,
current_depth: u32,
},
#[error("mob run store cannot atomically persist frame operation '{operation}'")]
FrameAtomicPersistenceUnavailable { operation: FrameAtomicOperation },
#[error("spec revision conflict for mob {mob_id}: expected {expected:?}, actual {actual}")]
SpecRevisionConflict {
mob_id: MobId,
expected: Option<u64>,
actual: u64,
},
#[error("schema validation failed for step {step_id}: {message}")]
SchemaValidation { step_id: StepId, message: String },
#[error("insufficient targets for step {step_id}: required {required}, available {available}")]
InsufficientTargets {
step_id: StepId,
required: u8,
available: usize,
},
#[error("topology violation: {from_role} -> {to_role}")]
TopologyViolation {
from_role: ProfileName,
to_role: ProfileName,
},
#[error("bridge delivery rejected ({cause}): {reason}")]
BridgeDeliveryRejected {
cause: meerkat_contracts::wire::supervisor_bridge::BridgeDeliveryRejectionCause,
reason: String,
},
#[error("supervisor escalation: {0}")]
SupervisorEscalation(String),
#[error("unsupported for runtime mode {mode}: {reason}")]
UnsupportedForMode {
mode: crate::MobRuntimeMode,
reason: String,
},
#[error("mob member {member_id} missing required capability {capability}: {context}")]
MissingMemberCapability {
member_id: MeerkatId,
capability: MobMemberCapability,
context: &'static str,
},
#[error("reset barrier active")]
ResetBarrier,
#[error("storage error: {0}")]
StorageError(#[source] Box<dyn std::error::Error + Send + Sync>),
#[error("session error: {0}")]
SessionError(#[from] meerkat_core::service::SessionError),
#[error("comms error: {0}")]
CommsError(#[from] meerkat_core::comms::SendError),
#[error("callback pending for session {session_id} on tool '{tool_name}'")]
CallbackPending {
session_id: meerkat_core::types::SessionId,
tool_name: String,
args: serde_json::Value,
},
#[error("stale fence token for {runtime_id}: expected {expected}, got {actual}")]
StaleFenceToken {
runtime_id: AgentRuntimeId,
expected: FenceToken,
actual: FenceToken,
},
#[error("stale mob event cursor: requested {after_cursor}, latest {latest_cursor}")]
StaleEventCursor {
after_cursor: u64,
latest_cursor: u64,
},
#[error("work not found: {0}")]
WorkNotFound(WorkRef),
#[error("internal error: {0}")]
Internal(String),
}
fn format_diagnostics(diagnostics: &[Diagnostic]) -> String {
diagnostics
.iter()
.map(|d| format!("{}: {}", d.code, d.message))
.collect::<Vec<_>>()
.join("; ")
}
fn format_member_restore_target(session_id: Option<&meerkat_core::types::SessionId>) -> String {
match session_id {
Some(session_id) => format!("session {session_id}"),
None => "runtime bridge state".to_string(),
}
}
impl From<Box<dyn std::error::Error + Send + Sync>> for MobError {
fn from(error: Box<dyn std::error::Error + Send + Sync>) -> Self {
Self::StorageError(error)
}
}
impl From<crate::store::MobStoreError> for MobError {
fn from(error: crate::store::MobStoreError) -> Self {
match error {
crate::store::MobStoreError::SpecRevisionConflict {
mob_id,
expected,
actual,
} => Self::SpecRevisionConflict {
mob_id,
expected,
actual,
},
crate::store::MobStoreError::FrameAtomicPersistenceUnavailable { operation } => {
Self::FrameAtomicPersistenceUnavailable { operation }
}
other => Self::StorageError(Box::new(other)),
}
}
}
impl From<BridgeRejectionReply> for MobError {
fn from(rejection: BridgeRejectionReply) -> Self {
let cause = rejection.typed_cause();
let reason = rejection.reason().to_string();
match cause {
Some(cause) => Self::BridgeCommandRejected { cause, reason },
None => Self::WiringError(reason),
}
}
}
impl MobError {
pub fn bridge_rejection_cause(&self) -> Option<BridgeRejectionCause> {
match self {
Self::BridgeCommandRejected { cause, .. } => Some(*cause),
_ => None,
}
}
pub fn spawn_many_failure_cause(&self) -> MobSpawnManyFailureCause {
match self {
Self::ProfileNotFound(_) => MobSpawnManyFailureCause::ProfileNotFound,
Self::MemberNotFound(_) => MobSpawnManyFailureCause::MemberNotFound,
Self::MemberAlreadyExists(_) => MobSpawnManyFailureCause::MemberAlreadyExists,
Self::NotExternallyAddressable(_) => MobSpawnManyFailureCause::NotExternallyAddressable,
Self::InvalidTransition { .. } => MobSpawnManyFailureCause::InvalidTransition,
Self::WiringError(_) => MobSpawnManyFailureCause::WiringError,
Self::SupervisorRotationIncomplete { .. } => MobSpawnManyFailureCause::WiringError,
Self::BridgeCommandRejected { .. } => MobSpawnManyFailureCause::BridgeCommandRejected,
Self::MemberRestoreFailed { .. } => MobSpawnManyFailureCause::MemberRestoreFailed,
Self::KickoffWaitTimedOut { .. } => MobSpawnManyFailureCause::KickoffWaitTimedOut,
Self::ReadyWaitTimedOut { .. } => MobSpawnManyFailureCause::ReadyWaitTimedOut,
Self::DefinitionError(_) => MobSpawnManyFailureCause::DefinitionError,
Self::FlowNotFound(_) => MobSpawnManyFailureCause::FlowNotFound,
Self::FlowFailed { .. } => MobSpawnManyFailureCause::FlowFailed,
Self::RunNotFound(_) => MobSpawnManyFailureCause::RunNotFound,
Self::RunCanceled(_) => MobSpawnManyFailureCause::RunCanceled,
Self::FlowTurnTimedOut => MobSpawnManyFailureCause::FlowTurnTimedOut,
Self::FrameDepthLimitExceeded { .. } => {
MobSpawnManyFailureCause::FrameDepthLimitExceeded
}
Self::FrameAtomicPersistenceUnavailable { .. } => {
MobSpawnManyFailureCause::FrameAtomicPersistenceUnavailable
}
Self::SpecRevisionConflict { .. } => MobSpawnManyFailureCause::SpecRevisionConflict,
Self::SchemaValidation { .. } => MobSpawnManyFailureCause::SchemaValidation,
Self::InsufficientTargets { .. } => MobSpawnManyFailureCause::InsufficientTargets,
Self::TopologyViolation { .. } => MobSpawnManyFailureCause::TopologyViolation,
Self::BridgeDeliveryRejected { .. } => MobSpawnManyFailureCause::BridgeDeliveryRejected,
Self::SupervisorEscalation(_) => MobSpawnManyFailureCause::SupervisorEscalation,
Self::UnsupportedForMode { .. } => MobSpawnManyFailureCause::UnsupportedForMode,
Self::MissingMemberCapability { .. } => {
MobSpawnManyFailureCause::MissingMemberCapability
}
Self::ResetBarrier => MobSpawnManyFailureCause::ResetBarrier,
Self::StorageError(_) => MobSpawnManyFailureCause::StorageError,
Self::SessionError(_) => MobSpawnManyFailureCause::SessionError,
Self::CommsError(_) => MobSpawnManyFailureCause::CommsError,
Self::CallbackPending { .. } => MobSpawnManyFailureCause::CallbackPending,
Self::StaleFenceToken { .. } => MobSpawnManyFailureCause::StaleFenceToken,
Self::StaleEventCursor { .. } => MobSpawnManyFailureCause::StaleEventCursor,
Self::WorkNotFound(_) => MobSpawnManyFailureCause::WorkNotFound,
Self::Internal(_) => MobSpawnManyFailureCause::Internal,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::validate::{Diagnostic, DiagnosticCode, DiagnosticSeverity};
#[test]
fn test_profile_not_found_display() {
let err = MobError::ProfileNotFound(ProfileName::from("missing"));
assert!(format!("{err}").contains("missing"));
}
#[test]
fn member_not_found_and_already_exists_use_identity_first_display() {
let not_found = MobError::MemberNotFound(MeerkatId::from("singer"));
let already = MobError::MemberAlreadyExists(MeerkatId::from("singer"));
let not_addressable = MobError::NotExternallyAddressable(MeerkatId::from("singer"));
let msg_nf = format!("{not_found}");
let msg_ae = format!("{already}");
let msg_na = format!("{not_addressable}");
assert_eq!(msg_nf, "mob member not found: singer");
assert_eq!(msg_ae, "mob member already exists: singer");
assert_eq!(msg_na, "mob member is not externally addressable: singer");
for msg in [&msg_nf, &msg_ae, &msg_na] {
assert!(
!msg.to_lowercase().contains("meerkat"),
"identity-first mob errors must not carry legacy 'meerkat' wording: {msg}",
);
}
}
#[test]
fn spawn_many_failure_cause_preserves_typed_mob_error_variant() {
let profile_missing = MobError::ProfileNotFound(ProfileName::from("missing"));
assert_eq!(
profile_missing.spawn_many_failure_cause(),
MobSpawnManyFailureCause::ProfileNotFound
);
let internal = MobError::Internal("unexpected".to_string());
assert_eq!(
internal.spawn_many_failure_cause(),
MobSpawnManyFailureCause::Internal
);
}
#[test]
fn test_invalid_transition_display() {
let err = MobError::InvalidTransition {
from: MobState::Completed,
to: MobState::Running,
};
let msg = format!("{err}");
assert!(msg.contains("Completed"));
assert!(msg.contains("Running"));
}
#[test]
fn test_definition_error_display() {
let err = MobError::DefinitionError(vec![
Diagnostic {
code: DiagnosticCode::MissingSkillRef,
message: "skill 'foo' not found".to_string(),
location: Some("profiles.worker.skills[0]".to_string()),
severity: DiagnosticSeverity::Error,
},
Diagnostic {
code: DiagnosticCode::EmptyProfiles,
message: "no spawnable profiles".to_string(),
location: Some("profiles".to_string()),
severity: DiagnosticSeverity::Error,
},
]);
let msg = format!("{err}");
assert!(msg.contains("missing_skill_ref"));
assert!(msg.contains("empty_profiles"));
}
#[test]
fn test_session_error_from() {
let session_err = meerkat_core::service::SessionError::NotFound {
id: meerkat_core::types::SessionId::new(),
};
let mob_err: MobError = session_err.into();
assert!(matches!(mob_err, MobError::SessionError(_)));
}
#[test]
fn test_comms_error_from() {
let send_err = meerkat_core::comms::SendError::PeerNotFound("agent-1".to_string());
let mob_err: MobError = send_err.into();
assert!(matches!(mob_err, MobError::CommsError(_)));
}
#[test]
fn test_storage_error() {
let err = MobError::StorageError(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"disk full",
)));
assert!(format!("{err}").contains("disk full"));
}
#[test]
fn test_all_variants_exist() {
let _variants: Vec<MobError> = vec![
MobError::ProfileNotFound(ProfileName::from("p")),
MobError::MemberNotFound(MeerkatId::from("m")),
MobError::MemberAlreadyExists(MeerkatId::from("m")),
MobError::NotExternallyAddressable(MeerkatId::from("m")),
MobError::InvalidTransition {
from: MobState::Creating,
to: MobState::Running,
},
MobError::WiringError("w".to_string()),
MobError::SupervisorRotationIncomplete {
previous_epoch: 1,
attempted_epoch: 2,
attempted_public_peer_id: "peer-next".to_string(),
rotated_peer_count: 1,
rollback_succeeded: false,
pending_authority_recorded: true,
pending_authority_process_local: false,
rollback_error: Some("rollback failed".to_string()),
reason: "remote failed".to_string(),
},
MobError::BridgeCommandRejected {
cause: BridgeRejectionCause::NotBound,
reason: "bind required".to_string(),
},
MobError::MemberRestoreFailed {
member_id: MeerkatId::from("m"),
session_id: Some(meerkat_core::types::SessionId::new()),
reason: "restore failed".to_string(),
},
MobError::KickoffWaitTimedOut {
pending_member_ids: vec![MeerkatId::from("m")],
},
MobError::DefinitionError(vec![]),
MobError::FlowNotFound(FlowId::from("f")),
MobError::FlowFailed {
run_id: RunId::new(),
reason: "r".to_string(),
},
MobError::RunNotFound(RunId::new()),
MobError::RunCanceled(RunId::new()),
MobError::FlowTurnTimedOut,
MobError::FrameDepthLimitExceeded {
loop_id: LoopId::from("loop"),
max_frame_depth: 1,
current_depth: 1,
},
MobError::FrameAtomicPersistenceUnavailable {
operation: FrameAtomicOperation::CasGrantNodeSlot,
},
MobError::SpecRevisionConflict {
mob_id: MobId::from("mob"),
expected: Some(2),
actual: 3,
},
MobError::SchemaValidation {
step_id: StepId::from("step"),
message: "invalid".to_string(),
},
MobError::InsufficientTargets {
step_id: StepId::from("step"),
required: 2,
available: 1,
},
MobError::TopologyViolation {
from_role: ProfileName::from("lead"),
to_role: ProfileName::from("worker"),
},
MobError::SupervisorEscalation("boom".to_string()),
MobError::UnsupportedForMode {
mode: crate::MobRuntimeMode::TurnDriven,
reason: "autonomous host runtime required".to_string(),
},
MobError::ResetBarrier,
MobError::StorageError(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"e",
))),
MobError::SessionError(meerkat_core::service::SessionError::PersistenceDisabled),
MobError::CommsError(meerkat_core::comms::SendError::PeerOffline),
MobError::StaleFenceToken {
runtime_id: crate::ids::AgentRuntimeId::initial(crate::ids::AgentIdentity::from(
"m",
)),
expected: FenceToken::new(1),
actual: FenceToken::new(0),
},
MobError::WorkNotFound(WorkRef::new()),
MobError::Internal("i".to_string()),
];
}
}