1use std::collections::HashSet;
58use std::path::{Path, PathBuf};
59use std::sync::Arc;
60use std::sync::atomic::{AtomicU64, Ordering};
61
62use dashmap::DashMap;
63use smallvec::SmallVec;
64
65use crossbeam_skiplist::SkipMap;
66
67use crate::deferred_index::{DeferredSortedIndex, DeferredIndexConfig};
68use crate::group_commit::EventDrivenGroupCommit;
69use crate::txn_wal::{TxnWal, TxnWalBuffer, TxnWalEntry};
70use sochdb_core::{Result, SochDBError};
71use sochdb_core::version_chain::{
72 BinarySearchChain, ChainEntry,
73 MvccVersionChain, MvccVersionChainMut, WriteConflictDetection,
74 VisibilityContext, TxnId, Timestamp,
75};
76
/// Bloom filter over byte-string keys, stored as a bit set packed into `u64` words.
///
/// The bit array is not allocated at construction time; it is created the
/// first time a key is inserted.
#[derive(Clone, Debug)]
pub struct SsiBloomFilter {
    /// Packed bit array; `None` until the first insert allocates it.
    bits: Option<Vec<u64>>,
    /// Item count used to size the bit array when it is allocated.
    expected_capacity: usize,
    /// Number of bit positions set/tested per key.
    num_hashes: u32,
}
106
107impl SsiBloomFilter {
108 const BITS_PER_ITEM: f64 = 9.6;
111
112 const DEFAULT_NUM_HASHES: u32 = 7;
115
116 const MIN_CAPACITY: usize = 64;
118
119 #[inline]
124 pub fn new(expected_items: usize) -> Self {
125 Self {
126 bits: None,
127 expected_capacity: expected_items.max(Self::MIN_CAPACITY),
128 num_hashes: Self::DEFAULT_NUM_HASHES,
129 }
130 }
131
132 pub fn with_word_capacity(words: usize) -> Self {
134 Self {
135 bits: None,
136 expected_capacity: words.max(1) * 64 / 10, num_hashes: Self::DEFAULT_NUM_HASHES,
138 }
139 }
140
141 #[inline]
143 fn ensure_allocated(&mut self) {
144 if self.bits.is_none() {
145 let num_bits = ((self.expected_capacity as f64) * Self::BITS_PER_ITEM).ceil() as usize;
146 let num_words = num_bits.div_ceil(64);
147 self.bits = Some(vec![0u64; num_words]);
148 }
149 }
150
151 #[inline]
153 pub fn insert(&mut self, key: &[u8]) {
154 self.ensure_allocated();
155 let bits = self.bits.as_mut().unwrap();
156 let num_bits = bits.len() * 64;
157 if num_bits == 0 {
158 return;
159 }
160
161 let h1 = Self::hash1(key);
164 let h2 = Self::hash2(key);
165
166 for i in 0..self.num_hashes {
167 let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
168 let bit_idx = (h as usize) % num_bits;
169 let word_idx = bit_idx / 64;
170 let bit_pos = bit_idx % 64;
171 bits[word_idx] |= 1 << bit_pos;
172 }
173 }
174
175 #[inline]
181 pub fn may_contain(&self, key: &[u8]) -> bool {
182 let bits = match &self.bits {
183 Some(b) => b,
184 None => return false, };
186 let num_bits = bits.len() * 64;
187 if num_bits == 0 {
188 return false;
189 }
190
191 let h1 = Self::hash1(key);
192 let h2 = Self::hash2(key);
193
194 for i in 0..self.num_hashes {
195 let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
196 let bit_idx = (h as usize) % num_bits;
197 let word_idx = bit_idx / 64;
198 let bit_pos = bit_idx % 64;
199 if bits[word_idx] & (1 << bit_pos) == 0 {
200 return false; }
202 }
203 true }
205
206 #[inline]
211 pub fn may_intersect(&self, other: &SsiBloomFilter) -> bool {
212 let (self_bits, other_bits) = match (&self.bits, &other.bits) {
213 (Some(s), Some(o)) => (s, o),
214 _ => return false, };
216 let min_len = self_bits.len().min(other_bits.len());
217 for i in 0..min_len {
218 if self_bits[i] & other_bits[i] != 0 {
219 return true; }
221 }
222 false }
224
225 #[inline]
227 fn hash1(key: &[u8]) -> u64 {
228 use std::collections::hash_map::DefaultHasher;
229 use std::hash::{Hash, Hasher};
230 let mut hasher = DefaultHasher::new();
231 key.hash(&mut hasher);
232 hasher.finish()
233 }
234
235 #[inline]
237 fn hash2(key: &[u8]) -> u64 {
238 twox_hash::xxh3::hash64(key)
239 }
240
241 pub fn size_bytes(&self) -> usize {
243 self.bits.as_ref().map(|b| b.len() * 8).unwrap_or(0) + std::mem::size_of::<Self>()
244 }
245
246 pub fn is_empty(&self) -> bool {
248 match &self.bits {
249 Some(bits) => bits.iter().all(|&w| w == 0),
250 None => true,
251 }
252 }
253}
254
/// Key buffer backed by a `SmallVec` with 32 bytes of inline storage.
pub type InlineKey = SmallVec<[u8; 32]>;
258
/// One version of a key's value inside a version chain.
#[derive(Debug, Clone)]
pub struct Version {
    /// Payload of this version; scans and reads skip versions whose value is `None`.
    pub value: Option<Vec<u8>>,
    /// Id of the transaction that wrote this version.
    pub txn_id: u64,
    /// Commit timestamp; uncommitted versions are created with `0`.
    pub commit_ts: u64,
}
269
270impl ChainEntry for Version {
272 #[inline] fn commit_ts(&self) -> u64 { self.commit_ts }
273 #[inline] fn txn_id(&self) -> u64 { self.txn_id }
274 #[inline] fn set_commit_ts(&mut self, ts: u64) { self.commit_ts = ts; }
275}
276
/// Version history of a single key, backed by a `BinarySearchChain<Version>`.
#[derive(Debug, Default)]
pub struct VersionChain {
    /// Underlying chain that stores committed and uncommitted versions.
    inner: BinarySearchChain<Version>,
}
303
304impl VersionChain {
305 #[inline]
307 pub fn new() -> Self {
308 Self { inner: BinarySearchChain::new() }
309 }
310
311 #[inline]
316 pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64) {
317 match self.inner.uncommitted_mut() {
318 Some(v) if v.txn_id == txn_id => {
319 v.value = value;
321 }
322 _ => {
323 self.inner.set_uncommitted(Version {
325 value,
326 txn_id,
327 commit_ts: 0,
328 });
329 }
330 }
331 }
332
333 #[inline]
337 pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
338 self.inner.commit(txn_id, commit_ts)
339 }
340
341 #[inline]
345 pub fn abort(&mut self, txn_id: u64) {
346 self.inner.abort(txn_id);
347 }
348
349 #[inline]
353 pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&Version> {
354 self.inner.read_at(snapshot_ts, current_txn_id)
355 }
356
357 #[inline]
361 pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
362 self.inner.has_write_conflict(my_txn_id)
363 }
364
365 pub fn gc(&mut self, min_active_ts: u64) {
367 self.inner.gc_by_ts(min_active_ts);
368 }
369
370 #[inline]
372 pub fn version_count(&self) -> usize {
373 self.inner.version_count()
374 }
375
376 #[cfg(test)]
378 pub fn versions(&self) -> Vec<Version> {
379 let mut result = self.inner.committed_versions().to_vec();
380 if let Some(v) = self.inner.uncommitted() {
381 result.push(v.clone());
382 }
383 result
384 }
385}
386
387impl MvccVersionChain for VersionChain {
392 type Value = Option<Vec<u8>>;
393
394 fn get_visible(&self, ctx: &VisibilityContext) -> Option<&Self::Value> {
395 self.inner.read_at(ctx.snapshot_ts, Some(ctx.reader_txn_id))
397 .map(|v| &v.value)
398 }
399
400 fn get_latest(&self) -> Option<&Self::Value> {
401 self.inner.latest().map(|v| &v.value)
402 }
403
404 fn version_count(&self) -> usize {
405 self.inner.version_count()
406 }
407}
408
409impl MvccVersionChainMut for VersionChain {
410 fn add_uncommitted(&mut self, value: Self::Value, txn_id: TxnId) {
411 self.add_uncommitted(value, txn_id);
412 }
413
414 fn commit_version(&mut self, txn_id: TxnId, commit_ts: Timestamp) -> bool {
415 self.inner.commit(txn_id, commit_ts)
416 }
417
418 fn delete_version(&mut self, txn_id: TxnId, _delete_ts: Timestamp) -> bool {
419 self.add_uncommitted(None, txn_id);
421 true
422 }
423
424 fn gc(&mut self, min_visible_ts: Timestamp) -> (usize, usize) {
425 let before = self.inner.committed_count();
426 self.inner.gc_by_ts(min_visible_ts);
427 let removed = before - self.inner.committed_count();
428 (removed, removed * std::mem::size_of::<Version>())
429 }
430}
431
432impl WriteConflictDetection for VersionChain {
433 fn has_write_conflict(&self, txn_id: TxnId) -> bool {
434 self.has_write_conflict(txn_id)
435 }
436}
437
/// Initial write-set capacity used for transactions created in a write-tracking mode.
const WRITE_SET_INITIAL_CAPACITY: usize = 32;

/// Initial read-set capacity used for transactions created in read-write mode.
const READ_SET_INITIAL_CAPACITY: usize = 64;
450
/// Per-transaction bookkeeping: snapshot, tracked key sets and their Bloom summaries.
#[derive(Debug, Clone)]
pub struct MvccTransaction {
    /// Identifier of this transaction.
    pub txn_id: u64,
    /// Timestamp counter value observed when the transaction began.
    pub snapshot_ts: u64,
    /// Keys written by this transaction.
    pub write_set: HashSet<InlineKey>,
    /// Keys read by this transaction (only populated in modes that track reads).
    pub read_set: HashSet<InlineKey>,
    /// Bloom filter over `write_set`.
    pub write_bloom: SsiBloomFilter,
    /// Bloom filter over `read_set`.
    pub read_bloom: SsiBloomFilter,
    /// Lifecycle state; new transactions start as `Active`.
    pub state: TxnState,
    /// Mode the transaction was created with.
    pub mode: TransactionMode,
}
474
475impl MvccTransaction {
476 #[inline]
481 pub fn new(txn_id: u64, snapshot_ts: u64) -> Self {
482 Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadWrite)
483 }
484
485 #[inline]
496 pub fn read_only(txn_id: u64, snapshot_ts: u64) -> Self {
497 Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadOnly)
498 }
499
500 #[inline]
507 pub fn write_only(txn_id: u64, snapshot_ts: u64) -> Self {
508 Self::with_mode(txn_id, snapshot_ts, TransactionMode::WriteOnly)
509 }
510
511 #[inline]
513 pub fn with_mode(txn_id: u64, snapshot_ts: u64, mode: TransactionMode) -> Self {
514 let (write_capacity, read_capacity) = match mode {
516 TransactionMode::ReadOnly => (0, 0), TransactionMode::WriteOnly => (WRITE_SET_INITIAL_CAPACITY, 0),
518 TransactionMode::ReadWrite => (WRITE_SET_INITIAL_CAPACITY, READ_SET_INITIAL_CAPACITY),
519 };
520 Self::with_capacity(txn_id, snapshot_ts, write_capacity, read_capacity, mode)
521 }
522
523 #[inline]
528 pub fn with_capacity(
529 txn_id: u64,
530 snapshot_ts: u64,
531 write_capacity: usize,
532 read_capacity: usize,
533 mode: TransactionMode,
534 ) -> Self {
535 Self {
536 txn_id,
537 snapshot_ts,
538 write_set: HashSet::with_capacity(write_capacity),
539 read_set: HashSet::with_capacity(read_capacity),
540 write_bloom: SsiBloomFilter::new(write_capacity.max(1)),
541 read_bloom: SsiBloomFilter::new(read_capacity.max(1)),
542 state: TxnState::Active,
543 mode,
544 }
545 }
546
547 #[inline]
549 pub fn is_read_only(&self) -> bool {
550 self.write_set.is_empty()
551 }
552
553 #[inline]
555 pub fn is_single_key_write(&self) -> bool {
556 self.write_set.len() == 1 && self.read_set.len() <= 1
557 }
558}
559
/// Lifecycle state of a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnState {
    /// The transaction is in progress (the state new transactions are created with).
    Active,
    /// The transaction committed.
    Committed,
    /// The transaction was aborted.
    Aborted,
}
567
/// Declares what a transaction intends to do, which controls what is tracked for it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TransactionMode {
    /// Tracks neither reads nor writes.
    ReadOnly,

    /// Tracks writes but not reads.
    WriteOnly,

    /// Tracks both reads and writes; the only mode that undergoes SSI validation.
    #[default]
    ReadWrite,
}
613
614impl TransactionMode {
615 #[inline]
617 pub fn tracks_reads(&self) -> bool {
618 matches!(self, TransactionMode::ReadWrite)
619 }
620
621 #[inline]
623 pub fn tracks_writes(&self) -> bool {
624 matches!(self, TransactionMode::WriteOnly | TransactionMode::ReadWrite)
625 }
626
627 #[inline]
629 pub fn needs_ssi_validation(&self) -> bool {
630 matches!(self, TransactionMode::ReadWrite)
631 }
632}
633
/// Kind of dependency recorded on a [`ConflictEdge`].
// NOTE(review): the variant names suggest read-then-write and write-then-read
// dependencies between two transactions; nothing in this section constructs
// them, so confirm the exact meaning against the code that does.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictType {
    ReadWrite,
    WriteRead,
}
642
/// Directed dependency between two transactions.
#[derive(Debug, Clone)]
pub struct ConflictEdge {
    /// Transaction id at the source of the edge.
    pub from_txn: u64,
    /// Transaction id at the target of the edge.
    pub to_txn: u64,
    /// Kind of dependency this edge represents.
    pub conflict_type: ConflictType,
}
653
/// Tracks active transactions, hands out timestamps and validates commits
/// against recently committed read-write transactions.
#[allow(clippy::type_complexity)]
pub struct MvccManager {
    /// Transactions that have begun and not yet committed or aborted, by id.
    active_txns: DashMap<u64, MvccTransaction>,
    /// Monotonic counter used for both snapshot and commit timestamps.
    ts_counter: AtomicU64,
    /// Smallest snapshot timestamp among active transactions (or the current
    /// counter value when there are none), refreshed on begin/commit/abort.
    min_active_ts: AtomicU64,
    /// Recently committed transactions, by id. Each value is
    /// `(commit_ts, read_bloom, write_bloom, read_set, write_set)`.
    recent_commits: DashMap<
        u64,
        (
            u64,
            SsiBloomFilter,
            SsiBloomFilter,
            HashSet<InlineKey>,
            HashSet<InlineKey>,
        ),
    >,
    /// Pruning of `recent_commits` starts once it holds more than twice this many entries.
    max_recent_commits: usize,
}
683
684impl Default for MvccManager {
685 fn default() -> Self {
686 Self::new()
687 }
688}
689
690impl MvccManager {
691 pub fn new() -> Self {
692 Self {
693 active_txns: DashMap::new(),
694 ts_counter: AtomicU64::new(1),
695 min_active_ts: AtomicU64::new(0),
696 recent_commits: DashMap::new(),
697 max_recent_commits: 1000, }
699 }
700
701 pub fn begin(&self, txn_id: u64) -> MvccTransaction {
705 self.begin_with_mode(txn_id, TransactionMode::ReadWrite)
706 }
707
708 #[inline]
719 pub fn begin_read_only(&self, txn_id: u64) -> MvccTransaction {
720 self.begin_with_mode(txn_id, TransactionMode::ReadOnly)
721 }
722
723 #[inline]
728 pub fn begin_write_only(&self, txn_id: u64) -> MvccTransaction {
729 self.begin_with_mode(txn_id, TransactionMode::WriteOnly)
730 }
731
732 pub fn begin_with_mode(&self, txn_id: u64, mode: TransactionMode) -> MvccTransaction {
737 let snapshot_ts = self.ts_counter.load(Ordering::SeqCst);
738
739 let txn = MvccTransaction::with_mode(txn_id, snapshot_ts, mode);
741
742 self.active_txns.insert(txn_id, txn.clone());
743 self.update_min_active_ts();
744
745 txn
746 }
747
748 pub fn get(&self, txn_id: u64) -> Option<MvccTransaction> {
750 self.active_txns.get(&txn_id).map(|t| t.clone())
751 }
752
753 #[inline]
756 pub fn get_snapshot_ts(&self, txn_id: u64) -> Option<u64> {
757 self.active_txns.get(&txn_id).map(|t| t.snapshot_ts)
758 }
759
760 #[inline]
770 pub fn record_read(&self, txn_id: u64, key: &[u8]) {
771 if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
772 if !txn.mode.tracks_reads() {
774 return;
775 }
776
777 if txn.read_set.len() < 10000 {
779 txn.read_set.insert(SmallVec::from_slice(key));
780 txn.read_bloom.insert(key);
781 }
782 }
783 }
784
785 pub fn record_write(&self, txn_id: u64, key: &[u8]) {
790 if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
791 txn.write_set.insert(SmallVec::from_slice(key));
792 txn.write_bloom.insert(key);
793 }
794 }
795
796 pub fn alloc_commit_ts(&self) -> u64 {
798 self.ts_counter.fetch_add(1, Ordering::SeqCst)
799 }
800
801 pub fn commit(&self, txn_id: u64) -> Option<(u64, HashSet<InlineKey>)> {
811 let txn = self.active_txns.get(&txn_id)?.clone();
813
814 if txn.mode != TransactionMode::ReadWrite || !self.validate_ssi(&txn) {
817 if txn.mode == TransactionMode::ReadWrite && !self.validate_ssi(&txn) {
820 self.active_txns.remove(&txn_id);
822 self.update_min_active_ts();
823 return None;
824 }
825 }
826
827 let commit_ts = self.alloc_commit_ts();
828
829 let (_, removed_txn) = self.active_txns.remove(&txn_id)?;
831
832 let needs_ssi_tracking = removed_txn.mode == TransactionMode::ReadWrite
835 && !removed_txn.read_set.is_empty()
836 && !removed_txn.write_set.is_empty();
837
838 if needs_ssi_tracking {
839 let write_set_for_return = removed_txn.write_set.clone();
841
842 self.track_commit_owned(
843 txn_id,
844 commit_ts,
845 removed_txn.read_bloom,
846 removed_txn.write_bloom,
847 removed_txn.read_set,
848 removed_txn.write_set,
849 );
850
851 self.update_min_active_ts();
852 Some((commit_ts, write_set_for_return))
853 } else {
854 self.update_min_active_ts();
856 Some((commit_ts, removed_txn.write_set))
857 }
858 }
859
860 #[inline]
884 fn validate_ssi(&self, txn: &MvccTransaction) -> bool {
885 if txn.write_set.is_empty() {
891 return true;
892 }
893
894 if self.recent_commits.is_empty() {
898 return true;
899 }
900
901 if txn.write_set.len() == 1 && txn.read_set.len() <= 1 {
911 return true;
912 }
913
914 let my_snapshot = txn.snapshot_ts;
915
916 let mut any_may_intersect = false;
923 for entry in self.recent_commits.iter() {
924 let (_, (other_commit_ts, other_read_bloom, other_write_bloom, _, _)) = entry.pair();
925
926 if *other_commit_ts <= my_snapshot {
928 continue;
929 }
930
931 if txn.write_bloom.may_intersect(other_read_bloom)
934 || other_write_bloom.may_intersect(&txn.read_bloom)
935 {
936 any_may_intersect = true;
937 break;
938 }
939 }
940
941 if !any_may_intersect {
943 return true;
944 }
945
946 let mut in_conflict_with: Vec<u64> = Vec::new();
955 let mut out_conflict_to: Vec<u64> = Vec::new();
956
957 for entry in self.recent_commits.iter() {
958 let (
959 other_txn_id,
960 (
961 other_commit_ts,
962 _other_read_bloom,
963 other_write_bloom,
964 other_read_set,
965 other_write_set,
966 ),
967 ) = entry.pair();
968
969 if *other_commit_ts <= my_snapshot {
972 continue;
973 }
974
975 let mut has_in_conflict = false;
981 for key in txn.read_set.iter() {
982 if other_write_bloom.may_contain(key) {
983 if other_write_set.contains(key) {
985 has_in_conflict = true;
986 break;
987 }
988 }
989 }
990 if has_in_conflict {
991 in_conflict_with.push(*other_txn_id);
992 }
993
994 let mut has_out_conflict = false;
999 for key in other_read_set.iter() {
1000 if txn.write_bloom.may_contain(key) {
1001 if txn.write_set.contains(key) {
1003 has_out_conflict = true;
1004 break;
1005 }
1006 }
1007 }
1008 if has_out_conflict {
1009 out_conflict_to.push(*other_txn_id);
1010 }
1011 }
1012
1013 if !in_conflict_with.is_empty() && !out_conflict_to.is_empty() {
1019 return false; }
1021
1022 true
1023 }
1024
1025 fn track_commit_owned(
1036 &self,
1037 txn_id: u64,
1038 commit_ts: u64,
1039 read_bloom: SsiBloomFilter,
1040 write_bloom: SsiBloomFilter,
1041 read_set: HashSet<InlineKey>,
1042 write_set: HashSet<InlineKey>,
1043 ) {
1044 if read_set.is_empty() || write_set.is_empty() {
1048 return; }
1050
1051 self.recent_commits.insert(
1054 txn_id,
1055 (
1056 commit_ts,
1057 read_bloom,
1058 write_bloom,
1059 read_set,
1060 write_set,
1061 ),
1062 );
1063
1064 if self.recent_commits.len() > self.max_recent_commits * 2 {
1067 let min_active = self.min_active_ts.load(Ordering::Relaxed);
1069 self.recent_commits
1070 .retain(|_, (ts, _, _, _, _)| *ts >= min_active);
1071 }
1072 }
1073
1074 #[allow(dead_code)]
1076 fn track_commit(
1077 &self,
1078 txn_id: u64,
1079 commit_ts: u64,
1080 read_bloom: SsiBloomFilter,
1081 write_bloom: SsiBloomFilter,
1082 read_set: &HashSet<InlineKey>,
1083 write_set: &HashSet<InlineKey>,
1084 ) {
1085 if read_set.is_empty() || write_set.is_empty() {
1086 return;
1087 }
1088 self.recent_commits.insert(
1089 txn_id,
1090 (
1091 commit_ts,
1092 read_bloom,
1093 write_bloom,
1094 read_set.clone(),
1095 write_set.clone(),
1096 ),
1097 );
1098 }
1099
1100 pub fn abort(&self, txn_id: u64) {
1102 self.active_txns.remove(&txn_id);
1103 self.update_min_active_ts();
1104 }
1105
1106 pub fn min_active_snapshot(&self) -> u64 {
1108 self.min_active_ts.load(Ordering::SeqCst)
1109 }
1110
1111 pub fn active_transaction_count(&self) -> usize {
1113 self.active_txns.len()
1114 }
1115
1116 fn update_min_active_ts(&self) {
1117 let min = self
1118 .active_txns
1119 .iter()
1120 .map(|entry| entry.value().snapshot_ts)
1121 .min()
1122 .unwrap_or_else(|| self.ts_counter.load(Ordering::SeqCst));
1123 self.min_active_ts.store(min, Ordering::SeqCst);
1124 }
1125}
1126
1127struct EpochDirtyList {
1132 epochs: [parking_lot::Mutex<Vec<Vec<u8>>>; 4],
1135 current_epoch: AtomicU64,
1137}
1138
1139const EPOCH_RING_SIZE: usize = 4;
1140
1141impl EpochDirtyList {
1142 fn new() -> Self {
1143 Self {
1144 epochs: [
1145 parking_lot::Mutex::new(Vec::new()),
1146 parking_lot::Mutex::new(Vec::new()),
1147 parking_lot::Mutex::new(Vec::new()),
1148 parking_lot::Mutex::new(Vec::new()),
1149 ],
1150 current_epoch: AtomicU64::new(0),
1151 }
1152 }
1153
1154 #[inline]
1156 fn record_version(&self, key: Vec<u8>) {
1157 let epoch = self.current_epoch.load(Ordering::Relaxed);
1158 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1159 self.epochs[idx].lock().push(key);
1160 }
1161
1162 #[inline]
1167 fn record_versions_batch(&self, keys: impl IntoIterator<Item = Vec<u8>>) {
1168 let epoch = self.current_epoch.load(Ordering::Relaxed);
1169 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1170 let mut guard = self.epochs[idx].lock();
1171 guard.extend(keys);
1172 }
1173
1174 fn advance_epoch(&self) -> (u64, Vec<Vec<u8>>) {
1176 let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1177 let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1178
1179 let mut guard = self.epochs[old_idx].lock();
1181 let keys = std::mem::take(&mut *guard);
1182 (old_epoch, keys)
1183 }
1184
1185 #[allow(dead_code)]
1187 fn current(&self) -> u64 {
1188 self.current_epoch.load(Ordering::Relaxed)
1189 }
1190}
1191
/// Lazily initialised iterator over the visible `(key, value)` pairs of a
/// memtable within a key range.
struct ScanRangeIterator<'a> {
    /// Table being scanned.
    memtable: &'a MvccMemTable,
    /// Inclusive lower bound of the range.
    start: Vec<u8>,
    /// Exclusive upper bound; an empty vector means "no upper bound".
    end: Vec<u8>,
    /// Snapshot timestamp passed to every version lookup.
    snapshot_ts: u64,
    /// Reading transaction passed to every version lookup, if any.
    current_txn_id: Option<u64>,
    /// Whether the scan should go through one of the table's ordered indexes.
    use_ordered: bool,
    /// Iterator built on the first `next` call when an ordered index is used.
    ordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
    /// Iterator built on the first `next` call when the whole map is scanned.
    unordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
    /// Set once the underlying iterator has been built.
    initialized: bool,
}
1213
1214impl<'a> Iterator for ScanRangeIterator<'a> {
1215 type Item = (Vec<u8>, Vec<u8>);
1216
1217 fn next(&mut self) -> Option<Self::Item> {
1218 if !self.initialized {
1220 self.initialized = true;
1221
1222 if self.use_ordered {
1223 if let Some(ref def_idx) = self.memtable.deferred_index {
1225 let start = self.start.clone();
1226 let end = self.end.clone();
1227 let snapshot_ts = self.snapshot_ts;
1228 let current_txn_id = self.current_txn_id;
1229 let data = &self.memtable.data;
1230
1231 let keys: Vec<Vec<u8>> = if end.is_empty() {
1233 def_idx.range_from(&start).collect()
1234 } else {
1235 def_idx.range(&start, &end).collect()
1236 };
1237
1238 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = Box::new(
1239 keys.into_iter()
1240 .filter_map(move |key| {
1241 if let Some(chain) = data.get(&key)
1242 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1243 && let Some(value) = &v.value
1244 {
1245 Some((key, value.clone()))
1246 } else {
1247 None
1248 }
1249 })
1250 );
1251 self.ordered_iter = Some(iter);
1252 } else if let Some(ref idx) = self.memtable.ordered_index {
1253 let start = self.start.clone();
1254 let end = self.end.clone();
1255 let snapshot_ts = self.snapshot_ts;
1256 let current_txn_id = self.current_txn_id;
1257 let data = &self.memtable.data;
1258
1259 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = if end.is_empty() {
1260 Box::new(
1261 idx.range(start..)
1262 .filter_map(move |entry| {
1263 let key = entry.key();
1264 if let Some(chain) = data.get(key)
1265 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1266 && let Some(value) = &v.value
1267 {
1268 Some((key.clone(), value.clone()))
1269 } else {
1270 None
1271 }
1272 })
1273 )
1274 } else {
1275 Box::new(
1276 idx.range(start..end)
1277 .filter_map(move |entry| {
1278 let key = entry.key();
1279 if let Some(chain) = data.get(key)
1280 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1281 && let Some(value) = &v.value
1282 {
1283 Some((key.clone(), value.clone()))
1284 } else {
1285 None
1286 }
1287 })
1288 )
1289 };
1290 self.ordered_iter = Some(iter);
1291 }
1292 } else {
1293 let start = self.start.clone();
1295 let end = self.end.clone();
1296 let snapshot_ts = self.snapshot_ts;
1297 let current_txn_id = self.current_txn_id;
1298
1299 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = Box::new(
1300 self.memtable.data.iter()
1301 .filter_map(move |entry| {
1302 let key = entry.key();
1303
1304 if key.as_slice() < start.as_slice() {
1305 return None;
1306 }
1307 if !end.is_empty() && key.as_slice() >= end.as_slice() {
1308 return None;
1309 }
1310
1311 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1312 && let Some(value) = &v.value
1313 {
1314 Some((key.clone(), value.clone()))
1315 } else {
1316 None
1317 }
1318 })
1319 );
1320 self.unordered_iter = Some(iter);
1321 }
1322 }
1323
1324 if let Some(ref mut iter) = self.ordered_iter {
1326 iter.next()
1327 } else if let Some(ref mut iter) = self.unordered_iter {
1328 iter.next()
1329 } else {
1330 None
1331 }
1332 }
1333}
1334
/// In-memory multi-version table: a concurrent map from key to version chain
/// with an optional ordered key index for scans.
pub struct MvccMemTable {
    /// Version chain per key.
    data: DashMap<Vec<u8>, VersionChain>,
    /// Deferred sorted key index; present when ordered indexing is enabled in deferred mode.
    deferred_index: Option<DeferredSortedIndex>,
    /// Skip-map key index; present when ordered indexing is enabled without deferred mode.
    ordered_index: Option<SkipMap<Vec<u8>, ()>>,
    /// The deferred-mode flag the table was constructed with.
    #[allow(dead_code)]
    use_deferred: bool,
    /// Running total of key and value bytes accepted by writes.
    size_bytes: AtomicU64,
    /// Keys written since the last epoch advance, consumed by `gc`.
    dirty_list: EpochDirtyList,
}
1361
1362impl Default for MvccMemTable {
1363 fn default() -> Self {
1364 Self::new()
1365 }
1366}
1367
1368impl MvccMemTable {
1369 pub fn new() -> Self {
1370 Self::with_ordered_index(true)
1371 }
1372
1373 pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1382 Self::with_index_mode(enable_ordered_index, true)
1383 }
1384
1385 pub fn with_index_mode(enable_ordered_index: bool, use_deferred: bool) -> Self {
1392 Self {
1393 data: DashMap::new(),
1394 deferred_index: if enable_ordered_index && use_deferred {
1395 Some(DeferredSortedIndex::with_config(DeferredIndexConfig {
1396 max_unsorted_entries: 10_000, enabled: true,
1398 }))
1399 } else {
1400 None
1401 },
1402 ordered_index: if enable_ordered_index && !use_deferred {
1403 Some(SkipMap::new())
1404 } else {
1405 None
1406 },
1407 use_deferred,
1408 size_bytes: AtomicU64::new(0),
1409 dirty_list: EpochDirtyList::new(),
1410 }
1411 }
1412
1413 pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1415 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1416 let key_len = key.len();
1417
1418 self.dirty_list.record_version(key.clone());
1420
1421 if let Some(ref idx) = self.deferred_index {
1425 idx.insert(key.clone());
1426 } else if let Some(ref idx) = self.ordered_index {
1427 idx.insert(key.clone(), ());
1428 }
1429
1430 let mut entry = self.data.entry(key).or_default();
1432
1433 if entry.has_write_conflict(txn_id) {
1435 return Err(SochDBError::Internal(
1436 "Write-write conflict detected".into(),
1437 ));
1438 }
1439 entry.add_uncommitted(value, txn_id);
1440 self.size_bytes
1441 .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
1442
1443 Ok(())
1444 }
1445
1446 pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
1452 let mut total_size = 0u64;
1453
1454 self.dirty_list.record_versions_batch(writes.iter().map(|(k, _)| k.clone()));
1456
1457 for (key, value) in writes {
1458 if let Some(ref idx) = self.deferred_index {
1461 idx.insert(key.clone());
1462 } else if let Some(ref idx) = self.ordered_index {
1463 idx.insert(key.clone(), ());
1464 }
1465
1466 let mut entry = self.data.entry(key.clone()).or_default();
1467
1468 if entry.has_write_conflict(txn_id) {
1469 return Err(SochDBError::Internal(
1470 "Write-write conflict detected".into(),
1471 ));
1472 }
1473
1474 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1475 entry.add_uncommitted(value.clone(), txn_id);
1476 total_size += (key.len() + value_size) as u64;
1477 }
1478
1479 self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
1480 Ok(())
1481 }
1482
1483 pub fn read(
1485 &self,
1486 key: &[u8],
1487 snapshot_ts: u64,
1488 current_txn_id: Option<u64>,
1489 ) -> Option<Vec<u8>> {
1490 self.data.get(key).and_then(|chain| {
1491 chain
1492 .read_at(snapshot_ts, current_txn_id)
1493 .and_then(|v| v.value.clone())
1494 })
1495 }
1496
1497 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
1502 for key in write_set {
1504 if let Some(mut chain) = self.data.get_mut(key.as_slice()) {
1505 chain.commit(txn_id, commit_ts);
1506 }
1507 }
1508 }
1509
1510 #[allow(dead_code)]
1512 pub fn commit_all(&self, txn_id: u64, commit_ts: u64) {
1513 for mut entry in self.data.iter_mut() {
1514 entry.value_mut().commit(txn_id, commit_ts);
1515 }
1516 }
1517
1518 pub fn abort(&self, txn_id: u64) {
1520 for mut entry in self.data.iter_mut() {
1521 entry.value_mut().abort(txn_id);
1522 }
1523 }
1524
1525 pub fn scan_prefix(
1542 &self,
1543 prefix: &[u8],
1544 snapshot_ts: u64,
1545 current_txn_id: Option<u64>,
1546 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1547 let estimated_size = (self.data.len() / 10).max(64);
1549 let mut results = Vec::with_capacity(estimated_size);
1550
1551 if let Some(ref idx) = self.deferred_index {
1552 for key in idx.range_from(prefix) {
1554 if !key.starts_with(prefix) {
1556 break;
1557 }
1558
1559 if let Some(chain) = self.data.get(&key)
1561 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1562 && let Some(value) = &v.value
1563 {
1564 results.push((key, value.clone()));
1565 }
1566 }
1567 } else if let Some(ref idx) = self.ordered_index {
1568 for entry in idx.range(prefix.to_vec()..) {
1570 let key = entry.key();
1571
1572 if !key.starts_with(prefix) {
1574 break;
1575 }
1576
1577 if let Some(chain) = self.data.get(key)
1579 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1580 && let Some(value) = &v.value
1581 {
1582 results.push((key.clone(), value.clone()));
1583 }
1584 }
1585 } else {
1586 for entry in self.data.iter() {
1589 let key = entry.key();
1590 if !key.starts_with(prefix) {
1591 continue;
1592 }
1593 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1594 && let Some(value) = &v.value
1595 {
1596 results.push((key.clone(), value.clone()));
1597 }
1598 }
1599 }
1600
1601 results
1602 }
1603
1604 pub fn scan_all(
1609 &self,
1610 snapshot_ts: u64,
1611 current_txn_id: Option<u64>,
1612 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1613 let mut results = Vec::with_capacity(self.data.len());
1614
1615 for entry in self.data.iter() {
1616 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1617 && let Some(value) = &v.value
1618 {
1619 results.push((entry.key().clone(), value.clone()));
1620 }
1621 }
1622
1623 results
1624 }
1625
1626 pub fn scan_prefix_iter<'a>(
1631 &'a self,
1632 prefix: &'a [u8],
1633 snapshot_ts: u64,
1634 current_txn_id: Option<u64>,
1635 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1636 self.data.iter().filter_map(move |entry| {
1637 let key = entry.key();
1638 if !key.starts_with(prefix) {
1639 return None;
1640 }
1641 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1642 && let Some(value) = &v.value
1643 {
1644 Some((key.clone(), value.clone()))
1645 } else {
1646 None
1647 }
1648 })
1649 }
1650
1651 pub fn scan_range(
1653 &self,
1654 start: &[u8],
1655 end: &[u8],
1656 snapshot_ts: u64,
1657 current_txn_id: Option<u64>,
1658 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1659 let mut results = Vec::new();
1660
1661 if let Some(ref idx) = self.deferred_index {
1662 if end.is_empty() {
1664 for key in idx.range_from(start) {
1665 if let Some(chain) = self.data.get(&key)
1666 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1667 && let Some(value) = &v.value
1668 {
1669 results.push((key, value.clone()));
1670 }
1671 }
1672 } else {
1673 for key in idx.range(start, end) {
1674 if let Some(chain) = self.data.get(&key)
1675 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1676 && let Some(value) = &v.value
1677 {
1678 results.push((key, value.clone()));
1679 }
1680 }
1681 }
1682 } else if let Some(ref idx) = self.ordered_index {
1683 if end.is_empty() {
1685 for entry in idx.range(start.to_vec()..) {
1687 let key = entry.key();
1688 if let Some(chain) = self.data.get(key)
1689 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1690 && let Some(value) = &v.value
1691 {
1692 results.push((key.clone(), value.clone()));
1693 }
1694 }
1695 } else {
1696 for entry in idx.range(start.to_vec()..end.to_vec()) {
1697 let key = entry.key();
1698 if let Some(chain) = self.data.get(key)
1699 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1700 && let Some(value) = &v.value
1701 {
1702 results.push((key.clone(), value.clone()));
1703 }
1704 }
1705 }
1706 } else {
1707 for entry in self.data.iter() {
1709 let key = entry.key();
1710
1711 if key.as_slice() < start {
1712 continue;
1713 }
1714 if !end.is_empty() && key.as_slice() >= end {
1715 continue;
1716 }
1717
1718 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1719 && let Some(value) = &v.value
1720 {
1721 results.push((key.clone(), value.clone()));
1722 }
1723 }
1724 }
1725
1726 results
1727 }
1728
1729 pub fn scan_range_iter<'a>(
1752 &'a self,
1753 start: &'a [u8],
1754 end: &'a [u8],
1755 snapshot_ts: u64,
1756 current_txn_id: Option<u64>,
1757 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1758 if let Some(ref idx) = self.deferred_index {
1760 idx.compact();
1761 }
1762
1763 let use_ordered = self.ordered_index.is_some() || self.deferred_index.is_some();
1765
1766 ScanRangeIterator {
1768 memtable: self,
1769 start: start.to_vec(),
1770 end: end.to_vec(),
1771 snapshot_ts,
1772 current_txn_id,
1773 use_ordered,
1774 ordered_iter: None,
1775 unordered_iter: None,
1776 initialized: false,
1777 }
1778 }
1779
1780 pub fn size(&self) -> u64 {
1782 self.size_bytes.load(Ordering::Relaxed)
1783 }
1784
1785 pub fn gc(&self, min_active_ts: u64) -> usize {
1790 let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
1792
1793 if dirty_keys.is_empty() {
1794 return 0;
1795 }
1796
1797 let mut gc_count = 0;
1798
1799 let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
1802
1803 for key in unique_keys {
1804 if let Some(mut entry) = self.data.get_mut(&key) {
1805 let before = entry.value().version_count();
1806 entry.value_mut().gc(min_active_ts);
1807 gc_count += before.saturating_sub(entry.value().version_count());
1808 }
1809 }
1810
1811 gc_count
1812 }
1813
1814 #[allow(dead_code)]
1816 pub fn gc_full_scan(&self, min_active_ts: u64) -> usize {
1817 let mut gc_count = 0;
1818
1819 for mut entry in self.data.iter_mut() {
1820 let before = entry.value().version_count();
1821 entry.value_mut().gc(min_active_ts);
1822 gc_count += before.saturating_sub(entry.value().version_count());
1823 }
1824
1825 gc_count
1826 }
1827}
1828
1829impl sochdb_core::version_chain::MvccStore for MvccMemTable {
1834 fn mvcc_get(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>> {
1835 self.read(key, snapshot_ts, txn_id)
1836 }
1837
1838 fn mvcc_put(
1839 &self,
1840 key: &[u8],
1841 value: Option<Vec<u8>>,
1842 txn_id: u64,
1843 ) -> std::result::Result<(), sochdb_core::version_chain::MvccStoreError> {
1844 let mut entry = self.data.entry(key.to_vec()).or_default();
1845 if entry.has_write_conflict(txn_id) {
1846 return Err(sochdb_core::version_chain::MvccStoreError::WriteConflict);
1847 }
1848 entry.add_uncommitted(value, txn_id);
1849 Ok(())
1850 }
1851
1852 fn mvcc_commit_key(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
1853 if let Some(mut chain) = self.data.get_mut(key) {
1854 return chain.commit(txn_id, commit_ts);
1855 }
1856 false
1857 }
1858
1859 fn mvcc_abort_key(&self, key: &[u8], txn_id: u64) {
1860 if let Some(mut chain) = self.data.get_mut(key) {
1861 chain.abort(txn_id);
1862 }
1863 }
1864
1865 fn mvcc_has_conflict(&self, key: &[u8], txn_id: u64) -> bool {
1866 self.data
1867 .get(key)
1868 .map(|chain| chain.has_write_conflict(txn_id))
1869 .unwrap_or(false)
1870 }
1871
1872 fn mvcc_gc(&self, min_ts: u64) -> sochdb_core::version_chain::MvccGcStats {
1873 let mut stats = sochdb_core::version_chain::MvccGcStats::default();
1874 for mut entry in self.data.iter_mut() {
1875 stats.keys_scanned += 1;
1876 let before = entry.value().version_count();
1877 entry.value_mut().gc(min_ts);
1878 stats.versions_removed += before.saturating_sub(entry.value().version_count());
1879 }
1880 stats
1881 }
1882
1883 fn mvcc_key_count(&self) -> usize {
1884 self.data.len()
1885 }
1886}
1887
1888use crate::key_buffer::ArenaKeyHandle;
1893
/// Ring of per-epoch lists of keys touched by writes, used to limit GC to
/// recently modified keys.
struct ArenaEpochDirtyList {
    /// One mutex-protected key list per ring slot; a slot is selected by
    /// `epoch % EPOCH_RING_SIZE`.
    /// NOTE(review): the array length is hard-coded to 4 while indexing uses
    /// `EPOCH_RING_SIZE` — confirm the two always agree.
    epochs: [parking_lot::Mutex<Vec<ArenaKeyHandle>>; 4],
    /// Monotonically increasing epoch counter; starts at 0.
    current_epoch: AtomicU64,
}
1899
1900impl ArenaEpochDirtyList {
1901 fn new() -> Self {
1902 Self {
1903 epochs: [
1904 parking_lot::Mutex::new(Vec::new()),
1905 parking_lot::Mutex::new(Vec::new()),
1906 parking_lot::Mutex::new(Vec::new()),
1907 parking_lot::Mutex::new(Vec::new()),
1908 ],
1909 current_epoch: AtomicU64::new(0),
1910 }
1911 }
1912
1913 #[inline]
1914 fn record_version(&self, key: ArenaKeyHandle) {
1915 let epoch = self.current_epoch.load(Ordering::Relaxed);
1916 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1917 self.epochs[idx].lock().push(key);
1918 }
1919
1920 fn advance_epoch(&self) -> (u64, Vec<ArenaKeyHandle>) {
1921 let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1922 let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1923 let mut guard = self.epochs[old_idx].lock();
1924 let keys = std::mem::take(&mut *guard);
1925 (old_epoch, keys)
1926 }
1927}
1928
/// MVCC memtable keyed by [`ArenaKeyHandle`] instead of owned byte vectors.
pub struct ArenaMvccMemTable {
    /// Version chain per key.
    data: DashMap<ArenaKeyHandle, VersionChain>,
    /// Optional sorted set of keys used by prefix/range scans; `None` means
    /// scans fall back to iterating `data`.
    ordered_index: Option<SkipMap<ArenaKeyHandle, ()>>,
    /// Running total of key + value bytes added by writes (never decremented
    /// in the code visible here).
    size_bytes: AtomicU64,
    /// Keys touched since the last GC epoch advance.
    dirty_list: ArenaEpochDirtyList,
}
1953
1954impl ArenaMvccMemTable {
1955 pub fn new() -> Self {
1956 Self::with_ordered_index(true)
1957 }
1958
1959 pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1960 Self {
1961 data: DashMap::new(),
1962 ordered_index: if enable_ordered_index {
1963 Some(SkipMap::new())
1964 } else {
1965 None
1966 },
1967 size_bytes: AtomicU64::new(0),
1968 dirty_list: ArenaEpochDirtyList::new(),
1969 }
1970 }
1971
1972 pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1977 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1978 let key_len = key.len();
1979
1980 let key_handle = ArenaKeyHandle::new(key);
1982
1983 self.dirty_list.record_version(key_handle.clone());
1985
1986 if let Some(ref idx) = self.ordered_index {
1988 idx.insert(key_handle.clone(), ());
1989 }
1990
1991 let mut entry = self.data.entry(key_handle).or_default();
1993
1994 if entry.has_write_conflict(txn_id) {
1995 return Err(SochDBError::Internal(
1996 "Write-write conflict detected".into(),
1997 ));
1998 }
1999 entry.add_uncommitted(value, txn_id);
2000 self.size_bytes
2001 .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
2002
2003 Ok(())
2004 }
2005
2006 pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2008 let mut total_size = 0u64;
2009
2010 for (key, value) in writes {
2011 let key_handle = ArenaKeyHandle::new(key);
2012
2013 self.dirty_list.record_version(key_handle.clone());
2014
2015 if let Some(ref idx) = self.ordered_index {
2016 idx.insert(key_handle.clone(), ());
2017 }
2018
2019 let mut entry = self.data.entry(key_handle).or_default();
2020
2021 if entry.has_write_conflict(txn_id) {
2022 return Err(SochDBError::Internal(
2023 "Write-write conflict detected".into(),
2024 ));
2025 }
2026
2027 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
2028 entry.add_uncommitted(value.clone(), txn_id);
2029 total_size += (key.len() + value_size) as u64;
2030 }
2031
2032 self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
2033 Ok(())
2034 }
2035
2036 pub fn read(
2038 &self,
2039 key: &[u8],
2040 snapshot_ts: u64,
2041 current_txn_id: Option<u64>,
2042 ) -> Option<Vec<u8>> {
2043 let key_handle = ArenaKeyHandle::new(key);
2045 self.data.get(&key_handle).and_then(|chain| {
2046 chain
2047 .read_at(snapshot_ts, current_txn_id)
2048 .and_then(|v| v.value.clone())
2049 })
2050 }
2051
2052 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2054 for key in write_set {
2055 let key_handle = ArenaKeyHandle::new(key.as_slice());
2056 if let Some(mut chain) = self.data.get_mut(&key_handle) {
2057 chain.commit(txn_id, commit_ts);
2058 }
2059 }
2060 }
2061
2062 pub fn abort(&self, txn_id: u64) {
2064 for mut entry in self.data.iter_mut() {
2065 entry.value_mut().abort(txn_id);
2066 }
2067 }
2068
2069 pub fn scan_prefix(
2071 &self,
2072 prefix: &[u8],
2073 snapshot_ts: u64,
2074 current_txn_id: Option<u64>,
2075 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2076 let mut results = Vec::new();
2077 let prefix_handle = ArenaKeyHandle::new(prefix);
2078
2079 if let Some(ref idx) = self.ordered_index {
2080 for entry in idx.range(prefix_handle..) {
2081 let key = entry.key();
2082
2083 if !key.as_bytes().starts_with(prefix) {
2084 break;
2085 }
2086
2087 if let Some(chain) = self.data.get(key)
2088 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2089 && let Some(value) = &v.value
2090 {
2091 results.push((key.as_bytes().to_vec(), value.clone()));
2092 }
2093 }
2094 } else {
2095 for entry in self.data.iter() {
2096 let key = entry.key();
2097 if !key.as_bytes().starts_with(prefix) {
2098 continue;
2099 }
2100 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2101 && let Some(value) = &v.value
2102 {
2103 results.push((key.as_bytes().to_vec(), value.clone()));
2104 }
2105 }
2106 }
2107
2108 results
2109 }
2110
2111 pub fn size(&self) -> u64 {
2113 self.size_bytes.load(Ordering::Relaxed)
2114 }
2115
2116 pub fn gc(&self, min_active_ts: u64) -> usize {
2118 let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
2119
2120 if dirty_keys.is_empty() {
2121 return 0;
2122 }
2123
2124 let mut gc_count = 0;
2125 let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
2126
2127 for key in unique_keys {
2128 if let Some(mut entry) = self.data.get_mut(&key) {
2129 let before = entry.value().version_count();
2130 entry.value_mut().gc(min_active_ts);
2131 gc_count += before.saturating_sub(entry.value().version_count());
2132 }
2133 }
2134
2135 gc_count
2136 }
2137}
2138
2139impl Default for ArenaMvccMemTable {
2140 fn default() -> Self {
2141 Self::new()
2142 }
2143}
2144
/// Selects which memtable implementation a storage instance uses.
///
/// `Standard` is the default variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MemTableType {
    /// Backed by `MvccMemTable`.
    #[default]
    Standard,
    /// Backed by `ArenaMvccMemTable`.
    Arena,
}
2166
/// Enum wrapper over the two memtable implementations; its methods dispatch
/// to whichever variant is held.
pub enum MemTableKind {
    /// Owned-key memtable.
    Standard(MvccMemTable),
    /// `ArenaKeyHandle`-keyed memtable.
    Arena(ArenaMvccMemTable),
}
2183
2184impl MemTableKind {
2185 pub fn new(kind: MemTableType, enable_ordered_index: bool) -> Self {
2187 match kind {
2188 MemTableType::Standard => {
2189 MemTableKind::Standard(MvccMemTable::with_ordered_index(enable_ordered_index))
2190 }
2191 MemTableType::Arena => {
2192 MemTableKind::Arena(ArenaMvccMemTable::with_ordered_index(enable_ordered_index))
2193 }
2194 }
2195 }
2196
2197 #[inline]
2199 pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2200 match self {
2201 MemTableKind::Standard(m) => m.write(key, value, txn_id),
2202 MemTableKind::Arena(m) => m.write(&key, value, txn_id),
2203 }
2204 }
2205
2206 #[inline]
2208 pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2209 match self {
2210 MemTableKind::Standard(m) => m.write_batch(writes, txn_id),
2211 MemTableKind::Arena(m) => {
2212 let arena_writes: Vec<(&[u8], Option<Vec<u8>>)> = writes
2214 .iter()
2215 .map(|(k, v)| (k.as_slice(), v.clone()))
2216 .collect();
2217 m.write_batch(&arena_writes, txn_id)
2218 }
2219 }
2220 }
2221
2222 #[inline]
2224 pub fn read(
2225 &self,
2226 key: &[u8],
2227 snapshot_ts: u64,
2228 current_txn_id: Option<u64>,
2229 ) -> Option<Vec<u8>> {
2230 match self {
2231 MemTableKind::Standard(m) => m.read(key, snapshot_ts, current_txn_id),
2232 MemTableKind::Arena(m) => m.read(key, snapshot_ts, current_txn_id),
2233 }
2234 }
2235
2236 #[inline]
2238 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2239 match self {
2240 MemTableKind::Standard(m) => m.commit(txn_id, commit_ts, write_set),
2241 MemTableKind::Arena(m) => m.commit(txn_id, commit_ts, write_set),
2242 }
2243 }
2244
2245 #[inline]
2247 pub fn abort(&self, txn_id: u64) {
2248 match self {
2249 MemTableKind::Standard(m) => m.abort(txn_id),
2250 MemTableKind::Arena(m) => m.abort(txn_id),
2251 }
2252 }
2253
2254 #[inline]
2256 pub fn scan_prefix(
2257 &self,
2258 prefix: &[u8],
2259 snapshot_ts: u64,
2260 current_txn_id: Option<u64>,
2261 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2262 match self {
2263 MemTableKind::Standard(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2264 MemTableKind::Arena(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2265 }
2266 }
2267
2268 #[inline]
2270 pub fn scan_range(
2271 &self,
2272 start: &[u8],
2273 end: &[u8],
2274 snapshot_ts: u64,
2275 current_txn_id: Option<u64>,
2276 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2277 match self {
2278 MemTableKind::Standard(m) => m.scan_range(start, end, snapshot_ts, current_txn_id),
2279 MemTableKind::Arena(m) => {
2280 let mut results = Vec::new();
2282 if let Some(ref idx) = m.ordered_index {
2283 let start_handle = ArenaKeyHandle::new(start);
2284 let end_handle = ArenaKeyHandle::new(end);
2285
2286 if end.is_empty() {
2287 for entry in idx.range(start_handle..) {
2288 let key = entry.key();
2289 if let Some(chain) = m.data.get(key)
2290 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2291 && let Some(value) = &v.value
2292 {
2293 results.push((key.as_bytes().to_vec(), value.clone()));
2294 }
2295 }
2296 } else {
2297 for entry in idx.range(start_handle..end_handle) {
2298 let key = entry.key();
2299 if let Some(chain) = m.data.get(key)
2300 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2301 && let Some(value) = &v.value
2302 {
2303 results.push((key.as_bytes().to_vec(), value.clone()));
2304 }
2305 }
2306 }
2307 } else {
2308 for entry in m.data.iter() {
2309 let key = entry.key();
2310 let key_bytes = key.as_bytes();
2311 if key_bytes < start {
2312 continue;
2313 }
2314 if !end.is_empty() && key_bytes >= end {
2315 continue;
2316 }
2317 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2318 && let Some(value) = &v.value
2319 {
2320 results.push((key_bytes.to_vec(), value.clone()));
2321 }
2322 }
2323 }
2324 results
2325 }
2326 }
2327 }
2328
2329 #[inline]
2331 pub fn scan_range_iter<'a>(
2332 &'a self,
2333 start: &'a [u8],
2334 end: &'a [u8],
2335 snapshot_ts: u64,
2336 current_txn_id: Option<u64>,
2337 ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
2338 match self {
2339 MemTableKind::Standard(m) => {
2340 Box::new(m.scan_range_iter(start, end, snapshot_ts, current_txn_id))
2341 }
2342 MemTableKind::Arena(_) => {
2343 let results = self.scan_range(start, end, snapshot_ts, current_txn_id);
2345 Box::new(results.into_iter())
2346 }
2347 }
2348 }
2349
2350 #[inline]
2352 pub fn size(&self) -> u64 {
2353 match self {
2354 MemTableKind::Standard(m) => m.size(),
2355 MemTableKind::Arena(m) => m.size(),
2356 }
2357 }
2358
2359 #[inline]
2361 pub fn gc(&self, min_active_ts: u64) -> usize {
2362 match self {
2363 MemTableKind::Standard(m) => m.gc(min_active_ts),
2364 MemTableKind::Arena(m) => m.gc(min_active_ts),
2365 }
2366 }
2367
2368 pub fn kind(&self) -> MemTableType {
2370 match self {
2371 MemTableKind::Standard(_) => MemTableType::Standard,
2372 MemTableKind::Arena(_) => MemTableType::Arena,
2373 }
2374 }
2375}
2376
/// Storage engine combining a transaction WAL, an MVCC manager and an
/// in-memory memtable.
pub struct DurableStorage {
    /// Database directory; holds `wal.log` and the `.clean_shutdown` marker.
    path: PathBuf,
    /// Write-ahead log shared with the group-commit callback.
    wal: Arc<TxnWal>,
    /// Transaction bookkeeping (snapshots, read/write sets, commit timestamps).
    mvcc: Arc<MvccManager>,
    /// In-memory versioned data.
    memtable: Arc<MemTableKind>,
    /// Per-transaction WAL buffers, flushed to the WAL on commit and dropped
    /// on abort.
    txn_write_buffers: DashMap<u64, TxnWalBuffer>,
    /// Present only when opened with group commit enabled.
    group_commit: Option<Arc<EventDrivenGroupCommit>>,
    /// Non-zero while `recover()` still has work to do; reset to 0 by `recover()`.
    needs_recovery: AtomicU64,
    /// LSN returned by the most recent `checkpoint()`; 0 initially.
    last_checkpoint_lsn: AtomicU64,
    /// Commit sync policy: 0 = never sync on commit, 1 = sync once the
    /// adaptive batch threshold is reached, any other value = sync on every
    /// commit. Starts at 1.
    sync_mode: AtomicU64,
    /// Commits counted since the last sync in the non-group-commit path.
    commits_since_sync: AtomicU64,
    /// Exponential moving average of the commit arrival rate, stored as
    /// commits per second multiplied by 1000.
    arrival_rate_ema: AtomicU64,
    /// Wall-clock time of the previous commit in microseconds since the Unix
    /// epoch; 0 before the first commit.
    last_commit_us: AtomicU64,
    /// Exponential moving average of flush + fsync latency in microseconds.
    fsync_latency_us: AtomicU64,
    /// Database lock acquired at open time, if requested. Never read;
    /// presumably held only so it is released on drop — confirm against
    /// `crate::lock::DatabaseLock`.
    #[allow(dead_code)]
    db_lock: Option<crate::lock::DatabaseLock>,
}
2413
2414impl DurableStorage {
2415 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
2417 Self::open_with_config(path, true)
2418 }
2419
2420 pub fn open_with_config<P: AsRef<Path>>(path: P, enable_ordered_index: bool) -> Result<Self> {
2425 Self::open_with_full_config(path, enable_ordered_index, MemTableType::Standard)
2426 }
2427
2428 pub fn open_with_arena<P: AsRef<Path>>(path: P) -> Result<Self> {
2436 Self::open_with_full_config(path, true, MemTableType::Arena)
2437 }
2438
2439 pub fn open_with_full_config<P: AsRef<Path>>(
2452 path: P,
2453 enable_ordered_index: bool,
2454 memtable_type: MemTableType,
2455 ) -> Result<Self> {
2456 Self::open_with_full_config_internal(path, enable_ordered_index, memtable_type, true)
2457 }
2458
2459 #[cfg(test)]
2465 pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Self> {
2466 Self::open_with_full_config_internal(path, true, MemTableType::Standard, false)
2467 }
2468
2469 fn open_with_full_config_internal<P: AsRef<Path>>(
2470 path: P,
2471 enable_ordered_index: bool,
2472 memtable_type: MemTableType,
2473 acquire_lock: bool,
2474 ) -> Result<Self> {
2475 let path = path.as_ref().to_path_buf();
2476 std::fs::create_dir_all(&path)?;
2477
2478 let db_lock = if acquire_lock {
2480 Some(crate::lock::DatabaseLock::acquire(&path)
2481 .map_err(|e| SochDBError::LockError(e.to_string()))?)
2482 } else {
2483 None
2484 };
2485
2486 let wal_path = path.join("wal.log");
2487 let wal = Arc::new(TxnWal::new(&wal_path)?);
2488
2489 let storage = Self {
2490 path,
2491 wal: wal.clone(),
2492 mvcc: Arc::new(MvccManager::new()),
2493 memtable: Arc::new(MemTableKind::new(memtable_type, enable_ordered_index)),
2494 txn_write_buffers: DashMap::new(),
2495 group_commit: None,
2496 needs_recovery: AtomicU64::new(0),
2497 last_checkpoint_lsn: AtomicU64::new(0),
2498 sync_mode: AtomicU64::new(1), commits_since_sync: AtomicU64::new(0),
2500 arrival_rate_ema: AtomicU64::new(1_000_000), last_commit_us: AtomicU64::new(0),
2503 fsync_latency_us: AtomicU64::new(5000), db_lock,
2505 };
2506
2507 if storage.check_recovery_needed()? {
2509 storage.needs_recovery.store(1, Ordering::SeqCst);
2510 }
2511
2512 Ok(storage)
2513 }
2514
2515 pub fn open_with_group_commit<P: AsRef<Path>>(path: P) -> Result<Self> {
2517 Self::open_with_group_commit_and_config(path, true)
2518 }
2519
2520 pub fn open_with_group_commit_and_config<P: AsRef<Path>>(
2522 path: P,
2523 enable_ordered_index: bool,
2524 ) -> Result<Self> {
2525 let mut storage = Self::open_with_config(path, enable_ordered_index)?;
2526
2527 let wal = storage.wal.clone();
2528 let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2529 for &txn_id in txn_ids {
2531 let entry = TxnWalEntry::txn_commit(txn_id);
2532 wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2533 }
2534
2535 wal.flush().map_err(|e| e.to_string())?;
2537 wal.sync().map_err(|e| e.to_string())?;
2538
2539 Ok(std::time::SystemTime::now()
2541 .duration_since(std::time::UNIX_EPOCH)
2542 .unwrap()
2543 .as_micros() as u64)
2544 });
2545
2546 storage.group_commit = Some(Arc::new(gc));
2547 Ok(storage)
2548 }
2549
2550 pub fn open_with_policy<P: AsRef<Path>>(
2562 path: P,
2563 policy: crate::index_policy::IndexPolicy,
2564 group_commit: bool,
2565 ) -> Result<Self> {
2566 use crate::index_policy::IndexPolicy;
2567
2568 let (enable_ordered_index, memtable_type) = match policy {
2570 IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => {
2571 (false, MemTableType::Arena)
2573 }
2574 IndexPolicy::Balanced => {
2575 (true, MemTableType::Standard)
2577 }
2578 IndexPolicy::ScanOptimized => {
2579 (true, MemTableType::Standard)
2581 }
2582 };
2583
2584 if group_commit {
2585 let mut storage = Self::open_with_full_config(path, enable_ordered_index, memtable_type)?;
2586
2587 let wal = storage.wal.clone();
2588 let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2589 for &txn_id in txn_ids {
2590 let entry = TxnWalEntry::txn_commit(txn_id);
2591 wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2592 }
2593 wal.flush().map_err(|e| e.to_string())?;
2594 wal.sync().map_err(|e| e.to_string())?;
2595 Ok(std::time::SystemTime::now()
2596 .duration_since(std::time::UNIX_EPOCH)
2597 .unwrap()
2598 .as_micros() as u64)
2599 });
2600 storage.group_commit = Some(Arc::new(gc));
2601 Ok(storage)
2602 } else {
2603 Self::open_with_full_config(path, enable_ordered_index, memtable_type)
2604 }
2605 }
2606
2607 pub fn open_for_concurrent<P: AsRef<Path>>(
2618 path: P,
2619 policy: crate::index_policy::IndexPolicy,
2620 ) -> Result<Self> {
2621 use crate::index_policy::IndexPolicy;
2622
2623 let (enable_ordered_index, memtable_type) = match policy {
2624 IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => {
2625 (false, MemTableType::Arena)
2626 }
2627 IndexPolicy::Balanced => {
2628 (true, MemTableType::Standard)
2629 }
2630 IndexPolicy::ScanOptimized => {
2631 (true, MemTableType::Standard)
2632 }
2633 };
2634
2635 Self::open_with_full_config_internal(path, enable_ordered_index, memtable_type, false)
2637 }
2638
2639 pub fn memtable_type(&self) -> MemTableType {
2641 self.memtable.kind()
2642 }
2643
2644 fn check_recovery_needed(&self) -> Result<bool> {
2650 let marker_path = self.path.join(".clean_shutdown");
2651 if marker_path.exists() {
2652 std::fs::remove_file(&marker_path)?;
2654 }
2655 Ok(true)
2658 }
2659
2660 pub fn recover(&self) -> Result<RecoveryStats> {
2662 if self.needs_recovery.load(Ordering::SeqCst) == 0 {
2663 return Ok(RecoveryStats::default());
2664 }
2665
2666 let (writes, txn_count) = self.wal.replay_for_recovery()?;
2667
2668 let recovery_txn_id = self.wal.alloc_txn_id();
2670 let commit_ts = self.mvcc.alloc_commit_ts();
2671
2672 let mut write_set: HashSet<InlineKey> = HashSet::new();
2674 for (key, value) in &writes {
2675 write_set.insert(SmallVec::from_slice(key));
2676 self.memtable
2677 .write(key.clone(), Some(value.clone()), recovery_txn_id)?;
2678 }
2679 self.memtable.commit(recovery_txn_id, commit_ts, &write_set);
2680
2681 self.needs_recovery.store(0, Ordering::SeqCst);
2682
2683 Ok(RecoveryStats {
2684 transactions_recovered: txn_count,
2685 writes_recovered: writes.len(),
2686 commit_ts,
2687 })
2688 }
2689
2690 pub fn begin_transaction(&self) -> Result<u64> {
2692 let txn_id = self.wal.begin_transaction()?;
2693 self.mvcc.begin(txn_id);
2694 Ok(txn_id)
2695 }
2696
2697 pub fn begin_with_mode(&self, mode: TransactionMode) -> Result<u64> {
2704 let txn_id = self.wal.begin_transaction()?;
2705 self.mvcc.begin_with_mode(txn_id, mode);
2706 Ok(txn_id)
2707 }
2708
2709 #[inline]
2717 pub fn begin_read_only_fast(&self) -> u64 {
2718 let txn_id = self.wal.alloc_txn_id();
2719 self.mvcc.begin_read_only(txn_id);
2720 txn_id
2721 }
2722
2723 #[inline]
2727 pub fn abort_read_only_fast(&self, txn_id: u64) {
2728 self.mvcc.abort(txn_id);
2729 }
2730
2731 #[inline]
2737 pub fn read_latest(&self, key: &[u8]) -> Option<Vec<u8>> {
2738 let snapshot_ts = self.mvcc.ts_counter.load(std::sync::atomic::Ordering::Relaxed);
2739 self.memtable.read(key, snapshot_ts, None)
2740 }
2741
2742 #[inline]
2746 pub fn scan_latest(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
2747 let snapshot_ts = self.mvcc.ts_counter.load(std::sync::atomic::Ordering::Relaxed);
2748 self.memtable.scan_prefix(prefix, snapshot_ts, None)
2749 }
2750
2751 #[inline]
2753 pub fn read(&self, txn_id: u64, key: &[u8]) -> Result<Option<Vec<u8>>> {
2754 let snapshot_ts = self
2756 .mvcc
2757 .get_snapshot_ts(txn_id)
2758 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2759
2760 self.mvcc.record_read(txn_id, key);
2762
2763 Ok(self.memtable.read(key, snapshot_ts, Some(txn_id)))
2765 }
2766
2767 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
2772 self.write_refs(txn_id, &key, &value)?;
2774
2775 Ok(())
2776 }
2777
2778 #[inline]
2783 pub fn write_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<()> {
2784 self.mvcc.record_write(txn_id, key);
2786
2787 self.txn_write_buffers
2790 .entry(txn_id)
2791 .or_insert_with(|| TxnWalBuffer::new(txn_id))
2792 .append(key, value);
2793
2794 self.memtable
2796 .write(key.to_vec(), Some(value.to_vec()), txn_id)?;
2797
2798 Ok(())
2799 }
2800
2801 pub fn delete(&self, txn_id: u64, key: Vec<u8>) -> Result<()> {
2803 self.mvcc.record_write(txn_id, &key);
2805
2806 self.txn_write_buffers
2808 .entry(txn_id)
2809 .or_insert_with(|| TxnWalBuffer::new(txn_id))
2810 .append(&key, &[]); self.memtable.write(key, None, txn_id)?;
2814
2815 Ok(())
2816 }
2817
2818 #[inline]
2832 pub fn write_batch_refs(&self, txn_id: u64, writes: &[(&[u8], &[u8])]) -> Result<()> {
2833 if writes.is_empty() {
2834 return Ok(());
2835 }
2836
2837 let mut buffer_entry = self
2839 .txn_write_buffers
2840 .entry(txn_id)
2841 .or_insert_with(|| TxnWalBuffer::new(txn_id));
2842
2843 for (key, value) in writes {
2845 self.mvcc.record_write(txn_id, key);
2847
2848 buffer_entry.append(key, value);
2850 }
2851 drop(buffer_entry);
2852
2853 let owned_writes: Vec<(Vec<u8>, Option<Vec<u8>>)> = writes
2855 .iter()
2856 .map(|(k, v)| (k.to_vec(), Some(v.to_vec())))
2857 .collect();
2858 self.memtable.write_batch(&owned_writes, txn_id)?;
2859
2860 Ok(())
2861 }
2862
2863 pub fn commit(&self, txn_id: u64) -> Result<u64> {
2870 if let Some((_, buffer)) = self.txn_write_buffers.remove(&txn_id)
2873 && !buffer.is_empty()
2874 {
2875 self.wal.flush_buffer(&buffer)?;
2877 }
2878
2879 if let Some(gc) = &self.group_commit {
2881 gc.submit_and_wait(txn_id).map_err(SochDBError::Internal)?;
2884
2885 let (commit_ts, write_set) = self
2887 .mvcc
2888 .commit(txn_id)
2889 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2890
2891 self.memtable.commit(txn_id, commit_ts, &write_set);
2893
2894 Ok(commit_ts)
2895 } else {
2896 let sync_mode = self.sync_mode.load(Ordering::Relaxed);
2898 let commits = self.commits_since_sync.fetch_add(1, Ordering::Relaxed);
2899
2900 self.update_arrival_rate();
2902
2903 let entry = TxnWalEntry::txn_commit(txn_id);
2905 self.wal.append_no_flush(&entry)?;
2906
2907 let should_sync = match sync_mode {
2909 0 => false, 1 => commits >= self.adaptive_batch_threshold(), _ => true, };
2913
2914 if should_sync {
2915 let start = std::time::Instant::now();
2917 self.wal.flush()?;
2918 self.wal.sync()?;
2919 let latency_us = start.elapsed().as_micros() as u64;
2920
2921 let old_latency = self.fsync_latency_us.load(Ordering::Relaxed);
2923 let new_latency = (old_latency * 9 + latency_us) / 10;
2924 self.fsync_latency_us.store(new_latency, Ordering::Relaxed);
2925
2926 self.commits_since_sync.store(0, Ordering::Relaxed);
2927 }
2928
2929 let (commit_ts, write_set) = self
2931 .mvcc
2932 .commit(txn_id)
2933 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2934
2935 self.memtable.commit(txn_id, commit_ts, &write_set);
2937
2938 Ok(commit_ts)
2939 }
2940 }
2941
2942 #[inline]
2944 fn update_arrival_rate(&self) {
2945 let now_us = std::time::SystemTime::now()
2946 .duration_since(std::time::UNIX_EPOCH)
2947 .unwrap()
2948 .as_micros() as u64;
2949
2950 let last = self.last_commit_us.swap(now_us, Ordering::Relaxed);
2951
2952 if last > 0 {
2953 let delta_us = now_us.saturating_sub(last);
2954 if delta_us > 0 && delta_us < 10_000_000 {
2955 let instant_rate = 1_000_000_000 / delta_us;
2959
2960 let old_rate = self.arrival_rate_ema.load(Ordering::Relaxed);
2962 let new_rate = (old_rate * 9 + instant_rate) / 10;
2963 self.arrival_rate_ema.store(new_rate, Ordering::Relaxed);
2964 }
2965 }
2966 }
2967
2968 #[inline]
2973 fn adaptive_batch_threshold(&self) -> u64 {
2974 let lambda = self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0; let tau = self.fsync_latency_us.load(Ordering::Relaxed) as f64 / 1_000_000.0; if lambda <= 0.0 || tau <= 0.0 {
2978 return 100; }
2980
2981 let n_opt = (2.0 * tau * lambda).sqrt();
2984
2985 (n_opt as u64).clamp(1, 1000)
2987 }
2988
2989 pub fn set_sync_mode(&self, mode: u64) {
2995 self.sync_mode.store(mode.min(2), Ordering::Relaxed);
2996 }
2997
2998 pub fn flush_group_commit(&self) {
3000 if let Some(gc) = &self.group_commit {
3001 gc.flush_batch();
3002 }
3003 }
3004
3005 pub fn abort(&self, txn_id: u64) -> Result<()> {
3011 let had_writes = self.txn_write_buffers.remove(&txn_id).is_some();
3015
3016 if had_writes {
3017 self.wal.abort_transaction(txn_id)?;
3019 self.memtable.abort(txn_id);
3021 }
3022
3023 self.mvcc.abort(txn_id);
3025
3026 Ok(())
3027 }
3028
3029 #[inline]
3031 pub fn scan(&self, txn_id: u64, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
3032 let snapshot_ts = self
3034 .mvcc
3035 .get_snapshot_ts(txn_id)
3036 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3037
3038 Ok(self.memtable.scan_prefix(prefix, snapshot_ts, Some(txn_id)))
3041 }
3042
3043 #[inline]
3045 pub fn scan_range(
3046 &self,
3047 txn_id: u64,
3048 start: &[u8],
3049 end: &[u8],
3050 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
3051 let snapshot_ts = self
3052 .mvcc
3053 .get_snapshot_ts(txn_id)
3054 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3055
3056 Ok(self
3057 .memtable
3058 .scan_range(start, end, snapshot_ts, Some(txn_id)))
3059 }
3060
3061 #[inline]
3066 pub fn scan_range_iter<'a>(
3067 &'a self,
3068 txn_id: u64,
3069 start: &'a [u8],
3070 end: &'a [u8],
3071 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
3072 let snapshot_ts = self.mvcc.get_snapshot_ts(txn_id).unwrap_or(0);
3073 self.memtable.scan_range_iter(start, end, snapshot_ts, Some(txn_id))
3074 }
3075
3076 pub fn flush_wal(&self) -> Result<()> {
3083 self.wal.flush()
3084 }
3085
3086 pub fn fsync(&self) -> Result<()> {
3088 self.wal.sync()
3089 }
3090
3091 pub fn checkpoint(&self) -> Result<u64> {
3093 let txn_id = 0; let entry = TxnWalEntry::checkpoint(txn_id);
3095 let lsn = self.wal.append_sync(&entry)?;
3096 self.last_checkpoint_lsn.store(lsn, Ordering::SeqCst);
3097 Ok(lsn)
3098 }
3099
3100 pub fn truncate_wal(&self) -> Result<()> {
3110 self.wal.truncate()
3111 }
3112
3113 pub fn stats(&self) -> StorageStats {
3115 let wal_size = self.wal.size_bytes();
3117
3118 let active_txns = self.mvcc.active_transaction_count();
3120
3121 StorageStats {
3122 memtable_size_bytes: self.memtable.size(),
3123 wal_size_bytes: wal_size,
3124 active_transactions: active_txns,
3125 min_active_snapshot: self.mvcc.min_active_snapshot(),
3126 last_checkpoint_lsn: self.last_checkpoint_lsn.load(Ordering::SeqCst),
3127 }
3128 }
3129
3130 pub fn gc(&self) -> usize {
3132 let min_ts = self.mvcc.min_active_snapshot();
3133 self.memtable.gc(min_ts)
3134 }
3135
3136 pub fn shutdown(&self) -> Result<()> {
3138 self.fsync()?;
3140
3141 let marker_path = self.path.join(".clean_shutdown");
3143 std::fs::write(&marker_path, b"clean")?;
3144
3145 Ok(())
3146 }
3147}
3148
3149impl Drop for DurableStorage {
3150 fn drop(&mut self) {
3151 let _ = self.shutdown();
3152 }
3153}
3154
/// Summary of a `DurableStorage::recover` run.
///
/// The default value (all zeros) is what `recover` returns when no recovery
/// was pending.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RecoveryStats {
    /// Transaction count reported by the WAL replay.
    pub transactions_recovered: usize,
    /// Number of writes re-applied to the memtable.
    pub writes_recovered: usize,
    /// Commit timestamp under which all recovered writes were committed.
    pub commit_ts: u64,
}
3162
/// Point-in-time counters returned by `DurableStorage::stats`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct StorageStats {
    /// Bytes tracked by the memtable's size counter.
    pub memtable_size_bytes: u64,
    /// Size reported by the WAL.
    pub wal_size_bytes: u64,
    /// Number of transactions the MVCC manager considers active.
    pub active_transactions: usize,
    /// Minimum active snapshot as reported by the MVCC manager.
    pub min_active_snapshot: u64,
    /// LSN of the most recent checkpoint (0 if none was taken).
    pub last_checkpoint_lsn: u64,
}
3172
3173#[cfg(test)]
3174mod tests {
3175 use super::*;
3176 use tempfile::tempdir;
3177
3178 #[test]
3179 fn test_basic_transaction() {
3180 let dir = tempdir().unwrap();
3181 let storage = DurableStorage::open(dir.path()).unwrap();
3182
3183 let txn_id = storage.begin_transaction().unwrap();
3185
3186 storage
3188 .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
3189 .unwrap();
3190 storage
3191 .write(txn_id, b"key2".to_vec(), b"value2".to_vec())
3192 .unwrap();
3193
3194 let v1 = storage.read(txn_id, b"key1").unwrap();
3196 assert_eq!(v1, Some(b"value1".to_vec()));
3197
3198 let commit_ts = storage.commit(txn_id).unwrap();
3200 assert!(commit_ts > 0);
3201
3202 let txn2 = storage.begin_transaction().unwrap();
3204 let v1 = storage.read(txn2, b"key1").unwrap();
3205 assert_eq!(v1, Some(b"value1".to_vec()));
3206 storage.abort(txn2).unwrap();
3207 }
3208
3209 #[test]
3210 fn test_snapshot_isolation() {
3211 let dir = tempdir().unwrap();
3212 let storage = DurableStorage::open(dir.path()).unwrap();
3213
3214 let t1 = storage.begin_transaction().unwrap();
3216 storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3217 storage.commit(t1).unwrap();
3218
3219 let t2 = storage.begin_transaction().unwrap();
3221
3222 let t3 = storage.begin_transaction().unwrap();
3224 storage.write(t3, b"key".to_vec(), b"v2".to_vec()).unwrap();
3225 storage.commit(t3).unwrap();
3226
3227 let v = storage.read(t2, b"key").unwrap();
3229 assert_eq!(v, Some(b"v1".to_vec()));
3230
3231 let t4 = storage.begin_transaction().unwrap();
3233 let v = storage.read(t4, b"key").unwrap();
3234 assert_eq!(v, Some(b"v2".to_vec()));
3235
3236 storage.abort(t2).unwrap();
3237 storage.abort(t4).unwrap();
3238 }
3239
3240 #[test]
3241 fn test_abort_transaction() {
3242 let dir = tempdir().unwrap();
3243 let storage = DurableStorage::open(dir.path()).unwrap();
3244
3245 let t1 = storage.begin_transaction().unwrap();
3247 storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3248 storage.commit(t1).unwrap();
3249
3250 let t2 = storage.begin_transaction().unwrap();
3252 storage.write(t2, b"key".to_vec(), b"v2".to_vec()).unwrap();
3253 storage.abort(t2).unwrap();
3254
3255 let t3 = storage.begin_transaction().unwrap();
3257 let v = storage.read(t3, b"key").unwrap();
3258 assert_eq!(v, Some(b"v1".to_vec()));
3259 storage.abort(t3).unwrap();
3260 }
3261
#[test]
fn test_crash_recovery() {
    let dir = tempdir().unwrap();

    // Phase 1: write and commit, then "crash".
    let crashed = DurableStorage::open_without_lock(dir.path()).unwrap();
    // NOTE(review): sync mode 2 presumably makes the commit reach disk before
    // returning — confirm against `set_sync_mode`.
    crashed.set_sync_mode(2);
    let writer = crashed.begin_transaction().unwrap();
    crashed
        .write(writer, b"persist".to_vec(), b"data".to_vec())
        .unwrap();
    crashed.commit(writer).unwrap();
    // `mem::forget` skips the destructor, so no shutdown logic runs for this
    // instance.
    std::mem::forget(crashed);

    // Phase 2: reopen the same directory and recover.
    let reopened = DurableStorage::open_without_lock(dir.path()).unwrap();
    let stats = reopened.recover().unwrap();
    assert!(stats.transactions_recovered > 0 || stats.writes_recovered > 0);

    // The value committed before the simulated crash must be readable.
    let reader = reopened.begin_transaction().unwrap();
    let recovered = reopened.read(reader, b"persist").unwrap();
    assert_eq!(recovered, Some(b"data".to_vec()));
    reopened.abort(reader).unwrap();
}
3297
#[test]
fn test_scan_prefix() {
    let dir = tempdir().unwrap();
    let storage = DurableStorage::open(dir.path()).unwrap();

    // Two keys under the "user:" prefix and one outside it.
    let rows: [(&[u8], &[u8]); 3] = [
        (b"user:1", b"alice"),
        (b"user:2", b"bob"),
        (b"order:1", b"order1"),
    ];

    let writer = storage.begin_transaction().unwrap();
    for (key, value) in rows {
        storage.write(writer, key.to_vec(), value.to_vec()).unwrap();
    }
    storage.commit(writer).unwrap();

    // Only the two "user:" entries should come back from the scan.
    let reader = storage.begin_transaction().unwrap();
    let users = storage.scan(reader, b"user:").unwrap();
    assert_eq!(users.len(), 2);
    storage.abort(reader).unwrap();
}
3320
#[test]
fn test_delete() {
    let dir = tempdir().unwrap();
    let storage = DurableStorage::open(dir.path()).unwrap();

    // Insert and commit the key.
    let inserter = storage.begin_transaction().unwrap();
    storage
        .write(inserter, b"key".to_vec(), b"value".to_vec())
        .unwrap();
    storage.commit(inserter).unwrap();

    // Sanity check: the key is visible before the delete.
    let before = storage.begin_transaction().unwrap();
    let present = storage.read(before, b"key").unwrap();
    assert!(present.is_some());
    storage.abort(before).unwrap();

    // Delete the key in its own committed transaction.
    let deleter = storage.begin_transaction().unwrap();
    storage.delete(deleter, b"key".to_vec()).unwrap();
    storage.commit(deleter).unwrap();

    // A later transaction must no longer find the key.
    let after = storage.begin_transaction().unwrap();
    let gone = storage.read(after, b"key").unwrap();
    assert!(gone.is_none());
    storage.abort(after).unwrap();
}
3348
#[test]
fn test_gc() {
    let dir = tempdir().unwrap();
    let storage = DurableStorage::open(dir.path()).unwrap();

    // Commit ten successive versions of the same key ("v0" .. "v9").
    for i in 0..10 {
        let txn = storage.begin_transaction().unwrap();
        storage
            .write(txn, b"key".to_vec(), format!("v{}", i).into_bytes())
            .unwrap();
        storage.commit(txn).unwrap();
    }

    // The number of reclaimed versions is deliberately not asserted; this
    // call must simply complete without panicking.
    let _reclaimed = storage.gc();

    // Whatever GC reclaimed, the newest committed value must remain readable;
    // without this check the test would pass even if GC destroyed live data.
    let reader = storage.begin_transaction().unwrap();
    assert_eq!(storage.read(reader, b"key").unwrap(), Some(b"v9".to_vec()));
    storage.abort(reader).unwrap();
}
3369
#[test]
fn test_group_commit() {
    use std::sync::Arc;
    use std::thread;

    let dir = tempdir().unwrap();
    let storage = Arc::new(DurableStorage::open_with_group_commit(dir.path()).unwrap());

    // Spawn four writers, each committing its own key concurrently. The
    // `collect` is eager, so every thread is started before any is joined.
    let workers: Vec<_> = (0..4)
        .map(|i| {
            let storage = Arc::clone(&storage);
            thread::spawn(move || {
                let txn = storage.begin_transaction().unwrap();
                let key = format!("key{}", i).into_bytes();
                let val = format!("val{}", i).into_bytes();
                storage.write(txn, key, val).unwrap();
                // The value returned by `commit` becomes the thread's result.
                storage.commit(txn).unwrap()
            })
        })
        .collect();

    // Join every writer and gather what each commit returned.
    let commit_times: Vec<_> = workers
        .into_iter()
        .map(|worker| worker.join().unwrap())
        .collect();

    assert!(commit_times.iter().all(|&ts| ts > 0));

    // Every key written by the workers must be visible to a new transaction.
    let txn = storage.begin_transaction().unwrap();
    for i in 0..4 {
        let expected = format!("val{}", i).into_bytes();
        let val = storage.read(txn, format!("key{}", i).as_bytes()).unwrap();
        assert_eq!(val, Some(expected));
    }
    storage.abort(txn).unwrap();
}
3412
#[test]
fn test_arena_memtable_basic_write_read() {
    let table = ArenaMvccMemTable::new();

    let entries: [(&[u8], &[u8]); 2] = [(b"key1", b"value1"), (b"key2", b"value2")];

    // Write both entries under transaction id 1 without committing.
    for (key, value) in entries {
        table.write(key, Some(value.to_vec()), 1).unwrap();
    }

    // Reading as the same transaction (Some(1)) returns its own writes.
    for (key, value) in entries {
        assert_eq!(table.read(key, 0, Some(1)), Some(value.to_vec()));
    }

    // A key that was never written yields nothing.
    assert_eq!(table.read(b"key3", 0, Some(1)), None);
}
3432
#[test]
fn test_arena_memtable_update() {
    let table = ArenaMvccMemTable::new();

    // The same transaction writes the key twice; the second write should win.
    let versions: [&[u8]; 2] = [b"v1", b"v2"];
    for version in versions {
        table.write(b"key", Some(version.to_vec()), 1).unwrap();
    }

    let latest = table.read(b"key", 0, Some(1));
    assert_eq!(latest, Some(b"v2".to_vec()));
}
3442
#[test]
fn test_arena_memtable_delete() {
    let table = ArenaMvccMemTable::new();

    // Write a value, then overwrite it with `None` in the same transaction.
    table.write(b"key", Some(b"value".to_vec()), 1).unwrap();
    table.write(b"key", None, 1).unwrap();

    // After the `None` write, the transaction reads nothing for the key.
    let after_delete = table.read(b"key", 0, Some(1));
    assert_eq!(after_delete, None);
}
3452
#[test]
fn test_arena_memtable_scan_prefix() {
    let table = ArenaMvccMemTable::new();

    // Three "user:" keys (two of them under "user:1:") and one unrelated key.
    let rows: [(&[u8], &[u8]); 4] = [
        (b"user:1:name", b"Alice"),
        (b"user:1:email", b"alice@test.com"),
        (b"user:2:name", b"Bob"),
        (b"order:1", b"order_data"),
    ];

    for (key, value) in rows {
        table.write(key, Some(value.to_vec()), 1).unwrap();
    }

    // Commit transaction 1 with every key it wrote; the scans below pass 11,
    // one above the 10 given to `commit`.
    let write_set: HashSet<InlineKey> = rows
        .iter()
        .map(|&(key, _)| InlineKey::from_slice(key))
        .collect();
    table.commit(1, 10, &write_set);

    // Narrow prefix: only the two "user:1:" entries.
    let narrow = table.scan_prefix(b"user:1:", 11, None);
    assert_eq!(narrow.len(), 2);

    // Wider prefix: all three "user:" entries, but not "order:1".
    let wide = table.scan_prefix(b"user:", 11, None);
    assert_eq!(wide.len(), 3);
}
3486
#[test]
fn test_arena_memtable_write_batch() {
    let table = ArenaMvccMemTable::new();

    let pairs: [(&[u8], &[u8]); 3] = [(b"k1", b"v1"), (b"k2", b"v2"), (b"k3", b"v3")];

    // Assemble the batch in the `(key, Option<value>)` shape and submit it in
    // one call under transaction id 1.
    let writes: Vec<(&[u8], Option<Vec<u8>>)> = pairs
        .iter()
        .map(|&(key, value)| (key, Some(value.to_vec())))
        .collect();
    table.write_batch(&writes, 1).unwrap();

    // Each batched entry must be readable by the writing transaction.
    for (key, value) in pairs {
        assert_eq!(table.read(key, 0, Some(1)), Some(value.to_vec()));
    }
}
3503
#[test]
fn test_arena_memtable_gc() {
    let memtable = ArenaMvccMemTable::new();

    // Build ten versions of the same key: transaction `i + 1` writes "v{i}"
    // and is committed with `(i + 1) * 10` as the second `commit` argument,
    // giving 10, 20, ..., 100.
    for i in 0..10 {
        memtable
            .write(b"key", Some(format!("v{}", i).into_bytes()), i + 1)
            .unwrap();

        let mut write_set = HashSet::new();
        write_set.insert(InlineKey::from_slice(b"key"));
        memtable.commit(i + 1, (i + 1) * 10, &write_set);
    }

    // The number of reclaimed versions is deliberately not asserted; this
    // call must simply complete without panicking.
    let _reclaimed = memtable.gc(90);

    // The newest version was committed at 100, above the 90 passed to `gc`,
    // so it must still be readable; without this check the test would pass
    // even if GC discarded live data.
    assert_eq!(memtable.read(b"key", 101, None), Some(b"v9".to_vec()));
}
3523
#[test]
fn test_arena_memtable_size_tracking() {
    let table = ArenaMvccMemTable::new();

    // A freshly created memtable reports a size of zero.
    let initial_size = table.size();
    assert_eq!(initial_size, 0);

    // After one write the reported size must be positive.
    table.write(b"key", Some(b"value".to_vec()), 1).unwrap();
    let size_after_write = table.size();
    assert!(size_after_write > 0);
}
3534
#[test]
fn test_arena_memtable_abort() {
    let table = ArenaMvccMemTable::new();

    // Transaction 1 writes a value but never commits it.
    table.write(b"key", Some(b"uncommitted".to_vec()), 1).unwrap();

    // The writing transaction sees its own pending value...
    let own_view = table.read(b"key", 0, Some(1));
    assert_eq!(own_view, Some(b"uncommitted".to_vec()));

    // ...while a different transaction does not.
    let other_view = table.read(b"key", 0, Some(2));
    assert_eq!(other_view, None);

    // Abort transaction 1.
    table.abort(1);

    // After the abort even the original writer no longer sees the value.
    let after_abort = table.read(b"key", 0, Some(1));
    assert_eq!(after_abort, None);
}
3558
#[test]
fn test_memtable_kind_standard() {
    let table = MemTableKind::new(MemTableType::Standard, true);
    // The wrapper must report the variant it was constructed with.
    assert_eq!(table.kind(), MemTableType::Standard);

    // Write one key as transaction 1.
    table
        .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
        .unwrap();

    // Commit transaction 1 with its single-key write set.
    let write_set = HashSet::from([InlineKey::from_slice(b"key1")]);
    table.commit(1, 100, &write_set);

    // Read with 101 (one above the 100 given to `commit`) and no transaction
    // id: the committed value is returned.
    let committed = table.read(b"key1", 101, None);
    assert_eq!(committed, Some(b"value1".to_vec()));
}
3579
#[test]
fn test_memtable_kind_arena() {
    let table = MemTableKind::new(MemTableType::Arena, true);
    // The wrapper must report the variant it was constructed with.
    assert_eq!(table.kind(), MemTableType::Arena);

    // Write one key as transaction 1.
    table
        .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
        .unwrap();

    // Commit transaction 1 with its single-key write set.
    let write_set = HashSet::from([InlineKey::from_slice(b"key1")]);
    table.commit(1, 100, &write_set);

    // Read with 101 (one above the 100 given to `commit`) and no transaction
    // id: the committed value is returned.
    let committed = table.read(b"key1", 101, None);
    assert_eq!(committed, Some(b"value1".to_vec()));
}
3596
#[test]
fn test_memtable_kind_scan_range() {
    // Run the identical scenario against both memtable variants.
    for kind in [MemTableType::Standard, MemTableType::Arena] {
        let table = MemTableKind::new(kind, true);

        // key0..key4 mapped to value0..value4, all written by transaction 1.
        let keys: Vec<String> = (0..5).map(|i| format!("key{}", i)).collect();
        for (i, key) in keys.iter().enumerate() {
            let value = format!("value{}", i);
            table
                .write(key.as_bytes().to_vec(), Some(value.into_bytes()), 1)
                .unwrap();
        }

        // Commit transaction 1 with all five keys in its write set.
        let write_set: HashSet<InlineKey> = keys
            .iter()
            .map(|key| InlineKey::from_slice(key.as_bytes()))
            .collect();
        table.commit(1, 100, &write_set);

        // The expected count of 3 (key1, key2, key3) means the end bound
        // "key4" is treated as exclusive.
        let in_range = table.scan_range(b"key1", b"key4", 101, None);
        assert_eq!(in_range.len(), 3, "kind={:?} should have 3 results (key1, key2, key3)", kind);
    }
}
3621
#[test]
fn test_durable_storage_arena() {
    let tmp = tempdir().unwrap();
    let storage = DurableStorage::open_with_arena(tmp.path()).unwrap();

    // Storage opened through this constructor must report the arena memtable.
    assert_eq!(storage.memtable_type(), MemTableType::Arena);

    // Basic write/commit round trip.
    let writer = storage.begin_transaction().unwrap();
    storage
        .write(writer, b"key1".to_vec(), b"value1".to_vec())
        .unwrap();
    storage.commit(writer).unwrap();

    // A subsequent transaction reads back the committed value.
    let reader = storage.begin_transaction().unwrap();
    let observed = storage.read(reader, b"key1").unwrap();
    assert_eq!(observed, Some(b"value1".to_vec()));
    storage.abort(reader).unwrap();
}
3639
#[test]
fn test_durable_storage_full_config() {
    let tmp = tempdir().unwrap();

    // Open with every option given explicitly.
    // NOTE(review): the meaning of the `true` flag is not visible here —
    // confirm against `open_with_full_config`.
    let storage =
        DurableStorage::open_with_full_config(tmp.path(), true, MemTableType::Arena).unwrap();

    assert_eq!(storage.memtable_type(), MemTableType::Arena);

    // Write key00..key09 inside a single transaction.
    let writer = storage.begin_transaction().unwrap();
    for i in 0..10 {
        let key = format!("key{:02}", i).into_bytes();
        let value = format!("value{}", i).into_bytes();
        storage.write(writer, key, value).unwrap();
    }
    storage.commit(writer).unwrap();

    // All ten zero-padded keys begin with "key0", so the scan returns all of
    // them.
    let reader = storage.begin_transaction().unwrap();
    let matches = storage.scan(reader, b"key0").unwrap();
    assert_eq!(matches.len(), 10);
    storage.abort(reader).unwrap();
}
3668}