near_actix_test_utils/
lib.rs

1use actix_rt::signal;
2use futures::{future, select, task::Poll, FutureExt};
3use std::sync::atomic::{AtomicBool, Ordering};
4
5pub struct ShutdownableThread {
6    pub join: Option<std::thread::JoinHandle<()>>,
7    pub actix_system: actix_rt::System,
8}
9
10impl ShutdownableThread {
11    pub fn start<F>(_name: &'static str, f: F) -> ShutdownableThread
12    where
13        F: FnOnce() + Send + 'static,
14    {
15        let (tx, rx) = std::sync::mpsc::channel();
16        let join = std::thread::spawn(move || {
17            run_actix(async move {
18                f();
19                tx.send(actix_rt::System::current()).unwrap();
20            });
21        });
22
23        let actix_system = rx.recv().unwrap();
24        ShutdownableThread { join: Some(join), actix_system }
25    }
26
27    pub fn shutdown(&self) {
28        self.actix_system.stop();
29    }
30}
31
32impl Drop for ShutdownableThread {
33    fn drop(&mut self) {
34        self.shutdown();
35        self.join.take().unwrap().join().unwrap();
36    }
37}
38
39static CAUGHT_SIGINT: AtomicBool = AtomicBool::new(false);
40
41macro_rules! handle_interrupt {
42    ($future:expr) => {
43        async move {
44            assert!(!CAUGHT_SIGINT.load(Ordering::SeqCst), "SIGINT received");
45            select! {
46                _ = {
47                    future::poll_fn(|_| {
48                        if CAUGHT_SIGINT.load(Ordering::SeqCst) {
49                            return Poll::Ready(());
50                        }
51                        Poll::Pending
52                    })
53                }.fuse() => panic!("SIGINT received"),
54                _ = $future.fuse() => {},
55            }
56        }
57    };
58}
59
60#[inline]
61pub fn spawn_interruptible<F: std::future::Future + 'static>(
62    f: F,
63) -> actix_rt::task::JoinHandle<()> {
64    actix_rt::spawn(handle_interrupt!(f))
65}
66
67pub fn run_actix<F: std::future::Future>(f: F) {
68    static SET_PANIC_HOOK: std::sync::Once = std::sync::Once::new();
69
70    // This is a workaround to make actix/tokio runtime stop when a task panics.
71    // See: https://github.com/actix/actix-net/issues/80
72    SET_PANIC_HOOK.call_once(|| {
73        let default_hook = std::panic::take_hook();
74        std::panic::set_hook(Box::new(move |info| {
75            if actix_rt::System::is_registered() {
76                let exit_code = if CAUGHT_SIGINT.load(Ordering::SeqCst) { 130 } else { 1 };
77                actix_rt::System::current().stop_with_code(exit_code);
78            }
79            default_hook(info);
80        }));
81    });
82
83    static TRAP_SIGINT_HOOK: std::sync::Once = std::sync::Once::new();
84
85    // This is a workaround to ensure all threads get the exit memo.
86    // Plainly polling ctrl_c() on busy threads like ours can be problematic.
87    TRAP_SIGINT_HOOK.call_once(|| {
88        std::thread::Builder::new()
89            .name("SIGINT trap".into())
90            .spawn(|| {
91                let sys = actix_rt::System::new();
92                sys.block_on(async {
93                    signal::ctrl_c().await.expect("failed to listen for SIGINT");
94                    CAUGHT_SIGINT.store(true, Ordering::SeqCst);
95                });
96                sys.run().unwrap();
97            })
98            .expect("failed to spawn SIGINT handler thread");
99    });
100
101    let sys = actix_rt::System::new();
102    sys.block_on(handle_interrupt!(f));
103    sys.run().unwrap();
104}