1use super::disk_manager::DiskManager;
2use crate::buffer::PageId;
3use crate::config::IOSchedulerConfig;
4use crate::error::{QuillSQLError, QuillSQLResult};
5use bytes::{Bytes, BytesMut};
6use std::path::PathBuf;
7use std::sync::mpsc::{self, Receiver, Sender};
8use std::sync::Arc;
9use std::thread;
10
11#[cfg(target_os = "linux")]
12use crate::storage::io::io_uring;
13
/// Low-level disk failure classification.
///
/// NOTE(review): not referenced anywhere in this chunk — request errors are
/// reported as `QuillSQLError` over the per-request result channels instead.
/// Confirm whether another module consumes this before extending it.
#[derive(Debug)]
pub enum DiskError {
    /// Underlying OS-level I/O failure.
    Io(std::io::Error),
    /// The operation was abandoned before completion.
    Cancelled,
}
19
/// Possible successful (or failed) outcomes of a `DiskRequest`.
///
/// NOTE(review): not referenced elsewhere in this chunk — workers appear to
/// reply through each request's embedded `result_sender` channel rather than
/// via this type; verify it is still used before relying on it.
pub enum DiskResponse {
    /// Completed read carrying the page bytes.
    Read { data: BytesMut },
    /// Completed write (no payload).
    Write,
    /// Completed allocation carrying the new page id.
    Allocate { page_id: PageId },
    /// The request failed with the wrapped error.
    Error(QuillSQLError),
}
/// Sending half a worker uses to publish the outcome of one request.
pub type DiskCommandResultSender<T> = Sender<QuillSQLResult<T>>;
/// Receiving half handed back to the caller of a `schedule_*` method.
pub type DiskCommandResultReceiver<T> = Receiver<QuillSQLResult<T>>;
/// A single I/O command submitted to the scheduler.
///
/// Each data-carrying variant embeds the channel on which the servicing
/// worker reports its result back to the waiting caller.
#[derive(Debug, Clone)]
pub enum DiskRequest {
    /// Read the page identified by `page_id`.
    ReadPage {
        page_id: PageId,
        result_sender: DiskCommandResultSender<BytesMut>,
    },
    /// Batched read of every page in `page_ids`.
    ReadPages {
        page_ids: Vec<PageId>,
        result_sender: DiskCommandResultSender<Vec<BytesMut>>,
    },
    /// Overwrite the page identified by `page_id` with `data`.
    WritePage {
        page_id: PageId,
        data: Bytes,
        result_sender: DiskCommandResultSender<()>,
    },
    /// Write WAL bytes at `offset` within the file at `path`.
    /// NOTE(review): `sync` presumably forces durability before the result is
    /// reported — confirm against `io_uring::worker_loop`.
    WriteWal {
        path: PathBuf,
        offset: u64,
        data: Bytes,
        sync: bool,
        result_sender: DiskCommandResultSender<()>,
    },
    /// Fsync the WAL file at `path`.
    FsyncWal {
        path: PathBuf,
        result_sender: DiskCommandResultSender<()>,
    },
    /// Allocate a fresh page, replying with its id.
    AllocatePage {
        result_sender: Sender<QuillSQLResult<PageId>>,
    },
    /// Release the page identified by `page_id`.
    DeallocatePage {
        page_id: PageId,
        result_sender: DiskCommandResultSender<()>,
    },
    /// Terminate the dispatcher, which fans the signal out to all workers.
    Shutdown,
}
68
/// Thread-based disk I/O scheduler backed by io_uring workers (Linux only).
///
/// Requests enter through `request_sender`; a dispatcher thread fans them out
/// round-robin to the worker threads, and each caller receives its result over
/// a dedicated per-request channel.
#[derive(Debug)]
pub struct DiskScheduler {
    // Entry point for all `DiskRequest`s; consumed by the dispatcher thread.
    request_sender: Sender<DiskRequest>,
    // Join handle for the dispatcher; taken (set to `None`) during drop.
    dispatcher_thread: Option<thread::JoinHandle<()>>,
    // Join handles for the io_uring worker threads, joined during drop.
    worker_threads: Vec<thread::JoinHandle<()>>,
    // Configuration the scheduler was built with, exposed to callers.
    pub config: IOSchedulerConfig,
}
77
78impl DiskScheduler {
79 pub fn new(disk_manager: Arc<DiskManager>) -> Self {
80 Self::new_with_config(disk_manager, IOSchedulerConfig::default())
81 }
82
83 pub fn new_with_config(disk_manager: Arc<DiskManager>, config: IOSchedulerConfig) -> Self {
84 #[cfg(target_os = "linux")]
85 {
86 let (request_sender, dispatcher_thread, worker_threads) =
87 spawn_runtime(disk_manager.clone(), config);
88 return DiskScheduler {
89 request_sender,
90 dispatcher_thread: Some(dispatcher_thread),
91 worker_threads,
92 config,
93 };
94 }
95
96 #[cfg(not(target_os = "linux"))]
97 {
98 panic!("io_uring disk scheduler requires Linux");
99 }
100 }
101
102 pub fn schedule_read(
105 &self,
106 page_id: PageId,
107 ) -> QuillSQLResult<DiskCommandResultReceiver<BytesMut>> {
108 let (tx, rx) = mpsc::channel();
109 self.request_sender
110 .send(DiskRequest::ReadPage {
111 page_id,
112 result_sender: tx,
113 })
114 .map_err(|e| QuillSQLError::Internal(format!("Failed to send Read request: {}", e)))?;
115 Ok(rx)
116 }
117
118 pub fn schedule_write(
119 &self,
120 page_id: PageId,
121 data: Bytes,
122 ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
123 let (tx, rx) = mpsc::channel();
124 self.request_sender
125 .send(DiskRequest::WritePage {
126 page_id,
127 data,
128 result_sender: tx,
129 })
130 .map_err(|e| QuillSQLError::Internal(format!("Failed to send Write request: {}", e)))?;
131 Ok(rx)
132 }
133
134 pub fn schedule_wal_write(
135 &self,
136 path: PathBuf,
137 offset: u64,
138 data: Bytes,
139 sync: bool,
140 ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
141 let (tx, rx) = mpsc::channel();
142 self.request_sender
143 .send(DiskRequest::WriteWal {
144 path,
145 offset,
146 data,
147 sync,
148 result_sender: tx,
149 })
150 .map_err(|e| {
151 QuillSQLError::Internal(format!("Failed to send WAL write request: {}", e))
152 })?;
153 Ok(rx)
154 }
155
156 pub fn schedule_wal_fsync(
157 &self,
158 path: PathBuf,
159 ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
160 let (tx, rx) = mpsc::channel();
161 self.request_sender
162 .send(DiskRequest::FsyncWal {
163 path,
164 result_sender: tx,
165 })
166 .map_err(|e| {
167 QuillSQLError::Internal(format!("Failed to send WAL fsync request: {}", e))
168 })?;
169 Ok(rx)
170 }
171
172 pub fn schedule_read_pages(
173 &self,
174 page_ids: Vec<PageId>,
175 ) -> QuillSQLResult<DiskCommandResultReceiver<Vec<BytesMut>>> {
176 let (tx, rx) = mpsc::channel();
177 self.request_sender
178 .send(DiskRequest::ReadPages {
179 page_ids,
180 result_sender: tx,
181 })
182 .map_err(|e| {
183 QuillSQLError::Internal(format!("Failed to send ReadPages request: {}", e))
184 })?;
185 Ok(rx)
186 }
187
188 pub fn schedule_allocate(&self) -> QuillSQLResult<Receiver<QuillSQLResult<PageId>>> {
191 let (tx, rx) = mpsc::channel();
192 self.request_sender
193 .send(DiskRequest::AllocatePage { result_sender: tx })
194 .map_err(|e| {
195 QuillSQLError::Internal(format!("Failed to send Allocate request: {}", e))
196 })?;
197 Ok(rx)
198 }
199
200 pub fn schedule_deallocate(
201 &self,
202 page_id: PageId,
203 ) -> QuillSQLResult<DiskCommandResultReceiver<()>> {
204 let (tx, rx) = mpsc::channel();
205 self.request_sender
206 .send(DiskRequest::DeallocatePage {
207 page_id,
208 result_sender: tx,
209 })
210 .map_err(|e| {
211 QuillSQLError::Internal(format!("Failed to send Deallocate request: {}", e))
212 })?;
213 Ok(rx)
214 }
215}
216
217impl Drop for DiskScheduler {
219 fn drop(&mut self) {
220 let _ = self.request_sender.send(DiskRequest::Shutdown);
221 if let Some(handle) = self.dispatcher_thread.take() {
222 if let Err(e) = handle.join() {
223 log::error!("Disk dispatcher thread panicked: {:?}", e);
224 }
225 }
226 for handle in self.worker_threads.drain(..) {
227 if let Err(e) = handle.join() {
228 log::error!("Disk worker thread panicked: {:?}", e);
229 }
230 }
231 }
232}
233
234#[cfg(target_os = "linux")]
235fn spawn_runtime(
236 disk_manager: Arc<DiskManager>,
237 config: IOSchedulerConfig,
238) -> (
239 Sender<DiskRequest>,
240 thread::JoinHandle<()>,
241 Vec<thread::JoinHandle<()>>,
242) {
243 let worker_count = config.workers;
244 let (request_sender, request_receiver) = mpsc::channel::<DiskRequest>();
245
246 let mut worker_senders = Vec::with_capacity(worker_count);
247 let mut worker_threads = Vec::with_capacity(worker_count);
248 for i in 0..worker_count {
249 let (tx, rx) = mpsc::channel::<DiskRequest>();
250 worker_senders.push(tx);
251 let dm = disk_manager.clone();
252 let worker_config = config;
253 let entries = worker_config.iouring_queue_depth as u32;
254 let fixed_count = worker_config.iouring_fixed_buffers;
255 let sqpoll_idle = worker_config.iouring_sqpoll_idle_ms;
256 let fsync_on_write = worker_config.fsync_on_write;
257 let handle = thread::Builder::new()
258 .name(format!("disk-scheduler-iouring-worker-{}", i))
259 .spawn(move || {
260 io_uring::worker_loop(rx, dm, entries, fixed_count, sqpoll_idle, fsync_on_write);
261 })
262 .expect("Failed to spawn DiskScheduler io_uring worker thread");
263 worker_threads.push(handle);
264 }
265
266 let dispatcher_thread = thread::Builder::new()
267 .name("disk-scheduler-dispatcher".to_string())
268 .spawn(move || dispatcher_loop(request_receiver, worker_senders))
269 .expect("Failed to spawn DiskScheduler dispatcher thread");
270
271 (request_sender, dispatcher_thread, worker_threads)
272}
273
274#[cfg(target_os = "linux")]
275fn dispatcher_loop(receiver: Receiver<DiskRequest>, workers: Vec<Sender<DiskRequest>>) {
276 log::debug!("DiskScheduler dispatcher started");
277 let mut rr = 0usize;
278 let worker_len = workers.len();
279 while let Ok(req) = receiver.recv() {
280 match req {
281 DiskRequest::Shutdown => {
282 for tx in &workers {
283 let _ = tx.send(DiskRequest::Shutdown);
284 }
285 break;
286 }
287 other => {
288 if worker_len == 0 {
289 log::error!("No io_uring workers available");
290 break;
291 }
292 let idx = rr % worker_len;
293 rr = rr.wrapping_add(1);
294 if workers[idx].send(other).is_err() {
295 log::error!("Failed to forward request to io_uring worker");
296 }
297 }
298 }
299 }
300 log::debug!("DiskScheduler dispatcher finished");
301}
302
#[cfg(test)]
mod tests {
    use super::DiskManager;
    use super::*;
    use crate::buffer::PAGE_SIZE;
    use crate::error::QuillSQLResult;
    use bytes::{Bytes, BytesMut};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;
    use tempfile::TempDir;

    // Builds a scheduler over a fresh temp-dir database file. The TempDir is
    // returned so it outlives the test (dropping it deletes the directory).
    fn create_test_scheduler() -> (TempDir, Arc<DiskScheduler>, Arc<DiskManager>) {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
        let scheduler = Arc::new(DiskScheduler::new(dm.clone()));
        (temp_dir, scheduler, dm)
    }

    // Produces a full zeroed page with `content` copied into its prefix
    // (truncated to PAGE_SIZE if longer).
    fn create_dummy_page_bytes(content: &str) -> Bytes {
        let mut data = BytesMut::zeroed(PAGE_SIZE);
        let content_bytes = content.as_bytes();
        let len = std::cmp::min(content_bytes.len(), PAGE_SIZE);
        data[..len].copy_from_slice(&content_bytes[..len]);
        data.freeze()
    }

    // Recovers the string prefix of a page: everything before the first NUL
    // byte, lossily decoded as UTF-8.
    fn read_page_content(data: &BytesMut) -> String {
        let first_null = data.iter().position(|&b| b == 0).unwrap_or(data.len());
        String::from_utf8_lossy(&data[..first_null]).to_string()
    }

    // Round-trip: allocate a page, write known content, read it back.
    #[test]
    fn test_scheduler_allocate_write_read() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        let rx_alloc = scheduler.schedule_allocate()?;
        // Double `?`: outer for the channel recv, inner for the I/O result.
        let page_id = rx_alloc
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let content = "Hello DiskScheduler!";
        let data_bytes = create_dummy_page_bytes(content);
        let rx_write = scheduler.schedule_write(page_id, data_bytes)?;
        rx_write
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let rx_read = scheduler.schedule_read(page_id)?;
        let read_result = rx_read
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
        assert_eq!(read_page_content(&read_result), content);

        Ok(())
    }

    // Deallocating a written page should leave it fully zeroed on disk.
    #[test]
    fn test_scheduler_deallocate() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, dm) = create_test_scheduler();

        let page_id = scheduler
            .schedule_allocate()?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        scheduler
            .schedule_write(page_id, create_dummy_page_bytes("Test Data"))?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let rx_dealloc = scheduler.schedule_deallocate(page_id)?;
        rx_dealloc
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        // Read through the DiskManager directly to verify the on-disk state.
        let data_after_dealloc = dm.read_page(page_id)?;
        assert!(data_after_dealloc.iter().all(|&b| b == 0));

        Ok(())
    }

    // Many threads reading the same page concurrently must all observe the
    // same content.
    #[test]
    fn test_concurrent_operations() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        let page_id = scheduler
            .schedule_allocate()?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        scheduler
            .schedule_write(page_id, create_dummy_page_bytes("Concurrent Test"))?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let mut handles = vec![];
        let num_threads = 10;
        for i in 0..num_threads {
            let scheduler_clone = scheduler.clone();
            let handle = thread::spawn(move || {
                // Stagger thread start times to vary interleavings.
                thread::sleep(Duration::from_millis(i * 5));

                scheduler_clone
                    .schedule_read(page_id)
                    .map_err(|e| e.to_string())
                    .and_then(|rx| rx.recv().map_err(|e| e.to_string()))
                    .and_then(|res| res.map_err(|e| e.to_string()))
            });
            handles.push(handle);
        }

        for handle in handles {
            match handle.join().unwrap() {
                Ok(read_data) => assert_eq!(read_page_content(&read_data), "Concurrent Test"),
                Err(e) => panic!("Concurrent read thread failed: {}", e),
            }
        }

        Ok(())
    }

    // Interleaved allocate / write / read / deallocate across several pages.
    #[test]
    fn test_mixed_operations() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        let mut page_ids = vec![];
        let num_pages = 5;

        for _ in 0..num_pages {
            let page_id = scheduler
                .schedule_allocate()?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
            page_ids.push(page_id);
        }

        for (i, &page_id) in page_ids.iter().enumerate() {
            let content = format!("Page {} content", i);

            scheduler
                .schedule_write(page_id, create_dummy_page_bytes(&content))?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

            let read_data = scheduler
                .schedule_read(page_id)?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

            assert_eq!(read_page_content(&read_data), content);
        }

        // Deallocate only the first two pages; the rest stay live.
        for &page_id in page_ids.iter().take(2) {
            scheduler
                .schedule_deallocate(page_id)?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
        }

        Ok(())
    }

    // Dropping the scheduler while another thread holds a clone: the spawned
    // thread still owns an Arc, so the scheduler is not actually destroyed
    // until that thread finishes. Either outcome of the allocate is accepted.
    #[test]
    fn test_scheduler_shutdown() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();
        let scheduler_arc = scheduler;

        let scheduler_clone = scheduler_arc.clone();
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(100));

            scheduler_clone
                .schedule_allocate()
                .map_err(|e| e.to_string())
                .and_then(|rx| rx.recv().map_err(|e| e.to_string()))
                .and_then(|res| res.map_err(|e| e.to_string()))
        });

        // Only drops this handle's refcount; Drop runs when the clone dies.
        drop(scheduler_arc);

        match handle.join().unwrap() {
            Ok(page_id) => println!("Thread completed after shutdown: {:?}", page_id),
            Err(e) => println!("Thread failed as expected after shutdown: {}", e),
        }

        Ok(())
    }

    // Near-page-sized payload survives a write/read round trip intact.
    #[test]
    fn test_large_data_transfer() -> QuillSQLResult<()> {
        let (_temp_dir, scheduler, _dm) = create_test_scheduler();

        let page_id = scheduler
            .schedule_allocate()?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let large_string = "X".repeat(PAGE_SIZE - 100);
        let large_data = create_dummy_page_bytes(&large_string);

        scheduler
            .schedule_write(page_id, large_data)?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let read_result = scheduler
            .schedule_read(page_id)?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let read_content = read_page_content(&read_result);
        assert_eq!(read_content.len(), large_string.len());
        assert_eq!(&read_content[0..10], &large_string[0..10]);
        Ok(())
    }

    // Linux-only: exercise the io_uring path with a larger queue depth.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_iouring_allocate_write_read() -> QuillSQLResult<()> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
        let mut cfg = IOSchedulerConfig::default();
        cfg.iouring_queue_depth = 256;
        let scheduler = Arc::new(DiskScheduler::new_with_config(dm.clone(), cfg));

        let rx_alloc = scheduler.schedule_allocate()?;
        let page_id = rx_alloc
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let mut data = BytesMut::zeroed(PAGE_SIZE);
        data[..4].copy_from_slice(&[1, 2, 3, 4]);
        scheduler
            .schedule_write(page_id, data.freeze())?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let read = scheduler
            .schedule_read(page_id)?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;
        assert_eq!(&read[..4], &[1, 2, 3, 4]);
        Ok(())
    }

    // Linux-only: concurrent reads through the io_uring backend.
    #[cfg(target_os = "linux")]
    #[test]
    fn test_iouring_concurrent_reads() -> QuillSQLResult<()> {
        let temp_dir = TempDir::new().expect("unable to create temporary working directory");
        let dm = Arc::new(DiskManager::try_new(temp_dir.path().join("test.db")).unwrap());
        let mut cfg = IOSchedulerConfig::default();
        cfg.iouring_queue_depth = 256;
        let scheduler = Arc::new(DiskScheduler::new_with_config(dm.clone(), cfg));

        let page_id = scheduler
            .schedule_allocate()?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        scheduler
            .schedule_write(page_id, {
                let mut b = BytesMut::zeroed(PAGE_SIZE);
                b[..13].copy_from_slice(b"Hello, World!");
                b.freeze()
            })?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("RecvError: {}", e)))??;

        let mut handles = vec![];
        for _ in 0..8u32 {
            let s = scheduler.clone();
            handles.push(thread::spawn(move || {
                let data = s
                    .schedule_read(page_id)
                    .map_err(|e| e.to_string())
                    .and_then(|rx| rx.recv().map_err(|e| e.to_string()))
                    .and_then(|res| res.map_err(|e| e.to_string()))?;
                if &data[..13] != b"Hello, World!" {
                    return Err("mismatch".into());
                }
                Ok::<(), String>(())
            }));
        }
        for h in handles {
            h.join().unwrap().unwrap();
        }
        Ok(())
    }
}