1use std::net::{AddrParseError, SocketAddr};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use kadcast::config::Config;
13use kadcast::{MessageInfo, Peer};
14use metrics::counter;
15use node_data::message::payload::{GetResource, Inv, Nonce};
16use node_data::message::{AsyncQueue, Metadata, PROTOCOL_VERSION};
17use node_data::{get_current_timestamp, Serializable};
18use tokio::sync::RwLock;
19use tracing::{debug, error, info, trace, warn};
20
21use crate::{BoxedFilter, Message};
22
/// Number of peers a flood request is sent to (the `amount` that
/// `flood_request` passes to `send_to_alive_peers`).
const REDUNDANCY_PEER_COUNT: usize = 8;
25
26type RoutesList<const N: usize> = [Option<AsyncQueue<Message>>; N];
27type FilterList<const N: usize> = [Option<BoxedFilter>; N];
28
29pub struct Listener<const N: usize> {
30 routes: Arc<RwLock<RoutesList<N>>>,
31 filters: Arc<RwLock<FilterList<N>>>,
32}
33
34impl<const N: usize> Listener<N> {
35 fn reroute(&self, topic: u8, msg: Message) {
36 let routes = self.routes.clone();
37 tokio::spawn(async move {
38 if let Some(Some(queue)) = routes.read().await.get(topic as usize) {
39 queue.try_send(msg);
40 };
41 });
42 }
43
44 fn call_filters(
45 &self,
46 topic: impl Into<u8>,
47 msg: &Message,
48 ) -> anyhow::Result<()> {
49 let topic = topic.into() as usize;
50
51 match self.filters.try_write()?.get_mut(topic) {
52 Some(Some(f)) => f.filter(msg),
53 _ => Ok(()),
54 }
55 }
56}
57
58impl<const N: usize> kadcast::NetworkListen for Listener<N> {
59 fn on_message(&self, blob: Vec<u8>, md: MessageInfo) {
60 let msg_size = blob.len();
61 match Message::read(&mut &blob.to_vec()[..]) {
62 Ok(mut msg) => {
63 counter!("dusk_bytes_recv").increment(msg_size as u64);
64 counter!(format!("dusk_inbound_{:?}_size", msg.topic()))
65 .increment(msg_size as u64);
66 counter!(format!("dusk_inbound_{:?}_count", msg.topic()))
67 .increment(1);
68
69 #[cfg(feature = "network-trace")]
70 let ray_id = node_data::ledger::to_str(md.ray_id());
71 #[cfg(not(feature = "network-trace"))]
72 #[allow(non_upper_case_globals)]
73 const ray_id: String = String::new();
74
75 debug!(
76 event = "msg received",
77 src = ?md.src(),
78 kad_height = md.height(),
79 ray_id,
80 topic = ?msg.topic(),
81 height = msg.get_height(),
82 iteration = msg.get_iteration(),
83 );
84
85 msg.metadata = Some(Metadata {
87 height: md.height(),
88 src_addr: md.src(),
89 ray_id,
90 });
91
92 if let Err(e) = self.call_filters(msg.topic(), &msg) {
94 info!("discard message due to {e}");
95 return;
96 }
97
98 self.reroute(msg.topic().into(), msg);
100 }
101 Err(err) => {
102 let topic = blob.get(node_data::message::TOPIC_FIELD_POS);
104 error!("err: {err}, msg_topic: {topic:?}",);
105 }
106 };
107 }
108}
109
110pub struct Kadcast<const N: usize> {
111 peer: Peer,
112 routes: Arc<RwLock<RoutesList<N>>>,
113 filters: Arc<RwLock<FilterList<N>>>,
114 conf: Config,
115
116 public_addr: SocketAddr,
118
119 counter: AtomicU64,
120}
121
122impl<const N: usize> Kadcast<N> {
123 pub fn new(mut conf: Config) -> Result<Self, AddrParseError> {
124 const INIT: Option<AsyncQueue<Message>> = None;
125 let routes = Arc::new(RwLock::new([INIT; N]));
126
127 const INIT_FN: Option<BoxedFilter> = None;
128 let filters = Arc::new(RwLock::new([INIT_FN; N]));
129
130 info!(
131 "Loading network with public_address {} and private_address {:?}",
132 &conf.public_address, &conf.listen_address
133 );
134 let listener = Listener {
135 routes: routes.clone(),
136 filters: filters.clone(),
137 };
138 conf.version = format!("{PROTOCOL_VERSION}");
139 conf.version_match = format!("{PROTOCOL_VERSION}");
140 let peer = Peer::new(conf.clone(), listener)?;
141 let public_addr = conf
142 .public_address
143 .parse::<SocketAddr>()
144 .expect("valid kadcast public address");
145
146 let nonce = Nonce::from(public_addr.ip());
147
148 Ok(Kadcast {
149 routes,
150 filters,
151 peer,
152 conf,
153 public_addr,
154 counter: AtomicU64::new(nonce.into()),
155 })
156 }
157
158 pub fn route_internal(&self, msg: Message) {
159 let topic = msg.topic() as usize;
160 let routes = self.routes.clone();
161
162 tokio::spawn(async move {
163 if let Some(Some(queue)) = routes.read().await.get(topic) {
164 queue.try_send(msg.clone());
165 };
166 });
167 }
168
169 pub async fn alive_nodes(&self, amount: usize) -> Vec<SocketAddr> {
170 self.peer.alive_nodes(amount).await
171 }
172
173 pub async fn table(&self) -> Vec<SocketAddr> {
174 self.peer
175 .to_route_table()
176 .await
177 .into_values()
178 .flat_map(|v| v.into_iter().map(|(addr, _)| addr))
179 .collect()
180 }
181
182 pub fn conf(&self) -> &Config {
183 &self.conf
184 }
185
186 async fn send_with_metrics(
187 &self,
188 bytes: &[u8],
189 recv_addr: Vec<SocketAddr>,
190 ) {
191 if !recv_addr.is_empty() {
192 let bytes_sent = bytes.len() * recv_addr.len();
193 counter!("dusk_bytes_sent").increment(bytes_sent as u64);
194 self.peer.send_to_peers(bytes, recv_addr).await;
195 }
196 }
197}
198
199#[async_trait]
200impl<const N: usize> crate::Network for Kadcast<N> {
201 async fn broadcast(&self, msg: &Message) -> anyhow::Result<()> {
202 let kad_height = msg.metadata.as_ref().map(|m| m.height);
203 debug!(
204 event = "broadcasting msg",
205 kad_height,
206 ray_id = msg.ray_id(),
207 topic = ?msg.topic(),
208 height = msg.get_height(),
209 iteration = msg.get_iteration(),
210 );
211
212 let height = match kad_height {
213 Some(0) => return Ok(()),
214 Some(height) => Some(height - 1),
215 None => None,
216 };
217
218 let mut encoded = vec![];
219 msg.write(&mut encoded).map_err(|err| {
220 error!("could not encode message {msg:?}: {err}");
221 anyhow::anyhow!("failed to broadcast: {err}")
222 })?;
223
224 counter!("dusk_bytes_cast").increment(encoded.len() as u64);
225 counter!(format!("dusk_outbound_{:?}_size", msg.topic()))
226 .increment(encoded.len() as u64);
227
228 self.peer.broadcast(&encoded, height).await;
229
230 Ok(())
231 }
232
233 async fn flood_request(
247 &self,
248 msg_inv: &Inv,
249 ttl_as_sec: Option<u64>,
250 hops_limit: u16,
251 ) -> anyhow::Result<()> {
252 let ttl_as_sec = ttl_as_sec
253 .map_or_else(|| u64::MAX, |v| get_current_timestamp() + v);
254
255 let msg = GetResource::new(
256 msg_inv.clone(),
257 Some(self.public_addr),
258 ttl_as_sec,
259 hops_limit,
260 );
261 self.send_to_alive_peers(msg.into(), REDUNDANCY_PEER_COUNT)
262 .await
263 }
264
265 async fn send_to_peer(
267 &self,
268 mut msg: Message,
269 recv_addr: SocketAddr,
270 ) -> anyhow::Result<()> {
271 let rnd_count = self.counter.fetch_add(1, Ordering::SeqCst);
273
274 msg.payload.set_nonce(rnd_count);
275
276 let mut encoded = vec![];
277 msg.write(&mut encoded)
278 .map_err(|err| anyhow::anyhow!("failed to send_to_peer: {err}"))?;
279 let topic = msg.topic();
280
281 debug!(
282 event = "Sending msg",
283 topic = ?topic,
284 info = ?msg.header,
285 destination = ?recv_addr
286 );
287
288 self.send_with_metrics(&encoded, vec![recv_addr]).await;
289
290 Ok(())
291 }
292
293 async fn send_to_alive_peers(
295 &self,
296 mut msg: Message,
297 amount: usize,
298 ) -> anyhow::Result<()> {
299 let rnd_count = self.counter.fetch_add(1, Ordering::SeqCst);
301
302 msg.payload.set_nonce(rnd_count);
303
304 let mut encoded = vec![];
305 msg.write(&mut encoded)
306 .map_err(|err| anyhow::anyhow!("failed to encode: {err}"))?;
307 let topic = msg.topic();
308
309 counter!(format!("dusk_requests_{:?}", topic)).increment(1);
310
311 let mut alive_nodes = self.peer.alive_nodes(amount).await;
312
313 if alive_nodes.len() < amount {
314 let current = alive_nodes.len();
315
316 let route_table = self.peer.to_route_table().await;
317 let new_nodes: Vec<_> = route_table
318 .into_values()
319 .flatten()
320 .map(|(s, _)| s)
321 .filter(|s| !alive_nodes.contains(s))
322 .take(amount - current)
323 .collect();
324
325 alive_nodes.extend(new_nodes);
326 warn!(
327 event = "Not enought alive peers to send msg, increased",
328 ?topic,
329 requested = amount,
330 current,
331 increased = alive_nodes.len(),
332 );
333 }
334 trace!("sending msg ({topic:?}) to peers {alive_nodes:?}");
335 self.send_with_metrics(&encoded, alive_nodes).await;
336
337 Ok(())
338 }
339
340 async fn add_route(
342 &mut self,
343 topic: u8,
344 queue: AsyncQueue<Message>,
345 ) -> anyhow::Result<()> {
346 let mut guard = self.routes.write().await;
347
348 let route = guard
349 .get_mut(topic as usize)
350 .ok_or_else(|| anyhow::anyhow!("topic out of range: {topic}"))?;
351
352 debug_assert!(route.is_none(), "topic already registered");
353
354 *route = Some(queue);
355
356 Ok(())
357 }
358
359 async fn add_filter(
360 &mut self,
361 msg_type: u8,
362 filter_fn: BoxedFilter,
363 ) -> anyhow::Result<()> {
364 let mut guard = self.filters.write().await;
365
366 let filter = guard
367 .get_mut(msg_type as usize)
368 .expect("should be valid type");
369
370 *filter = Some(filter_fn);
371
372 Ok(())
373 }
374
375 fn get_info(&self) -> anyhow::Result<String> {
377 Ok(self.conf.public_address.to_string())
378 }
379
380 fn public_addr(&self) -> &SocketAddr {
381 &self.public_addr
382 }
383
384 async fn alive_nodes_count(&self) -> usize {
385 self.peer.alive_nodes(u16::MAX as usize).await.len()
387 }
388}