vertx_tcp_eventbus_bridge_client_rust/
future.rs1use std::collections::HashMap;
2use std::io::Error as IoError;
3use std::net::{IpAddr, SocketAddr};
4use std::sync::{Arc, RwLock};
5
6use crossbeam::sync::ArcCell;
7use futures::IntoFuture;
8use futures::stream::Forward;
9use futures::sync::mpsc::*;
10use futures::sync::oneshot::{self, Receiver as OneshotReceiver, Sender as OneshotSender};
11use serde_json::Value;
12use tokio::codec::{FramedRead, FramedWrite};
13use tokio::io::{ErrorKind, ReadHalf, WriteHalf};
14use tokio::io::AsyncRead;
15use tokio::net::TcpStream;
16use tokio::prelude::{Async, Stream};
17use tokio::prelude::future::Future;
18
19use crate::codec::{RequestCodec, ResponseCodec};
20use crate::request;
21use crate::request::Request;
22use crate::response::Response;
23
24pub struct Eventbus {
27 tx: Arc<RwLock<HashMap<String, Sender>>>,
28 writer: UnboundedSender<Request>,
29}
30
31pub struct EventbusWriteFuture {
35 inner: FramedWrite<WriteHalf<TcpStream>, RequestCodec>,
36 rx: UnboundedReceiverWithError<Request>,
37}
38
39pub struct EventbusReadStream {
44 reader: FramedRead<ReadHalf<TcpStream>, ResponseCodec>,
45 tx: Arc<RwLock<HashMap<String, Sender>>>,
46}
47
48enum Sender {
49 Unbounded(UnboundedSender<Response>),
50 Oneshot(ArcCell<Option<OneshotSender<Response>>>),
51}
52
53impl Stream for EventbusReadStream {
54 type Item = (Response, String);
55 type Error = std::io::Error;
56
57 fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
58 match self.reader.poll() {
59 Ok(Async::Ready(Some((response, address)))) => {
60 self.send(&address, response.clone());
61 return Ok(Async::Ready(Some((response, address))));
62 }
63 other => other
64 }
65 }
66}
67
68pub struct UnboundedReceiverWithError<T>(pub UnboundedReceiver<T>);
69
70impl<T> Stream for UnboundedReceiverWithError<T> {
71 type Item = T;
72 type Error = IoError;
73
74 fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
75 match self.0.poll() {
76 Err(_) => {
77 Err(IoError::new(ErrorKind::BrokenPipe, "UnboundedSender closed ?"))
78 }
79 Ok(other) => {
80 Ok(other)
81 }
82 }
83 }
84}
85
86impl IntoFuture for EventbusWriteFuture {
87 type Future = Forward<UnboundedReceiverWithError<Request>, FramedWrite<WriteHalf<TcpStream>, RequestCodec>>;
88 type Item = (UnboundedReceiverWithError<Request>, FramedWrite<WriteHalf<TcpStream>, RequestCodec>);
89 type Error = IoError;
90
91 fn into_future(self) -> Self::Future {
92 self.rx.forward(self.inner)
93 }
94}
95
96pub struct ResponseStream {
99 rx: UnboundedReceiver<Response>
100}
101
102pub struct ResponseFut {
105 rx: OneshotReceiver<Response>
106}
107
108impl Future for ResponseFut {
109 type Item = Response;
110 type Error = IoError;
111
112 fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
113 match self.rx.poll() {
114 Ok(i) => Ok(i),
115 Err(_) => Err(IoError::new(ErrorKind::Other, "canceled"))
116 }
117 }
118}
119
120impl Stream for ResponseStream {
121 type Item = Response;
122 type Error = ();
123
124 fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
125 self.rx.poll().map_err(|_| ())
126 }
127}
128
129impl Eventbus {
130 pub fn connect(address: IpAddr, port: u16) -> impl Future<Item=(Eventbus, EventbusReadStream, EventbusWriteFuture), Error=IoError> {
131 TcpStream::connect(&SocketAddr::new(address.clone(), port)).map(move |s| {
132 let (write_tx, write_rx) = unbounded();
133 let map = Arc::new(RwLock::new(HashMap::new()));
134 let (r, w) = s.split();
135 let reader = FramedRead::new(r, ResponseCodec);
136 let writer = FramedWrite::new(w, RequestCodec);
137 let read_stream = EventbusReadStream {
138 reader,
139 tx: map.clone(),
140 };
141 let write_stream = EventbusWriteFuture {
142 inner: writer,
143 rx: UnboundedReceiverWithError(write_rx),
144 };
145 let eventbus = Eventbus {
146 tx: map,
147 writer: write_tx,
148 };
149 (eventbus, read_stream, write_stream)
150 })
151 }
152
153 fn send_frame(&self, req: Request) -> Result<(), SendError<Request>> {
154 self.writer.unbounded_send(req)
155 }
156
157 pub fn register(&self, address: String, headers: Option<Value>) -> Result<ResponseStream, SendError<Request>> {
158 let req = request::Request::Register {
159 address: address.to_string(),
160 headers,
161 };
162 self.send_frame(req).map(|_| {
163 let (tx, rx) = unbounded();
164 let mut map = self.tx.write().unwrap();
165 map.insert(address, Sender::Unbounded(tx));
166 ResponseStream {
167 rx
168 }
169 })
170 }
171
172 pub fn unregister(&self, address: String) {
173 let req = request::Request::Unregister {
174 address: address.to_string(),
175 headers: None,
176 };
177 self.send_frame(req).unwrap();
178 let mut map = self.tx.write().unwrap();
179 map.remove(&address);
180 }
181
182 pub fn ping(&mut self) -> Result<(), SendError<Request>> {
183 let s = request::Request::Ping;
184 self.send_frame(s)
185 }
186
187 pub fn send(&self, address: String, message: Value) -> Result<(), SendError<Request>> {
189 let req = request::Request::Send {
190 address: address.to_string(),
191 body: message,
192 headers: None,
193 replyAddress: Some(address.to_string()),
194 };
195 self.send_frame(req)
196 }
197
198 pub fn send_reply(&self, address: String, message: Value) -> Result<ResponseFut, SendError<Request>> {
200 let req = request::Request::Send {
201 address: address.to_string(),
202 body: message,
203 headers: None,
204 replyAddress: Some(address.to_string()),
205 };
206 self.send_frame(req).map(|_| {
207 let (tx, rx) = oneshot::channel();
208 let mut map = self.tx.write().unwrap();
209 map.insert(address, Sender::Oneshot(ArcCell::new(Arc::new(Some(tx)))));
210 ResponseFut {
211 rx
212 }
213 })
214 }
215
216 pub fn publish(&mut self, address: String, message: Value) -> Result<(), SendError<Request>> {
217 let req = request::Request::Publish {
218 address: address.to_string(),
219 body: message,
220 headers: None,
221 replyAddress: None,
222 };
223 self.send_frame(req)
224 }
225}
226
227impl EventbusReadStream {
228 fn send(&mut self, address: &String, response: Response) {
229 let remove = if let Some(tx) = self.tx.read().unwrap().get(address) {
230 match tx {
231 Sender::Unbounded(tx) => {
232 match tx.unbounded_send(response) {
233 Ok(_) => false,
234 Err(_) => true
235 }
236 }
237 Sender::Oneshot(cell) => {
238 let tx_opt = Arc::try_unwrap(cell.set(Arc::new(None))).unwrap();
239 if let Some(tx) = tx_opt {
240 tx.send(response).unwrap();
241 }
242 true
243 }
244 }
245 } else { false };
246 if remove {
247 if let Ok(mut map_mut) = self.tx.write() {
248 map_mut.remove(address);
249 }
250 }
251 }
252}