1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
use actix::prelude::*;
use log::{error};
use futures::sync::oneshot;
use crate::{
AppData, AppDataResponse, AppError,
common::{CLIENT_RPC_RX_ERR, CLIENT_RPC_TX_ERR, ApplyLogsTask, ClientPayloadWithChan, DependencyAddr},
network::RaftNetwork,
messages::{ClientError, ClientPayload, ClientPayloadResponse, ResponseMode},
raft::{RaftState, Raft},
replication::RSReplicate,
storage::{AppendEntryToLog, RaftStorage},
};
impl<D: AppData, R: AppDataResponse, E: AppError, N: RaftNetwork<D>, S: RaftStorage<D, R, E>> Handler<ClientPayload<D, R, E>> for Raft<D, R, E, N, S> {
type Result = ResponseActFuture<Self, ClientPayloadResponse<R>, ClientError<D, R, E>>;
fn handle(&mut self, msg: ClientPayload<D, R, E>, _: &mut Self::Context) -> Self::Result {
let response_chan = match &mut self.state {
RaftState::Leader(state) => {
let (tx, rx) = oneshot::channel();
let with_chan = ClientPayloadWithChan{tx, rpc: msg};
let _ = state.client_request_queue.unbounded_send(with_chan).map_err(|_| {
error!("Unexpected error while queueing client request for processing.")
});
rx
},
_ => {
return Box::new(fut::err(ClientError::ForwardToLeader{payload: msg, leader: self.current_leader}));
},
};
Box::new(fut::wrap_future(response_chan)
.map_err(|_, _: &mut Self, _| {
error!("{}", CLIENT_RPC_RX_ERR);
ClientError::Internal
})
.and_then(|res, _, _| fut::result(res)))
}
}
impl<D: AppData, R: AppDataResponse, E: AppError, N: RaftNetwork<D>, S: RaftStorage<D, R, E>> Raft<D, R, E, N, S> {
pub(super) fn process_client_rpc(&mut self, _: &mut Context<Self>, msg: ClientPayloadWithChan<D, R, E>) -> impl ActorFuture<Actor=Self, Item=(), Error=()> {
match &self.state {
RaftState::Leader(_) => (),
_ => {
let _ = msg.tx.send(Err(ClientError::ForwardToLeader{payload: msg.rpc, leader: self.current_leader}))
.map_err(|_| error!("{} Error while forwarding to leader at the start of process_client_rpc.", CLIENT_RPC_TX_ERR));
return fut::Either::A(fut::ok(()));
}
};
let payload = msg.upgrade(self.last_log_index + 1, self.current_term);
self.is_appending_logs = true;
fut::Either::B(fut::wrap_future(self.storage.send::<AppendEntryToLog<D, E>>(AppendEntryToLog::new(payload.entry())))
.map_err(|err, act: &mut Self, ctx| {
act.map_fatal_actix_messaging_error(ctx, err, DependencyAddr::RaftStorage);
ClientError::Internal
})
.and_then(|res, _, _| fut::result(res.map_err(|err| ClientError::Application(err))))
.then(move |res, act, _| {
act.is_appending_logs = false;
match res {
Ok(_) => {
act.last_log_index = payload.index;
act.last_log_term = act.current_term;
fut::result(Ok(payload))
}
Err(err) => {
error!("Node {} received an error from the storage engine.", &act.id);
let _ = payload.tx.send(Err(err)).map_err(|err| error!("{} {:?}", CLIENT_RPC_RX_ERR, err));
fut::err(())
}
}
})
.and_then(move |payload, act, _| {
let state = match &mut act.state {
RaftState::Leader(state) => state,
_ => {
let msg = payload.downgrade();
let _ = msg.tx.send(Err(ClientError::ForwardToLeader{payload: msg.rpc, leader: act.current_leader}))
.map_err(|_| error!("{} Error while forwarding to leader at the end of process_client_rpc.", CLIENT_RPC_RX_ERR));
return fut::ok(());
}
};
let nodeid = &act.id;
let voting_peer_count = act.membership.members.iter().filter(|e| *e != nodeid).count();
if voting_peer_count > 0 {
let entry = payload.entry();
state.awaiting_committed.push(payload);
for rs in state.nodes.values() {
let _ = rs.addr.do_send(RSReplicate{entry: entry.clone(), line_commit: act.commit_index});
}
} else {
if act.membership.non_voters.len() > 0 {
let entry = payload.entry();
for rs in state.nodes.values() {
let _ = rs.addr.do_send(RSReplicate{entry: entry.clone(), line_commit: act.commit_index});
}
}
act.commit_index = payload.index;
if let &ResponseMode::Committed = &payload.response_mode {
let entry = payload.entry();
let _ = payload.tx.send(Ok(ClientPayloadResponse::Committed{index: payload.index})).map_err(|err| error!("{} {:?}", CLIENT_RPC_RX_ERR, err));
let _ = act.apply_logs_pipeline.unbounded_send(ApplyLogsTask::Entry{entry, chan: None});
} else {
let _ = act.apply_logs_pipeline.unbounded_send(ApplyLogsTask::Entry{entry: payload.entry(), chan: Some(payload.tx)});
}
}
fut::ok(())
}))
}
}