use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use nodedb_types::DatabaseId;
use tracing::info;
use nodedb_cluster::mirror::CrossClusterLink;
struct MirrorLinkEntry {
link: Arc<CrossClusterLink>,
last_received_ms: AtomicU64,
}
impl MirrorLinkEntry {
fn new(link: Arc<CrossClusterLink>) -> Self {
Self {
link,
last_received_ms: AtomicU64::new(now_ms()),
}
}
}
pub struct MirrorLinkRegistry {
entries: RwLock<HashMap<u64, Arc<MirrorLinkEntry>>>,
}
impl Default for MirrorLinkRegistry {
fn default() -> Self {
Self::new()
}
}
impl MirrorLinkRegistry {
pub fn new() -> Self {
Self {
entries: RwLock::new(HashMap::new()),
}
}
pub fn insert(&self, db_id: DatabaseId, link: Arc<CrossClusterLink>) {
let mut w = self.entries.write().unwrap_or_else(|p| p.into_inner());
w.insert(db_id.as_u64(), Arc::new(MirrorLinkEntry::new(link)));
}
pub fn remove(&self, db_id: DatabaseId) -> Option<Arc<CrossClusterLink>> {
let mut w = self.entries.write().unwrap_or_else(|p| p.into_inner());
w.remove(&db_id.as_u64()).map(|e| e.link.clone())
}
pub fn get(&self, db_id: DatabaseId) -> Option<Arc<CrossClusterLink>> {
let r = self.entries.read().unwrap_or_else(|p| p.into_inner());
r.get(&db_id.as_u64()).map(|e| e.link.clone())
}
pub fn all_ids(&self) -> Vec<DatabaseId> {
let r = self.entries.read().unwrap_or_else(|p| p.into_inner());
r.keys().map(|&id| DatabaseId::new(id)).collect()
}
pub fn record_received(&self, db_id: DatabaseId) {
let r = self.entries.read().unwrap_or_else(|p| p.into_inner());
if let Some(entry) = r.get(&db_id.as_u64()) {
entry.last_received_ms.store(now_ms(), Ordering::Relaxed);
}
}
pub fn last_received_ms(&self, db_id: DatabaseId) -> Option<u64> {
let r = self.entries.read().unwrap_or_else(|p| p.into_inner());
r.get(&db_id.as_u64())
.map(|e| e.last_received_ms.load(Ordering::Relaxed))
}
pub fn teardown_link(&self, db_id: DatabaseId) {
if let Some(link) = self.remove(db_id) {
info!(
db_id = db_id.as_u64(),
source_cluster = link.source_cluster_id(),
"mirror link teardown: removing cross-cluster observer link"
);
drop(link);
}
}
}
fn now_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
.as_millis() as u64
}