use std::{
    cell::UnsafeCell,
    fmt,
    marker::PhantomData,
    num::Wrapping,
    ops::{Add, AddAssign, Sub, SubAssign},
    ptr,
    sync::mpsc::{self, Receiver, Sender},
};

use crate::util::{InstanceId, NoSharedAccess, Reference};
use std::fmt::Debug;

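/// An index into the ring buffer, wrapping modulo `size` (the capacity of
/// the buffer). The all-ones value `!0` is reserved as a sentinel marking an
/// exhausted iterator (see `magic`).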
#[derive(Clone, Copy, Debug)]
struct CircularIndex {
    index: usize,
    size: usize,
}

impl CircularIndex {
    fn new(index: usize, size: usize) -> Self {
        CircularIndex { index, size }
    }

    fn at_end(size: usize) -> Self {
        CircularIndex {
            index: size - 1,
            size,
        }
    }

    /// Creates the sentinel index (`!0`), representing "nothing left to
    /// yield".
    fn magic(size: usize) -> Self {
        CircularIndex::new(!0, size)
    }

    fn is_magic(&self) -> bool {
        self.index == !0
    }

    /// Yields the current index and advances by one; once `inclusive_end`
    /// has been yielded, the index becomes the sentinel and subsequent calls
    /// return `None`.
    fn step(&mut self, inclusive_end: usize) -> Option<usize> {
        match self.index {
            x if x == !0 => None,
            x if x == inclusive_end => {
                let r = Some(x);
                self.index = !0;

                r
            }
            x => {
                let r = Some(x);
                *self += 1;

                r
            }
        }
    }

    /// The back-to-front counterpart of `step`: yields `inclusive_end` and
    /// moves it backwards (wrapping at zero) until it meets this index.
    fn step_back(&mut self, inclusive_end: &mut usize) -> Option<usize> {
        match self.index {
            x if x == !0 => None,
            x if x == *inclusive_end => {
                let r = Some(x);
                self.index = !0;

                r
            }
            _ => {
                let r = Some(*inclusive_end);
                if *inclusive_end == 0 {
                    *inclusive_end = self.size;
                }

                *inclusive_end -= 1;

                r
            }
        }
    }
}

impl Add<usize> for CircularIndex {
    type Output = usize;

    fn add(self, rhs: usize) -> usize {
        (self.index + rhs) % self.size
    }
}

impl AddAssign<usize> for CircularIndex {
    fn add_assign(&mut self, rhs: usize) {
        self.index = *self + rhs;
    }
}

impl Sub<usize> for CircularIndex {
    type Output = usize;

    fn sub(self, rhs: usize) -> usize {
        (self.size - rhs + self.index) % self.size
    }
}

impl SubAssign<usize> for CircularIndex {
    fn sub_assign(&mut self, rhs: usize) {
        self.index = *self - rhs;
    }
}

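/// The backing storage of the ring buffer. `uninitialized` counts slots
/// whose length has been set but which were never written; they must be
/// written with `ptr::write` and skipped when dropping.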
struct Data<T> {
    data: Vec<T>,
    uninitialized: usize,
}

impl<T> Data<T> {
    fn new(size: usize) -> Self {
        let mut data = Data {
            data: vec![],
            uninitialized: 0,
        };

        unsafe {
            data.grow(0, size);
        }

        debug_assert_eq!(data.uninitialized, size, "Bug in shrev");

        data
    }

    unsafe fn get(&self, index: usize) -> &T {
        self.data.get_unchecked(index)
    }

    unsafe fn put(&mut self, cursor: usize, elem: T) {
        if self.uninitialized > 0 {
            // The slot was never written; write without dropping the
            // uninitialized garbage that would otherwise be read as a `T`.
            ptr::write(self.data.get_unchecked_mut(cursor) as *mut T, elem);
            self.uninitialized -= 1;
        } else {
            // The slot holds a valid element; a plain assignment drops it.
            *self.data.get_unchecked_mut(cursor) = elem;
        }
    }

    /// Grows the buffer by `by` elements, moving the elements at and after
    /// `cursor` to the end so the ring stays contiguous around the cursor.
    unsafe fn grow(&mut self, cursor: usize, by: usize) {
        assert!(by >= self.data.len());

        let to_move = self.data.len() - cursor;

        self.data.reserve_exact(by);
        let new = self.data.len() + by;
        self.data.set_len(new);

        // `by >= len >= to_move`, so source and destination cannot overlap.
        let src = self.data.as_ptr().offset(cursor as isize);
        let dst = self.data.as_mut_ptr().offset((cursor + by) as isize);
        ptr::copy_nonoverlapping(src, dst, to_move);

        self.uninitialized += by;
    }

    /// Walks the whole ring starting at `cursor`, skipping the uninitialized
    /// slots and dropping every initialized element in place.
    unsafe fn clean(&mut self, cursor: usize) {
        let mut cursor = CircularIndex::new(cursor, self.data.len());
        let end = cursor - 1;

        while let Some(i) = cursor.step(end) {
            if self.uninitialized > 0 {
                self.uninitialized -= 1;
            } else {
                ptr::drop_in_place(self.data.get_unchecked_mut(i) as *mut T);
            }
        }

        self.data.set_len(0);
    }

    fn num_initialized(&self) -> usize {
        self.data.len() - self.uninitialized
    }
}

impl<T: Debug> Debug for Data<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Data")
            .field("num_initialized", &self.num_initialized())
            .field("num_uninitialized", &self.uninitialized)
            .finish()
    }
}

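/// Bookkeeping for a single reader: the index it last read and the writer
/// generation it observed at that point. `last_index == !0` marks the slot
/// as inactive.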
#[derive(Copy, Clone, Debug)]
struct Reader {
    generation: usize,
    last_index: usize,
}

impl Reader {
    fn set_inactive(&mut self) {
        self.last_index = !0;
    }

    fn active(&self) -> bool {
        self.last_index != !0
    }

    /// Number of slots the writer may fill before reaching this reader's
    /// unread data. A fully caught-up reader (same index, same generation)
    /// leaves the whole buffer free.
    fn distance_from(&self, last: CircularIndex, current_gen: usize) -> usize {
        let this = CircularIndex {
            index: self.last_index,
            size: last.size,
        };

        match this - last.index {
            0 if self.generation == current_gen => last.size,
            x => x,
        }
    }

    /// Whether this reader's index lies in the region that `Data::grow`
    /// moves to the end of the buffer, so it must be shifted along.
    fn needs_shift(&self, last_index: usize, current_gen: usize) -> bool {
        self.last_index > last_index
            || (self.last_index == last_index && self.generation != current_gen)
    }
}

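/// The reader id is used by readers to tell the ring buffer which events
/// they have already read. Dropping a `ReaderId` notifies the buffer so the
/// slot can be reused.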
pub struct ReaderId<T: 'static> {
    id: usize,
    marker: PhantomData<&'static [T]>,
    reference: Reference,
    drop_notifier: NoSharedAccess<Sender<usize>>,
}

impl<T: 'static> fmt::Debug for ReaderId<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ReaderId")
            .field("id", &self.id)
            .field("marker", &self.marker)
            .field("reference", &self.reference)
            .field("drop_notifier", &self.drop_notifier)
            .finish()
    }
}

impl<T: 'static> Drop for ReaderId<T> {
    fn drop(&mut self) {
        // Hand the id back to the buffer; an error only means the buffer
        // itself was already dropped.
        let _ = self.drop_notifier.get_mut().send(self.id);
    }
}

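/// Tracks all reader slots plus a free list of slots whose `ReaderId` was
/// dropped. The slots live in `UnsafeCell`s so that `RingBuffer::read` can
/// update a reader through a shared borrow of the buffer.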
#[derive(Default)]
struct ReaderMeta {
    free: Vec<usize>,
    readers: Vec<UnsafeCell<Reader>>,
}

impl ReaderMeta {
    fn new() -> Self {
        Default::default()
    }

    fn reader<T>(&self, id: &mut ReaderId<T>) -> Option<&mut Reader> {
        self.readers.get(id.id).map(|r| unsafe { &mut *r.get() })
    }

    fn reader_exclusive(&mut self, id: usize) -> &mut Reader {
        unsafe { &mut *self.readers[id].get() }
    }

    fn has_reader(&mut self) -> bool {
        self.readers
            .iter()
            .map(|r| unsafe { &mut *r.get() })
            .any(|r| r.active())
    }

    /// Reuses a slot from the free list if possible, otherwise appends a new
    /// reader slot.
    fn alloc(&mut self, last_index: usize, generation: usize) -> usize {
        match self.free.pop() {
            Some(id) => {
                self.reader_exclusive(id).last_index = last_index;
                self.reader_exclusive(id).generation = generation;

                id
            }
            None => {
                let id = self.readers.len();
                self.readers.push(UnsafeCell::new(Reader {
                    generation,
                    last_index,
                }));

                id
            }
        }
    }

    fn remove(&mut self, id: usize) {
        self.reader_exclusive(id).set_inactive();
        self.free.push(id);
    }

    /// Returns the active reader that constrains the writer the most, i.e.
    /// the one with the smallest distance from the last written index.
    fn nearest_index(&mut self, last: CircularIndex, current_gen: usize) -> Option<&Reader> {
        self.readers
            .iter()
            .map(|reader| unsafe { &*reader.get() })
            .filter(|reader| reader.active())
            .min_by_key(|reader| reader.distance_from(last, current_gen))
    }

    /// After the buffer grew by `grow_by` elements, shifts every active
    /// reader whose index lies in the moved region.
    fn shift(&mut self, last_index: usize, current_gen: usize, grow_by: usize) {
        for reader in &mut self.readers {
            let reader = unsafe { &mut *reader.get() } as &mut Reader;
            if !reader.active() {
                continue;
            }

            if reader.needs_shift(last_index, current_gen) {
                reader.last_index += grow_by;
            }
        }
    }
}

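// SAFETY: each `UnsafeCell<Reader>` is only accessed either through
// `&mut self` or through the unique `&mut ReaderId` owning that slot, so no
// two threads can alias the same `Reader`.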
unsafe impl Send for ReaderMeta {}
unsafe impl Sync for ReaderMeta {}

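/// Ring buffer, holding data of type `T`. Writers wrap around and overwrite
/// the oldest elements, growing the buffer instead whenever an active reader
/// has not read them yet.
///
/// A usage sketch (mirroring the `test_circular` unit test below; marked
/// `ignore` so it is not run as a doctest):
///
/// ```ignore
/// let mut buffer = RingBuffer::<i32>::new(4);
/// let mut reader = buffer.new_reader_id();
///
/// buffer.iter_write(0..4);
/// assert_eq!(
///     buffer.read(&mut reader).cloned().collect::<Vec<_>>(),
///     vec![0, 1, 2, 3]
/// );
/// ```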
pub struct RingBuffer<T> {
    available: usize,
    last_index: CircularIndex,
    data: Data<T>,
    free_rx: NoSharedAccess<Receiver<usize>>,
    free_tx: NoSharedAccess<Sender<usize>>,
    generation: Wrapping<usize>,
    instance_id: InstanceId,
    meta: ReaderMeta,
}

impl<T: 'static> RingBuffer<T> {
    /// Creates a new ring buffer with the given initial size.
    pub fn new(size: usize) -> Self {
        assert!(size > 1);

        let (free_tx, free_rx) = mpsc::channel();
        let free_tx = NoSharedAccess::new(free_tx);
        let free_rx = NoSharedAccess::new(free_rx);

        RingBuffer {
            available: size,
            last_index: CircularIndex::at_end(size),
            data: Data::new(size),
            free_rx,
            free_tx,
            generation: Wrapping(0),
            instance_id: InstanceId::new("`ReaderId` was not allocated by this `EventChannel`"),
            meta: ReaderMeta::new(),
        }
    }

    /// Writes all elements produced by `iter` to the buffer, growing it
    /// first if an active reader would otherwise be overtaken.
    pub fn iter_write<I>(&mut self, iter: I)
    where
        I: IntoIterator<Item = T>,
        I::IntoIter: ExactSizeIterator,
    {
        let iter = iter.into_iter();
        let len = iter.len();
        if len > 0 {
            self.ensure_additional(len);
            for element in iter {
                unsafe {
                    self.data.put(self.last_index + 1, element);
                }
                self.last_index += 1;
            }
            self.available -= len;
            self.generation += Wrapping(1);
        }
    }

    /// Drains a `Vec` and writes its elements to the buffer.
    pub fn drain_vec_write(&mut self, data: &mut Vec<T>) {
        self.iter_write(data.drain(..));
    }

    /// Returns `true` if at least one active reader would observe a write.
    pub fn would_write(&mut self) -> bool {
        self.maintain();

        self.meta.has_reader()
    }

    /// Ensures that `num` additional elements can be written without
    /// overtaking any reader. The fast path only checks the cached
    /// `available` count.
    #[inline(always)]
    pub fn ensure_additional(&mut self, num: usize) {
        if self.available >= num {
            return;
        }

        self.ensure_additional_slow(num);
    }

    #[inline(never)]
    fn ensure_additional_slow(&mut self, num: usize) {
        self.maintain();
        let left: usize = match self.meta.nearest_index(self.last_index, self.generation.0) {
            None => {
                // No active readers: any amount may be written safely. Using
                // `max(num)` keeps `available -= len` in `iter_write` from
                // underflowing when a single write exceeds the buffer size.
                self.available = self.last_index.size.max(num);

                return;
            }
            Some(reader) => {
                let left = reader.distance_from(self.last_index, self.generation.0);

                self.available = left;

                if left >= num {
                    return;
                } else {
                    left
                }
            }
        };
        // Not enough room: grow by doubling until the pending write fits.
        let grow_by = num - left;
        let min_target_size = self.last_index.size + grow_by;

        let mut size = 2 * self.last_index.size;
        while size < min_target_size {
            size *= 2;
        }

        let grow_by = size - self.last_index.size;

        unsafe {
            self.data.grow(self.last_index + 1, grow_by);
        }
        self.last_index.size = size;

        self.meta
            .shift(self.last_index.index, self.generation.0, grow_by);
        self.available = grow_by + left;
    }

    /// Frees the slots of all `ReaderId`s dropped since the last call.
    fn maintain(&mut self) {
        while let Ok(id) = self.free_rx.get_mut().try_recv() {
            self.meta.remove(id);
        }
    }

    /// Writes a single element to the buffer.
    pub fn single_write(&mut self, element: T) {
        use std::iter::once;

        self.iter_write(once(element));
    }

    /// Creates a new reader id for this buffer, starting at the current
    /// write position.
    pub fn new_reader_id(&mut self) -> ReaderId<T> {
        self.maintain();
        let last_index = self.last_index.index;
        let generation = self.generation.0;
        let id = self.meta.alloc(last_index, generation);

        ReaderId {
            id,
            marker: PhantomData,
            reference: self.instance_id.reference(),
            drop_notifier: NoSharedAccess::new(self.free_tx.get_mut().clone()),
        }
    }

    /// Reads everything written since the last read with this `ReaderId` and
    /// marks it as read.
    pub fn read(&self, reader_id: &mut ReaderId<T>) -> StorageIterator<T> {
        self.instance_id.assert_eq(&reader_id.reference);

        let (last_read_index, gen) = {
            let reader = self.meta.reader(reader_id).unwrap_or_else(|| {
                panic!(
                    "ReaderId not registered: {}\n\
                     This usually means that this ReaderId \
                     was created by a different storage",
                    reader_id.id
                )
            });
            let old = reader.last_index;
            reader.last_index = self.last_index.index;
            let old_gen = reader.generation;
            reader.generation = self.generation.0;

            (old, old_gen)
        };
        let mut index = CircularIndex::new(last_read_index, self.last_index.size);
        index += 1;
        if gen == self.generation.0 {
            // Nothing was written since the last read; start at the sentinel
            // so the iterator is empty.
            index = CircularIndex::magic(index.size);
        }

        StorageIterator {
            data: &self.data,
            end: self.last_index.index,
            index,
        }
    }
}

impl<T: Debug> Debug for RingBuffer<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("RingBuffer")
            .field("available", &self.available)
            .field("instance_id", &self.instance_id)
            .field("data", &self.data)
            .field("last_index", &self.last_index)
            .finish()
    }
}

impl<T> Drop for RingBuffer<T> {
    fn drop(&mut self) {
        unsafe {
            // Drop all initialized elements, starting one past the last
            // written index (the oldest element).
            self.data.clean(self.last_index + 1);
        }
    }
}

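/// Iterator over the events written since a reader's last read. `end` is the
/// last written index, yielded inclusively.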
#[derive(Debug)]
pub struct StorageIterator<'a, T: 'a> {
    data: &'a Data<T>,
    end: usize,
    index: CircularIndex,
}

impl<'a, T> Iterator for StorageIterator<'a, T> {
    type Item = &'a T;

    fn next(&mut self) -> Option<&'a T> {
        self.index
            .step(self.end)
            .map(|i| unsafe { self.data.get(i) })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let len = self.len();

        (len, Some(len))
    }
}

impl<'a, T> DoubleEndedIterator for StorageIterator<'a, T> {
    fn next_back(&mut self) -> Option<Self::Item> {
        self.index
            .step_back(&mut self.end)
            .map(|i| unsafe { self.data.get(i) })
    }
}

impl<'a, T> ExactSizeIterator for StorageIterator<'a, T> {
    fn len(&self) -> usize {
        match self.index.is_magic() {
            true => 0,
            false => (CircularIndex::new(self.end, self.index.size) - self.index.index) + 1,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[derive(Debug, Clone, PartialEq)]
    struct Test {
        pub id: u32,
    }

    #[derive(Debug, Clone, PartialEq)]
    struct Test2 {
        pub id: u32,
    }

    #[test]
    fn test_size() {
        let mut buffer = RingBuffer::<i32>::new(4);

        buffer.single_write(55);

        let mut reader = buffer.new_reader_id();

        buffer.iter_write(0..16);
        assert_eq!(buffer.read(&mut reader).len(), 16);

        buffer.iter_write(0..6);
        assert_eq!(buffer.read(&mut reader).len(), 6);
    }

    #[test]
    fn test_circular() {
        let mut buffer = RingBuffer::<i32>::new(4);

        buffer.single_write(55);

        let mut reader = buffer.new_reader_id();

        buffer.iter_write(0..4);
        assert_eq!(
            buffer.read(&mut reader).cloned().collect::<Vec<_>>(),
            vec![0, 1, 2, 3]
        );
    }

    #[test]
    fn test_empty_write() {
        let mut buffer = RingBuffer::<Test>::new(10);
        buffer.drain_vec_write(&mut vec![]);
        assert_eq!(buffer.data.num_initialized(), 0);
    }

    #[test]
    fn test_too_large_write() {
        let mut buffer = RingBuffer::<Test>::new(10);
        let _reader = buffer.new_reader_id();
        buffer.drain_vec_write(&mut events(15));
        assert_eq!(buffer.data.num_initialized(), 15);
    }

    #[test]
    fn test_empty_read() {
        let mut buffer = RingBuffer::<Test>::new(10);
        let mut reader_id = buffer.new_reader_id();
        let data = buffer.read(&mut reader_id);
        assert_eq!(Vec::<Test>::default(), data.cloned().collect::<Vec<_>>())
    }

    #[test]
    fn test_empty_read_rev() {
        let mut buffer = RingBuffer::<Test>::new(10);
        let mut reader_id = buffer.new_reader_id();
        let data = buffer.read(&mut reader_id);
        assert_eq!(
            Vec::<Test>::default(),
            data.rev().cloned().collect::<Vec<_>>()
        )
    }

    #[test]
    fn test_empty_read_write_before_id() {
        let mut buffer = RingBuffer::<Test>::new(10);
        buffer.drain_vec_write(&mut events(2));
        let mut reader_id = buffer.new_reader_id();
        let data = buffer.read(&mut reader_id);
        assert_eq!(Vec::<Test>::default(), data.cloned().collect::<Vec<_>>())
    }

    #[test]
    fn test_empty_read_write_before_id_rev() {
        let mut buffer = RingBuffer::<Test>::new(10);
        buffer.drain_vec_write(&mut events(2));
        let mut reader_id = buffer.new_reader_id();
        let data = buffer.read(&mut reader_id);
        assert_eq!(
            Vec::<Test>::default(),
            data.rev().cloned().collect::<Vec<_>>()
        )
    }

    #[test]
    fn test_read() {
        let mut buffer = RingBuffer::<Test>::new(10);
        let mut reader_id = buffer.new_reader_id();
        buffer.drain_vec_write(&mut events(2));
        assert_eq!(
            vec![Test { id: 0 }, Test { id: 1 }],
            buffer.read(&mut reader_id).cloned().collect::<Vec<_>>()
        );

        assert_eq!(
            Vec::<Test>::new(),
            buffer.read(&mut reader_id).cloned().collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_read_rev() {
        let mut buffer = RingBuffer::<Test>::new(10);
        let mut reader_id = buffer.new_reader_id();
        buffer.drain_vec_write(&mut events(2));
        assert_eq!(
            vec![Test { id: 1 }, Test { id: 0 }],
            buffer
                .read(&mut reader_id)
                .rev()
                .cloned()
                .collect::<Vec<_>>()
        );

        assert_eq!(
            Vec::<Test>::new(),
            buffer
                .read(&mut reader_id)
                .rev()
                .cloned()
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_read_wrap_around_rev() {
        let mut buffer = RingBuffer::<Test>::new(4);
        let mut reader_id = buffer.new_reader_id();
        buffer.drain_vec_write(&mut events(2));
        assert_eq!(
            vec![Test { id: 1 }, Test { id: 0 }],
            buffer
                .read(&mut reader_id)
                .rev()
                .cloned()
                .collect::<Vec<_>>()
        );
        buffer.drain_vec_write(&mut events(3));
        assert_eq!(
            vec![Test { id: 2 }, Test { id: 1 }, Test { id: 0 }],
            buffer
                .read(&mut reader_id)
                .rev()
                .cloned()
                .collect::<Vec<_>>()
        );

        assert_eq!(
            Vec::<Test>::new(),
            buffer
                .read(&mut reader_id)
                .rev()
                .cloned()
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_read_wrap_around_overflow_rev() {
        let mut buffer = RingBuffer::<Test>::new(5);
        let mut reader_id = buffer.new_reader_id();
        buffer.drain_vec_write(&mut events(2));
        assert_eq!(
            vec![Test { id: 1 }, Test { id: 0 }],
            buffer
                .read(&mut reader_id)
                .rev()
                .cloned()
                .collect::<Vec<_>>()
        );
        buffer.drain_vec_write(&mut events(6));
        assert_eq!(
            events(6).into_iter().rev().collect::<Vec<_>>(),
            buffer
                .read(&mut reader_id)
                .rev()
                .cloned()
                .collect::<Vec<_>>()
        );

        assert_eq!(
            Vec::<Test>::new(),
            buffer
                .read(&mut reader_id)
                .rev()
                .cloned()
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_write_overflow() {
        let mut buffer = RingBuffer::<Test>::new(3);
        let mut reader_id = buffer.new_reader_id();
        buffer.drain_vec_write(&mut events(4));
        let data = buffer.read(&mut reader_id);
        assert_eq!(
            vec![
                Test { id: 0 },
                Test { id: 1 },
                Test { id: 2 },
                Test { id: 3 },
            ],
            data.cloned().collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_send_sync() {
        trait SendSync: Send + Sync {
            fn is_send_sync() -> bool;
        }

        impl<T> SendSync for T
        where
            T: Send + Sync,
        {
            fn is_send_sync() -> bool {
                true
            }
        }

        assert!(RingBuffer::<Test>::is_send_sync());
        assert!(ReaderId::<Test>::is_send_sync());
    }

    #[test]
    fn test_reader_reuse() {
        let mut buffer = RingBuffer::<Test>::new(3);
        {
            let _reader_id = buffer.new_reader_id();
        }
        let _reader_id = buffer.new_reader_id();
        assert_eq!(_reader_id.id, 0);
        assert_eq!(buffer.meta.readers.len(), 1);
    }

    #[test]
    fn test_prevent_excess_growth() {
        let mut buffer = RingBuffer::<Test>::new(3);
        let mut reader_id = buffer.new_reader_id();
        println!("Initial buffer state: {:#?}", buffer);
        println!("--- first write ---");
        buffer.drain_vec_write(&mut events(2));
        println!("--- second write ---");
        buffer.drain_vec_write(&mut events(2));
        println!("--- writes complete ---");
        assert_eq!(
            vec![
                Test { id: 0 },
                Test { id: 1 },
                Test { id: 0 },
                Test { id: 1 },
            ],
            buffer.read(&mut reader_id).cloned().collect::<Vec<_>>()
        );

        buffer.drain_vec_write(&mut events(4));
        assert_eq!(buffer.data.num_initialized(), 6);
        assert_eq!(
            vec![
                Test { id: 0 },
                Test { id: 1 },
                Test { id: 2 },
                Test { id: 3 },
            ],
            buffer.read(&mut reader_id).cloned().collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_write_slice() {
        let mut buffer = RingBuffer::<Test>::new(10);
        let mut reader_id = buffer.new_reader_id();
        buffer.iter_write(events(2));
        let data = buffer.read(&mut reader_id);
        assert_eq!(
            vec![Test { id: 0 }, Test { id: 1 }],
            data.cloned().collect::<Vec<_>>()
        );
    }

    #[test]
    fn iter_write_empty() {
        let mut buffer = RingBuffer::<Test>::new(10);
        let mut reader_id = buffer.new_reader_id();
        buffer.iter_write(Vec::new());
        let mut data = buffer.read(&mut reader_id);
        assert_eq!(None, data.next());
    }

    fn events(n: u32) -> Vec<Test> {
        (0..n).map(|i| Test { id: i }).collect::<Vec<_>>()
    }
}