1#![allow(clippy::type_complexity)]
17#![feature(
18 once_cell_try,
19 result_flattening,
20 div_duration
21)]
22
23use std::{
24 backtrace::Backtrace,
25 future::Future,
26 marker::PhantomData,
27 sync::{
28 atomic::{AtomicBool, Ordering},
29 Arc, OnceLock, Weak,
30 },
31 thread::{panicking, JoinHandle},
32 time::{Duration, Instant},
33};
34
35pub mod logging;
36pub mod pubsub;
37pub mod rng;
38pub mod service;
39pub mod utils;
40
41pub use anyhow;
42use anyhow::Context;
43pub use async_trait::async_trait;
44pub use bytes;
45use config::Config;
46use crossbeam::queue::SegQueue;
47pub use log;
48use log::{debug, error, info, warn};
49pub use rayon;
50use serde::Deserialize;
51use sysinfo::Pid;
52pub use tokio;
53use tokio::{
54 io::{AsyncReadExt, AsyncWriteExt},
55 net::TcpListener,
56 runtime::Runtime,
57 sync::mpsc,
58 task::JoinSet,
59};
60
61use crate::logging::init_logger;
62
/// Lifecycle state stored inside [`NodeIntrinsics`] and inspected when the
/// intrinsics are dropped.
#[derive(Clone, PartialEq, Eq)]
enum Running {
    /// Initial state produced by `NodeIntrinsics::default`; dropping the
    /// intrinsics in this state logs a warning.
    No,
    /// The node has been marked as running under the contained name.
    Yes(Arc<str>),
    /// Drop-time diagnostics are suppressed entirely.
    Ignored,
}
69
/// Bookkeeping state that every [`Node`] hands out through
/// [`Node::get_intrinsics`].
///
/// Its `Drop` implementation reports nodes that were dropped without being
/// marked as running, and nodes that are dropped while the thread is panicking.
pub struct NodeIntrinsics<N: Node + ?Sized> {
    /// Current lifecycle state of the owning node.
    running: Running,
    /// Ties the intrinsics to the node type `N` without storing a value of it.
    _phantom: PhantomData<N>,
}
79
80impl<N: Node + ?Sized> NodeIntrinsics<N> {
81 pub fn ignore_drop(&mut self) {
83 self.running = Running::Ignored;
84 }
85 pub fn manually_run(&mut self, name: Arc<str>) {
90 self.running = Running::Yes(name);
91 }
92}
93
94impl<N: Node + ?Sized> Default for NodeIntrinsics<N> {
95 fn default() -> Self {
96 Self {
97 running: Running::No,
98 _phantom: PhantomData,
99 }
100 }
101}
102
103impl<N: Node + ?Sized> Drop for NodeIntrinsics<N> {
104 fn drop(&mut self) {
105 match &self.running {
106 Running::No => warn!("{} was dropped without being ran!", N::DEFAULT_NAME),
107 Running::Yes(name) => {
108 if panicking() {
109 error!("{name} has panicked!");
110 }
111 }
112 Running::Ignored => {}
113 }
114 }
115}
116
/// A unit of work that can be added to an [`Application`] or spawned from a
/// [`RuntimeContext`], and is then driven as an asynchronous task.
#[async_trait]
pub trait Node: Send + 'static {
    /// Name used for the node when it is added or spawned without an explicit
    /// name, and in the warning logged if it is dropped without being run.
    const DEFAULT_NAME: &'static str;

    /// Consumes the node and runs it to completion.
    ///
    /// An `Err` returned from a node added to an [`Application`] stops the
    /// whole application.
    async fn run(self, context: RuntimeContext) -> anyhow::Result<()>;

    /// Gives the runtime mutable access to this node's [`NodeIntrinsics`] so
    /// it can record that the node has started running.
    fn get_intrinsics(&mut self) -> &mut NodeIntrinsics<Self>;
}
159
/// A collection of nodes, tasks and futures that are started together once the
/// application is run.
pub struct Application {
    /// Deferred spawn actions. Each one is called exactly once with the
    /// application's `JoinSet` and a sender through which running tasks can
    /// request that further tasks be spawned.
    pending: Vec<
        Box<
            dyn FnOnce(
                    &mut JoinSet<anyhow::Result<()>>,
                    mpsc::UnboundedSender<Box<dyn FnOnce(&mut JoinSet<anyhow::Result<()>>) + Send>>,
                ) + Send,
        >,
    >,
    /// Flag that observers obtained from `get_main_thread_drop_check` can poll
    /// to learn that this value has been dropped.
    drop_check: DropCheck,
}
175
176impl Application {
177 pub fn add_node<N: Node>(&mut self, mut node: N) {
179 let name: Arc<str> = Arc::from(N::DEFAULT_NAME.to_string().into_boxed_str());
180 let name2 = name.clone();
181 self.add_task_inner(
182 |x| {
183 node.get_intrinsics().running = Running::Yes(name2);
184 node.run(x)
185 },
186 name,
187 );
188 }
189
190 pub fn add_node_with_name<N: Node>(&mut self, mut node: N, name: impl Into<String>) {
192 let name: Arc<str> = Arc::from(name.into().into_boxed_str());
193 let name2 = name.clone();
194 self.add_task_inner(
195 |x| {
196 node.get_intrinsics().running = Running::Yes(name2);
197 node.run(x)
198 },
199 name,
200 );
201 }
202
203 pub fn add_future(
207 &mut self,
208 fut: impl Future<Output = anyhow::Result<()>> + Send + 'static,
209 name: impl Into<String>,
210 ) {
211 let name: Arc<str> = Arc::from(name.into().into_boxed_str());
212 self.pending.push(Box::new(move |join_set, _| {
213 join_set.spawn(async move {
214 fut.await
215 .with_context(|| format!("{name} has faced an error"))
216 });
217 }));
218 }
219
220 pub fn add_task<F: Future<Output = anyhow::Result<()>> + Send + 'static>(
226 &mut self,
227 f: impl FnOnce(RuntimeContext) -> F + Send + 'static,
228 name: impl Into<String>,
229 ) {
230 self.add_task_inner(f, Arc::from(name.into().into_boxed_str()));
231 }
232
233 fn add_task_inner<F: Future<Output = anyhow::Result<()>> + Send + 'static>(
234 &mut self,
235 f: impl FnOnce(RuntimeContext) -> F + Send + 'static,
236 name: Arc<str>,
237 ) {
238 self.pending.push(Box::new(move |join_set, node_sender| {
239 join_set.spawn(async move {
240 f(RuntimeContext {
241 name: name.clone(),
242 node_sender,
243 quit_on_drop: false,
244 })
245 .await
246 .with_context(|| format!("{name} has faced an error"))
247 });
248 }));
249 }
250
251 async fn run(self) -> anyhow::Result<()> {
252 let mut join_set = JoinSet::new();
253 let (node_sender, mut node_recv) = mpsc::unbounded_channel();
254
255 for pending in self.pending {
256 pending(&mut join_set, node_sender.clone());
257 }
258
259 loop {
260 tokio::select! {
261 pending = node_recv.recv() => (pending.unwrap())(&mut join_set),
262 result = join_set.join_next() => {
263 let Some(result) = result else {
264 info!("All nodes have terminated");
265 break Ok(());
266 };
267 match result {
268 Ok(Ok(())) => {}
269 Ok(Err(e)) => break Err(e),
270 Err(e) => {
271 error!("Faced the following error while trying to join with node task: {e}");
272 }
273 }
274 }
275 }
276 }
277 }
278
279 #[must_use]
282 pub fn get_main_thread_drop_check(&self) -> ObservingDropCheck {
283 self.drop_check.get_observing()
284 }
285}
286
/// Handle given to every running node or task, through which it can learn its
/// name and ask the runtime to spawn further nodes.
#[derive(Clone)]
pub struct RuntimeContext {
    /// Name of the node or task this context was created for.
    name: Arc<str>,
    /// Channel over which actions on the application's `JoinSet` are sent to
    /// the loop that owns it.
    node_sender: mpsc::UnboundedSender<Box<dyn FnOnce(&mut JoinSet<anyhow::Result<()>>) + Send>>,
    /// When `true`, dropping this context asks the runtime to abort every task.
    quit_on_drop: bool,
}
298
299impl RuntimeContext {
300 #[must_use]
302 pub fn get_name(&self) -> &Arc<str> {
303 &self.name
304 }
305
306 pub fn set_quit_on_drop(&mut self, value: bool) {
308 self.quit_on_drop = value;
309 }
310
311 pub fn spawn_node<N: Node>(&self, mut node: N) {
313 let mut new_context = self.clone();
314 let name: Arc<str> = Arc::from(N::DEFAULT_NAME.to_string().into_boxed_str());
315 new_context.name = name.clone();
316 let _ = self.node_sender.send(Box::new(|join_set| {
317 join_set.spawn(async {
318 node.get_intrinsics().running = Running::Yes(name.clone());
319 node.run(new_context)
320 .await
321 .with_context(move || format!("{name} has faced an error"))
322 });
323 }));
324 }
325
326 pub fn spawn_node_with_name<N: Node>(&self, mut node: N, name: impl Into<String>) {
328 let mut new_context = self.clone();
329 let name: Arc<str> = Arc::from(name.into().into_boxed_str());
330 new_context.name = name.clone();
331 let _ = self.node_sender.send(Box::new(|join_set| {
332 join_set.spawn(async {
333 node.get_intrinsics().running = Running::Yes(name.clone());
334 node.run(new_context)
335 .await
336 .with_context(move || format!("{name} has faced an error"))
337 });
338 }));
339 }
340}
341
342impl Drop for RuntimeContext {
343 fn drop(&mut self) {
344 if self.quit_on_drop {
345 let name = self.name.clone();
346 let _ = self.node_sender.send(Box::new(move |join_set| {
347 warn!("Quitting runtime from {name}...");
348 join_set.abort_all();
349 }));
350 }
351 }
352}
353
/// A shared flag that is raised when a `DropCheck` is dropped.
///
/// Clones share the same flag, and every clone that still has
/// `update_on_drop` enabled raises it when dropped.
#[derive(Clone)]
pub struct DropCheck {
    /// The shared "has been dropped" flag.
    dropped: Arc<AtomicBool>,
    /// Whether dropping this particular value raises the flag.
    update_on_drop: bool,
}

impl Default for DropCheck {
    /// Creates a `DropCheck` whose flag is lowered and that raises it on drop.
    fn default() -> Self {
        Self {
            dropped: Arc::default(),
            update_on_drop: true,
        }
    }
}

impl Drop for DropCheck {
    fn drop(&mut self) {
        if self.update_on_drop {
            self.dropped.store(true, Ordering::SeqCst);
        }
    }
}

impl DropCheck {
    /// Returns `true` if the shared flag has been raised, i.e. a `DropCheck`
    /// sharing this flag was dropped with `update_on_drop` enabled.
    #[must_use]
    pub fn has_dropped(&self) -> bool {
        self.dropped.load(Ordering::SeqCst)
    }

    /// Lowers the shared flag again, forgetting any earlier drop.
    pub fn reset(&self) {
        self.dropped.store(false, Ordering::SeqCst);
    }

    /// Makes dropping this value raise the shared flag (the default).
    pub fn update_on_drop(&mut self) {
        self.update_on_drop = true;
    }

    /// Makes dropping this value leave the shared flag untouched.
    pub fn dont_update_on_drop(&mut self) {
        self.update_on_drop = false;
    }

    /// Returns a read-only observer of the shared flag. Dropping the observer
    /// never raises the flag.
    pub fn get_observing(&self) -> ObservingDropCheck {
        ObservingDropCheck {
            dropped: self.dropped.clone(),
        }
    }
}

/// Read-only view of a [`DropCheck`]'s flag.
#[derive(Clone)]
pub struct ObservingDropCheck {
    /// The flag shared with the originating [`DropCheck`].
    dropped: Arc<AtomicBool>,
}

impl ObservingDropCheck {
    /// Returns `true` if the shared flag has been raised.
    #[must_use]
    pub fn has_dropped(&self) -> bool {
        self.dropped.load(Ordering::SeqCst)
    }
}
429
/// Options controlling how the runtime is started.
#[derive(Deserialize, Clone, Copy)]
pub struct RunOptions {
    /// Name of the runtime. Falls back to the type's `Default` value when
    /// absent from the deserialized input.
    // NOTE(review): presumably consumed by `init_logger`; confirm in `logging`.
    #[serde(default)]
    pub runtime_name: &'static str,

    /// Whether the runtime should open its auxilliary control TCP listener.
    /// Defaults to `true` when absent from the deserialized input.
    #[serde(default = "default_auxilliary_control")]
    pub auxilliary_control: bool,

    /// Defaults to `true` when absent from the deserialized input.
    // NOTE(review): presumably toggles a console subscriber set up during
    // logger initialisation; confirm in `logging`.
    #[serde(default = "default_enable_console_subscriber")]
    pub enable_console_subscriber: bool,
}
457
/// Serde default provider for `RunOptions::auxilliary_control`.
fn default_auxilliary_control() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
461
/// Serde default provider for `RunOptions::enable_console_subscriber`.
fn default_enable_console_subscriber() -> bool {
    const ENABLED_BY_DEFAULT: bool = true;
    ENABLED_BY_DEFAULT
}
465
/// Logs an error naming the call site and then terminates the whole process
/// with exit code 1.
///
/// Accepts either no arguments or `format!`-style arguments describing the
/// reason, which are appended to the logged message.
#[macro_export]
macro_rules! super_panic {
    // No reason given: log only the call site.
    () => {{
        $crate::log::error!("super_panic was invoked from {}:{}", file!(), line!());
        std::process::exit(1);
    }};
    // Reason given as `format!` arguments.
    ($($arg: tt)*) => {{
        $crate::log::error!("super_panic was invoked from {}:{} due to {}", file!(), line!(), format!($($arg)*));
        std::process::exit(1);
    }}
}
485
/// Expands to a `RunOptions` value with auxilliary control and the console
/// subscriber enabled.
///
/// This is a macro rather than a function so that `env!("CARGO_PKG_NAME")`
/// is evaluated in the crate that invokes it, making `runtime_name` the name
/// of the calling package.
#[macro_export]
macro_rules! default_run_options {
    () => {
        $crate::RunOptions {
            runtime_name: env!("CARGO_PKG_NAME"),
            auxilliary_control: true,
            enable_console_subscriber: true,
        }
    };
}
503
504static THREADS: SegQueue<JoinHandle<()>> = SegQueue::new();
505static THREAD_DROP_CHECKS: SegQueue<Weak<Backtrace>> = SegQueue::new();
506
507pub fn spawn_persistent_thread<F>(f: F)
520where
521 F: FnOnce(),
522 F: Send + 'static,
523{
524 let count = THREAD_DROP_CHECKS.len();
525 if count >= 16 {
526 for _ in 0..count {
527 let current = THREAD_DROP_CHECKS.pop().unwrap();
528 if current.strong_count() > 0 {
529 THREAD_DROP_CHECKS.push(current);
530 }
531 }
532 }
533 let backtrace = Arc::new(Backtrace::force_capture());
534 THREAD_DROP_CHECKS.push(Arc::downgrade(&backtrace));
535 THREADS.push(std::thread::spawn(move || {
536 let _backtrace = backtrace;
537 f();
538 }));
539}
540
541pub fn asyncify_run<F, T>(f: F) -> impl Future<Output = anyhow::Result<T>>
546where
547 F: FnOnce() -> anyhow::Result<T>,
548 F: Send + 'static,
549 T: Send + 'static,
550{
551 let (tx, rx) = tokio::sync::oneshot::channel();
552 spawn_persistent_thread(move || {
553 let _ = tx.send(f());
554 });
555 async { rx.await.map_err(anyhow::Error::from).flatten() }
556}
557
558static CONFIG: OnceLock<Config> = OnceLock::new();
559
560pub fn get_env<'de, T: Deserialize<'de>>() -> anyhow::Result<T> {
562 CONFIG
563 .get_or_try_init(|| {
564 Config::builder()
565 .add_source(config::File::with_name(".env"))
567 .add_source(config::Environment::with_prefix(""))
568 .build()
569 })?
570 .clone()
571 .try_deserialize()
572 .map_err(Into::into)
573}
574
/// Reason sent over the runtime's end channel to signal that it should stop
/// waiting.
enum EndCondition {
    /// Sent by the Ctrl-C handler and by the auxilliary control `stop` command.
    CtrlC,
    /// Sent once the runtime has been dropped and all persistent threads have
    /// been joined.
    Dropped,
}
579
580pub fn start_unros_runtime<F: Future<Output = anyhow::Result<Application>> + Send + 'static>(
586 main: impl FnOnce(Application) -> F,
587 run_options: RunOptions,
588) -> anyhow::Result<()> {
589 let pid = std::process::id();
590 init_logger(&run_options)?;
591
592 std::thread::spawn(move || {
593 let mut sys = sysinfo::System::new();
594 let mut last_cpu_check = Instant::now();
595 let pid = Pid::from_u32(pid);
596 loop {
597 std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL);
598 sys.refresh_cpu();
599 sys.refresh_process(pid);
600 if last_cpu_check.elapsed().as_secs() < 3 {
601 continue;
602 }
603 let cpus = sys.cpus();
604 let usage = cpus.iter().map(sysinfo::Cpu::cpu_usage).sum::<f32>() / cpus.len() as f32;
605 if usage >= 80.0 {
606 if let Some(proc) = sys.process(pid) {
607 warn!(
608 "CPU Usage at {usage:.1}%. Process Usage: {:.1}%",
609 proc.cpu_usage() / cpus.len() as f32
610 );
611 } else {
612 warn!("CPU Usage at {usage:.1}%. Err checking process");
613 }
614 last_cpu_check = Instant::now();
615 }
616 }
617 });
618
619 let (end_sender, mut end_recv) = tokio::sync::mpsc::channel(1);
620 let end_sender2 = end_sender.clone();
621
622 ctrlc::set_handler(move || {
623 let _ = end_sender2.blocking_send(EndCondition::CtrlC);
624 })?;
625
626 let runtime = Runtime::new()?;
627 let ctrl_c_sender2 = end_sender.clone();
628 if run_options.auxilliary_control {
629 runtime.spawn(async move {
630 let tcp_listener = match TcpListener::bind("0.0.0.0:0").await {
631 Ok(x) => x,
632 Err(e) => {
633 debug!(target: "auxilliary-control", "Failed to initialize auxilliary control port: {e}");
634 return;
635 }
636 };
637
638 match tcp_listener.local_addr() {
639 Ok(addr) => debug!(target: "auxilliary-control", "Successfully binded to: {addr}"),
640 Err(e) => {
641 debug!(target: "auxilliary-control", "Failed to get local address of auxilliary control port: {e}");
642 return;
643 }
644 }
645
646 loop {
647 let mut stream = match tcp_listener.accept().await {
648 Ok(x) => x.0,
649 Err(e) => {
650 debug!(target: "auxilliary-control", "Failed to accept auxilliary control stream: {e}");
651 continue;
652 }
653 };
654 let end_sender = ctrl_c_sender2.clone();
655 tokio::spawn(async move {
656 let mut string_buf = Vec::with_capacity(1024);
657 let mut buf = [0u8; 1024];
658 loop {
659 macro_rules! write_all {
660 ($data: expr) => {
661 if let Err(e) = stream.write_all($data).await {
662 debug!(target: "auxilliary-control", "Failed to write to auxilliary control stream: {e}");
663 break;
664 }
665 }
666 }
667 match stream.read(&mut buf).await {
668 Ok(n) => {
669 string_buf.extend_from_slice(buf.split_at(n).0);
670 }
671 Err(e) => {
672 debug!(target: "auxilliary-control", "Failed to read from auxilliary control stream: {e}");
673 break;
674 }
675 }
676
677 let Ok(string) = std::str::from_utf8(&buf) else {
678 continue;
679 };
680 let Some(newline_idx) = string.find('\n') else {
681 continue;
682 };
683
684 let command = string.split_at(newline_idx).0;
685
686 match command {
687 "stop" => {
688 let _ = end_sender.send(EndCondition::CtrlC).await;
689 write_all!(b"Stopping...\n");
690 }
691 _ => write_all!(b"Unrecognized command"),
692 }
693
694 string_buf.drain(0..newline_idx);
695 }
696 });
697 }
698 });
699 }
700
701 runtime.block_on(async {
702 let fut = async {
703 let mut grp = Application {
704 pending: vec![],
705 drop_check: DropCheck::default(),
706 };
707 grp = tokio::spawn(main(grp)).await??;
708 grp.run().await
709 };
710 info!("Runtime started with pid: {pid}");
711 tokio::select! {
712 res = fut => res,
713 _ = end_recv.recv() => {
714 info!("Ctrl-C received");
715 Ok(())
716 },
717
718 }
719 })?;
720
721 info!("Exiting...");
722
723 std::thread::spawn(|| {
724 std::thread::sleep(Duration::from_secs(5));
725 while let Some(backtrace) = THREAD_DROP_CHECKS.pop() {
726 if let Some(backtrace) = backtrace.upgrade() {
727 warn!("The following persistent thread has not exited yet:\n{backtrace}");
728 }
729 }
730 });
731 let dropper = std::thread::spawn(move || {
732 drop(runtime);
733 while let Some(x) = THREADS.pop() {
734 if let Err(e) = x.join() {
735 error!("Failed to join thread: {e:?}");
736 }
737 }
738 let _ = end_sender.blocking_send(EndCondition::Dropped);
739 });
740
741 match end_recv.blocking_recv().unwrap() {
742 EndCondition::CtrlC => warn!("Ctrl-C received. Force exiting..."),
743 EndCondition::Dropped => dropper.join().unwrap(),
744 }
745
746 Ok(())
747}