use std::net::{SocketAddr, ToSocketAddrs};
use std::time::Duration;
use crate::message::{Message, RaftResponse};
use crate::raft_service::raft_service_server::{RaftService, RaftServiceServer};
use crate::raft_service::{self, RequestIdArgs};
use bincode::serialize;
use log::{error, info, warn};
use raft::eraftpb::{ConfChange, Message as RaftMessage};
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::timeout;
use tonic::transport::Server;
use tonic::{Request, Response, Status};
/// gRPC front end that turns incoming RPCs into `Message`s and pushes them
/// into an mpsc channel.
pub struct RaftServer {
/// Sending half of the channel every RPC handler forwards its `Message` into.
snd: mpsc::Sender<Message>,
/// Socket address the gRPC server is served on by `run`.
addr: SocketAddr,
}
impl RaftServer {
    /// Creates a server that forwards requests through `snd` and will listen on `addr`.
    ///
    /// `addr` is resolved immediately and only the first resolved socket
    /// address is kept.
    ///
    /// # Panics
    ///
    /// Panics if `addr` cannot be resolved, or if resolution yields no
    /// socket address at all.
    pub fn new<A: ToSocketAddrs>(snd: mpsc::Sender<Message>, addr: A) -> Self {
        let addr = addr
            .to_socket_addrs()
            .expect("failed to resolve the gRPC listen address")
            .next()
            .expect("the gRPC listen address resolved to no socket addresses");
        Self { snd, addr }
    }

    /// Serves the `RaftService` gRPC API on the configured address, consuming
    /// the server.
    ///
    /// The future completes only once the underlying tonic server stops, at
    /// which point a warning is logged.
    ///
    /// # Panics
    ///
    /// Panics if the tonic server returns an error while serving.
    pub async fn run(self) {
        // Copy the address out first: `self` is moved into the service below.
        let addr = self.addr;
        info!("listening gRPC requests on: {}", addr);
        let svc = RaftServiceServer::new(self);
        Server::builder()
            .add_service(svc)
            .serve(addr)
            .await
            .expect("error running server");
        warn!("server has quit");
    }
}
#[tonic::async_trait]
impl RaftService for RaftServer {
async fn request_id(
&self,
requset: Request<RequestIdArgs>,
) -> Result<Response<raft_service::IdRequestReponse>, Status> {
let sender = self.snd.clone();
let (tx, rx) = oneshot::channel();
let RequestIdArgs { addr } = requset.get_ref();
let addr = addr.to_owned();
let _ = sender.send(Message::RequestId { addr, chan: tx }).await;
let response = rx.await.unwrap();
match response {
RaftResponse::WrongLeader {
leader_id,
leader_addr,
} => {
warn!("sending wrong leader");
Ok(Response::new(raft_service::IdRequestReponse {
code: raft_service::ResultCode::WrongLeader as i32,
data: serialize(&(leader_id, leader_addr)).unwrap(),
}))
}
RaftResponse::IdReserved {
leader_id,
reserved_id,
peer_addrs,
} => Ok(Response::new(raft_service::IdRequestReponse {
code: raft_service::ResultCode::Ok as i32,
data: serialize(&(leader_id, reserved_id, peer_addrs)).unwrap(),
})),
_ => unreachable!(),
}
}
async fn change_config(
&self,
req: Request<ConfChange>,
) -> Result<Response<raft_service::RaftResponse>, Status> {
let change = req.into_inner();
let sender = self.snd.clone();
let (tx, rx) = oneshot::channel();
let message = Message::ConfigChange { change, chan: tx };
match sender.send(message).await {
Ok(_) => (),
Err(_) => error!("send error"),
}
let mut reply = raft_service::RaftResponse::default();
match timeout(Duration::from_secs(2), rx).await {
Ok(Ok(raft_response)) => {
reply.inner = serialize(&raft_response).expect("serialize error");
}
Ok(_) => (),
Err(_e) => {
reply.inner = serialize(&RaftResponse::Error).unwrap();
error!("timeout waiting for reply");
}
}
Ok(Response::new(reply))
}
async fn send_message(
&self,
request: Request<RaftMessage>,
) -> Result<Response<raft_service::RaftResponse>, Status> {
let message = request.into_inner();
let sender = self.snd.clone();
match sender.send(Message::Raft(Box::new(message))).await {
Ok(_) => (),
Err(_) => error!("send error"),
}
let response = RaftResponse::Ok;
Ok(Response::new(raft_service::RaftResponse {
inner: serialize(&response).unwrap(),
}))
}
}