1extern crate actix;
2extern crate backoff;
3extern crate futures;
4#[macro_use]
5extern crate log;
6pub extern crate nitox;
7
8use actix::prelude::*;
9use backoff::backoff::Backoff;
10use backoff::ExponentialBackoff;
11use futures::{future, Future};
12use nitox::{
13 commands::{Message as NatsMessage, PubCommand as NatsPublish},
14 NatsClient, NatsClientOptions, NatsError,
15};
16
17mod messages;
18pub use self::messages::*;
19
20#[derive(Default)]
22pub struct NATSActor {
23 inner: Option<NatsClient>,
24 backoff: ExponentialBackoff,
25 opts: NatsClientOptions,
26}
27
28impl NATSActor {
29 pub fn start(opts: NatsClientOptions) -> Addr<NATSActor> {
31 let mut backoff = ExponentialBackoff::default();
32 backoff.max_elapsed_time = None;
33
34 debug!(target: "actix-nats", "Starting Supervisor/Actor with opts {:#?}", opts);
35 Supervisor::start(move |_| NATSActor {
36 opts,
37 backoff,
38 inner: None,
39 })
40 }
41}
42
43impl Actor for NATSActor {
44 type Context = Context<Self>;
45
46 fn started(&mut self, ctx: &mut Self::Context) {
47 debug!(target: "actix-nats", "Starting client...");
48 NatsClient::from_options(self.opts.clone())
49 .and_then(|client| {
50 debug!(target: "actix-nats", "Client created {:#?}", client);
51 client.connect()
52 }).into_actor(self)
53 .map(|client, act, _| {
54 info!(target: "actix-nats", "Connected to NATS server: {:#?}", client);
55 act.inner = Some(client);
56 act.backoff.reset();
57 }).map_err(|err, act, ctx| {
58 error!(target: "actix-nats", "Cannot connect to NATS server: {}", err);
59 if let Some(timeout) = act.backoff.next_backoff() {
60 ctx.run_later(timeout, |_, ctx| ctx.stop());
61 }
62 }).wait(ctx);
63 }
64}
65
66impl Supervised for NATSActor {
67 fn restarting(&mut self, _: &mut Self::Context) {
68 debug!(target: "actix-nats", "Supervisor restarted actor");
69 self.inner.take();
70 self.backoff.reset();
71 }
72}
73
74impl Handler<PublishMessage> for NATSActor {
75 type Result = ResponseFuture<(), NatsError>;
76
77 fn handle(&mut self, msg: PublishMessage, _: &mut Self::Context) -> Self::Result {
78 if let Some(ref mut client) = self.inner {
79 let cmd = match NatsPublish::builder()
80 .subject(msg.subject)
81 .payload(msg.data)
82 .build()
83 {
84 Ok(cmd) => cmd,
85 Err(e) => return Box::new(future::err(NatsError::CommandBuildError(e))),
86 };
87
88 Box::new(client.publish(cmd))
89 } else {
90 error!(target: "actix-nats", "Cannot send message because client is not ready");
91 Box::new(future::err(NatsError::ServerDisconnected(None)))
92 }
93 }
94}
95
96impl Handler<RequestWithReply> for NATSActor {
97 type Result = ResponseFuture<NatsMessage, NatsError>;
98
99 fn handle(&mut self, msg: RequestWithReply, _: &mut Self::Context) -> Self::Result {
100 if let Some(ref mut client) = self.inner {
101 debug!(target: "actix-nats", "Sending request with payload {:#?}", msg);
102 Box::new(client.request(msg.subject, msg.data.into()))
103 } else {
104 error!(target: "actix-nats", "Cannot send message because client is not ready");
105 Box::new(future::err(NatsError::ServerDisconnected(None)))
106 }
107 }
108}