1pub mod codec;
2pub mod page;
3
4mod buffer;
5mod io;
6mod record;
7mod storage;
8mod writer;
9
10use parking_lot::{Condvar, Mutex};
11use std::cmp;
12use std::collections::VecDeque;
13use std::fmt;
14use std::fs::{self, File};
15use std::io::{BufReader, ErrorKind, Read};
16use std::path::PathBuf;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::Arc;
19use std::time::Duration;
20
21use crate::buffer::{PageId, PAGE_SIZE};
22use crate::config::WalConfig;
23use crate::error::{QuillSQLError, QuillSQLResult};
24use crate::recovery::control_file::{ControlFileManager, WalInitState};
25use crate::recovery::wal::codec::{
26 decode_frame, encode_body, encode_frame, CheckpointPayload, PageWritePayload,
27 ResourceManagerId, WalFrame, WAL_CRC_LEN, WAL_HEADER_LEN,
28};
29use crate::recovery::wal::page::{WalPage, WalPageFragmentKind, WAL_PAGE_SIZE};
30use crate::recovery::wal_record::WalRecordPayload;
31use crate::storage::disk_manager::DiskManager;
32use crate::storage::disk_scheduler::DiskScheduler;
33use bytes::Bytes;
34use dashmap::DashSet;
35use serde::Serialize;
36
37use buffer::WalBuffer;
38use io::{DiskSchedulerWalSink, WalSink};
39use record::WalRecord;
40use storage::{list_segments, segment_path, WalFlushTicket, WalStorage};
41use writer::WalWriterRuntime;
42
/// Log sequence number.
///
/// Within this module a record's end LSN is its start LSN plus the length of its encoded
/// frame (see `append_record_with`).
pub type Lsn = u64;
/// Lower bound on the capacity passed to `WalBuffer::with_capacity`; used as-is when the
/// configured `buffer_capacity` is zero.
const DEFAULT_WAL_BUFFER_CAPACITY: usize = 4096;
45
/// LSN range handed to the payload-building closure of `WalManager::append_record_with`.
///
/// The closure is called once with a placeholder range (start `0`) to size the record and
/// once more with the range that was actually reserved.
#[derive(Debug, Clone, Copy)]
pub struct WalAppendContext {
    /// LSN at which the record being built starts.
    pub start_lsn: Lsn,
    /// LSN immediately after the record being built.
    pub end_lsn: Lsn,
}
51
/// LSN range occupied by a record that `WalManager::append_record_with` has buffered.
#[derive(Debug, Clone, Copy)]
pub struct WalAppendResult {
    /// LSN reserved for the first byte of the record.
    pub start_lsn: Lsn,
    /// `start_lsn` plus the encoded frame length.
    pub end_lsn: Lsn,
}
57
/// State that `WalManager` keeps behind its `state` mutex.
struct WalState {
    /// Segment storage that buffered records are appended to and flushed through.
    storage: WalStorage,
}
61
/// Write-ahead log front end: assigns LSNs, buffers encoded records and flushes them to
/// `WalStorage`.
pub struct WalManager {
    /// LSN that the next appended record will start at; advanced by the frame length on
    /// every append.
    next_lsn: AtomicU64,
    /// LSN published at the end of a successful flush.
    durable_lsn: AtomicU64,
    /// Encoded records that have been appended but not yet drained into storage.
    buffer: WalBuffer,
    /// Storage state; locked for the duration of the write part of a flush.
    state: Mutex<WalState>,
    /// Background writer runtime, present only between `start_background_flush` and stop.
    writer: Mutex<Option<WalWriterRuntime>>,
    /// When `true`, every flush that advances `durable_lsn` also persists the control file.
    persist_control_file_on_flush: bool,
    /// Buffered-record count at which an append triggers a flush (`usize::MAX` when the
    /// configured capacity is zero).
    max_buffer_records: usize,
    /// Buffered-byte count at which an append triggers a flush (`usize::MAX` when the
    /// configured value is zero).
    flush_coalesce_bytes: usize,
    /// Start LSN of the most recent checkpoint record.
    last_checkpoint: AtomicU64,
    /// Start LSN of the most recently appended record; encoded into the next frame as its
    /// previous-record LSN.
    last_record_start: AtomicU64,
    /// Optional control file that receives the LSN bookkeeping.
    control_file: Option<Arc<ControlFileManager>>,
    /// Mutex paired with `flush_cond` by `wait_for_durable`.
    flush_lock: Mutex<()>,
    /// Signalled after a flush has advanced `durable_lsn`.
    flush_cond: Condvar,
    /// Redo start LSN computed by the latest checkpoint; passed to
    /// `WalStorage::recycle_segments` during flushes.
    checkpoint_redo_start: AtomicU64,
    /// Pages registered through `fpw_first_touch`; cleared by `log_checkpoint`.
    touched_pages: DashSet<PageId>,
}
79
/// Serializable snapshot of the WAL head, produced by `WalManager::debug_head`.
#[derive(Debug, Clone, Serialize)]
pub struct WalHeadDebug {
    /// Value of `WalManager::durable_lsn` at snapshot time.
    pub durable_lsn: Lsn,
    /// Value of `WalManager::max_assigned_lsn` at snapshot time.
    pub max_assigned_lsn: Lsn,
    /// Value of `WalManager::last_checkpoint_lsn` at snapshot time.
    pub last_checkpoint_lsn: Lsn,
    /// Number of records still sitting in the in-memory buffer.
    pub pending_records: usize,
    /// Sum of the encoded payload lengths of those pending records.
    pub pending_bytes: usize,
    /// Whether a background writer runtime is currently installed.
    pub has_background_writer: bool,
    /// The manager's byte threshold for flushing on append.
    pub flush_coalesce_bytes: usize,
}
90
/// Serializable description of one WAL segment file, produced by
/// `WalManager::debug_segments`.
#[derive(Debug, Clone, Serialize)]
pub struct WalSegmentDebug {
    /// Segment identifier as returned by `list_segments`.
    pub id: u64,
    /// File size in bytes; `0` when the file metadata could not be read.
    pub size_bytes: u64,
}
96
/// Serializable summary of one decoded WAL frame, produced by `WalManager::debug_peek`.
#[derive(Debug, Clone, Serialize)]
pub struct WalPeekDebug {
    /// `lsn` field of the frame.
    pub lsn: Lsn,
    /// `prev_lsn` field of the frame.
    pub prev_lsn: Lsn,
    /// Name of the frame's `ResourceManagerId` variant (for example `"Page"`).
    pub rmid: String,
    /// `info` byte of the frame.
    pub info: u8,
    /// Length in bytes of the frame body.
    pub body_len: usize,
}
105
/// Handle returned by `WalManager::start_background_flush`; stopping or dropping it stops
/// the background writer.
pub struct WalWriterHandle {
    /// Manager whose writer this handle controls; `None` once the handle has been used.
    manager: Option<Arc<WalManager>>,
}
109
110impl WalWriterHandle {
111 fn new(manager: Arc<WalManager>) -> Self {
112 Self {
113 manager: Some(manager),
114 }
115 }
116
117 pub fn stop(mut self) -> QuillSQLResult<()> {
118 if let Some(manager) = self.manager.take() {
119 manager.stop_background_writer()
120 } else {
121 Ok(())
122 }
123 }
124}
125
126impl Drop for WalWriterHandle {
127 fn drop(&mut self) {
128 if let Some(manager) = self.manager.take() {
129 let _ = manager.stop_background_writer();
130 }
131 }
132}
133
134impl WalManager {
135 pub fn new(
136 config: WalConfig,
137 init_state: Option<WalInitState>,
138 control_file: Option<Arc<ControlFileManager>>,
139 ) -> QuillSQLResult<Self> {
140 let scheduler = Self::default_scheduler(&config)?;
141 Self::new_with_scheduler(config, init_state, control_file, scheduler)
142 }
143
144 pub fn new_with_scheduler(
145 config: WalConfig,
146 init_state: Option<WalInitState>,
147 control_file: Option<Arc<ControlFileManager>>,
148 scheduler: Arc<DiskScheduler>,
149 ) -> QuillSQLResult<Self> {
150 let buffer_capacity_cfg = config.buffer_capacity;
151 let max_buffer_records = if buffer_capacity_cfg == 0 {
152 usize::MAX
153 } else {
154 buffer_capacity_cfg
155 };
156 let flush_bytes = if config.flush_coalesce_bytes == 0 {
157 usize::MAX
158 } else {
159 config.flush_coalesce_bytes
160 };
161 let sink: Arc<dyn WalSink> = Arc::new(DiskSchedulerWalSink::new(scheduler.clone()));
162 let persist_control_file_on_flush = config.persist_control_file_on_flush;
163 let mut storage = WalStorage::new(config, sink)?;
164 let (durable_ptr, next_ptr, checkpoint_ptr, last_record_start, redo_start) =
165 if let Some(state) = init_state {
166 (
167 state.durable_lsn,
168 state.max_assigned_lsn,
169 state.last_checkpoint_lsn,
170 state.last_record_start,
171 state.checkpoint_redo_start,
172 )
173 } else {
174 let (next_offset, last_start) = storage.recover_offsets()?;
175 (next_offset, next_offset, last_start, last_start, 0)
176 };
177 let ring_capacity = if buffer_capacity_cfg == 0 {
178 DEFAULT_WAL_BUFFER_CAPACITY
179 } else {
180 buffer_capacity_cfg.max(DEFAULT_WAL_BUFFER_CAPACITY)
181 };
182 Ok(Self {
183 next_lsn: AtomicU64::new(next_ptr),
184 durable_lsn: AtomicU64::new(durable_ptr),
185 buffer: WalBuffer::with_capacity(ring_capacity),
186 state: Mutex::new(WalState { storage }),
187 writer: Mutex::new(None),
188 persist_control_file_on_flush,
189 max_buffer_records,
190 flush_coalesce_bytes: flush_bytes,
191 last_checkpoint: AtomicU64::new(checkpoint_ptr),
192 last_record_start: AtomicU64::new(last_record_start),
193 control_file,
194 flush_lock: Mutex::new(()),
195 flush_cond: Condvar::new(),
196 checkpoint_redo_start: AtomicU64::new(redo_start),
197 touched_pages: DashSet::new(),
198 })
199 }
200
201 #[inline]
202 pub fn max_assigned_lsn(&self) -> Lsn {
203 self.next_lsn.load(Ordering::Acquire)
204 }
205
206 #[inline]
207 pub fn durable_lsn(&self) -> Lsn {
208 self.durable_lsn.load(Ordering::Acquire)
209 }
210
211 pub fn debug_head(&self) -> WalHeadDebug {
212 let pending = self.pending_records();
213 let pending_bytes: usize = pending.iter().map(|rec| rec.payload.len()).sum();
214 WalHeadDebug {
215 durable_lsn: self.durable_lsn(),
216 max_assigned_lsn: self.max_assigned_lsn(),
217 last_checkpoint_lsn: self.last_checkpoint_lsn(),
218 pending_records: pending.len(),
219 pending_bytes,
220 has_background_writer: self.has_background_writer(),
221 flush_coalesce_bytes: self.flush_coalesce_bytes,
222 }
223 }
224
225 pub fn debug_segments(&self) -> QuillSQLResult<Vec<WalSegmentDebug>> {
226 let guard = self.state.lock();
227 let directory = guard.storage.directory_path();
228 drop(guard);
229 let mut info = Vec::new();
230 for id in list_segments(&directory)? {
231 let path = segment_path(&directory, id);
232 let size_bytes = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
233 info.push(WalSegmentDebug { id, size_bytes });
234 }
235 Ok(info)
236 }
237
238 pub fn debug_peek(&self, limit: usize) -> QuillSQLResult<Vec<WalPeekDebug>> {
239 let mut reader = self.reader()?;
240 let mut frames = Vec::new();
241 while let Some(frame) = reader.next_frame()? {
242 frames.push(frame);
243 }
244 let start = frames.len().saturating_sub(limit);
245 Ok(frames[start..]
246 .iter()
247 .map(|frame| WalPeekDebug {
248 lsn: frame.lsn,
249 prev_lsn: frame.prev_lsn,
250 rmid: match frame.rmid {
251 ResourceManagerId::Page => "Page",
252 ResourceManagerId::Transaction => "Transaction",
253 ResourceManagerId::Heap => "Heap",
254 ResourceManagerId::Index => "Index",
255 ResourceManagerId::Checkpoint => "Checkpoint",
256 ResourceManagerId::Clr => "Clr",
257 }
258 .to_string(),
259 info: frame.info,
260 body_len: frame.body.len(),
261 })
262 .collect())
263 }
264
265 pub fn append_record_with<F>(&self, mut build: F) -> QuillSQLResult<WalAppendResult>
266 where
267 F: FnMut(WalAppendContext) -> WalRecordPayload,
268 {
269 let preview_ctx = WalAppendContext {
270 start_lsn: 0,
271 end_lsn: WAL_HEADER_LEN as u64 + WAL_CRC_LEN as u64,
272 };
273 let preview_payload = build(preview_ctx);
274 let (_, _, preview_body) = encode_body(&preview_payload);
275 let preview_frame_len = WAL_HEADER_LEN + preview_body.len() + WAL_CRC_LEN;
276
277 let prev_start = self.last_record_start.load(Ordering::Acquire);
278 let start_lsn = self
279 .next_lsn
280 .fetch_add(preview_frame_len as u64, Ordering::SeqCst);
281 let end_lsn_preview = start_lsn + preview_frame_len as u64;
282
283 let preview_ctx = WalAppendContext {
284 start_lsn,
285 end_lsn: end_lsn_preview,
286 };
287 let payload = build(preview_ctx);
288 let frame_bytes = encode_frame(start_lsn, prev_start, &payload);
289 let frame_len = frame_bytes.len();
290 debug_assert_eq!(frame_len, preview_frame_len);
291 let end_lsn = start_lsn + frame_len as u64;
292 let encoded = Bytes::from(frame_bytes);
293
294 self.buffer.push(WalRecord {
295 start_lsn,
296 end_lsn,
297 payload: encoded,
298 });
299
300 let buffer_len = self.buffer.len();
301 let buffer_bytes = self.buffer.bytes();
302 let should_flush = buffer_len >= self.max_buffer_records
303 || buffer_bytes >= self.flush_coalesce_bytes
304 || buffer_bytes >= WAL_PAGE_SIZE;
305
306 self.last_record_start.store(start_lsn, Ordering::Release);
307
308 if should_flush {
309 self.flush_with_mode(Some(end_lsn), false)?;
310 }
311 Ok(WalAppendResult { start_lsn, end_lsn })
312 }
313
314 pub fn log_page_update(
315 &self,
316 page_id: PageId,
317 prev_page_lsn: Lsn,
318 new_image: &[u8],
319 ) -> QuillSQLResult<Option<WalAppendResult>> {
320 if new_image.len() != PAGE_SIZE {
321 return Err(QuillSQLError::Internal(format!(
322 "page {} image size mismatch: new={}",
323 page_id,
324 new_image.len()
325 )));
326 }
327
328 let _ = self.fpw_first_touch(page_id);
330 let page_image = new_image.to_vec();
331 let result = self.append_record_with(|_| {
332 WalRecordPayload::PageWrite(PageWritePayload {
333 page_id,
334 prev_page_lsn,
335 page_image: page_image.clone(),
336 })
337 })?;
338 Ok(Some(result))
339 }
340
341 pub fn log_checkpoint(&self, payload: CheckpointPayload) -> QuillSQLResult<Lsn> {
342 let redo_start = payload
343 .dpt
344 .iter()
345 .map(|(_, lsn)| *lsn)
346 .min()
347 .unwrap_or(payload.last_lsn);
348 let result = self.append_record_with(|_| WalRecordPayload::Checkpoint(payload.clone()))?;
349 self.last_checkpoint
350 .store(result.start_lsn, Ordering::Release);
351 self.checkpoint_redo_start
352 .store(redo_start, Ordering::Release);
353 self.flush_with_mode(Some(result.end_lsn), true)?;
354 self.touched_pages.clear();
355 if let Some(ctrl) = &self.control_file {
356 ctrl.update(
357 self.durable_lsn(),
358 self.max_assigned_lsn(),
359 self.last_checkpoint_lsn(),
360 self.last_record_start.load(Ordering::Acquire),
361 self.checkpoint_redo_start.load(Ordering::Acquire),
362 )?;
363 }
364 Ok(result.end_lsn)
365 }
366
367 #[inline]
368 pub fn last_checkpoint_lsn(&self) -> Lsn {
369 self.last_checkpoint.load(Ordering::Acquire)
370 }
371
372 pub fn start_background_flush(
373 self: &Arc<Self>,
374 interval: Duration,
375 ) -> QuillSQLResult<Option<WalWriterHandle>> {
376 if interval.is_zero() {
377 return Ok(None);
378 }
379 let mut guard = self.writer.lock();
380 if guard.is_some() {
381 return Ok(None);
382 }
383 let runtime = WalWriterRuntime::spawn(Arc::downgrade(self), interval)?;
384 *guard = Some(runtime);
385 Ok(Some(WalWriterHandle::new(Arc::clone(self))))
386 }
387
388 fn stop_background_writer(&self) -> QuillSQLResult<()> {
389 if let Some(runtime) = self.writer.lock().take() {
390 runtime.stop()?;
391 }
392 Ok(())
393 }
394
395 pub fn has_background_writer(&self) -> bool {
396 self.writer.lock().is_some()
397 }
398
399 pub fn flush(&self, target: Option<Lsn>) -> QuillSQLResult<Lsn> {
400 self.flush_with_mode(target, false)
401 }
402
403 fn flush_with_mode(&self, target: Option<Lsn>, force_sync: bool) -> QuillSQLResult<Lsn> {
404 let recycle_lsn = self.checkpoint_redo_start.load(Ordering::Acquire);
405
406 let (desired, tickets) = {
407 let mut guard = self.state.lock();
408 let current_durable = self.durable_lsn();
409 let highest_buffered = if self.buffer.is_empty() {
410 current_durable
411 } else {
412 self.buffer.highest_end_lsn()
413 };
414 let mut desired = target.filter(|lsn| *lsn != 0).unwrap_or(highest_buffered);
415 desired = cmp::min(self.max_assigned_lsn(), desired);
416
417 if desired <= current_durable {
418 guard.storage.flush(force_sync)?;
419 guard.storage.recycle_segments(recycle_lsn)?;
420 drop(guard);
421 return Ok(current_durable);
422 }
423
424 let (to_flush, _) = self.buffer.drain_until(desired);
425 if !to_flush.is_empty() {
426 guard.storage.append_records(&to_flush)?;
427 }
428 guard.storage.flush(force_sync)?;
429
430 guard.storage.recycle_segments(recycle_lsn)?;
431 let ready = guard.storage.take_ready(desired);
432 (desired, ready)
433 };
434
435 self.wait_for_flush_tickets(tickets)?;
436 self.durable_lsn.store(desired, Ordering::Release);
437 if self.persist_control_file_on_flush {
438 self.persist_control_file()?;
439 }
440 self.flush_cond.notify_all();
441 Ok(desired)
442 }
443
444 pub fn pending_records(&self) -> Vec<WalRecord> {
445 self.buffer.pending()
446 }
447
448 fn default_scheduler(config: &WalConfig) -> QuillSQLResult<Arc<DiskScheduler>> {
449 fs::create_dir_all(&config.directory)?;
450 let db_path = config.directory.join("wal_scheduler.db");
451 let disk_manager = Arc::new(DiskManager::try_new(&db_path)?);
452 Ok(Arc::new(DiskScheduler::new(disk_manager)))
453 }
454
455 fn wait_for_flush_tickets(&self, tickets: Vec<WalFlushTicket>) -> QuillSQLResult<()> {
456 for ticket in tickets {
457 match ticket.receiver.recv() {
458 Ok(Ok(())) => {}
459 Ok(Err(err)) => return Err(err),
460 Err(err) => {
461 return Err(QuillSQLError::Internal(format!(
462 "WAL flush completion dropped: {}",
463 err
464 )))
465 }
466 }
467 }
468 Ok(())
469 }
470
471 pub fn persist_control_file(&self) -> QuillSQLResult<()> {
472 if let Some(ctrl) = &self.control_file {
473 ctrl.update(
474 self.durable_lsn(),
475 self.max_assigned_lsn(),
476 self.last_checkpoint_lsn(),
477 self.last_record_start.load(Ordering::Acquire),
478 self.checkpoint_redo_start.load(Ordering::Acquire),
479 )?;
480 }
481 Ok(())
482 }
483
484 pub fn flush_until(&self, target: Lsn) -> QuillSQLResult<Lsn> {
485 if target == 0 {
486 return Ok(self.durable_lsn());
487 }
488 if self.durable_lsn() >= target {
489 return Ok(self.durable_lsn());
490 }
491 self.flush_with_mode(Some(target), true)
492 }
493
494 pub fn wait_for_durable(&self, target: Lsn) -> QuillSQLResult<()> {
495 if target == 0 {
496 return Ok(());
497 }
498 if self.durable_lsn() >= target {
499 return Ok(());
500 }
501 self.flush_with_mode(Some(target), true)?;
502 if self.durable_lsn() >= target {
503 return Ok(());
504 }
505 let mut guard = self.flush_lock.lock();
506 while self.durable_lsn() < target {
507 self.flush_cond.wait(&mut guard);
508 }
509 Ok(())
510 }
511
512 pub fn reader(&self) -> QuillSQLResult<WalReader> {
513 let directory = self.state.lock().storage.directory_path();
514 WalReader::new(directory)
515 }
516
517 pub fn fpw_first_touch(&self, page_id: PageId) -> bool {
518 self.touched_pages.insert(page_id)
519 }
520
521 pub fn control_file(&self) -> Option<Arc<ControlFileManager>> {
522 self.control_file.as_ref().map(Arc::clone)
523 }
524}
525
526impl fmt::Debug for WalManager {
527 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
528 f.debug_struct("WalManager")
529 .field("next_lsn", &self.next_lsn.load(Ordering::Relaxed))
530 .field("durable_lsn", &self.durable_lsn.load(Ordering::Relaxed))
531 .field("max_buffer_records", &self.max_buffer_records)
532 .field("flush_coalesce_bytes", &self.flush_coalesce_bytes)
533 .finish()
534 }
535}
536
537impl Drop for WalManager {
538 fn drop(&mut self) {
539 if let Some(runtime) = self.writer.lock().take() {
540 let _ = runtime.stop();
541 }
542 }
543}
544
/// Sequential reader that yields decoded frames from the segment files of a WAL directory.
pub struct WalReader {
    /// Directory that holds the segment files.
    directory: PathBuf,
    /// Segment ids found at construction time, sorted ascending.
    segments: Vec<u64>,
    /// Index into `segments` of the next segment to open.
    current_idx: usize,
    /// Cursor over the segment currently being read, if any.
    cursor: Option<SegmentCursor>,
}
551
552impl WalReader {
553 pub fn new(directory: PathBuf) -> QuillSQLResult<Self> {
554 let mut segments = list_segments(&directory)?;
555 segments.sort_unstable();
556 Ok(Self {
557 directory,
558 segments,
559 current_idx: 0,
560 cursor: None,
561 })
562 }
563
564 pub fn next_frame(&mut self) -> QuillSQLResult<Option<WalFrame>> {
565 loop {
566 if self.cursor.is_none() && !self.open_next_segment()? {
567 return Ok(None);
568 }
569
570 if let Some(cursor) = self.cursor.as_mut() {
571 match cursor.read_frame()? {
572 Some(frame) => return Ok(Some(frame)),
573 None => {
574 self.cursor = None;
575 continue;
576 }
577 }
578 } else {
579 return Ok(None);
580 }
581 }
582 }
583
584 fn open_next_segment(&mut self) -> QuillSQLResult<bool> {
585 let Some(segment_id) = self.segments.get(self.current_idx).copied() else {
586 return Ok(false);
587 };
588 let path = segment_path(&self.directory, segment_id);
589 let cursor = SegmentCursor::open(&path)?;
590 self.cursor = Some(cursor);
591 self.current_idx += 1;
592 Ok(true)
593 }
594}
595
/// Read position inside a single WAL segment file.
struct SegmentCursor {
    /// Length of the segment file, taken from its metadata when the cursor was opened.
    len: u64,
    /// Bytes consumed so far; advanced one `WAL_PAGE_SIZE` page at a time and set to `len`
    /// to mark the segment as exhausted.
    offset: u64,
    /// Buffered reader over the segment file.
    reader: BufReader<File>,
    /// Bytes of a frame that is split across fragments, accumulated until its `End` fragment.
    fragment_buf: Vec<u8>,
    /// Frames decoded from the most recently read page that have not been returned yet.
    ready_frames: VecDeque<WalFrame>,
}
603
604impl SegmentCursor {
605 fn open(path: &PathBuf) -> QuillSQLResult<Self> {
606 let file = File::open(path)?;
607 let metadata = file.metadata()?;
608 Ok(Self {
609 len: metadata.len(),
610 offset: 0,
611 reader: BufReader::new(file),
612 fragment_buf: Vec::new(),
613 ready_frames: VecDeque::new(),
614 })
615 }
616
617 fn read_frame(&mut self) -> QuillSQLResult<Option<WalFrame>> {
618 loop {
619 if let Some(frame) = self.ready_frames.pop_front() {
620 return Ok(Some(frame));
621 }
622 if self.offset >= self.len {
623 return Ok(None);
624 }
625 let mut page_buf = vec![0u8; WAL_PAGE_SIZE];
626 if let Err(err) = self.reader.read_exact(&mut page_buf) {
627 if err.kind() == ErrorKind::UnexpectedEof {
628 self.offset = self.len;
629 return Ok(None);
630 }
631 return Err(QuillSQLError::Io(err));
632 }
633 self.offset += WAL_PAGE_SIZE as u64;
634 let page = match WalPage::unpack_frames(&page_buf) {
635 Ok(page) => page,
636 Err(QuillSQLError::Internal(message))
637 if message.contains("truncated")
638 || message.contains("CRC mismatch")
639 || message.contains("Invalid WAL page magic") =>
640 {
641 self.offset = self.len;
642 return Ok(None);
643 }
644 Err(err) => return Err(err),
645 };
646
647 for slot in page.fragments() {
648 let start = slot.offset as usize;
649 let end = start + slot.len as usize;
650 let fragment = &page.payload()[start..end];
651 match slot.kind {
652 WalPageFragmentKind::Complete => {
653 self.fragment_buf.clear();
654 match decode_frame(fragment) {
655 Ok((frame, _)) => {
656 self.ready_frames.push_back(frame);
657 }
658 Err(QuillSQLError::Internal(message))
659 if message.contains("too short")
660 || message.contains("truncated")
661 || message.contains("CRC mismatch") =>
662 {
663 self.offset = self.len;
664 return Ok(None);
665 }
666 Err(err) => return Err(err),
667 }
668 }
669 WalPageFragmentKind::Start => {
670 self.fragment_buf.clear();
671 self.fragment_buf.extend_from_slice(fragment);
672 }
673 WalPageFragmentKind::Middle => {
674 if self.fragment_buf.is_empty() {
675 self.offset = self.len;
676 return Ok(None);
677 }
678 self.fragment_buf.extend_from_slice(fragment);
679 }
680 WalPageFragmentKind::End => {
681 if self.fragment_buf.is_empty() {
682 self.offset = self.len;
683 return Ok(None);
684 }
685 self.fragment_buf.extend_from_slice(fragment);
686 let frame_bytes = std::mem::take(&mut self.fragment_buf);
687 match decode_frame(&frame_bytes) {
688 Ok((frame, _)) => {
689 self.ready_frames.push_back(frame);
690 }
691 Err(QuillSQLError::Internal(message))
692 if message.contains("too short")
693 || message.contains("truncated")
694 || message.contains("CRC mismatch") =>
695 {
696 self.offset = self.len;
697 return Ok(None);
698 }
699 Err(err) => return Err(err),
700 }
701 }
702 }
703 }
704 }
705 }
706}