d_engine_core/network/mod.rs
//! Network abstraction layer with a timeout-aware gRPC
//! implementation.
3//!
4//! This module provides network communication facilities with configurable
5//! timeout policies for distributed system operations. All network operations
6//! are governed by timeout parameters defined in [`RaftConfig`] to ensure
7//! system responsiveness.
8
9mod background_snapshot_transfer;
10
11#[cfg(test)]
12mod background_snapshot_transfer_test;
13use async_trait::async_trait;
14pub use background_snapshot_transfer::*;
15use d_engine_proto::server::cluster::ClusterConfChangeRequest;
16use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
17use d_engine_proto::server::election::VoteRequest;
18use d_engine_proto::server::election::VoteResponse;
19use d_engine_proto::server::replication::AppendEntriesRequest;
20use d_engine_proto::server::replication::AppendEntriesResponse;
21use d_engine_proto::server::storage::SnapshotChunk;
22use d_engine_proto::server::storage::SnapshotMetadata;
23use futures::stream::BoxStream;
24#[cfg(any(test, feature = "__test_support"))]
25use mockall::automock;
26use tokio::sync::mpsc;
27
28use crate::BackoffPolicy;
29use crate::NetworkError;
30use crate::Result;
31use crate::RetryPolicies;
32use crate::TypeConfig;
33
34/// Per-peer persistent bidirectional replication stream.
35///
36/// Opened once per follower at leader startup (or on reconnect). The sender
37/// pushes `AppendEntriesRequest` batches; the receiver yields
38/// `AppendEntriesResponse` ACKs. Dropping the sender closes the stream.
39pub struct ReplicationStream {
40 /// Push AppendEntries batches directly into the open h2 stream (capacity = 128).
41 pub sender: mpsc::Sender<AppendEntriesRequest>,
42 /// Receive ACKs from the follower; items are `Result<_, tonic::Status>`.
43 pub receiver: BoxStream<'static, std::result::Result<AppendEntriesResponse, tonic::Status>>,
44}
45
46// Define a structured return value
47#[derive(Debug, Clone)]
48pub struct AppendResults {
49 /// Whether a majority quorum is achieved (can be directly determined via
50 /// peer_updates)
51 pub commit_quorum_achieved: bool,
52 /// Updates to each peer's match_index and next_index
53 pub peer_updates: HashMap<u32, PeerUpdate>,
54 /// Learner log catch-up progress
55 pub learner_progress: HashMap<u32, Option<u64>>,
56}
57
/// Replication progress for a single peer after an AppendEntries exchange.
#[derive(Debug, Clone, PartialEq)]
pub struct PeerUpdate {
    /// Highest log index known to be replicated on the peer, if any.
    pub match_index: Option<u64>,
    /// Index of the next log entry to send to the peer.
    pub next_index: u64,
    /// Whether the peer's response reported success.
    pub success: bool,
}

impl PeerUpdate {
    /// Builds a successful update carrying the given `match_index` and
    /// `next_index`.
    #[allow(unused)]
    pub fn success(
        match_index: u64,
        next_index: u64,
    ) -> Self {
        Self {
            success: true,
            next_index,
            match_index: Some(match_index),
        }
    }

    /// Builds a failed update: no known `match_index`, with `next_index`
    /// reset to 1.
    #[allow(unused)]
    pub fn failed() -> Self {
        PeerUpdate {
            success: false,
            next_index: 1,
            match_index: None,
        }
    }
}
88#[derive(Debug)]
89pub struct AppendResult {
90 pub peer_ids: HashSet<u32>,
91 pub responses: Vec<Result<AppendEntriesResponse>>,
92}
93#[derive(Debug)]
94pub struct VoteResult {
95 pub peer_ids: HashSet<u32>,
96 pub responses: Vec<Result<VoteResponse>>,
97}
98#[allow(dead_code)]
99#[derive(Debug)]
100pub struct ClusterUpdateResult {
101 pub peer_ids: HashSet<u32>,
102 pub responses: Vec<Result<ClusterConfUpdateResponse>>,
103}
104
105#[cfg_attr(any(test, feature = "__test_support"), automock)]
106#[async_trait]
107pub trait Transport<T>: Send + Sync + 'static
108where
109 T: TypeConfig,
110{
111 /// Propagates cluster configuration changes to voting members using Raft's joint consensus.
112 ///
113 /// # Protocol
114 /// - Implements membership change protocol from Raft §6
115 /// - Leader-exclusive operation
116 /// - Automatically filters self-references and duplicates
117 ///
118 /// # Parameters
119 /// - `req`: Configuration change details with transition state
120 /// - `retry`: Network retry policy with exponential backoff
121 /// - `membership`: Cluster membership for channel resolution
122 ///
123 /// # Errors
124 /// - `NetworkError::EmptyPeerList` if no peers provided
125 /// - `NetworkError::TaskFailed` for background execution failures
126 /// - `ConsensusError::NotLeader` if executed by non-leader
127 ///
128 /// # Implementation
129 /// - Uses compressed gRPC streams for efficiency
130 /// - Maintains response order matching input peers
131 /// - Concurrent request processing with ordered aggregation
132 #[allow(dead_code)]
133 async fn send_cluster_update(
134 &self,
135 req: ClusterConfChangeRequest,
136 retry: &RetryPolicies,
137 membership: std::sync::Arc<crate::alias::MOF<T>>,
138 ) -> Result<ClusterUpdateResult>;
139
140 /// Replicates log entries to followers and learners.
141 ///
142 /// # Protocol
143 /// - Implements log replication from Raft §5.3
144 /// - Leader-exclusive operation
145 /// - Handles log consistency checks automatically
146 ///
147 /// # Parameters
148 /// - `requests`: Vector of (peer_id, AppendEntriesRequest)
149 /// - `retry`: Network retry configuration
150 /// - `membership`: Cluster membership for channel resolution
151 /// - `response_compress_enabled`: Enable compression for replication responses
152 ///
153 /// # Returns
154 /// - On success: `Ok(AppendResult)` containing aggregated responses
155 /// - On failure: `Err(NetworkError)` for unrecoverable errors
156 ///
157 /// ## **Error Conditions**: Top-level `Err` is returned ONLY when:
158 /// - Input `requests_with_peer_address` is empty (`NetworkError::EmptyPeerList`)
159 /// - Critical failures prevent spawning async tasks (not shown in current impl)
160 ///
161 /// # Errors
162 /// - `NetworkError::EmptyPeerList` for empty input
163 /// - `NetworkError::TaskFailed` for partial execution failures
164 ///
165 /// # Guarantees
166 /// - At-least-once delivery semantics
167 /// - Automatic deduplication of peer entries
168 /// - Non-blocking error handling
169 async fn send_append_requests(
170 &self,
171 requests: Vec<(u32, AppendEntriesRequest)>,
172 retry: &RetryPolicies,
173 membership: std::sync::Arc<crate::alias::MOF<T>>,
174 response_compress_enabled: bool,
175 ) -> Result<AppendResult>;
176
177 /// Initiates leader election by requesting votes from cluster peers.
178 ///
179 /// # Protocol
180 /// - Implements leader election from Raft §5.2
181 /// - Candidate-exclusive operation
182 /// - Validates log completeness requirements
183 ///
184 /// # Parameters
185 /// - `req`: Election metadata with candidate's term and log state
186 /// - `retry`: Election-specific retry strategy
187 /// - `membership`: Cluster membership for channel resolution
188 ///
189 /// # Errors
190 /// - `NetworkError::EmptyPeerList` for empty peer list
191 /// - `NetworkError::TaskFailed` for RPC execution failures
192 ///
193 /// # Safety
194 /// - Automatic term validation in responses
195 /// - Strict candidate state enforcement
196 /// - Non-blocking partial failure handling
197 async fn send_vote_requests(
198 &self,
199 req: VoteRequest,
200 retry: &RetryPolicies,
201 membership: std::sync::Arc<crate::alias::MOF<T>>,
202 ) -> Result<VoteResult>;
203
204 /// Orchestrates log compaction across cluster peers after snapshot creation.
205 ///
206 /// # Protocol
207 /// - Implements log truncation from Raft §7
208 /// Initiates cluster join process for a learner node
209 ///
210 /// # Protocol
211 /// - Implements cluster join protocol from Raft §6
212 /// - Learner-exclusive operation
213 /// - Requires leader connection
214 ///
215 /// # Parameters
216 /// - `leader_channel`: Pre-established gRPC channel to cluster leader
217 /// - `request`: Join request with node metadata
218 /// - `retry`: Join-specific retry configuration
219 /// - `membership`: Cluster membership for channel resolution
220 ///
221 /// # Errors
222 /// - NetworkError::JoinFailed: On unrecoverable join failure
223 /// - NetworkError::NotLeader: If contacted node isn't leader
224 ///
225 /// # Guarantees
226 /// - At-least-once delivery
227 /// - Automatic leader discovery
228 /// - Idempotent operation
229 async fn join_cluster(
230 &self,
231 leader_id: u32,
232 request: d_engine_proto::server::cluster::JoinRequest,
233 retry: BackoffPolicy,
234 membership: std::sync::Arc<crate::alias::MOF<T>>,
235 ) -> Result<d_engine_proto::server::cluster::JoinResponse>;
236
237 /// Discovers current cluster leader
238 ///
239 /// # Protocol
240 /// - Broadcast-based leader discovery
241 /// - Handles redirection to current leader
242 ///
243 /// # Parameters
244 /// - `bootstrap_endpoints`: Initial cluster endpoints
245 /// - `request`: Discovery request with node metadata
246 ///
247 /// # Errors
248 /// - `NetworkError::DiscoveryTimeout`: When no response received
249 async fn discover_leader(
250 &self,
251 request: d_engine_proto::server::cluster::LeaderDiscoveryRequest,
252 rpc_enable_compression: bool,
253 membership: std::sync::Arc<crate::alias::MOF<T>>,
254 ) -> Result<Vec<d_engine_proto::server::cluster::LeaderDiscoveryResponse>>;
255
256 /// Send a single AppendEntries request to one peer (used by ReplicationWorker).
257 ///
258 /// Non-blocking from the caller's perspective: the caller fires this and the
259 /// per-follower worker task awaits the response independently. Reuses the
260 /// existing FIFO `peer_appender_task` infrastructure internally.
261 ///
262 /// # Parameters
263 /// - `peer_id`: Target follower node ID
264 /// - `request`: AppendEntries RPC request
265 /// - `retry`: Retry / timeout configuration
266 /// - `membership`: Cluster membership for channel resolution
267 /// - `response_compress_enabled`: Enable gRPC response compression
268 ///
269 /// # Returns
270 /// `Ok(AppendEntriesResponse)` on success, `Err` on network / timeout failure
271 async fn send_append_request(
272 &self,
273 peer_id: u32,
274 request: AppendEntriesRequest,
275 retry: &RetryPolicies,
276 membership: std::sync::Arc<crate::alias::MOF<T>>,
277 response_compress_enabled: bool,
278 ) -> Result<AppendEntriesResponse>;
279
280 /// Pushes a snapshot to a lagging peer (called by per-follower ReplicationWorker).
281 ///
282 /// Used when a peer's `next_index` falls below the leader's purge boundary and
283 /// AppendEntries would carry a stale `prev_log_term = 0`, causing a perpetual
284 /// conflict loop. The worker calls this and awaits completion before emitting
285 /// `RoleEvent::SnapshotPushCompleted`.
286 ///
287 /// # Parameters
288 /// - `peer_id`: Target follower node ID
289 /// - `metadata`: Snapshot metadata (term, index, size)
290 /// - `state_machine_handler`: Used to load the snapshot data stream
291 /// - `membership`: Cluster membership for bulk-channel resolution
292 /// - `config`: Snapshot transfer configuration (chunk size, timeout, etc.)
293 ///
294 /// # Returns
295 /// `Ok(())` on successful transfer, `Err` on network or serialization failure.
296 async fn send_snapshot(
297 &self,
298 peer_id: u32,
299 metadata: SnapshotMetadata,
300 state_machine_handler: std::sync::Arc<crate::alias::SMHOF<T>>,
301 membership: std::sync::Arc<crate::alias::MOF<T>>,
302 config: crate::SnapshotConfig,
303 ) -> Result<()>;
304
305 /// Requests and streams a snapshot from the current leader.
306 ///
307 /// # Parameters
308 /// - `leader_id`: Current leader node ID
309 /// - `retry`: Retry configuration (currently unused in implementation)
310 /// - `membership`: Cluster membership for channel resolution
311 ///
312 /// # Returns
313 /// Streaming of snapshot chunks from the leader
314 ///
315 /// # Errors
316 /// Returns `NetworkError` if:
317 /// - Connection to leader fails
318 /// - gRPC call fails
319 async fn request_snapshot_from_leader(
320 &self,
321 leader_id: u32,
322 ack_tx: tokio::sync::mpsc::Receiver<d_engine_proto::server::storage::SnapshotAck>,
323 retry: &crate::InstallSnapshotBackoffPolicy,
324 membership: std::sync::Arc<crate::alias::MOF<T>>,
325 ) -> Result<Box<tonic::Streaming<SnapshotChunk>>>;
326
327 /// Opens a persistent bidirectional AppendEntries stream to the given peer.
328 ///
329 /// Called once per follower when the leader becomes active (or on reconnect).
330 /// The returned [`ReplicationStream`] contains:
331 /// - `sender`: push batches into the open h2 stream (non-blocking, capacity 128)
332 /// - `receiver`: stream of ACKs from the follower
333 ///
334 /// When the stream breaks (network error, peer restart), the receiver yields
335 /// an `Err(tonic::Status)` and the caller should emit
336 /// `RoleEvent::PeerStreamError { peer_id }` so the Raft loop can reset
337 /// `next_index` and schedule reconnection.
338 ///
339 /// # Errors
340 /// Returns `NetworkError` if the initial stream handshake fails.
341 async fn open_replication_stream(
342 &self,
343 peer_id: u32,
344 membership: std::sync::Arc<crate::alias::MOF<T>>,
345 compress: bool,
346 ) -> Result<ReplicationStream>;
347}
348
349// Module level utils
350// -----------------------------------------------------------------------------
351use std::collections::HashMap;
352use std::collections::HashSet;
353use std::time::Duration;
354
355use tokio::time::sleep;
356use tokio::time::timeout;
357use tonic::Code;
358use tracing::debug;
359use tracing::warn;
360
361use crate::Error;
362
363/// As soon as task has return we should return from this function
364pub async fn grpc_task_with_timeout_and_exponential_backoff<F, T, U>(
365 task_name: &'static str,
366 mut task: F,
367 policy: BackoffPolicy,
368) -> std::result::Result<tonic::Response<U>, Error>
369where
370 F: FnMut() -> T,
371 T: std::future::Future<Output = std::result::Result<tonic::Response<U>, tonic::Status>>
372 + Send
373 + 'static,
374{
375 // let max_retries = 5;
376 let mut retries = 0;
377 let mut current_delay = Duration::from_millis(policy.base_delay_ms);
378 let timeout_duration = Duration::from_millis(policy.timeout_ms);
379 let max_delay = Duration::from_millis(policy.max_delay_ms);
380 let max_retries = policy.max_retries;
381
382 let mut last_error =
383 NetworkError::TaskBackoffFailed("Task failed after max retries".to_string());
384 while retries < max_retries {
385 debug!("[{task_name}] Attempt {} of {}", retries + 1, max_retries);
386 match timeout(timeout_duration, task()).await {
387 Ok(Ok(r)) => {
388 return Ok(r); // Exit on success
389 }
390 Ok(Err(status)) => {
391 last_error = match status.code() {
392 Code::Unavailable => {
393 warn!("[{task_name}] Service unavailable: {}", status.message());
394 NetworkError::ServiceUnavailable(format!(
395 "Service unavailable: {}",
396 status.message()
397 ))
398 }
399 _ => {
400 warn!("[{task_name}] RPC error: {}", status);
401 NetworkError::TonicStatusError(Box::new(status))
402 }
403 };
404 }
405 Err(_e) => {
406 warn!("[{task_name}] Task timed out after {:?}", timeout_duration);
407 last_error = NetworkError::RetryTimeoutError(timeout_duration);
408 }
409 };
410
411 if retries < max_retries - 1 {
412 debug!("[{task_name}] Retrying in {:?}...", current_delay);
413 sleep(current_delay).await;
414
415 // Exponential backoff (double the delay each time)
416 current_delay = (current_delay * 2).min(max_delay);
417 } else {
418 warn!("[{task_name}] Task failed after {} retries", retries);
419 //bug: no need to return if the it is not a business logic error
420 // return Err(Error::RetryTaskFailed("Task failed after max
421 // retries".to_string())); // Return the last error after max
422 // retries
423 }
424 retries += 1;
425 }
426 warn!("[{task_name}] Task failed after {} retries", max_retries);
427 Err(last_error.into()) // Fallback error message if no task returns Ok
428}