Skip to main content

snarkos_node_bft/helpers/
pending.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::MAX_FETCH_TIMEOUT_IN_MS;
17use snarkos_node_bft_ledger_service::LedgerService;
18use snarkvm::{
19    console::network::{Network, consensus_config_value},
20    prelude::Result,
21};
22
23use anyhow::anyhow;
24#[cfg(feature = "locktick")]
25use locktick::parking_lot::RwLock;
26#[cfg(not(feature = "locktick"))]
27use parking_lot::RwLock;
28use std::{
29    collections::{HashMap, HashSet},
30    hash::Hash,
31    net::SocketAddr,
32    sync::Arc,
33};
34use time::OffsetDateTime;
35use tokio::sync::oneshot;
36
37/// The maximum number of seconds to wait before expiring a callback.
38/// We ensure that we don't truncate `MAX_FETCH_TIMEOUT_IN_MS` when converting to seconds.
39pub(crate) const CALLBACK_EXPIRATION_IN_SECS: i64 = MAX_FETCH_TIMEOUT_IN_MS.div_ceil(1000) as i64;
40
41/// Returns the maximum number of redundant requests for the number of validators in the specified round.
42pub fn max_redundant_requests<N: Network>(ledger: Arc<dyn LedgerService<N>>, round: u64) -> Result<usize> {
43    // Determine the number of validators in the committee lookback for the given round.
44    let num_validators =
45        if let Ok(n) = ledger.get_committee_lookback_for_round(round).map(|committee| committee.num_members()) {
46            n
47        } else {
48            let max_committee_size = consensus_config_value!(N, MAX_CERTIFICATES, ledger.latest_block_height())
49                .ok_or_else(|| anyhow!("Couldn't obtain MAX_CERTIFICATES"))?;
50            max_committee_size as usize
51        };
52
53    // Note: It is adequate to set this value to the availability threshold,
54    // as with high probability one will respond honestly (in the best and worst case
55    // with stake spread across the validators evenly and unevenly, respectively).
56    Ok(1 + num_validators.saturating_div(3))
57}
58
59#[derive(Debug)]
60pub struct Pending<T: PartialEq + Eq + Hash, V: Clone> {
61    /// The map of pending `items` to a map of `peer IPs` and their optional `callback` queue.
62    /// Each callback has a timeout and a flag indicating if it is associated with a sent request.
63    pending: RwLock<HashMap<T, HashMap<SocketAddr, Vec<(oneshot::Sender<V>, i64, bool)>>>>,
64}
65
66impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Default for Pending<T, V> {
67    /// Initializes a new instance of the pending queue.
68    fn default() -> Self {
69        Self::new()
70    }
71}
72
73impl<T: Copy + Clone + PartialEq + Eq + Hash, V: Clone> Pending<T, V> {
74    /// Initializes a new instance of the pending queue.
75    pub fn new() -> Self {
76        Self { pending: Default::default() }
77    }
78
79    /// Returns `true` if the pending queue is empty.
80    pub fn is_empty(&self) -> bool {
81        self.pending.read().is_empty()
82    }
83
84    /// Returns the number of pending in the pending queue.
85    pub fn len(&self) -> usize {
86        self.pending.read().len()
87    }
88
89    /// Returns `true` if the pending queue contains the specified `item`.
90    pub fn contains(&self, item: impl Into<T>) -> bool {
91        self.pending.read().contains_key(&item.into())
92    }
93
94    /// Returns `true` if the pending queue contains the specified `item` for the specified `peer IP`.
95    pub fn contains_peer(&self, item: impl Into<T>, peer_ip: SocketAddr) -> bool {
96        self.pending.read().get(&item.into()).is_some_and(|peer_ips| peer_ips.contains_key(&peer_ip))
97    }
98
99    /// Returns `true` if the pending queue contains the specified `item` for the specified `peer IP` with a sent request.
100    pub fn contains_peer_with_sent_request(&self, item: impl Into<T>, peer_ip: SocketAddr) -> bool {
101        self.pending.read().get(&item.into()).is_some_and(|peer_ips| {
102            peer_ips
103                .get(&peer_ip)
104                .map(|callbacks| callbacks.iter().any(|(_, _, request_sent)| *request_sent))
105                .unwrap_or(false)
106        })
107    }
108
109    /// Returns the peer IPs for the specified `item`.
110    pub fn get_peers(&self, item: impl Into<T>) -> Option<HashSet<SocketAddr>> {
111        self.pending.read().get(&item.into()).map(|map| map.keys().cloned().collect())
112    }
113
114    /// Returns the number of pending callbacks for the specified `item`.
115    pub fn num_callbacks(&self, item: impl Into<T>) -> usize {
116        let item = item.into();
117        let now = OffsetDateTime::now_utc().unix_timestamp();
118        // Clear the callbacks that have expired.
119        self.clear_expired_callbacks_for_item(now, item);
120        // Return the number of live callbacks.
121        self.pending.read().get(&item).map_or(0, |peers| peers.values().fold(0, |acc, v| acc.saturating_add(v.len())))
122    }
123
124    /// Returns the number of pending sent requests for the specified `item`.
125    pub fn num_sent_requests(&self, item: impl Into<T>) -> usize {
126        let item = item.into();
127        let now = OffsetDateTime::now_utc().unix_timestamp();
128        // Clear the callbacks that have expired.
129        self.clear_expired_callbacks_for_item(now, item);
130        // Return the number of live callbacks.
131        self.pending
132            .read()
133            .get(&item)
134            .map_or(0, |peers| peers.values().flatten().filter(|(_, _, request_sent)| *request_sent).count())
135    }
136
137    /// Inserts the specified `item` and `peer IP` to the pending queue,
138    /// returning `true` if the `peer IP` was newly-inserted into the entry for the `item`.
139    ///
140    /// In addition, an optional `callback` may be provided, that is triggered upon removal.
141    /// Note: The callback, if provided, is **always** inserted into the callback queue.
142    pub fn insert(
143        &self,
144        item: impl Into<T>,
145        peer_ip: SocketAddr,
146        callback: Option<(oneshot::Sender<V>, bool)>,
147    ) -> bool {
148        let item = item.into();
149        let now = OffsetDateTime::now_utc().unix_timestamp();
150        // Insert the peer IP and optional callback into the pending queue.
151        let result = {
152            // Acquire the pending lock.
153            let mut pending = self.pending.write();
154
155            // Insert a peer into the pending queue.
156            let entry = pending.entry(item).or_default();
157
158            // Check if the peer IP is already present in the entry.
159            let is_new_peer = !entry.contains_key(&peer_ip);
160
161            // Get the entry for the peer IP.
162            let peer_entry = entry.entry(peer_ip).or_default();
163
164            // If a callback is provided, insert it into the callback queue.
165            if let Some((callback, request_sent)) = callback {
166                peer_entry.push((callback, now, request_sent));
167            }
168
169            is_new_peer
170        };
171
172        // Clear the callbacks that have expired.
173        self.clear_expired_callbacks_for_item(now, item);
174
175        // Return the result.
176        result
177    }
178
179    /// Removes the specified `item` from the pending queue.
180    /// If the `item` exists and is removed, the peer IPs are returned.
181    /// If the `item` does not exist, `None` is returned.
182    pub fn remove(&self, item: impl Into<T>, callback_value: Option<V>) -> Option<HashSet<SocketAddr>> {
183        let item = item.into();
184        // Remove the item from the pending queue and process any remaining callbacks.
185        match self.pending.write().remove(&item) {
186            Some(callbacks) => {
187                // Get the peer IPs.
188                let peer_ips = callbacks.keys().copied().collect();
189                // Process the callbacks.
190                if let Some(callback_value) = callback_value {
191                    // Send a notification to the callback.
192                    for (callback, _, _) in callbacks.into_values().flat_map(|callbacks| callbacks.into_iter()) {
193                        callback.send(callback_value.clone()).ok();
194                    }
195                }
196                // Return the peer IPs.
197                Some(peer_ips)
198            }
199            None => None,
200        }
201    }
202
203    /// Removes the callbacks for the specified `item` that have expired.
204    pub fn clear_expired_callbacks_for_item(&self, now: i64, item: impl Into<T>) {
205        let item = item.into();
206
207        // Acquire the pending lock.
208        let mut pending = self.pending.write();
209
210        // Clear the callbacks that have expired.
211        if let Some(peer_map) = pending.get_mut(&item) {
212            // Iterate over each peer IP for the item and filter out expired callbacks.
213            for (_, callbacks) in peer_map.iter_mut() {
214                callbacks.retain(|(_, timestamp, _)| now - *timestamp <= CALLBACK_EXPIRATION_IN_SECS);
215            }
216
217            // Remove peer IPs that no longer have any callbacks.
218            peer_map.retain(|_, callbacks| !callbacks.is_empty());
219
220            // If there are no more remaining callbacks for the item across all peer IPs, remove the item from pending.
221            if peer_map.is_empty() {
222                pending.remove(&item);
223            }
224        }
225    }
226
227    /// Removes the callbacks for all items have that expired.
228    pub fn clear_expired_callbacks(&self) {
229        let now = OffsetDateTime::now_utc().unix_timestamp();
230        // Acquire the pending lock once for write access.
231        let mut pending = self.pending.write();
232
233        // Iterate over all items in pending to modify the data structure in-place.
234        pending.retain(|_, peer_map| {
235            // Iterate over each peer IP for the item and filter out expired callbacks.
236            for (_, callbacks) in peer_map.iter_mut() {
237                callbacks.retain(|(_, timestamp, _)| now - *timestamp <= CALLBACK_EXPIRATION_IN_SECS);
238            }
239
240            // Remove peer IPs that no longer have any callbacks.
241            peer_map.retain(|_, callbacks| !callbacks.is_empty());
242
243            // Keep the item in the pending map only if there are callbacks left.
244            !peer_map.is_empty()
245        });
246    }
247}
248
#[cfg(test)]
mod tests {
    use super::*;
    use snarkvm::{
        ledger::narwhal::TransmissionID,
        prelude::{Rng, TestRng},
    };

    use std::{thread, time::Duration};

    type CurrentNetwork = snarkvm::prelude::MainnetV0;

    const ITERATIONS: usize = 100;

    /// Samples a random solution transmission ID.
    fn sample_solution_id(rng: &mut TestRng) -> TransmissionID<CurrentNetwork> {
        TransmissionID::Solution(
            rng.r#gen::<u64>().into(),
            rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
        )
    }

    /// Returns the loopback socket address with the given port.
    fn localhost(port: u16) -> SocketAddr {
        SocketAddr::from(([127, 0, 0, 1], port))
    }

    /// Returns a callback sender whose receiving half has already been dropped.
    fn detached_callback() -> oneshot::Sender<()> {
        let (sender, _) = oneshot::channel();
        sender
    }

    #[test]
    fn test_pending() {
        let rng = &mut TestRng::default();

        // Initialize the pending queue.
        let pending = Pending::<TransmissionID<CurrentNetwork>, ()>::new();

        // Check initially empty.
        assert!(pending.is_empty());
        assert_eq!(pending.len(), 0);

        // Initialize the solution IDs.
        let solution_id_1 = sample_solution_id(rng);
        let solution_id_2 = sample_solution_id(rng);
        let solution_id_3 = sample_solution_id(rng);
        let solution_id_4 = sample_solution_id(rng);

        // Initialize the SocketAddrs.
        let addr_1 = localhost(1234);
        let addr_2 = localhost(2345);
        let addr_3 = localhost(3456);
        let addr_4 = localhost(4567);

        // The items that are inserted with a sent request.
        let sent = [(solution_id_1, addr_1), (solution_id_2, addr_2), (solution_id_3, addr_3)];

        // Insert the solution IDs with a sent request.
        for (id, addr) in sent {
            assert!(pending.insert(id, addr, Some((detached_callback(), true))));
        }
        // Add a callback without a sent request.
        assert!(pending.insert(solution_id_4, addr_4, Some((detached_callback(), false))));

        // Check the number of pending items.
        assert_eq!(pending.len(), 4);
        assert!(!pending.is_empty());

        // Check the items.
        for (id, addr) in sent {
            assert!(pending.contains(id));
            assert!(pending.contains_peer(id, addr));
            assert!(pending.contains_peer_with_sent_request(id, addr));
        }
        // Ensure the last item does not have a sent request.
        assert!(pending.contains_peer(solution_id_4, addr_4));
        assert!(!pending.contains_peer_with_sent_request(solution_id_4, addr_4));

        let unknown_id = sample_solution_id(rng);
        assert!(!pending.contains(unknown_id));

        // Check get.
        assert_eq!(pending.get_peers(solution_id_1), Some(HashSet::from([addr_1])));
        assert_eq!(pending.get_peers(solution_id_2), Some(HashSet::from([addr_2])));
        assert_eq!(pending.get_peers(solution_id_3), Some(HashSet::from([addr_3])));
        assert_eq!(pending.get_peers(solution_id_4), Some(HashSet::from([addr_4])));
        assert_eq!(pending.get_peers(unknown_id), None);

        // Check remove.
        for id in [solution_id_1, solution_id_2, solution_id_3, solution_id_4] {
            assert!(pending.remove(id, None).is_some());
        }
        assert!(pending.remove(unknown_id, None).is_none());

        // Check empty again.
        assert!(pending.is_empty());
    }

    #[test]
    fn test_expired_callbacks() {
        let rng = &mut TestRng::default();

        // Initialize the pending queue.
        let pending = Pending::<TransmissionID<CurrentNetwork>, ()>::new();

        // Check initially empty.
        assert!(pending.is_empty());
        assert_eq!(pending.len(), 0);

        // Initialize the solution ID.
        let solution_id_1 = sample_solution_id(rng);

        // Initialize the SocketAddrs.
        let addr_1 = localhost(1234);
        let addr_2 = localhost(2345);
        let addr_3 = localhost(3456);

        // Insert the solution ID for the first two peers.
        assert!(pending.insert(solution_id_1, addr_1, Some((detached_callback(), true))));
        assert!(pending.insert(solution_id_1, addr_2, Some((detached_callback(), true))));

        // Sleep until just before the first two callbacks expire.
        thread::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 - 1));

        // Insert the solution ID for the third peer.
        assert!(pending.insert(solution_id_1, addr_3, Some((detached_callback(), true))));

        // Check that no callback has expired yet.
        assert_eq!(pending.num_callbacks(solution_id_1), 3);

        // Wait for 2 seconds.
        thread::sleep(Duration::from_secs(2));

        // Ensure that only the most recent callback is left.
        assert_eq!(pending.num_callbacks(solution_id_1), 1);

        // Wait for `CALLBACK_EXPIRATION_IN_SECS` seconds.
        thread::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64));

        // Ensure that all callbacks have been removed.
        assert_eq!(pending.num_callbacks(solution_id_1), 0);
    }

    #[test]
    fn test_num_sent_requests() {
        let rng = &mut TestRng::default();

        // Initialize the pending queue.
        let pending = Pending::<TransmissionID<CurrentNetwork>, ()>::new();

        for _ in 0..ITERATIONS {
            // Generate a solution ID.
            let solution_id = sample_solution_id(rng);
            // Track how many callbacks are flagged as sent requests.
            let mut expected_num_sent_requests = 0;
            for port in 0..ITERATIONS {
                // Generate a peer address.
                let addr = localhost(port as u16);
                // Randomly determine if the callback is associated with a sent request.
                let is_sent_request = rng.r#gen();
                // Increment the expected number of sent requests.
                if is_sent_request {
                    expected_num_sent_requests += 1;
                }
                // Insert the solution ID with a fresh callback.
                assert!(pending.insert(solution_id, addr, Some((detached_callback(), is_sent_request))));
            }
            // Ensure that the number of sent requests is correct.
            assert_eq!(pending.num_sent_requests(solution_id), expected_num_sent_requests);
        }
    }

    #[test]
    fn test_expired_items() {
        let rng = &mut TestRng::default();

        // Initialize the pending queue.
        let pending = Pending::<TransmissionID<CurrentNetwork>, ()>::new();

        // Check initially empty.
        assert!(pending.is_empty());
        assert_eq!(pending.len(), 0);

        // Initialize the solution IDs.
        let solution_id_1 = sample_solution_id(rng);
        let solution_id_2 = sample_solution_id(rng);

        // Initialize the SocketAddrs.
        let addr_1 = localhost(1234);
        let addr_2 = localhost(2345);
        let addr_3 = localhost(3456);

        // Insert the solution IDs.
        assert!(pending.insert(solution_id_1, addr_1, Some((detached_callback(), true))));
        assert!(pending.insert(solution_id_1, addr_2, Some((detached_callback(), true))));
        assert!(pending.insert(solution_id_2, addr_3, Some((detached_callback(), true))));

        // Ensure that the items have not been expired yet.
        assert_eq!(pending.num_callbacks(solution_id_1), 2);
        assert_eq!(pending.num_callbacks(solution_id_2), 1);
        assert_eq!(pending.len(), 2);

        // Wait for `CALLBACK_EXPIRATION_IN_SECS + 1` seconds.
        thread::sleep(Duration::from_secs(CALLBACK_EXPIRATION_IN_SECS as u64 + 1));

        // Expire the pending callbacks.
        pending.clear_expired_callbacks();

        // Ensure that the items have been expired.
        assert_eq!(pending.num_callbacks(solution_id_1), 0);
        assert_eq!(pending.num_callbacks(solution_id_2), 0);
        assert!(pending.is_empty());
    }
}
492
#[cfg(test)]
mod prop_tests {
    use super::*;

    use test_strategy::{Arbitrary, proptest};

    /// A minimal item type used as the key of the pending queue.
    #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
    pub struct Item {
        pub id: usize,
    }

    /// The generated test input: the number of items to insert into the pending queue.
    #[derive(Arbitrary, Clone, Debug)]
    pub struct PendingInput {
        #[strategy(1..5_000usize)]
        pub count: usize,
    }

    impl PendingInput {
        /// Builds a pending queue holding `count` items, each registered for one peer
        /// (whose port equals the item ID) with one callback flagged as a sent request.
        pub fn to_pending(&self) -> Pending<Item, ()> {
            let queue = Pending::<Item, ()>::new();
            (0..self.count).for_each(|id| {
                let peer_ip = SocketAddr::from(([127, 0, 0, 1], id as u16));
                let (callback, _) = oneshot::channel();
                queue.insert(Item { id }, peer_ip, Some((callback, true)));
            });
            queue
        }
    }

    #[proptest]
    fn test_pending_proptest(input: PendingInput) {
        let pending = input.to_pending();
        // An item that was never inserted.
        let absent = Item { id: input.count + 1 };

        // Check the size of the queue.
        assert_eq!(pending.len(), input.count);
        assert!(!pending.is_empty());

        // Check that the absent item is unknown to the queue.
        assert!(!pending.contains(absent));
        assert_eq!(pending.get_peers(absent), None);
        assert!(pending.remove(absent, None).is_none());

        // Check and remove every inserted item.
        for id in 0..input.count {
            let item = Item { id };
            let peer_ip = SocketAddr::from(([127, 0, 0, 1], id as u16));
            assert!(pending.contains(item));
            assert!(pending.contains_peer(item, peer_ip));
            assert_eq!(pending.get_peers(item), Some(HashSet::from([peer_ip])));
            assert!(pending.remove(item, None).is_some());
        }

        // Check that the queue has been drained.
        assert!(pending.is_empty());
    }
}