1use super::disk_manager::DiskManager;
2use crate::buffer::PageId;
3use crate::config::IOSchedulerConfig;
4use crate::error::{QuillSQLError, QuillSQLResult};
5use bytes::{Bytes, BytesMut};
6use std::collections::VecDeque;
7use std::fmt;
8use std::path::PathBuf;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::{mpsc, Arc, Condvar, Mutex};
11use std::thread;
12
13#[cfg(not(target_os = "linux"))]
14use crate::storage::io::block_io;
15#[cfg(target_os = "linux")]
16use crate::storage::io::io_uring;
17
18#[derive(Debug)]
19pub enum DiskError {
20 Io(std::io::Error),
21 Cancelled,
22}
23
24pub enum DiskResponse {
25 Read { data: BytesMut },
26 Write,
27 Allocate { page_id: PageId },
28 Error(QuillSQLError),
29}
30pub type DiskCommandResultSender<T> = mpsc::Sender<QuillSQLResult<T>>;
32pub type DiskCommandResultReceiver<T> = mpsc::Receiver<QuillSQLResult<T>>;
34
35#[derive(Debug, Clone)]
37pub enum DiskRequest {
38 ReadPage {
39 page_id: PageId,
40 result_sender: DiskCommandResultSender<BytesMut>,
41 },
42 ReadPages {
44 page_ids: Vec<PageId>,
45 result_sender: DiskCommandResultSender<Vec<BytesMut>>,
46 },
47 WritePage {
48 page_id: PageId,
49 data: Bytes,
50 result_sender: DiskCommandResultSender<()>,
51 },
52 WriteWal {
53 path: PathBuf,
54 offset: u64,
55 data: Bytes,
56 sync: bool,
57 result_sender: DiskCommandResultSender<()>,
58 },
59 FsyncWal {
60 path: PathBuf,
61 result_sender: DiskCommandResultSender<()>,
62 },
63 AllocatePage {
64 result_sender: mpsc::Sender<QuillSQLResult<PageId>>,
65 },
66 DeallocatePage {
67 page_id: PageId,
68 result_sender: DiskCommandResultSender<()>,
69 },
70 Shutdown,
71}
72
73#[derive(Debug)]
75pub struct DiskScheduler {
76 request_sender: RequestSender,
77 worker_threads: Vec<thread::JoinHandle<()>>,
78 pub config: IOSchedulerConfig,
79}
80
81pub(crate) struct RequestQueue {
82 queue: Mutex<VecDeque<DiskRequest>>,
83 condvar: Condvar,
84 shutdown: AtomicBool,
85}
86
87impl RequestQueue {
88 pub(crate) fn new() -> Self {
89 RequestQueue {
90 queue: Mutex::new(VecDeque::new()),
91 condvar: Condvar::new(),
92 shutdown: AtomicBool::new(false),
93 }
94 }
95
96 fn is_shutdown(&self) -> bool {
97 self.shutdown.load(Ordering::Acquire)
98 }
99
100 fn mark_shutdown(&self) {
101 if !self.shutdown.swap(true, Ordering::AcqRel) {
102 self.condvar.notify_all();
103 }
104 }
105}
106
107#[derive(Clone)]
108pub(crate) struct RequestSender {
109 queue: Arc<RequestQueue>,
110}
111
112impl fmt::Debug for RequestSender {
113 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114 f.debug_struct("RequestSender").finish()
115 }
116}
117
118impl RequestSender {
119 pub(crate) fn new(queue: Arc<RequestQueue>) -> Self {
120 RequestSender { queue }
121 }
122
123 fn send(&self, request: DiskRequest) -> Result<(), DiskRequest> {
124 if self.queue.is_shutdown() {
125 return Err(request);
126 }
127
128 let mut guard = self.queue.queue.lock().unwrap();
129 if self.queue.is_shutdown() {
130 return Err(request);
131 }
132
133 guard.push_back(request);
134 self.queue.condvar.notify_one();
135 Ok(())
136 }
137
138 fn close(&self) {
139 self.queue.mark_shutdown();
140 }
141}
142
143#[derive(Clone)]
144pub(crate) struct RequestReceiver {
145 queue: Arc<RequestQueue>,
146}
147
148impl RequestReceiver {
149 pub(crate) fn new(queue: Arc<RequestQueue>) -> Self {
150 RequestReceiver { queue }
151 }
152
153 pub(crate) fn try_recv(&self) -> Option<DiskRequest> {
154 let mut guard = self.queue.queue.lock().unwrap();
155 guard.pop_front()
156 }
157
158 pub(crate) fn recv(&self) -> Option<DiskRequest> {
159 let mut guard = self.queue.queue.lock().unwrap();
160 loop {
161 if let Some(request) = guard.pop_front() {
162 return Some(request);
163 }
164
165 if self.queue.is_shutdown() {
166 return None;
167 }
168
169 guard = self.queue.condvar.wait(guard).unwrap();
170 }
171 }
172
173 pub(crate) fn is_shutdown(&self) -> bool {
174 self.queue.is_shutdown()
175 }
176}
177
178impl DiskScheduler {
179 pub fn new(disk_manager: Arc<DiskManager>) -> Self {
180 Self::new_with_config(disk_manager, IOSchedulerConfig::default())
181 }
182
183 pub fn new_with_config(disk_manager: Arc<DiskManager>, config: IOSchedulerConfig) -> Self {
184 #[cfg(target_os = "linux")]
185 let (request_sender, worker_threads) = spawn_runtime(disk_manager.clone(), config.clone());
186
187 #[cfg(not(target_os = "linux"))]
188 let (request_sender, worker_threads) =
189 block_io::spawn_runtime(disk_manager.clone(), config.clone());
190
191 DiskScheduler {
192 request_sender,
193 worker_threads,
194 config,
195 }
196 }
197
198 pub fn schedule_read(
201 &self,
202 page_id: PageId,
203 ) -> QuillSQLResult<DiskCommandResultReceiver<BytesMut>> {
204 let (tx, rx) = mpsc::channel();
205 self.request_sender
206 .send(DiskRequest::ReadPage {
207 page_id,
208 result_sender: tx,
209 })
210 .map_err(|_| {
211 QuillSQLError::Internal(
212 "Failed to enqueue Read request: scheduler shutting down".to_string(),
213 )
214 })?;
215 Ok(rx)
216 }
217
218 pub fn schedule_write(
219 &self,
220 page_id: PageId,
221 data: Bytes,
222 ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
223 let (tx, rx) = mpsc::channel();
224 self.request_sender
225 .send(DiskRequest::WritePage {
226 page_id,
227 data,
228 result_sender: tx,
229 })
230 .map_err(|_| {
231 QuillSQLError::Internal(
232 "Failed to enqueue Write request: scheduler shutting down".to_string(),
233 )
234 })?;
235 Ok(rx)
236 }
237
238 pub fn schedule_wal_write(
239 &self,
240 path: PathBuf,
241 offset: u64,
242 data: Bytes,
243 sync: bool,
244 ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
245 let (tx, rx) = mpsc::channel();
246 self.request_sender
247 .send(DiskRequest::WriteWal {
248 path,
249 offset,
250 data,
251 sync,
252 result_sender: tx,
253 })
254 .map_err(|_| {
255 QuillSQLError::Internal(
256 "Failed to enqueue WAL write request: scheduler shutting down".to_string(),
257 )
258 })?;
259 Ok(rx)
260 }
261
262 pub fn schedule_wal_fsync(
263 &self,
264 path: PathBuf,
265 ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
266 let (tx, rx) = mpsc::channel();
267 self.request_sender
268 .send(DiskRequest::FsyncWal {
269 path,
270 result_sender: tx,
271 })
272 .map_err(|_| {
273 QuillSQLError::Internal(
274 "Failed to enqueue WAL fsync request: scheduler shutting down".to_string(),
275 )
276 })?;
277 Ok(rx)
278 }
279
280 pub fn schedule_read_pages(
281 &self,
282 page_ids: Vec<PageId>,
283 ) -> QuillSQLResult<DiskCommandResultReceiver<Vec<BytesMut>>> {
284 let (tx, rx) = mpsc::channel();
285 self.request_sender
286 .send(DiskRequest::ReadPages {
287 page_ids,
288 result_sender: tx,
289 })
290 .map_err(|_| {
291 QuillSQLError::Internal(
292 "Failed to enqueue ReadPages request: scheduler shutting down".to_string(),
293 )
294 })?;
295 Ok(rx)
296 }
297
298 pub fn schedule_allocate(&self) -> QuillSQLResult<mpsc::Receiver<QuillSQLResult<PageId>>> {
301 let (tx, rx) = mpsc::channel();
302 self.request_sender
303 .send(DiskRequest::AllocatePage { result_sender: tx })
304 .map_err(|_| {
305 QuillSQLError::Internal(
306 "Failed to enqueue Allocate request: scheduler shutting down".to_string(),
307 )
308 })?;
309 Ok(rx)
310 }
311
312 pub fn schedule_deallocate(
313 &self,
314 page_id: PageId,
315 ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
316 let (tx, rx) = mpsc::channel();
317 self.request_sender
318 .send(DiskRequest::DeallocatePage {
319 page_id,
320 result_sender: tx,
321 })
322 .map_err(|_| {
323 QuillSQLError::Internal(
324 "Failed to enqueue Deallocate request: scheduler shutting down".to_string(),
325 )
326 })?;
327 Ok(rx)
328 }
329}
330
331impl Drop for DiskScheduler {
333 fn drop(&mut self) {
334 for _ in 0..self.config.workers {
335 let _ = self.request_sender.send(DiskRequest::Shutdown);
336 }
337 self.request_sender.close();
338 for handle in self.worker_threads.drain(..) {
339 if let Err(e) = handle.join() {
340 log::error!("Disk worker thread panicked: {:?}", e);
341 }
342 }
343 }
344}
345
346#[cfg(target_os = "linux")]
347fn spawn_runtime(
348 disk_manager: Arc<DiskManager>,
349 config: IOSchedulerConfig,
350) -> (RequestSender, Vec<thread::JoinHandle<()>>) {
351 let worker_count = config.workers;
352 let queue = Arc::new(RequestQueue::new());
353 let sender = RequestSender::new(queue.clone());
354
355 let mut worker_threads = Vec::with_capacity(worker_count);
356 for i in 0..worker_count {
357 let dm = disk_manager.clone();
358 let worker_config = config;
359 let entries = worker_config.iouring_queue_depth as u32;
360 let fixed_count = worker_config.iouring_fixed_buffers;
361 let sqpoll_idle = worker_config.iouring_sqpoll_idle_ms;
362 let fsync_on_write = worker_config.fsync_on_write;
363 let rx = RequestReceiver::new(queue.clone());
364 let handle = thread::Builder::new()
365 .name(format!("disk-scheduler-iouring-worker-{}", i))
366 .spawn(move || {
367 io_uring::worker_loop(rx, dm, entries, fixed_count, sqpoll_idle, fsync_on_write);
368 })
369 .expect("Failed to spawn DiskScheduler io_uring worker thread");
370 worker_threads.push(handle);
371 }
372
373 (sender, worker_threads)
374}
375
376#[cfg(test)]
378mod tests {
379 use super::DiskManager;
380 use super::*;
381 use crate::buffer::PAGE_SIZE;
382 use crate::error::QuillSQLResult;
383 use bytes::{Bytes, BytesMut};
384 use std::sync::Arc;
385 use std::thread;
386 use std::time::Duration;
387 use tempfile::TempDir;
388
389 fn create_test_scheduler() -> (TempDir, Arc<DiskScheduler>, Arc<DiskManager>) {
391 let temp_dir = TempDir::new().expect("unable to create temporary working directory");
392 let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
393 let scheduler = Arc::new(DiskScheduler::new(dm.clone()));
394 (temp_dir, scheduler, dm)
395 }
396
397 fn create_dummy_page_bytes(content: &str) -> Bytes {
399 let mut data = BytesMut::zeroed(PAGE_SIZE);
400 let content_bytes = content.as_bytes();
401 let len = std::cmp::min(content_bytes.len(), PAGE_SIZE);
402 data[..len].copy_from_slice(&content_bytes[..len]);
403 data.freeze() }
405
406 fn read_page_content(data: &BytesMut) -> String {
408 let first_null = data.iter().position(|&b| b == 0).unwrap_or(data.len());
409 String::from_utf8_lossy(&data[..first_null]).to_string()
410 }
411
412 #[test]
413 fn test_scheduler_allocate_write_read() -> QuillSQLResult<()> {
414 let (_temp_dir, scheduler, _dm) = create_test_scheduler();
415
416 let rx_alloc = scheduler.schedule_allocate()?;
418 let page_id = rx_alloc
419 .recv()
420 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
421
422 let content = "Hello DiskScheduler!";
424 let data_bytes = create_dummy_page_bytes(content);
425 let rx_write = scheduler.schedule_write(page_id, data_bytes)?;
426 rx_write
427 .recv()
428 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
429
430 let rx_read = scheduler.schedule_read(page_id)?;
432 let read_result = rx_read
433 .recv()
434 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
435 assert_eq!(read_page_content(&read_result), content);
436
437 Ok(())
438 }
439
440 #[test]
441 fn test_scheduler_deallocate() -> QuillSQLResult<()> {
442 let (_temp_dir, scheduler, dm) = create_test_scheduler();
443
444 let page_id = scheduler
446 .schedule_allocate()?
447 .recv()
448 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
449
450 scheduler
451 .schedule_write(page_id, create_dummy_page_bytes("Test Data"))?
452 .recv()
453 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
454
455 let rx_dealloc = scheduler.schedule_deallocate(page_id)?;
457 rx_dealloc
458 .recv()
459 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
460
461 let data_after_dealloc = dm.read_page(page_id)?;
463 assert!(data_after_dealloc.iter().all(|&b| b == 0));
464
465 Ok(())
466 }
467
468 #[test]
469 fn test_concurrent_operations() -> QuillSQLResult<()> {
470 let (_temp_dir, scheduler, _dm) = create_test_scheduler();
471
472 let page_id = scheduler
474 .schedule_allocate()?
475 .recv()
476 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
477
478 scheduler
479 .schedule_write(page_id, create_dummy_page_bytes("Concurrent Test"))?
480 .recv()
481 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
482
483 let mut handles = vec![];
485 let num_threads = 10; for i in 0..num_threads {
488 let scheduler_clone = scheduler.clone();
489 let handle = thread::spawn(move || {
490 thread::sleep(Duration::from_millis(i * 5));
492
493 scheduler_clone
494 .schedule_read(page_id)
495 .map_err(|e| e.to_string())
496 .and_then(|rx| rx.recv().map_err(|e| e.to_string()))
497 .and_then(|res| res.map_err(|e| e.to_string()))
498 });
499 handles.push(handle);
500 }
501
502 for handle in handles {
504 match handle.join().unwrap() {
505 Ok(read_data) => assert_eq!(read_page_content(&read_data), "Concurrent Test"),
506 Err(e) => panic!("Concurrent read thread failed: {}", e),
507 }
508 }
509
510 Ok(())
511 }
512
513 #[test]
514 fn test_mixed_operations() -> QuillSQLResult<()> {
515 let (_temp_dir, scheduler, _dm) = create_test_scheduler();
516
517 let mut page_ids = vec![];
519 let num_pages = 5;
520
521 for _ in 0..num_pages {
522 let page_id = scheduler
523 .schedule_allocate()?
524 .recv()
525 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
526 page_ids.push(page_id);
527 }
528
529 for (i, &page_id) in page_ids.iter().enumerate() {
531 let content = format!("Page {} content", i);
532
533 scheduler
535 .schedule_write(page_id, create_dummy_page_bytes(&content))?
536 .recv()
537 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
538
539 let read_data = scheduler
541 .schedule_read(page_id)?
542 .recv()
543 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
544
545 assert_eq!(read_page_content(&read_data), content);
546 }
547
548 for &page_id in page_ids.iter().take(2) {
550 scheduler
551 .schedule_deallocate(page_id)?
552 .recv()
553 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
554 }
555
556 Ok(())
557 }
558
559 #[test]
560 fn test_scheduler_shutdown() -> QuillSQLResult<()> {
561 let (_temp_dir, scheduler, _dm) = create_test_scheduler();
562 let scheduler_arc = scheduler;
563
564 let scheduler_clone = scheduler_arc.clone();
566 let handle = thread::spawn(move || {
567 thread::sleep(Duration::from_millis(100));
569
570 scheduler_clone
572 .schedule_allocate()
573 .map_err(|e| e.to_string())
574 .and_then(|rx| rx.recv().map_err(|e| e.to_string()))
575 .and_then(|res| res.map_err(|e| e.to_string()))
576 });
577
578 drop(scheduler_arc);
580
581 match handle.join().unwrap() {
583 Ok(page_id) => println!("Thread completed after shutdown: {:?}", page_id),
584 Err(e) => println!("Thread failed as expected after shutdown: {}", e),
585 }
586
587 Ok(())
588 }
589
590 #[test]
591 fn test_large_data_transfer() -> QuillSQLResult<()> {
592 let (_temp_dir, scheduler, _dm) = create_test_scheduler();
593
594 let page_id = scheduler
596 .schedule_allocate()?
597 .recv()
598 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
599
600 let large_string = "X".repeat(PAGE_SIZE - 100);
602 let large_data = create_dummy_page_bytes(&large_string);
603
604 scheduler
606 .schedule_write(page_id, large_data)?
607 .recv()
608 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
609
610 let read_result = scheduler
612 .schedule_read(page_id)?
613 .recv()
614 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
615
616 let read_content = read_page_content(&read_result);
618 assert_eq!(read_content.len(), large_string.len());
619 assert_eq!(&read_content[0..10], &large_string[0..10]); Ok(())
622 }
623
624 #[cfg(target_os = "linux")]
625 #[test]
626 fn test_iouring_allocate_write_read() -> QuillSQLResult<()> {
627 let temp_dir = TempDir::new().expect("unable to create temporary working directory");
628 let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
629 let mut cfg = IOSchedulerConfig::default();
630 cfg.iouring_queue_depth = 256;
631 let scheduler = Arc::new(DiskScheduler::new_with_config(dm.clone(), cfg));
632
633 let rx_alloc = scheduler.schedule_allocate()?;
635 let page_id = rx_alloc
636 .recv()
637 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
638
639 let mut data = BytesMut::zeroed(PAGE_SIZE);
641 data[..4].copy_from_slice(&[1, 2, 3, 4]);
642 scheduler
643 .schedule_write(page_id, data.freeze())?
644 .recv()
645 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
646
647 let read = scheduler
649 .schedule_read(page_id)?
650 .recv()
651 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
652 assert_eq!(&read[..4], &[1, 2, 3, 4]);
653 Ok(())
654 }
655
656 #[cfg(target_os = "linux")]
657 #[test]
658 fn test_iouring_concurrent_reads() -> QuillSQLResult<()> {
659 let temp_dir = TempDir::new().expect("unable to create temporary working directory");
660 let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
661 let mut cfg = IOSchedulerConfig::default();
662 cfg.iouring_queue_depth = 256;
663 let scheduler = Arc::new(DiskScheduler::new_with_config(dm.clone(), cfg));
664
665 let page_id = scheduler
666 .schedule_allocate()?
667 .recv()
668 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
669
670 scheduler
671 .schedule_write(page_id, {
672 let mut b = BytesMut::zeroed(PAGE_SIZE);
673 b[..13].copy_from_slice(b"Hello, World!");
674 b.freeze()
675 })?
676 .recv()
677 .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
678
679 let mut handles = vec![];
680 for _ in 0..8u32 {
681 let s = scheduler.clone();
682 handles.push(thread::spawn(move || {
683 let data = s
684 .schedule_read(page_id)
685 .map_err(|e| e.to_string())
686 .and_then(|rx| rx.recv().map_err(|e| e.to_string()))
687 .and_then(|res| res.map_err(|e| e.to_string()))?;
688 if &data[..13] != b"Hello, World!" {
689 return Err("mismatch".into());
690 }
691 Ok::<(), String>(())
692 }));
693 }
694 for h in handles {
695 h.join().unwrap().unwrap();
696 }
697 Ok(())
698 }
699}