// shrev/lib.rs
//! Event channel, pull based, that uses a ring buffer for internal
//! storage, to make it possible to do immutable reads.
//!
//! See examples directory for examples.

#![warn(missing_docs)]

pub use crate::storage::{ReaderId, StorageIterator as EventIterator};

use crate::storage::RingBuffer;

mod storage;
mod util;
/// Marker trait for data to use with the EventChannel.
///
/// Has an implementation for all types where its bounds are satisfied.
pub trait Event: Send + Sync + 'static {}

// Blanket impl: any `Send + Sync + 'static` type is automatically an `Event`.
impl<T> Event for T where T: Send + Sync + 'static {}

// Initial ring-buffer capacity used by `EventChannel::new` / `Default`.
const DEFAULT_CAPACITY: usize = 64;

/// The `EventChannel`, which is the central component of `shrev`.
///
/// ## How it works
///
/// This channel has a ring buffer, which it allocates with an initial capacity.
/// Once allocated, it writes new events into the buffer, wrapping around when
/// it reaches the "end" of the buffer.
///
/// However, before an event gets written into the buffer, the channel checks if
/// all readers have read the event which is about to be overwritten. In case
/// the answer is "No", it will grow the buffer so no events get overwritten.
///
/// Readers are stored in the `EventChannel` itself, because we need to access
/// their position in a write, so we can check what's described above. Thus, you
/// only get a `ReaderId` as a handle.
///
/// ## What do I use it for?
///
/// The `EventChannel` is basically a single producer, multiple consumer
/// ("SPMC") channel. That is, a `write` to the channel requires mutable access,
/// while reading can be done with just an immutable reference. All readers
/// (consumers) will always get all the events since their last read (or when
/// they were created, if there was no read yet).
///
/// ## Examples
///
/// ```
/// use std::mem::drop;
///
/// use shrev::{EventChannel, ReaderId};
///
/// // The buffer will initially be 16 events big
/// let mut channel = EventChannel::with_capacity(16);
///
/// // This basically has no effect; no reader can possibly observe it
/// channel.single_write(42i32);
///
/// let mut first_reader = channel.register_reader();
///
/// // What's interesting here is that we don't check the readers' positions _yet_
/// // That is because the size of 16 allows us to write 16 events before we need to perform
/// // such a check.
/// channel.iter_write(0..4);
///
/// // Now, we read 4 events (0, 1, 2, 3)
/// // Notice how we borrow the ID mutably; this is because logically we modify the reader,
/// // and we shall not read with the same ID concurrently
/// let _events = channel.read(&mut first_reader);
///
/// // Let's create a second reader; this one will not receive any of the previous events
/// let mut second_reader = channel.register_reader();
///
/// // No event returned
/// let _events = channel.read(&mut second_reader);
///
/// channel.iter_write(4..6);
///
/// // Both now get the same two events
/// let _events = channel.read(&mut first_reader);
/// let _events = channel.read(&mut second_reader);
///
/// // We no longer need our second reader, so we drop it
/// // This is important, since otherwise the buffer would keep growing if our reader doesn't read
/// // any events
/// drop(second_reader);
/// ```
#[derive(Debug)]
pub struct EventChannel<E> {
    // Backing storage; also tracks reader positions (see `ReaderId`).
    storage: RingBuffer<E>,
}
94
95impl<E> Default for EventChannel<E>
96where
97 E: Event,
98{
99 fn default() -> Self {
100 EventChannel::with_capacity(DEFAULT_CAPACITY)
101 }
102}
103
104impl<E> EventChannel<E>
105where
106 E: Event,
107{
108 /// Create a new `EventChannel` with a default size of 64.
109 pub fn new() -> Self {
110 Default::default()
111 }
112
113 /// Create a new `EventChannel` with the given starting capacity.
114 pub fn with_capacity(size: usize) -> Self {
115 Self {
116 storage: RingBuffer::new(size),
117 }
118 }
119
120 /// Returns `true` if any reader would observe an additional event.
121 ///
122 /// This can be used to skip calls to `iter_write` in case the event
123 /// construction is expensive.
124 pub fn would_write(&mut self) -> bool {
125 self.storage.would_write()
126 }
127
128 /// Register a new reader.
129 ///
130 /// To be able to read events, a reader id is required. This is because
131 /// otherwise the channel wouldn't know where in the ring buffer the
132 /// reader has read to earlier. This information is stored in the channel,
133 /// associated with the returned `ReaderId`.
134 ///
135 /// A newly created `ReaderId` will only receive the events written after
136 /// its creation.
137 ///
138 /// Once you no longer perform `read`s with your `ReaderId`, you should
139 /// drop it so the channel can safely overwrite events not read by it.
140 pub fn register_reader(&mut self) -> ReaderId<E> {
141 self.storage.new_reader_id()
142 }
143
144 /// Write a slice of events into storage
145 #[deprecated(note = "please use `iter_write` instead")]
146 pub fn slice_write(&mut self, events: &[E])
147 where
148 E: Clone,
149 {
150 self.storage.iter_write(events.into_iter().cloned());
151 }
152
153 /// Write an iterator of events into storage
154 pub fn iter_write<I>(&mut self, iter: I)
155 where
156 I: IntoIterator<Item = E>,
157 I::IntoIter: ExactSizeIterator,
158 {
159 self.storage.iter_write(iter);
160 }
161
162 /// Drain a vector of events into storage.
163 pub fn drain_vec_write(&mut self, events: &mut Vec<E>) {
164 self.storage.drain_vec_write(events);
165 }
166
167 /// Write a single event into storage.
168 pub fn single_write(&mut self, event: E) {
169 self.storage.single_write(event);
170 }
171
172 /// Read any events that have been written to storage since the last read
173 /// with `reader_id` (or the creation of the `ReaderId`, if it hasn't read
174 /// yet).
175 ///
176 /// Note that this will advance the position of the reader regardless of
177 /// what you do with the iterator. In other words, calling `read`
178 /// without iterating the result won't preserve the events returned. You
179 /// need to iterate all the events as soon as you got them from this
180 /// method. This behavior is equivalent to e.g. `Vec::drain`.
181 pub fn read(&self, reader_id: &mut ReaderId<E>) -> EventIterator<E> {
182 self.storage.read(reader_id)
183 }
184}

#[cfg(test)]
mod tests {
    use super::*;

    #[derive(Debug, Clone, PartialEq)]
    struct Test {
        pub id: u32,
    }

    // The buffer must grow (twice, then repeatedly) rather than overwrite
    // events that the lagging reader has not seen yet.
    #[test]
    fn test_grow() {
        let mut channel = EventChannel::with_capacity(10);

        let mut active = channel.register_reader();
        let mut lagging = channel.register_reader();

        // 8 events fit into the initial capacity of 10.
        channel.iter_write(1..9);
        let seen: Vec<i32> = channel.read(&mut active).copied().collect();
        assert_eq!(seen, (1..9).collect::<Vec<_>>());

        // 14 more events force a grow.
        channel.iter_write(9..23);
        let seen: Vec<i32> = channel.read(&mut active).copied().collect();
        assert_eq!(seen, (9..23).collect::<Vec<_>>());

        for value in 23..10_000 {
            channel.single_write(value);
        }

        // The lagging reader never read, so it must still see everything.
        let everything: Vec<i32> = channel.read(&mut lagging).copied().collect();
        assert_eq!(everything, (1..10_000).collect::<Vec<_>>());
    }

    // Each reader independently receives exactly the events written since
    // its own last read.
    #[test]
    fn test_read_write() {
        let mut channel = EventChannel::with_capacity(14);

        let mut primary = channel.register_reader();
        let mut secondary = channel.register_reader();

        channel.single_write(Test { id: 1 });
        let got: Vec<Test> = channel.read(&mut primary).cloned().collect();
        assert_eq!(got, vec![Test { id: 1 }]);

        channel.single_write(Test { id: 2 });
        let got: Vec<Test> = channel.read(&mut primary).cloned().collect();
        assert_eq!(got, vec![Test { id: 2 }]);

        // The secondary reader hasn't read yet, so it gets both at once.
        let got: Vec<Test> = channel.read(&mut secondary).cloned().collect();
        assert_eq!(got, vec![Test { id: 1 }, Test { id: 2 }]);

        channel.single_write(Test { id: 3 });
        let got: Vec<Test> = channel.read(&mut primary).cloned().collect();
        assert_eq!(got, vec![Test { id: 3 }]);
        let got: Vec<Test> = channel.read(&mut secondary).cloned().collect();
        assert_eq!(got, vec![Test { id: 3 }]);
    }

    // There was previously a case where the tests worked but the example
    // didn't, so the example was added as a test case.
    #[test]
    fn test_example() {
        let mut channel = EventChannel::new();

        channel.drain_vec_write(&mut vec![TestEvent { data: 1 }, TestEvent { data: 2 }]);

        let mut reader = channel.register_reader();

        // The reader was registered after the write, so it observes nothing.
        let observed: Vec<TestEvent> = channel.read(&mut reader).cloned().collect();
        assert_eq!(observed, Vec::<TestEvent>::default());

        // A write after registration is visible.
        channel.single_write(TestEvent { data: 5 });
        let observed: Vec<TestEvent> = channel.read(&mut reader).cloned().collect();
        assert_eq!(observed, vec![TestEvent { data: 5 }]);

        // We can also just send in an iterator.
        channel.iter_write(
            [TestEvent { data: 8 }, TestEvent { data: 9 }]
                .iter()
                .cloned(),
        );
        let observed: Vec<TestEvent> = channel.read(&mut reader).cloned().collect();
        assert_eq!(observed, vec![TestEvent { data: 8 }, TestEvent { data: 9 }]);
    }

    #[derive(Clone, Debug, PartialEq, Eq)]
    pub struct TestEvent {
        data: u32,
    }
}
304}