d_engine_core/network/
mod.rs

//! Network abstraction layer with timeout-aware gRPC implementation.
//!
//! This module provides network communication facilities with configurable
//! timeout policies for distributed system operations. All network operations
//! are governed by timeout parameters defined in [`RaftConfig`] to ensure
//! system responsiveness.
8
9mod background_snapshot_transfer;
10
11#[cfg(test)]
12mod background_snapshot_transfer_test;
13pub(crate) use background_snapshot_transfer::*;
14use d_engine_proto::server::cluster::ClusterConfChangeRequest;
15use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
16use d_engine_proto::server::election::VoteRequest;
17use d_engine_proto::server::election::VoteResponse;
18use d_engine_proto::server::replication::AppendEntriesRequest;
19use d_engine_proto::server::replication::AppendEntriesResponse;
20use d_engine_proto::server::storage::PurgeLogRequest;
21use d_engine_proto::server::storage::PurgeLogResponse;
22use d_engine_proto::server::storage::SnapshotChunk;
23#[cfg(any(test, feature = "test-utils"))]
24use mockall::automock;
25use tonic::async_trait;
26
27use crate::BackoffPolicy;
28use crate::NetworkError;
29use crate::Result;
30use crate::RetryPolicies;
31use crate::TypeConfig;
32
33// Define a structured return value
34#[derive(Debug, Clone)]
35pub struct AppendResults {
36    /// Whether a majority quorum is achieved (can be directly determined via
37    /// peer_updates)
38    pub commit_quorum_achieved: bool,
39    /// Updates to each peer's match_index and next_index
40    pub peer_updates: HashMap<u32, PeerUpdate>,
41    /// Learner log catch-up progress
42    pub learner_progress: HashMap<u32, Option<u64>>,
43}
44
45#[derive(Debug, Clone, PartialEq)]
46pub struct PeerUpdate {
47    pub match_index: Option<u64>,
48    pub next_index: u64,
49    /// if peer response success
50    pub success: bool,
51}
52
53impl PeerUpdate {
54    #[allow(unused)]
55    pub fn success(
56        match_index: u64,
57        next_index: u64,
58    ) -> Self {
59        PeerUpdate {
60            match_index: Some(match_index),
61            next_index,
62            success: true,
63        }
64    }
65
66    #[allow(unused)]
67    pub fn failed() -> Self {
68        Self {
69            match_index: None,
70            next_index: 1,
71            success: false,
72        }
73    }
74}
75#[derive(Debug)]
76pub struct AppendResult {
77    pub peer_ids: HashSet<u32>,
78    pub responses: Vec<Result<AppendEntriesResponse>>,
79}
80#[derive(Debug)]
81pub struct VoteResult {
82    pub peer_ids: HashSet<u32>,
83    pub responses: Vec<Result<VoteResponse>>,
84}
85#[allow(dead_code)]
86#[derive(Debug)]
87pub struct ClusterUpdateResult {
88    pub peer_ids: HashSet<u32>,
89    pub responses: Vec<Result<ClusterConfUpdateResponse>>,
90}
91
92#[cfg_attr(any(test, feature = "test-utils"), automock)]
93#[async_trait]
94pub trait Transport<T>: Send + Sync + 'static
95where
96    T: TypeConfig,
97{
98    /// Propagates cluster configuration changes to voting members using Raft's joint consensus.
99    ///
100    /// # Protocol
101    /// - Implements membership change protocol from Raft §6
102    /// - Leader-exclusive operation
103    /// - Automatically filters self-references and duplicates
104    ///
105    /// # Parameters
106    /// - `req`: Configuration change details with transition state
107    /// - `retry`: Network retry policy with exponential backoff
108    /// - `membership`: Cluster membership for channel resolution
109    ///
110    /// # Errors
111    /// - `NetworkError::EmptyPeerList` if no peers provided
112    /// - `NetworkError::TaskFailed` for background execution failures
113    /// - `ConsensusError::NotLeader` if executed by non-leader
114    ///
115    /// # Implementation
116    /// - Uses compressed gRPC streams for efficiency
117    /// - Maintains response order matching input peers
118    /// - Concurrent request processing with ordered aggregation
119    #[allow(dead_code)]
120    async fn send_cluster_update(
121        &self,
122        req: ClusterConfChangeRequest,
123        retry: &RetryPolicies,
124        membership: std::sync::Arc<crate::alias::MOF<T>>,
125    ) -> Result<ClusterUpdateResult>;
126
127    /// Replicates log entries to followers and learners.
128    ///
129    /// # Protocol
130    /// - Implements log replication from Raft §5.3
131    /// - Leader-exclusive operation
132    /// - Handles log consistency checks automatically
133    ///
134    /// # Parameters
135    /// - `requests`: Vector of (peer_id, AppendEntriesRequest)
136    /// - `retry`: Network retry configuration
137    /// - `membership`: Cluster membership for channel resolution
138    /// - `response_compress_enabled`: Enable compression for replication responses
139    ///
140    /// # Returns
141    /// - On success: `Ok(AppendResult)` containing aggregated responses
142    /// - On failure: `Err(NetworkError)` for unrecoverable errors
143    ///
144    /// ## **Error Conditions**: Top-level `Err` is returned ONLY when:
145    /// - Input `requests_with_peer_address` is empty (`NetworkError::EmptyPeerList`)
146    /// - Critical failures prevent spawning async tasks (not shown in current impl)
147    ///
148    /// # Errors
149    /// - `NetworkError::EmptyPeerList` for empty input
150    /// - `NetworkError::TaskFailed` for partial execution failures
151    ///
152    /// # Guarantees
153    /// - At-least-once delivery semantics
154    /// - Automatic deduplication of peer entries
155    /// - Non-blocking error handling
156    async fn send_append_requests(
157        &self,
158        requests: Vec<(u32, AppendEntriesRequest)>,
159        retry: &RetryPolicies,
160        membership: std::sync::Arc<crate::alias::MOF<T>>,
161        response_compress_enabled: bool,
162    ) -> Result<AppendResult>;
163
164    /// Initiates leader election by requesting votes from cluster peers.
165    ///
166    /// # Protocol
167    /// - Implements leader election from Raft §5.2
168    /// - Candidate-exclusive operation
169    /// - Validates log completeness requirements
170    ///
171    /// # Parameters
172    /// - `req`: Election metadata with candidate's term and log state
173    /// - `retry`: Election-specific retry strategy
174    /// - `membership`: Cluster membership for channel resolution
175    ///
176    /// # Errors
177    /// - `NetworkError::EmptyPeerList` for empty peer list
178    /// - `NetworkError::TaskFailed` for RPC execution failures
179    ///
180    /// # Safety
181    /// - Automatic term validation in responses
182    /// - Strict candidate state enforcement
183    /// - Non-blocking partial failure handling
184    async fn send_vote_requests(
185        &self,
186        req: VoteRequest,
187        retry: &RetryPolicies,
188        membership: std::sync::Arc<crate::alias::MOF<T>>,
189    ) -> Result<VoteResult>;
190
191    /// Orchestrates log compaction across cluster peers after snapshot creation.
192    ///
193    /// # Protocol
194    /// - Implements log truncation from Raft §7
195    /// - Leader-exclusive operation
196    /// - Requires valid snapshot checksum
197    ///
198    /// # Parameters
199    /// - `req`: Snapshot metadata with truncation index
200    /// - `retry`: Purge-specific retry configuration
201    /// - `membership`: Cluster membership for channel resolution
202    ///
203    /// # Errors
204    /// - `NetworkError::EmptyPeerList` for empty peer list
205    /// - `NetworkError::TaskFailed` for background execution errors
206    ///
207    /// # Guarantees
208    /// - At-least-once delivery
209    /// - Automatic progress tracking
210    /// - Crash-safe persistence requirements
211    async fn send_purge_requests(
212        &self,
213        req: PurgeLogRequest,
214        retry: &RetryPolicies,
215        membership: std::sync::Arc<crate::alias::MOF<T>>,
216    ) -> Result<Vec<Result<PurgeLogResponse>>>;
217
218    /// Initiates cluster join process for a learner node
219    ///
220    /// # Protocol
221    /// - Implements cluster join protocol from Raft §6
222    /// - Learner-exclusive operation
223    /// - Requires leader connection
224    ///
225    /// # Parameters
226    /// - `leader_channel`: Pre-established gRPC channel to cluster leader
227    /// - `request`: Join request with node metadata
228    /// - `retry`: Join-specific retry configuration
229    /// - `membership`: Cluster membership for channel resolution
230    ///
231    /// # Errors
232    /// - NetworkError::JoinFailed: On unrecoverable join failure
233    /// - NetworkError::NotLeader: If contacted node isn't leader
234    ///
235    /// # Guarantees
236    /// - At-least-once delivery
237    /// - Automatic leader discovery
238    /// - Idempotent operation
239    async fn join_cluster(
240        &self,
241        leader_id: u32,
242        request: d_engine_proto::server::cluster::JoinRequest,
243        retry: BackoffPolicy,
244        membership: std::sync::Arc<crate::alias::MOF<T>>,
245    ) -> Result<d_engine_proto::server::cluster::JoinResponse>;
246
247    /// Discovers current cluster leader
248    ///
249    /// # Protocol
250    /// - Broadcast-based leader discovery
251    /// - Handles redirection to current leader
252    ///
253    /// # Parameters
254    /// - `bootstrap_endpoints`: Initial cluster endpoints
255    /// - `request`: Discovery request with node metadata
256    ///
257    /// # Errors
258    /// - `NetworkError::DiscoveryTimeout`: When no response received
259    async fn discover_leader(
260        &self,
261        request: d_engine_proto::server::cluster::LeaderDiscoveryRequest,
262        rpc_enable_compression: bool,
263        membership: std::sync::Arc<crate::alias::MOF<T>>,
264    ) -> Result<Vec<d_engine_proto::server::cluster::LeaderDiscoveryResponse>>;
265
266    /// Requests and streams a snapshot from the current leader.
267    ///
268    /// # Parameters
269    /// - `leader_id`: Current leader node ID
270    /// - `retry`: Retry configuration (currently unused in implementation)
271    /// - `membership`: Cluster membership for channel resolution
272    ///
273    /// # Returns
274    /// Streaming of snapshot chunks from the leader
275    ///
276    /// # Errors
277    /// Returns `NetworkError` if:
278    /// - Connection to leader fails
279    /// - gRPC call fails
280    async fn request_snapshot_from_leader(
281        &self,
282        leader_id: u32,
283        ack_tx: tokio::sync::mpsc::Receiver<d_engine_proto::server::storage::SnapshotAck>,
284        retry: &crate::InstallSnapshotBackoffPolicy,
285        membership: std::sync::Arc<crate::alias::MOF<T>>,
286    ) -> Result<Box<tonic::Streaming<SnapshotChunk>>>;
287}
288
289// Module level utils
290// -----------------------------------------------------------------------------
291use std::collections::HashMap;
292use std::collections::HashSet;
293use std::time::Duration;
294
295use tokio::time::sleep;
296use tokio::time::timeout;
297use tonic::Code;
298use tracing::debug;
299use tracing::warn;
300
301use crate::Error;
302
303/// As soon as task has return we should return from this function
304pub async fn grpc_task_with_timeout_and_exponential_backoff<F, T, U>(
305    task_name: &'static str,
306    mut task: F,
307    policy: BackoffPolicy,
308) -> std::result::Result<tonic::Response<U>, Error>
309where
310    F: FnMut() -> T,
311    T: std::future::Future<Output = std::result::Result<tonic::Response<U>, tonic::Status>>
312        + Send
313        + 'static,
314{
315    // let max_retries = 5;
316    let mut retries = 0;
317    let mut current_delay = Duration::from_millis(policy.base_delay_ms);
318    let timeout_duration = Duration::from_millis(policy.timeout_ms);
319    let max_delay = Duration::from_millis(policy.max_delay_ms);
320    let max_retries = policy.max_retries;
321
322    let mut last_error =
323        NetworkError::TaskBackoffFailed("Task failed after max retries".to_string());
324    while retries < max_retries {
325        debug!("[{task_name}] Attempt {} of {}", retries + 1, max_retries);
326        match timeout(timeout_duration, task()).await {
327            Ok(Ok(r)) => {
328                return Ok(r); // Exit on success
329            }
330            Ok(Err(status)) => {
331                last_error = match status.code() {
332                    Code::Unavailable => {
333                        warn!("[{task_name}] Service unavailable: {}", status.message());
334                        NetworkError::ServiceUnavailable(format!(
335                            "Service unavailable: {}",
336                            status.message()
337                        ))
338                    }
339                    _ => {
340                        warn!("[{task_name}] RPC error: {}", status);
341                        NetworkError::TonicStatusError(Box::new(status))
342                    }
343                };
344            }
345            Err(_e) => {
346                warn!("[{task_name}] Task timed out after {:?}", timeout_duration);
347                last_error = NetworkError::RetryTimeoutError(timeout_duration);
348            }
349        };
350
351        if retries < max_retries - 1 {
352            debug!("[{task_name}] Retrying in {:?}...", current_delay);
353            sleep(current_delay).await;
354
355            // Exponential backoff (double the delay each time)
356            current_delay = (current_delay * 2).min(max_delay);
357        } else {
358            warn!("[{task_name}] Task failed after {} retries", retries);
359            //bug: no need to return if the it is not a business logic error
360            // return Err(Error::RetryTaskFailed("Task failed after max
361            // retries".to_string())); // Return the last error after max
362            // retries
363        }
364        retries += 1;
365    }
366    warn!("[{task_name}] Task failed after {} retries", max_retries);
367    Err(last_error.into()) // Fallback error message if no task returns Ok
368}