d-engine-core 0.2.4

Pure Raft consensus algorithm - for building custom Raft-based systems
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
//! Network abstraction layer with a timeout-aware gRPC implementation.
//!
//! This module defines the [`Transport`] trait used for peer-to-peer Raft
//! communication, plus helpers that run gRPC calls under configurable
//! timeout and retry policies ([`RetryPolicies`], [`BackoffPolicy`]). These
//! parameters are configured through `RaftConfig`, so that network operations
//! stay bounded in time and the system remains responsive.

mod background_snapshot_transfer;

#[cfg(test)]
mod background_snapshot_transfer_test;
use async_trait::async_trait;
pub use background_snapshot_transfer::*;
use d_engine_proto::server::cluster::ClusterConfChangeRequest;
use d_engine_proto::server::cluster::ClusterConfUpdateResponse;
use d_engine_proto::server::election::VoteRequest;
use d_engine_proto::server::election::VoteResponse;
use d_engine_proto::server::replication::AppendEntriesRequest;
use d_engine_proto::server::replication::AppendEntriesResponse;
use d_engine_proto::server::storage::SnapshotChunk;
use d_engine_proto::server::storage::SnapshotMetadata;
use futures::stream::BoxStream;
#[cfg(any(test, feature = "__test_support"))]
use mockall::automock;
use tokio::sync::mpsc;

use crate::BackoffPolicy;
use crate::NetworkError;
use crate::Result;
use crate::RetryPolicies;
use crate::TypeConfig;

/// Per-peer persistent bidirectional replication stream.
///
/// Opened once per follower at leader startup (or on reconnect). The sender
/// pushes `AppendEntriesRequest` batches; the receiver yields
/// `AppendEntriesResponse` ACKs. Dropping the sender closes the stream.
///
/// Produced by [`Transport::open_replication_stream`].
pub struct ReplicationStream {
    /// Push AppendEntries batches directly into the open h2 stream.
    ///
    /// The channel capacity is chosen by the transport implementation
    /// (documented there as 128).
    pub sender: mpsc::Sender<AppendEntriesRequest>,
    /// Receive ACKs from the follower; items are `Result<_, tonic::Status>`.
    ///
    /// An `Err` item means the stream broke (network error, peer restart);
    /// see [`Transport::open_replication_stream`] for the expected reaction.
    pub receiver: BoxStream<'static, std::result::Result<AppendEntriesResponse, tonic::Status>>,
}

/// Aggregated outcome of one round of log replication to peers.
#[derive(Debug, Clone)]
pub struct AppendResults {
    /// Whether a majority quorum is achieved (can also be derived from
    /// `peer_updates`).
    pub commit_quorum_achieved: bool,
    /// Updates to each peer's `match_index` and `next_index`, keyed by peer ID.
    pub peer_updates: HashMap<u32, PeerUpdate>,
    /// Learner log catch-up progress, keyed by learner node ID.
    ///
    /// NOTE(review): the `Option<u64>` is presumably the learner's match index
    /// (`None` when not yet known) — confirm against the code that fills it.
    pub learner_progress: HashMap<u32, Option<u64>>,
}

/// Replication outcome for a single peer after an AppendEntries exchange.
#[derive(Debug, Clone, PartialEq)]
pub struct PeerUpdate {
    /// The peer's Raft match index, if known (`None` when the exchange failed).
    pub match_index: Option<u64>,
    /// Next log index the leader intends to send to this peer.
    pub next_index: u64,
    /// Whether the peer's response indicated success.
    pub success: bool,
}

impl PeerUpdate {
    /// Builds an update for a peer that accepted the entries.
    ///
    /// `match_index` is stored as `Some(match_index)`; `success` is `true`.
    #[allow(unused)] // convenience constructor; may have no caller in some builds
    pub fn success(
        match_index: u64,
        next_index: u64,
    ) -> Self {
        Self {
            success: true,
            next_index,
            match_index: Some(match_index),
        }
    }

    /// Builds an update for a peer whose exchange failed.
    ///
    /// The match index is unknown (`None`) and `next_index` is reset to `1`.
    #[allow(unused)] // convenience constructor; may have no caller in some builds
    pub fn failed() -> Self {
        let next_index = 1;
        Self {
            success: false,
            match_index: None,
            next_index,
        }
    }
}
/// Raw per-peer outcome of a batch of AppendEntries RPCs.
///
/// NOTE(review): `peer_ids` is an unordered `HashSet`, so entries in
/// `responses` cannot be matched to peers by position; presumably each
/// response carries enough information to identify its sender — confirm.
#[derive(Debug)]
pub struct AppendResult {
    /// IDs of the peers the requests were sent to.
    pub peer_ids: HashSet<u32>,
    /// Per-RPC results (presumably one per peer); an `Err` records a
    /// per-peer failure rather than failing the whole batch.
    pub responses: Vec<Result<AppendEntriesResponse>>,
}
/// Raw per-peer outcome of a RequestVote round.
///
/// NOTE(review): as with [`AppendResult`], `peer_ids` is unordered and cannot
/// be correlated with `responses` by position.
#[derive(Debug)]
pub struct VoteResult {
    /// IDs of the peers a vote was requested from.
    pub peer_ids: HashSet<u32>,
    /// Per-peer vote results; an `Err` records a per-peer failure.
    pub responses: Vec<Result<VoteResponse>>,
}
// Allowed to be unused, matching the `#[allow(dead_code)]` on
// `Transport::send_cluster_update`, which returns this type.
#[allow(dead_code)]
/// Raw per-peer outcome of propagating a cluster configuration change.
#[derive(Debug)]
pub struct ClusterUpdateResult {
    /// IDs of the peers the configuration change was sent to.
    pub peer_ids: HashSet<u32>,
    /// Per-peer results; an `Err` records a per-peer failure.
    pub responses: Vec<Result<ClusterConfUpdateResponse>>,
}

/// Network transport used by the Raft core to exchange RPCs with cluster peers.
///
/// Every method takes the current cluster membership (`crate::alias::MOF<T>`),
/// which implementations use to resolve peer channels. When compiled with
/// `cfg(test)` or the `__test_support` feature, `mockall::automock` generates a
/// `MockTransport` implementation of this trait.
#[cfg_attr(any(test, feature = "__test_support"), automock)]
#[async_trait]
pub trait Transport<T>: Send + Sync + 'static
where
    T: TypeConfig,
{
    /// Propagates cluster configuration changes to voting members using Raft's joint consensus.
    ///
    /// # Protocol
    /// - Implements membership change protocol from Raft §6
    /// - Leader-exclusive operation
    /// - Automatically filters self-references and duplicates
    ///
    /// # Parameters
    /// - `req`: Configuration change details with transition state
    /// - `retry`: Network retry policy with exponential backoff
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Errors
    /// - `NetworkError::EmptyPeerList` if no peers provided
    /// - `NetworkError::TaskFailed` for background execution failures
    /// - `ConsensusError::NotLeader` if executed by non-leader
    ///
    /// # Implementation
    /// - Uses compressed gRPC streams for efficiency
    /// - Maintains response order matching input peers
    /// - Concurrent request processing with ordered aggregation
    ///
    /// NOTE(review): `ClusterUpdateResult::peer_ids` is a `HashSet`, so the
    /// input-peer order mentioned above cannot be recovered from the result.
    // `dead_code` is allowed because this method is presumably not called at
    // present — confirm before removing the attribute.
    #[allow(dead_code)]
    async fn send_cluster_update(
        &self,
        req: ClusterConfChangeRequest,
        retry: &RetryPolicies,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<ClusterUpdateResult>;

    /// Replicates log entries to followers and learners.
    ///
    /// # Protocol
    /// - Implements log replication from Raft §5.3
    /// - Leader-exclusive operation
    /// - Handles log consistency checks automatically
    ///
    /// # Parameters
    /// - `requests`: Vector of (peer_id, AppendEntriesRequest)
    /// - `retry`: Network retry configuration
    /// - `membership`: Cluster membership for channel resolution
    /// - `response_compress_enabled`: Enable compression for replication responses
    ///
    /// # Returns
    /// - On success: `Ok(AppendResult)` containing aggregated responses
    /// - On failure: `Err(NetworkError)` for unrecoverable errors
    ///
    /// ## **Error Conditions**: Top-level `Err` is returned ONLY when:
    /// - Input `requests` is empty (`NetworkError::EmptyPeerList`)
    /// - Critical failures prevent spawning async tasks (not shown in current impl)
    ///
    /// Per-peer failures are reported inside `AppendResult::responses`, not as
    /// a top-level `Err`.
    ///
    /// # Errors
    /// - `NetworkError::EmptyPeerList` for empty input
    /// - `NetworkError::TaskFailed` for partial execution failures
    ///
    /// NOTE(review): the `TaskFailed` entry above conflicts with the "ONLY
    /// when" list; confirm which one reflects the implementation.
    ///
    /// # Guarantees
    /// - At-least-once delivery semantics
    /// - Automatic deduplication of peer entries
    /// - Non-blocking error handling
    async fn send_append_requests(
        &self,
        requests: Vec<(u32, AppendEntriesRequest)>,
        retry: &RetryPolicies,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
        response_compress_enabled: bool,
    ) -> Result<AppendResult>;

    /// Initiates leader election by requesting votes from cluster peers.
    ///
    /// # Protocol
    /// - Implements leader election from Raft §5.2
    /// - Candidate-exclusive operation
    /// - Validates log completeness requirements
    ///
    /// # Parameters
    /// - `req`: Election metadata with candidate's term and log state
    /// - `retry`: Election-specific retry strategy
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Errors
    /// - `NetworkError::EmptyPeerList` for empty peer list
    /// - `NetworkError::TaskFailed` for RPC execution failures
    ///
    /// # Guarantees
    /// - Automatic term validation in responses
    /// - Strict candidate state enforcement
    /// - Non-blocking partial failure handling
    async fn send_vote_requests(
        &self,
        req: VoteRequest,
        retry: &RetryPolicies,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<VoteResult>;

    /// Initiates cluster join process for a learner node
    ///
    /// # Protocol
    /// - Implements cluster join protocol from Raft §6
    /// - Learner-exclusive operation
    /// - Requires leader connection
    ///
    /// # Parameters
    /// - `leader_id`: Node ID of the cluster leader to send the join request to
    /// - `request`: Join request with node metadata
    /// - `retry`: Join-specific retry configuration
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Errors
    /// - NetworkError::JoinFailed: On unrecoverable join failure
    /// - NetworkError::NotLeader: If contacted node isn't leader
    ///
    /// # Guarantees
    /// - At-least-once delivery
    /// - Automatic leader discovery
    /// - Idempotent operation
    async fn join_cluster(
        &self,
        leader_id: u32,
        request: d_engine_proto::server::cluster::JoinRequest,
        retry: BackoffPolicy,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<d_engine_proto::server::cluster::JoinResponse>;

    /// Discovers current cluster leader
    ///
    /// # Protocol
    /// - Broadcast-based leader discovery
    /// - Handles redirection to current leader
    ///
    /// # Parameters
    /// - `request`: Discovery request with node metadata
    /// - `rpc_enable_compression`: Presumably toggles gRPC compression for the
    ///   discovery RPCs
    /// - `membership`: Cluster membership used to resolve which nodes to query
    ///
    /// # Returns
    /// The `LeaderDiscoveryResponse`s collected from the queried nodes
    /// (presumably one per node that answered).
    ///
    /// # Errors
    /// - `NetworkError::DiscoveryTimeout`: When no response received
    async fn discover_leader(
        &self,
        request: d_engine_proto::server::cluster::LeaderDiscoveryRequest,
        rpc_enable_compression: bool,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<Vec<d_engine_proto::server::cluster::LeaderDiscoveryResponse>>;

    /// Send a single AppendEntries request to one peer (used by ReplicationWorker).
    ///
    /// Non-blocking from the caller's perspective: the caller fires this and the
    /// per-follower worker task awaits the response independently. Reuses the
    /// existing FIFO `peer_appender_task` infrastructure internally.
    ///
    /// # Parameters
    /// - `peer_id`: Target follower node ID
    /// - `request`: AppendEntries RPC request
    /// - `retry`: Retry / timeout configuration
    /// - `membership`: Cluster membership for channel resolution
    /// - `response_compress_enabled`: Enable gRPC response compression
    ///
    /// # Returns
    /// `Ok(AppendEntriesResponse)` on success, `Err` on network / timeout failure
    async fn send_append_request(
        &self,
        peer_id: u32,
        request: AppendEntriesRequest,
        retry: &RetryPolicies,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
        response_compress_enabled: bool,
    ) -> Result<AppendEntriesResponse>;

    /// Pushes a snapshot to a lagging peer (called by per-follower ReplicationWorker).
    ///
    /// Used when a peer's `next_index` falls below the leader's purge boundary and
    /// AppendEntries would carry a stale `prev_log_term = 0`, causing a perpetual
    /// conflict loop.  The worker calls this and awaits completion before emitting
    /// `RoleEvent::SnapshotPushCompleted`.
    ///
    /// # Parameters
    /// - `peer_id`: Target follower node ID
    /// - `metadata`: Snapshot metadata (term, index, size)
    /// - `state_machine_handler`: Used to load the snapshot data stream
    /// - `membership`: Cluster membership for bulk-channel resolution
    /// - `config`: Snapshot transfer configuration (chunk size, timeout, etc.)
    ///
    /// # Returns
    /// `Ok(())` on successful transfer, `Err` on network or serialization failure.
    async fn send_snapshot(
        &self,
        peer_id: u32,
        metadata: SnapshotMetadata,
        state_machine_handler: std::sync::Arc<crate::alias::SMHOF<T>>,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
        config: crate::SnapshotConfig,
    ) -> Result<()>;

    /// Requests and streams a snapshot from the current leader.
    ///
    /// # Parameters
    /// - `leader_id`: Current leader node ID
    /// - `ack_tx`: Source of `SnapshotAck` messages. NOTE(review): despite the
    ///   `_tx` name this is a `Receiver`; presumably the implementation forwards
    ///   these ACKs to the leader while chunks stream in — confirm.
    /// - `retry`: Retry configuration (currently unused in implementation)
    /// - `membership`: Cluster membership for channel resolution
    ///
    /// # Returns
    /// A boxed gRPC stream of snapshot chunks from the leader
    ///
    /// # Errors
    /// Returns `NetworkError` if:
    /// - Connection to leader fails
    /// - gRPC call fails
    async fn request_snapshot_from_leader(
        &self,
        leader_id: u32,
        ack_tx: tokio::sync::mpsc::Receiver<d_engine_proto::server::storage::SnapshotAck>,
        retry: &crate::InstallSnapshotBackoffPolicy,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
    ) -> Result<Box<tonic::Streaming<SnapshotChunk>>>;

    /// Opens a persistent bidirectional AppendEntries stream to the given peer.
    ///
    /// Called once per follower when the leader becomes active (or on reconnect).
    /// The returned [`ReplicationStream`] contains:
    /// - `sender`: push batches into the open h2 stream (non-blocking, capacity 128)
    /// - `receiver`: stream of ACKs from the follower
    ///
    /// When the stream breaks (network error, peer restart), the receiver yields
    /// an `Err(tonic::Status)` and the caller should emit
    /// `RoleEvent::PeerStreamError { peer_id }` so the Raft loop can reset
    /// `next_index` and schedule reconnection.
    ///
    /// # Parameters
    /// - `peer_id`: Target follower node ID
    /// - `membership`: Cluster membership for channel resolution
    /// - `compress`: Presumably enables gRPC compression on the stream
    ///
    /// # Errors
    /// Returns `NetworkError` if the initial stream handshake fails.
    async fn open_replication_stream(
        &self,
        peer_id: u32,
        membership: std::sync::Arc<crate::alias::MOF<T>>,
        compress: bool,
    ) -> Result<ReplicationStream>;
}

// Module level utils
// -----------------------------------------------------------------------------
use std::collections::HashMap;
use std::collections::HashSet;
use std::time::Duration;

use tokio::time::sleep;
use tokio::time::timeout;
use tonic::Code;
use tracing::debug;
use tracing::warn;

use crate::Error;

/// Runs a gRPC call with a per-attempt timeout, retrying failures with
/// exponential backoff.
///
/// Returns as soon as any attempt succeeds. Each attempt is bounded by
/// `policy.timeout_ms`. Between attempts the function sleeps for a delay that
/// starts at `policy.base_delay_ms`, doubles after every failed attempt, and
/// never exceeds `policy.max_delay_ms`. No sleep happens after the final
/// attempt.
///
/// `policy.max_retries` is the total number of attempts; a value of `0` means
/// the task is never invoked. Every failed attempt is retried regardless of
/// the gRPC status code.
///
/// # Parameters
/// - `task_name`: label used as a prefix in log messages
/// - `task`: factory producing a fresh RPC future for each attempt
/// - `policy`: timeout and backoff settings
///
/// # Errors
/// Returns the error from the last failed attempt, converted into [`Error`]:
/// - `NetworkError::ServiceUnavailable` for `Code::Unavailable`
/// - `NetworkError::TonicStatusError` for any other gRPC status
/// - `NetworkError::RetryTimeoutError` when the attempt timed out
/// - `NetworkError::TaskBackoffFailed` if no attempt was made (`max_retries == 0`)
pub async fn grpc_task_with_timeout_and_exponential_backoff<F, T, U>(
    task_name: &'static str,
    mut task: F,
    policy: BackoffPolicy,
) -> std::result::Result<tonic::Response<U>, Error>
where
    F: FnMut() -> T,
    T: std::future::Future<Output = std::result::Result<tonic::Response<U>, tonic::Status>>
        + Send
        + 'static,
{
    let timeout_duration = Duration::from_millis(policy.timeout_ms);
    let max_delay = Duration::from_millis(policy.max_delay_ms);
    // Clamp the first delay too, so `max_delay_ms` is a real upper bound even
    // when `base_delay_ms` is configured larger than it.
    let mut current_delay = Duration::from_millis(policy.base_delay_ms).min(max_delay);
    let max_retries = policy.max_retries;

    // Only surfaces when `max_retries == 0` and the loop body never runs.
    let mut last_error =
        NetworkError::TaskBackoffFailed("Task failed after max retries".to_string());

    for attempt in 0..max_retries {
        debug!("[{task_name}] Attempt {} of {}", attempt + 1, max_retries);

        last_error = match timeout(timeout_duration, task()).await {
            Ok(Ok(response)) => return Ok(response),
            Ok(Err(status)) if status.code() == Code::Unavailable => {
                warn!("[{task_name}] Service unavailable: {}", status.message());
                NetworkError::ServiceUnavailable(format!(
                    "Service unavailable: {}",
                    status.message()
                ))
            }
            Ok(Err(status)) => {
                warn!("[{task_name}] RPC error: {}", status);
                NetworkError::TonicStatusError(Box::new(status))
            }
            Err(_elapsed) => {
                warn!("[{task_name}] Task timed out after {:?}", timeout_duration);
                NetworkError::RetryTimeoutError(timeout_duration)
            }
        };

        // Back off only when another attempt will follow.
        if attempt + 1 < max_retries {
            debug!("[{task_name}] Retrying in {:?}...", current_delay);
            sleep(current_delay).await;
            // Exponential backoff (double the delay each time, capped).
            current_delay = (current_delay * 2).min(max_delay);
        }
    }

    warn!("[{task_name}] Task failed after {} retries", max_retries);
    Err(last_error.into())
}