io_engine/driver/
uring.rs

1use crate::callback_worker::Worker;
2use crate::context::CtxShared;
3use crate::tasks::{IOAction, IOCallback, IOEvent, IOEvent_};
4use crossfire::BlockingRxTrait;
5use io_uring::{IoUring, opcode, types};
6use log::{error, info};
7use std::{cell::UnsafeCell, io, marker::PhantomData, sync::Arc, thread, time::Duration};
8
9const URING_EXIT_SIGNAL_USER_DATA: u64 = u64::MAX;
10
11pub struct UringDriver<C: IOCallback, Q: BlockingRxTrait<IOEvent<C>>, W: Worker<C>> {
12    _marker: PhantomData<(C, Q, W)>,
13}
14
15struct UringInner(UnsafeCell<IoUring>);
16
17unsafe impl Send for UringInner {}
18unsafe impl Sync for UringInner {}
19
20impl<C: IOCallback, Q: BlockingRxTrait<IOEvent<C>> + Send + 'static, W: Worker<C> + Send + 'static>
21    UringDriver<C, Q, W>
22{
23    pub fn start(ctx: Arc<CtxShared<C, Q, W>>) -> io::Result<()> {
24        let depth = ctx.depth as u32;
25        let ring = IoUring::new(depth.max(2))?;
26        let ring_arc = Arc::new(UringInner(UnsafeCell::new(ring)));
27        let ring_submit = ring_arc.clone();
28        let ring_complete = ring_arc.clone();
29        let ctx_submit = ctx.clone();
30        let ctx_complete = ctx.clone();
31        thread::spawn(move || {
32            Self::submit(ctx_submit, ring_submit);
33        });
34        thread::spawn(move || {
35            Self::complete(ctx_complete, ring_complete);
36        });
37
38        Ok(())
39    }
40
41    fn submit(ctx: Arc<CtxShared<C, Q, W>>, ring_arc: Arc<UringInner>) {
42        info!("io_uring submitter thread start");
43        let depth = ctx.depth;
44        let exit_sent = false;
45
46        let ring = unsafe { &mut *ring_arc.0.get() };
47
48        loop {
49            // 1. Receive events
50            let mut events = Vec::with_capacity(depth);
51
52            match ctx.queue.recv() {
53                Ok(event) => events.push(event),
54                Err(_) => {
55                    if !exit_sent {
56                        let nop_sqe =
57                            opcode::Nop::new().build().user_data(URING_EXIT_SIGNAL_USER_DATA);
58
59                        {
60                            let mut sq = ring.submission();
61                            unsafe {
62                                if sq.push(&nop_sqe).is_err() {
63                                    drop(sq);
64                                    let _ = ring.submit();
65                                    let mut sq = ring.submission();
66                                    let _ = sq.push(&nop_sqe);
67                                }
68                            }
69                        }
70                        info!("io_uring submitter sent exit signal");
71                    }
72                    break;
73                }
74            }
75
76            {
77                let sq = ring.submission();
78                if sq.is_full() {
79                    drop(sq);
80                    let _ = ring.submit();
81                }
82            }
83
84            while events.len() < depth {
85                match ctx.queue.try_recv() {
86                    Ok(event) => events.push(event),
87                    Err(_) => break,
88                }
89            }
90
91            // 2. Push to SQ
92            if !events.is_empty() {
93                {
94                    let mut sq = ring.submission();
95                    for event in events {
96                        let event = event;
97                        let fd = event.fd;
98                        let buf_slice = event.get_buf_ref();
99
100                        let (offset, buf_ptr, buf_len) = if event.res > 0 {
101                            let progress = event.res as u64;
102                            (
103                                event.offset as u64 + progress,
104                                unsafe { (buf_slice.as_ptr() as *mut u8).add(progress as usize) },
105                                (buf_slice.len() as u64 - progress) as u32,
106                            )
107                        } else {
108                            (
109                                event.offset as u64,
110                                buf_slice.as_ptr() as *mut u8,
111                                buf_slice.len() as u32,
112                            )
113                        };
114
115                        let sqe = match event.action {
116                            IOAction::Read => opcode::Read::new(types::Fd(fd), buf_ptr, buf_len)
117                                .offset(offset)
118                                .build(),
119                            IOAction::Write => opcode::Write::new(types::Fd(fd), buf_ptr, buf_len)
120                                .offset(offset)
121                                .build(),
122                        };
123                        let user_data = Box::into_raw(event.0) as u64;
124                        let sqe = sqe.user_data(user_data);
125                        unsafe {
126                            if let Err(_) = sq.push(&sqe) {
127                                error!("SQ full (should not happen)");
128                                let _ = Box::from_raw(user_data as *mut IOEvent_<C>);
129                            }
130                        }
131                    }
132                }
133
134                if let Err(e) = ring.submit() {
135                    error!("io_uring submit error: {:?}", e);
136                }
137            }
138        }
139        info!("io_uring submitter thread exit");
140    }
141
142    fn complete(ctx: Arc<CtxShared<C, Q, W>>, ring_arc: Arc<UringInner>) {
143        info!("io_uring completer thread start");
144
145        let ring = unsafe { &mut *ring_arc.0.get() };
146
147        loop {
148            match ring.submit_and_wait(1) {
149                Ok(_) => {
150                    let mut exit_received = false;
151                    {
152                        let mut cq = ring.completion();
153                        cq.sync();
154                        for cqe in cq {
155                            let user_data = cqe.user_data();
156
157                            if user_data == URING_EXIT_SIGNAL_USER_DATA {
158                                info!("io_uring completer received exit signal");
159                                exit_received = true;
160                                continue;
161                            }
162
163                            let event_ptr = user_data as *mut IOEvent_<C>;
164                            let mut event = IOEvent(unsafe { Box::from_raw(event_ptr) });
165                            let res = cqe.result();
166                            if res >= 0 {
167                                event.set_copied(res as usize);
168                            } else {
169                                event.set_error(-res);
170                            }
171                            ctx.cb_workers.done(event);
172                        }
173                    }
174                    if exit_received {
175                        break;
176                    }
177                }
178                Err(e) => {
179                    error!("io_uring submit_and_wait error: {:?}", e);
180                    thread::sleep(Duration::from_millis(10));
181                }
182            }
183        }
184        info!("io_uring completer thread exit");
185    }
186}