unros_core/
lib.rs

1//! Unros is an experimental alternative to the ROS 1 & 2 frameworks.
2//!
3//! It is written from the ground up in Rust and seeks to replicate most
4//! of the common functionality in ROS while adding some extra features
5//! that exploit Rust's abilities.
6//!
7//! This crate contains the core functionality which defines what this
8//! framework offers:
9//!
10//! 1. The Node trait
11//! 2. A complete logging system
12//! 3. An asynchronous Node runtime
//! 4. Publisher and Subscribers (analogous to ROS publisher and subscribers)
//! 5. The Service framework (analogous to ROS actions and services)
15
16#![allow(clippy::type_complexity)]
17#![feature(
18    once_cell_try,
19    result_flattening,
20    div_duration
21)]
22
23use std::{
24    backtrace::Backtrace,
25    future::Future,
26    marker::PhantomData,
27    sync::{
28        atomic::{AtomicBool, Ordering},
29        Arc, OnceLock, Weak,
30    },
31    thread::{panicking, JoinHandle},
32    time::{Duration, Instant},
33};
34
35pub mod logging;
36pub mod pubsub;
37pub mod rng;
38pub mod service;
39pub mod utils;
40
41pub use anyhow;
42use anyhow::Context;
43pub use async_trait::async_trait;
44pub use bytes;
45use config::Config;
46use crossbeam::queue::SegQueue;
47pub use log;
48use log::{debug, error, info, warn};
49pub use rayon;
50use serde::Deserialize;
51use sysinfo::Pid;
52pub use tokio;
53use tokio::{
54    io::{AsyncReadExt, AsyncWriteExt},
55    net::TcpListener,
56    runtime::Runtime,
57    sync::mpsc,
58    task::JoinSet,
59};
60
61use crate::logging::init_logger;
62
/// Tracks whether the Node owning a `NodeIntrinsics` has started running.
///
/// `NodeIntrinsics` inspects this state when it is dropped to decide what,
/// if anything, should be logged.
#[derive(Clone, PartialEq, Eq)]
enum Running {
    /// The Node has not been started. Dropping it in this state logs a warning.
    No,
    /// The Node has been started under the given name. If it is dropped while
    /// the current thread is panicking, an error naming it is logged.
    Yes(Arc<str>),
    /// The Node asked for its drop to be ignored. Nothing is logged.
    Ignored,
}
69
70/// An object that all Nodes must store.
71/// 
72/// This object allows Unros to track the state of the Node,
73/// specifically if it was dropped before being added to the Application
74/// or if the thread running this node has panicked.
75pub struct NodeIntrinsics<N: Node + ?Sized> {
76    running: Running,
77    _phantom: PhantomData<N>,
78}
79
80impl<N: Node + ?Sized> NodeIntrinsics<N> {
81    /// Do not warn if the Node was dropped without being added to the Application.
82    pub fn ignore_drop(&mut self) {
83        self.running = Running::Ignored;
84    }
85    /// Explicitly state that this Node has already started running.
86    /// 
87    /// If the thread panics, an error will be printed from now on. This is useful
88    /// if you're wrapping around another Node that will not be added to the Application.
89    pub fn manually_run(&mut self, name: Arc<str>) {
90        self.running = Running::Yes(name);
91    }
92}
93
94impl<N: Node + ?Sized> Default for NodeIntrinsics<N> {
95    fn default() -> Self {
96        Self {
97            running: Running::No,
98            _phantom: PhantomData,
99        }
100    }
101}
102
103impl<N: Node + ?Sized> Drop for NodeIntrinsics<N> {
104    fn drop(&mut self) {
105        match &self.running {
106            Running::No => warn!("{} was dropped without being ran!", N::DEFAULT_NAME),
107            Running::Yes(name) => {
108                if panicking() {
109                    error!("{name} has panicked!");
110                }
111            }
112            Running::Ignored => {}
113        }
114    }
115}
116
/// A Node represents a long running task.
///
/// Nodes are only required to run once, and may terminate at any point in time.
/// Nodes in ROS also serve as forms of isolation. If a thread faces an exception
/// while running code in ROS, other code in the same thread will stop executing.
/// Developers would then segment their code into nodes such that each node could
/// operate in a different thread.
///
/// Rust allows developers to make stronger guarantees regarding when and where
/// their code will panic. As such, Nodes are expected to never panic. Instead,
/// they must return an `anyhow::Error` when facing an unrecoverable error, or
/// log using the `error!` macro if normal functionality can be continued.
///
/// In the event that a Node panics, the thread running the node will not be taken
/// down, nor will any other node. An error message including the name of the node
/// that panicked will be logged. Even so, panics should be avoided.
#[async_trait]
pub trait Node: Send + 'static {
    /// The name given to this Node when it is added or spawned without an
    /// explicit name. It is also the name used in the warning that is logged
    /// when the Node is dropped without being ran.
    const DEFAULT_NAME: &'static str;

    /// The entry point of the node.
    ///
    /// Nodes are always expected to be asynchronous, as asynchronous code is much
    /// easier to manage.
    ///
    /// If a node needs to run blocking code, it is recommended to use `asyncify_run`
    /// instead of `rayon::spawn` or `std::thread::spawn`, as `asyncify_run` allows you
    /// to await the spawned thread in a non-blocking way. If you spawn a thread and do not
    /// wait on it, you may accidentally exit this method while threads are still running.
    /// While this is not unsafe or incorrect, it can lead to misleading logs. Unros automatically
    /// logs all nodes whose `run` methods have returned as terminated, even if they have spawned
    /// threads that are still running.
    ///
    /// Do keep in mind that `asyncify_run` threads do not terminate if not awaited or dropped,
    /// which relates back to the issue previously mentioned.
    ///
    /// # Errors
    ///
    /// Implementations return an error when the node faces an unrecoverable problem.
    async fn run(self, context: RuntimeContext) -> anyhow::Result<()>;

    /// Get a mutable reference to the `NodeIntrinsics` in this Node.
    ///
    /// Implementors only need to store 1 `NodeIntrinsics` object.
    fn get_intrinsics(&mut self) -> &mut NodeIntrinsics<Self>;
}
159
160/// Configuration for an `Application` that will be ran by Unros.
161/// 
162/// Nodes and tasks added to this `Application` will not run until this
163/// object is returned back to Unros.
164pub struct Application {
165    pending: Vec<
166        Box<
167            dyn FnOnce(
168                    &mut JoinSet<anyhow::Result<()>>,
169                    mpsc::UnboundedSender<Box<dyn FnOnce(&mut JoinSet<anyhow::Result<()>>) + Send>>,
170                ) + Send,
171        >,
172    >,
173    drop_check: DropCheck,
174}
175
176impl Application {
177    /// Add a `Node` to the application with its default name.
178    pub fn add_node<N: Node>(&mut self, mut node: N) {
179        let name: Arc<str> = Arc::from(N::DEFAULT_NAME.to_string().into_boxed_str());
180        let name2 = name.clone();
181        self.add_task_inner(
182            |x| {
183                node.get_intrinsics().running = Running::Yes(name2);
184                node.run(x)
185            },
186            name,
187        );
188    }
189
190    /// Add a `Node` to the application with the given name.
191    pub fn add_node_with_name<N: Node>(&mut self, mut node: N, name: impl Into<String>) {
192        let name: Arc<str> = Arc::from(name.into().into_boxed_str());
193        let name2 = name.clone();
194        self.add_task_inner(
195            |x| {
196                node.get_intrinsics().running = Running::Yes(name2);
197                node.run(x)
198            },
199            name,
200        );
201    }
202
203    /// Add a `Future` to this `Application`.
204    /// 
205    /// It will not be polled until this `Application` is ran.
206    pub fn add_future(
207        &mut self,
208        fut: impl Future<Output = anyhow::Result<()>> + Send + 'static,
209        name: impl Into<String>,
210    ) {
211        let name: Arc<str> = Arc::from(name.into().into_boxed_str());
212        self.pending.push(Box::new(move |join_set, _| {
213            join_set.spawn(async move {
214                fut.await
215                    .with_context(|| format!("{name} has faced an error"))
216            });
217        }));
218    }
219
220    /// Add a task to this `Application`.
221    /// 
222    /// A task in this context is a function that returns a `Future`. This function
223    /// will not be called until the `Application` is ran. The function will be given
224    /// the `RuntimeContext` as its only parameter.
225    pub fn add_task<F: Future<Output = anyhow::Result<()>> + Send + 'static>(
226        &mut self,
227        f: impl FnOnce(RuntimeContext) -> F + Send + 'static,
228        name: impl Into<String>,
229    ) {
230        self.add_task_inner(f, Arc::from(name.into().into_boxed_str()));
231    }
232
233    fn add_task_inner<F: Future<Output = anyhow::Result<()>> + Send + 'static>(
234        &mut self,
235        f: impl FnOnce(RuntimeContext) -> F + Send + 'static,
236        name: Arc<str>,
237    ) {
238        self.pending.push(Box::new(move |join_set, node_sender| {
239            join_set.spawn(async move {
240                f(RuntimeContext {
241                    name: name.clone(),
242                    node_sender,
243                    quit_on_drop: false,
244                })
245                .await
246                .with_context(|| format!("{name} has faced an error"))
247            });
248        }));
249    }
250
251    async fn run(self) -> anyhow::Result<()> {
252        let mut join_set = JoinSet::new();
253        let (node_sender, mut node_recv) = mpsc::unbounded_channel();
254
255        for pending in self.pending {
256            pending(&mut join_set, node_sender.clone());
257        }
258
259        loop {
260            tokio::select! {
261                pending = node_recv.recv() => (pending.unwrap())(&mut join_set),
262                result = join_set.join_next() => {
263                    let Some(result) = result else {
264                        info!("All nodes have terminated");
265                        break Ok(());
266                    };
267                    match result {
268                        Ok(Ok(())) => {}
269                        Ok(Err(e)) => break Err(e),
270                        Err(e) => {
271                            error!("Faced the following error while trying to join with node task: {e}");
272                        }
273                    }
274                }
275            }
276        }
277    }
278
279    /// Gets an `ObservingDropCheck` for the main thread that can be used to check
280    /// if the Unros runtime is exiting.
281    #[must_use]
282    pub fn get_main_thread_drop_check(&self) -> ObservingDropCheck {
283        self.drop_check.get_observing()
284    }
285}
286
287/// A reference to the runtime that is currently running.
288///
289/// The typical way of receiving this is through the `run` method
290/// of `Node`. As such, the runtime in question is the runtime that
291/// is currently running the node.
292#[derive(Clone)]
293pub struct RuntimeContext {
294    name: Arc<str>,
295    node_sender: mpsc::UnboundedSender<Box<dyn FnOnce(&mut JoinSet<anyhow::Result<()>>) + Send>>,
296    quit_on_drop: bool,
297}
298
299impl RuntimeContext {
300    /// Get the name of the node that received this `RuntimeContext`.
301    #[must_use]
302    pub fn get_name(&self) -> &Arc<str> {
303        &self.name
304    }
305
306    /// If set to `true`, the entire Unros runtime will exit if this `RuntimeContext` is dropped.
307    pub fn set_quit_on_drop(&mut self, value: bool) {
308        self.quit_on_drop = value;
309    }
310
311    /// Spawn a new node into the runtime that the runtime will keep track of.
312    pub fn spawn_node<N: Node>(&self, mut node: N) {
313        let mut new_context = self.clone();
314        let name: Arc<str> = Arc::from(N::DEFAULT_NAME.to_string().into_boxed_str());
315        new_context.name = name.clone();
316        let _ = self.node_sender.send(Box::new(|join_set| {
317            join_set.spawn(async {
318                node.get_intrinsics().running = Running::Yes(name.clone());
319                node.run(new_context)
320                    .await
321                    .with_context(move || format!("{name} has faced an error"))
322            });
323        }));
324    }
325
326    /// Spawn a new node into the runtime that the runtime will keep track of.
327    pub fn spawn_node_with_name<N: Node>(&self, mut node: N, name: impl Into<String>) {
328        let mut new_context = self.clone();
329        let name: Arc<str> = Arc::from(name.into().into_boxed_str());
330        new_context.name = name.clone();
331        let _ = self.node_sender.send(Box::new(|join_set| {
332            join_set.spawn(async {
333                node.get_intrinsics().running = Running::Yes(name.clone());
334                node.run(new_context)
335                    .await
336                    .with_context(move || format!("{name} has faced an error"))
337            });
338        }));
339    }
340}
341
342impl Drop for RuntimeContext {
343    fn drop(&mut self) {
344        if self.quit_on_drop {
345            let name = self.name.clone();
346            let _ = self.node_sender.send(Box::new(move |join_set| {
347                warn!("Quitting runtime from {name}...");
348                join_set.abort_all();
349            }));
350        }
351    }
352}
353
/// A simple primitive for tracking when clones of itself have been dropped.
///
/// Clones of this are all connected such that if any clone is dropped, all other
/// clones will be aware of that. For an object that only tracks if its clones were
/// dropped without updating them when itself is dropped, refer to `ObservingDropCheck`.
#[derive(Clone)]
pub struct DropCheck {
    /// Flag shared between all clones and observers; `true` once a clone that
    /// updates on drop has been dropped.
    dropped: Arc<AtomicBool>,
    /// Whether dropping this particular instance sets the shared flag.
    update_on_drop: bool,
}

impl Default for DropCheck {
    /// Creates a `DropCheck` whose flag is unset and which updates its clones
    /// when dropped.
    fn default() -> Self {
        Self {
            dropped: Arc::default(),
            update_on_drop: true,
        }
    }
}

impl Drop for DropCheck {
    fn drop(&mut self) {
        if self.update_on_drop {
            self.dropped.store(true, Ordering::SeqCst);
        }
    }
}

impl DropCheck {
    /// Returns true iff a clone has been dropped.
    #[must_use]
    pub fn has_dropped(&self) -> bool {
        self.dropped.load(Ordering::SeqCst)
    }

    /// Forget if a clone has been dropped.
    ///
    /// After this call `has_dropped` returns `false` on every clone and
    /// observer until another clone is dropped.
    pub fn reset(&self) {
        // Clearing the flag is what "forgetting" means; storing `true` here
        // would instead report a drop that never happened.
        self.dropped.store(false, Ordering::SeqCst);
    }

    /// Ensures that this `DropCheck` will update its clones when dropped.
    pub fn update_on_drop(&mut self) {
        self.update_on_drop = true;
    }

    /// Ensures that this `DropCheck` will *not* update its clones when dropped.
    pub fn dont_update_on_drop(&mut self) {
        self.update_on_drop = false;
    }

    /// Get an observer to this `DropCheck` and its clones.
    ///
    /// The observer shares the same flag but never sets it itself.
    pub fn get_observing(&self) -> ObservingDropCheck {
        ObservingDropCheck {
            dropped: Arc::clone(&self.dropped),
        }
    }
}

/// A similar object to `DropCheck`, however, none of its clones
/// will be updated when this is dropped.
///
/// This is equivalent to calling `dont_update_on_drop` on `DropCheck`,
/// except that this is enforced statically.
#[derive(Clone)]
pub struct ObservingDropCheck {
    /// Flag shared with the `DropCheck` this observer was created from.
    dropped: Arc<AtomicBool>,
}

impl ObservingDropCheck {
    /// Returns true iff a clone has been dropped.
    #[must_use]
    pub fn has_dropped(&self) -> bool {
        self.dropped.load(Ordering::SeqCst)
    }
}
429
/// Configuration for the runtime.
///
/// Can be deserialized; fields missing from the input fall back to the
/// defaults named in their `serde` attributes.
#[derive(Deserialize, Clone, Copy)]
pub struct RunOptions {
    /// The name of this runtime.
    ///
    /// This changes what the sub-logging directory name is.
    #[serde(default)]
    pub runtime_name: &'static str,

    /// Whether or not auxiliary control should be enabled.
    ///
    /// Auxiliary control is a way for the current runtime
    /// to be controlled externally, such as from another program.
    /// This is typically used to terminate the runtime remotely
    /// when the interface to the program running the runtime has been
    /// lost.
    #[serde(default = "default_auxilliary_control")]
    pub auxilliary_control: bool,

    /// Enabling console subscriber allows you to view the state of
    /// each `tokio` task as the program is running. However, under
    /// certain circumstances (such as running in `examples`) this may lead
    /// to an irrecoverable panic from `console-subscriber`. To avoid this,
    /// simply set this field to `false`.
    #[serde(default = "default_enable_console_subscriber")]
    pub enable_console_subscriber: bool,
}

/// Serde default for `RunOptions::auxilliary_control`: enabled.
fn default_auxilliary_control() -> bool {
    true
}

/// Serde default for `RunOptions::enable_console_subscriber`: enabled.
fn default_enable_console_subscriber() -> bool {
    true
}
465
/// Terminates the whole program after logging where and why it was invoked.
///
/// Panics do not get logged, and panics intentionally are not able to terminate
/// the runtime. The only way to terminate the program forcefully, other than pressing Ctrl-C
/// twice, is to use `std::process::exit`, which this macro uses, on top of some helpful logging.
///
/// Usage is discouraged if clean exits are required, but this macro is one of the fastest ways
/// to terminate *all* threads of the program to ensure that all computations stop immediately.
///
/// Invoked without arguments, only the call site is logged. With arguments,
/// they are treated as a format string plus parameters describing the cause.
#[macro_export]
macro_rules! super_panic {
    () => {{
        $crate::log::error!(
            "super_panic was invoked from {}:{}",
            file!(),
            line!()
        );
        ::std::process::exit(1);
    }};
    ($($arg: tt)*) => {{
        $crate::log::error!(
            "super_panic was invoked from {}:{} due to {}",
            file!(),
            line!(),
            format_args!($($arg)*)
        );
        ::std::process::exit(1);
    }}
}
485
/// Creates a default `RunOptions`.
///
/// This macro was created instead of implementing `Default`
/// so that the crate calling this macro can have its name
/// used as the `runtime_name` (`CARGO_PKG_NAME` is read where the
/// macro is expanded).
///
/// By default, both `auxilliary_control` and `enable_console_subscriber`
/// are `true`.
#[macro_export]
macro_rules! default_run_options {
    () => {
        $crate::RunOptions {
            runtime_name: env!("CARGO_PKG_NAME"),
            auxilliary_control: true,
            enable_console_subscriber: true,
        }
    };
}
503
504static THREADS: SegQueue<JoinHandle<()>> = SegQueue::new();
505static THREAD_DROP_CHECKS: SegQueue<Weak<Backtrace>> = SegQueue::new();
506
507/// Spawns a thread that is guaranteed to run the given closure to completion.
508///
509/// There is a caveat, and that is if the program is killed, this
510/// function cannot do anything.
511///
512/// Functionally, this just spawns a thread that will always be joined before the
513/// main thread exits, *assuming* that you call `start_unros_runtime`.
514/// 
515/// There is no mechanism to terminate this thread when the runtime is exiting, so
516/// that is up to you. If your thread does not terminate affter some time after exiting,
517/// a backtrace will be printed, allowing you to identify which of your persistent threads
518/// is stuck. *As such, this function is expensive to call.*
519pub fn spawn_persistent_thread<F>(f: F)
520where
521    F: FnOnce(),
522    F: Send + 'static,
523{
524    let count = THREAD_DROP_CHECKS.len();
525    if count >= 16 {
526        for _ in 0..count {
527            let current = THREAD_DROP_CHECKS.pop().unwrap();
528            if current.strong_count() > 0 {
529                THREAD_DROP_CHECKS.push(current);
530            }
531        }
532    }
533    let backtrace = Arc::new(Backtrace::force_capture());
534    THREAD_DROP_CHECKS.push(Arc::downgrade(&backtrace));
535    THREADS.push(std::thread::spawn(move || {
536        let _backtrace = backtrace;
537        f();
538    }));
539}
540
541/// Spawns a blocking thread (using `spawn_persistent_thread`) that can be awaited on.
542/// 
543/// Dropping or cancelling the future is not a valid way to terminate the thread. Refer
544/// to `spawn_persistent_thread` for more information.
545pub fn asyncify_run<F, T>(f: F) -> impl Future<Output = anyhow::Result<T>>
546where
547    F: FnOnce() -> anyhow::Result<T>,
548    F: Send + 'static,
549    T: Send + 'static,
550{
551    let (tx, rx) = tokio::sync::oneshot::channel();
552    spawn_persistent_thread(move || {
553        let _ = tx.send(f());
554    });
555    async { rx.await.map_err(anyhow::Error::from).flatten() }
556}
557
558static CONFIG: OnceLock<Config> = OnceLock::new();
559
560/// Deserialize environment variables and dhe default config file into the given generic type.
561pub fn get_env<'de, T: Deserialize<'de>>() -> anyhow::Result<T> {
562    CONFIG
563        .get_or_try_init(|| {
564            Config::builder()
565                // Add in `./Settings.toml`
566                .add_source(config::File::with_name(".env"))
567                .add_source(config::Environment::with_prefix(""))
568                .build()
569        })?
570        .clone()
571        .try_deserialize()
572        .map_err(Into::into)
573}
574
/// The reasons `start_unros_runtime` can be told to stop waiting.
enum EndCondition {
    /// Sent by the Ctrl-C handler and by the auxilliary control `stop` command.
    CtrlC,
    /// Sent after the runtime has been dropped and the persistent threads
    /// have been joined.
    Dropped,
}
579
580/// The main entry point to an Unros runtime.
581/// 
582/// The easiest way to use this is to use the `#[unros::main]` procedural macro that works
583/// very similarly to `#[tokio::main]`. Most functionality in this library depends on this
584/// function being called *exactly once.*
585pub fn start_unros_runtime<F: Future<Output = anyhow::Result<Application>> + Send + 'static>(
586    main: impl FnOnce(Application) -> F,
587    run_options: RunOptions,
588) -> anyhow::Result<()> {
589    let pid = std::process::id();
590    init_logger(&run_options)?;
591
592    std::thread::spawn(move || {
593        let mut sys = sysinfo::System::new();
594        let mut last_cpu_check = Instant::now();
595        let pid = Pid::from_u32(pid);
596        loop {
597            std::thread::sleep(sysinfo::MINIMUM_CPU_UPDATE_INTERVAL);
598            sys.refresh_cpu();
599            sys.refresh_process(pid);
600            if last_cpu_check.elapsed().as_secs() < 3 {
601                continue;
602            }
603            let cpus = sys.cpus();
604            let usage = cpus.iter().map(sysinfo::Cpu::cpu_usage).sum::<f32>() / cpus.len() as f32;
605            if usage >= 80.0 {
606                if let Some(proc) = sys.process(pid) {
607                    warn!(
608                        "CPU Usage at {usage:.1}%. Process Usage: {:.1}%",
609                        proc.cpu_usage() / cpus.len() as f32
610                    );
611                } else {
612                    warn!("CPU Usage at {usage:.1}%. Err checking process");
613                }
614                last_cpu_check = Instant::now();
615            }
616        }
617    });
618
619    let (end_sender, mut end_recv) = tokio::sync::mpsc::channel(1);
620    let end_sender2 = end_sender.clone();
621
622    ctrlc::set_handler(move || {
623        let _ = end_sender2.blocking_send(EndCondition::CtrlC);
624    })?;
625
626    let runtime = Runtime::new()?;
627    let ctrl_c_sender2 = end_sender.clone();
628    if run_options.auxilliary_control {
629        runtime.spawn(async move {
630            let tcp_listener = match TcpListener::bind("0.0.0.0:0").await {
631                Ok(x) => x,
632                Err(e) => {
633                    debug!(target: "auxilliary-control", "Failed to initialize auxilliary control port: {e}");
634                    return;
635                }
636            };
637
638            match tcp_listener.local_addr() {
639                Ok(addr) => debug!(target: "auxilliary-control", "Successfully binded to: {addr}"),
640                Err(e) => {
641                    debug!(target: "auxilliary-control", "Failed to get local address of auxilliary control port: {e}");
642                    return;
643                }
644            }
645
646            loop {
647                let mut stream = match tcp_listener.accept().await {
648                    Ok(x) => x.0,
649                    Err(e) => {
650                        debug!(target: "auxilliary-control", "Failed to accept auxilliary control stream: {e}");
651                        continue;
652                    }
653                };
654                let end_sender = ctrl_c_sender2.clone();
655                tokio::spawn(async move {
656                    let mut string_buf = Vec::with_capacity(1024);
657                    let mut buf = [0u8; 1024];
658                    loop {
659                        macro_rules! write_all {
660                            ($data: expr) => {
661                                if let Err(e) = stream.write_all($data).await {
662                                    debug!(target: "auxilliary-control", "Failed to write to auxilliary control stream: {e}");
663                                    break;
664                                }
665                            }
666                        }
667                        match stream.read(&mut buf).await {
668                            Ok(n) => {
669                                string_buf.extend_from_slice(buf.split_at(n).0);
670                            }
671                            Err(e) => {
672                                debug!(target: "auxilliary-control", "Failed to read from auxilliary control stream: {e}");
673                                break;
674                            }
675                        }
676
677                        let Ok(string) = std::str::from_utf8(&buf) else {
678                            continue;
679                        };
680                        let Some(newline_idx) = string.find('\n') else {
681                            continue;
682                        };
683
684                        let command = string.split_at(newline_idx).0;
685
686                        match command {
687                            "stop" => {
688                                let _ = end_sender.send(EndCondition::CtrlC).await;
689                                write_all!(b"Stopping...\n");
690                            }
691                            _ => write_all!(b"Unrecognized command"),
692                        }
693
694                        string_buf.drain(0..newline_idx);
695                    }
696                });
697            }
698        });
699    }
700
701    runtime.block_on(async {
702        let fut = async {
703            let mut grp = Application {
704                pending: vec![],
705                drop_check: DropCheck::default(),
706            };
707            grp = tokio::spawn(main(grp)).await??;
708            grp.run().await
709        };
710        info!("Runtime started with pid: {pid}");
711        tokio::select! {
712            res = fut => res,
713            _ = end_recv.recv() => {
714                info!("Ctrl-C received");
715                Ok(())
716            },
717
718        }
719    })?;
720
721    info!("Exiting...");
722
723    std::thread::spawn(|| {
724        std::thread::sleep(Duration::from_secs(5));
725        while let Some(backtrace) = THREAD_DROP_CHECKS.pop() {
726            if let Some(backtrace) = backtrace.upgrade() {
727                warn!("The following persistent thread has not exited yet:\n{backtrace}");
728            }
729        }
730    });
731    let dropper = std::thread::spawn(move || {
732        drop(runtime);
733        while let Some(x) = THREADS.pop() {
734            if let Err(e) = x.join() {
735                error!("Failed to join thread: {e:?}");
736            }
737        }
738        let _ = end_sender.blocking_send(EndCondition::Dropped);
739    });
740
741    match end_recv.blocking_recv().unwrap() {
742        EndCondition::CtrlC => warn!("Ctrl-C received. Force exiting..."),
743        EndCondition::Dropped => dropper.join().unwrap(),
744    }
745
746    Ok(())
747}