quill_sql/storage/io/io_uring.rs
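//! io_uring-backed disk I/O worker.
//!
//! A single worker thread drains `DiskRequest`s from an mpsc channel and
//! translates them into io_uring submissions: page reads and writes against
//! the database file, plus WAL writes and fsyncs against separately opened
//! WAL files. Each submission carries a `user_data` token that maps the
//! completion back to the caller's result channel.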

use bytes::{Bytes, BytesMut};
use io_uring::{opcode, types, IoUring};
use std::cell::RefCell;
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind, IoSliceMut};
use std::os::fd::AsRawFd;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};

use crate::buffer::{PageId, PAGE_SIZE};
use crate::error::{QuillSQLError, QuillSQLResult};
use crate::storage::disk_manager::{AlignedPageBuf, DiskManager};
use crate::storage::disk_scheduler::DiskRequest;
use crate::storage::page::META_PAGE_SIZE;

type DiskResultSender<T> = Sender<QuillSQLResult<T>>;

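/// Book-keeping for one in-flight submission, keyed in the pending map by its
/// `user_data` token.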
struct PendingEntry {
    kind: PendingKind,
}

enum PendingKind {
    ReadFixed {
        idx: usize,
        sender: DiskResultSender<BytesMut>,
    },
    ReadVec {
        buffer: Vec<u8>,
        sender: DiskResultSender<BytesMut>,
    },
    ReadBatchFixed {
        idx: usize,
        batch: Rc<RefCell<BatchState>>,
        index: usize,
    },
    ReadBatchVec {
        buffer: Vec<u8>,
        batch: Rc<RefCell<BatchState>>,
        index: usize,
    },
    WriteFixed {
        idx: usize,
        state: Rc<RefCell<WriteState>>,
        len: usize,
    },
    WriteVec {
        state: Rc<RefCell<WriteState>>,
        _buffer: Bytes,
        len: usize,
    },
    Fsync {
        state: Rc<RefCell<WriteState>>,
        required: bool,
    },
}

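/// Shared state for a multi-page read. Completions may arrive in any order;
/// each result is slotted by its request index, and once `remaining` reaches
/// zero the sender fires with either the ordered pages or the first error.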
struct BatchState {
    sender: DiskResultSender<Vec<BytesMut>>,
    results: Vec<Option<BytesMut>>,
    remaining: usize,
    error: Option<QuillSQLError>,
    finished: bool,
}

impl BatchState {
    fn new(sender: DiskResultSender<Vec<BytesMut>>, len: usize) -> Self {
        BatchState {
            sender,
            results: vec![None; len],
            remaining: len,
            error: None,
            finished: false,
        }
    }

    fn record_result(
        &mut self,
        index: usize,
        outcome: QuillSQLResult<BytesMut>,
    ) -> Option<QuillSQLResult<Vec<BytesMut>>> {
        if self.finished {
            return None;
        }

        match outcome {
            Ok(bytes) => {
                self.results[index] = Some(bytes);
            }
            Err(err) => {
                if self.error.is_none() {
                    self.error = Some(err);
                }
            }
        }

        if self.remaining > 0 {
            self.remaining -= 1;
        }

        if self.remaining == 0 {
            self.finished = true;
            if let Some(err) = self.error.take() {
                Some(Err(err))
            } else {
                let mut ordered = Vec::with_capacity(self.results.len());
                for entry in self.results.drain(..) {
                    ordered.push(entry.expect("batch result missing"));
                }
                Some(Ok(ordered))
            }
        } else {
            None
        }
    }
}

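/// Shared state for a write that may be paired with an fsync. The caller is
/// notified once all `pending` operations have completed, with the first
/// recorded error winning.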
struct WriteState {
    sender: DiskResultSender<()>,
    pending: u8,
    error: Option<QuillSQLError>,
    finished: bool,
}

impl WriteState {
    fn new(sender: DiskResultSender<()>, pending: u8) -> Self {
        WriteState {
            sender,
            pending,
            error: None,
            finished: false,
        }
    }

    fn record(&mut self, outcome: QuillSQLResult<()>) -> bool {
        if self.finished {
            return false;
        }

        if let Err(err) = outcome {
            if self.error.is_none() {
                self.error = Some(err);
            }
        }

        if self.pending > 0 {
            self.pending -= 1;
        }

        if self.pending == 0 {
            self.finished = true;
            let final_res = match self.error.take() {
                Some(err) => Err(err),
                None => Ok(()),
            };
            if let Err(send_err) = self.sender.send(final_res) {
                log::error!("io_uring write result send failed: {}", send_err);
            }
            true
        } else {
            false
        }
    }
}

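/// Hands out `user_data` tokens, skipping 0 on wrap-around.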
fn next_token(counter: &mut u64) -> u64 {
    let token = *counter;
    *counter = counter.wrapping_add(1);
    if *counter == 0 {
        *counter = 1;
    }
    token
}

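/// Pushes a submission entry, submitting to drain the queue whenever it is
/// full. If that drain itself fails, the entry is dropped and the failure is
/// logged.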
fn push_sqe(ring: &mut IoUring, sqe: io_uring::squeue::Entry) {
    loop {
        let push_result = unsafe {
            let mut sq = ring.submission();
            sq.push(&sqe)
        };

        match push_result {
            Ok(()) => break,
            Err(_) => {
                // Submission queue is full: flush what we have and retry.
                if let Err(err) = ring.submit() {
                    log::error!("io_uring submit while pushing sqe failed: {}", err);
                    break;
                }
            }
        }
    }
}

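/// Pool of page-aligned buffers registered with the kernel so I/O can use the
/// `ReadFixed`/`WriteFixed` opcodes. If registration is rejected, or the pool
/// is exhausted at submit time, callers fall back to plain heap buffers.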
struct FixedBufferPool {
    buffers: Vec<AlignedPageBuf>,
    free: Mutex<Vec<usize>>,
}

impl FixedBufferPool {
    fn new(count: usize) -> QuillSQLResult<Self> {
        if count > u16::MAX as usize {
            return Err(QuillSQLError::Internal(
                "io_uring fixed buffer count exceeds u16::MAX".into(),
            ));
        }
        let mut buffers = Vec::with_capacity(count);
        for _ in 0..count {
            buffers.push(AlignedPageBuf::new_zeroed()?);
        }
        let free: Vec<usize> = (0..count).collect();
        Ok(Self {
            buffers,
            free: Mutex::new(free),
        })
    }

    fn register(&mut self, ring: &mut IoUring) -> io::Result<()> {
        if self.buffers.is_empty() {
            return Ok(());
        }
        let io_slices: Vec<IoSliceMut<'_>> = self
            .buffers
            .iter_mut()
            .map(|buf| IoSliceMut::new(buf.as_mut_slice()))
            .collect();
        // `IoSliceMut` is guaranteed ABI-compatible with `libc::iovec` on
        // Unix, so the slice can be reinterpreted for buffer registration.
        let ptr = io_slices.as_ptr() as *const libc::iovec;
        let iovecs = unsafe { std::slice::from_raw_parts(ptr, io_slices.len()) };
        match unsafe { ring.submitter().register_buffers(iovecs) } {
            Ok(()) => Ok(()),
            Err(err)
                if err.kind() == ErrorKind::OutOfMemory
                    || err.raw_os_error() == Some(libc::ENOMEM)
                    || err.kind() == ErrorKind::InvalidInput
                    || err.raw_os_error() == Some(libc::EINVAL) =>
            {
                log::warn!(
                    "io_uring buffer registration failed ({}); falling back to heap buffers",
                    err
                );
                self.buffers.clear();
                self.free.lock().unwrap().clear();
                Ok(())
            }
            Err(err) => Err(err),
        }
    }

    fn acquire(&self) -> Option<(usize, *mut u8)> {
        let idx = self.free.lock().unwrap().pop()?;
        let ptr = self.buffers[idx].ptr();
        Some((idx, ptr))
    }

    fn release(&self, idx: usize) {
        self.free.lock().unwrap().push(idx);
    }

    fn fill_from_slice(&mut self, idx: usize, data: &[u8]) {
        debug_assert_eq!(data.len(), PAGE_SIZE);
        let slice = self.buffers[idx].as_mut_slice();
        slice.copy_from_slice(data);
    }

    fn extract_bytes(&self, idx: usize, len: usize) -> BytesMut {
        BytesMut::from(&self.buffers[idx].as_slice()[..len])
    }
}

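/// WAL files opened by this worker. Each entry keeps its `File` alive so the
/// raw fd handed to io_uring stays valid for the lifetime of the cache.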
struct WalFileEntry {
    _path: PathBuf,
    _file: std::fs::File,
    fd: i32,
}

struct WalFileCache {
    files: HashMap<PathBuf, WalFileEntry>,
}

impl WalFileCache {
    fn new() -> Self {
        Self {
            files: HashMap::new(),
        }
    }

    fn fd_for(&mut self, path: &Path) -> io::Result<i32> {
        if let Some(entry) = self.files.get(path) {
            return Ok(entry.fd);
        }

        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }

        let file = OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .open(path)?;
        let fd = file.as_raw_fd();
        let entry = WalFileEntry {
            _path: path.to_path_buf(),
            _file: file,
            fd,
        };
        self.files.insert(path.to_path_buf(), entry);
        Ok(fd)
    }
}

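/// Queues a single page read, preferring a registered fixed buffer and
/// falling back to a heap buffer when the pool is exhausted.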
fn queue_read(
    ring: &mut IoUring,
    pending: &mut HashMap<u64, PendingEntry>,
    token_counter: &mut u64,
    buffer_pool: &FixedBufferPool,
    fd: i32,
    page_id: PageId,
    sender: DiskResultSender<BytesMut>,
) {
    let token = next_token(token_counter);
    let offset = page_file_offset(page_id);

    if let Some((idx, ptr)) = buffer_pool.acquire() {
        let read_e = opcode::ReadFixed::new(types::Fd(fd), ptr, PAGE_SIZE as u32, idx as u16)
            .offset(offset)
            .build()
            .user_data(token);
        push_sqe(ring, read_e);
        pending.insert(
            token,
            PendingEntry {
                kind: PendingKind::ReadFixed { idx, sender },
            },
        );
    } else {
        let mut buffer = vec![0u8; PAGE_SIZE];
        let read_e = opcode::Read::new(types::Fd(fd), buffer.as_mut_ptr(), PAGE_SIZE as u32)
            .offset(offset)
            .build()
            .user_data(token);
        push_sqe(ring, read_e);
        pending.insert(
            token,
            PendingEntry {
                kind: PendingKind::ReadVec { buffer, sender },
            },
        );
    }
}

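/// Queues one read per page and ties them together through a shared
/// `BatchState`; an empty batch completes immediately.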
fn queue_read_batch(
    ring: &mut IoUring,
    pending: &mut HashMap<u64, PendingEntry>,
    token_counter: &mut u64,
    buffer_pool: &FixedBufferPool,
    fd: i32,
    page_ids: Vec<PageId>,
    sender: DiskResultSender<Vec<BytesMut>>,
) {
    if page_ids.is_empty() {
        if let Err(err) = sender.send(Ok(vec![])) {
            log::error!("io_uring batch read send failed: {}", err);
        }
        return;
    }

    let batch = Rc::new(RefCell::new(BatchState::new(sender, page_ids.len())));

    for (index, page_id) in page_ids.into_iter().enumerate() {
        let token = next_token(token_counter);
        let offset = page_file_offset(page_id);
        if let Some((idx, ptr)) = buffer_pool.acquire() {
            let read_e = opcode::ReadFixed::new(types::Fd(fd), ptr, PAGE_SIZE as u32, idx as u16)
                .offset(offset)
                .build()
                .user_data(token);
            push_sqe(ring, read_e);
            pending.insert(
                token,
                PendingEntry {
                    kind: PendingKind::ReadBatchFixed {
                        idx,
                        batch: Rc::clone(&batch),
                        index,
                    },
                },
            );
        } else {
            let mut buffer = vec![0u8; PAGE_SIZE];
            let read_e = opcode::Read::new(types::Fd(fd), buffer.as_mut_ptr(), PAGE_SIZE as u32)
                .offset(offset)
                .build()
                .user_data(token);
            push_sqe(ring, read_e);
            pending.insert(
                token,
                PendingEntry {
                    kind: PendingKind::ReadBatchVec {
                        buffer,
                        batch: Rc::clone(&batch),
                        index,
                    },
                },
            );
        }
    }
}

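/// Queues a page write, optionally followed by an fdatasync when
/// `fsync_on_write` is set; both operations report into one `WriteState` so
/// the caller sees a single result. The heap fallback keeps its `Bytes`
/// clone alive inside the pending entry until the completion arrives.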
fn queue_write(
    ring: &mut IoUring,
    pending: &mut HashMap<u64, PendingEntry>,
    token_counter: &mut u64,
    buffer_pool: &mut FixedBufferPool,
    fd: i32,
    page_id: PageId,
    data: Bytes,
    sender: DiskResultSender<()>,
    fsync_on_write: bool,
) {
    let has_data = !data.is_empty();
    let pending_ops = (if has_data { 1 } else { 0 }) + if fsync_on_write { 1 } else { 0 };

    if pending_ops == 0 {
        if let Err(err) = sender.send(Ok(())) {
            log::error!("io_uring write result send failed: {}", err);
        }
        return;
    }

    let state = Rc::new(RefCell::new(WriteState::new(sender, pending_ops as u8)));

    if has_data {
        debug_assert_eq!(data.len(), PAGE_SIZE);
        let write_token = next_token(token_counter);
        let offset = page_file_offset(page_id);

        if let Some((idx, ptr)) = buffer_pool.acquire() {
            buffer_pool.fill_from_slice(idx, &data);
            let entry = opcode::WriteFixed::new(types::Fd(fd), ptr, data.len() as u32, idx as u16)
                .offset(offset)
                .build()
                .user_data(write_token);
            push_sqe(ring, entry);
            pending.insert(
                write_token,
                PendingEntry {
                    kind: PendingKind::WriteFixed {
                        idx,
                        state: Rc::clone(&state),
                        len: data.len(),
                    },
                },
            );
        } else {
            let buffer = data.clone();
            let entry = opcode::Write::new(types::Fd(fd), buffer.as_ptr(), buffer.len() as u32)
                .offset(offset)
                .build()
                .user_data(write_token);
            push_sqe(ring, entry);
            pending.insert(
                write_token,
                PendingEntry {
                    kind: PendingKind::WriteVec {
                        state: Rc::clone(&state),
                        _buffer: buffer,
                        len: data.len(),
                    },
                },
            );
        }
    }

    if fsync_on_write {
        let fsync_token = next_token(token_counter);
        let fsync_entry = opcode::Fsync::new(types::Fd(fd))
            .flags(types::FsyncFlags::DATASYNC)
            .build()
            .user_data(fsync_token);

        push_sqe(ring, fsync_entry);

        pending.insert(
            fsync_token,
            PendingEntry {
                kind: PendingKind::Fsync {
                    state,
                    required: true,
                },
            },
        );
    }
}

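/// Queues a WAL append at an explicit file offset, optionally followed by an
/// fdatasync when `sync` is set.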
fn queue_wal_write(
    ring: &mut IoUring,
    pending: &mut HashMap<u64, PendingEntry>,
    token_counter: &mut u64,
    fd: i32,
    offset: u64,
    data: Bytes,
    sender: DiskResultSender<()>,
    sync: bool,
) {
    let has_data = !data.is_empty();
    let pending_ops = (if has_data { 1 } else { 0 }) + if sync { 1 } else { 0 };

    if pending_ops == 0 {
        if let Err(err) = sender.send(Ok(())) {
            log::error!("io_uring WAL write result send failed: {}", err);
        }
        return;
    }

    let state = Rc::new(RefCell::new(WriteState::new(sender, pending_ops as u8)));

    if has_data {
        let write_token = next_token(token_counter);
        let buffer = data.clone();
        let entry = opcode::Write::new(types::Fd(fd), buffer.as_ptr(), buffer.len() as u32)
            .offset(offset)
            .build()
            .user_data(write_token);
        push_sqe(ring, entry);
        pending.insert(
            write_token,
            PendingEntry {
                kind: PendingKind::WriteVec {
                    state: Rc::clone(&state),
                    _buffer: buffer,
                    len: data.len(),
                },
            },
        );
    }

    if sync {
        let fsync_token = next_token(token_counter);
        let fsync_entry = opcode::Fsync::new(types::Fd(fd))
            .flags(types::FsyncFlags::DATASYNC)
            .build()
            .user_data(fsync_token);
        push_sqe(ring, fsync_entry);
        pending.insert(
            fsync_token,
            PendingEntry {
                kind: PendingKind::Fsync {
                    state,
                    required: true,
                },
            },
        );
    }
}

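/// Queues a standalone fdatasync of a WAL file.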
fn queue_wal_fsync(
    ring: &mut IoUring,
    pending: &mut HashMap<u64, PendingEntry>,
    token_counter: &mut u64,
    fd: i32,
    sender: DiskResultSender<()>,
) {
    let state = Rc::new(RefCell::new(WriteState::new(sender, 1)));
    let fsync_token = next_token(token_counter);
    let fsync_entry = opcode::Fsync::new(types::Fd(fd))
        .flags(types::FsyncFlags::DATASYNC)
        .build()
        .user_data(fsync_token);
    push_sqe(ring, fsync_entry);
    pending.insert(
        fsync_token,
        PendingEntry {
            kind: PendingKind::Fsync {
                state,
                required: true,
            },
        },
    );
}

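/// Dispatches one `DiskRequest` onto the ring (or services it inline for
/// allocate/deallocate). Returns `true` when a `Shutdown` request was seen.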
fn handle_request_direct(
    request: DiskRequest,
    ring: &mut IoUring,
    pending: &mut HashMap<u64, PendingEntry>,
    token_counter: &mut u64,
    fd: i32,
    disk_manager: &Arc<DiskManager>,
    fsync_on_write: bool,
    buffer_pool: &mut FixedBufferPool,
    wal_files: &mut WalFileCache,
) -> bool {
    match request {
        DiskRequest::ReadPage {
            page_id,
            result_sender,
        } => {
            queue_read(
                ring,
                pending,
                token_counter,
                buffer_pool,
                fd,
                page_id,
                result_sender,
            );
        }
        DiskRequest::ReadPages {
            page_ids,
            result_sender,
        } => {
            queue_read_batch(
                ring,
                pending,
                token_counter,
                buffer_pool,
                fd,
                page_ids,
                result_sender,
            );
        }
        DiskRequest::WritePage {
            page_id,
            data,
            result_sender,
        } => {
            queue_write(
                ring,
                pending,
                token_counter,
                buffer_pool,
                fd,
                page_id,
                data,
                result_sender,
                fsync_on_write,
            );
        }
        DiskRequest::WriteWal {
            path,
            offset,
            data,
            sync,
            result_sender,
        } => match wal_files.fd_for(&path) {
            Ok(wal_fd) => {
                queue_wal_write(
                    ring,
                    pending,
                    token_counter,
                    wal_fd,
                    offset,
                    data,
                    result_sender,
                    sync,
                );
            }
            Err(err) => {
                let _ = result_sender.send(Err(QuillSQLError::Io(err)));
            }
        },
        DiskRequest::FsyncWal {
            path,
            result_sender,
        } => match wal_files.fd_for(&path) {
            Ok(wal_fd) => {
                queue_wal_fsync(ring, pending, token_counter, wal_fd, result_sender);
            }
            Err(err) => {
                let _ = result_sender.send(Err(QuillSQLError::Io(err)));
            }
        },
        DiskRequest::AllocatePage { result_sender } => {
            let _ = result_sender.send(disk_manager.allocate_page());
        }
        DiskRequest::DeallocatePage {
            page_id,
            result_sender,
        } => {
            let _ = result_sender.send(disk_manager.deallocate_page(page_id));
        }
        DiskRequest::Shutdown => {
            log::debug!("Disk I/O io_uring worker received Shutdown signal.");
            return true;
        }
    }
    false
}

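/// Drains the completion queue, routing each CQE back to its pending entry:
/// fixed buffers are returned to the pool, read payloads are forwarded to the
/// waiting sender, and batch/write state is updated in place.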
fn drain_completions(
    ring: &mut IoUring,
    pending: &mut HashMap<u64, PendingEntry>,
    buffer_pool: &FixedBufferPool,
) {
    while let Some(cqe) = ring.completion().next() {
        let token = cqe.user_data();
        if let Some(entry) = pending.remove(&token) {
            let result_code = cqe.result();
            match entry.kind {
                PendingKind::ReadFixed { idx, sender } => {
                    let bytes = buffer_pool.extract_bytes(idx, PAGE_SIZE);
                    buffer_pool.release(idx);
                    let outcome = parse_read_result(result_code, &bytes);
                    if let Err(err) = sender.send(outcome) {
                        log::error!("io_uring read fixed result send failed: {}", err);
                    }
                }
                PendingKind::ReadVec { buffer, sender } => {
                    let outcome = parse_read_result(result_code, &buffer);
                    if let Err(err) = sender.send(outcome) {
                        log::error!("io_uring read result send failed: {}", err);
                    }
                }
                PendingKind::ReadBatchFixed { idx, batch, index } => {
                    let bytes = buffer_pool.extract_bytes(idx, PAGE_SIZE);
                    buffer_pool.release(idx);
                    let outcome = parse_read_result(result_code, &bytes);
                    let mut batch_ref = batch.borrow_mut();
                    if let Some(result) = batch_ref.record_result(index, outcome) {
                        if let Err(err) = batch_ref.sender.send(result) {
                            log::error!("io_uring batch read send failed: {}", err);
                        }
                    }
                }
                PendingKind::ReadBatchVec {
                    buffer,
                    batch,
                    index,
                } => {
                    let outcome = parse_read_result(result_code, &buffer);
                    let mut batch_ref = batch.borrow_mut();
                    if let Some(result) = batch_ref.record_result(index, outcome) {
                        if let Err(err) = batch_ref.sender.send(result) {
                            log::error!("io_uring batch read send failed: {}", err);
                        }
                    }
                }
                PendingKind::WriteFixed { idx, state, len } => {
                    buffer_pool.release(idx);
                    let outcome = parse_write_result(result_code, len);
                    let _ = state.borrow_mut().record(outcome);
                }
                PendingKind::WriteVec { state, len, .. } => {
                    let outcome = parse_write_result(result_code, len);
                    let _ = state.borrow_mut().record(outcome);
                }
                PendingKind::Fsync { state, required } => {
                    let outcome = parse_fsync_result(result_code, required);
                    let _ = state.borrow_mut().record(outcome);
                }
            }
        }
    }
}

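/// Interprets a read CQE result: negative values are negated errno codes, and
/// anything other than a full page counts as a short read.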
fn parse_read_result(code: i32, buffer: &[u8]) -> QuillSQLResult<BytesMut> {
    if code < 0 {
        let err = io::Error::from_raw_os_error(-code);
        Err(QuillSQLError::Storage(format!(
            "io_uring read failed: {}",
            err
        )))
    } else if code as usize != PAGE_SIZE {
        Err(QuillSQLError::Storage(format!(
            "io_uring short read: {} bytes",
            code
        )))
    } else {
        Ok(BytesMut::from(buffer))
    }
}

fn parse_write_result(code: i32, expected: usize) -> QuillSQLResult<()> {
    if code < 0 {
        let err = io::Error::from_raw_os_error(-code);
        Err(QuillSQLError::Storage(format!(
            "io_uring write failed: {}",
            err
        )))
    } else if code as usize != expected {
        Err(QuillSQLError::Storage(format!(
            "io_uring short write: {} bytes",
            code
        )))
    } else {
        Ok(())
    }
}

fn parse_fsync_result(code: i32, required: bool) -> QuillSQLResult<()> {
    if !required {
        return Ok(());
    }
    if code < 0 {
        let err = io::Error::from_raw_os_error(-code);
        Err(QuillSQLError::Storage(format!(
            "io_uring fdatasync failed: {}",
            err
        )))
    } else {
        Ok(())
    }
}

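/// Byte offset of a page in the database file: pages are 1-indexed and live
/// after the metadata header.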
#[inline]
fn page_file_offset(page_id: PageId) -> u64 {
    (*META_PAGE_SIZE + (page_id - 1) as usize * PAGE_SIZE) as u64
}

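/// Entry point for the io_uring worker thread. Each iteration drains
/// completions, pulls as many queued requests as are immediately available,
/// then either blocks on the channel (no outstanding I/O) or on
/// `submit_and_wait` (outstanding I/O, no new work). After a shutdown request
/// the loop keeps running until every pending operation has completed.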
pub(crate) fn worker_loop(
    receiver: Receiver<DiskRequest>,
    disk_manager: Arc<DiskManager>,
    entries: u32,
    fixed_buffers: usize,
    sqpoll_idle: Option<u32>,
    fsync_on_write: bool,
) {
    log::debug!("Disk I/O io_uring worker thread started.");
    let mut builder = IoUring::builder();
    if let Some(idle) = sqpoll_idle {
        builder.setup_sqpoll(idle);
    }
    let mut ring = builder.build(entries).expect("io_uring init failed");

    let mut buffer_pool = FixedBufferPool::new(fixed_buffers).expect("create fixed buffer pool");
    buffer_pool
        .register(&mut ring)
        .expect("register fixed buffers");

    // Clone a dedicated File for this worker to get a stable fd.
    let file = disk_manager
        .try_clone_db_file()
        .expect("clone db file for io_uring failed");
    let fd = file.as_raw_fd();

    let mut pending: HashMap<u64, PendingEntry> = HashMap::new();
    let mut token_counter: u64 = 1;
    let mut shutdown = false;
    let mut wal_files = WalFileCache::new();

    while !shutdown || !pending.is_empty() {
        // Always process completions first to reduce queue pressure.
        drain_completions(&mut ring, &mut pending, &buffer_pool);

        if shutdown && pending.is_empty() {
            break;
        }

        let mut received_any = false;
        while let Ok(request) = receiver.try_recv() {
            received_any = true;
            if handle_request_direct(
                request,
                &mut ring,
                &mut pending,
                &mut token_counter,
                fd,
                &disk_manager,
                fsync_on_write,
                &mut buffer_pool,
                &mut wal_files,
            ) {
                shutdown = true;
                break;
            }
        }

        if shutdown && pending.is_empty() {
            break;
        }

        if pending.is_empty() {
            // No outstanding I/O: block waiting for the next request.
            match receiver.recv() {
                Ok(request) => {
                    if handle_request_direct(
                        request,
                        &mut ring,
                        &mut pending,
                        &mut token_counter,
                        fd,
                        &disk_manager,
                        fsync_on_write,
                        &mut buffer_pool,
                        &mut wal_files,
                    ) {
                        shutdown = true;
                    }
                }
                Err(_) => {
                    shutdown = true;
                }
            }
        } else if !received_any {
            // Pending I/O but no new work; wait for at least one completion.
            if let Err(e) = ring.submit_and_wait(1) {
                log::error!("io_uring submit_and_wait failed: {}", e);
            }
        } else if let Err(e) = ring.submit() {
            log::error!("io_uring submit failed: {}", e);
        }
    }

    // Final drain to deliver any remaining completions before exit.
    drain_completions(&mut ring, &mut pending, &buffer_pool);

    log::debug!("Disk I/O io_uring worker thread finished.");
}