use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::{Mutex, RwLock};
use tokio::sync::Mutex as AsyncMutex;
use super::daemon::AggregatorDaemon;
use crate::adapter::net::behavior::lifecycle::{
HealthMonitor, LifecycleDaemon, LifecycleGroup, ReplicaHealth,
};
use crate::adapter::net::compute::PlacementDecision;
/// A registered aggregator group together with its optional health monitor.
///
/// Entries are created by `AggregatorRegistry::register` and
/// `AggregatorRegistry::register_with_monitor`, which hand them out as `Arc`s.
pub struct AggregatorGroupEntry {
/// Name the group was registered under.
pub name: String,
/// Copy of the group's seed, read from the group at registration time.
pub group_seed: [u8; 32],
// Holds the live group. Becomes `None` once `AggregatorRegistry::unregister`
// has taken the group out; the accessors on this type then report empty
// results. For monitored entries this same `Arc` is the handle the monitor
// was spawned with.
group: Arc<AsyncMutex<Option<LifecycleGroup<AggregatorDaemon>>>>,
// `Some` only for entries created through `register_with_monitor`; taken
// (and stopped) by `AggregatorRegistry::unregister`.
monitor: Mutex<Option<Arc<HealthMonitor<AggregatorDaemon>>>>,
}
impl AggregatorGroupEntry {
    /// Returns how many replicas the group currently has, or `0` when the
    /// group has already been taken out of this entry.
    pub async fn replica_count(&self) -> usize {
        let guard = self.group.lock().await;
        guard.as_ref().map_or(0, |group| group.replica_count())
    }

    /// Returns the group's replica handles, or an empty vector when the group
    /// has already been taken out of this entry.
    pub async fn replicas(&self) -> Vec<Arc<AggregatorDaemon>> {
        let guard = self.group.lock().await;
        guard.as_ref().map_or_else(Vec::new, |group| group.replicas())
    }

    /// Returns a copy of the group's placement decisions, or an empty vector
    /// when the group has already been taken out of this entry.
    pub async fn placements(&self) -> Vec<PlacementDecision> {
        let guard = self.group.lock().await;
        guard
            .as_ref()
            .map_or_else(Vec::new, |group| group.placements().to_vec())
    }

    /// Returns the group's health report, or an empty vector when the group
    /// has already been taken out of this entry.
    ///
    /// The group lock stays held while the group's own `health()` future is
    /// awaited; use [`snapshot`](Self::snapshot) to probe replicas without
    /// holding the lock.
    pub async fn health(&self) -> Vec<ReplicaHealth> {
        let guard = self.group.lock().await;
        if let Some(group) = guard.as_ref() {
            group.health().await
        } else {
            Vec::new()
        }
    }

    /// Returns a clone of the monitor handle, if this entry has one.
    pub fn monitor(&self) -> Option<Arc<HealthMonitor<AggregatorDaemon>>> {
        self.monitor.lock().as_ref().map(Arc::clone)
    }

    /// Captures replicas, placements and per-replica health in one value.
    ///
    /// Replicas and placements are copied while the group lock is held; the
    /// lock is released before the replicas are probed for health. Returns
    /// `EntrySnapshot::default()` when the group has already been taken out
    /// of this entry.
    pub async fn snapshot(&self) -> EntrySnapshot {
        let captured = {
            let guard = self.group.lock().await;
            guard
                .as_ref()
                .map(|group| (group.replicas(), group.placements().to_vec()))
        };
        let (replicas, placements) = match captured {
            Some(parts) => parts,
            None => return EntrySnapshot::default(),
        };

        // Probe every replica concurrently, without the group lock held.
        let probes = replicas
            .iter()
            .map(|replica| async move { replica.health().await });
        let healths = futures::future::join_all(probes).await;

        EntrySnapshot {
            replicas,
            placements,
            healths,
        }
    }
}
/// Point-in-time view of a group, as produced by `AggregatorGroupEntry::snapshot`.
///
/// The derived `Default` (all vectors empty) is what `snapshot` returns when
/// the entry no longer holds a group.
#[derive(Default)]
pub struct EntrySnapshot {
/// Replica handles copied from the group while its lock was held.
pub replicas: Vec<Arc<AggregatorDaemon>>,
/// Placement decisions copied from the group while its lock was held.
pub placements: Vec<PlacementDecision>,
/// One health result per entry of `replicas`, gathered after the group lock
/// was released.
pub healths: Vec<ReplicaHealth>,
}
/// Name-keyed registry of aggregator groups.
pub struct AggregatorRegistry {
// Registered entries keyed by group name, behind a synchronous
// (`parking_lot`) read-write lock.
groups: RwLock<HashMap<String, Arc<AggregatorGroupEntry>>>,
}
/// Failures reported by the aggregator registry operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AggregatorRegistryError {
    /// A group is already registered under the contained name.
    DuplicateName(String),
    /// No usable group exists under the contained name.
    NotFound(String),
    /// A scale request was rejected or failed; the payload is the detail text.
    ScaleFailed(String),
}

impl std::fmt::Display for AggregatorRegistryError {
    /// Writes a one-line, human-readable message for the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::DuplicateName(n) => {
                f.write_fmt(format_args!("aggregator group already registered: {n}"))
            }
            Self::NotFound(n) => f.write_fmt(format_args!("aggregator group not found: {n}")),
            Self::ScaleFailed(d) => {
                f.write_fmt(format_args!("aggregator group scale failed: {d}"))
            }
        }
    }
}

impl std::error::Error for AggregatorRegistryError {}
impl AggregatorRegistry {
    /// Creates a registry with no groups.
    pub fn new() -> Self {
        let groups = RwLock::new(HashMap::new());
        Self { groups }
    }

    /// Registers `group` under `name` without a health monitor.
    ///
    /// # Errors
    ///
    /// Returns [`AggregatorRegistryError::DuplicateName`] when `name` is
    /// already taken. The supplied `group` is consumed and dropped in that
    /// case.
    pub fn register(
        &self,
        name: impl Into<String>,
        group: LifecycleGroup<AggregatorDaemon>,
    ) -> Result<Arc<AggregatorGroupEntry>, AggregatorRegistryError> {
        let name: String = name.into();
        let seed = *group.group_seed();

        // The write lock covers both the duplicate check and the insert.
        let mut table = self.groups.write();
        if table.contains_key(&name) {
            return Err(AggregatorRegistryError::DuplicateName(name));
        }

        let entry = Arc::new(AggregatorGroupEntry {
            name: name.clone(),
            group_seed: seed,
            group: Arc::new(AsyncMutex::new(Some(group))),
            monitor: Mutex::new(None),
        });
        table.insert(name, Arc::clone(&entry));
        Ok(entry)
    }

    /// Registers `group` under `name` and spawns a health monitor for it.
    ///
    /// The monitor is spawned with a handle to the same shared group slot the
    /// entry uses, plus `factory` and `monitor_interval`. It is only spawned
    /// after the duplicate-name check has passed.
    // NOTE(review): `factory` is presumably what the monitor uses to build
    // replacement replicas — confirm against `HealthMonitor::spawn`.
    ///
    /// # Errors
    ///
    /// Returns [`AggregatorRegistryError::DuplicateName`] when `name` is
    /// already taken. The supplied `group` is consumed and dropped in that
    /// case.
    pub fn register_with_monitor<F>(
        &self,
        name: impl Into<String>,
        group: LifecycleGroup<AggregatorDaemon>,
        factory: F,
        monitor_interval: std::time::Duration,
    ) -> Result<Arc<AggregatorGroupEntry>, AggregatorRegistryError>
    where
        F: FnMut(u8) -> Arc<AggregatorDaemon> + Send + 'static,
    {
        let name: String = name.into();
        let seed = *group.group_seed();

        let mut table = self.groups.write();
        if table.contains_key(&name) {
            return Err(AggregatorRegistryError::DuplicateName(name));
        }

        // One shared slot: the entry and the monitor both hold this `Arc`.
        let shared_group = Arc::new(AsyncMutex::new(Some(group)));
        let monitor = HealthMonitor::spawn(Arc::clone(&shared_group), factory, monitor_interval);

        let entry = Arc::new(AggregatorGroupEntry {
            name: name.clone(),
            group_seed: seed,
            group: shared_group,
            monitor: Mutex::new(Some(Arc::new(monitor))),
        });
        table.insert(name, Arc::clone(&entry));
        Ok(entry)
    }

    /// Looks up the entry registered under `name`.
    pub fn get(&self, name: &str) -> Option<Arc<AggregatorGroupEntry>> {
        let table = self.groups.read();
        table.get(name).map(Arc::clone)
    }

    /// Returns all registered names in ascending order.
    pub fn names(&self) -> Vec<String> {
        let mut names: Vec<String> = {
            let table = self.groups.read();
            table.keys().cloned().collect()
        };
        names.sort();
        names
    }

    /// Returns all registered entries, ordered by name.
    pub fn entries(&self) -> Vec<Arc<AggregatorGroupEntry>> {
        let mut entries: Vec<Arc<AggregatorGroupEntry>> = {
            let table = self.groups.read();
            table.values().map(Arc::clone).collect()
        };
        entries.sort_by(|lhs, rhs| lhs.name.cmp(&rhs.name));
        entries
    }

    /// Returns the number of registered groups.
    pub fn len(&self) -> usize {
        let table = self.groups.read();
        table.len()
    }

    /// Returns `true` when no group is registered.
    pub fn is_empty(&self) -> bool {
        let table = self.groups.read();
        table.is_empty()
    }

    /// Grows or shrinks the group registered under `name` to exactly
    /// `target_replica_count` replicas and returns its entry.
    ///
    /// Growing calls the group's `add_replicas` with `factory`; shrinking
    /// calls `remove_last` once per surplus replica. The group lock is held
    /// for the whole adjustment. Nothing is done when the group already has
    /// the requested size.
    ///
    /// # Errors
    ///
    /// * [`AggregatorRegistryError::ScaleFailed`] when `target_replica_count`
    ///   is `0`, or when `add_replicas` / `remove_last` fails. A failure part
    ///   way through shrinking leaves the replicas removed so far removed.
    /// * [`AggregatorRegistryError::NotFound`] when no entry exists under
    ///   `name`, or the entry's group has already been taken out.
    pub async fn scale_group<F>(
        &self,
        name: &str,
        target_replica_count: u8,
        mut factory: F,
    ) -> Result<Arc<AggregatorGroupEntry>, AggregatorRegistryError>
    where
        F: FnMut(u8) -> Arc<AggregatorDaemon> + Send,
    {
        if target_replica_count == 0 {
            return Err(AggregatorRegistryError::ScaleFailed(
                "target_replica_count must be > 0".into(),
            ));
        }

        let entry = self
            .get(name)
            .ok_or_else(|| AggregatorRegistryError::NotFound(name.to_string()))?;

        // Scoped so the guard (which borrows `entry`) is gone before `entry`
        // is moved into the return value.
        {
            let mut slot = entry.group.lock().await;
            let group = slot
                .as_mut()
                .ok_or_else(|| AggregatorRegistryError::NotFound(name.to_string()))?;

            let current = group.replica_count();
            let target = usize::from(target_replica_count);

            match target.cmp(&current) {
                std::cmp::Ordering::Greater => {
                    // Cannot truncate: `target` came from a `u8`, so the
                    // difference is at most 255.
                    let missing = (target - current) as u8;
                    group
                        .add_replicas(missing, &mut factory)
                        .await
                        .map_err(|e| {
                            AggregatorRegistryError::ScaleFailed(format!("add_replicas: {e}"))
                        })?;
                }
                std::cmp::Ordering::Less => {
                    for _ in target..current {
                        group.remove_last().await.map_err(|e| {
                            AggregatorRegistryError::ScaleFailed(format!("remove_last: {e}"))
                        })?;
                    }
                }
                std::cmp::Ordering::Equal => {}
            }
        }

        Ok(entry)
    }

    /// Removes the group registered under `name` and returns it to the caller.
    ///
    /// The entry is removed from the map first, then its monitor (if any) is
    /// stopped, and finally the group is taken out of the entry. Handles to
    /// the entry that are still held elsewhere observe an empty group
    /// afterwards.
    ///
    /// # Errors
    ///
    /// Returns [`AggregatorRegistryError::NotFound`] when no entry exists
    /// under `name`, or the entry no longer holds a group.
    pub async fn unregister(
        &self,
        name: &str,
    ) -> Result<LifecycleGroup<AggregatorDaemon>, AggregatorRegistryError> {
        // The write guard is a statement temporary, so it is released before
        // any `.await` below.
        let removed = self.groups.write().remove(name);
        let entry = removed.ok_or_else(|| AggregatorRegistryError::NotFound(name.to_string()))?;

        // Take the monitor in its own statement so the synchronous guard is
        // released before `stop()` is awaited.
        let monitor = entry.monitor.lock().take();
        if let Some(handle) = monitor {
            handle.stop().await;
        }

        let taken = {
            let mut slot = entry.group.lock().await;
            slot.take()
        };
        taken.ok_or_else(|| AggregatorRegistryError::NotFound(name.to_string()))
    }
}
impl Default for AggregatorRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::net::behavior::aggregator::{AggregatorConfig, AggregatorDaemon};
    use crate::adapter::net::behavior::fold::capability::CapabilityFold;
    use crate::adapter::net::behavior::fold::FoldKind;
    use crate::adapter::net::behavior::lifecycle::LifecycleGroup;
    use crate::adapter::net::identity::EntityKeypair;
    use crate::adapter::net::{MeshNode, MeshNodeConfig, SubnetId};
    use std::net::SocketAddr;
    use std::time::Duration;

    /// Builds a mesh node configured with the address `127.0.0.1:0` and a
    /// freshly generated keypair.
    async fn build_mesh() -> Arc<MeshNode> {
        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let cfg = MeshNodeConfig::new(addr, [0x17u8; 32]);
        Arc::new(
            MeshNode::new(EntityKeypair::generate(), cfg)
                .await
                .expect("MeshNode::new"),
        )
    }

    /// Spawns a two-replica group (seed `[0xCD; 32]`) whose daemons are
    /// configured with an interval of `interval_ms` milliseconds.
    async fn spawn_group_2(interval_ms: u64) -> LifecycleGroup<AggregatorDaemon> {
        let mesh = build_mesh().await;
        let cfg = AggregatorConfig::new(SubnetId::GLOBAL)
            .with_fold_kind(CapabilityFold::KIND_ID)
            .with_interval(Duration::from_millis(interval_ms));
        // `cfg` and `mesh` are not needed after this point, so the factory
        // closure takes ownership of them directly.
        LifecycleGroup::<AggregatorDaemon>::spawn(2, [0xCDu8; 32], move |_idx| {
            Arc::new(AggregatorDaemon::new(cfg.clone(), mesh.clone()).expect("new"))
        })
        .await
        .expect("spawn group")
    }

    /// Accessors on a freshly registered entry report the live group's state.
    #[tokio::test]
    async fn register_then_accessors_reflect_live_group_state() {
        let group = spawn_group_2(50).await;
        let registry = AggregatorRegistry::new();
        let entry = registry.register("a", group).expect("register");
        assert_eq!(entry.name, "a");
        assert_eq!(entry.replica_count().await, 2);
        assert_eq!(entry.replicas().await.len(), 2);
        assert!(entry.placements().await.is_empty());
        let health = entry.health().await;
        assert_eq!(health.len(), 2);
        let g = registry.unregister("a").await.expect("unregister");
        g.stop().await;
    }

    /// A second registration under the same name is rejected with
    /// `DuplicateName` carrying that name.
    #[tokio::test]
    async fn register_rejects_duplicate_names() {
        let registry = AggregatorRegistry::new();
        registry
            .register("dup", spawn_group_2(50).await)
            .expect("first register");
        match registry.register("dup", spawn_group_2(50).await) {
            Err(AggregatorRegistryError::DuplicateName(n)) => assert_eq!(n, "dup"),
            Err(other) => panic!("expected DuplicateName, got {other:?}"),
            Ok(_) => panic!("expected DuplicateName, got Ok"),
        }
        let g = registry.unregister("dup").await.expect("unregister");
        g.stop().await;
    }

    /// `unregister` hands the group back, removes the entry, and reports
    /// `NotFound` on a second attempt.
    #[tokio::test]
    async fn unregister_returns_group_and_removes_entry() {
        let registry = AggregatorRegistry::new();
        registry
            .register("a", spawn_group_2(50).await)
            .expect("register");
        assert_eq!(registry.len(), 1);
        let group = registry.unregister("a").await.expect("unregister");
        assert_eq!(group.replica_count(), 2);
        group.stop().await;
        assert_eq!(registry.len(), 0);
        match registry.unregister("a").await {
            Err(AggregatorRegistryError::NotFound(n)) => assert_eq!(n, "a"),
            Err(other) => panic!("expected NotFound, got {other:?}"),
            Ok(_) => panic!("expected NotFound, got Ok"),
        }
    }

    /// `entries()` is ordered by name regardless of registration order.
    #[tokio::test]
    async fn entries_are_sorted_by_name_for_deterministic_cli_output() {
        let registry = AggregatorRegistry::new();
        registry
            .register("zulu", spawn_group_2(50).await)
            .expect("register zulu");
        registry
            .register("alpha", spawn_group_2(50).await)
            .expect("register alpha");
        registry
            .register("mike", spawn_group_2(50).await)
            .expect("register mike");
        let names: Vec<String> = registry.entries().iter().map(|e| e.name.clone()).collect();
        assert_eq!(names, vec!["alpha", "mike", "zulu"]);
        for n in ["alpha", "mike", "zulu"] {
            let g = registry.unregister(n).await.expect("unregister");
            g.stop().await;
        }
    }

    /// A monitor spawned at registration ticks while registered and stops
    /// ticking once the group is unregistered.
    #[tokio::test]
    async fn register_with_monitor_stops_monitor_on_unregister() {
        let registry = AggregatorRegistry::new();
        let mesh = build_mesh().await;
        let cfg = AggregatorConfig::new(SubnetId::GLOBAL)
            .with_fold_kind(CapabilityFold::KIND_ID)
            .with_interval(Duration::from_millis(50));
        let factory_cfg = cfg.clone();
        let factory_mesh = mesh.clone();
        let group = LifecycleGroup::<AggregatorDaemon>::spawn(2, [0u8; 32], {
            let cfg = cfg.clone();
            let mesh = mesh.clone();
            move |_idx| Arc::new(AggregatorDaemon::new(cfg.clone(), mesh.clone()).expect("new"))
        })
        .await
        .expect("spawn");
        let entry = registry
            .register_with_monitor(
                "monitored",
                group,
                move |_idx| {
                    Arc::new(
                        AggregatorDaemon::new(factory_cfg.clone(), factory_mesh.clone())
                            .expect("new"),
                    )
                },
                Duration::from_millis(20),
            )
            .expect("register_with_monitor");
        assert!(entry.monitor().is_some());
        tokio::time::sleep(Duration::from_millis(80)).await;

        // Keep our own handle: `unregister` takes the monitor out of the
        // entry, so `entry.monitor()` returns `None` afterwards.
        let monitor_handle = entry.monitor().expect("monitor");
        let ticks = || {
            monitor_handle
                .stats()
                .ticks
                .load(std::sync::atomic::Ordering::Acquire)
        };
        assert!(ticks() >= 1, "monitor should have ticked at least once");

        let group = registry.unregister("monitored").await.expect("unregister");

        // Sample the baseline only after `unregister` (and therefore the
        // monitor's `stop()`) has returned. Sampling before the call would
        // leave a window in which one more 20 ms tick can land and fail the
        // equality check even though the monitor stopped correctly.
        let ticks_at_stop = ticks();
        tokio::time::sleep(Duration::from_millis(80)).await;
        let ticks_after_wait = ticks();
        assert_eq!(
            ticks_at_stop, ticks_after_wait,
            "monitor must stop ticking after unregister"
        );
        group.stop().await;
    }
}