use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

use dashmap::DashMap;
use parking_lot::{Mutex, RwLock};
use smallvec::SmallVec;

use crate::durable_storage::InlineKey;
use sochdb_core::Result;
70
/// Number of entries the hot buffer is pre-allocated for by
/// `TieredMemTable::new`.
pub const DEFAULT_HOT_BUFFER_CAPACITY: usize = 100_000;

/// Fraction of the hot-buffer capacity at which a write triggers a flush of
/// the hot buffer into a sorted warm batch.
pub const FLUSH_THRESHOLD_RATIO: f64 = 0.8;
76
77#[derive(Debug, Clone)]
83pub struct HotEntry {
84 pub key: InlineKey,
86 pub value: Option<Vec<u8>>,
88 pub txn_id: u64,
90 pub seq: u64,
92}
93
94impl HotEntry {
95 pub fn new(key: InlineKey, value: Option<Vec<u8>>, txn_id: u64, seq: u64) -> Self {
96 Self {
97 key,
98 value,
99 txn_id,
100 seq,
101 }
102 }
103}
104
105#[derive(Debug)]
111pub struct SortedBatch {
112 entries: Vec<HotEntry>,
114 key_index: DashMap<u64, usize>,
117 min_ts: u64,
119 max_ts: u64,
121}
122
123impl SortedBatch {
124 pub fn from_unsorted(mut entries: Vec<HotEntry>) -> Self {
126 if entries.is_empty() {
127 return Self {
128 entries: Vec::new(),
129 key_index: DashMap::new(),
130 min_ts: u64::MAX,
131 max_ts: 0,
132 };
133 }
134
135 entries.sort_unstable_by(|a, b| {
137 match a.key.as_slice().cmp(b.key.as_slice()) {
138 std::cmp::Ordering::Equal => b.seq.cmp(&a.seq), other => other,
140 }
141 });
142
143 let key_index = DashMap::new();
145 let mut last_key: Option<&[u8]> = None;
146 for (idx, entry) in entries.iter().enumerate() {
147 if last_key != Some(entry.key.as_slice()) {
148 let hash = Self::hash_key(&entry.key);
149 key_index.insert(hash, idx);
150 last_key = Some(entry.key.as_slice());
151 }
152 }
153
154 let min_ts = entries.iter().map(|e| e.seq).min().unwrap_or(u64::MAX);
156 let max_ts = entries.iter().map(|e| e.seq).max().unwrap_or(0);
157
158 Self {
159 entries,
160 key_index,
161 min_ts,
162 max_ts,
163 }
164 }
165
166 #[inline]
168 fn hash_key(key: &[u8]) -> u64 {
169 twox_hash::xxh3::hash64(key)
170 }
171
172 pub fn get(&self, key: &[u8]) -> Option<&HotEntry> {
174 let hash = Self::hash_key(key);
175
176 if let Some(idx) = self.key_index.get(&hash) {
177 let idx = *idx;
179 if idx < self.entries.len() && self.entries[idx].key.as_slice() == key {
180 return Some(&self.entries[idx]);
181 }
182 }
183
184 self.entries
186 .binary_search_by(|e| e.key.as_slice().cmp(key))
187 .ok()
188 .map(|idx| &self.entries[idx])
189 }
190
191 pub fn prefix_scan(&self, prefix: &[u8]) -> impl Iterator<Item = &HotEntry> {
193 let start_idx = self.entries
195 .partition_point(|e| e.key.as_slice() < prefix);
196
197 self.entries[start_idx..]
198 .iter()
199 .take_while(move |e| e.key.starts_with(prefix))
200 }
201
202 pub fn len(&self) -> usize {
204 self.entries.len()
205 }
206
207 pub fn is_empty(&self) -> bool {
209 self.entries.is_empty()
210 }
211
212 pub fn iter(&self) -> impl Iterator<Item = &HotEntry> {
214 self.entries.iter()
215 }
216
217 pub fn timestamp_range(&self) -> (u64, u64) {
219 (self.min_ts, self.max_ts)
220 }
221}
222
223pub struct TieredMemTable {
247 hot_buffer: RwLock<Vec<HotEntry>>,
249 hot_capacity: usize,
251 warm_batches: RwLock<Vec<Arc<SortedBatch>>>,
253 #[allow(dead_code)]
256 point_index: DashMap<Vec<u8>, (usize, usize)>,
257 seq_counter: AtomicU64,
259 size_bytes: AtomicU64,
261 entry_count: AtomicUsize,
263 pending_commits: DashMap<u64, u64>,
265 flush_lock: Mutex<()>,
267}
268
269impl TieredMemTable {
270 pub fn new() -> Self {
271 Self::with_capacity(DEFAULT_HOT_BUFFER_CAPACITY)
272 }
273
274 pub fn with_capacity(capacity: usize) -> Self {
275 Self {
276 hot_buffer: RwLock::new(Vec::with_capacity(capacity)),
277 hot_capacity: capacity,
278 warm_batches: RwLock::new(Vec::new()),
279 point_index: DashMap::new(),
280 seq_counter: AtomicU64::new(1),
281 size_bytes: AtomicU64::new(0),
282 entry_count: AtomicUsize::new(0),
283 pending_commits: DashMap::new(),
284 flush_lock: Mutex::new(()),
285 }
286 }
287
288 pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
293 let key_inline = SmallVec::from_slice(key);
294 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
295 let seq = self.seq_counter.fetch_add(1, Ordering::Relaxed);
296
297 let entry = HotEntry::new(key_inline, value, txn_id, seq);
298
299 {
301 let mut buffer = self.hot_buffer.write();
302 buffer.push(entry);
303 }
304
305 self.size_bytes.fetch_add((key.len() + value_size) as u64, Ordering::Relaxed);
306 self.entry_count.fetch_add(1, Ordering::Relaxed);
307
308 if self.should_flush() {
310 self.try_flush()?;
311 }
312
313 Ok(())
314 }
315
316 pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
318 let mut total_size = 0u64;
319 let mut entries = Vec::with_capacity(writes.len());
320
321 for (key, value) in writes {
322 let seq = self.seq_counter.fetch_add(1, Ordering::Relaxed);
323 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
324 total_size += (key.len() + value_size) as u64;
325
326 entries.push(HotEntry::new(
327 SmallVec::from_slice(key),
328 value.clone(),
329 txn_id,
330 seq,
331 ));
332 }
333
334 {
336 let mut buffer = self.hot_buffer.write();
337 buffer.extend(entries);
338 }
339
340 self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
341 self.entry_count.fetch_add(writes.len(), Ordering::Relaxed);
342
343 if self.should_flush() {
344 self.try_flush()?;
345 }
346
347 Ok(())
348 }
349
350 pub fn read(
352 &self,
353 key: &[u8],
354 snapshot_ts: u64,
355 current_txn_id: Option<u64>,
356 ) -> Option<Vec<u8>> {
357 {
359 let buffer = self.hot_buffer.read();
360 for entry in buffer.iter().rev() {
362 if entry.key.as_slice() == key {
363 if self.is_visible(entry, snapshot_ts, current_txn_id) {
365 return entry.value.clone();
366 }
367 }
368 }
369 }
370
371 {
373 let batches = self.warm_batches.read();
374 for batch in batches.iter().rev() {
376 if let Some(entry) = batch.get(key) {
377 if self.is_visible(entry, snapshot_ts, current_txn_id) {
378 return entry.value.clone();
379 }
380 }
381 }
382 }
383
384 None
385 }
386
387 #[inline]
389 fn is_visible(&self, entry: &HotEntry, snapshot_ts: u64, current_txn_id: Option<u64>) -> bool {
390 if let Some(my_txn) = current_txn_id {
392 if entry.txn_id == my_txn {
393 return true;
394 }
395 }
396
397 if let Some(commit_ts) = self.pending_commits.get(&entry.txn_id) {
399 return *commit_ts < snapshot_ts;
400 }
401
402 false
403 }
404
405 pub fn commit(&self, txn_id: u64, commit_ts: u64, _write_set: &HashSet<InlineKey>) {
407 self.pending_commits.insert(txn_id, commit_ts);
409
410 }
413
414 pub fn abort(&self, txn_id: u64) {
416 self.pending_commits.remove(&txn_id);
417
418 let mut buffer = self.hot_buffer.write();
420 buffer.retain(|e| e.txn_id != txn_id);
421 }
422
423 pub fn scan_prefix(
427 &self,
428 prefix: &[u8],
429 snapshot_ts: u64,
430 current_txn_id: Option<u64>,
431 ) -> Vec<(Vec<u8>, Vec<u8>)> {
432 let mut results = Vec::new();
433 let mut seen_keys: HashSet<Vec<u8>> = HashSet::new();
434
435 {
437 let buffer = self.hot_buffer.read();
438 for entry in buffer.iter().rev() {
439 if entry.key.starts_with(prefix)
440 && !seen_keys.contains(entry.key.as_slice())
441 && self.is_visible(entry, snapshot_ts, current_txn_id)
442 {
443 if let Some(ref value) = entry.value {
444 results.push((entry.key.to_vec(), value.clone()));
445 seen_keys.insert(entry.key.to_vec());
446 }
447 }
448 }
449 }
450
451 {
453 let batches = self.warm_batches.read();
454 for batch in batches.iter().rev() {
455 for entry in batch.prefix_scan(prefix) {
456 if !seen_keys.contains(entry.key.as_slice())
457 && self.is_visible(entry, snapshot_ts, current_txn_id)
458 {
459 if let Some(ref value) = entry.value {
460 results.push((entry.key.to_vec(), value.clone()));
461 seen_keys.insert(entry.key.to_vec());
462 }
463 }
464 }
465 }
466 }
467
468 results.sort_unstable_by(|a, b| a.0.cmp(&b.0));
470 results
471 }
472
473 pub fn scan_prefix_tournament(
493 &self,
494 prefix: &[u8],
495 snapshot_ts: u64,
496 current_txn_id: Option<u64>,
497 ) -> Vec<(Vec<u8>, Vec<u8>)> {
498 use crate::tournament_tree::TournamentTree;
499
500 let mut sorted_sources: Vec<Vec<HotEntry>> = Vec::new();
502
503 {
505 let buffer = self.hot_buffer.read();
506 let mut hot_entries: Vec<HotEntry> = buffer
507 .iter()
508 .filter(|e| e.key.starts_with(prefix))
509 .cloned()
510 .collect();
511
512 hot_entries.sort_unstable_by(|a, b| {
514 match a.key.as_slice().cmp(b.key.as_slice()) {
515 std::cmp::Ordering::Equal => b.seq.cmp(&a.seq),
516 other => other,
517 }
518 });
519
520 let mut seen = HashSet::new();
522 hot_entries.retain(|e| seen.insert(e.key.to_vec()));
523
524 if !hot_entries.is_empty() {
525 sorted_sources.push(hot_entries);
526 }
527 }
528
529 {
531 let batches = self.warm_batches.read();
532 for batch in batches.iter().rev() {
534 let entries: Vec<HotEntry> = batch
535 .prefix_scan(prefix)
536 .cloned()
537 .collect();
538
539 if !entries.is_empty() {
540 sorted_sources.push(entries);
541 }
542 }
543 }
544
545 if sorted_sources.is_empty() {
546 return Vec::new();
547 }
548
549 if sorted_sources.len() == 1 {
551 return sorted_sources
552 .into_iter()
553 .next()
554 .unwrap()
555 .into_iter()
556 .filter(|e| self.is_visible(e, snapshot_ts, current_txn_id))
557 .filter_map(|e| e.value.map(|v| (e.key.to_vec(), v)))
558 .collect();
559 }
560
561 #[derive(Clone)]
565 struct KeyedEntry {
566 entry: HotEntry,
567 source_idx: usize,
568 }
569
570 impl PartialEq for KeyedEntry {
571 fn eq(&self, other: &Self) -> bool {
572 self.entry.key.as_slice() == other.entry.key.as_slice()
573 }
574 }
575 impl Eq for KeyedEntry {}
576 impl PartialOrd for KeyedEntry {
577 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
578 Some(self.cmp(other))
579 }
580 }
581 impl Ord for KeyedEntry {
582 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
583 match self.entry.key.as_slice().cmp(other.entry.key.as_slice()) {
586 std::cmp::Ordering::Equal => self.source_idx.cmp(&other.source_idx),
587 other => other,
588 }
589 }
590 }
591
592 let iters: Vec<_> = sorted_sources
593 .into_iter()
594 .enumerate()
595 .map(|(source_idx, v)| {
596 v.into_iter().map(move |e| KeyedEntry { entry: e, source_idx })
597 })
598 .collect();
599
600 let mut tree = TournamentTree::new(iters);
601 let mut results = Vec::new();
602 let mut last_key: Option<Vec<u8>> = None;
603
604 while let Some((_, keyed)) = tree.pop() {
606 let entry = keyed.entry;
607
608 if let Some(ref last) = last_key {
610 if entry.key.as_slice() == last.as_slice() {
611 continue;
612 }
613 }
614 last_key = Some(entry.key.to_vec());
615
616 if !self.is_visible(&entry, snapshot_ts, current_txn_id) {
618 continue;
622 }
623
624 if let Some(value) = entry.value {
626 results.push((entry.key.to_vec(), value));
627 }
628 }
629
630 results
631 }
632
633 fn should_flush(&self) -> bool {
635 let buffer = self.hot_buffer.read();
636 buffer.len() >= (self.hot_capacity as f64 * FLUSH_THRESHOLD_RATIO) as usize
637 }
638
639 pub fn try_flush(&self) -> Result<()> {
641 let guard = match self.flush_lock.try_lock() {
643 Some(g) => g,
644 None => return Ok(()), };
646
647 let entries = {
649 let mut buffer = self.hot_buffer.write();
650 if buffer.len() < (self.hot_capacity as f64 * FLUSH_THRESHOLD_RATIO) as usize {
651 return Ok(());
653 }
654 std::mem::replace(&mut *buffer, Vec::with_capacity(self.hot_capacity))
655 };
656
657 if entries.is_empty() {
658 return Ok(());
659 }
660
661 let batch = Arc::new(SortedBatch::from_unsorted(entries));
663
664 {
666 let mut batches = self.warm_batches.write();
667 batches.push(batch);
668 }
669
670 drop(guard);
671 Ok(())
672 }
673
674 pub fn flush(&self) -> Result<()> {
676 let _guard = self.flush_lock.lock();
677
678 let entries = {
679 let mut buffer = self.hot_buffer.write();
680 std::mem::replace(&mut *buffer, Vec::with_capacity(self.hot_capacity))
681 };
682
683 if entries.is_empty() {
684 return Ok(());
685 }
686
687 let batch = Arc::new(SortedBatch::from_unsorted(entries));
688
689 {
690 let mut batches = self.warm_batches.write();
691 batches.push(batch);
692 }
693
694 Ok(())
695 }
696
697 pub fn size(&self) -> u64 {
699 self.size_bytes.load(Ordering::Relaxed)
700 }
701
702 pub fn len(&self) -> usize {
704 self.entry_count.load(Ordering::Relaxed)
705 }
706
707 pub fn is_empty(&self) -> bool {
709 self.len() == 0
710 }
711
712 pub fn batch_count(&self) -> usize {
714 self.warm_batches.read().len()
715 }
716
717 pub fn hot_buffer_len(&self) -> usize {
719 self.hot_buffer.read().len()
720 }
721
722 pub fn compact(&self) -> Result<()> {
724 let batches = {
725 let mut b = self.warm_batches.write();
726 std::mem::take(&mut *b)
727 };
728
729 if batches.len() <= 1 {
730 let mut b = self.warm_batches.write();
731 *b = batches;
732 return Ok(());
733 }
734
735 let all_entries: Vec<HotEntry> = batches
737 .iter()
738 .flat_map(|b| b.iter().cloned())
739 .collect();
740
741 let merged = Arc::new(SortedBatch::from_unsorted(all_entries));
743
744 {
745 let mut b = self.warm_batches.write();
746 b.clear();
747 b.push(merged);
748 }
749
750 Ok(())
751 }
752}
753
754impl Default for TieredMemTable {
755 fn default() -> Self {
756 Self::new()
757 }
758}
759
#[cfg(test)]
mod tests {
    use super::*;

    /// Commits `txn_id` at `commit_ts`, passing `keys` as its write set.
    fn commit_keys<I, K>(table: &TieredMemTable, txn_id: u64, commit_ts: u64, keys: I)
    where
        I: IntoIterator<Item = K>,
        K: AsRef<[u8]>,
    {
        let write_set: HashSet<InlineKey> = keys
            .into_iter()
            .map(|k| InlineKey::from_slice(k.as_ref()))
            .collect();
        table.commit(txn_id, commit_ts, &write_set);
    }

    /// Builds a live entry owned by transaction 1.
    fn live_entry(key: &[u8], value: &[u8], seq: u64) -> HotEntry {
        HotEntry::new(InlineKey::from_slice(key), Some(value.to_vec()), 1, seq)
    }

    #[test]
    fn test_tiered_memtable_basic() {
        let table = TieredMemTable::new();
        table.write(b"key1", Some(b"value1".to_vec()), 1).unwrap();
        table.write(b"key2", Some(b"value2".to_vec()), 1).unwrap();
        commit_keys(&table, 1, 100, ["key1", "key2"]);

        let first = table.read(b"key1", 200, None);
        let second = table.read(b"key2", 200, None);
        assert_eq!(first, Some(b"value1".to_vec()));
        assert_eq!(second, Some(b"value2".to_vec()));
    }

    #[test]
    fn test_tiered_memtable_uncommitted_own() {
        let table = TieredMemTable::new();
        table.write(b"key1", Some(b"value1".to_vec()), 1).unwrap();

        // The writing transaction sees its own uncommitted write...
        assert_eq!(table.read(b"key1", 100, Some(1)), Some(b"value1".to_vec()));
        // ...while any other transaction does not.
        assert_eq!(table.read(b"key1", 100, Some(2)), None);
    }

    #[test]
    fn test_tiered_memtable_flush() {
        let table = TieredMemTable::with_capacity(100);

        for i in 0..90 {
            let key = format!("key{:04}", i);
            let value = format!("value{}", i).into_bytes();
            table.write(key.as_bytes(), Some(value), 1).unwrap();
        }
        table.flush().unwrap();

        assert!(table.batch_count() >= 1);
        assert_eq!(table.hot_buffer_len(), 0);
    }

    #[test]
    fn test_tiered_memtable_scan_prefix() {
        let table = TieredMemTable::new();
        table.write(b"users:1", Some(b"alice".to_vec()), 1).unwrap();
        table.write(b"users:2", Some(b"bob".to_vec()), 1).unwrap();
        table.write(b"posts:1", Some(b"post1".to_vec()), 1).unwrap();
        commit_keys(&table, 1, 100, ["users:1", "users:2", "posts:1"]);

        assert_eq!(table.scan_prefix(b"users:", 200, None).len(), 2);
    }

    #[test]
    fn test_sorted_batch() {
        let batch = SortedBatch::from_unsorted(vec![
            live_entry(b"c", b"3", 3),
            live_entry(b"a", b"1", 1),
            live_entry(b"b", b"2", 2),
        ]);

        assert_eq!(batch.len(), 3);
        assert_eq!(batch.get(b"a").unwrap().value, Some(b"1".to_vec()));
        assert_eq!(batch.get(b"b").unwrap().value, Some(b"2".to_vec()));
        assert_eq!(batch.get(b"c").unwrap().value, Some(b"3".to_vec()));
    }

    #[test]
    fn test_sorted_batch_prefix_scan() {
        let batch = SortedBatch::from_unsorted(vec![
            live_entry(b"ab", b"1", 1),
            live_entry(b"abc", b"2", 2),
            live_entry(b"abd", b"3", 3),
            live_entry(b"xyz", b"4", 4),
        ]);

        let matched: Vec<&HotEntry> = batch.prefix_scan(b"ab").collect();
        assert_eq!(matched.len(), 3);
    }

    #[test]
    fn test_scan_prefix_tournament() {
        let table = TieredMemTable::with_capacity(100);

        // Three flushed generations of the same ten keys...
        for generation in 0..3 {
            for i in 0..10 {
                let key = format!("users:{:02}", i);
                let value = format!("value_batch{}_item{}", generation, i);
                table.write(key.as_bytes(), Some(value.into_bytes()), 1).unwrap();
            }
            table.flush().unwrap();
        }

        // ...plus newer, still-hot versions of the first five.
        for i in 0..5 {
            let key = format!("users:{:02}", i);
            let value = format!("newest_value_{}", i);
            table.write(key.as_bytes(), Some(value.into_bytes()), 1).unwrap();
        }

        commit_keys(&table, 1, 100, (0..10).map(|i| format!("users:{:02}", i)));

        let results = table.scan_prefix_tournament(b"users:", 200, None);
        assert_eq!(results.len(), 10);

        for (i, (key, value)) in results.iter().take(5).enumerate() {
            let expected_key = format!("users:{:02}", i);
            assert_eq!(key.as_slice(), expected_key.as_bytes());
            assert!(String::from_utf8_lossy(value).starts_with("newest_value_"));
        }
    }

    #[test]
    fn test_scan_tournament_deduplication() {
        let table = TieredMemTable::with_capacity(100);

        table.write(b"key:001", Some(b"old1".to_vec()), 1).unwrap();
        table.flush().unwrap();
        table.write(b"key:001", Some(b"old2".to_vec()), 1).unwrap();
        table.flush().unwrap();
        table.write(b"key:001", Some(b"newest".to_vec()), 1).unwrap();

        commit_keys(&table, 1, 100, ["key:001"]);

        let results = table.scan_prefix_tournament(b"key:", 200, None);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1.as_slice(), b"newest");
    }
}