// meerkat_mobkit/unified_runtime/types.rs

1//! Error types, hook definitions, and report structures for the unified runtime.
2
3use std::fmt::{Display, Formatter};
4
5use serde::{Deserialize, Serialize};
6
7use crate::mob_handle_runtime::MobRuntimeError;
8use crate::runtime::{
9    NormalizationError, RuntimeRouteMutationError, RuntimeShutdownReport, ScheduleValidationError,
10    SubscribeError,
11};
12
13use super::edge_types::{DesiredPeerEdge, EdgeReconcileFailure};
14
15/// Report from dynamic edge reconciliation.
16///
17/// Best-effort: partial success is reported clearly. Apps decide whether
18/// to treat failures as fatal.
19#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
20pub struct UnifiedRuntimeReconcileEdgesReport {
21    pub desired_edges: Vec<DesiredPeerEdge>,
22    pub wired_edges: Vec<DesiredPeerEdge>,
23    pub unwired_edges: Vec<DesiredPeerEdge>,
24    pub retained_edges: Vec<DesiredPeerEdge>,
25    pub preexisting_edges: Vec<DesiredPeerEdge>,
26    pub skipped_missing_members: Vec<DesiredPeerEdge>,
27    pub pruned_stale_managed_edges: Vec<DesiredPeerEdge>,
28    #[serde(default)]
29    pub failures: Vec<EdgeReconcileFailure>,
30}
31
32impl UnifiedRuntimeReconcileEdgesReport {
33    /// True if all desired edges were successfully applied or retained.
34    pub fn is_complete(&self) -> bool {
35        self.failures.is_empty() && self.skipped_missing_members.is_empty()
36    }
37}
38
39#[derive(Debug)]
40pub enum UnifiedRuntimeBootstrapError {
41    Mob(MobRuntimeError),
42    Module(crate::runtime::MobkitRuntimeError),
43    ModuleStartupThreadPanicked,
44    ModuleStartupRollbackFailed {
45        startup_error: Box<UnifiedRuntimeBootstrapError>,
46        rollback_error: MobRuntimeError,
47    },
48    PreSpawnHook(String),
49    IdentityFirst(String),
50}
51
52impl Display for UnifiedRuntimeBootstrapError {
53    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
54        match self {
55            Self::Mob(err) => write!(f, "failed to bootstrap mob runtime: {err}"),
56            Self::Module(err) => write!(f, "failed to bootstrap module runtime: {err:?}"),
57            Self::ModuleStartupThreadPanicked => {
58                write!(
59                    f,
60                    "failed to bootstrap module runtime: startup thread panicked"
61                )
62            }
63            Self::PreSpawnHook(err) => {
64                write!(f, "pre-spawn hook failed: {err}")
65            }
66            Self::IdentityFirst(err) => {
67                write!(f, "identity-first bootstrap failed: {err}")
68            }
69            Self::ModuleStartupRollbackFailed {
70                startup_error,
71                rollback_error,
72            } => {
73                write!(
74                    f,
75                    "failed to bootstrap unified runtime: startup error ({startup_error}) and rollback failed: {rollback_error}"
76                )
77            }
78        }
79    }
80}
81
82impl std::error::Error for UnifiedRuntimeBootstrapError {}
83
/// Names a builder field that must be supplied before the runtime can be built.
///
/// Carried by `UnifiedRuntimeBuilderError::MissingRequiredField` to say which
/// field was absent.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum UnifiedRuntimeBuilderField {
    /// Reported to users as "mob_spec or definition".
    MobSpec,
    /// Reported to users as "module_config".
    ModuleConfig,
    /// Reported to users as "timeout".
    Timeout,
}
90
91#[derive(Debug)]
92pub enum UnifiedRuntimeBuilderError {
93    MissingRequiredField(UnifiedRuntimeBuilderField),
94    Bootstrap(UnifiedRuntimeBootstrapError),
95    /// Failed to read a definition TOML file or create a state directory.
96    Io(String),
97    /// Failed to parse a mob definition TOML.
98    DefinitionLoad(String),
99    /// Conflicting builder configuration (e.g., persistent_state + continuity_store).
100    ConflictingConfiguration(String),
101}
102
103impl Display for UnifiedRuntimeBuilderError {
104    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
105        match self {
106            Self::MissingRequiredField(UnifiedRuntimeBuilderField::MobSpec) => {
107                write!(f, "missing required builder field: mob_spec or definition")
108            }
109            Self::MissingRequiredField(UnifiedRuntimeBuilderField::ModuleConfig) => {
110                write!(f, "missing required builder field: module_config")
111            }
112            Self::MissingRequiredField(UnifiedRuntimeBuilderField::Timeout) => {
113                write!(f, "missing required builder field: timeout")
114            }
115            Self::Bootstrap(err) => write!(f, "{err}"),
116            Self::Io(msg) => write!(f, "{msg}"),
117            Self::DefinitionLoad(msg) => write!(f, "{msg}"),
118            Self::ConflictingConfiguration(msg) => write!(f, "conflicting configuration: {msg}"),
119        }
120    }
121}
122
123impl std::error::Error for UnifiedRuntimeBuilderError {}
124
125#[derive(Debug)]
126pub enum UnifiedRuntimeError {
127    Normalize(NormalizationError),
128    Subscribe(SubscribeError),
129    ScheduleValidation(ScheduleValidationError),
130    RuntimeShuttingDown,
131    ScheduleDispatchThreadPanicked,
132}
133
134impl Display for UnifiedRuntimeError {
135    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
136        match self {
137            Self::Normalize(err) => write!(f, "failed to normalize unified event: {err:?}"),
138            Self::Subscribe(err) => write!(f, "failed to subscribe to unified events: {err:?}"),
139            Self::ScheduleValidation(err) => {
140                write!(f, "failed to dispatch schedule tick: {err:?}")
141            }
142            Self::RuntimeShuttingDown => {
143                write!(
144                    f,
145                    "failed to dispatch schedule tick: unified runtime is shutting down"
146                )
147            }
148            Self::ScheduleDispatchThreadPanicked => {
149                write!(
150                    f,
151                    "failed to dispatch schedule tick: dispatch thread panicked"
152                )
153            }
154        }
155    }
156}
157
158impl std::error::Error for UnifiedRuntimeError {}
159
160impl From<NormalizationError> for UnifiedRuntimeError {
161    fn from(value: NormalizationError) -> Self {
162        Self::Normalize(value)
163    }
164}
165
166impl From<SubscribeError> for UnifiedRuntimeError {
167    fn from(value: SubscribeError) -> Self {
168        Self::Subscribe(value)
169    }
170}
171
172impl From<ScheduleValidationError> for UnifiedRuntimeError {
173    fn from(value: ScheduleValidationError) -> Self {
174        Self::ScheduleValidation(value)
175    }
176}
177
178#[derive(Debug)]
179pub struct UnifiedRuntimeShutdownReport {
180    pub drain: ShutdownDrainReport,
181    pub module_shutdown: RuntimeShutdownReport,
182    pub mob_stop: Result<(), MobRuntimeError>,
183}
184
185#[derive(Debug)]
186pub struct UnifiedRuntimeRunReport {
187    pub serve_result: std::io::Result<()>,
188    pub shutdown: UnifiedRuntimeShutdownReport,
189}
190
191/// Report from a rediscover operation (reset + re-run discovery + reconcile edges).
192#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
193pub struct RediscoverReport {
194    /// Number of members spawned by discovery.
195    pub spawned: Vec<String>,
196    /// Edge reconciliation report (if EdgeDiscovery is configured).
197    pub edges: UnifiedRuntimeReconcileEdgesReport,
198}
199
/// Routing half of a reconcile pass.
// NOTE(review): field meanings below follow the field names; the code that
// fills them is outside this file — confirm there.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UnifiedRuntimeReconcileRoutingReport {
    /// Whether a router module was loaded.
    pub router_module_loaded: bool,
    /// Active members, one string per member.
    pub active_members: Vec<String>,
    /// Route keys added by the pass.
    pub added_route_keys: Vec<String>,
    /// Route keys removed by the pass.
    pub removed_route_keys: Vec<String>,
}
207
208/// Per-identity reconcile failure — re-export of the canonical
209/// meerkat-contracts wire shape so SDK consumers see the same field
210/// names whether they go through `mob/reconcile` or `mobkit/reconcile`.
211pub use meerkat_contracts::MobReconcileFailureWire as MobReconcileFailure;
212
213/// Roster half of a reconcile pass — re-export of meerkat-contracts'
214/// canonical wire shape. `spawned: Vec<MobSpawnReceiptWire>` carries the
215/// server-resolved `WireMemberRef` per receipt, replacing the
216/// identity-string list mobkit projected before 0.6.
217pub use meerkat_contracts::MobReconcileReportWire as MobReconcileReport;
218
219/// Project meerkat's native `ReconcileReport` into the canonical wire shape.
220///
221/// Mirrors the `mob/reconcile` RPC handler's projection in
222/// `meerkat-rpc/src/handlers/mob.rs` so both surfaces emit byte-identical
223/// JSON for the same reconcile outcome.
224pub fn meerkat_reconcile_report_to_wire(
225    mob_id: &str,
226    report: meerkat_mob::runtime::reconcile::ReconcileReport,
227) -> MobReconcileReport {
228    use meerkat_contracts::{MobSpawnReceiptWire, WireMemberRef};
229    MobReconcileReport {
230        desired: report
231            .desired
232            .into_iter()
233            .map(|id| id.to_string())
234            .collect(),
235        retained: report
236            .retained
237            .into_iter()
238            .map(|id| id.to_string())
239            .collect(),
240        spawned: report
241            .spawned
242            .into_iter()
243            .map(|receipt| {
244                let identity_str = receipt.agent_identity.to_string();
245                MobSpawnReceiptWire {
246                    member_ref: WireMemberRef::encode(mob_id, &identity_str),
247                    agent_identity: identity_str,
248                }
249            })
250            .collect(),
251        retired: report
252            .retired
253            .into_iter()
254            .map(|id| id.to_string())
255            .collect(),
256        failures: report
257            .failures
258            .into_iter()
259            .map(|failure| MobReconcileFailure {
260                agent_identity: failure.agent_identity.to_string(),
261                stage: match failure.stage {
262                    meerkat_mob::runtime::reconcile::ReconcileStage::Spawn => {
263                        meerkat_contracts::WireMobReconcileStage::Spawn
264                    }
265                    meerkat_mob::runtime::reconcile::ReconcileStage::Retire => {
266                        meerkat_contracts::WireMobReconcileStage::Retire
267                    }
268                },
269                error: failure.error.to_string(),
270            })
271            .collect(),
272    }
273}
274
275// Eq is dropped because the canonical wire `MobReconcileReportWire` does
276// not implement `Eq` (its nested types are PartialEq only).
277#[derive(Debug, Clone, PartialEq)]
278pub struct UnifiedRuntimeReconcileReport {
279    pub mob: MobReconcileReport,
280    pub edges: UnifiedRuntimeReconcileEdgesReport,
281    pub routing: UnifiedRuntimeReconcileRoutingReport,
282}
283
284#[derive(Debug)]
285pub enum UnifiedRuntimeReconcileError {
286    Mob(MobRuntimeError),
287    RouteMutation(RuntimeRouteMutationError),
288    /// Meerkat 0.6's `MobHandle::reconcile` collects per-identity failures
289    /// into the returned report rather than returning `Err` on first failure.
290    /// `UnifiedRuntime::reconcile` re-lifts that into an error variant so
291    /// Rust callers using `?` still see failure propagation, while keeping
292    /// the full report available for inspection.
293    PartialFailure(Box<UnifiedRuntimeReconcileReport>),
294}
295
296impl Display for UnifiedRuntimeReconcileError {
297    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
298        match self {
299            Self::Mob(err) => write!(f, "failed to reconcile mob roster: {err}"),
300            Self::RouteMutation(err) => {
301                write!(f, "failed to reconcile routing wiring: {err:?}")
302            }
303            Self::PartialFailure(report) => {
304                write!(
305                    f,
306                    "reconcile completed with {} per-identity failure(s): {:?}",
307                    report.mob.failures.len(),
308                    report.mob.failures
309                )
310            }
311        }
312    }
313}
314
315impl std::error::Error for UnifiedRuntimeReconcileError {}
316
/// Statistics about the drain step of shutdown.
// NOTE(review): what exactly is counted as "drained" is decided by the
// shutdown code outside this file — confirm there.
#[derive(Debug)]
pub struct ShutdownDrainReport {
    /// How many items were drained.
    pub drained_count: usize,
    /// Whether the drain timed out.
    pub timed_out: bool,
    /// How long the drain took, in milliseconds (per the `_ms` suffix).
    pub drain_duration_ms: u64,
}
323
324/// Operational error event for alerting.
325///
326/// Fired via the `on_error` hook when runtime operations fail. Apps
327/// match on variants to decide alerting (Slack, PagerDuty, log, etc.).
328///
329/// Marked `#[non_exhaustive]` — new variants can be added without
330/// breaking downstream match arms (use a `_` wildcard).
331///
332/// **Wired fire points:**
333/// - `SpawnFailure` — `mob_ops.rs` spawn error path
334/// - `ReconcileIncomplete` — `edge_reconcile.rs` after `reconcile_edges`
335/// - `RediscoverFailure` — `lifecycle.rs` rediscover error path
336/// - `HostLoopCrash` — `lifecycle.rs` detects `run_failed` agent events during drain
337/// - `CheckpointFailure` — via `run_periodic_gc_with_error_callback` in session store
338#[non_exhaustive]
339#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
340#[serde(tag = "category", rename_all = "snake_case")]
341pub enum ErrorEvent {
342    SpawnFailure {
343        member_id: String,
344        profile: String,
345        error: String,
346    },
347    ReconcileIncomplete {
348        failures: usize,
349        skipped: usize,
350    },
351    CheckpointFailure {
352        session_id: String,
353        error: String,
354    },
355    HostLoopCrash {
356        member_id: String,
357        error: String,
358    },
359    RediscoverFailure {
360        error: String,
361    },
362    EventLogFlushFailure {
363        error: String,
364    },
365}
366
367impl Display for ErrorEvent {
368    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
369        match self {
370            Self::SpawnFailure {
371                member_id, error, ..
372            } => {
373                write!(f, "spawn_failure: {member_id}: {error}")
374            }
375            Self::ReconcileIncomplete { failures, skipped } => {
376                write!(
377                    f,
378                    "reconcile_incomplete: {failures} failures, {skipped} skipped"
379                )
380            }
381            Self::CheckpointFailure { session_id, error } => {
382                write!(f, "checkpoint_failure: {session_id}: {error}")
383            }
384            Self::HostLoopCrash { member_id, error } => {
385                write!(f, "host_loop_crash: {member_id}: {error}")
386            }
387            Self::RediscoverFailure { error } => {
388                write!(f, "rediscover_failure: {error}")
389            }
390            Self::EventLogFlushFailure { error } => {
391                write!(f, "event_log_flush_failure: {error}")
392            }
393        }
394    }
395}