magic_orb/
magic_orb.rs

1#[cfg(feature = "not_nightly")]
2use crate::sync_unsafe_cell::SyncUnsafeCell;
3#[cfg(not(feature = "not_nightly"))]
4use std::cell::SyncUnsafeCell;
5
6use std::{
7    sync::{
8        Arc,
9        atomic::{AtomicBool, AtomicUsize, Ordering},
10    },
11};
12
13#[derive(Debug, Clone)]
14pub struct MagicOrb<T>
15where
16    T: Clone + Send,
17{
18    buf: Arc<SyncUnsafeCell<Vec<T>>>,
19    write: Arc<AtomicUsize>,
20    lock: Arc<AtomicBool>,
21    len: usize,
22}
23
24impl<T: Default + Clone + Send> MagicOrb<T> {
25    pub fn new_default(size: usize) -> Self {
26        if size == 0 {
27            panic!("Can't crate MagicOrb with buffer size 0");
28        }
29        MagicOrb {
30            buf: Arc::new(SyncUnsafeCell::new(vec![T::default(); size])),
31            write: Arc::new(AtomicUsize::new(0)),
32            lock: Arc::new(AtomicBool::new(false)),
33            len: size,
34        }
35    }
36}
37
38impl<T: Clone + Send> From<Vec<T>> for MagicOrb<T> {
39    fn from(value: Vec<T>) -> Self {
40        if value.is_empty() {
41            panic!("Can't crate MagicOrb with buffer size 0");
42        }
43        MagicOrb {
44            len: value.len(),
45            buf: Arc::new(SyncUnsafeCell::new(value)),
46            write: Arc::new(AtomicUsize::new(0)),
47            lock: Arc::new(AtomicBool::new(false)),
48        }
49    }
50}
51
52impl<T: Clone + Send> MagicOrb<T> {
53    pub fn new(size: usize, default_val: T) -> Self {
54        if size == 0 {
55            panic!("Can't crate MagicOrb with buffer size 0");
56        }
57        MagicOrb {
58            buf: Arc::new(SyncUnsafeCell::new(vec![default_val; size])),
59            write: Arc::new(AtomicUsize::new(0)),
60            lock: Arc::new(AtomicBool::new(false)),
61            len: size,
62        }
63    }
64
65    pub fn push_slice_overwrite(&self, data: &[T]) {
66        let occupied = self.len;
67        let data = if data.len() > occupied {
68            &data[data.len() - occupied..]
69        } else {
70            data
71        };
72
73        self.take_lock();
74        {
75            // SAFETY: Lock prevents aliasing &mut T
76            // Guarantees required: should be between self.take_lock() and self.return_lock()
77            let buf = unsafe { self.buf.get().as_mut().unwrap() };
78            let write = self.write.load(Ordering::Relaxed);
79
80            if data.len() + write <= occupied {
81                buf[write..write + data.len()].clone_from_slice(data);
82                self.write
83                    .store((write + data.len()) % occupied, Ordering::Relaxed);
84            } else {
85                let first_len = occupied - write;
86                buf[write..].clone_from_slice(&data[..first_len]);
87                buf[..data.len() - first_len].clone_from_slice(&data[first_len..]);
88            }
89        }
90        self.return_lock();
91    }
92
93    pub fn get_contiguous(&self) -> Vec<T> {
94        let mut ret = Vec::with_capacity(self.len);
95
96        self.take_lock();
97        {
98            // SAFETY: Lock prevents aliasing &mut T.
99            // Guarantees required: should be between self.take_lock() and self.return_lock()
100            let buf = unsafe { self.buf.get().as_mut().unwrap() };
101            let write = self.write.load(Ordering::Relaxed);
102            ret.extend_from_slice(&buf[write..]);
103            ret.extend_from_slice(&buf[..write]);
104        }
105        self.return_lock();
106
107        ret
108    }
109
110    fn take_lock(&self) {
111        while self
112            .lock
113            .compare_exchange(false, true, Ordering::SeqCst, Ordering::Acquire)
114            .is_err()
115        {
116            std::hint::spin_loop();
117        }
118    }
119
120    fn return_lock(&self) {
121        self.lock.store(false, Ordering::Release);
122    }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use std::{
        thread,
        time::{Duration, Instant},
    };

    /// How long each throughput probe keeps looping before it reports.
    const BENCH_WINDOW: Duration = Duration::from_secs(1);

    /// `new_default` fills every slot with `T::default()`.
    #[test]
    fn test_new_default() {
        let orb: MagicOrb<i32> = MagicOrb::new_default(5);
        assert_eq!(orb.get_contiguous(), vec![0, 0, 0, 0, 0]);
    }

    /// `new` fills every slot with the supplied value.
    #[test]
    fn test_new_with_value() {
        let orb = MagicOrb::new(5, 7);
        assert_eq!(orb.get_contiguous(), vec![7, 7, 7, 7, 7]);
    }

    /// `From<Vec<T>>` keeps the vector's contents and order.
    #[test]
    fn test_from_vec() {
        let orb = MagicOrb::from(vec![1, 2, 3, 4, 5]);
        assert_eq!(orb.get_contiguous(), vec![1, 2, 3, 4, 5]);
    }

    /// A short push replaces the first slots and moves the read start past them.
    #[test]
    fn test_push_non_wrapping() {
        let orb = MagicOrb::from(vec![1, 2, 3, 4, 5]);
        orb.push_slice_overwrite(&[6, 7]);
        assert_eq!(orb.get_contiguous(), vec![3, 4, 5, 6, 7]);
    }

    /// A push of exactly the buffer length replaces everything.
    #[test]
    fn test_push_wrapping() {
        let orb = MagicOrb::from(vec![1, 2, 3, 4, 5]);
        orb.push_slice_overwrite(&[6, 7, 8, 9, 10]);
        assert_eq!(orb.get_contiguous(), vec![6, 7, 8, 9, 10]);
    }

    /// A push one element shorter than the buffer leaves the last slot intact.
    #[test]
    fn test_push_wrapping_partial_1() {
        let orb = MagicOrb::from(vec![1, 2, 3, 4, 5]);
        orb.push_slice_overwrite(&[6, 7, 8, 9]);
        assert_eq!(orb.get_contiguous(), vec![5, 6, 7, 8, 9]);
    }

    /// Only the tail of an oversized push is kept.
    #[test]
    fn test_push_larger_than_buffer() {
        let orb = MagicOrb::from(vec![1, 2, 3, 4, 5]);
        orb.push_slice_overwrite(&[10, 11, 12, 13, 14, 15, 16]);
        assert_eq!(orb.get_contiguous(), vec![12, 13, 14, 15, 16]);
    }

    /// Ten threads each push a disjoint block of 100 values; together they
    /// fill the 1000-slot buffer exactly once, so the sum is order-independent.
    #[test]
    fn test_multithreaded_push() {
        let orb = Arc::new(MagicOrb::new(1000, 0usize));

        let handles: Vec<_> = (0..10usize)
            .map(|i| {
                let orb = Arc::clone(&orb);
                thread::spawn(move || {
                    let data: Vec<usize> = (i * 100..(i + 1) * 100).collect();
                    orb.push_slice_overwrite(&data);
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        let total: usize = orb.get_contiguous().iter().sum();
        assert_eq!(total, 500 * 999usize);
    }

    /// Counts full-buffer reads completed within the bench window.
    #[test]
    fn bench_read_performance() {
        let buffer_size = 4096;
        let orb = MagicOrb::new(buffer_size, 0usize);
        let mut reads = 0u64;
        let start = Instant::now();

        while start.elapsed() < BENCH_WINDOW {
            let _result = orb.get_contiguous();
            reads += 1;
        }

        eprintln!("\n--- Test results ---");
        eprintln!("Reads within 1 sec (buffer size: {}): {}", buffer_size, reads);
    }

    /// Counts fixed-size slice pushes completed within the bench window.
    #[test]
    fn bench_write_performance() {
        let buffer_size = 4096;
        let slice_size = 16;
        let orb = MagicOrb::<usize>::new_default(buffer_size);
        let slice: Vec<usize> = (0..slice_size).collect();
        let mut writes = 0u64;
        let start = Instant::now();

        while start.elapsed() < BENCH_WINDOW {
            orb.push_slice_overwrite(&slice);
            writes += 1;
        }

        eprintln!("\n--- Test results ---");
        // Report the real slice length and the real buffer size; the slice
        // holds `slice_size` elements, not bytes.
        eprintln!(
            "Slice writes of {} elements within 1 sec (buffer size: {}): {}",
            slice_size, buffer_size, writes
        );
    }

    /// Runs one reader thread and one writer thread against the same orb for
    /// the bench window and reports how many operations each completed.
    #[test]
    fn bench_alternating_performance() {
        let buffer_size = 4096;
        let slice_size = 16;
        let orb = Arc::new(MagicOrb::<usize>::new_default(buffer_size));
        let slice: Vec<usize> = (0..slice_size).collect();

        let reader_orb = Arc::clone(&orb);
        let read_handle = thread::spawn(move || {
            let mut reads = 0u64;
            let start = Instant::now();
            while start.elapsed() < BENCH_WINDOW {
                let _result = reader_orb.get_contiguous();
                reads += 1;
            }
            reads
        });

        let writer_orb = Arc::clone(&orb);
        let write_handle = thread::spawn(move || {
            let mut writes = 0u64;
            let start = Instant::now();
            while start.elapsed() < BENCH_WINDOW {
                writer_orb.push_slice_overwrite(&slice);
                writes += 1;
            }
            writes
        });

        let reads = read_handle.join().unwrap();
        let writes = write_handle.join().unwrap();

        eprintln!("\n--- Test results ---");
        eprintln!(
            "Reads - writes within 1 sec (buffer size: {}): Reads: {} Writes: {}",
            buffer_size, reads, writes
        );
    }
}