1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
extern crate actix;
extern crate nats;
use actix::prelude::*;
/// Actor message asking a NATS executor to publish a payload on a subject.
///
/// The handler for this message forwards `subject` and `data` to the
/// wrapped client's `publish` call.
#[derive(Debug, Clone)]
pub struct PublishMessage {
    /// Subject the payload is published on.
    subject: String,
    /// Raw payload bytes.
    data: Vec<u8>,
}

impl PublishMessage {
    /// Builds a publish request for `subject` carrying `data`.
    pub fn new(subject: String, data: Vec<u8>) -> Self {
        Self { subject, data }
    }
}
/// A publish request resolves to `()` on success or a `nats::NatsError`.
impl Message for PublishMessage {
    type Result = ::std::result::Result<(), nats::NatsError>;
}
/// Actor message asking a NATS executor to send a request and wait for the
/// reply payload.
#[derive(Debug, Clone)]
pub struct RequestWithReply {
    /// Subject the request is sent to.
    subject: String,
    /// Raw request payload bytes.
    data: Vec<u8>,
    /// `None` on construction; the request handler stores the value returned
    /// by the client's `make_request` here (presumably the reply inbox
    /// subject; confirm against the nats client API).
    inbox: Option<String>,
}

impl RequestWithReply {
    /// Builds a request for `subject` carrying `data`, with no inbox set yet.
    pub fn new(subject: String, data: Vec<u8>) -> Self {
        Self {
            subject,
            data,
            inbox: None,
        }
    }
}
/// A request resolves to the reply payload bytes on success or a
/// `nats::NatsError`.
impl Message for RequestWithReply {
    type Result = ::std::result::Result<Vec<u8>, nats::NatsError>;
}
/// Actor that owns a `nats::Client` and performs NATS operations on it in
/// response to messages.
pub struct NATSExecutorSync(nats::Client);

impl NATSExecutorSync {
    /// Wraps an already-constructed client.
    fn new(client: nats::Client) -> Self {
        NATSExecutorSync(client)
    }

    /// Starts the executor through `SyncArbiter::start` and returns its address.
    ///
    /// `threads` is passed straight to the arbiter. Every time the arbiter
    /// invokes the factory closure, `client_factory` is called to obtain a
    /// fresh client for the new executor instance (actix is expected to do
    /// this once per worker thread; see the `SyncArbiter` documentation).
    pub fn start<F: Fn() -> nats::Client + Send + Sync + 'static>(
        threads: usize,
        client_factory: F,
    ) -> Addr<Self> {
        let build_executor = move || NATSExecutorSync::new(client_factory());
        SyncArbiter::start(threads, build_executor)
    }
}
/// The executor uses a `SyncContext`, matching its start-up through
/// `SyncArbiter`.
impl Actor for NATSExecutorSync {
    type Context = SyncContext<NATSExecutorSync>;
}
impl Handler<PublishMessage> for NATSExecutorSync {
type Result = Result<(), nats::NatsError>;
fn handle(&mut self, msg: PublishMessage, _: &mut Self::Context) -> Self::Result {
self.0.publish(&msg.subject, &msg.data)
}
}
impl Handler<RequestWithReply> for NATSExecutorSync {
type Result = Result<Vec<u8>, nats::NatsError>;
fn handle(&mut self, mut msg: RequestWithReply, _: &mut Self::Context) -> Self::Result {
msg.inbox = Some(self.0.make_request(&msg.subject, &msg.data)?);
Ok(self.0.wait()?.msg)
}
}