use std::collections::HashSet;
use std::sync::Arc;
use super::daemon::{LifecycleDaemon, LifecycleError, LifecycleHandle, ReplicaHealth};
use crate::adapter::net::behavior::capability::CapabilityFilter;
use crate::adapter::net::compute::group_coord::GroupCoordinator;
use crate::adapter::net::compute::replica_group::derive_replica_keypair;
use crate::adapter::net::compute::{PlacementDecision, Scheduler};
use crate::adapter::net::identity::EntityKeypair;
/// Failures reported by `LifecycleGroup` construction and resizing operations.
#[derive(Debug)]
pub enum LifecycleGroupError {
    /// The requested group shape or operation is invalid (for example a zero
    /// replica count, an out-of-range index, or growing past `u8::MAX`).
    InvalidConfig(String),
    /// Starting the replica at `index` failed.
    StartFailed {
        /// Replica index whose start failed.
        index: u8,
        /// Error returned by the lifecycle start.
        error: LifecycleError,
    },
    /// No node could be chosen for the replica at `index`.
    PlacementFailed {
        /// Replica index that could not be placed.
        index: u8,
        /// Rendered scheduler / coordinator error.
        reason: String,
    },
}

impl std::fmt::Display for LifecycleGroupError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            LifecycleGroupError::InvalidConfig(detail) => {
                write!(f, "invalid lifecycle group config: {detail}")
            }
            LifecycleGroupError::StartFailed {
                index: replica,
                error: cause,
            } => write!(f, "replica {replica} failed to start: {cause}"),
            LifecycleGroupError::PlacementFailed {
                index: replica,
                reason: why,
            } => write!(f, "replica {replica} placement failed: {why}"),
        }
    }
}

impl std::error::Error for LifecycleGroupError {}
/// Per-replica information handed to the factory passed to
/// `LifecycleGroup::spawn_with_placement`.
#[derive(Debug, Clone)]
pub struct ReplicaContext {
    /// Index of the replica being built, in `0..replica_count`.
    pub index: u8,
    /// Scheduler decision chosen for this replica. Always `Some` when the
    /// context is produced by `spawn_with_placement`.
    pub placement: Option<PlacementDecision>,
}
/// A group of [`LifecycleDaemon`] replicas that are started, health-checked,
/// resized and stopped together.
///
/// Invariants kept by every method:
/// - `handles[i]` is the lifecycle handle of `replicas[i]`, so both vectors
///   always have the same length.
/// - `placements` is empty for groups built with [`LifecycleGroup::spawn`],
///   and holds one entry per placed replica for groups built with
///   [`LifecycleGroup::spawn_with_placement`]. Replicas appended later via
///   [`LifecycleGroup::add_replica`] / [`LifecycleGroup::add_replicas`] get no
///   entry. Hence `placements.len() <= replicas.len()`, and `placements[i]` is
///   the decision recorded for slot `i`.
pub struct LifecycleGroup<L: LifecycleDaemon> {
    /// Lifecycle handles, index-aligned with `replicas`.
    handles: Vec<LifecycleHandle>,
    /// The daemons, in replica-index order.
    replicas: Vec<Arc<L>>,
    /// Scheduler placements for the leading replicas (see invariants above).
    placements: Vec<PlacementDecision>,
    /// Seed from which per-replica keypairs are derived.
    group_seed: [u8; 32],
}

impl<L: LifecycleDaemon> LifecycleGroup<L> {
    /// Rejects a zero-sized group; shared by both constructors so they report
    /// the same error.
    fn ensure_nonzero(replica_count: u8) -> Result<(), LifecycleGroupError> {
        if replica_count == 0 {
            return Err(LifecycleGroupError::InvalidConfig(
                "replica_count must be > 0".into(),
            ));
        }
        Ok(())
    }

    /// Builds `replica_count` daemons with `factory` and starts them
    /// concurrently, without any scheduler placement.
    ///
    /// `factory` is called once per index, in order `0..replica_count`, before
    /// any start is awaited.
    ///
    /// # Errors
    /// - [`LifecycleGroupError::InvalidConfig`] if `replica_count == 0`; the
    ///   factory is never called in that case.
    /// - [`LifecycleGroupError::StartFailed`] with the lowest failing index if
    ///   any replica fails to start.
    pub async fn spawn<F>(
        replica_count: u8,
        group_seed: [u8; 32],
        factory: F,
    ) -> Result<Self, LifecycleGroupError>
    where
        F: FnMut(u8) -> Arc<L>,
    {
        Self::ensure_nonzero(replica_count)?;
        let (replicas, handles) = start_replicas(replica_count, factory).await?;
        Ok(Self {
            handles,
            replicas,
            placements: Vec::new(),
            group_seed,
        })
    }

    /// Chooses a node for each of `replica_count` replicas, then builds and
    /// starts them like [`LifecycleGroup::spawn`].
    ///
    /// All placements are computed before any daemon is built. Each call to
    /// `GroupCoordinator::place_with_spread` receives the set of nodes already
    /// chosen for earlier replicas, so the coordinator can spread the group.
    /// `factory` receives a [`ReplicaContext`] whose `placement` is `Some`.
    /// The decisions are retained and exposed through
    /// [`LifecycleGroup::placement`].
    ///
    /// # Errors
    /// - [`LifecycleGroupError::InvalidConfig`] if `replica_count == 0`.
    /// - [`LifecycleGroupError::PlacementFailed`] for the first index that
    ///   cannot be placed; no daemon has been built at that point.
    /// - [`LifecycleGroupError::StartFailed`] as for [`LifecycleGroup::spawn`].
    pub async fn spawn_with_placement<F>(
        replica_count: u8,
        group_seed: [u8; 32],
        requirements: CapabilityFilter,
        scheduler: &Scheduler,
        mut factory: F,
    ) -> Result<Self, LifecycleGroupError>
    where
        F: FnMut(ReplicaContext) -> Arc<L>,
    {
        Self::ensure_nonzero(replica_count)?;
        let mut placements: Vec<PlacementDecision> =
            Vec::with_capacity(usize::from(replica_count));
        let mut used_nodes: HashSet<u64> = HashSet::new();
        for index in 0..replica_count {
            let decision =
                GroupCoordinator::place_with_spread(scheduler, &requirements, &used_nodes)
                    .map_err(|e| LifecycleGroupError::PlacementFailed {
                        index,
                        reason: e.to_string(),
                    })?;
            used_nodes.insert(decision.node_id);
            placements.push(decision);
        }
        // Borrow the decisions for the factory instead of cloning the whole
        // vector; the borrow ends once `start_replicas` has resolved.
        let (replicas, handles) = start_replicas(replica_count, |index| {
            factory(ReplicaContext {
                index,
                placement: Some(placements[usize::from(index)].clone()),
            })
        })
        .await?;
        Ok(Self {
            handles,
            replicas,
            placements,
            group_seed,
        })
    }

    /// Number of replicas currently in the group.
    pub fn replica_count(&self) -> usize {
        self.handles.len()
    }

    /// The seed this group derives replica keypairs from.
    pub fn group_seed(&self) -> &[u8; 32] {
        &self.group_seed
    }

    /// Derives the keypair for replica `index` from the group seed.
    ///
    /// This is a pure derivation from `(group_seed, index)`. It does not
    /// check that `index` is a current replica.
    pub fn replica_keypair(&self, index: u8) -> EntityKeypair {
        derive_replica_keypair(&self.group_seed, index)
    }

    /// The daemon at `index`, or `None` if out of range.
    pub fn replica(&self, index: usize) -> Option<Arc<L>> {
        self.replicas.get(index).cloned()
    }

    /// Clones of every replica's `Arc`, in index order.
    pub fn replicas(&self) -> Vec<Arc<L>> {
        self.replicas.clone()
    }

    /// The placement recorded for slot `index`, if any.
    ///
    /// Always `None` for groups built with [`LifecycleGroup::spawn`] and for
    /// replicas appended after construction.
    pub fn placement(&self, index: usize) -> Option<&PlacementDecision> {
        self.placements.get(index)
    }

    /// All recorded placements; entry `i` belongs to slot `i`.
    pub fn placements(&self) -> &[PlacementDecision] {
        &self.placements
    }

    /// Queries every replica's `health()` concurrently and returns the results
    /// in replica-index order.
    pub async fn health(&self) -> Vec<ReplicaHealth> {
        let futures = self.replicas.iter().map(|r| {
            let r = r.clone();
            async move { r.health().await }
        });
        futures::future::join_all(futures).await
    }

    /// Swaps the daemon at `index` for `new_daemon` and returns the previous
    /// daemon.
    ///
    /// The current replica is stopped *before* `new_daemon` is started, so the
    /// two never run side by side in the same slot. Placement data for the
    /// slot is left untouched.
    ///
    /// # Errors
    /// - [`LifecycleGroupError::InvalidConfig`] if `index` is out of bounds;
    ///   nothing is stopped in that case.
    /// - [`LifecycleGroupError::StartFailed`] if `new_daemon` fails to start.
    ///   The previous daemon is then put back in the slot and started again
    ///   (best effort), so that every replica keeps a matching handle. If that
    ///   restart also fails, the slot is removed from the group together with
    ///   its placement, if any. This shifts later replicas down by one index.
    pub async fn replace(
        &mut self,
        index: usize,
        new_daemon: Arc<L>,
    ) -> Result<Arc<L>, LifecycleGroupError> {
        if index >= self.replicas.len() {
            return Err(LifecycleGroupError::InvalidConfig(format!(
                "replace index {index} out of bounds for {} replicas",
                self.replicas.len()
            )));
        }
        let old_handle = self.handles.remove(index);
        old_handle.stop().await;

        let new_trait_obj: Arc<dyn LifecycleDaemon> = new_daemon.clone();
        match LifecycleHandle::start(new_trait_obj).await {
            Ok(new_handle) => {
                self.handles.insert(index, new_handle);
                Ok(std::mem::replace(&mut self.replicas[index], new_daemon))
            }
            Err(error) => {
                // Never leave the slot without a handle. That would
                // desynchronise `handles` from `replicas`, and later
                // index-based operations would then stop the wrong daemon.
                let old_trait_obj: Arc<dyn LifecycleDaemon> = self.replicas[index].clone();
                match LifecycleHandle::start(old_trait_obj).await {
                    Ok(restored) => self.handles.insert(index, restored),
                    Err(_restart_error) => {
                        // Last resort: drop the dead slot so all per-index
                        // vectors stay aligned. The caller gets the original
                        // start error below.
                        self.replicas.remove(index);
                        if index < self.placements.len() {
                            self.placements.remove(index);
                        }
                    }
                }
                Err(LifecycleGroupError::StartFailed {
                    index: u8::try_from(index).unwrap_or(u8::MAX),
                    error,
                })
            }
        }
    }

    /// Builds one more daemon with `factory(new_index)`, starts it, and
    /// appends it to the group. Returns the new index.
    ///
    /// Existing replicas are not restarted. The new replica has no placement
    /// entry.
    ///
    /// # Errors
    /// - [`LifecycleGroupError::InvalidConfig`] if the group already holds
    ///   `u8::MAX` replicas.
    /// - [`LifecycleGroupError::StartFailed`] if the new daemon fails to start.
    ///   The group is left unchanged.
    pub async fn add_replica<F>(&mut self, factory: F) -> Result<u8, LifecycleGroupError>
    where
        F: FnOnce(u8) -> Arc<L>,
    {
        if self.replicas.len() >= u8::MAX as usize {
            return Err(LifecycleGroupError::InvalidConfig(format!(
                "cannot grow past u8::MAX replicas (current: {})",
                self.replicas.len()
            )));
        }
        let new_idx = self.replicas.len() as u8;
        let daemon = factory(new_idx);
        let trait_obj: Arc<dyn LifecycleDaemon> = daemon.clone();
        let handle = LifecycleHandle::start(trait_obj).await.map_err(|error| {
            LifecycleGroupError::StartFailed {
                index: new_idx,
                error,
            }
        })?;
        self.replicas.push(daemon);
        self.handles.push(handle);
        Ok(())
            .map(|()| new_idx)
    }

    /// Appends `count` replicas, building them with `factory(index)` for the
    /// new absolute indices and starting them concurrently.
    ///
    /// `count == 0` is a no-op. Placements are not recorded for the new
    /// replicas.
    ///
    /// # Errors
    /// - [`LifecycleGroupError::InvalidConfig`] if the group would exceed
    ///   `u8::MAX` replicas.
    /// - [`LifecycleGroupError::StartFailed`] carrying the lowest failing
    ///   *absolute* index. The group itself is left unchanged.
    pub async fn add_replicas<F>(
        &mut self,
        count: u8,
        mut factory: F,
    ) -> Result<(), LifecycleGroupError>
    where
        F: FnMut(u8) -> Arc<L>,
    {
        if count == 0 {
            return Ok(());
        }
        let new_total = self.replicas.len() + usize::from(count);
        if new_total > usize::from(u8::MAX) {
            return Err(LifecycleGroupError::InvalidConfig(format!(
                "cannot grow past u8::MAX replicas (current: {}, requested +{})",
                self.replicas.len(),
                count
            )));
        }
        // Fits in u8, and `base_idx + offset` cannot overflow, because
        // new_total <= u8::MAX was checked above.
        let base_idx = self.replicas.len() as u8;
        // `start_replicas` numbers the batch from 0, so translate both the
        // factory argument and any reported index into group coordinates.
        let (new_daemons, handles) = start_replicas(count, |offset| factory(base_idx + offset))
            .await
            .map_err(|err| match err {
                LifecycleGroupError::StartFailed { index, error } => {
                    LifecycleGroupError::StartFailed {
                        index: base_idx + index,
                        error,
                    }
                }
                other => other,
            })?;
        self.replicas.extend(new_daemons);
        self.handles.extend(handles);
        Ok(())
    }

    /// Stops and removes the highest-index replica, returning it.
    ///
    /// Its placement entry, if it had one, is removed as well.
    ///
    /// # Errors
    /// [`LifecycleGroupError::InvalidConfig`] if only one replica remains. Use
    /// [`LifecycleGroup::stop`] to dismantle the whole group.
    pub async fn remove_last(&mut self) -> Result<Arc<L>, LifecycleGroupError> {
        if self.replicas.len() <= 1 {
            return Err(LifecycleGroupError::InvalidConfig(format!(
                "cannot remove last replica below count 1 (current: {}); \
                 call stop() to dismantle the whole group instead",
                self.replicas.len()
            )));
        }
        // `clippy::expect_used` is presumably denied crate-wide, hence the
        // allows. Both pops are infallible: len > 1 was checked above, and
        // `handles` mirrors `replicas`.
        #[allow(clippy::expect_used)]
        let handle = self
            .handles
            .pop()
            .expect("replica_count > 1 above; handles parallel to replicas");
        handle.stop().await;
        #[allow(clippy::expect_used)]
        let replica = self
            .replicas
            .pop()
            .expect("replica_count > 1 above; pop after handle.stop succeeded");
        // Only leading replicas have placements. The removed replica owned an
        // entry only if `placements` reached its index, so truncating (rather
        // than blindly popping) never discards a surviving replica's placement.
        self.placements.truncate(self.replicas.len());
        Ok(replica)
    }

    /// The lifecycle handles, index-aligned with the replicas.
    pub fn handles(&self) -> &[LifecycleHandle] {
        &self.handles
    }

    /// Stops every replica, one at a time in index order, consuming the group.
    pub async fn stop(self) {
        for handle in self.handles {
            handle.stop().await;
        }
    }

    /// Decomposes the group into `(replicas, placements, handles, group_seed)`
    /// without stopping anything.
    pub fn into_parts(
        self,
    ) -> (
        Vec<Arc<L>>,
        Vec<PlacementDecision>,
        Vec<LifecycleHandle>,
        [u8; 32],
    ) {
        (
            self.replicas,
            self.placements,
            self.handles,
            self.group_seed,
        )
    }
}
async fn start_replicas<L, F>(
replica_count: u8,
mut factory: F,
) -> Result<(Vec<Arc<L>>, Vec<LifecycleHandle>), LifecycleGroupError>
where
L: LifecycleDaemon,
F: FnMut(u8) -> Arc<L>,
{
let mut replicas: Vec<Arc<L>> = Vec::with_capacity(replica_count as usize);
let mut starts = Vec::with_capacity(replica_count as usize);
for index in 0..replica_count {
let daemon = factory(index);
replicas.push(daemon.clone());
let trait_obj: Arc<dyn LifecycleDaemon> = daemon;
starts.push((index, LifecycleHandle::start(trait_obj)));
}
let started: Vec<_> = futures::future::join_all(
starts
.into_iter()
.map(|(i, fut)| async move { (i, fut.await) }),
)
.await;
let mut handles = Vec::with_capacity(replica_count as usize);
for (index, result) in started {
match result {
Ok(h) => handles.push(h),
Err(error) => {
drop(handles);
drop(replicas);
return Err(LifecycleGroupError::StartFailed { index, error });
}
}
}
Ok((replicas, handles))
}
#[cfg(test)]
mod tests {
    // Unit tests for `LifecycleGroup`: construction, placement, health
    // snapshots, replacement and in-place resizing.
    use super::*;
    use async_trait::async_trait;
    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};

    // Test daemon that counts successful starts and all stops. When
    // `fail_start` is set, `on_start` returns an error without counting.
    struct CountingDaemon {
        starts: AtomicU64,
        stops: AtomicU64,
        fail_start: AtomicBool,
    }

    impl CountingDaemon {
        fn new() -> Self {
            Self {
                starts: AtomicU64::new(0),
                stops: AtomicU64::new(0),
                fail_start: AtomicBool::new(false),
            }
        }
    }

    #[async_trait]
    impl LifecycleDaemon for CountingDaemon {
        fn name(&self) -> &str {
            "counting"
        }
        async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError> {
            if self.fail_start.load(Ordering::Acquire) {
                return Err(LifecycleError::StartFailed("intentional".into()));
            }
            self.starts.fetch_add(1, Ordering::AcqRel);
            Ok(())
        }
        async fn on_stop(&self) {
            self.stops.fetch_add(1, Ordering::AcqRel);
        }
    }

    // A zero-sized group is rejected before the factory is ever invoked.
    #[tokio::test]
    async fn spawn_zero_replicas_is_rejected_as_config_error() {
        let result = LifecycleGroup::<CountingDaemon>::spawn(0, [0u8; 32], |_| {
            panic!("factory must not be called when replica_count == 0")
        })
        .await;
        match result {
            Err(LifecycleGroupError::InvalidConfig(msg)) => {
                assert!(msg.contains("replica_count"), "msg was: {msg}");
            }
            Err(other) => panic!("expected InvalidConfig, got {other:?}"),
            Ok(_) => panic!("expected InvalidConfig, got Ok"),
        }
    }

    // Factory is called once per index in order. Every replica starts exactly
    // once, and `stop()` stops each exactly once.
    #[tokio::test]
    async fn spawn_three_replicas_runs_each_lifecycle_then_stops_all() {
        let factory_calls = Arc::new(parking_lot::Mutex::new(Vec::<u8>::new()));
        let factory_calls_clone = factory_calls.clone();
        let daemons: Arc<parking_lot::Mutex<Vec<Arc<CountingDaemon>>>> =
            Arc::new(parking_lot::Mutex::new(Vec::new()));
        let daemons_clone = daemons.clone();
        let group = LifecycleGroup::<CountingDaemon>::spawn(3, [0xABu8; 32], move |idx| {
            factory_calls_clone.lock().push(idx);
            let d = Arc::new(CountingDaemon::new());
            daemons_clone.lock().push(d.clone());
            d
        })
        .await
        .expect("group spawn");
        assert_eq!(group.replica_count(), 3);
        assert_eq!(*factory_calls.lock(), vec![0u8, 1, 2]);
        for d in daemons.lock().iter() {
            assert_eq!(d.starts.load(Ordering::Acquire), 1);
            assert_eq!(d.stops.load(Ordering::Acquire), 0);
        }
        let r0 = group.replica(0).expect("replica 0");
        assert_eq!(r0.starts.load(Ordering::Acquire), 1);
        assert!(group.replica(3).is_none());
        group.stop().await;
        for d in daemons.lock().iter() {
            assert_eq!(d.stops.load(Ordering::Acquire), 1);
        }
    }

    // `replica_keypair` must match `derive_replica_keypair` for the same seed
    // and index, and must differ between indices.
    #[tokio::test]
    async fn replica_keypair_is_deterministic_for_a_given_index() {
        let seed = [0x42u8; 32];
        let group =
            LifecycleGroup::<CountingDaemon>::spawn(
                2,
                seed,
                |_idx| Arc::new(CountingDaemon::new()),
            )
            .await
            .expect("group spawn");
        let expected_kp_0 = derive_replica_keypair(&seed, 0);
        let expected_kp_1 = derive_replica_keypair(&seed, 1);
        assert_eq!(
            group.replica_keypair(0).entity_id(),
            expected_kp_0.entity_id()
        );
        assert_eq!(
            group.replica_keypair(1).entity_id(),
            expected_kp_1.entity_id()
        );
        assert_ne!(
            group.replica_keypair(0).entity_id(),
            group.replica_keypair(1).entity_id()
        );
        assert_eq!(group.group_seed(), &seed);
        group.stop().await;
    }

    // Builds a Scheduler whose capability fold holds one legacy announcement
    // per node id. All announcements use the all-zero entity id and an empty
    // capability set. The local node is the first id, or 0xFFFF when
    // `node_ids` is empty.
    // NOTE(review): `Duration::ZERO` sweep interval presumably disables
    // periodic sweeping for this fixture; confirm against `Fold`.
    fn make_scheduler(node_ids: &[u64]) -> Scheduler {
        use crate::adapter::net::behavior::capability::{CapabilityAnnouncement, CapabilitySet};
        use crate::adapter::net::behavior::fold::{capability_bridge, CapabilityFold, Fold};
        let fold: Arc<Fold<CapabilityFold>> =
            Arc::new(Fold::with_sweep_interval(std::time::Duration::ZERO));
        let eid = crate::adapter::net::identity::EntityId::from_bytes([0u8; 32]);
        for &id in node_ids {
            capability_bridge::apply_legacy_announcement(
                &fold,
                CapabilityAnnouncement::new(id, eid.clone(), 1, CapabilitySet::new()),
            )
            .expect("apply legacy announcement in fixture");
        }
        let local = node_ids.first().copied().unwrap_or(0xFFFF);
        Scheduler::new(fold, local, CapabilitySet::new())
    }

    // With three candidate nodes, three replicas land on three distinct nodes.
    // The factory sees the same placements the group records.
    #[tokio::test]
    async fn spawn_with_placement_records_distinct_node_per_replica() {
        let scheduler = make_scheduler(&[0x1111, 0x2222, 0x3333]);
        let seen_placements = Arc::new(parking_lot::Mutex::new(Vec::<u64>::new()));
        let seen_placements_clone = seen_placements.clone();
        let group = LifecycleGroup::<CountingDaemon>::spawn_with_placement(
            3,
            [0u8; 32],
            CapabilityFilter::default(),
            &scheduler,
            move |ctx| {
                let node_id = ctx
                    .placement
                    .as_ref()
                    .expect("placement set under spawn_with_placement")
                    .node_id;
                seen_placements_clone.lock().push(node_id);
                Arc::new(CountingDaemon::new())
            },
        )
        .await
        .expect("spawn_with_placement");
        let recorded: Vec<u64> = group.placements().iter().map(|p| p.node_id).collect();
        assert_eq!(recorded.len(), 3);
        let unique: std::collections::HashSet<u64> = recorded.iter().copied().collect();
        assert_eq!(unique.len(), 3, "placements must be on distinct nodes");
        assert_eq!(*seen_placements.lock(), recorded);
        for i in 0..3 {
            assert!(group.placement(i).is_some());
        }
        assert!(group.placement(3).is_none());
        group.stop().await;
    }

    // Only two nodes for three replicas: placement of index 2 must fail.
    #[tokio::test]
    async fn spawn_with_placement_fails_when_fewer_nodes_than_replicas() {
        let scheduler = make_scheduler(&[0xAA, 0xBB]);
        let result = LifecycleGroup::<CountingDaemon>::spawn_with_placement(
            3,
            [0u8; 32],
            CapabilityFilter::default(),
            &scheduler,
            |_ctx| Arc::new(CountingDaemon::new()),
        )
        .await;
        match result {
            Err(LifecycleGroupError::PlacementFailed { index, .. }) => {
                assert_eq!(index, 2);
            }
            Err(other) => panic!("expected PlacementFailed, got {other:?}"),
            Ok(_) => panic!("expected PlacementFailed, got Ok"),
        }
    }

    // Test daemon whose `health()` reports unhealthy ("test-forced") while
    // `force_unhealthy` is set. It also counts starts and stops.
    struct HealthControlDaemon {
        force_unhealthy: AtomicBool,
        starts: AtomicU64,
        stops: AtomicU64,
    }

    impl HealthControlDaemon {
        fn new() -> Self {
            Self {
                force_unhealthy: AtomicBool::new(false),
                starts: AtomicU64::new(0),
                stops: AtomicU64::new(0),
            }
        }
    }

    #[async_trait]
    impl LifecycleDaemon for HealthControlDaemon {
        fn name(&self) -> &str {
            "health-control"
        }
        async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError> {
            self.starts.fetch_add(1, Ordering::AcqRel);
            Ok(())
        }
        async fn on_stop(&self) {
            self.stops.fetch_add(1, Ordering::AcqRel);
        }
        async fn health(&self) -> ReplicaHealth {
            if self.force_unhealthy.load(Ordering::Acquire) {
                ReplicaHealth::unhealthy("test-forced")
            } else {
                ReplicaHealth::healthy()
            }
        }
    }

    // `health()` returns one entry per replica, in index order, reflecting
    // each daemon's current state.
    #[tokio::test]
    async fn health_returns_per_replica_snapshot_in_declaration_order() {
        let daemons: Arc<parking_lot::Mutex<Vec<Arc<HealthControlDaemon>>>> =
            Arc::new(parking_lot::Mutex::new(Vec::new()));
        let daemons_clone = daemons.clone();
        let group = LifecycleGroup::<HealthControlDaemon>::spawn(3, [0u8; 32], move |_idx| {
            let d = Arc::new(HealthControlDaemon::new());
            daemons_clone.lock().push(d.clone());
            d
        })
        .await
        .expect("spawn");
        let snapshot = group.health().await;
        assert_eq!(snapshot.len(), 3);
        for h in &snapshot {
            assert!(h.healthy);
            assert!(h.diagnostic.is_none());
        }
        daemons.lock()[1]
            .force_unhealthy
            .store(true, Ordering::Release);
        let snapshot = group.health().await;
        assert!(snapshot[0].healthy);
        assert!(!snapshot[1].healthy);
        assert_eq!(snapshot[1].diagnostic.as_deref(), Some("test-forced"));
        assert!(snapshot[2].healthy);
        group.stop().await;
    }

    // Successful `replace`: returns the old daemon (now stopped once), starts
    // the replacement, installs it at the same index, and `stop()` reaches it.
    #[tokio::test]
    async fn replace_stops_old_handle_and_installs_new_daemon() {
        let daemons: Arc<parking_lot::Mutex<Vec<Arc<HealthControlDaemon>>>> =
            Arc::new(parking_lot::Mutex::new(Vec::new()));
        let daemons_clone = daemons.clone();
        let mut group = LifecycleGroup::<HealthControlDaemon>::spawn(2, [0u8; 32], move |_idx| {
            let d = Arc::new(HealthControlDaemon::new());
            daemons_clone.lock().push(d.clone());
            d
        })
        .await
        .expect("spawn");
        let original_idx_1 = daemons.lock()[1].clone();
        assert_eq!(original_idx_1.stops.load(Ordering::Acquire), 0);
        let replacement = Arc::new(HealthControlDaemon::new());
        let returned = group
            .replace(1, replacement.clone())
            .await
            .expect("replace");
        assert!(Arc::ptr_eq(&returned, &original_idx_1));
        assert_eq!(original_idx_1.stops.load(Ordering::Acquire), 1);
        assert_eq!(replacement.starts.load(Ordering::Acquire), 1);
        let now_at_1 = group.replica(1).expect("replica 1");
        assert!(Arc::ptr_eq(&now_at_1, &replacement));
        group.stop().await;
        assert_eq!(replacement.stops.load(Ordering::Acquire), 1);
    }

    // Out-of-range `replace` is a config error.
    #[tokio::test]
    async fn replace_rejects_out_of_bounds_index() {
        let mut group = LifecycleGroup::<HealthControlDaemon>::spawn(2, [0u8; 32], |_idx| {
            Arc::new(HealthControlDaemon::new())
        })
        .await
        .expect("spawn");
        let replacement = Arc::new(HealthControlDaemon::new());
        match group.replace(5, replacement).await {
            Err(LifecycleGroupError::InvalidConfig(msg)) => {
                assert!(msg.contains("out of bounds"), "msg was: {msg}");
            }
            Err(other) => panic!("expected InvalidConfig, got {other:?}"),
            Ok(_) => panic!("expected InvalidConfig, got Ok"),
        }
        group.stop().await;
    }

    // Plain `spawn` records no placements.
    #[tokio::test]
    async fn spawn_path_leaves_placements_empty() {
        let group = LifecycleGroup::<CountingDaemon>::spawn(2, [0u8; 32], |_idx| {
            Arc::new(CountingDaemon::new())
        })
        .await
        .expect("spawn");
        assert!(group.placements().is_empty());
        assert!(group.placement(0).is_none());
        group.stop().await;
    }

    // A failing start at index 2 surfaces as `StartFailed { index: 2 }` and
    // carries the daemon's own error.
    #[tokio::test]
    async fn start_failure_at_index_two_returns_typed_error_with_index() {
        let result = LifecycleGroup::<CountingDaemon>::spawn(3, [0u8; 32], |idx| {
            let d = Arc::new(CountingDaemon::new());
            if idx == 2 {
                d.fail_start.store(true, Ordering::Release);
            }
            d
        })
        .await;
        match result {
            Err(LifecycleGroupError::StartFailed { index, error }) => {
                assert_eq!(index, 2);
                match error {
                    LifecycleError::StartFailed(msg) => assert_eq!(msg, "intentional"),
                }
            }
            Err(other) => panic!("expected StartFailed, got {other:?}"),
            Ok(_) => panic!("expected StartFailed, got Ok"),
        }
    }

    // `add_replica` appends at index == old count, starts only the new daemon,
    // and neither restarts nor stops existing ones.
    #[tokio::test]
    async fn add_replica_grows_in_place_preserving_existing_replicas() {
        let daemons: Arc<parking_lot::Mutex<Vec<Arc<CountingDaemon>>>> =
            Arc::new(parking_lot::Mutex::new(Vec::new()));
        let daemons_clone = daemons.clone();
        let mut group = LifecycleGroup::<CountingDaemon>::spawn(2, [0u8; 32], move |_idx| {
            let d = Arc::new(CountingDaemon::new());
            daemons_clone.lock().push(d.clone());
            d
        })
        .await
        .expect("initial spawn");
        for d in daemons.lock().iter() {
            assert_eq!(d.starts.load(Ordering::Acquire), 1);
        }
        let new_replica = Arc::new(CountingDaemon::new());
        let new_replica_clone = new_replica.clone();
        let new_idx = group
            .add_replica(move |_idx| new_replica_clone)
            .await
            .expect("add_replica");
        assert_eq!(new_idx, 2, "new index = old replica_count");
        assert_eq!(group.replica_count(), 3);
        assert_eq!(new_replica.starts.load(Ordering::Acquire), 1);
        for d in daemons.lock().iter() {
            assert_eq!(
                d.starts.load(Ordering::Acquire),
                1,
                "existing replica restarted"
            );
            assert_eq!(
                d.stops.load(Ordering::Acquire),
                0,
                "existing replica stopped"
            );
        }
        group.stop().await;
    }

    // `remove_last` stops and returns exactly the highest-index replica.
    #[tokio::test]
    async fn remove_last_stops_only_the_last_replica() {
        let daemons: Arc<parking_lot::Mutex<Vec<Arc<CountingDaemon>>>> =
            Arc::new(parking_lot::Mutex::new(Vec::new()));
        let daemons_clone = daemons.clone();
        let mut group = LifecycleGroup::<CountingDaemon>::spawn(3, [0u8; 32], move |_idx| {
            let d = Arc::new(CountingDaemon::new());
            daemons_clone.lock().push(d.clone());
            d
        })
        .await
        .expect("spawn");
        let removed = group.remove_last().await.expect("remove_last");
        assert_eq!(group.replica_count(), 2);
        let last_original = daemons.lock()[2].clone();
        assert!(Arc::ptr_eq(&removed, &last_original));
        assert_eq!(removed.stops.load(Ordering::Acquire), 1);
        {
            let kept = daemons.lock();
            assert_eq!(kept[0].stops.load(Ordering::Acquire), 0);
            assert_eq!(kept[1].stops.load(Ordering::Acquire), 0);
        }
        group.stop().await;
    }

    // A single-replica group refuses `remove_last` and is left intact.
    #[tokio::test]
    async fn remove_last_refuses_to_drop_below_one() {
        let mut group = LifecycleGroup::<CountingDaemon>::spawn(1, [0u8; 32], |_idx| {
            Arc::new(CountingDaemon::new())
        })
        .await
        .expect("spawn");
        match group.remove_last().await {
            Ok(_) => panic!("expected InvalidConfig, got Ok"),
            Err(LifecycleGroupError::InvalidConfig(msg)) => {
                assert!(msg.contains("cannot remove last replica"), "msg was: {msg}");
            }
            Err(other) => panic!("expected InvalidConfig, got {other:?}"),
        }
        assert_eq!(group.replica_count(), 1);
        group.stop().await;
    }

    // N starts that each sleep SLEEP must finish well under N * SLEEP. The
    // bound is 2.5 * SLEEP, which only holds if the starts overlap.
    // NOTE(review): wall-clock based; may be sensitive to a heavily loaded CI.
    #[tokio::test]
    async fn add_replicas_bulk_runs_starts_concurrently() {
        use std::time::Duration;
        const SLEEP: Duration = Duration::from_millis(120);
        const N: u8 = 8;
        struct SleepyDaemon {
            stops: AtomicU64,
        }
        #[async_trait]
        impl LifecycleDaemon for SleepyDaemon {
            fn name(&self) -> &str {
                "sleepy"
            }
            async fn on_start(self: Arc<Self>) -> Result<(), LifecycleError> {
                tokio::time::sleep(SLEEP).await;
                Ok(())
            }
            async fn on_stop(&self) {
                self.stops.fetch_add(1, Ordering::AcqRel);
            }
        }
        let mut group = LifecycleGroup::<SleepyDaemon>::spawn(1, [0u8; 32], |_idx| {
            Arc::new(SleepyDaemon {
                stops: AtomicU64::new(0),
            })
        })
        .await
        .expect("initial spawn");
        let started = std::time::Instant::now();
        group
            .add_replicas(N, |_idx| {
                Arc::new(SleepyDaemon {
                    stops: AtomicU64::new(0),
                })
            })
            .await
            .expect("add_replicas");
        let elapsed = started.elapsed();
        assert_eq!(group.replica_count(), 1 + N as usize);
        assert!(
            elapsed < SLEEP * 5 / 2,
            "add_replicas took {elapsed:?} — likely serialized (serial bound {}ms)",
            (SLEEP * N as u32).as_millis()
        );
        group.stop().await;
    }

    // In a 1-replica group, the second daemon built by `add_replicas`
    // (absolute index 2) fails to start. The error carries the absolute
    // index, and the group keeps its original size.
    #[tokio::test]
    async fn add_replicas_propagates_first_failure_and_leaves_group_unchanged() {
        let mut group = LifecycleGroup::<CountingDaemon>::spawn(1, [0u8; 32], |_idx| {
            Arc::new(CountingDaemon::new())
        })
        .await
        .expect("spawn");
        let mut call = 0u8;
        let result = group
            .add_replicas(3, |_idx| {
                let d = Arc::new(CountingDaemon::new());
                if call == 1 {
                    d.fail_start.store(true, Ordering::Release);
                }
                call += 1;
                d
            })
            .await;
        match result {
            Ok(_) => panic!("expected StartFailed, got Ok"),
            Err(LifecycleGroupError::StartFailed { index, .. }) => {
                assert_eq!(index, 2);
            }
            Err(other) => panic!("expected StartFailed, got {other:?}"),
        }
        assert_eq!(group.replica_count(), 1);
        group.stop().await;
    }

    // A failing `add_replica` reports the would-be index and leaves the group
    // size unchanged.
    #[tokio::test]
    async fn add_replica_propagates_on_start_failure() {
        let mut group = LifecycleGroup::<CountingDaemon>::spawn(1, [0u8; 32], |_idx| {
            Arc::new(CountingDaemon::new())
        })
        .await
        .expect("spawn");
        let result = group
            .add_replica(|_idx| {
                let d = Arc::new(CountingDaemon::new());
                d.fail_start.store(true, Ordering::Release);
                d
            })
            .await;
        match result {
            Ok(_) => panic!("expected StartFailed, got Ok"),
            Err(LifecycleGroupError::StartFailed { index, .. }) => {
                assert_eq!(index, 1);
            }
            Err(other) => panic!("expected StartFailed, got {other:?}"),
        }
        assert_eq!(group.replica_count(), 1);
        group.stop().await;
    }
}