catalyzer/internals/
runtime.rs

1pub(crate) use tokio::runtime::Runtime as TokioRuntime;
2use tokio::runtime::Builder as TokioRuntimeBuilder;
3use core::future::Future;
4use utils::*;
5use crate::*;
6
7/// A runtime for the Catalyzer framework.
8/// 
9/// You most likely won't need to use this directly,
10/// as everything is handled by the `#[main]` macro.
11#[derive(Debug)]
12pub struct CatalyzerRuntime {
13    tokio: TokioRuntime
14}
15
16/// A builder for the [`CatalyzerRuntime`](crate::internals::runtime::CatalyzerRuntime).
17#[derive(Debug)]
18pub struct CatalyzerRuntimeBuilder {
19    tokio: Option<TokioRuntime>,
20}
21
22impl CatalyzerRuntime {
23    fn default_preinit() -> Result<CatalyzerRuntime> {
24        #[cfg(feature = "builtin-logger")]
25        {
26            let log_level = std::env::var("CATALYZER_LOG_LEVEL").unwrap_or("info".to_string());
27            let log_level = log_level.parse().unwrap_or(log::LevelFilter::Info);
28            let mut l = ::builtin_logger::SimpleLogger::new()
29                .with_level(log_level);
30            #[cfg(debug_assertions)]
31            { l = l.with_colors(true); }
32            #[cfg(not(debug_assertions))]
33            { l = l.with_colors(false); }
34            let _ = l.init();
35        }
36        use std::sync::atomic::{AtomicU8, Ordering};
37        static ATOMIC_ID: AtomicU8 = AtomicU8::new(0);
38        CatalyzerRuntime::builder()
39            .setup_tokio(|b|
40                b.enable_all()
41                .thread_name_fn(|| {
42                    let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
43                    format!("Catalyzer Runtime Worker #{id}")
44                })
45            )?
46            .build()
47    }
48    /// Creates a new builder for the runtime.
49    #[inline]
50    pub fn builder() -> CatalyzerRuntimeBuilder {
51        CatalyzerRuntimeBuilder {
52            tokio: None,
53        }
54    }
55    /// Initializes the runtime with an optional custom initialization function.
56    pub fn init(func: Option<fn() -> Result<Self>>) -> Self {
57        match func.map_or_else(Self::default_preinit, |f| f()) {
58            Err(e) => {
59                log::error!("Failed to initialize runtime: {}", e);
60                std::process::exit(1);
61            }
62            Ok(rt) => rt,
63        }
64    }
65    /// Runs the given future on the runtime.
66    /// 
67    /// This function will also install signal handlers for Ctrl+C and SIGTERM.
68    ///
69    /// # Example
70    /// 
71    /// ```rust
72    /// # use catalyzer::internals::runtime::CatalyzerRuntime;
73    /// # use catalyzer::Result;
74    /// fn main() {
75    ///     async fn main() -> Result {
76    ///         // Your code here
77    ///         Ok(())
78    ///     }
79    ///     CatalyzerRuntime::init(None).run(main);
80    /// }
81    /// ```
82    pub fn run<F, Fut>(self, f: F) where
83        Fut: Future<Output = Result>,
84        F: FnOnce() -> Fut,
85    {
86        let (sender, reciever) = tokio::sync::oneshot::channel::<()>();
87        let mercy_handlers = async {
88            tokio::select! {
89                _ = signals::ctrl_c() => {
90                    log::info!("Received Ctrl+C, shutting down...");
91                },
92                _ = signals::term() => {
93                    log::info!("Received SIGTERM, shutting down...");
94                },
95            }
96            tokio::select! {
97                _ = signals::ctrl_c() => {},
98                _ = signals::term() => {},
99            }
100            log::warn!("Received second signal, please mercy...");
101            if let Err(_) = sender.send(()) {
102                log::error!("Failed to emit mercy signal, shutting down...");
103                std::process::exit(1);
104            }
105            tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
106            log::error!("Mercy timeout reached, shutting down...");
107            std::process::exit(1);
108        };
109        self.tokio.spawn(mercy_handlers);
110        self.tokio.block_on(async move {
111            tokio::select! {
112                _ = f() => {
113                    log::debug!("Webserver shutdown successfully!");
114                },
115                _ = reciever => {
116                    log::trace!("Received mercy signal, shutting down...");
117                },
118            }
119        });
120        self.tokio.shutdown_timeout(tokio::time::Duration::from_secs(5));
121        log::info!("Shutdown successful!");
122    }
123}
124
125impl CatalyzerRuntimeBuilder {
126    /// Allows you to set up the Tokio runtime.
127    /// 
128    /// This function is chainable.
129    /// 
130    /// # Example
131    /// 
132    /// ```rust
133    /// # use catalyzer::internals::runtime::CatalyzerRuntimeBuilder;
134    /// # use catalyzer::Result;
135    /// # fn main() -> Result {
136    /// CatalyzerRuntime::builder()
137    ///     .setup_tokio(|b| b.enable_all())?
138    ///     .build()
139    /// # ;
140    /// # }
141    pub fn setup_tokio<F>(mut self, f: F) -> Result<Self> where
142        F: FnOnce(&mut TokioRuntimeBuilder) -> &mut TokioRuntimeBuilder,
143    {
144        let mut builder = TokioRuntimeBuilder::new_multi_thread();
145        f(&mut builder);
146        builder.build()
147            .map(|t| { self.tokio = Some(t); self})
148            .map_auto()
149    }
150    /// Builds the [`CatalyzerRuntime`](crate::internals::runtime::CatalyzerRuntime).
151    /// 
152    /// This function consumes the builder, and returns a runtime.
153    pub fn build(self) -> Result<CatalyzerRuntime> {
154        let tokio = self.tokio.ok_or(CatalyzerError::RuntimeInitializationError)?;
155        Ok(CatalyzerRuntime { tokio, })
156    }
157}
158
159pub(crate) mod signals {
160    use tokio::signal;
161    pub(crate) async fn ctrl_c() {
162        if let Err(_) = signal::ctrl_c().await {
163            log::error!("Failed to install signal handler");
164            std::process::exit(1);
165        }
166    }
167    #[cfg(unix)]
168    pub(crate) async fn term() {
169        match signal::unix::signal(signal::unix::SignalKind::terminate()) {
170            Ok(mut stream) => { stream.recv().await; },
171            Err(e) => {
172                log::error!("Failed to install signal handler: {}", e);
173                std::process::exit(1);
174            },
175        }
176    }
177    #[cfg(not(unix))]
178    pub(crate) async fn term() {
179        core::future::pending::<()>().await;
180    }
181}