// d_engine_server/network/grpc/grpc_raft_service.rs
use std::future::Future;
6use std::pin::Pin;
7use std::time::Duration;
8
9use d_engine_core::MaybeCloneOneshot;
10use d_engine_core::RaftEvent;
11use d_engine_core::RaftOneshot;
12use d_engine_core::StreamResponseSender;
13use d_engine_core::TypeConfig;
14use d_engine_proto::client::ClientReadRequest;
15use d_engine_proto::client::ClientResponse;
16use d_engine_proto::client::ClientWriteRequest;
17use d_engine_proto::client::WatchRequest;
18use d_engine_proto::client::raft_client_service_server::RaftClientService;
19use d_engine_proto::server::cluster::ClusterConfChangeRequest;
20use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
21use d_engine_proto::server::cluster::ClusterMembership;
22use d_engine_proto::server::cluster::JoinRequest;
23use d_engine_proto::server::cluster::JoinResponse;
24use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
25use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
26use d_engine_proto::server::cluster::MetadataRequest;
27use d_engine_proto::server::cluster::cluster_management_service_server::ClusterManagementService;
28use d_engine_proto::server::election::VoteRequest;
29use d_engine_proto::server::election::VoteResponse;
30use d_engine_proto::server::election::raft_election_service_server::RaftElectionService;
31use d_engine_proto::server::replication::AppendEntriesRequest;
32use d_engine_proto::server::replication::AppendEntriesResponse;
33use d_engine_proto::server::replication::raft_replication_service_server::RaftReplicationService;
34use d_engine_proto::server::storage::SnapshotAck;
35use d_engine_proto::server::storage::SnapshotChunk;
36use d_engine_proto::server::storage::SnapshotResponse;
37use d_engine_proto::server::storage::snapshot_service_server::SnapshotService;
38use futures::Stream;
39#[cfg(feature = "watch")]
40use futures::StreamExt;
41use tokio::select;
42use tokio::time::timeout;
43use tokio_util::sync::CancellationToken;
44use tonic::Request;
45use tonic::Response;
46use tonic::Status;
47use tonic::Streaming;
48use tracing::debug;
49use tracing::error;
50#[cfg(feature = "watch")]
51use tracing::info;
52use tracing::warn;
53
54use crate::Node;
55
56#[tonic::async_trait]
57impl<T> RaftElectionService for Node<T>
58where
59 T: TypeConfig,
60{
61 async fn request_vote(
67 &self,
68 request: tonic::Request<VoteRequest>,
69 ) -> std::result::Result<Response<VoteResponse>, Status> {
70 if !self.is_rpc_ready() {
71 warn!(
72 "[rpc|request_vote] My raft setup(Node:{}) is not ready!",
73 self.node_id
74 );
75 return Err(Status::unavailable("Service is not ready"));
76 }
77
78 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
79 self.event_tx
80 .send(RaftEvent::ReceiveVoteRequest(request.into_inner(), resp_tx))
81 .await
82 .map_err(|_| Status::internal("Event channel closed"))?;
83 let timeout_duration =
84 Duration::from_millis(self.node_config.raft.election.election_timeout_min);
85 handle_rpc_timeout(resp_rx, timeout_duration, "request_vote").await
86 }
87}
88#[tonic::async_trait]
89impl<T> RaftReplicationService for Node<T>
90where
91 T: TypeConfig,
92{
93 async fn append_entries(
101 &self,
102 request: Request<AppendEntriesRequest>,
103 ) -> std::result::Result<Response<AppendEntriesResponse>, tonic::Status> {
104 if !self.is_rpc_ready() {
105 warn!("[rpc|append_entries] Node-{} is not ready!", self.node_id);
106 return Err(Status::unavailable("Service is not ready"));
107 }
108
109 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
110 self.event_tx
111 .send(RaftEvent::AppendEntries(request.into_inner(), resp_tx))
112 .await
113 .map_err(|_| Status::internal("Event channel closed"))?;
114
115 let timeout_duration =
116 Duration::from_millis(self.node_config.retry.append_entries.timeout_ms);
117
118 handle_rpc_timeout(resp_rx, timeout_duration, "append_entries").await
119 }
120}
121
122#[tonic::async_trait]
123impl<T> SnapshotService for Node<T>
124where
125 T: TypeConfig,
126{
127 type StreamSnapshotStream = tonic::Streaming<SnapshotChunk>;
128
129 async fn stream_snapshot(
130 &self,
131 request: tonic::Request<tonic::Streaming<SnapshotAck>>,
132 ) -> std::result::Result<tonic::Response<Self::StreamSnapshotStream>, tonic::Status> {
133 if !self.is_rpc_ready() {
134 warn!("stream_snapshot: Node-{} is not ready!", self.node_id);
135 return Err(Status::unavailable("Service is not ready"));
136 }
137
138 let (resp_tx, resp_rx) = StreamResponseSender::new();
139
140 self.event_tx
141 .send(RaftEvent::StreamSnapshot(
142 Box::new(request.into_inner()),
143 resp_tx,
144 ))
145 .await
146 .map_err(|_| Status::internal("Event channel closed"))?;
147
148 let timeout_duration = Duration::from_millis(self.node_config.raft.snapshot_rpc_timeout_ms);
149
150 handle_rpc_timeout(
151 async { resp_rx.await.map_err(|_| Status::internal("Response channel closed")) },
152 timeout_duration,
153 "stream_snapshot",
154 )
155 .await
156 }
157
158 async fn install_snapshot(
159 &self,
160 request: tonic::Request<Streaming<SnapshotChunk>>,
161 ) -> std::result::Result<tonic::Response<SnapshotResponse>, tonic::Status> {
162 if !self.is_rpc_ready() {
163 warn!("install_snapshot: Node-{} is not ready!", self.node_id);
164 return Err(Status::unavailable("Service is not ready"));
165 }
166
167 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
168
169 self.event_tx
170 .send(RaftEvent::InstallSnapshotChunk(
171 Box::new(request.into_inner()),
172 resp_tx,
173 ))
174 .await
175 .map_err(|_| Status::internal("Event channel closed"))?;
176
177 let timeout_duration = Duration::from_millis(self.node_config.raft.snapshot_rpc_timeout_ms);
178 handle_rpc_timeout(resp_rx, timeout_duration, "install_snapshot").await
179 }
180}
181
182#[tonic::async_trait]
183impl<T> ClusterManagementService for Node<T>
184where
185 T: TypeConfig,
186{
187 async fn update_cluster_conf(
193 &self,
194 request: tonic::Request<ClusterConfChangeRequest>,
195 ) -> std::result::Result<Response<ClusterConfUpdateResponse>, Status> {
196 if !self.is_rpc_ready() {
197 warn!(
198 "[rpc|update_cluster_conf_from_leader] Node-{} is not ready!",
199 self.node_id
200 );
201 return Err(Status::unavailable("Service is not ready"));
202 }
203
204 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
205 self.event_tx
206 .send(RaftEvent::ClusterConfUpdate(request.into_inner(), resp_tx))
207 .await
208 .map_err(|_| Status::internal("Event channel closed"))?;
209
210 let timeout_duration = Duration::from_millis(self.node_config.retry.membership.timeout_ms);
211 handle_rpc_timeout(resp_rx, timeout_duration, "update_cluster_conf_from_leader").await
212 }
213
214 async fn get_cluster_metadata(
219 &self,
220 request: tonic::Request<MetadataRequest>,
221 ) -> std::result::Result<tonic::Response<ClusterMembership>, tonic::Status> {
222 debug!("receive get_cluster_metadata");
223 if !self.is_rpc_ready() {
224 warn!(
225 "[rpc|get_cluster_metadata] Node-{} is not ready!",
226 self.node_id
227 );
228 return Err(Status::unavailable("Service is not ready"));
229 }
230
231 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
232 self.event_tx
233 .send(RaftEvent::ClusterConf(request.into_inner(), resp_tx))
234 .await
235 .map_err(|_| Status::internal("Event channel closed"))?;
236
237 let timeout_duration =
238 Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
239 handle_rpc_timeout(resp_rx, timeout_duration, "get_cluster_metadata").await
240 }
241
242 async fn join_cluster(
244 &self,
245 request: tonic::Request<JoinRequest>,
246 ) -> std::result::Result<tonic::Response<JoinResponse>, tonic::Status> {
247 debug!("receive join_cluster");
248 if !self.is_rpc_ready() {
249 warn!("[rpc|join_cluster] Node-{} is not ready!", self.node_id);
250 return Err(Status::unavailable("Service is not ready"));
251 }
252
253 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
254 self.event_tx
255 .send(RaftEvent::JoinCluster(request.into_inner(), resp_tx))
256 .await
257 .map_err(|_| Status::internal("Event channel closed"))?;
258
259 let timeout_duration =
260 Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
261 handle_rpc_timeout(resp_rx, timeout_duration, "join_cluster").await
262 }
263
264 async fn discover_leader(
265 &self,
266 request: tonic::Request<LeaderDiscoveryRequest>,
267 ) -> std::result::Result<tonic::Response<LeaderDiscoveryResponse>, tonic::Status> {
268 debug!("receive discover_leader");
269 if !self.is_rpc_ready() {
270 warn!("[rpc|discover_leader] Node-{} is not ready!", self.node_id);
271 return Err(Status::unavailable("Service is not ready"));
272 }
273
274 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
275 self.event_tx
276 .send(RaftEvent::DiscoverLeader(request.into_inner(), resp_tx))
277 .await
278 .map_err(|_| Status::internal("Event channel closed"))?;
279
280 let timeout_duration =
281 Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
282 handle_rpc_timeout(resp_rx, timeout_duration, "discover_leader").await
283 }
284}
#[tonic::async_trait]
impl<T> RaftClientService for Node<T>
where
    T: TypeConfig,
{
    // Boxed response stream for `watch`: each item is either a watch event
    // or a terminal error status.
    type WatchStream =
        Pin<Box<dyn Stream<Item = Result<d_engine_proto::client::WatchResponse, Status>> + Send>>;

    /// Proposes a client write (state-machine command) to the Raft core.
    ///
    /// Rejects with `Unavailable` until the node is ready and with
    /// `InvalidArgument` when the request carries no command. The work runs
    /// on a spawned task (see `with_cancellation_handler`) so a client
    /// disconnect is detected and logged instead of silently dropping the
    /// in-flight proposal.
    async fn handle_client_write(
        &self,
        request: tonic::Request<ClientWriteRequest>,
    ) -> std::result::Result<tonic::Response<ClientResponse>, Status> {
        if !self.is_rpc_ready() {
            warn!("[handle_client_write] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        // Captured before `request` is consumed below; used only in the
        // cancellation log message.
        let remote_addr = request.remote_addr();
        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);

        // Clone the command sender so the async block below can be 'static.
        let cmd_tx = self.cmd_tx.clone();

        let request_future = async move {
            let req: ClientWriteRequest = request.into_inner();
            if req.command.is_none() {
                return Err(Status::invalid_argument("Command cannot be empty"));
            }

            let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
            // NOTE(review): `send` is synchronous (not awaited) here, unlike
            // the `event_tx` sends elsewhere — presumably an unbounded
            // channel; confirm back-pressure expectations.
            cmd_tx
                .send(d_engine_core::ClientCmd::Propose(req, resp_tx))
                .map_err(|_| Status::internal("Command channel closed"))?;

            handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_write").await
        };

        // Runs only if the client drops the RPC before the proposal completes.
        let cancellation_future = async move {
            warn!("Request from {:?} cancelled by client", remote_addr);
            Err::<Response<ClientResponse>, Status>(Status::cancelled(
                "Request cancelled by client",
            ))
        };

        with_cancellation_handler(request_future, cancellation_future).await
    }

    /// Serves a client read by forwarding it to the Raft core over the
    /// command channel and waiting, bounded by the general Raft timeout,
    /// for the result.
    async fn handle_client_read(
        &self,
        request: tonic::Request<ClientReadRequest>,
    ) -> std::result::Result<tonic::Response<ClientResponse>, tonic::Status> {
        if !self.is_rpc_ready() {
            warn!("handle_client_read: Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.cmd_tx
            .send(d_engine_core::ClientCmd::Read(
                request.into_inner(),
                resp_tx,
            ))
            .map_err(|_| Status::internal("Command channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_read").await
    }

    /// Registers a watcher for the requested key and streams change events
    /// back to the client. When the server-side sender is dropped, a final
    /// `Unavailable` status tells the client to reconnect and re-register.
    #[cfg(feature = "watch")]
    async fn watch(
        &self,
        request: tonic::Request<WatchRequest>,
    ) -> std::result::Result<tonic::Response<Self::WatchStream>, tonic::Status> {
        let watch_request = request.into_inner();
        let key = watch_request.key;

        // The registry is only present when watch support is enabled in the
        // server configuration, even if the feature is compiled in.
        let registry = self.watch_registry.as_ref().ok_or_else(|| {
            Status::unavailable("Watch feature is disabled in server configuration")
        })?;

        info!(
            node_id = self.node_id,
            key = ?key,
            "Registering watch for key"
        );

        let handle = registry.register(key);
        let (_watcher_id, _key, receiver) = handle.into_receiver();

        // When the registry drops its sender the ReceiverStream ends; chain a
        // terminal error so the client sees an explicit failure rather than a
        // clean EOF it might mistake for a deliberate close.
        let stream = tokio_stream::wrappers::ReceiverStream::new(receiver)
            .map(Ok)
            .chain(futures::stream::once(async {
                Err(Status::unavailable(
                    "Watch stream closed: server may have shut down or restarted. Please reconnect and re-register the watcher."
                ))
            }));

        Ok(tonic::Response::new(Box::pin(stream)))
    }

    /// Stub used when the crate is built without the `watch` feature.
    #[cfg(not(feature = "watch"))]
    async fn watch(
        &self,
        _request: tonic::Request<WatchRequest>,
    ) -> std::result::Result<tonic::Response<Self::WatchStream>, tonic::Status> {
        Err(Status::unimplemented(
            "Watch feature is not compiled in this build",
        ))
    }
}
428
429pub(crate) async fn with_cancellation_handler<FRequest, FCancellation>(
435 request_future: FRequest,
436 cancellation_future: FCancellation,
437) -> std::result::Result<Response<ClientResponse>, Status>
438where
439 FRequest:
440 Future<Output = std::result::Result<Response<ClientResponse>, Status>> + Send + 'static,
441 FCancellation:
442 Future<Output = std::result::Result<Response<ClientResponse>, Status>> + Send + 'static,
443{
444 let token = CancellationToken::new();
445 let _drop_guard = token.clone().drop_guard();
448 let select_task = tokio::spawn(async move {
449 select! {
452 res = request_future => res,
453 _ = token.cancelled() => cancellation_future.await,
454 }
455 });
456
457 select_task.await.unwrap()
458}
459
460async fn handle_rpc_timeout<T, E>(
469 resp_rx: impl Future<Output = Result<Result<T, Status>, E>>,
470 timeout_duration: Duration,
471 rpc_name: &'static str,
472) -> Result<Response<T>, Status>
473where
474 T: std::fmt::Debug,
475 E: std::fmt::Debug,
476{
477 debug!("grpc_raft_serice::handle_rpc_timeout::{}", rpc_name);
478
479 match timeout(timeout_duration, resp_rx).await {
480 Ok(Ok(Ok(response))) => {
481 debug!("[{}] Success response: {:?}", rpc_name, &response);
482 Ok(Response::new(response))
483 }
484 Ok(Ok(Err(status))) => {
485 error!("[{}] Error status: {:?}", rpc_name, &status);
486 Err(status)
487 }
488 Ok(Err(e)) => {
489 error!("[{}] Channel error: {:?}", rpc_name, e);
490 Err(Status::deadline_exceeded("RPC channel closed"))
491 }
492 Err(_) => {
493 warn!(
494 "[{}] Response timeout after {:?}",
495 rpc_name, timeout_duration
496 );
497 Err(Status::deadline_exceeded("RPC timeout exceeded"))
498 }
499 }
500}