Skip to main content

snarkos_node_bft/helpers/
ready.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use snarkvm::{
17    console::prelude::*,
18    ledger::{
19        block::Transaction,
20        narwhal::{Data, Transmission, TransmissionID},
21        puzzle::{Solution, SolutionID},
22    },
23};
24
25use indexmap::{IndexMap, IndexSet};
26use std::collections::{HashMap, VecDeque, hash_map::Entry::Vacant};
27
28/// Maintains a queue of verified ("ready") transmissions.
29#[derive(Clone, Debug)]
30pub struct Ready<N: Network> {
31    /// Maps each transmission ID to its logical index (physical index + offset)
32    /// in `transmissions`.
33    transmission_ids: HashMap<TransmissionID<N>, i64>,
34    /// An ordered collection of (transmission ID, transmission).
35    transmissions: VecDeque<(TransmissionID<N>, Transmission<N>)>,
36    /// An offset used to adjust logical indices when elements are inserted or
37    /// removed at the front.
38    offset: i64,
39}
40
41impl<N: Network> Default for Ready<N> {
42    /// Initializes a new instance of the ready queue.
43    fn default() -> Self {
44        Self::new()
45    }
46}
47
48impl<N: Network> Ready<N> {
49    /// Initializes a new instance of the ready queue.
50    pub fn new() -> Self {
51        Self { transmission_ids: Default::default(), transmissions: Default::default(), offset: Default::default() }
52    }
53
54    /// Returns `true` if the ready queue is empty.
55    pub fn is_empty(&self) -> bool {
56        self.transmissions.is_empty()
57    }
58
59    /// Returns the number of transmissions in the ready queue.
60    pub fn num_transmissions(&self) -> usize {
61        self.transmissions.len()
62    }
63
64    /// Returns the number of ratifications in the ready queue.
65    pub fn num_ratifications(&self) -> usize {
66        self.transmission_ids.keys().filter(|id| matches!(id, TransmissionID::Ratification)).count()
67    }
68
69    /// Returns the number of solutions in the ready queue.
70    pub fn num_solutions(&self) -> usize {
71        self.transmission_ids.keys().filter(|id| matches!(id, TransmissionID::Solution(..))).count()
72    }
73
74    /// Returns the number of transactions in the ready queue.
75    pub fn num_transactions(&self) -> usize {
76        self.transmission_ids.keys().filter(|id| matches!(id, TransmissionID::Transaction(..))).count()
77    }
78
79    /// Returns the transmission IDs in the ready queue.
80    pub fn transmission_ids(&self) -> IndexSet<TransmissionID<N>> {
81        self.transmission_ids.keys().copied().collect()
82    }
83
84    /// Returns the transmissions in the ready queue.
85    pub fn transmissions(&self) -> IndexMap<TransmissionID<N>, Transmission<N>> {
86        self.transmissions.iter().cloned().collect()
87    }
88
89    /// Returns the solutions in the ready queue.
90    pub fn solutions(&self) -> Vec<(SolutionID<N>, Data<Solution<N>>)> {
91        self.transmissions
92            .iter()
93            .filter_map(|(id, transmission)| match (id, transmission) {
94                (TransmissionID::Solution(id, _), Transmission::Solution(solution)) => Some((*id, solution.clone())),
95                _ => None,
96            })
97            .collect()
98    }
99
100    /// Returns the transactions in the ready queue.
101    pub fn transactions(&self) -> Vec<(N::TransactionID, Data<Transaction<N>>)> {
102        self.transmissions
103            .iter()
104            .filter_map(|(id, transmission)| match (id, transmission) {
105                (TransmissionID::Transaction(id, _), Transmission::Transaction(tx)) => Some((*id, tx.clone())),
106                _ => None,
107            })
108            .collect()
109    }
110}
111
112impl<N: Network> Ready<N> {
113    /// Returns `true` if the ready queue contains the specified `transmission ID`.
114    pub fn contains(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
115        self.transmission_ids.contains_key(&transmission_id.into())
116    }
117
118    /// Returns the transmission, given the specified `transmission ID`.
119    pub fn get(&self, transmission_id: impl Into<TransmissionID<N>>) -> Option<Transmission<N>> {
120        self.transmission_ids
121            .get(&transmission_id.into())
122            .and_then(|&index| self.transmissions.get((index - self.offset) as usize))
123            .map(|(_, transmission)| transmission.clone())
124    }
125
126    /// Inserts the specified (`transmission ID`, `transmission`) to the ready queue.
127    /// Returns `true` if the transmission is new, and was added to the ready queue.
128    pub fn insert(&mut self, transmission_id: impl Into<TransmissionID<N>>, transmission: Transmission<N>) -> bool {
129        let physical_index = self.transmissions.len();
130        let transmission_id = transmission_id.into();
131
132        if let Vacant(entry) = self.transmission_ids.entry(transmission_id) {
133            entry.insert(physical_index as i64 + self.offset);
134            self.transmissions.push_back((transmission_id, transmission));
135            true
136        } else {
137            false
138        }
139    }
140
141    /// Inserts the specified (`transmission ID`, `transmission`) at the front
142    /// of the ready queue.
143    /// Returns `true` if the transmission is new, and was added to the ready queue.
144    pub fn insert_front(
145        &mut self,
146        transmission_id: impl Into<TransmissionID<N>>,
147        transmission: Transmission<N>,
148    ) -> bool {
149        let transmission_id = transmission_id.into();
150        if let Vacant(entry) = self.transmission_ids.entry(transmission_id) {
151            self.offset -= 1;
152            let index = self.offset;
153
154            entry.insert(index);
155            self.transmissions.push_front((transmission_id, transmission));
156            true
157        } else {
158            false
159        }
160    }
161
162    /// Removes and returns the transmission at the front of the queue.
163    pub fn remove_front(&mut self) -> Option<(TransmissionID<N>, Transmission<N>)> {
164        if let Some((transmission_id, transmission)) = self.transmissions.pop_front() {
165            self.transmission_ids.remove(&transmission_id);
166
167            if self.transmission_ids.is_empty() {
168                debug_assert!(self.transmissions.is_empty());
169                self.offset = 0;
170            } else {
171                self.offset += 1;
172            }
173
174            Some((transmission_id, transmission))
175        } else {
176            None
177        }
178    }
179
180    /// Removes all solution transmissions from the queue (O(n)).
181    pub fn clear_solutions(&mut self) {
182        self.transmissions.retain(|(_, transmission)| !matches!(transmission, Transmission::Solution(_)));
183
184        // Rebuild the index and reset the offset.
185        self.transmission_ids.clear();
186        self.offset = 0;
187        for (i, (id, _)) in self.transmissions.iter().enumerate() {
188            self.transmission_ids.insert(*id, i as i64);
189        }
190    }
191}
192
#[cfg(test)]
mod tests {
    use super::*;
    use snarkvm::{ledger::narwhal::Data, prelude::Field};

    use ::bytes::Bytes;

    type CurrentNetwork = snarkvm::prelude::MainnetV0;
    type Checksum = <CurrentNetwork as Network>::TransmissionChecksum;

    /// Samples 512 random bytes that stand in for transmission data.
    fn sample_bytes(rng: &mut TestRng) -> Bytes {
        Bytes::from((0..512).map(|_| rng.random::<u8>()).collect::<Vec<_>>())
    }

    /// Samples a random solution transmission ID.
    fn sample_solution_id(rng: &mut TestRng) -> TransmissionID<CurrentNetwork> {
        TransmissionID::Solution(rng.random::<u64>().into(), rng.random::<Checksum>())
    }

    /// Samples a solution transmission backed by random fake bytes.
    fn sample_solution(rng: &mut TestRng) -> Transmission<CurrentNetwork> {
        Transmission::Solution(Data::Buffer(sample_bytes(rng)))
    }

    #[test]
    fn test_ready() {
        let rng = &mut TestRng::default();
        let mut ready = Ready::<CurrentNetwork>::new();

        // Sample three solution IDs, then the three matching solutions.
        let ids = [sample_solution_id(rng), sample_solution_id(rng), sample_solution_id(rng)];
        let solutions = [sample_solution(rng), sample_solution(rng), sample_solution(rng)];

        // Every insertion must be reported as new.
        for (id, solution) in ids.iter().zip(&solutions) {
            assert!(ready.insert(*id, solution.clone()));
        }
        assert_eq!(ready.num_transmissions(), 3);

        // All inserted IDs are reported and individually present.
        let expected_ids: IndexSet<_> = ids.iter().copied().collect();
        assert_eq!(ready.transmission_ids(), expected_ids);
        for id in &expected_ids {
            assert!(ready.contains(*id));
        }

        // An ID that was never inserted must not be found.
        let unknown_id = sample_solution_id(rng);
        assert!(!ready.contains(unknown_id));

        // Lookups return the matching transmission, or nothing for the unknown ID.
        for (id, solution) in ids.iter().zip(&solutions) {
            assert_eq!(ready.get(*id), Some(solution.clone()));
        }
        assert_eq!(ready.get(unknown_id), None);

        // Drain the queue from the front.
        let drained: Vec<_> = (0..3).map(|_| ready.remove_front().unwrap()).collect();

        // The queue is empty and the entries came out in insertion order.
        assert!(ready.is_empty());
        assert_eq!(ready.transmission_ids(), IndexSet::new());
        let expected: Vec<_> = ids.iter().copied().zip(solutions).collect();
        assert_eq!(drained, expected);
    }

    #[test]
    fn test_ready_duplicate() {
        use rand::RngExt;
        let rng = &mut TestRng::default();

        // Sample the fake solution bytes first, then the solution ID.
        let bytes: Vec<u8> = (0..512).map(|_| rng.random::<u8>()).collect();
        let solution = Transmission::Solution(Data::Buffer(Bytes::from(bytes)));
        let solution_id = TransmissionID::Solution(rng.random::<u64>().into(), rng.random::<Checksum>());

        let mut ready = Ready::<CurrentNetwork>::new();

        // The first insertion succeeds, the duplicate is rejected.
        assert!(ready.insert(solution_id, solution.clone()));
        assert!(!ready.insert(solution_id, solution));

        // Only one transmission is stored.
        assert_eq!(ready.num_transmissions(), 1);
    }

    #[test]
    fn test_insert_front() {
        let rng = &mut TestRng::default();
        let mut ready = Ready::<CurrentNetwork>::new();

        // Sample both IDs, then both solutions.
        let (id_1, id_2) = (sample_solution_id(rng), sample_solution_id(rng));
        let (solution_1, solution_2) = (sample_solution(rng), sample_solution(rng));

        // Each front insertion lowers the offset by one.
        assert!(ready.insert_front(id_1, solution_1.clone()));
        assert_eq!(ready.offset, -1);
        assert!(ready.insert_front(id_2, solution_2.clone()));
        assert_eq!(ready.offset, -2);

        // Both entries remain retrievable.
        assert_eq!(ready.get(id_1), Some(solution_1.clone()));
        assert_eq!(ready.get(id_2), Some(solution_2.clone()));

        // The most recently front-inserted entry comes out first; the offset rises by one.
        assert_eq!(ready.remove_front().unwrap(), (id_2, solution_2));
        assert_eq!(ready.offset, -1);

        // Removing the last entry brings the offset back to 0.
        assert_eq!(ready.remove_front().unwrap(), (id_1, solution_1));
        assert_eq!(ready.offset, 0);
    }

    #[test]
    fn test_clear_solutions() {
        let rng = &mut TestRng::default();
        let mut ready = Ready::<CurrentNetwork>::new();

        // Sample the IDs: two solutions and one transaction.
        let solution_id_1 = sample_solution_id(rng);
        let solution_id_2 = sample_solution_id(rng);
        let transaction_id = TransmissionID::Transaction(
            <CurrentNetwork as Network>::TransactionID::from(Field::rand(rng)),
            Checksum::from(rng.random::<u128>()),
        );

        // Sample the transmissions in the same order.
        let solution_1 = sample_solution(rng);
        let solution_2 = sample_solution(rng);
        let transaction = Transmission::Transaction(Data::Buffer(sample_bytes(rng)));

        // A front insertion decrements the offset.
        assert!(ready.insert_front(solution_id_1, solution_1.clone()));
        assert_eq!(ready.offset, -1);

        // Back insertions leave the offset untouched.
        assert!(ready.insert(transaction_id, transaction.clone()));
        assert_eq!(ready.offset, -1);
        assert!(ready.insert(solution_id_2, solution_2.clone()));
        assert_eq!(ready.offset, -1);

        // Drop every solution transmission.
        ready.clear_solutions();

        // Only the transaction remains, the offset is reset, and lookup still works.
        assert_eq!(ready.num_transmissions(), 1);
        assert_eq!(ready.offset, 0);
        assert_eq!(ready.get(transaction_id), Some(transaction));
    }
}