d_engine_core/network/mod.rs
//! Network abstraction layer with a timeout-aware gRPC implementation.
//!
//! This module provides network communication facilities with configurable
//! timeout policies for distributed-system operations. All network operations
//! are governed by timeout parameters defined in [`RaftConfig`] to keep the
//! system responsive.

mod background_snapshot_transfer;

#[cfg(test)]
mod background_snapshot_transfer_test;
pub(crate) use background_snapshot_transfer::*;
use d_engine_proto::server::cluster::ClusterConfChangeRequest;
use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
use d_engine_proto::server::election::VoteRequest;
use d_engine_proto::server::election::VoteResponse;
use d_engine_proto::server::replication::AppendEntriesRequest;
use d_engine_proto::server::replication::AppendEntriesResponse;
use d_engine_proto::server::storage::PurgeLogRequest;
use d_engine_proto::server::storage::PurgeLogResponse;
use d_engine_proto::server::storage::SnapshotChunk;
#[cfg(any(test, feature = "test-utils"))]
use mockall::automock;
use tonic::async_trait;

use crate::BackoffPolicy;
use crate::NetworkError;
use crate::Result;
use crate::RetryPolicies;
use crate::TypeConfig;

// Structured return types for broadcast network operations
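/// Aggregated result of replicating entries to followers and learners.
///
/// A minimal sketch (illustrative only; assumes the leader counts itself and
/// the caller knows the voting-member count) of deriving quorum from
/// `peer_updates`:
///
/// ```ignore
/// let acks = 1 + results.peer_updates.values().filter(|u| u.success).count();
/// let commit_quorum_achieved = acks * 2 > voting_members;
/// ```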
#[derive(Debug, Clone)]
pub struct AppendResults {
    /// Whether a majority quorum was achieved (derivable directly from
    /// `peer_updates`)
    pub commit_quorum_achieved: bool,
    /// Updates to each peer's match_index and next_index
    pub peer_updates: HashMap<u32, PeerUpdate>,
    /// Learner log catch-up progress
    pub learner_progress: HashMap<u32, Option<u64>>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct PeerUpdate {
    pub match_index: Option<u64>,
    pub next_index: u64,
    /// Whether the peer responded successfully
    pub success: bool,
}

impl PeerUpdate {
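    /// Builds the update recorded for a peer that acknowledged an append.
    ///
    /// Illustrative usage (index values are placeholders):
    ///
    /// ```ignore
    /// let update = PeerUpdate::success(42, 43); // matched index 42, replicate from 43 next
    /// assert!(update.success);
    /// ```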
    #[allow(unused)]
    pub fn success(
        match_index: u64,
        next_index: u64,
    ) -> Self {
        PeerUpdate {
            match_index: Some(match_index),
            next_index,
            success: true,
        }
    }

    #[allow(unused)]
    pub fn failed() -> Self {
        Self {
            match_index: None,
            next_index: 1,
            success: false,
        }
    }
}

#[derive(Debug)]
pub struct AppendResult {
    pub peer_ids: HashSet<u32>,
    pub responses: Vec<Result<AppendEntriesResponse>>,
}

#[derive(Debug)]
pub struct VoteResult {
    pub peer_ids: HashSet<u32>,
    pub responses: Vec<Result<VoteResponse>>,
}

#[allow(dead_code)]
#[derive(Debug)]
pub struct ClusterUpdateResult {
    pub peer_ids: HashSet<u32>,
    pub responses: Vec<Result<ClusterConfUpdateResponse>>,
}

#[cfg_attr(any(test, feature = "test-utils"), automock)]
#[async_trait]
pub trait Transport<T>: Send + Sync + 'static
where
    T: TypeConfig,
{
    /// Propagates cluster configuration changes to voting members using Raft's joint consensus.
    ///
    /// # Protocol
    /// - Implements membership change protocol from Raft §6
    /// - Leader-exclusive operation
    /// - Automatically filters self-references and duplicates
    ///
    /// # Parameters
    /// - `req`: Configuration change details with transition state
    /// - `retry`: Network retry policy with exponential backoff
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Errors
    /// - `NetworkError::EmptyPeerList` if no peers provided
    /// - `NetworkError::TaskFailed` for background execution failures
    /// - `ConsensusError::NotLeader` if executed by non-leader
    ///
    /// # Implementation
    /// - Uses compressed gRPC streams for efficiency
    /// - Maintains response order matching input peers
    /// - Concurrent request processing with ordered aggregation
    #[allow(dead_code)]
    async fn send_cluster_update(
        &self,
        req: ClusterConfChangeRequest,
        retry: &RetryPolicies,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<ClusterUpdateResult>;

    /// Replicates log entries to followers and learners.
    ///
    /// # Protocol
    /// - Implements log replication from Raft §5.3
    /// - Leader-exclusive operation
    /// - Handles log consistency checks automatically
    ///
    /// # Parameters
    /// - `requests`: Vector of `(peer_id, AppendEntriesRequest)` pairs
    /// - `retry`: Network retry configuration
    /// - `membership`: Cluster membership for channel resolution
    /// - `response_compress_enabled`: Enable compression for replication responses
    ///
    /// # Returns
    /// - On success: `Ok(AppendResult)` containing aggregated responses
    /// - On failure: `Err(NetworkError)` for unrecoverable errors
    ///
    /// # Errors
    /// A top-level `Err` is returned only when:
    /// - `requests` is empty (`NetworkError::EmptyPeerList`)
    /// - Background task execution fails (`NetworkError::TaskFailed`)
    ///
    /// Individual peer failures are reported per entry in the returned
    /// `AppendResult` rather than as a top-level error.
    ///
    /// # Guarantees
    /// - At-least-once delivery semantics
    /// - Automatic deduplication of peer entries
    /// - Non-blocking error handling
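    ///
    /// # Example
    ///
    /// A minimal sketch, assuming some `transport: impl Transport<T>`, prepared
    /// per-peer requests, and a `retry_policies`/`membership` already in scope
    /// (all names here are illustrative, not part of this API):
    ///
    /// ```ignore
    /// let requests = vec![(2, req_for_peer_2), (3, req_for_peer_3)];
    /// let result = transport
    ///     .send_append_requests(requests, &retry_policies, membership.clone(), true)
    ///     .await?;
    /// let acked = result.responses.iter().filter(|r| r.is_ok()).count();
    /// ```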
    async fn send_append_requests(
        &self,
        requests: Vec<(u32, AppendEntriesRequest)>,
        retry: &RetryPolicies,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
        response_compress_enabled: bool,
    ) -> Result<AppendResult>;

    /// Initiates leader election by requesting votes from cluster peers.
    ///
    /// # Protocol
    /// - Implements leader election from Raft §5.2
    /// - Candidate-exclusive operation
    /// - Validates log completeness requirements
    ///
    /// # Parameters
    /// - `req`: Election metadata with candidate's term and log state
    /// - `retry`: Election-specific retry strategy
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Errors
    /// - `NetworkError::EmptyPeerList` for empty peer list
    /// - `NetworkError::TaskFailed` for RPC execution failures
    ///
    /// # Safety
    /// - Automatic term validation in responses
    /// - Strict candidate state enforcement
    /// - Non-blocking partial failure handling
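    ///
    /// # Example
    ///
    /// A minimal sketch of tallying a vote round; the `vote_granted` flag on
    /// `VoteResponse` is assumed here purely for illustration:
    ///
    /// ```ignore
    /// let result = transport.send_vote_requests(req, &retry_policies, membership.clone()).await?;
    /// let granted = result
    ///     .responses
    ///     .iter()
    ///     .filter(|r| matches!(r, Ok(resp) if resp.vote_granted))
    ///     .count();
    /// ```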
    async fn send_vote_requests(
        &self,
        req: VoteRequest,
        retry: &RetryPolicies,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<VoteResult>;

    /// Orchestrates log compaction across cluster peers after snapshot creation.
    ///
    /// # Protocol
    /// - Implements log truncation from Raft §7
    /// - Leader-exclusive operation
    /// - Requires valid snapshot checksum
    ///
    /// # Parameters
    /// - `req`: Snapshot metadata with truncation index
    /// - `retry`: Purge-specific retry configuration
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Errors
    /// - `NetworkError::EmptyPeerList` for empty peer list
    /// - `NetworkError::TaskFailed` for background execution errors
    ///
    /// # Guarantees
    /// - At-least-once delivery
    /// - Automatic progress tracking
    /// - Crash-safe persistence requirements
    async fn send_purge_requests(
        &self,
        req: PurgeLogRequest,
        retry: &RetryPolicies,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<Vec<Result<PurgeLogResponse>>>;

    /// Initiates the cluster join process for a learner node.
    ///
    /// # Protocol
    /// - Implements cluster join protocol from Raft §6
    /// - Learner-exclusive operation
    /// - Requires leader connection
    ///
    /// # Parameters
    /// - `leader_id`: ID of the current cluster leader
    /// - `request`: Join request with node metadata
    /// - `retry`: Join-specific retry configuration
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Errors
    /// - `NetworkError::JoinFailed`: On unrecoverable join failure
    /// - `NetworkError::NotLeader`: If the contacted node isn't the leader
    ///
    /// # Guarantees
    /// - At-least-once delivery
    /// - Automatic leader discovery
    /// - Idempotent operation
    async fn join_cluster(
        &self,
        leader_id: u32,
        request: d_engine_proto::server::cluster::JoinRequest,
        retry: BackoffPolicy,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<d_engine_proto::server::cluster::JoinResponse>;

    /// Discovers the current cluster leader.
    ///
    /// # Protocol
    /// - Broadcast-based leader discovery
    /// - Handles redirection to current leader
    ///
    /// # Parameters
    /// - `request`: Discovery request with node metadata
    /// - `rpc_enable_compression`: Enable compression for discovery RPCs
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Errors
    /// - `NetworkError::DiscoveryTimeout`: When no response is received
    async fn discover_leader(
        &self,
        request: d_engine_proto::server::cluster::LeaderDiscoveryRequest,
        rpc_enable_compression: bool,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<Vec<d_engine_proto::server::cluster::LeaderDiscoveryResponse>>;

    /// Requests and streams a snapshot from the current leader.
    ///
    /// # Parameters
    /// - `leader_id`: Current leader node ID
    /// - `ack_tx`: Receiving end of the snapshot acknowledgement channel
    /// - `retry`: Retry configuration (currently unused in the implementation)
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Returns
    /// A stream of snapshot chunks from the leader
    ///
    /// # Errors
    /// Returns `NetworkError` if:
    /// - Connection to the leader fails
    /// - The gRPC call fails
    async fn request_snapshot_from_leader(
        &self,
        leader_id: u32,
        ack_tx: tokio::sync::mpsc::Receiver<d_engine_proto::server::storage::SnapshotAck>,
        retry: &crate::InstallSnapshotBackoffPolicy,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<Box<tonic::Streaming<SnapshotChunk>>>;
}
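
// Test-side note: the `automock` attribute above generates `MockTransport<T>`,
// whose expectations can stub these RPCs in unit tests. A rough sketch
// (`MyTypeConfig` is a placeholder `TypeConfig` implementation):
//
// let mut transport = MockTransport::<MyTypeConfig>::new();
// transport
//     .expect_send_vote_requests()
//     .returning(|_, _, _| Ok(VoteResult { peer_ids: HashSet::new(), responses: vec![] }));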

// Module level utils
// -----------------------------------------------------------------------------
use std::collections::HashMap;
use std::collections::HashSet;
use std::time::Duration;

use tokio::time::sleep;
use tokio::time::timeout;
use tonic::Code;
use tracing::debug;
use tracing::warn;

use crate::Error;

/// Runs `task` with a per-attempt timeout and exponential backoff between
/// retries, returning as soon as an attempt succeeds.
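///
/// # Example
///
/// A minimal sketch; `raft_client`, `request_vote`, and `vote_request` stand in
/// for a tonic-generated client, its RPC, and a prepared request, and
/// `BackoffPolicy` is assumed to expose exactly the millisecond fields read
/// below:
///
/// ```ignore
/// let policy = BackoffPolicy { base_delay_ms: 50, max_delay_ms: 1_000, timeout_ms: 200, max_retries: 3 };
/// let response = grpc_task_with_timeout_and_exponential_backoff(
///     "request_vote",
///     move || {
///         let mut client = raft_client.clone();
///         let req = vote_request.clone();
///         async move { client.request_vote(req).await }
///     },
///     policy,
/// )
/// .await?;
/// ```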
pub async fn grpc_task_with_timeout_and_exponential_backoff<F, T, U>(
    task_name: &'static str,
    mut task: F,
    policy: BackoffPolicy,
) -> std::result::Result<tonic::Response<U>, Error>
where
    F: FnMut() -> T,
    T: std::future::Future<Output = std::result::Result<tonic::Response<U>, tonic::Status>>
        + Send
        + 'static,
{
    let mut retries = 0;
    let mut current_delay = Duration::from_millis(policy.base_delay_ms);
    let timeout_duration = Duration::from_millis(policy.timeout_ms);
    let max_delay = Duration::from_millis(policy.max_delay_ms);
    let max_retries = policy.max_retries;

    let mut last_error =
        NetworkError::TaskBackoffFailed("Task failed after max retries".to_string());
    while retries < max_retries {
        debug!("[{task_name}] Attempt {} of {}", retries + 1, max_retries);
        match timeout(timeout_duration, task()).await {
            Ok(Ok(r)) => {
                return Ok(r); // Exit on success
            }
            Ok(Err(status)) => {
                last_error = match status.code() {
                    Code::Unavailable => {
                        warn!("[{task_name}] Service unavailable: {}", status.message());
                        NetworkError::ServiceUnavailable(format!(
                            "Service unavailable: {}",
                            status.message()
                        ))
                    }
                    _ => {
                        warn!("[{task_name}] RPC error: {}", status);
                        NetworkError::TonicStatusError(Box::new(status))
                    }
                };
            }
            Err(_e) => {
                warn!("[{task_name}] Task timed out after {:?}", timeout_duration);
                last_error = NetworkError::RetryTimeoutError(timeout_duration);
            }
        };

        if retries < max_retries - 1 {
            debug!("[{task_name}] Retrying in {:?}...", current_delay);
            sleep(current_delay).await;

            // Exponential backoff (double the delay each time)
            current_delay = (current_delay * 2).min(max_delay);
        } else {
            warn!("[{task_name}] Attempt {} failed; no retries remaining", retries + 1);
            // Intentionally no early return here: only the final,
            // retries-exhausted error below is surfaced to the caller.
        }
        retries += 1;
    }
    warn!("[{task_name}] Task failed after {} retries", max_retries);
    Err(last_error.into()) // All attempts exhausted; return the last observed error
}