1use fsqlite_error::{FrankenError, Result};
17use fsqlite_types::cx::Cx;
18use fsqlite_types::flags::SyncFlags;
19use fsqlite_vfs::{SyncKind, VfsFile};
20use tracing::{debug, error, warn};
21
/// Process-wide opt-in flag, read once (lazily) from the
/// `FRANKENSQLITE_PARANOID_DURABILITY` environment variable.
///
/// The flag is `true` only when the variable is set to `1` or to `true`
/// (ASCII case-insensitive). An unset or non-Unicode value yields `false`.
static PARANOID_DURABILITY: std::sync::LazyLock<bool> = std::sync::LazyLock::new(|| {
    match std::env::var("FRANKENSQLITE_PARANOID_DURABILITY") {
        Ok(value) => value == "1" || value.eq_ignore_ascii_case("true"),
        Err(_) => false,
    }
});
28
29use crate::checksum::{
30 SqliteWalChecksum, WAL_FORMAT_VERSION, WAL_FRAME_HEADER_SIZE, WAL_HEADER_SIZE, WAL_MAGIC_LE,
31 WalChecksumTransform, WalFrameHeader, WalHeader, WalSalts, compute_wal_frame_checksum,
32 read_wal_header_checksum, wal_header_checksum, write_wal_frame_checksum,
33 write_wal_frame_checksum_fields, write_wal_frame_salts,
34};
35
/// Emits one `debug!` event describing a single WAL scan/replay decision.
///
/// * `replay_cursor` - label of the scan that made the decision (callers in
///   this file pass `"startup_open"`, `"refresh"`, `"refresh_incremental"`
///   or `"rebuild"`).
/// * `frame_no` - 1-based number of the frame being judged; callers pass `0`
///   for header-level decisions.
/// * `commit_boundary` - frame count at the last accepted commit (callers
///   pass the currently tracked frame count, or `0`, for header-level
///   decisions).
/// * `decision_reason` - short reason code such as `"accept_commit"` or
///   `"checksum_mismatch_stop"`.
#[inline]
fn log_replay_decision(
    replay_cursor: &'static str,
    frame_no: usize,
    commit_boundary: usize,
    decision_reason: &'static str,
) {
    debug!(
        replay_cursor,
        frame_no, commit_boundary, decision_reason, "WAL replay decision"
    );
}
48
/// Borrowed description of one frame to be appended to the WAL.
#[derive(Debug, Clone, Copy)]
pub struct WalAppendFrameRef<'a> {
    /// Page number stored big-endian in the first four bytes of the frame header.
    pub page_number: u32,
    /// Page image; the append/prepare paths reject it unless its length
    /// equals the WAL page size.
    pub page_data: &'a [u8],
    /// Value stored big-endian in header bytes 4..8; any non-zero value marks
    /// the frame as a commit frame.
    pub db_size_if_commit: u32,
}
59
/// Identity of one WAL "generation": the header's checkpoint sequence number
/// together with its salts.
///
/// Two headers with equal identities are treated as the same generation by
/// `WalFile::prepared_append_window_still_current`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WalGenerationIdentity {
    /// Copy of `WalHeader::checkpoint_seq`.
    pub checkpoint_seq: u32,
    /// Copy of `WalHeader::salts`.
    pub salts: WalSalts,
}
73
74impl WalGenerationIdentity {
75 #[must_use]
77 pub const fn from_header(header: &WalHeader) -> Self {
78 Self {
79 checkpoint_seq: header.checkpoint_seq,
80 salts: header.salts,
81 }
82 }
83}
84
85#[inline]
86fn push_wal_frame_bytes(
87 frame_scratch: &mut Vec<u8>,
88 page_number: u32,
89 db_size_if_commit: u32,
90 salts: WalSalts,
91 page_data: &[u8],
92) {
93 frame_scratch.extend_from_slice(&page_number.to_be_bytes());
94 frame_scratch.extend_from_slice(&db_size_if_commit.to_be_bytes());
95 frame_scratch.extend_from_slice(&salts.salt1.to_be_bytes());
96 frame_scratch.extend_from_slice(&salts.salt2.to_be_bytes());
97 frame_scratch.extend_from_slice(&[0_u8; 8]);
98 frame_scratch.extend_from_slice(page_data);
99}
100
/// Handle to a WAL file stored on a VFS file `F`, together with the cached
/// state needed to validate and append frames.
pub struct WalFile<F: VfsFile> {
    /// Underlying VFS file holding the WAL header and frames.
    file: F,
    /// Page size in bytes, taken from the WAL header.
    page_size: usize,
    /// Checksum byte-order flag derived from the header
    /// (`WalHeader::big_endian_checksum`); `create` sets it to `false`.
    big_endian_checksum: bool,
    /// Cached copy of the WAL header.
    header: WalHeader,
    /// Chained checksum after the last tracked frame (seeded from the header
    /// checksum when no frames are tracked).
    running_checksum: SqliteWalChecksum,
    /// Number of frames currently tracked by this handle.
    frame_count: usize,
    /// 0-based index of the most recent commit frame, if any.
    last_commit_frame: Option<usize>,
    /// Reusable buffer for serializing frames before writing them.
    frame_scratch: Vec<u8>,
    /// Initialised to `0` by `create` and to the recovered commit count by
    /// `open`. NOTE(review): presumably the number of frames known to be
    /// durable; confirm against the sync paths, which are outside this view.
    last_fsynced_frame_count: usize,
}
128
129impl<F: VfsFile> WalFile<F> {
130 pub fn refresh(&mut self, cx: &Cx) -> Result<()> {
136 let frame_size = self.frame_size();
137 let expected_size = u64::try_from(WAL_HEADER_SIZE)
138 .expect("WAL header size fits u64")
139 .saturating_add(
140 u64::try_from(self.frame_count)
141 .unwrap_or(u64::MAX)
142 .saturating_mul(u64::try_from(frame_size).unwrap_or(u64::MAX)),
143 );
144 let file_size = self.file.file_size(cx)?;
145
146 if file_size < expected_size {
150 log_replay_decision("refresh", 0, self.frame_count, "file_shrank_rebuild");
151 return self.rebuild_state_from_file(cx);
152 }
153
154 let mut header_buf = [0u8; WAL_HEADER_SIZE];
158 let header_read = self.file.read(cx, &mut header_buf, 0)?;
159 if header_read < WAL_HEADER_SIZE {
160 log_replay_decision("refresh", 0, self.frame_count, "header_short_read_corrupt");
161 return Err(FrankenError::WalCorrupt {
162 detail: format!(
163 "WAL file too small for header during refresh: read {header_read}, need {WAL_HEADER_SIZE}"
164 ),
165 });
166 }
167
168 let disk_header = WalHeader::from_bytes(&header_buf)?;
169 let disk_big_endian = disk_header.big_endian_checksum();
170 let disk_header_checksum = read_wal_header_checksum(&header_buf)?;
171 let expected_header_checksum = wal_header_checksum(&header_buf, disk_big_endian)?;
172 if disk_header_checksum != expected_header_checksum {
173 log_replay_decision(
174 "refresh",
175 0,
176 self.frame_count,
177 "header_checksum_mismatch_corrupt",
178 );
179 return Err(FrankenError::WalCorrupt {
180 detail: "WAL header checksum mismatch during refresh".to_owned(),
181 });
182 }
183
184 if disk_header.magic != self.header.magic
186 || disk_header.format_version != self.header.format_version
187 || disk_header.page_size != self.header.page_size
188 || disk_header.checkpoint_seq != self.header.checkpoint_seq
189 || disk_header.salts != self.header.salts
190 {
191 log_replay_decision(
192 "refresh",
193 0,
194 self.frame_count,
195 "header_generation_changed_rebuild",
196 );
197 return self.rebuild_state_from_file(cx);
198 }
199
200 if file_size == expected_size {
201 return Ok(());
202 }
203
204 let frame_size_u64 = u64::try_from(frame_size).unwrap_or(u64::MAX);
214 let available_frames = usize::try_from(
215 file_size.saturating_sub(u64::try_from(WAL_HEADER_SIZE).unwrap_or(0)) / frame_size_u64,
216 )
217 .unwrap_or(usize::MAX);
218 if available_frames <= self.frame_count {
219 return Ok(());
220 }
221
222 let mut new_frame_count = self.frame_count;
223 let mut new_running_checksum = self.running_checksum;
224 let mut last_commit_count = self.frame_count;
225 let mut last_commit_checksum = self.running_checksum;
226
227 let mut frame_buf = vec![0u8; frame_size];
228 for frame_index in self.frame_count..available_frames {
229 let frame_no = frame_index.saturating_add(1);
230 let offset = self.frame_offset(frame_index);
231 let bytes_read = self.file.read(cx, &mut frame_buf, offset)?;
232 if bytes_read < frame_size {
233 log_replay_decision(
234 "refresh_incremental",
235 frame_no,
236 last_commit_count,
237 "truncated_tail_stop",
238 );
239 break; }
241
242 let frame_header = WalFrameHeader::from_bytes(&frame_buf[..WAL_FRAME_HEADER_SIZE])?;
243 if frame_header.salts != self.header.salts {
244 log_replay_decision(
245 "refresh_incremental",
246 frame_no,
247 last_commit_count,
248 "salt_mismatch_stop",
249 );
250 break; }
252
253 let expected = compute_wal_frame_checksum(
254 &frame_buf,
255 self.page_size,
256 new_running_checksum,
257 self.big_endian_checksum,
258 )?;
259 if frame_header.checksum != expected {
260 log_replay_decision(
261 "refresh_incremental",
262 frame_no,
263 last_commit_count,
264 "checksum_mismatch_stop",
265 );
266 break; }
268
269 new_running_checksum = expected;
270 new_frame_count += 1;
271
272 if frame_header.is_commit() {
273 last_commit_count = new_frame_count;
274 last_commit_checksum = new_running_checksum;
275 log_replay_decision(
276 "refresh_incremental",
277 frame_no,
278 last_commit_count,
279 "accept_commit",
280 );
281 } else {
282 log_replay_decision(
283 "refresh_incremental",
284 frame_no,
285 last_commit_count,
286 "accept_non_commit",
287 );
288 }
289 }
290
291 self.frame_count = last_commit_count;
292 self.running_checksum = last_commit_checksum;
293 self.last_commit_frame = last_commit_count.checked_sub(1);
294
295 Ok(())
296 }
297
298 fn rebuild_state_from_file(&mut self, cx: &Cx) -> Result<()> {
299 let mut header_buf = [0u8; WAL_HEADER_SIZE];
300 let header_read = self.file.read(cx, &mut header_buf, 0)?;
301 if header_read < WAL_HEADER_SIZE {
302 log_replay_decision("rebuild", 0, self.frame_count, "header_short_read_corrupt");
303 return Err(FrankenError::WalCorrupt {
304 detail: format!(
305 "WAL file too small for header during rebuild: read {header_read}, need {WAL_HEADER_SIZE}"
306 ),
307 });
308 }
309
310 let header = WalHeader::from_bytes(&header_buf)?;
311 let page_size = usize::try_from(header.page_size).expect("WAL header page size fits usize");
312 let big_endian_checksum = header.big_endian_checksum();
313 let header_checksum = read_wal_header_checksum(&header_buf)?;
314 let expected_header_checksum = wal_header_checksum(&header_buf, big_endian_checksum)?;
315 if header_checksum != expected_header_checksum {
316 log_replay_decision(
317 "rebuild",
318 0,
319 self.frame_count,
320 "header_checksum_mismatch_corrupt",
321 );
322 return Err(FrankenError::WalCorrupt {
323 detail: "WAL header checksum mismatch during rebuild".to_owned(),
324 });
325 }
326
327 self.header = header;
328 self.page_size = page_size;
329 self.big_endian_checksum = big_endian_checksum;
330 self.running_checksum = header_checksum;
331 self.frame_count = 0;
332
333 let mut new_frame_count = 0;
334 let mut new_running_checksum = header_checksum;
335 let mut last_commit_count = 0;
336 let mut last_commit_checksum = header_checksum;
337
338 let frame_size = self.frame_size();
339 let file_size = self.file.file_size(cx)?;
340 let max_frames = usize::try_from(
341 file_size.saturating_sub(u64::try_from(WAL_HEADER_SIZE).unwrap_or(0))
342 / u64::try_from(frame_size).unwrap_or(1),
343 )
344 .unwrap_or(usize::MAX);
345
346 let mut frame_buf = vec![0u8; frame_size];
347 for frame_index in 0..max_frames {
348 let frame_no = frame_index.saturating_add(1);
349 let offset = self.frame_offset(frame_index);
350 let bytes_read = self.file.read(cx, &mut frame_buf, offset)?;
351 if bytes_read < frame_size {
352 log_replay_decision(
353 "rebuild",
354 frame_no,
355 last_commit_count,
356 "truncated_tail_stop",
357 );
358 break;
359 }
360
361 let frame_header = WalFrameHeader::from_bytes(&frame_buf[..WAL_FRAME_HEADER_SIZE])?;
362 if frame_header.salts != self.header.salts {
363 log_replay_decision("rebuild", frame_no, last_commit_count, "salt_mismatch_stop");
364 break;
365 }
366
367 let expected = compute_wal_frame_checksum(
368 &frame_buf,
369 self.page_size,
370 new_running_checksum,
371 self.big_endian_checksum,
372 )?;
373 if frame_header.checksum != expected {
374 log_replay_decision(
375 "rebuild",
376 frame_no,
377 last_commit_count,
378 "checksum_mismatch_stop",
379 );
380 break;
381 }
382
383 new_running_checksum = expected;
384 new_frame_count += 1;
385
386 if frame_header.is_commit() {
387 last_commit_count = new_frame_count;
388 last_commit_checksum = new_running_checksum;
389 log_replay_decision("rebuild", frame_no, last_commit_count, "accept_commit");
390 } else {
391 log_replay_decision("rebuild", frame_no, last_commit_count, "accept_non_commit");
392 }
393 }
394
395 self.frame_count = last_commit_count;
396 self.running_checksum = last_commit_checksum;
397 self.last_commit_frame = last_commit_count.checked_sub(1);
398
399 Ok(())
400 }
401
    /// Size in bytes of one on-disk frame: the frame header plus one page.
    #[must_use]
    pub fn frame_size(&self) -> usize {
        WAL_FRAME_HEADER_SIZE + self.page_size
    }
407
408 #[allow(clippy::cast_possible_truncation)]
410 pub(crate) fn frame_offset(&self, index: usize) -> u64 {
411 let header_size = WAL_HEADER_SIZE as u64;
414 let idx = index as u64;
415 let frame_sz = self.frame_size() as u64;
416 header_size.saturating_add(idx.saturating_mul(frame_sz))
417 }
418
    /// Number of frames currently tracked by this handle.
    #[must_use]
    pub fn frame_count(&self) -> usize {
        self.frame_count
    }
424
    /// Borrows the cached WAL header.
    #[must_use]
    pub fn header(&self) -> &WalHeader {
        &self.header
    }
430
    /// Generation identity (checkpoint sequence and salts) of the cached header.
    #[must_use]
    pub fn generation_identity(&self) -> WalGenerationIdentity {
        WalGenerationIdentity::from_header(&self.header)
    }
436
    /// Page size in bytes used by this WAL.
    #[must_use]
    pub fn page_size(&self) -> usize {
        self.page_size
    }
442
    /// Cached checksum byte-order flag that is passed to the frame checksum
    /// routines.
    #[must_use]
    pub fn big_endian_checksum(&self) -> bool {
        self.big_endian_checksum
    }
448
    /// Chained checksum after the last tracked frame.
    #[must_use]
    pub fn running_checksum(&self) -> SqliteWalChecksum {
        self.running_checksum
    }
454
    /// Test-only: current length of the reusable frame scratch buffer.
    #[cfg(test)]
    #[must_use]
    fn frame_scratch_len(&self) -> usize {
        self.frame_scratch.len()
    }
460
    /// Test-only: allocated capacity of the reusable frame scratch buffer.
    #[cfg(test)]
    #[must_use]
    fn frame_scratch_capacity(&self) -> usize {
        self.frame_scratch.capacity()
    }
466
    /// Test-only: data pointer of the reusable frame scratch buffer.
    #[cfg(test)]
    #[must_use]
    fn frame_scratch_ptr(&self) -> *const u8 {
        self.frame_scratch.as_ptr()
    }
472
473 pub fn create(
478 cx: &Cx,
479 mut file: F,
480 page_size: u32,
481 checkpoint_seq: u32,
482 salts: WalSalts,
483 ) -> Result<Self> {
484 let header = WalHeader {
485 magic: WAL_MAGIC_LE,
486 format_version: WAL_FORMAT_VERSION,
487 page_size,
488 checkpoint_seq,
489 salts,
490 checksum: SqliteWalChecksum::default(), };
492 let header_bytes = header.to_bytes()?;
493 file.write(cx, &header_bytes, 0)?;
494 file.truncate(
495 cx,
496 u64::try_from(WAL_HEADER_SIZE).expect("header size fits u64"),
497 )?;
498
499 let running_checksum = read_wal_header_checksum(&header_bytes)?;
500
501 debug!(
502 page_size,
503 checkpoint_seq,
504 salt1 = header.salts.salt1,
505 salt2 = header.salts.salt2,
506 "WAL file created"
507 );
508 crate::metrics::GLOBAL_WAL_METRICS.set_wal_frames_current(0);
509
510 Ok(Self {
511 file,
512 page_size: usize::try_from(page_size).expect("page size fits usize"),
513 big_endian_checksum: false,
514 header,
515 running_checksum,
516 frame_count: 0,
517 last_commit_frame: None,
518 frame_scratch: Vec::new(),
519 last_fsynced_frame_count: 0,
520 })
521 }
522
523 #[allow(clippy::too_many_lines)]
527 pub fn open(cx: &Cx, file: F) -> Result<Self> {
528 let mut header_buf = [0u8; WAL_HEADER_SIZE];
530 let bytes_read = file.read(cx, &mut header_buf, 0)?;
531 if bytes_read < WAL_HEADER_SIZE {
532 log_replay_decision("startup_open", 0, 0, "header_short_read_corrupt");
533 return Err(FrankenError::WalCorrupt {
534 detail: format!(
535 "WAL file too small for header: read {bytes_read}, need {WAL_HEADER_SIZE}"
536 ),
537 });
538 }
539 let header = WalHeader::from_bytes(&header_buf)?;
540 let page_size = usize::try_from(header.page_size).expect("WAL header page size fits usize");
541 let big_endian_checksum = header.big_endian_checksum();
542 let frame_size = WAL_FRAME_HEADER_SIZE + page_size;
543
544 let header_checksum = read_wal_header_checksum(&header_buf)?;
546 let expected_checksum =
547 crate::checksum::wal_header_checksum(&header_buf, big_endian_checksum)?;
548 if header_checksum != expected_checksum {
549 error!("WAL header checksum mismatch — file may be corrupt");
550 log_replay_decision("startup_open", 0, 0, "header_checksum_mismatch_corrupt");
551 return Err(FrankenError::WalCorrupt {
552 detail: "WAL header checksum mismatch".to_owned(),
553 });
554 }
555
556 let file_size = file.file_size(cx)?;
558 let data_bytes =
559 file_size.saturating_sub(u64::try_from(WAL_HEADER_SIZE).expect("header size fits u64"));
560 let max_frames = usize::try_from(data_bytes / u64::try_from(frame_size).unwrap_or(1))
561 .unwrap_or(usize::MAX);
562
563 let mut running_checksum = header_checksum;
564 let mut valid_frames = 0_usize;
565 let mut last_commit_frames = 0_usize;
566 let mut last_commit_checksum = header_checksum;
567 let mut frame_buf = vec![0u8; frame_size];
568
569 for frame_index in 0..max_frames {
570 let frame_no = frame_index.saturating_add(1);
571 let header_size = WAL_HEADER_SIZE as u64;
576 let idx = frame_index as u64;
577 let frame_sz = frame_size as u64;
578 let file_offset = header_size.saturating_add(idx.saturating_mul(frame_sz));
579
580 let bytes_read = file.read(cx, &mut frame_buf, file_offset)?;
581 if bytes_read < frame_size {
582 log_replay_decision(
583 "startup_open",
584 frame_no,
585 last_commit_frames,
586 "truncated_tail_stop",
587 );
588 break; }
590
591 let frame_header = WalFrameHeader::from_bytes(&frame_buf[..WAL_FRAME_HEADER_SIZE])?;
593 if frame_header.salts != header.salts {
594 warn!(frame_index, "WAL frame salt mismatch — chain terminated");
595 log_replay_decision(
596 "startup_open",
597 frame_no,
598 last_commit_frames,
599 "salt_mismatch_stop",
600 );
601 break; }
603
604 let expected = compute_wal_frame_checksum(
606 &frame_buf,
607 page_size,
608 running_checksum,
609 big_endian_checksum,
610 )?;
611 if frame_header.checksum != expected {
612 warn!(
613 frame_index,
614 "WAL frame checksum mismatch — chain terminated"
615 );
616 log_replay_decision(
617 "startup_open",
618 frame_no,
619 last_commit_frames,
620 "checksum_mismatch_stop",
621 );
622 break; }
624
625 running_checksum = expected;
626 valid_frames += 1;
627
628 if frame_header.is_commit() {
629 last_commit_frames = valid_frames;
630 last_commit_checksum = running_checksum;
631 log_replay_decision(
632 "startup_open",
633 frame_no,
634 last_commit_frames,
635 "accept_commit",
636 );
637 } else {
638 log_replay_decision(
639 "startup_open",
640 frame_no,
641 last_commit_frames,
642 "accept_non_commit",
643 );
644 }
645 }
646
647 debug!(
648 page_size,
649 big_endian_checksum,
650 checkpoint_seq = header.checkpoint_seq,
651 valid_frames = last_commit_frames,
652 "WAL file opened"
653 );
654 crate::metrics::GLOBAL_WAL_METRICS
655 .set_wal_frames_current(u64::try_from(last_commit_frames).unwrap_or(u64::MAX));
656
657 Ok(Self {
658 file,
659 page_size,
660 big_endian_checksum,
661 header,
662 running_checksum: last_commit_checksum,
663 frame_count: last_commit_frames,
664 last_commit_frame: last_commit_frames.checked_sub(1),
665 frame_scratch: Vec::new(),
666 last_fsynced_frame_count: last_commit_frames,
667 })
668 }
669
670 pub fn advance_state_after_write(
676 &mut self,
677 frames_written: usize,
678 new_running_checksum: SqliteWalChecksum,
679 ) -> Result<()> {
680 let new_count = self
681 .frame_count
682 .checked_add(frames_written)
683 .ok_or(FrankenError::DatabaseFull)?;
684
685 if new_count > usize::try_from(u32::MAX).unwrap_or(usize::MAX) {
686 return Err(FrankenError::DatabaseFull);
687 }
688
689 self.frame_count = new_count;
690 self.running_checksum = new_running_checksum;
691 crate::metrics::GLOBAL_WAL_METRICS
692 .set_wal_frames_current(u64::try_from(self.frame_count).unwrap_or(u64::MAX));
693 Ok(())
694 }
695
696 pub fn append_frame(
703 &mut self,
704 cx: &Cx,
705 page_number: u32,
706 page_data: &[u8],
707 db_size_if_commit: u32,
708 ) -> Result<()> {
709 if self.frame_count >= usize::try_from(u32::MAX).unwrap_or(usize::MAX) {
710 return Err(FrankenError::DatabaseFull);
711 }
712
713 if page_data.len() != self.page_size {
714 return Err(FrankenError::WalCorrupt {
715 detail: format!(
716 "page data size mismatch: expected {}, got {}",
717 self.page_size,
718 page_data.len()
719 ),
720 });
721 }
722
723 let frame_size = self.frame_size();
725 let page_size = self.page_size;
726 let salts = self.header.salts;
727 let running_checksum = self.running_checksum;
728 let big_endian_checksum = self.big_endian_checksum;
729 let offset = self.frame_offset(self.frame_count);
730
731 let mut frame_scratch = std::mem::take(&mut self.frame_scratch);
732 frame_scratch.clear();
733 if frame_scratch.capacity() < frame_size {
734 frame_scratch.reserve(frame_size - frame_scratch.capacity());
735 }
736 let append_result = (|| -> Result<SqliteWalChecksum> {
737 push_wal_frame_bytes(
738 &mut frame_scratch,
739 page_number,
740 db_size_if_commit,
741 salts,
742 page_data,
743 );
744 let frame = &mut frame_scratch[..frame_size];
745
746 let new_checksum =
748 write_wal_frame_checksum(frame, page_size, running_checksum, big_endian_checksum)?;
749
750 self.file.write(cx, frame, offset)?;
751 Ok(new_checksum)
752 })();
753 self.frame_scratch = frame_scratch;
754 let new_checksum = append_result?;
755
756 self.running_checksum = new_checksum;
757 self.frame_count += 1;
758 if db_size_if_commit != 0 {
759 self.last_commit_frame = Some(self.frame_count - 1);
760 }
761 crate::metrics::GLOBAL_WAL_METRICS
762 .set_wal_frames_current(u64::try_from(self.frame_count).unwrap_or(u64::MAX));
763
764 let bytes_written = u64::try_from(frame_size).unwrap_or(u64::MAX);
765 let span = tracing::span!(
766 tracing::Level::DEBUG,
767 "wal_write",
768 frame_count = self.frame_count,
769 bytes_written = bytes_written,
770 page_number = page_number,
771 is_commit = db_size_if_commit > 0,
772 );
773 let _guard = span.enter();
774
775 debug!(
776 frame_index = self.frame_count - 1,
777 page_number,
778 is_commit = db_size_if_commit > 0,
779 "WAL frame appended"
780 );
781
782 crate::metrics::GLOBAL_WAL_METRICS.record_frame_write(bytes_written);
783
784 Ok(())
785 }
786
787 pub fn prepare_frame_bytes(&self, frames: &[WalAppendFrameRef<'_>]) -> Result<Vec<u8>> {
794 let mut frame_buf = Vec::new();
795 let mut checksum_transforms = Vec::new();
796 let _ = self.prepare_frame_bytes_with_transforms_into(
797 frames.len(),
798 frames.iter().copied(),
799 &mut frame_buf,
800 &mut checksum_transforms,
801 )?;
802 Ok(frame_buf)
803 }
804
805 pub fn prepare_frame_bytes_with_transforms_into<'a, I>(
812 &self,
813 frame_count: usize,
814 frames: I,
815 frame_buf: &mut Vec<u8>,
816 checksum_transforms: &mut Vec<WalChecksumTransform>,
817 ) -> Result<Option<usize>>
818 where
819 I: IntoIterator<Item = WalAppendFrameRef<'a>>,
820 {
821 frame_buf.clear();
822 checksum_transforms.clear();
823 if frame_count == 0 {
824 return Ok(None);
825 }
826
827 let frame_size = self.frame_size();
828 let total_bytes = frame_count
829 .checked_mul(frame_size)
830 .ok_or(FrankenError::DatabaseFull)?;
831 frame_buf.resize(total_bytes, 0);
832 if checksum_transforms.capacity() < frame_count {
833 checksum_transforms.reserve(frame_count - checksum_transforms.capacity());
834 }
835
836 let mut observed_frame_count = 0usize;
837 let mut last_commit_offset = None;
838 for (idx, frame) in frames.into_iter().enumerate() {
839 if idx >= frame_count {
840 return Err(FrankenError::WalCorrupt {
841 detail: format!(
842 "prepared batch frame count mismatch: expected {frame_count}, got more than declared"
843 ),
844 });
845 }
846 if frame.page_data.len() != self.page_size {
847 return Err(FrankenError::WalCorrupt {
848 detail: format!(
849 "page data size mismatch in batch frame {idx}: expected {}, got {}",
850 self.page_size,
851 frame.page_data.len()
852 ),
853 });
854 }
855
856 let buf_offset = idx
857 .checked_mul(frame_size)
858 .ok_or(FrankenError::DatabaseFull)?;
859 let frame_slice = &mut frame_buf[buf_offset..buf_offset + frame_size];
860
861 frame_slice[..4].copy_from_slice(&frame.page_number.to_be_bytes());
862 frame_slice[4..8].copy_from_slice(&frame.db_size_if_commit.to_be_bytes());
863 write_wal_frame_salts(&mut frame_slice[..WAL_FRAME_HEADER_SIZE], self.header.salts)?;
864 frame_slice[WAL_FRAME_HEADER_SIZE..].copy_from_slice(frame.page_data);
865 checksum_transforms.push(WalChecksumTransform::for_wal_frame(
866 frame_slice,
867 self.page_size,
868 self.big_endian_checksum,
869 )?);
870 if frame.db_size_if_commit != 0 {
871 last_commit_offset = Some(idx);
872 }
873 observed_frame_count = idx.saturating_add(1);
874 }
875
876 if observed_frame_count != frame_count {
877 return Err(FrankenError::WalCorrupt {
878 detail: format!(
879 "prepared batch frame count mismatch: expected {frame_count}, got {observed_frame_count}"
880 ),
881 });
882 }
883
884 Ok(last_commit_offset)
885 }
886
887 pub fn prepared_append_window_still_current(
894 &self,
895 cx: &Cx,
896 generation: WalGenerationIdentity,
897 start_frame_index: usize,
898 ) -> Result<bool> {
899 let expected_size = u64::try_from(WAL_HEADER_SIZE)
900 .expect("WAL header size fits u64")
901 .saturating_add(
902 u64::try_from(start_frame_index)
903 .unwrap_or(u64::MAX)
904 .saturating_mul(u64::try_from(self.frame_size()).unwrap_or(u64::MAX)),
905 );
906 if self.file.file_size(cx)? != expected_size {
907 return Ok(false);
908 }
909
910 let mut header_buf = [0u8; WAL_HEADER_SIZE];
911 let header_read = self.file.read(cx, &mut header_buf, 0)?;
912 if header_read < WAL_HEADER_SIZE {
913 return Err(FrankenError::WalCorrupt {
914 detail: format!(
915 "WAL file too small for header during prepared append validation: read {header_read}, need {WAL_HEADER_SIZE}"
916 ),
917 });
918 }
919
920 let disk_header = WalHeader::from_bytes(&header_buf)?;
921 Ok(WalGenerationIdentity::from_header(&disk_header) == generation)
922 }
923
924 pub fn finalize_prepared_frame_bytes(
931 &self,
932 prepared_frame_bytes: &mut [u8],
933 frame_transforms: &[WalChecksumTransform],
934 ) -> Result<SqliteWalChecksum> {
935 let frame_count = frame_transforms.len();
936 if frame_count == 0 {
937 return Ok(self.running_checksum);
938 }
939
940 let frame_size = self.frame_size();
941 let expected_bytes = frame_count
942 .checked_mul(frame_size)
943 .ok_or(FrankenError::DatabaseFull)?;
944 if prepared_frame_bytes.len() != expected_bytes {
945 return Err(FrankenError::WalCorrupt {
946 detail: format!(
947 "prepared batch byte length mismatch: expected {expected_bytes}, got {}",
948 prepared_frame_bytes.len()
949 ),
950 });
951 }
952
953 let mut running_checksum = self.running_checksum;
954 for (frame_slice, frame_transform) in prepared_frame_bytes
955 .chunks_exact_mut(frame_size)
956 .zip(frame_transforms.iter())
957 {
958 write_wal_frame_salts(&mut frame_slice[..WAL_FRAME_HEADER_SIZE], self.header.salts)?;
959 running_checksum = frame_transform.apply(running_checksum);
960 write_wal_frame_checksum_fields(frame_slice, running_checksum)?;
961 }
962
963 Ok(running_checksum)
964 }
965
966 pub fn append_finalized_prepared_frame_bytes(
969 &mut self,
970 cx: &Cx,
971 prepared_frame_bytes: &[u8],
972 frame_count: usize,
973 final_running_checksum: SqliteWalChecksum,
974 last_commit_offset: Option<usize>,
975 ) -> Result<()> {
976 if frame_count == 0 {
977 return Ok(());
978 }
979
980 let new_count = self
981 .frame_count
982 .checked_add(frame_count)
983 .ok_or(FrankenError::DatabaseFull)?;
984 if new_count > usize::try_from(u32::MAX).unwrap_or(usize::MAX) {
985 return Err(FrankenError::DatabaseFull);
986 }
987
988 let frame_size = self.frame_size();
989 let expected_bytes = frame_count
990 .checked_mul(frame_size)
991 .ok_or(FrankenError::DatabaseFull)?;
992 if prepared_frame_bytes.len() != expected_bytes {
993 return Err(FrankenError::WalCorrupt {
994 detail: format!(
995 "prepared batch byte length mismatch: expected {expected_bytes}, got {}",
996 prepared_frame_bytes.len()
997 ),
998 });
999 }
1000
1001 let start_frame_index = self.frame_count;
1002 let offset = self.frame_offset(start_frame_index);
1003
1004 #[cfg(any(test, feature = "fault-injection"))]
1005 crate::fault_hooks::maybe_inject_crash_at(
1006 crate::fault_hooks::CrashBoundary::BeforeWalFrameAppend,
1007 &format!("start_frame={start_frame_index} frame_count={frame_count}"),
1008 )?;
1009
1010 self.file.write(cx, prepared_frame_bytes, offset)?;
1011 self.advance_state_after_write(frame_count, final_running_checksum)?;
1012 if let Some(last_commit_offset) = last_commit_offset {
1013 self.last_commit_frame = Some(start_frame_index + last_commit_offset);
1014 }
1015
1016 #[cfg(any(test, feature = "fault-injection"))]
1017 crate::fault_hooks::maybe_inject_crash_at(
1018 crate::fault_hooks::CrashBoundary::AfterWalFrameAppendBeforeFsync,
1019 &format!(
1020 "end_frame={} frames_written={frame_count}",
1021 self.frame_count
1022 ),
1023 )?;
1024
1025 let bytes_per_frame = u64::try_from(frame_size).unwrap_or(u64::MAX);
1026 let bytes_written = u64::try_from(expected_bytes).unwrap_or(u64::MAX);
1027 let span = tracing::span!(
1028 tracing::Level::DEBUG,
1029 "wal_batch_write",
1030 start_frame_index = start_frame_index,
1031 frames_written = frame_count,
1032 bytes_written = bytes_written,
1033 );
1034 let _guard = span.enter();
1035
1036 debug!(
1037 end_frame_count = self.frame_count,
1038 frames_written = frame_count,
1039 "WAL frames appended in batch"
1040 );
1041
1042 for _ in 0..frame_count {
1043 crate::metrics::GLOBAL_WAL_METRICS.record_frame_write(bytes_per_frame);
1044 }
1045
1046 Ok(())
1047 }
1048
1049 pub fn append_prepared_frame_bytes(
1056 &mut self,
1057 cx: &Cx,
1058 prepared_frame_bytes: &mut [u8],
1059 frame_transforms: &[WalChecksumTransform],
1060 ) -> Result<()> {
1061 let frame_count = frame_transforms.len();
1062 if frame_count == 0 {
1063 return Ok(());
1064 }
1065
1066 let new_count = self
1067 .frame_count
1068 .checked_add(frame_count)
1069 .ok_or(FrankenError::DatabaseFull)?;
1070 if new_count > usize::try_from(u32::MAX).unwrap_or(usize::MAX) {
1071 return Err(FrankenError::DatabaseFull);
1072 }
1073
1074 let frame_size = self.frame_size();
1075 let running_checksum =
1076 self.finalize_prepared_frame_bytes(prepared_frame_bytes, frame_transforms)?;
1077 let last_commit_offset = prepared_frame_bytes
1078 .chunks_exact(frame_size)
1079 .enumerate()
1080 .rev()
1081 .find_map(|(offset, frame_slice)| {
1082 let db_size_if_commit = u32::from_be_bytes([
1083 frame_slice[4],
1084 frame_slice[5],
1085 frame_slice[6],
1086 frame_slice[7],
1087 ]);
1088 (db_size_if_commit != 0).then_some(offset)
1089 });
1090 self.append_finalized_prepared_frame_bytes(
1091 cx,
1092 prepared_frame_bytes,
1093 frame_count,
1094 running_checksum,
1095 last_commit_offset,
1096 )
1097 }
1098
    /// Appends all `frames` as one batch by delegating to
    /// `append_frame_iter` with `frames.len()` as the declared count.
    ///
    /// # Errors
    /// Same as `append_frame_iter`.
    pub fn append_frames(&mut self, cx: &Cx, frames: &[WalAppendFrameRef<'_>]) -> Result<()> {
        self.append_frame_iter(cx, frames.len(), frames.iter().copied())
    }
1107
1108 pub(crate) fn append_frame_iter<'a, I>(
1111 &mut self,
1112 cx: &Cx,
1113 frame_count: usize,
1114 frames: I,
1115 ) -> Result<()>
1116 where
1117 I: IntoIterator<Item = WalAppendFrameRef<'a>>,
1118 {
1119 if frame_count == 0 {
1120 return Ok(());
1121 }
1122
1123 #[cfg(any(test, feature = "fault-injection"))]
1124 crate::fault_hooks::maybe_inject_append_busy(self.frame_count, frame_count)?;
1125
1126 let frame_size = self.frame_size();
1127 let total_bytes = frame_count
1128 .checked_mul(frame_size)
1129 .ok_or(FrankenError::DatabaseFull)?;
1130 let page_size = self.page_size;
1131 let salts = self.header.salts;
1132 let big_endian_checksum = self.big_endian_checksum;
1133 #[cfg(any(test, feature = "fault-injection"))]
1134 let frame_count_before = self.frame_count;
1135
1136 let mut frame_scratch = std::mem::take(&mut self.frame_scratch);
1137 frame_scratch.clear();
1138 if frame_scratch.capacity() < total_bytes {
1139 frame_scratch.reserve(total_bytes - frame_scratch.capacity());
1140 }
1141 let append_result = (|| -> Result<()> {
1146 let mut running_checksum = self.running_checksum;
1147 let mut last_commit_offset: Option<usize> = None;
1148 let mut observed_frame_count = 0usize;
1149
1150 for (idx, frame) in frames.into_iter().enumerate() {
1151 if idx >= frame_count {
1152 return Err(FrankenError::WalCorrupt {
1153 detail: format!(
1154 "append batch frame count mismatch: expected {frame_count}, got more than declared"
1155 ),
1156 });
1157 }
1158 if frame.page_data.len() != page_size {
1159 return Err(FrankenError::WalCorrupt {
1160 detail: format!(
1161 "page data size mismatch in batch frame {idx}: expected {page_size}, got {}",
1162 frame.page_data.len()
1163 ),
1164 });
1165 }
1166
1167 let buf_offset = idx
1168 .checked_mul(frame_size)
1169 .ok_or(FrankenError::DatabaseFull)?;
1170
1171 push_wal_frame_bytes(
1173 &mut frame_scratch,
1174 frame.page_number,
1175 frame.db_size_if_commit,
1176 salts,
1177 frame.page_data,
1178 );
1179 let frame_slice = &mut frame_scratch[buf_offset..buf_offset + frame_size];
1180
1181 running_checksum = write_wal_frame_checksum(
1183 frame_slice,
1184 page_size,
1185 running_checksum,
1186 big_endian_checksum,
1187 )?;
1188
1189 if frame.db_size_if_commit != 0 {
1190 last_commit_offset = Some(idx);
1191 }
1192 observed_frame_count = idx + 1;
1193 }
1194
1195 if observed_frame_count != frame_count {
1196 return Err(FrankenError::WalCorrupt {
1197 detail: format!(
1198 "append batch frame count mismatch: expected {frame_count}, got {observed_frame_count}"
1199 ),
1200 });
1201 }
1202
1203 self.append_finalized_prepared_frame_bytes(
1204 cx,
1205 &frame_scratch,
1206 frame_count,
1207 running_checksum,
1208 last_commit_offset,
1209 )
1210 })();
1211 self.frame_scratch = frame_scratch;
1212
1213 #[cfg(any(test, feature = "fault-injection"))]
1214 if append_result.is_ok() {
1215 crate::fault_hooks::maybe_inject_after_append(frame_count_before, frame_count)?;
1216 }
1217
1218 append_result
1219 }
1220
1221 pub fn read_frame(&self, cx: &Cx, frame_index: usize) -> Result<(WalFrameHeader, Vec<u8>)> {
1223 let frame_size = self.frame_size();
1224 let mut buf = vec![0u8; frame_size];
1225 let header = self.read_frame_into(cx, frame_index, &mut buf)?;
1226 let page_data = buf[WAL_FRAME_HEADER_SIZE..].to_vec();
1227 Ok((header, page_data))
1228 }
1229
1230 pub fn read_frame_into(
1236 &self,
1237 cx: &Cx,
1238 frame_index: usize,
1239 buf: &mut [u8],
1240 ) -> Result<WalFrameHeader> {
1241 if frame_index >= self.frame_count {
1242 return Err(FrankenError::WalCorrupt {
1243 detail: format!(
1244 "frame index {frame_index} out of range (count: {})",
1245 self.frame_count
1246 ),
1247 });
1248 }
1249
1250 let frame_size = self.frame_size();
1251 if buf.len() < frame_size {
1252 return Err(FrankenError::Internal(format!(
1253 "read_frame_into buffer too small: got {}, need {}",
1254 buf.len(),
1255 frame_size
1256 )));
1257 }
1258
1259 let offset = self.frame_offset(frame_index);
1260 let bytes_read = self.file.read(cx, &mut buf[..frame_size], offset)?;
1261 if bytes_read < frame_size {
1262 return Err(FrankenError::WalCorrupt {
1263 detail: format!(
1264 "short read at frame {frame_index}: got {bytes_read}, need {frame_size}"
1265 ),
1266 });
1267 }
1268
1269 WalFrameHeader::from_bytes(&buf[..WAL_FRAME_HEADER_SIZE])
1270 }
1271
1272 pub fn read_frame_header(&self, cx: &Cx, frame_index: usize) -> Result<WalFrameHeader> {
1274 if frame_index >= self.frame_count {
1275 return Err(FrankenError::WalCorrupt {
1276 detail: format!(
1277 "frame index {frame_index} out of range (count: {})",
1278 self.frame_count
1279 ),
1280 });
1281 }
1282
1283 let mut header_buf = [0u8; WAL_FRAME_HEADER_SIZE];
1284 let offset = self.frame_offset(frame_index);
1285 let bytes_read = self.file.read(cx, &mut header_buf, offset)?;
1286 if bytes_read < WAL_FRAME_HEADER_SIZE {
1287 return Err(FrankenError::WalCorrupt {
1288 detail: format!("short header read at frame {frame_index}: got {bytes_read}"),
1289 });
1290 }
1291
1292 WalFrameHeader::from_bytes(&header_buf)
1293 }
1294
1295 pub fn last_commit_frame(&mut self, cx: &Cx) -> Result<Option<usize>> {
1297 let _ = cx;
1298 Ok(self.last_commit_frame)
1299 }
1300
1301 pub fn sync(&mut self, cx: &Cx, flags: SyncFlags) -> Result<()> {
1303 #[cfg(any(test, feature = "fault-injection"))]
1304 crate::fault_hooks::maybe_inject_sync_failure(self.frame_count, flags)?;
1305
1306 self.file.sync(cx, flags)
1307 }
1308
1309 pub fn durable_sync(&mut self, cx: &Cx, kind: SyncKind) -> Result<()> {
1315 #[cfg(any(test, feature = "fault-injection"))]
1316 {
1317 let flags = match kind {
1318 SyncKind::DataOnly => SyncFlags::DATAONLY,
1319 SyncKind::DataAndMetadata | SyncKind::FullDurable => SyncFlags::FULL,
1320 };
1321 crate::fault_hooks::maybe_inject_sync_failure(self.frame_count, flags)?;
1322 }
1323
1324 self.file.durable_sync(cx, kind)?;
1325 self.last_fsynced_frame_count = self.frame_count;
1326
1327 debug!(
1328 target: "fsqlite_wal::durability",
1329 fsynced_up_to = self.frame_count,
1330 kind = ?kind,
1331 "WAL durable sync complete"
1332 );
1333
1334 #[cfg(any(test, feature = "fault-injection"))]
1335 crate::fault_hooks::maybe_inject_crash_at(
1336 crate::fault_hooks::CrashBoundary::AfterFsyncBeforePublish,
1337 &format!("fsynced_up_to={}", self.frame_count),
1338 )?;
1339
1340 Ok(())
1341 }
1342
1343 pub fn assert_publish_safe(&self, publish_frame_count: usize) -> Result<()> {
1349 if self.last_fsynced_frame_count >= publish_frame_count {
1350 return Ok(());
1351 }
1352
1353 let msg = format!(
1354 "publish-before-fsync: attempting to publish frame_count={publish_frame_count} \
1355 but last fsynced only up to {fsynced}",
1356 fsynced = self.last_fsynced_frame_count,
1357 );
1358
1359 debug_assert!(false, "WAL durability invariant violated: {msg}");
1360
1361 if *PARANOID_DURABILITY {
1362 error!(
1363 target: "fsqlite_wal::durability",
1364 publish_frame_count,
1365 last_fsynced = self.last_fsynced_frame_count,
1366 "PARANOID_DURABILITY: publish-before-fsync detected"
1367 );
1368 return Err(FrankenError::Internal(msg));
1369 }
1370
1371 Ok(())
1372 }
1373
1374 #[must_use]
1376 pub fn last_fsynced_frame_count(&self) -> usize {
1377 self.last_fsynced_frame_count
1378 }
1379
1380 pub fn reset(
1386 &mut self,
1387 cx: &Cx,
1388 new_checkpoint_seq: u32,
1389 new_salts: WalSalts,
1390 truncate_file: bool,
1391 ) -> Result<()> {
1392 let new_header = WalHeader {
1393 magic: self.header.magic,
1394 format_version: WAL_FORMAT_VERSION,
1395 page_size: self.header.page_size,
1396 checkpoint_seq: new_checkpoint_seq,
1397 salts: new_salts,
1398 checksum: SqliteWalChecksum::default(),
1399 };
1400 let header_bytes = new_header.to_bytes()?;
1401
1402 #[cfg(any(test, feature = "fault-injection"))]
1403 crate::fault_hooks::maybe_inject_crash_at(
1404 crate::fault_hooks::CrashBoundary::BeforeWalHeaderWrite,
1405 &format!("checkpoint_seq={new_checkpoint_seq}"),
1406 )?;
1407
1408 self.file.write(cx, &header_bytes, 0)?;
1409
1410 #[cfg(any(test, feature = "fault-injection"))]
1415 {
1416 let old_fc = self.frame_count;
1417 crate::fault_hooks::maybe_inject_crash_header_truncate(old_fc, new_checkpoint_seq)?;
1418 }
1419
1420 if truncate_file {
1421 self.file.truncate(
1422 cx,
1423 u64::try_from(WAL_HEADER_SIZE).expect("header size fits u64"),
1424 )?;
1425 }
1426
1427 self.file.sync(cx, SyncFlags::NORMAL)?;
1430
1431 self.running_checksum = read_wal_header_checksum(&header_bytes)?;
1432 self.header = WalHeader::from_bytes(&header_bytes)?;
1433 self.frame_count = 0;
1434 self.last_commit_frame = None;
1435 self.last_fsynced_frame_count = 0;
1436 self.frame_scratch.clear();
1437 crate::metrics::GLOBAL_WAL_METRICS.set_wal_frames_current(0);
1438
1439 debug!(
1440 checkpoint_seq = new_checkpoint_seq,
1441 salt1 = new_salts.salt1,
1442 salt2 = new_salts.salt2,
1443 "WAL reset"
1444 );
1445
1446 crate::metrics::GLOBAL_WAL_METRICS.record_wal_reset();
1447
1448 Ok(())
1449 }
1450
1451 pub fn close(mut self, cx: &Cx) -> Result<()> {
1453 self.file.close(cx)
1454 }
1455
1456 #[must_use]
1458 pub fn file(&self) -> &F {
1459 &self.file
1460 }
1461
1462 pub fn file_mut(&mut self) -> &mut F {
1464 &mut self.file
1465 }
1466}
1467
1468#[cfg(test)]
1469mod tests {
1470 use std::sync::Mutex;
1471 use std::time::Instant;
1472
1473 use fsqlite_types::flags::VfsOpenFlags;
1474 use fsqlite_vfs::MemoryVfs;
1475 use fsqlite_vfs::traits::Vfs;
1476 use serde_json::{Value, json};
1477
1478 use super::*;
1479
    // Held for the whole body of each fault-hook test in this module while it
    // arms, triggers, and inspects `crate::fault_hooks`.
    // NOTE(review): presumably needed because hook state is shared between
    // test threads; confirm against the `fault_hooks` module.
    static FAULT_TEST_LOCK: Mutex<()> = Mutex::new(());

    // Page size, in bytes, used for every WAL created by these tests.
    const PAGE_SIZE: u32 = 4096;
    // Identifier for the scratch-buffer benchmark; it is not referenced in the
    // part of the module visible here.
    const TRACK_C_SCRATCH_BENCH_BEAD_ID: &str = "bd-db300.3.4.3";
    // Unrecorded warm-up rounds; each round runs the baseline and the candidate once.
    const TRACK_C_SCRATCH_BENCH_WARMUP_ITERS: usize = 4;
    // Recorded samples collected per variant after warm-up.
    const TRACK_C_SCRATCH_BENCH_MEASURE_ITERS: usize = 12;
1489
    /// Which append path a scratch-buffer benchmark run exercises.
    #[derive(Clone, Copy)]
    enum TrackCScratchBenchMode {
        /// Appends through `append_frames_fresh_alloc`, which obtains a newly
        /// prepared frame buffer for every operation.
        FreshAllocBaseline,
        /// Appends through `WalFile::append_frame` / `append_frames` and tracks
        /// how often `frame_scratch_capacity()` grows.
        ScratchReuseCandidate,
    }
1495
    /// Measurements collected from one timed benchmark sample.
    #[derive(Clone, Copy)]
    struct TrackCScratchBenchRun {
        /// Wall-clock time of the append loop in nanoseconds (saturates at `u64::MAX`).
        elapsed_ns: u64,
        /// Number of baseline operations, each of which prepares a fresh frame buffer.
        explicit_fresh_buffer_allocations: usize,
        /// How many times the scratch capacity was observed to increase after an append.
        scratch_capacity_growth_events: usize,
        /// `frame_scratch_capacity()` sampled once, after the append loop finished.
        peak_scratch_capacity_bytes: usize,
        /// Frame bytes one operation needs: the frame size times frames per operation.
        frame_buffer_bytes_per_operation: usize,
        /// Number of append operations executed in this sample.
        operations_per_sample: usize,
    }
1505
1506 fn test_cx() -> Cx {
1507 Cx::default()
1508 }
1509
1510 fn test_salts() -> WalSalts {
1511 WalSalts {
1512 salt1: 0xDEAD_BEEF,
1513 salt2: 0xCAFE_BABE,
1514 }
1515 }
1516
1517 fn sample_page(seed: u8) -> Vec<u8> {
1518 let page_size = usize::try_from(PAGE_SIZE).expect("page size fits usize");
1519 let mut page = vec![0u8; page_size];
1520 for (i, byte) in page.iter_mut().enumerate() {
1521 let reduced = u8::try_from(i % 251).expect("modulo fits u8");
1522 *byte = reduced ^ seed;
1523 }
1524 page
1525 }
1526
1527 fn frame_ref(
1528 page_number: u32,
1529 page_data: &[u8],
1530 db_size_if_commit: u32,
1531 ) -> WalAppendFrameRef<'_> {
1532 WalAppendFrameRef {
1533 page_number,
1534 page_data,
1535 db_size_if_commit,
1536 }
1537 }
1538
1539 fn open_wal_file(vfs: &MemoryVfs, cx: &Cx) -> <MemoryVfs as Vfs>::File {
1540 let flags = VfsOpenFlags::READWRITE | VfsOpenFlags::CREATE | VfsOpenFlags::WAL;
1541 let (file, _) = vfs
1542 .open(cx, Some(std::path::Path::new("test.db-wal")), flags)
1543 .expect("open WAL file");
1544 file
1545 }
1546
1547 fn track_c_scratch_run_summary(runs: &[TrackCScratchBenchRun]) -> Value {
1548 let mut elapsed_samples: Vec<_> = runs.iter().map(|run| run.elapsed_ns).collect();
1549 elapsed_samples.sort_unstable();
1550 let sample_count = elapsed_samples.len();
1551 let min_ns = elapsed_samples.first().copied().unwrap_or(0);
1552 let median_ns = if sample_count == 0 {
1553 0
1554 } else {
1555 elapsed_samples[sample_count / 2]
1556 };
1557 let max_ns = elapsed_samples.last().copied().unwrap_or(0);
1558 let mean_ns = if sample_count == 0 {
1559 0.0
1560 } else {
1561 let total_ns: u128 = elapsed_samples.iter().map(|ns| u128::from(*ns)).sum();
1562 (total_ns as f64) / (sample_count as f64)
1563 };
1564 let explicit_fresh_buffer_allocations = runs
1565 .iter()
1566 .map(|run| run.explicit_fresh_buffer_allocations)
1567 .max()
1568 .unwrap_or(0);
1569 let scratch_capacity_growth_events = runs
1570 .iter()
1571 .map(|run| run.scratch_capacity_growth_events)
1572 .max()
1573 .unwrap_or(0);
1574 let peak_scratch_capacity_bytes = runs
1575 .iter()
1576 .map(|run| run.peak_scratch_capacity_bytes)
1577 .max()
1578 .unwrap_or(0);
1579 let frame_buffer_bytes_per_operation = runs
1580 .first()
1581 .map(|run| run.frame_buffer_bytes_per_operation)
1582 .unwrap_or(0);
1583 let operations_per_sample = runs
1584 .first()
1585 .map(|run| run.operations_per_sample)
1586 .unwrap_or(0);
1587
1588 json!({
1589 "samples_ns": elapsed_samples,
1590 "min_ns": min_ns,
1591 "median_ns": median_ns,
1592 "max_ns": max_ns,
1593 "mean_ns": mean_ns,
1594 "explicit_fresh_buffer_allocations_per_sample": explicit_fresh_buffer_allocations,
1595 "scratch_capacity_growth_events_per_sample": scratch_capacity_growth_events,
1596 "peak_scratch_capacity_bytes": peak_scratch_capacity_bytes,
1597 "frame_buffer_bytes_per_operation": frame_buffer_bytes_per_operation,
1598 "operations_per_sample": operations_per_sample,
1599 "frame_buffer_bytes_requested_per_sample": explicit_fresh_buffer_allocations
1600 .saturating_mul(frame_buffer_bytes_per_operation),
1601 })
1602 }
1603
1604 fn append_frames_fresh_alloc<F: VfsFile>(
1605 wal: &mut WalFile<F>,
1606 cx: &Cx,
1607 frames: &[WalAppendFrameRef<'_>],
1608 ) -> Result<()> {
1609 let frame_size = wal.frame_size();
1610 let mut frame_buf = wal.prepare_frame_bytes(frames)?;
1611 let frame_transforms = frame_buf
1612 .chunks_exact(frame_size)
1613 .map(|frame| {
1614 WalChecksumTransform::for_wal_frame(
1615 frame,
1616 wal.page_size(),
1617 wal.big_endian_checksum(),
1618 )
1619 })
1620 .collect::<Result<Vec<_>>>()?;
1621 wal.append_prepared_frame_bytes(cx, &mut frame_buf, &frame_transforms)
1622 }
1623
1624 fn track_c_measure_single_frame_case(
1625 mode: TrackCScratchBenchMode,
1626 operations: usize,
1627 ) -> TrackCScratchBenchRun {
1628 let cx = test_cx();
1629 let vfs = MemoryVfs::new();
1630 let file = open_wal_file(&vfs, &cx);
1631 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
1632 let pages: Vec<Vec<u8>> = (0..operations)
1633 .map(|i| sample_page(u8::try_from(i % 251).expect("modulo fits u8")))
1634 .collect();
1635 let frame_buffer_bytes_per_operation = wal.frame_size();
1636 let mut explicit_fresh_buffer_allocations = 0usize;
1637 let mut scratch_capacity_growth_events = 0usize;
1638 let mut previous_scratch_capacity = 0usize;
1639
1640 let start = Instant::now();
1641 for (idx, page) in pages.iter().enumerate() {
1642 let page_number = u32::try_from(idx).expect("index fits u32") + 1;
1643 match mode {
1644 TrackCScratchBenchMode::FreshAllocBaseline => {
1645 let frames = [WalAppendFrameRef {
1646 page_number,
1647 page_data: page,
1648 db_size_if_commit: page_number,
1649 }];
1650 append_frames_fresh_alloc(&mut wal, &cx, &frames).expect("append baseline");
1651 explicit_fresh_buffer_allocations += 1;
1652 }
1653 TrackCScratchBenchMode::ScratchReuseCandidate => {
1654 wal.append_frame(&cx, page_number, page, page_number)
1655 .expect("append candidate");
1656 let scratch_capacity = wal.frame_scratch_capacity();
1657 if scratch_capacity > previous_scratch_capacity {
1658 scratch_capacity_growth_events += 1;
1659 previous_scratch_capacity = scratch_capacity;
1660 }
1661 }
1662 }
1663 }
1664
1665 TrackCScratchBenchRun {
1666 elapsed_ns: u64::try_from(start.elapsed().as_nanos()).unwrap_or(u64::MAX),
1667 explicit_fresh_buffer_allocations,
1668 scratch_capacity_growth_events,
1669 peak_scratch_capacity_bytes: wal.frame_scratch_capacity(),
1670 frame_buffer_bytes_per_operation,
1671 operations_per_sample: operations,
1672 }
1673 }
1674
1675 fn track_c_measure_batch_case<const N: usize>(
1676 mode: TrackCScratchBenchMode,
1677 operations: usize,
1678 ) -> TrackCScratchBenchRun {
1679 let cx = test_cx();
1680 let vfs = MemoryVfs::new();
1681 let file = open_wal_file(&vfs, &cx);
1682 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
1683 let pages_per_operation: Vec<[Vec<u8>; N]> = (0..operations)
1684 .map(|operation_idx| {
1685 std::array::from_fn(|frame_idx| {
1686 sample_page(
1687 u8::try_from((operation_idx * N + frame_idx) % 251)
1688 .expect("modulo fits u8"),
1689 )
1690 })
1691 })
1692 .collect();
1693 let frame_buffer_bytes_per_operation = wal
1694 .frame_size()
1695 .checked_mul(N)
1696 .expect("frame bytes per operation fit usize");
1697 let mut explicit_fresh_buffer_allocations = 0usize;
1698 let mut scratch_capacity_growth_events = 0usize;
1699 let mut previous_scratch_capacity = 0usize;
1700
1701 let start = Instant::now();
1702 for (operation_idx, pages) in pages_per_operation.iter().enumerate() {
1703 let page_base = u32::try_from(
1704 operation_idx
1705 .checked_mul(N)
1706 .expect("operation frame base fits usize"),
1707 )
1708 .expect("frame base fits u32")
1709 + 1;
1710 let commit_db_size = page_base + u32::try_from(N).expect("N fits u32") - 1;
1711 let frames: [WalAppendFrameRef<'_>; N] =
1712 std::array::from_fn(|frame_idx| WalAppendFrameRef {
1713 page_number: page_base
1714 + u32::try_from(frame_idx).expect("frame index fits u32"),
1715 page_data: &pages[frame_idx],
1716 db_size_if_commit: if frame_idx + 1 == N {
1717 commit_db_size
1718 } else {
1719 0
1720 },
1721 });
1722
1723 match mode {
1724 TrackCScratchBenchMode::FreshAllocBaseline => {
1725 append_frames_fresh_alloc(&mut wal, &cx, &frames).expect("append baseline");
1726 explicit_fresh_buffer_allocations += 1;
1727 }
1728 TrackCScratchBenchMode::ScratchReuseCandidate => {
1729 wal.append_frames(&cx, &frames).expect("append candidate");
1730 let scratch_capacity = wal.frame_scratch_capacity();
1731 if scratch_capacity > previous_scratch_capacity {
1732 scratch_capacity_growth_events += 1;
1733 previous_scratch_capacity = scratch_capacity;
1734 }
1735 }
1736 }
1737 }
1738
1739 TrackCScratchBenchRun {
1740 elapsed_ns: u64::try_from(start.elapsed().as_nanos()).unwrap_or(u64::MAX),
1741 explicit_fresh_buffer_allocations,
1742 scratch_capacity_growth_events,
1743 peak_scratch_capacity_bytes: wal.frame_scratch_capacity(),
1744 frame_buffer_bytes_per_operation,
1745 operations_per_sample: operations,
1746 }
1747 }
1748
1749 fn track_c_scratch_case_report(
1750 scenario_id: &str,
1751 frames_per_operation: usize,
1752 operations_per_sample: usize,
1753 baseline_measure: impl Fn() -> TrackCScratchBenchRun,
1754 candidate_measure: impl Fn() -> TrackCScratchBenchRun,
1755 ) -> Value {
1756 for _ in 0..TRACK_C_SCRATCH_BENCH_WARMUP_ITERS {
1757 let _ = baseline_measure();
1758 let _ = candidate_measure();
1759 }
1760
1761 let baseline_runs: Vec<_> = (0..TRACK_C_SCRATCH_BENCH_MEASURE_ITERS)
1762 .map(|_| baseline_measure())
1763 .collect();
1764 let candidate_runs: Vec<_> = (0..TRACK_C_SCRATCH_BENCH_MEASURE_ITERS)
1765 .map(|_| candidate_measure())
1766 .collect();
1767 let baseline_summary = track_c_scratch_run_summary(&baseline_runs);
1768 let candidate_summary = track_c_scratch_run_summary(&candidate_runs);
1769 let baseline_median = baseline_summary["median_ns"].as_u64().unwrap_or(0);
1770 let candidate_median = candidate_summary["median_ns"].as_u64().unwrap_or(0);
1771 let baseline_allocations = baseline_summary["explicit_fresh_buffer_allocations_per_sample"]
1772 .as_u64()
1773 .unwrap_or(0);
1774 let candidate_growths = candidate_summary["scratch_capacity_growth_events_per_sample"]
1775 .as_u64()
1776 .unwrap_or(0);
1777 let baseline_requested_bytes = baseline_summary["frame_buffer_bytes_requested_per_sample"]
1778 .as_u64()
1779 .unwrap_or(0);
1780 let candidate_peak_scratch_bytes = candidate_summary["peak_scratch_capacity_bytes"]
1781 .as_u64()
1782 .unwrap_or(0);
1783
1784 json!({
1785 "scenario_id": scenario_id,
1786 "frames_per_operation": frames_per_operation,
1787 "operations_per_sample": operations_per_sample,
1788 "fresh_alloc_baseline": baseline_summary,
1789 "scratch_reuse_candidate": candidate_summary,
1790 "fresh_buffer_allocations_avoided_per_sample": baseline_allocations.saturating_sub(candidate_growths),
1791 "buffer_bytes_saved_vs_fresh_requested_per_sample": baseline_requested_bytes.saturating_sub(candidate_peak_scratch_bytes),
1792 "speedup_vs_baseline_median": if candidate_median == 0 {
1793 0.0
1794 } else {
1795 (baseline_median as f64) / (candidate_median as f64)
1796 },
1797 "faster_variant_by_median": if candidate_median <= baseline_median {
1798 "scratch_reuse_candidate"
1799 } else {
1800 "fresh_alloc_baseline"
1801 },
1802 })
1803 }
1804
1805 #[test]
1806 fn test_create_and_open_empty_wal() {
1807 let cx = test_cx();
1808 let vfs = MemoryVfs::new();
1809 let file = open_wal_file(&vfs, &cx);
1810
1811 let wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
1812 assert_eq!(wal.frame_count(), 0);
1813 assert_eq!(wal.page_size(), usize::try_from(PAGE_SIZE).unwrap());
1814 assert!(!wal.big_endian_checksum());
1815 assert_eq!(wal.header().checkpoint_seq, 0);
1816 assert_eq!(wal.header().salts, test_salts());
1817
1818 wal.close(&cx).expect("close WAL");
1819
1820 let file2 = open_wal_file(&vfs, &cx);
1822 let wal2 = WalFile::open(&cx, file2).expect("open WAL");
1823 assert_eq!(wal2.frame_count(), 0);
1824 assert_eq!(wal2.header().salts, test_salts());
1825
1826 wal2.close(&cx).expect("close WAL");
1827 }
1828
1829 #[test]
1830 fn test_append_and_read_single_frame() {
1831 let cx = test_cx();
1832 let vfs = MemoryVfs::new();
1833 let file = open_wal_file(&vfs, &cx);
1834
1835 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 1, test_salts()).expect("create WAL");
1836
1837 let page = sample_page(0x42);
1838 wal.append_frame(&cx, 1, &page, 0).expect("append frame");
1839 assert_eq!(wal.frame_count(), 1);
1840
1841 let (header, data) = wal.read_frame(&cx, 0).expect("read frame");
1842 assert_eq!(header.page_number, 1);
1843 assert_eq!(header.db_size, 0);
1844 assert_eq!(header.salts, test_salts());
1845 assert_eq!(data, page);
1846
1847 wal.close(&cx).expect("close WAL");
1848 }
1849
1850 #[test]
1851 fn test_fault_hook_after_wal_append_returns_error_and_records_context() {
1852 let _guard = FAULT_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
1853 crate::fault_hooks::clear();
1854
1855 let cx = test_cx();
1856 let vfs = MemoryVfs::new();
1857 let file = open_wal_file(&vfs, &cx);
1858 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 1, test_salts()).expect("create WAL");
1859 let page = sample_page(0x33);
1860 let frames = [WalAppendFrameRef {
1861 page_number: 1,
1862 page_data: &page,
1863 db_size_if_commit: 1,
1864 }];
1865
1866 crate::fault_hooks::arm_after_append(crate::fault_hooks::FaultHookArm::new(
1867 "bd-db300.7.2.2-after-append",
1868 "WAL-AFTER-APPEND",
1869 "wal_append_recovery",
1870 ));
1871
1872 let error = wal
1873 .append_frames(&cx, &frames)
1874 .expect_err("fault hook should force an error after append");
1875 assert!(
1876 error.to_string().contains("fault_inject:wal_after_append"),
1877 "fault error should identify the append hook: {error}"
1878 );
1879 assert_eq!(
1880 wal.frame_count(),
1881 1,
1882 "append should still have reached the WAL"
1883 );
1884
1885 wal.close(&cx).expect("close WAL");
1886 let reopened_file = open_wal_file(&vfs, &cx);
1887 let reopened = WalFile::open(&cx, reopened_file).expect("reopen WAL");
1888 assert_eq!(
1889 reopened.frame_count(),
1890 1,
1891 "reopened WAL should preserve the appended frame for later recovery checks"
1892 );
1893
1894 let records = crate::fault_hooks::take_records();
1895 assert_eq!(
1896 records.len(),
1897 1,
1898 "exactly one append fault should be recorded"
1899 );
1900 assert_eq!(records[0].point, "wal_after_append");
1901 assert_eq!(records[0].run_id, "bd-db300.7.2.2-after-append");
1902 assert_eq!(records[0].scenario_id, "WAL-AFTER-APPEND");
1903 assert_eq!(records[0].invariant_family, "wal_append_recovery");
1904 assert!(
1905 records[0].detail.contains("appended_frames=1"),
1906 "record should preserve append context: {}",
1907 records[0].detail
1908 );
1909
1910 crate::fault_hooks::clear();
1911 }
1912
1913 #[test]
1914 fn test_append_commit_frame() {
1915 let cx = test_cx();
1916 let vfs = MemoryVfs::new();
1917 let file = open_wal_file(&vfs, &cx);
1918
1919 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
1920
1921 let page = sample_page(0x10);
1922 wal.append_frame(&cx, 5, &page, 10)
1923 .expect("append commit frame");
1924
1925 let header = wal.read_frame_header(&cx, 0).expect("read header");
1926 assert!(header.is_commit());
1927 assert_eq!(header.db_size, 10);
1928 assert_eq!(header.page_number, 5);
1929
1930 wal.close(&cx).expect("close WAL");
1931 }
1932
1933 #[test]
1934 fn test_multi_frame_checksum_chain() {
1935 let cx = test_cx();
1936 let vfs = MemoryVfs::new();
1937 let file = open_wal_file(&vfs, &cx);
1938
1939 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 3, test_salts()).expect("create WAL");
1940
1941 for i in 0..5u32 {
1943 let page = sample_page(u8::try_from(i).expect("fits"));
1944 let db_size = if i == 4 { 5 } else { 0 };
1945 wal.append_frame(&cx, i + 1, &page, db_size)
1946 .expect("append frame");
1947 }
1948 assert_eq!(wal.frame_count(), 5);
1949
1950 wal.close(&cx).expect("close WAL");
1951
1952 let file2 = open_wal_file(&vfs, &cx);
1954 let wal2 = WalFile::open(&cx, file2).expect("open WAL");
1955 assert_eq!(wal2.frame_count(), 5);
1956
1957 for i in 0..5u32 {
1959 let (header, data) = wal2
1960 .read_frame(&cx, usize::try_from(i).unwrap())
1961 .expect("read frame");
1962 assert_eq!(header.page_number, i + 1);
1963 let expected = sample_page(u8::try_from(i).expect("fits"));
1964 assert_eq!(data, expected);
1965 }
1966
1967 let last_header = wal2.read_frame_header(&cx, 4).expect("read header");
1969 assert!(last_header.is_commit());
1970 assert_eq!(last_header.db_size, 5);
1971
1972 wal2.close(&cx).expect("close WAL");
1973 }
1974
1975 #[test]
1976 fn test_last_commit_frame() {
1977 let cx = test_cx();
1978 let vfs = MemoryVfs::new();
1979 let file = open_wal_file(&vfs, &cx);
1980
1981 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
1982
1983 assert_eq!(wal.last_commit_frame(&cx).expect("query"), None);
1985
1986 wal.append_frame(&cx, 1, &sample_page(1), 0)
1988 .expect("append");
1989 assert_eq!(wal.last_commit_frame(&cx).expect("query"), None);
1990
1991 wal.append_frame(&cx, 2, &sample_page(2), 3)
1993 .expect("append");
1994 assert_eq!(wal.last_commit_frame(&cx).expect("query"), Some(1));
1995
1996 wal.append_frame(&cx, 3, &sample_page(3), 0)
1998 .expect("append");
1999 wal.append_frame(&cx, 4, &sample_page(4), 5)
2000 .expect("append");
2001 assert_eq!(wal.last_commit_frame(&cx).expect("query"), Some(3));
2002
2003 wal.close(&cx).expect("close WAL");
2004 }
2005
2006 #[test]
2007 fn test_reset_clears_frames() {
2008 let cx = test_cx();
2009 let vfs = MemoryVfs::new();
2010 let file = open_wal_file(&vfs, &cx);
2011
2012 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2013
2014 for i in 0..3u8 {
2016 let db_size = if i == 2 { 3 } else { 0 };
2017 wal.append_frame(&cx, u32::from(i) + 1, &sample_page(i), db_size)
2018 .expect("append");
2019 }
2020 assert_eq!(wal.frame_count(), 3);
2021
2022 let new_salts = WalSalts {
2024 salt1: 0x1111_2222,
2025 salt2: 0x3333_4444,
2026 };
2027 wal.reset(&cx, 1, new_salts, true).expect("reset");
2028 assert_eq!(wal.frame_count(), 0);
2029 assert_eq!(wal.last_commit_frame(&cx).expect("query"), None);
2030 assert_eq!(wal.header().checkpoint_seq, 1);
2031 assert_eq!(wal.header().salts, new_salts);
2032
2033 wal.append_frame(&cx, 10, &sample_page(0xAA), 1)
2035 .expect("append after reset");
2036 assert_eq!(wal.frame_count(), 1);
2037 assert_eq!(wal.last_commit_frame(&cx).expect("query"), Some(0));
2038
2039 wal.close(&cx).expect("close WAL");
2040
2041 let file2 = open_wal_file(&vfs, &cx);
2043 let wal2 = WalFile::open(&cx, file2).expect("open WAL");
2044 assert_eq!(wal2.frame_count(), 1);
2045 assert_eq!(wal2.header().checkpoint_seq, 1);
2046 assert_eq!(wal2.header().salts, new_salts);
2047
2048 wal2.close(&cx).expect("close WAL");
2049 }
2050
2051 #[test]
2052 fn test_page_size_mismatch_rejected() {
2053 let cx = test_cx();
2054 let vfs = MemoryVfs::new();
2055 let file = open_wal_file(&vfs, &cx);
2056
2057 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2058
2059 let short_page = vec![0u8; 100];
2061 let result = wal.append_frame(&cx, 1, &short_page, 0);
2062 assert!(result.is_err());
2063
2064 let long_page = vec![0u8; 8192];
2065 let result = wal.append_frame(&cx, 1, &long_page, 0);
2066 assert!(result.is_err());
2067
2068 wal.close(&cx).expect("close WAL");
2069 }
2070
2071 #[test]
2072 fn test_frame_index_out_of_range() {
2073 let cx = test_cx();
2074 let vfs = MemoryVfs::new();
2075 let file = open_wal_file(&vfs, &cx);
2076
2077 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2078
2079 assert!(wal.read_frame(&cx, 0).is_err());
2081 assert!(wal.read_frame_header(&cx, 0).is_err());
2082
2083 wal.append_frame(&cx, 1, &sample_page(0), 0)
2085 .expect("append");
2086 assert!(wal.read_frame(&cx, 0).is_ok());
2087 assert!(wal.read_frame(&cx, 1).is_err());
2088
2089 wal.close(&cx).expect("close WAL");
2090 }
2091
2092 #[test]
2093 fn test_reopen_preserves_checksum_chain() {
2094 let cx = test_cx();
2095 let vfs = MemoryVfs::new();
2096 let file = open_wal_file(&vfs, &cx);
2097
2098 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2099
2100 for i in 0..3u8 {
2102 let db_size = if i == 2 { 3 } else { 0 };
2103 wal.append_frame(&cx, u32::from(i) + 1, &sample_page(i), db_size)
2104 .expect("append");
2105 }
2106 let checksum_after_3 = wal.running_checksum();
2107 wal.close(&cx).expect("close WAL");
2108
2109 let file2 = open_wal_file(&vfs, &cx);
2111 let mut wal2 = WalFile::open(&cx, file2).expect("open WAL");
2112 assert_eq!(wal2.frame_count(), 3);
2113 assert_eq!(wal2.running_checksum(), checksum_after_3);
2114
2115 wal2.append_frame(&cx, 4, &sample_page(3), 0)
2116 .expect("append");
2117 wal2.append_frame(&cx, 5, &sample_page(4), 5)
2118 .expect("append commit");
2119 assert_eq!(wal2.frame_count(), 5);
2120 wal2.close(&cx).expect("close WAL");
2121
2122 let file3 = open_wal_file(&vfs, &cx);
2124 let wal3 = WalFile::open(&cx, file3).expect("open WAL");
2125 assert_eq!(wal3.frame_count(), 5);
2126 wal3.close(&cx).expect("close WAL");
2127 }
2128
2129 #[test]
2130 fn test_sync_does_not_panic() {
2131 let cx = test_cx();
2132 let vfs = MemoryVfs::new();
2133 let file = open_wal_file(&vfs, &cx);
2134
2135 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2136 wal.append_frame(&cx, 1, &sample_page(0), 1)
2137 .expect("append");
2138 wal.sync(&cx, SyncFlags::NORMAL).expect("sync");
2139 wal.sync(&cx, SyncFlags::FULL).expect("full sync");
2140
2141 wal.close(&cx).expect("close WAL");
2142 }
2143
2144 #[test]
2145 fn test_fault_hook_sync_failure_returns_error_and_records_context() {
2146 let _guard = FAULT_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
2147 crate::fault_hooks::clear();
2148
2149 let cx = test_cx();
2150 let vfs = MemoryVfs::new();
2151 let file = open_wal_file(&vfs, &cx);
2152 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 1, test_salts()).expect("create WAL");
2153 wal.append_frame(&cx, 1, &sample_page(0x44), 1)
2154 .expect("append frame");
2155
2156 crate::fault_hooks::arm_sync_failure(crate::fault_hooks::FaultHookArm::new(
2157 "bd-db300.7.2.2-sync-failure",
2158 "WAL-SYNC-FAILURE",
2159 "wal_sync_recovery",
2160 ));
2161
2162 let error = wal
2163 .sync(&cx, SyncFlags::NORMAL)
2164 .expect_err("fault hook should force sync failure");
2165 assert!(
2166 error.to_string().contains("fault_inject:wal_sync_failure"),
2167 "fault error should identify the sync hook: {error}"
2168 );
2169
2170 let records = crate::fault_hooks::take_records();
2171 assert_eq!(
2172 records.len(),
2173 1,
2174 "exactly one sync fault should be recorded"
2175 );
2176 assert_eq!(records[0].point, "wal_sync_failure");
2177 assert_eq!(records[0].run_id, "bd-db300.7.2.2-sync-failure");
2178 assert!(
2179 records[0].detail.contains("frame_count_before=1"),
2180 "record should capture sync context: {}",
2181 records[0].detail
2182 );
2183
2184 crate::fault_hooks::clear();
2185 }
2186
2187 #[test]
2188 fn test_fault_hook_append_busy_countdown_fires_once_and_preserves_retry_surface() {
2189 let _guard = FAULT_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
2190 crate::fault_hooks::clear();
2191
2192 let cx = test_cx();
2193 let vfs = MemoryVfs::new();
2194 let file = open_wal_file(&vfs, &cx);
2195 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 1, test_salts()).expect("create WAL");
2196
2197 crate::fault_hooks::arm_append_busy_countdown(
2198 crate::fault_hooks::FaultHookArm::new(
2199 "bd-db300.7.2.2-busy-countdown",
2200 "WAL-APPEND-BUSY",
2201 "wal_append_retry",
2202 ),
2203 2,
2204 );
2205
2206 let first_page = sample_page(0x55);
2207 let first_frames = [WalAppendFrameRef {
2208 page_number: 1,
2209 page_data: &first_page,
2210 db_size_if_commit: 1,
2211 }];
2212 wal.append_frames(&cx, &first_frames)
2213 .expect("countdown should not fire on first append");
2214
2215 let second_page = sample_page(0x66);
2216 let second_frames = [WalAppendFrameRef {
2217 page_number: 2,
2218 page_data: &second_page,
2219 db_size_if_commit: 2,
2220 }];
2221 let busy = wal
2222 .append_frames(&cx, &second_frames)
2223 .expect_err("countdown should fire on second append");
2224 assert!(matches!(busy, FrankenError::Busy));
2225 assert_eq!(
2226 wal.frame_count(),
2227 1,
2228 "busy fault should fire before the second append mutates WAL state"
2229 );
2230
2231 wal.append_frames(&cx, &second_frames)
2232 .expect("hook should disarm after firing once");
2233 assert_eq!(
2234 wal.frame_count(),
2235 2,
2236 "retry should succeed once the hook is spent"
2237 );
2238
2239 let records = crate::fault_hooks::take_records();
2240 assert_eq!(records.len(), 1, "busy countdown should record one trigger");
2241 assert_eq!(records[0].point, "wal_append_busy_countdown");
2242 assert_eq!(records[0].run_id, "bd-db300.7.2.2-busy-countdown");
2243 assert!(
2244 records[0].detail.contains("submitted_frames=1"),
2245 "record should preserve append batch context: {}",
2246 records[0].detail
2247 );
2248
2249 crate::fault_hooks::clear();
2250 }
2251
    #[test]
    /// Arms the "crash between header write and truncate" fault hook, triggers
    /// it through `WalFile::reset`, and then reopens the WAL to check that
    /// recovery reports zero frames under the new salts.
    ///
    /// Also verifies the fault-hook record captured the pre-reset frame count
    /// and the requested checkpoint sequence.
    fn test_fault_crash_between_header_and_truncate_recovers_to_zero_frames() {
        // Hold the fault-test lock for the whole test; a poisoned mutex is
        // still taken (presumably so one failed fault test does not cascade).
        let _guard = FAULT_TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner());
        // Start from a clean hook state.
        crate::fault_hooks::clear();

        let cx = test_cx();
        let vfs = MemoryVfs::new();
        let file = open_wal_file(&vfs, &cx);
        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");

        // Three frames, each written with a non-zero db size (pages 1..=3).
        for i in 1..=3_u32 {
            // `i` is at most 3, so the narrowing cast cannot truncate.
            let page = sample_page(i as u8);
            wal.append_frame(&cx, i, &page, i)
                .expect("append frame before reset fault injection");
        }
        wal.sync(&cx, SyncFlags::NORMAL).expect("sync WAL");
        assert_eq!(wal.frame_count(), 3, "pre-reset: 3 frames");

        // Remember the current generation's salts so the reset can use different ones.
        let original_salts = wal.generation_identity().salts;

        // Arm the hook before calling `reset`; the three strings are the run id,
        // scenario id and (presumably) the invariant/area label — the first two
        // are checked against the recorded trigger below.
        crate::fault_hooks::arm_crash_header_truncate(crate::fault_hooks::FaultHookArm::new(
            "bd-db300.7.2.2-h9",
            "WAL-CRASH-HEADER-TRUNCATE",
            "wal_reset_recovery",
        ));

        // New salts differ from the old ones in both halves.
        let new_salts = WalSalts {
            salt1: original_salts.salt1.wrapping_add(1),
            salt2: original_salts.salt2.wrapping_add(1),
        };
        // The reset must fail with the injected fault.
        let err = wal
            .reset(&cx, 1, new_salts, true)
            .expect_err("fault hook should fire between header write and truncate");
        assert!(
            err.to_string()
                .contains("fault_inject:wal_crash_header_truncate"),
            "error should identify the hook: {err}"
        );

        // Drop the faulted handle and recover through a fresh one on the same VFS.
        wal.close(&cx).expect("close WAL handle");

        let recovered_file = open_wal_file(&vfs, &cx);
        let recovered = WalFile::open(&cx, recovered_file).expect("reopen WAL");

        assert_eq!(
            recovered.frame_count(),
            0,
            "recovery must discard all old-salt frames after header rewrite"
        );
        assert_eq!(
            recovered.generation_identity().salts,
            new_salts,
            "recovered header must have the new salts"
        );

        // Exactly one trigger should have been recorded, with its context.
        let records = crate::fault_hooks::take_records();
        assert_eq!(records.len(), 1, "exactly one crash hook should fire");
        assert_eq!(records[0].point, "wal_crash_header_truncate");
        assert_eq!(records[0].scenario_id, "WAL-CRASH-HEADER-TRUNCATE");
        assert!(
            records[0].detail.contains("old_frame_count=3"),
            "record should capture pre-reset frame count: {}",
            records[0].detail
        );
        assert!(
            records[0].detail.contains("new_checkpoint_seq=1"),
            "record should capture checkpoint seq: {}",
            records[0].detail
        );

        // Leave the hook state clean for whichever test runs next.
        crate::fault_hooks::clear();
    }
2345
2346 #[test]
2347 fn test_file_accessors() {
2348 let cx = test_cx();
2349 let vfs = MemoryVfs::new();
2350 let file = open_wal_file(&vfs, &cx);
2351
2352 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2353
2354 let _size = wal.file().file_size(&cx).expect("file_size");
2356 let _size = wal.file_mut().file_size(&cx).expect("file_size via mut");
2357
2358 wal.close(&cx).expect("close WAL");
2359 }
2360
2361 #[test]
2364 fn test_truncated_wal_recovers_committed_prefix() {
2365 let cx = test_cx();
2369 let vfs = MemoryVfs::new();
2370 let file = open_wal_file(&vfs, &cx);
2371
2372 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2373 for i in 0..5u8 {
2374 let db_size = if i == 2 {
2375 3
2376 } else if i == 4 {
2377 5
2378 } else {
2379 0
2380 };
2381 wal.append_frame(&cx, u32::from(i) + 1, &sample_page(i), db_size)
2382 .expect("append");
2383 }
2384 assert_eq!(wal.frame_count(), 5);
2385
2386 let frame_size = wal.frame_size();
2388 let truncate_at = WAL_HEADER_SIZE + frame_size * 3 + frame_size / 2;
2390 let truncate_at_u64 = u64::try_from(truncate_at).expect("truncate_at fits u64");
2391 wal.file_mut()
2392 .truncate(&cx, truncate_at_u64)
2393 .expect("truncate");
2394 wal.close(&cx).expect("close WAL");
2395
2396 let file2 = open_wal_file(&vfs, &cx);
2398 let wal2 = WalFile::open(&cx, file2).expect("open WAL after truncation");
2399 assert_eq!(
2400 wal2.frame_count(),
2401 3,
2402 "only the 3 complete frames before truncation should survive"
2403 );
2404
2405 for i in 0..3u8 {
2407 let (header, data) = wal2.read_frame(&cx, usize::from(i)).expect("read frame");
2408 assert_eq!(header.page_number, u32::from(i) + 1);
2409 assert_eq!(data, sample_page(i));
2410 }
2411 wal2.close(&cx).expect("close WAL");
2412 }
2413
2414 #[test]
2415 fn test_corrupt_frame_payload_detected_on_reopen() {
2416 let cx = test_cx();
2420 let vfs = MemoryVfs::new();
2421 let file = open_wal_file(&vfs, &cx);
2422
2423 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2424 for i in 0..5u8 {
2425 let db_size = if i == 2 {
2426 3
2427 } else if i == 4 {
2428 5
2429 } else {
2430 0
2431 };
2432 wal.append_frame(&cx, u32::from(i) + 1, &sample_page(i), db_size)
2433 .expect("append");
2434 }
2435 let frame_size = wal.frame_size();
2436 wal.close(&cx).expect("close WAL");
2437
2438 let corrupt_offset = WAL_HEADER_SIZE + frame_size * 3 + WAL_FRAME_HEADER_SIZE + 42;
2440 let corrupt_offset_u64 = u64::try_from(corrupt_offset).expect("corrupt_offset fits u64");
2441 let mut f = open_wal_file(&vfs, &cx);
2442 let mut buf = [0u8; 1];
2443 f.read(&cx, &mut buf, corrupt_offset_u64)
2444 .expect("read byte");
2445 buf[0] ^= 0xFF;
2446 f.write(&cx, &buf, corrupt_offset_u64)
2447 .expect("write corrupted byte");
2448 drop(f);
2449
2450 let file3 = open_wal_file(&vfs, &cx);
2452 let wal3 = WalFile::open(&cx, file3).expect("open WAL after corruption");
2453 assert_eq!(
2454 wal3.frame_count(),
2455 3,
2456 "frames after corruption point should be discarded"
2457 );
2458 wal3.close(&cx).expect("close WAL");
2459 }
2460
2461 #[test]
2462 fn test_multi_commit_recovery_to_last_valid() {
2463 let cx = test_cx();
2467 let vfs = MemoryVfs::new();
2468 let file = open_wal_file(&vfs, &cx);
2469
2470 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2471
2472 for i in 1..=3u32 {
2474 let db_size = if i == 3 { 3 } else { 0 };
2475 wal.append_frame(
2476 &cx,
2477 i,
2478 &sample_page(u8::try_from(i).expect("i fits u8")),
2479 db_size,
2480 )
2481 .expect("append");
2482 }
2483
2484 for i in 4..=6u32 {
2486 let db_size = if i == 6 { 6 } else { 0 };
2487 wal.append_frame(
2488 &cx,
2489 i,
2490 &sample_page(u8::try_from(i).expect("i fits u8")),
2491 db_size,
2492 )
2493 .expect("append");
2494 }
2495 assert_eq!(wal.frame_count(), 6);
2496 let frame_size = wal.frame_size();
2497 wal.close(&cx).expect("close WAL");
2498
2499 let corrupt_offset = WAL_HEADER_SIZE + frame_size * 4 + WAL_FRAME_HEADER_SIZE + 10;
2501 let corrupt_offset_u64 = u64::try_from(corrupt_offset).expect("corrupt_offset fits u64");
2502 let mut f = open_wal_file(&vfs, &cx);
2503 let mut buf = [0u8; 1];
2504 f.read(&cx, &mut buf, corrupt_offset_u64).expect("read");
2505 buf[0] ^= 0xAA;
2506 f.write(&cx, &buf, corrupt_offset_u64).expect("corrupt");
2507 drop(f);
2508
2509 let file2 = open_wal_file(&vfs, &cx);
2512 let wal2 = WalFile::open(&cx, file2).expect("open WAL after corruption");
2513 assert_eq!(
2514 wal2.frame_count(),
2515 3,
2516 "chain should break at corrupted frame 5, keeping committed prefix (frames 1-3)"
2517 );
2518
2519 let header3 = wal2.read_frame_header(&cx, 2).expect("read frame 3 header");
2521 assert!(header3.is_commit(), "frame 3 should be a commit frame");
2522
2523 wal2.close(&cx).expect("close WAL");
2524 }
2525
2526 #[test]
2527 fn test_wal_growth_bounded_by_restart_checkpoint() {
2528 use crate::checkpoint::{CheckpointMode, CheckpointState};
2531 use crate::checkpoint_executor::CheckpointTarget;
2532 use crate::checkpoint_executor::execute_checkpoint;
2533 use fsqlite_types::PageNumber;
2534
2535 struct DummyTarget;
2536 impl CheckpointTarget for DummyTarget {
2537 fn write_page(&mut self, _: &Cx, _: PageNumber, _: &[u8]) -> fsqlite_error::Result<()> {
2538 Ok(())
2539 }
2540 fn truncate_db(&mut self, _: &Cx, _: u32) -> fsqlite_error::Result<()> {
2541 Ok(())
2542 }
2543 fn sync_db(&mut self, _: &Cx) -> fsqlite_error::Result<()> {
2544 Ok(())
2545 }
2546 }
2547
2548 let cx = test_cx();
2549 let vfs = MemoryVfs::new();
2550 let file = open_wal_file(&vfs, &cx);
2551 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2552
2553 for i in 1..=100u32 {
2555 let seed = u8::try_from(i % 256).expect("seed fits u8");
2556 let db_size = if i % 10 == 0 { i } else { 0 };
2557 wal.append_frame(&cx, (i - 1) % 50 + 1, &sample_page(seed), db_size)
2558 .expect("append");
2559 }
2560 assert_eq!(wal.frame_count(), 100);
2561
2562 let state = CheckpointState {
2564 total_frames: 100,
2565 backfilled_frames: 0,
2566 oldest_reader_frame: None,
2567 };
2568 let mut target = DummyTarget;
2569 let result = execute_checkpoint(&cx, &mut wal, CheckpointMode::Restart, state, &mut target)
2570 .expect("restart checkpoint");
2571
2572 assert_eq!(result.frames_backfilled, 100);
2573 assert!(result.wal_was_reset);
2574 assert_eq!(wal.frame_count(), 0, "WAL should be empty after restart");
2575
2576 wal.append_frame(&cx, 1, &sample_page(0xAA), 1)
2578 .expect("append after reset");
2579 assert_eq!(wal.frame_count(), 1);
2580 assert_eq!(wal.header().checkpoint_seq, 1, "checkpoint_seq incremented");
2581
2582 wal.close(&cx).expect("close WAL");
2583 }
2584
2585 #[test]
2586 fn test_wal_header_corruption_detected() {
2587 let cx = test_cx();
2590 let vfs = MemoryVfs::new();
2591 let file = open_wal_file(&vfs, &cx);
2592
2593 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2594 wal.append_frame(&cx, 1, &sample_page(1), 1)
2595 .expect("append");
2596 wal.close(&cx).expect("close WAL");
2597
2598 let mut f = open_wal_file(&vfs, &cx);
2600 let corrupted_magic = [0xFF, 0xFF, 0xFF, 0xFF];
2601 f.write(&cx, &corrupted_magic, 0).expect("corrupt header");
2602 drop(f);
2603
2604 let file2 = open_wal_file(&vfs, &cx);
2606 let result = WalFile::open(&cx, file2);
2607 assert!(
2608 result.is_err(),
2609 "opening WAL with corrupted header magic should fail"
2610 );
2611 }
2612
2613 #[test]
2614 fn test_empty_wal_after_crash_reopen() {
2615 let cx = test_cx();
2618 let vfs = MemoryVfs::new();
2619 let file = open_wal_file(&vfs, &cx);
2620
2621 let wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2622 wal.close(&cx).expect("close WAL");
2623
2624 let file2 = open_wal_file(&vfs, &cx);
2625 let wal2 = WalFile::open(&cx, file2).expect("reopen empty WAL");
2626 assert_eq!(wal2.frame_count(), 0);
2627 wal2.close(&cx).expect("close WAL");
2628 }
2629
2630 #[test]
2631 fn test_crash_after_single_uncommitted_frame() {
2632 let cx = test_cx();
2637 let vfs = MemoryVfs::new();
2638 let file = open_wal_file(&vfs, &cx);
2639
2640 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2641 wal.append_frame(&cx, 1, &sample_page(0x77), 0)
2642 .expect("append non-commit");
2643 wal.close(&cx).expect("close WAL");
2644
2645 let file2 = open_wal_file(&vfs, &cx);
2646 let wal2 = WalFile::open(&cx, file2).expect("reopen WAL");
2647 assert_eq!(
2648 wal2.frame_count(),
2649 0,
2650 "uncommitted frame excluded from recovery"
2651 );
2652 wal2.close(&cx).expect("close WAL");
2653 }
2654
2655 #[test]
2656 fn test_frame_offset_calculation_overflow_safety() {
2657 let page_size: u64 = 4096;
2661 let wal_header_size: u64 = 32;
2662 let wal_frame_header_size: u64 = 24;
2663 let frame_size = wal_frame_header_size + page_size;
2664
2665 let large_index: u64 = 1_042_468;
2671
2672 let idx_u64 = large_index;
2673 let expected_offset = wal_header_size + idx_u64 * frame_size;
2674
2675 let calculated_offset = wal_header_size + idx_u64 * frame_size;
2677
2678 assert_eq!(calculated_offset, expected_offset);
2679
2680 }
2683
2684 #[test]
2689 fn test_frame_offsets_sequential_no_gaps() {
2690 let cx = test_cx();
2693 let vfs = MemoryVfs::new();
2694 let file = open_wal_file(&vfs, &cx);
2695
2696 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2697
2698 let n = 20u32;
2699 for i in 0..n {
2700 let db_size = if i == n - 1 { n } else { 0 };
2701 wal.append_frame(
2702 &cx,
2703 i + 1,
2704 &sample_page(u8::try_from(i % 251).unwrap()),
2705 db_size,
2706 )
2707 .expect("append");
2708 }
2709
2710 let frame_size = wal.frame_size();
2711 let file_size = wal.file().file_size(&cx).expect("file_size");
2712 let expected_size =
2713 u64::try_from(WAL_HEADER_SIZE + usize::try_from(n).unwrap() * frame_size).unwrap();
2714 assert_eq!(
2715 file_size, expected_size,
2716 "WAL file size must equal header + n*frame_size with no padding or gaps"
2717 );
2718
2719 for i in 0..n {
2721 let header = wal
2722 .read_frame_header(&cx, usize::try_from(i).unwrap())
2723 .expect("read header");
2724 assert_eq!(header.page_number, i + 1, "frame {i} page_number");
2725 }
2726
2727 wal.close(&cx).expect("close WAL");
2728 }
2729
2730 #[test]
2731 fn test_checksum_determinism_same_input() {
2732 let cx = test_cx();
2735 let vfs1 = MemoryVfs::new();
2736 let vfs2 = MemoryVfs::new();
2737
2738 let mut checksums_a = Vec::new();
2739 let mut checksums_b = Vec::new();
2740
2741 for (vfs, checksums) in [(&vfs1, &mut checksums_a), (&vfs2, &mut checksums_b)] {
2742 let file = open_wal_file(vfs, &cx);
2743 let mut wal =
2744 WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create WAL");
2745
2746 for i in 0..10u8 {
2747 let page = sample_page(i);
2748 let db_size = if i == 9 { 10 } else { 0 };
2749 wal.append_frame(&cx, u32::from(i) + 1, &page, db_size)
2750 .expect("append");
2751 checksums.push(wal.running_checksum());
2752 }
2753 wal.close(&cx).expect("close WAL");
2754 }
2755
2756 assert_eq!(
2757 checksums_a, checksums_b,
2758 "identical inputs must produce identical checksum chains"
2759 );
2760 }
2761
2762 #[test]
2763 fn test_checksum_sensitivity_one_byte_difference() {
2764 let cx = test_cx();
2767 let vfs1 = MemoryVfs::new();
2768 let vfs2 = MemoryVfs::new();
2769
2770 let mut checksums_a = Vec::new();
2771 let mut checksums_b = Vec::new();
2772
2773 let file1 = open_wal_file(&vfs1, &cx);
2774 let mut wal1 = WalFile::create(&cx, file1, PAGE_SIZE, 0, test_salts()).expect("create");
2775 let file2 = open_wal_file(&vfs2, &cx);
2776 let mut wal2 = WalFile::create(&cx, file2, PAGE_SIZE, 0, test_salts()).expect("create");
2777
2778 for i in 0..5u8 {
2779 let mut page = sample_page(i);
2780 let db_size = if i == 4 { 5 } else { 0 };
2781 wal1.append_frame(&cx, u32::from(i) + 1, &page, db_size)
2782 .expect("append");
2783 checksums_a.push(wal1.running_checksum());
2784
2785 if i == 2 {
2787 page[0] ^= 0x01;
2788 }
2789 wal2.append_frame(&cx, u32::from(i) + 1, &page, db_size)
2790 .expect("append");
2791 checksums_b.push(wal2.running_checksum());
2792 }
2793
2794 assert_eq!(checksums_a[0], checksums_b[0], "frame 0 should match");
2796 assert_eq!(checksums_a[1], checksums_b[1], "frame 1 should match");
2797 assert_ne!(checksums_a[2], checksums_b[2], "frame 2 must diverge");
2798 assert_ne!(checksums_a[3], checksums_b[3], "frame 3 must diverge");
2799 assert_ne!(checksums_a[4], checksums_b[4], "frame 4 must diverge");
2800
2801 wal1.close(&cx).expect("close");
2802 wal2.close(&cx).expect("close");
2803 }
2804
2805 #[test]
2806 fn test_commit_boundary_every_frame() {
2807 let cx = test_cx();
2810 let vfs = MemoryVfs::new();
2811 let file = open_wal_file(&vfs, &cx);
2812
2813 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
2814
2815 let n = 8u32;
2816 for i in 0..n {
2817 wal.append_frame(&cx, i + 1, &sample_page(u8::try_from(i).unwrap()), i + 1)
2818 .expect("append");
2819 }
2820 assert_eq!(wal.frame_count(), usize::try_from(n).unwrap());
2821 wal.close(&cx).expect("close");
2822
2823 let file2 = open_wal_file(&vfs, &cx);
2824 let mut wal2 = WalFile::open(&cx, file2).expect("reopen");
2825 assert_eq!(
2826 wal2.frame_count(),
2827 usize::try_from(n).unwrap(),
2828 "all frames are commits so all should survive reopen"
2829 );
2830
2831 for i in 0..n {
2833 let h = wal2
2834 .read_frame_header(&cx, usize::try_from(i).unwrap())
2835 .expect("read");
2836 assert!(h.is_commit(), "frame {i} must be a commit");
2837 assert_eq!(h.db_size, i + 1);
2838 }
2839
2840 let last = wal2.last_commit_frame(&cx).expect("query");
2842 assert_eq!(last, Some(usize::try_from(n - 1).unwrap()));
2843
2844 wal2.close(&cx).expect("close");
2845 }
2846
2847 #[test]
2848 fn test_commit_boundary_interleaved_multi_txn() {
2849 let cx = test_cx();
2854 let vfs = MemoryVfs::new();
2855 let file = open_wal_file(&vfs, &cx);
2856 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
2857
2858 let frames: [(u32, u32); 6] = [
2859 (1, 0),
2860 (2, 0),
2861 (3, 3), (4, 0),
2863 (5, 5), (6, 6), ];
2866
2867 for (pg, db_sz) in frames {
2868 wal.append_frame(&cx, pg, &sample_page(u8::try_from(pg).unwrap()), db_sz)
2869 .expect("append");
2870 }
2871 assert_eq!(wal.frame_count(), 6);
2872 wal.close(&cx).expect("close");
2873
2874 let file2 = open_wal_file(&vfs, &cx);
2876 let mut wal2 = WalFile::open(&cx, file2).expect("reopen");
2877 assert_eq!(wal2.frame_count(), 6);
2878
2879 for i in 0..6u32 {
2881 let (header, data) = wal2
2882 .read_frame(&cx, usize::try_from(i).unwrap())
2883 .expect("read frame");
2884 assert_eq!(header.page_number, i + 1);
2885 let expected = sample_page(u8::try_from(i + 1).expect("fits"));
2886 assert_eq!(data, expected);
2887 }
2888
2889 let last_header = wal2.read_frame_header(&cx, 5).expect("read header");
2891 assert!(last_header.is_commit());
2892 assert_eq!(last_header.db_size, 6);
2893 let last = wal2.last_commit_frame(&cx).expect("query");
2894 assert_eq!(last, Some(5), "last commit is frame 6 (index 5)");
2895
2896 wal2.close(&cx).expect("close");
2897 }
2898
2899 #[test]
2900 fn test_same_page_overwritten_multiple_times() {
2901 let cx = test_cx();
2905 let vfs = MemoryVfs::new();
2906 let file = open_wal_file(&vfs, &cx);
2907 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
2908
2909 let page_num = 42u32;
2910 let versions = 5;
2911 for v in 0..versions {
2912 let db_size = if v == versions - 1 { 100 } else { 0 };
2913 wal.append_frame(&cx, page_num, &sample_page(v), db_size)
2914 .expect("append");
2915 }
2916 assert_eq!(wal.frame_count(), usize::from(versions));
2917
2918 for v in 0..versions {
2920 let (header, data) = wal.read_frame(&cx, usize::from(v)).expect("read frame");
2921 assert_eq!(header.page_number, page_num);
2922 assert_eq!(data, sample_page(v), "frame {v} data mismatch");
2923 }
2924
2925 wal.close(&cx).expect("close");
2926 }
2927
2928 #[test]
2929 fn test_refresh_detects_concurrent_append() {
2930 let cx = test_cx();
2933 let vfs = MemoryVfs::new();
2934 let file1 = open_wal_file(&vfs, &cx);
2935 let mut wal1 = WalFile::create(&cx, file1, PAGE_SIZE, 0, test_salts()).expect("create");
2936
2937 for i in 0..3u8 {
2939 let db_size = if i == 2 { 3 } else { 0 };
2940 wal1.append_frame(&cx, u32::from(i) + 1, &sample_page(i), db_size)
2941 .expect("append");
2942 }
2943 let checksum_after_3 = wal1.running_checksum();
2944 wal1.close(&cx).expect("close wal1");
2945
2946 let file_reader = open_wal_file(&vfs, &cx);
2948 let mut reader = WalFile::open(&cx, file_reader).expect("open reader");
2949 assert_eq!(reader.frame_count(), 3);
2950 assert_eq!(reader.last_commit_frame(&cx).expect("query"), Some(2));
2951
2952 let file_w2 = open_wal_file(&vfs, &cx);
2954 let mut w2 = WalFile::open(&cx, file_w2).expect("open w2");
2955 assert_eq!(w2.running_checksum(), checksum_after_3);
2956 w2.append_frame(&cx, 4, &sample_page(3), 0).expect("append");
2957 w2.append_frame(&cx, 5, &sample_page(4), 5)
2958 .expect("append commit");
2959 assert_eq!(w2.frame_count(), 5);
2960 w2.close(&cx).expect("close w2");
2961
2962 assert_eq!(reader.frame_count(), 3);
2964 reader.refresh(&cx).expect("refresh");
2965 assert_eq!(
2966 reader.frame_count(),
2967 5,
2968 "after refresh, reader must see the 2 new committed frames"
2969 );
2970 assert_eq!(reader.last_commit_frame(&cx).expect("query"), Some(4));
2971
2972 reader.close(&cx).expect("close reader");
2973 }
2974
2975 #[test]
2976 fn test_refresh_after_reset_detects_new_generation() {
2977 let cx = test_cx();
2980 let vfs = MemoryVfs::new();
2981 let file = open_wal_file(&vfs, &cx);
2982 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
2983
2984 wal.append_frame(&cx, 1, &sample_page(1), 1)
2986 .expect("append");
2987 wal.close(&cx).expect("close");
2988
2989 let file_r = open_wal_file(&vfs, &cx);
2991 let mut reader = WalFile::open(&cx, file_r).expect("open reader");
2992 assert_eq!(reader.frame_count(), 1);
2993 assert_eq!(reader.last_commit_frame(&cx).expect("query"), Some(0));
2994
2995 let file_cp = open_wal_file(&vfs, &cx);
2997 let mut cp = WalFile::open(&cx, file_cp).expect("open cp");
2998 let new_salts = WalSalts {
2999 salt1: 0xAAAA_BBBB,
3000 salt2: 0xCCCC_DDDD,
3001 };
3002 cp.reset(&cx, 1, new_salts, false).expect("reset");
3003 cp.append_frame(&cx, 1, &sample_page(0xAA), 1)
3004 .expect("append after reset");
3005 cp.close(&cx).expect("close cp");
3006
3007 reader.refresh(&cx).expect("refresh");
3009 assert_eq!(reader.frame_count(), 1);
3010 assert_eq!(reader.last_commit_frame(&cx).expect("query"), Some(0));
3011 assert_eq!(
3012 reader.header().salts,
3013 new_salts,
3014 "salts should be new generation"
3015 );
3016
3017 reader.close(&cx).expect("close reader");
3018 }
3019
3020 #[test]
3021 fn test_refresh_after_reset_with_same_salts_detects_new_generation() {
3022 let cx = test_cx();
3025 let vfs = MemoryVfs::new();
3026 let file = open_wal_file(&vfs, &cx);
3027 let salts = test_salts();
3028 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, salts).expect("create");
3029
3030 wal.append_frame(&cx, 1, &sample_page(1), 1)
3031 .expect("append");
3032 wal.close(&cx).expect("close");
3033
3034 let file_r = open_wal_file(&vfs, &cx);
3035 let mut reader = WalFile::open(&cx, file_r).expect("open reader");
3036 let before = reader.generation_identity();
3037 assert_eq!(before.checkpoint_seq, 0);
3038 assert_eq!(before.salts, salts);
3039 assert_eq!(reader.frame_count(), 1);
3040
3041 let file_cp = open_wal_file(&vfs, &cx);
3042 let mut cp = WalFile::open(&cx, file_cp).expect("open cp");
3043 cp.reset(&cx, 1, salts, false)
3044 .expect("reset with same salts");
3045 cp.append_frame(&cx, 2, &sample_page(0xAA), 2)
3046 .expect("append after reset");
3047 cp.close(&cx).expect("close cp");
3048
3049 reader.refresh(&cx).expect("refresh");
3050 let after = reader.generation_identity();
3051 assert_eq!(
3052 after.checkpoint_seq, 1,
3053 "refresh must observe new checkpoint_seq"
3054 );
3055 assert_eq!(
3056 after.salts, salts,
3057 "same-salt reset is intentional in this test"
3058 );
3059 assert_ne!(
3060 before, after,
3061 "generation identity must change even when salts are reused"
3062 );
3063 assert_eq!(
3064 reader.frame_count(),
3065 1,
3066 "reader must rebuild to new generation"
3067 );
3068
3069 let (header, data) = reader.read_frame(&cx, 0).expect("read rebuilt frame");
3070 assert_eq!(
3071 header.page_number, 2,
3072 "reader must see new-generation frame"
3073 );
3074 assert_eq!(data, sample_page(0xAA));
3075
3076 reader.close(&cx).expect("close reader");
3077 }
3078
3079 #[test]
3080 fn test_group_commit_checksum_chain_matches_single_append() {
3081 use crate::group_commit::{
3084 FrameSubmission, TransactionFrameBatch, write_consolidated_frames,
3085 };
3086
3087 let cx = test_cx();
3088 let vfs_single = MemoryVfs::new();
3089 let vfs_group = MemoryVfs::new();
3090
3091 let pages: Vec<Vec<u8>> = (0..6u8).map(sample_page).collect();
3092 let page_nums: Vec<u32> = (1..=6u32).collect();
3093 let commit_sizes: Vec<u32> = vec![0, 0, 3, 0, 0, 6];
3095
3096 let file_s = open_wal_file(&vfs_single, &cx);
3098 let mut wal_s =
3099 WalFile::create(&cx, file_s, PAGE_SIZE, 0, test_salts()).expect("create single");
3100 for i in 0..6 {
3101 wal_s
3102 .append_frame(&cx, page_nums[i], &pages[i], commit_sizes[i])
3103 .expect("append single");
3104 }
3105 let single_checksum = wal_s.running_checksum();
3106 let single_count = wal_s.frame_count();
3107
3108 let file_g = open_wal_file(&vfs_group, &cx);
3110 let mut wal_g =
3111 WalFile::create(&cx, file_g, PAGE_SIZE, 0, test_salts()).expect("create group");
3112
3113 let batch1 = TransactionFrameBatch::new(
3114 (0..3)
3115 .map(|i| FrameSubmission {
3116 page_number: page_nums[i],
3117 page_data: pages[i].clone(),
3118 db_size_if_commit: commit_sizes[i],
3119 })
3120 .collect(),
3121 );
3122 let batch2 = TransactionFrameBatch::new(
3123 (3..6)
3124 .map(|i| FrameSubmission {
3125 page_number: page_nums[i],
3126 page_data: pages[i].clone(),
3127 db_size_if_commit: commit_sizes[i],
3128 })
3129 .collect(),
3130 );
3131
3132 write_consolidated_frames(&cx, &mut wal_g, &[batch1, batch2]).expect("group write");
3133 let group_checksum = wal_g.running_checksum();
3134 let group_count = wal_g.frame_count();
3135
3136 assert_eq!(single_count, group_count, "frame counts must match");
3137 assert_eq!(
3138 single_checksum, group_checksum,
3139 "group commit must produce identical checksum chain as single-frame append"
3140 );
3141
3142 for i in 0..6 {
3144 let (h_s, d_s) = wal_s.read_frame(&cx, i).expect("read single");
3145 let (h_g, d_g) = wal_g.read_frame(&cx, i).expect("read group");
3146 assert_eq!(h_s.page_number, h_g.page_number, "frame {i} page_number");
3147 assert_eq!(h_s.db_size, h_g.db_size, "frame {i} db_size");
3148 assert_eq!(h_s.checksum, h_g.checksum, "frame {i} checksum");
3149 assert_eq!(h_s.salts, h_g.salts, "frame {i} salts");
3150 assert_eq!(d_s, d_g, "frame {i} data");
3151 }
3152
3153 wal_s.close(&cx).expect("close single");
3154 wal_g.close(&cx).expect("close group");
3155 }
3156
3157 #[test]
3158 fn test_batch_append_checksum_chain_matches_single_append() {
3159 let cx = test_cx();
3160 let vfs_single = MemoryVfs::new();
3161 let vfs_batch = MemoryVfs::new();
3162
3163 let pages: Vec<Vec<u8>> = (0..6u8).map(sample_page).collect();
3164 let page_nums: Vec<u32> = (1..=6u32).collect();
3165 let commit_sizes: Vec<u32> = vec![0, 0, 3, 0, 0, 6];
3166
3167 let file_single = open_wal_file(&vfs_single, &cx);
3168 let mut wal_single =
3169 WalFile::create(&cx, file_single, PAGE_SIZE, 0, test_salts()).expect("create single");
3170 for i in 0..6 {
3171 wal_single
3172 .append_frame(&cx, page_nums[i], &pages[i], commit_sizes[i])
3173 .expect("append single");
3174 }
3175
3176 let file_batch = open_wal_file(&vfs_batch, &cx);
3177 let mut wal_batch =
3178 WalFile::create(&cx, file_batch, PAGE_SIZE, 0, test_salts()).expect("create batch");
3179 let frames: Vec<_> = (0..6)
3180 .map(|i| WalAppendFrameRef {
3181 page_number: page_nums[i],
3182 page_data: &pages[i],
3183 db_size_if_commit: commit_sizes[i],
3184 })
3185 .collect();
3186 wal_batch.append_frames(&cx, &frames).expect("append batch");
3187
3188 assert_eq!(
3189 wal_single.frame_count(),
3190 wal_batch.frame_count(),
3191 "batch append must preserve frame count"
3192 );
3193 assert_eq!(
3194 wal_single.running_checksum(),
3195 wal_batch.running_checksum(),
3196 "batch append must preserve checksum chain"
3197 );
3198
3199 for i in 0..6 {
3200 let (single_header, single_data) = wal_single.read_frame(&cx, i).expect("read single");
3201 let (batch_header, batch_data) = wal_batch.read_frame(&cx, i).expect("read batch");
3202 assert_eq!(single_header, batch_header, "frame header {i} must match");
3203 assert_eq!(single_data, batch_data, "frame payload {i} must match");
3204 }
3205 }
3206
3207 #[test]
3211 fn test_fused_append_frames_produces_byte_identical_wal_file() {
3212 let cx = test_cx();
3213 let vfs_single = MemoryVfs::new();
3214 let vfs_fused = MemoryVfs::new();
3215
3216 let pages: Vec<Vec<u8>> = (0..4u8).map(sample_page).collect();
3217 let page_nums: Vec<u32> = vec![3, 1, 4, 2];
3218 let commit_sizes: Vec<u32> = vec![0, 0, 0, 4];
3219
3220 let file_s = open_wal_file(&vfs_single, &cx);
3222 let mut wal_s =
3223 WalFile::create(&cx, file_s, PAGE_SIZE, 0, test_salts()).expect("create single");
3224 for i in 0..4 {
3225 wal_s
3226 .append_frame(&cx, page_nums[i], &pages[i], commit_sizes[i])
3227 .expect("append single");
3228 }
3229
3230 let file_f = open_wal_file(&vfs_fused, &cx);
3232 let mut wal_f =
3233 WalFile::create(&cx, file_f, PAGE_SIZE, 0, test_salts()).expect("create fused");
3234 let frames: Vec<_> = (0..4)
3235 .map(|i| WalAppendFrameRef {
3236 page_number: page_nums[i],
3237 page_data: &pages[i],
3238 db_size_if_commit: commit_sizes[i],
3239 })
3240 .collect();
3241 wal_f.append_frames(&cx, &frames).expect("append fused");
3242
3243 assert_eq!(wal_s.frame_count(), wal_f.frame_count());
3245 assert_eq!(wal_s.running_checksum(), wal_f.running_checksum());
3246
3247 let frame_size = wal_s.frame_size();
3248 for i in 0..4 {
3249 let mut buf_s = vec![0u8; frame_size];
3250 let mut buf_f = vec![0u8; frame_size];
3251 wal_s
3252 .read_frame_into(&cx, i, &mut buf_s)
3253 .expect("read single");
3254 wal_f
3255 .read_frame_into(&cx, i, &mut buf_f)
3256 .expect("read fused");
3257 assert_eq!(
3258 buf_s, buf_f,
3259 "raw frame bytes at index {i} must be identical"
3260 );
3261 }
3262 }
3263
3264 #[test]
3268 fn test_append_frames_restores_scratch_on_error() {
3269 let cx = test_cx();
3270 let vfs = MemoryVfs::new();
3271 let file = open_wal_file(&vfs, &cx);
3272 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
3273
3274 let good_page = sample_page(0x11);
3276 wal.append_frame(&cx, 1, &good_page, 0)
3277 .expect("first append");
3278 let checksum_before = wal.running_checksum();
3279 let scratch_cap_before = wal.frame_scratch_capacity();
3280 let mut first_frame_before = vec![0u8; wal.frame_size()];
3281 wal.read_frame_into(&cx, 0, &mut first_frame_before)
3282 .expect("read baseline frame");
3283
3284 let bad_page = vec![0xBBu8; PAGE_SIZE as usize + 1]; let good_page2 = sample_page(0x22);
3287 let bad_frames = vec![
3288 WalAppendFrameRef {
3289 page_number: 2,
3290 page_data: &good_page2,
3291 db_size_if_commit: 0,
3292 },
3293 WalAppendFrameRef {
3294 page_number: 3,
3295 page_data: &bad_page, db_size_if_commit: 3,
3297 },
3298 ];
3299 let err = wal.append_frames(&cx, &bad_frames);
3300 assert!(err.is_err(), "bad page size should cause error");
3301
3302 assert_eq!(
3304 wal.frame_count(),
3305 1,
3306 "failed append must not advance frame count"
3307 );
3308 assert_eq!(
3309 wal.running_checksum(),
3310 checksum_before,
3311 "failed append must preserve the running checksum of prior committed frames"
3312 );
3313 assert!(
3314 wal.frame_scratch_capacity() >= scratch_cap_before,
3315 "scratch capacity must not shrink after error"
3316 );
3317 let mut first_frame_after = vec![0u8; wal.frame_size()];
3318 wal.read_frame_into(&cx, 0, &mut first_frame_after)
3319 .expect("read preserved frame");
3320 assert_eq!(
3321 first_frame_after, first_frame_before,
3322 "failed append must not rewrite previously committed raw frame bytes"
3323 );
3324
3325 let recovery_page = sample_page(0x33);
3327 wal.append_frame(&cx, 2, &recovery_page, 2)
3328 .expect("recovery append after error");
3329 assert_eq!(wal.frame_count(), 2, "recovery append should succeed");
3330 }
3331
    #[test]
    /// Checks that two consecutive single-frame appends leave the frame
    /// scratch buffer with the same length, capacity and backing pointer,
    /// i.e. the second append did not reallocate it.
    fn test_append_frame_reuses_frame_scratch_between_calls() {
        let cx = test_cx();
        let vfs = MemoryVfs::new();
        let file = open_wal_file(&vfs, &cx);
        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");

        // The first append establishes the scratch buffer; snapshot its shape.
        wal.append_frame(&cx, 1, &sample_page(0x11), 0)
            .expect("append first");
        let scratch_len = wal.frame_scratch_len();
        let scratch_capacity = wal.frame_scratch_capacity();
        let scratch_ptr = wal.frame_scratch_ptr();

        wal.append_frame(&cx, 2, &sample_page(0x22), 2)
            .expect("append second");

        // After the second append the scratch must hold exactly one frame...
        assert_eq!(
            wal.frame_scratch_len(),
            wal.frame_size(),
            "single-frame append should keep one frame sized scratch"
        );
        // ...with the same length as after the first append...
        assert_eq!(
            wal.frame_scratch_len(),
            scratch_len,
            "single-frame scratch length should stay constant across appends"
        );
        // ...and the same allocation (capacity and pointer unchanged).
        assert_eq!(
            wal.frame_scratch_capacity(),
            scratch_capacity,
            "single-frame scratch should retain its allocation"
        );
        assert_eq!(
            wal.frame_scratch_ptr(),
            scratch_ptr,
            "single-frame scratch should reuse the same backing buffer"
        );
    }
3369
    #[test]
    /// Checks that a two-frame batch appended after a three-frame batch reuses
    /// the scratch allocation made for the larger batch: the length tracks the
    /// current batch while capacity and backing pointer stay the same.
    fn test_batch_append_reuses_frame_scratch_between_calls() {
        let cx = test_cx();
        let vfs = MemoryVfs::new();
        let file = open_wal_file(&vfs, &cx);
        let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");

        // First batch: three frames with a zero db size (pages 1..=3).
        let first_pages: Vec<Vec<u8>> = (0..3u8).map(sample_page).collect();
        let first_frames: Vec<_> = (0..3)
            .map(|i| WalAppendFrameRef {
                page_number: u32::try_from(i).expect("index fits u32") + 1,
                page_data: &first_pages[i],
                db_size_if_commit: 0,
            })
            .collect();
        wal.append_frames(&cx, &first_frames)
            .expect("append first batch");
        // Snapshot the allocation produced by the larger batch.
        let scratch_capacity = wal.frame_scratch_capacity();
        let scratch_ptr = wal.frame_scratch_ptr();

        // Second batch: two frames (pages 10 and 11); the last one has db size 11.
        let second_pages: Vec<Vec<u8>> = (0..2u8).map(|i| sample_page(i + 10)).collect();
        let second_frames: Vec<_> = (0..2)
            .map(|i| WalAppendFrameRef {
                page_number: u32::try_from(i).expect("index fits u32") + 10,
                page_data: &second_pages[i],
                db_size_if_commit: if i == 1 { 11 } else { 0 },
            })
            .collect();
        wal.append_frames(&cx, &second_frames)
            .expect("append second batch");

        // Length follows the current (smaller) batch...
        assert_eq!(
            wal.frame_scratch_len(),
            second_frames.len() * wal.frame_size(),
            "batch scratch length should track the active batch size"
        );
        // ...while the allocation from the larger batch is kept as is.
        assert_eq!(
            wal.frame_scratch_capacity(),
            scratch_capacity,
            "smaller follow-on batches should retain the existing scratch allocation"
        );
        assert_eq!(
            wal.frame_scratch_ptr(),
            scratch_ptr,
            "smaller follow-on batches should reuse the same backing buffer"
        );
    }
3417
/// After a four-frame batch append, `reset` must report a scratch length of
/// zero while the scratch capacity and pointer stay what they were before the
/// reset.
#[test]
fn test_reset_clears_frame_scratch_len_without_dropping_capacity() {
    let cx = test_cx();
    let vfs = MemoryVfs::new();
    let mut wal = WalFile::create(&cx, open_wal_file(&vfs, &cx), PAGE_SIZE, 0, test_salts())
        .expect("create");
    let reset_salts = WalSalts {
        salt1: 0xABCD_EF01,
        salt2: 0x1020_3040,
    };

    // Pages 1..=4; only the last frame carries a non-zero db size (4).
    let pages: Vec<Vec<u8>> = (0..4u8).map(sample_page).collect();
    let frames: Vec<_> = pages
        .iter()
        .enumerate()
        .map(|(idx, page)| WalAppendFrameRef {
            page_number: u32::try_from(idx).expect("index fits u32") + 1,
            page_data: page.as_slice(),
            db_size_if_commit: match idx {
                3 => 4,
                _ => 0,
            },
        })
        .collect();
    wal.append_frames(&cx, &frames).expect("append batch");
    let capacity_before = wal.frame_scratch_capacity();
    let ptr_before = wal.frame_scratch_ptr();

    wal.reset(&cx, 1, reset_salts, false).expect("reset WAL");

    assert_eq!(
        wal.frame_scratch_len(),
        0,
        "reset should leave scratch empty for the next append cycle"
    );
    assert_eq!(
        wal.frame_scratch_capacity(),
        capacity_before,
        "reset should preserve scratch capacity for reuse"
    );
    assert_eq!(
        wal.frame_scratch_ptr(),
        ptr_before,
        "reset should keep the existing scratch allocation alive"
    );
}
3459
/// Runs the three scratch-buffer benchmark cases (baseline vs. candidate) and
/// prints the collected results as pretty JSON between
/// `BEGIN_BD_DB300_3_4_3_REPORT` and `END_BD_DB300_3_4_3_REPORT` marker lines.
///
/// Ignored by default; it exists to produce benchmark evidence on demand.
#[test]
#[ignore = "benchmark evidence only"]
fn wal_frame_scratch_benchmark_report() {
    let single_frame_case = track_c_scratch_case_report(
        "single_frame_256_ops",
        1,
        256,
        || track_c_measure_single_frame_case(TrackCScratchBenchMode::FreshAllocBaseline, 256),
        || track_c_measure_single_frame_case(TrackCScratchBenchMode::ScratchReuseCandidate, 256),
    );
    let batch_8_case = track_c_scratch_case_report(
        "batch_8_frames_64_ops",
        8,
        64,
        || track_c_measure_batch_case::<8>(TrackCScratchBenchMode::FreshAllocBaseline, 64),
        || track_c_measure_batch_case::<8>(TrackCScratchBenchMode::ScratchReuseCandidate, 64),
    );
    let batch_32_case = track_c_scratch_case_report(
        "batch_32_frames_16_ops",
        32,
        16,
        || track_c_measure_batch_case::<32>(TrackCScratchBenchMode::FreshAllocBaseline, 16),
        || track_c_measure_batch_case::<32>(TrackCScratchBenchMode::ScratchReuseCandidate, 16),
    );
    let cases = vec![single_frame_case, batch_8_case, batch_32_case];

    let report = json!({
        "schema_version": "fsqlite.track_c.wal_scratch_benchmark.v1",
        "bead_id": TRACK_C_SCRATCH_BENCH_BEAD_ID,
        "parent_bead_id": "bd-db300.3.4",
        "measured_operation": "wal_frame_assembly_and_append",
        "warmup_iterations": TRACK_C_SCRATCH_BENCH_WARMUP_ITERS,
        "measurement_iterations": TRACK_C_SCRATCH_BENCH_MEASURE_ITERS,
        "vfs": "memory",
        "baseline_variant": "fresh_frame_buffer_per_operation",
        "candidate_variant": "reusable_wal_handle_frame_scratch",
        "cases": cases,
    });

    println!("BEGIN_BD_DB300_3_4_3_REPORT");
    // Serialize between the markers so the output ordering matches the report contract.
    let rendered = serde_json::to_string_pretty(&report).unwrap();
    println!("{rendered}");
    println!("END_BD_DB300_3_4_3_REPORT");
}
3524
/// Writes the same six frames once via `append_frame` and once via
/// `prepare_frame_bytes` + `append_prepared_frame_bytes`, then checks that both
/// WALs agree on frame count, running checksum, and every frame's header and
/// payload.
#[test]
fn test_prepared_batch_append_checksum_chain_matches_single_append() {
    let cx = test_cx();
    let vfs_single = MemoryVfs::new();
    let vfs_prepared = MemoryVfs::new();
    let page_size = usize::try_from(PAGE_SIZE).expect("page size fits usize");

    let pages: Vec<Vec<u8>> = (0..6u8).map(sample_page).collect();
    let page_nums: Vec<u32> = (1..=6u32).collect();
    let commit_sizes: Vec<u32> = vec![0, 0, 3, 0, 0, 6];

    // Reference WAL: one `append_frame` call per frame.
    let mut wal_single =
        WalFile::create(&cx, open_wal_file(&vfs_single, &cx), PAGE_SIZE, 0, test_salts())
            .expect("create single");
    for ((page_no, page), commit_size) in page_nums.iter().zip(&pages).zip(&commit_sizes) {
        wal_single
            .append_frame(&cx, *page_no, page, *commit_size)
            .expect("append single");
    }

    // Candidate WAL: the whole batch goes through the prepared-bytes path.
    let mut wal_prepared =
        WalFile::create(&cx, open_wal_file(&vfs_prepared, &cx), PAGE_SIZE, 0, test_salts())
            .expect("create prepared");
    let frames: Vec<_> = page_nums
        .iter()
        .zip(&pages)
        .zip(&commit_sizes)
        .map(|((&page_number, page), &db_size_if_commit)| WalAppendFrameRef {
            page_number,
            page_data: page.as_slice(),
            db_size_if_commit,
        })
        .collect();
    let mut prepared_bytes = wal_prepared
        .prepare_frame_bytes(&frames)
        .expect("prepare frame bytes");
    let frame_len = wal_prepared.frame_size();
    let frame_transforms = prepared_bytes
        .chunks_exact(frame_len)
        .map(|frame| {
            WalChecksumTransform::for_wal_frame(
                frame,
                page_size,
                wal_prepared.big_endian_checksum(),
            )
        })
        .collect::<Result<Vec<_>>>()
        .expect("compute frame transforms");
    wal_prepared
        .append_prepared_frame_bytes(&cx, &mut prepared_bytes, &frame_transforms)
        .expect("append prepared batch");

    assert_eq!(
        wal_single.frame_count(),
        wal_prepared.frame_count(),
        "prepared append must preserve frame count"
    );
    assert_eq!(
        wal_single.running_checksum(),
        wal_prepared.running_checksum(),
        "prepared append must preserve checksum chain"
    );

    for i in 0..6 {
        let (expected_header, expected_data) =
            wal_single.read_frame(&cx, i).expect("read single");
        let (actual_header, actual_data) =
            wal_prepared.read_frame(&cx, i).expect("read prepared");
        assert_eq!(
            expected_header, actual_header,
            "frame header {i} must match"
        );
        assert_eq!(expected_data, actual_data, "frame payload {i} must match");
    }
}
3595
/// Prepares a three-frame batch, then appends an unrelated frame (page 99)
/// before submitting the prepared bytes. The result must match a WAL where the
/// same four frames were appended one by one in that order.
#[test]
fn test_prepared_batch_reseeds_after_intervening_growth() {
    let cx = test_cx();
    let vfs_single = MemoryVfs::new();
    let vfs_prepared = MemoryVfs::new();
    let page_size = usize::try_from(PAGE_SIZE).expect("page size fits usize");

    let pages: Vec<Vec<u8>> = (0..3u8).map(sample_page).collect();
    let page_nums: Vec<u32> = (1..=3u32).collect();
    let commit_sizes: Vec<u32> = vec![0, 0, 3];
    let intervening_page = sample_page(0xAA);

    // Reference WAL: intervening frame first, then the three batch frames.
    let mut wal_single =
        WalFile::create(&cx, open_wal_file(&vfs_single, &cx), PAGE_SIZE, 0, test_salts())
            .expect("create single");
    wal_single
        .append_frame(&cx, 99, &intervening_page, 0)
        .expect("append intervening single");
    for ((page_no, page), commit_size) in page_nums.iter().zip(&pages).zip(&commit_sizes) {
        wal_single
            .append_frame(&cx, *page_no, page, *commit_size)
            .expect("append single");
    }

    // Candidate WAL: bytes are prepared while the WAL is still empty.
    let mut wal_prepared =
        WalFile::create(&cx, open_wal_file(&vfs_prepared, &cx), PAGE_SIZE, 0, test_salts())
            .expect("create prepared");
    let frames: Vec<_> = page_nums
        .iter()
        .zip(&pages)
        .zip(&commit_sizes)
        .map(|((&page_number, page), &db_size_if_commit)| WalAppendFrameRef {
            page_number,
            page_data: page.as_slice(),
            db_size_if_commit,
        })
        .collect();
    let mut prepared_bytes = wal_prepared
        .prepare_frame_bytes(&frames)
        .expect("prepare frame bytes");
    let frame_len = wal_prepared.frame_size();
    let frame_transforms = prepared_bytes
        .chunks_exact(frame_len)
        .map(|frame| {
            WalChecksumTransform::for_wal_frame(
                frame,
                page_size,
                wal_prepared.big_endian_checksum(),
            )
        })
        .collect::<Result<Vec<_>>>()
        .expect("compute frame transforms");
    // The WAL grows between preparation and submission of the batch.
    wal_prepared
        .append_frame(&cx, 99, &intervening_page, 0)
        .expect("append intervening prepared");
    wal_prepared
        .append_prepared_frame_bytes(&cx, &mut prepared_bytes, &frame_transforms)
        .expect("append prepared batch");

    assert_eq!(
        wal_single.frame_count(),
        wal_prepared.frame_count(),
        "prepared append after growth must preserve frame count"
    );
    assert_eq!(
        wal_single.running_checksum(),
        wal_prepared.running_checksum(),
        "prepared append after growth must rebind to the live checksum seed"
    );

    let total_frames = wal_single.frame_count();
    for i in 0..total_frames {
        let (expected_header, expected_data) =
            wal_single.read_frame(&cx, i).expect("read single");
        let (actual_header, actual_data) =
            wal_prepared.read_frame(&cx, i).expect("read prepared");
        assert_eq!(
            expected_header, actual_header,
            "frame header {i} must match"
        );
        assert_eq!(expected_data, actual_data, "frame payload {i} must match");
    }
}
3673
/// Prepares a two-frame batch, resets the WAL with new salts, and then submits
/// the prepared bytes. The outcome must match a WAL freshly created with those
/// salts (and checkpoint sequence 7) that received the frames one by one.
#[test]
fn test_prepared_batch_rewrites_salts_after_reset() {
    let cx = test_cx();
    let vfs_fresh = MemoryVfs::new();
    let vfs_reset = MemoryVfs::new();
    let page_size = usize::try_from(PAGE_SIZE).expect("page size fits usize");
    let reset_salts = WalSalts {
        salt1: 0x0102_0304,
        salt2: 0xA0B0_C0D0,
    };

    let pages: Vec<Vec<u8>> = (0..2u8).map(sample_page).collect();
    let page_nums: Vec<u32> = (1..=2u32).collect();
    let commit_sizes: Vec<u32> = vec![0, 2];

    // Reference WAL created directly with the post-reset parameters.
    let mut wal_fresh =
        WalFile::create(&cx, open_wal_file(&vfs_fresh, &cx), PAGE_SIZE, 7, reset_salts)
            .expect("create fresh");
    for ((page_no, page), commit_size) in page_nums.iter().zip(&pages).zip(&commit_sizes) {
        wal_fresh
            .append_frame(&cx, *page_no, page, *commit_size)
            .expect("append fresh");
    }

    // Candidate WAL: bytes are prepared under the original salts, then the WAL is reset.
    let mut wal_reset =
        WalFile::create(&cx, open_wal_file(&vfs_reset, &cx), PAGE_SIZE, 0, test_salts())
            .expect("create reset");
    let frames: Vec<_> = page_nums
        .iter()
        .zip(&pages)
        .zip(&commit_sizes)
        .map(|((&page_number, page), &db_size_if_commit)| WalAppendFrameRef {
            page_number,
            page_data: page.as_slice(),
            db_size_if_commit,
        })
        .collect();
    let mut prepared_bytes = wal_reset
        .prepare_frame_bytes(&frames)
        .expect("prepare frame bytes");
    let frame_len = wal_reset.frame_size();
    let frame_transforms = prepared_bytes
        .chunks_exact(frame_len)
        .map(|frame| {
            WalChecksumTransform::for_wal_frame(
                frame,
                page_size,
                wal_reset.big_endian_checksum(),
            )
        })
        .collect::<Result<Vec<_>>>()
        .expect("compute frame transforms");
    wal_reset
        .reset(&cx, 7, reset_salts, false)
        .expect("reset WAL");
    wal_reset
        .append_prepared_frame_bytes(&cx, &mut prepared_bytes, &frame_transforms)
        .expect("append prepared batch");

    assert_eq!(
        wal_fresh.frame_count(),
        wal_reset.frame_count(),
        "prepared append after reset must preserve frame count"
    );
    assert_eq!(
        wal_fresh.running_checksum(),
        wal_reset.running_checksum(),
        "prepared append after reset must rebind to the reset checksum seed"
    );

    for i in 0..2 {
        let (fresh_header, fresh_data) = wal_fresh.read_frame(&cx, i).expect("read fresh");
        let (reset_header, reset_data) = wal_reset.read_frame(&cx, i).expect("read reset");
        assert_eq!(fresh_header, reset_header, "frame header {i} must match");
        // Headers are already known to be equal, so this also pins the reset WAL's salts.
        assert_eq!(
            fresh_header.salts, reset_salts,
            "frame {i} must use reset salts"
        );
        assert_eq!(fresh_data, reset_data, "frame payload {i} must match");
    }
}
3751
/// Appends five frames where only the third carries a non-zero db size, closes
/// the WAL, and checks that reopening reports three frames.
#[test]
fn test_uncommitted_tail_trimmed_on_reopen() {
    let cx = test_cx();
    let vfs = MemoryVfs::new();
    let mut wal = WalFile::create(&cx, open_wal_file(&vfs, &cx), PAGE_SIZE, 0, test_salts())
        .expect("create");

    // Pages 1..=5; page 3 is the only frame written with a db size (3).
    for pg in 1..=5u32 {
        let db_sz = if pg == 3 { 3 } else { 0 };
        let page = sample_page(u8::try_from(pg).unwrap());
        wal.append_frame(&cx, pg, &page, db_sz).expect("append");
    }
    assert_eq!(wal.frame_count(), 5);
    wal.close(&cx).expect("close");

    let reopened = WalFile::open(&cx, open_wal_file(&vfs, &cx)).expect("reopen");
    assert_eq!(
        reopened.frame_count(),
        3,
        "frames after last commit should be trimmed on reopen"
    );
    reopened.close(&cx).expect("close");
}
3779
/// Writes a single 50-frame transaction (db size set on the last frame only),
/// reopens the WAL, and checks the frame count, the running checksum, and the
/// contents of the first, middle, and last frames.
#[test]
fn test_large_transaction_50_frames() {
    let cx = test_cx();
    let vfs = MemoryVfs::new();
    let mut wal = WalFile::create(&cx, open_wal_file(&vfs, &cx), PAGE_SIZE, 0, test_salts())
        .expect("create");

    let n = 50u32;
    let expected_frames = usize::try_from(n).unwrap();
    for page_no in 1..=n {
        // Only the final frame of the transaction carries the db size.
        let db_size = if page_no == n { n } else { 0 };
        let seed = u8::try_from((page_no - 1) % 251).unwrap();
        wal.append_frame(&cx, page_no, &sample_page(seed), db_size)
            .expect("append");
    }
    assert_eq!(wal.frame_count(), expected_frames);
    let final_checksum = wal.running_checksum();
    wal.close(&cx).expect("close");

    let wal2 = WalFile::open(&cx, open_wal_file(&vfs, &cx)).expect("reopen");
    assert_eq!(wal2.frame_count(), expected_frames);
    assert_eq!(wal2.running_checksum(), final_checksum);

    // Spot-check the first, a middle, and the last frame.
    for idx in [0, 24, 49] {
        let (header, data) = wal2.read_frame(&cx, idx).expect("read");
        let zero_based = u32::try_from(idx).unwrap();
        assert_eq!(header.page_number, zero_based + 1);
        assert_eq!(data, sample_page(u8::try_from(zero_based % 251).unwrap()));
    }

    wal2.close(&cx).expect("close");
}
3815
/// A WAL that held an old frame and was then reset must produce the same
/// running checksum for a subsequent append as a WAL freshly created with the
/// same checkpoint sequence and salts.
#[test]
fn test_append_after_reset_checksum_independent() {
    let cx = test_cx();
    let salts = WalSalts {
        salt1: 0x1234_5678,
        salt2: 0x9ABC_DEF0,
    };

    // Fresh WAL created directly with the target parameters.
    let fresh_checksum = {
        let vfs1 = MemoryVfs::new();
        let file1 = open_wal_file(&vfs1, &cx);
        let mut wal_fresh =
            WalFile::create(&cx, file1, PAGE_SIZE, 1, salts).expect("create fresh");
        wal_fresh
            .append_frame(&cx, 1, &sample_page(0x42), 1)
            .expect("append fresh");
        let checksum = wal_fresh.running_checksum();
        wal_fresh.close(&cx).expect("close fresh");
        checksum
    };

    // WAL with prior content that is reset to the same parameters.
    let reset_checksum = {
        let vfs2 = MemoryVfs::new();
        let file2 = open_wal_file(&vfs2, &cx);
        let mut wal_reset =
            WalFile::create(&cx, file2, PAGE_SIZE, 0, test_salts()).expect("create reset");
        wal_reset
            .append_frame(&cx, 99, &sample_page(0xFF), 99)
            .expect("append old");
        wal_reset.reset(&cx, 1, salts, false).expect("reset");
        wal_reset
            .append_frame(&cx, 1, &sample_page(0x42), 1)
            .expect("append after reset");
        let checksum = wal_reset.running_checksum();
        wal_reset.close(&cx).expect("close reset");
        checksum
    };

    assert_eq!(
        fresh_checksum, reset_checksum,
        "after reset with same salts, checksum chain must match fresh WAL"
    );
}
3860
3861 fn build_two_txn_wal() -> (MemoryVfs, Vec<Vec<u8>>) {
3869 let cx = test_cx();
3870 let vfs = MemoryVfs::new();
3871 let file = open_wal_file(&vfs, &cx);
3872 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
3873
3874 let mut pages = Vec::new();
3875 let frame_specs: [(u32, u32); 6] = [
3876 (1, 0),
3877 (2, 0),
3878 (3, 3), (4, 0),
3880 (5, 0),
3881 (6, 6), ];
3883 for (pg, db_sz) in frame_specs {
3884 let page = sample_page(u8::try_from(pg).unwrap());
3885 wal.append_frame(&cx, pg, &page, db_sz).expect("append");
3886 pages.push(page);
3887 }
3888 wal.close(&cx).expect("close");
3889 (vfs, pages)
3890 }
3891
/// Truncates the two-transaction WAL at every whole-frame boundary and at a
/// series of sub-frame offsets, checking that reopening always lands on a
/// frame count of 0, 3, or 6.
#[test]
fn test_crash_matrix_truncate_at_every_frame_boundary() {
    let cx = test_cx();
    let frame_size = WAL_FRAME_HEADER_SIZE + usize::try_from(PAGE_SIZE).unwrap();

    // Part 1: cuts exactly on frame boundaries.
    for cut_frames in 0..=6usize {
        let (vfs, _) = build_two_txn_wal();

        let cut_at = WAL_HEADER_SIZE + cut_frames * frame_size;
        {
            let mut handle = open_wal_file(&vfs, &cx);
            handle
                .truncate(&cx, u64::try_from(cut_at).unwrap())
                .expect("truncate");
        }

        let wal = WalFile::open(&cx, open_wal_file(&vfs, &cx)).expect("open after truncation");
        // Surviving frames are rounded down to the last complete transaction.
        let expected = if cut_frames < 3 {
            0
        } else if cut_frames < 6 {
            3
        } else {
            6
        };
        assert_eq!(
            wal.frame_count(),
            expected,
            "truncated at {cut_frames} frames should give {expected} committed"
        );
        wal.close(&cx).expect("close");
    }

    // Part 2: cuts at thirds of a frame, so most land mid-frame.
    for partial in 0..20usize {
        let (vfs, _) = build_two_txn_wal();
        let cx = test_cx();

        let cut_byte = WAL_HEADER_SIZE + partial * frame_size / 3;
        {
            let mut handle = open_wal_file(&vfs, &cx);
            handle
                .truncate(&cx, u64::try_from(cut_byte).unwrap())
                .expect("truncate");
        }

        let wal = WalFile::open(&cx, open_wal_file(&vfs, &cx)).expect("open");
        assert!(
            matches!(wal.frame_count(), 0 | 3 | 6),
            "cut_byte={cut_byte} gave frame_count={}, expected 0/3/6",
            wal.frame_count()
        );
        wal.close(&cx).expect("close");
    }
}
3947
/// For each of the six frames in turn, inverts one payload byte (offset 42 into
/// the page data) and checks that reopening reports 0 frames when the damage is
/// in the first transaction and 3 frames when it is in the second.
#[test]
fn test_crash_matrix_bit_flip_at_every_frame() {
    let frame_size = WAL_FRAME_HEADER_SIZE + usize::try_from(PAGE_SIZE).unwrap();

    for target_frame in 0..6usize {
        let (vfs, _) = build_two_txn_wal();
        let cx = test_cx();

        let corrupt_offset =
            WAL_HEADER_SIZE + target_frame * frame_size + WAL_FRAME_HEADER_SIZE + 42;
        let off = u64::try_from(corrupt_offset).unwrap();

        {
            // Read-modify-write a single byte, flipping all of its bits.
            let mut handle = open_wal_file(&vfs, &cx);
            let mut byte = [0u8; 1];
            handle.read(&cx, &mut byte, off).expect("read");
            byte[0] ^= 0xFF;
            handle.write(&cx, &byte, off).expect("write corrupt");
        }

        let wal = WalFile::open(&cx, open_wal_file(&vfs, &cx)).expect("open");
        let expected = match target_frame {
            0..=2 => 0,
            _ => 3,
        };
        assert_eq!(
            wal.frame_count(),
            expected,
            "bit flip in frame {target_frame} should give {expected}"
        );
        wal.close(&cx).expect("close");
    }
}
3984
/// Corrupts a payload byte in the frame at index 4 of the two-transaction WAL,
/// reopens it (expecting 3 surviving frames), appends a new two-frame
/// transaction, and verifies after another reopen that the three original
/// frames and BOTH newly appended frames read back intact.
#[test]
fn test_crash_matrix_continue_after_recovery() {
    let (vfs, _) = build_two_txn_wal();
    let cx = test_cx();

    let frame_size = WAL_FRAME_HEADER_SIZE + usize::try_from(PAGE_SIZE).unwrap();

    // Damage one byte inside the page data of the frame at index 4.
    let corrupt_offset = WAL_HEADER_SIZE + 4 * frame_size + WAL_FRAME_HEADER_SIZE + 10;
    let mut f = open_wal_file(&vfs, &cx);
    let mut buf = [0u8; 1];
    let off = u64::try_from(corrupt_offset).unwrap();
    f.read(&cx, &mut buf, off).expect("read");
    buf[0] ^= 0xAA;
    f.write(&cx, &buf, off).expect("write corrupt");
    drop(f);

    let f2 = open_wal_file(&vfs, &cx);
    let mut wal = WalFile::open(&cx, f2).expect("open");
    assert_eq!(wal.frame_count(), 3);

    // Continue writing on top of the recovered prefix: one plain frame, one
    // frame carrying a db size.
    wal.append_frame(&cx, 10, &sample_page(0xAA), 0)
        .expect("append");
    wal.append_frame(&cx, 11, &sample_page(0xBB), 5)
        .expect("append commit");
    assert_eq!(wal.frame_count(), 5);
    let checksum_after = wal.running_checksum();
    wal.close(&cx).expect("close");

    let f3 = open_wal_file(&vfs, &cx);
    let wal2 = WalFile::open(&cx, f3).expect("reopen");
    assert_eq!(wal2.frame_count(), 5);
    assert_eq!(wal2.running_checksum(), checksum_after);

    // The original first transaction must be untouched.
    for i in 0..3 {
        let (h, d) = wal2.read_frame(&cx, i).expect("read");
        let pg = u32::try_from(i + 1).unwrap();
        assert_eq!(h.page_number, pg);
        assert_eq!(d, sample_page(u8::try_from(pg).unwrap()));
    }

    let (h4, d4) = wal2.read_frame(&cx, 3).expect("read new frame 4");
    assert_eq!(h4.page_number, 10);
    assert_eq!(d4, sample_page(0xAA));

    // Also verify the final appended frame; previously only the first new
    // frame was read back, leaving the last frame's payload unchecked.
    let (h5, d5) = wal2.read_frame(&cx, 4).expect("read new frame 5");
    assert_eq!(h5.page_number, 11);
    assert_eq!(d5, sample_page(0xBB));

    wal2.close(&cx).expect("close");
}
4039
/// A WAL that was created and closed without any appended frame must reopen
/// with a frame count of zero.
#[test]
fn test_crash_matrix_zero_length_wal() {
    let cx = test_cx();
    let vfs = MemoryVfs::new();

    WalFile::create(&cx, open_wal_file(&vfs, &cx), PAGE_SIZE, 0, test_salts())
        .expect("create")
        .close(&cx)
        .expect("close");

    let reopened = WalFile::open(&cx, open_wal_file(&vfs, &cx)).expect("open");
    assert_eq!(reopened.frame_count(), 0);
    reopened.close(&cx).expect("close");
}
4054
/// Truncates the file so that only the WAL header, the first frame header, and
/// ten payload bytes remain; reopening must report zero frames.
#[test]
fn test_crash_matrix_header_only_partial_first_frame() {
    let cx = test_cx();
    let vfs = MemoryVfs::new();

    let mut wal = WalFile::create(&cx, open_wal_file(&vfs, &cx), PAGE_SIZE, 0, test_salts())
        .expect("create");
    let only_page = sample_page(1);
    wal.append_frame(&cx, 1, &only_page, 1).expect("append");

    // Keep the WAL header, the frame header, and just 10 bytes of page data.
    let truncated_len = u64::try_from(WAL_HEADER_SIZE + WAL_FRAME_HEADER_SIZE + 10).unwrap();
    wal.file_mut()
        .truncate(&cx, truncated_len)
        .expect("truncate");
    wal.close(&cx).expect("close");

    let reopened = WalFile::open(&cx, open_wal_file(&vfs, &cx)).expect("open");
    assert_eq!(reopened.frame_count(), 0, "partial frame should be dropped");
    reopened.close(&cx).expect("close");
}
4076
/// For 0 through 10 three-frame transactions: all written frames must survive
/// a reopen, and (for fewer than 10 transactions) one extra frame appended
/// with db size 0 must be gone after the next reopen.
#[test]
fn test_crash_matrix_many_txns_deterministic_recovery() {
    let cx = test_cx();

    for crash_txn in 0..=10usize {
        let vfs = MemoryVfs::new();
        let mut wal = WalFile::create(&cx, open_wal_file(&vfs, &cx), PAGE_SIZE, 0, test_salts())
            .expect("create");

        let total_frames = crash_txn * 3;
        // Pages are numbered consecutively from 1; every third page closes a
        // transaction and records the page number as the db size.
        for pg in 1..=u32::try_from(total_frames).unwrap() {
            let db_size = if pg % 3 == 0 { pg } else { 0 };
            let seed = u8::try_from(pg % 251).unwrap();
            wal.append_frame(&cx, pg, &sample_page(seed), db_size)
                .expect("append");
        }
        assert_eq!(wal.frame_count(), total_frames);
        wal.close(&cx).expect("close");

        let wal2 = WalFile::open(&cx, open_wal_file(&vfs, &cx)).expect("open");
        assert_eq!(
            wal2.frame_count(),
            total_frames,
            "crash_txn={crash_txn}: all {total_frames} committed frames should survive"
        );
        wal2.close(&cx).expect("close");

        if crash_txn >= 10 {
            continue;
        }

        // Append one trailing frame with db size 0, then reopen again.
        let mut wal3 = WalFile::open(&cx, open_wal_file(&vfs, &cx)).expect("open");
        let extra_pg = u32::try_from(total_frames + 1).unwrap();
        let extra_page = sample_page(u8::try_from(extra_pg % 251).unwrap());
        wal3.append_frame(&cx, extra_pg, &extra_page, 0)
            .expect("append uncommitted");
        wal3.close(&cx).expect("close");

        let wal4 = WalFile::open(&cx, open_wal_file(&vfs, &cx)).expect("open");
        assert_eq!(
            wal4.frame_count(),
            total_frames,
            "crash_txn={crash_txn}: uncommitted extra frame dropped"
        );
        wal4.close(&cx).expect("close");
    }
}
4142
/// After a reset with new salts, two frames are appended with db size 0 and the
/// WAL is closed; reopening must report zero frames and the new salts in the
/// header.
#[test]
fn test_crash_matrix_reset_then_crash() {
    let cx = test_cx();
    let vfs = MemoryVfs::new();
    let mut wal = WalFile::create(&cx, open_wal_file(&vfs, &cx), PAGE_SIZE, 0, test_salts())
        .expect("create");

    // One frame with db size 1 before the reset.
    wal.append_frame(&cx, 1, &sample_page(1), 1)
        .expect("append");

    let new_salts = WalSalts {
        salt1: 0x5555_6666,
        salt2: 0x7777_8888,
    };
    wal.reset(&cx, 1, new_salts, true).expect("reset");
    assert_eq!(wal.frame_count(), 0);

    // Two frames after the reset, neither carrying a db size.
    for (pg, seed) in [(1u32, 0xCCu8), (2, 0xDD)] {
        wal.append_frame(&cx, pg, &sample_page(seed), 0)
            .expect("append");
    }
    wal.close(&cx).expect("close");

    let reopened = WalFile::open(&cx, open_wal_file(&vfs, &cx)).expect("open");
    assert_eq!(reopened.frame_count(), 0, "no commits after reset");
    assert_eq!(reopened.header().salts, new_salts);
    reopened.close(&cx).expect("close");
}
4176
/// Writes a committed two-frame transaction followed by two frames with db
/// size 0, truncates the file halfway through the third frame, and checks that
/// reopening yields exactly the first two frames with frame index 1 flagged as
/// a commit.
#[test]
fn test_truncated_file_mid_second_txn_recovers_first_commit() {
    let cx = test_cx();
    let vfs = MemoryVfs::new();
    let mut wal = WalFile::create(&cx, open_wal_file(&vfs, &cx), PAGE_SIZE, 0, test_salts())
        .expect("create");

    // (page number, page seed, db size, failure message) per frame.
    let plan: [(u32, u8, u32, &str); 4] = [
        (1, 0x11, 0, "append"),
        (2, 0x22, 2, "commit txn1"),
        (3, 0x33, 0, "append"),
        (4, 0x44, 0, "append"),
    ];
    for (pg, seed, db_size, failure_msg) in plan {
        wal.append_frame(&cx, pg, &sample_page(seed), db_size)
            .expect(failure_msg);
    }
    assert_eq!(wal.frame_count(), 4);
    let frame_size = wal.frame_size();
    wal.close(&cx).expect("close");

    // Cut in the middle of the third frame.
    let truncate_offset =
        u64::try_from(WAL_HEADER_SIZE + frame_size * 2 + frame_size / 2).unwrap();
    {
        let mut handle = open_wal_file(&vfs, &cx);
        handle.truncate(&cx, truncate_offset).expect("truncate");
    }

    let reopened =
        WalFile::open(&cx, open_wal_file(&vfs, &cx)).expect("reopen after truncation");
    assert_eq!(
        reopened.frame_count(),
        2,
        "only first committed transaction (2 frames) should survive truncation"
    );
    let commit_header = reopened
        .read_frame_header(&cx, 1)
        .expect("read frame 2 header");
    assert!(commit_header.is_commit(), "frame 2 must be a commit frame");
    reopened.close(&cx).expect("close");
}
4218
/// Writes two committed transactions plus one trailing frame with db size 0,
/// then reopens the WAL three times; each reopen must report four frames and
/// return the expected payload for the frame at index 3.
#[test]
fn test_recovery_is_idempotent_across_multiple_reopens() {
    let cx = test_cx();
    let vfs = MemoryVfs::new();
    let mut wal = WalFile::create(&cx, open_wal_file(&vfs, &cx), PAGE_SIZE, 0, test_salts())
        .expect("create");

    // (page number, page seed, db size, failure message) per frame.
    let plan: [(u32, u8, u32, &str); 5] = [
        (1, 0xAA, 0, "append"),
        (2, 0xBB, 2, "commit txn1"),
        (3, 0xCC, 0, "append"),
        (4, 0xDD, 4, "commit txn2"),
        (5, 0xEE, 0, "append"),
    ];
    for (pg, seed, db_size, failure_msg) in plan {
        wal.append_frame(&cx, pg, &sample_page(seed), db_size)
            .expect(failure_msg);
    }
    wal.close(&cx).expect("close");

    for reopen in 0..3_u32 {
        let wal_reopened = WalFile::open(&cx, open_wal_file(&vfs, &cx)).expect("reopen");
        assert_eq!(
            wal_reopened.frame_count(),
            4,
            "reopen {reopen}: committed frame count must be 4"
        );
        let (_, payload) = wal_reopened.read_frame(&cx, 3).expect("read frame 4");
        assert_eq!(
            payload,
            sample_page(0xDD),
            "reopen {reopen}: frame 4 content must be intact"
        );
        wal_reopened.close(&cx).expect("close");
    }
}
4259
/// `WalGenerationIdentity::from_header` must copy the checkpoint sequence and
/// salts from the header; the type must also support copy, equality,
/// inequality, and `Debug` formatting.
#[test]
fn wal_generation_identity_from_header_and_eq() {
    let identity = WalGenerationIdentity::from_header(&WalHeader {
        magic: WAL_MAGIC_LE,
        format_version: WAL_FORMAT_VERSION,
        page_size: PAGE_SIZE,
        checkpoint_seq: 7,
        salts: test_salts(),
        checksum: SqliteWalChecksum { s1: 0, s2: 0 },
    });
    assert_eq!(identity.checkpoint_seq, 7);
    assert_eq!(identity.salts, test_salts());

    // Copy semantics: the original stays usable after being copied.
    let copied = identity;
    assert_eq!(copied, identity);

    // A different checkpoint sequence makes the identities unequal.
    let other = WalGenerationIdentity {
        checkpoint_seq: 8,
        salts: test_salts(),
    };
    assert_ne!(identity, other);

    let dbg = format!("{identity:?}");
    assert!(dbg.contains("WalGenerationIdentity"));
}
4283
/// Exercises the derived `Copy`, `Clone`, and `Debug` implementations of
/// `WalAppendFrameRef`.
///
/// The explicit `Clone::clone` call is intentional: a plain `let cloned = frame;`
/// would only perform a second copy and never reach the `Clone` impl.
#[test]
#[allow(clippy::clone_on_copy)] // the Clone impl itself is the thing under test
fn wal_append_frame_ref_debug_clone_copy() {
    let data = [0xABu8; 16];
    let frame = WalAppendFrameRef {
        page_number: 3,
        page_data: &data,
        db_size_if_commit: 10,
    };

    // Copy: `frame` stays usable afterwards.
    let copied = frame;
    assert_eq!(copied.page_number, 3);
    assert_eq!(copied.db_size_if_commit, 10);
    assert_eq!(copied.page_data[0], 0xAB);

    // Clone: goes through the derived `Clone` implementation.
    let cloned = Clone::clone(&frame);
    assert_eq!(cloned.page_number, frame.page_number);
    assert_eq!(cloned.db_size_if_commit, frame.db_size_if_commit);
    assert_eq!(cloned.page_data, frame.page_data);

    let dbg = format!("{frame:?}");
    assert!(dbg.contains("WalAppendFrameRef"));
}
4301
/// The generation identity of a freshly created WAL must reflect the checkpoint
/// sequence (0) and salts passed to `WalFile::create`.
#[test]
fn wal_file_generation_identity_matches_create_params() {
    let cx = test_cx();
    let vfs = MemoryVfs::new();
    let wal = WalFile::create(&cx, open_wal_file(&vfs, &cx), PAGE_SIZE, 0, test_salts())
        .expect("create");

    let WalGenerationIdentity {
        checkpoint_seq,
        salts,
    } = wal.generation_identity();
    assert_eq!(checkpoint_seq, 0);
    assert_eq!(salts, test_salts());

    wal.close(&cx).expect("close");
}
4313
/// Checks `page_size`, `frame_count`, and `last_commit_frame` on an empty WAL
/// and again after appending one frame with a non-zero db size.
#[test]
fn wal_file_page_size_and_frame_count_after_create() {
    let cx = test_cx();
    let vfs = MemoryVfs::new();
    let mut wal = WalFile::create(&cx, open_wal_file(&vfs, &cx), PAGE_SIZE, 0, test_salts())
        .expect("create");

    // Empty WAL: no frames, no commit frame.
    assert_eq!(wal.page_size(), PAGE_SIZE as usize);
    assert_eq!(wal.frame_count(), 0);
    let commit_before = wal.last_commit_frame(&cx).expect("query");
    assert!(commit_before.is_none());

    // One frame carrying db size 5 becomes the last commit frame (index 0).
    let page = sample_page(1);
    wal.append_frame(&cx, 1, &page, 5).expect("append");
    assert_eq!(wal.frame_count(), 1);
    let commit_after = wal.last_commit_frame(&cx).expect("query");
    assert_eq!(commit_after, Some(0));

    wal.close(&cx).expect("close");
}
4329
/// Smoke test: `big_endian_checksum` and `running_checksum` are callable on a
/// freshly created WAL.
///
/// NOTE(review): the only assertion compares `s1 + 0` with `s1`, which is
/// trivially true; this test does not pin any checksum value.
#[test]
fn wal_file_big_endian_checksum_and_running_checksum() {
    let cx = test_cx();
    let vfs = MemoryVfs::new();
    let wal = WalFile::create(&cx, open_wal_file(&vfs, &cx), PAGE_SIZE, 0, test_salts())
        .expect("create");

    let _big_endian = wal.big_endian_checksum();
    let running = wal.running_checksum();
    assert_eq!(running.s1.wrapping_add(0), running.s1);

    wal.close(&cx).expect("close");
}
4341
/// `frame_size` must equal the frame header size plus the page size.
#[test]
fn wal_file_frame_size_equals_header_plus_page() {
    let cx = test_cx();
    let vfs = MemoryVfs::new();
    let wal = WalFile::create(&cx, open_wal_file(&vfs, &cx), PAGE_SIZE, 0, test_salts())
        .expect("create");

    let expected_frame_size = WAL_FRAME_HEADER_SIZE + PAGE_SIZE as usize;
    assert_eq!(wal.frame_size(), expected_frame_size);

    wal.close(&cx).expect("close");
}
4351
/// `header()` must expose the page size and salts the WAL was created with.
#[test]
fn wal_file_header_accessor() {
    let cx = test_cx();
    let vfs = MemoryVfs::new();
    let wal = WalFile::create(&cx, open_wal_file(&vfs, &cx), PAGE_SIZE, 0, test_salts())
        .expect("create");

    let header = wal.header();
    assert_eq!(header.page_size, PAGE_SIZE);
    assert_eq!(header.salts, test_salts());

    wal.close(&cx).expect("close");
}
4363
/// Smoke test: both the shared (`file`) and exclusive (`file_mut`) accessors
/// can be called on a freshly created WAL.
#[test]
fn wal_file_file_and_file_mut_accessors() {
    let cx = test_cx();
    let vfs = MemoryVfs::new();
    let mut wal = WalFile::create(&cx, open_wal_file(&vfs, &cx), PAGE_SIZE, 0, test_salts())
        .expect("create");

    let _shared_handle = wal.file();
    let _exclusive_handle = wal.file_mut();

    wal.close(&cx).expect("close");
}
4374
/// `last_fsynced_frame_count` must stay at 0 after an append and move to the
/// current frame count only once `durable_sync` has run.
#[test]
fn durable_sync_records_fsynced_frame_count() {
    let cx = test_cx();
    let vfs = MemoryVfs::new();
    let mut wal = WalFile::create(&cx, open_wal_file(&vfs, &cx), PAGE_SIZE, 0, test_salts())
        .expect("create");
    assert_eq!(wal.last_fsynced_frame_count(), 0);

    let page = vec![0xABu8; PAGE_SIZE as usize];
    wal.append_frames(&cx, &[frame_ref(1, &page, 1)])
        .expect("append");
    // Appending alone does not advance the fsynced counter.
    assert_eq!(wal.frame_count(), 1);
    assert_eq!(wal.last_fsynced_frame_count(), 0);

    wal.durable_sync(&cx, fsqlite_vfs::SyncKind::FullDurable)
        .expect("durable_sync");
    assert_eq!(wal.last_fsynced_frame_count(), 1);

    wal.close(&cx).expect("close");
}
4394
/// After appending one frame and running `durable_sync`,
/// `assert_publish_safe(1)` must succeed.
#[test]
fn assert_publish_safe_passes_after_durable_sync() {
    let cx = test_cx();
    let vfs = MemoryVfs::new();
    let mut wal = WalFile::create(&cx, open_wal_file(&vfs, &cx), PAGE_SIZE, 0, test_salts())
        .expect("create");

    let page = vec![0xCDu8; PAGE_SIZE as usize];
    wal.append_frames(&cx, &[frame_ref(1, &page, 1)])
        .expect("append");
    wal.durable_sync(&cx, fsqlite_vfs::SyncKind::FullDurable)
        .expect("durable_sync");

    wal.assert_publish_safe(1)
        .expect("should be safe after fsync");

    wal.close(&cx).expect("close");
}
4411
4412 #[test]
4413 fn assert_publish_safe_passes_for_zero_frames() {
4414 let cx = test_cx();
4415 let vfs = MemoryVfs::new();
4416 let file = open_wal_file(&vfs, &cx);
4417 let wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
4418 wal.assert_publish_safe(0).expect("zero frames always safe");
4419 wal.close(&cx).expect("close");
4420 }
4421
4422 #[test]
4423 fn durable_sync_with_data_only_kind() {
4424 let cx = test_cx();
4425 let vfs = MemoryVfs::new();
4426 let file = open_wal_file(&vfs, &cx);
4427 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
4428
4429 let page = vec![0xEFu8; PAGE_SIZE as usize];
4430 let frames = [frame_ref(2, &page, 1)];
4431 wal.append_frames(&cx, &frames).expect("append");
4432 wal.durable_sync(&cx, fsqlite_vfs::SyncKind::DataOnly)
4433 .expect("data-only sync");
4434 assert_eq!(wal.last_fsynced_frame_count(), 1);
4435 wal.close(&cx).expect("close");
4436 }
4437
4438 #[test]
4439 fn durable_sync_reset_clears_fsynced_count() {
4440 let cx = test_cx();
4441 let vfs = MemoryVfs::new();
4442 let file = open_wal_file(&vfs, &cx);
4443 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
4444
4445 let page = vec![0x11u8; PAGE_SIZE as usize];
4446 let frames = [frame_ref(1, &page, 1)];
4447 wal.append_frames(&cx, &frames).expect("append");
4448 wal.durable_sync(&cx, fsqlite_vfs::SyncKind::FullDurable)
4449 .expect("durable_sync");
4450 assert_eq!(wal.last_fsynced_frame_count(), 1);
4451
4452 let new_salts = WalSalts {
4453 salt1: 0x1111_1111,
4454 salt2: 0x2222_2222,
4455 };
4456 wal.reset(&cx, 1, new_salts, true).expect("reset");
4457 assert_eq!(wal.last_fsynced_frame_count(), 0);
4458 wal.close(&cx).expect("close");
4459 }
4460
4461 #[test]
4462 fn opened_wal_has_fsynced_count_equal_to_frame_count() {
4463 let cx = test_cx();
4464 let vfs = MemoryVfs::new();
4465 let file = open_wal_file(&vfs, &cx);
4466 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
4467
4468 let page = vec![0x22u8; PAGE_SIZE as usize];
4469 let frames = [frame_ref(1, &page, 1)];
4470 wal.append_frames(&cx, &frames).expect("append");
4471 wal.durable_sync(&cx, fsqlite_vfs::SyncKind::FullDurable)
4472 .expect("durable_sync");
4473 wal.close(&cx).expect("close");
4474
4475 let file2 = open_wal_file(&vfs, &cx);
4476 let wal2 = WalFile::open(&cx, file2).expect("open");
4477 assert_eq!(wal2.frame_count(), 1);
4478 assert_eq!(
4479 wal2.last_fsynced_frame_count(),
4480 wal2.frame_count(),
4481 "opened WAL assumes existing frames are durable"
4482 );
4483 wal2.close(&cx).expect("close");
4484 }
4485
4486 #[test]
4487 fn crash_before_wal_frame_append_preserves_existing_frames() {
4488 let _guard = FAULT_TEST_LOCK.lock().unwrap();
4489 let cx = test_cx();
4490 let vfs = MemoryVfs::new();
4491 let file = open_wal_file(&vfs, &cx);
4492 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
4493
4494 let p1 = sample_page(0xAA);
4495 let first_frames = [frame_ref(1, &p1, 1)];
4496 wal.append_frames(&cx, &first_frames).expect("first append");
4497 wal.durable_sync(&cx, SyncKind::FullDurable)
4498 .expect("sync first frame");
4499 assert_eq!(wal.frame_count(), 1);
4500
4501 crate::fault_hooks::arm_crash_boundary(
4502 crate::fault_hooks::CrashBoundary::BeforeWalFrameAppend,
4503 crate::fault_hooks::FaultHookArm::new(
4504 "crash-before-append",
4505 "WAL-FRAME-APPEND-CRASH",
4506 "test_crash_before_append",
4507 ),
4508 );
4509
4510 let p2 = sample_page(0xBB);
4511 let second_frames = [frame_ref(2, &p2, 2)];
4512 let err = wal
4513 .append_frames(&cx, &second_frames)
4514 .expect_err("should fail at crash boundary");
4515 assert!(
4516 err.to_string().contains("fault_inject"),
4517 "error identifies the fault hook: {err}"
4518 );
4519
4520 crate::fault_hooks::clear_crash_boundary();
4521
4522 wal.close(&cx).expect("close after crash");
4523
4524 let file2 = open_wal_file(&vfs, &cx);
4525 let recovered = WalFile::open(&cx, file2).expect("reopen");
4526 assert_eq!(
4527 recovered.frame_count(),
4528 1,
4529 "only the first committed frame survives; the second was never written"
4530 );
4531 recovered.close(&cx).expect("close recovered");
4532 }
4533
4534 #[test]
4535 fn crash_after_fsync_before_publish_leaves_frames_durable() {
4536 let _guard = FAULT_TEST_LOCK.lock().unwrap();
4537 let cx = test_cx();
4538 let vfs = MemoryVfs::new();
4539 let file = open_wal_file(&vfs, &cx);
4540 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
4541
4542 let p1 = sample_page(0xCC);
4543 let frames = [frame_ref(1, &p1, 1)];
4544 wal.append_frames(&cx, &frames).expect("append");
4545
4546 crate::fault_hooks::arm_crash_boundary(
4547 crate::fault_hooks::CrashBoundary::AfterFsyncBeforePublish,
4548 crate::fault_hooks::FaultHookArm::new(
4549 "crash-after-fsync",
4550 "WAL-FSYNC-PUBLISH-CRASH",
4551 "test_crash_after_fsync",
4552 ),
4553 );
4554
4555 let err = wal
4556 .durable_sync(&cx, SyncKind::FullDurable)
4557 .expect_err("should fail at crash boundary");
4558 assert!(
4559 err.to_string().contains("fault_inject"),
4560 "error identifies the fault hook: {err}"
4561 );
4562
4563 crate::fault_hooks::clear_crash_boundary();
4564
4565 wal.close(&cx).expect("close after crash");
4566
4567 let file2 = open_wal_file(&vfs, &cx);
4568 let recovered = WalFile::open(&cx, file2).expect("reopen");
4569 assert_eq!(
4570 recovered.frame_count(),
4571 1,
4572 "frame was fsynced before crash so it survives recovery"
4573 );
4574 recovered.close(&cx).expect("close recovered");
4575 }
4576
4577 #[test]
4578 fn crash_before_wal_header_write_on_reset_preserves_old_generation() {
4579 let _guard = FAULT_TEST_LOCK.lock().unwrap();
4580 let cx = test_cx();
4581 let vfs = MemoryVfs::new();
4582 let file = open_wal_file(&vfs, &cx);
4583 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
4584
4585 let p1 = sample_page(0xDD);
4586 let frames = [frame_ref(1, &p1, 1)];
4587 wal.append_frames(&cx, &frames).expect("append");
4588 wal.durable_sync(&cx, SyncKind::FullDurable).expect("sync");
4589 assert_eq!(wal.frame_count(), 1);
4590
4591 let original_salts = wal.generation_identity().salts;
4592
4593 crate::fault_hooks::arm_crash_boundary(
4594 crate::fault_hooks::CrashBoundary::BeforeWalHeaderWrite,
4595 crate::fault_hooks::FaultHookArm::new(
4596 "crash-before-header",
4597 "WAL-HEADER-WRITE-CRASH",
4598 "test_crash_before_header",
4599 ),
4600 );
4601
4602 let new_salts = WalSalts {
4603 salt1: original_salts.salt1.wrapping_add(1),
4604 salt2: original_salts.salt2.wrapping_add(1),
4605 };
4606 let err = wal
4607 .reset(&cx, 1, new_salts, true)
4608 .expect_err("should fail at crash boundary");
4609 assert!(
4610 err.to_string().contains("fault_inject"),
4611 "error identifies the fault hook: {err}"
4612 );
4613
4614 crate::fault_hooks::clear_crash_boundary();
4615
4616 wal.close(&cx).expect("close after crash");
4617
4618 let file2 = open_wal_file(&vfs, &cx);
4619 let recovered = WalFile::open(&cx, file2).expect("reopen");
4620 assert_eq!(
4621 recovered.frame_count(),
4622 1,
4623 "header was never rewritten so old generation with 1 frame persists"
4624 );
4625 assert_eq!(
4626 recovered.generation_identity().salts,
4627 original_salts,
4628 "salts unchanged — reset never wrote new header"
4629 );
4630 recovered.close(&cx).expect("close recovered");
4631 }
4632
4633 #[test]
4634 fn crash_after_frame_append_before_fsync_frames_on_disk_but_not_durable() {
4635 let _guard = FAULT_TEST_LOCK.lock().unwrap();
4636 let cx = test_cx();
4637 let vfs = MemoryVfs::new();
4638 let file = open_wal_file(&vfs, &cx);
4639 let mut wal = WalFile::create(&cx, file, PAGE_SIZE, 0, test_salts()).expect("create");
4640
4641 crate::fault_hooks::arm_crash_boundary(
4642 crate::fault_hooks::CrashBoundary::AfterWalFrameAppendBeforeFsync,
4643 crate::fault_hooks::FaultHookArm::new(
4644 "crash-after-append",
4645 "WAL-APPEND-FSYNC-CRASH",
4646 "test_crash_after_append_before_fsync",
4647 ),
4648 );
4649
4650 let p1 = sample_page(0xEE);
4651 let frames = [frame_ref(1, &p1, 1)];
4652 let err = wal
4653 .append_frames(&cx, &frames)
4654 .expect_err("should fail after append but before fsync");
4655 assert!(
4656 err.to_string().contains("fault_inject"),
4657 "error identifies the fault hook: {err}"
4658 );
4659
4660 crate::fault_hooks::clear_crash_boundary();
4661
4662 assert_eq!(
4663 wal.last_fsynced_frame_count(),
4664 0,
4665 "no fsync happened so fsynced count is still zero"
4666 );
4667
4668 wal.close(&cx).expect("close after crash");
4669
4670 let file2 = open_wal_file(&vfs, &cx);
4671 let recovered = WalFile::open(&cx, file2).expect("reopen");
4672 assert!(
4673 recovered.frame_count() <= 1,
4674 "frame may or may not survive recovery depending on MemoryVfs behavior (data written but not fsynced)"
4675 );
4676 recovered.close(&cx).expect("close recovered");
4677 }
4678}