dusk_node/
lib.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7#![deny(unused_crate_dependencies)]
8#![deny(unused_extern_crates)]
9
10#[cfg(feature = "archive")]
11pub mod archive;
12pub mod chain;
13pub mod database;
14pub mod databroker;
15pub mod mempool;
16pub mod network;
17pub mod telemetry;
18pub mod vm;
19
20use std::net::SocketAddr;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use async_trait::async_trait;
25use node_data::message::payload::Inv;
26use node_data::message::{AsyncQueue, Message};
27use tokio::signal::unix::{signal, SignalKind};
28use tokio::sync::RwLock;
29use tokio::task::JoinSet;
30use tracing::{error, info, warn};
31
32use indexmap as _; // Required to satisfy unused_crate_dependencies
33use native_tls as _; // Required to satisfy unused_crate_dependencies
34
/// Filter is used by a Network implementor to filter messages before
/// re-routing them. It's like the middleware in an HTTP pipeline.
///
/// To avoid delaying the handling of other messages, the execution of any
/// filter should be fast, as it is performed in the message handler.
pub trait Filter {
    /// Filters a message.
    ///
    /// NOTE(review): presumably an `Err` result tells the network layer not
    /// to route the message any further -- confirm against the `Network`
    /// implementation.
    fn filter(&mut self, msg: &Message) -> anyhow::Result<()>;
}
44
45pub type BoxedFilter = Box<dyn Filter + Sync + Send>;
46
47#[async_trait]
48pub trait Network: Send + Sync + 'static {
49    /// Broadcasts a fire-and-forget message.
50    async fn broadcast(&self, msg: &Message) -> anyhow::Result<()>;
51
52    /// Broadcasts a request message
53    async fn flood_request(
54        &self,
55        msg_inv: &Inv,
56        ttl_as_sec: Option<u64>,
57        hops_limit: u16,
58    ) -> anyhow::Result<()>;
59
60    /// Sends a message to a specified peer.
61    async fn send_to_peer(
62        &self,
63        msg: Message,
64        peer_addr: std::net::SocketAddr,
65    ) -> anyhow::Result<()>;
66
67    /// Sends to random set of alive peers.
68    async fn send_to_alive_peers(
69        &self,
70        msg: Message,
71        amount: usize,
72    ) -> anyhow::Result<()>;
73
74    /// Routes any message of the specified type to this queue.
75    async fn add_route(
76        &mut self,
77        msg_type: u8,
78        queue: AsyncQueue<Message>,
79    ) -> anyhow::Result<()>;
80
81    /// Moves a filter of a specified topic to Network.
82    async fn add_filter(
83        &mut self,
84        msg_type: u8,
85        filter: BoxedFilter,
86    ) -> anyhow::Result<()>;
87
88    /// Retrieves information about the network.
89    fn get_info(&self) -> anyhow::Result<String>;
90
91    /// Returns public address in Kadcast
92    fn public_addr(&self) -> &SocketAddr;
93
94    /// Retrieves number of alive nodes
95    async fn alive_nodes_count(&self) -> usize;
96
97    async fn wait_for_alive_nodes(&self, amount: usize, timeout: Duration) {
98        let start = Instant::now();
99        while self.alive_nodes_count().await < amount {
100            warn!("wait_for_alive_nodes");
101            if start.elapsed() > timeout {
102                return;
103            }
104            tokio::time::sleep(Duration::from_secs(1)).await;
105        }
106    }
107}
108
109/// Service processes specified set of messages and eventually produces a
110/// DataSource query or update.
111///
112/// Service is allowed to propagate a message to the network as well.
113#[async_trait]
114pub trait LongLivedService<N: Network, DB: database::DB, VM: vm::VMExecution>:
115    Send + Sync
116{
117    #[allow(unused_variables)]
118    async fn initialize(
119        &mut self,
120        network: Arc<RwLock<N>>,
121        database: Arc<RwLock<DB>>,
122        vm: Arc<RwLock<VM>>,
123    ) -> anyhow::Result<()> {
124        Ok(())
125    }
126
127    async fn execute(
128        &mut self,
129        network: Arc<RwLock<N>>,
130        database: Arc<RwLock<DB>>,
131        vm: Arc<RwLock<VM>>,
132    ) -> anyhow::Result<usize>;
133
134    async fn add_routes(
135        &self,
136        my_topics: &[u8],
137        queue: AsyncQueue<Message>,
138        network: &Arc<RwLock<N>>,
139    ) -> anyhow::Result<()> {
140        let mut guard = network.write().await;
141        for topic in my_topics {
142            guard.add_route(*topic, queue.clone()).await?
143        }
144        Ok(())
145    }
146
147    /// Returns service name.
148    fn name(&self) -> &'static str;
149}
150
151#[derive(Debug)]
152pub struct Node<N: Network, DB: database::DB, VM: vm::VMExecution> {
153    network: Arc<RwLock<N>>,
154    database: Arc<RwLock<DB>>,
155    vm_handler: Arc<RwLock<VM>>,
156}
157
158impl<N: Network, DB: database::DB, VM: vm::VMExecution> Clone
159    for Node<N, DB, VM>
160{
161    fn clone(&self) -> Self {
162        Self {
163            network: self.network.clone(),
164            database: self.database.clone(),
165            vm_handler: self.vm_handler.clone(),
166        }
167    }
168}
169
170impl<N: Network, DB: database::DB, VM: vm::VMExecution> Node<N, DB, VM> {
171    pub fn new(n: N, d: DB, vm_h: VM) -> Self {
172        Self {
173            network: Arc::new(RwLock::new(n)),
174            database: Arc::new(RwLock::new(d)),
175            vm_handler: Arc::new(RwLock::new(vm_h)),
176        }
177    }
178
179    pub fn database(&self) -> Arc<RwLock<DB>> {
180        self.database.clone()
181    }
182
183    pub fn network(&self) -> Arc<RwLock<N>> {
184        self.network.clone()
185    }
186
187    pub fn vm_handler(&self) -> Arc<RwLock<VM>> {
188        self.vm_handler.clone()
189    }
190
191    pub async fn initialize(
192        &self,
193        services: &mut [Box<dyn LongLivedService<N, DB, VM>>],
194    ) -> anyhow::Result<()> {
195        // Run lazy-initialization of all registered services
196        for service in services.iter_mut() {
197            info!("initialize service {}", service.name());
198            service
199                .initialize(
200                    self.network.clone(),
201                    self.database.clone(),
202                    self.vm_handler.clone(),
203                )
204                .await?;
205        }
206
207        Ok(())
208    }
209
210    /// Sets up and runs a list of services.
211    pub async fn spawn_all(
212        &self,
213        service_list: Vec<Box<dyn LongLivedService<N, DB, VM>>>,
214    ) -> anyhow::Result<()> {
215        // Spawn all services and join-wait for their termination.
216        let mut set = JoinSet::new();
217        set.spawn(async {
218            signal(SignalKind::interrupt())?.recv().await;
219            // TODO: ResultCode
220            Ok(2)
221        });
222
223        for mut s in service_list.into_iter() {
224            let n = self.network.clone();
225            let d = self.database.clone();
226            let vm = self.vm_handler.clone();
227
228            let name = s.name();
229            info!("starting service {}", name);
230
231            set.spawn(async move { s.execute(n, d, vm).await });
232        }
233
234        // Wait for all spawned services to terminate with a result code or
235        // an error. Result code 1 means abort all services.
236        // This is usually triggered by SIGINIT signal.
237        while let Some(res) = set.join_next().await {
238            if let Ok(r) = res {
239                match r {
240                    Ok(rcode) => {
241                        // handle SIGTERM signal
242                        if rcode == 2 {
243                            set.abort_all();
244                        }
245                    }
246                    Err(e) => {
247                        error!("service terminated with err{}", e);
248                    }
249                }
250            }
251        }
252
253        info!("shutdown ...");
254
255        // Release DataSource
256
257        Ok(())
258    }
259}
260
#[cfg(test)]
mod tests {
    // The benchmark dependency has to be referenced here so that the
    // `unused_crate_dependencies` lint is satisfied for test builds.
    use criterion as _;
}