crossbeam_requests/
lib.rs

1//! mpsc_requests rewritten for crossbeam, written by @stjepang (https://github.com/crossbeam-rs/crossbeam/issues/353#issuecomment-484013974)
2//!
3//! crossbeam_requests is a small library built on top of crossbeam-channel but with
4//! the addition of the consumer responding with a message to the producer.
5//! Since the producer no longer only produces and the consumer no longer only consumes, the
6//! Producer is renamed to [RequestSender] and the Consumer is renamed to [RequestReceiver].
7//!
//! This library is based on crossbeam-channel instead of the mpsc channels in the standard library
//! because crossbeam has better performance and better compatibility with Android.
10//!
11//! A perfect use-case for this library is single-threaded databases which need
12//! to be accessed from multiple threads (such as SQLite)
13//!
14//! Here's a diagram of the dataflow
15//!
16//! |--------------------------------------------------------------------------------------|
17//! | Thread    | Request thread |            Respond thread           |  Request thread   |
18//! |--------------------------------------------------------------------------------------|
19//! | Struct    | RequestSender ->  RequestReceiver  -> ResponseSender -> ResponseReceiver |
20//! | (methods) |   (request)   -> (poll, poll_loop) ->    (respond)   ->    (collect)     |
21//! |--------------------------------------------------------------------------------------|
22//!
23//! # Examples
24//! For more examples, see the examples directory
25//!
26//! For even more examples see the tests in the tests directory
27//!
28//! ## Simple echo example
29//! ```rust,run
30//! use std::thread;
31//! use crossbeam_requests::channel;
32//!
33//! type RequestType = String;
34//! type ResponseType = String;
35//! let (requester, responder) = channel::<RequestType, ResponseType>();
36//! thread::spawn(move || {
37//!     responder.poll_loop(|req, res_sender| {
38//!         res_sender.respond(req);
39//!     });
40//! });
41//! let msg = String::from("Hello");
42//! let receiver = requester.request(msg.clone()).unwrap();
43//! let res = receiver.collect().unwrap();
44//! assert_eq!(res, msg);
45//! ```
46
47#![deny(missing_docs)]
48
49use crossbeam_channel as cc;
50
51/// Create a [RequestSender] and a [RequestReceiver] with a channel between them
52///
53/// The [RequestSender] can be cloned to be able to do requests to the same [RequestReceiver] from multiple
54/// threads.
55pub fn channel<Req, Res>() -> (RequestSender<Req, Res>, RequestReceiver<Req, Res>) {
56    let (request_sender, request_receiver) = cc::unbounded::<(Req, ResponseSender<Res>)>();
57    let request_sender = RequestSender::new(request_sender);
58    let request_receiver = RequestReceiver::new(request_receiver);
59    (request_sender, request_receiver)
60}
61
/// Errors which can occur while a request travels between a [RequestSender] and a
/// [RequestReceiver]
///
/// The type implements [std::fmt::Display] and [std::error::Error], so it can be boxed
/// as `Box<dyn std::error::Error>` or wrapped by other error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequestError {
    /// Error occurring when the channel from [RequestSender] to [RequestReceiver] is broken
    RecvError,
    /// Error occurring when the channel from [RequestReceiver] to [RequestSender] is broken
    SendError,
}

impl std::fmt::Display for RequestError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the messages short and lowercase so they compose well when wrapped.
        let description = match self {
            RequestError::RecvError => "receiving failed because the channel is broken",
            RequestError::SendError => "sending failed because the channel is broken",
        };
        f.write_str(description)
    }
}

impl std::error::Error for RequestError {}
70impl From<cc::RecvError> for RequestError {
71    fn from(_err: cc::RecvError) -> RequestError {
72        RequestError::RecvError
73    }
74}
75impl<T> From<cc::SendError<T>> for RequestError {
76    fn from(_err: cc::SendError<T>) -> RequestError {
77        RequestError::SendError
78    }
79}
80
81/// A [ResponseSender] is received from the [RequestReceiver] to respond to the request back to the
82/// [RequestSender]
83pub struct ResponseSender<Res> {
84    response_sender: cc::Sender<Res>,
85}
86
87impl<Res> ResponseSender<Res> {
88    fn new(response_sender: cc::Sender<Res>) -> ResponseSender<Res> {
89        ResponseSender {
90            response_sender: response_sender,
91        }
92    }
93
94    /// Responds a request from the [RequestSender] which finishes the request
95    pub fn respond(&self, response: Res) {
96        match self.response_sender.send(response) {
97            Ok(_) => (),
98            Err(_e) => panic!("Response failed, send pipe was broken during request!")
99        }
100    }
101}
102
103/// A [RequestReceiver] listens to requests. Requests are a tuple of a message
104/// and a [ResponseSender] which is used to respond back to the [ResponseReceiver]
105pub struct RequestReceiver<Req, Res> {
106    request_receiver: cc::Receiver<(Req, ResponseSender<Res>)>,
107}
108
109impl<Req, Res> RequestReceiver<Req, Res> {
110    fn new(request_receiver: cc::Receiver<(Req, ResponseSender<Res>)>) -> RequestReceiver<Req, Res> {
111        RequestReceiver {
112            request_receiver,
113        }
114    }
115
116    /// Poll if the [RequestReceiver] has received any requests.
117    /// The poll returns a tuple of the request message and a [ResponseSender]
118    /// which is used to respond back to the ResponseReceiver.
119    ///
120    /// NOTE: It is considered an programmer error to not send a response with
121    /// the [ResponseSender]
122    ///
123    /// This call is blocking
124    /// TODO: add a poll equivalent which is not blocking and/or has a timeout
125    pub fn poll(&self) -> Result<(Req, ResponseSender<Res>), RequestError> {
126        match self.request_receiver.recv() {
127            Ok((request, response_sender)) => Ok((request, response_sender)),
128            Err(_e) => Err(RequestError::RecvError)
129        }
130    }
131
132    /// A shorthand for running poll with a closure for as long as there is one or more [RequestSender]s alive
133    /// referencing this [RequestReceiver]
134    pub fn poll_loop<F>(&self, mut f: F) where F: FnMut(Req, ResponseSender<Res>) {
135        loop {
136            match self.poll() {
137                Ok((request, response_sender)) => f(request, response_sender),
138                Err(e) => match e {
139                    // No more send channels open, quitting
140                    RequestError::RecvError => break,
141                    _ => panic!("This is a bug")
142                }
143            };
144        }
145    }
146}
147
148
149/// [ResponseReceiver] listens for a response from a [ResponseSender].
150/// The response is received using the collect method.
151#[derive(Clone)]
152pub struct ResponseReceiver<Res> {
153    response_receiver: cc::Receiver<Res>,
154}
155
156impl<Res> ResponseReceiver<Res> {
157    fn new(response_receiver: cc::Receiver<Res>) -> ResponseReceiver<Res> {
158        ResponseReceiver {
159            response_receiver
160        }
161    }
162
163    /// Send request to the connected [RequestReceiver]
164    pub fn collect(&self) -> Result<Res, RequestError> {
165        Ok(self.response_receiver.recv()?)
166    }
167}
168
169
170/// [RequestSender] has a connection to a [RequestReceiver] to which it can
171/// send a requests to.
172/// The request method is used to make a request and it returns a
173/// [ResponseReceiver] which is used to receive the response.
174#[derive(Clone)]
175pub struct RequestSender<Req, Res> {
176    request_sender: cc::Sender<(Req, ResponseSender<Res>)>,
177}
178
179impl<Req, Res> RequestSender<Req, Res> {
180    fn new(request_sender: cc::Sender<(Req, ResponseSender<Res>)>) -> RequestSender<Req, Res> {
181        RequestSender {
182            request_sender: request_sender,
183        }
184    }
185
186    /// Send request to the connected [RequestReceiver]
187    /// Returns a [RequestReceiver] which is used to receive the response.
188    pub fn request(&self, request: Req) -> Result<ResponseReceiver<Res>, RequestError> {
189        let (response_sender, response_receiver) = cc::unbounded::<Res>();
190        let response_sender = ResponseSender::new(response_sender);
191        self.request_sender.send((request, response_sender))?;
192        Ok(ResponseReceiver::new(response_receiver))
193    }
194}