1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
extern crate actix;
extern crate backoff;
extern crate futures;
#[macro_use]
extern crate log;
pub extern crate nitox;
use actix::prelude::*;
use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use futures::{future, Future};
use nitox::{
commands::{Message as NatsMessage, PubCommand as NatsPublish},
NatsClient, NatsClientOptions, NatsError,
};
mod messages;
pub use self::messages::*;
#[derive(Default)]
pub struct NATSActor {
inner: Option<NatsClient>,
backoff: ExponentialBackoff,
opts: NatsClientOptions,
}
impl NATSActor {
pub fn start(opts: NatsClientOptions) -> Addr<NATSActor> {
let mut backoff = ExponentialBackoff::default();
backoff.max_elapsed_time = None;
debug!(target: "actix-nats", "Starting Supervisor/Actor with opts {:#?}", opts);
Supervisor::start(move |_| NATSActor {
opts,
backoff,
inner: None,
})
}
}
impl Actor for NATSActor {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
debug!(target: "actix-nats", "Starting client...");
NatsClient::from_options(self.opts.clone())
.and_then(|client| {
debug!(target: "actix-nats", "Client created {:#?}", client);
client.connect()
}).into_actor(self)
.map(|client, act, _| {
info!(target: "actix-nats", "Connected to NATS server: {:#?}", client);
act.inner = Some(client);
act.backoff.reset();
}).map_err(|err, act, ctx| {
error!(target: "actix-nats", "Cannot connect to NATS server: {}", err);
if let Some(timeout) = act.backoff.next_backoff() {
ctx.run_later(timeout, |_, ctx| ctx.stop());
}
}).wait(ctx);
}
}
impl Supervised for NATSActor {
fn restarting(&mut self, _: &mut Self::Context) {
debug!(target: "actix-nats", "Supervisor restarted actor");
self.inner.take();
self.backoff.reset();
}
}
impl Handler<PublishMessage> for NATSActor {
type Result = ResponseFuture<(), NatsError>;
fn handle(&mut self, msg: PublishMessage, _: &mut Self::Context) -> Self::Result {
if let Some(ref mut client) = self.inner {
let cmd = match NatsPublish::builder()
.subject(msg.subject)
.payload(msg.data)
.build()
{
Ok(cmd) => cmd,
Err(e) => return Box::new(future::err(NatsError::CommandBuildError(e))),
};
Box::new(client.publish(cmd))
} else {
error!(target: "actix-nats", "Cannot send message because client is not ready");
Box::new(future::err(NatsError::ServerDisconnected(None)))
}
}
}
impl Handler<RequestWithReply> for NATSActor {
type Result = ResponseFuture<NatsMessage, NatsError>;
fn handle(&mut self, msg: RequestWithReply, _: &mut Self::Context) -> Self::Result {
if let Some(ref mut client) = self.inner {
debug!(target: "actix-nats", "Sending request with payload {:#?}", msg);
Box::new(client.request(msg.subject, msg.data.into()))
} else {
error!(target: "actix-nats", "Cannot send message because client is not ready");
Box::new(future::err(NatsError::ServerDisconnected(None)))
}
}
}