nsq_client/
conn.rs

1// MIT License
2//
3// Copyright (c) 2019-2021 Alessandro Cresto Miseroglio <alex179ohm@gmail.com>
4// Copyright (c) 2019-2021 Tangram Technologies S.R.L. <https://tngrm.io>
5//
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to deal
8// in the Software without restriction, including without limitation the rights
9// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10// copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12//
13// The above copyright notice and this permission notice shall be included in all
14// copies or substantial portions of the Software.
15//
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22// SOFTWARE.
23
24use std::io;
25use std::any::{Any, TypeId};
26
27use actix::actors::resolver::{Connect, Resolver};
28use actix::prelude::*;
29use backoff::backoff::Backoff as TcpBackoff;
30use backoff::ExponentialBackoff;
31use log::{error, info};
32use serde_json;
33use tokio_codec::FramedRead;
34use tokio_io::io::WriteHalf;
35use tokio_io::AsyncRead;
36use tokio_tcp::TcpStream;
37use futures::stream::once;
38use fnv::FnvHashMap;
39
40use crate::codec::{NsqCodec, Cmd};
41use crate::commands::{identify, nop, rdy, sub, fin, auth, VERSION};
42use crate::config::{Config, NsqdConfig};
43use crate::error::Error;
44use crate::msgs::{
45    Auth, OnAuth, Sub, Ready, Cls,
46    Resume, Backoff, Fin, Msg,
47    NsqMsg, AddHandler, InFlight, OnIdentify, OnClose, OnBackoff, OnResume};
48use crate::auth::AuthResp;
49
/// Message asking a [`Connection`] to open its TCP connection to nsqd.
///
/// The wrapped `String` is the host address that the `TcpConnect` handler
/// passes to the actix resolver.
#[derive(Message, Clone)]
pub struct TcpConnect(pub String);
52
/// Protocol state of a [`Connection`].
///
/// The state decides how the next response frame from nsqd is interpreted
/// and which commands the message handlers are allowed to send.
#[derive(Debug, PartialEq)]
pub enum ConnState {
    /// Initial state, also set right after the TCP connection is established;
    /// the next response is parsed as the IDENTIFY (negotiation) answer.
    Neg,
    /// Set once the auth command has been written; the next response is
    /// parsed as the authentication answer.
    Auth,
    /// A response received in this state triggers a subscription.
    /// NOTE(review): no code in this file assigns this state — confirm it is still needed.
    Sub,
    /// Set once the subscribe command has been written; the next response
    /// triggers the initial RDY.
    Ready,
    /// Set by the `Ready` handler after RDY has been sent.
    Started,
    /// Set by the `Backoff` handler after RDY 0 has been written.
    Backoff,
    /// Set by the `Resume` handler after RDY 1 has been written; the next
    /// `Fin` restores the configured RDY.
    Resume,
    /// Set by the `Cls` handler; the next response or error frame is treated
    /// as the answer to the close request.
    Closing,
    /// Set when a response arrives while `Closing`; `restarting` stops the
    /// actor when it sees this state.
    Stopped,
}
65
/// Tcp Connection to NSQ system.
///
/// Tries to connect to nsqd as soon as the actor is started.
///
/// # Examples
/// ```no-run
/// use actix::prelude::*;
/// use nsq_client::Connection;
///
/// fn main() {
///     let sys = System::new("consumer");
///     Supervisor::start(|_| Connection::new(
///         "test", // <- topic
///         "test", // <- channel
///         "0.0.0.0:4150", // <- nsqd tcp address
///         None, // <- config (Optional)
///         None, // <- secret used by Auth
///         Some(1) // <- RDY setting for the Connection
///     ));
///     sys.run();
/// }
/// ```
pub struct Connection
{
    // nsqd TCP address, resolved when the actor starts.
    addr: String,
    // Type-erased `Recipient<Msg>` readers; incoming messages are handed to
    // them one at a time using `handler_ready` as the cursor.
    handlers: Vec<Box<Any>>,
    // Type-erased info recipients, keyed by the `TypeId` of `Recipient<M>`.
    info_hashmap: FnvHashMap<TypeId, Box<Any>>,
    // Topic used by the subscribe command.
    topic: String,
    // Channel used by the subscribe command.
    channel: String,
    // Client configuration, serialized to JSON and sent with the identify command.
    config: Config,
    // Secret sent with the auth command.
    secret: String,
    // Delay policy applied before the actor stops after a failed connection attempt.
    tcp_backoff: ExponentialBackoff,
    // Delay policy between a `Backoff` message and the following `Resume`.
    backoff: ExponentialBackoff,
    // Framed write half of the TCP stream; `None` until a connection is established.
    cell: Option<actix::io::FramedWrite<WriteHalf<TcpStream>, NsqCodec>>,
    // Current protocol state.
    state: ConnState,
    // RDY count sent to nsqd once the subscription is acknowledged.
    rdy: u32,
    // Number of messages handed to readers and not yet finished with `Fin`.
    in_flight: u32,
    // Cursor into `handlers` selecting the reader for the next message.
    handler_ready: usize,
}
105
106impl Default for Connection
107{
108    fn default() -> Connection {
109        Connection {
110            handlers: Vec::new(),
111            info_hashmap: FnvHashMap::default(),
112            topic: String::new(),
113            channel: String::new(),
114            config: Config::default(),
115            secret: String::new(),
116            tcp_backoff: ExponentialBackoff::default(),
117            backoff: ExponentialBackoff::default(),
118            cell: None,
119            state: ConnState::Neg,
120            addr: String::new(),
121            rdy: 1,
122            in_flight: 0,
123            handler_ready: 0,
124        }
125    }
126}
127
128impl Connection
129{
130    /// Return a Tcp Connection to nsqd.
131    ///
132    /// * `topic`    - Topic String
133    /// * `channel`  - Channel String
134    /// * `addr`     - Tcp address of nsqd
135    /// * `config`   - Optional [`Config`]
136    /// * `secret`   - Optional String used to autenticate to nsqd
137    /// * `rdy`      - Optional initial RDY setting
138    pub fn new<S: Into<String>>(
139        topic: S,
140        channel: S,
141        addr: S,
142        config: Option<Config>,
143        secret: Option<String>,
144        rdy: Option<u32>) -> Connection
145    {
146        let mut tcp_backoff = ExponentialBackoff::default();
147        let backoff = ExponentialBackoff::default();
148        let cfg = match config {
149            Some(cfg) => cfg,
150            None => Config::default(),
151        };
152        let mut scrt = String::new();
153        if let Some(sec) = secret {
154            scrt = sec;
155        }
156        let rdy = match rdy {
157            Some(r) => r,
158            None => 1,
159        };
160        tcp_backoff.max_elapsed_time = None;
161        Connection {
162            config: cfg,
163            secret: scrt,
164            tcp_backoff,
165            backoff,
166            cell: None,
167            topic: topic.into(),
168            channel: channel.into(),
169            state: ConnState::Neg,
170            handlers: Vec::new(),
171            info_hashmap: FnvHashMap::default(),
172            addr: addr.into(),
173            rdy: rdy,
174            in_flight: 0,
175            handler_ready: 0,
176        }
177    }
178}
179
180impl Connection {
181
182    fn info_in_flight(&self, n: u32) {
183        if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<InFlight>>()) {
184            if let Some(handler) = box_handler.downcast_ref::<Recipient<InFlight>>() {
185                match handler.do_send(InFlight(n)) {
186                    Ok(_) => {},
187                    Err(e) => {
188                        error!("sending InFlight: {}", e)
189                    }
190                }
191            }
192        }
193    }
194
195    fn info_on_auth(&self, resp: AuthResp) {
196        if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<OnAuth>>()) {
197            if let Some(handler) = box_handler.downcast_ref::<Recipient<OnAuth>>() {
198                match handler.do_send(OnAuth(resp)) {
199                    Ok(_) => {},
200                    Err(e) => {
201                        error!("sending OnAuth: {}", e);
202                    }
203                }
204            }
205        }
206    }
207
208    fn info_on_identify(&self, resp: NsqdConfig) {
209        if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<OnIdentify>>()) {
210            if let Some(handler) = box_handler.downcast_ref::<Recipient<OnIdentify>>() {
211                match handler.do_send(OnIdentify(resp)) {
212                    Ok(_) => {},
213                    Err(e) => {
214                        error!("sending OnIdentify: {}", e);
215                    }
216                }
217            }
218        }
219    }
220
221    fn info_on_close(&self, resp: bool) {
222        if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<OnClose>>()) {
223            if let Some(handler) = box_handler.downcast_ref::<Recipient<OnClose>>() {
224                match handler.do_send(OnClose(resp)) {
225                    Ok(_) => {},
226                    Err(e) => {
227                        error!("sending OnClose: {}", e);
228                    }
229                }
230            }
231        }
232    }
233
234    fn info_on_backoff(&self) {
235        if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<OnBackoff>>()) {
236            if let Some(handler) = box_handler.downcast_ref::<Recipient<OnBackoff>>() {
237                match handler.do_send(OnBackoff) {
238                    Ok(_) => {},
239                    Err(e) => {
240                        error!("sending OnBackoff: {}", e);
241                    }
242                }
243            }
244        }
245    }
246
247    fn info_on_resume(&self) {
248        if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<OnResume>>()) {
249            if let Some(handler) = box_handler.downcast_ref::<Recipient<OnResume>>() {
250                match handler.do_send(OnResume) {
251                    Ok(_) => {},
252                    Err(e) => {
253                        error!("sending OnBackoff: {}", e);
254                    }
255                }
256            }
257        }
258    }
259}
260
261impl Actor for Connection
262{
263    type Context = Context<Self>;
264
265    fn started(&mut self, ctx: &mut Context<Self>) {
266        info!("trying to connect [{}]", self.addr);
267        self.handler_ready = self.handlers.len();
268        ctx.add_message_stream(once(Ok(TcpConnect(self.addr.to_owned()))));
269    }
270}
271
272impl actix::io::WriteHandler<io::Error> for Connection
273{
274    fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running {
275        error!("nsqd connection dropped: {}", err);
276        Running::Stop
277    }
278}
279
280// TODO: implement error
281impl StreamHandler<Cmd, Error> for Connection
282{
283
284    fn finished(&mut self, ctx: &mut Self::Context) {
285        error!("Nsqd connection dropped");
286        ctx.stop();
287    }
288
289    fn error(&mut self, err: Error, _ctx: &mut Self::Context) -> Running {
290        error!("Something goes wrong decoding message: {}", err);
291        Running::Stop
292    }
293
294    fn handle(&mut self, msg: Cmd, ctx: &mut Self::Context)
295    {
296        match msg {
297            Cmd::Heartbeat => {
298                if let Some(ref mut cell) = self.cell {
299                    cell.write(nop());
300                } else {
301                    error!("Nsqd connection dropped. trying reconnecting");
302                    ctx.stop();
303                }
304            }
305            Cmd::Response(s) => {
306                match self.state {
307                    ConnState::Neg => {
308                        info!("trying negotiation [{}]", self.addr);
309                        let config: NsqdConfig = match serde_json::from_str(s.as_str()) {
310                            Ok(s) => { s },
311                            Err(err) => {
312                                error!("Negotiating json response invalid: {:?}", err);
313                                return ctx.stop();
314                            }
315                        };
316                        info!("configuration [{}] {:#?}", self.addr, config);
317                        self.info_on_identify(config.clone());
318                        if config.auth_required {
319                            info!("trying authentication [{}]", self.addr);
320                            ctx.notify(Auth);
321                        } else {
322                            info!("subscribing [{}] topic: {} channel: {}", self.addr, self.topic, self.channel);
323                            ctx.notify(Sub);
324                        }
325                    },
326                    ConnState::Auth => {
327                        let auth_resp: AuthResp = match serde_json::from_str(s.as_str()) {
328                            Ok(s) => { s },
329                            Err(err) => {
330                                error!("Auth json response invalid: {:?}", err);
331                                return ctx.stop();
332                            }
333                        };
334                        info!("authenticated [{}] {:#?}", self.addr, auth_resp);
335                        self.info_on_auth(auth_resp);
336                        ctx.notify(Sub);
337                    },
338                    ConnState::Sub => {
339                        ctx.notify(Sub);
340                    },
341                    ConnState::Ready => {
342                        ctx.notify(Ready(self.rdy));
343                    },
344                    ConnState::Closing => {
345                        self.info_on_close(true);
346                        self.state = ConnState::Stopped;
347                    },
348                    _ => {},
349                }
350            }
351            // TODO: implement msg_queue and tumable RDY for fast processing multiple msgs
352            Cmd::ResponseMsg(msgs) => {
353                //let mut count = self.rdy;
354                for (timestamp, attemps, id, body) in msgs {
355                    if self.handler_ready > 0 { self.handler_ready -= 1 };
356                    if let Some(handler) = self.handlers.get(self.handler_ready) {
357                        if let Some(rec) = handler.downcast_ref::<Recipient<Msg>>() {
358                            match rec.do_send(Msg{
359                                timestamp, attemps, id, body,
360                            }) {
361                                Ok(_s) => {
362                                    self.in_flight += 1;
363                                    self.info_in_flight(self.in_flight);
364                                },
365                                Err(e) => { error!("error sending msg to reader: {}", e) }
366                            }
367
368                        }
369                    }
370                    if self.handler_ready == 0 { self.handler_ready = self.handlers.len() }
371                }
372            },
373            Cmd::ResponseError(s) => {
374                if self.state == ConnState::Closing {
375                    error!("Closing connection: {}", s);
376                    self.info_on_close(false);
377                    self.state = ConnState::Started;
378                }
379                error!("failed: {}", s);
380            }
381            Cmd::Command(_) => {
382                if let Some(ref mut cell) = self.cell {
383                    cell.write(rdy(1));
384                }
385            }
386            _ => {},
387        }
388    }
389}
390
391impl Handler<TcpConnect> for Connection
392{
393    type Result=();
394    fn handle(&mut self, msg:TcpConnect, ctx: &mut Self::Context) {
395        Resolver::from_registry()
396            .send(Connect::host(msg.0.as_str()))
397            .into_actor(self)
398            .map(move |res, act, ctx| match res {
399                Ok(stream) => {
400                    info!("connected [{}]", msg.0);
401                    //stream.set_recv_buffer_size(act.config.output_buffer_size as usize);
402
403                    let (r, w) = stream.split();
404
405                    // configure write side of the connection
406                    let mut framed =
407                        actix::io::FramedWrite::new(w, NsqCodec{}, ctx);
408                    let mut rx = FramedRead::new(r, NsqCodec{});
409                    framed.write(Cmd::Magic(VERSION));
410                    // send configuration to nsqd
411                    let json = match serde_json::to_string(&act.config) {
412                        Ok(s) => s,
413                        Err(e) => {
414                            error!("config cannot be formatted as json string: {}", e);
415                            return ctx.stop();
416                        }
417                    };
418                    // read connection
419                    ctx.add_stream(rx);
420                    framed.write(identify(json));
421                    act.cell = Some(framed);
422
423                    act.backoff.reset();
424                    act.state = ConnState::Neg;
425                }
426                Err(err) => {
427                    error!("can not connect [{}]", err);
428                    // re-connect with backoff time.
429                    // we stop current context, supervisor will restart it.
430                    if let Some(timeout) = act.tcp_backoff.next_backoff() {
431                        ctx.run_later(timeout, |_, ctx| ctx.stop());
432                    }
433                }
434            })
435            .map_err(|err, act, ctx| {
436                error!("can not connect [{}]", err);
437                // re-connect with backoff time.
438                // we stop current context, supervisor will restart it.
439                if let Some(timeout) = act.tcp_backoff.next_backoff() {
440                    ctx.run_later(timeout, |_, ctx| ctx.stop());
441                }
442            })
443            .wait(ctx);
444    }
445}
446
447impl Handler<Cls> for Connection {
448    type Result=();
449    fn handle(&mut self, _msg: Cls, ctx: &mut Self::Context) {
450        self.state = ConnState::Closing;
451        ctx.stop();
452    }
453}
454
455impl Handler<Fin> for Connection
456{
457    type Result = ();
458    fn handle(&mut self, msg: Fin, ctx: &mut Self::Context) {
459        // discard the in_flight messages
460        if let Some(ref mut cell) = self.cell {
461            cell.write(fin(&msg.0));
462        }
463        if self.state == ConnState::Resume {
464            ctx.notify(Ready(self.rdy));
465            self.state = ConnState::Started;
466        }
467        self.in_flight -= 1;
468        self.info_in_flight(self.in_flight);
469    }
470}
471
472impl Handler<Ready> for Connection
473{
474    type Result = ();
475
476    fn handle(&mut self, msg: Ready, _ctx: &mut Self::Context) {
477        if self.state != ConnState::Ready {
478            self.rdy = msg.0;
479            return
480        }
481        if let Some(ref mut cell) = self.cell {
482            cell.write(rdy(msg.0));
483        }
484        if self.state == ConnState::Started {
485            self.rdy = msg.0;
486            info!("rdy updated [{}]", self.addr);
487
488        } else { self.state = ConnState::Started; info!("Ready to go [{}] RDY: {}", self.addr, msg.0); }
489    }
490}
491
492
493impl Handler<Auth> for Connection
494{
495    type Result = ();
496    fn handle(&mut self, _msg: Auth, ctx: &mut Self::Context) {
497        if let Some(ref mut cell) = self.cell {
498            cell.write(auth(self.secret.clone()));
499        } else {
500            error!("unable to identify: connection dropped [{}]", self.addr);
501            ctx.stop();
502        }
503        self.state = ConnState::Auth;
504    }
505}
506
507impl Handler<Sub> for Connection
508{
509    type Result = ();
510    fn handle(&mut self, _msg: Sub, ctx: &mut Self::Context) {
511        if let Some(ref mut cell) = self.cell {
512            cell.write(sub(&self.topic, &self.channel));
513        } else {
514            error!("unable to subscribing: connection dropped [{}]", self.addr);
515            ctx.stop();
516        }
517        self.state = ConnState::Ready;
518        info!("subscribed [{}] topic: {} channel: {}", self.addr, self.topic, self.channel);
519    }
520}
521
522impl Handler<Backoff> for Connection
523{
524    type Result=();
525    fn handle(&mut self, _msg: Backoff, ctx: &mut Self::Context) {
526        if let Some(timeout) = self.backoff.next_backoff() {
527            if let Some(ref mut cell) = self.cell {
528                cell.write(rdy(0));
529                ctx.run_later(timeout, |_, ctx| ctx.notify(Resume));
530                self.state = ConnState::Backoff;
531            } else {
532                error!("backoff failed: connection dropped [{}]", self.addr);
533                Self::add_stream(once::<Cmd, Error>(Err(Error::NotConnected)), ctx);
534            }
535            self.info_in_flight(0);
536            self.info_on_backoff();
537        }
538    }
539}
540
541impl Handler<Resume> for Connection
542{
543    type Result=();
544    fn handle(&mut self, _msg: Resume, ctx: &mut Self::Context) {
545        if let Some(ref mut cell) = self.cell {
546            cell.write(rdy(1));
547            self.state = ConnState::Resume;
548        } else {
549            error!("resume failed: connection dropped [{}]", self.addr);
550            Self::add_stream(once::<Cmd, Error>(Err(Error::NotConnected)), ctx);
551        }
552        self.info_in_flight(1);
553        self.info_on_resume();
554    }
555}
556
557impl<M: NsqMsg> Handler<AddHandler<M>> for Connection
558{
559    type Result=();
560    fn handle(&mut self, msg: AddHandler<M>, _: &mut Self::Context) {
561        let msg_id = TypeId::of::<Recipient<M>>();
562        if msg_id == TypeId::of::<Recipient<Msg>>() {
563            self.handlers.push(Box::new(msg.0));
564            info!("Reader added");
565        } else {
566            self.info_hashmap.insert(msg_id, Box::new(msg.0));
567            info!("info handler added");
568        }
569    }
570}
571
572impl Supervised for Connection
573{
574    fn restarting(&mut self, ctx: &mut Self::Context) {
575        if self.state == ConnState::Stopped {
576            ctx.stop();
577        }
578    }
579}