1#![deny(unused_crate_dependencies)]
8#![deny(unused_extern_crates)]
9#![feature(lazy_cell)]
10
11#[cfg(feature = "archive")]
12pub mod archive;
13pub mod chain;
14pub mod database;
15pub mod databroker;
16pub mod mempool;
17pub mod network;
18pub mod telemetry;
19pub mod vm;
20
21use std::net::SocketAddr;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24
25use async_trait::async_trait;
26use node_data::message::payload::Inv;
27use node_data::message::{AsyncQueue, Message};
28use tokio::signal::unix::{signal, SignalKind};
29use tokio::sync::RwLock;
30use tokio::task::JoinSet;
31use tracing::{error, info, warn};
32
33use native_tls as _; pub trait Filter {
41 fn filter(&mut self, msg: &Message) -> anyhow::Result<()>;
43}
44
45pub type BoxedFilter = Box<dyn Filter + Sync + Send>;
46
47#[async_trait]
48pub trait Network: Send + Sync + 'static {
49 async fn broadcast(&self, msg: &Message) -> anyhow::Result<()>;
51
52 async fn flood_request(
54 &self,
55 msg_inv: &Inv,
56 ttl_as_sec: Option<u64>,
57 hops_limit: u16,
58 ) -> anyhow::Result<()>;
59
60 async fn send_to_peer(
62 &self,
63 msg: Message,
64 peer_addr: std::net::SocketAddr,
65 ) -> anyhow::Result<()>;
66
67 async fn send_to_alive_peers(
69 &self,
70 msg: Message,
71 amount: usize,
72 ) -> anyhow::Result<()>;
73
74 async fn add_route(
76 &mut self,
77 msg_type: u8,
78 queue: AsyncQueue<Message>,
79 ) -> anyhow::Result<()>;
80
81 async fn add_filter(
83 &mut self,
84 msg_type: u8,
85 filter: BoxedFilter,
86 ) -> anyhow::Result<()>;
87
88 fn get_info(&self) -> anyhow::Result<String>;
90
91 fn public_addr(&self) -> &SocketAddr;
93
94 async fn alive_nodes_count(&self) -> usize;
96
97 async fn wait_for_alive_nodes(&self, amount: usize, timeout: Duration) {
98 let start = Instant::now();
99 while self.alive_nodes_count().await < amount {
100 warn!("wait_for_alive_nodes");
101 if start.elapsed() > timeout {
102 return;
103 }
104 tokio::time::sleep(Duration::from_secs(1)).await;
105 }
106 }
107}
108
109#[async_trait]
114pub trait LongLivedService<N: Network, DB: database::DB, VM: vm::VMExecution>:
115 Send + Sync
116{
117 #[allow(unused_variables)]
118 async fn initialize(
119 &mut self,
120 network: Arc<RwLock<N>>,
121 database: Arc<RwLock<DB>>,
122 vm: Arc<RwLock<VM>>,
123 ) -> anyhow::Result<()> {
124 Ok(())
125 }
126
127 async fn execute(
128 &mut self,
129 network: Arc<RwLock<N>>,
130 database: Arc<RwLock<DB>>,
131 vm: Arc<RwLock<VM>>,
132 ) -> anyhow::Result<usize>;
133
134 async fn add_routes(
135 &self,
136 my_topics: &[u8],
137 queue: AsyncQueue<Message>,
138 network: &Arc<RwLock<N>>,
139 ) -> anyhow::Result<()> {
140 let mut guard = network.write().await;
141 for topic in my_topics {
142 guard.add_route(*topic, queue.clone()).await?
143 }
144 Ok(())
145 }
146
147 fn name(&self) -> &'static str;
149}
150
151#[derive(Debug)]
152pub struct Node<N: Network, DB: database::DB, VM: vm::VMExecution> {
153 network: Arc<RwLock<N>>,
154 database: Arc<RwLock<DB>>,
155 vm_handler: Arc<RwLock<VM>>,
156}
157
158impl<N: Network, DB: database::DB, VM: vm::VMExecution> Clone
159 for Node<N, DB, VM>
160{
161 fn clone(&self) -> Self {
162 Self {
163 network: self.network.clone(),
164 database: self.database.clone(),
165 vm_handler: self.vm_handler.clone(),
166 }
167 }
168}
169
170impl<N: Network, DB: database::DB, VM: vm::VMExecution> Node<N, DB, VM> {
171 pub fn new(n: N, d: DB, vm_h: VM) -> Self {
172 Self {
173 network: Arc::new(RwLock::new(n)),
174 database: Arc::new(RwLock::new(d)),
175 vm_handler: Arc::new(RwLock::new(vm_h)),
176 }
177 }
178
179 pub fn database(&self) -> Arc<RwLock<DB>> {
180 self.database.clone()
181 }
182
183 pub fn network(&self) -> Arc<RwLock<N>> {
184 self.network.clone()
185 }
186
187 pub fn vm_handler(&self) -> Arc<RwLock<VM>> {
188 self.vm_handler.clone()
189 }
190
191 pub async fn initialize(
192 &self,
193 services: &mut [Box<dyn LongLivedService<N, DB, VM>>],
194 ) -> anyhow::Result<()> {
195 for service in services.iter_mut() {
197 info!("initialize service {}", service.name());
198 service
199 .initialize(
200 self.network.clone(),
201 self.database.clone(),
202 self.vm_handler.clone(),
203 )
204 .await?;
205 }
206
207 Ok(())
208 }
209
210 pub async fn spawn_all(
212 &self,
213 service_list: Vec<Box<dyn LongLivedService<N, DB, VM>>>,
214 ) -> anyhow::Result<()> {
215 let mut set = JoinSet::new();
217 set.spawn(async {
218 signal(SignalKind::interrupt())?.recv().await;
219 Ok(2)
221 });
222
223 for mut s in service_list.into_iter() {
224 let n = self.network.clone();
225 let d = self.database.clone();
226 let vm = self.vm_handler.clone();
227
228 let name = s.name();
229 info!("starting service {}", name);
230
231 set.spawn(async move { s.execute(n, d, vm).await });
232 }
233
234 while let Some(res) = set.join_next().await {
238 if let Ok(r) = res {
239 match r {
240 Ok(rcode) => {
241 if rcode == 2 {
243 set.abort_all();
244 }
245 }
246 Err(e) => {
247 error!("service terminated with err{}", e);
248 }
249 }
250 }
251 }
252
253 info!("shutdown ...");
254
255 Ok(())
258 }
259}
260
261#[cfg(test)]
262mod tests {
263 use criterion as _;
266}