lever/table/
lotable.rs

1use crate::sync::atomics::AtomicBox;
2use crate::txn::prelude::*;
3
4use std::collections::hash_map::{Iter, Keys, RandomState};
5use std::collections::HashMap;
6
7use anyhow::Result;
8use std::collections::hash_map;
9use std::fmt;
10use std::hash::Hash;
11use std::hash::{BuildHasher, Hasher};
12use std::ptr::NonNull;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15
16const DEFAULT_CAP: usize = 1024;
17
18#[derive(Clone)]
19///
20/// Lever Transactional Table implementation with [Optimistic](TransactionConcurrency::Optimistic)
21/// concurrency and [RepeatableRead](TransactionIsolation::RepeatableRead) isolation.
22///
23/// Transactional hash table fully concurrent and as long as no conflicts are made
24/// it is both lock and wait free.
25pub struct LOTable<K, V, S = RandomState>
26where
27    K: 'static + PartialEq + Eq + Hash + Clone + Send + Sync,
28    V: 'static + Clone + Send + Sync,
29    S: BuildHasher,
30{
31    latch: Vec<TVar<Arc<AtomicBox<Container<K, V>>>>>,
32    txn_man: Arc<TxnManager>,
33    txn: Arc<Txn>,
34    hash_builder: S,
35}
36
37impl<K, V> LOTable<K, V, RandomState>
38where
39    K: PartialEq + Eq + Hash + Clone + Send + Sync,
40    V: Clone + Send + Sync,
41{
42    pub fn new() -> Self {
43        Self::with_capacity(DEFAULT_CAP)
44    }
45
46    pub fn with_capacity(cap: usize) -> Self {
47        Self::with_capacity_and_hasher(cap, RandomState::new())
48    }
49}
50
51impl<K, V, S> LOTable<K, V, S>
52where
53    K: PartialEq + Eq + Hash + Clone + Send + Sync,
54    V: Clone + Send + Sync,
55    S: BuildHasher,
56{
57    fn with_capacity_and_hasher(cap: usize, hasher: S) -> LOTable<K, V, S> {
58        let txn_man = Arc::new(TxnManager {
59            txid: Arc::new(AtomicU64::new(GLOBAL_VCLOCK.load(Ordering::SeqCst))),
60        });
61
62        let txn: Arc<Txn> = Arc::new(txn_man.txn_build(
63            TransactionConcurrency::Optimistic,
64            TransactionIsolation::RepeatableRead,
65            100_usize,
66            1_usize,
67            "default".into(),
68        ));
69
70        Self {
71            latch: vec![TVar::new(Arc::new(AtomicBox::new(Container(HashMap::default())))); cap],
72            txn_man,
73            txn,
74            hash_builder: hasher,
75        }
76    }
77
78    #[inline]
79    pub fn insert(&self, k: K, v: V) -> Result<Arc<Option<V>>> {
80        let tvar = self.seek_tvar(&k);
81
82        let container = self.txn.begin(|t| t.read(&tvar))?;
83
84        let previous: Arc<AtomicBox<Option<V>>> = Arc::new(AtomicBox::new(None));
85        container.replace_with(|r| {
86            let mut entries = r.0.clone();
87            let p = entries.insert(k.clone(), v.clone());
88            previous.replace_with(|_| p.clone());
89            Container(entries)
90        });
91
92        previous.extract()
93    }
94
95    #[inline]
96    pub fn remove(&self, k: &K) -> Result<Arc<Option<V>>> {
97        let tvar = self.seek_tvar(&k);
98
99        let container = self.txn.begin(|t| t.read(&tvar))?;
100
101        let previous: Arc<AtomicBox<Option<V>>> = Arc::new(AtomicBox::new(None));
102        container.replace_with(|r| {
103            let mut c = r.0.clone();
104            let p = c.remove(k);
105            previous.replace_with(|_| p.clone());
106            Container(c)
107        });
108
109        previous.extract()
110    }
111
112    #[inline]
113    pub fn get(&self, k: &K) -> Option<V> {
114        let tvar = self.seek_tvar(k);
115
116        self.txn
117            .begin(|t| {
118                let container = t.read(&tvar);
119                let entries = container.get();
120                entries.0.get(k).cloned()
121            })
122            .unwrap_or(None)
123    }
124
125    #[inline]
126    pub fn replace_with<F>(&self, k: &K, f: F) -> Option<V>
127    where
128        F: Fn(Option<&V>) -> Option<V>,
129    {
130        let tvar = self.seek_tvar(k);
131
132        self.txn
133            .begin(|t| {
134                let container = t.read(&tvar);
135                let entries = container.get();
136                f(entries.0.get(k))
137            })
138            .unwrap_or(None)
139    }
140
141    #[inline]
142    pub fn replace_with_mut<F>(&self, k: &K, mut f: F) -> Option<V>
143    where
144        F: FnMut(&mut Option<V>) -> &mut Option<V>,
145    {
146        let tvar = self.seek_tvar(k);
147
148        self.txn
149            .begin(|t| {
150                let container = t.read(&tvar);
151                let entries = container.get();
152                let mut mv = entries.0.get(k).cloned();
153                f(&mut mv).clone()
154            })
155            .unwrap_or(None)
156    }
157
158    #[inline]
159    pub fn contains_key(&self, k: &K) -> bool {
160        let tvar = self.seek_tvar(&k);
161
162        self.txn
163            .begin(|t| {
164                let container = t.read(&tvar);
165                container.get().0.contains_key(k)
166            })
167            .unwrap_or(false)
168    }
169
170    #[inline]
171    pub fn len(&self) -> usize {
172        self.latch
173            .first()
174            .map(move |b| {
175                self.txn
176                    .begin(|t| {
177                        let container = t.read(&b);
178                        container.get().0.len()
179                    })
180                    .unwrap_or(0_usize)
181            })
182            .unwrap_or(0_usize)
183    }
184
185    #[inline]
186    pub fn iter(&self) -> LOIter<K, V> {
187        LOIter {
188            idx: 0,
189            inner: None,
190            reader: HashMap::default(),
191            current_frame: 0,
192            latch_snapshot: self.latch.clone(),
193            txn: self.txn.clone(),
194        }
195    }
196
197    #[inline]
198    pub fn clear(&self) {
199        self.latch.iter().for_each(move |b| {
200            let _ = self.txn.begin(|t| {
201                let container = t.read(&b);
202                container.replace_with(|_r| Container(HashMap::default()));
203            });
204        });
205        // TODO: (vcq): Shrink to fit as a optimized table.
206        // self.latch.shrink_to_fit();
207    }
208
209    pub fn keys<'table>(&'table self) -> impl Iterator<Item = K> + 'table {
210        let buckets: Vec<K> = self
211            .latch
212            .first()
213            .iter()
214            .flat_map(move |b| {
215                self.txn
216                    .begin(|t| {
217                        let container = t.read(&b);
218                        container
219                            .get()
220                            .0
221                            .keys()
222                            .into_iter()
223                            .map(Clone::clone)
224                            .collect::<Vec<K>>()
225                    })
226                    .unwrap_or(vec![])
227            })
228            .collect();
229
230        buckets.into_iter()
231    }
232
233    pub fn values<'table>(&'table self) -> impl Iterator<Item = V> + 'table {
234        let buckets: Vec<V> = self
235            .latch
236            .first()
237            .iter()
238            .flat_map(move |b| {
239                self.txn
240                    .begin(|t| {
241                        let container = t.read(&b);
242                        container
243                            .get()
244                            .0
245                            .values()
246                            .into_iter()
247                            .map(Clone::clone)
248                            .collect::<Vec<V>>()
249                    })
250                    .unwrap_or(vec![])
251            })
252            .collect();
253
254        buckets.into_iter()
255    }
256
257    fn hash(&self, key: &K) -> usize {
258        let mut hasher = self.hash_builder.build_hasher();
259        key.hash(&mut hasher);
260        hasher.finish() as usize % self.latch.len()
261    }
262
263    fn seek_tvar(&self, key: &K) -> TVar<Arc<AtomicBox<Container<K, V>>>> {
264        self.latch[self.hash(key)].clone()
265    }
266
267    fn fetch_frame(&self, frame_id: usize) -> hash_map::HashMap<K, V> {
268        let frame_tvar = self.latch[frame_id].clone();
269        match self.txn.begin(|t| t.read(&frame_tvar)) {
270            Ok(init_frame) => init_frame.get().0.clone(),
271            Err(_) => HashMap::new(),
272        }
273    }
274
275    ////////////////////////////////////////////////////////////////////////////////
276    ////////// Transactional Area
277    ////////////////////////////////////////////////////////////////////////////////
278
279    pub fn tx_manager(&self) -> Arc<TxnManager> {
280        self.txn_man.clone()
281    }
282}
283
284#[derive(Clone)]
285struct Container<K, V>(HashMap<K, V>)
286where
287    K: PartialEq + Hash + Clone + Send + Sync,
288    V: Clone + Send + Sync;
289
290impl<K, V, S> Default for LOTable<K, V, S>
291where
292    K: 'static + PartialEq + Eq + Hash + Clone + Send + Sync,
293    V: 'static + Clone + Send + Sync,
294    S: Default + BuildHasher,
295{
296    /// Creates an empty `LOTable<K, V, S>`, with the `Default` value for the hasher.
297    #[inline]
298    fn default() -> LOTable<K, V, S> {
299        LOTable::with_capacity_and_hasher(128, Default::default())
300    }
301}
302
303impl<K, V, S> fmt::Debug for LOTable<K, V, S>
304where
305    K: 'static + PartialEq + Eq + Hash + Clone + Send + Sync + fmt::Debug,
306    V: 'static + Clone + Send + Sync + fmt::Debug,
307    S: std::hash::BuildHasher,
308{
309    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
310        f.debug_map().entries(self.iter()).finish()
311    }
312}
313
314pub struct LOIter<'it, K, V>
315where
316    K: 'static + PartialEq + Eq + Hash + Clone + Send + Sync,
317    V: 'static + Clone + Send + Sync,
318{
319    idx: usize,
320    inner: Option<hash_map::Iter<'it, K, V>>,
321    reader: HashMap<K, V>,
322    current_frame: usize,
323    latch_snapshot: Vec<TVar<Arc<AtomicBox<Container<K, V>>>>>,
324    txn: Arc<Txn>,
325}
326
327impl<'it, K, V> Iterator for LOIter<'it, K, V>
328where
329    K: 'static + PartialEq + Eq + Hash + Clone + Send + Sync,
330    V: 'static + Clone + Send + Sync,
331{
332    type Item = (K, V);
333
334    #[inline(always)]
335    fn next(&mut self) -> Option<Self::Item> {
336        if self.idx == 0 {
337            let tvar = &self.latch_snapshot[self.current_frame];
338            if let Ok(read) = self.txn.begin(|t| {
339                let frame = t.read(&tvar);
340                frame.get().0.clone()
341            }) {
342                self.reader = read;
343                self.inner = Some(unsafe { std::mem::transmute(self.reader.iter()) });
344            }
345        }
346
347        let read_iter = self.inner.as_mut().unwrap();
348        if let Some(x) = read_iter.next() {
349            self.idx += 1;
350            self.inner = Some(read_iter.clone());
351            Some((x.0.clone(), x.1.clone()))
352        } else {
353            if self.idx == self.reader.len() {
354                self.current_frame += 1;
355                self.idx = 0;
356            }
357            None
358        }
359    }
360
361    #[inline(always)]
362    fn size_hint(&self) -> (usize, Option<usize>) {
363        let tvar = &self.latch_snapshot[self.current_frame];
364        if let Ok(frame_len) = self.txn.begin(|t| t.read(&tvar)) {
365            // TODO: (frame_len, Some(max_bound)) is possible.
366            // Written like this to not overshoot the alloc
367            (frame_len.get().0.len(), None)
368        } else {
369            (0, None)
370        }
371    }
372}
373
374#[cfg(test)]
375mod lotable_tests {
376    use super::LOTable;
377
378    #[test]
379    fn iter_generator() {
380        let lotable: LOTable<String, u64> = LOTable::new();
381        lotable.insert("Saudade0".to_string(), 123123);
382        lotable.insert("Saudade0".to_string(), 123);
383        lotable.insert("Saudade1".to_string(), 123123);
384        lotable.insert("Saudade2".to_string(), 123123);
385        lotable.insert("Saudade3".to_string(), 123123);
386        lotable.insert("Saudade4".to_string(), 123123);
387        lotable.insert("Saudade5".to_string(), 123123);
388
389        lotable.insert("123123".to_string(), 123123);
390        lotable.insert("1231231".to_string(), 123123);
391        lotable.insert("1231232".to_string(), 123123);
392        lotable.insert("1231233".to_string(), 123123);
393        lotable.insert("1231234".to_string(), 123123);
394        lotable.insert("1231235".to_string(), 123123);
395
396        let res: Vec<(String, u64)> = lotable.iter().collect();
397        assert_eq!(res.len(), 12);
398
399        assert_eq!(lotable.get(&"Saudade0".to_string()), Some(123));
400    }
401
402    #[test]
403    fn values_iter_generator() {
404        let lotable: LOTable<String, u64> = LOTable::new();
405
406        (0..100).into_iter().for_each(|_i| {
407            lotable.insert("Saudade0".to_string(), 123123);
408            lotable.insert("Saudade0".to_string(), 123);
409            lotable.insert("Saudade1".to_string(), 123123);
410            lotable.insert("Saudade2".to_string(), 123123);
411            lotable.insert("Saudade3".to_string(), 123123);
412            lotable.insert("Saudade4".to_string(), 123123);
413            lotable.insert("Saudade5".to_string(), 123123);
414
415            lotable.insert("123123".to_string(), 123123);
416            lotable.insert("1231231".to_string(), 123123);
417            lotable.insert("1231232".to_string(), 123123);
418            lotable.insert("1231233".to_string(), 123123);
419            lotable.insert("1231234".to_string(), 123123);
420            lotable.insert("1231235".to_string(), 123123);
421
422            let res: Vec<u64> = lotable.values().into_iter().collect();
423            // dbg!(&res);
424            assert_eq!(res.len(), 12);
425        });
426
427        lotable.clear();
428        let res: Vec<u64> = lotable.values().into_iter().collect();
429        assert_eq!(res.len(), 0);
430
431        (0..1_000).into_iter().for_each(|i| {
432            lotable.insert(format!("{}", i), i as u64);
433
434            let resvals: Vec<u64> = lotable.values().into_iter().collect();
435            // dbg!(&resvals);
436            assert_eq!(resvals.len(), i + 1);
437        });
438
439        lotable.clear();
440        let res: Vec<u64> = lotable.values().into_iter().collect();
441        assert_eq!(res.len(), 0);
442
443        (0..1_000).into_iter().for_each(|i| {
444            lotable.insert(format!("{}", i), i as u64);
445
446            let reskeys: Vec<String> = lotable.keys().into_iter().collect();
447            // dbg!(&reskeys);
448            assert_eq!(reskeys.len(), i + 1);
449        });
450    }
451}