serf_core/serf/base/tests/serf/
delegate.rs

1use super::*;
2
3/// Unit test for delegate node meta
4pub async fn delegate_nodemeta<T>(transport_opts: T::Options)
5where
6  T: Transport,
7{
8  let opts = test_config();
9  let s = Serf::<T>::new(
10    transport_opts,
11    opts.with_tags([("role", "test")].into_iter()),
12  )
13  .await
14  .unwrap();
15  let meta = s.inner.memberlist.delegate().unwrap().node_meta(32).await;
16
17  let (_, tags) = Tags::decode(&meta).unwrap();
18  assert_eq!(tags.get("role"), Some(&SmolStr::new("test")));
19
20  s.shutdown().await.unwrap();
21}
22
23/// Unit test for delegate node meta panic
24pub async fn delegate_nodemeta_panic<T>(transport_opts: T::Options)
25where
26  T: Transport,
27{
28  let opts = test_config();
29  let s = Serf::<T>::new(
30    transport_opts,
31    opts.with_tags([("role", "test")].into_iter()),
32  )
33  .await
34  .unwrap();
35  s.inner.memberlist.delegate().unwrap().node_meta(1).await;
36  s.shutdown().await.unwrap();
37}
38
39/// Unit test for delegate local state
40pub async fn delegate_local_state<T>(transport_opts1: T::Options, transport_opts2: T::Options)
41where
42  T: Transport,
43{
44  let opts = test_config().with_event_buffer_size(0);
45  let s1 = Serf::<T>::new(transport_opts1, opts).await.unwrap();
46
47  let opts = test_config().with_event_buffer_size(0);
48  let s2 = Serf::<T>::new(transport_opts2, opts).await.unwrap();
49
50  let serfs = [s1, s2];
51  wait_until_num_nodes(1, &serfs).await;
52
53  let (id, addr) = serfs[1].memberlist().advertise_node().into_components();
54
55  serfs[0]
56    .join(Node::new(id, MaybeResolvedAddress::resolved(addr)), false)
57    .await
58    .unwrap();
59
60  wait_until_num_nodes(2, &serfs).await;
61
62  serfs[0]
63    .user_event("test", Bytes::from_static(b"test"), false)
64    .await
65    .unwrap();
66
67  serfs[0].query("foo", Bytes::new(), None).await.unwrap();
68
69  // s2 can leave now
70  serfs[1].leave().await.unwrap();
71
72  // Do a state dump
73  let buf = serfs[0]
74    .memberlist()
75    .delegate()
76    .unwrap()
77    .local_state(false)
78    .await;
79
80  // Attempt a decode
81  let pp = crate::types::decode_message::<T::Id, T::ResolvedAddress>(&buf).unwrap();
82
83  let MessageRef::PushPull(pp) = pp else {
84    panic!("bad message")
85  };
86  let pp = <PushPullMessage<T::Id> as Data>::from_ref(pp).unwrap();
87
88  // Verify lamport clock
89  assert_eq!(pp.ltime(), serfs[0].inner.clock.time(), "bad lamport clock");
90
91  // Verify the status
92  // Leave waits until propagation so this should only have one member
93  assert_eq!(pp.status_ltimes().len(), 1, "missing ltimes");
94  assert_eq!(pp.left_members().len(), 0, "should have no left memebers");
95  assert_eq!(
96    pp.event_ltime(),
97    serfs[0].inner.event_clock.time(),
98    "bad event clock"
99  );
100  assert_eq!(
101    pp.events().len(),
102    serfs[0].inner.event_core.read().await.buffer.len(),
103    "should send full event buffer"
104  );
105  assert_eq!(
106    pp.query_ltime(),
107    serfs[0].inner.query_clock.time(),
108    "bad query clock"
109  );
110
111  for s in serfs {
112    s.shutdown().await.unwrap();
113  }
114}
115
116/// Unit test for delegate merge remote state
117pub async fn delegate_merge_remote_state<T>(transport_opts: T::Options)
118where
119  T: Transport<Id = SmolStr>,
120{
121  let opts = test_config();
122  let s = Serf::<T>::new(transport_opts, opts).await.unwrap();
123  let d = s.memberlist().delegate().unwrap();
124
125  // Make a fake push pull
126  let pp = PushPullMessage {
127    ltime: 42.into(),
128    status_ltimes: [
129      (SmolStr::new("test"), 20.into()),
130      (SmolStr::new("foo"), 15.into()),
131    ]
132    .into_iter()
133    .collect(),
134    left_members: ["foo".into()].into_iter().collect(),
135    event_ltime: 50.into(),
136    events: TinyVec::from(UserEvents {
137      ltime: 45.into(),
138      events: OneOrMore::from(UserEvent {
139        name: "test".into(),
140        payload: Bytes::new(),
141      }),
142    }),
143    query_ltime: 100.into(),
144  };
145
146  let buf = crate::types::encode_message_to_bytes(&pp).unwrap();
147
148  // Merge in fake state
149  d.merge_remote_state(&buf, false).await;
150
151  // Verify lamport
152  assert_eq!(s.inner.clock.time(), 42.into(), "bad lamport clock");
153
154  let members = s.inner.members.read().await;
155  // Verify pending join for test
156  let ltime = recent_intent(
157    &members.recent_intents,
158    &SmolStr::new("test"),
159    MessageType::Join,
160  )
161  .unwrap();
162  assert_eq!(ltime, 20.into(), "bad join ltime");
163  // Verify pending leave for foo
164  let ltime = recent_intent(
165    &members.recent_intents,
166    &SmolStr::new("foo"),
167    MessageType::Leave,
168  )
169  .unwrap();
170  assert_eq!(ltime, 16.into(), "bad leave ltime");
171
172  // Verify event clock
173  assert_eq!(s.inner.event_clock.time(), 50.into(), "bad event clock");
174  let buf = s.inner.event_core.read().await;
175  assert!(buf.buffer[45].is_some(), "missing event buffer for time");
176  assert_eq!(buf.buffer[45].as_ref().unwrap().events[0].name, "test");
177  assert_eq!(s.inner.query_clock.time(), 100.into(), "bad query clock");
178
179  s.shutdown().await.unwrap();
180}
181
182/// Unit test for serf ping delegate versioning
183pub async fn serf_ping_delegate_versioning<T>(
184  transport_opts1: T::Options,
185  transport_opts2: T::Options,
186) where
187  T: Transport,
188{
189  const PROBE_INTERVAL: Duration = Duration::from_millis(2);
190
191  let s1 = Serf::<T>::new(
192    transport_opts1,
193    test_config()
194      .with_disable_coordinates(false)
195      .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL)),
196  )
197  .await
198  .unwrap();
199
200  let s2 = Serf::<T>::new(
201    transport_opts2,
202    test_config()
203      .with_disable_coordinates(false)
204      .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL)),
205  )
206  .await
207  .unwrap();
208
209  // Monkey patch s1 to send weird versions of the ping messages.
210  s1.memberlist()
211    .delegate()
212    .unwrap()
213    .ping_versioning_test
214    .store(true, Ordering::SeqCst);
215
216  let serfs = [s1, s2];
217  wait_until_num_nodes(1, &serfs).await;
218
219  let node = serfs[1]
220    .memberlist()
221    .advertise_node()
222    .map_address(MaybeResolvedAddress::resolved);
223  serfs[0].join(node, false).await.unwrap();
224
225  // They both should show 2 members, but only s1 should know about s2
226  // in the cache, since s1 spoke an alien ping protocol.
227  wait_until_num_nodes(2, &serfs).await;
228
229  let start = Epoch::now();
230  let mut cond1 = false;
231  let mut cond2 = false;
232  let s1id = serfs[0].local_id().clone();
233  let s2id = serfs[1].local_id().clone();
234
235  loop {
236    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
237
238    if serfs[0].cached_coordinate(&s2id).unwrap().is_some() {
239      cond1 = true;
240    } else if start.elapsed() > Duration::from_secs(7) {
241      panic!("s1 didn't get a coordinate for s2");
242    }
243
244    if serfs[1].cached_coordinate(&s1id).unwrap().is_none() {
245      cond2 = true;
246    } else if start.elapsed() > Duration::from_secs(7) {
247      panic!("s2 got an unexpected coordinate for s1");
248    }
249
250    if cond1 && cond2 {
251      break;
252    }
253
254    if start.elapsed() > Duration::from_secs(7) {
255      panic!("s1: {} s2: {}", cond1, cond2);
256    }
257  }
258
259  for s in serfs.iter() {
260    s.shutdown().await.unwrap();
261  }
262}
263
264/// Unit test for serf ping delegate rogue coordinate
265pub async fn serf_ping_delegate_rogue_coordinate<T>(
266  transport_opts1: T::Options,
267  transport_opts2: T::Options,
268) where
269  T: Transport,
270{
271  const PROBE_INTERVAL: Duration = Duration::from_millis(2);
272
273  let opts = test_config().with_disable_coordinates(false);
274  let s1 = Serf::<T>::new(
275    transport_opts1,
276    opts
277      .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL)),
278  )
279  .await
280  .unwrap();
281
282  let opts = test_config().with_disable_coordinates(false);
283  let s2 = Serf::<T>::new(
284    transport_opts2,
285    opts
286      .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL)),
287  )
288  .await
289  .unwrap();
290
291  // Monkey patch s1 to send ping messages with bad coordinates.
292  s1.memberlist()
293    .delegate()
294    .unwrap()
295    .ping_dimension_test
296    .store(true, Ordering::SeqCst);
297
298  let serfs = [s1, s2];
299  wait_until_num_nodes(1, &serfs).await;
300
301  let node = serfs[1]
302    .memberlist()
303    .advertise_node()
304    .map_address(MaybeResolvedAddress::resolved);
305  serfs[0].join(node, false).await.unwrap();
306
307  // They both should show 2 members, but only s1 should know about s2
308  // in the cache, since s1 spoke an alien ping protocol.
309  wait_until_num_nodes(2, &serfs).await;
310
311  let start = Epoch::now();
312  let mut cond1 = false;
313  let mut cond2 = false;
314  let s1id = serfs[0].local_id().clone();
315  let s2id = serfs[1].local_id().clone();
316
317  // They both should show 2 members, but only s1 should know about s2
318  // in the cache, since s1 sent a bad coordinate.
319  loop {
320    <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
321
322    let s1c = serfs[0].cached_coordinate(&s2id).unwrap();
323    // println!("s1c {:?}", s1c);
324    if s1c.is_some() {
325      cond1 = true;
326    } else if start.elapsed() > Duration::from_secs(7) {
327      panic!("s1 didn't get a coordinate for s2");
328    }
329
330    let s2c = serfs[1].cached_coordinate(&s1id).unwrap();
331    // println!("s2c {:?}", s2c);
332    if s2c.is_none() {
333      cond2 = true;
334    } else if start.elapsed() > Duration::from_secs(7) {
335      panic!("s2 got an unexpected coordinate for s1");
336    }
337
338    if cond1 && cond2 {
339      break;
340    }
341
342    if start.elapsed() > Duration::from_secs(7) {
343      panic!("s1: {} s2: {}", cond1, cond2);
344    }
345  }
346
347  for s in serfs.iter() {
348    s.shutdown().await.unwrap();
349  }
350}