pub mod bindings;
mod daemon;
pub mod daemon_factory;
pub mod fork_group;
pub mod group_coord;
mod host;
mod migration;
pub mod migration_source;
pub mod migration_target;
pub mod orchestrator;
mod registry;
pub mod replica_group;
mod scheduler;
pub mod standby_group;
pub use bindings::{DaemonBindings, SubscriptionBinding};
pub use daemon::{
DaemonControl, DaemonError, DaemonHealth, DaemonHostConfig, DaemonLifecycleEvent,
DaemonLifecycleObserver, DaemonStats, MeshDaemon,
};
pub use daemon_factory::{DaemonFactoryRegistry, FactoryEntry};
pub use fork_group::{ForkGroup, ForkGroupConfig, ForkInfo};
pub use group_coord::{GroupCoordinator, GroupError, GroupHealth, MemberInfo};
pub use host::DaemonHost;
pub use migration::{
MigrationError, MigrationFailureReason, MigrationPhase, MigrationState, SUBPROTOCOL_MIGRATION,
};
pub use migration_source::MigrationSourceHandler;
pub use migration_target::MigrationTargetHandler;
pub use orchestrator::{
chunk_snapshot, MigrationMessage, MigrationOrchestrator, SnapshotReassembler,
MAX_SNAPSHOT_CHUNK_SIZE, MAX_SNAPSHOT_SIZE,
};
pub use registry::DaemonRegistry;
pub use replica_group::{ReplicaGroup, ReplicaGroupConfig, SUBPROTOCOL_REPLICA_GROUP};
pub use scheduler::{PlacementDecision, PlacementReason, Scheduler, SchedulerError};
pub use standby_group::{MemberRole, StandbyGroup, StandbyGroupConfig};
/// Boxed callback stored in a [`RecoveryRegistry`].
///
/// The callback takes no arguments, may mutate its captured state (`FnMut`),
/// must be `Send`, and returns a `Vec<u8>` on every invocation.
/// [`RecoveryRegistry::try_run_all`] concatenates the vectors returned by all
/// handlers into its own return value.
// NOTE(review): the bytes presumably identify recovered slots (the tests call
// them "recovered slots") — confirm against the callers.
pub type RecoveryHandler = Box<dyn FnMut() -> Vec<u8> + Send>;
/// A cloneable collection of [`RecoveryHandler`] callbacks.
///
/// The handler list is held behind an `Arc<Mutex<..>>`, so cloning the
/// registry clones the `Arc` only: every clone observes and mutates the same
/// underlying list. `Default` yields an empty registry.
#[derive(Clone, Default)]
pub struct RecoveryRegistry {
// Handler list shared by all clones; guarded by a `parking_lot::Mutex`.
handlers: std::sync::Arc<parking_lot::Mutex<Vec<RecoveryHandler>>>,
}
impl RecoveryRegistry {
    /// Creates an empty registry (equivalent to `Self::default()`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `handler` to the end of the handler list.
    ///
    /// Returns the length of the list as observed immediately after the push,
    /// while the lock is still held.
    pub fn register(&self, handler: RecoveryHandler) -> usize {
        let mut list = self.handlers.lock();
        list.push(handler);
        list.len()
    }

    /// Number of handlers currently stored in the list.
    ///
    /// While [`try_run_all`](Self::try_run_all) is executing, the handlers it
    /// is running have been moved out of the list and are not counted.
    pub fn len(&self) -> usize {
        let list = self.handlers.lock();
        list.len()
    }

    /// `true` when the handler list currently holds no handlers.
    pub fn is_empty(&self) -> bool {
        let list = self.handlers.lock();
        list.is_empty()
    }

    /// Runs every registered handler once and returns their combined output.
    ///
    /// The handlers are moved out of the list under the lock, and the lock is
    /// released before any handler is invoked, so a handler (or another
    /// thread) may call [`register`](Self::register) while this is running.
    ///
    /// Each handler runs inside `std::panic::catch_unwind`:
    /// * a handler that returns has its output appended to the result and is
    ///   kept for future runs;
    /// * a handler that panics contributes nothing, a warning is logged, and
    ///   it is dropped from the registry.
    ///
    /// Afterwards the kept handlers are written back ahead of any handlers
    /// that were registered in the meantime.
    pub fn try_run_all(&self) -> Vec<u8> {
        // The guard is a temporary of this statement, so the lock is released
        // as soon as the handlers have been moved out.
        let pending = std::mem::take(&mut *self.handlers.lock());

        let mut kept: Vec<RecoveryHandler> = Vec::with_capacity(pending.len());
        let mut recovered = Vec::new();

        for mut handler in pending {
            let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(&mut handler));
            if let Ok(output) = outcome {
                recovered.extend(output);
                kept.push(handler);
            } else {
                // `handler` is not pushed back, which evicts it.
                tracing::warn!(
                    target: "meshos",
                    "RecoveryRegistry: handler panicked; evicting"
                );
            }
        }

        // Re-lock and merge: kept handlers first, then whatever was registered
        // while the lock was released (possibly nothing).
        let mut list = self.handlers.lock();
        kept.append(&mut *list);
        *list = kept;

        recovered
    }
}
/// Debug output shows only how many handlers are stored, since the boxed
/// closures themselves are not `Debug`.
impl std::fmt::Debug for RecoveryRegistry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Snapshot the count first; the field named `handlers` prints this number.
        let handler_count = self.handlers.lock().len();
        let mut builder = f.debug_struct("RecoveryRegistry");
        builder.field("handlers", &handler_count);
        builder.finish()
    }
}
/// Interface for a component that can report unhealthy slots and attempt to
/// recover them.
///
/// Implementors must be `Send + Sync`.
// NOTE(review): nothing in this file implements or calls this trait; the
// intended call protocol (e.g. checking `has_unhealthy_slots` before
// `try_recover`) is an assumption to confirm against the implementors.
pub trait UnhealthySlotRecovery: Send + Sync {
/// Reports whether there is at least one unhealthy slot.
fn has_unhealthy_slots(&self) -> bool;
/// Attempts recovery, with mutable access to the implementor's own state.
///
/// * `scheduler` — the [`Scheduler`] made available to the recovery attempt.
/// * `registry` — the [`DaemonRegistry`] made available to the recovery attempt.
/// * `daemon_factory` — produces a fresh boxed [`MeshDaemon`] on each call.
///
/// Returns a `Vec<u8>`.
// NOTE(review): presumably the identifiers of the slots that were recovered,
// and empty when nothing was recovered — confirm with implementors.
fn try_recover(
&mut self,
scheduler: &Scheduler,
registry: &DaemonRegistry,
daemon_factory: &dyn Fn() -> Box<dyn MeshDaemon>,
) -> Vec<u8>;
}
// Unit tests for `RecoveryRegistry`: panic eviction, registration from inside
// a running handler, and registration from another thread during a run.
#[cfg(test)]
mod recovery_registry_tests {
use super::*;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
// A handler that returns keeps running on later calls; a handler that panics
// is invoked once, contributes no output, and is removed from the registry.
#[test]
fn try_run_all_collects_recovered_slots_and_drops_panicking_handlers() {
let reg = RecoveryRegistry::new();
assert!(reg.is_empty());
// Handler A: counts its invocations and always returns [0, 1].
let calls_a = Arc::new(AtomicU32::new(0));
let counter_a = calls_a.clone();
reg.register(Box::new(move || {
counter_a.fetch_add(1, Ordering::Relaxed);
vec![0, 1]
}));
// Handler B: counts its invocations, then panics.
let calls_b = Arc::new(AtomicU32::new(0));
let counter_b = calls_b.clone();
reg.register(Box::new(move || {
counter_b.fetch_add(1, Ordering::Relaxed);
panic!("simulated handler panic");
}));
assert_eq!(reg.len(), 2);
// First run: both handlers are invoked; only A's output is returned and
// only A remains registered.
let recovered = reg.try_run_all();
assert_eq!(recovered, vec![0, 1]);
assert_eq!(reg.len(), 1, "panicking handler must be evicted");
assert_eq!(calls_a.load(Ordering::Relaxed), 1);
assert_eq!(calls_b.load(Ordering::Relaxed), 1);
// Second run: A runs again, B's counter stays at 1 because it was evicted.
let recovered = reg.try_run_all();
assert_eq!(recovered, vec![0, 1]);
assert_eq!(reg.len(), 1);
assert_eq!(calls_a.load(Ordering::Relaxed), 2);
assert_eq!(calls_b.load(Ordering::Relaxed), 1);
}
// A handler may call `register` on a clone of the registry while it is being
// run by `try_run_all`; the new handler must end up in the registry.
#[test]
fn try_run_all_allows_handler_to_register_new_handler() {
let reg = RecoveryRegistry::new();
// Clone shares the same handler list, so the handler registers into `reg`.
let reg_for_handler = reg.clone();
let invoked = Arc::new(AtomicU32::new(0));
let invoked_for_handler = invoked.clone();
reg.register(Box::new(move || {
reg_for_handler.register(Box::new(|| vec![99]));
invoked_for_handler.fetch_add(1, Ordering::Relaxed);
vec![7]
}));
assert_eq!(reg.len(), 1);
// First run: only the original handler executes; the handler it registered
// is stored but not run in this pass.
let recovered = reg.try_run_all();
assert_eq!(recovered, vec![7]);
assert_eq!(invoked.load(Ordering::Relaxed), 1);
assert_eq!(
reg.len(),
2,
"concurrently-registered handler must land in the registry",
);
// Second run: the original handler and the handler registered during the
// first run both execute. Output is sorted so the check is order-agnostic.
let recovered = reg.try_run_all();
let mut got = recovered;
got.sort();
assert_eq!(got, vec![7, 99]);
}
// `register` called from another thread while a slow handler is executing
// must return promptly instead of waiting for the handler to finish.
#[test]
fn register_during_try_run_all_does_not_block_indefinitely() {
use std::sync::Barrier;
use std::thread;
use std::time::{Duration, Instant};
let reg = Arc::new(RecoveryRegistry::new());
// Two-party barrier: the handler and the test thread rendezvous once the
// handler has started executing.
let barrier_start = Arc::new(Barrier::new(2));
let in_handler = Arc::new(AtomicU32::new(0));
let in_handler_for_h = in_handler.clone();
let barrier_for_h = barrier_start.clone();
reg.register(Box::new(move || {
in_handler_for_h.fetch_add(1, Ordering::SeqCst);
barrier_for_h.wait();
// Keep the handler busy well past the 100 ms bound asserted below.
std::thread::sleep(Duration::from_millis(200));
vec![0]
}));
let reg_runner = reg.clone();
let runner = thread::spawn(move || reg_runner.try_run_all());
// Returns only once the handler is inside its body on the runner thread.
barrier_start.wait();
let reg_writer = reg.clone();
// Time a `register` call issued while the handler is still sleeping.
let started = Instant::now();
reg_writer.register(Box::new(|| vec![42]));
let elapsed = started.elapsed();
assert!(
elapsed < Duration::from_millis(100),
"register must not block on a long-running handler — \
elapsed = {elapsed:?}; pre-fix this would have been ~200 ms",
);
runner.join().unwrap();
// The slow handler ran exactly once.
assert_eq!(in_handler.load(Ordering::SeqCst), 1);
}
}