Skip to main content

d_engine_core/network/
mod.rs

1//! This module is the network abstraction layer with timeout-aware gRPC
2//! implementation
3//!
4//! This module provides network communication facilities with configurable
5//! timeout policies for distributed system operations. All network operations
6//! are governed by timeout parameters defined in [`RaftConfig`] to ensure
7//! system responsiveness.
8
9mod background_snapshot_transfer;
10
11#[cfg(test)]
12mod background_snapshot_transfer_test;
13use async_trait::async_trait;
14pub use background_snapshot_transfer::*;
15use d_engine_proto::server::cluster::ClusterConfChangeRequest;
16use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
17use d_engine_proto::server::election::VoteRequest;
18use d_engine_proto::server::election::VoteResponse;
19use d_engine_proto::server::replication::AppendEntriesRequest;
20use d_engine_proto::server::replication::AppendEntriesResponse;
21use d_engine_proto::server::storage::SnapshotChunk;
22use d_engine_proto::server::storage::SnapshotMetadata;
23use futures::stream::BoxStream;
24#[cfg(any(test, feature = "__test_support"))]
25use mockall::automock;
26use tokio::sync::mpsc;
27
28use crate::BackoffPolicy;
29use crate::NetworkError;
30use crate::Result;
31use crate::RetryPolicies;
32use crate::TypeConfig;
33
34/// Per-peer persistent bidirectional replication stream.
35///
36/// Opened once per follower at leader startup (or on reconnect). The sender
37/// pushes `AppendEntriesRequest` batches; the receiver yields
38/// `AppendEntriesResponse` ACKs. Dropping the sender closes the stream.
39pub struct ReplicationStream {
40    /// Push AppendEntries batches directly into the open h2 stream (capacity = 128).
41    pub sender: mpsc::Sender<AppendEntriesRequest>,
42    /// Receive ACKs from the follower; items are `Result<_, tonic::Status>`.
43    pub receiver: BoxStream<'static, std::result::Result<AppendEntriesResponse, tonic::Status>>,
44}
45
46// Define a structured return value
47#[derive(Debug, Clone)]
48pub struct AppendResults {
49    /// Whether a majority quorum is achieved (can be directly determined via
50    /// peer_updates)
51    pub commit_quorum_achieved: bool,
52    /// Updates to each peer's match_index and next_index
53    pub peer_updates: HashMap<u32, PeerUpdate>,
54    /// Learner log catch-up progress
55    pub learner_progress: HashMap<u32, Option<u64>>,
56}
57
/// Replication-progress change for a single peer after an AppendEntries round.
#[derive(Clone, Debug, PartialEq)]
pub struct PeerUpdate {
    /// Raft `matchIndex` for the peer; `None` when it is not known.
    pub match_index: Option<u64>,
    /// Raft `nextIndex` for the peer (next log index to send).
    pub next_index: u64,
    /// Whether the peer answered successfully.
    pub success: bool,
}

impl PeerUpdate {
    /// Builds the update for a peer that accepted the request.
    #[allow(unused)]
    pub fn success(
        match_index: u64,
        next_index: u64,
    ) -> Self {
        Self {
            success: true,
            next_index,
            match_index: Some(match_index),
        }
    }

    /// Builds the update for a peer whose request failed: no known
    /// `match_index` and `next_index` set to 1.
    #[allow(unused)]
    pub fn failed() -> Self {
        let match_index = None;
        Self {
            success: false,
            next_index: 1,
            match_index,
        }
    }
}
88#[derive(Debug)]
89pub struct AppendResult {
90    pub peer_ids: HashSet<u32>,
91    pub responses: Vec<Result<AppendEntriesResponse>>,
92}
93#[derive(Debug)]
94pub struct VoteResult {
95    pub peer_ids: HashSet<u32>,
96    pub responses: Vec<Result<VoteResponse>>,
97}
98#[allow(dead_code)]
99#[derive(Debug)]
100pub struct ClusterUpdateResult {
101    pub peer_ids: HashSet<u32>,
102    pub responses: Vec<Result<ClusterConfUpdateResponse>>,
103}
104
105#[cfg_attr(any(test, feature = "__test_support"), automock)]
106#[async_trait]
107pub trait Transport<T>: Send + Sync + 'static
108where
109    T: TypeConfig,
110{
111    /// Propagates cluster configuration changes to voting members using Raft's joint consensus.
112    ///
113    /// # Protocol
114    /// - Implements membership change protocol from Raft §6
115    /// - Leader-exclusive operation
116    /// - Automatically filters self-references and duplicates
117    ///
118    /// # Parameters
119    /// - `req`: Configuration change details with transition state
120    /// - `retry`: Network retry policy with exponential backoff
121    /// - `membership`: Cluster membership for channel resolution
122    ///
123    /// # Errors
124    /// - `NetworkError::EmptyPeerList` if no peers provided
125    /// - `NetworkError::TaskFailed` for background execution failures
126    /// - `ConsensusError::NotLeader` if executed by non-leader
127    ///
128    /// # Implementation
129    /// - Uses compressed gRPC streams for efficiency
130    /// - Maintains response order matching input peers
131    /// - Concurrent request processing with ordered aggregation
132    #[allow(dead_code)]
133    async fn send_cluster_update(
134        &self,
135        req: ClusterConfChangeRequest,
136        retry: &RetryPolicies,
137        membership: std::sync::Arc<crate::alias::MOF<T>>,
138    ) -> Result<ClusterUpdateResult>;
139
140    /// Replicates log entries to followers and learners.
141    ///
142    /// # Protocol
143    /// - Implements log replication from Raft §5.3
144    /// - Leader-exclusive operation
145    /// - Handles log consistency checks automatically
146    ///
147    /// # Parameters
148    /// - `requests`: Vector of (peer_id, AppendEntriesRequest)
149    /// - `retry`: Network retry configuration
150    /// - `membership`: Cluster membership for channel resolution
151    /// - `response_compress_enabled`: Enable compression for replication responses
152    ///
153    /// # Returns
154    /// - On success: `Ok(AppendResult)` containing aggregated responses
155    /// - On failure: `Err(NetworkError)` for unrecoverable errors
156    ///
157    /// ## **Error Conditions**: Top-level `Err` is returned ONLY when:
158    /// - Input `requests_with_peer_address` is empty (`NetworkError::EmptyPeerList`)
159    /// - Critical failures prevent spawning async tasks (not shown in current impl)
160    ///
161    /// # Errors
162    /// - `NetworkError::EmptyPeerList` for empty input
163    /// - `NetworkError::TaskFailed` for partial execution failures
164    ///
165    /// # Guarantees
166    /// - At-least-once delivery semantics
167    /// - Automatic deduplication of peer entries
168    /// - Non-blocking error handling
169    async fn send_append_requests(
170        &self,
171        requests: Vec<(u32, AppendEntriesRequest)>,
172        retry: &RetryPolicies,
173        membership: std::sync::Arc<crate::alias::MOF<T>>,
174        response_compress_enabled: bool,
175    ) -> Result<AppendResult>;
176
177    /// Initiates leader election by requesting votes from cluster peers.
178    ///
179    /// # Protocol
180    /// - Implements leader election from Raft §5.2
181    /// - Candidate-exclusive operation
182    /// - Validates log completeness requirements
183    ///
184    /// # Parameters
185    /// - `req`: Election metadata with candidate's term and log state
186    /// - `retry`: Election-specific retry strategy
187    /// - `membership`: Cluster membership for channel resolution
188    ///
189    /// # Errors
190    /// - `NetworkError::EmptyPeerList` for empty peer list
191    /// - `NetworkError::TaskFailed` for RPC execution failures
192    ///
193    /// # Safety
194    /// - Automatic term validation in responses
195    /// - Strict candidate state enforcement
196    /// - Non-blocking partial failure handling
197    async fn send_vote_requests(
198        &self,
199        req: VoteRequest,
200        retry: &RetryPolicies,
201        membership: std::sync::Arc<crate::alias::MOF<T>>,
202    ) -> Result<VoteResult>;
203
204    /// Orchestrates log compaction across cluster peers after snapshot creation.
205    ///
206    /// # Protocol
207    /// - Implements log truncation from Raft §7
208    /// Initiates cluster join process for a learner node
209    ///
210    /// # Protocol
211    /// - Implements cluster join protocol from Raft §6
212    /// - Learner-exclusive operation
213    /// - Requires leader connection
214    ///
215    /// # Parameters
216    /// - `leader_channel`: Pre-established gRPC channel to cluster leader
217    /// - `request`: Join request with node metadata
218    /// - `retry`: Join-specific retry configuration
219    /// - `membership`: Cluster membership for channel resolution
220    ///
221    /// # Errors
222    /// - NetworkError::JoinFailed: On unrecoverable join failure
223    /// - NetworkError::NotLeader: If contacted node isn't leader
224    ///
225    /// # Guarantees
226    /// - At-least-once delivery
227    /// - Automatic leader discovery
228    /// - Idempotent operation
229    async fn join_cluster(
230        &self,
231        leader_id: u32,
232        request: d_engine_proto::server::cluster::JoinRequest,
233        retry: BackoffPolicy,
234        membership: std::sync::Arc<crate::alias::MOF<T>>,
235    ) -> Result<d_engine_proto::server::cluster::JoinResponse>;
236
237    /// Discovers current cluster leader
238    ///
239    /// # Protocol
240    /// - Broadcast-based leader discovery
241    /// - Handles redirection to current leader
242    ///
243    /// # Parameters
244    /// - `bootstrap_endpoints`: Initial cluster endpoints
245    /// - `request`: Discovery request with node metadata
246    ///
247    /// # Errors
248    /// - `NetworkError::DiscoveryTimeout`: When no response received
249    async fn discover_leader(
250        &self,
251        request: d_engine_proto::server::cluster::LeaderDiscoveryRequest,
252        rpc_enable_compression: bool,
253        membership: std::sync::Arc<crate::alias::MOF<T>>,
254    ) -> Result<Vec<d_engine_proto::server::cluster::LeaderDiscoveryResponse>>;
255
256    /// Send a single AppendEntries request to one peer (used by ReplicationWorker).
257    ///
258    /// Non-blocking from the caller's perspective: the caller fires this and the
259    /// per-follower worker task awaits the response independently. Reuses the
260    /// existing FIFO `peer_appender_task` infrastructure internally.
261    ///
262    /// # Parameters
263    /// - `peer_id`: Target follower node ID
264    /// - `request`: AppendEntries RPC request
265    /// - `retry`: Retry / timeout configuration
266    /// - `membership`: Cluster membership for channel resolution
267    /// - `response_compress_enabled`: Enable gRPC response compression
268    ///
269    /// # Returns
270    /// `Ok(AppendEntriesResponse)` on success, `Err` on network / timeout failure
271    async fn send_append_request(
272        &self,
273        peer_id: u32,
274        request: AppendEntriesRequest,
275        retry: &RetryPolicies,
276        membership: std::sync::Arc<crate::alias::MOF<T>>,
277        response_compress_enabled: bool,
278    ) -> Result<AppendEntriesResponse>;
279
280    /// Pushes a snapshot to a lagging peer (called by per-follower ReplicationWorker).
281    ///
282    /// Used when a peer's `next_index` falls below the leader's purge boundary and
283    /// AppendEntries would carry a stale `prev_log_term = 0`, causing a perpetual
284    /// conflict loop.  The worker calls this and awaits completion before emitting
285    /// `RoleEvent::SnapshotPushCompleted`.
286    ///
287    /// # Parameters
288    /// - `peer_id`: Target follower node ID
289    /// - `metadata`: Snapshot metadata (term, index, size)
290    /// - `state_machine_handler`: Used to load the snapshot data stream
291    /// - `membership`: Cluster membership for bulk-channel resolution
292    /// - `config`: Snapshot transfer configuration (chunk size, timeout, etc.)
293    ///
294    /// # Returns
295    /// `Ok(())` on successful transfer, `Err` on network or serialization failure.
296    async fn send_snapshot(
297        &self,
298        peer_id: u32,
299        metadata: SnapshotMetadata,
300        state_machine_handler: std::sync::Arc<crate::alias::SMHOF<T>>,
301        membership: std::sync::Arc<crate::alias::MOF<T>>,
302        config: crate::SnapshotConfig,
303    ) -> Result<()>;
304
305    /// Requests and streams a snapshot from the current leader.
306    ///
307    /// # Parameters
308    /// - `leader_id`: Current leader node ID
309    /// - `retry`: Retry configuration (currently unused in implementation)
310    /// - `membership`: Cluster membership for channel resolution
311    ///
312    /// # Returns
313    /// Streaming of snapshot chunks from the leader
314    ///
315    /// # Errors
316    /// Returns `NetworkError` if:
317    /// - Connection to leader fails
318    /// - gRPC call fails
319    async fn request_snapshot_from_leader(
320        &self,
321        leader_id: u32,
322        ack_tx: tokio::sync::mpsc::Receiver<d_engine_proto::server::storage::SnapshotAck>,
323        retry: &crate::InstallSnapshotBackoffPolicy,
324        membership: std::sync::Arc<crate::alias::MOF<T>>,
325    ) -> Result<Box<tonic::Streaming<SnapshotChunk>>>;
326
327    /// Opens a persistent bidirectional AppendEntries stream to the given peer.
328    ///
329    /// Called once per follower when the leader becomes active (or on reconnect).
330    /// The returned [`ReplicationStream`] contains:
331    /// - `sender`: push batches into the open h2 stream (non-blocking, capacity 128)
332    /// - `receiver`: stream of ACKs from the follower
333    ///
334    /// When the stream breaks (network error, peer restart), the receiver yields
335    /// an `Err(tonic::Status)` and the caller should emit
336    /// `RoleEvent::PeerStreamError { peer_id }` so the Raft loop can reset
337    /// `next_index` and schedule reconnection.
338    ///
339    /// # Errors
340    /// Returns `NetworkError` if the initial stream handshake fails.
341    async fn open_replication_stream(
342        &self,
343        peer_id: u32,
344        membership: std::sync::Arc<crate::alias::MOF<T>>,
345        compress: bool,
346    ) -> Result<ReplicationStream>;
347}
348
349// Module level utils
350// -----------------------------------------------------------------------------
351use std::collections::HashMap;
352use std::collections::HashSet;
353use std::time::Duration;
354
355use tokio::time::sleep;
356use tokio::time::timeout;
357use tonic::Code;
358use tracing::debug;
359use tracing::warn;
360
361use crate::Error;
362
363/// As soon as task has return we should return from this function
364pub async fn grpc_task_with_timeout_and_exponential_backoff<F, T, U>(
365    task_name: &'static str,
366    mut task: F,
367    policy: BackoffPolicy,
368) -> std::result::Result<tonic::Response<U>, Error>
369where
370    F: FnMut() -> T,
371    T: std::future::Future<Output = std::result::Result<tonic::Response<U>, tonic::Status>>
372        + Send
373        + 'static,
374{
375    // let max_retries = 5;
376    let mut retries = 0;
377    let mut current_delay = Duration::from_millis(policy.base_delay_ms);
378    let timeout_duration = Duration::from_millis(policy.timeout_ms);
379    let max_delay = Duration::from_millis(policy.max_delay_ms);
380    let max_retries = policy.max_retries;
381
382    let mut last_error =
383        NetworkError::TaskBackoffFailed("Task failed after max retries".to_string());
384    while retries < max_retries {
385        debug!("[{task_name}] Attempt {} of {}", retries + 1, max_retries);
386        match timeout(timeout_duration, task()).await {
387            Ok(Ok(r)) => {
388                return Ok(r); // Exit on success
389            }
390            Ok(Err(status)) => {
391                last_error = match status.code() {
392                    Code::Unavailable => {
393                        warn!("[{task_name}] Service unavailable: {}", status.message());
394                        NetworkError::ServiceUnavailable(format!(
395                            "Service unavailable: {}",
396                            status.message()
397                        ))
398                    }
399                    _ => {
400                        warn!("[{task_name}] RPC error: {}", status);
401                        NetworkError::TonicStatusError(Box::new(status))
402                    }
403                };
404            }
405            Err(_e) => {
406                warn!("[{task_name}] Task timed out after {:?}", timeout_duration);
407                last_error = NetworkError::RetryTimeoutError(timeout_duration);
408            }
409        };
410
411        if retries < max_retries - 1 {
412            debug!("[{task_name}] Retrying in {:?}...", current_delay);
413            sleep(current_delay).await;
414
415            // Exponential backoff (double the delay each time)
416            current_delay = (current_delay * 2).min(max_delay);
417        } else {
418            warn!("[{task_name}] Task failed after {} retries", retries);
419            //bug: no need to return if the it is not a business logic error
420            // return Err(Error::RetryTaskFailed("Task failed after max
421            // retries".to_string())); // Return the last error after max
422            // retries
423        }
424        retries += 1;
425    }
426    warn!("[{task_name}] Task failed after {} retries", max_retries);
427    Err(last_error.into()) // Fallback error message if no task returns Ok
428}