#![cfg(feature = "groups")]
use std::sync::Arc;
use bytes::Bytes;
use net::adapter::net::behavior::capability::{CapabilityAnnouncement, CapabilitySet};
use net::adapter::net::behavior::loadbalance::{RequestContext, Strategy};
use net::adapter::net::compute::DaemonError as CoreDaemonError;
use net::adapter::net::identity::EntityId;
use net::adapter::net::state::causal::CausalEvent;
use net_sdk::capabilities::CapabilityFilter;
use net_sdk::compute::{DaemonHostConfig, DaemonRuntime, MeshDaemon};
use net_sdk::groups::{
ForkGroup, ForkGroupConfig, GroupError, GroupHealth, ReplicaGroup, ReplicaGroupConfig,
StandbyGroup, StandbyGroupConfig,
};
use net_sdk::mesh::MeshBuilder;
const PSK: [u8; 32] = [0x42u8; 32];
struct NoopDaemon;
impl MeshDaemon for NoopDaemon {
fn name(&self) -> &str {
"noop"
}
fn requirements(&self) -> CapabilityFilter {
CapabilityFilter::default()
}
fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, CoreDaemonError> {
Ok(vec![])
}
}
async fn runtime_with_peers(extra_peers: usize) -> DaemonRuntime {
let mesh = MeshBuilder::new("127.0.0.1:0", &PSK)
.unwrap()
.build()
.await
.expect("build mesh");
let filler_eid = EntityId::from_bytes([0u8; 32]);
for i in 0..extra_peers {
let node_id = 0x1000_0000_0000_0000u64 + (i as u64 + 1);
mesh.inner()
.test_inject_capability_announcement(CapabilityAnnouncement::new(
node_id,
filler_eid.clone(),
1,
CapabilitySet::new(),
));
}
let rt = DaemonRuntime::new(Arc::new(mesh));
rt.register_factory("noop", || Box::new(NoopDaemon))
.expect("register factory");
rt.start().await.expect("start runtime");
rt
}
fn replica_config(n: u8, seed: u8) -> ReplicaGroupConfig {
ReplicaGroupConfig {
replica_count: n,
group_seed: [seed; 32],
lb_strategy: Strategy::RoundRobin,
host_config: DaemonHostConfig::default(),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn replica_group_spawn_registers_members_and_reports_health() {
let rt = runtime_with_peers(3).await;
let group =
ReplicaGroup::spawn(&rt, "noop", replica_config(3, 0x11)).expect("spawn replica group");
assert_eq!(group.replica_count(), 3);
assert_eq!(group.healthy_count(), 3);
assert_eq!(group.health(), GroupHealth::Healthy);
assert_eq!(rt.daemon_count(), 3);
let replicas = group.replicas();
assert_eq!(replicas.len(), 3);
for r in &replicas {
assert!(r.healthy);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn replica_group_route_event_returns_live_origin() {
let rt = runtime_with_peers(3).await;
let group =
ReplicaGroup::spawn(&rt, "noop", replica_config(3, 0x22)).expect("spawn replica group");
let live: std::collections::HashSet<u64> =
group.replicas().iter().map(|m| m.origin_hash).collect();
for i in 0..30u64 {
let ctx = RequestContext::new().with_routing_key(format!("req-{i}"));
let origin = group.route_event(&ctx).expect("route");
assert!(
live.contains(&origin),
"route returned {:#x}; not in live set {:?}",
origin,
live
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn replica_group_scale_up_and_down_tracks_daemon_count() {
let rt = runtime_with_peers(5).await;
let group = ReplicaGroup::spawn(&rt, "noop", replica_config(2, 0x33)).expect("spawn");
assert_eq!(rt.daemon_count(), 2);
group.scale_to(5).expect("scale up");
assert_eq!(group.replica_count(), 5);
assert_eq!(rt.daemon_count(), 5);
group.scale_to(1).expect("scale down");
assert_eq!(group.replica_count(), 1);
assert_eq!(rt.daemon_count(), 1);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn replica_group_spawn_on_not_ready_runtime_errors() {
let mesh = MeshBuilder::new("127.0.0.1:0", &PSK)
.unwrap()
.build()
.await
.expect("build mesh");
let rt = DaemonRuntime::new(Arc::new(mesh));
rt.register_factory("noop", || Box::new(NoopDaemon))
.expect("register");
let err = ReplicaGroup::spawn(&rt, "noop", replica_config(2, 0x44))
.expect_err("spawn before start must error");
assert!(matches!(err, GroupError::NotReady));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn replica_group_unknown_kind_errors_with_factory_not_found() {
let rt = runtime_with_peers(2).await;
let err = ReplicaGroup::spawn(&rt, "never-registered", replica_config(2, 0x55))
.expect_err("unknown kind must error");
match err {
GroupError::FactoryNotFound(k) => assert_eq!(k, "never-registered"),
other => panic!("expected FactoryNotFound, got {other:?}"),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fork_group_forks_produce_unique_origins_with_verifiable_lineage() {
let rt = runtime_with_peers(4).await;
let parent_origin: u64 = 0xabcd_ef01;
let fork_seq: u64 = 42;
let group = ForkGroup::fork(
&rt,
"noop",
parent_origin,
fork_seq,
ForkGroupConfig {
fork_count: 3,
lb_strategy: Strategy::RoundRobin,
host_config: DaemonHostConfig::default(),
},
)
.expect("fork");
assert_eq!(group.fork_count(), 3);
assert_eq!(group.parent_origin(), parent_origin);
assert_eq!(group.fork_seq(), fork_seq);
let members = group.members();
let mut origins: Vec<u64> = members.iter().map(|m| m.origin_hash).collect();
origins.sort_unstable();
origins.dedup();
assert_eq!(origins.len(), 3, "each fork must have a unique origin_hash");
assert!(group.verify_lineage());
let records = group.fork_records();
assert_eq!(records.len(), 3);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fork_group_spawn_on_not_ready_runtime_errors() {
let mesh = MeshBuilder::new("127.0.0.1:0", &PSK)
.unwrap()
.build()
.await
.expect("build mesh");
let rt = DaemonRuntime::new(Arc::new(mesh));
rt.register_factory("noop", || Box::new(NoopDaemon))
.expect("register");
let err = ForkGroup::fork(
&rt,
"noop",
0x1234,
1,
ForkGroupConfig {
fork_count: 2,
lb_strategy: Strategy::RoundRobin,
host_config: DaemonHostConfig::default(),
},
)
.expect_err("fork on not-ready runtime");
assert!(matches!(err, GroupError::NotReady));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn standby_group_spawn_member_zero_is_active() {
let rt = runtime_with_peers(3).await;
let group = StandbyGroup::spawn(
&rt,
"noop",
StandbyGroupConfig {
member_count: 3,
group_seed: [0x77; 32],
host_config: DaemonHostConfig::default(),
},
)
.expect("spawn standby");
assert_eq!(group.member_count(), 3);
assert_eq!(group.standby_count(), 2);
assert_eq!(group.active_index(), 0);
assert!(group.active_healthy());
assert!(group.active_origin() != 0);
assert_eq!(group.buffered_event_count(), 0);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn standby_group_member_count_below_two_is_rejected() {
let rt = runtime_with_peers(3).await;
let err = StandbyGroup::spawn(
&rt,
"noop",
StandbyGroupConfig {
member_count: 1,
group_seed: [0x88; 32],
host_config: DaemonHostConfig::default(),
},
)
.expect_err("member_count=1 must error");
match err {
GroupError::Core(_) => {}
other => panic!("expected GroupError::Core, got {other:?}"),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn standby_group_unknown_kind_errors() {
let rt = runtime_with_peers(3).await;
let err = StandbyGroup::spawn(
&rt,
"never-registered",
StandbyGroupConfig {
member_count: 2,
group_seed: [0x99; 32],
host_config: DaemonHostConfig::default(),
},
)
.expect_err("unknown kind");
match err {
GroupError::FactoryNotFound(k) => assert_eq!(k, "never-registered"),
other => panic!("expected FactoryNotFound, got {other:?}"),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn standby_group_buffers_events_without_manual_hook() {
use ::net::adapter::net::state::causal::{CausalEvent, CausalLink};
use bytes::Bytes;
let rt = runtime_with_peers(3).await;
let group = StandbyGroup::spawn(
&rt,
"noop",
StandbyGroupConfig {
member_count: 3,
group_seed: [0xAAu8; 32],
host_config: DaemonHostConfig::default(),
},
)
.expect("spawn standby");
let active = group.active_origin();
assert_eq!(group.buffered_event_count(), 0, "buffer starts empty");
for i in 1..=3u64 {
let event = CausalEvent {
link: CausalLink {
origin_hash: active,
horizon_encoded: 0,
sequence: i,
parent_hash: 0,
},
payload: Bytes::from(format!("tick-{i}")),
received_at: 0,
};
rt.deliver(active, &event).expect("deliver");
}
assert_eq!(
group.buffered_event_count(),
3,
"expected 3 buffered events from automatic observer",
);
}