// d_engine_core/network/mod.rs
1//! This module is the network abstraction layer with timeout-aware gRPC
2//! implementation
3//!
4//! This module provides network communication facilities with configurable
5//! timeout policies for distributed system operations. All network operations
6//! are governed by timeout parameters defined in [`RaftConfig`] to ensure
7//! system responsiveness.
8
9mod background_snapshot_transfer;
10
11#[cfg(test)]
12mod background_snapshot_transfer_test;
13pub(crate) use background_snapshot_transfer::*;
14use d_engine_proto::server::cluster::ClusterConfChangeRequest;
15use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
16use d_engine_proto::server::election::VoteRequest;
17use d_engine_proto::server::election::VoteResponse;
18use d_engine_proto::server::replication::AppendEntriesRequest;
19use d_engine_proto::server::replication::AppendEntriesResponse;
20use d_engine_proto::server::storage::SnapshotChunk;
21#[cfg(any(test, feature = "__test_support"))]
22use mockall::automock;
23use tonic::async_trait;
24
25use crate::BackoffPolicy;
26use crate::NetworkError;
27use crate::Result;
28use crate::RetryPolicies;
29use crate::TypeConfig;
30
// Structured result of one round of AppendEntries replication, aggregated
// across all peers the leader contacted.
#[derive(Debug, Clone)]
pub struct AppendResults {
    /// Whether a majority quorum is achieved (can be directly determined via
    /// `peer_updates`)
    pub commit_quorum_achieved: bool,
    /// Updates to each peer's match_index and next_index, keyed by peer ID
    pub peer_updates: HashMap<u32, PeerUpdate>,
    /// Learner log catch-up progress, keyed by learner ID; `None` presumably
    /// means no progress was reported — TODO confirm against the producer
    pub learner_progress: HashMap<u32, Option<u64>>,
}
42
/// Replication progress for a single peer after an AppendEntries exchange.
#[derive(Debug, Clone, PartialEq)]
pub struct PeerUpdate {
    pub match_index: Option<u64>,
    pub next_index: u64,
    /// if peer response success
    pub success: bool,
}

impl PeerUpdate {
    /// Builds the update recorded when a peer acknowledged the append:
    /// its match index is known and replication advanced.
    #[allow(unused)]
    pub fn success(
        match_index: u64,
        next_index: u64,
    ) -> Self {
        Self {
            success: true,
            next_index,
            match_index: Some(match_index),
        }
    }

    /// Builds the update recorded when a peer rejected the append: no match
    /// index is known and `next_index` falls back to 1.
    #[allow(unused)]
    pub fn failed() -> Self {
        PeerUpdate {
            success: false,
            next_index: 1,
            match_index: None,
        }
    }
}
/// Raw per-peer responses collected from one batch of AppendEntries RPCs.
#[derive(Debug)]
pub struct AppendResult {
    /// IDs of the peers the requests were dispatched to.
    pub peer_ids: HashSet<u32>,
    /// One outcome per dispatched request; may mix successes and errors.
    pub responses: Vec<Result<AppendEntriesResponse>>,
}
/// Raw per-peer responses collected from one round of RequestVote RPCs.
#[derive(Debug)]
pub struct VoteResult {
    /// IDs of the peers the vote requests were dispatched to.
    pub peer_ids: HashSet<u32>,
    /// One outcome per dispatched request; may mix successes and errors.
    pub responses: Vec<Result<VoteResponse>>,
}
/// Raw per-peer responses collected from a cluster-configuration-change
/// broadcast.
#[allow(dead_code)]
#[derive(Debug)]
pub struct ClusterUpdateResult {
    /// IDs of the peers the configuration change was dispatched to.
    pub peer_ids: HashSet<u32>,
    /// One outcome per dispatched request; may mix successes and errors.
    pub responses: Vec<Result<ClusterConfUpdateResponse>>,
}
89
/// Network transport abstraction for all Raft RPCs (replication, election,
/// membership, leader discovery, snapshot streaming).
///
/// Mocked via `mockall` in tests and when the `__test_support` feature is on.
#[cfg_attr(any(test, feature = "__test_support"), automock)]
#[async_trait]
pub trait Transport<T>: Send + Sync + 'static
where
    T: TypeConfig,
{
    /// Propagates cluster configuration changes to voting members using Raft's joint consensus.
    ///
    /// # Protocol
    /// - Implements membership change protocol from Raft §6
    /// - Leader-exclusive operation
    /// - Automatically filters self-references and duplicates
    ///
    /// # Parameters
    /// - `req`: Configuration change details with transition state
    /// - `retry`: Network retry policy with exponential backoff
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Errors
    /// - `NetworkError::EmptyPeerList` if no peers provided
    /// - `NetworkError::TaskFailed` for background execution failures
    /// - `ConsensusError::NotLeader` if executed by non-leader
    ///
    /// # Implementation
    /// - Uses compressed gRPC streams for efficiency
    /// - Maintains response order matching input peers
    /// - Concurrent request processing with ordered aggregation
    #[allow(dead_code)]
    async fn send_cluster_update(
        &self,
        req: ClusterConfChangeRequest,
        retry: &RetryPolicies,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<ClusterUpdateResult>;

    /// Replicates log entries to followers and learners.
    ///
    /// # Protocol
    /// - Implements log replication from Raft §5.3
    /// - Leader-exclusive operation
    /// - Handles log consistency checks automatically
    ///
    /// # Parameters
    /// - `requests`: Vector of (peer_id, AppendEntriesRequest)
    /// - `retry`: Network retry configuration
    /// - `membership`: Cluster membership for channel resolution
    /// - `response_compress_enabled`: Enable compression for replication responses
    ///
    /// # Returns
    /// - On success: `Ok(AppendResult)` containing aggregated responses
    /// - On failure: `Err(NetworkError)` for unrecoverable errors
    ///
    /// ## **Error Conditions**: Top-level `Err` is returned ONLY when:
    /// - Input `requests` is empty (`NetworkError::EmptyPeerList`)
    /// - Critical failures prevent spawning async tasks (not shown in current impl)
    ///
    /// # Errors
    /// - `NetworkError::EmptyPeerList` for empty input
    /// - `NetworkError::TaskFailed` for partial execution failures
    ///
    /// # Guarantees
    /// - At-least-once delivery semantics
    /// - Automatic deduplication of peer entries
    /// - Non-blocking error handling
    async fn send_append_requests(
        &self,
        requests: Vec<(u32, AppendEntriesRequest)>,
        retry: &RetryPolicies,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
        response_compress_enabled: bool,
    ) -> Result<AppendResult>;

    /// Initiates leader election by requesting votes from cluster peers.
    ///
    /// # Protocol
    /// - Implements leader election from Raft §5.2
    /// - Candidate-exclusive operation
    /// - Validates log completeness requirements
    ///
    /// # Parameters
    /// - `req`: Election metadata with candidate's term and log state
    /// - `retry`: Election-specific retry strategy
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Errors
    /// - `NetworkError::EmptyPeerList` for empty peer list
    /// - `NetworkError::TaskFailed` for RPC execution failures
    ///
    /// # Safety
    /// - Automatic term validation in responses
    /// - Strict candidate state enforcement
    /// - Non-blocking partial failure handling
    async fn send_vote_requests(
        &self,
        req: VoteRequest,
        retry: &RetryPolicies,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<VoteResult>;

    /// Initiates cluster join process for a learner node.
    ///
    /// # Protocol
    /// - Implements cluster join protocol from Raft §6
    /// - Learner-exclusive operation
    /// - Requires leader connection
    ///
    /// # Parameters
    /// - `leader_id`: Node ID of the cluster leader to contact
    /// - `request`: Join request with node metadata
    /// - `retry`: Join-specific retry configuration
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Errors
    /// - NetworkError::JoinFailed: On unrecoverable join failure
    /// - NetworkError::NotLeader: If contacted node isn't leader
    ///
    /// # Guarantees
    /// - At-least-once delivery
    /// - Automatic leader discovery
    /// - Idempotent operation
    async fn join_cluster(
        &self,
        leader_id: u32,
        request: d_engine_proto::server::cluster::JoinRequest,
        retry: BackoffPolicy,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<d_engine_proto::server::cluster::JoinResponse>;

    /// Discovers current cluster leader
    ///
    /// # Protocol
    /// - Broadcast-based leader discovery
    /// - Handles redirection to current leader
    ///
    /// # Parameters
    /// - `request`: Discovery request with node metadata
    /// - `rpc_enable_compression`: Enable compression on the discovery RPCs
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Errors
    /// - `NetworkError::DiscoveryTimeout`: When no response received
    async fn discover_leader(
        &self,
        request: d_engine_proto::server::cluster::LeaderDiscoveryRequest,
        rpc_enable_compression: bool,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<Vec<d_engine_proto::server::cluster::LeaderDiscoveryResponse>>;

    /// Requests and streams a snapshot from the current leader.
    ///
    /// # Parameters
    /// - `leader_id`: Current leader node ID
    /// - `ack_tx`: Receiver of snapshot acks produced by the local installer.
    ///   NOTE(review): the name suggests a sender but the type is an mpsc
    ///   `Receiver` — presumably the receiving half of the ack channel;
    ///   consider renaming to `ack_rx` (verify against implementations).
    /// - `retry`: Retry configuration (currently unused in implementation)
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Returns
    /// Streaming of snapshot chunks from the leader
    ///
    /// # Errors
    /// Returns `NetworkError` if:
    /// - Connection to leader fails
    /// - gRPC call fails
    async fn request_snapshot_from_leader(
        &self,
        leader_id: u32,
        ack_tx: tokio::sync::mpsc::Receiver<d_engine_proto::server::storage::SnapshotAck>,
        retry: &crate::InstallSnapshotBackoffPolicy,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<Box<tonic::Streaming<SnapshotChunk>>>;
}
263
264// Module level utils
265// -----------------------------------------------------------------------------
266use std::collections::HashMap;
267use std::collections::HashSet;
268use std::time::Duration;
269
270use tokio::time::sleep;
271use tokio::time::timeout;
272use tonic::Code;
273use tracing::debug;
274use tracing::warn;
275
276use crate::Error;
277
278/// As soon as task has return we should return from this function
279pub async fn grpc_task_with_timeout_and_exponential_backoff<F, T, U>(
280 task_name: &'static str,
281 mut task: F,
282 policy: BackoffPolicy,
283) -> std::result::Result<tonic::Response<U>, Error>
284where
285 F: FnMut() -> T,
286 T: std::future::Future<Output = std::result::Result<tonic::Response<U>, tonic::Status>>
287 + Send
288 + 'static,
289{
290 // let max_retries = 5;
291 let mut retries = 0;
292 let mut current_delay = Duration::from_millis(policy.base_delay_ms);
293 let timeout_duration = Duration::from_millis(policy.timeout_ms);
294 let max_delay = Duration::from_millis(policy.max_delay_ms);
295 let max_retries = policy.max_retries;
296
297 let mut last_error =
298 NetworkError::TaskBackoffFailed("Task failed after max retries".to_string());
299 while retries < max_retries {
300 debug!("[{task_name}] Attempt {} of {}", retries + 1, max_retries);
301 match timeout(timeout_duration, task()).await {
302 Ok(Ok(r)) => {
303 return Ok(r); // Exit on success
304 }
305 Ok(Err(status)) => {
306 last_error = match status.code() {
307 Code::Unavailable => {
308 warn!("[{task_name}] Service unavailable: {}", status.message());
309 NetworkError::ServiceUnavailable(format!(
310 "Service unavailable: {}",
311 status.message()
312 ))
313 }
314 _ => {
315 warn!("[{task_name}] RPC error: {}", status);
316 NetworkError::TonicStatusError(Box::new(status))
317 }
318 };
319 }
320 Err(_e) => {
321 warn!("[{task_name}] Task timed out after {:?}", timeout_duration);
322 last_error = NetworkError::RetryTimeoutError(timeout_duration);
323 }
324 };
325
326 if retries < max_retries - 1 {
327 debug!("[{task_name}] Retrying in {:?}...", current_delay);
328 sleep(current_delay).await;
329
330 // Exponential backoff (double the delay each time)
331 current_delay = (current_delay * 2).min(max_delay);
332 } else {
333 warn!("[{task_name}] Task failed after {} retries", retries);
334 //bug: no need to return if the it is not a business logic error
335 // return Err(Error::RetryTaskFailed("Task failed after max
336 // retries".to_string())); // Return the last error after max
337 // retries
338 }
339 retries += 1;
340 }
341 warn!("[{task_name}] Task failed after {} retries", max_retries);
342 Err(last_error.into()) // Fallback error message if no task returns Ok
343}