1use std::collections::HashSet;
64use std::sync::Arc;
65use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
66
67use dashmap::DashMap;
68use parking_lot::{Mutex, RwLock};
69use smallvec::SmallVec;
70
71use crate::durable_storage::InlineKey;
72use sochdb_core::Result;
73
/// Number of entries the hot buffer is pre-allocated for by
/// `TieredMemTable::new`.
pub const DEFAULT_HOT_BUFFER_CAPACITY: usize = 100_000;

/// Fraction of the hot-buffer capacity that, once reached, makes a write
/// attempt to flush the hot buffer into a warm batch.
pub const FLUSH_THRESHOLD_RATIO: f64 = 0.8;
79
80#[derive(Debug, Clone)]
86pub struct HotEntry {
87 pub key: InlineKey,
89 pub value: Option<Vec<u8>>,
91 pub txn_id: u64,
93 pub seq: u64,
95}
96
97impl HotEntry {
98 pub fn new(key: InlineKey, value: Option<Vec<u8>>, txn_id: u64, seq: u64) -> Self {
99 Self {
100 key,
101 value,
102 txn_id,
103 seq,
104 }
105 }
106}
107
108#[derive(Debug)]
114pub struct SortedBatch {
115 entries: Vec<HotEntry>,
117 key_index: DashMap<u64, usize>,
120 min_ts: u64,
122 max_ts: u64,
124}
125
126impl SortedBatch {
127 pub fn from_unsorted(mut entries: Vec<HotEntry>) -> Self {
129 if entries.is_empty() {
130 return Self {
131 entries: Vec::new(),
132 key_index: DashMap::new(),
133 min_ts: u64::MAX,
134 max_ts: 0,
135 };
136 }
137
138 entries.sort_unstable_by(|a, b| {
140 match a.key.as_slice().cmp(b.key.as_slice()) {
141 std::cmp::Ordering::Equal => b.seq.cmp(&a.seq), other => other,
143 }
144 });
145
146 let key_index = DashMap::new();
148 let mut last_key: Option<&[u8]> = None;
149 for (idx, entry) in entries.iter().enumerate() {
150 if last_key != Some(entry.key.as_slice()) {
151 let hash = Self::hash_key(&entry.key);
152 key_index.insert(hash, idx);
153 last_key = Some(entry.key.as_slice());
154 }
155 }
156
157 let min_ts = entries.iter().map(|e| e.seq).min().unwrap_or(u64::MAX);
159 let max_ts = entries.iter().map(|e| e.seq).max().unwrap_or(0);
160
161 Self {
162 entries,
163 key_index,
164 min_ts,
165 max_ts,
166 }
167 }
168
169 #[inline]
171 fn hash_key(key: &[u8]) -> u64 {
172 twox_hash::xxh3::hash64(key)
173 }
174
175 pub fn get(&self, key: &[u8]) -> Option<&HotEntry> {
177 let hash = Self::hash_key(key);
178
179 if let Some(idx) = self.key_index.get(&hash) {
180 let idx = *idx;
182 if idx < self.entries.len() && self.entries[idx].key.as_slice() == key {
183 return Some(&self.entries[idx]);
184 }
185 }
186
187 self.entries
189 .binary_search_by(|e| e.key.as_slice().cmp(key))
190 .ok()
191 .map(|idx| &self.entries[idx])
192 }
193
194 pub fn prefix_scan(&self, prefix: &[u8]) -> impl Iterator<Item = &HotEntry> {
196 let start_idx = self.entries.partition_point(|e| e.key.as_slice() < prefix);
198
199 self.entries[start_idx..]
200 .iter()
201 .take_while(move |e| e.key.starts_with(prefix))
202 }
203
204 pub fn len(&self) -> usize {
206 self.entries.len()
207 }
208
209 pub fn is_empty(&self) -> bool {
211 self.entries.is_empty()
212 }
213
214 pub fn iter(&self) -> impl Iterator<Item = &HotEntry> {
216 self.entries.iter()
217 }
218
219 pub fn timestamp_range(&self) -> (u64, u64) {
221 (self.min_ts, self.max_ts)
222 }
223}
224
225pub struct TieredMemTable {
249 hot_buffer: RwLock<Vec<HotEntry>>,
251 hot_capacity: usize,
253 warm_batches: RwLock<Vec<Arc<SortedBatch>>>,
255 #[allow(dead_code)]
258 point_index: DashMap<Vec<u8>, (usize, usize)>,
259 seq_counter: AtomicU64,
261 size_bytes: AtomicU64,
263 entry_count: AtomicUsize,
265 pending_commits: DashMap<u64, u64>,
267 flush_lock: Mutex<()>,
269}
270
271impl TieredMemTable {
272 pub fn new() -> Self {
273 Self::with_capacity(DEFAULT_HOT_BUFFER_CAPACITY)
274 }
275
276 pub fn with_capacity(capacity: usize) -> Self {
277 Self {
278 hot_buffer: RwLock::new(Vec::with_capacity(capacity)),
279 hot_capacity: capacity,
280 warm_batches: RwLock::new(Vec::new()),
281 point_index: DashMap::new(),
282 seq_counter: AtomicU64::new(1),
283 size_bytes: AtomicU64::new(0),
284 entry_count: AtomicUsize::new(0),
285 pending_commits: DashMap::new(),
286 flush_lock: Mutex::new(()),
287 }
288 }
289
290 pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
295 let key_inline = SmallVec::from_slice(key);
296 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
297 let seq = self.seq_counter.fetch_add(1, Ordering::Relaxed);
298
299 let entry = HotEntry::new(key_inline, value, txn_id, seq);
300
301 {
303 let mut buffer = self.hot_buffer.write();
304 buffer.push(entry);
305 }
306
307 self.size_bytes
308 .fetch_add((key.len() + value_size) as u64, Ordering::Relaxed);
309 self.entry_count.fetch_add(1, Ordering::Relaxed);
310
311 if self.should_flush() {
313 self.try_flush()?;
314 }
315
316 Ok(())
317 }
318
319 pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
321 let mut total_size = 0u64;
322 let mut entries = Vec::with_capacity(writes.len());
323
324 for (key, value) in writes {
325 let seq = self.seq_counter.fetch_add(1, Ordering::Relaxed);
326 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
327 total_size += (key.len() + value_size) as u64;
328
329 entries.push(HotEntry::new(
330 SmallVec::from_slice(key),
331 value.clone(),
332 txn_id,
333 seq,
334 ));
335 }
336
337 {
339 let mut buffer = self.hot_buffer.write();
340 buffer.extend(entries);
341 }
342
343 self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
344 self.entry_count.fetch_add(writes.len(), Ordering::Relaxed);
345
346 if self.should_flush() {
347 self.try_flush()?;
348 }
349
350 Ok(())
351 }
352
353 pub fn read(
355 &self,
356 key: &[u8],
357 snapshot_ts: u64,
358 current_txn_id: Option<u64>,
359 ) -> Option<Vec<u8>> {
360 {
362 let buffer = self.hot_buffer.read();
363 for entry in buffer.iter().rev() {
365 if entry.key.as_slice() == key {
366 if self.is_visible(entry, snapshot_ts, current_txn_id) {
368 return entry.value.clone();
369 }
370 }
371 }
372 }
373
374 {
376 let batches = self.warm_batches.read();
377 for batch in batches.iter().rev() {
379 if let Some(entry) = batch.get(key) {
380 if self.is_visible(entry, snapshot_ts, current_txn_id) {
381 return entry.value.clone();
382 }
383 }
384 }
385 }
386
387 None
388 }
389
390 #[inline]
392 fn is_visible(&self, entry: &HotEntry, snapshot_ts: u64, current_txn_id: Option<u64>) -> bool {
393 if let Some(my_txn) = current_txn_id {
395 if entry.txn_id == my_txn {
396 return true;
397 }
398 }
399
400 if let Some(commit_ts) = self.pending_commits.get(&entry.txn_id) {
402 return *commit_ts < snapshot_ts;
403 }
404
405 false
406 }
407
408 pub fn commit(&self, txn_id: u64, commit_ts: u64, _write_set: &HashSet<InlineKey>) {
410 self.pending_commits.insert(txn_id, commit_ts);
412
413 }
416
417 pub fn abort(&self, txn_id: u64) {
419 self.pending_commits.remove(&txn_id);
420
421 let mut buffer = self.hot_buffer.write();
423 buffer.retain(|e| e.txn_id != txn_id);
424 }
425
426 pub fn scan_prefix(
430 &self,
431 prefix: &[u8],
432 snapshot_ts: u64,
433 current_txn_id: Option<u64>,
434 ) -> Vec<(Vec<u8>, Vec<u8>)> {
435 let mut results = Vec::new();
436 let mut seen_keys: HashSet<Vec<u8>> = HashSet::new();
437
438 {
440 let buffer = self.hot_buffer.read();
441 for entry in buffer.iter().rev() {
442 if entry.key.starts_with(prefix)
443 && !seen_keys.contains(entry.key.as_slice())
444 && self.is_visible(entry, snapshot_ts, current_txn_id)
445 {
446 if let Some(ref value) = entry.value {
447 results.push((entry.key.to_vec(), value.clone()));
448 seen_keys.insert(entry.key.to_vec());
449 }
450 }
451 }
452 }
453
454 {
456 let batches = self.warm_batches.read();
457 for batch in batches.iter().rev() {
458 for entry in batch.prefix_scan(prefix) {
459 if !seen_keys.contains(entry.key.as_slice())
460 && self.is_visible(entry, snapshot_ts, current_txn_id)
461 {
462 if let Some(ref value) = entry.value {
463 results.push((entry.key.to_vec(), value.clone()));
464 seen_keys.insert(entry.key.to_vec());
465 }
466 }
467 }
468 }
469 }
470
471 results.sort_unstable_by(|a, b| a.0.cmp(&b.0));
473 results
474 }
475
476 pub fn scan_prefix_tournament(
496 &self,
497 prefix: &[u8],
498 snapshot_ts: u64,
499 current_txn_id: Option<u64>,
500 ) -> Vec<(Vec<u8>, Vec<u8>)> {
501 use crate::tournament_tree::TournamentTree;
502
503 let mut sorted_sources: Vec<Vec<HotEntry>> = Vec::new();
505
506 {
508 let buffer = self.hot_buffer.read();
509 let mut hot_entries: Vec<HotEntry> = buffer
510 .iter()
511 .filter(|e| e.key.starts_with(prefix))
512 .cloned()
513 .collect();
514
515 hot_entries.sort_unstable_by(|a, b| match a.key.as_slice().cmp(b.key.as_slice()) {
517 std::cmp::Ordering::Equal => b.seq.cmp(&a.seq),
518 other => other,
519 });
520
521 let mut seen = HashSet::new();
523 hot_entries.retain(|e| seen.insert(e.key.to_vec()));
524
525 if !hot_entries.is_empty() {
526 sorted_sources.push(hot_entries);
527 }
528 }
529
530 {
532 let batches = self.warm_batches.read();
533 for batch in batches.iter().rev() {
535 let entries: Vec<HotEntry> = batch.prefix_scan(prefix).cloned().collect();
536
537 if !entries.is_empty() {
538 sorted_sources.push(entries);
539 }
540 }
541 }
542
543 if sorted_sources.is_empty() {
544 return Vec::new();
545 }
546
547 if sorted_sources.len() == 1 {
549 return sorted_sources
550 .into_iter()
551 .next()
552 .unwrap()
553 .into_iter()
554 .filter(|e| self.is_visible(e, snapshot_ts, current_txn_id))
555 .filter_map(|e| e.value.map(|v| (e.key.to_vec(), v)))
556 .collect();
557 }
558
559 #[derive(Clone)]
563 struct KeyedEntry {
564 entry: HotEntry,
565 source_idx: usize,
566 }
567
568 impl PartialEq for KeyedEntry {
569 fn eq(&self, other: &Self) -> bool {
570 self.entry.key.as_slice() == other.entry.key.as_slice()
571 }
572 }
573 impl Eq for KeyedEntry {}
574 impl PartialOrd for KeyedEntry {
575 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
576 Some(self.cmp(other))
577 }
578 }
579 impl Ord for KeyedEntry {
580 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
581 match self.entry.key.as_slice().cmp(other.entry.key.as_slice()) {
584 std::cmp::Ordering::Equal => self.source_idx.cmp(&other.source_idx),
585 other => other,
586 }
587 }
588 }
589
590 let iters: Vec<_> = sorted_sources
591 .into_iter()
592 .enumerate()
593 .map(|(source_idx, v)| {
594 v.into_iter().map(move |e| KeyedEntry {
595 entry: e,
596 source_idx,
597 })
598 })
599 .collect();
600
601 let mut tree = TournamentTree::new(iters);
602 let mut results = Vec::new();
603 let mut last_key: Option<Vec<u8>> = None;
604
605 while let Some((_, keyed)) = tree.pop() {
607 let entry = keyed.entry;
608
609 if let Some(ref last) = last_key {
611 if entry.key.as_slice() == last.as_slice() {
612 continue;
613 }
614 }
615 last_key = Some(entry.key.to_vec());
616
617 if !self.is_visible(&entry, snapshot_ts, current_txn_id) {
619 continue;
623 }
624
625 if let Some(value) = entry.value {
627 results.push((entry.key.to_vec(), value));
628 }
629 }
630
631 results
632 }
633
634 fn should_flush(&self) -> bool {
636 let buffer = self.hot_buffer.read();
637 buffer.len() >= (self.hot_capacity as f64 * FLUSH_THRESHOLD_RATIO) as usize
638 }
639
640 pub fn try_flush(&self) -> Result<()> {
642 let guard = match self.flush_lock.try_lock() {
644 Some(g) => g,
645 None => return Ok(()), };
647
648 let entries = {
650 let mut buffer = self.hot_buffer.write();
651 if buffer.len() < (self.hot_capacity as f64 * FLUSH_THRESHOLD_RATIO) as usize {
652 return Ok(());
654 }
655 std::mem::replace(&mut *buffer, Vec::with_capacity(self.hot_capacity))
656 };
657
658 if entries.is_empty() {
659 return Ok(());
660 }
661
662 let batch = Arc::new(SortedBatch::from_unsorted(entries));
664
665 {
667 let mut batches = self.warm_batches.write();
668 batches.push(batch);
669 }
670
671 drop(guard);
672 Ok(())
673 }
674
675 pub fn flush(&self) -> Result<()> {
677 let _guard = self.flush_lock.lock();
678
679 let entries = {
680 let mut buffer = self.hot_buffer.write();
681 std::mem::replace(&mut *buffer, Vec::with_capacity(self.hot_capacity))
682 };
683
684 if entries.is_empty() {
685 return Ok(());
686 }
687
688 let batch = Arc::new(SortedBatch::from_unsorted(entries));
689
690 {
691 let mut batches = self.warm_batches.write();
692 batches.push(batch);
693 }
694
695 Ok(())
696 }
697
698 pub fn size(&self) -> u64 {
700 self.size_bytes.load(Ordering::Relaxed)
701 }
702
703 pub fn len(&self) -> usize {
705 self.entry_count.load(Ordering::Relaxed)
706 }
707
708 pub fn is_empty(&self) -> bool {
710 self.len() == 0
711 }
712
713 pub fn batch_count(&self) -> usize {
715 self.warm_batches.read().len()
716 }
717
718 pub fn hot_buffer_len(&self) -> usize {
720 self.hot_buffer.read().len()
721 }
722
723 pub fn compact(&self) -> Result<()> {
725 let batches = {
726 let mut b = self.warm_batches.write();
727 std::mem::take(&mut *b)
728 };
729
730 if batches.len() <= 1 {
731 let mut b = self.warm_batches.write();
732 *b = batches;
733 return Ok(());
734 }
735
736 let all_entries: Vec<HotEntry> = batches.iter().flat_map(|b| b.iter().cloned()).collect();
738
739 let merged = Arc::new(SortedBatch::from_unsorted(all_entries));
741
742 {
743 let mut b = self.warm_batches.write();
744 b.clear();
745 b.push(merged);
746 }
747
748 Ok(())
749 }
750}
751
752impl Default for TieredMemTable {
753 fn default() -> Self {
754 Self::new()
755 }
756}
757
#[cfg(test)]
mod tests {
    use super::*;

    /// Commits `txn_id` at `commit_ts` with a write set built from `keys`.
    fn commit_keys<K, I>(table: &TieredMemTable, txn_id: u64, commit_ts: u64, keys: I)
    where
        K: AsRef<[u8]>,
        I: IntoIterator<Item = K>,
    {
        let write_set: HashSet<InlineKey> = keys
            .into_iter()
            .map(|k| SmallVec::from_slice(k.as_ref()))
            .collect();
        table.commit(txn_id, commit_ts, &write_set);
    }

    /// Builds an entry written by transaction 1.
    fn entry(key: &[u8], value: &[u8], seq: u64) -> HotEntry {
        HotEntry::new(SmallVec::from_slice(key), Some(value.to_vec()), 1, seq)
    }

    #[test]
    fn test_tiered_memtable_basic() {
        let table = TieredMemTable::new();
        let pairs: [(&[u8], &[u8]); 2] = [(b"key1", b"value1"), (b"key2", b"value2")];
        for (key, value) in pairs {
            table.write(key, Some(value.to_vec()), 1).unwrap();
        }
        commit_keys(&table, 1, 100, ["key1", "key2"]);

        for (key, value) in pairs {
            assert_eq!(table.read(key, 200, None), Some(value.to_vec()));
        }
    }

    #[test]
    fn test_tiered_memtable_uncommitted_own() {
        let table = TieredMemTable::new();
        table.write(b"key1", Some(b"value1".to_vec()), 1).unwrap();

        // The writer sees its own uncommitted version...
        assert_eq!(table.read(b"key1", 100, Some(1)), Some(b"value1".to_vec()));
        // ...while a different transaction does not.
        assert_eq!(table.read(b"key1", 100, Some(2)), None);
    }

    #[test]
    fn test_tiered_memtable_flush() {
        let table = TieredMemTable::with_capacity(100);
        (0..90).for_each(|i| {
            let key = format!("key{:04}", i);
            let value = format!("value{}", i).into_bytes();
            table.write(key.as_bytes(), Some(value), 1).unwrap();
        });

        table.flush().unwrap();

        assert!(table.batch_count() >= 1);
        assert_eq!(table.hot_buffer_len(), 0);
    }

    #[test]
    fn test_tiered_memtable_scan_prefix() {
        let table = TieredMemTable::new();
        let rows: [(&str, &str); 3] = [("users:1", "alice"), ("users:2", "bob"), ("posts:1", "post1")];
        for (key, value) in rows {
            table
                .write(key.as_bytes(), Some(value.as_bytes().to_vec()), 1)
                .unwrap();
        }
        commit_keys(&table, 1, 100, rows.iter().map(|&(key, _)| key));

        assert_eq!(table.scan_prefix(b"users:", 200, None).len(), 2);
    }

    #[test]
    fn test_sorted_batch() {
        let batch = SortedBatch::from_unsorted(vec![
            entry(b"c", b"3", 3),
            entry(b"a", b"1", 1),
            entry(b"b", b"2", 2),
        ]);

        assert_eq!(batch.len(), 3);
        for (key, value) in [(b"a", b"1"), (b"b", b"2"), (b"c", b"3")] {
            assert_eq!(batch.get(key).unwrap().value, Some(value.to_vec()));
        }
    }

    #[test]
    fn test_sorted_batch_prefix_scan() {
        let batch = SortedBatch::from_unsorted(vec![
            entry(b"ab", b"1", 1),
            entry(b"abc", b"2", 2),
            entry(b"abd", b"3", 3),
            entry(b"xyz", b"4", 4),
        ]);

        assert_eq!(batch.prefix_scan(b"ab").count(), 3);
    }

    #[test]
    fn test_scan_prefix_tournament() {
        let table = TieredMemTable::with_capacity(100);
        let user_key = |i: usize| format!("users:{:02}", i);

        // Three flushed generations of the same ten keys.
        for generation in 0..3 {
            for i in 0..10 {
                let value = format!("value_batch{}_item{}", generation, i);
                table
                    .write(user_key(i).as_bytes(), Some(value.into_bytes()), 1)
                    .unwrap();
            }
            table.flush().unwrap();
        }

        // Newer versions of the first five keys, left in the hot buffer.
        for i in 0..5 {
            let value = format!("newest_value_{}", i);
            table
                .write(user_key(i).as_bytes(), Some(value.into_bytes()), 1)
                .unwrap();
        }

        commit_keys(&table, 1, 100, (0..10).map(user_key));

        let results = table.scan_prefix_tournament(b"users:", 200, None);
        assert_eq!(results.len(), 10);

        for (i, (key, value)) in results.iter().take(5).enumerate() {
            assert_eq!(key.as_slice(), user_key(i).as_bytes());
            assert!(String::from_utf8_lossy(value).starts_with("newest_value_"));
        }
    }

    #[test]
    fn test_scan_tournament_deduplication() {
        let table = TieredMemTable::with_capacity(100);

        // Two older versions, each flushed into its own warm batch.
        for old in [b"old1", b"old2"] {
            table.write(b"key:001", Some(old.to_vec()), 1).unwrap();
            table.flush().unwrap();
        }
        table
            .write(b"key:001", Some(b"newest".to_vec()), 1)
            .unwrap();

        commit_keys(&table, 1, 100, ["key:001"]);

        let results = table.scan_prefix_tournament(b"key:", 200, None);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1.as_slice(), b"newest");
    }
}
940}