1use tonic::{Request, Response, Status};
2
3use crate::raft_rpc::raft_rpc_server::RaftRpc;
4use crate::raft_rpc::raft_rpc_client::RaftRpcClient;
5
6use crate::raft_rpc::{AppendEntriesRequest, AppendEntriesResponse};
7use crate::raft_rpc::{RequestVoteRequest, RequestVoteResponse};
8use crate::raft_rpc::{ForwardEntryRequest, ForwardEntryResponse};
9use crate::raft_rpc::{SnapshotRequest, SnapshotResponse};
10
11use crate::{Node, RaftMessage, ClientData};
12
13use tokio::sync::{mpsc, oneshot};
14
15use tracing::{info, error};
16
17#[derive(Debug)]
18pub struct RaftRpcService<T: ClientData> {
19 tx_rpc: mpsc::UnboundedSender<RaftMessage<T>>
20}
21
22impl<T: ClientData> RaftRpcService<T> {
23 pub fn new(tx_rpc: mpsc::UnboundedSender<RaftMessage<T>>) -> RaftRpcService<T> {
24 RaftRpcService { tx_rpc }
25 }
26}
27
28#[tonic::async_trait]
29impl<T: ClientData> RaftRpc for RaftRpcService<T> {
30 async fn append_entries(&self,
31 request: Request<AppendEntriesRequest>) ->
32 Result<Response<AppendEntriesResponse>, Status> {
33 let (tx, rx) = oneshot::channel();
34 let resp = self.tx_rpc.send(RaftMessage::AppendMsg{
35 body: request.into_inner(),
36 tx
37 });
38 if let Err(err) = resp {
39 return Err(Status::internal(err.to_string()));
40 }
41 let resp = match rx.await {
42 Ok(msg) => msg,
43 Err(err) => return Err(Status::internal(err.to_string()))
44 };
45 match resp {
46 RaftMessage::AppendResp{payload, status} => {
47 if let Some(status) = status {
48 return Err(status);
49 }
50 return Ok(Response::new(payload.unwrap()));
51 },
52 _ => {return Err(Status::unknown("Unkown response recived"));}
53 }
54 }
55
56 async fn request_vote(&self, request: Request<RequestVoteRequest>)
57 -> Result<Response<RequestVoteResponse>, Status> {
58 let (tx, rx) = oneshot::channel();
59 let body = request.into_inner();
60 info!("{:?} Asked for vote", &body.candidate_id);
61 let resp = self.tx_rpc.send(RaftMessage::VoteMsg{
62 body,
63 tx
64 });
65 if let Err(err) = resp {
66 return Err(Status::internal(err.to_string()));
67 }
68 let resp = match rx.await {
69 Ok(msg) => msg,
70 Err(err) => return Err(Status::internal(err.to_string()))
71 };
72 match resp {
73 RaftMessage::VoteResp{payload, status} => {
74 if let Some(status) = status {
75 return Err(status);
76 }
77 return Ok(Response::new(payload));
78 },
79 _ => {return Err(Status::unknown("Unkown response recived"));}
80 }
81 }
82
83 async fn forward_entry(&self, request: Request<ForwardEntryRequest>) ->
84 Result<Response<ForwardEntryResponse>, Status> {
85 let (tx, rx) = oneshot::channel();
86 let req = request.into_inner();
87
88 let entity: T = match serde_json::from_str(&req.payload) {
89 Ok(entity) => entity,
90 Err(err) => {
91 error!(cause = %err, "Caused an error: ");
92 return Err(Status::invalid_argument("Could not parse the entity"));
93 }
94 };
95
96 let msg = if req.iswrite {
97 RaftMessage::ClientWriteMsg {
98 body: entity,
99 tx
100 }
101 } else {
102 RaftMessage::ClientReadMsg {
103 body: entity,
104 tx
105 }
106 };
107 let resp = self.tx_rpc.send(msg);
108
109 if let Err(err) = resp {
110 return Err(Status::internal(err.to_string()));
111 }
112 let resp = match rx.await {
113 Ok(msg) => msg,
114 Err(err) => return Err(Status::internal(err.to_string()))
115 };
116 match resp {
117 RaftMessage::ClientResp{body} => {
118 let payload = serde_json::to_string(&body).unwrap();
119 return Ok(Response::new(ForwardEntryResponse { payload }));
120 },
121 _ => {return Err(Status::unknown("Unkown response recived"));}
122 }
123
124 }
125
126 async fn install_snapshot(&self, request: Request<SnapshotRequest>) ->
127 Result<Response<SnapshotResponse>, Status> {
128 let (tx, rx) = oneshot::channel();
129 let body = request.into_inner();
130 info!("{:?} Recived a snapshot", &body.leader_id);
131 let resp = self.tx_rpc.send(RaftMessage::InstallSnapshot{
132 body,
133 tx
134 });
135 if let Err(err) = resp {
136 return Err(Status::internal(err.to_string()));
137 }
138 let resp = match rx.await {
139 Ok(msg) => msg,
140 Err(err) => return Err(Status::internal(err.to_string()))
141 };
142 match resp {
143 RaftMessage::InstallSnapshotResp{payload, status} => {
144 if let Some(status) = status {
145 return Err(status);
146 }
147 return Ok(Response::new(payload));
148 },
149 _ => {return Err(Status::unknown("Unkown response recived"));}
150 }
151
152 }
153
154}
155
156pub async fn ask_for_vote(node: &Node, request: RequestVoteRequest)
157 -> crate::Result<RequestVoteResponse> {
158 info!("Asking {} for vote", node.id);
159 let addr = format!("http://{}:{}", node.ip, node.port);
160 let mut client = RaftRpcClient::connect(addr).await?;
161 let response = client.request_vote(request).await?;
162 let resp = response.into_inner();
163 info!("Answerd with {:?}", resp);
164 Ok(resp)
165}
166
167pub async fn append_entries(node: &Node, request: AppendEntriesRequest)
168 -> crate::Result<AppendEntriesResponse> {
169 let addr = format!("http://{}:{}", node.ip, node.port);
170 let mut client = RaftRpcClient::connect(addr).await?;
171 let response = client.append_entries(request).await?;
172 Ok(response.into_inner())
173}
174
175pub async fn forward(node: &Node, request: ForwardEntryRequest)
176 -> crate::Result<ForwardEntryResponse> {
177 let addr = format!("http://{}:{}", node.ip, node.port);
178 info!("Forwarding entry to {}:{}", node.ip, node.port);
179 let mut client = RaftRpcClient::connect(addr).await?;
180 let response = client.forward_entry(request).await?;
181 Ok(response.into_inner())
182}
183
184pub async fn install_snapshot(node: &Node, request: SnapshotRequest)
185 -> crate::Result<SnapshotResponse> {
186 info!("Sending snapshot to {}", node.id);
187 let addr = format!("http://{}:{}", node.ip, node.port);
188 let mut client = RaftRpcClient::connect(addr).await?;
189 let response = client.install_snapshot(request).await?;
190 let resp = response.into_inner();
191 info!("Answerd with {:?}", resp);
192 Ok(resp)
193}