Skip to main content

serf_core/serf/base/tests/
serf.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2
3use memberlist_core::{tests::AnyError, transport::Id};
4
5use crate::{
6  event::EventProducer,
7  options::MemberlistOptions,
8  types::{Member, MemberState, MemberStatus, Tags},
9};
10
11use super::*;
12
/// Unit tests for the serf event related functionalities
pub mod event;

/// Unit tests for the serf leave related functionalities
pub mod leave;

/// Unit tests for the serf join related functionalities
pub mod join;

/// Unit tests for the serf ping delegate related functionalities
pub mod delegate;

/// Unit tests for the serf reconnect related functionalities
pub mod reconnect;

/// Unit tests for the serf remove related functionalities
pub mod remove;

/// Unit tests for the serf reap related functionalities
pub mod reap;

/// Unit tests for the serf snapshot related functionalities
pub mod snapshot;
36
37fn test_member_status<I: Id, A>(
38  members: &HashMap<I, MemberState<I, A>>,
39  id: I,
40  status: MemberStatus,
41) -> Result<(), AnyError> {
42  for member in members.values() {
43    if id.eq(member.member.node.id()) {
44      if member.member.status != status {
45        return Err(AnyError::from(format!(
46          "expected member {} to have status {:?}, got {:?}",
47          id, status, member.member.status
48        )));
49      }
50      return Ok(());
51    }
52  }
53  Err(AnyError::from(format!("member {} not found", id)))
54}
55
56/// Unit tests for the get queue max
57pub async fn serf_get_queue_max<T>(
58  transport_opts: T::Options,
59  mut get_addr: impl FnMut(usize) -> T::ResolvedAddress,
60) where
61  T: Transport<Id = SmolStr>,
62  T::Options: Clone,
63{
64  let s = Serf::<T>::new(transport_opts.clone(), test_config())
65    .await
66    .unwrap();
67
68  // We don't need a running Serf so fake it out with the required
69  // state.
70  {
71    let mut members = s.inner.members.write().await;
72    members.states.clear();
73    for i in 0..100 {
74      let name: SmolStr = format!("Member{i}").into();
75      members.states.insert(
76        name.clone(),
77        MemberState {
78          member: Member::new(
79            Node::new(name.clone(), get_addr(i)),
80            Default::default(),
81            MemberStatus::Alive,
82          ),
83          status_time: 0.into(),
84          leave_time: None,
85        },
86      );
87    }
88  }
89
90  // Default mode just uses the max depth.
91  let got = s.get_queue_max().await;
92  let want = 4096;
93  assert_eq!(got, want);
94
95  // Now configure a min which should take precedence.
96  s.shutdown().await.unwrap();
97  <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(2)).await;
98
99  let sn = Serf::<T>::new(
100    transport_opts.clone(),
101    test_config().with_min_queue_depth(1024),
102  )
103  .await
104  .unwrap();
105
106  {
107    let mut members = sn.inner.members.write().await;
108    members.states.clear();
109    let old_members = s.inner.members.read().await;
110    members.states.clone_from(&old_members.states);
111  }
112
113  let got = sn.get_queue_max().await;
114  let want = 1024;
115  assert_eq!(got, want);
116
117  sn.shutdown().await.unwrap();
118  <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(2)).await;
119
120  // Bring it under the number of nodes, so the calculation based on
121  // the number of nodes takes precedence.
122  let snn = Serf::<T>::new(transport_opts, test_config().with_min_queue_depth(16))
123    .await
124    .unwrap();
125
126  {
127    let mut members = snn.inner.members.write().await;
128    members.states.clear();
129    let old_members = sn.inner.members.read().await;
130    members.states.clone_from(&old_members.states);
131  }
132
133  let got = snn.get_queue_max().await;
134  let want = 200;
135  assert_eq!(got, want);
136
137  // Try adjusting the node count.
138  {
139    let mut members = snn.inner.members.write().await;
140    let name = SmolStr::new("another");
141    members.states.insert(
142      name.clone(),
143      MemberState {
144        member: Member::new(
145          Node::new(name.clone(), get_addr(10000)),
146          Default::default(),
147          MemberStatus::Alive,
148        ),
149        status_time: 0.into(),
150        leave_time: None,
151      },
152    );
153  }
154
155  let got = snn.get_queue_max().await;
156  let want = 202;
157  assert_eq!(got, want);
158  snn.shutdown().await.unwrap();
159  drop(snn);
160}
161
162/// Unit tests for the update
163pub async fn serf_update<T, F>(
164  transport_opts1: T::Options,
165  transport_opts2: T::Options,
166  get_transport: impl FnOnce(T::Id, T::ResolvedAddress) -> F + Copy,
167) where
168  T: Transport,
169  T::Options: Clone,
170  F: core::future::Future<Output = T::Options>,
171{
172  let (event_tx, event_rx) = EventProducer::bounded(64);
173  let s1 = Serf::<T>::with_event_producer(transport_opts1, test_config(), event_tx)
174    .await
175    .unwrap();
176  let s2 = Serf::<T>::new(transport_opts2.clone(), test_config())
177    .await
178    .unwrap();
179  let (s2id, s2addr) = s2.advertise_node().into_components();
180
181  let mut serfs = vec![s1, s2];
182  wait_until_num_nodes(1, &serfs).await;
183
184  let node = serfs[1]
185    .inner
186    .memberlist
187    .advertise_node()
188    .map_address(MaybeResolvedAddress::resolved);
189  serfs[0].join(node.address().clone(), false).await.unwrap();
190
191  wait_until_num_nodes(2, &serfs).await;
192  // Now force the shutdown of s2 so it appears to fail.
193  serfs[1].shutdown().await.unwrap();
194  drop(serfs.pop().unwrap());
195
196  // Don't wait for a failure to be detected. Bring back s2 immediately
197  let start = Epoch::now();
198  let s2 = loop {
199    match Serf::<T>::new(
200      get_transport(s2id.clone(), s2addr.clone()).await,
201      test_config().with_tags([("foo", "bar")].into_iter()),
202    )
203    .await
204    {
205      Ok(s) => break s,
206      Err(e) => {
207        <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(1)).await;
208        if start.elapsed() > Duration::from_secs(20) {
209          panic!("timed out: {}", e);
210        }
211      }
212    }
213  };
214
215  let s1node = serfs[0].advertise_node();
216  s2.join(
217    s1node
218      .map_address(MaybeResolvedAddress::resolved)
219      .address()
220      .clone(),
221    false,
222  )
223  .await
224  .unwrap();
225  serfs.push(s2);
226  wait_until_num_nodes(2, &serfs).await;
227
228  test_events(
229    event_rx.rx,
230    node.id().clone(),
231    [
232      CrateEventType::Member(MemberEventType::Join),
233      CrateEventType::Member(MemberEventType::Update),
234    ]
235    .into_iter()
236    .collect(),
237  )
238  .await;
239
240  // Verify that the member data got updated.
241  let mut found = false;
242  let members = serfs[0].inner.members.read().await;
243
244  for member in members.states.values() {
245    if member.member.node.id().eq(node.id())
246      && member.member.tags().get("foo").map(|v| v.as_str()) == Some("bar")
247    {
248      found = true;
249      break;
250    }
251  }
252  assert!(found, "did not found s2 in members");
253
254  for s in serfs.iter() {
255    s.shutdown().await.unwrap();
256  }
257}
258
259/// Unit tests for the role
260pub async fn serf_role<T>(transport_opts1: T::Options, transport_opts2: T::Options)
261where
262  T: Transport,
263{
264  let s1 = Serf::<T>::new(
265    transport_opts1,
266    test_config().with_tags([("role", "web")].into_iter()),
267  )
268  .await
269  .unwrap();
270  let s2 = Serf::<T>::new(
271    transport_opts2,
272    test_config().with_tags([("role", "lb")].into_iter()),
273  )
274  .await
275  .unwrap();
276
277  let serfs = [s1, s2];
278  wait_until_num_nodes(1, &serfs).await;
279
280  let node = serfs[1]
281    .inner
282    .memberlist
283    .advertise_node()
284    .map_address(MaybeResolvedAddress::resolved);
285  serfs[0].join(node.address().clone(), false).await.unwrap();
286
287  wait_until_num_nodes(2, &serfs).await;
288
289  let mut roles = HashMap::new();
290
291  let start = Epoch::now();
292  let mut cond1 = false;
293  let mut cond2 = false;
294  loop {
295    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
296
297    let members = serfs[0].inner.members.read().await;
298    for m in members.states.values() {
299      roles.insert(
300        m.member.node.id().clone(),
301        m.member.tags().get("role").cloned().unwrap(),
302      );
303    }
304
305    if let Some(role) = roles.get(node.id()) {
306      if role == "lb" {
307        cond1 = true;
308      }
309    }
310
311    if let Some(role) = roles.get(serfs[0].local_id()) {
312      if role == "web" {
313        cond2 = true;
314      }
315    }
316
317    if cond1 && cond2 {
318      break;
319    }
320
321    if start.elapsed() > Duration::from_secs(7) {
322      panic!("timed out");
323    }
324  }
325}
326
327/// Unit test for serf state
328pub async fn serf_state<T>(transport_opts1: T::Options)
329where
330  T: Transport,
331{
332  let s1 = Serf::<T>::new(transport_opts1, test_config())
333    .await
334    .unwrap();
335
336  assert_eq!(s1.state(), SerfState::Alive);
337
338  s1.leave().await.unwrap();
339
340  assert_eq!(s1.state(), SerfState::Left);
341
342  s1.shutdown().await.unwrap();
343
344  assert_eq!(s1.state(), SerfState::Shutdown);
345}
346
347/// Unit tests for serf set tags
348pub async fn serf_set_tags<T>(transport_opts1: T::Options, transport_opts2: T::Options)
349where
350  T: Transport,
351{
352  let (event_tx, event_rx) = EventProducer::bounded(4);
353  let s1 = Serf::<T>::with_event_producer(transport_opts1, test_config(), event_tx)
354    .await
355    .unwrap();
356  let s2 = Serf::<T>::new(transport_opts2, test_config())
357    .await
358    .unwrap();
359
360  let serfs = [s1, s2];
361
362  wait_until_num_nodes(1, &serfs).await;
363
364  let node = serfs[1]
365    .inner
366    .memberlist
367    .advertise_node()
368    .map_address(MaybeResolvedAddress::resolved);
369  serfs[0].join(node.address().clone(), false).await.unwrap();
370
371  wait_until_num_nodes(2, &serfs).await;
372
373  // Update the tags
374  serfs[0]
375    .set_tags([("port", "8080")].into_iter().collect())
376    .await
377    .unwrap();
378
379  serfs[1]
380    .set_tags([("datacenter", "east-aws")].into_iter().collect())
381    .await
382    .unwrap();
383
384  let start = Epoch::now();
385  let mut cond1 = false;
386  let mut cond2 = false;
387  let mut cond3 = false;
388  let mut cond4 = false;
389
390  loop {
391    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
392
393    let m1m = serfs[0].members().await;
394    let mut m1m_tags = HashMap::with_capacity(2);
395    for m in m1m {
396      m1m_tags.insert(m.node.id().clone(), m.tags.clone());
397    }
398
399    if m1m_tags.get(serfs[0].local_id()).map(|t| t.get("port")) == Some(Some(&"8080".into())) {
400      cond1 = true;
401    }
402
403    if m1m_tags
404      .get(serfs[1].local_id())
405      .map(|t| t.get("datacenter"))
406      == Some(Some(&"east-aws".into()))
407    {
408      cond2 = true;
409    }
410
411    let m2m = serfs[1].members().await;
412    let mut m2m_tags = HashMap::with_capacity(2);
413    for m in m2m {
414      m2m_tags.insert(m.node.id().clone(), m.tags.clone());
415    }
416
417    if m2m_tags.get(serfs[0].local_id()).map(|t| t.get("port")) == Some(Some(&"8080".into())) {
418      cond3 = true;
419    }
420
421    if m2m_tags
422      .get(serfs[1].local_id())
423      .map(|t| t.get("datacenter"))
424      == Some(Some(&"east-aws".into()))
425    {
426      cond4 = true;
427    }
428
429    if cond1 && cond2 && cond3 && cond4 {
430      break;
431    }
432
433    if start.elapsed() > Duration::from_secs(7) {
434      panic!("timed out");
435    }
436  }
437
438  // we check the events to make sure we got failures.
439  test_events(
440    event_rx.rx,
441    node.id().clone(),
442    [
443      CrateEventType::Member(MemberEventType::Join),
444      CrateEventType::Member(MemberEventType::Update),
445    ]
446    .into_iter()
447    .collect(),
448  )
449  .await;
450
451  for s in serfs.iter() {
452    s.shutdown().await.unwrap();
453  }
454}
455
456/// Unit tests for serf num nodes
457pub async fn serf_num_nodes<T>(transport_opts1: T::Options, transport_opts2: T::Options)
458where
459  T: Transport,
460{
461  let s1 = Serf::<T>::new(transport_opts1, test_config())
462    .await
463    .unwrap();
464  let s2 = Serf::<T>::new(transport_opts2, test_config())
465    .await
466    .unwrap();
467
468  assert_eq!(s1.num_members().await, 1);
469
470  let serfs = [s1, s2];
471  wait_until_num_nodes(1, &serfs).await;
472
473  let node = serfs[1]
474    .inner
475    .memberlist
476    .advertise_node()
477    .map_address(MaybeResolvedAddress::resolved);
478  serfs[0].join(node.address().clone(), false).await.unwrap();
479
480  wait_until_num_nodes(2, &serfs).await;
481}
482
483/// Unit tests for serf coordinates
484pub async fn serf_coordinates<T>(
485  transport_opts1: T::Options,
486  transport_opts2: T::Options,
487  transport_opts3: T::Options,
488) where
489  T: Transport,
490{
491  const PROBE_INTERVAL: Duration = Duration::from_millis(2);
492
493  let opts = test_config()
494    .with_disable_coordinates(false)
495    .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL));
496  let s1 = Serf::<T>::new(transport_opts1, opts.clone()).await.unwrap();
497  let s2 = Serf::<T>::new(transport_opts2, opts).await.unwrap();
498
499  let mut serfs = vec![s1, s2];
500  wait_until_num_nodes(1, &serfs).await;
501
502  // Make sure both nodes start out the origin so we can prove they did
503  // an update later.
504  let c1 = serfs[0].cooridate().unwrap();
505  let c2 = serfs[1].cooridate().unwrap();
506
507  const ZERO_THRESHOLD: f64 = 20.0e-6;
508
509  assert!(
510    c1.distance_to(&c2).as_secs_f64() <= ZERO_THRESHOLD,
511    "coordinates didn't start at the origin"
512  );
513
514  // Join the two nodes together and give them time to probe each other.
515  let node = serfs[1]
516    .inner
517    .memberlist
518    .advertise_node()
519    .map_address(MaybeResolvedAddress::resolved);
520  serfs[0].join(node.address().clone(), false).await.unwrap();
521
522  wait_until_num_nodes(2, &serfs).await;
523
524  let start = Epoch::now();
525  let mut cond1 = false;
526  let mut cond2 = false;
527  let mut cond3 = false;
528  let mut cond4 = false;
529  let s2id = serfs[1].local_id().clone();
530  let s1id = serfs[0].local_id().clone();
531  loop {
532    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
533
534    // See if they know about each other.
535
536    if serfs[0].cached_coordinate(&s2id.clone()).is_ok() {
537      cond1 = true;
538    } else if start.elapsed() > Duration::from_secs(7) {
539      panic!("s1 didn't get a coordinate for s2");
540    }
541
542    if serfs[1].cached_coordinate(&s1id.clone()).is_ok() {
543      cond2 = true;
544    } else if start.elapsed() > Duration::from_secs(7) {
545      panic!("s2 didn't get a coordinate for s1");
546    }
547
548    // With only one ping they won't have a good estimate of the other node's
549    // coordinate, but they should both have updated their own coordinate.
550    let c1 = serfs[0].cooridate().unwrap();
551    let c2 = serfs[1].cooridate().unwrap();
552
553    if c1.distance_to(&c2).as_secs_f64() >= ZERO_THRESHOLD {
554      cond3 = true;
555    } else if start.elapsed() > Duration::from_secs(7) {
556      panic!("coordinates didn't update after probes");
557    }
558
559    // Make sure they cached their own current coordinate after the update.
560    let c1c = serfs[0].cached_coordinate(&s1id.clone()).unwrap();
561    match c1c {
562      None => {
563        if start.elapsed() > Duration::from_secs(7) {
564          panic!("s1 didn't cache its own coordinate");
565        }
566      }
567      Some(c1c) => {
568        if c1 == c1c {
569          cond4 = true;
570        } else if start.elapsed() > Duration::from_secs(7) {
571          panic!("s1 coordinates are not equal");
572        }
573      }
574    }
575
576    if cond1 && cond2 && cond3 && cond4 {
577      break;
578    }
579
580    if start.elapsed() > Duration::from_secs(7) {
581      panic!(
582        "timed out cond1 {} cond2 {} cond3 {} cond4 {}",
583        cond1, cond2, cond3, cond4
584      );
585    }
586  }
587
588  // Break up the cluster and make sure the coordinates get removed by
589  // the reaper.
590  serfs[1].shutdown().await.unwrap();
591  let t = serfs[1].inner.opts.reap_interval * 4;
592  drop(serfs.pop().unwrap());
593
594  <T::Runtime as RuntimeLite>::sleep(t).await;
595
596  wait_until_num_nodes(1, &serfs).await;
597
598  let start = Epoch::now();
599  loop {
600    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
601
602    if serfs[0].cached_coordinate(&s2id.clone()).unwrap().is_none() {
603      break;
604    }
605
606    if start.elapsed() > Duration::from_secs(7) {
607      panic!("s1 should have removed s2's cached coordinate");
608    }
609  }
610
611  // Try a setup with coordinates disabled.
612  let s3 = Serf::<T>::new(
613    transport_opts3,
614    test_config()
615      .with_disable_coordinates(true)
616      .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL)),
617  )
618  .await
619  .unwrap();
620
621  serfs.push(s3);
622  wait_until_num_nodes(1, &serfs).await;
623
624  let node = serfs[0]
625    .inner
626    .memberlist
627    .advertise_node()
628    .map_address(MaybeResolvedAddress::resolved);
629  serfs[1].join(node.address().clone(), false).await.unwrap();
630
631  wait_until_num_nodes(2, &serfs).await;
632
633  let start = Epoch::now();
634  let mut cond1 = false;
635  let mut cond2 = false;
636  loop {
637    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
638
639    // See if they know about each other.
640
641    if let Err(e) = serfs[1].cooridate() {
642      if e.to_string().contains("coordinates are disabled") {
643        cond1 = true;
644      }
645    }
646
647    if serfs[1].cached_coordinate(&s1id.clone()).is_err() {
648      cond2 = true;
649    }
650
651    if cond1 && cond2 {
652      break;
653    }
654
655    if start.elapsed() > Duration::from_secs(14) {
656      panic!("timed out: cond1 {} cond2 {}", cond1, cond2);
657    }
658  }
659
660  for s in serfs.iter() {
661    s.shutdown().await.unwrap();
662  }
663}
664
665/// Unit tests for serf name resolution
666///
667/// set_id is a function that takes the transport options and the id of the node, and returns the
668/// transport options with the id set to the given id.
669pub async fn serf_name_resolution<T>(
670  transport_opts1: T::Options,
671  transport_opts2: T::Options,
672  transport_opts3: T::Options,
673  set_id: impl FnOnce(T::Options, T::Id) -> T::Options,
674) where
675  T: Transport,
676{
677  let s1 = Serf::<T>::new(transport_opts1, test_config())
678    .await
679    .unwrap();
680  let s2 = Serf::<T>::new(transport_opts2, test_config())
681    .await
682    .unwrap();
683  let s3 = Serf::<T>::new(
684    set_id(transport_opts3, s1.local_id().clone()),
685    test_config(),
686  )
687  .await
688  .unwrap();
689
690  let serfs = [s1, s2, s3];
691  wait_until_num_nodes(1, &serfs).await;
692
693  // Join s1 to s2 first. s2 should vote for s1 in conflict
694  let node = serfs[1]
695    .inner
696    .memberlist
697    .advertise_node()
698    .map_address(MaybeResolvedAddress::resolved);
699  serfs[0].join(node.address().clone(), false).await.unwrap();
700
701  wait_until_num_nodes(2, &serfs[..2]).await;
702  wait_until_num_nodes(1, &serfs[2..]).await;
703
704  let node = serfs[2]
705    .inner
706    .memberlist
707    .advertise_node()
708    .map_address(MaybeResolvedAddress::resolved);
709  serfs[0].join(node.address().clone(), false).await.unwrap();
710
711  // Wait for the query period to end
712  <T::Runtime as RuntimeLite>::sleep(serfs[0].default_query_timeout().await * 30).await;
713
714  let start = Epoch::now();
715  let mut cond1 = false;
716  let mut cond2 = false;
717  let mut cond3 = false;
718  loop {
719    // s3 should have shutdown, while s1 is running
720    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
721
722    if serfs[0].state() == SerfState::Alive {
723      cond1 = true;
724    }
725
726    if serfs[1].state() == SerfState::Alive {
727      cond2 = true;
728    }
729
730    if serfs[2].state() == SerfState::Shutdown {
731      cond3 = true;
732    }
733
734    if cond1 && cond2 && cond3 {
735      break;
736    }
737
738    if start.elapsed() > Duration::from_secs(14) {
739      println!("cond1 {cond1} cond2 {cond2} cond3 {cond3}");
740      panic!("timed out");
741    }
742  }
743
744  for s in serfs.iter() {
745    s.shutdown().await.unwrap();
746  }
747}
748
749/// Unit test for serf local member
750pub async fn serf_local_member<T>(opts: T::Options)
751where
752  T: Transport,
753{
754  let s = Serf::<T>::new(opts, test_config()).await.unwrap();
755
756  let local = s.local_member().await;
757  assert_eq!(local.node.id(), s.local_id());
758
759  assert_eq!(local.tags, s.inner.opts.tags());
760  assert_eq!(local.status, MemberStatus::Alive);
761
762  let new_tags = [("foo", "bar"), ("test", "ing")]
763    .into_iter()
764    .collect::<Tags>();
765  s.set_tags(new_tags.clone()).await.unwrap();
766
767  let local = s.local_member().await;
768  assert_eq!(&*local.tags, &new_tags);
769}
770
771/// Unit test for serf stats
772pub async fn serf_stats<T>(opts: T::Options)
773where
774  T: Transport,
775{
776  let s = Serf::<T>::new(opts, test_config()).await.unwrap();
777
778  let stats = s.stats().await;
779  assert_eq!(stats.get_event_queue(), 0);
780  assert_eq!(stats.get_event_time(), 1);
781  assert_eq!(stats.get_failed(), 0);
782  assert_eq!(stats.get_intent_queue(), 0);
783  assert_eq!(stats.get_left(), 0);
784  assert_eq!(stats.get_health_score(), 0);
785  assert_eq!(stats.get_member_time(), 1);
786  assert_eq!(stats.get_members(), 1);
787  assert!(!stats.get_encrypted());
788}
789
/// Unit test for serf write keyring file
///
/// Starts an encrypted instance configured with a keyring file inside a
/// temporary directory, then installs a second key, makes it primary and
/// removes the original key, checking the contents of the keyring file after
/// every step.
///
/// `get_transport_opts` receives the initial secret key and returns the
/// transport options and memberlist options to start the instance with.
///
/// # Panics
///
/// Panics if the instance built from those options does not have encryption
/// enabled.
#[cfg(feature = "encryption")]
pub async fn serf_write_keyring_file<T>(
  get_transport_opts: impl FnOnce(memberlist_core::proto::SecretKey) -> (T::Options, MemberlistOptions),
) where
  T: Transport,
{
  const EXISTING: &str = "T9jncgl9mbLus+baTTa7q7nPSUrXwbDi2dhbtqir37s=";
  const NEW_KEY: &str = "HvY8ubRZMgafUOWvrOadwOckVa1wN3QWAo46FVKbVN8=";

  let td = tempfile::tempdir().unwrap();
  let mut p = td.path().join("serf_write_keying_file");
  p.set_extension("json");

  let sk = crate::types::SecretKey::try_from(EXISTING).unwrap();

  let (topts, mopts) = get_transport_opts(sk);
  let serf = Serf::<T>::new(
    topts,
    test_config()
      .with_keyring_file(Some(p.clone()))
      .with_memberlist_options(mopts),
  )
  .await
  .unwrap();
  assert!(
    serf.encryption_enabled(),
    "write keyring file test only works on encrypted serf"
  );

  let manager = serf.key_manager();
  let new_sk = crate::types::SecretKey::try_from(NEW_KEY).unwrap();
  manager.install_key(new_sk, None).await.unwrap();

  let s = std::fs::read_to_string(&p).unwrap();

  let lines = s.split('\n').collect::<Vec<_>>();
  assert_eq!(lines.len(), 4);

  // Ensure both the original key and the new key are present in the file
  assert!(s.contains(EXISTING));
  assert!(s.contains(NEW_KEY));

  // Ensure the existing key remains primary. It is expected at index 1
  // because the first line of the file is the opening bracket (the file is
  // presumably written as an indented JSON array, one key per line).
  assert!(lines[1].contains(EXISTING));

  // Swap primary keys
  manager.use_key(new_sk, None).await.unwrap();

  let s = std::fs::read_to_string(&p).unwrap();

  let lines = s.split('\n').collect::<Vec<_>>();
  assert_eq!(lines.len(), 4);

  // Key order should have changed in keyring file
  assert!(lines[1].contains(NEW_KEY));

  // Remove the old key
  manager.remove_key(sk, None).await.unwrap();

  let s = std::fs::read_to_string(&p).unwrap();

  let lines = s.split('\n').collect::<Vec<_>>();
  // Only the new key should now be present in the keyring file
  assert_eq!(lines.len(), 3);

  assert!(lines[1].contains(NEW_KEY));

  let resp = manager.list_keys().await.unwrap();
  assert_eq!(resp.primary_keys().len(), 1);
  assert_eq!(resp.keys().len(), 1);

  // Shut the instance down before the temporary directory is removed.
  serf.shutdown().await.unwrap();
}
872
/// Exercises `recent_intent`, `upsert_intent` and `reap_intents` together:
/// lookups on an empty table, inserts and updates (an update carrying an
/// older time is rejected), lookups by node, and reaping of entries by age.
#[test]
fn test_recent_intent() {
  // Nothing can be found in an empty table.
  assert!(recent_intent::<SmolStr>(&HashMap::new(), &"foo".into(), MessageType::Join).is_none());

  let now = Epoch::now();
  // Stamp for entries that should already be expired when reaping ...
  let stale_stamp = || now - Duration::from_secs(2);
  // ... and for entries that should survive the first reap.
  let fresh_stamp = || now;

  let mut intents = HashMap::<SmolStr, _>::new();
  assert!(recent_intent(&intents, &"foo".into(), MessageType::Join).is_none());

  // Fresh entries are accepted.
  assert!(upsert_intent(
    &mut intents,
    &"foo".into(),
    MessageType::Join,
    1.into(),
    stale_stamp
  ));
  assert!(upsert_intent(
    &mut intents,
    &"bar".into(),
    MessageType::Leave,
    2.into(),
    stale_stamp
  ));
  assert!(upsert_intent(
    &mut intents,
    &"baz".into(),
    MessageType::Join,
    3.into(),
    fresh_stamp
  ));
  // A newer time replaces the existing entry for "bar" ...
  assert!(upsert_intent(
    &mut intents,
    &"bar".into(),
    MessageType::Join,
    4.into(),
    stale_stamp
  ));
  // ... an older one is rejected ...
  assert!(!upsert_intent(
    &mut intents,
    &"bar".into(),
    MessageType::Join,
    0.into(),
    stale_stamp
  ));
  // ... and a newer one is accepted again.
  assert!(upsert_intent(
    &mut intents,
    &"bar".into(),
    MessageType::Join,
    5.into(),
    stale_stamp
  ));

  // Every node reports the latest accepted time.
  assert_eq!(
    recent_intent(&intents, &"foo".into(), MessageType::Join).unwrap(),
    1.into()
  );
  assert_eq!(
    recent_intent(&intents, &"bar".into(), MessageType::Join).unwrap(),
    5.into()
  );
  assert_eq!(
    recent_intent(&intents, &"baz".into(), MessageType::Join).unwrap(),
    3.into()
  );
  assert!(recent_intent(&intents, &"tubez".into(), MessageType::Join).is_none());

  // First reap: only the entry with the fresh stamp ("baz") survives.
  reap_intents(&mut intents, Epoch::now(), Duration::from_secs(1));
  assert!(recent_intent(&intents, &"foo".into(), MessageType::Join).is_none());
  assert!(recent_intent(&intents, &"bar".into(), MessageType::Join).is_none());
  assert_eq!(
    recent_intent(&intents, &"baz".into(), MessageType::Join).unwrap(),
    3.into()
  );
  assert!(recent_intent(&intents, &"tubez".into(), MessageType::Join).is_none());

  // Second reap, two seconds in the future: "baz" is gone as well.
  reap_intents(
    &mut intents,
    Epoch::now() + Duration::from_secs(2),
    Duration::from_secs(1),
  );
  assert!(recent_intent(&intents, &"baz".into(), MessageType::Join).is_none());
}
946    Epoch::now() + Duration::from_secs(2),
947    Duration::from_secs(1),
948  );
949  assert!(recent_intent(&intents, &"baz".into(), MessageType::Join).is_none());
950}