commonware_p2p/utils/requester/
requester.rs

//! Requester for sending rate-limited requests to peers.

use super::{Config, PeerLabel};
use commonware_cryptography::PublicKey;
use commonware_runtime::{
    telemetry::metrics::status::{CounterExt, Status},
    Clock, Metrics,
};
use commonware_utils::PrioritySet;
use either::Either;
use governor::{
    clock::Clock as GClock, middleware::NoOpMiddleware, state::keyed::HashMapStateStore,
    RateLimiter,
};
use rand::{seq::SliceRandom, Rng};
use std::{
    collections::{HashMap, HashSet},
    time::{Duration, SystemTime},
};

/// Unique identifier for a request.
///
/// Once `u64::MAX` requests have been made, the ID wraps around (resetting to
/// zero). As long as there are fewer than `u64::MAX` requests outstanding at
/// once, this should not be an issue.
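///
/// A quick check of the wrap-around arithmetic (this is plain `u64` wrapping):
///
/// ```
/// assert_eq!(u64::MAX.wrapping_add(1), 0);
/// ```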
pub type ID = u64;

/// Send rate-limited requests to peers prioritized by performance.
///
/// Requester attempts to saturate the bandwidth (inferred from the rate limit)
/// of the most performant peers (based on our latency observations). To encourage
/// exploration, set `initial` to less than the expected latency of performant
/// peers and/or periodically set `shuffle` in `request`.
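///
/// # Example
///
/// A minimal sketch of the request lifecycle (the `context`, `config`, and
/// `peers` bindings are assumed to come from the caller; not compiled here):
///
/// ```ignore
/// let mut requester = Requester::new(context, config);
/// requester.reconcile(&peers);
/// if let Some((peer, id)) = requester.request(false) {
///     // ... send the request to `peer` over the network ...
///     if let Some(request) = requester.handle(&peer, id) {
///         requester.resolve(request); // credit `peer` with the observed latency
///     }
/// }
/// ```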
pub struct Requester<E: Clock + GClock + Rng + Metrics, P: PublicKey> {
    context: E,
    public_key: P,
    metrics: super::Metrics,
    initial: Duration,
    timeout: Duration,

    // Participants to exclude from requests
    excluded: HashSet<P>,

    // Rate limiter for participants
    #[allow(clippy::type_complexity)]
    rate_limiter: RateLimiter<P, HashMapStateStore<P>, E, NoOpMiddleware<E::Instant>>,
    // Participants and their performance (lower is better)
    participants: PrioritySet<P, u128>,

    // Next ID to use for a request
    id: ID,
    // Outstanding requests (ID -> (participant, start time))
    requests: HashMap<ID, (P, SystemTime)>,
    // Deadlines for outstanding requests (ID -> deadline)
    deadlines: PrioritySet<ID, SystemTime>,
}

/// Request returned from handling an ID.
///
/// When handling a request, the requester removes the request and returns this
/// struct in case we want to `resolve` or `timeout` the request. This approach
/// makes it impossible to forget to remove a handled request when it doesn't
/// warrant updating the participant's performance.
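///
/// A sketch of the intended consumption pattern (`requester`, `peer`, `id`,
/// and `response_is_valid` are hypothetical bindings from surrounding code):
///
/// ```ignore
/// if let Some(request) = requester.handle(&peer, id) {
///     if response_is_valid {
///         requester.resolve(request); // credit the participant
///     } else {
///         requester.timeout(request); // penalize the participant
///     }
/// }
/// // Dropping `request` without calling `resolve` or `timeout` leaves the
/// // participant's recorded performance unchanged.
/// ```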
pub struct Request<P: PublicKey> {
    /// Unique identifier for the request.
    pub id: ID,

    /// Participant that handled the request.
    participant: P,

    /// Time the request was issued.
    start: SystemTime,
}

impl<E: Clock + GClock + Rng + Metrics, P: PublicKey> Requester<E, P> {
    /// Create a new requester.
    pub fn new(context: E, config: Config<P>) -> Self {
        let rate_limiter = RateLimiter::hashmap_with_clock(config.rate_limit, &context);
        let metrics = super::Metrics::init(context.clone());
        Self {
            context,
            public_key: config.public_key,
            metrics,
            initial: config.initial,
            timeout: config.timeout,

            excluded: HashSet::new(),

            rate_limiter,
            participants: PrioritySet::new(),

            id: 0,
            requests: HashMap::new(),
            deadlines: PrioritySet::new(),
        }
    }

    /// Indicate which participants can be sent requests.
    pub fn reconcile(&mut self, participants: &[P]) {
        self.participants
            .reconcile(participants, self.initial.as_millis());
        self.rate_limiter.shrink_to_fit();
    }

    /// Skip a participant for future requests.
    ///
    /// Participants added to this list will never be removed (even if dropped
    /// during `reconcile`, in case they are re-added later).
    pub fn block(&mut self, participant: P) {
        self.excluded.insert(participant);
    }

    /// Ask for a participant to handle a request.
    ///
    /// If `shuffle` is true, the order of participants is shuffled before
    /// a request is made. This is typically used when a request to the preferred
    /// participant fails.
    pub fn request(&mut self, shuffle: bool) -> Option<(P, ID)> {
        // Prepare participant iterator
        let participant_iter = if shuffle {
            let mut participants = self.participants.iter().collect::<Vec<_>>();
            participants.shuffle(&mut self.context);
            Either::Left(participants.into_iter())
        } else {
            Either::Right(self.participants.iter())
        };

        // Look for a participant that can handle the request
        for (participant, _) in participant_iter {
            // Check if me
            if *participant == self.public_key {
                continue;
            }

            // Check if excluded
            if self.excluded.contains(participant) {
                continue;
            }

            // Check if rate limit is exceeded (and update rate limiter if not)
            if self.rate_limiter.check_key(participant).is_err() {
                continue;
            }

            // Compute ID
            let id = self.id;
            self.id = self.id.wrapping_add(1);

            // Record request issuance time
            let now = self.context.current();
            self.requests.insert(id, (participant.clone(), now));
            let deadline = now.checked_add(self.timeout).expect("time overflowed");
            self.deadlines.put(id, deadline);

            // Increment metric if-and-only-if request is successful
            self.metrics.created.inc(Status::Success);
            return Some((participant.clone(), id));
        }

        // Increment failed metric if no participants are available
        self.metrics.created.inc(Status::Failure);
        None
    }

    /// Calculate a participant's new priority using an exponential moving average.
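    ///
    /// With a smoothing factor of 1/2, the new priority is
    /// `next = (past + elapsed_ms) / 2`; e.g., a stored priority of 100ms and
    /// a new observation of 40ms yields (100 + 40) / 2 = 70ms.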
    fn update(&mut self, participant: P, elapsed: Duration) {
        let Some(past) = self.participants.get(&participant) else {
            return;
        };
        let next = past.saturating_add(elapsed.as_millis()) / 2;
        self.metrics
            .performance
            .get_or_create(&PeerLabel::from(&participant))
            .set(next as i64);
        self.participants.put(participant, next);
    }

    /// Drop an outstanding request regardless of who it was intended for.
    pub fn cancel(&mut self, id: ID) -> Option<Request<P>> {
        let (participant, start) = self.requests.remove(&id)?;
        self.deadlines.remove(&id);
        Some(Request {
            id,
            participant,
            start,
        })
    }

    /// Handle a request by ID, ensuring the provided `participant` was
    /// associated with said ID.
    ///
    /// If the request was outstanding, a `Request` is returned that can
    /// either be resolved or timed out.
    pub fn handle(&mut self, participant: &P, id: ID) -> Option<Request<P>> {
        // Confirm ID exists and is for the participant
        let (expected, _) = self.requests.get(&id)?;
        if expected != participant {
            return None;
        }

        // If expected, remove
        self.cancel(id)
    }

    /// Resolve an outstanding request.
    pub fn resolve(&mut self, request: Request<P>) {
        // Get elapsed time
        //
        // If we can't compute the elapsed time for some reason (i.e., the current
        // time does not monotonically increase), we should still credit the
        // participant for a timely response.
        let elapsed = self
            .context
            .current()
            .duration_since(request.start)
            .unwrap_or_default();

        // Update performance
        self.update(request.participant, elapsed);
        self.metrics.requests.inc(Status::Success);
        self.metrics.resolves.observe(elapsed.as_secs_f64());
    }

    /// Timeout an outstanding request.
    pub fn timeout(&mut self, request: Request<P>) {
        self.update(request.participant, self.timeout);
        self.metrics.requests.inc(Status::Timeout);
    }

    /// Fail an outstanding request and penalize the request's
    /// participant with the timeout duration.
    ///
    /// This is used when we fail to send a request to a participant.
    pub fn fail(&mut self, request: Request<P>) {
        self.update(request.participant, self.timeout);
        self.metrics.requests.inc(Status::Failure);
    }

    /// Get the next outstanding ID and deadline.
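    ///
    /// A sketch of driving timeouts off the earliest deadline (`clock` and its
    /// `sleep_until` method are assumed stand-ins for the caller's runtime; not
    /// compiled here):
    ///
    /// ```ignore
    /// while let Some((id, deadline)) = requester.next() {
    ///     clock.sleep_until(deadline).await;
    ///     if let Some(request) = requester.cancel(id) {
    ///         requester.timeout(request); // penalize the unresponsive participant
    ///     }
    /// }
    /// ```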
    pub fn next(&self) -> Option<(ID, SystemTime)> {
        let (id, deadline) = self.deadlines.peek()?;
        Some((*id, *deadline))
    }

    /// Get the number of outstanding requests.
    #[allow(clippy::len_without_is_empty)]
    pub fn len(&self) -> usize {
        self.requests.len()
    }

    /// Get the number of blocked participants.
    pub fn len_blocked(&self) -> usize {
        self.excluded.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{ed25519::PrivateKey, PrivateKeyExt as _, Signer as _};
    use commonware_runtime::{deterministic, Runner};
    use commonware_utils::NZU32;
    use governor::Quota;
    use std::time::Duration;

    #[test]
    fn test_requester_basic() {
        // Instantiate context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|context| async move {
            // Create requester
            let scheme = PrivateKey::from_seed(0);
            let me = scheme.public_key();
            let timeout = Duration::from_secs(5);
            let config = Config {
                public_key: scheme.public_key(),
                rate_limit: Quota::per_second(NZU32!(1)),
                initial: Duration::from_millis(100),
                timeout,
            };
            let mut requester = Requester::new(context.clone(), config);

            // Request before any participants
            assert_eq!(requester.request(false), None);
            assert_eq!(requester.len(), 0);

            // Ensure we aren't waiting
            assert_eq!(requester.next(), None);

            // Handle non-existent request
            assert!(requester.handle(&me, 0).is_none());

            // Initialize requester
            let other = PrivateKey::from_seed(1).public_key();
            requester.reconcile(&[me.clone(), other.clone()]);

            // Get request
            let current = context.current();
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(id, 0);
            assert_eq!(participant, other);

            // Check deadline
            let (id, deadline) = requester.next().expect("failed to get deadline");
            assert_eq!(id, 0);
            assert_eq!(deadline, current + timeout);
            assert_eq!(requester.len(), 1);

            // Try to make another request (would exceed rate limit and can't do self)
            assert_eq!(requester.request(false), None);

            // Simulate processing time
            context.sleep(Duration::from_millis(10)).await;

            // Mark request as resolved with wrong participant
            assert!(requester.handle(&me, id).is_none());

            // Mark request as resolved
            let request = requester
                .handle(&participant, id)
                .expect("failed to get request");
            assert_eq!(request.id, id);
            requester.resolve(request);

            // Ensure no more requests
            assert_eq!(requester.request(false), None);

            // Ensure can't make another request
            assert_eq!(requester.request(false), None);

            // Wait for rate limit to reset
            context.sleep(Duration::from_secs(1)).await;

            // Get request
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(participant, other);
            assert_eq!(id, 1);

            // Timeout request
            let request = requester
                .handle(&participant, id)
                .expect("failed to get request");
            requester.timeout(request);

            // Ensure no more requests
            assert_eq!(requester.request(false), None);

            // Sleep until reset
            context.sleep(Duration::from_secs(1)).await;

            // Get request
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(participant, other);
            assert_eq!(id, 2);

            // Cancel request
            assert!(requester.cancel(id).is_some());

            // Ensure no more requests
            assert_eq!(requester.next(), None);
            assert_eq!(requester.len(), 0);

            // Sleep until reset
            context.sleep(Duration::from_secs(1)).await;

            // Block participant
            requester.block(other);

            // Get request
            assert_eq!(requester.request(false), None);
        });
    }

    #[test]
    fn test_requester_multiple() {
        // Instantiate context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|context| async move {
            // Create requester
            let scheme = PrivateKey::from_seed(0);
            let me = scheme.public_key();
            let timeout = Duration::from_secs(5);
            let config = Config {
                public_key: scheme.public_key(),
                rate_limit: Quota::per_second(NZU32!(1)),
                initial: Duration::from_millis(100),
                timeout,
            };
            let mut requester = Requester::new(context.clone(), config);

            // Request before any participants
            assert_eq!(requester.request(false), None);

            // Ensure we aren't waiting
            assert_eq!(requester.next(), None);

            // Initialize requester
            let other1 = PrivateKey::from_seed(1).public_key();
            let other2 = PrivateKey::from_seed(2).public_key();
            requester.reconcile(&[me.clone(), other1.clone(), other2.clone()]);

            // Get request
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(id, 0);
            if participant == other2 {
                let request = requester
                    .handle(&participant, id)
                    .expect("failed to get request");
                requester.timeout(request);
            } else {
                panic!("unexpected participant");
            }

            // Get request
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(id, 1);
            if participant == other1 {
                context.sleep(Duration::from_millis(10)).await;
                let request = requester
                    .handle(&participant, id)
                    .expect("failed to get request");
                requester.resolve(request);
            } else {
                panic!("unexpected participant");
            }

            // Try to make another request (would exceed rate limit and can't do self)
            assert_eq!(requester.request(false), None);

            // Wait for rate limit to reset
            context.sleep(Duration::from_secs(1)).await;

            // Get request
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(participant, other1);
            assert_eq!(id, 2);

            // Cancel request
            assert!(requester.cancel(id).is_some());

            // Add another participant
            let other3 = PrivateKey::from_seed(3).public_key();
            requester.reconcile(&[me, other1, other2.clone(), other3.clone()]);

            // Get request (the new participant is prioritized because of its lower initial priority)
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(participant, other3);
            assert_eq!(id, 3);

            // Wait until we eventually get the slower participant
            context.sleep(Duration::from_secs(1)).await;
            loop {
                // Shuffle participants
                let (participant, _) = requester.request(true).unwrap();
                if participant == other2 {
                    break;
                }

                // Sleep until reset
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }
}