1use std::{
2 sync,
3 time::Duration,
4 ffi::c_void
5};
6use sync_wait_object::WaitEvent;
7use windows::Win32::{
8 Foundation::{HANDLE, BOOLEAN, ERROR_IO_PENDING, WIN32_ERROR, GetLastError},
9 System::Threading::*,
10};
11use super::timer::{CallbackHint, Result, DEFAULT_ACCEPTABLE_EXECUTION_TIME};
12use crate::common::*;
13use super::TimerError;
14
// Public handle to a timer queue. It is a thin newtype over a reference-counted
// `TimerQueueCore`; `schedule_timer` / `schedule_oneshot` clone that `Arc` into every
// `Timer` they return, so the core is shared between the queue and its timers.
#[doc = include_str!("../docs/TimerQueue.md")]
pub struct TimerQueue(sync::Arc<TimerQueueCore>);
18
#[doc = include_str!("../docs/Timer.md")]
pub struct Timer<'h> {
    // Shared core of the queue this timer was created on; its raw handle is passed to the
    // free functions `change_period` and `close_timer`.
    queue: sync::Arc<TimerQueueCore>,
    // Handle written by `CreateTimerQueueTimer`. `close` swaps it for `HANDLE::default()`
    // before tearing the timer down, so a second `close` is a no-op.
    handle: HANDLE,
    // Boxed wrapper around the user handler. The address of the boxed value is what gets
    // passed to the OS as the callback context pointer.
    callback: Box<MutWrapper<'h>>,
    // Time budget handed to `MutWrapper::wait_idle` when the timer is closed.
    acceptable_execution_time: Duration
}
26
// Owner of the raw timer-queue handle. Its `Drop` impl calls `DeleteTimerQueue` unless
// `is_invalid()` reports the handle as invalid (the default queue is built with `HANDLE(0)`).
pub(crate) struct TimerQueueCore(HANDLE);
28
29#[inline]
31pub(crate) fn get_win32_last_error() -> WIN32_ERROR {
32 unsafe { GetLastError() }
33}
34
35#[inline]
36pub(crate) fn get_last_error() -> TimerError {
37 get_win32_last_error().into()
38}
39
40pub(crate) fn to_result(ret: bool) -> Result<()> {
41 if ret { Ok(()) }
42 else { Err(get_last_error()) }
43}
44
45fn change_period(queue: HANDLE, timer: HANDLE, due: Duration, period: Duration) -> Result<()> {
46 to_result(unsafe { ChangeTimerQueueTimer(queue, timer, due.as_millis() as u32, period.as_millis() as u32).as_bool() })
47}
48
49fn close_timer(queue: HANDLE, handle: HANDLE, acceptable_execution_time: Duration, callback: &MutWrapper) -> Result<()> {
50 callback.mark_delete();
51
52 let key = callback as *const MutWrapper as MutWrapperUnsafeRepr;
53 remove_mutwrapper_unsafe_repr(key);
54
55 change_period(queue, handle, Duration::default(), Duration::default())?;
57
58 callback.wait_idle(acceptable_execution_time)?;
59 let result = unsafe { DeleteTimerQueueTimer(queue, handle, None).as_bool() };
60
61 if !result {
62 let e = get_win32_last_error();
63 if e != ERROR_IO_PENDING {
64 return Err(e.into());
65 }
66 }
67 Ok(())
68}
69
// One-time initialization guard for `DEFAULT_QUEUE`; `TimerQueue::default` runs the
// initializer through `call_once`.
static DEFAULT_QUEUE_INIT: sync::Once = sync::Once::new();
// Lazily created wrapper for the default timer queue (built with `HANDLE(0)`). It is written
// only inside `DEFAULT_QUEUE_INIT.call_once` and read after that call returns.
static mut DEFAULT_QUEUE: Option<TimerQueue> = None;
72
73fn get_acceptable_execution_time(hint: Option<CallbackHint>) -> Duration {
74 hint.map(|o| match o {
75 CallbackHint::QuickFunction => DEFAULT_ACCEPTABLE_EXECUTION_TIME,
76 CallbackHint::SlowFunction(t) => t
77 }).unwrap_or(DEFAULT_ACCEPTABLE_EXECUTION_TIME)
78}
79
80impl TimerQueue {
82 #[inline]
84 pub fn default() -> &'static TimerQueue {
85 DEFAULT_QUEUE_INIT.call_once(|| unsafe {
86 DEFAULT_QUEUE = Some(TimerQueue(sync::Arc::new(TimerQueueCore(HANDLE(0)))));
87 });
88 unsafe { DEFAULT_QUEUE.as_ref().unwrap() }
89 }
90
91 pub fn new() -> Self {
93 let core = unsafe { CreateTimerQueue().unwrap() };
94 TimerQueue(sync::Arc::new(TimerQueueCore(core)))
95 }
96
97 #[doc = include_str!("../docs/TimerQueue_schedule_timer.md")]
98 pub fn schedule_timer<'h, F>(&self, due: Duration, period: Duration, hint: Option<CallbackHint>, handler: F) -> Result<Timer<'h>>
99 where F: FnMut() + Send + 'h
100 {
101 let acceptable_execution_time = get_acceptable_execution_time(hint);
102 let period = period.as_millis() as u32;
103 let callback = Box::new(MutWrapper::new(self.0.clone(), hint, handler));
104 let timer_handle = self.create_timer(due, period, hint, &callback)?;
105 Ok(Timer::<'h> { queue: self.0.clone(), handle: timer_handle, callback, acceptable_execution_time })
106 }
107
108 #[doc = include_str!("../docs/TimerQueue_schedule_oneshot.md")]
109 pub fn schedule_oneshot<'h, F>(&self, due: Duration, hint: Option<CallbackHint>, handler: F) -> Result<Timer<'h>> where F: FnOnce() + Send + 'h {
110 let acceptable_execution_time = get_acceptable_execution_time(hint);
111 let callback = Box::new(MutWrapper::new_once(self.0.clone(), hint, handler));
112 let timer_handle = self.create_timer(due, 0, hint, &callback)?;
113 Ok(Timer::<'h> { queue: self.0.clone(), handle: timer_handle, callback, acceptable_execution_time })
114 }
115
116 #[doc = include_str!("../docs/TimerQueue_fire_oneshot.md")]
117 pub fn fire_oneshot<F>(&self, due: Duration, hint: Option<CallbackHint>, handler: F) -> Result<()> where F: FnOnce() + Send + 'static {
118 let journal: WaitEvent<Option<(HANDLE, usize)>> = WaitEvent::new_init(None);
119 let mut journal_write = journal.clone();
120 let queue_handle = self.0.0;
121
122 let acceptable_execution_time = get_acceptable_execution_time(hint);
123
124 let wrapper = move || {
125 handler();
126 let (handle, callback_ptr) = journal.wait_reset(None, || None, |v| v.is_some()).unwrap().unwrap();
127
128 std::thread::spawn(move || {
131 let callback = unsafe { Box::from_raw(callback_ptr as *mut MutWrapper) };
132 close_timer(queue_handle, handle, acceptable_execution_time, &callback).unwrap();
133 });
134 };
135 let callback = Box::new(MutWrapper::new_once(self.0.clone(), hint, wrapper));
136 let timer_handle = self.create_timer(due, 0, hint, &callback)?;
137 let callback_ptr = Box::into_raw(callback) as usize;
138 journal_write.set_state(Some((timer_handle, callback_ptr))).map_err(|e| e.into())
139 }
140
141 #[allow(dead_code)]
142 pub(crate) fn new_with_context(context: sync::Arc<TimerQueueCore>) -> Self {
143 TimerQueue(context)
144 }
145
146 fn create_timer(&self, due: Duration, period: u32, hint: Option<CallbackHint>, callback: &Box<MutWrapper>) -> Result<HANDLE>
147 {
148 let option = hint.map(|o| match o {
149 CallbackHint::QuickFunction => WT_EXECUTEINPERSISTENTTHREAD,
150 CallbackHint::SlowFunction(_) => WT_EXECUTELONGFUNCTION
151 }).unwrap_or(WT_EXECUTEDEFAULT);
152 let option = if period == 0 { option | WT_EXECUTEONLYONCE } else { option };
153
154 let mut timer_handle = HANDLE::default();
155 let callback_ref = callback.as_ref() as *const MutWrapper as *const c_void;
156
157 save_mutwrapper_unsafe_repr(callback_ref as MutWrapperUnsafeRepr);
158
159 let create_timer_queue_timer_result = unsafe {
160 CreateTimerQueueTimer(&mut timer_handle, self.0.0, Some(timer_callback), Some(callback_ref),
161 due.as_millis() as u32, period, option).as_bool()
162 };
163 if create_timer_queue_timer_result {
164 Ok(timer_handle)
165 } else {
166 Err(get_last_error())
167 }
168 }
169}
170
171extern "system" fn timer_callback(ctx: *mut c_void, _: BOOLEAN) {
172 let valid = is_mutwrapper_unsafe_repr_valid(ctx as MutWrapperUnsafeRepr);
173 if valid {
174 let wrapper = unsafe { &mut *(ctx as *mut MutWrapper) };
175 if let Err(e) = wrapper.call() {
176 println!("WARNING: Error occurred during timer callback: {e:?}");
177 }
178 }
179}
180
181impl Drop for TimerQueueCore {
182 fn drop(&mut self) {
183 if !self.0.is_invalid() {
184 assert!(unsafe { DeleteTimerQueue(self.0).as_bool() });
185 self.0 = HANDLE::default();
186 }
187 }
188}
189
190impl<'h> Timer<'h> {
191 pub fn change_period(&self, due: Duration, period: Duration) -> Result<()> {
193 change_period(self.queue.0, self.handle, due, period)
194 }
195
196 pub fn close(&mut self) -> Result<()> {
198 if !self.handle.is_invalid() {
199 let handle = self.handle;
200 self.handle = HANDLE::default();
201 close_timer(self.queue.0, handle, self.acceptable_execution_time, &self.callback)
202 } else {
203 Ok(())
204 }
205 }
206}
207
208impl From<WIN32_ERROR> for TimerError {
209 fn from(value: WIN32_ERROR) -> Self {
210 TimerError::OsError(value.0 as isize, value.to_hresult().message().to_string())
211 }
212}