use std::sync::Arc;
use dashmap::DashMap;
use hyphae::{
Cell, CellImmutable, MaterializeDefinite, Signal, SubscriptionGuard, TapExt, Watchable,
};
use log::info;
use myko::{
entities::server::{GetAllServers, GetPeerServers, Server, ServerId},
server::CellServerCtx,
};
mod peer_connection_handle;
use peer_connection_handle::{PeerConnectionHandle, PeerState};
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PeerStatus {
pub peer_id: String,
pub is_connected: bool,
pub is_alive: bool,
pub latency_ms: Option<u64>,
pub last_seen: Option<String>,
}
pub struct PeerRegistry {
_peers_guard: SubscriptionGuard,
_self_advertise_guard: SubscriptionGuard,
_connections: Arc<DashMap<ServerId, PeerConnectionHandle>>,
_remove_guards: Arc<DashMap<ServerId, SubscriptionGuard>>,
}
#[derive(Debug, Clone)]
pub struct PeerRegistryConfig {
pub address: String,
pub port: u16,
pub version: String,
}
impl PeerRegistry {
fn build_local_server(config: &PeerRegistryConfig, host_id: &ServerId) -> Server {
Server {
address: config.address.clone(),
id: host_id.clone(),
port: config.port,
started_at: chrono::Utc::now().to_rfc3339(),
version: config.version.clone(),
}
}
fn spawn_self_advertise_guard(
ctx: CellServerCtx,
local_server: Server,
self_host_id: ServerId,
) -> SubscriptionGuard {
let all_servers = ctx
.query_map(GetAllServers {}, ctx.new_server_transaction())
.items();
all_servers.subscribe(move |signal| {
if let Signal::Value(servers) = signal {
let has_self = servers.iter().any(|s| s.id == self_host_id);
if !has_self {
log::info!(
"Local server {} missing from GetAllServers; re-advertising",
self_host_id
);
if let Err(e) = ctx.set(&local_server) {
log::error!("Failed to re-advertise local server: {e}");
}
}
}
})
}
fn reconcile_peer_snapshot<T>(
peers: &T,
host_id: &ServerId,
local_address: &str,
local_port: u16,
ctx: &CellServerCtx,
connections: &Arc<DashMap<ServerId, PeerConnectionHandle>>,
remove_guards: &Arc<DashMap<ServerId, SubscriptionGuard>>,
) where
T: AsRef<[Arc<Server>]>,
{
log::info!(
"Current Peers: {}",
peers
.as_ref()
.iter()
.map(|s| format!("{}/{}:{}", s.id, s.address, s.port))
.collect::<Vec<String>>()
.join(", ")
);
for server in peers.as_ref() {
if server.id != *host_id && server.address == local_address && server.port == local_port
{
log::warn!(
"Deleting stale entries: {}:{}:{}",
server.id,
server.address,
server.port
);
ctx.unregister_peer_client(server.id.as_ref());
if let Err(e) = ctx.del(server.as_ref()) {
log::error!("Failed to delete stale server entry: {e}");
}
continue;
}
if connections.contains_key(&server.id) {
continue;
}
let handle = PeerConnectionHandle::new(server.clone());
let remove_connection_handles = connections.clone();
let remove_state_guards = remove_guards.clone();
let remove_server = server.clone();
let remove_ctx = ctx.clone();
let state_guard = handle.signal_state.subscribe(move |state| {
if let Signal::Value(v) = state
&& v.as_ref() == &PeerState::Delete
{
log::warn!(
"Deleting: {}:{}:{}",
remove_server.id,
remove_server.address,
remove_server.port
);
remove_connection_handles.remove(&remove_server.id);
remove_state_guards.remove(&remove_server.id);
remove_ctx.unregister_peer_client(remove_server.id.as_ref());
if let Err(e) = remove_ctx.del(remove_server.as_ref()) {
log::error!("Failed to delete peer server: {e}");
}
}
});
let server_id = server.id.clone();
ctx.register_peer_client(server_id.clone(), handle.client());
connections.insert(server_id.clone(), handle);
remove_guards.insert(server_id, state_guard);
}
}
fn spawn_peer_reconcile_guard(
peer_servers: Cell<Vec<Arc<Server>>, CellImmutable>,
host_id: ServerId,
local_address: String,
local_port: u16,
ctx: CellServerCtx,
connections: Arc<DashMap<ServerId, PeerConnectionHandle>>,
remove_guards: Arc<DashMap<ServerId, SubscriptionGuard>>,
) -> SubscriptionGuard {
peer_servers
.tap(move |peers| {
Self::reconcile_peer_snapshot(
peers,
&host_id,
&local_address,
local_port,
&ctx,
&connections,
&remove_guards,
);
})
.materialize()
.subscribe(|_| {})
}
pub fn new(ctx: CellServerCtx, config: PeerRegistryConfig) -> Self {
let server_req = ctx.new_server_transaction();
let connections = Arc::new(DashMap::new());
let remove_guards = Arc::new(DashMap::new());
let peer_servers = ctx.query_map(GetPeerServers {}, server_req).items();
let host_id = ServerId(ctx.host_id.to_string().into());
let server = Self::build_local_server(&config, &host_id);
let self_advertise_guard =
Self::spawn_self_advertise_guard(ctx.clone(), server.clone(), host_id.clone());
let peer_sub = Self::spawn_peer_reconcile_guard(
peer_servers,
host_id.clone(),
config.address.clone(),
config.port,
ctx.clone(),
connections.clone(),
remove_guards.clone(),
);
log::info!(
"Publishing local server bootstrap advert: {}:{}:{}",
server.id,
server.address,
server.port
);
if let Err(e) = ctx.set(&server) {
log::error!("Failed to publish local server bootstrap advert: {e}");
}
Self {
_peers_guard: peer_sub,
_self_advertise_guard: self_advertise_guard,
_connections: connections,
_remove_guards: remove_guards,
}
}
pub fn shutdown(&self) {
info!("PeerRegistry shutting down");
}
}
impl Drop for PeerRegistry {
fn drop(&mut self) {
log::warn!("Dropping Peer Registry");
self.shutdown();
}
}