shrev/
lib.rs

1//! Event channel, pull based, that use a ringbuffer for internal
2//! storage, to make it possible to do immutable reads.
3//!
4//! See examples directory for examples.
5
6#![warn(missing_docs)]
7
8pub use crate::storage::{ReaderId, StorageIterator as EventIterator};
9
10use crate::storage::RingBuffer;
11
12mod storage;
13mod util;
14
15/// Marker trait for data to use with the EventChannel.
16///
17/// Has an implementation for all types where its bounds are satisfied.
18pub trait Event: Send + Sync + 'static {}
19
20impl<T> Event for T where T: Send + Sync + 'static {}
21
22const DEFAULT_CAPACITY: usize = 64;
23
24/// The `EventChannel`, which is the central component of `shrev`.
25///
26/// ## How it works
27///
28/// This channel has a ring buffer, which it allocates with an initial capacity.
29/// Once allocated, it writes new events into the buffer, wrapping around when
30/// it reaches the "end" of the buffer.
31///
32/// However, before an event gets written into the buffer, the channel checks if
33/// all readers have read the event which is about to be overwritten. In case
34/// the answer is "No", it will grow the buffer so no events get overwritten.
35///
36/// Readers are stores in the `EventChannel` itself, because we need to access
37/// their position in a write, so we can check what's described above. Thus, you
38/// only get a `ReaderId` as a handle.
39///
40/// ## What do I use it for?
41///
42/// The `EventChannel` is basically a single producer, multiple consumer
43/// ("SPMC") channel. That is, a `write` to the channel requires mutable access,
44/// while reading can be done with just an immutable reference. All readers
45/// (consumers) will always get all the events since their last read (or when
46/// they were created, if there was no read yet).
47///
48/// ## Examples
49///
50/// ```
51/// use std::mem::drop;
52///
53/// use shrev::{EventChannel, ReaderId};
54///
55/// // The buffer will initially be 16 events big
56/// let mut channel = EventChannel::with_capacity(16);
57///
58/// // This is basically with no effect; no reader can possibly observe it
59/// channel.single_write(42i32);
60///
61/// let mut first_reader = channel.register_reader();
62///
63/// // What's interesting here is that we don't check the readers' positions _yet_
64/// // That is because the size of 16 allows us to write 16 events before we need to perform
65/// // such a check.
66/// channel.iter_write(0..4);
67///
68/// // Now, we read 4 events (0, 1, 2, 3)
69/// // Notice how we borrow the ID mutably; this is because logically we modify the reader,
70/// // and we shall not read with the same ID concurrently
71/// let _events = channel.read(&mut first_reader);
72///
73/// // Let's create a second reader; this one will not receive any of the previous events
74/// let mut second_reader = channel.register_reader();
75///
76/// // No event returned
77/// let _events = channel.read(&mut second_reader);
78///
79/// channel.iter_write(4..6);
80///
81/// // Both now get the same two events
82/// let _events = channel.read(&mut first_reader);
83/// let _events = channel.read(&mut second_reader);
84///
85/// // We no longer need our second reader, so we drop it
86/// // This is important, since otherwise the buffer would keep growing if our reader doesn't read
87/// // any events
88/// drop(second_reader);
89/// ```
90#[derive(Debug)]
91pub struct EventChannel<E> {
92    storage: RingBuffer<E>,
93}
94
95impl<E> Default for EventChannel<E>
96where
97    E: Event,
98{
99    fn default() -> Self {
100        EventChannel::with_capacity(DEFAULT_CAPACITY)
101    }
102}
103
104impl<E> EventChannel<E>
105where
106    E: Event,
107{
108    /// Create a new `EventChannel` with a default size of 64.
109    pub fn new() -> Self {
110        Default::default()
111    }
112
113    /// Create a new `EventChannel` with the given starting capacity.
114    pub fn with_capacity(size: usize) -> Self {
115        Self {
116            storage: RingBuffer::new(size),
117        }
118    }
119
120    /// Returns `true` if any reader would observe an additional event.
121    ///
122    /// This can be used to skip calls to `iter_write` in case the event
123    /// construction is expensive.
124    pub fn would_write(&mut self) -> bool {
125        self.storage.would_write()
126    }
127
128    /// Register a new reader.
129    ///
130    /// To be able to read events, a reader id is required. This is because
131    /// otherwise the channel wouldn't know where in the ring buffer the
132    /// reader has read to earlier. This information is stored in the channel,
133    /// associated with the returned `ReaderId`.
134    ///
135    /// A newly created `ReaderId` will only receive the events written after
136    /// its creation.
137    ///
138    /// Once you no longer perform `read`s with your `ReaderId`, you should
139    /// drop it so the channel can safely overwrite events not read by it.
140    pub fn register_reader(&mut self) -> ReaderId<E> {
141        self.storage.new_reader_id()
142    }
143
144    /// Write a slice of events into storage
145    #[deprecated(note = "please use `iter_write` instead")]
146    pub fn slice_write(&mut self, events: &[E])
147    where
148        E: Clone,
149    {
150        self.storage.iter_write(events.into_iter().cloned());
151    }
152
153    /// Write an iterator of events into storage
154    pub fn iter_write<I>(&mut self, iter: I)
155    where
156        I: IntoIterator<Item = E>,
157        I::IntoIter: ExactSizeIterator,
158    {
159        self.storage.iter_write(iter);
160    }
161
162    /// Drain a vector of events into storage.
163    pub fn drain_vec_write(&mut self, events: &mut Vec<E>) {
164        self.storage.drain_vec_write(events);
165    }
166
167    /// Write a single event into storage.
168    pub fn single_write(&mut self, event: E) {
169        self.storage.single_write(event);
170    }
171
172    /// Read any events that have been written to storage since the last read
173    /// with `reader_id` (or the creation of the `ReaderId`, if it hasn't read
174    /// yet).
175    ///
176    /// Note that this will advance the position of the reader regardless of
177    /// what you do with the iterator. In other words, calling `read`
178    /// without iterating the result won't preserve the events returned. You
179    /// need to iterate all the events as soon as you got them from this
180    /// method. This behavior is equivalent to e.g. `Vec::drain`.
181    pub fn read(&self, reader_id: &mut ReaderId<E>) -> EventIterator<E> {
182        self.storage.read(reader_id)
183    }
184}
185
186#[cfg(test)]
187mod tests {
188    use super::*;
189
190    #[derive(Debug, Clone, PartialEq)]
191    struct Test {
192        pub id: u32,
193    }
194
195    #[test]
196    fn test_grow() {
197        let mut channel = EventChannel::with_capacity(10);
198
199        let mut reader0 = channel.register_reader();
200        let mut reader1 = channel.register_reader();
201
202        channel.iter_write(vec![1, 2, 3, 4, 5, 6, 7, 8]);
203
204        let data = channel.read(&mut reader0).cloned().collect::<Vec<_>>();
205        assert_eq!(data, vec![1, 2, 3, 4, 5, 6, 7, 8]);
206
207        channel.iter_write(vec![9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]);
208
209        let data = channel.read(&mut reader0).cloned().collect::<Vec<_>>();
210        assert_eq!(
211            data,
212            vec![9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22]
213        );
214
215        for i in 23..10_000 {
216            channel.single_write(i);
217        }
218
219        let data = channel.read(&mut reader1).cloned().collect::<Vec<_>>();
220        assert_eq!(data, (1..10_000).collect::<Vec<_>>());
221    }
222
223    #[test]
224    fn test_read_write() {
225        let mut channel = EventChannel::with_capacity(14);
226
227        let mut reader_id = channel.register_reader();
228        let mut reader_id_extra = channel.register_reader();
229
230        channel.single_write(Test { id: 1 });
231        assert_eq!(
232            vec![Test { id: 1 }],
233            channel.read(&mut reader_id).cloned().collect::<Vec<_>>()
234        );
235        channel.single_write(Test { id: 2 });
236        assert_eq!(
237            vec![Test { id: 2 }],
238            channel.read(&mut reader_id).cloned().collect::<Vec<_>>()
239        );
240
241        assert_eq!(
242            vec![Test { id: 1 }, Test { id: 2 }],
243            channel
244                .read(&mut reader_id_extra)
245                .cloned()
246                .collect::<Vec<_>>()
247        );
248
249        channel.single_write(Test { id: 3 });
250        assert_eq!(
251            vec![Test { id: 3 }],
252            channel.read(&mut reader_id).cloned().collect::<Vec<_>>()
253        );
254        assert_eq!(
255            vec![Test { id: 3 }],
256            channel
257                .read(&mut reader_id_extra)
258                .cloned()
259                .collect::<Vec<_>>()
260        );
261    }
262
263    // There was previously a case where the tests worked but the example didn't, so
264    // the example was added as a test case.
265    #[test]
266    fn test_example() {
267        let mut channel = EventChannel::new();
268
269        channel.drain_vec_write(&mut vec![TestEvent { data: 1 }, TestEvent { data: 2 }]);
270
271        let mut reader_id = channel.register_reader();
272
273        // Should be empty, because reader was created after the write
274        assert_eq!(
275            Vec::<TestEvent>::default(),
276            channel.read(&mut reader_id).cloned().collect::<Vec<_>>()
277        );
278
279        // Should have data, as a second write was done
280        channel.single_write(TestEvent { data: 5 });
281
282        assert_eq!(
283            vec![TestEvent { data: 5 }],
284            channel.read(&mut reader_id).cloned().collect::<Vec<_>>()
285        );
286
287        // We can also just send in an iterator.
288        channel.iter_write(
289            [TestEvent { data: 8 }, TestEvent { data: 9 }]
290                .iter()
291                .cloned(),
292        );
293
294        assert_eq!(
295            vec![TestEvent { data: 8 }, TestEvent { data: 9 }],
296            channel.read(&mut reader_id).cloned().collect::<Vec<_>>()
297        );
298    }
299
300    #[derive(Clone, Debug, PartialEq, Eq)]
301    pub struct TestEvent {
302        data: u32,
303    }
304}