awaken/
lib.rs

1// Copyright 2022 Twitter, Inc.
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5//! Provides a `Waker` trait to allow using the `Waker` from `mio` or a provided
6//! `Waker` that uses eventfd directly (supported only on linux) interchangably.
7//!
8//! This is particularly useful in cases where some struct (such as a queue) may
9//! be used with either `mio`-based event loops, or with io_uring. The `Waker`
10//! provided by `mio` is not directly usable in io_uring based code due to the
11//! fact that it must be registered to an event loop (such as epoll).
12
13use core::sync::atomic::{AtomicU64, Ordering};
14
15pub struct Waker {
16    inner: Box<dyn GenericWaker>,
17    pending: AtomicU64,
18}
19
20impl From<MioWaker> for Waker {
21    fn from(other: MioWaker) -> Self {
22        Self {
23            inner: Box::new(other),
24            pending: AtomicU64::new(0),
25        }
26    }
27}
28
29impl Waker {
30    pub fn wake(&self) -> std::io::Result<()> {
31        if self.pending.fetch_add(1, Ordering::Relaxed) == 0 {
32            self.inner.wake()
33        } else {
34            Ok(())
35        }
36    }
37
38    #[cfg(target_os = "linux")]
39    pub fn as_raw_fd(&self) -> Option<RawFd> {
40        self.inner.as_raw_fd()
41    }
42
43    pub fn reset(&self) {
44        self.pending.store(0, Ordering::Relaxed);
45    }
46}
47
48pub trait GenericWaker: Send + Sync {
49    fn wake(&self) -> std::io::Result<()>;
50
51    #[cfg(target_os = "linux")]
52    fn as_raw_fd(&self) -> Option<RawFd>;
53}
54
55#[cfg(target_os = "linux")]
56use std::os::unix::prelude::RawFd;
57
58pub use mio::Waker as MioWaker;
59
60impl GenericWaker for MioWaker {
61    fn wake(&self) -> std::io::Result<()> {
62        self.wake()
63    }
64
65    #[cfg(target_os = "linux")]
66    fn as_raw_fd(&self) -> Option<RawFd> {
67        None
68    }
69}
70
71#[cfg(target_os = "linux")]
72pub use self::eventfd::EventfdWaker;
73
74#[cfg(target_os = "linux")]
75mod eventfd {
76    use crate::*;
77    use std::fs::File;
78    use std::io::{ErrorKind, Result, Write};
79    use std::os::unix::io::{AsRawFd, FromRawFd};
80    use std::os::unix::prelude::RawFd;
81
82    pub struct EventfdWaker {
83        inner: File,
84    }
85
86    // a simple eventfd waker. based off the implementation in mio
87    impl EventfdWaker {
88        pub fn new() -> Result<Self> {
89            let ret = unsafe { libc::eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK) };
90            if ret < 0 {
91                Err(std::io::Error::new(
92                    ErrorKind::Other,
93                    "failed to create eventfd",
94                ))
95            } else {
96                Ok(Self {
97                    inner: unsafe { File::from_raw_fd(ret) },
98                })
99            }
100        }
101
102        pub fn wake(&self) -> Result<()> {
103            match (&self.inner).write(&[1, 0, 0, 0, 0, 0, 0, 0]) {
104                Ok(_) => Ok(()),
105                Err(e) => {
106                    if e.kind() == ErrorKind::WouldBlock {
107                        // writing blocks if the counter would overflow, reset it
108                        // and wake again
109                        self.reset()?;
110                        self.wake()
111                    } else {
112                        Err(e)
113                    }
114                }
115            }
116        }
117
118        fn reset(&self) -> Result<()> {
119            match (&self.inner).write(&[0, 0, 0, 0, 0, 0, 0, 0]) {
120                Ok(_) => Ok(()),
121                Err(e) => {
122                    if e.kind() == ErrorKind::WouldBlock {
123                        // we can ignore wouldblock during reset
124                        Ok(())
125                    } else {
126                        Err(e)
127                    }
128                }
129            }
130        }
131    }
132
133    impl GenericWaker for EventfdWaker {
134        fn wake(&self) -> Result<()> {
135            self.wake()
136        }
137
138        fn as_raw_fd(&self) -> Option<RawFd> {
139            Some(self.inner.as_raw_fd())
140        }
141    }
142
143    impl From<EventfdWaker> for Waker {
144        fn from(other: EventfdWaker) -> Self {
145            Self {
146                inner: Box::new(other),
147                pending: AtomicU64::new(0),
148            }
149        }
150    }
151}