1use std::sync::Arc;
15
16use crate::server::{ServerError, ServerResult};
17
18#[cfg(feature = "cluster")]
21use amaters_cluster::{
22 ClusterCommand, Command, LogIndex, NodeId, PlacementStateMachine, RaftConfig, RaftError,
23 RaftNode, ShardId, ShardMetadata, ShardRegistry,
24};
25
/// Backing mode of a [`ClusterHandle`].
///
/// The `Raft` variant only exists when the crate is built with the `cluster`
/// feature; without it, `Standalone` is the sole variant.
enum ClusterInner {
    /// No consensus layer: the handle holds nothing beyond its node id.
    Standalone,

    /// Raft-backed mode.
    #[cfg(feature = "cluster")]
    Raft {
        /// The Raft node that proposals are submitted to.
        raft: Arc<RaftNode>,
        /// Shard registry shared with the placement state machine.
        registry: Arc<ShardRegistry>,
    },
}
40
41pub struct ClusterHandle {
48 node_id: u64,
50 inner: ClusterInner,
51}
52
53impl ClusterHandle {
54 #[cfg(feature = "cluster")]
61 pub async fn start(
62 node_id: u64,
63 peers: Vec<(u64, std::net::SocketAddr)>,
64 ) -> ServerResult<Self> {
65 let registry = Arc::new(ShardRegistry::new());
66
67 let sm = PlacementStateMachine::new(Arc::clone(®istry));
69
70 let mut peer_ids: Vec<u64> = peers.iter().map(|(id, _)| *id).collect();
72 if !peer_ids.contains(&node_id) {
73 peer_ids.push(node_id);
74 }
75
76 let config = RaftConfig::new(node_id, peer_ids);
77
78 let raft = RaftNode::new(config)
79 .map_err(|e| ServerError::Cluster(format!("Failed to create RaftNode: {}", e)))?;
80
81 raft.set_state_machine(sm)
82 .map_err(|e| ServerError::Cluster(format!("Failed to set state machine: {}", e)))?;
83
84 let raft = Arc::new(raft);
85
86 Ok(Self {
87 node_id,
88 inner: ClusterInner::Raft { raft, registry },
89 })
90 }
91
92 pub async fn start_standalone(node_id: u64) -> ServerResult<Self> {
97 Ok(Self {
98 node_id,
99 inner: ClusterInner::Standalone,
100 })
101 }
102
103 pub fn is_leader(&self) -> bool {
105 match &self.inner {
106 ClusterInner::Standalone => true,
107 #[cfg(feature = "cluster")]
108 ClusterInner::Raft { raft, .. } => raft.is_leader(),
109 }
110 }
111
112 pub fn node_id(&self) -> u64 {
114 self.node_id
115 }
116
117 pub fn shard_count(&self) -> usize {
121 match &self.inner {
122 ClusterInner::Standalone => 0,
123 #[cfg(feature = "cluster")]
124 ClusterInner::Raft { registry, .. } => registry.count(),
125 }
126 }
127
128 #[cfg(feature = "cluster")]
130 pub fn raft_node(&self) -> Option<Arc<RaftNode>> {
131 match &self.inner {
132 ClusterInner::Raft { raft, .. } => Some(Arc::clone(raft)),
133 ClusterInner::Standalone => None,
134 }
135 }
136
137 #[cfg(feature = "cluster")]
139 pub fn shard_registry(&self) -> Option<Arc<ShardRegistry>> {
140 match &self.inner {
141 ClusterInner::Raft { registry, .. } => Some(Arc::clone(registry)),
142 ClusterInner::Standalone => None,
143 }
144 }
145
146 #[cfg(feature = "cluster")]
152 pub fn list_shards(&self) -> Vec<ShardMetadata> {
153 match &self.inner {
154 ClusterInner::Standalone => vec![],
155 ClusterInner::Raft { registry, .. } => registry.get_all(),
156 }
157 }
158
159 #[cfg(feature = "cluster")]
163 pub fn shards_on_node(&self, node_id: NodeId) -> Vec<ShardMetadata> {
164 match &self.inner {
165 ClusterInner::Standalone => vec![],
166 ClusterInner::Raft { registry, .. } => registry.get_by_node(node_id),
167 }
168 }
169
170 #[cfg(feature = "cluster")]
174 pub fn find_shard_for_key(&self, key: &amaters_core::Key) -> Option<ShardMetadata> {
175 match &self.inner {
176 ClusterInner::Standalone => None,
177 ClusterInner::Raft { registry, .. } => registry.find_shard_for_key(key),
178 }
179 }
180
181 #[cfg(feature = "cluster")]
186 pub fn propose_split(
187 &self,
188 shard_id: ShardId,
189 split_key: amaters_core::Key,
190 ) -> ServerResult<LogIndex> {
191 match &self.inner {
192 ClusterInner::Standalone => Err(ServerError::Cluster(
193 "NotLeader: standalone node has no Raft consensus".to_string(),
194 )),
195 ClusterInner::Raft { raft, .. } => {
196 let cmd_bytes = ClusterCommand::PlaceSplit {
197 shard_id,
198 split_key: split_key.as_bytes().to_vec(),
199 }
200 .encode();
201 let cmd = Command::new(cmd_bytes);
202 raft.propose(cmd).map_err(|e| match &e {
203 RaftError::NotLeader { leader_id } => ServerError::Cluster(format!(
204 "NotLeader: current leader is {:?}",
205 leader_id
206 )),
207 _ => ServerError::Cluster(format!("Raft propose error: {}", e)),
208 })
209 }
210 }
211 }
212
213 #[cfg(feature = "cluster")]
218 pub fn propose_merge(
219 &self,
220 left_shard_id: ShardId,
221 right_shard_id: ShardId,
222 ) -> ServerResult<LogIndex> {
223 match &self.inner {
224 ClusterInner::Standalone => Err(ServerError::Cluster(
225 "NotLeader: standalone node has no Raft consensus".to_string(),
226 )),
227 ClusterInner::Raft { raft, .. } => {
228 let cmd_bytes = ClusterCommand::PlaceMerge {
229 left_shard_id,
230 right_shard_id,
231 }
232 .encode();
233 let cmd = Command::new(cmd_bytes);
234 raft.propose(cmd).map_err(|e| match &e {
235 RaftError::NotLeader { leader_id } => ServerError::Cluster(format!(
236 "NotLeader: current leader is {:?}",
237 leader_id
238 )),
239 _ => ServerError::Cluster(format!("Raft propose error: {}", e)),
240 })
241 }
242 }
243 }
244
245 #[cfg(feature = "cluster")]
250 pub fn propose_transfer(
251 &self,
252 shard_id: ShardId,
253 from_node: NodeId,
254 to_node: NodeId,
255 ) -> ServerResult<LogIndex> {
256 match &self.inner {
257 ClusterInner::Standalone => Err(ServerError::Cluster(
258 "NotLeader: standalone node has no Raft consensus".to_string(),
259 )),
260 ClusterInner::Raft { raft, .. } => {
261 let cmd_bytes = ClusterCommand::PlaceTransfer {
262 shard_id,
263 from_node,
264 to_node,
265 }
266 .encode();
267 let cmd = Command::new(cmd_bytes);
268 raft.propose(cmd).map_err(|e| match &e {
269 RaftError::NotLeader { leader_id } => ServerError::Cluster(format!(
270 "NotLeader: current leader is {:?}",
271 leader_id
272 )),
273 _ => ServerError::Cluster(format!("Raft propose error: {}", e)),
274 })
275 }
276 }
277 }
278}
279
#[cfg(test)]
mod tests {
    use super::*;

    /// A standalone handle reports leadership and returns the id it was
    /// built with.
    #[tokio::test]
    async fn test_standalone_handle_is_leader() {
        let standalone = ClusterHandle::start_standalone(1).await;
        let standalone = standalone.expect("standalone");

        let leads = standalone.is_leader();
        assert!(leads, "standalone should always report is_leader");
        assert_eq!(standalone.node_id(), 1);
    }

    /// A standalone handle reports zero shards.
    #[tokio::test]
    async fn test_standalone_shard_count_is_zero() {
        let standalone = ClusterHandle::start_standalone(7).await;
        let standalone = standalone.expect("standalone");

        let shards = standalone.shard_count();
        assert_eq!(shards, 0);
    }

    /// Starting a three-peer Raft handle succeeds with an empty registry.
    #[cfg(feature = "cluster")]
    #[tokio::test]
    async fn test_cluster_start_three_node() {
        let first: std::net::SocketAddr = "127.0.0.1:17878".parse().expect("addr1");
        let second: std::net::SocketAddr = "127.0.0.1:17879".parse().expect("addr2");
        let third: std::net::SocketAddr = "127.0.0.1:17880".parse().expect("addr3");
        let membership = vec![(1, first), (2, second), (3, third)];

        let clustered = ClusterHandle::start(1, membership)
            .await
            .expect("start cluster");

        // Leadership is not asserted here; the call only has to complete.
        let _ = clustered.is_leader();
        assert_eq!(clustered.shard_count(), 0);
        assert_eq!(clustered.node_id(), 1);
    }

    /// Proposing a split on a standalone handle is rejected.
    #[cfg(feature = "cluster")]
    #[tokio::test]
    async fn test_standalone_propose_split_is_error() {
        let standalone = ClusterHandle::start_standalone(1).await;
        let standalone = standalone.expect("standalone");

        let split_at = amaters_core::Key::from_slice(&[0x80]);
        let outcome = standalone.propose_split(1, split_at);
        assert!(
            outcome.is_err(),
            "standalone propose_split must return an error"
        );
    }
}
337
#[cfg(all(test, feature = "cluster"))]
mod cluster_tests {
    use super::*;
    use amaters_cluster::{PlacementStateMachine, RaftConfig, RaftNode, ShardRegistry};

    /// In-process group of Raft nodes whose messages are delivered by direct
    /// method calls rather than over a network.
    struct TestCluster {
        /// One Raft node per peer id, in peer-id order.
        nodes: Vec<Arc<RaftNode>>,
        // Held but not read by the current tests.
        #[allow(dead_code)]
        registries: Vec<Arc<ShardRegistry>>,
    }

    impl TestCluster {
        /// Builds three nodes with ids 1, 2 and 3, each with its own shard
        /// registry and placement state machine.
        fn new_three_node() -> Self {
            let peer_ids = vec![1u64, 2, 3];
            let mut nodes = Vec::with_capacity(peer_ids.len());
            let mut registries = Vec::with_capacity(peer_ids.len());

            for &id in &peer_ids {
                let registry = Arc::new(ShardRegistry::new());
                let sm = PlacementStateMachine::new(Arc::clone(&registry));
                let config = RaftConfig::new(id, peer_ids.clone());
                let node = RaftNode::new(config).expect("create node");
                node.set_state_machine(sm).expect("set sm");
                nodes.push(Arc::new(node));
                registries.push(registry);
            }

            Self { nodes, registries }
        }

        /// Delivers replication traffic between the nodes for up to `rounds`
        /// rounds.
        ///
        /// In each round every node is asked for its outgoing append-entries
        /// messages; each message whose target exists in this cluster is
        /// handed to that target and the response is fed back to the sender.
        ///
        /// Returns the 1-based number of the first round in which nothing
        /// was delivered, or `rounds` if every round delivered something.
        fn pump(&self, rounds: usize) -> usize {
            for round in 0..rounds {
                let mut any_sent = false;

                for sender in &self.nodes {
                    let messages = sender.replicate_to_followers();

                    for (target_id, req) in messages {
                        // Messages addressed to unknown ids are dropped.
                        if let Some(target) = self.nodes.iter().find(|n| n.node_id() == target_id) {
                            let resp = target.handle_append_entries(req);
                            // The sender's reaction to the response is not
                            // checked here; only delivery is.
                            let _ = sender.handle_replication_response(target_id, resp);
                            any_sent = true;
                        }
                    }
                }

                if !any_sent {
                    return round + 1;
                }
            }
            rounds
        }

        /// Returns the first node that reports itself as leader, if any.
        // Not called by the current tests.
        #[allow(dead_code)]
        fn find_leader(&self) -> Option<&Arc<RaftNode>> {
            self.nodes.iter().find(|n| n.is_leader())
        }

        /// Returns the commit index of the node at position `idx`.
        ///
        /// Panics if `idx` is out of range.
        fn commit_index(&self, idx: usize) -> u64 {
            self.nodes[idx].commit_index()
        }
    }

    /// After pumping messages, no more than one node may claim leadership.
    #[test]
    fn test_leader_election_three_node() {
        let cluster = TestCluster::new_three_node();

        cluster.pump(50);

        let leaders: Vec<_> = cluster.nodes.iter().filter(|n| n.is_leader()).collect();
        assert!(
            leaders.len() <= 1,
            "At most one leader should exist; found {}",
            leaders.len()
        );
    }

    /// Smoke test: replication calls and commit-index reads complete without
    /// panicking. No values are asserted.
    #[test]
    fn test_multi_node_replication() {
        let cluster = TestCluster::new_three_node();

        for node in &cluster.nodes {
            if !node.is_leader() {
                // Only exercised for the absence of a panic.
                let _ = node.replicate_to_followers();
            }
        }

        cluster.pump(10);

        for idx in 0..cluster.nodes.len() {
            let _ = cluster.commit_index(idx);
        }
    }

    /// Proposes a command on every node: a node either accepts it or answers
    /// `NotLeader`; any other error fails the test.
    #[test]
    fn test_read_your_writes_leader_routed() {
        let cluster = TestCluster::new_three_node();

        let mut leader_found = false;
        for node in &cluster.nodes {
            let cmd = Command::new(vec![0u8]);
            match node.propose(cmd) {
                Ok(_index) => {
                    leader_found = true;
                    cluster.pump(20);
                    let _ = node.commit_index();
                }
                // Expected for every node that is not the leader.
                Err(amaters_cluster::RaftError::NotLeader { .. }) => {}
                Err(e) => panic!("Unexpected error: {:?}", e),
            }
        }
        // Whether a leader exists is intentionally not asserted.
        let _ = leader_found;
    }

    /// Placeholder: ignored until a network transport exists.
    #[test]
    #[ignore = "needs live gRPC Raft transport — no socket transport is wired; cross-process replication requires the Phase 8 transport layer"]
    fn test_cross_process_replication_via_grpc() {
        unimplemented!()
    }

    /// Placeholder: ignored until quorum-confirmed reads exist.
    #[test]
    #[ignore = "needs ReadIndex/quorum-confirmed linearizable read — RaftNode has no read_index() API; see TODO for quorum reads"]
    fn test_quorum_linearizable_read() {
        unimplemented!()
    }
}