//! chiralmap 0.1.2
//!
//! Left-Right map using IndexMap.
pub(crate) mod inner;

use builder_pattern::Builder;
use inner::{ChiralMapInner, single_map::SingleMap};
use left_right::ReadHandle;
use std::{
    hash::Hash,
    sync::{
        Arc, Mutex, MutexGuard,
        mpsc::{Sender, channel},
    },
    thread::JoinHandle,
};

/// Message sent over the channel created in `ChiralMap::with_config` to the
/// background writer thread.
pub(crate) enum FlushableOp<K, V> {
    /// Flush request. `ChiralMap::flush` sends `Some(sender)` and then blocks
    /// on the matching receiver, so the sender acts as an acknowledgement
    /// channel. NOTE(review): `None` is presumably used for flushes nobody
    /// waits on — confirm in `inner`.
    Flush(Option<Sender<()>>),
    /// A write operation (insert or delete) to be applied to the map.
    Op(Operation<K, V>),
}

impl<K, V> From<Operation<K, V>> for FlushableOp<K, V> {
    fn from(value: Operation<K, V>) -> Self {
        FlushableOp::Op(value)
    }
}

/// A single write operation queued for the background writer thread.
#[derive(Clone)]
pub(crate) enum Operation<K, V> {
    /// Key and value as passed to `ChiralMap::insert`.
    Insert(K, V),
    /// Key as passed to `ChiralMap::delete`.
    Delete(K),
}

/// Configuration for the [ChiralMap], optional.
///
/// Built through the derived builder, e.g. `ChiralMapConfig::new().build()`
/// yields a configuration with every default listed below. Pass the result
/// to [ChiralMap::with_config].
#[derive(Builder)]
pub struct ChiralMapConfig {
    /// Identifier of the map; only present with the `metrics` feature.
    /// Defaults to an empty string.
    /// NOTE(review): presumably used to label emitted metrics — confirm.
    #[cfg(feature = "metrics")]
    #[into]
    #[default(String::new())]
    pub id: String,
    /// Initial capacity handed to the inner map. Defaults to `0`.
    /// NOTE(review): consumed by `ChiralMapInner::new`, not visible here.
    #[default(0)]
    pub initial_capacity: usize,
    /// Number of queued writes that triggers an automatic flush.
    /// Defaults to `10000`.
    #[default(10000)]
    pub auto_flush_count: usize,
    /// Number of seconds after which an automatic flush is triggered.
    /// Defaults to `10`.
    #[default(10)]
    pub auto_flush_seconds: u64,
    /// Number of read handles kept in the pool used by [ChiralMap::get].
    /// Defaults to `1000`.
    #[default(1000)]
    pub reader_pool_size: usize,
}

/// A reader handle to access lock-free read operations on the map.
///
/// Acquired via [ChiralMap::read_handle].
///
/// Use [ChiralReader::get] to read a value from the map.
///
/// Internally this is a thin wrapper around a `left_right` read handle over
/// the map's inner store; each call to [ChiralMap::read_handle] hands out a
/// freshly cloned handle.
pub struct ChiralReader<K, V>(ReadHandle<SingleMap<K, V>>)
where
    K: Hash + Eq + Clone,
    V: Clone;

/// Read access through a dedicated handle, for callers that want to keep one
/// stable handle around instead of borrowing one from the pool on each read.
impl<K, V> ChiralReader<K, V>
where
    K: Hash + Eq + Clone,
    V: Clone,
{
    /// Looks up `k` and returns a clone of its value.
    ///
    /// Returns `None` when the key is not present, or when the underlying
    /// handle can no longer be entered.
    pub fn get(&self, k: &K) -> Option<V> {
        let guard = self.0.enter()?;
        guard.get(k).cloned()
    }
}

/// A map that has fast read access without lock (when using
/// [ChiralMap::read_handle]).
///
/// Backed by the [`left_right`](https://docs.rs/left-right) algorithm.
///
/// The internal map is an [`indexmap`](https://docs.rs/indexmap).
///
/// This map will spawn an internal thread when created, using [std::thread].
/// This thread will be aborted when the last copy of this instance is dropped.
///
/// In the thread, a channel is used to pipe in the write operations, while
/// reads normally go through a pool of readers or, if more performance is
/// needed, through a read handle lent out by [ChiralMap::read_handle].
///
/// # Meta structure
///
/// The chiral map has a pool of readers that is used when [ChiralMap::get]
/// is called. This method accesses the pool via locks and it is not the most
/// performant way to read values from the map.
/// The default size of the pool is 1000. See [ChiralMapConfig] and
/// [ChiralMap::with_config] to have a different size.
///
/// However, because it's holding a pool of readers, the chiral map is [Sync].
/// This allows the map to be shared across some frameworks, such as axum's
/// Extensions.
///
/// This map struct is actually only holding references to the internal
/// structures, which allows it to be [Clone]. It is encouraged to clone this
/// struct to have multiple accesses to the map itself so it can be passed
/// across threads, being also [Send].
///
/// For accessing reads lock-free, get a copy of a read handle first using
/// [ChiralMap::read_handle] instead.
///
/// For configuring the behavior of the map use the [ChiralMap::with_config]
/// and [ChiralMapConfig].
///
/// # Keys and Values
///
/// The types of keys and values have additional requirements on top of the
/// classic map traits. They also need to be Sync, Send and 'static. This is
/// needed for them to be passed across channels and handles by the internal
/// write thread.
///
/// # Eventual consistency
///
/// The map is eventually consistent, according to the behavior of the
/// [`left_right`](https://docs.rs/left-right) crate. A flush operation can be
/// issued directly with [ChiralMap::flush].
///
/// In addition to that, the map also has an auto flush feature. The auto flush
/// is executed when a number of queued writes is reached or when some amount
/// of seconds has elapsed, whichever comes first. This is configured in the
/// [ChiralMapConfig]. The default values are 10,000 operations and 10
/// seconds.
///
/// A call to flush when there are no pending write operations is a no-op.
/// The periodic flush of every 10 seconds is also a no-op when nothing is
/// being written to the map. This is ensured by the
/// [`left_right`](https://docs.rs/left-right) crate.
///
/// # Sync (pooled) reads
///
/// This is for when a handle cannot be kept reliably in the same local
/// thread. Usually this happens with web frameworks, where [ChiralMap::get]
/// is the best available solution for reading data.
/// The reads are still concurrent as long as there are available handles in
/// the pool.
///
/// ```
/// use chiralmap::ChiralMap;
///
/// let map = ChiralMap::<String, String>::default();
/// map.insert("A".into(), "B".into()).unwrap();
/// map.flush();
///
/// let v = map.get(&"A".to_string()).unwrap();
/// println!("{v}"); // "B"
/// ```
///
/// # Lock-free reads
///
/// When it is possible to keep a handle within a thread and not get one for
/// each read operation, then it is recommended to read from the map using
/// [ChiralMap::read_handle]. This clones and returns a new handle at every
/// call. It is not recommended to throw away a handle just for one read
/// operation, instead use [ChiralMap::get], which will pull from an internal
/// pool of handles.
///
/// ```
/// use chiralmap::ChiralMap;
///
/// let map = ChiralMap::<String, String>::default();
/// map.insert("A".into(), "B".into()).unwrap();
/// map.flush();
///
/// let h = map.read_handle();
/// let v = h.get(&"A".to_string()).unwrap();
/// println!("{v}"); // "B"
/// ```
#[derive(Clone)]
pub struct ChiralMap<K, V>
where
    K: Hash + Eq + Clone,
    V: Clone,
{
    // Sending half of the channel feeding write/flush messages to the
    // background writer thread.
    sender: Sender<FlushableOp<K, V>>,
    // Pool of read handles shared by all clones; used by `get` and as the
    // source for `read_handle`.
    readers: ReaderPool<SingleMap<K, V>>,
    // Join handle of the writer thread spawned in `with_config`, shared by
    // all clones. It is never joined in this file; dropping a `JoinHandle`
    // detaches the thread rather than stopping it.
    _handle: Arc<JoinHandle<()>>,
}

impl<K, V> ChiralMap<K, V>
where
    K: Hash + Eq + Clone,
    V: Clone,
{
    /// Creates a map using the given [`ChiralMapConfig`].
    ///
    /// Sets up the write channel, builds the reader pool with
    /// `config.reader_pool_size` handles and spawns the background thread
    /// that consumes the queued write operations.
    pub fn with_config(config: ChiralMapConfig) -> Self
    where
        K: Hash + Eq + Clone + Send + Sync + 'static,
        V: Clone + Send + Sync + 'static,
    {
        let (tx, rx) = channel::<FlushableOp<K, V>>();
        let inner = ChiralMapInner::<K, V>::new(&config, rx);
        let reader = inner.reader();
        let readers = ReaderPool::new(reader.clone(), config.reader_pool_size);

        // Creation of the receiver. The inner store needs a task to ingest the
        // write ops sent through the channel so they can be consumed by its
        // WriteHandle in the background.
        // This consumes the inner store.
        let handle = std::thread::spawn(|| inner.receiver_loop());
        Self {
            sender: tx,
            readers,
            _handle: Arc::new(handle),
        }
    }

    /// Queues an insertion of `v` under `k`.
    ///
    /// The write is only handed to the background thread here; it becomes
    /// visible to readers after a flush (see [`ChiralMap::flush`]).
    ///
    /// # Errors
    ///
    /// Returns `Err("failed to insert")` when the operation cannot be sent
    /// because the receiving end of the write channel is gone.
    pub fn insert(&self, k: K, v: V) -> Result<(), String>
    where
        K: Hash + Eq + Clone + Send + Sync + 'static,
        V: Clone + Send + Sync + 'static,
    {
        self.sender
            .send(FlushableOp::Op(Operation::Insert(k, v)))
            .map_err(|_| "failed to insert".to_string())
    }

    /// Queues a removal of the entry stored under `k`.
    ///
    /// The key is cloned so it can be moved to the background thread. As with
    /// [`ChiralMap::insert`], the change is visible to readers after a flush.
    ///
    /// # Errors
    ///
    /// Returns `Err("failed to delete")` when the operation cannot be sent
    /// because the receiving end of the write channel is gone.
    pub fn delete(&self, k: &K) -> Result<(), String>
    where
        K: Hash + Eq + Clone + Send + Sync + 'static,
        V: Clone + Send + Sync + 'static,
    {
        self.sender
            .send(FlushableOp::Op(Operation::Delete(k.clone())))
            // Previously reported "failed to insert", a copy-paste slip that
            // mislabelled delete failures.
            .map_err(|_| "failed to delete".to_string())
    }

    /// Reads the value stored under `k` using a handle from the reader pool.
    ///
    /// Waits until one of the pooled handles is free. Returns a clone of the
    /// value, or `None` when the key is absent or the handle can no longer be
    /// entered.
    pub fn get(&self, k: &K) -> Option<V> {
        self.readers.acquire_one().enter()?.get(k).cloned()
    }

    /// Requests a flush and blocks until it has been acknowledged.
    ///
    /// A one-shot acknowledgement channel is sent along with the request and
    /// this call waits on its receiving end. If the request cannot be sent
    /// (the writer side is gone) the call returns immediately; a dropped
    /// acknowledgement sender also ends the wait.
    pub fn flush(&self) {
        let (t, r) = channel();
        let Ok(_) = self.sender.send(FlushableOp::Flush(Some(t))) else {
            return;
        };
        // Either an acknowledgement or a disconnect ends the wait; both mean
        // there is nothing further to wait for.
        let _ = r.recv();
    }

    /// Returns a new, independent read handle for lock-free reads.
    ///
    /// A pooled handle is locked only for the duration of the clone.
    pub fn read_handle(&self) -> ChiralReader<K, V> {
        ChiralReader(self.readers.acquire_one().clone())
    }
}

/// Builds a map from a [`ChiralMapConfig`] in which every field keeps its
/// default value.
impl<K, V> Default for ChiralMap<K, V>
where
    K: Hash + Eq + Clone + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    fn default() -> Self {
        let config = ChiralMapConfig::new().build();
        Self::with_config(config)
    }
}

/// Fixed-size pool of read handles, each guarded by its own mutex.
///
/// The pool is shared between clones through an `Arc`, so cloning the pool
/// does not clone the handles.
#[derive(Clone)]
struct ReaderPool<T> {
    items: Arc<Vec<Mutex<ReadHandle<T>>>>,
}

impl<T> ReaderPool<T> {
    /// Creates a pool holding `max` clones of `r`.
    ///
    /// A requested size of `0` is raised to `1`: with no slots at all,
    /// `acquire_one` could never return.
    fn new(r: ReadHandle<T>, max: usize) -> Self {
        let size = max.max(1);
        let items: Vec<_> = (0..size).map(|_| Mutex::new(r.clone())).collect();
        Self {
            items: Arc::new(items),
        }
    }

    /// Acquires one of the mutexes from the pool.
    ///
    /// Scans the slots and returns the first one that can be locked without
    /// blocking. When every slot is busy the scan is retried after yielding
    /// the current thread's time slice.
    fn acquire_one(&self) -> MutexGuard<'_, ReadHandle<T>> {
        // These are a vec of mutexes so it's not like we can wait on a
        // specific one, the next one becoming available could be any of them.
        loop {
            for slot in self.items.iter() {
                match slot.try_lock() {
                    Ok(guard) => return guard,
                    // A reader panicked while holding this slot. The lock was
                    // still acquired, so take the guard instead of treating
                    // the slot as busy forever.
                    // NOTE(review): assumes a handle stays usable after a
                    // panic in the code that was reading through it.
                    Err(std::sync::TryLockError::Poisoned(poisoned)) => {
                        return poisoned.into_inner();
                    }
                    Err(std::sync::TryLockError::WouldBlock) => {}
                }
            }
            // Every slot is currently held: let other threads run rather
            // than burning a core in a tight loop.
            std::thread::yield_now();
        }
    }
}