1use std::fmt::{Display, Formatter};
4
5use serde::{Deserialize, Serialize};
6
7use crate::mob_handle_runtime::MobRuntimeError;
8use crate::runtime::{
9 NormalizationError, RuntimeRouteMutationError, RuntimeShutdownReport, ScheduleValidationError,
10 SubscribeError,
11};
12
13use super::edge_types::{DesiredPeerEdge, EdgeReconcileFailure};
14
/// Outcome of one peer-edge reconcile pass, bucketed by what happened to each edge.
///
/// The report is serializable, and `Default` yields a report with every bucket empty.
///
/// NOTE(review): the per-bucket meanings below are inferred from the field names; the
/// code that fills this report is not in this file — confirm against it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct UnifiedRuntimeReconcileEdgesReport {
    /// Presumably the full set of edges the pass was asked to converge on.
    pub desired_edges: Vec<DesiredPeerEdge>,
    /// Presumably edges newly wired by this pass.
    pub wired_edges: Vec<DesiredPeerEdge>,
    /// Presumably edges removed by this pass.
    pub unwired_edges: Vec<DesiredPeerEdge>,
    /// Presumably edges kept as they were.
    pub retained_edges: Vec<DesiredPeerEdge>,
    /// Presumably edges that already existed before the pass started.
    pub preexisting_edges: Vec<DesiredPeerEdge>,
    /// Edges skipped because a member was missing (per the name). A non-empty list
    /// makes `is_complete` return `false`.
    pub skipped_missing_members: Vec<DesiredPeerEdge>,
    /// Presumably stale managed edges that were pruned.
    pub pruned_stale_managed_edges: Vec<DesiredPeerEdge>,
    /// Per-edge failures. A payload without this key deserializes to an empty list
    /// (`serde(default)`). A non-empty list makes `is_complete` return `false`.
    #[serde(default)]
    pub failures: Vec<EdgeReconcileFailure>,
}
31
32impl UnifiedRuntimeReconcileEdgesReport {
33 pub fn is_complete(&self) -> bool {
35 self.failures.is_empty() && self.skipped_missing_members.is_empty()
36 }
37}
38
/// Errors that can abort bootstrap of the unified runtime.
///
/// The `Display` text of each variant embeds its cause.
#[derive(Debug)]
pub enum UnifiedRuntimeBootstrapError {
    /// The mob runtime failed to bootstrap.
    Mob(MobRuntimeError),
    /// The module runtime failed to bootstrap.
    Module(crate::runtime::MobkitRuntimeError),
    /// The module startup thread panicked.
    ModuleStartupThreadPanicked,
    /// Startup failed and the rollback failed as well; both errors are kept.
    ModuleStartupRollbackFailed {
        /// The startup error.
        startup_error: Box<UnifiedRuntimeBootstrapError>,
        /// The rollback error.
        rollback_error: MobRuntimeError,
    },
    /// A pre-spawn hook failed; the payload is the failure text.
    PreSpawnHook(String),
    /// Identity-first bootstrap failed; the payload is the failure text.
    IdentityFirst(String),
}
51
52impl Display for UnifiedRuntimeBootstrapError {
53 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
54 match self {
55 Self::Mob(err) => write!(f, "failed to bootstrap mob runtime: {err}"),
56 Self::Module(err) => write!(f, "failed to bootstrap module runtime: {err:?}"),
57 Self::ModuleStartupThreadPanicked => {
58 write!(
59 f,
60 "failed to bootstrap module runtime: startup thread panicked"
61 )
62 }
63 Self::PreSpawnHook(err) => {
64 write!(f, "pre-spawn hook failed: {err}")
65 }
66 Self::IdentityFirst(err) => {
67 write!(f, "identity-first bootstrap failed: {err}")
68 }
69 Self::ModuleStartupRollbackFailed {
70 startup_error,
71 rollback_error,
72 } => {
73 write!(
74 f,
75 "failed to bootstrap unified runtime: startup error ({startup_error}) and rollback failed: {rollback_error}"
76 )
77 }
78 }
79 }
80}
81
// Empty impl: the default `Error` methods are used, so no `source()` is reported.
// Causes are embedded in the `Display` text instead.
impl std::error::Error for UnifiedRuntimeBootstrapError {}
83
/// Names a builder input; carried by `UnifiedRuntimeBuilderError::MissingRequiredField`
/// to say which required input was not supplied.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnifiedRuntimeBuilderField {
    /// The mob spec; the error text names it `mob_spec or definition`.
    MobSpec,
    /// The module configuration; the error text names it `module_config`.
    ModuleConfig,
    /// The timeout; the error text names it `timeout`.
    Timeout,
}
90
/// Errors reported when building the unified runtime.
#[derive(Debug)]
pub enum UnifiedRuntimeBuilderError {
    /// A required builder field was not provided.
    MissingRequiredField(UnifiedRuntimeBuilderField),
    /// Bootstrap failed; `Display` forwards the inner error's text unchanged.
    Bootstrap(UnifiedRuntimeBootstrapError),
    /// An I/O problem, carried as text and displayed verbatim.
    Io(String),
    /// A definition could not be loaded (per the name); displayed verbatim.
    DefinitionLoad(String),
    /// Conflicting configuration; displayed with a `conflicting configuration: ` prefix.
    ConflictingConfiguration(String),
}
102
103impl Display for UnifiedRuntimeBuilderError {
104 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
105 match self {
106 Self::MissingRequiredField(UnifiedRuntimeBuilderField::MobSpec) => {
107 write!(f, "missing required builder field: mob_spec or definition")
108 }
109 Self::MissingRequiredField(UnifiedRuntimeBuilderField::ModuleConfig) => {
110 write!(f, "missing required builder field: module_config")
111 }
112 Self::MissingRequiredField(UnifiedRuntimeBuilderField::Timeout) => {
113 write!(f, "missing required builder field: timeout")
114 }
115 Self::Bootstrap(err) => write!(f, "{err}"),
116 Self::Io(msg) => write!(f, "{msg}"),
117 Self::DefinitionLoad(msg) => write!(f, "{msg}"),
118 Self::ConflictingConfiguration(msg) => write!(f, "conflicting configuration: {msg}"),
119 }
120 }
121}
122
// Empty impl: the default `Error` methods are used, so no `source()` is reported.
impl std::error::Error for UnifiedRuntimeBuilderError {}
124
/// Errors reported by unified runtime operations.
///
/// `From` impls exist for each payload type, so `?` converts them automatically.
#[derive(Debug)]
pub enum UnifiedRuntimeError {
    /// A unified event could not be normalized.
    Normalize(NormalizationError),
    /// Subscribing to unified events failed.
    Subscribe(SubscribeError),
    /// Schedule validation failed; reported as a schedule tick dispatch failure.
    ScheduleValidation(ScheduleValidationError),
    /// A schedule tick could not be dispatched because the runtime is shutting down.
    RuntimeShuttingDown,
    /// A schedule tick could not be dispatched because the dispatch thread panicked.
    ScheduleDispatchThreadPanicked,
}
133
134impl Display for UnifiedRuntimeError {
135 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
136 match self {
137 Self::Normalize(err) => write!(f, "failed to normalize unified event: {err:?}"),
138 Self::Subscribe(err) => write!(f, "failed to subscribe to unified events: {err:?}"),
139 Self::ScheduleValidation(err) => {
140 write!(f, "failed to dispatch schedule tick: {err:?}")
141 }
142 Self::RuntimeShuttingDown => {
143 write!(
144 f,
145 "failed to dispatch schedule tick: unified runtime is shutting down"
146 )
147 }
148 Self::ScheduleDispatchThreadPanicked => {
149 write!(
150 f,
151 "failed to dispatch schedule tick: dispatch thread panicked"
152 )
153 }
154 }
155 }
156}
157
// Empty impl: the default `Error` methods are used, so no `source()` is reported.
// Payloads are embedded (in `Debug` form) in the `Display` text instead.
impl std::error::Error for UnifiedRuntimeError {}
159
160impl From<NormalizationError> for UnifiedRuntimeError {
161 fn from(value: NormalizationError) -> Self {
162 Self::Normalize(value)
163 }
164}
165
166impl From<SubscribeError> for UnifiedRuntimeError {
167 fn from(value: SubscribeError) -> Self {
168 Self::Subscribe(value)
169 }
170}
171
172impl From<ScheduleValidationError> for UnifiedRuntimeError {
173 fn from(value: ScheduleValidationError) -> Self {
174 Self::ScheduleValidation(value)
175 }
176}
177
/// Collected results of shutting down the unified runtime.
///
/// NOTE(review): which shutdown step fills which field is inferred from the field
/// names and types — confirm against the shutdown code.
#[derive(Debug)]
pub struct UnifiedRuntimeShutdownReport {
    /// Drain statistics (see [`ShutdownDrainReport`]).
    pub drain: ShutdownDrainReport,
    /// Report from the module runtime's shutdown (per the type name).
    pub module_shutdown: RuntimeShutdownReport,
    /// Result of stopping the mob (per the name): `Ok(())`, or the `MobRuntimeError`
    /// that occurred.
    pub mob_stop: Result<(), MobRuntimeError>,
}
184
/// Result of a full runtime run: the serve outcome plus the shutdown report.
#[derive(Debug)]
pub struct UnifiedRuntimeRunReport {
    /// I/O result of the serve step (per the name).
    pub serve_result: std::io::Result<()>,
    /// Shutdown results.
    pub shutdown: UnifiedRuntimeShutdownReport,
}
190
/// Serializable report of a rediscover operation.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RediscoverReport {
    /// Presumably identifiers of the members that were spawned — TODO confirm what
    /// kind of identifier these strings hold.
    pub spawned: Vec<String>,
    /// Edge reconcile report accompanying the rediscover.
    pub edges: UnifiedRuntimeReconcileEdgesReport,
}
199
/// Routing portion of a reconcile report.
///
/// NOTE(review): field meanings are inferred from names only — confirm against the
/// routing reconcile code.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnifiedRuntimeReconcileRoutingReport {
    /// Whether a router module was loaded.
    pub router_module_loaded: bool,
    /// Presumably the members considered active for routing.
    pub active_members: Vec<String>,
    /// Route keys added.
    pub added_route_keys: Vec<String>,
    /// Route keys removed.
    pub removed_route_keys: Vec<String>,
}
207
/// Re-export of `meerkat_contracts::MobReconcileFailureWire` under the local name
/// `MobReconcileFailure`.
pub use meerkat_contracts::MobReconcileFailureWire as MobReconcileFailure;
212
/// Re-export of `meerkat_contracts::MobReconcileReportWire` under the local name
/// `MobReconcileReport`.
pub use meerkat_contracts::MobReconcileReportWire as MobReconcileReport;
218
219pub fn meerkat_reconcile_report_to_wire(
225 mob_id: &str,
226 report: meerkat_mob::runtime::reconcile::ReconcileReport,
227) -> MobReconcileReport {
228 use meerkat_contracts::{MobSpawnReceiptWire, WireMemberRef};
229 MobReconcileReport {
230 desired: report
231 .desired
232 .into_iter()
233 .map(|id| id.to_string())
234 .collect(),
235 retained: report
236 .retained
237 .into_iter()
238 .map(|id| id.to_string())
239 .collect(),
240 spawned: report
241 .spawned
242 .into_iter()
243 .map(|receipt| {
244 let identity_str = receipt.agent_identity.to_string();
245 MobSpawnReceiptWire {
246 member_ref: WireMemberRef::encode(mob_id, &identity_str),
247 agent_identity: identity_str,
248 }
249 })
250 .collect(),
251 retired: report
252 .retired
253 .into_iter()
254 .map(|id| id.to_string())
255 .collect(),
256 failures: report
257 .failures
258 .into_iter()
259 .map(|failure| MobReconcileFailure {
260 agent_identity: failure.agent_identity.to_string(),
261 stage: match failure.stage {
262 meerkat_mob::runtime::reconcile::ReconcileStage::Spawn => {
263 meerkat_contracts::WireMobReconcileStage::Spawn
264 }
265 meerkat_mob::runtime::reconcile::ReconcileStage::Retire => {
266 meerkat_contracts::WireMobReconcileStage::Retire
267 }
268 },
269 error: failure.error.to_string(),
270 })
271 .collect(),
272 }
273}
274
/// Combined reconcile report: mob roster, peer edges and routing.
#[derive(Debug, Clone, PartialEq)]
pub struct UnifiedRuntimeReconcileReport {
    /// Mob roster reconcile report (wire form).
    pub mob: MobReconcileReport,
    /// Peer-edge reconcile report.
    pub edges: UnifiedRuntimeReconcileEdgesReport,
    /// Routing reconcile report.
    pub routing: UnifiedRuntimeReconcileRoutingReport,
}
283
/// Errors reported by a unified reconcile.
#[derive(Debug)]
pub enum UnifiedRuntimeReconcileError {
    /// Reconciling the mob roster failed.
    Mob(MobRuntimeError),
    /// Reconciling the routing wiring failed.
    RouteMutation(RuntimeRouteMutationError),
    /// Reconcile completed, but with per-identity failures. The full report is
    /// carried (boxed); the `Display` text lists only `report.mob.failures`.
    PartialFailure(Box<UnifiedRuntimeReconcileReport>),
}
295
296impl Display for UnifiedRuntimeReconcileError {
297 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
298 match self {
299 Self::Mob(err) => write!(f, "failed to reconcile mob roster: {err}"),
300 Self::RouteMutation(err) => {
301 write!(f, "failed to reconcile routing wiring: {err:?}")
302 }
303 Self::PartialFailure(report) => {
304 write!(
305 f,
306 "reconcile completed with {} per-identity failure(s): {:?}",
307 report.mob.failures.len(),
308 report.mob.failures
309 )
310 }
311 }
312 }
313}
314
// Empty impl: the default `Error` methods are used, so no `source()` is reported.
impl std::error::Error for UnifiedRuntimeReconcileError {}
316
/// Statistics about the drain step of shutdown.
///
/// NOTE(review): what exactly is counted and timed is not visible in this file —
/// confirm against the drain implementation.
#[derive(Debug)]
pub struct ShutdownDrainReport {
    /// Number of items drained.
    pub drained_count: usize,
    /// Whether the drain timed out (per the name).
    pub timed_out: bool,
    /// Drain duration in milliseconds (per the name).
    pub drain_duration_ms: u64,
}
323
/// Serializable error event, one variant per error category.
///
/// Serialized in serde's internally tagged form: the variant name, in `snake_case`,
/// is stored under the `category` key next to the variant's fields. The enum is
/// `#[non_exhaustive]`, so code in other crates must include a wildcard arm when
/// matching on it.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "category", rename_all = "snake_case")]
pub enum ErrorEvent {
    /// A member failed to spawn.
    SpawnFailure {
        /// Identifier of the member.
        member_id: String,
        /// Profile associated with the spawn; not included in the `Display` text.
        profile: String,
        /// Error text.
        error: String,
    },
    /// A reconcile finished incomplete.
    ReconcileIncomplete {
        /// Number of failures.
        failures: usize,
        /// Number of skipped items.
        skipped: usize,
    },
    /// A checkpoint failed.
    CheckpointFailure {
        /// Identifier of the session.
        session_id: String,
        /// Error text.
        error: String,
    },
    /// A host loop crashed.
    HostLoopCrash {
        /// Identifier of the member.
        member_id: String,
        /// Error text.
        error: String,
    },
    /// A rediscover failed.
    RediscoverFailure {
        /// Error text.
        error: String,
    },
    /// Flushing the event log failed.
    EventLogFlushFailure {
        /// Error text.
        error: String,
    },
}
366
367impl Display for ErrorEvent {
368 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
369 match self {
370 Self::SpawnFailure {
371 member_id, error, ..
372 } => {
373 write!(f, "spawn_failure: {member_id}: {error}")
374 }
375 Self::ReconcileIncomplete { failures, skipped } => {
376 write!(
377 f,
378 "reconcile_incomplete: {failures} failures, {skipped} skipped"
379 )
380 }
381 Self::CheckpointFailure { session_id, error } => {
382 write!(f, "checkpoint_failure: {session_id}: {error}")
383 }
384 Self::HostLoopCrash { member_id, error } => {
385 write!(f, "host_loop_crash: {member_id}: {error}")
386 }
387 Self::RediscoverFailure { error } => {
388 write!(f, "rediscover_failure: {error}")
389 }
390 Self::EventLogFlushFailure { error } => {
391 write!(f, "event_log_flush_failure: {error}")
392 }
393 }
394 }
395}