1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use std::fmt::Debug;
use amy::{self, Poller, Registrar};
use pid::Pid;
use serde::{Serialize, Deserialize};
use msg::Msg;
use envelope::Envelope;
use node::Node;
use errors::*;
use slog;
use super::ServiceHandler;
pub struct Service<T, H> {
pub pid: Pid,
pub tx: amy::Sender<Envelope<T>>,
rx: amy::Receiver<Envelope<T>>,
node: Node<T>,
poller: Poller,
registrar: Registrar,
handler: H,
logger: slog::Logger
}
impl<'de, T, H> Service<T, H>
where T: Serialize + Deserialize<'de> + Debug + Clone,
H: ServiceHandler<T>
{
pub fn new(pid: Pid, node: Node<T>, mut handler: H)
-> Result<Service<T, H>>
{
let poller = Poller::new().unwrap();
let mut registrar = poller.get_registrar()?;
let (tx, rx) = registrar.channel()?;
node.register_service(&pid, &tx)?;
handler.init(®istrar, &node)?;
let logger = node.logger.new(o!("component" => "service", "pid" => pid.to_string()));
Ok(Service {
pid: pid,
tx: tx,
rx: rx,
node: node,
poller: poller,
registrar: registrar,
handler: handler,
logger: logger
})
}
pub fn wait(&mut self) {
loop {
for notification in self.poller.wait(1000).unwrap() {
if notification.id == self.rx.get_id() {
if let Err(e) = self.handle_envelopes() {
if let ErrorKind::Shutdown(_) = *e.kind() {
info!(self.logger, "Service shutting down";
"pid" => self.pid.to_string());
return;
}
error!(self.logger,
"Failed to handle envelope";
"error" => e.to_string())
}
} else {
if let Err(e) = self.handler.handle_notification(&self.node,
notification,
&self.registrar) {
warn!(self.logger,
"Failed to handle poll notification";
"error" => e.to_string())
}
}
}
}
}
pub fn handle_envelopes(&mut self) -> Result<()> {
while let Ok(envelope) = self.rx.try_recv() {
if let Msg::Shutdown = envelope.msg {
return Err(ErrorKind::Shutdown(self.pid.clone()).into());
}
try!(self.handler.handle_envelope(&self.node, envelope, &self.registrar));
}
Ok(())
}
}