use std::sync::Arc;
use dashmap::DashMap;
use log::warn;
use myko::{
client::MykoClient,
event::MEvent,
server::{PersistError, PersistHealth, Persister},
};
/// A [`Persister`] that forwards each persisted event to the peer clients
/// held in a shared map, instead of writing to local storage.
pub struct PeerPersister {
/// Shared map of peer clients; the key is the identifier logged as `peer=`
/// when a send to that client fails.
peer_clients: Arc<DashMap<Arc<str>, Arc<MykoClient>>>,
/// Health tracker updated by every `persist` call and handed out by `health()`.
health: Arc<PersistHealth>,
}
impl PeerPersister {
    /// Builds a persister over the given shared peer map.
    ///
    /// The map is stored as-is (not copied), and the health tracker starts
    /// from `PersistHealth::default()`.
    pub fn new(peer_clients: Arc<DashMap<Arc<str>, Arc<MykoClient>>>) -> Self {
        let health = Arc::new(PersistHealth::default());
        Self {
            peer_clients,
            health,
        }
    }

    /// Returns the number of entries in the peer map at the time of the call.
    pub fn peer_count(&self) -> usize {
        let peers = &self.peer_clients;
        peers.len()
    }
}
impl Persister for PeerPersister {
    /// Broadcasts `event` to every peer client in the map.
    ///
    /// The broadcast is best-effort: a failed send to an individual peer is
    /// logged with `warn!` and counted, but does not abort the loop.
    ///
    /// # Errors
    ///
    /// Returns a [`PersistError`] only when at least one send was attempted
    /// and every attempted send failed. With no peers in the map the call
    /// succeeds without sending anything.
    fn persist(&self, event: MEvent) -> Result<(), PersistError> {
        let entity_type = event.item_type.clone();
        self.health.record_enqueue();

        // The peer map is shared and may change while we broadcast, so both
        // counters are derived from the iteration itself. Comparing failures
        // against a `len()` taken beforehand could report a total failure as
        // success (peer added/removed mid-call) or raise a spurious
        // "failed for all 0 peer(s)" error (map emptied mid-call).
        let mut attempted = 0usize;
        let mut failed = 0usize;
        for entry in self.peer_clients.iter() {
            attempted += 1;
            if let Err(e) = entry.value().send_event(event.clone()) {
                warn!(
                    "PeerPersister: broadcast failed for entity={} peer={}: {}",
                    entity_type,
                    entry.key(),
                    e
                );
                failed += 1;
            }
        }

        // Zero attempts means there was nobody to send to, which is a success.
        if attempted > 0 && failed == attempted {
            let msg = format!(
                "PeerPersister: broadcast failed for all {} peer(s)",
                attempted
            );
            self.health.record_error(msg.clone());
            return Err(PersistError {
                entity_type,
                message: msg,
            });
        }

        self.health.record_success();
        Ok(())
    }

    /// Returns a shared handle to this persister's health tracker.
    fn health(&self) -> Arc<PersistHealth> {
        Arc::clone(&self.health)
    }

    /// Performs no startup check; always returns `Ok(())`.
    fn startup_healthcheck(&self) -> Result<(), String> {
        Ok(())
    }
}