quill_sql/storage/
disk_scheduler.rs

1use super::disk_manager::DiskManager;
2use crate::buffer::PageId;
3use crate::config::IOSchedulerConfig;
4use crate::error::{QuillSQLError, QuillSQLResult};
5use bytes::{Bytes, BytesMut};
6use std::collections::VecDeque;
7use std::fmt;
8use std::path::PathBuf;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::{mpsc, Arc, Condvar, Mutex};
11use std::thread;
12
13#[cfg(not(target_os = "linux"))]
14use crate::storage::io::block_io;
15#[cfg(target_os = "linux")]
16use crate::storage::io::io_uring;
17
/// Low-level failure of a disk operation.
///
/// Implements [`std::error::Error`] so it can be propagated with `?`, boxed as
/// `dyn Error`, and printed in user-facing messages.
#[derive(Debug)]
pub enum DiskError {
    /// An operating-system I/O call failed; the original error is preserved.
    Io(std::io::Error),
    /// The operation was cancelled.
    Cancelled,
}

impl fmt::Display for DiskError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DiskError::Io(err) => write!(f, "disk I/O error: {}", err),
            DiskError::Cancelled => f.write_str("disk operation cancelled"),
        }
    }
}

impl std::error::Error for DiskError {
    /// Exposes the wrapped `std::io::Error` (if any) as the underlying cause.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            DiskError::Io(err) => Some(err),
            DiskError::Cancelled => None,
        }
    }
}
23
24pub enum DiskResponse {
25    Read { data: BytesMut },
26    Write,
27    Allocate { page_id: PageId },
28    Error(QuillSQLError),
29}
30// Type alias for the sender part of the result channel
31pub type DiskCommandResultSender<T> = mpsc::Sender<QuillSQLResult<T>>;
32// Type alias for the receiver part of the result channel
33pub type DiskCommandResultReceiver<T> = mpsc::Receiver<QuillSQLResult<T>>;
34
35// Commands sent from BufferManager to the DiskScheduler task
36#[derive(Debug, Clone)]
37pub enum DiskRequest {
38    ReadPage {
39        page_id: PageId,
40        result_sender: DiskCommandResultSender<BytesMut>,
41    },
42    /// Read arbitrary pages by id order; returns buffers in the same order
43    ReadPages {
44        page_ids: Vec<PageId>,
45        result_sender: DiskCommandResultSender<Vec<BytesMut>>,
46    },
47    WritePage {
48        page_id: PageId,
49        data: Bytes,
50        result_sender: DiskCommandResultSender<()>,
51    },
52    WriteWal {
53        path: PathBuf,
54        offset: u64,
55        data: Bytes,
56        sync: bool,
57        result_sender: DiskCommandResultSender<()>,
58    },
59    FsyncWal {
60        path: PathBuf,
61        result_sender: DiskCommandResultSender<()>,
62    },
63    AllocatePage {
64        result_sender: mpsc::Sender<QuillSQLResult<PageId>>,
65    },
66    DeallocatePage {
67        page_id: PageId,
68        result_sender: DiskCommandResultSender<()>,
69    },
70    Shutdown,
71}
72
73// Structure to manage the background I/O thread
74#[derive(Debug)]
75pub struct DiskScheduler {
76    request_sender: RequestSender,
77    worker_threads: Vec<thread::JoinHandle<()>>,
78    pub config: IOSchedulerConfig,
79}
80
81pub(crate) struct RequestQueue {
82    queue: Mutex<VecDeque<DiskRequest>>,
83    condvar: Condvar,
84    shutdown: AtomicBool,
85}
86
87impl RequestQueue {
88    pub(crate) fn new() -> Self {
89        RequestQueue {
90            queue: Mutex::new(VecDeque::new()),
91            condvar: Condvar::new(),
92            shutdown: AtomicBool::new(false),
93        }
94    }
95
96    fn is_shutdown(&self) -> bool {
97        self.shutdown.load(Ordering::Acquire)
98    }
99
100    fn mark_shutdown(&self) {
101        if !self.shutdown.swap(true, Ordering::AcqRel) {
102            self.condvar.notify_all();
103        }
104    }
105}
106
107#[derive(Clone)]
108pub(crate) struct RequestSender {
109    queue: Arc<RequestQueue>,
110}
111
112impl fmt::Debug for RequestSender {
113    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114        f.debug_struct("RequestSender").finish()
115    }
116}
117
118impl RequestSender {
119    pub(crate) fn new(queue: Arc<RequestQueue>) -> Self {
120        RequestSender { queue }
121    }
122
123    fn send(&self, request: DiskRequest) -> Result<(), DiskRequest> {
124        if self.queue.is_shutdown() {
125            return Err(request);
126        }
127
128        let mut guard = self.queue.queue.lock().unwrap();
129        if self.queue.is_shutdown() {
130            return Err(request);
131        }
132
133        guard.push_back(request);
134        self.queue.condvar.notify_one();
135        Ok(())
136    }
137
138    fn close(&self) {
139        self.queue.mark_shutdown();
140    }
141}
142
143#[derive(Clone)]
144pub(crate) struct RequestReceiver {
145    queue: Arc<RequestQueue>,
146}
147
148impl RequestReceiver {
149    pub(crate) fn new(queue: Arc<RequestQueue>) -> Self {
150        RequestReceiver { queue }
151    }
152
153    pub(crate) fn try_recv(&self) -> Option<DiskRequest> {
154        let mut guard = self.queue.queue.lock().unwrap();
155        guard.pop_front()
156    }
157
158    pub(crate) fn recv(&self) -> Option<DiskRequest> {
159        let mut guard = self.queue.queue.lock().unwrap();
160        loop {
161            if let Some(request) = guard.pop_front() {
162                return Some(request);
163            }
164
165            if self.queue.is_shutdown() {
166                return None;
167            }
168
169            guard = self.queue.condvar.wait(guard).unwrap();
170        }
171    }
172
173    pub(crate) fn is_shutdown(&self) -> bool {
174        self.queue.is_shutdown()
175    }
176}
177
178impl DiskScheduler {
179    pub fn new(disk_manager: Arc<DiskManager>) -> Self {
180        Self::new_with_config(disk_manager, IOSchedulerConfig::default())
181    }
182
183    pub fn new_with_config(disk_manager: Arc<DiskManager>, config: IOSchedulerConfig) -> Self {
184        #[cfg(target_os = "linux")]
185        let (request_sender, worker_threads) = spawn_runtime(disk_manager.clone(), config.clone());
186
187        #[cfg(not(target_os = "linux"))]
188        let (request_sender, worker_threads) =
189            block_io::spawn_runtime(disk_manager.clone(), config.clone());
190
191        DiskScheduler {
192            request_sender,
193            worker_threads,
194            config,
195        }
196    }
197
198    // --- Public methods to send requests ---
199
200    pub fn schedule_read(
201        &self,
202        page_id: PageId,
203    ) -> QuillSQLResult<DiskCommandResultReceiver<BytesMut>> {
204        let (tx, rx) = mpsc::channel();
205        self.request_sender
206            .send(DiskRequest::ReadPage {
207                page_id,
208                result_sender: tx,
209            })
210            .map_err(|_| {
211                QuillSQLError::Internal(
212                    "Failed to enqueue Read request: scheduler shutting down".to_string(),
213                )
214            })?;
215        Ok(rx)
216    }
217
218    pub fn schedule_write(
219        &self,
220        page_id: PageId,
221        data: Bytes,
222    ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
223        let (tx, rx) = mpsc::channel();
224        self.request_sender
225            .send(DiskRequest::WritePage {
226                page_id,
227                data,
228                result_sender: tx,
229            })
230            .map_err(|_| {
231                QuillSQLError::Internal(
232                    "Failed to enqueue Write request: scheduler shutting down".to_string(),
233                )
234            })?;
235        Ok(rx)
236    }
237
238    pub fn schedule_wal_write(
239        &self,
240        path: PathBuf,
241        offset: u64,
242        data: Bytes,
243        sync: bool,
244    ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
245        let (tx, rx) = mpsc::channel();
246        self.request_sender
247            .send(DiskRequest::WriteWal {
248                path,
249                offset,
250                data,
251                sync,
252                result_sender: tx,
253            })
254            .map_err(|_| {
255                QuillSQLError::Internal(
256                    "Failed to enqueue WAL write request: scheduler shutting down".to_string(),
257                )
258            })?;
259        Ok(rx)
260    }
261
262    pub fn schedule_wal_fsync(
263        &self,
264        path: PathBuf,
265    ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
266        let (tx, rx) = mpsc::channel();
267        self.request_sender
268            .send(DiskRequest::FsyncWal {
269                path,
270                result_sender: tx,
271            })
272            .map_err(|_| {
273                QuillSQLError::Internal(
274                    "Failed to enqueue WAL fsync request: scheduler shutting down".to_string(),
275                )
276            })?;
277        Ok(rx)
278    }
279
280    pub fn schedule_read_pages(
281        &self,
282        page_ids: Vec<PageId>,
283    ) -> QuillSQLResult<DiskCommandResultReceiver<Vec<BytesMut>>> {
284        let (tx, rx) = mpsc::channel();
285        self.request_sender
286            .send(DiskRequest::ReadPages {
287                page_ids,
288                result_sender: tx,
289            })
290            .map_err(|_| {
291                QuillSQLError::Internal(
292                    "Failed to enqueue ReadPages request: scheduler shutting down".to_string(),
293                )
294            })?;
295        Ok(rx)
296    }
297
298    // removed schedule_write_pages_contiguous
299
300    pub fn schedule_allocate(&self) -> QuillSQLResult<mpsc::Receiver<QuillSQLResult<PageId>>> {
301        let (tx, rx) = mpsc::channel();
302        self.request_sender
303            .send(DiskRequest::AllocatePage { result_sender: tx })
304            .map_err(|_| {
305                QuillSQLError::Internal(
306                    "Failed to enqueue Allocate request: scheduler shutting down".to_string(),
307                )
308            })?;
309        Ok(rx)
310    }
311
312    pub fn schedule_deallocate(
313        &self,
314        page_id: PageId,
315    ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
316        let (tx, rx) = mpsc::channel();
317        self.request_sender
318            .send(DiskRequest::DeallocatePage {
319                page_id,
320                result_sender: tx,
321            })
322            .map_err(|_| {
323                QuillSQLError::Internal(
324                    "Failed to enqueue Deallocate request: scheduler shutting down".to_string(),
325                )
326            })?;
327        Ok(rx)
328    }
329}
330
331// Implement Drop for graceful shutdown
332impl Drop for DiskScheduler {
333    fn drop(&mut self) {
334        for _ in 0..self.config.workers {
335            let _ = self.request_sender.send(DiskRequest::Shutdown);
336        }
337        self.request_sender.close();
338        for handle in self.worker_threads.drain(..) {
339            if let Err(e) = handle.join() {
340                log::error!("Disk worker thread panicked: {:?}", e);
341            }
342        }
343    }
344}
345
346#[cfg(target_os = "linux")]
347fn spawn_runtime(
348    disk_manager: Arc<DiskManager>,
349    config: IOSchedulerConfig,
350) -> (RequestSender, Vec<thread::JoinHandle<()>>) {
351    let worker_count = config.workers;
352    let queue = Arc::new(RequestQueue::new());
353    let sender = RequestSender::new(queue.clone());
354
355    let mut worker_threads = Vec::with_capacity(worker_count);
356    for i in 0..worker_count {
357        let dm = disk_manager.clone();
358        let worker_config = config;
359        let entries = worker_config.iouring_queue_depth as u32;
360        let fixed_count = worker_config.iouring_fixed_buffers;
361        let sqpoll_idle = worker_config.iouring_sqpoll_idle_ms;
362        let fsync_on_write = worker_config.fsync_on_write;
363        let rx = RequestReceiver::new(queue.clone());
364        let handle = thread::Builder::new()
365            .name(format!("disk-scheduler-iouring-worker-{}", i))
366            .spawn(move || {
367                io_uring::worker_loop(rx, dm, entries, fixed_count, sqpoll_idle, fsync_on_write);
368            })
369            .expect("Failed to spawn DiskScheduler io_uring worker thread");
370        worker_threads.push(handle);
371    }
372
373    (sender, worker_threads)
374}
375
// --- Tests for DiskScheduler ---
#[cfg(test)]
mod tests {
    use super::DiskManager;
    use super::*;
    use crate::buffer::PAGE_SIZE;
    use crate::error::QuillSQLResult;
    use bytes::{Bytes, BytesMut};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;
    use tempfile::TempDir;

    // Helper to create a scheduler with temp directory
    fn create_test_scheduler() -> (TempDir, Arc<DiskScheduler>, Arc<DiskManager>) {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
        let scheduler = Arc::new(DiskScheduler::new(dm.clone()));
        (temp_dir, scheduler, dm)
    }

    // Helper to create dummy page data as Bytes
    fn create_dummy_page_bytes(content: &str) -> Bytes {
        let mut data = BytesMut::zeroed(PAGE_SIZE);
        let content_bytes = content.as_bytes();
        let len = std::cmp::min(content_bytes.len(), PAGE_SIZE);
        data[..len].copy_from_slice(&content_bytes[..len]);
        data.freeze() // Convert to Bytes
    }

    // Helper to read content back from BytesMut (up to the first NUL byte)
    fn read_page_content(data: &BytesMut) -> String {
        let first_null = data.iter().position(|&b| b == 0).unwrap_or(data.len());
        String::from_utf8_lossy(&data[..first_null]).to_string()
    }

    // Helper that blocks on a result channel and flattens a channel failure
    // and an operation failure into one `QuillSQLResult`.
    fn wait_for<T>(rx: DiskCommandResultReceiver<T>) -> QuillSQLResult<T> {
        rx.recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))?
    }

    #[test]
    fn test_scheduler_allocate_write_read() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        // allocate page
        let page_id = wait_for(scheduler.schedule_allocate()?)?;

        // write page
        let content = "Hello DiskScheduler!";
        wait_for(scheduler.schedule_write(page_id, create_dummy_page_bytes(content))?)?;

        // read and verify data
        let read_result = wait_for(scheduler.schedule_read(page_id)?)?;
        assert_eq!(read_page_content(&read_result), content);

        Ok(())
    }

    #[test]
    fn test_scheduler_deallocate() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, dm) = create_test_scheduler();

        // allocate page and write data
        let page_id = wait_for(scheduler.schedule_allocate()?)?;
        wait_for(scheduler.schedule_write(page_id, create_dummy_page_bytes("Test Data"))?)?;

        // free page
        wait_for(scheduler.schedule_deallocate(page_id)?)?;

        // verify deallocation by attempting to read (should return zeroed data)
        let data_after_dealloc = dm.read_page(page_id)?;
        assert!(data_after_dealloc.iter().all(|&b| b == 0));

        Ok(())
    }

    #[test]
    fn test_concurrent_operations() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        // Create the page under test
        let page_id = wait_for(scheduler.schedule_allocate()?)?;
        wait_for(
            scheduler.schedule_write(page_id, create_dummy_page_bytes("Concurrent Test"))?,
        )?;

        // Start several concurrent reader threads
        let mut handles = vec![];
        let num_threads = 10;

        for i in 0..num_threads {
            let scheduler_clone = scheduler.clone();
            let handle = thread::spawn(move || {
                // Stagger the threads slightly so that their reads overlap
                thread::sleep(Duration::from_millis(i * 5));

                scheduler_clone
                    .schedule_read(page_id)
                    .map_err(|e| e.to_string())
                    .and_then(|rx| rx.recv().map_err(|e| e.to_string()))
                    .and_then(|res| res.map_err(|e| e.to_string()))
            });
            handles.push(handle);
        }

        // Every thread must have read the data correctly
        for handle in handles {
            match handle.join().unwrap() {
                Ok(read_data) => assert_eq!(read_page_content(&read_data), "Concurrent Test"),
                Err(e) => panic!("Concurrent read thread failed: {}", e),
            }
        }

        Ok(())
    }

    #[test]
    fn test_mixed_operations() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        // Allocate several pages
        let mut page_ids = vec![];
        let num_pages = 5;

        for _ in 0..num_pages {
            page_ids.push(wait_for(scheduler.schedule_allocate()?)?);
        }

        // Write to and read back each page
        for (i, &page_id) in page_ids.iter().enumerate() {
            let content = format!("Page {} content", i);

            // Write
            wait_for(scheduler.schedule_write(page_id, create_dummy_page_bytes(&content))?)?;

            // Read and verify
            let read_data = wait_for(scheduler.schedule_read(page_id)?)?;
            assert_eq!(read_page_content(&read_data), content);
        }

        // Deallocate some of the pages
        for &page_id in page_ids.iter().take(2) {
            wait_for(scheduler.schedule_deallocate(page_id)?)?;
        }

        Ok(())
    }

    #[test]
    fn test_scheduler_shutdown() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        // Keep a producer handle so that a request can still be attempted
        // after the scheduler itself is gone.
        let sender = scheduler.request_sender.clone();

        // The scheduler accepts and serves requests while it is alive.
        wait_for(scheduler.schedule_allocate()?)?;

        // Dropping the only owner runs `Drop`, which closes the queue and
        // joins the workers.
        assert_eq!(Arc::strong_count(&scheduler), 1);
        drop(scheduler);

        // Once shut down, the queue must reject new requests.
        let (tx, _rx) = mpsc::channel();
        let rejected = sender
            .send(DiskRequest::AllocatePage { result_sender: tx })
            .is_err();
        assert!(rejected, "request was accepted after scheduler shutdown");

        Ok(())
    }

    #[test]
    fn test_large_data_transfer() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        // Allocate a page
        let page_id = wait_for(scheduler.schedule_allocate()?)?;

        // Build a payload close to the page size limit
        let large_string = "X".repeat(PAGE_SIZE - 100);
        let large_data = create_dummy_page_bytes(&large_string);

        // Write the large payload
        wait_for(scheduler.schedule_write(page_id, large_data)?)?;

        // Read it back
        let read_result = wait_for(scheduler.schedule_read(page_id)?)?;

        // Compare length and a prefix rather than the whole string
        let read_content = read_page_content(&read_result);
        assert_eq!(read_content.len(), large_string.len());
        assert_eq!(&read_content[0..10], &large_string[0..10]);

        Ok(())
    }

    #[cfg(target_os = "linux")]
    #[test]
    fn test_iouring_allocate_write_read() -> QuillSQLResult<()> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
        let mut cfg = IOSchedulerConfig::default();
        cfg.iouring_queue_depth = 256;
        let scheduler = Arc::new(DiskScheduler::new_with_config(dm.clone(), cfg));

        // allocate
        let page_id = wait_for(scheduler.schedule_allocate()?)?;

        // write
        let mut data = BytesMut::zeroed(PAGE_SIZE);
        data[..4].copy_from_slice(&[1, 2, 3, 4]);
        wait_for(scheduler.schedule_write(page_id, data.freeze())?)?;

        // read
        let read = wait_for(scheduler.schedule_read(page_id)?)?;
        assert_eq!(&read[..4], &[1, 2, 3, 4]);
        Ok(())
    }

    #[cfg(target_os = "linux")]
    #[test]
    fn test_iouring_concurrent_reads() -> QuillSQLResult<()> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
        let mut cfg = IOSchedulerConfig::default();
        cfg.iouring_queue_depth = 256;
        let scheduler = Arc::new(DiskScheduler::new_with_config(dm.clone(), cfg));

        let page_id = wait_for(scheduler.schedule_allocate()?)?;

        wait_for(scheduler.schedule_write(page_id, {
            let mut b = BytesMut::zeroed(PAGE_SIZE);
            b[..13].copy_from_slice(b"Hello, World!");
            b.freeze()
        })?)?;

        let mut handles = vec![];
        for _ in 0..8u32 {
            let s = scheduler.clone();
            handles.push(thread::spawn(move || {
                let data = s
                    .schedule_read(page_id)
                    .map_err(|e| e.to_string())
                    .and_then(|rx| rx.recv().map_err(|e| e.to_string()))
                    .and_then(|res| res.map_err(|e| e.to_string()))?;
                if &data[..13] != b"Hello, World!" {
                    return Err("mismatch".into());
                }
                Ok::<(), String>(())
            }));
        }
        for h in handles {
            h.join().unwrap().unwrap();
        }
        Ok(())
    }
}