d_engine_server/network/grpc/grpc_raft_service.rs

use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

use d_engine_core::MaybeCloneOneshot;
use d_engine_core::RaftEvent;
use d_engine_core::RaftOneshot;
use d_engine_core::StreamResponseSender;
use d_engine_core::TypeConfig;
use d_engine_proto::client::ClientReadRequest;
use d_engine_proto::client::ClientResponse;
use d_engine_proto::client::ClientWriteRequest;
use d_engine_proto::client::WatchRequest;
use d_engine_proto::client::raft_client_service_server::RaftClientService;
use d_engine_proto::server::cluster::ClusterConfChangeRequest;
use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
use d_engine_proto::server::cluster::ClusterMembership;
use d_engine_proto::server::cluster::JoinRequest;
use d_engine_proto::server::cluster::JoinResponse;
use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
use d_engine_proto::server::cluster::MetadataRequest;
use d_engine_proto::server::cluster::cluster_management_service_server::ClusterManagementService;
use d_engine_proto::server::election::VoteRequest;
use d_engine_proto::server::election::VoteResponse;
use d_engine_proto::server::election::raft_election_service_server::RaftElectionService;
use d_engine_proto::server::replication::AppendEntriesRequest;
use d_engine_proto::server::replication::AppendEntriesResponse;
use d_engine_proto::server::replication::raft_replication_service_server::RaftReplicationService;
use d_engine_proto::server::storage::PurgeLogRequest;
use d_engine_proto::server::storage::PurgeLogResponse;
use d_engine_proto::server::storage::SnapshotAck;
use d_engine_proto::server::storage::SnapshotChunk;
use d_engine_proto::server::storage::SnapshotResponse;
use d_engine_proto::server::storage::snapshot_service_server::SnapshotService;
use futures::Stream;
#[cfg(feature = "watch")]
use futures::StreamExt;
use tokio::select;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use tonic::Request;
use tonic::Response;
use tonic::Status;
use tonic::Streaming;
use tracing::debug;
use tracing::error;
#[cfg(feature = "watch")]
use tracing::info;
use tracing::warn;

use crate::Node;
#[tonic::async_trait]
impl<T> RaftElectionService for Node<T>
where
    T: TypeConfig,
{
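    /// Handles the RequestVote RPC from a peer node.
    ///
    /// Returns `Unavailable` if the node is not ready to serve RPCs; otherwise
    /// forwards the request to the Raft core over the event channel and waits
    /// for the response within the minimum election timeout.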
    async fn request_vote(
        &self,
        request: tonic::Request<VoteRequest>,
    ) -> std::result::Result<Response<VoteResponse>, Status> {
        if !self.is_rpc_ready() {
            warn!("[rpc|request_vote] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::ReceiveVoteRequest(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;
        let timeout_duration =
            Duration::from_millis(self.node_config.raft.election.election_timeout_min);
        handle_rpc_timeout(resp_rx, timeout_duration, "request_vote").await
    }
}
#[tonic::async_trait]
impl<T> RaftReplicationService for Node<T>
where
    T: TypeConfig,
{
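    /// Handles the AppendEntries RPC from the leader (log replication and heartbeats).
    ///
    /// Forwards the request to the Raft core over the event channel and waits
    /// for the response within the configured append-entries timeout.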
    async fn append_entries(
        &self,
        request: Request<AppendEntriesRequest>,
    ) -> std::result::Result<Response<AppendEntriesResponse>, tonic::Status> {
        if !self.is_rpc_ready() {
            warn!("[rpc|append_entries] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::AppendEntries(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.retry.append_entries.timeout_ms);

        handle_rpc_timeout(resp_rx, timeout_duration, "append_entries").await
    }
}

#[tonic::async_trait]
impl<T> SnapshotService for Node<T>
where
    T: TypeConfig,
{
    type StreamSnapshotStream = tonic::Streaming<SnapshotChunk>;

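    /// Serves a snapshot as a stream of `SnapshotChunk`s, driven by the peer's
    /// inbound `SnapshotAck` stream.
    ///
    /// The ack stream is handed to the Raft core, which produces the outbound
    /// chunk stream within the snapshot RPC timeout.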
    async fn stream_snapshot(
        &self,
        request: tonic::Request<tonic::Streaming<SnapshotAck>>,
    ) -> std::result::Result<tonic::Response<Self::StreamSnapshotStream>, tonic::Status> {
        if !self.is_rpc_ready() {
            warn!("stream_snapshot: Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = StreamResponseSender::new();

        self.event_tx
            .send(RaftEvent::StreamSnapshot(
                Box::new(request.into_inner()),
                resp_tx,
            ))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration = Duration::from_millis(self.node_config.raft.snapshot_rpc_timeout_ms);

        handle_rpc_timeout(
            async { resp_rx.await.map_err(|_| Status::internal("Response channel closed")) },
            timeout_duration,
            "stream_snapshot",
        )
        .await
    }

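    /// Receives a snapshot from the remote peer as a stream of chunks.
    ///
    /// The inbound chunk stream is handed to the Raft core, which replies with
    /// a single `SnapshotResponse` within the snapshot RPC timeout.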
    async fn install_snapshot(
        &self,
        request: tonic::Request<Streaming<SnapshotChunk>>,
    ) -> std::result::Result<tonic::Response<SnapshotResponse>, tonic::Status> {
        if !self.is_rpc_ready() {
            warn!("install_snapshot: Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();

        self.event_tx
            .send(RaftEvent::InstallSnapshotChunk(
                Box::new(request.into_inner()),
                resp_tx,
            ))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration = Duration::from_millis(self.node_config.raft.snapshot_rpc_timeout_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "install_snapshot").await
    }

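    /// Handles a request to purge (clean up) Raft log entries.
    ///
    /// Forwards the request to the Raft core and waits for the result within
    /// the general Raft timeout.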
    async fn purge_log(
        &self,
        request: tonic::Request<PurgeLogRequest>,
    ) -> std::result::Result<tonic::Response<PurgeLogResponse>, Status> {
        if !self.is_rpc_ready() {
            warn!("purge_log: Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();

        self.event_tx
            .send(RaftEvent::RaftLogCleanUp(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "purge_log").await
    }
}

#[tonic::async_trait]
impl<T> ClusterManagementService for Node<T>
where
    T: TypeConfig,
{
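    /// Applies a cluster configuration change propagated by the leader.
    ///
    /// Forwards the change to the Raft core and waits for the result within
    /// the membership timeout.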
    async fn update_cluster_conf(
        &self,
        request: tonic::Request<ClusterConfChangeRequest>,
    ) -> std::result::Result<Response<ClusterConfUpdateResponse>, Status> {
        if !self.is_rpc_ready() {
            warn!(
                "[rpc|update_cluster_conf_from_leader] Node-{} is not ready!",
                self.node_id
            );
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::ClusterConfUpdate(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration = Duration::from_millis(self.node_config.retry.membership.timeout_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "update_cluster_conf_from_leader").await
    }

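    /// Returns the current cluster membership known to this node.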
    async fn get_cluster_metadata(
        &self,
        request: tonic::Request<MetadataRequest>,
    ) -> std::result::Result<tonic::Response<ClusterMembership>, tonic::Status> {
        debug!("receive get_cluster_metadata");
        if !self.is_rpc_ready() {
            warn!(
                "[rpc|get_cluster_metadata] Node-{} is not ready!",
                self.node_id
            );
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::ClusterConf(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "get_cluster_metadata").await
    }

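    /// Handles a request from a node asking to join the cluster.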
    async fn join_cluster(
        &self,
        request: tonic::Request<JoinRequest>,
    ) -> std::result::Result<tonic::Response<JoinResponse>, tonic::Status> {
        debug!("receive join_cluster");
        if !self.is_rpc_ready() {
            warn!("[rpc|join_cluster] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::JoinCluster(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "join_cluster").await
    }

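    /// Handles a leader-discovery request by asking the Raft core for the
    /// current leader information.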
    async fn discover_leader(
        &self,
        request: tonic::Request<LeaderDiscoveryRequest>,
    ) -> std::result::Result<tonic::Response<LeaderDiscoveryResponse>, tonic::Status> {
        debug!("receive discover_leader");
        if !self.is_rpc_ready() {
            warn!("[rpc|discover_leader] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::DiscoverLeader(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "discover_leader").await
    }
}
#[tonic::async_trait]
impl<T> RaftClientService for Node<T>
where
    T: TypeConfig,
{
    type WatchStream =
        Pin<Box<dyn Stream<Item = Result<d_engine_proto::client::WatchResponse, Status>> + Send>>;

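    /// Handles a client write (propose) request.
    ///
    /// Rejects empty command batches, forwards the request to the Raft core as
    /// a `ClientPropose` event, and returns `Cancelled` if the client drops
    /// the call before a response is produced.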
    async fn handle_client_write(
        &self,
        request: tonic::Request<ClientWriteRequest>,
    ) -> std::result::Result<tonic::Response<ClientResponse>, Status> {
        if !self.is_rpc_ready() {
            warn!("[handle_client_write] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let remote_addr = request.remote_addr();
        let event_tx = self.event_tx.clone();
        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);

        let request_future = async move {
            let req: ClientWriteRequest = request.into_inner();
            if req.commands.is_empty() {
                return Err(Status::invalid_argument("Commands cannot be empty"));
            }

            let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
            event_tx
                .send(RaftEvent::ClientPropose(req, resp_tx))
                .await
                .map_err(|_| Status::internal("Event channel closed"))?;

            handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_write").await
        };

        let cancellation_future = async move {
            warn!("Request from {:?} cancelled by client", remote_addr);
            Err::<Response<ClientResponse>, Status>(Status::cancelled(
                "Request cancelled by client",
            ))
        };

        with_cancellation_handler(request_future, cancellation_future).await
    }

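    /// Handles a client read request by forwarding it to the Raft core and
    /// waiting for the response within the general Raft timeout.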
    async fn handle_client_read(
        &self,
        request: tonic::Request<ClientReadRequest>,
    ) -> std::result::Result<tonic::Response<ClientResponse>, tonic::Status> {
        if !self.is_rpc_ready() {
            warn!("handle_client_read: Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::ClientReadRequest(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_read").await
    }

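    /// Registers a watcher for the requested key and returns a stream of
    /// change notifications (available only with the `watch` feature).
    ///
    /// When the underlying channel closes, the stream ends with an
    /// `Unavailable` status so the client knows to reconnect and re-register.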
    #[cfg(feature = "watch")]
    async fn watch(
        &self,
        request: tonic::Request<WatchRequest>,
    ) -> std::result::Result<tonic::Response<Self::WatchStream>, tonic::Status> {
        let watch_request = request.into_inner();
        let key = watch_request.key;

        let registry = self.watch_registry.as_ref().ok_or_else(|| {
            Status::unavailable("Watch feature is disabled in server configuration")
        })?;

        info!(
            node_id = self.node_id,
            key = ?key,
            "Registering watch for key"
        );

        let handle = registry.register(key);
        let (_watcher_id, _key, receiver) = handle.into_receiver();

        let stream = tokio_stream::wrappers::ReceiverStream::new(receiver)
            .map(Ok)
            .chain(futures::stream::once(async {
                Err(Status::unavailable(
                    "Watch stream closed: server may have shut down or restarted. Please reconnect and re-register the watcher."
                ))
            }));

        Ok(tonic::Response::new(Box::pin(stream)))
    }

    #[cfg(not(feature = "watch"))]
    async fn watch(
        &self,
        _request: tonic::Request<WatchRequest>,
    ) -> std::result::Result<tonic::Response<Self::WatchStream>, tonic::Status> {
        Err(Status::unimplemented(
            "Watch feature is not compiled in this build",
        ))
    }
}

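/// Runs `request_future` while detecting client-side cancellation.
///
/// The request is executed in a spawned task that races against a
/// `CancellationToken`. If this future is dropped (e.g. tonic drops the call
/// when the client disconnects), the token's drop guard fires and the spawned
/// task runs `cancellation_future` instead of completing the request.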
pub(crate) async fn with_cancellation_handler<FRequest, FCancellation>(
    request_future: FRequest,
    cancellation_future: FCancellation,
) -> std::result::Result<Response<ClientResponse>, Status>
where
    FRequest:
        Future<Output = std::result::Result<Response<ClientResponse>, Status>> + Send + 'static,
    FCancellation:
        Future<Output = std::result::Result<Response<ClientResponse>, Status>> + Send + 'static,
{
    let token = CancellationToken::new();
    let _drop_guard = token.clone().drop_guard();
    let select_task = tokio::spawn(async move {
        select! {
            res = request_future => res,
            _ = token.cancelled() => cancellation_future.await,
        }
    });

    select_task.await.unwrap()
}

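/// Awaits a response from the Raft core with a timeout and converts the
/// outcome into a gRPC result.
///
/// The nested `Result` separates channel failures (`E`) from application-level
/// errors (`Status`); both timeouts and closed channels surface to the caller
/// as `DeadlineExceeded`.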
async fn handle_rpc_timeout<T, E>(
    resp_rx: impl Future<Output = Result<Result<T, Status>, E>>,
    timeout_duration: Duration,
    rpc_name: &'static str,
) -> Result<Response<T>, Status>
where
    T: std::fmt::Debug,
    E: std::fmt::Debug,
{
    debug!("grpc_raft_service::handle_rpc_timeout::{}", rpc_name);

    match timeout(timeout_duration, resp_rx).await {
        Ok(Ok(Ok(response))) => {
            debug!("[{}] Success response: {:?}", rpc_name, &response);
            Ok(Response::new(response))
        }
        Ok(Ok(Err(status))) => {
            error!("[{}] Error status: {:?}", rpc_name, &status);
            Err(status)
        }
        Ok(Err(e)) => {
            error!("[{}] Channel error: {:?}", rpc_name, e);
            Err(Status::deadline_exceeded("RPC channel closed"))
        }
        Err(_) => {
            warn!(
                "[{}] Response timeout after {:?}",
                rpc_name, timeout_duration
            );
            Err(Status::deadline_exceeded("RPC timeout exceeded"))
        }
    }
}