use bytes::{Bytes, BytesMut};
use io_uring::{opcode, types, IoUring};
use std::cell::RefCell;
use std::collections::HashMap;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind, IoSliceMut};
use std::os::fd::AsRawFd;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};

use crate::buffer::{PageId, PAGE_SIZE};
use crate::error::{QuillSQLError, QuillSQLResult};
use crate::storage::disk_manager::{AlignedPageBuf, DiskManager};
use crate::storage::disk_scheduler::DiskRequest;
use crate::storage::page::META_PAGE_SIZE;

type DiskResultSender<T> = Sender<QuillSQLResult<T>>;

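/// An in-flight io_uring operation, keyed in the pending map by the
/// `user_data` token carried on its SQE.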
struct PendingEntry {
    kind: PendingKind,
}

enum PendingKind {
    ReadFixed {
        idx: usize,
        sender: DiskResultSender<BytesMut>,
    },
    ReadVec {
        buffer: Vec<u8>,
        sender: DiskResultSender<BytesMut>,
    },
    ReadBatchFixed {
        idx: usize,
        batch: Rc<RefCell<BatchState>>,
        index: usize,
    },
    ReadBatchVec {
        buffer: Vec<u8>,
        batch: Rc<RefCell<BatchState>>,
        index: usize,
    },
    WriteFixed {
        idx: usize,
        state: Rc<RefCell<WriteState>>,
        len: usize,
    },
    WriteVec {
        state: Rc<RefCell<WriteState>>,
        _buffer: Bytes,
        len: usize,
    },
    Fsync {
        state: Rc<RefCell<WriteState>>,
        required: bool,
    },
}

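/// Shared state for a multi-page read: collects per-page results and yields
/// the ordered batch (or the first error) once every completion has arrived.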
struct BatchState {
    sender: DiskResultSender<Vec<BytesMut>>,
    results: Vec<Option<BytesMut>>,
    remaining: usize,
    error: Option<QuillSQLError>,
    finished: bool,
}

impl BatchState {
    fn new(sender: DiskResultSender<Vec<BytesMut>>, len: usize) -> Self {
        BatchState {
            sender,
            results: vec![None; len],
            remaining: len,
            error: None,
            finished: false,
        }
    }

    fn record_result(
        &mut self,
        index: usize,
        outcome: QuillSQLResult<BytesMut>,
    ) -> Option<QuillSQLResult<Vec<BytesMut>>> {
        if self.finished {
            return None;
        }

        match outcome {
            Ok(bytes) => {
                self.results[index] = Some(bytes);
            }
            Err(err) => {
                if self.error.is_none() {
                    self.error = Some(err);
                }
            }
        }

        if self.remaining > 0 {
            self.remaining -= 1;
        }

        if self.remaining == 0 {
            self.finished = true;
            if let Some(err) = self.error.take() {
                Some(Err(err))
            } else {
                let mut ordered = Vec::with_capacity(self.results.len());
                for entry in self.results.drain(..) {
                    ordered.push(entry.expect("batch result missing"));
                }
                Some(Ok(ordered))
            }
        } else {
            None
        }
    }
}

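/// Shared state for a write that may consist of a data write plus an fsync;
/// the final result is sent once all pending operations have completed.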
struct WriteState {
    sender: DiskResultSender<()>,
    pending: u8,
    error: Option<QuillSQLError>,
    finished: bool,
}

impl WriteState {
    fn new(sender: DiskResultSender<()>, pending: u8) -> Self {
        WriteState {
            sender,
            pending,
            error: None,
            finished: false,
        }
    }

    fn record(&mut self, outcome: QuillSQLResult<()>) -> bool {
        if self.finished {
            return false;
        }

        if let Err(err) = outcome {
            if self.error.is_none() {
                self.error = Some(err);
            }
        }

        if self.pending > 0 {
            self.pending -= 1;
        }

        if self.pending == 0 {
            self.finished = true;
            let final_res = match self.error.take() {
                Some(err) => Err(err),
                None => Ok(()),
            };
            if let Err(send_err) = self.sender.send(final_res) {
                log::error!("io_uring write result send failed: {}", send_err);
            }
            true
        } else {
            false
        }
    }
}

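/// Returns the next completion token, wrapping on overflow but skipping 0 so
/// zero is never handed out as a `user_data` value.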
fn next_token(counter: &mut u64) -> u64 {
    let token = *counter;
    *counter = counter.wrapping_add(1);
    if *counter == 0 {
        *counter = 1;
    }
    token
}

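/// Pushes an SQE onto the submission queue, submitting to drain the queue and
/// retrying whenever it is full.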
fn push_sqe(ring: &mut IoUring, sqe: io_uring::squeue::Entry) {
    loop {
        let push_result = unsafe {
            let mut sq = ring.submission();
            sq.push(&sqe)
        };

        match push_result {
            Ok(()) => break,
            Err(_) => {
                // Submission queue is full: flush it to the kernel and retry.
                if let Err(err) = ring.submit() {
                    log::error!("io_uring submit while pushing sqe failed: {}", err);
                    break;
                }
            }
        }
    }
}

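/// A pool of page-aligned buffers registered with the kernel so I/O can use
/// the `ReadFixed`/`WriteFixed` opcodes; falls back to plain heap buffers
/// when registration fails or the pool is exhausted.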
struct FixedBufferPool {
    buffers: Vec<AlignedPageBuf>,
    free: Mutex<Vec<usize>>,
}

impl FixedBufferPool {
    fn new(count: usize) -> QuillSQLResult<Self> {
        if count > u16::MAX as usize {
            return Err(QuillSQLError::Internal(
                "io_uring fixed buffer count exceeds u16::MAX".into(),
            ));
        }
        let mut buffers = Vec::with_capacity(count);
        for _ in 0..count {
            buffers.push(AlignedPageBuf::new_zeroed()?);
        }
        let free: Vec<usize> = (0..count).collect();
        Ok(Self {
            buffers,
            free: Mutex::new(free),
        })
    }

    fn register(&mut self, ring: &mut IoUring) -> io::Result<()> {
        if self.buffers.is_empty() {
            return Ok(());
        }
        // `IoSliceMut` is guaranteed ABI-compatible with `libc::iovec` on
        // Unix, so the slice can be reinterpreted for the registration call.
        let io_slices: Vec<IoSliceMut<'_>> = self
            .buffers
            .iter_mut()
            .map(|buf| IoSliceMut::new(buf.as_mut_slice()))
            .collect();
        let ptr = io_slices.as_ptr() as *const libc::iovec;
        let iovecs = unsafe { std::slice::from_raw_parts(ptr, io_slices.len()) };
        match unsafe { ring.submitter().register_buffers(iovecs) } {
            Ok(()) => Ok(()),
            Err(err)
                if err.kind() == ErrorKind::OutOfMemory
                    || err.raw_os_error() == Some(libc::ENOMEM)
                    || err.kind() == ErrorKind::InvalidInput
                    || err.raw_os_error() == Some(libc::EINVAL) =>
            {
                log::warn!(
                    "io_uring buffer registration failed ({}); falling back to heap buffers",
                    err
                );
                self.buffers.clear();
                self.free.lock().unwrap().clear();
                Ok(())
            }
            Err(err) => Err(err),
        }
    }

    fn acquire(&self) -> Option<(usize, *mut u8)> {
        let idx = self.free.lock().unwrap().pop()?;
        let ptr = self.buffers[idx].ptr();
        Some((idx, ptr))
    }

    fn release(&self, idx: usize) {
        self.free.lock().unwrap().push(idx);
    }

    fn fill_from_slice(&mut self, idx: usize, data: &[u8]) {
        debug_assert_eq!(data.len(), PAGE_SIZE);
        let slice = self.buffers[idx].as_mut_slice();
        slice.copy_from_slice(data);
    }

    fn extract_bytes(&self, idx: usize, len: usize) -> BytesMut {
        BytesMut::from(&self.buffers[idx].as_slice()[..len])
    }
}

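/// Cache of open WAL file descriptors, keyed by path. The `File` handles are
/// kept alive so the raw fds stay valid for queued operations.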
struct WalFileEntry {
    _path: PathBuf,
    _file: std::fs::File,
    fd: i32,
}

struct WalFileCache {
    files: HashMap<PathBuf, WalFileEntry>,
}

impl WalFileCache {
    fn new() -> Self {
        Self {
            files: HashMap::new(),
        }
    }

    fn fd_for(&mut self, path: &Path) -> io::Result<i32> {
        if let Some(entry) = self.files.get(path) {
            return Ok(entry.fd);
        }

        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }

        let file = OpenOptions::new()
            .create(true)
            .read(true)
            .write(true)
            .open(path)?;
        let fd = file.as_raw_fd();
        let entry = WalFileEntry {
            _path: path.to_path_buf(),
            _file: file,
            fd,
        };
        self.files.insert(path.to_path_buf(), entry);
        Ok(fd)
    }
}

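/// Queues a single page read, preferring a registered fixed buffer and
/// falling back to a heap buffer when the pool is exhausted.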
fn queue_read(
    ring: &mut IoUring,
    pending: &mut HashMap<u64, PendingEntry>,
    token_counter: &mut u64,
    buffer_pool: &FixedBufferPool,
    fd: i32,
    page_id: PageId,
    sender: DiskResultSender<BytesMut>,
) {
    let token = next_token(token_counter);
    let offset = page_file_offset(page_id);

    if let Some((idx, ptr)) = buffer_pool.acquire() {
        let read_e = opcode::ReadFixed::new(types::Fd(fd), ptr, PAGE_SIZE as u32, idx as u16)
            .offset(offset)
            .build()
            .user_data(token);
        push_sqe(ring, read_e);
        pending.insert(
            token,
            PendingEntry {
                kind: PendingKind::ReadFixed { idx, sender },
            },
        );
    } else {
        let mut buffer = vec![0u8; PAGE_SIZE];
        let read_e = opcode::Read::new(types::Fd(fd), buffer.as_mut_ptr(), PAGE_SIZE as u32)
            .offset(offset)
            .build()
            .user_data(token);
        push_sqe(ring, read_e);
        pending.insert(
            token,
            PendingEntry {
                kind: PendingKind::ReadVec { buffer, sender },
            },
        );
    }
}

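/// Queues one read per page id, wiring every completion to a shared
/// `BatchState` so results come back in the original request order.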
fn queue_read_batch(
    ring: &mut IoUring,
    pending: &mut HashMap<u64, PendingEntry>,
    token_counter: &mut u64,
    buffer_pool: &FixedBufferPool,
    fd: i32,
    page_ids: Vec<PageId>,
    sender: DiskResultSender<Vec<BytesMut>>,
) {
    if page_ids.is_empty() {
        if let Err(err) = sender.send(Ok(vec![])) {
            log::error!("io_uring batch read send failed: {}", err);
        }
        return;
    }

    let batch = Rc::new(RefCell::new(BatchState::new(sender, page_ids.len())));

    for (index, page_id) in page_ids.into_iter().enumerate() {
        let token = next_token(token_counter);
        let offset = page_file_offset(page_id);
        if let Some((idx, ptr)) = buffer_pool.acquire() {
            let read_e = opcode::ReadFixed::new(types::Fd(fd), ptr, PAGE_SIZE as u32, idx as u16)
                .offset(offset)
                .build()
                .user_data(token);
            push_sqe(ring, read_e);
            pending.insert(
                token,
                PendingEntry {
                    kind: PendingKind::ReadBatchFixed {
                        idx,
                        batch: Rc::clone(&batch),
                        index,
                    },
                },
            );
        } else {
            let mut buffer = vec![0u8; PAGE_SIZE];
            let read_e = opcode::Read::new(types::Fd(fd), buffer.as_mut_ptr(), PAGE_SIZE as u32)
                .offset(offset)
                .build()
                .user_data(token);
            push_sqe(ring, read_e);
            pending.insert(
                token,
                PendingEntry {
                    kind: PendingKind::ReadBatchVec {
                        buffer,
                        batch: Rc::clone(&batch),
                        index,
                    },
                },
            );
        }
    }
}

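/// Queues a page write, optionally followed by an fdatasync; the shared
/// `WriteState` replies once every queued completion has landed.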
fn queue_write(
    ring: &mut IoUring,
    pending: &mut HashMap<u64, PendingEntry>,
    token_counter: &mut u64,
    buffer_pool: &mut FixedBufferPool,
    fd: i32,
    page_id: PageId,
    data: Bytes,
    sender: DiskResultSender<()>,
    fsync_on_write: bool,
) {
    let has_data = !data.is_empty();
    let pending_ops = (if has_data { 1 } else { 0 }) + if fsync_on_write { 1 } else { 0 };

    if pending_ops == 0 {
        if let Err(err) = sender.send(Ok(())) {
            log::error!("io_uring write result send failed: {}", err);
        }
        return;
    }

    let state = Rc::new(RefCell::new(WriteState::new(sender, pending_ops as u8)));

    if has_data {
        debug_assert_eq!(data.len(), PAGE_SIZE);
        let write_token = next_token(token_counter);
        let offset = page_file_offset(page_id);

        if let Some((idx, ptr)) = buffer_pool.acquire() {
            buffer_pool.fill_from_slice(idx, &data);
            let entry = opcode::WriteFixed::new(types::Fd(fd), ptr, data.len() as u32, idx as u16)
                .offset(offset)
                .build()
                .user_data(write_token);
            push_sqe(ring, entry);
            pending.insert(
                write_token,
                PendingEntry {
                    kind: PendingKind::WriteFixed {
                        idx,
                        state: Rc::clone(&state),
                        len: data.len(),
                    },
                },
            );
        } else {
            let buffer = data.clone();
            let entry = opcode::Write::new(types::Fd(fd), buffer.as_ptr(), buffer.len() as u32)
                .offset(offset)
                .build()
                .user_data(write_token);
            push_sqe(ring, entry);
            pending.insert(
                write_token,
                PendingEntry {
                    kind: PendingKind::WriteVec {
                        state: Rc::clone(&state),
                        _buffer: buffer,
                        len: data.len(),
                    },
                },
            );
        }
    }

    if fsync_on_write {
        let fsync_token = next_token(token_counter);
        let fsync_entry = opcode::Fsync::new(types::Fd(fd))
            .flags(types::FsyncFlags::DATASYNC)
            .build()
            .user_data(fsync_token);

        push_sqe(ring, fsync_entry);

        pending.insert(
            fsync_token,
            PendingEntry {
                kind: PendingKind::Fsync {
                    state,
                    required: true,
                },
            },
        );
    }
}

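/// Queues a WAL append at an explicit file offset, optionally followed by an
/// fdatasync. WAL writes go through the plain `Write` opcode rather than the
/// fixed-buffer path.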
fn queue_wal_write(
    ring: &mut IoUring,
    pending: &mut HashMap<u64, PendingEntry>,
    token_counter: &mut u64,
    fd: i32,
    offset: u64,
    data: Bytes,
    sender: DiskResultSender<()>,
    sync: bool,
) {
    let has_data = !data.is_empty();
    let pending_ops = (if has_data { 1 } else { 0 }) + if sync { 1 } else { 0 };

    if pending_ops == 0 {
        if let Err(err) = sender.send(Ok(())) {
            log::error!("io_uring WAL write result send failed: {}", err);
        }
        return;
    }

    let state = Rc::new(RefCell::new(WriteState::new(sender, pending_ops as u8)));

    if has_data {
        let write_token = next_token(token_counter);
        let buffer = data.clone();
        let entry = opcode::Write::new(types::Fd(fd), buffer.as_ptr(), buffer.len() as u32)
            .offset(offset)
            .build()
            .user_data(write_token);
        push_sqe(ring, entry);
        pending.insert(
            write_token,
            PendingEntry {
                kind: PendingKind::WriteVec {
                    state: Rc::clone(&state),
                    _buffer: buffer,
                    len: data.len(),
                },
            },
        );
    }

    if sync {
        let fsync_token = next_token(token_counter);
        let fsync_entry = opcode::Fsync::new(types::Fd(fd))
            .flags(types::FsyncFlags::DATASYNC)
            .build()
            .user_data(fsync_token);
        push_sqe(ring, fsync_entry);
        pending.insert(
            fsync_token,
            PendingEntry {
                kind: PendingKind::Fsync {
                    state,
                    required: true,
                },
            },
        );
    }
}

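/// Queues a standalone fdatasync for a WAL file.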
fn queue_wal_fsync(
    ring: &mut IoUring,
    pending: &mut HashMap<u64, PendingEntry>,
    token_counter: &mut u64,
    fd: i32,
    sender: DiskResultSender<()>,
) {
    let state = Rc::new(RefCell::new(WriteState::new(sender, 1)));
    let fsync_token = next_token(token_counter);
    let fsync_entry = opcode::Fsync::new(types::Fd(fd))
        .flags(types::FsyncFlags::DATASYNC)
        .build()
        .user_data(fsync_token);
    push_sqe(ring, fsync_entry);
    pending.insert(
        fsync_token,
        PendingEntry {
            kind: PendingKind::Fsync {
                state,
                required: true,
            },
        },
    );
}

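/// Dispatches a single `DiskRequest` onto the ring (or answers it inline for
/// allocate/deallocate). Returns `true` when a `Shutdown` request is seen.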
fn handle_request_direct(
    request: DiskRequest,
    ring: &mut IoUring,
    pending: &mut HashMap<u64, PendingEntry>,
    token_counter: &mut u64,
    fd: i32,
    disk_manager: &Arc<DiskManager>,
    fsync_on_write: bool,
    buffer_pool: &mut FixedBufferPool,
    wal_files: &mut WalFileCache,
) -> bool {
    match request {
        DiskRequest::ReadPage {
            page_id,
            result_sender,
        } => {
            queue_read(
                ring,
                pending,
                token_counter,
                buffer_pool,
                fd,
                page_id,
                result_sender,
            );
        }
        DiskRequest::ReadPages {
            page_ids,
            result_sender,
        } => {
            queue_read_batch(
                ring,
                pending,
                token_counter,
                buffer_pool,
                fd,
                page_ids,
                result_sender,
            );
        }
        DiskRequest::WritePage {
            page_id,
            data,
            result_sender,
        } => {
            queue_write(
                ring,
                pending,
                token_counter,
                buffer_pool,
                fd,
                page_id,
                data,
                result_sender,
                fsync_on_write,
            );
        }
        DiskRequest::WriteWal {
            path,
            offset,
            data,
            sync,
            result_sender,
        } => match wal_files.fd_for(&path) {
            Ok(wal_fd) => {
                queue_wal_write(
                    ring,
                    pending,
                    token_counter,
                    wal_fd,
                    offset,
                    data,
                    result_sender,
                    sync,
                );
            }
            Err(err) => {
                let _ = result_sender.send(Err(QuillSQLError::Io(err)));
            }
        },
        DiskRequest::FsyncWal {
            path,
            result_sender,
        } => match wal_files.fd_for(&path) {
            Ok(wal_fd) => {
                queue_wal_fsync(ring, pending, token_counter, wal_fd, result_sender);
            }
            Err(err) => {
                let _ = result_sender.send(Err(QuillSQLError::Io(err)));
            }
        },
        DiskRequest::AllocatePage { result_sender } => {
            let _ = result_sender.send(disk_manager.allocate_page());
        }
        DiskRequest::DeallocatePage {
            page_id,
            result_sender,
        } => {
            let _ = result_sender.send(disk_manager.deallocate_page(page_id));
        }
        DiskRequest::Shutdown => {
            log::debug!("Disk I/O io_uring worker received Shutdown signal.");
            return true;
        }
    }
    false
}

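/// Drains all available CQEs, routing each completion back to its pending
/// entry and recycling fixed buffers.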
fn drain_completions(
    ring: &mut IoUring,
    pending: &mut HashMap<u64, PendingEntry>,
    buffer_pool: &FixedBufferPool,
) {
    while let Some(cqe) = ring.completion().next() {
        let token = cqe.user_data();
        if let Some(entry) = pending.remove(&token) {
            let result_code = cqe.result();
            match entry.kind {
                PendingKind::ReadFixed { idx, sender } => {
                    let bytes = buffer_pool.extract_bytes(idx, PAGE_SIZE);
                    buffer_pool.release(idx);
                    let outcome = parse_read_result(result_code, bytes.to_vec());
                    if let Err(err) = sender.send(outcome) {
                        log::error!("io_uring read fixed result send failed: {}", err);
                    }
                }
                PendingKind::ReadVec { buffer, sender } => {
                    let outcome = parse_read_result(result_code, buffer);
                    if let Err(err) = sender.send(outcome) {
                        log::error!("io_uring read result send failed: {}", err);
                    }
                }
                PendingKind::ReadBatchFixed { idx, batch, index } => {
                    let bytes = buffer_pool.extract_bytes(idx, PAGE_SIZE);
                    buffer_pool.release(idx);
                    let outcome = parse_read_result(result_code, bytes.to_vec());
                    let mut batch_ref = batch.borrow_mut();
                    if let Some(result) = batch_ref.record_result(index, outcome) {
                        if let Err(err) = batch_ref.sender.send(result) {
                            log::error!("io_uring batch read send failed: {}", err);
                        }
                    }
                }
                PendingKind::ReadBatchVec {
                    buffer,
                    batch,
                    index,
                } => {
                    let outcome = parse_read_result(result_code, buffer);
                    let mut batch_ref = batch.borrow_mut();
                    if let Some(result) = batch_ref.record_result(index, outcome) {
                        if let Err(err) = batch_ref.sender.send(result) {
                            log::error!("io_uring batch read send failed: {}", err);
                        }
                    }
                }
                PendingKind::WriteFixed { idx, state, len } => {
                    buffer_pool.release(idx);
                    let outcome = parse_write_result(result_code, len);
                    let _ = state.borrow_mut().record(outcome);
                }
                PendingKind::WriteVec { state, len, .. } => {
                    let outcome = parse_write_result(result_code, len);
                    let _ = state.borrow_mut().record(outcome);
                }
                PendingKind::Fsync { state, required } => {
                    let outcome = parse_fsync_result(result_code, required);
                    let _ = state.borrow_mut().record(outcome);
                }
            }
        }
    }
}

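/// Maps a raw CQE result code for a read into either the page bytes or a
/// storage error (negative codes are errnos; short reads are rejected).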
fn parse_read_result(code: i32, buffer: Vec<u8>) -> QuillSQLResult<BytesMut> {
    if code < 0 {
        let err = io::Error::from_raw_os_error(-code);
        Err(QuillSQLError::Storage(format!(
            "io_uring read failed: {}",
            err
        )))
    } else if code as usize != PAGE_SIZE {
        Err(QuillSQLError::Storage(format!(
            "io_uring short read: {} bytes",
            code
        )))
    } else {
        Ok(BytesMut::from(&buffer[..]))
    }
}

fn parse_write_result(code: i32, expected: usize) -> QuillSQLResult<()> {
    if code < 0 {
        let err = io::Error::from_raw_os_error(-code);
        Err(QuillSQLError::Storage(format!(
            "io_uring write failed: {}",
            err
        )))
    } else if code as usize != expected {
        Err(QuillSQLError::Storage(format!(
            "io_uring short write: {} bytes",
            code
        )))
    } else {
        Ok(())
    }
}

fn parse_fsync_result(code: i32, required: bool) -> QuillSQLResult<()> {
    if !required {
        return Ok(());
    }
    if code < 0 {
        let err = io::Error::from_raw_os_error(-code);
        Err(QuillSQLError::Storage(format!(
            "io_uring fdatasync failed: {}",
            err
        )))
    } else {
        Ok(())
    }
}

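/// Byte offset of a page within the data file: pages are 1-indexed and laid
/// out after the meta page header.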
#[inline]
fn page_file_offset(page_id: PageId) -> u64 {
    (*META_PAGE_SIZE + (page_id - 1) as usize * PAGE_SIZE) as u64
}

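/// Main loop of the io_uring worker thread: builds the ring, registers fixed
/// buffers, then services `DiskRequest`s until shutdown, blocking on `recv`
/// when idle and on `submit_and_wait` when completions are outstanding.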
pub(crate) fn worker_loop(
    receiver: Receiver<DiskRequest>,
    disk_manager: Arc<DiskManager>,
    entries: u32,
    fixed_buffers: usize,
    sqpoll_idle: Option<u32>,
    fsync_on_write: bool,
) {
    log::debug!("Disk I/O io_uring worker thread started.");
    let mut builder = IoUring::builder();
    if let Some(idle) = sqpoll_idle {
        builder.setup_sqpoll(idle);
    }
    let mut ring = builder.build(entries).expect("io_uring init failed");

    let mut buffer_pool = FixedBufferPool::new(fixed_buffers).expect("create fixed buffer pool");
    buffer_pool
        .register(&mut ring)
        .expect("register fixed buffers");

    let file = disk_manager
        .try_clone_db_file()
        .expect("clone db file for io_uring failed");
    let fd = file.as_raw_fd();

    let mut pending: HashMap<u64, PendingEntry> = HashMap::new();
    let mut token_counter: u64 = 1;
    let mut shutdown = false;
    let mut wal_files = WalFileCache::new();

    while !shutdown || !pending.is_empty() {
        drain_completions(&mut ring, &mut pending, &buffer_pool);

        if shutdown && pending.is_empty() {
            break;
        }

        let mut received_any = false;
        while let Ok(request) = receiver.try_recv() {
            received_any = true;
            if handle_request_direct(
                request,
                &mut ring,
                &mut pending,
                &mut token_counter,
                fd,
                &disk_manager,
                fsync_on_write,
                &mut buffer_pool,
                &mut wal_files,
            ) {
                shutdown = true;
                break;
            }
        }

        if shutdown && pending.is_empty() {
            break;
        }

        if pending.is_empty() {
            // Nothing in flight: block until the next request arrives.
            match receiver.recv() {
                Ok(request) => {
                    if handle_request_direct(
                        request,
                        &mut ring,
                        &mut pending,
                        &mut token_counter,
                        fd,
                        &disk_manager,
                        fsync_on_write,
                        &mut buffer_pool,
                        &mut wal_files,
                    ) {
                        shutdown = true;
                    }
                }
                Err(_) => {
                    shutdown = true;
                }
            }
        } else if !received_any {
            if let Err(e) = ring.submit_and_wait(1) {
                log::error!("io_uring submit_and_wait failed: {}", e);
            }
        } else if let Err(e) = ring.submit() {
            log::error!("io_uring submit failed: {}", e);
        }
    }

    drain_completions(&mut ring, &mut pending, &buffer_pool);

    log::debug!("Disk I/O io_uring worker thread finished.");
}