use std::collections::HashMap;
use std::sync::Arc;
use std::time::SystemTime;
use thiserror::Error;
use tokio::sync::RwLock;
use crate::error::OverlayError;
use crate::gossip::{GossipPool, PeerInfo};
/// Resource capacity a node offers for edge caching.
///
/// The values are stored verbatim in the registry and, when a gossip pool is
/// attached, stringified into the `edge_cache_*` peer labels.
///
/// `PartialEq`/`Eq` are derived so capacities can be compared directly
/// (for example with `assert_eq!`) instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeCapacity {
    /// CPU core count; published under the `edge_cache_cpu` label.
    pub cpu_cores: u32,
    /// RAM amount (MiB, going by the field name); published under
    /// `edge_cache_ram_mib`.
    pub ram_mib: u64,
    /// Disk amount (MiB, going by the field name); published under
    /// `edge_cache_disk_mib`.
    pub disk_mib: u64,
    /// Optional free-form location tag; published under `edge_cache_geo`
    /// when present, and that label is removed when this is `None`.
    pub geo: Option<String>,
}
/// A node currently recorded as edge-cache eligible in [`EdgeCacheRegistry`].
#[derive(Debug, Clone)]
pub struct EdgeCacheNode {
/// Identifier the node was registered under; also used as the registry map key.
pub node_id: u64,
/// Capacity supplied by the most recent `enable` call for this node.
pub capacity: NodeCapacity,
/// Wall-clock time captured by `enable` when this entry was written.
pub enabled_at: SystemTime,
}
/// Hit/miss counters reported for an edge-cache node.
///
/// `Default` yields both counters at zero. `PartialEq`/`Eq` are derived so a
/// snapshot can be compared as a whole rather than counter by counter.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct EdgeCacheStats {
    /// Number of cache hits.
    pub hits: u64,
    /// Number of cache misses.
    pub misses: u64,
}
/// Errors returned by [`EdgeCacheRegistry`] operations.
#[derive(Debug, Error)]
pub enum EdgeCacheError {
/// Announcing the updated labels through gossip failed. The payload is the
/// string form of the underlying error (see the `From<OverlayError>` impl).
#[error("gossip label push failed: {0}")]
Gossip(String),
/// The given node id has no entry in the registry; returned by `disable`.
#[error("node {0} is not registered as edge-cache eligible")]
NodeNotFound(u64),
}
impl From<OverlayError> for EdgeCacheError {
fn from(err: OverlayError) -> Self {
EdgeCacheError::Gossip(err.to_string())
}
}
// Peer-label keys that `enable` writes into the local peer info and `disable`
// removes again before each gossip announcement.

/// Marker label; `enable` sets it to `"true"`.
const LABEL_KEY_ENABLED: &str = "edge_cache";
/// Label holding the stringified `NodeCapacity::cpu_cores`.
const LABEL_KEY_CPU: &str = "edge_cache_cpu";
/// Label holding the stringified `NodeCapacity::ram_mib`.
const LABEL_KEY_RAM_MIB: &str = "edge_cache_ram_mib";
/// Label holding the stringified `NodeCapacity::disk_mib`.
const LABEL_KEY_DISK_MIB: &str = "edge_cache_disk_mib";
/// Label holding `NodeCapacity::geo`; only present when a geo tag was given.
const LABEL_KEY_GEO: &str = "edge_cache_geo";
/// Registry of edge-cache eligible nodes, optionally mirrored into gossip
/// peer labels.
///
/// Every field sits behind an `Arc`, so a clone shares the same node map,
/// gossip pool and peer info as the value it was cloned from.
#[derive(Clone)]
pub struct EdgeCacheRegistry {
/// Eligible nodes keyed by node id.
inner: Arc<RwLock<HashMap<u64, EdgeCacheNode>>>,
/// Gossip pool used to announce label changes; `None` when built with `new`.
gossip: Option<Arc<GossipPool>>,
/// Local peer info whose labels are edited before each announcement;
/// holds `None` when built with `new`.
self_info: Arc<RwLock<Option<PeerInfo>>>,
}
/// Compact `Debug` output: only reports whether a gossip pool is attached and
/// marks the remaining fields as elided.
impl std::fmt::Debug for EdgeCacheRegistry {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let gossip_attached = self.gossip.is_some();
        let mut out = formatter.debug_struct("EdgeCacheRegistry");
        out.field("gossip_attached", &gossip_attached);
        out.finish_non_exhaustive()
    }
}
impl EdgeCacheRegistry {
    /// Creates an empty registry with no gossip pool and no local peer info.
    ///
    /// In this mode `enable` and `disable` only touch the in-memory node map.
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: Arc::default(),
            gossip: None,
            self_info: Arc::default(),
        }
    }

    /// Creates an empty registry that also announces label changes through
    /// `gossip`, using `self_info` as the peer record whose labels are edited.
    #[must_use]
    pub fn with_gossip(gossip: Arc<GossipPool>, self_info: PeerInfo) -> Self {
        let self_info = Arc::new(RwLock::new(Some(self_info)));
        Self {
            inner: Arc::default(),
            gossip: Some(gossip),
            self_info,
        }
    }

    /// Records `node_id` as edge-cache eligible with the given `capacity`,
    /// replacing any earlier entry for the same id.
    ///
    /// When both a gossip pool and local peer info are present, the
    /// `edge_cache*` labels are written onto the peer info and announced.
    ///
    /// # Errors
    ///
    /// Returns [`EdgeCacheError::Gossip`] if the announcement fails. The
    /// in-memory entry has already been written at that point and is kept.
    pub async fn enable(&self, node_id: u64, capacity: NodeCapacity) -> Result<(), EdgeCacheError> {
        // The write guard is a temporary of this statement, so the map lock is
        // released before any gossip work starts.
        self.inner.write().await.insert(
            node_id,
            EdgeCacheNode {
                node_id,
                capacity: capacity.clone(),
                enabled_at: SystemTime::now(),
            },
        );

        let gossip = match self.gossip.as_ref() {
            Some(pool) => pool,
            None => return Ok(()),
        };
        // Held across the announcement so the label edit and the announce of
        // one call are not interleaved with another call's.
        let mut peer_slot = self.self_info.write().await;
        let info = match peer_slot.as_mut() {
            Some(info) => info,
            None => return Ok(()),
        };

        Self::write_capacity_labels(info, &capacity);
        gossip.announce_self(info).await?;
        Ok(())
    }

    /// Removes `node_id` from the registry.
    ///
    /// When both a gossip pool and local peer info are present, every
    /// `edge_cache*` label is removed from the peer info and the result is
    /// announced.
    ///
    /// # Errors
    ///
    /// Returns [`EdgeCacheError::NodeNotFound`] if `node_id` has no entry, and
    /// [`EdgeCacheError::Gossip`] if the announcement fails. In the latter
    /// case the in-memory entry has already been removed.
    pub async fn disable(&self, node_id: u64) -> Result<(), EdgeCacheError> {
        // Lock is scoped to this statement; an unknown id bails out before any
        // label is touched.
        self.inner
            .write()
            .await
            .remove(&node_id)
            .ok_or(EdgeCacheError::NodeNotFound(node_id))?;

        let gossip = match self.gossip.as_ref() {
            Some(pool) => pool,
            None => return Ok(()),
        };
        let mut peer_slot = self.self_info.write().await;
        let info = match peer_slot.as_mut() {
            Some(info) => info,
            None => return Ok(()),
        };

        Self::clear_capacity_labels(info);
        gossip.announce_self(info).await?;
        Ok(())
    }

    /// Returns a snapshot (clones) of every node currently in the registry.
    ///
    /// The order follows the underlying `HashMap` iteration and is therefore
    /// unspecified.
    pub async fn list_eligible(&self) -> Vec<EdgeCacheNode> {
        let nodes = self.inner.read().await;
        nodes.iter().map(|(_, node)| node.clone()).collect()
    }

    /// Reports whether `node_id` currently has an entry in the registry.
    pub async fn is_enabled(&self, node_id: u64) -> bool {
        self.inner.read().await.contains_key(&node_id)
    }

    /// Placeholder: always returns zeroed counters and ignores the node id.
    // Nothing is awaited yet; the lint is silenced so the `async` signature
    // can stay as it is.
    #[allow(clippy::unused_async)]
    pub async fn stats(&self, _node_id: u64) -> EdgeCacheStats {
        EdgeCacheStats::default()
    }

    /// Writes the enabled marker and capacity values into `info.labels`,
    /// setting or removing the geo label depending on `capacity.geo`.
    fn write_capacity_labels(info: &mut PeerInfo, capacity: &NodeCapacity) {
        let fixed_labels = [
            (LABEL_KEY_ENABLED, "true".to_string()),
            (LABEL_KEY_CPU, capacity.cpu_cores.to_string()),
            (LABEL_KEY_RAM_MIB, capacity.ram_mib.to_string()),
            (LABEL_KEY_DISK_MIB, capacity.disk_mib.to_string()),
        ];
        for (key, value) in fixed_labels {
            info.labels.insert(key.to_string(), value);
        }

        match capacity.geo.as_ref() {
            Some(region) => {
                info.labels.insert(LABEL_KEY_GEO.to_string(), region.clone());
            }
            None => {
                // Drop a geo tag left over from an earlier enable.
                info.labels.remove(LABEL_KEY_GEO);
            }
        }
    }

    /// Removes every `edge_cache*` label from `info.labels`.
    fn clear_capacity_labels(info: &mut PeerInfo) {
        for key in [
            LABEL_KEY_ENABLED,
            LABEL_KEY_CPU,
            LABEL_KEY_RAM_MIB,
            LABEL_KEY_DISK_MIB,
            LABEL_KEY_GEO,
        ] {
            info.labels.remove(key);
        }
    }
}
impl Default for EdgeCacheRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a [`NodeCapacity`] from plain values to keep test bodies short.
    fn cap(cpu: u32, ram: u64, disk: u64, geo: Option<&str>) -> NodeCapacity {
        let geo = geo.map(String::from);
        NodeCapacity {
            cpu_cores: cpu,
            ram_mib: ram,
            disk_mib: disk,
            geo,
        }
    }

    /// A node shows up after `enable` and is gone again after `disable`.
    #[tokio::test]
    async fn enable_then_disable_round_trip() {
        let registry = EdgeCacheRegistry::new();
        assert!(registry.list_eligible().await.is_empty());
        assert!(!registry.is_enabled(7).await);

        let capacity = cap(4, 1024, 8192, Some("us-east-1"));
        registry.enable(7, capacity).await.expect("enable");
        assert!(registry.is_enabled(7).await);

        let nodes = registry.list_eligible().await;
        assert_eq!(nodes.len(), 1);
        let node = &nodes[0];
        assert_eq!(node.node_id, 7);
        assert_eq!(node.capacity.cpu_cores, 4);
        assert_eq!(node.capacity.geo.as_deref(), Some("us-east-1"));

        registry.disable(7).await.expect("disable");
        assert!(!registry.is_enabled(7).await);
        assert!(registry.list_eligible().await.is_empty());
    }

    /// Disabling an id that was never enabled reports `NodeNotFound`.
    #[tokio::test]
    async fn disable_unknown_node_errors() {
        let registry = EdgeCacheRegistry::new();
        let err = registry.disable(99).await.expect_err("should fail");
        assert!(
            matches!(err, EdgeCacheError::NodeNotFound(99)),
            "unexpected error: {err:?}"
        );
    }

    /// `stats` is a placeholder: zero counters for known and unknown ids.
    #[tokio::test]
    async fn stats_returns_placeholder_zero() {
        let registry = EdgeCacheRegistry::new();
        registry
            .enable(3, cap(1, 64, 256, None))
            .await
            .expect("enable");

        for node_id in [3, 999] {
            let snapshot = registry.stats(node_id).await;
            assert_eq!(snapshot.hits, 0);
            assert_eq!(snapshot.misses, 0);
        }
    }

    /// A second `enable` for the same id replaces the stored capacity.
    #[tokio::test]
    async fn enable_overwrites_capacity() {
        let registry = EdgeCacheRegistry::new();
        registry
            .enable(5, cap(2, 128, 512, None))
            .await
            .expect("enable");
        registry
            .enable(5, cap(8, 4096, 16_384, Some("eu-west-1")))
            .await
            .expect("re-enable");

        let nodes = registry.list_eligible().await;
        assert_eq!(nodes.len(), 1);
        let updated = &nodes[0].capacity;
        assert_eq!(updated.cpu_cores, 8);
        assert_eq!(updated.disk_mib, 16_384);
        assert_eq!(updated.geo.as_deref(), Some("eu-west-1"));
    }
}