dgate 2.1.0

DGate API Gateway - High-performance API gateway with JavaScript module support
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
//! Raft consensus implementation for DGate
//!
//! This module provides a full Raft consensus implementation using openraft,
//! with leader election, log replication, and snapshot transfer.
//!
//! # Architecture
//!
//! - `RaftConsensus`: Main Raft wrapper implementing the Consensus trait
//! - `RaftLogStore`: In-memory log storage
//! - `DGateStateMachine`: State machine for applying logs
//! - `NetworkFactory`: HTTP-based networking between nodes

mod network;
mod state_machine;
mod store;

use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::io::Cursor;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use openraft::raft::ClientWriteResponse;
use openraft::Config as RaftConfig;
use openraft::{BasicNode, ChangeMembers, Raft};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, info, warn};

use super::consensus::{Consensus, ConsensusMetrics, ConsensusResponse, NodeId, NodeState};
use super::discovery::NodeDiscovery;
use crate::config::{ClusterConfig, ClusterMember, ClusterMode};
use crate::resources::ChangeLog;
use crate::storage::ProxyStore;

pub use network::NetworkFactory;
pub use state_machine::DGateStateMachine;
pub use store::RaftLogStore;

// Raft type configuration, generated by openraft's declarative macro.
//
// Declares `TypeConfig` with:
//   - `D`    = `ChangeLog`          (the payload handed to `client_write` in `propose`)
//   - `R`    = `RaftClientResponse` (the `data` returned from a client write)
//   - `Node` = `BasicNode`          (per-node info; this module only sets `addr`)
//
// NOTE(review): associated types not listed here are left to the macro,
// presumably its defaults — confirm against the openraft version in use.
openraft::declare_raft_types!(
    pub TypeConfig:
        D = ChangeLog,
        R = RaftClientResponse,
        Node = BasicNode,
);

/// Response from the Raft state machine after applying a log entry
///
/// This is the `R` type of [`TypeConfig`]; it is converted into a
/// `ConsensusResponse` (field for field) before being returned from `propose`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct RaftClientResponse {
    /// Outcome flag, copied verbatim into `ConsensusResponse::success`.
    pub success: bool,
    /// Optional free-form message, copied verbatim into `ConsensusResponse::message`.
    pub message: Option<String>,
}

impl From<RaftClientResponse> for ConsensusResponse {
    fn from(r: RaftClientResponse) -> Self {
        ConsensusResponse {
            success: r.success,
            message: r.message,
        }
    }
}

/// Snapshot data for state machine
///
/// A serializable container holding a list of change logs.
/// NOTE(review): how snapshots are built from / applied to the state machine
/// is not visible in this module — see the `state_machine` submodule.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct SnapshotData {
    /// Change logs contained in the snapshot.
    pub changelogs: Vec<ChangeLog>,
}

/// The Raft instance type alias for convenience
///
/// Shorthand for openraft's `Raft` parameterized with this module's [`TypeConfig`].
pub type DGateRaft = Raft<TypeConfig>;

/// Raft consensus implementation
///
/// Bundles an openraft [`DGateRaft`] instance with the cluster configuration
/// it was created from. The `Consensus` trait implementation in this module
/// delegates to these fields.
pub struct RaftConsensus {
    /// Cluster configuration this node was created with (node id, advertise
    /// address, bootstrap flag, initial members, discovery settings).
    config: ClusterConfig,
    /// Shared handle to the underlying openraft instance.
    raft: Arc<DGateRaft>,
    /// Node discovery service; `None` when the configuration has no
    /// discovery section.
    discovery: Option<Arc<NodeDiscovery>>,
    /// Locally cached members list, seeded from `config.initial_members` and
    /// updated only by `add_node` / `remove_node` calls on this instance.
    cached_members: RwLock<Vec<ClusterMember>>,
}

impl RaftConsensus {
    /// Build a Raft consensus manager for this node.
    ///
    /// Validates a fixed openraft configuration, wires up the log store,
    /// network factory and state machine, and starts the openraft instance.
    /// The cluster itself is not bootstrapped or joined here; that happens in
    /// `Consensus::initialize`.
    ///
    /// # Arguments
    ///
    /// * `cluster_config` - cluster settings; `node_id` becomes the Raft node id.
    /// * `store` - proxy store handed to the state machine.
    /// * `change_tx` - channel handed to the state machine as its change notifier.
    ///
    /// # Errors
    ///
    /// Returns an error if the Raft configuration fails validation or if the
    /// openraft instance cannot be created.
    pub async fn new(
        cluster_config: ClusterConfig,
        store: Arc<ProxyStore>,
        change_tx: mpsc::UnboundedSender<ChangeLog>,
    ) -> anyhow::Result<Self> {
        let this_node = cluster_config.node_id;

        info!(
            "Creating Raft consensus for node {} at {}",
            this_node, cluster_config.advertise_addr
        );

        // Timing and snapshot tuning; everything else stays at openraft's defaults.
        // NOTE(review): the interval/timeout values are presumably milliseconds —
        // confirm against openraft's `Config` documentation.
        let validated = RaftConfig {
            cluster_name: "dgate-cluster".to_string(),
            heartbeat_interval: 200,
            election_timeout_min: 500,
            election_timeout_max: 1000,
            snapshot_policy: openraft::SnapshotPolicy::LogsSinceLast(1000),
            max_in_snapshot_log_to_keep: 100,
            ..Default::default()
        }
        .validate()?;
        let shared_config = Arc::new(validated);

        // Components are constructed in the same order as before: log store,
        // network factory, then the state machine.
        let logs = RaftLogStore::new();
        let network = NetworkFactory::new();
        let machine = DGateStateMachine::with_change_notifier(store, change_tx);

        let raft = Arc::new(Raft::new(this_node, shared_config, network, logs, machine).await?);

        // Discovery is optional and only built when configured.
        let discovery = cluster_config
            .discovery
            .as_ref()
            .map(|settings| Arc::new(NodeDiscovery::new(settings.clone())));

        // Seed the member cache from the statically configured members.
        let cached_members = RwLock::new(cluster_config.initial_members.clone());

        Ok(Self {
            config: cluster_config,
            raft,
            discovery,
            cached_members,
        })
    }

    /// Get the Raft instance for direct access (e.g., for handling RPC requests)
    ///
    /// Returns a reference to the shared handle; callers that need ownership
    /// can clone the `Arc`.
    pub fn raft(&self) -> &Arc<DGateRaft> {
        &self.raft
    }

    /// Bootstrap this node as a single-node cluster
    async fn bootstrap_single_node(&self) -> anyhow::Result<()> {
        let node_id = self.config.node_id;
        let mut members = BTreeMap::new();
        members.insert(
            node_id,
            BasicNode {
                addr: self.config.advertise_addr.clone(),
            },
        );

        match self.raft.initialize(members).await {
            Ok(_) => {
                info!("Successfully bootstrapped single-node Raft cluster");
                Ok(())
            }
            Err(e) => {
                if e.to_string().contains("already initialized") {
                    debug!("Raft cluster already initialized");
                    Ok(())
                } else {
                    Err(e.into())
                }
            }
        }
    }

    /// Bootstrap as the initial leader
    ///
    /// When bootstrapping, we include ALL initial members in the initial cluster.
    /// This avoids the complexity and race conditions of adding nodes one by one.
    async fn bootstrap_cluster(&self) -> anyhow::Result<()> {
        let node_id = self.config.node_id;
        let mut members = BTreeMap::new();

        // Add this node first
        members.insert(
            node_id,
            BasicNode {
                addr: self.config.advertise_addr.clone(),
            },
        );

        // Add all other initial members
        for member in &self.config.initial_members {
            if member.id != node_id {
                members.insert(
                    member.id,
                    BasicNode {
                        addr: member.addr.clone(),
                    },
                );
            }
        }

        info!(
            "Bootstrapping Raft cluster with {} members (node {} as leader)",
            members.len(),
            node_id
        );

        match self.raft.initialize(members).await {
            Ok(_) => {
                info!("Successfully bootstrapped Raft cluster");
                Ok(())
            }
            Err(e) => {
                if e.to_string().contains("already initialized") {
                    debug!("Raft cluster already initialized");
                    Ok(())
                } else {
                    Err(e.into())
                }
            }
        }
    }

    /// Join an existing cluster.
    ///
    /// This is intentionally passive: it only logs that the node is waiting.
    /// The bootstrap node is expected to have listed this node in the initial
    /// membership, so the leader will start replicating to it and no explicit
    /// join request is sent from here.
    ///
    /// Always returns `Ok(())`.
    async fn join_cluster(&self) -> anyhow::Result<()> {
        info!(
            "Node {} at {} waiting to receive Raft replication from leader",
            self.config.node_id, self.config.advertise_addr
        );

        // Nothing else to do: this node only has to be reachable for the
        // leader's append_entries RPCs.
        Ok(())
    }

    /// Discovery loop that periodically checks for new nodes
    ///
    /// Runs forever: every 10 seconds it asks `discovery` for nodes and, while
    /// this node is the current leader, adds each newly discovered node as a
    /// learner and then promotes it to voter.
    ///
    /// Nodes that need no change are skipped, namely this node itself and
    /// nodes that are already voters in the current membership. Without that
    /// check every round would submit two membership changes per discovered
    /// node even when the cluster is already in the desired shape.
    ///
    /// Failures are logged at debug level and retried on the next round.
    async fn run_discovery_loop(
        discovery: Arc<NodeDiscovery>,
        raft: Arc<DGateRaft>,
        my_node_id: NodeId,
    ) {
        loop {
            let nodes = discovery.discover().await;
            for (node_id, node) in nodes {
                // This node never needs to add itself.
                if node_id == my_node_id {
                    continue;
                }

                // Re-read metrics per node: leadership and membership may have
                // changed while the previous membership change was in flight.
                let metrics = raft.metrics().borrow().clone();
                if metrics.current_leader != Some(my_node_id) {
                    continue;
                }

                let already_voter = metrics
                    .membership_config
                    .membership()
                    .voter_ids()
                    .any(|id| id == node_id);
                if already_voter {
                    continue;
                }

                // Step 1: Add as learner
                let add_learner =
                    ChangeMembers::AddNodes([(node_id, node.clone())].into_iter().collect());

                match raft.change_membership(add_learner, false).await {
                    Ok(_) => {
                        info!("Added discovered node {} as learner", node_id);
                        // Step 2: Promote to voter
                        let promote = ChangeMembers::AddVoterIds(BTreeSet::from([node_id]));
                        match raft.change_membership(promote, false).await {
                            Ok(_) => info!("Promoted discovered node {} to voter", node_id),
                            Err(e) => debug!("Could not promote node {}: {}", node_id, e),
                        }
                    }
                    Err(e) => debug!("Could not add node {}: {}", node_id, e),
                }
            }

            tokio::time::sleep(Duration::from_secs(10)).await;
        }
    }
}

#[async_trait]
impl Consensus for RaftConsensus {
    /// Returns this node's id as given in the cluster configuration.
    fn node_id(&self) -> NodeId {
        self.config.node_id
    }

    /// Returns the cluster mode taken from the cluster configuration.
    fn mode(&self) -> ClusterMode {
        self.config.mode
    }

    async fn initialize(&self) -> anyhow::Result<()> {
        let node_id = self.config.node_id;

        if self.config.bootstrap {
            info!(
                "Bootstrapping Raft cluster with node_id={} as initial leader",
                node_id
            );
            self.bootstrap_cluster().await?;
        } else if !self.config.initial_members.is_empty() {
            info!(
                "Joining existing Raft cluster with {} known members",
                self.config.initial_members.len()
            );
            self.join_cluster().await?;
        } else {
            warn!("No bootstrap flag and no initial members - starting as isolated node");
            self.bootstrap_single_node().await?;
        }

        // Start discovery background task if configured
        if let Some(ref discovery) = self.discovery {
            let discovery_clone = discovery.clone();
            let raft_clone = self.raft.clone();
            let my_node_id = self.config.node_id;
            tokio::spawn(async move {
                Self::run_discovery_loop(discovery_clone, raft_clone, my_node_id).await;
            });
        }

        Ok(())
    }

    /// Returns `true` when the latest Raft metrics name this node as the
    /// current leader; in Raft only the leader accepts writes.
    async fn can_write(&self) -> bool {
        let current_leader = self.raft.metrics().borrow().current_leader;
        current_leader == Some(self.config.node_id)
    }

    /// Returns the current leader's id as reported by the latest Raft
    /// metrics, or `None` when no leader is known.
    async fn leader_id(&self) -> Option<NodeId> {
        self.raft.metrics().borrow().current_leader
    }

    async fn propose(&self, changelog: ChangeLog) -> anyhow::Result<ConsensusResponse> {
        let result: ClientWriteResponse<TypeConfig> = self
            .raft
            .client_write(changelog)
            .await
            .map_err(|e| anyhow::anyhow!("Raft write failed: {}", e))?;

        Ok(result.data.into())
    }

    /// Build a `ConsensusMetrics` snapshot from the latest Raft metrics and
    /// the locally cached member list.
    ///
    /// Note that `committed` is reported from the same value as
    /// `last_applied`; no separate commit index is read here.
    async fn metrics(&self) -> ConsensusMetrics {
        let snapshot = self.raft.metrics().borrow().clone();
        let members = self.cached_members.read().await.clone();

        let own_id = self.config.node_id;
        let applied_index = snapshot.last_applied.map(|log_id| log_id.index);

        // Exhaustive on purpose: a new openraft state must be mapped explicitly.
        let state = match snapshot.state {
            openraft::ServerState::Learner => NodeState::Learner,
            openraft::ServerState::Follower => NodeState::Follower,
            openraft::ServerState::Candidate => NodeState::Candidate,
            openraft::ServerState::Leader => NodeState::Leader,
            openraft::ServerState::Shutdown => NodeState::Shutdown,
        };

        ConsensusMetrics {
            id: own_id,
            mode: self.config.mode,
            can_write: snapshot.current_leader == Some(own_id),
            leader_id: snapshot.current_leader,
            state,
            current_term: Some(snapshot.vote.leader_id().term),
            last_applied: applied_index,
            committed: applied_index,
            members,
            extra: None,
        }
    }

    async fn add_node(&self, node_id: NodeId, addr: String) -> anyhow::Result<()> {
        info!("Adding node {} at {} to Raft cluster", node_id, addr);

        // Step 1: Add the node as a learner first
        let node = BasicNode { addr: addr.clone() };
        let add_learner = ChangeMembers::AddNodes([(node_id, node)].into_iter().collect());

        self.raft
            .change_membership(add_learner, false)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to add node as learner: {}", e))?;

        info!("Node {} added as learner, now promoting to voter", node_id);

        // Step 2: Promote the learner to a voter
        let mut voter_ids = BTreeSet::new();
        voter_ids.insert(node_id);
        let promote_voter = ChangeMembers::AddVoterIds(voter_ids);

        self.raft
            .change_membership(promote_voter, false)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to promote node to voter: {}", e))?;

        info!("Node {} successfully promoted to voter", node_id);

        // Update cached members
        let mut cached = self.cached_members.write().await;
        if !cached.iter().any(|m| m.id == node_id) {
            cached.push(ClusterMember {
                id: node_id,
                addr,
                admin_port: None,
                tls: false,
            });
        }

        Ok(())
    }

    /// Remove a node from the cluster.
    ///
    /// Removal is done in two membership changes, mirroring `add_node`:
    ///
    /// 1. demote the node from voter, keeping it as a learner;
    /// 2. remove the now non-voting node from the membership.
    ///
    /// `ChangeMembers::RemoveNodes` on its own is rejected by openraft while
    /// the node is still a voter, and `add_node` always promotes nodes to
    /// voters, so the demotion step is required for removal to succeed.
    ///
    /// On success the node is also dropped from the local member cache.
    ///
    /// # Errors
    ///
    /// Returns an error if either membership change fails (for example when
    /// the change would leave the cluster without voters).
    async fn remove_node(&self, node_id: NodeId) -> anyhow::Result<()> {
        info!("Removing node {} from Raft cluster", node_id);

        let target = BTreeSet::from([node_id]);

        // Step 1: demote. `retain = true` keeps the node as a learner so the
        // follow-up `RemoveNodes` is what actually drops it.
        self.raft
            .change_membership(ChangeMembers::RemoveVoters(target.clone()), true)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to demote node from voter: {}", e))?;

        // Step 2: remove the (now learner) node from the membership.
        self.raft
            .change_membership(ChangeMembers::RemoveNodes(target), false)
            .await
            .map_err(|e| anyhow::anyhow!("Failed to remove node: {}", e))?;

        // Update cached members
        self.cached_members
            .write()
            .await
            .retain(|m| m.id != node_id);

        Ok(())
    }

    /// Returns a copy of the locally cached member list.
    ///
    /// The cache is seeded from the configured initial members and only
    /// changed by `add_node` / `remove_node` on this instance; it is not read
    /// from the Raft membership.
    async fn members(&self) -> Vec<ClusterMember> {
        self.cached_members.read().await.clone()
    }

    /// Shut down the underlying Raft instance.
    ///
    /// # Errors
    ///
    /// Returns an error prefixed with "Shutdown failed" if openraft reports a
    /// failure while stopping.
    async fn shutdown(&self) -> anyhow::Result<()> {
        info!("Shutting down Raft consensus");
        match self.raft.shutdown().await {
            Ok(_) => Ok(()),
            Err(e) => Err(anyhow::anyhow!("Shutdown failed: {}", e)),
        }
    }
}