// serf_core/serf/base/tests/serf/delegate.rs
use super::*;
2
3pub async fn delegate_nodemeta<T>(transport_opts: T::Options)
5where
6 T: Transport,
7{
8 let opts = test_config();
9 let s = Serf::<T>::new(
10 transport_opts,
11 opts.with_tags([("role", "test")].into_iter()),
12 )
13 .await
14 .unwrap();
15 let meta = s.inner.memberlist.delegate().unwrap().node_meta(32).await;
16
17 let (_, tags) = Tags::decode(&meta).unwrap();
18 assert_eq!(tags.get("role"), Some(&SmolStr::new("test")));
19
20 s.shutdown().await.unwrap();
21}
22
23pub async fn delegate_nodemeta_panic<T>(transport_opts: T::Options)
25where
26 T: Transport,
27{
28 let opts = test_config();
29 let s = Serf::<T>::new(
30 transport_opts,
31 opts.with_tags([("role", "test")].into_iter()),
32 )
33 .await
34 .unwrap();
35 s.inner.memberlist.delegate().unwrap().node_meta(1).await;
36 s.shutdown().await.unwrap();
37}
38
39pub async fn delegate_local_state<T>(transport_opts1: T::Options, transport_opts2: T::Options)
41where
42 T: Transport,
43{
44 let opts = test_config().with_event_buffer_size(0);
45 let s1 = Serf::<T>::new(transport_opts1, opts).await.unwrap();
46
47 let opts = test_config().with_event_buffer_size(0);
48 let s2 = Serf::<T>::new(transport_opts2, opts).await.unwrap();
49
50 let serfs = [s1, s2];
51 wait_until_num_nodes(1, &serfs).await;
52
53 let (id, addr) = serfs[1].memberlist().advertise_node().into_components();
54
55 serfs[0]
56 .join(Node::new(id, MaybeResolvedAddress::resolved(addr)), false)
57 .await
58 .unwrap();
59
60 wait_until_num_nodes(2, &serfs).await;
61
62 serfs[0]
63 .user_event("test", Bytes::from_static(b"test"), false)
64 .await
65 .unwrap();
66
67 serfs[0].query("foo", Bytes::new(), None).await.unwrap();
68
69 serfs[1].leave().await.unwrap();
71
72 let buf = serfs[0]
74 .memberlist()
75 .delegate()
76 .unwrap()
77 .local_state(false)
78 .await;
79
80 let pp = crate::types::decode_message::<T::Id, T::ResolvedAddress>(&buf).unwrap();
82
83 let MessageRef::PushPull(pp) = pp else {
84 panic!("bad message")
85 };
86 let pp = <PushPullMessage<T::Id> as Data>::from_ref(pp).unwrap();
87
88 assert_eq!(pp.ltime(), serfs[0].inner.clock.time(), "bad lamport clock");
90
91 assert_eq!(pp.status_ltimes().len(), 1, "missing ltimes");
94 assert_eq!(pp.left_members().len(), 0, "should have no left memebers");
95 assert_eq!(
96 pp.event_ltime(),
97 serfs[0].inner.event_clock.time(),
98 "bad event clock"
99 );
100 assert_eq!(
101 pp.events().len(),
102 serfs[0].inner.event_core.read().await.buffer.len(),
103 "should send full event buffer"
104 );
105 assert_eq!(
106 pp.query_ltime(),
107 serfs[0].inner.query_clock.time(),
108 "bad query clock"
109 );
110
111 for s in serfs {
112 s.shutdown().await.unwrap();
113 }
114}
115
116pub async fn delegate_merge_remote_state<T>(transport_opts: T::Options)
118where
119 T: Transport<Id = SmolStr>,
120{
121 let opts = test_config();
122 let s = Serf::<T>::new(transport_opts, opts).await.unwrap();
123 let d = s.memberlist().delegate().unwrap();
124
125 let pp = PushPullMessage {
127 ltime: 42.into(),
128 status_ltimes: [
129 (SmolStr::new("test"), 20.into()),
130 (SmolStr::new("foo"), 15.into()),
131 ]
132 .into_iter()
133 .collect(),
134 left_members: ["foo".into()].into_iter().collect(),
135 event_ltime: 50.into(),
136 events: TinyVec::from(UserEvents {
137 ltime: 45.into(),
138 events: OneOrMore::from(UserEvent {
139 name: "test".into(),
140 payload: Bytes::new(),
141 }),
142 }),
143 query_ltime: 100.into(),
144 };
145
146 let buf = crate::types::encode_message_to_bytes(&pp).unwrap();
147
148 d.merge_remote_state(&buf, false).await;
150
151 assert_eq!(s.inner.clock.time(), 42.into(), "bad lamport clock");
153
154 let members = s.inner.members.read().await;
155 let ltime = recent_intent(
157 &members.recent_intents,
158 &SmolStr::new("test"),
159 MessageType::Join,
160 )
161 .unwrap();
162 assert_eq!(ltime, 20.into(), "bad join ltime");
163 let ltime = recent_intent(
165 &members.recent_intents,
166 &SmolStr::new("foo"),
167 MessageType::Leave,
168 )
169 .unwrap();
170 assert_eq!(ltime, 16.into(), "bad leave ltime");
171
172 assert_eq!(s.inner.event_clock.time(), 50.into(), "bad event clock");
174 let buf = s.inner.event_core.read().await;
175 assert!(buf.buffer[45].is_some(), "missing event buffer for time");
176 assert_eq!(buf.buffer[45].as_ref().unwrap().events[0].name, "test");
177 assert_eq!(s.inner.query_clock.time(), 100.into(), "bad query clock");
178
179 s.shutdown().await.unwrap();
180}
181
182pub async fn serf_ping_delegate_versioning<T>(
184 transport_opts1: T::Options,
185 transport_opts2: T::Options,
186) where
187 T: Transport,
188{
189 const PROBE_INTERVAL: Duration = Duration::from_millis(2);
190
191 let s1 = Serf::<T>::new(
192 transport_opts1,
193 test_config()
194 .with_disable_coordinates(false)
195 .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL)),
196 )
197 .await
198 .unwrap();
199
200 let s2 = Serf::<T>::new(
201 transport_opts2,
202 test_config()
203 .with_disable_coordinates(false)
204 .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL)),
205 )
206 .await
207 .unwrap();
208
209 s1.memberlist()
211 .delegate()
212 .unwrap()
213 .ping_versioning_test
214 .store(true, Ordering::SeqCst);
215
216 let serfs = [s1, s2];
217 wait_until_num_nodes(1, &serfs).await;
218
219 let node = serfs[1]
220 .memberlist()
221 .advertise_node()
222 .map_address(MaybeResolvedAddress::resolved);
223 serfs[0].join(node, false).await.unwrap();
224
225 wait_until_num_nodes(2, &serfs).await;
228
229 let start = Epoch::now();
230 let mut cond1 = false;
231 let mut cond2 = false;
232 let s1id = serfs[0].local_id().clone();
233 let s2id = serfs[1].local_id().clone();
234
235 loop {
236 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
237
238 if serfs[0].cached_coordinate(&s2id).unwrap().is_some() {
239 cond1 = true;
240 } else if start.elapsed() > Duration::from_secs(7) {
241 panic!("s1 didn't get a coordinate for s2");
242 }
243
244 if serfs[1].cached_coordinate(&s1id).unwrap().is_none() {
245 cond2 = true;
246 } else if start.elapsed() > Duration::from_secs(7) {
247 panic!("s2 got an unexpected coordinate for s1");
248 }
249
250 if cond1 && cond2 {
251 break;
252 }
253
254 if start.elapsed() > Duration::from_secs(7) {
255 panic!("s1: {} s2: {}", cond1, cond2);
256 }
257 }
258
259 for s in serfs.iter() {
260 s.shutdown().await.unwrap();
261 }
262}
263
264pub async fn serf_ping_delegate_rogue_coordinate<T>(
266 transport_opts1: T::Options,
267 transport_opts2: T::Options,
268) where
269 T: Transport,
270{
271 const PROBE_INTERVAL: Duration = Duration::from_millis(2);
272
273 let opts = test_config().with_disable_coordinates(false);
274 let s1 = Serf::<T>::new(
275 transport_opts1,
276 opts
277 .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL)),
278 )
279 .await
280 .unwrap();
281
282 let opts = test_config().with_disable_coordinates(false);
283 let s2 = Serf::<T>::new(
284 transport_opts2,
285 opts
286 .with_memberlist_options(memberlist_core::Options::lan().with_probe_interval(PROBE_INTERVAL)),
287 )
288 .await
289 .unwrap();
290
291 s1.memberlist()
293 .delegate()
294 .unwrap()
295 .ping_dimension_test
296 .store(true, Ordering::SeqCst);
297
298 let serfs = [s1, s2];
299 wait_until_num_nodes(1, &serfs).await;
300
301 let node = serfs[1]
302 .memberlist()
303 .advertise_node()
304 .map_address(MaybeResolvedAddress::resolved);
305 serfs[0].join(node, false).await.unwrap();
306
307 wait_until_num_nodes(2, &serfs).await;
310
311 let start = Epoch::now();
312 let mut cond1 = false;
313 let mut cond2 = false;
314 let s1id = serfs[0].local_id().clone();
315 let s2id = serfs[1].local_id().clone();
316
317 loop {
320 <T::Runtime as RuntimeLite>::sleep(Duration::from_millis(25)).await;
321
322 let s1c = serfs[0].cached_coordinate(&s2id).unwrap();
323 if s1c.is_some() {
325 cond1 = true;
326 } else if start.elapsed() > Duration::from_secs(7) {
327 panic!("s1 didn't get a coordinate for s2");
328 }
329
330 let s2c = serfs[1].cached_coordinate(&s1id).unwrap();
331 if s2c.is_none() {
333 cond2 = true;
334 } else if start.elapsed() > Duration::from_secs(7) {
335 panic!("s2 got an unexpected coordinate for s1");
336 }
337
338 if cond1 && cond2 {
339 break;
340 }
341
342 if start.elapsed() > Duration::from_secs(7) {
343 panic!("s1: {} s2: {}", cond1, cond2);
344 }
345 }
346
347 for s in serfs.iter() {
348 s.shutdown().await.unwrap();
349 }
350}