// d_engine_core/network/mod.rs
//! Network abstraction layer with a timeout-aware gRPC implementation.
//!
//! This module provides network communication facilities with configurable
//! timeout policies for distributed-system operations. All network operations
//! are governed by timeout parameters defined in [`RaftConfig`] to ensure
//! system responsiveness.

mod background_snapshot_transfer;

#[cfg(test)]
mod background_snapshot_transfer_test;
pub(crate) use background_snapshot_transfer::*;
use d_engine_proto::server::cluster::ClusterConfChangeRequest;
use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
use d_engine_proto::server::election::VoteRequest;
use d_engine_proto::server::election::VoteResponse;
use d_engine_proto::server::replication::AppendEntriesRequest;
use d_engine_proto::server::replication::AppendEntriesResponse;
use d_engine_proto::server::storage::SnapshotChunk;
#[cfg(any(test, feature = "__test_support"))]
use mockall::automock;
use tonic::async_trait;

use crate::BackoffPolicy;
use crate::NetworkError;
use crate::Result;
use crate::RetryPolicies;
use crate::TypeConfig;

/// Structured, aggregated result of a batch of AppendEntries RPCs.
#[derive(Debug, Clone)]
pub struct AppendResults {
    /// Whether a majority quorum is achieved (can be directly determined via
    /// `peer_updates`)
    pub commit_quorum_achieved: bool,
    /// Updates to each peer's match_index and next_index, keyed by peer id
    pub peer_updates: HashMap<u32, PeerUpdate>,
    /// Learner log catch-up progress, keyed by peer id
    /// (NOTE(review): `None` presumably means no progress reported — confirm with callers)
    pub learner_progress: HashMap<u32, Option<u64>>,
}

/// Replication bookkeeping for a single peer after an AppendEntries exchange.
#[derive(Debug, Clone, PartialEq)]
pub struct PeerUpdate {
    /// Highest log index known to be replicated on the peer, if any.
    pub match_index: Option<u64>,
    /// Index of the next log entry to send to the peer.
    pub next_index: u64,
    /// Whether the peer responded with success.
    pub success: bool,
}

impl PeerUpdate {
    /// Builds the update for a peer that acknowledged replication up to
    /// `match_index`; `next_index` is the next entry to send it.
    #[allow(unused)] // exercised by transport implementations and tests
    pub fn success(
        match_index: u64,
        next_index: u64,
    ) -> Self {
        Self {
            match_index: Some(match_index),
            next_index,
            success: true,
        }
    }

    /// Builds the update for a peer that rejected the request or failed to
    /// respond. `next_index` resets to 1 (the first log index) so replication
    /// can back up and retry from the beginning.
    #[allow(unused)] // exercised by transport implementations and tests
    pub fn failed() -> Self {
        Self {
            match_index: None,
            next_index: 1,
            success: false,
        }
    }
}
/// Raw per-peer outcome of a batch of AppendEntries RPCs.
#[derive(Debug)]
pub struct AppendResult {
    /// Ids of the peers the requests were sent to.
    pub peer_ids: HashSet<u32>,
    /// One response (or error) per peer; order matches the input peers.
    pub responses: Vec<Result<AppendEntriesResponse>>,
}
/// Raw per-peer outcome of a round of RequestVote RPCs.
#[derive(Debug)]
pub struct VoteResult {
    /// Ids of the peers the vote requests were sent to.
    pub peer_ids: HashSet<u32>,
    /// One vote response (or error) per peer.
    pub responses: Vec<Result<VoteResponse>>,
}
/// Raw per-peer outcome of propagating a cluster configuration update.
#[allow(dead_code)]
#[derive(Debug)]
pub struct ClusterUpdateResult {
    /// Ids of the peers the configuration change was sent to.
    pub peer_ids: HashSet<u32>,
    /// One response (or error) per peer.
    pub responses: Vec<Result<ClusterConfUpdateResponse>>,
}

#[cfg_attr(any(test, feature = "__test_support"), automock)]
#[async_trait]
pub trait Transport<T>: Send + Sync + 'static
where
    T: TypeConfig,
{
    /// Propagates cluster configuration changes to voting members using Raft's joint consensus.
    ///
    /// # Protocol
    /// - Implements membership change protocol from Raft §6
    /// - Leader-exclusive operation
    /// - Automatically filters self-references and duplicates
    ///
    /// # Parameters
    /// - `req`: Configuration change details with transition state
    /// - `retry`: Network retry policy with exponential backoff
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Errors
    /// - `NetworkError::EmptyPeerList` if no peers provided
    /// - `NetworkError::TaskFailed` for background execution failures
    /// - `ConsensusError::NotLeader` if executed by non-leader
    ///
    /// # Implementation
    /// - Uses compressed gRPC streams for efficiency
    /// - Maintains response order matching input peers
    /// - Concurrent request processing with ordered aggregation
    #[allow(dead_code)]
    async fn send_cluster_update(
        &self,
        req: ClusterConfChangeRequest,
        retry: &RetryPolicies,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<ClusterUpdateResult>;

    /// Replicates log entries to followers and learners.
    ///
    /// # Protocol
    /// - Implements log replication from Raft §5.3
    /// - Leader-exclusive operation
    /// - Handles log consistency checks automatically
    ///
    /// # Parameters
    /// - `requests`: Vector of (peer_id, AppendEntriesRequest)
    /// - `retry`: Network retry configuration
    /// - `membership`: Cluster membership for channel resolution
    /// - `response_compress_enabled`: Enable compression for replication responses
    ///
    /// # Returns
    /// - On success: `Ok(AppendResult)` containing aggregated responses
    /// - On failure: `Err(NetworkError)` for unrecoverable errors
    ///
    /// ## **Error Conditions**: Top-level `Err` is returned ONLY when:
    /// - Input `requests` is empty (`NetworkError::EmptyPeerList`)
    /// - Critical failures prevent spawning async tasks (not shown in current impl)
    ///
    /// # Errors
    /// - `NetworkError::EmptyPeerList` for empty input
    /// - `NetworkError::TaskFailed` for partial execution failures
    ///
    /// # Guarantees
    /// - At-least-once delivery semantics
    /// - Automatic deduplication of peer entries
    /// - Non-blocking error handling
    async fn send_append_requests(
        &self,
        requests: Vec<(u32, AppendEntriesRequest)>,
        retry: &RetryPolicies,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
        response_compress_enabled: bool,
    ) -> Result<AppendResult>;

    /// Initiates leader election by requesting votes from cluster peers.
    ///
    /// # Protocol
    /// - Implements leader election from Raft §5.2
    /// - Candidate-exclusive operation
    /// - Validates log completeness requirements
    ///
    /// # Parameters
    /// - `req`: Election metadata with candidate's term and log state
    /// - `retry`: Election-specific retry strategy
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Errors
    /// - `NetworkError::EmptyPeerList` for empty peer list
    /// - `NetworkError::TaskFailed` for RPC execution failures
    ///
    /// # Safety
    /// - Automatic term validation in responses
    /// - Strict candidate state enforcement
    /// - Non-blocking partial failure handling
    async fn send_vote_requests(
        &self,
        req: VoteRequest,
        retry: &RetryPolicies,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<VoteResult>;

    /// Initiates cluster join process for a learner node.
    ///
    /// # Protocol
    /// - Implements cluster join protocol from Raft §6
    /// - Learner-exclusive operation
    /// - Requires leader connection
    ///
    /// # Parameters
    /// - `leader_id`: Node id of the current cluster leader (channel resolved
    ///   via `membership`)
    /// - `request`: Join request with node metadata
    /// - `retry`: Join-specific backoff configuration
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Errors
    /// - NetworkError::JoinFailed: On unrecoverable join failure
    /// - NetworkError::NotLeader: If contacted node isn't leader
    ///
    /// # Guarantees
    /// - At-least-once delivery
    /// - Automatic leader discovery
    /// - Idempotent operation
    async fn join_cluster(
        &self,
        leader_id: u32,
        request: d_engine_proto::server::cluster::JoinRequest,
        retry: BackoffPolicy,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<d_engine_proto::server::cluster::JoinResponse>;

    /// Discovers current cluster leader.
    ///
    /// # Protocol
    /// - Broadcast-based leader discovery
    /// - Handles redirection to current leader
    ///
    /// # Parameters
    /// - `request`: Discovery request with node metadata
    /// - `rpc_enable_compression`: Enable gRPC compression for discovery calls
    /// - `membership`: Cluster membership providing the endpoints to query
    ///
    /// # Errors
    /// - `NetworkError::DiscoveryTimeout`: When no response received
    async fn discover_leader(
        &self,
        request: d_engine_proto::server::cluster::LeaderDiscoveryRequest,
        rpc_enable_compression: bool,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<Vec<d_engine_proto::server::cluster::LeaderDiscoveryResponse>>;

    /// Requests and streams a snapshot from the current leader.
    ///
    /// # Parameters
    /// - `leader_id`: Current leader node ID
    /// - `ack_tx`: Receiver of snapshot acks produced by the local installer
    ///   (NOTE(review): named `ack_tx` but typed as a `Receiver` — confirm
    ///   whether it should be renamed `ack_rx`)
    /// - `retry`: Retry configuration (currently unused in implementation)
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Returns
    /// Streaming of snapshot chunks from the leader
    ///
    /// # Errors
    /// Returns `NetworkError` if:
    /// - Connection to leader fails
    /// - gRPC call fails
    async fn request_snapshot_from_leader(
        &self,
        leader_id: u32,
        ack_tx: tokio::sync::mpsc::Receiver<d_engine_proto::server::storage::SnapshotAck>,
        retry: &crate::InstallSnapshotBackoffPolicy,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<Box<tonic::Streaming<SnapshotChunk>>>;
}
263
264// Module level utils
265// -----------------------------------------------------------------------------
266use std::collections::HashMap;
267use std::collections::HashSet;
268use std::time::Duration;
269
270use tokio::time::sleep;
271use tokio::time::timeout;
272use tonic::Code;
273use tracing::debug;
274use tracing::warn;
275
276use crate::Error;
277
278/// As soon as task has return we should return from this function
279pub async fn grpc_task_with_timeout_and_exponential_backoff<F, T, U>(
280    task_name: &'static str,
281    mut task: F,
282    policy: BackoffPolicy,
283) -> std::result::Result<tonic::Response<U>, Error>
284where
285    F: FnMut() -> T,
286    T: std::future::Future<Output = std::result::Result<tonic::Response<U>, tonic::Status>>
287        + Send
288        + 'static,
289{
290    // let max_retries = 5;
291    let mut retries = 0;
292    let mut current_delay = Duration::from_millis(policy.base_delay_ms);
293    let timeout_duration = Duration::from_millis(policy.timeout_ms);
294    let max_delay = Duration::from_millis(policy.max_delay_ms);
295    let max_retries = policy.max_retries;
296
297    let mut last_error =
298        NetworkError::TaskBackoffFailed("Task failed after max retries".to_string());
299    while retries < max_retries {
300        debug!("[{task_name}] Attempt {} of {}", retries + 1, max_retries);
301        match timeout(timeout_duration, task()).await {
302            Ok(Ok(r)) => {
303                return Ok(r); // Exit on success
304            }
305            Ok(Err(status)) => {
306                last_error = match status.code() {
307                    Code::Unavailable => {
308                        warn!("[{task_name}] Service unavailable: {}", status.message());
309                        NetworkError::ServiceUnavailable(format!(
310                            "Service unavailable: {}",
311                            status.message()
312                        ))
313                    }
314                    _ => {
315                        warn!("[{task_name}] RPC error: {}", status);
316                        NetworkError::TonicStatusError(Box::new(status))
317                    }
318                };
319            }
320            Err(_e) => {
321                warn!("[{task_name}] Task timed out after {:?}", timeout_duration);
322                last_error = NetworkError::RetryTimeoutError(timeout_duration);
323            }
324        };
325
326        if retries < max_retries - 1 {
327            debug!("[{task_name}] Retrying in {:?}...", current_delay);
328            sleep(current_delay).await;
329
330            // Exponential backoff (double the delay each time)
331            current_delay = (current_delay * 2).min(max_delay);
332        } else {
333            warn!("[{task_name}] Task failed after {} retries", retries);
334            //bug: no need to return if the it is not a business logic error
335            // return Err(Error::RetryTaskFailed("Task failed after max
336            // retries".to_string())); // Return the last error after max
337            // retries
338        }
339        retries += 1;
340    }
341    warn!("[{task_name}] Task failed after {} retries", max_retries);
342    Err(last_error.into()) // Fallback error message if no task returns Ok
343}