sod_mpsc/
lib.rs

1//! [`sod::Service`] implementations to interact with [`std::sync::mpsc`] queues.
2//!
3//! ## Service Impls
4//! * [`MpscSender`] sends to a [`std::sync::mpsc::channel`].
5//! * [`MpscSyncSender`] sends to a [`std::sync::mpsc::sync_channel`] and blocks if the channel is full.
6//! * [`MpscSyncTrySender`] tries to send to a [`std::sync::mpsc::sync_channel`] and is able to be retried via [`sod::RetryService`] when the channel is full.
7//! * [`MpscReceiver`] receives from a [`std::sync::mpsc::channel`] or [`std::sync::mpsc::sync_channel`], blocking until an element is received.
8//! * [`MpscTryReceiver`] tries to receive from a [`std::sync::mpsc::channel`] or [`std::sync::mpsc::sync_channel`], and is able to be retried via [`sod::RetryService`] when the channel is empty.
9//!
10//! ## Example
11//! ```
12//! use sod::Service;
13//! use sod_mpsc::{MpscSender, MpscReceiver};
14//! use std::sync::mpsc;
15//!
16//! let (tx, rx) = mpsc::channel();
17//! let pusher = MpscSender::new(tx);
18//! let poller = MpscReceiver::new(rx);
19//!
20//! pusher.process(1).unwrap();
21//! pusher.process(2).unwrap();
22//!
23//! assert_eq!(poller.process(()).unwrap(), 1);
24//! assert_eq!(poller.process(()).unwrap(), 2);
25//! ```
26
27use sod::{RetryError, Retryable, Service};
28use std::sync::mpsc::{
29    Receiver, RecvError, SendError, Sender, SyncSender, TryRecvError, TrySendError,
30};
31
32/// A non-blocking [`sod::Service`] that sends to an underlying [`std::sync::mpsc::Sender`].
33#[derive(Clone)]
34pub struct MpscSender<T> {
35    tx: Sender<T>,
36}
37impl<T> MpscSender<T> {
38    pub fn new(tx: Sender<T>) -> Self {
39        Self { tx }
40    }
41}
42impl<T> Service for MpscSender<T> {
43    type Input = T;
44    type Output = ();
45    type Error = SendError<T>;
46    fn process(&self, input: T) -> Result<Self::Output, Self::Error> {
47        self.tx.send(input)
48    }
49}
50
51/// A blocking [`sod::Service`] that sends to an underlying [`std::sync::mpsc::SyncSender`] using the `send` function.
52#[derive(Clone)]
53pub struct MpscSyncSender<T> {
54    tx: SyncSender<T>,
55}
56impl<T> MpscSyncSender<T> {
57    pub fn new(tx: SyncSender<T>) -> Self {
58        Self { tx }
59    }
60}
61impl<T> Service for MpscSyncSender<T> {
62    type Input = T;
63    type Output = ();
64    type Error = SendError<T>;
65    fn process(&self, input: T) -> Result<Self::Output, Self::Error> {
66        self.tx.send(input)
67    }
68}
69
70/// A non-blocking [`sod::Service`] that is [`sod::Retryable`] and sends to an underlying [`std::sync::mpsc::SyncSender`] using the `try_send` function.
71#[derive(Clone)]
72pub struct MpscSyncTrySender<T> {
73    tx: SyncSender<T>,
74}
75impl<T> MpscSyncTrySender<T> {
76    pub fn new(tx: SyncSender<T>) -> Self {
77        Self { tx }
78    }
79}
80impl<T> Service for MpscSyncTrySender<T> {
81    type Input = T;
82    type Output = ();
83    type Error = TrySendError<T>;
84    fn process(&self, input: T) -> Result<Self::Output, Self::Error> {
85        self.tx.try_send(input)
86    }
87}
88impl<T> Retryable<T, TrySendError<T>> for MpscSyncTrySender<T> {
89    fn parse_retry(&self, err: TrySendError<T>) -> Result<T, RetryError<TrySendError<T>>> {
90        match err {
91            TrySendError::Full(input) => Ok(input),
92            err => Err(RetryError::ServiceError(err)),
93        }
94    }
95}
96
97/// A blocking [`sod::Service`] that receives from an underlying [`std::sync::mpsc::Receiver`], blocking per the rules of `Receiver::recv`
98pub struct MpscReceiver<T> {
99    rx: Receiver<T>,
100}
101impl<T> MpscReceiver<T> {
102    pub fn new(rx: Receiver<T>) -> Self {
103        Self { rx }
104    }
105}
106impl<T> Service for MpscReceiver<T> {
107    type Input = ();
108    type Output = T;
109    type Error = RecvError;
110    fn process(&self, _: ()) -> Result<Self::Output, Self::Error> {
111        self.rx.recv()
112    }
113}
114
115/// A non-blocking [`sod::Service`] that is [`sod::Retryable`] and receives from an underlying [`std::sync::mpsc::Receiver`], blocking per the rules of `Receiver::recv`
116pub struct MpscTryReceiver<T> {
117    rx: Receiver<T>,
118}
119impl<T> MpscTryReceiver<T> {
120    pub fn new(rx: Receiver<T>) -> Self {
121        Self { rx }
122    }
123}
124impl<T> Service for MpscTryReceiver<T> {
125    type Input = ();
126    type Output = T;
127    type Error = TryRecvError;
128    fn process(&self, _: ()) -> Result<Self::Output, Self::Error> {
129        self.rx.try_recv()
130    }
131}
132impl<T> Retryable<(), TryRecvError> for MpscTryReceiver<T> {
133    fn parse_retry(&self, err: TryRecvError) -> Result<(), RetryError<TryRecvError>> {
134        match err {
135            TryRecvError::Empty => Ok(()),
136            err => Err(RetryError::ServiceError(err)),
137        }
138    }
139}
140
141#[cfg(test)]
142mod tests {
143    use super::*;
144    use std::sync::mpsc;
145
146    #[test]
147    fn channel() {
148        let (tx, rx) = mpsc::channel();
149        let pusher = MpscSender::new(tx);
150        let poller = MpscReceiver::new(rx);
151
152        pusher.process(1).unwrap();
153        pusher.process(2).unwrap();
154        pusher.process(3).unwrap();
155
156        assert_eq!(poller.process(()).unwrap(), 1);
157        assert_eq!(poller.process(()).unwrap(), 2);
158        assert_eq!(poller.process(()).unwrap(), 3);
159    }
160
161    #[test]
162    fn sync_channel() {
163        let (tx, rx) = mpsc::sync_channel(5);
164        let pusher = MpscSyncSender::new(tx);
165        let poller = MpscReceiver::new(rx);
166
167        pusher.process(1).unwrap();
168        pusher.process(2).unwrap();
169        pusher.process(3).unwrap();
170
171        assert_eq!(poller.process(()), Ok(1));
172        assert_eq!(poller.process(()), Ok(2));
173        assert_eq!(poller.process(()), Ok(3));
174    }
175
176    #[test]
177    fn try_sync_channel() {
178        let (tx, rx) = mpsc::sync_channel(5);
179        let pusher = MpscSyncTrySender::new(tx);
180        let poller = MpscTryReceiver::new(rx);
181
182        pusher.process(1).unwrap();
183        pusher.process(2).unwrap();
184        pusher.process(3).unwrap();
185
186        assert_eq!(poller.process(()), Ok(1));
187        assert_eq!(poller.process(()), Ok(2));
188        assert_eq!(poller.process(()), Ok(3));
189        assert_eq!(poller.process(()), Err(TryRecvError::Empty));
190    }
191}