snarkos_node_bft/helpers/
ready.rs

// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

16use snarkvm::{
17    console::prelude::*,
18    ledger::{
19        block::Transaction,
20        narwhal::{Data, Transmission, TransmissionID},
21        puzzle::{Solution, SolutionID},
22    },
23};
24
25use indexmap::{IndexMap, IndexSet};
26use std::collections::{HashMap, VecDeque, hash_map::Entry::Vacant};
27
/// Maintains a queue of verified ("ready") transmissions.
///
/// The queue pairs a `VecDeque`, which fixes the order of the entries, with a
/// `HashMap` index, which provides lookup by transmission ID. The index stores
/// *logical* positions rather than physical ones, so inserting or removing at
/// the front only has to adjust `offset` instead of rewriting every entry:
/// `physical index = logical index - offset`.
#[derive(Clone, Debug)]
pub struct Ready<N: Network> {
    /// Maps each transmission ID to its logical index (physical index + offset)
    /// in `transmissions`.
    transmission_ids: HashMap<TransmissionID<N>, i64>,
    /// An ordered collection of (transmission ID, transmission). The front of
    /// the deque is the entry that `remove_front` returns next.
    transmissions: VecDeque<(TransmissionID<N>, Transmission<N>)>,
    /// An offset used to adjust logical indices when elements are inserted or
    /// removed at the front. It is decremented by `insert_front`, incremented
    /// by `remove_front`, and reset to zero once the queue becomes empty or the
    /// index is rebuilt by `clear_solutions`.
    offset: i64,
}
40
41impl<N: Network> Default for Ready<N> {
42    /// Initializes a new instance of the ready queue.
43    fn default() -> Self {
44        Self::new()
45    }
46}
47
48impl<N: Network> Ready<N> {
49    /// Initializes a new instance of the ready queue.
50    pub fn new() -> Self {
51        Self { transmission_ids: Default::default(), transmissions: Default::default(), offset: Default::default() }
52    }
53
54    /// Returns `true` if the ready queue is empty.
55    pub fn is_empty(&self) -> bool {
56        self.transmissions.is_empty()
57    }
58
59    /// Returns the number of transmissions in the ready queue.
60    pub fn num_transmissions(&self) -> usize {
61        self.transmissions.len()
62    }
63
64    /// Returns the number of ratifications in the ready queue.
65    pub fn num_ratifications(&self) -> usize {
66        self.transmission_ids.keys().filter(|id| matches!(id, TransmissionID::Ratification)).count()
67    }
68
69    /// Returns the number of solutions in the ready queue.
70    pub fn num_solutions(&self) -> usize {
71        self.transmission_ids.keys().filter(|id| matches!(id, TransmissionID::Solution(..))).count()
72    }
73
74    /// Returns the number of transactions in the ready queue.
75    pub fn num_transactions(&self) -> usize {
76        self.transmission_ids.keys().filter(|id| matches!(id, TransmissionID::Transaction(..))).count()
77    }
78
79    /// Returns the transmission IDs in the ready queue.
80    pub fn transmission_ids(&self) -> IndexSet<TransmissionID<N>> {
81        self.transmission_ids.keys().copied().collect()
82    }
83
84    /// Returns the transmissions in the ready queue.
85    pub fn transmissions(&self) -> IndexMap<TransmissionID<N>, Transmission<N>> {
86        self.transmissions.iter().cloned().collect()
87    }
88
89    /// Returns the solutions in the ready queue.
90    pub fn solutions(&self) -> Vec<(SolutionID<N>, Data<Solution<N>>)> {
91        self.transmissions
92            .iter()
93            .filter_map(|(id, transmission)| match (id, transmission) {
94                (TransmissionID::Solution(id, _), Transmission::Solution(solution)) => Some((*id, solution.clone())),
95                _ => None,
96            })
97            .collect()
98    }
99
100    /// Returns the transactions in the ready queue.
101    pub fn transactions(&self) -> Vec<(N::TransactionID, Data<Transaction<N>>)> {
102        self.transmissions
103            .iter()
104            .filter_map(|(id, transmission)| match (id, transmission) {
105                (TransmissionID::Transaction(id, _), Transmission::Transaction(tx)) => Some((*id, tx.clone())),
106                _ => None,
107            })
108            .collect()
109    }
110}
111
112impl<N: Network> Ready<N> {
113    /// Returns `true` if the ready queue contains the specified `transmission ID`.
114    pub fn contains(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
115        self.transmission_ids.contains_key(&transmission_id.into())
116    }
117
118    /// Returns the transmission, given the specified `transmission ID`.
119    pub fn get(&self, transmission_id: impl Into<TransmissionID<N>>) -> Option<Transmission<N>> {
120        self.transmission_ids
121            .get(&transmission_id.into())
122            .and_then(|&index| self.transmissions.get((index - self.offset) as usize))
123            .map(|(_, transmission)| transmission.clone())
124    }
125
126    /// Inserts the specified (`transmission ID`, `transmission`) to the ready queue.
127    /// Returns `true` if the transmission is new, and was added to the ready queue.
128    pub fn insert(&mut self, transmission_id: impl Into<TransmissionID<N>>, transmission: Transmission<N>) -> bool {
129        let physical_index = self.transmissions.len();
130        let transmission_id = transmission_id.into();
131
132        if let Vacant(entry) = self.transmission_ids.entry(transmission_id) {
133            entry.insert(physical_index as i64 + self.offset);
134            self.transmissions.push_back((transmission_id, transmission));
135            true
136        } else {
137            false
138        }
139    }
140
141    /// Inserts the specified (`transmission ID`, `transmission`) at the front
142    /// of the ready queue.
143    /// Returns `true` if the transmission is new, and was added to the ready queue.
144    pub fn insert_front(
145        &mut self,
146        transmission_id: impl Into<TransmissionID<N>>,
147        transmission: Transmission<N>,
148    ) -> bool {
149        let transmission_id = transmission_id.into();
150        if let Vacant(entry) = self.transmission_ids.entry(transmission_id) {
151            self.offset -= 1;
152            let index = self.offset;
153
154            entry.insert(index);
155            self.transmissions.push_front((transmission_id, transmission));
156            true
157        } else {
158            false
159        }
160    }
161
162    /// Removes and returns the transmission at the front of the queue.
163    pub fn remove_front(&mut self) -> Option<(TransmissionID<N>, Transmission<N>)> {
164        if let Some((transmission_id, transmission)) = self.transmissions.pop_front() {
165            self.transmission_ids.remove(&transmission_id);
166
167            if self.transmission_ids.is_empty() {
168                debug_assert!(self.transmissions.is_empty());
169                self.offset = 0;
170            } else {
171                self.offset += 1;
172            }
173
174            Some((transmission_id, transmission))
175        } else {
176            None
177        }
178    }
179
180    /// Removes all solution transmissions from the queue (O(n)).
181    pub fn clear_solutions(&mut self) {
182        self.transmissions.retain(|(_, transmission)| !matches!(transmission, Transmission::Solution(_)));
183
184        // Rebuild the index and reset the offset.
185        self.transmission_ids.clear();
186        self.offset = 0;
187        for (i, (id, _)) in self.transmissions.iter().enumerate() {
188            self.transmission_ids.insert(*id, i as i64);
189        }
190    }
191}
192
#[cfg(test)]
mod tests {
    use super::*;
    use snarkvm::{ledger::narwhal::Data, prelude::Field};

    use ::bytes::Bytes;

    type CurrentNetwork = snarkvm::prelude::MainnetV0;

    /// The number of random bytes backing each fake transmission payload.
    const PAYLOAD_SIZE: usize = 512;

    /// Samples `PAYLOAD_SIZE` random fake bytes, drawing one byte at a time.
    fn sample_payload(rng: &mut TestRng) -> Bytes {
        Bytes::from((0..PAYLOAD_SIZE).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>())
    }

    /// Samples a random solution transmission ID.
    fn sample_solution_id(rng: &mut TestRng) -> TransmissionID<CurrentNetwork> {
        TransmissionID::Solution(
            rng.r#gen::<u64>().into(),
            rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
        )
    }

    /// Samples a solution transmission carrying random fake bytes.
    fn sample_solution(rng: &mut TestRng) -> Transmission<CurrentNetwork> {
        Transmission::Solution(Data::Buffer(sample_payload(rng)))
    }

    /// Samples a transaction transmission carrying random fake bytes.
    fn sample_transaction(rng: &mut TestRng) -> Transmission<CurrentNetwork> {
        Transmission::Transaction(Data::Buffer(sample_payload(rng)))
    }

    #[test]
    fn test_ready() {
        let rng = &mut TestRng::default();

        // Start from an empty ready queue.
        let mut ready = Ready::<CurrentNetwork>::new();

        // Sample three solution IDs, followed by their three payloads.
        let solution_id_1 = sample_solution_id(rng);
        let solution_id_2 = sample_solution_id(rng);
        let solution_id_3 = sample_solution_id(rng);

        let solution_1 = sample_solution(rng);
        let solution_2 = sample_solution(rng);
        let solution_3 = sample_solution(rng);

        // Every insertion must report a new entry.
        assert!(ready.insert(solution_id_1, solution_1.clone()));
        assert!(ready.insert(solution_id_2, solution_2.clone()));
        assert!(ready.insert(solution_id_3, solution_3.clone()));

        // All three transmissions are queued.
        assert_eq!(ready.num_transmissions(), 3);

        // The queue reports exactly the inserted IDs.
        let expected_ids = [solution_id_1, solution_id_2, solution_id_3].into_iter().collect::<IndexSet<_>>();
        assert_eq!(ready.transmission_ids(), expected_ids);
        for id in &expected_ids {
            assert!(ready.contains(*id));
        }

        // An ID that was never inserted is not reported as present.
        let solution_id_unknown = sample_solution_id(rng);
        assert!(!ready.contains(solution_id_unknown));

        // Lookups return the matching payloads, and nothing for the unknown ID.
        assert_eq!(ready.get(solution_id_1), Some(solution_1.clone()));
        assert_eq!(ready.get(solution_id_2), Some(solution_2.clone()));
        assert_eq!(ready.get(solution_id_3), Some(solution_3.clone()));
        assert_eq!(ready.get(solution_id_unknown), None);

        // Pop everything off the front of the queue.
        let drained: Vec<_> = (0..3).map(|_| ready.remove_front().unwrap()).collect();

        // The queue is now empty and reports no IDs.
        assert!(ready.is_empty());
        assert_eq!(ready.transmission_ids(), IndexSet::new());
        // The entries came out in insertion order.
        assert_eq!(drained, vec![
            (solution_id_1, solution_1),
            (solution_id_2, solution_2),
            (solution_id_3, solution_3)
        ]);
    }

    #[test]
    fn test_ready_duplicate() {
        use rand::RngCore;
        let rng = &mut TestRng::default();

        // Fill a buffer with random fake bytes in a single call.
        let mut buffer = vec![0u8; 512];
        rng.fill_bytes(&mut buffer);
        let payload = Data::Buffer(Bytes::from(buffer));

        // Start from an empty ready queue.
        let mut ready = Ready::<CurrentNetwork>::new();

        // Sample the solution ID and wrap the payload.
        let solution_id = sample_solution_id(rng);
        let solution = Transmission::Solution(payload);

        // The first insertion succeeds, the repeated one is rejected.
        assert!(ready.insert(solution_id, solution.clone()));
        assert!(!ready.insert(solution_id, solution));

        // Only a single transmission is queued.
        assert_eq!(ready.num_transmissions(), 1);
    }

    #[test]
    fn test_insert_front() {
        let rng = &mut TestRng::default();

        // Start from an empty ready queue.
        let mut ready = Ready::<CurrentNetwork>::new();

        // Sample two solution IDs, followed by their two payloads.
        let solution_id_1 = sample_solution_id(rng);
        let solution_id_2 = sample_solution_id(rng);

        let solution_1 = sample_solution(rng);
        let solution_2 = sample_solution(rng);

        // Each front insertion lowers the offset by one.
        assert!(ready.insert_front(solution_id_1, solution_1.clone()));
        assert_eq!(ready.offset, -1);
        assert!(ready.insert_front(solution_id_2, solution_2.clone()));
        assert_eq!(ready.offset, -2);

        // Both entries remain retrievable by ID.
        assert_eq!(ready.get(solution_id_1), Some(solution_1.clone()));
        assert_eq!(ready.get(solution_id_2), Some(solution_2.clone()));

        // The most recent front insertion comes out first; the offset rises by one.
        let removed = ready.remove_front().unwrap();
        assert_eq!(removed, (solution_id_2, solution_2));
        assert_eq!(ready.offset, -1);

        // Removing the last entry brings the offset back to 0.
        let removed = ready.remove_front().unwrap();
        assert_eq!(removed, (solution_id_1, solution_1));
        assert_eq!(ready.offset, 0);
    }

    #[test]
    fn test_clear_solutions() {
        let rng = &mut TestRng::default();

        // Start from an empty ready queue.
        let mut ready = Ready::<CurrentNetwork>::new();

        // Sample the transmission IDs: two solutions and one transaction.
        let solution_id_1 = sample_solution_id(rng);
        let solution_id_2 = sample_solution_id(rng);
        let transaction_id = TransmissionID::Transaction(
            <CurrentNetwork as Network>::TransactionID::from(Field::rand(rng)),
            <CurrentNetwork as Network>::TransmissionChecksum::from(rng.r#gen::<u128>()),
        );

        // Sample the payloads in the same order.
        let solution_1 = sample_solution(rng);
        let solution_2 = sample_solution(rng);
        let transaction = sample_transaction(rng);

        // A front insertion lowers the offset.
        assert!(ready.insert_front(solution_id_1, solution_1.clone()));
        assert_eq!(ready.offset, -1);

        // Back insertions leave the offset unchanged.
        assert!(ready.insert(transaction_id, transaction.clone()));
        assert_eq!(ready.offset, -1);
        assert!(ready.insert(solution_id_2, solution_2.clone()));
        assert_eq!(ready.offset, -1);

        // Drop every solution transmission.
        ready.clear_solutions();
        // Only the transaction is left.
        assert_eq!(ready.num_transmissions(), 1);
        // The offset was reset to 0 while rebuilding the index.
        assert_eq!(ready.offset, 0);
        // The surviving transmission is still retrievable by its ID.
        assert_eq!(ready.get(transaction_id), Some(transaction));
    }
}