near_actix_test_utils/
lib.rs1use actix_rt::signal;
2use futures::{future, select, task::Poll, FutureExt};
3use std::sync::atomic::{AtomicBool, Ordering};
4
/// Handle to a background thread that runs an actix system.
///
/// Created by `ShutdownableThread::start`. Dropping the handle calls
/// `shutdown` and then joins the thread.
pub struct ShutdownableThread {
    /// Join handle of the spawned thread. `Drop` takes it out of the option
    /// in order to join the thread.
    pub join: Option<std::thread::JoinHandle<()>>,
    /// The actix system obtained from inside the spawned thread; `shutdown`
    /// stops it.
    pub actix_system: actix_rt::System,
}
9
10impl ShutdownableThread {
11 pub fn start<F>(_name: &'static str, f: F) -> ShutdownableThread
12 where
13 F: FnOnce() + Send + 'static,
14 {
15 let (tx, rx) = std::sync::mpsc::channel();
16 let join = std::thread::spawn(move || {
17 run_actix(async move {
18 f();
19 tx.send(actix_rt::System::current()).unwrap();
20 });
21 });
22
23 let actix_system = rx.recv().unwrap();
24 ShutdownableThread { join: Some(join), actix_system }
25 }
26
27 pub fn shutdown(&self) {
28 self.actix_system.stop();
29 }
30}
31
32impl Drop for ShutdownableThread {
33 fn drop(&mut self) {
34 self.shutdown();
35 self.join.take().unwrap().join().unwrap();
36 }
37}
38
/// Process-wide flag recording that SIGINT has been received.
///
/// Set to `true` by the "SIGINT trap" thread spawned in `run_actix`; read by
/// `handle_interrupt!` and by the panic hook installed in `run_actix`.
static CAUGHT_SIGINT: AtomicBool = AtomicBool::new(false);
40
/// Wraps a future in an `async move` block that panics with
/// `"SIGINT received"` once `CAUGHT_SIGINT` is observed to be set.
///
/// The flag is checked once up front and then on every poll of the wrapper.
/// The checking future never registers a waker, so a pending interrupt is
/// only noticed when the wrapped future itself causes the task to be polled.
/// The wrapped future's output is discarded; the block evaluates to `()`.
macro_rules! handle_interrupt {
    ($fut:expr) => {
        async move {
            assert!(!CAUGHT_SIGINT.load(Ordering::SeqCst), "SIGINT received");
            select! {
                _ = future::poll_fn(|_| {
                    if CAUGHT_SIGINT.load(Ordering::SeqCst) {
                        Poll::Ready(())
                    } else {
                        Poll::Pending
                    }
                })
                .fuse() => panic!("SIGINT received"),
                _ = $fut.fuse() => {},
            }
        }
    };
}
59
60#[inline]
61pub fn spawn_interruptible<F: std::future::Future + 'static>(
62 f: F,
63) -> actix_rt::task::JoinHandle<()> {
64 actix_rt::spawn(handle_interrupt!(f))
65}
66
67pub fn run_actix<F: std::future::Future>(f: F) {
68 static SET_PANIC_HOOK: std::sync::Once = std::sync::Once::new();
69
70 SET_PANIC_HOOK.call_once(|| {
73 let default_hook = std::panic::take_hook();
74 std::panic::set_hook(Box::new(move |info| {
75 if actix_rt::System::is_registered() {
76 let exit_code = if CAUGHT_SIGINT.load(Ordering::SeqCst) { 130 } else { 1 };
77 actix_rt::System::current().stop_with_code(exit_code);
78 }
79 default_hook(info);
80 }));
81 });
82
83 static TRAP_SIGINT_HOOK: std::sync::Once = std::sync::Once::new();
84
85 TRAP_SIGINT_HOOK.call_once(|| {
88 std::thread::Builder::new()
89 .name("SIGINT trap".into())
90 .spawn(|| {
91 let sys = actix_rt::System::new();
92 sys.block_on(async {
93 signal::ctrl_c().await.expect("failed to listen for SIGINT");
94 CAUGHT_SIGINT.store(true, Ordering::SeqCst);
95 });
96 sys.run().unwrap();
97 })
98 .expect("failed to spawn SIGINT handler thread");
99 });
100
101 let sys = actix_rt::System::new();
102 sys.block_on(handle_interrupt!(f));
103 sys.run().unwrap();
104}