1pub mod codec;
2pub mod page;
3
4use parking_lot::{Condvar, Mutex};
5use std::cmp;
6use std::collections::VecDeque;
7use std::fmt;
8use std::fs::{self, File};
9use std::io::{self, BufReader, ErrorKind, Read};
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::{Arc, Weak};
13use std::thread;
14use std::time::Duration;
15
16use crate::buffer::{PageId, PAGE_SIZE};
17use crate::config::WalConfig;
18use crate::error::{QuillSQLError, QuillSQLResult};
19use crate::recovery::control_file::{ControlFileManager, WalInitState};
20use crate::recovery::wal::codec::{
21 decode_frame, encode_body, encode_frame, CheckpointPayload, PageDeltaPayload, PageWritePayload,
22 WalFrame, WAL_CRC_LEN, WAL_HEADER_LEN,
23};
24use crate::recovery::wal::page::{
25 WalFrameContinuation, WalPage, WalPageFragmentKind, WAL_PAGE_SIZE,
26};
27use crate::recovery::wal_record::WalRecordPayload;
28use crate::storage::disk_manager::DiskManager;
29use crate::storage::disk_scheduler::{DiskCommandResultReceiver, DiskScheduler};
30use crate::utils::util::find_contiguous_diff;
31use bytes::Bytes;
32use dashmap::DashSet;
33
/// Log sequence number: a byte position in the logical WAL stream. Each
/// appended record advances it by the length of its encoded frame.
pub type Lsn = u64;
35
/// LSN range handed to the payload builder passed to
/// [`WalManager::append_record_with`].
///
/// The builder is invoked twice: once with a placeholder context (start LSN 0)
/// to measure the encoded size, and once with the range reserved for the record.
#[derive(Debug, Clone, Copy)]
pub struct WalAppendContext {
    /// First LSN of the record's frame.
    pub start_lsn: Lsn,
    /// LSN one past the last byte of the record's frame.
    pub end_lsn: Lsn,
}
41
/// LSN range occupied by a record appended through
/// [`WalManager::append_record_with`].
#[derive(Debug, Clone, Copy)]
pub struct WalAppendResult {
    /// First LSN of the appended frame.
    pub start_lsn: Lsn,
    /// LSN one past the last byte of the appended frame.
    pub end_lsn: Lsn,
}
47
48#[derive(Clone)]
49pub struct WalRecord {
50 pub start_lsn: Lsn,
51 pub end_lsn: Lsn,
52 pub payload: Bytes,
53}
54
/// Mutable WAL state guarded by `WalManager::state`.
struct WalState {
    /// Encoded records that were appended but not yet handed to storage.
    buffer: Vec<WalRecord>,
    /// Sum of the encoded lengths of the records in `buffer`.
    buffer_bytes: usize,
    /// Segment/page writer that persists flushed records.
    storage: WalStorage,
    /// Start LSN of the most recently appended record; passed to
    /// `encode_frame` as the previous-record LSN of the next frame.
    last_record_start: Lsn,
}
61
62impl std::fmt::Debug for WalState {
63 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64 f.debug_struct("WalState")
65 .field("buffer_len", &self.buffer.len())
66 .field("current_segment", &self.storage.current_segment)
67 .field("last_record_start", &self.last_record_start)
68 .finish()
69 }
70}
71
/// Write-ahead log manager: assigns LSNs, buffers encoded frames in memory and
/// flushes them to segment files through a [`DiskScheduler`].
pub struct WalManager {
    /// Next LSN to assign (one past the end of the last reserved record).
    next_lsn: AtomicU64,
    /// LSN most recently published as durable by `flush`.
    durable_lsn: AtomicU64,
    /// Record buffer and storage backend.
    state: Mutex<WalState>,
    /// Background writer started by `start_background_flush`, if any.
    writer: Mutex<Option<WalWriterRuntime>>,
    /// Buffered record count that triggers a flush on append
    /// (`usize::MAX` when configured as 0).
    max_buffer_records: usize,
    /// Buffered byte count that triggers a flush on append
    /// (`usize::MAX` when configured as 0).
    flush_coalesce_bytes: usize,
    /// Start LSN of the latest checkpoint record; `flush` passes it to
    /// segment recycling.
    last_checkpoint: AtomicU64,
    /// Copy of `WalState::last_record_start` that can be read without the
    /// state lock (used for control-file updates).
    last_record_start: AtomicU64,
    /// Optional control file that receives the LSN bookkeeping.
    control_file: Option<Arc<ControlFileManager>>,
    /// Mutex paired with `flush_cond` for threads waiting on durability.
    flush_lock: Mutex<()>,
    /// Notified by `flush` after `durable_lsn` advances.
    flush_cond: Condvar,
    /// Redo-start LSN computed by the latest `log_checkpoint`.
    checkpoint_redo_start: AtomicU64,
    /// Pages modified since startup or the last checkpoint; the first
    /// modification of a page logs a full page image.
    touched_pages: DashSet<PageId>,
}
87
/// Handle returned by [`WalManager::start_background_flush`]; stopping or
/// dropping it stops the background writer.
pub struct WalWriterHandle {
    /// `None` once the writer has been stopped through this handle.
    manager: Option<Arc<WalManager>>,
}
91
92impl WalWriterHandle {
93 fn new(manager: Arc<WalManager>) -> Self {
94 Self {
95 manager: Some(manager),
96 }
97 }
98
99 pub fn stop(mut self) -> QuillSQLResult<()> {
100 if let Some(manager) = self.manager.take() {
101 manager.stop_background_writer()
102 } else {
103 Ok(())
104 }
105 }
106}
107
108impl Drop for WalWriterHandle {
109 fn drop(&mut self) {
110 if let Some(manager) = self.manager.take() {
111 let _ = manager.stop_background_writer();
112 }
113 }
114}
115
116impl WalManager {
117 pub fn new(
118 config: WalConfig,
119 init_state: Option<WalInitState>,
120 control_file: Option<Arc<ControlFileManager>>,
121 ) -> QuillSQLResult<Self> {
122 let scheduler = Self::default_scheduler(&config)?;
123 Self::new_with_scheduler(config, init_state, control_file, scheduler)
124 }
125
126 pub fn new_with_scheduler(
127 config: WalConfig,
128 init_state: Option<WalInitState>,
129 control_file: Option<Arc<ControlFileManager>>,
130 scheduler: Arc<DiskScheduler>,
131 ) -> QuillSQLResult<Self> {
132 let max_buffer_records = if config.buffer_capacity == 0 {
133 usize::MAX
134 } else {
135 config.buffer_capacity
136 };
137 let flush_bytes = if config.flush_coalesce_bytes == 0 {
138 usize::MAX
139 } else {
140 config.flush_coalesce_bytes
141 };
142 let mut storage = WalStorage::new(config, scheduler)?;
143 let (durable_ptr, next_ptr, checkpoint_ptr, last_record_start, redo_start) =
144 if let Some(state) = init_state {
145 (
146 state.durable_lsn,
147 state.max_assigned_lsn,
148 state.last_checkpoint_lsn,
149 state.last_record_start,
150 state.checkpoint_redo_start,
151 )
152 } else {
153 let (next_offset, last_start) = storage.recover_offsets()?;
154 (next_offset, next_offset, last_start, last_start, 0)
155 };
156 Ok(Self {
157 next_lsn: AtomicU64::new(next_ptr),
158 durable_lsn: AtomicU64::new(durable_ptr),
159 state: Mutex::new(WalState {
160 buffer: Vec::new(),
161 buffer_bytes: 0,
162 storage,
163 last_record_start,
164 }),
165 writer: Mutex::new(None),
166 max_buffer_records,
167 flush_coalesce_bytes: flush_bytes,
168 last_checkpoint: AtomicU64::new(checkpoint_ptr),
169 last_record_start: AtomicU64::new(last_record_start),
170 control_file,
171 flush_lock: Mutex::new(()),
172 flush_cond: Condvar::new(),
173 checkpoint_redo_start: AtomicU64::new(redo_start),
174 touched_pages: DashSet::new(),
175 })
176 }
177
    /// Returns the next LSN to be assigned, i.e. one past the end of the last
    /// reserved record.
    #[inline]
    pub fn max_assigned_lsn(&self) -> Lsn {
        self.next_lsn.load(Ordering::Acquire)
    }
182
    /// Returns the LSN most recently published as durable by `flush`.
    #[inline]
    pub fn durable_lsn(&self) -> Lsn {
        self.durable_lsn.load(Ordering::Acquire)
    }
187
188 pub fn append_record_with<F>(&self, mut build: F) -> QuillSQLResult<WalAppendResult>
189 where
190 F: FnMut(WalAppendContext) -> WalRecordPayload,
191 {
192 let preview_ctx = WalAppendContext {
193 start_lsn: 0,
194 end_lsn: WAL_HEADER_LEN as u64 + WAL_CRC_LEN as u64,
195 };
196 let preview_payload = build(preview_ctx);
197 let (_, _, preview_body) = encode_body(&preview_payload);
198 let preview_frame_len = WAL_HEADER_LEN + preview_body.len() + WAL_CRC_LEN;
199
200 let mut guard = self.state.lock();
201 let prev_start = guard.last_record_start;
202 let start_lsn = self
203 .next_lsn
204 .fetch_add(preview_frame_len as u64, Ordering::SeqCst);
205 let end_lsn_preview = start_lsn + preview_frame_len as u64;
206
207 let preview_ctx = WalAppendContext {
208 start_lsn,
209 end_lsn: end_lsn_preview,
210 };
211 let payload = build(preview_ctx);
212 let frame_bytes = encode_frame(start_lsn, prev_start, &payload);
213 let frame_len = frame_bytes.len();
214 debug_assert_eq!(frame_len, preview_frame_len);
215 let end_lsn = start_lsn + frame_len as u64;
216 let encoded = Bytes::from(frame_bytes);
217
218 guard.buffer.push(WalRecord {
219 start_lsn,
220 end_lsn,
221 payload: encoded,
222 });
223 guard.buffer_bytes = guard.buffer_bytes.saturating_add(frame_len);
224 guard.last_record_start = start_lsn;
225
226 let should_flush = guard.buffer.len() >= self.max_buffer_records
227 || guard.buffer_bytes >= self.flush_coalesce_bytes
228 || guard.buffer_bytes >= WAL_PAGE_SIZE;
229 drop(guard);
230
231 self.last_record_start.store(start_lsn, Ordering::Release);
232
233 if should_flush {
234 self.flush(Some(end_lsn))?;
235 }
236 Ok(WalAppendResult { start_lsn, end_lsn })
237 }
238
239 pub fn log_page_update(
242 &self,
243 page_id: PageId,
244 prev_page_lsn: Lsn,
245 old_image: &[u8],
246 new_image: &[u8],
247 ) -> QuillSQLResult<Option<WalAppendResult>> {
248 if old_image.len() != new_image.len() || old_image.len() != PAGE_SIZE {
249 return Err(QuillSQLError::Internal(format!(
250 "page {} image size mismatch: old={}, new={}",
251 page_id,
252 old_image.len(),
253 new_image.len()
254 )));
255 }
256
257 let Some((start, end)) = find_contiguous_diff(old_image, new_image) else {
258 return Ok(None);
259 };
260
261 let delta_threshold = PAGE_SIZE / 16;
262 let first_touch = self.fpw_first_touch(page_id);
263 if first_touch || (end - start) > delta_threshold {
264 let page_image = new_image.to_vec();
265 let result = self.append_record_with(|_| {
266 WalRecordPayload::PageWrite(PageWritePayload {
267 page_id,
268 prev_page_lsn,
269 page_image: page_image.clone(),
270 })
271 })?;
272 return Ok(Some(result));
273 }
274
275 let diff = new_image[start..end].to_vec();
276 let result = self.append_record_with(|_| {
277 WalRecordPayload::PageDelta(PageDeltaPayload {
278 page_id,
279 prev_page_lsn,
280 offset: start as u16,
281 data: diff.clone(),
282 })
283 })?;
284 Ok(Some(result))
285 }
286
287 pub fn log_checkpoint(&self, payload: CheckpointPayload) -> QuillSQLResult<Lsn> {
288 let redo_start = payload
289 .dpt
290 .iter()
291 .map(|(_, lsn)| *lsn)
292 .min()
293 .unwrap_or(payload.last_lsn);
294 let result = self.append_record_with(|_| WalRecordPayload::Checkpoint(payload.clone()))?;
295 self.last_checkpoint
296 .store(result.start_lsn, Ordering::Release);
297 self.checkpoint_redo_start
298 .store(redo_start, Ordering::Release);
299 self.flush(Some(result.end_lsn))?;
300 self.touched_pages.clear();
302 if let Some(ctrl) = &self.control_file {
303 ctrl.update(
304 self.durable_lsn(),
305 self.max_assigned_lsn(),
306 self.last_checkpoint_lsn(),
307 self.last_record_start.load(Ordering::Acquire),
308 self.checkpoint_redo_start.load(Ordering::Acquire),
309 )?;
310 }
311 Ok(result.end_lsn)
312 }
313
    /// Returns the start LSN of the most recently logged checkpoint record.
    /// After a scan-based bootstrap, this is the start of the last record found.
    #[inline]
    pub fn last_checkpoint_lsn(&self) -> Lsn {
        self.last_checkpoint.load(Ordering::Acquire)
    }
318
319 pub fn start_background_flush(
320 self: &Arc<Self>,
321 interval: Duration,
322 ) -> QuillSQLResult<Option<WalWriterHandle>> {
323 if interval.is_zero() {
324 return Ok(None);
325 }
326 let mut guard = self.writer.lock();
327 if guard.is_some() {
328 return Ok(None);
329 }
330 let runtime = WalWriterRuntime::spawn(Arc::downgrade(self), interval)?;
331 *guard = Some(runtime);
332 Ok(Some(WalWriterHandle::new(Arc::clone(self))))
333 }
334
335 fn stop_background_writer(&self) -> QuillSQLResult<()> {
336 if let Some(runtime) = self.writer.lock().take() {
337 runtime.stop()?;
338 }
339 Ok(())
340 }
341
342 pub fn flush(&self, target: Option<Lsn>) -> QuillSQLResult<Lsn> {
343 let recycle_lsn = self.last_checkpoint.load(Ordering::Acquire);
344
345 let (desired, tickets) = {
346 let mut guard = self.state.lock();
347 let current_durable = self.durable_lsn();
348 let highest_buffered = guard
349 .buffer
350 .last()
351 .map(|r| r.end_lsn)
352 .unwrap_or(current_durable);
353 let mut desired = target.filter(|lsn| *lsn != 0).unwrap_or(highest_buffered);
354 desired = cmp::min(self.max_assigned_lsn(), desired);
355
356 if desired <= current_durable {
357 guard.storage.recycle_segments(recycle_lsn)?;
358 drop(guard);
359 return Ok(current_durable);
360 }
361
362 let flush_count = guard
363 .buffer
364 .iter()
365 .take_while(|record| record.end_lsn <= desired)
366 .count();
367
368 if flush_count > 0 {
369 let to_flush: Vec<WalRecord> = guard.buffer[..flush_count].to_vec();
370 let flushed_bytes: usize = to_flush.iter().map(|r| r.encoded_len() as usize).sum();
371 guard.storage.append_records(&to_flush)?;
372 guard.storage.flush()?;
373 guard.buffer.drain(..flush_count);
374 guard.buffer_bytes = guard.buffer_bytes.saturating_sub(flushed_bytes);
375 }
376
377 guard.storage.recycle_segments(recycle_lsn)?;
378 let ready = guard.storage.take_ready(desired);
379 (desired, ready)
380 };
381
382 self.wait_for_flush_tickets(tickets)?;
383 self.durable_lsn.store(desired, Ordering::Release);
384 self.persist_control_file()?;
385 self.flush_cond.notify_all();
386 Ok(desired)
387 }
388
    /// Returns a copy of the records that are buffered but not yet handed to
    /// storage by `flush`.
    pub fn pending_records(&self) -> Vec<WalRecord> {
        self.state.lock().buffer.clone()
    }
392
393 fn default_scheduler(config: &WalConfig) -> QuillSQLResult<Arc<DiskScheduler>> {
394 fs::create_dir_all(&config.directory)?;
395 let db_path = config.directory.join("wal_scheduler.db");
396 let disk_manager = Arc::new(DiskManager::try_new(&db_path)?);
397 Ok(Arc::new(DiskScheduler::new(disk_manager)))
398 }
399
400 fn wait_for_flush_tickets(&self, tickets: Vec<WalFlushTicket>) -> QuillSQLResult<()> {
401 for ticket in tickets {
402 match ticket.receiver.recv() {
403 Ok(Ok(())) => {}
404 Ok(Err(err)) => return Err(err),
405 Err(err) => {
406 return Err(QuillSQLError::Internal(format!(
407 "WAL flush completion dropped: {}",
408 err
409 )))
410 }
411 }
412 }
413 Ok(())
414 }
415
416 fn persist_control_file(&self) -> QuillSQLResult<()> {
417 if let Some(ctrl) = &self.control_file {
418 ctrl.update(
419 self.durable_lsn(),
420 self.max_assigned_lsn(),
421 self.last_checkpoint_lsn(),
422 self.last_record_start.load(Ordering::Acquire),
423 self.checkpoint_redo_start.load(Ordering::Acquire),
424 )?;
425 }
426 Ok(())
427 }
428
429 pub fn flush_until(&self, target: Lsn) -> QuillSQLResult<Lsn> {
430 if target == 0 {
431 return Ok(self.durable_lsn());
432 }
433 if self.durable_lsn() >= target {
434 return Ok(self.durable_lsn());
435 }
436 self.flush(Some(target))
437 }
438
439 pub fn wait_for_durable(&self, target: Lsn) -> QuillSQLResult<()> {
440 if target == 0 {
441 return Ok(());
442 }
443 if self.durable_lsn() >= target {
444 return Ok(());
445 }
446 self.flush(Some(target))?;
447 if self.durable_lsn() >= target {
448 return Ok(());
449 }
450 let mut guard = self.flush_lock.lock();
451 while self.durable_lsn() < target {
452 self.flush_cond.wait(&mut guard);
453 }
454 Ok(())
455 }
456
457 pub fn reader(&self) -> QuillSQLResult<WalReader> {
458 let directory = self.state.lock().storage.directory.clone();
459 WalReader::new(directory)
460 }
461
    /// Marks `page_id` as modified. Returns `true` if it was not yet marked,
    /// i.e. this is its first modification since startup or the last
    /// checkpoint (`log_page_update` then logs a full page image).
    pub fn fpw_first_touch(&self, page_id: PageId) -> bool {
        self.touched_pages.insert(page_id)
    }
466
467 pub fn control_file(&self) -> Option<Arc<ControlFileManager>> {
469 self.control_file.as_ref().map(Arc::clone)
470 }
471}
472
473impl fmt::Debug for WalManager {
474 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
475 f.debug_struct("WalManager")
476 .field("next_lsn", &self.next_lsn.load(Ordering::Relaxed))
477 .field("durable_lsn", &self.durable_lsn.load(Ordering::Relaxed))
478 .field("max_buffer_records", &self.max_buffer_records)
479 .field("flush_coalesce_bytes", &self.flush_coalesce_bytes)
480 .finish()
481 }
482}
483
/// Id and current length of a WAL segment file.
#[derive(Clone)]
struct WalSegmentInfo {
    /// 1-based segment number, encoded in the file name by `segment_path`.
    id: u64,
    /// Bytes written to (or found in) the segment.
    size: u64,
}
489
/// Segment-file writer: packs records into WAL pages, writes them through the
/// disk scheduler and queues completion tickets for those I/Os.
struct WalStorage {
    /// Directory holding the `wal_*.log` segment files.
    directory: PathBuf,
    /// Maximum bytes per segment before `write_page` rotates to the next one.
    segment_size: u64,
    /// Whether `flush` also schedules an fsync of the current segment.
    sync_on_flush: bool,
    /// Segment currently being appended to.
    current_segment: WalSegmentInfo,
    /// Scheduler that performs the writes and fsyncs.
    scheduler: Arc<DiskScheduler>,
    /// Extra segments kept before the recycling horizon (at least 1).
    retain_segments: usize,
    /// Partially filled page that is still accepting frames.
    open_page: Option<WalPage>,
    /// End LSN of the last frame placed into a written or open page.
    last_page_end_lsn: Lsn,
    /// `last_page_end_lsn` at the time the last fsync was scheduled.
    last_sync_request: Lsn,
    /// Outstanding write/fsync completions, in submission order.
    pending: VecDeque<WalFlushTicket>,
}
502
/// Completion handle for a scheduled WAL write or fsync.
struct WalFlushTicket {
    /// `last_page_end_lsn` when the I/O was scheduled; `take_ready` releases
    /// tickets up to a target LSN.
    lsn: Lsn,
    /// Receives the I/O result from the disk scheduler.
    receiver: DiskCommandResultReceiver<()>,
}
507
508impl WalStorage {
509 fn new(config: WalConfig, scheduler: Arc<DiskScheduler>) -> QuillSQLResult<Self> {
510 fs::create_dir_all(&config.directory)?;
511 let segment = Self::discover_latest_segment(&config.directory, config.segment_size)?;
512 Ok(Self {
513 directory: config.directory,
514 segment_size: config.segment_size,
515 sync_on_flush: config.sync_on_flush,
516 current_segment: segment,
517 scheduler,
518 retain_segments: config.retain_segments.max(1),
519 open_page: None,
520 last_page_end_lsn: 0,
521 last_sync_request: 0,
522 pending: VecDeque::new(),
523 })
524 }
525
526 fn recover_offsets(&mut self) -> QuillSQLResult<(Lsn, Lsn)> {
527 let segments = list_segments(&self.directory)?;
528 let mut next_lsn = 0u64;
529 let mut last_record_start = 0u64;
530 let mut pending_fragment = Vec::new();
531 let mut physical_offset = 0u64;
532
533 for segment_id in segments {
534 let path = segment_path(&self.directory, segment_id);
535 let file = match fs::File::open(&path) {
536 Ok(f) => f,
537 Err(e) if e.kind() == io::ErrorKind::NotFound => continue,
538 Err(e) => return Err(QuillSQLError::Io(e)),
539 };
540 let metadata = file.metadata()?;
541 let mut reader = io::BufReader::new(file);
542 let segment_base = (segment_id.saturating_sub(1)) * self.segment_size;
543 let mut segment_consumed = 0u64;
544 pending_fragment.clear();
545
546 while segment_consumed + WAL_PAGE_SIZE as u64 <= metadata.len() {
547 let mut page_buf = vec![0u8; WAL_PAGE_SIZE];
548 if let Err(err) = reader.read_exact(&mut page_buf) {
549 if err.kind() == io::ErrorKind::UnexpectedEof {
550 break;
551 }
552 return Err(QuillSQLError::Io(err));
553 }
554
555 let page = match WalPage::unpack_frames(&page_buf) {
556 Ok(page) => page,
557 Err(QuillSQLError::Internal(message))
558 if message.contains("truncated")
559 || message.contains("CRC mismatch")
560 || message.contains("Invalid WAL page magic") =>
561 {
562 break;
563 }
564 Err(err) => return Err(err),
565 };
566
567 if !page.has_payload() {
568 break;
569 }
570
571 let mut stop_segment = false;
572 for slot in page.fragments() {
573 let start = slot.offset as usize;
574 let end = start + slot.len as usize;
575 let fragment = &page.payload()[start..end];
576 match slot.kind {
577 WalPageFragmentKind::Complete => {
578 pending_fragment.clear();
579 match decode_frame(fragment) {
580 Ok((frame, _)) => {
581 let frame_len = fragment.len() as u64;
582 last_record_start = frame.lsn;
583 next_lsn = frame.lsn.saturating_add(frame_len);
584 }
585 Err(QuillSQLError::Internal(message))
586 if message.contains("too short")
587 || message.contains("truncated")
588 || message.contains("CRC mismatch") =>
589 {
590 stop_segment = true;
591 pending_fragment.clear();
592 break;
593 }
594 Err(err) => return Err(err),
595 }
596 }
597 WalPageFragmentKind::Start => {
598 pending_fragment.clear();
599 pending_fragment.extend_from_slice(fragment);
600 }
601 WalPageFragmentKind::Middle => {
602 if pending_fragment.is_empty() {
603 stop_segment = true;
604 pending_fragment.clear();
605 break;
606 }
607 pending_fragment.extend_from_slice(fragment);
608 }
609 WalPageFragmentKind::End => {
610 if pending_fragment.is_empty() {
611 stop_segment = true;
612 pending_fragment.clear();
613 break;
614 }
615 pending_fragment.extend_from_slice(fragment);
616 let frame_bytes = std::mem::take(&mut pending_fragment);
617 match decode_frame(&frame_bytes) {
618 Ok((frame, _)) => {
619 let frame_len = frame_bytes.len() as u64;
620 last_record_start = frame.lsn;
621 next_lsn = frame.lsn.saturating_add(frame_len);
622 }
623 Err(QuillSQLError::Internal(message))
624 if message.contains("too short")
625 || message.contains("truncated")
626 || message.contains("CRC mismatch") =>
627 {
628 stop_segment = true;
629 pending_fragment.clear();
630 break;
631 }
632 Err(err) => return Err(err),
633 }
634 }
635 }
636 }
637
638 if stop_segment {
639 break;
640 }
641 segment_consumed += WAL_PAGE_SIZE as u64;
642 }
643
644 physical_offset = segment_base + segment_consumed;
645 }
646
647 self.last_page_end_lsn = next_lsn;
648 self.last_sync_request = next_lsn;
649 let segment_index = physical_offset / self.segment_size;
650 self.current_segment = WalSegmentInfo {
651 id: segment_index + 1,
652 size: physical_offset % self.segment_size,
653 };
654
655 Ok((next_lsn, last_record_start))
656 }
657
658 fn recycle_segments(&mut self, keep_from_lsn: Lsn) -> QuillSQLResult<()> {
659 if self.retain_segments == 0 {
660 return Ok(());
661 }
662 if keep_from_lsn == 0 {
663 return Ok(());
664 }
665 let keep_segment_id = 1 + (keep_from_lsn / self.segment_size);
666 let min_keep = keep_segment_id
667 .saturating_sub(self.retain_segments as u64)
668 .max(1);
669 let segments = list_segments(&self.directory)?;
670 for id in segments {
671 if id < min_keep && id < self.current_segment.id {
672 let path = segment_path(&self.directory, id);
673 let _ = fs::remove_file(&path);
674 }
675 }
676 Ok(())
677 }
678
679 fn append_records(&mut self, records: &[WalRecord]) -> std::io::Result<()> {
680 let mut queue: Vec<WalRecord> = records.iter().cloned().collect();
681 let mut carry: Option<WalFrameContinuation> = None;
682
683 if let Some(page) = self.open_page.take() {
684 let (page, leftover, next_carry) = page.continue_pack(queue);
685 queue = leftover;
686 carry = next_carry;
687
688 if page.is_full() {
689 self.write_page(&page)?;
690 } else {
691 if let Some(lsn) = page.last_end_lsn() {
692 self.last_page_end_lsn = lsn;
693 }
694 self.open_page = Some(page);
695 }
696 }
697
698 while !queue.is_empty() || carry.is_some() {
699 let (page, leftover, next_carry) =
700 WalPage::pack_frames(self.last_page_end_lsn, queue, carry);
701 queue = leftover;
702 carry = next_carry;
703
704 if !page.has_payload() {
705 break;
706 }
707
708 if page.is_full() {
709 self.write_page(&page)?;
710 } else {
711 if let Some(lsn) = page.last_end_lsn() {
712 self.last_page_end_lsn = lsn;
713 }
714 self.open_page = Some(page);
715 break;
716 }
717 }
718
719 debug_assert!(carry.is_none());
720
721 Ok(())
722 }
723
724 fn rotate_segment(&mut self) -> std::io::Result<()> {
725 self.flush()?;
726 self.current_segment = WalSegmentInfo {
727 id: self.current_segment.id + 1,
728 size: 0,
729 };
730 Ok(())
731 }
732
733 fn flush(&mut self) -> std::io::Result<()> {
734 if let Some(page) = self.open_page.take() {
735 self.write_page(&page)?;
736 }
737 if self.sync_on_flush && self.last_page_end_lsn > self.last_sync_request {
738 if let Some(receiver) =
739 self.write_bytes(self.current_segment.size, Bytes::new(), true)?
740 {
741 self.last_sync_request = self.last_page_end_lsn;
742 self.pending.push_back(WalFlushTicket {
743 lsn: self.last_sync_request,
744 receiver,
745 });
746 }
747 }
748 Ok(())
749 }
750
751 fn take_ready(&mut self, upto: Lsn) -> Vec<WalFlushTicket> {
752 let mut ready = Vec::new();
753 while let Some(front) = self.pending.front() {
754 if front.lsn <= upto {
755 if let Some(ticket) = self.pending.pop_front() {
756 ready.push(ticket);
757 }
758 } else {
759 break;
760 }
761 }
762 ready
763 }
764
765 fn write_page(&mut self, page: &WalPage) -> std::io::Result<()> {
766 if !page.has_payload() {
767 return Ok(());
768 }
769 if self.current_segment.size + WAL_PAGE_SIZE as u64 > self.segment_size {
770 self.rotate_segment()?;
771 }
772 let offset = self.current_segment.size;
773 let bytes = Bytes::from(page.to_bytes());
774 let receiver = self.write_bytes(offset, bytes, false)?;
775 self.current_segment.size += WAL_PAGE_SIZE as u64;
776 if let Some(lsn) = page.last_end_lsn() {
777 self.last_page_end_lsn = lsn;
778 }
779 let ticket_lsn = self.last_page_end_lsn;
780 if let Some(receiver) = receiver {
781 self.pending.push_back(WalFlushTicket {
782 lsn: ticket_lsn,
783 receiver,
784 });
785 }
786 Ok(())
787 }
788
789 fn discover_latest_segment(
790 directory: &Path,
791 segment_size: u64,
792 ) -> std::io::Result<WalSegmentInfo> {
793 if !directory.exists() {
794 return Ok(WalSegmentInfo { id: 1, size: 0 });
795 }
796
797 let mut segments = Vec::new();
798 for entry in fs::read_dir(directory)? {
799 let entry = entry?;
800 if !entry.file_type()?.is_file() {
801 continue;
802 }
803 let name = entry.file_name();
804 let name = name.to_string_lossy();
805 if let Some(id) = parse_segment_id(&name) {
806 let size = entry.metadata()?.len();
807 segments.push(WalSegmentInfo { id, size });
808 }
809 }
810
811 if segments.is_empty() {
812 return Ok(WalSegmentInfo { id: 1, size: 0 });
813 }
814
815 segments.sort_by_key(|info| info.id);
816 let mut latest = segments.pop().unwrap();
817
818 if latest.size >= segment_size {
819 latest = WalSegmentInfo {
820 id: latest.id + 1,
821 size: 0,
822 };
823 }
824
825 Ok(latest)
826 }
827
828 fn write_bytes(
829 &self,
830 offset: u64,
831 data: Bytes,
832 sync: bool,
833 ) -> std::io::Result<Option<DiskCommandResultReceiver<()>>> {
834 if data.is_empty() && !sync {
835 return Ok(None);
836 }
837 let path = segment_path(&self.directory, self.current_segment.id);
838 if data.is_empty() {
839 let receiver = self
840 .scheduler
841 .schedule_wal_fsync(path)
842 .map_err(|e| io_error(&e))?;
843 Ok(Some(receiver))
844 } else {
845 let receiver = self
846 .scheduler
847 .schedule_wal_write(path, offset, data, sync)
848 .map_err(|e| io_error(&e))?;
849 Ok(Some(receiver))
850 }
851 }
852}
853
854impl std::fmt::Display for WalStorage {
855 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
856 write!(
857 f,
858 "segment={} bytes={} segment_size={} dir={}",
859 self.current_segment.id,
860 self.current_segment.size,
861 self.segment_size,
862 self.directory.display()
863 )
864 }
865}
866
867impl WalRecord {
868 fn encoded_len(&self) -> u64 {
869 self.end_lsn.saturating_sub(self.start_lsn)
870 }
871}
872
873pub struct WalReader {
874 directory: PathBuf,
875 segments: Vec<u64>,
876 current_idx: usize,
877 cursor: Option<SegmentCursor>,
878}
879
880impl WalReader {
881 pub fn new(directory: PathBuf) -> QuillSQLResult<Self> {
882 let mut segments = list_segments(&directory)?;
883 segments.sort_unstable();
884 Ok(Self {
885 directory,
886 segments,
887 current_idx: 0,
888 cursor: None,
889 })
890 }
891
892 pub fn next_frame(&mut self) -> QuillSQLResult<Option<WalFrame>> {
893 loop {
894 if self.cursor.is_none() && !self.open_next_segment()? {
895 return Ok(None);
896 }
897
898 if let Some(cursor) = self.cursor.as_mut() {
899 match cursor.read_frame()? {
900 Some(frame) => return Ok(Some(frame)),
901 None => {
902 self.cursor = None;
903 continue;
904 }
905 }
906 } else {
907 return Ok(None);
908 }
909 }
910 }
911
912 fn open_next_segment(&mut self) -> QuillSQLResult<bool> {
913 let Some(segment_id) = self.segments.get(self.current_idx).copied() else {
914 return Ok(false);
915 };
916 let path = segment_path(&self.directory, segment_id);
917 let cursor = SegmentCursor::open(segment_id, &path)?;
918 self.cursor = Some(cursor);
919 self.current_idx += 1;
920 Ok(true)
921 }
922}
923
924struct SegmentCursor {
925 len: u64,
926 offset: u64,
927 reader: BufReader<File>,
928 fragment_buf: Vec<u8>,
929 ready_frames: VecDeque<WalFrame>,
930}
931
932impl SegmentCursor {
933 fn open(_id: u64, path: &Path) -> QuillSQLResult<Self> {
934 let file = File::open(path)?;
935 let len = file.metadata()?.len();
936 Ok(Self {
937 len,
938 offset: 0,
939 reader: BufReader::new(file),
940 fragment_buf: Vec::new(),
941 ready_frames: VecDeque::new(),
942 })
943 }
944
945 fn read_frame(&mut self) -> QuillSQLResult<Option<WalFrame>> {
946 loop {
947 if let Some(frame) = self.ready_frames.pop_front() {
948 return Ok(Some(frame));
949 }
950
951 if self.offset >= self.len {
952 return Ok(None);
953 }
954
955 let mut page_buf = vec![0u8; WAL_PAGE_SIZE];
956 if let Err(err) = self.reader.read_exact(&mut page_buf) {
957 return match err.kind() {
958 ErrorKind::UnexpectedEof => {
959 self.offset = self.len;
960 Ok(None)
961 }
962 _ => Err(QuillSQLError::Io(err)),
963 };
964 }
965 self.offset = self.offset.saturating_add(WAL_PAGE_SIZE as u64);
966
967 match WalPage::unpack_frames(&page_buf) {
968 Ok(page) => {
969 if !page.has_payload() {
970 self.offset = self.len;
971 return Ok(None);
972 }
973 for slot in page.fragments() {
974 let start = slot.offset as usize;
975 let end = start + slot.len as usize;
976 let fragment = &page.payload()[start..end];
977 match slot.kind {
978 WalPageFragmentKind::Complete => {
979 self.fragment_buf.clear();
980 match decode_frame(fragment) {
981 Ok((frame, _)) => self.ready_frames.push_back(frame),
982 Err(QuillSQLError::Internal(message))
983 if message.contains("too short")
984 || message.contains("truncated")
985 || message.contains("CRC mismatch") =>
986 {
987 self.offset = self.len;
988 return Ok(None);
989 }
990 Err(err) => return Err(err),
991 }
992 }
993 WalPageFragmentKind::Start => {
994 self.fragment_buf.clear();
995 self.fragment_buf.extend_from_slice(fragment);
996 }
997 WalPageFragmentKind::Middle => {
998 if self.fragment_buf.is_empty() {
999 self.offset = self.len;
1000 return Ok(None);
1001 }
1002 self.fragment_buf.extend_from_slice(fragment);
1003 }
1004 WalPageFragmentKind::End => {
1005 if self.fragment_buf.is_empty() {
1006 self.offset = self.len;
1007 return Ok(None);
1008 }
1009 self.fragment_buf.extend_from_slice(fragment);
1010 let frame_bytes = std::mem::take(&mut self.fragment_buf);
1011 match decode_frame(&frame_bytes) {
1012 Ok((frame, _)) => self.ready_frames.push_back(frame),
1013 Err(QuillSQLError::Internal(message))
1014 if message.contains("too short")
1015 || message.contains("truncated")
1016 || message.contains("CRC mismatch") =>
1017 {
1018 self.offset = self.len;
1019 return Ok(None);
1020 }
1021 Err(err) => return Err(err),
1022 }
1023 }
1024 }
1025 }
1026 }
1027 Err(QuillSQLError::Internal(message))
1028 if message.contains("truncated")
1029 || message.contains("CRC mismatch")
1030 || message.contains("Invalid WAL page magic") =>
1031 {
1032 self.offset = self.len;
1033 return Ok(None);
1034 }
1035 Err(err) => return Err(err),
1036 }
1037 }
1038 }
1039}
1040
1041impl std::fmt::Debug for WalStorage {
1042 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1043 f.debug_struct("WalStorage")
1044 .field("directory", &self.directory)
1045 .field("segment_size", &self.segment_size)
1046 .field("sync_on_flush", &self.sync_on_flush)
1047 .field("current_segment", &self.current_segment)
1048 .finish()
1049 }
1050}
1051
1052impl std::fmt::Debug for WalSegmentInfo {
1053 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1054 f.debug_struct("WalSegmentInfo")
1055 .field("id", &self.id)
1056 .field("size", &self.size)
1057 .finish()
1058 }
1059}
1060
/// Path of the segment file with the given id:
/// `wal_` + 16 uppercase hex digits + `.log`.
fn segment_path(directory: &Path, id: u64) -> PathBuf {
    let file_name = format!("wal_{:016X}.log", id);
    directory.join(file_name)
}
1064
/// Parses the segment id from a file name in the exact form produced by
/// `segment_path`: `wal_` + 16 uppercase hex digits + `.log`.
///
/// Returns `None` for any other name. `u64::from_str_radix` alone would also
/// accept a leading `+`, lowercase digits or a different width. Such names map
/// back to a *different* file through `segment_path`, so the segment could not
/// be reopened by id.
fn parse_segment_id(name: &str) -> Option<u64> {
    let digits = name.strip_prefix("wal_")?.strip_suffix(".log")?;
    let canonical = digits.len() == 16
        && digits
            .bytes()
            .all(|b| matches!(b, b'0'..=b'9' | b'A'..=b'F'));
    if !canonical {
        return None;
    }
    u64::from_str_radix(digits, 16).ok()
}
1070
1071fn list_segments(directory: &Path) -> QuillSQLResult<Vec<u64>> {
1072 if !directory.exists() {
1073 return Ok(Vec::new());
1074 }
1075 let mut ids = Vec::new();
1076 for entry in fs::read_dir(directory)? {
1077 let entry = entry?;
1078 if !entry.file_type()?.is_file() {
1079 continue;
1080 }
1081 let name = entry.file_name();
1082 let name = name.to_string_lossy();
1083 if let Some(id) = parse_segment_id(&name) {
1084 ids.push(id);
1085 }
1086 }
1087 ids.sort_unstable();
1088 Ok(ids)
1089}
1090
1091fn io_error(err: &QuillSQLError) -> std::io::Error {
1092 std::io::Error::other(err.to_string())
1093}
1094
/// A running background writer thread and the flag used to stop it.
#[derive(Debug)]
struct WalWriterRuntime {
    /// Set to ask the thread's loop to exit.
    stop_flag: Arc<AtomicBool>,
    /// Handle used to join the thread.
    thread: thread::JoinHandle<()>,
}
1100
1101impl WalWriterRuntime {
1102 fn spawn(target: Weak<WalManager>, interval: Duration) -> QuillSQLResult<Self> {
1103 let stop_flag = Arc::new(AtomicBool::new(false));
1104 let thread_stop = stop_flag.clone();
1105 let handle = thread::Builder::new()
1106 .name("walwriter".into())
1107 .spawn(move || {
1108 while !thread_stop.load(Ordering::Relaxed) {
1109 if let Some(manager) = target.upgrade() {
1110 let _ = manager.flush(None);
1111 } else {
1112 break;
1113 }
1114 thread::sleep(interval);
1115 }
1116 if let Some(manager) = target.upgrade() {
1117 let _ = manager.flush(None);
1118 }
1119 })
1120 .map_err(|e| QuillSQLError::Internal(format!("Failed to spawn walwriter: {}", e)))?;
1121 Ok(Self {
1122 stop_flag,
1123 thread: handle,
1124 })
1125 }
1126
1127 fn stop(self) -> QuillSQLResult<()> {
1128 self.stop_flag.store(true, Ordering::Release);
1129 self.thread
1130 .join()
1131 .map_err(|_| QuillSQLError::Internal("walwriter thread panicked".to_string()))
1132 }
1133}
1134
1135impl Drop for WalManager {
1136 fn drop(&mut self) {
1137 if let Some(runtime) = self.writer.lock().take() {
1138 let _ = runtime.stop();
1139 }
1140 }
1141}
1142
#[cfg(test)]
mod tests {
    use crate::buffer::PAGE_SIZE;
    use crate::config::WalConfig;
    use crate::recovery::wal::codec::decode_payload;
    use crate::recovery::wal_record::{
        CheckpointPayload, ResourceManagerId, TransactionPayload, TransactionRecordKind,
        WalRecordPayload,
    };
    use crate::storage::disk_manager::DiskManager;
    use crate::storage::disk_scheduler::DiskScheduler;
    use std::sync::Arc;
    use tempfile::TempDir;

    use super::WalManager;

    /// Creates a disk scheduler backed by a fresh database file inside `tmp`.
    fn build_scheduler(tmp: &TempDir) -> Arc<DiskScheduler> {
        let db_path = tmp.path().join("wal_test.db");
        let disk_manager = Arc::new(DiskManager::try_new(&db_path).expect("disk manager"));
        Arc::new(DiskScheduler::new(disk_manager))
    }

    /// Builds a WAL manager whose segments live in `<tmp>/wal`.
    ///
    /// `config.directory` is always overridden. The periodic writer interval is cleared so
    /// that assertions on `durable_lsn()` only observe flushes the test triggers itself,
    /// presumably disabling any background writer (TODO confirm against `WalConfig`).
    fn build_wal_manager_with_config(tmp: &TempDir, mut config: WalConfig) -> WalManager {
        config.directory = tmp.path().join("wal");
        config.writer_interval_ms = None;
        let scheduler = build_scheduler(tmp);
        WalManager::new_with_scheduler(config, None, None, scheduler).expect("wal manager")
    }

    /// Builds a WAL manager with the default configuration (see `build_wal_manager_with_config`).
    fn build_wal_manager(tmp: &TempDir) -> WalManager {
        build_wal_manager_with_config(tmp, WalConfig::default())
    }

    /// Appended records are not durable until a flush, which makes the last end LSN durable.
    #[test]
    fn append_and_flush_tracks_durable_lsn() {
        let tmp = TempDir::new().expect("tempdir");
        let wal = build_wal_manager(&tmp);
        let l1 = wal
            .append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: TransactionRecordKind::Begin,
                    txn_id: 1,
                })
            })
            .expect("append record")
            .end_lsn;
        let l2 = wal
            .append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: TransactionRecordKind::Commit,
                    txn_id: 1,
                })
            })
            .expect("append record")
            .end_lsn;
        assert!(l2 > l1);

        assert_eq!(wal.durable_lsn(), 0);
        let flushed = wal.flush(None).unwrap();
        assert_eq!(flushed, l2);
        assert_eq!(wal.durable_lsn(), l2);
        assert!(wal.pending_records().is_empty());
    }

    /// The reader yields every flushed frame, in append order, keyed by its start LSN.
    #[test]
    fn wal_reader_reads_back_frames() {
        let tmp = TempDir::new().expect("tempdir");
        let wal = build_wal_manager(&tmp);
        let expected: Vec<_> = (0..4)
            .map(|txn| {
                wal.append_record_with(|_| {
                    WalRecordPayload::Transaction(TransactionPayload {
                        marker: TransactionRecordKind::Begin,
                        txn_id: txn,
                    })
                })
                .expect("append")
                .start_lsn
            })
            .collect();
        wal.flush(None).expect("flush");

        let mut reader = wal.reader().expect("reader");
        let observed: Vec<_> = std::iter::from_fn(|| reader.next_frame().expect("frame"))
            .map(|frame| frame.lsn)
            .collect();
        assert_eq!(observed, expected);
    }

    /// Reopening a WAL directory restores durable/assigned LSNs, and new appends continue after them.
    #[test]
    fn wal_manager_bootstraps_existing_segments() {
        use TransactionRecordKind::*;

        let tmp = TempDir::new().expect("tempdir");
        // Both managers are pointed at `<tmp>/wal` by `build_wal_manager_with_config`.
        let config = WalConfig::default();

        let wal1 = build_wal_manager_with_config(&tmp, config.clone());
        let l1 = wal1
            .append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: Begin,
                    txn_id: 1,
                })
            })
            .expect("append begin")
            .end_lsn;
        let l2 = wal1
            .append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: Commit,
                    txn_id: 1,
                })
            })
            .expect("append commit")
            .end_lsn;
        assert!(l2 > l1);
        wal1.flush(None).expect("flush wal1");
        drop(wal1);

        let wal2 = build_wal_manager_with_config(&tmp, config);
        assert_eq!(wal2.durable_lsn(), l2);
        assert_eq!(wal2.max_assigned_lsn(), l2);

        let l3 = wal2
            .append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: Abort,
                    txn_id: 2,
                })
            })
            .expect("append after reopen")
            .end_lsn;
        assert!(l3 > l2);
        wal2.flush(None).expect("flush wal2");
        assert_eq!(wal2.durable_lsn(), l3);
    }

    /// `flush_until` makes exactly the requested LSN durable, one step at a time.
    #[test]
    fn flush_until_advances_in_order() {
        let tmp = TempDir::new().expect("tempdir");
        let wal = build_wal_manager(&tmp);

        let first = wal
            .append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: TransactionRecordKind::Begin,
                    txn_id: 42,
                })
            })
            .expect("append begin")
            .end_lsn;

        let second = wal
            .append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: TransactionRecordKind::Commit,
                    txn_id: 42,
                })
            })
            .expect("append commit")
            .end_lsn;

        assert!(second > first);
        assert_eq!(wal.durable_lsn(), 0);

        let dur = wal.flush_until(first).expect("flush first");
        assert_eq!(dur, first);
        assert_eq!(wal.durable_lsn(), first);

        let dur = wal.flush_until(second).expect("flush second");
        assert_eq!(dur, second);
        assert_eq!(wal.durable_lsn(), second);
    }

    /// After a checkpoint, segments older than the retention window are removed from disk.
    #[test]
    fn wal_recycles_segments_after_checkpoint() {
        let tmp = TempDir::new().expect("tempdir");
        let mut config = WalConfig::default();
        config.directory = tmp.path().join("wal");
        // Tiny segments so 256 records span many segment files.
        config.segment_size = 64;
        config.retain_segments = 1;
        config.writer_interval_ms = None;
        let wal_dir = config.directory.clone();
        let retain_segments = config.retain_segments;
        let wal = WalManager::new(config, None, None).expect("wal manager");

        for i in 0..256 {
            wal.append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: TransactionRecordKind::Begin,
                    txn_id: i,
                })
            })
            .expect("append");
        }
        wal.flush(None).expect("flush");

        let last_lsn = wal.max_assigned_lsn();
        let payload = CheckpointPayload {
            last_lsn,
            dirty_pages: Vec::new(),
            active_transactions: Vec::new(),
            dpt: Vec::new(),
        };
        wal.log_checkpoint(payload).expect("checkpoint");
        wal.flush(None).expect("final flush");

        drop(wal);

        let mut remaining_ids: Vec<u64> = std::fs::read_dir(&wal_dir)
            .expect("wal dir")
            .filter_map(|entry| {
                entry.ok().and_then(|e| {
                    let name = e.file_name();
                    let name = name.to_string_lossy();
                    super::parse_segment_id(&name)
                })
            })
            .collect();

        remaining_ids.sort_unstable();
        let max_id = *remaining_ids.last().unwrap_or(&0);
        let min_keep = max_id.saturating_sub(retain_segments as u64).max(1);

        assert!(
            remaining_ids.iter().all(|id| *id >= min_keep),
            "segments older than {} should be recycled, remaining {:?}",
            min_keep,
            remaining_ids
        );
    }

    /// With a one-record buffer, an append is flushed synchronously and becomes durable.
    #[test]
    fn auto_flush_respects_buffer_limits() {
        let tmp = TempDir::new().expect("tempdir");
        let mut config = WalConfig::default();
        config.buffer_capacity = 1;
        config.flush_coalesce_bytes = 64;
        let wal = build_wal_manager_with_config(&tmp, config);

        let lsn = wal
            .append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: TransactionRecordKind::Begin,
                    txn_id: 42,
                })
            })
            .expect("append")
            .end_lsn;

        assert!(wal.pending_records().is_empty());
        assert!(wal.durable_lsn() >= lsn);
    }

    /// The first page update logs a full image, a small change logs a delta, and no change logs nothing.
    #[test]
    fn log_page_update_switches_between_full_and_delta() {
        let tmp = TempDir::new().expect("tempdir");
        let wal = build_wal_manager(&tmp);
        let base = vec![0u8; PAGE_SIZE];

        let mut first_image = base.clone();
        first_image[0] = 1;
        let first = wal
            .log_page_update(5, 0, &base, &first_image)
            .expect("first update")
            .expect("should emit page write");
        wal.flush(None).expect("flush first");
        let mut reader = wal.reader().expect("reader");
        let first_frame = reader.next_frame().expect("frame").expect("frame");
        match decode_payload(&first_frame).expect("payload") {
            WalRecordPayload::PageWrite(payload) => {
                assert_eq!(payload.page_id, 5);
            }
            other => panic!("expected PageWrite, got {:?}", other),
        }
        assert!(reader.next_frame().expect("no extra").is_none());

        let mut second_image = first_image.clone();
        second_image[128] = 77;
        let second = wal
            .log_page_update(5, first.end_lsn, &first_image, &second_image)
            .expect("second update")
            .expect("should emit delta");
        wal.flush(None).expect("flush second");
        let mut reader = wal.reader().expect("reader");
        let frames: Vec<_> =
            std::iter::from_fn(|| reader.next_frame().expect("frame")).collect();
        assert_eq!(frames.len(), 2);
        match decode_payload(&frames[1]).expect("payload") {
            WalRecordPayload::PageDelta(payload) => {
                assert_eq!(payload.page_id, 5);
                assert_eq!(payload.offset as usize, 128);
                assert_eq!(payload.data.len(), 1);
                assert_eq!(payload.data[0], 77);
            }
            other => panic!("expected PageDelta, got {:?}", other),
        }

        let none = wal
            .log_page_update(5, second.end_lsn, &second_image, &second_image)
            .expect("noop update");
        assert!(none.is_none());
        wal.flush(None).expect("flush noop");
        let mut reader = wal.reader().expect("reader");
        let count = std::iter::from_fn(|| reader.next_frame().expect("frame")).count();
        assert_eq!(count, 2, "noop update should not add frames");
    }

    /// A logged checkpoint is made durable and round-trips through the reader and decoder.
    #[test]
    fn log_checkpoint_persists_record() {
        let tmp = TempDir::new().expect("tempdir");
        let wal = build_wal_manager(&tmp);

        let payload = CheckpointPayload {
            last_lsn: 7,
            dirty_pages: vec![1, 2, 3],
            active_transactions: vec![10, 11],
            dpt: Vec::new(),
        };
        let checkpoint_lsn = wal.log_checkpoint(payload.clone()).expect("checkpoint");
        assert!(wal.last_checkpoint_lsn() <= checkpoint_lsn);
        assert!(wal.durable_lsn() >= checkpoint_lsn);

        let mut reader = wal.reader().expect("reader");
        let seen: Vec<_> = std::iter::from_fn(|| reader.next_frame().expect("frame")).collect();
        assert_eq!(seen.len(), 1);
        assert_eq!(seen[0].rmid, ResourceManagerId::Checkpoint);
        let observed = crate::recovery::wal_record::decode_checkpoint(&seen[0].body).unwrap();
        assert_eq!(observed.last_lsn, payload.last_lsn);
        assert_eq!(observed.dirty_pages, payload.dirty_pages);
        assert_eq!(observed.active_transactions, payload.active_transactions);
    }
}