use actix::prelude::*;
use log::{debug, error};
use futures::sync::{mpsc, oneshot};
use crate::{
AppData, AppDataResponse, AppError,
common::{DependencyAddr, UpdateCurrentLeader},
network::RaftNetwork,
messages::{InstallSnapshotRequest, InstallSnapshotResponse},
raft::{RaftState, Raft, SnapshotState},
storage::{InstallSnapshot, InstallSnapshotChunk, RaftStorage},
};
impl<D: AppData, R: AppDataResponse, E: AppError, N: RaftNetwork<D>, S: RaftStorage<D, R, E>> Handler<InstallSnapshotRequest> for Raft<D, R, E, N, S> {
type Result = ResponseActFuture<Self, InstallSnapshotResponse, ()>;
fn handle(&mut self, msg: InstallSnapshotRequest, ctx: &mut Self::Context) -> Self::Result {
if let &RaftState::Initializing = &self.state {
return Box::new(fut::err(()));
}
if !self.membership.contains(&msg.leader_id) {
return Box::new(fut::err(()));
}
if &msg.term < &self.current_term {
return Box::new(fut::err(()));
}
self.update_election_timeout_stamp();
if self.current_term != msg.term {
self.update_current_term(msg.term, None);
self.save_hard_state(ctx);
}
if self.current_leader != Some(msg.leader_id) {
self.update_current_leader(ctx, UpdateCurrentLeader::OtherNode(msg.leader_id));
}
if !self.state.is_follower() && !self.state.is_non_voter() {
self.become_follower(ctx);
}
let state = match &mut self.state {
RaftState::Follower(state) => state,
_ => return Box::new(fut::err(())),
};
match &mut state.snapshot_state {
SnapshotState::Idle if msg.done => self.handle_mini_snapshot(ctx, msg),
SnapshotState::Idle => self.handle_snapshot_stream(ctx, msg),
SnapshotState::Streaming(txopt, finalrxopt) if msg.done => {
if let (Some(tx), Some(finalrx)) = (txopt.take(), finalrxopt.take()) {
self.handle_final_snapshot_chunk(ctx, msg, tx, finalrx)
} else {
state.snapshot_state = SnapshotState::Idle;
Box::new(fut::err(()))
}
}
SnapshotState::Streaming(Some(tx), _) => {
let tx = tx.clone();
self.handle_snapshot_chunk(ctx, msg, tx.clone())
},
SnapshotState::Streaming(_, _) => Box::new(fut::err(())),
}
}
}
impl<D: AppData, R: AppDataResponse, E: AppError, N: RaftNetwork<D>, S: RaftStorage<D, R, E>> Raft<D, R, E, N, S> {
fn handle_mini_snapshot(&mut self, ctx: &mut Context<Self>, msg: InstallSnapshotRequest) -> Box<dyn ActorFuture<Actor=Self, Item=InstallSnapshotResponse, Error=()>> {
let (tx, rx) = mpsc::unbounded();
let (chunktx, chunkrx) = oneshot::channel();
let (finaltx, finalrx) = oneshot::channel();
let (snap_index, snap_term) = (msg.last_included_index, msg.last_included_term);
let task = fut::wrap_future(self.storage.send::<InstallSnapshot<E>>(InstallSnapshot::new(snap_term, snap_index, rx)))
.map_err(|err, act: &mut Self, ctx| act.map_fatal_actix_messaging_error(ctx, err, DependencyAddr::RaftStorage))
.and_then(|res, act, ctx| act.map_fatal_storage_result(ctx, res))
.map(move |_, _, _| {
let _ = finaltx.send(());
});
ctx.spawn(task);
match tx.unbounded_send(InstallSnapshotChunk{offset: msg.offset, data: msg.data, done: msg.done, cb: chunktx}) {
Ok(_) => (),
Err(_) => {
error!("Error streaming snapshot chunks to storage engine. Channel was closed.");
if let RaftState::Follower(state) = &mut self.state {
state.snapshot_state = SnapshotState::Idle;
}
return Box::new(fut::err(()));
}
};
Box::new(fut::wrap_future(chunkrx)
.and_then(|_, _, _| fut::wrap_future(finalrx))
.then(move |res, act: &mut Self, _| match res {
Ok(_) => match &mut act.state {
RaftState::Follower(state) => {
debug!("Finished installing snapshot. Update index & term to {} & {}.", snap_index, snap_term);
state.snapshot_state = SnapshotState::Idle;
if act.last_log_index < snap_index {
act.last_log_index = snap_index;
act.last_log_term = snap_term;
act.last_applied = snap_index;
}
fut::ok(InstallSnapshotResponse{term: act.current_term})
}
_ => fut::err(()),
}
Err(_) => {
error!("Error awaiting response from storage engine for final snapshot chunk. Channel was closed.");
fut::err(())
}
}))
}
fn handle_snapshot_stream(&mut self, ctx: &mut Context<Self>, msg: InstallSnapshotRequest) -> Box<dyn ActorFuture<Actor=Self, Item=InstallSnapshotResponse, Error=()>> {
let (tx, rx) = mpsc::unbounded();
let (chunktx, chunkrx) = oneshot::channel();
let (finaltx, finalrx) = oneshot::channel();
match &mut self.state {
RaftState::Follower(state) => {
state.snapshot_state = SnapshotState::Streaming(Some(tx.clone()), Some(finalrx));
}
_ => return Box::new(fut::err(())),
}
let (snap_index, snap_term) = (msg.last_included_index, msg.last_included_term);
let f = fut::wrap_future(self.storage.send::<InstallSnapshot<E>>(InstallSnapshot::new(snap_term, snap_index, rx)))
.map_err(|err, act: &mut Self, ctx| act.map_fatal_actix_messaging_error(ctx, err, DependencyAddr::RaftStorage))
.and_then(|res, act, ctx| act.map_fatal_storage_result(ctx, res))
.map(move |_, _, _| {
debug!("Received final response from storage engine for snapshot stream.");
let _ = finaltx.send(());
});
ctx.spawn(f);
match tx.unbounded_send(InstallSnapshotChunk{offset: msg.offset, data: msg.data, done: msg.done, cb: chunktx}) {
Ok(_) => (),
Err(_) => {
error!("Error streaming snapshot chunks to storage engine. Channel was closed.");
if let RaftState::Follower(state) = &mut self.state {
state.snapshot_state = SnapshotState::Idle;
}
return Box::new(fut::err(()));
}
};
Box::new(fut::wrap_future(chunkrx)
.then(|res, act: &mut Self, _| match res {
Ok(_) => fut::ok(InstallSnapshotResponse{term: act.current_term}),
Err(_) => {
error!("Error awaiting response from storage engine for chunk response. Channel was closed.");
fut::err(())
}
}))
}
fn handle_final_snapshot_chunk(
&mut self, _: &mut Context<Self>, msg: InstallSnapshotRequest, tx: mpsc::UnboundedSender<InstallSnapshotChunk>, finalrx: oneshot::Receiver<()>,
) -> Box<dyn ActorFuture<Actor=Self, Item=InstallSnapshotResponse, Error=()>> {
let (chunktx, chunkrx) = oneshot::channel();
let (snap_index, snap_term) = (msg.last_included_index, msg.last_included_term);
match tx.unbounded_send(InstallSnapshotChunk{offset: msg.offset, data: msg.data, done: msg.done, cb: chunktx}) {
Ok(_) => (),
Err(_) => {
error!("Error streaming snapshot chunks for storage engine. Channel was closed.");
if let RaftState::Follower(state) = &mut self.state {
state.snapshot_state = SnapshotState::Idle;
}
return Box::new(fut::err(()));
}
};
Box::new(fut::wrap_future(chunkrx)
.and_then(|_, _, _| fut::wrap_future(finalrx))
.then(move |res, act: &mut Self, _| match res {
Ok(_) => match &mut act.state {
RaftState::Follower(state) => {
debug!("Finished installing snapshot. Update index & term to {} & {}.", snap_index, snap_term);
state.snapshot_state = SnapshotState::Idle;
if act.last_log_index < snap_index {
act.last_log_index = snap_index;
act.last_log_term = snap_term;
act.last_applied = snap_index;
}
fut::ok(InstallSnapshotResponse{term: act.current_term})
}
_ => fut::err(()),
}
Err(_) => {
error!("Error awaiting response from storage engine for final snapshot chunk. Channel was closed.");
fut::err(())
}
}))
}
fn handle_snapshot_chunk(
&mut self, _: &mut Context<Self>, msg: InstallSnapshotRequest, tx: mpsc::UnboundedSender<InstallSnapshotChunk>,
) -> Box<dyn ActorFuture<Actor=Self, Item=InstallSnapshotResponse, Error=()>> {
let (chunktx, chunkrx) = oneshot::channel();
match tx.unbounded_send(InstallSnapshotChunk{offset: msg.offset, data: msg.data, done: msg.done, cb: chunktx}) {
Ok(_) => (),
Err(_) => {
error!("Error streaming snapshot chunks to storage engine. Channel was closed.");
if let RaftState::Follower(state) = &mut self.state {
state.snapshot_state = SnapshotState::Idle;
}
return Box::new(fut::err(()));
}
};
Box::new(fut::wrap_future(chunkrx)
.then(|res, act: &mut Self, _| match res {
Ok(_) => fut::ok(InstallSnapshotResponse{term: act.current_term}),
Err(_) => {
error!("Node {}: awaiting response from storage engine for chunk response. Channel was closed.", act.id);
fut::err(())
}
}))
}
}