// quill_sql/storage/disk_scheduler.rs

use super::disk_manager::DiskManager;
use crate::buffer::PageId;
use crate::config::IOSchedulerConfig;
use crate::error::{QuillSQLError, QuillSQLResult};
use bytes::{Bytes, BytesMut};
use std::path::PathBuf;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Arc;
use std::thread;

#[cfg(target_os = "linux")]
use crate::storage::io::io_uring;

#[derive(Debug)]
pub enum DiskError {
    Io(std::io::Error),
    Cancelled,
}

pub enum DiskResponse {
    Read { data: BytesMut },
    Write,
    Allocate { page_id: PageId },
    Error(QuillSQLError),
}

// Type alias for the sender part of the result channel
pub type DiskCommandResultSender<T> = Sender<QuillSQLResult<T>>;
// Type alias for the receiver part of the result channel
pub type DiskCommandResultReceiver<T> = Receiver<QuillSQLResult<T>>;
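
// The per-request result-channel pattern in one line (a sketch; the payload type
// depends on the request kind, `BytesMut` is just the single-page read case):
//
//     let (tx, rx): (DiskCommandResultSender<BytesMut>, DiskCommandResultReceiver<BytesMut>) =
//         mpsc::channel();
//     // `tx` travels inside a DiskRequest; the caller later blocks on `rx.recv()`.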

// Commands sent from BufferManager to the DiskScheduler task
#[derive(Debug, Clone)]
pub enum DiskRequest {
    ReadPage {
        page_id: PageId,
        result_sender: DiskCommandResultSender<BytesMut>,
    },
    /// Read the given pages; buffers are returned in the same order as `page_ids`
    ReadPages {
        page_ids: Vec<PageId>,
        result_sender: DiskCommandResultSender<Vec<BytesMut>>,
    },
    WritePage {
        page_id: PageId,
        data: Bytes,
        result_sender: DiskCommandResultSender<()>,
    },
    WriteWal {
        path: PathBuf,
        offset: u64,
        data: Bytes,
        sync: bool,
        result_sender: DiskCommandResultSender<()>,
    },
    FsyncWal {
        path: PathBuf,
        result_sender: DiskCommandResultSender<()>,
    },
    AllocatePage {
        result_sender: DiskCommandResultSender<PageId>,
    },
    DeallocatePage {
        page_id: PageId,
        result_sender: DiskCommandResultSender<()>,
    },
    Shutdown,
}
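
// How a request and its reply travel (a sketch; `request_sender` and `page_id`
// stand in for real values, and the worker is expected to send one reply per
// request):
//
//     let (tx, rx) = mpsc::channel();
//     request_sender.send(DiskRequest::ReadPage { page_id, result_sender: tx })?;
//     let reply: QuillSQLResult<BytesMut> = rx
//         .recv()
//         .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))?;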

// Owns the background I/O threads: one dispatcher plus a pool of io_uring workers
#[derive(Debug)]
pub struct DiskScheduler {
    request_sender: Sender<DiskRequest>,
    dispatcher_thread: Option<thread::JoinHandle<()>>,
    worker_threads: Vec<thread::JoinHandle<()>>,
    pub config: IOSchedulerConfig,
}
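
// Typical call sequence (a minimal sketch; `disk_manager` and `page_bytes` are
// assumed to exist already, e.g. a `DiskManager::try_new(...)` handle and a
// PAGE_SIZE `Bytes` buffer, as in the tests below):
//
//     let scheduler = DiskScheduler::new(disk_manager.clone());
//     let page_id = scheduler
//         .schedule_allocate()?
//         .recv()
//         .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
//     scheduler
//         .schedule_write(page_id, page_bytes)?
//         .recv()
//         .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
//     let data: BytesMut = scheduler
//         .schedule_read(page_id)?
//         .recv()
//         .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;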

impl DiskScheduler {
    pub fn new(disk_manager: Arc<DiskManager>) -> Self {
        Self::new_with_config(disk_manager, IOSchedulerConfig::default())
    }

    pub fn new_with_config(disk_manager: Arc<DiskManager>, config: IOSchedulerConfig) -> Self {
        #[cfg(target_os = "linux")]
        {
            let (request_sender, dispatcher_thread, worker_threads) =
                spawn_runtime(disk_manager.clone(), config);
            return DiskScheduler {
                request_sender,
                dispatcher_thread: Some(dispatcher_thread),
                worker_threads,
                config,
            };
        }

        #[cfg(not(target_os = "linux"))]
        {
            panic!("io_uring disk scheduler requires Linux");
        }
    }

    // --- Public methods to send requests ---

    pub fn schedule_read(
        &self,
        page_id: PageId,
    ) -> QuillSQLResult<DiskCommandResultReceiver<BytesMut>> {
        let (tx, rx) = mpsc::channel();
        self.request_sender
            .send(DiskRequest::ReadPage {
                page_id,
                result_sender: tx,
            })
            .map_err(|e| QuillSQLError::Internal(format!("Failed to send Read request: {}", e)))?;
        Ok(rx)
    }

    pub fn schedule_write(
        &self,
        page_id: PageId,
        data: Bytes,
    ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
        let (tx, rx) = mpsc::channel();
        self.request_sender
            .send(DiskRequest::WritePage {
                page_id,
                data,
                result_sender: tx,
            })
            .map_err(|e| QuillSQLError::Internal(format!("Failed to send Write request: {}", e)))?;
        Ok(rx)
    }

    pub fn schedule_wal_write(
        &self,
        path: PathBuf,
        offset: u64,
        data: Bytes,
        sync: bool,
    ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
        let (tx, rx) = mpsc::channel();
        self.request_sender
            .send(DiskRequest::WriteWal {
                path,
                offset,
                data,
                sync,
                result_sender: tx,
            })
            .map_err(|e| {
                QuillSQLError::Internal(format!("Failed to send WAL write request: {}", e))
            })?;
        Ok(rx)
    }

    pub fn schedule_wal_fsync(
        &self,
        path: PathBuf,
    ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
        let (tx, rx) = mpsc::channel();
        self.request_sender
            .send(DiskRequest::FsyncWal {
                path,
                result_sender: tx,
            })
            .map_err(|e| {
                QuillSQLError::Internal(format!("Failed to send WAL fsync request: {}", e))
            })?;
        Ok(rx)
    }
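
    // WAL usage sketch (`wal_path`, `offset`, and `record` are placeholders kept
    // by the caller; `sync = true` is assumed to ask the worker to flush right
    // after the append):
    //
    //     scheduler
    //         .schedule_wal_write(wal_path.clone(), offset, record, true)?
    //         .recv()
    //         .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
    //     // Or append without sync and request the fsync separately:
    //     scheduler
    //         .schedule_wal_fsync(wal_path)?
    //         .recv()
    //         .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;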

    pub fn schedule_read_pages(
        &self,
        page_ids: Vec<PageId>,
    ) -> QuillSQLResult<DiskCommandResultReceiver<Vec<BytesMut>>> {
        let (tx, rx) = mpsc::channel();
        self.request_sender
            .send(DiskRequest::ReadPages {
                page_ids,
                result_sender: tx,
            })
            .map_err(|e| {
                QuillSQLError::Internal(format!("Failed to send ReadPages request: {}", e))
            })?;
        Ok(rx)
    }
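
    // Batched read sketch (`a`, `b`, `c` are placeholder page ids; buffers come
    // back in the same order as the ids were passed in):
    //
    //     let pages: Vec<BytesMut> = scheduler
    //         .schedule_read_pages(vec![a, b, c])?
    //         .recv()
    //         .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
    //     assert_eq!(pages.len(), 3); // pages[0] belongs to `a`, and so on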

    // removed schedule_write_pages_contiguous

    pub fn schedule_allocate(&self) -> QuillSQLResult<DiskCommandResultReceiver<PageId>> {
        let (tx, rx) = mpsc::channel();
        self.request_sender
            .send(DiskRequest::AllocatePage { result_sender: tx })
            .map_err(|e| {
                QuillSQLError::Internal(format!("Failed to send Allocate request: {}", e))
            })?;
        Ok(rx)
    }

    pub fn schedule_deallocate(
        &self,
        page_id: PageId,
    ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
        let (tx, rx) = mpsc::channel();
        self.request_sender
            .send(DiskRequest::DeallocatePage {
                page_id,
                result_sender: tx,
            })
            .map_err(|e| {
                QuillSQLError::Internal(format!("Failed to send Deallocate request: {}", e))
            })?;
        Ok(rx)
    }
}

// Graceful shutdown: send Shutdown, then join the dispatcher and every worker
impl Drop for DiskScheduler {
    fn drop(&mut self) {
        let _ = self.request_sender.send(DiskRequest::Shutdown);
        if let Some(handle) = self.dispatcher_thread.take() {
            if let Err(e) = handle.join() {
                log::error!("Disk dispatcher thread panicked: {:?}", e);
            }
        }
        for handle in self.worker_threads.drain(..) {
            if let Err(e) = handle.join() {
                log::error!("Disk worker thread panicked: {:?}", e);
            }
        }
    }
}

#[cfg(target_os = "linux")]
fn spawn_runtime(
    disk_manager: Arc<DiskManager>,
    config: IOSchedulerConfig,
) -> (
    Sender<DiskRequest>,
    thread::JoinHandle<()>,
    Vec<thread::JoinHandle<()>>,
) {
    let worker_count = config.workers;
    let (request_sender, request_receiver) = mpsc::channel::<DiskRequest>();

    let mut worker_senders = Vec::with_capacity(worker_count);
    let mut worker_threads = Vec::with_capacity(worker_count);
    for i in 0..worker_count {
        let (tx, rx) = mpsc::channel::<DiskRequest>();
        worker_senders.push(tx);
        let dm = disk_manager.clone();
        let worker_config = config;
        let entries = worker_config.iouring_queue_depth as u32;
        let fixed_count = worker_config.iouring_fixed_buffers;
        let sqpoll_idle = worker_config.iouring_sqpoll_idle_ms;
        let fsync_on_write = worker_config.fsync_on_write;
        let handle = thread::Builder::new()
            .name(format!("disk-scheduler-iouring-worker-{}", i))
            .spawn(move || {
                io_uring::worker_loop(rx, dm, entries, fixed_count, sqpoll_idle, fsync_on_write);
            })
            .expect("Failed to spawn DiskScheduler io_uring worker thread");
        worker_threads.push(handle);
    }

    let dispatcher_thread = thread::Builder::new()
        .name("disk-scheduler-dispatcher".to_string())
        .spawn(move || dispatcher_loop(request_receiver, worker_senders))
        .expect("Failed to spawn DiskScheduler dispatcher thread");

    (request_sender, dispatcher_thread, worker_threads)
}
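
// Config sketch (field names as read in `spawn_runtime` above; the values are
// illustrative only, not recommendations):
//
//     let mut cfg = IOSchedulerConfig::default();
//     cfg.workers = 2;                  // io_uring worker threads
//     cfg.iouring_queue_depth = 256;    // submission-queue entries per worker
//     let scheduler = DiskScheduler::new_with_config(disk_manager, cfg);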

#[cfg(target_os = "linux")]
fn dispatcher_loop(receiver: Receiver<DiskRequest>, workers: Vec<Sender<DiskRequest>>) {
    log::debug!("DiskScheduler dispatcher started");
    let mut rr = 0usize;
    let worker_len = workers.len();
    while let Ok(req) = receiver.recv() {
        match req {
            DiskRequest::Shutdown => {
                for tx in &workers {
                    let _ = tx.send(DiskRequest::Shutdown);
                }
                break;
            }
            other => {
                if worker_len == 0 {
                    log::error!("No io_uring workers available");
                    break;
                }
                let idx = rr % worker_len;
                rr = rr.wrapping_add(1);
                if workers[idx].send(other).is_err() {
                    log::error!("Failed to forward request to io_uring worker");
                }
            }
        }
    }
    log::debug!("DiskScheduler dispatcher finished");
}
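
// Round-robin illustration: with three workers the dispatch index runs
// 0, 1, 2, 0, 1, 2, ...; `wrapping_add` keeps the counter well-defined even if
// it ever overflows `usize`.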

// --- Tests for DiskScheduler ---
#[cfg(test)]
mod tests {
    use super::DiskManager;
    use super::*;
    use crate::buffer::PAGE_SIZE;
    use crate::error::QuillSQLResult;
    use bytes::{Bytes, BytesMut};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;
    use tempfile::TempDir;

    // Helper to create a scheduler with temp directory
    fn create_test_scheduler() -> (TempDir, Arc<DiskScheduler>, Arc<DiskManager>) {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
        let scheduler = Arc::new(DiskScheduler::new(dm.clone()));
        (temp_dir, scheduler, dm)
    }

    // Helper to create dummy page data as Bytes
    fn create_dummy_page_bytes(content: &str) -> Bytes {
        let mut data = BytesMut::zeroed(PAGE_SIZE);
        let content_bytes = content.as_bytes();
        let len = std::cmp::min(content_bytes.len(), PAGE_SIZE);
        data[..len].copy_from_slice(&content_bytes[..len]);
        data.freeze() // Convert to Bytes
    }

    // Helper to read content back from BytesMut
    fn read_page_content(data: &BytesMut) -> String {
        let first_null = data.iter().position(|&b| b == 0).unwrap_or(data.len());
        String::from_utf8_lossy(&data[..first_null]).to_string()
    }

    #[test]
    fn test_scheduler_allocate_write_read() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        // allocate a page
        let rx_alloc = scheduler.schedule_allocate()?;
        let page_id = rx_alloc
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // write the page
        let content = "Hello DiskScheduler!";
        let data_bytes = create_dummy_page_bytes(content);
        let rx_write = scheduler.schedule_write(page_id, data_bytes)?;
        rx_write
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // read and verify data
        let rx_read = scheduler.schedule_read(page_id)?;
        let read_result = rx_read
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
        assert_eq!(read_page_content(&read_result), content);

        Ok(())
    }

    #[test]
    fn test_scheduler_deallocate() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, dm) = create_test_scheduler();

        // allocate page and write data
        let page_id = scheduler
            .schedule_allocate()?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        scheduler
            .schedule_write(page_id, create_dummy_page_bytes("Test Data"))?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // free page
        let rx_dealloc = scheduler.schedule_deallocate(page_id)?;
        rx_dealloc
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // verify deallocation by attempting to read (should return zeroed data)
        let data_after_dealloc = dm.read_page(page_id)?;
        assert!(data_after_dealloc.iter().all(|&b| b == 0));

        Ok(())
    }

    #[test]
    fn test_concurrent_operations() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        // create a test page
        let page_id = scheduler
            .schedule_allocate()?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        scheduler
            .schedule_write(page_id, create_dummy_page_bytes("Concurrent Test"))?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // spawn several concurrent reader threads
        let mut handles = vec![];
        let num_threads = 10; // a larger thread count to raise contention

        for i in 0..num_threads {
            let scheduler_clone = scheduler.clone();
            let handle = thread::spawn(move || {
                // stagger each thread slightly to increase the chance of overlap
                thread::sleep(Duration::from_millis(i * 5));

                scheduler_clone
                    .schedule_read(page_id)
                    .map_err(|e| e.to_string())
                    .and_then(|rx| rx.recv().map_err(|e| e.to_string()))
                    .and_then(|res| res.map_err(|e| e.to_string()))
            });
            handles.push(handle);
        }

        // verify that every thread read the data correctly
        for handle in handles {
            match handle.join().unwrap() {
                Ok(read_data) => assert_eq!(read_page_content(&read_data), "Concurrent Test"),
                Err(e) => panic!("Concurrent read thread failed: {}", e),
            }
        }

        Ok(())
    }

    #[test]
    fn test_mixed_operations() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        // allocate several pages
        let mut page_ids = vec![];
        let num_pages = 5;

        for _ in 0..num_pages {
            let page_id = scheduler
                .schedule_allocate()?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
            page_ids.push(page_id);
        }

        // run a write/read round trip on every page
        for (i, &page_id) in page_ids.iter().enumerate() {
            let content = format!("Page {} content", i);

            // write
            scheduler
                .schedule_write(page_id, create_dummy_page_bytes(&content))?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

            // read back and verify
            let read_data = scheduler
                .schedule_read(page_id)?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

            assert_eq!(read_page_content(&read_data), content);
        }

        // deallocate some of the pages
        for &page_id in page_ids.iter().take(2) {
            scheduler
                .schedule_deallocate(page_id)?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
        }

        Ok(())
    }

    #[test]
    fn test_scheduler_shutdown() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();
        let scheduler_arc = scheduler;

        // spawn a background thread that issues a request after the scheduler is dropped
        let scheduler_clone = scheduler_arc.clone();
        let handle = thread::spawn(move || {
            // give the main thread time to drop the scheduler
            thread::sleep(Duration::from_millis(100));

            // try to allocate a page after the main handle is gone; this may fail
            scheduler_clone
                .schedule_allocate()
                .map_err(|e| e.to_string())
                .and_then(|rx| rx.recv().map_err(|e| e.to_string()))
                .and_then(|res| res.map_err(|e| e.to_string()))
        });

        // drop the scheduler handle
        drop(scheduler_arc);

        // check the background thread's result
        match handle.join().unwrap() {
            Ok(page_id) => println!("Thread completed after shutdown: {:?}", page_id),
            Err(e) => println!("Thread failed as expected after shutdown: {}", e),
        }

        Ok(())
    }

    #[test]
    fn test_large_data_transfer() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        // allocate a page
        let page_id = scheduler
            .schedule_allocate()?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // build a payload close to the page-size limit
        let large_string = "X".repeat(PAGE_SIZE - 100);
        let large_data = create_dummy_page_bytes(&large_string);

        // write the large payload
        scheduler
            .schedule_write(page_id, large_data)?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // read it back and verify
        let read_result = scheduler
            .schedule_read(page_id)?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // compare the length and a prefix rather than the whole string
        let read_content = read_page_content(&read_result);
        assert_eq!(read_content.len(), large_string.len());
        assert_eq!(&read_content[0..10], &large_string[0..10]); // check the prefix

        Ok(())
    }

    #[cfg(target_os = "linux")]
    #[test]
    fn test_iouring_allocate_write_read() -> QuillSQLResult<()> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
        let mut cfg = IOSchedulerConfig::default();
        cfg.iouring_queue_depth = 256;
        let scheduler = Arc::new(DiskScheduler::new_with_config(dm.clone(), cfg));

        // allocate
        let rx_alloc = scheduler.schedule_allocate()?;
        let page_id = rx_alloc
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // write
        let mut data = BytesMut::zeroed(PAGE_SIZE);
        data[..4].copy_from_slice(&[1, 2, 3, 4]);
        scheduler
            .schedule_write(page_id, data.freeze())?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // read
        let read = scheduler
            .schedule_read(page_id)?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
        assert_eq!(&read[..4], &[1, 2, 3, 4]);
        Ok(())
    }

    #[cfg(target_os = "linux")]
    #[test]
    fn test_iouring_concurrent_reads() -> QuillSQLResult<()> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
        let mut cfg = IOSchedulerConfig::default();
        cfg.iouring_queue_depth = 256;
        let scheduler = Arc::new(DiskScheduler::new_with_config(dm.clone(), cfg));

        let page_id = scheduler
            .schedule_allocate()?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        scheduler
            .schedule_write(page_id, {
                let mut b = BytesMut::zeroed(PAGE_SIZE);
                b[..13].copy_from_slice(b"Hello, World!");
                b.freeze()
            })?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let mut handles = vec![];
        for _ in 0..8u32 {
            let s = scheduler.clone();
            handles.push(thread::spawn(move || {
                let data = s
                    .schedule_read(page_id)
                    .map_err(|e| e.to_string())
                    .and_then(|rx| rx.recv().map_err(|e| e.to_string()))
                    .and_then(|res| res.map_err(|e| e.to_string()))?;
                if &data[..13] != b"Hello, World!" {
                    return Err("mismatch".into());
                }
                Ok::<(), String>(())
            }));
        }
        for h in handles {
            h.join().unwrap().unwrap();
        }
        Ok(())
    }
}