nitox/
client.rs

1use bytes::Bytes;
2
3use futures::{
4    future::{self, Either},
5    prelude::*,
6    stream,
7    sync::mpsc,
8    Future,
9};
10use parking_lot::RwLock;
11use std::{
12    collections::HashMap,
13    net::{SocketAddr, ToSocketAddrs},
14    str::FromStr,
15    sync::Arc,
16};
17use tokio_executor;
18use url::Url;
19
20use error::NatsError;
21use net::*;
22use protocol::{commands::*, Op};
23
24/// Sink (write) part of a TCP stream
25type NatsSink = stream::SplitSink<NatsConnection>;
26/// Stream (read) part of a TCP stream
27type NatsStream = stream::SplitStream<NatsConnection>;
28/// Useless pretty much, just for code semantics
29type NatsSubscriptionId = String;
30
31/// Keep-alive for the sink, also supposed to take care of handling verbose messaging, but can't for now
32#[derive(Clone, Debug)]
33struct NatsClientSender {
34    tx: mpsc::UnboundedSender<Op>,
35    verbose: bool,
36}
37
38impl NatsClientSender {
39    pub fn new(sink: NatsSink) -> Self {
40        let (tx, rx) = mpsc::unbounded();
41        let rx = rx.map_err(|_| NatsError::InnerBrokenChain);
42        let work = sink.send_all(rx).map(|_| ()).map_err(|_| ());
43        tokio_executor::spawn(work);
44
45        NatsClientSender { tx, verbose: false }
46    }
47
48    #[allow(dead_code)]
49    pub fn set_verbose(&mut self, verbose: bool) {
50        self.verbose = verbose;
51    }
52
53    /// Sends an OP to the server
54    pub fn send(&self, op: Op) -> impl Future<Item = (), Error = NatsError> {
55        //let _verbose = self.verbose.clone();
56        self.tx
57            .unbounded_send(op)
58            .map_err(|_| NatsError::InnerBrokenChain)
59            .into_future()
60    }
61}
62
63#[derive(Debug)]
64struct SubscriptionSink {
65    tx: mpsc::UnboundedSender<Message>,
66    max_count: Option<u32>,
67    count: u32,
68}
69
70/// Internal multiplexer for incoming streams and subscriptions. Quite a piece of code, with almost no overhead yay
71#[derive(Debug)]
72struct NatsClientMultiplexer {
73    other_tx: Arc<mpsc::UnboundedSender<Op>>,
74    subs_tx: Arc<RwLock<HashMap<NatsSubscriptionId, SubscriptionSink>>>,
75}
76
77impl NatsClientMultiplexer {
78    pub fn new(stream: NatsStream) -> (Self, mpsc::UnboundedReceiver<Op>) {
79        let subs_tx: Arc<RwLock<HashMap<NatsSubscriptionId, SubscriptionSink>>> =
80            Arc::new(RwLock::new(HashMap::default()));
81
82        let (other_tx, other_rx) = mpsc::unbounded();
83        let other_tx = Arc::new(other_tx);
84
85        let stx_inner = Arc::clone(&subs_tx);
86        let otx_inner = Arc::clone(&other_tx);
87
88        // Here we filter the incoming TCP stream Messages by subscription ID and sending it to the appropriate Sender
89        let work_tx = stream
90            .for_each(move |op| {
91                match op {
92                    Op::MSG(msg) => {
93                        debug!(target: "nitox", "Found MSG from global Stream {:?}", msg);
94                        if let Some(s) = (*stx_inner.read()).get(&msg.sid) {
95                            debug!(target: "nitox", "Found multiplexed receiver to send to {}", msg.sid);
96                            let _ = s.tx.unbounded_send(msg);
97                        }
98                    }
99                    // Forward the rest of the messages to the owning client
100                    op => {
101                        debug!(target: "nitox", "Sending OP to the rest of the queue: {:?}", op);
102                        let _ = otx_inner.unbounded_send(op);
103                    }
104                }
105
106                future::ok::<(), NatsError>(())
107            }).map(|_| ())
108            .map_err(|_| ());
109
110        tokio_executor::spawn(work_tx);
111
112        (NatsClientMultiplexer { subs_tx, other_tx }, other_rx)
113    }
114
115    pub fn for_sid(&self, sid: NatsSubscriptionId) -> impl Stream<Item = Message, Error = NatsError> + Send + Sync {
116        let (tx, rx) = mpsc::unbounded();
117        (*self.subs_tx.write()).insert(
118            sid,
119            SubscriptionSink {
120                tx,
121                max_count: None,
122                count: 0,
123            },
124        );
125
126        rx.map_err(|_| NatsError::InnerBrokenChain)
127    }
128
129    pub fn remove_sid(&self, sid: &str) {
130        (*self.subs_tx.write()).remove(sid);
131    }
132}
133
134/// Options that are to be given to the client for initialization
135#[derive(Debug, Default, Clone, Builder)]
136#[builder(setter(into))]
137pub struct NatsClientOptions {
138    /// CONNECT command that will be sent upon calling the `connect()` method
139    pub connect_command: ConnectCommand,
140    /// Cluster URI in the IP:PORT format
141    pub cluster_uri: String,
142}
143
144impl NatsClientOptions {
145    pub fn builder() -> NatsClientOptionsBuilder {
146        NatsClientOptionsBuilder::default()
147    }
148}
149
150/// The NATS Client. What you'll be using mostly. All the async handling is made internally except for
151/// the system messages that are forwarded on the `Stream` that the client implements
152pub struct NatsClient {
153    /// Backup of options
154    opts: NatsClientOptions,
155    /// Server info
156    server_info: Arc<RwLock<Option<ServerInfo>>>,
157    /// Stream of the messages that are not caught for subscriptions (only system messages like PING/PONG should be here)
158    other_rx: Box<dyn Stream<Item = Op, Error = NatsError> + Send + Sync>,
159    /// Sink part to send commands
160    tx: NatsClientSender,
161    /// Subscription multiplexer
162    rx: Arc<NatsClientMultiplexer>,
163}
164
165impl ::std::fmt::Debug for NatsClient {
166    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
167        f.debug_struct("NatsClient")
168            .field("opts", &self.opts)
169            .field("tx", &self.tx)
170            .field("rx", &self.rx)
171            .field("other_rx", &"Box<Stream>...")
172            .finish()
173    }
174}
175
176impl Stream for NatsClient {
177    type Error = NatsError;
178    type Item = Op;
179
180    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
181        self.other_rx.poll().map_err(|_| NatsError::InnerBrokenChain)
182    }
183}
184
185impl NatsClient {
186    /// Creates a client and initiates a connection to the server
187    ///
188    /// Returns `impl Future<Item = Self, Error = NatsError>`
189    pub fn from_options(opts: NatsClientOptions) -> impl Future<Item = Self, Error = NatsError> + Send + Sync {
190        let tls_required = opts.connect_command.tls_required;
191
192        let cluster_uri = opts.cluster_uri.clone();
193        let cluster_sa = if let Ok(sockaddr) = SocketAddr::from_str(&cluster_uri) {
194            Ok(sockaddr)
195        } else {
196            match cluster_uri.to_socket_addrs() {
197                Ok(mut ips_iter) => ips_iter.next().ok_or(NatsError::UriDNSResolveError(None)),
198                Err(e) => Err(NatsError::UriDNSResolveError(Some(e))),
199            }
200        };
201
202        future::result(cluster_sa)
203            .from_err()
204            .and_then(move |cluster_sa| {
205                if tls_required {
206                    match Url::parse(&cluster_uri) {
207                        Ok(url) => match url.host_str() {
208                            Some(host) => future::ok(Either::B(connect_tls(host.to_string(), cluster_sa))),
209                            None => future::err(NatsError::TlsHostMissingError),
210                        },
211                        Err(e) => future::err(e.into()),
212                    }
213                } else {
214                    future::ok(Either::A(connect(cluster_sa)))
215                }
216            }).and_then(|either| either)
217            .and_then(move |connection| {
218                let (sink, stream): (NatsSink, NatsStream) = connection.split();
219                let (rx, other_rx) = NatsClientMultiplexer::new(stream);
220                let tx = NatsClientSender::new(sink);
221
222                let (tmp_other_tx, tmp_other_rx) = mpsc::unbounded();
223                let tx_inner = tx.clone();
224                let client = NatsClient {
225                    tx,
226                    server_info: Arc::new(RwLock::new(None)),
227                    other_rx: Box::new(tmp_other_rx.map_err(|_| NatsError::InnerBrokenChain)),
228                    rx: Arc::new(rx),
229                    opts,
230                };
231
232                let server_info_arc = Arc::clone(&client.server_info);
233
234                tokio_executor::spawn(
235                    other_rx
236                        .for_each(move |op| {
237                            match op {
238                                Op::PING => {
239                                    tokio_executor::spawn(tx_inner.send(Op::PONG).map_err(|_| ()));
240                                    let _ = tmp_other_tx.unbounded_send(op);
241                                }
242                                Op::INFO(server_info) => {
243                                    *server_info_arc.write() = Some(server_info);
244                                }
245                                op => {
246                                    let _ = tmp_other_tx.unbounded_send(op);
247                                }
248                            }
249
250                            future::ok(())
251                        }).into_future()
252                        .map_err(|_| ()),
253                );
254
255                future::ok(client)
256            })
257    }
258
259    /// Sends the CONNECT command to the server to setup connection
260    ///
261    /// Returns `impl Future<Item = Self, Error = NatsError>`
262    pub fn connect(self) -> impl Future<Item = Self, Error = NatsError> + Send + Sync {
263        self.tx
264            .send(Op::CONNECT(self.opts.connect_command.clone()))
265            .and_then(move |_| future::ok(self))
266    }
267
268    /// Send a raw command to the server
269    ///
270    /// Returns `impl Future<Item = Self, Error = NatsError>`
271    #[deprecated(
272        since = "0.1.4",
273        note = "Using this method prevents the library to track what you are sending to the server and causes memory leaks in case of subscriptions/unsubs, it'll be fully removed in v0.2.0"
274    )]
275    pub fn send(self, op: Op) -> impl Future<Item = Self, Error = NatsError> {
276        self.tx.send(op).and_then(move |_| future::ok(self))
277    }
278
279    /// Send a PUB command to the server
280    ///
281    /// Returns `impl Future<Item = (), Error = NatsError>`
282    pub fn publish(&self, cmd: PubCommand) -> impl Future<Item = (), Error = NatsError> + Send + Sync {
283        if let Some(ref server_info) = *self.server_info.read() {
284            if cmd.payload.len() > server_info.max_payload as usize {
285                return Either::A(future::err(NatsError::MaxPayloadOverflow(server_info.max_payload)));
286            }
287        }
288
289        Either::B(self.tx.send(Op::PUB(cmd)))
290    }
291
292    /// Send a UNSUB command to the server and de-register stream in the multiplexer
293    ///
294    /// Returns `impl Future<Item = (), Error = NatsError>`
295    pub fn unsubscribe(&self, cmd: UnsubCommand) -> impl Future<Item = (), Error = NatsError> + Send + Sync {
296        if let Some(max) = cmd.max_msgs {
297            if let Some(mut s) = (*self.rx.subs_tx.write()).get_mut(&cmd.sid) {
298                s.max_count = Some(max);
299            }
300        }
301
302        self.tx.send(Op::UNSUB(cmd))
303    }
304
305    /// Send a SUB command and register subscription stream in the multiplexer and return that `Stream` in a future
306    ///
307    /// Returns `impl Future<Item = impl Stream<Item = Message, Error = NatsError>>`
308    pub fn subscribe(
309        &self,
310        cmd: SubCommand,
311    ) -> impl Future<Item = impl Stream<Item = Message, Error = NatsError> + Send + Sync, Error = NatsError> + Send + Sync
312    {
313        let inner_rx = self.rx.clone();
314        let sid = cmd.sid.clone();
315        self.tx.send(Op::SUB(cmd)).and_then(move |_| {
316            let stream = inner_rx.for_sid(sid.clone()).and_then(move |msg| {
317                {
318                    let mut stx = inner_rx.subs_tx.write();
319                    let mut delete = None;
320                    debug!(target: "nitox", "Retrieving sink for sid {:?}", sid);
321                    if let Some(s) = stx.get_mut(&sid) {
322                        debug!(target: "nitox", "Checking if count exists");
323                        if let Some(max_count) = s.max_count {
324                            s.count += 1;
325                            debug!(target: "nitox", "Max: {} / current: {}", max_count, s.count);
326                            if s.count >= max_count {
327                                debug!(target: "nitox", "Starting deletion");
328                                delete = Some(max_count);
329                            }
330                        }
331                    }
332
333                    if let Some(count) = delete.take() {
334                        debug!(target: "nitox", "Deleted stream for sid {} at count {}", sid, count);
335                        stx.remove(&sid);
336                        return Err(NatsError::SubscriptionReachedMaxMsgs(count));
337                    }
338                }
339
340                Ok(msg)
341            });
342
343            future::ok(stream)
344        })
345    }
346
347    /// Performs a request to the server following the Request/Reply pattern. Returns a future containing the MSG that will be replied at some point by a third party
348    ///
349    /// Returns `impl Future<Item = Message, Error = NatsError>`
350    pub fn request(
351        &self,
352        subject: String,
353        payload: Bytes,
354    ) -> impl Future<Item = Message, Error = NatsError> + Send + Sync {
355        if let Some(ref server_info) = *self.server_info.read() {
356            if payload.len() > server_info.max_payload as usize {
357                return Either::A(future::err(NatsError::MaxPayloadOverflow(server_info.max_payload)));
358            }
359        }
360
361        let inbox = PubCommand::generate_reply_to();
362        let pub_cmd = PubCommand {
363            subject,
364            payload,
365            reply_to: Some(inbox.clone()),
366        };
367
368        let sub_cmd = SubCommand {
369            queue_group: None,
370            sid: SubCommand::generate_sid(),
371            subject: inbox,
372        };
373
374        let sid = sub_cmd.sid.clone();
375
376        let unsub_cmd = UnsubCommand {
377            sid: sub_cmd.sid.clone(),
378            max_msgs: Some(1),
379        };
380
381        let tx1 = self.tx.clone();
382        let tx2 = self.tx.clone();
383        let rx_arc = Arc::clone(&self.rx);
384
385        let stream = self
386            .rx
387            .for_sid(sid.clone())
388            .inspect(|msg| debug!(target: "nitox", "Request saw msg in multiplexed stream {:#?}", msg))
389            .take(1)
390            .into_future()
391            .map(|(surely_message, _)| surely_message.unwrap())
392            .map_err(|(e, _)| e)
393            .and_then(move |msg| {
394                rx_arc.remove_sid(&sid);
395                future::ok(msg)
396            });
397
398        Either::B(
399            self.tx
400                .send(Op::SUB(sub_cmd))
401                .and_then(move |_| tx1.send(Op::UNSUB(unsub_cmd)))
402                .and_then(move |_| tx2.send(Op::PUB(pub_cmd)))
403                .and_then(move |_| stream),
404        )
405    }
406}