use super::*;
mod raft {
tonic::include_proto!("sorock");
}
use process::*;
use raft::raft_server::{Raft, RaftServer};
use std::pin::Pin;
pub mod client;
mod stream;
pub fn new(node: Arc<node::RaftNode>) -> RaftServer<impl Raft> {
let inner = RaftService { node };
raft::raft_server::RaftServer::new(inner)
}
#[doc(hidden)]
pub struct RaftService {
node: Arc<node::RaftNode>,
}
#[tonic::async_trait]
impl raft::raft_server::Raft for RaftService {
type GetSnapshotStream = stream::SnapshotStreamOut;
async fn write(
&self,
request: tonic::Request<raft::WriteRequest>,
) -> std::result::Result<tonic::Response<raft::Response>, tonic::Status> {
let req = request.into_inner();
let shard_index = req.shard_index;
let req = request::ApplicationWriteRequest {
message: req.message,
request_id: req.request_id,
};
let resp = self
.node
.get_process(shard_index)
.context(Error::ProcessNotFound(shard_index))
.unwrap()
.process_application_write_request(req)
.await
.unwrap();
Ok(tonic::Response::new(raft::Response { message: resp }))
}
async fn read(
&self,
request: tonic::Request<raft::ReadRequest>,
) -> std::result::Result<tonic::Response<raft::Response>, tonic::Status> {
let req = request.into_inner();
let shard_index = req.shard_index;
let req = request::ApplicationReadRequest {
message: req.message,
read_locally: req.read_locally,
};
let resp = self
.node
.get_process(shard_index)
.context(Error::ProcessNotFound(shard_index))
.unwrap()
.process_application_read_request(req)
.await
.unwrap();
Ok(tonic::Response::new(raft::Response { message: resp }))
}
async fn process_kernel_request(
&self,
request: tonic::Request<raft::KernelRequest>,
) -> std::result::Result<tonic::Response<()>, tonic::Status> {
let req = request.into_inner();
let shard_index = req.shard_index;
let req = request::KernelRequest {
message: req.message,
};
self.node
.get_process(shard_index)
.context(Error::ProcessNotFound(shard_index))
.unwrap()
.process_kernel_request(req)
.await
.unwrap();
Ok(tonic::Response::new(()))
}
async fn request_vote(
&self,
request: tonic::Request<raft::VoteRequest>,
) -> std::result::Result<tonic::Response<raft::VoteResponse>, tonic::Status> {
let req = request.into_inner();
let shard_index = req.shard_index;
let req = request::RequestVote {
candidate_id: req.candidate_id.parse().unwrap(),
candidate_clock: {
let clock = req.candidate_clock.unwrap();
Clock {
term: clock.term,
index: clock.index,
}
},
vote_term: req.vote_term,
force_vote: req.force_vote,
pre_vote: req.pre_vote,
};
let resp = self
.node
.get_process(shard_index)
.context(Error::ProcessNotFound(shard_index))
.unwrap()
.request_vote(req)
.await
.unwrap();
Ok(tonic::Response::new(raft::VoteResponse {
vote_granted: resp,
}))
}
async fn add_server(
&self,
request: tonic::Request<raft::AddServerRequest>,
) -> std::result::Result<tonic::Response<()>, tonic::Status> {
let req = request.into_inner();
let shard_index = req.shard_index;
let req = request::AddServer {
server_id: req.server_id.parse().unwrap(),
};
self.node
.get_process(shard_index)
.context(Error::ProcessNotFound(shard_index))
.unwrap()
.add_server(req)
.await
.unwrap();
Ok(tonic::Response::new(()))
}
async fn remove_server(
&self,
request: tonic::Request<raft::RemoveServerRequest>,
) -> std::result::Result<tonic::Response<()>, tonic::Status> {
let req = request.into_inner();
let shard_index = req.shard_index;
let req = request::RemoveServer {
server_id: req.server_id.parse().unwrap(),
};
self.node
.get_process(shard_index)
.context(Error::ProcessNotFound(shard_index))
.unwrap()
.remove_server(req)
.await
.unwrap();
Ok(tonic::Response::new(()))
}
async fn get_membership(
&self,
req: tonic::Request<raft::Shard>,
) -> std::result::Result<tonic::Response<raft::Membership>, tonic::Status> {
let shard_index = req.into_inner().id;
let process = self
.node
.get_process(shard_index)
.context(Error::ProcessNotFound(shard_index))
.unwrap();
let members = process.get_membership().await.unwrap().members;
let out = raft::Membership {
members: members.into_iter().map(|x| x.to_string()).collect(),
};
Ok(tonic::Response::new(out))
}
async fn send_replication_stream(
&self,
request: tonic::Request<tonic::Streaming<raft::ReplicationStreamChunk>>,
) -> std::result::Result<tonic::Response<raft::ReplicationStreamResponse>, tonic::Status> {
let st = request.into_inner();
let (shard_index, st) = stream::into_internal_replication_stream(st).await.unwrap();
let resp = self
.node
.get_process(shard_index)
.context(Error::ProcessNotFound(shard_index))
.unwrap()
.send_replication_stream(st)
.await
.unwrap();
Ok(tonic::Response::new(raft::ReplicationStreamResponse {
n_inserted: resp.n_inserted,
log_last_index: resp.log_last_index,
}))
}
async fn get_snapshot(
&self,
request: tonic::Request<raft::GetSnapshotRequest>,
) -> std::result::Result<tonic::Response<Self::GetSnapshotStream>, tonic::Status> {
let req = request.into_inner();
let shard_index = req.shard_index;
let resp = self
.node
.get_process(shard_index)
.context(Error::ProcessNotFound(shard_index))
.unwrap()
.get_snapshot(req.index)
.await
.unwrap();
let resp = stream::into_external_snapshot_stream(resp);
Ok(tonic::Response::new(resp))
}
async fn send_heartbeat(
&self,
request: tonic::Request<raft::Heartbeat>,
) -> std::result::Result<tonic::Response<()>, tonic::Status> {
let req = request.into_inner();
let leader_id: NodeAddress = req.leader_id.parse().unwrap();
let mut futs = vec![];
for (shard_index, leader_state) in req.leader_commit_states {
let req = request::Heartbeat {
leader_term: leader_state.leader_term,
leader_commit_index: leader_state.leader_commit_index,
};
if let Some(process) = self.node.get_process(shard_index) {
let leader_id = leader_id.clone();
futs.push(async move { process.receive_heartbeat(leader_id, req).await });
}
}
futures::future::try_join_all(futs).await.unwrap();
Ok(tonic::Response::new(()))
}
async fn send_timeout_now(
&self,
req: tonic::Request<raft::TimeoutNow>,
) -> std::result::Result<tonic::Response<()>, tonic::Status> {
let req = req.into_inner();
let shard_index = req.shard_index;
self.node
.get_process(shard_index)
.context(Error::ProcessNotFound(shard_index))
.unwrap()
.send_timeout_now()
.await
.unwrap();
Ok(tonic::Response::new(()))
}
type WatchLogMetricsStream =
Pin<Box<dyn Stream<Item = Result<raft::LogMetrics, tonic::Status>> + Send>>;
async fn watch_log_metrics(
&self,
req: tonic::Request<raft::Shard>,
) -> std::result::Result<tonic::Response<Self::WatchLogMetricsStream>, tonic::Status> {
let shard_index = req.into_inner().id;
let node = self.node.clone();
let st = async_stream::try_stream! {
let mut intvl = tokio::time::interval(std::time::Duration::from_secs(1));
loop {
intvl.tick().await;
let process = node
.get_process(shard_index)
.context(Error::ProcessNotFound(shard_index))
.unwrap();
let log_state = process.get_log_state().await.unwrap();
let metrics = raft::LogMetrics {
head_index: log_state.head_index,
snapshot_index: log_state.snapshot_index,
application_index: log_state.application_index,
commit_index: log_state.commit_index,
last_index: log_state.last_index,
};
yield metrics
}
};
Ok(tonic::Response::new(Box::pin(st)))
}
}