d_engine_server/network/grpc/grpc_raft_service.rs

//! Raft gRPC service implementation handling RPC communication between cluster nodes
//! and client requests. Implements core Raft protocol logic for leader election,
//! log replication, and cluster configuration management.
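//!
//! # Request handling pattern (sketch)
//!
//! Every handler below follows the same flow: check readiness, forward the decoded
//! request to the Raft core over `event_tx` together with a one-shot response sender,
//! then await the reply under a per-RPC timeout. A minimal sketch of that flow, with
//! `SomeEvent` and `"some_rpc"` as illustrative placeholders rather than real names:
//!
//! ```ignore
//! if !self.is_rpc_ready() {
//!     return Err(Status::unavailable("Service is not ready"));
//! }
//! let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
//! self.event_tx
//!     .send(RaftEvent::SomeEvent(request.into_inner(), resp_tx))
//!     .await
//!     .map_err(|_| Status::internal("Event channel closed"))?;
//! handle_rpc_timeout(resp_rx, timeout_duration, "some_rpc").await
//! ```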

use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

use d_engine_core::MaybeCloneOneshot;
use d_engine_core::RaftEvent;
use d_engine_core::RaftOneshot;
use d_engine_core::StreamResponseSender;
use d_engine_core::TypeConfig;
use d_engine_proto::client::ClientReadRequest;
use d_engine_proto::client::ClientResponse;
use d_engine_proto::client::ClientWriteRequest;
use d_engine_proto::client::WatchRequest;
use d_engine_proto::client::raft_client_service_server::RaftClientService;
use d_engine_proto::server::cluster::ClusterConfChangeRequest;
use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
use d_engine_proto::server::cluster::ClusterMembership;
use d_engine_proto::server::cluster::JoinRequest;
use d_engine_proto::server::cluster::JoinResponse;
use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
use d_engine_proto::server::cluster::MetadataRequest;
use d_engine_proto::server::cluster::cluster_management_service_server::ClusterManagementService;
use d_engine_proto::server::election::VoteRequest;
use d_engine_proto::server::election::VoteResponse;
use d_engine_proto::server::election::raft_election_service_server::RaftElectionService;
use d_engine_proto::server::replication::AppendEntriesRequest;
use d_engine_proto::server::replication::AppendEntriesResponse;
use d_engine_proto::server::replication::raft_replication_service_server::RaftReplicationService;
use d_engine_proto::server::storage::PurgeLogRequest;
use d_engine_proto::server::storage::PurgeLogResponse;
use d_engine_proto::server::storage::SnapshotAck;
use d_engine_proto::server::storage::SnapshotChunk;
use d_engine_proto::server::storage::SnapshotResponse;
use d_engine_proto::server::storage::snapshot_service_server::SnapshotService;
use futures::Stream;
#[cfg(feature = "watch")]
use futures::StreamExt;
use tokio::select;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use tonic::Request;
use tonic::Response;
use tonic::Status;
use tonic::Streaming;
use tracing::debug;
use tracing::error;
#[cfg(feature = "watch")]
use tracing::info;
use tracing::warn;

use crate::Node;

#[tonic::async_trait]
impl<T> RaftElectionService for Node<T>
where
    T: TypeConfig,
{
    /// Handles RequestVote RPC calls from candidate nodes during leader elections
    /// # Raft Protocol Logic
    /// - Part of leader election mechanism (Section 5.2)
    /// - Validates candidate's term and log completeness
    /// - Grants vote if candidate's log is at least as up-to-date as local log
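    ///
    /// # Example (sketch)
    /// A hedged peer-side sketch, assuming the tonic-generated client
    /// (`raft_election_service_client::RaftElectionServiceClient`) is exported by
    /// `d_engine_proto` and that the target node listens on the address shown; a real
    /// candidate would populate its term and last log position instead of using
    /// `VoteRequest::default()`:
    /// ```ignore
    /// use d_engine_proto::server::election::VoteRequest;
    /// use d_engine_proto::server::election::raft_election_service_client::RaftElectionServiceClient;
    ///
    /// let mut client = RaftElectionServiceClient::connect("http://127.0.0.1:9001").await?;
    /// let reply = client.request_vote(VoteRequest::default()).await?.into_inner();
    /// ```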
    async fn request_vote(
        &self,
        request: tonic::Request<VoteRequest>,
    ) -> std::result::Result<Response<VoteResponse>, Status> {
        if !self.is_rpc_ready() {
            warn!("[rpc|request_vote] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::ReceiveVoteRequest(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;
        let timeout_duration =
            Duration::from_millis(self.node_config.raft.election.election_timeout_min);
        handle_rpc_timeout(resp_rx, timeout_duration, "request_vote").await
    }
}
#[tonic::async_trait]
impl<T> RaftReplicationService for Node<T>
where
    T: TypeConfig,
{
    /// Processes AppendEntries RPC calls from cluster leader
    /// # Raft Protocol Logic
    /// - Heartbeat mechanism (Section 5.2)
    /// - Log replication entry point (Section 5.3)
    /// - Term comparison logic:
    ///   - If incoming term > current term: revert to follower state
    ///   - Reset election timeout on valid leader communication
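    ///
    /// # Term handling (sketch)
    /// A hedged sketch of the follower-side term rule summarised above; `req.term`,
    /// `local_term`, `step_down_to_follower` and `reset_election_timer` are
    /// illustrative names, not items of this crate:
    /// ```ignore
    /// if req.term > local_term {
    ///     local_term = req.term;      // adopt the newer term
    ///     step_down_to_follower();    // revert to follower state
    /// }
    /// if req.term >= local_term {
    ///     reset_election_timer();     // valid leader contact resets the election timer
    /// }
    /// ```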
    async fn append_entries(
        &self,
        request: Request<AppendEntriesRequest>,
    ) -> std::result::Result<Response<AppendEntriesResponse>, tonic::Status> {
        if !self.is_rpc_ready() {
            warn!("[rpc|append_entries] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::AppendEntries(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.retry.append_entries.timeout_ms);

        handle_rpc_timeout(resp_rx, timeout_duration, "append_entries").await
    }
}

#[tonic::async_trait]
impl<T> SnapshotService for Node<T>
where
    T: TypeConfig,
{
    type StreamSnapshotStream = tonic::Streaming<SnapshotChunk>;

    async fn stream_snapshot(
        &self,
        request: tonic::Request<tonic::Streaming<SnapshotAck>>,
    ) -> std::result::Result<tonic::Response<Self::StreamSnapshotStream>, tonic::Status> {
        if !self.is_rpc_ready() {
            warn!("stream_snapshot: Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = StreamResponseSender::new();

        self.event_tx
            .send(RaftEvent::StreamSnapshot(
                Box::new(request.into_inner()),
                resp_tx,
            ))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration = Duration::from_millis(self.node_config.raft.snapshot_rpc_timeout_ms);

        handle_rpc_timeout(
            async { resp_rx.await.map_err(|_| Status::internal("Response channel closed")) },
            timeout_duration,
            "stream_snapshot",
        )
        .await
    }

    async fn install_snapshot(
        &self,
        request: tonic::Request<Streaming<SnapshotChunk>>,
    ) -> std::result::Result<tonic::Response<SnapshotResponse>, tonic::Status> {
        if !self.is_rpc_ready() {
            warn!("install_snapshot: Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();

        self.event_tx
            .send(RaftEvent::InstallSnapshotChunk(
                Box::new(request.into_inner()),
                resp_tx,
            ))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration = Duration::from_millis(self.node_config.raft.snapshot_rpc_timeout_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "install_snapshot").await
    }

    async fn purge_log(
        &self,
        request: tonic::Request<PurgeLogRequest>,
    ) -> std::result::Result<tonic::Response<PurgeLogResponse>, Status> {
        if !self.is_rpc_ready() {
            warn!("purge_log: Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();

        self.event_tx
            .send(RaftEvent::RaftLogCleanUp(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "purge_log").await
    }
}

#[tonic::async_trait]
impl<T> ClusterManagementService for Node<T>
where
    T: TypeConfig,
{
    /// Handles cluster membership changes (joint consensus)
    /// # Raft Protocol Logic
    /// - Implements cluster configuration changes (Section 6)
    /// - Validates new configuration against current cluster state
    /// - Ensures safety during membership transitions
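    ///
    /// # Example (sketch)
    /// This RPC is normally issued by the leader towards followers; a hedged sketch,
    /// assuming the tonic-generated client
    /// (`cluster_management_service_client::ClusterManagementServiceClient`) is
    /// exported by `d_engine_proto` (a real request would carry the new membership
    /// rather than `ClusterConfChangeRequest::default()`):
    /// ```ignore
    /// use d_engine_proto::server::cluster::ClusterConfChangeRequest;
    /// use d_engine_proto::server::cluster::cluster_management_service_client::ClusterManagementServiceClient;
    ///
    /// let mut client = ClusterManagementServiceClient::connect("http://127.0.0.1:9002").await?;
    /// let ack = client
    ///     .update_cluster_conf(ClusterConfChangeRequest::default())
    ///     .await?
    ///     .into_inner();
    /// ```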
    async fn update_cluster_conf(
        &self,
        request: tonic::Request<ClusterConfChangeRequest>,
    ) -> std::result::Result<Response<ClusterConfUpdateResponse>, Status> {
        if !self.is_rpc_ready() {
            warn!(
                "[rpc|update_cluster_conf_from_leader] Node-{} is not ready!",
                self.node_id
            );
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::ClusterConfUpdate(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration = Duration::from_millis(self.node_config.retry.membership.timeout_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "update_cluster_conf_from_leader").await
    }

    /// Returns current cluster membership and state metadata
    /// # Usage
    /// - Administrative API for cluster inspection
    /// - Provides snapshot of current configuration
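    ///
    /// # Example (sketch)
    /// A hedged administrative sketch, reusing the generated management client
    /// assumed above:
    /// ```ignore
    /// use d_engine_proto::server::cluster::MetadataRequest;
    ///
    /// let membership = client
    ///     .get_cluster_metadata(MetadataRequest::default())
    ///     .await?
    ///     .into_inner();
    /// println!("current membership: {membership:?}");
    /// ```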
    async fn get_cluster_metadata(
        &self,
        request: tonic::Request<MetadataRequest>,
    ) -> std::result::Result<tonic::Response<ClusterMembership>, tonic::Status> {
        debug!("receive get_cluster_metadata");
        if !self.is_rpc_ready() {
            warn!(
                "[rpc|get_cluster_metadata] Node-{} is not ready!",
                self.node_id
            );
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::ClusterConf(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "get_cluster_metadata").await
    }

    /// Handles a request from a new node to join the cluster as a learner
    async fn join_cluster(
        &self,
        request: tonic::Request<JoinRequest>,
    ) -> std::result::Result<tonic::Response<JoinResponse>, tonic::Status> {
        debug!("receive join_cluster");
        if !self.is_rpc_ready() {
            warn!("[rpc|join_cluster] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::JoinCluster(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "join_cluster").await
    }

    async fn discover_leader(
        &self,
        request: tonic::Request<LeaderDiscoveryRequest>,
    ) -> std::result::Result<tonic::Response<LeaderDiscoveryResponse>, tonic::Status> {
        debug!("receive discover_leader");
        if !self.is_rpc_ready() {
            warn!("[rpc|discover_leader] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::DiscoverLeader(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "discover_leader").await
    }
}
#[tonic::async_trait]
impl<T> RaftClientService for Node<T>
where
    T: TypeConfig,
{
    type WatchStream =
        Pin<Box<dyn Stream<Item = Result<d_engine_proto::client::WatchResponse, Status>> + Send>>;

    /// Processes client write requests requiring consensus
    /// # Raft Protocol Logic
    /// - Entry point for client proposals (Section 7)
    /// - Validates requests before appending to leader's log
    /// - Ensures linearizable writes through log replication
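    ///
    /// # Example (sketch)
    /// A hedged client-side sketch, assuming the tonic-generated
    /// `raft_client_service_client::RaftClientServiceClient` is exported by
    /// `d_engine_proto`; note that `commands` must be non-empty, otherwise the call
    /// is rejected with `InvalidArgument` (the request contents are left to the
    /// caller here):
    /// ```ignore
    /// use d_engine_proto::client::ClientWriteRequest;
    /// use d_engine_proto::client::raft_client_service_client::RaftClientServiceClient;
    ///
    /// let mut client = RaftClientServiceClient::connect("http://127.0.0.1:9001").await?;
    /// let mut req = ClientWriteRequest::default();
    /// // populate req.commands with at least one command before sending
    /// let resp = client.handle_client_write(req).await?.into_inner();
    /// ```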
    async fn handle_client_write(
        &self,
        request: tonic::Request<ClientWriteRequest>,
    ) -> std::result::Result<tonic::Response<ClientResponse>, Status> {
        if !self.is_rpc_ready() {
            warn!("[handle_client_write] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let remote_addr = request.remote_addr();
        let event_tx = self.event_tx.clone();
        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);

        let request_future = async move {
            let req: ClientWriteRequest = request.into_inner();
            // Extract request and validate
            if req.commands.is_empty() {
                return Err(Status::invalid_argument("Commands cannot be empty"));
            }

            let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
            event_tx
                .send(RaftEvent::ClientPropose(req, resp_tx))
                .await
                .map_err(|_| Status::internal("Event channel closed"))?;

            handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_write").await
        };

        let cancellation_future = async move {
            warn!("Request from {:?} cancelled by client", remote_addr);
            // If this future is executed it means the request future was dropped,
            // so it doesn't actually matter what is returned here
            Err::<Response<ClientResponse>, Status>(Status::cancelled(
                "Request cancelled by client",
            ))
        };

        with_cancellation_handler(request_future, cancellation_future).await
    }

    /// Handles client read requests with linearizability guarantees
    /// # Raft Protocol Logic
    /// - Implements lease-based leader reads (Section 6.4)
    /// - Verifies leadership before serving reads
    /// - Ensures read-after-write consistency
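    ///
    /// # Example (sketch)
    /// A hedged sketch mirroring the write path above and reusing the generated
    /// client assumed there; the read is answered by the leader so that a value
    /// written through `handle_client_write` is visible once acknowledged:
    /// ```ignore
    /// use d_engine_proto::client::ClientReadRequest;
    ///
    /// let resp = client
    ///     .handle_client_read(ClientReadRequest::default())
    ///     .await?
    ///     .into_inner();
    /// ```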
    async fn handle_client_read(
        &self,
        request: tonic::Request<ClientReadRequest>,
    ) -> std::result::Result<tonic::Response<ClientResponse>, tonic::Status> {
        if !self.is_rpc_ready() {
            warn!("handle_client_read: Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::ClientReadRequest(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration =
            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_read").await
    }

    /// Watch for changes to a specific key
    ///
    /// Returns a stream of events (PUT/DELETE) for the specified key.
    /// The stream will continue until the client disconnects or the server shuts down.
    ///
    /// # Arguments
    /// * `request` - Contains the key to watch
    ///
    /// # Returns
    /// A stream of WatchResponse messages containing PUT/DELETE events
    ///
    /// # Errors
    /// Returns Status::UNAVAILABLE if Watch is disabled in the server configuration,
    /// and Status::UNIMPLEMENTED if the `watch` feature is not compiled into this build
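    ///
    /// # Example (sketch)
    /// A hedged consumer sketch, reusing the generated `RaftClientServiceClient`
    /// assumed above; the key field of `WatchRequest` is left to the caller, and the
    /// trailing UNAVAILABLE status is the sentinel emitted when the server closes the
    /// stream, telling the client to reconnect and re-register:
    /// ```ignore
    /// use d_engine_proto::client::WatchRequest;
    ///
    /// let mut stream = client.watch(WatchRequest::default()).await?.into_inner();
    /// loop {
    ///     match stream.message().await {
    ///         Ok(Some(event)) => println!("watch event: {event:?}"),
    ///         Ok(None) => break,              // stream ended cleanly
    ///         Err(status) => {                // sentinel: server closed the stream
    ///             eprintln!("watch closed: {status}; reconnect and re-register");
    ///             break;
    ///         }
    ///     }
    /// }
    /// ```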
    #[cfg(feature = "watch")]
    async fn watch(
        &self,
        request: tonic::Request<WatchRequest>,
    ) -> std::result::Result<tonic::Response<Self::WatchStream>, tonic::Status> {
        let watch_request = request.into_inner();
        let key = watch_request.key;

        // Check if watch registry is available
        let registry = self.watch_registry.as_ref().ok_or_else(|| {
            Status::unavailable("Watch feature is disabled in server configuration")
        })?;

        info!(
            node_id = self.node_id,
            key = ?key,
            "Registering watch for key"
        );

        // Register watcher and get receiver
        let handle = registry.register(key);
        let (_watcher_id, _key, receiver) = handle.into_receiver();

        // Convert mpsc::Receiver -> Boxed Stream with sentinel error on close
        // When the stream ends (receiver closed), send an UNAVAILABLE error to help clients
        // detect server shutdown/restart and implement reconnection logic
        let stream = tokio_stream::wrappers::ReceiverStream::new(receiver)
            .map(Ok)
            .chain(futures::stream::once(async {
                Err(Status::unavailable(
                    "Watch stream closed: server may have shut down or restarted. Please reconnect and re-register the watcher."
                ))
            }));

        Ok(tonic::Response::new(Box::pin(stream)))
    }

    #[cfg(not(feature = "watch"))]
    async fn watch(
        &self,
        _request: tonic::Request<WatchRequest>,
    ) -> std::result::Result<tonic::Response<Self::WatchStream>, tonic::Status> {
        Err(Status::unimplemented(
            "Watch feature is not compiled in this build",
        ))
    }
}

/// Gracefully handles client request cancellations
/// # Functionality
/// - Races the request future against a cancellation token that fires when this
///   handler future is dropped (i.e. when the client cancels the request)
/// - Runs the provided cancellation future on cancellation, allowing custom
///   cleanup or monitoring code to run
/// - Prevents resource leaks from abandoned requests
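///
/// # Example (sketch)
/// A minimal usage sketch with two illustrative futures; the cancellation branch
/// only runs if the client drops the RPC before the request future completes:
/// ```ignore
/// let response = with_cancellation_handler(
///     async { Ok(Response::new(ClientResponse::default())) },
///     async { Err(Status::cancelled("Request cancelled by client")) },
/// )
/// .await;
/// ```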
pub(crate) async fn with_cancellation_handler<FRequest, FCancellation>(
    request_future: FRequest,
    cancellation_future: FCancellation,
) -> std::result::Result<Response<ClientResponse>, Status>
where
    FRequest:
        Future<Output = std::result::Result<Response<ClientResponse>, Status>> + Send + 'static,
    FCancellation:
        Future<Output = std::result::Result<Response<ClientResponse>, Status>> + Send + 'static,
{
    let token = CancellationToken::new();
    // The drop guard calls token.cancel() when this handler future is dropped,
    // such as when the client cancels the request
    let _drop_guard = token.clone().drop_guard();
    let select_task = tokio::spawn(async move {
        // Race the request future against token cancellation so that custom
        // cleanup or monitoring code can run if the client drops the request
        select! {
            res = request_future => res,
            _ = token.cancelled() => cancellation_future.await,
        }
    });

    select_task.await.unwrap()
}

/// Centralized timeout handler for all RPC operations
/// # Features
/// - Uniform timeout enforcement across RPC types
/// - Detailed error categorization:
///   - Channel errors
///   - Application-level errors
///   - Deadline exceeded
/// - Logs successes, errors, and timeouts per RPC
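///
/// # Example (sketch)
/// A hedged sketch of the expected receiver shape: any future resolving to
/// `Result<Result<T, Status>, E>` works, for example a oneshot receiver whose
/// sender side carries a `Result<T, Status>`:
/// ```ignore
/// let (tx, rx) = tokio::sync::oneshot::channel::<Result<VoteResponse, Status>>();
/// tx.send(Ok(VoteResponse::default())).ok();
/// let resp = handle_rpc_timeout(rx, Duration::from_millis(200), "example_rpc").await?;
/// ```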
async fn handle_rpc_timeout<T, E>(
    resp_rx: impl Future<Output = Result<Result<T, Status>, E>>,
    timeout_duration: Duration,
    rpc_name: &'static str,
) -> Result<Response<T>, Status>
where
    T: std::fmt::Debug,
    E: std::fmt::Debug,
{
    debug!("grpc_raft_service::handle_rpc_timeout::{}", rpc_name);

    match timeout(timeout_duration, resp_rx).await {
        Ok(Ok(Ok(response))) => {
            debug!("[{}] Success response: {:?}", rpc_name, &response);
            Ok(Response::new(response))
        }
        Ok(Ok(Err(status))) => {
            error!("[{}] Error status: {:?}", rpc_name, &status);
            Err(status)
        }
        Ok(Err(e)) => {
            error!("[{}] Channel error: {:?}", rpc_name, e);
            Err(Status::deadline_exceeded("RPC channel closed"))
        }
        Err(_) => {
            warn!(
                "[{}] Response timeout after {:?}",
                rpc_name, timeout_duration
            );
            Err(Status::deadline_exceeded("RPC timeout exceeded"))
        }
    }
}