serf_core/serf/base/tests/
serf.rs

1use std::sync::atomic::{AtomicUsize, Ordering};
2
3use memberlist_core::{tests::AnyError, transport::Id};
4
5use serf_types::{Member, MemberStatus, Tags};
6
7use crate::{event::EventProducer, types::MemberState};
8
9use super::*;
10
11/// Unit tests for the serf events related functionalities
12pub mod event;
13
14/// Unit tests for the serf leave related functionalities
15pub mod leave;
16
17/// Unit tests for the serf join related functionalities
18pub mod join;
19
20/// Unit tests for the serf ping delegate related functionalities
21pub mod delegate;
22
23/// Unit tests for the serf reconnect related functionalities
24pub mod reconnect;
25
26/// Unit tests for the serf remove related functionalities
27pub mod remove;
28
29/// Unit tests for serf reap related functionalities
30pub mod reap;
31
32/// Unit tests for the serf snapshot related functionalities
33pub mod snapshot;
34
35fn test_member_status<I: Id, A>(
36  members: &HashMap<I, MemberState<I, A>>,
37  id: I,
38  status: MemberStatus,
39) -> Result<(), AnyError> {
40  for member in members.values() {
41    if id.eq(member.member.node.id()) {
42      if member.member.status != status {
43        return Err(AnyError::from(format!(
44          "expected member {} to have status {:?}, got {:?}",
45          id, status, member.member.status
46        )));
47      }
48      return Ok(());
49    }
50  }
51  Err(AnyError::from(format!("member {} not found", id)))
52}
53
54/// Unit tests for the get queue max
55pub async fn serf_get_queue_max<T>(
56  transport_opts: T::Options,
57  mut get_addr: impl FnMut(usize) -> <T::Resolver as AddressResolver>::ResolvedAddress,
58) where
59  T: Transport<Id = SmolStr>,
60  T::Options: Clone,
61{
62  let s = Serf::<T>::new(transport_opts.clone(), test_config())
63    .await
64    .unwrap();
65
66  // We don't need a running Serf so fake it out with the required
67  // state.
68  {
69    let mut members = s.inner.members.write().await;
70    members.states.clear();
71    for i in 0..100 {
72      let name: SmolStr = format!("Member{i}").into();
73      members.states.insert(
74        name.clone(),
75        MemberState {
76          member: Member::new(
77            Node::new(name.clone(), get_addr(i)),
78            Default::default(),
79            MemberStatus::Alive,
80          ),
81          status_time: 0.into(),
82          leave_time: None,
83        },
84      );
85    }
86  }
87
88  // Default mode just uses the max depth.
89  let got = s.get_queue_max().await;
90  let want = 4096;
91  assert_eq!(got, want);
92
93  // Now configure a min which should take precedence.
94  s.shutdown().await.unwrap();
95  <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(2)).await;
96
97  let sn = Serf::<T>::new(
98    transport_opts.clone(),
99    test_config().with_min_queue_depth(1024),
100  )
101  .await
102  .unwrap();
103
104  {
105    let mut members = sn.inner.members.write().await;
106    members.states.clear();
107    let old_members = s.inner.members.read().await;
108    members.states.clone_from(&old_members.states);
109  }
110
111  let got = sn.get_queue_max().await;
112  let want = 1024;
113  assert_eq!(got, want);
114
115  sn.shutdown().await.unwrap();
116  <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(2)).await;
117
118  // Bring it under the number of nodes, so the calculation based on
119  // the number of nodes takes precedence.
120  let snn = Serf::<T>::new(transport_opts, test_config().with_min_queue_depth(16))
121    .await
122    .unwrap();
123
124  {
125    let mut members = snn.inner.members.write().await;
126    members.states.clear();
127    let old_members = sn.inner.members.read().await;
128    members.states.clone_from(&old_members.states);
129  }
130
131  let got = snn.get_queue_max().await;
132  let want = 200;
133  assert_eq!(got, want);
134
135  // Try adjusting the node count.
136  {
137    let mut members = snn.inner.members.write().await;
138    let name = SmolStr::new("another");
139    members.states.insert(
140      name.clone(),
141      MemberState {
142        member: Member::new(
143          Node::new(name.clone(), get_addr(10000)),
144          Default::default(),
145          MemberStatus::Alive,
146        ),
147        status_time: 0.into(),
148        leave_time: None,
149      },
150    );
151  }
152
153  let got = snn.get_queue_max().await;
154  let want = 202;
155  assert_eq!(got, want);
156  snn.shutdown().await.unwrap();
157  drop(snn);
158}
159
160/// Unit tests for the update
161pub async fn serf_update<T, F>(
162  transport_opts1: T::Options,
163  transport_opts2: T::Options,
164  get_transport: impl FnOnce(T::Id, <T::Resolver as AddressResolver>::ResolvedAddress) -> F + Copy,
165) where
166  T: Transport,
167  T::Options: Clone,
168  F: core::future::Future<Output = T::Options>,
169{
170  let (event_tx, event_rx) = EventProducer::bounded(64);
171  let s1 = Serf::<T>::with_event_producer(transport_opts1, test_config(), event_tx)
172    .await
173    .unwrap();
174  let s2 = Serf::<T>::new(transport_opts2.clone(), test_config())
175    .await
176    .unwrap();
177  let (s2id, s2addr) = s2.advertise_node().into_components();
178
179  let mut serfs = vec![s1, s2];
180  wait_until_num_nodes(1, &serfs).await;
181
182  let node = serfs[1]
183    .inner
184    .memberlist
185    .advertise_node()
186    .map_address(MaybeResolvedAddress::resolved);
187  serfs[0].join(node.clone(), false).await.unwrap();
188
189  wait_until_num_nodes(2, &serfs).await;
190  // Now force the shutdown of s2 so it appears to fail.
191  serfs[1].shutdown().await.unwrap();
192  drop(serfs.pop().unwrap());
193
194  // Don't wait for a failure to be detected. Bring back s2 immediately
195  let start = Epoch::now();
196  let s2 = loop {
197    match Serf::<T>::new(
198      get_transport(s2id.clone(), s2addr.clone()).await,
199      test_config().with_tags([("foo", "bar")].into_iter()),
200    )
201    .await
202    {
203      Ok(s) => break s,
204      Err(e) => {
205        <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(1)).await;
206        if start.elapsed() > Duration::from_secs(20) {
207          panic!("timed out: {}", e);
208        }
209      }
210    }
211  };
212
213  let s1node = serfs[0].advertise_node();
214  s2.join(s1node.map_address(MaybeResolvedAddress::resolved), false)
215    .await
216    .unwrap();
217  serfs.push(s2);
218  wait_until_num_nodes(2, &serfs).await;
219
220  test_events(
221    event_rx.rx,
222    node.id().clone(),
223    [
224      CrateEventType::Member(MemberEventType::Join),
225      CrateEventType::Member(MemberEventType::Update),
226    ]
227    .into_iter()
228    .collect(),
229  )
230  .await;
231
232  // Verify that the member data got updated.
233  let mut found = false;
234  let members = serfs[0].inner.members.read().await;
235
236  for member in members.states.values() {
237    if member.member.node.id().eq(node.id())
238      && member.member.tags().get("foo").map(|v| v.as_str()) == Some("bar")
239    {
240      found = true;
241      break;
242    }
243  }
244  assert!(found, "did not found s2 in members");
245
246  for s in serfs.iter() {
247    s.shutdown().await.unwrap();
248  }
249}
250
251/// Unit tests for the role
252pub async fn serf_role<T>(transport_opts1: T::Options, transport_opts2: T::Options)
253where
254  T: Transport,
255{
256  let s1 = Serf::<T>::new(
257    transport_opts1,
258    test_config().with_tags([("role", "web")].into_iter()),
259  )
260  .await
261  .unwrap();
262  let s2 = Serf::<T>::new(
263    transport_opts2,
264    test_config().with_tags([("role", "lb")].into_iter()),
265  )
266  .await
267  .unwrap();
268
269  let serfs = [s1, s2];
270  wait_until_num_nodes(1, &serfs).await;
271
272  let node = serfs[1]
273    .inner
274    .memberlist
275    .advertise_node()
276    .map_address(MaybeResolvedAddress::resolved);
277  serfs[0].join(node.clone(), false).await.unwrap();
278
279  wait_until_num_nodes(2, &serfs).await;
280
281  let mut roles = HashMap::new();
282
283  let start = Epoch::now();
284  let mut cond1 = false;
285  let mut cond2 = false;
286  loop {
287    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
288
289    let members = serfs[0].inner.members.read().await;
290    for m in members.states.values() {
291      roles.insert(
292        m.member.node.id().clone(),
293        m.member.tags().get("role").cloned().unwrap(),
294      );
295    }
296
297    if let Some(role) = roles.get(node.id()) {
298      if role == "lb" {
299        cond1 = true;
300      }
301    }
302
303    if let Some(role) = roles.get(serfs[0].local_id()) {
304      if role == "web" {
305        cond2 = true;
306      }
307    }
308
309    if cond1 && cond2 {
310      break;
311    }
312
313    if start.elapsed() > Duration::from_secs(7) {
314      panic!("timed out");
315    }
316  }
317}
318
319/// Unit test for serf state
320pub async fn serf_state<T>(transport_opts1: T::Options)
321where
322  T: Transport,
323{
324  let s1 = Serf::<T>::new(transport_opts1, test_config())
325    .await
326    .unwrap();
327
328  assert_eq!(s1.state(), SerfState::Alive);
329
330  s1.leave().await.unwrap();
331
332  assert_eq!(s1.state(), SerfState::Left);
333
334  s1.shutdown().await.unwrap();
335
336  assert_eq!(s1.state(), SerfState::Shutdown);
337}
338
339/// Unit tests for serf set tags
340pub async fn serf_set_tags<T>(transport_opts1: T::Options, transport_opts2: T::Options)
341where
342  T: Transport,
343{
344  let (event_tx, event_rx) = EventProducer::bounded(4);
345  let s1 = Serf::<T>::with_event_producer(transport_opts1, test_config(), event_tx)
346    .await
347    .unwrap();
348  let s2 = Serf::<T>::new(transport_opts2, test_config())
349    .await
350    .unwrap();
351
352  let serfs = [s1, s2];
353
354  wait_until_num_nodes(1, &serfs).await;
355
356  let node = serfs[1]
357    .inner
358    .memberlist
359    .advertise_node()
360    .map_address(MaybeResolvedAddress::resolved);
361  serfs[0].join(node.clone(), false).await.unwrap();
362
363  wait_until_num_nodes(2, &serfs).await;
364
365  // Update the tags
366  serfs[0]
367    .set_tags([("port", "8080")].into_iter().collect())
368    .await
369    .unwrap();
370
371  serfs[1]
372    .set_tags([("datacenter", "east-aws")].into_iter().collect())
373    .await
374    .unwrap();
375
376  let start = Epoch::now();
377  let mut cond1 = false;
378  let mut cond2 = false;
379  let mut cond3 = false;
380  let mut cond4 = false;
381
382  loop {
383    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
384
385    let m1m = serfs[0].members().await;
386    let mut m1m_tags = HashMap::with_capacity(2);
387    for m in m1m {
388      m1m_tags.insert(m.node.id().clone(), m.tags.clone());
389    }
390
391    if m1m_tags.get(serfs[0].local_id()).map(|t| t.get("port")) == Some(Some(&"8080".into())) {
392      cond1 = true;
393    }
394
395    if m1m_tags
396      .get(serfs[1].local_id())
397      .map(|t| t.get("datacenter"))
398      == Some(Some(&"east-aws".into()))
399    {
400      cond2 = true;
401    }
402
403    let m2m = serfs[1].members().await;
404    let mut m2m_tags = HashMap::with_capacity(2);
405    for m in m2m {
406      m2m_tags.insert(m.node.id().clone(), m.tags.clone());
407    }
408
409    if m2m_tags.get(serfs[0].local_id()).map(|t| t.get("port")) == Some(Some(&"8080".into())) {
410      cond3 = true;
411    }
412
413    if m2m_tags
414      .get(serfs[1].local_id())
415      .map(|t| t.get("datacenter"))
416      == Some(Some(&"east-aws".into()))
417    {
418      cond4 = true;
419    }
420
421    if cond1 && cond2 && cond3 && cond4 {
422      break;
423    }
424
425    if start.elapsed() > Duration::from_secs(7) {
426      panic!("timed out");
427    }
428  }
429
430  // we check the events to make sure we got failures.
431  test_events(
432    event_rx.rx,
433    node.id().clone(),
434    [
435      CrateEventType::Member(MemberEventType::Join),
436      CrateEventType::Member(MemberEventType::Update),
437    ]
438    .into_iter()
439    .collect(),
440  )
441  .await;
442
443  for s in serfs.iter() {
444    s.shutdown().await.unwrap();
445  }
446}
447
448/// Unit tests for serf num nodes
449pub async fn serf_num_nodes<T>(transport_opts1: T::Options, transport_opts2: T::Options)
450where
451  T: Transport,
452{
453  let s1 = Serf::<T>::new(transport_opts1, test_config())
454    .await
455    .unwrap();
456  let s2 = Serf::<T>::new(transport_opts2, test_config())
457    .await
458    .unwrap();
459
460  assert_eq!(s1.num_members().await, 1);
461
462  let serfs = [s1, s2];
463  wait_until_num_nodes(1, &serfs).await;
464
465  let node = serfs[1]
466    .inner
467    .memberlist
468    .advertise_node()
469    .map_address(MaybeResolvedAddress::resolved);
470  serfs[0].join(node.clone(), false).await.unwrap();
471
472  wait_until_num_nodes(2, &serfs).await;
473}
474
475/// Unit tests for serf coordinates
476pub async fn serf_coordinates<T>(
477  transport_opts1: T::Options,
478  transport_opts2: T::Options,
479  transport_opts3: T::Options,
480) where
481  T: Transport,
482{
483  const PROBE_INTERVAL: Duration = Duration::from_millis(2);
484
485  let opts = test_config()
486    .with_disable_coordinates(false)
487    .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL));
488  let s1 = Serf::<T>::new(transport_opts1, opts.clone()).await.unwrap();
489  let s2 = Serf::<T>::new(transport_opts2, opts).await.unwrap();
490
491  let mut serfs = vec![s1, s2];
492  wait_until_num_nodes(1, &serfs).await;
493
494  // Make sure both nodes start out the origin so we can prove they did
495  // an update later.
496  let c1 = serfs[0].cooridate().unwrap();
497  let c2 = serfs[1].cooridate().unwrap();
498
499  const ZERO_THRESHOLD: f64 = 20.0e-6;
500
501  assert!(
502    c1.distance_to(&c2).as_secs_f64() <= ZERO_THRESHOLD,
503    "coordinates didn't start at the origin"
504  );
505
506  // Join the two nodes together and give them time to probe each other.
507  let node = serfs[1]
508    .inner
509    .memberlist
510    .advertise_node()
511    .map_address(MaybeResolvedAddress::resolved);
512  serfs[0].join(node.clone(), false).await.unwrap();
513
514  wait_until_num_nodes(2, &serfs).await;
515
516  let start = Epoch::now();
517  let mut cond1 = false;
518  let mut cond2 = false;
519  let mut cond3 = false;
520  let mut cond4 = false;
521  let s2id = serfs[1].local_id().clone();
522  let s1id = serfs[0].local_id().clone();
523  loop {
524    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
525
526    // See if they know about each other.
527
528    if serfs[0].cached_coordinate(&s2id.clone()).is_ok() {
529      cond1 = true;
530    } else if start.elapsed() > Duration::from_secs(7) {
531      panic!("s1 didn't get a coordinate for s2");
532    }
533
534    if serfs[1].cached_coordinate(&s1id.clone()).is_ok() {
535      cond2 = true;
536    } else if start.elapsed() > Duration::from_secs(7) {
537      panic!("s2 didn't get a coordinate for s1");
538    }
539
540    // With only one ping they won't have a good estimate of the other node's
541    // coordinate, but they should both have updated their own coordinate.
542    let c1 = serfs[0].cooridate().unwrap();
543    let c2 = serfs[1].cooridate().unwrap();
544
545    if c1.distance_to(&c2).as_secs_f64() >= ZERO_THRESHOLD {
546      cond3 = true;
547    } else if start.elapsed() > Duration::from_secs(7) {
548      panic!("coordinates didn't update after probes");
549    }
550
551    // Make sure they cached their own current coordinate after the update.
552    let c1c = serfs[0].cached_coordinate(&s1id.clone()).unwrap();
553    match c1c {
554      None => {
555        if start.elapsed() > Duration::from_secs(7) {
556          panic!("s1 didn't cache its own coordinate");
557        }
558      }
559      Some(c1c) => {
560        if c1 == c1c {
561          cond4 = true;
562        } else if start.elapsed() > Duration::from_secs(7) {
563          panic!("s1 coordinates are not equal");
564        }
565      }
566    }
567
568    if cond1 && cond2 && cond3 && cond4 {
569      break;
570    }
571
572    if start.elapsed() > Duration::from_secs(7) {
573      panic!(
574        "timed out cond1 {} cond2 {} cond3 {} cond4 {}",
575        cond1, cond2, cond3, cond4
576      );
577    }
578  }
579
580  // Break up the cluster and make sure the coordinates get removed by
581  // the reaper.
582  serfs[1].shutdown().await.unwrap();
583  let t = serfs[1].inner.opts.reap_interval * 4;
584  drop(serfs.pop().unwrap());
585
586  <T::Runtime as RuntimeLite>::sleep(t).await;
587
588  wait_until_num_nodes(1, &serfs).await;
589
590  let start = Epoch::now();
591  loop {
592    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
593
594    if serfs[0].cached_coordinate(&s2id.clone()).unwrap().is_none() {
595      break;
596    }
597
598    if start.elapsed() > Duration::from_secs(7) {
599      panic!("s1 should have removed s2's cached coordinate");
600    }
601  }
602
603  // Try a setup with coordinates disabled.
604  let s3 = Serf::<T>::new(
605    transport_opts3,
606    test_config()
607      .with_disable_coordinates(true)
608      .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL)),
609  )
610  .await
611  .unwrap();
612
613  serfs.push(s3);
614  wait_until_num_nodes(1, &serfs).await;
615
616  let node = serfs[0]
617    .inner
618    .memberlist
619    .advertise_node()
620    .map_address(MaybeResolvedAddress::resolved);
621  serfs[1].join(node.clone(), false).await.unwrap();
622
623  wait_until_num_nodes(2, &serfs).await;
624
625  let start = Epoch::now();
626  let mut cond1 = false;
627  let mut cond2 = false;
628  loop {
629    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
630
631    // See if they know about each other.
632
633    if let Err(e) = serfs[1].cooridate() {
634      if e.to_string().contains("coordinates are disabled") {
635        cond1 = true;
636      }
637    }
638
639    if serfs[1].cached_coordinate(&s1id.clone()).is_err() {
640      cond2 = true;
641    }
642
643    if cond1 && cond2 {
644      break;
645    }
646
647    if start.elapsed() > Duration::from_secs(14) {
648      panic!("timed out: cond1 {} cond2 {}", cond1, cond2);
649    }
650  }
651
652  for s in serfs.iter() {
653    s.shutdown().await.unwrap();
654  }
655}
656
657/// Unit tests for serf name resolution
658///
659/// set_id is a function that takes the transport options and the id of the node, and returns the
660/// transport options with the id set to the given id.
661pub async fn serf_name_resolution<T>(
662  transport_opts1: T::Options,
663  transport_opts2: T::Options,
664  transport_opts3: T::Options,
665  set_id: impl FnOnce(T::Options, T::Id) -> T::Options,
666) where
667  T: Transport,
668{
669  let s1 = Serf::<T>::new(transport_opts1, test_config())
670    .await
671    .unwrap();
672  let s2 = Serf::<T>::new(transport_opts2, test_config())
673    .await
674    .unwrap();
675  let s3 = Serf::<T>::new(
676    set_id(transport_opts3, s1.local_id().clone()),
677    test_config(),
678  )
679  .await
680  .unwrap();
681
682  let serfs = [s1, s2, s3];
683  wait_until_num_nodes(1, &serfs).await;
684
685  // Join s1 to s2 first. s2 should vote for s1 in conflict
686  let node = serfs[1]
687    .inner
688    .memberlist
689    .advertise_node()
690    .map_address(MaybeResolvedAddress::resolved);
691  serfs[0].join(node.clone(), false).await.unwrap();
692
693  wait_until_num_nodes(2, &serfs[..2]).await;
694  wait_until_num_nodes(1, &serfs[2..]).await;
695
696  let node = serfs[2]
697    .inner
698    .memberlist
699    .advertise_node()
700    .map_address(MaybeResolvedAddress::resolved);
701  serfs[0].join(node.clone(), false).await.unwrap();
702
703  // Wait for the query period to end
704  <T::Runtime as RuntimeLite>::sleep(serfs[0].default_query_timeout().await * 30).await;
705
706  let start = Epoch::now();
707  let mut cond1 = false;
708  let mut cond2 = false;
709  let mut cond3 = false;
710  loop {
711    // s3 should have shutdown, while s1 is running
712    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
713
714    if serfs[0].state() == SerfState::Alive {
715      cond1 = true;
716    }
717
718    if serfs[1].state() == SerfState::Alive {
719      cond2 = true;
720    }
721
722    if serfs[2].state() == SerfState::Shutdown {
723      cond3 = true;
724    }
725
726    if cond1 && cond2 && cond3 {
727      break;
728    }
729
730    if start.elapsed() > Duration::from_secs(14) {
731      println!("cond1 {cond1} cond2 {cond2} cond3 {cond3}");
732      panic!("timed out");
733    }
734  }
735
736  for s in serfs.iter() {
737    s.shutdown().await.unwrap();
738  }
739}
740
741/// Unit test for serf local member
742pub async fn serf_local_member<T>(opts: T::Options)
743where
744  T: Transport,
745{
746  let s = Serf::<T>::new(opts, test_config()).await.unwrap();
747
748  let local = s.local_member().await;
749  assert_eq!(local.node.id(), s.local_id());
750
751  assert_eq!(local.tags, s.inner.opts.tags());
752  assert_eq!(local.status, MemberStatus::Alive);
753
754  let new_tags = [("foo", "bar"), ("test", "ing")]
755    .into_iter()
756    .collect::<Tags>();
757  s.set_tags(new_tags.clone()).await.unwrap();
758
759  let local = s.local_member().await;
760  assert_eq!(&*local.tags, &new_tags);
761}
762
763/// Unit test for serf stats
764pub async fn serf_stats<T>(opts: T::Options)
765where
766  T: Transport,
767{
768  let s = Serf::<T>::new(opts, test_config()).await.unwrap();
769
770  let stats = s.stats().await;
771  assert_eq!(stats.get_event_queue(), 0);
772  assert_eq!(stats.get_event_time(), 1);
773  assert_eq!(stats.get_failed(), 0);
774  assert_eq!(stats.get_intent_queue(), 0);
775  assert_eq!(stats.get_left(), 0);
776  assert_eq!(stats.get_health_score(), 0);
777  assert_eq!(stats.get_member_time(), 1);
778  assert_eq!(stats.get_members(), 1);
779  assert!(!stats.get_encrypted());
780}
781
782/// Unit test for serf write keying file
783#[cfg(feature = "encryption")]
784pub async fn serf_write_keyring_file<T>(
785  get_transport_opts: impl FnOnce(memberlist_core::types::SecretKey) -> T::Options,
786) where
787  T: Transport,
788{
789  use std::io::Read;
790
791  use base64::{engine::general_purpose, Engine as _};
792
793  const EXISTING: &str = "T9jncgl9mbLus+baTTa7q7nPSUrXwbDi2dhbtqir37s=";
794  const NEW_KEY: &str = "HvY8ubRZMgafUOWvrOadwOckVa1wN3QWAo46FVKbVN8=";
795
796  let td = tempfile::tempdir().unwrap();
797  let mut p = td.path().join("serf_write_keying_file");
798  p.set_extension("json");
799
800  let existing_bytes = general_purpose::STANDARD.decode(EXISTING).unwrap();
801  let sk = memberlist_core::types::SecretKey::try_from(existing_bytes.as_slice()).unwrap();
802
803  let serf = Serf::<T>::new(
804    get_transport_opts(sk),
805    test_config().with_keyring_file(Some(p.clone())),
806  )
807  .await
808  .unwrap();
809  assert!(
810    serf.encryption_enabled(),
811    "write keyring file test only works on encrypted serf"
812  );
813
814  let manager = serf.key_manager();
815  let new_key = general_purpose::STANDARD.decode(NEW_KEY).unwrap();
816  let new_sk = memberlist_core::types::SecretKey::try_from(new_key.as_slice()).unwrap();
817  manager.install_key(new_sk, None).await.unwrap();
818
819  let mut keyring_file = std::fs::File::open(&p).unwrap();
820  let mut s = String::new();
821  keyring_file.read_to_string(&mut s).unwrap();
822
823  let lines = s.split('\n').collect::<Vec<_>>();
824  assert_eq!(lines.len(), 4);
825
826  // Ensure both the original key and the new key are present in the file
827  assert!(s.contains(EXISTING));
828  assert!(s.contains(NEW_KEY));
829
830  // Ensure the existing key remains primary. This is in position 1 because
831  // the file writer will use json.MarshalIndent(), leaving the first line as
832  // the opening bracket.
833  assert!(lines[1].contains(EXISTING));
834
835  // Swap primary keys
836  manager.use_key(new_sk, None).await.unwrap();
837
838  let mut keyring_file = std::fs::File::open(&p).unwrap();
839  let mut s = String::new();
840  keyring_file.read_to_string(&mut s).unwrap();
841
842  let lines = s.split('\n').collect::<Vec<_>>();
843  assert_eq!(lines.len(), 4);
844
845  // Key order should have changed in keyring file
846  assert!(lines[1].contains(NEW_KEY));
847
848  // Remove the old key
849  manager.remove_key(sk, None).await.unwrap();
850
851  let mut keyring_file = std::fs::File::open(&p).unwrap();
852  let mut s = String::new();
853  keyring_file.read_to_string(&mut s).unwrap();
854
855  let lines = s.split('\n').collect::<Vec<_>>();
856  // Only the new key should now be present in the keyring file
857  assert_eq!(lines.len(), 3);
858
859  assert!(lines[1].contains(NEW_KEY));
860
861  let resp = manager.list_keys().await.unwrap();
862  assert_eq!(resp.primary_keys().len(), 1);
863  assert_eq!(resp.keys().len(), 1);
864}
865
866#[test]
867fn test_recent_intent() {
868  assert!(recent_intent::<SmolStr>(&HashMap::new(), &"foo".into(), MessageType::Join).is_none());
869
870  let now = Epoch::now();
871  let expire = || now - Duration::from_secs(2);
872  let save = || now;
873
874  let mut intents = HashMap::<SmolStr, _>::new();
875  assert!(recent_intent(&intents, &"foo".into(), MessageType::Join).is_none());
876
877  assert!(upsert_intent(
878    &mut intents,
879    &"foo".into(),
880    MessageType::Join,
881    1.into(),
882    expire
883  ));
884  assert!(upsert_intent(
885    &mut intents,
886    &"bar".into(),
887    MessageType::Leave,
888    2.into(),
889    expire
890  ));
891  assert!(upsert_intent(
892    &mut intents,
893    &"baz".into(),
894    MessageType::Join,
895    3.into(),
896    save
897  ));
898  assert!(upsert_intent(
899    &mut intents,
900    &"bar".into(),
901    MessageType::Join,
902    4.into(),
903    expire
904  ));
905  assert!(!upsert_intent(
906    &mut intents,
907    &"bar".into(),
908    MessageType::Join,
909    0.into(),
910    expire
911  ));
912  assert!(upsert_intent(
913    &mut intents,
914    &"bar".into(),
915    MessageType::Join,
916    5.into(),
917    expire
918  ));
919
920  let ltime = recent_intent(&intents, &"foo".into(), MessageType::Join).unwrap();
921  assert_eq!(ltime, 1.into());
922
923  let ltime = recent_intent(&intents, &"bar".into(), MessageType::Join).unwrap();
924  assert_eq!(ltime, 5.into());
925
926  let ltime = recent_intent(&intents, &"baz".into(), MessageType::Join).unwrap();
927  assert_eq!(ltime, 3.into());
928
929  assert!(recent_intent(&intents, &"tubez".into(), MessageType::Join).is_none());
930
931  reap_intents(&mut intents, Epoch::now(), Duration::from_secs(1));
932  assert!(recent_intent(&intents, &"foo".into(), MessageType::Join).is_none());
933  assert!(recent_intent(&intents, &"bar".into(), MessageType::Join).is_none());
934  let ltime = recent_intent(&intents, &"baz".into(), MessageType::Join).unwrap();
935  assert_eq!(ltime, 3.into());
936  assert!(recent_intent(&intents, &"tubez".into(), MessageType::Join).is_none());
937  reap_intents(
938    &mut intents,
939    Epoch::now() + Duration::from_secs(2),
940    Duration::from_secs(1),
941  );
942  assert!(recent_intent(&intents, &"baz".into(), MessageType::Join).is_none());
943}