commonware_p2p/utils/requester/
requester.rs1use super::{Config, PeerLabel};
4use commonware_runtime::{
5 telemetry::metrics::status::{CounterExt, Status},
6 Clock, Metrics,
7};
8use commonware_utils::{Array, PrioritySet};
9use either::Either;
10use governor::{
11 clock::Clock as GClock, middleware::NoOpMiddleware, state::keyed::HashMapStateStore,
12 RateLimiter,
13};
14use rand::{seq::SliceRandom, Rng};
15use std::{
16 collections::{HashMap, HashSet},
17 time::{Duration, SystemTime},
18};
19
20pub type ID = u64;
26
27pub struct Requester<E: Clock + GClock + Rng + Metrics, P: Array> {
34 context: E,
35 public_key: P,
36 metrics: super::Metrics,
37 initial: Duration,
38 timeout: Duration,
39
40 excluded: HashSet<P>,
42
43 #[allow(clippy::type_complexity)]
45 rate_limiter: RateLimiter<P, HashMapStateStore<P>, E, NoOpMiddleware<E::Instant>>,
46 participants: PrioritySet<P, u128>,
48
49 id: ID,
51 requests: HashMap<ID, (P, SystemTime)>,
53 deadlines: PrioritySet<ID, SystemTime>,
55}
56
57pub struct Request<P: Array> {
64 pub id: ID,
66
67 participant: P,
69
70 start: SystemTime,
72}
73
74impl<E: Clock + GClock + Rng + Metrics, P: Array> Requester<E, P> {
75 pub fn new(context: E, config: Config<P>) -> Self {
77 let rate_limiter = RateLimiter::hashmap_with_clock(config.rate_limit, &context);
78 let metrics = super::Metrics::init(context.clone());
79 Self {
80 context,
81 public_key: config.public_key,
82 metrics,
83 initial: config.initial,
84 timeout: config.timeout,
85
86 excluded: HashSet::new(),
87
88 rate_limiter,
89 participants: PrioritySet::new(),
90
91 id: 0,
92 requests: HashMap::new(),
93 deadlines: PrioritySet::new(),
94 }
95 }
96
97 pub fn reconcile(&mut self, participants: &[P]) {
99 self.participants
100 .reconcile(participants, self.initial.as_millis());
101 self.rate_limiter.shrink_to_fit();
102 }
103
104 pub fn block(&mut self, participant: P) {
109 self.excluded.insert(participant);
110 }
111
112 pub fn request(&mut self, shuffle: bool) -> Option<(P, ID)> {
118 let participant_iter = if shuffle {
120 let mut participants = self.participants.iter().collect::<Vec<_>>();
121 participants.shuffle(&mut self.context);
122 Either::Left(participants.into_iter())
123 } else {
124 Either::Right(self.participants.iter())
125 };
126
127 for (participant, _) in participant_iter {
129 if *participant == self.public_key {
131 continue;
132 }
133
134 if self.excluded.contains(participant) {
136 continue;
137 }
138
139 if self.rate_limiter.check_key(participant).is_err() {
141 continue;
142 }
143
144 let id = self.id;
146 self.id = self.id.wrapping_add(1);
147
148 let now = self.context.current();
150 self.requests.insert(id, (participant.clone(), now));
151 let deadline = now.checked_add(self.timeout).expect("time overflowed");
152 self.deadlines.put(id, deadline);
153
154 self.metrics.created.inc(Status::Success);
156 return Some((participant.clone(), id));
157 }
158
159 self.metrics.created.inc(Status::Failure);
161 None
162 }
163
164 fn update(&mut self, participant: P, elapsed: Duration) {
166 let Some(past) = self.participants.get(&participant) else {
167 return;
168 };
169 let next = past.saturating_add(elapsed.as_millis()) / 2;
170 self.metrics
171 .performance
172 .get_or_create(&PeerLabel::from(&participant))
173 .set(next as i64);
174 self.participants.put(participant, next);
175 }
176
177 pub fn cancel(&mut self, id: ID) -> Option<Request<P>> {
179 let (participant, start) = self.requests.remove(&id)?;
180 self.deadlines.remove(&id);
181 Some(Request {
182 id,
183 participant,
184 start,
185 })
186 }
187
188 pub fn handle(&mut self, participant: &P, id: ID) -> Option<Request<P>> {
194 let (expected, _) = self.requests.get(&id)?;
196 if expected != participant {
197 return None;
198 }
199
200 self.cancel(id)
202 }
203
204 pub fn resolve(&mut self, request: Request<P>) {
206 let elapsed = self
212 .context
213 .current()
214 .duration_since(request.start)
215 .unwrap_or_default();
216
217 self.update(request.participant, elapsed);
219 self.metrics.requests.inc(Status::Success);
220 self.metrics.resolves.observe(elapsed.as_secs_f64());
221 }
222
223 pub fn timeout(&mut self, request: Request<P>) {
225 self.update(request.participant, self.timeout);
226 self.metrics.requests.inc(Status::Timeout);
227 }
228
229 pub fn fail(&mut self, request: Request<P>) {
234 self.update(request.participant, self.timeout);
235 self.metrics.requests.inc(Status::Failure);
236 }
237
238 pub fn next(&self) -> Option<(ID, SystemTime)> {
240 let (id, deadline) = self.deadlines.peek()?;
241 Some((*id, *deadline))
242 }
243
244 #[allow(clippy::len_without_is_empty)]
246 pub fn len(&self) -> usize {
247 self.requests.len()
248 }
249
250 pub fn len_blocked(&self) -> usize {
252 self.excluded.len()
253 }
254}
255
256#[cfg(test)]
257mod tests {
258 use super::*;
259 use commonware_cryptography::{Ed25519, Signer};
260 use commonware_runtime::deterministic;
261 use commonware_runtime::Runner;
262 use commonware_utils::NZU32;
263 use governor::Quota;
264 use std::time::Duration;
265
266 #[test]
267 fn test_requester_basic() {
268 let executor = deterministic::Runner::seeded(0);
270 executor.start(|context| async move {
271 let scheme = Ed25519::from_seed(0);
273 let me = scheme.public_key();
274 let timeout = Duration::from_secs(5);
275 let config = Config {
276 public_key: scheme.public_key(),
277 rate_limit: Quota::per_second(NZU32!(1)),
278 initial: Duration::from_millis(100),
279 timeout,
280 };
281 let mut requester = Requester::new(context.clone(), config);
282
283 assert_eq!(requester.request(false), None);
285 assert_eq!(requester.len(), 0);
286
287 assert_eq!(requester.next(), None);
289
290 assert!(requester.handle(&me, 0).is_none());
292
293 let other = Ed25519::from_seed(1).public_key();
295 requester.reconcile(&[me.clone(), other.clone()]);
296
297 let current = context.current();
299 let (participant, id) = requester.request(false).expect("failed to get participant");
300 assert_eq!(id, 0);
301 assert_eq!(participant, other);
302
303 let (id, deadline) = requester.next().expect("failed to get deadline");
305 assert_eq!(id, 0);
306 assert_eq!(deadline, current + timeout);
307 assert_eq!(requester.len(), 1);
308
309 assert_eq!(requester.request(false), None);
311
312 context.sleep(Duration::from_millis(10)).await;
314
315 assert!(requester.handle(&me, id).is_none());
317
318 let request = requester
320 .handle(&participant, id)
321 .expect("failed to get request");
322 assert_eq!(request.id, id);
323 requester.resolve(request);
324
325 assert_eq!(requester.request(false), None);
327
328 assert_eq!(requester.request(false), None);
330
331 context.sleep(Duration::from_secs(1)).await;
333
334 let (participant, id) = requester.request(false).expect("failed to get participant");
336 assert_eq!(participant, other);
337 assert_eq!(id, 1);
338
339 let request = requester
341 .handle(&participant, id)
342 .expect("failed to get request");
343 requester.timeout(request);
344
345 assert_eq!(requester.request(false), None);
347
348 context.sleep(Duration::from_secs(1)).await;
350
351 let (participant, id) = requester.request(false).expect("failed to get participant");
353 assert_eq!(participant, other);
354 assert_eq!(id, 2);
355
356 assert!(requester.cancel(id).is_some());
358
359 assert_eq!(requester.next(), None);
361 assert_eq!(requester.len(), 0);
362
363 context.sleep(Duration::from_secs(1)).await;
365
366 requester.block(other);
368
369 assert_eq!(requester.request(false), None);
371 });
372 }
373
374 #[test]
375 fn test_requester_multiple() {
376 let executor = deterministic::Runner::seeded(0);
378 executor.start(|context| async move {
379 let scheme = Ed25519::from_seed(0);
381 let me = scheme.public_key();
382 let timeout = Duration::from_secs(5);
383 let config = Config {
384 public_key: scheme.public_key(),
385 rate_limit: Quota::per_second(NZU32!(1)),
386 initial: Duration::from_millis(100),
387 timeout,
388 };
389 let mut requester = Requester::new(context.clone(), config);
390
391 assert_eq!(requester.request(false), None);
393
394 assert_eq!(requester.next(), None);
396
397 let other1 = Ed25519::from_seed(1).public_key();
399 let other2 = Ed25519::from_seed(2).public_key();
400 requester.reconcile(&[me.clone(), other1.clone(), other2.clone()]);
401
402 let (participant, id) = requester.request(false).expect("failed to get participant");
404 assert_eq!(id, 0);
405 if participant == other2 {
406 let request = requester
407 .handle(&participant, id)
408 .expect("failed to get request");
409 requester.timeout(request);
410 } else {
411 panic!("unexpected participant");
412 }
413
414 let (participant, id) = requester.request(false).expect("failed to get participant");
416 assert_eq!(id, 1);
417 if participant == other1 {
418 context.sleep(Duration::from_millis(10)).await;
419 let request = requester
420 .handle(&participant, id)
421 .expect("failed to get request");
422 requester.resolve(request);
423 } else {
424 panic!("unexpected participant");
425 }
426
427 assert_eq!(requester.request(false), None);
429
430 context.sleep(Duration::from_secs(1)).await;
432
433 let (participant, id) = requester.request(false).expect("failed to get participant");
435 assert_eq!(participant, other1);
436 assert_eq!(id, 2);
437
438 assert!(requester.cancel(id).is_some());
440
441 let other3 = Ed25519::from_seed(3).public_key();
443 requester.reconcile(&[me, other1, other2.clone(), other3.clone()]);
444
445 let (participant, id) = requester.request(false).expect("failed to get participant");
447 assert_eq!(participant, other3);
448 assert_eq!(id, 3);
449
450 context.sleep(Duration::from_secs(1)).await;
452 loop {
453 let (participant, _) = requester.request(true).unwrap();
455 if participant == other2 {
456 break;
457 }
458
459 context.sleep(Duration::from_secs(1)).await;
461 }
462 });
463 }
464}