dusk_node/
network.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7use std::net::{AddrParseError, SocketAddr};
8use std::sync::atomic::{AtomicU64, Ordering};
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use kadcast::config::Config;
13use kadcast::{MessageInfo, Peer};
14use metrics::counter;
15use node_data::message::payload::{GetResource, Inv, Nonce};
16use node_data::message::{AsyncQueue, Metadata, PROTOCOL_VERSION};
17use node_data::{get_current_timestamp, Serializable};
18use tokio::sync::RwLock;
19use tracing::{debug, error, info, trace, warn};
20
21use crate::{BoxedFilter, Message};
22
23/// Number of alive peers randomly selected which a `flood_request` is sent to
24const REDUNDANCY_PEER_COUNT: usize = 8;
25
26type RoutesList<const N: usize> = [Option<AsyncQueue<Message>>; N];
27type FilterList<const N: usize> = [Option<BoxedFilter>; N];
28
29pub struct Listener<const N: usize> {
30    routes: Arc<RwLock<RoutesList<N>>>,
31    filters: Arc<RwLock<FilterList<N>>>,
32}
33
34impl<const N: usize> Listener<N> {
35    fn reroute(&self, topic: u8, msg: Message) {
36        let routes = self.routes.clone();
37        tokio::spawn(async move {
38            if let Some(Some(queue)) = routes.read().await.get(topic as usize) {
39                queue.try_send(msg);
40            };
41        });
42    }
43
44    fn call_filters(
45        &self,
46        topic: impl Into<u8>,
47        msg: &Message,
48    ) -> anyhow::Result<()> {
49        let topic = topic.into() as usize;
50
51        match self.filters.try_write()?.get_mut(topic) {
52            Some(Some(f)) => f.filter(msg),
53            _ => Ok(()),
54        }
55    }
56}
57
58impl<const N: usize> kadcast::NetworkListen for Listener<N> {
59    fn on_message(&self, blob: Vec<u8>, md: MessageInfo) {
60        let msg_size = blob.len();
61        match Message::read(&mut &blob.to_vec()[..]) {
62            Ok(mut msg) => {
63                counter!("dusk_bytes_recv").increment(msg_size as u64);
64                counter!(format!("dusk_inbound_{:?}_size", msg.topic()))
65                    .increment(msg_size as u64);
66                counter!(format!("dusk_inbound_{:?}_count", msg.topic()))
67                    .increment(1);
68
69                #[cfg(feature = "network-trace")]
70                let ray_id = node_data::ledger::to_str(md.ray_id());
71                #[cfg(not(feature = "network-trace"))]
72                #[allow(non_upper_case_globals)]
73                const ray_id: String = String::new();
74
75                debug!(
76                    event = "msg received",
77                    src = ?md.src(),
78                    kad_height = md.height(),
79                    ray_id,
80                    topic = ?msg.topic(),
81                    height = msg.get_height(),
82                    iteration = msg.get_iteration(),
83                );
84
85                // Update Transport Data
86                msg.metadata = Some(Metadata {
87                    height: md.height(),
88                    src_addr: md.src(),
89                    ray_id,
90                });
91
92                // Allow upper layers to fast-discard a message before queueing
93                if let Err(e) = self.call_filters(msg.topic(), &msg) {
94                    info!("discard message due to {e}");
95                    return;
96                }
97
98                // Reroute message to the upper layer
99                self.reroute(msg.topic().into(), msg);
100            }
101            Err(err) => {
102                // Dump message blob and topic number
103                let topic = blob.get(node_data::message::TOPIC_FIELD_POS);
104                error!("err: {err}, msg_topic: {topic:?}",);
105            }
106        };
107    }
108}
109
110pub struct Kadcast<const N: usize> {
111    peer: Peer,
112    routes: Arc<RwLock<RoutesList<N>>>,
113    filters: Arc<RwLock<FilterList<N>>>,
114    conf: Config,
115
116    /// Represents a parsed conf.public_addr
117    public_addr: SocketAddr,
118
119    counter: AtomicU64,
120}
121
122impl<const N: usize> Kadcast<N> {
123    pub fn new(mut conf: Config) -> Result<Self, AddrParseError> {
124        const INIT: Option<AsyncQueue<Message>> = None;
125        let routes = Arc::new(RwLock::new([INIT; N]));
126
127        const INIT_FN: Option<BoxedFilter> = None;
128        let filters = Arc::new(RwLock::new([INIT_FN; N]));
129
130        info!(
131            "Loading network with public_address {} and private_address {:?}",
132            &conf.public_address, &conf.listen_address
133        );
134        let listener = Listener {
135            routes: routes.clone(),
136            filters: filters.clone(),
137        };
138        conf.version = format!("{PROTOCOL_VERSION}");
139        conf.version_match = format!("{PROTOCOL_VERSION}");
140        let peer = Peer::new(conf.clone(), listener)?;
141        let public_addr = conf
142            .public_address
143            .parse::<SocketAddr>()
144            .expect("valid kadcast public address");
145
146        let nonce = Nonce::from(public_addr.ip());
147
148        Ok(Kadcast {
149            routes,
150            filters,
151            peer,
152            conf,
153            public_addr,
154            counter: AtomicU64::new(nonce.into()),
155        })
156    }
157
158    pub fn route_internal(&self, msg: Message) {
159        let topic = msg.topic() as usize;
160        let routes = self.routes.clone();
161
162        tokio::spawn(async move {
163            if let Some(Some(queue)) = routes.read().await.get(topic) {
164                queue.try_send(msg.clone());
165            };
166        });
167    }
168
169    pub async fn alive_nodes(&self, amount: usize) -> Vec<SocketAddr> {
170        self.peer.alive_nodes(amount).await
171    }
172
173    pub async fn table(&self) -> Vec<SocketAddr> {
174        self.peer
175            .to_route_table()
176            .await
177            .into_values()
178            .flat_map(|v| v.into_iter().map(|(addr, _)| addr))
179            .collect()
180    }
181
182    pub fn conf(&self) -> &Config {
183        &self.conf
184    }
185
186    async fn send_with_metrics(
187        &self,
188        bytes: &Vec<u8>,
189        recv_addr: Vec<SocketAddr>,
190    ) {
191        if !recv_addr.is_empty() {
192            let bytes_sent = bytes.len() * recv_addr.len();
193            counter!("dusk_bytes_sent").increment(bytes_sent as u64);
194            self.peer.send_to_peers(bytes, recv_addr).await;
195        }
196    }
197}
198
199#[async_trait]
200impl<const N: usize> crate::Network for Kadcast<N> {
201    async fn broadcast(&self, msg: &Message) -> anyhow::Result<()> {
202        let kad_height = msg.metadata.as_ref().map(|m| m.height);
203        debug!(
204            event = "broadcasting msg",
205            kad_height,
206            ray_id = msg.ray_id(),
207            topic = ?msg.topic(),
208            height = msg.get_height(),
209            iteration = msg.get_iteration(),
210        );
211
212        let height = match kad_height {
213            Some(0) => return Ok(()),
214            Some(height) => Some(height - 1),
215            None => None,
216        };
217
218        let mut encoded = vec![];
219        msg.write(&mut encoded).map_err(|err| {
220            error!("could not encode message {msg:?}: {err}");
221            anyhow::anyhow!("failed to broadcast: {err}")
222        })?;
223
224        counter!("dusk_bytes_cast").increment(encoded.len() as u64);
225        counter!(format!("dusk_outbound_{:?}_size", msg.topic()))
226            .increment(encoded.len() as u64);
227
228        self.peer.broadcast(&encoded, height).await;
229
230        Ok(())
231    }
232
233    /// Broadcast a GetResource request.
234    ///
235    /// By utilizing the randomly selected peers per bucket in Kadcast, this
236    /// broadcast does follow the so-called "Flood with Random Walk" blind
237    /// search (resource discovery).
238    ///
239    /// A receiver of this message is supposed to look up the resource and
240    /// either return it or, if not found, rebroadcast the message to the next
241    /// Kadcast bucket
242    ///
243    /// * `ttl_as_sec` - Defines the lifespan of the request in seconds
244    ///
245    /// * `hops_limit` - Defines maximum number of hops to receive the request
246    async fn flood_request(
247        &self,
248        msg_inv: &Inv,
249        ttl_as_sec: Option<u64>,
250        hops_limit: u16,
251    ) -> anyhow::Result<()> {
252        let ttl_as_sec = ttl_as_sec
253            .map_or_else(|| u64::MAX, |v| get_current_timestamp() + v);
254
255        let msg = GetResource::new(
256            msg_inv.clone(),
257            Some(self.public_addr),
258            ttl_as_sec,
259            hops_limit,
260        );
261        self.send_to_alive_peers(msg.into(), REDUNDANCY_PEER_COUNT)
262            .await
263    }
264
265    /// Sends an encoded message to a given peer.
266    async fn send_to_peer(
267        &self,
268        mut msg: Message,
269        recv_addr: SocketAddr,
270    ) -> anyhow::Result<()> {
271        // rnd_count is added to bypass kadcast dupemap
272        let rnd_count = self.counter.fetch_add(1, Ordering::SeqCst);
273
274        msg.payload.set_nonce(rnd_count);
275
276        let mut encoded = vec![];
277        msg.write(&mut encoded)
278            .map_err(|err| anyhow::anyhow!("failed to send_to_peer: {err}"))?;
279        let topic = msg.topic();
280
281        debug!(
282          event = "Sending msg",
283          topic = ?topic,
284          info = ?msg.header,
285          destination = ?recv_addr
286        );
287
288        self.send_with_metrics(&encoded, vec![recv_addr]).await;
289
290        Ok(())
291    }
292
293    /// Sends to random set of alive peers.
294    async fn send_to_alive_peers(
295        &self,
296        mut msg: Message,
297        amount: usize,
298    ) -> anyhow::Result<()> {
299        // rnd_count is added to bypass kadcast dupemap
300        let rnd_count = self.counter.fetch_add(1, Ordering::SeqCst);
301
302        msg.payload.set_nonce(rnd_count);
303
304        let mut encoded = vec![];
305        msg.write(&mut encoded)
306            .map_err(|err| anyhow::anyhow!("failed to encode: {err}"))?;
307        let topic = msg.topic();
308
309        counter!(format!("dusk_requests_{:?}", topic)).increment(1);
310
311        let mut alive_nodes = self.peer.alive_nodes(amount).await;
312
313        if alive_nodes.len() < amount {
314            let current = alive_nodes.len();
315
316            let route_table = self.peer.to_route_table().await;
317            let new_nodes: Vec<_> = route_table
318                .into_values()
319                .flatten()
320                .map(|(s, _)| s)
321                .filter(|s| !alive_nodes.contains(s))
322                .take(amount - current)
323                .collect();
324
325            alive_nodes.extend(new_nodes);
326            warn!(
327                event = "Not enought alive peers to send msg, increased",
328                ?topic,
329                requested = amount,
330                current,
331                increased = alive_nodes.len(),
332            );
333        }
334        trace!("sending msg ({topic:?}) to peers {alive_nodes:?}");
335        self.send_with_metrics(&encoded, alive_nodes).await;
336
337        Ok(())
338    }
339
340    /// Route any message of the specified type to this queue.
341    async fn add_route(
342        &mut self,
343        topic: u8,
344        queue: AsyncQueue<Message>,
345    ) -> anyhow::Result<()> {
346        let mut guard = self.routes.write().await;
347
348        let route = guard
349            .get_mut(topic as usize)
350            .ok_or_else(|| anyhow::anyhow!("topic out of range: {topic}"))?;
351
352        debug_assert!(route.is_none(), "topic already registered");
353
354        *route = Some(queue);
355
356        Ok(())
357    }
358
359    async fn add_filter(
360        &mut self,
361        msg_type: u8,
362        filter_fn: BoxedFilter,
363    ) -> anyhow::Result<()> {
364        let mut guard = self.filters.write().await;
365
366        let filter = guard
367            .get_mut(msg_type as usize)
368            .expect("should be valid type");
369
370        *filter = Some(filter_fn);
371
372        Ok(())
373    }
374
375    // TODO: Duplicated func
376    fn get_info(&self) -> anyhow::Result<String> {
377        Ok(self.conf.public_address.to_string())
378    }
379
380    fn public_addr(&self) -> &SocketAddr {
381        &self.public_addr
382    }
383
384    async fn alive_nodes_count(&self) -> usize {
385        // TODO: This call should be replaced with no-copy Kadcast API
386        self.peer.alive_nodes(u16::MAX as usize).await.len()
387    }
388}