snarkos_node_bft/helpers/
channels.rs

// Copyright 2024 Aleo Network Foundation
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

16use crate::events::{
17    BatchPropose,
18    BatchSignature,
19    CertificateRequest,
20    CertificateResponse,
21    TransmissionRequest,
22    TransmissionResponse,
23};
24use snarkos_node_sync::locators::BlockLocators;
25use snarkvm::{
26    console::network::*,
27    ledger::{
28        block::{Block, Transaction},
29        narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
30        puzzle::{Solution, SolutionID},
31    },
32    prelude::Result,
33};
34
35use indexmap::IndexMap;
36use std::net::SocketAddr;
37use tokio::sync::{mpsc, oneshot};
38
/// Capacity passed to every bounded `mpsc::channel` created by the
/// `init_*_channels` functions in this file.
const MAX_CHANNEL_SIZE: usize = 8_192;
40
41#[derive(Debug)]
42pub struct ConsensusSender<N: Network> {
43    pub tx_consensus_subdag:
44        mpsc::Sender<(Subdag<N>, IndexMap<TransmissionID<N>, Transmission<N>>, oneshot::Sender<Result<()>>)>,
45}
46
47#[derive(Debug)]
48pub struct ConsensusReceiver<N: Network> {
49    pub rx_consensus_subdag:
50        mpsc::Receiver<(Subdag<N>, IndexMap<TransmissionID<N>, Transmission<N>>, oneshot::Sender<Result<()>>)>,
51}
52
53/// Initializes the consensus channels.
54pub fn init_consensus_channels<N: Network>() -> (ConsensusSender<N>, ConsensusReceiver<N>) {
55    let (tx_consensus_subdag, rx_consensus_subdag) = mpsc::channel(MAX_CHANNEL_SIZE);
56
57    let sender = ConsensusSender { tx_consensus_subdag };
58    let receiver = ConsensusReceiver { rx_consensus_subdag };
59
60    (sender, receiver)
61}
62
63#[derive(Clone, Debug)]
64pub struct BFTSender<N: Network> {
65    pub tx_primary_round: mpsc::Sender<(u64, oneshot::Sender<bool>)>,
66    pub tx_primary_certificate: mpsc::Sender<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
67    pub tx_sync_bft_dag_at_bootup: mpsc::Sender<Vec<BatchCertificate<N>>>,
68    pub tx_sync_bft: mpsc::Sender<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
69}
70
71impl<N: Network> BFTSender<N> {
72    /// Sends the current round to the BFT.
73    pub async fn send_primary_round_to_bft(&self, current_round: u64) -> Result<bool> {
74        // Initialize a callback sender and receiver.
75        let (callback_sender, callback_receiver) = oneshot::channel();
76        // Send the current round to the BFT.
77        self.tx_primary_round.send((current_round, callback_sender)).await?;
78        // Await the callback to continue.
79        Ok(callback_receiver.await?)
80    }
81
82    /// Sends the batch certificate to the BFT.
83    pub async fn send_primary_certificate_to_bft(&self, certificate: BatchCertificate<N>) -> Result<()> {
84        // Initialize a callback sender and receiver.
85        let (callback_sender, callback_receiver) = oneshot::channel();
86        // Send the certificate to the BFT.
87        self.tx_primary_certificate.send((certificate, callback_sender)).await?;
88        // Await the callback to continue.
89        callback_receiver.await?
90    }
91
92    /// Sends the batch certificates to the BFT for syncing.
93    pub async fn send_sync_bft(&self, certificate: BatchCertificate<N>) -> Result<()> {
94        // Initialize a callback sender and receiver.
95        let (callback_sender, callback_receiver) = oneshot::channel();
96        // Send the certificate to the BFT for syncing.
97        self.tx_sync_bft.send((certificate, callback_sender)).await?;
98        // Await the callback to continue.
99        callback_receiver.await?
100    }
101}
102
103#[derive(Debug)]
104pub struct BFTReceiver<N: Network> {
105    pub rx_primary_round: mpsc::Receiver<(u64, oneshot::Sender<bool>)>,
106    pub rx_primary_certificate: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
107    pub rx_sync_bft_dag_at_bootup: mpsc::Receiver<Vec<BatchCertificate<N>>>,
108    pub rx_sync_bft: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
109}
110
111/// Initializes the BFT channels.
112pub fn init_bft_channels<N: Network>() -> (BFTSender<N>, BFTReceiver<N>) {
113    let (tx_primary_round, rx_primary_round) = mpsc::channel(MAX_CHANNEL_SIZE);
114    let (tx_primary_certificate, rx_primary_certificate) = mpsc::channel(MAX_CHANNEL_SIZE);
115    let (tx_sync_bft_dag_at_bootup, rx_sync_bft_dag_at_bootup) = mpsc::channel(MAX_CHANNEL_SIZE);
116    let (tx_sync_bft, rx_sync_bft) = mpsc::channel(MAX_CHANNEL_SIZE);
117
118    let sender = BFTSender { tx_primary_round, tx_primary_certificate, tx_sync_bft_dag_at_bootup, tx_sync_bft };
119    let receiver = BFTReceiver { rx_primary_round, rx_primary_certificate, rx_sync_bft_dag_at_bootup, rx_sync_bft };
120
121    (sender, receiver)
122}
123
124#[derive(Clone, Debug)]
125pub struct PrimarySender<N: Network> {
126    pub tx_batch_propose: mpsc::Sender<(SocketAddr, BatchPropose<N>)>,
127    pub tx_batch_signature: mpsc::Sender<(SocketAddr, BatchSignature<N>)>,
128    pub tx_batch_certified: mpsc::Sender<(SocketAddr, Data<BatchCertificate<N>>)>,
129    pub tx_primary_ping: mpsc::Sender<(SocketAddr, Data<BatchCertificate<N>>)>,
130    pub tx_unconfirmed_solution: mpsc::Sender<(SolutionID<N>, Data<Solution<N>>, oneshot::Sender<Result<()>>)>,
131    pub tx_unconfirmed_transaction: mpsc::Sender<(N::TransactionID, Data<Transaction<N>>, oneshot::Sender<Result<()>>)>,
132}
133
134impl<N: Network> PrimarySender<N> {
135    /// Sends the unconfirmed solution to the primary.
136    pub async fn send_unconfirmed_solution(
137        &self,
138        solution_id: SolutionID<N>,
139        solution: Data<Solution<N>>,
140    ) -> Result<()> {
141        // Initialize a callback sender and receiver.
142        let (callback_sender, callback_receiver) = oneshot::channel();
143        // Send the unconfirmed solution to the primary.
144        self.tx_unconfirmed_solution.send((solution_id, solution, callback_sender)).await?;
145        // Await the callback to continue.
146        callback_receiver.await?
147    }
148
149    /// Sends the unconfirmed transaction to the primary.
150    pub async fn send_unconfirmed_transaction(
151        &self,
152        transaction_id: N::TransactionID,
153        transaction: Data<Transaction<N>>,
154    ) -> Result<()> {
155        // Initialize a callback sender and receiver.
156        let (callback_sender, callback_receiver) = oneshot::channel();
157        // Send the unconfirmed transaction to the primary.
158        self.tx_unconfirmed_transaction.send((transaction_id, transaction, callback_sender)).await?;
159        // Await the callback to continue.
160        callback_receiver.await?
161    }
162}
163
164#[derive(Debug)]
165pub struct PrimaryReceiver<N: Network> {
166    pub rx_batch_propose: mpsc::Receiver<(SocketAddr, BatchPropose<N>)>,
167    pub rx_batch_signature: mpsc::Receiver<(SocketAddr, BatchSignature<N>)>,
168    pub rx_batch_certified: mpsc::Receiver<(SocketAddr, Data<BatchCertificate<N>>)>,
169    pub rx_primary_ping: mpsc::Receiver<(SocketAddr, Data<BatchCertificate<N>>)>,
170    pub rx_unconfirmed_solution: mpsc::Receiver<(SolutionID<N>, Data<Solution<N>>, oneshot::Sender<Result<()>>)>,
171    pub rx_unconfirmed_transaction:
172        mpsc::Receiver<(N::TransactionID, Data<Transaction<N>>, oneshot::Sender<Result<()>>)>,
173}
174
175/// Initializes the primary channels.
176pub fn init_primary_channels<N: Network>() -> (PrimarySender<N>, PrimaryReceiver<N>) {
177    let (tx_batch_propose, rx_batch_propose) = mpsc::channel(MAX_CHANNEL_SIZE);
178    let (tx_batch_signature, rx_batch_signature) = mpsc::channel(MAX_CHANNEL_SIZE);
179    let (tx_batch_certified, rx_batch_certified) = mpsc::channel(MAX_CHANNEL_SIZE);
180    let (tx_primary_ping, rx_primary_ping) = mpsc::channel(MAX_CHANNEL_SIZE);
181    let (tx_unconfirmed_solution, rx_unconfirmed_solution) = mpsc::channel(MAX_CHANNEL_SIZE);
182    let (tx_unconfirmed_transaction, rx_unconfirmed_transaction) = mpsc::channel(MAX_CHANNEL_SIZE);
183
184    let sender = PrimarySender {
185        tx_batch_propose,
186        tx_batch_signature,
187        tx_batch_certified,
188        tx_primary_ping,
189        tx_unconfirmed_solution,
190        tx_unconfirmed_transaction,
191    };
192    let receiver = PrimaryReceiver {
193        rx_batch_propose,
194        rx_batch_signature,
195        rx_batch_certified,
196        rx_primary_ping,
197        rx_unconfirmed_solution,
198        rx_unconfirmed_transaction,
199    };
200
201    (sender, receiver)
202}
203
204#[derive(Debug)]
205pub struct WorkerSender<N: Network> {
206    pub tx_worker_ping: mpsc::Sender<(SocketAddr, TransmissionID<N>)>,
207    pub tx_transmission_request: mpsc::Sender<(SocketAddr, TransmissionRequest<N>)>,
208    pub tx_transmission_response: mpsc::Sender<(SocketAddr, TransmissionResponse<N>)>,
209}
210
211#[derive(Debug)]
212pub struct WorkerReceiver<N: Network> {
213    pub rx_worker_ping: mpsc::Receiver<(SocketAddr, TransmissionID<N>)>,
214    pub rx_transmission_request: mpsc::Receiver<(SocketAddr, TransmissionRequest<N>)>,
215    pub rx_transmission_response: mpsc::Receiver<(SocketAddr, TransmissionResponse<N>)>,
216}
217
218/// Initializes the worker channels.
219pub fn init_worker_channels<N: Network>() -> (WorkerSender<N>, WorkerReceiver<N>) {
220    let (tx_worker_ping, rx_worker_ping) = mpsc::channel(MAX_CHANNEL_SIZE);
221    let (tx_transmission_request, rx_transmission_request) = mpsc::channel(MAX_CHANNEL_SIZE);
222    let (tx_transmission_response, rx_transmission_response) = mpsc::channel(MAX_CHANNEL_SIZE);
223
224    let sender = WorkerSender { tx_worker_ping, tx_transmission_request, tx_transmission_response };
225    let receiver = WorkerReceiver { rx_worker_ping, rx_transmission_request, rx_transmission_response };
226
227    (sender, receiver)
228}
229
230#[derive(Debug)]
231pub struct SyncSender<N: Network> {
232    pub tx_block_sync_advance_with_sync_blocks: mpsc::Sender<(SocketAddr, Vec<Block<N>>, oneshot::Sender<Result<()>>)>,
233    pub tx_block_sync_remove_peer: mpsc::Sender<SocketAddr>,
234    pub tx_block_sync_update_peer_locators: mpsc::Sender<(SocketAddr, BlockLocators<N>, oneshot::Sender<Result<()>>)>,
235    pub tx_certificate_request: mpsc::Sender<(SocketAddr, CertificateRequest<N>)>,
236    pub tx_certificate_response: mpsc::Sender<(SocketAddr, CertificateResponse<N>)>,
237}
238
239impl<N: Network> SyncSender<N> {
240    /// Sends the request to update the peer locators.
241    pub async fn update_peer_locators(&self, peer_ip: SocketAddr, block_locators: BlockLocators<N>) -> Result<()> {
242        // Initialize a callback sender and receiver.
243        let (callback_sender, callback_receiver) = oneshot::channel();
244        // Send the request to update the peer locators.
245        self.tx_block_sync_update_peer_locators.send((peer_ip, block_locators, callback_sender)).await?;
246        // Await the callback to continue.
247        callback_receiver.await?
248    }
249
250    /// Sends the request to advance with sync blocks.
251    pub async fn advance_with_sync_blocks(&self, peer_ip: SocketAddr, blocks: Vec<Block<N>>) -> Result<()> {
252        // Initialize a callback sender and receiver.
253        let (callback_sender, callback_receiver) = oneshot::channel();
254        // Send the request to advance with sync blocks.
255        self.tx_block_sync_advance_with_sync_blocks.send((peer_ip, blocks, callback_sender)).await?;
256        // Await the callback to continue.
257        callback_receiver.await?
258    }
259}
260
261#[derive(Debug)]
262pub struct SyncReceiver<N: Network> {
263    pub rx_block_sync_advance_with_sync_blocks:
264        mpsc::Receiver<(SocketAddr, Vec<Block<N>>, oneshot::Sender<Result<()>>)>,
265    pub rx_block_sync_remove_peer: mpsc::Receiver<SocketAddr>,
266    pub rx_block_sync_update_peer_locators: mpsc::Receiver<(SocketAddr, BlockLocators<N>, oneshot::Sender<Result<()>>)>,
267    pub rx_certificate_request: mpsc::Receiver<(SocketAddr, CertificateRequest<N>)>,
268    pub rx_certificate_response: mpsc::Receiver<(SocketAddr, CertificateResponse<N>)>,
269}
270
271/// Initializes the sync channels.
272pub fn init_sync_channels<N: Network>() -> (SyncSender<N>, SyncReceiver<N>) {
273    let (tx_block_sync_advance_with_sync_blocks, rx_block_sync_advance_with_sync_blocks) =
274        mpsc::channel(MAX_CHANNEL_SIZE);
275    let (tx_block_sync_remove_peer, rx_block_sync_remove_peer) = mpsc::channel(MAX_CHANNEL_SIZE);
276    let (tx_block_sync_update_peer_locators, rx_block_sync_update_peer_locators) = mpsc::channel(MAX_CHANNEL_SIZE);
277    let (tx_certificate_request, rx_certificate_request) = mpsc::channel(MAX_CHANNEL_SIZE);
278    let (tx_certificate_response, rx_certificate_response) = mpsc::channel(MAX_CHANNEL_SIZE);
279
280    let sender = SyncSender {
281        tx_block_sync_advance_with_sync_blocks,
282        tx_block_sync_remove_peer,
283        tx_block_sync_update_peer_locators,
284        tx_certificate_request,
285        tx_certificate_response,
286    };
287    let receiver = SyncReceiver {
288        rx_block_sync_advance_with_sync_blocks,
289        rx_block_sync_remove_peer,
290        rx_block_sync_update_peer_locators,
291        rx_certificate_request,
292        rx_certificate_response,
293    };
294
295    (sender, receiver)
296}