1use crate::events::{
17 BatchPropose,
18 BatchSignature,
19 CertificateRequest,
20 CertificateResponse,
21 TransmissionRequest,
22 TransmissionResponse,
23};
24use snarkos_node_sync::{InsertBlockResponseError, locators::BlockLocators};
25use snarkvm::{
26 console::network::*,
27 ledger::{
28 block::{Block, Transaction},
29 narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
30 puzzle::{Solution, SolutionID},
31 },
32 prelude::Result,
33};
34
35use indexmap::IndexMap;
36use std::net::SocketAddr;
37use tokio::sync::{mpsc, oneshot};
38
39const MAX_CHANNEL_SIZE: usize = 8192;
40
41#[derive(Debug)]
42pub struct ConsensusSender<N: Network> {
43 pub tx_consensus_subdag:
44 mpsc::Sender<(Subdag<N>, IndexMap<TransmissionID<N>, Transmission<N>>, oneshot::Sender<Result<()>>)>,
45}
46
47#[derive(Debug)]
48pub struct ConsensusReceiver<N: Network> {
49 pub rx_consensus_subdag:
50 mpsc::Receiver<(Subdag<N>, IndexMap<TransmissionID<N>, Transmission<N>>, oneshot::Sender<Result<()>>)>,
51}
52
53pub fn init_consensus_channels<N: Network>() -> (ConsensusSender<N>, ConsensusReceiver<N>) {
55 let (tx_consensus_subdag, rx_consensus_subdag) = mpsc::channel(MAX_CHANNEL_SIZE);
56
57 let sender = ConsensusSender { tx_consensus_subdag };
58 let receiver = ConsensusReceiver { rx_consensus_subdag };
59
60 (sender, receiver)
61}
62
63#[derive(Clone, Debug)]
64pub struct PrimarySender<N: Network> {
65 pub tx_batch_propose: mpsc::Sender<(SocketAddr, BatchPropose<N>)>,
66 pub tx_batch_signature: mpsc::Sender<(SocketAddr, BatchSignature<N>)>,
67 pub tx_batch_certified: mpsc::Sender<(SocketAddr, Data<BatchCertificate<N>>)>,
68 pub tx_primary_ping: mpsc::Sender<(SocketAddr, Data<BatchCertificate<N>>)>,
69 pub tx_unconfirmed_solution: mpsc::Sender<(SolutionID<N>, Data<Solution<N>>, oneshot::Sender<Result<bool>>)>,
70 pub tx_unconfirmed_transaction:
71 mpsc::Sender<(N::TransactionID, Data<Transaction<N>>, oneshot::Sender<Result<bool>>)>,
72}
73
74impl<N: Network> PrimarySender<N> {
75 pub async fn send_unconfirmed_solution(
82 &self,
83 solution_id: SolutionID<N>,
84 solution: Data<Solution<N>>,
85 ) -> Result<bool> {
86 let (callback_sender, callback_receiver) = oneshot::channel();
88 self.tx_unconfirmed_solution.send((solution_id, solution, callback_sender)).await?;
90 callback_receiver.await?
92 }
93
94 pub async fn send_unconfirmed_transaction(
101 &self,
102 transaction_id: N::TransactionID,
103 transaction: Data<Transaction<N>>,
104 ) -> Result<bool> {
105 let (callback_sender, callback_receiver) = oneshot::channel();
107 self.tx_unconfirmed_transaction.send((transaction_id, transaction, callback_sender)).await?;
109 callback_receiver.await?
111 }
112}
113
114#[derive(Debug)]
115pub struct PrimaryReceiver<N: Network> {
116 pub rx_batch_propose: mpsc::Receiver<(SocketAddr, BatchPropose<N>)>,
117 pub rx_batch_signature: mpsc::Receiver<(SocketAddr, BatchSignature<N>)>,
118 pub rx_batch_certified: mpsc::Receiver<(SocketAddr, Data<BatchCertificate<N>>)>,
119 pub rx_primary_ping: mpsc::Receiver<(SocketAddr, Data<BatchCertificate<N>>)>,
120 pub rx_unconfirmed_solution: mpsc::Receiver<(SolutionID<N>, Data<Solution<N>>, oneshot::Sender<Result<bool>>)>,
121 pub rx_unconfirmed_transaction:
122 mpsc::Receiver<(N::TransactionID, Data<Transaction<N>>, oneshot::Sender<Result<bool>>)>,
123}
124
125pub fn init_primary_channels<N: Network>() -> (PrimarySender<N>, PrimaryReceiver<N>) {
127 let (tx_batch_propose, rx_batch_propose) = mpsc::channel(MAX_CHANNEL_SIZE);
128 let (tx_batch_signature, rx_batch_signature) = mpsc::channel(MAX_CHANNEL_SIZE);
129 let (tx_batch_certified, rx_batch_certified) = mpsc::channel(MAX_CHANNEL_SIZE);
130 let (tx_primary_ping, rx_primary_ping) = mpsc::channel(MAX_CHANNEL_SIZE);
131 let (tx_unconfirmed_solution, rx_unconfirmed_solution) = mpsc::channel(MAX_CHANNEL_SIZE);
132 let (tx_unconfirmed_transaction, rx_unconfirmed_transaction) = mpsc::channel(MAX_CHANNEL_SIZE);
133
134 let sender = PrimarySender {
135 tx_batch_propose,
136 tx_batch_signature,
137 tx_batch_certified,
138 tx_primary_ping,
139 tx_unconfirmed_solution,
140 tx_unconfirmed_transaction,
141 };
142 let receiver = PrimaryReceiver {
143 rx_batch_propose,
144 rx_batch_signature,
145 rx_batch_certified,
146 rx_primary_ping,
147 rx_unconfirmed_solution,
148 rx_unconfirmed_transaction,
149 };
150
151 (sender, receiver)
152}
153
154#[derive(Debug)]
155pub struct WorkerSender<N: Network> {
156 pub tx_worker_ping: mpsc::Sender<(SocketAddr, TransmissionID<N>)>,
157 pub tx_transmission_request: mpsc::Sender<(SocketAddr, TransmissionRequest<N>)>,
158 pub tx_transmission_response: mpsc::Sender<(SocketAddr, TransmissionResponse<N>)>,
159}
160
161#[derive(Debug)]
162pub struct WorkerReceiver<N: Network> {
163 pub rx_worker_ping: mpsc::Receiver<(SocketAddr, TransmissionID<N>)>,
164 pub rx_transmission_request: mpsc::Receiver<(SocketAddr, TransmissionRequest<N>)>,
165 pub rx_transmission_response: mpsc::Receiver<(SocketAddr, TransmissionResponse<N>)>,
166}
167
168pub fn init_worker_channels<N: Network>() -> (WorkerSender<N>, WorkerReceiver<N>) {
170 let (tx_worker_ping, rx_worker_ping) = mpsc::channel(MAX_CHANNEL_SIZE);
171 let (tx_transmission_request, rx_transmission_request) = mpsc::channel(MAX_CHANNEL_SIZE);
172 let (tx_transmission_response, rx_transmission_response) = mpsc::channel(MAX_CHANNEL_SIZE);
173
174 let sender = WorkerSender { tx_worker_ping, tx_transmission_request, tx_transmission_response };
175 let receiver = WorkerReceiver { rx_worker_ping, rx_transmission_request, rx_transmission_response };
176
177 (sender, receiver)
178}
179
180#[derive(Debug)]
181pub struct SyncSender<N: Network> {
182 pub tx_block_sync_insert_block_response: mpsc::Sender<(
183 SocketAddr,
184 Vec<Block<N>>,
185 Option<ConsensusVersion>,
186 oneshot::Sender<Result<(), InsertBlockResponseError<N>>>,
187 )>,
188 pub tx_block_sync_remove_peer: mpsc::Sender<SocketAddr>,
189 pub tx_block_sync_update_peer_locators: mpsc::Sender<(SocketAddr, BlockLocators<N>, oneshot::Sender<Result<()>>)>,
190 pub tx_certificate_request: mpsc::Sender<(SocketAddr, CertificateRequest<N>)>,
191 pub tx_certificate_response: mpsc::Sender<(SocketAddr, CertificateResponse<N>)>,
192}
193
194impl<N: Network> SyncSender<N> {
195 pub async fn update_peer_locators(&self, peer_ip: SocketAddr, block_locators: BlockLocators<N>) -> Result<()> {
197 let (callback_sender, callback_receiver) = oneshot::channel();
199 self.tx_block_sync_update_peer_locators.send((peer_ip, block_locators, callback_sender)).await?;
204 callback_receiver.await?
206 }
207
208 pub async fn insert_block_response(
210 &self,
211 peer_ip: SocketAddr,
212 blocks: Vec<Block<N>>,
213 latest_consensus_version: Option<ConsensusVersion>,
214 ) -> Result<(), InsertBlockResponseError<N>> {
215 let (callback_sender, callback_receiver) = oneshot::channel();
217 if let Err(err) = self
222 .tx_block_sync_insert_block_response
223 .send((peer_ip, blocks, latest_consensus_version, callback_sender))
224 .await
225 {
226 return Err(anyhow!("Failed to send block response - {err}").into());
227 }
228
229 match callback_receiver.await {
231 Ok(result) => result,
232 Err(err) => Err(anyhow!("Failed to wait for block response insertion - {err}").into()),
233 }
234 }
235}
236
237#[derive(Debug)]
238pub struct SyncReceiver<N: Network> {
239 pub rx_block_sync_insert_block_response: mpsc::Receiver<(
240 SocketAddr,
241 Vec<Block<N>>,
242 Option<ConsensusVersion>,
243 oneshot::Sender<Result<(), InsertBlockResponseError<N>>>,
244 )>,
245 pub rx_block_sync_remove_peer: mpsc::Receiver<SocketAddr>,
246 pub rx_block_sync_update_peer_locators: mpsc::Receiver<(SocketAddr, BlockLocators<N>, oneshot::Sender<Result<()>>)>,
247 pub rx_certificate_request: mpsc::Receiver<(SocketAddr, CertificateRequest<N>)>,
248 pub rx_certificate_response: mpsc::Receiver<(SocketAddr, CertificateResponse<N>)>,
249}
250
251pub fn init_sync_channels<N: Network>() -> (SyncSender<N>, SyncReceiver<N>) {
253 let (tx_block_sync_insert_block_response, rx_block_sync_insert_block_response) = mpsc::channel(MAX_CHANNEL_SIZE);
254 let (tx_block_sync_remove_peer, rx_block_sync_remove_peer) = mpsc::channel(MAX_CHANNEL_SIZE);
255 let (tx_block_sync_update_peer_locators, rx_block_sync_update_peer_locators) = mpsc::channel(MAX_CHANNEL_SIZE);
256 let (tx_certificate_request, rx_certificate_request) = mpsc::channel(MAX_CHANNEL_SIZE);
257 let (tx_certificate_response, rx_certificate_response) = mpsc::channel(MAX_CHANNEL_SIZE);
258
259 let sender = SyncSender {
260 tx_block_sync_insert_block_response,
261 tx_block_sync_remove_peer,
262 tx_block_sync_update_peer_locators,
263 tx_certificate_request,
264 tx_certificate_response,
265 };
266 let receiver = SyncReceiver {
267 rx_block_sync_insert_block_response,
268 rx_block_sync_remove_peer,
269 rx_block_sync_update_peer_locators,
270 rx_certificate_request,
271 rx_certificate_response,
272 };
273
274 (sender, receiver)
275}