use super::disk_manager::DiskManager;
use crate::buffer::PageId;
use crate::config::{IOSchedulerConfig, IOStrategy};
use crate::error::{QuillSQLError, QuillSQLResult};
use crate::storage::io::thread_pool;
use crate::storage::io::IOBackend;
use bytes::{Bytes, BytesMut};
use std::path::PathBuf;
// NOTE(review): this import must NOT be gated on Linux — `Sender`, `Receiver`
// and `mpsc::channel` are used unconditionally (type aliases and every
// `schedule_*` method), so a platform cfg here breaks non-Linux builds.
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Arc;
use std::thread;
13
/// Low-level disk I/O failure surfaced by the scheduler's backends.
#[derive(Debug)]
pub enum DiskError {
    /// The underlying OS-level I/O operation failed.
    Io(std::io::Error),
    /// The request was abandoned before it could complete.
    Cancelled,
}

impl std::fmt::Display for DiskError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DiskError::Io(err) => write!(f, "disk I/O error: {}", err),
            DiskError::Cancelled => write!(f, "disk request cancelled"),
        }
    }
}

impl std::error::Error for DiskError {
    // Expose the wrapped io::Error so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            DiskError::Io(err) => Some(err),
            DiskError::Cancelled => None,
        }
    }
}

impl From<std::io::Error> for DiskError {
    fn from(err: std::io::Error) -> Self {
        DiskError::Io(err)
    }
}
19
/// Reply produced by an I/O backend once a submitted request completes.
pub enum DiskResponse {
    /// Page payload returned by a successful read.
    Read { data: BytesMut },
    /// Acknowledgement of a completed write.
    Write,
    /// Identifier of a freshly allocated page.
    Allocate { page_id: PageId },
    /// The request failed with the wrapped error.
    Error(QuillSQLError),
}
/// Sending half used by the backend to deliver a command's result.
pub type DiskCommandResultSender<T> = Sender<QuillSQLResult<T>>;
/// Receiving half handed back to callers of the `schedule_*` methods.
pub type DiskCommandResultReceiver<T> = Receiver<QuillSQLResult<T>>;
32
/// A command submitted to the disk I/O backend. Every data-carrying variant
/// embeds the sender on which its result must be delivered.
#[derive(Debug, Clone)]
pub enum DiskRequest {
    /// Read a single page identified by `page_id`.
    ReadPage {
        page_id: PageId,
        result_sender: DiskCommandResultSender<BytesMut>,
    },
    /// Batch read; results are delivered as one vector in request order.
    ReadPages {
        page_ids: Vec<PageId>,
        result_sender: DiskCommandResultSender<Vec<BytesMut>>,
    },
    /// Overwrite the page `page_id` with `data`.
    WritePage {
        page_id: PageId,
        data: Bytes,
        result_sender: DiskCommandResultSender<()>,
    },
    /// Allocate a fresh page and report its id.
    AllocatePage {
        result_sender: Sender<QuillSQLResult<PageId>>,
    },
    /// Release the page `page_id` back to the manager.
    DeallocatePage {
        page_id: PageId,
        result_sender: DiskCommandResultSender<()>,
    },
    /// Append `bytes` to the WAL file at `path`/`offset`; `sync` requests a
    /// durability flush before acknowledging.
    WriteWal {
        path: PathBuf,
        offset: u64,
        bytes: Bytes,
        sync: bool,
        result_sender: DiskCommandResultSender<()>,
    },
    /// Read `len` bytes from the WAL file at `path`/`offset`.
    ReadWal {
        path: PathBuf,
        offset: u64,
        len: usize,
        result_sender: DiskCommandResultSender<Vec<u8>>,
    },
    /// Ask the backend to stop accepting work and wind down its threads.
    Shutdown,
}
72
/// Handle over an asynchronous disk I/O backend. Requests are submitted over
/// a channel and answered on per-request result channels; `Drop` performs a
/// graceful shutdown by joining all backend threads.
#[derive(Debug)]
pub struct DiskScheduler {
    /// Channel into the backend's dispatcher.
    request_sender: Sender<DiskRequest>,
    /// Dispatcher thread handle; `Option` so `Drop` can `take()` and join it.
    dispatcher_thread: Option<thread::JoinHandle<()>>,
    /// Worker thread handles, joined on drop.
    worker_threads: Vec<thread::JoinHandle<()>>,
    /// Effective scheduler configuration (exposed for introspection).
    pub config: IOSchedulerConfig,
}
84
85impl DiskScheduler {
86 pub fn new(disk_manager: Arc<DiskManager>) -> Self {
87 Self::new_with_config(disk_manager, IOSchedulerConfig::default())
88 }
89
90 pub fn new_with_config(disk_manager: Arc<DiskManager>, config: IOSchedulerConfig) -> Self {
91 let (request_sender, dispatcher_thread, worker_threads) =
92 thread_pool::start(disk_manager, config);
93
94 DiskScheduler {
95 request_sender,
96 dispatcher_thread: Some(dispatcher_thread),
97 worker_threads,
98 config,
99 }
100 }
101
102 pub fn new_with_strategy(disk_manager: Arc<DiskManager>, strategy: IOStrategy) -> Self {
105 match strategy {
106 IOStrategy::ThreadPool { workers } => {
107 let mut cfg = IOSchedulerConfig::default();
108 if let Some(w) = workers {
109 cfg.workers = w;
110 }
111 Self::new_with_config(disk_manager, cfg)
112 }
113 IOStrategy::IoUring { queue_depth } => {
114 Self::new_with_iouring(disk_manager, queue_depth)
115 }
116 }
117 }
118
119 #[cfg(not(target_os = "linux"))]
120 fn new_with_iouring(disk_manager: Arc<DiskManager>, _queue_depth: Option<usize>) -> Self {
121 eprintln!("WARN: IoUring selected on non-Linux platform; falling back to thread-pool");
122 Self::new(disk_manager)
123 }
124
125 #[cfg(target_os = "linux")]
126 fn new_with_iouring(disk_manager: Arc<DiskManager>, queue_depth: Option<usize>) -> Self {
127 let mut config = IOSchedulerConfig::default();
128 if let Some(q) = queue_depth {
129 config.iouring_queue_depth = q;
130 }
131 let (request_sender, dispatcher_thread, worker_threads) =
132 crate::storage::io::iouring::start(disk_manager, config);
133
134 DiskScheduler {
135 request_sender,
136 dispatcher_thread: Some(dispatcher_thread),
137 worker_threads,
138 config,
139 }
140 }
141
142 pub fn schedule_read(
145 &self,
146 page_id: PageId,
147 ) -> QuillSQLResult<DiskCommandResultReceiver<BytesMut>> {
148 let (tx, rx) = mpsc::channel();
149 self.request_sender
150 .send(DiskRequest::ReadPage {
151 page_id,
152 result_sender: tx,
153 })
154 .map_err(|e| QuillSQLError::Internal(format!("Failed to send Read request: {}", e)))?;
155 Ok(rx)
156 }
157
158 pub fn schedule_write(
159 &self,
160 page_id: PageId,
161 data: Bytes,
162 ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
163 let (tx, rx) = mpsc::channel();
164 self.request_sender
165 .send(DiskRequest::WritePage {
166 page_id,
167 data,
168 result_sender: tx,
169 })
170 .map_err(|e| QuillSQLError::Internal(format!("Failed to send Write request: {}", e)))?;
171 Ok(rx)
172 }
173
174 pub fn schedule_read_pages(
175 &self,
176 page_ids: Vec<PageId>,
177 ) -> QuillSQLResult<DiskCommandResultReceiver<Vec<BytesMut>>> {
178 let (tx, rx) = mpsc::channel();
179 self.request_sender
180 .send(DiskRequest::ReadPages {
181 page_ids,
182 result_sender: tx,
183 })
184 .map_err(|e| {
185 QuillSQLError::Internal(format!("Failed to send ReadPages request: {}", e))
186 })?;
187 Ok(rx)
188 }
189
190 pub fn schedule_allocate(&self) -> QuillSQLResult<Receiver<QuillSQLResult<PageId>>> {
193 let (tx, rx) = mpsc::channel();
194 self.request_sender
195 .send(DiskRequest::AllocatePage { result_sender: tx })
196 .map_err(|e| {
197 QuillSQLError::Internal(format!("Failed to send Allocate request: {}", e))
198 })?;
199 Ok(rx)
200 }
201
202 pub fn schedule_deallocate(
203 &self,
204 page_id: PageId,
205 ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
206 let (tx, rx) = mpsc::channel();
207 self.request_sender
208 .send(DiskRequest::DeallocatePage {
209 page_id,
210 result_sender: tx,
211 })
212 .map_err(|e| {
213 QuillSQLError::Internal(format!("Failed to send Deallocate request: {}", e))
214 })?;
215 Ok(rx)
216 }
217
218 pub fn schedule_wal_write(
219 &self,
220 path: PathBuf,
221 offset: u64,
222 bytes: Bytes,
223 sync: bool,
224 ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
225 let (tx, rx) = mpsc::channel();
226 self.request_sender
227 .send(DiskRequest::WriteWal {
228 path,
229 offset,
230 bytes,
231 sync,
232 result_sender: tx,
233 })
234 .map_err(|e| {
235 QuillSQLError::Internal(format!("Failed to send WAL write request: {}", e))
236 })?;
237 Ok(rx)
238 }
239
240 pub fn schedule_wal_read(
241 &self,
242 path: PathBuf,
243 offset: u64,
244 len: usize,
245 ) -> QuillSQLResult<DiskCommandResultReceiver<Vec<u8>>> {
246 let (tx, rx) = mpsc::channel();
247 self.request_sender
248 .send(DiskRequest::ReadWal {
249 path,
250 offset,
251 len,
252 result_sender: tx,
253 })
254 .map_err(|e| {
255 QuillSQLError::Internal(format!("Failed to send WAL read request: {}", e))
256 })?;
257 Ok(rx)
258 }
259}
260
261impl Drop for DiskScheduler {
263 fn drop(&mut self) {
264 let _ = self.request_sender.send(DiskRequest::Shutdown);
267
268 if let Some(handle) = self.dispatcher_thread.take() {
270 if let Err(e) = handle.join() {
271 log::error!("Disk dispatcher thread panicked: {:?}", e);
272 }
273 }
274
275 for handle in self.worker_threads.drain(..) {
277 if let Err(e) = handle.join() {
278 log::error!("Disk worker thread panicked: {:?}", e);
279 }
280 }
281 }
282}
283
284impl IOBackend for DiskScheduler {
286 fn schedule_read(
287 &self,
288 page_id: PageId,
289 ) -> QuillSQLResult<DiskCommandResultReceiver<BytesMut>> {
290 Self::schedule_read(self, page_id)
291 }
292
293 fn schedule_read_pages(
294 &self,
295 page_ids: Vec<PageId>,
296 ) -> QuillSQLResult<DiskCommandResultReceiver<Vec<BytesMut>>> {
297 Self::schedule_read_pages(self, page_ids)
298 }
299
300 fn schedule_write(
301 &self,
302 page_id: PageId,
303 data: Bytes,
304 ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
305 Self::schedule_write(self, page_id, data)
306 }
307
308 fn schedule_allocate(&self) -> QuillSQLResult<Receiver<QuillSQLResult<PageId>>> {
309 Self::schedule_allocate(self)
310 }
311
312 fn schedule_deallocate(
313 &self,
314 page_id: PageId,
315 ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
316 Self::schedule_deallocate(self, page_id)
317 }
318}
319
#[cfg(test)]
mod tests {
    use super::DiskManager;
    use super::*;
    use crate::buffer::PAGE_SIZE;
    use crate::error::QuillSQLResult;
    use bytes::{Bytes, BytesMut};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;
    use tempfile::TempDir;

    /// Builds a scheduler over a fresh `DiskManager` backed by a temp file.
    /// The `TempDir` is returned so it outlives the test body.
    fn create_test_scheduler() -> (TempDir, Arc<DiskScheduler>, Arc<DiskManager>) {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
        let scheduler = Arc::new(DiskScheduler::new(dm.clone()));
        (temp_dir, scheduler, dm)
    }

    /// Produces a PAGE_SIZE buffer whose prefix is `content` (truncated to a
    /// page if longer) and whose remainder stays zeroed.
    fn create_dummy_page_bytes(content: &str) -> Bytes {
        let mut data = BytesMut::zeroed(PAGE_SIZE);
        let content_bytes = content.as_bytes();
        let len = std::cmp::min(content_bytes.len(), PAGE_SIZE);
        data[..len].copy_from_slice(&content_bytes[..len]);
        data.freeze()
    }

    /// Interprets the page prefix up to the first NUL byte as UTF-8 text.
    fn read_page_content(data: &BytesMut) -> String {
        let first_null = data.iter().position(|&b| b == 0).unwrap_or(data.len());
        String::from_utf8_lossy(&data[..first_null]).to_string()
    }

    /// Happy path: allocate a page, write known content, read it back.
    #[test]
    fn test_scheduler_allocate_write_read() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        let rx_alloc = scheduler.schedule_allocate()?;
        // Double `??`: first unwraps the channel recv, then the I/O result.
        let page_id = rx_alloc
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let content = "Hello DiskScheduler!";
        let data_bytes = create_dummy_page_bytes(content);
        let rx_write = scheduler.schedule_write(page_id, data_bytes)?;
        rx_write
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let rx_read = scheduler.schedule_read(page_id)?;
        let read_result = rx_read
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
        assert_eq!(read_page_content(&read_result), content);

        Ok(())
    }

    /// Deallocating a written page should leave it zero-filled on disk.
    #[test]
    fn test_scheduler_deallocate() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, dm) = create_test_scheduler();

        let page_id = scheduler
            .schedule_allocate()?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        scheduler
            .schedule_write(page_id, create_dummy_page_bytes("Test Data"))?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let rx_dealloc = scheduler.schedule_deallocate(page_id)?;
        rx_dealloc
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // Bypass the scheduler and read directly via the DiskManager to
        // verify the on-disk bytes were cleared.
        let data_after_dealloc = dm.read_page(page_id)?;
        assert!(data_after_dealloc.iter().all(|&b| b == 0));

        Ok(())
    }

    /// Many threads read the same page concurrently; all must observe the
    /// content written before the readers were spawned.
    #[test]
    fn test_concurrent_operations() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        let page_id = scheduler
            .schedule_allocate()?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        scheduler
            .schedule_write(page_id, create_dummy_page_bytes("Concurrent Test"))?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let mut handles = vec![];
        let num_threads = 10;
        for i in 0..num_threads {
            let scheduler_clone = scheduler.clone();
            let handle = thread::spawn(move || {
                // Stagger the readers slightly to interleave their requests.
                thread::sleep(Duration::from_millis(i * 5));

                scheduler_clone
                    .schedule_read(page_id)
                    .map_err(|e| e.to_string())
                    .and_then(|rx| rx.recv().map_err(|e| e.to_string()))
                    .and_then(|res| res.map_err(|e| e.to_string()))
            });
            handles.push(handle);
        }

        for handle in handles {
            match handle.join().unwrap() {
                Ok(read_data) => assert_eq!(read_page_content(&read_data), "Concurrent Test"),
                Err(e) => panic!("Concurrent read thread failed: {}", e),
            }
        }

        Ok(())
    }

    /// Interleaves allocate/write/read/deallocate across several pages.
    #[test]
    fn test_mixed_operations() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        let mut page_ids = vec![];
        let num_pages = 5;

        for _ in 0..num_pages {
            let page_id = scheduler
                .schedule_allocate()?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
            page_ids.push(page_id);
        }

        // Write distinct content to each page and read it straight back.
        for (i, &page_id) in page_ids.iter().enumerate() {
            let content = format!("Page {} content", i);

            scheduler
                .schedule_write(page_id, create_dummy_page_bytes(&content))?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

            let read_data = scheduler
                .schedule_read(page_id)?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

            assert_eq!(read_page_content(&read_data), content);
        }

        // Deallocate a subset to exercise mixed request types.
        for &page_id in page_ids.iter().take(2) {
            scheduler
                .schedule_deallocate(page_id)?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
        }

        Ok(())
    }

    /// Races a late request against scheduler drop. Either outcome (success
    /// or a send/recv error) is acceptable; the test only asserts no panic.
    #[test]
    fn test_scheduler_shutdown() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();
        let scheduler_arc = scheduler;

        let scheduler_clone = scheduler_arc.clone();
        let handle = thread::spawn(move || {
            // Delay so the main thread's drop can run first (best-effort;
            // the race is intentional).
            thread::sleep(Duration::from_millis(100));

            scheduler_clone
                .schedule_allocate()
                .map_err(|e| e.to_string())
                .and_then(|rx| rx.recv().map_err(|e| e.to_string()))
                .and_then(|res| res.map_err(|e| e.to_string()))
        });

        // Drop our handle; the clone inside the thread keeps the Arc alive,
        // so actual shutdown happens when the last clone drops.
        drop(scheduler_arc);

        match handle.join().unwrap() {
            Ok(page_id) => println!("Thread completed after shutdown: {:?}", page_id),
            Err(e) => println!("Thread failed as expected after shutdown: {}", e),
        }

        Ok(())
    }

    /// Round-trips a near-page-sized payload to check nothing is truncated.
    #[test]
    fn test_large_data_transfer() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        let page_id = scheduler
            .schedule_allocate()?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // Leave 100 bytes of zero padding so read_page_content terminates.
        let large_string = "X".repeat(PAGE_SIZE - 100);
        let large_data = create_dummy_page_bytes(&large_string);

        scheduler
            .schedule_write(page_id, large_data)?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let read_result = scheduler
            .schedule_read(page_id)?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let read_content = read_page_content(&read_result);
        assert_eq!(read_content.len(), large_string.len());
        assert_eq!(&read_content[0..10], &large_string[0..10]);
        Ok(())
    }

    /// Same allocate/write/read round-trip, but over the io_uring backend.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_iouring_allocate_write_read() -> QuillSQLResult<()> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
        let scheduler = Arc::new(DiskScheduler::new_with_strategy(
            dm.clone(),
            IOStrategy::IoUring {
                queue_depth: Some(256),
            },
        ));

        let rx_alloc = scheduler.schedule_allocate()?;
        let page_id = rx_alloc
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let mut data = BytesMut::zeroed(PAGE_SIZE);
        data[..4].copy_from_slice(&[1, 2, 3, 4]);
        scheduler
            .schedule_write(page_id, data.freeze())?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let read = scheduler
            .schedule_read(page_id)?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
        assert_eq!(&read[..4], &[1, 2, 3, 4]);
        Ok(())
    }

    /// Concurrent readers against the io_uring backend; all must see the
    /// same previously written bytes.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_iouring_concurrent_reads() -> QuillSQLResult<()> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
        let scheduler = Arc::new(DiskScheduler::new_with_strategy(
            dm.clone(),
            IOStrategy::IoUring {
                queue_depth: Some(256),
            },
        ));

        let page_id = scheduler
            .schedule_allocate()?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        scheduler
            .schedule_write(page_id, {
                let mut b = BytesMut::zeroed(PAGE_SIZE);
                b[..13].copy_from_slice(b"Hello, World!");
                b.freeze()
            })?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let mut handles = vec![];
        for _ in 0..8u32 {
            let s = scheduler.clone();
            handles.push(thread::spawn(move || {
                let data = s
                    .schedule_read(page_id)
                    .map_err(|e| e.to_string())
                    .and_then(|rx| rx.recv().map_err(|e| e.to_string()))
                    .and_then(|res| res.map_err(|e| e.to_string()))?;
                if &data[..13] != b"Hello, World!" {
                    return Err("mismatch".into());
                }
                Ok::<(), String>(())
            }));
        }
        for h in handles {
            h.join().unwrap().unwrap();
        }
        Ok(())
    }
}