pub(crate) mod inner;
use builder_pattern::Builder;
use inner::{ChiralMapInner, single_map::SingleMap};
use left_right::ReadHandle;
use std::{
hash::Hash,
sync::{
Arc, Mutex, MutexGuard,
mpsc::{Sender, channel},
},
thread::JoinHandle,
};
/// A message sent over the channel that `ChiralMap` feeds.
///
/// It is either a map mutation or a request to flush.
pub(crate) enum FlushableOp<K, V> {
    /// A flush request.
    ///
    /// `ChiralMap::flush` passes `Some(sender)` and then blocks on the paired
    /// receiver. NOTE(review): presumably the receiver loop signals or drops
    /// this sender once the flush is done — confirm in `inner`.
    Flush(Option<Sender<()>>),
    /// A mutation to apply to the map.
    Op(Operation<K, V>),
}
impl<K, V> From<Operation<K, V>> for FlushableOp<K, V> {
fn from(value: Operation<K, V>) -> Self {
FlushableOp::Op(value)
}
}
/// A single mutation of the map, carried as data.
#[derive(Clone)]
pub(crate) enum Operation<K, V> {
    /// Store the value under the key.
    Insert(K, V),
    /// Remove the entry stored under the key.
    Delete(K),
}
/// Tunables for constructing a `ChiralMap`.
///
/// Build one with the generated builder: `ChiralMapConfig::new()`, optional
/// setters, then `.build()`. Every field has a default.
#[derive(Builder)]
pub struct ChiralMapConfig {
    /// Identifier for this map; only present when the `metrics` feature is on.
    /// Defaults to an empty string.
    #[cfg(feature = "metrics")]
    #[into]
    #[default(String::new())]
    pub id: String,

    /// Defaults to `0`.
    /// NOTE(review): presumably the initial capacity of the underlying map —
    /// confirm in `inner`, which consumes this config.
    #[default(0)]
    pub initial_capacity: usize,

    /// Defaults to `10000`.
    /// NOTE(review): presumably the number of queued operations that triggers
    /// an automatic flush — confirm in `inner`.
    #[default(10000)]
    pub auto_flush_count: usize,

    /// Defaults to `10`.
    /// NOTE(review): presumably the interval, in seconds, between automatic
    /// flushes — confirm in `inner`.
    #[default(10)]
    pub auto_flush_seconds: u64,

    /// Number of read handles placed in the map's reader pool.
    /// Defaults to `1000`.
    #[default(1000)]
    pub reader_pool_size: usize,
}
/// A standalone reader over the map's contents.
///
/// It is a thin wrapper around a `left_right` read handle, obtained from
/// `ChiralMap::read_handle`.
pub struct ChiralReader<K: Hash + Eq + Clone, V: Clone>(ReadHandle<SingleMap<K, V>>);
impl<K: Hash + Eq + Clone, V: Clone> ChiralReader<K, V> {
    /// Returns a clone of the value stored under `k`.
    ///
    /// Yields `None` when the read handle cannot be entered or when the key
    /// is absent.
    pub fn get(&self, k: &K) -> Option<V> {
        let guard = self.0.enter()?;
        guard.get(k).cloned()
    }
}
/// Handle to a map whose writes are sent over a channel to a background
/// thread and whose reads go through a pool of read handles.
///
/// Cloning is derived; the join handle is behind an [`Arc`], so clones share
/// it.
#[derive(Clone)]
pub struct ChiralMap<K: Hash + Eq + Clone, V: Clone> {
    /// Sending half of the operation channel created in `with_config`.
    sender: Sender<FlushableOp<K, V>>,
    /// Read handles used by `get` and `read_handle`.
    readers: ReaderPool<SingleMap<K, V>>,
    /// Join handle of the thread spawned in `with_config`; stored but never
    /// joined in this file.
    _handle: Arc<JoinHandle<()>>,
}
impl<K, V> ChiralMap<K, V>
where
    K: Hash + Eq + Clone,
    V: Clone,
{
    /// Creates a map configured by `config`.
    ///
    /// Opens the operation channel, builds the inner state around the
    /// receiving half, fills the reader pool with `config.reader_pool_size`
    /// handles, and spawns the thread that runs the inner receiver loop.
    pub fn with_config(config: ChiralMapConfig) -> Self
    where
        K: Hash + Eq + Clone + Send + Sync + 'static,
        V: Clone + Send + Sync + 'static,
    {
        let (tx, rx) = channel::<FlushableOp<K, V>>();
        let inner = ChiralMapInner::<K, V>::new(&config, rx);
        let reader = inner.reader();
        let readers = ReaderPool::new(reader.clone(), config.reader_pool_size);
        let handle = std::thread::spawn(|| inner.receiver_loop());
        Self {
            sender: tx,
            readers,
            _handle: Arc::new(handle),
        }
    }

    /// Queues an insertion of `v` under `k`.
    ///
    /// The operation is only sent on the channel here; applying it is the
    /// receiver thread's job.
    ///
    /// # Errors
    ///
    /// Returns `Err("failed to insert")` when the operation cannot be sent on
    /// the channel.
    pub fn insert(&self, k: K, v: V) -> Result<(), String>
    where
        K: Hash + Eq + Clone + Send + Sync + 'static,
        V: Clone + Send + Sync + 'static,
    {
        self.sender
            .send(FlushableOp::Op(Operation::Insert(k, v)))
            .map_err(|_| "failed to insert".to_string())
    }

    /// Queues the removal of the entry stored under `k`.
    ///
    /// The key is cloned because the operation is moved onto the channel.
    ///
    /// # Errors
    ///
    /// Returns `Err("failed to delete")` when the operation cannot be sent on
    /// the channel.
    pub fn delete(&self, k: &K) -> Result<(), String>
    where
        K: Hash + Eq + Clone + Send + Sync + 'static,
        V: Clone + Send + Sync + 'static,
    {
        self.sender
            .send(FlushableOp::Op(Operation::Delete(k.clone())))
            // The message names this operation, so callers can tell a failed
            // delete apart from a failed insert.
            .map_err(|_| "failed to delete".to_string())
    }

    /// Returns a clone of the value stored under `k`, read through a pooled
    /// handle.
    ///
    /// Yields `None` when the handle cannot be entered or when the key is
    /// absent. The pooled handle stays locked only for this call.
    pub fn get(&self, k: &K) -> Option<V> {
        self.readers.acquire_one().enter()?.get(k).cloned()
    }

    /// Sends a flush request and blocks until it is acknowledged.
    ///
    /// Returns without waiting if the request cannot be sent. The wait also
    /// ends if the acknowledgement sender is dropped without a reply.
    pub fn flush(&self) {
        let (ack_tx, ack_rx) = channel();
        if self.sender.send(FlushableOp::Flush(Some(ack_tx))).is_err() {
            return;
        }
        // Either outcome (a reply or a disconnect) ends the wait.
        let _ = ack_rx.recv();
    }

    /// Returns an independent reader, cloned from one of the pooled handles.
    pub fn read_handle(&self) -> ChiralReader<K, V> {
        ChiralReader(self.readers.acquire_one().clone())
    }
}
/// Builds a map from a configuration in which every field keeps its default.
impl<K, V> Default for ChiralMap<K, V>
where
    K: Hash + Eq + Clone + Send + Sync + 'static,
    V: Clone + Send + Sync + 'static,
{
    fn default() -> Self {
        let config = ChiralMapConfig::new().build();
        Self::with_config(config)
    }
}
/// A fixed collection of read handles, each guarded by its own [`Mutex`].
///
/// The collection sits behind an [`Arc`], so the derived `Clone` shares the
/// same handles instead of duplicating them.
#[derive(Clone)]
struct ReaderPool<T> {
    /// The pooled handles, one lock per handle.
    items: Arc<Vec<Mutex<ReadHandle<T>>>>,
}
impl<T> ReaderPool<T> {
    /// Builds a pool of clones of `r`, each behind its own mutex.
    ///
    /// `max` is the requested number of handles. A request for zero is raised
    /// to one, because an empty pool would leave `acquire_one` with nothing
    /// to return and it would loop forever.
    fn new(r: ReadHandle<T>, max: usize) -> Self {
        let size = max.max(1);
        let items: Vec<_> = (0..size).map(|_| Mutex::new(r.clone())).collect();
        Self {
            items: Arc::new(items),
        }
    }

    /// Returns a guard for the first handle whose lock can be taken.
    ///
    /// Slots are scanned in order. If every slot is currently held by another
    /// thread, the calling thread yields and the scan starts over. The handle
    /// stays reserved until the returned guard is dropped.
    fn acquire_one(&self) -> MutexGuard<'_, ReadHandle<T>> {
        loop {
            for slot in self.items.iter() {
                match slot.try_lock() {
                    Ok(guard) => return guard,
                    // Poisoning only records that an earlier holder panicked;
                    // the lock itself is free. Skipping such slots would
                    // shrink the pool after every panic and hang once all
                    // slots are poisoned, so the handle is reused instead.
                    // Within this file the guard is only used for `enter`
                    // and `clone`.
                    Err(std::sync::TryLockError::Poisoned(poisoned)) => {
                        return poisoned.into_inner();
                    }
                    Err(std::sync::TryLockError::WouldBlock) => {}
                }
            }
            // Every handle is busy: let other threads run instead of spinning.
            std::thread::yield_now();
        }
    }
}