1use std::sync::atomic::{AtomicUsize, Ordering};
2
3use memberlist_core::{tests::AnyError, transport::Id};
4
5use crate::{
6 event::EventProducer,
7 options::MemberlistOptions,
8 types::{Member, MemberState, MemberStatus, Tags},
9};
10
11use super::*;
12
13pub mod event;
15
16pub mod leave;
18
19pub mod join;
21
22pub mod delegate;
24
25pub mod reconnect;
27
28pub mod remove;
30
31pub mod reap;
33
34pub mod snapshot;
36
37fn test_member_status<I: Id, A>(
38 members: &HashMap<I, MemberState<I, A>>,
39 id: I,
40 status: MemberStatus,
41) -> Result<(), AnyError> {
42 for member in members.values() {
43 if id.eq(member.member.node.id()) {
44 if member.member.status != status {
45 return Err(AnyError::from(format!(
46 "expected member {} to have status {:?}, got {:?}",
47 id, status, member.member.status
48 )));
49 }
50 return Ok(());
51 }
52 }
53 Err(AnyError::from(format!("member {} not found", id)))
54}
55
56pub async fn serf_get_queue_max<T>(
58 transport_opts: T::Options,
59 mut get_addr: impl FnMut(usize) -> T::ResolvedAddress,
60) where
61 T: Transport<Id = SmolStr>,
62 T::Options: Clone,
63{
64 let s = Serf::<T>::new(transport_opts.clone(), test_config())
65 .await
66 .unwrap();
67
68 {
71 let mut members = s.inner.members.write().await;
72 members.states.clear();
73 for i in 0..100 {
74 let name: SmolStr = format!("Member{i}").into();
75 members.states.insert(
76 name.clone(),
77 MemberState {
78 member: Member::new(
79 Node::new(name.clone(), get_addr(i)),
80 Default::default(),
81 MemberStatus::Alive,
82 ),
83 status_time: 0.into(),
84 leave_time: None,
85 },
86 );
87 }
88 }
89
90 let got = s.get_queue_max().await;
92 let want = 4096;
93 assert_eq!(got, want);
94
95 s.shutdown().await.unwrap();
97 <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(2)).await;
98
99 let sn = Serf::<T>::new(
100 transport_opts.clone(),
101 test_config().with_min_queue_depth(1024),
102 )
103 .await
104 .unwrap();
105
106 {
107 let mut members = sn.inner.members.write().await;
108 members.states.clear();
109 let old_members = s.inner.members.read().await;
110 members.states.clone_from(&old_members.states);
111 }
112
113 let got = sn.get_queue_max().await;
114 let want = 1024;
115 assert_eq!(got, want);
116
117 sn.shutdown().await.unwrap();
118 <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(2)).await;
119
120 let snn = Serf::<T>::new(transport_opts, test_config().with_min_queue_depth(16))
123 .await
124 .unwrap();
125
126 {
127 let mut members = snn.inner.members.write().await;
128 members.states.clear();
129 let old_members = sn.inner.members.read().await;
130 members.states.clone_from(&old_members.states);
131 }
132
133 let got = snn.get_queue_max().await;
134 let want = 200;
135 assert_eq!(got, want);
136
137 {
139 let mut members = snn.inner.members.write().await;
140 let name = SmolStr::new("another");
141 members.states.insert(
142 name.clone(),
143 MemberState {
144 member: Member::new(
145 Node::new(name.clone(), get_addr(10000)),
146 Default::default(),
147 MemberStatus::Alive,
148 ),
149 status_time: 0.into(),
150 leave_time: None,
151 },
152 );
153 }
154
155 let got = snn.get_queue_max().await;
156 let want = 202;
157 assert_eq!(got, want);
158 snn.shutdown().await.unwrap();
159 drop(snn);
160}
161
162pub async fn serf_update<T, F>(
164 transport_opts1: T::Options,
165 transport_opts2: T::Options,
166 get_transport: impl FnOnce(T::Id, T::ResolvedAddress) -> F + Copy,
167) where
168 T: Transport,
169 T::Options: Clone,
170 F: core::future::Future<Output = T::Options>,
171{
172 let (event_tx, event_rx) = EventProducer::bounded(64);
173 let s1 = Serf::<T>::with_event_producer(transport_opts1, test_config(), event_tx)
174 .await
175 .unwrap();
176 let s2 = Serf::<T>::new(transport_opts2.clone(), test_config())
177 .await
178 .unwrap();
179 let (s2id, s2addr) = s2.advertise_node().into_components();
180
181 let mut serfs = vec![s1, s2];
182 wait_until_num_nodes(1, &serfs).await;
183
184 let node = serfs[1]
185 .inner
186 .memberlist
187 .advertise_node()
188 .map_address(MaybeResolvedAddress::resolved);
189 serfs[0].join(node.clone(), false).await.unwrap();
190
191 wait_until_num_nodes(2, &serfs).await;
192 serfs[1].shutdown().await.unwrap();
194 drop(serfs.pop().unwrap());
195
196 let start = Epoch::now();
198 let s2 = loop {
199 match Serf::<T>::new(
200 get_transport(s2id.clone(), s2addr.clone()).await,
201 test_config().with_tags([("foo", "bar")].into_iter()),
202 )
203 .await
204 {
205 Ok(s) => break s,
206 Err(e) => {
207 <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(1)).await;
208 if start.elapsed() > Duration::from_secs(20) {
209 panic!("timed out: {}", e);
210 }
211 }
212 }
213 };
214
215 let s1node = serfs[0].advertise_node();
216 s2.join(s1node.map_address(MaybeResolvedAddress::resolved), false)
217 .await
218 .unwrap();
219 serfs.push(s2);
220 wait_until_num_nodes(2, &serfs).await;
221
222 test_events(
223 event_rx.rx,
224 node.id().clone(),
225 [
226 CrateEventType::Member(MemberEventType::Join),
227 CrateEventType::Member(MemberEventType::Update),
228 ]
229 .into_iter()
230 .collect(),
231 )
232 .await;
233
234 let mut found = false;
236 let members = serfs[0].inner.members.read().await;
237
238 for member in members.states.values() {
239 if member.member.node.id().eq(node.id())
240 && member.member.tags().get("foo").map(|v| v.as_str()) == Some("bar")
241 {
242 found = true;
243 break;
244 }
245 }
246 assert!(found, "did not found s2 in members");
247
248 for s in serfs.iter() {
249 s.shutdown().await.unwrap();
250 }
251}
252
253pub async fn serf_role<T>(transport_opts1: T::Options, transport_opts2: T::Options)
255where
256 T: Transport,
257{
258 let s1 = Serf::<T>::new(
259 transport_opts1,
260 test_config().with_tags([("role", "web")].into_iter()),
261 )
262 .await
263 .unwrap();
264 let s2 = Serf::<T>::new(
265 transport_opts2,
266 test_config().with_tags([("role", "lb")].into_iter()),
267 )
268 .await
269 .unwrap();
270
271 let serfs = [s1, s2];
272 wait_until_num_nodes(1, &serfs).await;
273
274 let node = serfs[1]
275 .inner
276 .memberlist
277 .advertise_node()
278 .map_address(MaybeResolvedAddress::resolved);
279 serfs[0].join(node.clone(), false).await.unwrap();
280
281 wait_until_num_nodes(2, &serfs).await;
282
283 let mut roles = HashMap::new();
284
285 let start = Epoch::now();
286 let mut cond1 = false;
287 let mut cond2 = false;
288 loop {
289 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
290
291 let members = serfs[0].inner.members.read().await;
292 for m in members.states.values() {
293 roles.insert(
294 m.member.node.id().clone(),
295 m.member.tags().get("role").cloned().unwrap(),
296 );
297 }
298
299 if let Some(role) = roles.get(node.id()) {
300 if role == "lb" {
301 cond1 = true;
302 }
303 }
304
305 if let Some(role) = roles.get(serfs[0].local_id()) {
306 if role == "web" {
307 cond2 = true;
308 }
309 }
310
311 if cond1 && cond2 {
312 break;
313 }
314
315 if start.elapsed() > Duration::from_secs(7) {
316 panic!("timed out");
317 }
318 }
319}
320
321pub async fn serf_state<T>(transport_opts1: T::Options)
323where
324 T: Transport,
325{
326 let s1 = Serf::<T>::new(transport_opts1, test_config())
327 .await
328 .unwrap();
329
330 assert_eq!(s1.state(), SerfState::Alive);
331
332 s1.leave().await.unwrap();
333
334 assert_eq!(s1.state(), SerfState::Left);
335
336 s1.shutdown().await.unwrap();
337
338 assert_eq!(s1.state(), SerfState::Shutdown);
339}
340
341pub async fn serf_set_tags<T>(transport_opts1: T::Options, transport_opts2: T::Options)
343where
344 T: Transport,
345{
346 let (event_tx, event_rx) = EventProducer::bounded(4);
347 let s1 = Serf::<T>::with_event_producer(transport_opts1, test_config(), event_tx)
348 .await
349 .unwrap();
350 let s2 = Serf::<T>::new(transport_opts2, test_config())
351 .await
352 .unwrap();
353
354 let serfs = [s1, s2];
355
356 wait_until_num_nodes(1, &serfs).await;
357
358 let node = serfs[1]
359 .inner
360 .memberlist
361 .advertise_node()
362 .map_address(MaybeResolvedAddress::resolved);
363 serfs[0].join(node.clone(), false).await.unwrap();
364
365 wait_until_num_nodes(2, &serfs).await;
366
367 serfs[0]
369 .set_tags([("port", "8080")].into_iter().collect())
370 .await
371 .unwrap();
372
373 serfs[1]
374 .set_tags([("datacenter", "east-aws")].into_iter().collect())
375 .await
376 .unwrap();
377
378 let start = Epoch::now();
379 let mut cond1 = false;
380 let mut cond2 = false;
381 let mut cond3 = false;
382 let mut cond4 = false;
383
384 loop {
385 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
386
387 let m1m = serfs[0].members().await;
388 let mut m1m_tags = HashMap::with_capacity(2);
389 for m in m1m {
390 m1m_tags.insert(m.node.id().clone(), m.tags.clone());
391 }
392
393 if m1m_tags.get(serfs[0].local_id()).map(|t| t.get("port")) == Some(Some(&"8080".into())) {
394 cond1 = true;
395 }
396
397 if m1m_tags
398 .get(serfs[1].local_id())
399 .map(|t| t.get("datacenter"))
400 == Some(Some(&"east-aws".into()))
401 {
402 cond2 = true;
403 }
404
405 let m2m = serfs[1].members().await;
406 let mut m2m_tags = HashMap::with_capacity(2);
407 for m in m2m {
408 m2m_tags.insert(m.node.id().clone(), m.tags.clone());
409 }
410
411 if m2m_tags.get(serfs[0].local_id()).map(|t| t.get("port")) == Some(Some(&"8080".into())) {
412 cond3 = true;
413 }
414
415 if m2m_tags
416 .get(serfs[1].local_id())
417 .map(|t| t.get("datacenter"))
418 == Some(Some(&"east-aws".into()))
419 {
420 cond4 = true;
421 }
422
423 if cond1 && cond2 && cond3 && cond4 {
424 break;
425 }
426
427 if start.elapsed() > Duration::from_secs(7) {
428 panic!("timed out");
429 }
430 }
431
432 test_events(
434 event_rx.rx,
435 node.id().clone(),
436 [
437 CrateEventType::Member(MemberEventType::Join),
438 CrateEventType::Member(MemberEventType::Update),
439 ]
440 .into_iter()
441 .collect(),
442 )
443 .await;
444
445 for s in serfs.iter() {
446 s.shutdown().await.unwrap();
447 }
448}
449
450pub async fn serf_num_nodes<T>(transport_opts1: T::Options, transport_opts2: T::Options)
452where
453 T: Transport,
454{
455 let s1 = Serf::<T>::new(transport_opts1, test_config())
456 .await
457 .unwrap();
458 let s2 = Serf::<T>::new(transport_opts2, test_config())
459 .await
460 .unwrap();
461
462 assert_eq!(s1.num_members().await, 1);
463
464 let serfs = [s1, s2];
465 wait_until_num_nodes(1, &serfs).await;
466
467 let node = serfs[1]
468 .inner
469 .memberlist
470 .advertise_node()
471 .map_address(MaybeResolvedAddress::resolved);
472 serfs[0].join(node.clone(), false).await.unwrap();
473
474 wait_until_num_nodes(2, &serfs).await;
475}
476
477pub async fn serf_coordinates<T>(
479 transport_opts1: T::Options,
480 transport_opts2: T::Options,
481 transport_opts3: T::Options,
482) where
483 T: Transport,
484{
485 const PROBE_INTERVAL: Duration = Duration::from_millis(2);
486
487 let opts = test_config()
488 .with_disable_coordinates(false)
489 .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL));
490 let s1 = Serf::<T>::new(transport_opts1, opts.clone()).await.unwrap();
491 let s2 = Serf::<T>::new(transport_opts2, opts).await.unwrap();
492
493 let mut serfs = vec![s1, s2];
494 wait_until_num_nodes(1, &serfs).await;
495
496 let c1 = serfs[0].cooridate().unwrap();
499 let c2 = serfs[1].cooridate().unwrap();
500
501 const ZERO_THRESHOLD: f64 = 20.0e-6;
502
503 assert!(
504 c1.distance_to(&c2).as_secs_f64() <= ZERO_THRESHOLD,
505 "coordinates didn't start at the origin"
506 );
507
508 let node = serfs[1]
510 .inner
511 .memberlist
512 .advertise_node()
513 .map_address(MaybeResolvedAddress::resolved);
514 serfs[0].join(node.clone(), false).await.unwrap();
515
516 wait_until_num_nodes(2, &serfs).await;
517
518 let start = Epoch::now();
519 let mut cond1 = false;
520 let mut cond2 = false;
521 let mut cond3 = false;
522 let mut cond4 = false;
523 let s2id = serfs[1].local_id().clone();
524 let s1id = serfs[0].local_id().clone();
525 loop {
526 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
527
528 if serfs[0].cached_coordinate(&s2id.clone()).is_ok() {
531 cond1 = true;
532 } else if start.elapsed() > Duration::from_secs(7) {
533 panic!("s1 didn't get a coordinate for s2");
534 }
535
536 if serfs[1].cached_coordinate(&s1id.clone()).is_ok() {
537 cond2 = true;
538 } else if start.elapsed() > Duration::from_secs(7) {
539 panic!("s2 didn't get a coordinate for s1");
540 }
541
542 let c1 = serfs[0].cooridate().unwrap();
545 let c2 = serfs[1].cooridate().unwrap();
546
547 if c1.distance_to(&c2).as_secs_f64() >= ZERO_THRESHOLD {
548 cond3 = true;
549 } else if start.elapsed() > Duration::from_secs(7) {
550 panic!("coordinates didn't update after probes");
551 }
552
553 let c1c = serfs[0].cached_coordinate(&s1id.clone()).unwrap();
555 match c1c {
556 None => {
557 if start.elapsed() > Duration::from_secs(7) {
558 panic!("s1 didn't cache its own coordinate");
559 }
560 }
561 Some(c1c) => {
562 if c1 == c1c {
563 cond4 = true;
564 } else if start.elapsed() > Duration::from_secs(7) {
565 panic!("s1 coordinates are not equal");
566 }
567 }
568 }
569
570 if cond1 && cond2 && cond3 && cond4 {
571 break;
572 }
573
574 if start.elapsed() > Duration::from_secs(7) {
575 panic!(
576 "timed out cond1 {} cond2 {} cond3 {} cond4 {}",
577 cond1, cond2, cond3, cond4
578 );
579 }
580 }
581
582 serfs[1].shutdown().await.unwrap();
585 let t = serfs[1].inner.opts.reap_interval * 4;
586 drop(serfs.pop().unwrap());
587
588 <T::Runtime as RuntimeLite>::sleep(t).await;
589
590 wait_until_num_nodes(1, &serfs).await;
591
592 let start = Epoch::now();
593 loop {
594 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
595
596 if serfs[0].cached_coordinate(&s2id.clone()).unwrap().is_none() {
597 break;
598 }
599
600 if start.elapsed() > Duration::from_secs(7) {
601 panic!("s1 should have removed s2's cached coordinate");
602 }
603 }
604
605 let s3 = Serf::<T>::new(
607 transport_opts3,
608 test_config()
609 .with_disable_coordinates(true)
610 .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL)),
611 )
612 .await
613 .unwrap();
614
615 serfs.push(s3);
616 wait_until_num_nodes(1, &serfs).await;
617
618 let node = serfs[0]
619 .inner
620 .memberlist
621 .advertise_node()
622 .map_address(MaybeResolvedAddress::resolved);
623 serfs[1].join(node.clone(), false).await.unwrap();
624
625 wait_until_num_nodes(2, &serfs).await;
626
627 let start = Epoch::now();
628 let mut cond1 = false;
629 let mut cond2 = false;
630 loop {
631 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
632
633 if let Err(e) = serfs[1].cooridate() {
636 if e.to_string().contains("coordinates are disabled") {
637 cond1 = true;
638 }
639 }
640
641 if serfs[1].cached_coordinate(&s1id.clone()).is_err() {
642 cond2 = true;
643 }
644
645 if cond1 && cond2 {
646 break;
647 }
648
649 if start.elapsed() > Duration::from_secs(14) {
650 panic!("timed out: cond1 {} cond2 {}", cond1, cond2);
651 }
652 }
653
654 for s in serfs.iter() {
655 s.shutdown().await.unwrap();
656 }
657}
658
659pub async fn serf_name_resolution<T>(
664 transport_opts1: T::Options,
665 transport_opts2: T::Options,
666 transport_opts3: T::Options,
667 set_id: impl FnOnce(T::Options, T::Id) -> T::Options,
668) where
669 T: Transport,
670{
671 let s1 = Serf::<T>::new(transport_opts1, test_config())
672 .await
673 .unwrap();
674 let s2 = Serf::<T>::new(transport_opts2, test_config())
675 .await
676 .unwrap();
677 let s3 = Serf::<T>::new(
678 set_id(transport_opts3, s1.local_id().clone()),
679 test_config(),
680 )
681 .await
682 .unwrap();
683
684 let serfs = [s1, s2, s3];
685 wait_until_num_nodes(1, &serfs).await;
686
687 let node = serfs[1]
689 .inner
690 .memberlist
691 .advertise_node()
692 .map_address(MaybeResolvedAddress::resolved);
693 serfs[0].join(node.clone(), false).await.unwrap();
694
695 wait_until_num_nodes(2, &serfs[..2]).await;
696 wait_until_num_nodes(1, &serfs[2..]).await;
697
698 let node = serfs[2]
699 .inner
700 .memberlist
701 .advertise_node()
702 .map_address(MaybeResolvedAddress::resolved);
703 serfs[0].join(node.clone(), false).await.unwrap();
704
705 <T::Runtime as RuntimeLite>::sleep(serfs[0].default_query_timeout().await * 30).await;
707
708 let start = Epoch::now();
709 let mut cond1 = false;
710 let mut cond2 = false;
711 let mut cond3 = false;
712 loop {
713 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
715
716 if serfs[0].state() == SerfState::Alive {
717 cond1 = true;
718 }
719
720 if serfs[1].state() == SerfState::Alive {
721 cond2 = true;
722 }
723
724 if serfs[2].state() == SerfState::Shutdown {
725 cond3 = true;
726 }
727
728 if cond1 && cond2 && cond3 {
729 break;
730 }
731
732 if start.elapsed() > Duration::from_secs(14) {
733 println!("cond1 {cond1} cond2 {cond2} cond3 {cond3}");
734 panic!("timed out");
735 }
736 }
737
738 for s in serfs.iter() {
739 s.shutdown().await.unwrap();
740 }
741}
742
743pub async fn serf_local_member<T>(opts: T::Options)
745where
746 T: Transport,
747{
748 let s = Serf::<T>::new(opts, test_config()).await.unwrap();
749
750 let local = s.local_member().await;
751 assert_eq!(local.node.id(), s.local_id());
752
753 assert_eq!(local.tags, s.inner.opts.tags());
754 assert_eq!(local.status, MemberStatus::Alive);
755
756 let new_tags = [("foo", "bar"), ("test", "ing")]
757 .into_iter()
758 .collect::<Tags>();
759 s.set_tags(new_tags.clone()).await.unwrap();
760
761 let local = s.local_member().await;
762 assert_eq!(&*local.tags, &new_tags);
763}
764
765pub async fn serf_stats<T>(opts: T::Options)
767where
768 T: Transport,
769{
770 let s = Serf::<T>::new(opts, test_config()).await.unwrap();
771
772 let stats = s.stats().await;
773 assert_eq!(stats.get_event_queue(), 0);
774 assert_eq!(stats.get_event_time(), 1);
775 assert_eq!(stats.get_failed(), 0);
776 assert_eq!(stats.get_intent_queue(), 0);
777 assert_eq!(stats.get_left(), 0);
778 assert_eq!(stats.get_health_score(), 0);
779 assert_eq!(stats.get_member_time(), 1);
780 assert_eq!(stats.get_members(), 1);
781 assert!(!stats.get_encrypted());
782}
783
/// Checks that key-manager operations are written to the keyring file.
///
/// A node is started with `EXISTING` as its key and a keyring file inside a
/// temporary directory. The test then installs `NEW_KEY`, switches to it and
/// removes `EXISTING`, re-reading the file after each step and checking the
/// number of lines and which key appears on the second line.
///
/// `get_transport_opts` turns the initial secret key into the transport
/// options and memberlist options used to create the node.
///
/// # Panics
///
/// Panics when the node is not encrypted, when a key operation fails, when the
/// keyring file cannot be read, or when its contents are not as expected.
#[cfg(feature = "encryption")]
pub async fn serf_write_keyring_file<T>(
  get_transport_opts: impl FnOnce(memberlist_core::proto::SecretKey) -> (T::Options, MemberlistOptions),
) where
  T: Transport,
{
  const EXISTING: &str = "T9jncgl9mbLus+baTTa7q7nPSUrXwbDi2dhbtqir37s=";
  const NEW_KEY: &str = "HvY8ubRZMgafUOWvrOadwOckVa1wN3QWAo46FVKbVN8=";

  let td = tempfile::tempdir().unwrap();
  let mut p = td.path().join("serf_write_keying_file");
  p.set_extension("json");

  let sk = crate::types::SecretKey::try_from(EXISTING).unwrap();

  let (topts, mopts) = get_transport_opts(sk);
  let serf = Serf::<T>::new(
    topts,
    test_config()
      .with_keyring_file(Some(p.clone()))
      .with_memberlist_options(mopts),
  )
  .await
  .unwrap();
  assert!(
    serf.encryption_enabled(),
    "write keyring file test only works on encrypted serf"
  );

  // Reads the whole keyring file; used after every key operation below.
  let read_keyring = || std::fs::read_to_string(&p).unwrap();

  let manager = serf.key_manager();
  let new_sk = crate::types::SecretKey::try_from(NEW_KEY).unwrap();
  manager.install_key(new_sk, None).await.unwrap();

  // After the install both keys are present and the existing key is still
  // on the second line.
  let s = read_keyring();
  let lines = s.split('\n').collect::<Vec<_>>();
  assert_eq!(lines.len(), 4);

  assert!(s.contains(EXISTING));
  assert!(s.contains(NEW_KEY));

  assert!(lines[1].contains(EXISTING));

  manager.use_key(new_sk, None).await.unwrap();

  // After switching keys the new key moves to the second line.
  let s = read_keyring();
  let lines = s.split('\n').collect::<Vec<_>>();
  assert_eq!(lines.len(), 4);

  assert!(lines[1].contains(NEW_KEY));

  manager.remove_key(sk, None).await.unwrap();

  // After removing the old key the file is one line shorter.
  let s = read_keyring();
  let lines = s.split('\n').collect::<Vec<_>>();
  assert_eq!(lines.len(), 3);

  assert!(lines[1].contains(NEW_KEY));

  let resp = manager.list_keys().await.unwrap();
  assert_eq!(resp.primary_keys().len(), 1);
  assert_eq!(resp.keys().len(), 1);
}
866
/// Covers `recent_intent`, `upsert_intent` and `reap_intents` on an in-memory
/// intent table: lookups on an empty table, inserts with newer and older
/// logical times, and reaping of entries by wall-clock age.
#[test]
fn test_recent_intent() {
  assert!(recent_intent::<SmolStr>(&HashMap::new(), &"foo".into(), MessageType::Join).is_none());

  let now = Epoch::now();
  // Timestamp two seconds in the past.
  let stale = || now - Duration::from_secs(2);
  // Timestamp equal to `now`.
  let fresh = || now;

  let mut table = HashMap::<SmolStr, _>::new();
  assert!(recent_intent(&table, &"foo".into(), MessageType::Join).is_none());

  assert!(upsert_intent(
    &mut table,
    &"foo".into(),
    MessageType::Join,
    1.into(),
    stale
  ));
  assert!(upsert_intent(
    &mut table,
    &"bar".into(),
    MessageType::Leave,
    2.into(),
    stale
  ));
  assert!(upsert_intent(
    &mut table,
    &"baz".into(),
    MessageType::Join,
    3.into(),
    fresh
  ));
  assert!(upsert_intent(
    &mut table,
    &"bar".into(),
    MessageType::Join,
    4.into(),
    stale
  ));
  // A logical time older than the stored one is expected to be rejected.
  assert!(!upsert_intent(
    &mut table,
    &"bar".into(),
    MessageType::Join,
    0.into(),
    stale
  ));
  assert!(upsert_intent(
    &mut table,
    &"bar".into(),
    MessageType::Join,
    5.into(),
    stale
  ));

  assert_eq!(
    recent_intent(&table, &"foo".into(), MessageType::Join).unwrap(),
    1.into()
  );
  assert_eq!(
    recent_intent(&table, &"bar".into(), MessageType::Join).unwrap(),
    5.into()
  );
  assert_eq!(
    recent_intent(&table, &"baz".into(), MessageType::Join).unwrap(),
    3.into()
  );

  assert!(recent_intent(&table, &"tubez".into(), MessageType::Join).is_none());

  // First reap with a one second window: only "baz" is expected to remain.
  reap_intents(&mut table, Epoch::now(), Duration::from_secs(1));
  assert!(recent_intent(&table, &"foo".into(), MessageType::Join).is_none());
  assert!(recent_intent(&table, &"bar".into(), MessageType::Join).is_none());
  assert_eq!(
    recent_intent(&table, &"baz".into(), MessageType::Join).unwrap(),
    3.into()
  );
  assert!(recent_intent(&table, &"tubez".into(), MessageType::Join).is_none());

  // Reaping as of two seconds in the future is expected to remove "baz" too.
  reap_intents(
    &mut table,
    Epoch::now() + Duration::from_secs(2),
    Duration::from_secs(1),
  );
  assert!(recent_intent(&table, &"baz".into(), MessageType::Join).is_none());
}