dusk_node/
lib.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7#![deny(unused_crate_dependencies)]
8#![deny(unused_extern_crates)]
9#![feature(lazy_cell)]
10
11#[cfg(feature = "archive")]
12pub mod archive;
13pub mod chain;
14pub mod database;
15pub mod databroker;
16pub mod mempool;
17pub mod network;
18pub mod telemetry;
19pub mod vm;
20
21use std::net::SocketAddr;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24
25use async_trait::async_trait;
26use node_data::message::payload::Inv;
27use node_data::message::{AsyncQueue, Message};
28use tokio::signal::unix::{signal, SignalKind};
29use tokio::sync::RwLock;
30use tokio::task::JoinSet;
31use tracing::{error, info, warn};
32
33/// Filter is used by Network implementor to filter messages before re-routing
34/// them. It's like the middleware in HTTP pipeline.
35///
36/// To avoid delaying other messages handling, the execution of any filter
37/// should be fast as it is performed in the message handler .
38pub trait Filter {
39    /// Filters a message.
40    fn filter(&mut self, msg: &Message) -> anyhow::Result<()>;
41}
42
43pub type BoxedFilter = Box<dyn Filter + Sync + Send>;
44
45#[async_trait]
46pub trait Network: Send + Sync + 'static {
47    /// Broadcasts a fire-and-forget message.
48    async fn broadcast(&self, msg: &Message) -> anyhow::Result<()>;
49
50    /// Broadcasts a request message
51    async fn flood_request(
52        &self,
53        msg_inv: &Inv,
54        ttl_as_sec: Option<u64>,
55        hops_limit: u16,
56    ) -> anyhow::Result<()>;
57
58    /// Sends a message to a specified peer.
59    async fn send_to_peer(
60        &self,
61        msg: Message,
62        peer_addr: std::net::SocketAddr,
63    ) -> anyhow::Result<()>;
64
65    /// Sends to random set of alive peers.
66    async fn send_to_alive_peers(
67        &self,
68        msg: Message,
69        amount: usize,
70    ) -> anyhow::Result<()>;
71
72    /// Routes any message of the specified type to this queue.
73    async fn add_route(
74        &mut self,
75        msg_type: u8,
76        queue: AsyncQueue<Message>,
77    ) -> anyhow::Result<()>;
78
79    /// Moves a filter of a specified topic to Network.
80    async fn add_filter(
81        &mut self,
82        msg_type: u8,
83        filter: BoxedFilter,
84    ) -> anyhow::Result<()>;
85
86    /// Retrieves information about the network.
87    fn get_info(&self) -> anyhow::Result<String>;
88
89    /// Returns public address in Kadcast
90    fn public_addr(&self) -> &SocketAddr;
91
92    /// Retrieves number of alive nodes
93    async fn alive_nodes_count(&self) -> usize;
94
95    async fn wait_for_alive_nodes(&self, amount: usize, timeout: Duration) {
96        let start = Instant::now();
97        while self.alive_nodes_count().await < amount {
98            warn!("wait_for_alive_nodes");
99            if start.elapsed() > timeout {
100                return;
101            }
102            tokio::time::sleep(Duration::from_secs(1)).await;
103        }
104    }
105}
106
107/// Service processes specified set of messages and eventually produces a
108/// DataSource query or update.
109///
110/// Service is allowed to propagate a message to the network as well.
111#[async_trait]
112pub trait LongLivedService<N: Network, DB: database::DB, VM: vm::VMExecution>:
113    Send + Sync
114{
115    #[allow(unused_variables)]
116    async fn initialize(
117        &mut self,
118        network: Arc<RwLock<N>>,
119        database: Arc<RwLock<DB>>,
120        vm: Arc<RwLock<VM>>,
121    ) -> anyhow::Result<()> {
122        Ok(())
123    }
124
125    async fn execute(
126        &mut self,
127        network: Arc<RwLock<N>>,
128        database: Arc<RwLock<DB>>,
129        vm: Arc<RwLock<VM>>,
130    ) -> anyhow::Result<usize>;
131
132    async fn add_routes(
133        &self,
134        my_topics: &[u8],
135        queue: AsyncQueue<Message>,
136        network: &Arc<RwLock<N>>,
137    ) -> anyhow::Result<()> {
138        let mut guard = network.write().await;
139        for topic in my_topics {
140            guard.add_route(*topic, queue.clone()).await?
141        }
142        Ok(())
143    }
144
145    /// Returns service name.
146    fn name(&self) -> &'static str;
147}
148
149#[derive(Debug)]
150pub struct Node<N: Network, DB: database::DB, VM: vm::VMExecution> {
151    network: Arc<RwLock<N>>,
152    database: Arc<RwLock<DB>>,
153    vm_handler: Arc<RwLock<VM>>,
154}
155
156impl<N: Network, DB: database::DB, VM: vm::VMExecution> Clone
157    for Node<N, DB, VM>
158{
159    fn clone(&self) -> Self {
160        Self {
161            network: self.network.clone(),
162            database: self.database.clone(),
163            vm_handler: self.vm_handler.clone(),
164        }
165    }
166}
167
168impl<N: Network, DB: database::DB, VM: vm::VMExecution> Node<N, DB, VM> {
169    pub fn new(n: N, d: DB, vm_h: VM) -> Self {
170        Self {
171            network: Arc::new(RwLock::new(n)),
172            database: Arc::new(RwLock::new(d)),
173            vm_handler: Arc::new(RwLock::new(vm_h)),
174        }
175    }
176
177    pub fn database(&self) -> Arc<RwLock<DB>> {
178        self.database.clone()
179    }
180
181    pub fn network(&self) -> Arc<RwLock<N>> {
182        self.network.clone()
183    }
184
185    pub fn vm_handler(&self) -> Arc<RwLock<VM>> {
186        self.vm_handler.clone()
187    }
188
189    pub async fn initialize(
190        &self,
191        services: &mut [Box<dyn LongLivedService<N, DB, VM>>],
192    ) -> anyhow::Result<()> {
193        // Run lazy-initialization of all registered services
194        for service in services.iter_mut() {
195            info!("initialize service {}", service.name());
196            service
197                .initialize(
198                    self.network.clone(),
199                    self.database.clone(),
200                    self.vm_handler.clone(),
201                )
202                .await?;
203        }
204
205        Ok(())
206    }
207
208    /// Sets up and runs a list of services.
209    pub async fn spawn_all(
210        &self,
211        service_list: Vec<Box<dyn LongLivedService<N, DB, VM>>>,
212    ) -> anyhow::Result<()> {
213        // Spawn all services and join-wait for their termination.
214        let mut set = JoinSet::new();
215        set.spawn(async {
216            signal(SignalKind::interrupt())?.recv().await;
217            // TODO: ResultCode
218            Ok(2)
219        });
220
221        for mut s in service_list.into_iter() {
222            let n = self.network.clone();
223            let d = self.database.clone();
224            let vm = self.vm_handler.clone();
225
226            let name = s.name();
227            info!("starting service {}", name);
228
229            set.spawn(async move { s.execute(n, d, vm).await });
230        }
231
232        // Wait for all spawned services to terminate with a result code or
233        // an error. Result code 1 means abort all services.
234        // This is usually triggered by SIGINIT signal.
235        while let Some(res) = set.join_next().await {
236            if let Ok(r) = res {
237                match r {
238                    Ok(rcode) => {
239                        // handle SIGTERM signal
240                        if rcode == 2 {
241                            set.abort_all();
242                        }
243                    }
244                    Err(e) => {
245                        error!("service terminated with err{}", e);
246                    }
247                }
248            }
249        }
250
251        info!("shutdown ...");
252
253        // Release DataSource
254
255        Ok(())
256    }
257}
258
259#[cfg(test)]
260mod tests {
261    // need to add the benchmark dep here so that the
262    // `unused_crate_dependencies` lint is satisfied
263    use criterion as _;
264}