serf_core/serf/base/tests/
serf.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2
3use memberlist_core::{tests::AnyError, transport::Id};
4
5use crate::{
6  event::EventProducer,
7  options::MemberlistOptions,
8  types::{Member, MemberState, MemberStatus, Tags},
9};
10
11use super::*;
12
13/// Unit tests for the serf events related functionalities
14pub mod event;
15
16/// Unit tests for the serf leave related functionalities
17pub mod leave;
18
19/// Unit tests for the serf join related functionalities
20pub mod join;
21
22/// Unit tests for the serf ping delegate related functionalities
23pub mod delegate;
24
25/// Unit tests for the serf reconnect related functionalities
26pub mod reconnect;
27
28/// Unit tests for the serf remove related functionalities
29pub mod remove;
30
/// Unit tests for the serf reap related functionalities
32pub mod reap;
33
34/// Unit tests for the serf snapshot related functionalities
35pub mod snapshot;
36
37fn test_member_status<I: Id, A>(
38  members: &HashMap<I, MemberState<I, A>>,
39  id: I,
40  status: MemberStatus,
41) -> Result<(), AnyError> {
42  for member in members.values() {
43    if id.eq(member.member.node.id()) {
44      if member.member.status != status {
45        return Err(AnyError::from(format!(
46          "expected member {} to have status {:?}, got {:?}",
47          id, status, member.member.status
48        )));
49      }
50      return Ok(());
51    }
52  }
53  Err(AnyError::from(format!("member {} not found", id)))
54}
55
56/// Unit tests for the get queue max
57pub async fn serf_get_queue_max<T>(
58  transport_opts: T::Options,
59  mut get_addr: impl FnMut(usize) -> T::ResolvedAddress,
60) where
61  T: Transport<Id = SmolStr>,
62  T::Options: Clone,
63{
64  let s = Serf::<T>::new(transport_opts.clone(), test_config())
65    .await
66    .unwrap();
67
68  // We don't need a running Serf so fake it out with the required
69  // state.
70  {
71    let mut members = s.inner.members.write().await;
72    members.states.clear();
73    for i in 0..100 {
74      let name: SmolStr = format!("Member{i}").into();
75      members.states.insert(
76        name.clone(),
77        MemberState {
78          member: Member::new(
79            Node::new(name.clone(), get_addr(i)),
80            Default::default(),
81            MemberStatus::Alive,
82          ),
83          status_time: 0.into(),
84          leave_time: None,
85        },
86      );
87    }
88  }
89
90  // Default mode just uses the max depth.
91  let got = s.get_queue_max().await;
92  let want = 4096;
93  assert_eq!(got, want);
94
95  // Now configure a min which should take precedence.
96  s.shutdown().await.unwrap();
97  <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(2)).await;
98
99  let sn = Serf::<T>::new(
100    transport_opts.clone(),
101    test_config().with_min_queue_depth(1024),
102  )
103  .await
104  .unwrap();
105
106  {
107    let mut members = sn.inner.members.write().await;
108    members.states.clear();
109    let old_members = s.inner.members.read().await;
110    members.states.clone_from(&old_members.states);
111  }
112
113  let got = sn.get_queue_max().await;
114  let want = 1024;
115  assert_eq!(got, want);
116
117  sn.shutdown().await.unwrap();
118  <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(2)).await;
119
120  // Bring it under the number of nodes, so the calculation based on
121  // the number of nodes takes precedence.
122  let snn = Serf::<T>::new(transport_opts, test_config().with_min_queue_depth(16))
123    .await
124    .unwrap();
125
126  {
127    let mut members = snn.inner.members.write().await;
128    members.states.clear();
129    let old_members = sn.inner.members.read().await;
130    members.states.clone_from(&old_members.states);
131  }
132
133  let got = snn.get_queue_max().await;
134  let want = 200;
135  assert_eq!(got, want);
136
137  // Try adjusting the node count.
138  {
139    let mut members = snn.inner.members.write().await;
140    let name = SmolStr::new("another");
141    members.states.insert(
142      name.clone(),
143      MemberState {
144        member: Member::new(
145          Node::new(name.clone(), get_addr(10000)),
146          Default::default(),
147          MemberStatus::Alive,
148        ),
149        status_time: 0.into(),
150        leave_time: None,
151      },
152    );
153  }
154
155  let got = snn.get_queue_max().await;
156  let want = 202;
157  assert_eq!(got, want);
158  snn.shutdown().await.unwrap();
159  drop(snn);
160}
161
162/// Unit tests for the update
163pub async fn serf_update<T, F>(
164  transport_opts1: T::Options,
165  transport_opts2: T::Options,
166  get_transport: impl FnOnce(T::Id, T::ResolvedAddress) -> F + Copy,
167) where
168  T: Transport,
169  T::Options: Clone,
170  F: core::future::Future<Output = T::Options>,
171{
172  let (event_tx, event_rx) = EventProducer::bounded(64);
173  let s1 = Serf::<T>::with_event_producer(transport_opts1, test_config(), event_tx)
174    .await
175    .unwrap();
176  let s2 = Serf::<T>::new(transport_opts2.clone(), test_config())
177    .await
178    .unwrap();
179  let (s2id, s2addr) = s2.advertise_node().into_components();
180
181  let mut serfs = vec![s1, s2];
182  wait_until_num_nodes(1, &serfs).await;
183
184  let node = serfs[1]
185    .inner
186    .memberlist
187    .advertise_node()
188    .map_address(MaybeResolvedAddress::resolved);
189  serfs[0].join(node.clone(), false).await.unwrap();
190
191  wait_until_num_nodes(2, &serfs).await;
192  // Now force the shutdown of s2 so it appears to fail.
193  serfs[1].shutdown().await.unwrap();
194  drop(serfs.pop().unwrap());
195
196  // Don't wait for a failure to be detected. Bring back s2 immediately
197  let start = Epoch::now();
198  let s2 = loop {
199    match Serf::<T>::new(
200      get_transport(s2id.clone(), s2addr.clone()).await,
201      test_config().with_tags([("foo", "bar")].into_iter()),
202    )
203    .await
204    {
205      Ok(s) => break s,
206      Err(e) => {
207        <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(1)).await;
208        if start.elapsed() > Duration::from_secs(20) {
209          panic!("timed out: {}", e);
210        }
211      }
212    }
213  };
214
215  let s1node = serfs[0].advertise_node();
216  s2.join(s1node.map_address(MaybeResolvedAddress::resolved), false)
217    .await
218    .unwrap();
219  serfs.push(s2);
220  wait_until_num_nodes(2, &serfs).await;
221
222  test_events(
223    event_rx.rx,
224    node.id().clone(),
225    [
226      CrateEventType::Member(MemberEventType::Join),
227      CrateEventType::Member(MemberEventType::Update),
228    ]
229    .into_iter()
230    .collect(),
231  )
232  .await;
233
234  // Verify that the member data got updated.
235  let mut found = false;
236  let members = serfs[0].inner.members.read().await;
237
238  for member in members.states.values() {
239    if member.member.node.id().eq(node.id())
240      && member.member.tags().get("foo").map(|v| v.as_str()) == Some("bar")
241    {
242      found = true;
243      break;
244    }
245  }
246  assert!(found, "did not found s2 in members");
247
248  for s in serfs.iter() {
249    s.shutdown().await.unwrap();
250  }
251}
252
253/// Unit tests for the role
254pub async fn serf_role<T>(transport_opts1: T::Options, transport_opts2: T::Options)
255where
256  T: Transport,
257{
258  let s1 = Serf::<T>::new(
259    transport_opts1,
260    test_config().with_tags([("role", "web")].into_iter()),
261  )
262  .await
263  .unwrap();
264  let s2 = Serf::<T>::new(
265    transport_opts2,
266    test_config().with_tags([("role", "lb")].into_iter()),
267  )
268  .await
269  .unwrap();
270
271  let serfs = [s1, s2];
272  wait_until_num_nodes(1, &serfs).await;
273
274  let node = serfs[1]
275    .inner
276    .memberlist
277    .advertise_node()
278    .map_address(MaybeResolvedAddress::resolved);
279  serfs[0].join(node.clone(), false).await.unwrap();
280
281  wait_until_num_nodes(2, &serfs).await;
282
283  let mut roles = HashMap::new();
284
285  let start = Epoch::now();
286  let mut cond1 = false;
287  let mut cond2 = false;
288  loop {
289    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
290
291    let members = serfs[0].inner.members.read().await;
292    for m in members.states.values() {
293      roles.insert(
294        m.member.node.id().clone(),
295        m.member.tags().get("role").cloned().unwrap(),
296      );
297    }
298
299    if let Some(role) = roles.get(node.id()) {
300      if role == "lb" {
301        cond1 = true;
302      }
303    }
304
305    if let Some(role) = roles.get(serfs[0].local_id()) {
306      if role == "web" {
307        cond2 = true;
308      }
309    }
310
311    if cond1 && cond2 {
312      break;
313    }
314
315    if start.elapsed() > Duration::from_secs(7) {
316      panic!("timed out");
317    }
318  }
319}
320
321/// Unit test for serf state
322pub async fn serf_state<T>(transport_opts1: T::Options)
323where
324  T: Transport,
325{
326  let s1 = Serf::<T>::new(transport_opts1, test_config())
327    .await
328    .unwrap();
329
330  assert_eq!(s1.state(), SerfState::Alive);
331
332  s1.leave().await.unwrap();
333
334  assert_eq!(s1.state(), SerfState::Left);
335
336  s1.shutdown().await.unwrap();
337
338  assert_eq!(s1.state(), SerfState::Shutdown);
339}
340
341/// Unit tests for serf set tags
342pub async fn serf_set_tags<T>(transport_opts1: T::Options, transport_opts2: T::Options)
343where
344  T: Transport,
345{
346  let (event_tx, event_rx) = EventProducer::bounded(4);
347  let s1 = Serf::<T>::with_event_producer(transport_opts1, test_config(), event_tx)
348    .await
349    .unwrap();
350  let s2 = Serf::<T>::new(transport_opts2, test_config())
351    .await
352    .unwrap();
353
354  let serfs = [s1, s2];
355
356  wait_until_num_nodes(1, &serfs).await;
357
358  let node = serfs[1]
359    .inner
360    .memberlist
361    .advertise_node()
362    .map_address(MaybeResolvedAddress::resolved);
363  serfs[0].join(node.clone(), false).await.unwrap();
364
365  wait_until_num_nodes(2, &serfs).await;
366
367  // Update the tags
368  serfs[0]
369    .set_tags([("port", "8080")].into_iter().collect())
370    .await
371    .unwrap();
372
373  serfs[1]
374    .set_tags([("datacenter", "east-aws")].into_iter().collect())
375    .await
376    .unwrap();
377
378  let start = Epoch::now();
379  let mut cond1 = false;
380  let mut cond2 = false;
381  let mut cond3 = false;
382  let mut cond4 = false;
383
384  loop {
385    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
386
387    let m1m = serfs[0].members().await;
388    let mut m1m_tags = HashMap::with_capacity(2);
389    for m in m1m {
390      m1m_tags.insert(m.node.id().clone(), m.tags.clone());
391    }
392
393    if m1m_tags.get(serfs[0].local_id()).map(|t| t.get("port")) == Some(Some(&"8080".into())) {
394      cond1 = true;
395    }
396
397    if m1m_tags
398      .get(serfs[1].local_id())
399      .map(|t| t.get("datacenter"))
400      == Some(Some(&"east-aws".into()))
401    {
402      cond2 = true;
403    }
404
405    let m2m = serfs[1].members().await;
406    let mut m2m_tags = HashMap::with_capacity(2);
407    for m in m2m {
408      m2m_tags.insert(m.node.id().clone(), m.tags.clone());
409    }
410
411    if m2m_tags.get(serfs[0].local_id()).map(|t| t.get("port")) == Some(Some(&"8080".into())) {
412      cond3 = true;
413    }
414
415    if m2m_tags
416      .get(serfs[1].local_id())
417      .map(|t| t.get("datacenter"))
418      == Some(Some(&"east-aws".into()))
419    {
420      cond4 = true;
421    }
422
423    if cond1 && cond2 && cond3 && cond4 {
424      break;
425    }
426
427    if start.elapsed() > Duration::from_secs(7) {
428      panic!("timed out");
429    }
430  }
431
432  // we check the events to make sure we got failures.
433  test_events(
434    event_rx.rx,
435    node.id().clone(),
436    [
437      CrateEventType::Member(MemberEventType::Join),
438      CrateEventType::Member(MemberEventType::Update),
439    ]
440    .into_iter()
441    .collect(),
442  )
443  .await;
444
445  for s in serfs.iter() {
446    s.shutdown().await.unwrap();
447  }
448}
449
450/// Unit tests for serf num nodes
451pub async fn serf_num_nodes<T>(transport_opts1: T::Options, transport_opts2: T::Options)
452where
453  T: Transport,
454{
455  let s1 = Serf::<T>::new(transport_opts1, test_config())
456    .await
457    .unwrap();
458  let s2 = Serf::<T>::new(transport_opts2, test_config())
459    .await
460    .unwrap();
461
462  assert_eq!(s1.num_members().await, 1);
463
464  let serfs = [s1, s2];
465  wait_until_num_nodes(1, &serfs).await;
466
467  let node = serfs[1]
468    .inner
469    .memberlist
470    .advertise_node()
471    .map_address(MaybeResolvedAddress::resolved);
472  serfs[0].join(node.clone(), false).await.unwrap();
473
474  wait_until_num_nodes(2, &serfs).await;
475}
476
477/// Unit tests for serf coordinates
478pub async fn serf_coordinates<T>(
479  transport_opts1: T::Options,
480  transport_opts2: T::Options,
481  transport_opts3: T::Options,
482) where
483  T: Transport,
484{
485  const PROBE_INTERVAL: Duration = Duration::from_millis(2);
486
487  let opts = test_config()
488    .with_disable_coordinates(false)
489    .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL));
490  let s1 = Serf::<T>::new(transport_opts1, opts.clone()).await.unwrap();
491  let s2 = Serf::<T>::new(transport_opts2, opts).await.unwrap();
492
493  let mut serfs = vec![s1, s2];
494  wait_until_num_nodes(1, &serfs).await;
495
496  // Make sure both nodes start out the origin so we can prove they did
497  // an update later.
498  let c1 = serfs[0].cooridate().unwrap();
499  let c2 = serfs[1].cooridate().unwrap();
500
501  const ZERO_THRESHOLD: f64 = 20.0e-6;
502
503  assert!(
504    c1.distance_to(&c2).as_secs_f64() <= ZERO_THRESHOLD,
505    "coordinates didn't start at the origin"
506  );
507
508  // Join the two nodes together and give them time to probe each other.
509  let node = serfs[1]
510    .inner
511    .memberlist
512    .advertise_node()
513    .map_address(MaybeResolvedAddress::resolved);
514  serfs[0].join(node.clone(), false).await.unwrap();
515
516  wait_until_num_nodes(2, &serfs).await;
517
518  let start = Epoch::now();
519  let mut cond1 = false;
520  let mut cond2 = false;
521  let mut cond3 = false;
522  let mut cond4 = false;
523  let s2id = serfs[1].local_id().clone();
524  let s1id = serfs[0].local_id().clone();
525  loop {
526    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
527
528    // See if they know about each other.
529
530    if serfs[0].cached_coordinate(&s2id.clone()).is_ok() {
531      cond1 = true;
532    } else if start.elapsed() > Duration::from_secs(7) {
533      panic!("s1 didn't get a coordinate for s2");
534    }
535
536    if serfs[1].cached_coordinate(&s1id.clone()).is_ok() {
537      cond2 = true;
538    } else if start.elapsed() > Duration::from_secs(7) {
539      panic!("s2 didn't get a coordinate for s1");
540    }
541
542    // With only one ping they won't have a good estimate of the other node's
543    // coordinate, but they should both have updated their own coordinate.
544    let c1 = serfs[0].cooridate().unwrap();
545    let c2 = serfs[1].cooridate().unwrap();
546
547    if c1.distance_to(&c2).as_secs_f64() >= ZERO_THRESHOLD {
548      cond3 = true;
549    } else if start.elapsed() > Duration::from_secs(7) {
550      panic!("coordinates didn't update after probes");
551    }
552
553    // Make sure they cached their own current coordinate after the update.
554    let c1c = serfs[0].cached_coordinate(&s1id.clone()).unwrap();
555    match c1c {
556      None => {
557        if start.elapsed() > Duration::from_secs(7) {
558          panic!("s1 didn't cache its own coordinate");
559        }
560      }
561      Some(c1c) => {
562        if c1 == c1c {
563          cond4 = true;
564        } else if start.elapsed() > Duration::from_secs(7) {
565          panic!("s1 coordinates are not equal");
566        }
567      }
568    }
569
570    if cond1 && cond2 && cond3 && cond4 {
571      break;
572    }
573
574    if start.elapsed() > Duration::from_secs(7) {
575      panic!(
576        "timed out cond1 {} cond2 {} cond3 {} cond4 {}",
577        cond1, cond2, cond3, cond4
578      );
579    }
580  }
581
582  // Break up the cluster and make sure the coordinates get removed by
583  // the reaper.
584  serfs[1].shutdown().await.unwrap();
585  let t = serfs[1].inner.opts.reap_interval * 4;
586  drop(serfs.pop().unwrap());
587
588  <T::Runtime as RuntimeLite>::sleep(t).await;
589
590  wait_until_num_nodes(1, &serfs).await;
591
592  let start = Epoch::now();
593  loop {
594    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
595
596    if serfs[0].cached_coordinate(&s2id.clone()).unwrap().is_none() {
597      break;
598    }
599
600    if start.elapsed() > Duration::from_secs(7) {
601      panic!("s1 should have removed s2's cached coordinate");
602    }
603  }
604
605  // Try a setup with coordinates disabled.
606  let s3 = Serf::<T>::new(
607    transport_opts3,
608    test_config()
609      .with_disable_coordinates(true)
610      .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL)),
611  )
612  .await
613  .unwrap();
614
615  serfs.push(s3);
616  wait_until_num_nodes(1, &serfs).await;
617
618  let node = serfs[0]
619    .inner
620    .memberlist
621    .advertise_node()
622    .map_address(MaybeResolvedAddress::resolved);
623  serfs[1].join(node.clone(), false).await.unwrap();
624
625  wait_until_num_nodes(2, &serfs).await;
626
627  let start = Epoch::now();
628  let mut cond1 = false;
629  let mut cond2 = false;
630  loop {
631    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
632
633    // See if they know about each other.
634
635    if let Err(e) = serfs[1].cooridate() {
636      if e.to_string().contains("coordinates are disabled") {
637        cond1 = true;
638      }
639    }
640
641    if serfs[1].cached_coordinate(&s1id.clone()).is_err() {
642      cond2 = true;
643    }
644
645    if cond1 && cond2 {
646      break;
647    }
648
649    if start.elapsed() > Duration::from_secs(14) {
650      panic!("timed out: cond1 {} cond2 {}", cond1, cond2);
651    }
652  }
653
654  for s in serfs.iter() {
655    s.shutdown().await.unwrap();
656  }
657}
658
659/// Unit tests for serf name resolution
660///
661/// set_id is a function that takes the transport options and the id of the node, and returns the
662/// transport options with the id set to the given id.
663pub async fn serf_name_resolution<T>(
664  transport_opts1: T::Options,
665  transport_opts2: T::Options,
666  transport_opts3: T::Options,
667  set_id: impl FnOnce(T::Options, T::Id) -> T::Options,
668) where
669  T: Transport,
670{
671  let s1 = Serf::<T>::new(transport_opts1, test_config())
672    .await
673    .unwrap();
674  let s2 = Serf::<T>::new(transport_opts2, test_config())
675    .await
676    .unwrap();
677  let s3 = Serf::<T>::new(
678    set_id(transport_opts3, s1.local_id().clone()),
679    test_config(),
680  )
681  .await
682  .unwrap();
683
684  let serfs = [s1, s2, s3];
685  wait_until_num_nodes(1, &serfs).await;
686
687  // Join s1 to s2 first. s2 should vote for s1 in conflict
688  let node = serfs[1]
689    .inner
690    .memberlist
691    .advertise_node()
692    .map_address(MaybeResolvedAddress::resolved);
693  serfs[0].join(node.clone(), false).await.unwrap();
694
695  wait_until_num_nodes(2, &serfs[..2]).await;
696  wait_until_num_nodes(1, &serfs[2..]).await;
697
698  let node = serfs[2]
699    .inner
700    .memberlist
701    .advertise_node()
702    .map_address(MaybeResolvedAddress::resolved);
703  serfs[0].join(node.clone(), false).await.unwrap();
704
705  // Wait for the query period to end
706  <T::Runtime as RuntimeLite>::sleep(serfs[0].default_query_timeout().await * 30).await;
707
708  let start = Epoch::now();
709  let mut cond1 = false;
710  let mut cond2 = false;
711  let mut cond3 = false;
712  loop {
713    // s3 should have shutdown, while s1 is running
714    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
715
716    if serfs[0].state() == SerfState::Alive {
717      cond1 = true;
718    }
719
720    if serfs[1].state() == SerfState::Alive {
721      cond2 = true;
722    }
723
724    if serfs[2].state() == SerfState::Shutdown {
725      cond3 = true;
726    }
727
728    if cond1 && cond2 && cond3 {
729      break;
730    }
731
732    if start.elapsed() > Duration::from_secs(14) {
733      println!("cond1 {cond1} cond2 {cond2} cond3 {cond3}");
734      panic!("timed out");
735    }
736  }
737
738  for s in serfs.iter() {
739    s.shutdown().await.unwrap();
740  }
741}
742
743/// Unit test for serf local member
744pub async fn serf_local_member<T>(opts: T::Options)
745where
746  T: Transport,
747{
748  let s = Serf::<T>::new(opts, test_config()).await.unwrap();
749
750  let local = s.local_member().await;
751  assert_eq!(local.node.id(), s.local_id());
752
753  assert_eq!(local.tags, s.inner.opts.tags());
754  assert_eq!(local.status, MemberStatus::Alive);
755
756  let new_tags = [("foo", "bar"), ("test", "ing")]
757    .into_iter()
758    .collect::<Tags>();
759  s.set_tags(new_tags.clone()).await.unwrap();
760
761  let local = s.local_member().await;
762  assert_eq!(&*local.tags, &new_tags);
763}
764
765/// Unit test for serf stats
766pub async fn serf_stats<T>(opts: T::Options)
767where
768  T: Transport,
769{
770  let s = Serf::<T>::new(opts, test_config()).await.unwrap();
771
772  let stats = s.stats().await;
773  assert_eq!(stats.get_event_queue(), 0);
774  assert_eq!(stats.get_event_time(), 1);
775  assert_eq!(stats.get_failed(), 0);
776  assert_eq!(stats.get_intent_queue(), 0);
777  assert_eq!(stats.get_left(), 0);
778  assert_eq!(stats.get_health_score(), 0);
779  assert_eq!(stats.get_member_time(), 1);
780  assert_eq!(stats.get_members(), 1);
781  assert!(!stats.get_encrypted());
782}
783
/// Unit test for serf write keyring file
///
/// Starts an encrypted node with a keyring file configured, then installs a
/// new key, makes it the primary key and removes the old key. After every
/// step the keyring file is read back and its content is checked.
///
/// `get_transport_opts` receives the initial secret key and returns the
/// transport options and the memberlist options used to create the node.
#[cfg(feature = "encryption")]
pub async fn serf_write_keyring_file<T>(
  get_transport_opts: impl FnOnce(memberlist_core::proto::SecretKey) -> (T::Options, MemberlistOptions),
) where
  T: Transport,
{
  const EXISTING: &str = "T9jncgl9mbLus+baTTa7q7nPSUrXwbDi2dhbtqir37s=";
  const NEW_KEY: &str = "HvY8ubRZMgafUOWvrOadwOckVa1wN3QWAo46FVKbVN8=";

  let td = tempfile::tempdir().unwrap();
  let mut p = td.path().join("serf_write_keying_file");
  p.set_extension("json");

  // Reads back the current content of the keyring file.
  let read_keyring = || std::fs::read_to_string(&p).unwrap();

  let sk = crate::types::SecretKey::try_from(EXISTING).unwrap();

  let (topts, mopts) = get_transport_opts(sk);
  let serf = Serf::<T>::new(
    topts,
    test_config()
      .with_keyring_file(Some(p.clone()))
      .with_memberlist_options(mopts),
  )
  .await
  .unwrap();
  assert!(
    serf.encryption_enabled(),
    "write keyring file test only works on encrypted serf"
  );

  let manager = serf.key_manager();
  let new_sk = crate::types::SecretKey::try_from(NEW_KEY).unwrap();
  manager.install_key(new_sk, None).await.unwrap();

  let s = read_keyring();
  let lines = s.split('\n').collect::<Vec<_>>();
  assert_eq!(lines.len(), 4);

  // Ensure both the original key and the new key are present in the file
  assert!(s.contains(EXISTING));
  assert!(s.contains(NEW_KEY));

  // Ensure the existing key remains primary. It is expected in position 1
  // because the first line of the file is expected to hold only the opening
  // bracket.
  assert!(lines[1].contains(EXISTING));

  // Swap primary keys
  manager.use_key(new_sk, None).await.unwrap();

  let s = read_keyring();
  let lines = s.split('\n').collect::<Vec<_>>();
  assert_eq!(lines.len(), 4);

  // Key order should have changed in keyring file
  assert!(lines[1].contains(NEW_KEY));

  // Remove the old key
  manager.remove_key(sk, None).await.unwrap();

  let s = read_keyring();
  let lines = s.split('\n').collect::<Vec<_>>();
  // Only the new key should now be present in the keyring file
  assert_eq!(lines.len(), 3);

  assert!(lines[1].contains(NEW_KEY));

  let resp = manager.list_keys().await.unwrap();
  assert_eq!(resp.primary_keys().len(), 1);
  assert_eq!(resp.keys().len(), 1);
}
866
/// Exercises `upsert_intent`, `recent_intent` and `reap_intents` on a small
/// intent table.
///
/// All timestamps, including the ones handed to `reap_intents`, are derived
/// from a single captured `now`, so the outcome does not depend on how much
/// wall-clock time passes while the test runs.
#[test]
fn test_recent_intent() {
  assert!(recent_intent::<SmolStr>(&HashMap::new(), &"foo".into(), MessageType::Join).is_none());

  let now = Epoch::now();
  // Stamps an intent as two seconds old.
  let expire = || now - Duration::from_secs(2);
  // Stamps an intent as brand new.
  let save = || now;

  let mut intents = HashMap::<SmolStr, _>::new();
  assert!(recent_intent(&intents, &"foo".into(), MessageType::Join).is_none());

  assert!(upsert_intent(
    &mut intents,
    &"foo".into(),
    MessageType::Join,
    1.into(),
    expire
  ));
  assert!(upsert_intent(
    &mut intents,
    &"bar".into(),
    MessageType::Leave,
    2.into(),
    expire
  ));
  assert!(upsert_intent(
    &mut intents,
    &"baz".into(),
    MessageType::Join,
    3.into(),
    save
  ));
  assert!(upsert_intent(
    &mut intents,
    &"bar".into(),
    MessageType::Join,
    4.into(),
    expire
  ));
  // An older time must not replace the stored intent.
  assert!(!upsert_intent(
    &mut intents,
    &"bar".into(),
    MessageType::Join,
    0.into(),
    expire
  ));
  assert!(upsert_intent(
    &mut intents,
    &"bar".into(),
    MessageType::Join,
    5.into(),
    expire
  ));

  let ltime = recent_intent(&intents, &"foo".into(), MessageType::Join).unwrap();
  assert_eq!(ltime, 1.into());

  let ltime = recent_intent(&intents, &"bar".into(), MessageType::Join).unwrap();
  assert_eq!(ltime, 5.into());

  let ltime = recent_intent(&intents, &"baz".into(), MessageType::Join).unwrap();
  assert_eq!(ltime, 3.into());

  assert!(recent_intent(&intents, &"tubez".into(), MessageType::Join).is_none());

  // Reaping at `now` with a one second timeout drops the intents stamped two
  // seconds ago and keeps the fresh one.
  reap_intents(&mut intents, now, Duration::from_secs(1));
  assert!(recent_intent(&intents, &"foo".into(), MessageType::Join).is_none());
  assert!(recent_intent(&intents, &"bar".into(), MessageType::Join).is_none());
  let ltime = recent_intent(&intents, &"baz".into(), MessageType::Join).unwrap();
  assert_eq!(ltime, 3.into());
  assert!(recent_intent(&intents, &"tubez".into(), MessageType::Join).is_none());

  // Two seconds later the remaining intent is past the timeout as well.
  reap_intents(
    &mut intents,
    now + Duration::from_secs(2),
    Duration::from_secs(1),
  );
  assert!(recent_intent(&intents, &"baz".into(), MessageType::Join).is_none());
}
942  );
943  assert!(recent_intent(&intents, &"baz".into(), MessageType::Join).is_none());
944}