vertx_tcp_eventbus_bridge_client_rust/
future.rs

1use std::collections::HashMap;
2use std::io::Error as IoError;
3use std::net::{IpAddr, SocketAddr};
4use std::sync::{Arc, RwLock};
5
6use crossbeam::sync::ArcCell;
7use futures::IntoFuture;
8use futures::stream::Forward;
9use futures::sync::mpsc::*;
10use futures::sync::oneshot::{self, Receiver as OneshotReceiver, Sender as OneshotSender};
11use serde_json::Value;
12use tokio::codec::{FramedRead, FramedWrite};
13use tokio::io::{ErrorKind, ReadHalf, WriteHalf};
14use tokio::io::AsyncRead;
15use tokio::net::TcpStream;
16use tokio::prelude::{Async, Stream};
17use tokio::prelude::future::Future;
18
19use crate::codec::{RequestCodec, ResponseCodec};
20use crate::request;
21use crate::request::Request;
22use crate::response::Response;
23
24/// The core struct to communicate with vert.x eventbus.
25/// Can be created by calling `Eventbus::connect`.
26pub struct Eventbus {
27    tx: Arc<RwLock<HashMap<String, Sender>>>,
28    writer: UnboundedSender<Request>,
29}
30
31/// Responsible for writing messages to the server through vert.x protocol.
32/// Should be spawned into background.
33/// Can be created by calling `Eventbus::connect`.
34pub struct EventbusWriteFuture {
35    inner: FramedWrite<WriteHalf<TcpStream>, RequestCodec>,
36    rx: UnboundedReceiverWithError<Request>,
37}
38
39/// Responsible for reading messages from the server through vert.x protocol.
40/// It will decode and dispatch messages to related future/stream.
41/// Should be spawned into background.
42/// Can be created by calling `Eventbus::connect`.
43pub struct EventbusReadStream {
44    reader: FramedRead<ReadHalf<TcpStream>, ResponseCodec>,
45    tx: Arc<RwLock<HashMap<String, Sender>>>,
46}
47
48enum Sender {
49    Unbounded(UnboundedSender<Response>),
50    Oneshot(ArcCell<Option<OneshotSender<Response>>>),
51}
52
53impl Stream for EventbusReadStream {
54    type Item = (Response, String);
55    type Error = std::io::Error;
56
57    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
58        match self.reader.poll() {
59            Ok(Async::Ready(Some((response, address)))) => {
60                self.send(&address, response.clone());
61                return Ok(Async::Ready(Some((response, address))));
62            }
63            other => other
64        }
65    }
66}
67
68pub struct UnboundedReceiverWithError<T>(pub UnboundedReceiver<T>);
69
70impl<T> Stream for UnboundedReceiverWithError<T> {
71    type Item = T;
72    type Error = IoError;
73
74    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
75        match self.0.poll() {
76            Err(_) => {
77                Err(IoError::new(ErrorKind::BrokenPipe, "UnboundedSender closed ?"))
78            }
79            Ok(other) => {
80                Ok(other)
81            }
82        }
83    }
84}
85
86impl IntoFuture for EventbusWriteFuture {
87    type Future = Forward<UnboundedReceiverWithError<Request>, FramedWrite<WriteHalf<TcpStream>, RequestCodec>>;
88    type Item = (UnboundedReceiverWithError<Request>, FramedWrite<WriteHalf<TcpStream>, RequestCodec>);
89    type Error = IoError;
90
91    fn into_future(self) -> Self::Future {
92        self.rx.forward(self.inner)
93    }
94}
95
96/// A stream of response messages from a subscribe/register operation.
97/// Can be created by `Eventbus.register`.
98pub struct ResponseStream {
99    rx: UnboundedReceiver<Response>
100}
101
102/// A future of response messages from a send_reply operation.
103/// Can be created by `Eventbus.send_reply`.
104pub struct ResponseFut {
105    rx: OneshotReceiver<Response>
106}
107
108impl Future for ResponseFut {
109    type Item = Response;
110    type Error = IoError;
111
112    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
113        match self.rx.poll() {
114            Ok(i) => Ok(i),
115            Err(_) => Err(IoError::new(ErrorKind::Other, "canceled"))
116        }
117    }
118}
119
120impl Stream for ResponseStream {
121    type Item = Response;
122    type Error = ();
123
124    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
125        self.rx.poll().map_err(|_| ())
126    }
127}
128
129impl Eventbus {
130    pub fn connect(address: IpAddr, port: u16) -> impl Future<Item=(Eventbus, EventbusReadStream, EventbusWriteFuture), Error=IoError> {
131        TcpStream::connect(&SocketAddr::new(address.clone(), port)).map(move |s| {
132            let (write_tx, write_rx) = unbounded();
133            let map = Arc::new(RwLock::new(HashMap::new()));
134            let (r, w) = s.split();
135            let reader = FramedRead::new(r, ResponseCodec);
136            let writer = FramedWrite::new(w, RequestCodec);
137            let read_stream = EventbusReadStream {
138                reader,
139                tx: map.clone(),
140            };
141            let write_stream = EventbusWriteFuture {
142                inner: writer,
143                rx: UnboundedReceiverWithError(write_rx),
144            };
145            let eventbus = Eventbus {
146                tx: map,
147                writer: write_tx,
148            };
149            (eventbus, read_stream, write_stream)
150        })
151    }
152
153    fn send_frame(&self, req: Request) -> Result<(), SendError<Request>> {
154        self.writer.unbounded_send(req)
155    }
156
157    pub fn register(&self, address: String, headers: Option<Value>) -> Result<ResponseStream, SendError<Request>> {
158        let req = request::Request::Register {
159            address: address.to_string(),
160            headers,
161        };
162        self.send_frame(req).map(|_| {
163            let (tx, rx) = unbounded();
164            let mut map = self.tx.write().unwrap();
165            map.insert(address, Sender::Unbounded(tx));
166            ResponseStream {
167                rx
168            }
169        })
170    }
171
172    pub fn unregister(&self, address: String) {
173        let req = request::Request::Unregister {
174            address: address.to_string(),
175            headers: None,
176        };
177        self.send_frame(req).unwrap();
178        let mut map = self.tx.write().unwrap();
179        map.remove(&address);
180    }
181
182    pub fn ping(&mut self) -> Result<(), SendError<Request>> {
183        let s = request::Request::Ping;
184        self.send_frame(s)
185    }
186
187    /// send with no reply
188    pub fn send(&self, address: String, message: Value) -> Result<(), SendError<Request>> {
189        let req = request::Request::Send {
190            address: address.to_string(),
191            body: message,
192            headers: None,
193            replyAddress: Some(address.to_string()),
194        };
195        self.send_frame(req)
196    }
197
198    /// send with reply
199    pub fn send_reply(&self, address: String, message: Value) -> Result<ResponseFut, SendError<Request>> {
200        let req = request::Request::Send {
201            address: address.to_string(),
202            body: message,
203            headers: None,
204            replyAddress: Some(address.to_string()),
205        };
206        self.send_frame(req).map(|_| {
207            let (tx, rx) = oneshot::channel();
208            let mut map = self.tx.write().unwrap();
209            map.insert(address, Sender::Oneshot(ArcCell::new(Arc::new(Some(tx)))));
210            ResponseFut {
211                rx
212            }
213        })
214    }
215
216    pub fn publish(&mut self, address: String, message: Value) -> Result<(), SendError<Request>> {
217        let req = request::Request::Publish {
218            address: address.to_string(),
219            body: message,
220            headers: None,
221            replyAddress: None,
222        };
223        self.send_frame(req)
224    }
225}
226
227impl EventbusReadStream {
228    fn send(&mut self, address: &String, response: Response) {
229        let remove = if let Some(tx) = self.tx.read().unwrap().get(address) {
230            match tx {
231                Sender::Unbounded(tx) => {
232                    match tx.unbounded_send(response) {
233                        Ok(_) => false,
234                        Err(_) => true
235                    }
236                }
237                Sender::Oneshot(cell) => {
238                    let tx_opt = Arc::try_unwrap(cell.set(Arc::new(None))).unwrap();
239                    if let Some(tx) = tx_opt {
240                        tx.send(response).unwrap();
241                    }
242                    true
243                }
244            }
245        } else { false };
246        if remove {
247            if let Ok(mut map_mut) = self.tx.write() {
248                map_mut.remove(address);
249            }
250        }
251    }
252}