1use std::collections::HashSet;
49use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
50use std::time::Duration;
51
52use parking_lot::RwLock;
53
/// Identifier of a transaction; used in this file to tag the owner of a range
/// lock and to name the two parties of a lock conflict.
pub type TxnId = u64;
56
/// One edge of a key range.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RangeBound {
    /// No limit on this side of the range.
    Unbounded,
    /// The bound key itself belongs to the range.
    Inclusive(Vec<u8>),
    /// The bound key itself does not belong to the range.
    Exclusive(Vec<u8>),
}

impl RangeBound {
    /// Treats `self` as a lower bound and reports whether `key` satisfies it,
    /// i.e. whether the bound lies at or before `key` (strictly before for
    /// an exclusive bound). Keys are compared lexicographically as bytes.
    pub fn is_before(&self, key: &[u8]) -> bool {
        let (limit, strict) = match self {
            Self::Unbounded => return true,
            Self::Inclusive(bytes) => (bytes.as_slice(), false),
            Self::Exclusive(bytes) => (bytes.as_slice(), true),
        };
        if strict {
            key > limit
        } else {
            key >= limit
        }
    }

    /// Treats `self` as an upper bound and reports whether `key` satisfies it,
    /// i.e. whether the bound lies at or after `key` (strictly after for an
    /// exclusive bound).
    pub fn is_after(&self, key: &[u8]) -> bool {
        let (limit, strict) = match self {
            Self::Unbounded => return true,
            Self::Inclusive(bytes) => (bytes.as_slice(), false),
            Self::Exclusive(bytes) => (bytes.as_slice(), true),
        };
        if strict {
            key < limit
        } else {
            key <= limit
        }
    }
}
87
88#[derive(Debug, Clone)]
90pub struct KeyRange {
91 pub start: RangeBound,
93 pub end: RangeBound,
95 pub table_id: Option<u64>,
97}
98
99impl KeyRange {
100 pub fn point(key: Vec<u8>) -> Self {
102 Self {
103 start: RangeBound::Inclusive(key.clone()),
104 end: RangeBound::Inclusive(key),
105 table_id: None,
106 }
107 }
108
109 pub fn range(start: RangeBound, end: RangeBound) -> Self {
111 Self {
112 start,
113 end,
114 table_id: None,
115 }
116 }
117
118 pub fn full_table(table_id: u64) -> Self {
120 Self {
121 start: RangeBound::Unbounded,
122 end: RangeBound::Unbounded,
123 table_id: Some(table_id),
124 }
125 }
126
127 pub fn contains(&self, key: &[u8]) -> bool {
129 self.start.is_before(key) && self.end.is_after(key)
130 }
131
132 pub fn overlaps(&self, other: &KeyRange) -> bool {
134 if self.table_id.is_some() && other.table_id.is_some() && self.table_id != other.table_id {
136 return false;
137 }
138
139 let self_start_before_other_end = match (&self.start, &other.end) {
142 (RangeBound::Unbounded, _) | (_, RangeBound::Unbounded) => true,
143 (RangeBound::Inclusive(s), RangeBound::Inclusive(e)) => s <= e,
144 (RangeBound::Inclusive(s), RangeBound::Exclusive(e)) => s < e,
145 (RangeBound::Exclusive(s), RangeBound::Inclusive(e)) => s < e,
146 (RangeBound::Exclusive(s), RangeBound::Exclusive(e)) => s < e,
147 };
148
149 let other_start_before_self_end = match (&other.start, &self.end) {
150 (RangeBound::Unbounded, _) | (_, RangeBound::Unbounded) => true,
151 (RangeBound::Inclusive(s), RangeBound::Inclusive(e)) => s <= e,
152 (RangeBound::Inclusive(s), RangeBound::Exclusive(e)) => s < e,
153 (RangeBound::Exclusive(s), RangeBound::Inclusive(e)) => s < e,
154 (RangeBound::Exclusive(s), RangeBound::Exclusive(e)) => s < e,
155 };
156
157 self_start_before_other_end && other_start_before_self_end
158 }
159
160 pub fn try_merge(&self, other: &KeyRange) -> Option<KeyRange> {
162 if !self.overlaps(other) {
163 return None;
164 }
165
166 let start = match (&self.start, &other.start) {
168 (RangeBound::Unbounded, _) | (_, RangeBound::Unbounded) => RangeBound::Unbounded,
169 (RangeBound::Inclusive(a), RangeBound::Inclusive(b)) => {
170 RangeBound::Inclusive(a.min(b).clone())
171 }
172 (RangeBound::Exclusive(a), RangeBound::Exclusive(b)) => {
173 RangeBound::Exclusive(a.min(b).clone())
174 }
175 (RangeBound::Inclusive(a), RangeBound::Exclusive(b)) => {
176 if a <= b {
177 RangeBound::Inclusive(a.clone())
178 } else {
179 RangeBound::Exclusive(b.clone())
180 }
181 }
182 (RangeBound::Exclusive(a), RangeBound::Inclusive(b)) => {
183 if b <= a {
184 RangeBound::Inclusive(b.clone())
185 } else {
186 RangeBound::Exclusive(a.clone())
187 }
188 }
189 };
190
191 let end = match (&self.end, &other.end) {
192 (RangeBound::Unbounded, _) | (_, RangeBound::Unbounded) => RangeBound::Unbounded,
193 (RangeBound::Inclusive(a), RangeBound::Inclusive(b)) => {
194 RangeBound::Inclusive(a.max(b).clone())
195 }
196 (RangeBound::Exclusive(a), RangeBound::Exclusive(b)) => {
197 RangeBound::Exclusive(a.max(b).clone())
198 }
199 (RangeBound::Inclusive(a), RangeBound::Exclusive(b)) => {
200 if a >= b {
201 RangeBound::Inclusive(a.clone())
202 } else {
203 RangeBound::Exclusive(b.clone())
204 }
205 }
206 (RangeBound::Exclusive(a), RangeBound::Inclusive(b)) => {
207 if b >= a {
208 RangeBound::Inclusive(b.clone())
209 } else {
210 RangeBound::Exclusive(a.clone())
211 }
212 }
213 };
214
215 Some(KeyRange {
216 start,
217 end,
218 table_id: self.table_id.or(other.table_id),
219 })
220 }
221}
222
223#[derive(Debug, Clone)]
225struct RangeLockEntry {
226 range: KeyRange,
227 txn_id: TxnId,
228 is_write: bool,
229}
230
231pub struct RangeLockManager {
235 locks: RwLock<Vec<RangeLockEntry>>,
237 stats: RangeLockStats,
239}
240
241#[derive(Default)]
243pub struct RangeLockStats {
244 pub total_locks: AtomicU64,
245 pub range_locks: AtomicU64,
246 pub point_locks: AtomicU64,
247 pub conflicts_detected: AtomicU64,
248 pub merges_performed: AtomicU64,
249}
250
251impl RangeLockManager {
252 pub fn new() -> Self {
253 Self {
254 locks: RwLock::new(Vec::new()),
255 stats: RangeLockStats::default(),
256 }
257 }
258
259 pub fn acquire(
261 &self,
262 txn_id: TxnId,
263 range: KeyRange,
264 is_write: bool,
265 ) -> Result<(), RangeLockConflict> {
266 self.stats.total_locks.fetch_add(1, AtomicOrdering::Relaxed);
267
268 {
270 let locks = self.locks.read();
271 for entry in locks.iter() {
272 if entry.txn_id == txn_id {
273 continue; }
275
276 if range.overlaps(&entry.range) {
277 if is_write || entry.is_write {
279 self.stats
280 .conflicts_detected
281 .fetch_add(1, AtomicOrdering::Relaxed);
282 return Err(RangeLockConflict {
283 holder_txn: entry.txn_id,
284 requester_txn: txn_id,
285 is_write_conflict: is_write && entry.is_write,
286 });
287 }
288 }
289 }
290 }
291
292 let mut locks = self.locks.write();
294
295 let mut merged = false;
297 for entry in locks.iter_mut() {
298 if entry.txn_id == txn_id && entry.is_write == is_write {
299 if let Some(merged_range) = entry.range.try_merge(&range) {
300 entry.range = merged_range;
301 merged = true;
302 self.stats
303 .merges_performed
304 .fetch_add(1, AtomicOrdering::Relaxed);
305 break;
306 }
307 }
308 }
309
310 if !merged {
311 locks.push(RangeLockEntry {
312 range,
313 txn_id,
314 is_write,
315 });
316 }
317
318 Ok(())
319 }
320
321 pub fn release(&self, txn_id: TxnId) {
323 let mut locks = self.locks.write();
324 locks.retain(|entry| entry.txn_id != txn_id);
325 }
326
327 pub fn check_conflict(&self, txn_id: TxnId, range: &KeyRange, is_write: bool) -> Option<TxnId> {
329 let locks = self.locks.read();
330 for entry in locks.iter() {
331 if entry.txn_id == txn_id {
332 continue;
333 }
334 if range.overlaps(&entry.range) && (is_write || entry.is_write) {
335 return Some(entry.txn_id);
336 }
337 }
338 None
339 }
340
341 pub fn lock_count(&self, txn_id: TxnId) -> usize {
343 self.locks
344 .read()
345 .iter()
346 .filter(|e| e.txn_id == txn_id)
347 .count()
348 }
349}
350
351impl Default for RangeLockManager {
352 fn default() -> Self {
353 Self::new()
354 }
355}
356
357#[derive(Debug, Clone)]
359pub struct RangeLockConflict {
360 pub holder_txn: TxnId,
361 pub requester_txn: TxnId,
362 pub is_write_conflict: bool,
363}
364
365impl std::fmt::Display for RangeLockConflict {
366 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
367 write!(
368 f,
369 "Range lock conflict: txn {} blocked by txn {} (write: {})",
370 self.requester_txn, self.holder_txn, self.is_write_conflict
371 )
372 }
373}
374
375impl std::error::Error for RangeLockConflict {}
376
/// Bloom filter over byte-string keys: membership queries may report false
/// positives but never false negatives.
pub struct BloomReadSet {
    /// Bit array, packed 64 bits per word.
    bits: Vec<u64>,
    /// Number of bit positions probed per key (always at least 1).
    k: usize,
    /// Total number of bits, i.e. `bits.len() * 64` (always at least 64).
    m: usize,
    /// Number of `add` calls made so far (duplicates included).
    count: usize,
}

impl BloomReadSet {
    /// Smallest false-positive rate honoured by [`BloomReadSet::new`]; lower,
    /// non-positive, or NaN requests are raised to this value so the filter
    /// size stays finite.
    const MIN_FALSE_POSITIVE_RATE: f64 = 1e-12;

    /// Sizes a filter for `expected_items` keys at roughly
    /// `false_positive_rate`, using the standard formulas
    /// `m = -n·ln(p) / ln(2)²` and `k = (m / n)·ln(2)`.
    ///
    /// Degenerate inputs are sanitised rather than producing a zero-bit
    /// filter (which would make every later operation divide by zero):
    /// `expected_items == 0` is treated as 1, the rate is limited to
    /// `[1e-12, 1.0]`, and at least one 64-bit word is always allocated.
    pub fn new(expected_items: usize, false_positive_rate: f64) -> Self {
        let items = expected_items.max(1) as f64;
        // `max`/`min` (rather than `clamp`) so that a NaN rate collapses to
        // the minimum instead of propagating.
        let rate = false_positive_rate
            .max(Self::MIN_FALSE_POSITIVE_RATE)
            .min(1.0);

        let ln2 = 2.0_f64.ln();
        let ideal_bits = (-(items * rate.ln()) / ln2.powi(2)) as usize;
        let probes = ((ideal_bits as f64 / items) * ln2).ceil() as usize;

        // Round up to whole words; never allocate an empty bit array.
        let words = ideal_bits.div_ceil(64).max(1);

        Self {
            bits: vec![0u64; words],
            k: probes.max(1),
            m: words * 64,
            count: 0,
        }
    }

    /// Records `key` in the filter.
    pub fn add(&mut self, key: &[u8]) {
        for bit in Self::bit_indices(self, key) {
            self.bits[bit / 64] |= 1u64 << (bit % 64);
        }
        self.count += 1;
    }

    /// Returns `false` only if `key` was definitely never added; `true` means
    /// it was added or collides with keys that were.
    pub fn might_contain(&self, key: &[u8]) -> bool {
        Self::bit_indices(self, key)
            .into_iter()
            .all(|bit| self.bits[bit / 64] & (1u64 << (bit % 64)) != 0)
    }

    /// Computes the `k` bit positions for `key` by double hashing:
    /// position `i` is `(h1 + i·h2) mod m`. Both base hashes are computed
    /// once per key. The positions are returned as an owned list so callers
    /// may mutate the bit array while walking them.
    fn bit_indices(&self, key: &[u8]) -> Vec<usize> {
        let h1 = u128::from(self.hash_fnv1a(key));
        let h2 = u128::from(self.hash_murmur_like(key));
        let modulus = self.m as u128;
        // Widened to u128 so `i * h2` cannot overflow.
        (0..self.k as u128)
            .map(|i| ((h1 + i * h2) % modulus) as usize)
            .collect()
    }

    /// 64-bit FNV-1a hash of `key`.
    fn hash_fnv1a(&self, key: &[u8]) -> u64 {
        const FNV_OFFSET: u64 = 0xcbf29ce484222325;
        const FNV_PRIME: u64 = 0x100000001b3;

        key.iter().fold(FNV_OFFSET, |hash, byte| {
            (hash ^ u64::from(*byte)).wrapping_mul(FNV_PRIME)
        })
    }

    /// Second, independent 64-bit hash: multiply/xor-shift mixing over
    /// little-endian 8-byte chunks of `key`.
    fn hash_murmur_like(&self, key: &[u8]) -> u64 {
        const M: u64 = 0xc6a4a7935bd1e995;
        const R: u32 = 47;

        let mut h: u64 = 0xdeadbeef ^ (key.len() as u64).wrapping_mul(M);

        for chunk in key.chunks(8) {
            // Assemble up to 8 bytes into a word, lowest byte first.
            let mut k = 0u64;
            for (i, byte) in chunk.iter().enumerate() {
                k |= u64::from(*byte) << (i * 8);
            }
            k = k.wrapping_mul(M);
            k ^= k >> R;
            k = k.wrapping_mul(M);
            h ^= k;
            h = h.wrapping_mul(M);
        }

        h ^= h >> R;
        h = h.wrapping_mul(M);
        h ^= h >> R;
        h
    }

    /// Estimates the current false-positive probability as
    /// `(fraction of set bits) ^ k`.
    pub fn estimated_false_positive_rate(&self) -> f64 {
        // Summed as u64: a u32 total would overflow once more than 2^32 bits
        // are set.
        let ones: u64 = self.bits.iter().map(|w| u64::from(w.count_ones())).sum();
        let fill = ones as f64 / self.m as f64;
        fill.powi(self.k as i32)
    }

    /// Number of `add` calls made so far.
    pub fn count(&self) -> usize {
        self.count
    }

    /// Size of the bit array in bytes.
    pub fn memory_bytes(&self) -> usize {
        self.bits.len() * 8
    }
}
505
506pub struct AdaptiveReadSet {
508 threshold: usize,
510 exact_keys: Option<HashSet<Vec<u8>>>,
512 bloom: Option<BloomReadSet>,
514 ranges: Vec<KeyRange>,
516}
517
518impl AdaptiveReadSet {
519 pub fn new(threshold: usize) -> Self {
521 Self {
522 threshold,
523 exact_keys: Some(HashSet::new()),
524 bloom: None,
525 ranges: Vec::new(),
526 }
527 }
528
529 pub fn add_point(&mut self, key: Vec<u8>) {
531 if let Some(ref mut exact) = self.exact_keys {
532 exact.insert(key.clone());
533 if exact.len() >= self.threshold {
534 let mut bloom = BloomReadSet::new(self.threshold * 10, 0.01);
536 for k in exact.drain() {
537 bloom.add(&k);
538 }
539 bloom.add(&key);
540 self.bloom = Some(bloom);
541 self.exact_keys = None;
542 }
543 } else if let Some(ref mut bloom) = self.bloom {
544 bloom.add(&key);
545 }
546 }
547
548 pub fn add_range(&mut self, range: KeyRange) {
550 let mut merged = false;
552 for existing in &mut self.ranges {
553 if let Some(merged_range) = existing.try_merge(&range) {
554 *existing = merged_range;
555 merged = true;
556 break;
557 }
558 }
559 if !merged {
560 self.ranges.push(range);
561 }
562 }
563
564 pub fn might_conflict(&self, key: &[u8]) -> bool {
566 if let Some(ref exact) = self.exact_keys {
568 if exact.contains(key) {
569 return true;
570 }
571 }
572
573 if let Some(ref bloom) = self.bloom {
575 if bloom.might_contain(key) {
576 return true;
577 }
578 }
579
580 for range in &self.ranges {
582 if range.contains(key) {
583 return true;
584 }
585 }
586
587 false
588 }
589
590 pub fn memory_bytes(&self) -> usize {
592 let exact = self
593 .exact_keys
594 .as_ref()
595 .map(|e| e.iter().map(|k| k.len()).sum::<usize>())
596 .unwrap_or(0);
597 let bloom = self.bloom.as_ref().map(|b| b.memory_bytes()).unwrap_or(0);
598 let ranges = self.ranges.len() * 64; exact + bloom + ranges
600 }
601
602 pub fn is_exact(&self) -> bool {
604 self.exact_keys.is_some()
605 }
606}
607
608#[derive(Debug, Clone)]
610pub struct BackoffStrategy {
611 pub initial: Duration,
613 pub max: Duration,
615 pub multiplier: f64,
617 pub jitter: bool,
619}
620
621impl Default for BackoffStrategy {
622 fn default() -> Self {
623 Self {
624 initial: Duration::from_micros(100),
625 max: Duration::from_millis(100),
626 multiplier: 2.0,
627 jitter: true,
628 }
629 }
630}
631
632impl BackoffStrategy {
633 pub fn backoff_for(&self, attempt: u32) -> Duration {
635 let base = self.initial.as_nanos() as f64 * self.multiplier.powi(attempt as i32);
636 let capped = base.min(self.max.as_nanos() as f64);
637
638 let final_nanos = if self.jitter {
639 let jitter = (capped * 0.25) * (rand_simple() * 2.0 - 1.0);
641 (capped + jitter).max(0.0)
642 } else {
643 capped
644 };
645
646 Duration::from_nanos(final_nanos as u64)
647 }
648}
649
/// Returns a pseudo-random value in `[0, 1)` without external crates.
///
/// The wall-clock timestamp is fed through a freshly keyed standard-library
/// hasher. Using the timestamp's low digits directly is unreliable: clocks
/// with microsecond resolution always report the same sub-microsecond part,
/// and threads sampling the same instant would draw the same value.
///
/// Not suitable for cryptographic use.
fn rand_simple() -> f64 {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::time::SystemTime;

    let nanos = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();

    // Each `RandomState::new()` carries different keys, so consecutive calls
    // differ even if the clock has not advanced.
    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u128(nanos);
    let bits = hasher.finish();

    // Keep the top 53 bits (an f64 mantissa's worth) and scale into [0, 1).
    (bits >> 11) as f64 / (1u64 << 53) as f64
}
659
#[cfg(test)]
mod tests {
    use super::*;

    /// Range that includes both of its end keys.
    fn closed(lo: &[u8], hi: &[u8]) -> KeyRange {
        KeyRange::range(
            RangeBound::Inclusive(lo.to_vec()),
            RangeBound::Inclusive(hi.to_vec()),
        )
    }

    /// Requests a write lock on a single key.
    fn write_point(
        manager: &RangeLockManager,
        txn: TxnId,
        key: &[u8],
    ) -> Result<(), RangeLockConflict> {
        manager.acquire(txn, KeyRange::point(key.to_vec()), true)
    }

    #[test]
    fn test_key_range_contains() {
        let range = closed(b"aaa", b"zzz");

        // Interior key and both inclusive endpoints are members.
        assert!(range.contains(b"mmm"));
        assert!(range.contains(b"aaa"));
        assert!(range.contains(b"zzz"));
        // Upper-case sorts before lower-case, so this falls below the start.
        assert!(!range.contains(b"AAA"));
    }

    #[test]
    fn test_key_range_overlap() {
        let a_to_m = closed(b"a", b"m");
        let k_to_z = closed(b"k", b"z");
        let n_to_z = closed(b"n", b"z");

        assert!(a_to_m.overlaps(&k_to_z));
        assert!(!a_to_m.overlaps(&n_to_z));
    }

    #[test]
    fn test_bloom_filter() {
        let mut bloom = BloomReadSet::new(1000, 0.01);

        bloom.add(b"key1");
        bloom.add(b"key2");
        bloom.add(b"key3");

        // A bloom filter never yields false negatives.
        assert!(bloom.might_contain(b"key1"));
        assert!(bloom.might_contain(b"key2"));
        assert!(bloom.might_contain(b"key3"));
    }

    #[test]
    fn test_range_lock_manager() {
        let manager = RangeLockManager::new();

        // Two transactions writing different keys do not interfere.
        assert!(write_point(&manager, 1, b"key1").is_ok());
        assert!(write_point(&manager, 2, b"key2").is_ok());

        // A third writer on an already locked key is rejected.
        assert!(write_point(&manager, 3, b"key1").is_err());

        // Once the holder releases, the same request succeeds.
        manager.release(1);
        assert!(write_point(&manager, 3, b"key1").is_ok());
    }

    #[test]
    fn test_adaptive_read_set() {
        let mut set = AdaptiveReadSet::new(100);

        // Below the threshold the set stays exact.
        for i in 0..50 {
            set.add_point(format!("key{}", i).into_bytes());
        }
        assert!(set.is_exact());

        // Crossing the threshold switches to the bloom filter.
        for i in 50..150 {
            set.add_point(format!("key{}", i).into_bytes());
        }
        assert!(!set.is_exact());

        // Keys added before and after the switch are both still reported.
        assert!(set.might_conflict(b"key0"));
        assert!(set.might_conflict(b"key100"));
    }

    #[test]
    fn test_backoff_strategy() {
        let strategy = BackoffStrategy::default();

        let b0 = strategy.backoff_for(0);
        let b1 = strategy.backoff_for(1);
        let b5 = strategy.backoff_for(5);

        // Delays grow with the attempt number and respect the cap.
        assert!(b1 > b0);
        assert!(b5 > b1);
        assert!(b5 <= strategy.max);
    }
}