smg_mesh/service.rs

use std::{
    collections::{BTreeMap, HashMap},
    net::SocketAddr,
    str::FromStr,
    sync::Arc,
    time::Duration,
};

use anyhow::Result;
use parking_lot::RwLock;
use tokio::sync::watch;
use tonic::{
    transport::{ClientTlsConfig, Endpoint},
    Request,
};
use tracing as log;

use crate::flow_control::MAX_MESSAGE_SIZE;

pub mod gossip {
    #![allow(unused_qualifications, clippy::absolute_paths)]
    #![allow(clippy::trivially_copy_pass_by_ref, clippy::allow_attributes)]
    tonic::include_proto!("mesh.gossip");
}
use gossip::{
    gossip_client, gossip_message, GossipMessage, NodeState, NodeStatus, NodeUpdate, Ping,
    StateSync,
};

use crate::{
    controller::MeshController,
    mtls::{MTLSConfig, MTLSManager},
    node_state_machine::{ConvergenceConfig, NodeStateMachine},
    partition::PartitionDetector,
    ping_server::GossipService,
    stores::{AppState, StateStores},
    sync::MeshSyncManager,
};

pub type ClusterState = Arc<RwLock<BTreeMap<String, NodeState>>>;

pub struct MeshServerConfig {
    pub self_name: String,
    pub bind_addr: SocketAddr,
    pub advertise_addr: SocketAddr,
    pub init_peer: Option<SocketAddr>,
    pub mtls_config: Option<MTLSConfig>,
}

/// Handle for interacting with a running mesh server.
/// Provides basic node-management operations such as shutdown,
/// node discovery (TODO), and node status updates (TODO).
pub struct MeshServerHandler {
    pub state: ClusterState,
    pub stores: Arc<StateStores>,
    pub sync_manager: Arc<MeshSyncManager>,
    pub self_name: String,
    _self_addr: SocketAddr,
    signal_tx: watch::Sender<bool>,
    partition_detector: Option<Arc<PartitionDetector>>,
    state_machine: Option<Arc<NodeStateMachine>>,
    rate_limit_task_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
}

impl MeshServerHandler {
    /// Get partition detector
    pub fn partition_detector(&self) -> Option<&Arc<PartitionDetector>> {
        self.partition_detector.as_ref()
    }

    /// Get state machine
    pub fn state_machine(&self) -> Option<&Arc<NodeStateMachine>> {
        self.state_machine.as_ref()
    }

    /// Check if node is ready
    pub fn is_ready(&self) -> bool {
        self.state_machine
            .as_ref()
            .map(|sm| sm.is_ready())
            .unwrap_or(true) // If no state machine, consider ready
    }

    /// Check if we should serve (have quorum)
    pub fn should_serve(&self) -> bool {
        self.partition_detector
            .as_ref()
            .map(|pd| pd.should_serve())
            .unwrap_or(true) // If no partition detector, consider should serve
    }

    /// Start rate limit window reset task
    /// This task will periodically reset the global rate limit counter
    pub fn start_rate_limit_task(&self, window_seconds: u64) {
        use crate::rate_limit_window::RateLimitWindow;

        let window_manager = RateLimitWindow::new(self.sync_manager.clone(), window_seconds);
        let shutdown_rx = self.signal_tx.subscribe();

        #[expect(
            clippy::disallowed_methods,
            reason = "handle is stored in rate_limit_task_handle and awaited on shutdown via stop_rate_limit_task"
        )]
        let handle = tokio::spawn(async move {
            window_manager.start_reset_task(shutdown_rx).await;
        });

        if let Ok(mut task_handle) = self.rate_limit_task_handle.lock() {
            *task_handle = Some(handle);
        }
    }

    /// Stop rate limit window reset task
    pub fn stop_rate_limit_task(&self) {
        self.signal_tx.send(true).ok();
        if let Ok(mut task_handle) = self.rate_limit_task_handle.lock() {
            if let Some(handle) = task_handle.take() {
                #[expect(
                    clippy::disallowed_methods,
                    reason = "short-lived join task that awaits the rate_limit_task handle during shutdown; completes when the inner task finishes"
                )]
                tokio::spawn(async move {
                    if let Err(err) = handle.await {
                        log::warn!("Rate limit task shutdown failed: {}", err);
                    }
                });
            }
        }
    }

    /// Shutdown immediately without graceful shutdown
    pub fn shutdown(&self) {
        self.stop_rate_limit_task();
    }

    /// Graceful shutdown: broadcast LEAVING status to all alive nodes,
    /// wait for propagation, then shutdown
    pub async fn graceful_shutdown(&self) -> Result<()> {
        log::info!("Graceful shutdown for node {}", self.self_name);

        let maybe_leaving = {
            let state = self.state.read();

            if let Some(self_node) = state.get(&self.self_name) {
                let mut self_node = self_node.clone();
                if self_node.status == NodeStatus::Leaving as i32 {
                    None
                } else {
                    self_node.status = NodeStatus::Leaving as i32;
                    self_node.version += 1;

                    let alive_nodes = state
                        .values()
                        .filter(|node| {
                            node.status == NodeStatus::Alive as i32 && node.name != self.self_name
                            // exclude self from broadcast targets
                        })
                        .cloned()
                        .collect::<Vec<NodeState>>();

                    Some((self_node, alive_nodes))
                }
            } else {
                None
            }
        };
        let (leaving_node, alive_nodes) = match maybe_leaving {
            Some(values) => values,
            None => {
                self.stop_rate_limit_task();
                return Ok(());
            }
        };

        log::info!(
            "Broadcasting LEAVING status to {} alive nodes",
            alive_nodes.len()
        );

        // Broadcast LEAVING status to all alive nodes
        let (success_count, total_count) = broadcast_node_states(
            vec![leaving_node],
            alive_nodes,
            Some(Duration::from_secs(3)),
        )
        .await;

        log::info!(
            "Broadcast LEAVING status: {}/{} successful",
            success_count,
            total_count
        );

        // Wait a bit more for state propagation
        let propagation_delay = Duration::from_secs(1);
        log::info!(
            "Waiting {} seconds for LEAVING status propagation",
            propagation_delay.as_secs()
        );
        tokio::time::sleep(propagation_delay).await;

        log::info!("Stopping rate limit task and signaling shutdown");
        self.stop_rate_limit_task();
        Ok(())
    }

    /// Calculate the next version for a key
    /// If the key exists, increment its version by 1
    /// If the key doesn't exist, start with version 1
    fn next_version(&self, key: &str) -> u64 {
        self.stores
            .app
            .get(key)
            .map(|app_state| app_state.version + 1)
            .unwrap_or(1)
    }

    pub fn write_data(&self, key: String, value: Vec<u8>) -> Result<()> {
        // Keep app store write and metadata/version update in one lock scope.
        let mut state = self.state.write();
        let node = state.get_mut(&self.self_name).ok_or_else(|| {
            anyhow::anyhow!(
                "Node {} not found in cluster state during write_data",
                self.self_name
            )
        })?;

        let version = self.next_version(&key);
        let app_state = AppState {
            key: key.clone(),
            value: value.clone(),
            version,
        };
        self.stores
            .app
            .insert(key.clone(), app_state)
            .map_err(|err| anyhow::anyhow!("Failed to persist app state for key {key}: {err}"))?;

        node.metadata.insert(key, value);
        node.version += 1;
        Ok(())
    }

    pub fn read_data(&self, key: String) -> Option<Vec<u8>> {
        // Read from the app store
        self.stores
            .app
            .get(&key)
            .map(|app_state| app_state.value.clone())
    }

    /// Get operation log of the app store for synchronization
    /// Returns an operation log that can be merged into other nodes
    pub fn get_operation_log(&self) -> crate::crdt_kv::OperationLog {
        self.stores.app.get_operation_log()
    }

    /// Sync app store data from an operation log (for testing and manual sync)
    /// This will be replaced by automatic sync stream in the future
    pub fn sync_app_from_log(&self, log: &crate::crdt_kv::OperationLog) {
        // Merge operation log into our app store using CRDT merge
        self.stores.app.merge(log);
    }
}
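
// Illustrative usage sketch (not part of the original file): one handler writes
// a key, then its operation log is replayed into a second handler through the
// CRDT merge path above. The `example_manual_sync` name is hypothetical.
#[expect(dead_code, reason = "usage sketch, not called by the crate")]
fn example_manual_sync(source: &MeshServerHandler, target: &MeshServerHandler) -> Result<()> {
    // Write on the source node; this bumps both the app-store version
    // and the node's gossip version.
    source.write_data("greeting".to_string(), b"hello".to_vec())?;

    // Ship the source's operation log to the target and merge it.
    let op_log = source.get_operation_log();
    target.sync_app_from_log(&op_log);

    assert_eq!(
        target.read_data("greeting".to_string()).as_deref(),
        Some(b"hello".as_slice())
    );
    Ok(())
}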

pub struct MeshServerBuilder {
    state: ClusterState,
    stores: Arc<StateStores>,
    self_name: String,
    bind_addr: SocketAddr,
    advertise_addr: SocketAddr,
    init_peer: Option<SocketAddr>,
    mtls_manager: Option<Arc<MTLSManager>>,
}

impl MeshServerBuilder {
    pub fn new(
        self_name: String,
        bind_addr: SocketAddr,
        advertise_addr: SocketAddr,
        init_peer: Option<SocketAddr>,
    ) -> Self {
        let state = Arc::new(RwLock::new(BTreeMap::from([(
            self_name.clone(),
            NodeState {
                name: self_name.clone(),
                address: advertise_addr.to_string(),
                status: NodeStatus::Alive as i32,
                version: 1,
                metadata: HashMap::new(),
            },
        )])));
        let stores = Arc::new(StateStores::with_self_name(self_name.clone()));
        Self {
            state,
            stores,
            self_name,
            bind_addr,
            advertise_addr,
            init_peer,
            mtls_manager: None,
        }
    }

    pub fn with_mtls(mut self, mtls_config: MTLSConfig) -> Self {
        self.mtls_manager = Some(Arc::new(MTLSManager::new(mtls_config)));
        self
    }

    pub fn build(&self) -> (MeshServer, MeshServerHandler) {
        let (signal_tx, signal_rx) = watch::channel(false);
        let partition_detector = Arc::new(PartitionDetector::default());
        let sync_manager = Arc::new(MeshSyncManager::new(
            self.stores.clone(),
            self.self_name.clone(),
        ));
        let state_machine = Arc::new(NodeStateMachine::new(
            self.stores.clone(),
            ConvergenceConfig::default(),
        ));
        // Initialize rate-limit hash ring with current membership
        sync_manager.update_rate_limit_membership();
        (
            MeshServer {
                state: self.state.clone(),
                stores: self.stores.clone(),
                sync_manager: sync_manager.clone(),
                self_name: self.self_name.clone(),
                bind_addr: self.bind_addr,
                advertise_addr: self.advertise_addr,
                init_peer: self.init_peer,
                signal_rx,
                partition_detector: Some(partition_detector.clone()),
                mtls_manager: self.mtls_manager.clone(),
            },
            MeshServerHandler {
                state: self.state.clone(),
                stores: self.stores.clone(),
                sync_manager,
                self_name: self.self_name.clone(),
                _self_addr: self.advertise_addr,
                signal_tx,
                partition_detector: Some(partition_detector),
                state_machine: Some(state_machine),
                rate_limit_task_handle: std::sync::Mutex::new(None),
            },
        )
    }
}

impl From<&MeshServerConfig> for MeshServerBuilder {
    fn from(value: &MeshServerConfig) -> Self {
        let mut builder = MeshServerBuilder::new(
            value.self_name.clone(),
            value.bind_addr,
            value.advertise_addr,
            value.init_peer,
        );
        if let Some(mtls_config) = &value.mtls_config {
            builder = builder.with_mtls(mtls_config.clone());
        }
        builder
    }
}
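
// Illustrative usage sketch (not part of the original file): building a server
// from a `MeshServerConfig` via the `From` impl above. The addresses and the
// `example_build_from_config` name are hypothetical.
#[expect(dead_code, reason = "usage sketch, not called by the crate")]
fn example_build_from_config() -> Result<()> {
    let config = MeshServerConfig {
        self_name: "node-a".to_string(),
        bind_addr: SocketAddr::from_str("0.0.0.0:7946")?,
        advertise_addr: SocketAddr::from_str("10.0.0.1:7946")?,
        init_peer: None,
        mtls_config: None,
    };
    // `build` returns the server (to run) and the handler (to query/control it).
    let (_server, _handler) = MeshServerBuilder::from(&config).build();
    Ok(())
}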

pub struct MeshServer {
    state: ClusterState,
    stores: Arc<StateStores>,
    sync_manager: Arc<MeshSyncManager>,
    self_name: String,
    bind_addr: SocketAddr,
    advertise_addr: SocketAddr,
    init_peer: Option<SocketAddr>,
    signal_rx: watch::Receiver<bool>,
    partition_detector: Option<Arc<PartitionDetector>>,
    mtls_manager: Option<Arc<MTLSManager>>,
}

impl MeshServer {
    fn build_ping_server(&self) -> GossipService {
        GossipService::new(
            self.state.clone(),
            self.bind_addr,
            self.advertise_addr,
            &self.self_name,
        )
    }

    fn build_controller(&self) -> MeshController {
        MeshController::new(
            self.state.clone(),
            self.advertise_addr,
            &self.self_name,
            self.init_peer,
            self.stores.clone(),
            self.sync_manager.clone(),
            self.mtls_manager.clone(),
        )
    }

    pub async fn start(self) -> Result<()> {
        self.start_inner(None).await
    }

    pub async fn start_with_listener(self, listener: tokio::net::TcpListener) -> Result<()> {
        let bound_addr = listener
            .local_addr()
            .map_err(|e| anyhow::anyhow!("Failed to read listener local addr: {e}"))?;
        if bound_addr != self.bind_addr {
            return Err(anyhow::anyhow!(
                "Listener/bind_addr mismatch: listener={}, bind_addr={}",
                bound_addr,
                self.bind_addr
            ));
        }
        self.start_inner(Some(listener)).await
    }

    async fn start_inner(self, listener: Option<tokio::net::TcpListener>) -> Result<()> {
        log::info!(
            "Mesh server listening on {} and advertising {}",
            self.bind_addr,
            self.advertise_addr
        );
        let self_name = self.self_name.clone();
        let advertise_address = self.advertise_addr;

        #[expect(
            clippy::expect_used,
            reason = "partition_detector is always set to Some by MeshServerBuilder::build() before start() is called"
        )]
        let partition_detector = self
            .partition_detector
            .clone()
            .expect("partition detector missing");

        let mut service = self.build_ping_server();
        service = service.with_stores(self.stores.clone());

        service = service.with_sync_manager(self.sync_manager.clone());

        service = service.with_partition_detector(partition_detector);

        // Add mTLS support if configured
        if let Some(mtls_manager) = self.mtls_manager.clone() {
            service = service.with_mtls_manager(mtls_manager);
        }

        let controller = self.build_controller();

        let mut service_shutdown = self.signal_rx.clone();

        #[expect(
            clippy::disallowed_methods,
            reason = "handle is awaited immediately below via tokio::select!, bounded by shutdown signal"
        )]
        let server_handle = if let Some(tcp_listener) = listener {
            tokio::spawn(service.serve_ping_with_listener(tcp_listener, async move {
                _ = service_shutdown.changed().await;
            }))
        } else {
            tokio::spawn(service.serve_ping_with_shutdown(async move {
                _ = service_shutdown.changed().await;
            }))
        };
        tokio::time::sleep(Duration::from_secs(1)).await;
        #[expect(
            clippy::disallowed_methods,
            reason = "handle is awaited immediately below via tokio::select!, bounded by shutdown signal"
        )]
        let app_handle = tokio::spawn(controller.event_loop(self.signal_rx.clone()));

        tokio::select! {
            res = server_handle => res??,
            res = app_handle => res??,
        }

        log::info!(
            "Mesh server {} at {} is shutting down",
            self_name,
            advertise_address
        );
        Ok(())
    }
}
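
// Illustrative usage sketch (not part of the original file): binding the
// listener first so an OS-assigned port is known before the server starts.
// The `example_start` name is hypothetical.
#[expect(dead_code, reason = "usage sketch, not called by the crate")]
async fn example_start() -> Result<()> {
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
    let bind_addr = listener.local_addr()?;
    let (server, handler) =
        MeshServerBuilder::new("node-a".to_string(), bind_addr, bind_addr, None).build();
    // `start_with_listener` verifies the listener address matches `bind_addr`.
    #[expect(
        clippy::disallowed_methods,
        reason = "usage sketch: server task runs until graceful_shutdown signals it"
    )]
    tokio::spawn(server.start_with_listener(listener));
    // ... use the cluster through `handler` ...
    handler.graceful_shutdown().await
}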

/// Broadcast node state updates to target nodes
/// Returns (success_count, total_count)
pub async fn broadcast_node_states(
    nodes_to_broadcast: Vec<NodeState>,
    target_nodes: Vec<NodeState>,
    timeout: Option<Duration>,
) -> (usize, usize) {
    if nodes_to_broadcast.is_empty() || target_nodes.is_empty() {
        log::debug!(
            "Nothing to broadcast: nodes_to_broadcast={}, target_nodes={}",
            nodes_to_broadcast.len(),
            target_nodes.len()
        );
        return (0, target_nodes.len());
    }

    let mut broadcast_tasks = Vec::new();
    for target_node in &target_nodes {
        let target_node_clone = target_node.clone();
        let nodes_for_task = nodes_to_broadcast.clone();
        #[expect(
            clippy::disallowed_methods,
            reason = "broadcast tasks are collected and awaited via join_all with a timeout immediately below"
        )]
        let task = tokio::spawn(async move {
            let state_sync = StateSync {
                nodes: nodes_for_task,
            };
            let ping_payload = gossip_message::Payload::Ping(Ping {
                state_sync: Some(state_sync),
            });
            match try_ping(&target_node_clone, Some(ping_payload), None).await {
                Ok(_) => {
                    log::debug!("Successfully broadcasted to {}", target_node_clone.name);
                    Ok(())
                }
                Err(e) => {
                    log::warn!("Failed to broadcast to {}: {}", target_node_clone.name, e);
                    Err(e)
                }
            }
        });
        broadcast_tasks.push(task);
    }

    let timeout_duration = timeout.unwrap_or(Duration::from_secs(3));
    let broadcast_result = tokio::time::timeout(timeout_duration, async {
        futures::future::join_all(broadcast_tasks).await
    })
    .await;

    match broadcast_result {
        Ok(results) => {
            let success_count = results.iter().filter(|r| matches!(r, Ok(Ok(())))).count();
            let total_count = target_nodes.len();
            log::info!(
                "Broadcast completed: {}/{} successful",
                success_count,
                total_count
            );
            (success_count, total_count)
        }
        Err(_) => {
            log::warn!(
                "Broadcast timeout after {} seconds",
                timeout_duration.as_secs()
            );
            (0, target_nodes.len())
        }
    }
}
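
// Illustrative usage sketch (not part of the original file): pushing a single
// updated node state to a list of peers with the default 3-second cap. The
// `example_broadcast` name is hypothetical.
#[expect(dead_code, reason = "usage sketch, not called by the crate")]
async fn example_broadcast(updated: NodeState, peers: Vec<NodeState>) {
    let (success, total) =
        broadcast_node_states(vec![updated], peers, Some(Duration::from_secs(3))).await;
    log::info!("Broadcast reached {}/{} peers", success, total);
}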

pub async fn try_ping(
    peer_node: &NodeState,
    payload: Option<gossip_message::Payload>,
    mtls_manager: Option<Arc<MTLSManager>>,
) -> Result<NodeUpdate, tonic::Status> {
    let peer_name = peer_node.name.clone();

    let peer_addr = SocketAddr::from_str(&peer_node.address).map_err(|e| {
        tonic::Status::invalid_argument(format!(
            "Invalid address for node {}: {}, {}",
            peer_name, peer_node.address, e
        ))
    })?;

    let connect_url = if mtls_manager.is_some() {
        format!("https://{peer_addr}")
    } else {
        format!("http://{peer_addr}")
    };

    let mut endpoint = Endpoint::from_shared(connect_url.clone())
        .map_err(|e| {
            tonic::Status::invalid_argument(format!(
                "Invalid endpoint for node {peer_name}: {connect_url}, {e}"
            ))
        })?
        .connect_timeout(Duration::from_secs(5))
        .timeout(Duration::from_secs(10));

    if let Some(mtls_manager) = mtls_manager {
        mtls_manager.load_client_config().await.map_err(|e| {
            tonic::Status::unavailable(format!(
                "Failed to load mTLS client config for {peer_name}: {e}"
            ))
        })?;

        let tls_domain = endpoint
            .uri()
            .host()
            .map(str::to_owned)
            .unwrap_or_else(|| peer_name.clone());
        let ca_certificate = mtls_manager.load_ca_certificate().await.map_err(|e| {
            tonic::Status::unavailable(format!(
                "Failed to load mTLS CA certificate for {peer_name}: {e}"
            ))
        })?;

        endpoint = endpoint
            .tls_config(
                ClientTlsConfig::new()
                    .domain_name(tls_domain)
                    .ca_certificate(ca_certificate),
            )
            .map_err(|e| {
                tonic::Status::unavailable(format!(
                    "Failed to configure TLS endpoint for {peer_name}: {e}"
                ))
            })?;
    }

    let channel = endpoint.connect().await.map_err(|e| {
        log::warn!(
            "Failed to connect to peer {} {}: {}.",
            peer_name,
            peer_addr,
            e
        );
        tonic::Status::unavailable("Failed to connect to peer")
    })?;
    let mut client = gossip_client::GossipClient::new(channel)
        .max_decoding_message_size(MAX_MESSAGE_SIZE)
        .max_encoding_message_size(MAX_MESSAGE_SIZE)
        .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
        .send_compressed(tonic::codec::CompressionEncoding::Gzip);

    let ping_message = GossipMessage { payload };
    let response = client.ping_server(Request::new(ping_message)).await?;

    Ok(response.into_inner())
}
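
// Illustrative usage sketch (not part of the original file): a plain-HTTP ping
// with no piggybacked payload. The peer address and `example_ping` name are
// hypothetical.
#[expect(dead_code, reason = "usage sketch, not called by the crate")]
async fn example_ping() -> Result<NodeUpdate, tonic::Status> {
    let peer = NodeState {
        name: "node-b".to_string(),
        address: "10.0.0.2:7946".to_string(),
        status: NodeStatus::Alive as i32,
        version: 1,
        metadata: HashMap::new(),
    };
    // No payload and no mTLS manager: connects over http:// and sends an empty
    // GossipMessage, returning the peer's advertised NodeUpdate.
    try_ping(&peer, None, None).await
}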

#[macro_export]
macro_rules! mesh_run {
    ($addr:expr, $init_peer:expr) => {{
        mesh_run!($addr.to_string(), $addr, $init_peer)
    }};

    ($name:expr, $addr:expr, $init_peer:expr) => {{
        tracing::info!("Starting mesh server: {}", $addr);
        use $crate::MeshServerBuilder;
        let (server, handler) =
            MeshServerBuilder::new($name.to_string(), $addr, $addr, $init_peer).build();
        #[expect(clippy::disallowed_methods, reason = "test macro: spawned server runs for the test lifetime and handler is returned for assertions")]
        tokio::spawn(async move {
            if let Err(e) = server.start().await {
                tracing::error!("Mesh server failed: {}", e);
            }
        });
        handler
    }};

    ($name:expr, $listener:expr, $addr:expr, $init_peer:expr) => {{
        tracing::info!("Starting mesh server: {}", $addr);
        use $crate::MeshServerBuilder;
        let (server, handler) =
            MeshServerBuilder::new($name.to_string(), $addr, $addr, $init_peer).build();
        #[expect(clippy::disallowed_methods, reason = "test macro: spawned server runs for the test lifetime and handler is returned for assertions")]
        tokio::spawn(async move {
            if let Err(e) = server.start_with_listener($listener).await {
                tracing::error!("Mesh server failed: {}", e);
            }
        });
        handler
    }};
}
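
// Illustrative usage note (not part of the original file): `mesh_run!` spawns
// the server in the background and returns the handler, so a test can stand up
// a two-node cluster in two lines. Addresses here are hypothetical:
//
//     let handler_a = mesh_run!("A", addr_a, None);
//     let handler_b = mesh_run!("B", addr_b, Some(addr_a));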

#[cfg(test)]
mod tests {
    use std::sync::Once;

    use tracing as log;
    use tracing_subscriber::{
        filter::LevelFilter, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter,
    };

    use super::*;
    use crate::tests::test_utils::{bind_node, wait_for};

    static INIT: Once = Once::new();
    fn init() {
        INIT.call_once(|| {
            let _ = tracing_subscriber::registry()
                .with(tracing_subscriber::fmt::layer())
                .with(
                    EnvFilter::builder()
                        .with_default_directive(LevelFilter::INFO.into())
                        .from_env_lossy(),
                )
                .try_init();
        });
    }

    #[tokio::test]
    async fn test_ping_advertises_configured_address() {
        init();

        let (listener, bind_addr) = bind_node().await;
        let advertise_addr = SocketAddr::from(([10, 20, 30, 40], bind_addr.port()));
        let (server, handler) =
            MeshServerBuilder::new("A".to_string(), bind_addr, advertise_addr, None).build();

        #[expect(
            clippy::disallowed_methods,
            reason = "test server runs in the background for the duration of the assertion"
        )]
        tokio::spawn(async move {
            if let Err(e) = server.start_with_listener(listener).await {
                tracing::error!("Mesh server failed: {}", e);
            }
        });

        wait_for(
            || std::net::TcpStream::connect(bind_addr).is_ok(),
            Duration::from_secs(5),
            "mesh listener started",
        )
        .await;

        let response = try_ping(
            &NodeState {
                name: "A".to_string(),
                address: bind_addr.to_string(),
                status: NodeStatus::Alive as i32,
                version: 1,
                metadata: HashMap::new(),
            },
            Some(gossip_message::Payload::Ping(Ping {
                state_sync: Some(StateSync { nodes: vec![] }),
            })),
            None,
        )
        .await
        .unwrap();

        assert_eq!(response.address, advertise_addr.to_string());
        handler.shutdown();
    }

    #[tokio::test]
    #[ignore = "SWIM failure detection for hard-shutdown nodes needs many gossip rounds; flaky under parallel CI load"]
    async fn test_state_synchronization() {
        init();
        log::info!("Starting test_state_synchronization");

        // 1. Setup A-B cluster
        let (listener_a, addr_a) = bind_node().await;
        let handler_a = mesh_run!("A", listener_a, addr_a, None);
        let (listener_b, addr_b) = bind_node().await;
        let handler_b = mesh_run!("B", listener_b, addr_b, Some(addr_a));

        wait_for(
            || handler_a.state.read().len() == 2,
            Duration::from_secs(15),
            "A-B cluster formed",
        )
        .await;

        handler_a
            .write_data("hello".into(), "world".into())
            .unwrap();

        // 2. Add C and D
        let (listener_c, addr_c) = bind_node().await;
        let handler_c = mesh_run!("C", listener_c, addr_c, Some(addr_a));
        let (listener_d, addr_d) = bind_node().await;
        let handler_d = mesh_run!("D", listener_d, addr_d, Some(addr_c));

        wait_for(
            || handler_a.state.read().len() == 4,
            Duration::from_secs(30),
            "4-node cluster formed",
        )
        .await;

        // 3. Add E, let it join, then kill it
        {
            let (listener_e, addr_e) = bind_node().await;
            let handler_e = mesh_run!("E", listener_e, addr_e, Some(addr_d));

            wait_for(
                || handler_a.state.read().len() == 5,
                Duration::from_secs(30),
                "E joined cluster",
            )
            .await;

            handler_e.shutdown();
        }

        // 4. Gracefully shutdown D
        handler_d.graceful_shutdown().await.unwrap();

        // 5. Wait for D=Leaving and E=Down (not Alive) on all remaining nodes
        let check_statuses = |handler: &MeshServerHandler| {
            let state = handler.state.read();
            let d_leaving = state
                .get("D")
                .is_some_and(|n| n.status == NodeStatus::Leaving as i32);
            let e_not_alive = state
                .get("E")
                .is_some_and(|n| n.status != NodeStatus::Alive as i32);
            d_leaving && e_not_alive
        };

        for (handler, name) in [(&handler_a, "A"), (&handler_b, "B"), (&handler_c, "C")] {
            wait_for(
                || check_statuses(handler),
                Duration::from_secs(60),
                &format!("D=Leaving, E not Alive on node {name}"),
            )
            .await;
        }

        log::info!("All nodes converged to expected state");
    }
}