// native_timer/unix.rs

1use std::{
2    time::Duration,
3    sync::mpsc::{channel, Sender},
4    ffi::{c_void, CString},
5    ptr, mem, thread, sync
6};
7use libc::{c_int, sigaction, sigevent, sigval, sigemptyset, siginfo_t, size_t, strerror, SIGRTMIN, SIGEV_THREAD_ID,
8           syscall, SYS_gettid, timer_create, itimerspec, timespec, c_long, timer_settime, timer_t, timer_delete, CLOCK_REALTIME};
9use sync_wait_object::WaitEvent;
10use crate::{
11    CallbackHint, Result, TimerError,
12    common::MutWrapper
13};
14use crate::common::MutCallable;
15
16// ------------------------------------- DATA STRUCTURE & MARKERS -------------------------------------
/// Shared state behind a `TimerQueue`: the sending halves of the two channels whose receivers are
/// drained by the worker threads spawned in `TimerQueue::new`.
pub struct TimerQueueCore {
    /// Carries timer-creation requests to the worker thread that calls `schedule_signal_callback`.
    timer_queue: Sender<TimerCreationUnsafeRequest>,
    /// Carries `MutWrapper` addresses to the worker thread that invokes them through `unsafe_call`.
    quick_dispatcher: Sender<MutWrapperUnsafeRepr>
}
21
#[doc = include_str!("../docs/TimerQueue.md")]
// Newtype over a reference-counted `TimerQueueCore`; every callback wrapper created by the
// `schedule_*` / `fire_oneshot` methods receives a clone of this `Arc`.
pub struct TimerQueue(sync::Arc<TimerQueueCore>);
24
#[doc = include_str!("../docs/Timer.md")]
pub struct Timer<'h> {
    // OS timer handle. `close` takes it out, leaving `None`; after that `change_period` and
    // `close` return `Ok(())` without touching the OS.
    handle: Option<timer_t>,
    // Callback wrapper whose address is registered with the OS timer as `sival_ptr`.
    // NOTE(review): presumably boxed so that address stays stable when the `Timer` value moves.
    callback: Box<MutWrapper<'h>>
}
30
// Pointers are passed between threads (and through channels) as plain integers.
// NOTE(review): presumably because `usize` is `Send` while raw pointers are not.

// Address of a `MutWrapper`; turned back into a pointer with `as *mut MutWrapper` where it is used.
type MutWrapperUnsafeRepr = usize;
// A `timer_t` handle cast to an integer; turned back with `as timer_t` where it is used.
type TimerHandleUnsafeRepr = usize;
// Outcome of a timer-creation request as reported back by the timer-creation worker thread.
type TimerHandleResult = Result<TimerHandleUnsafeRepr>;
34
/// Message sent to the timer-creation worker thread (see `TimerQueue::create_timer`).
struct TimerCreationUnsafeRequest {
    /// Delay before the first expiration (becomes `it_value` of the timer).
    due: Duration,
    /// Interval between subsequent expirations (becomes `it_interval`).
    period: Duration,
    /// Address of the `MutWrapper` that the timer signal will carry as `sival_ptr`.
    callback_ref: MutWrapperUnsafeRepr,
    /// Reply channel on which the worker reports the created handle or the error.
    signal: Sender<TimerHandleResult>
}
41
42// ----------------------------------------- FUNCTIONS --------------------------------------------------
43fn to_error(err_no: c_int) -> TimerError {
44    assert_ne!(err_no, 0);
45    let message = unsafe { CString::from_raw(strerror(err_no)) };
46    TimerError::OsError(err_no as isize, message.into_string().unwrap())
47}
48
49#[inline] fn get_errno() -> TimerError {
50    to_error(unsafe { *libc::__errno_location() })
51}
52
53fn to_result(ret: c_int) -> Result<()> {
54    if ret == 0 { Ok(()) }
55    else { Err(get_errno()) }
56}
57
58fn close_timer(handle: timer_t, callback: &MutWrapper) -> Result<()> {
59    let acceptable_execution_time = match callback.hint {
60        Some(CallbackHint::SlowFunction(d)) => d,
61        _ => crate::DEFAULT_ACCEPTABLE_EXECUTION_TIME
62    };
63    callback.mark_delete();
64
65    change_period(handle, Duration::ZERO, Duration::ZERO)?;
66    callback.wait_idle(acceptable_execution_time)?;
67    unsafe { to_result(timer_delete(handle)) }
68}
69
70fn change_period(handle: timer_t, due: Duration, period: Duration) -> Result<()> {
71    let interval = itimerspec {
72        it_value: to_timespec(due),
73        it_interval: to_timespec(period)
74    };
75    unsafe { to_result(timer_settime(handle, 0, &interval, ptr::null_mut())) }
76}
77
78fn to_timespec(value: Duration) -> timespec {
79    let ns = value.as_nanos();
80    let secs = ns / 1_000_000_000;
81    let pure_ns = ns - (secs * 1_000_000_000);
82    timespec { tv_sec: secs as c_long, tv_nsec: pure_ns as c_long }
83}
84
85// ----------------------------------------- IMPLEMENTATIONS --------------------------------------------------
// Guards the one-time initialization of `DEFAULT_QUEUE` performed in `TimerQueue::default`.
static DEFAULT_QUEUE_ONCE: sync::Once = sync::Once::new();
// Process-wide queue handed out by `TimerQueue::default`; `None` until the first call sets it.
static mut DEFAULT_QUEUE: Option<TimerQueue> = None;
88
89impl TimerQueue {
90    /// Create a new TimerQueue
91    pub fn new() -> Self {
92        let (dispatcher, receiver) = channel::<TimerCreationUnsafeRequest>();
93        thread::spawn(move || {
94            for req in receiver {
95                let timer = Self::schedule_signal_callback(req.due, req.period, req.callback_ref);
96                let message = timer.map(|t| t as TimerHandleUnsafeRepr);
97                req.signal.send(message).unwrap();
98            }
99        });
100
101        let (quick_dispatcher, quick_queue) = channel();
102        thread::spawn(move || {
103            for ctx in quick_queue {
104                Self::unsafe_call(ctx);
105            }
106        });
107        TimerQueue(sync::Arc::new(TimerQueueCore{
108            timer_queue: dispatcher, quick_dispatcher
109        }))
110    }
111
112    /// Default OS common timer queue
113    pub fn default() -> &'static TimerQueue {
114        unsafe {
115            DEFAULT_QUEUE_ONCE.call_once(|| {
116                DEFAULT_QUEUE = Some(Self::new());
117            });
118            DEFAULT_QUEUE.as_ref().unwrap()
119        }
120    }
121
122    #[doc = include_str!("../docs/TimerQueue_schedule_timer.md")]
123    pub fn schedule_timer<'h, F>(&self, due: Duration, period: Duration, hint: Option<CallbackHint>, handler: F) -> Result<Timer<'h>>
124        where F: FnMut() + Send + 'h
125    {
126        let callback = Box::new(MutWrapper::new(self.0.clone(), hint, handler));
127        let timer_unsafe = self.create_timer(due, period, &callback)?;
128
129        timer_unsafe.map(|t| Timer::<'h> {
130            handle: Some(t as timer_t),
131            callback
132        })
133    }
134
135    #[doc = include_str!("../docs/TimerQueue_schedule_oneshot.md")]
136    pub fn schedule_oneshot<'h, F>(&self, due: Duration, hint: Option<CallbackHint>, handler: F) -> Result<Timer<'h>>
137        where F: FnOnce() + Send + 'h
138    {
139        let callback = Box::new(MutWrapper::new_once(self.0.clone(), hint, handler));
140        let timer_unsafe = self.create_timer(due, Duration::ZERO, &callback)?;
141
142        timer_unsafe.map(|t| Timer::<'h> {
143            handle: Some(t as timer_t),
144            callback
145        })
146    }
147
148    #[doc = include_str!("../docs/TimerQueue_fire_oneshot.md")]
149    pub fn fire_oneshot<F>(&self, due: Duration, hint: Option<CallbackHint>, handler: F) -> Result<()>
150    where F: FnOnce() + Send + 'static
151    {
152        let journal: WaitEvent<Option<(TimerHandleUnsafeRepr, MutWrapperUnsafeRepr)>> = WaitEvent::new_init(None);
153        let mut journal_write = journal.clone();
154        let wrapper = move || {
155            handler();
156            let (handle, callback_ptr) = journal.wait_reset(None, || None, |v| v.is_some()).unwrap().unwrap();
157
158            // TODO use a common thread to clean up?
159            thread::spawn(move || {
160                let callback = unsafe { Box::from_raw(callback_ptr as *mut MutWrapper) };
161                close_timer(handle as timer_t, &callback).unwrap();
162            });
163        };
164
165        let callback = Box::new(MutWrapper::new_once(self.0.clone(), hint, wrapper));
166        let timer_unsafe = self.create_timer(due, Duration::ZERO, &callback)?;
167
168        let callback_ptr = Box::into_raw(callback) as MutWrapperUnsafeRepr;
169
170        match timer_unsafe {
171            Ok(handle) => journal_write.set_state(Some((handle, callback_ptr))).map_err(|e| e.into()),
172            Err(e) => Err(e.into())
173        }
174    }
175
176    #[inline]
177    pub(crate) fn new_with_context(context: sync::Arc<TimerQueueCore>) -> Self {
178        TimerQueue(context)
179    }
180
181    fn dispatch_quick_call(&self, ctx: MutWrapperUnsafeRepr) -> Result<()> {
182        self.0.quick_dispatcher.send(ctx).map_err(|_| TimerError::SynchronizationBroken)
183    }
184
185    fn create_timer<'h>(&self, due: Duration, period: Duration, callback: &Box<MutWrapper<'h>>) -> Result<TimerHandleResult> {
186        let callback_ref = callback.as_ref() as *const MutWrapper as MutWrapperUnsafeRepr;
187        let (signal, timer_receiver) = channel();
188        let unsafe_request = TimerCreationUnsafeRequest { due, period, callback_ref, signal };
189
190        self.0.timer_queue.send(unsafe_request).unwrap();
191
192        match timer_receiver.recv() {
193            Err(_) => Err(TimerError::SynchronizationBroken),
194            Ok(thm) => Ok(thm)
195        }
196    }
197
198    fn schedule_signal_callback(due: Duration, period: Duration, callback_ref: MutWrapperUnsafeRepr) -> Result<timer_t>
199    {
200        unsafe {
201            let mut sa_mask = mem::zeroed();
202            to_result(sigemptyset(&mut sa_mask))?;
203            let sa = sigaction {
204                sa_flags: libc::SA_SIGINFO,
205                sa_sigaction: Self::timer_callback as size_t,
206                sa_mask,
207                sa_restorer: None
208            };
209            to_result(sigaction(SIGRTMIN(), &sa, ptr::null_mut()))?;
210
211            let mut sev: sigevent = mem::zeroed();
212            sev.sigev_value = sigval {
213                sival_ptr: callback_ref as *mut c_void
214            };
215            sev.sigev_signo = SIGRTMIN();
216            sev.sigev_notify = SIGEV_THREAD_ID;
217            sev.sigev_notify_thread_id = syscall(SYS_gettid) as i32;
218            let mut timer = ptr::null_mut();
219            to_result(timer_create(CLOCK_REALTIME, &mut sev, &mut timer))?;
220
221            let interval = itimerspec {
222                it_value: to_timespec(due),
223                it_interval: to_timespec(period)
224            };
225
226            to_result(timer_settime(timer, 0, &interval, ptr::null_mut()))?;
227            Ok(timer)
228        }
229    }
230
231    fn unsafe_call(ctx: MutWrapperUnsafeRepr) {
232        let wrapper = unsafe { &mut *(ctx as *mut MutWrapper) };
233        if let Err(e) = wrapper.call() {
234            println!("WARNING: Error occurred during timer callback: {e:?}");
235        }
236    }
237
238    extern "C" fn timer_callback(_id: c_int, signal: *mut siginfo_t, _uc: *mut c_void){
239        let ctx = unsafe { (*signal).si_value().sival_ptr as MutWrapperUnsafeRepr };
240
241        let wrapper = unsafe { &mut *(ctx as *mut MutWrapper) };
242        match wrapper.hint {
243            Some(CallbackHint::SlowFunction(_)) => { thread::spawn(move || Self::unsafe_call(ctx)); },
244            _ => wrapper.timer_queue().dispatch_quick_call(ctx).unwrap()
245        }
246    }
247}
248
249impl<'h> Timer<'h> {
250    /// Reset the timer with a new due time and a new period.
251    pub fn change_period(&self, due: Duration, period: Duration) -> Result<()> {
252        if let Some(handle) = self.handle { change_period(handle, due, period) }
253        else { Ok(()) }
254    }
255
256    /// Manually close the timer. It is safe to call this method more than once, but it is not thread-safe.
257    pub fn close(&mut self) -> Result<()> {
258        if let Some(handle) = self.handle.take() {
259            close_timer(handle, &self.callback)
260        } else {
261            Ok(())
262        }
263    }
264}