catalyzer/internals/
runtime.rs1pub(crate) use tokio::runtime::Runtime as TokioRuntime;
2use tokio::runtime::Builder as TokioRuntimeBuilder;
3use core::future::Future;
4use utils::*;
5use crate::*;
6
7#[derive(Debug)]
12pub struct CatalyzerRuntime {
13 tokio: TokioRuntime
14}
15
16#[derive(Debug)]
18pub struct CatalyzerRuntimeBuilder {
19 tokio: Option<TokioRuntime>,
20}
21
22impl CatalyzerRuntime {
23 fn default_preinit() -> Result<CatalyzerRuntime> {
24 #[cfg(feature = "builtin-logger")]
25 {
26 let log_level = std::env::var("CATALYZER_LOG_LEVEL").unwrap_or("info".to_string());
27 let log_level = log_level.parse().unwrap_or(log::LevelFilter::Info);
28 let mut l = ::builtin_logger::SimpleLogger::new()
29 .with_level(log_level);
30 #[cfg(debug_assertions)]
31 { l = l.with_colors(true); }
32 #[cfg(not(debug_assertions))]
33 { l = l.with_colors(false); }
34 let _ = l.init();
35 }
36 use std::sync::atomic::{AtomicU8, Ordering};
37 static ATOMIC_ID: AtomicU8 = AtomicU8::new(0);
38 CatalyzerRuntime::builder()
39 .setup_tokio(|b|
40 b.enable_all()
41 .thread_name_fn(|| {
42 let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst);
43 format!("Catalyzer Runtime Worker #{id}")
44 })
45 )?
46 .build()
47 }
48 #[inline]
50 pub fn builder() -> CatalyzerRuntimeBuilder {
51 CatalyzerRuntimeBuilder {
52 tokio: None,
53 }
54 }
55 pub fn init(func: Option<fn() -> Result<Self>>) -> Self {
57 match func.map_or_else(Self::default_preinit, |f| f()) {
58 Err(e) => {
59 log::error!("Failed to initialize runtime: {}", e);
60 std::process::exit(1);
61 }
62 Ok(rt) => rt,
63 }
64 }
65 pub fn run<F, Fut>(self, f: F) where
83 Fut: Future<Output = Result>,
84 F: FnOnce() -> Fut,
85 {
86 let (sender, reciever) = tokio::sync::oneshot::channel::<()>();
87 let mercy_handlers = async {
88 tokio::select! {
89 _ = signals::ctrl_c() => {
90 log::info!("Received Ctrl+C, shutting down...");
91 },
92 _ = signals::term() => {
93 log::info!("Received SIGTERM, shutting down...");
94 },
95 }
96 tokio::select! {
97 _ = signals::ctrl_c() => {},
98 _ = signals::term() => {},
99 }
100 log::warn!("Received second signal, please mercy...");
101 if let Err(_) = sender.send(()) {
102 log::error!("Failed to emit mercy signal, shutting down...");
103 std::process::exit(1);
104 }
105 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
106 log::error!("Mercy timeout reached, shutting down...");
107 std::process::exit(1);
108 };
109 self.tokio.spawn(mercy_handlers);
110 self.tokio.block_on(async move {
111 tokio::select! {
112 _ = f() => {
113 log::debug!("Webserver shutdown successfully!");
114 },
115 _ = reciever => {
116 log::trace!("Received mercy signal, shutting down...");
117 },
118 }
119 });
120 self.tokio.shutdown_timeout(tokio::time::Duration::from_secs(5));
121 log::info!("Shutdown successful!");
122 }
123}
124
125impl CatalyzerRuntimeBuilder {
126 pub fn setup_tokio<F>(mut self, f: F) -> Result<Self> where
142 F: FnOnce(&mut TokioRuntimeBuilder) -> &mut TokioRuntimeBuilder,
143 {
144 let mut builder = TokioRuntimeBuilder::new_multi_thread();
145 f(&mut builder);
146 builder.build()
147 .map(|t| { self.tokio = Some(t); self})
148 .map_auto()
149 }
150 pub fn build(self) -> Result<CatalyzerRuntime> {
154 let tokio = self.tokio.ok_or(CatalyzerError::RuntimeInitializationError)?;
155 Ok(CatalyzerRuntime { tokio, })
156 }
157}
158
159pub(crate) mod signals {
160 use tokio::signal;
161 pub(crate) async fn ctrl_c() {
162 if let Err(_) = signal::ctrl_c().await {
163 log::error!("Failed to install signal handler");
164 std::process::exit(1);
165 }
166 }
167 #[cfg(unix)]
168 pub(crate) async fn term() {
169 match signal::unix::signal(signal::unix::SignalKind::terminate()) {
170 Ok(mut stream) => { stream.recv().await; },
171 Err(e) => {
172 log::error!("Failed to install signal handler: {}", e);
173 std::process::exit(1);
174 },
175 }
176 }
177 #[cfg(not(unix))]
178 pub(crate) async fn term() {
179 core::future::pending::<()>().await;
180 }
181}