use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::Arc;
use thiserror::Error;
pub use ::net::adapter::net::compute::{
DaemonBindings, DaemonError as CoreDaemonError, DaemonHostConfig, DaemonStats, MeshDaemon,
MigrationError, MigrationFailureReason, MigrationPhase, PlacementDecision, SchedulerError,
SubscriptionBinding, SUBPROTOCOL_MIGRATION,
};
pub use ::net::adapter::net::state::causal::{CausalEvent, CausalLink};
pub use ::net::adapter::net::state::snapshot::StateSnapshot;
use ::net::adapter::net::channel::ChannelName;
use ::net::adapter::net::identity::PermissionToken;
use ::net::adapter::net::behavior::capability::CapabilitySet;
use ::net::adapter::net::compute::{
chunk_snapshot, orchestrator::wire as migration_wire, DaemonFactoryRegistry, DaemonHost,
DaemonRegistry, MigrationMessage, MigrationOrchestrator, MigrationSourceHandler,
MigrationTargetHandler, Scheduler,
};
use ::net::adapter::net::identity::EntityId;
use ::net::adapter::net::subprotocol::{
FailureCallback, MigrationHandlerHooks, MigrationSubprotocolHandler, PostRestoreCallback,
PreCleanupCallback, ReadinessCallback,
};
use crate::identity::Identity;
use crate::mesh::Mesh;
/// Shared, thread-safe closure that builds a fresh boxed [`MeshDaemon`] every time it is called.
type FactoryFn = Arc<dyn Fn() -> Box<dyn MeshDaemon> + Send + Sync>;
/// Errors surfaced by [`DaemonRuntime`], [`DaemonHandle`] and [`MigrationHandle`].
#[derive(Debug, Error)]
pub enum DaemonError {
/// The runtime is still in its registration phase (`start()` has not completed).
#[error("daemon runtime is not ready — call DaemonRuntime::start() first")]
NotReady,
/// The runtime has entered (or finished) shutdown and rejects the operation.
#[error("daemon runtime has been shut down")]
ShuttingDown,
/// `register_factory` was called twice for the same kind name.
#[error("factory for kind '{0}' is already registered")]
FactoryAlreadyRegistered(String),
/// No factory has been registered under the requested kind name.
#[error("no factory registered for kind '{0}'")]
FactoryNotFound(String),
/// A snapshot was offered for restore under an identity whose entity id differs
/// from the one stored in the snapshot. Both values are origin hashes.
#[error(
"snapshot/identity mismatch: snapshot origin {snapshot:#x} != identity origin {identity:#x}"
)]
SnapshotIdentityMismatch { snapshot: u64, identity: u64 },
/// An error bubbled up unchanged from the core daemon layer.
#[error(transparent)]
Core(#[from] CoreDaemonError),
/// The migration machinery returned an error while driving a migration.
#[error("migration failed: {0}")]
Migration(#[from] MigrationError),
/// A migration ended with a failure reason recorded by the failure callback
/// (or synthesised by the retry loop, e.g. a not-ready timeout).
#[error("migration failed: {0}")]
MigrationFailed(MigrationFailureReason),
}
/// Lifecycle phase of the runtime, stored in an `AtomicU8` as its discriminant.
#[repr(u8)]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum State {
    /// Factories may be registered; `start()` has not completed yet.
    Registering = 0,
    /// The runtime is started and accepts spawn / deliver / migration calls.
    Ready = 1,
    /// `shutdown()` has been requested.
    ShuttingDown = 2,
}

impl State {
    /// Decodes a raw discriminant back into a [`State`].
    ///
    /// # Panics
    /// Panics when `v` is not one of the three known discriminants; the byte is
    /// only ever written from a `State as u8` cast, so any other value means
    /// the atomic was corrupted.
    fn from_u8(v: u8) -> Self {
        const REGISTERING: u8 = State::Registering as u8;
        const READY: u8 = State::Ready as u8;
        const SHUTTING_DOWN: u8 = State::ShuttingDown as u8;

        match v {
            REGISTERING => Self::Registering,
            READY => Self::Ready,
            SHUTTING_DOWN => Self::ShuttingDown,
            other => panic!("daemon runtime: corrupt state byte {other}"),
        }
    }
}
/// Cheaply cloneable front-end to the daemon runtime; every clone shares the same
/// reference-counted `Inner` state.
#[derive(Clone)]
pub struct DaemonRuntime {
inner: Arc<Inner>,
}
/// State shared by every `DaemonRuntime` clone and by the handles it hands out.
struct Inner {
// Mesh the runtime is attached to (migration handler install, channel
// subscriptions, subprotocol sends).
mesh: Arc<Mesh>,
// Lifecycle `State`, stored as its `u8` discriminant.
state: AtomicU8,
// Kind name -> factory closure, filled by `register_factory`.
factories: RwLock<HashMap<String, FactoryFn>>,
// Registry of daemon hosts, keyed by origin hash.
registry: Arc<DaemonRegistry>,
// Built in `new`; only read through `scheduler_arc` (feature "groups").
#[cfg_attr(not(feature = "groups"), allow(dead_code))]
scheduler: Arc<Scheduler>,
// Per-origin factory registrations shared with the migration target handler.
factory_registry: Arc<DaemonFactoryRegistry>,
// Migration state machine plus the source/target side handlers wired into it.
orchestrator: Arc<MigrationOrchestrator>,
source_handler: Arc<MigrationSourceHandler>,
target_handler: Arc<MigrationTargetHandler>,
// Failure reason per daemon origin hash; written by the failure callback and
// `inject_migration_failure`, consumed by `MigrationHandle::take_recent_failure`.
recent_failures: Mutex<HashMap<u64, MigrationFailureReason>>,
// When true the migration readiness callback always answers "not ready".
simulate_not_ready: AtomicBool,
// Artificial delay (ms) slept by `spawn` / `spawn_with_daemon` after the ready check.
spawn_stall_ms: std::sync::atomic::AtomicU32,
// Artificial delay (ms) slept by `start` before the Registering -> Ready swap.
start_stall_ms: std::sync::atomic::AtomicU32,
// Origin hash -> (observer id, callback) fired after a successful `deliver`.
observers: Mutex<HashMap<u64, Vec<(u64, DeliverObserver)>>>,
// Source of observer ids; starts at 1.
#[cfg_attr(not(feature = "groups"), allow(dead_code))]
observer_id_counter: std::sync::atomic::AtomicU64,
}
/// Callback invoked with the delivered event after `DaemonRuntime::deliver` succeeds
/// for the origin it was registered under.
pub(crate) type DeliverObserver = Arc<dyn Fn(&CausalEvent) + Send + Sync>;
/// Registration guard returned by `register_deliver_observer`; dropping it removes
/// the observer again. Holds only a weak reference, so it does not keep the
/// runtime alive.
#[cfg_attr(not(feature = "groups"), allow(dead_code))]
pub(crate) struct ObserverHandle {
// Weak link back to the runtime state that owns the observer table.
runtime: std::sync::Weak<Inner>,
// Origin hash the observer was registered under.
origin: u64,
// Id assigned at registration; identifies the entry to remove on drop.
id: u64,
}
impl Drop for ObserverHandle {
    /// Unregisters this observer, pruning the per-origin list when it becomes
    /// empty. A no-op if the runtime state has already been dropped.
    fn drop(&mut self) {
        let Some(inner) = self.runtime.upgrade() else {
            return;
        };
        let mut observers = inner.observers.lock();
        let Some(callbacks) = observers.get_mut(&self.origin) else {
            return;
        };
        callbacks.retain(|(id, _)| *id != self.id);
        if callbacks.is_empty() {
            observers.remove(&self.origin);
        }
    }
}
impl DaemonRuntime {
/// Creates a runtime attached to `mesh`, starting in the `Registering` state.
///
/// Builds the daemon registry, the per-origin factory registry, the migration
/// source/target handlers, the orchestrator and the scheduler. Nothing is
/// installed on the mesh here; that happens in [`DaemonRuntime::start`].
pub fn new(mesh: Arc<Mesh>) -> Self {
    let node_id = mesh.inner().node_id();

    let registry = Arc::new(DaemonRegistry::new());
    let factory_registry = Arc::new(DaemonFactoryRegistry::new());

    let source_handler = Arc::new(MigrationSourceHandler::new(Arc::clone(&registry)));
    let orchestrator = Arc::new(
        MigrationOrchestrator::new(Arc::clone(&registry), node_id)
            .with_source_handler(Arc::clone(&source_handler)),
    );
    let target_handler = Arc::new(MigrationTargetHandler::new_with_factories(
        Arc::clone(&registry),
        Arc::clone(&factory_registry),
    ));

    let capability_fold = mesh.inner().capability_fold().clone();
    let scheduler = Arc::new(Scheduler::new(
        capability_fold,
        node_id,
        CapabilitySet::default(),
    ));

    let inner = Inner {
        mesh,
        state: AtomicU8::new(State::Registering as u8),
        factories: RwLock::new(HashMap::new()),
        registry,
        scheduler,
        factory_registry,
        orchestrator,
        source_handler,
        target_handler,
        recent_failures: Mutex::new(HashMap::new()),
        simulate_not_ready: AtomicBool::new(false),
        spawn_stall_ms: std::sync::atomic::AtomicU32::new(0),
        start_stall_ms: std::sync::atomic::AtomicU32::new(0),
        observers: Mutex::new(HashMap::new()),
        // Observer ids start at 1.
        observer_id_counter: std::sync::atomic::AtomicU64::new(1),
    };
    Self {
        inner: Arc::new(inner),
    }
}
/// Registers `cb` to be called after every successful `deliver` to `origin`.
///
/// Returns a guard; dropping it removes the observer again.
#[cfg_attr(not(feature = "groups"), allow(dead_code))]
pub(crate) fn register_deliver_observer(
    &self,
    origin: u64,
    cb: DeliverObserver,
) -> ObserverHandle {
    let inner = &self.inner;
    let id = inner.observer_id_counter.fetch_add(1, Ordering::Relaxed);
    // The lock guard is a temporary and is released at the end of this statement.
    inner
        .observers
        .lock()
        .entry(origin)
        .or_default()
        .push((id, cb));
    ObserverHandle {
        runtime: Arc::downgrade(inner),
        origin,
        id,
    }
}
/// Registers `factory` as the constructor for daemons of the given `kind`.
///
/// # Errors
/// * [`DaemonError::ShuttingDown`] if the runtime is shutting down.
/// * [`DaemonError::FactoryAlreadyRegistered`] if `kind` already has a factory;
///   the existing factory is left untouched.
pub fn register_factory<F>(&self, kind: &str, factory: F) -> Result<(), DaemonError>
where
    F: Fn() -> Box<dyn MeshDaemon> + Send + Sync + 'static,
{
    use std::collections::hash_map::Entry;

    if matches!(self.state(), State::ShuttingDown) {
        return Err(DaemonError::ShuttingDown);
    }
    let mut factories = self.inner.factories.write();
    match factories.entry(kind.to_string()) {
        Entry::Vacant(vacant) => {
            vacant.insert(Arc::new(factory));
            Ok(())
        }
        Entry::Occupied(_) => Err(DaemonError::FactoryAlreadyRegistered(kind.to_string())),
    }
}
/// Moves the runtime from `Registering` to `Ready`, installing the migration
/// subprotocol handler on the mesh first.
///
/// Calling it again once the runtime is `Ready` returns `Ok(())` without doing
/// anything.
///
/// # Errors
/// Returns [`DaemonError::ShuttingDown`] if shutdown was requested before or
/// while this call was in progress.
pub async fn start(&self) -> Result<(), DaemonError> {
// Loop so that a lost compare-exchange re-reads the state and exits through
// the matching arm.
loop {
match self.state() {
State::Registering => {
// Install the handler before publishing `Ready`.
let handler = Arc::new(self.build_migration_handler());
self.inner.mesh.inner().set_migration_handler(handler);
// Optional artificial delay (set via `set_start_stall_ms`) that widens the
// window between handler install and the state swap.
let stall_ms = self.inner.start_stall_ms.load(Ordering::Acquire);
if stall_ms > 0 {
tokio::time::sleep(std::time::Duration::from_millis(stall_ms as u64)).await;
}
let swap = self.inner.state.compare_exchange(
State::Registering as u8,
State::Ready as u8,
Ordering::AcqRel,
Ordering::Acquire,
);
if swap.is_ok() {
return Ok(());
}
// The swap lost a race. If shutdown won, undo the handler install done above,
// since shutdown may have cleared the handler before it was set here.
if self.state() == State::ShuttingDown {
self.inner.mesh.inner().clear_migration_handler();
return Err(DaemonError::ShuttingDown);
}
}
State::Ready => return Ok(()),
State::ShuttingDown => return Err(DaemonError::ShuttingDown),
}
}
}
fn build_migration_handler(&self) -> MigrationSubprotocolHandler {
let local_node_id = self.inner.mesh.inner().node_id();
let ctx = self.inner.mesh.inner().migration_identity_context();
let rt = tokio::runtime::Handle::current();
let inner_for_rebind = self.inner.clone();
let rt_for_rebind = rt.clone();
let post_restore: PostRestoreCallback = Arc::new(move |origin_hash: u64| {
let inner = inner_for_rebind.clone();
rt_for_rebind.spawn(async move {
replay_subscriptions(inner, origin_hash).await;
});
});
let inner_for_teardown = self.inner.clone();
let rt_for_teardown = rt.clone();
let pre_cleanup: PreCleanupCallback = Arc::new(move |origin_hash: u64| {
let bindings = inner_for_teardown
.registry
.with_host(origin_hash, |host| host.bindings_snapshot().subscriptions)
.unwrap_or_default();
if bindings.is_empty() {
return;
}
let inner = inner_for_teardown.clone();
rt_for_teardown.spawn(async move {
teardown_subscriptions(inner, bindings).await;
});
});
let inner_for_readiness = self.inner.clone();
let readiness: ReadinessCallback = Arc::new(move || {
if inner_for_readiness
.simulate_not_ready
.load(Ordering::Acquire)
{
return false;
}
inner_for_readiness.state.load(Ordering::Acquire) == State::Ready as u8
});
let inner_for_failure = self.inner.clone();
let failure: FailureCallback =
Arc::new(move |origin_hash: u64, reason: MigrationFailureReason| {
inner_for_failure
.recent_failures
.lock()
.insert(origin_hash, reason);
});
MigrationSubprotocolHandler::with_hooks(
self.inner.orchestrator.clone(),
self.inner.source_handler.clone(),
self.inner.target_handler.clone(),
local_node_id,
MigrationHandlerHooks {
identity: Some(ctx),
post_restore: Some(post_restore),
pre_cleanup: Some(pre_cleanup),
readiness: Some(readiness),
failure: Some(failure),
},
)
}
/// Shuts the runtime down: flips the state to `ShuttingDown`, unregisters every
/// daemon currently listed by the registry together with its per-origin factory
/// entry, forgets recorded migration failures and removes the migration handler
/// from the mesh.
///
/// Always returns `Ok(())`; individual unregister failures are ignored.
pub async fn shutdown(&self) -> Result<(), DaemonError> {
// Publish the new state first so concurrent spawns observe it in their
// post-registration check and roll themselves back.
self.inner
.state
.store(State::ShuttingDown as u8, Ordering::Release);
// Collect the origins up front, then unregister them one by one.
let origins: Vec<u64> = self
.inner
.registry
.list()
.into_iter()
.map(|(origin, _)| origin)
.collect();
for origin in origins {
let _ = self.inner.registry.unregister(origin);
self.inner.factory_registry.remove(origin);
}
self.inner.recent_failures.lock().clear();
self.inner.mesh.inner().clear_migration_handler();
Ok(())
}
/// Turns the "pretend not ready" override on or off.
///
/// While it is on, the migration readiness hook answers `false` regardless of
/// the runtime's lifecycle state.
pub fn simulate_not_ready(&self, flag: bool) {
    let override_flag = &self.inner.simulate_not_ready;
    override_flag.store(flag, Ordering::Release);
}
/// Sets the artificial delay, in milliseconds, that `spawn` and
/// `spawn_with_daemon` sleep right after their readiness check. `0` disables it.
#[doc(hidden)]
pub fn set_spawn_stall_ms(&self, millis: u32) {
    let stall = &self.inner.spawn_stall_ms;
    stall.store(millis, Ordering::Release);
}
/// Sets the artificial delay, in milliseconds, that `start` sleeps between
/// installing the migration handler and switching to `Ready`. `0` disables it.
#[doc(hidden)]
pub fn set_start_stall_ms(&self, millis: u32) {
    let stall = &self.inner.start_stall_ms;
    stall.store(millis, Ordering::Release);
}
/// Reports whether the runtime is currently in the `Ready` state.
pub fn is_ready(&self) -> bool {
    matches!(self.state(), State::Ready)
}
pub async fn spawn(
&self,
kind: &str,
identity: Identity,
config: DaemonHostConfig,
) -> Result<DaemonHandle, DaemonError> {
self.require_ready()?;
let stall_ms = self.inner.spawn_stall_ms.load(Ordering::Acquire);
if stall_ms > 0 {
tokio::time::sleep(std::time::Duration::from_millis(stall_ms as u64)).await;
}
let factory = self.factory_for_kind(kind)?;
let daemon = (factory)();
let keypair = identity.keypair().as_ref().clone();
let origin_hash = keypair.origin_hash();
let entity_id = keypair.entity_id().clone();
let factory_for_core = factory.clone();
self.inner
.factory_registry
.register(keypair.clone(), config.clone(), move || {
(factory_for_core)()
})
.map_err(DaemonError::Core)?;
let host = DaemonHost::new(daemon, keypair, config);
if let Err(e) = self.inner.registry.register(host) {
self.inner.factory_registry.remove(origin_hash);
return Err(DaemonError::Core(e));
}
if self.state() == State::ShuttingDown {
let _ = self.inner.registry.unregister(origin_hash);
self.inner.factory_registry.remove(origin_hash);
return Err(DaemonError::ShuttingDown);
}
Ok(DaemonHandle {
origin_hash,
entity_id,
inner: self.inner.clone(),
})
}
pub async fn spawn_with_daemon<F>(
&self,
identity: Identity,
config: DaemonHostConfig,
daemon: Box<dyn MeshDaemon>,
kind_factory: F,
) -> Result<DaemonHandle, DaemonError>
where
F: Fn() -> Box<dyn MeshDaemon> + Send + Sync + 'static,
{
self.require_ready()?;
let stall_ms = self.inner.spawn_stall_ms.load(Ordering::Acquire);
if stall_ms > 0 {
tokio::time::sleep(std::time::Duration::from_millis(stall_ms as u64)).await;
}
let keypair = identity.keypair().as_ref().clone();
let origin_hash = keypair.origin_hash();
let entity_id = keypair.entity_id().clone();
self.inner
.factory_registry
.register(keypair.clone(), config.clone(), kind_factory)
.map_err(DaemonError::Core)?;
let host = DaemonHost::new(daemon, keypair, config);
if let Err(e) = self.inner.registry.register(host) {
self.inner.factory_registry.remove(origin_hash);
return Err(DaemonError::Core(e));
}
if self.state() == State::ShuttingDown {
let _ = self.inner.registry.unregister(origin_hash);
self.inner.factory_registry.remove(origin_hash);
return Err(DaemonError::ShuttingDown);
}
Ok(DaemonHandle {
origin_hash,
entity_id,
inner: self.inner.clone(),
})
}
pub async fn spawn_from_snapshot_with_daemon<F>(
&self,
identity: Identity,
snapshot: StateSnapshot,
config: DaemonHostConfig,
daemon: Box<dyn MeshDaemon>,
kind_factory: F,
) -> Result<DaemonHandle, DaemonError>
where
F: Fn() -> Box<dyn MeshDaemon> + Send + Sync + 'static,
{
self.require_ready()?;
let keypair = identity.keypair().as_ref().clone();
let origin_hash = keypair.origin_hash();
let entity_id = keypair.entity_id().clone();
if snapshot.entity_id != entity_id {
return Err(DaemonError::SnapshotIdentityMismatch {
snapshot: snapshot.entity_id.origin_hash(),
identity: origin_hash,
});
}
self.inner
.factory_registry
.register(keypair.clone(), config.clone(), kind_factory)
.map_err(DaemonError::Core)?;
let host = match DaemonHost::from_snapshot(daemon, keypair, &snapshot, config) {
Ok(h) => h,
Err(e) => {
self.inner.factory_registry.remove(origin_hash);
return Err(DaemonError::Core(e));
}
};
if let Err(e) = self.inner.registry.register(host) {
self.inner.factory_registry.remove(origin_hash);
return Err(DaemonError::Core(e));
}
if self.state() == State::ShuttingDown {
let _ = self.inner.registry.unregister(origin_hash);
self.inner.factory_registry.remove(origin_hash);
return Err(DaemonError::ShuttingDown);
}
Ok(DaemonHandle {
origin_hash,
entity_id,
inner: self.inner.clone(),
})
}
pub async fn spawn_from_snapshot(
&self,
kind: &str,
identity: Identity,
snapshot: StateSnapshot,
config: DaemonHostConfig,
) -> Result<DaemonHandle, DaemonError> {
self.require_ready()?;
let factory = self.factory_for_kind(kind)?;
let keypair = identity.keypair().as_ref().clone();
let origin_hash = keypair.origin_hash();
let entity_id = keypair.entity_id().clone();
if snapshot.entity_id != entity_id {
return Err(DaemonError::SnapshotIdentityMismatch {
snapshot: snapshot.entity_id.origin_hash(),
identity: origin_hash,
});
}
let daemon = (factory)();
let factory_for_core = factory.clone();
self.inner
.factory_registry
.register(keypair.clone(), config.clone(), move || {
(factory_for_core)()
})
.map_err(DaemonError::Core)?;
let host = match DaemonHost::from_snapshot(daemon, keypair, &snapshot, config) {
Ok(h) => h,
Err(e) => {
self.inner.factory_registry.remove(origin_hash);
return Err(DaemonError::Core(e));
}
};
if let Err(e) = self.inner.registry.register(host) {
self.inner.factory_registry.remove(origin_hash);
return Err(DaemonError::Core(e));
}
if self.state() == State::ShuttingDown {
let _ = self.inner.registry.unregister(origin_hash);
self.inner.factory_registry.remove(origin_hash);
return Err(DaemonError::ShuttingDown);
}
Ok(DaemonHandle {
origin_hash,
entity_id,
inner: self.inner.clone(),
})
}
/// Stops the daemon identified by `origin_hash` and drops its per-origin factory.
///
/// # Errors
/// * [`DaemonError::NotReady`] while the runtime is still `Registering`.
/// * [`DaemonError::Core`] when the registry cannot unregister the daemon. The
///   one exception is "not found" during shutdown, which is reported as success.
pub async fn stop(&self, origin_hash: u64) -> Result<(), DaemonError> {
    if matches!(self.state(), State::Registering) {
        return Err(DaemonError::NotReady);
    }
    match self.inner.registry.unregister(origin_hash) {
        Ok(_) => {
            self.inner.factory_registry.remove(origin_hash);
            Ok(())
        }
        Err(CoreDaemonError::NotFound(_)) if matches!(self.state(), State::ShuttingDown) => Ok(()),
        Err(other) => Err(DaemonError::Core(other)),
    }
}
/// Asks the registry for a state snapshot of the daemon `origin_hash`.
///
/// # Errors
/// [`DaemonError::NotReady`] / [`DaemonError::ShuttingDown`] when the runtime is
/// not `Ready`; [`DaemonError::Core`] when the registry reports an error.
pub async fn snapshot(&self, origin_hash: u64) -> Result<Option<StateSnapshot>, DaemonError> {
    self.require_ready()?;
    let taken = self.inner.registry.snapshot(origin_hash);
    taken.map_err(DaemonError::Core)
}
pub fn deliver(
&self,
origin_hash: u64,
event: &CausalEvent,
) -> Result<Vec<CausalEvent>, DaemonError> {
self.require_ready()?;
let outputs = self
.inner
.registry
.deliver(origin_hash, event)
.map_err(DaemonError::Core)?;
let to_fire: Vec<DeliverObserver> = {
let map = self.inner.observers.lock();
map.get(&origin_hash)
.map(|v| v.iter().map(|(_, cb)| cb.clone()).collect())
.unwrap_or_default()
};
for cb in to_fire {
cb(event);
}
Ok(outputs)
}
/// Number of daemons currently held by the registry.
pub fn daemon_count(&self) -> usize {
    let registry = &self.inner.registry;
    registry.count()
}
/// Phase the orchestrator currently reports for a migration of `origin_hash`,
/// or `None` when it reports no status for that origin.
pub fn migration_phase(&self, origin_hash: u64) -> Option<MigrationPhase> {
    let orchestrator = &self.inner.orchestrator;
    orchestrator.status(origin_hash)
}
/// Returns a copy of the failure reason recorded for `origin_hash`, leaving the
/// record in place.
#[doc(hidden)]
pub fn peek_migration_failure(&self, origin_hash: u64) -> Option<MigrationFailureReason> {
    let failures = self.inner.recent_failures.lock();
    failures.get(&origin_hash).cloned()
}
/// Records `reason` as the most recent migration failure for `origin_hash`,
/// replacing any earlier record.
#[doc(hidden)]
pub fn inject_migration_failure(&self, origin_hash: u64, reason: MigrationFailureReason) {
    let mut failures = self.inner.recent_failures.lock();
    failures.insert(origin_hash, reason);
}
/// Lists the channel subscriptions recorded on the daemon `origin_hash`.
///
/// # Errors
/// [`DaemonError::Core`] when the registry cannot access that daemon's host.
pub fn subscriptions(&self, origin_hash: u64) -> Result<Vec<SubscriptionBinding>, DaemonError> {
    let recorded = self
        .inner
        .registry
        .with_host(origin_hash, |host| host.bindings_snapshot().subscriptions);
    recorded.map_err(DaemonError::Core)
}
/// Subscribes on the mesh to `channel` from `publisher` and records the
/// subscription on the daemon `origin_hash`.
///
/// When a `token` is given it is presented with the subscribe request, and its
/// byte encoding is stored with the binding.
///
/// # Errors
/// * [`DaemonError::NotReady`] / [`DaemonError::ShuttingDown`] when the runtime
///   is not `Ready`.
/// * [`DaemonError::Core`] with `NotFound` when the daemon is not registered,
///   with `ProcessFailed` when the mesh subscribe fails, or with the registry's
///   error when the binding cannot be recorded.
pub async fn subscribe_channel(
    &self,
    origin_hash: u64,
    publisher: u64,
    channel: ChannelName,
    token: Option<PermissionToken>,
) -> Result<(), DaemonError> {
    self.require_ready()?;
    if !self.inner.registry.contains(origin_hash) {
        return Err(DaemonError::Core(CoreDaemonError::NotFound(origin_hash)));
    }

    // Encode the token before it is consumed by the subscribe call.
    let token_bytes = token.as_ref().map(|t| t.to_bytes().to_vec());

    let outcome = if let Some(tok) = token {
        self.inner
            .mesh
            .inner()
            .subscribe_channel_with_token(publisher, channel.clone(), tok)
            .await
    } else {
        self.inner
            .mesh
            .inner()
            .subscribe_channel(publisher, channel.clone())
            .await
    };
    outcome.map_err(|e| {
        DaemonError::Core(CoreDaemonError::ProcessFailed(format!(
            "subscribe_channel failed: {e}"
        )))
    })?;

    self.inner
        .registry
        .with_host(origin_hash, |host| {
            host.record_subscription(publisher, channel, token_bytes);
        })
        .map_err(DaemonError::Core)
}
/// Unsubscribes on the mesh from `channel` of `publisher` and forgets the
/// binding on the daemon `origin_hash`.
///
/// Forgetting the binding is best effort: an error from the registry at that
/// step is ignored.
///
/// # Errors
/// * [`DaemonError::NotReady`] / [`DaemonError::ShuttingDown`] when the runtime
///   is not `Ready`.
/// * [`DaemonError::Core`] with `NotFound` when the daemon is not registered, or
///   with `ProcessFailed` when the mesh unsubscribe fails.
pub async fn unsubscribe_channel(
    &self,
    origin_hash: u64,
    publisher: u64,
    channel: ChannelName,
) -> Result<(), DaemonError> {
    self.require_ready()?;
    if !self.inner.registry.contains(origin_hash) {
        return Err(DaemonError::Core(CoreDaemonError::NotFound(origin_hash)));
    }

    let outcome = self
        .inner
        .mesh
        .inner()
        .unsubscribe_channel(publisher, channel.clone())
        .await;
    outcome.map_err(|e| {
        DaemonError::Core(CoreDaemonError::ProcessFailed(format!(
            "unsubscribe_channel failed: {e}"
        )))
    })?;

    let _ = self.inner.registry.with_host(origin_hash, |host| {
        host.forget_subscription(publisher, &channel);
    });
    Ok(())
}
/// Pre-registers `identity` with the factory of `kind` in the per-origin factory
/// registry, without creating a daemon host.
///
/// # Errors
/// * [`DaemonError::ShuttingDown`] when the runtime is shutting down, either on
///   entry or right after the registration (which is then rolled back).
/// * [`DaemonError::FactoryNotFound`] when `kind` has no factory.
/// * [`DaemonError::Core`] when the factory registry rejects the registration.
pub fn register_migration_target_identity(
    &self,
    kind: &str,
    identity: Identity,
    config: DaemonHostConfig,
) -> Result<(), DaemonError> {
    if matches!(self.state(), State::ShuttingDown) {
        return Err(DaemonError::ShuttingDown);
    }

    let make_daemon = self.factory_for_kind(kind)?;
    let keypair = identity.keypair().as_ref().clone();
    let origin_hash = keypair.origin_hash();

    let factories = &self.inner.factory_registry;
    factories
        .register(keypair, config, move || make_daemon())
        .map_err(DaemonError::Core)?;

    // Re-check: shutdown may have begun while the entry was being registered.
    if matches!(self.state(), State::ShuttingDown) {
        factories.remove(origin_hash);
        return Err(DaemonError::ShuttingDown);
    }
    Ok(())
}
/// Registers a placeholder factory entry for `origin_hash` using the factory of
/// `kind`; only the origin hash is supplied, no identity.
///
/// # Errors
/// * [`DaemonError::ShuttingDown`] when the runtime is shutting down, either on
///   entry or right after the registration (which is then rolled back).
/// * [`DaemonError::FactoryNotFound`] when `kind` has no factory.
/// * [`DaemonError::Core`] when the factory registry rejects the placeholder.
pub fn expect_migration(
    &self,
    kind: &str,
    origin_hash: u64,
    config: DaemonHostConfig,
) -> Result<(), DaemonError> {
    if matches!(self.state(), State::ShuttingDown) {
        return Err(DaemonError::ShuttingDown);
    }

    let make_daemon = self.factory_for_kind(kind)?;
    let factories = &self.inner.factory_registry;
    factories
        .register_placeholder(origin_hash, config, move || make_daemon())
        .map_err(DaemonError::Core)?;

    // Re-check: shutdown may have begun while the entry was being registered.
    if matches!(self.state(), State::ShuttingDown) {
        factories.remove(origin_hash);
        return Err(DaemonError::ShuttingDown);
    }
    Ok(())
}
/// Starts migrating the daemon `origin_hash` from `source_node` to `target_node`
/// with [`MigrationOpts::default`].
///
/// See [`DaemonRuntime::start_migration_with`] for the error conditions.
pub async fn start_migration(
    &self,
    origin_hash: u64,
    source_node: u64,
    target_node: u64,
) -> Result<MigrationHandle, DaemonError> {
    let default_opts = MigrationOpts::default();
    self.start_migration_with(origin_hash, source_node, target_node, default_opts)
        .await
}
/// Starts migrating the daemon `origin_hash` from `source_node` to `target_node`
/// using the given options, and sends the orchestrator's initial message(s).
///
/// Returns a [`MigrationHandle`] that can be polled, awaited or cancelled.
///
/// # Errors
/// * [`DaemonError::NotReady`] / [`DaemonError::ShuttingDown`] when the runtime
///   is not `Ready`.
/// * [`DaemonError::Migration`] when the orchestrator refuses to start, sealing
///   the identity envelope fails, the initial message set is empty or of an
///   unexpected kind, or sending fails. For every failure after the
///   orchestrator accepted the migration, the migration is aborted before
///   returning.
pub async fn start_migration_with(
&self,
origin_hash: u64,
source_node: u64,
target_node: u64,
opts: MigrationOpts,
) -> Result<MigrationHandle, DaemonError> {
self.require_ready()?;
// Drop any failure left over from an earlier migration of this daemon so the
// new handle does not report it.
self.inner.recent_failures.lock().remove(&origin_hash);
let msgs = self
.inner
.orchestrator
.start_migration(origin_hash, source_node, target_node)
.map_err(DaemonError::Migration)?;
// With identity transport on, snapshot chunks are sealed for the target first
// (the helper aborts the migration itself on failure).
let msgs = if opts.transport_identity {
self.maybe_seal_chunked_snapshot(origin_hash, target_node, msgs)
.await?
} else {
msgs
};
// The kind of the first message decides the recipient: a snapshot request goes
// to the source node, ready snapshot chunks go to the target node.
let dest_node = match msgs.first() {
Some(MigrationMessage::TakeSnapshot { .. }) => source_node,
Some(MigrationMessage::SnapshotReady { .. }) => target_node,
Some(other) => {
let _ = self
.inner
.orchestrator
.abort_migration(origin_hash, "unexpected initial message".into());
return Err(DaemonError::Migration(MigrationError::StateFailed(
format!(
"orchestrator returned unexpected initial migration message: {:?}",
other
),
)));
}
None => {
let _ = self
.inner
.orchestrator
.abort_migration(origin_hash, "orchestrator returned no messages".into());
return Err(DaemonError::Migration(MigrationError::StateFailed(
"orchestrator returned no migration messages".into(),
)));
}
};
// Send in order; the first failure aborts the migration and is returned.
for msg in &msgs {
if let Err(e) = self.send_migration_message(dest_node, msg).await {
let _ = self
.inner
.orchestrator
.abort_migration(origin_hash, format!("initial send failed: {e}"));
return Err(e);
}
}
Ok(MigrationHandle {
origin_hash,
source_node,
target_node,
runtime: self.clone(),
opts,
})
}
/// Seals the identity envelope of a locally produced snapshot for `target_node`.
///
/// Returns `Ok(None)` when the snapshot already carries an identity envelope,
/// otherwise `Ok(Some(bytes))` with the re-encoded sealed snapshot.
///
/// # Errors
/// [`DaemonError::Migration`] (`StateFailed`) when the snapshot bytes cannot be
/// decoded, the target's X25519 static key is unknown, the daemon has no local
/// keypair, or sealing itself fails.
fn maybe_seal_local_snapshot(
    &self,
    daemon_origin: u64,
    target_node: u64,
    snapshot_bytes: &[u8],
) -> Result<Option<Vec<u8>>, DaemonError> {
    // Every failure here is reported as the same error variant.
    let state_failed =
        |detail: String| DaemonError::Migration(MigrationError::StateFailed(detail));

    let Some(snapshot) = StateSnapshot::from_bytes(snapshot_bytes) else {
        return Err(state_failed(
            "failed to decode local snapshot for envelope sealing".into(),
        ));
    };
    if snapshot.identity_envelope.is_some() {
        return Ok(None);
    }

    let target_pub = self
        .inner
        .mesh
        .inner()
        .peer_static_x25519(target_node)
        .ok_or_else(|| {
            state_failed(format!(
                "identity transport requested but peer X25519 static for \
                 {target_node:#x} is unknown (e.g. NKpsk0-responder \
                 side) — cannot seal envelope; use \
                 `transport_identity: false` to proceed unsealed"
            ))
        })?;

    let kp = self
        .inner
        .registry
        .daemon_keypair(daemon_origin)
        .ok_or_else(|| {
            state_failed(format!(
                "identity transport requested but daemon {daemon_origin:#x} has \
                 no local keypair to seal with"
            ))
        })?;

    match snapshot.with_identity_envelope(&kp, target_pub) {
        Ok(sealed) => Ok(Some(sealed.to_bytes())),
        Err(e) => Err(state_failed(format!(
            "identity envelope seal failed for daemon {daemon_origin:#x}: {e}"
        ))),
    }
}
/// Seals a chunked snapshot message set for `target_node` when needed.
///
/// If the first message is not a `SnapshotReady` chunk the messages are returned
/// untouched. Otherwise the chunks are ordered by index, reassembled and handed
/// to `maybe_seal_local_snapshot`. A newly sealed snapshot is split into chunks
/// again, using the `seq_through` of the original first message; a snapshot
/// that already carries an envelope yields the (now sorted) input messages.
///
/// # Errors
/// Returns the sealing or re-chunking error after aborting the migration in the
/// orchestrator.
async fn maybe_seal_chunked_snapshot(
    &self,
    origin_hash: u64,
    target_node: u64,
    mut msgs: Vec<MigrationMessage>,
) -> Result<Vec<MigrationMessage>, DaemonError> {
    // `seq_through` is read from the first message before sorting.
    let seq_through = match msgs.first() {
        Some(MigrationMessage::SnapshotReady { seq_through, .. }) => *seq_through,
        _ => return Ok(msgs),
    };

    msgs.sort_by_key(|msg| match msg {
        MigrationMessage::SnapshotReady { chunk_index, .. } => *chunk_index,
        _ => 0,
    });

    let mut payload: Vec<u8> = Vec::new();
    for msg in &msgs {
        if let MigrationMessage::SnapshotReady { snapshot_bytes, .. } = msg {
            payload.extend_from_slice(snapshot_bytes);
        }
    }

    let sealed = match self.maybe_seal_local_snapshot(origin_hash, target_node, &payload) {
        Ok(Some(sealed)) => sealed,
        Ok(None) => return Ok(msgs),
        Err(e) => {
            let _ = self
                .inner
                .orchestrator
                .abort_migration(origin_hash, format!("envelope seal failed: {e}"));
            return Err(e);
        }
    };

    chunk_snapshot(origin_hash, sealed, seq_through).map_err(|e| {
        let _ = self
            .inner
            .orchestrator
            .abort_migration(origin_hash, format!("rechunk after seal failed: {e}"));
        DaemonError::Migration(e)
    })
}
/// Encodes `msg` and sends it to `dest_node` over the migration subprotocol.
///
/// # Errors
/// [`DaemonError::Migration`] with `TargetUnavailable` when the mesh has no
/// address for `dest_node`, with the encoder's error when encoding fails, or
/// with `StateFailed` when the send itself fails.
async fn send_migration_message(
    &self,
    dest_node: u64,
    msg: &MigrationMessage,
) -> Result<(), DaemonError> {
    let Some(addr) = self.inner.mesh.inner().peer_addr(dest_node) else {
        return Err(DaemonError::Migration(MigrationError::TargetUnavailable(
            dest_node,
        )));
    };
    let bytes = migration_wire::encode(msg).map_err(DaemonError::Migration)?;

    let sent = self
        .inner
        .mesh
        .inner()
        .send_subprotocol(addr, SUBPROTOCOL_MIGRATION, &bytes)
        .await;
    sent.map_err(|e| {
        DaemonError::Migration(MigrationError::StateFailed(format!(
            "send_subprotocol failed: {e}"
        )))
    })
}
pub fn mesh(&self) -> &Arc<Mesh> {
&self.inner.mesh
}
/// Loads the current lifecycle state from the atomic.
fn state(&self) -> State {
    let raw = self.inner.state.load(Ordering::Acquire);
    State::from_u8(raw)
}
/// Guard used by operations that are only valid while the runtime is `Ready`.
///
/// # Errors
/// [`DaemonError::NotReady`] while registering, [`DaemonError::ShuttingDown`]
/// once shutdown has been requested.
fn require_ready(&self) -> Result<(), DaemonError> {
    let refusal = match self.state() {
        State::Ready => return Ok(()),
        State::Registering => DaemonError::NotReady,
        State::ShuttingDown => DaemonError::ShuttingDown,
    };
    Err(refusal)
}
fn factory_for_kind(&self, kind: &str) -> Result<FactoryFn, DaemonError> {
self.inner
.factories
.read()
.get(kind)
.cloned()
.ok_or_else(|| DaemonError::FactoryNotFound(kind.to_string()))
}
/// Shared handle to the runtime's scheduler (feature `groups` only).
#[cfg(feature = "groups")]
pub(crate) fn scheduler_arc(&self) -> Arc<Scheduler> {
    Arc::clone(&self.inner.scheduler)
}
/// Shared handle to the migration orchestrator (feature `testing` only).
#[cfg(feature = "testing")]
pub(crate) fn migration_orchestrator_arc(&self) -> Arc<MigrationOrchestrator> {
    Arc::clone(&self.inner.orchestrator)
}
/// Shared handle to the daemon registry (feature `groups` only).
#[cfg(feature = "groups")]
pub(crate) fn registry_arc(&self) -> Arc<DaemonRegistry> {
    Arc::clone(&self.inner.registry)
}
/// Crate-visible wrapper around the private factory lookup (feature `groups` only).
///
/// # Errors
/// [`DaemonError::FactoryNotFound`] when nothing is registered under `kind`.
#[cfg(feature = "groups")]
pub(crate) fn factory_for_kind_pub(&self, kind: &str) -> Result<FactoryFn, DaemonError> {
    Self::factory_for_kind(self, kind)
}
/// Crate-visible readiness check (feature `groups` only): `true` only in the
/// `Ready` state.
#[cfg(feature = "groups")]
pub(crate) fn is_ready_pub(&self) -> bool {
    matches!(self.state(), State::Ready)
}
}
/// Handle to a daemon spawned through [`DaemonRuntime`]; clones refer to the same
/// daemon and keep the shared runtime state alive.
#[derive(Clone)]
pub struct DaemonHandle {
/// Origin hash of the daemon's keypair; the key used for registry lookups.
pub origin_hash: u64,
/// Entity id taken from the daemon's keypair at spawn time.
pub entity_id: EntityId,
// Shared runtime state used to reach the daemon registry.
inner: Arc<Inner>,
}
impl DaemonHandle {
    /// Fetches the registry's statistics for this daemon.
    ///
    /// # Errors
    /// [`DaemonError::Core`] when the registry reports an error for this origin.
    pub fn stats(&self) -> Result<DaemonStats, DaemonError> {
        let registry = &self.inner.registry;
        registry.stats(self.origin_hash).map_err(DaemonError::Core)
    }

    /// Asks the registry for a state snapshot of this daemon.
    ///
    /// Unlike `DaemonRuntime::snapshot`, this does not check the runtime state.
    ///
    /// # Errors
    /// [`DaemonError::Core`] when the registry reports an error for this origin.
    pub async fn snapshot(&self) -> Result<Option<StateSnapshot>, DaemonError> {
        let registry = &self.inner.registry;
        registry
            .snapshot(self.origin_hash)
            .map_err(DaemonError::Core)
    }
}
impl std::fmt::Debug for DaemonRuntime {
    /// Prints the lifecycle state plus the number of registered factories and
    /// live daemons.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Read the count in its own statement so the read lock is released
        // before anything is written.
        let factory_count = self.inner.factories.read().len();
        let state = self.state();
        let daemon_count = self.inner.registry.count();

        let mut out = f.debug_struct("DaemonRuntime");
        out.field("state", &state);
        out.field("factories", &factory_count);
        out.field("daemons", &daemon_count);
        out.finish()
    }
}
impl std::fmt::Debug for DaemonHandle {
    /// Prints the origin hash (hex) and the entity id; the shared runtime state
    /// is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("DaemonHandle");
        out.field("origin_hash", &format_args!("{:#x}", self.origin_hash));
        out.field("entity_id", &self.entity_id);
        out.finish()
    }
}
/// Re-subscribes on the mesh to every channel binding recorded on the daemon
/// `origin_hash`.
///
/// Does nothing when the daemon's host cannot be accessed. A binding whose
/// stored token bytes are missing or fail to decode is replayed without a
/// token. Failures are logged to stderr and do not stop the replay of the
/// remaining bindings.
async fn replay_subscriptions(inner: Arc<Inner>, origin_hash: u64) {
    let Ok(bindings) = inner
        .registry
        .with_host(origin_hash, |host| host.bindings_snapshot().subscriptions)
    else {
        return;
    };

    for sub in bindings {
        let token = sub
            .token_bytes
            .as_deref()
            .and_then(|raw| PermissionToken::from_bytes(raw).ok());

        let outcome = if let Some(tok) = token {
            inner
                .mesh
                .inner()
                .subscribe_channel_with_token(sub.publisher, sub.channel.clone(), tok)
                .await
        } else {
            inner
                .mesh
                .inner()
                .subscribe_channel(sub.publisher, sub.channel.clone())
                .await
        };

        if let Err(e) = outcome {
            eprintln!(
                "channel re-bind replay failed: daemon={:#x} channel={} publisher={:#x} error={}",
                origin_hash, sub.channel, sub.publisher, e,
            );
        }
    }
}
/// Unsubscribes on the mesh from every binding in `bindings`.
///
/// Failures are logged to stderr and do not stop the remaining unsubscribes.
async fn teardown_subscriptions(inner: Arc<Inner>, bindings: Vec<SubscriptionBinding>) {
    for sub in bindings {
        let outcome = inner
            .mesh
            .inner()
            .unsubscribe_channel(sub.publisher, sub.channel.clone())
            .await;
        let Err(e) = outcome else {
            continue;
        };
        eprintln!(
            "channel re-bind teardown failed: channel={} publisher={:#x} error={}",
            sub.channel, sub.publisher, e,
        );
    }
}
/// Options that tune a single migration.
#[derive(Debug, Clone)]
pub struct MigrationOpts {
    /// When `true`, locally produced snapshot chunks are sealed with an identity
    /// envelope for the target node before they are sent.
    pub transport_identity: bool,
    /// Retry budget for retriable failures, measured from the start of the wait.
    /// `None` disables retries.
    pub retry_not_ready: Option<std::time::Duration>,
}

impl Default for MigrationOpts {
    /// Identity transport enabled, with a 30 second retry budget.
    fn default() -> Self {
        let retry_budget = std::time::Duration::from_secs(30);
        MigrationOpts {
            transport_identity: true,
            retry_not_ready: Some(retry_budget),
        }
    }
}
/// Delay to wait before retry number `attempt` (1-based).
///
/// Starts at 500 ms and doubles with every attempt, capped at 16 s (reached from
/// the sixth attempt on). `attempt == 0` is treated like the first attempt.
fn not_ready_backoff(attempt: u8) -> std::time::Duration {
    const BASE_MS: u64 = 500;
    const CEILING_MS: u64 = 16_000;
    const MAX_DOUBLINGS: u8 = 5;

    let doublings = u32::from(attempt.saturating_sub(1).min(MAX_DOUBLINGS));
    let delay_ms = (BASE_MS << doublings).min(CEILING_MS);
    std::time::Duration::from_millis(delay_ms)
}
/// Handle to a migration started via `DaemonRuntime::start_migration[_with]`; used
/// to poll its phase, wait for completion or cancel it.
#[derive(Clone)]
pub struct MigrationHandle {
/// Origin hash of the daemon being migrated.
pub origin_hash: u64,
/// Node id the daemon is migrating away from.
pub source_node: u64,
/// Node id the daemon is migrating to.
pub target_node: u64,
// Runtime that started the migration (orchestrator access, message sending).
runtime: DaemonRuntime,
// Options the migration was started with; reused for retries.
opts: MigrationOpts,
}
impl MigrationHandle {
/// Phase the orchestrator currently reports for this migration, or `None` when
/// it reports no status for the daemon.
pub fn phase(&self) -> Option<MigrationPhase> {
    let orchestrator = &self.runtime.inner.orchestrator;
    orchestrator.status(self.origin_hash)
}
/// Waits for the migration to finish, with no overall time limit.
///
/// Retriable failures are still bounded by the retry budget in the options.
pub async fn wait(self) -> Result<(), DaemonError> {
    let no_deadline = None;
    self.wait_until(no_deadline).await
}
/// Waits for the migration to finish, giving up once `timeout` has elapsed
/// (measured from the moment this method is called).
pub async fn wait_with_timeout(self, timeout: std::time::Duration) -> Result<(), DaemonError> {
    let now = tokio::time::Instant::now();
    self.wait_until(Some(now + timeout)).await
}
/// Drives the migration to completion, retrying retriable failures with
/// exponential backoff.
///
/// * `overall_deadline` is the caller's hard time limit (`None` = unlimited).
/// * The retry budget (`opts.retry_not_ready`) is measured from the start of
///   this call; with `None` the first retriable failure is returned as is.
///
/// The backoff sleep is clamped to `overall_deadline`, so the call never sleeps
/// past the caller's timeout. If the deadline is reached while backing off, the
/// wait ends with `NotReadyTimeout` instead of starting another attempt.
///
/// # Errors
/// * `MigrationFailed(NotReadyTimeout { attempts })` when the retry budget or
///   the overall deadline runs out while the failure is still retriable.
/// * Any non-retriable error from an attempt, or from re-initiating one.
async fn wait_until(
    self,
    overall_deadline: Option<tokio::time::Instant>,
) -> Result<(), DaemonError> {
    let start = tokio::time::Instant::now();
    let retry_deadline = self.opts.retry_not_ready.map(|budget| start + budget);
    let mut attempts: u8 = 1;
    loop {
        match self.wait_one_attempt(overall_deadline).await {
            Ok(()) => return Ok(()),
            Err(DaemonError::MigrationFailed(reason)) if reason.is_retriable() => {
                let Some(retry_d) = retry_deadline else {
                    return Err(DaemonError::MigrationFailed(reason));
                };
                let now = tokio::time::Instant::now();
                let overall_exhausted = overall_deadline.map(|d| now >= d).unwrap_or(false);
                if now >= retry_d || overall_exhausted {
                    return Err(DaemonError::MigrationFailed(
                        MigrationFailureReason::NotReadyTimeout { attempts },
                    ));
                }

                // Never sleep past the caller's hard deadline.
                let backoff = not_ready_backoff(attempts);
                let nap = match overall_deadline {
                    Some(d) => backoff.min(d.saturating_duration_since(now)),
                    None => backoff,
                };
                tokio::time::sleep(nap).await;

                // If the deadline passed during the backoff, a new attempt could
                // only be started to be aborted again; report the timeout now.
                let deadline_hit = overall_deadline
                    .map(|d| tokio::time::Instant::now() >= d)
                    .unwrap_or(false);
                if deadline_hit {
                    return Err(DaemonError::MigrationFailed(
                        MigrationFailureReason::NotReadyTimeout { attempts },
                    ));
                }

                attempts = attempts.saturating_add(1);
                self.reinitiate_attempt().await?;
            }
            Err(e) => return Err(e),
        }
    }
}
/// Polls the orchestrator every 50 ms until the current migration attempt ends.
///
/// Returns `Ok(())` when the phase reaches `Complete`, or when the orchestrator
/// reports no status and no failure was recorded for this daemon.
///
/// # Errors
/// * [`DaemonError::MigrationFailed`] when the orchestrator reports no status
///   and a failure reason was recorded (the record is consumed).
/// * [`DaemonError::Migration`] (`StateFailed`) when `overall_deadline` passes
///   first; the migration is aborted in the orchestrator before returning.
async fn wait_one_attempt(
&self,
overall_deadline: Option<tokio::time::Instant>,
) -> Result<(), DaemonError> {
loop {
let current_phase = self.runtime.inner.orchestrator.status(self.origin_hash);
match current_phase {
Some(phase) => {
if phase == MigrationPhase::Complete {
// NOTE(review): presumably a short grace period for post-completion
// cleanup — confirm why this sleep is needed.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
return Ok(());
}
}
None => {
// No status for this daemon: it counts as failed only if a failure reason
// was recorded, otherwise as finished.
if let Some(reason) = self.take_recent_failure() {
return Err(DaemonError::MigrationFailed(reason));
}
return Ok(());
}
}
// Still in progress: enforce the caller's deadline before sleeping again.
if let Some(d) = overall_deadline {
if tokio::time::Instant::now() >= d {
let _ = self
.runtime
.inner
.orchestrator
.abort_migration(self.origin_hash, "timeout".into());
return Err(DaemonError::Migration(MigrationError::StateFailed(
format!("migration timed out in phase {:?}", current_phase),
)));
}
}
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
}
/// Starts a fresh migration attempt after a retriable failure and sends its
/// initial message(s).
///
/// Mirrors the initial-send logic of `DaemonRuntime::start_migration_with`:
/// the orchestrator is asked to start the migration again, snapshot chunks are
/// sealed when `transport_identity` is set, and the messages go to the source
/// node (`TakeSnapshot`) or the target node (`SnapshotReady`).
///
/// # Errors
/// [`DaemonError::Migration`] when the orchestrator refuses to start, sealing
/// fails, the message set is empty or of an unexpected kind, or a send fails.
/// For every failure after the orchestrator accepted the attempt, the migration
/// is aborted in the orchestrator before the error is returned, so no stale
/// migration record is left behind.
async fn reinitiate_attempt(&self) -> Result<(), DaemonError> {
    let orchestrator = &self.runtime.inner.orchestrator;

    let msgs = orchestrator
        .start_migration(self.origin_hash, self.source_node, self.target_node)
        .map_err(DaemonError::Migration)?;

    // The sealing helper aborts the migration itself when it fails.
    let msgs = if self.opts.transport_identity {
        self.runtime
            .maybe_seal_chunked_snapshot(self.origin_hash, self.target_node, msgs)
            .await?
    } else {
        msgs
    };

    let dest_node = match msgs.first() {
        Some(MigrationMessage::TakeSnapshot { .. }) => self.source_node,
        Some(MigrationMessage::SnapshotReady { .. }) => self.target_node,
        Some(other) => {
            let _ = orchestrator
                .abort_migration(self.origin_hash, "unexpected retry message".into());
            return Err(DaemonError::Migration(MigrationError::StateFailed(
                format!("unexpected retry initial message: {other:?}"),
            )));
        }
        None => {
            let _ = orchestrator.abort_migration(
                self.origin_hash,
                "orchestrator returned no retry messages".into(),
            );
            return Err(DaemonError::Migration(MigrationError::StateFailed(
                "orchestrator returned no migration messages on retry".into(),
            )));
        }
    };

    for msg in &msgs {
        if let Err(e) = self.runtime.send_migration_message(dest_node, msg).await {
            // Abort here as the initial-send path does; otherwise the
            // orchestrator keeps a migration record whose messages never went out.
            let _ = orchestrator
                .abort_migration(self.origin_hash, format!("retry send failed: {e}"));
            return Err(e);
        }
    }
    Ok(())
}
/// Removes and returns the failure reason recorded for this daemon, if any.
fn take_recent_failure(&self) -> Option<MigrationFailureReason> {
    let mut failures = self.runtime.inner.recent_failures.lock();
    failures.remove(&self.origin_hash)
}
/// Cancels the migration: aborts it in the orchestrator, then sends the
/// resulting abort message to the source node and to the target node.
///
/// Both sends are best effort; their errors are ignored.
///
/// # Errors
/// [`DaemonError::Migration`] when the orchestrator cannot abort the migration.
/// Nothing is sent in that case.
pub async fn cancel(&self) -> Result<(), DaemonError> {
    let abort_msg = self
        .runtime
        .inner
        .orchestrator
        .abort_migration(self.origin_hash, "cancel requested".into())
        .map_err(DaemonError::Migration)?;

    for node in [self.source_node, self.target_node] {
        let _ = self.runtime.send_migration_message(node, &abort_msg).await;
    }
    Ok(())
}
}
impl std::fmt::Debug for MigrationHandle {
    /// Prints the daemon origin and both node ids in hex, plus the phase
    /// currently reported by the orchestrator.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("MigrationHandle");
        out.field("origin_hash", &format_args!("{:#x}", self.origin_hash));
        out.field("source_node", &format_args!("{:#x}", self.source_node));
        out.field("target_node", &format_args!("{:#x}", self.target_node));
        out.field("phase", &self.phase());
        out.finish()
    }
}