1use crate::group_commit::EventDrivenGroupCommit;
74use crate::ssi::SsiManager;
75use crate::txn_wal::TxnWal;
76use dashmap::DashMap;
77use parking_lot::RwLock;
78use std::collections::HashMap;
79use std::path::Path;
80use std::sync::Arc;
81use std::sync::atomic::{AtomicU64, Ordering};
82use sochdb_core::{Result, SochDBError};
83
/// Lifecycle state of a [`Transaction`].
///
/// Every transaction is created as [`TxnState::Active`]. Nothing in this file
/// moves a transaction into another state, so the remaining variants are
/// presumably assigned by code elsewhere — TODO confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnState {
    /// Initial state; buffered writes and commit require it.
    Active,
    /// Accepted by `WalStorageManager::abort`, next to `Active`, as a state that may still be aborted.
    Prepared,
    /// NOTE(review): presumably a transaction that committed successfully — confirm.
    Committed,
    /// NOTE(review): presumably a transaction that was rolled back — confirm.
    Aborted,
}
96
/// In-memory record of one transaction tracked by `WalStorageManager`.
#[derive(Debug)]
pub struct Transaction {
    /// Transaction identifier (the value the WAL returned when the transaction began).
    pub id: u64,
    /// Logical timestamp drawn from the manager's counter when the transaction began.
    pub start_ts: u64,
    /// Current lifecycle state; set to `TxnState::Active` on creation.
    pub state: TxnState,
    /// Buffered `(key, value)` pairs, in the order they were written.
    writes: Vec<(Vec<u8>, Vec<u8>)>,
    /// Keys passed to `record_read`, in call order.
    reads: Vec<Vec<u8>>,
}
111
112impl Transaction {
113 fn new(id: u64, start_ts: u64) -> Self {
114 Self {
115 id,
116 start_ts,
117 state: TxnState::Active,
118 writes: Vec::new(),
119 reads: Vec::new(),
120 }
121 }
122
123 pub fn write(&mut self, key: Vec<u8>, value: Vec<u8>) {
125 self.writes.push((key, value));
126 }
127
128 pub fn record_read(&mut self, key: Vec<u8>) {
130 self.reads.push(key);
131 }
132
133 pub fn writes(&self) -> &[(Vec<u8>, Vec<u8>)] {
135 &self.writes
136 }
137}
138
/// Transaction manager that buffers writes per transaction, logs them to a
/// [`TxnWal`] on commit, and then forwards them to a storage callback.
// The boxed `Fn` trait object in `apply_fn` is what trips `clippy::type_complexity`.
#[allow(clippy::type_complexity)]
pub struct WalStorageManager {
    /// Write-ahead log that transaction operations are recorded in.
    wal: Arc<TxnWal>,
    /// Transactions that have begun and not yet committed or aborted, keyed by id.
    active_txns: RwLock<HashMap<u64, Transaction>>,
    /// Logical clock; initialised to 1 and incremented on every begin and commit.
    timestamp: AtomicU64,
    /// Storage callback invoked with `(key, value)` for each write being applied.
    apply_fn: Box<dyn Fn(&[u8], &[u8]) -> Result<()> + Send + Sync>,
}
153
154impl WalStorageManager {
155 pub fn new<P: AsRef<Path>, F>(wal_path: P, apply_fn: F) -> Result<Self>
157 where
158 F: Fn(&[u8], &[u8]) -> Result<()> + Send + Sync + 'static,
159 {
160 let wal = Arc::new(TxnWal::new(wal_path)?);
161
162 Ok(Self {
163 wal,
164 active_txns: RwLock::new(HashMap::new()),
165 timestamp: AtomicU64::new(1),
166 apply_fn: Box::new(apply_fn),
167 })
168 }
169
170 pub fn begin_txn(&self) -> Result<u64> {
172 let txn_id = self.wal.begin_transaction()?;
173 let start_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
174
175 let txn = Transaction::new(txn_id, start_ts);
176 self.active_txns.write().insert(txn_id, txn);
177
178 Ok(txn_id)
179 }
180
181 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
185 let mut txns = self.active_txns.write();
186 let txn = txns
187 .get_mut(&txn_id)
188 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
189
190 if txn.state != TxnState::Active {
191 return Err(SochDBError::InvalidArgument(
192 "Transaction not active".into(),
193 ));
194 }
195
196 txn.write(key, value);
197 Ok(())
198 }
199
200 pub fn write_immediate(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
204 {
206 let txns = self.active_txns.read();
207 let txn = txns
208 .get(&txn_id)
209 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
210
211 if txn.state != TxnState::Active {
212 return Err(SochDBError::InvalidArgument(
213 "Transaction not active".into(),
214 ));
215 }
216 }
217
218 self.wal.write(txn_id, key.clone(), value.clone())?;
220
221 (self.apply_fn)(&key, &value)?;
223
224 Ok(())
225 }
226
227 pub fn commit(&self, txn_id: u64) -> Result<u64> {
234 let txn = {
235 let mut txns = self.active_txns.write();
236 txns.remove(&txn_id)
237 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?
238 };
239
240 if txn.state != TxnState::Active {
241 return Err(SochDBError::InvalidArgument(
242 "Transaction not active".into(),
243 ));
244 }
245
246 for (key, value) in &txn.writes {
248 self.wal.write(txn_id, key.clone(), value.clone())?;
249 }
250
251 self.wal.commit_transaction(txn_id)?;
253
254 for (key, value) in &txn.writes {
256 (self.apply_fn)(key, value)?;
257 }
258
259 let commit_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
261 Ok(commit_ts)
262 }
263
264 pub fn abort(&self, txn_id: u64) -> Result<()> {
268 let mut txns = self.active_txns.write();
269 let txn = txns
270 .remove(&txn_id)
271 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
272
273 if txn.state != TxnState::Active && txn.state != TxnState::Prepared {
274 return Err(SochDBError::InvalidArgument(
275 "Transaction cannot be aborted".into(),
276 ));
277 }
278
279 self.wal.abort_transaction(txn_id)?;
281
282 Ok(())
284 }
285
286 pub fn recover(&self) -> Result<RecoveryStats> {
290 let (committed_writes, txn_count) = self.wal.replay_for_recovery()?;
291
292 for (key, value) in &committed_writes {
293 (self.apply_fn)(key, value)?;
294 }
295
296 Ok(RecoveryStats {
297 transactions_recovered: txn_count,
298 writes_applied: committed_writes.len(),
299 })
300 }
301
302 pub fn checkpoint(&self) -> Result<()> {
306 self.wal.write_checkpoint()?;
307 self.wal.truncate()?;
308 Ok(())
309 }
310
311 pub fn wal(&self) -> &Arc<TxnWal> {
313 &self.wal
314 }
315
316 pub fn current_timestamp(&self) -> u64 {
318 self.timestamp.load(Ordering::SeqCst)
319 }
320}
321
/// Summary returned by the `recover` methods after replaying the WAL.
#[derive(Debug, Clone, Default)]
pub struct RecoveryStats {
    /// Transaction count reported by `TxnWal::replay_for_recovery`.
    pub transactions_recovered: usize,
    /// Number of `(key, value)` writes handed to the storage callback during recovery.
    pub writes_applied: usize,
}
330
/// Isolation level requested when an MVCC transaction begins.
///
/// In this file only `Serializable` changes behaviour: it routes reads, writes
/// and commit through the `SsiManager`. The other two levels follow the same
/// code path.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    /// NOTE(review): handled exactly like `SnapshotIsolation` in this file — confirm intent.
    ReadCommitted,
    /// Level used by `MvccTransactionManager::begin_default`.
    SnapshotIsolation,
    /// Adds `SsiManager` tracking of reads and writes plus a check at commit.
    Serializable,
}
345
/// Per-transaction state kept by `MvccTransactionManager` while a transaction is open.
#[derive(Debug)]
pub struct MvccTransaction {
    /// Identifier allocated by the manager when the transaction began.
    pub txn_id: u64,
    /// Logical timestamp taken at begin; used as the snapshot for visibility checks.
    pub snapshot_ts: u64,
    /// Lifecycle status; set to `MvccTxnStatus::Active` on creation.
    pub status: MvccTxnStatus,
    /// Keys recorded by the manager while the transaction runs (populated by `MvccTransactionManager::read`).
    pub read_set: std::collections::HashSet<Vec<u8>>,
    /// Pending writes, one value per key (a later write to the same key replaces the earlier one).
    pub write_set: HashMap<Vec<u8>, Vec<u8>>,
    /// Isolation level the transaction was started with.
    pub isolation_level: IsolationLevel,
}
362
/// Lifecycle status of an [`MvccTransaction`].
///
/// Only `Active` is ever assigned in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MvccTxnStatus {
    /// The transaction is open; reads, writes, commit and abort require this status.
    Active,
    /// NOTE(review): the payload is presumably the commit timestamp — confirm.
    Committed(u64),
    /// NOTE(review): presumably a rolled-back transaction — confirm.
    Aborted,
}
370
/// One stored version of a key's value, tagged with the transactions that
/// created and (optionally) deleted it.
#[derive(Debug, Clone)]
pub struct MvccVersion {
    /// Id of the transaction that created this version.
    pub xmin: u64,
    /// Id of the transaction that deleted this version; `0` means not deleted.
    pub xmax: u64,
    /// Timestamp supplied when the version was created.
    pub created_ts: u64,
    /// Timestamp supplied when the version was marked deleted; `u64::MAX` while live.
    pub deleted_ts: u64,
    /// The value bytes.
    pub value: Vec<u8>,
}
385
386impl MvccVersion {
387 pub fn new(xmin: u64, created_ts: u64, value: Vec<u8>) -> Self {
389 Self {
390 xmin,
391 xmax: 0,
392 created_ts,
393 deleted_ts: u64::MAX,
394 value,
395 }
396 }
397
398 pub fn mark_deleted(&mut self, xmax: u64, deleted_ts: u64) {
400 self.xmax = xmax;
401 self.deleted_ts = deleted_ts;
402 }
403
404 pub fn is_visible(
406 &self,
407 snapshot_ts: u64,
408 txn_id: u64,
409 committed_txns: &HashMap<u64, u64>,
410 ) -> bool {
411 if self.xmin == txn_id {
413 return self.xmax != txn_id; }
415
416 match committed_txns.get(&self.xmin) {
418 Some(&commit_ts) if commit_ts < snapshot_ts => {}
419 _ => return false, }
421
422 if self.xmax == 0 {
424 return true; }
426 if self.xmax == txn_id {
427 return false; }
429 match committed_txns.get(&self.xmax) {
430 Some(&commit_ts) => commit_ts >= snapshot_ts, None => true, }
433 }
434
435 pub fn is_visible_dashmap(
437 &self,
438 snapshot_ts: u64,
439 txn_id: u64,
440 committed_txns: &DashMap<u64, u64>,
441 ) -> bool {
442 if self.xmin == txn_id {
444 return self.xmax != txn_id; }
446
447 match committed_txns.get(&self.xmin) {
449 Some(commit_ts_ref) if *commit_ts_ref < snapshot_ts => {}
450 _ => return false, }
452
453 if self.xmax == 0 {
455 return true; }
457 if self.xmax == txn_id {
458 return false; }
460 match committed_txns.get(&self.xmax) {
461 Some(commit_ts_ref) => *commit_ts_ref >= snapshot_ts, None => true, }
464 }
465}
466
/// All stored versions of a single key.
#[derive(Debug, Default)]
pub struct MvccVersionChain {
    /// Versions ordered newest first (`add` inserts at index 0).
    versions: Vec<MvccVersion>,
}
473
474impl MvccVersionChain {
475 pub fn add(&mut self, version: MvccVersion) {
477 self.versions.insert(0, version);
478 }
479
480 pub fn get_visible(
483 &self,
484 snapshot_ts: u64,
485 txn_id: u64,
486 committed: &DashMap<u64, u64>,
487 ) -> Option<&Vec<u8>> {
488 for v in &self.versions {
489 if v.is_visible_dashmap(snapshot_ts, txn_id, committed) {
490 return Some(&v.value);
491 }
492 }
493 None
494 }
495
496 pub fn get_visible_legacy(
498 &self,
499 snapshot_ts: u64,
500 txn_id: u64,
501 committed: &HashMap<u64, u64>,
502 ) -> Option<&Vec<u8>> {
503 for v in &self.versions {
504 if v.is_visible(snapshot_ts, txn_id, committed) {
505 return Some(&v.value);
506 }
507 }
508 None
509 }
510
511 pub fn delete(&mut self, xmax: u64, deleted_ts: u64) -> bool {
513 if let Some(v) = self.versions.first_mut()
514 && v.xmax == 0
515 {
516 v.mark_deleted(xmax, deleted_ts);
517 return true;
518 }
519 false
520 }
521
522 pub fn gc(&mut self, min_visible_ts: u64) -> usize {
524 let old_len = self.versions.len();
525 if old_len <= 1 {
526 return 0;
527 }
528 self.versions.retain(|v| v.deleted_ts >= min_visible_ts);
529 if self.versions.is_empty() {
530 return old_len;
531 }
532 old_len - self.versions.len()
533 }
534}
535
/// MVCC transaction manager: keeps per-key version chains in memory, logs
/// committed writes to a [`TxnWal`] and forwards them to a storage callback.
pub struct MvccTransactionManager {
    /// Write-ahead log shared with the group-commit callback.
    wal: Arc<TxnWal>,
    /// Source of transaction ids; initialised to 1.
    next_txn_id: AtomicU64,
    /// Logical clock used for snapshot, version and delete timestamps; initialised to 1.
    timestamp: AtomicU64,
    /// Open transactions keyed by id.
    active_txns: RwLock<HashMap<u64, MvccTransaction>>,
    /// Commit timestamp of each committed transaction, consulted by visibility checks.
    committed_txns: DashMap<u64, u64>,
    /// Version chain for every key that has been written through this manager.
    versions: DashMap<Vec<u8>, MvccVersionChain>,
    /// Conflict tracker consulted only for `IsolationLevel::Serializable` transactions.
    ssi_manager: SsiManager,
    /// Group-commit driver whose callback writes the WAL commit records.
    group_commit: EventDrivenGroupCommit,
    /// Smallest `snapshot_ts` among open transactions, or `u64::MAX` when none are open.
    min_snapshot_ts: AtomicU64,
    // The boxed `Fn` trait object below is what trips `clippy::type_complexity`.
    #[allow(clippy::type_complexity)]
    /// Storage callback invoked with `(key, value)` for each committed or recovered write.
    apply_fn: Box<dyn Fn(&[u8], &[u8]) -> Result<()> + Send + Sync>,
}
571
572impl MvccTransactionManager {
573 pub fn new<P: AsRef<Path>, F>(wal_path: P, apply_fn: F) -> Result<Self>
575 where
576 F: Fn(&[u8], &[u8]) -> Result<()> + Send + Sync + 'static,
577 {
578 let wal = Arc::new(TxnWal::new(wal_path)?);
579 let wal_for_gc = wal.clone();
580
581 let group_commit = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
583 for &txn_id in txn_ids {
585 wal_for_gc
586 .commit_transaction(txn_id)
587 .map_err(|e| e.to_string())?;
588 }
589 let commit_ts = std::time::SystemTime::now()
590 .duration_since(std::time::UNIX_EPOCH)
591 .unwrap()
592 .as_micros() as u64;
593 Ok(commit_ts)
594 });
595
596 Ok(Self {
597 wal,
598 next_txn_id: AtomicU64::new(1),
599 timestamp: AtomicU64::new(1),
600 active_txns: RwLock::new(HashMap::new()),
601 committed_txns: DashMap::new(),
602 versions: DashMap::new(),
603 ssi_manager: SsiManager::new(),
604 group_commit,
605 min_snapshot_ts: AtomicU64::new(u64::MAX),
606 apply_fn: Box::new(apply_fn),
607 })
608 }
609
610 pub fn begin(&self, isolation_level: IsolationLevel) -> Result<u64> {
612 let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
613 let snapshot_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
614
615 self.wal.begin_transaction().ok(); let txn = MvccTransaction {
620 txn_id,
621 snapshot_ts,
622 status: MvccTxnStatus::Active,
623 read_set: std::collections::HashSet::new(),
624 write_set: HashMap::new(),
625 isolation_level,
626 };
627
628 self.active_txns.write().insert(txn_id, txn);
629
630 self.update_min_snapshot();
632
633 if isolation_level == IsolationLevel::Serializable {
635 self.ssi_manager.begin().ok();
636 }
637
638 Ok(txn_id)
639 }
640
641 pub fn begin_default(&self) -> Result<u64> {
643 self.begin(IsolationLevel::SnapshotIsolation)
644 }
645
646 pub fn read(&self, txn_id: u64, key: &[u8]) -> Result<Option<Vec<u8>>> {
648 let mut txns = self.active_txns.write();
649 let txn = txns
650 .get_mut(&txn_id)
651 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
652
653 if txn.status != MvccTxnStatus::Active {
654 return Err(SochDBError::InvalidArgument(
655 "Transaction not active".into(),
656 ));
657 }
658
659 if let Some(value) = txn.write_set.get(key) {
661 return Ok(Some(value.clone()));
662 }
663
664 txn.read_set.insert(key.to_vec());
666
667 let snapshot_ts = txn.snapshot_ts;
668 let isolation = txn.isolation_level;
669 drop(txns);
670
671 if isolation == IsolationLevel::Serializable {
673 self.ssi_manager
674 .record_read(txn_id, key)
675 .map_err(|e| SochDBError::Internal(format!("SSI conflict: {}", e.message)))?;
676 }
677
678 if let Some(chain) = self.versions.get(key) {
680 Ok(chain
681 .get_visible(snapshot_ts, txn_id, &self.committed_txns)
682 .cloned())
683 } else {
684 Ok(None)
685 }
686 }
687
688 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
690 let mut txns = self.active_txns.write();
691 let txn = txns
692 .get_mut(&txn_id)
693 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
694
695 if txn.status != MvccTxnStatus::Active {
696 return Err(SochDBError::InvalidArgument(
697 "Transaction not active".into(),
698 ));
699 }
700
701 let isolation = txn.isolation_level;
702
703 if isolation == IsolationLevel::Serializable {
705 self.ssi_manager
706 .record_write(txn_id, &key)
707 .map_err(|e| SochDBError::Internal(format!("SSI conflict: {}", e.message)))?;
708 }
709
710 txn.write_set.insert(key, value);
712 Ok(())
713 }
714
715 pub fn commit(&self, txn_id: u64) -> Result<u64> {
717 let txn = {
719 let mut txns = self.active_txns.write();
720 txns.remove(&txn_id)
721 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?
722 };
723
724 if txn.status != MvccTxnStatus::Active {
725 return Err(SochDBError::InvalidArgument(
726 "Transaction not active".into(),
727 ));
728 }
729
730 if txn.isolation_level == IsolationLevel::Serializable {
732 self.ssi_manager
733 .commit(txn_id)
734 .map_err(|e| SochDBError::Internal(format!("SSI conflict: {}", e.message)))?;
735 }
736
737 for (key, value) in &txn.write_set {
739 self.wal.write(txn_id, key.clone(), value.clone())?;
740 }
741
742 let commit_ts = self
744 .group_commit
745 .submit_and_wait(txn_id)
746 .map_err(|e| SochDBError::Internal(format!("Group commit error: {}", e)))?;
747
748 let apply_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
750 for (key, value) in &txn.write_set {
751 self.versions
752 .entry(key.clone())
753 .or_default()
754 .add(MvccVersion::new(txn_id, apply_ts, value.clone()));
755 }
756
757 for (key, value) in &txn.write_set {
759 (self.apply_fn)(key, value)?;
760 }
761
762 self.committed_txns.insert(txn_id, commit_ts);
764
765 self.update_min_snapshot();
767
768 Ok(commit_ts)
769 }
770
771 pub fn abort(&self, txn_id: u64) -> Result<()> {
773 let txn = {
774 let mut txns = self.active_txns.write();
775 txns.remove(&txn_id)
776 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?
777 };
778
779 if txn.status != MvccTxnStatus::Active {
780 return Err(SochDBError::InvalidArgument(
781 "Transaction not active".into(),
782 ));
783 }
784
785 self.wal.abort_transaction(txn_id)?;
787
788 if txn.isolation_level == IsolationLevel::Serializable {
790 self.ssi_manager.abort(txn_id);
791 }
792
793 self.update_min_snapshot();
795 Ok(())
796 }
797
798 pub fn delete(&self, txn_id: u64, key: &[u8]) -> Result<bool> {
800 let txns = self.active_txns.read();
801 let txn = txns
802 .get(&txn_id)
803 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
804
805 if txn.status != MvccTxnStatus::Active {
806 return Err(SochDBError::InvalidArgument(
807 "Transaction not active".into(),
808 ));
809 }
810
811 drop(txns);
812
813 let deleted_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
814
815 if let Some(mut chain) = self.versions.get_mut(key) {
817 Ok(chain.delete(txn_id, deleted_ts))
818 } else {
819 Ok(false)
820 }
821 }
822
823 pub fn gc(&self) -> usize {
825 let min_ts = self.min_snapshot_ts.load(Ordering::SeqCst);
826 let mut total_gc = 0;
827
828 for mut entry in self.versions.iter_mut() {
830 total_gc += entry.value_mut().gc(min_ts);
831 }
832
833 self.committed_txns.retain(|_, ts| *ts >= min_ts);
835
836 total_gc += self.ssi_manager.gc(min_ts);
838
839 total_gc
840 }
841
842 fn update_min_snapshot(&self) {
844 let txns = self.active_txns.read();
845 let min = txns
846 .values()
847 .map(|t| t.snapshot_ts)
848 .min()
849 .unwrap_or(u64::MAX);
850 self.min_snapshot_ts.store(min, Ordering::SeqCst);
851 }
852
853 pub fn recover(&self) -> Result<RecoveryStats> {
855 let (committed_writes, txn_count) = self.wal.replay_for_recovery()?;
856
857 for (key, value) in &committed_writes {
858 (self.apply_fn)(key, value)?;
859 }
860
861 Ok(RecoveryStats {
862 transactions_recovered: txn_count,
863 writes_applied: committed_writes.len(),
864 })
865 }
866
867 pub fn current_timestamp(&self) -> u64 {
869 self.timestamp.load(Ordering::SeqCst)
870 }
871
872 pub fn active_count(&self) -> usize {
874 self.active_txns.read().len()
875 }
876}
877
/// Buffer of pending commits with an adaptively sized flush threshold.
///
/// Timestamps are microseconds since the Unix epoch, read from the system clock.
pub struct GroupCommitBuffer {
    /// Commits waiting to be flushed, in arrival order.
    pending: RwLock<Vec<PendingCommit>>,
    /// Upper bound for the batch size; also the initial capacity of `pending`.
    max_pending: usize,
    /// Age of the last flush, in microseconds, beyond which `add` asks for a flush.
    max_wait_us: u64,
    /// Time of the last `take_pending` call; 0 until the first call.
    last_flush: AtomicU64,
    /// Moving average of the arrival rate, stored as commits per second × 1000.
    arrival_rate_ema: AtomicU64,
    /// Time of the most recent `add` call; 0 until the first call.
    last_arrival: AtomicU64,
    /// Moving average of the fsync latency in microseconds.
    fsync_latency_us: AtomicU64,
    /// Batch size most recently computed from the arrival rate and fsync latency.
    adaptive_batch_size: AtomicU64,
}
901
/// One commit waiting in a [`GroupCommitBuffer`].
#[derive(Debug, Clone)]
pub struct PendingCommit {
    /// Id of the transaction waiting to be committed.
    pub txn_id: u64,
    /// When the commit was added, in microseconds since the Unix epoch.
    pub enqueue_time_us: u64,
}
908
909impl GroupCommitBuffer {
910 pub fn new(max_pending: usize, max_wait_us: u64) -> Self {
912 Self {
913 pending: RwLock::new(Vec::with_capacity(max_pending)),
914 max_pending,
915 max_wait_us,
916 last_flush: AtomicU64::new(0),
917 arrival_rate_ema: AtomicU64::new(100_000), last_arrival: AtomicU64::new(0),
919 fsync_latency_us: AtomicU64::new(5000), adaptive_batch_size: AtomicU64::new(10), }
922 }
923
924 pub fn with_fsync_latency(max_pending: usize, max_wait_us: u64, fsync_latency_us: u64) -> Self {
926 let buffer = Self::new(max_pending, max_wait_us);
927 buffer
928 .fsync_latency_us
929 .store(fsync_latency_us, Ordering::Relaxed);
930 buffer.recompute_batch_size();
931 buffer
932 }
933
934 fn now_us() -> u64 {
935 std::time::SystemTime::now()
936 .duration_since(std::time::UNIX_EPOCH)
937 .unwrap()
938 .as_micros() as u64
939 }
940
941 fn update_arrival_rate(&self) {
943 let now = Self::now_us();
944 let last = self.last_arrival.swap(now, Ordering::Relaxed);
945
946 if last > 0 {
947 let delta_us = now.saturating_sub(last);
948 if delta_us > 0 {
949 let instant_rate = 1_000_000_000 / delta_us;
952
953 let old_rate = self.arrival_rate_ema.load(Ordering::Relaxed);
955 let new_rate = (old_rate * 9 + instant_rate) / 10;
956 self.arrival_rate_ema.store(new_rate, Ordering::Relaxed);
957 }
958 }
959 }
960
961 fn recompute_batch_size(&self) {
966 let lambda = self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0; let l_fsync = self.fsync_latency_us.load(Ordering::Relaxed) as f64; let c_wait = 1.0; let l_fsync_s = l_fsync / 1_000_000.0;
973 let n_opt = (2.0 * l_fsync_s * lambda / c_wait).sqrt();
974
975 let batch_size = n_opt.clamp(1.0, self.max_pending as f64) as u64;
976 self.adaptive_batch_size
977 .store(batch_size, Ordering::Relaxed);
978 }
979
980 pub fn add(&self, txn_id: u64) -> bool {
984 self.update_arrival_rate();
985
986 let now = Self::now_us();
987 let commit = PendingCommit {
988 txn_id,
989 enqueue_time_us: now,
990 };
991
992 let mut pending = self.pending.write();
993 pending.push(commit);
994
995 let adaptive_size = self.adaptive_batch_size.load(Ordering::Relaxed) as usize;
996 let target_size = adaptive_size.max(1).min(self.max_pending);
997
998 if pending.len() >= target_size {
999 return true;
1000 }
1001
1002 let last = self.last_flush.load(Ordering::Relaxed);
1004 if now - last > self.max_wait_us {
1005 return true;
1006 }
1007
1008 false
1009 }
1010
1011 pub fn take_pending(&self) -> Vec<PendingCommit> {
1013 let mut pending = self.pending.write();
1014 let result = std::mem::take(&mut *pending);
1015
1016 let now = Self::now_us();
1017 self.last_flush.store(now, Ordering::Relaxed);
1018
1019 self.recompute_batch_size();
1021
1022 result
1023 }
1024
1025 pub fn record_fsync_latency(&self, latency_us: u64) {
1027 let old = self.fsync_latency_us.load(Ordering::Relaxed);
1029 let new = (old * 4 + latency_us) / 5;
1030 self.fsync_latency_us.store(new, Ordering::Relaxed);
1031
1032 self.recompute_batch_size();
1034 }
1035
1036 pub fn current_batch_size(&self) -> usize {
1038 self.adaptive_batch_size.load(Ordering::Relaxed) as usize
1039 }
1040
1041 pub fn current_arrival_rate(&self) -> f64 {
1043 self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0
1044 }
1045
1046 pub fn stats(&self) -> GroupCommitStats {
1048 GroupCommitStats {
1049 adaptive_batch_size: self.adaptive_batch_size.load(Ordering::Relaxed) as usize,
1050 arrival_rate: self.current_arrival_rate(),
1051 fsync_latency_us: self.fsync_latency_us.load(Ordering::Relaxed),
1052 pending_count: self.pending.read().len(),
1053 }
1054 }
1055}
1056
/// Snapshot of a [`GroupCommitBuffer`]'s tuning values, as returned by `stats`.
#[derive(Debug, Clone)]
pub struct GroupCommitStats {
    /// Batch size most recently computed by the buffer.
    pub adaptive_batch_size: usize,
    /// Arrival-rate average in commits per second.
    pub arrival_rate: f64,
    /// Fsync-latency average in microseconds.
    pub fsync_latency_us: u64,
    /// Number of commits queued when the snapshot was taken.
    pub pending_count: usize,
}
1069
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use tempfile::tempdir;

    /// Buffered writes reach the storage callback only at commit, in write order.
    #[test]
    fn test_basic_transaction() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        let writes = Arc::new(RwLock::new(Vec::new()));
        let writes_clone = writes.clone();

        let manager = WalStorageManager::new(wal_path, move |k, v| {
            writes_clone.write().push((k.to_vec(), v.to_vec()));
            Ok(())
        })
        .unwrap();

        let txn_id = manager.begin_txn().unwrap();

        manager
            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();
        manager
            .write(txn_id, b"key2".to_vec(), b"value2".to_vec())
            .unwrap();

        // Nothing is applied before commit.
        assert!(writes.read().is_empty());

        manager.commit(txn_id).unwrap();

        let applied = writes.read();
        assert_eq!(applied.len(), 2);
        assert_eq!(applied[0], (b"key1".to_vec(), b"value1".to_vec()));
        assert_eq!(applied[1], (b"key2".to_vec(), b"value2".to_vec()));
    }

    /// An aborted transaction never reaches the storage callback.
    #[test]
    fn test_abort_transaction() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        let writes = Arc::new(RwLock::new(Vec::new()));
        let writes_clone = writes.clone();

        let manager = WalStorageManager::new(wal_path, move |k, v| {
            writes_clone.write().push((k.to_vec(), v.to_vec()));
            Ok(())
        })
        .unwrap();

        let txn_id = manager.begin_txn().unwrap();
        manager
            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();

        manager.abort(txn_id).unwrap();

        assert!(writes.read().is_empty());
    }

    /// `write_immediate` applies the write right away, before commit.
    #[test]
    fn test_immediate_write() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        let write_count = Arc::new(AtomicUsize::new(0));
        let count_clone = write_count.clone();

        let manager = WalStorageManager::new(wal_path, move |_, _| {
            count_clone.fetch_add(1, Ordering::SeqCst);
            Ok(())
        })
        .unwrap();

        let txn_id = manager.begin_txn().unwrap();

        manager
            .write_immediate(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();
        assert_eq!(write_count.load(Ordering::SeqCst), 1);

        manager.commit(txn_id).unwrap();
    }

    /// Queued commits come back from `take_pending` in arrival order.
    #[test]
    fn test_group_commit_buffer() {
        let buffer = GroupCommitBuffer::with_fsync_latency(10, 1000, 5000);

        // Stamp the flush time first; a fresh buffer starts with it at 0.
        let _ = buffer.take_pending();

        buffer.add(1);
        buffer.add(2);
        buffer.add(3);

        let pending = buffer.take_pending();
        assert_eq!(pending.len(), 3);
        assert_eq!(pending[0].txn_id, 1);
        assert_eq!(pending[1].txn_id, 2);
        assert_eq!(pending[2].txn_id, 3);
    }

    /// The adaptive batch size always stays within `1..=max_pending`.
    #[test]
    fn test_adaptive_batch_sizing() {
        let buffer = GroupCommitBuffer::with_fsync_latency(100, 10000, 5000);

        for i in 0..50 {
            buffer.add(i);
            std::thread::sleep(std::time::Duration::from_micros(100));
        }

        let stats = buffer.stats();
        assert!(stats.adaptive_batch_size >= 1);
        assert!(stats.adaptive_batch_size <= 100);
    }

    /// A transaction reads its own pending write, and commit applies it once.
    #[test]
    fn test_mvcc_basic_transaction() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("mvcc_test.wal");

        let writes = Arc::new(RwLock::new(Vec::new()));
        let writes_clone = writes.clone();

        let manager = MvccTransactionManager::new(wal_path, move |k, v| {
            writes_clone.write().push((k.to_vec(), v.to_vec()));
            Ok(())
        })
        .unwrap();

        let txn_id = manager.begin_default().unwrap();

        manager
            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();

        let value = manager.read(txn_id, b"key1").unwrap();
        assert_eq!(value, Some(b"value1".to_vec()));

        let commit_ts = manager.commit(txn_id).unwrap();
        assert!(commit_ts > 0);

        assert_eq!(writes.read().len(), 1);
    }

    /// Interleaves three transactions on one key and checks every call succeeds.
    #[test]
    fn test_mvcc_snapshot_isolation() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("mvcc_si_test.wal");

        let manager = MvccTransactionManager::new(wal_path, |_, _| Ok(())).unwrap();

        let txn1 = manager.begin_default().unwrap();
        manager
            .write(txn1, b"key1".to_vec(), b"v1".to_vec())
            .unwrap();
        manager.commit(txn1).unwrap();

        // txn2 begins before txn3 overwrites the key.
        let txn2 = manager.begin_default().unwrap();

        let txn3 = manager.begin_default().unwrap();
        manager
            .write(txn3, b"key1".to_vec(), b"v3".to_vec())
            .unwrap();
        manager.commit(txn3).unwrap();

        // Only the success of the read is checked; the observed value is not asserted.
        let _value = manager.read(txn2, b"key1").unwrap();

        manager.commit(txn2).unwrap();
    }

    /// An aborted MVCC transaction never reaches the storage callback.
    #[test]
    fn test_mvcc_abort() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("mvcc_abort_test.wal");

        let writes = Arc::new(RwLock::new(Vec::new()));
        let writes_clone = writes.clone();

        let manager = MvccTransactionManager::new(wal_path, move |k, v| {
            writes_clone.write().push((k.to_vec(), v.to_vec()));
            Ok(())
        })
        .unwrap();

        let txn_id = manager.begin_default().unwrap();
        manager
            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();

        manager.abort(txn_id).unwrap();

        assert!(writes.read().is_empty());
    }

    /// A reader sees the newest version whose creator committed before its snapshot.
    #[test]
    fn test_mvcc_version_visibility() {
        let mut chain = MvccVersionChain::default();
        // txn 1 committed at ts 10, txn 2 at ts 20.
        let committed: HashMap<u64, u64> = [(1, 10), (2, 20)].into_iter().collect();

        chain.add(MvccVersion::new(1, 5, b"v1".to_vec()));
        chain.add(MvccVersion::new(2, 15, b"v2".to_vec()));

        // Snapshot 15 is after txn 1's commit but before txn 2's.
        let visible = chain.get_visible_legacy(15, 99, &committed);
        assert_eq!(visible, Some(&b"v1".to_vec()));

        // Snapshot 25 is after both commits, so the newer version wins.
        let visible = chain.get_visible_legacy(25, 99, &committed);
        assert_eq!(visible, Some(&b"v2".to_vec()));
    }

    /// `gc` removes exactly the versions deleted before the cut-off.
    #[test]
    fn test_mvcc_version_gc() {
        let mut chain = MvccVersionChain::default();

        // Versions 0..=3 are deleted at ts 10, 20, 30, 40; version 4 stays live.
        for i in 0..5 {
            let mut version = MvccVersion::new(i, i * 10, vec![i as u8]);
            if i < 4 {
                version.mark_deleted(i + 1, (i + 1) * 10);
            }
            chain.add(version);
        }

        assert_eq!(chain.versions.len(), 5);

        // Cut-off 45 is above every delete timestamp, so only the live version survives.
        let gc_count = chain.gc(45);
        assert_eq!(gc_count, 4);
        assert_eq!(chain.versions.len(), 1);
        assert_eq!(chain.versions[0].value, vec![4u8]);
    }

    /// Four threads each run one transaction; none is left open afterwards.
    #[test]
    fn test_mvcc_concurrent_transactions() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("mvcc_concurrent_test.wal");

        let manager = Arc::new(MvccTransactionManager::new(wal_path, |_, _| Ok(())).unwrap());

        let handles: Vec<_> = (0..4)
            .map(|i| {
                let m = manager.clone();
                std::thread::spawn(move || {
                    let txn = m.begin_default().unwrap();
                    m.write(
                        txn,
                        format!("key{}", i).into_bytes(),
                        format!("value{}", i).into_bytes(),
                    )
                    .unwrap();
                    m.commit(txn).unwrap();
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(manager.active_count(), 0);
    }
}