//! quill_sql/storage/disk_scheduler.rs
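//!
//! `DiskScheduler` runs disk I/O on background threads: callers enqueue
//! `DiskRequest`s over an mpsc channel, a dispatcher thread forwards them to
//! worker threads (or to an io_uring backend on Linux), and every request
//! reports its result back on a dedicated per-request channel.
//!
//! A minimal usage sketch (`demo.db` and the page contents are illustrative;
//! `PAGE_SIZE` comes from `crate::buffer`):
//!
//! ```ignore
//! use bytes::Bytes;
//! use std::sync::Arc;
//!
//! let disk_manager = Arc::new(DiskManager::try_new(std::path::PathBuf::from("demo.db"))?);
//! let scheduler = DiskScheduler::new(disk_manager);
//!
//! // Each schedule_* call returns a receiver; recv() blocks until a worker
//! // has completed the corresponding I/O.
//! let page_id = scheduler.schedule_allocate()?.recv().unwrap()?;
//! scheduler
//!     .schedule_write(page_id, Bytes::from(vec![0u8; PAGE_SIZE]))?
//!     .recv()
//!     .unwrap()?;
//! let page = scheduler.schedule_read(page_id)?.recv().unwrap()?;
//! assert_eq!(page.len(), PAGE_SIZE);
//! ```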

use super::disk_manager::DiskManager;
use crate::buffer::PageId;
use crate::config::{IOSchedulerConfig, IOStrategy};
use crate::error::{QuillSQLError, QuillSQLResult};
use crate::storage::io::thread_pool;
use crate::storage::io::IOBackend; // trait facade
use bytes::{Bytes, BytesMut};
use std::path::PathBuf;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Arc;
use std::thread;

#[derive(Debug)]
pub enum DiskError {
    Io(std::io::Error),
    Cancelled,
}

pub enum DiskResponse {
    Read { data: BytesMut },
    Write,
    Allocate { page_id: PageId },
    Error(QuillSQLError),
}
// IOStrategy and IOSchedulerConfig moved to crate::config

// Type alias for the sender part of the result channel
pub type DiskCommandResultSender<T> = Sender<QuillSQLResult<T>>;
// Type alias for the receiver part of the result channel
pub type DiskCommandResultReceiver<T> = Receiver<QuillSQLResult<T>>;

// Commands sent from BufferPoolManager to the DiskScheduler task
#[derive(Debug, Clone)]
pub enum DiskRequest {
    ReadPage {
        page_id: PageId,
        result_sender: DiskCommandResultSender<BytesMut>,
    },
    /// Read the given pages; buffers are returned in the same order as `page_ids`
    ReadPages {
        page_ids: Vec<PageId>,
        result_sender: DiskCommandResultSender<Vec<BytesMut>>,
    },
    WritePage {
        page_id: PageId,
        data: Bytes,
        result_sender: DiskCommandResultSender<()>,
    },
    AllocatePage {
        result_sender: Sender<QuillSQLResult<PageId>>,
    },
    DeallocatePage {
        page_id: PageId,
        result_sender: DiskCommandResultSender<()>,
    },
    WriteWal {
        path: PathBuf,
        offset: u64,
        bytes: Bytes,
        sync: bool,
        result_sender: DiskCommandResultSender<()>,
    },
    ReadWal {
        path: PathBuf,
        offset: u64,
        len: usize,
        result_sender: DiskCommandResultSender<Vec<u8>>,
    },
    Shutdown,
}

// Structure that manages the background I/O threads
#[derive(Debug)]
pub struct DiskScheduler {
    request_sender: Sender<DiskRequest>,
    // Dispatcher thread receives all requests and forwards to workers
    dispatcher_thread: Option<thread::JoinHandle<()>>,
    // Worker threads execute actual I/O tasks concurrently
    worker_threads: Vec<thread::JoinHandle<()>>,
    /// Centralized runtime configuration
    pub config: IOSchedulerConfig,
}

impl DiskScheduler {
    pub fn new(disk_manager: Arc<DiskManager>) -> Self {
        Self::new_with_config(disk_manager, IOSchedulerConfig::default())
    }

    pub fn new_with_config(disk_manager: Arc<DiskManager>, config: IOSchedulerConfig) -> Self {
        let (request_sender, dispatcher_thread, worker_threads) =
            thread_pool::start(disk_manager, config);

        DiskScheduler {
            request_sender,
            dispatcher_thread: Some(dispatcher_thread),
            worker_threads,
            config,
        }
    }

    /// Create a scheduler with an explicit I/O strategy. On Linux, `IoUring`
    /// uses the io_uring backend; on other platforms it logs a warning and
    /// falls back to the thread-pool backend to preserve compatibility.
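    ///
    /// A sketch of explicit backend selection (the worker count and queue
    /// depth below are illustrative values):
    ///
    /// ```ignore
    /// // Thread-pool backend with a fixed number of workers.
    /// let scheduler = DiskScheduler::new_with_strategy(
    ///     disk_manager.clone(),
    ///     IOStrategy::ThreadPool { workers: Some(4) },
    /// );
    ///
    /// // io_uring backend; effective on Linux, otherwise falls back.
    /// let scheduler = DiskScheduler::new_with_strategy(
    ///     disk_manager,
    ///     IOStrategy::IoUring { queue_depth: Some(256) },
    /// );
    /// ```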
    pub fn new_with_strategy(disk_manager: Arc<DiskManager>, strategy: IOStrategy) -> Self {
        match strategy {
            IOStrategy::ThreadPool { workers } => {
                let mut cfg = IOSchedulerConfig::default();
                if let Some(w) = workers {
                    cfg.workers = w;
                }
                Self::new_with_config(disk_manager, cfg)
            }
            IOStrategy::IoUring { queue_depth } => {
                Self::new_with_iouring(disk_manager, queue_depth)
            }
        }
    }

    #[cfg(not(target_os = "linux"))]
    fn new_with_iouring(disk_manager: Arc<DiskManager>, _queue_depth: Option<usize>) -> Self {
        eprintln!("WARN: IoUring selected on non-Linux platform; falling back to thread-pool");
        Self::new(disk_manager)
    }

    #[cfg(target_os = "linux")]
    fn new_with_iouring(disk_manager: Arc<DiskManager>, queue_depth: Option<usize>) -> Self {
        let mut config = IOSchedulerConfig::default();
        if let Some(q) = queue_depth {
            config.iouring_queue_depth = q;
        }
        let (request_sender, dispatcher_thread, worker_threads) =
            crate::storage::io::iouring::start(disk_manager, config);

        DiskScheduler {
            request_sender,
            dispatcher_thread: Some(dispatcher_thread),
            worker_threads,
            config,
        }
    }

    // --- Public methods to send requests ---

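    /// Enqueue a single-page read. The returned receiver yields the page
    /// buffer (or an error) once a worker completes the I/O; `recv()` blocks,
    /// so callers may submit several requests before collecting results.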
    pub fn schedule_read(
        &self,
        page_id: PageId,
    ) -> QuillSQLResult<DiskCommandResultReceiver<BytesMut>> {
        let (tx, rx) = mpsc::channel();
        self.request_sender
            .send(DiskRequest::ReadPage {
                page_id,
                result_sender: tx,
            })
            .map_err(|e| QuillSQLError::Internal(format!("Failed to send Read request: {}", e)))?;
        Ok(rx)
    }

    pub fn schedule_write(
        &self,
        page_id: PageId,
        data: Bytes,
    ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
        let (tx, rx) = mpsc::channel();
        self.request_sender
            .send(DiskRequest::WritePage {
                page_id,
                data,
                result_sender: tx,
            })
            .map_err(|e| QuillSQLError::Internal(format!("Failed to send Write request: {}", e)))?;
        Ok(rx)
    }

    pub fn schedule_read_pages(
        &self,
        page_ids: Vec<PageId>,
    ) -> QuillSQLResult<DiskCommandResultReceiver<Vec<BytesMut>>> {
        let (tx, rx) = mpsc::channel();
        self.request_sender
            .send(DiskRequest::ReadPages {
                page_ids,
                result_sender: tx,
            })
            .map_err(|e| {
                QuillSQLError::Internal(format!("Failed to send ReadPages request: {}", e))
            })?;
        Ok(rx)
    }

    // removed schedule_write_pages_contiguous

    pub fn schedule_allocate(&self) -> QuillSQLResult<Receiver<QuillSQLResult<PageId>>> {
        let (tx, rx) = mpsc::channel();
        self.request_sender
            .send(DiskRequest::AllocatePage { result_sender: tx })
            .map_err(|e| {
                QuillSQLError::Internal(format!("Failed to send Allocate request: {}", e))
            })?;
        Ok(rx)
    }

    pub fn schedule_deallocate(
        &self,
        page_id: PageId,
    ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
        let (tx, rx) = mpsc::channel();
        self.request_sender
            .send(DiskRequest::DeallocatePage {
                page_id,
                result_sender: tx,
            })
            .map_err(|e| {
                QuillSQLError::Internal(format!("Failed to send Deallocate request: {}", e))
            })?;
        Ok(rx)
    }

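    /// Enqueue a WAL write of `bytes` at `offset` within the file at `path`.
    /// When `sync` is true, the worker is expected to durably sync the write
    /// before signalling completion.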
    pub fn schedule_wal_write(
        &self,
        path: PathBuf,
        offset: u64,
        bytes: Bytes,
        sync: bool,
    ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
        let (tx, rx) = mpsc::channel();
        self.request_sender
            .send(DiskRequest::WriteWal {
                path,
                offset,
                bytes,
                sync,
                result_sender: tx,
            })
            .map_err(|e| {
                QuillSQLError::Internal(format!("Failed to send WAL write request: {}", e))
            })?;
        Ok(rx)
    }

    pub fn schedule_wal_read(
        &self,
        path: PathBuf,
        offset: u64,
        len: usize,
    ) -> QuillSQLResult<DiskCommandResultReceiver<Vec<u8>>> {
        let (tx, rx) = mpsc::channel();
        self.request_sender
            .send(DiskRequest::ReadWal {
                path,
                offset,
                len,
                result_sender: tx,
            })
            .map_err(|e| {
                QuillSQLError::Internal(format!("Failed to send WAL read request: {}", e))
            })?;
        Ok(rx)
    }
}

// Implement Drop for graceful shutdown
impl Drop for DiskScheduler {
    fn drop(&mut self) {
        // println!("DEBUG: DiskScheduler dropping. Sending Shutdown signal...");
        // Send shutdown signal. Ignore error if channel already closed.
        let _ = self.request_sender.send(DiskRequest::Shutdown);

        // Join dispatcher first
        if let Some(handle) = self.dispatcher_thread.take() {
            if let Err(e) = handle.join() {
                log::error!("Disk dispatcher thread panicked: {:?}", e);
            }
        }

        // Then join all workers
        for handle in self.worker_threads.drain(..) {
            if let Err(e) = handle.join() {
                log::error!("Disk worker thread panicked: {:?}", e);
            }
        }
    }
}

// ---- IOBackend facade for DiskScheduler (compatibility shim) ----
impl IOBackend for DiskScheduler {
    fn schedule_read(
        &self,
        page_id: PageId,
    ) -> QuillSQLResult<DiskCommandResultReceiver<BytesMut>> {
        Self::schedule_read(self, page_id)
    }

    fn schedule_read_pages(
        &self,
        page_ids: Vec<PageId>,
    ) -> QuillSQLResult<DiskCommandResultReceiver<Vec<BytesMut>>> {
        Self::schedule_read_pages(self, page_ids)
    }

    fn schedule_write(
        &self,
        page_id: PageId,
        data: Bytes,
    ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
        Self::schedule_write(self, page_id, data)
    }

    fn schedule_allocate(&self) -> QuillSQLResult<Receiver<QuillSQLResult<PageId>>> {
        Self::schedule_allocate(self)
    }

    fn schedule_deallocate(
        &self,
        page_id: PageId,
    ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
        Self::schedule_deallocate(self, page_id)
    }
}

// --- Tests for DiskScheduler ---
#[cfg(test)]
mod tests {
    use super::DiskManager;
    use super::*;
    use crate::buffer::PAGE_SIZE;
    use crate::error::QuillSQLResult;
    use bytes::{Bytes, BytesMut};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;
    use tempfile::TempDir;

    // Helper to create a scheduler with temp directory
    fn create_test_scheduler() -> (TempDir, Arc<DiskScheduler>, Arc<DiskManager>) {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
        let scheduler = Arc::new(DiskScheduler::new(dm.clone()));
        (temp_dir, scheduler, dm)
    }

    // Helper to create dummy page data as Bytes
    fn create_dummy_page_bytes(content: &str) -> Bytes {
        let mut data = BytesMut::zeroed(PAGE_SIZE);
        let content_bytes = content.as_bytes();
        let len = std::cmp::min(content_bytes.len(), PAGE_SIZE);
        data[..len].copy_from_slice(&content_bytes[..len]);
        data.freeze() // Convert to Bytes
    }

    // Helper to read content back from BytesMut
    fn read_page_content(data: &BytesMut) -> String {
        let first_null = data.iter().position(|&b| b == 0).unwrap_or(data.len());
        String::from_utf8_lossy(&data[..first_null]).to_string()
    }

    #[test]
    fn test_scheduler_allocate_write_read() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        // allocate page
        let rx_alloc = scheduler.schedule_allocate()?;
        let page_id = rx_alloc
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // write page
        let content = "Hello DiskScheduler!";
        let data_bytes = create_dummy_page_bytes(content);
        let rx_write = scheduler.schedule_write(page_id, data_bytes)?;
        rx_write
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // read and verify data
        let rx_read = scheduler.schedule_read(page_id)?;
        let read_result = rx_read
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
        assert_eq!(read_page_content(&read_result), content);

        Ok(())
    }

    #[test]
    fn test_scheduler_deallocate() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, dm) = create_test_scheduler();

        // allocate page and write data
        let page_id = scheduler
            .schedule_allocate()?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        scheduler
            .schedule_write(page_id, create_dummy_page_bytes("Test Data"))?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // free page
        let rx_dealloc = scheduler.schedule_deallocate(page_id)?;
        rx_dealloc
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // verify deallocation by attempting to read (should return zeroed data)
        let data_after_dealloc = dm.read_page(page_id)?;
        assert!(data_after_dealloc.iter().all(|&b| b == 0));

        Ok(())
    }

    #[test]
    fn test_concurrent_operations() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        // Create a test page
        let page_id = scheduler
            .schedule_allocate()?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        scheduler
            .schedule_write(page_id, create_dummy_page_bytes("Concurrent Test"))?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // Spawn multiple concurrent reader threads
        let mut handles = vec![];
        let num_threads = 10; // number of concurrent readers

        for i in 0..num_threads {
            let scheduler_clone = scheduler.clone();
            let handle = thread::spawn(move || {
                // Stagger each thread slightly to increase interleaving
                thread::sleep(Duration::from_millis(i * 5));

                scheduler_clone
                    .schedule_read(page_id)
                    .map_err(|e| e.to_string())
                    .and_then(|rx| rx.recv().map_err(|e| e.to_string()))
                    .and_then(|res| res.map_err(|e| e.to_string()))
            });
            handles.push(handle);
        }

        // Verify that every thread read the data correctly
        for handle in handles {
            match handle.join().unwrap() {
                Ok(read_data) => assert_eq!(read_page_content(&read_data), "Concurrent Test"),
                Err(e) => panic!("Concurrent read thread failed: {}", e),
            }
        }

        Ok(())
    }

    #[test]
    fn test_mixed_operations() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        // Allocate several pages
        let mut page_ids = vec![];
        let num_pages = 5;

        for _ in 0..num_pages {
            let page_id = scheduler
                .schedule_allocate()?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
            page_ids.push(page_id);
        }

        // Write then read back each page
        for (i, &page_id) in page_ids.iter().enumerate() {
            let content = format!("Page {} content", i);

            // Write
            scheduler
                .schedule_write(page_id, create_dummy_page_bytes(&content))?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

            // Read back and verify
            let read_data = scheduler
                .schedule_read(page_id)?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

            assert_eq!(read_page_content(&read_data), content);
        }

        // Deallocate a subset of the pages
        for &page_id in page_ids.iter().take(2) {
            scheduler
                .schedule_deallocate(page_id)?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
        }

        Ok(())
    }

    #[test]
    fn test_scheduler_shutdown() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();
        let scheduler_arc = scheduler;

        // Spawn a background thread that issues a request after the scheduler shuts down
        let scheduler_clone = scheduler_arc.clone();
        let handle = thread::spawn(move || {
            // Give the main thread time to drop the scheduler
            thread::sleep(Duration::from_millis(100));

            // Allocating a page after shutdown is expected to fail
            scheduler_clone
                .schedule_allocate()
                .map_err(|e| e.to_string())
                .and_then(|rx| rx.recv().map_err(|e| e.to_string()))
                .and_then(|res| res.map_err(|e| e.to_string()))
        });

        // Shut down the scheduler
        drop(scheduler_arc);

        // Check the background thread's result
        match handle.join().unwrap() {
            Ok(page_id) => println!("Thread completed after shutdown: {:?}", page_id),
            Err(e) => println!("Thread failed as expected after shutdown: {}", e),
        }

        Ok(())
    }

    #[test]
    fn test_large_data_transfer() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        // Allocate a page
        let page_id = scheduler
            .schedule_allocate()?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // Build a payload close to the page-size limit
        let large_string = "X".repeat(PAGE_SIZE - 100);
        let large_data = create_dummy_page_bytes(&large_string);

        // Write the large payload
        scheduler
            .schedule_write(page_id, large_data)?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // Read it back and verify
        let read_result = scheduler
            .schedule_read(page_id)?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // Check the length and a prefix rather than comparing the full string
        let read_content = read_page_content(&read_result);
        assert_eq!(read_content.len(), large_string.len());
        assert_eq!(&read_content[0..10], &large_string[0..10]); // check prefix

        Ok(())
    }

    #[cfg(target_os = "linux")]
    #[test]
    fn test_iouring_allocate_write_read() -> QuillSQLResult<()> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
        let scheduler = Arc::new(DiskScheduler::new_with_strategy(
            dm.clone(),
            IOStrategy::IoUring {
                queue_depth: Some(256),
            },
        ));

        // allocate
        let rx_alloc = scheduler.schedule_allocate()?;
        let page_id = rx_alloc
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // write
        let mut data = BytesMut::zeroed(PAGE_SIZE);
        data[..4].copy_from_slice(&[1, 2, 3, 4]);
        scheduler
            .schedule_write(page_id, data.freeze())?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // read
        let read = scheduler
            .schedule_read(page_id)?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
        assert_eq!(&read[..4], &[1, 2, 3, 4]);
        Ok(())
    }

    #[cfg(target_os = "linux")]
    #[test]
    fn test_iouring_concurrent_reads() -> QuillSQLResult<()> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
        let scheduler = Arc::new(DiskScheduler::new_with_strategy(
            dm.clone(),
            IOStrategy::IoUring {
                queue_depth: Some(256),
            },
        ));

        let page_id = scheduler
            .schedule_allocate()?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        scheduler
            .schedule_write(page_id, {
                let mut b = BytesMut::zeroed(PAGE_SIZE);
                b[..13].copy_from_slice(b"Hello, World!");
                b.freeze()
            })?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let mut handles = vec![];
        for _ in 0..8u32 {
            let s = scheduler.clone();
            handles.push(thread::spawn(move || {
                let data = s
                    .schedule_read(page_id)
                    .map_err(|e| e.to_string())
                    .and_then(|rx| rx.recv().map_err(|e| e.to_string()))
                    .and_then(|res| res.map_err(|e| e.to_string()))?;
                if &data[..13] != b"Hello, World!" {
                    return Err("mismatch".into());
                }
                Ok::<(), String>(())
            }));
        }
        for h in handles {
            h.join().unwrap().unwrap();
        }
        Ok(())
    }
}