flume_overwrite/lib.rs
1//! # Flume Overwrite
2//!
3//! A library that provides bounded channels with overwrite capability, built on top of the `flume` crate.
4//! When the channel reaches capacity, new messages will overwrite the oldest unread messages.
5//!
6//! ## Features
7//!
8//! - **Bounded channels with overwrite**: Messages sent to a full channel will replace the oldest messages
9//! - **Async support**: Both blocking and async send operations
10//! - **Drain tracking**: Returns information about which messages were overwritten
11//!
12//! ## Examples
13//!
14//! ```rust
15//! use flume_overwrite::bounded;
16//!
17//! // Create a channel with capacity 3
18//! let (sender, receiver) = bounded(3);
19//!
20//! // Send messages normally when under capacity
21//! sender.send_overwrite(1).unwrap();
22//! sender.send_overwrite(2).unwrap();
23//! sender.send_overwrite(3).unwrap();
24//!
25//! // This will overwrite the first message (1)
26//! let overwritten = sender.send_overwrite(4).unwrap();
27//! assert_eq!(overwritten, Some(vec![1]));
28//!
29//! // Receive the remaining messages
30//! assert_eq!(receiver.recv().unwrap(), 2);
31//! assert_eq!(receiver.recv().unwrap(), 3);
32//! assert_eq!(receiver.recv().unwrap(), 4);
33//! ```
34
35use flume::{Receiver, SendError, Sender};
36use std::ops::Deref;
37
38/// Creates a bounded channel with overwrite capability.
39///
40/// Returns a tuple of `(OverwriteSender<T>, Receiver<T>)` where the sender can overwrite
41/// old messages when the channel reaches capacity, and the receiver is a standard flume receiver.
42///
43/// # Arguments
44///
45/// * `cap` - The maximum number of messages the channel can hold
46///
47/// # Returns
48///
49/// A tuple containing:
50/// - `OverwriteSender<T>` - A sender that can overwrite old messages when at capacity
51/// - `Receiver<T>` - A standard flume receiver for reading messages
52///
53/// # Examples
54///
55/// ```rust
56/// use flume_overwrite::bounded;
57///
58/// let (sender, receiver) = bounded(2);
59/// sender.send_overwrite("hello").unwrap();
60/// sender.send_overwrite("world").unwrap();
61///
62/// assert_eq!(receiver.recv().unwrap(), "hello");
63/// assert_eq!(receiver.recv().unwrap(), "world");
64/// ```
65pub fn bounded<T>(cap: usize) -> (OverwriteSender<T>, Receiver<T>) {
66 let (tx, rx) = flume::bounded(cap);
67 let overwrite_sender = OverwriteSender {
68 sender: tx,
69 receiver: rx.clone(),
70 };
71 (overwrite_sender, rx)
72}
73
74/// A sender that can overwrite old messages when the channel reaches capacity.
75///
76/// `OverwriteSender<T>` wraps a flume `Sender<T>` and provides additional functionality
77/// to automatically remove old messages when sending would block due to a full channel.
78///
79/// This struct implements `Deref` to `Sender<T>`, so all standard sender methods are available.
80/// Additionally, it provides `send_overwrite` and `send_overwrite_async` methods that will
81/// never block due to a full channel.
82///
83/// # Examples
84///
85/// ```rust
86/// use flume_overwrite::bounded;
87///
88/// let (sender, receiver) = bounded(1);
89///
90/// // First message goes through normally
91/// sender.send_overwrite("first").unwrap();
92///
93/// // Second message overwrites the first
94/// let overwritten = sender.send_overwrite("second").unwrap();
95/// assert_eq!(overwritten, Some(vec!["first"]));
96/// ```
97#[derive(Clone)]
98pub struct OverwriteSender<T> {
99 sender: Sender<T>,
100 receiver: Receiver<T>,
101}
102
103impl<T> Deref for OverwriteSender<T> {
104 type Target = Sender<T>;
105
106 fn deref(&self) -> &Self::Target {
107 &self.sender
108 }
109}
110
111impl<T> OverwriteSender<T> {
112 /// Sends a value, overwriting old messages if the channel is at capacity.
113 ///
114 /// This method will never block. If the channel is at capacity, it will remove
115 /// old messages from the front of the queue until there's space for the new message.
116 ///
117 /// # Arguments
118 ///
119 /// * `value` - The value to send through the channel
120 ///
121 /// # Returns
122 ///
123 /// - `Ok(None)` - The message was sent without overwriting any existing messages
124 /// - `Ok(Some(Vec<T>))` - The message was sent and the returned vector contains
125 /// the messages that were overwritten (removed from the channel)
126 /// - `Err(SendError<T>)` - The channel is disconnected
127 ///
128 /// # Examples
129 ///
130 /// ```rust
131 /// use flume_overwrite::bounded;
132 ///
133 /// let (sender, receiver) = bounded(2);
134 ///
135 /// // Send without overwriting
136 /// assert_eq!(sender.send_overwrite(1).unwrap(), None);
137 /// assert_eq!(sender.send_overwrite(2).unwrap(), None);
138 ///
139 /// // This will overwrite the first message
140 /// let overwritten = sender.send_overwrite(3).unwrap();
141 /// assert_eq!(overwritten, Some(vec![1]));
142 /// ```
143 pub fn send_overwrite(&self, value: T) -> Result<Option<Vec<T>>, SendError<T>> {
144 if let Some(capacity) = self.sender.capacity() {
145 let mut drained = Vec::new();
146 while self.sender.len() >= capacity {
147 match self.receiver.try_recv() {
148 Ok(old_value) => drained.push(old_value),
149 Err(flume::TryRecvError::Empty) => (),
150 Err(_) => {
151 return Err(SendError(value));
152 }
153 }
154 }
155 self.sender.send(value)?;
156 Ok(if drained.is_empty() {
157 None
158 } else {
159 Some(drained)
160 })
161 } else {
162 self.sender.send(value)?;
163 Ok(None)
164 }
165 }
166
167 /// Asynchronously sends a value, overwriting old messages if the channel is at capacity.
168 ///
169 /// This is the async version of `send_overwrite`. Like its synchronous counterpart,
170 /// this method will never block due to a full channel - it will instead remove old
171 /// messages to make space.
172 ///
173 /// # Arguments
174 ///
175 /// * `value` - The value to send through the channel
176 ///
177 /// # Returns
178 ///
179 /// A future that resolves to:
180 /// - `Ok(None)` - The message was sent without overwriting any existing messages
181 /// - `Ok(Some(Vec<T>))` - The message was sent and the returned vector contains
182 /// the messages that were overwritten (removed from the channel)
183 /// - `Err(SendError<T>)` - The channel is disconnected
184 ///
185 /// # Examples
186 ///
187 /// ```rust
188 /// use flume_overwrite::bounded;
189 /// use futures::executor::block_on;
190 ///
191 /// let (sender, receiver) = bounded(1);
192 ///
193 /// block_on(async {
194 /// // Send without overwriting
195 /// assert_eq!(sender.send_overwrite_async(1).await.unwrap(), None);
196 ///
197 /// // This will overwrite the first message
198 /// let overwritten = sender.send_overwrite_async(2).await.unwrap();
199 /// assert_eq!(overwritten, Some(vec![1]));
200 /// });
201 /// ```
202 pub async fn send_overwrite_async(&self, value: T) -> Result<Option<Vec<T>>, SendError<T>> {
203 if let Some(capacity) = self.sender.capacity() {
204 let mut drained = Vec::new();
205 while self.sender.len() >= capacity {
206 if let Ok(old_value) = self.receiver.recv_async().await {
207 drained.push(old_value);
208 }
209 }
210 self.sender.send_async(value).await?;
211 Ok(if drained.is_empty() {
212 None
213 } else {
214 Some(drained)
215 })
216 } else {
217 self.sender.send_async(value).await?;
218 Ok(None)
219 }
220 }
221}
222
223#[cfg(test)]
224mod test {
225 use super::*;
226
227 use std::sync::Arc;
228 use std::thread;
229 use std::time::Duration;
230
231 use futures::executor::block_on;
232
233 #[test]
234 fn test_send_overwrite_under_capacity() {
235 let (sender, receiver) = bounded(3);
236 assert_eq!(sender.send_overwrite(1).unwrap(), None);
237 assert_eq!(sender.send_overwrite(2).unwrap(), None);
238 assert_eq!(receiver.try_recv().unwrap(), 1);
239 assert_eq!(receiver.try_recv().unwrap(), 2);
240 }
241
242 #[test]
243 fn test_send_overwrite_at_capacity() {
244 let (sender, receiver) = bounded(2);
245 assert_eq!(sender.send_overwrite(1).unwrap(), None);
246 assert_eq!(sender.send_overwrite(2).unwrap(), None);
247
248 let drained = sender.send_overwrite(3).unwrap();
249 assert_eq!(drained, Some(vec![1]));
250 assert_eq!(receiver.try_recv().unwrap(), 2);
251 assert_eq!(receiver.try_recv().unwrap(), 3);
252 }
253
254 #[test]
255 fn test_send_overwrite_multiple_overwrites() {
256 let (sender, receiver) = bounded(2);
257 assert_eq!(sender.send_overwrite(1).unwrap(), None);
258 assert_eq!(sender.send_overwrite(2).unwrap(), None);
259 // Fill up, then send two more, should drain two
260 let drained = sender.send_overwrite(3).unwrap();
261 assert_eq!(drained, Some(vec![1]));
262 let drained2 = sender.send_overwrite(4).unwrap();
263 assert_eq!(drained2, Some(vec![2]));
264 assert_eq!(receiver.try_recv().unwrap(), 3);
265 assert_eq!(receiver.try_recv().unwrap(), 4);
266 }
267
268 #[test]
269 fn test_send_overwrite_unbounded() {
270 let (sender, receiver) = bounded(2);
271 assert_eq!(sender.send_overwrite(1).unwrap(), None);
272 assert_eq!(sender.send_overwrite(2).unwrap(), None);
273 assert_eq!(receiver.try_recv().unwrap(), 1);
274 assert_eq!(receiver.try_recv().unwrap(), 2);
275 }
276
277 #[test]
278 fn test_send_overwrite_async_under_capacity() {
279 let (sender, receiver) = bounded(3);
280 let fut = sender.send_overwrite_async(1);
281 assert_eq!(block_on(fut).unwrap(), None);
282 let fut = sender.send_overwrite_async(2);
283 assert_eq!(block_on(fut).unwrap(), None);
284 assert_eq!(block_on(receiver.recv_async()).unwrap(), 1);
285 assert_eq!(block_on(receiver.recv_async()).unwrap(), 2);
286 }
287
288 #[test]
289 fn test_send_overwrite_async_at_capacity() {
290 let (sender, receiver) = bounded(2);
291 block_on(sender.send_overwrite_async(1)).unwrap();
292 block_on(sender.send_overwrite_async(2)).unwrap();
293 let drained = block_on(sender.send_overwrite_async(3)).unwrap();
294 assert_eq!(drained, Some(vec![1]));
295 assert_eq!(block_on(receiver.recv_async()).unwrap(), 2);
296 assert_eq!(block_on(receiver.recv_async()).unwrap(), 3);
297 }
298
299 #[test]
300 fn test_send_overwrite_async_multiple_overwrites() {
301 let (sender, receiver) = bounded(2);
302 block_on(sender.send_overwrite_async(1)).unwrap();
303 block_on(sender.send_overwrite_async(2)).unwrap();
304 let drained = block_on(sender.send_overwrite_async(3)).unwrap();
305 assert_eq!(drained, Some(vec![1]));
306 let drained2 = block_on(sender.send_overwrite_async(4)).unwrap();
307 assert_eq!(drained2, Some(vec![2]));
308 assert_eq!(block_on(receiver.recv_async()).unwrap(), 3);
309 assert_eq!(block_on(receiver.recv_async()).unwrap(), 4);
310 }
311
312 #[test]
313 fn test_send_overwrite_async_unbounded() {
314 let (sender, receiver) = bounded(2);
315 assert_eq!(block_on(sender.send_overwrite_async(1)).unwrap(), None);
316 assert_eq!(block_on(sender.send_overwrite_async(2)).unwrap(), None);
317 assert_eq!(block_on(receiver.recv_async()).unwrap(), 1);
318 assert_eq!(block_on(receiver.recv_async()).unwrap(), 2);
319 }
320
321 #[test]
322 fn test_send_overwrite_concurrent() {
323 let (sender, receiver) = bounded(2);
324 let sender_clone = sender.clone();
325 let handle = thread::spawn(move || {
326 for i in 0..5 {
327 sender_clone.send_overwrite(i).unwrap();
328 thread::sleep(Duration::from_millis(10));
329 }
330 });
331 handle.join().unwrap();
332 let mut received = Vec::new();
333 while let Ok(val) = receiver.try_recv() {
334 received.push(val);
335 }
336 // Should have at most 2 items, the last two sent
337 assert!(received.len() <= 2);
338 if received.len() == 2 {
339 assert_eq!(received, vec![3, 4]);
340 }
341 }
342
343 #[test]
344 fn test_send_overwrite_async_concurrent() {
345 use std::sync::Mutex;
346 let (sender, receiver) = bounded(2);
347 let sender_clone = sender.clone();
348 let received = Arc::new(Mutex::new(Vec::new()));
349 let received2 = received.clone();
350 let handle = thread::spawn(move || {
351 block_on(async {
352 for i in 0..5 {
353 sender_clone.send_overwrite_async(i).await.unwrap();
354 // TODO: use a real delay
355 // simulate work
356 futures_timer::Delay::new(Duration::from_millis(10)).await;
357 }
358 });
359 });
360 handle.join().unwrap();
361 while let Ok(val) = receiver.try_recv() {
362 received2.lock().unwrap().push(val);
363 }
364 let got = received.lock().unwrap();
365 // Should have at most 2 items, the last two sent
366 assert!(got.len() <= 2);
367 if got.len() == 2 {
368 assert_eq!(*got, vec![3, 4]);
369 }
370 }
371}