1use std::collections::{HashMap, HashSet};
49use std::sync::Arc;
50use std::sync::atomic::{AtomicU64, Ordering};
51
52use dashmap::DashMap;
53use parking_lot::RwLock;
54use smallvec::SmallVec;
55
56use crate::durable_storage::InlineKey;
57
/// Identifier assigned to each transaction when it begins.
pub type TxnId = u64;

/// Logical timestamp used for transaction snapshots, writes and commits.
pub type Timestamp = u64;
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub enum SsiTxnStatus {
67 Active,
69 Committed(Timestamp),
71 Aborted,
73}
74
/// Category of failure reported through `SsiConflictError::conflict_type`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConflictType {
    /// Two concurrent transactions wrote the same key.
    WriteWrite,
    /// Read-write antidependency conflict.
    ReadWriteAnti,
    /// A transaction would complete a dangerous structure of rw-antidependencies.
    DangerousStructure,
}
85
86#[derive(Debug, Clone)]
88pub struct SsiConflictError {
89 pub victim_txn: TxnId,
91 pub winner_txn: Option<TxnId>,
93 pub conflict_type: ConflictType,
95 pub message: String,
97}
98
99impl std::fmt::Display for SsiConflictError {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 write!(
102 f,
103 "SSI conflict ({:?}): {}",
104 self.conflict_type, self.message
105 )
106 }
107}
108
109impl std::error::Error for SsiConflictError {}
110
111#[derive(Debug, Clone)]
113pub struct SsiVersionInfo {
114 pub xmin: TxnId,
116 pub xmax: TxnId,
118 pub begin_ts: Timestamp,
120 pub end_ts: Timestamp,
122 pub commit_ts: Option<Timestamp>,
124}
125
126impl SsiVersionInfo {
127 pub fn new(xmin: TxnId, begin_ts: Timestamp) -> Self {
129 Self {
130 xmin,
131 xmax: 0,
132 begin_ts,
133 end_ts: Timestamp::MAX,
134 commit_ts: None,
135 }
136 }
137
138 pub fn is_visible(&self, snapshot_ts: Timestamp, txn_states: &SsiTxnStates) -> bool {
144 match txn_states.get_status(self.xmin) {
146 Some(SsiTxnStatus::Committed(commit_ts)) => {
147 if commit_ts > snapshot_ts {
148 return false; }
150 }
151 Some(SsiTxnStatus::Active) | Some(SsiTxnStatus::Aborted) | None => {
152 return false; }
154 }
155
156 if self.xmax == 0 {
158 return true; }
160
161 match txn_states.get_status(self.xmax) {
162 Some(SsiTxnStatus::Committed(commit_ts)) => {
163 commit_ts > snapshot_ts }
165 Some(SsiTxnStatus::Active) | Some(SsiTxnStatus::Aborted) | None => {
166 true }
168 }
169 }
170}
171
172#[derive(Debug, Clone, PartialEq, Eq, Hash)]
174pub struct RwDependency {
175 pub reader: TxnId,
177 pub writer: TxnId,
179 pub key: InlineKey,
181}
182
183#[derive(Debug)]
185pub struct SsiTransaction {
186 pub txn_id: TxnId,
188 pub start_ts: Timestamp,
190 pub status: SsiTxnStatus,
192 pub commit_ts: Option<Timestamp>,
194 pub read_set: HashSet<InlineKey>,
197 pub write_set: HashSet<InlineKey>,
199 read_bloom: [u64; 4],
202 pub in_rw_deps: HashSet<TxnId>,
204 pub out_rw_deps: HashSet<TxnId>,
206 pub has_committed_in_rw: bool,
208 pub has_committed_out_rw: bool,
210}
211
212impl SsiTransaction {
213 pub fn new(txn_id: TxnId, start_ts: Timestamp) -> Self {
215 Self {
216 txn_id,
217 start_ts,
218 status: SsiTxnStatus::Active,
219 commit_ts: None,
220 read_set: HashSet::new(),
221 write_set: HashSet::new(),
222 read_bloom: [0u64; 4],
223 in_rw_deps: HashSet::new(),
224 out_rw_deps: HashSet::new(),
225 has_committed_in_rw: false,
226 has_committed_out_rw: false,
227 }
228 }
229
230 pub fn record_read(&mut self, key: &[u8]) {
232 let ik = SmallVec::from_slice(key);
233 self.read_set.insert(ik);
234 let h = twox_hash::xxh3::hash64(key);
236 let h1 = (h & 0xFF) as usize; let h2 = ((h >> 8) & 0xFF) as usize; self.read_bloom[h1 / 64] |= 1u64 << (h1 % 64);
239 self.read_bloom[h2 / 64] |= 1u64 << (h2 % 64);
240 }
241
242 pub fn record_write(&mut self, key: &[u8]) {
244 self.write_set.insert(SmallVec::from_slice(key));
245 }
246
247 pub fn maybe_read(&self, key: &[u8]) -> bool {
250 let h = twox_hash::xxh3::hash64(key);
251 let h1 = (h & 0xFF) as usize;
252 let h2 = ((h >> 8) & 0xFF) as usize;
253 (self.read_bloom[h1 / 64] & (1u64 << (h1 % 64))) != 0
254 && (self.read_bloom[h2 / 64] & (1u64 << (h2 % 64))) != 0
255 }
256
257 pub fn is_dangerous(&self) -> bool {
263 self.has_committed_in_rw && self.has_committed_out_rw
264 }
265}
266
267pub struct SsiTxnStates {
269 states: RwLock<HashMap<TxnId, SsiTxnStatus>>,
271}
272
273impl SsiTxnStates {
274 pub fn new() -> Self {
275 Self {
276 states: RwLock::new(HashMap::new()),
277 }
278 }
279
280 pub fn get_status(&self, txn_id: TxnId) -> Option<SsiTxnStatus> {
281 self.states.read().get(&txn_id).copied()
282 }
283
284 pub fn set_status(&self, txn_id: TxnId, status: SsiTxnStatus) {
285 self.states.write().insert(txn_id, status);
286 }
287}
288
289impl Default for SsiTxnStates {
290 fn default() -> Self {
291 Self::new()
292 }
293}
294
295pub struct SsiManager {
317 next_txn_id: AtomicU64,
319 timestamp: AtomicU64,
321 transactions: RwLock<HashMap<TxnId, SsiTransaction>>,
323 txn_states: Arc<SsiTxnStates>,
325 key_writers: DashMap<Vec<u8>, (TxnId, Timestamp)>,
329 key_readers: DashMap<Vec<u8>, HashSet<TxnId>>,
333}
334
335impl SsiManager {
336 pub fn new() -> Self {
338 Self {
339 next_txn_id: AtomicU64::new(1),
340 timestamp: AtomicU64::new(1),
341 transactions: RwLock::new(HashMap::new()),
342 txn_states: Arc::new(SsiTxnStates::new()),
343 key_writers: DashMap::new(),
344 key_readers: DashMap::new(),
345 }
346 }
347
348 pub fn begin(&self) -> Result<(TxnId, Timestamp), SsiConflictError> {
350 let txn_id = self.next_txn_id.fetch_add(1, Ordering::SeqCst);
351 let start_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
352
353 let txn = SsiTransaction::new(txn_id, start_ts);
354 self.transactions.write().insert(txn_id, txn);
355 self.txn_states.set_status(txn_id, SsiTxnStatus::Active);
356
357 Ok((txn_id, start_ts))
358 }
359
360 pub fn record_read(&self, txn_id: TxnId, key: &[u8]) -> Result<(), SsiConflictError> {
365 let snapshot_ts = {
367 let mut txns = self.transactions.write();
368 let txn = txns.get_mut(&txn_id).ok_or_else(|| SsiConflictError {
369 victim_txn: txn_id,
370 winner_txn: None,
371 conflict_type: ConflictType::ReadWriteAnti,
372 message: "Transaction not found".into(),
373 })?;
374 let ts = txn.start_ts;
375 txn.record_read(key);
376 ts
377 };
378
379 self.key_readers
381 .entry(key.to_vec())
382 .or_default()
383 .insert(txn_id);
384
385 let writer_info = self.key_writers.get(key).map(|r| *r);
387 if let Some((writer_txn, write_ts)) = writer_info
388 && write_ts > snapshot_ts
389 && writer_txn != txn_id
390 {
391 let writer_committed = matches!(
394 self.txn_states.get_status(writer_txn),
395 Some(SsiTxnStatus::Committed(_))
396 );
397
398 let mut txns = self.transactions.write();
399
400 if let Some(reader_txn) = txns.get_mut(&txn_id) {
402 reader_txn.out_rw_deps.insert(writer_txn);
403 if writer_committed {
404 reader_txn.has_committed_out_rw = true;
405 if reader_txn.is_dangerous() {
407 return Err(SsiConflictError {
408 victim_txn: txn_id,
409 winner_txn: Some(writer_txn),
410 conflict_type: ConflictType::DangerousStructure,
411 message: format!(
412 "Transaction {} would create serialization anomaly with {}",
413 txn_id, writer_txn
414 ),
415 });
416 }
417 }
418 }
419
420 if let Some(writer_txn_entry) = txns.get_mut(&writer_txn) {
422 writer_txn_entry.in_rw_deps.insert(txn_id);
423 }
424 }
425
426 Ok(())
427 }
428
429 pub fn record_write(&self, txn_id: TxnId, key: &[u8]) -> Result<(), SsiConflictError> {
434 let mut txns = self.transactions.write();
435 let txn = txns.get_mut(&txn_id).ok_or_else(|| SsiConflictError {
436 victim_txn: txn_id,
437 winner_txn: None,
438 conflict_type: ConflictType::WriteWrite,
439 message: "Transaction not found".into(),
440 })?;
441
442 let snapshot_ts = txn.start_ts;
443
444 if let Some(entry) = self.key_writers.get(key) {
446 let (prev_writer, write_ts) = *entry;
447 if write_ts > snapshot_ts && prev_writer != txn_id {
448 return Err(SsiConflictError {
449 victim_txn: txn_id,
450 winner_txn: Some(prev_writer),
451 conflict_type: ConflictType::WriteWrite,
452 message: format!(
453 "Write-write conflict: transaction {} already wrote to key, ts {}",
454 prev_writer, write_ts
455 ),
456 });
457 }
458 }
459
460 txn.record_write(key);
462
463 let write_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
465 drop(txns);
466
467 self.key_writers.insert(key.to_vec(), (txn_id, write_ts));
468
469 if let Some(readers) = self.key_readers.get(key) {
471 let mut txns = self.transactions.write();
472 for reader_id in readers.value() {
473 if *reader_id != txn_id
474 && let Some(reader_txn) = txns.get(reader_id)
475 && reader_txn.start_ts < write_ts
476 {
477 if let Some(writer_txn) = txns.get_mut(&txn_id) {
479 writer_txn.in_rw_deps.insert(*reader_id);
480
481 if let Some(SsiTxnStatus::Committed(_)) =
483 self.txn_states.get_status(*reader_id)
484 {
485 writer_txn.has_committed_in_rw = true;
486 }
487 }
488 if let Some(reader_txn) = txns.get_mut(reader_id) {
489 reader_txn.out_rw_deps.insert(txn_id);
490 }
491 }
492 }
493 }
494
495 Ok(())
496 }
497
498 pub fn commit(&self, txn_id: TxnId) -> Result<Timestamp, SsiConflictError> {
502 let commit_ts = self.timestamp.fetch_add(1, Ordering::SeqCst);
503
504 let (is_dangerous, out_deps, in_deps) = {
506 let txns = self.transactions.read();
507 let txn = txns.get(&txn_id).ok_or_else(|| SsiConflictError {
508 victim_txn: txn_id,
509 winner_txn: None,
510 conflict_type: ConflictType::DangerousStructure,
511 message: "Transaction not found".into(),
512 })?;
513 (
514 txn.is_dangerous(),
515 txn.out_rw_deps.clone(),
516 txn.in_rw_deps.clone(),
517 )
518 };
519
520 if is_dangerous {
521 let mut txns = self.transactions.write();
522 if let Some(txn) = txns.get_mut(&txn_id) {
523 txn.status = SsiTxnStatus::Aborted;
524 }
525 self.txn_states.set_status(txn_id, SsiTxnStatus::Aborted);
526 return Err(SsiConflictError {
527 victim_txn: txn_id,
528 winner_txn: None,
529 conflict_type: ConflictType::DangerousStructure,
530 message: "Transaction would create serialization anomaly (dangerous structure)"
531 .into(),
532 });
533 }
534
535 {
537 let mut txns = self.transactions.write();
538
539 if let Some(txn) = txns.get_mut(&txn_id) {
541 txn.status = SsiTxnStatus::Committed(commit_ts);
542 txn.commit_ts = Some(commit_ts);
543 }
544
545 for out_dep in &out_deps {
547 if let Some(other_txn) = txns.get_mut(out_dep) {
548 other_txn.has_committed_in_rw = true;
549 }
550 }
551
552 for in_dep in &in_deps {
554 if let Some(other_txn) = txns.get_mut(in_dep) {
555 other_txn.has_committed_out_rw = true;
556 }
557 }
558 }
559
560 self.txn_states
561 .set_status(txn_id, SsiTxnStatus::Committed(commit_ts));
562 Ok(commit_ts)
563 }
564
565 pub fn abort(&self, txn_id: TxnId) {
567 let mut txns = self.transactions.write();
568 if let Some(txn) = txns.get_mut(&txn_id) {
569 txn.status = SsiTxnStatus::Aborted;
570 self.txn_states.set_status(txn_id, SsiTxnStatus::Aborted);
571 }
572
573 self.key_writers.retain(|_, (writer, _)| *writer != txn_id);
575
576 for mut entry in self.key_readers.iter_mut() {
578 entry.value_mut().remove(&txn_id);
579 }
580 }
581
582 pub fn get_status(&self, txn_id: TxnId) -> Option<SsiTxnStatus> {
584 self.txn_states.get_status(txn_id)
585 }
586
587 pub fn get_snapshot_ts(&self, txn_id: TxnId) -> Option<Timestamp> {
589 self.transactions.read().get(&txn_id).map(|t| t.start_ts)
590 }
591
592 pub fn is_visible(&self, txn_id: TxnId, version: &SsiVersionInfo) -> bool {
594 if let Some(snapshot_ts) = self.get_snapshot_ts(txn_id) {
595 version.is_visible(snapshot_ts, &self.txn_states)
596 } else {
597 false
598 }
599 }
600
601 pub fn gc(&self, watermark: Timestamp) -> usize {
603 let mut removed = 0;
604
605 self.transactions.write().retain(|_, txn| {
607 if let Some(commit_ts) = txn.commit_ts
608 && commit_ts < watermark
609 {
610 removed += 1;
611 return false;
612 }
613 true
614 });
615
616 removed
617 }
618}
619
620impl Default for SsiManager {
621 fn default() -> Self {
622 Self::new()
623 }
624}
625
/// Hybrid logical clock packing wall-clock milliseconds and a logical counter
/// into a single `u64`.
///
/// The upper 44 bits hold milliseconds since the Unix epoch. The lower
/// `LOGICAL_BITS` (20) bits hold a counter that orders events within one
/// millisecond.
pub struct HybridLogicalClock {
    /// Packed `(physical_ms << LOGICAL_BITS) | logical` value.
    timestamp: AtomicU64,
}

impl HybridLogicalClock {
    /// Number of low bits reserved for the logical counter.
    const LOGICAL_BITS: u32 = 20;
    /// Mask selecting the logical counter bits.
    const LOGICAL_MASK: u64 = (1 << Self::LOGICAL_BITS) - 1;

    /// Creates a clock at the current wall-clock millisecond with logical counter 0.
    pub fn new() -> Self {
        let now_ms = Self::physical_time_ms();
        Self {
            timestamp: AtomicU64::new(now_ms << Self::LOGICAL_BITS),
        }
    }

    /// Wall-clock milliseconds since the Unix epoch.
    ///
    /// # Panics
    /// If the system clock reads earlier than the Unix epoch.
    fn physical_time_ms() -> u64 {
        use std::time::{SystemTime, UNIX_EPOCH};
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock is set before the Unix epoch")
            .as_millis() as u64
    }

    /// Physical (millisecond) part of a packed timestamp.
    pub fn get_physical(ts: u64) -> u64 {
        ts >> Self::LOGICAL_BITS
    }

    /// Logical-counter part of a packed timestamp.
    pub fn get_logical(ts: u64) -> u64 {
        ts & Self::LOGICAL_MASK
    }

    /// Returns a timestamp strictly greater than the clock's current value and
    /// stores it.
    ///
    /// If the wall clock has advanced, the logical counter resets to 0.
    /// Otherwise it is incremented. When the counter is exhausted, the call
    /// yields and retries until the wall clock moves past the stored
    /// millisecond.
    pub fn next(&self) -> u64 {
        loop {
            let current = self.timestamp.load(Ordering::Acquire);
            let current_physical = Self::get_physical(current);
            let current_logical = Self::get_logical(current);

            let now_physical = Self::physical_time_ms();

            let (new_physical, new_logical) = if now_physical > current_physical {
                (now_physical, 0)
            } else if current_logical < Self::LOGICAL_MASK {
                (current_physical, current_logical + 1)
            } else {
                std::thread::yield_now();
                continue;
            };

            let new_ts = (new_physical << Self::LOGICAL_BITS) | new_logical;

            if self
                .timestamp
                .compare_exchange(current, new_ts, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return new_ts;
            }
        }
    }

    /// Merges a timestamp received from elsewhere into this clock.
    ///
    /// Does nothing when `msg_ts` is not ahead of the local clock. Otherwise:
    /// - The physical part becomes the local wall clock, or
    ///   `max(now, msg_physical - 1000)` when the message's physical time is
    ///   ahead of the wall clock.
    /// - The logical part becomes the message's counter plus one.
    ///
    /// The increment is applied to the packed value, so a saturated logical
    /// counter carries into the physical field instead of corrupting it. The
    /// clock never moves backwards: if the merged value is not ahead of the
    /// current value, the clock is left unchanged.
    ///
    /// NOTE(review): the `- 1000` term lets the merged value stay behind the
    /// message's physical time; its intent is not visible here — confirm.
    pub fn update(&self, msg_ts: u64) {
        loop {
            let current = self.timestamp.load(Ordering::Acquire);
            if msg_ts <= current {
                return;
            }

            let now_physical = Self::physical_time_ms();
            let msg_physical = Self::get_physical(msg_ts);
            let msg_logical = Self::get_logical(msg_ts);

            let physical = if msg_physical > now_physical {
                now_physical.max(msg_physical.saturating_sub(1000))
            } else {
                now_physical
            };

            let candidate = ((physical << Self::LOGICAL_BITS) | msg_logical).saturating_add(1);
            if candidate <= current {
                return;
            }

            if self
                .timestamp
                .compare_exchange(current, candidate, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return;
            }
        }
    }

    /// Current packed timestamp without advancing the clock.
    pub fn now(&self) -> u64 {
        self.timestamp.load(Ordering::Acquire)
    }
}

impl Default for HybridLogicalClock {
    fn default() -> Self {
        Self::new()
    }
}
754
// Unit tests for the SSI manager, the per-transaction bookkeeping, version
// visibility and the hybrid logical clock.
#[cfg(test)]
mod tests {
    use super::*;

    // A transaction that reads and writes its own key commits and is reported
    // as Committed afterwards.
    #[test]
    fn test_ssi_basic_commit() {
        let ssi = SsiManager::new();

        let (txn1, _) = ssi.begin().unwrap();
        ssi.record_read(txn1, b"key1").unwrap();
        ssi.record_write(txn1, b"key1").unwrap();
        let commit_ts = ssi.commit(txn1).unwrap();

        assert!(commit_ts > 0);
        assert!(matches!(
            ssi.get_status(txn1),
            Some(SsiTxnStatus::Committed(_))
        ));
    }

    // txn2 begins after txn1, writes key1 and commits; txn1's later write to
    // the same key must be rejected as a WriteWrite conflict.
    #[test]
    fn test_ssi_write_write_conflict() {
        let ssi = SsiManager::new();

        let (txn1, _) = ssi.begin().unwrap();

        let (txn2, _) = ssi.begin().unwrap();
        ssi.record_write(txn2, b"key1").unwrap();
        ssi.commit(txn2).unwrap();

        let result = ssi.record_write(txn1, b"key1");
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err().conflict_type,
            ConflictType::WriteWrite
        ));
    }

    // txn1 reads key1, then txn2 overwrites key1 and commits. The test
    // expects that this single rw-antidependency alone does not stop txn1
    // from committing.
    #[test]
    fn test_ssi_rw_antidependency() {
        let ssi = SsiManager::new();

        let (txn1, _) = ssi.begin().unwrap();
        ssi.record_read(txn1, b"key1").unwrap();

        let (txn2, _) = ssi.begin().unwrap();
        ssi.record_write(txn2, b"key1").unwrap();
        ssi.commit(txn2).unwrap();

        let result = ssi.commit(txn1);
        assert!(result.is_ok());
    }

    // Builds a three-transaction interleaving:
    // - txn1 reads key1 and writes key2;
    // - txn2 writes key1 and commits;
    // - txn3 reads key2 and writes key1, then commits.
    // All intermediate steps are expected to succeed. The outcome of the
    // final commit of txn1 is not asserted.
    #[test]
    fn test_ssi_dangerous_structure() {
        let ssi = SsiManager::new();

        let (txn1, _) = ssi.begin().unwrap();
        ssi.record_read(txn1, b"key1").unwrap();

        let (txn2, _) = ssi.begin().unwrap();
        ssi.record_write(txn2, b"key1").unwrap();
        ssi.commit(txn2).unwrap();
        ssi.record_write(txn1, b"key2").unwrap();

        let (txn3, _) = ssi.begin().unwrap();
        ssi.record_read(txn3, b"key2").unwrap();
        ssi.record_write(txn3, b"key1").unwrap();
        ssi.commit(txn3).unwrap();
        let _result = ssi.commit(txn1);
    }

    // Consecutive `next()` calls must return strictly increasing values.
    #[test]
    fn test_hlc_monotonic() {
        let hlc = HybridLogicalClock::new();

        let mut prev = hlc.next();
        for _ in 0..1000 {
            let curr = hlc.next();
            assert!(curr > prev, "HLC must be monotonic");
            prev = curr;
        }
    }

    // The physical part decodes to a plausible wall-clock time and a fresh
    // clock's first tick has a small logical counter.
    #[test]
    fn test_hlc_physical_extraction() {
        let hlc = HybridLogicalClock::new();
        let ts = hlc.next();

        let physical = HybridLogicalClock::get_physical(ts);
        let logical = HybridLogicalClock::get_logical(ts);

        // 1577836800000 ms is 2020-01-01T00:00:00Z.
        assert!(physical > 1577836800000);
        assert!(logical < 1000);
    }

    // A version created by a transaction committed at 100 is visible to a
    // snapshot at 150 and invisible to a snapshot at 50.
    #[test]
    fn test_version_visibility() {
        let states = SsiTxnStates::new();

        states.set_status(1, SsiTxnStatus::Committed(100));

        let version = SsiVersionInfo::new(1, 100);

        assert!(version.is_visible(150, &states));

        assert!(!version.is_visible(50, &states));
    }

    // After txn1 aborts, a later transaction can write the same key without
    // a conflict.
    #[test]
    fn test_ssi_abort_cleanup() {
        let ssi = SsiManager::new();

        let (txn1, _) = ssi.begin().unwrap();
        ssi.record_write(txn1, b"key1").unwrap();
        ssi.abort(txn1);

        let (txn2, _) = ssi.begin().unwrap();
        let result = ssi.record_write(txn2, b"key1");
        assert!(result.is_ok());
    }

    // Bloom filters have no false negatives, so recorded keys must report
    // `true`. The unrecorded key "gamma" may be a false positive, so its
    // result is ignored.
    #[test]
    fn test_ssi_bloom_filter_negative() {
        let mut txn = SsiTransaction::new(1, 100);
        txn.record_read(b"alpha");
        txn.record_read(b"beta");

        assert!(txn.maybe_read(b"alpha"));
        assert!(txn.maybe_read(b"beta"));

        let _ = txn.maybe_read(b"gamma");
    }

    // Keys of several lengths (short literals, 32 and 64 bytes) land in the
    // read/write sets as distinct entries. Presumably this exercises both
    // inline and spilled InlineKey storage; the inline capacity is not
    // visible here, so confirm.
    #[test]
    fn test_ssi_inline_key_read_write_sets() {
        let mut txn = SsiTransaction::new(1, 100);

        txn.record_read(b"short");
        txn.record_write(b"short_w");

        let k32 = [0xABu8; 32];
        txn.record_read(&k32);
        txn.record_write(&k32);

        let k64 = [0xCDu8; 64];
        txn.record_read(&k64);
        txn.record_write(&k64);

        assert_eq!(txn.read_set.len(), 3);
        assert_eq!(txn.write_set.len(), 3);
    }

    // Ten transactions writing ten different keys all commit.
    #[test]
    fn test_ssi_dashmap_concurrent_disjoint_keys() {
        let ssi = SsiManager::new();
        let mut txns = Vec::new();
        for i in 0..10 {
            let (tid, _) = ssi.begin().unwrap();
            let key = format!("key_{}", i);
            ssi.record_write(tid, key.as_bytes()).unwrap();
            txns.push(tid);
        }
        for tid in txns {
            assert!(ssi.commit(tid).is_ok());
        }
    }

    // Aborting a transaction that wrote 20 keys removes all of its
    // `key_writers` entries (which may span several DashMap shards), and a
    // later writer of one of those keys commits.
    #[test]
    fn test_ssi_dashmap_abort_cleans_all_shards() {
        let ssi = SsiManager::new();

        let (txn1, _) = ssi.begin().unwrap();
        for i in 0..20 {
            let key = format!("shard_test_{}", i);
            ssi.record_write(txn1, key.as_bytes()).unwrap();
        }

        ssi.abort(txn1);

        for i in 0..20 {
            let key = format!("shard_test_{}", i);
            let has_entry = ssi.key_writers.get(key.as_bytes()).is_some();
            assert!(!has_entry, "key_writers should be cleaned for {}", key);
        }

        let (txn2, _) = ssi.begin().unwrap();
        ssi.record_write(txn2, b"shard_test_5").unwrap();
        assert!(ssi.commit(txn2).is_ok());
    }
}