1use crate::group_commit::EventDrivenGroupCommit;
77use crate::ssi::SsiManager;
78use crate::txn_wal::TxnWal;
79use dashmap::DashMap;
80use parking_lot::RwLock;
81use sochdb_core::{Result, SochDBError};
82use std::collections::HashMap;
83use std::path::Path;
84use std::sync::Arc;
85use std::sync::atomic::{AtomicU64, Ordering};
86
/// Lifecycle state of a [`Transaction`] tracked by [`WalStorageManager`].
///
/// Every transaction is constructed in the `Active` state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnState {
    /// The only state in which `WalStorageManager` accepts writes and commits.
    Active,
    /// Accepted by `WalStorageManager::abort` in addition to `Active`.
    /// NOTE(review): presumably the two-phase-commit "prepared" state — confirm.
    Prepared,
    /// NOTE(review): presumably marks a transaction whose commit finished — confirm with callers.
    Committed,
    /// NOTE(review): presumably marks a rolled-back transaction — confirm with callers.
    Aborted,
}
99
100#[derive(Debug)]
102pub struct Transaction {
103 pub id: u64,
105 pub start_ts: u64,
107 pub state: TxnState,
109 writes: Vec<(Vec<u8>, Vec<u8>)>,
111 reads: Vec<Vec<u8>>,
113}
114
115impl Transaction {
116 fn new(id: u64, start_ts: u64) -> Self {
117 Self {
118 id,
119 start_ts,
120 state: TxnState::Active,
121 writes: Vec::new(),
122 reads: Vec::new(),
123 }
124 }
125
126 pub fn write(&mut self, key: Vec<u8>, value: Vec<u8>) {
128 self.writes.push((key, value));
129 }
130
131 pub fn record_read(&mut self, key: Vec<u8>) {
133 self.reads.push(key);
134 }
135
136 pub fn writes(&self) -> &[(Vec<u8>, Vec<u8>)] {
138 &self.writes
139 }
140}
141
/// Transaction front-end that pairs a [`TxnWal`] with a caller-supplied apply
/// callback.
#[allow(clippy::type_complexity)]
pub struct WalStorageManager {
    /// Write-ahead log, shared through an `Arc`.
    wal: Arc<TxnWal>,
    /// In-flight transactions keyed by transaction id.
    active_txns: RwLock<HashMap<u64, Transaction>>,
    /// Counter that hands out start and commit timestamps; initialised to 1.
    timestamp: AtomicU64,
    /// Callback invoked with `(key, value)` for every write that is applied.
    apply_fn: Box<dyn Fn(&[u8], &[u8]) -> Result<()> + Send + Sync>,
}
156
157impl WalStorageManager {
158 pub fn new<P: AsRef<Path>, F>(wal_path: P, apply_fn: F) -> Result<Self>
160 where
161 F: Fn(&[u8], &[u8]) -> Result<()> + Send + Sync + 'static,
162 {
163 let wal = Arc::new(TxnWal::new(wal_path)?);
164
165 Ok(Self {
166 wal,
167 active_txns: RwLock::new(HashMap::new()),
168 timestamp: AtomicU64::new(1),
169 apply_fn: Box::new(apply_fn),
170 })
171 }
172
173 pub fn begin_txn(&self) -> Result<u64> {
175 let txn_id = self.wal.begin_transaction()?;
176 let start_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
177
178 let txn = Transaction::new(txn_id, start_ts);
179 self.active_txns.write().insert(txn_id, txn);
180
181 Ok(txn_id)
182 }
183
184 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
188 let mut txns = self.active_txns.write();
189 let txn = txns
190 .get_mut(&txn_id)
191 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
192
193 if txn.state != TxnState::Active {
194 return Err(SochDBError::InvalidArgument(
195 "Transaction not active".into(),
196 ));
197 }
198
199 txn.write(key, value);
200 Ok(())
201 }
202
203 pub fn write_immediate(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
207 {
209 let txns = self.active_txns.read();
210 let txn = txns
211 .get(&txn_id)
212 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
213
214 if txn.state != TxnState::Active {
215 return Err(SochDBError::InvalidArgument(
216 "Transaction not active".into(),
217 ));
218 }
219 }
220
221 self.wal.write(txn_id, key.clone(), value.clone())?;
223
224 (self.apply_fn)(&key, &value)?;
226
227 Ok(())
228 }
229
230 pub fn commit(&self, txn_id: u64) -> Result<u64> {
237 let txn = {
238 let mut txns = self.active_txns.write();
239 txns.remove(&txn_id)
240 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?
241 };
242
243 if txn.state != TxnState::Active {
244 return Err(SochDBError::InvalidArgument(
245 "Transaction not active".into(),
246 ));
247 }
248
249 for (key, value) in &txn.writes {
251 self.wal.write(txn_id, key.clone(), value.clone())?;
252 }
253
254 self.wal.commit_transaction(txn_id)?;
256
257 for (key, value) in &txn.writes {
259 (self.apply_fn)(key, value)?;
260 }
261
262 let commit_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
264 Ok(commit_ts)
265 }
266
267 pub fn abort(&self, txn_id: u64) -> Result<()> {
271 let mut txns = self.active_txns.write();
272 let txn = txns
273 .remove(&txn_id)
274 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
275
276 if txn.state != TxnState::Active && txn.state != TxnState::Prepared {
277 return Err(SochDBError::InvalidArgument(
278 "Transaction cannot be aborted".into(),
279 ));
280 }
281
282 self.wal.abort_transaction(txn_id)?;
284
285 Ok(())
287 }
288
289 pub fn recover(&self) -> Result<RecoveryStats> {
293 let (committed_writes, txn_count) = self.wal.replay_for_recovery()?;
294
295 for (key, value) in &committed_writes {
296 (self.apply_fn)(key, value)?;
297 }
298
299 Ok(RecoveryStats {
300 transactions_recovered: txn_count,
301 writes_applied: committed_writes.len(),
302 })
303 }
304
305 pub fn checkpoint(&self) -> Result<()> {
309 self.wal.write_checkpoint()?;
310 self.wal.truncate()?;
311 Ok(())
312 }
313
314 pub fn wal(&self) -> &Arc<TxnWal> {
316 &self.wal
317 }
318
319 pub fn current_timestamp(&self) -> u64 {
321 self.timestamp.load(Ordering::SeqCst)
322 }
323}
324
/// Summary returned by the `recover` methods in this file.
#[derive(Debug, Clone, Default)]
pub struct RecoveryStats {
    /// Transaction count reported by `TxnWal::replay_for_recovery`.
    pub transactions_recovered: usize,
    /// Number of committed `(key, value)` writes replayed through the apply callback.
    pub writes_applied: usize,
}
333
/// Isolation level requested when an MVCC transaction is started.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    /// NOTE(review): nothing in this file special-cases this level; reads go
    /// through the same snapshot-based path as `SnapshotIsolation` — confirm intent.
    ReadCommitted,
    /// Level used by `MvccTransactionManager::begin_default`.
    SnapshotIsolation,
    /// Additionally routes begin/read/write/commit/abort through the SSI manager.
    Serializable,
}
348
/// Per-transaction bookkeeping kept by `MvccTransactionManager`.
#[derive(Debug)]
pub struct MvccTransaction {
    /// Transaction identifier.
    pub txn_id: u64,
    /// Value of the manager's timestamp counter when the transaction began;
    /// used as the snapshot for visibility checks.
    pub snapshot_ts: u64,
    /// Current status; set to `Active` at creation.
    pub status: MvccTxnStatus,
    /// Keys read through the manager that were not served from `write_set`.
    pub read_set: std::collections::HashSet<Vec<u8>>,
    /// Buffered writes; a later write to the same key replaces the earlier value.
    pub write_set: HashMap<Vec<u8>, Vec<u8>>,
    /// Isolation level chosen at `begin`.
    pub isolation_level: IsolationLevel,
}
365
/// Status of an [`MvccTransaction`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MvccTxnStatus {
    /// The only status in which the manager accepts reads, writes, commit or abort.
    Active,
    /// NOTE(review): the payload is presumably the commit timestamp — confirm.
    Committed(u64),
    /// NOTE(review): presumably marks a rolled-back transaction — confirm with callers.
    Aborted,
}
373
374#[derive(Debug, Clone)]
376pub struct MvccVersion {
377 pub xmin: u64,
379 pub xmax: u64,
381 pub created_ts: u64,
383 pub deleted_ts: u64,
385 pub value: Vec<u8>,
387}
388
389impl MvccVersion {
390 pub fn new(xmin: u64, created_ts: u64, value: Vec<u8>) -> Self {
392 Self {
393 xmin,
394 xmax: 0,
395 created_ts,
396 deleted_ts: u64::MAX,
397 value,
398 }
399 }
400
401 pub fn mark_deleted(&mut self, xmax: u64, deleted_ts: u64) {
403 self.xmax = xmax;
404 self.deleted_ts = deleted_ts;
405 }
406
407 pub fn is_visible(
409 &self,
410 snapshot_ts: u64,
411 txn_id: u64,
412 committed_txns: &HashMap<u64, u64>,
413 ) -> bool {
414 if self.xmin == txn_id {
416 return self.xmax != txn_id; }
418
419 match committed_txns.get(&self.xmin) {
421 Some(&commit_ts) if commit_ts < snapshot_ts => {}
422 _ => return false, }
424
425 if self.xmax == 0 {
427 return true; }
429 if self.xmax == txn_id {
430 return false; }
432 match committed_txns.get(&self.xmax) {
433 Some(&commit_ts) => commit_ts >= snapshot_ts, None => true, }
436 }
437
438 pub fn is_visible_dashmap(
440 &self,
441 snapshot_ts: u64,
442 txn_id: u64,
443 committed_txns: &DashMap<u64, u64>,
444 ) -> bool {
445 if self.xmin == txn_id {
447 return self.xmax != txn_id; }
449
450 match committed_txns.get(&self.xmin) {
452 Some(commit_ts_ref) if *commit_ts_ref < snapshot_ts => {}
453 _ => return false, }
455
456 if self.xmax == 0 {
458 return true; }
460 if self.xmax == txn_id {
461 return false; }
463 match committed_txns.get(&self.xmax) {
464 Some(commit_ts_ref) => *commit_ts_ref >= snapshot_ts, None => true, }
467 }
468}
469
470#[derive(Debug, Default)]
472pub struct MvccVersionChain {
473 versions: Vec<MvccVersion>,
475}
476
477impl MvccVersionChain {
478 pub fn add(&mut self, version: MvccVersion) {
480 self.versions.insert(0, version);
481 }
482
483 pub fn get_visible(
486 &self,
487 snapshot_ts: u64,
488 txn_id: u64,
489 committed: &DashMap<u64, u64>,
490 ) -> Option<&Vec<u8>> {
491 for v in &self.versions {
492 if v.is_visible_dashmap(snapshot_ts, txn_id, committed) {
493 return Some(&v.value);
494 }
495 }
496 None
497 }
498
499 pub fn get_visible_legacy(
501 &self,
502 snapshot_ts: u64,
503 txn_id: u64,
504 committed: &HashMap<u64, u64>,
505 ) -> Option<&Vec<u8>> {
506 for v in &self.versions {
507 if v.is_visible(snapshot_ts, txn_id, committed) {
508 return Some(&v.value);
509 }
510 }
511 None
512 }
513
514 pub fn delete(&mut self, xmax: u64, deleted_ts: u64) -> bool {
516 if let Some(v) = self.versions.first_mut()
517 && v.xmax == 0
518 {
519 v.mark_deleted(xmax, deleted_ts);
520 return true;
521 }
522 false
523 }
524
525 pub fn gc(&mut self, min_visible_ts: u64) -> usize {
527 let old_len = self.versions.len();
528 if old_len <= 1 {
529 return 0;
530 }
531 self.versions.retain(|v| v.deleted_ts >= min_visible_ts);
532 if self.versions.is_empty() {
533 return old_len;
534 }
535 old_len - self.versions.len()
536 }
537}
538
/// MVCC transaction manager combining a WAL, per-key version chains, an SSI
/// manager for serializable transactions, and a group-commit component.
pub struct MvccTransactionManager {
    /// Write-ahead log; a clone of this `Arc` is also captured by the group-commit callback.
    wal: Arc<TxnWal>,
    /// Source of transaction ids; starts at 1.
    next_txn_id: AtomicU64,
    /// Logical timestamp counter used for snapshot, version and delete timestamps; starts at 1.
    timestamp: AtomicU64,
    /// In-flight transactions keyed by transaction id.
    active_txns: RwLock<HashMap<u64, MvccTransaction>>,
    /// Commit timestamp recorded for each committed transaction id.
    committed_txns: DashMap<u64, u64>,
    /// Version chain per key.
    versions: DashMap<Vec<u8>, MvccVersionChain>,
    /// Conflict tracking used only for `IsolationLevel::Serializable` transactions.
    ssi_manager: SsiManager,
    /// Component through which commits are submitted and awaited.
    group_commit: EventDrivenGroupCommit,
    /// Smallest `snapshot_ts` among active transactions, or `u64::MAX` when there are none.
    min_snapshot_ts: AtomicU64,
    /// Callback invoked with `(key, value)` for each committed or recovered write.
    #[allow(clippy::type_complexity)]
    apply_fn: Box<dyn Fn(&[u8], &[u8]) -> Result<()> + Send + Sync>,
}
580
581impl MvccTransactionManager {
582 pub fn new<P: AsRef<Path>, F>(wal_path: P, apply_fn: F) -> Result<Self>
584 where
585 F: Fn(&[u8], &[u8]) -> Result<()> + Send + Sync + 'static,
586 {
587 let wal = Arc::new(TxnWal::new(wal_path)?);
588 let wal_for_gc = wal.clone();
589
590 let group_commit = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
592 for &txn_id in txn_ids {
594 wal_for_gc
595 .commit_transaction(txn_id)
596 .map_err(|e| e.to_string())?;
597 }
598 let commit_ts = std::time::SystemTime::now()
599 .duration_since(std::time::UNIX_EPOCH)
600 .unwrap()
601 .as_micros() as u64;
602 Ok(commit_ts)
603 });
604
605 Ok(Self {
606 wal,
607 next_txn_id: AtomicU64::new(1),
608 timestamp: AtomicU64::new(1),
609 active_txns: RwLock::new(HashMap::new()),
610 committed_txns: DashMap::new(),
611 versions: DashMap::new(),
612 ssi_manager: SsiManager::new(),
613 group_commit,
614 min_snapshot_ts: AtomicU64::new(u64::MAX),
615 apply_fn: Box::new(apply_fn),
616 })
617 }
618
619 pub fn begin(&self, isolation_level: IsolationLevel) -> Result<u64> {
621 let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
622 let snapshot_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
623
624 self.wal.begin_transaction().ok(); let txn = MvccTransaction {
629 txn_id,
630 snapshot_ts,
631 status: MvccTxnStatus::Active,
632 read_set: std::collections::HashSet::new(),
633 write_set: HashMap::new(),
634 isolation_level,
635 };
636
637 self.active_txns.write().insert(txn_id, txn);
638
639 self.update_min_snapshot();
641
642 if isolation_level == IsolationLevel::Serializable {
649 self.ssi_manager.begin_with_id(txn_id).ok();
650 }
651
652 Ok(txn_id)
653 }
654
655 pub fn begin_default(&self) -> Result<u64> {
657 self.begin(IsolationLevel::SnapshotIsolation)
658 }
659
660 pub fn read(&self, txn_id: u64, key: &[u8]) -> Result<Option<Vec<u8>>> {
662 let mut txns = self.active_txns.write();
663 let txn = txns
664 .get_mut(&txn_id)
665 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
666
667 if txn.status != MvccTxnStatus::Active {
668 return Err(SochDBError::InvalidArgument(
669 "Transaction not active".into(),
670 ));
671 }
672
673 if let Some(value) = txn.write_set.get(key) {
675 return Ok(Some(value.clone()));
676 }
677
678 txn.read_set.insert(key.to_vec());
680
681 let snapshot_ts = txn.snapshot_ts;
682 let isolation = txn.isolation_level;
683 drop(txns);
684
685 if isolation == IsolationLevel::Serializable {
687 self.ssi_manager
688 .record_read(txn_id, key)
689 .map_err(|e| SochDBError::Internal(format!("SSI conflict: {}", e.message)))?;
690 }
691
692 if let Some(chain) = self.versions.get(key) {
694 Ok(chain
695 .get_visible(snapshot_ts, txn_id, &self.committed_txns)
696 .cloned())
697 } else {
698 Ok(None)
699 }
700 }
701
702 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
704 let mut txns = self.active_txns.write();
705 let txn = txns
706 .get_mut(&txn_id)
707 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
708
709 if txn.status != MvccTxnStatus::Active {
710 return Err(SochDBError::InvalidArgument(
711 "Transaction not active".into(),
712 ));
713 }
714
715 let isolation = txn.isolation_level;
716
717 if isolation == IsolationLevel::Serializable {
719 self.ssi_manager
720 .record_write(txn_id, &key)
721 .map_err(|e| SochDBError::Internal(format!("SSI conflict: {}", e.message)))?;
722 }
723
724 txn.write_set.insert(key, value);
726 Ok(())
727 }
728
729 pub fn commit(&self, txn_id: u64) -> Result<u64> {
731 let txn = {
733 let mut txns = self.active_txns.write();
734 txns.remove(&txn_id)
735 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?
736 };
737
738 if txn.status != MvccTxnStatus::Active {
739 return Err(SochDBError::InvalidArgument(
740 "Transaction not active".into(),
741 ));
742 }
743
744 if txn.isolation_level == IsolationLevel::Serializable {
746 self.ssi_manager
747 .commit(txn_id)
748 .map_err(|e| SochDBError::Internal(format!("SSI conflict: {}", e.message)))?;
749 }
750
751 for (key, value) in &txn.write_set {
753 self.wal.write(txn_id, key.clone(), value.clone())?;
754 }
755
756 let commit_ts = self
758 .group_commit
759 .submit_and_wait(txn_id)
760 .map_err(|e| SochDBError::Internal(format!("Group commit error: {}", e)))?;
761
762 let apply_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
764 for (key, value) in &txn.write_set {
765 self.versions
766 .entry(key.clone())
767 .or_default()
768 .add(MvccVersion::new(txn_id, apply_ts, value.clone()));
769 }
770
771 for (key, value) in &txn.write_set {
773 (self.apply_fn)(key, value)?;
774 }
775
776 self.committed_txns.insert(txn_id, commit_ts);
778
779 self.update_min_snapshot();
781
782 Ok(commit_ts)
783 }
784
785 pub fn abort(&self, txn_id: u64) -> Result<()> {
787 let txn = {
788 let mut txns = self.active_txns.write();
789 txns.remove(&txn_id)
790 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?
791 };
792
793 if txn.status != MvccTxnStatus::Active {
794 return Err(SochDBError::InvalidArgument(
795 "Transaction not active".into(),
796 ));
797 }
798
799 self.wal.abort_transaction(txn_id)?;
801
802 if txn.isolation_level == IsolationLevel::Serializable {
804 self.ssi_manager.abort(txn_id);
805 }
806
807 self.update_min_snapshot();
809 Ok(())
810 }
811
812 pub fn delete(&self, txn_id: u64, key: &[u8]) -> Result<bool> {
814 let txns = self.active_txns.read();
815 let txn = txns
816 .get(&txn_id)
817 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
818
819 if txn.status != MvccTxnStatus::Active {
820 return Err(SochDBError::InvalidArgument(
821 "Transaction not active".into(),
822 ));
823 }
824
825 drop(txns);
826
827 let deleted_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
828
829 if let Some(mut chain) = self.versions.get_mut(key) {
831 Ok(chain.delete(txn_id, deleted_ts))
832 } else {
833 Ok(false)
834 }
835 }
836
837 pub fn gc(&self) -> usize {
839 let min_ts = self.min_snapshot_ts.load(Ordering::SeqCst);
840 let mut total_gc = 0;
841
842 for mut entry in self.versions.iter_mut() {
844 total_gc += entry.value_mut().gc(min_ts);
845 }
846
847 self.committed_txns.retain(|_, ts| *ts >= min_ts);
849
850 total_gc += self.ssi_manager.gc(min_ts);
852
853 total_gc
854 }
855
856 fn update_min_snapshot(&self) {
858 let txns = self.active_txns.read();
859 let min = txns
860 .values()
861 .map(|t| t.snapshot_ts)
862 .min()
863 .unwrap_or(u64::MAX);
864 self.min_snapshot_ts.store(min, Ordering::SeqCst);
865 }
866
867 pub fn recover(&self) -> Result<RecoveryStats> {
869 let (committed_writes, txn_count) = self.wal.replay_for_recovery()?;
870
871 for (key, value) in &committed_writes {
872 (self.apply_fn)(key, value)?;
873 }
874
875 Ok(RecoveryStats {
876 transactions_recovered: txn_count,
877 writes_applied: committed_writes.len(),
878 })
879 }
880
881 pub fn current_timestamp(&self) -> u64 {
883 self.timestamp.load(Ordering::SeqCst)
884 }
885
886 pub fn active_count(&self) -> usize {
888 self.active_txns.read().len()
889 }
890}
891
/// Buffer of pending commits with an adaptively sized flush batch.
pub struct GroupCommitBuffer {
    /// Commits queued since the last `take_pending`.
    pending: RwLock<Vec<PendingCommit>>,
    /// Upper bound applied to the batch-size target.
    max_pending: usize,
    /// Microseconds since the last flush after which `add` asks for a flush.
    max_wait_us: u64,
    /// Wall-clock time (µs since the Unix epoch) of the last `take_pending`; 0 before the first.
    last_flush: AtomicU64,
    /// Smoothed arrival rate, stored as arrivals per second multiplied by 1000.
    arrival_rate_ema: AtomicU64,
    /// Wall-clock time (µs since the Unix epoch) of the most recent `add`; 0 before the first.
    last_arrival: AtomicU64,
    /// Smoothed fsync latency in microseconds; seeded with 5000.
    fsync_latency_us: AtomicU64,
    /// Current batch-size target; seeded with 10.
    adaptive_batch_size: AtomicU64,
}
915
/// A commit waiting in a [`GroupCommitBuffer`].
#[derive(Debug, Clone)]
pub struct PendingCommit {
    /// Id of the transaction waiting to be committed.
    pub txn_id: u64,
    /// Wall-clock time, in microseconds since the Unix epoch, at which the commit was queued.
    pub enqueue_time_us: u64,
}
922
923impl GroupCommitBuffer {
924 pub fn new(max_pending: usize, max_wait_us: u64) -> Self {
926 Self {
927 pending: RwLock::new(Vec::with_capacity(max_pending)),
928 max_pending,
929 max_wait_us,
930 last_flush: AtomicU64::new(0),
931 arrival_rate_ema: AtomicU64::new(100_000), last_arrival: AtomicU64::new(0),
933 fsync_latency_us: AtomicU64::new(5000), adaptive_batch_size: AtomicU64::new(10), }
936 }
937
938 pub fn with_fsync_latency(max_pending: usize, max_wait_us: u64, fsync_latency_us: u64) -> Self {
940 let buffer = Self::new(max_pending, max_wait_us);
941 buffer
942 .fsync_latency_us
943 .store(fsync_latency_us, Ordering::Relaxed);
944 buffer.recompute_batch_size();
945 buffer
946 }
947
948 fn now_us() -> u64 {
949 std::time::SystemTime::now()
950 .duration_since(std::time::UNIX_EPOCH)
951 .unwrap()
952 .as_micros() as u64
953 }
954
955 fn update_arrival_rate(&self) {
957 let now = Self::now_us();
958 let last = self.last_arrival.swap(now, Ordering::Relaxed);
959
960 if last > 0 {
961 let delta_us = now.saturating_sub(last);
962 if delta_us > 0 {
963 let instant_rate = 1_000_000_000 / delta_us;
966
967 let old_rate = self.arrival_rate_ema.load(Ordering::Relaxed);
969 let new_rate = (old_rate * 9 + instant_rate) / 10;
970 self.arrival_rate_ema.store(new_rate, Ordering::Relaxed);
971 }
972 }
973 }
974
975 fn recompute_batch_size(&self) {
980 let lambda = self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0; let l_fsync = self.fsync_latency_us.load(Ordering::Relaxed) as f64; let c_wait = 1.0; let l_fsync_s = l_fsync / 1_000_000.0;
987 let n_opt = (2.0 * l_fsync_s * lambda / c_wait).sqrt();
988
989 let batch_size = n_opt.clamp(1.0, self.max_pending as f64) as u64;
990 self.adaptive_batch_size
991 .store(batch_size, Ordering::Relaxed);
992 }
993
994 pub fn add(&self, txn_id: u64) -> bool {
998 self.update_arrival_rate();
999
1000 let now = Self::now_us();
1001 let commit = PendingCommit {
1002 txn_id,
1003 enqueue_time_us: now,
1004 };
1005
1006 let mut pending = self.pending.write();
1007 pending.push(commit);
1008
1009 let adaptive_size = self.adaptive_batch_size.load(Ordering::Relaxed) as usize;
1010 let target_size = adaptive_size.max(1).min(self.max_pending);
1011
1012 if pending.len() >= target_size {
1013 return true;
1014 }
1015
1016 let last = self.last_flush.load(Ordering::Relaxed);
1018 if now - last > self.max_wait_us {
1019 return true;
1020 }
1021
1022 false
1023 }
1024
1025 pub fn take_pending(&self) -> Vec<PendingCommit> {
1027 let mut pending = self.pending.write();
1028 let result = std::mem::take(&mut *pending);
1029
1030 let now = Self::now_us();
1031 self.last_flush.store(now, Ordering::Relaxed);
1032
1033 self.recompute_batch_size();
1035
1036 result
1037 }
1038
1039 pub fn record_fsync_latency(&self, latency_us: u64) {
1041 let old = self.fsync_latency_us.load(Ordering::Relaxed);
1043 let new = (old * 4 + latency_us) / 5;
1044 self.fsync_latency_us.store(new, Ordering::Relaxed);
1045
1046 self.recompute_batch_size();
1048 }
1049
1050 pub fn current_batch_size(&self) -> usize {
1052 self.adaptive_batch_size.load(Ordering::Relaxed) as usize
1053 }
1054
1055 pub fn current_arrival_rate(&self) -> f64 {
1057 self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0
1058 }
1059
1060 pub fn stats(&self) -> GroupCommitStats {
1062 GroupCommitStats {
1063 adaptive_batch_size: self.adaptive_batch_size.load(Ordering::Relaxed) as usize,
1064 arrival_rate: self.current_arrival_rate(),
1065 fsync_latency_us: self.fsync_latency_us.load(Ordering::Relaxed),
1066 pending_count: self.pending.read().len(),
1067 }
1068 }
1069}
1070
/// Snapshot of a [`GroupCommitBuffer`]'s tuning values, as returned by `stats`.
#[derive(Debug, Clone)]
pub struct GroupCommitStats {
    /// Current batch-size target.
    pub adaptive_batch_size: usize,
    /// Smoothed arrival rate in arrivals per second.
    pub arrival_rate: f64,
    /// Smoothed fsync latency in microseconds.
    pub fsync_latency_us: u64,
    /// Number of commits queued when the snapshot was taken.
    pub pending_count: usize,
}
1083
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use tempfile::tempdir;

    /// Buffered writes reach the apply callback only at commit, in insertion order.
    #[test]
    fn test_basic_transaction() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        let writes = Arc::new(RwLock::new(Vec::new()));
        let writes_clone = writes.clone();

        let manager = WalStorageManager::new(wal_path, move |k, v| {
            writes_clone.write().push((k.to_vec(), v.to_vec()));
            Ok(())
        })
        .unwrap();

        let txn_id = manager.begin_txn().unwrap();

        manager
            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();
        manager
            .write(txn_id, b"key2".to_vec(), b"value2".to_vec())
            .unwrap();

        // Nothing is applied before commit.
        assert!(writes.read().is_empty());

        manager.commit(txn_id).unwrap();

        let applied = writes.read();
        assert_eq!(applied.len(), 2);
        assert_eq!(applied[0], (b"key1".to_vec(), b"value1".to_vec()));
        assert_eq!(applied[1], (b"key2".to_vec(), b"value2".to_vec()));
    }

    /// An aborted transaction never reaches the apply callback.
    #[test]
    fn test_abort_transaction() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        let writes = Arc::new(RwLock::new(Vec::new()));
        let writes_clone = writes.clone();

        let manager = WalStorageManager::new(wal_path, move |k, v| {
            writes_clone.write().push((k.to_vec(), v.to_vec()));
            Ok(())
        })
        .unwrap();

        let txn_id = manager.begin_txn().unwrap();
        manager
            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();

        manager.abort(txn_id).unwrap();

        assert!(writes.read().is_empty());
    }

    /// `write_immediate` invokes the apply callback before commit.
    #[test]
    fn test_immediate_write() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        let write_count = Arc::new(AtomicUsize::new(0));
        let count_clone = write_count.clone();

        let manager = WalStorageManager::new(wal_path, move |_, _| {
            count_clone.fetch_add(1, Ordering::SeqCst);
            Ok(())
        })
        .unwrap();

        let txn_id = manager.begin_txn().unwrap();

        manager
            .write_immediate(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();
        assert_eq!(write_count.load(Ordering::SeqCst), 1);

        manager.commit(txn_id).unwrap();
    }

    /// Queued commits are returned by `take_pending` in arrival order.
    #[test]
    fn test_group_commit_buffer() {
        let buffer = GroupCommitBuffer::with_fsync_latency(10, 1000, 5000);

        // Stamp `last_flush` so the elapsed-time check starts from now.
        let _ = buffer.take_pending();

        buffer.add(1);
        buffer.add(2);
        buffer.add(3);

        let pending = buffer.take_pending();
        assert_eq!(pending.len(), 3);
        assert_eq!(pending[0].txn_id, 1);
        assert_eq!(pending[1].txn_id, 2);
        assert_eq!(pending[2].txn_id, 3);
    }

    /// The batch-size target never drops below 1 under a steady arrival stream.
    #[test]
    fn test_adaptive_batch_sizing() {
        let buffer = GroupCommitBuffer::with_fsync_latency(100, 10000, 5000);

        for i in 0..50 {
            buffer.add(i);
            std::thread::sleep(std::time::Duration::from_micros(100));
        }

        let stats = buffer.stats();
        assert!(stats.adaptive_batch_size >= 1);
    }

    /// A transaction reads its own buffered write and applies it on commit.
    #[test]
    fn test_mvcc_basic_transaction() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("mvcc_test.wal");

        let writes = Arc::new(RwLock::new(Vec::new()));
        let writes_clone = writes.clone();

        let manager = MvccTransactionManager::new(wal_path, move |k, v| {
            writes_clone.write().push((k.to_vec(), v.to_vec()));
            Ok(())
        })
        .unwrap();

        let txn_id = manager.begin_default().unwrap();

        manager
            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();

        // Read-your-own-writes.
        let value = manager.read(txn_id, b"key1").unwrap();
        assert_eq!(value, Some(b"value1".to_vec()));

        let commit_ts = manager.commit(txn_id).unwrap();
        assert!(commit_ts > 0);

        assert_eq!(writes.read().len(), 1);
    }

    /// Interleaves three transactions on one key and checks that every call succeeds.
    #[test]
    fn test_mvcc_snapshot_isolation() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("mvcc_si_test.wal");

        let manager = MvccTransactionManager::new(wal_path, |_, _| Ok(())).unwrap();

        let txn1 = manager.begin_default().unwrap();
        manager
            .write(txn1, b"key1".to_vec(), b"v1".to_vec())
            .unwrap();
        manager.commit(txn1).unwrap();

        let txn2 = manager.begin_default().unwrap();

        let txn3 = manager.begin_default().unwrap();
        manager
            .write(txn3, b"key1".to_vec(), b"v3".to_vec())
            .unwrap();
        manager.commit(txn3).unwrap();

        // NOTE(review): the value read here is not asserted; pin the expected
        // snapshot result once the intended visibility semantics are confirmed.
        let _value = manager.read(txn2, b"key1").unwrap();

        manager.commit(txn2).unwrap();
    }

    /// A serializable read succeeds even when an earlier non-serializable
    /// transaction has consumed a transaction id.
    #[test]
    fn test_ssi_txn_id_divergence_serializable_read() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("ssi_divergence.wal");
        let manager = MvccTransactionManager::new(wal_path, |_, _| Ok(())).unwrap();

        let _si = manager.begin(IsolationLevel::SnapshotIsolation).unwrap();

        let ser = manager.begin(IsolationLevel::Serializable).unwrap();

        let res = manager.read(ser, b"key");
        assert!(
            res.is_ok(),
            "Serializable read failed due to SSI txn-id divergence: {:?}",
            res.err()
        );
    }

    /// An aborted MVCC transaction never reaches the apply callback.
    #[test]
    fn test_mvcc_abort() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("mvcc_abort_test.wal");

        let writes = Arc::new(RwLock::new(Vec::new()));
        let writes_clone = writes.clone();

        let manager = MvccTransactionManager::new(wal_path, move |k, v| {
            writes_clone.write().push((k.to_vec(), v.to_vec()));
            Ok(())
        })
        .unwrap();

        let txn_id = manager.begin_default().unwrap();
        manager
            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();

        manager.abort(txn_id).unwrap();

        assert!(writes.read().is_empty());
    }

    /// A snapshot sees the newest version whose creator committed before it.
    #[test]
    fn test_mvcc_version_visibility() {
        let mut chain = MvccVersionChain::default();
        let committed: HashMap<u64, u64> = [(1, 10), (2, 20)].into_iter().collect();

        chain.add(MvccVersion::new(1, 5, b"v1".to_vec()));

        chain.add(MvccVersion::new(2, 15, b"v2".to_vec()));

        // Snapshot 15: txn 2 (committed at 20) is not yet visible.
        let visible = chain.get_visible_legacy(15, 99, &committed);
        assert_eq!(visible, Some(&b"v1".to_vec()));

        // Snapshot 25: txn 2 is visible and is the newest version.
        let visible = chain.get_visible_legacy(25, 99, &committed);
        assert_eq!(visible, Some(&b"v2".to_vec()));
    }

    /// GC removes exactly the versions deleted before the cutoff.
    #[test]
    fn test_mvcc_version_gc() {
        let mut chain = MvccVersionChain::default();

        for i in 0..5 {
            let mut version = MvccVersion::new(i, i * 10, vec![i as u8]);
            if i < 4 {
                version.mark_deleted(i + 1, (i + 1) * 10);
            }
            chain.add(version);
        }

        assert_eq!(chain.versions.len(), 5);

        let gc_count = chain.gc(45);
        // Versions 0..=3 carry deleted_ts 10, 20, 30 and 40 (all below 45) and
        // are reclaimed; version 4 was never deleted and must survive.
        assert_eq!(gc_count, 4);
        assert_eq!(chain.versions.len(), 1);
        assert_eq!(chain.versions[0].xmin, 4);
    }

    /// Four threads commit disjoint keys; no transaction is left active.
    #[test]
    fn test_mvcc_concurrent_transactions() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("mvcc_concurrent_test.wal");

        let manager = Arc::new(MvccTransactionManager::new(wal_path, |_, _| Ok(())).unwrap());

        let handles: Vec<_> = (0..4)
            .map(|i| {
                let m = manager.clone();
                std::thread::spawn(move || {
                    let txn = m.begin_default().unwrap();
                    m.write(
                        txn,
                        format!("key{}", i).into_bytes(),
                        format!("value{}", i).into_bytes(),
                    )
                    .unwrap();
                    m.commit(txn).unwrap();
                })
            })
            .collect();

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(manager.active_count(), 0);
    }
}