1use std::future::Future;
6use std::pin::Pin;
7use std::time::Duration;
8
9use d_engine_core::MaybeCloneOneshot;
10use d_engine_core::MaybeCloneOneshotReceiver;
11use d_engine_core::RaftEvent;
12use d_engine_core::RaftOneshot;
13use d_engine_core::StreamResponseSender;
14use d_engine_core::TypeConfig;
15#[cfg(feature = "watch")]
16use d_engine_core::WatchError;
17use d_engine_proto::client::ClientReadRequest;
18use d_engine_proto::client::ClientResponse;
19use d_engine_proto::client::ClientWriteRequest;
20use d_engine_proto::client::KvEntry;
21use d_engine_proto::client::MembershipSnapshot as ProtoMembershipSnapshot;
22use d_engine_proto::client::ScanRequest;
23use d_engine_proto::client::ScanResponse;
24use d_engine_proto::client::WatchMembershipRequest;
25use d_engine_proto::client::WatchRequest;
26use d_engine_proto::client::raft_client_service_server::RaftClientService;
27use d_engine_proto::server::cluster::ClusterConfChangeRequest;
28use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
29use d_engine_proto::server::cluster::ClusterMembership;
30use d_engine_proto::server::cluster::JoinRequest;
31use d_engine_proto::server::cluster::JoinResponse;
32use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
33use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
34use d_engine_proto::server::cluster::MetadataRequest;
35use d_engine_proto::server::cluster::cluster_management_service_server::ClusterManagementService;
36use d_engine_proto::server::election::VoteRequest;
37use d_engine_proto::server::election::VoteResponse;
38use d_engine_proto::server::election::raft_election_service_server::RaftElectionService;
39use d_engine_proto::server::replication::AppendEntriesRequest;
40use d_engine_proto::server::replication::AppendEntriesResponse;
41use d_engine_proto::server::replication::raft_replication_service_server::RaftReplicationService;
42use d_engine_proto::server::storage::SnapshotAck;
43use d_engine_proto::server::storage::SnapshotChunk;
44use d_engine_proto::server::storage::SnapshotResponse;
45use d_engine_proto::server::storage::snapshot_service_server::SnapshotService;
46use futures::Stream;
47use futures::StreamExt;
48use tokio::select;
49use tokio::sync::mpsc;
50use tokio::time::timeout;
51use tokio_util::sync::CancellationToken;
52use tonic::Request;
53use tonic::Response;
54use tonic::Status;
55use tonic::Streaming;
56use tracing::debug;
57use tracing::error;
58use tracing::info;
59use tracing::warn;
60
61use crate::Node;
62use crate::proto_convert;
63
64#[tonic::async_trait]
65impl<T> RaftElectionService for Node<T>
66where
67 T: TypeConfig,
68{
69 async fn request_vote(
75 &self,
76 request: tonic::Request<VoteRequest>,
77 ) -> std::result::Result<Response<VoteResponse>, Status> {
78 if !self.is_rpc_ready() {
79 warn!(
80 "[rpc|request_vote] My raft setup(Node:{}) is not ready!",
81 self.node_id
82 );
83 return Err(Status::unavailable("Service is not ready"));
84 }
85
86 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
87 self.event_tx
88 .send(RaftEvent::ReceiveVoteRequest(request.into_inner(), resp_tx))
89 .await
90 .map_err(|_| Status::internal("Event channel closed"))?;
91 let timeout_duration =
92 Duration::from_millis(self.node_config.raft.election.election_timeout_min);
93 handle_rpc_timeout(resp_rx, timeout_duration, "request_vote").await
94 }
95}
96#[tonic::async_trait]
97impl<T> RaftReplicationService for Node<T>
98where
99 T: TypeConfig,
100{
101 async fn append_entries(
109 &self,
110 request: Request<AppendEntriesRequest>,
111 ) -> std::result::Result<Response<AppendEntriesResponse>, tonic::Status> {
112 if !self.is_rpc_ready() {
113 warn!("[rpc|append_entries] Node-{} is not ready!", self.node_id);
114 return Err(Status::unavailable("Service is not ready"));
115 }
116
117 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
118 self.event_tx
119 .send(RaftEvent::AppendEntries(request.into_inner(), resp_tx))
120 .await
121 .map_err(|_| Status::internal("Event channel closed"))?;
122
123 let timeout_duration =
124 Duration::from_millis(self.node_config.retry.append_entries.timeout_ms);
125
126 handle_rpc_timeout(resp_rx, timeout_duration, "append_entries").await
127 }
128
129 type StreamAppendEntriesStream =
130 Pin<Box<dyn Stream<Item = Result<AppendEntriesResponse, Status>> + Send>>;
131
132 async fn stream_append_entries(
140 &self,
141 request: tonic::Request<tonic::Streaming<AppendEntriesRequest>>,
142 ) -> std::result::Result<tonic::Response<Self::StreamAppendEntriesStream>, tonic::Status> {
143 if !self.is_rpc_ready() {
144 warn!(
145 "[rpc|stream_append_entries] Node-{} is not ready!",
146 self.node_id
147 );
148 return Err(Status::unavailable("Service is not ready"));
149 }
150
151 let mut in_stream = request.into_inner();
152 let event_tx = self.event_tx.clone();
153 let ordered_channel_capacity = self.node_config.raft.ordered_channel_capacity;
154 let mut shutdown = self.shutdown_signal.clone();
155
156 let (out_tx, out_rx) = mpsc::channel::<Result<AppendEntriesResponse, Status>>(128);
158
159 let (ordered_tx, mut ordered_rx) = mpsc::channel::<
161 MaybeCloneOneshotReceiver<Result<AppendEntriesResponse, Status>>,
162 >(ordered_channel_capacity);
163
164 tokio::spawn(async move {
169 use futures::StreamExt;
170 loop {
171 tokio::select! {
172 biased;
173 _ = shutdown.changed() => {
174 break;
175 }
176 result = in_stream.next() => {
177 match result {
178 Some(Ok(req)) => {
179 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
180 if event_tx.send(RaftEvent::AppendEntries(req, resp_tx)).await.is_err() {
181 warn!("[stream_append_entries|recv] event_tx closed");
182 break;
183 }
184 if ordered_tx.send(resp_rx).await.is_err() {
185 break;
186 }
187 }
188 Some(Err(e)) => {
189 warn!("[stream_append_entries|recv] stream error: {:?}", e);
190 break;
191 }
192 None => break,
193 }
194 }
195 }
196 }
197 });
198
199 tokio::spawn(async move {
201 while let Some(resp_rx) = ordered_rx.recv().await {
202 let result = match resp_rx.await {
203 Ok(Ok(resp)) => Ok(resp),
204 Ok(Err(status)) => Err(status),
205 Err(_) => Err(Status::internal("Response channel closed")),
206 };
207 if out_tx.send(result).await.is_err() {
208 break;
209 }
210 }
211 });
212
213 let out_stream = tokio_stream::wrappers::ReceiverStream::new(out_rx);
214 Ok(tonic::Response::new(Box::pin(out_stream)))
215 }
216}
217
218#[tonic::async_trait]
219impl<T> SnapshotService for Node<T>
220where
221 T: TypeConfig,
222{
223 type StreamSnapshotStream = tonic::Streaming<SnapshotChunk>;
224
225 async fn stream_snapshot(
226 &self,
227 request: tonic::Request<tonic::Streaming<SnapshotAck>>,
228 ) -> std::result::Result<tonic::Response<Self::StreamSnapshotStream>, tonic::Status> {
229 if !self.is_rpc_ready() {
230 warn!("stream_snapshot: Node-{} is not ready!", self.node_id);
231 return Err(Status::unavailable("Service is not ready"));
232 }
233
234 let (resp_tx, resp_rx) = StreamResponseSender::new();
235
236 self.event_tx
237 .send(RaftEvent::StreamSnapshot(
238 Box::new(request.into_inner()),
239 resp_tx,
240 ))
241 .await
242 .map_err(|_| Status::internal("Event channel closed"))?;
243
244 let timeout_duration = Duration::from_millis(self.node_config.raft.snapshot_rpc_timeout_ms);
245
246 handle_rpc_timeout(
247 async { resp_rx.await.map_err(|_| Status::internal("Response channel closed")) },
248 timeout_duration,
249 "stream_snapshot",
250 )
251 .await
252 }
253
254 async fn install_snapshot(
255 &self,
256 request: tonic::Request<Streaming<SnapshotChunk>>,
257 ) -> std::result::Result<tonic::Response<SnapshotResponse>, tonic::Status> {
258 if !self.is_rpc_ready() {
259 warn!("install_snapshot: Node-{} is not ready!", self.node_id);
260 return Err(Status::unavailable("Service is not ready"));
261 }
262
263 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
264
265 self.event_tx
266 .send(RaftEvent::InstallSnapshotChunk(
267 Box::new(request.into_inner()),
268 resp_tx,
269 ))
270 .await
271 .map_err(|_| Status::internal("Event channel closed"))?;
272
273 let timeout_duration = Duration::from_millis(self.node_config.raft.snapshot_rpc_timeout_ms);
274 handle_rpc_timeout(resp_rx, timeout_duration, "install_snapshot").await
275 }
276}
277
278#[tonic::async_trait]
279impl<T> ClusterManagementService for Node<T>
280where
281 T: TypeConfig,
282{
283 async fn update_cluster_conf(
289 &self,
290 request: tonic::Request<ClusterConfChangeRequest>,
291 ) -> std::result::Result<Response<ClusterConfUpdateResponse>, Status> {
292 if !self.is_rpc_ready() {
293 warn!(
294 "[rpc|update_cluster_conf_from_leader] Node-{} is not ready!",
295 self.node_id
296 );
297 return Err(Status::unavailable("Service is not ready"));
298 }
299
300 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
301 self.event_tx
302 .send(RaftEvent::ClusterConfUpdate(request.into_inner(), resp_tx))
303 .await
304 .map_err(|_| Status::internal("Event channel closed"))?;
305
306 let timeout_duration = Duration::from_millis(self.node_config.retry.membership.timeout_ms);
307 handle_rpc_timeout(resp_rx, timeout_duration, "update_cluster_conf_from_leader").await
308 }
309
310 async fn get_cluster_metadata(
315 &self,
316 request: tonic::Request<MetadataRequest>,
317 ) -> std::result::Result<tonic::Response<ClusterMembership>, tonic::Status> {
318 debug!("receive get_cluster_metadata");
319 if !self.is_rpc_ready() {
320 warn!(
321 "[rpc|get_cluster_metadata] Node-{} is not ready!",
322 self.node_id
323 );
324 return Err(Status::unavailable("Service is not ready"));
325 }
326
327 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
328 self.event_tx
329 .send(RaftEvent::ClusterConf(request.into_inner(), resp_tx))
330 .await
331 .map_err(|_| Status::internal("Event channel closed"))?;
332
333 let timeout_duration =
334 Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
335 handle_rpc_timeout(resp_rx, timeout_duration, "get_cluster_metadata").await
336 }
337
338 async fn join_cluster(
340 &self,
341 request: tonic::Request<JoinRequest>,
342 ) -> std::result::Result<tonic::Response<JoinResponse>, tonic::Status> {
343 debug!("receive join_cluster");
344 if !self.is_rpc_ready() {
345 warn!("[rpc|join_cluster] Node-{} is not ready!", self.node_id);
346 return Err(Status::unavailable("Service is not ready"));
347 }
348
349 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
350 self.event_tx
351 .send(RaftEvent::JoinCluster(request.into_inner(), resp_tx))
352 .await
353 .map_err(|_| Status::internal("Event channel closed"))?;
354
355 let timeout_duration =
356 Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
357 handle_rpc_timeout(resp_rx, timeout_duration, "join_cluster").await
358 }
359
360 async fn discover_leader(
361 &self,
362 request: tonic::Request<LeaderDiscoveryRequest>,
363 ) -> std::result::Result<tonic::Response<LeaderDiscoveryResponse>, tonic::Status> {
364 debug!("receive discover_leader");
365 if !self.is_rpc_ready() {
366 warn!("[rpc|discover_leader] Node-{} is not ready!", self.node_id);
367 return Err(Status::unavailable("Service is not ready"));
368 }
369
370 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
371 self.event_tx
372 .send(RaftEvent::DiscoverLeader(request.into_inner(), resp_tx))
373 .await
374 .map_err(|_| Status::internal("Event channel closed"))?;
375
376 let timeout_duration =
377 Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
378 handle_rpc_timeout(resp_rx, timeout_duration, "discover_leader").await
379 }
380}
381#[tonic::async_trait]
382impl<T> RaftClientService for Node<T>
383where
384 T: TypeConfig,
385{
386 type WatchStream =
387 Pin<Box<dyn Stream<Item = Result<d_engine_proto::client::WatchResponse, Status>> + Send>>;
388
389 type WatchMembershipStream =
390 Pin<Box<dyn Stream<Item = Result<ProtoMembershipSnapshot, Status>> + Send>>;
391
392 async fn handle_client_write(
398 &self,
399 request: tonic::Request<ClientWriteRequest>,
400 ) -> std::result::Result<tonic::Response<ClientResponse>, Status> {
401 if !self.is_rpc_ready() {
402 warn!("[handle_client_write] Node-{} is not ready!", self.node_id);
403 return Err(Status::unavailable("Service is not ready"));
404 }
405
406 let remote_addr = request.remote_addr();
407 let timeout_duration =
408 Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
409
410 let cmd_tx = self.cmd_tx.clone();
412
413 let request_future = async move {
414 let proto_req: ClientWriteRequest = request.into_inner();
415 let operation_present =
416 proto_req.command.as_ref().and_then(|c| c.operation.as_ref()).is_some();
417 if !operation_present {
418 return Err(Status::invalid_argument(
419 "WriteCommand must contain an operation",
420 ));
421 }
422 let core_req = proto_convert::to_core_write_req(proto_req);
423
424 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
425 cmd_tx
426 .send(d_engine_core::ClientCmd::Propose(core_req, resp_tx))
427 .await
428 .map_err(|_| Status::internal("Command channel closed"))?;
429
430 handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_write")
431 .await
432 .map(|resp| resp.map(proto_convert::to_proto_response))
433 };
434
435 let cancellation_future = async move {
436 warn!("Request from {:?} cancelled by client", remote_addr);
437 Err::<Response<ClientResponse>, Status>(Status::cancelled(
440 "Request cancelled by client",
441 ))
442 };
443
444 with_cancellation_handler(request_future, cancellation_future).await
445 }
446
447 async fn handle_client_read(
453 &self,
454 request: tonic::Request<ClientReadRequest>,
455 ) -> std::result::Result<tonic::Response<ClientResponse>, tonic::Status> {
456 if !self.is_rpc_ready() {
457 warn!("handle_client_read: Node-{} is not ready!", self.node_id);
458 return Err(Status::unavailable("Service is not ready"));
459 }
460
461 let proto_req = request.into_inner();
462 if let Some(raw) = proto_req.consistency_policy
463 && d_engine_proto::client::ReadConsistencyPolicy::try_from(raw).is_err()
464 {
465 warn!(
466 raw_value = raw,
467 "Unknown consistency_policy value received, degrading to cluster default"
468 );
469 }
470 let core_req = proto_convert::to_core_read_req(proto_req);
471 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
472 self.cmd_tx
473 .send(d_engine_core::ClientCmd::Read(core_req, resp_tx))
474 .await
475 .map_err(|_| Status::internal("Command channel closed"))?;
476
477 let timeout_duration =
478 Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
479 handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_read")
480 .await
481 .map(|resp| resp.map(proto_convert::to_proto_response))
482 }
483
484 async fn handle_client_scan(
491 &self,
492 request: tonic::Request<ScanRequest>,
493 ) -> std::result::Result<tonic::Response<ScanResponse>, tonic::Status> {
494 if !self.is_rpc_ready() {
495 warn!("handle_client_scan: Node-{} is not ready!", self.node_id);
496 return Err(Status::unavailable("Service is not ready"));
497 }
498
499 let req = request.into_inner();
500 let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
501
502 self.cmd_tx
503 .send(d_engine_core::ClientCmd::Scan(req.prefix, resp_tx))
504 .await
505 .map_err(|_| Status::internal("Command channel closed"))?;
506
507 let timeout_duration =
508 Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
509
510 let scan_result = handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_scan")
511 .await?
512 .into_inner();
513
514 Ok(tonic::Response::new(ScanResponse {
515 entries: scan_result
516 .entries
517 .into_iter()
518 .map(|(k, v)| KvEntry { key: k, value: v })
519 .collect(),
520 revision: scan_result.revision,
521 }))
522 }
523
524 #[cfg(feature = "watch")]
538 async fn watch(
539 &self,
540 request: tonic::Request<WatchRequest>,
541 ) -> std::result::Result<tonic::Response<Self::WatchStream>, tonic::Status> {
542 let watch_request = request.into_inner();
543 let key = watch_request.key;
544
545 let registry = self.watch_registry.as_ref().ok_or_else(|| {
547 Status::unavailable("Watch feature is disabled in server configuration")
548 })?;
549
550 let is_prefix = watch_request.prefix;
551 let prev_kv = watch_request.prev_kv;
552 info!(
553 node_id = self.node_id,
554 key = ?key,
555 is_prefix,
556 prev_kv,
557 "Registering watch for key"
558 );
559
560 let handle = if is_prefix {
562 registry.register_prefix(key, prev_kv).map_err(|e| match e {
563 WatchError::LimitExceeded(_) => Status::resource_exhausted(e.to_string()),
564 WatchError::InvalidPrefix => Status::invalid_argument(e.to_string()),
565 })?
566 } else {
567 registry
568 .register(key, prev_kv)
569 .map_err(|e| Status::resource_exhausted(e.to_string()))?
570 };
571 let (_watcher_id, _key, receiver) = handle.into_receiver();
572
573 let stream = tokio_stream::wrappers::ReceiverStream::new(receiver)
576 .map(|e| Ok(d_engine_proto::client::WatchResponse::from(&e)))
577 .chain(futures::stream::once(async {
578 Err(Status::unavailable(
579 "Watch stream closed: server may have shut down or restarted. Please reconnect and re-register the watcher."
580 ))
581 }));
582
583 Ok(tonic::Response::new(Box::pin(stream)))
584 }
585
586 #[cfg(not(feature = "watch"))]
587 async fn watch(
588 &self,
589 _request: tonic::Request<WatchRequest>,
590 ) -> std::result::Result<tonic::Response<Self::WatchStream>, tonic::Status> {
591 Err(Status::unimplemented(
592 "Watch feature is not compiled in this build",
593 ))
594 }
595
596 async fn watch_membership(
602 &self,
603 _request: tonic::Request<WatchMembershipRequest>,
604 ) -> std::result::Result<tonic::Response<Self::WatchMembershipStream>, tonic::Status> {
605 if !self.is_rpc_ready() {
606 warn!("[watch_membership] Node-{} is not ready", self.node_id);
607 return Err(Status::unavailable("Service is not ready"));
608 }
609
610 let mut rx = self.membership_rx.clone();
611 rx.mark_changed();
613
614 info!(node_id = self.node_id, "Membership watch stream opened");
615
616 let stream = tokio_stream::wrappers::WatchStream::new(rx)
617 .map(|s| {
618 Ok(ProtoMembershipSnapshot {
619 members: s.members.into_iter().collect(),
620 learners: s.learners.into_iter().collect(),
621 committed_index: s.committed_index,
622 })
623 })
624 .chain(futures::stream::once(async {
625 Err(Status::unavailable(
626 "Membership watch stream closed: server shut down. Reconnect to re-subscribe.",
627 ))
628 }));
629
630 Ok(tonic::Response::new(Box::pin(stream)))
631 }
632}
633
634pub(crate) async fn with_cancellation_handler<FRequest, FCancellation>(
640 request_future: FRequest,
641 cancellation_future: FCancellation,
642) -> std::result::Result<Response<ClientResponse>, Status>
643where
644 FRequest:
645 Future<Output = std::result::Result<Response<ClientResponse>, Status>> + Send + 'static,
646 FCancellation:
647 Future<Output = std::result::Result<Response<ClientResponse>, Status>> + Send + 'static,
648{
649 let token = CancellationToken::new();
650 let _drop_guard = token.clone().drop_guard();
653 let select_task = tokio::spawn(async move {
654 select! {
657 res = request_future => res,
658 _ = token.cancelled() => cancellation_future.await,
659 }
660 });
661
662 select_task.await.unwrap()
663}
664
665async fn handle_rpc_timeout<T, E>(
674 resp_rx: impl Future<Output = Result<Result<T, Status>, E>>,
675 timeout_duration: Duration,
676 rpc_name: &'static str,
677) -> Result<Response<T>, Status>
678where
679 T: std::fmt::Debug,
680 E: std::fmt::Debug,
681{
682 debug!("grpc_raft_serice::handle_rpc_timeout::{}", rpc_name);
683
684 match timeout(timeout_duration, resp_rx).await {
685 Ok(Ok(Ok(response))) => {
686 debug!("[{}] Success response: {:?}", rpc_name, &response);
687 Ok(Response::new(response))
688 }
689 Ok(Ok(Err(status))) => {
690 error!("[{}] Error status: {:?}", rpc_name, &status);
691 Err(status)
692 }
693 Ok(Err(e)) => {
694 error!("[{}] Channel error: {:?}", rpc_name, e);
695 Err(Status::deadline_exceeded("RPC channel closed"))
696 }
697 Err(_) => {
698 warn!(
699 "[{}] Response timeout after {:?}",
700 rpc_name, timeout_duration
701 );
702 Err(Status::deadline_exceeded("RPC timeout exceeded"))
703 }
704 }
705}