io_uring_epoll/
lib.rs

1#![warn(
2    clippy::unwrap_used,
3    missing_docs,
4    rust_2018_idioms,
5    unused_lifetimes,
6    unused_qualifications
7)]
8#![doc = include_str!("../README.md")]
9
10use io_uring::opcode::EpollCtl;
11use io_uring::IoUring;
12
13use std::collections::HashMap;
14use std::collections::VecDeque;
15use std::os::fd::RawFd;
16
/// Errors from the handler
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`], so it can be
/// boxed as `Box<dyn std::error::Error>` or propagated with `?`.
#[derive(Debug)]
pub enum EpollHandlerError {
    /// Error creating IoUring instance
    IoUringCreate(String),
    /// Error creating epoll handle in Kernel
    EpollCreate1(String),
    /// EpollCtl OpCode is not supported in your kernel
    NotSupported,
    /// Error probing the support of EpollCtl from kernel
    Probing(String),
    /// The Fd is already in the handler and would override existing
    Duplicate,
    /// Something went wrong in `io_uring::IoUring::submit` or `submit_and_wait`
    Submission(String),
}

impl core::fmt::Display for EpollHandlerError {
    /// Render a human-readable message; payload-carrying variants append
    /// the underlying error text after a fixed prefix.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::IoUringCreate(s) => write!(f, "IoUring Create: {s}"),
            Self::EpollCreate1(s) => write!(f, "epoll_create1(): {s}"),
            Self::NotSupported => {
                f.write_str("EpollCtl io_uring OpCode is not supported in your Kernel")
            }
            Self::Probing(s) => {
                write!(f, "Error whilst probing EpollCtl support from kernel: {s}")
            }
            Self::Duplicate => {
                f.write_str("The filehandle is already maped in. Possible duplicate?")
            }
            Self::Submission(s) => write!(f, "Submission: {s}"),
        }
    }
}

// The payloads are plain strings, so there is no `source()` to expose.
impl std::error::Error for EpollHandlerError {}
53
54/// Manage the io_uring Submission and Completion Queues
55/// related to EpollCtrl opcode in io_uring.
56/// See io_epoll(7):
57/// <https://man7.org/linux/man-pages/man7/epoll.7.html>
58pub struct EpollHandler<'fd> {
59    pub(crate) epfd: u32,
60    pub(crate) io_uring: IoUring<io_uring::squeue::Entry, io_uring::cqueue::Entry>,
61    pub(crate) in_flight: u32,
62    pub(crate) fds: HashMap<i32, &'fd HandledFd>,
63    pub(crate) submit_counter: u64,
64}
65
66impl<'fd> EpollHandler<'fd> {
67    /// Create a new handler with new io-uring::IoUring
68    ///
69    /// capacity must be power of two as per io-uring documentation
70    /// ```rust
71    /// use io_uring_epoll::EpollHandler;
72    ///
73    /// EpollHandler::new(10).expect("Unable to create EPoll Handler");
74    /// ```    
75    pub fn new(capacity: u32) -> Result<Self, EpollHandlerError> {
76        let iou: IoUring<io_uring::squeue::Entry, io_uring::cqueue::Entry> = IoUring::builder()
77            .build(capacity)
78            .map_err(|e| EpollHandlerError::IoUringCreate(e.to_string()))?;
79
80        Self::from_io_uring(iou)
81    }
82    /// Create a new handler from an existing io-uring::IoUring builder
83    /// To construct a custom IoUring see io-uring Builder:
84    /// <https://docs.rs/io-uring/latest/io_uring/struct.Builder.html>
85    ///
86    /// Example:
87    /// ```rust
88    /// use io_uring::IoUring;
89    /// use io_uring_epoll::EpollHandler;
90    ///
91    /// let mut iou: IoUring<io_uring::squeue::Entry, io_uring::cqueue::Entry>
92    ///     = IoUring::builder()
93    ///         .build(100)
94    ///         .expect("Unable to build IoUring");
95    ///
96    /// EpollHandler::from_io_uring(iou).expect("Unable to create from io_uring Builder");
97    /// ```    
98    pub fn from_io_uring(
99        iou: IoUring<io_uring::squeue::Entry, io_uring::cqueue::Entry>,
100    ) -> Result<Self, EpollHandlerError> {
101        let mut epoll_probe = io_uring::Probe::new();
102        iou.submitter()
103            .register_probe(&mut epoll_probe)
104            .map_err(|e| EpollHandlerError::Probing(e.to_string()))?;
105
106        #[allow(clippy::bool_comparison)]
107        if epoll_probe.is_supported(EpollCtl::CODE) == false {
108            return Err(EpollHandlerError::NotSupported);
109        }
110
111        // SAFETY: FFI no-data in
112        let epfd = unsafe { libc::epoll_create1(0) };
113        if epfd == -1 {
114            // SAFETY: ffi no-data
115            let errno = unsafe { libc::__errno_location() };
116            return Err(EpollHandlerError::EpollCreate1(format!(
117                "errno: {:?}",
118                errno
119            )));
120        }
121
122        Ok(Self {
123            epfd: epfd as u32,
124            io_uring: iou,
125            in_flight: 0,
126            submit_counter: 0,
127            fds: HashMap::new(),
128        })
129    }
130    /// Borrow the underlying io-uring::IoUring instance
131    pub fn io_uring(&mut self) -> &mut IoUring<io_uring::squeue::Entry, io_uring::cqueue::Entry> {
132        &mut self.io_uring
133    }
134    /// Add [`HandledFd`]
135    /// Finally Commit changes to all changed HandledFds with [`EpollHandler::prepare_submit()`]
136    pub fn add_fd(&mut self, handled_fd: &'fd HandledFd) -> Result<(), EpollHandlerError> {
137        self.fds.insert(handled_fd.fd, handled_fd);
138        Ok(())
139    }
140    /// This calls the underlying io_uring::IoUring::submit submitting all the staged commits
141    pub fn submit(&self) -> Result<usize, EpollHandlerError> {
142        self.io_uring
143            .submit()
144            .map_err(|e| EpollHandlerError::Submission(e.to_string()))
145    }
146    /// Same as submit but using io_uring::IoUring::submit_and_wait
147    pub fn submit_and_wait(&self, want: usize) -> Result<usize, EpollHandlerError> {
148        self.io_uring
149            .submit_and_wait(want)
150            .map_err(|e| EpollHandlerError::Submission(e.to_string()))
151    }
152    /// Stage commit of all changed [`HandledFd`] resources to submission queue
153    /// This will make all changes within the underlying io_uring::squeue::SubmissionQueue
154    /// Errors related on any individual staged Fds are available via FdCommitResults
155    /// Use the [`Self::submit`] or [`Self::submit_and_wait`] to push the commits to kernel
156    pub fn prepare_submit(&mut self) -> Result<FdCommitResults<'fd>, EpollHandlerError> {
157        let mut fd_commit_results = FdCommitResults {
158            new_commits: 0,
159            change_commits: 0,
160            no_change: 0,
161            empty: 0,
162            errors_on_submit: vec![],
163        };
164
165        // TODO: this is too long - split & refactor
166
167        let iou = &mut self.io_uring;
168
169        let mut s_queue = iou.submission();
170
171        let mut updates: VecDeque<HandledFd> = VecDeque::new();
172
173        for (_, handled_fd) in self.fds.iter() {
174            let mut new_fd = (**handled_fd).clone();
175
176            let mut commit_new: Option<i32> = None;
177            let mut epoll_op = EPOLL_CTL_MOD;
178            if handled_fd.wants.is_none() && handled_fd.committed.is_some() {
179                epoll_op = EPOLL_CTL_DEL;
180                commit_new = Some(0);
181            }
182            if let Some(fd_wants) = handled_fd.wants {
183                match handled_fd.committed {
184                    None => {
185                        commit_new = Some(handled_fd.wants.unwrap_or(0));
186                        epoll_op = EPOLL_CTL_ADD;
187                        fd_commit_results.new_commits += 1;
188                    }
189                    Some(committed) => {
190                        if committed != fd_wants {
191                            commit_new = Some(fd_wants);
192                            fd_commit_results.change_commits += 1;
193                        } else {
194                            fd_commit_results.no_change += 1;
195                        }
196                    }
197                }
198                if let Some(commit) = commit_new {
199                    self.submit_counter += 1;
200
201                    let epoll_event = libc::epoll_event {
202                        events: commit as u32,
203                        u64: self.submit_counter,
204                    };
205
206                    let uring_submission_rec = EpollCtl::new(
207                        io_uring::types::Fixed(self.epfd),
208                        io_uring::types::Fd(handled_fd.fd),
209                        epoll_op,
210                        std::ptr::addr_of!(epoll_event) as *const io_uring::types::epoll_event,
211                    )
212                    .build();
213
214                    // SAFETY: op_code construct is type-valid and epoll_event is from libc Rust struct
215                    let p_result = unsafe { s_queue.push(&uring_submission_rec) };
216                    match p_result {
217                        Err(e) => {
218                            new_fd.error = Some(e.to_string());
219                            fd_commit_results.errors_on_submit.push(handled_fd);
220                        }
221                        Ok(_) => {
222                            new_fd.current_submission = Some(self.submit_counter);
223                            self.in_flight += 1;
224                        }
225                    }
226                }
227            } else {
228                // TODO: too long - refactor
229                fd_commit_results.empty += 1;
230            }
231
232            // TODO: cursed
233            if new_fd != **handled_fd {
234                updates.push_back(new_fd);
235            }
236        }
237
238        // TODO: This is a &&cursed &'mut &'fd situation ?!
239        while let Some(update) = updates.pop_front() {
240            let fd_get: Option<&&HandledFd> = self.fds.get(&update.fd);
241            if let Some(unwrapped_fd) = fd_get {
242                let mut pinned_mut: std::pin::Pin<&mut &HandledFd> = std::pin::pin!(*unwrapped_fd);
243                let pinned_update: std::pin::Pin<&mut &HandledFd> = std::pin::pin!(&update);
244                let _st = std::mem::replace(&mut pinned_mut, pinned_update);
245            }
246        }
247
248        Ok(fd_commit_results)
249    }
250}
251
252/// Create [`HandledFd::new(RawFd)`] and then add it to [`EpollHandler::add_fd()`]
253///
254/// *WARNING*: Your kernel may or may not have all wanted modes available
255/// Consult your kernels epoll.h header to be sure and / or test if needed
256#[derive(Debug, Clone, PartialEq)]
257pub struct HandledFd {
258    pub(crate) fd: RawFd,
259    pub(crate) wants: Option<i32>,
260    pub(crate) pending: Option<i32>,
261    pub(crate) committed: Option<i32>,
262    pub(crate) error: Option<String>,
263    pub(crate) current_submission: Option<u64>,
264}
265
266const EPOLL_CTL_ADD: i32 = 1;
267const EPOLL_CTL_DEL: i32 = 2;
268const EPOLL_CTL_MOD: i32 = 3;
269
270impl HandledFd {
271    /// Create a new EpollHandler associated [`HandledFd`]
272    /// Then add via [`EpollHandler::add_fd()`]
273    pub fn new(fd: RawFd) -> Self {
274        HandledFd {
275            fd,
276            wants: None,
277            committed: None,
278            current_submission: None,
279            error: None,
280            pending: None,
281        }
282    }
283    /// Extract RawFd
284    pub fn as_raw(&self) -> RawFd {
285        self.fd
286    }
287    // All setters
288    fn turn_on_or_off(&mut self, mask_in: i32, on_or_off: bool) -> i32 {
289        let cur_wants: i32 = self.wants.unwrap_or(0);
290        self.wants = match on_or_off {
291            true => Some(cur_wants | mask_in),
292            false => Some(cur_wants ^ mask_in),
293        };
294        self.wants.unwrap_or(0)
295    }
296    /// Set EPOLLIN per epoll.h in userspace On or Off
297    /// Returns returns raw mask as to be sent to kernel
298    /// Use [`EpollHandler::prepare_submit()`] after
299    pub fn set_in(&mut self, on_or_off: bool) -> i32 {
300        self.turn_on_or_off(libc::EPOLLIN, on_or_off)
301    }
302    /// EPOLLPRI
303    pub fn set_pri(&mut self, on_or_off: bool) -> i32 {
304        self.turn_on_or_off(libc::EPOLLPRI, on_or_off)
305    }
306    /// EPOLLOUT
307    pub fn set_out(&mut self, on_or_off: bool) -> i32 {
308        self.turn_on_or_off(libc::EPOLLOUT, on_or_off)
309    }
310    /// EPOLLERR
311    pub fn set_err(&mut self, on_or_off: bool) -> i32 {
312        self.turn_on_or_off(libc::EPOLLERR, on_or_off)
313    }
314    /// EPOLLHUP
315    pub fn set_hup(&mut self, on_or_off: bool) -> i32 {
316        self.turn_on_or_off(libc::EPOLLHUP, on_or_off)
317    }
318    /// EPOLLRDNORM
319    pub fn set_rdnorm(&mut self, on_or_off: bool) -> i32 {
320        self.turn_on_or_off(libc::EPOLLRDNORM, on_or_off)
321    }
322    /// EPOLLRDBAND
323    pub fn set_rdband(&mut self, on_or_off: bool) -> i32 {
324        self.turn_on_or_off(libc::EPOLLRDBAND, on_or_off)
325    }
326    /// EPOLLWRNORM
327    pub fn set_wrnorm(&mut self, on_or_off: bool) -> i32 {
328        self.turn_on_or_off(libc::EPOLLWRNORM, on_or_off)
329    }
330    /// EPOLLWRBAND per epoll.h userspace On or Off
331    pub fn set_wrband(&mut self, on_or_off: bool) -> i32 {
332        self.turn_on_or_off(libc::EPOLLWRBAND, on_or_off)
333    }
334    /// EPOLLMSG
335    pub fn set_msg(&mut self, on_or_off: bool) -> i32 {
336        self.turn_on_or_off(libc::EPOLLMSG, on_or_off)
337    }
338    /// EPOLLRDHUP
339    pub fn set_rdhup(&mut self, on_or_off: bool) -> i32 {
340        self.turn_on_or_off(libc::EPOLLRDHUP, on_or_off)
341    }
342    /// EPOLLWAKEUP
343    pub fn set_wakeup(&mut self, on_or_off: bool) -> i32 {
344        self.turn_on_or_off(libc::EPOLLWAKEUP, on_or_off)
345    }
346    /// EPOLLONESHOT
347    pub fn set_oneshot(&mut self, on_or_off: bool) -> i32 {
348        self.turn_on_or_off(libc::EPOLLONESHOT, on_or_off)
349    }
350    /// EPOLLET
351    pub fn set_et(&mut self, on_or_off: bool) -> i32 {
352        self.turn_on_or_off(libc::EPOLLET, on_or_off)
353    }
354    /// Get the raw u32 Epoll event mask as set in userspace
355    /// This may not have been sent and may be pending send or not committed
356    /// Use [`EpollHandler::prepare_submit()`] after
357    pub fn get_mask_raw(&mut self) -> Option<i32> {
358        self.wants
359    }
360    /// Set the raw u32 Epoll event mask in the userspace
361    /// *WARNING*: Ensure this is valid per epoll.h of your kernel
362    /// Use [`EpollHandler::prepare_submit()`] after
363    pub fn set_mask_raw(&mut self, mask: i32) {
364        self.wants = Some(mask);
365    }
366    /// Get the pending eq u32 Epoll
367    /// This may not be committed into kernel yet use get_committed to check
368    /// This will be none if there is no pending change or it has not been sent
369    /// Use [`EpollHandler::prepare_submit()`] after
370    pub fn get_pending(&self) -> Option<i32> {
371        self.pending
372    }
373    // /// Get the committed raw u32 Epoll event mask if any
374    // /// This represents the state that has been confirmed by the kernel
375    // /// Use [`EpollHandler::commit()`] to commit all pending changes if any
376    // TODO: handle this internally
377    //pub fn get_committed(&self) -> Option<i32> {
378    //    self.committed
379    //}
380}
381
382/// Represents Submission queue results as described in [`EpollHandler::commit()`]
383#[derive(Debug)]
384pub struct FdCommitResults<'fd> {
385    pub(crate) new_commits: u32,
386    pub(crate) change_commits: u32,
387    pub(crate) no_change: u32,
388    pub(crate) empty: u32,
389    pub(crate) errors_on_submit: Vec<&'fd HandledFd>,
390}
391
392impl<'fd> FdCommitResults<'fd> {
393    /// How many new EPOLL_ADD entries in SubmissionQueue
394    pub fn count_new(&self) -> u32 {
395        self.new_commits
396    }
397    /// How many nw EPOLL_MOD entries in SubmissionQueue
398    pub fn count_changes(&self) -> u32 {
399        self.change_commits
400    }
401    /// How many handles did not see any changes
402    pub fn count_no_changes(&self) -> u32 {
403        self.no_change
404    }
405    /// How many handles are empty
406    pub fn count_empty(&self) -> u32 {
407        self.empty
408    }
409    /// How many updates gave error upon submission
410    pub fn errors(&'fd self) -> &'fd Vec<&'fd HandledFd> {
411        &self.errors_on_submit
412    }
413}
414
#[cfg(test)]
mod test {
    use super::HandledFd;

    // The mask setters never touch the descriptor itself, so a placeholder
    // value is enough; no socket needs to be opened for these tests.
    fn handle_fd() -> HandledFd {
        HandledFd::new(0)
    }

    // Switch a single flag on and off again on a fresh handle: the first
    // call must yield exactly the flag's bit, the second an empty mask.
    #[track_caller]
    fn assert_toggles(setter: fn(&mut HandledFd, bool) -> i32, expected_on: i32) {
        let mut fd = handle_fd();
        assert_eq!(setter(&mut fd, true), expected_on);
        assert_eq!(setter(&mut fd, false), 0);
    }

    #[test]
    fn mask_fd_inouts() {
        let mut fd = handle_fd();
        fd.set_in(true);
        assert_eq!(fd.set_out(true), 5);
        assert_eq!(fd.set_in(false), 4);
        assert_eq!(fd.set_out(false), 0);
    }
    #[test]
    fn mask_fd_in() {
        assert_toggles(HandledFd::set_in, 1);
    }
    #[test]
    fn mask_fd_pri() {
        assert_toggles(HandledFd::set_pri, 2);
    }
    #[test]
    fn mask_fd_out() {
        assert_toggles(HandledFd::set_out, 4);
    }
    #[test]
    fn mask_fd_err() {
        assert_toggles(HandledFd::set_err, 8);
    }
    #[test]
    fn mask_fd_hup() {
        assert_toggles(HandledFd::set_hup, 0x00000010);
    }
    #[test]
    fn mask_fd_rdnorm() {
        assert_toggles(HandledFd::set_rdnorm, 0x00000040);
    }
    #[test]
    fn mask_fd_rdband() {
        assert_toggles(HandledFd::set_rdband, 0x00000080);
    }
    #[test]
    fn mask_fd_wrnorm() {
        assert_toggles(HandledFd::set_wrnorm, 0x00000100);
    }
    #[test]
    fn mask_fd_wrband() {
        assert_toggles(HandledFd::set_wrband, 0x00000200);
    }
    #[test]
    fn mask_fd_msg() {
        assert_toggles(HandledFd::set_msg, 0x00000400);
    }
    #[test]
    fn mask_fd_rdhup() {
        assert_toggles(HandledFd::set_rdhup, 0x00002000);
    }
    #[test]
    fn mask_fd_wakeup() {
        assert_toggles(HandledFd::set_wakeup, 0x20000000);
    }
    #[test]
    fn mask_fd_oneshot() {
        assert_toggles(HandledFd::set_oneshot, 0x40000000);
    }
    #[test]
    fn mask_fd_et() {
        // EPOLLET occupies the sign bit of the i32 mask, so compare as u32.
        let mut fd = handle_fd();
        assert_eq!(fd.set_et(true) as u32, 0x80000000);
        assert_eq!(fd.set_et(false), 0);
    }
}
519}