actix_nats/
lib.rs

1extern crate actix;
2extern crate backoff;
3extern crate futures;
4#[macro_use]
5extern crate log;
6pub extern crate nitox;
7
8use actix::prelude::*;
9use backoff::backoff::Backoff;
10use backoff::ExponentialBackoff;
11use futures::{future, Future};
12use nitox::{
13    commands::{Message as NatsMessage, PubCommand as NatsPublish},
14    NatsClient, NatsClientOptions, NatsError,
15};
16
17mod messages;
18pub use self::messages::*;
19
20/// Actor to give to Actix to do the background processing of NATS messages/requests
21#[derive(Default)]
22pub struct NATSActor {
23    inner: Option<NatsClient>,
24    backoff: ExponentialBackoff,
25    opts: NatsClientOptions,
26}
27
28impl NATSActor {
29    /// Start new `Supervisor` with `NATSActor`.
30    pub fn start(opts: NatsClientOptions) -> Addr<NATSActor> {
31        let mut backoff = ExponentialBackoff::default();
32        backoff.max_elapsed_time = None;
33
34        debug!(target: "actix-nats", "Starting Supervisor/Actor with opts {:#?}", opts);
35        Supervisor::start(move |_| NATSActor {
36            opts,
37            backoff,
38            inner: None,
39        })
40    }
41}
42
43impl Actor for NATSActor {
44    type Context = Context<Self>;
45
46    fn started(&mut self, ctx: &mut Self::Context) {
47        debug!(target: "actix-nats", "Starting client...");
48        NatsClient::from_options(self.opts.clone())
49            .and_then(|client| {
50                debug!(target: "actix-nats", "Client created {:#?}", client);
51                client.connect()
52            }).into_actor(self)
53            .map(|client, act, _| {
54                info!(target: "actix-nats", "Connected to NATS server: {:#?}", client);
55                act.inner = Some(client);
56                act.backoff.reset();
57            }).map_err(|err, act, ctx| {
58                error!(target: "actix-nats", "Cannot connect to NATS server: {}", err);
59                if let Some(timeout) = act.backoff.next_backoff() {
60                    ctx.run_later(timeout, |_, ctx| ctx.stop());
61                }
62            }).wait(ctx);
63    }
64}
65
66impl Supervised for NATSActor {
67    fn restarting(&mut self, _: &mut Self::Context) {
68        debug!(target: "actix-nats", "Supervisor restarted actor");
69        self.inner.take();
70        self.backoff.reset();
71    }
72}
73
74impl Handler<PublishMessage> for NATSActor {
75    type Result = ResponseFuture<(), NatsError>;
76
77    fn handle(&mut self, msg: PublishMessage, _: &mut Self::Context) -> Self::Result {
78        if let Some(ref mut client) = self.inner {
79            let cmd = match NatsPublish::builder()
80                .subject(msg.subject)
81                .payload(msg.data)
82                .build()
83            {
84                Ok(cmd) => cmd,
85                Err(e) => return Box::new(future::err(NatsError::CommandBuildError(e))),
86            };
87
88            Box::new(client.publish(cmd))
89        } else {
90            error!(target: "actix-nats", "Cannot send message because client is not ready");
91            Box::new(future::err(NatsError::ServerDisconnected(None)))
92        }
93    }
94}
95
96impl Handler<RequestWithReply> for NATSActor {
97    type Result = ResponseFuture<NatsMessage, NatsError>;
98
99    fn handle(&mut self, msg: RequestWithReply, _: &mut Self::Context) -> Self::Result {
100        if let Some(ref mut client) = self.inner {
101            debug!(target: "actix-nats", "Sending request with payload {:#?}", msg);
102            Box::new(client.request(msg.subject, msg.data.into()))
103        } else {
104            error!(target: "actix-nats", "Cannot send message because client is not ready");
105            Box::new(future::err(NatsError::ServerDisconnected(None)))
106        }
107    }
108}