//! LTX (Litestream Transaction) format support
//!
//! This module provides utilities for encoding and decoding LTX files,
//! which are Litestream-compatible transaction files containing SQLite pages.
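//!
//! A minimal round-trip sketch (it assumes this module is reachable as
//! `walrust::ltx` and that `app.db` is a page-aligned database file; both
//! names are illustrative):
//!
//! ```no_run
//! use std::path::Path;
//! use walrust::ltx::{decode_to_db, encode_snapshot};
//!
//! # fn main() -> anyhow::Result<()> {
//! // Snapshot a database into an in-memory LTX buffer...
//! let mut buf = Vec::new();
//! encode_snapshot(&mut buf, Path::new("app.db"), 4096, 1)?;
//!
//! // ...then restore it from that buffer.
//! let result = decode_to_db(std::io::Cursor::new(buf), Path::new("restored.db"))?;
//! assert_eq!(result.header.max_txid.into_inner(), 1);
//! # Ok(())
//! # }
//! ```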

use anyhow::{anyhow, Result};
use litepages::{Checksum, Decoder, Encoder, Header, HeaderFlags, PageNum, PageSize, TXID};
use std::io::{Read, Write};
use std::path::Path;
use std::time::SystemTime;

/// Create an LTX file from a SQLite database snapshot
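///
/// # Example
///
/// A minimal sketch, assuming a page-aligned `test.db` with 4096-byte pages
/// (the `walrust::ltx` path is illustrative):
///
/// ```no_run
/// # fn main() -> anyhow::Result<()> {
/// use walrust::ltx::encode_snapshot;
///
/// // Encoding into a Vec<u8> buffer is handy when the LTX file is uploaded
/// // directly rather than written to disk.
/// let mut buf = Vec::new();
/// encode_snapshot(&mut buf, std::path::Path::new("test.db"), 4096, 1)?;
/// # Ok(())
/// # }
/// ```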
pub fn encode_snapshot<W: Write>(
    writer: W,
    db_path: &Path,
    page_size: u32,
    txid: u64,
) -> Result<()> {
    let db_data = std::fs::read(db_path)?;
    let page_size_val = PageSize::new(page_size).map_err(|e| anyhow!("Invalid page size: {}", e))?;
    let num_pages = db_data.len() / page_size as usize;

    let header = Header {
        flags: HeaderFlags::COMPRESS_LZ4,
        page_size: page_size_val,
        commit: PageNum::new(num_pages as u32).map_err(|e| anyhow!("Invalid page count: {}", e))?,
        min_txid: TXID::ONE, // Snapshot starts at TXID 1
        max_txid: TXID::new(txid).map_err(|e| anyhow!("Invalid TXID: {}", e))?,
        timestamp: SystemTime::now(),
        pre_apply_checksum: None,
    };

    let mut encoder = Encoder::new(writer, &header)?;

    // Encode each page
    for i in 0..num_pages {
        let page_num = PageNum::new((i + 1) as u32).map_err(|e| anyhow!("Invalid page num: {}", e))?;
        let start = i * page_size as usize;
        let end = start + page_size as usize;
        let page_data = &db_data[start..end];

        encoder.encode_page(page_num, page_data)?;
    }

    // Compute final checksum and finish
    let checksum = compute_db_checksum(&db_data);
    encoder.finish(checksum)?;

    Ok(())
}

/// Result of decoding an LTX snapshot, including the post-apply checksum for chain verification
#[derive(Debug)]
pub struct DecodeResult {
    pub header: Header,
    pub post_apply_checksum: Checksum,
}

/// Decode an LTX file and reconstruct the database (full write)
///
/// Returns the header and post_apply_checksum for chain tracking.
/// The post_apply_checksum should be used as the expected pre_apply_checksum
/// for the next incremental LTX file.
///
/// Checksum verification is skipped when the NO_CHECKSUM flag is set (litestream compatibility).
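///
/// # Example
///
/// A sketch of tracking the chain across a restore (the file name and the
/// `walrust::ltx` import are illustrative):
///
/// ```no_run
/// # fn main() -> anyhow::Result<()> {
/// use walrust::ltx::decode_to_db;
///
/// let ltx = std::fs::File::open("snapshot.ltx")?;
/// let result = decode_to_db(ltx, std::path::Path::new("restored.db"))?;
///
/// // Carry the post-apply checksum forward: it is the expected
/// // pre_apply_checksum of the next incremental file.
/// let expected_next_pre = result.post_apply_checksum;
/// # let _ = expected_next_pre;
/// # Ok(())
/// # }
/// ```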
pub fn decode_to_db<R: Read>(reader: R, output_path: &Path) -> Result<DecodeResult> {
    let (mut decoder, header) = Decoder::new(reader)?;

    // Check if this is a litestream file with NO_CHECKSUM flag
    let skip_checksums = header.flags.contains(HeaderFlags::NO_CHECKSUM);

    if skip_checksums {
        tracing::debug!(
            "Skipping checksum verification (NO_CHECKSUM flag set - litestream compatibility)"
        );
    }

    let page_size = header.page_size.into_inner() as usize;
    let num_pages = header.commit.into_inner() as usize;

    let mut db_data = vec![0u8; num_pages * page_size];
    let mut page_buf = vec![0u8; page_size];

    while let Some(page_num) = decoder.decode_page(&mut page_buf)? {
        let idx = (page_num.into_inner() - 1) as usize;
        let start = idx * page_size;
        db_data[start..start + page_size].copy_from_slice(&page_buf);
    }

    // Verify file checksum (internal integrity)
    let trailer = decoder.finish()?;

    // Write database file
    std::fs::write(output_path, &db_data)?;

    // Compute actual checksum from database
    let actual_checksum = compute_db_checksum(&db_data);

    // Verify post_apply_checksum matches actual written DB (skip if NO_CHECKSUM)
    if !skip_checksums {
        if trailer.post_apply_checksum != actual_checksum {
            return Err(anyhow!(
                "Post-apply checksum mismatch after decode: expected {:016x}, got {:016x}. \
                 This may indicate corruption in the LTX file.",
                trailer.post_apply_checksum.into_inner(),
                actual_checksum.into_inner()
            ));
        }
        tracing::debug!(
            "Post-apply checksum verified: {:016x}",
            actual_checksum.into_inner()
        );
    }

    tracing::debug!(
        "Decoded snapshot (TXID {}-{})",
        header.min_txid.into_inner(),
        header.max_txid.into_inner()
    );

    Ok(DecodeResult {
        header,
        post_apply_checksum: actual_checksum,
    })
}

/// Result of applying an LTX file, including the post-apply checksum for chain verification
#[derive(Debug)]
pub struct ApplyResult {
    pub header: Header,
    pub post_apply_checksum: Checksum,
}

/// Apply an incremental LTX file to an existing database (in-place page writes)
///
/// This verifies the checksum chain using chained page checksums: after applying,
/// it checks that post_apply_checksum matches chain_checksum(pre, decoded_pages).
///
/// No full-DB read is needed for verification — the chain is self-contained.
///
/// Checksum verification is skipped when the NO_CHECKSUM flag is set (litestream compatibility).
///
/// Returns the header and post_apply_checksum for chain tracking.
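///
/// # Example
///
/// A sketch of applying one incremental and checking the file-level chain link
/// yourself (the file name is illustrative; `previous_post` stands for the
/// post-apply checksum returned by the previous restore or apply):
///
/// ```no_run
/// # fn main() -> anyhow::Result<()> {
/// use walrust::ltx::apply_ltx_to_db;
/// # let previous_post = walrust::ltx::compute_checksum_from_file(std::path::Path::new("app.db"))?;
///
/// let ltx = std::fs::File::open("incremental.ltx")?;
/// let result = apply_ltx_to_db(ltx, std::path::Path::new("app.db"))?;
///
/// // Internal consistency is verified above; linking files together is the
/// // caller's job: this file's pre must equal the previous file's post.
/// if result.header.pre_apply_checksum != Some(previous_post) {
///     anyhow::bail!("checksum chain broken: out-of-order or missing LTX file");
/// }
/// # Ok(())
/// # }
/// ```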
pub fn apply_ltx_to_db<R: Read>(reader: R, db_path: &Path) -> Result<ApplyResult> {
    use std::fs::OpenOptions;
    use std::io::{Seek, SeekFrom, Write as IoWrite};

    let (mut decoder, header) = Decoder::new(reader)?;

    // Check if this is a litestream file with NO_CHECKSUM flag
    let skip_checksums = header.flags.contains(HeaderFlags::NO_CHECKSUM);

    if skip_checksums {
        tracing::debug!(
            "Skipping checksum verification (NO_CHECKSUM flag set - litestream compatibility)"
        );
    }

    let page_size = header.page_size.into_inner() as usize;
    let mut page_buf = vec![0u8; page_size];

    // Open existing db file for page-level writes
    let mut file = OpenOptions::new()
        .write(true)
        .open(db_path)
        .map_err(|e| anyhow!("Failed to open database for in-place apply: {}", e))?;

    // Streaming chain hasher: hash each page during decode instead of accumulating
    // a Vec<(u32, Vec<u8>)>. Pages arrive sorted from our encoder.
    let mut chain_hasher = header.pre_apply_checksum.map(ChainHasher::new);

    while let Some(page_num) = decoder.decode_page(&mut page_buf)? {
        let offset = (page_num.into_inner() as u64 - 1) * page_size as u64;
        file.seek(SeekFrom::Start(offset))?;
        file.write_all(&page_buf)?;
        if let Some(ref mut hasher) = chain_hasher {
            hasher.update(page_num.into_inner(), &page_buf);
        }
    }

    // Ensure all writes are flushed
    file.sync_all()?;
    drop(file);

    // Verify file checksum (internal integrity)
    let trailer = decoder.finish()?;

    // Get page count before consuming the hasher
    let page_count = chain_hasher.as_ref().map(|h| h.page_count()).unwrap_or(0);

    // Verify checksums using chained page hash (skip if NO_CHECKSUM)
    let post_checksum = if !skip_checksums {
        if let Some(expected_pre) = header.pre_apply_checksum {
            tracing::debug!(
                "Pre-apply checksum: {:016x}",
                expected_pre.into_inner()
            );
        }

        if let Some(hasher) = chain_hasher {
            let expected_post = hasher.finish();
            if trailer.post_apply_checksum != expected_post {
                return Err(anyhow!(
                    "Post-apply checksum mismatch: expected {:016x}, got {:016x}. \
                     This may indicate corruption during apply.",
                    trailer.post_apply_checksum.into_inner(),
                    expected_post.into_inner()
                ));
            }
            tracing::debug!(
                "Post-apply checksum verified (chain): {:016x}",
                expected_post.into_inner()
            );
            expected_post
        } else {
            trailer.post_apply_checksum
        }
    } else if let Some(hasher) = chain_hasher {
        hasher.finish()
    } else {
        compute_checksum_from_file(db_path)?
    };

    tracing::debug!(
        "Applied {} pages in-place (TXID {}-{})",
        page_count,
        header.min_txid.into_inner(),
        header.max_txid.into_inner()
    );

    Ok(ApplyResult {
        header,
        post_apply_checksum: post_checksum,
    })
}

/// Compute checksum from database file (for checksum tracking)
pub fn compute_checksum_from_file(db_path: &Path) -> Result<Checksum> {
    let data = std::fs::read(db_path)?;
    Ok(compute_db_checksum(&data))
}

/// Encode WAL changes as an LTX file (incremental, not snapshot)
///
/// `pre_checksum`: Checksum of database BEFORE applying these changes (required for incrementals)
/// `post_checksum`: Checksum of database AFTER applying these changes
///
/// The caller must compute `post_checksum` by simulating the changes or reading the final state.
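///
/// # Example
///
/// A sketch that derives `post_checksum` with [`chain_checksum`] before
/// encoding, mirroring the unit tests below (page contents, TXIDs, and the
/// `walrust::ltx` path are illustrative):
///
/// ```no_run
/// # fn main() -> anyhow::Result<()> {
/// use walrust::ltx::{chain_checksum, compute_checksum_from_file, encode_wal_changes};
///
/// let pages: Vec<(u32, Vec<u8>)> = vec![(2, vec![0xAA; 4096])];
/// let pre = compute_checksum_from_file(std::path::Path::new("app.db"))?;
/// let post = chain_checksum(pre, &pages);
///
/// let mut buf = Vec::new();
/// encode_wal_changes(&mut buf, &pages, 4096, 2, 2, 3, Some(pre), post)?;
/// # Ok(())
/// # }
/// ```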
pub fn encode_wal_changes<W: Write>(
    writer: W,
    pages: &[(u32, Vec<u8>)], // (page_num, page_data)
    page_size: u32,
    min_txid: u64,
    max_txid: u64,
    commit_page: u32,
    pre_checksum: Option<Checksum>,
    post_checksum: Checksum,
) -> Result<Checksum> {
    let page_size_val = PageSize::new(page_size).map_err(|e| anyhow!("Invalid page size: {}", e))?;

    let header = Header {
        flags: HeaderFlags::COMPRESS_LZ4,
        page_size: page_size_val,
        commit: PageNum::new(commit_page).map_err(|e| anyhow!("Invalid commit page: {}", e))?,
        min_txid: TXID::new(min_txid).map_err(|e| anyhow!("Invalid min TXID: {}", e))?,
        max_txid: TXID::new(max_txid).map_err(|e| anyhow!("Invalid max TXID: {}", e))?,
        timestamp: SystemTime::now(),
        pre_apply_checksum: pre_checksum,
    };

    let mut encoder = Encoder::new(writer, &header)?;

    // Sort indices by page number so pages are encoded in order without cloning page data
    let mut indices: Vec<usize> = (0..pages.len()).collect();
    indices.sort_by_key(|&i| pages[i].0);

    for &i in &indices {
        let pn = PageNum::new(pages[i].0).map_err(|e| anyhow!("Invalid page num: {}", e))?;
        encoder.encode_page(pn, &pages[i].1)?;
    }

    let trailer = encoder.finish(post_checksum)?;

    Ok(trailer.post_apply_checksum)
}

/// Chained page checksum: O(changed pages), not O(entire DB).
///
/// Computes `SHA-256(pre_checksum_bytes || page1_num_be || page1_data || page2_num_be || page2_data || ...)`
/// with pages sorted by page number for determinism.
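///
/// # Example
///
/// A sketch showing that input order does not matter, since pages are sorted
/// internally (checksum seed and page contents are illustrative):
///
/// ```no_run
/// use litepages::Checksum;
/// use walrust::ltx::chain_checksum;
///
/// let pre = Checksum::new(0x1234);
/// let a = vec![(1u32, vec![0xAA; 4096]), (2, vec![0xBB; 4096])];
/// let b = vec![(2u32, vec![0xBB; 4096]), (1, vec![0xAA; 4096])];
/// assert_eq!(
///     chain_checksum(pre, &a).into_inner(),
///     chain_checksum(pre, &b).into_inner()
/// );
/// ```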
pub fn chain_checksum(pre: Checksum, pages: &[(u32, Vec<u8>)]) -> Checksum {
    use sha2::{Digest, Sha256};
    let mut hasher = Sha256::new();
    hasher.update(pre.into_inner().to_be_bytes());

    let mut sorted_indices: Vec<usize> = (0..pages.len()).collect();
    sorted_indices.sort_by_key(|&i| pages[i].0);

    for &i in &sorted_indices {
        hasher.update(pages[i].0.to_be_bytes());
        hasher.update(&pages[i].1);
    }

    let result = hasher.finalize();
    Checksum::new(u64::from_be_bytes(result[0..8].try_into().unwrap()))
}

/// Streaming chain hasher: computes chain checksum incrementally during LTX decode.
///
/// Pages MUST be fed in sorted order (by page number). Our encoder sorts pages,
/// so they arrive sorted during decode. This eliminates the need to accumulate
/// all decoded pages in a Vec just for checksum verification.
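///
/// A sketch of the streaming path matching [`chain_checksum`] when pages are
/// fed in sorted order (values are illustrative):
///
/// ```no_run
/// use litepages::Checksum;
/// use walrust::ltx::{chain_checksum, ChainHasher};
///
/// let pre = Checksum::new(0x1234);
/// let pages = vec![(1u32, vec![0xAA; 4096]), (2, vec![0xBB; 4096])];
///
/// let mut hasher = ChainHasher::new(pre);
/// for (num, data) in &pages {
///     hasher.update(*num, data); // already sorted by page number
/// }
/// assert_eq!(
///     hasher.finish().into_inner(),
///     chain_checksum(pre, &pages).into_inner()
/// );
/// ```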
pub struct ChainHasher {
    hasher: sha2::Sha256,
    page_count: usize,
}

impl ChainHasher {
    /// Create a new chain hasher seeded with the pre-apply checksum.
    pub fn new(pre: Checksum) -> Self {
        use sha2::Digest;
        let mut hasher = sha2::Sha256::new();
        hasher.update(pre.into_inner().to_be_bytes());
        Self { hasher, page_count: 0 }
    }

    /// Feed a page into the hash. Pages must arrive in sorted order.
    pub fn update(&mut self, page_num: u32, page_data: &[u8]) {
        use sha2::Digest;
        self.hasher.update(page_num.to_be_bytes());
        self.hasher.update(page_data);
        self.page_count += 1;
    }

    /// Finalize and return the chain checksum.
    pub fn finish(self) -> Checksum {
        use sha2::Digest;
        let result = self.hasher.finalize();
        Checksum::new(u64::from_be_bytes(result[0..8].try_into().unwrap()))
    }

    /// Number of pages fed into the hasher.
    pub fn page_count(&self) -> usize {
        self.page_count
    }
}

/// Verify an LTX file by decoding all pages and checking the checksum.
///
/// Returns the header on success, or an error describing the verification failure.
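///
/// # Example
///
/// A sketch of checking an LTX file before trusting it (the file name and
/// `walrust::ltx` path are illustrative):
///
/// ```no_run
/// # fn main() -> anyhow::Result<()> {
/// use walrust::ltx::verify_ltx;
///
/// let header = verify_ltx(std::fs::File::open("snapshot.ltx")?)?;
/// println!(
///     "ok: TXID {}-{}",
///     header.min_txid.into_inner(),
///     header.max_txid.into_inner()
/// );
/// # Ok(())
/// # }
/// ```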
pub fn verify_ltx<R: Read>(reader: R) -> Result<Header> {
    let (mut decoder, header) = Decoder::new(reader)?;

    let page_size = header.page_size.into_inner() as usize;
    let mut page_buf = vec![0u8; page_size];

    // Decode all pages (required to verify checksum)
    while decoder.decode_page(&mut page_buf)?.is_some() {
        // Just consume the pages
    }

    // Verify checksum - this will fail if corrupted
    decoder.finish()?;

    Ok(header)
}

/// Compute database checksum (single u64 from SHA256)
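///
/// A determinism sketch, mirroring the unit test below (the `walrust::ltx`
/// path is illustrative):
///
/// ```no_run
/// use walrust::ltx::compute_db_checksum;
///
/// let a = compute_db_checksum(b"hello world");
/// let b = compute_db_checksum(b"hello world");
/// assert_eq!(a.into_inner(), b.into_inner());
/// ```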
pub fn compute_db_checksum(data: &[u8]) -> Checksum {
    use sha2::{Digest, Sha256};
    let mut hasher = Sha256::new();
    hasher.update(data);
    let result = hasher.finalize();
    // Take first 8 bytes as u64
    let hash = u64::from_be_bytes(result[0..8].try_into().unwrap());
    Checksum::new(hash)
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_snapshot_roundtrip_single_page() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let ltx_path = dir.path().join("test.ltx");
        let restored_path = dir.path().join("restored.db");

        // Create a simple stand-in database file (4KB page size, 1 page)
        let page_size = 4096u32;
        let db_data = vec![0x42u8; page_size as usize];
        std::fs::write(&db_path, &db_data).unwrap();

        // Encode as LTX
        let ltx_file = std::fs::File::create(&ltx_path).unwrap();
        encode_snapshot(ltx_file, &db_path, page_size, 1).unwrap();

        // Decode back
        let ltx_file = std::fs::File::open(&ltx_path).unwrap();
        let result = decode_to_db(ltx_file, &restored_path).unwrap();

        // Verify
        let restored_data = std::fs::read(&restored_path).unwrap();
        assert_eq!(db_data, restored_data);
        assert_eq!(result.header.page_size.into_inner(), page_size);
        assert_eq!(result.header.min_txid.into_inner(), 1);
        assert_eq!(result.header.max_txid.into_inner(), 1);
    }

    #[test]
    fn test_snapshot_roundtrip_multiple_pages() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let restored_path = dir.path().join("restored.db");

        let page_size = 4096u32;
        let num_pages = 10;

        // Create database with multiple pages, each with unique content
        let mut db_data = Vec::new();
        for i in 0..num_pages {
            let mut page = vec![(i as u8).wrapping_mul(17); page_size as usize];
            // Add page number marker at start
            page[0..4].copy_from_slice(&(i as u32).to_be_bytes());
            db_data.extend(page);
        }
        std::fs::write(&db_path, &db_data).unwrap();

        // Encode as LTX to buffer
        let mut ltx_buffer = Vec::new();
        encode_snapshot(&mut ltx_buffer, &db_path, page_size, 100).unwrap();

        // Decode back
        let cursor = std::io::Cursor::new(ltx_buffer);
        let result = decode_to_db(cursor, &restored_path).unwrap();

        // Verify byte-for-byte
        let restored_data = std::fs::read(&restored_path).unwrap();
        assert_eq!(db_data.len(), restored_data.len());
        assert_eq!(db_data, restored_data);
        assert_eq!(result.header.commit.into_inner(), num_pages as u32);
        assert_eq!(result.header.max_txid.into_inner(), 100);
    }

    #[test]
    fn test_snapshot_various_page_sizes() {
        let dir = tempdir().unwrap();

        for page_size in [512u32, 1024, 2048, 4096, 8192, 16384, 32768] {
            let db_path = dir.path().join(format!("test_{}.db", page_size));
            let restored_path = dir.path().join(format!("restored_{}.db", page_size));

            // Create 3-page database
            let db_data: Vec<u8> = (0..3)
                .flat_map(|i| vec![(i * 50) as u8; page_size as usize])
                .collect();
            std::fs::write(&db_path, &db_data).unwrap();

            let mut ltx_buffer = Vec::new();
            encode_snapshot(&mut ltx_buffer, &db_path, page_size, 1).unwrap();

            let cursor = std::io::Cursor::new(ltx_buffer);
            let result = decode_to_db(cursor, &restored_path).unwrap();

            let restored_data = std::fs::read(&restored_path).unwrap();
            assert_eq!(
                db_data, restored_data,
                "Mismatch for page_size={}",
                page_size
            );
            assert_eq!(result.header.page_size.into_inner(), page_size);
        }
    }

    #[test]
    fn test_snapshot_preserves_binary_data() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("binary.db");
        let restored_path = dir.path().join("restored.db");

        let page_size = 4096u32;

        // Create database with all byte values (0x00-0xFF pattern)
        let mut db_data = Vec::new();
        for page_num in 0..4 {
            let mut page = vec![0u8; page_size as usize];
            for (i, byte) in page.iter_mut().enumerate() {
                *byte = ((page_num * 256 + i) % 256) as u8;
            }
            db_data.extend(page);
        }
        std::fs::write(&db_path, &db_data).unwrap();

        let mut ltx_buffer = Vec::new();
        encode_snapshot(&mut ltx_buffer, &db_path, page_size, 50).unwrap();

        let cursor = std::io::Cursor::new(ltx_buffer);
        decode_to_db(cursor, &restored_path).unwrap();

        let restored_data = std::fs::read(&restored_path).unwrap();

        // Verify every single byte
        for (i, (orig, rest)) in db_data.iter().zip(restored_data.iter()).enumerate() {
            assert_eq!(
                orig, rest,
                "Byte mismatch at offset {}: expected 0x{:02x}, got 0x{:02x}",
                i, orig, rest
            );
        }
    }

    #[test]
    fn test_incremental_ltx_encoding_with_checksum() {
        // Test encoding WAL changes as incremental LTX
        // Note: LTX format requires pre_apply_checksum for incremental files
        let page_size = 4096u32;

        // Simulate WAL changes: sequential pages (LTX requirement)
        let pages: Vec<(u32, Vec<u8>)> = vec![
            (1, vec![0xAA; page_size as usize]),
            (2, vec![0xBB; page_size as usize]),
            (3, vec![0xCC; page_size as usize]),
        ];

        // Pre-apply checksum is required for non-snapshot LTX files
        let pre_checksum = Checksum::new(0x123456789ABCDEF0);
        let expected_post = Checksum::new(0xFEDCBA9876543210);

        let mut ltx_buffer = Vec::new();
        let checksum = encode_wal_changes(
            &mut ltx_buffer,
            &pages,
            page_size,
            10,  // min_txid
            12,  // max_txid
            3,   // commit_page (db size in pages)
            Some(pre_checksum),
            expected_post,
        )
        .unwrap();

        // Verify we got the expected checksum
        assert_eq!(checksum.into_inner(), expected_post.into_inner());

        // Verify buffer is non-empty and reasonable size
        assert!(!ltx_buffer.is_empty());
        assert!(ltx_buffer.len() > 100); // At least header
    }

    #[test]
    fn test_incremental_ltx_format_rules() {
        // LTX format rules:
        // - min_txid=1 is a "snapshot" (no pre_checksum allowed)
        // - min_txid>1 is "incremental" (pre_checksum required)
        let page_size = 1024u32;
        let pre_checksum = Checksum::new(0x123456789ABCDEF0);

        // Incremental (min_txid > 1) requires pre_checksum
        let pages: Vec<(u32, Vec<u8>)> = vec![
            (1, vec![0x11; page_size as usize]),
            (2, vec![0x22; page_size as usize]),
        ];

        let expected_post = Checksum::new(0xABCDEF1234567890);

        let mut ltx_buffer = Vec::new();
        let result = encode_wal_changes(
            &mut ltx_buffer,
            &pages,
            page_size,
            10, // min_txid > 1 = incremental
            11, // max_txid
            2,
            Some(pre_checksum),
            expected_post,
        );
        assert!(
            result.is_ok(),
            "Incremental with pre_checksum should succeed: {:?}",
            result.err()
        );

        // Incremental without pre_checksum should fail
        let mut ltx_buffer2 = Vec::new();
        let result2 = encode_wal_changes(
            &mut ltx_buffer2,
            &pages,
            page_size,
            10, // min_txid > 1 = incremental
            11,
            2,
            None, // Missing pre_checksum!
            expected_post,
        );
        assert!(
            result2.is_err(),
            "Incremental without pre_checksum should fail"
        );

        // Snapshot (min_txid = 1) should not have pre_checksum
        let mut ltx_buffer3 = Vec::new();
        let result3 = encode_wal_changes(
            &mut ltx_buffer3,
            &pages,
            page_size,
            1, // min_txid = 1 = snapshot
            2,
            2,
            None, // No pre_checksum for snapshot
            expected_post,
        );
        assert!(
            result3.is_ok(),
            "Snapshot without pre_checksum should succeed: {:?}",
            result3.err()
        );
    }

    #[test]
    fn test_txid_ranges() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let restored_path = dir.path().join("restored.db");

        let page_size = 4096u32;
        let db_data = vec![0x42u8; page_size as usize];
        std::fs::write(&db_path, &db_data).unwrap();

        // Test various TXID values
        for txid in [1u64, 100, 1000, 999999, u32::MAX as u64] {
            let mut ltx_buffer = Vec::new();
            encode_snapshot(&mut ltx_buffer, &db_path, page_size, txid).unwrap();

            let cursor = std::io::Cursor::new(ltx_buffer);
            let result = decode_to_db(cursor, &restored_path).unwrap();

            assert_eq!(result.header.max_txid.into_inner(), txid);
        }
    }

    #[test]
    fn test_checksum_computation() {
        // Verify checksum is deterministic
        let data1 = b"hello world";
        let data2 = b"hello world";
        let data3 = b"hello worlD"; // Different

        let cs1 = compute_db_checksum(data1);
        let cs2 = compute_db_checksum(data2);
        let cs3 = compute_db_checksum(data3);

        assert_eq!(cs1.into_inner(), cs2.into_inner());
        assert_ne!(cs1.into_inner(), cs3.into_inner());
    }

    #[test]
    fn test_large_database() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("large.db");
        let restored_path = dir.path().join("restored.db");

        let page_size = 4096u32;
        let num_pages = 100; // 400KB database

        // Create large database with varying content
        let mut db_data = Vec::with_capacity(num_pages * page_size as usize);
        for i in 0..num_pages {
            let pattern = (i as u8).wrapping_mul(37);
            let mut page = vec![pattern; page_size as usize];
            // Mark page with its number
            let page_num_bytes = (i as u32).to_le_bytes();
            page[0..4].copy_from_slice(&page_num_bytes);
            db_data.extend(page);
        }
        std::fs::write(&db_path, &db_data).unwrap();

        let mut ltx_buffer = Vec::new();
        encode_snapshot(&mut ltx_buffer, &db_path, page_size, 1000).unwrap();

        // LTX should be compressed
        assert!(
            ltx_buffer.len() < db_data.len(),
            "LTX ({}) should be smaller than raw DB ({}) due to compression",
            ltx_buffer.len(),
            db_data.len()
        );

        let cursor = std::io::Cursor::new(ltx_buffer);
        decode_to_db(cursor, &restored_path).unwrap();

        let restored_data = std::fs::read(&restored_path).unwrap();
        assert_eq!(db_data, restored_data);
    }

    #[test]
    fn test_encode_to_memory_buffer() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let restored_path = dir.path().join("restored.db");

        let page_size = 4096u32;
        let db_data = vec![0x42u8; page_size as usize * 5];
        std::fs::write(&db_path, &db_data).unwrap();

        // Encode to Vec<u8> (common use case for S3 upload)
        let mut buffer: Vec<u8> = Vec::new();
        encode_snapshot(&mut buffer, &db_path, page_size, 1).unwrap();

        // Decode from Cursor (common use case for S3 download)
        let cursor = std::io::Cursor::new(buffer);
        decode_to_db(cursor, &restored_path).unwrap();

        let restored_data = std::fs::read(&restored_path).unwrap();
        assert_eq!(db_data, restored_data);
    }

    #[test]
    fn test_apply_ltx_in_place_basic() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        let page_size = 4096u32;
        let num_pages = 5;

        // Create initial database
        let db_data = vec![0x00u8; (page_size as usize) * num_pages];
        std::fs::write(&db_path, &db_data).unwrap();

        // Create incremental LTX that updates pages 2 and 4
        let pages: Vec<(u32, Vec<u8>)> = vec![
            (2, vec![0xAA; page_size as usize]),
            (4, vec![0xBB; page_size as usize]),
        ];

        let pre_checksum = compute_checksum_from_file(&db_path).unwrap();

        // Compute expected post_checksum after applying changes
        let expected_post = chain_checksum(pre_checksum, &pages);

        let mut ltx_buffer = Vec::new();
        encode_wal_changes(
            &mut ltx_buffer,
            &pages,
            page_size,
            2,  // min_txid
            3,  // max_txid
            num_pages as u32,
            Some(pre_checksum),
            expected_post,
        )
        .unwrap();

        // Apply in-place (verifies checksum chain)
        let cursor = std::io::Cursor::new(ltx_buffer);
        let result = apply_ltx_to_db(cursor, &db_path).unwrap();

        // Verify only changed pages were updated
        let result_data = std::fs::read(&db_path).unwrap();

        // Page 1 (index 0): unchanged
        assert_eq!(&result_data[0..page_size as usize], &vec![0x00u8; page_size as usize][..]);
        // Page 2 (index 1): updated to 0xAA
        let page2_start = page_size as usize;
        assert_eq!(&result_data[page2_start..page2_start + page_size as usize], &vec![0xAAu8; page_size as usize][..]);
        // Page 3 (index 2): unchanged
        let page3_start = 2 * page_size as usize;
        assert_eq!(&result_data[page3_start..page3_start + page_size as usize], &vec![0x00u8; page_size as usize][..]);
        // Page 4 (index 3): updated to 0xBB
        let page4_start = 3 * page_size as usize;
        assert_eq!(&result_data[page4_start..page4_start + page_size as usize], &vec![0xBBu8; page_size as usize][..]);
        // Page 5 (index 4): unchanged
        let page5_start = 4 * page_size as usize;
        assert_eq!(&result_data[page5_start..page5_start + page_size as usize], &vec![0x00u8; page_size as usize][..]);

        assert_eq!(result.header.min_txid.into_inner(), 2);
        assert_eq!(result.header.max_txid.into_inner(), 3);
    }

    #[test]
    fn test_apply_ltx_in_place_preserves_other_data() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        let page_size = 4096u32;

        // Create database with unique content per page
        let mut db_data = Vec::new();
        for i in 0..4u8 {
            db_data.extend(vec![i * 10; page_size as usize]);
        }
        std::fs::write(&db_path, &db_data).unwrap();

        // Update only page 3
        let pages: Vec<(u32, Vec<u8>)> = vec![
            (3, vec![0xFF; page_size as usize]),
        ];

        let pre_checksum = compute_checksum_from_file(&db_path).unwrap();

        // Compute expected post_checksum after applying changes
        let expected_post = chain_checksum(pre_checksum, &pages);

        let mut ltx_buffer = Vec::new();
        encode_wal_changes(&mut ltx_buffer, &pages, page_size, 10, 11, 4, Some(pre_checksum), expected_post).unwrap();

        let cursor = std::io::Cursor::new(ltx_buffer);
        apply_ltx_to_db(cursor, &db_path).unwrap();

        let result_data = std::fs::read(&db_path).unwrap();

        // Verify pages 1, 2, 4 unchanged
        assert_eq!(&result_data[0..page_size as usize], &vec![0u8; page_size as usize][..]);
        assert_eq!(&result_data[page_size as usize..2 * page_size as usize], &vec![10u8; page_size as usize][..]);
        // Page 3 updated
        assert_eq!(&result_data[2 * page_size as usize..3 * page_size as usize], &vec![0xFFu8; page_size as usize][..]);
        // Page 4 unchanged
        assert_eq!(&result_data[3 * page_size as usize..4 * page_size as usize], &vec![30u8; page_size as usize][..]);
    }

    #[test]
    fn test_compute_checksum_from_file() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        let data = vec![0x42u8; 4096];
        std::fs::write(&db_path, &data).unwrap();

        let checksum1 = compute_checksum_from_file(&db_path).unwrap();
        let checksum2 = compute_checksum_from_file(&db_path).unwrap();

        // Same file should produce same checksum
        assert_eq!(checksum1.into_inner(), checksum2.into_inner());

        // Different content should produce different checksum
        std::fs::write(&db_path, vec![0x43u8; 4096]).unwrap();
        let checksum3 = compute_checksum_from_file(&db_path).unwrap();
        assert_ne!(checksum1.into_inner(), checksum3.into_inner());
    }

    #[test]
    fn test_apply_ltx_chain_simulation() {
        // Simulate a realistic scenario: snapshot -> incremental -> incremental
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");

        let page_size = 4096u32;
        let num_pages = 3;

        // Initial database state
        let initial_data: Vec<u8> = (0..num_pages)
            .flat_map(|i| vec![(i as u8) * 10; page_size as usize])
            .collect();
        std::fs::write(&db_path, &initial_data).unwrap();

        // Snapshot (TXID 1)
        let mut snapshot_buffer = Vec::new();
        encode_snapshot(&mut snapshot_buffer, &db_path, page_size, 1).unwrap();

        // First incremental: update page 1 (TXID 2)
        let pre_checksum1 = compute_checksum_from_file(&db_path).unwrap();
        let pages1: Vec<(u32, Vec<u8>)> = vec![(1, vec![0xAA; page_size as usize])];
        let expected_post1 = chain_checksum(pre_checksum1, &pages1);

        let mut inc1_buffer = Vec::new();
        let _post_checksum1 = encode_wal_changes(
            &mut inc1_buffer,
            &pages1,
            page_size,
            2, 2,
            num_pages as u32,
            Some(pre_checksum1),
            expected_post1,
        ).unwrap();

        // Apply first incremental
        let cursor1 = std::io::Cursor::new(inc1_buffer);
        let result1 = apply_ltx_to_db(cursor1, &db_path).unwrap();

        // Chain continues: the first post checksum becomes pre_checksum2
        let pre_checksum2 = result1.post_apply_checksum;

        // Second incremental: update page 2 (TXID 3)
        let pages2: Vec<(u32, Vec<u8>)> = vec![(2, vec![0xBB; page_size as usize])];
        let expected_post2 = chain_checksum(pre_checksum2, &pages2);

        let mut inc2_buffer = Vec::new();
        encode_wal_changes(
            &mut inc2_buffer,
            &pages2,
            page_size,
            3, 3,
            num_pages as u32,
            Some(pre_checksum2),
            expected_post2,
        ).unwrap();

        // Apply second incremental
        let cursor2 = std::io::Cursor::new(inc2_buffer);
        apply_ltx_to_db(cursor2, &db_path).unwrap();

        // Final verification
        let final_data = std::fs::read(&db_path).unwrap();
        assert_eq!(&final_data[0..page_size as usize], &vec![0xAAu8; page_size as usize][..]); // Page 1 updated
        assert_eq!(&final_data[page_size as usize..2 * page_size as usize], &vec![0xBBu8; page_size as usize][..]); // Page 2 updated
        assert_eq!(&final_data[2 * page_size as usize..3 * page_size as usize], &vec![20u8; page_size as usize][..]); // Page 3 unchanged
    }

    // ============================================
    // Checksum Chain Error Tests
    // ============================================

    #[test]
    fn test_apply_ltx_post_checksum_mismatch_via_wrong_pre() {
        // With chained checksums, a wrong pre produces a self-consistent but different chain.
        // The LTX is internally consistent, so apply succeeds. Chain breaks are detected
        // at the file-linking level (previous file's post != this file's pre).
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let page_size = 4096u32;

        let initial_data = vec![0x00u8; page_size as usize * 3];
        std::fs::write(&db_path, &initial_data).unwrap();

        let pages: Vec<(u32, Vec<u8>)> = vec![(1, vec![0xAA; page_size as usize])];
        let wrong_pre_checksum = Checksum::new(0xDEADBEEF);
        let wrong_post = chain_checksum(wrong_pre_checksum, &pages);

        let mut ltx_buffer = Vec::new();
        encode_wal_changes(
            &mut ltx_buffer,
            &pages,
            page_size,
            2, 2,
            3,
            Some(wrong_pre_checksum),
            wrong_post,
        ).unwrap();

        // Self-consistent LTX applies successfully
        let cursor = std::io::Cursor::new(ltx_buffer);
        let result = apply_ltx_to_db(cursor, &db_path);
        assert!(result.is_ok(), "Self-consistent LTX should apply successfully");
    }

    #[test]
    fn test_apply_ltx_post_checksum_mismatch() {
        // Test that apply_ltx_to_db detects corruption when post_checksum doesn't match actual result
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let page_size = 4096u32;

        // Create initial database
        let initial_data = vec![0x00u8; page_size as usize * 3];
        std::fs::write(&db_path, &initial_data).unwrap();

        let pre_checksum = compute_checksum_from_file(&db_path).unwrap();

        // Create incremental with pages
        let pages: Vec<(u32, Vec<u8>)> = vec![(1, vec![0xAA; page_size as usize])];

        // Use WRONG post_checksum (not matching actual result)
        let wrong_post_checksum = Checksum::new(0xBADC0FFEE);

        let mut ltx_buffer = Vec::new();
        encode_wal_changes(
            &mut ltx_buffer,
            &pages,
            page_size,
            2, 2,
            3,
            Some(pre_checksum),
            wrong_post_checksum, // This is wrong!
        ).unwrap();

        // Applying should FAIL with post-apply checksum mismatch
        let cursor = std::io::Cursor::new(ltx_buffer);
        let result = apply_ltx_to_db(cursor, &db_path);

        assert!(result.is_err(), "Should detect post-apply checksum mismatch");
        let err_msg = result.unwrap_err().to_string();
        assert!(err_msg.contains("Post-apply checksum mismatch"), "Error should mention post-apply mismatch: {}", err_msg);
        assert!(err_msg.contains(&format!("{:016x}", wrong_post_checksum.into_inner())), "Error should show expected checksum");
    }

    #[test]
    fn test_apply_ltx_out_of_order() {
        // With chained checksums, apply_ltx_to_db only verifies internal consistency
        // (chain_checksum(pre, pages) == post). A self-consistent LTX applies successfully
        // even to the "wrong" DB state. Out-of-order detection is the caller's job:
        // each file's pre_checksum must equal the previous file's post_checksum.
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let page_size = 4096u32;

        let initial_data = vec![0x00u8; page_size as usize * 3];
        std::fs::write(&db_path, &initial_data).unwrap();

        let checksum0 = compute_checksum_from_file(&db_path).unwrap();

        // Create three chained incrementals
        let pages1: Vec<(u32, Vec<u8>)> = vec![(1, vec![0xAA; page_size as usize])];
        let post1 = chain_checksum(checksum0, &pages1);
        let mut buf1 = Vec::new();
        encode_wal_changes(&mut buf1, &pages1, page_size, 2, 2, 3, Some(checksum0), post1).unwrap();

        let pages2: Vec<(u32, Vec<u8>)> = vec![(2, vec![0xBB; page_size as usize])];
        let post2 = chain_checksum(post1, &pages2);
        let mut buf2 = Vec::new();
        encode_wal_changes(&mut buf2, &pages2, page_size, 3, 3, 3, Some(post1), post2).unwrap();

        let pages3: Vec<(u32, Vec<u8>)> = vec![(3, vec![0xCC; page_size as usize])];
        let post3 = chain_checksum(post2, &pages3);
        let mut buf3 = Vec::new();
        encode_wal_changes(&mut buf3, &pages3, page_size, 4, 4, 3, Some(post2), post3).unwrap();

        // Apply 1st only, then skip 2nd and apply 3rd
        let result1 = apply_ltx_to_db(std::io::Cursor::new(&buf1), &db_path).unwrap();

        // 3rd LTX applies successfully (internally self-consistent)
        let result3 = apply_ltx_to_db(std::io::Cursor::new(&buf3), &db_path).unwrap();

        // But the chain is broken: result1.post != buf3's pre (which is post2)
        let buf3_pre = result3.header.pre_apply_checksum.unwrap();
        assert_ne!(
            result1.post_apply_checksum, buf3_pre,
            "Chain should be broken when skipping an incremental"
        );

        // Correct chain: apply all three in order
        std::fs::write(&db_path, &initial_data).unwrap();
        let r1 = apply_ltx_to_db(std::io::Cursor::new(&buf1), &db_path).unwrap();
        let r2 = apply_ltx_to_db(std::io::Cursor::new(&buf2), &db_path).unwrap();
        let r3 = apply_ltx_to_db(std::io::Cursor::new(&buf3), &db_path).unwrap();

        // Chain links match
        assert_eq!(r1.post_apply_checksum.into_inner(), r2.header.pre_apply_checksum.unwrap().into_inner());
        assert_eq!(r2.post_apply_checksum.into_inner(), r3.header.pre_apply_checksum.unwrap().into_inner());
    }

    #[test]
    fn test_decode_to_db_post_checksum_verification() {
        // Test that decode_to_db verifies post_checksum matches actual restored file
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let restored_path = dir.path().join("restored.db");
        let page_size = 4096u32;

        // Create source database with varied content per page
        let mut db_data = Vec::new();
        for i in 0..3u8 {
            db_data.extend(vec![i * 42; page_size as usize]);
        }
        std::fs::write(&db_path, &db_data).unwrap();

        let expected_checksum = compute_checksum_from_file(&db_path).unwrap();

        // Create snapshot
        let mut snapshot_buffer = Vec::new();
        encode_snapshot(&mut snapshot_buffer, &db_path, page_size, 1).unwrap();

        // Decode successfully
        let cursor = std::io::Cursor::new(&snapshot_buffer);
        let result = decode_to_db(cursor, &restored_path);
        assert!(result.is_ok(), "Valid snapshot should decode successfully");

        // Verify the post_apply_checksum in result matches the actual restored file
        let decoded_result = result.unwrap();
        let actual_checksum = compute_checksum_from_file(&restored_path).unwrap();

        assert_eq!(
            decoded_result.post_apply_checksum.into_inner(),
            actual_checksum.into_inner(),
            "post_apply_checksum in result should match actual restored file checksum"
        );

        assert_eq!(
            actual_checksum.into_inner(),
            expected_checksum.into_inner(),
            "Restored file should match original"
        );

        // Verify restored content byte-for-byte
        let restored_data = std::fs::read(&restored_path).unwrap();
        assert_eq!(restored_data, db_data, "Restored data should match original exactly");
    }

    // ============================================
    // NO_CHECKSUM Flag Tests (Litestream Compatibility)
    // ============================================

    #[test]
    fn test_no_checksum_flag_decode() {
        // Test that files with NO_CHECKSUM flag skip checksum verification
        use litepages::Encoder;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let restored_path = dir.path().join("restored.db");
        let page_size = 4096u32;

        // Create source database
        let db_data = vec![0x42u8; page_size as usize * 3];
        std::fs::write(&db_path, &db_data).unwrap();

        // Create LTX file with NO_CHECKSUM flag (litestream format)
        let header = Header {
            flags: HeaderFlags::NO_CHECKSUM | HeaderFlags::COMPRESS_LZ4,
            page_size: PageSize::new(page_size).unwrap(),
            commit: PageNum::new(3).unwrap(),
            min_txid: TXID::ONE,
            max_txid: TXID::ONE,
            timestamp: SystemTime::now(),
            pre_apply_checksum: None,
        };

        let mut ltx_buffer = Vec::new();
        let mut encoder = Encoder::new(&mut ltx_buffer, &header).unwrap();

        // Encode all 3 pages
        for i in 0..3u32 {
            let start = i as usize * page_size as usize;
            encoder
                .encode_page(PageNum::new(i + 1).unwrap(), &db_data[start..start + page_size as usize])
                .unwrap();
        }

        // Use zero checksum (litestream doesn't track checksums)
        encoder.finish(Checksum::new(0)).unwrap();

        // Decode should succeed even though checksum is zero
        let cursor = std::io::Cursor::new(&ltx_buffer);
        let result = decode_to_db(cursor, &restored_path);

        assert!(result.is_ok(), "Should decode successfully with NO_CHECKSUM flag");
        let decode_result = result.unwrap();

        // Walrust computes checksums internally even when NO_CHECKSUM is set (for tracking)
        // But it doesn't verify them against the LTX file's checksums
        assert!(decode_result.post_apply_checksum.into_inner() != 0, "Should compute actual checksum even with NO_CHECKSUM");

        // Verify data was restored correctly
        let restored_data = std::fs::read(&restored_path).unwrap();
        assert_eq!(restored_data, db_data, "Data should be restored correctly even with NO_CHECKSUM");

        // Verify the checksum matches the actual data
        let expected_checksum = compute_db_checksum(&db_data);
        assert_eq!(decode_result.post_apply_checksum.into_inner(), expected_checksum.into_inner());
    }

    #[test]
    fn test_no_checksum_flag_apply() {
        // Test that incremental LTX with NO_CHECKSUM flag skips checksum verification
        use litepages::Encoder;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let page_size = 4096u32;

        // Create initial database (3 pages of zeros)
        let initial_data = vec![0x00u8; page_size as usize * 3];
        std::fs::write(&db_path, &initial_data).unwrap();

        // Create incremental LTX with NO_CHECKSUM flag (like litestream)
        // This would normally require pre_apply_checksum, but NO_CHECKSUM skips that
        let header = Header {
            flags: HeaderFlags::NO_CHECKSUM | HeaderFlags::COMPRESS_LZ4,
            page_size: PageSize::new(page_size).unwrap(),
            commit: PageNum::new(3).unwrap(),
            min_txid: TXID::new(2).unwrap(), // Incremental (not snapshot)
            max_txid: TXID::new(2).unwrap(),
            timestamp: SystemTime::now(),
            pre_apply_checksum: Some(Checksum::new(0)), // Zero checksum (litestream doesn't track)
        };

        let mut ltx_buffer = Vec::new();
        let mut encoder = Encoder::new(&mut ltx_buffer, &header).unwrap();

        // Modify page 2
        let modified_page = vec![0xAAu8; page_size as usize];
        encoder.encode_page(PageNum::new(2).unwrap(), &modified_page).unwrap();

        // Use zero checksum
        encoder.finish(Checksum::new(0)).unwrap();

        // Apply should succeed even with zero/wrong checksums
        let cursor = std::io::Cursor::new(&ltx_buffer);
        let result = apply_ltx_to_db(cursor, &db_path);

        assert!(result.is_ok(), "Should apply successfully with NO_CHECKSUM flag");
        let apply_result = result.unwrap();

        // Walrust computes checksums internally even when NO_CHECKSUM is set (for tracking)
        // But it doesn't verify them against the LTX file's checksums
        assert!(apply_result.post_apply_checksum.into_inner() != 0, "Should compute actual checksum even with NO_CHECKSUM");

        // Verify page 2 was modified
        let result_data = std::fs::read(&db_path).unwrap();
        assert_eq!(&result_data[page_size as usize..2 * page_size as usize], &modified_page[..]);
        // Verify other pages unchanged
        assert_eq!(&result_data[0..page_size as usize], &vec![0x00u8; page_size as usize][..]);
        assert_eq!(&result_data[2 * page_size as usize..3 * page_size as usize], &vec![0x00u8; page_size as usize][..]);
    }

    #[test]
    fn test_no_checksum_flag_skips_verification() {
        // Verify that NO_CHECKSUM truly skips verification by using intentionally wrong checksums
        use litepages::Encoder;

        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let page_size = 4096u32;

        // Create database
        let db_data = vec![0x42u8; page_size as usize];
        std::fs::write(&db_path, &db_data).unwrap();

        // Create incremental with NO_CHECKSUM and WRONG pre_checksum
        // This should still succeed because NO_CHECKSUM skips verification
        let actual_checksum = compute_checksum_from_file(&db_path).unwrap();
        let wrong_checksum = Checksum::new(0xDEADBEEF); // Intentionally wrong

        assert_ne!(wrong_checksum.into_inner(), actual_checksum.into_inner(), "Checksums should be different");

        let header = Header {
            flags: HeaderFlags::NO_CHECKSUM | HeaderFlags::COMPRESS_LZ4,
            page_size: PageSize::new(page_size).unwrap(),
            commit: PageNum::new(1).unwrap(),
            min_txid: TXID::new(2).unwrap(),
            max_txid: TXID::new(2).unwrap(),
            timestamp: SystemTime::now(),
            pre_apply_checksum: Some(wrong_checksum), // WRONG on purpose!
        };

        let mut ltx_buffer = Vec::new();
        let mut encoder = Encoder::new(&mut ltx_buffer, &header).unwrap();
        encoder.encode_page(PageNum::new(1).unwrap(), &vec![0x99u8; page_size as usize]).unwrap();
        encoder.finish(Checksum::new(0xBADC0FFEE)).unwrap(); // Also wrong!

        // Should succeed because NO_CHECKSUM skips all verification
        let cursor = std::io::Cursor::new(&ltx_buffer);
        let result = apply_ltx_to_db(cursor, &db_path);

        assert!(result.is_ok(), "Should succeed with wrong checksums when NO_CHECKSUM is set");
    }
}