fluvio_stream_model/store/
concurrent_hashmap.rs

1use std::collections::HashMap;
2use std::collections::BTreeMap;
3use std::hash::Hash;
4use async_lock::RwLock;
5use async_lock::RwLockReadGuard;
6use async_lock::RwLockWriteGuard;
7
8/// inefficient but simple concurrent hashmap
9/// this should be only used in a test
10/// it locks for every write
11pub struct SimpleConcurrentHashMap<K, V>(RwLock<HashMap<K, V>>);
12
13impl<K, V> SimpleConcurrentHashMap<K, V>
14where
15    K: Eq + Hash,
16{
17    pub fn new() -> Self {
18        SimpleConcurrentHashMap(RwLock::new(HashMap::new()))
19    }
20
21    pub async fn insert(&self, key: K, value: V) -> Option<V> {
22        let mut lock = self.write().await;
23        lock.insert(key, value)
24    }
25
26    pub async fn read(&'_ self) -> RwLockReadGuard<'_, HashMap<K, V>> {
27        self.0.read().await
28    }
29
30    pub async fn write(&'_ self) -> RwLockWriteGuard<'_, HashMap<K, V>> {
31        self.0.write().await
32    }
33
34    pub async fn contains_key(&self, key: &K) -> bool {
35        self.read().await.contains_key(key)
36    }
37}
38
39impl<K, V> Default for SimpleConcurrentHashMap<K, V>
40where
41    K: Eq + Hash,
42{
43    fn default() -> Self {
44        Self::new()
45    }
46}
47
48#[derive(Debug)]
49pub struct SimpleConcurrentBTreeMap<K, V>(RwLock<BTreeMap<K, V>>);
50
51impl<K, V> Default for SimpleConcurrentBTreeMap<K, V>
52where
53    K: Ord,
54{
55    fn default() -> Self {
56        SimpleConcurrentBTreeMap(RwLock::new(BTreeMap::new()))
57    }
58}
59
60impl<K, V> SimpleConcurrentBTreeMap<K, V>
61where
62    K: Ord,
63{
64    pub fn new() -> Self {
65        Self(RwLock::new(BTreeMap::new()))
66    }
67
68    pub fn new_with_map(map: BTreeMap<K, V>) -> Self {
69        Self(RwLock::new(map))
70    }
71
72    pub async fn read(&'_ self) -> RwLockReadGuard<'_, BTreeMap<K, V>> {
73        self.0.read().await
74    }
75
76    pub async fn write(&'_ self) -> RwLockWriteGuard<'_, BTreeMap<K, V>> {
77        self.0.write().await
78    }
79
80    pub fn try_write(&self) -> Option<RwLockWriteGuard<BTreeMap<K, V>>> {
81        self.0.try_write()
82    }
83
84    pub async fn insert(&self, key: K, value: V) -> Option<V> {
85        let mut lock = self.write().await;
86        lock.insert(key, value)
87    }
88
89    pub async fn contains_key(&self, key: &K) -> bool {
90        self.read().await.contains_key(key)
91    }
92}