1use crate::sync::atomics::AtomicBox;
2use crate::txn::prelude::*;
3
4use std::collections::hash_map::{Iter, Keys, RandomState};
5use std::collections::HashMap;
6
7use anyhow::Result;
8use std::collections::hash_map;
9use std::fmt;
10use std::hash::Hash;
11use std::hash::{BuildHasher, Hasher};
12use std::ptr::NonNull;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15
16const DEFAULT_CAP: usize = 1024;
17
18#[derive(Clone)]
19pub struct LOTable<K, V, S = RandomState>
26where
27 K: 'static + PartialEq + Eq + Hash + Clone + Send + Sync,
28 V: 'static + Clone + Send + Sync,
29 S: BuildHasher,
30{
31 latch: Vec<TVar<Arc<AtomicBox<Container<K, V>>>>>,
32 txn_man: Arc<TxnManager>,
33 txn: Arc<Txn>,
34 hash_builder: S,
35}
36
37impl<K, V> LOTable<K, V, RandomState>
38where
39 K: PartialEq + Eq + Hash + Clone + Send + Sync,
40 V: Clone + Send + Sync,
41{
42 pub fn new() -> Self {
43 Self::with_capacity(DEFAULT_CAP)
44 }
45
46 pub fn with_capacity(cap: usize) -> Self {
47 Self::with_capacity_and_hasher(cap, RandomState::new())
48 }
49}
50
51impl<K, V, S> LOTable<K, V, S>
52where
53 K: PartialEq + Eq + Hash + Clone + Send + Sync,
54 V: Clone + Send + Sync,
55 S: BuildHasher,
56{
57 fn with_capacity_and_hasher(cap: usize, hasher: S) -> LOTable<K, V, S> {
58 let txn_man = Arc::new(TxnManager {
59 txid: Arc::new(AtomicU64::new(GLOBAL_VCLOCK.load(Ordering::SeqCst))),
60 });
61
62 let txn: Arc<Txn> = Arc::new(txn_man.txn_build(
63 TransactionConcurrency::Optimistic,
64 TransactionIsolation::RepeatableRead,
65 100_usize,
66 1_usize,
67 "default".into(),
68 ));
69
70 Self {
71 latch: vec![TVar::new(Arc::new(AtomicBox::new(Container(HashMap::default())))); cap],
72 txn_man,
73 txn,
74 hash_builder: hasher,
75 }
76 }
77
78 #[inline]
79 pub fn insert(&self, k: K, v: V) -> Result<Arc<Option<V>>> {
80 let tvar = self.seek_tvar(&k);
81
82 let container = self.txn.begin(|t| t.read(&tvar))?;
83
84 let previous: Arc<AtomicBox<Option<V>>> = Arc::new(AtomicBox::new(None));
85 container.replace_with(|r| {
86 let mut entries = r.0.clone();
87 let p = entries.insert(k.clone(), v.clone());
88 previous.replace_with(|_| p.clone());
89 Container(entries)
90 });
91
92 previous.extract()
93 }
94
95 #[inline]
96 pub fn remove(&self, k: &K) -> Result<Arc<Option<V>>> {
97 let tvar = self.seek_tvar(&k);
98
99 let container = self.txn.begin(|t| t.read(&tvar))?;
100
101 let previous: Arc<AtomicBox<Option<V>>> = Arc::new(AtomicBox::new(None));
102 container.replace_with(|r| {
103 let mut c = r.0.clone();
104 let p = c.remove(k);
105 previous.replace_with(|_| p.clone());
106 Container(c)
107 });
108
109 previous.extract()
110 }
111
112 #[inline]
113 pub fn get(&self, k: &K) -> Option<V> {
114 let tvar = self.seek_tvar(k);
115
116 self.txn
117 .begin(|t| {
118 let container = t.read(&tvar);
119 let entries = container.get();
120 entries.0.get(k).cloned()
121 })
122 .unwrap_or(None)
123 }
124
125 #[inline]
126 pub fn replace_with<F>(&self, k: &K, f: F) -> Option<V>
127 where
128 F: Fn(Option<&V>) -> Option<V>,
129 {
130 let tvar = self.seek_tvar(k);
131
132 self.txn
133 .begin(|t| {
134 let container = t.read(&tvar);
135 let entries = container.get();
136 f(entries.0.get(k))
137 })
138 .unwrap_or(None)
139 }
140
141 #[inline]
142 pub fn replace_with_mut<F>(&self, k: &K, mut f: F) -> Option<V>
143 where
144 F: FnMut(&mut Option<V>) -> &mut Option<V>,
145 {
146 let tvar = self.seek_tvar(k);
147
148 self.txn
149 .begin(|t| {
150 let container = t.read(&tvar);
151 let entries = container.get();
152 let mut mv = entries.0.get(k).cloned();
153 f(&mut mv).clone()
154 })
155 .unwrap_or(None)
156 }
157
158 #[inline]
159 pub fn contains_key(&self, k: &K) -> bool {
160 let tvar = self.seek_tvar(&k);
161
162 self.txn
163 .begin(|t| {
164 let container = t.read(&tvar);
165 container.get().0.contains_key(k)
166 })
167 .unwrap_or(false)
168 }
169
170 #[inline]
171 pub fn len(&self) -> usize {
172 self.latch
173 .first()
174 .map(move |b| {
175 self.txn
176 .begin(|t| {
177 let container = t.read(&b);
178 container.get().0.len()
179 })
180 .unwrap_or(0_usize)
181 })
182 .unwrap_or(0_usize)
183 }
184
185 #[inline]
186 pub fn iter(&self) -> LOIter<K, V> {
187 LOIter {
188 idx: 0,
189 inner: None,
190 reader: HashMap::default(),
191 current_frame: 0,
192 latch_snapshot: self.latch.clone(),
193 txn: self.txn.clone(),
194 }
195 }
196
197 #[inline]
198 pub fn clear(&self) {
199 self.latch.iter().for_each(move |b| {
200 let _ = self.txn.begin(|t| {
201 let container = t.read(&b);
202 container.replace_with(|_r| Container(HashMap::default()));
203 });
204 });
205 }
208
209 pub fn keys<'table>(&'table self) -> impl Iterator<Item = K> + 'table {
210 let buckets: Vec<K> = self
211 .latch
212 .first()
213 .iter()
214 .flat_map(move |b| {
215 self.txn
216 .begin(|t| {
217 let container = t.read(&b);
218 container
219 .get()
220 .0
221 .keys()
222 .into_iter()
223 .map(Clone::clone)
224 .collect::<Vec<K>>()
225 })
226 .unwrap_or(vec![])
227 })
228 .collect();
229
230 buckets.into_iter()
231 }
232
233 pub fn values<'table>(&'table self) -> impl Iterator<Item = V> + 'table {
234 let buckets: Vec<V> = self
235 .latch
236 .first()
237 .iter()
238 .flat_map(move |b| {
239 self.txn
240 .begin(|t| {
241 let container = t.read(&b);
242 container
243 .get()
244 .0
245 .values()
246 .into_iter()
247 .map(Clone::clone)
248 .collect::<Vec<V>>()
249 })
250 .unwrap_or(vec![])
251 })
252 .collect();
253
254 buckets.into_iter()
255 }
256
257 fn hash(&self, key: &K) -> usize {
258 let mut hasher = self.hash_builder.build_hasher();
259 key.hash(&mut hasher);
260 hasher.finish() as usize % self.latch.len()
261 }
262
263 fn seek_tvar(&self, key: &K) -> TVar<Arc<AtomicBox<Container<K, V>>>> {
264 self.latch[self.hash(key)].clone()
265 }
266
267 fn fetch_frame(&self, frame_id: usize) -> hash_map::HashMap<K, V> {
268 let frame_tvar = self.latch[frame_id].clone();
269 match self.txn.begin(|t| t.read(&frame_tvar)) {
270 Ok(init_frame) => init_frame.get().0.clone(),
271 Err(_) => HashMap::new(),
272 }
273 }
274
275 pub fn tx_manager(&self) -> Arc<TxnManager> {
280 self.txn_man.clone()
281 }
282}
283
284#[derive(Clone)]
285struct Container<K, V>(HashMap<K, V>)
286where
287 K: PartialEq + Hash + Clone + Send + Sync,
288 V: Clone + Send + Sync;
289
290impl<K, V, S> Default for LOTable<K, V, S>
291where
292 K: 'static + PartialEq + Eq + Hash + Clone + Send + Sync,
293 V: 'static + Clone + Send + Sync,
294 S: Default + BuildHasher,
295{
296 #[inline]
298 fn default() -> LOTable<K, V, S> {
299 LOTable::with_capacity_and_hasher(128, Default::default())
300 }
301}
302
303impl<K, V, S> fmt::Debug for LOTable<K, V, S>
304where
305 K: 'static + PartialEq + Eq + Hash + Clone + Send + Sync + fmt::Debug,
306 V: 'static + Clone + Send + Sync + fmt::Debug,
307 S: std::hash::BuildHasher,
308{
309 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
310 f.debug_map().entries(self.iter()).finish()
311 }
312}
313
314pub struct LOIter<'it, K, V>
315where
316 K: 'static + PartialEq + Eq + Hash + Clone + Send + Sync,
317 V: 'static + Clone + Send + Sync,
318{
319 idx: usize,
320 inner: Option<hash_map::Iter<'it, K, V>>,
321 reader: HashMap<K, V>,
322 current_frame: usize,
323 latch_snapshot: Vec<TVar<Arc<AtomicBox<Container<K, V>>>>>,
324 txn: Arc<Txn>,
325}
326
327impl<'it, K, V> Iterator for LOIter<'it, K, V>
328where
329 K: 'static + PartialEq + Eq + Hash + Clone + Send + Sync,
330 V: 'static + Clone + Send + Sync,
331{
332 type Item = (K, V);
333
334 #[inline(always)]
335 fn next(&mut self) -> Option<Self::Item> {
336 if self.idx == 0 {
337 let tvar = &self.latch_snapshot[self.current_frame];
338 if let Ok(read) = self.txn.begin(|t| {
339 let frame = t.read(&tvar);
340 frame.get().0.clone()
341 }) {
342 self.reader = read;
343 self.inner = Some(unsafe { std::mem::transmute(self.reader.iter()) });
344 }
345 }
346
347 let read_iter = self.inner.as_mut().unwrap();
348 if let Some(x) = read_iter.next() {
349 self.idx += 1;
350 self.inner = Some(read_iter.clone());
351 Some((x.0.clone(), x.1.clone()))
352 } else {
353 if self.idx == self.reader.len() {
354 self.current_frame += 1;
355 self.idx = 0;
356 }
357 None
358 }
359 }
360
361 #[inline(always)]
362 fn size_hint(&self) -> (usize, Option<usize>) {
363 let tvar = &self.latch_snapshot[self.current_frame];
364 if let Ok(frame_len) = self.txn.begin(|t| t.read(&tvar)) {
365 (frame_len.get().0.len(), None)
368 } else {
369 (0, None)
370 }
371 }
372}
373
374#[cfg(test)]
375mod lotable_tests {
376 use super::LOTable;
377
378 #[test]
379 fn iter_generator() {
380 let lotable: LOTable<String, u64> = LOTable::new();
381 lotable.insert("Saudade0".to_string(), 123123);
382 lotable.insert("Saudade0".to_string(), 123);
383 lotable.insert("Saudade1".to_string(), 123123);
384 lotable.insert("Saudade2".to_string(), 123123);
385 lotable.insert("Saudade3".to_string(), 123123);
386 lotable.insert("Saudade4".to_string(), 123123);
387 lotable.insert("Saudade5".to_string(), 123123);
388
389 lotable.insert("123123".to_string(), 123123);
390 lotable.insert("1231231".to_string(), 123123);
391 lotable.insert("1231232".to_string(), 123123);
392 lotable.insert("1231233".to_string(), 123123);
393 lotable.insert("1231234".to_string(), 123123);
394 lotable.insert("1231235".to_string(), 123123);
395
396 let res: Vec<(String, u64)> = lotable.iter().collect();
397 assert_eq!(res.len(), 12);
398
399 assert_eq!(lotable.get(&"Saudade0".to_string()), Some(123));
400 }
401
402 #[test]
403 fn values_iter_generator() {
404 let lotable: LOTable<String, u64> = LOTable::new();
405
406 (0..100).into_iter().for_each(|_i| {
407 lotable.insert("Saudade0".to_string(), 123123);
408 lotable.insert("Saudade0".to_string(), 123);
409 lotable.insert("Saudade1".to_string(), 123123);
410 lotable.insert("Saudade2".to_string(), 123123);
411 lotable.insert("Saudade3".to_string(), 123123);
412 lotable.insert("Saudade4".to_string(), 123123);
413 lotable.insert("Saudade5".to_string(), 123123);
414
415 lotable.insert("123123".to_string(), 123123);
416 lotable.insert("1231231".to_string(), 123123);
417 lotable.insert("1231232".to_string(), 123123);
418 lotable.insert("1231233".to_string(), 123123);
419 lotable.insert("1231234".to_string(), 123123);
420 lotable.insert("1231235".to_string(), 123123);
421
422 let res: Vec<u64> = lotable.values().into_iter().collect();
423 assert_eq!(res.len(), 12);
425 });
426
427 lotable.clear();
428 let res: Vec<u64> = lotable.values().into_iter().collect();
429 assert_eq!(res.len(), 0);
430
431 (0..1_000).into_iter().for_each(|i| {
432 lotable.insert(format!("{}", i), i as u64);
433
434 let resvals: Vec<u64> = lotable.values().into_iter().collect();
435 assert_eq!(resvals.len(), i + 1);
437 });
438
439 lotable.clear();
440 let res: Vec<u64> = lotable.values().into_iter().collect();
441 assert_eq!(res.len(), 0);
442
443 (0..1_000).into_iter().for_each(|i| {
444 lotable.insert(format!("{}", i), i as u64);
445
446 let reskeys: Vec<String> = lotable.keys().into_iter().collect();
447 assert_eq!(reskeys.len(), i + 1);
449 });
450 }
451}