flume_overwrite/
lib.rs

1//! # Flume Overwrite
2//!
3//! A library that provides bounded channels with overwrite capability, built on top of the `flume` crate.
4//! When the channel reaches capacity, new messages will overwrite the oldest unread messages.
5//!
6//! ## Features
7//!
8//! - **Bounded channels with overwrite**: Messages sent to a full channel will replace the oldest messages
9//! - **Async support**: Both blocking and async send operations
10//! - **Drain tracking**: Returns information about which messages were overwritten
11//!
12//! ## Examples
13//!
14//! ```rust
15//! use flume_overwrite::bounded;
16//!
17//! // Create a channel with capacity 3
18//! let (sender, receiver) = bounded(3);
19//!
20//! // Send messages normally when under capacity
21//! sender.send_overwrite(1).unwrap();
22//! sender.send_overwrite(2).unwrap();
23//! sender.send_overwrite(3).unwrap();
24//!
25//! // This will overwrite the first message (1)
26//! let overwritten = sender.send_overwrite(4).unwrap();
27//! assert_eq!(overwritten, Some(vec![1]));
28//!
29//! // Receive the remaining messages
30//! assert_eq!(receiver.recv().unwrap(), 2);
31//! assert_eq!(receiver.recv().unwrap(), 3);
32//! assert_eq!(receiver.recv().unwrap(), 4);
33//! ```
34
35use flume::{Receiver, SendError, Sender};
36use std::ops::Deref;
37
38/// Creates a bounded channel with overwrite capability.
39///
40/// Returns a tuple of `(OverwriteSender<T>, Receiver<T>)` where the sender can overwrite
41/// old messages when the channel reaches capacity, and the receiver is a standard flume receiver.
42///
43/// # Arguments
44///
45/// * `cap` - The maximum number of messages the channel can hold
46///
47/// # Returns
48///
49/// A tuple containing:
50/// - `OverwriteSender<T>` - A sender that can overwrite old messages when at capacity
51/// - `Receiver<T>` - A standard flume receiver for reading messages
52///
53/// # Examples
54///
55/// ```rust
56/// use flume_overwrite::bounded;
57///
58/// let (sender, receiver) = bounded(2);
59/// sender.send_overwrite("hello").unwrap();
60/// sender.send_overwrite("world").unwrap();
61///
62/// assert_eq!(receiver.recv().unwrap(), "hello");
63/// assert_eq!(receiver.recv().unwrap(), "world");
64/// ```
65pub fn bounded<T>(cap: usize) -> (OverwriteSender<T>, Receiver<T>) {
66    let (tx, rx) = flume::bounded(cap);
67    let overwrite_sender = OverwriteSender {
68        sender: tx,
69        receiver: rx.clone(),
70    };
71    (overwrite_sender, rx)
72}
73
74/// A sender that can overwrite old messages when the channel reaches capacity.
75///
76/// `OverwriteSender<T>` wraps a flume `Sender<T>` and provides additional functionality
77/// to automatically remove old messages when sending would block due to a full channel.
78///
79/// This struct implements `Deref` to `Sender<T>`, so all standard sender methods are available.
80/// Additionally, it provides `send_overwrite` and `send_overwrite_async` methods that will
81/// never block due to a full channel.
82///
83/// # Examples
84///
85/// ```rust
86/// use flume_overwrite::bounded;
87///
88/// let (sender, receiver) = bounded(1);
89///
90/// // First message goes through normally
91/// sender.send_overwrite("first").unwrap();
92///
93/// // Second message overwrites the first
94/// let overwritten = sender.send_overwrite("second").unwrap();
95/// assert_eq!(overwritten, Some(vec!["first"]));
96/// ```
97#[derive(Clone, Debug)]
98pub struct OverwriteSender<T> {
99    sender: Sender<T>,
100    receiver: Receiver<T>,
101}
102
103impl<T> Deref for OverwriteSender<T> {
104    type Target = Sender<T>;
105
106    fn deref(&self) -> &Self::Target {
107        &self.sender
108    }
109}
110
111impl<T> OverwriteSender<T> {
112    /// Sends a value, overwriting old messages if the channel is at capacity.
113    ///
114    /// This method will never block. If the channel is at capacity, it will remove
115    /// old messages from the front of the queue until there's space for the new message.
116    ///
117    /// # Arguments
118    ///
119    /// * `value` - The value to send through the channel
120    ///
121    /// # Returns
122    ///
123    /// - `Ok(None)` - The message was sent without overwriting any existing messages
124    /// - `Ok(Some(Vec<T>))` - The message was sent and the returned vector contains
125    ///   the messages that were overwritten (removed from the channel)
126    /// - `Err(SendError<T>)` - The channel is disconnected
127    ///
128    /// # Examples
129    ///
130    /// ```rust
131    /// use flume_overwrite::bounded;
132    ///
133    /// let (sender, receiver) = bounded(2);
134    ///
135    /// // Send without overwriting
136    /// assert_eq!(sender.send_overwrite(1).unwrap(), None);
137    /// assert_eq!(sender.send_overwrite(2).unwrap(), None);
138    ///
139    /// // This will overwrite the first message
140    /// let overwritten = sender.send_overwrite(3).unwrap();
141    /// assert_eq!(overwritten, Some(vec![1]));
142    /// ```
143    pub fn send_overwrite(&self, value: T) -> Result<Option<Vec<T>>, SendError<T>> {
144        if let Some(capacity) = self.sender.capacity() {
145            let mut drained = Vec::new();
146            while self.sender.len() >= capacity {
147                match self.receiver.try_recv() {
148                    Ok(old_value) => drained.push(old_value),
149                    Err(flume::TryRecvError::Empty) => (),
150                    Err(_) => {
151                        return Err(SendError(value));
152                    }
153                }
154            }
155            self.sender.send(value)?;
156            Ok(if drained.is_empty() {
157                None
158            } else {
159                Some(drained)
160            })
161        } else {
162            self.sender.send(value)?;
163            Ok(None)
164        }
165    }
166
167    /// Asynchronously sends a value, overwriting old messages if the channel is at capacity.
168    ///
169    /// This is the async version of `send_overwrite`. Like its synchronous counterpart,
170    /// this method will never block due to a full channel - it will instead remove old
171    /// messages to make space.
172    ///
173    /// # Arguments
174    ///
175    /// * `value` - The value to send through the channel
176    ///
177    /// # Returns
178    ///
179    /// A future that resolves to:
180    /// - `Ok(None)` - The message was sent without overwriting any existing messages
181    /// - `Ok(Some(Vec<T>))` - The message was sent and the returned vector contains
182    ///   the messages that were overwritten (removed from the channel)
183    /// - `Err(SendError<T>)` - The channel is disconnected
184    ///
185    /// # Examples
186    ///
187    /// ```rust
188    /// use flume_overwrite::bounded;
189    /// use futures::executor::block_on;
190    ///
191    /// let (sender, receiver) = bounded(1);
192    ///
193    /// block_on(async {
194    ///     // Send without overwriting
195    ///     assert_eq!(sender.send_overwrite_async(1).await.unwrap(), None);
196    ///     
197    ///     // This will overwrite the first message
198    ///     let overwritten = sender.send_overwrite_async(2).await.unwrap();
199    ///     assert_eq!(overwritten, Some(vec![1]));
200    /// });
201    /// ```
202    pub async fn send_overwrite_async(&self, value: T) -> Result<Option<Vec<T>>, SendError<T>> {
203        if let Some(capacity) = self.sender.capacity() {
204            let mut drained = Vec::new();
205            while self.sender.len() >= capacity {
206                if let Ok(old_value) = self.receiver.recv_async().await {
207                    drained.push(old_value);
208                }
209            }
210            self.sender.send_async(value).await?;
211            Ok(if drained.is_empty() {
212                None
213            } else {
214                Some(drained)
215            })
216        } else {
217            self.sender.send_async(value).await?;
218            Ok(None)
219        }
220    }
221}
222
223#[cfg(test)]
224mod test {
225    use super::*;
226
227    use std::sync::Arc;
228    use std::thread;
229    use std::time::Duration;
230
231    use futures::executor::block_on;
232
233    #[test]
234    fn test_send_overwrite_under_capacity() {
235        let (sender, receiver) = bounded(3);
236        assert_eq!(sender.send_overwrite(1).unwrap(), None);
237        assert_eq!(sender.send_overwrite(2).unwrap(), None);
238        assert_eq!(receiver.try_recv().unwrap(), 1);
239        assert_eq!(receiver.try_recv().unwrap(), 2);
240    }
241
242    #[test]
243    fn test_send_overwrite_at_capacity() {
244        let (sender, receiver) = bounded(2);
245        assert_eq!(sender.send_overwrite(1).unwrap(), None);
246        assert_eq!(sender.send_overwrite(2).unwrap(), None);
247
248        let drained = sender.send_overwrite(3).unwrap();
249        assert_eq!(drained, Some(vec![1]));
250        assert_eq!(receiver.try_recv().unwrap(), 2);
251        assert_eq!(receiver.try_recv().unwrap(), 3);
252    }
253
254    #[test]
255    fn test_send_overwrite_multiple_overwrites() {
256        let (sender, receiver) = bounded(2);
257        assert_eq!(sender.send_overwrite(1).unwrap(), None);
258        assert_eq!(sender.send_overwrite(2).unwrap(), None);
259        // Fill up, then send two more, should drain two
260        let drained = sender.send_overwrite(3).unwrap();
261        assert_eq!(drained, Some(vec![1]));
262        let drained2 = sender.send_overwrite(4).unwrap();
263        assert_eq!(drained2, Some(vec![2]));
264        assert_eq!(receiver.try_recv().unwrap(), 3);
265        assert_eq!(receiver.try_recv().unwrap(), 4);
266    }
267
268    #[test]
269    fn test_send_overwrite_unbounded() {
270        let (sender, receiver) = bounded(2);
271        assert_eq!(sender.send_overwrite(1).unwrap(), None);
272        assert_eq!(sender.send_overwrite(2).unwrap(), None);
273        assert_eq!(receiver.try_recv().unwrap(), 1);
274        assert_eq!(receiver.try_recv().unwrap(), 2);
275    }
276
277    #[test]
278    fn test_send_overwrite_async_under_capacity() {
279        let (sender, receiver) = bounded(3);
280        let fut = sender.send_overwrite_async(1);
281        assert_eq!(block_on(fut).unwrap(), None);
282        let fut = sender.send_overwrite_async(2);
283        assert_eq!(block_on(fut).unwrap(), None);
284        assert_eq!(block_on(receiver.recv_async()).unwrap(), 1);
285        assert_eq!(block_on(receiver.recv_async()).unwrap(), 2);
286    }
287
288    #[test]
289    fn test_send_overwrite_async_at_capacity() {
290        let (sender, receiver) = bounded(2);
291        block_on(sender.send_overwrite_async(1)).unwrap();
292        block_on(sender.send_overwrite_async(2)).unwrap();
293        let drained = block_on(sender.send_overwrite_async(3)).unwrap();
294        assert_eq!(drained, Some(vec![1]));
295        assert_eq!(block_on(receiver.recv_async()).unwrap(), 2);
296        assert_eq!(block_on(receiver.recv_async()).unwrap(), 3);
297    }
298
299    #[test]
300    fn test_send_overwrite_async_multiple_overwrites() {
301        let (sender, receiver) = bounded(2);
302        block_on(sender.send_overwrite_async(1)).unwrap();
303        block_on(sender.send_overwrite_async(2)).unwrap();
304        let drained = block_on(sender.send_overwrite_async(3)).unwrap();
305        assert_eq!(drained, Some(vec![1]));
306        let drained2 = block_on(sender.send_overwrite_async(4)).unwrap();
307        assert_eq!(drained2, Some(vec![2]));
308        assert_eq!(block_on(receiver.recv_async()).unwrap(), 3);
309        assert_eq!(block_on(receiver.recv_async()).unwrap(), 4);
310    }
311
312    #[test]
313    fn test_send_overwrite_async_unbounded() {
314        let (sender, receiver) = bounded(2);
315        assert_eq!(block_on(sender.send_overwrite_async(1)).unwrap(), None);
316        assert_eq!(block_on(sender.send_overwrite_async(2)).unwrap(), None);
317        assert_eq!(block_on(receiver.recv_async()).unwrap(), 1);
318        assert_eq!(block_on(receiver.recv_async()).unwrap(), 2);
319    }
320
321    #[test]
322    fn test_send_overwrite_concurrent() {
323        let (sender, receiver) = bounded(2);
324        let sender_clone = sender.clone();
325        let handle = thread::spawn(move || {
326            for i in 0..5 {
327                sender_clone.send_overwrite(i).unwrap();
328                thread::sleep(Duration::from_millis(10));
329            }
330        });
331        handle.join().unwrap();
332        let mut received = Vec::new();
333        while let Ok(val) = receiver.try_recv() {
334            received.push(val);
335        }
336        // Should have at most 2 items, the last two sent
337        assert!(received.len() <= 2);
338        if received.len() == 2 {
339            assert_eq!(received, vec![3, 4]);
340        }
341    }
342
343    #[test]
344    fn test_send_overwrite_async_concurrent() {
345        use std::sync::Mutex;
346        let (sender, receiver) = bounded(2);
347        let sender_clone = sender.clone();
348        let received = Arc::new(Mutex::new(Vec::new()));
349        let received2 = received.clone();
350        let handle = thread::spawn(move || {
351            block_on(async {
352                for i in 0..5 {
353                    sender_clone.send_overwrite_async(i).await.unwrap();
354                    // TODO: use a real delay
355                    // simulate work
356                    futures_timer::Delay::new(Duration::from_millis(10)).await;
357                }
358            });
359        });
360        handle.join().unwrap();
361        while let Ok(val) = receiver.try_recv() {
362            received2.lock().unwrap().push(val);
363        }
364        let got = received.lock().unwrap();
365        // Should have at most 2 items, the last two sent
366        assert!(got.len() <= 2);
367        if got.len() == 2 {
368            assert_eq!(*got, vec![3, 4]);
369        }
370    }
371}