pub(crate) mod single_map;
#[cfg(feature = "metrics")]
use metrics::{Counter, Gauge, counter, gauge};
use left_right::{ReadHandle, WriteHandle};
use single_map::SingleMap;
use std::{
hash::Hash,
sync::mpsc::{Receiver, RecvTimeoutError},
time::{Duration, Instant},
};
use tracing::{debug, error};
use super::{ChiralMapConfig, FlushableOp, Operation};
pub(crate) struct ChiralMapInner<K, V>
where
K: Hash + Eq + Clone,
V: Clone,
{
writer: WriteHandle<SingleMap<K, V>, Operation<K, V>>,
reader: ReadHandle<SingleMap<K, V>>,
rx: Receiver<FlushableOp<K, V>>,
opcount: usize,
auto_flush_count: usize,
auto_flush_seconds: u64,
#[cfg(feature = "metrics")]
met_flushes: Counter,
#[cfg(feature = "metrics")]
met_opslog: Gauge,
}
impl<K, V> ChiralMapInner<K, V>
where
K: Hash + Eq + Clone,
V: Clone,
{
#[allow(clippy::type_complexity)]
pub(crate) fn new(config: &ChiralMapConfig, rx: Receiver<FlushableOp<K, V>>) -> Self {
let inner_map = SingleMap::<K, V>::new(config);
let (mut writer, reader) = left_right::new_from_empty(inner_map);
writer.publish();
writer.publish();
#[cfg(feature = "metrics")]
let met_flushes = counter!("chiral_map_flushes", "id" => config.id.clone());
#[cfg(feature = "metrics")]
let met_opslog = gauge!("chiral_map_queued_ops", "id" => config.id.clone());
Self {
writer,
reader,
rx,
opcount: 0,
auto_flush_count: config.auto_flush_count,
auto_flush_seconds: config.auto_flush_seconds,
#[cfg(feature = "metrics")]
met_flushes,
#[cfg(feature = "metrics")]
met_opslog,
}
}
pub(crate) fn op(&mut self, op: Operation<K, V>) {
self.writer.append(op);
self.opcount += 1;
#[cfg(feature = "metrics")]
self.met_opslog.set(self.opcount as f64);
if self.opcount >= self.auto_flush_count {
debug!("Auto flush triggered by opscount");
self.flush();
}
}
pub(crate) fn flush(&mut self) {
debug!("Flushing");
self.writer.flush();
self.opcount = 0;
#[cfg(feature = "metrics")]
self.met_opslog.set(self.opcount as f64);
#[cfg(feature = "metrics")]
self.met_flushes.increment(1);
}
pub(crate) fn reader(&self) -> ReadHandle<SingleMap<K, V>> {
self.reader.clone()
}
pub(crate) fn receiver_loop(mut self)
where
K: Hash + Eq + Clone,
V: Clone,
{
let dur = Duration::from_secs(self.auto_flush_seconds);
let mut interval = Instant::now();
loop {
match self
.rx
.recv_timeout(Duration::from_secs(self.auto_flush_seconds))
{
Ok(op) => match op {
FlushableOp::Flush(rec) => {
self.flush();
if let Some(rec) = rec {
let _ = rec.send(());
}
}
FlushableOp::Op(op) => self.op(op),
},
Err(e) => match e {
RecvTimeoutError::Disconnected => {
error!("ChiralMap receiver disconnected");
return;
}
RecvTimeoutError::Timeout => {
self.flush();
debug!("Auto flush triggered by time(out)");
}
},
}
if interval.elapsed() > dur {
self.flush();
debug!("Auto flush triggered by time");
interval = Instant::now();
}
}
}
}