1use std::collections::HashSet;
64use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
65use std::sync::Arc;
66
67use dashmap::DashMap;
68use parking_lot::{Mutex, RwLock};
69use smallvec::SmallVec;
70
71use crate::durable_storage::InlineKey;
72use sochdb_core::Result;
73
/// Number of entries the hot buffer is sized for when no explicit capacity is given.
pub const DEFAULT_HOT_BUFFER_CAPACITY: usize = 100_000;

/// Fraction of the hot buffer capacity at which a write triggers a flush attempt.
pub const FLUSH_THRESHOLD_RATIO: f64 = 0.8;
79
80#[derive(Debug, Clone)]
86pub struct HotEntry {
87 pub key: InlineKey,
89 pub value: Option<Vec<u8>>,
91 pub txn_id: u64,
93 pub seq: u64,
95}
96
97impl HotEntry {
98 pub fn new(key: InlineKey, value: Option<Vec<u8>>, txn_id: u64, seq: u64) -> Self {
99 Self {
100 key,
101 value,
102 txn_id,
103 seq,
104 }
105 }
106}
107
108#[derive(Debug)]
114pub struct SortedBatch {
115 entries: Vec<HotEntry>,
117 key_index: DashMap<u64, usize>,
120 min_ts: u64,
122 max_ts: u64,
124}
125
126impl SortedBatch {
127 pub fn from_unsorted(mut entries: Vec<HotEntry>) -> Self {
129 if entries.is_empty() {
130 return Self {
131 entries: Vec::new(),
132 key_index: DashMap::new(),
133 min_ts: u64::MAX,
134 max_ts: 0,
135 };
136 }
137
138 entries.sort_unstable_by(|a, b| {
140 match a.key.as_slice().cmp(b.key.as_slice()) {
141 std::cmp::Ordering::Equal => b.seq.cmp(&a.seq), other => other,
143 }
144 });
145
146 let key_index = DashMap::new();
148 let mut last_key: Option<&[u8]> = None;
149 for (idx, entry) in entries.iter().enumerate() {
150 if last_key != Some(entry.key.as_slice()) {
151 let hash = Self::hash_key(&entry.key);
152 key_index.insert(hash, idx);
153 last_key = Some(entry.key.as_slice());
154 }
155 }
156
157 let min_ts = entries.iter().map(|e| e.seq).min().unwrap_or(u64::MAX);
159 let max_ts = entries.iter().map(|e| e.seq).max().unwrap_or(0);
160
161 Self {
162 entries,
163 key_index,
164 min_ts,
165 max_ts,
166 }
167 }
168
169 #[inline]
171 fn hash_key(key: &[u8]) -> u64 {
172 twox_hash::xxh3::hash64(key)
173 }
174
175 pub fn get(&self, key: &[u8]) -> Option<&HotEntry> {
177 let hash = Self::hash_key(key);
178
179 if let Some(idx) = self.key_index.get(&hash) {
180 let idx = *idx;
182 if idx < self.entries.len() && self.entries[idx].key.as_slice() == key {
183 return Some(&self.entries[idx]);
184 }
185 }
186
187 self.entries
189 .binary_search_by(|e| e.key.as_slice().cmp(key))
190 .ok()
191 .map(|idx| &self.entries[idx])
192 }
193
194 pub fn prefix_scan(&self, prefix: &[u8]) -> impl Iterator<Item = &HotEntry> {
196 let start_idx = self.entries
198 .partition_point(|e| e.key.as_slice() < prefix);
199
200 self.entries[start_idx..]
201 .iter()
202 .take_while(move |e| e.key.starts_with(prefix))
203 }
204
205 pub fn len(&self) -> usize {
207 self.entries.len()
208 }
209
210 pub fn is_empty(&self) -> bool {
212 self.entries.is_empty()
213 }
214
215 pub fn iter(&self) -> impl Iterator<Item = &HotEntry> {
217 self.entries.iter()
218 }
219
220 pub fn timestamp_range(&self) -> (u64, u64) {
222 (self.min_ts, self.max_ts)
223 }
224}
225
226pub struct TieredMemTable {
250 hot_buffer: RwLock<Vec<HotEntry>>,
252 hot_capacity: usize,
254 warm_batches: RwLock<Vec<Arc<SortedBatch>>>,
256 #[allow(dead_code)]
259 point_index: DashMap<Vec<u8>, (usize, usize)>,
260 seq_counter: AtomicU64,
262 size_bytes: AtomicU64,
264 entry_count: AtomicUsize,
266 pending_commits: DashMap<u64, u64>,
268 flush_lock: Mutex<()>,
270}
271
272impl TieredMemTable {
273 pub fn new() -> Self {
274 Self::with_capacity(DEFAULT_HOT_BUFFER_CAPACITY)
275 }
276
277 pub fn with_capacity(capacity: usize) -> Self {
278 Self {
279 hot_buffer: RwLock::new(Vec::with_capacity(capacity)),
280 hot_capacity: capacity,
281 warm_batches: RwLock::new(Vec::new()),
282 point_index: DashMap::new(),
283 seq_counter: AtomicU64::new(1),
284 size_bytes: AtomicU64::new(0),
285 entry_count: AtomicUsize::new(0),
286 pending_commits: DashMap::new(),
287 flush_lock: Mutex::new(()),
288 }
289 }
290
291 pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
296 let key_inline = SmallVec::from_slice(key);
297 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
298 let seq = self.seq_counter.fetch_add(1, Ordering::Relaxed);
299
300 let entry = HotEntry::new(key_inline, value, txn_id, seq);
301
302 {
304 let mut buffer = self.hot_buffer.write();
305 buffer.push(entry);
306 }
307
308 self.size_bytes.fetch_add((key.len() + value_size) as u64, Ordering::Relaxed);
309 self.entry_count.fetch_add(1, Ordering::Relaxed);
310
311 if self.should_flush() {
313 self.try_flush()?;
314 }
315
316 Ok(())
317 }
318
319 pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
321 let mut total_size = 0u64;
322 let mut entries = Vec::with_capacity(writes.len());
323
324 for (key, value) in writes {
325 let seq = self.seq_counter.fetch_add(1, Ordering::Relaxed);
326 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
327 total_size += (key.len() + value_size) as u64;
328
329 entries.push(HotEntry::new(
330 SmallVec::from_slice(key),
331 value.clone(),
332 txn_id,
333 seq,
334 ));
335 }
336
337 {
339 let mut buffer = self.hot_buffer.write();
340 buffer.extend(entries);
341 }
342
343 self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
344 self.entry_count.fetch_add(writes.len(), Ordering::Relaxed);
345
346 if self.should_flush() {
347 self.try_flush()?;
348 }
349
350 Ok(())
351 }
352
353 pub fn read(
355 &self,
356 key: &[u8],
357 snapshot_ts: u64,
358 current_txn_id: Option<u64>,
359 ) -> Option<Vec<u8>> {
360 {
362 let buffer = self.hot_buffer.read();
363 for entry in buffer.iter().rev() {
365 if entry.key.as_slice() == key {
366 if self.is_visible(entry, snapshot_ts, current_txn_id) {
368 return entry.value.clone();
369 }
370 }
371 }
372 }
373
374 {
376 let batches = self.warm_batches.read();
377 for batch in batches.iter().rev() {
379 if let Some(entry) = batch.get(key) {
380 if self.is_visible(entry, snapshot_ts, current_txn_id) {
381 return entry.value.clone();
382 }
383 }
384 }
385 }
386
387 None
388 }
389
390 #[inline]
392 fn is_visible(&self, entry: &HotEntry, snapshot_ts: u64, current_txn_id: Option<u64>) -> bool {
393 if let Some(my_txn) = current_txn_id {
395 if entry.txn_id == my_txn {
396 return true;
397 }
398 }
399
400 if let Some(commit_ts) = self.pending_commits.get(&entry.txn_id) {
402 return *commit_ts < snapshot_ts;
403 }
404
405 false
406 }
407
408 pub fn commit(&self, txn_id: u64, commit_ts: u64, _write_set: &HashSet<InlineKey>) {
410 self.pending_commits.insert(txn_id, commit_ts);
412
413 }
416
417 pub fn abort(&self, txn_id: u64) {
419 self.pending_commits.remove(&txn_id);
420
421 let mut buffer = self.hot_buffer.write();
423 buffer.retain(|e| e.txn_id != txn_id);
424 }
425
426 pub fn scan_prefix(
430 &self,
431 prefix: &[u8],
432 snapshot_ts: u64,
433 current_txn_id: Option<u64>,
434 ) -> Vec<(Vec<u8>, Vec<u8>)> {
435 let mut results = Vec::new();
436 let mut seen_keys: HashSet<Vec<u8>> = HashSet::new();
437
438 {
440 let buffer = self.hot_buffer.read();
441 for entry in buffer.iter().rev() {
442 if entry.key.starts_with(prefix)
443 && !seen_keys.contains(entry.key.as_slice())
444 && self.is_visible(entry, snapshot_ts, current_txn_id)
445 {
446 if let Some(ref value) = entry.value {
447 results.push((entry.key.to_vec(), value.clone()));
448 seen_keys.insert(entry.key.to_vec());
449 }
450 }
451 }
452 }
453
454 {
456 let batches = self.warm_batches.read();
457 for batch in batches.iter().rev() {
458 for entry in batch.prefix_scan(prefix) {
459 if !seen_keys.contains(entry.key.as_slice())
460 && self.is_visible(entry, snapshot_ts, current_txn_id)
461 {
462 if let Some(ref value) = entry.value {
463 results.push((entry.key.to_vec(), value.clone()));
464 seen_keys.insert(entry.key.to_vec());
465 }
466 }
467 }
468 }
469 }
470
471 results.sort_unstable_by(|a, b| a.0.cmp(&b.0));
473 results
474 }
475
476 pub fn scan_prefix_tournament(
496 &self,
497 prefix: &[u8],
498 snapshot_ts: u64,
499 current_txn_id: Option<u64>,
500 ) -> Vec<(Vec<u8>, Vec<u8>)> {
501 use crate::tournament_tree::TournamentTree;
502
503 let mut sorted_sources: Vec<Vec<HotEntry>> = Vec::new();
505
506 {
508 let buffer = self.hot_buffer.read();
509 let mut hot_entries: Vec<HotEntry> = buffer
510 .iter()
511 .filter(|e| e.key.starts_with(prefix))
512 .cloned()
513 .collect();
514
515 hot_entries.sort_unstable_by(|a, b| {
517 match a.key.as_slice().cmp(b.key.as_slice()) {
518 std::cmp::Ordering::Equal => b.seq.cmp(&a.seq),
519 other => other,
520 }
521 });
522
523 let mut seen = HashSet::new();
525 hot_entries.retain(|e| seen.insert(e.key.to_vec()));
526
527 if !hot_entries.is_empty() {
528 sorted_sources.push(hot_entries);
529 }
530 }
531
532 {
534 let batches = self.warm_batches.read();
535 for batch in batches.iter().rev() {
537 let entries: Vec<HotEntry> = batch
538 .prefix_scan(prefix)
539 .cloned()
540 .collect();
541
542 if !entries.is_empty() {
543 sorted_sources.push(entries);
544 }
545 }
546 }
547
548 if sorted_sources.is_empty() {
549 return Vec::new();
550 }
551
552 if sorted_sources.len() == 1 {
554 return sorted_sources
555 .into_iter()
556 .next()
557 .unwrap()
558 .into_iter()
559 .filter(|e| self.is_visible(e, snapshot_ts, current_txn_id))
560 .filter_map(|e| e.value.map(|v| (e.key.to_vec(), v)))
561 .collect();
562 }
563
564 #[derive(Clone)]
568 struct KeyedEntry {
569 entry: HotEntry,
570 source_idx: usize,
571 }
572
573 impl PartialEq for KeyedEntry {
574 fn eq(&self, other: &Self) -> bool {
575 self.entry.key.as_slice() == other.entry.key.as_slice()
576 }
577 }
578 impl Eq for KeyedEntry {}
579 impl PartialOrd for KeyedEntry {
580 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
581 Some(self.cmp(other))
582 }
583 }
584 impl Ord for KeyedEntry {
585 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
586 match self.entry.key.as_slice().cmp(other.entry.key.as_slice()) {
589 std::cmp::Ordering::Equal => self.source_idx.cmp(&other.source_idx),
590 other => other,
591 }
592 }
593 }
594
595 let iters: Vec<_> = sorted_sources
596 .into_iter()
597 .enumerate()
598 .map(|(source_idx, v)| {
599 v.into_iter().map(move |e| KeyedEntry { entry: e, source_idx })
600 })
601 .collect();
602
603 let mut tree = TournamentTree::new(iters);
604 let mut results = Vec::new();
605 let mut last_key: Option<Vec<u8>> = None;
606
607 while let Some((_, keyed)) = tree.pop() {
609 let entry = keyed.entry;
610
611 if let Some(ref last) = last_key {
613 if entry.key.as_slice() == last.as_slice() {
614 continue;
615 }
616 }
617 last_key = Some(entry.key.to_vec());
618
619 if !self.is_visible(&entry, snapshot_ts, current_txn_id) {
621 continue;
625 }
626
627 if let Some(value) = entry.value {
629 results.push((entry.key.to_vec(), value));
630 }
631 }
632
633 results
634 }
635
636 fn should_flush(&self) -> bool {
638 let buffer = self.hot_buffer.read();
639 buffer.len() >= (self.hot_capacity as f64 * FLUSH_THRESHOLD_RATIO) as usize
640 }
641
642 pub fn try_flush(&self) -> Result<()> {
644 let guard = match self.flush_lock.try_lock() {
646 Some(g) => g,
647 None => return Ok(()), };
649
650 let entries = {
652 let mut buffer = self.hot_buffer.write();
653 if buffer.len() < (self.hot_capacity as f64 * FLUSH_THRESHOLD_RATIO) as usize {
654 return Ok(());
656 }
657 std::mem::replace(&mut *buffer, Vec::with_capacity(self.hot_capacity))
658 };
659
660 if entries.is_empty() {
661 return Ok(());
662 }
663
664 let batch = Arc::new(SortedBatch::from_unsorted(entries));
666
667 {
669 let mut batches = self.warm_batches.write();
670 batches.push(batch);
671 }
672
673 drop(guard);
674 Ok(())
675 }
676
677 pub fn flush(&self) -> Result<()> {
679 let _guard = self.flush_lock.lock();
680
681 let entries = {
682 let mut buffer = self.hot_buffer.write();
683 std::mem::replace(&mut *buffer, Vec::with_capacity(self.hot_capacity))
684 };
685
686 if entries.is_empty() {
687 return Ok(());
688 }
689
690 let batch = Arc::new(SortedBatch::from_unsorted(entries));
691
692 {
693 let mut batches = self.warm_batches.write();
694 batches.push(batch);
695 }
696
697 Ok(())
698 }
699
700 pub fn size(&self) -> u64 {
702 self.size_bytes.load(Ordering::Relaxed)
703 }
704
705 pub fn len(&self) -> usize {
707 self.entry_count.load(Ordering::Relaxed)
708 }
709
710 pub fn is_empty(&self) -> bool {
712 self.len() == 0
713 }
714
715 pub fn batch_count(&self) -> usize {
717 self.warm_batches.read().len()
718 }
719
720 pub fn hot_buffer_len(&self) -> usize {
722 self.hot_buffer.read().len()
723 }
724
725 pub fn compact(&self) -> Result<()> {
727 let batches = {
728 let mut b = self.warm_batches.write();
729 std::mem::take(&mut *b)
730 };
731
732 if batches.len() <= 1 {
733 let mut b = self.warm_batches.write();
734 *b = batches;
735 return Ok(());
736 }
737
738 let all_entries: Vec<HotEntry> = batches
740 .iter()
741 .flat_map(|b| b.iter().cloned())
742 .collect();
743
744 let merged = Arc::new(SortedBatch::from_unsorted(all_entries));
746
747 {
748 let mut b = self.warm_batches.write();
749 b.clear();
750 b.push(merged);
751 }
752
753 Ok(())
754 }
755}
756
757impl Default for TieredMemTable {
758 fn default() -> Self {
759 Self::new()
760 }
761}
762
#[cfg(test)]
mod tests {
    use super::*;

    /// Commits `txn_id` at `commit_ts`, passing `keys` as the transaction's write set.
    fn commit_keys<K: AsRef<[u8]>>(
        table: &TieredMemTable,
        txn_id: u64,
        commit_ts: u64,
        keys: &[K],
    ) {
        let mut write_set = HashSet::new();
        for key in keys {
            write_set.insert(SmallVec::from_slice(key.as_ref()));
        }
        table.commit(txn_id, commit_ts, &write_set);
    }

    /// Committed writes are readable from a later snapshot.
    #[test]
    fn test_tiered_memtable_basic() {
        let table = TieredMemTable::new();

        table.write(b"key1", Some(b"value1".to_vec()), 1).unwrap();
        table.write(b"key2", Some(b"value2".to_vec()), 1).unwrap();
        commit_keys(&table, 1, 100, &[b"key1", b"key2"]);

        assert_eq!(table.read(b"key1", 200, None), Some(b"value1".to_vec()));
        assert_eq!(table.read(b"key2", 200, None), Some(b"value2".to_vec()));
    }

    /// Uncommitted writes are visible to their own transaction only.
    #[test]
    fn test_tiered_memtable_uncommitted_own() {
        let table = TieredMemTable::new();

        table.write(b"key1", Some(b"value1".to_vec()), 1).unwrap();

        let own_view = table.read(b"key1", 100, Some(1));
        assert_eq!(own_view, Some(b"value1".to_vec()));

        let foreign_view = table.read(b"key1", 100, Some(2));
        assert_eq!(foreign_view, None);
    }

    /// An explicit flush empties the hot buffer into a warm batch.
    #[test]
    fn test_tiered_memtable_flush() {
        let table = TieredMemTable::with_capacity(100);

        for i in 0..90 {
            let key = format!("key{:04}", i);
            let value = format!("value{}", i);
            table
                .write(key.as_bytes(), Some(value.into_bytes()), 1)
                .unwrap();
        }

        table.flush().unwrap();

        assert!(table.batch_count() >= 1);
        assert_eq!(table.hot_buffer_len(), 0);
    }

    /// A prefix scan returns only the keys under the prefix.
    #[test]
    fn test_tiered_memtable_scan_prefix() {
        let table = TieredMemTable::new();

        table.write(b"users:1", Some(b"alice".to_vec()), 1).unwrap();
        table.write(b"users:2", Some(b"bob".to_vec()), 1).unwrap();
        table.write(b"posts:1", Some(b"post1".to_vec()), 1).unwrap();
        commit_keys(&table, 1, 100, &[b"users:1", b"users:2", b"posts:1"]);

        let results = table.scan_prefix(b"users:", 200, None);
        assert_eq!(results.len(), 2);
    }

    /// Point lookups work on a batch built from unsorted input.
    #[test]
    fn test_sorted_batch() {
        let batch = SortedBatch::from_unsorted(vec![
            HotEntry::new(SmallVec::from_slice(b"c"), Some(b"3".to_vec()), 1, 3),
            HotEntry::new(SmallVec::from_slice(b"a"), Some(b"1".to_vec()), 1, 1),
            HotEntry::new(SmallVec::from_slice(b"b"), Some(b"2".to_vec()), 1, 2),
        ]);

        assert_eq!(batch.len(), 3);
        assert_eq!(batch.get(b"a").unwrap().value, Some(b"1".to_vec()));
        assert_eq!(batch.get(b"b").unwrap().value, Some(b"2".to_vec()));
        assert_eq!(batch.get(b"c").unwrap().value, Some(b"3".to_vec()));
    }

    /// A batch prefix scan yields exactly the entries under the prefix.
    #[test]
    fn test_sorted_batch_prefix_scan() {
        let batch = SortedBatch::from_unsorted(vec![
            HotEntry::new(SmallVec::from_slice(b"ab"), Some(b"1".to_vec()), 1, 1),
            HotEntry::new(SmallVec::from_slice(b"abc"), Some(b"2".to_vec()), 1, 2),
            HotEntry::new(SmallVec::from_slice(b"abd"), Some(b"3".to_vec()), 1, 3),
            HotEntry::new(SmallVec::from_slice(b"xyz"), Some(b"4".to_vec()), 1, 4),
        ]);

        let matched = batch.prefix_scan(b"ab").count();
        assert_eq!(matched, 3);
    }

    /// The tournament scan merges several batches plus the hot buffer, newest value winning.
    #[test]
    fn test_scan_prefix_tournament() {
        let table = TieredMemTable::with_capacity(100);

        // Three flushed generations of the same ten keys.
        for batch_idx in 0..3 {
            for i in 0..10 {
                let key = format!("users:{:02}", i);
                let value = format!("value_batch{}_item{}", batch_idx, i);
                table
                    .write(key.as_bytes(), Some(value.into_bytes()), 1)
                    .unwrap();
            }
            table.flush().unwrap();
        }

        // Fresh values for the first five keys stay in the hot buffer.
        for i in 0..5 {
            let key = format!("users:{:02}", i);
            let value = format!("newest_value_{}", i);
            table
                .write(key.as_bytes(), Some(value.into_bytes()), 1)
                .unwrap();
        }

        let all_keys: Vec<String> = (0..10).map(|i| format!("users:{:02}", i)).collect();
        commit_keys(&table, 1, 100, &all_keys);

        let results = table.scan_prefix_tournament(b"users:", 200, None);

        assert_eq!(results.len(), 10);

        for (i, (key, value)) in results.iter().take(5).enumerate() {
            let expected_key = format!("users:{:02}", i);
            assert_eq!(key.as_slice(), expected_key.as_bytes());
            assert!(String::from_utf8_lossy(value).starts_with("newest_value_"));
        }
    }

    /// The tournament scan reports each key once, with its newest value.
    #[test]
    fn test_scan_tournament_deduplication() {
        let table = TieredMemTable::with_capacity(100);

        table.write(b"key:001", Some(b"old1".to_vec()), 1).unwrap();
        table.flush().unwrap();

        table.write(b"key:001", Some(b"old2".to_vec()), 1).unwrap();
        table.flush().unwrap();

        table
            .write(b"key:001", Some(b"newest".to_vec()), 1)
            .unwrap();

        commit_keys(&table, 1, 100, &[b"key:001"]);

        let results = table.scan_prefix_tournament(b"key:", 200, None);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].1.as_slice(), b"newest");
    }
}