1#![deny(unused_crate_dependencies)]
8#![deny(unused_extern_crates)]
9
10#[cfg(feature = "archive")]
11pub mod archive;
12pub mod chain;
13pub mod database;
14pub mod databroker;
15pub mod mempool;
16pub mod network;
17pub mod telemetry;
18pub mod vm;
19
20use std::net::SocketAddr;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use async_trait::async_trait;
25use node_data::message::payload::Inv;
26use node_data::message::{AsyncQueue, Message};
27use tokio::signal::unix::{signal, SignalKind};
28use tokio::sync::RwLock;
29use tokio::task::JoinSet;
30use tracing::{error, info, warn};
31
32use native_tls as _; pub trait Filter {
40 fn filter(&mut self, msg: &Message) -> anyhow::Result<()>;
42}
43
44pub type BoxedFilter = Box<dyn Filter + Sync + Send>;
45
46#[async_trait]
47pub trait Network: Send + Sync + 'static {
48 async fn broadcast(&self, msg: &Message) -> anyhow::Result<()>;
50
51 async fn flood_request(
53 &self,
54 msg_inv: &Inv,
55 ttl_as_sec: Option<u64>,
56 hops_limit: u16,
57 ) -> anyhow::Result<()>;
58
59 async fn send_to_peer(
61 &self,
62 msg: Message,
63 peer_addr: std::net::SocketAddr,
64 ) -> anyhow::Result<()>;
65
66 async fn send_to_alive_peers(
68 &self,
69 msg: Message,
70 amount: usize,
71 ) -> anyhow::Result<()>;
72
73 async fn add_route(
75 &mut self,
76 msg_type: u8,
77 queue: AsyncQueue<Message>,
78 ) -> anyhow::Result<()>;
79
80 async fn add_filter(
82 &mut self,
83 msg_type: u8,
84 filter: BoxedFilter,
85 ) -> anyhow::Result<()>;
86
87 fn get_info(&self) -> anyhow::Result<String>;
89
90 fn public_addr(&self) -> &SocketAddr;
92
93 async fn alive_nodes_count(&self) -> usize;
95
96 async fn wait_for_alive_nodes(&self, amount: usize, timeout: Duration) {
97 let start = Instant::now();
98 while self.alive_nodes_count().await < amount {
99 warn!("wait_for_alive_nodes");
100 if start.elapsed() > timeout {
101 return;
102 }
103 tokio::time::sleep(Duration::from_secs(1)).await;
104 }
105 }
106}
107
108#[async_trait]
113pub trait LongLivedService<N: Network, DB: database::DB, VM: vm::VMExecution>:
114 Send + Sync
115{
116 #[allow(unused_variables)]
117 async fn initialize(
118 &mut self,
119 network: Arc<RwLock<N>>,
120 database: Arc<RwLock<DB>>,
121 vm: Arc<RwLock<VM>>,
122 ) -> anyhow::Result<()> {
123 Ok(())
124 }
125
126 async fn execute(
127 &mut self,
128 network: Arc<RwLock<N>>,
129 database: Arc<RwLock<DB>>,
130 vm: Arc<RwLock<VM>>,
131 ) -> anyhow::Result<usize>;
132
133 async fn add_routes(
134 &self,
135 my_topics: &[u8],
136 queue: AsyncQueue<Message>,
137 network: &Arc<RwLock<N>>,
138 ) -> anyhow::Result<()> {
139 let mut guard = network.write().await;
140 for topic in my_topics {
141 guard.add_route(*topic, queue.clone()).await?
142 }
143 Ok(())
144 }
145
146 fn name(&self) -> &'static str;
148}
149
150#[derive(Debug)]
151pub struct Node<N: Network, DB: database::DB, VM: vm::VMExecution> {
152 network: Arc<RwLock<N>>,
153 database: Arc<RwLock<DB>>,
154 vm_handler: Arc<RwLock<VM>>,
155}
156
157impl<N: Network, DB: database::DB, VM: vm::VMExecution> Clone
158 for Node<N, DB, VM>
159{
160 fn clone(&self) -> Self {
161 Self {
162 network: self.network.clone(),
163 database: self.database.clone(),
164 vm_handler: self.vm_handler.clone(),
165 }
166 }
167}
168
169impl<N: Network, DB: database::DB, VM: vm::VMExecution> Node<N, DB, VM> {
170 pub fn new(n: N, d: DB, vm_h: VM) -> Self {
171 Self {
172 network: Arc::new(RwLock::new(n)),
173 database: Arc::new(RwLock::new(d)),
174 vm_handler: Arc::new(RwLock::new(vm_h)),
175 }
176 }
177
178 pub fn database(&self) -> Arc<RwLock<DB>> {
179 self.database.clone()
180 }
181
182 pub fn network(&self) -> Arc<RwLock<N>> {
183 self.network.clone()
184 }
185
186 pub fn vm_handler(&self) -> Arc<RwLock<VM>> {
187 self.vm_handler.clone()
188 }
189
190 pub async fn initialize(
191 &self,
192 services: &mut [Box<dyn LongLivedService<N, DB, VM>>],
193 ) -> anyhow::Result<()> {
194 for service in services.iter_mut() {
196 info!("initialize service {}", service.name());
197 service
198 .initialize(
199 self.network.clone(),
200 self.database.clone(),
201 self.vm_handler.clone(),
202 )
203 .await?;
204 }
205
206 Ok(())
207 }
208
209 pub async fn spawn_all(
211 &self,
212 service_list: Vec<Box<dyn LongLivedService<N, DB, VM>>>,
213 ) -> anyhow::Result<()> {
214 let mut set = JoinSet::new();
216 set.spawn(async {
217 signal(SignalKind::interrupt())?.recv().await;
218 Ok(2)
220 });
221
222 for mut s in service_list.into_iter() {
223 let n = self.network.clone();
224 let d = self.database.clone();
225 let vm = self.vm_handler.clone();
226
227 let name = s.name();
228 info!("starting service {}", name);
229
230 set.spawn(async move { s.execute(n, d, vm).await });
231 }
232
233 while let Some(res) = set.join_next().await {
237 if let Ok(r) = res {
238 match r {
239 Ok(rcode) => {
240 if rcode == 2 {
242 set.abort_all();
243 }
244 }
245 Err(e) => {
246 error!("service terminated with err{}", e);
247 }
248 }
249 }
250 }
251
252 info!("shutdown ...");
253
254 Ok(())
257 }
258}
259
#[cfg(test)]
mod tests {
    // Anonymous import, compiled only for test builds.
    // NOTE(review): presumably this keeps the crate-level
    // `unused_crate_dependencies` lint from firing for `criterion` in the
    // test target — confirm before removing.
    use criterion as _;
}