1use std::cmp::Ordering;
49use std::collections::{BTreeMap, HashSet};
50use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
51use std::sync::Arc;
52use std::time::Duration;
53
54use parking_lot::RwLock;
55
/// Identifier of a transaction that holds or requests range locks.
pub type TxnId = u64;
58
/// One end of a [`KeyRange`]; keys are compared lexicographically as raw bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RangeBound {
    /// No limit on this side of the range.
    Unbounded,
    /// The bound key itself belongs to the range.
    Inclusive(Vec<u8>),
    /// The bound key itself is excluded from the range.
    Exclusive(Vec<u8>),
}

impl RangeBound {
    /// Returns `true` when `key` satisfies this bound used as a range *start*:
    /// the bound is at or below `key` (strictly below for `Exclusive`).
    pub fn is_before(&self, key: &[u8]) -> bool {
        match self {
            Self::Unbounded => true,
            Self::Inclusive(bound) => key >= bound.as_slice(),
            Self::Exclusive(bound) => key > bound.as_slice(),
        }
    }

    /// Returns `true` when `key` satisfies this bound used as a range *end*:
    /// the bound is at or above `key` (strictly above for `Exclusive`).
    pub fn is_after(&self, key: &[u8]) -> bool {
        match self {
            Self::Unbounded => true,
            Self::Inclusive(bound) => key <= bound.as_slice(),
            Self::Exclusive(bound) => key < bound.as_slice(),
        }
    }
}

/// A contiguous span of keys, optionally scoped to one table.
///
/// A `table_id` of `None` means the range is not tied to a table. Such a range
/// is treated as potentially overlapping ranges of every table.
#[derive(Debug, Clone)]
pub struct KeyRange {
    /// Lower end of the range.
    pub start: RangeBound,
    /// Upper end of the range.
    pub end: RangeBound,
    /// Table the range belongs to, or `None` for a table-agnostic range.
    pub table_id: Option<u64>,
}

impl KeyRange {
    /// Creates a table-agnostic range covering exactly `key`.
    pub fn point(key: Vec<u8>) -> Self {
        Self {
            start: RangeBound::Inclusive(key.clone()),
            end: RangeBound::Inclusive(key),
            table_id: None,
        }
    }

    /// Creates a table-agnostic range from explicit bounds.
    pub fn range(start: RangeBound, end: RangeBound) -> Self {
        Self {
            start,
            end,
            table_id: None,
        }
    }

    /// Creates a range covering every key of `table_id`.
    pub fn full_table(table_id: u64) -> Self {
        Self {
            start: RangeBound::Unbounded,
            end: RangeBound::Unbounded,
            table_id: Some(table_id),
        }
    }

    /// Returns `true` if `key` lies within both bounds (ignores `table_id`).
    pub fn contains(&self, key: &[u8]) -> bool {
        self.start.is_before(key) && self.end.is_after(key)
    }

    /// Returns `true` if the two ranges may share a key.
    ///
    /// Ranges scoped to different tables never overlap. A `None` table scope
    /// overlaps any table. The check is conservative: two exclusive bounds
    /// `s < e` are treated as overlapping even when no byte string lies
    /// strictly between them (e.g. `"a"` and `"a\0"`).
    pub fn overlaps(&self, other: &KeyRange) -> bool {
        if matches!((self.table_id, other.table_id), (Some(a), Some(b)) if a != b) {
            return false;
        }
        Self::start_not_past_end(&self.start, &other.end)
            && Self::start_not_past_end(&other.start, &self.end)
    }

    /// Merges two overlapping ranges into their union.
    ///
    /// Returns `None` when the ranges do not overlap. It also returns `None`
    /// when they have different table scopes, because no single `KeyRange`
    /// represents their union exactly in that case. Folding a table-agnostic
    /// range into a table-scoped one would silently drop every other table
    /// from its coverage.
    pub fn try_merge(&self, other: &KeyRange) -> Option<KeyRange> {
        if self.table_id != other.table_id || !self.overlaps(other) {
            return None;
        }
        Some(KeyRange {
            start: Self::lower_start(&self.start, &other.start),
            end: Self::upper_end(&self.end, &other.end),
            table_id: self.table_id,
        })
    }

    /// Returns `true` unless the `start` bound lies beyond the `end` bound.
    fn start_not_past_end(start: &RangeBound, end: &RangeBound) -> bool {
        match (start, end) {
            (RangeBound::Unbounded, _) | (_, RangeBound::Unbounded) => true,
            (RangeBound::Inclusive(s), RangeBound::Inclusive(e)) => s <= e,
            (RangeBound::Inclusive(s), RangeBound::Exclusive(e)) => s < e,
            (RangeBound::Exclusive(s), RangeBound::Inclusive(e)) => s < e,
            (RangeBound::Exclusive(s), RangeBound::Exclusive(e)) => s < e,
        }
    }

    /// Returns the start bound that admits more keys (the lower of the two).
    fn lower_start(a: &RangeBound, b: &RangeBound) -> RangeBound {
        match (a, b) {
            (RangeBound::Unbounded, _) | (_, RangeBound::Unbounded) => RangeBound::Unbounded,
            (RangeBound::Inclusive(x), RangeBound::Inclusive(y)) => {
                RangeBound::Inclusive(x.min(y).clone())
            }
            (RangeBound::Exclusive(x), RangeBound::Exclusive(y)) => {
                RangeBound::Exclusive(x.min(y).clone())
            }
            // On a tie the inclusive bound wins: it also admits the key itself.
            (RangeBound::Inclusive(inc), RangeBound::Exclusive(exc))
            | (RangeBound::Exclusive(exc), RangeBound::Inclusive(inc)) => {
                if inc <= exc {
                    RangeBound::Inclusive(inc.clone())
                } else {
                    RangeBound::Exclusive(exc.clone())
                }
            }
        }
    }

    /// Returns the end bound that admits more keys (the higher of the two).
    fn upper_end(a: &RangeBound, b: &RangeBound) -> RangeBound {
        match (a, b) {
            (RangeBound::Unbounded, _) | (_, RangeBound::Unbounded) => RangeBound::Unbounded,
            (RangeBound::Inclusive(x), RangeBound::Inclusive(y)) => {
                RangeBound::Inclusive(x.max(y).clone())
            }
            (RangeBound::Exclusive(x), RangeBound::Exclusive(y)) => {
                RangeBound::Exclusive(x.max(y).clone())
            }
            // On a tie the inclusive bound wins: it also admits the key itself.
            (RangeBound::Inclusive(inc), RangeBound::Exclusive(exc))
            | (RangeBound::Exclusive(exc), RangeBound::Inclusive(inc)) => {
                if inc >= exc {
                    RangeBound::Inclusive(inc.clone())
                } else {
                    RangeBound::Exclusive(exc.clone())
                }
            }
        }
    }
}
224
/// A single lock held by one transaction over a key range.
#[derive(Debug, Clone)]
struct RangeLockEntry {
    /// Keys covered by this lock.
    range: KeyRange,
    /// Transaction holding the lock.
    txn_id: TxnId,
    /// `true` for a write lock, which conflicts with every overlapping lock of
    /// another transaction. `false` for a read lock, which conflicts only with
    /// overlapping write locks.
    is_write: bool,
}
232
/// Tracks range locks held by transactions and detects conflicts between them.
///
/// Locks are kept in a flat vector, so each conflict check scans every held lock.
pub struct RangeLockManager {
    /// All currently held locks, across every transaction.
    locks: RwLock<Vec<RangeLockEntry>>,
    /// Counters updated while acquiring locks.
    stats: RangeLockStats,
}
242
/// Counters maintained by [`RangeLockManager`], updated with relaxed atomics.
#[derive(Default)]
pub struct RangeLockStats {
    /// Number of `acquire` calls, whether or not they succeeded.
    pub total_locks: AtomicU64,
    /// Count of lock requests over non-point ranges.
    /// NOTE(review): confirm which code path increments this counter.
    pub range_locks: AtomicU64,
    /// Count of single-key (point) lock requests.
    /// NOTE(review): confirm which code path increments this counter.
    pub point_locks: AtomicU64,
    /// Number of `acquire` calls rejected because of a conflicting lock.
    pub conflicts_detected: AtomicU64,
    /// Number of requests folded into an existing lock of the same transaction and mode.
    pub merges_performed: AtomicU64,
}
252
253impl RangeLockManager {
254 pub fn new() -> Self {
255 Self {
256 locks: RwLock::new(Vec::new()),
257 stats: RangeLockStats::default(),
258 }
259 }
260
261 pub fn acquire(
263 &self,
264 txn_id: TxnId,
265 range: KeyRange,
266 is_write: bool,
267 ) -> Result<(), RangeLockConflict> {
268 self.stats.total_locks.fetch_add(1, AtomicOrdering::Relaxed);
269
270 {
272 let locks = self.locks.read();
273 for entry in locks.iter() {
274 if entry.txn_id == txn_id {
275 continue; }
277
278 if range.overlaps(&entry.range) {
279 if is_write || entry.is_write {
281 self.stats.conflicts_detected.fetch_add(1, AtomicOrdering::Relaxed);
282 return Err(RangeLockConflict {
283 holder_txn: entry.txn_id,
284 requester_txn: txn_id,
285 is_write_conflict: is_write && entry.is_write,
286 });
287 }
288 }
289 }
290 }
291
292 let mut locks = self.locks.write();
294
295 let mut merged = false;
297 for entry in locks.iter_mut() {
298 if entry.txn_id == txn_id && entry.is_write == is_write {
299 if let Some(merged_range) = entry.range.try_merge(&range) {
300 entry.range = merged_range;
301 merged = true;
302 self.stats.merges_performed.fetch_add(1, AtomicOrdering::Relaxed);
303 break;
304 }
305 }
306 }
307
308 if !merged {
309 locks.push(RangeLockEntry {
310 range,
311 txn_id,
312 is_write,
313 });
314 }
315
316 Ok(())
317 }
318
319 pub fn release(&self, txn_id: TxnId) {
321 let mut locks = self.locks.write();
322 locks.retain(|entry| entry.txn_id != txn_id);
323 }
324
325 pub fn check_conflict(
327 &self,
328 txn_id: TxnId,
329 range: &KeyRange,
330 is_write: bool,
331 ) -> Option<TxnId> {
332 let locks = self.locks.read();
333 for entry in locks.iter() {
334 if entry.txn_id == txn_id {
335 continue;
336 }
337 if range.overlaps(&entry.range) && (is_write || entry.is_write) {
338 return Some(entry.txn_id);
339 }
340 }
341 None
342 }
343
344 pub fn lock_count(&self, txn_id: TxnId) -> usize {
346 self.locks
347 .read()
348 .iter()
349 .filter(|e| e.txn_id == txn_id)
350 .count()
351 }
352}
353
354impl Default for RangeLockManager {
355 fn default() -> Self {
356 Self::new()
357 }
358}
359
360#[derive(Debug, Clone)]
362pub struct RangeLockConflict {
363 pub holder_txn: TxnId,
364 pub requester_txn: TxnId,
365 pub is_write_conflict: bool,
366}
367
368impl std::fmt::Display for RangeLockConflict {
369 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
370 write!(
371 f,
372 "Range lock conflict: txn {} blocked by txn {} (write: {})",
373 self.requester_txn, self.holder_txn, self.is_write_conflict
374 )
375 }
376}
377
378impl std::error::Error for RangeLockConflict {}
379
/// Probabilistic set of keys backed by a Bloom filter.
///
/// `might_contain` never returns `false` for a key passed to `add`, but it may
/// return `true` for keys that were never added.
pub struct BloomReadSet {
    /// Bit array, packed 64 bits per word.
    bits: Vec<u64>,
    /// Number of probe positions per key (always at least 1).
    k: usize,
    /// Number of addressable bits; always `bits.len() * 64` and at least 64.
    m: usize,
    /// Number of `add` calls so far (duplicates included).
    count: usize,
}

impl BloomReadSet {
    /// Creates a filter sized for `expected_items` keys at roughly
    /// `false_positive_rate`. It uses the standard sizing formulas:
    /// `m = -n ln p / (ln 2)^2` bits and `k = ceil(m / n * ln 2)` probes.
    ///
    /// Degenerate arguments are sanitized so the filter is always usable:
    /// - `expected_items == 0` is treated as 1.
    /// - A rate `<= 0` (or NaN) is raised to `f64::MIN_POSITIVE`, keeping
    ///   `ln p` finite.
    /// - The bit array always has at least one 64-bit word, so probing never
    ///   takes a remainder by zero.
    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
        let n = expected_items.max(1) as f64;
        let p = false_positive_rate.max(f64::MIN_POSITIVE);
        let ln2 = 2.0_f64.ln();

        // A rate >= 1 makes this non-positive; `as` then saturates it to 0.
        let m = (-(n * p.ln()) / ln2.powi(2)) as usize;
        let k = ((m as f64 / n) * ln2).ceil() as usize;
        let words = m.div_ceil(64).max(1);

        Self {
            bits: vec![0u64; words],
            k: k.max(1),
            m: words * 64,
            count: 0,
        }
    }

    /// Records `key` in the filter.
    pub fn add(&mut self, key: &[u8]) {
        let (h1, h2) = Self::base_hashes(key);
        for i in 0..self.k {
            let bit = self.probe(h1, h2, i);
            self.bits[bit / 64] |= 1u64 << (bit % 64);
        }
        self.count += 1;
    }

    /// Returns `false` if `key` was definitely never added; `true` if it may have been.
    pub fn might_contain(&self, key: &[u8]) -> bool {
        let (h1, h2) = Self::base_hashes(key);
        (0..self.k).all(|i| {
            let bit = self.probe(h1, h2, i);
            (self.bits[bit / 64] & (1u64 << (bit % 64))) != 0
        })
    }

    /// Computes the two independent base hashes once per key. They do not
    /// depend on the probe index, so computing them inside the probe loop
    /// would repeat the work `k` times.
    fn base_hashes(key: &[u8]) -> (u128, u128) {
        (
            u128::from(Self::hash_fnv1a(key)),
            u128::from(Self::hash_murmur_like(key)),
        )
    }

    /// Bit index of the `i`-th probe, via double hashing: `(h1 + i * h2) mod m`.
    /// Widened to `u128` so the sum cannot overflow.
    fn probe(&self, h1: u128, h2: u128, i: usize) -> usize {
        ((h1 + (i as u128) * h2) % (self.m as u128)) as usize
    }

    /// 64-bit FNV-1a hash of `key`.
    fn hash_fnv1a(key: &[u8]) -> u64 {
        const FNV_OFFSET: u64 = 0xcbf29ce484222325;
        const FNV_PRIME: u64 = 0x100000001b3;

        let mut hash = FNV_OFFSET;
        for byte in key {
            hash ^= *byte as u64;
            hash = hash.wrapping_mul(FNV_PRIME);
        }
        hash
    }

    /// MurmurHash64-style mixing hash of `key`, processed in 8-byte little-endian chunks.
    fn hash_murmur_like(key: &[u8]) -> u64 {
        const M: u64 = 0xc6a4a7935bd1e995;
        const R: u32 = 47;

        let mut h: u64 = 0xdeadbeef ^ (key.len() as u64).wrapping_mul(M);

        for chunk in key.chunks(8) {
            let mut k = 0u64;
            for (i, byte) in chunk.iter().enumerate() {
                k |= (*byte as u64) << (i * 8);
            }
            k = k.wrapping_mul(M);
            k ^= k >> R;
            k = k.wrapping_mul(M);
            h ^= k;
            h = h.wrapping_mul(M);
        }

        h ^= h >> R;
        h = h.wrapping_mul(M);
        h ^= h >> R;
        h
    }

    /// Estimates the current false-positive rate as `(set bits / m)^k`.
    pub fn estimated_false_positive_rate(&self) -> f64 {
        let ones = self.bits.iter().map(|w| w.count_ones()).sum::<u32>() as f64;
        let ratio = ones / self.m as f64;
        ratio.powi(self.k as i32)
    }

    /// Number of `add` calls so far.
    pub fn count(&self) -> usize {
        self.count
    }

    /// Size of the bit array in bytes.
    pub fn memory_bytes(&self) -> usize {
        self.bits.len() * 8
    }
}
508
509pub struct AdaptiveReadSet {
511 threshold: usize,
513 exact_keys: Option<HashSet<Vec<u8>>>,
515 bloom: Option<BloomReadSet>,
517 ranges: Vec<KeyRange>,
519}
520
521impl AdaptiveReadSet {
522 pub fn new(threshold: usize) -> Self {
524 Self {
525 threshold,
526 exact_keys: Some(HashSet::new()),
527 bloom: None,
528 ranges: Vec::new(),
529 }
530 }
531
532 pub fn add_point(&mut self, key: Vec<u8>) {
534 if let Some(ref mut exact) = self.exact_keys {
535 exact.insert(key.clone());
536 if exact.len() >= self.threshold {
537 let mut bloom = BloomReadSet::new(self.threshold * 10, 0.01);
539 for k in exact.drain() {
540 bloom.add(&k);
541 }
542 bloom.add(&key);
543 self.bloom = Some(bloom);
544 self.exact_keys = None;
545 }
546 } else if let Some(ref mut bloom) = self.bloom {
547 bloom.add(&key);
548 }
549 }
550
551 pub fn add_range(&mut self, range: KeyRange) {
553 let mut merged = false;
555 for existing in &mut self.ranges {
556 if let Some(merged_range) = existing.try_merge(&range) {
557 *existing = merged_range;
558 merged = true;
559 break;
560 }
561 }
562 if !merged {
563 self.ranges.push(range);
564 }
565 }
566
567 pub fn might_conflict(&self, key: &[u8]) -> bool {
569 if let Some(ref exact) = self.exact_keys {
571 if exact.contains(key) {
572 return true;
573 }
574 }
575
576 if let Some(ref bloom) = self.bloom {
578 if bloom.might_contain(key) {
579 return true;
580 }
581 }
582
583 for range in &self.ranges {
585 if range.contains(key) {
586 return true;
587 }
588 }
589
590 false
591 }
592
593 pub fn memory_bytes(&self) -> usize {
595 let exact = self
596 .exact_keys
597 .as_ref()
598 .map(|e| e.iter().map(|k| k.len()).sum::<usize>())
599 .unwrap_or(0);
600 let bloom = self.bloom.as_ref().map(|b| b.memory_bytes()).unwrap_or(0);
601 let ranges = self.ranges.len() * 64; exact + bloom + ranges
603 }
604
605 pub fn is_exact(&self) -> bool {
607 self.exact_keys.is_some()
608 }
609}
610
/// Exponential backoff policy with optional jitter.
#[derive(Debug, Clone)]
pub struct BackoffStrategy {
    /// Delay for attempt 0.
    pub initial: Duration,
    /// Hard upper bound on any delay returned by `backoff_for`, jitter included.
    pub max: Duration,
    /// Factor applied to the delay for each successive attempt.
    pub multiplier: f64,
    /// When `true`, each delay is randomized by up to ±25%.
    pub jitter: bool,
}

impl Default for BackoffStrategy {
    fn default() -> Self {
        Self {
            initial: Duration::from_micros(100),
            max: Duration::from_millis(100),
            multiplier: 2.0,
            jitter: true,
        }
    }
}

impl BackoffStrategy {
    /// Returns the delay for retry number `attempt` (0-based).
    ///
    /// The delay is `initial * multiplier^attempt`, capped at `max`. With
    /// jitter enabled, a uniform offset in [-25%, +25%) of that value is added.
    /// The result is then re-capped, so it never exceeds `max` and never
    /// drops below zero.
    pub fn backoff_for(&self, attempt: u32) -> Duration {
        // `powi` takes an `i32`. Saturate rather than let `as` wrap huge attempt
        // counts into negative exponents, which would shrink the delay.
        let exponent = i32::try_from(attempt).unwrap_or(i32::MAX);
        let max_nanos = self.max.as_nanos() as f64;

        let base = self.initial.as_nanos() as f64 * self.multiplier.powi(exponent);
        let capped = base.min(max_nanos);

        let jittered = if self.jitter {
            capped + (capped * 0.25) * (rand_simple() * 2.0 - 1.0)
        } else {
            capped
        };

        // Re-apply the cap after jitter so `max` is a true upper bound.
        Duration::from_nanos(jittered.clamp(0.0, max_nanos) as u64)
    }
}

/// Returns a pseudo-random value in `[0, 1)` for backoff jitter.
///
/// Each call hashes with a freshly keyed std `RandomState`, so consecutive
/// calls yield different values. This holds even on platforms whose clock
/// lacks nanosecond resolution. Not cryptographically secure.
fn rand_simple() -> f64 {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};

    let bits = RandomState::new().build_hasher().finish();
    // Keep the top 53 bits: exactly the precision of an f64 mantissa.
    (bits >> 11) as f64 / (1u64 << 53) as f64
}
662
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an inclusive bound from a byte-string literal.
    fn incl(key: &[u8]) -> RangeBound {
        RangeBound::Inclusive(key.to_vec())
    }

    /// Builds a single-key range from a byte-string literal.
    fn point(key: &[u8]) -> KeyRange {
        KeyRange::point(key.to_vec())
    }

    #[test]
    fn test_key_range_contains() {
        let range = KeyRange::range(incl(b"aaa"), incl(b"zzz"));

        for key in [&b"mmm"[..], &b"aaa"[..], &b"zzz"[..]] {
            assert!(range.contains(key));
        }
        // ASCII uppercase sorts before lowercase, so this lies below the start bound.
        assert!(!range.contains(b"AAA"));
    }

    #[test]
    fn test_key_range_overlap() {
        let a_to_m = KeyRange::range(incl(b"a"), incl(b"m"));
        let k_to_z = KeyRange::range(incl(b"k"), incl(b"z"));
        let n_to_z = KeyRange::range(incl(b"n"), incl(b"z"));

        assert!(a_to_m.overlaps(&k_to_z));
        assert!(!a_to_m.overlaps(&n_to_z));
    }

    #[test]
    fn test_bloom_filter() {
        let keys: [&[u8]; 3] = [b"key1", b"key2", b"key3"];
        let mut bloom = BloomReadSet::new(1000, 0.01);

        for key in keys {
            bloom.add(key);
        }
        for key in keys {
            assert!(bloom.might_contain(key));
        }
    }

    #[test]
    fn test_range_lock_manager() {
        let manager = RangeLockManager::new();

        assert!(manager.acquire(1, point(b"key1"), true).is_ok());
        assert!(manager.acquire(2, point(b"key2"), true).is_ok());

        // txn 3 wants key1 while txn 1 still holds a write lock on it.
        assert!(manager.acquire(3, point(b"key1"), true).is_err());

        // Once txn 1 releases, the same request succeeds.
        manager.release(1);
        assert!(manager.acquire(3, point(b"key1"), true).is_ok());
    }

    #[test]
    fn test_adaptive_read_set() {
        let key = |i: i32| format!("key{}", i).into_bytes();
        let mut set = AdaptiveReadSet::new(100);

        // Below the threshold the set stays exact.
        (0..50).for_each(|i| set.add_point(key(i)));
        assert!(set.is_exact());

        // Crossing the threshold migrates point keys to the Bloom filter.
        (50..150).for_each(|i| set.add_point(key(i)));
        assert!(!set.is_exact());

        assert!(set.might_conflict(b"key0"));
        assert!(set.might_conflict(b"key100"));
    }

    #[test]
    fn test_backoff_strategy() {
        let strategy = BackoffStrategy::default();
        let [b0, b1, b5] = [0, 1, 5].map(|attempt| strategy.backoff_for(attempt));

        assert!(b1 > b0);
        assert!(b5 > b1);
        assert!(b5 <= strategy.max);
    }
}