d_engine_server/network/grpc/grpc_raft_service.rs

//! Raft gRPC service implementation handling RPC communication between cluster nodes
//! and client requests. Implements core Raft protocol logic for leader election,
//! log replication, and cluster configuration management.

use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

use d_engine_core::MaybeCloneOneshot;
use d_engine_core::RaftEvent;
use d_engine_core::RaftOneshot;
use d_engine_core::StreamResponseSender;
use d_engine_core::TypeConfig;
use d_engine_proto::client::ClientReadRequest;
use d_engine_proto::client::ClientResponse;
use d_engine_proto::client::ClientWriteRequest;
use d_engine_proto::client::WatchRequest;
use d_engine_proto::client::raft_client_service_server::RaftClientService;
use d_engine_proto::server::cluster::ClusterConfChangeRequest;
use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
use d_engine_proto::server::cluster::ClusterMembership;
use d_engine_proto::server::cluster::JoinRequest;
use d_engine_proto::server::cluster::JoinResponse;
use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
use d_engine_proto::server::cluster::MetadataRequest;
use d_engine_proto::server::cluster::cluster_management_service_server::ClusterManagementService;
use d_engine_proto::server::election::VoteRequest;
use d_engine_proto::server::election::VoteResponse;
use d_engine_proto::server::election::raft_election_service_server::RaftElectionService;
use d_engine_proto::server::replication::AppendEntriesRequest;
use d_engine_proto::server::replication::AppendEntriesResponse;
use d_engine_proto::server::replication::raft_replication_service_server::RaftReplicationService;
use d_engine_proto::server::storage::SnapshotAck;
use d_engine_proto::server::storage::SnapshotChunk;
use d_engine_proto::server::storage::SnapshotResponse;
use d_engine_proto::server::storage::snapshot_service_server::SnapshotService;
use futures::Stream;
#[cfg(feature = "watch")]
use futures::StreamExt;
use tokio::select;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use tonic::Request;
use tonic::Response;
use tonic::Status;
use tonic::Streaming;
use tracing::debug;
use tracing::error;
#[cfg(feature = "watch")]
use tracing::info;
use tracing::warn;

use crate::Node;

#[tonic::async_trait]
impl<T> RaftElectionService for Node<T>
where
    T: TypeConfig,
{
    /// Handles RequestVote RPC calls from candidate nodes during leader elections
    /// # Raft Protocol Logic
    /// - Part of leader election mechanism (Section 5.2)
    /// - Validates candidate's term and log completeness
    /// - Grants vote if candidate's log is at least as up-to-date as local log
    async fn request_vote(
        &self,
        request: tonic::Request<VoteRequest>,
    ) -> std::result::Result<Response<VoteResponse>, Status> {
        if !self.is_rpc_ready() {
            warn!("[rpc|request_vote] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::ReceiveVoteRequest(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;
        let timeout_duration =
            Duration::from_millis(self.node_config.raft.election.election_timeout_min);
        handle_rpc_timeout(resp_rx, timeout_duration, "request_vote").await
    }
}

#[tonic::async_trait]
impl<T> RaftReplicationService for Node<T>
where
    T: TypeConfig,
{
    /// Processes AppendEntries RPC calls from cluster leader
    /// # Raft Protocol Logic
    /// - Heartbeat mechanism (Section 5.2)
    /// - Log replication entry point (Section 5.3)
    /// - Term comparison logic:
    ///   - If incoming term > current term: revert to follower state
    ///   - Reset election timeout on valid leader communication
    async fn append_entries(
        &self,
        request: Request<AppendEntriesRequest>,
    ) -> std::result::Result<Response<AppendEntriesResponse>, tonic::Status> {
        if !self.is_rpc_ready() {
            warn!("[rpc|append_entries] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::AppendEntries(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.retry.append_entries.timeout_ms);

        handle_rpc_timeout(resp_rx, timeout_duration, "append_entries").await
    }
}

#[tonic::async_trait]
impl<T> SnapshotService for Node<T>
where
    T: TypeConfig,
{
    type StreamSnapshotStream = tonic::Streaming<SnapshotChunk>;

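    /// Opens a snapshot transfer stream between peers.
    ///
    /// The inbound stream of `SnapshotAck` messages is forwarded to the Raft core via
    /// `RaftEvent::StreamSnapshot`; the core replies with the outbound `SnapshotChunk`
    /// stream that is returned to the caller.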
    async fn stream_snapshot(
        &self,
        request: tonic::Request<tonic::Streaming<SnapshotAck>>,
    ) -> std::result::Result<tonic::Response<Self::StreamSnapshotStream>, tonic::Status> {
        if !self.is_rpc_ready() {
            warn!("stream_snapshot: Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = StreamResponseSender::new();

        self.event_tx
            .send(RaftEvent::StreamSnapshot(
                Box::new(request.into_inner()),
                resp_tx,
            ))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration = Duration::from_millis(self.node_config.raft.snapshot_rpc_timeout_ms);

        handle_rpc_timeout(
            async { resp_rx.await.map_err(|_| Status::internal("Response channel closed")) },
            timeout_duration,
            "stream_snapshot",
        )
        .await
    }

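    /// Receives a snapshot as a stream of `SnapshotChunk` messages.
    ///
    /// The chunk stream is handed to the Raft core via `RaftEvent::InstallSnapshotChunk`,
    /// and the core's acknowledgement is returned as a single `SnapshotResponse`.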
    async fn install_snapshot(
        &self,
        request: tonic::Request<Streaming<SnapshotChunk>>,
    ) -> std::result::Result<tonic::Response<SnapshotResponse>, tonic::Status> {
        if !self.is_rpc_ready() {
            warn!("install_snapshot: Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();

        self.event_tx
            .send(RaftEvent::InstallSnapshotChunk(
                Box::new(request.into_inner()),
                resp_tx,
            ))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration = Duration::from_millis(self.node_config.raft.snapshot_rpc_timeout_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "install_snapshot").await
    }
}

#[tonic::async_trait]
impl<T> ClusterManagementService for Node<T>
where
    T: TypeConfig,
{
    /// Handles cluster membership changes (joint consensus)
    /// # Raft Protocol Logic
    /// - Implements cluster configuration changes (Section 6)
    /// - Validates new configuration against current cluster state
    /// - Ensures safety during membership transitions
    async fn update_cluster_conf(
        &self,
        request: tonic::Request<ClusterConfChangeRequest>,
    ) -> std::result::Result<Response<ClusterConfUpdateResponse>, Status> {
        if !self.is_rpc_ready() {
            warn!(
                "[rpc|update_cluster_conf_from_leader] Node-{} is not ready!",
                self.node_id
            );
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::ClusterConfUpdate(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration = Duration::from_millis(self.node_config.retry.membership.timeout_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "update_cluster_conf_from_leader").await
    }

    /// Returns current cluster membership and state metadata
    /// # Usage
    /// - Administrative API for cluster inspection
    /// - Provides snapshot of current configuration
    async fn get_cluster_metadata(
        &self,
        request: tonic::Request<MetadataRequest>,
    ) -> std::result::Result<tonic::Response<ClusterMembership>, tonic::Status> {
        debug!("receive get_cluster_metadata");
        if !self.is_rpc_ready() {
            warn!(
                "[rpc|get_cluster_metadata] Node-{} is not ready!",
                self.node_id
            );
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::ClusterConf(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "get_cluster_metadata").await
    }

    /// Handles a request to join the cluster as a new learner node
    async fn join_cluster(
        &self,
        request: tonic::Request<JoinRequest>,
    ) -> std::result::Result<tonic::Response<JoinResponse>, tonic::Status> {
        debug!("receive join_cluster");
        if !self.is_rpc_ready() {
            warn!("[rpc|join_cluster] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::JoinCluster(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "join_cluster").await
    }

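    /// Discovers the current cluster leader.
    ///
    /// Forwards the request to the Raft core via `RaftEvent::DiscoverLeader` and returns
    /// the leader information carried in the resulting `LeaderDiscoveryResponse`.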
    async fn discover_leader(
        &self,
        request: tonic::Request<LeaderDiscoveryRequest>,
    ) -> std::result::Result<tonic::Response<LeaderDiscoveryResponse>, tonic::Status> {
        debug!("receive discover_leader");
        if !self.is_rpc_ready() {
            warn!("[rpc|discover_leader] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::DiscoverLeader(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "discover_leader").await
    }
}

#[tonic::async_trait]
impl<T> RaftClientService for Node<T>
where
    T: TypeConfig,
{
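    /// Server-streaming response type for `watch`: a boxed stream of `WatchResponse`
    /// items delivered to the subscribing client.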
    type WatchStream =
        Pin<Box<dyn Stream<Item = Result<d_engine_proto::client::WatchResponse, Status>> + Send>>;

    /// Processes client write requests requiring consensus
    /// # Raft Protocol Logic
    /// - Entry point for client proposals (Section 7)
    /// - Validates requests before appending to leader's log
    /// - Ensures linearizable writes through log replication
    async fn handle_client_write(
        &self,
        request: tonic::Request<ClientWriteRequest>,
    ) -> std::result::Result<tonic::Response<ClientResponse>, Status> {
        if !self.is_rpc_ready() {
            warn!("[handle_client_write] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let remote_addr = request.remote_addr();
        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);

        // Clone cmd_tx before async move to avoid capturing &self
        let cmd_tx = self.cmd_tx.clone();

        let request_future = async move {
            let req: ClientWriteRequest = request.into_inner();
            // Extract request and validate
            if req.command.is_none() {
                return Err(Status::invalid_argument("Command cannot be empty"));
            }

            let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
            cmd_tx
                .send(d_engine_core::ClientCmd::Propose(req, resp_tx))
                .map_err(|_| Status::internal("Command channel closed"))?;

            handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_write").await
        };

        let cancellation_future = async move {
            warn!("Request from {:?} cancelled by client", remote_addr);
            // If this future is executed, it means the request future was dropped,
            // so it doesn't actually matter what is returned here
            Err::<Response<ClientResponse>, Status>(Status::cancelled(
                "Request cancelled by client",
            ))
        };

        with_cancellation_handler(request_future, cancellation_future).await
    }

    /// Handles client read requests with linearizability guarantees
    /// # Raft Protocol Logic
    /// - Implements lease-based leader reads (Section 6.4)
    /// - Verifies leadership before serving reads
    /// - Ensures read-after-write consistency
    async fn handle_client_read(
        &self,
        request: tonic::Request<ClientReadRequest>,
    ) -> std::result::Result<tonic::Response<ClientResponse>, tonic::Status> {
        if !self.is_rpc_ready() {
            warn!("handle_client_read: Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.cmd_tx
            .send(d_engine_core::ClientCmd::Read(
                request.into_inner(),
                resp_tx,
            ))
            .map_err(|_| Status::internal("Command channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_read").await
    }

    /// Watch for changes to a specific key
    ///
    /// Returns a stream of events (PUT/DELETE) for the specified key.
    /// The stream will continue until the client disconnects or the server shuts down.
    ///
    /// # Arguments
    /// * `request` - Contains the key to watch
    ///
    /// # Returns
    /// A stream of WatchResponse messages containing PUT/DELETE events
    ///
    /// # Errors
    /// Returns Status::UNAVAILABLE if Watch is disabled in configuration
    #[cfg(feature = "watch")]
    async fn watch(
        &self,
        request: tonic::Request<WatchRequest>,
    ) -> std::result::Result<tonic::Response<Self::WatchStream>, tonic::Status> {
        let watch_request = request.into_inner();
        let key = watch_request.key;

        // Check if watch registry is available
        let registry = self.watch_registry.as_ref().ok_or_else(|| {
            Status::unavailable("Watch feature is disabled in server configuration")
        })?;

        info!(
            node_id = self.node_id,
            key = ?key,
            "Registering watch for key"
        );

        // Register watcher and get receiver
        let handle = registry.register(key);
        let (_watcher_id, _key, receiver) = handle.into_receiver();

        // Convert mpsc::Receiver -> Boxed Stream with sentinel error on close
        // When the stream ends (receiver closed), send an UNAVAILABLE error to help clients
        // detect server shutdown/restart and implement reconnection logic
        let stream = tokio_stream::wrappers::ReceiverStream::new(receiver)
            .map(Ok)
            .chain(futures::stream::once(async {
                Err(Status::unavailable(
                    "Watch stream closed: server may have shut down or restarted. Please reconnect and re-register the watcher."
                ))
            }));

        Ok(tonic::Response::new(Box::pin(stream)))
    }

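    /// Stub used when the crate is built without the `watch` feature; always returns
    /// `Status::unimplemented`.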
    #[cfg(not(feature = "watch"))]
    async fn watch(
        &self,
        _request: tonic::Request<WatchRequest>,
    ) -> std::result::Result<tonic::Response<Self::WatchStream>, tonic::Status> {
        Err(Status::unimplemented(
            "Watch feature is not compiled in this build",
        ))
    }
}

/// Gracefully handles client request cancellations
/// # Functionality
/// - Manages cleanup of abandoned requests
/// - Runs the caller-supplied cancellation future when the client drops the request
/// - Prevents resource leaks from dropped requests
pub(crate) async fn with_cancellation_handler<FRequest, FCancellation>(
    request_future: FRequest,
    cancellation_future: FCancellation,
) -> std::result::Result<Response<ClientResponse>, Status>
where
    FRequest:
        Future<Output = std::result::Result<Response<ClientResponse>, Status>> + Send + 'static,
    FCancellation:
        Future<Output = std::result::Result<Response<ClientResponse>, Status>> + Send + 'static,
{
    let token = CancellationToken::new();
    // Will call token.cancel() when the future is dropped, such as when the client
    // cancels the request
    let _drop_guard = token.clone().drop_guard();
    let select_task = tokio::spawn(async move {
        // Can select on token cancellation or any cancellable future while handling the
        // request, allowing for custom cleanup code or monitoring
        select! {
            res = request_future => res,
            _ = token.cancelled() => cancellation_future.await,
        }
    });

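    // `select_task` is never aborted, so this unwrap can only fail if the spawned task panicked.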
    select_task.await.unwrap()
}

/// Centralized timeout handler for all RPC operations
/// # Features
/// - Uniform timeout enforcement across RPC types
/// - Detailed error categorization:
///   - Channel errors
///   - Application-level errors
///   - Deadline exceeded
/// - Logs successes, errors, and timeouts per RPC
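///
/// The `resp_rx` future resolves to a nested result: the outer `Result` reports channel
/// or receive failures, while the inner `Result<T, Status>` carries the application-level
/// reply.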
async fn handle_rpc_timeout<T, E>(
    resp_rx: impl Future<Output = Result<Result<T, Status>, E>>,
    timeout_duration: Duration,
    rpc_name: &'static str,
) -> Result<Response<T>, Status>
where
    T: std::fmt::Debug,
    E: std::fmt::Debug,
{
    debug!("grpc_raft_service::handle_rpc_timeout::{}", rpc_name);

    match timeout(timeout_duration, resp_rx).await {
        Ok(Ok(Ok(response))) => {
            debug!("[{}] Success response: {:?}", rpc_name, &response);
            Ok(Response::new(response))
        }
        Ok(Ok(Err(status))) => {
            error!("[{}] Error status: {:?}", rpc_name, &status);
            Err(status)
        }
        Ok(Err(e)) => {
            error!("[{}] Channel error: {:?}", rpc_name, e);
            Err(Status::internal("RPC response channel closed"))
        }
        Err(_) => {
            warn!(
                "[{}] Response timeout after {:?}",
                rpc_name, timeout_duration
            );
            Err(Status::deadline_exceeded("RPC timeout exceeded"))
        }
    }
}