Skip to main content

d_engine_server/network/grpc/
grpc_raft_service.rs

1//! Raft gRPC service implementation handling RPC communication between cluster nodes
2//! and client requests. Implements core Raft protocol logic for leader election,
3//! log replication, and cluster configuration management.
4
5use std::future::Future;
6use std::pin::Pin;
7use std::time::Duration;
8
9use d_engine_core::MaybeCloneOneshot;
10use d_engine_core::MaybeCloneOneshotReceiver;
11use d_engine_core::RaftEvent;
12use d_engine_core::RaftOneshot;
13use d_engine_core::StreamResponseSender;
14use d_engine_core::TypeConfig;
15#[cfg(feature = "watch")]
16use d_engine_core::WatchError;
17use d_engine_proto::client::ClientReadRequest;
18use d_engine_proto::client::ClientResponse;
19use d_engine_proto::client::ClientWriteRequest;
20use d_engine_proto::client::KvEntry;
21use d_engine_proto::client::MembershipSnapshot as ProtoMembershipSnapshot;
22use d_engine_proto::client::ScanRequest;
23use d_engine_proto::client::ScanResponse;
24use d_engine_proto::client::WatchMembershipRequest;
25use d_engine_proto::client::WatchRequest;
26use d_engine_proto::client::raft_client_service_server::RaftClientService;
27use d_engine_proto::server::cluster::ClusterConfChangeRequest;
28use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
29use d_engine_proto::server::cluster::ClusterMembership;
30use d_engine_proto::server::cluster::JoinRequest;
31use d_engine_proto::server::cluster::JoinResponse;
32use d_engine_proto::server::cluster::LeaderDiscoveryRequest;
33use d_engine_proto::server::cluster::LeaderDiscoveryResponse;
34use d_engine_proto::server::cluster::MetadataRequest;
35use d_engine_proto::server::cluster::cluster_management_service_server::ClusterManagementService;
36use d_engine_proto::server::election::VoteRequest;
37use d_engine_proto::server::election::VoteResponse;
38use d_engine_proto::server::election::raft_election_service_server::RaftElectionService;
39use d_engine_proto::server::replication::AppendEntriesRequest;
40use d_engine_proto::server::replication::AppendEntriesResponse;
41use d_engine_proto::server::replication::raft_replication_service_server::RaftReplicationService;
42use d_engine_proto::server::storage::SnapshotAck;
43use d_engine_proto::server::storage::SnapshotChunk;
44use d_engine_proto::server::storage::SnapshotResponse;
45use d_engine_proto::server::storage::snapshot_service_server::SnapshotService;
46use futures::Stream;
47use futures::StreamExt;
48use tokio::select;
49use tokio::sync::mpsc;
50use tokio::time::timeout;
51use tokio_util::sync::CancellationToken;
52use tonic::Request;
53use tonic::Response;
54use tonic::Status;
55use tonic::Streaming;
56use tracing::debug;
57use tracing::error;
58use tracing::info;
59use tracing::warn;
60
61use crate::Node;
62use crate::proto_convert;
63
64#[tonic::async_trait]
65impl<T> RaftElectionService for Node<T>
66where
67    T: TypeConfig,
68{
69    /// Handles RequestVote RPC calls from candidate nodes during leader elections
70    /// # Raft Protocol Logic
71    /// - Part of leader election mechanism (Section 5.2)
72    /// - Validates candidate's term and log completeness
73    /// - Grants vote if candidate's log is at least as up-to-date as local log
74    async fn request_vote(
75        &self,
76        request: tonic::Request<VoteRequest>,
77    ) -> std::result::Result<Response<VoteResponse>, Status> {
78        if !self.is_rpc_ready() {
79            warn!(
80                "[rpc|request_vote] My raft setup(Node:{}) is not ready!",
81                self.node_id
82            );
83            return Err(Status::unavailable("Service is not ready"));
84        }
85
86        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
87        self.event_tx
88            .send(RaftEvent::ReceiveVoteRequest(request.into_inner(), resp_tx))
89            .await
90            .map_err(|_| Status::internal("Event channel closed"))?;
91        let timeout_duration =
92            Duration::from_millis(self.node_config.raft.election.election_timeout_min);
93        handle_rpc_timeout(resp_rx, timeout_duration, "request_vote").await
94    }
95}
96#[tonic::async_trait]
97impl<T> RaftReplicationService for Node<T>
98where
99    T: TypeConfig,
100{
101    /// Processes AppendEntries RPC calls from cluster leader
102    /// # Raft Protocol Logic
103    /// - Heartbeat mechanism (Section 5.2)
104    /// - Log replication entry point (Section 5.3)
105    /// - Term comparison logic:
106    ///   - If incoming term > current term: revert to follower state
107    ///   - Reset election timeout on valid leader communication
108    async fn append_entries(
109        &self,
110        request: Request<AppendEntriesRequest>,
111    ) -> std::result::Result<Response<AppendEntriesResponse>, tonic::Status> {
112        if !self.is_rpc_ready() {
113            warn!("[rpc|append_entries] Node-{} is not ready!", self.node_id);
114            return Err(Status::unavailable("Service is not ready"));
115        }
116
117        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
118        self.event_tx
119            .send(RaftEvent::AppendEntries(request.into_inner(), resp_tx))
120            .await
121            .map_err(|_| Status::internal("Event channel closed"))?;
122
123        let timeout_duration =
124            Duration::from_millis(self.node_config.retry.append_entries.timeout_ms);
125
126        handle_rpc_timeout(resp_rx, timeout_duration, "append_entries").await
127    }
128
129    type StreamAppendEntriesStream =
130        Pin<Box<dyn Stream<Item = Result<AppendEntriesResponse, Status>> + Send>>;
131
132    /// Processes a persistent bidirectional AppendEntries stream from the cluster leader.
133    ///
134    /// Decouples request ingestion from response emission:
135    /// - recv task: reads batches from the stream, dispatches each as a `RaftEvent::AppendEntries`
136    ///   (non-blocking between batches)
137    /// - forwarder task: drains ordered response handles sequentially; ordering is guaranteed
138    ///   by the Raft single-threaded event loop
139    async fn stream_append_entries(
140        &self,
141        request: tonic::Request<tonic::Streaming<AppendEntriesRequest>>,
142    ) -> std::result::Result<tonic::Response<Self::StreamAppendEntriesStream>, tonic::Status> {
143        if !self.is_rpc_ready() {
144            warn!(
145                "[rpc|stream_append_entries] Node-{} is not ready!",
146                self.node_id
147            );
148            return Err(Status::unavailable("Service is not ready"));
149        }
150
151        let mut in_stream = request.into_inner();
152        let event_tx = self.event_tx.clone();
153        let ordered_channel_capacity = self.node_config.raft.ordered_channel_capacity;
154        let mut shutdown = self.shutdown_signal.clone();
155
156        // Output: ordered ACKs sent back to the leader over the bidi stream
157        let (out_tx, out_rx) = mpsc::channel::<Result<AppendEntriesResponse, Status>>(128);
158
159        // Ordered queue: response oneshot receivers in FIFO arrival order
160        let (ordered_tx, mut ordered_rx) = mpsc::channel::<
161            MaybeCloneOneshotReceiver<Result<AppendEntriesResponse, Status>>,
162        >(ordered_channel_capacity);
163
164        // Recv task: read batches, dispatch to Raft loop without waiting for each ACK.
165        // Selects on shutdown signal so the task exits immediately on node stop, rather
166        // than waiting for the next message from the leader. This unblocks serve_with_shutdown
167        // and allows Arc<Node> (and Arc<DB>) to be released promptly after stop().
168        tokio::spawn(async move {
169            use futures::StreamExt;
170            loop {
171                tokio::select! {
172                    biased;
173                    _ = shutdown.changed() => {
174                        break;
175                    }
176                    result = in_stream.next() => {
177                        match result {
178                            Some(Ok(req)) => {
179                                let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
180                                if event_tx.send(RaftEvent::AppendEntries(req, resp_tx)).await.is_err() {
181                                    warn!("[stream_append_entries|recv] event_tx closed");
182                                    break;
183                                }
184                                if ordered_tx.send(resp_rx).await.is_err() {
185                                    break;
186                                }
187                            }
188                            Some(Err(e)) => {
189                                warn!("[stream_append_entries|recv] stream error: {:?}", e);
190                                break;
191                            }
192                            None => break,
193                        }
194                    }
195                }
196            }
197        });
198
199        // Forwarder task: drain ordered queue sequentially (FIFO guaranteed by Raft loop)
200        tokio::spawn(async move {
201            while let Some(resp_rx) = ordered_rx.recv().await {
202                let result = match resp_rx.await {
203                    Ok(Ok(resp)) => Ok(resp),
204                    Ok(Err(status)) => Err(status),
205                    Err(_) => Err(Status::internal("Response channel closed")),
206                };
207                if out_tx.send(result).await.is_err() {
208                    break;
209                }
210            }
211        });
212
213        let out_stream = tokio_stream::wrappers::ReceiverStream::new(out_rx);
214        Ok(tonic::Response::new(Box::pin(out_stream)))
215    }
216}
217
218#[tonic::async_trait]
219impl<T> SnapshotService for Node<T>
220where
221    T: TypeConfig,
222{
223    type StreamSnapshotStream = tonic::Streaming<SnapshotChunk>;
224
225    async fn stream_snapshot(
226        &self,
227        request: tonic::Request<tonic::Streaming<SnapshotAck>>,
228    ) -> std::result::Result<tonic::Response<Self::StreamSnapshotStream>, tonic::Status> {
229        if !self.is_rpc_ready() {
230            warn!("stream_snapshot: Node-{} is not ready!", self.node_id);
231            return Err(Status::unavailable("Service is not ready"));
232        }
233
234        let (resp_tx, resp_rx) = StreamResponseSender::new();
235
236        self.event_tx
237            .send(RaftEvent::StreamSnapshot(
238                Box::new(request.into_inner()),
239                resp_tx,
240            ))
241            .await
242            .map_err(|_| Status::internal("Event channel closed"))?;
243
244        let timeout_duration = Duration::from_millis(self.node_config.raft.snapshot_rpc_timeout_ms);
245
246        handle_rpc_timeout(
247            async { resp_rx.await.map_err(|_| Status::internal("Response channel closed")) },
248            timeout_duration,
249            "stream_snapshot",
250        )
251        .await
252    }
253
254    async fn install_snapshot(
255        &self,
256        request: tonic::Request<Streaming<SnapshotChunk>>,
257    ) -> std::result::Result<tonic::Response<SnapshotResponse>, tonic::Status> {
258        if !self.is_rpc_ready() {
259            warn!("install_snapshot: Node-{} is not ready!", self.node_id);
260            return Err(Status::unavailable("Service is not ready"));
261        }
262
263        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
264
265        self.event_tx
266            .send(RaftEvent::InstallSnapshotChunk(
267                Box::new(request.into_inner()),
268                resp_tx,
269            ))
270            .await
271            .map_err(|_| Status::internal("Event channel closed"))?;
272
273        let timeout_duration = Duration::from_millis(self.node_config.raft.snapshot_rpc_timeout_ms);
274        handle_rpc_timeout(resp_rx, timeout_duration, "install_snapshot").await
275    }
276}
277
278#[tonic::async_trait]
279impl<T> ClusterManagementService for Node<T>
280where
281    T: TypeConfig,
282{
283    /// Handles cluster membership changes (joint consensus)
284    /// # Raft Protocol Logic
285    /// - Implements cluster configuration changes (Section 6)
286    /// - Validates new configuration against current cluster state
287    /// - Ensures safety during membership transitions
288    async fn update_cluster_conf(
289        &self,
290        request: tonic::Request<ClusterConfChangeRequest>,
291    ) -> std::result::Result<Response<ClusterConfUpdateResponse>, Status> {
292        if !self.is_rpc_ready() {
293            warn!(
294                "[rpc|update_cluster_conf_from_leader] Node-{} is not ready!",
295                self.node_id
296            );
297            return Err(Status::unavailable("Service is not ready"));
298        }
299
300        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
301        self.event_tx
302            .send(RaftEvent::ClusterConfUpdate(request.into_inner(), resp_tx))
303            .await
304            .map_err(|_| Status::internal("Event channel closed"))?;
305
306        let timeout_duration = Duration::from_millis(self.node_config.retry.membership.timeout_ms);
307        handle_rpc_timeout(resp_rx, timeout_duration, "update_cluster_conf_from_leader").await
308    }
309
310    /// Returns current cluster membership and state metadata
311    /// # Usage
312    /// - Administrative API for cluster inspection
313    /// - Provides snapshot of current configuration
314    async fn get_cluster_metadata(
315        &self,
316        request: tonic::Request<MetadataRequest>,
317    ) -> std::result::Result<tonic::Response<ClusterMembership>, tonic::Status> {
318        debug!("receive get_cluster_metadata");
319        if !self.is_rpc_ready() {
320            warn!(
321                "[rpc|get_cluster_metadata] Node-{} is not ready!",
322                self.node_id
323            );
324            return Err(Status::unavailable("Service is not ready"));
325        }
326
327        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
328        self.event_tx
329            .send(RaftEvent::ClusterConf(request.into_inner(), resp_tx))
330            .await
331            .map_err(|_| Status::internal("Event channel closed"))?;
332
333        let timeout_duration =
334            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
335        handle_rpc_timeout(resp_rx, timeout_duration, "get_cluster_metadata").await
336    }
337
338    // Request to join the cluster as a new learner node
339    async fn join_cluster(
340        &self,
341        request: tonic::Request<JoinRequest>,
342    ) -> std::result::Result<tonic::Response<JoinResponse>, tonic::Status> {
343        debug!("receive join_cluster");
344        if !self.is_rpc_ready() {
345            warn!("[rpc|join_cluster] Node-{} is not ready!", self.node_id);
346            return Err(Status::unavailable("Service is not ready"));
347        }
348
349        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
350        self.event_tx
351            .send(RaftEvent::JoinCluster(request.into_inner(), resp_tx))
352            .await
353            .map_err(|_| Status::internal("Event channel closed"))?;
354
355        let timeout_duration =
356            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
357        handle_rpc_timeout(resp_rx, timeout_duration, "join_cluster").await
358    }
359
360    async fn discover_leader(
361        &self,
362        request: tonic::Request<LeaderDiscoveryRequest>,
363    ) -> std::result::Result<tonic::Response<LeaderDiscoveryResponse>, tonic::Status> {
364        debug!("receive discover_leader");
365        if !self.is_rpc_ready() {
366            warn!("[rpc|discover_leader] Node-{} is not ready!", self.node_id);
367            return Err(Status::unavailable("Service is not ready"));
368        }
369
370        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
371        self.event_tx
372            .send(RaftEvent::DiscoverLeader(request.into_inner(), resp_tx))
373            .await
374            .map_err(|_| Status::internal("Event channel closed"))?;
375
376        let timeout_duration =
377            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
378        handle_rpc_timeout(resp_rx, timeout_duration, "discover_leader").await
379    }
380}
381#[tonic::async_trait]
382impl<T> RaftClientService for Node<T>
383where
384    T: TypeConfig,
385{
386    type WatchStream =
387        Pin<Box<dyn Stream<Item = Result<d_engine_proto::client::WatchResponse, Status>> + Send>>;
388
389    type WatchMembershipStream =
390        Pin<Box<dyn Stream<Item = Result<ProtoMembershipSnapshot, Status>> + Send>>;
391
392    /// Processes client write requests requiring consensus
393    /// # Raft Protocol Logic
394    /// - Entry point for client proposals (Section 7)
395    /// - Validates requests before appending to leader's log
396    /// - Ensures linearizable writes through log replication
397    async fn handle_client_write(
398        &self,
399        request: tonic::Request<ClientWriteRequest>,
400    ) -> std::result::Result<tonic::Response<ClientResponse>, Status> {
401        if !self.is_rpc_ready() {
402            warn!("[handle_client_write] Node-{} is not ready!", self.node_id);
403            return Err(Status::unavailable("Service is not ready"));
404        }
405
406        let remote_addr = request.remote_addr();
407        let timeout_duration =
408            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
409
410        // Clone cmd_tx before async move to avoid capturing &self
411        let cmd_tx = self.cmd_tx.clone();
412
413        let request_future = async move {
414            let proto_req: ClientWriteRequest = request.into_inner();
415            let operation_present =
416                proto_req.command.as_ref().and_then(|c| c.operation.as_ref()).is_some();
417            if !operation_present {
418                return Err(Status::invalid_argument(
419                    "WriteCommand must contain an operation",
420                ));
421            }
422            let core_req = proto_convert::to_core_write_req(proto_req);
423
424            let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
425            cmd_tx
426                .send(d_engine_core::ClientCmd::Propose(core_req, resp_tx))
427                .await
428                .map_err(|_| Status::internal("Command channel closed"))?;
429
430            handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_write")
431                .await
432                .map(|resp| resp.map(proto_convert::to_proto_response))
433        };
434
435        let cancellation_future = async move {
436            warn!("Request from {:?} cancelled by client", remote_addr);
437            // If this future is executed it means the request future was dropped,
438            // so it doesn't actually matter what is returned here
439            Err::<Response<ClientResponse>, Status>(Status::cancelled(
440                "Request cancelled by client",
441            ))
442        };
443
444        with_cancellation_handler(request_future, cancellation_future).await
445    }
446
447    /// Handles client read requests with linearizability guarantees
448    /// # Raft Protocol Logic
449    /// - Implements lease-based leader reads (Section 6.4)
450    /// - Verifies leadership before serving reads
451    /// - Ensures read-after-write consistency
452    async fn handle_client_read(
453        &self,
454        request: tonic::Request<ClientReadRequest>,
455    ) -> std::result::Result<tonic::Response<ClientResponse>, tonic::Status> {
456        if !self.is_rpc_ready() {
457            warn!("handle_client_read: Node-{} is not ready!", self.node_id);
458            return Err(Status::unavailable("Service is not ready"));
459        }
460
461        let proto_req = request.into_inner();
462        if let Some(raw) = proto_req.consistency_policy
463            && d_engine_proto::client::ReadConsistencyPolicy::try_from(raw).is_err()
464        {
465            warn!(
466                raw_value = raw,
467                "Unknown consistency_policy value received, degrading to cluster default"
468            );
469        }
470        let core_req = proto_convert::to_core_read_req(proto_req);
471        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
472        self.cmd_tx
473            .send(d_engine_core::ClientCmd::Read(core_req, resp_tx))
474            .await
475            .map_err(|_| Status::internal("Command channel closed"))?;
476
477        let timeout_duration =
478            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
479        handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_read")
480            .await
481            .map(|resp| resp.map(proto_convert::to_proto_response))
482    }
483
484    /// Scan all keys under a prefix.
485    ///
486    /// Routes through the Raft command channel so the leader serves the scan
487    /// (linearizable by default). Returns all matching entries plus the applied
488    /// index at scan time — clients use the revision to filter watch events
489    /// during reconnection.
490    async fn handle_client_scan(
491        &self,
492        request: tonic::Request<ScanRequest>,
493    ) -> std::result::Result<tonic::Response<ScanResponse>, tonic::Status> {
494        if !self.is_rpc_ready() {
495            warn!("handle_client_scan: Node-{} is not ready!", self.node_id);
496            return Err(Status::unavailable("Service is not ready"));
497        }
498
499        let req = request.into_inner();
500        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
501
502        self.cmd_tx
503            .send(d_engine_core::ClientCmd::Scan(req.prefix, resp_tx))
504            .await
505            .map_err(|_| Status::internal("Command channel closed"))?;
506
507        let timeout_duration =
508            Duration::from_millis(self.node_config.raft.general_raft_timeout_duration_in_ms);
509
510        let scan_result = handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_scan")
511            .await?
512            .into_inner();
513
514        Ok(tonic::Response::new(ScanResponse {
515            entries: scan_result
516                .entries
517                .into_iter()
518                .map(|(k, v)| KvEntry { key: k, value: v })
519                .collect(),
520            revision: scan_result.revision,
521        }))
522    }
523
524    /// Watch for changes to a specific key
525    ///
526    /// Returns a stream of events (PUT/DELETE) for the specified key.
527    /// The stream will continue until the client disconnects or the server shuts down.
528    ///
529    /// # Arguments
530    /// * `request` - Contains the key to watch
531    ///
532    /// # Returns
533    /// A stream of WatchResponse messages containing PUT/DELETE events
534    ///
535    /// # Errors
536    /// Returns Status::UNAVAILABLE if Watch is disabled in configuration
537    #[cfg(feature = "watch")]
538    async fn watch(
539        &self,
540        request: tonic::Request<WatchRequest>,
541    ) -> std::result::Result<tonic::Response<Self::WatchStream>, tonic::Status> {
542        let watch_request = request.into_inner();
543        let key = watch_request.key;
544
545        // Check if watch registry is available
546        let registry = self.watch_registry.as_ref().ok_or_else(|| {
547            Status::unavailable("Watch feature is disabled in server configuration")
548        })?;
549
550        let is_prefix = watch_request.prefix;
551        let prev_kv = watch_request.prev_kv;
552        info!(
553            node_id = self.node_id,
554            key = ?key,
555            is_prefix,
556            prev_kv,
557            "Registering watch for key"
558        );
559
560        // Register watcher (exact or prefix) and get receiver
561        let handle = if is_prefix {
562            registry.register_prefix(key, prev_kv).map_err(|e| match e {
563                WatchError::LimitExceeded(_) => Status::resource_exhausted(e.to_string()),
564                WatchError::InvalidPrefix => Status::invalid_argument(e.to_string()),
565            })?
566        } else {
567            registry
568                .register(key, prev_kv)
569                .map_err(|e| Status::resource_exhausted(e.to_string()))?
570        };
571        let (_watcher_id, _key, receiver) = handle.into_receiver();
572
573        // Convert mpsc::Receiver<WatchEvent> -> Boxed Stream<WatchResponse> for gRPC.
574        // WatchEvent (opaque) is converted back to proto WatchResponse at this boundary.
575        let stream = tokio_stream::wrappers::ReceiverStream::new(receiver)
576            .map(|e| Ok(d_engine_proto::client::WatchResponse::from(&e)))
577            .chain(futures::stream::once(async {
578                Err(Status::unavailable(
579                    "Watch stream closed: server may have shut down or restarted. Please reconnect and re-register the watcher."
580                ))
581            }));
582
583        Ok(tonic::Response::new(Box::pin(stream)))
584    }
585
586    #[cfg(not(feature = "watch"))]
587    async fn watch(
588        &self,
589        _request: tonic::Request<WatchRequest>,
590    ) -> std::result::Result<tonic::Response<Self::WatchStream>, tonic::Status> {
591        Err(Status::unimplemented(
592            "Watch feature is not compiled in this build",
593        ))
594    }
595
596    /// Stream committed membership snapshots to the client.
597    ///
598    /// Sends the current snapshot immediately on connect (via `mark_changed`), then
599    /// one snapshot per committed ConfChange. Closes with UNAVAILABLE on server shutdown
600    /// so clients know to reconnect.
601    async fn watch_membership(
602        &self,
603        _request: tonic::Request<WatchMembershipRequest>,
604    ) -> std::result::Result<tonic::Response<Self::WatchMembershipStream>, tonic::Status> {
605        if !self.is_rpc_ready() {
606            warn!("[watch_membership] Node-{} is not ready", self.node_id);
607            return Err(Status::unavailable("Service is not ready"));
608        }
609
610        let mut rx = self.membership_rx.clone();
611        // Deliver the current snapshot immediately; subsequent items arrive on each ConfChange.
612        rx.mark_changed();
613
614        info!(node_id = self.node_id, "Membership watch stream opened");
615
616        let stream = tokio_stream::wrappers::WatchStream::new(rx)
617            .map(|s| {
618                Ok(ProtoMembershipSnapshot {
619                    members: s.members.into_iter().collect(),
620                    learners: s.learners.into_iter().collect(),
621                    committed_index: s.committed_index,
622                })
623            })
624            .chain(futures::stream::once(async {
625                Err(Status::unavailable(
626                    "Membership watch stream closed: server shut down. Reconnect to re-subscribe.",
627                ))
628            }));
629
630        Ok(tonic::Response::new(Box::pin(stream)))
631    }
632}
633
634/// Gracefully handles client request cancellations
635/// # Functionality
636/// - Manages cleanup of abandoned requests
637/// - Tracks request cancellation metrics
638/// - Prevents resource leaks from dropped requests
639pub(crate) async fn with_cancellation_handler<FRequest, FCancellation>(
640    request_future: FRequest,
641    cancellation_future: FCancellation,
642) -> std::result::Result<Response<ClientResponse>, Status>
643where
644    FRequest:
645        Future<Output = std::result::Result<Response<ClientResponse>, Status>> + Send + 'static,
646    FCancellation:
647        Future<Output = std::result::Result<Response<ClientResponse>, Status>> + Send + 'static,
648{
649    let token = CancellationToken::new();
650    // Will call token.cancel() when the future is dropped, such as when the client
651    // cancels the request
652    let _drop_guard = token.clone().drop_guard();
653    let select_task = tokio::spawn(async move {
654        // Can select on token cancellation on any cancellable future while handling the
655        // request, allowing for custom cleanup code or monitoring
656        select! {
657            res = request_future => res,
658            _ = token.cancelled() => cancellation_future.await,
659        }
660    });
661
662    select_task.await.unwrap()
663}
664
665/// Centralized timeout handler for all RPC operations
666/// # Features
667/// - Uniform timeout enforcement across RPC types
668/// - Detailed error categorization:
669///   - Channel errors
670///   - Application-level errors
671///   - Deadline exceeded
672/// - Logging and metrics integration
673async fn handle_rpc_timeout<T, E>(
674    resp_rx: impl Future<Output = Result<Result<T, Status>, E>>,
675    timeout_duration: Duration,
676    rpc_name: &'static str,
677) -> Result<Response<T>, Status>
678where
679    T: std::fmt::Debug,
680    E: std::fmt::Debug,
681{
682    debug!("grpc_raft_serice::handle_rpc_timeout::{}", rpc_name);
683
684    match timeout(timeout_duration, resp_rx).await {
685        Ok(Ok(Ok(response))) => {
686            debug!("[{}] Success response: {:?}", rpc_name, &response);
687            Ok(Response::new(response))
688        }
689        Ok(Ok(Err(status))) => {
690            error!("[{}] Error status: {:?}", rpc_name, &status);
691            Err(status)
692        }
693        Ok(Err(e)) => {
694            error!("[{}] Channel error: {:?}", rpc_name, e);
695            Err(Status::deadline_exceeded("RPC channel closed"))
696        }
697        Err(_) => {
698            warn!(
699                "[{}] Response timeout after {:?}",
700                rpc_name, timeout_duration
701            );
702            Err(Status::deadline_exceeded("RPC timeout exceeded"))
703        }
704    }
705}