1use std::marker::PhantomData;
2
3use crate::delegate::MergeDelegate;
4
5use super::*;
6
7pub async fn join_intent_buffer_early<T>(transport_opts: T::Options)
9where
10 T: Transport<Id = SmolStr>,
11{
12 let opts = test_config();
13 let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
14
15 let j = JoinMessage {
17 ltime: 10.into(),
18 id: "test".into(),
19 };
20
21 assert!(s1.handle_node_join_intent(&j).await, "should rebroadcast");
22 assert!(
23 !s1.handle_node_join_intent(&j).await,
24 "should not rebroadcast"
25 );
26
27 {
29 let members = s1.inner.members.read().await;
30 let ltime = recent_intent(&members.recent_intents, &"test".into(), MessageType::Join).unwrap();
31 assert_eq!(ltime, 10.into(), "bad buffer");
32 }
33
34 s1.shutdown().await.unwrap();
35}
36
37pub async fn join_intent_old_message<T>(transport_opts: T::Options, addr: T::ResolvedAddress)
39where
40 T: Transport<Id = SmolStr>,
41{
42 let opts = test_config();
43 let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
44
45 {
46 let mut members = s1.inner.members.write().await;
47 members.states.insert(
48 "test".into(),
49 MemberState {
50 member: Member {
51 node: Node::new("test".into(), addr),
52 tags: Arc::new(Default::default()),
53 status: MemberStatus::Alive,
54 memberlist_protocol_version: crate::types::MemberlistProtocolVersion::V1,
55 memberlist_delegate_version: crate::types::MemberlistDelegateVersion::V1,
56 protocol_version: crate::types::ProtocolVersion::V1,
57 delegate_version: crate::types::DelegateVersion::V1,
58 },
59 status_time: 12.into(),
60 leave_time: None,
61 },
62 );
63 }
64
65 let j = JoinMessage {
66 ltime: 10.into(),
67 id: "test".into(),
68 };
69
70 assert!(
71 !s1.handle_node_join_intent(&j).await,
72 "should not rebroadcast"
73 );
74
75 {
77 let members = s1.inner.members.read().await;
78 assert!(
79 recent_intent(&members.recent_intents, &"test".into(), MessageType::Join).is_none(),
80 "should not have buffered intent"
81 );
82 }
83
84 s1.shutdown().await.unwrap();
85}
86
87pub async fn join_intent_newer<T>(transport_opts: T::Options, addr: T::ResolvedAddress)
89where
90 T: Transport<Id = SmolStr>,
91{
92 let opts = test_config();
93 let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
94 {
95 let mut members = s1.inner.members.write().await;
96 members.states.insert(
97 "test".into(),
98 MemberState {
99 member: Member {
100 node: Node::new("test".into(), addr),
101 tags: Arc::new(Default::default()),
102 status: MemberStatus::Alive,
103 memberlist_protocol_version: crate::types::MemberlistProtocolVersion::V1,
104 memberlist_delegate_version: crate::types::MemberlistDelegateVersion::V1,
105 protocol_version: crate::types::ProtocolVersion::V1,
106 delegate_version: crate::types::DelegateVersion::V1,
107 },
108 status_time: 12.into(),
109 leave_time: None,
110 },
111 );
112 }
113
114 let j = JoinMessage {
115 ltime: 14.into(),
116 id: "test".into(),
117 };
118
119 assert!(s1.handle_node_join_intent(&j).await, "should rebroadcast");
120
121 {
122 let members = s1.inner.members.read().await;
123 assert!(
124 recent_intent(&members.recent_intents, &"test".into(), MessageType::Join).is_none(),
125 "should not have buffered intent"
126 );
127
128 let m = members.states.get("test").unwrap();
129 assert_eq!(m.status_time, 14.into(), "should update join time");
130 assert_eq!(s1.inner.clock.time(), 15.into(), "should update clock");
131 }
132
133 s1.shutdown().await.unwrap();
134}
135
136pub async fn join_intent_reset_leaving<T>(transport_opts: T::Options, addr: T::ResolvedAddress)
138where
139 T: Transport<Id = SmolStr>,
140{
141 let opts = test_config();
142 let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
143
144 {
145 let mut members = s1.inner.members.write().await;
146 members.states.insert(
147 "test".into(),
148 MemberState {
149 member: Member {
150 node: Node::new("test".into(), addr),
151 tags: Arc::new(Default::default()),
152 status: MemberStatus::Leaving,
153 memberlist_protocol_version: crate::types::MemberlistProtocolVersion::V1,
154 memberlist_delegate_version: crate::types::MemberlistDelegateVersion::V1,
155 protocol_version: crate::types::ProtocolVersion::V1,
156 delegate_version: crate::types::DelegateVersion::V1,
157 },
158 status_time: 12.into(),
159 leave_time: None,
160 },
161 );
162 }
163
164 let j = JoinMessage {
165 ltime: 14.into(),
166 id: "test".into(),
167 };
168
169 assert!(s1.handle_node_join_intent(&j).await, "should rebroadcast");
170
171 {
172 let members = s1.inner.members.read().await;
173 assert!(
174 recent_intent(&members.recent_intents, &"test".into(), MessageType::Join).is_none(),
175 "should not have buffered intent"
176 );
177
178 let m = members.states.get("test").unwrap();
179 assert_eq!(m.status_time, 14.into(), "should update join time");
180 assert_eq!(m.member.status, MemberStatus::Alive, "should update status");
181 assert_eq!(s1.inner.clock.time(), 15.into(), "should update clock");
182 }
183
184 s1.shutdown().await.unwrap();
185}
186
187pub async fn join_leave_ltime<T>(transport_opts1: T::Options, transport_opts2: T::Options)
189where
190 T: Transport,
191{
192 let opts = test_config();
193 let s1 = Serf::<T>::new(transport_opts1, opts).await.unwrap();
194 let opts = test_config();
195 let s2 = Serf::<T>::new(transport_opts2, opts).await.unwrap();
196
197 let serfs = [s1, s2];
198 wait_until_num_nodes(1, &serfs).await;
199
200 let (_, addr) = serfs[1].inner.memberlist.advertise_node().into_components();
201 serfs[0]
202 .join(MaybeResolvedAddress::resolved(addr), false)
203 .await
204 .unwrap();
205
206 wait_until_num_nodes(2, &serfs).await;
207
208 let now = Epoch::now();
209
210 loop {
211 let members = serfs[1].inner.members.read().await;
212 let mut cond1 = false;
213 let mut cond2 = false;
214 if let Some(m) = members.states.get(serfs[0].inner.memberlist.local_id()) {
215 if m.status_time == 1.into() {
216 cond1 = true;
217 }
218
219 if serfs[1].inner.clock.time() > m.status_time {
220 cond2 = true;
221 }
222 }
223
224 if cond1 && cond2 {
225 break;
226 }
227
228 if now.elapsed() > Duration::from_secs(7) {
229 panic!("timed out waiting for status time to be updated");
230 }
231
232 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
233 }
234
235 let old_clock = serfs[1].inner.clock.time();
236
237 serfs[0].leave().await.unwrap();
238
239 loop {
240 let mut cond1 = false;
241
242 if serfs[1].inner.clock.time() > old_clock {
243 cond1 = true;
244 }
245
246 if cond1 {
247 break;
248 }
249
250 if now.elapsed() > Duration::from_secs(7) {
251 panic!(
252 "leave should increment ({} / {})",
253 serfs[1].inner.clock.time(),
254 old_clock
255 );
256 }
257
258 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
259 }
260
261 for s in serfs.iter() {
262 s.shutdown().await.unwrap();
263 }
264}
265
266pub async fn join_pending_intent<T>(transport_opts: T::Options, addr: T::ResolvedAddress)
268where
269 T: Transport<Id = SmolStr>,
270{
271 let opts = test_config();
272 let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
273 {
274 let mut members = s1.inner.members.write().await;
275 upsert_intent::<SmolStr>(
276 &mut members.recent_intents,
277 &"test".into(),
278 MessageType::Join,
279 5.into(),
280 Epoch::now,
281 );
282 }
283
284 s1.handle_node_join(Arc::new(NodeState {
285 id: "test".into(),
286 addr,
287 meta: Meta::empty(),
288 state: memberlist_core::proto::State::Alive,
289 protocol_version: crate::types::MemberlistProtocolVersion::V1,
290 delegate_version: crate::types::MemberlistDelegateVersion::V1,
291 }))
292 .await;
293
294 {
295 let members = s1.inner.members.read().await;
296 let m = members.states.get("test").unwrap();
297 assert_eq!(m.status_time, 5.into());
298 assert_eq!(m.member.status, MemberStatus::Alive);
299 }
300
301 s1.shutdown().await.unwrap();
302}
303
304pub async fn join_pending_intents<T>(transport_opts: T::Options, addr: T::ResolvedAddress)
306where
307 T: Transport<Id = SmolStr>,
308{
309 let opts = test_config();
310 let s1 = Serf::<T>::new(transport_opts, opts).await.unwrap();
311 {
312 let mut members = s1.inner.members.write().await;
313 upsert_intent::<SmolStr>(
314 &mut members.recent_intents,
315 &"test".into(),
316 MessageType::Join,
317 5.into(),
318 Epoch::now,
319 );
320 upsert_intent::<SmolStr>(
321 &mut members.recent_intents,
322 &"test".into(),
323 MessageType::Leave,
324 6.into(),
325 Epoch::now,
326 );
327 }
328
329 s1.handle_node_join(Arc::new(NodeState {
330 id: "test".into(),
331 addr,
332 meta: Meta::empty(),
333 state: memberlist_core::proto::State::Alive,
334 protocol_version: crate::types::MemberlistProtocolVersion::V1,
335 delegate_version: crate::types::MemberlistDelegateVersion::V1,
336 }))
337 .await;
338
339 {
340 let members = s1.inner.members.read().await;
341 let m = members.states.get("test").unwrap();
342 assert_eq!(m.status_time, 6.into());
343 assert_eq!(m.member.status, MemberStatus::Leaving);
344 }
345
346 s1.shutdown().await.unwrap();
347}
348
349pub async fn serf_join_leave<T>(transport_opts1: T::Options, transport_opts2: T::Options)
351where
352 T: Transport,
353{
354 let s1 = Serf::<T>::new(transport_opts1, test_config())
355 .await
356 .unwrap();
357 let reap_interval = s1.inner.opts.reap_interval();
358 let s2 = Serf::<T>::new(transport_opts2, test_config())
359 .await
360 .unwrap();
361
362 let serfs = [s1, s2];
363 wait_until_num_nodes(1, &serfs).await;
364
365 let node = serfs[1]
366 .inner
367 .memberlist
368 .advertise_node()
369 .map_address(MaybeResolvedAddress::resolved);
370 serfs[0].join(node.address().clone(), false).await.unwrap();
371
372 wait_until_num_nodes(2, &serfs).await;
373
374 serfs[1].leave().await.unwrap();
375
376 <T::Runtime as RuntimeLite>::sleep(reap_interval * 2).await;
377
378 wait_until_num_nodes(1, &serfs).await;
379}
380
381pub async fn serf_join_leave_join<T>(transport_opts1: T::Options, transport_opts2: T::Options)
383where
384 T: Transport,
385 T::Options: Clone,
386{
387 let s1 = Serf::<T>::new(
388 transport_opts1,
389 test_config().with_reap_interval(Duration::from_secs(10)),
390 )
391 .await
392 .unwrap();
393 let s2 = Serf::<T>::new(
394 transport_opts2.clone(),
395 test_config().with_reap_interval(Duration::from_secs(10)),
396 )
397 .await
398 .unwrap();
399
400 let mut serfs = [s1, s2];
401 wait_until_num_nodes(1, &serfs).await;
402
403 let node = serfs[1]
404 .inner
405 .memberlist
406 .advertise_node()
407 .map_address(MaybeResolvedAddress::resolved);
408 serfs[0].join(node.address().clone(), false).await.unwrap();
409
410 wait_until_num_nodes(2, &serfs).await;
411
412 serfs[1].leave().await.unwrap();
413 serfs[1].shutdown().await.unwrap();
414
415 let t = serfs[1].inner.opts.memberlist_options.probe_interval() * 5;
416 <T::Runtime as RuntimeLite>::sleep(t).await;
418
419 let start = Epoch::now();
421 loop {
422 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
423
424 let members = serfs[0].inner.members.read().await;
425 let mut any_left = false;
426 for member in members.states.values() {
427 if member.member.status == MemberStatus::Left {
428 any_left = true;
429 break;
430 }
431 }
432
433 if any_left {
434 break;
435 }
436
437 if !any_left && start.elapsed() > Duration::from_secs(7) {
438 panic!("Node should have left");
439 }
440 }
441
442 let s2 = Serf::<T>::new(
444 transport_opts2,
445 test_config().with_reap_interval(Duration::from_secs(10)),
446 )
447 .await
448 .unwrap();
449
450 serfs[1] = s2;
451 wait_until_num_nodes(1, &serfs[1..]).await;
452
453 let node = serfs[1]
455 .inner
456 .memberlist
457 .advertise_node()
458 .map_address(MaybeResolvedAddress::resolved);
459 serfs[0].join(node.address().clone(), false).await.unwrap();
460
461 let start = Epoch::now();
462 loop {
463 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
464
465 let members = serfs[0].inner.members.read().await;
466 let mut any_left = false;
467 for member in members.states.values() {
468 if member.member.status == MemberStatus::Left {
469 any_left = true;
470 break;
471 }
472 }
473
474 if !any_left {
475 break;
476 }
477
478 if start.elapsed() > Duration::from_secs(7) {
479 panic!("all nodes should be alive!");
480 }
481 }
482
483 for s in serfs.iter() {
484 s.shutdown().await.unwrap();
485 }
486}
487
488pub async fn serf_join_ignore_old<T>(transport_opts1: T::Options, transport_opts2: T::Options)
490where
491 T: Transport,
492{
493 let s1 = Serf::<T>::new(transport_opts1, test_config())
494 .await
495 .unwrap();
496
497 let (event_tx, event_rx) = EventProducer::bounded(4);
498 let s2 = Serf::<T>::with_event_producer(transport_opts2, test_config(), event_tx)
499 .await
500 .unwrap();
501
502 let serfs = [s1, s2];
503 wait_until_num_nodes(1, &serfs).await;
504
505 serfs[0]
507 .user_event("event!", Bytes::from_static(b"test"), false)
508 .await
509 .unwrap();
510 <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(10)).await;
511
512 serfs[0]
513 .user_event("second", Bytes::from_static(b"foobar"), false)
514 .await
515 .unwrap();
516 <T::Runtime as RuntimeLite>::sleep(Duration::from_secs(10)).await;
517
518 let node = serfs[0]
520 .inner
521 .memberlist
522 .advertise_node()
523 .map_address(MaybeResolvedAddress::resolved);
524 serfs[1].join(node.address().clone(), true).await.unwrap();
525
526 wait_until_num_nodes(2, &serfs).await;
527
528 test_user_events(event_rx.rx, vec![], vec![]).await;
530
531 for s in serfs.iter() {
532 s.shutdown().await.unwrap();
533 }
534}
535
536#[derive(Debug, thiserror::Error)]
537#[error("merge canceled")]
538struct CancelMergeError;
539
540#[derive(Clone)]
541struct CancelMergeDelegate<A: CheapClone + Send + Sync + 'static> {
542 invoked: Arc<AtomicBool>,
543 _phantom: PhantomData<A>,
544}
545
546impl<A: CheapClone + Send + Sync + 'static> MergeDelegate for CancelMergeDelegate<A> {
547 type Error = CancelMergeError;
548
549 type Id = SmolStr;
550
551 type Address = A;
552
553 async fn notify_merge(
554 &self,
555 _members: Arc<[Member<Self::Id, Self::Address>]>,
556 ) -> Result<(), Self::Error> {
557 self.invoked.store(true, Ordering::SeqCst);
558 Err(CancelMergeError)
559 }
560}
561
562pub async fn serf_join_cancel<T>(transport_opts1: T::Options, transport_opts2: T::Options)
564where
565 T: Transport<Id = SmolStr>,
566{
567 let cmd1 = CancelMergeDelegate {
568 invoked: Arc::new(AtomicBool::new(false)),
569 _phantom: PhantomData,
570 };
571 let s1 = Serf::<T, _>::with_delegate(
572 transport_opts1,
573 test_config(),
574 DefaultDelegate::<T>::new().with_merge_delegate(cmd1.clone()),
575 )
576 .await
577 .unwrap();
578 let cmd2 = CancelMergeDelegate {
579 invoked: Arc::new(AtomicBool::new(false)),
580 _phantom: PhantomData,
581 };
582 let s2 = Serf::<T, _>::with_delegate(
583 transport_opts2,
584 test_config(),
585 DefaultDelegate::<T>::new().with_merge_delegate(cmd2.clone()),
586 )
587 .await
588 .unwrap();
589
590 let serfs = [s1, s2];
591 wait_until_num_nodes(0, &serfs).await;
592
593 let node = serfs[1]
594 .inner
595 .memberlist
596 .advertise_node()
597 .map_address(MaybeResolvedAddress::resolved);
598
599 let err = serfs[0]
600 .join(node.address().clone(), false)
601 .await
602 .unwrap_err();
603 assert!(err.to_string().contains("merge canceled"));
604
605 wait_until_num_nodes(0, &serfs).await;
606
607 assert!(cmd1.invoked.load(Ordering::SeqCst));
608 assert!(cmd2.invoked.load(Ordering::SeqCst));
609
610 for s in serfs.iter() {
611 s.shutdown().await.unwrap();
612 }
613}