snarkos_node_bft/helpers/
ready.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use snarkvm::{
17    console::prelude::*,
18    ledger::{
19        block::Transaction,
20        narwhal::{Data, Transmission, TransmissionID},
21        puzzle::{Solution, SolutionID},
22    },
23};
24
25use indexmap::{IndexMap, IndexSet};
26use parking_lot::RwLock;
27use std::sync::Arc;
28
/// A queue of transmissions, keyed by transmission ID.
///
/// The entries are stored in an `IndexMap` behind an `Arc<RwLock<..>>`, so cloning a
/// `Ready` yields another handle to the same underlying map rather than a copy of it.
#[derive(Clone, Debug)]
pub struct Ready<N: Network> {
    /// The current map of `(transmission ID, transmission)` entries.
    transmissions: Arc<RwLock<IndexMap<TransmissionID<N>, Transmission<N>>>>,
}
34
35impl<N: Network> Default for Ready<N> {
36    /// Initializes a new instance of the ready queue.
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42impl<N: Network> Ready<N> {
43    /// Initializes a new instance of the ready queue.
44    pub fn new() -> Self {
45        Self { transmissions: Default::default() }
46    }
47
48    /// Returns `true` if the ready queue is empty.
49    pub fn is_empty(&self) -> bool {
50        self.transmissions.read().is_empty()
51    }
52
53    /// Returns the number of transmissions in the ready queue.
54    pub fn num_transmissions(&self) -> usize {
55        self.transmissions.read().len()
56    }
57
58    /// Returns the number of ratifications in the ready queue.
59    pub fn num_ratifications(&self) -> usize {
60        self.transmissions.read().keys().filter(|id| matches!(id, TransmissionID::Ratification)).count()
61    }
62
63    /// Returns the number of solutions in the ready queue.
64    pub fn num_solutions(&self) -> usize {
65        self.transmissions.read().keys().filter(|id| matches!(id, TransmissionID::Solution(..))).count()
66    }
67
68    /// Returns the number of transactions in the ready queue.
69    pub fn num_transactions(&self) -> usize {
70        self.transmissions.read().keys().filter(|id| matches!(id, TransmissionID::Transaction(..))).count()
71    }
72
73    /// Returns the transmission IDs in the ready queue.
74    pub fn transmission_ids(&self) -> IndexSet<TransmissionID<N>> {
75        self.transmissions.read().keys().copied().collect()
76    }
77
78    /// Returns the transmissions in the ready queue.
79    pub fn transmissions(&self) -> IndexMap<TransmissionID<N>, Transmission<N>> {
80        self.transmissions.read().clone()
81    }
82
83    /// Returns the solutions in the ready queue.
84    pub fn solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
85        self.transmissions.read().clone().into_iter().filter_map(|(id, transmission)| match (id, transmission) {
86            (TransmissionID::Solution(id, _), Transmission::Solution(solution)) => Some((id, solution)),
87            _ => None,
88        })
89    }
90
91    /// Returns the transactions in the ready queue.
92    pub fn transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
93        self.transmissions.read().clone().into_iter().filter_map(|(id, transmission)| match (id, transmission) {
94            (TransmissionID::Transaction(id, _), Transmission::Transaction(tx)) => Some((id, tx)),
95            _ => None,
96        })
97    }
98}
99
100impl<N: Network> Ready<N> {
101    /// Returns `true` if the ready queue contains the specified `transmission ID`.
102    pub fn contains(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
103        self.transmissions.read().contains_key(&transmission_id.into())
104    }
105
106    /// Returns the transmission, given the specified `transmission ID`.
107    pub fn get(&self, transmission_id: impl Into<TransmissionID<N>>) -> Option<Transmission<N>> {
108        self.transmissions.read().get(&transmission_id.into()).cloned()
109    }
110
111    /// Inserts the specified (`transmission ID`, `transmission`) to the ready queue.
112    /// Returns `true` if the transmission is new, and was added to the ready queue.
113    pub fn insert(&self, transmission_id: impl Into<TransmissionID<N>>, transmission: Transmission<N>) -> bool {
114        let transmission_id = transmission_id.into();
115        // Insert the transmission ID.
116        let is_new = self.transmissions.write().insert(transmission_id, transmission).is_none();
117        // Return whether the transmission is new.
118        is_new
119    }
120
121    /// Removes up to the specified number of transmissions and returns them.
122    pub fn drain(&self, num_transmissions: usize) -> IndexMap<TransmissionID<N>, Transmission<N>> {
123        // Acquire the write lock.
124        let mut transmissions = self.transmissions.write();
125        // Determine the number of transmissions to drain.
126        let range = 0..transmissions.len().min(num_transmissions);
127        // Drain the transmission IDs.
128        transmissions.drain(range).collect::<IndexMap<_, _>>()
129    }
130
131    /// Clears all solutions from the ready queue.
132    pub(crate) fn clear_solutions(&self) {
133        // Acquire the write lock.
134        let mut transmissions = self.transmissions.write();
135        // Remove all solutions.
136        transmissions.retain(|id, _| !matches!(id, TransmissionID::Solution(..)));
137    }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use snarkvm::ledger::narwhal::Data;

    use ::bytes::Bytes;

    type CurrentNetwork = snarkvm::prelude::MainnetV0;

    /// Samples a random solution transmission ID.
    fn sample_solution_id(rng: &mut TestRng) -> TransmissionID<CurrentNetwork> {
        TransmissionID::Solution(
            rng.gen::<u64>().into(),
            rng.gen::<<CurrentNetwork as Network>::TransmissionChecksum>(),
        )
    }

    /// Samples a solution transmission backed by 512 random (fake) bytes.
    fn sample_solution(rng: &mut TestRng) -> Transmission<CurrentNetwork> {
        let bytes: Vec<u8> = (0..512).map(|_| rng.gen::<u8>()).collect();
        Transmission::Solution(Data::Buffer(Bytes::from(bytes)))
    }

    #[test]
    fn test_ready() {
        let rng = &mut TestRng::default();

        // Start with an empty ready queue.
        let ready = Ready::<CurrentNetwork>::new();

        // Sample three solution IDs, followed by three solution payloads.
        let ids = [sample_solution_id(rng), sample_solution_id(rng), sample_solution_id(rng)];
        let solutions = [sample_solution(rng), sample_solution(rng), sample_solution(rng)];

        // Every first-time insertion must be reported as new.
        for (id, solution) in ids.iter().zip(&solutions) {
            assert!(ready.insert(*id, solution.clone()));
        }

        // All three entries must be queued.
        assert_eq!(ready.num_transmissions(), 3);

        // The queue must report exactly the inserted IDs, in insertion order.
        let expected_ids = ids.iter().copied().collect::<IndexSet<_>>();
        assert_eq!(ready.transmission_ids(), expected_ids);
        for id in &expected_ids {
            assert!(ready.contains(*id));
        }

        // An ID that was never inserted must not be found.
        let unknown_id = sample_solution_id(rng);
        assert!(!ready.contains(unknown_id));

        // Lookups must return the stored payloads, and nothing for the unknown ID.
        for (id, solution) in ids.iter().zip(&solutions) {
            assert_eq!(ready.get(*id), Some(solution.clone()));
        }
        assert_eq!(ready.get(unknown_id), None);

        // Take everything out of the queue.
        let drained = ready.drain(3);

        // The queue must now be empty.
        assert!(ready.is_empty());
        assert_eq!(ready.transmission_ids(), IndexSet::new());

        // The drained entries must match what was inserted, in the same order.
        let expected = ids.iter().copied().zip(solutions.iter().cloned()).collect::<IndexMap<_, _>>();
        assert_eq!(drained, expected);
    }

    #[test]
    fn test_ready_duplicate() {
        use rand::RngCore;
        let rng = &mut TestRng::default();

        // Fill a 512-byte buffer with random fake bytes.
        let mut buffer = vec![0u8; 512];
        rng.fill_bytes(&mut buffer);
        let payload = Data::Buffer(Bytes::from(buffer));

        // Start with an empty ready queue.
        let ready = Ready::<CurrentNetwork>::new();

        // Sample a single solution ID and wrap the payload as a solution.
        let solution_id = sample_solution_id(rng);
        let solution = Transmission::Solution(payload);

        // The first insertion is new; repeating it with the same ID is not.
        assert!(ready.insert(solution_id, solution.clone()));
        assert!(!ready.insert(solution_id, solution));

        // Only one entry may be present.
        assert_eq!(ready.num_transmissions(), 1);
    }
}