1use std::{
2 collections::{BTreeMap, HashMap},
3 net::SocketAddr,
4 str::FromStr,
5 sync::Arc,
6 time::Duration,
7};
8
9use anyhow::Result;
10use parking_lot::RwLock;
11use tokio::sync::watch;
12use tonic::{
13 transport::{ClientTlsConfig, Endpoint},
14 Request,
15};
16use tracing as log;
17
18use crate::flow_control::MAX_MESSAGE_SIZE;
19
/// Generated gRPC/protobuf types for the `mesh.gossip` proto package.
///
/// The inner `allow` attributes silence lints that fire inside
/// tonic/prost-generated code rather than in hand-written sources.
pub mod gossip {
    #![allow(unused_qualifications, clippy::absolute_paths)]
    #![allow(clippy::trivially_copy_pass_by_ref, clippy::allow_attributes)]
    tonic::include_proto!("mesh.gossip");
}
25use gossip::{
26 gossip_client, gossip_message, GossipMessage, NodeState, NodeStatus, NodeUpdate, Ping,
27 StateSync,
28};
29
30use crate::{
31 controller::MeshController,
32 mtls::{MTLSConfig, MTLSManager},
33 node_state_machine::{ConvergenceConfig, NodeStateMachine},
34 partition::PartitionDetector,
35 ping_server::GossipService,
36 stores::{AppState, StateStores},
37 sync::MeshSyncManager,
38};
39
/// Shared, thread-safe cluster membership view: node name -> last known
/// `NodeState`. A `BTreeMap` keeps iteration order deterministic by name.
pub type ClusterState = Arc<RwLock<BTreeMap<String, NodeState>>>;
41
/// Static configuration for constructing a mesh server; convertible into a
/// `MeshServerBuilder` via the `From<&MeshServerConfig>` impl below.
pub struct MeshServerConfig {
    /// Unique name this node uses to identify itself in the cluster.
    pub self_name: String,
    /// Local socket address the gossip service binds to.
    pub bind_addr: SocketAddr,
    /// Address advertised to peers (may differ from `bind_addr`).
    pub advertise_addr: SocketAddr,
    /// Optional seed peer contacted on startup to join an existing cluster.
    pub init_peer: Option<SocketAddr>,
    /// When set, peers are dialed with mTLS instead of plaintext.
    pub mtls_config: Option<MTLSConfig>,
}
49
/// Application-facing handle to a running `MeshServer`: reads/writes app
/// data, inspects cluster state, and drives shutdown via `signal_tx`.
pub struct MeshServerHandler {
    /// Shared membership map (same instance the server/controller mutate).
    pub state: ClusterState,
    pub stores: Arc<StateStores>,
    pub sync_manager: Arc<MeshSyncManager>,
    /// This node's name; the key of our own entry in `state`.
    pub self_name: String,
    _self_addr: SocketAddr,
    /// Shutdown flag sender; `MeshServer` holds the matching watch receiver.
    signal_tx: watch::Sender<bool>,
    partition_detector: Option<Arc<PartitionDetector>>,
    state_machine: Option<Arc<NodeStateMachine>>,
    /// Join handle of the background rate-limit reset task, if started.
    rate_limit_task_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
}
65
66impl MeshServerHandler {
67 pub fn partition_detector(&self) -> Option<&Arc<PartitionDetector>> {
69 self.partition_detector.as_ref()
70 }
71
72 pub fn state_machine(&self) -> Option<&Arc<NodeStateMachine>> {
74 self.state_machine.as_ref()
75 }
76
77 pub fn is_ready(&self) -> bool {
79 self.state_machine
80 .as_ref()
81 .map(|sm| sm.is_ready())
82 .unwrap_or(true) }
84
85 pub fn should_serve(&self) -> bool {
87 self.partition_detector
88 .as_ref()
89 .map(|pd| pd.should_serve())
90 .unwrap_or(true) }
92
93 pub fn start_rate_limit_task(&self, window_seconds: u64) {
96 use crate::rate_limit_window::RateLimitWindow;
97
98 let window_manager = RateLimitWindow::new(self.sync_manager.clone(), window_seconds);
99 let shutdown_rx = self.signal_tx.subscribe();
100
101 #[expect(
102 clippy::disallowed_methods,
103 reason = "handle is stored in rate_limit_task_handle and awaited on shutdown via stop_rate_limit_task"
104 )]
105 let handle = tokio::spawn(async move {
106 window_manager.start_reset_task(shutdown_rx).await;
107 });
108
109 if let Ok(mut task_handle) = self.rate_limit_task_handle.lock() {
110 *task_handle = Some(handle);
111 }
112 }
113
114 pub fn stop_rate_limit_task(&self) {
116 self.signal_tx.send(true).ok();
117 if let Ok(mut task_handle) = self.rate_limit_task_handle.lock() {
118 if let Some(handle) = task_handle.take() {
119 #[expect(
120 clippy::disallowed_methods,
121 reason = "short-lived join task that awaits the rate_limit_task handle during shutdown; completes when the inner task finishes"
122 )]
123 tokio::spawn(async move {
124 if let Err(err) = handle.await {
125 log::warn!("Rate limit task shutdown failed: {}", err);
126 }
127 });
128 }
129 }
130 }
131
132 pub fn shutdown(&self) {
134 self.stop_rate_limit_task();
135 }
136
137 pub async fn graceful_shutdown(&self) -> Result<()> {
140 log::info!("Graceful shutdown for node {}", self.self_name);
141
142 let maybe_leaving = {
143 let state = self.state.read();
144
145 if let Some(self_node) = state.get(&self.self_name) {
146 let mut self_node = self_node.clone();
147 if self_node.status == NodeStatus::Leaving as i32 {
148 None
149 } else {
150 self_node.status = NodeStatus::Leaving as i32;
151 self_node.version += 1;
152
153 let alive_nodes = state
154 .values()
155 .filter(|node| {
156 node.status == NodeStatus::Alive as i32 && node.name != self.self_name
157 })
159 .cloned()
160 .collect::<Vec<NodeState>>();
161
162 Some((self_node, alive_nodes))
163 }
164 } else {
165 None
166 }
167 };
168 let (leaving_node, alive_nodes) = match maybe_leaving {
169 Some(values) => values,
170 None => {
171 self.stop_rate_limit_task();
172 return Ok(());
173 }
174 };
175
176 log::info!(
177 "Broadcasting LEAVING status to {} alive nodes",
178 alive_nodes.len()
179 );
180
181 let (success_count, total_count) = broadcast_node_states(
183 vec![leaving_node],
184 alive_nodes,
185 Some(Duration::from_secs(3)),
186 )
187 .await;
188
189 log::info!(
190 "Broadcast LEAVING status: {}/{} successful",
191 success_count,
192 total_count
193 );
194
195 let propagation_delay = Duration::from_secs(1);
197 log::info!(
198 "Waiting {} seconds for LEAVING status propagation",
199 propagation_delay.as_secs()
200 );
201 tokio::time::sleep(propagation_delay).await;
202
203 log::info!("Stopping rate limit task and signaling shutdown");
204 self.stop_rate_limit_task();
205 Ok(())
206 }
207
208 fn next_version(&self, key: &str) -> u64 {
212 self.stores
213 .app
214 .get(key)
215 .map(|app_state| app_state.version + 1)
216 .unwrap_or(1)
217 }
218
219 pub fn write_data(&self, key: String, value: Vec<u8>) -> Result<()> {
220 let mut state = self.state.write();
222 let node = state.get_mut(&self.self_name).ok_or_else(|| {
223 anyhow::anyhow!(
224 "Node {} not found in cluster state during write_data",
225 self.self_name
226 )
227 })?;
228
229 let version = self.next_version(&key);
230 let app_state = AppState {
231 key: key.clone(),
232 value: value.clone(),
233 version,
234 };
235 self.stores
236 .app
237 .insert(key.clone(), app_state)
238 .map_err(|err| anyhow::anyhow!("Failed to persist app state for key {key}: {err}"))?;
239
240 node.metadata.insert(key, value);
241 node.version += 1;
242 Ok(())
243 }
244
245 pub fn read_data(&self, key: String) -> Option<Vec<u8>> {
246 self.stores
248 .app
249 .get(&key)
250 .map(|app_state| app_state.value.clone())
251 }
252
253 pub fn get_operation_log(&self) -> crate::crdt_kv::OperationLog {
256 self.stores.app.get_operation_log()
257 }
258
259 pub fn sync_app_from_log(&self, log: &crate::crdt_kv::OperationLog) {
262 self.stores.app.merge(log);
264 }
265}
266
/// Builder that wires up the shared state and produces a paired
/// (`MeshServer`, `MeshServerHandler`) via `build`.
pub struct MeshServerBuilder {
    state: ClusterState,
    stores: Arc<StateStores>,
    self_name: String,
    bind_addr: SocketAddr,
    advertise_addr: SocketAddr,
    init_peer: Option<SocketAddr>,
    /// Set by `with_mtls`; propagated into the built server.
    mtls_manager: Option<Arc<MTLSManager>>,
}
276
277impl MeshServerBuilder {
278 pub fn new(
279 self_name: String,
280 bind_addr: SocketAddr,
281 advertise_addr: SocketAddr,
282 init_peer: Option<SocketAddr>,
283 ) -> Self {
284 let state = Arc::new(RwLock::new(BTreeMap::from([(
285 self_name.clone(),
286 NodeState {
287 name: self_name.clone(),
288 address: advertise_addr.to_string(),
289 status: NodeStatus::Alive as i32,
290 version: 1,
291 metadata: HashMap::new(),
292 },
293 )])));
294 let stores = Arc::new(StateStores::with_self_name(self_name.clone()));
295 Self {
296 state,
297 stores,
298 self_name,
299 bind_addr,
300 advertise_addr,
301 init_peer,
302 mtls_manager: None,
303 }
304 }
305
306 pub fn with_mtls(mut self, mtls_config: MTLSConfig) -> Self {
307 self.mtls_manager = Some(Arc::new(MTLSManager::new(mtls_config)));
308 self
309 }
310
311 pub fn build(&self) -> (MeshServer, MeshServerHandler) {
312 let (signal_tx, signal_rx) = watch::channel(false);
313 let partition_detector = Arc::new(PartitionDetector::default());
314 let sync_manager = Arc::new(MeshSyncManager::new(
315 self.stores.clone(),
316 self.self_name.clone(),
317 ));
318 let state_machine = Arc::new(NodeStateMachine::new(
319 self.stores.clone(),
320 ConvergenceConfig::default(),
321 ));
322 sync_manager.update_rate_limit_membership();
324 (
325 MeshServer {
326 state: self.state.clone(),
327 stores: self.stores.clone(),
328 sync_manager: sync_manager.clone(),
329 self_name: self.self_name.clone(),
330 bind_addr: self.bind_addr,
331 advertise_addr: self.advertise_addr,
332 init_peer: self.init_peer,
333 signal_rx,
334 partition_detector: Some(partition_detector.clone()),
335 mtls_manager: self.mtls_manager.clone(),
336 },
337 MeshServerHandler {
338 state: self.state.clone(),
339 stores: self.stores.clone(),
340 sync_manager,
341 self_name: self.self_name.clone(),
342 _self_addr: self.advertise_addr,
343 signal_tx,
344 partition_detector: Some(partition_detector),
345 state_machine: Some(state_machine),
346 rate_limit_task_handle: std::sync::Mutex::new(None),
347 },
348 )
349 }
350}
351
352impl From<&MeshServerConfig> for MeshServerBuilder {
353 fn from(value: &MeshServerConfig) -> Self {
354 let mut builder = MeshServerBuilder::new(
355 value.self_name.clone(),
356 value.bind_addr,
357 value.advertise_addr,
358 value.init_peer,
359 );
360 if let Some(mtls_config) = &value.mtls_config {
361 builder = builder.with_mtls(mtls_config.clone());
362 }
363 builder
364 }
365}
366
/// The server half produced by `MeshServerBuilder::build`: owns the gossip
/// service and controller lifecycles and shuts down when the paired
/// handler's watch sender fires.
pub struct MeshServer {
    state: ClusterState,
    stores: Arc<StateStores>,
    sync_manager: Arc<MeshSyncManager>,
    self_name: String,
    /// Address the gossip service binds to.
    bind_addr: SocketAddr,
    /// Address advertised to peers in gossip messages.
    advertise_addr: SocketAddr,
    init_peer: Option<SocketAddr>,
    /// Shutdown flag receiver; the paired handler holds the sender.
    signal_rx: watch::Receiver<bool>,
    partition_detector: Option<Arc<PartitionDetector>>,
    mtls_manager: Option<Arc<MTLSManager>>,
}
379
impl MeshServer {
    /// Constructs the gRPC gossip service bound to this server's addresses.
    fn build_ping_server(&self) -> GossipService {
        GossipService::new(
            self.state.clone(),
            self.bind_addr,
            self.advertise_addr,
            &self.self_name,
        )
    }

    /// Constructs the controller that drives the outbound gossip event loop.
    fn build_controller(&self) -> MeshController {
        MeshController::new(
            self.state.clone(),
            self.advertise_addr,
            &self.self_name,
            self.init_peer,
            self.stores.clone(),
            self.sync_manager.clone(),
            self.mtls_manager.clone(),
        )
    }

    /// Runs the server, letting the gossip service bind `bind_addr` itself.
    pub async fn start(self) -> Result<()> {
        self.start_inner(None).await
    }

    /// Runs the server on a pre-bound listener.
    ///
    /// # Errors
    /// Fails if the listener's local address does not match `bind_addr`,
    /// or if the server itself fails.
    pub async fn start_with_listener(self, listener: tokio::net::TcpListener) -> Result<()> {
        let bound_addr = listener
            .local_addr()
            .map_err(|e| anyhow::anyhow!("Failed to read listener local addr: {e}"))?;
        if bound_addr != self.bind_addr {
            return Err(anyhow::anyhow!(
                "Listener/bind_addr mismatch: listener={}, bind_addr={}",
                bound_addr,
                self.bind_addr
            ));
        }
        self.start_inner(Some(listener)).await
    }

    /// Spawns the gossip service and the controller event loop, then waits
    /// until either finishes — normally when the shutdown watch signal from
    /// the paired handler fires.
    async fn start_inner(self, listener: Option<tokio::net::TcpListener>) -> Result<()> {
        log::info!(
            "Mesh server listening on {} and advertising {}",
            self.bind_addr,
            self.advertise_addr
        );
        let self_name = self.self_name.clone();
        let advertise_address = self.advertise_addr;

        #[expect(
            clippy::expect_used,
            reason = "partition_detector is always set to Some by MeshServerBuilder::build() before start() is called"
        )]
        let partition_detector = self
            .partition_detector
            .clone()
            .expect("partition detector missing");

        // Wire optional collaborators into the gossip service.
        let mut service = self.build_ping_server();
        service = service.with_stores(self.stores.clone());

        service = service.with_sync_manager(self.sync_manager.clone());

        service = service.with_partition_detector(partition_detector);

        if let Some(mtls_manager) = self.mtls_manager.clone() {
            service = service.with_mtls_manager(mtls_manager);
        }

        let controller = self.build_controller();

        let mut service_shutdown = self.signal_rx.clone();

        #[expect(
            clippy::disallowed_methods,
            reason = "handle is awaited immediately below via tokio::select!, bounded by shutdown signal"
        )]
        let server_handle = if let Some(tcp_listener) = listener {
            tokio::spawn(service.serve_ping_with_listener(tcp_listener, async move {
                _ = service_shutdown.changed().await;
            }))
        } else {
            tokio::spawn(service.serve_ping_with_shutdown(async move {
                _ = service_shutdown.changed().await;
            }))
        };
        // NOTE(review): fixed 1s delay before starting the controller —
        // presumably gives the gossip service time to begin listening before
        // the controller dials peers; confirm whether a readiness signal
        // could replace this sleep.
        tokio::time::sleep(Duration::from_secs(1)).await;
        #[expect(
            clippy::disallowed_methods,
            reason = "handle is awaited immediately below via tokio::select!, bounded by shutdown signal"
        )]
        let app_handle = tokio::spawn(controller.event_loop(self.signal_rx.clone()));

        // Either task finishing (or failing) ends the server; `??` unwraps
        // the JoinError first, then the task's own Result.
        tokio::select! {
            res = server_handle => res??,
            res = app_handle => res??,
        }

        log::info!(
            "Mesh server {} at {} is shutting down",
            self_name,
            advertise_address
        );
        Ok(())
    }
}
487
488pub async fn broadcast_node_states(
491 nodes_to_broadcast: Vec<NodeState>,
492 target_nodes: Vec<NodeState>,
493 timeout: Option<Duration>,
494) -> (usize, usize) {
495 if nodes_to_broadcast.is_empty() || target_nodes.is_empty() {
496 log::debug!(
497 "Nothing to broadcast: nodes_to_broadcast={}, target_nodes={}",
498 nodes_to_broadcast.len(),
499 target_nodes.len()
500 );
501 return (0, target_nodes.len());
502 }
503
504 let mut broadcast_tasks = Vec::new();
505 for target_node in &target_nodes {
506 let target_node_clone = target_node.clone();
507 let nodes_for_task = nodes_to_broadcast.clone();
508 #[expect(
509 clippy::disallowed_methods,
510 reason = "broadcast tasks are collected and awaited via join_all with a timeout immediately below"
511 )]
512 let task = tokio::spawn(async move {
513 let state_sync = StateSync {
514 nodes: nodes_for_task,
515 };
516 let ping_payload = gossip_message::Payload::Ping(Ping {
517 state_sync: Some(state_sync),
518 });
519 match try_ping(&target_node_clone, Some(ping_payload), None).await {
520 Ok(_) => {
521 log::debug!("Successfully broadcasted to {}", target_node_clone.name);
522 Ok(())
523 }
524 Err(e) => {
525 log::warn!("Failed to broadcast to {}: {}", target_node_clone.name, e);
526 Err(e)
527 }
528 }
529 });
530 broadcast_tasks.push(task);
531 }
532
533 let timeout_duration = timeout.unwrap_or(Duration::from_secs(3));
534 let broadcast_result = tokio::time::timeout(timeout_duration, async {
535 futures::future::join_all(broadcast_tasks).await
536 })
537 .await;
538
539 match broadcast_result {
540 Ok(results) => {
541 let success_count = results.iter().filter(|r| matches!(r, Ok(Ok(())))).count();
542 let total_count = target_nodes.len();
543 log::info!(
544 "Broadcast completed: {}/{} successful",
545 success_count,
546 total_count
547 );
548 (success_count, total_count)
549 }
550 Err(_) => {
551 log::warn!(
552 "Broadcast timeout after {} seconds",
553 timeout_duration.as_secs()
554 );
555 (0, target_nodes.len())
556 }
557 }
558}
559
/// Dials `peer_node` over gRPC — plaintext, or TLS when `mtls_manager` is
/// given — and sends a single gossip message carrying the optional `payload`.
///
/// # Errors
/// Returns `tonic::Status` for an unparsable peer address, TLS setup
/// failures, connection failures, or an error response from the peer.
pub async fn try_ping(
    peer_node: &NodeState,
    payload: Option<gossip_message::Payload>,
    mtls_manager: Option<Arc<MTLSManager>>,
) -> Result<NodeUpdate, tonic::Status> {
    let peer_name = peer_node.name.clone();

    let peer_addr = SocketAddr::from_str(&peer_node.address).map_err(|e| {
        tonic::Status::invalid_argument(format!(
            "Invalid address for node {}: {}, {}",
            peer_name, peer_node.address, e
        ))
    })?;

    // URL scheme must match the transport: https when TLS will be configured.
    let connect_url = if mtls_manager.is_some() {
        format!("https://{peer_addr}")
    } else {
        format!("http://{peer_addr}")
    };

    let mut endpoint = Endpoint::from_shared(connect_url.clone())
        .map_err(|e| {
            tonic::Status::invalid_argument(format!(
                "Invalid endpoint for node {peer_name}: {connect_url}, {e}"
            ))
        })?
        .connect_timeout(Duration::from_secs(5))
        .timeout(Duration::from_secs(10));

    if let Some(mtls_manager) = mtls_manager {
        // NOTE(review): the loaded client config is discarded here — only the
        // CA certificate and TLS domain are applied below, and no client
        // identity is attached to this endpoint. Presumably
        // load_client_config installs the identity elsewhere; confirm that
        // mutual TLS (client auth) actually works end to end.
        mtls_manager.load_client_config().await.map_err(|e| {
            tonic::Status::unavailable(format!(
                "Failed to load mTLS client config for {peer_name}: {e}"
            ))
        })?;

        // SNI/verification name: the URI host (the peer's IP here), falling
        // back to the peer's node name if the URI has no host.
        let tls_domain = endpoint
            .uri()
            .host()
            .map(str::to_owned)
            .unwrap_or_else(|| peer_name.clone());
        let ca_certificate = mtls_manager.load_ca_certificate().await.map_err(|e| {
            tonic::Status::unavailable(format!(
                "Failed to load mTLS CA certificate for {peer_name}: {e}"
            ))
        })?;

        endpoint = endpoint
            .tls_config(
                ClientTlsConfig::new()
                    .domain_name(tls_domain)
                    .ca_certificate(ca_certificate),
            )
            .map_err(|e| {
                tonic::Status::unavailable(format!(
                    "Failed to configure TLS endpoint for {peer_name}: {e}"
                ))
            })?;
    }

    let channel = endpoint.connect().await.map_err(|e| {
        log::warn!(
            "Failed to connect to peer {} {}: {}.",
            peer_name,
            peer_addr,
            e
        );
        tonic::Status::unavailable("Failed to connect to peer")
    })?;
    // Gzip both directions and raise message-size limits to the shared cap.
    let mut client = gossip_client::GossipClient::new(channel)
        .max_decoding_message_size(MAX_MESSAGE_SIZE)
        .max_encoding_message_size(MAX_MESSAGE_SIZE)
        .accept_compressed(tonic::codec::CompressionEncoding::Gzip)
        .send_compressed(tonic::codec::CompressionEncoding::Gzip);

    let ping_message = GossipMessage { payload };
    let response = client.ping_server(Request::new(ping_message)).await?;

    Ok(response.into_inner())
}
640
/// Test/bootstrap helper: builds a mesh server, spawns it on the current
/// tokio runtime, and returns the `MeshServerHandler`.
///
/// Forms:
/// - `mesh_run!(addr, init_peer)` — node named after its address.
/// - `mesh_run!(name, addr, init_peer)` — explicit name; server binds `addr`.
/// - `mesh_run!(name, listener, addr, init_peer)` — uses a pre-bound listener.
#[macro_export]
macro_rules! mesh_run {
    ($addr:expr, $init_peer:expr) => {{
        mesh_run!($addr.to_string(), $addr, $init_peer)
    }};

    ($name:expr, $addr:expr, $init_peer:expr) => {{
        tracing::info!("Starting mesh server : {}", $addr);
        use $crate::MeshServerBuilder;
        let (server, handler) =
            MeshServerBuilder::new($name.to_string(), $addr, $addr, $init_peer).build();
        #[expect(clippy::disallowed_methods, reason = "test macro: spawned server runs for the test lifetime and handler is returned for assertions")]
        tokio::spawn(async move {
            if let Err(e) = server.start().await {
                tracing::error!("Mesh server failed: {}", e);
            }
        });
        handler
    }};

    ($name:expr, $listener:expr, $addr:expr, $init_peer:expr) => {{
        tracing::info!("Starting mesh server : {}", $addr);
        use $crate::MeshServerBuilder;
        let (server, handler) =
            MeshServerBuilder::new($name.to_string(), $addr, $addr, $init_peer).build();
        #[expect(clippy::disallowed_methods, reason = "test macro: spawned server runs for the test lifetime and handler is returned for assertions")]
        tokio::spawn(async move {
            if let Err(e) = server.start_with_listener($listener).await {
                tracing::error!("Mesh server failed: {}", e);
            }
        });
        handler
    }};
}
675
#[cfg(test)]
mod tests {
    use std::sync::Once;

    use tracing as log;
    use tracing_subscriber::{
        filter::LevelFilter, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter,
    };

    use super::*;
    use crate::tests::test_utils::{bind_node, wait_for};

    // Installs the tracing subscriber once per test binary.
    static INIT: Once = Once::new();
    fn init() {
        INIT.call_once(|| {
            let _ = tracing_subscriber::registry()
                .with(tracing_subscriber::fmt::layer())
                .with(
                    EnvFilter::builder()
                        .with_default_directive(LevelFilter::INFO.into())
                        .from_env_lossy(),
                )
                .try_init();
        });
    }

    // A node bound on localhost must report its configured advertise
    // address — not its bind address — in ping responses.
    #[tokio::test]
    async fn test_ping_advertises_configured_address() {
        init();

        let (listener, bind_addr) = bind_node().await;
        let advertise_addr = SocketAddr::from(([10, 20, 30, 40], bind_addr.port()));
        let (server, handler) =
            MeshServerBuilder::new("A".to_string(), bind_addr, advertise_addr, None).build();

        #[expect(
            clippy::disallowed_methods,
            reason = "test server runs in the background for the duration of the assertion"
        )]
        tokio::spawn(async move {
            if let Err(e) = server.start_with_listener(listener).await {
                tracing::error!("Mesh server failed: {}", e);
            }
        });

        // Wait until the TCP listener accepts connections before pinging.
        wait_for(
            || std::net::TcpStream::connect(bind_addr).is_ok(),
            Duration::from_secs(5),
            "mesh listener started",
        )
        .await;

        let response = try_ping(
            &NodeState {
                name: "A".to_string(),
                address: bind_addr.to_string(),
                status: NodeStatus::Alive as i32,
                version: 1,
                metadata: HashMap::new(),
            },
            Some(gossip_message::Payload::Ping(Ping {
                state_sync: Some(StateSync { nodes: vec![] }),
            })),
            None,
        )
        .await
        .unwrap();

        assert_eq!(response.address, advertise_addr.to_string());
        handler.shutdown();
    }

    // End-to-end membership/state propagation across a growing cluster,
    // covering a hard-shutdown node (E) and a graceful leave (D).
    #[tokio::test]
    #[ignore = "SWIM failure detection for hard-shutdown nodes needs many gossip rounds; flaky under parallel CI load"]
    async fn test_state_synchronization() {
        init();
        log::info!("Starting test_state_synchronization");

        let (listener_a, addr_a) = bind_node().await;
        let handler_a = mesh_run!("A", listener_a, addr_a, None);
        let (listener_b, addr_b) = bind_node().await;
        let handler_b = mesh_run!("B", listener_b, addr_b, Some(addr_a));

        wait_for(
            || handler_a.state.read().len() == 2,
            Duration::from_secs(15),
            "A-B cluster formed",
        )
        .await;

        handler_a
            .write_data("hello".into(), "world".into())
            .unwrap();

        // C joins via A; D joins via C (transitive discovery).
        let (listener_c, addr_c) = bind_node().await;
        let handler_c = mesh_run!("C", listener_c, addr_c, Some(addr_a));
        let (listener_d, addr_d) = bind_node().await;
        let handler_d = mesh_run!("D", listener_d, addr_d, Some(addr_c));

        wait_for(
            || handler_a.state.read().len() == 4,
            Duration::from_secs(30),
            "4-node cluster formed",
        )
        .await;

        // E joins, then hard-shuts down (no LEAVING broadcast): peers must
        // eventually mark it not-Alive via failure detection.
        {
            let (listener_e, addr_e) = bind_node().await;
            let handler_e = mesh_run!("E", listener_e, addr_e, Some(addr_d));

            wait_for(
                || handler_a.state.read().len() == 5,
                Duration::from_secs(30),
                "E joined cluster",
            )
            .await;

            handler_e.shutdown();
        }

        handler_d.graceful_shutdown().await.unwrap();

        let check_statuses = |handler: &MeshServerHandler| {
            let state = handler.state.read();
            let d_leaving = state
                .get("D")
                .is_some_and(|n| n.status == NodeStatus::Leaving as i32);
            let e_not_alive = state
                .get("E")
                .is_some_and(|n| n.status != NodeStatus::Alive as i32);
            d_leaving && e_not_alive
        };

        for (handler, name) in [(&handler_a, "A"), (&handler_b, "B"), (&handler_c, "C")] {
            wait_for(
                || check_statuses(handler),
                Duration::from_secs(60),
                &format!("D=Leaving, E not Alive on node {name}"),
            )
            .await;
        }

        log::info!("All nodes converged to expected state");
    }
}