commonware_p2p/utils/requester/requester.rs

use super::{Config, PeerLabel};
use commonware_cryptography::PublicKey;
use commonware_runtime::{
    telemetry::metrics::status::{CounterExt, Status},
    Clock, Metrics,
};
use commonware_utils::PrioritySet;
use either::Either;
use governor::{
    clock::Clock as GClock, middleware::NoOpMiddleware, state::keyed::HashMapStateStore,
    RateLimiter,
};
use rand::{seq::SliceRandom, Rng};
use std::{
    collections::{HashMap, HashSet},
    time::{Duration, SystemTime},
};

/// Unique identifier for a request.
pub type ID = u64;

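/// Tracks outstanding requests to peers, preferring peers with the best
/// observed performance, rate limiting how often each peer is contacted, and
/// recording deadlines so that unanswered requests can be timed out.
///
/// A minimal sketch of the request lifecycle (runtime and [`Config`] setup
/// elided; sending the request over the network is outside the scope of this
/// type):
///
/// ```ignore
/// let mut requester = Requester::new(context, config);
/// requester.reconcile(&participants);
///
/// // Pick a peer and record an outstanding request.
/// if let Some((peer, id)) = requester.request(false) {
///     // ... send the request to `peer` over the network ...
///
///     // When a response arrives from `peer`:
///     if let Some(request) = requester.handle(&peer, id) {
///         requester.resolve(request);
///     }
///
///     // Or, once the deadline returned by `next()` has passed:
///     // if let Some(request) = requester.cancel(id) {
///     //     requester.timeout(request);
///     // }
/// }
/// ```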
pub struct Requester<E: Clock + GClock + Rng + Metrics, P: PublicKey> {
    context: E,
    public_key: P,
    metrics: super::Metrics,
    initial: Duration,
    timeout: Duration,

    // Participants that should never be sent requests.
    excluded: HashSet<P>,

    // Rate limiter and per-peer performance estimates (in milliseconds).
    #[allow(clippy::type_complexity)]
    rate_limiter: RateLimiter<P, HashMapStateStore<P>, E, NoOpMiddleware<E::Instant>>,
    participants: PrioritySet<P, u128>,

    // Next request ID, outstanding requests by ID, and their deadlines.
    id: ID,
    requests: HashMap<ID, (P, SystemTime)>,
    deadlines: PrioritySet<ID, SystemTime>,
}

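/// An outstanding request returned by [`Requester::handle`] or
/// [`Requester::cancel`], to be passed back to [`Requester::resolve`],
/// [`Requester::timeout`], or [`Requester::fail`].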
pub struct Request<P: PublicKey> {
    /// Unique identifier of the request.
    pub id: ID,

    /// Participant the request was sent to.
    participant: P,

    /// Time at which the request was issued.
    start: SystemTime,
}

impl<E: Clock + GClock + Rng + Metrics, P: PublicKey> Requester<E, P> {
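    /// Creates a new requester from the given runtime context and configuration.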
    pub fn new(context: E, config: Config<P>) -> Self {
        let rate_limiter = RateLimiter::hashmap_with_clock(config.rate_limit, &context);
        let metrics = super::Metrics::init(context.clone());
        Self {
            context,
            public_key: config.public_key,
            metrics,
            initial: config.initial,
            timeout: config.timeout,

            excluded: HashSet::new(),

            rate_limiter,
            participants: PrioritySet::new(),

            id: 0,
            requests: HashMap::new(),
            deadlines: PrioritySet::new(),
        }
    }

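    /// Updates the set of eligible participants, passing the configured `initial`
    /// duration (in milliseconds) as the default performance estimate, and shrinks
    /// the rate limiter's state store.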
    pub fn reconcile(&mut self, participants: &[P]) {
        self.participants
            .reconcile(participants, self.initial.as_millis());
        self.rate_limiter.shrink_to_fit();
    }

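    /// Prevents a participant from being selected for any future requests.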
    pub fn block(&mut self, participant: P) {
        self.excluded.insert(participant);
    }

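    /// Selects a participant to serve a new request, returning the participant
    /// and a freshly assigned request ID.
    ///
    /// Participants are tried in priority order (or in random order if `shuffle`
    /// is true); the local key, blocked participants, and rate-limited participants
    /// are skipped. Returns `None` if no participant is currently available.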
    pub fn request(&mut self, shuffle: bool) -> Option<(P, ID)> {
        // Iterate over participants in priority order (or in random order, if requested).
        let participant_iter = if shuffle {
            let mut participants = self.participants.iter().collect::<Vec<_>>();
            participants.shuffle(&mut self.context);
            Either::Left(participants.into_iter())
        } else {
            Either::Right(self.participants.iter())
        };

        for (participant, _) in participant_iter {
            // Never send a request to ourselves.
            if *participant == self.public_key {
                continue;
            }

            // Skip blocked participants.
            if self.excluded.contains(participant) {
                continue;
            }

            // Skip participants that are currently rate limited.
            if self.rate_limiter.check_key(participant).is_err() {
                continue;
            }

            // Assign the next request ID.
            let id = self.id;
            self.id = self.id.wrapping_add(1);

            // Record the outstanding request and its deadline.
            let now = self.context.current();
            self.requests.insert(id, (participant.clone(), now));
            let deadline = now.checked_add(self.timeout).expect("time overflowed");
            self.deadlines.put(id, deadline);

            self.metrics.created.inc(Status::Success);
            return Some((participant.clone(), id));
        }

        // No participant was available to serve the request.
        self.metrics.created.inc(Status::Failure);
        None
    }

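    /// Updates a participant's performance estimate by averaging its previous
    /// estimate with the newly observed elapsed time (in milliseconds).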
    fn update(&mut self, participant: P, elapsed: Duration) {
        // Ignore participants that are no longer tracked.
        let Some(past) = self.participants.get(&participant) else {
            return;
        };

        // Average the previous estimate with the new observation (in milliseconds).
        let next = past.saturating_add(elapsed.as_millis()) / 2;
        self.metrics
            .performance
            .get_or_create(&PeerLabel::from(&participant))
            .set(next as i64);
        self.participants.put(participant, next);
    }

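    /// Removes an outstanding request by ID, returning it if it exists.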
    pub fn cancel(&mut self, id: ID) -> Option<Request<P>> {
        let (participant, start) = self.requests.remove(&id)?;
        self.deadlines.remove(&id);
        Some(Request {
            id,
            participant,
            start,
        })
    }

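    /// Consumes the outstanding request with the given ID if it was sent to
    /// `participant`, returning `None` for unknown IDs or mismatched peers.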
    pub fn handle(&mut self, participant: &P, id: ID) -> Option<Request<P>> {
        // Only accept a response from the participant the request was sent to.
        let (expected, _) = self.requests.get(&id)?;
        if expected != participant {
            return None;
        }

        self.cancel(id)
    }

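    /// Marks a request as successfully served, updating the participant's
    /// performance estimate with the observed latency.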
    pub fn resolve(&mut self, request: Request<P>) {
        let elapsed = self
            .context
            .current()
            .duration_since(request.start)
            .unwrap_or_default();

        self.update(request.participant, elapsed);
        self.metrics.requests.inc(Status::Success);
        self.metrics.resolves.observe(elapsed.as_secs_f64());
    }

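    /// Marks a request as timed out, penalizing the participant with the full
    /// timeout duration.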
    pub fn timeout(&mut self, request: Request<P>) {
        self.update(request.participant, self.timeout);
        self.metrics.requests.inc(Status::Timeout);
    }

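    /// Marks a request as failed, penalizing the participant with the full
    /// timeout duration.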
    pub fn fail(&mut self, request: Request<P>) {
        self.update(request.participant, self.timeout);
        self.metrics.requests.inc(Status::Failure);
    }

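    /// Returns the ID and deadline of the next outstanding request to expire, if any.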
    pub fn next(&self) -> Option<(ID, SystemTime)> {
        let (id, deadline) = self.deadlines.peek()?;
        Some((*id, *deadline))
    }

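    /// Returns the number of outstanding requests.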
    #[allow(clippy::len_without_is_empty)]
    pub fn len(&self) -> usize {
        self.requests.len()
    }

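    /// Returns the number of blocked participants.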
    pub fn len_blocked(&self) -> usize {
        self.excluded.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{ed25519::PrivateKey, PrivateKeyExt as _, Signer as _};
    use commonware_runtime::{deterministic, Runner};
    use commonware_utils::NZU32;
    use governor::Quota;
    use std::time::Duration;

    #[test]
    fn test_requester_basic() {
        let executor = deterministic::Runner::seeded(0);
        executor.start(|context| async move {
            // Create the requester.
            let scheme = PrivateKey::from_seed(0);
            let me = scheme.public_key();
            let timeout = Duration::from_secs(5);
            let config = Config {
                public_key: scheme.public_key(),
                rate_limit: Quota::per_second(NZU32!(1)),
                initial: Duration::from_millis(100),
                timeout,
            };
            let mut requester = Requester::new(context.clone(), config);

            // With no participants registered, nothing can be requested or handled.
            assert_eq!(requester.request(false), None);
            assert_eq!(requester.len(), 0);
            assert_eq!(requester.next(), None);
            assert!(requester.handle(&me, 0).is_none());

            // Register ourselves and one other participant.
            let other = PrivateKey::from_seed(1).public_key();
            requester.reconcile(&[me.clone(), other.clone()]);

            // The request goes to the other participant (never to ourselves).
            let current = context.current();
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(id, 0);
            assert_eq!(participant, other);

            // The deadline is the request time plus the configured timeout.
            let (id, deadline) = requester.next().expect("failed to get deadline");
            assert_eq!(id, 0);
            assert_eq!(deadline, current + timeout);
            assert_eq!(requester.len(), 1);

            // The other participant is now rate limited.
            assert_eq!(requester.request(false), None);

            // Simulate a delay before the response arrives.
            context.sleep(Duration::from_millis(10)).await;

            // A response from the wrong participant is rejected.
            assert!(requester.handle(&me, id).is_none());

            // A response from the expected participant resolves the request.
            let request = requester
                .handle(&participant, id)
                .expect("failed to get request");
            assert_eq!(request.id, id);
            requester.resolve(request);

            // Still rate limited until the quota refills.
            assert_eq!(requester.request(false), None);
            assert_eq!(requester.request(false), None);

            context.sleep(Duration::from_secs(1)).await;

            // The quota has refilled, so a new request can be made.
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(participant, other);
            assert_eq!(id, 1);

            // Time out the request.
            let request = requester
                .handle(&participant, id)
                .expect("failed to get request");
            requester.timeout(request);

            // Rate limited again.
            assert_eq!(requester.request(false), None);

            context.sleep(Duration::from_secs(1)).await;

            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(participant, other);
            assert_eq!(id, 2);

            // Canceling removes the outstanding request.
            assert!(requester.cancel(id).is_some());
            assert_eq!(requester.next(), None);
            assert_eq!(requester.len(), 0);

            context.sleep(Duration::from_secs(1)).await;

            // Blocked participants are never selected.
            requester.block(other);
            assert_eq!(requester.request(false), None);
        });
    }

    #[test]
    fn test_requester_multiple() {
        let executor = deterministic::Runner::seeded(0);
        executor.start(|context| async move {
            // Create the requester.
            let scheme = PrivateKey::from_seed(0);
            let me = scheme.public_key();
            let timeout = Duration::from_secs(5);
            let config = Config {
                public_key: scheme.public_key(),
                rate_limit: Quota::per_second(NZU32!(1)),
                initial: Duration::from_millis(100),
                timeout,
            };
            let mut requester = Requester::new(context.clone(), config);

            // No participants yet.
            assert_eq!(requester.request(false), None);
            assert_eq!(requester.next(), None);

            // Register ourselves and two other participants.
            let other1 = PrivateKey::from_seed(1).public_key();
            let other2 = PrivateKey::from_seed(2).public_key();
            requester.reconcile(&[me.clone(), other1.clone(), other2.clone()]);

            // Let the first request time out, penalizing the selected participant.
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(id, 0);
            if participant == other2 {
                let request = requester
                    .handle(&participant, id)
                    .expect("failed to get request");
                requester.timeout(request);
            } else {
                panic!("unexpected participant");
            }

            // Resolve the second request quickly, improving that participant's estimate.
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(id, 1);
            if participant == other1 {
                context.sleep(Duration::from_millis(10)).await;
                let request = requester
                    .handle(&participant, id)
                    .expect("failed to get request");
                requester.resolve(request);
            } else {
                panic!("unexpected participant");
            }

            // Both participants are rate limited.
            assert_eq!(requester.request(false), None);

            context.sleep(Duration::from_secs(1)).await;

            // Once the quota refills, the faster participant is preferred.
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(participant, other1);
            assert_eq!(id, 2);

            // Cancel the request.
            assert!(requester.cancel(id).is_some());

            // Add a new participant: other1 is still rate limited and other2 is slow,
            // so the new participant (with the initial estimate) is selected.
            let other3 = PrivateKey::from_seed(3).public_key();
            requester.reconcile(&[me, other1, other2.clone(), other3.clone()]);

            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(participant, other3);
            assert_eq!(id, 3);

            // With shuffling enabled, the slow participant is eventually selected.
            context.sleep(Duration::from_secs(1)).await;
            loop {
                let (participant, _) = requester.request(true).unwrap();
                if participant == other2 {
                    break;
                }

                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }
}