1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use raftlog::election::{Ballot, Role};
use raftlog::log::{LogIndex, LogPrefix, LogSuffix};
use raftlog::message::Message;
use raftlog::{ErrorKind, Io, Result};
use slog::Logger;
use trackable::error::ErrorKindExt;
use storage::{self, Storage};
use timer::{Timeout, Timer};
use {LocalNodeId, Mailer, ServiceHandle};
#[derive(Debug)]
pub struct RaftIo {
logger: Logger,
node_id: LocalNodeId,
service: ServiceHandle,
storage: Storage,
mailer: Mailer,
timer: Timer,
}
impl RaftIo {
pub fn new(
service: ServiceHandle,
storage: Storage,
mailer: Mailer,
timer: Timer,
) -> Result<Self> {
let node_id = storage.node_id();
track!(service.add_node(node_id, &mailer))?;
Ok(RaftIo {
logger: storage.logger(),
node_id,
service,
storage,
mailer,
timer,
})
}
}
impl Io for RaftIo {
type SaveBallot = storage::SaveBallot;
type LoadBallot = storage::LoadBallot;
type SaveLog = storage::SaveLog;
type LoadLog = storage::LoadLog;
type Timeout = Timeout;
fn try_recv_message(&mut self) -> Result<Option<Message>> {
self.mailer
.try_recv_message()
.map_err(|e| ErrorKind::Other.takes_over(e).into())
}
fn send_message(&mut self, message: Message) {
let node = match message.header().destination.as_str().parse() {
Err(e) => {
crit!(self.logger, "Wrong destination: {}", e);
return;
}
Ok(id) => id,
};
self.mailer.send_message(&node, message);
}
fn save_ballot(&mut self, ballot: Ballot) -> Self::SaveBallot {
self.storage.save_ballot(ballot)
}
fn load_ballot(&mut self) -> Self::LoadBallot {
self.storage.load_ballot()
}
fn save_log_prefix(&mut self, prefix: LogPrefix) -> Self::SaveLog {
self.storage.save_log_prefix(prefix)
}
fn save_log_suffix(&mut self, suffix: &LogSuffix) -> Self::SaveLog {
self.storage.save_log_suffix(suffix)
}
fn load_log(&mut self, start: LogIndex, end: Option<LogIndex>) -> Self::LoadLog {
self.storage.load_log(start, end)
}
fn create_timeout(&mut self, role: Role) -> Self::Timeout {
self.timer.create_timeout(role)
}
fn is_busy(&mut self) -> bool {
self.storage.is_busy()
}
}
impl Drop for RaftIo {
fn drop(&mut self) {
if let Err(e) = track!(self.service.remove_node(self.node_id)) {
warn!(
self.logger,
"Cannot remove the node {:?}: {}", self.node_id, e
);
}
}
}