use bytes::Bytes;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender, TryRecvError, TrySendError};
use parking_lot::RwLock;
use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

/// Wrapper that aligns its contents to a 64-byte boundary so adjacent values
/// do not share a cache line (avoids false sharing between cores).
#[repr(C, align(64))]
pub struct CacheAligned<T>(pub T);

impl<T> std::ops::Deref for CacheAligned<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.0
    }
}

impl<T> std::ops::DerefMut for CacheAligned<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.0
    }
}
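
// A minimal usage sketch: two frequently-updated atomics that would otherwise
// sit on the same cache line are padded apart with `CacheAligned`. The struct
// name `PaddedCounters` is illustrative and not used elsewhere in this module.
#[allow(dead_code)]
struct PaddedCounters {
    hits: CacheAligned<AtomicU64>,
    misses: CacheAligned<AtomicU64>,
}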

/// MPMC queue built on `crossbeam_channel` with atomic counters for
/// observability (length, enqueue/dequeue totals, rejected pushes).
pub struct LockFreeQueue<T> {
    sender: Sender<T>,
    receiver: Receiver<T>,
    capacity: usize,
    len: AtomicUsize,
    total_enqueued: AtomicU64,
    total_dequeued: AtomicU64,
    blocked_enqueues: AtomicU64,
}

impl<T> LockFreeQueue<T> {
    pub fn bounded(capacity: usize) -> Self {
        let (sender, receiver) = bounded(capacity);
        Self {
            sender,
            receiver,
            capacity,
            len: AtomicUsize::new(0),
            total_enqueued: AtomicU64::new(0),
            total_dequeued: AtomicU64::new(0),
            blocked_enqueues: AtomicU64::new(0),
        }
    }

    pub fn unbounded() -> Self {
        let (sender, receiver) = unbounded();
        Self {
            sender,
            receiver,
            capacity: usize::MAX,
            len: AtomicUsize::new(0),
            total_enqueued: AtomicU64::new(0),
            total_dequeued: AtomicU64::new(0),
            blocked_enqueues: AtomicU64::new(0),
        }
    }

    /// Non-blocking push; returns the item back if the queue is full or closed.
    pub fn try_push(&self, item: T) -> Result<(), T> {
        match self.sender.try_send(item) {
            Ok(()) => {
                self.len.fetch_add(1, Ordering::Relaxed);
                self.total_enqueued.fetch_add(1, Ordering::Relaxed);
                Ok(())
            }
            Err(TrySendError::Full(item)) => {
                self.blocked_enqueues.fetch_add(1, Ordering::Relaxed);
                Err(item)
            }
            Err(TrySendError::Disconnected(item)) => Err(item),
        }
    }

    /// Blocking push; waits while a bounded queue is full.
    pub fn push(&self, item: T) -> Result<(), T> {
        match self.sender.send(item) {
            Ok(()) => {
                self.len.fetch_add(1, Ordering::Relaxed);
                self.total_enqueued.fetch_add(1, Ordering::Relaxed);
                Ok(())
            }
            Err(e) => Err(e.0),
        }
    }

    /// Non-blocking pop; returns `None` when the queue is empty.
    pub fn try_pop(&self) -> Option<T> {
        match self.receiver.try_recv() {
            Ok(item) => {
                self.len.fetch_sub(1, Ordering::Relaxed);
                self.total_dequeued.fetch_add(1, Ordering::Relaxed);
                Some(item)
            }
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => None,
        }
    }

    /// Blocking pop. Note: the queue holds its own `Sender`, so the channel is
    /// never disconnected and this call blocks until an item arrives.
    pub fn pop(&self) -> Option<T> {
        match self.receiver.recv() {
            Ok(item) => {
                self.len.fetch_sub(1, Ordering::Relaxed);
                self.total_dequeued.fetch_add(1, Ordering::Relaxed);
                Some(item)
            }
            Err(_) => None,
        }
    }

    /// Drains up to `max` items without blocking.
    pub fn pop_batch(&self, max: usize) -> Vec<T> {
        let mut batch = Vec::with_capacity(max.min(64));
        for _ in 0..max {
            match self.receiver.try_recv() {
                Ok(item) => {
                    batch.push(item);
                }
                Err(_) => break,
            }
        }
        let count = batch.len();
        if count > 0 {
            self.len.fetch_sub(count, Ordering::Relaxed);
            self.total_dequeued
                .fetch_add(count as u64, Ordering::Relaxed);
        }
        batch
    }

    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Fraction of capacity in use; always 0.0 for unbounded queues.
    pub fn fill_ratio(&self) -> f64 {
        if self.capacity == usize::MAX {
            0.0
        } else {
            self.len() as f64 / self.capacity as f64
        }
    }

    pub fn stats(&self) -> QueueStats {
        QueueStats {
            len: self.len(),
            capacity: self.capacity,
            total_enqueued: self.total_enqueued.load(Ordering::Relaxed),
            total_dequeued: self.total_dequeued.load(Ordering::Relaxed),
            blocked_enqueues: self.blocked_enqueues.load(Ordering::Relaxed),
        }
    }
}

/// Point-in-time snapshot of the queue's counters.
#[derive(Debug, Clone)]
pub struct QueueStats {
    pub len: usize,
    pub capacity: usize,
    pub total_enqueued: u64,
    pub total_dequeued: u64,
    pub blocked_enqueues: u64,
}
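
// A minimal sketch (not part of the queue API): bounded retry around
// `try_push` so a producer can apply backpressure instead of blocking.
// `enqueue_with_backpressure` and its retry policy are illustrative
// assumptions, not something the queue itself prescribes.
#[allow(dead_code)]
fn enqueue_with_backpressure<T>(
    queue: &LockFreeQueue<T>,
    mut item: T,
    max_retries: usize,
) -> Result<(), T> {
    for _ in 0..max_retries {
        match queue.try_push(item) {
            Ok(()) => return Ok(()),
            // The rejected item is handed back; yield and retry.
            Err(rejected) => {
                item = rejected;
                std::thread::yield_now();
            }
        }
    }
    Err(item)
}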

/// Configuration for the in-memory append-only log.
#[derive(Debug, Clone)]
pub struct AppendLogConfig {
    /// Size of each segment buffer in bytes.
    pub segment_size: usize,
    /// Maximum number of segments retained; older segments are dropped.
    pub max_segments: usize,
    /// Whether to zero-fill segment buffers up front.
    pub preallocate: bool,
}

impl Default for AppendLogConfig {
    fn default() -> Self {
        Self {
            segment_size: 64 * 1024 * 1024,
            max_segments: 4,
            preallocate: true,
        }
    }
}

/// Fixed-capacity buffer that entries are appended to; each entry is framed as
/// a 4-byte big-endian length prefix followed by the payload.
struct LogSegment {
    data: UnsafeCell<Vec<u8>>,
    write_pos: AtomicUsize,
    capacity: usize,
    base_offset: u64,
    sealed: AtomicBool,
}

unsafe impl Send for LogSegment {}
unsafe impl Sync for LogSegment {}

impl LogSegment {
    fn new(base_offset: u64, capacity: usize, preallocate: bool) -> Self {
        // Readers index into the buffer directly, so it must be zero-filled to
        // `capacity` in every case; `preallocate` is kept as a configuration
        // hint only.
        let _ = preallocate;
        let data = vec![0u8; capacity];

        Self {
            capacity,
            data: UnsafeCell::new(data),
            write_pos: AtomicUsize::new(0),
            base_offset,
            sealed: AtomicBool::new(false),
        }
    }

    /// Reserves space with a CAS on `write_pos`, then writes the length prefix
    /// and payload into the reserved region. Returns `(position, offset)` on
    /// success, or `None` if the segment is sealed or has no room.
    fn try_append(&self, data: &[u8]) -> Option<(usize, u64)> {
        if self.sealed.load(Ordering::Acquire) {
            return None;
        }

        let needed = 4 + data.len();

        loop {
            let current_pos = self.write_pos.load(Ordering::Acquire);
            let new_pos = current_pos + needed;

            if new_pos > self.capacity {
                self.sealed.store(true, Ordering::Release);
                return None;
            }

            match self.write_pos.compare_exchange_weak(
                current_pos,
                new_pos,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    unsafe {
                        let buf = &mut *self.data.get();
                        let ptr = buf.as_mut_ptr();

                        let len = data.len() as u32;
                        let len_bytes = len.to_be_bytes();
                        std::ptr::copy_nonoverlapping(len_bytes.as_ptr(), ptr.add(current_pos), 4);

                        std::ptr::copy_nonoverlapping(
                            data.as_ptr(),
                            ptr.add(current_pos + 4),
                            data.len(),
                        );
                    }

                    let offset = self.base_offset + current_pos as u64;
                    return Some((current_pos, offset));
                }
                Err(_) => {
                    std::hint::spin_loop();
                }
            }
        }
    }

    /// Reads the entry starting at `position`, if it is fully committed.
    fn read(&self, position: usize) -> Option<&[u8]> {
        let committed = self.write_pos.load(Ordering::Acquire);

        if position + 4 > committed {
            return None;
        }

        let buf = unsafe { &*self.data.get() };

        let len_bytes: [u8; 4] = buf[position..position + 4].try_into().ok()?;
        let len = u32::from_be_bytes(len_bytes) as usize;

        if position + 4 + len > committed {
            return None;
        }

        Some(&buf[position + 4..position + 4 + len])
    }

    fn committed_size(&self) -> usize {
        self.write_pos.load(Ordering::Acquire)
    }

    fn is_sealed(&self) -> bool {
        self.sealed.load(Ordering::Acquire)
    }
}
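
// Illustrative sketch of the framing `LogSegment` uses: a 4-byte big-endian
// length prefix followed by the payload. `decode_entry` is a hypothetical
// helper for walking such a buffer; it is not called elsewhere in this module.
#[allow(dead_code)]
fn decode_entry(buf: &[u8], pos: usize) -> Option<(&[u8], usize)> {
    // Length prefix.
    let len_bytes: [u8; 4] = buf.get(pos..pos + 4)?.try_into().ok()?;
    let len = u32::from_be_bytes(len_bytes) as usize;
    // Payload plus the position where the next entry would start.
    let payload = buf.get(pos + 4..pos + 4 + len)?;
    Some((payload, pos + 4 + len))
}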

/// In-memory append-only log made of fixed-size segments. The active segment
/// is rotated once it fills, and only the most recent `max_segments` segments
/// are retained.
pub struct AppendOnlyLog {
    config: AppendLogConfig,
    segments: RwLock<Vec<Arc<LogSegment>>>,
    total_bytes: AtomicU64,
    total_entries: AtomicU64,
}

impl AppendOnlyLog {
    pub fn new(config: AppendLogConfig) -> Self {
        let initial_segment = Arc::new(LogSegment::new(0, config.segment_size, config.preallocate));

        Self {
            config,
            segments: RwLock::new(vec![initial_segment]),
            total_bytes: AtomicU64::new(0),
            total_entries: AtomicU64::new(0),
        }
    }

    /// Appends one entry and returns its offset. Entries must fit in a single
    /// segment (`data.len() + 4 <= segment_size`); a larger entry can never be
    /// placed and would make this loop rotate segments indefinitely.
    pub fn append(&self, data: &[u8]) -> u64 {
        loop {
            {
                let segments = self.segments.read();
                if let Some(segment) = segments.last() {
                    if let Some((_, offset)) = segment.try_append(data) {
                        self.total_bytes
                            .fetch_add(data.len() as u64, Ordering::Relaxed);
                        self.total_entries.fetch_add(1, Ordering::Relaxed);
                        return offset;
                    }
                }
            }

            // The active segment is full (and now sealed); open a new one.
            self.rotate_segment();
        }
    }

    pub fn append_batch(&self, entries: &[&[u8]]) -> Vec<u64> {
        let mut offsets = Vec::with_capacity(entries.len());

        for data in entries {
            offsets.push(self.append(data));
        }

        offsets
    }

    fn rotate_segment(&self) {
        let mut segments = self.segments.write();

        // Another thread may have rotated already; only rotate past a sealed
        // segment.
        if let Some(last) = segments.last() {
            if !last.is_sealed() {
                return;
            }
        }

        let next_base = segments
            .last()
            .map(|s| s.base_offset + s.committed_size() as u64)
            .unwrap_or(0);

        let new_segment = Arc::new(LogSegment::new(
            next_base,
            self.config.segment_size,
            self.config.preallocate,
        ));

        segments.push(new_segment);

        while segments.len() > self.config.max_segments {
            segments.remove(0);
        }
    }

    /// Reads up to `max_entries` entries starting at `start_offset`. Reads do
    /// not cross a segment boundary: only entries from the segment containing
    /// `start_offset` are returned.
    pub fn read(&self, start_offset: u64, max_entries: usize) -> Vec<Bytes> {
        let segments = self.segments.read();
        let mut entries = Vec::with_capacity(max_entries);

        for segment in segments.iter() {
            if segment.base_offset > start_offset {
                continue;
            }

            let relative_pos = (start_offset - segment.base_offset) as usize;
            let mut pos = relative_pos;

            while entries.len() < max_entries {
                match segment.read(pos) {
                    Some(data) => {
                        entries.push(Bytes::copy_from_slice(data));
                        // Advance past the length prefix and the payload.
                        pos += 4 + data.len();
                    }
                    None => break,
                }
            }
        }

        entries
    }

    pub fn total_bytes(&self) -> u64 {
        self.total_bytes.load(Ordering::Relaxed)
    }

    pub fn total_entries(&self) -> u64 {
        self.total_entries.load(Ordering::Relaxed)
    }

    /// Offset one past the last committed byte.
    pub fn end_offset(&self) -> u64 {
        let segments = self.segments.read();
        segments
            .last()
            .map(|s| s.base_offset + s.committed_size() as u64)
            .unwrap_or(0)
    }

    pub fn segment_count(&self) -> usize {
        self.segments.read().len()
    }
}
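
// Illustrative sketch of consuming the log with a cursor. `tail_log` is a
// hypothetical helper: offsets are byte positions, so after each entry the
// cursor advances by the 4-byte length prefix plus the payload length, which
// keeps it aligned with the offsets `append` returns.
#[allow(dead_code)]
fn tail_log(log: &AppendOnlyLog, mut cursor: u64, max_batch: usize) -> (Vec<Bytes>, u64) {
    let entries = log.read(cursor, max_batch);
    for entry in &entries {
        cursor += 4 + entry.len() as u64;
    }
    (entries, cursor)
}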

/// Number of independently locked shards; keys are distributed across shards
/// by hash to reduce write contention.
const SHARD_COUNT: usize = 64;

/// Hash map sharded across `SHARD_COUNT` independently locked `HashMap`s; each
/// shard is cache-line aligned to limit false sharing between the locks.
pub struct ConcurrentHashMap<K, V> {
    shards: [CacheAligned<RwLock<HashMap<K, V>>>; SHARD_COUNT],
    len: AtomicUsize,
}

impl<K: Hash + Eq + Clone, V: Clone> ConcurrentHashMap<K, V> {
    pub fn new() -> Self {
        let shards = std::array::from_fn(|_| CacheAligned(RwLock::new(HashMap::new())));

        Self {
            shards,
            len: AtomicUsize::new(0),
        }
    }

    fn shard_index(&self, key: &K) -> usize {
        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        key.hash(&mut hasher);
        hasher.finish() as usize % SHARD_COUNT
    }

    pub fn insert(&self, key: K, value: V) -> Option<V> {
        let shard_idx = self.shard_index(&key);
        let mut shard = self.shards[shard_idx].write();

        let old = shard.insert(key, value);
        if old.is_none() {
            self.len.fetch_add(1, Ordering::Relaxed);
        }
        old
    }

    pub fn get(&self, key: &K) -> Option<V> {
        let shard_idx = self.shard_index(key);
        let shard = self.shards[shard_idx].read();
        shard.get(key).cloned()
    }

    pub fn contains_key(&self, key: &K) -> bool {
        let shard_idx = self.shard_index(key);
        let shard = self.shards[shard_idx].read();
        shard.contains_key(key)
    }

    pub fn remove(&self, key: &K) -> Option<V> {
        let shard_idx = self.shard_index(key);
        let mut shard = self.shards[shard_idx].write();

        let removed = shard.remove(key);
        if removed.is_some() {
            self.len.fetch_sub(1, Ordering::Relaxed);
        }
        removed
    }

    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Mutates an existing value in place under the shard's write lock and
    /// returns a clone of the updated value, or `None` if the key is absent.
    pub fn update<F>(&self, key: &K, f: F) -> Option<V>
    where
        F: FnOnce(&mut V),
    {
        let shard_idx = self.shard_index(key);
        let mut shard = self.shards[shard_idx].write();

        if let Some(value) = shard.get_mut(key) {
            f(value);
            Some(value.clone())
        } else {
            None
        }
    }

    pub fn get_or_insert(&self, key: K, default: V) -> V {
        let shard_idx = self.shard_index(&key);
        let mut shard = self.shards[shard_idx].write();

        if let Some(value) = shard.get(&key) {
            value.clone()
        } else {
            self.len.fetch_add(1, Ordering::Relaxed);
            shard.insert(key, default.clone());
            default
        }
    }

    pub fn get_or_insert_with<F>(&self, key: K, f: F) -> V
    where
        F: FnOnce() -> V,
    {
        let shard_idx = self.shard_index(&key);
        let mut shard = self.shards[shard_idx].write();

        if let Some(value) = shard.get(&key) {
            value.clone()
        } else {
            let value = f();
            self.len.fetch_add(1, Ordering::Relaxed);
            shard.insert(key, value.clone());
            value
        }
    }

    /// Clones every entry, shard by shard; the result is not a consistent
    /// point-in-time view across shards.
    pub fn snapshot(&self) -> Vec<(K, V)> {
        let mut entries = Vec::new();

        for shard in &self.shards {
            let shard = shard.read();
            for (k, v) in shard.iter() {
                entries.push((k.clone(), v.clone()));
            }
        }

        entries
    }

    pub fn clear(&self) {
        for shard in &self.shards {
            shard.write().clear();
        }
        self.len.store(0, Ordering::Relaxed);
    }
}

impl<K: Hash + Eq + Clone, V: Clone> Default for ConcurrentHashMap<K, V> {
    fn default() -> Self {
        Self::new()
    }
}
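
// A minimal usage sketch: maintaining per-key counters. The `bump` helper is
// hypothetical; note that the two calls below take the shard's write lock
// twice, so the ensure-then-increment pair is not a single atomic step.
#[allow(dead_code)]
fn bump(counters: &ConcurrentHashMap<String, u64>, key: &str) -> u64 {
    // Make sure the slot exists, then increment it in place.
    counters.get_or_insert(key.to_string(), 0);
    counters.update(&key.to_string(), |v| *v += 1).unwrap_or(0)
}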

/// Maximum number of forward levels a skip-list node may have.
const MAX_HEIGHT: usize = 32;

struct SkipNode<K, V> {
    key: K,
    value: UnsafeCell<V>,
    forward: [AtomicPtr<SkipNode<K, V>>; MAX_HEIGHT],
    #[allow(dead_code)]
    height: usize,
}

impl<K, V> SkipNode<K, V> {
    fn new(key: K, value: V, height: usize) -> *mut Self {
        let forward = std::array::from_fn(|_| AtomicPtr::new(std::ptr::null_mut()));

        let node = Box::new(Self {
            key,
            value: UnsafeCell::new(value),
            forward,
            height,
        });

        Box::into_raw(node)
    }
}

/// Skip list keyed by `K` with atomic forward pointers. The head node is a
/// sentinel holding `K::default()`/`V::default()` and is never returned by
/// lookups.
pub struct ConcurrentSkipList<K: Ord + Clone, V: Clone> {
    head: *mut SkipNode<K, V>,
    max_level: AtomicUsize,
    len: AtomicUsize,
    rand_state: AtomicU64,
}

unsafe impl<K: Ord + Clone + Send, V: Clone + Send> Send for ConcurrentSkipList<K, V> {}
unsafe impl<K: Ord + Clone + Sync, V: Clone + Sync> Sync for ConcurrentSkipList<K, V> {}

impl<K: Ord + Clone + Default, V: Clone + Default> ConcurrentSkipList<K, V> {
    pub fn new() -> Self {
        let head = SkipNode::new(K::default(), V::default(), MAX_HEIGHT);

        let seed = Self::generate_seed();

        Self {
            head,
            max_level: AtomicUsize::new(1),
            len: AtomicUsize::new(0),
            rand_state: AtomicU64::new(seed),
        }
    }

    /// Derives a non-zero seed for the xorshift generator from the process id,
    /// the current time, and a stack address.
    fn generate_seed() -> u64 {
        use std::collections::hash_map::RandomState;
        use std::hash::{BuildHasher, Hasher};

        let state = RandomState::new();
        let mut hasher = state.build_hasher();

        hasher.write_u64(std::process::id().into());

        if let Ok(time) = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
            hasher.write_u64(time.as_nanos() as u64);
        }

        hasher.write_usize(&hasher as *const _ as usize);

        hasher.finish().max(1)
    }

    /// Picks a node height with a geometric distribution (each extra level has
    /// probability 1/2) using an atomic xorshift state.
    fn random_level(&self) -> usize {
        let mut level = 1;

        let x = self
            .rand_state
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |mut x| {
                x ^= x << 13;
                x ^= x >> 7;
                x ^= x << 17;
                Some(x)
            })
            .unwrap_or(1);

        let mut bits = x;
        while bits & 1 == 0 && level < MAX_HEIGHT {
            level += 1;
            bits >>= 1;
        }

        level
    }

    pub fn insert(&self, key: K, value: V) {
        let height = self.random_level();
        let new_node = SkipNode::new(key.clone(), value, height);

        // Raise the list's max level if this node is taller than any so far.
        let mut current_max = self.max_level.load(Ordering::Relaxed);
        while height > current_max {
            match self.max_level.compare_exchange_weak(
                current_max,
                height,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(m) => current_max = m,
            }
        }

        // Find the predecessor of `key` at every level.
        let mut update = [std::ptr::null_mut::<SkipNode<K, V>>(); MAX_HEIGHT];
        let mut current = self.head;

        #[allow(clippy::needless_range_loop)]
        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() || (*next).key >= key {
                        break;
                    }
                    current = next;
                }
                update[i] = current;
            }
        }

        // Splice the new node in, level by level.
        #[allow(clippy::needless_range_loop)]
        for i in 0..height {
            unsafe {
                let pred = if update[i].is_null() {
                    self.head
                } else {
                    update[i]
                };
                let next = (*pred).forward[i].load(Ordering::Acquire);
                (*new_node).forward[i].store(next, Ordering::Release);
                (*pred).forward[i].store(new_node, Ordering::Release);
            }
        }

        self.len.fetch_add(1, Ordering::Relaxed);
    }

    pub fn get(&self, key: &K) -> Option<V> {
        let mut current = self.head;

        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() {
                        break;
                    }
                    if (*next).key == *key {
                        return Some((*(*next).value.get()).clone());
                    }
                    if (*next).key > *key {
                        break;
                    }
                    current = next;
                }
            }
        }

        None
    }

    /// Returns the entry with the greatest key less than or equal to `key`.
    pub fn floor(&self, key: &K) -> Option<(K, V)> {
        let mut current = self.head;
        let mut result: Option<*mut SkipNode<K, V>> = None;

        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() {
                        break;
                    }
                    if (*next).key <= *key {
                        result = Some(next);
                        current = next;
                    } else {
                        break;
                    }
                }
            }
        }

        result.map(|node| unsafe { ((*node).key.clone(), (*(*node).value.get()).clone()) })
    }

    /// Returns the entry with the smallest key greater than or equal to `key`.
    pub fn ceiling(&self, key: &K) -> Option<(K, V)> {
        let mut current = self.head;

        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() || (*next).key >= *key {
                        break;
                    }
                    current = next;
                }
            }
        }

        unsafe {
            let next = (*current).forward[0].load(Ordering::Acquire);
            if !next.is_null() {
                Some(((*next).key.clone(), (*(*next).value.get()).clone()))
            } else {
                None
            }
        }
    }

    pub fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns up to `limit` entries with keys in `[start, end]`, in order.
    pub fn range(&self, start: &K, end: &K, limit: usize) -> Vec<(K, V)> {
        let mut entries = Vec::with_capacity(limit.min(1000));
        let mut current = self.head;

        // Descend to the node just before `start`.
        for i in (0..self.max_level.load(Ordering::Acquire)).rev() {
            unsafe {
                loop {
                    let next = (*current).forward[i].load(Ordering::Acquire);
                    if next.is_null() || (*next).key >= *start {
                        break;
                    }
                    current = next;
                }
            }
        }

        // Walk level 0 until `end` or `limit` is reached.
        unsafe {
            let mut node = (*current).forward[0].load(Ordering::Acquire);
            while !node.is_null() && entries.len() < limit {
                if (*node).key > *end {
                    break;
                }
                entries.push(((*node).key.clone(), (*(*node).value.get()).clone()));
                node = (*node).forward[0].load(Ordering::Acquire);
            }
        }

        entries
    }
}

impl<K: Ord + Clone + Default, V: Clone + Default> Default for ConcurrentSkipList<K, V> {
    fn default() -> Self {
        Self::new()
    }
}

impl<K: Ord + Clone, V: Clone> Drop for ConcurrentSkipList<K, V> {
    fn drop(&mut self) {
        // Every node (including the head sentinel) is linked at level 0, so
        // walking the level-0 chain frees the whole list.
        let mut current = self.head;
        unsafe {
            while !current.is_null() {
                let next = (*current).forward[0].load(Ordering::Relaxed);
                drop(Box::from_raw(current));
                current = next;
            }
        }
    }
}
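
// A minimal usage sketch: treating the skip list as an ordered index from a
// u64 timestamp to a log offset. `latest_at_or_before` is a hypothetical
// helper built on `floor`; the key scheme is an assumption for the example.
#[allow(dead_code)]
fn latest_at_or_before(index: &ConcurrentSkipList<u64, u64>, ts: u64) -> Option<u64> {
    // `floor` returns the entry with the greatest key <= `ts`.
    index.floor(&ts).map(|(_, offset)| offset)
}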

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_lock_free_queue() {
        let queue = LockFreeQueue::<i32>::bounded(100);

        assert!(queue.is_empty());

        queue.push(1).unwrap();
        queue.push(2).unwrap();
        queue.push(3).unwrap();

        assert_eq!(queue.len(), 3);
        assert_eq!(queue.pop(), Some(1));
        assert_eq!(queue.pop(), Some(2));
        assert_eq!(queue.pop(), Some(3));
        assert!(queue.is_empty());
    }
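
    // Extra illustrative test (sizes chosen for the example): a full bounded
    // queue rejects `try_push`, hands the item back, and counts the rejection.
    #[test]
    fn test_lock_free_queue_backpressure_sketch() {
        let queue = LockFreeQueue::<i32>::bounded(2);

        queue.try_push(1).unwrap();
        queue.try_push(2).unwrap();

        assert_eq!(queue.try_push(3), Err(3));
        assert_eq!(queue.stats().blocked_enqueues, 1);
        assert!((queue.fill_ratio() - 1.0).abs() < f64::EPSILON);
    }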
1089
1090 #[test]
1091 fn test_lock_free_queue_concurrent() {
1092 let queue = Arc::new(LockFreeQueue::<i32>::bounded(1000));
1093 let mut handles = vec![];
1094
1095 for i in 0..4 {
1097 let q = queue.clone();
1098 handles.push(thread::spawn(move || {
1099 for j in 0..250 {
1100 q.push(i * 250 + j).unwrap();
1101 }
1102 }));
1103 }
1104
1105 for h in handles {
1107 h.join().unwrap();
1108 }
1109
1110 assert_eq!(queue.len(), 1000);
1111
1112 let batch = queue.pop_batch(1000);
1114 assert_eq!(batch.len(), 1000);
1115 }
1116
1117 #[test]
1118 fn test_append_only_log() {
1119 let config = AppendLogConfig {
1120 segment_size: 1024,
1121 max_segments: 4,
1122 preallocate: true,
1123 };
1124 let log = AppendOnlyLog::new(config);
1125
1126 let offset1 = log.append(b"hello");
1127 let offset2 = log.append(b"world");
1128
1129 assert!(offset2 > offset1);
1130
1131 let entries = log.read(offset1, 10);
1132 assert_eq!(entries.len(), 2);
1133 assert_eq!(&entries[0][..], b"hello");
1134 assert_eq!(&entries[1][..], b"world");
1135 }
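
    // Extra illustrative test: forcing a segment rotation with small segments
    // (sizes chosen for the example). Each entry occupies 4 (length prefix) +
    // 28 = 32 bytes, so two entries fill a 64-byte segment exactly and the
    // third lands in a freshly rotated segment.
    #[test]
    fn test_append_only_log_rotation_sketch() {
        let config = AppendLogConfig {
            segment_size: 64,
            max_segments: 8,
            preallocate: true,
        };
        let log = AppendOnlyLog::new(config);

        let payload = [7u8; 28];
        let offsets: Vec<u64> = (0..3).map(|_| log.append(&payload)).collect();

        assert_eq!(offsets, vec![0u64, 32, 64]);
        assert_eq!(log.segment_count(), 2);
        assert_eq!(log.end_offset(), 96);

        let entries = log.read(offsets[2], 10);
        assert_eq!(entries.len(), 1);
        assert_eq!(&entries[0][..], &payload[..]);
    }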

    #[test]
    fn test_concurrent_hash_map() {
        let map = Arc::new(ConcurrentHashMap::<String, i32>::new());
        let mut handles = vec![];

        for i in 0..4 {
            let m = map.clone();
            handles.push(thread::spawn(move || {
                for j in 0..250 {
                    m.insert(format!("key-{}-{}", i, j), i * 250 + j);
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(map.len(), 1000);

        assert_eq!(map.get(&"key-0-0".to_string()), Some(0));
        assert_eq!(map.get(&"key-3-249".to_string()), Some(999));
    }

    #[test]
    fn test_skip_list() {
        let list = ConcurrentSkipList::<u64, String>::new();

        list.insert(10, "ten".to_string());
        list.insert(20, "twenty".to_string());
        list.insert(5, "five".to_string());
        list.insert(15, "fifteen".to_string());

        assert_eq!(list.len(), 4);
        assert_eq!(list.get(&10), Some("ten".to_string()));
        assert_eq!(list.get(&99), None);

        assert_eq!(list.floor(&12), Some((10, "ten".to_string())));
        assert_eq!(list.floor(&15), Some((15, "fifteen".to_string())));

        assert_eq!(list.ceiling(&12), Some((15, "fifteen".to_string())));
        assert_eq!(list.ceiling(&1), Some((5, "five".to_string())));
    }

    #[test]
    fn test_skip_list_range() {
        let list = ConcurrentSkipList::<u64, String>::new();

        for i in 0..100 {
            list.insert(i * 10, format!("value-{}", i));
        }

        let range = list.range(&150, &350, 100);
        assert!(!range.is_empty());

        for (k, _) in &range {
            assert!(*k >= 150 && *k <= 350);
        }
    }
}