1use std::future::Future;
6use std::time::Duration;
7
8use autometrics::autometrics;
9use log::debug;
10use log::error;
11use log::warn;
12use tokio::select;
13use tokio::time::timeout;
14use tokio_util::sync::CancellationToken;
15use tonic::Request;
16use tonic::Response;
17use tonic::Status;
18
19use crate::proto::rpc_service_server::RpcService;
20use crate::proto::AppendEntriesRequest;
21use crate::proto::AppendEntriesResponse;
22use crate::proto::ClientProposeRequest;
23use crate::proto::ClientReadRequest;
24use crate::proto::ClientResponse;
25use crate::proto::ClusteMembershipChangeRequest;
26use crate::proto::ClusterConfUpdateResponse;
27use crate::proto::ClusterMembership;
28use crate::proto::MetadataRequest;
29use crate::proto::VoteRequest;
30use crate::proto::VoteResponse;
31use crate::MaybeCloneOneshot;
32use crate::Node;
33use crate::RaftEvent;
34use crate::RaftOneshot;
35use crate::TypeConfig;
36use crate::API_SLO;
37
38#[tonic::async_trait]
39impl<T> RpcService for Node<T>
40where T: TypeConfig
41{
42 #[cfg_attr(not(doc), autometrics(objective = API_SLO))]
48 #[tracing::instrument]
49 async fn request_vote(
50 &self,
51 request: tonic::Request<VoteRequest>,
52 ) -> std::result::Result<Response<VoteResponse>, Status> {
53 if !self.server_is_ready() {
54 warn!("[rpc|request_vote] Node-{} is not ready!", self.node_id);
55 return Err(Status::unavailable("Service is not ready"));
56 }
57
58 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
59 self.event_tx
60 .send(RaftEvent::ReceiveVoteRequest(request.into_inner(), resp_tx))
61 .await
62 .map_err(|_| Status::internal("Event channel closed"))?;
63 let timeout_duration = Duration::from_millis(self.settings.raft.election.election_timeout_min);
64 handle_rpc_timeout(resp_rx, timeout_duration, "request_vote").await
65 }
66
67 #[cfg_attr(not(doc), autometrics(objective = API_SLO))]
75 #[tracing::instrument]
76 async fn append_entries(
77 &self,
78 request: Request<AppendEntriesRequest>,
79 ) -> std::result::Result<Response<AppendEntriesResponse>, tonic::Status> {
80 if !self.server_is_ready() {
81 warn!("[rpc|append_entries] Node-{} is not ready!", self.node_id);
82 return Err(Status::unavailable("Service is not ready"));
83 }
84
85 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
86 self.event_tx
87 .send(RaftEvent::AppendEntries(request.into_inner(), resp_tx))
88 .await
89 .map_err(|_| Status::internal("Event channel closed"))?;
90
91 let timeout_duration = Duration::from_millis(self.settings.retry.election.timeout_ms);
92
93 handle_rpc_timeout(resp_rx, timeout_duration, "append_entries").await
94 }
95
96 #[cfg_attr(not(doc), autometrics(objective = API_SLO))]
102 #[tracing::instrument]
103 async fn update_cluster_conf(
104 &self,
105 request: tonic::Request<ClusteMembershipChangeRequest>,
106 ) -> std::result::Result<Response<ClusterConfUpdateResponse>, Status> {
107 if !self.server_is_ready() {
108 warn!("[rpc|update_cluster_conf] Node-{} is not ready!", self.node_id);
109 return Err(Status::unavailable("Service is not ready"));
110 }
111
112 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
113 self.event_tx
114 .send(RaftEvent::ClusterConfUpdate(request.into_inner(), resp_tx))
115 .await
116 .map_err(|_| Status::internal("Event channel closed"))?;
117
118 let timeout_duration = Duration::from_millis(self.settings.retry.membership.timeout_ms);
119 handle_rpc_timeout(resp_rx, timeout_duration, "update_cluster_conf").await
120 }
121
122 #[cfg_attr(not(doc), autometrics(objective = API_SLO))]
128 #[tracing::instrument]
129 async fn handle_client_propose(
130 &self,
131 request: tonic::Request<ClientProposeRequest>,
132 ) -> std::result::Result<tonic::Response<ClientResponse>, Status> {
133 if !self.server_is_ready() {
134 warn!("[handle_client_propose] Node-{} is not ready!", self.node_id);
135 return Err(Status::unavailable("Service is not ready"));
136 }
137
138 let remote_addr = request.remote_addr();
139 let event_tx = self.event_tx.clone();
140 let timeout_duration = Duration::from_millis(self.settings.raft.general_raft_timeout_duration_in_ms);
141
142 let request_future = async move {
143 let req: ClientProposeRequest = request.into_inner();
144 if req.commands.is_empty() {
146 return Err(Status::invalid_argument("Commands cannot be empty"));
147 }
148
149 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
150 event_tx
151 .send(RaftEvent::ClientPropose(req, resp_tx))
152 .await
153 .map_err(|_| Status::internal("Event channel closed"))?;
154
155 handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_propose").await
156 };
157
158 let cancellation_future = async move {
159 warn!("Request from {:?} cancelled by client", remote_addr);
160 Err::<Response<ClientResponse>, Status>(Status::cancelled("Request cancelled by client"))
163 };
164
165 with_cancellation_handler(request_future, cancellation_future).await
166 }
167
168 #[cfg_attr(not(doc), autometrics(objective = API_SLO))]
173 #[tracing::instrument]
174 async fn get_cluster_metadata(
175 &self,
176 request: tonic::Request<MetadataRequest>,
177 ) -> std::result::Result<tonic::Response<ClusterMembership>, tonic::Status> {
178 debug!("receive get_cluster_metadata");
179 if !self.server_is_ready() {
180 warn!("[rpc|get_cluster_metadata] Node-{} is not ready!", self.node_id);
181 return Err(Status::unavailable("Service is not ready"));
182 }
183
184 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
185 self.event_tx
186 .send(RaftEvent::ClusterConf(request.into_inner(), resp_tx))
187 .await
188 .map_err(|_| Status::internal("Event channel closed"))?;
189
190 let timeout_duration = Duration::from_millis(self.settings.raft.general_raft_timeout_duration_in_ms);
191 handle_rpc_timeout(resp_rx, timeout_duration, "get_cluster_metadata").await
192 }
193
194 #[cfg_attr(not(doc), autometrics(objective = API_SLO))]
200 #[tracing::instrument]
201 async fn handle_client_read(
202 &self,
203 request: tonic::Request<ClientReadRequest>,
204 ) -> std::result::Result<tonic::Response<ClientResponse>, tonic::Status> {
205 if !self.server_is_ready() {
206 warn!("handle_client_read: Node-{} is not ready!", self.node_id);
207 return Err(Status::unavailable("Service is not ready"));
208 }
209
210 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
211 self.event_tx
212 .send(RaftEvent::ClientReadRequest(request.into_inner(), resp_tx))
213 .await
214 .map_err(|_| Status::internal("Event channel closed"))?;
215
216 let timeout_duration = Duration::from_millis(self.settings.raft.general_raft_timeout_duration_in_ms);
217 handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_read").await
218 }
219}
220
221pub(crate) async fn with_cancellation_handler<FRequest, FCancellation>(
227 request_future: FRequest,
228 cancellation_future: FCancellation,
229) -> std::result::Result<Response<ClientResponse>, Status>
230where
231 FRequest: Future<Output = std::result::Result<Response<ClientResponse>, Status>> + Send + 'static,
232 FCancellation: Future<Output = std::result::Result<Response<ClientResponse>, Status>> + Send + 'static,
233{
234 let token = CancellationToken::new();
235 let _drop_guard = token.clone().drop_guard();
238 let select_task = tokio::spawn(async move {
239 select! {
242 res = request_future => res,
243 _ = token.cancelled() => cancellation_future.await,
244 }
245 });
246
247 select_task.await.unwrap()
248}
249
250async fn handle_rpc_timeout<T, E>(
259 resp_rx: impl Future<Output = Result<Result<T, Status>, E>>,
260 timeout_duration: Duration,
261 rpc_name: &'static str,
262) -> Result<Response<T>, Status>
263where
264 T: std::fmt::Debug,
265 E: std::fmt::Debug,
266{
267 debug!("grpc_raft_serice::handle_rpc_timeout::{}", rpc_name);
268
269 match timeout(timeout_duration, resp_rx).await {
270 Ok(Ok(Ok(response))) => {
271 debug!("[{}] Success response: {:?}", rpc_name, &response);
272 Ok(Response::new(response))
273 }
274 Ok(Ok(Err(status))) => {
275 error!("[{}] Error status: {:?}", rpc_name, &status);
276 Err(status)
277 }
278 Ok(Err(e)) => {
279 error!("[{}] Channel error: {:?}", rpc_name, e);
280 Err(Status::deadline_exceeded("RPC channel closed"))
281 }
282 Err(_) => {
283 warn!("[{}] Response timeout after {:?}", rpc_name, timeout_duration);
284 Err(Status::deadline_exceeded("RPC timeout exceeded"))
285 }
286 }
287}