use std::collections::HashSet;
use crate::adapter::net::behavior::loadbalance::{RequestContext, Strategy};
use crate::adapter::net::behavior::metadata::NodeId;
use crate::adapter::net::behavior::placement::{Artifact, PlacementFilter, TieBreakContext};
use crate::adapter::net::compute::daemon::{DaemonHostConfig, MeshDaemon};
use crate::adapter::net::compute::group_coord::{
GroupCoordinator, GroupError, GroupHealth, MemberInfo,
};
use crate::adapter::net::compute::host::DaemonHost;
use crate::adapter::net::compute::registry::DaemonRegistry;
use crate::adapter::net::compute::scheduler::Scheduler;
use crate::adapter::net::identity::EntityKeypair;
/// Subprotocol identifier (`0x0900`) for replica groups.
///
/// NOTE(review): not referenced anywhere in this file — presumably consumed by
/// the wire/dispatch layer; confirm against callers.
pub const SUBPROTOCOL_REPLICA_GROUP: u16 = 0x0900;
/// Configuration used to create and resize a [`ReplicaGroup`].
#[derive(Debug, Clone)]
pub struct ReplicaGroupConfig {
/// Number of replicas to create. The spawn and scale entry points reject `0`
/// and the scale entry points overwrite this field on success.
pub replica_count: u8,
/// Seed from which each replica keypair (see [`derive_replica_keypair`]) and
/// the group id are derived.
pub group_seed: [u8; 32],
/// Strategy handed to `GroupCoordinator::new` when the group is created.
pub lb_strategy: Strategy,
/// Host configuration cloned into every replica's `DaemonHost`.
pub host_config: DaemonHostConfig,
}
/// Deterministically derives the keypair of replica `index` in a group.
///
/// The 32-byte secret is the BLAKE2s MAC, keyed with the fixed label
/// `b"net-replica-v1"`, over the 33-byte message `group_seed || index`.
/// Calling this twice with the same `(group_seed, index)` yields the same
/// keypair, which is what lets a replaced replica keep its identity.
///
/// Changing the label or the message layout would change every derived
/// identity, so both must stay exactly as they are.
///
/// # Panics
///
/// Only if `Blake2sMac::new_from_slice` rejects the key, which cannot happen
/// for the fixed 14-byte label used here.
#[expect(
    clippy::expect_used,
    reason = "Blake2sMac::new_from_slice rejects only keys longer than 32 bytes; b\"net-replica-v1\" is a 14-byte compile-time-constant label"
)]
pub fn derive_replica_keypair(group_seed: &[u8; 32], index: u8) -> EntityKeypair {
    use blake2::{
        digest::{consts::U32, Mac},
        Blake2sMac,
    };

    // MAC message: the 32-byte seed followed by the one-byte replica index.
    let mut input = [0u8; 33];
    input[..32].copy_from_slice(group_seed);
    input[32] = index;

    let mut mac = <Blake2sMac<U32> as Mac>::new_from_slice(b"net-replica-v1")
        .expect("BLAKE2s accepts variable-length keys");
    Mac::update(&mut mac, &input);

    // The 32-byte tag is used directly as the secret key material.
    let secret: [u8; 32] = mac.finalize().into_bytes().into();
    EntityKeypair::from_bytes(secret)
}
/// A set of daemon replicas created from one [`ReplicaGroupConfig`] and
/// tracked through a `GroupCoordinator`.
pub struct ReplicaGroup {
// Low 32 bits of the XXH3-64 hash of the group seed (computed when spawning).
group_id: u32,
// Configuration the group was created with; `replica_count` is rewritten by
// the scale methods.
config: ReplicaGroupConfig,
// Membership, health and routing state of the replicas.
coord: GroupCoordinator,
// Starts at 1; incremented by `try_recover_inner` whenever a recovery pass
// re-places at least one slot.
term: u64,
}
impl ReplicaGroup {
/// Creates a replica group by placing, instantiating and registering
/// `config.replica_count` daemons.
///
/// For each index in `0..replica_count` the replica keypair is derived from the
/// group seed, a node is chosen with `GroupCoordinator::place_with_spread`
/// (excluding nodes already used by this call), a daemon is built with
/// `daemon_factory`, wrapped in a `DaemonHost` and registered in `registry`.
/// `daemon_factory` is also called once up front to read the daemon's
/// requirements.
///
/// # Errors
///
/// * `GroupError::InvalidConfig` when `config.replica_count` is `0`.
/// * Any error from placement or registration. In that case every host this
///   call had already registered is unregistered again (best effort), so a
///   failed spawn does not leave ownerless replicas in the registry.
pub fn spawn<F>(
    config: ReplicaGroupConfig,
    daemon_factory: F,
    scheduler: &Scheduler,
    registry: &DaemonRegistry,
) -> Result<Self, GroupError>
where
    F: Fn() -> Box<dyn MeshDaemon>,
{
    if config.replica_count == 0 {
        return Err(GroupError::InvalidConfig(
            "replica_count must be > 0".into(),
        ));
    }
    let group_id = {
        use xxhash_rust::xxh3::xxh3_64;
        // Deliberate truncation: the id is the low 32 bits of the 64-bit hash.
        xxh3_64(&config.group_seed) as u32
    };
    let mut coord = GroupCoordinator::new(config.lb_strategy);
    let mut used_nodes: HashSet<u64> = HashSet::new();
    // Origin hashes registered by this call, kept for rollback on failure.
    let mut registered = Vec::with_capacity(usize::from(config.replica_count));
    let requirements = daemon_factory().requirements();

    let outcome = (0..config.replica_count).try_for_each(|index| -> Result<(), GroupError> {
        let keypair = derive_replica_keypair(&config.group_seed, index);
        let origin_hash = keypair.origin_hash();
        let entity_id_bytes: NodeId = *keypair.entity_id().as_bytes();
        let placement =
            GroupCoordinator::place_with_spread(scheduler, &requirements, &used_nodes)?;
        let node_id = placement.node_id;
        used_nodes.insert(node_id);
        let daemon = daemon_factory();
        let host = DaemonHost::new(daemon, keypair, config.host_config.clone());
        registry.register(host)?;
        registered.push(origin_hash);
        coord.add_member(MemberInfo {
            index,
            origin_hash,
            node_id,
            entity_id_bytes,
            healthy: true,
        });
        Ok(())
    });

    if let Err(err) = outcome {
        // The group is never handed to the caller, so nobody else could clean
        // these registrations up. Unregister failures are ignored, as in scale-down.
        for origin_hash in registered {
            let _ = registry.unregister(origin_hash);
        }
        return Err(err);
    }

    Ok(Self {
        group_id,
        config,
        coord,
        term: 1,
    })
}
/// Picks the replica that should handle a request by delegating to the
/// coordinator's `route_event`.
///
/// NOTE(review): the returned `u64` is the chosen replica's origin hash
/// according to the tests in this file — confirm against `GroupCoordinator`.
pub fn route_event(&self, ctx: &RequestContext) -> Result<u64, GroupError> {
self.coord.route_event(ctx)
}
/// Returns the group's current term: `1` at spawn, incremented by each
/// recovery pass that re-places at least one slot.
pub fn term(&self) -> u64 {
self.term
}
/// Resizes the group to exactly `n` replicas.
///
/// * Growing: replicas for indices `current..n` are placed with
///   `GroupCoordinator::place_with_spread` (avoiding nodes already used by the
///   group), built with `daemon_factory`, registered and added to the coordinator.
/// * Shrinking: members are removed from the end and unregistered until `n` remain.
/// * Equal: nothing but the stored `replica_count` is touched.
///
/// # Errors
///
/// `GroupError::InvalidConfig` when `n` is `0`; otherwise any placement or
/// registration error raised while growing. Replicas added before the failing
/// one stay in the group.
pub fn scale_to<F>(
    &mut self,
    n: u8,
    daemon_factory: F,
    scheduler: &Scheduler,
    registry: &DaemonRegistry,
) -> Result<(), GroupError>
where
    F: Fn() -> Box<dyn MeshDaemon>,
{
    use std::cmp::Ordering;

    if n == 0 {
        return Err(GroupError::InvalidConfig(
            "replica_count must be > 0".into(),
        ));
    }

    let current = self.coord.member_count();
    match n.cmp(&current) {
        Ordering::Greater => {
            let requirements = daemon_factory().requirements();
            // Nodes that already host a member of this group are off limits.
            let mut occupied: HashSet<u64> = HashSet::new();
            for member in self.coord.members() {
                occupied.insert(member.node_id);
            }
            for index in current..n {
                let keypair = derive_replica_keypair(&self.config.group_seed, index);
                let origin_hash = keypair.origin_hash();
                let entity_id_bytes: NodeId = *keypair.entity_id().as_bytes();
                let node_id =
                    GroupCoordinator::place_with_spread(scheduler, &requirements, &occupied)?
                        .node_id;
                occupied.insert(node_id);
                let host =
                    DaemonHost::new(daemon_factory(), keypair, self.config.host_config.clone());
                registry.register(host)?;
                self.coord.add_member(MemberInfo {
                    index,
                    origin_hash,
                    node_id,
                    entity_id_bytes,
                    healthy: true,
                });
            }
        }
        Ordering::Less => {
            while self.coord.member_count() > n {
                match self.coord.remove_last() {
                    Some(removed) => {
                        // Best effort: an unregister failure does not stop the scale-down.
                        let _ = registry.unregister(removed.origin_hash);
                    }
                    None => {
                        debug_assert!(
                            false,
                            "member_count > n but remove_last is None — coord invariant violation",
                        );
                        break;
                    }
                }
            }
        }
        Ordering::Equal => {}
    }

    self.config.replica_count = n;
    Ok(())
}
/// Reacts to the failure of `failed_node_id` by moving every replica that
/// lived on it to another node.
///
/// Each affected member is first marked unhealthy. Its keypair is then
/// re-derived from the group seed (so the replica keeps its identity), a new
/// node is chosen with `GroupCoordinator::place_with_spread`, and a fresh host
/// built from `daemon_factory` is installed with `registry.replace`. The failed
/// node and every node chosen during this call are excluded from placement.
///
/// When placement fails for a member, a warning is logged and the member is
/// left unhealthy with its existing registration untouched, so a later recovery
/// can restore it.
///
/// Returns the indices of the members that were successfully re-placed. The
/// visible code never returns `Err`.
pub fn on_node_failure<F>(
    &mut self,
    failed_node_id: u64,
    daemon_factory: F,
    scheduler: &Scheduler,
    registry: &DaemonRegistry,
) -> Result<Vec<u8>, GroupError>
where
    F: Fn() -> Box<dyn MeshDaemon>,
{
    let mut replaced = Vec::new();
    let requirements = daemon_factory().requirements();
    let mut exclude: HashSet<u64> = HashSet::from([failed_node_id]);
    let affected = self.coord.members_on_node(failed_node_id);
    for index in affected {
        self.coord.mark_unhealthy(index);
        // Same seed + index => same keypair, so the replacement keeps the
        // slot's origin hash and entity id.
        let keypair = derive_replica_keypair(&self.config.group_seed, index);
        let entity_id_bytes: NodeId = *keypair.entity_id().as_bytes();
        let placement =
            match GroupCoordinator::place_with_spread(scheduler, &requirements, &exclude) {
                Ok(p) => p,
                Err(e) => {
                    tracing::warn!(
                        index,
                        error = %e,
                        "ReplicaGroup::on_node_failure: place_with_spread failed; \
                         slot left registered for later recovery (#7)"
                    );
                    continue;
                }
            };
        let daemon = daemon_factory();
        let host = DaemonHost::new(daemon, keypair, self.config.host_config.clone());
        registry.replace(host);
        self.coord
            .update_member_placement(index, placement.node_id, entity_id_bytes);
        exclude.insert(placement.node_id);
        replaced.push(index);
    }
    Ok(replaced)
}
/// Creates a replica group like [`ReplicaGroup::spawn`], but chooses nodes
/// through `GroupCoordinator::place_member` using the supplied
/// `PlacementFilter` and tie-break context.
///
/// A prototype daemon from `daemon_factory` supplies the requirements and the
/// required/optional capabilities. For each replica an `Artifact::Daemon`
/// describing it is passed to the placement call, and nodes already used by
/// this call are excluded.
///
/// # Errors
///
/// * `GroupError::InvalidConfig` when `config.replica_count` is `0`.
/// * Any error from placement or registration. In that case every host this
///   call had already registered is unregistered again (best effort), so a
///   failed spawn does not leave ownerless replicas in the registry.
pub fn spawn_with_placement<F>(
    config: ReplicaGroupConfig,
    daemon_factory: F,
    scheduler: &Scheduler,
    registry: &DaemonRegistry,
    placement: &dyn PlacementFilter,
    tie_break: &TieBreakContext<'_>,
) -> Result<Self, GroupError>
where
    F: Fn() -> Box<dyn MeshDaemon>,
{
    if config.replica_count == 0 {
        return Err(GroupError::InvalidConfig(
            "replica_count must be > 0".into(),
        ));
    }
    let group_id = {
        use xxhash_rust::xxh3::xxh3_64;
        // Deliberate truncation: the id is the low 32 bits of the 64-bit hash.
        xxh3_64(&config.group_seed) as u32
    };
    let mut coord = GroupCoordinator::new(config.lb_strategy);
    let mut used_nodes: HashSet<u64> = HashSet::new();
    // Origin hashes registered by this call, kept for rollback on failure.
    let mut registered = Vec::with_capacity(usize::from(config.replica_count));

    // One throw-away daemon instance is enough to read the placement inputs.
    let prototype = daemon_factory();
    let requirements = prototype.requirements();
    let required = prototype.required_capabilities();
    let optional = prototype.optional_capabilities();
    drop(prototype);

    let outcome = (0..config.replica_count).try_for_each(|index| -> Result<(), GroupError> {
        let keypair = derive_replica_keypair(&config.group_seed, index);
        let origin_hash = keypair.origin_hash();
        let entity_id_bytes: NodeId = *keypair.entity_id().as_bytes();
        let artifact = Artifact::Daemon {
            daemon_id: entity_id_bytes,
            required: &required,
            optional: &optional,
        };
        let decision = GroupCoordinator::place_member(
            scheduler,
            &artifact,
            &requirements,
            &used_nodes,
            placement,
            tie_break,
        )?;
        let node_id = decision.node_id;
        used_nodes.insert(node_id);
        let daemon = daemon_factory();
        let host = DaemonHost::new(daemon, keypair, config.host_config.clone());
        registry.register(host)?;
        registered.push(origin_hash);
        coord.add_member(MemberInfo {
            index,
            origin_hash,
            node_id,
            entity_id_bytes,
            healthy: true,
        });
        Ok(())
    });

    if let Err(err) = outcome {
        // The group is never handed to the caller, so nobody else could clean
        // these registrations up. Unregister failures are ignored, as in scale-down.
        for origin_hash in registered {
            let _ = registry.unregister(origin_hash);
        }
        return Err(err);
    }

    Ok(Self {
        group_id,
        config,
        coord,
        term: 1,
    })
}
/// Resizes the group to exactly `n` replicas, choosing nodes for new replicas
/// through `GroupCoordinator::place_member` with the supplied filter and
/// tie-break context.
///
/// Shrinking removes members from the end and unregisters them; the placement
/// filter is not consulted on that path. When `n` equals the current size only
/// the stored `replica_count` is rewritten.
///
/// # Errors
///
/// `GroupError::InvalidConfig` when `n` is `0`; otherwise any placement or
/// registration error raised while growing. Replicas added before the failing
/// one stay in the group.
pub fn scale_to_with_placement<F>(
    &mut self,
    n: u8,
    daemon_factory: F,
    scheduler: &Scheduler,
    registry: &DaemonRegistry,
    placement: &dyn PlacementFilter,
    tie_break: &TieBreakContext<'_>,
) -> Result<(), GroupError>
where
    F: Fn() -> Box<dyn MeshDaemon>,
{
    if n == 0 {
        return Err(GroupError::InvalidConfig(
            "replica_count must be > 0".into(),
        ));
    }

    let current = self.coord.member_count();
    if n < current {
        // Shrink: drop trailing members until only `n` remain.
        loop {
            if self.coord.member_count() <= n {
                break;
            }
            let Some(removed) = self.coord.remove_last() else {
                debug_assert!(
                    false,
                    "member_count > n but remove_last is None — coord invariant violation",
                );
                break;
            };
            // Best effort: an unregister failure does not stop the scale-down.
            let _ = registry.unregister(removed.origin_hash);
        }
    } else if n > current {
        // Grow: read the placement inputs from one throw-away daemon instance.
        let prototype = daemon_factory();
        let requirements = prototype.requirements();
        let required = prototype.required_capabilities();
        let optional = prototype.optional_capabilities();
        drop(prototype);

        let mut occupied: HashSet<u64> = HashSet::new();
        occupied.extend(self.coord.members().iter().map(|member| member.node_id));

        for index in current..n {
            let keypair = derive_replica_keypair(&self.config.group_seed, index);
            let origin_hash = keypair.origin_hash();
            let entity_id_bytes: NodeId = *keypair.entity_id().as_bytes();
            let artifact = Artifact::Daemon {
                daemon_id: entity_id_bytes,
                required: &required,
                optional: &optional,
            };
            let node_id = GroupCoordinator::place_member(
                scheduler,
                &artifact,
                &requirements,
                &occupied,
                placement,
                tie_break,
            )?
            .node_id;
            occupied.insert(node_id);
            let host =
                DaemonHost::new(daemon_factory(), keypair, self.config.host_config.clone());
            registry.register(host)?;
            self.coord.add_member(MemberInfo {
                index,
                origin_hash,
                node_id,
                entity_id_bytes,
                healthy: true,
            });
        }
    }

    self.config.replica_count = n;
    Ok(())
}
/// Filter-aware counterpart of [`ReplicaGroup::on_node_failure`]: moves every
/// replica hosted on `failed_node_id` to a node chosen by
/// `GroupCoordinator::place_member`.
///
/// Each affected member is marked unhealthy and its keypair re-derived from the
/// group seed. If placement succeeds, a fresh host replaces the old
/// registration and the coordinator's placement record is updated. If it
/// fails, a warning is logged and the slot keeps its existing registration.
/// The failed node and all nodes picked during this call are excluded.
///
/// Returns the indices that were re-placed. The visible code never returns `Err`.
pub fn on_node_failure_with_placement<F>(
    &mut self,
    failed_node_id: u64,
    daemon_factory: F,
    scheduler: &Scheduler,
    registry: &DaemonRegistry,
    placement: &dyn PlacementFilter,
    tie_break: &TieBreakContext<'_>,
) -> Result<Vec<u8>, GroupError>
where
    F: Fn() -> Box<dyn MeshDaemon>,
{
    // Placement inputs come from a single prototype instance.
    let prototype = daemon_factory();
    let requirements = prototype.requirements();
    let required = prototype.required_capabilities();
    let optional = prototype.optional_capabilities();
    drop(prototype);

    let mut exclude: HashSet<u64> = HashSet::from([failed_node_id]);
    let mut replaced = Vec::new();

    for index in self.coord.members_on_node(failed_node_id) {
        self.coord.mark_unhealthy(index);

        let keypair = derive_replica_keypair(&self.config.group_seed, index);
        let entity_id_bytes: NodeId = *keypair.entity_id().as_bytes();
        let artifact = Artifact::Daemon {
            daemon_id: entity_id_bytes,
            required: &required,
            optional: &optional,
        };

        let attempt = GroupCoordinator::place_member(
            scheduler,
            &artifact,
            &requirements,
            &exclude,
            placement,
            tie_break,
        );
        let target_node = match attempt {
            Err(e) => {
                tracing::warn!(
                    index,
                    error = %e,
                    "ReplicaGroup::on_node_failure_with_placement: place_member failed; \
                     slot left registered for later recovery"
                );
                continue;
            }
            Ok(decision) => decision.node_id,
        };

        let host = DaemonHost::new(daemon_factory(), keypair, self.config.host_config.clone());
        registry.replace(host);
        self.coord
            .update_member_placement(index, target_node, entity_id_bytes);
        exclude.insert(target_node);
        replaced.push(index);
    }

    Ok(replaced)
}
/// Notifies the group that `recovered_node_id` is back by forwarding to the
/// coordinator's `on_node_recovery` together with the registry.
///
/// NOTE(review): the tests in this file expect unhealthy members on that node
/// to become healthy again — the exact rules live in `GroupCoordinator`.
pub fn on_node_recovery(&mut self, recovered_node_id: u64, registry: &DaemonRegistry) {
self.coord.on_node_recovery(recovered_node_id, registry);
}
/// Returns the group's aggregate health as reported by the coordinator.
pub fn health(&self) -> GroupHealth {
self.coord.health()
}
/// Returns the group id computed from the group seed at spawn time.
pub fn group_id(&self) -> u32 {
self.group_id
}
/// Returns the coordinator's current member records, one per replica.
pub fn replicas(&self) -> &[MemberInfo] {
self.coord.members()
}
/// Returns the number of members currently tracked by the coordinator
/// (not the `replica_count` stored in the config).
pub fn replica_count(&self) -> u8 {
self.coord.member_count()
}
/// Returns the coordinator's count of healthy members.
pub fn healthy_count(&self) -> u8 {
self.coord.healthy_count()
}
/// Attempts to re-place unhealthy slots, handling at most
/// `MAX_RECOVERIES_PER_TICK` of them per call (in member order).
///
/// Nodes hosting healthy members, plus nodes chosen earlier in the same call,
/// are excluded from placement. A slot whose placement fails is skipped (trace
/// log only) and stays unhealthy for a later call. For each success a fresh
/// host built from `daemon_factory` replaces the old registration and the
/// coordinator's placement record is updated.
///
/// Returns the indices that were re-placed. The group's term is bumped once if
/// that list is non-empty. `daemon_factory` is not invoked when no slot is unhealthy.
fn try_recover_inner<F>(
    &mut self,
    scheduler: &Scheduler,
    registry: &DaemonRegistry,
    daemon_factory: F,
) -> Vec<u8>
where
    F: Fn() -> Box<dyn MeshDaemon>,
{
    const MAX_RECOVERIES_PER_TICK: usize = 4;

    // Pick the first few unhealthy slots; the rest wait for a later tick.
    let mut pending: Vec<u8> = Vec::new();
    for member in self.coord.members() {
        if pending.len() == MAX_RECOVERIES_PER_TICK {
            break;
        }
        if !member.healthy {
            pending.push(member.index);
        }
    }
    if pending.is_empty() {
        return Vec::new();
    }

    let requirements = daemon_factory().requirements();

    // Never stack a recovered replica onto a node that already runs a healthy one.
    let mut exclude: HashSet<u64> = HashSet::new();
    for member in self.coord.members() {
        if member.healthy {
            exclude.insert(member.node_id);
        }
    }

    let mut recovered = Vec::with_capacity(pending.len());
    for index in pending {
        let keypair = derive_replica_keypair(&self.config.group_seed, index);
        let entity_id_bytes: NodeId = *keypair.entity_id().as_bytes();
        let target_node =
            match GroupCoordinator::place_with_spread(scheduler, &requirements, &exclude) {
                Err(e) => {
                    tracing::trace!(
                        index,
                        error = %e,
                        "ReplicaGroup::try_recover: place_with_spread still failing; \
                         slot remains unhealthy for next tick"
                    );
                    continue;
                }
                Ok(found) => found.node_id,
            };
        let host = DaemonHost::new(daemon_factory(), keypair, self.config.host_config.clone());
        registry.replace(host);
        self.coord
            .update_member_placement(index, target_node, entity_id_bytes);
        exclude.insert(target_node);
        recovered.push(index);
    }

    if !recovered.is_empty() {
        self.term = self.term.saturating_add(1);
    }
    recovered
}
}
impl crate::adapter::net::compute::UnhealthySlotRecovery for ReplicaGroup {
fn has_unhealthy_slots(&self) -> bool {
self.coord.members().iter().any(|m| !m.healthy)
}
fn try_recover(
&mut self,
scheduler: &Scheduler,
registry: &DaemonRegistry,
daemon_factory: &dyn Fn() -> Box<dyn MeshDaemon>,
) -> Vec<u8> {
self.try_recover_inner(scheduler, registry, daemon_factory)
}
}
/// Compact debug view: hex group id, member count and healthy-member count.
impl std::fmt::Debug for ReplicaGroup {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Rendered as a string so the id shows up as a quoted `0x…` value.
        let group_id_hex = format!("{:#x}", self.group_id);
        let total = self.coord.member_count();
        let healthy = self.coord.healthy_count();

        let mut out = f.debug_struct("ReplicaGroup");
        out.field("group_id", &group_id_hex);
        out.field("replicas", &total);
        out.field("healthy", &healthy);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::adapter::net::behavior::capability::{
CapabilityAnnouncement, CapabilityFilter, CapabilitySet,
};
use crate::adapter::net::compute::DaemonError;
use crate::adapter::net::state::causal::CausalEvent;
use bytes::Bytes;
use std::sync::Arc;
/// Test daemon with default requirements that produces no output for any event.
struct NoopDaemon;

impl MeshDaemon for NoopDaemon {
    fn name(&self) -> &str {
        "noop"
    }

    fn requirements(&self) -> CapabilityFilter {
        Default::default()
    }

    fn process(&mut self, _event: &CausalEvent) -> Result<Vec<Bytes>, DaemonError> {
        Ok(Vec::new())
    }
}
fn make_scheduler() -> Scheduler {
use crate::adapter::net::behavior::fold::{capability_bridge, CapabilityFold, Fold};
let fold: Arc<Fold<CapabilityFold>> =
Arc::new(Fold::with_sweep_interval(std::time::Duration::ZERO));
let eid = crate::adapter::net::identity::EntityId::from_bytes([0u8; 32]);
for node_id in [0x1111u64, 0x2222, 0x3333, 0x4444] {
capability_bridge::apply_legacy_announcement(
&fold,
CapabilityAnnouncement::new(node_id, eid.clone(), 1, CapabilitySet::new()),
)
.expect("apply legacy announcement in fixture");
}
Scheduler::new(fold, 0x1111, CapabilitySet::new())
}
/// Group configuration for tests: `n` replicas, a fixed seed of `42`s,
/// round-robin load balancing and the default host configuration.
fn test_config(n: u8) -> ReplicaGroupConfig {
    let group_seed = [42u8; 32];
    let host_config = DaemonHostConfig::default();
    ReplicaGroupConfig {
        replica_count: n,
        group_seed,
        lb_strategy: Strategy::RoundRobin,
        host_config,
    }
}
// Spawning three replicas registers three hosts with distinct origin hashes.
#[test]
fn test_spawn_group() {
    let reg = DaemonRegistry::new();
    let sched = make_scheduler();
    let group =
        ReplicaGroup::spawn(test_config(3), || Box::new(NoopDaemon), &sched, &reg).unwrap();
    assert_eq!(group.replica_count(), 3);
    assert_eq!(group.health(), GroupHealth::Healthy);
    assert_eq!(reg.count(), 3);
    let hashes: HashSet<u64> = group.replicas().iter().map(|r| r.origin_hash).collect();
    assert_eq!(hashes.len(), 3);
}
// Same seed + index gives the same identity; a different index gives another one.
#[test]
fn test_deterministic_keypairs() {
    let seed = [7u8; 32];
    let first = derive_replica_keypair(&seed, 0).origin_hash();
    let again = derive_replica_keypair(&seed, 0).origin_hash();
    assert_eq!(first, again);
    let other_index = derive_replica_keypair(&seed, 1).origin_hash();
    assert_ne!(first, other_index);
}
// A configuration with zero replicas must be rejected up front.
#[test]
fn test_zero_replicas_rejected() {
    let reg = DaemonRegistry::new();
    let sched = make_scheduler();
    let err =
        ReplicaGroup::spawn(test_config(0), || Box::new(NoopDaemon), &sched, &reg).unwrap_err();
    assert_eq!(
        err,
        GroupError::InvalidConfig("replica_count must be > 0".into())
    );
}
// Routing must return the origin hash of one of the group's replicas.
#[test]
fn test_route_event() {
    let reg = DaemonRegistry::new();
    let sched = make_scheduler();
    let group =
        ReplicaGroup::spawn(test_config(3), || Box::new(NoopDaemon), &sched, &reg).unwrap();
    let ctx = RequestContext::default();
    let origin = group.route_event(&ctx).unwrap();
    assert!(group.replicas().iter().any(|r| r.origin_hash == origin));
}
// Scaling from two to four replicas registers the two additional hosts.
#[test]
fn test_scale_up() {
    let reg = DaemonRegistry::new();
    let sched = make_scheduler();
    let mut group =
        ReplicaGroup::spawn(test_config(2), || Box::new(NoopDaemon), &sched, &reg).unwrap();
    group
        .scale_to(4, || Box::new(NoopDaemon), &sched, &reg)
        .unwrap();
    assert_eq!(group.replica_count(), 4);
    assert_eq!(reg.count(), 4);
}
// With four nodes available, scaling 1 -> 4 must put every replica on its own node.
#[test]
fn scale_up_does_not_colocate_new_replicas() {
    let reg = DaemonRegistry::new();
    let sched = make_scheduler();
    let mut group =
        ReplicaGroup::spawn(test_config(1), || Box::new(NoopDaemon), &sched, &reg).unwrap();
    group
        .scale_to(4, || Box::new(NoopDaemon), &sched, &reg)
        .unwrap();
    let node_ids: HashSet<u64> = group.replicas().iter().map(|r| r.node_id).collect();
    assert_eq!(
        node_ids.len(),
        4,
        "all 4 replicas should land on distinct nodes — \
         colocation indicates BUG_REPORT.md #6 has regressed; \
         got node ids {:?}",
        group
            .replicas()
            .iter()
            .map(|r| r.node_id)
            .collect::<Vec<_>>()
    );
}
// Scaling from four to two replicas unregisters the two removed hosts.
#[test]
fn test_scale_down() {
    let reg = DaemonRegistry::new();
    let sched = make_scheduler();
    let mut group =
        ReplicaGroup::spawn(test_config(4), || Box::new(NoopDaemon), &sched, &reg).unwrap();
    group
        .scale_to(2, || Box::new(NoopDaemon), &sched, &reg)
        .unwrap();
    assert_eq!(group.replica_count(), 2);
    assert_eq!(reg.count(), 2);
}
// A replica on a failed node is re-placed and keeps its origin hash.
#[test]
fn test_node_failure_and_replacement() {
    let reg = DaemonRegistry::new();
    let sched = make_scheduler();
    let mut group =
        ReplicaGroup::spawn(test_config(3), || Box::new(NoopDaemon), &sched, &reg).unwrap();
    let failed_node = group.replicas()[0].node_id;
    let failed_origin = group.replicas()[0].origin_hash;
    let replaced = group
        .on_node_failure(failed_node, || Box::new(NoopDaemon), &sched, &reg)
        .unwrap();
    assert!(!replaced.is_empty());
    assert_ne!(group.health(), GroupHealth::Dead);
    assert!(group
        .replicas()
        .iter()
        .any(|r| r.origin_hash == failed_origin));
}
// Marking one of two replicas unhealthy degrades the group; node recovery heals it.
#[test]
fn test_node_recovery() {
    let reg = DaemonRegistry::new();
    let sched = make_scheduler();
    let mut group =
        ReplicaGroup::spawn(test_config(2), || Box::new(NoopDaemon), &sched, &reg).unwrap();
    let node = group.replicas()[0].node_id;
    group.coord.mark_unhealthy(0);
    assert_eq!(
        group.health(),
        GroupHealth::Degraded {
            healthy: 1,
            total: 2
        }
    );
    group.on_node_recovery(node, &reg);
    assert_eq!(group.health(), GroupHealth::Healthy);
}
// A group whose replicas are all unhealthy reports `Dead`.
#[test]
fn test_group_health_dead() {
    let reg = DaemonRegistry::new();
    let sched = make_scheduler();
    let mut group =
        ReplicaGroup::spawn(test_config(2), || Box::new(NoopDaemon), &sched, &reg).unwrap();
    group.coord.mark_unhealthy(0);
    group.coord.mark_unhealthy(1);
    assert_eq!(group.health(), GroupHealth::Dead);
}
// Two groups spawned from the same seed get the same group id.
#[test]
fn test_group_id_deterministic() {
    let reg1 = DaemonRegistry::new();
    let reg2 = DaemonRegistry::new();
    let sched = make_scheduler();
    let g1 =
        ReplicaGroup::spawn(test_config(1), || Box::new(NoopDaemon), &sched, &reg1).unwrap();
    let g2 =
        ReplicaGroup::spawn(test_config(1), || Box::new(NoopDaemon), &sched, &reg2).unwrap();
    assert_eq!(g1.group_id(), g2.group_id());
}
// Regression test: when no spare node exists, a failed re-placement must leave
// the slot registered so that a later node recovery can make it healthy again.
#[test]
fn place_failure_does_not_strand_slot_in_unregistered_state() {
    // Scheduler that knows exactly one node, so there is never a spare.
    fn single_node_scheduler() -> Scheduler {
        use crate::adapter::net::behavior::fold::{capability_bridge, CapabilityFold, Fold};
        let fold: Arc<Fold<CapabilityFold>> =
            Arc::new(Fold::with_sweep_interval(std::time::Duration::ZERO));
        let eid = crate::adapter::net::identity::EntityId::from_bytes([0u8; 32]);
        capability_bridge::apply_legacy_announcement(
            &fold,
            CapabilityAnnouncement::new(0x9999, eid, 1, CapabilitySet::new()),
        )
        .expect("apply legacy announcement in fixture");
        Scheduler::new(fold, 0x9999, CapabilitySet::new())
    }
    let reg = DaemonRegistry::new();
    let sched = single_node_scheduler();
    let mut group =
        ReplicaGroup::spawn(test_config(1), || Box::new(NoopDaemon), &sched, &reg).unwrap();
    let failed_node = group.replicas()[0].node_id;
    let failed_origin = group.replicas()[0].origin_hash;
    assert_eq!(failed_node, 0x9999);
    assert!(reg.contains(failed_origin));
    let replaced = group
        .on_node_failure(failed_node, || Box::new(NoopDaemon), &sched, &reg)
        .unwrap();
    assert!(
        replaced.is_empty(),
        "with no spare nodes, placement must fail and no replacement is recorded"
    );
    assert!(
        reg.contains(failed_origin),
        "BUG_REPORT.md #7: slot must remain registered when placement \
         fails — otherwise on_node_recovery cannot restore it"
    );
    group.on_node_recovery(failed_node, &reg);
    assert_eq!(
        group.health(),
        GroupHealth::Healthy,
        "after recovery the slot must be healthy again — the pre-fix \
         code left it permanently unhealthy + unregistered"
    );
}
use crate::adapter::net::behavior::placement::{NodeId as PlacementNodeId, ResourceAxis};
/// Builds a scheduler whose capability fold has one announcement (empty
/// capability set) per id in `node_ids`. The first id is used as the local
/// node, or `0xFFFF` when the slice is empty.
fn make_scheduler_and_index(node_ids: &[u64]) -> Scheduler {
    use crate::adapter::net::behavior::fold::{capability_bridge, CapabilityFold, Fold};

    let fold: Arc<Fold<CapabilityFold>> =
        Arc::new(Fold::with_sweep_interval(std::time::Duration::ZERO));
    let eid = crate::adapter::net::identity::EntityId::from_bytes([0u8; 32]);

    node_ids.iter().copied().for_each(|id| {
        let announcement = CapabilityAnnouncement::new(id, eid.clone(), 1, CapabilitySet::new());
        capability_bridge::apply_legacy_announcement(&fold, announcement)
            .expect("apply legacy announcement in fixture");
    });

    let local = match node_ids.first() {
        Some(&first) => first,
        None => 0xFFFF,
    };
    Scheduler::new(fold, local, CapabilitySet::new())
}
/// Placement filter for tests that accepts every node with the same score.
struct AllowAll;
impl PlacementFilter for AllowAll {
// Constant score of 1.0 for every node/artifact pair; never vetoes (`None`).
fn placement_score(&self, _: &PlacementNodeId, _: &Artifact<'_>) -> Option<f32> {
Some(1.0)
}
}
// With a permissive filter and four nodes, three replicas land on three distinct nodes.
#[test]
fn spawn_with_placement_spreads_across_nodes() {
    let reg = DaemonRegistry::new();
    let sched = make_scheduler_and_index(&[0x1111, 0x2222, 0x3333, 0x4444]);
    let tb = TieBreakContext {
        rtt_lookup: None,
        resource_axis: ResourceAxis::Compute,
    };
    let group = ReplicaGroup::spawn_with_placement(
        test_config(3),
        || Box::new(NoopDaemon),
        &sched,
        &reg,
        &AllowAll,
        &tb,
    )
    .expect("spawn_with_placement should succeed with 4 candidate nodes");
    assert_eq!(group.replica_count(), 3);
    assert_eq!(group.health(), GroupHealth::Healthy);
    let node_ids: HashSet<u64> = group.replicas().iter().map(|r| r.node_id).collect();
    assert_eq!(
        node_ids.len(),
        3,
        "spread invariant: all 3 replicas on distinct nodes"
    );
}
// The single replica must be placed on the node the filter scores highest.
#[test]
fn spawn_with_placement_routes_first_replica_to_highest_scorer() {
    let reg = DaemonRegistry::new();
    let sched = make_scheduler_and_index(&[0x1111, 0x2222, 0x3333, 0x4444]);
    let tb = TieBreakContext {
        rtt_lookup: None,
        resource_axis: ResourceAxis::Compute,
    };
    // Scores node 0x4444 far above every other node.
    struct PreferHighest;
    impl PlacementFilter for PreferHighest {
        fn placement_score(&self, t: &PlacementNodeId, _: &Artifact<'_>) -> Option<f32> {
            Some(if *t == 0x4444 { 1.0 } else { 0.1 })
        }
    }
    let group = ReplicaGroup::spawn_with_placement(
        test_config(1),
        || Box::new(NoopDaemon),
        &sched,
        &reg,
        &PreferHighest,
        &tb,
    )
    .expect("spawn_with_placement with 1 replica should succeed");
    assert_eq!(group.replicas()[0].node_id, 0x4444);
}
// A filter that vetoes every node makes spawn fail without registering anything.
#[test]
fn spawn_with_placement_returns_placement_failed_when_all_vetoed() {
    let reg = DaemonRegistry::new();
    let sched = make_scheduler_and_index(&[0x1111, 0x2222]);
    let tb = TieBreakContext {
        rtt_lookup: None,
        resource_axis: ResourceAxis::Compute,
    };
    // Rejects every candidate node.
    struct VetoAll;
    impl PlacementFilter for VetoAll {
        fn placement_score(&self, _: &PlacementNodeId, _: &Artifact<'_>) -> Option<f32> {
            None
        }
    }
    let err = ReplicaGroup::spawn_with_placement(
        test_config(1),
        || Box::new(NoopDaemon),
        &sched,
        &reg,
        &VetoAll,
        &tb,
    )
    .expect_err("VetoAll filter should make placement fail");
    assert!(matches!(err, GroupError::PlacementFailed(_)));
    assert_eq!(reg.count(), 0, "no host registered when placement fails");
}
// Filter-aware scale-up 1 -> 4 on four nodes keeps every replica on its own node.
#[test]
fn scale_to_with_placement_spreads_new_replicas() {
    let reg = DaemonRegistry::new();
    let sched = make_scheduler_and_index(&[0x1111, 0x2222, 0x3333, 0x4444]);
    let tb = TieBreakContext {
        rtt_lookup: None,
        resource_axis: ResourceAxis::Compute,
    };
    let mut group = ReplicaGroup::spawn_with_placement(
        test_config(1),
        || Box::new(NoopDaemon),
        &sched,
        &reg,
        &AllowAll,
        &tb,
    )
    .unwrap();
    group
        .scale_to_with_placement(4, || Box::new(NoopDaemon), &sched, &reg, &AllowAll, &tb)
        .expect("scale_to_with_placement should succeed with 4 nodes");
    let node_ids: HashSet<u64> = group.replicas().iter().map(|r| r.node_id).collect();
    assert_eq!(
        node_ids.len(),
        4,
        "spread invariant under v2: all 4 replicas on distinct nodes"
    );
    assert_eq!(reg.count(), 4);
}
// Scale-down must succeed even with a filter that vetoes everything, because
// removing replicas never consults the placement filter.
#[test]
fn scale_to_with_placement_scale_down_does_not_invoke_filter() {
    let reg = DaemonRegistry::new();
    let sched = make_scheduler_and_index(&[0x1111, 0x2222, 0x3333]);
    let tb = TieBreakContext {
        rtt_lookup: None,
        resource_axis: ResourceAxis::Compute,
    };
    let mut group = ReplicaGroup::spawn_with_placement(
        test_config(3),
        || Box::new(NoopDaemon),
        &sched,
        &reg,
        &AllowAll,
        &tb,
    )
    .unwrap();
    assert_eq!(reg.count(), 3);
    // Rejects every candidate node.
    struct VetoAll;
    impl PlacementFilter for VetoAll {
        fn placement_score(&self, _: &PlacementNodeId, _: &Artifact<'_>) -> Option<f32> {
            None
        }
    }
    group
        .scale_to_with_placement(1, || Box::new(NoopDaemon), &sched, &reg, &VetoAll, &tb)
        .expect("scale-down does not invoke the filter");
    assert_eq!(group.replica_count(), 1);
    assert_eq!(reg.count(), 1);
}
// With spare nodes available, the failed replica moves to a different node and
// stays registered under the same origin hash.
#[test]
fn on_node_failure_with_placement_replaces_member_on_spare_node() {
    let reg = DaemonRegistry::new();
    let sched = make_scheduler_and_index(&[0x1111, 0x2222, 0x3333, 0x4444]);
    let tb = TieBreakContext {
        rtt_lookup: None,
        resource_axis: ResourceAxis::Compute,
    };
    let mut group = ReplicaGroup::spawn_with_placement(
        test_config(2),
        || Box::new(NoopDaemon),
        &sched,
        &reg,
        &AllowAll,
        &tb,
    )
    .unwrap();
    let failed_node = group.replicas()[0].node_id;
    let failed_index = group.replicas()[0].index;
    let failed_origin = group.replicas()[0].origin_hash;
    let replaced = group
        .on_node_failure_with_placement(
            failed_node,
            || Box::new(NoopDaemon),
            &sched,
            &reg,
            &AllowAll,
            &tb,
        )
        .unwrap();
    assert_eq!(replaced, vec![failed_index]);
    let new_node = group
        .replicas()
        .iter()
        .find(|r| r.index == failed_index)
        .unwrap()
        .node_id;
    assert_ne!(new_node, failed_node);
    assert!(reg.contains(failed_origin));
}
// With a single node there is no spare: re-placement fails, the slot stays
// registered, and node recovery brings the group back to healthy.
#[test]
fn on_node_failure_with_placement_preserves_slot_when_placement_fails() {
    let reg = DaemonRegistry::new();
    let sched = make_scheduler_and_index(&[0x9999]);
    let tb = TieBreakContext {
        rtt_lookup: None,
        resource_axis: ResourceAxis::Compute,
    };
    let mut group = ReplicaGroup::spawn_with_placement(
        test_config(1),
        || Box::new(NoopDaemon),
        &sched,
        &reg,
        &AllowAll,
        &tb,
    )
    .unwrap();
    let failed_node = group.replicas()[0].node_id;
    let failed_origin = group.replicas()[0].origin_hash;
    assert_eq!(failed_node, 0x9999);
    let replaced = group
        .on_node_failure_with_placement(
            failed_node,
            || Box::new(NoopDaemon),
            &sched,
            &reg,
            &AllowAll,
            &tb,
        )
        .unwrap();
    assert!(
        replaced.is_empty(),
        "no spare → placement must fail and no replacement recorded"
    );
    assert!(
        reg.contains(failed_origin),
        "slot must remain registered when placement fails (recovery guarantee #7)"
    );
    group.on_node_recovery(failed_node, &reg);
    assert_eq!(group.health(), GroupHealth::Healthy);
}
// The original (filter-less) spawn path still produces a healthy three-replica group.
#[test]
fn spawn_v1_path_unchanged_after_v2_added() {
    let reg = DaemonRegistry::new();
    let sched = make_scheduler();
    let group =
        ReplicaGroup::spawn(test_config(3), || Box::new(NoopDaemon), &sched, &reg).unwrap();
    assert_eq!(group.replica_count(), 3);
    assert_eq!(group.health(), GroupHealth::Healthy);
}
}