1use crate::group_commit::EventDrivenGroupCommit;
77use crate::ssi::SsiManager;
78use crate::txn_wal::TxnWal;
79use dashmap::DashMap;
80use parking_lot::RwLock;
81use sochdb_core::{Result, SochDBError};
82use std::collections::HashMap;
83use std::path::Path;
84use std::sync::Arc;
85use std::sync::atomic::{AtomicU64, Ordering};
86
/// Lifecycle state of a [`Transaction`] tracked by `WalStorageManager`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnState {
    /// Open; writes are accepted.
    Active,
    /// Prepared. Nothing in this file moves a transaction here, but `abort` accepts it.
    Prepared,
    /// Committed.
    Committed,
    /// Aborted.
    Aborted,
}

/// A transaction whose writes are buffered in memory until commit.
#[derive(Debug)]
pub struct Transaction {
    /// Transaction id.
    pub id: u64,
    /// Logical timestamp taken when the transaction began.
    pub start_ts: u64,
    /// Current lifecycle state.
    pub state: TxnState,
    /// Buffered `(key, value)` pairs, kept in the order they were written.
    writes: Vec<(Vec<u8>, Vec<u8>)>,
    /// Keys passed to [`Transaction::record_read`], in call order.
    reads: Vec<Vec<u8>>,
}

impl Transaction {
    /// Creates an empty transaction in the `Active` state.
    fn new(id: u64, start_ts: u64) -> Self {
        let state = TxnState::Active;
        let (writes, reads) = (Vec::new(), Vec::new());
        Transaction {
            id,
            start_ts,
            state,
            writes,
            reads,
        }
    }

    /// Appends a buffered write. Repeated writes to one key are all kept, not merged.
    pub fn write(&mut self, key: Vec<u8>, value: Vec<u8>) {
        let entry = (key, value);
        self.writes.push(entry);
    }

    /// Records that `key` was read by this transaction.
    pub fn record_read(&mut self, key: Vec<u8>) {
        self.reads.push(key)
    }

    /// Returns the buffered writes in the order they were made.
    pub fn writes(&self) -> &[(Vec<u8>, Vec<u8>)] {
        self.writes.as_slice()
    }
}
141
142#[allow(clippy::type_complexity)]
146pub struct WalStorageManager {
147 wal: Arc<TxnWal>,
149 active_txns: RwLock<HashMap<u64, Transaction>>,
151 timestamp: AtomicU64,
153 apply_fn: Box<dyn Fn(&[u8], &[u8]) -> Result<()> + Send + Sync>,
155}
156
157impl WalStorageManager {
158 pub fn new<P: AsRef<Path>, F>(wal_path: P, apply_fn: F) -> Result<Self>
160 where
161 F: Fn(&[u8], &[u8]) -> Result<()> + Send + Sync + 'static,
162 {
163 let wal = Arc::new(TxnWal::new(wal_path)?);
164
165 Ok(Self {
166 wal,
167 active_txns: RwLock::new(HashMap::new()),
168 timestamp: AtomicU64::new(1),
169 apply_fn: Box::new(apply_fn),
170 })
171 }
172
173 pub fn begin_txn(&self) -> Result<u64> {
175 let txn_id = self.wal.begin_transaction()?;
176 let start_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
177
178 let txn = Transaction::new(txn_id, start_ts);
179 self.active_txns.write().insert(txn_id, txn);
180
181 Ok(txn_id)
182 }
183
184 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
188 let mut txns = self.active_txns.write();
189 let txn = txns
190 .get_mut(&txn_id)
191 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
192
193 if txn.state != TxnState::Active {
194 return Err(SochDBError::InvalidArgument(
195 "Transaction not active".into(),
196 ));
197 }
198
199 txn.write(key, value);
200 Ok(())
201 }
202
203 pub fn write_immediate(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
207 {
209 let txns = self.active_txns.read();
210 let txn = txns
211 .get(&txn_id)
212 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
213
214 if txn.state != TxnState::Active {
215 return Err(SochDBError::InvalidArgument(
216 "Transaction not active".into(),
217 ));
218 }
219 }
220
221 self.wal.write(txn_id, key.clone(), value.clone())?;
223
224 (self.apply_fn)(&key, &value)?;
226
227 Ok(())
228 }
229
230 pub fn commit(&self, txn_id: u64) -> Result<u64> {
237 let txn = {
238 let mut txns = self.active_txns.write();
239 txns.remove(&txn_id)
240 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?
241 };
242
243 if txn.state != TxnState::Active {
244 return Err(SochDBError::InvalidArgument(
245 "Transaction not active".into(),
246 ));
247 }
248
249 for (key, value) in &txn.writes {
251 self.wal.write(txn_id, key.clone(), value.clone())?;
252 }
253
254 self.wal.commit_transaction(txn_id)?;
256
257 for (key, value) in &txn.writes {
259 (self.apply_fn)(key, value)?;
260 }
261
262 let commit_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
264 Ok(commit_ts)
265 }
266
267 pub fn abort(&self, txn_id: u64) -> Result<()> {
271 let mut txns = self.active_txns.write();
272 let txn = txns
273 .remove(&txn_id)
274 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
275
276 if txn.state != TxnState::Active && txn.state != TxnState::Prepared {
277 return Err(SochDBError::InvalidArgument(
278 "Transaction cannot be aborted".into(),
279 ));
280 }
281
282 self.wal.abort_transaction(txn_id)?;
284
285 Ok(())
287 }
288
289 pub fn recover(&self) -> Result<RecoveryStats> {
293 let (committed_writes, txn_count) = self.wal.replay_for_recovery()?;
294
295 for (key, value) in &committed_writes {
296 (self.apply_fn)(key, value)?;
297 }
298
299 Ok(RecoveryStats {
300 transactions_recovered: txn_count,
301 writes_applied: committed_writes.len(),
302 })
303 }
304
305 pub fn checkpoint(&self) -> Result<()> {
309 self.wal.write_checkpoint()?;
310 self.wal.truncate()?;
311 Ok(())
312 }
313
314 pub fn wal(&self) -> &Arc<TxnWal> {
316 &self.wal
317 }
318
319 pub fn current_timestamp(&self) -> u64 {
321 self.timestamp.load(Ordering::SeqCst)
322 }
323}
324
/// Counters reported by a `recover()` call after replaying the WAL.
#[derive(Debug, Clone, Default)]
pub struct RecoveryStats {
    /// Transaction count reported by the WAL replay.
    pub transactions_recovered: usize,
    /// Number of committed writes handed to `apply_fn` during replay.
    pub writes_applied: usize,
}
333
/// Isolation level requested when an MVCC transaction begins.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    /// Read committed.
    ///
    /// NOTE(review): `MvccTransactionManager` currently gives this the same snapshot reads
    /// as `SnapshotIsolation`; only `Serializable` is treated specially.
    ReadCommitted,
    /// Snapshot isolation; the level used by `begin_default`.
    SnapshotIsolation,
    /// Serializable: reads, writes and commits are also tracked by the SSI manager.
    Serializable,
}

/// Per-transaction bookkeeping kept by `MvccTransactionManager`.
#[derive(Debug)]
pub struct MvccTransaction {
    /// Transaction id.
    pub txn_id: u64,
    /// Logical timestamp of the snapshot this transaction reads from.
    pub snapshot_ts: u64,
    /// Current status.
    pub status: MvccTxnStatus,
    /// Keys read from the version store. Reads answered from `write_set` are not recorded.
    pub read_set: std::collections::HashSet<Vec<u8>>,
    /// Pending writes; a later write to the same key replaces the earlier value.
    pub write_set: HashMap<Vec<u8>, Vec<u8>>,
    /// Isolation level chosen at `begin`.
    pub isolation_level: IsolationLevel,
}

/// Status of an [`MvccTransaction`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MvccTxnStatus {
    /// Still running.
    Active,
    /// Committed, carrying a commit timestamp.
    Committed(u64),
    /// Aborted.
    Aborted,
}
373
374#[derive(Debug, Clone)]
376pub struct MvccVersion {
377 pub xmin: u64,
379 pub xmax: u64,
381 pub created_ts: u64,
383 pub deleted_ts: u64,
385 pub value: Vec<u8>,
387}
388
389impl MvccVersion {
390 pub fn new(xmin: u64, created_ts: u64, value: Vec<u8>) -> Self {
392 Self {
393 xmin,
394 xmax: 0,
395 created_ts,
396 deleted_ts: u64::MAX,
397 value,
398 }
399 }
400
401 pub fn mark_deleted(&mut self, xmax: u64, deleted_ts: u64) {
403 self.xmax = xmax;
404 self.deleted_ts = deleted_ts;
405 }
406
407 pub fn is_visible(
409 &self,
410 snapshot_ts: u64,
411 txn_id: u64,
412 committed_txns: &HashMap<u64, u64>,
413 ) -> bool {
414 if self.xmin == txn_id {
416 return self.xmax != txn_id; }
418
419 match committed_txns.get(&self.xmin) {
421 Some(&commit_ts) if commit_ts < snapshot_ts => {}
422 _ => return false, }
424
425 if self.xmax == 0 {
427 return true; }
429 if self.xmax == txn_id {
430 return false; }
432 match committed_txns.get(&self.xmax) {
433 Some(&commit_ts) => commit_ts >= snapshot_ts, None => true, }
436 }
437
438 pub fn is_visible_dashmap(
440 &self,
441 snapshot_ts: u64,
442 txn_id: u64,
443 committed_txns: &DashMap<u64, u64>,
444 ) -> bool {
445 if self.xmin == txn_id {
447 return self.xmax != txn_id; }
449
450 match committed_txns.get(&self.xmin) {
452 Some(commit_ts_ref) if *commit_ts_ref < snapshot_ts => {}
453 _ => return false, }
455
456 if self.xmax == 0 {
458 return true; }
460 if self.xmax == txn_id {
461 return false; }
463 match committed_txns.get(&self.xmax) {
464 Some(commit_ts_ref) => *commit_ts_ref >= snapshot_ts, None => true, }
467 }
468}
469
470#[derive(Debug, Default)]
472pub struct MvccVersionChain {
473 versions: Vec<MvccVersion>,
475}
476
477impl MvccVersionChain {
478 pub fn add(&mut self, version: MvccVersion) {
480 self.versions.insert(0, version);
481 }
482
483 pub fn get_visible(
486 &self,
487 snapshot_ts: u64,
488 txn_id: u64,
489 committed: &DashMap<u64, u64>,
490 ) -> Option<&Vec<u8>> {
491 for v in &self.versions {
492 if v.is_visible_dashmap(snapshot_ts, txn_id, committed) {
493 return Some(&v.value);
494 }
495 }
496 None
497 }
498
499 pub fn get_visible_legacy(
501 &self,
502 snapshot_ts: u64,
503 txn_id: u64,
504 committed: &HashMap<u64, u64>,
505 ) -> Option<&Vec<u8>> {
506 for v in &self.versions {
507 if v.is_visible(snapshot_ts, txn_id, committed) {
508 return Some(&v.value);
509 }
510 }
511 None
512 }
513
514 pub fn delete(&mut self, xmax: u64, deleted_ts: u64) -> bool {
516 if let Some(v) = self.versions.first_mut()
517 && v.xmax == 0
518 {
519 v.mark_deleted(xmax, deleted_ts);
520 return true;
521 }
522 false
523 }
524
525 pub fn gc(&mut self, min_visible_ts: u64) -> usize {
527 let old_len = self.versions.len();
528 if old_len <= 1 {
529 return 0;
530 }
531 self.versions.retain(|v| v.deleted_ts >= min_visible_ts);
532 if self.versions.is_empty() {
533 return old_len;
534 }
535 old_len - self.versions.len()
536 }
537}
538
539pub struct MvccTransactionManager {
552 wal: Arc<TxnWal>,
554 next_txn_id: AtomicU64,
556 timestamp: AtomicU64,
558 active_txns: RwLock<HashMap<u64, MvccTransaction>>,
560 committed_txns: DashMap<u64, u64>,
562 versions: DashMap<Vec<u8>, MvccVersionChain>,
564 ssi_manager: SsiManager,
566 group_commit: EventDrivenGroupCommit,
568 min_snapshot_ts: AtomicU64,
570 #[allow(clippy::type_complexity)]
572 apply_fn: Box<dyn Fn(&[u8], &[u8]) -> Result<()> + Send + Sync>,
573}
574
575impl MvccTransactionManager {
576 pub fn new<P: AsRef<Path>, F>(wal_path: P, apply_fn: F) -> Result<Self>
578 where
579 F: Fn(&[u8], &[u8]) -> Result<()> + Send + Sync + 'static,
580 {
581 let wal = Arc::new(TxnWal::new(wal_path)?);
582 let wal_for_gc = wal.clone();
583
584 let group_commit = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
586 for &txn_id in txn_ids {
588 wal_for_gc
589 .commit_transaction(txn_id)
590 .map_err(|e| e.to_string())?;
591 }
592 let commit_ts = std::time::SystemTime::now()
593 .duration_since(std::time::UNIX_EPOCH)
594 .unwrap()
595 .as_micros() as u64;
596 Ok(commit_ts)
597 });
598
599 Ok(Self {
600 wal,
601 next_txn_id: AtomicU64::new(1),
602 timestamp: AtomicU64::new(1),
603 active_txns: RwLock::new(HashMap::new()),
604 committed_txns: DashMap::new(),
605 versions: DashMap::new(),
606 ssi_manager: SsiManager::new(),
607 group_commit,
608 min_snapshot_ts: AtomicU64::new(u64::MAX),
609 apply_fn: Box::new(apply_fn),
610 })
611 }
612
613 pub fn begin(&self, isolation_level: IsolationLevel) -> Result<u64> {
615 let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
616 let snapshot_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
617
618 self.wal.begin_transaction().ok(); let txn = MvccTransaction {
623 txn_id,
624 snapshot_ts,
625 status: MvccTxnStatus::Active,
626 read_set: std::collections::HashSet::new(),
627 write_set: HashMap::new(),
628 isolation_level,
629 };
630
631 self.active_txns.write().insert(txn_id, txn);
632
633 self.update_min_snapshot();
635
636 if isolation_level == IsolationLevel::Serializable {
638 self.ssi_manager.begin().ok();
639 }
640
641 Ok(txn_id)
642 }
643
644 pub fn begin_default(&self) -> Result<u64> {
646 self.begin(IsolationLevel::SnapshotIsolation)
647 }
648
649 pub fn read(&self, txn_id: u64, key: &[u8]) -> Result<Option<Vec<u8>>> {
651 let mut txns = self.active_txns.write();
652 let txn = txns
653 .get_mut(&txn_id)
654 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
655
656 if txn.status != MvccTxnStatus::Active {
657 return Err(SochDBError::InvalidArgument(
658 "Transaction not active".into(),
659 ));
660 }
661
662 if let Some(value) = txn.write_set.get(key) {
664 return Ok(Some(value.clone()));
665 }
666
667 txn.read_set.insert(key.to_vec());
669
670 let snapshot_ts = txn.snapshot_ts;
671 let isolation = txn.isolation_level;
672 drop(txns);
673
674 if isolation == IsolationLevel::Serializable {
676 self.ssi_manager
677 .record_read(txn_id, key)
678 .map_err(|e| SochDBError::Internal(format!("SSI conflict: {}", e.message)))?;
679 }
680
681 if let Some(chain) = self.versions.get(key) {
683 Ok(chain
684 .get_visible(snapshot_ts, txn_id, &self.committed_txns)
685 .cloned())
686 } else {
687 Ok(None)
688 }
689 }
690
691 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
693 let mut txns = self.active_txns.write();
694 let txn = txns
695 .get_mut(&txn_id)
696 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
697
698 if txn.status != MvccTxnStatus::Active {
699 return Err(SochDBError::InvalidArgument(
700 "Transaction not active".into(),
701 ));
702 }
703
704 let isolation = txn.isolation_level;
705
706 if isolation == IsolationLevel::Serializable {
708 self.ssi_manager
709 .record_write(txn_id, &key)
710 .map_err(|e| SochDBError::Internal(format!("SSI conflict: {}", e.message)))?;
711 }
712
713 txn.write_set.insert(key, value);
715 Ok(())
716 }
717
718 pub fn commit(&self, txn_id: u64) -> Result<u64> {
720 let txn = {
722 let mut txns = self.active_txns.write();
723 txns.remove(&txn_id)
724 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?
725 };
726
727 if txn.status != MvccTxnStatus::Active {
728 return Err(SochDBError::InvalidArgument(
729 "Transaction not active".into(),
730 ));
731 }
732
733 if txn.isolation_level == IsolationLevel::Serializable {
735 self.ssi_manager
736 .commit(txn_id)
737 .map_err(|e| SochDBError::Internal(format!("SSI conflict: {}", e.message)))?;
738 }
739
740 for (key, value) in &txn.write_set {
742 self.wal.write(txn_id, key.clone(), value.clone())?;
743 }
744
745 let commit_ts = self
747 .group_commit
748 .submit_and_wait(txn_id)
749 .map_err(|e| SochDBError::Internal(format!("Group commit error: {}", e)))?;
750
751 let apply_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
753 for (key, value) in &txn.write_set {
754 self.versions
755 .entry(key.clone())
756 .or_default()
757 .add(MvccVersion::new(txn_id, apply_ts, value.clone()));
758 }
759
760 for (key, value) in &txn.write_set {
762 (self.apply_fn)(key, value)?;
763 }
764
765 self.committed_txns.insert(txn_id, commit_ts);
767
768 self.update_min_snapshot();
770
771 Ok(commit_ts)
772 }
773
774 pub fn abort(&self, txn_id: u64) -> Result<()> {
776 let txn = {
777 let mut txns = self.active_txns.write();
778 txns.remove(&txn_id)
779 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?
780 };
781
782 if txn.status != MvccTxnStatus::Active {
783 return Err(SochDBError::InvalidArgument(
784 "Transaction not active".into(),
785 ));
786 }
787
788 self.wal.abort_transaction(txn_id)?;
790
791 if txn.isolation_level == IsolationLevel::Serializable {
793 self.ssi_manager.abort(txn_id);
794 }
795
796 self.update_min_snapshot();
798 Ok(())
799 }
800
801 pub fn delete(&self, txn_id: u64, key: &[u8]) -> Result<bool> {
803 let txns = self.active_txns.read();
804 let txn = txns
805 .get(&txn_id)
806 .ok_or_else(|| SochDBError::InvalidArgument("Transaction not found".into()))?;
807
808 if txn.status != MvccTxnStatus::Active {
809 return Err(SochDBError::InvalidArgument(
810 "Transaction not active".into(),
811 ));
812 }
813
814 drop(txns);
815
816 let deleted_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
817
818 if let Some(mut chain) = self.versions.get_mut(key) {
820 Ok(chain.delete(txn_id, deleted_ts))
821 } else {
822 Ok(false)
823 }
824 }
825
826 pub fn gc(&self) -> usize {
828 let min_ts = self.min_snapshot_ts.load(Ordering::SeqCst);
829 let mut total_gc = 0;
830
831 for mut entry in self.versions.iter_mut() {
833 total_gc += entry.value_mut().gc(min_ts);
834 }
835
836 self.committed_txns.retain(|_, ts| *ts >= min_ts);
838
839 total_gc += self.ssi_manager.gc(min_ts);
841
842 total_gc
843 }
844
845 fn update_min_snapshot(&self) {
847 let txns = self.active_txns.read();
848 let min = txns
849 .values()
850 .map(|t| t.snapshot_ts)
851 .min()
852 .unwrap_or(u64::MAX);
853 self.min_snapshot_ts.store(min, Ordering::SeqCst);
854 }
855
856 pub fn recover(&self) -> Result<RecoveryStats> {
858 let (committed_writes, txn_count) = self.wal.replay_for_recovery()?;
859
860 for (key, value) in &committed_writes {
861 (self.apply_fn)(key, value)?;
862 }
863
864 Ok(RecoveryStats {
865 transactions_recovered: txn_count,
866 writes_applied: committed_writes.len(),
867 })
868 }
869
870 pub fn current_timestamp(&self) -> u64 {
872 self.timestamp.load(Ordering::SeqCst)
873 }
874
875 pub fn active_count(&self) -> usize {
877 self.active_txns.read().len()
878 }
879}
880
881pub struct GroupCommitBuffer {
887 pending: RwLock<Vec<PendingCommit>>,
889 max_pending: usize,
891 max_wait_us: u64,
893 last_flush: AtomicU64,
895 arrival_rate_ema: AtomicU64,
897 last_arrival: AtomicU64,
899 fsync_latency_us: AtomicU64,
901 adaptive_batch_size: AtomicU64,
903}
904
/// A commit waiting inside a `GroupCommitBuffer`.
#[derive(Debug, Clone)]
pub struct PendingCommit {
    /// Transaction being committed.
    pub txn_id: u64,
    /// Wall-clock time (µs since the Unix epoch) at which it was queued.
    pub enqueue_time_us: u64,
}
911
912impl GroupCommitBuffer {
913 pub fn new(max_pending: usize, max_wait_us: u64) -> Self {
915 Self {
916 pending: RwLock::new(Vec::with_capacity(max_pending)),
917 max_pending,
918 max_wait_us,
919 last_flush: AtomicU64::new(0),
920 arrival_rate_ema: AtomicU64::new(100_000), last_arrival: AtomicU64::new(0),
922 fsync_latency_us: AtomicU64::new(5000), adaptive_batch_size: AtomicU64::new(10), }
925 }
926
927 pub fn with_fsync_latency(max_pending: usize, max_wait_us: u64, fsync_latency_us: u64) -> Self {
929 let buffer = Self::new(max_pending, max_wait_us);
930 buffer
931 .fsync_latency_us
932 .store(fsync_latency_us, Ordering::Relaxed);
933 buffer.recompute_batch_size();
934 buffer
935 }
936
937 fn now_us() -> u64 {
938 std::time::SystemTime::now()
939 .duration_since(std::time::UNIX_EPOCH)
940 .unwrap()
941 .as_micros() as u64
942 }
943
944 fn update_arrival_rate(&self) {
946 let now = Self::now_us();
947 let last = self.last_arrival.swap(now, Ordering::Relaxed);
948
949 if last > 0 {
950 let delta_us = now.saturating_sub(last);
951 if delta_us > 0 {
952 let instant_rate = 1_000_000_000 / delta_us;
955
956 let old_rate = self.arrival_rate_ema.load(Ordering::Relaxed);
958 let new_rate = (old_rate * 9 + instant_rate) / 10;
959 self.arrival_rate_ema.store(new_rate, Ordering::Relaxed);
960 }
961 }
962 }
963
964 fn recompute_batch_size(&self) {
969 let lambda = self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0; let l_fsync = self.fsync_latency_us.load(Ordering::Relaxed) as f64; let c_wait = 1.0; let l_fsync_s = l_fsync / 1_000_000.0;
976 let n_opt = (2.0 * l_fsync_s * lambda / c_wait).sqrt();
977
978 let batch_size = n_opt.clamp(1.0, self.max_pending as f64) as u64;
979 self.adaptive_batch_size
980 .store(batch_size, Ordering::Relaxed);
981 }
982
983 pub fn add(&self, txn_id: u64) -> bool {
987 self.update_arrival_rate();
988
989 let now = Self::now_us();
990 let commit = PendingCommit {
991 txn_id,
992 enqueue_time_us: now,
993 };
994
995 let mut pending = self.pending.write();
996 pending.push(commit);
997
998 let adaptive_size = self.adaptive_batch_size.load(Ordering::Relaxed) as usize;
999 let target_size = adaptive_size.max(1).min(self.max_pending);
1000
1001 if pending.len() >= target_size {
1002 return true;
1003 }
1004
1005 let last = self.last_flush.load(Ordering::Relaxed);
1007 if now - last > self.max_wait_us {
1008 return true;
1009 }
1010
1011 false
1012 }
1013
1014 pub fn take_pending(&self) -> Vec<PendingCommit> {
1016 let mut pending = self.pending.write();
1017 let result = std::mem::take(&mut *pending);
1018
1019 let now = Self::now_us();
1020 self.last_flush.store(now, Ordering::Relaxed);
1021
1022 self.recompute_batch_size();
1024
1025 result
1026 }
1027
1028 pub fn record_fsync_latency(&self, latency_us: u64) {
1030 let old = self.fsync_latency_us.load(Ordering::Relaxed);
1032 let new = (old * 4 + latency_us) / 5;
1033 self.fsync_latency_us.store(new, Ordering::Relaxed);
1034
1035 self.recompute_batch_size();
1037 }
1038
1039 pub fn current_batch_size(&self) -> usize {
1041 self.adaptive_batch_size.load(Ordering::Relaxed) as usize
1042 }
1043
1044 pub fn current_arrival_rate(&self) -> f64 {
1046 self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0
1047 }
1048
1049 pub fn stats(&self) -> GroupCommitStats {
1051 GroupCommitStats {
1052 adaptive_batch_size: self.adaptive_batch_size.load(Ordering::Relaxed) as usize,
1053 arrival_rate: self.current_arrival_rate(),
1054 fsync_latency_us: self.fsync_latency_us.load(Ordering::Relaxed),
1055 pending_count: self.pending.read().len(),
1056 }
1057 }
1058}
1059
/// Point-in-time view of a `GroupCommitBuffer`'s adaptive state, as returned by `stats()`.
#[derive(Debug, Clone)]
pub struct GroupCommitStats {
    /// Current target batch size.
    pub adaptive_batch_size: usize,
    /// Estimated arrival rate in commits per second.
    pub arrival_rate: f64,
    /// Smoothed fsync latency in microseconds.
    pub fsync_latency_us: u64,
    /// Number of commits currently queued.
    pub pending_count: usize,
}
1072
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use tempfile::tempdir;

    /// Shared log of every `(key, value)` handed to an `apply_fn`.
    type AppliedLog = Arc<RwLock<Vec<(Vec<u8>, Vec<u8>)>>>;

    /// Builds an empty log plus an `apply_fn` that appends each applied write to it.
    fn recording_sink() -> (
        AppliedLog,
        impl Fn(&[u8], &[u8]) -> Result<()> + Send + Sync + 'static,
    ) {
        let log: AppliedLog = Arc::new(RwLock::new(Vec::new()));
        let sink = Arc::clone(&log);
        let apply = move |key: &[u8], value: &[u8]| -> Result<()> {
            sink.write().push((key.to_vec(), value.to_vec()));
            Ok(())
        };
        (log, apply)
    }

    #[test]
    fn test_basic_transaction() {
        let dir = tempdir().unwrap();
        let (applied, apply) = recording_sink();
        let manager = WalStorageManager::new(dir.path().join("test.wal"), apply).unwrap();

        let txn_id = manager.begin_txn().unwrap();
        manager
            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();
        manager
            .write(txn_id, b"key2".to_vec(), b"value2".to_vec())
            .unwrap();

        // Buffered writes must not reach storage before commit.
        assert!(applied.read().is_empty());

        manager.commit(txn_id).unwrap();

        // Commit applies every write, in the order it was made.
        let log = applied.read();
        assert_eq!(log.len(), 2);
        assert_eq!(log[0], (b"key1".to_vec(), b"value1".to_vec()));
        assert_eq!(log[1], (b"key2".to_vec(), b"value2".to_vec()));
    }

    #[test]
    fn test_abort_transaction() {
        let dir = tempdir().unwrap();
        let (applied, apply) = recording_sink();
        let manager = WalStorageManager::new(dir.path().join("test.wal"), apply).unwrap();

        let txn_id = manager.begin_txn().unwrap();
        manager
            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();
        manager.abort(txn_id).unwrap();

        // Aborted writes are never applied.
        assert!(applied.read().is_empty());
    }

    #[test]
    fn test_immediate_write() {
        let dir = tempdir().unwrap();
        let applied_count = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&applied_count);

        let manager = WalStorageManager::new(dir.path().join("test.wal"), move |_, _| {
            counter.fetch_add(1, Ordering::SeqCst);
            Ok(())
        })
        .unwrap();

        let txn_id = manager.begin_txn().unwrap();

        // Immediate writes are applied before the transaction commits.
        manager
            .write_immediate(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();
        assert_eq!(applied_count.load(Ordering::SeqCst), 1);

        manager.commit(txn_id).unwrap();
    }

    #[test]
    fn test_group_commit_buffer() {
        let buffer = GroupCommitBuffer::with_fsync_latency(10, 1000, 5000);

        // Draining once sets `last_flush`, so the max-wait check starts from now.
        let _ = buffer.take_pending();

        for txn_id in 1..=3 {
            buffer.add(txn_id);
        }

        let drained = buffer.take_pending();
        let ids: Vec<u64> = drained.iter().map(|p| p.txn_id).collect();
        assert_eq!(ids, vec![1, 2, 3]);
    }

    #[test]
    fn test_adaptive_batch_sizing() {
        let buffer = GroupCommitBuffer::with_fsync_latency(100, 10000, 5000);

        // Roughly 10k arrivals/s.
        for txn_id in 0..50 {
            buffer.add(txn_id);
            std::thread::sleep(std::time::Duration::from_micros(100));
        }

        assert!(buffer.stats().adaptive_batch_size >= 1);
    }

    #[test]
    fn test_mvcc_basic_transaction() {
        let dir = tempdir().unwrap();
        let (applied, apply) = recording_sink();
        let manager =
            MvccTransactionManager::new(dir.path().join("mvcc_test.wal"), apply).unwrap();

        let txn_id = manager.begin_default().unwrap();
        manager
            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();

        // A transaction reads its own uncommitted writes.
        assert_eq!(
            manager.read(txn_id, b"key1").unwrap(),
            Some(b"value1".to_vec())
        );

        let commit_ts = manager.commit(txn_id).unwrap();
        assert!(commit_ts > 0);

        assert_eq!(applied.read().len(), 1);
    }

    #[test]
    fn test_mvcc_snapshot_isolation() {
        let dir = tempdir().unwrap();
        let manager =
            MvccTransactionManager::new(dir.path().join("mvcc_si_test.wal"), |_, _| Ok(()))
                .unwrap();

        let txn1 = manager.begin_default().unwrap();
        manager
            .write(txn1, b"key1".to_vec(), b"v1".to_vec())
            .unwrap();
        manager.commit(txn1).unwrap();

        // txn2's snapshot predates txn3's commit.
        let txn2 = manager.begin_default().unwrap();

        let txn3 = manager.begin_default().unwrap();
        manager
            .write(txn3, b"key1".to_vec(), b"v3".to_vec())
            .unwrap();
        manager.commit(txn3).unwrap();

        let _value = manager.read(txn2, b"key1").unwrap();

        manager.commit(txn2).unwrap();
    }

    #[test]
    fn test_mvcc_abort() {
        let dir = tempdir().unwrap();
        let (applied, apply) = recording_sink();
        let manager =
            MvccTransactionManager::new(dir.path().join("mvcc_abort_test.wal"), apply).unwrap();

        let txn_id = manager.begin_default().unwrap();
        manager
            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();
        manager.abort(txn_id).unwrap();

        // Aborted writes are never applied.
        assert!(applied.read().is_empty());
    }

    #[test]
    fn test_mvcc_version_visibility() {
        let mut chain = MvccVersionChain::default();
        let committed: HashMap<u64, u64> = [(1, 10), (2, 20)].into_iter().collect();

        // txn 1 committed at 10, txn 2 committed at 20.
        chain.add(MvccVersion::new(1, 5, b"v1".to_vec()));
        chain.add(MvccVersion::new(2, 15, b"v2".to_vec()));

        // A snapshot at 15 sees only txn 1's version...
        assert_eq!(
            chain.get_visible_legacy(15, 99, &committed),
            Some(&b"v1".to_vec())
        );
        // ...and a snapshot at 25 sees txn 2's newer version.
        assert_eq!(
            chain.get_visible_legacy(25, 99, &committed),
            Some(&b"v2".to_vec())
        );
    }

    #[test]
    fn test_mvcc_version_gc() {
        let mut chain = MvccVersionChain::default();

        for i in 0..5u64 {
            let mut version = MvccVersion::new(i, i * 10, vec![i as u8]);
            // Every version except the last is deleted by its successor.
            if i < 4 {
                version.mark_deleted(i + 1, (i + 1) * 10);
            }
            chain.add(version);
        }

        assert_eq!(chain.versions.len(), 5);

        let gc_count = chain.gc(45);
        assert!(chain.versions.len() < 5 || gc_count == 0);
    }

    #[test]
    fn test_mvcc_concurrent_transactions() {
        let dir = tempdir().unwrap();
        let manager = Arc::new(
            MvccTransactionManager::new(dir.path().join("mvcc_concurrent_test.wal"), |_, _| {
                Ok(())
            })
            .unwrap(),
        );

        let handles: Vec<_> = (0..4)
            .map(|i| {
                let m = Arc::clone(&manager);
                std::thread::spawn(move || {
                    let key = format!("key{}", i).into_bytes();
                    let value = format!("value{}", i).into_bytes();
                    let txn = m.begin_default().unwrap();
                    m.write(txn, key, value).unwrap();
                    m.commit(txn).unwrap();
                })
            })
            .collect();

        handles
            .into_iter()
            .for_each(|handle| handle.join().unwrap());

        assert_eq!(manager.active_count(), 0);
    }
}