// ipckit/waker.rs

//! # Event Loop Waker
//!
//! This module provides traits and implementations for waking up event loops
//! when IPC messages arrive. This is essential for integrating ipckit with
//! GUI frameworks (Qt, GTK, winit/tao) and async runtimes (tokio).
//!
//! ## Example
//!
//! ```rust,ignore
//! use ipckit::{EventLoopWaker, ThreadWaker, WakeableChannel};
//!
//! // Create a waker for the current thread
//! let waker = ThreadWaker::current();
//!
//! // Set the waker on a channel
//! channel.set_waker(Box::new(waker));
//!
//! // Now when messages arrive, the thread will be woken
//! ```

use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::Thread;

#[cfg(feature = "async")]
use tokio::sync::Notify;

/// Something that can nudge an event loop when IPC messages arrive.
///
/// Implementors bridge ipckit to a concrete event-loop mechanism (a parked
/// thread, a user callback, an async notification, ...). The trait is
/// object-safe so wakers can be stored as `Box<dyn EventLoopWaker>`.
pub trait EventLoopWaker: Send + Sync {
    /// Wake up the event loop.
    ///
    /// Call this when new IPC messages are available and the event loop
    /// needs to process them.
    fn wake(&self);

    /// Report whether this waker can still reach its event loop.
    ///
    /// Returns `false` once the associated event loop has been closed or the
    /// waker has otherwise been invalidated.
    fn is_valid(&self) -> bool;

    /// Produce an owned, boxed copy of this waker.
    ///
    /// This is the object-safe stand-in for `Clone`.
    fn clone_box(&self) -> Box<dyn EventLoopWaker>;
}

49impl Clone for Box<dyn EventLoopWaker> {
50    fn clone(&self) -> Self {
51        self.clone_box()
52    }
53}
54
/// A waker that unparks a specific thread via `std::thread::Thread::unpark()`.
///
/// Intended for simple blocking setups where a thread waits for IPC messages
/// with `std::thread::park()`.
///
/// Cloning is cheap, and every clone shares the same validity flag, so
/// invalidating one clone invalidates all of them.
#[derive(Debug, Clone)]
pub struct ThreadWaker {
    /// Handle of the thread to unpark.
    thread: Thread,
    /// Shared "still usable" flag; `false` turns `wake` into a no-op.
    valid: Arc<AtomicBool>,
}

65impl ThreadWaker {
66    /// Create a waker for the current thread.
67    pub fn current() -> Self {
68        Self {
69            thread: std::thread::current(),
70            valid: Arc::new(AtomicBool::new(true)),
71        }
72    }
73
74    /// Create a waker for a specific thread.
75    pub fn new(thread: Thread) -> Self {
76        Self {
77            thread,
78            valid: Arc::new(AtomicBool::new(true)),
79        }
80    }
81
82    /// Invalidate this waker.
83    ///
84    /// After calling this, `is_valid()` will return `false`.
85    pub fn invalidate(&self) {
86        self.valid.store(false, Ordering::SeqCst);
87    }
88}
89
90impl EventLoopWaker for ThreadWaker {
91    fn wake(&self) {
92        if self.is_valid() {
93            self.thread.unpark();
94        }
95    }
96
97    fn is_valid(&self) -> bool {
98        self.valid.load(Ordering::SeqCst)
99    }
100
101    fn clone_box(&self) -> Box<dyn EventLoopWaker> {
102        Box::new(self.clone())
103    }
104}
105
/// A waker that invokes a user-supplied callback.
///
/// Useful for integrating with custom event loop systems. The trait bounds
/// on `F` live on the `impl` blocks that need them, not on the struct.
pub struct CallbackWaker<F> {
    /// Function invoked on every successful `wake()`.
    callback: F,
    /// Shared "still usable" flag; `false` turns `wake` into a no-op.
    valid: Arc<AtomicBool>,
}

/// Closures rarely implement `Debug`, so only the validity flag is printed.
impl<F> fmt::Debug for CallbackWaker<F> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("CallbackWaker")
            .field("valid", &self.valid)
            .finish_non_exhaustive()
    }
}

117impl<F> CallbackWaker<F>
118where
119    F: Fn() + Send + Sync + Clone + 'static,
120{
121    /// Create a new callback waker.
122    pub fn new(callback: F) -> Self {
123        Self {
124            callback,
125            valid: Arc::new(AtomicBool::new(true)),
126        }
127    }
128
129    /// Invalidate this waker.
130    pub fn invalidate(&self) {
131        self.valid.store(false, Ordering::SeqCst);
132    }
133}
134
135impl<F> Clone for CallbackWaker<F>
136where
137    F: Fn() + Send + Sync + Clone + 'static,
138{
139    fn clone(&self) -> Self {
140        Self {
141            callback: self.callback.clone(),
142            valid: Arc::clone(&self.valid),
143        }
144    }
145}
146
147impl<F> EventLoopWaker for CallbackWaker<F>
148where
149    F: Fn() + Send + Sync + Clone + 'static,
150{
151    fn wake(&self) {
152        if self.is_valid() {
153            (self.callback)();
154        }
155    }
156
157    fn is_valid(&self) -> bool {
158        self.valid.load(Ordering::SeqCst)
159    }
160
161    fn clone_box(&self) -> Box<dyn EventLoopWaker> {
162        Box::new(self.clone())
163    }
164}
165
/// A waker for the tokio async runtime.
///
/// Wraps a `tokio::sync::Notify` to wake async tasks waiting for IPC messages.
/// Clones share both the notifier and the validity flag.
#[cfg(feature = "async")]
#[derive(Debug, Clone)]
pub struct TokioWaker {
    /// Notification primitive that waiting tasks await on.
    notify: Arc<Notify>,
    /// Shared "still usable" flag; `false` turns `wake` into a no-op.
    valid: Arc<AtomicBool>,
}

#[cfg(feature = "async")]
impl TokioWaker {
    /// Create a new, valid tokio waker with its own `Notify`.
    pub fn new() -> Self {
        let notify = Arc::new(Notify::new());
        let valid = Arc::new(AtomicBool::new(true));
        Self { notify, valid }
    }

    /// Get a future that completes when this waker is woken.
    ///
    /// Use this to await notifications in async code:
    /// ```rust,ignore
    /// waker.notified().await;
    /// ```
    pub fn notified(&self) -> tokio::sync::futures::Notified<'_> {
        self.notify.notified()
    }

    /// Invalidate this waker and every clone sharing its validity flag.
    pub fn invalidate(&self) {
        self.valid.store(false, Ordering::SeqCst);
    }
}

/// `TokioWaker::default()` is equivalent to `TokioWaker::new()`.
#[cfg(feature = "async")]
impl Default for TokioWaker {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(feature = "async")]
impl EventLoopWaker for TokioWaker {
    /// Notify one waiter, unless the waker has been invalidated.
    ///
    /// Uses `Notify::notify_one`: if no task is currently waiting, tokio
    /// stores a single permit so the next `notified().await` completes
    /// immediately.
    fn wake(&self) {
        if self.is_valid() {
            self.notify.notify_one();
        }
    }

    /// `true` until `invalidate()` is called on this waker or any clone of it.
    fn is_valid(&self) -> bool {
        self.valid.load(Ordering::SeqCst)
    }

    /// Box a clone of this waker; the clone shares notifier and validity flag.
    fn clone_box(&self) -> Box<dyn EventLoopWaker> {
        Box::new(self.clone())
    }
}

226/// A waker that broadcasts to multiple wakers.
227///
228/// Useful when multiple event loops need to be notified of the same event.
229#[derive(Clone, Default)]
230pub struct BroadcastWaker {
231    wakers: Vec<Box<dyn EventLoopWaker>>,
232}
233
234impl BroadcastWaker {
235    /// Create a new broadcast waker.
236    pub fn new() -> Self {
237        Self { wakers: Vec::new() }
238    }
239
240    /// Add a waker to the broadcast list.
241    pub fn add(&mut self, waker: Box<dyn EventLoopWaker>) {
242        self.wakers.push(waker);
243    }
244
245    /// Remove invalid wakers from the list.
246    pub fn cleanup(&mut self) {
247        self.wakers.retain(|w| w.is_valid());
248    }
249
250    /// Get the number of wakers.
251    pub fn len(&self) -> usize {
252        self.wakers.len()
253    }
254
255    /// Check if there are no wakers.
256    pub fn is_empty(&self) -> bool {
257        self.wakers.is_empty()
258    }
259}
260
261impl EventLoopWaker for BroadcastWaker {
262    fn wake(&self) {
263        for waker in &self.wakers {
264            if waker.is_valid() {
265                waker.wake();
266            }
267        }
268    }
269
270    fn is_valid(&self) -> bool {
271        self.wakers.iter().any(|w| w.is_valid())
272    }
273
274    fn clone_box(&self) -> Box<dyn EventLoopWaker> {
275        Box::new(self.clone())
276    }
277}
278
279/// A channel that can wake an event loop when messages arrive.
280pub trait WakeableChannel {
281    /// Set the event loop waker.
282    ///
283    /// When messages arrive on this channel, the waker will be called
284    /// to notify the event loop.
285    fn set_waker(&mut self, waker: Box<dyn EventLoopWaker>);
286
287    /// Remove the waker.
288    fn clear_waker(&mut self);
289
290    /// Get a reference to the current waker, if any.
291    fn waker(&self) -> Option<&dyn EventLoopWaker>;
292}
293
294/// A wrapper that adds waker support to any channel.
295pub struct WakeableWrapper<C> {
296    inner: C,
297    waker: Option<Box<dyn EventLoopWaker>>,
298}
299
300impl<C> WakeableWrapper<C> {
301    /// Create a new wakeable wrapper around a channel.
302    pub fn new(channel: C) -> Self {
303        Self {
304            inner: channel,
305            waker: None,
306        }
307    }
308
309    /// Get a reference to the inner channel.
310    pub fn inner(&self) -> &C {
311        &self.inner
312    }
313
314    /// Get a mutable reference to the inner channel.
315    pub fn inner_mut(&mut self) -> &mut C {
316        &mut self.inner
317    }
318
319    /// Consume the wrapper and return the inner channel.
320    pub fn into_inner(self) -> C {
321        self.inner
322    }
323
324    /// Wake the event loop if a waker is set.
325    pub fn wake(&self) {
326        if let Some(ref waker) = self.waker {
327            if waker.is_valid() {
328                waker.wake();
329            }
330        }
331    }
332}
333
334impl<C> WakeableChannel for WakeableWrapper<C> {
335    fn set_waker(&mut self, waker: Box<dyn EventLoopWaker>) {
336        self.waker = Some(waker);
337    }
338
339    fn clear_waker(&mut self) {
340        self.waker = None;
341    }
342
343    fn waker(&self) -> Option<&dyn EventLoopWaker> {
344        self.waker.as_deref()
345    }
346}
347
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::AtomicUsize;
    use std::time::{Duration, Instant};

    /// Waking the current thread must leave an unpark token behind, and
    /// invalidation must flip `is_valid`.
    #[test]
    fn test_thread_waker() {
        let waker = ThreadWaker::current();
        assert!(waker.is_valid());

        waker.wake();
        // The token set by `wake` makes this return immediately instead of
        // running into the timeout; it also consumes the token again.
        let started = Instant::now();
        std::thread::park_timeout(Duration::from_secs(5));
        assert!(started.elapsed() < Duration::from_secs(4));

        waker.invalidate();
        assert!(!waker.is_valid());
        waker.wake(); // Should be a no-op and must not panic
    }

    /// The callback runs once per wake while valid and never afterwards.
    #[test]
    fn test_callback_waker() {
        let counter = Arc::new(AtomicUsize::new(0));
        let counter_clone = Arc::clone(&counter);

        let waker = CallbackWaker::new(move || {
            counter_clone.fetch_add(1, Ordering::SeqCst);
        });

        assert!(waker.is_valid());
        waker.wake();
        assert_eq!(counter.load(Ordering::SeqCst), 1);

        waker.wake();
        assert_eq!(counter.load(Ordering::SeqCst), 2);

        waker.invalidate();
        waker.wake();
        assert_eq!(counter.load(Ordering::SeqCst), 2); // Should not increment
    }

    /// A broadcast wake reaches every registered target.
    #[test]
    fn test_broadcast_waker() {
        let counter1 = Arc::new(AtomicUsize::new(0));
        let counter2 = Arc::new(AtomicUsize::new(0));

        let c1 = Arc::clone(&counter1);
        let c2 = Arc::clone(&counter2);

        let mut broadcast = BroadcastWaker::new();
        broadcast.add(Box::new(CallbackWaker::new(move || {
            c1.fetch_add(1, Ordering::SeqCst);
        })));
        broadcast.add(Box::new(CallbackWaker::new(move || {
            c2.fetch_add(1, Ordering::SeqCst);
        })));

        assert_eq!(broadcast.len(), 2);
        assert!(broadcast.is_valid());

        broadcast.wake();
        assert_eq!(counter1.load(Ordering::SeqCst), 1);
        assert_eq!(counter2.load(Ordering::SeqCst), 1);
    }

    /// `cleanup` drops invalidated targets; validity is shared with clones.
    #[test]
    fn test_broadcast_cleanup() {
        let first = CallbackWaker::new(|| {});
        let second = CallbackWaker::new(|| {});

        let mut broadcast = BroadcastWaker::new();
        broadcast.add(first.clone_box());
        broadcast.add(second.clone_box());

        first.invalidate();
        assert!(broadcast.is_valid());
        broadcast.cleanup();
        assert_eq!(broadcast.len(), 1);

        second.invalidate();
        assert!(!broadcast.is_valid());
        broadcast.cleanup();
        assert!(broadcast.is_empty());
    }

    /// A wake from another task completes a pending `notified()` future.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_tokio_waker() {
        let waker = TokioWaker::new();
        assert!(waker.is_valid());

        let waker_clone = waker.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            waker_clone.wake();
        });

        tokio::time::timeout(Duration::from_millis(100), waker.notified())
            .await
            .expect("Should be notified");
    }

    /// The wrapper forwards wakes to its waker and can drop it again.
    #[test]
    fn test_wakeable_wrapper() {
        struct DummyChannel;

        let mut wrapper = WakeableWrapper::new(DummyChannel);
        assert!(wrapper.waker().is_none());

        let counter = Arc::new(AtomicUsize::new(0));
        let c = Arc::clone(&counter);
        wrapper.set_waker(Box::new(CallbackWaker::new(move || {
            c.fetch_add(1, Ordering::SeqCst);
        })));

        assert!(wrapper.waker().is_some());
        wrapper.wake();
        assert_eq!(counter.load(Ordering::SeqCst), 1);

        wrapper.clear_waker();
        assert!(wrapper.waker().is_none());
    }
}