1#![deny(unused_crate_dependencies)]
8#![deny(unused_extern_crates)]
9#![feature(lazy_cell)]
10
11#[cfg(feature = "archive")]
12pub mod archive;
13pub mod chain;
14pub mod database;
15pub mod databroker;
16pub mod mempool;
17pub mod network;
18pub mod telemetry;
19pub mod vm;
20
21use std::net::SocketAddr;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24
25use async_trait::async_trait;
26use node_data::message::payload::Inv;
27use node_data::message::{AsyncQueue, Message};
28use tokio::signal::unix::{signal, SignalKind};
29use tokio::sync::RwLock;
30use tokio::task::JoinSet;
31use tracing::{error, info, warn};
32
33pub trait Filter {
39 fn filter(&mut self, msg: &Message) -> anyhow::Result<()>;
41}
42
43pub type BoxedFilter = Box<dyn Filter + Sync + Send>;
44
45#[async_trait]
46pub trait Network: Send + Sync + 'static {
47 async fn broadcast(&self, msg: &Message) -> anyhow::Result<()>;
49
50 async fn flood_request(
52 &self,
53 msg_inv: &Inv,
54 ttl_as_sec: Option<u64>,
55 hops_limit: u16,
56 ) -> anyhow::Result<()>;
57
58 async fn send_to_peer(
60 &self,
61 msg: Message,
62 peer_addr: std::net::SocketAddr,
63 ) -> anyhow::Result<()>;
64
65 async fn send_to_alive_peers(
67 &self,
68 msg: Message,
69 amount: usize,
70 ) -> anyhow::Result<()>;
71
72 async fn add_route(
74 &mut self,
75 msg_type: u8,
76 queue: AsyncQueue<Message>,
77 ) -> anyhow::Result<()>;
78
79 async fn add_filter(
81 &mut self,
82 msg_type: u8,
83 filter: BoxedFilter,
84 ) -> anyhow::Result<()>;
85
86 fn get_info(&self) -> anyhow::Result<String>;
88
89 fn public_addr(&self) -> &SocketAddr;
91
92 async fn alive_nodes_count(&self) -> usize;
94
95 async fn wait_for_alive_nodes(&self, amount: usize, timeout: Duration) {
96 let start = Instant::now();
97 while self.alive_nodes_count().await < amount {
98 warn!("wait_for_alive_nodes");
99 if start.elapsed() > timeout {
100 return;
101 }
102 tokio::time::sleep(Duration::from_secs(1)).await;
103 }
104 }
105}
106
107#[async_trait]
112pub trait LongLivedService<N: Network, DB: database::DB, VM: vm::VMExecution>:
113 Send + Sync
114{
115 #[allow(unused_variables)]
116 async fn initialize(
117 &mut self,
118 network: Arc<RwLock<N>>,
119 database: Arc<RwLock<DB>>,
120 vm: Arc<RwLock<VM>>,
121 ) -> anyhow::Result<()> {
122 Ok(())
123 }
124
125 async fn execute(
126 &mut self,
127 network: Arc<RwLock<N>>,
128 database: Arc<RwLock<DB>>,
129 vm: Arc<RwLock<VM>>,
130 ) -> anyhow::Result<usize>;
131
132 async fn add_routes(
133 &self,
134 my_topics: &[u8],
135 queue: AsyncQueue<Message>,
136 network: &Arc<RwLock<N>>,
137 ) -> anyhow::Result<()> {
138 let mut guard = network.write().await;
139 for topic in my_topics {
140 guard.add_route(*topic, queue.clone()).await?
141 }
142 Ok(())
143 }
144
145 fn name(&self) -> &'static str;
147}
148
149#[derive(Debug)]
150pub struct Node<N: Network, DB: database::DB, VM: vm::VMExecution> {
151 network: Arc<RwLock<N>>,
152 database: Arc<RwLock<DB>>,
153 vm_handler: Arc<RwLock<VM>>,
154}
155
156impl<N: Network, DB: database::DB, VM: vm::VMExecution> Clone
157 for Node<N, DB, VM>
158{
159 fn clone(&self) -> Self {
160 Self {
161 network: self.network.clone(),
162 database: self.database.clone(),
163 vm_handler: self.vm_handler.clone(),
164 }
165 }
166}
167
168impl<N: Network, DB: database::DB, VM: vm::VMExecution> Node<N, DB, VM> {
169 pub fn new(n: N, d: DB, vm_h: VM) -> Self {
170 Self {
171 network: Arc::new(RwLock::new(n)),
172 database: Arc::new(RwLock::new(d)),
173 vm_handler: Arc::new(RwLock::new(vm_h)),
174 }
175 }
176
177 pub fn database(&self) -> Arc<RwLock<DB>> {
178 self.database.clone()
179 }
180
181 pub fn network(&self) -> Arc<RwLock<N>> {
182 self.network.clone()
183 }
184
185 pub fn vm_handler(&self) -> Arc<RwLock<VM>> {
186 self.vm_handler.clone()
187 }
188
189 pub async fn initialize(
190 &self,
191 services: &mut [Box<dyn LongLivedService<N, DB, VM>>],
192 ) -> anyhow::Result<()> {
193 for service in services.iter_mut() {
195 info!("initialize service {}", service.name());
196 service
197 .initialize(
198 self.network.clone(),
199 self.database.clone(),
200 self.vm_handler.clone(),
201 )
202 .await?;
203 }
204
205 Ok(())
206 }
207
208 pub async fn spawn_all(
210 &self,
211 service_list: Vec<Box<dyn LongLivedService<N, DB, VM>>>,
212 ) -> anyhow::Result<()> {
213 let mut set = JoinSet::new();
215 set.spawn(async {
216 signal(SignalKind::interrupt())?.recv().await;
217 Ok(2)
219 });
220
221 for mut s in service_list.into_iter() {
222 let n = self.network.clone();
223 let d = self.database.clone();
224 let vm = self.vm_handler.clone();
225
226 let name = s.name();
227 info!("starting service {}", name);
228
229 set.spawn(async move { s.execute(n, d, vm).await });
230 }
231
232 while let Some(res) = set.join_next().await {
236 if let Ok(r) = res {
237 match r {
238 Ok(rcode) => {
239 if rcode == 2 {
241 set.abort_all();
242 }
243 }
244 Err(e) => {
245 error!("service terminated with err{}", e);
246 }
247 }
248 }
249 }
250
251 info!("shutdown ...");
252
253 Ok(())
256 }
257}
258
#[cfg(test)]
mod tests {
    // Anonymous import: counts as a use of the `criterion` crate in test
    // builds, so the crate-level `deny(unused_crate_dependencies)` does not
    // reject it.
    // NOTE(review): presumably `criterion` is a dev-dependency needed only by
    // the benchmarks — confirm in Cargo.toml.
    use criterion as _;
}