#![cfg(feature = "compute")]
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use bytes::Bytes;
use net::adapter::net::compute::DaemonError as CoreDaemonError;
use net::adapter::net::state::causal::{CausalEvent, CausalLink};
use net_sdk::capabilities::CapabilityFilter;
use net_sdk::compute::{DaemonError, DaemonHostConfig, DaemonRuntime, MeshDaemon};
use net_sdk::mesh::MeshBuilder;
use net_sdk::Identity;
/// Fixed 32-byte key material (every byte `0x42`) that the `runtime()` helper
/// passes to `MeshBuilder::new`.
/// NOTE(review): presumably the mesh pre-shared key, going by the name —
/// confirm against `MeshBuilder`.
const PSK: [u8; 32] = [0x42u8; 32];
/// Field-less test daemon that answers every event with that event's payload.
struct EchoDaemon;

impl MeshDaemon for EchoDaemon {
    /// Name this daemon reports.
    fn name(&self) -> &str {
        "echo"
    }

    /// Uses the default capability filter.
    fn requirements(&self) -> CapabilityFilter {
        Default::default()
    }

    /// Emits exactly one output: a clone of the incoming event's payload.
    fn process(&mut self, event: &CausalEvent) -> Result<Vec<Bytes>, CoreDaemonError> {
        let echoed = event.payload.clone();
        Ok(vec![echoed])
    }
}
/// Stateful test daemon: counts the events it has processed and can
/// snapshot / restore that count as 8 little-endian bytes.
struct CounterDaemon {
    count: u64,
}

impl CounterDaemon {
    /// Current count encoded as 8 little-endian bytes.
    fn encoded_count(&self) -> Bytes {
        Bytes::copy_from_slice(&self.count.to_le_bytes())
    }
}

impl MeshDaemon for CounterDaemon {
    /// Name this daemon reports.
    fn name(&self) -> &str {
        "counter"
    }

    /// Uses the default capability filter.
    fn requirements(&self) -> CapabilityFilter {
        Default::default()
    }

    /// Increments the counter and emits the new value (little-endian `u64`)
    /// as the single output. The event contents are ignored.
    fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, CoreDaemonError> {
        self.count += 1;
        Ok(vec![self.encoded_count()])
    }

    /// Always returns `Some` with the current count as 8 little-endian bytes.
    fn snapshot(&self) -> Option<Bytes> {
        Some(self.encoded_count())
    }

    /// Replaces the counter with the value decoded from `state`.
    ///
    /// # Errors
    /// Returns `CoreDaemonError::RestoreFailed` when `state` is not exactly
    /// 8 bytes long; the counter is left untouched in that case.
    fn restore(&mut self, state: Bytes) -> Result<(), CoreDaemonError> {
        match state.len() {
            8 => {
                let mut raw = [0u8; 8];
                raw.copy_from_slice(&state);
                self.count = u64::from_le_bytes(raw);
                Ok(())
            }
            actual => Err(CoreDaemonError::RestoreFailed(format!(
                "counter state must be 8 bytes, got {}",
                actual
            ))),
        }
    }
}
/// Builds a mesh bound to `127.0.0.1:0` with the shared test `PSK` and wraps
/// it in a fresh `DaemonRuntime`. The runtime is returned without `start()`
/// having been called.
///
/// # Panics
/// Panics if the builder cannot be created or the mesh fails to build.
async fn runtime() -> DaemonRuntime {
    let built = MeshBuilder::new("127.0.0.1:0", &PSK)
        .unwrap()
        .build()
        .await;
    let mesh = Arc::new(built.expect("build mesh"));
    DaemonRuntime::new(mesh)
}
/// Builds a `CausalEvent` fixture carrying `payload`, attributed to
/// `origin_hash` at sequence `seq`. The horizon, parent hash and receive
/// timestamp are all zero.
fn event(origin_hash: u64, seq: u64, payload: &'static [u8]) -> CausalEvent {
    let link = CausalLink {
        origin_hash,
        horizon_encoded: 0,
        sequence: seq,
        parent_hash: 0,
    };
    let payload = Bytes::from_static(payload);
    CausalEvent {
        link,
        payload,
        received_at: 0,
    }
}
/// A runtime that has had a factory registered but was never started must
/// refuse `spawn` with `DaemonError::NotReady` and must not report ready.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn registering_rejects_spawn_with_not_ready() {
    let rt = runtime().await;
    rt.register_factory("echo", || Box::new(EchoDaemon))
        .expect("register");

    let attempt = rt
        .spawn("echo", Identity::generate(), DaemonHostConfig::default())
        .await;
    match attempt.expect_err("spawn before start must fail") {
        DaemonError::NotReady => {}
        err => panic!("expected NotReady, got {err:?}"),
    }

    assert!(!rt.is_ready());
}
/// Calling `start()` twice in a row must succeed both times and leave the
/// runtime ready.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn start_is_idempotent() {
    let rt = runtime().await;

    for label in ["first start", "second start is a no-op"] {
        rt.start().await.expect(label);
    }

    assert!(rt.is_ready());
}
/// After `shutdown()`, both `spawn` and `register_factory` must fail with
/// `DaemonError::ShuttingDown`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn shutdown_rejects_subsequent_spawn_and_register() {
    let rt = runtime().await;
    rt.start().await.expect("start");
    rt.shutdown().await.expect("shutdown");

    // Spawn is checked first, then factory registration.
    let spawn_attempt = rt
        .spawn("echo", Identity::generate(), DaemonHostConfig::default())
        .await;
    match spawn_attempt.expect_err("spawn after shutdown must fail") {
        DaemonError::ShuttingDown => {}
        spawn_err => panic!("expected ShuttingDown, got {spawn_err:?}"),
    }

    let register_attempt = rt.register_factory("echo", || Box::new(EchoDaemon));
    match register_attempt.expect_err("register after shutdown must fail") {
        DaemonError::ShuttingDown => {}
        reg_err => panic!("expected ShuttingDown, got {reg_err:?}"),
    }
}
/// Registering the same kind twice must fail the second time with
/// `FactoryAlreadyRegistered` carrying the offending kind name.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn register_factory_rejects_duplicate_kind() {
    let rt = runtime().await;
    rt.register_factory("echo", || Box::new(EchoDaemon))
        .expect("first register");

    let second = rt.register_factory("echo", || Box::new(EchoDaemon));
    let err = second.expect_err("duplicate kind must fail");

    if let DaemonError::FactoryAlreadyRegistered(k) = &err {
        assert_eq!(k, "echo");
    } else {
        let other = err;
        panic!("expected FactoryAlreadyRegistered, got {other:?}");
    }
}
/// A factory registered after `start()` must be accepted, and its kind must
/// be spawnable right away.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn register_new_kind_after_ready_is_allowed() {
    let rt = runtime().await;
    rt.start().await.expect("start");

    let registered = rt.register_factory("late", || Box::new(EchoDaemon));
    registered.expect("register after start");

    let spawned = rt
        .spawn("late", Identity::generate(), DaemonHostConfig::default())
        .await;
    // Only success matters here; the handle itself is not used.
    drop(spawned.expect("spawn late-registered kind"));
}
/// End-to-end happy path: spawn an echo daemon, deliver one event, and check
/// the echoed output plus the per-daemon processed/emitted counters.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn echo_daemon_roundtrip() {
    let rt = runtime().await;
    rt.register_factory("echo", || Box::new(EchoDaemon))
        .expect("register");
    rt.start().await.expect("start");

    let identity = Identity::generate();
    // Captured before `identity` is moved into `spawn`.
    let origin_hash = identity.keypair().origin_hash();

    let spawned = rt
        .spawn("echo", identity, DaemonHostConfig::default())
        .await;
    let handle = spawned.expect("spawn");
    assert_eq!(handle.origin_hash, origin_hash);

    let ping = event(origin_hash, 1, b"ping");
    let outputs = rt.deliver(handle.origin_hash, &ping).expect("deliver");
    assert_eq!(outputs.len(), 1);
    assert_eq!(&outputs[0].payload[..], b"ping");

    let stats = handle.stats().expect("stats");
    assert_eq!(stats.events_processed, 1);
    assert_eq!(stats.events_emitted, 1);
}
/// Drives a counter daemon through five events, snapshots it, stops it, then
/// respawns it from the snapshot and checks that counting resumes at 6.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn counter_snapshot_round_trip_through_runtime() {
    /// Decodes a counter output (8 little-endian bytes) into its value.
    fn decode_count(raw: &[u8]) -> u64 {
        let mut bytes = [0u8; 8];
        bytes.copy_from_slice(raw);
        u64::from_le_bytes(bytes)
    }

    let rt = runtime().await;
    rt.register_factory("counter", || Box::new(CounterDaemon { count: 0 }))
        .expect("register");
    rt.start().await.expect("start");

    let identity = Identity::generate();
    let origin_hash = identity.keypair().origin_hash();
    let handle = rt
        .spawn("counter", identity.clone(), DaemonHostConfig::default())
        .await
        .expect("spawn");

    // Each tick must report the running count.
    for seq in 1..=5u64 {
        let tick = event(origin_hash, seq, b"tick");
        let outputs = rt.deliver(handle.origin_hash, &tick).expect("deliver");
        assert_eq!(outputs.len(), 1);
        assert_eq!(decode_count(&outputs[0].payload[..]), seq);
    }

    let taken = handle.snapshot().await.expect("snapshot");
    let snapshot = taken.expect("counter is stateful");
    assert_eq!(snapshot.through_seq, 5);

    rt.stop(handle.origin_hash).await.expect("stop");

    let rehydrated = rt
        .spawn_from_snapshot("counter", identity, snapshot, DaemonHostConfig::default())
        .await
        .expect("spawn_from_snapshot");

    let resumed = event(origin_hash, 6, b"resumed");
    let outputs = rt
        .deliver(rehydrated.origin_hash, &resumed)
        .expect("deliver after restore");
    assert_eq!(outputs.len(), 1);
    assert_eq!(decode_count(&outputs[0].payload[..]), 6);
}
/// A snapshot taken from a daemon spawned under identity A must not be
/// restorable under a freshly generated identity B.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawn_from_snapshot_rejects_identity_mismatch() {
    let rt = runtime().await;
    rt.register_factory("counter", || Box::new(CounterDaemon { count: 0 }))
        .expect("register");
    rt.start().await.expect("start");

    let ident_a = Identity::generate();
    let handle = rt
        .spawn("counter", ident_a.clone(), DaemonHostConfig::default())
        .await
        .expect("spawn");
    let taken = handle.snapshot().await.expect("snapshot");
    let snapshot = taken.expect("counter is stateful");
    rt.stop(handle.origin_hash).await.expect("stop");

    let ident_b = Identity::generate();
    let hash_a = ident_a.keypair().origin_hash();
    let hash_b = ident_b.keypair().origin_hash();
    assert_ne!(hash_a, hash_b, "fixture: fresh identity must differ",);

    let restore_attempt = rt
        .spawn_from_snapshot("counter", ident_b, snapshot, DaemonHostConfig::default())
        .await;
    match restore_attempt.expect_err("identity mismatch must be rejected") {
        DaemonError::SnapshotIdentityMismatch { .. } => {}
        err => panic!("expected SnapshotIdentityMismatch, got {err:?}"),
    }
}
/// Restoring a snapshot under a different identity must be rejected even when
/// the two identities agree on the low 32 bits of `origin_hash`.
///
/// The fixture brute-forces seeded identities until two of them share the
/// `u32` truncation of `origin_hash` but have different `entity_id`s. It then
/// snapshots a daemon spawned as A and tries to restore it as B. The rejection
/// must be `SnapshotIdentityMismatch`; a `DaemonError::Core(..)` is treated as
/// a failure with its own diagnostic.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawn_from_snapshot_checks_full_entity_id_not_just_origin_hash() {
    /// Searches seeds `0..budget` (seed = counter in the first 8 bytes,
    /// little-endian, rest zero) for two identities that collide on
    /// `origin_hash as u32` yet differ in `entity_id`.
    fn find_projection_collision(budget: u64) -> Option<(Identity, Identity)> {
        use std::collections::HashMap;

        let mut seen: HashMap<u32, Identity> = HashMap::new();
        for counter in 0..budget {
            let mut seed = [0u8; 32];
            seed[..8].copy_from_slice(&counter.to_le_bytes());
            let candidate = Identity::from_seed(seed);
            let projection = candidate.keypair().origin_hash() as u32;

            if let Some(earlier) = seen.remove(&projection) {
                if earlier.entity_id() != candidate.entity_id() {
                    return Some((earlier, candidate));
                }
            }
            // Either the slot was empty or the earlier entry was the same
            // entity; the newest identity takes the slot.
            seen.insert(projection, candidate);
        }
        None
    }

    let (ident_a, ident_b) = find_projection_collision(300_000)
        .expect("no origin_hash collision found within the attempt budget — try raising the bound");

    // Sanity-check the fixture before exercising the runtime.
    let projected_a = ident_a.keypair().origin_hash() as u32;
    let projected_b = ident_b.keypair().origin_hash() as u32;
    assert_eq!(
        projected_a, projected_b,
        "fixture: pair must collide on the u32 routing projection of origin_hash",
    );
    assert_ne!(
        ident_a.entity_id(),
        ident_b.entity_id(),
        "fixture: pair must have different entity_ids",
    );

    let rt = runtime().await;
    rt.register_factory("counter", || Box::new(CounterDaemon { count: 0 }))
        .expect("register");
    rt.start().await.expect("start");

    let handle = rt
        .spawn("counter", ident_a.clone(), DaemonHostConfig::default())
        .await
        .expect("spawn A");
    let taken = handle.snapshot().await.expect("snapshot");
    let snapshot = taken.expect("counter is stateful");
    rt.stop(handle.origin_hash).await.expect("stop");

    let restore_attempt = rt
        .spawn_from_snapshot("counter", ident_b, snapshot, DaemonHostConfig::default())
        .await;
    match restore_attempt.expect_err("collision but distinct entity_id must reject") {
        DaemonError::SnapshotIdentityMismatch { .. } => {}
        DaemonError::Core(inner) => panic!(
            "origin_hash collision slipped past the SDK check and was only caught by the \
             core backstop ({inner:?}); the SDK must do its own full-entity_id check",
        ),
        other => panic!("expected SnapshotIdentityMismatch, got {other:?}"),
    }
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn start_installs_handler_before_publishing_ready() {
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering as AOrd};
use std::sync::Arc as StdArc;
for trial in 0..64u32 {
let rt = runtime().await;
let mesh = rt.mesh().clone();
let gap_witnessed = StdArc::new(AtomicBool::new(false));
let observer_done = StdArc::new(AtomicBool::new(false));
let first_ready_tick = StdArc::new(AtomicU32::new(0));
let rt_w = rt.clone();
let mesh_w = mesh.clone();
let gap_w = gap_witnessed.clone();
let done_w = observer_done.clone();
let first_w = first_ready_tick.clone();
let observer = std::thread::spawn(move || {
let mut ticks = 0u32;
loop {
if rt_w.is_ready() {
if !mesh_w.inner().has_migration_handler() {
gap_w.store(true, AOrd::Release);
}
first_w.store(ticks, AOrd::Release);
break;
}
ticks = ticks.saturating_add(1);
if ticks & 0xFFFF == 0 {
std::thread::yield_now();
}
if done_w.load(AOrd::Acquire) {
return;
}
}
});
std::thread::sleep(std::time::Duration::from_micros(50));
rt.start().await.expect("start");
observer_done.store(true, AOrd::Release);
observer.join().expect("observer panicked");
assert!(
!gap_witnessed.load(AOrd::Acquire),
"trial {trial}: observed Ready-without-handler gap — start() flipped state \
to Ready before set_migration_handler completed",
);
let _ = first_ready_tick.load(AOrd::Acquire); }
}
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn register_factory_concurrent_same_kind_admits_exactly_one() {
use std::sync::atomic::{AtomicU32, Ordering as AOrd};
let rt = runtime().await;
let rt_shared = std::sync::Arc::new(rt);
const THREADS: u32 = 32;
let oks = std::sync::Arc::new(AtomicU32::new(0));
let dupes = std::sync::Arc::new(AtomicU32::new(0));
let mut handles = Vec::with_capacity(THREADS as usize);
for _ in 0..THREADS {
let rt_c = rt_shared.clone();
let oks_c = oks.clone();
let dupes_c = dupes.clone();
handles.push(tokio::spawn(async move {
let r = rt_c.register_factory("contended-kind", || Box::new(EchoDaemon));
match r {
Ok(()) => {
oks_c.fetch_add(1, AOrd::AcqRel);
}
Err(DaemonError::FactoryAlreadyRegistered(_)) => {
dupes_c.fetch_add(1, AOrd::AcqRel);
}
other => panic!("unexpected register_factory result: {other:?}"),
}
}));
}
for h in handles {
h.await.expect("task panicked");
}
assert_eq!(
oks.load(AOrd::Acquire),
1,
"exactly one concurrent register_factory call must succeed; got {}",
oks.load(AOrd::Acquire),
);
assert_eq!(
dupes.load(AOrd::Acquire),
THREADS - 1,
"the other {} callers must see FactoryAlreadyRegistered",
THREADS - 1,
);
}
/// Races `start()` against `shutdown()`: `start()` is launched on its own
/// task after `set_start_stall_ms(100)`, and `shutdown()` is called 20 ms
/// later, once the handler is observed to be installed. The losing `start()`
/// must return `Err(ShuttingDown)` and the mesh must end up with no migration
/// handler.
/// NOTE(review): `set_start_stall_ms` is presumably a test hook that delays
/// `start()` after the handler install — confirm against the runtime.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn start_losing_race_to_shutdown_clears_handler() {
    let rt = runtime().await;
    rt.set_start_stall_ms(100);

    let racing_rt = rt.clone();
    let start_task = tokio::spawn(async move { racing_rt.start().await });

    tokio::time::sleep(std::time::Duration::from_millis(20)).await;
    let installed_mid_start = rt.mesh().inner().has_migration_handler();
    assert!(
        installed_mid_start,
        "fixture: start should have installed the handler by now",
    );

    rt.shutdown().await.expect("shutdown Ok");

    match start_task.await.expect("start task panicked") {
        Err(DaemonError::ShuttingDown) => {}
        other => panic!("start racing with shutdown must return Err(ShuttingDown); got {other:?}",),
    }

    let still_installed = rt.mesh().inner().has_migration_handler();
    assert!(
        !still_installed,
        "start() that lost the race to shutdown must clear its own \
         handler install — leaving it on the mesh would keep the \
         runtime's internals wired into inbound migration traffic \
         after the runtime has already been torn down",
    );
}
/// `start()` installs the mesh migration handler; `shutdown()` must remove it.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn shutdown_clears_migration_handler() {
    let rt = runtime().await;

    rt.start().await.expect("start");
    let installed_after_start = rt.mesh().inner().has_migration_handler();
    assert!(
        installed_after_start,
        "fixture: handler should be installed after start",
    );

    rt.shutdown().await.expect("shutdown");
    let installed_after_shutdown = rt.mesh().inner().has_migration_handler();
    assert!(
        !installed_after_shutdown,
        "shutdown must uninstall the handler — leaving it installed \
         keeps the torn-down runtime wired into inbound migration \
         traffic",
    );
}
/// Races `spawn()` against `shutdown()`: a spawn is launched on its own task
/// after `set_spawn_stall_ms(100)`, and `shutdown()` is called 20 ms later.
/// Afterwards the registry must be empty and the spawn must have returned
/// `Err(ShuttingDown)`.
/// NOTE(review): `set_spawn_stall_ms` is presumably a test hook that delays
/// `spawn()` part-way through — confirm against the runtime.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn spawn_racing_with_shutdown_does_not_leave_daemon_registered() {
    let rt = runtime().await;
    rt.register_factory("echo", || Box::new(EchoDaemon))
        .unwrap();
    rt.start().await.unwrap();
    rt.set_spawn_stall_ms(100);

    let racing_rt = rt.clone();
    let spawn_task = tokio::spawn(async move {
        let identity = Identity::generate();
        racing_rt
            .spawn("echo", identity, DaemonHostConfig::default())
            .await
    });

    tokio::time::sleep(std::time::Duration::from_millis(20)).await;
    rt.shutdown().await.expect("shutdown Ok");

    let spawn_result = spawn_task.await.expect("spawn task panicked");

    // The registry check runs first so a leaked daemon is reported with the
    // spawn outcome attached.
    let remaining = rt.daemon_count();
    assert_eq!(
        remaining, 0,
        "daemon survived shutdown (spawn returned {spawn_result:?}) — \
         pre-fix race left an entry in the registry when shutdown's \
         sweep completed before spawn's inserts",
    );

    match spawn_result {
        Err(DaemonError::ShuttingDown) => {}
        other => panic!(
            "spawn racing with shutdown must return Err(ShuttingDown) \
             when shutdown completed during the stall; got {other:?}",
        ),
    }
}
/// Spawning a kind that has no registered factory must fail with
/// `FactoryNotFound` carrying the requested kind name.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawn_unknown_kind_errors() {
    let rt = runtime().await;
    rt.start().await.expect("start");

    let attempt = rt
        .spawn(
            "never-registered",
            Identity::generate(),
            DaemonHostConfig::default(),
        )
        .await;
    let err = attempt.expect_err("unknown kind must fail");

    if let DaemonError::FactoryNotFound(k) = &err {
        assert_eq!(k, "never-registered");
    } else {
        let other = err;
        panic!("expected FactoryNotFound, got {other:?}");
    }
}
/// Spawning twice with the same identity must fail the second time with a
/// core `ProcessFailed("…already registered…")` error, leave exactly one
/// daemon registered, and leave the first daemon still answering `snapshot`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn spawn_same_identity_twice_is_rejected() {
    let rt = runtime().await;
    rt.register_factory("echo", || Box::new(EchoDaemon))
        .expect("register");
    rt.start().await.expect("start");

    let identity = Identity::generate();
    let _handle = rt
        .spawn("echo", identity.clone(), DaemonHostConfig::default())
        .await
        .expect("first spawn");

    let second = rt
        .spawn("echo", identity.clone(), DaemonHostConfig::default())
        .await;
    let err = second.expect_err("second spawn with same identity must fail");

    let m = match &err {
        DaemonError::Core(CoreDaemonError::ProcessFailed(m)) => m,
        other => panic!(
            "expected Core(ProcessFailed(already registered)) from atomic factory_registry; \
             got {other:?} — collision caught too late may mean the incumbent's slot was clobbered",
        ),
    };
    assert!(
        m.contains("already registered"),
        "expected 'already registered' in message, got {m:?}",
    );

    assert_eq!(rt.daemon_count(), 1);

    // The incumbent must still be reachable under the same origin hash.
    let incumbent_hash = identity.keypair().origin_hash();
    let snapshot = rt.snapshot(incumbent_hash).await.expect("snapshot");
    assert!(
        snapshot.is_none(),
        "EchoDaemon is stateless, so snapshot returns Ok(None)",
    );
}
/// A `spawn_from_snapshot` that collides with a still-running daemon must be
/// rejected without disturbing the incumbent: after two ticks and a failed
/// duplicate restore, the next tick must report 3.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn duplicate_spawn_from_snapshot_does_not_corrupt_first_daemon() {
    let rt = runtime().await;
    rt.register_factory("counter", || Box::new(CounterDaemon { count: 0 }))
        .expect("register");
    rt.start().await.expect("start");

    let identity = Identity::generate();
    let handle = rt
        .spawn("counter", identity.clone(), DaemonHostConfig::default())
        .await
        .expect("first spawn");
    let origin = handle.origin_hash;

    for seq in 1..=2u64 {
        let tick = event(origin, seq, b"tick");
        rt.deliver(origin, &tick).expect("deliver");
    }

    let taken = handle.snapshot().await.expect("snapshot");
    let snapshot = taken.expect("counter is stateful");

    // The incumbent is deliberately NOT stopped before the restore attempt.
    let duplicate = rt
        .spawn_from_snapshot(
            "counter",
            identity.clone(),
            snapshot,
            DaemonHostConfig::default(),
        )
        .await;
    let err = duplicate.expect_err("duplicate spawn_from_snapshot must fail");

    let m = match &err {
        DaemonError::Core(CoreDaemonError::ProcessFailed(m)) => m,
        other => panic!("expected Core(ProcessFailed), got {other:?}"),
    };
    assert!(
        m.contains("already registered"),
        "expected 'already registered' in message, got {m:?}",
    );

    assert_eq!(rt.daemon_count(), 1);

    let post_dupe = event(origin, 3, b"post-dupe");
    let outputs = rt
        .deliver(origin, &post_dupe)
        .expect("deliver after failed duplicate");

    let mut raw = [0u8; 8];
    raw.copy_from_slice(&outputs[0].payload);
    let observed = u64::from_le_bytes(raw);
    assert_eq!(
        observed, 3,
        "incumbent counter must continue at 3, not reset",
    );
}
/// Calling `expect_migration` twice for the same origin hash must fail the
/// second time with a core `ProcessFailed("…already registered…")` error.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn expect_migration_collision_is_rejected() {
    let rt = runtime().await;
    rt.register_factory("echo", || Box::new(EchoDaemon))
        .expect("register");
    rt.start().await.expect("start");

    let origin_hash = 0xDEAD_BEEFu64;

    let first = rt.expect_migration("echo", origin_hash, DaemonHostConfig::default());
    first.expect("first expect_migration");

    let second = rt.expect_migration("echo", origin_hash, DaemonHostConfig::default());
    let err = second.expect_err("duplicate expect_migration must fail");

    let m = match &err {
        DaemonError::Core(CoreDaemonError::ProcessFailed(m)) => m,
        other => panic!("expected Core(ProcessFailed), got {other:?}"),
    };
    assert!(m.contains("already registered"), "got {m:?}");
}
/// `stop` must remove the daemon from the registry: the count drops from 1 to
/// 0 and a later `deliver` fails with core `NotFound` for that origin hash.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn stop_drops_daemon_from_registry() {
    let rt = runtime().await;
    rt.register_factory("echo", || Box::new(EchoDaemon))
        .expect("register");
    rt.start().await.expect("start");

    let handle = rt
        .spawn("echo", Identity::generate(), DaemonHostConfig::default())
        .await
        .expect("spawn");
    let target = handle.origin_hash;
    assert_eq!(rt.daemon_count(), 1);

    rt.stop(target).await.expect("stop");
    assert_eq!(rt.daemon_count(), 0);

    let late_event = event(target, 1, b"drop");
    let delivery = rt.deliver(target, &late_event);
    match delivery.expect_err("deliver to gone daemon must fail") {
        DaemonError::Core(CoreDaemonError::NotFound(o)) => assert_eq!(o, handle.origin_hash),
        other => panic!("expected Core(NotFound), got {other:?}"),
    }
}
/// `shutdown` must empty the registry: five spawned daemons go to zero.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn shutdown_drains_all_daemons() {
    let rt = runtime().await;
    rt.register_factory("echo", || Box::new(EchoDaemon))
        .expect("register");
    rt.start().await.expect("start");

    for _ in 0..5 {
        let spawned = rt
            .spawn("echo", Identity::generate(), DaemonHostConfig::default())
            .await;
        // Handles are not needed; only the registry count is checked.
        drop(spawned.expect("spawn"));
    }
    assert_eq!(rt.daemon_count(), 5);

    rt.shutdown().await.expect("shutdown");
    assert_eq!(rt.daemon_count(), 0);
}
/// The registered factory closure must run exactly once per `spawn`: three
/// spawns leave the invocation counter at 3.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn factory_is_invoked_once_per_spawn() {
    let invocations = Arc::new(AtomicU32::new(0));
    let rt = runtime().await;

    let invocations_in_factory = Arc::clone(&invocations);
    let registered = rt.register_factory("echo", move || {
        invocations_in_factory.fetch_add(1, Ordering::SeqCst);
        Box::new(EchoDaemon)
    });
    registered.expect("register");

    rt.start().await.expect("start");

    for _ in 0..3 {
        let spawned = rt
            .spawn("echo", Identity::generate(), DaemonHostConfig::default())
            .await;
        drop(spawned.expect("spawn"));
    }

    assert_eq!(invocations.load(Ordering::SeqCst), 3);
}