futures_shim/mpsc.rs
1// Copyright 2017 Yurii Rashkovskii
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15//
16// Permission is hereby granted, free of charge, to any
17// person obtaining a copy of this software and associated
18// documentation files (the "Software"), to deal in the
19// Software without restriction, including without
20// limitation the rights to use, copy, modify, merge,
21// publish, distribute, sublicense, and/or sell copies of
22// the Software, and to permit persons to whom the Software
23// is furnished to do so, subject to the following
24// conditions:
25//
26// The above copyright notice and this permission notice
27// shall be included in all copies or substantial portions
28// of the Software.
29//
30// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
31// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
32// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
33// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
34// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
35// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
36// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
37// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
38// DEALINGS IN THE SOFTWARE.
39
40//! The purpose of this module is to enable smooth
41//! integration of unbounded mpsc-using components into
42//! futures-based pipelines.
43
44use futures::{Future, Stream, Poll, Async, task};
45use std::sync::mpsc::{Receiver as MpscReceiver, Sender as MpscSender,
46 TryRecvError, SendError as MpscSendError,
47 channel as mpsc_channel};
48
49/// Wrapper for an mpsc Receiver
50///
51/// One can create one using the `From` or `Into` trait:
52///
53/// ```
54/// use futures_shim::mpsc;
55/// let (_, rx) = ::std::sync::mpsc::channel::<i8>();
56/// let _ : mpsc::Receiver<i8> = rx.into();
57/// ```
58///
59/// Receiver also implements futures' `Stream` trait
60#[derive(Debug)]
61pub struct Receiver<T>(MpscReceiver<T>);
62
63impl<T> From<MpscReceiver<T>> for Receiver<T> {
64 fn from(value: MpscReceiver<T>) -> Self {
65 Receiver(value)
66 }
67}
68
69impl<T> Stream for Receiver<T> {
70 type Item = T;
71 type Error = ();
72
73 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
74 match self.0.try_recv() {
75 Ok(result) => Ok(Async::from(Some(result))),
76 Err(TryRecvError::Empty) => {
77 task::current().notify();
78 Ok(Async::NotReady)
79 },
80 Err(TryRecvError::Disconnected) => Ok(Async::from(None)),
81 }
82 }
83}
84
85/// Wrapper for an mpsc Sender
86///
87/// One can create one using the `From` or `Into` trait:
88///
89/// ```
90/// use futures_shim::mpsc;
91/// let (tx, _) = ::std::sync::mpsc::channel::<i8>();
92/// let _ : mpsc::Sender<i8> = tx.into();
93/// ```
94#[derive(Debug)]
95pub struct Sender<T>(MpscSender<T>);
96
97impl<T> From<MpscSender<T>> for Sender<T> {
98 fn from(value: MpscSender<T>) -> Self {
99 Sender(value)
100 }
101}
102
103/// Represents an error sending data
104///
105/// To quote stdlib's documentation:
106///
107/// A **send** operation can only fail if the receiving end of a channel is
108/// disconnected, implying that the data could never be received. The error
109/// contains the data being sent as a payload so it can be recovered.
110#[derive(Debug)]
111pub struct SendError<T>(T);
112
113pub struct SendResult<T>(Option<Result<(), MpscSendError<T>>>);
114
115impl<T> Future for SendResult<T> {
116 type Item = ();
117 type Error = SendError<T>;
118
119 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
120 let result = self.0.take().expect("can't poll SendResult twice");
121 match result {
122 Ok(()) => Ok(Async::from(())),
123 Err(MpscSendError(data)) => Err(SendError(data)),
124 }
125 }
126}
127
128impl<T> Sender<T> {
129 /// Sends the data, returning a future
130 pub fn send(&self, data: T) -> SendResult<T> {
131 SendResult(Some(self.0.send(data)))
132 }
133}
134
135impl<T> Clone for Sender<T> {
136 fn clone(&self) -> Self {
137 self.0.clone().into()
138 }
139}
140
141
142/// Creates an unbounded mpsc channel
143///
144/// This function is Using `std::sync::mpsc::channel` behind the scenes, and
145/// converting into future_shim's `Sender` and `Receiver`
146///
147/// ```
148/// use futures_shim::mpsc;
149/// let (tx, rx) = mpsc::channel::<i8>();
150/// ```
151pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
152 let (tx, rx) = mpsc_channel();
153 (tx.into(), rx.into())
154}
155
156#[cfg(test)]
157mod tests {
158
159 use mpsc;
160 use tokio_core;
161 use futures::{Stream, Future};
162 use std::thread;
163
164 #[test]
165 fn it_works() {
166 let mut core = tokio_core::reactor::Core::new().unwrap();
167 let (tx, rx) = mpsc::channel();
168 assert!(core.run(tx.send(1i8)).is_ok());
169 let (received, _) = core.run(rx.into_future()).unwrap();
170 assert_eq!(Some(1i8), received);
171 }
172
173 #[test]
174 fn from_into() {
175 let (tx, rx) = ::std::sync::mpsc::channel::<i8>();
176 let _ : mpsc::Receiver<i8> = rx.into();
177 let _ : mpsc::Sender<i8> = tx.into();
178 }
179
180 #[test]
181 fn stream_end_on_drop() {
182 let mut core = tokio_core::reactor::Core::new().unwrap();
183 let (tx, rx) = mpsc::channel::<i8>();
184 drop(tx);
185 let (received, _) = core.run(rx.into_future()).unwrap();
186 assert_eq!(received, None);
187 }
188
189
190 #[test]
191 fn does_not_block_on_empty() {
192 let mut core = tokio_core::reactor::Core::new().unwrap();
193 let (tx, rx) = mpsc::channel();
194 let tx1 = tx.clone();
195 let remote = core.remote();
196 thread::spawn(move || {
197 thread::sleep(::std::time::Duration::from_millis(500));
198 remote.spawn(move |_| tx1.send(1i8).map_err(|_| ()));
199 });
200 let (received, _) = core.run(rx.into_future()).unwrap();
201 assert_eq!(Some(1i8), received);
202 }
203
204}