1#![feature(trivial_bounds)]
9pub mod config;
10pub mod date_time;
11pub mod event;
12pub mod hash;
13pub mod keys;
14pub mod logs;
15pub mod merkle_tree;
16pub mod pipe;
17pub mod pool;
18pub mod size;
19pub mod sys;
20pub mod system_paths;
21pub mod vector_clock;
22pub mod vector_clock_actor;
23
24use std::{
25 env,
26 sync::{
27 OnceLock,
28 atomic::{AtomicU64, Ordering},
29 },
30};
31
32use figlet_rs::FIGfont;
33use threadpool::ThreadPool;
34use tracing::info;
35
36use crate as xaeroflux_core;
37pub use crate::{config::Config, logs::init_logging};
38
39pub static NEXT_ID: AtomicU64 = AtomicU64::new(1);
40
41pub fn next_id() -> u64 {
43 NEXT_ID.fetch_add(1, Ordering::SeqCst)
44}
45pub static CONF: OnceLock<config::Config> = OnceLock::new();
49
50pub static DISPATCHER_POOL: OnceLock<ThreadPool> = OnceLock::new();
52
53pub static IO_POOL: OnceLock<ThreadPool> = OnceLock::new();
55
56pub static XAERO_DISPATCHER_POOL: OnceLock<ThreadPool> = OnceLock::new();
58
59pub static P2P_RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
61
62pub fn shutdown_all_pools() -> Result<(), Box<dyn std::error::Error>> {
64 let dpo = DISPATCHER_POOL.get();
65 match dpo.as_ref() {
66 None => {
67 tracing::warn!("Dispatcher pool cannot be killed because it is unavailable!")
68 }
69 Some(dp) => {
70 dp.join();
71 }
72 }
73
74 let iopo = IO_POOL.get();
75 match iopo.as_ref() {
76 None => {
77 tracing::warn!("Dispatcher pool cannot be killed because it is unavailable!")
78 }
79 Some(iop) => {
80 iop.join();
81 }
82 }
83
84 let xdpo = XAERO_DISPATCHER_POOL.get();
85 match xdpo.as_ref() {
86 None => {
87 tracing::warn!("Dispatcher pool cannot be killed because it is unavailable!")
88 }
89 Some(xdp) => {
90 xdp.join();
91 }
92 }
93 if P2P_RUNTIME.get().is_some() {
94 tracing::info!("P2P runtime will shutdown automatically on process exit");
95 } else {
96 tracing::warn!("P2P runtime was not initialized");
97 }
98 Ok(())
99}
100pub fn init_p2p_runtime() -> &'static tokio::runtime::Runtime {
105 P2P_RUNTIME.get_or_init(|| {
106 let conf = CONF.get_or_init(Config::default);
107 let threads = conf.threads.num_worker_threads.max(2);
108 tokio::runtime::Builder::new_multi_thread()
109 .worker_threads(threads)
110 .enable_all()
111 .thread_name("xaeroflux-p2p")
112 .build()
113 .expect("Failed to create P2P runtime")
114 })
115}
116
117pub fn init_xaero_pool() {
118 XAERO_DISPATCHER_POOL.get_or_init(|| {
119 let conf = CONF.get_or_init(Config::default);
120 let no_of_worker_threads = conf.threads.num_worker_threads.max(2);
121 ThreadPool::new(no_of_worker_threads)
122 });
123}
124
125pub fn init_global_dispatcher_pool() {
130 DISPATCHER_POOL.get_or_init(|| {
131 let conf = CONF.get_or_init(Config::default);
132 let no_of_worker_threads = conf.threads.num_worker_threads.max(2);
133 ThreadPool::new(no_of_worker_threads)
134 });
135}
136
137pub fn init_global_io_pool() {
142 IO_POOL.get_or_init(|| {
143 let conf = CONF.get_or_init(Config::default);
144 let num_io_threads = conf.threads.num_io_threads.max(1);
145
146 ThreadPool::new(num_io_threads)
147 });
148}
149
150pub fn initialize() {
159 #[cfg(not(test))]
160 xaeroflux_core::size::init(); xaeroflux_core::size::init();
162 let project_root = env!("CARGO_MANIFEST_DIR");
163 let cfg_path = format!("{}/xaeroflux.toml", project_root);
164 unsafe { env::set_var("XAERO_CONFIG", &cfg_path) };
165 let config = load_config();
166 if config.name != "xaeroflux" {
167 panic!("Invalid config file. Expected 'xaeroflux'.");
168 }
169 init_global_dispatcher_pool();
170 init_global_io_pool();
171 init_xaero_pool();
172 init_logging(); show_banner();
174 info!("XaeroFlux initialized");
175}
176
177pub fn load_config() -> &'static config::Config {
185 CONF.get_or_init(|| {
186 let path = std::env::var("XAERO_CONFIG").unwrap_or_else(|_| "xaeroflux.toml".into());
187 let s = std::fs::read_to_string(path).expect("read config");
188 toml::from_str(&s).expect("parse config")
189 })
190}
191
192pub fn show_banner() {
196 info!("XaeroFlux initializing...");
197 let slant = FIGfont::standard().expect("load slant font");
198 let v = env!("CARGO_PKG_VERSION");
199 let x = format!("XAER0FLUX v. {v}");
200 let s = x.as_str();
201 let figure = slant.convert(s).expect("convert text");
202 tracing::info!("\n{}", figure);
203}
204#[cfg(test)]
205mod tests {
206 use rkyv::{Archive, Deserialize, Serialize, rancor::Failure};
207 use xaeroflux_core::event;
208
209 use super::*;
210
211 #[test]
212 fn test_initialize() {
213 initialize();
214 assert!(CONF.get().is_some());
215 assert_eq!(CONF.get().expect("failed to load config").name, "xaeroflux");
216 assert_eq!(CONF.get().expect("failed to load config").version, 1_u64);
217 }
218
219 #[test]
220 fn test_load_config() {
221 initialize();
222 let config = load_config();
223 assert_eq!(config.name, "xaeroflux");
224 assert_eq!(config.version, 1_u64);
225 }
226 #[test]
227 fn test_xaero_data() {
228 initialize();
229 #[derive(Archive, Serialize, Deserialize, Debug, Clone, Default)]
230 struct TestData {
231 id: u32,
232 name: String,
233 }
234 let data = TestData {
236 id: 1,
237 name: "Test".to_string(),
238 };
239 assert_eq!(data.id, 1);
240 assert_eq!(data.name, "Test");
241 let d = rkyv::to_bytes::<Failure>(&data).expect("failed to serialize");
243 assert!(!d.is_empty());
244 }
245 #[test]
246 fn test_event_type() {
247 initialize();
248 let e = event::EventType::SystemEvent(event::SystemEventKind::Start);
249 let event = event::EventType::from_u8(1);
250 assert_eq!(event, e);
251 }
252}