// io_engine/driver/uring.rs
use crate::context::IoCtxShared;
use crate::tasks::{IOAction, IOEvent, IoCallback};
use crossfire::BlockingRxTrait;
use io_uring::{IoUring, opcode, squeue, types};
use log::{error, info};
use std::{cell::UnsafeCell, io, marker::PhantomData, sync::Arc, thread, time::Duration};
7
/// `user_data` value reserved for the shutdown marker.
///
/// The submitter thread tags a NOP request with this value when its input queue closes, and the
/// completer thread leaves its loop once it sees a completion carrying it. Every other request
/// carries the address of its boxed `IOEvent` as `user_data`.
const URING_EXIT_SIGNAL_USER_DATA: u64 = u64::MAX;
9
10pub struct UringDriver<C: IoCallback, Q: BlockingRxTrait<Box<IOEvent<C>>>> {
11 _marker: PhantomData<(C, Q)>,
12}
13
14struct UringInner(UnsafeCell<IoUring>);
15
16unsafe impl Send for UringInner {}
17unsafe impl Sync for UringInner {}
18
19impl<C: IoCallback, Q: BlockingRxTrait<Box<IOEvent<C>>> + Send + 'static> UringDriver<C, Q> {
20 pub fn start(ctx: Arc<IoCtxShared<C, Q>>) -> io::Result<()> {
21 let depth = ctx.depth as u32;
22 let ring = IoUring::new(depth.max(2))?;
23 let ring_arc = Arc::new(UringInner(UnsafeCell::new(ring)));
24 let ring_submit = ring_arc.clone();
25 let ring_complete = ring_arc.clone();
26 let ctx_submit = ctx.clone();
27 let ctx_complete = ctx.clone();
28 thread::spawn(move || {
29 Self::submit(ctx_submit, ring_submit);
30 });
31 thread::spawn(move || {
32 Self::complete(ctx_complete, ring_complete);
33 });
34
35 Ok(())
36 }
37
38 fn submit(ctx: Arc<IoCtxShared<C, Q>>, ring_arc: Arc<UringInner>) {
39 info!("io_uring submitter thread start");
40 let depth = ctx.depth;
41 let exit_sent = false;
42
43 let ring = unsafe { &mut *ring_arc.0.get() };
44
45 loop {
46 let mut events = Vec::with_capacity(depth);
48
49 match ctx.queue.recv() {
50 Ok(event) => events.push(event),
51 Err(_) => {
52 if !exit_sent {
53 let nop_sqe =
54 opcode::Nop::new().build().user_data(URING_EXIT_SIGNAL_USER_DATA);
55
56 {
57 let mut sq = ring.submission();
58 unsafe {
59 if sq.push(&nop_sqe).is_err() {
60 drop(sq);
61 let _ = ring.submit();
62 let mut sq = ring.submission();
63 let _ = sq.push(&nop_sqe);
64 }
65 }
66 }
67 info!("io_uring submitter sent exit signal");
68 }
69 break;
70 }
71 }
72
73 {
74 let sq = ring.submission();
75 if sq.is_full() {
76 drop(sq);
77 let _ = ring.submit();
78 }
79 }
80
81 while events.len() < depth {
82 match ctx.queue.try_recv() {
83 Ok(event) => events.push(event),
84 Err(_) => break,
85 }
86 }
87
88 if !events.is_empty() {
90 {
91 let mut sq = ring.submission();
92 for event in events {
93 let event = event;
94 let fd = event.fd;
95 let offset = event.offset as u64;
96 let buf_slice = event.get_buf_ref();
97 let buf_len = buf_slice.len() as u32;
98 let buf_ptr = buf_slice.as_ptr() as *mut u8;
99
100 let sqe = match event.action {
101 IOAction::Read => opcode::Read::new(types::Fd(fd), buf_ptr, buf_len)
102 .offset(offset)
103 .build(),
104 IOAction::Write => opcode::Write::new(types::Fd(fd), buf_ptr, buf_len)
105 .offset(offset)
106 .build(),
107 };
108 let user_data = Box::into_raw(event) as u64;
109 let sqe = sqe.user_data(user_data);
110 unsafe {
111 if let Err(_) = sq.push(&sqe) {
112 error!("SQ full (should not happen)");
113 let _ = Box::from_raw(user_data as *mut IOEvent<C>);
114 }
115 }
116 }
117 }
118
119 if let Err(e) = ring.submit() {
120 error!("io_uring submit error: {:?}", e);
121 }
122 }
123 }
124 info!("io_uring submitter thread exit");
125 }
126
127 fn complete(ctx: Arc<IoCtxShared<C, Q>>, ring_arc: Arc<UringInner>) {
128 info!("io_uring completer thread start");
129
130 let ring = unsafe { &mut *ring_arc.0.get() };
131
132 loop {
133 match ring.submit_and_wait(1) {
134 Ok(_) => {
135 let mut exit_received = false;
136 {
137 let mut cq = ring.completion();
138 cq.sync();
139 for cqe in cq {
140 let user_data = cqe.user_data();
141
142 if user_data == URING_EXIT_SIGNAL_USER_DATA {
143 info!("io_uring completer received exit signal");
144 exit_received = true;
145 continue;
146 }
147
148 let event_ptr = user_data as *mut IOEvent<C>;
149 let mut event = unsafe { Box::from_raw(event_ptr) };
150 let res = cqe.result();
151 if res >= 0 {
152 event.set_copied(res as usize);
153 } else {
154 event.set_error(-res);
155 }
156 ctx.cb_workers.send(event);
157 }
158 }
159 if exit_received {
160 break;
161 }
162 }
163 Err(e) => {
164 error!("io_uring submit_and_wait error: {:?}", e);
165 thread::sleep(Duration::from_millis(10));
166 }
167 }
168 }
169 info!("io_uring completer thread exit");
170 }
171}