pub struct EpollHandler<'fd> { /* private fields */ }Expand description
Manages the io_uring Submission and Completion Queues related to the EpollCtl opcode in io_uring. See epoll(7): https://man7.org/linux/man-pages/man7/epoll.7.html
Implementations§
Source§impl<'fd> EpollHandler<'fd>
impl<'fd> EpollHandler<'fd>
Sourcepub fn new(capacity: u32) -> Result<Self, EpollHandlerError>
pub fn new(capacity: u32) -> Result<Self, EpollHandlerError>
Create a new handler backed by a new io-uring::IoUring.
The capacity must be a power of two, as per the io-uring documentation.
use io_uring_epoll::EpollHandler;
EpollHandler::new(10).expect("Unable to create EPoll Handler");Examples found in repository?
5fn main() {
6 // The 10 denotes power of two capacity to io_uring::IoUring
7 let mut handler = EpollHandler::new(10).expect("Unable to create EPoll Handler");
8
9 // This works with any impl that provides std::os::fd::AsRawFd impl
10 // In POSIX/UNIX-like it's just i32 file number or "fileno"
11 let listen =
12 std::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0))
13 .unwrap();
14
15 // Add the listen handle into EpollHandler
16 let mut handle_fd = HandledFd::new(listen.as_raw_fd());
17 let set_mask = handle_fd.set_in(true);
18 assert_eq!(set_mask, 1);
19 handler.add_fd(&handle_fd).unwrap();
20
21 // Prepare a commit of all changes into io_uring::SubmissionQueue
22 let handle_status = handler.prepare_submit().unwrap();
23 assert_eq!(handle_status.count_new(), 1);
24 assert_eq!(handle_status.count_changes(), 0);
25 assert_eq!(handle_status.count_empty(), 0);
26 assert_eq!(handle_status.errors().len(), 0);
27
28 // Take temp ref to io_uring::SubmissionQueue
29 let submission = handler.io_uring().submission();
30 assert_eq!(submission.len(), 1);
31 assert_eq!(submission.is_empty(), false);
32 assert_eq!(submission.dropped(), 0);
33 assert_eq!(submission.cq_overflow(), false);
34 assert_eq!(submission.is_full(), false);
35 drop(submission);
36
37 // async version is with submit()
38 handler.submit_and_wait(1).unwrap();
39
40 // Ensure that the kernel ate it
41 let submission = handler.io_uring().submission();
42 assert_eq!(submission.len(), 0);
43 assert_eq!(submission.is_empty(), true);
44 assert_eq!(submission.dropped(), 0);
45 assert_eq!(submission.cq_overflow(), false);
46 assert_eq!(submission.is_full(), false);
47 drop(submission);
48
49 let c_queue = handler.io_uring().completion();
50 let mut c_attempts = 0;
51 loop {
52 if c_queue.is_empty() == false {
53 assert_eq!(c_queue.len(), 1);
54 break;
55 }
56 if c_attempts == 10 {
57 panic!("Took more than 100 ms - completion never finished?");
58 }
59 std::thread::sleep(std::time::Duration::from_millis(10));
60 c_attempts += 1;
61 }
62}Sourcepub fn from_io_uring(
iou: IoUring<Entry, Entry>,
) -> Result<Self, EpollHandlerError>
pub fn from_io_uring( iou: IoUring<Entry, Entry>, ) -> Result<Self, EpollHandlerError>
Create a new handler from an existing io-uring::IoUring instance. To construct a custom IoUring, see the io-uring Builder: https://docs.rs/io-uring/latest/io_uring/struct.Builder.html
Example:
use io_uring::IoUring;
use io_uring_epoll::EpollHandler;
let mut iou: IoUring<io_uring::squeue::Entry, io_uring::cqueue::Entry>
= IoUring::builder()
.build(100)
.expect("Unable to build IoUring");
EpollHandler::from_io_uring(iou).expect("Unable to create from io_uring Builder");Sourcepub fn io_uring(&mut self) -> &mut IoUring<Entry, Entry>
pub fn io_uring(&mut self) -> &mut IoUring<Entry, Entry>
Borrow the underlying io-uring::IoUring instance
Examples found in repository?
5fn main() {
6 // The 10 denotes power of two capacity to io_uring::IoUring
7 let mut handler = EpollHandler::new(10).expect("Unable to create EPoll Handler");
8
9 // This works with any impl that provides std::os::fd::AsRawFd impl
10 // In POSIX/UNIX-like it's just i32 file number or "fileno"
11 let listen =
12 std::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0))
13 .unwrap();
14
15 // Add the listen handle into EpollHandler
16 let mut handle_fd = HandledFd::new(listen.as_raw_fd());
17 let set_mask = handle_fd.set_in(true);
18 assert_eq!(set_mask, 1);
19 handler.add_fd(&handle_fd).unwrap();
20
21 // Prepare a commit of all changes into io_uring::SubmissionQueue
22 let handle_status = handler.prepare_submit().unwrap();
23 assert_eq!(handle_status.count_new(), 1);
24 assert_eq!(handle_status.count_changes(), 0);
25 assert_eq!(handle_status.count_empty(), 0);
26 assert_eq!(handle_status.errors().len(), 0);
27
28 // Take temp ref to io_uring::SubmissionQueue
29 let submission = handler.io_uring().submission();
30 assert_eq!(submission.len(), 1);
31 assert_eq!(submission.is_empty(), false);
32 assert_eq!(submission.dropped(), 0);
33 assert_eq!(submission.cq_overflow(), false);
34 assert_eq!(submission.is_full(), false);
35 drop(submission);
36
37 // async version is with submit()
38 handler.submit_and_wait(1).unwrap();
39
40 // Ensure that the kernel ate it
41 let submission = handler.io_uring().submission();
42 assert_eq!(submission.len(), 0);
43 assert_eq!(submission.is_empty(), true);
44 assert_eq!(submission.dropped(), 0);
45 assert_eq!(submission.cq_overflow(), false);
46 assert_eq!(submission.is_full(), false);
47 drop(submission);
48
49 let c_queue = handler.io_uring().completion();
50 let mut c_attempts = 0;
51 loop {
52 if c_queue.is_empty() == false {
53 assert_eq!(c_queue.len(), 1);
54 break;
55 }
56 if c_attempts == 10 {
57 panic!("Took more than 100 ms - completion never finished?");
58 }
59 std::thread::sleep(std::time::Duration::from_millis(10));
60 c_attempts += 1;
61 }
62}Sourcepub fn add_fd(
&mut self,
handled_fd: &'fd HandledFd,
) -> Result<(), EpollHandlerError>
pub fn add_fd( &mut self, handled_fd: &'fd HandledFd, ) -> Result<(), EpollHandlerError>
Add a HandledFd to this handler.
Afterwards, commit the changes to all changed HandledFds with EpollHandler::prepare_submit().
Examples found in repository?
5fn main() {
6 // The 10 denotes power of two capacity to io_uring::IoUring
7 let mut handler = EpollHandler::new(10).expect("Unable to create EPoll Handler");
8
9 // This works with any impl that provides std::os::fd::AsRawFd impl
10 // In POSIX/UNIX-like it's just i32 file number or "fileno"
11 let listen =
12 std::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0))
13 .unwrap();
14
15 // Add the listen handle into EpollHandler
16 let mut handle_fd = HandledFd::new(listen.as_raw_fd());
17 let set_mask = handle_fd.set_in(true);
18 assert_eq!(set_mask, 1);
19 handler.add_fd(&handle_fd).unwrap();
20
21 // Prepare a commit of all changes into io_uring::SubmissionQueue
22 let handle_status = handler.prepare_submit().unwrap();
23 assert_eq!(handle_status.count_new(), 1);
24 assert_eq!(handle_status.count_changes(), 0);
25 assert_eq!(handle_status.count_empty(), 0);
26 assert_eq!(handle_status.errors().len(), 0);
27
28 // Take temp ref to io_uring::SubmissionQueue
29 let submission = handler.io_uring().submission();
30 assert_eq!(submission.len(), 1);
31 assert_eq!(submission.is_empty(), false);
32 assert_eq!(submission.dropped(), 0);
33 assert_eq!(submission.cq_overflow(), false);
34 assert_eq!(submission.is_full(), false);
35 drop(submission);
36
37 // async version is with submit()
38 handler.submit_and_wait(1).unwrap();
39
40 // Ensure that the kernel ate it
41 let submission = handler.io_uring().submission();
42 assert_eq!(submission.len(), 0);
43 assert_eq!(submission.is_empty(), true);
44 assert_eq!(submission.dropped(), 0);
45 assert_eq!(submission.cq_overflow(), false);
46 assert_eq!(submission.is_full(), false);
47 drop(submission);
48
49 let c_queue = handler.io_uring().completion();
50 let mut c_attempts = 0;
51 loop {
52 if c_queue.is_empty() == false {
53 assert_eq!(c_queue.len(), 1);
54 break;
55 }
56 if c_attempts == 10 {
57 panic!("Took more than 100 ms - completion never finished?");
58 }
59 std::thread::sleep(std::time::Duration::from_millis(10));
60 c_attempts += 1;
61 }
62}Sourcepub fn submit(&self) -> Result<usize, EpollHandlerError>
pub fn submit(&self) -> Result<usize, EpollHandlerError>
This calls the underlying io_uring::IoUring::submit, submitting all the staged commits.
Sourcepub fn submit_and_wait(&self, want: usize) -> Result<usize, EpollHandlerError>
pub fn submit_and_wait(&self, want: usize) -> Result<usize, EpollHandlerError>
Same as submit but using io_uring::IoUring::submit_and_wait
Examples found in repository?
5fn main() {
6 // The 10 denotes power of two capacity to io_uring::IoUring
7 let mut handler = EpollHandler::new(10).expect("Unable to create EPoll Handler");
8
9 // This works with any impl that provides std::os::fd::AsRawFd impl
10 // In POSIX/UNIX-like it's just i32 file number or "fileno"
11 let listen =
12 std::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0))
13 .unwrap();
14
15 // Add the listen handle into EpollHandler
16 let mut handle_fd = HandledFd::new(listen.as_raw_fd());
17 let set_mask = handle_fd.set_in(true);
18 assert_eq!(set_mask, 1);
19 handler.add_fd(&handle_fd).unwrap();
20
21 // Prepare a commit of all changes into io_uring::SubmissionQueue
22 let handle_status = handler.prepare_submit().unwrap();
23 assert_eq!(handle_status.count_new(), 1);
24 assert_eq!(handle_status.count_changes(), 0);
25 assert_eq!(handle_status.count_empty(), 0);
26 assert_eq!(handle_status.errors().len(), 0);
27
28 // Take temp ref to io_uring::SubmissionQueue
29 let submission = handler.io_uring().submission();
30 assert_eq!(submission.len(), 1);
31 assert_eq!(submission.is_empty(), false);
32 assert_eq!(submission.dropped(), 0);
33 assert_eq!(submission.cq_overflow(), false);
34 assert_eq!(submission.is_full(), false);
35 drop(submission);
36
37 // async version is with submit()
38 handler.submit_and_wait(1).unwrap();
39
40 // Ensure that the kernel ate it
41 let submission = handler.io_uring().submission();
42 assert_eq!(submission.len(), 0);
43 assert_eq!(submission.is_empty(), true);
44 assert_eq!(submission.dropped(), 0);
45 assert_eq!(submission.cq_overflow(), false);
46 assert_eq!(submission.is_full(), false);
47 drop(submission);
48
49 let c_queue = handler.io_uring().completion();
50 let mut c_attempts = 0;
51 loop {
52 if c_queue.is_empty() == false {
53 assert_eq!(c_queue.len(), 1);
54 break;
55 }
56 if c_attempts == 10 {
57 panic!("Took more than 100 ms - completion never finished?");
58 }
59 std::thread::sleep(std::time::Duration::from_millis(10));
60 c_attempts += 1;
61 }
62}Sourcepub fn prepare_submit(
&mut self,
) -> Result<FdCommitResults<'fd>, EpollHandlerError>
pub fn prepare_submit( &mut self, ) -> Result<FdCommitResults<'fd>, EpollHandlerError>
Stage commit of all changed HandledFd resources to submission queue
This will make all changes within the underlying io_uring::squeue::SubmissionQueue
Errors related to any individual staged Fd are available via FdCommitResults.
Use Self::submit or Self::submit_and_wait to push the commits to the kernel.
Examples found in repository?
5fn main() {
6 // The 10 denotes power of two capacity to io_uring::IoUring
7 let mut handler = EpollHandler::new(10).expect("Unable to create EPoll Handler");
8
9 // This works with any impl that provides std::os::fd::AsRawFd impl
10 // In POSIX/UNIX-like it's just i32 file number or "fileno"
11 let listen =
12 std::net::TcpListener::bind(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0))
13 .unwrap();
14
15 // Add the listen handle into EpollHandler
16 let mut handle_fd = HandledFd::new(listen.as_raw_fd());
17 let set_mask = handle_fd.set_in(true);
18 assert_eq!(set_mask, 1);
19 handler.add_fd(&handle_fd).unwrap();
20
21 // Prepare a commit of all changes into io_uring::SubmissionQueue
22 let handle_status = handler.prepare_submit().unwrap();
23 assert_eq!(handle_status.count_new(), 1);
24 assert_eq!(handle_status.count_changes(), 0);
25 assert_eq!(handle_status.count_empty(), 0);
26 assert_eq!(handle_status.errors().len(), 0);
27
28 // Take temp ref to io_uring::SubmissionQueue
29 let submission = handler.io_uring().submission();
30 assert_eq!(submission.len(), 1);
31 assert_eq!(submission.is_empty(), false);
32 assert_eq!(submission.dropped(), 0);
33 assert_eq!(submission.cq_overflow(), false);
34 assert_eq!(submission.is_full(), false);
35 drop(submission);
36
37 // async version is with submit()
38 handler.submit_and_wait(1).unwrap();
39
40 // Ensure that the kernel ate it
41 let submission = handler.io_uring().submission();
42 assert_eq!(submission.len(), 0);
43 assert_eq!(submission.is_empty(), true);
44 assert_eq!(submission.dropped(), 0);
45 assert_eq!(submission.cq_overflow(), false);
46 assert_eq!(submission.is_full(), false);
47 drop(submission);
48
49 let c_queue = handler.io_uring().completion();
50 let mut c_attempts = 0;
51 loop {
52 if c_queue.is_empty() == false {
53 assert_eq!(c_queue.len(), 1);
54 break;
55 }
56 if c_attempts == 10 {
57 panic!("Took more than 100 ms - completion never finished?");
58 }
59 std::thread::sleep(std::time::Duration::from_millis(10));
60 c_attempts += 1;
61 }
62}