Skip to main content

ntex_rt/
signals.rs

1#![allow(static_mut_refs)]
2use std::{cell::RefCell, future::poll_fn, sync::Arc, task::Poll};
3
4use atomic_waker::AtomicWaker;
5
6use crate::System;
7
8thread_local! {
9    static STOP: RefCell<Option<oneshot::Sender<()>>> = const { RefCell::new(None) };
10    static HANDLERS: RefCell<Vec<oneshot::Sender<Arc<[Signal]>>>> = RefCell::default();
11}
12
13static mut CUR_SYS: Option<System> = None;
14static mut SIGS: [Option<Signal>; 10] = [None; 10];
15static HND_WAKER: AtomicWaker = AtomicWaker::new();
16
/// Process signals that this module can report to registered handlers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Signal {
    /// Hang-up (`SIGHUP`).
    Hup,
    /// Interrupt (`SIGINT`); on Windows this is what the Ctrl-C handler reports.
    Int,
    /// Termination request (`SIGTERM`).
    Term,
    /// Quit (`SIGQUIT`).
    Quit,
}
29
30/// Register signal handler.
31///
32/// Signals are handled by oneshots, you have to re-register
33/// interest after each signal.
34pub fn signal() -> oneshot::AsyncReceiver<Arc<[Signal]>> {
35    let (tx, rx) = oneshot::async_channel();
36    System::current().handle().spawn(async move {
37        HANDLERS.with(|handlers| {
38            handlers.borrow_mut().push(tx);
39        });
40    });
41
42    rx
43}
44
45fn register_system(sys: &System) -> bool {
46    unsafe {
47        if CUR_SYS.is_some() {
48            false
49        } else {
50            CUR_SYS = Some(sys.clone());
51
52            let (tx, rx) = oneshot::async_channel();
53            sys.handle().spawn(signals(rx));
54            STOP.with(|stop| {
55                *stop.borrow_mut() = Some(tx);
56            });
57            true
58        }
59    }
60}
61
62fn unregister_system(sys: &System) -> bool {
63    unsafe {
64        if let Some(cur) = CUR_SYS.take() {
65            if cur.id() == sys.id() {
66                sys.handle().spawn(async move {
67                    STOP.with(|stop| {
68                        if let Some(tx) = stop.borrow_mut().take() {
69                            let _ = tx.send(());
70                        }
71                    });
72                });
73                true
74            } else {
75                CUR_SYS = Some(cur);
76                false
77            }
78        } else {
79            false
80        }
81    }
82}
83
84fn handle_signal(sig: Signal) {
85    unsafe {
86        for s in &mut SIGS {
87            if s.is_none() {
88                *s = Some(sig);
89                break;
90            }
91        }
92        HND_WAKER.wake();
93    }
94}
95
96#[cfg(target_family = "unix")]
97static mut SIG_HANDLERS: [Option<signal_hook::SigId>; 10] = [None; 10];
98
99#[cfg(target_family = "unix")]
100/// Register signal handler.
101pub(crate) fn start(sys: &System) {
102    if register_system(sys) {
103        use signal_hook::consts::signal::{SIGHUP, SIGINT, SIGQUIT, SIGTERM, SIGUSR2};
104        use signal_hook::low_level::register;
105
106        for (idx, s, sig) in [
107            (0, SIGHUP, Signal::Hup),
108            (1, SIGINT, Signal::Int),
109            (2, SIGTERM, Signal::Term),
110            (3, SIGQUIT, Signal::Quit),
111        ] {
112            unsafe {
113                match register(s, move || handle_signal(sig)) {
114                    Ok(s) => SIG_HANDLERS[idx] = Some(s),
115                    Err(e) => {
116                        log::error!("Cannot install signal handler for {sig:?} with {e:?}");
117                    }
118                }
119            }
120        }
121
122        unsafe {
123            match register(SIGUSR2, || crate::system::sig_usr2()) {
124                Ok(s) => SIG_HANDLERS[5] = Some(s),
125                Err(_) => log::error!("Cannot install signal handler for SIGUSR2"),
126            }
127        }
128    }
129}
130
131#[cfg(target_family = "unix")]
132/// Unregister signal handler.
133pub(crate) fn stop(sys: &System) {
134    if unregister_system(sys) {
135        use signal_hook::low_level::unregister;
136
137        unsafe {
138            for sig in &mut SIG_HANDLERS {
139                if let Some(s) = sig.take() {
140                    let _ = unregister(s);
141                }
142            }
143        }
144    }
145}
146
#[cfg(target_family = "windows")]
/// Register signal handler.
///
/// Does nothing when another system already owns signal handling (see
/// `register_system`). Otherwise installs a Ctrl-C handler that reports
/// `Signal::Int` through `handle_signal`.
///
/// A failure to install the handler is logged instead of panicking. `stop`
/// never removes the Ctrl-C handler, so starting a system again after an
/// earlier one was stopped calls `ctrlc::set_handler` a second time, which
/// `ctrlc` reports as an error; that must not bring the process down.
pub(crate) fn start(sys: &System) {
    if register_system(sys) {
        if let Err(e) = ctrlc::set_handler(move || handle_signal(Signal::Int)) {
            log::error!("Cannot install Ctrl-C handler with {e:?}");
        }
    }
}
158
#[cfg(target_family = "windows")]
/// Unregister signal handler.
///
/// Undoes the bookkeeping done by `register_system` when `sys` is the
/// registered system and logs that signal handling is disabled. The Ctrl-C
/// handler itself is not removed here.
pub(crate) fn stop(sys: &System) {
    let released = unregister_system(sys);
    if released {
        log::info!("Signals handling is disabled");
    }
}
166
167async fn signals(rx: oneshot::AsyncReceiver<()>) {
168    let mut rx = std::pin::pin!(rx);
169
170    poll_fn(|cx| {
171        if rx.as_mut().poll(cx).is_ready() {
172            Poll::Ready(())
173        } else {
174            HND_WAKER.register(cx.waker());
175
176            let mut sigs = Vec::new();
177            unsafe {
178                for sig in &mut SIGS {
179                    if let Some(sig) = sig.take() {
180                        sigs.push(sig);
181                    }
182                }
183            }
184            if !sigs.is_empty() {
185                let sigs: Arc<[Signal]> = Arc::from(sigs);
186
187                HANDLERS.with(|handlers| {
188                    for tx in handlers.borrow_mut().drain(..) {
189                        let _ = tx.send(sigs.clone());
190                    }
191                });
192            }
193
194            Poll::Pending
195        }
196    })
197    .await;
198}