1use bytes::Bytes;
2
3use futures::{
4 future::{self, Either},
5 prelude::*,
6 stream,
7 sync::mpsc,
8 Future,
9};
10use parking_lot::RwLock;
11use std::{
12 collections::HashMap,
13 net::{SocketAddr, ToSocketAddrs},
14 str::FromStr,
15 sync::Arc,
16};
17use tokio_executor;
18use url::Url;
19
20use error::NatsError;
21use net::*;
22use protocol::{commands::*, Op};
23
24type NatsSink = stream::SplitSink<NatsConnection>;
26type NatsStream = stream::SplitStream<NatsConnection>;
28type NatsSubscriptionId = String;
30
31#[derive(Clone, Debug)]
33struct NatsClientSender {
34 tx: mpsc::UnboundedSender<Op>,
35 verbose: bool,
36}
37
38impl NatsClientSender {
39 pub fn new(sink: NatsSink) -> Self {
40 let (tx, rx) = mpsc::unbounded();
41 let rx = rx.map_err(|_| NatsError::InnerBrokenChain);
42 let work = sink.send_all(rx).map(|_| ()).map_err(|_| ());
43 tokio_executor::spawn(work);
44
45 NatsClientSender { tx, verbose: false }
46 }
47
48 #[allow(dead_code)]
49 pub fn set_verbose(&mut self, verbose: bool) {
50 self.verbose = verbose;
51 }
52
53 pub fn send(&self, op: Op) -> impl Future<Item = (), Error = NatsError> {
55 self.tx
57 .unbounded_send(op)
58 .map_err(|_| NatsError::InnerBrokenChain)
59 .into_future()
60 }
61}
62
63#[derive(Debug)]
64struct SubscriptionSink {
65 tx: mpsc::UnboundedSender<Message>,
66 max_count: Option<u32>,
67 count: u32,
68}
69
70#[derive(Debug)]
72struct NatsClientMultiplexer {
73 other_tx: Arc<mpsc::UnboundedSender<Op>>,
74 subs_tx: Arc<RwLock<HashMap<NatsSubscriptionId, SubscriptionSink>>>,
75}
76
77impl NatsClientMultiplexer {
78 pub fn new(stream: NatsStream) -> (Self, mpsc::UnboundedReceiver<Op>) {
79 let subs_tx: Arc<RwLock<HashMap<NatsSubscriptionId, SubscriptionSink>>> =
80 Arc::new(RwLock::new(HashMap::default()));
81
82 let (other_tx, other_rx) = mpsc::unbounded();
83 let other_tx = Arc::new(other_tx);
84
85 let stx_inner = Arc::clone(&subs_tx);
86 let otx_inner = Arc::clone(&other_tx);
87
88 let work_tx = stream
90 .for_each(move |op| {
91 match op {
92 Op::MSG(msg) => {
93 debug!(target: "nitox", "Found MSG from global Stream {:?}", msg);
94 if let Some(s) = (*stx_inner.read()).get(&msg.sid) {
95 debug!(target: "nitox", "Found multiplexed receiver to send to {}", msg.sid);
96 let _ = s.tx.unbounded_send(msg);
97 }
98 }
99 op => {
101 debug!(target: "nitox", "Sending OP to the rest of the queue: {:?}", op);
102 let _ = otx_inner.unbounded_send(op);
103 }
104 }
105
106 future::ok::<(), NatsError>(())
107 }).map(|_| ())
108 .map_err(|_| ());
109
110 tokio_executor::spawn(work_tx);
111
112 (NatsClientMultiplexer { subs_tx, other_tx }, other_rx)
113 }
114
115 pub fn for_sid(&self, sid: NatsSubscriptionId) -> impl Stream<Item = Message, Error = NatsError> + Send + Sync {
116 let (tx, rx) = mpsc::unbounded();
117 (*self.subs_tx.write()).insert(
118 sid,
119 SubscriptionSink {
120 tx,
121 max_count: None,
122 count: 0,
123 },
124 );
125
126 rx.map_err(|_| NatsError::InnerBrokenChain)
127 }
128
129 pub fn remove_sid(&self, sid: &str) {
130 (*self.subs_tx.write()).remove(sid);
131 }
132}
133
134#[derive(Debug, Default, Clone, Builder)]
136#[builder(setter(into))]
137pub struct NatsClientOptions {
138 pub connect_command: ConnectCommand,
140 pub cluster_uri: String,
142}
143
144impl NatsClientOptions {
145 pub fn builder() -> NatsClientOptionsBuilder {
146 NatsClientOptionsBuilder::default()
147 }
148}
149
150pub struct NatsClient {
153 opts: NatsClientOptions,
155 server_info: Arc<RwLock<Option<ServerInfo>>>,
157 other_rx: Box<dyn Stream<Item = Op, Error = NatsError> + Send + Sync>,
159 tx: NatsClientSender,
161 rx: Arc<NatsClientMultiplexer>,
163}
164
165impl ::std::fmt::Debug for NatsClient {
166 fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
167 f.debug_struct("NatsClient")
168 .field("opts", &self.opts)
169 .field("tx", &self.tx)
170 .field("rx", &self.rx)
171 .field("other_rx", &"Box<Stream>...")
172 .finish()
173 }
174}
175
176impl Stream for NatsClient {
177 type Error = NatsError;
178 type Item = Op;
179
180 fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
181 self.other_rx.poll().map_err(|_| NatsError::InnerBrokenChain)
182 }
183}
184
185impl NatsClient {
186 pub fn from_options(opts: NatsClientOptions) -> impl Future<Item = Self, Error = NatsError> + Send + Sync {
190 let tls_required = opts.connect_command.tls_required;
191
192 let cluster_uri = opts.cluster_uri.clone();
193 let cluster_sa = if let Ok(sockaddr) = SocketAddr::from_str(&cluster_uri) {
194 Ok(sockaddr)
195 } else {
196 match cluster_uri.to_socket_addrs() {
197 Ok(mut ips_iter) => ips_iter.next().ok_or(NatsError::UriDNSResolveError(None)),
198 Err(e) => Err(NatsError::UriDNSResolveError(Some(e))),
199 }
200 };
201
202 future::result(cluster_sa)
203 .from_err()
204 .and_then(move |cluster_sa| {
205 if tls_required {
206 match Url::parse(&cluster_uri) {
207 Ok(url) => match url.host_str() {
208 Some(host) => future::ok(Either::B(connect_tls(host.to_string(), cluster_sa))),
209 None => future::err(NatsError::TlsHostMissingError),
210 },
211 Err(e) => future::err(e.into()),
212 }
213 } else {
214 future::ok(Either::A(connect(cluster_sa)))
215 }
216 }).and_then(|either| either)
217 .and_then(move |connection| {
218 let (sink, stream): (NatsSink, NatsStream) = connection.split();
219 let (rx, other_rx) = NatsClientMultiplexer::new(stream);
220 let tx = NatsClientSender::new(sink);
221
222 let (tmp_other_tx, tmp_other_rx) = mpsc::unbounded();
223 let tx_inner = tx.clone();
224 let client = NatsClient {
225 tx,
226 server_info: Arc::new(RwLock::new(None)),
227 other_rx: Box::new(tmp_other_rx.map_err(|_| NatsError::InnerBrokenChain)),
228 rx: Arc::new(rx),
229 opts,
230 };
231
232 let server_info_arc = Arc::clone(&client.server_info);
233
234 tokio_executor::spawn(
235 other_rx
236 .for_each(move |op| {
237 match op {
238 Op::PING => {
239 tokio_executor::spawn(tx_inner.send(Op::PONG).map_err(|_| ()));
240 let _ = tmp_other_tx.unbounded_send(op);
241 }
242 Op::INFO(server_info) => {
243 *server_info_arc.write() = Some(server_info);
244 }
245 op => {
246 let _ = tmp_other_tx.unbounded_send(op);
247 }
248 }
249
250 future::ok(())
251 }).into_future()
252 .map_err(|_| ()),
253 );
254
255 future::ok(client)
256 })
257 }
258
259 pub fn connect(self) -> impl Future<Item = Self, Error = NatsError> + Send + Sync {
263 self.tx
264 .send(Op::CONNECT(self.opts.connect_command.clone()))
265 .and_then(move |_| future::ok(self))
266 }
267
268 #[deprecated(
272 since = "0.1.4",
273 note = "Using this method prevents the library to track what you are sending to the server and causes memory leaks in case of subscriptions/unsubs, it'll be fully removed in v0.2.0"
274 )]
275 pub fn send(self, op: Op) -> impl Future<Item = Self, Error = NatsError> {
276 self.tx.send(op).and_then(move |_| future::ok(self))
277 }
278
279 pub fn publish(&self, cmd: PubCommand) -> impl Future<Item = (), Error = NatsError> + Send + Sync {
283 if let Some(ref server_info) = *self.server_info.read() {
284 if cmd.payload.len() > server_info.max_payload as usize {
285 return Either::A(future::err(NatsError::MaxPayloadOverflow(server_info.max_payload)));
286 }
287 }
288
289 Either::B(self.tx.send(Op::PUB(cmd)))
290 }
291
292 pub fn unsubscribe(&self, cmd: UnsubCommand) -> impl Future<Item = (), Error = NatsError> + Send + Sync {
296 if let Some(max) = cmd.max_msgs {
297 if let Some(mut s) = (*self.rx.subs_tx.write()).get_mut(&cmd.sid) {
298 s.max_count = Some(max);
299 }
300 }
301
302 self.tx.send(Op::UNSUB(cmd))
303 }
304
305 pub fn subscribe(
309 &self,
310 cmd: SubCommand,
311 ) -> impl Future<Item = impl Stream<Item = Message, Error = NatsError> + Send + Sync, Error = NatsError> + Send + Sync
312 {
313 let inner_rx = self.rx.clone();
314 let sid = cmd.sid.clone();
315 self.tx.send(Op::SUB(cmd)).and_then(move |_| {
316 let stream = inner_rx.for_sid(sid.clone()).and_then(move |msg| {
317 {
318 let mut stx = inner_rx.subs_tx.write();
319 let mut delete = None;
320 debug!(target: "nitox", "Retrieving sink for sid {:?}", sid);
321 if let Some(s) = stx.get_mut(&sid) {
322 debug!(target: "nitox", "Checking if count exists");
323 if let Some(max_count) = s.max_count {
324 s.count += 1;
325 debug!(target: "nitox", "Max: {} / current: {}", max_count, s.count);
326 if s.count >= max_count {
327 debug!(target: "nitox", "Starting deletion");
328 delete = Some(max_count);
329 }
330 }
331 }
332
333 if let Some(count) = delete.take() {
334 debug!(target: "nitox", "Deleted stream for sid {} at count {}", sid, count);
335 stx.remove(&sid);
336 return Err(NatsError::SubscriptionReachedMaxMsgs(count));
337 }
338 }
339
340 Ok(msg)
341 });
342
343 future::ok(stream)
344 })
345 }
346
347 pub fn request(
351 &self,
352 subject: String,
353 payload: Bytes,
354 ) -> impl Future<Item = Message, Error = NatsError> + Send + Sync {
355 if let Some(ref server_info) = *self.server_info.read() {
356 if payload.len() > server_info.max_payload as usize {
357 return Either::A(future::err(NatsError::MaxPayloadOverflow(server_info.max_payload)));
358 }
359 }
360
361 let inbox = PubCommand::generate_reply_to();
362 let pub_cmd = PubCommand {
363 subject,
364 payload,
365 reply_to: Some(inbox.clone()),
366 };
367
368 let sub_cmd = SubCommand {
369 queue_group: None,
370 sid: SubCommand::generate_sid(),
371 subject: inbox,
372 };
373
374 let sid = sub_cmd.sid.clone();
375
376 let unsub_cmd = UnsubCommand {
377 sid: sub_cmd.sid.clone(),
378 max_msgs: Some(1),
379 };
380
381 let tx1 = self.tx.clone();
382 let tx2 = self.tx.clone();
383 let rx_arc = Arc::clone(&self.rx);
384
385 let stream = self
386 .rx
387 .for_sid(sid.clone())
388 .inspect(|msg| debug!(target: "nitox", "Request saw msg in multiplexed stream {:#?}", msg))
389 .take(1)
390 .into_future()
391 .map(|(surely_message, _)| surely_message.unwrap())
392 .map_err(|(e, _)| e)
393 .and_then(move |msg| {
394 rx_arc.remove_sid(&sid);
395 future::ok(msg)
396 });
397
398 Either::B(
399 self.tx
400 .send(Op::SUB(sub_cmd))
401 .and_then(move |_| tx1.send(Op::UNSUB(unsub_cmd)))
402 .and_then(move |_| tx2.send(Op::PUB(pub_cmd)))
403 .and_then(move |_| stream),
404 )
405 }
406}