crossbeam_requests/lib.rs
1//! mpsc_requests rewritten for crossbeam, written by @stjepang (https://github.com/crossbeam-rs/crossbeam/issues/353#issuecomment-484013974)
2//!
3//! crossbeam_requests is a small library built on top of crossbeam-channel but with
4//! the addition of the consumer responding with a message to the producer.
5//! Since the producer no longer only produces and the consumer no longer only consumes, the
6//! Producer is renamed to [RequestSender] and the Consumer is renamed to [RequestReceiver].
7//!
8//! This library is based on crossbeam-requests instead of mpsc channels in the standard library
9//! because crossbeam has better performance and better compatibility with android.
10//!
11//! A perfect use-case for this library is single-threaded databases which need
12//! to be accessed from multiple threads (such as SQLite)
13//!
14//! Here's a diagram of the dataflow
15//!
16//! |--------------------------------------------------------------------------------------|
17//! | Thread | Request thread | Respond thread | Request thread |
18//! |--------------------------------------------------------------------------------------|
19//! | Struct | RequestSender -> RequestReceiver -> ResponseSender -> ResponseReceiver |
20//! | (methods) | (request) -> (poll, poll_loop) -> (respond) -> (collect) |
21//! |--------------------------------------------------------------------------------------|
22//!
23//! # Examples
24//! For more examples, see the examples directory
25//!
26//! For even more examples see the tests in the tests directory
27//!
28//! ## Simple echo example
29//! ```rust,run
30//! use std::thread;
31//! use crossbeam_requests::channel;
32//!
33//! type RequestType = String;
34//! type ResponseType = String;
35//! let (requester, responder) = channel::<RequestType, ResponseType>();
36//! thread::spawn(move || {
37//! responder.poll_loop(|req, res_sender| {
38//! res_sender.respond(req);
39//! });
40//! });
41//! let msg = String::from("Hello");
42//! let receiver = requester.request(msg.clone()).unwrap();
43//! let res = receiver.collect().unwrap();
44//! assert_eq!(res, msg);
45//! ```
46
47#![deny(missing_docs)]
48
49use crossbeam_channel as cc;
50
51/// Create a [RequestSender] and a [RequestReceiver] with a channel between them
52///
53/// The [RequestSender] can be cloned to be able to do requests to the same [RequestReceiver] from multiple
54/// threads.
55pub fn channel<Req, Res>() -> (RequestSender<Req, Res>, RequestReceiver<Req, Res>) {
56 let (request_sender, request_receiver) = cc::unbounded::<(Req, ResponseSender<Res>)>();
57 let request_sender = RequestSender::new(request_sender);
58 let request_receiver = RequestReceiver::new(request_receiver);
59 (request_sender, request_receiver)
60}
61
62#[derive(Debug)]
63/// Errors which can occur when a [RequestReceiver] handles a request
64pub enum RequestError {
65 /// Error occuring when channel from [RequestSender] to [RequestReceiver] is broken
66 RecvError,
67 /// Error occuring when channel from [RequestReceiver] to [RequestSender] is broken
68 SendError
69}
70impl From<cc::RecvError> for RequestError {
71 fn from(_err: cc::RecvError) -> RequestError {
72 RequestError::RecvError
73 }
74}
75impl<T> From<cc::SendError<T>> for RequestError {
76 fn from(_err: cc::SendError<T>) -> RequestError {
77 RequestError::SendError
78 }
79}
80
81/// A [ResponseSender] is received from the [RequestReceiver] to respond to the request back to the
82/// [RequestSender]
83pub struct ResponseSender<Res> {
84 response_sender: cc::Sender<Res>,
85}
86
87impl<Res> ResponseSender<Res> {
88 fn new(response_sender: cc::Sender<Res>) -> ResponseSender<Res> {
89 ResponseSender {
90 response_sender: response_sender,
91 }
92 }
93
94 /// Responds a request from the [RequestSender] which finishes the request
95 pub fn respond(&self, response: Res) {
96 match self.response_sender.send(response) {
97 Ok(_) => (),
98 Err(_e) => panic!("Response failed, send pipe was broken during request!")
99 }
100 }
101}
102
103/// A [RequestReceiver] listens to requests. Requests are a tuple of a message
104/// and a [ResponseSender] which is used to respond back to the [ResponseReceiver]
105pub struct RequestReceiver<Req, Res> {
106 request_receiver: cc::Receiver<(Req, ResponseSender<Res>)>,
107}
108
109impl<Req, Res> RequestReceiver<Req, Res> {
110 fn new(request_receiver: cc::Receiver<(Req, ResponseSender<Res>)>) -> RequestReceiver<Req, Res> {
111 RequestReceiver {
112 request_receiver,
113 }
114 }
115
116 /// Poll if the [RequestReceiver] has received any requests.
117 /// The poll returns a tuple of the request message and a [ResponseSender]
118 /// which is used to respond back to the ResponseReceiver.
119 ///
120 /// NOTE: It is considered an programmer error to not send a response with
121 /// the [ResponseSender]
122 ///
123 /// This call is blocking
124 /// TODO: add a poll equivalent which is not blocking and/or has a timeout
125 pub fn poll(&self) -> Result<(Req, ResponseSender<Res>), RequestError> {
126 match self.request_receiver.recv() {
127 Ok((request, response_sender)) => Ok((request, response_sender)),
128 Err(_e) => Err(RequestError::RecvError)
129 }
130 }
131
132 /// A shorthand for running poll with a closure for as long as there is one or more [RequestSender]s alive
133 /// referencing this [RequestReceiver]
134 pub fn poll_loop<F>(&self, mut f: F) where F: FnMut(Req, ResponseSender<Res>) {
135 loop {
136 match self.poll() {
137 Ok((request, response_sender)) => f(request, response_sender),
138 Err(e) => match e {
139 // No more send channels open, quitting
140 RequestError::RecvError => break,
141 _ => panic!("This is a bug")
142 }
143 };
144 }
145 }
146}
147
148
149/// [ResponseReceiver] listens for a response from a [ResponseSender].
150/// The response is received using the collect method.
151#[derive(Clone)]
152pub struct ResponseReceiver<Res> {
153 response_receiver: cc::Receiver<Res>,
154}
155
156impl<Res> ResponseReceiver<Res> {
157 fn new(response_receiver: cc::Receiver<Res>) -> ResponseReceiver<Res> {
158 ResponseReceiver {
159 response_receiver
160 }
161 }
162
163 /// Send request to the connected [RequestReceiver]
164 pub fn collect(&self) -> Result<Res, RequestError> {
165 Ok(self.response_receiver.recv()?)
166 }
167}
168
169
170/// [RequestSender] has a connection to a [RequestReceiver] to which it can
171/// send a requests to.
172/// The request method is used to make a request and it returns a
173/// [ResponseReceiver] which is used to receive the response.
174#[derive(Clone)]
175pub struct RequestSender<Req, Res> {
176 request_sender: cc::Sender<(Req, ResponseSender<Res>)>,
177}
178
179impl<Req, Res> RequestSender<Req, Res> {
180 fn new(request_sender: cc::Sender<(Req, ResponseSender<Res>)>) -> RequestSender<Req, Res> {
181 RequestSender {
182 request_sender: request_sender,
183 }
184 }
185
186 /// Send request to the connected [RequestReceiver]
187 /// Returns a [RequestReceiver] which is used to receive the response.
188 pub fn request(&self, request: Req) -> Result<ResponseReceiver<Res>, RequestError> {
189 let (response_sender, response_receiver) = cc::unbounded::<Res>();
190 let response_sender = ResponseSender::new(response_sender);
191 self.request_sender.send((request, response_sender))?;
192 Ok(ResponseReceiver::new(response_receiver))
193 }
194}