// serf_core/serf/base/tests/serf/leave.rs

1use super::*;
2
3/// Unit tests for the leave intent buffer early
4pub async fn leave_intent_buffer_early<T>(transport_opts: T::Options)
5where
6  T: Transport<Id = SmolStr>,
7{
8  let opts = test_config();
9  let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
10
11  // Deliver a leave intent message early
12  let j = LeaveMessage {
13    ltime: 10.into(),
14    id: "test".into(),
15    prune: false,
16  };
17
18  assert!(s1.handle_node_leave_intent(&j).await, "should rebroadcast");
19  assert!(
20    !s1.handle_node_leave_intent(&j).await,
21    "should not rebroadcast"
22  );
23
24  // Check that we buffered
25  {
26    let members = s1.inner.members.read().await;
27    let ltime = recent_intent(&members.recent_intents, &"test".into(), MessageType::Leave).unwrap();
28    assert_eq!(ltime, 10.into(), "bad buffer");
29  }
30
31  s1.shutdown().await.unwrap();
32}
33
34/// Unit tests for the leave intent old message
35pub async fn leave_intent_old_message<T>(transport_opts: T::Options, addr: T::ResolvedAddress)
36where
37  T: Transport<Id = SmolStr>,
38{
39  let opts = test_config();
40  let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
41
42  {
43    let mut members = s1.inner.members.write().await;
44    members.states.insert(
45      "test".into(),
46      MemberState {
47        member: Member {
48          node: Node::new("test".into(), addr),
49          tags: Arc::new(Default::default()),
50          status: MemberStatus::Alive,
51          memberlist_protocol_version: crate::types::MemberlistProtocolVersion::V1,
52          memberlist_delegate_version: crate::types::MemberlistDelegateVersion::V1,
53          protocol_version: crate::types::ProtocolVersion::V1,
54          delegate_version: crate::types::DelegateVersion::V1,
55        },
56        status_time: 12.into(),
57        leave_time: None,
58      },
59    );
60  }
61
62  let j = LeaveMessage {
63    ltime: 10.into(),
64    id: "test".into(),
65    prune: false,
66  };
67
68  assert!(
69    !s1.handle_node_leave_intent(&j).await,
70    "should not rebroadcast"
71  );
72
73  {
74    let members = s1.inner.members.read().await;
75    assert!(
76      recent_intent(&members.recent_intents, &"test".into(), MessageType::Leave).is_none(),
77      "should not have buffered intent"
78    );
79  }
80
81  s1.shutdown().await.unwrap();
82}
83
84/// Unit tests for the leave intent newer
85pub async fn leave_intent_newer<T>(transport_opts: T::Options, addr: T::ResolvedAddress)
86where
87  T: Transport<Id = SmolStr>,
88{
89  let opts = test_config();
90  let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
91  {
92    let mut members = s1.inner.members.write().await;
93    members.states.insert(
94      "test".into(),
95      MemberState {
96        member: Member {
97          node: Node::new("test".into(), addr),
98          tags: Arc::new(Default::default()),
99          status: MemberStatus::Alive,
100          memberlist_protocol_version: crate::types::MemberlistProtocolVersion::V1,
101          memberlist_delegate_version: crate::types::MemberlistDelegateVersion::V1,
102          protocol_version: crate::types::ProtocolVersion::V1,
103          delegate_version: crate::types::DelegateVersion::V1,
104        },
105        status_time: 12.into(),
106        leave_time: None,
107      },
108    );
109  }
110
111  let j = LeaveMessage {
112    ltime: 14.into(),
113    id: "test".into(),
114    prune: false,
115  };
116
117  assert!(s1.handle_node_leave_intent(&j).await, "should rebroadcast");
118
119  {
120    let members = s1.inner.members.read().await;
121    assert!(
122      recent_intent(&members.recent_intents, &"test".into(), MessageType::Leave).is_none(),
123      "should not have buffered intent"
124    );
125
126    let m = members.states.get("test").unwrap();
127    assert_eq!(
128      m.member.status,
129      MemberStatus::Leaving,
130      "should update status"
131    );
132    assert_eq!(s1.inner.clock.time(), 15.into(), "should update clock");
133  }
134
135  s1.shutdown().await.unwrap();
136}
137
138/// Unit tests for the force leave failed
139pub async fn serf_force_leave_failed<T>(
140  transport_opts1: T::Options,
141  transport_opts2: T::Options,
142  transport_opts3: T::Options,
143) where
144  T: Transport,
145{
146  let s1 = Serf::<T>::new(transport_opts1, test_config())
147    .await
148    .unwrap();
149  let s2 = Serf::<T>::new(transport_opts2, test_config())
150    .await
151    .unwrap();
152  let s3 = Serf::<T>::new(transport_opts3, test_config())
153    .await
154    .unwrap();
155
156  let mut serfs = [s1, s2, s3];
157  wait_until_num_nodes(1, &serfs).await;
158
159  let node = serfs[1]
160    .advertise_node()
161    .map_address(MaybeResolvedAddress::resolved);
162  serfs[0].join(node, false).await.unwrap();
163
164  let node = serfs[2]
165    .advertise_node()
166    .map_address(MaybeResolvedAddress::resolved);
167  serfs[0].join(node, false).await.unwrap();
168
169  wait_until_num_nodes(3, &serfs).await;
170
171  serfs[1].shutdown().await.unwrap();
172
173  let s2id = serfs[1].local_id().clone();
174
175  let start = Epoch::now();
176  loop {
177    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
178
179    let members = serfs[0].inner.members.read().await;
180    if test_member_status(&members.states, s2id.clone(), MemberStatus::Failed).is_ok() {
181      break;
182    }
183
184    if start.elapsed() > Duration::from_secs(7) {
185      panic!("timed out");
186    }
187  }
188  serfs[0].force_leave(s2id, true).await.unwrap();
189  serfs.swap(1, 2);
190  wait_until_num_nodes(2, &serfs[..2]).await;
191}
192
193/// Unit tests for the force leave leaving
194pub async fn serf_force_leave_leaving<T>(
195  transport_opts1: T::Options,
196  transport_opts2: T::Options,
197  transport_opts3: T::Options,
198) where
199  T: Transport,
200{
201  const TOMBSTONE_TIMEOUT: Duration = Duration::from_secs(3600);
202  const LEAVE_PROPAGATE_DELAY: Duration = Duration::from_secs(5);
203
204  let s1 = Serf::<T>::new(
205    transport_opts1,
206    test_config()
207      .with_tombstone_timeout(TOMBSTONE_TIMEOUT)
208      .with_leave_propagate_delay(LEAVE_PROPAGATE_DELAY),
209  )
210  .await
211  .unwrap();
212  let s2 = Serf::<T>::new(
213    transport_opts2,
214    test_config()
215      .with_tombstone_timeout(TOMBSTONE_TIMEOUT)
216      .with_leave_propagate_delay(LEAVE_PROPAGATE_DELAY),
217  )
218  .await
219  .unwrap();
220  let s3 = Serf::<T>::new(
221    transport_opts3,
222    test_config()
223      .with_tombstone_timeout(TOMBSTONE_TIMEOUT)
224      .with_leave_propagate_delay(LEAVE_PROPAGATE_DELAY),
225  )
226  .await
227  .unwrap();
228
229  let mut serfs = [s1, s2, s3];
230  wait_until_num_nodes(1, &serfs).await;
231
232  let node = serfs[1]
233    .advertise_node()
234    .map_address(MaybeResolvedAddress::resolved);
235  serfs[0].join(node, false).await.unwrap();
236
237  let node = serfs[2]
238    .advertise_node()
239    .map_address(MaybeResolvedAddress::resolved);
240  serfs[0].join(node, false).await.unwrap();
241
242  wait_until_num_nodes(3, &serfs).await;
243
244  //Put s2 in left state
245  serfs[1].leave().await.unwrap();
246
247  let s2id = serfs[1].local_id().clone();
248
249  let start = Epoch::now();
250  loop {
251    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
252
253    let members = serfs[0].inner.members.read().await;
254    if test_member_status(&members.states, s2id.clone(), MemberStatus::Left).is_ok() {
255      break;
256    }
257
258    if start.elapsed() > Duration::from_secs(7) {
259      panic!("timed out");
260    }
261  }
262
263  serfs[0].force_leave(s2id, true).await.unwrap();
264  serfs.swap(1, 2);
265  wait_until_num_nodes(2, &serfs[..2]).await;
266}
267
268/// Unit tests for the force leave left
269pub async fn serf_force_leave_left<T>(
270  transport_opts1: T::Options,
271  transport_opts2: T::Options,
272  transport_opts3: T::Options,
273) where
274  T: Transport,
275{
276  const TOMBSTONE_TIMEOUT: Duration = Duration::from_secs(3600);
277
278  let s1 = Serf::<T>::new(
279    transport_opts1,
280    test_config().with_tombstone_timeout(TOMBSTONE_TIMEOUT),
281  )
282  .await
283  .unwrap();
284  let s2 = Serf::<T>::new(
285    transport_opts2,
286    test_config().with_tombstone_timeout(TOMBSTONE_TIMEOUT),
287  )
288  .await
289  .unwrap();
290  let s3 = Serf::<T>::new(
291    transport_opts3,
292    test_config().with_tombstone_timeout(TOMBSTONE_TIMEOUT),
293  )
294  .await
295  .unwrap();
296
297  let mut serfs = [s1, s2, s3];
298  wait_until_num_nodes(1, &serfs).await;
299
300  let node = serfs[1]
301    .advertise_node()
302    .map_address(MaybeResolvedAddress::resolved);
303  serfs[0].join(node, false).await.unwrap();
304
305  let node = serfs[2]
306    .advertise_node()
307    .map_address(MaybeResolvedAddress::resolved);
308  serfs[0].join(node, false).await.unwrap();
309
310  wait_until_num_nodes(3, &serfs).await;
311
312  //Put s2 in left state
313  serfs[1].leave().await.unwrap();
314
315  let s2id = serfs[1].local_id().clone();
316
317  let start = Epoch::now();
318  loop {
319    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
320
321    let members = serfs[0].inner.members.read().await;
322    if test_member_status(&members.states, s2id.clone(), MemberStatus::Left).is_ok() {
323      break;
324    }
325
326    if start.elapsed() > Duration::from_secs(7) {
327      panic!("timed out");
328    }
329  }
330
331  serfs[0].force_leave(s2id, true).await.unwrap();
332  serfs.swap(1, 2);
333  wait_until_num_nodes(2, &serfs[..2]).await;
334}
335
336/// Unit tests for the leave rejoin different role
337pub async fn serf_leave_rejoin_different_role<T>(
338  transport_opts1: T::Options,
339  transport_opts2: T::Options,
340) where
341  T: Transport,
342  T::Options: Clone,
343{
344  let s1 = Serf::<T>::new(transport_opts1, test_config())
345    .await
346    .unwrap();
347  let s2 = Serf::<T>::new(transport_opts2.clone(), test_config())
348    .await
349    .unwrap();
350
351  let mut serfs = [s1, s2];
352  wait_until_num_nodes(1, &serfs).await;
353
354  let node = serfs[1]
355    .inner
356    .memberlist
357    .advertise_node()
358    .map_address(MaybeResolvedAddress::resolved);
359  serfs[0].join(node.clone(), false).await.unwrap();
360
361  wait_until_num_nodes(2, &serfs).await;
362
363  serfs[1].leave().await.unwrap();
364  serfs[1].shutdown().await.unwrap();
365
366  <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(10)).await;
367
368  // Make s3 look just like s2, but create a new node with a new role
369  let s3 = Serf::<T>::new(
370    transport_opts2,
371    test_config().with_tags([("role", "bar")].into_iter()),
372  )
373  .await
374  .unwrap();
375
376  let node = serfs[0]
377    .advertise_node()
378    .map_address(MaybeResolvedAddress::resolved);
379
380  serfs[1] = s3;
381
382  serfs[1].join(node, false).await.unwrap();
383
384  wait_until_num_nodes(2, &serfs).await;
385
386  let start = Epoch::now();
387  let s3id = serfs[1].local_id().clone();
388  loop {
389    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
390
391    let members = serfs[0].inner.members.read().await;
392    let mut find = None;
393    for (id, member) in members.states.iter() {
394      if s3id.eq(id) {
395        find = Some(member);
396        break;
397      }
398    }
399
400    if let Some(member) = find {
401      let role = member.member.tags.get("role");
402      assert_eq!(role, Some(&"bar".into()), "bad role: {:?}", role);
403      return;
404    }
405
406    if start.elapsed() > Duration::from_secs(7) {
407      panic!("timed out");
408    }
409  }
410}
411
412/// Unit tests for the serf leave snapshot recovery
413pub async fn serf_leave_snapshot_recovery<T, F>(
414  transport_opts1: T::Options,
415  transport_opts2: T::Options,
416  get_transport: impl FnOnce(T::Id, T::ResolvedAddress) -> F + Copy,
417) where
418  T: Transport,
419  F: core::future::Future<Output = T::Options>,
420{
421  let td = tempfile::tempdir().unwrap();
422  let snap_path = td.path().join("serf_leave_snapshot_recovery");
423
424  let s1 = Serf::<T>::new(
425    transport_opts1,
426    test_config().with_reap_interval(Duration::from_secs(30)),
427  )
428  .await
429  .unwrap();
430  let s2 = Serf::<T>::new(
431    transport_opts2,
432    test_config()
433      .with_snapshot_path(Some(snap_path.clone()))
434      .with_reap_interval(Duration::from_secs(30)),
435  )
436  .await
437  .unwrap();
438
439  let mut serfs = vec![s1, s2];
440  wait_until_num_nodes(1, &serfs).await;
441
442  let node = serfs[1]
443    .inner
444    .memberlist
445    .advertise_node()
446    .map_address(MaybeResolvedAddress::resolved);
447  serfs[0].join(node.clone(), false).await.unwrap();
448
449  wait_until_num_nodes(2, &serfs).await;
450
451  // Put s2 in left state
452  serfs[1].leave().await.unwrap();
453  serfs[1].shutdown().await.unwrap();
454  let t = serfs[1].inner.opts.memberlist_options.probe_interval * 5;
455  let (s2id, s2addr) = serfs[1].advertise_node().into_components();
456  let _ = serfs.pop().unwrap();
457
458  <T::Runtime as RuntimeLite>::sleep(t).await;
459
460  let start = Epoch::now();
461  loop {
462    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
463
464    let members = serfs[0].inner.members.read().await;
465    if test_member_status(&members.states, s2id.clone(), MemberStatus::Left).is_ok() {
466      break;
467    }
468
469    if start.elapsed() > Duration::from_secs(7) {
470      panic!("timed out");
471    }
472  }
473
474  // Restart s2 from the snapshot now!
475  let s2 = Serf::<T>::new(
476    get_transport(s2id.clone(), s2addr).await,
477    test_config()
478      .with_snapshot_path(Some(snap_path.clone()))
479      .with_reap_interval(Duration::from_secs(30)),
480  )
481  .await
482  .unwrap();
483  serfs.push(s2);
484
485  // Wait for the node to auto rejoin
486
487  // Verify that s2 did not join
488  let start = Epoch::now();
489  let mut cond1 = false;
490  let mut cond2 = false;
491  loop {
492    if !cond1 {
493      let num = serfs[1].num_members().await;
494      if num == 1 {
495        cond1 = true;
496      }
497    }
498
499    if !cond2 {
500      let members = serfs[0].inner.members.read().await;
501      if test_member_status(&members.states, s2id.clone(), MemberStatus::Left).is_ok() {
502        cond2 = true;
503      }
504    }
505
506    if cond1 && cond2 {
507      break;
508    }
509
510    if start.elapsed() > Duration::from_secs(7) {
511      panic!("time out");
512    }
513
514    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
515  }
516}