nsq_client/
producer.rs

1// MIT License
2// 
3// Copyright (c) 2019-2021 Alessandro Cresto Miseroglio <alex179ohm@gmail.com>
4// Copyright (c) 2019-2021 Tangram Technologies S.R.L. <https://tngrm.io>
5// 
6// Permission is hereby granted, free of charge, to any person obtaining a copy
7// of this software and associated documentation files (the "Software"), to deal
8// in the Software without restriction, including without limitation the rights
9// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10// copies of the Software, and to permit persons to whom the Software is
11// furnished to do so, subject to the following conditions:
12// 
13// The above copyright notice and this permission notice shall be included in all
14// copies or substantial portions of the Software.
15// 
16// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22// SOFTWARE.
23
24use std::io;
25use std::collections::VecDeque;
26
27use actix::actors::resolver::{Connect, Resolver};
28use actix::prelude::*;
29use futures::unsync::oneshot;
30use futures::Future;
31use backoff::backoff::Backoff;
32use backoff::ExponentialBackoff;
33use log::{error, info, debug};
34use serde_json;
35use tokio_codec::FramedRead;
36use tokio_io::io::WriteHalf;
37use tokio_io::AsyncRead;
38use tokio_tcp::TcpStream;
39//use bytes::BytesMut;
40
41use crate::codec::{NsqCodec, Cmd};
42use crate::commands::{identify, nop, auth, sub, rdy, publish, VERSION};
43use crate::config::{Config, NsqdConfig};
44use crate::error::Error;
45use crate::msgs::{Auth, Pub, Sub, Ready};
46use crate::conn::ConnState;
47
48pub struct Producer
49{
50    topic: String,
51    channel: String,
52    addr: String,
53    config: Config,
54    backoff: ExponentialBackoff,
55    cell: Option<actix::io::FramedWrite<WriteHalf<TcpStream>, NsqCodec>>,
56    state: ConnState,
57    queue: VecDeque<oneshot::Sender<Result<Cmd, Error>>>,
58    auth: String,
59//    rdy: u32,
60}
61
62impl Default for Producer
63{
64    fn default() -> Producer {
65        Producer {
66            topic: String::new(),
67            channel: String::new(),
68            addr: String::new(),
69            config: Config::default(),
70            backoff: ExponentialBackoff::default(),
71            cell: None,
72            state: ConnState::Neg,
73            queue: VecDeque::new(),
74            auth: String::new(),
75 //           rdy: 0,
76        }
77    }
78}
79
80impl Producer
81{
82    pub fn new<S: Into<String>>(
83        topic: S,
84        channel: S,
85        addr: S,
86        config: Option<Config>,
87        auth: S,
88 //       rdy: Option<u32>,
89    ) -> Producer {
90        let mut backoff = ExponentialBackoff::default();
91        let mut _rdy = 0;
92        //if rdy.is_some() { _rdy = rdy.unwrap() };
93        let cfg = match config {
94            Some(cfg) => cfg,
95            None => Config::default(),
96        };
97        backoff.max_elapsed_time = None;
98        Producer {
99            addr: addr.into(),
100            config: cfg,
101            backoff,
102            cell: None,
103            topic: topic.into(),
104            channel: channel.into(),
105            state: ConnState::Neg,
106            queue: VecDeque::new(),
107            auth: auth.into(),
108  //          rdy: _rdy,
109        }
110    }
111}
112
113impl Actor for Producer
114{
115    type Context = Context<Self>;
116
117    fn started(&mut self, ctx: &mut Context<Self>) {
118        Resolver::from_registry()
119            .send(Connect::host(self.addr.as_str()))
120            .into_actor(self)
121            .map(|res, act, ctx| match res {
122                Ok(stream) => {
123                    info!("Connected to nsqd: {}", act.addr);
124
125                    let (r, w) = stream.split();
126
127                    // write connection
128                    let mut framed =
129                        actix::io::FramedWrite::new(w, NsqCodec{}, ctx);
130                    let mut rx = FramedRead::new(r, NsqCodec{});
131
132                    // send magic version
133                    framed.write(Cmd::Magic(VERSION));
134                    // send configuration to nsqd
135                    let json = match serde_json::to_string(&act.config) {
136                        Ok(s) => s,
137                        Err(e) => {
138                            error!("Config cannot be formatted as json string: {}", e);
139                            return ctx.stop();
140                        }
141                    };
142                    // read connection
143                    ctx.add_stream(rx);
144                    framed.write(identify(json));
145                    act.cell = Some(framed);
146
147                    // reset backoff
148                    act.backoff.reset();
149                }
150                Err(err) => {
151                    error!("Can not connect to nsqd: {}", err);
152                    if let Some(timeout) = act.backoff.next_backoff() {
153                        ctx.run_later(timeout, |_, ctx| ctx.stop());
154                    }
155                }
156            })
157            .map_err(|err, act, ctx| {
158                error!("Can not connect to nsqd: {}", err);
159                if let Some(timeout) = act.backoff.next_backoff() {
160                    ctx.run_later(timeout, |_, ctx| ctx.stop());
161                }
162            })
163            .wait(ctx);
164    }
165}
166
167impl actix::io::WriteHandler<io::Error> for Producer
168{
169    fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running {
170        error!("nsqd connection dropped: {} error: {}", self.addr, err);
171        Running::Stop
172    }
173}
174
175// TODO: Implement error
176impl StreamHandler<Cmd, Error> for Producer
177{
178    fn error(&mut self, err: Error, _ctx: &mut Self::Context) -> Running {
179        match err {
180            Error::Remote(err) => {
181                if let Some(tx) = self.queue.pop_front() {
182                    let _ = tx.send(Err(Error::Remote(err)));
183                }
184                return Running::Continue
185            },
186            _ => {
187                error!("Something goes wrong decoding message");
188            },
189        };
190        Running::Stop
191    }
192
193    fn handle(&mut self, msg: Cmd, ctx: &mut Self::Context)
194    {
195        match msg {
196            Cmd::Heartbeat => {
197                debug!("received heartbeat");
198                if let Some(ref mut cell) = self.cell {
199                    cell.write(nop());
200                } else {
201                    error!("Nsqd connection dropped. trying reconnecting");
202                    ctx.stop();
203                }
204            }
205            Cmd::Response(s) => {
206                match self.state {
207                    ConnState::Neg => {
208                        let config: NsqdConfig = match serde_json::from_str(s.as_str()) {
209                            Ok(s) => s,
210                            Err(err) => {
211                                error!("Negotiating json response invalid: {:?}", err);
212                                return ctx.stop();
213                            }
214                        };
215                        debug!("json response: {:?}", config);
216                        if config.auth_required {
217                            ctx.notify(Auth);
218                        } else {
219                            //ctx.notify(Sub);
220                            self.state = ConnState::Started;
221                        }
222                    },
223                    ConnState::Sub => {
224                        ctx.notify(Sub);
225                    },
226                    ConnState::Ready => {
227                        debug!("sub response: {}", s);
228                        ctx.notify(Ready(0));
229                    }
230                    _ => {
231                        debug!("response: {}", s);
232                        if let Some(tx) = self.queue.pop_front() {
233                            let _ = tx.send(Ok(Cmd::Response(s)));
234                        }
235                    },
236                }
237            },
238            _ => {},
239        }
240    }
241}
242
243impl Handler<Auth> for Producer
244{
245    type Result = ();
246    fn handle(&mut self, _msg: Auth, ctx: &mut Self::Context) {
247        if let Some(ref mut cell) = self.cell {
248            cell.write(auth(self.auth.clone()));
249        } else {
250            error!("Unable to identify nsqd connection dropped");
251            ctx.stop();
252        }
253        self.state = ConnState::Sub;
254    }
255
256}
257
258impl Handler<Sub> for Producer
259{
260    type Result = ();
261
262    fn handle(&mut self, _msg: Sub, _ctx: &mut Self::Context) {
263        if let Some(ref mut cell) = self.cell {
264            let topic = self.topic.clone();
265            let channel = self.channel.clone();
266            cell.write(sub(topic.as_str(), channel.as_str()));
267        }
268        self.state = ConnState::Ready
269    }
270}
271
272impl Handler<Ready> for Producer
273{
274    type Result = ();
275
276    fn handle(&mut self, msg: Ready, _ctx: &mut Self::Context) {
277        if let Some(ref mut cell) = self.cell {
278            cell.write(rdy(msg.0));
279        }
280        if self.state != ConnState::Started { self.state = ConnState::Started }
281    }
282}
283
284
285impl Handler<Pub> for Producer
286{
287    type Result = ResponseFuture<Cmd, Error>;
288
289    fn handle(&mut self, msg: Pub, _ctx: &mut Self::Context) -> Self::Result {
290        let (tx, rx) = oneshot::channel();
291        if let Some(ref mut cell) = self.cell {
292            self.queue.push_back(tx);
293            let topic = self.topic.clone();
294            let m = &msg.0.clone();
295            println!("publish: {}", m);
296            cell.write(publish(topic.as_str(), &msg.0));
297        } else {
298            let _ = tx.send(Err(Error::NotConnected));
299        }
300        Box::new(rx.map_err(|_| Error::Disconnected).and_then(|res| res))
301    }
302}
303
304
305impl Supervised for Producer
306{
307    fn restarting(&mut self, _: &mut Self::Context) {}
308}