1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
//! mpsc_requests rewritten for crossbeam, written by @stjepang (https://github.com/crossbeam-rs/crossbeam/issues/353#issuecomment-484013974) //! //! crossbeam_requests is a small library built on top of crossbeam-channel but with //! the addition of the consumer responding with a message to the producer. //! Since the producer no longer only produces and the consumer no longer only consumes, the //! Producer is renamed to [RequestSender] and the Consumer is renamed to [RequestReceiver]. //! //! This library is based on crossbeam-requests instead of mpsc channels in the standard library //! because crossbeam has better performance and better compatibility with android. //! //! A perfect use-case for this library is single-threaded databases which need //! to be accessed from multiple threads (such as SQLite) //! //! Here's a diagram of the dataflow //! //! |--------------------------------------------------------------------------------------| //! | Thread | Request thread | Respond thread | Request thread | //! |--------------------------------------------------------------------------------------| //! | Struct | RequestSender -> RequestReceiver -> ResponseSender -> ResponseReceiver | //! | (methods) | (request) -> (poll, poll_loop) -> (respond) -> (collect) | //! |--------------------------------------------------------------------------------------| //! //! # Examples //! For more examples, see the examples directory //! //! For even more examples see the tests in the tests directory //! //! ## Simple echo example //! ```rust,run //! use std::thread; //! use mpsc_requests::channel; //! //! type RequestType = String; //! type ResponseType = String; //! let (requester, responder) = channel::<RequestType, ResponseType>(); //! thread::spawn(move || { //! responder.poll_loop(|req, res_sender| { //! res_sender.respond(req); //! }); //! }); //! let msg = String::from("Hello"); //! let receiver = requester.request(msg.clone()).unwrap(); //! let res = receiver.collect().unwrap(); //! assert_eq!(res, msg); //! ``` #![deny(missing_docs)] use crossbeam_channel as cc; /// Create a [RequestSender] and a [RequestReceiver] with a channel between them /// /// The [RequestSender] can be cloned to be able to do requests to the same [RequestReceiver] from multiple /// threads. pub fn channel<Req, Res>() -> (RequestSender<Req, Res>, RequestReceiver<Req, Res>) { let (request_sender, request_receiver) = cc::unbounded::<(Req, ResponseSender<Res>)>(); let request_sender = RequestSender::new(request_sender); let request_receiver = RequestReceiver::new(request_receiver); (request_sender, request_receiver) } #[derive(Debug)] /// Errors which can occur when a [RequestReceiver] handles a request pub enum RequestError { /// Error occuring when channel from [RequestSender] to [RequestReceiver] is broken RecvError, /// Error occuring when channel from [RequestReceiver] to [RequestSender] is broken SendError } impl From<cc::RecvError> for RequestError { fn from(_err: cc::RecvError) -> RequestError { RequestError::RecvError } } impl<T> From<cc::SendError<T>> for RequestError { fn from(_err: cc::SendError<T>) -> RequestError { RequestError::SendError } } /// A [ResponseSender] is received from the [RequestReceiver] to respond to the request back to the /// [RequestSender] pub struct ResponseSender<Res> { response_sender: cc::Sender<Res>, } impl<Res> ResponseSender<Res> { fn new(response_sender: cc::Sender<Res>) -> ResponseSender<Res> { ResponseSender { response_sender: response_sender, } } /// Responds a request from the [RequestSender] which finishes the request pub fn respond(&self, response: Res) { match self.response_sender.send(response) { Ok(_) => (), Err(_e) => panic!("Response failed, send pipe was broken during request!") } } } /// A [RequestReceiver] listens to requests. Requests are a tuple of a message /// and a [ResponseSender] which is used to respond back to the [ResponseReceiver] pub struct RequestReceiver<Req, Res> { request_receiver: cc::Receiver<(Req, ResponseSender<Res>)>, } impl<Req, Res> RequestReceiver<Req, Res> { fn new(request_receiver: cc::Receiver<(Req, ResponseSender<Res>)>) -> RequestReceiver<Req, Res> { RequestReceiver { request_receiver, } } /// Poll if the [RequestReceiver] has received any requests. /// The poll returns a tuple of the request message and a [ResponseSender] /// which is used to respond back to the ResponseReceiver. /// /// NOTE: It is considered an programmer error to not send a response with /// the [ResponseSender] /// /// This call is blocking /// TODO: add a poll equivalent which is not blocking and/or has a timeout pub fn poll(&self) -> Result<(Req, ResponseSender<Res>), RequestError> { match self.request_receiver.recv() { Ok((request, response_sender)) => Ok((request, response_sender)), Err(_e) => Err(RequestError::RecvError) } } /// A shorthand for running poll with a closure for as long as there is one or more [RequestSender]s alive /// referencing this [RequestReceiver] pub fn poll_loop<F>(&self, mut f: F) where F: FnMut(Req, ResponseSender<Res>) { loop { match self.poll() { Ok((request, response_sender)) => f(request, response_sender), Err(e) => match e { // No more send channels open, quitting RequestError::RecvError => break, _ => panic!("This is a bug") } }; } } } /// [ResponseReceiver] listens for a response from a [ResponseSender]. /// The response is received using the collect method. pub struct ResponseReceiver<Res> { response_receiver: cc::Receiver<Res>, } impl <Res> Clone for ResponseReceiver<Res> { fn clone(&self) -> Self { ResponseReceiver { response_receiver: self.response_receiver.clone() } } } impl<Res> ResponseReceiver<Res> { fn new(response_receiver: cc::Receiver<Res>) -> ResponseReceiver<Res> { ResponseReceiver { response_receiver } } /// Send request to the connected [RequestReceiver] pub fn collect(&self) -> Result<Res, RequestError> { Ok(self.response_receiver.recv()?) } } /// [RequestSender] has a connection to a [RequestReceiver] to which it can /// send a requests to. /// The request method is used to make a request and it returns a /// [ResponseReceiver] which is used to receive the response. pub struct RequestSender<Req, Res> { request_sender: cc::Sender<(Req, ResponseSender<Res>)>, } impl<Req, Res> Clone for RequestSender<Req, Res> { fn clone(&self) -> Self { RequestSender { request_sender: self.request_sender.clone() } } } impl<Req, Res> RequestSender<Req, Res> { fn new(request_sender: cc::Sender<(Req, ResponseSender<Res>)>) -> RequestSender<Req, Res> { RequestSender { request_sender: request_sender, } } /// Send request to the connected [RequestReceiver] /// Returns a [RequestReceiver] which is used to receive the response. pub fn request(&self, request: Req) -> Result<ResponseReceiver<Res>, RequestError> { let (response_sender, response_receiver) = cc::unbounded::<Res>(); let response_sender = ResponseSender::new(response_sender); self.request_sender.send((request, response_sender))?; Ok(ResponseReceiver::new(response_receiver)) } }