Skip to main content

dusk_node/
lib.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7#![deny(unused_crate_dependencies)]
8#![deny(unused_extern_crates)]
9
10#[cfg(feature = "archive")]
11pub mod archive;
12pub mod chain;
13pub mod database;
14pub mod databroker;
15pub mod mempool;
16pub mod network;
17pub mod telemetry;
18pub mod vm;
19
20use std::net::SocketAddr;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use async_trait::async_trait;
25use node_data::message::payload::Inv;
26use node_data::message::{AsyncQueue, Message};
27use tokio::signal::unix::{SignalKind, signal};
28use tokio::sync::RwLock;
29use tokio::task::JoinSet;
30use tracing::{error, info, warn};
31
/// Filter is used by a Network implementor to filter messages before
/// re-routing them. It is comparable to middleware in an HTTP pipeline.
///
/// To avoid delaying the handling of other messages, the execution of any
/// filter should be fast, as it is performed in the message handler.
pub trait Filter {
    /// Filters a message.
    ///
    /// NOTE(review): presumably an `Err` means the message is rejected and
    /// not routed any further — confirm against the Network implementation.
    fn filter(&mut self, msg: &Message) -> anyhow::Result<()>;
}
41
/// A boxed, dynamically dispatched [`Filter`] that is both `Send` and
/// `Sync`. This is the filter type accepted by [`Network::add_filter`].
pub type BoxedFilter = Box<dyn Filter + Sync + Send>;
43
44#[async_trait]
45pub trait Network: Send + Sync + 'static {
46    /// Broadcasts a fire-and-forget message.
47    async fn broadcast(&self, msg: &Message) -> anyhow::Result<()>;
48
49    /// Broadcasts a request message
50    async fn flood_request(
51        &self,
52        msg_inv: &Inv,
53        ttl_as_sec: Option<u64>,
54        hops_limit: u16,
55    ) -> anyhow::Result<()>;
56
57    /// Sends a message to a specified peer.
58    async fn send_to_peer(
59        &self,
60        msg: Message,
61        peer_addr: std::net::SocketAddr,
62    ) -> anyhow::Result<()>;
63
64    /// Sends to random set of alive peers.
65    async fn send_to_alive_peers(
66        &self,
67        msg: Message,
68        amount: usize,
69    ) -> anyhow::Result<()>;
70
71    /// Routes any message of the specified type to this queue.
72    async fn add_route(
73        &mut self,
74        msg_type: u8,
75        queue: AsyncQueue<Message>,
76    ) -> anyhow::Result<()>;
77
78    /// Moves a filter of a specified topic to Network.
79    async fn add_filter(
80        &mut self,
81        msg_type: u8,
82        filter: BoxedFilter,
83    ) -> anyhow::Result<()>;
84
85    /// Retrieves information about the network.
86    fn get_info(&self) -> anyhow::Result<String>;
87
88    /// Returns public address in Kadcast
89    fn public_addr(&self) -> &SocketAddr;
90
91    /// Retrieves number of alive nodes
92    async fn alive_nodes_count(&self) -> usize;
93
94    async fn wait_for_alive_nodes(&self, amount: usize, timeout: Duration) {
95        let start = Instant::now();
96        while self.alive_nodes_count().await < amount {
97            warn!("wait_for_alive_nodes");
98            if start.elapsed() > timeout {
99                return;
100            }
101            tokio::time::sleep(Duration::from_secs(1)).await;
102        }
103    }
104}
105
106/// Service processes specified set of messages and eventually produces a
107/// DataSource query or update.
108///
109/// Service is allowed to propagate a message to the network as well.
110#[async_trait]
111pub trait LongLivedService<N: Network, DB: database::DB, VM: vm::VMExecution>:
112    Send + Sync
113{
114    #[allow(unused_variables)]
115    async fn initialize(
116        &mut self,
117        network: Arc<RwLock<N>>,
118        database: Arc<RwLock<DB>>,
119        vm: Arc<RwLock<VM>>,
120    ) -> anyhow::Result<()> {
121        Ok(())
122    }
123
124    async fn execute(
125        &mut self,
126        network: Arc<RwLock<N>>,
127        database: Arc<RwLock<DB>>,
128        vm: Arc<RwLock<VM>>,
129    ) -> anyhow::Result<usize>;
130
131    async fn add_routes(
132        &self,
133        my_topics: &[u8],
134        queue: AsyncQueue<Message>,
135        network: &Arc<RwLock<N>>,
136    ) -> anyhow::Result<()> {
137        let mut guard = network.write().await;
138        for topic in my_topics {
139            guard.add_route(*topic, queue.clone()).await?
140        }
141        Ok(())
142    }
143
144    /// Returns service name.
145    fn name(&self) -> &'static str;
146}
147
/// Holds the shared handles to the network, database and VM components.
///
/// Each component is wrapped in `Arc<RwLock<_>>` so that handles can be
/// given out and shared.
#[derive(Debug)]
pub struct Node<N: Network, DB: database::DB, VM: vm::VMExecution> {
    /// Shared handle to the network implementation.
    network: Arc<RwLock<N>>,
    /// Shared handle to the database implementation.
    database: Arc<RwLock<DB>>,
    /// Shared handle to the VM implementation.
    vm_handler: Arc<RwLock<VM>>,
}
154
155impl<N: Network, DB: database::DB, VM: vm::VMExecution> Clone
156    for Node<N, DB, VM>
157{
158    fn clone(&self) -> Self {
159        Self {
160            network: self.network.clone(),
161            database: self.database.clone(),
162            vm_handler: self.vm_handler.clone(),
163        }
164    }
165}
166
167impl<N: Network, DB: database::DB, VM: vm::VMExecution> Node<N, DB, VM> {
168    pub fn new(n: N, d: DB, vm_h: VM) -> Self {
169        Self {
170            network: Arc::new(RwLock::new(n)),
171            database: Arc::new(RwLock::new(d)),
172            vm_handler: Arc::new(RwLock::new(vm_h)),
173        }
174    }
175
176    pub fn database(&self) -> Arc<RwLock<DB>> {
177        self.database.clone()
178    }
179
180    pub fn network(&self) -> Arc<RwLock<N>> {
181        self.network.clone()
182    }
183
184    pub fn vm_handler(&self) -> Arc<RwLock<VM>> {
185        self.vm_handler.clone()
186    }
187
188    pub async fn initialize(
189        &self,
190        services: &mut [Box<dyn LongLivedService<N, DB, VM>>],
191    ) -> anyhow::Result<()> {
192        // Run lazy-initialization of all registered services
193        for service in services.iter_mut() {
194            info!("initialize service {}", service.name());
195            service
196                .initialize(
197                    self.network.clone(),
198                    self.database.clone(),
199                    self.vm_handler.clone(),
200                )
201                .await?;
202        }
203
204        Ok(())
205    }
206
207    /// Sets up and runs a list of services.
208    pub async fn spawn_all(
209        &self,
210        service_list: Vec<Box<dyn LongLivedService<N, DB, VM>>>,
211    ) -> anyhow::Result<()> {
212        // Spawn all services and join-wait for their termination.
213        let mut set = JoinSet::new();
214        set.spawn(async {
215            signal(SignalKind::interrupt())?.recv().await;
216            // TODO: ResultCode
217            Ok(2)
218        });
219
220        for mut s in service_list.into_iter() {
221            let n = self.network.clone();
222            let d = self.database.clone();
223            let vm = self.vm_handler.clone();
224
225            let name = s.name();
226            info!("starting service {}", name);
227
228            set.spawn(async move { s.execute(n, d, vm).await });
229        }
230
231        // Wait for all spawned services to terminate with a result code or
232        // an error. Result code 1 means abort all services.
233        // This is usually triggered by SIGINIT signal.
234        while let Some(res) = set.join_next().await {
235            if let Ok(r) = res {
236                match r {
237                    Ok(rcode) => {
238                        // handle SIGTERM signal
239                        if rcode == 2 {
240                            set.abort_all();
241                        }
242                    }
243                    Err(e) => {
244                        error!("service terminated with err{}", e);
245                    }
246                }
247            }
248        }
249
250        info!("shutdown ...");
251
252        // Release DataSource
253
254        Ok(())
255    }
256}
257
#[cfg(test)]
mod tests {
    // Dev-dependencies that are only used in integration tests would
    // otherwise trigger the crate-level `unused_crate_dependencies` lint,
    // so they are re-imported anonymously here.
    use criterion as _;
}