io_engine/driver/
uring.rs

1use crate::context::IoCtxShared;
2use crate::tasks::{IOAction, IOEvent, IoCallback};
3use crossfire::BlockingRxTrait;
4use io_uring::{IoUring, opcode, types};
5use log::{error, info};
6use std::{cell::UnsafeCell, io, marker::PhantomData, sync::Arc, thread, time::Duration};
7
8const URING_EXIT_SIGNAL_USER_DATA: u64 = u64::MAX;
9
10pub struct UringDriver<C: IoCallback, Q: BlockingRxTrait<Box<IOEvent<C>>>> {
11    _marker: PhantomData<(C, Q)>,
12}
13
14struct UringInner(UnsafeCell<IoUring>);
15
16unsafe impl Send for UringInner {}
17unsafe impl Sync for UringInner {}
18
19impl<C: IoCallback, Q: BlockingRxTrait<Box<IOEvent<C>>> + Send + 'static> UringDriver<C, Q> {
20    pub fn start(ctx: Arc<IoCtxShared<C, Q>>) -> io::Result<()> {
21        let depth = ctx.depth as u32;
22        let ring = IoUring::new(depth.max(2))?;
23        let ring_arc = Arc::new(UringInner(UnsafeCell::new(ring)));
24        let ring_submit = ring_arc.clone();
25        let ring_complete = ring_arc.clone();
26        let ctx_submit = ctx.clone();
27        let ctx_complete = ctx.clone();
28        thread::spawn(move || {
29            Self::submit(ctx_submit, ring_submit);
30        });
31        thread::spawn(move || {
32            Self::complete(ctx_complete, ring_complete);
33        });
34
35        Ok(())
36    }
37
38    fn submit(ctx: Arc<IoCtxShared<C, Q>>, ring_arc: Arc<UringInner>) {
39        info!("io_uring submitter thread start");
40        let depth = ctx.depth;
41        let exit_sent = false;
42
43        let ring = unsafe { &mut *ring_arc.0.get() };
44
45        loop {
46            // 1. Receive events
47            let mut events = Vec::with_capacity(depth);
48
49            match ctx.queue.recv() {
50                Ok(event) => events.push(event),
51                Err(_) => {
52                    if !exit_sent {
53                        let nop_sqe =
54                            opcode::Nop::new().build().user_data(URING_EXIT_SIGNAL_USER_DATA);
55
56                        {
57                            let mut sq = ring.submission();
58                            unsafe {
59                                if sq.push(&nop_sqe).is_err() {
60                                    drop(sq);
61                                    let _ = ring.submit();
62                                    let mut sq = ring.submission();
63                                    let _ = sq.push(&nop_sqe);
64                                }
65                            }
66                        }
67                        info!("io_uring submitter sent exit signal");
68                    }
69                    break;
70                }
71            }
72
73            {
74                let sq = ring.submission();
75                if sq.is_full() {
76                    drop(sq);
77                    let _ = ring.submit();
78                }
79            }
80
81            while events.len() < depth {
82                match ctx.queue.try_recv() {
83                    Ok(event) => events.push(event),
84                    Err(_) => break,
85                }
86            }
87
88            // 2. Push to SQ
89            if !events.is_empty() {
90                {
91                    let mut sq = ring.submission();
92                    for event in events {
93                        let event = event;
94                        let fd = event.fd;
95                        let offset = event.offset as u64;
96                        let buf_slice = event.get_buf_ref();
97                        let buf_len = buf_slice.len() as u32;
98                        let buf_ptr = buf_slice.as_ptr() as *mut u8;
99
100                        let sqe = match event.action {
101                            IOAction::Read => opcode::Read::new(types::Fd(fd), buf_ptr, buf_len)
102                                .offset(offset)
103                                .build(),
104                            IOAction::Write => opcode::Write::new(types::Fd(fd), buf_ptr, buf_len)
105                                .offset(offset)
106                                .build(),
107                        };
108                        let user_data = Box::into_raw(event) as u64;
109                        let sqe = sqe.user_data(user_data);
110                        unsafe {
111                            if let Err(_) = sq.push(&sqe) {
112                                error!("SQ full (should not happen)");
113                                let _ = Box::from_raw(user_data as *mut IOEvent<C>);
114                            }
115                        }
116                    }
117                }
118
119                if let Err(e) = ring.submit() {
120                    error!("io_uring submit error: {:?}", e);
121                }
122            }
123        }
124        info!("io_uring submitter thread exit");
125    }
126
127    fn complete(ctx: Arc<IoCtxShared<C, Q>>, ring_arc: Arc<UringInner>) {
128        info!("io_uring completer thread start");
129
130        let ring = unsafe { &mut *ring_arc.0.get() };
131
132        loop {
133            match ring.submit_and_wait(1) {
134                Ok(_) => {
135                    let mut exit_received = false;
136                    {
137                        let mut cq = ring.completion();
138                        cq.sync();
139                        for cqe in cq {
140                            let user_data = cqe.user_data();
141
142                            if user_data == URING_EXIT_SIGNAL_USER_DATA {
143                                info!("io_uring completer received exit signal");
144                                exit_received = true;
145                                continue;
146                            }
147
148                            let event_ptr = user_data as *mut IOEvent<C>;
149                            let mut event = unsafe { Box::from_raw(event_ptr) };
150                            let res = cqe.result();
151                            if res >= 0 {
152                                event.set_copied(res as usize);
153                            } else {
154                                event.set_error(-res);
155                            }
156                            ctx.cb_workers.send(event);
157                        }
158                    }
159                    if exit_received {
160                        break;
161                    }
162                }
163                Err(e) => {
164                    error!("io_uring submit_and_wait error: {:?}", e);
165                    thread::sleep(Duration::from_millis(10));
166                }
167            }
168        }
169        info!("io_uring completer thread exit");
170    }
171}