io_engine/driver/
uring.rs1use crate::callback_worker::Worker;
2use crate::context::CtxShared;
3use crate::tasks::{IOAction, IOCallback, IOEvent, IOEvent_};
4use crossfire::BlockingRxTrait;
5use io_uring::{IoUring, opcode, types};
6use log::{error, info};
7use std::{cell::UnsafeCell, io, marker::PhantomData, sync::Arc, thread, time::Duration};
8
/// Sentinel `user_data` value carried by the NOP entry that the submitter thread queues
/// to tell the completer thread to stop.
///
/// Regular entries carry a `Box::into_raw` pointer in `user_data`; `u64::MAX` is used
/// here as a value that is not expected to collide with such a pointer.
const URING_EXIT_SIGNAL_USER_DATA: u64 = u64::MAX;
10
11pub struct UringDriver<C: IOCallback, Q: BlockingRxTrait<IOEvent<C>>, W: Worker<C>> {
12 _marker: PhantomData<(C, Q, W)>,
13}
14
15struct UringInner(UnsafeCell<IoUring>);
16
17unsafe impl Send for UringInner {}
18unsafe impl Sync for UringInner {}
19
20impl<C: IOCallback, Q: BlockingRxTrait<IOEvent<C>> + Send + 'static, W: Worker<C> + Send + 'static>
21 UringDriver<C, Q, W>
22{
23 pub fn start(ctx: Arc<CtxShared<C, Q, W>>) -> io::Result<()> {
24 let depth = ctx.depth as u32;
25 let ring = IoUring::new(depth.max(2))?;
26 let ring_arc = Arc::new(UringInner(UnsafeCell::new(ring)));
27 let ring_submit = ring_arc.clone();
28 let ring_complete = ring_arc.clone();
29 let ctx_submit = ctx.clone();
30 let ctx_complete = ctx.clone();
31 thread::spawn(move || {
32 Self::submit(ctx_submit, ring_submit);
33 });
34 thread::spawn(move || {
35 Self::complete(ctx_complete, ring_complete);
36 });
37
38 Ok(())
39 }
40
41 fn submit(ctx: Arc<CtxShared<C, Q, W>>, ring_arc: Arc<UringInner>) {
42 info!("io_uring submitter thread start");
43 let depth = ctx.depth;
44 let exit_sent = false;
45
46 let ring = unsafe { &mut *ring_arc.0.get() };
47
48 loop {
49 let mut events = Vec::with_capacity(depth);
51
52 match ctx.queue.recv() {
53 Ok(event) => events.push(event),
54 Err(_) => {
55 if !exit_sent {
56 let nop_sqe =
57 opcode::Nop::new().build().user_data(URING_EXIT_SIGNAL_USER_DATA);
58
59 {
60 let mut sq = ring.submission();
61 unsafe {
62 if sq.push(&nop_sqe).is_err() {
63 drop(sq);
64 let _ = ring.submit();
65 let mut sq = ring.submission();
66 let _ = sq.push(&nop_sqe);
67 }
68 }
69 }
70 info!("io_uring submitter sent exit signal");
71 }
72 break;
73 }
74 }
75
76 {
77 let sq = ring.submission();
78 if sq.is_full() {
79 drop(sq);
80 let _ = ring.submit();
81 }
82 }
83
84 while events.len() < depth {
85 match ctx.queue.try_recv() {
86 Ok(event) => events.push(event),
87 Err(_) => break,
88 }
89 }
90
91 if !events.is_empty() {
93 {
94 let mut sq = ring.submission();
95 for event in events {
96 let event = event;
97 let fd = event.fd;
98 let buf_slice = event.get_buf_ref();
99
100 let (offset, buf_ptr, buf_len) = if event.res > 0 {
101 let progress = event.res as u64;
102 (
103 event.offset as u64 + progress,
104 unsafe { (buf_slice.as_ptr() as *mut u8).add(progress as usize) },
105 (buf_slice.len() as u64 - progress) as u32,
106 )
107 } else {
108 (
109 event.offset as u64,
110 buf_slice.as_ptr() as *mut u8,
111 buf_slice.len() as u32,
112 )
113 };
114
115 let sqe = match event.action {
116 IOAction::Read => opcode::Read::new(types::Fd(fd), buf_ptr, buf_len)
117 .offset(offset)
118 .build(),
119 IOAction::Write => opcode::Write::new(types::Fd(fd), buf_ptr, buf_len)
120 .offset(offset)
121 .build(),
122 };
123 let user_data = Box::into_raw(event.0) as u64;
124 let sqe = sqe.user_data(user_data);
125 unsafe {
126 if let Err(_) = sq.push(&sqe) {
127 error!("SQ full (should not happen)");
128 let _ = Box::from_raw(user_data as *mut IOEvent_<C>);
129 }
130 }
131 }
132 }
133
134 if let Err(e) = ring.submit() {
135 error!("io_uring submit error: {:?}", e);
136 }
137 }
138 }
139 info!("io_uring submitter thread exit");
140 }
141
142 fn complete(ctx: Arc<CtxShared<C, Q, W>>, ring_arc: Arc<UringInner>) {
143 info!("io_uring completer thread start");
144
145 let ring = unsafe { &mut *ring_arc.0.get() };
146
147 loop {
148 match ring.submit_and_wait(1) {
149 Ok(_) => {
150 let mut exit_received = false;
151 {
152 let mut cq = ring.completion();
153 cq.sync();
154 for cqe in cq {
155 let user_data = cqe.user_data();
156
157 if user_data == URING_EXIT_SIGNAL_USER_DATA {
158 info!("io_uring completer received exit signal");
159 exit_received = true;
160 continue;
161 }
162
163 let event_ptr = user_data as *mut IOEvent_<C>;
164 let mut event = IOEvent(unsafe { Box::from_raw(event_ptr) });
165 let res = cqe.result();
166 if res >= 0 {
167 event.set_copied(res as usize);
168 } else {
169 event.set_error(-res);
170 }
171 ctx.cb_workers.done(event);
172 }
173 }
174 if exit_received {
175 break;
176 }
177 }
178 Err(e) => {
179 error!("io_uring submit_and_wait error: {:?}", e);
180 thread::sleep(Duration::from_millis(10));
181 }
182 }
183 }
184 info!("io_uring completer thread exit");
185 }
186}