xaeroflux_core/
lib.rs

1//! Core initialization and utilities for xaeroflux.
2//!
3//! This module provides:
4//! - Global configuration loading and access (`load_config`, `CONF`).
5//! - Initialization of global thread pools for dispatch and I/O.
6//! - Serialization and deserialization helpers for rkyv.
7//! - Application startup (`initialize`) with logging and banner.
8#![feature(trivial_bounds)]
9pub mod config;
10pub mod date_time;
11pub mod event;
12pub mod hash;
13pub mod keys;
14pub mod logs;
15pub mod merkle_tree;
16pub mod pipe;
17pub mod pool;
18pub mod size;
19pub mod sys;
20pub mod system_paths;
21pub mod vector_clock;
22pub mod vector_clock_actor;
23
24use std::{
25    env,
26    sync::{
27        OnceLock,
28        atomic::{AtomicU64, Ordering},
29    },
30};
31
32use figlet_rs::FIGfont;
33use threadpool::ThreadPool;
34use tracing::info;
35
36use crate as xaeroflux_core;
37pub use crate::{config::Config, logs::init_logging};
38
39pub static NEXT_ID: AtomicU64 = AtomicU64::new(1);
40
41/// Returns a unique, thread-safe `u64` ID.
42pub fn next_id() -> u64 {
43    NEXT_ID.fetch_add(1, Ordering::SeqCst)
44}
45/// Global, singleton configuration instance.
46///
47/// Initialized by `load_config` and reused thereafter.
48pub static CONF: OnceLock<config::Config> = OnceLock::new();
49
50/// Global thread pool for dispatching work to worker threads.
51pub static DISPATCHER_POOL: OnceLock<ThreadPool> = OnceLock::new();
52
53/// Global thread pool for performing I/O-bound tasks.
54pub static IO_POOL: OnceLock<ThreadPool> = OnceLock::new();
55
56/// `XaeroEvent` thread pool for dispatching work to worker threads.
57pub static XAERO_DISPATCHER_POOL: OnceLock<ThreadPool> = OnceLock::new();
58
59/// Global runtime for peer-to-peer networking tasks.
60pub static P2P_RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
61
62// Automatically initializes on first access
63pub fn shutdown_all_pools() -> Result<(), Box<dyn std::error::Error>> {
64    let dpo = DISPATCHER_POOL.get();
65    match dpo.as_ref() {
66        None => {
67            tracing::warn!("Dispatcher pool cannot be killed because it is unavailable!")
68        }
69        Some(dp) => {
70            dp.join();
71        }
72    }
73
74    let iopo = IO_POOL.get();
75    match iopo.as_ref() {
76        None => {
77            tracing::warn!("Dispatcher pool cannot be killed because it is unavailable!")
78        }
79        Some(iop) => {
80            iop.join();
81        }
82    }
83
84    let xdpo = XAERO_DISPATCHER_POOL.get();
85    match xdpo.as_ref() {
86        None => {
87            tracing::warn!("Dispatcher pool cannot be killed because it is unavailable!")
88        }
89        Some(xdp) => {
90            xdp.join();
91        }
92    }
93    if P2P_RUNTIME.get().is_some() {
94        tracing::info!("P2P runtime will shutdown automatically on process exit");
95    } else {
96        tracing::warn!("P2P runtime was not initialized");
97    }
98    Ok(())
99}
100/// Initializes the global P2P Tokio runtime.
101///
102/// Uses the `threads.num_worker_threads` setting from configuration,
103/// defaulting to at least one thread, and stores it in `P2P_RUNTIME`.
104pub fn init_p2p_runtime() -> &'static tokio::runtime::Runtime {
105    P2P_RUNTIME.get_or_init(|| {
106        let conf = CONF.get_or_init(Config::default);
107        let threads = conf.threads.num_worker_threads.max(2);
108        tokio::runtime::Builder::new_multi_thread()
109            .worker_threads(threads)
110            .enable_all()
111            .thread_name("xaeroflux-p2p")
112            .build()
113            .expect("Failed to create P2P runtime")
114    })
115}
116
117pub fn init_xaero_pool() {
118    XAERO_DISPATCHER_POOL.get_or_init(|| {
119        let conf = CONF.get_or_init(Config::default);
120        let no_of_worker_threads = conf.threads.num_worker_threads.max(2);
121        ThreadPool::new(no_of_worker_threads)
122    });
123}
124
125/// Initializes the global dispatcher thread pool.
126///
127/// Uses the `threads.num_worker_threads` setting from configuration,
128/// defaulting to at least one thread, and stores it in `DISPATCHER_POOL`.
129pub fn init_global_dispatcher_pool() {
130    DISPATCHER_POOL.get_or_init(|| {
131        let conf = CONF.get_or_init(Config::default);
132        let no_of_worker_threads = conf.threads.num_worker_threads.max(2);
133        ThreadPool::new(no_of_worker_threads)
134    });
135}
136
137/// Initializes the global I/O thread pool.
138///
139/// Uses the `threads.num_io_threads` setting from configuration,
140/// defaulting to at least one thread, and stores it in `IO_POOL`.
141pub fn init_global_io_pool() {
142    IO_POOL.get_or_init(|| {
143        let conf = CONF.get_or_init(Config::default);
144        let num_io_threads = conf.threads.num_io_threads.max(1);
145
146        ThreadPool::new(num_io_threads)
147    });
148}
149
150/// Perform global initialization of xaeroflux core.
151///
152/// - Loads and validates configuration (`xaeroflux.toml`).
153/// - Initializes dispatcher and I/O thread pools.
154/// - Sets up logging and displays startup banner.
155///
156/// # Panics
157/// Will panic if the configuration name is not "xaeroflux".
158pub fn initialize() {
159    #[cfg(not(test))]
160    xaeroflux_core::size::init(); // Initialize the size module
161    xaeroflux_core::size::init();
162    let project_root = env!("CARGO_MANIFEST_DIR");
163    let cfg_path = format!("{}/xaeroflux.toml", project_root);
164    unsafe { env::set_var("XAERO_CONFIG", &cfg_path) };
165    let config = load_config();
166    if config.name != "xaeroflux" {
167        panic!("Invalid config file. Expected 'xaeroflux'.");
168    }
169    init_global_dispatcher_pool();
170    init_global_io_pool();
171    init_xaero_pool();
172    init_logging(); // Initialize the logging system
173    show_banner();
174    info!("XaeroFlux initialized");
175}
176
177/// Load or retrieve the global configuration.
178///
179/// Reads the `XAERO_CONFIG` environment variable or defaults to
180/// `xaeroflux.toml` in the project root, parses it via `toml`.
181///
182/// # Panics
183/// Will panic if the file cannot be read or parsed.
184pub fn load_config() -> &'static config::Config {
185    CONF.get_or_init(|| {
186        let path = std::env::var("XAERO_CONFIG").unwrap_or_else(|_| "xaeroflux.toml".into());
187        let s = std::fs::read_to_string(path).expect("read config");
188        toml::from_str(&s).expect("parse config")
189    })
190}
191
192/// Display the ASCII art banner for xaeroflux startup.
193///
194/// Uses FIGfont to render "XAER0FLUX v.{version}" and logs it.
195pub fn show_banner() {
196    info!("XaeroFlux initializing...");
197    let slant = FIGfont::standard().expect("load slant font");
198    let v = env!("CARGO_PKG_VERSION");
199    let x = format!("XAER0FLUX v. {v}");
200    let s = x.as_str();
201    let figure = slant.convert(s).expect("convert text");
202    tracing::info!("\n{}", figure);
203}
204#[cfg(test)]
205mod tests {
206    use rkyv::{Archive, Deserialize, Serialize, rancor::Failure};
207    use xaeroflux_core::event;
208
209    use super::*;
210
211    #[test]
212    fn test_initialize() {
213        initialize();
214        assert!(CONF.get().is_some());
215        assert_eq!(CONF.get().expect("failed to load config").name, "xaeroflux");
216        assert_eq!(CONF.get().expect("failed to load config").version, 1_u64);
217    }
218
219    #[test]
220    fn test_load_config() {
221        initialize();
222        let config = load_config();
223        assert_eq!(config.name, "xaeroflux");
224        assert_eq!(config.version, 1_u64);
225    }
226    #[test]
227    fn test_xaero_data() {
228        initialize();
229        #[derive(Archive, Serialize, Deserialize, Debug, Clone, Default)]
230        struct TestData {
231            id: u32,
232            name: String,
233        }
234        // No explicit implementation needed as the blanket implementation covers this.
235        let data = TestData {
236            id: 1,
237            name: "Test".to_string(),
238        };
239        assert_eq!(data.id, 1);
240        assert_eq!(data.name, "Test");
241        // serialize
242        let d = rkyv::to_bytes::<Failure>(&data).expect("failed to serialize");
243        assert!(!d.is_empty());
244    }
245    #[test]
246    fn test_event_type() {
247        initialize();
248        let e = event::EventType::SystemEvent(event::SystemEventKind::Start);
249        let event = event::EventType::from_u8(1);
250        assert_eq!(event, e);
251    }
252}