chiralmap 0.1.2

Left-Right map using IndexMap
Documentation
//! Inner type used to access the memory writer / map

pub(crate) mod single_map;

#[cfg(feature = "metrics")]
use metrics::{Counter, Gauge, counter, gauge};

use left_right::{ReadHandle, WriteHandle};
use single_map::SingleMap;
use std::{
    hash::Hash,
    sync::mpsc::{Receiver, RecvTimeoutError},
    time::{Duration, Instant},
};
use tracing::{debug, error};

use super::{ChiralMapConfig, FlushableOp, Operation};

/// This inner struct is maintained unique referenced and cannot be cloned,
/// because it holds on the chiral writer.
/// Here is also where the write operations get forwarded.
pub(crate) struct ChiralMapInner<K, V>
where
    K: Hash + Eq + Clone,
    V: Clone,
{
    writer: WriteHandle<SingleMap<K, V>, Operation<K, V>>,
    reader: ReadHandle<SingleMap<K, V>>,
    rx: Receiver<FlushableOp<K, V>>,
    opcount: usize,
    auto_flush_count: usize,
    auto_flush_seconds: u64,

    #[cfg(feature = "metrics")]
    met_flushes: Counter,
    #[cfg(feature = "metrics")]
    met_opslog: Gauge,
}

impl<K, V> ChiralMapInner<K, V>
where
    K: Hash + Eq + Clone,
    V: Clone,
{
    #[allow(clippy::type_complexity)]
    pub(crate) fn new(config: &ChiralMapConfig, rx: Receiver<FlushableOp<K, V>>) -> Self {
        let inner_map = SingleMap::<K, V>::new(config);
        let (mut writer, reader) = left_right::new_from_empty(inner_map);
        // This will trigger the first sync and swap the two maps around twice,
        // entrering the situation when (even if empty) they are rotated with
        // the readers too, and the Absorb::sync_with would be already called.
        writer.publish();
        writer.publish();

        #[cfg(feature = "metrics")]
        let met_flushes = counter!("chiral_map_flushes", "id" => config.id.clone());
        #[cfg(feature = "metrics")]
        let met_opslog = gauge!("chiral_map_queued_ops", "id" => config.id.clone());

        Self {
            writer,
            reader,
            rx,
            opcount: 0,
            auto_flush_count: config.auto_flush_count,
            auto_flush_seconds: config.auto_flush_seconds,

            #[cfg(feature = "metrics")]
            met_flushes,
            #[cfg(feature = "metrics")]
            met_opslog,
        }
    }

    pub(crate) fn op(&mut self, op: Operation<K, V>) {
        self.writer.append(op);
        self.opcount += 1;

        #[cfg(feature = "metrics")]
        self.met_opslog.set(self.opcount as f64);

        if self.opcount >= self.auto_flush_count {
            debug!("Auto flush triggered by opscount");
            self.flush();
        }
    }

    pub(crate) fn flush(&mut self) {
        debug!("Flushing");
        self.writer.flush();
        self.opcount = 0;

        #[cfg(feature = "metrics")]
        self.met_opslog.set(self.opcount as f64);
        #[cfg(feature = "metrics")]
        self.met_flushes.increment(1);
    }

    pub(crate) fn reader(&self) -> ReadHandle<SingleMap<K, V>> {
        self.reader.clone()
    }

    pub(crate) fn receiver_loop(mut self)
    where
        K: Hash + Eq + Clone,
        V: Clone,
    {
        let dur = Duration::from_secs(self.auto_flush_seconds);
        let mut interval = Instant::now();
        loop {
            match self
                .rx
                .recv_timeout(Duration::from_secs(self.auto_flush_seconds))
            {
                Ok(op) => match op {
                    FlushableOp::Flush(rec) => {
                        self.flush();
                        if let Some(rec) = rec {
                            let _ = rec.send(());
                        }
                    }
                    FlushableOp::Op(op) => self.op(op),
                },
                Err(e) => match e {
                    RecvTimeoutError::Disconnected => {
                        error!("ChiralMap receiver disconnected");
                        return;
                    }
                    RecvTimeoutError::Timeout => {
                        self.flush();
                        debug!("Auto flush triggered by time(out)");
                    }
                },
            }
            if interval.elapsed() > dur {
                self.flush();
                debug!("Auto flush triggered by time");
                interval = Instant::now();
            }
        }
    }
}