1use std::sync::atomic::{AtomicUsize, Ordering};
2
3use memberlist_core::{tests::AnyError, transport::Id};
4
5use serf_types::{Member, MemberStatus, Tags};
6
7use crate::{event::EventProducer, types::MemberState};
8
9use super::*;
10
11pub mod event;
13
14pub mod leave;
16
17pub mod join;
19
20pub mod delegate;
22
23pub mod reconnect;
25
26pub mod remove;
28
29pub mod reap;
31
32pub mod snapshot;
34
35fn test_member_status<I: Id, A>(
36 members: &HashMap<I, MemberState<I, A>>,
37 id: I,
38 status: MemberStatus,
39) -> Result<(), AnyError> {
40 for member in members.values() {
41 if id.eq(member.member.node.id()) {
42 if member.member.status != status {
43 return Err(AnyError::from(format!(
44 "expected member {} to have status {:?}, got {:?}",
45 id, status, member.member.status
46 )));
47 }
48 return Ok(());
49 }
50 }
51 Err(AnyError::from(format!("member {} not found", id)))
52}
53
54pub async fn serf_get_queue_max<T>(
56 transport_opts: T::Options,
57 mut get_addr: impl FnMut(usize) -> <T::Resolver as AddressResolver>::ResolvedAddress,
58) where
59 T: Transport<Id = SmolStr>,
60 T::Options: Clone,
61{
62 let s = Serf::<T>::new(transport_opts.clone(), test_config())
63 .await
64 .unwrap();
65
66 {
69 let mut members = s.inner.members.write().await;
70 members.states.clear();
71 for i in 0..100 {
72 let name: SmolStr = format!("Member{i}").into();
73 members.states.insert(
74 name.clone(),
75 MemberState {
76 member: Member::new(
77 Node::new(name.clone(), get_addr(i)),
78 Default::default(),
79 MemberStatus::Alive,
80 ),
81 status_time: 0.into(),
82 leave_time: None,
83 },
84 );
85 }
86 }
87
88 let got = s.get_queue_max().await;
90 let want = 4096;
91 assert_eq!(got, want);
92
93 s.shutdown().await.unwrap();
95 <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(2)).await;
96
97 let sn = Serf::<T>::new(
98 transport_opts.clone(),
99 test_config().with_min_queue_depth(1024),
100 )
101 .await
102 .unwrap();
103
104 {
105 let mut members = sn.inner.members.write().await;
106 members.states.clear();
107 let old_members = s.inner.members.read().await;
108 members.states.clone_from(&old_members.states);
109 }
110
111 let got = sn.get_queue_max().await;
112 let want = 1024;
113 assert_eq!(got, want);
114
115 sn.shutdown().await.unwrap();
116 <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(2)).await;
117
118 let snn = Serf::<T>::new(transport_opts, test_config().with_min_queue_depth(16))
121 .await
122 .unwrap();
123
124 {
125 let mut members = snn.inner.members.write().await;
126 members.states.clear();
127 let old_members = sn.inner.members.read().await;
128 members.states.clone_from(&old_members.states);
129 }
130
131 let got = snn.get_queue_max().await;
132 let want = 200;
133 assert_eq!(got, want);
134
135 {
137 let mut members = snn.inner.members.write().await;
138 let name = SmolStr::new("another");
139 members.states.insert(
140 name.clone(),
141 MemberState {
142 member: Member::new(
143 Node::new(name.clone(), get_addr(10000)),
144 Default::default(),
145 MemberStatus::Alive,
146 ),
147 status_time: 0.into(),
148 leave_time: None,
149 },
150 );
151 }
152
153 let got = snn.get_queue_max().await;
154 let want = 202;
155 assert_eq!(got, want);
156 snn.shutdown().await.unwrap();
157 drop(snn);
158}
159
160pub async fn serf_update<T, F>(
162 transport_opts1: T::Options,
163 transport_opts2: T::Options,
164 get_transport: impl FnOnce(T::Id, <T::Resolver as AddressResolver>::ResolvedAddress) -> F + Copy,
165) where
166 T: Transport,
167 T::Options: Clone,
168 F: core::future::Future<Output = T::Options>,
169{
170 let (event_tx, event_rx) = EventProducer::bounded(64);
171 let s1 = Serf::<T>::with_event_producer(transport_opts1, test_config(), event_tx)
172 .await
173 .unwrap();
174 let s2 = Serf::<T>::new(transport_opts2.clone(), test_config())
175 .await
176 .unwrap();
177 let (s2id, s2addr) = s2.advertise_node().into_components();
178
179 let mut serfs = vec![s1, s2];
180 wait_until_num_nodes(1, &serfs).await;
181
182 let node = serfs[1]
183 .inner
184 .memberlist
185 .advertise_node()
186 .map_address(MaybeResolvedAddress::resolved);
187 serfs[0].join(node.clone(), false).await.unwrap();
188
189 wait_until_num_nodes(2, &serfs).await;
190 serfs[1].shutdown().await.unwrap();
192 drop(serfs.pop().unwrap());
193
194 let start = Epoch::now();
196 let s2 = loop {
197 match Serf::<T>::new(
198 get_transport(s2id.clone(), s2addr.clone()).await,
199 test_config().with_tags([("foo", "bar")].into_iter()),
200 )
201 .await
202 {
203 Ok(s) => break s,
204 Err(e) => {
205 <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(1)).await;
206 if start.elapsed() > Duration::from_secs(20) {
207 panic!("timed out: {}", e);
208 }
209 }
210 }
211 };
212
213 let s1node = serfs[0].advertise_node();
214 s2.join(s1node.map_address(MaybeResolvedAddress::resolved), false)
215 .await
216 .unwrap();
217 serfs.push(s2);
218 wait_until_num_nodes(2, &serfs).await;
219
220 test_events(
221 event_rx.rx,
222 node.id().clone(),
223 [
224 CrateEventType::Member(MemberEventType::Join),
225 CrateEventType::Member(MemberEventType::Update),
226 ]
227 .into_iter()
228 .collect(),
229 )
230 .await;
231
232 let mut found = false;
234 let members = serfs[0].inner.members.read().await;
235
236 for member in members.states.values() {
237 if member.member.node.id().eq(node.id())
238 && member.member.tags().get("foo").map(|v| v.as_str()) == Some("bar")
239 {
240 found = true;
241 break;
242 }
243 }
244 assert!(found, "did not found s2 in members");
245
246 for s in serfs.iter() {
247 s.shutdown().await.unwrap();
248 }
249}
250
251pub async fn serf_role<T>(transport_opts1: T::Options, transport_opts2: T::Options)
253where
254 T: Transport,
255{
256 let s1 = Serf::<T>::new(
257 transport_opts1,
258 test_config().with_tags([("role", "web")].into_iter()),
259 )
260 .await
261 .unwrap();
262 let s2 = Serf::<T>::new(
263 transport_opts2,
264 test_config().with_tags([("role", "lb")].into_iter()),
265 )
266 .await
267 .unwrap();
268
269 let serfs = [s1, s2];
270 wait_until_num_nodes(1, &serfs).await;
271
272 let node = serfs[1]
273 .inner
274 .memberlist
275 .advertise_node()
276 .map_address(MaybeResolvedAddress::resolved);
277 serfs[0].join(node.clone(), false).await.unwrap();
278
279 wait_until_num_nodes(2, &serfs).await;
280
281 let mut roles = HashMap::new();
282
283 let start = Epoch::now();
284 let mut cond1 = false;
285 let mut cond2 = false;
286 loop {
287 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
288
289 let members = serfs[0].inner.members.read().await;
290 for m in members.states.values() {
291 roles.insert(
292 m.member.node.id().clone(),
293 m.member.tags().get("role").cloned().unwrap(),
294 );
295 }
296
297 if let Some(role) = roles.get(node.id()) {
298 if role == "lb" {
299 cond1 = true;
300 }
301 }
302
303 if let Some(role) = roles.get(serfs[0].local_id()) {
304 if role == "web" {
305 cond2 = true;
306 }
307 }
308
309 if cond1 && cond2 {
310 break;
311 }
312
313 if start.elapsed() > Duration::from_secs(7) {
314 panic!("timed out");
315 }
316 }
317}
318
319pub async fn serf_state<T>(transport_opts1: T::Options)
321where
322 T: Transport,
323{
324 let s1 = Serf::<T>::new(transport_opts1, test_config())
325 .await
326 .unwrap();
327
328 assert_eq!(s1.state(), SerfState::Alive);
329
330 s1.leave().await.unwrap();
331
332 assert_eq!(s1.state(), SerfState::Left);
333
334 s1.shutdown().await.unwrap();
335
336 assert_eq!(s1.state(), SerfState::Shutdown);
337}
338
339pub async fn serf_set_tags<T>(transport_opts1: T::Options, transport_opts2: T::Options)
341where
342 T: Transport,
343{
344 let (event_tx, event_rx) = EventProducer::bounded(4);
345 let s1 = Serf::<T>::with_event_producer(transport_opts1, test_config(), event_tx)
346 .await
347 .unwrap();
348 let s2 = Serf::<T>::new(transport_opts2, test_config())
349 .await
350 .unwrap();
351
352 let serfs = [s1, s2];
353
354 wait_until_num_nodes(1, &serfs).await;
355
356 let node = serfs[1]
357 .inner
358 .memberlist
359 .advertise_node()
360 .map_address(MaybeResolvedAddress::resolved);
361 serfs[0].join(node.clone(), false).await.unwrap();
362
363 wait_until_num_nodes(2, &serfs).await;
364
365 serfs[0]
367 .set_tags([("port", "8080")].into_iter().collect())
368 .await
369 .unwrap();
370
371 serfs[1]
372 .set_tags([("datacenter", "east-aws")].into_iter().collect())
373 .await
374 .unwrap();
375
376 let start = Epoch::now();
377 let mut cond1 = false;
378 let mut cond2 = false;
379 let mut cond3 = false;
380 let mut cond4 = false;
381
382 loop {
383 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
384
385 let m1m = serfs[0].members().await;
386 let mut m1m_tags = HashMap::with_capacity(2);
387 for m in m1m {
388 m1m_tags.insert(m.node.id().clone(), m.tags.clone());
389 }
390
391 if m1m_tags.get(serfs[0].local_id()).map(|t| t.get("port")) == Some(Some(&"8080".into())) {
392 cond1 = true;
393 }
394
395 if m1m_tags
396 .get(serfs[1].local_id())
397 .map(|t| t.get("datacenter"))
398 == Some(Some(&"east-aws".into()))
399 {
400 cond2 = true;
401 }
402
403 let m2m = serfs[1].members().await;
404 let mut m2m_tags = HashMap::with_capacity(2);
405 for m in m2m {
406 m2m_tags.insert(m.node.id().clone(), m.tags.clone());
407 }
408
409 if m2m_tags.get(serfs[0].local_id()).map(|t| t.get("port")) == Some(Some(&"8080".into())) {
410 cond3 = true;
411 }
412
413 if m2m_tags
414 .get(serfs[1].local_id())
415 .map(|t| t.get("datacenter"))
416 == Some(Some(&"east-aws".into()))
417 {
418 cond4 = true;
419 }
420
421 if cond1 && cond2 && cond3 && cond4 {
422 break;
423 }
424
425 if start.elapsed() > Duration::from_secs(7) {
426 panic!("timed out");
427 }
428 }
429
430 test_events(
432 event_rx.rx,
433 node.id().clone(),
434 [
435 CrateEventType::Member(MemberEventType::Join),
436 CrateEventType::Member(MemberEventType::Update),
437 ]
438 .into_iter()
439 .collect(),
440 )
441 .await;
442
443 for s in serfs.iter() {
444 s.shutdown().await.unwrap();
445 }
446}
447
448pub async fn serf_num_nodes<T>(transport_opts1: T::Options, transport_opts2: T::Options)
450where
451 T: Transport,
452{
453 let s1 = Serf::<T>::new(transport_opts1, test_config())
454 .await
455 .unwrap();
456 let s2 = Serf::<T>::new(transport_opts2, test_config())
457 .await
458 .unwrap();
459
460 assert_eq!(s1.num_members().await, 1);
461
462 let serfs = [s1, s2];
463 wait_until_num_nodes(1, &serfs).await;
464
465 let node = serfs[1]
466 .inner
467 .memberlist
468 .advertise_node()
469 .map_address(MaybeResolvedAddress::resolved);
470 serfs[0].join(node.clone(), false).await.unwrap();
471
472 wait_until_num_nodes(2, &serfs).await;
473}
474
475pub async fn serf_coordinates<T>(
477 transport_opts1: T::Options,
478 transport_opts2: T::Options,
479 transport_opts3: T::Options,
480) where
481 T: Transport,
482{
483 const PROBE_INTERVAL: Duration = Duration::from_millis(2);
484
485 let opts = test_config()
486 .with_disable_coordinates(false)
487 .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL));
488 let s1 = Serf::<T>::new(transport_opts1, opts.clone()).await.unwrap();
489 let s2 = Serf::<T>::new(transport_opts2, opts).await.unwrap();
490
491 let mut serfs = vec![s1, s2];
492 wait_until_num_nodes(1, &serfs).await;
493
494 let c1 = serfs[0].cooridate().unwrap();
497 let c2 = serfs[1].cooridate().unwrap();
498
499 const ZERO_THRESHOLD: f64 = 20.0e-6;
500
501 assert!(
502 c1.distance_to(&c2).as_secs_f64() <= ZERO_THRESHOLD,
503 "coordinates didn't start at the origin"
504 );
505
506 let node = serfs[1]
508 .inner
509 .memberlist
510 .advertise_node()
511 .map_address(MaybeResolvedAddress::resolved);
512 serfs[0].join(node.clone(), false).await.unwrap();
513
514 wait_until_num_nodes(2, &serfs).await;
515
516 let start = Epoch::now();
517 let mut cond1 = false;
518 let mut cond2 = false;
519 let mut cond3 = false;
520 let mut cond4 = false;
521 let s2id = serfs[1].local_id().clone();
522 let s1id = serfs[0].local_id().clone();
523 loop {
524 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
525
526 if serfs[0].cached_coordinate(&s2id.clone()).is_ok() {
529 cond1 = true;
530 } else if start.elapsed() > Duration::from_secs(7) {
531 panic!("s1 didn't get a coordinate for s2");
532 }
533
534 if serfs[1].cached_coordinate(&s1id.clone()).is_ok() {
535 cond2 = true;
536 } else if start.elapsed() > Duration::from_secs(7) {
537 panic!("s2 didn't get a coordinate for s1");
538 }
539
540 let c1 = serfs[0].cooridate().unwrap();
543 let c2 = serfs[1].cooridate().unwrap();
544
545 if c1.distance_to(&c2).as_secs_f64() >= ZERO_THRESHOLD {
546 cond3 = true;
547 } else if start.elapsed() > Duration::from_secs(7) {
548 panic!("coordinates didn't update after probes");
549 }
550
551 let c1c = serfs[0].cached_coordinate(&s1id.clone()).unwrap();
553 match c1c {
554 None => {
555 if start.elapsed() > Duration::from_secs(7) {
556 panic!("s1 didn't cache its own coordinate");
557 }
558 }
559 Some(c1c) => {
560 if c1 == c1c {
561 cond4 = true;
562 } else if start.elapsed() > Duration::from_secs(7) {
563 panic!("s1 coordinates are not equal");
564 }
565 }
566 }
567
568 if cond1 && cond2 && cond3 && cond4 {
569 break;
570 }
571
572 if start.elapsed() > Duration::from_secs(7) {
573 panic!(
574 "timed out cond1 {} cond2 {} cond3 {} cond4 {}",
575 cond1, cond2, cond3, cond4
576 );
577 }
578 }
579
580 serfs[1].shutdown().await.unwrap();
583 let t = serfs[1].inner.opts.reap_interval * 4;
584 drop(serfs.pop().unwrap());
585
586 <T::Runtime as RuntimeLite>::sleep(t).await;
587
588 wait_until_num_nodes(1, &serfs).await;
589
590 let start = Epoch::now();
591 loop {
592 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
593
594 if serfs[0].cached_coordinate(&s2id.clone()).unwrap().is_none() {
595 break;
596 }
597
598 if start.elapsed() > Duration::from_secs(7) {
599 panic!("s1 should have removed s2's cached coordinate");
600 }
601 }
602
603 let s3 = Serf::<T>::new(
605 transport_opts3,
606 test_config()
607 .with_disable_coordinates(true)
608 .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL)),
609 )
610 .await
611 .unwrap();
612
613 serfs.push(s3);
614 wait_until_num_nodes(1, &serfs).await;
615
616 let node = serfs[0]
617 .inner
618 .memberlist
619 .advertise_node()
620 .map_address(MaybeResolvedAddress::resolved);
621 serfs[1].join(node.clone(), false).await.unwrap();
622
623 wait_until_num_nodes(2, &serfs).await;
624
625 let start = Epoch::now();
626 let mut cond1 = false;
627 let mut cond2 = false;
628 loop {
629 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
630
631 if let Err(e) = serfs[1].cooridate() {
634 if e.to_string().contains("coordinates are disabled") {
635 cond1 = true;
636 }
637 }
638
639 if serfs[1].cached_coordinate(&s1id.clone()).is_err() {
640 cond2 = true;
641 }
642
643 if cond1 && cond2 {
644 break;
645 }
646
647 if start.elapsed() > Duration::from_secs(14) {
648 panic!("timed out: cond1 {} cond2 {}", cond1, cond2);
649 }
650 }
651
652 for s in serfs.iter() {
653 s.shutdown().await.unwrap();
654 }
655}
656
657pub async fn serf_name_resolution<T>(
662 transport_opts1: T::Options,
663 transport_opts2: T::Options,
664 transport_opts3: T::Options,
665 set_id: impl FnOnce(T::Options, T::Id) -> T::Options,
666) where
667 T: Transport,
668{
669 let s1 = Serf::<T>::new(transport_opts1, test_config())
670 .await
671 .unwrap();
672 let s2 = Serf::<T>::new(transport_opts2, test_config())
673 .await
674 .unwrap();
675 let s3 = Serf::<T>::new(
676 set_id(transport_opts3, s1.local_id().clone()),
677 test_config(),
678 )
679 .await
680 .unwrap();
681
682 let serfs = [s1, s2, s3];
683 wait_until_num_nodes(1, &serfs).await;
684
685 let node = serfs[1]
687 .inner
688 .memberlist
689 .advertise_node()
690 .map_address(MaybeResolvedAddress::resolved);
691 serfs[0].join(node.clone(), false).await.unwrap();
692
693 wait_until_num_nodes(2, &serfs[..2]).await;
694 wait_until_num_nodes(1, &serfs[2..]).await;
695
696 let node = serfs[2]
697 .inner
698 .memberlist
699 .advertise_node()
700 .map_address(MaybeResolvedAddress::resolved);
701 serfs[0].join(node.clone(), false).await.unwrap();
702
703 <T::Runtime as RuntimeLite>::sleep(serfs[0].default_query_timeout().await * 30).await;
705
706 let start = Epoch::now();
707 let mut cond1 = false;
708 let mut cond2 = false;
709 let mut cond3 = false;
710 loop {
711 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
713
714 if serfs[0].state() == SerfState::Alive {
715 cond1 = true;
716 }
717
718 if serfs[1].state() == SerfState::Alive {
719 cond2 = true;
720 }
721
722 if serfs[2].state() == SerfState::Shutdown {
723 cond3 = true;
724 }
725
726 if cond1 && cond2 && cond3 {
727 break;
728 }
729
730 if start.elapsed() > Duration::from_secs(14) {
731 println!("cond1 {cond1} cond2 {cond2} cond3 {cond3}");
732 panic!("timed out");
733 }
734 }
735
736 for s in serfs.iter() {
737 s.shutdown().await.unwrap();
738 }
739}
740
741pub async fn serf_local_member<T>(opts: T::Options)
743where
744 T: Transport,
745{
746 let s = Serf::<T>::new(opts, test_config()).await.unwrap();
747
748 let local = s.local_member().await;
749 assert_eq!(local.node.id(), s.local_id());
750
751 assert_eq!(local.tags, s.inner.opts.tags());
752 assert_eq!(local.status, MemberStatus::Alive);
753
754 let new_tags = [("foo", "bar"), ("test", "ing")]
755 .into_iter()
756 .collect::<Tags>();
757 s.set_tags(new_tags.clone()).await.unwrap();
758
759 let local = s.local_member().await;
760 assert_eq!(&*local.tags, &new_tags);
761}
762
763pub async fn serf_stats<T>(opts: T::Options)
765where
766 T: Transport,
767{
768 let s = Serf::<T>::new(opts, test_config()).await.unwrap();
769
770 let stats = s.stats().await;
771 assert_eq!(stats.get_event_queue(), 0);
772 assert_eq!(stats.get_event_time(), 1);
773 assert_eq!(stats.get_failed(), 0);
774 assert_eq!(stats.get_intent_queue(), 0);
775 assert_eq!(stats.get_left(), 0);
776 assert_eq!(stats.get_health_score(), 0);
777 assert_eq!(stats.get_member_time(), 1);
778 assert_eq!(stats.get_members(), 1);
779 assert!(!stats.get_encrypted());
780}
781
782#[cfg(feature = "encryption")]
784pub async fn serf_write_keyring_file<T>(
785 get_transport_opts: impl FnOnce(memberlist_core::types::SecretKey) -> T::Options,
786) where
787 T: Transport,
788{
789 use std::io::Read;
790
791 use base64::{engine::general_purpose, Engine as _};
792
793 const EXISTING: &str = "T9jncgl9mbLus+baTTa7q7nPSUrXwbDi2dhbtqir37s=";
794 const NEW_KEY: &str = "HvY8ubRZMgafUOWvrOadwOckVa1wN3QWAo46FVKbVN8=";
795
796 let td = tempfile::tempdir().unwrap();
797 let mut p = td.path().join("serf_write_keying_file");
798 p.set_extension("json");
799
800 let existing_bytes = general_purpose::STANDARD.decode(EXISTING).unwrap();
801 let sk = memberlist_core::types::SecretKey::try_from(existing_bytes.as_slice()).unwrap();
802
803 let serf = Serf::<T>::new(
804 get_transport_opts(sk),
805 test_config().with_keyring_file(Some(p.clone())),
806 )
807 .await
808 .unwrap();
809 assert!(
810 serf.encryption_enabled(),
811 "write keyring file test only works on encrypted serf"
812 );
813
814 let manager = serf.key_manager();
815 let new_key = general_purpose::STANDARD.decode(NEW_KEY).unwrap();
816 let new_sk = memberlist_core::types::SecretKey::try_from(new_key.as_slice()).unwrap();
817 manager.install_key(new_sk, None).await.unwrap();
818
819 let mut keyring_file = std::fs::File::open(&p).unwrap();
820 let mut s = String::new();
821 keyring_file.read_to_string(&mut s).unwrap();
822
823 let lines = s.split('\n').collect::<Vec<_>>();
824 assert_eq!(lines.len(), 4);
825
826 assert!(s.contains(EXISTING));
828 assert!(s.contains(NEW_KEY));
829
830 assert!(lines[1].contains(EXISTING));
834
835 manager.use_key(new_sk, None).await.unwrap();
837
838 let mut keyring_file = std::fs::File::open(&p).unwrap();
839 let mut s = String::new();
840 keyring_file.read_to_string(&mut s).unwrap();
841
842 let lines = s.split('\n').collect::<Vec<_>>();
843 assert_eq!(lines.len(), 4);
844
845 assert!(lines[1].contains(NEW_KEY));
847
848 manager.remove_key(sk, None).await.unwrap();
850
851 let mut keyring_file = std::fs::File::open(&p).unwrap();
852 let mut s = String::new();
853 keyring_file.read_to_string(&mut s).unwrap();
854
855 let lines = s.split('\n').collect::<Vec<_>>();
856 assert_eq!(lines.len(), 3);
858
859 assert!(lines[1].contains(NEW_KEY));
860
861 let resp = manager.list_keys().await.unwrap();
862 assert_eq!(resp.primary_keys().len(), 1);
863 assert_eq!(resp.keys().len(), 1);
864}
865
866#[test]
867fn test_recent_intent() {
868 assert!(recent_intent::<SmolStr>(&HashMap::new(), &"foo".into(), MessageType::Join).is_none());
869
870 let now = Epoch::now();
871 let expire = || now - Duration::from_secs(2);
872 let save = || now;
873
874 let mut intents = HashMap::<SmolStr, _>::new();
875 assert!(recent_intent(&intents, &"foo".into(), MessageType::Join).is_none());
876
877 assert!(upsert_intent(
878 &mut intents,
879 &"foo".into(),
880 MessageType::Join,
881 1.into(),
882 expire
883 ));
884 assert!(upsert_intent(
885 &mut intents,
886 &"bar".into(),
887 MessageType::Leave,
888 2.into(),
889 expire
890 ));
891 assert!(upsert_intent(
892 &mut intents,
893 &"baz".into(),
894 MessageType::Join,
895 3.into(),
896 save
897 ));
898 assert!(upsert_intent(
899 &mut intents,
900 &"bar".into(),
901 MessageType::Join,
902 4.into(),
903 expire
904 ));
905 assert!(!upsert_intent(
906 &mut intents,
907 &"bar".into(),
908 MessageType::Join,
909 0.into(),
910 expire
911 ));
912 assert!(upsert_intent(
913 &mut intents,
914 &"bar".into(),
915 MessageType::Join,
916 5.into(),
917 expire
918 ));
919
920 let ltime = recent_intent(&intents, &"foo".into(), MessageType::Join).unwrap();
921 assert_eq!(ltime, 1.into());
922
923 let ltime = recent_intent(&intents, &"bar".into(), MessageType::Join).unwrap();
924 assert_eq!(ltime, 5.into());
925
926 let ltime = recent_intent(&intents, &"baz".into(), MessageType::Join).unwrap();
927 assert_eq!(ltime, 3.into());
928
929 assert!(recent_intent(&intents, &"tubez".into(), MessageType::Join).is_none());
930
931 reap_intents(&mut intents, Epoch::now(), Duration::from_secs(1));
932 assert!(recent_intent(&intents, &"foo".into(), MessageType::Join).is_none());
933 assert!(recent_intent(&intents, &"bar".into(), MessageType::Join).is_none());
934 let ltime = recent_intent(&intents, &"baz".into(), MessageType::Join).unwrap();
935 assert_eq!(ltime, 3.into());
936 assert!(recent_intent(&intents, &"tubez".into(), MessageType::Join).is_none());
937 reap_intents(
938 &mut intents,
939 Epoch::now() + Duration::from_secs(2),
940 Duration::from_secs(1),
941 );
942 assert!(recent_intent(&intents, &"baz".into(), MessageType::Join).is_none());
943}