serf_core/serf/base/tests/serf/
leave.rs1use super::*;
2
3pub async fn leave_intent_buffer_early<T>(transport_opts: T::Options)
5where
6 T: Transport<Id = SmolStr>,
7{
8 let opts = test_config();
9 let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
10
11 let j = LeaveMessage {
13 ltime: 10.into(),
14 id: "test".into(),
15 prune: false,
16 };
17
18 assert!(s1.handle_node_leave_intent(&j).await, "should rebroadcast");
19 assert!(
20 !s1.handle_node_leave_intent(&j).await,
21 "should not rebroadcast"
22 );
23
24 {
26 let members = s1.inner.members.read().await;
27 let ltime = recent_intent(&members.recent_intents, &"test".into(), MessageType::Leave).unwrap();
28 assert_eq!(ltime, 10.into(), "bad buffer");
29 }
30
31 s1.shutdown().await.unwrap();
32}
33
34pub async fn leave_intent_old_message<T>(transport_opts: T::Options, addr: T::ResolvedAddress)
36where
37 T: Transport<Id = SmolStr>,
38{
39 let opts = test_config();
40 let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
41
42 {
43 let mut members = s1.inner.members.write().await;
44 members.states.insert(
45 "test".into(),
46 MemberState {
47 member: Member {
48 node: Node::new("test".into(), addr),
49 tags: Arc::new(Default::default()),
50 status: MemberStatus::Alive,
51 memberlist_protocol_version: crate::types::MemberlistProtocolVersion::V1,
52 memberlist_delegate_version: crate::types::MemberlistDelegateVersion::V1,
53 protocol_version: crate::types::ProtocolVersion::V1,
54 delegate_version: crate::types::DelegateVersion::V1,
55 },
56 status_time: 12.into(),
57 leave_time: None,
58 },
59 );
60 }
61
62 let j = LeaveMessage {
63 ltime: 10.into(),
64 id: "test".into(),
65 prune: false,
66 };
67
68 assert!(
69 !s1.handle_node_leave_intent(&j).await,
70 "should not rebroadcast"
71 );
72
73 {
74 let members = s1.inner.members.read().await;
75 assert!(
76 recent_intent(&members.recent_intents, &"test".into(), MessageType::Leave).is_none(),
77 "should not have buffered intent"
78 );
79 }
80
81 s1.shutdown().await.unwrap();
82}
83
84pub async fn leave_intent_newer<T>(transport_opts: T::Options, addr: T::ResolvedAddress)
86where
87 T: Transport<Id = SmolStr>,
88{
89 let opts = test_config();
90 let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
91 {
92 let mut members = s1.inner.members.write().await;
93 members.states.insert(
94 "test".into(),
95 MemberState {
96 member: Member {
97 node: Node::new("test".into(), addr),
98 tags: Arc::new(Default::default()),
99 status: MemberStatus::Alive,
100 memberlist_protocol_version: crate::types::MemberlistProtocolVersion::V1,
101 memberlist_delegate_version: crate::types::MemberlistDelegateVersion::V1,
102 protocol_version: crate::types::ProtocolVersion::V1,
103 delegate_version: crate::types::DelegateVersion::V1,
104 },
105 status_time: 12.into(),
106 leave_time: None,
107 },
108 );
109 }
110
111 let j = LeaveMessage {
112 ltime: 14.into(),
113 id: "test".into(),
114 prune: false,
115 };
116
117 assert!(s1.handle_node_leave_intent(&j).await, "should rebroadcast");
118
119 {
120 let members = s1.inner.members.read().await;
121 assert!(
122 recent_intent(&members.recent_intents, &"test".into(), MessageType::Leave).is_none(),
123 "should not have buffered intent"
124 );
125
126 let m = members.states.get("test").unwrap();
127 assert_eq!(
128 m.member.status,
129 MemberStatus::Leaving,
130 "should update status"
131 );
132 assert_eq!(s1.inner.clock.time(), 15.into(), "should update clock");
133 }
134
135 s1.shutdown().await.unwrap();
136}
137
138pub async fn serf_force_leave_failed<T>(
140 transport_opts1: T::Options,
141 transport_opts2: T::Options,
142 transport_opts3: T::Options,
143) where
144 T: Transport,
145{
146 let s1 = Serf::<T>::new(transport_opts1, test_config())
147 .await
148 .unwrap();
149 let s2 = Serf::<T>::new(transport_opts2, test_config())
150 .await
151 .unwrap();
152 let s3 = Serf::<T>::new(transport_opts3, test_config())
153 .await
154 .unwrap();
155
156 let mut serfs = [s1, s2, s3];
157 wait_until_num_nodes(1, &serfs).await;
158
159 let node = serfs[1]
160 .advertise_node()
161 .map_address(MaybeResolvedAddress::resolved);
162 serfs[0].join(node, false).await.unwrap();
163
164 let node = serfs[2]
165 .advertise_node()
166 .map_address(MaybeResolvedAddress::resolved);
167 serfs[0].join(node, false).await.unwrap();
168
169 wait_until_num_nodes(3, &serfs).await;
170
171 serfs[1].shutdown().await.unwrap();
172
173 let s2id = serfs[1].local_id().clone();
174
175 let start = Epoch::now();
176 loop {
177 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
178
179 let members = serfs[0].inner.members.read().await;
180 if test_member_status(&members.states, s2id.clone(), MemberStatus::Failed).is_ok() {
181 break;
182 }
183
184 if start.elapsed() > Duration::from_secs(7) {
185 panic!("timed out");
186 }
187 }
188 serfs[0].force_leave(s2id, true).await.unwrap();
189 serfs.swap(1, 2);
190 wait_until_num_nodes(2, &serfs[..2]).await;
191}
192
193pub async fn serf_force_leave_leaving<T>(
195 transport_opts1: T::Options,
196 transport_opts2: T::Options,
197 transport_opts3: T::Options,
198) where
199 T: Transport,
200{
201 const TOMBSTONE_TIMEOUT: Duration = Duration::from_secs(3600);
202 const LEAVE_PROPAGATE_DELAY: Duration = Duration::from_secs(5);
203
204 let s1 = Serf::<T>::new(
205 transport_opts1,
206 test_config()
207 .with_tombstone_timeout(TOMBSTONE_TIMEOUT)
208 .with_leave_propagate_delay(LEAVE_PROPAGATE_DELAY),
209 )
210 .await
211 .unwrap();
212 let s2 = Serf::<T>::new(
213 transport_opts2,
214 test_config()
215 .with_tombstone_timeout(TOMBSTONE_TIMEOUT)
216 .with_leave_propagate_delay(LEAVE_PROPAGATE_DELAY),
217 )
218 .await
219 .unwrap();
220 let s3 = Serf::<T>::new(
221 transport_opts3,
222 test_config()
223 .with_tombstone_timeout(TOMBSTONE_TIMEOUT)
224 .with_leave_propagate_delay(LEAVE_PROPAGATE_DELAY),
225 )
226 .await
227 .unwrap();
228
229 let mut serfs = [s1, s2, s3];
230 wait_until_num_nodes(1, &serfs).await;
231
232 let node = serfs[1]
233 .advertise_node()
234 .map_address(MaybeResolvedAddress::resolved);
235 serfs[0].join(node, false).await.unwrap();
236
237 let node = serfs[2]
238 .advertise_node()
239 .map_address(MaybeResolvedAddress::resolved);
240 serfs[0].join(node, false).await.unwrap();
241
242 wait_until_num_nodes(3, &serfs).await;
243
244 serfs[1].leave().await.unwrap();
246
247 let s2id = serfs[1].local_id().clone();
248
249 let start = Epoch::now();
250 loop {
251 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
252
253 let members = serfs[0].inner.members.read().await;
254 if test_member_status(&members.states, s2id.clone(), MemberStatus::Left).is_ok() {
255 break;
256 }
257
258 if start.elapsed() > Duration::from_secs(7) {
259 panic!("timed out");
260 }
261 }
262
263 serfs[0].force_leave(s2id, true).await.unwrap();
264 serfs.swap(1, 2);
265 wait_until_num_nodes(2, &serfs[..2]).await;
266}
267
268pub async fn serf_force_leave_left<T>(
270 transport_opts1: T::Options,
271 transport_opts2: T::Options,
272 transport_opts3: T::Options,
273) where
274 T: Transport,
275{
276 const TOMBSTONE_TIMEOUT: Duration = Duration::from_secs(3600);
277
278 let s1 = Serf::<T>::new(
279 transport_opts1,
280 test_config().with_tombstone_timeout(TOMBSTONE_TIMEOUT),
281 )
282 .await
283 .unwrap();
284 let s2 = Serf::<T>::new(
285 transport_opts2,
286 test_config().with_tombstone_timeout(TOMBSTONE_TIMEOUT),
287 )
288 .await
289 .unwrap();
290 let s3 = Serf::<T>::new(
291 transport_opts3,
292 test_config().with_tombstone_timeout(TOMBSTONE_TIMEOUT),
293 )
294 .await
295 .unwrap();
296
297 let mut serfs = [s1, s2, s3];
298 wait_until_num_nodes(1, &serfs).await;
299
300 let node = serfs[1]
301 .advertise_node()
302 .map_address(MaybeResolvedAddress::resolved);
303 serfs[0].join(node, false).await.unwrap();
304
305 let node = serfs[2]
306 .advertise_node()
307 .map_address(MaybeResolvedAddress::resolved);
308 serfs[0].join(node, false).await.unwrap();
309
310 wait_until_num_nodes(3, &serfs).await;
311
312 serfs[1].leave().await.unwrap();
314
315 let s2id = serfs[1].local_id().clone();
316
317 let start = Epoch::now();
318 loop {
319 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
320
321 let members = serfs[0].inner.members.read().await;
322 if test_member_status(&members.states, s2id.clone(), MemberStatus::Left).is_ok() {
323 break;
324 }
325
326 if start.elapsed() > Duration::from_secs(7) {
327 panic!("timed out");
328 }
329 }
330
331 serfs[0].force_leave(s2id, true).await.unwrap();
332 serfs.swap(1, 2);
333 wait_until_num_nodes(2, &serfs[..2]).await;
334}
335
336pub async fn serf_leave_rejoin_different_role<T>(
338 transport_opts1: T::Options,
339 transport_opts2: T::Options,
340) where
341 T: Transport,
342 T::Options: Clone,
343{
344 let s1 = Serf::<T>::new(transport_opts1, test_config())
345 .await
346 .unwrap();
347 let s2 = Serf::<T>::new(transport_opts2.clone(), test_config())
348 .await
349 .unwrap();
350
351 let mut serfs = [s1, s2];
352 wait_until_num_nodes(1, &serfs).await;
353
354 let node = serfs[1]
355 .inner
356 .memberlist
357 .advertise_node()
358 .map_address(MaybeResolvedAddress::resolved);
359 serfs[0].join(node.clone(), false).await.unwrap();
360
361 wait_until_num_nodes(2, &serfs).await;
362
363 serfs[1].leave().await.unwrap();
364 serfs[1].shutdown().await.unwrap();
365
366 <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(10)).await;
367
368 let s3 = Serf::<T>::new(
370 transport_opts2,
371 test_config().with_tags([("role", "bar")].into_iter()),
372 )
373 .await
374 .unwrap();
375
376 let node = serfs[0]
377 .advertise_node()
378 .map_address(MaybeResolvedAddress::resolved);
379
380 serfs[1] = s3;
381
382 serfs[1].join(node, false).await.unwrap();
383
384 wait_until_num_nodes(2, &serfs).await;
385
386 let start = Epoch::now();
387 let s3id = serfs[1].local_id().clone();
388 loop {
389 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
390
391 let members = serfs[0].inner.members.read().await;
392 let mut find = None;
393 for (id, member) in members.states.iter() {
394 if s3id.eq(id) {
395 find = Some(member);
396 break;
397 }
398 }
399
400 if let Some(member) = find {
401 let role = member.member.tags.get("role");
402 assert_eq!(role, Some(&"bar".into()), "bad role: {:?}", role);
403 return;
404 }
405
406 if start.elapsed() > Duration::from_secs(7) {
407 panic!("timed out");
408 }
409 }
410}
411
412pub async fn serf_leave_snapshot_recovery<T, F>(
414 transport_opts1: T::Options,
415 transport_opts2: T::Options,
416 get_transport: impl FnOnce(T::Id, T::ResolvedAddress) -> F + Copy,
417) where
418 T: Transport,
419 F: core::future::Future<Output = T::Options>,
420{
421 let td = tempfile::tempdir().unwrap();
422 let snap_path = td.path().join("serf_leave_snapshot_recovery");
423
424 let s1 = Serf::<T>::new(
425 transport_opts1,
426 test_config().with_reap_interval(Duration::from_secs(30)),
427 )
428 .await
429 .unwrap();
430 let s2 = Serf::<T>::new(
431 transport_opts2,
432 test_config()
433 .with_snapshot_path(Some(snap_path.clone()))
434 .with_reap_interval(Duration::from_secs(30)),
435 )
436 .await
437 .unwrap();
438
439 let mut serfs = vec![s1, s2];
440 wait_until_num_nodes(1, &serfs).await;
441
442 let node = serfs[1]
443 .inner
444 .memberlist
445 .advertise_node()
446 .map_address(MaybeResolvedAddress::resolved);
447 serfs[0].join(node.clone(), false).await.unwrap();
448
449 wait_until_num_nodes(2, &serfs).await;
450
451 serfs[1].leave().await.unwrap();
453 serfs[1].shutdown().await.unwrap();
454 let t = serfs[1].inner.opts.memberlist_options.probe_interval * 5;
455 let (s2id, s2addr) = serfs[1].advertise_node().into_components();
456 let _ = serfs.pop().unwrap();
457
458 <T::Runtime as RuntimeLite>::sleep(t).await;
459
460 let start = Epoch::now();
461 loop {
462 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
463
464 let members = serfs[0].inner.members.read().await;
465 if test_member_status(&members.states, s2id.clone(), MemberStatus::Left).is_ok() {
466 break;
467 }
468
469 if start.elapsed() > Duration::from_secs(7) {
470 panic!("timed out");
471 }
472 }
473
474 let s2 = Serf::<T>::new(
476 get_transport(s2id.clone(), s2addr).await,
477 test_config()
478 .with_snapshot_path(Some(snap_path.clone()))
479 .with_reap_interval(Duration::from_secs(30)),
480 )
481 .await
482 .unwrap();
483 serfs.push(s2);
484
485 let start = Epoch::now();
489 let mut cond1 = false;
490 let mut cond2 = false;
491 loop {
492 if !cond1 {
493 let num = serfs[1].num_members().await;
494 if num == 1 {
495 cond1 = true;
496 }
497 }
498
499 if !cond2 {
500 let members = serfs[0].inner.members.read().await;
501 if test_member_status(&members.states, s2id.clone(), MemberStatus::Left).is_ok() {
502 cond2 = true;
503 }
504 }
505
506 if cond1 && cond2 {
507 break;
508 }
509
510 if start.elapsed() > Duration::from_secs(7) {
511 panic!("time out");
512 }
513
514 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
515 }
516}