Skip to main content

snarkos_node_bft/helpers/
channels.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::events::{
17    BatchPropose,
18    BatchSignature,
19    CertificateRequest,
20    CertificateResponse,
21    TransmissionRequest,
22    TransmissionResponse,
23};
24use snarkos_node_sync::{InsertBlockResponseError, locators::BlockLocators};
25use snarkvm::{
26    console::network::*,
27    ledger::{
28        block::{Block, Transaction},
29        narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
30        puzzle::{Solution, SolutionID},
31    },
32    prelude::Result,
33};
34
35use indexmap::IndexMap;
36use std::net::SocketAddr;
37use tokio::sync::{mpsc, oneshot};
38
39const MAX_CHANNEL_SIZE: usize = 8192;
40
41#[derive(Debug)]
42pub struct ConsensusSender<N: Network> {
43    pub tx_consensus_subdag:
44        mpsc::Sender<(Subdag<N>, IndexMap<TransmissionID<N>, Transmission<N>>, oneshot::Sender<Result<()>>)>,
45}
46
47#[derive(Debug)]
48pub struct ConsensusReceiver<N: Network> {
49    pub rx_consensus_subdag:
50        mpsc::Receiver<(Subdag<N>, IndexMap<TransmissionID<N>, Transmission<N>>, oneshot::Sender<Result<()>>)>,
51}
52
53/// Initializes the consensus channels.
54pub fn init_consensus_channels<N: Network>() -> (ConsensusSender<N>, ConsensusReceiver<N>) {
55    let (tx_consensus_subdag, rx_consensus_subdag) = mpsc::channel(MAX_CHANNEL_SIZE);
56
57    let sender = ConsensusSender { tx_consensus_subdag };
58    let receiver = ConsensusReceiver { rx_consensus_subdag };
59
60    (sender, receiver)
61}
62
63#[derive(Clone, Debug)]
64pub struct PrimarySender<N: Network> {
65    pub tx_batch_propose: mpsc::Sender<(SocketAddr, BatchPropose<N>)>,
66    pub tx_batch_signature: mpsc::Sender<(SocketAddr, BatchSignature<N>)>,
67    pub tx_batch_certified: mpsc::Sender<(SocketAddr, Data<BatchCertificate<N>>)>,
68    pub tx_primary_ping: mpsc::Sender<(SocketAddr, Data<BatchCertificate<N>>)>,
69    pub tx_unconfirmed_solution: mpsc::Sender<(SolutionID<N>, Data<Solution<N>>, oneshot::Sender<Result<bool>>)>,
70    pub tx_unconfirmed_transaction:
71        mpsc::Sender<(N::TransactionID, Data<Transaction<N>>, oneshot::Sender<Result<bool>>)>,
72}
73
74impl<N: Network> PrimarySender<N> {
75    /// Sends the unconfirmed solution to the primary.
76    ///
77    /// # Returns
78    /// - `Ok(true)` if the solution was added to the ready queue.
79    /// - `Ok(false)` if the solution was valid but already exists in the ready queue.
80    /// - `Err(anyhow::Error)` if the solution was invalid.
81    pub async fn send_unconfirmed_solution(
82        &self,
83        solution_id: SolutionID<N>,
84        solution: Data<Solution<N>>,
85    ) -> Result<bool> {
86        // Initialize a callback sender and receiver.
87        let (callback_sender, callback_receiver) = oneshot::channel();
88        // Send the unconfirmed solution to the primary.
89        self.tx_unconfirmed_solution.send((solution_id, solution, callback_sender)).await?;
90        // Await the callback to continue.
91        callback_receiver.await?
92    }
93
94    /// Sends the unconfirmed transaction to the primary.
95    ///
96    /// # Returns
97    /// - `Ok(true)` if the transaction was added to the ready queue.
98    /// - `Ok(false)` if the transaction was valid but already exists in the ready queue.
99    /// - `Err(anyhow::Error)` if the transaction was invalid.
100    pub async fn send_unconfirmed_transaction(
101        &self,
102        transaction_id: N::TransactionID,
103        transaction: Data<Transaction<N>>,
104    ) -> Result<bool> {
105        // Initialize a callback sender and receiver.
106        let (callback_sender, callback_receiver) = oneshot::channel();
107        // Send the unconfirmed transaction to the primary.
108        self.tx_unconfirmed_transaction.send((transaction_id, transaction, callback_sender)).await?;
109        // Await the callback to continue.
110        callback_receiver.await?
111    }
112}
113
114#[derive(Debug)]
115pub struct PrimaryReceiver<N: Network> {
116    pub rx_batch_propose: mpsc::Receiver<(SocketAddr, BatchPropose<N>)>,
117    pub rx_batch_signature: mpsc::Receiver<(SocketAddr, BatchSignature<N>)>,
118    pub rx_batch_certified: mpsc::Receiver<(SocketAddr, Data<BatchCertificate<N>>)>,
119    pub rx_primary_ping: mpsc::Receiver<(SocketAddr, Data<BatchCertificate<N>>)>,
120    pub rx_unconfirmed_solution: mpsc::Receiver<(SolutionID<N>, Data<Solution<N>>, oneshot::Sender<Result<bool>>)>,
121    pub rx_unconfirmed_transaction:
122        mpsc::Receiver<(N::TransactionID, Data<Transaction<N>>, oneshot::Sender<Result<bool>>)>,
123}
124
125/// Initializes the primary channels.
126pub fn init_primary_channels<N: Network>() -> (PrimarySender<N>, PrimaryReceiver<N>) {
127    let (tx_batch_propose, rx_batch_propose) = mpsc::channel(MAX_CHANNEL_SIZE);
128    let (tx_batch_signature, rx_batch_signature) = mpsc::channel(MAX_CHANNEL_SIZE);
129    let (tx_batch_certified, rx_batch_certified) = mpsc::channel(MAX_CHANNEL_SIZE);
130    let (tx_primary_ping, rx_primary_ping) = mpsc::channel(MAX_CHANNEL_SIZE);
131    let (tx_unconfirmed_solution, rx_unconfirmed_solution) = mpsc::channel(MAX_CHANNEL_SIZE);
132    let (tx_unconfirmed_transaction, rx_unconfirmed_transaction) = mpsc::channel(MAX_CHANNEL_SIZE);
133
134    let sender = PrimarySender {
135        tx_batch_propose,
136        tx_batch_signature,
137        tx_batch_certified,
138        tx_primary_ping,
139        tx_unconfirmed_solution,
140        tx_unconfirmed_transaction,
141    };
142    let receiver = PrimaryReceiver {
143        rx_batch_propose,
144        rx_batch_signature,
145        rx_batch_certified,
146        rx_primary_ping,
147        rx_unconfirmed_solution,
148        rx_unconfirmed_transaction,
149    };
150
151    (sender, receiver)
152}
153
154#[derive(Debug)]
155pub struct WorkerSender<N: Network> {
156    pub tx_worker_ping: mpsc::Sender<(SocketAddr, TransmissionID<N>)>,
157    pub tx_transmission_request: mpsc::Sender<(SocketAddr, TransmissionRequest<N>)>,
158    pub tx_transmission_response: mpsc::Sender<(SocketAddr, TransmissionResponse<N>)>,
159}
160
161#[derive(Debug)]
162pub struct WorkerReceiver<N: Network> {
163    pub rx_worker_ping: mpsc::Receiver<(SocketAddr, TransmissionID<N>)>,
164    pub rx_transmission_request: mpsc::Receiver<(SocketAddr, TransmissionRequest<N>)>,
165    pub rx_transmission_response: mpsc::Receiver<(SocketAddr, TransmissionResponse<N>)>,
166}
167
168/// Initializes the worker channels.
169pub fn init_worker_channels<N: Network>() -> (WorkerSender<N>, WorkerReceiver<N>) {
170    let (tx_worker_ping, rx_worker_ping) = mpsc::channel(MAX_CHANNEL_SIZE);
171    let (tx_transmission_request, rx_transmission_request) = mpsc::channel(MAX_CHANNEL_SIZE);
172    let (tx_transmission_response, rx_transmission_response) = mpsc::channel(MAX_CHANNEL_SIZE);
173
174    let sender = WorkerSender { tx_worker_ping, tx_transmission_request, tx_transmission_response };
175    let receiver = WorkerReceiver { rx_worker_ping, rx_transmission_request, rx_transmission_response };
176
177    (sender, receiver)
178}
179
180#[derive(Debug)]
181pub struct SyncSender<N: Network> {
182    pub tx_block_sync_insert_block_response: mpsc::Sender<(
183        SocketAddr,
184        Vec<Block<N>>,
185        Option<ConsensusVersion>,
186        oneshot::Sender<Result<(), InsertBlockResponseError<N>>>,
187    )>,
188    pub tx_block_sync_remove_peer: mpsc::Sender<SocketAddr>,
189    pub tx_block_sync_update_peer_locators: mpsc::Sender<(SocketAddr, BlockLocators<N>, oneshot::Sender<Result<()>>)>,
190    pub tx_certificate_request: mpsc::Sender<(SocketAddr, CertificateRequest<N>)>,
191    pub tx_certificate_response: mpsc::Sender<(SocketAddr, CertificateResponse<N>)>,
192}
193
194impl<N: Network> SyncSender<N> {
195    /// Sends the request to update the peer locators.
196    pub async fn update_peer_locators(&self, peer_ip: SocketAddr, block_locators: BlockLocators<N>) -> Result<()> {
197        // Initialize a callback sender and receiver.
198        let (callback_sender, callback_receiver) = oneshot::channel();
199        // Send the request to update the peer locators.
200        // This `tx_block_sync_update_peer_locators.send()` call
201        // causes the `rx_block_sync_update_peer_locators.recv()` call
202        // in one of the loops in [`Sync::run()`] to return.
203        self.tx_block_sync_update_peer_locators.send((peer_ip, block_locators, callback_sender)).await?;
204        // Await the callback to continue.
205        callback_receiver.await?
206    }
207
208    /// Sends the request to insert a new block response.
209    pub async fn insert_block_response(
210        &self,
211        peer_ip: SocketAddr,
212        blocks: Vec<Block<N>>,
213        latest_consensus_version: Option<ConsensusVersion>,
214    ) -> Result<(), InsertBlockResponseError<N>> {
215        // Initialize a callback sender and receiver.
216        let (callback_sender, callback_receiver) = oneshot::channel();
217        // Send the request to advance with sync blocks.
218        // This `tx_block_sync_advance_with_sync_blocks.send()` call
219        // causes the `rx_block_sync_advance_with_sync_blocks.recv()` call
220        // in one of the loops in [`Sync::run()`] to return.
221        if let Err(err) = self
222            .tx_block_sync_insert_block_response
223            .send((peer_ip, blocks, latest_consensus_version, callback_sender))
224            .await
225        {
226            return Err(anyhow!("Failed to send block response - {err}").into());
227        }
228
229        // Await the callback to continue.
230        match callback_receiver.await {
231            Ok(result) => result,
232            Err(err) => Err(anyhow!("Failed to wait for block response insertion - {err}").into()),
233        }
234    }
235}
236
237#[derive(Debug)]
238pub struct SyncReceiver<N: Network> {
239    pub rx_block_sync_insert_block_response: mpsc::Receiver<(
240        SocketAddr,
241        Vec<Block<N>>,
242        Option<ConsensusVersion>,
243        oneshot::Sender<Result<(), InsertBlockResponseError<N>>>,
244    )>,
245    pub rx_block_sync_remove_peer: mpsc::Receiver<SocketAddr>,
246    pub rx_block_sync_update_peer_locators: mpsc::Receiver<(SocketAddr, BlockLocators<N>, oneshot::Sender<Result<()>>)>,
247    pub rx_certificate_request: mpsc::Receiver<(SocketAddr, CertificateRequest<N>)>,
248    pub rx_certificate_response: mpsc::Receiver<(SocketAddr, CertificateResponse<N>)>,
249}
250
251/// Initializes the sync channels.
252pub fn init_sync_channels<N: Network>() -> (SyncSender<N>, SyncReceiver<N>) {
253    let (tx_block_sync_insert_block_response, rx_block_sync_insert_block_response) = mpsc::channel(MAX_CHANNEL_SIZE);
254    let (tx_block_sync_remove_peer, rx_block_sync_remove_peer) = mpsc::channel(MAX_CHANNEL_SIZE);
255    let (tx_block_sync_update_peer_locators, rx_block_sync_update_peer_locators) = mpsc::channel(MAX_CHANNEL_SIZE);
256    let (tx_certificate_request, rx_certificate_request) = mpsc::channel(MAX_CHANNEL_SIZE);
257    let (tx_certificate_response, rx_certificate_response) = mpsc::channel(MAX_CHANNEL_SIZE);
258
259    let sender = SyncSender {
260        tx_block_sync_insert_block_response,
261        tx_block_sync_remove_peer,
262        tx_block_sync_update_peer_locators,
263        tx_certificate_request,
264        tx_certificate_response,
265    };
266    let receiver = SyncReceiver {
267        rx_block_sync_insert_block_response,
268        rx_block_sync_remove_peer,
269        rx_block_sync_update_peer_locators,
270        rx_certificate_request,
271        rx_certificate_response,
272    };
273
274    (sender, receiver)
275}