1use crate::events::{
17 BatchPropose,
18 BatchSignature,
19 CertificateRequest,
20 CertificateResponse,
21 TransmissionRequest,
22 TransmissionResponse,
23};
24use snarkos_node_sync::locators::BlockLocators;
25use snarkvm::{
26 console::network::*,
27 ledger::{
28 block::{Block, Transaction},
29 narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
30 puzzle::{Solution, SolutionID},
31 },
32 prelude::Result,
33};
34
35use indexmap::IndexMap;
36use std::net::SocketAddr;
37use tokio::sync::{mpsc, oneshot};
38
39const MAX_CHANNEL_SIZE: usize = 8192;
40
41#[derive(Debug)]
42pub struct ConsensusSender<N: Network> {
43 pub tx_consensus_subdag:
44 mpsc::Sender<(Subdag<N>, IndexMap<TransmissionID<N>, Transmission<N>>, oneshot::Sender<Result<()>>)>,
45}
46
47#[derive(Debug)]
48pub struct ConsensusReceiver<N: Network> {
49 pub rx_consensus_subdag:
50 mpsc::Receiver<(Subdag<N>, IndexMap<TransmissionID<N>, Transmission<N>>, oneshot::Sender<Result<()>>)>,
51}
52
53pub fn init_consensus_channels<N: Network>() -> (ConsensusSender<N>, ConsensusReceiver<N>) {
55 let (tx_consensus_subdag, rx_consensus_subdag) = mpsc::channel(MAX_CHANNEL_SIZE);
56
57 let sender = ConsensusSender { tx_consensus_subdag };
58 let receiver = ConsensusReceiver { rx_consensus_subdag };
59
60 (sender, receiver)
61}
62
63#[derive(Clone, Debug)]
64pub struct BFTSender<N: Network> {
65 pub tx_primary_round: mpsc::Sender<(u64, oneshot::Sender<bool>)>,
66 pub tx_primary_certificate: mpsc::Sender<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
67 pub tx_sync_bft_dag_at_bootup: mpsc::Sender<Vec<BatchCertificate<N>>>,
68 pub tx_sync_bft: mpsc::Sender<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
69}
70
71impl<N: Network> BFTSender<N> {
72 pub async fn send_primary_round_to_bft(&self, current_round: u64) -> Result<bool> {
74 let (callback_sender, callback_receiver) = oneshot::channel();
76 self.tx_primary_round.send((current_round, callback_sender)).await?;
78 Ok(callback_receiver.await?)
80 }
81
82 pub async fn send_primary_certificate_to_bft(&self, certificate: BatchCertificate<N>) -> Result<()> {
84 let (callback_sender, callback_receiver) = oneshot::channel();
86 self.tx_primary_certificate.send((certificate, callback_sender)).await?;
88 callback_receiver.await?
90 }
91
92 pub async fn send_sync_bft(&self, certificate: BatchCertificate<N>) -> Result<()> {
94 let (callback_sender, callback_receiver) = oneshot::channel();
96 self.tx_sync_bft.send((certificate, callback_sender)).await?;
98 callback_receiver.await?
100 }
101}
102
103#[derive(Debug)]
104pub struct BFTReceiver<N: Network> {
105 pub rx_primary_round: mpsc::Receiver<(u64, oneshot::Sender<bool>)>,
106 pub rx_primary_certificate: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
107 pub rx_sync_bft_dag_at_bootup: mpsc::Receiver<Vec<BatchCertificate<N>>>,
108 pub rx_sync_bft: mpsc::Receiver<(BatchCertificate<N>, oneshot::Sender<Result<()>>)>,
109}
110
111pub fn init_bft_channels<N: Network>() -> (BFTSender<N>, BFTReceiver<N>) {
113 let (tx_primary_round, rx_primary_round) = mpsc::channel(MAX_CHANNEL_SIZE);
114 let (tx_primary_certificate, rx_primary_certificate) = mpsc::channel(MAX_CHANNEL_SIZE);
115 let (tx_sync_bft_dag_at_bootup, rx_sync_bft_dag_at_bootup) = mpsc::channel(MAX_CHANNEL_SIZE);
116 let (tx_sync_bft, rx_sync_bft) = mpsc::channel(MAX_CHANNEL_SIZE);
117
118 let sender = BFTSender { tx_primary_round, tx_primary_certificate, tx_sync_bft_dag_at_bootup, tx_sync_bft };
119 let receiver = BFTReceiver { rx_primary_round, rx_primary_certificate, rx_sync_bft_dag_at_bootup, rx_sync_bft };
120
121 (sender, receiver)
122}
123
124#[derive(Clone, Debug)]
125pub struct PrimarySender<N: Network> {
126 pub tx_batch_propose: mpsc::Sender<(SocketAddr, BatchPropose<N>)>,
127 pub tx_batch_signature: mpsc::Sender<(SocketAddr, BatchSignature<N>)>,
128 pub tx_batch_certified: mpsc::Sender<(SocketAddr, Data<BatchCertificate<N>>)>,
129 pub tx_primary_ping: mpsc::Sender<(SocketAddr, Data<BatchCertificate<N>>)>,
130 pub tx_unconfirmed_solution: mpsc::Sender<(SolutionID<N>, Data<Solution<N>>, oneshot::Sender<Result<()>>)>,
131 pub tx_unconfirmed_transaction: mpsc::Sender<(N::TransactionID, Data<Transaction<N>>, oneshot::Sender<Result<()>>)>,
132}
133
134impl<N: Network> PrimarySender<N> {
135 pub async fn send_unconfirmed_solution(
137 &self,
138 solution_id: SolutionID<N>,
139 solution: Data<Solution<N>>,
140 ) -> Result<()> {
141 let (callback_sender, callback_receiver) = oneshot::channel();
143 self.tx_unconfirmed_solution.send((solution_id, solution, callback_sender)).await?;
145 callback_receiver.await?
147 }
148
149 pub async fn send_unconfirmed_transaction(
151 &self,
152 transaction_id: N::TransactionID,
153 transaction: Data<Transaction<N>>,
154 ) -> Result<()> {
155 let (callback_sender, callback_receiver) = oneshot::channel();
157 self.tx_unconfirmed_transaction.send((transaction_id, transaction, callback_sender)).await?;
159 callback_receiver.await?
161 }
162}
163
164#[derive(Debug)]
165pub struct PrimaryReceiver<N: Network> {
166 pub rx_batch_propose: mpsc::Receiver<(SocketAddr, BatchPropose<N>)>,
167 pub rx_batch_signature: mpsc::Receiver<(SocketAddr, BatchSignature<N>)>,
168 pub rx_batch_certified: mpsc::Receiver<(SocketAddr, Data<BatchCertificate<N>>)>,
169 pub rx_primary_ping: mpsc::Receiver<(SocketAddr, Data<BatchCertificate<N>>)>,
170 pub rx_unconfirmed_solution: mpsc::Receiver<(SolutionID<N>, Data<Solution<N>>, oneshot::Sender<Result<()>>)>,
171 pub rx_unconfirmed_transaction:
172 mpsc::Receiver<(N::TransactionID, Data<Transaction<N>>, oneshot::Sender<Result<()>>)>,
173}
174
175pub fn init_primary_channels<N: Network>() -> (PrimarySender<N>, PrimaryReceiver<N>) {
177 let (tx_batch_propose, rx_batch_propose) = mpsc::channel(MAX_CHANNEL_SIZE);
178 let (tx_batch_signature, rx_batch_signature) = mpsc::channel(MAX_CHANNEL_SIZE);
179 let (tx_batch_certified, rx_batch_certified) = mpsc::channel(MAX_CHANNEL_SIZE);
180 let (tx_primary_ping, rx_primary_ping) = mpsc::channel(MAX_CHANNEL_SIZE);
181 let (tx_unconfirmed_solution, rx_unconfirmed_solution) = mpsc::channel(MAX_CHANNEL_SIZE);
182 let (tx_unconfirmed_transaction, rx_unconfirmed_transaction) = mpsc::channel(MAX_CHANNEL_SIZE);
183
184 let sender = PrimarySender {
185 tx_batch_propose,
186 tx_batch_signature,
187 tx_batch_certified,
188 tx_primary_ping,
189 tx_unconfirmed_solution,
190 tx_unconfirmed_transaction,
191 };
192 let receiver = PrimaryReceiver {
193 rx_batch_propose,
194 rx_batch_signature,
195 rx_batch_certified,
196 rx_primary_ping,
197 rx_unconfirmed_solution,
198 rx_unconfirmed_transaction,
199 };
200
201 (sender, receiver)
202}
203
204#[derive(Debug)]
205pub struct WorkerSender<N: Network> {
206 pub tx_worker_ping: mpsc::Sender<(SocketAddr, TransmissionID<N>)>,
207 pub tx_transmission_request: mpsc::Sender<(SocketAddr, TransmissionRequest<N>)>,
208 pub tx_transmission_response: mpsc::Sender<(SocketAddr, TransmissionResponse<N>)>,
209}
210
211#[derive(Debug)]
212pub struct WorkerReceiver<N: Network> {
213 pub rx_worker_ping: mpsc::Receiver<(SocketAddr, TransmissionID<N>)>,
214 pub rx_transmission_request: mpsc::Receiver<(SocketAddr, TransmissionRequest<N>)>,
215 pub rx_transmission_response: mpsc::Receiver<(SocketAddr, TransmissionResponse<N>)>,
216}
217
218pub fn init_worker_channels<N: Network>() -> (WorkerSender<N>, WorkerReceiver<N>) {
220 let (tx_worker_ping, rx_worker_ping) = mpsc::channel(MAX_CHANNEL_SIZE);
221 let (tx_transmission_request, rx_transmission_request) = mpsc::channel(MAX_CHANNEL_SIZE);
222 let (tx_transmission_response, rx_transmission_response) = mpsc::channel(MAX_CHANNEL_SIZE);
223
224 let sender = WorkerSender { tx_worker_ping, tx_transmission_request, tx_transmission_response };
225 let receiver = WorkerReceiver { rx_worker_ping, rx_transmission_request, rx_transmission_response };
226
227 (sender, receiver)
228}
229
230#[derive(Debug)]
231pub struct SyncSender<N: Network> {
232 pub tx_block_sync_advance_with_sync_blocks: mpsc::Sender<(SocketAddr, Vec<Block<N>>, oneshot::Sender<Result<()>>)>,
233 pub tx_block_sync_remove_peer: mpsc::Sender<SocketAddr>,
234 pub tx_block_sync_update_peer_locators: mpsc::Sender<(SocketAddr, BlockLocators<N>, oneshot::Sender<Result<()>>)>,
235 pub tx_certificate_request: mpsc::Sender<(SocketAddr, CertificateRequest<N>)>,
236 pub tx_certificate_response: mpsc::Sender<(SocketAddr, CertificateResponse<N>)>,
237}
238
239impl<N: Network> SyncSender<N> {
240 pub async fn update_peer_locators(&self, peer_ip: SocketAddr, block_locators: BlockLocators<N>) -> Result<()> {
242 let (callback_sender, callback_receiver) = oneshot::channel();
244 self.tx_block_sync_update_peer_locators.send((peer_ip, block_locators, callback_sender)).await?;
246 callback_receiver.await?
248 }
249
250 pub async fn advance_with_sync_blocks(&self, peer_ip: SocketAddr, blocks: Vec<Block<N>>) -> Result<()> {
252 let (callback_sender, callback_receiver) = oneshot::channel();
254 self.tx_block_sync_advance_with_sync_blocks.send((peer_ip, blocks, callback_sender)).await?;
256 callback_receiver.await?
258 }
259}
260
261#[derive(Debug)]
262pub struct SyncReceiver<N: Network> {
263 pub rx_block_sync_advance_with_sync_blocks:
264 mpsc::Receiver<(SocketAddr, Vec<Block<N>>, oneshot::Sender<Result<()>>)>,
265 pub rx_block_sync_remove_peer: mpsc::Receiver<SocketAddr>,
266 pub rx_block_sync_update_peer_locators: mpsc::Receiver<(SocketAddr, BlockLocators<N>, oneshot::Sender<Result<()>>)>,
267 pub rx_certificate_request: mpsc::Receiver<(SocketAddr, CertificateRequest<N>)>,
268 pub rx_certificate_response: mpsc::Receiver<(SocketAddr, CertificateResponse<N>)>,
269}
270
271pub fn init_sync_channels<N: Network>() -> (SyncSender<N>, SyncReceiver<N>) {
273 let (tx_block_sync_advance_with_sync_blocks, rx_block_sync_advance_with_sync_blocks) =
274 mpsc::channel(MAX_CHANNEL_SIZE);
275 let (tx_block_sync_remove_peer, rx_block_sync_remove_peer) = mpsc::channel(MAX_CHANNEL_SIZE);
276 let (tx_block_sync_update_peer_locators, rx_block_sync_update_peer_locators) = mpsc::channel(MAX_CHANNEL_SIZE);
277 let (tx_certificate_request, rx_certificate_request) = mpsc::channel(MAX_CHANNEL_SIZE);
278 let (tx_certificate_response, rx_certificate_response) = mpsc::channel(MAX_CHANNEL_SIZE);
279
280 let sender = SyncSender {
281 tx_block_sync_advance_with_sync_blocks,
282 tx_block_sync_remove_peer,
283 tx_block_sync_update_peer_locators,
284 tx_certificate_request,
285 tx_certificate_response,
286 };
287 let receiver = SyncReceiver {
288 rx_block_sync_advance_with_sync_blocks,
289 rx_block_sync_remove_peer,
290 rx_block_sync_update_peer_locators,
291 rx_certificate_request,
292 rx_certificate_response,
293 };
294
295 (sender, receiver)
296}