//! Concurrent in-memory data structures: a statistics-tracking MPMC queue,
//! a segmented append-only log, a sharded hash map, and a skip list.

use bytes::Bytes;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender, TryRecvError, TrySendError};
use parking_lot::RwLock;
use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

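/// Wrapper that aligns its contents to a 64-byte boundary (one cache line on
/// most x86-64/aarch64 parts) so adjacent values do not share a cache line.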
#[repr(C, align(64))]
pub struct CacheAligned<T>(pub T);

impl<T> std::ops::Deref for CacheAligned<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.0
    }
}

impl<T> std::ops::DerefMut for CacheAligned<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.0
    }
}

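/// Multi-producer, multi-consumer queue backed by a crossbeam channel, with
/// relaxed atomic counters for length and enqueue/dequeue statistics.
///
/// A minimal usage sketch (doctest marked `ignore` since the import path
/// depends on where this module is mounted):
///
/// ```ignore
/// let q = LockFreeQueue::bounded(8);
/// q.try_push(42).unwrap();
/// assert_eq!(q.try_pop(), Some(42));
/// ```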
pub struct LockFreeQueue<T> {
    sender: Sender<T>,
    receiver: Receiver<T>,
    capacity: usize,
    /// Approximate number of items currently queued.
    len: AtomicUsize,
    /// Total items successfully enqueued over the queue's lifetime.
    total_enqueued: AtomicU64,
    /// Total items successfully dequeued over the queue's lifetime.
    total_dequeued: AtomicU64,
    /// Number of `try_push` calls rejected because the queue was full.
    blocked_enqueues: AtomicU64,
}

impl<T> LockFreeQueue<T> {
    /// Creates a queue that holds at most `capacity` items.
    pub fn bounded(capacity: usize) -> Self {
        let (sender, receiver) = bounded(capacity);
        Self {
            sender,
            receiver,
            capacity,
            len: AtomicUsize::new(0),
            total_enqueued: AtomicU64::new(0),
            total_dequeued: AtomicU64::new(0),
            blocked_enqueues: AtomicU64::new(0),
        }
    }

    /// Creates a queue with no capacity limit.
    pub fn unbounded() -> Self {
        let (sender, receiver) = unbounded();
        Self {
            sender,
            receiver,
            capacity: usize::MAX,
            len: AtomicUsize::new(0),
            total_enqueued: AtomicU64::new(0),
            total_dequeued: AtomicU64::new(0),
            blocked_enqueues: AtomicU64::new(0),
        }
    }

    /// Attempts to enqueue without blocking; returns the item back if the
    /// queue is full or disconnected.
    pub fn try_push(&self, item: T) -> Result<(), T> {
        match self.sender.try_send(item) {
            Ok(()) => {
                self.len.fetch_add(1, Ordering::Relaxed);
                self.total_enqueued.fetch_add(1, Ordering::Relaxed);
                Ok(())
            }
            Err(TrySendError::Full(item)) => {
                self.blocked_enqueues.fetch_add(1, Ordering::Relaxed);
                Err(item)
            }
            Err(TrySendError::Disconnected(item)) => Err(item),
        }
    }

    /// Enqueues an item, blocking while a bounded queue is full.
    pub fn push(&self, item: T) -> Result<(), T> {
        match self.sender.send(item) {
            Ok(()) => {
                self.len.fetch_add(1, Ordering::Relaxed);
                self.total_enqueued.fetch_add(1, Ordering::Relaxed);
                Ok(())
            }
            Err(e) => Err(e.0),
        }
    }

    /// Attempts to dequeue without blocking.
    pub fn try_pop(&self) -> Option<T> {
        match self.receiver.try_recv() {
            Ok(item) => {
                self.len.fetch_sub(1, Ordering::Relaxed);
                self.total_dequeued.fetch_add(1, Ordering::Relaxed);
                Some(item)
            }
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => None,
        }
    }

    /// Dequeues an item, blocking until one is available.
    pub fn pop(&self) -> Option<T> {
        match self.receiver.recv() {
            Ok(item) => {
                self.len.fetch_sub(1, Ordering::Relaxed);
                self.total_dequeued.fetch_add(1, Ordering::Relaxed);
                Some(item)
            }
            Err(_) => None,
        }
    }

    /// Dequeues up to `max` items that are immediately available.
    pub fn pop_batch(&self, max: usize) -> Vec<T> {
        let mut batch = Vec::with_capacity(max.min(64));
        for _ in 0..max {
            match self.receiver.try_recv() {
                Ok(item) => {
                    batch.push(item);
                }
                Err(_) => break,
            }
        }
        let count = batch.len();
        if count > 0 {
            self.len.fetch_sub(count, Ordering::Relaxed);
            self.total_dequeued
                .fetch_add(count as u64, Ordering::Relaxed);
        }
        batch
    }

    /// Approximate number of items currently queued.
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Fraction of capacity in use; always 0.0 for unbounded queues.
    pub fn fill_ratio(&self) -> f64 {
        if self.capacity == usize::MAX {
            0.0
        } else {
            self.len() as f64 / self.capacity as f64
        }
    }

    /// Snapshot of the queue's counters.
    pub fn stats(&self) -> QueueStats {
        QueueStats {
            len: self.len(),
            capacity: self.capacity,
            total_enqueued: self.total_enqueued.load(Ordering::Relaxed),
            total_dequeued: self.total_dequeued.load(Ordering::Relaxed),
            blocked_enqueues: self.blocked_enqueues.load(Ordering::Relaxed),
        }
    }
}

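/// Point-in-time snapshot of a [`LockFreeQueue`]'s counters.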
#[derive(Debug, Clone)]
pub struct QueueStats {
    pub len: usize,
    pub capacity: usize,
    pub total_enqueued: u64,
    pub total_dequeued: u64,
    pub blocked_enqueues: u64,
}

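/// Configuration for [`AppendOnlyLog`].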
#[derive(Debug, Clone)]
pub struct AppendLogConfig {
    /// Capacity of each segment buffer, in bytes.
    pub segment_size: usize,
    /// Maximum number of segments retained; the oldest segment is dropped
    /// once this limit is exceeded.
    pub max_segments: usize,
    /// Whether segment buffers are pre-allocated at creation (see `LogSegment::new`).
    pub preallocate: bool,
}

impl Default for AppendLogConfig {
    fn default() -> Self {
        Self {
            segment_size: 64 * 1024 * 1024,
            max_segments: 4,
            preallocate: true,
        }
    }
}

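/// A fixed-capacity, in-memory log segment. Each entry is stored as a 4-byte
/// big-endian length prefix followed by its payload.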
struct LogSegment {
    /// Backing buffer, written through raw pointers by `try_append`.
    data: Vec<u8>,
    /// Number of bytes claimed by writers so far.
    write_pos: AtomicUsize,
    /// Logical log offset corresponding to position 0 of this segment.
    base_offset: u64,
    /// Set once the segment has refused an append and should be rotated.
    sealed: AtomicBool,
}

impl LogSegment {
    fn new(base_offset: u64, capacity: usize, preallocate: bool) -> Self {
        // `try_append` bounds-checks writes against `data.len()` and `read`
        // slices into `data`, so the buffer must be at its full length even
        // when `preallocate` is false; an empty `Vec` would seal the segment
        // on the first append.
        let mut data = if preallocate {
            vec![0u8; capacity]
        } else {
            Vec::with_capacity(capacity)
        };
        data.resize(capacity, 0);

        Self {
            data,
            write_pos: AtomicUsize::new(0),
            base_offset,
            sealed: AtomicBool::new(false),
        }
    }

    /// Attempts to append `data`, returning `(position, logical_offset)` on
    /// success. Seals the segment and returns `None` when the entry does not
    /// fit in the remaining space.
    fn try_append(&self, data: &[u8]) -> Option<(usize, u64)> {
        if self.sealed.load(Ordering::Acquire) {
            return None;
        }

        // 4-byte big-endian length prefix plus the payload itself.
        let needed = 4 + data.len();

        loop {
            let current_pos = self.write_pos.load(Ordering::Acquire);
            let new_pos = current_pos + needed;

            if new_pos > self.data.len() {
                self.sealed.store(true, Ordering::Release);
                return None;
            }

            // Claim the byte range [current_pos, new_pos) by advancing write_pos.
            match self.write_pos.compare_exchange_weak(
                current_pos,
                new_pos,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    let ptr = self.data.as_ptr() as *mut u8;
                    // The CAS above reserved this byte range for the current
                    // writer; copy the length prefix and payload into it.
                    unsafe {
                        let len = data.len() as u32;
                        let len_bytes = len.to_be_bytes();
                        std::ptr::copy_nonoverlapping(len_bytes.as_ptr(), ptr.add(current_pos), 4);

                        std::ptr::copy_nonoverlapping(
                            data.as_ptr(),
                            ptr.add(current_pos + 4),
                            data.len(),
                        );
                    }

                    let offset = self.base_offset + current_pos as u64;
                    return Some((current_pos, offset));
                }
                Err(_) => {
                    // Another writer won the race; retry with the new position.
                    std::hint::spin_loop();
                }
            }
        }
    }

    /// Reads the entry starting at `position`, if it lies within the
    /// committed region of the segment.
    fn read(&self, position: usize) -> Option<&[u8]> {
        let committed = self.write_pos.load(Ordering::Acquire);

        if position + 4 > committed {
            return None;
        }

        let len_bytes: [u8; 4] = self.data[position..position + 4].try_into().ok()?;
        let len = u32::from_be_bytes(len_bytes) as usize;

        if position + 4 + len > committed {
            return None;
        }

        Some(&self.data[position + 4..position + 4 + len])
    }

    fn committed_size(&self) -> usize {
        self.write_pos.load(Ordering::Acquire)
    }

    fn is_sealed(&self) -> bool {
        self.sealed.load(Ordering::Acquire)
    }
}

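/// In-memory append-only log built from fixed-size segments. Appends return a
/// monotonically increasing logical offset; once `max_segments` is exceeded,
/// the oldest segment (and the entries in it) is dropped.
///
/// A minimal usage sketch (doctest marked `ignore` since the import path
/// depends on where this module is mounted):
///
/// ```ignore
/// let log = AppendOnlyLog::new(AppendLogConfig::default());
/// let offset = log.append(b"event");
/// let entries = log.read(offset, 10);
/// assert_eq!(&entries[0][..], b"event");
/// ```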
pub struct AppendOnlyLog {
    config: AppendLogConfig,
    /// Retained segments, oldest first; the last segment is the one currently
    /// accepting appends.
    segments: RwLock<Vec<Arc<LogSegment>>>,
    /// Total payload bytes appended (excluding length prefixes).
    total_bytes: AtomicU64,
    /// Total entries appended.
    total_entries: AtomicU64,
}

impl AppendOnlyLog {
    pub fn new(config: AppendLogConfig) -> Self {
        // Start with a single segment at logical offset 0.
        let initial_segment = Arc::new(LogSegment::new(0, config.segment_size, config.preallocate));

        Self {
            config,
            segments: RwLock::new(vec![initial_segment]),
            total_bytes: AtomicU64::new(0),
            total_entries: AtomicU64::new(0),
        }
    }

    /// Appends one entry and returns its logical offset.
    pub fn append(&self, data: &[u8]) -> u64 {
        // An entry (length prefix + payload) must fit in a single segment;
        // otherwise rotation could never make room and this loop would never
        // terminate.
        assert!(
            data.len() + 4 <= self.config.segment_size,
            "entry does not fit in a single segment"
        );

        loop {
            {
                let segments = self.segments.read();
                if let Some(segment) = segments.last() {
                    if let Some((_, offset)) = segment.try_append(data) {
                        self.total_bytes
                            .fetch_add(data.len() as u64, Ordering::Relaxed);
                        self.total_entries.fetch_add(1, Ordering::Relaxed);
                        return offset;
                    }
                }
            }

            // The current segment is full (or sealed); open a new one and retry.
            self.rotate_segment();
        }
    }

    /// Appends each entry in order and returns their offsets.
    pub fn append_batch(&self, entries: &[&[u8]]) -> Vec<u64> {
        let mut offsets = Vec::with_capacity(entries.len());

        for data in entries {
            offsets.push(self.append(data));
        }

        offsets
    }

    fn rotate_segment(&self) {
        let mut segments = self.segments.write();

        // Another thread may have rotated already while we waited for the
        // write lock; if the tail segment is still open, there is nothing to do.
        if let Some(last) = segments.last() {
            if !last.is_sealed() {
                return;
            }
        }

        // The new segment continues the logical offset space where the sealed
        // one stopped accepting writes.
        let next_base = segments
            .last()
            .map(|s| s.base_offset + s.committed_size() as u64)
            .unwrap_or(0);

        let new_segment = Arc::new(LogSegment::new(
            next_base,
            self.config.segment_size,
            self.config.preallocate,
        ));

        segments.push(new_segment);

        // Enforce the retention limit by dropping the oldest segments.
        while segments.len() > self.config.max_segments {
            segments.remove(0);
        }
    }

    /// Reads up to `max_entries` entries starting at `start_offset`,
    /// continuing across segment boundaries.
    pub fn read(&self, start_offset: u64, max_entries: usize) -> Vec<Bytes> {
        let segments = self.segments.read();
        let mut entries = Vec::with_capacity(max_entries);
        // Offset of the next entry to read; advances as segments are consumed.
        let mut next_offset = start_offset;

        for segment in segments.iter() {
            if entries.len() >= max_entries {
                break;
            }

            // Skip segments that end at or before the offset we still need.
            let segment_end = segment.base_offset + segment.committed_size() as u64;
            if segment_end <= next_offset {
                continue;
            }

            // Start at the requested offset inside this segment, or at its first
            // entry if the requested offset lies before it (e.g. already evicted).
            let mut pos = next_offset.saturating_sub(segment.base_offset) as usize;

            while entries.len() < max_entries {
                match segment.read(pos) {
                    Some(data) => {
                        entries.push(Bytes::copy_from_slice(data));
                        // Advance past the 4-byte length prefix and the payload.
                        pos += 4 + data.len();
                    }
                    None => break,
                }
            }

            next_offset = segment.base_offset + pos as u64;
        }

        entries
    }

    pub fn total_bytes(&self) -> u64 {
        self.total_bytes.load(Ordering::Relaxed)
    }

    pub fn total_entries(&self) -> u64 {
        self.total_entries.load(Ordering::Relaxed)
    }

    /// Logical offset at which the next append will land.
    pub fn end_offset(&self) -> u64 {
        let segments = self.segments.read();
        segments
            .last()
            .map(|s| s.base_offset + s.committed_size() as u64)
            .unwrap_or(0)
    }

    pub fn segment_count(&self) -> usize {
        self.segments.read().len()
    }
}

/// Number of independently locked shards in [`ConcurrentHashMap`].
const SHARD_COUNT: usize = 64;

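/// Hash map sharded across `SHARD_COUNT` cache-aligned `RwLock<HashMap>`s so
/// that operations on different shards do not contend on a single lock.
///
/// A minimal usage sketch (doctest marked `ignore` since the import path
/// depends on where this module is mounted):
///
/// ```ignore
/// let map = ConcurrentHashMap::new();
/// map.insert("a".to_string(), 1);
/// assert_eq!(map.get(&"a".to_string()), Some(1));
/// ```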
pub struct ConcurrentHashMap<K, V> {
    shards: [CacheAligned<RwLock<HashMap<K, V>>>; SHARD_COUNT],
    /// Approximate total entry count across all shards.
    len: AtomicUsize,
}

impl<K: Hash + Eq + Clone, V: Clone> ConcurrentHashMap<K, V> {
    pub fn new() -> Self {
        let shards = std::array::from_fn(|_| CacheAligned(RwLock::new(HashMap::new())));

        Self {
            shards,
            len: AtomicUsize::new(0),
        }
    }

    /// Maps a key to the shard responsible for it.
    fn shard_index(&self, key: &K) -> usize {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        key.hash(&mut hasher);
        hasher.finish() as usize % SHARD_COUNT
    }

    /// Inserts a key-value pair, returning the previous value if any.
    pub fn insert(&self, key: K, value: V) -> Option<V> {
        let shard_idx = self.shard_index(&key);
        let mut shard = self.shards[shard_idx].write();

        let old = shard.insert(key, value);
        if old.is_none() {
            self.len.fetch_add(1, Ordering::Relaxed);
        }
        old
    }

    /// Returns a clone of the value for `key`, if present.
    pub fn get(&self, key: &K) -> Option<V> {
        let shard_idx = self.shard_index(key);
        let shard = self.shards[shard_idx].read();
        shard.get(key).cloned()
    }

    pub fn contains_key(&self, key: &K) -> bool {
        let shard_idx = self.shard_index(key);
        let shard = self.shards[shard_idx].read();
        shard.contains_key(key)
    }

    pub fn remove(&self, key: &K) -> Option<V> {
        let shard_idx = self.shard_index(key);
        let mut shard = self.shards[shard_idx].write();

        let removed = shard.remove(key);
        if removed.is_some() {
            self.len.fetch_sub(1, Ordering::Relaxed);
        }
        removed
    }

    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Applies `f` to the value for `key` under the shard's write lock and
    /// returns a clone of the updated value.
    pub fn update<F>(&self, key: &K, f: F) -> Option<V>
    where
        F: FnOnce(&mut V),
    {
        let shard_idx = self.shard_index(key);
        let mut shard = self.shards[shard_idx].write();

        if let Some(value) = shard.get_mut(key) {
            f(value);
            Some(value.clone())
        } else {
            None
        }
    }

    /// Returns the existing value for `key`, or inserts and returns `default`.
    pub fn get_or_insert(&self, key: K, default: V) -> V {
        let shard_idx = self.shard_index(&key);
        let mut shard = self.shards[shard_idx].write();

        if let Some(value) = shard.get(&key) {
            value.clone()
        } else {
            self.len.fetch_add(1, Ordering::Relaxed);
            shard.insert(key, default.clone());
            default
        }
    }

    /// Returns the existing value for `key`, or inserts the value produced by
    /// `f` and returns it. `f` is only called when the key is absent.
    pub fn get_or_insert_with<F>(&self, key: K, f: F) -> V
    where
        F: FnOnce() -> V,
    {
        let shard_idx = self.shard_index(&key);
        let mut shard = self.shards[shard_idx].write();

        if let Some(value) = shard.get(&key) {
            value.clone()
        } else {
            let value = f();
            self.len.fetch_add(1, Ordering::Relaxed);
            shard.insert(key, value.clone());
            value
        }
    }

    /// Clones every entry into a `Vec`, locking one shard at a time.
    pub fn snapshot(&self) -> Vec<(K, V)> {
        let mut entries = Vec::new();

        for shard in &self.shards {
            let shard = shard.read();
            for (k, v) in shard.iter() {
                entries.push((k.clone(), v.clone()));
            }
        }

        entries
    }

    pub fn clear(&self) {
        for shard in &self.shards {
            shard.write().clear();
        }
        self.len.store(0, Ordering::Relaxed);
    }
}

impl<K: Hash + Eq + Clone, V: Clone> Default for ConcurrentHashMap<K, V> {
    fn default() -> Self {
        Self::new()
    }
}

/// Maximum number of levels in the skip list.
const MAX_HEIGHT: usize = 32;

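/// A single skip-list node with `MAX_HEIGHT` forward pointers; only the first
/// `height` of them are linked into the list.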
struct SkipNode<K, V> {
    key: K,
    value: UnsafeCell<V>,
    forward: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT],
    #[allow(dead_code)]
    height: usize,
}

impl<K, V> SkipNode<K, V> {
    /// Heap-allocates a node and returns it as a raw pointer; ownership is
    /// reclaimed when the owning list is dropped.
    fn new(key: K, value: V, height: usize) -> *mut Self {
        let forward = std::array::from_fn(|_| AtomicPtr::new(std::ptr::null_mut()));

        let node = Box::new(Self {
            key,
            value: UnsafeCell::new(value),
            forward,
            height,
        });

        Box::into_raw(node)
    }
}

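/// Ordered map backed by a skip list whose nodes are linked through atomic
/// pointers. Nodes are never removed individually; all memory is reclaimed
/// when the list is dropped.
///
/// A minimal usage sketch (doctest marked `ignore` since the import path
/// depends on where this module is mounted):
///
/// ```ignore
/// let list = ConcurrentSkipList::<u64, String>::new();
/// list.insert(10, "ten".to_string());
/// assert_eq!(list.get(&10), Some("ten".to_string()));
/// ```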
pub struct ConcurrentSkipList<K: Ord + Clone, V: Clone> {
    /// Sentinel head node; its key and value are never read.
    head: *mut SkipNode<K, V>,
    /// Highest level currently in use.
    max_level: AtomicUsize,
    len: AtomicUsize,
    /// State for the xorshift generator used to pick node heights.
    rand_state: AtomicU64,
}

unsafe impl<K: Ord + Clone + Send, V: Clone + Send> Send for ConcurrentSkipList<K, V> {}
unsafe impl<K: Ord + Clone + Sync, V: Clone + Sync> Sync for ConcurrentSkipList<K, V> {}

impl<K: Ord + Clone + Default, V: Clone + Default> ConcurrentSkipList<K, V> {
    pub fn new() -> Self {
        // The head is a sentinel at full height; its key and value are placeholders.
        let head = SkipNode::new(K::default(), V::default(), MAX_HEIGHT);

        Self {
            head,
            max_level: AtomicUsize::new(1),
            len: AtomicUsize::new(0),
            rand_state: AtomicU64::new(0x12345678),
        }
    }

    /// Picks a node height with a geometric distribution (roughly halving the
    /// probability per extra level), using a xorshift generator.
    fn random_level(&self) -> usize {
        let mut level = 1;
        let mut x = self.rand_state.load(Ordering::Relaxed);

        loop {
            x ^= x << 13;
            x ^= x >> 7;
            x ^= x << 17;
            self.rand_state.store(x, Ordering::Relaxed);

            if x & 1 == 0 || level >= MAX_HEIGHT {
                break;
            }
            level += 1;
        }

        level
    }

    pub fn insert(&self, key: K, value: V) {
        let height = self.random_level();
        let new_node = SkipNode::new(key.clone(), value, height);

        // Raise the list's max level if this node is taller than any seen so far.
        let mut current_max = self.max_level.load(Ordering::Relaxed);
        while height > current_max {
            match self.max_level.compare_exchange_weak(
                current_max,
                height,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(m) => current_max = m,
            }
        }

        // Find, at every level, the last node whose key precedes `key`.
        let mut update = [std::ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
        let mut current = self.head;

        #[allow(clippy::needless_range_loop)]
        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            // SAFETY: nodes are only allocated by `SkipNode::new` and are not
            // freed until the list is dropped, so these pointers stay valid.
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() || (*next).key >= key {
                        break;
                    }
                    current = next;
                }
                update[i] = current;
            }
        }

        // Splice the new node in between each predecessor and its successor.
        #[allow(clippy::needless_range_loop)]
        for i in 0..height {
            // SAFETY: as above; predecessors recorded in `update` remain valid.
            unsafe {
                let pred = if update[i].is_null() {
                    self.head
                } else {
                    update[i]
                };
                let next = (*pred).forward[i].load(Ordering::Acquire);
                (*new_node).forward[i].store(next, Ordering::Release);
                (*pred).forward[i].store(new_node, Ordering::Release);
            }
        }

        self.len.fetch_add(1, Ordering::Relaxed);
    }

    pub fn get(&self, key: &K) -> Option<V> {
        let mut current = self.head;

        // Descend from the highest level, moving right while keys are smaller.
        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            // SAFETY: nodes are never freed before the list is dropped.
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() {
                        break;
                    }
                    if (*next).key == *key {
                        return Some((*(*next).value.get()).clone());
                    }
                    if (*next).key > *key {
                        break;
                    }
                    current = next;
                }
            }
        }

        None
    }

    /// Returns the entry with the greatest key less than or equal to `key`.
    pub fn floor(&self, key: &K) -> Option<(K, V)> {
        let mut current = self.head;
        let mut result: Option<*mut SkipNode<K, V>> = None;

        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            // SAFETY: nodes are never freed before the list is dropped.
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() {
                        break;
                    }
                    if (*next).key <= *key {
                        result = Some(next);
                        current = next;
                    } else {
                        break;
                    }
                }
            }
        }

        result.map(|node| unsafe { ((*node).key.clone(), (*(*node).value.get()).clone()) })
    }

    /// Returns the entry with the smallest key greater than or equal to `key`.
    pub fn ceiling(&self, key: &K) -> Option<(K, V)> {
        let mut current = self.head;

        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            // SAFETY: nodes are never freed before the list is dropped.
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() || (*next).key >= *key {
                        break;
                    }
                    current = next;
                }
            }
        }

        // `current` is now the last node with a key below `key`, so its
        // level-0 successor (if any) is the ceiling.
        unsafe {
            let next = (*current).forward[0].load(Ordering::Acquire);
            if !next.is_null() {
                Some(((*next).key.clone(), (*(*next).value.get()).clone()))
            } else {
                None
            }
        }
    }

    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns up to `limit` entries with keys in the inclusive range
    /// `[start, end]`.
    pub fn range(&self, start: &K, end: &K, limit: usize) -> Vec<(K, V)> {
        let mut entries = Vec::with_capacity(limit.min(1000));
        let mut current = self.head;

        // Descend to the last node whose key precedes `start`.
        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            // SAFETY: nodes are never freed before the list is dropped.
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() || (*next).key >= *start {
                        break;
                    }
                    current = next;
                }
            }
        }

        // Walk the level-0 list, collecting entries until `end` or `limit`.
        // SAFETY: nodes are never freed before the list is dropped.
        unsafe {
            let mut node = (*current).forward[0].load(Ordering::Acquire);
            while !node.is_null() && entries.len() < limit {
                if (*node).key > *end {
                    break;
                }
                entries.push(((*node).key.clone(), (*(*node).value.get()).clone()));
                node = (*node).forward[0].load(Ordering::Acquire);
            }
        }

        entries
    }
}

impl<K: Ord + Clone + Default, V: Clone + Default> Default for ConcurrentSkipList<K, V> {
    fn default() -> Self {
        Self::new()
    }
}

impl<K: Ord + Clone, V: Clone> Drop for ConcurrentSkipList<K, V> {
    fn drop(&mut self) {
        // Walk the level-0 chain (which links every node, including the head
        // sentinel) and free each node exactly once.
        let mut current = self.head;
        unsafe {
            while !current.is_null() {
                let next = (*current).forward[0].load(Ordering::Relaxed);
                drop(Box::from_raw(current));
                current = next;
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_lock_free_queue() {
        let queue = LockFreeQueue::<i32>::bounded(100);

        assert!(queue.is_empty());

        queue.push(1).unwrap();
        queue.push(2).unwrap();
        queue.push(3).unwrap();

        assert_eq!(queue.len(), 3);
        assert_eq!(queue.pop(), Some(1));
        assert_eq!(queue.pop(), Some(2));
        assert_eq!(queue.pop(), Some(3));
        assert!(queue.is_empty());
    }

    #[test]
    fn test_lock_free_queue_concurrent() {
        let queue = Arc::new(LockFreeQueue::<i32>::bounded(1000));
        let mut handles = vec![];

        for i in 0..4 {
            let q = queue.clone();
            handles.push(thread::spawn(move || {
                for j in 0..250 {
                    q.push(i * 250 + j).unwrap();
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(queue.len(), 1000);

        let batch = queue.pop_batch(1000);
        assert_eq!(batch.len(), 1000);
    }

    #[test]
    fn test_append_only_log() {
        let config = AppendLogConfig {
            segment_size: 1024,
            max_segments: 4,
            preallocate: true,
        };
        let log = AppendOnlyLog::new(config);

        let offset1 = log.append(b"hello");
        let offset2 = log.append(b"world");

        assert!(offset2 > offset1);

        let entries = log.read(offset1, 10);
        assert_eq!(entries.len(), 2);
        assert_eq!(&entries[0][..], b"hello");
        assert_eq!(&entries[1][..], b"world");
    }

    #[test]
    fn test_concurrent_hash_map() {
        let map = Arc::new(ConcurrentHashMap::<String, i32>::new());
        let mut handles = vec![];

        for i in 0..4 {
            let m = map.clone();
            handles.push(thread::spawn(move || {
                for j in 0..250 {
                    m.insert(format!("key-{}-{}", i, j), i * 250 + j);
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(map.len(), 1000);

        assert_eq!(map.get(&"key-0-0".to_string()), Some(0));
        assert_eq!(map.get(&"key-3-249".to_string()), Some(999));
    }

    #[test]
    fn test_skip_list() {
        let list = ConcurrentSkipList::<u64, String>::new();

        list.insert(10, "ten".to_string());
        list.insert(20, "twenty".to_string());
        list.insert(5, "five".to_string());
        list.insert(15, "fifteen".to_string());

        assert_eq!(list.len(), 4);
        assert_eq!(list.get(&10), Some("ten".to_string()));
        assert_eq!(list.get(&99), None);

        assert_eq!(list.floor(&12), Some((10, "ten".to_string())));
        assert_eq!(list.floor(&15), Some((15, "fifteen".to_string())));

        assert_eq!(list.ceiling(&12), Some((15, "fifteen".to_string())));
        assert_eq!(list.ceiling(&1), Some((5, "five".to_string())));
    }

    #[test]
    fn test_skip_list_range() {
        let list = ConcurrentSkipList::<u64, String>::new();

        for i in 0..100 {
            list.insert(i * 10, format!("value-{}", i));
        }

        let range = list.range(&150, &350, 100);
        assert!(!range.is_empty());

        for (k, _) in &range {
            assert!(*k >= 150 && *k <= 350);
        }
    }
}