commonware_p2p/utils/requester/
requester.rs

//! Requester for sending rate-limited requests to peers.

use super::{Config, PeerLabel};
use commonware_runtime::{
    telemetry::metrics::status::{CounterExt, Status},
    Clock, Metrics,
};
use commonware_utils::{Array, PrioritySet};
use either::Either;
use governor::{
    clock::Clock as GClock, middleware::NoOpMiddleware, state::keyed::HashMapStateStore,
    RateLimiter,
};
use rand::{seq::SliceRandom, Rng};
use std::{
    collections::{HashMap, HashSet},
    time::{Duration, SystemTime},
};

/// Unique identifier for a request.
///
/// Once `u64::MAX` requests have been made, the ID wraps around (resetting to
/// zero). As long as there are fewer than `u64::MAX` requests outstanding at
/// any one time, this should not be an issue.
pub type ID = u64;

/// Send rate-limited requests to peers prioritized by performance.
///
/// Requester attempts to saturate the bandwidth (inferred from the rate limit)
/// of the most performant peers (based on our latency observations). To encourage
/// exploration, set the value of `initial` to less than the expected latency of
/// performant peers and/or periodically set `shuffle` in `request`.
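///
/// # Example
///
/// A minimal sketch of the request lifecycle (`context`, `config`,
/// `participants`, and `send` are illustrative placeholders, not part of this
/// module):
///
/// ```ignore
/// let mut requester = Requester::new(context, config);
/// requester.reconcile(&participants);
/// if let Some((peer, id)) = requester.request(false) {
///     send(&peer, id); // deliver the request over the network
/// }
/// // ...later, when `peer` responds to request `id`:
/// if let Some(request) = requester.handle(&peer, id) {
///     requester.resolve(request); // credit the peer's observed latency
/// }
/// ```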
pub struct Requester<E: Clock + GClock + Rng + Metrics, P: Array> {
    context: E,
    public_key: P,
    metrics: super::Metrics,
    initial: Duration,
    timeout: Duration,

    // Participants to exclude from requests
    excluded: HashSet<P>,

    // Rate limiter for participants
    #[allow(clippy::type_complexity)]
    rate_limiter: RateLimiter<P, HashMapStateStore<P>, E, NoOpMiddleware<E::Instant>>,
    // Participants and their performance (lower is better)
    participants: PrioritySet<P, u128>,

    // Next ID to use for a request
    id: ID,
    // Outstanding requests (ID -> (participant, start time))
    requests: HashMap<ID, (P, SystemTime)>,
    // Deadlines for outstanding requests (ID -> deadline)
    deadlines: PrioritySet<ID, SystemTime>,
}

/// A request returned from handling an ID.
///
/// When handling a request, the requester removes the request and returns
/// this struct in case we want to `resolve` or `timeout` the request. This
/// approach makes it impossible to forget to remove a handled request, even
/// if it doesn't warrant updating the participant's performance.
pub struct Request<P: Array> {
    /// Unique identifier for the request.
    pub id: ID,

    /// Participant that handled the request.
    participant: P,

    /// Time the request was issued.
    start: SystemTime,
}

impl<E: Clock + GClock + Rng + Metrics, P: Array> Requester<E, P> {
    /// Create a new requester.
    pub fn new(context: E, config: Config<P>) -> Self {
        let rate_limiter = RateLimiter::hashmap_with_clock(config.rate_limit, &context);
        let metrics = super::Metrics::init(context.clone());
        Self {
            context,
            public_key: config.public_key,
            metrics,
            initial: config.initial,
            timeout: config.timeout,

            excluded: HashSet::new(),

            rate_limiter,
            participants: PrioritySet::new(),

            id: 0,
            requests: HashMap::new(),
            deadlines: PrioritySet::new(),
        }
    }

    /// Indicate which participants can be sent requests.
    pub fn reconcile(&mut self, participants: &[P]) {
        self.participants
            .reconcile(participants, self.initial.as_millis());
        self.rate_limiter.shrink_to_fit();
    }

    /// Skip a participant for future requests.
    ///
    /// Participants added to this list are never removed (even if dropped
    /// during `reconcile`), in case they are re-added later.
    pub fn block(&mut self, participant: P) {
        self.excluded.insert(participant);
    }

    /// Ask for a participant to handle a request.
    ///
    /// If `shuffle` is true, the order of participants is shuffled before
    /// a request is made. This is typically used when a request to the preferred
    /// participant fails.
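    ///
    /// For example, if sending to the preferred participant fails, a caller
    /// might penalize it and retry with shuffling (an illustrative sketch;
    /// `id` is the failed request's identifier):
    ///
    /// ```ignore
    /// if let Some(request) = requester.cancel(id) {
    ///     requester.fail(request); // penalize the unreachable peer
    /// }
    /// let retry = requester.request(true); // explore a shuffled order
    /// ```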
    pub fn request(&mut self, shuffle: bool) -> Option<(P, ID)> {
        // Prepare participant iterator
        let participant_iter = if shuffle {
            let mut participants = self.participants.iter().collect::<Vec<_>>();
            participants.shuffle(&mut self.context);
            Either::Left(participants.into_iter())
        } else {
            Either::Right(self.participants.iter())
        };

        // Look for a participant that can handle the request
        for (participant, _) in participant_iter {
            // Check if me
            if *participant == self.public_key {
                continue;
            }

            // Check if excluded
            if self.excluded.contains(participant) {
                continue;
            }

            // Check if rate limit is exceeded (and update rate limiter if not)
            if self.rate_limiter.check_key(participant).is_err() {
                continue;
            }

            // Compute ID
            let id = self.id;
            self.id = self.id.wrapping_add(1);

            // Record request issuance time
            let now = self.context.current();
            self.requests.insert(id, (participant.clone(), now));
            let deadline = now.checked_add(self.timeout).expect("time overflowed");
            self.deadlines.put(id, deadline);

            // Increment metric if-and-only-if request is successful
            self.metrics.created.inc(Status::Success);
            return Some((participant.clone(), id));
        }

        // Increment failed metric if no participants are available
        self.metrics.created.inc(Status::Failure);
        None
    }

    /// Calculate a participant's new priority using an exponential moving average.
    fn update(&mut self, participant: P, elapsed: Duration) {
        let Some(past) = self.participants.get(&participant) else {
            return;
        };
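        // An exponential moving average with smoothing factor 1/2: the new
        // priority is the mean of the prior priority and the latest
        // observation, so older observations decay geometrically.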
        let next = past.saturating_add(elapsed.as_millis()) / 2;
        self.metrics
            .performance
            .get_or_create(&PeerLabel::from(&participant))
            .set(next as i64);
        self.participants.put(participant, next);
    }

    /// Drop an outstanding request regardless of who it was intended for.
    pub fn cancel(&mut self, id: ID) -> Option<Request<P>> {
        let (participant, start) = self.requests.remove(&id)?;
        self.deadlines.remove(&id);
        Some(Request {
            id,
            participant,
            start,
        })
    }

    /// Handle a request by ID, ensuring the provided `participant` was
    /// associated with said ID.
    ///
    /// If the request was outstanding, a `Request` is returned that can
    /// either be resolved or timed out.
    pub fn handle(&mut self, participant: &P, id: ID) -> Option<Request<P>> {
        // Confirm ID exists and is for the participant
        let (expected, _) = self.requests.get(&id)?;
        if expected != participant {
            return None;
        }

        // If expected, remove
        self.cancel(id)
    }

    /// Resolve an outstanding request.
    pub fn resolve(&mut self, request: Request<P>) {
        // Get elapsed time
        //
        // If we can't compute the elapsed time for some reason (e.g. the current
        // time did not monotonically increase), we should still credit the
        // participant for a timely response.
        let elapsed = self
            .context
            .current()
            .duration_since(request.start)
            .unwrap_or_default();

        // Update performance
        self.update(request.participant, elapsed);
        self.metrics.requests.inc(Status::Success);
        self.metrics.resolves.observe(elapsed.as_secs_f64());
    }

    /// Timeout an outstanding request.
    pub fn timeout(&mut self, request: Request<P>) {
        self.update(request.participant, self.timeout);
        self.metrics.requests.inc(Status::Timeout);
    }

    /// Fail an outstanding request and penalize the request
    /// participant with the timeout duration.
    ///
    /// This is used when we fail to send a request to a participant.
    pub fn fail(&mut self, request: Request<P>) {
        self.update(request.participant, self.timeout);
        self.metrics.requests.inc(Status::Failure);
    }

    /// Get the next outstanding ID and deadline.
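    ///
    /// A caller typically uses this to drive timeouts, e.g. (an assumed
    /// driver loop, not part of this module):
    ///
    /// ```ignore
    /// while let Some((id, deadline)) = requester.next() {
    ///     if context.current() < deadline {
    ///         break; // nothing has expired yet
    ///     }
    ///     if let Some(request) = requester.cancel(id) {
    ///         requester.timeout(request); // penalize the unresponsive peer
    ///     }
    /// }
    /// ```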
    pub fn next(&self) -> Option<(ID, SystemTime)> {
        let (id, deadline) = self.deadlines.peek()?;
        Some((*id, *deadline))
    }

    /// Get the number of outstanding requests.
    #[allow(clippy::len_without_is_empty)]
    pub fn len(&self) -> usize {
        self.requests.len()
    }

    /// Get the number of blocked participants.
    pub fn len_blocked(&self) -> usize {
        self.excluded.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{Ed25519, Signer};
    use commonware_runtime::{deterministic, Runner};
    use commonware_utils::NZU32;
    use governor::Quota;
    use std::time::Duration;

    #[test]
    fn test_requester_basic() {
        // Instantiate context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|context| async move {
            // Create requester
            let scheme = Ed25519::from_seed(0);
            let me = scheme.public_key();
            let timeout = Duration::from_secs(5);
            let config = Config {
                public_key: scheme.public_key(),
                rate_limit: Quota::per_second(NZU32!(1)),
                initial: Duration::from_millis(100),
                timeout,
            };
            let mut requester = Requester::new(context.clone(), config);

            // Request before any participants
            assert_eq!(requester.request(false), None);
            assert_eq!(requester.len(), 0);

            // Ensure we aren't waiting
            assert_eq!(requester.next(), None);

            // Handle non-existent request
            assert!(requester.handle(&me, 0).is_none());

            // Initialize requester
            let other = Ed25519::from_seed(1).public_key();
            requester.reconcile(&[me.clone(), other.clone()]);

            // Get request
            let current = context.current();
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(id, 0);
            assert_eq!(participant, other);

            // Check deadline
            let (id, deadline) = requester.next().expect("failed to get deadline");
            assert_eq!(id, 0);
            assert_eq!(deadline, current + timeout);
            assert_eq!(requester.len(), 1);

            // Try to make another request (would exceed rate limit and can't do self)
            assert_eq!(requester.request(false), None);

            // Simulate processing time
            context.sleep(Duration::from_millis(10)).await;

            // Mark request as resolved with wrong participant
            assert!(requester.handle(&me, id).is_none());

            // Mark request as resolved
            let request = requester
                .handle(&participant, id)
                .expect("failed to get request");
            assert_eq!(request.id, id);
            requester.resolve(request);

            // Ensure no more requests
            assert_eq!(requester.request(false), None);

            // Ensure can't make another request
            assert_eq!(requester.request(false), None);

            // Wait for rate limit to reset
            context.sleep(Duration::from_secs(1)).await;

            // Get request
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(participant, other);
            assert_eq!(id, 1);

            // Timeout request
            let request = requester
                .handle(&participant, id)
                .expect("failed to get request");
            requester.timeout(request);

            // Ensure no more requests
            assert_eq!(requester.request(false), None);

            // Sleep until reset
            context.sleep(Duration::from_secs(1)).await;

            // Get request
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(participant, other);
            assert_eq!(id, 2);

            // Cancel request
            assert!(requester.cancel(id).is_some());

            // Ensure no more requests
            assert_eq!(requester.next(), None);
            assert_eq!(requester.len(), 0);

            // Sleep until reset
            context.sleep(Duration::from_secs(1)).await;

            // Block participant
            requester.block(other);

            // Get request
            assert_eq!(requester.request(false), None);
        });
    }

    #[test]
    fn test_requester_multiple() {
        // Instantiate context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|context| async move {
            // Create requester
            let scheme = Ed25519::from_seed(0);
            let me = scheme.public_key();
            let timeout = Duration::from_secs(5);
            let config = Config {
                public_key: scheme.public_key(),
                rate_limit: Quota::per_second(NZU32!(1)),
                initial: Duration::from_millis(100),
                timeout,
            };
            let mut requester = Requester::new(context.clone(), config);

            // Request before any participants
            assert_eq!(requester.request(false), None);

            // Ensure we aren't waiting
            assert_eq!(requester.next(), None);

            // Initialize requester
            let other1 = Ed25519::from_seed(1).public_key();
            let other2 = Ed25519::from_seed(2).public_key();
            requester.reconcile(&[me.clone(), other1.clone(), other2.clone()]);

            // Get request
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(id, 0);
            if participant == other2 {
                let request = requester
                    .handle(&participant, id)
                    .expect("failed to get request");
                requester.timeout(request);
            } else {
                panic!("unexpected participant");
            }

            // Get request
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(id, 1);
            if participant == other1 {
                context.sleep(Duration::from_millis(10)).await;
                let request = requester
                    .handle(&participant, id)
                    .expect("failed to get request");
                requester.resolve(request);
            } else {
                panic!("unexpected participant");
            }

            // Try to make another request (would exceed rate limit and can't do self)
            assert_eq!(requester.request(false), None);

            // Wait for rate limit to reset
            context.sleep(Duration::from_secs(1)).await;

            // Get request
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(participant, other1);
            assert_eq!(id, 2);

            // Cancel request
            assert!(requester.cancel(id).is_some());

            // Add another participant
            let other3 = Ed25519::from_seed(3).public_key();
            requester.reconcile(&[me, other1, other2.clone(), other3.clone()]);

            // Get request (new should be prioritized because lower default time)
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(participant, other3);
            assert_eq!(id, 3);

            // Wait until eventually get slower participant
            context.sleep(Duration::from_secs(1)).await;
            loop {
                // Shuffle participants
                let (participant, _) = requester.request(true).unwrap();
                if participant == other2 {
                    break;
                }

                // Sleep until reset
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }
}