commonware_p2p/utils/requester/
requester.rs1use super::{Config, PeerLabel};
4use commonware_runtime::{
5 telemetry::metrics::status::{CounterExt, Status},
6 Clock, Metrics,
7};
8use commonware_utils::{Array, PrioritySet};
9use either::Either;
10use governor::{
11 clock::Clock as GClock, middleware::NoOpMiddleware, state::keyed::HashMapStateStore,
12 RateLimiter,
13};
14use rand::{seq::SliceRandom, Rng};
15use std::{
16 collections::{HashMap, HashSet},
17 time::{Duration, SystemTime},
18};
19
20pub type ID = u64;
26
27pub struct Requester<E: Clock + GClock + Rng + Metrics, P: Array> {
34 context: E,
35 public_key: P,
36 metrics: super::Metrics,
37 initial: Duration,
38 timeout: Duration,
39
40 excluded: HashSet<P>,
42
43 #[allow(clippy::type_complexity)]
45 rate_limiter: RateLimiter<P, HashMapStateStore<P>, E, NoOpMiddleware<E::Instant>>,
46 participants: PrioritySet<P, u128>,
48
49 id: ID,
51 requests: HashMap<ID, (P, SystemTime)>,
53 deadlines: PrioritySet<ID, SystemTime>,
55}
56
57pub struct Request<P: Array> {
64 pub id: ID,
66
67 participant: P,
69
70 start: SystemTime,
72}
73
74impl<E: Clock + GClock + Rng + Metrics, P: Array> Requester<E, P> {
75 pub fn new(context: E, config: Config<P>) -> Self {
77 let rate_limiter = RateLimiter::hashmap_with_clock(config.rate_limit, &context);
78 let metrics = super::Metrics::init(context.clone());
79 Self {
80 context,
81 public_key: config.public_key,
82 metrics,
83 initial: config.initial,
84 timeout: config.timeout,
85
86 excluded: HashSet::new(),
87
88 rate_limiter,
89 participants: PrioritySet::new(),
90
91 id: 0,
92 requests: HashMap::new(),
93 deadlines: PrioritySet::new(),
94 }
95 }
96
97 pub fn reconcile(&mut self, participants: &[P]) {
99 self.participants
100 .reconcile(participants, self.initial.as_millis());
101 self.rate_limiter.shrink_to_fit();
102 }
103
104 pub fn block(&mut self, participant: P) {
109 self.excluded.insert(participant);
110 }
111
112 pub fn request(&mut self, shuffle: bool) -> Option<(P, ID)> {
118 let participant_iter = if shuffle {
120 let mut participants = self.participants.iter().collect::<Vec<_>>();
121 participants.shuffle(&mut self.context);
122 Either::Left(participants.into_iter())
123 } else {
124 Either::Right(self.participants.iter())
125 };
126
127 for (participant, _) in participant_iter {
129 if *participant == self.public_key {
131 continue;
132 }
133
134 if self.excluded.contains(participant) {
136 continue;
137 }
138
139 if self.rate_limiter.check_key(participant).is_err() {
141 continue;
142 }
143
144 let id = self.id;
146 self.id = self.id.wrapping_add(1);
147
148 let now = self.context.current();
150 self.requests.insert(id, (participant.clone(), now));
151 let deadline = now.checked_add(self.timeout).expect("time overflowed");
152 self.deadlines.put(id, deadline);
153
154 self.metrics.created.inc(Status::Success);
156 return Some((participant.clone(), id));
157 }
158
159 self.metrics.created.inc(Status::Failure);
161 None
162 }
163
164 fn update(&mut self, participant: P, elapsed: Duration) {
166 let Some(past) = self.participants.get(&participant) else {
167 return;
168 };
169 let next = past.saturating_add(elapsed.as_millis()) / 2;
170 self.metrics
171 .performance
172 .get_or_create(&PeerLabel::from(&participant))
173 .set(next as i64);
174 self.participants.put(participant, next);
175 }
176
177 pub fn cancel(&mut self, id: ID) -> Option<Request<P>> {
179 let (participant, start) = self.requests.remove(&id)?;
180 self.deadlines.remove(&id);
181 Some(Request {
182 id,
183 participant,
184 start,
185 })
186 }
187
188 pub fn handle(&mut self, participant: &P, id: ID) -> Option<Request<P>> {
194 let (expected, _) = self.requests.get(&id)?;
196 if expected != participant {
197 return None;
198 }
199
200 self.cancel(id)
202 }
203
204 pub fn resolve(&mut self, request: Request<P>) {
206 let elapsed = self
212 .context
213 .current()
214 .duration_since(request.start)
215 .unwrap_or_default();
216
217 self.update(request.participant, elapsed);
219 self.metrics.requests.inc(Status::Success);
220 self.metrics.resolves.observe(elapsed.as_secs_f64());
221 }
222
223 pub fn timeout(&mut self, request: Request<P>) {
225 self.update(request.participant, self.timeout);
226 self.metrics.requests.inc(Status::Timeout);
227 }
228
229 pub fn fail(&mut self, request: Request<P>) {
234 self.update(request.participant, self.timeout);
235 self.metrics.requests.inc(Status::Failure);
236 }
237
238 pub fn next(&self) -> Option<(ID, SystemTime)> {
240 let (id, deadline) = self.deadlines.peek()?;
241 Some((*id, *deadline))
242 }
243
244 #[allow(clippy::len_without_is_empty)]
246 pub fn len(&self) -> usize {
247 self.requests.len()
248 }
249
250 pub fn len_blocked(&self) -> usize {
252 self.excluded.len()
253 }
254}
255
256#[cfg(test)]
257mod tests {
258 use super::*;
259 use commonware_cryptography::ed25519::PrivateKey;
260 use commonware_cryptography::{PrivateKeyExt as _, Signer as _};
261 use commonware_runtime::deterministic;
262 use commonware_runtime::Runner;
263 use commonware_utils::NZU32;
264 use governor::Quota;
265 use std::time::Duration;
266
267 #[test]
268 fn test_requester_basic() {
269 let executor = deterministic::Runner::seeded(0);
271 executor.start(|context| async move {
272 let scheme = PrivateKey::from_seed(0);
274 let me = scheme.public_key();
275 let timeout = Duration::from_secs(5);
276 let config = Config {
277 public_key: scheme.public_key(),
278 rate_limit: Quota::per_second(NZU32!(1)),
279 initial: Duration::from_millis(100),
280 timeout,
281 };
282 let mut requester = Requester::new(context.clone(), config);
283
284 assert_eq!(requester.request(false), None);
286 assert_eq!(requester.len(), 0);
287
288 assert_eq!(requester.next(), None);
290
291 assert!(requester.handle(&me, 0).is_none());
293
294 let other = PrivateKey::from_seed(1).public_key();
296 requester.reconcile(&[me.clone(), other.clone()]);
297
298 let current = context.current();
300 let (participant, id) = requester.request(false).expect("failed to get participant");
301 assert_eq!(id, 0);
302 assert_eq!(participant, other);
303
304 let (id, deadline) = requester.next().expect("failed to get deadline");
306 assert_eq!(id, 0);
307 assert_eq!(deadline, current + timeout);
308 assert_eq!(requester.len(), 1);
309
310 assert_eq!(requester.request(false), None);
312
313 context.sleep(Duration::from_millis(10)).await;
315
316 assert!(requester.handle(&me, id).is_none());
318
319 let request = requester
321 .handle(&participant, id)
322 .expect("failed to get request");
323 assert_eq!(request.id, id);
324 requester.resolve(request);
325
326 assert_eq!(requester.request(false), None);
328
329 assert_eq!(requester.request(false), None);
331
332 context.sleep(Duration::from_secs(1)).await;
334
335 let (participant, id) = requester.request(false).expect("failed to get participant");
337 assert_eq!(participant, other);
338 assert_eq!(id, 1);
339
340 let request = requester
342 .handle(&participant, id)
343 .expect("failed to get request");
344 requester.timeout(request);
345
346 assert_eq!(requester.request(false), None);
348
349 context.sleep(Duration::from_secs(1)).await;
351
352 let (participant, id) = requester.request(false).expect("failed to get participant");
354 assert_eq!(participant, other);
355 assert_eq!(id, 2);
356
357 assert!(requester.cancel(id).is_some());
359
360 assert_eq!(requester.next(), None);
362 assert_eq!(requester.len(), 0);
363
364 context.sleep(Duration::from_secs(1)).await;
366
367 requester.block(other);
369
370 assert_eq!(requester.request(false), None);
372 });
373 }
374
375 #[test]
376 fn test_requester_multiple() {
377 let executor = deterministic::Runner::seeded(0);
379 executor.start(|context| async move {
380 let scheme = PrivateKey::from_seed(0);
382 let me = scheme.public_key();
383 let timeout = Duration::from_secs(5);
384 let config = Config {
385 public_key: scheme.public_key(),
386 rate_limit: Quota::per_second(NZU32!(1)),
387 initial: Duration::from_millis(100),
388 timeout,
389 };
390 let mut requester = Requester::new(context.clone(), config);
391
392 assert_eq!(requester.request(false), None);
394
395 assert_eq!(requester.next(), None);
397
398 let other1 = PrivateKey::from_seed(1).public_key();
400 let other2 = PrivateKey::from_seed(2).public_key();
401 requester.reconcile(&[me.clone(), other1.clone(), other2.clone()]);
402
403 let (participant, id) = requester.request(false).expect("failed to get participant");
405 assert_eq!(id, 0);
406 if participant == other2 {
407 let request = requester
408 .handle(&participant, id)
409 .expect("failed to get request");
410 requester.timeout(request);
411 } else {
412 panic!("unexpected participant");
413 }
414
415 let (participant, id) = requester.request(false).expect("failed to get participant");
417 assert_eq!(id, 1);
418 if participant == other1 {
419 context.sleep(Duration::from_millis(10)).await;
420 let request = requester
421 .handle(&participant, id)
422 .expect("failed to get request");
423 requester.resolve(request);
424 } else {
425 panic!("unexpected participant");
426 }
427
428 assert_eq!(requester.request(false), None);
430
431 context.sleep(Duration::from_secs(1)).await;
433
434 let (participant, id) = requester.request(false).expect("failed to get participant");
436 assert_eq!(participant, other1);
437 assert_eq!(id, 2);
438
439 assert!(requester.cancel(id).is_some());
441
442 let other3 = PrivateKey::from_seed(3).public_key();
444 requester.reconcile(&[me, other1, other2.clone(), other3.clone()]);
445
446 let (participant, id) = requester.request(false).expect("failed to get participant");
448 assert_eq!(participant, other3);
449 assert_eq!(id, 3);
450
451 context.sleep(Duration::from_secs(1)).await;
453 loop {
454 let (participant, _) = requester.request(true).unwrap();
456 if participant == other2 {
457 break;
458 }
459
460 context.sleep(Duration::from_secs(1)).await;
462 }
463 });
464 }
465}