commonware_p2p/utils/requester/requester.rs

//! Requester for sending rate-limited requests to peers.

use super::{Config, PeerLabel};
use commonware_runtime::{
    telemetry::metrics::status::{CounterExt, Status},
    Clock, Metrics,
};
use commonware_utils::{Array, PrioritySet};
use either::Either;
use governor::{
    clock::Clock as GClock, middleware::NoOpMiddleware, state::keyed::HashMapStateStore,
    RateLimiter,
};
use rand::{seq::SliceRandom, Rng};
use std::{
    collections::{HashMap, HashSet},
    time::{Duration, SystemTime},
};

/// Unique identifier for a request.
///
/// Once `u64::MAX` requests have been made, the ID wraps around (resetting to
/// zero). As long as there are fewer than `u64::MAX` requests outstanding at
/// any time, this should not be an issue.
pub type ID = u64;

/// Send rate-limited requests to peers prioritized by performance.
///
/// Requester attempts to saturate the bandwidth (inferred from the rate limit)
/// of the most performant peers (based on our latency observations). To encourage
/// exploration, set the value of `initial` to less than the expected latency of
/// performant peers and/or periodically set `shuffle` in `request`.
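///
/// # Example
///
/// A minimal construction sketch (mirroring the tests below; the `scheme`,
/// `context`, and participant set are placeholders):
///
/// ```ignore
/// let config = Config {
///     public_key: scheme.public_key(),
///     rate_limit: Quota::per_second(NZU32!(1)),
///     initial: Duration::from_millis(100),
///     timeout: Duration::from_secs(5),
/// };
/// let mut requester = Requester::new(context, config);
/// requester.reconcile(&participants);
/// ```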
pub struct Requester<E: Clock + GClock + Rng + Metrics, P: Array> {
    context: E,
    public_key: P,
    metrics: super::Metrics,
    initial: Duration,
    timeout: Duration,

    // Participants to exclude from requests
    excluded: HashSet<P>,

    // Rate limiter for participants
    #[allow(clippy::type_complexity)]
    rate_limiter: RateLimiter<P, HashMapStateStore<P>, E, NoOpMiddleware<E::Instant>>,
    // Participants and their performance (lower is better)
    participants: PrioritySet<P, u128>,

    // Next ID to use for a request
    id: ID,
    // Outstanding requests (ID -> (participant, start time))
    requests: HashMap<ID, (P, SystemTime)>,
    // Deadlines for outstanding requests (ID -> deadline)
    deadlines: PrioritySet<ID, SystemTime>,
}

/// Request returned from handling an ID.
///
/// When handling a request, the requester removes the request and returns
/// this struct in case we want to `resolve` or `timeout` the request. This approach
/// makes it impossible to forget to remove a handled request, even when it doesn't
/// warrant updating the performance of the participant.
pub struct Request<P: Array> {
    /// Unique identifier for the request.
    pub id: ID,

    /// Participant that handled the request.
    participant: P,

    /// Time the request was issued.
    start: SystemTime,
}

impl<E: Clock + GClock + Rng + Metrics, P: Array> Requester<E, P> {
    /// Create a new requester.
    pub fn new(context: E, config: Config<P>) -> Self {
        let rate_limiter = RateLimiter::hashmap_with_clock(config.rate_limit, &context);
        let metrics = super::Metrics::init(context.clone());
        Self {
            context,
            public_key: config.public_key,
            metrics,
            initial: config.initial,
            timeout: config.timeout,

            excluded: HashSet::new(),

            rate_limiter,
            participants: PrioritySet::new(),

            id: 0,
            requests: HashMap::new(),
            deadlines: PrioritySet::new(),
        }
    }

    /// Indicate which participants can be sent requests.
    pub fn reconcile(&mut self, participants: &[P]) {
        self.participants
            .reconcile(participants, self.initial.as_millis());
        self.rate_limiter.shrink_to_fit();
    }

    /// Skip a participant for future requests.
    ///
    /// Participants added to this list are never removed (even if dropped
    /// during `reconcile`), in case they are re-added later.
    pub fn block(&mut self, participant: P) {
        self.excluded.insert(participant);
    }

    /// Ask for a participant to handle a request.
    ///
    /// If `shuffle` is true, the order of participants is shuffled before
    /// a request is made. This is typically used when a request to the preferred
    /// participant fails.
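    ///
    /// # Example
    ///
    /// A minimal sketch of issuing a request (the send path is a placeholder):
    ///
    /// ```ignore
    /// if let Some((participant, id)) = requester.request(false) {
    ///     // Send the request to `participant` over the network, keyed by `id`.
    /// } else {
    ///     // No participant available (all rate-limited, excluded, or self).
    /// }
    /// ```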
    pub fn request(&mut self, shuffle: bool) -> Option<(P, ID)> {
        // Prepare participant iterator
        let participant_iter = if shuffle {
            let mut participants = self.participants.iter().collect::<Vec<_>>();
            participants.shuffle(&mut self.context);
            Either::Left(participants.into_iter())
        } else {
            Either::Right(self.participants.iter())
        };

        // Look for a participant that can handle the request
        for (participant, _) in participant_iter {
            // Check if me
            if *participant == self.public_key {
                continue;
            }

            // Check if excluded
            if self.excluded.contains(participant) {
                continue;
            }

            // Check if rate limit is exceeded (and update rate limiter if not)
            if self.rate_limiter.check_key(participant).is_err() {
                continue;
            }

            // Compute ID
            let id = self.id;
            self.id = self.id.wrapping_add(1);

            // Record request issuance time
            let now = self.context.current();
            self.requests.insert(id, (participant.clone(), now));
            let deadline = now.checked_add(self.timeout).expect("time overflowed");
            self.deadlines.put(id, deadline);

            // Increment metric if-and-only-if request is successful
            self.metrics.created.inc(Status::Success);
            return Some((participant.clone(), id));
        }

        // Increment failed metric if no participants are available
        self.metrics.created.inc(Status::Failure);
        None
    }

    /// Calculate a participant's new priority using an exponential moving average.
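    ///
    /// A worked instance of the smoothing below: with a past priority of
    /// 100ms and an observed latency of 40ms, the new priority is
    /// `(100 + 40) / 2 = 70ms`, so the weight of each prior observation
    /// halves with every update.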
    fn update(&mut self, participant: P, elapsed: Duration) {
        let Some(past) = self.participants.get(&participant) else {
            return;
        };
        let next = past.saturating_add(elapsed.as_millis()) / 2;
        self.metrics
            .performance
            .get_or_create(&PeerLabel::from(&participant))
            .set(next as i64);
        self.participants.put(participant, next);
    }

    /// Drop an outstanding request regardless of who it was intended for.
    pub fn cancel(&mut self, id: ID) -> Option<Request<P>> {
        let (participant, start) = self.requests.remove(&id)?;
        self.deadlines.remove(&id);
        Some(Request {
            id,
            participant,
            start,
        })
    }

    /// Handle a request by ID, ensuring the provided `participant` was
    /// associated with that ID.
    ///
    /// If the request was outstanding, a `Request` is returned that can
    /// either be resolved or timed out.
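    ///
    /// # Example
    ///
    /// A minimal sketch of the resolve-or-timeout flow (`response` is a
    /// placeholder for whatever the network returned):
    ///
    /// ```ignore
    /// if let Some(request) = requester.handle(&participant, id) {
    ///     match response {
    ///         Some(_) => requester.resolve(request), // credit observed latency
    ///         None => requester.timeout(request),    // penalize with full timeout
    ///     }
    /// }
    /// ```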
    pub fn handle(&mut self, participant: &P, id: ID) -> Option<Request<P>> {
        // Confirm ID exists and is for the participant
        let (expected, _) = self.requests.get(&id)?;
        if expected != participant {
            return None;
        }

        // If expected, remove
        self.cancel(id)
    }

    /// Resolve an outstanding request.
    pub fn resolve(&mut self, request: Request<P>) {
        // Get elapsed time
        //
        // If we can't compute the elapsed time for some reason (e.g., the current
        // time did not monotonically increase), we should still credit the
        // participant for a timely response.
        let elapsed = self
            .context
            .current()
            .duration_since(request.start)
            .unwrap_or_default();

        // Update performance
        self.update(request.participant, elapsed);
        self.metrics.requests.inc(Status::Success);
        self.metrics.resolves.observe(elapsed.as_secs_f64());
    }

    /// Timeout an outstanding request.
    pub fn timeout(&mut self, request: Request<P>) {
        self.update(request.participant, self.timeout);
        self.metrics.requests.inc(Status::Timeout);
    }

    /// Fail an outstanding request and penalize its participant
    /// with the timeout duration.
    ///
    /// This is used when we fail to send a request to a participant.
    pub fn fail(&mut self, request: Request<P>) {
        self.update(request.participant, self.timeout);
        self.metrics.requests.inc(Status::Failure);
    }

    /// Get the next outstanding ID and deadline.
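    ///
    /// A minimal sketch of a deadline-driven expiry loop, assuming the runtime
    /// clock exposes a `sleep_until`-style primitive (hypothetical driver):
    ///
    /// ```ignore
    /// while let Some((id, deadline)) = requester.next() {
    ///     context.sleep_until(deadline).await;
    ///     if let Some(request) = requester.cancel(id) {
    ///         requester.timeout(request);
    ///     }
    /// }
    /// ```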
    pub fn next(&self) -> Option<(ID, SystemTime)> {
        let (id, deadline) = self.deadlines.peek()?;
        Some((*id, *deadline))
    }

    /// Get the number of outstanding requests.
    #[allow(clippy::len_without_is_empty)]
    pub fn len(&self) -> usize {
        self.requests.len()
    }

    /// Get the number of blocked participants.
    pub fn len_blocked(&self) -> usize {
        self.excluded.len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{ed25519::PrivateKey, PrivateKeyExt as _, Signer as _};
    use commonware_runtime::{deterministic, Runner};
    use commonware_utils::NZU32;
    use governor::Quota;
    use std::time::Duration;

    #[test]
    fn test_requester_basic() {
        // Instantiate context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|context| async move {
            // Create requester
            let scheme = PrivateKey::from_seed(0);
            let me = scheme.public_key();
            let timeout = Duration::from_secs(5);
            let config = Config {
                public_key: scheme.public_key(),
                rate_limit: Quota::per_second(NZU32!(1)),
                initial: Duration::from_millis(100),
                timeout,
            };
            let mut requester = Requester::new(context.clone(), config);

            // Request before any participants
            assert_eq!(requester.request(false), None);
            assert_eq!(requester.len(), 0);

            // Ensure we aren't waiting
            assert_eq!(requester.next(), None);

            // Handle non-existent request
            assert!(requester.handle(&me, 0).is_none());

            // Initialize requester
            let other = PrivateKey::from_seed(1).public_key();
            requester.reconcile(&[me.clone(), other.clone()]);

            // Get request
            let current = context.current();
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(id, 0);
            assert_eq!(participant, other);

            // Check deadline
            let (id, deadline) = requester.next().expect("failed to get deadline");
            assert_eq!(id, 0);
            assert_eq!(deadline, current + timeout);
            assert_eq!(requester.len(), 1);

            // Try to make another request (would exceed rate limit and can't do self)
            assert_eq!(requester.request(false), None);

            // Simulate processing time
            context.sleep(Duration::from_millis(10)).await;

            // Mark request as resolved with wrong participant
            assert!(requester.handle(&me, id).is_none());

            // Mark request as resolved
            let request = requester
                .handle(&participant, id)
                .expect("failed to get request");
            assert_eq!(request.id, id);
            requester.resolve(request);

            // Ensure no more requests
            assert_eq!(requester.request(false), None);

            // Ensure can't make another request
            assert_eq!(requester.request(false), None);

            // Wait for rate limit to reset
            context.sleep(Duration::from_secs(1)).await;

            // Get request
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(participant, other);
            assert_eq!(id, 1);

            // Timeout request
            let request = requester
                .handle(&participant, id)
                .expect("failed to get request");
            requester.timeout(request);

            // Ensure no more requests
            assert_eq!(requester.request(false), None);

            // Sleep until reset
            context.sleep(Duration::from_secs(1)).await;

            // Get request
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(participant, other);
            assert_eq!(id, 2);

            // Cancel request
            assert!(requester.cancel(id).is_some());

            // Ensure no more requests
            assert_eq!(requester.next(), None);
            assert_eq!(requester.len(), 0);

            // Sleep until reset
            context.sleep(Duration::from_secs(1)).await;

            // Block participant
            requester.block(other);

            // Get request
            assert_eq!(requester.request(false), None);
        });
    }

    #[test]
    fn test_requester_multiple() {
        // Instantiate context
        let executor = deterministic::Runner::seeded(0);
        executor.start(|context| async move {
            // Create requester
            let scheme = PrivateKey::from_seed(0);
            let me = scheme.public_key();
            let timeout = Duration::from_secs(5);
            let config = Config {
                public_key: scheme.public_key(),
                rate_limit: Quota::per_second(NZU32!(1)),
                initial: Duration::from_millis(100),
                timeout,
            };
            let mut requester = Requester::new(context.clone(), config);

            // Request before any participants
            assert_eq!(requester.request(false), None);

            // Ensure we aren't waiting
            assert_eq!(requester.next(), None);

            // Initialize requester
            let other1 = PrivateKey::from_seed(1).public_key();
            let other2 = PrivateKey::from_seed(2).public_key();
            requester.reconcile(&[me.clone(), other1.clone(), other2.clone()]);

            // Get request
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(id, 0);
            if participant == other2 {
                let request = requester
                    .handle(&participant, id)
                    .expect("failed to get request");
                requester.timeout(request);
            } else {
                panic!("unexpected participant");
            }

            // Get request
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(id, 1);
            if participant == other1 {
                context.sleep(Duration::from_millis(10)).await;
                let request = requester
                    .handle(&participant, id)
                    .expect("failed to get request");
                requester.resolve(request);
            } else {
                panic!("unexpected participant");
            }

            // Try to make another request (would exceed rate limit and can't do self)
            assert_eq!(requester.request(false), None);

            // Wait for rate limit to reset
            context.sleep(Duration::from_secs(1)).await;

            // Get request
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(participant, other1);
            assert_eq!(id, 2);

            // Cancel request
            assert!(requester.cancel(id).is_some());

            // Add another participant
            let other3 = PrivateKey::from_seed(3).public_key();
            requester.reconcile(&[me, other1, other2.clone(), other3.clone()]);

            // Get request (new participant should be prioritized because of its lower default time)
            let (participant, id) = requester.request(false).expect("failed to get participant");
            assert_eq!(participant, other3);
            assert_eq!(id, 3);

            // Wait until we eventually get the slower participant
            context.sleep(Duration::from_secs(1)).await;
            loop {
                // Shuffle participants
                let (participant, _) = requester.request(true).unwrap();
                if participant == other2 {
                    break;
                }

                // Sleep until reset
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }
}