1#![deny(unused_crate_dependencies)]
8#![deny(unused_extern_crates)]
9
10#[cfg(feature = "archive")]
11pub mod archive;
12pub mod chain;
13pub mod database;
14pub mod databroker;
15pub mod mempool;
16pub mod network;
17pub mod telemetry;
18pub mod vm;
19
20use std::net::SocketAddr;
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23
24use async_trait::async_trait;
25use node_data::message::payload::Inv;
26use node_data::message::{AsyncQueue, Message};
27use tokio::signal::unix::{SignalKind, signal};
28use tokio::sync::RwLock;
29use tokio::task::JoinSet;
30use tracing::{error, info, warn};
31
/// Hook that inspects a [`Message`] and reports a verdict as a `Result`.
///
/// Filters are handed to [`Network::add_filter`] boxed as a
/// [`BoxedFilter`], keyed by message type.
pub trait Filter {
    /// Examines `msg`.
    ///
    /// Takes `&mut self`, so an implementation may keep state between
    /// calls. Presumably an `Err` means the message should be rejected —
    /// confirm against the `Network` implementation that runs the filters.
    fn filter(&mut self, msg: &Message) -> anyhow::Result<()>;
}
41
/// Heap-allocated, thread-safe [`Filter`] trait object; the form in which
/// filters are passed to [`Network::add_filter`].
pub type BoxedFilter = Box<dyn Filter + Sync + Send>;
43
44#[async_trait]
45pub trait Network: Send + Sync + 'static {
46 async fn broadcast(&self, msg: &Message) -> anyhow::Result<()>;
48
49 async fn flood_request(
51 &self,
52 msg_inv: &Inv,
53 ttl_as_sec: Option<u64>,
54 hops_limit: u16,
55 ) -> anyhow::Result<()>;
56
57 async fn send_to_peer(
59 &self,
60 msg: Message,
61 peer_addr: std::net::SocketAddr,
62 ) -> anyhow::Result<()>;
63
64 async fn send_to_alive_peers(
66 &self,
67 msg: Message,
68 amount: usize,
69 ) -> anyhow::Result<()>;
70
71 async fn add_route(
73 &mut self,
74 msg_type: u8,
75 queue: AsyncQueue<Message>,
76 ) -> anyhow::Result<()>;
77
78 async fn add_filter(
80 &mut self,
81 msg_type: u8,
82 filter: BoxedFilter,
83 ) -> anyhow::Result<()>;
84
85 fn get_info(&self) -> anyhow::Result<String>;
87
88 fn public_addr(&self) -> &SocketAddr;
90
91 async fn alive_nodes_count(&self) -> usize;
93
94 async fn wait_for_alive_nodes(&self, amount: usize, timeout: Duration) {
95 let start = Instant::now();
96 while self.alive_nodes_count().await < amount {
97 warn!("wait_for_alive_nodes");
98 if start.elapsed() > timeout {
99 return;
100 }
101 tokio::time::sleep(Duration::from_secs(1)).await;
102 }
103 }
104}
105
106#[async_trait]
111pub trait LongLivedService<N: Network, DB: database::DB, VM: vm::VMExecution>:
112 Send + Sync
113{
114 #[allow(unused_variables)]
115 async fn initialize(
116 &mut self,
117 network: Arc<RwLock<N>>,
118 database: Arc<RwLock<DB>>,
119 vm: Arc<RwLock<VM>>,
120 ) -> anyhow::Result<()> {
121 Ok(())
122 }
123
124 async fn execute(
125 &mut self,
126 network: Arc<RwLock<N>>,
127 database: Arc<RwLock<DB>>,
128 vm: Arc<RwLock<VM>>,
129 ) -> anyhow::Result<usize>;
130
131 async fn add_routes(
132 &self,
133 my_topics: &[u8],
134 queue: AsyncQueue<Message>,
135 network: &Arc<RwLock<N>>,
136 ) -> anyhow::Result<()> {
137 let mut guard = network.write().await;
138 for topic in my_topics {
139 guard.add_route(*topic, queue.clone()).await?
140 }
141 Ok(())
142 }
143
144 fn name(&self) -> &'static str;
146}
147
148#[derive(Debug)]
149pub struct Node<N: Network, DB: database::DB, VM: vm::VMExecution> {
150 network: Arc<RwLock<N>>,
151 database: Arc<RwLock<DB>>,
152 vm_handler: Arc<RwLock<VM>>,
153}
154
155impl<N: Network, DB: database::DB, VM: vm::VMExecution> Clone
156 for Node<N, DB, VM>
157{
158 fn clone(&self) -> Self {
159 Self {
160 network: self.network.clone(),
161 database: self.database.clone(),
162 vm_handler: self.vm_handler.clone(),
163 }
164 }
165}
166
167impl<N: Network, DB: database::DB, VM: vm::VMExecution> Node<N, DB, VM> {
168 pub fn new(n: N, d: DB, vm_h: VM) -> Self {
169 Self {
170 network: Arc::new(RwLock::new(n)),
171 database: Arc::new(RwLock::new(d)),
172 vm_handler: Arc::new(RwLock::new(vm_h)),
173 }
174 }
175
176 pub fn database(&self) -> Arc<RwLock<DB>> {
177 self.database.clone()
178 }
179
180 pub fn network(&self) -> Arc<RwLock<N>> {
181 self.network.clone()
182 }
183
184 pub fn vm_handler(&self) -> Arc<RwLock<VM>> {
185 self.vm_handler.clone()
186 }
187
188 pub async fn initialize(
189 &self,
190 services: &mut [Box<dyn LongLivedService<N, DB, VM>>],
191 ) -> anyhow::Result<()> {
192 for service in services.iter_mut() {
194 info!("initialize service {}", service.name());
195 service
196 .initialize(
197 self.network.clone(),
198 self.database.clone(),
199 self.vm_handler.clone(),
200 )
201 .await?;
202 }
203
204 Ok(())
205 }
206
207 pub async fn spawn_all(
209 &self,
210 service_list: Vec<Box<dyn LongLivedService<N, DB, VM>>>,
211 ) -> anyhow::Result<()> {
212 let mut set = JoinSet::new();
214 set.spawn(async {
215 signal(SignalKind::interrupt())?.recv().await;
216 Ok(2)
218 });
219
220 for mut s in service_list.into_iter() {
221 let n = self.network.clone();
222 let d = self.database.clone();
223 let vm = self.vm_handler.clone();
224
225 let name = s.name();
226 info!("starting service {}", name);
227
228 set.spawn(async move { s.execute(n, d, vm).await });
229 }
230
231 while let Some(res) = set.join_next().await {
235 if let Ok(r) = res {
236 match r {
237 Ok(rcode) => {
238 if rcode == 2 {
240 set.abort_all();
241 }
242 }
243 Err(e) => {
244 error!("service terminated with err{}", e);
245 }
246 }
247 }
248 }
249
250 info!("shutdown ...");
251
252 Ok(())
255 }
256}
257
#[cfg(test)]
mod tests {
    // Anonymous import whose only effect is to mark the `criterion` crate
    // as used in test builds. Presumably needed because the crate root
    // denies `unused_crate_dependencies` and criterion is otherwise only
    // referenced from benchmarks — confirm against Cargo.toml.
    use criterion as _;
}