1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use atomic_immut::AtomicImmut;
use fibers::sync::mpsc;
use fibers_rpc::server::ServerBuilder;
use futures::{Async, Future, Poll, Stream};
use raftlog::{Error, ErrorKind, Result};
use slog::Logger;
use std::collections::HashMap;
use std::sync::Arc;
use super::mail::{Mailbox, Mailer};
use super::server::RpcServer;
use crate::rpc;
use crate::LocalNodeId;
type Nodes = Arc<AtomicImmut<HashMap<LocalNodeId, Mailbox>>>;
#[derive(Debug)]
pub struct Service {
logger: Logger,
nodes: Nodes,
command_tx: mpsc::Sender<Command>,
command_rx: mpsc::Receiver<Command>,
}
impl Service {
pub fn new(logger: Logger, builder: &mut ServerBuilder) -> Self {
let nodes = Arc::new(AtomicImmut::new(HashMap::new()));
let (command_tx, command_rx) = mpsc::channel();
let this = Service {
logger,
nodes,
command_tx,
command_rx,
};
builder.add_cast_handler::<rpc::RequestVoteCallRpc, _>(RpcServer::new(this.handle()));
builder.add_cast_handler::<rpc::RequestVoteReplyRpc, _>(RpcServer::new(this.handle()));
builder.add_cast_handler::<rpc::AppendEntriesCallRpc, _>(RpcServer::new(this.handle()));
builder.add_cast_handler::<rpc::AppendEntriesReplyRpc, _>(RpcServer::new(this.handle()));
builder.add_cast_handler::<rpc::InstallSnapshotCastRpc, _>(RpcServer::new(this.handle()));
this
}
pub fn handle(&self) -> ServiceHandle {
ServiceHandle {
nodes: self.nodes.clone(),
command_tx: self.command_tx.clone(),
}
}
fn handle_command(&mut self, command: Command) {
match command {
Command::AddNode(id, mailbox) => {
info!(self.logger, "Adds node: {}", dump!(id, mailbox));
let mut nodes = (&*self.nodes.load()).clone();
nodes.insert(id, mailbox);
self.nodes.store(nodes);
}
Command::RemoveNode(id) => {
let mut nodes = (&*self.nodes.load()).clone();
let removed = nodes.remove(&id);
self.nodes.store(nodes);
info!(self.logger, "Removes node: {}", dump!(id, removed));
}
}
}
}
impl Future for Service {
type Item = ();
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let polled = self.command_rx.poll().expect("Never fails");
if let Async::Ready(command) = polled {
let command = command.expect("Unreachable");
self.handle_command(command);
} else {
return Ok(Async::NotReady);
}
}
}
}
#[derive(Debug)]
enum Command {
AddNode(LocalNodeId, Mailbox),
RemoveNode(LocalNodeId),
}
#[derive(Debug, Clone)]
pub struct ServiceHandle {
nodes: Nodes,
command_tx: mpsc::Sender<Command>,
}
impl ServiceHandle {
pub(crate) fn add_node(&self, id: LocalNodeId, mailer: &Mailer) -> Result<()> {
let mailbox = mailer.mailbox();
let command = Command::AddNode(id, mailbox);
if self.command_tx.send(command).is_err() {
track_panic!(ErrorKind::Other, "Service down: {}", dump!(id));
}
Ok(())
}
pub(crate) fn remove_node(&self, id: LocalNodeId) -> Result<()> {
let command = Command::RemoveNode(id);
if self.command_tx.send(command).is_err() {
track_panic!(ErrorKind::Other, "Service down: {}", dump!(id));
}
Ok(())
}
pub(crate) fn get_node(&self, id: LocalNodeId) -> Option<Mailbox> {
self.nodes.load().get(&id).cloned()
}
}