dusk_node/
lib.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7#![deny(unused_crate_dependencies)]
8#![deny(unused_extern_crates)]
9
10#[cfg(feature = "archive")]
11pub mod archive;
12pub mod chain;
13pub mod database;
14pub mod databroker;
15pub mod mempool;
16pub mod network;
17pub mod telemetry;
18pub mod vm;
19
20use std::net::SocketAddr;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use async_trait::async_trait;
25use node_data::message::payload::Inv;
26use node_data::message::{AsyncQueue, Message};
27use tokio::signal::unix::{signal, SignalKind};
28use tokio::sync::RwLock;
29use tokio::task::JoinSet;
30use tracing::{error, info, warn};
31
32use native_tls as _; // Required to satisfy unused_crate_dependencies
33
34/// Filter is used by Network implementor to filter messages before re-routing
35/// them. It's like the middleware in HTTP pipeline.
36///
37/// To avoid delaying other messages handling, the execution of any filter
38/// should be fast as it is performed in the message handler .
39pub trait Filter {
40    /// Filters a message.
41    fn filter(&mut self, msg: &Message) -> anyhow::Result<()>;
42}
43
44pub type BoxedFilter = Box<dyn Filter + Sync + Send>;
45
46#[async_trait]
47pub trait Network: Send + Sync + 'static {
48    /// Broadcasts a fire-and-forget message.
49    async fn broadcast(&self, msg: &Message) -> anyhow::Result<()>;
50
51    /// Broadcasts a request message
52    async fn flood_request(
53        &self,
54        msg_inv: &Inv,
55        ttl_as_sec: Option<u64>,
56        hops_limit: u16,
57    ) -> anyhow::Result<()>;
58
59    /// Sends a message to a specified peer.
60    async fn send_to_peer(
61        &self,
62        msg: Message,
63        peer_addr: std::net::SocketAddr,
64    ) -> anyhow::Result<()>;
65
66    /// Sends to random set of alive peers.
67    async fn send_to_alive_peers(
68        &self,
69        msg: Message,
70        amount: usize,
71    ) -> anyhow::Result<()>;
72
73    /// Routes any message of the specified type to this queue.
74    async fn add_route(
75        &mut self,
76        msg_type: u8,
77        queue: AsyncQueue<Message>,
78    ) -> anyhow::Result<()>;
79
80    /// Moves a filter of a specified topic to Network.
81    async fn add_filter(
82        &mut self,
83        msg_type: u8,
84        filter: BoxedFilter,
85    ) -> anyhow::Result<()>;
86
87    /// Retrieves information about the network.
88    fn get_info(&self) -> anyhow::Result<String>;
89
90    /// Returns public address in Kadcast
91    fn public_addr(&self) -> &SocketAddr;
92
93    /// Retrieves number of alive nodes
94    async fn alive_nodes_count(&self) -> usize;
95
96    async fn wait_for_alive_nodes(&self, amount: usize, timeout: Duration) {
97        let start = Instant::now();
98        while self.alive_nodes_count().await < amount {
99            warn!("wait_for_alive_nodes");
100            if start.elapsed() > timeout {
101                return;
102            }
103            tokio::time::sleep(Duration::from_secs(1)).await;
104        }
105    }
106}
107
108/// Service processes specified set of messages and eventually produces a
109/// DataSource query or update.
110///
111/// Service is allowed to propagate a message to the network as well.
112#[async_trait]
113pub trait LongLivedService<N: Network, DB: database::DB, VM: vm::VMExecution>:
114    Send + Sync
115{
116    #[allow(unused_variables)]
117    async fn initialize(
118        &mut self,
119        network: Arc<RwLock<N>>,
120        database: Arc<RwLock<DB>>,
121        vm: Arc<RwLock<VM>>,
122    ) -> anyhow::Result<()> {
123        Ok(())
124    }
125
126    async fn execute(
127        &mut self,
128        network: Arc<RwLock<N>>,
129        database: Arc<RwLock<DB>>,
130        vm: Arc<RwLock<VM>>,
131    ) -> anyhow::Result<usize>;
132
133    async fn add_routes(
134        &self,
135        my_topics: &[u8],
136        queue: AsyncQueue<Message>,
137        network: &Arc<RwLock<N>>,
138    ) -> anyhow::Result<()> {
139        let mut guard = network.write().await;
140        for topic in my_topics {
141            guard.add_route(*topic, queue.clone()).await?
142        }
143        Ok(())
144    }
145
146    /// Returns service name.
147    fn name(&self) -> &'static str;
148}
149
150#[derive(Debug)]
151pub struct Node<N: Network, DB: database::DB, VM: vm::VMExecution> {
152    network: Arc<RwLock<N>>,
153    database: Arc<RwLock<DB>>,
154    vm_handler: Arc<RwLock<VM>>,
155}
156
157impl<N: Network, DB: database::DB, VM: vm::VMExecution> Clone
158    for Node<N, DB, VM>
159{
160    fn clone(&self) -> Self {
161        Self {
162            network: self.network.clone(),
163            database: self.database.clone(),
164            vm_handler: self.vm_handler.clone(),
165        }
166    }
167}
168
169impl<N: Network, DB: database::DB, VM: vm::VMExecution> Node<N, DB, VM> {
170    pub fn new(n: N, d: DB, vm_h: VM) -> Self {
171        Self {
172            network: Arc::new(RwLock::new(n)),
173            database: Arc::new(RwLock::new(d)),
174            vm_handler: Arc::new(RwLock::new(vm_h)),
175        }
176    }
177
178    pub fn database(&self) -> Arc<RwLock<DB>> {
179        self.database.clone()
180    }
181
182    pub fn network(&self) -> Arc<RwLock<N>> {
183        self.network.clone()
184    }
185
186    pub fn vm_handler(&self) -> Arc<RwLock<VM>> {
187        self.vm_handler.clone()
188    }
189
190    pub async fn initialize(
191        &self,
192        services: &mut [Box<dyn LongLivedService<N, DB, VM>>],
193    ) -> anyhow::Result<()> {
194        // Run lazy-initialization of all registered services
195        for service in services.iter_mut() {
196            info!("initialize service {}", service.name());
197            service
198                .initialize(
199                    self.network.clone(),
200                    self.database.clone(),
201                    self.vm_handler.clone(),
202                )
203                .await?;
204        }
205
206        Ok(())
207    }
208
209    /// Sets up and runs a list of services.
210    pub async fn spawn_all(
211        &self,
212        service_list: Vec<Box<dyn LongLivedService<N, DB, VM>>>,
213    ) -> anyhow::Result<()> {
214        // Spawn all services and join-wait for their termination.
215        let mut set = JoinSet::new();
216        set.spawn(async {
217            signal(SignalKind::interrupt())?.recv().await;
218            // TODO: ResultCode
219            Ok(2)
220        });
221
222        for mut s in service_list.into_iter() {
223            let n = self.network.clone();
224            let d = self.database.clone();
225            let vm = self.vm_handler.clone();
226
227            let name = s.name();
228            info!("starting service {}", name);
229
230            set.spawn(async move { s.execute(n, d, vm).await });
231        }
232
233        // Wait for all spawned services to terminate with a result code or
234        // an error. Result code 1 means abort all services.
235        // This is usually triggered by SIGINIT signal.
236        while let Some(res) = set.join_next().await {
237            if let Ok(r) = res {
238                match r {
239                    Ok(rcode) => {
240                        // handle SIGTERM signal
241                        if rcode == 2 {
242                            set.abort_all();
243                        }
244                    }
245                    Err(e) => {
246                        error!("service terminated with err{}", e);
247                    }
248                }
249            }
250        }
251
252        info!("shutdown ...");
253
254        // Release DataSource
255
256        Ok(())
257    }
258}
259
260#[cfg(test)]
261mod tests {
262    // need to add the benchmark dep here so that the
263    // `unused_crate_dependencies` lint is satisfied
264    use criterion as _;
265}