d_engine/network/grpc/

//! Raft gRPC service implementation handling RPC communication between cluster nodes
//! and client requests. Implements core Raft protocol logic for leader election,
//! log replication, and cluster configuration management.
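//!
//! # Example
//! A client-side sketch, assuming the proto build also emits the tonic client
//! (`rpc_service_client::RpcServiceClient`); the address and request are illustrative:
//! ```ignore
//! let mut client = RpcServiceClient::connect("http://127.0.0.1:9001").await?;
//! let metadata = client.get_cluster_metadata(MetadataRequest::default()).await?;
//! ```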

use std::future::Future;
use std::time::Duration;

use autometrics::autometrics;
use log::debug;
use log::error;
use log::warn;
use tokio::select;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use tonic::Request;
use tonic::Response;
use tonic::Status;

use crate::proto::rpc_service_server::RpcService;
use crate::proto::AppendEntriesRequest;
use crate::proto::AppendEntriesResponse;
use crate::proto::ClientProposeRequest;
use crate::proto::ClientReadRequest;
use crate::proto::ClientResponse;
use crate::proto::ClusteMembershipChangeRequest;
use crate::proto::ClusterConfUpdateResponse;
use crate::proto::ClusterMembership;
use crate::proto::MetadataRequest;
use crate::proto::VoteRequest;
use crate::proto::VoteResponse;
use crate::MaybeCloneOneshot;
use crate::Node;
use crate::RaftEvent;
use crate::RaftOneshot;
use crate::TypeConfig;
use crate::API_SLO;

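// Every RPC handler below follows the same pattern: check node readiness,
// forward the request into the Raft core as a `RaftEvent` carrying a oneshot
// responder, then await the reply under a per-RPC timeout via `handle_rpc_timeout`.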
#[tonic::async_trait]
impl<T> RpcService for Node<T>
where T: TypeConfig
{
    /// Handles RequestVote RPC calls from candidate nodes during leader elections
    /// # Raft Protocol Logic
    /// - Part of leader election mechanism (Section 5.2)
    /// - Validates candidate's term and log completeness
    /// - Grants vote if candidate's log is at least as up-to-date as local log
    #[cfg_attr(not(doc), autometrics(objective = API_SLO))]
    #[tracing::instrument]
    async fn request_vote(
        &self,
        request: tonic::Request<VoteRequest>,
    ) -> std::result::Result<Response<VoteResponse>, Status> {
        if !self.server_is_ready() {
            warn!("[rpc|request_vote] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::ReceiveVoteRequest(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;
        let timeout_duration = Duration::from_millis(self.settings.raft.election.election_timeout_min);
        handle_rpc_timeout(resp_rx, timeout_duration, "request_vote").await
    }

    /// Processes AppendEntries RPC calls from the cluster leader
    /// # Raft Protocol Logic
    /// - Heartbeat mechanism (Section 5.2)
    /// - Log replication entry point (Section 5.3)
    /// - Term comparison logic:
    ///   - If incoming term > current term: revert to follower state
    ///   - Reset election timeout on valid leader communication
    #[cfg_attr(not(doc), autometrics(objective = API_SLO))]
    #[tracing::instrument]
    async fn append_entries(
        &self,
        request: Request<AppendEntriesRequest>,
    ) -> std::result::Result<Response<AppendEntriesResponse>, tonic::Status> {
        if !self.server_is_ready() {
            warn!("[rpc|append_entries] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::AppendEntries(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration = Duration::from_millis(self.settings.retry.election.timeout_ms);

        handle_rpc_timeout(resp_rx, timeout_duration, "append_entries").await
    }

    /// Handles cluster membership changes (joint consensus)
    /// # Raft Protocol Logic
    /// - Implements cluster configuration changes (Section 6)
    /// - Validates new configuration against current cluster state
    /// - Ensures safety during membership transitions
    #[cfg_attr(not(doc), autometrics(objective = API_SLO))]
    #[tracing::instrument]
    async fn update_cluster_conf(
        &self,
        request: tonic::Request<ClusteMembershipChangeRequest>,
    ) -> std::result::Result<Response<ClusterConfUpdateResponse>, Status> {
        if !self.server_is_ready() {
            warn!("[rpc|update_cluster_conf] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::ClusterConfUpdate(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration = Duration::from_millis(self.settings.retry.membership.timeout_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "update_cluster_conf").await
    }

    /// Processes client write requests requiring consensus
    /// # Raft Protocol Logic
    /// - Entry point for client proposals (Section 8)
    /// - Validates requests before appending to the leader's log
    /// - Ensures linearizable writes through log replication
    #[cfg_attr(not(doc), autometrics(objective = API_SLO))]
    #[tracing::instrument]
    async fn handle_client_propose(
        &self,
        request: tonic::Request<ClientProposeRequest>,
    ) -> std::result::Result<tonic::Response<ClientResponse>, Status> {
        if !self.server_is_ready() {
            warn!("[rpc|handle_client_propose] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let remote_addr = request.remote_addr();
        let event_tx = self.event_tx.clone();
        let timeout_duration = Duration::from_millis(self.settings.raft.general_raft_timeout_duration_in_ms);

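        // Build the request-handling future separately so it can be raced
        // against client cancellation in `with_cancellation_handler` below.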
        let request_future = async move {
            let req: ClientProposeRequest = request.into_inner();
            // Extract request and validate
            if req.commands.is_empty() {
                return Err(Status::invalid_argument("Commands cannot be empty"));
            }

            let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
            event_tx
                .send(RaftEvent::ClientPropose(req, resp_tx))
                .await
                .map_err(|_| Status::internal("Event channel closed"))?;

            handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_propose").await
        };

        let cancellation_future = async move {
            warn!("Request from {:?} cancelled by client", remote_addr);
            // If this future is executed it means the request future was dropped,
            // so it doesn't actually matter what is returned here
            Err::<Response<ClientResponse>, Status>(Status::cancelled("Request cancelled by client"))
        };

        with_cancellation_handler(request_future, cancellation_future).await
    }

    /// Returns current cluster membership and state metadata
    /// # Usage
    /// - Administrative API for cluster inspection
    /// - Provides snapshot of current configuration
    #[cfg_attr(not(doc), autometrics(objective = API_SLO))]
    #[tracing::instrument]
    async fn get_cluster_metadata(
        &self,
        request: tonic::Request<MetadataRequest>,
    ) -> std::result::Result<tonic::Response<ClusterMembership>, tonic::Status> {
        debug!("received get_cluster_metadata request");
        if !self.server_is_ready() {
            warn!("[rpc|get_cluster_metadata] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::ClusterConf(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration = Duration::from_millis(self.settings.raft.general_raft_timeout_duration_in_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "get_cluster_metadata").await
    }

    /// Handles client read requests with linearizability guarantees
    /// # Raft Protocol Logic
    /// - Implements lease-based leader reads (Section 6.4)
    /// - Verifies leadership before serving reads
    /// - Ensures read-after-write consistency
    #[cfg_attr(not(doc), autometrics(objective = API_SLO))]
    #[tracing::instrument]
    async fn handle_client_read(
        &self,
        request: tonic::Request<ClientReadRequest>,
    ) -> std::result::Result<tonic::Response<ClientResponse>, tonic::Status> {
        if !self.server_is_ready() {
            warn!("[rpc|handle_client_read] Node-{} is not ready!", self.node_id);
            return Err(Status::unavailable("Service is not ready"));
        }

        let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
        self.event_tx
            .send(RaftEvent::ClientReadRequest(request.into_inner(), resp_tx))
            .await
            .map_err(|_| Status::internal("Event channel closed"))?;

        let timeout_duration = Duration::from_millis(self.settings.raft.general_raft_timeout_duration_in_ms);
        handle_rpc_timeout(resp_rx, timeout_duration, "handle_client_read").await
    }
}

/// Races a client request future against client-side cancellation
/// # Functionality
/// - Runs `cancellation_future` if the request future is dropped (e.g. the client disconnects)
/// - Allows custom cleanup or monitoring code to run on cancellation
/// - Prevents resource leaks from abandoned requests
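///
/// # Example
/// A minimal usage sketch; the placeholder futures are illustrative only:
/// ```ignore
/// let response = with_cancellation_handler(
///     async { Ok(Response::new(ClientResponse::default())) },
///     async { Err(Status::cancelled("Request cancelled by client")) },
/// )
/// .await;
/// ```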
pub(crate) async fn with_cancellation_handler<FRequest, FCancellation>(
    request_future: FRequest,
    cancellation_future: FCancellation,
) -> std::result::Result<Response<ClientResponse>, Status>
where
    FRequest: Future<Output = std::result::Result<Response<ClientResponse>, Status>> + Send + 'static,
    FCancellation: Future<Output = std::result::Result<Response<ClientResponse>, Status>> + Send + 'static,
{
    let token = CancellationToken::new();
    // Will call token.cancel() when the future is dropped, such as when the client
    // cancels the request
    let _drop_guard = token.clone().drop_guard();
    let select_task = tokio::spawn(async move {
        // Can select on token cancellation on any cancellable future while handling the
        // request, allowing for custom cleanup code or monitoring
        select! {
            res = request_future => res,
            _ = token.cancelled() => cancellation_future.await,
        }
    });

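    // The `unwrap` only panics if the spawned task itself panicked (or was aborted);
    // otherwise the `JoinHandle` yields the `select!` result intact.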
    select_task.await.unwrap()
}

/// Centralized timeout handler for all RPC operations
/// # Features
/// - Uniform timeout enforcement across RPC types
/// - Detailed error categorization:
///   - Channel errors
///   - Application-level errors
///   - Deadline exceeded
/// - Logging and metrics integration
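///
/// # Example
/// Call pattern used by the handlers above (the timeout value is illustrative):
/// ```ignore
/// let (resp_tx, resp_rx) = MaybeCloneOneshot::new();
/// event_tx
///     .send(RaftEvent::ClusterConf(req, resp_tx))
///     .await
///     .map_err(|_| Status::internal("Event channel closed"))?;
/// handle_rpc_timeout(resp_rx, Duration::from_millis(300), "get_cluster_metadata").await
/// ```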
async fn handle_rpc_timeout<T, E>(
    resp_rx: impl Future<Output = Result<Result<T, Status>, E>>,
    timeout_duration: Duration,
    rpc_name: &'static str,
) -> Result<Response<T>, Status>
where
    T: std::fmt::Debug,
    E: std::fmt::Debug,
{
    debug!("grpc_raft_service::handle_rpc_timeout::{}", rpc_name);

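    // Three nested layers to unpack:
    //   outer  Result -> did the overall timeout elapse?
    //   middle Result -> did the response channel deliver a value or close?
    //   inner  Result -> did the Raft core return a success payload or an error Status?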
    match timeout(timeout_duration, resp_rx).await {
        Ok(Ok(Ok(response))) => {
            debug!("[{}] Success response: {:?}", rpc_name, &response);
            Ok(Response::new(response))
        }
        Ok(Ok(Err(status))) => {
            error!("[{}] Error status: {:?}", rpc_name, &status);
            Err(status)
        }
        Ok(Err(e)) => {
            error!("[{}] Channel error: {:?}", rpc_name, e);
            Err(Status::internal("RPC response channel closed"))
        }
        Err(_) => {
            warn!("[{}] Response timeout after {:?}", rpc_name, timeout_duration);
            Err(Status::deadline_exceeded("RPC timeout exceeded"))
        }
    }
}