1use std::io;
25use std::any::{Any, TypeId};
26
27use actix::actors::resolver::{Connect, Resolver};
28use actix::prelude::*;
29use backoff::backoff::Backoff as TcpBackoff;
30use backoff::ExponentialBackoff;
31use log::{error, info};
32use serde_json;
33use tokio_codec::FramedRead;
34use tokio_io::io::WriteHalf;
35use tokio_io::AsyncRead;
36use tokio_tcp::TcpStream;
37use futures::stream::once;
38use fnv::FnvHashMap;
39
40use crate::codec::{NsqCodec, Cmd};
41use crate::commands::{identify, nop, rdy, sub, fin, auth, VERSION};
42use crate::config::{Config, NsqdConfig};
43use crate::error::Error;
44use crate::msgs::{
45 Auth, OnAuth, Sub, Ready, Cls,
46 Resume, Backoff, Fin, Msg,
47 NsqMsg, AddHandler, InFlight, OnIdentify, OnClose, OnBackoff, OnResume};
48use crate::auth::AuthResp;
49
50#[derive(Message, Clone)]
51pub struct TcpConnect(pub String);
52
53#[derive(Debug, PartialEq)]
54pub enum ConnState {
55 Neg,
56 Auth,
57 Sub,
58 Ready,
59 Started,
60 Backoff,
61 Resume,
62 Closing,
63 Stopped,
64}
65
66pub struct Connection
89{
90 addr: String,
91 handlers: Vec<Box<Any>>,
92 info_hashmap: FnvHashMap<TypeId, Box<Any>>,
93 topic: String,
94 channel: String,
95 config: Config,
96 secret: String,
97 tcp_backoff: ExponentialBackoff,
98 backoff: ExponentialBackoff,
99 cell: Option<actix::io::FramedWrite<WriteHalf<TcpStream>, NsqCodec>>,
100 state: ConnState,
101 rdy: u32,
102 in_flight: u32,
103 handler_ready: usize,
104}
105
106impl Default for Connection
107{
108 fn default() -> Connection {
109 Connection {
110 handlers: Vec::new(),
111 info_hashmap: FnvHashMap::default(),
112 topic: String::new(),
113 channel: String::new(),
114 config: Config::default(),
115 secret: String::new(),
116 tcp_backoff: ExponentialBackoff::default(),
117 backoff: ExponentialBackoff::default(),
118 cell: None,
119 state: ConnState::Neg,
120 addr: String::new(),
121 rdy: 1,
122 in_flight: 0,
123 handler_ready: 0,
124 }
125 }
126}
127
128impl Connection
129{
130 pub fn new<S: Into<String>>(
139 topic: S,
140 channel: S,
141 addr: S,
142 config: Option<Config>,
143 secret: Option<String>,
144 rdy: Option<u32>) -> Connection
145 {
146 let mut tcp_backoff = ExponentialBackoff::default();
147 let backoff = ExponentialBackoff::default();
148 let cfg = match config {
149 Some(cfg) => cfg,
150 None => Config::default(),
151 };
152 let mut scrt = String::new();
153 if let Some(sec) = secret {
154 scrt = sec;
155 }
156 let rdy = match rdy {
157 Some(r) => r,
158 None => 1,
159 };
160 tcp_backoff.max_elapsed_time = None;
161 Connection {
162 config: cfg,
163 secret: scrt,
164 tcp_backoff,
165 backoff,
166 cell: None,
167 topic: topic.into(),
168 channel: channel.into(),
169 state: ConnState::Neg,
170 handlers: Vec::new(),
171 info_hashmap: FnvHashMap::default(),
172 addr: addr.into(),
173 rdy: rdy,
174 in_flight: 0,
175 handler_ready: 0,
176 }
177 }
178}
179
180impl Connection {
181
182 fn info_in_flight(&self, n: u32) {
183 if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<InFlight>>()) {
184 if let Some(handler) = box_handler.downcast_ref::<Recipient<InFlight>>() {
185 match handler.do_send(InFlight(n)) {
186 Ok(_) => {},
187 Err(e) => {
188 error!("sending InFlight: {}", e)
189 }
190 }
191 }
192 }
193 }
194
195 fn info_on_auth(&self, resp: AuthResp) {
196 if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<OnAuth>>()) {
197 if let Some(handler) = box_handler.downcast_ref::<Recipient<OnAuth>>() {
198 match handler.do_send(OnAuth(resp)) {
199 Ok(_) => {},
200 Err(e) => {
201 error!("sending OnAuth: {}", e);
202 }
203 }
204 }
205 }
206 }
207
208 fn info_on_identify(&self, resp: NsqdConfig) {
209 if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<OnIdentify>>()) {
210 if let Some(handler) = box_handler.downcast_ref::<Recipient<OnIdentify>>() {
211 match handler.do_send(OnIdentify(resp)) {
212 Ok(_) => {},
213 Err(e) => {
214 error!("sending OnIdentify: {}", e);
215 }
216 }
217 }
218 }
219 }
220
221 fn info_on_close(&self, resp: bool) {
222 if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<OnClose>>()) {
223 if let Some(handler) = box_handler.downcast_ref::<Recipient<OnClose>>() {
224 match handler.do_send(OnClose(resp)) {
225 Ok(_) => {},
226 Err(e) => {
227 error!("sending OnClose: {}", e);
228 }
229 }
230 }
231 }
232 }
233
234 fn info_on_backoff(&self) {
235 if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<OnBackoff>>()) {
236 if let Some(handler) = box_handler.downcast_ref::<Recipient<OnBackoff>>() {
237 match handler.do_send(OnBackoff) {
238 Ok(_) => {},
239 Err(e) => {
240 error!("sending OnBackoff: {}", e);
241 }
242 }
243 }
244 }
245 }
246
247 fn info_on_resume(&self) {
248 if let Some(box_handler) = self.info_hashmap.get(&TypeId::of::<Recipient<OnResume>>()) {
249 if let Some(handler) = box_handler.downcast_ref::<Recipient<OnResume>>() {
250 match handler.do_send(OnResume) {
251 Ok(_) => {},
252 Err(e) => {
253 error!("sending OnBackoff: {}", e);
254 }
255 }
256 }
257 }
258 }
259}
260
261impl Actor for Connection
262{
263 type Context = Context<Self>;
264
265 fn started(&mut self, ctx: &mut Context<Self>) {
266 info!("trying to connect [{}]", self.addr);
267 self.handler_ready = self.handlers.len();
268 ctx.add_message_stream(once(Ok(TcpConnect(self.addr.to_owned()))));
269 }
270}
271
272impl actix::io::WriteHandler<io::Error> for Connection
273{
274 fn error(&mut self, err: io::Error, _: &mut Self::Context) -> Running {
275 error!("nsqd connection dropped: {}", err);
276 Running::Stop
277 }
278}
279
280impl StreamHandler<Cmd, Error> for Connection
282{
283
284 fn finished(&mut self, ctx: &mut Self::Context) {
285 error!("Nsqd connection dropped");
286 ctx.stop();
287 }
288
289 fn error(&mut self, err: Error, _ctx: &mut Self::Context) -> Running {
290 error!("Something goes wrong decoding message: {}", err);
291 Running::Stop
292 }
293
294 fn handle(&mut self, msg: Cmd, ctx: &mut Self::Context)
295 {
296 match msg {
297 Cmd::Heartbeat => {
298 if let Some(ref mut cell) = self.cell {
299 cell.write(nop());
300 } else {
301 error!("Nsqd connection dropped. trying reconnecting");
302 ctx.stop();
303 }
304 }
305 Cmd::Response(s) => {
306 match self.state {
307 ConnState::Neg => {
308 info!("trying negotiation [{}]", self.addr);
309 let config: NsqdConfig = match serde_json::from_str(s.as_str()) {
310 Ok(s) => { s },
311 Err(err) => {
312 error!("Negotiating json response invalid: {:?}", err);
313 return ctx.stop();
314 }
315 };
316 info!("configuration [{}] {:#?}", self.addr, config);
317 self.info_on_identify(config.clone());
318 if config.auth_required {
319 info!("trying authentication [{}]", self.addr);
320 ctx.notify(Auth);
321 } else {
322 info!("subscribing [{}] topic: {} channel: {}", self.addr, self.topic, self.channel);
323 ctx.notify(Sub);
324 }
325 },
326 ConnState::Auth => {
327 let auth_resp: AuthResp = match serde_json::from_str(s.as_str()) {
328 Ok(s) => { s },
329 Err(err) => {
330 error!("Auth json response invalid: {:?}", err);
331 return ctx.stop();
332 }
333 };
334 info!("authenticated [{}] {:#?}", self.addr, auth_resp);
335 self.info_on_auth(auth_resp);
336 ctx.notify(Sub);
337 },
338 ConnState::Sub => {
339 ctx.notify(Sub);
340 },
341 ConnState::Ready => {
342 ctx.notify(Ready(self.rdy));
343 },
344 ConnState::Closing => {
345 self.info_on_close(true);
346 self.state = ConnState::Stopped;
347 },
348 _ => {},
349 }
350 }
351 Cmd::ResponseMsg(msgs) => {
353 for (timestamp, attemps, id, body) in msgs {
355 if self.handler_ready > 0 { self.handler_ready -= 1 };
356 if let Some(handler) = self.handlers.get(self.handler_ready) {
357 if let Some(rec) = handler.downcast_ref::<Recipient<Msg>>() {
358 match rec.do_send(Msg{
359 timestamp, attemps, id, body,
360 }) {
361 Ok(_s) => {
362 self.in_flight += 1;
363 self.info_in_flight(self.in_flight);
364 },
365 Err(e) => { error!("error sending msg to reader: {}", e) }
366 }
367
368 }
369 }
370 if self.handler_ready == 0 { self.handler_ready = self.handlers.len() }
371 }
372 },
373 Cmd::ResponseError(s) => {
374 if self.state == ConnState::Closing {
375 error!("Closing connection: {}", s);
376 self.info_on_close(false);
377 self.state = ConnState::Started;
378 }
379 error!("failed: {}", s);
380 }
381 Cmd::Command(_) => {
382 if let Some(ref mut cell) = self.cell {
383 cell.write(rdy(1));
384 }
385 }
386 _ => {},
387 }
388 }
389}
390
391impl Handler<TcpConnect> for Connection
392{
393 type Result=();
394 fn handle(&mut self, msg:TcpConnect, ctx: &mut Self::Context) {
395 Resolver::from_registry()
396 .send(Connect::host(msg.0.as_str()))
397 .into_actor(self)
398 .map(move |res, act, ctx| match res {
399 Ok(stream) => {
400 info!("connected [{}]", msg.0);
401 let (r, w) = stream.split();
404
405 let mut framed =
407 actix::io::FramedWrite::new(w, NsqCodec{}, ctx);
408 let mut rx = FramedRead::new(r, NsqCodec{});
409 framed.write(Cmd::Magic(VERSION));
410 let json = match serde_json::to_string(&act.config) {
412 Ok(s) => s,
413 Err(e) => {
414 error!("config cannot be formatted as json string: {}", e);
415 return ctx.stop();
416 }
417 };
418 ctx.add_stream(rx);
420 framed.write(identify(json));
421 act.cell = Some(framed);
422
423 act.backoff.reset();
424 act.state = ConnState::Neg;
425 }
426 Err(err) => {
427 error!("can not connect [{}]", err);
428 if let Some(timeout) = act.tcp_backoff.next_backoff() {
431 ctx.run_later(timeout, |_, ctx| ctx.stop());
432 }
433 }
434 })
435 .map_err(|err, act, ctx| {
436 error!("can not connect [{}]", err);
437 if let Some(timeout) = act.tcp_backoff.next_backoff() {
440 ctx.run_later(timeout, |_, ctx| ctx.stop());
441 }
442 })
443 .wait(ctx);
444 }
445}
446
447impl Handler<Cls> for Connection {
448 type Result=();
449 fn handle(&mut self, _msg: Cls, ctx: &mut Self::Context) {
450 self.state = ConnState::Closing;
451 ctx.stop();
452 }
453}
454
455impl Handler<Fin> for Connection
456{
457 type Result = ();
458 fn handle(&mut self, msg: Fin, ctx: &mut Self::Context) {
459 if let Some(ref mut cell) = self.cell {
461 cell.write(fin(&msg.0));
462 }
463 if self.state == ConnState::Resume {
464 ctx.notify(Ready(self.rdy));
465 self.state = ConnState::Started;
466 }
467 self.in_flight -= 1;
468 self.info_in_flight(self.in_flight);
469 }
470}
471
472impl Handler<Ready> for Connection
473{
474 type Result = ();
475
476 fn handle(&mut self, msg: Ready, _ctx: &mut Self::Context) {
477 if self.state != ConnState::Ready {
478 self.rdy = msg.0;
479 return
480 }
481 if let Some(ref mut cell) = self.cell {
482 cell.write(rdy(msg.0));
483 }
484 if self.state == ConnState::Started {
485 self.rdy = msg.0;
486 info!("rdy updated [{}]", self.addr);
487
488 } else { self.state = ConnState::Started; info!("Ready to go [{}] RDY: {}", self.addr, msg.0); }
489 }
490}
491
492
493impl Handler<Auth> for Connection
494{
495 type Result = ();
496 fn handle(&mut self, _msg: Auth, ctx: &mut Self::Context) {
497 if let Some(ref mut cell) = self.cell {
498 cell.write(auth(self.secret.clone()));
499 } else {
500 error!("unable to identify: connection dropped [{}]", self.addr);
501 ctx.stop();
502 }
503 self.state = ConnState::Auth;
504 }
505}
506
507impl Handler<Sub> for Connection
508{
509 type Result = ();
510 fn handle(&mut self, _msg: Sub, ctx: &mut Self::Context) {
511 if let Some(ref mut cell) = self.cell {
512 cell.write(sub(&self.topic, &self.channel));
513 } else {
514 error!("unable to subscribing: connection dropped [{}]", self.addr);
515 ctx.stop();
516 }
517 self.state = ConnState::Ready;
518 info!("subscribed [{}] topic: {} channel: {}", self.addr, self.topic, self.channel);
519 }
520}
521
522impl Handler<Backoff> for Connection
523{
524 type Result=();
525 fn handle(&mut self, _msg: Backoff, ctx: &mut Self::Context) {
526 if let Some(timeout) = self.backoff.next_backoff() {
527 if let Some(ref mut cell) = self.cell {
528 cell.write(rdy(0));
529 ctx.run_later(timeout, |_, ctx| ctx.notify(Resume));
530 self.state = ConnState::Backoff;
531 } else {
532 error!("backoff failed: connection dropped [{}]", self.addr);
533 Self::add_stream(once::<Cmd, Error>(Err(Error::NotConnected)), ctx);
534 }
535 self.info_in_flight(0);
536 self.info_on_backoff();
537 }
538 }
539}
540
541impl Handler<Resume> for Connection
542{
543 type Result=();
544 fn handle(&mut self, _msg: Resume, ctx: &mut Self::Context) {
545 if let Some(ref mut cell) = self.cell {
546 cell.write(rdy(1));
547 self.state = ConnState::Resume;
548 } else {
549 error!("resume failed: connection dropped [{}]", self.addr);
550 Self::add_stream(once::<Cmd, Error>(Err(Error::NotConnected)), ctx);
551 }
552 self.info_in_flight(1);
553 self.info_on_resume();
554 }
555}
556
557impl<M: NsqMsg> Handler<AddHandler<M>> for Connection
558{
559 type Result=();
560 fn handle(&mut self, msg: AddHandler<M>, _: &mut Self::Context) {
561 let msg_id = TypeId::of::<Recipient<M>>();
562 if msg_id == TypeId::of::<Recipient<Msg>>() {
563 self.handlers.push(Box::new(msg.0));
564 info!("Reader added");
565 } else {
566 self.info_hashmap.insert(msg_id, Box::new(msg.0));
567 info!("info handler added");
568 }
569 }
570}
571
572impl Supervised for Connection
573{
574 fn restarting(&mut self, ctx: &mut Self::Context) {
575 if self.state == ConnState::Stopped {
576 ctx.stop();
577 }
578 }
579}