quill_sql/recovery/
wal.rs

1pub mod codec;
2pub mod page;
3
4use parking_lot::{Condvar, Mutex};
5use std::cmp;
6use std::collections::VecDeque;
7use std::fmt;
8use std::fs::{self, File};
9use std::io::{self, BufReader, ErrorKind, Read};
10use std::path::{Path, PathBuf};
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::{Arc, Weak};
13use std::thread;
14use std::time::Duration;
15
16use crate::buffer::{PageId, PAGE_SIZE};
17use crate::config::WalConfig;
18use crate::error::{QuillSQLError, QuillSQLResult};
19use crate::recovery::control_file::{ControlFileManager, WalInitState};
20use crate::recovery::wal::codec::{
21    decode_frame, encode_body, encode_frame, CheckpointPayload, PageDeltaPayload, PageWritePayload,
22    WalFrame, WAL_CRC_LEN, WAL_HEADER_LEN,
23};
24use crate::recovery::wal::page::{
25    WalFrameContinuation, WalPage, WalPageFragmentKind, WAL_PAGE_SIZE,
26};
27use crate::recovery::wal_record::WalRecordPayload;
28use crate::storage::disk_manager::DiskManager;
29use crate::storage::disk_scheduler::{DiskCommandResultReceiver, DiskScheduler};
30use crate::utils::util::find_contiguous_diff;
31use bytes::Bytes;
32use dashmap::DashSet;
33
/// Log sequence number.
///
/// New LSNs are reserved by advancing a counter by the encoded byte length of
/// each WAL frame, so LSNs behave like logical byte positions in the log.
pub type Lsn = u64;
35
36#[derive(Debug, Clone, Copy)]
37pub struct WalAppendContext {
38    pub start_lsn: Lsn,
39    pub end_lsn: Lsn,
40}
41
42#[derive(Debug, Clone, Copy)]
43pub struct WalAppendResult {
44    pub start_lsn: Lsn,
45    pub end_lsn: Lsn,
46}
47
48#[derive(Clone)]
49pub struct WalRecord {
50    pub start_lsn: Lsn,
51    pub end_lsn: Lsn,
52    pub payload: Bytes,
53}
54
55struct WalState {
56    buffer: Vec<WalRecord>,
57    buffer_bytes: usize,
58    storage: WalStorage,
59    last_record_start: Lsn,
60}
61
62impl std::fmt::Debug for WalState {
63    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
64        f.debug_struct("WalState")
65            .field("buffer_len", &self.buffer.len())
66            .field("current_segment", &self.storage.current_segment)
67            .field("last_record_start", &self.last_record_start)
68            .finish()
69    }
70}
71
72pub struct WalManager {
73    next_lsn: AtomicU64,
74    durable_lsn: AtomicU64,
75    state: Mutex<WalState>,
76    writer: Mutex<Option<WalWriterRuntime>>,
77    max_buffer_records: usize,
78    flush_coalesce_bytes: usize,
79    last_checkpoint: AtomicU64,
80    last_record_start: AtomicU64,
81    control_file: Option<Arc<ControlFileManager>>,
82    flush_lock: Mutex<()>,
83    flush_cond: Condvar,
84    checkpoint_redo_start: AtomicU64,
85    touched_pages: DashSet<PageId>,
86}
87
88pub struct WalWriterHandle {
89    manager: Option<Arc<WalManager>>,
90}
91
92impl WalWriterHandle {
93    fn new(manager: Arc<WalManager>) -> Self {
94        Self {
95            manager: Some(manager),
96        }
97    }
98
99    pub fn stop(mut self) -> QuillSQLResult<()> {
100        if let Some(manager) = self.manager.take() {
101            manager.stop_background_writer()
102        } else {
103            Ok(())
104        }
105    }
106}
107
108impl Drop for WalWriterHandle {
109    fn drop(&mut self) {
110        if let Some(manager) = self.manager.take() {
111            let _ = manager.stop_background_writer();
112        }
113    }
114}
115
116impl WalManager {
117    pub fn new(
118        config: WalConfig,
119        init_state: Option<WalInitState>,
120        control_file: Option<Arc<ControlFileManager>>,
121    ) -> QuillSQLResult<Self> {
122        let scheduler = Self::default_scheduler(&config)?;
123        Self::new_with_scheduler(config, init_state, control_file, scheduler)
124    }
125
126    pub fn new_with_scheduler(
127        config: WalConfig,
128        init_state: Option<WalInitState>,
129        control_file: Option<Arc<ControlFileManager>>,
130        scheduler: Arc<DiskScheduler>,
131    ) -> QuillSQLResult<Self> {
132        let max_buffer_records = if config.buffer_capacity == 0 {
133            usize::MAX
134        } else {
135            config.buffer_capacity
136        };
137        let flush_bytes = if config.flush_coalesce_bytes == 0 {
138            usize::MAX
139        } else {
140            config.flush_coalesce_bytes
141        };
142        let mut storage = WalStorage::new(config, scheduler)?;
143        let (durable_ptr, next_ptr, checkpoint_ptr, last_record_start, redo_start) =
144            if let Some(state) = init_state {
145                (
146                    state.durable_lsn,
147                    state.max_assigned_lsn,
148                    state.last_checkpoint_lsn,
149                    state.last_record_start,
150                    state.checkpoint_redo_start,
151                )
152            } else {
153                let (next_offset, last_start) = storage.recover_offsets()?;
154                (next_offset, next_offset, last_start, last_start, 0)
155            };
156        Ok(Self {
157            next_lsn: AtomicU64::new(next_ptr),
158            durable_lsn: AtomicU64::new(durable_ptr),
159            state: Mutex::new(WalState {
160                buffer: Vec::new(),
161                buffer_bytes: 0,
162                storage,
163                last_record_start,
164            }),
165            writer: Mutex::new(None),
166            max_buffer_records,
167            flush_coalesce_bytes: flush_bytes,
168            last_checkpoint: AtomicU64::new(checkpoint_ptr),
169            last_record_start: AtomicU64::new(last_record_start),
170            control_file,
171            flush_lock: Mutex::new(()),
172            flush_cond: Condvar::new(),
173            checkpoint_redo_start: AtomicU64::new(redo_start),
174            touched_pages: DashSet::new(),
175        })
176    }
177
178    #[inline]
179    pub fn max_assigned_lsn(&self) -> Lsn {
180        self.next_lsn.load(Ordering::Acquire)
181    }
182
183    #[inline]
184    pub fn durable_lsn(&self) -> Lsn {
185        self.durable_lsn.load(Ordering::Acquire)
186    }
187
188    pub fn append_record_with<F>(&self, mut build: F) -> QuillSQLResult<WalAppendResult>
189    where
190        F: FnMut(WalAppendContext) -> WalRecordPayload,
191    {
192        let preview_ctx = WalAppendContext {
193            start_lsn: 0,
194            end_lsn: WAL_HEADER_LEN as u64 + WAL_CRC_LEN as u64,
195        };
196        let preview_payload = build(preview_ctx);
197        let (_, _, preview_body) = encode_body(&preview_payload);
198        let preview_frame_len = WAL_HEADER_LEN + preview_body.len() + WAL_CRC_LEN;
199
200        let mut guard = self.state.lock();
201        let prev_start = guard.last_record_start;
202        let start_lsn = self
203            .next_lsn
204            .fetch_add(preview_frame_len as u64, Ordering::SeqCst);
205        let end_lsn_preview = start_lsn + preview_frame_len as u64;
206
207        let preview_ctx = WalAppendContext {
208            start_lsn,
209            end_lsn: end_lsn_preview,
210        };
211        let payload = build(preview_ctx);
212        let frame_bytes = encode_frame(start_lsn, prev_start, &payload);
213        let frame_len = frame_bytes.len();
214        debug_assert_eq!(frame_len, preview_frame_len);
215        let end_lsn = start_lsn + frame_len as u64;
216        let encoded = Bytes::from(frame_bytes);
217
218        guard.buffer.push(WalRecord {
219            start_lsn,
220            end_lsn,
221            payload: encoded,
222        });
223        guard.buffer_bytes = guard.buffer_bytes.saturating_add(frame_len);
224        guard.last_record_start = start_lsn;
225
226        let should_flush = guard.buffer.len() >= self.max_buffer_records
227            || guard.buffer_bytes >= self.flush_coalesce_bytes
228            || guard.buffer_bytes >= WAL_PAGE_SIZE;
229        drop(guard);
230
231        self.last_record_start.store(start_lsn, Ordering::Release);
232
233        if should_flush {
234            self.flush(Some(end_lsn))?;
235        }
236        Ok(WalAppendResult { start_lsn, end_lsn })
237    }
238
239    /// Record a page update by automatically choosing between FPW and delta logging.
240    /// Returns `Ok(None)` if the new image matches the existing bytes (no-op).
241    pub fn log_page_update(
242        &self,
243        page_id: PageId,
244        prev_page_lsn: Lsn,
245        old_image: &[u8],
246        new_image: &[u8],
247    ) -> QuillSQLResult<Option<WalAppendResult>> {
248        if old_image.len() != new_image.len() || old_image.len() != PAGE_SIZE {
249            return Err(QuillSQLError::Internal(format!(
250                "page {} image size mismatch: old={}, new={}",
251                page_id,
252                old_image.len(),
253                new_image.len()
254            )));
255        }
256
257        let Some((start, end)) = find_contiguous_diff(old_image, new_image) else {
258            return Ok(None);
259        };
260
261        let delta_threshold = PAGE_SIZE / 16;
262        let first_touch = self.fpw_first_touch(page_id);
263        if first_touch || (end - start) > delta_threshold {
264            let page_image = new_image.to_vec();
265            let result = self.append_record_with(|_| {
266                WalRecordPayload::PageWrite(PageWritePayload {
267                    page_id,
268                    prev_page_lsn,
269                    page_image: page_image.clone(),
270                })
271            })?;
272            return Ok(Some(result));
273        }
274
275        let diff = new_image[start..end].to_vec();
276        let result = self.append_record_with(|_| {
277            WalRecordPayload::PageDelta(PageDeltaPayload {
278                page_id,
279                prev_page_lsn,
280                offset: start as u16,
281                data: diff.clone(),
282            })
283        })?;
284        Ok(Some(result))
285    }
286
287    pub fn log_checkpoint(&self, payload: CheckpointPayload) -> QuillSQLResult<Lsn> {
288        let redo_start = payload
289            .dpt
290            .iter()
291            .map(|(_, lsn)| *lsn)
292            .min()
293            .unwrap_or(payload.last_lsn);
294        let result = self.append_record_with(|_| WalRecordPayload::Checkpoint(payload.clone()))?;
295        self.last_checkpoint
296            .store(result.start_lsn, Ordering::Release);
297        self.checkpoint_redo_start
298            .store(redo_start, Ordering::Release);
299        self.flush(Some(result.end_lsn))?;
300        // Reset FPW epoch
301        self.touched_pages.clear();
302        if let Some(ctrl) = &self.control_file {
303            ctrl.update(
304                self.durable_lsn(),
305                self.max_assigned_lsn(),
306                self.last_checkpoint_lsn(),
307                self.last_record_start.load(Ordering::Acquire),
308                self.checkpoint_redo_start.load(Ordering::Acquire),
309            )?;
310        }
311        Ok(result.end_lsn)
312    }
313
314    #[inline]
315    pub fn last_checkpoint_lsn(&self) -> Lsn {
316        self.last_checkpoint.load(Ordering::Acquire)
317    }
318
319    pub fn start_background_flush(
320        self: &Arc<Self>,
321        interval: Duration,
322    ) -> QuillSQLResult<Option<WalWriterHandle>> {
323        if interval.is_zero() {
324            return Ok(None);
325        }
326        let mut guard = self.writer.lock();
327        if guard.is_some() {
328            return Ok(None);
329        }
330        let runtime = WalWriterRuntime::spawn(Arc::downgrade(self), interval)?;
331        *guard = Some(runtime);
332        Ok(Some(WalWriterHandle::new(Arc::clone(self))))
333    }
334
335    fn stop_background_writer(&self) -> QuillSQLResult<()> {
336        if let Some(runtime) = self.writer.lock().take() {
337            runtime.stop()?;
338        }
339        Ok(())
340    }
341
342    pub fn flush(&self, target: Option<Lsn>) -> QuillSQLResult<Lsn> {
343        let recycle_lsn = self.last_checkpoint.load(Ordering::Acquire);
344
345        let (desired, tickets) = {
346            let mut guard = self.state.lock();
347            let current_durable = self.durable_lsn();
348            let highest_buffered = guard
349                .buffer
350                .last()
351                .map(|r| r.end_lsn)
352                .unwrap_or(current_durable);
353            let mut desired = target.filter(|lsn| *lsn != 0).unwrap_or(highest_buffered);
354            desired = cmp::min(self.max_assigned_lsn(), desired);
355
356            if desired <= current_durable {
357                guard.storage.recycle_segments(recycle_lsn)?;
358                drop(guard);
359                return Ok(current_durable);
360            }
361
362            let flush_count = guard
363                .buffer
364                .iter()
365                .take_while(|record| record.end_lsn <= desired)
366                .count();
367
368            if flush_count > 0 {
369                let to_flush: Vec<WalRecord> = guard.buffer[..flush_count].to_vec();
370                let flushed_bytes: usize = to_flush.iter().map(|r| r.encoded_len() as usize).sum();
371                guard.storage.append_records(&to_flush)?;
372                guard.storage.flush()?;
373                guard.buffer.drain(..flush_count);
374                guard.buffer_bytes = guard.buffer_bytes.saturating_sub(flushed_bytes);
375            }
376
377            guard.storage.recycle_segments(recycle_lsn)?;
378            let ready = guard.storage.take_ready(desired);
379            (desired, ready)
380        };
381
382        self.wait_for_flush_tickets(tickets)?;
383        self.durable_lsn.store(desired, Ordering::Release);
384        self.persist_control_file()?;
385        self.flush_cond.notify_all();
386        Ok(desired)
387    }
388
389    pub fn pending_records(&self) -> Vec<WalRecord> {
390        self.state.lock().buffer.clone()
391    }
392
393    fn default_scheduler(config: &WalConfig) -> QuillSQLResult<Arc<DiskScheduler>> {
394        fs::create_dir_all(&config.directory)?;
395        let db_path = config.directory.join("wal_scheduler.db");
396        let disk_manager = Arc::new(DiskManager::try_new(&db_path)?);
397        Ok(Arc::new(DiskScheduler::new(disk_manager)))
398    }
399
400    fn wait_for_flush_tickets(&self, tickets: Vec<WalFlushTicket>) -> QuillSQLResult<()> {
401        for ticket in tickets {
402            match ticket.receiver.recv() {
403                Ok(Ok(())) => {}
404                Ok(Err(err)) => return Err(err),
405                Err(err) => {
406                    return Err(QuillSQLError::Internal(format!(
407                        "WAL flush completion dropped: {}",
408                        err
409                    )))
410                }
411            }
412        }
413        Ok(())
414    }
415
416    fn persist_control_file(&self) -> QuillSQLResult<()> {
417        if let Some(ctrl) = &self.control_file {
418            ctrl.update(
419                self.durable_lsn(),
420                self.max_assigned_lsn(),
421                self.last_checkpoint_lsn(),
422                self.last_record_start.load(Ordering::Acquire),
423                self.checkpoint_redo_start.load(Ordering::Acquire),
424            )?;
425        }
426        Ok(())
427    }
428
429    pub fn flush_until(&self, target: Lsn) -> QuillSQLResult<Lsn> {
430        if target == 0 {
431            return Ok(self.durable_lsn());
432        }
433        if self.durable_lsn() >= target {
434            return Ok(self.durable_lsn());
435        }
436        self.flush(Some(target))
437    }
438
439    pub fn wait_for_durable(&self, target: Lsn) -> QuillSQLResult<()> {
440        if target == 0 {
441            return Ok(());
442        }
443        if self.durable_lsn() >= target {
444            return Ok(());
445        }
446        self.flush(Some(target))?;
447        if self.durable_lsn() >= target {
448            return Ok(());
449        }
450        let mut guard = self.flush_lock.lock();
451        while self.durable_lsn() < target {
452            self.flush_cond.wait(&mut guard);
453        }
454        Ok(())
455    }
456
457    pub fn reader(&self) -> QuillSQLResult<WalReader> {
458        let directory = self.state.lock().storage.directory.clone();
459        WalReader::new(directory)
460    }
461
462    /// Returns true if this is the first touch of the page since last checkpoint.
463    pub fn fpw_first_touch(&self, page_id: PageId) -> bool {
464        self.touched_pages.insert(page_id)
465    }
466
467    /// Returns the control file manager if configured (used during recovery to read snapshot info).
468    pub fn control_file(&self) -> Option<Arc<ControlFileManager>> {
469        self.control_file.as_ref().map(Arc::clone)
470    }
471}
472
473impl fmt::Debug for WalManager {
474    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
475        f.debug_struct("WalManager")
476            .field("next_lsn", &self.next_lsn.load(Ordering::Relaxed))
477            .field("durable_lsn", &self.durable_lsn.load(Ordering::Relaxed))
478            .field("max_buffer_records", &self.max_buffer_records)
479            .field("flush_coalesce_bytes", &self.flush_coalesce_bytes)
480            .finish()
481    }
482}
483
/// Identity of a WAL segment file and the number of bytes written into it.
#[derive(Clone)]
struct WalSegmentInfo {
    /// 1-based segment number.
    id: u64,
    /// Bytes currently occupied in the segment (the next write offset).
    size: u64,
}
489
490struct WalStorage {
491    directory: PathBuf,
492    segment_size: u64,
493    sync_on_flush: bool,
494    current_segment: WalSegmentInfo,
495    scheduler: Arc<DiskScheduler>,
496    retain_segments: usize,
497    open_page: Option<WalPage>,
498    last_page_end_lsn: Lsn,
499    last_sync_request: Lsn,
500    pending: VecDeque<WalFlushTicket>,
501}
502
503struct WalFlushTicket {
504    lsn: Lsn,
505    receiver: DiskCommandResultReceiver<()>,
506}
507
508impl WalStorage {
509    fn new(config: WalConfig, scheduler: Arc<DiskScheduler>) -> QuillSQLResult<Self> {
510        fs::create_dir_all(&config.directory)?;
511        let segment = Self::discover_latest_segment(&config.directory, config.segment_size)?;
512        Ok(Self {
513            directory: config.directory,
514            segment_size: config.segment_size,
515            sync_on_flush: config.sync_on_flush,
516            current_segment: segment,
517            scheduler,
518            retain_segments: config.retain_segments.max(1),
519            open_page: None,
520            last_page_end_lsn: 0,
521            last_sync_request: 0,
522            pending: VecDeque::new(),
523        })
524    }
525
526    fn recover_offsets(&mut self) -> QuillSQLResult<(Lsn, Lsn)> {
527        let segments = list_segments(&self.directory)?;
528        let mut next_lsn = 0u64;
529        let mut last_record_start = 0u64;
530        let mut pending_fragment = Vec::new();
531        let mut physical_offset = 0u64;
532
533        for segment_id in segments {
534            let path = segment_path(&self.directory, segment_id);
535            let file = match fs::File::open(&path) {
536                Ok(f) => f,
537                Err(e) if e.kind() == io::ErrorKind::NotFound => continue,
538                Err(e) => return Err(QuillSQLError::Io(e)),
539            };
540            let metadata = file.metadata()?;
541            let mut reader = io::BufReader::new(file);
542            let segment_base = (segment_id.saturating_sub(1)) * self.segment_size;
543            let mut segment_consumed = 0u64;
544            pending_fragment.clear();
545
546            while segment_consumed + WAL_PAGE_SIZE as u64 <= metadata.len() {
547                let mut page_buf = vec![0u8; WAL_PAGE_SIZE];
548                if let Err(err) = reader.read_exact(&mut page_buf) {
549                    if err.kind() == io::ErrorKind::UnexpectedEof {
550                        break;
551                    }
552                    return Err(QuillSQLError::Io(err));
553                }
554
555                let page = match WalPage::unpack_frames(&page_buf) {
556                    Ok(page) => page,
557                    Err(QuillSQLError::Internal(message))
558                        if message.contains("truncated")
559                            || message.contains("CRC mismatch")
560                            || message.contains("Invalid WAL page magic") =>
561                    {
562                        break;
563                    }
564                    Err(err) => return Err(err),
565                };
566
567                if !page.has_payload() {
568                    break;
569                }
570
571                let mut stop_segment = false;
572                for slot in page.fragments() {
573                    let start = slot.offset as usize;
574                    let end = start + slot.len as usize;
575                    let fragment = &page.payload()[start..end];
576                    match slot.kind {
577                        WalPageFragmentKind::Complete => {
578                            pending_fragment.clear();
579                            match decode_frame(fragment) {
580                                Ok((frame, _)) => {
581                                    let frame_len = fragment.len() as u64;
582                                    last_record_start = frame.lsn;
583                                    next_lsn = frame.lsn.saturating_add(frame_len);
584                                }
585                                Err(QuillSQLError::Internal(message))
586                                    if message.contains("too short")
587                                        || message.contains("truncated")
588                                        || message.contains("CRC mismatch") =>
589                                {
590                                    stop_segment = true;
591                                    pending_fragment.clear();
592                                    break;
593                                }
594                                Err(err) => return Err(err),
595                            }
596                        }
597                        WalPageFragmentKind::Start => {
598                            pending_fragment.clear();
599                            pending_fragment.extend_from_slice(fragment);
600                        }
601                        WalPageFragmentKind::Middle => {
602                            if pending_fragment.is_empty() {
603                                stop_segment = true;
604                                pending_fragment.clear();
605                                break;
606                            }
607                            pending_fragment.extend_from_slice(fragment);
608                        }
609                        WalPageFragmentKind::End => {
610                            if pending_fragment.is_empty() {
611                                stop_segment = true;
612                                pending_fragment.clear();
613                                break;
614                            }
615                            pending_fragment.extend_from_slice(fragment);
616                            let frame_bytes = std::mem::take(&mut pending_fragment);
617                            match decode_frame(&frame_bytes) {
618                                Ok((frame, _)) => {
619                                    let frame_len = frame_bytes.len() as u64;
620                                    last_record_start = frame.lsn;
621                                    next_lsn = frame.lsn.saturating_add(frame_len);
622                                }
623                                Err(QuillSQLError::Internal(message))
624                                    if message.contains("too short")
625                                        || message.contains("truncated")
626                                        || message.contains("CRC mismatch") =>
627                                {
628                                    stop_segment = true;
629                                    pending_fragment.clear();
630                                    break;
631                                }
632                                Err(err) => return Err(err),
633                            }
634                        }
635                    }
636                }
637
638                if stop_segment {
639                    break;
640                }
641                segment_consumed += WAL_PAGE_SIZE as u64;
642            }
643
644            physical_offset = segment_base + segment_consumed;
645        }
646
647        self.last_page_end_lsn = next_lsn;
648        self.last_sync_request = next_lsn;
649        let segment_index = physical_offset / self.segment_size;
650        self.current_segment = WalSegmentInfo {
651            id: segment_index + 1,
652            size: physical_offset % self.segment_size,
653        };
654
655        Ok((next_lsn, last_record_start))
656    }
657
658    fn recycle_segments(&mut self, keep_from_lsn: Lsn) -> QuillSQLResult<()> {
659        if self.retain_segments == 0 {
660            return Ok(());
661        }
662        if keep_from_lsn == 0 {
663            return Ok(());
664        }
665        let keep_segment_id = 1 + (keep_from_lsn / self.segment_size);
666        let min_keep = keep_segment_id
667            .saturating_sub(self.retain_segments as u64)
668            .max(1);
669        let segments = list_segments(&self.directory)?;
670        for id in segments {
671            if id < min_keep && id < self.current_segment.id {
672                let path = segment_path(&self.directory, id);
673                let _ = fs::remove_file(&path);
674            }
675        }
676        Ok(())
677    }
678
679    fn append_records(&mut self, records: &[WalRecord]) -> std::io::Result<()> {
680        let mut queue: Vec<WalRecord> = records.iter().cloned().collect();
681        let mut carry: Option<WalFrameContinuation> = None;
682
683        if let Some(page) = self.open_page.take() {
684            let (page, leftover, next_carry) = page.continue_pack(queue);
685            queue = leftover;
686            carry = next_carry;
687
688            if page.is_full() {
689                self.write_page(&page)?;
690            } else {
691                if let Some(lsn) = page.last_end_lsn() {
692                    self.last_page_end_lsn = lsn;
693                }
694                self.open_page = Some(page);
695            }
696        }
697
698        while !queue.is_empty() || carry.is_some() {
699            let (page, leftover, next_carry) =
700                WalPage::pack_frames(self.last_page_end_lsn, queue, carry);
701            queue = leftover;
702            carry = next_carry;
703
704            if !page.has_payload() {
705                break;
706            }
707
708            if page.is_full() {
709                self.write_page(&page)?;
710            } else {
711                if let Some(lsn) = page.last_end_lsn() {
712                    self.last_page_end_lsn = lsn;
713                }
714                self.open_page = Some(page);
715                break;
716            }
717        }
718
719        debug_assert!(carry.is_none());
720
721        Ok(())
722    }
723
724    fn rotate_segment(&mut self) -> std::io::Result<()> {
725        self.flush()?;
726        self.current_segment = WalSegmentInfo {
727            id: self.current_segment.id + 1,
728            size: 0,
729        };
730        Ok(())
731    }
732
733    fn flush(&mut self) -> std::io::Result<()> {
734        if let Some(page) = self.open_page.take() {
735            self.write_page(&page)?;
736        }
737        if self.sync_on_flush && self.last_page_end_lsn > self.last_sync_request {
738            if let Some(receiver) =
739                self.write_bytes(self.current_segment.size, Bytes::new(), true)?
740            {
741                self.last_sync_request = self.last_page_end_lsn;
742                self.pending.push_back(WalFlushTicket {
743                    lsn: self.last_sync_request,
744                    receiver,
745                });
746            }
747        }
748        Ok(())
749    }
750
751    fn take_ready(&mut self, upto: Lsn) -> Vec<WalFlushTicket> {
752        let mut ready = Vec::new();
753        while let Some(front) = self.pending.front() {
754            if front.lsn <= upto {
755                if let Some(ticket) = self.pending.pop_front() {
756                    ready.push(ticket);
757                }
758            } else {
759                break;
760            }
761        }
762        ready
763    }
764
765    fn write_page(&mut self, page: &WalPage) -> std::io::Result<()> {
766        if !page.has_payload() {
767            return Ok(());
768        }
769        if self.current_segment.size + WAL_PAGE_SIZE as u64 > self.segment_size {
770            self.rotate_segment()?;
771        }
772        let offset = self.current_segment.size;
773        let bytes = Bytes::from(page.to_bytes());
774        let receiver = self.write_bytes(offset, bytes, false)?;
775        self.current_segment.size += WAL_PAGE_SIZE as u64;
776        if let Some(lsn) = page.last_end_lsn() {
777            self.last_page_end_lsn = lsn;
778        }
779        let ticket_lsn = self.last_page_end_lsn;
780        if let Some(receiver) = receiver {
781            self.pending.push_back(WalFlushTicket {
782                lsn: ticket_lsn,
783                receiver,
784            });
785        }
786        Ok(())
787    }
788
789    fn discover_latest_segment(
790        directory: &Path,
791        segment_size: u64,
792    ) -> std::io::Result<WalSegmentInfo> {
793        if !directory.exists() {
794            return Ok(WalSegmentInfo { id: 1, size: 0 });
795        }
796
797        let mut segments = Vec::new();
798        for entry in fs::read_dir(directory)? {
799            let entry = entry?;
800            if !entry.file_type()?.is_file() {
801                continue;
802            }
803            let name = entry.file_name();
804            let name = name.to_string_lossy();
805            if let Some(id) = parse_segment_id(&name) {
806                let size = entry.metadata()?.len();
807                segments.push(WalSegmentInfo { id, size });
808            }
809        }
810
811        if segments.is_empty() {
812            return Ok(WalSegmentInfo { id: 1, size: 0 });
813        }
814
815        segments.sort_by_key(|info| info.id);
816        let mut latest = segments.pop().unwrap();
817
818        if latest.size >= segment_size {
819            latest = WalSegmentInfo {
820                id: latest.id + 1,
821                size: 0,
822            };
823        }
824
825        Ok(latest)
826    }
827
828    fn write_bytes(
829        &self,
830        offset: u64,
831        data: Bytes,
832        sync: bool,
833    ) -> std::io::Result<Option<DiskCommandResultReceiver<()>>> {
834        if data.is_empty() && !sync {
835            return Ok(None);
836        }
837        let path = segment_path(&self.directory, self.current_segment.id);
838        if data.is_empty() {
839            let receiver = self
840                .scheduler
841                .schedule_wal_fsync(path)
842                .map_err(|e| io_error(&e))?;
843            Ok(Some(receiver))
844        } else {
845            let receiver = self
846                .scheduler
847                .schedule_wal_write(path, offset, data, sync)
848                .map_err(|e| io_error(&e))?;
849            Ok(Some(receiver))
850        }
851    }
852}
853
854impl std::fmt::Display for WalStorage {
855    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
856        write!(
857            f,
858            "segment={} bytes={} segment_size={} dir={}",
859            self.current_segment.id,
860            self.current_segment.size,
861            self.segment_size,
862            self.directory.display()
863        )
864    }
865}
866
867impl WalRecord {
868    fn encoded_len(&self) -> u64 {
869        self.end_lsn.saturating_sub(self.start_lsn)
870    }
871}
872
873pub struct WalReader {
874    directory: PathBuf,
875    segments: Vec<u64>,
876    current_idx: usize,
877    cursor: Option<SegmentCursor>,
878}
879
880impl WalReader {
881    pub fn new(directory: PathBuf) -> QuillSQLResult<Self> {
882        let mut segments = list_segments(&directory)?;
883        segments.sort_unstable();
884        Ok(Self {
885            directory,
886            segments,
887            current_idx: 0,
888            cursor: None,
889        })
890    }
891
892    pub fn next_frame(&mut self) -> QuillSQLResult<Option<WalFrame>> {
893        loop {
894            if self.cursor.is_none() && !self.open_next_segment()? {
895                return Ok(None);
896            }
897
898            if let Some(cursor) = self.cursor.as_mut() {
899                match cursor.read_frame()? {
900                    Some(frame) => return Ok(Some(frame)),
901                    None => {
902                        self.cursor = None;
903                        continue;
904                    }
905                }
906            } else {
907                return Ok(None);
908            }
909        }
910    }
911
912    fn open_next_segment(&mut self) -> QuillSQLResult<bool> {
913        let Some(segment_id) = self.segments.get(self.current_idx).copied() else {
914            return Ok(false);
915        };
916        let path = segment_path(&self.directory, segment_id);
917        let cursor = SegmentCursor::open(segment_id, &path)?;
918        self.cursor = Some(cursor);
919        self.current_idx += 1;
920        Ok(true)
921    }
922}
923
924struct SegmentCursor {
925    len: u64,
926    offset: u64,
927    reader: BufReader<File>,
928    fragment_buf: Vec<u8>,
929    ready_frames: VecDeque<WalFrame>,
930}
931
932impl SegmentCursor {
933    fn open(_id: u64, path: &Path) -> QuillSQLResult<Self> {
934        let file = File::open(path)?;
935        let len = file.metadata()?.len();
936        Ok(Self {
937            len,
938            offset: 0,
939            reader: BufReader::new(file),
940            fragment_buf: Vec::new(),
941            ready_frames: VecDeque::new(),
942        })
943    }
944
945    fn read_frame(&mut self) -> QuillSQLResult<Option<WalFrame>> {
946        loop {
947            if let Some(frame) = self.ready_frames.pop_front() {
948                return Ok(Some(frame));
949            }
950
951            if self.offset >= self.len {
952                return Ok(None);
953            }
954
955            let mut page_buf = vec![0u8; WAL_PAGE_SIZE];
956            if let Err(err) = self.reader.read_exact(&mut page_buf) {
957                return match err.kind() {
958                    ErrorKind::UnexpectedEof => {
959                        self.offset = self.len;
960                        Ok(None)
961                    }
962                    _ => Err(QuillSQLError::Io(err)),
963                };
964            }
965            self.offset = self.offset.saturating_add(WAL_PAGE_SIZE as u64);
966
967            match WalPage::unpack_frames(&page_buf) {
968                Ok(page) => {
969                    if !page.has_payload() {
970                        self.offset = self.len;
971                        return Ok(None);
972                    }
973                    for slot in page.fragments() {
974                        let start = slot.offset as usize;
975                        let end = start + slot.len as usize;
976                        let fragment = &page.payload()[start..end];
977                        match slot.kind {
978                            WalPageFragmentKind::Complete => {
979                                self.fragment_buf.clear();
980                                match decode_frame(fragment) {
981                                    Ok((frame, _)) => self.ready_frames.push_back(frame),
982                                    Err(QuillSQLError::Internal(message))
983                                        if message.contains("too short")
984                                            || message.contains("truncated")
985                                            || message.contains("CRC mismatch") =>
986                                    {
987                                        self.offset = self.len;
988                                        return Ok(None);
989                                    }
990                                    Err(err) => return Err(err),
991                                }
992                            }
993                            WalPageFragmentKind::Start => {
994                                self.fragment_buf.clear();
995                                self.fragment_buf.extend_from_slice(fragment);
996                            }
997                            WalPageFragmentKind::Middle => {
998                                if self.fragment_buf.is_empty() {
999                                    self.offset = self.len;
1000                                    return Ok(None);
1001                                }
1002                                self.fragment_buf.extend_from_slice(fragment);
1003                            }
1004                            WalPageFragmentKind::End => {
1005                                if self.fragment_buf.is_empty() {
1006                                    self.offset = self.len;
1007                                    return Ok(None);
1008                                }
1009                                self.fragment_buf.extend_from_slice(fragment);
1010                                let frame_bytes = std::mem::take(&mut self.fragment_buf);
1011                                match decode_frame(&frame_bytes) {
1012                                    Ok((frame, _)) => self.ready_frames.push_back(frame),
1013                                    Err(QuillSQLError::Internal(message))
1014                                        if message.contains("too short")
1015                                            || message.contains("truncated")
1016                                            || message.contains("CRC mismatch") =>
1017                                    {
1018                                        self.offset = self.len;
1019                                        return Ok(None);
1020                                    }
1021                                    Err(err) => return Err(err),
1022                                }
1023                            }
1024                        }
1025                    }
1026                }
1027                Err(QuillSQLError::Internal(message))
1028                    if message.contains("truncated")
1029                        || message.contains("CRC mismatch")
1030                        || message.contains("Invalid WAL page magic") =>
1031                {
1032                    self.offset = self.len;
1033                    return Ok(None);
1034                }
1035                Err(err) => return Err(err),
1036            }
1037        }
1038    }
1039}
1040
1041impl std::fmt::Debug for WalStorage {
1042    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1043        f.debug_struct("WalStorage")
1044            .field("directory", &self.directory)
1045            .field("segment_size", &self.segment_size)
1046            .field("sync_on_flush", &self.sync_on_flush)
1047            .field("current_segment", &self.current_segment)
1048            .finish()
1049    }
1050}
1051
1052impl std::fmt::Debug for WalSegmentInfo {
1053    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1054        f.debug_struct("WalSegmentInfo")
1055            .field("id", &self.id)
1056            .field("size", &self.size)
1057            .finish()
1058    }
1059}
1060
/// Builds the path of segment `id` inside `directory`.
///
/// The file name is `wal_<id>.log`, with the id written as 16 zero-padded uppercase hex
/// digits.
fn segment_path(directory: &Path, id: u64) -> PathBuf {
    let file_name = format!("wal_{:016X}.log", id);
    directory.join(file_name)
}
1064
/// Parses a segment file name of the form `wal_<16 uppercase hex digits>.log` into its id.
///
/// This is exactly the shape produced by `segment_path`. Returns `None` for any other
/// name.
///
/// Only the canonical spelling is accepted so that `segment_path(dir, id)` always names
/// the same file the id was parsed from. `u64::from_str_radix` on its own would also accept
/// `wal_1.log`, `wal_ff.log` or `wal_+1.log`. Those ids map back to a *different* path, so
/// a reader would then try to open a file that does not exist.
fn parse_segment_id(name: &str) -> Option<u64> {
    let hex = name.strip_prefix("wal_")?.strip_suffix(".log")?;
    let canonical = hex.len() == 16
        && hex.bytes().all(|b| matches!(b, b'0'..=b'9' | b'A'..=b'F'));
    if !canonical {
        return None;
    }
    u64::from_str_radix(hex, 16).ok()
}
1070
1071fn list_segments(directory: &Path) -> QuillSQLResult<Vec<u64>> {
1072    if !directory.exists() {
1073        return Ok(Vec::new());
1074    }
1075    let mut ids = Vec::new();
1076    for entry in fs::read_dir(directory)? {
1077        let entry = entry?;
1078        if !entry.file_type()?.is_file() {
1079            continue;
1080        }
1081        let name = entry.file_name();
1082        let name = name.to_string_lossy();
1083        if let Some(id) = parse_segment_id(&name) {
1084            ids.push(id);
1085        }
1086    }
1087    ids.sort_unstable();
1088    Ok(ids)
1089}
1090
1091fn io_error(err: &QuillSQLError) -> std::io::Error {
1092    std::io::Error::other(err.to_string())
1093}
1094
1095#[derive(Debug)]
1096struct WalWriterRuntime {
1097    stop_flag: Arc<AtomicBool>,
1098    thread: thread::JoinHandle<()>,
1099}
1100
1101impl WalWriterRuntime {
1102    fn spawn(target: Weak<WalManager>, interval: Duration) -> QuillSQLResult<Self> {
1103        let stop_flag = Arc::new(AtomicBool::new(false));
1104        let thread_stop = stop_flag.clone();
1105        let handle = thread::Builder::new()
1106            .name("walwriter".into())
1107            .spawn(move || {
1108                while !thread_stop.load(Ordering::Relaxed) {
1109                    if let Some(manager) = target.upgrade() {
1110                        let _ = manager.flush(None);
1111                    } else {
1112                        break;
1113                    }
1114                    thread::sleep(interval);
1115                }
1116                if let Some(manager) = target.upgrade() {
1117                    let _ = manager.flush(None);
1118                }
1119            })
1120            .map_err(|e| QuillSQLError::Internal(format!("Failed to spawn walwriter: {}", e)))?;
1121        Ok(Self {
1122            stop_flag,
1123            thread: handle,
1124        })
1125    }
1126
1127    fn stop(self) -> QuillSQLResult<()> {
1128        self.stop_flag.store(true, Ordering::Release);
1129        self.thread
1130            .join()
1131            .map_err(|_| QuillSQLError::Internal("walwriter thread panicked".to_string()))
1132    }
1133}
1134
1135impl Drop for WalManager {
1136    fn drop(&mut self) {
1137        if let Some(runtime) = self.writer.lock().take() {
1138            let _ = runtime.stop();
1139        }
1140    }
1141}
1142
/// Unit tests for `WalManager`: appending, flushing, reading back, reopening,
/// segment recycling, page-update logging and checkpoints.
#[cfg(test)]
mod tests {
    use crate::buffer::PAGE_SIZE;
    use crate::config::WalConfig;
    use crate::recovery::wal::codec::decode_payload;
    use crate::recovery::wal_record::{
        CheckpointPayload, ResourceManagerId, TransactionPayload, TransactionRecordKind,
        WalRecordPayload,
    };
    use crate::storage::disk_manager::DiskManager;
    use crate::storage::disk_scheduler::DiskScheduler;
    use std::sync::Arc;
    use tempfile::TempDir;

    use super::WalManager;

    /// Creates a disk scheduler backed by a database file `wal_test.db` inside `tmp`.
    fn build_scheduler(tmp: &TempDir) -> Arc<DiskScheduler> {
        let db_path = tmp.path().join("wal_test.db");
        let disk_manager = Arc::new(DiskManager::try_new(&db_path).expect("disk manager"));
        Arc::new(DiskScheduler::new(disk_manager))
    }

    /// Builds a `WalManager` from `config`, always redirecting its WAL directory to
    /// `<tmp>/wal` (any directory already set in `config` is overwritten).
    fn build_wal_manager_with_config(tmp: &TempDir, mut config: WalConfig) -> WalManager {
        config.directory = tmp.path().join("wal");
        let scheduler = build_scheduler(tmp);
        WalManager::new_with_scheduler(config, None, None, scheduler).expect("wal manager")
    }

    /// Builds a `WalManager` with the default configuration under `tmp`.
    fn build_wal_manager(tmp: &TempDir) -> WalManager {
        build_wal_manager_with_config(tmp, WalConfig::default())
    }

    /// Appended records get increasing end LSNs. Nothing is durable until `flush`, which
    /// returns the last end LSN and leaves no pending records.
    #[test]
    fn append_and_flush_tracks_durable_lsn() {
        let tmp = TempDir::new().expect("tempdir");
        let wal = build_wal_manager(&tmp);
        let l1 = wal
            .append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: TransactionRecordKind::Begin,
                    txn_id: 1,
                })
            })
            .expect("append record")
            .end_lsn;
        let l2 = wal
            .append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: TransactionRecordKind::Commit,
                    txn_id: 1,
                })
            })
            .expect("append record")
            .end_lsn;
        assert!(l2 > l1);

        assert_eq!(wal.durable_lsn(), 0);
        let flushed = wal.flush(None).unwrap();
        assert_eq!(flushed, l2);
        assert_eq!(wal.durable_lsn(), l2);
        assert!(wal.pending_records().is_empty());
    }

    /// After a flush, `WalReader` yields frames in append order, each frame's `lsn` being
    /// the record's start LSN.
    #[test]
    fn wal_reader_reads_back_frames() {
        let tmp = TempDir::new().expect("tempdir");
        let wal = build_wal_manager(&tmp);
        let expected: Vec<_> = (0..4)
            .map(|txn| {
                wal.append_record_with(|_| {
                    WalRecordPayload::Transaction(TransactionPayload {
                        marker: TransactionRecordKind::Begin,
                        txn_id: txn,
                    })
                })
                .expect("append")
                .start_lsn
            })
            .collect();
        wal.flush(None).expect("flush");

        let mut reader = wal.reader().expect("reader");
        let mut observed = Vec::new();
        while let Some(frame) = reader.next_frame().expect("frame") {
            observed.push(frame.lsn);
        }
        assert_eq!(observed, expected);
    }

    /// A second manager opened over the same WAL directory resumes from the flushed
    /// state (durable and max-assigned LSN) and keeps assigning higher LSNs.
    #[test]
    fn wal_manager_bootstraps_existing_segments() {
        use TransactionRecordKind::*;

        let tmp = TempDir::new().expect("tempdir");
        let wal_dir = tmp.path().join("wal");

        let mut config = WalConfig::default();
        // Same path `build_wal_manager_with_config` assigns; set here for clarity.
        config.directory = wal_dir.clone();

        let wal1 = build_wal_manager_with_config(&tmp, config.clone());
        let l1 = wal1
            .append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: Begin,
                    txn_id: 1,
                })
            })
            .expect("append begin")
            .end_lsn;
        let l2 = wal1
            .append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: Commit,
                    txn_id: 1,
                })
            })
            .expect("append commit")
            .end_lsn;
        assert!(l2 > l1);
        wal1.flush(None).expect("flush wal1");
        drop(wal1);

        let wal2 = build_wal_manager_with_config(&tmp, config);
        assert_eq!(wal2.durable_lsn(), l2);
        assert_eq!(wal2.max_assigned_lsn(), l2);

        let l3 = wal2
            .append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: Abort,
                    txn_id: 2,
                })
            })
            .expect("append after reopen")
            .end_lsn;
        assert!(l3 > l2);
        wal2.flush(None).expect("flush wal2");
        assert_eq!(wal2.durable_lsn(), l3);
    }

    /// `flush_until(lsn)` makes exactly the requested prefix durable, one step at a time.
    #[test]
    fn flush_until_advances_in_order() {
        let tmp = TempDir::new().expect("tempdir");
        let wal = build_wal_manager(&tmp);

        let first = wal
            .append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: TransactionRecordKind::Begin,
                    txn_id: 42,
                })
            })
            .expect("append begin")
            .end_lsn;

        let second = wal
            .append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: TransactionRecordKind::Commit,
                    txn_id: 42,
                })
            })
            .expect("append commit")
            .end_lsn;

        assert!(second > first);
        assert_eq!(wal.durable_lsn(), 0);

        let dur = wal.flush_until(first).expect("flush first");
        assert_eq!(dur, first);
        assert_eq!(wal.durable_lsn(), first);

        let dur = wal.flush_until(second).expect("flush second");
        assert_eq!(dur, second);
        assert_eq!(wal.durable_lsn(), second);
    }

    /// With tiny (64-byte) segments and `retain_segments = 1`, logging a checkpoint
    /// removes segment files older than `max_id - retain_segments`.
    #[test]
    fn wal_recycles_segments_after_checkpoint() {
        let tmp = TempDir::new().expect("tempdir");
        let mut config = WalConfig::default();
        config.directory = tmp.path().join("wal");
        config.segment_size = 64;
        config.retain_segments = 1;
        // Presumably disables the background walwriter thread; confirm against WalConfig.
        config.writer_interval_ms = None;
        let wal_dir = config.directory.clone();
        let retain_segments = config.retain_segments;
        let wal = WalManager::new(config, None, None).expect("wal manager");

        for i in 0..256 {
            wal.append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: TransactionRecordKind::Begin,
                    txn_id: i,
                })
            })
            .expect("append")
            .end_lsn;
        }
        wal.flush(None).expect("flush");

        let last_lsn = wal.max_assigned_lsn();
        let payload = CheckpointPayload {
            last_lsn,
            dirty_pages: Vec::new(),
            active_transactions: Vec::new(),
            dpt: Vec::new(),
        };
        wal.log_checkpoint(payload).expect("checkpoint");
        wal.flush(None).expect("final flush");

        drop(wal);

        // Collect the ids of the segment files left on disk.
        let mut remaining_ids: Vec<u64> = std::fs::read_dir(&wal_dir)
            .expect("wal dir")
            .filter_map(|entry| {
                entry.ok().and_then(|e| {
                    let name = e.file_name();
                    let name = name.to_string_lossy();
                    super::parse_segment_id(&name)
                })
            })
            .collect();

        remaining_ids.sort_unstable();
        let max_id = *remaining_ids.last().unwrap_or(&0);
        let min_keep = max_id.saturating_sub(retain_segments as u64).max(1);

        assert!(
            remaining_ids.iter().all(|id| *id >= min_keep),
            "segments older than {} should be recycled, remaining {:?}",
            min_keep,
            remaining_ids
        );
    }

    /// With `buffer_capacity = 1`, a single append is flushed automatically: nothing is
    /// left pending and the record is already durable.
    #[test]
    fn auto_flush_respects_buffer_limits() {
        let tmp = TempDir::new().expect("tempdir");
        let mut config = WalConfig::default();
        config.buffer_capacity = 1;
        config.flush_coalesce_bytes = 64;
        let wal = build_wal_manager_with_config(&tmp, config);

        let lsn = wal
            .append_record_with(|_| {
                WalRecordPayload::Transaction(TransactionPayload {
                    marker: TransactionRecordKind::Begin,
                    txn_id: 42,
                })
            })
            .expect("append")
            .end_lsn;

        assert!(wal.pending_records().is_empty());
        assert!(wal.durable_lsn() >= lsn);
    }

    /// `log_page_update` behaves as follows:
    /// - the first change to a page is logged as a full `PageWrite`;
    /// - a later single-byte change is logged as a `PageDelta` carrying only that byte;
    /// - an update with identical old and new images logs nothing.
    #[test]
    fn log_page_update_switches_between_full_and_delta() {
        let tmp = TempDir::new().expect("tempdir");
        let wal = build_wal_manager(&tmp);
        let base = vec![0u8; PAGE_SIZE];

        let mut first_image = base.clone();
        first_image[0] = 1;
        let first = wal
            .log_page_update(5, 0, &base, &first_image)
            .expect("first update")
            .expect("should emit page write");
        wal.flush(None).expect("flush first");
        let mut reader = wal.reader().expect("reader");
        let first_frame = reader.next_frame().expect("frame").expect("frame");
        match decode_payload(&first_frame).expect("payload") {
            WalRecordPayload::PageWrite(payload) => {
                assert_eq!(payload.page_id, 5);
            }
            other => panic!("expected PageWrite, got {:?}", other),
        }
        assert!(reader.next_frame().expect("no extra").is_none());

        let mut second_image = first_image.clone();
        second_image[128] = 77;
        let second = wal
            .log_page_update(5, first.end_lsn, &first_image, &second_image)
            .expect("second update")
            .expect("should emit delta");
        wal.flush(None).expect("flush second");
        let mut reader = wal.reader().expect("reader");
        let mut frames = Vec::new();
        while let Some(frame) = reader.next_frame().expect("frame") {
            frames.push(frame);
        }
        assert_eq!(frames.len(), 2);
        match decode_payload(&frames[1]).expect("payload") {
            WalRecordPayload::PageDelta(payload) => {
                assert_eq!(payload.page_id, 5);
                assert_eq!(payload.offset as usize, 128);
                assert_eq!(payload.data.len(), 1);
                assert_eq!(payload.data[0], 77);
            }
            other => panic!("expected PageDelta, got {:?}", other),
        }

        let none = wal
            .log_page_update(5, second.end_lsn, &second_image, &second_image)
            .expect("noop update");
        assert!(none.is_none());
        wal.flush(None).expect("flush noop");
        let mut reader = wal.reader().expect("reader");
        let mut count = 0usize;
        while reader.next_frame().expect("frame").is_some() {
            count += 1;
        }
        assert_eq!(count, 2, "noop update should not add frames");
    }

    /// `log_checkpoint` makes the checkpoint durable. The on-disk log then contains a
    /// single Checkpoint frame whose decoded body matches the logged payload.
    #[test]
    fn log_checkpoint_persists_record() {
        let tmp = TempDir::new().expect("tempdir");
        let wal = build_wal_manager(&tmp);

        let payload = CheckpointPayload {
            last_lsn: 7,
            dirty_pages: vec![1, 2, 3],
            active_transactions: vec![10, 11],
            dpt: Vec::new(),
        };
        let checkpoint_lsn = wal.log_checkpoint(payload.clone()).expect("checkpoint");
        assert!(wal.last_checkpoint_lsn() <= checkpoint_lsn);
        assert!(wal.durable_lsn() >= checkpoint_lsn);

        let mut reader = wal.reader().expect("reader");
        let mut seen = Vec::new();
        while let Some(frame) = reader.next_frame().expect("frame") {
            seen.push(frame);
        }
        assert_eq!(seen.len(), 1);
        assert_eq!(seen[0].rmid, ResourceManagerId::Checkpoint);
        let observed = crate::recovery::wal_record::decode_checkpoint(&seen[0].body).unwrap();
        assert_eq!(observed.last_lsn, payload.last_lsn);
        assert_eq!(observed.dirty_pages, payload.dirty_pages);
        assert_eq!(observed.active_transactions, payload.active_transactions);
    }
}