Skip to main content

serf_core/serf/base/tests/serf/
join.rs

1use std::marker::PhantomData;
2
3use crate::delegate::MergeDelegate;
4
5use super::*;
6
7/// Unit tests for the join intent buffer early
8pub async fn join_intent_buffer_early<T>(transport_opts: T::Options)
9where
10  T: Transport<Id = SmolStr>,
11{
12  let opts = test_config();
13  let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
14
15  // Deliver a join intent message early
16  let j = JoinMessage {
17    ltime: 10.into(),
18    id: "test".into(),
19  };
20
21  assert!(s1.handle_node_join_intent(&j).await, "should rebroadcast");
22  assert!(
23    !s1.handle_node_join_intent(&j).await,
24    "should not rebroadcast"
25  );
26
27  // Check that we buffered
28  {
29    let members = s1.inner.members.read().await;
30    let ltime = recent_intent(&members.recent_intents, &"test".into(), MessageType::Join).unwrap();
31    assert_eq!(ltime, 10.into(), "bad buffer");
32  }
33
34  s1.shutdown().await.unwrap();
35}
36
37/// Unit tests for the join intent old message
38pub async fn join_intent_old_message<T>(transport_opts: T::Options, addr: T::ResolvedAddress)
39where
40  T: Transport<Id = SmolStr>,
41{
42  let opts = test_config();
43  let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
44
45  {
46    let mut members = s1.inner.members.write().await;
47    members.states.insert(
48      "test".into(),
49      MemberState {
50        member: Member {
51          node: Node::new("test".into(), addr),
52          tags: Arc::new(Default::default()),
53          status: MemberStatus::Alive,
54          memberlist_protocol_version: crate::types::MemberlistProtocolVersion::V1,
55          memberlist_delegate_version: crate::types::MemberlistDelegateVersion::V1,
56          protocol_version: crate::types::ProtocolVersion::V1,
57          delegate_version: crate::types::DelegateVersion::V1,
58        },
59        status_time: 12.into(),
60        leave_time: None,
61      },
62    );
63  }
64
65  let j = JoinMessage {
66    ltime: 10.into(),
67    id: "test".into(),
68  };
69
70  assert!(
71    !s1.handle_node_join_intent(&j).await,
72    "should not rebroadcast"
73  );
74
75  // Check that we didn't buffer anything
76  {
77    let members = s1.inner.members.read().await;
78    assert!(
79      recent_intent(&members.recent_intents, &"test".into(), MessageType::Join).is_none(),
80      "should not have buffered intent"
81    );
82  }
83
84  s1.shutdown().await.unwrap();
85}
86
87/// Unit tests for the join intent newer
88pub async fn join_intent_newer<T>(transport_opts: T::Options, addr: T::ResolvedAddress)
89where
90  T: Transport<Id = SmolStr>,
91{
92  let opts = test_config();
93  let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
94  {
95    let mut members = s1.inner.members.write().await;
96    members.states.insert(
97      "test".into(),
98      MemberState {
99        member: Member {
100          node: Node::new("test".into(), addr),
101          tags: Arc::new(Default::default()),
102          status: MemberStatus::Alive,
103          memberlist_protocol_version: crate::types::MemberlistProtocolVersion::V1,
104          memberlist_delegate_version: crate::types::MemberlistDelegateVersion::V1,
105          protocol_version: crate::types::ProtocolVersion::V1,
106          delegate_version: crate::types::DelegateVersion::V1,
107        },
108        status_time: 12.into(),
109        leave_time: None,
110      },
111    );
112  }
113
114  let j = JoinMessage {
115    ltime: 14.into(),
116    id: "test".into(),
117  };
118
119  assert!(s1.handle_node_join_intent(&j).await, "should rebroadcast");
120
121  {
122    let members = s1.inner.members.read().await;
123    assert!(
124      recent_intent(&members.recent_intents, &"test".into(), MessageType::Join).is_none(),
125      "should not have buffered intent"
126    );
127
128    let m = members.states.get("test").unwrap();
129    assert_eq!(m.status_time, 14.into(), "should update join time");
130    assert_eq!(s1.inner.clock.time(), 15.into(), "should update clock");
131  }
132
133  s1.shutdown().await.unwrap();
134}
135
136/// Unit tests for the join intent reset leaving
137pub async fn join_intent_reset_leaving<T>(transport_opts: T::Options, addr: T::ResolvedAddress)
138where
139  T: Transport<Id = SmolStr>,
140{
141  let opts = test_config();
142  let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
143
144  {
145    let mut members = s1.inner.members.write().await;
146    members.states.insert(
147      "test".into(),
148      MemberState {
149        member: Member {
150          node: Node::new("test".into(), addr),
151          tags: Arc::new(Default::default()),
152          status: MemberStatus::Leaving,
153          memberlist_protocol_version: crate::types::MemberlistProtocolVersion::V1,
154          memberlist_delegate_version: crate::types::MemberlistDelegateVersion::V1,
155          protocol_version: crate::types::ProtocolVersion::V1,
156          delegate_version: crate::types::DelegateVersion::V1,
157        },
158        status_time: 12.into(),
159        leave_time: None,
160      },
161    );
162  }
163
164  let j = JoinMessage {
165    ltime: 14.into(),
166    id: "test".into(),
167  };
168
169  assert!(s1.handle_node_join_intent(&j).await, "should rebroadcast");
170
171  {
172    let members = s1.inner.members.read().await;
173    assert!(
174      recent_intent(&members.recent_intents, &"test".into(), MessageType::Join).is_none(),
175      "should not have buffered intent"
176    );
177
178    let m = members.states.get("test").unwrap();
179    assert_eq!(m.status_time, 14.into(), "should update join time");
180    assert_eq!(m.member.status, MemberStatus::Alive, "should update status");
181    assert_eq!(s1.inner.clock.time(), 15.into(), "should update clock");
182  }
183
184  s1.shutdown().await.unwrap();
185}
186
187/// Unit tests for the join leave ltime logic
188pub async fn join_leave_ltime<T>(transport_opts1: T::Options, transport_opts2: T::Options)
189where
190  T: Transport,
191{
192  let opts = test_config();
193  let s1 = Serf::<T>::new(transport_opts1, opts).await.unwrap();
194  let opts = test_config();
195  let s2 = Serf::<T>::new(transport_opts2, opts).await.unwrap();
196
197  let serfs = [s1, s2];
198  wait_until_num_nodes(1, &serfs).await;
199
200  let (_, addr) = serfs[1].inner.memberlist.advertise_node().into_components();
201  serfs[0]
202    .join(MaybeResolvedAddress::resolved(addr), false)
203    .await
204    .unwrap();
205
206  wait_until_num_nodes(2, &serfs).await;
207
208  let now = Epoch::now();
209
210  loop {
211    let members = serfs[1].inner.members.read().await;
212    let mut cond1 = false;
213    let mut cond2 = false;
214    if let Some(m) = members.states.get(serfs[0].inner.memberlist.local_id()) {
215      if m.status_time == 1.into() {
216        cond1 = true;
217      }
218
219      if serfs[1].inner.clock.time() > m.status_time {
220        cond2 = true;
221      }
222    }
223
224    if cond1 && cond2 {
225      break;
226    }
227
228    if now.elapsed() > Duration::from_secs(7) {
229      panic!("timed out waiting for status time to be updated");
230    }
231
232    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
233  }
234
235  let old_clock = serfs[1].inner.clock.time();
236
237  serfs[0].leave().await.unwrap();
238
239  loop {
240    let mut cond1 = false;
241
242    if serfs[1].inner.clock.time() > old_clock {
243      cond1 = true;
244    }
245
246    if cond1 {
247      break;
248    }
249
250    if now.elapsed() > Duration::from_secs(7) {
251      panic!(
252        "leave should increment ({} / {})",
253        serfs[1].inner.clock.time(),
254        old_clock
255      );
256    }
257
258    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
259  }
260
261  for s in serfs.iter() {
262    s.shutdown().await.unwrap();
263  }
264}
265
266/// Unit tests for the join pending intent logic
267pub async fn join_pending_intent<T>(transport_opts: T::Options, addr: T::ResolvedAddress)
268where
269  T: Transport<Id = SmolStr>,
270{
271  let opts = test_config();
272  let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
273  {
274    let mut members = s1.inner.members.write().await;
275    upsert_intent::<SmolStr>(
276      &mut members.recent_intents,
277      &"test".into(),
278      MessageType::Join,
279      5.into(),
280      Epoch::now,
281    );
282  }
283
284  s1.handle_node_join(Arc::new(NodeState {
285    id: "test".into(),
286    addr,
287    meta: Meta::empty(),
288    state: memberlist_core::proto::State::Alive,
289    protocol_version: crate::types::MemberlistProtocolVersion::V1,
290    delegate_version: crate::types::MemberlistDelegateVersion::V1,
291  }))
292  .await;
293
294  {
295    let members = s1.inner.members.read().await;
296    let m = members.states.get("test").unwrap();
297    assert_eq!(m.status_time, 5.into());
298    assert_eq!(m.member.status, MemberStatus::Alive);
299  }
300
301  s1.shutdown().await.unwrap();
302}
303
304/// Unit tests for the join pending intent logic
305pub async fn join_pending_intents<T>(transport_opts: T::Options, addr: T::ResolvedAddress)
306where
307  T: Transport<Id = SmolStr>,
308{
309  let opts = test_config();
310  let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
311  {
312    let mut members = s1.inner.members.write().await;
313    upsert_intent::<SmolStr>(
314      &mut members.recent_intents,
315      &"test".into(),
316      MessageType::Join,
317      5.into(),
318      Epoch::now,
319    );
320    upsert_intent::<SmolStr>(
321      &mut members.recent_intents,
322      &"test".into(),
323      MessageType::Leave,
324      6.into(),
325      Epoch::now,
326    );
327  }
328
329  s1.handle_node_join(Arc::new(NodeState {
330    id: "test".into(),
331    addr,
332    meta: Meta::empty(),
333    state: memberlist_core::proto::State::Alive,
334    protocol_version: crate::types::MemberlistProtocolVersion::V1,
335    delegate_version: crate::types::MemberlistDelegateVersion::V1,
336  }))
337  .await;
338
339  {
340    let members = s1.inner.members.read().await;
341    let m = members.states.get("test").unwrap();
342    assert_eq!(m.status_time, 6.into());
343    assert_eq!(m.member.status, MemberStatus::Leaving);
344  }
345
346  s1.shutdown().await.unwrap();
347}
348
349/// Unit tests for the join leave
350pub async fn serf_join_leave<T>(transport_opts1: T::Options, transport_opts2: T::Options)
351where
352  T: Transport,
353{
354  let s1 = Serf::<T>::new(transport_opts1, test_config())
355    .await
356    .unwrap();
357  let reap_interval = s1.inner.opts.reap_interval();
358  let s2 = Serf::<T>::new(transport_opts2, test_config())
359    .await
360    .unwrap();
361
362  let serfs = [s1, s2];
363  wait_until_num_nodes(1, &serfs).await;
364
365  let node = serfs[1]
366    .inner
367    .memberlist
368    .advertise_node()
369    .map_address(MaybeResolvedAddress::resolved);
370  serfs[0].join(node.address().clone(), false).await.unwrap();
371
372  wait_until_num_nodes(2, &serfs).await;
373
374  serfs[1].leave().await.unwrap();
375
376  <T::Runtime as RuntimeLite>::sleep(reap_interval * 2).await;
377
378  wait_until_num_nodes(1, &serfs).await;
379}
380
381/// Unit test for serf join leave join
382pub async fn serf_join_leave_join<T>(transport_opts1: T::Options, transport_opts2: T::Options)
383where
384  T: Transport,
385  T::Options: Clone,
386{
387  let s1 = Serf::<T>::new(
388    transport_opts1,
389    test_config().with_reap_interval(Duration::from_secs(10)),
390  )
391  .await
392  .unwrap();
393  let s2 = Serf::<T>::new(
394    transport_opts2.clone(),
395    test_config().with_reap_interval(Duration::from_secs(10)),
396  )
397  .await
398  .unwrap();
399
400  let mut serfs = [s1, s2];
401  wait_until_num_nodes(1, &serfs).await;
402
403  let node = serfs[1]
404    .inner
405    .memberlist
406    .advertise_node()
407    .map_address(MaybeResolvedAddress::resolved);
408  serfs[0].join(node.address().clone(), false).await.unwrap();
409
410  wait_until_num_nodes(2, &serfs).await;
411
412  serfs[1].leave().await.unwrap();
413  serfs[1].shutdown().await.unwrap();
414
415  let t = serfs[1].inner.opts.memberlist_options.probe_interval() * 5;
416  // Give the reaper time to reap nodes
417  <T::Runtime as RuntimeLite>::sleep(t).await;
418
419  // s1 should see the node as having left
420  let start = Epoch::now();
421  loop {
422    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
423
424    let members = serfs[0].inner.members.read().await;
425    let mut any_left = false;
426    for member in members.states.values() {
427      if member.member.status == MemberStatus::Left {
428        any_left = true;
429        break;
430      }
431    }
432
433    if any_left {
434      break;
435    }
436
437    if !any_left && start.elapsed() > Duration::from_secs(7) {
438      panic!("Node should have left");
439    }
440  }
441
442  // Bring the node back
443  let s2 = Serf::<T>::new(
444    transport_opts2,
445    test_config().with_reap_interval(Duration::from_secs(10)),
446  )
447  .await
448  .unwrap();
449
450  serfs[1] = s2;
451  wait_until_num_nodes(1, &serfs[1..]).await;
452
453  // Re-attempt the join
454  let node = serfs[1]
455    .inner
456    .memberlist
457    .advertise_node()
458    .map_address(MaybeResolvedAddress::resolved);
459  serfs[0].join(node.address().clone(), false).await.unwrap();
460
461  let start = Epoch::now();
462  loop {
463    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
464
465    let members = serfs[0].inner.members.read().await;
466    let mut any_left = false;
467    for member in members.states.values() {
468      if member.member.status == MemberStatus::Left {
469        any_left = true;
470        break;
471      }
472    }
473
474    if !any_left {
475      break;
476    }
477
478    if start.elapsed() > Duration::from_secs(7) {
479      panic!("all nodes should be alive!");
480    }
481  }
482
483  for s in serfs.iter() {
484    s.shutdown().await.unwrap();
485  }
486}
487
488/// Unit test for serf join ignore old
489pub async fn serf_join_ignore_old<T>(transport_opts1: T::Options, transport_opts2: T::Options)
490where
491  T: Transport,
492{
493  let s1 = Serf::<T>::new(transport_opts1, test_config())
494    .await
495    .unwrap();
496
497  let (event_tx, event_rx) = EventProducer::bounded(4);
498  let s2 = Serf::<T>::with_event_producer(transport_opts2, test_config(), event_tx)
499    .await
500    .unwrap();
501
502  let serfs = [s1, s2];
503  wait_until_num_nodes(1, &serfs).await;
504
505  // Fire a user event
506  serfs[0]
507    .user_event("event!", Bytes::from_static(b"test"), false)
508    .await
509    .unwrap();
510  <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(10)).await;
511
512  serfs[0]
513    .user_event("second", Bytes::from_static(b"foobar"), false)
514    .await
515    .unwrap();
516  <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(10)).await;
517
518  // join with ignoreOld set to true! should not get events
519  let node = serfs[0]
520    .inner
521    .memberlist
522    .advertise_node()
523    .map_address(MaybeResolvedAddress::resolved);
524  serfs[1].join(node.address().clone(), true).await.unwrap();
525
526  wait_until_num_nodes(2, &serfs).await;
527
528  // check the events to make sure we got nothing
529  test_user_events(event_rx.rx, vec![], vec![]).await;
530
531  for s in serfs.iter() {
532    s.shutdown().await.unwrap();
533  }
534}
535
536#[derive(Debug, thiserror::Error)]
537#[error("merge canceled")]
538struct CancelMergeError;
539
540#[derive(Clone)]
541struct CancelMergeDelegate<A: CheapClone + Send + Sync + 'static> {
542  invoked: Arc<AtomicBool>,
543  _phantom: PhantomData<A>,
544}
545
546impl<A: CheapClone + Send + Sync + 'static> MergeDelegate for CancelMergeDelegate<A> {
547  type Error = CancelMergeError;
548
549  type Id = SmolStr;
550
551  type Address = A;
552
553  async fn notify_merge(
554    &self,
555    _members: Arc<[Member<Self::Id, Self::Address>]>,
556  ) -> Result<(), Self::Error> {
557    self.invoked.store(true, Ordering::SeqCst);
558    Err(CancelMergeError)
559  }
560}
561
562/// Unit test for serf join cancel
563pub async fn serf_join_cancel<T>(transport_opts1: T::Options, transport_opts2: T::Options)
564where
565  T: Transport<Id = SmolStr>,
566{
567  let cmd1 = CancelMergeDelegate {
568    invoked: Arc::new(AtomicBool::new(false)),
569    _phantom: PhantomData,
570  };
571  let s1 = Serf::<T, _>::with_delegate(
572    transport_opts1,
573    test_config(),
574    DefaultDelegate::<T>::new().with_merge_delegate(cmd1.clone()),
575  )
576  .await
577  .unwrap();
578  let cmd2 = CancelMergeDelegate {
579    invoked: Arc::new(AtomicBool::new(false)),
580    _phantom: PhantomData,
581  };
582  let s2 = Serf::<T, _>::with_delegate(
583    transport_opts2,
584    test_config(),
585    DefaultDelegate::<T>::new().with_merge_delegate(cmd2.clone()),
586  )
587  .await
588  .unwrap();
589
590  let serfs = [s1, s2];
591  wait_until_num_nodes(0, &serfs).await;
592
593  let node = serfs[1]
594    .inner
595    .memberlist
596    .advertise_node()
597    .map_address(MaybeResolvedAddress::resolved);
598
599  let err = serfs[0]
600    .join(node.address().clone(), false)
601    .await
602    .unwrap_err();
603  assert!(err.to_string().contains("merge canceled"));
604
605  wait_until_num_nodes(0, &serfs).await;
606
607  assert!(cmd1.invoked.load(Ordering::SeqCst));
608  assert!(cmd2.invoked.load(Ordering::SeqCst));
609
610  for s in serfs.iter() {
611    s.shutdown().await.unwrap();
612  }
613}