magic_orb/
magic_orb.rs

1#[cfg(feature = "not_nightly")]
2use crate::sync_unsafe_cell::SyncUnsafeCell;
3#[cfg(not(feature = "not_nightly"))]
4use std::cell::SyncUnsafeCell;
5
6use std::{
7    cmp::min,
8    fmt::Debug,
9    sync::{
10        Arc,
11        atomic::{AtomicBool, AtomicUsize, Ordering},
12    },
13};
14
15#[derive(Debug, Clone)]
16pub struct MagicOrb<T>
17where
18    T: Clone + Send,
19{
20    buf: Arc<SyncUnsafeCell<Vec<T>>>,
21    write: Arc<AtomicUsize>,
22    lock: Arc<AtomicBool>,
23    len: Arc<AtomicUsize>,
24    capacity: Arc<AtomicUsize>,
25}
26
27impl<T: Default + Clone + Send> MagicOrb<T> {
28    pub fn new_default(size: usize) -> Self {
29        if size == 0 {
30            panic!("Can't crate MagicOrb with buffer size 0");
31        }
32        MagicOrb {
33            buf: Arc::new(SyncUnsafeCell::new(vec![T::default(); size])),
34            write: Arc::new(AtomicUsize::new(0)),
35            lock: Arc::new(AtomicBool::new(false)),
36            len: Arc::new(AtomicUsize::new(size)),
37            capacity: Arc::new(AtomicUsize::new(size)),
38        }
39    }
40}
41
42impl<T: Clone + Send> From<Vec<T>> for MagicOrb<T> {
43    fn from(value: Vec<T>) -> Self {
44        if value.is_empty() {
45            panic!("Can't crate MagicOrb with buffer size 0");
46        }
47        MagicOrb {
48            capacity: Arc::new(AtomicUsize::new(value.len())),
49            len: Arc::new(AtomicUsize::new(value.len())),
50            buf: Arc::new(SyncUnsafeCell::new(value)),
51            write: Arc::new(AtomicUsize::new(0)),
52            lock: Arc::new(AtomicBool::new(false)),
53        }
54    }
55}
56
57impl<T: Clone + Send + Debug> MagicOrb<T> {
58    pub fn new(size: usize, default_val: T) -> Self {
59        if size == 0 {
60            panic!("Can't crate MagicOrb with buffer size 0");
61        }
62        MagicOrb {
63            buf: Arc::new(SyncUnsafeCell::new(vec![default_val; size])),
64            write: Arc::new(AtomicUsize::new(0)),
65            lock: Arc::new(AtomicBool::new(false)),
66            len: Arc::new(AtomicUsize::new(size)),
67            capacity: Arc::new(AtomicUsize::new(size)),
68        }
69    }
70
71    pub fn push_slice_overwrite(&self, data: &[T]) {
72        let occupied = self.capacity.load(Ordering::Acquire);
73        let data = if data.len() > occupied {
74            &data[data.len() - occupied..]
75        } else {
76            data
77        };
78
79        self.take_lock();
80        {
81            // SAFETY: Lock prevents aliasing &mut T
82            // Guarantees required: should be between self.take_lock() and self.return_lock()
83            let buf = unsafe { self.buf.get().as_mut().unwrap() };
84            let write = self.write.load(Ordering::Relaxed);
85
86            if data.len() + write <= occupied {
87                buf[write..write + data.len()].clone_from_slice(data);
88                self.write
89                    .store((write + data.len()) % occupied, Ordering::Relaxed);
90            } else {
91                let first_len = occupied - write;
92                buf[write..].clone_from_slice(&data[..first_len]);
93                buf[..data.len() - first_len].clone_from_slice(&data[first_len..]);
94                self.write.store(data.len() - first_len, Ordering::Relaxed);
95            }
96
97            let capacity = self.capacity();
98
99            _ = self
100                .len
101                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur_val| {
102                    if cur_val < capacity {
103                        Some(min(cur_val + data.len(), capacity))
104                    } else {
105                        None
106                    }
107                });
108        }
109        self.return_lock();
110    }
111
112    pub fn get_contiguous(&self) -> Vec<T> {
113        let capacity = self.capacity();
114        let mut ret = Vec::with_capacity(capacity);
115
116        self.take_lock();
117        if self.is_empty() {
118            self.return_lock();
119            return ret;
120        }
121
122        let vacant_amount = {
123            // SAFETY: Lock prevents aliasing &mut T.
124            // Guarantees required: should be between self.take_lock() and self.return_lock()
125            let buf = unsafe { self.buf.get().as_mut().unwrap() };
126            let write = self.write.load(Ordering::Relaxed);
127            let vacant_amount = capacity - self.len();
128            ret.extend_from_slice(&buf[write..]);
129            ret.extend_from_slice(&buf[..write]);
130            vacant_amount
131        };
132        self.return_lock();
133
134        if vacant_amount > 0 {
135            ret = ret.split_off(vacant_amount);
136        }
137
138        ret
139    }
140
141    pub fn pop_back(&self) {
142        self.take_lock();
143        {
144            if self
145                .len
146                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |cur_val| {
147                    if cur_val == 0 {
148                        None
149                    } else {
150                        Some(cur_val - 1)
151                    }
152                })
153                .is_ok()
154            {
155                let max_len = self.capacity();
156                _ = self
157                    .write
158                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |idx| {
159                        Some((idx + max_len - 1) % max_len)
160                    });
161            }
162        }
163        self.return_lock();
164    }
165
166    pub fn capacity(&self) -> usize {
167        self.capacity.load(Ordering::Relaxed)
168    }
169
170    pub fn len(&self) -> usize {
171        self.len.load(Ordering::Relaxed)
172    }
173
174    pub fn is_empty(&self) -> bool {
175        if self.len() == 0 {
176            return true;
177        }
178        false
179    }
180
181    fn take_lock(&self) {
182        while self
183            .lock
184            .compare_exchange(false, true, Ordering::Release, Ordering::Acquire)
185            .is_err()
186        {
187            std::hint::spin_loop();
188        }
189    }
190
191    fn return_lock(&self) {
192        self.lock.store(false, Ordering::Release);
193    }
194}
195
196#[cfg(test)]
197mod tests {
198    use super::*;
199    use std::thread;
200
201    #[test]
202    fn test_new_default() {
203        let orb: MagicOrb<i32> = MagicOrb::new_default(5);
204        let result = orb.get_contiguous();
205        assert_eq!(result, vec![0, 0, 0, 0, 0]);
206    }
207
208    #[test]
209    fn test_new_with_value() {
210        let orb = MagicOrb::new(5, 7);
211        let result = orb.get_contiguous();
212        assert_eq!(result, vec![7, 7, 7, 7, 7]);
213    }
214
215    #[test]
216    fn test_from_vec() {
217        let data = vec![1, 2, 3, 4, 5];
218        let orb = MagicOrb::from(data);
219        let result = orb.get_contiguous();
220        assert_eq!(result, vec![1, 2, 3, 4, 5]);
221    }
222
223    #[test]
224    fn test_push_non_wrapping() {
225        let orb = MagicOrb::from(vec![1, 2, 3, 4, 5]);
226        orb.push_slice_overwrite(&[6, 7]);
227        let result = orb.get_contiguous();
228        assert_eq!(result, vec![3, 4, 5, 6, 7]);
229    }
230
231    #[test]
232    fn test_push_wrapping() {
233        let orb = MagicOrb::from(vec![1, 2, 3, 4, 5]);
234        orb.push_slice_overwrite(&[6, 7, 8, 9, 10]);
235        let result = orb.get_contiguous();
236        assert_eq!(result, vec![6, 7, 8, 9, 10]);
237    }
238
239    #[test]
240    fn test_push_wrapping_partial_1() {
241        let orb = MagicOrb::from(vec![1, 2, 3, 4, 5]);
242        orb.push_slice_overwrite(&[6, 7, 8, 9]);
243        let result = orb.get_contiguous();
244        assert_eq!(result, vec![5, 6, 7, 8, 9]);
245    }
246
247    #[test]
248    fn test_push_larger_than_buffer() {
249        let orb = MagicOrb::from(vec![1, 2, 3, 4, 5]);
250        orb.push_slice_overwrite(&[10, 11, 12, 13, 14, 15, 16]);
251        let result = orb.get_contiguous();
252        assert_eq!(result, vec![12, 13, 14, 15, 16]);
253    }
254
255    #[test]
256    fn test_multithreaded_push() {
257        let orb = Arc::new(MagicOrb::new(1000, 0usize));
258        let mut handles = vec![];
259
260        for i in 0..10 {
261            let orb_clone = Arc::clone(&orb);
262            handles.push(thread::spawn(move || {
263                let data: Vec<usize> = (i * 100..(i + 1) * 100).collect();
264                orb_clone.push_slice_overwrite(&data);
265            }));
266        }
267
268        for handle in handles {
269            handle.join().unwrap();
270        }
271
272        let result = orb.get_contiguous();
273
274        assert_eq!(result.iter().sum::<usize>(), 500 * 999usize);
275    }
276
277    #[test]
278    fn test_pop_front_from_full_orb() {
279        let orb = MagicOrb::from(vec![1, 2, 3, 4, 5]);
280        assert_eq!(orb.len(), 5);
281        orb.pop_back();
282        assert_eq!(orb.len(), 4);
283        let contents = orb.get_contiguous();
284        assert_eq!(contents, vec![1, 2, 3, 4]);
285    }
286
287    #[test]
288    fn test_pop_front_until_empty() {
289        let orb = MagicOrb::from(vec![1, 2, 3]);
290        assert_eq!(orb.len(), 3);
291        orb.pop_back();
292        assert_eq!(orb.len(), 2);
293        let contents = orb.get_contiguous();
294        assert_eq!(contents, vec![1, 2]);
295
296        orb.pop_back();
297        assert_eq!(orb.len(), 1);
298        let contents = orb.get_contiguous();
299        assert_eq!(contents, vec![1]);
300
301        orb.pop_back();
302        assert_eq!(orb.len(), 0);
303        assert!(orb.is_empty());
304        let contents = orb.get_contiguous();
305        assert!(contents.is_empty());
306    }
307
308    #[test]
309    fn test_pop_front_from_empty_orb() {
310        let orb = MagicOrb::new(5, 0);
311        assert_eq!(orb.len(), 5);
312        for _ in 0..5 {
313            orb.pop_back();
314        }
315        assert_eq!(orb.len(), 0);
316
317        orb.pop_back();
318        assert_eq!(orb.len(), 0);
319        assert!(orb.is_empty());
320    }
321
322    #[test]
323    fn test_pop_front_after_push_and_overwrite() {
324        let orb = MagicOrb::new_default(4);
325        orb.push_slice_overwrite(&[1, 2, 3, 4, 5, 6]);
326        assert_eq!(orb.len(), 4);
327        let contents = orb.get_contiguous();
328        assert_eq!(contents, vec![3, 4, 5, 6]);
329
330        orb.pop_back();
331        orb.pop_back();
332        orb.push_slice_overwrite(&[7]);
333        assert_eq!(orb.len(), 3);
334        let contents = orb.get_contiguous();
335        assert_eq!(contents, vec![3, 4, 7]);
336
337        orb.push_slice_overwrite(&[9, 10]);
338        assert_eq!(orb.get_contiguous(), vec![4, 7, 9, 10]);
339        orb.pop_back();
340        assert_eq!(orb.len(), 3);
341        let contents = orb.get_contiguous();
342        assert_eq!(contents, vec![4, 7, 9]);
343    }
344}