1use sod::{RetryError, Retryable, Service};
28use std::sync::mpsc::{
29 Receiver, RecvError, SendError, Sender, SyncSender, TryRecvError, TrySendError,
30};
31
/// A [`Service`] that pushes each input into a [`std::sync::mpsc::Sender`].
///
/// The wrapper owns the sending half of an unbounded `mpsc` channel and can be
/// cloned freely, regardless of whether the payload type `T` is `Clone`.
pub struct MpscSender<T> {
    tx: Sender<T>,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive would add a
// `T: Clone` bound, but cloning only duplicates the channel handle
// (`Sender<T>` is `Clone` for every `T`), so no bound on `T` is needed.
impl<T> Clone for MpscSender<T> {
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
        }
    }
}

impl<T> MpscSender<T> {
    /// Wraps the given channel sender.
    pub fn new(tx: Sender<T>) -> Self {
        Self { tx }
    }
}
42impl<T> Service for MpscSender<T> {
43 type Input = T;
44 type Output = ();
45 type Error = SendError<T>;
46 fn process(&self, input: T) -> Result<Self::Output, Self::Error> {
47 self.tx.send(input)
48 }
49}
50
/// A [`Service`] that pushes each input into a [`std::sync::mpsc::SyncSender`]
/// using the blocking `send` call.
///
/// The wrapper can be cloned freely, regardless of whether the payload type
/// `T` is `Clone`.
pub struct MpscSyncSender<T> {
    tx: SyncSender<T>,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive would add a
// `T: Clone` bound, but cloning only duplicates the channel handle
// (`SyncSender<T>` is `Clone` for every `T`), so no bound on `T` is needed.
impl<T> Clone for MpscSyncSender<T> {
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
        }
    }
}

impl<T> MpscSyncSender<T> {
    /// Wraps the given bounded-channel sender.
    pub fn new(tx: SyncSender<T>) -> Self {
        Self { tx }
    }
}
61impl<T> Service for MpscSyncSender<T> {
62 type Input = T;
63 type Output = ();
64 type Error = SendError<T>;
65 fn process(&self, input: T) -> Result<Self::Output, Self::Error> {
66 self.tx.send(input)
67 }
68}
69
/// A [`Service`] that pushes each input into a [`std::sync::mpsc::SyncSender`]
/// using the non-blocking `try_send` call.
///
/// The wrapper can be cloned freely, regardless of whether the payload type
/// `T` is `Clone`.
pub struct MpscSyncTrySender<T> {
    tx: SyncSender<T>,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive would add a
// `T: Clone` bound, but cloning only duplicates the channel handle
// (`SyncSender<T>` is `Clone` for every `T`), so no bound on `T` is needed.
impl<T> Clone for MpscSyncTrySender<T> {
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
        }
    }
}

impl<T> MpscSyncTrySender<T> {
    /// Wraps the given bounded-channel sender.
    pub fn new(tx: SyncSender<T>) -> Self {
        Self { tx }
    }
}
80impl<T> Service for MpscSyncTrySender<T> {
81 type Input = T;
82 type Output = ();
83 type Error = TrySendError<T>;
84 fn process(&self, input: T) -> Result<Self::Output, Self::Error> {
85 self.tx.try_send(input)
86 }
87}
88impl<T> Retryable<T, TrySendError<T>> for MpscSyncTrySender<T> {
89 fn parse_retry(&self, err: TrySendError<T>) -> Result<T, RetryError<TrySendError<T>>> {
90 match err {
91 TrySendError::Full(input) => Ok(input),
92 err => Err(RetryError::ServiceError(err)),
93 }
94 }
95}
96
/// A [`Service`] that pulls values out of a [`std::sync::mpsc::Receiver`]
/// using the blocking `recv` call.
pub struct MpscReceiver<T> {
    rx: Receiver<T>,
}

impl<T> MpscReceiver<T> {
    /// Wraps the given channel receiver.
    pub fn new(rx: Receiver<T>) -> Self {
        MpscReceiver { rx }
    }
}
106impl<T> Service for MpscReceiver<T> {
107 type Input = ();
108 type Output = T;
109 type Error = RecvError;
110 fn process(&self, _: ()) -> Result<Self::Output, Self::Error> {
111 self.rx.recv()
112 }
113}
114
/// A [`Service`] that pulls values out of a [`std::sync::mpsc::Receiver`]
/// using the non-blocking `try_recv` call.
pub struct MpscTryReceiver<T> {
    rx: Receiver<T>,
}

impl<T> MpscTryReceiver<T> {
    /// Wraps the given channel receiver.
    pub fn new(rx: Receiver<T>) -> Self {
        MpscTryReceiver { rx }
    }
}
124impl<T> Service for MpscTryReceiver<T> {
125 type Input = ();
126 type Output = T;
127 type Error = TryRecvError;
128 fn process(&self, _: ()) -> Result<Self::Output, Self::Error> {
129 self.rx.try_recv()
130 }
131}
132impl<T> Retryable<(), TryRecvError> for MpscTryReceiver<T> {
133 fn parse_retry(&self, err: TryRecvError) -> Result<(), RetryError<TryRecvError>> {
134 match err {
135 TryRecvError::Empty => Ok(()),
136 err => Err(RetryError::ServiceError(err)),
137 }
138 }
139}
140
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;

    /// Values pushed through an unbounded channel come back in FIFO order.
    #[test]
    fn channel() {
        let (sender, receiver) = mpsc::channel();
        let pusher = MpscSender::new(sender);
        let poller = MpscReceiver::new(receiver);

        for value in 1..=3 {
            pusher.process(value).unwrap();
        }

        for expected in 1..=3 {
            assert_eq!(poller.process(()).unwrap(), expected);
        }
    }

    /// Values pushed through a bounded channel with blocking send come back
    /// in FIFO order.
    #[test]
    fn sync_channel() {
        let (sender, receiver) = mpsc::sync_channel(5);
        let pusher = MpscSyncSender::new(sender);
        let poller = MpscReceiver::new(receiver);

        for value in 1..=3 {
            pusher.process(value).unwrap();
        }

        for expected in 1..=3 {
            assert_eq!(poller.process(()), Ok(expected));
        }
    }

    /// Non-blocking send/receive round-trips values in order and reports
    /// `Empty` once the channel has been drained.
    #[test]
    fn try_sync_channel() {
        let (sender, receiver) = mpsc::sync_channel(5);
        let pusher = MpscSyncTrySender::new(sender);
        let poller = MpscTryReceiver::new(receiver);

        for value in 1..=3 {
            pusher.process(value).unwrap();
        }

        for expected in 1..=3 {
            assert_eq!(poller.process(()), Ok(expected));
        }
        assert_eq!(poller.process(()), Err(TryRecvError::Empty));
    }
}