gandalf_consensus/
rpc.rs

1use tonic::{Request, Response, Status};
2
3use crate::raft_rpc::raft_rpc_server::RaftRpc;
4use crate::raft_rpc::raft_rpc_client::RaftRpcClient;
5
6use crate::raft_rpc::{AppendEntriesRequest, AppendEntriesResponse};
7use crate::raft_rpc::{RequestVoteRequest, RequestVoteResponse};
8use crate::raft_rpc::{ForwardEntryRequest, ForwardEntryResponse};
9use crate::raft_rpc::{SnapshotRequest, SnapshotResponse};
10
11use crate::{Node, RaftMessage, ClientData};
12
13use tokio::sync::{mpsc, oneshot};
14
15use tracing::{info, error};
16
17#[derive(Debug)]
18pub struct RaftRpcService<T: ClientData> {
19    tx_rpc: mpsc::UnboundedSender<RaftMessage<T>>
20}
21
22impl<T: ClientData> RaftRpcService<T> {
23    pub fn new(tx_rpc: mpsc::UnboundedSender<RaftMessage<T>>) -> RaftRpcService<T> {
24        RaftRpcService { tx_rpc }
25    }
26}
27
28#[tonic::async_trait]
29impl<T: ClientData> RaftRpc for RaftRpcService<T> {
30    async fn append_entries(&self,
31        request: Request<AppendEntriesRequest>) ->
32        Result<Response<AppendEntriesResponse>, Status> {
33        let (tx, rx) = oneshot::channel();
34        let resp = self.tx_rpc.send(RaftMessage::AppendMsg{
35            body: request.into_inner(),
36            tx
37        });
38        if let Err(err) = resp {
39            return Err(Status::internal(err.to_string()));
40        }
41        let resp = match rx.await {
42            Ok(msg) => msg,
43            Err(err) => return Err(Status::internal(err.to_string()))
44        };
45        match resp {
46            RaftMessage::AppendResp{payload, status} => {
47                if let Some(status) = status {
48                    return Err(status);
49                }
50                return Ok(Response::new(payload.unwrap()));
51            },
52            _ => {return Err(Status::unknown("Unkown response recived"));}
53        }
54    }
55
56    async fn request_vote(&self, request: Request<RequestVoteRequest>) 
57        -> Result<Response<RequestVoteResponse>, Status> {
58        let (tx, rx) = oneshot::channel();
59        let body = request.into_inner();
60        info!("{:?} Asked for vote", &body.candidate_id);
61        let resp = self.tx_rpc.send(RaftMessage::VoteMsg{
62            body,
63            tx
64        }); 
65        if let Err(err) = resp {
66            return Err(Status::internal(err.to_string()));
67        }
68        let resp = match rx.await {
69            Ok(msg) => msg,
70            Err(err) => return Err(Status::internal(err.to_string()))
71        };
72        match resp {
73            RaftMessage::VoteResp{payload, status} => {
74                if let Some(status) = status {
75                    return Err(status);
76                }
77                return Ok(Response::new(payload));
78            },
79            _ => {return Err(Status::unknown("Unkown response recived"));}
80        }
81    }
82
83    async fn forward_entry(&self, request: Request<ForwardEntryRequest>) ->
84        Result<Response<ForwardEntryResponse>, Status> {
85        let (tx, rx) = oneshot::channel();
86        let req = request.into_inner();
87
88        let entity: T = match serde_json::from_str(&req.payload) {
89            Ok(entity) => entity,
90            Err(err) => {
91                error!(cause = %err, "Caused an error: ");
92                return Err(Status::invalid_argument("Could not parse the entity")); 
93            }
94        };
95
96        let msg = if req.iswrite {
97            RaftMessage::ClientWriteMsg {
98                body: entity,
99                tx
100            }
101        } else {
102            RaftMessage::ClientReadMsg {
103                body: entity,
104                tx
105            }
106        };
107        let resp = self.tx_rpc.send(msg);
108
109        if let Err(err) = resp {
110            return Err(Status::internal(err.to_string()));
111        }
112        let resp = match rx.await {
113            Ok(msg) => msg,
114            Err(err) => return Err(Status::internal(err.to_string()))
115        };
116        match resp {
117            RaftMessage::ClientResp{body} => {
118                let payload = serde_json::to_string(&body).unwrap();
119                return Ok(Response::new(ForwardEntryResponse { payload }));
120            },
121            _ => {return Err(Status::unknown("Unkown response recived"));}
122        }
123
124    }
125
126    async fn install_snapshot(&self, request: Request<SnapshotRequest>) ->
127        Result<Response<SnapshotResponse>, Status> {
128        let (tx, rx) = oneshot::channel();
129        let body = request.into_inner();
130        info!("{:?} Recived a snapshot", &body.leader_id);
131        let resp = self.tx_rpc.send(RaftMessage::InstallSnapshot{
132            body,
133            tx
134        }); 
135        if let Err(err) = resp {
136            return Err(Status::internal(err.to_string()));
137        }
138        let resp = match rx.await {
139            Ok(msg) => msg,
140            Err(err) => return Err(Status::internal(err.to_string()))
141        };
142        match resp {
143            RaftMessage::InstallSnapshotResp{payload, status} => {
144                if let Some(status) = status {
145                    return Err(status);
146                }
147                return Ok(Response::new(payload));
148            },
149            _ => {return Err(Status::unknown("Unkown response recived"));}
150        }
151        
152    }
153
154}
155
156pub async fn ask_for_vote(node: &Node, request: RequestVoteRequest) 
157    -> crate::Result<RequestVoteResponse> {
158    info!("Asking {} for vote", node.id);
159    let addr = format!("http://{}:{}", node.ip, node.port);
160    let mut client = RaftRpcClient::connect(addr).await?;
161    let response = client.request_vote(request).await?;
162    let resp = response.into_inner();
163    info!("Answerd with {:?}", resp);
164    Ok(resp)
165}
166
167pub async fn append_entries(node: &Node, request: AppendEntriesRequest) 
168    -> crate::Result<AppendEntriesResponse> {
169    let addr = format!("http://{}:{}", node.ip, node.port);
170    let mut client = RaftRpcClient::connect(addr).await?;
171    let response = client.append_entries(request).await?;
172    Ok(response.into_inner())
173}
174
175pub async fn forward(node: &Node, request: ForwardEntryRequest) 
176    -> crate::Result<ForwardEntryResponse> {
177    let addr = format!("http://{}:{}", node.ip, node.port);
178    info!("Forwarding entry to {}:{}", node.ip, node.port);
179    let mut client = RaftRpcClient::connect(addr).await?;
180    let response = client.forward_entry(request).await?;
181    Ok(response.into_inner())
182}
183
184pub async fn install_snapshot(node: &Node, request: SnapshotRequest) 
185    -> crate::Result<SnapshotResponse> {
186    info!("Sending snapshot to {}", node.id);
187    let addr = format!("http://{}:{}", node.ip, node.port);
188    let mut client = RaftRpcClient::connect(addr).await?;
189    let response = client.install_snapshot(request).await?;
190    let resp = response.into_inner();
191    info!("Answerd with {:?}", resp);
192    Ok(resp)
193}