Skip to main content

amaters_server/
cluster_integration.rs

1//! Wires `amaters-cluster`'s Raft node into the server lifecycle.
2//!
3//! This module is compiled only when the `cluster` feature is enabled.
4//! Without the feature, [`ClusterHandle::start_standalone`] provides a
5//! no-op shim so that the rest of the server can use a single code path.
6//!
7//! # Minimum cluster size
8//!
//! The underlying `RaftNode` enforces a minimum of 3 nodes (odd-quorum
//! requirement).  `start` therefore needs at least 3 node IDs in total; it
//! appends the local `node_id` itself when `peers` omits it.
//! `start_standalone` does not create a `RaftNode`; it returns a sentinel
//! handle that reports `is_leader = true` without any Raft overhead.
13
14use std::sync::Arc;
15
16use crate::server::{ServerError, ServerResult};
17
18// ─── Feature-gated imports ────────────────────────────────────────────────────
19
20#[cfg(feature = "cluster")]
21use amaters_cluster::{
22    ClusterCommand, Command, LogIndex, NodeId, PlacementStateMachine, RaftConfig, RaftError,
23    RaftNode, ShardId, ShardMetadata, ShardRegistry,
24};
25
26// ─── Inner state ──────────────────────────────────────────────────────────────
27
/// Backing mode of a [`ClusterHandle`].
///
/// The handle either fronts a real Raft node or acts as a sentinel for a
/// single-node deployment; the handle's accessors dispatch on this enum.
enum ClusterInner {
    /// Single-node sentinel: no `RaftNode` exists, the handle always reports
    /// leadership and a shard count of zero.
    Standalone,
    /// Multi-node mode: the Raft node together with the shard registry that
    /// backs its placement state machine.
    #[cfg(feature = "cluster")]
    Raft {
        raft: Arc<RaftNode>,
        registry: Arc<ShardRegistry>,
    },
}
40
41// ─── ClusterHandle ────────────────────────────────────────────────────────────
42
43/// An opaque handle to the running cluster layer.
44///
45/// Callers obtain it via [`ClusterHandle::start`] (multi-node Raft) or
46/// [`ClusterHandle::start_standalone`] (single-node, no Raft).
47pub struct ClusterHandle {
48    /// Stable node identity used for logging and status responses.
49    node_id: u64,
50    inner: ClusterInner,
51}
52
53impl ClusterHandle {
54    /// Start the Raft node for multi-node operation.
55    ///
56    /// `peers` must contain **at least 3** (node_id, address) pairs (including
57    /// this node) because the Raft quorum algorithm requires an odd-sized
58    /// cluster of ≥ 3.  The addresses are stored for use by the RPC transport
59    /// (Phase 8 work).
60    #[cfg(feature = "cluster")]
61    pub async fn start(
62        node_id: u64,
63        peers: Vec<(u64, std::net::SocketAddr)>,
64    ) -> ServerResult<Self> {
65        let registry = Arc::new(ShardRegistry::new());
66
67        // Build PlacementStateMachine backed by the shard registry.
68        let sm = PlacementStateMachine::new(Arc::clone(&registry));
69
70        // Collect all peer node IDs (including this node).
71        let mut peer_ids: Vec<u64> = peers.iter().map(|(id, _)| *id).collect();
72        if !peer_ids.contains(&node_id) {
73            peer_ids.push(node_id);
74        }
75
76        let config = RaftConfig::new(node_id, peer_ids);
77
78        let raft = RaftNode::new(config)
79            .map_err(|e| ServerError::Cluster(format!("Failed to create RaftNode: {}", e)))?;
80
81        raft.set_state_machine(sm)
82            .map_err(|e| ServerError::Cluster(format!("Failed to set state machine: {}", e)))?;
83
84        let raft = Arc::new(raft);
85
86        Ok(Self {
87            node_id,
88            inner: ClusterInner::Raft { raft, registry },
89        })
90    }
91
92    /// Start in standalone (single-node) mode — no Raft, always leader.
93    ///
94    /// This path does not create a `RaftNode` (which would fail for < 3
95    /// peers) and simply returns a sentinel that reports `is_leader = true`.
96    pub async fn start_standalone(node_id: u64) -> ServerResult<Self> {
97        Ok(Self {
98            node_id,
99            inner: ClusterInner::Standalone,
100        })
101    }
102
103    /// Returns `true` when this node currently believes it is the Raft leader.
104    pub fn is_leader(&self) -> bool {
105        match &self.inner {
106            ClusterInner::Standalone => true,
107            #[cfg(feature = "cluster")]
108            ClusterInner::Raft { raft, .. } => raft.is_leader(),
109        }
110    }
111
112    /// Return this node's numeric identifier.
113    pub fn node_id(&self) -> u64 {
114        self.node_id
115    }
116
117    /// Number of shards tracked by the placement registry.
118    ///
119    /// Returns 0 when running in standalone mode or without the `cluster` feature.
120    pub fn shard_count(&self) -> usize {
121        match &self.inner {
122            ClusterInner::Standalone => 0,
123            #[cfg(feature = "cluster")]
124            ClusterInner::Raft { registry, .. } => registry.count(),
125        }
126    }
127
128    /// Expose the underlying [`RaftNode`] for advanced use.
129    #[cfg(feature = "cluster")]
130    pub fn raft_node(&self) -> Option<Arc<RaftNode>> {
131        match &self.inner {
132            ClusterInner::Raft { raft, .. } => Some(Arc::clone(raft)),
133            ClusterInner::Standalone => None,
134        }
135    }
136
137    /// Expose the underlying [`ShardRegistry`] for advanced use.
138    #[cfg(feature = "cluster")]
139    pub fn shard_registry(&self) -> Option<Arc<ShardRegistry>> {
140        match &self.inner {
141            ClusterInner::Raft { registry, .. } => Some(Arc::clone(registry)),
142            ClusterInner::Standalone => None,
143        }
144    }
145
146    // ─── S1: Shard management API ─────────────────────────────────────────────
147
148    /// List all shards currently registered in the placement layer.
149    ///
150    /// Returns an empty `Vec` when running in standalone mode.
151    #[cfg(feature = "cluster")]
152    pub fn list_shards(&self) -> Vec<ShardMetadata> {
153        match &self.inner {
154            ClusterInner::Standalone => vec![],
155            ClusterInner::Raft { registry, .. } => registry.get_all(),
156        }
157    }
158
159    /// List shards currently assigned to a specific node.
160    ///
161    /// Returns an empty `Vec` when running in standalone mode.
162    #[cfg(feature = "cluster")]
163    pub fn shards_on_node(&self, node_id: NodeId) -> Vec<ShardMetadata> {
164        match &self.inner {
165            ClusterInner::Standalone => vec![],
166            ClusterInner::Raft { registry, .. } => registry.get_by_node(node_id),
167        }
168    }
169
170    /// Find the shard responsible for a given key.
171    ///
172    /// Returns `None` when running in standalone mode.
173    #[cfg(feature = "cluster")]
174    pub fn find_shard_for_key(&self, key: &amaters_core::Key) -> Option<ShardMetadata> {
175        match &self.inner {
176            ClusterInner::Standalone => None,
177            ClusterInner::Raft { registry, .. } => registry.find_shard_for_key(key),
178        }
179    }
180
181    /// Propose a shard split via Raft (leader only).
182    ///
183    /// Returns `Err(ServerError::Cluster("NotLeader: ..."))` if this node is
184    /// not the current Raft leader.
185    #[cfg(feature = "cluster")]
186    pub fn propose_split(
187        &self,
188        shard_id: ShardId,
189        split_key: amaters_core::Key,
190    ) -> ServerResult<LogIndex> {
191        match &self.inner {
192            ClusterInner::Standalone => Err(ServerError::Cluster(
193                "NotLeader: standalone node has no Raft consensus".to_string(),
194            )),
195            ClusterInner::Raft { raft, .. } => {
196                let cmd_bytes = ClusterCommand::PlaceSplit {
197                    shard_id,
198                    split_key: split_key.as_bytes().to_vec(),
199                }
200                .encode();
201                let cmd = Command::new(cmd_bytes);
202                raft.propose(cmd).map_err(|e| match &e {
203                    RaftError::NotLeader { leader_id } => ServerError::Cluster(format!(
204                        "NotLeader: current leader is {:?}",
205                        leader_id
206                    )),
207                    _ => ServerError::Cluster(format!("Raft propose error: {}", e)),
208                })
209            }
210        }
211    }
212
213    /// Propose a shard merge via Raft (leader only).
214    ///
215    /// Returns `Err(ServerError::Cluster("NotLeader: ..."))` if this node is
216    /// not the current Raft leader.
217    #[cfg(feature = "cluster")]
218    pub fn propose_merge(
219        &self,
220        left_shard_id: ShardId,
221        right_shard_id: ShardId,
222    ) -> ServerResult<LogIndex> {
223        match &self.inner {
224            ClusterInner::Standalone => Err(ServerError::Cluster(
225                "NotLeader: standalone node has no Raft consensus".to_string(),
226            )),
227            ClusterInner::Raft { raft, .. } => {
228                let cmd_bytes = ClusterCommand::PlaceMerge {
229                    left_shard_id,
230                    right_shard_id,
231                }
232                .encode();
233                let cmd = Command::new(cmd_bytes);
234                raft.propose(cmd).map_err(|e| match &e {
235                    RaftError::NotLeader { leader_id } => ServerError::Cluster(format!(
236                        "NotLeader: current leader is {:?}",
237                        leader_id
238                    )),
239                    _ => ServerError::Cluster(format!("Raft propose error: {}", e)),
240                })
241            }
242        }
243    }
244
245    /// Propose a shard transfer via Raft (leader only).
246    ///
247    /// Returns `Err(ServerError::Cluster("NotLeader: ..."))` if this node is
248    /// not the current Raft leader.
249    #[cfg(feature = "cluster")]
250    pub fn propose_transfer(
251        &self,
252        shard_id: ShardId,
253        from_node: NodeId,
254        to_node: NodeId,
255    ) -> ServerResult<LogIndex> {
256        match &self.inner {
257            ClusterInner::Standalone => Err(ServerError::Cluster(
258                "NotLeader: standalone node has no Raft consensus".to_string(),
259            )),
260            ClusterInner::Raft { raft, .. } => {
261                let cmd_bytes = ClusterCommand::PlaceTransfer {
262                    shard_id,
263                    from_node,
264                    to_node,
265                }
266                .encode();
267                let cmd = Command::new(cmd_bytes);
268                raft.propose(cmd).map_err(|e| match &e {
269                    RaftError::NotLeader { leader_id } => ServerError::Cluster(format!(
270                        "NotLeader: current leader is {:?}",
271                        leader_id
272                    )),
273                    _ => ServerError::Cluster(format!("Raft propose error: {}", e)),
274                })
275            }
276        }
277    }
278}
279
280// ─── Unit tests ──────────────────────────────────────────────────────────────
281
#[cfg(test)]
mod tests {
    use super::*;

    /// A standalone handle claims leadership and echoes back its node ID.
    #[tokio::test]
    async fn test_standalone_handle_is_leader() {
        let standalone = ClusterHandle::start_standalone(1).await.expect("standalone");

        let leading = standalone.is_leader();
        assert!(leading, "standalone should always report is_leader");
        assert_eq!(standalone.node_id(), 1);
    }

    /// A standalone handle tracks no shards.
    #[tokio::test]
    async fn test_standalone_shard_count_is_zero() {
        let standalone = ClusterHandle::start_standalone(7).await.expect("standalone");

        assert_eq!(standalone.shard_count(), 0);
    }

    /// Full Raft cluster smoke test — requires the `cluster` feature and 3 peers.
    #[cfg(feature = "cluster")]
    #[tokio::test]
    async fn test_cluster_start_three_node() {
        // (node id, loopback address, label used if the address fails to parse)
        let members = [
            (1u64, "127.0.0.1:17878", "addr1"),
            (2u64, "127.0.0.1:17879", "addr2"),
            (3u64, "127.0.0.1:17880", "addr3"),
        ];
        let peers: Vec<(u64, std::net::SocketAddr)> = members
            .iter()
            .map(|&(id, addr, label)| (id, addr.parse().expect(label)))
            .collect();

        let clustered = ClusterHandle::start(1, peers).await.expect("start cluster");

        // Only checks that querying leadership does not panic.
        let _ = clustered.is_leader();
        assert_eq!(clustered.shard_count(), 0);
        assert_eq!(clustered.node_id(), 1);
    }

    /// Standalone propose_split returns an error (no Raft).
    #[cfg(feature = "cluster")]
    #[tokio::test]
    async fn test_standalone_propose_split_is_error() {
        let standalone = ClusterHandle::start_standalone(1).await.expect("standalone");

        let split_key = amaters_core::Key::from_slice(&[0x80]);
        let outcome = standalone.propose_split(1, split_key);

        assert!(outcome.is_err(), "standalone propose_split must return an error");
    }
}
337
338// ─── S4/S5/S6: In-process cluster test harness ───────────────────────────────
339
#[cfg(all(test, feature = "cluster"))]
mod cluster_tests {
    use super::*;
    use amaters_cluster::{PlacementStateMachine, RaftConfig, RaftNode, ShardRegistry};

    /// In-process 3-node Raft cluster for deterministic testing.
    ///
    /// No network sockets — messages are routed by [`TestCluster::pump`].
    struct TestCluster {
        /// One Raft node per member, in the order IDs 1, 2, 3.
        nodes: Vec<Arc<RaftNode>>,
        /// Registry backing each node's placement state machine (same order
        /// as `nodes`).
        // Not read by any test yet, hence the lint exemption.
        #[allow(dead_code)]
        registries: Vec<Arc<ShardRegistry>>,
    }

    impl TestCluster {
        /// Build three nodes (IDs 1, 2, 3) that all know the full membership,
        /// each with its own registry-backed placement state machine.
        fn new_three_node() -> Self {
            let peer_ids = vec![1u64, 2, 3];

            let (nodes, registries): (Vec<_>, Vec<_>) = peer_ids
                .iter()
                .map(|&id| {
                    let registry = Arc::new(ShardRegistry::new());
                    let sm = PlacementStateMachine::new(Arc::clone(&registry));
                    let config = RaftConfig::new(id, peer_ids.clone());
                    let node = RaftNode::new(config).expect("create node");
                    node.set_state_machine(sm).expect("set sm");
                    (Arc::new(node), registry)
                })
                .unzip();

            Self { nodes, registries }
        }

        /// Pump all pending messages to quiescence (max `rounds` iterations).
        ///
        /// Each round asks every node for its outbound replication requests,
        /// delivers each to the addressed node, and feeds the response back to
        /// the sender. Requests addressed to an unknown node ID are dropped.
        ///
        /// Returns the 1-based number of the first round in which nothing was
        /// delivered, or `rounds` if the cluster never went quiet.
        fn pump(&self, rounds: usize) -> usize {
            for round in 0..rounds {
                let mut any_sent = false;

                for sender in &self.nodes {
                    for (target_id, req) in sender.replicate_to_followers() {
                        if let Some(target) = self.nodes.iter().find(|n| n.node_id() == target_id) {
                            let resp = target.handle_append_entries(req);
                            // The sender's verdict on the response is irrelevant
                            // to the pump; it only shuttles messages.
                            let _ = sender.handle_replication_response(target_id, resp);
                            any_sent = true;
                        }
                    }
                }

                if !any_sent {
                    return round + 1;
                }
            }
            rounds
        }

        /// Find the current leader (if any).
        // No test calls this yet; kept as part of the harness.
        #[allow(dead_code)]
        fn find_leader(&self) -> Option<&Arc<RaftNode>> {
            self.nodes.iter().find(|n| n.is_leader())
        }

        /// Get the commit index on node at `idx`.
        ///
        /// Panics if `idx` is out of range for `nodes`.
        fn commit_index(&self, idx: usize) -> u64 {
            self.nodes[idx].commit_index()
        }
    }

    // ─── S5: Consensus tests ─────────────────────────────────────────────────

    #[test]
    fn test_leader_election_three_node() {
        let cluster = TestCluster::new_three_node();

        // Pump messages so any initial state is settled.  After pumping,
        // the invariant is that *at most one* node reports is_leader() — the
        // consensus safety property.  Fresh nodes start as Follower so there
        // may be zero leaders until an election completes.
        cluster.pump(50);

        let leader_count = cluster.nodes.iter().filter(|n| n.is_leader()).count();
        assert!(
            leader_count <= 1,
            "At most one leader should exist; found {}",
            leader_count
        );
    }

    #[test]
    fn test_multi_node_replication() {
        let cluster = TestCluster::new_three_node();

        // Fresh 3-node cluster: no elections have run yet.
        // replicate_to_followers must not panic on a non-leader.
        for node in cluster.nodes.iter().filter(|n| !n.is_leader()) {
            let _ = node.replicate_to_followers();
        }

        // Snapshot every commit index so the monotonicity claim below is
        // actually checked rather than merely exercised.
        let before: Vec<u64> = (0..cluster.nodes.len())
            .map(|idx| cluster.commit_index(idx))
            .collect();

        // Pump to drive any pending state transitions.
        cluster.pump(10);

        // The key invariant: commit_index is monotone (must not decrease).
        for (idx, &previous) in before.iter().enumerate() {
            let current = cluster.commit_index(idx);
            assert!(
                current >= previous,
                "commit_index on node #{} went backwards: {} -> {}",
                idx,
                previous,
                current
            );
        }
    }

    #[test]
    fn test_read_your_writes_leader_routed() {
        // Tests the propose → replicate → commit flow.
        //
        // Since fresh nodes start as Followers, propose() will return
        // RaftError::NotLeader on all of them until an election completes.
        // This test verifies the entire path does not panic and that
        // error handling for NotLeader is correct.

        let cluster = TestCluster::new_three_node();

        for node in &cluster.nodes {
            match node.propose(Command::new(vec![0u8])) {
                Ok(_index) => {
                    // Leader accepted the command — pump to replicate.
                    cluster.pump(20);
                    // After pump, commit_index must not panic.
                    let _ = node.commit_index();
                }
                Err(amaters_cluster::RaftError::NotLeader { .. }) => {
                    // Expected: fresh node is a follower.
                }
                Err(e) => panic!("Unexpected error: {:?}", e),
            }
        }
        // Whether or not a leader was found, reaching this point means the
        // cluster did not panic.
    }

    // ─── S6: #[ignore] stubs ─────────────────────────────────────────────────

    #[test]
    #[ignore = "needs live gRPC Raft transport — no socket transport is wired; cross-process replication requires the Phase 8 transport layer"]
    fn test_cross_process_replication_via_grpc() {
        // Future: once gRPC transport is wired in server.rs / service.rs,
        // this test will spin up a real 3-node cluster over loopback gRPC
        // and verify that propose on one process is visible on the others.
        unimplemented!()
    }

    #[test]
    #[ignore = "needs ReadIndex/quorum-confirmed linearizable read — RaftNode has no read_index() API; see TODO for quorum reads"]
    fn test_quorum_linearizable_read() {
        // Future: implement read_index() on RaftNode (appends a no-op, waits
        // for quorum confirmation, then reads from state machine).
        unimplemented!()
    }
}