1use std::{
2 time::Duration,
3 sync::mpsc::{channel, Sender},
4 ffi::{c_void, CString},
5 ptr, mem, thread, sync
6};
7use libc::{c_int, sigaction, sigevent, sigval, sigemptyset, siginfo_t, size_t, strerror, SIGRTMIN, SIGEV_THREAD_ID,
8 syscall, SYS_gettid, timer_create, itimerspec, timespec, c_long, timer_settime, timer_t, timer_delete, CLOCK_REALTIME};
9use sync_wait_object::WaitEvent;
10use crate::{
11 CallbackHint, Result, TimerError,
12 common::MutWrapper
13};
14use crate::common::MutCallable;
15
16pub struct TimerQueueCore {
18 timer_queue: Sender<TimerCreationUnsafeRequest>,
19 quick_dispatcher: Sender<MutWrapperUnsafeRepr>
20}
21
22#[doc = include_str!("../docs/TimerQueue.md")]
23pub struct TimerQueue(sync::Arc<TimerQueueCore>);
24
25#[doc = include_str!("../docs/Timer.md")]
26pub struct Timer<'h> {
27 handle: Option<timer_t>,
28 callback: Box<MutWrapper<'h>>
29}
30
31type MutWrapperUnsafeRepr = usize;
32type TimerHandleUnsafeRepr = usize;
33type TimerHandleResult = Result<TimerHandleUnsafeRepr>;
34
35struct TimerCreationUnsafeRequest {
36 due: Duration,
37 period: Duration,
38 callback_ref: MutWrapperUnsafeRepr,
39 signal: Sender<TimerHandleResult>
40}
41
42fn to_error(err_no: c_int) -> TimerError {
44 assert_ne!(err_no, 0);
45 let message = unsafe { CString::from_raw(strerror(err_no)) };
46 TimerError::OsError(err_no as isize, message.into_string().unwrap())
47}
48
49#[inline] fn get_errno() -> TimerError {
50 to_error(unsafe { *libc::__errno_location() })
51}
52
53fn to_result(ret: c_int) -> Result<()> {
54 if ret == 0 { Ok(()) }
55 else { Err(get_errno()) }
56}
57
58fn close_timer(handle: timer_t, callback: &MutWrapper) -> Result<()> {
59 let acceptable_execution_time = match callback.hint {
60 Some(CallbackHint::SlowFunction(d)) => d,
61 _ => crate::DEFAULT_ACCEPTABLE_EXECUTION_TIME
62 };
63 callback.mark_delete();
64
65 change_period(handle, Duration::ZERO, Duration::ZERO)?;
66 callback.wait_idle(acceptable_execution_time)?;
67 unsafe { to_result(timer_delete(handle)) }
68}
69
70fn change_period(handle: timer_t, due: Duration, period: Duration) -> Result<()> {
71 let interval = itimerspec {
72 it_value: to_timespec(due),
73 it_interval: to_timespec(period)
74 };
75 unsafe { to_result(timer_settime(handle, 0, &interval, ptr::null_mut())) }
76}
77
78fn to_timespec(value: Duration) -> timespec {
79 let ns = value.as_nanos();
80 let secs = ns / 1_000_000_000;
81 let pure_ns = ns - (secs * 1_000_000_000);
82 timespec { tv_sec: secs as c_long, tv_nsec: pure_ns as c_long }
83}
84
85static DEFAULT_QUEUE_ONCE: sync::Once = sync::Once::new();
87static mut DEFAULT_QUEUE: Option<TimerQueue> = None;
88
89impl TimerQueue {
90 pub fn new() -> Self {
92 let (dispatcher, receiver) = channel::<TimerCreationUnsafeRequest>();
93 thread::spawn(move || {
94 for req in receiver {
95 let timer = Self::schedule_signal_callback(req.due, req.period, req.callback_ref);
96 let message = timer.map(|t| t as TimerHandleUnsafeRepr);
97 req.signal.send(message).unwrap();
98 }
99 });
100
101 let (quick_dispatcher, quick_queue) = channel();
102 thread::spawn(move || {
103 for ctx in quick_queue {
104 Self::unsafe_call(ctx);
105 }
106 });
107 TimerQueue(sync::Arc::new(TimerQueueCore{
108 timer_queue: dispatcher, quick_dispatcher
109 }))
110 }
111
112 pub fn default() -> &'static TimerQueue {
114 unsafe {
115 DEFAULT_QUEUE_ONCE.call_once(|| {
116 DEFAULT_QUEUE = Some(Self::new());
117 });
118 DEFAULT_QUEUE.as_ref().unwrap()
119 }
120 }
121
122 #[doc = include_str!("../docs/TimerQueue_schedule_timer.md")]
123 pub fn schedule_timer<'h, F>(&self, due: Duration, period: Duration, hint: Option<CallbackHint>, handler: F) -> Result<Timer<'h>>
124 where F: FnMut() + Send + 'h
125 {
126 let callback = Box::new(MutWrapper::new(self.0.clone(), hint, handler));
127 let timer_unsafe = self.create_timer(due, period, &callback)?;
128
129 timer_unsafe.map(|t| Timer::<'h> {
130 handle: Some(t as timer_t),
131 callback
132 })
133 }
134
135 #[doc = include_str!("../docs/TimerQueue_schedule_oneshot.md")]
136 pub fn schedule_oneshot<'h, F>(&self, due: Duration, hint: Option<CallbackHint>, handler: F) -> Result<Timer<'h>>
137 where F: FnOnce() + Send + 'h
138 {
139 let callback = Box::new(MutWrapper::new_once(self.0.clone(), hint, handler));
140 let timer_unsafe = self.create_timer(due, Duration::ZERO, &callback)?;
141
142 timer_unsafe.map(|t| Timer::<'h> {
143 handle: Some(t as timer_t),
144 callback
145 })
146 }
147
148 #[doc = include_str!("../docs/TimerQueue_fire_oneshot.md")]
149 pub fn fire_oneshot<F>(&self, due: Duration, hint: Option<CallbackHint>, handler: F) -> Result<()>
150 where F: FnOnce() + Send + 'static
151 {
152 let journal: WaitEvent<Option<(TimerHandleUnsafeRepr, MutWrapperUnsafeRepr)>> = WaitEvent::new_init(None);
153 let mut journal_write = journal.clone();
154 let wrapper = move || {
155 handler();
156 let (handle, callback_ptr) = journal.wait_reset(None, || None, |v| v.is_some()).unwrap().unwrap();
157
158 thread::spawn(move || {
160 let callback = unsafe { Box::from_raw(callback_ptr as *mut MutWrapper) };
161 close_timer(handle as timer_t, &callback).unwrap();
162 });
163 };
164
165 let callback = Box::new(MutWrapper::new_once(self.0.clone(), hint, wrapper));
166 let timer_unsafe = self.create_timer(due, Duration::ZERO, &callback)?;
167
168 let callback_ptr = Box::into_raw(callback) as MutWrapperUnsafeRepr;
169
170 match timer_unsafe {
171 Ok(handle) => journal_write.set_state(Some((handle, callback_ptr))).map_err(|e| e.into()),
172 Err(e) => Err(e.into())
173 }
174 }
175
176 #[inline]
177 pub(crate) fn new_with_context(context: sync::Arc<TimerQueueCore>) -> Self {
178 TimerQueue(context)
179 }
180
181 fn dispatch_quick_call(&self, ctx: MutWrapperUnsafeRepr) -> Result<()> {
182 self.0.quick_dispatcher.send(ctx).map_err(|_| TimerError::SynchronizationBroken)
183 }
184
185 fn create_timer<'h>(&self, due: Duration, period: Duration, callback: &Box<MutWrapper<'h>>) -> Result<TimerHandleResult> {
186 let callback_ref = callback.as_ref() as *const MutWrapper as MutWrapperUnsafeRepr;
187 let (signal, timer_receiver) = channel();
188 let unsafe_request = TimerCreationUnsafeRequest { due, period, callback_ref, signal };
189
190 self.0.timer_queue.send(unsafe_request).unwrap();
191
192 match timer_receiver.recv() {
193 Err(_) => Err(TimerError::SynchronizationBroken),
194 Ok(thm) => Ok(thm)
195 }
196 }
197
198 fn schedule_signal_callback(due: Duration, period: Duration, callback_ref: MutWrapperUnsafeRepr) -> Result<timer_t>
199 {
200 unsafe {
201 let mut sa_mask = mem::zeroed();
202 to_result(sigemptyset(&mut sa_mask))?;
203 let sa = sigaction {
204 sa_flags: libc::SA_SIGINFO,
205 sa_sigaction: Self::timer_callback as size_t,
206 sa_mask,
207 sa_restorer: None
208 };
209 to_result(sigaction(SIGRTMIN(), &sa, ptr::null_mut()))?;
210
211 let mut sev: sigevent = mem::zeroed();
212 sev.sigev_value = sigval {
213 sival_ptr: callback_ref as *mut c_void
214 };
215 sev.sigev_signo = SIGRTMIN();
216 sev.sigev_notify = SIGEV_THREAD_ID;
217 sev.sigev_notify_thread_id = syscall(SYS_gettid) as i32;
218 let mut timer = ptr::null_mut();
219 to_result(timer_create(CLOCK_REALTIME, &mut sev, &mut timer))?;
220
221 let interval = itimerspec {
222 it_value: to_timespec(due),
223 it_interval: to_timespec(period)
224 };
225
226 to_result(timer_settime(timer, 0, &interval, ptr::null_mut()))?;
227 Ok(timer)
228 }
229 }
230
231 fn unsafe_call(ctx: MutWrapperUnsafeRepr) {
232 let wrapper = unsafe { &mut *(ctx as *mut MutWrapper) };
233 if let Err(e) = wrapper.call() {
234 println!("WARNING: Error occurred during timer callback: {e:?}");
235 }
236 }
237
238 extern "C" fn timer_callback(_id: c_int, signal: *mut siginfo_t, _uc: *mut c_void){
239 let ctx = unsafe { (*signal).si_value().sival_ptr as MutWrapperUnsafeRepr };
240
241 let wrapper = unsafe { &mut *(ctx as *mut MutWrapper) };
242 match wrapper.hint {
243 Some(CallbackHint::SlowFunction(_)) => { thread::spawn(move || Self::unsafe_call(ctx)); },
244 _ => wrapper.timer_queue().dispatch_quick_call(ctx).unwrap()
245 }
246 }
247}
248
249impl<'h> Timer<'h> {
250 pub fn change_period(&self, due: Duration, period: Duration) -> Result<()> {
252 if let Some(handle) = self.handle { change_period(handle, due, period) }
253 else { Ok(()) }
254 }
255
256 pub fn close(&mut self) -> Result<()> {
258 if let Some(handle) = self.handle.take() {
259 close_timer(handle, &self.callback)
260 } else {
261 Ok(())
262 }
263 }
264}