shrev/
storage.rs

//! Ring buffer implementation whose reads only need a shared (`&self`) reference.
2
use std::{
    cell::UnsafeCell,
    fmt,
    marker::PhantomData,
    mem::MaybeUninit,
    num::Wrapping,
    ops::{Add, AddAssign, Sub, SubAssign},
    ptr,
    sync::mpsc::{self, Receiver, Sender},
};

use crate::util::{InstanceId, NoSharedAccess, Reference};
use std::fmt::Debug;
15
/// A position inside a ring of `size` slots.
///
/// Arithmetic on the index (`+`, `-`, `+=`, `-=`) wraps around at `size`.
/// The all-ones value (`!0`) is reserved as a "finished" marker, see
/// [`CircularIndex::magic`].
#[derive(Clone, Copy, Debug)]
struct CircularIndex {
    index: usize,
    size: usize,
}

impl CircularIndex {
    /// Creates an index pointing at `index` in a ring of `size` slots.
    fn new(index: usize, size: usize) -> Self {
        CircularIndex { index, size }
    }

    /// Creates an index pointing at the last slot (`size - 1`) of the ring.
    fn at_end(size: usize) -> Self {
        Self::new(size - 1, size)
    }

    /// Creates the marker value (`index == !0`).
    ///
    /// `step` and `step_back` switch the index to this value once the
    /// inclusive end has been handed out.
    fn magic(size: usize) -> Self {
        Self::new(!0, size)
    }

    /// Returns `true` if this index holds the marker value `!0`.
    fn is_magic(&self) -> bool {
        self.index == !0
    }

    /// Yields the current position and moves one slot forward.
    ///
    /// Once the position equal to `inclusive_end` has been yielded, the index
    /// turns into the marker value and every further call returns `None`.
    fn step(&mut self, inclusive_end: usize) -> Option<usize> {
        if self.is_magic() {
            return None;
        }

        let current = self.index;
        if current == inclusive_end {
            // That was the last position; remember that we are done.
            self.index = !0;
        } else {
            *self += 1;
        }

        Some(current)
    }

    /// Yields the position stored in `inclusive_end` and moves that end one
    /// slot backwards (wrapping from `0` to `size - 1`).
    ///
    /// When the end meets this index, the position is yielded one last time,
    /// `inclusive_end` is left untouched and the index turns into the marker
    /// value, so every further call returns `None`.
    fn step_back(&mut self, inclusive_end: &mut usize) -> Option<usize> {
        if self.is_magic() {
            return None;
        }

        let current_end = *inclusive_end;
        if self.index == current_end {
            self.index = !0;
        } else if current_end == 0 {
            *inclusive_end = self.size - 1;
        } else {
            *inclusive_end = current_end - 1;
        }

        Some(current_end)
    }
}

impl Add<usize> for CircularIndex {
    type Output = usize;

    /// Returns the position `rhs` slots ahead, wrapped at `size`.
    fn add(self, rhs: usize) -> usize {
        let CircularIndex { index, size } = self;

        (index + rhs) % size
    }
}

impl AddAssign<usize> for CircularIndex {
    /// Moves the index `rhs` slots ahead, wrapped at `size`.
    fn add_assign(&mut self, rhs: usize) {
        let advanced = *self + rhs;
        self.index = advanced;
    }
}

impl Sub<usize> for CircularIndex {
    type Output = usize;

    /// Returns the position `rhs` slots behind, wrapped at `size`.
    fn sub(self, rhs: usize) -> usize {
        let CircularIndex { index, size } = self;

        // `size` is added first so the intermediate value stays non-negative.
        (size - rhs + index) % size
    }
}

impl SubAssign<usize> for CircularIndex {
    /// Moves the index `rhs` slots back, wrapped at `size`.
    fn sub_assign(&mut self, rhs: usize) {
        let rewound = *self - rhs;
        self.index = rewound;
    }
}
113
/// Backing storage of the ring buffer.
///
/// The slots are kept as `MaybeUninit<T>` because not every slot holds a
/// value: `uninitialized` counts the slots that are currently empty. The
/// methods below rely on the caller writing sequentially so that those empty
/// slots form one run starting at the write cursor.
///
/// Dropping a `Data` never drops stored elements; `clean` has to be called
/// for that.
struct Data<T> {
    data: Vec<MaybeUninit<T>>,
    uninitialized: usize,
}

impl<T> Data<T> {
    /// Creates storage with `size` empty slots.
    fn new(size: usize) -> Self {
        let mut data = Data {
            data: Vec::new(),
            uninitialized: 0,
        };

        // SAFETY: the storage is empty, so cursor `0` is in range and there
        // are no elements that could be moved.
        unsafe {
            data.grow(0, size);
        }

        debug_assert_eq!(data.uninitialized, size, "Bug in shrev");

        data
    }

    /// Returns a reference to the element stored at `index`.
    ///
    /// # Safety
    ///
    /// `index` must be in bounds and the slot must hold an initialized value.
    unsafe fn get(&self, index: usize) -> &T {
        &*self.data.get_unchecked(index).as_ptr()
    }

    /// Stores `elem` at `cursor`.
    ///
    /// While `uninitialized > 0` the slot is treated as empty and nothing is
    /// dropped; afterwards the previous value is replaced and dropped.
    ///
    /// # Safety
    ///
    /// `cursor` must be in bounds, and the slot must be empty exactly when
    /// `uninitialized > 0`.
    unsafe fn put(&mut self, cursor: usize, elem: T) {
        let slot = self.data.get_unchecked_mut(cursor).as_mut_ptr();

        if self.uninitialized > 0 {
            // There is no element stored under `cursor`
            // -> do not drop anything!
            ptr::write(slot, elem);
            self.uninitialized -= 1;
        } else {
            // The slot is initialized. Swap the new value in first and drop
            // the old one afterwards, so a panicking destructor cannot leave
            // a dropped value behind in the slot.
            drop(ptr::replace(slot, elem));
        }
    }

    /// Adds `by` empty slots in front of `cursor`.
    ///
    /// `cursor` is the first position that gets moved to the back,
    /// free memory will be created between `cursor - 1` and `cursor`.
    ///
    /// # Panics
    ///
    /// Panics if `by` is smaller than the current length or if `cursor` lies
    /// beyond the current length.
    ///
    /// # Safety
    ///
    /// After the call the slots `cursor..cursor + by` must be treated as
    /// empty; the values previously stored at `cursor..` now live `by` slots
    /// further back.
    unsafe fn grow(&mut self, cursor: usize, by: usize) {
        let old_len = self.data.len();
        assert!(by >= old_len);
        assert!(cursor <= old_len);

        // Calculate how many elements we need to move
        let to_move = old_len - cursor;

        // Reserve exactly the requested space and extend the vector with
        // empty slots (sound for `MaybeUninit`, no initialization needed).
        self.data.reserve_exact(by);
        self.data.resize_with(old_len + by, MaybeUninit::uninit);

        // Move the elements after the cursor to the end of the buffer.
        // Since we grew the buffer at least by the old length,
        // the elements are non-overlapping.
        let base = self.data.as_mut_ptr();
        ptr::copy_nonoverlapping(base.add(cursor), base.add(cursor + by), to_move);

        self.uninitialized += by;
    }

    /// Drops all stored elements and empties the storage.
    ///
    /// Called when dropping the ring buffer. Walks once around the ring
    /// starting at `cursor`; the first `uninitialized` slots are skipped,
    /// every other slot is dropped in place.
    ///
    /// # Safety
    ///
    /// `cursor` must be the position of the first empty slot (or of the
    /// oldest element if no slot is empty).
    unsafe fn clean(&mut self, cursor: usize) {
        let len = self.data.len();

        for offset in 0..len {
            if self.uninitialized > 0 {
                self.uninitialized -= 1;
            } else {
                let i = (cursor + offset) % len;
                ptr::drop_in_place(self.data.get_unchecked_mut(i).as_mut_ptr());
            }
        }

        // `MaybeUninit` slots have no destructor, so this only resets the length.
        self.data.clear();
    }

    /// Number of slots that currently hold a value.
    fn num_initialized(&self) -> usize {
        self.data.len() - self.uninitialized
    }
}

impl<T: Debug> Debug for Data<T> {
    /// Prints only the slot counters; the elements themselves are not shown.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Data")
            .field("num_initialized", &self.num_initialized())
            .field("num_uninitialized", &self.uninitialized)
            .finish()
    }
}
203
204#[derive(Copy, Clone, Debug)]
205struct Reader {
206    generation: usize,
207    last_index: usize,
208}
209
210impl Reader {
211    fn set_inactive(&mut self) {
212        self.last_index = !0;
213    }
214
215    fn active(&self) -> bool {
216        self.last_index != !0
217    }
218
219    fn distance_from(&self, last: CircularIndex, current_gen: usize) -> usize {
220        let this = CircularIndex {
221            index: self.last_index,
222            size: last.size,
223        };
224
225        match this - last.index {
226            0 if self.generation == current_gen => last.size,
227            x => x,
228        }
229    }
230
231    fn needs_shift(&self, last_index: usize, current_gen: usize) -> bool {
232        self.last_index > last_index
233            || (self.last_index == last_index && self.generation != current_gen)
234    }
235}
236
237/// A reader ID which represents a subscription to the events pushed to the
238/// `EventChannel`.
239///
240/// For each reader ID, the last read event is tracked; this way, the buffer
241/// gets grown whenever it would overwrite an event which was not yet observed
242/// by every `ReaderId`.
243///
244/// Dropping a `ReaderId` effectively cancels the subscription to those events.
245///
246/// Note that as long as a `ReaderId` exists, it is crucial to use it to read
247/// the events; otherwise the buffer of the `EventChannel` **will** keep
248/// growing.
249pub struct ReaderId<T: 'static> {
250    id: usize,
251    marker: PhantomData<&'static [T]>,
252    reference: Reference,
253    // stupid way to make this `Sync`
254    drop_notifier: NoSharedAccess<Sender<usize>>,
255}
256
257impl<T: 'static> fmt::Debug for ReaderId<T> {
258    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
259        f.debug_struct("ReaderId")
260            .field("id", &self.id)
261            .field("marker", &self.marker)
262            .field("reference", &self.reference)
263            .field("drop_notifier", &self.drop_notifier)
264            .finish()
265    }
266}
267
268impl<T: 'static> Drop for ReaderId<T> {
269    fn drop(&mut self) {
270        let _ = self.drop_notifier.get_mut().send(self.id);
271    }
272}
273
274#[derive(Default)]
275struct ReaderMeta {
276    /// Free ids
277    free: Vec<usize>,
278    readers: Vec<UnsafeCell<Reader>>,
279}
280
281impl ReaderMeta {
282    fn new() -> Self {
283        Default::default()
284    }
285
286    fn reader<T>(&self, id: &mut ReaderId<T>) -> Option<&mut Reader> {
287        self.readers.get(id.id).map(|r| unsafe { &mut *r.get() })
288    }
289
290    fn reader_exclusive(&mut self, id: usize) -> &mut Reader {
291        unsafe { &mut *self.readers[id].get() }
292    }
293
294    fn has_reader(&mut self) -> bool {
295        self.readers
296            .iter()
297            .map(|r| unsafe { &mut *r.get() })
298            .any(|r| r.active())
299    }
300
301    fn alloc(&mut self, last_index: usize, generation: usize) -> usize {
302        match self.free.pop() {
303            Some(id) => {
304                self.reader_exclusive(id).last_index = last_index;
305                self.reader_exclusive(id).generation = generation;
306
307                id
308            }
309            None => {
310                let id = self.readers.len();
311                self.readers.push(UnsafeCell::new(Reader {
312                    generation,
313                    last_index,
314                }));
315
316                id
317            }
318        }
319    }
320
321    fn remove(&mut self, id: usize) {
322        self.reader_exclusive(id).set_inactive();
323        self.free.push(id);
324    }
325
326    // This needs to be mutable since `readers` might be borrowed in `reader`!
327    fn nearest_index(&mut self, last: CircularIndex, current_gen: usize) -> Option<&Reader> {
328        self.readers
329            .iter()
330            .map(|reader| unsafe { &*reader.get() })
331            .filter(|reader| reader.active())
332            .min_by_key(|reader| reader.distance_from(last, current_gen))
333    }
334
335    fn shift(&mut self, last_index: usize, current_gen: usize, grow_by: usize) {
336        for reader in &mut self.readers {
337            let reader = unsafe { &mut *reader.get() } as &mut Reader;
338            if !reader.active() {
339                continue;
340            }
341
342            if reader.needs_shift(last_index, current_gen) {
343                reader.last_index += grow_by;
344            }
345        }
346    }
347}
348
349unsafe impl Send for ReaderMeta {}
350unsafe impl Sync for ReaderMeta {}
351
352/// Ring buffer, holding data of type `T`.
353pub struct RingBuffer<T> {
354    available: usize,
355    last_index: CircularIndex,
356    data: Data<T>,
357    free_rx: NoSharedAccess<Receiver<usize>>,
358    free_tx: NoSharedAccess<Sender<usize>>,
359    generation: Wrapping<usize>,
360    instance_id: InstanceId,
361    meta: ReaderMeta,
362}
363
364impl<T: 'static> RingBuffer<T> {
365    /// Create a new ring buffer with the given max size.
366    pub fn new(size: usize) -> Self {
367        assert!(size > 1);
368
369        let (free_tx, free_rx) = mpsc::channel();
370        let free_tx = NoSharedAccess::new(free_tx);
371        let free_rx = NoSharedAccess::new(free_rx);
372
373        RingBuffer {
374            available: size,
375            last_index: CircularIndex::at_end(size),
376            data: Data::new(size),
377            free_rx,
378            free_tx,
379            generation: Wrapping(0),
380            instance_id: InstanceId::new("`ReaderId` was not allocated by this `EventChannel`"),
381            meta: ReaderMeta::new(),
382        }
383    }
384
385    /// Iterates over all elements of `iter` and pushes them to the buffer.
386    pub fn iter_write<I>(&mut self, iter: I)
387    where
388        I: IntoIterator<Item = T>,
389        I::IntoIter: ExactSizeIterator,
390    {
391        let iter = iter.into_iter();
392        let len = iter.len();
393        if len > 0 {
394            self.ensure_additional(len);
395            for element in iter {
396                unsafe {
397                    self.data.put(self.last_index + 1, element);
398                }
399                self.last_index += 1;
400            }
401            self.available -= len;
402            self.generation += Wrapping(1);
403        }
404    }
405
406    /// Removes all elements from a `Vec` and pushes them to the ring buffer.
407    pub fn drain_vec_write(&mut self, data: &mut Vec<T>) {
408        self.iter_write(data.drain(..));
409    }
410
411    // Checks if any reader would observe an additional event.
412    pub fn would_write(&mut self) -> bool {
413        self.maintain();
414
415        self.meta.has_reader()
416    }
417
418    /// Ensures that `num` elements can be inserted.
419    /// Does nothing if there's enough space, grows the buffer otherwise.
420    #[inline(always)]
421    pub fn ensure_additional(&mut self, num: usize) {
422        if self.available >= num {
423            return;
424        }
425
426        self.ensure_additional_slow(num);
427    }
428
429    #[inline(never)]
430    fn ensure_additional_slow(&mut self, num: usize) {
431        self.maintain();
432        let left: usize = match self.meta.nearest_index(self.last_index, self.generation.0) {
433            None => {
434                self.available = self.last_index.size;
435
436                return;
437            }
438            Some(reader) => {
439                let left = reader.distance_from(self.last_index, self.generation.0);
440
441                self.available = left;
442
443                if left >= num {
444                    return;
445                } else {
446                    left
447                }
448            }
449        };
450        let grow_by = num - left;
451        let min_target_size = self.last_index.size + grow_by;
452
453        // Make sure size' = 2^n * size
454        let mut size = 2 * self.last_index.size;
455        while size < min_target_size {
456            size *= 2;
457        }
458
459        // Calculate adjusted growth
460        let grow_by = size - self.last_index.size;
461
462        // Insert the additional elements
463        unsafe {
464            self.data.grow(self.last_index + 1, grow_by);
465        }
466        self.last_index.size = size;
467
468        self.meta
469            .shift(self.last_index.index, self.generation.0, grow_by);
470        self.available = grow_by + left
471    }
472
473    fn maintain(&mut self) {
474        while let Ok(id) = self.free_rx.get_mut().try_recv() {
475            self.meta.remove(id);
476        }
477    }
478
479    /// Write a single data point into the ring buffer.
480    pub fn single_write(&mut self, element: T) {
481        use std::iter::once;
482
483        self.iter_write(once(element));
484    }
485
486    /// Create a new reader id for this ring buffer.
487    pub fn new_reader_id(&mut self) -> ReaderId<T> {
488        self.maintain();
489        let last_index = self.last_index.index;
490        let generation = self.generation.0;
491        let id = self.meta.alloc(last_index, generation);
492
493        ReaderId {
494            id,
495            marker: PhantomData,
496            reference: self.instance_id.reference(),
497            drop_notifier: NoSharedAccess::new(self.free_tx.get_mut().clone()),
498        }
499    }
500
501    /// Read data from the ring buffer, starting where the last read ended, and
502    /// up to where the last element was written.
503    pub fn read(&self, reader_id: &mut ReaderId<T>) -> StorageIterator<T> {
504        // Check if `reader_id` was actually created for this buffer.
505        // This is very important as `reader_id` is a token allowing memory access,
506        // and without this check a race could be caused by duplicate IDs.
507        self.instance_id.assert_eq(&reader_id.reference);
508
509        let (last_read_index, gen) = {
510            let reader = self.meta.reader(reader_id).unwrap_or_else(|| {
511                panic!(
512                    "ReaderId not registered: {}\n\
513                     This usually means that this ReaderId \
514                     was created by a different storage",
515                    reader_id.id
516                )
517            });
518            let old = reader.last_index;
519            reader.last_index = self.last_index.index;
520            let old_gen = reader.generation;
521            reader.generation = self.generation.0;
522
523            (old, old_gen)
524        };
525        let mut index = CircularIndex::new(last_read_index, self.last_index.size);
526        index += 1;
527        if gen == self.generation.0 {
528            // It is empty
529            index = CircularIndex::magic(index.size);
530        }
531
532        let iter = StorageIterator {
533            data: &self.data,
534            end: self.last_index.index,
535            index,
536        };
537
538        iter
539    }
540}
541
542impl<T: Debug> Debug for RingBuffer<T> {
543    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
544        f.debug_struct("RingBuffer")
545            .field("available", &self.available)
546            .field("instance_id", &self.instance_id)
547            .field("data", &self.data)
548            .field("last_index", &self.last_index)
549            .finish()
550    }
551}
552
553impl<T> Drop for RingBuffer<T> {
554    fn drop(&mut self) {
555        unsafe {
556            self.data.clean(self.last_index + 1);
557        }
558    }
559}
560
561/// Iterator over a slice of data in `RingBufferStorage`.
562#[derive(Debug)]
563pub struct StorageIterator<'a, T: 'a> {
564    data: &'a Data<T>,
565    /// Inclusive end
566    end: usize,
567    index: CircularIndex,
568}
569
570impl<'a, T> Iterator for StorageIterator<'a, T> {
571    type Item = &'a T;
572
573    fn next(&mut self) -> Option<&'a T> {
574        self.index
575            .step(self.end)
576            .map(|i| unsafe { self.data.get(i) })
577    }
578
579    // Needed to fulfill contract of `ExactSizeIterator`
580    fn size_hint(&self) -> (usize, Option<usize>) {
581        let len = self.len();
582
583        (len, Some(len))
584    }
585}
586
587impl<'a, T> DoubleEndedIterator for StorageIterator<'a, T> {
588    fn next_back(&mut self) -> Option<Self::Item> {
589        self.index
590            .step_back(&mut self.end)
591            .map(|i| unsafe { self.data.get(i) })
592    }
593}
594
595impl<'a, T> ExactSizeIterator for StorageIterator<'a, T> {
596    fn len(&self) -> usize {
597        match self.index.is_magic() {
598            true => 0,
599            false => (CircularIndex::new(self.end, self.index.size) - self.index.index) + 1,
600        }
601    }
602}
603
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal event type used by the tests.
    #[derive(Debug, Clone, PartialEq)]
    struct Test {
        pub id: u32,
    }

    /// Builds `n` events with the ids `0..n`.
    fn events(n: u32) -> Vec<Test> {
        (0..n).map(|id| Test { id }).collect()
    }

    /// Reads everything pending for `reader`, oldest element first.
    fn read_all(buffer: &RingBuffer<Test>, reader: &mut ReaderId<Test>) -> Vec<Test> {
        buffer.read(reader).cloned().collect()
    }

    /// Reads everything pending for `reader`, newest element first.
    fn read_all_rev(buffer: &RingBuffer<Test>, reader: &mut ReaderId<Test>) -> Vec<Test> {
        buffer.read(reader).rev().cloned().collect()
    }

    #[test]
    fn test_size() {
        let mut buffer = RingBuffer::<i32>::new(4);

        buffer.single_write(55);

        let mut reader = buffer.new_reader_id();

        buffer.iter_write(0..16);
        assert_eq!(buffer.read(&mut reader).len(), 16);

        buffer.iter_write(0..6);
        assert_eq!(buffer.read(&mut reader).len(), 6);
    }

    #[test]
    fn test_circular() {
        let mut buffer = RingBuffer::<i32>::new(4);

        buffer.single_write(55);

        let mut reader = buffer.new_reader_id();

        buffer.iter_write(0..4);
        assert_eq!(
            buffer.read(&mut reader).cloned().collect::<Vec<_>>(),
            vec![0, 1, 2, 3]
        );
    }

    #[test]
    fn test_empty_write() {
        let mut buffer = RingBuffer::<Test>::new(10);
        buffer.drain_vec_write(&mut vec![]);
        assert_eq!(buffer.data.num_initialized(), 0);
    }

    #[test]
    fn test_too_large_write() {
        let mut buffer = RingBuffer::<Test>::new(10);
        // The buffer only grows for the sake of a registered reader, so keep
        // one alive while writing more events than the buffer can hold.
        let _reader = buffer.new_reader_id();
        buffer.drain_vec_write(&mut events(15));
        assert_eq!(buffer.data.num_initialized(), 15);
    }

    #[test]
    fn test_empty_read() {
        let mut buffer = RingBuffer::<Test>::new(10);
        let mut reader_id = buffer.new_reader_id();
        assert_eq!(Vec::<Test>::default(), read_all(&buffer, &mut reader_id));
    }

    #[test]
    fn test_empty_read_rev() {
        let mut buffer = RingBuffer::<Test>::new(10);
        let mut reader_id = buffer.new_reader_id();
        assert_eq!(
            Vec::<Test>::default(),
            read_all_rev(&buffer, &mut reader_id)
        );
    }

    #[test]
    fn test_empty_read_write_before_id() {
        let mut buffer = RingBuffer::<Test>::new(10);
        buffer.drain_vec_write(&mut events(2));
        let mut reader_id = buffer.new_reader_id();
        assert_eq!(Vec::<Test>::default(), read_all(&buffer, &mut reader_id));
    }

    #[test]
    fn test_empty_read_write_before_id_rev() {
        let mut buffer = RingBuffer::<Test>::new(10);
        buffer.drain_vec_write(&mut events(2));
        let mut reader_id = buffer.new_reader_id();
        assert_eq!(
            Vec::<Test>::default(),
            read_all_rev(&buffer, &mut reader_id)
        );
    }

    #[test]
    fn test_read() {
        let mut buffer = RingBuffer::<Test>::new(10);
        let mut reader_id = buffer.new_reader_id();
        buffer.drain_vec_write(&mut events(2));
        assert_eq!(
            vec![Test { id: 0 }, Test { id: 1 }],
            read_all(&buffer, &mut reader_id)
        );

        // Everything has been consumed by the first read.
        assert_eq!(Vec::<Test>::new(), read_all(&buffer, &mut reader_id));
    }

    #[test]
    fn test_read_rev() {
        let mut buffer = RingBuffer::<Test>::new(10);
        let mut reader_id = buffer.new_reader_id();
        buffer.drain_vec_write(&mut events(2));
        assert_eq!(
            vec![Test { id: 1 }, Test { id: 0 }],
            read_all_rev(&buffer, &mut reader_id)
        );

        assert_eq!(Vec::<Test>::new(), read_all_rev(&buffer, &mut reader_id));
    }

    #[test]
    fn test_read_wrap_around_rev() {
        let mut buffer = RingBuffer::<Test>::new(4);
        let mut reader_id = buffer.new_reader_id();
        buffer.drain_vec_write(&mut events(2));
        // buffer should now be [--> 0, 1, -, -]
        assert_eq!(
            vec![Test { id: 1 }, Test { id: 0 }],
            read_all_rev(&buffer, &mut reader_id)
        );
        buffer.drain_vec_write(&mut events(3));
        // buffer should now be [2, 1, --> 0, 1]
        assert_eq!(
            vec![Test { id: 2 }, Test { id: 1 }, Test { id: 0 }],
            read_all_rev(&buffer, &mut reader_id)
        );

        assert_eq!(Vec::<Test>::new(), read_all_rev(&buffer, &mut reader_id));
    }

    #[test]
    fn test_read_wrap_around_overflow_rev() {
        let mut buffer = RingBuffer::<Test>::new(5);
        let mut reader_id = buffer.new_reader_id();
        buffer.drain_vec_write(&mut events(2));
        assert_eq!(
            vec![Test { id: 1 }, Test { id: 0 }],
            read_all_rev(&buffer, &mut reader_id)
        );
        buffer.drain_vec_write(&mut events(6));
        // Six events do not fit into five slots, so the buffer had to grow;
        // all six must still come back.
        assert_eq!(
            events(6).into_iter().rev().collect::<Vec<_>>(),
            read_all_rev(&buffer, &mut reader_id)
        );

        assert_eq!(Vec::<Test>::new(), read_all_rev(&buffer, &mut reader_id));
    }

    #[test]
    fn test_write_overflow() {
        let mut buffer = RingBuffer::<Test>::new(3);
        let mut reader_id = buffer.new_reader_id();
        buffer.drain_vec_write(&mut events(4));
        assert_eq!(
            vec![
                Test { id: 0 },
                Test { id: 1 },
                Test { id: 2 },
                Test { id: 3 },
            ],
            read_all(&buffer, &mut reader_id)
        );
    }

    /// If you're getting a compilation error here this test has failed!
    #[test]
    fn test_send_sync() {
        trait SendSync: Send + Sync {
            fn is_send_sync() -> bool;
        }

        impl<T> SendSync for T
        where
            T: Send + Sync,
        {
            fn is_send_sync() -> bool {
                true
            }
        }

        assert!(RingBuffer::<Test>::is_send_sync());
        assert!(ReaderId::<Test>::is_send_sync());
    }

    #[test]
    fn test_reader_reuse() {
        let mut buffer = RingBuffer::<Test>::new(3);
        {
            let _reader_id = buffer.new_reader_id();
        }
        // The id of the dropped reader must be handed out again.
        let reader_id = buffer.new_reader_id();
        assert_eq!(reader_id.id, 0);
        assert_eq!(buffer.meta.readers.len(), 1);
    }

    #[test]
    fn test_prevent_excess_growth() {
        let mut buffer = RingBuffer::<Test>::new(3);
        let mut reader_id = buffer.new_reader_id();
        println!("Initial buffer state: {:#?}", buffer);
        println!("--- first write ---");
        buffer.drain_vec_write(&mut events(2));
        println!("--- second write ---");
        buffer.drain_vec_write(&mut events(2));
        println!("--- writes complete ---");
        // we wrote 0,1,0,1, if the buffer grew correctly we'll get all of these back.
        assert_eq!(
            vec![
                Test { id: 0 },
                Test { id: 1 },
                Test { id: 0 },
                Test { id: 1 },
            ],
            read_all(&buffer, &mut reader_id)
        );

        buffer.drain_vec_write(&mut events(4));
        // After writing 4 more events the buffer should have no reason to grow beyond 6
        // (2 * 3).
        assert_eq!(buffer.data.num_initialized(), 6);
        assert_eq!(
            vec![
                Test { id: 0 },
                Test { id: 1 },
                Test { id: 2 },
                Test { id: 3 },
            ],
            read_all(&buffer, &mut reader_id)
        );
    }

    #[test]
    fn test_write_slice() {
        let mut buffer = RingBuffer::<Test>::new(10);
        let mut reader_id = buffer.new_reader_id();
        buffer.iter_write(events(2));
        assert_eq!(
            vec![Test { id: 0 }, Test { id: 1 }],
            read_all(&buffer, &mut reader_id)
        );
    }

    #[test]
    fn iter_write_empty() {
        let mut buffer = RingBuffer::<Test>::new(10);
        let mut reader_id = buffer.new_reader_id();
        buffer.iter_write(Vec::new());
        let mut data = buffer.read(&mut reader_id);
        assert_eq!(None, data.next());
    }
}