nodedb_cluster/swim/
bootstrap.rs1use std::net::SocketAddr;
18use std::sync::Arc;
19
20use nodedb_types::NodeId;
21use tokio::sync::watch;
22use tokio::task::JoinHandle;
23
24use super::config::SwimConfig;
25use super::detector::{FailureDetector, ProbeScheduler, Transport};
26use super::dissemination::DisseminationQueue;
27use super::error::SwimError;
28use super::incarnation::Incarnation;
29use super::member::MemberState;
30use super::member::record::MemberUpdate;
31use super::membership::MembershipList;
32
33pub struct SwimHandle {
38 detector: Arc<FailureDetector>,
39 membership: Arc<MembershipList>,
40 shutdown_tx: watch::Sender<bool>,
41 join: JoinHandle<()>,
42}
43
44impl SwimHandle {
45 pub fn detector(&self) -> &Arc<FailureDetector> {
48 &self.detector
49 }
50
51 pub fn membership(&self) -> &Arc<MembershipList> {
54 &self.membership
55 }
56
57 pub fn dissemination(&self) -> &Arc<DisseminationQueue> {
61 self.detector.dissemination()
62 }
63
64 pub async fn shutdown(self) {
67 let _ = self.shutdown_tx.send(true);
68 let _ = self.join.await;
69 }
70}
71
72pub async fn spawn(
86 cfg: SwimConfig,
87 local_id: NodeId,
88 local_addr: SocketAddr,
89 seeds: Vec<SocketAddr>,
90 transport: Arc<dyn Transport>,
91) -> Result<SwimHandle, SwimError> {
92 cfg.validate()?;
93
94 let membership = Arc::new(MembershipList::new_local(
95 local_id.clone(),
96 local_addr,
97 cfg.initial_incarnation,
98 ));
99
100 for seed_addr in &seeds {
103 if *seed_addr == local_addr {
104 continue;
105 }
106 membership.apply(&MemberUpdate {
107 node_id: NodeId::new(format!("seed:{seed_addr}")),
108 addr: seed_addr.to_string(),
109 state: MemberState::Alive,
110 incarnation: Incarnation::ZERO,
111 });
112 }
113
114 let initial_inc = cfg.initial_incarnation;
115 let detector = Arc::new(FailureDetector::new(
116 cfg,
117 Arc::clone(&membership),
118 transport,
119 ProbeScheduler::new(),
120 ));
121
122 detector.dissemination().enqueue(MemberUpdate {
128 node_id: local_id.clone(),
129 addr: local_addr.to_string(),
130 state: MemberState::Alive,
131 incarnation: initial_inc,
132 });
133
134 let (shutdown_tx, shutdown_rx) = watch::channel(false);
135 let join = tokio::spawn({
136 let detector = Arc::clone(&detector);
137 async move { detector.run(shutdown_rx).await }
138 });
139
140 Ok(SwimHandle {
141 detector,
142 membership,
143 shutdown_tx,
144 join,
145 })
146}
147
#[cfg(test)]
mod tests {
    use super::*;
    use crate::swim::detector::TransportFabric;
    use std::net::{IpAddr, Ipv4Addr};
    use std::time::Duration;

    /// Loopback IPv4 address on port `p`.
    fn addr(p: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), p)
    }

    /// Short timings so the tests run quickly; `spawn` accepts this config.
    fn cfg() -> SwimConfig {
        SwimConfig {
            probe_interval: Duration::from_millis(100),
            probe_timeout: Duration::from_millis(40),
            indirect_probes: 2,
            suspicion_mult: 4,
            min_suspicion: Duration::from_millis(500),
            initial_incarnation: Incarnation::ZERO,
            max_piggyback: 6,
            fanout_lambda: 3,
        }
    }

    #[tokio::test]
    async fn spawn_solo_cluster_has_only_local() {
        let fab = TransportFabric::new();
        let transport: Arc<dyn Transport> = Arc::new(fab.bind(addr(7100)).await);
        let handle = spawn(cfg(), NodeId::new("a"), addr(7100), vec![], transport)
            .await
            .expect("spawn");
        assert_eq!(handle.membership().len(), 1);
        assert!(handle.membership().is_solo());
        handle.shutdown().await;
    }

    #[tokio::test]
    async fn spawn_seeds_populates_membership() {
        let fab = TransportFabric::new();
        let transport: Arc<dyn Transport> = Arc::new(fab.bind(addr(7110)).await);
        let handle = spawn(
            cfg(),
            NodeId::new("a"),
            addr(7110),
            vec![addr(7111), addr(7112)],
            transport,
        )
        .await
        .expect("spawn");
        // Local node plus two seeds.
        assert_eq!(handle.membership().len(), 3);
        handle.shutdown().await;
    }

    #[tokio::test]
    async fn spawn_skips_local_addr_in_seeds() {
        let fab = TransportFabric::new();
        let transport: Arc<dyn Transport> = Arc::new(fab.bind(addr(7120)).await);
        let handle = spawn(
            cfg(),
            NodeId::new("a"),
            addr(7120),
            vec![addr(7120), addr(7121)],
            transport,
        )
        .await
        .expect("spawn");
        // Local node plus the one foreign seed; the seed equal to our own
        // address must not create a second entry.
        assert_eq!(handle.membership().len(), 2);
        handle.shutdown().await;
    }

    #[tokio::test]
    async fn invalid_config_rejected_before_task_spawned() {
        let fab = TransportFabric::new();
        let transport: Arc<dyn Transport> = Arc::new(fab.bind(addr(7130)).await);
        let mut bad = cfg();
        // A probe timeout that is not shorter than the probe interval is
        // expected to fail `SwimConfig::validate`.
        bad.probe_timeout = bad.probe_interval;
        let res = spawn(bad, NodeId::new("a"), addr(7130), vec![], transport).await;
        match res {
            Err(SwimError::InvalidConfig { .. }) => {}
            Err(other) => panic!("expected InvalidConfig, got {other:?}"),
            Ok(_) => panic!("expected InvalidConfig error"),
        }
    }

    #[tokio::test]
    async fn shutdown_joins_promptly() {
        let fab = TransportFabric::new();
        let transport: Arc<dyn Transport> = Arc::new(fab.bind(addr(7140)).await);
        let handle = spawn(cfg(), NodeId::new("a"), addr(7140), vec![], transport)
            .await
            .expect("spawn");
        // The timeout is the budget check. A separate wall-clock assertion on the
        // same bound would be redundant and could trip on scheduler jitter.
        tokio::time::timeout(Duration::from_millis(500), handle.shutdown())
            .await
            .expect("shutdown did not join within budget");
    }
}