1#![warn(
2 clippy::unwrap_used,
3 missing_docs,
4 rust_2018_idioms,
5 unused_lifetimes,
6 unused_qualifications
7)]
8#![doc = include_str!("../README.md")]
9
10use io_uring::opcode::EpollCtl;
11use io_uring::IoUring;
12
13use std::collections::HashMap;
14use std::collections::VecDeque;
15use std::os::fd::RawFd;
16
17#[derive(Debug)]
19pub enum EpollHandlerError {
20 IoUringCreate(String),
22 EpollCreate1(String),
24 NotSupported,
26 Probing(String),
28 Duplicate,
30 Submission(String),
32}
33
34impl core::fmt::Display for EpollHandlerError {
35 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
36 match self {
37 Self::IoUringCreate(s) => write!(f, "IoUring Create: {}", s),
38 Self::EpollCreate1(s) => write!(f, "epoll_create1(): {}", s),
39 Self::NotSupported => write!(
40 f,
41 "EpollCtl io_uring OpCode is not supported in your Kernel"
42 ),
43 Self::Probing(s) => write!(
44 f,
45 "Error whilst probing EpollCtl support from kernel: {}",
46 s
47 ),
48 Self::Duplicate => write!(f, "The filehandle is already maped in. Possible duplicate?"),
49 Self::Submission(s) => write!(f, "Submission: {}", s),
50 }
51 }
52}
53
54pub struct EpollHandler<'fd> {
59 pub(crate) epfd: u32,
60 pub(crate) io_uring: IoUring<io_uring::squeue::Entry, io_uring::cqueue::Entry>,
61 pub(crate) in_flight: u32,
62 pub(crate) fds: HashMap<i32, &'fd HandledFd>,
63 pub(crate) submit_counter: u64,
64}
65
66impl<'fd> EpollHandler<'fd> {
67 pub fn new(capacity: u32) -> Result<Self, EpollHandlerError> {
76 let iou: IoUring<io_uring::squeue::Entry, io_uring::cqueue::Entry> = IoUring::builder()
77 .build(capacity)
78 .map_err(|e| EpollHandlerError::IoUringCreate(e.to_string()))?;
79
80 Self::from_io_uring(iou)
81 }
82 pub fn from_io_uring(
99 iou: IoUring<io_uring::squeue::Entry, io_uring::cqueue::Entry>,
100 ) -> Result<Self, EpollHandlerError> {
101 let mut epoll_probe = io_uring::Probe::new();
102 iou.submitter()
103 .register_probe(&mut epoll_probe)
104 .map_err(|e| EpollHandlerError::Probing(e.to_string()))?;
105
106 #[allow(clippy::bool_comparison)]
107 if epoll_probe.is_supported(EpollCtl::CODE) == false {
108 return Err(EpollHandlerError::NotSupported);
109 }
110
111 let epfd = unsafe { libc::epoll_create1(0) };
113 if epfd == -1 {
114 let errno = unsafe { libc::__errno_location() };
116 return Err(EpollHandlerError::EpollCreate1(format!(
117 "errno: {:?}",
118 errno
119 )));
120 }
121
122 Ok(Self {
123 epfd: epfd as u32,
124 io_uring: iou,
125 in_flight: 0,
126 submit_counter: 0,
127 fds: HashMap::new(),
128 })
129 }
130 pub fn io_uring(&mut self) -> &mut IoUring<io_uring::squeue::Entry, io_uring::cqueue::Entry> {
132 &mut self.io_uring
133 }
134 pub fn add_fd(&mut self, handled_fd: &'fd HandledFd) -> Result<(), EpollHandlerError> {
137 self.fds.insert(handled_fd.fd, handled_fd);
138 Ok(())
139 }
140 pub fn submit(&self) -> Result<usize, EpollHandlerError> {
142 self.io_uring
143 .submit()
144 .map_err(|e| EpollHandlerError::Submission(e.to_string()))
145 }
146 pub fn submit_and_wait(&self, want: usize) -> Result<usize, EpollHandlerError> {
148 self.io_uring
149 .submit_and_wait(want)
150 .map_err(|e| EpollHandlerError::Submission(e.to_string()))
151 }
152 pub fn prepare_submit(&mut self) -> Result<FdCommitResults<'fd>, EpollHandlerError> {
157 let mut fd_commit_results = FdCommitResults {
158 new_commits: 0,
159 change_commits: 0,
160 no_change: 0,
161 empty: 0,
162 errors_on_submit: vec![],
163 };
164
165 let iou = &mut self.io_uring;
168
169 let mut s_queue = iou.submission();
170
171 let mut updates: VecDeque<HandledFd> = VecDeque::new();
172
173 for (_, handled_fd) in self.fds.iter() {
174 let mut new_fd = (**handled_fd).clone();
175
176 let mut commit_new: Option<i32> = None;
177 let mut epoll_op = EPOLL_CTL_MOD;
178 if handled_fd.wants.is_none() && handled_fd.committed.is_some() {
179 epoll_op = EPOLL_CTL_DEL;
180 commit_new = Some(0);
181 }
182 if let Some(fd_wants) = handled_fd.wants {
183 match handled_fd.committed {
184 None => {
185 commit_new = Some(handled_fd.wants.unwrap_or(0));
186 epoll_op = EPOLL_CTL_ADD;
187 fd_commit_results.new_commits += 1;
188 }
189 Some(committed) => {
190 if committed != fd_wants {
191 commit_new = Some(fd_wants);
192 fd_commit_results.change_commits += 1;
193 } else {
194 fd_commit_results.no_change += 1;
195 }
196 }
197 }
198 if let Some(commit) = commit_new {
199 self.submit_counter += 1;
200
201 let epoll_event = libc::epoll_event {
202 events: commit as u32,
203 u64: self.submit_counter,
204 };
205
206 let uring_submission_rec = EpollCtl::new(
207 io_uring::types::Fixed(self.epfd),
208 io_uring::types::Fd(handled_fd.fd),
209 epoll_op,
210 std::ptr::addr_of!(epoll_event) as *const io_uring::types::epoll_event,
211 )
212 .build();
213
214 let p_result = unsafe { s_queue.push(&uring_submission_rec) };
216 match p_result {
217 Err(e) => {
218 new_fd.error = Some(e.to_string());
219 fd_commit_results.errors_on_submit.push(handled_fd);
220 }
221 Ok(_) => {
222 new_fd.current_submission = Some(self.submit_counter);
223 self.in_flight += 1;
224 }
225 }
226 }
227 } else {
228 fd_commit_results.empty += 1;
230 }
231
232 if new_fd != **handled_fd {
234 updates.push_back(new_fd);
235 }
236 }
237
238 while let Some(update) = updates.pop_front() {
240 let fd_get: Option<&&HandledFd> = self.fds.get(&update.fd);
241 if let Some(unwrapped_fd) = fd_get {
242 let mut pinned_mut: std::pin::Pin<&mut &HandledFd> = std::pin::pin!(*unwrapped_fd);
243 let pinned_update: std::pin::Pin<&mut &HandledFd> = std::pin::pin!(&update);
244 let _st = std::mem::replace(&mut pinned_mut, pinned_update);
245 }
246 }
247
248 Ok(fd_commit_results)
249 }
250}
251
252#[derive(Debug, Clone, PartialEq)]
257pub struct HandledFd {
258 pub(crate) fd: RawFd,
259 pub(crate) wants: Option<i32>,
260 pub(crate) pending: Option<i32>,
261 pub(crate) committed: Option<i32>,
262 pub(crate) error: Option<String>,
263 pub(crate) current_submission: Option<u64>,
264}
265
266const EPOLL_CTL_ADD: i32 = 1;
267const EPOLL_CTL_DEL: i32 = 2;
268const EPOLL_CTL_MOD: i32 = 3;
269
270impl HandledFd {
271 pub fn new(fd: RawFd) -> Self {
274 HandledFd {
275 fd,
276 wants: None,
277 committed: None,
278 current_submission: None,
279 error: None,
280 pending: None,
281 }
282 }
283 pub fn as_raw(&self) -> RawFd {
285 self.fd
286 }
287 fn turn_on_or_off(&mut self, mask_in: i32, on_or_off: bool) -> i32 {
289 let cur_wants: i32 = self.wants.unwrap_or(0);
290 self.wants = match on_or_off {
291 true => Some(cur_wants | mask_in),
292 false => Some(cur_wants ^ mask_in),
293 };
294 self.wants.unwrap_or(0)
295 }
296 pub fn set_in(&mut self, on_or_off: bool) -> i32 {
300 self.turn_on_or_off(libc::EPOLLIN, on_or_off)
301 }
302 pub fn set_pri(&mut self, on_or_off: bool) -> i32 {
304 self.turn_on_or_off(libc::EPOLLPRI, on_or_off)
305 }
306 pub fn set_out(&mut self, on_or_off: bool) -> i32 {
308 self.turn_on_or_off(libc::EPOLLOUT, on_or_off)
309 }
310 pub fn set_err(&mut self, on_or_off: bool) -> i32 {
312 self.turn_on_or_off(libc::EPOLLERR, on_or_off)
313 }
314 pub fn set_hup(&mut self, on_or_off: bool) -> i32 {
316 self.turn_on_or_off(libc::EPOLLHUP, on_or_off)
317 }
318 pub fn set_rdnorm(&mut self, on_or_off: bool) -> i32 {
320 self.turn_on_or_off(libc::EPOLLRDNORM, on_or_off)
321 }
322 pub fn set_rdband(&mut self, on_or_off: bool) -> i32 {
324 self.turn_on_or_off(libc::EPOLLRDBAND, on_or_off)
325 }
326 pub fn set_wrnorm(&mut self, on_or_off: bool) -> i32 {
328 self.turn_on_or_off(libc::EPOLLWRNORM, on_or_off)
329 }
330 pub fn set_wrband(&mut self, on_or_off: bool) -> i32 {
332 self.turn_on_or_off(libc::EPOLLWRBAND, on_or_off)
333 }
334 pub fn set_msg(&mut self, on_or_off: bool) -> i32 {
336 self.turn_on_or_off(libc::EPOLLMSG, on_or_off)
337 }
338 pub fn set_rdhup(&mut self, on_or_off: bool) -> i32 {
340 self.turn_on_or_off(libc::EPOLLRDHUP, on_or_off)
341 }
342 pub fn set_wakeup(&mut self, on_or_off: bool) -> i32 {
344 self.turn_on_or_off(libc::EPOLLWAKEUP, on_or_off)
345 }
346 pub fn set_oneshot(&mut self, on_or_off: bool) -> i32 {
348 self.turn_on_or_off(libc::EPOLLONESHOT, on_or_off)
349 }
350 pub fn set_et(&mut self, on_or_off: bool) -> i32 {
352 self.turn_on_or_off(libc::EPOLLET, on_or_off)
353 }
354 pub fn get_mask_raw(&mut self) -> Option<i32> {
358 self.wants
359 }
360 pub fn set_mask_raw(&mut self, mask: i32) {
364 self.wants = Some(mask);
365 }
366 pub fn get_pending(&self) -> Option<i32> {
371 self.pending
372 }
373 }
381
382#[derive(Debug)]
384pub struct FdCommitResults<'fd> {
385 pub(crate) new_commits: u32,
386 pub(crate) change_commits: u32,
387 pub(crate) no_change: u32,
388 pub(crate) empty: u32,
389 pub(crate) errors_on_submit: Vec<&'fd HandledFd>,
390}
391
392impl<'fd> FdCommitResults<'fd> {
393 pub fn count_new(&self) -> u32 {
395 self.new_commits
396 }
397 pub fn count_changes(&self) -> u32 {
399 self.change_commits
400 }
401 pub fn count_no_changes(&self) -> u32 {
403 self.no_change
404 }
405 pub fn count_empty(&self) -> u32 {
407 self.empty
408 }
409 pub fn errors(&'fd self) -> &'fd Vec<&'fd HandledFd> {
411 &self.errors_on_submit
412 }
413}
414
415#[cfg(test)]
416mod test {
417 use super::HandledFd;
418 use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener};
419 use std::os::fd::AsRawFd;
420
421 fn handle_fd() -> HandledFd {
422 let s =
423 TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0)).unwrap();
424 HandledFd::new(s.as_raw_fd())
425 }
426
427 #[test]
428 fn mask_fd_inouts() {
429 let mut fd = handle_fd();
430 fd.set_in(true);
431 assert_eq!(fd.set_out(true), 5);
432 assert_eq!(fd.set_in(false), 4);
433 assert_eq!(fd.set_out(false), 0);
434 }
435 #[test]
436 fn mask_fd_in() {
437 let mut fd = handle_fd();
438 assert_eq!(fd.set_in(true), 1);
439 assert_eq!(fd.set_in(false), 0);
440 }
441 #[test]
442 fn mask_fd_pri() {
443 let mut fd = handle_fd();
444 assert_eq!(fd.set_pri(true), 2);
445 assert_eq!(fd.set_pri(false), 0);
446 }
447 #[test]
448 fn mask_fd_out() {
449 let mut fd = handle_fd();
450 assert_eq!(fd.set_out(true), 4);
451 assert_eq!(fd.set_out(false), 0);
452 }
453 #[test]
454 fn mask_fd_err() {
455 let mut fd = handle_fd();
456 assert_eq!(fd.set_err(true), 8);
457 assert_eq!(fd.set_err(false), 0);
458 }
459 #[test]
460 fn mask_fd_hup() {
461 let mut fd = handle_fd();
462 assert_eq!(fd.set_hup(true), 0x00000010);
463 assert_eq!(fd.set_hup(false), 0);
464 }
465 #[test]
466 fn mask_fd_rdnorm() {
467 let mut fd = handle_fd();
468 assert_eq!(fd.set_rdnorm(true), 0x00000040);
469 assert_eq!(fd.set_rdnorm(false), 0);
470 }
471 #[test]
472 fn mask_fd_rdband() {
473 let mut fd = handle_fd();
474 assert_eq!(fd.set_rdband(true), 0x00000080);
475 assert_eq!(fd.set_rdband(false), 0);
476 }
477 #[test]
478 fn mask_fd_wrnorm() {
479 let mut fd = handle_fd();
480 assert_eq!(fd.set_wrnorm(true), 0x00000100);
481 assert_eq!(fd.set_wrnorm(false), 0);
482 }
483 #[test]
484 fn mask_fd_wrband() {
485 let mut fd = handle_fd();
486 assert_eq!(fd.set_wrband(true), 0x00000200);
487 assert_eq!(fd.set_wrband(false), 0);
488 }
489 #[test]
490 fn mask_fd_msg() {
491 let mut fd = handle_fd();
492 assert_eq!(fd.set_msg(true), 0x00000400);
493 assert_eq!(fd.set_msg(false), 0);
494 }
495 #[test]
496 fn mask_fd_rdhup() {
497 let mut fd = handle_fd();
498 assert_eq!(fd.set_rdhup(true), 0x00002000);
499 assert_eq!(fd.set_rdhup(false), 0);
500 }
501 #[test]
502 fn mask_fd_wakeup() {
503 let mut fd = handle_fd();
504 assert_eq!(fd.set_wakeup(true), 0x20000000);
505 assert_eq!(fd.set_wakeup(false), 0);
506 }
507 #[test]
508 fn mask_fd_oneshot() {
509 let mut fd = handle_fd();
510 assert_eq!(fd.set_oneshot(true), 0x40000000);
511 assert_eq!(fd.set_oneshot(false), 0);
512 }
513 #[test]
514 fn mask_fd_et() {
515 let mut fd = handle_fd();
516 assert_eq!(fd.set_et(true) as u32, 0x80000000);
517 assert_eq!(fd.set_et(false), 0);
518 }
519}