1use std::sync::atomic::{AtomicUsize, Ordering};
2
3use memberlist_core::{tests::AnyError, transport::Id};
4
5use crate::{
6 event::EventProducer,
7 options::MemberlistOptions,
8 types::{Member, MemberState, MemberStatus, Tags},
9};
10
11use super::*;
12
13pub mod event;
15
16pub mod leave;
18
19pub mod join;
21
22pub mod delegate;
24
25pub mod reconnect;
27
28pub mod remove;
30
31pub mod reap;
33
34pub mod snapshot;
36
37fn test_member_status<I: Id, A>(
38 members: &HashMap<I, MemberState<I, A>>,
39 id: I,
40 status: MemberStatus,
41) -> Result<(), AnyError> {
42 for member in members.values() {
43 if id.eq(member.member.node.id()) {
44 if member.member.status != status {
45 return Err(AnyError::from(format!(
46 "expected member {} to have status {:?}, got {:?}",
47 id, status, member.member.status
48 )));
49 }
50 return Ok(());
51 }
52 }
53 Err(AnyError::from(format!("member {} not found", id)))
54}
55
56pub async fn serf_get_queue_max<T>(
58 transport_opts: T::Options,
59 mut get_addr: impl FnMut(usize) -> T::ResolvedAddress,
60) where
61 T: Transport<Id = SmolStr>,
62 T::Options: Clone,
63{
64 let s = Serf::<T>::new(transport_opts.clone(), test_config())
65 .await
66 .unwrap();
67
68 {
71 let mut members = s.inner.members.write().await;
72 members.states.clear();
73 for i in 0..100 {
74 let name: SmolStr = format!("Member{i}").into();
75 members.states.insert(
76 name.clone(),
77 MemberState {
78 member: Member::new(
79 Node::new(name.clone(), get_addr(i)),
80 Default::default(),
81 MemberStatus::Alive,
82 ),
83 status_time: 0.into(),
84 leave_time: None,
85 },
86 );
87 }
88 }
89
90 let got = s.get_queue_max().await;
92 let want = 4096;
93 assert_eq!(got, want);
94
95 s.shutdown().await.unwrap();
97 <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(2)).await;
98
99 let sn = Serf::<T>::new(
100 transport_opts.clone(),
101 test_config().with_min_queue_depth(1024),
102 )
103 .await
104 .unwrap();
105
106 {
107 let mut members = sn.inner.members.write().await;
108 members.states.clear();
109 let old_members = s.inner.members.read().await;
110 members.states.clone_from(&old_members.states);
111 }
112
113 let got = sn.get_queue_max().await;
114 let want = 1024;
115 assert_eq!(got, want);
116
117 sn.shutdown().await.unwrap();
118 <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(2)).await;
119
120 let snn = Serf::<T>::new(transport_opts, test_config().with_min_queue_depth(16))
123 .await
124 .unwrap();
125
126 {
127 let mut members = snn.inner.members.write().await;
128 members.states.clear();
129 let old_members = sn.inner.members.read().await;
130 members.states.clone_from(&old_members.states);
131 }
132
133 let got = snn.get_queue_max().await;
134 let want = 200;
135 assert_eq!(got, want);
136
137 {
139 let mut members = snn.inner.members.write().await;
140 let name = SmolStr::new("another");
141 members.states.insert(
142 name.clone(),
143 MemberState {
144 member: Member::new(
145 Node::new(name.clone(), get_addr(10000)),
146 Default::default(),
147 MemberStatus::Alive,
148 ),
149 status_time: 0.into(),
150 leave_time: None,
151 },
152 );
153 }
154
155 let got = snn.get_queue_max().await;
156 let want = 202;
157 assert_eq!(got, want);
158 snn.shutdown().await.unwrap();
159 drop(snn);
160}
161
162pub async fn serf_update<T, F>(
164 transport_opts1: T::Options,
165 transport_opts2: T::Options,
166 get_transport: impl FnOnce(T::Id, T::ResolvedAddress) -> F + Copy,
167) where
168 T: Transport,
169 T::Options: Clone,
170 F: core::future::Future<Output = T::Options>,
171{
172 let (event_tx, event_rx) = EventProducer::bounded(64);
173 let s1 = Serf::<T>::with_event_producer(transport_opts1, test_config(), event_tx)
174 .await
175 .unwrap();
176 let s2 = Serf::<T>::new(transport_opts2.clone(), test_config())
177 .await
178 .unwrap();
179 let (s2id, s2addr) = s2.advertise_node().into_components();
180
181 let mut serfs = vec![s1, s2];
182 wait_until_num_nodes(1, &serfs).await;
183
184 let node = serfs[1]
185 .inner
186 .memberlist
187 .advertise_node()
188 .map_address(MaybeResolvedAddress::resolved);
189 serfs[0].join(node.address().clone(), false).await.unwrap();
190
191 wait_until_num_nodes(2, &serfs).await;
192 serfs[1].shutdown().await.unwrap();
194 drop(serfs.pop().unwrap());
195
196 let start = Epoch::now();
198 let s2 = loop {
199 match Serf::<T>::new(
200 get_transport(s2id.clone(), s2addr.clone()).await,
201 test_config().with_tags([("foo", "bar")].into_iter()),
202 )
203 .await
204 {
205 Ok(s) => break s,
206 Err(e) => {
207 <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(1)).await;
208 if start.elapsed() > Duration::from_secs(20) {
209 panic!("timed out: {}", e);
210 }
211 }
212 }
213 };
214
215 let s1node = serfs[0].advertise_node();
216 s2.join(
217 s1node
218 .map_address(MaybeResolvedAddress::resolved)
219 .address()
220 .clone(),
221 false,
222 )
223 .await
224 .unwrap();
225 serfs.push(s2);
226 wait_until_num_nodes(2, &serfs).await;
227
228 test_events(
229 event_rx.rx,
230 node.id().clone(),
231 [
232 CrateEventType::Member(MemberEventType::Join),
233 CrateEventType::Member(MemberEventType::Update),
234 ]
235 .into_iter()
236 .collect(),
237 )
238 .await;
239
240 let mut found = false;
242 let members = serfs[0].inner.members.read().await;
243
244 for member in members.states.values() {
245 if member.member.node.id().eq(node.id())
246 && member.member.tags().get("foo").map(|v| v.as_str()) == Some("bar")
247 {
248 found = true;
249 break;
250 }
251 }
252 assert!(found, "did not found s2 in members");
253
254 for s in serfs.iter() {
255 s.shutdown().await.unwrap();
256 }
257}
258
259pub async fn serf_role<T>(transport_opts1: T::Options, transport_opts2: T::Options)
261where
262 T: Transport,
263{
264 let s1 = Serf::<T>::new(
265 transport_opts1,
266 test_config().with_tags([("role", "web")].into_iter()),
267 )
268 .await
269 .unwrap();
270 let s2 = Serf::<T>::new(
271 transport_opts2,
272 test_config().with_tags([("role", "lb")].into_iter()),
273 )
274 .await
275 .unwrap();
276
277 let serfs = [s1, s2];
278 wait_until_num_nodes(1, &serfs).await;
279
280 let node = serfs[1]
281 .inner
282 .memberlist
283 .advertise_node()
284 .map_address(MaybeResolvedAddress::resolved);
285 serfs[0].join(node.address().clone(), false).await.unwrap();
286
287 wait_until_num_nodes(2, &serfs).await;
288
289 let mut roles = HashMap::new();
290
291 let start = Epoch::now();
292 let mut cond1 = false;
293 let mut cond2 = false;
294 loop {
295 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
296
297 let members = serfs[0].inner.members.read().await;
298 for m in members.states.values() {
299 roles.insert(
300 m.member.node.id().clone(),
301 m.member.tags().get("role").cloned().unwrap(),
302 );
303 }
304
305 if let Some(role) = roles.get(node.id()) {
306 if role == "lb" {
307 cond1 = true;
308 }
309 }
310
311 if let Some(role) = roles.get(serfs[0].local_id()) {
312 if role == "web" {
313 cond2 = true;
314 }
315 }
316
317 if cond1 && cond2 {
318 break;
319 }
320
321 if start.elapsed() > Duration::from_secs(7) {
322 panic!("timed out");
323 }
324 }
325}
326
327pub async fn serf_state<T>(transport_opts1: T::Options)
329where
330 T: Transport,
331{
332 let s1 = Serf::<T>::new(transport_opts1, test_config())
333 .await
334 .unwrap();
335
336 assert_eq!(s1.state(), SerfState::Alive);
337
338 s1.leave().await.unwrap();
339
340 assert_eq!(s1.state(), SerfState::Left);
341
342 s1.shutdown().await.unwrap();
343
344 assert_eq!(s1.state(), SerfState::Shutdown);
345}
346
347pub async fn serf_set_tags<T>(transport_opts1: T::Options, transport_opts2: T::Options)
349where
350 T: Transport,
351{
352 let (event_tx, event_rx) = EventProducer::bounded(4);
353 let s1 = Serf::<T>::with_event_producer(transport_opts1, test_config(), event_tx)
354 .await
355 .unwrap();
356 let s2 = Serf::<T>::new(transport_opts2, test_config())
357 .await
358 .unwrap();
359
360 let serfs = [s1, s2];
361
362 wait_until_num_nodes(1, &serfs).await;
363
364 let node = serfs[1]
365 .inner
366 .memberlist
367 .advertise_node()
368 .map_address(MaybeResolvedAddress::resolved);
369 serfs[0].join(node.address().clone(), false).await.unwrap();
370
371 wait_until_num_nodes(2, &serfs).await;
372
373 serfs[0]
375 .set_tags([("port", "8080")].into_iter().collect())
376 .await
377 .unwrap();
378
379 serfs[1]
380 .set_tags([("datacenter", "east-aws")].into_iter().collect())
381 .await
382 .unwrap();
383
384 let start = Epoch::now();
385 let mut cond1 = false;
386 let mut cond2 = false;
387 let mut cond3 = false;
388 let mut cond4 = false;
389
390 loop {
391 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
392
393 let m1m = serfs[0].members().await;
394 let mut m1m_tags = HashMap::with_capacity(2);
395 for m in m1m {
396 m1m_tags.insert(m.node.id().clone(), m.tags.clone());
397 }
398
399 if m1m_tags.get(serfs[0].local_id()).map(|t| t.get("port")) == Some(Some(&"8080".into())) {
400 cond1 = true;
401 }
402
403 if m1m_tags
404 .get(serfs[1].local_id())
405 .map(|t| t.get("datacenter"))
406 == Some(Some(&"east-aws".into()))
407 {
408 cond2 = true;
409 }
410
411 let m2m = serfs[1].members().await;
412 let mut m2m_tags = HashMap::with_capacity(2);
413 for m in m2m {
414 m2m_tags.insert(m.node.id().clone(), m.tags.clone());
415 }
416
417 if m2m_tags.get(serfs[0].local_id()).map(|t| t.get("port")) == Some(Some(&"8080".into())) {
418 cond3 = true;
419 }
420
421 if m2m_tags
422 .get(serfs[1].local_id())
423 .map(|t| t.get("datacenter"))
424 == Some(Some(&"east-aws".into()))
425 {
426 cond4 = true;
427 }
428
429 if cond1 && cond2 && cond3 && cond4 {
430 break;
431 }
432
433 if start.elapsed() > Duration::from_secs(7) {
434 panic!("timed out");
435 }
436 }
437
438 test_events(
440 event_rx.rx,
441 node.id().clone(),
442 [
443 CrateEventType::Member(MemberEventType::Join),
444 CrateEventType::Member(MemberEventType::Update),
445 ]
446 .into_iter()
447 .collect(),
448 )
449 .await;
450
451 for s in serfs.iter() {
452 s.shutdown().await.unwrap();
453 }
454}
455
456pub async fn serf_num_nodes<T>(transport_opts1: T::Options, transport_opts2: T::Options)
458where
459 T: Transport,
460{
461 let s1 = Serf::<T>::new(transport_opts1, test_config())
462 .await
463 .unwrap();
464 let s2 = Serf::<T>::new(transport_opts2, test_config())
465 .await
466 .unwrap();
467
468 assert_eq!(s1.num_members().await, 1);
469
470 let serfs = [s1, s2];
471 wait_until_num_nodes(1, &serfs).await;
472
473 let node = serfs[1]
474 .inner
475 .memberlist
476 .advertise_node()
477 .map_address(MaybeResolvedAddress::resolved);
478 serfs[0].join(node.address().clone(), false).await.unwrap();
479
480 wait_until_num_nodes(2, &serfs).await;
481}
482
483pub async fn serf_coordinates<T>(
485 transport_opts1: T::Options,
486 transport_opts2: T::Options,
487 transport_opts3: T::Options,
488) where
489 T: Transport,
490{
491 const PROBE_INTERVAL: Duration = Duration::from_millis(2);
492
493 let opts = test_config()
494 .with_disable_coordinates(false)
495 .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL));
496 let s1 = Serf::<T>::new(transport_opts1, opts.clone()).await.unwrap();
497 let s2 = Serf::<T>::new(transport_opts2, opts).await.unwrap();
498
499 let mut serfs = vec![s1, s2];
500 wait_until_num_nodes(1, &serfs).await;
501
502 let c1 = serfs[0].cooridate().unwrap();
505 let c2 = serfs[1].cooridate().unwrap();
506
507 const ZERO_THRESHOLD: f64 = 20.0e-6;
508
509 assert!(
510 c1.distance_to(&c2).as_secs_f64() <= ZERO_THRESHOLD,
511 "coordinates didn't start at the origin"
512 );
513
514 let node = serfs[1]
516 .inner
517 .memberlist
518 .advertise_node()
519 .map_address(MaybeResolvedAddress::resolved);
520 serfs[0].join(node.address().clone(), false).await.unwrap();
521
522 wait_until_num_nodes(2, &serfs).await;
523
524 let start = Epoch::now();
525 let mut cond1 = false;
526 let mut cond2 = false;
527 let mut cond3 = false;
528 let mut cond4 = false;
529 let s2id = serfs[1].local_id().clone();
530 let s1id = serfs[0].local_id().clone();
531 loop {
532 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
533
534 if serfs[0].cached_coordinate(&s2id.clone()).is_ok() {
537 cond1 = true;
538 } else if start.elapsed() > Duration::from_secs(7) {
539 panic!("s1 didn't get a coordinate for s2");
540 }
541
542 if serfs[1].cached_coordinate(&s1id.clone()).is_ok() {
543 cond2 = true;
544 } else if start.elapsed() > Duration::from_secs(7) {
545 panic!("s2 didn't get a coordinate for s1");
546 }
547
548 let c1 = serfs[0].cooridate().unwrap();
551 let c2 = serfs[1].cooridate().unwrap();
552
553 if c1.distance_to(&c2).as_secs_f64() >= ZERO_THRESHOLD {
554 cond3 = true;
555 } else if start.elapsed() > Duration::from_secs(7) {
556 panic!("coordinates didn't update after probes");
557 }
558
559 let c1c = serfs[0].cached_coordinate(&s1id.clone()).unwrap();
561 match c1c {
562 None => {
563 if start.elapsed() > Duration::from_secs(7) {
564 panic!("s1 didn't cache its own coordinate");
565 }
566 }
567 Some(c1c) => {
568 if c1 == c1c {
569 cond4 = true;
570 } else if start.elapsed() > Duration::from_secs(7) {
571 panic!("s1 coordinates are not equal");
572 }
573 }
574 }
575
576 if cond1 && cond2 && cond3 && cond4 {
577 break;
578 }
579
580 if start.elapsed() > Duration::from_secs(7) {
581 panic!(
582 "timed out cond1 {} cond2 {} cond3 {} cond4 {}",
583 cond1, cond2, cond3, cond4
584 );
585 }
586 }
587
588 serfs[1].shutdown().await.unwrap();
591 let t = serfs[1].inner.opts.reap_interval * 4;
592 drop(serfs.pop().unwrap());
593
594 <T::Runtime as RuntimeLite>::sleep(t).await;
595
596 wait_until_num_nodes(1, &serfs).await;
597
598 let start = Epoch::now();
599 loop {
600 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
601
602 if serfs[0].cached_coordinate(&s2id.clone()).unwrap().is_none() {
603 break;
604 }
605
606 if start.elapsed() > Duration::from_secs(7) {
607 panic!("s1 should have removed s2's cached coordinate");
608 }
609 }
610
611 let s3 = Serf::<T>::new(
613 transport_opts3,
614 test_config()
615 .with_disable_coordinates(true)
616 .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL)),
617 )
618 .await
619 .unwrap();
620
621 serfs.push(s3);
622 wait_until_num_nodes(1, &serfs).await;
623
624 let node = serfs[0]
625 .inner
626 .memberlist
627 .advertise_node()
628 .map_address(MaybeResolvedAddress::resolved);
629 serfs[1].join(node.address().clone(), false).await.unwrap();
630
631 wait_until_num_nodes(2, &serfs).await;
632
633 let start = Epoch::now();
634 let mut cond1 = false;
635 let mut cond2 = false;
636 loop {
637 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
638
639 if let Err(e) = serfs[1].cooridate() {
642 if e.to_string().contains("coordinates are disabled") {
643 cond1 = true;
644 }
645 }
646
647 if serfs[1].cached_coordinate(&s1id.clone()).is_err() {
648 cond2 = true;
649 }
650
651 if cond1 && cond2 {
652 break;
653 }
654
655 if start.elapsed() > Duration::from_secs(14) {
656 panic!("timed out: cond1 {} cond2 {}", cond1, cond2);
657 }
658 }
659
660 for s in serfs.iter() {
661 s.shutdown().await.unwrap();
662 }
663}
664
665pub async fn serf_name_resolution<T>(
670 transport_opts1: T::Options,
671 transport_opts2: T::Options,
672 transport_opts3: T::Options,
673 set_id: impl FnOnce(T::Options, T::Id) -> T::Options,
674) where
675 T: Transport,
676{
677 let s1 = Serf::<T>::new(transport_opts1, test_config())
678 .await
679 .unwrap();
680 let s2 = Serf::<T>::new(transport_opts2, test_config())
681 .await
682 .unwrap();
683 let s3 = Serf::<T>::new(
684 set_id(transport_opts3, s1.local_id().clone()),
685 test_config(),
686 )
687 .await
688 .unwrap();
689
690 let serfs = [s1, s2, s3];
691 wait_until_num_nodes(1, &serfs).await;
692
693 let node = serfs[1]
695 .inner
696 .memberlist
697 .advertise_node()
698 .map_address(MaybeResolvedAddress::resolved);
699 serfs[0].join(node.address().clone(), false).await.unwrap();
700
701 wait_until_num_nodes(2, &serfs[..2]).await;
702 wait_until_num_nodes(1, &serfs[2..]).await;
703
704 let node = serfs[2]
705 .inner
706 .memberlist
707 .advertise_node()
708 .map_address(MaybeResolvedAddress::resolved);
709 serfs[0].join(node.address().clone(), false).await.unwrap();
710
711 <T::Runtime as RuntimeLite>::sleep(serfs[0].default_query_timeout().await * 30).await;
713
714 let start = Epoch::now();
715 let mut cond1 = false;
716 let mut cond2 = false;
717 let mut cond3 = false;
718 loop {
719 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
721
722 if serfs[0].state() == SerfState::Alive {
723 cond1 = true;
724 }
725
726 if serfs[1].state() == SerfState::Alive {
727 cond2 = true;
728 }
729
730 if serfs[2].state() == SerfState::Shutdown {
731 cond3 = true;
732 }
733
734 if cond1 && cond2 && cond3 {
735 break;
736 }
737
738 if start.elapsed() > Duration::from_secs(14) {
739 println!("cond1 {cond1} cond2 {cond2} cond3 {cond3}");
740 panic!("timed out");
741 }
742 }
743
744 for s in serfs.iter() {
745 s.shutdown().await.unwrap();
746 }
747}
748
749pub async fn serf_local_member<T>(opts: T::Options)
751where
752 T: Transport,
753{
754 let s = Serf::<T>::new(opts, test_config()).await.unwrap();
755
756 let local = s.local_member().await;
757 assert_eq!(local.node.id(), s.local_id());
758
759 assert_eq!(local.tags, s.inner.opts.tags());
760 assert_eq!(local.status, MemberStatus::Alive);
761
762 let new_tags = [("foo", "bar"), ("test", "ing")]
763 .into_iter()
764 .collect::<Tags>();
765 s.set_tags(new_tags.clone()).await.unwrap();
766
767 let local = s.local_member().await;
768 assert_eq!(&*local.tags, &new_tags);
769}
770
771pub async fn serf_stats<T>(opts: T::Options)
773where
774 T: Transport,
775{
776 let s = Serf::<T>::new(opts, test_config()).await.unwrap();
777
778 let stats = s.stats().await;
779 assert_eq!(stats.get_event_queue(), 0);
780 assert_eq!(stats.get_event_time(), 1);
781 assert_eq!(stats.get_failed(), 0);
782 assert_eq!(stats.get_intent_queue(), 0);
783 assert_eq!(stats.get_left(), 0);
784 assert_eq!(stats.get_health_score(), 0);
785 assert_eq!(stats.get_member_time(), 1);
786 assert_eq!(stats.get_members(), 1);
787 assert!(!stats.get_encrypted());
788}
789
/// Unit test: key-manager operations are reflected in the keyring file.
///
/// An encrypted instance is created with one existing key and a keyring file inside a
/// temporary directory. A new key is installed, made the key in use, and the old key is
/// removed; after every step the keyring file is read back and its contents checked.
///
/// `get_transport_opts` receives the existing key and returns the transport options and
/// memberlist options used to create the instance.
///
/// # Panics
///
/// Panics when the instance cannot be created or is not encrypted, when a key operation
/// fails, when the keyring file cannot be read, or when its contents differ from what the
/// test expects.
#[cfg(feature = "encryption")]
pub async fn serf_write_keyring_file<T>(
    get_transport_opts: impl FnOnce(
        memberlist_core::proto::SecretKey,
    ) -> (T::Options, MemberlistOptions),
) where
    T: Transport,
{
    const EXISTING: &str = "T9jncgl9mbLus+baTTa7q7nPSUrXwbDi2dhbtqir37s=";
    const NEW_KEY: &str = "HvY8ubRZMgafUOWvrOadwOckVa1wN3QWAo46FVKbVN8=";

    let td = tempfile::tempdir().unwrap();
    let mut p = td.path().join("serf_write_keying_file");
    p.set_extension("json");

    let sk = crate::types::SecretKey::try_from(EXISTING).unwrap();

    let (topts, mopts) = get_transport_opts(sk);
    let serf = Serf::<T>::new(
        topts,
        test_config()
            .with_keyring_file(Some(p.clone()))
            .with_memberlist_options(mopts),
    )
    .await
    .unwrap();
    assert!(
        serf.encryption_enabled(),
        "write keyring file test only works on encrypted serf"
    );

    // Reads the current contents of the keyring file.
    let read_keyring = || std::fs::read_to_string(&p).unwrap();

    let manager = serf.key_manager();
    let new_sk = crate::types::SecretKey::try_from(NEW_KEY).unwrap();
    manager.install_key(new_sk, None).await.unwrap();

    // After installing, the file is expected to span four lines and hold both keys,
    // with the existing key on the second line.
    let s = read_keyring();
    let lines = s.split('\n').collect::<Vec<_>>();
    assert_eq!(lines.len(), 4);

    assert!(s.contains(EXISTING));
    assert!(s.contains(NEW_KEY));

    assert!(lines[1].contains(EXISTING));

    manager.use_key(new_sk, None).await.unwrap();

    // After switching, the new key is expected on the second line.
    let s = read_keyring();
    let lines = s.split('\n').collect::<Vec<_>>();
    assert_eq!(lines.len(), 4);

    assert!(lines[1].contains(NEW_KEY));

    manager.remove_key(sk, None).await.unwrap();

    // After removing the old key the file is expected to shrink to three lines.
    let s = read_keyring();
    let lines = s.split('\n').collect::<Vec<_>>();
    assert_eq!(lines.len(), 3);

    assert!(lines[1].contains(NEW_KEY));

    let resp = manager.list_keys().await.unwrap();
    assert_eq!(resp.primary_keys().len(), 1);
    assert_eq!(resp.keys().len(), 1);
}
872
/// Exercises `recent_intent`, `upsert_intent` and `reap_intents` on one intent table.
///
/// Intents for `foo` and `bar` are stamped two seconds in the past, the one for `baz` is
/// stamped "now". The test expects lookups to return the latest accepted Lamport time,
/// an upsert carrying an older Lamport time to be rejected, and reaping with a one second
/// window to drop the old entries first and `baz` once the reference time is advanced.
#[test]
fn test_recent_intent() {
    // An empty table has nothing to report.
    assert!(
        recent_intent::<SmolStr>(&HashMap::new(), &"foo".into(), MessageType::Join).is_none()
    );

    let base = Epoch::now();
    // Timestamp providers: one already two seconds old, one current.
    let stale = || base - Duration::from_secs(2);
    let fresh = || base;

    let mut table = HashMap::<SmolStr, _>::new();
    assert!(recent_intent(&table, &"foo".into(), MessageType::Join).is_none());

    let accepted = upsert_intent(&mut table, &"foo".into(), MessageType::Join, 1.into(), stale);
    assert!(accepted);

    let accepted = upsert_intent(&mut table, &"bar".into(), MessageType::Leave, 2.into(), stale);
    assert!(accepted);

    let accepted = upsert_intent(&mut table, &"baz".into(), MessageType::Join, 3.into(), fresh);
    assert!(accepted);

    let accepted = upsert_intent(&mut table, &"bar".into(), MessageType::Join, 4.into(), stale);
    assert!(accepted);

    // Lamport time 0 is older than what is stored for `bar`; the upsert must be refused.
    let accepted = upsert_intent(&mut table, &"bar".into(), MessageType::Join, 0.into(), stale);
    assert!(!accepted);

    let accepted = upsert_intent(&mut table, &"bar".into(), MessageType::Join, 5.into(), stale);
    assert!(accepted);

    // Each node reports the latest Lamport time that was accepted for it.
    assert_eq!(
        recent_intent(&table, &"foo".into(), MessageType::Join),
        Some(1.into())
    );
    assert_eq!(
        recent_intent(&table, &"bar".into(), MessageType::Join),
        Some(5.into())
    );
    assert_eq!(
        recent_intent(&table, &"baz".into(), MessageType::Join),
        Some(3.into())
    );
    assert!(recent_intent(&table, &"tubez".into(), MessageType::Join).is_none());

    // Reaping with a one second window: only `baz` is expected to survive.
    reap_intents(&mut table, Epoch::now(), Duration::from_secs(1));
    assert!(recent_intent(&table, &"foo".into(), MessageType::Join).is_none());
    assert!(recent_intent(&table, &"bar".into(), MessageType::Join).is_none());
    assert_eq!(
        recent_intent(&table, &"baz".into(), MessageType::Join),
        Some(3.into())
    );
    assert!(recent_intent(&table, &"tubez".into(), MessageType::Join).is_none());

    // Two seconds further on, `baz` is expected to be reaped as well.
    reap_intents(
        &mut table,
        Epoch::now() + Duration::from_secs(2),
        Duration::from_secs(1),
    );
    assert!(recent_intent(&table, &"baz".into(), MessageType::Join).is_none());
}