Skip to main content

hadb_changeset/
physical.rs

1use sha2::{Digest, Sha256};
2
3use crate::error::ChangesetError;
4
5pub const HADBP_MAGIC: [u8; 5] = *b"HADBP";
6pub const HADBP_VERSION: u8 = 1;
7/// Header: magic(5) + version(1) + flags(1) + page_id_size(1) + page_size(4) + seq(8) + prev_checksum(8) + page_count(4) + created_ms(8) = 40
8const HEADER_SIZE: usize = 40;
9/// Trailer: checksum(8)
10const TRAILER_SIZE: usize = 8;
11/// Minimum encoded size: header + trailer
12const MIN_SIZE: usize = HEADER_SIZE + TRAILER_SIZE;
13
14/// Page ID byte width. Stored in the header so the format is self-describing.
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum PageIdSize {
17    /// 4 bytes (u32). Used by SQLite.
18    U32 = 4,
19    /// 8 bytes (u64). Used by DuckDB.
20    U64 = 8,
21}
22
23impl PageIdSize {
24    fn from_byte(b: u8) -> Result<Self, ChangesetError> {
25        match b {
26            4 => Ok(Self::U32),
27            8 => Ok(Self::U64),
28            _ => Err(ChangesetError::InvalidPageIdSize(b)),
29        }
30    }
31
32    fn byte_len(self) -> usize {
33        self as usize
34    }
35}
36
37/// A page ID that can be either u32 or u64.
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub enum PageId {
40    U32(u32),
41    U64(u64),
42}
43
44impl PageId {
45    pub fn to_u64(self) -> u64 {
46        match self {
47            PageId::U32(v) => v as u64,
48            PageId::U64(v) => v,
49        }
50    }
51
52}
53
54impl PartialOrd for PageId {
55    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
56        Some(self.cmp(other))
57    }
58}
59
60impl Ord for PageId {
61    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
62        self.to_u64().cmp(&other.to_u64())
63    }
64}
65
66/// Header for a physical changeset.
67#[derive(Debug, Clone, PartialEq, Eq)]
68pub struct PhysicalHeader {
69    pub flags: u8,
70    pub page_id_size: PageIdSize,
71    pub page_size: u32,
72    pub seq: u64,
73    pub prev_checksum: u64,
74    pub page_count: u32,
75    /// Milliseconds since Unix epoch when this changeset was created.
76    /// Used for debugging, retention policies, and diagnostics.
77    pub created_ms: i64,
78}
79
80/// A single page entry within a changeset.
81#[derive(Debug, Clone, PartialEq, Eq)]
82pub struct PageEntry {
83    pub page_id: PageId,
84    pub data: Vec<u8>,
85}
86
87/// A complete physical changeset: header + pages + checksum.
88#[derive(Debug, Clone, PartialEq, Eq)]
89pub struct PhysicalChangeset {
90    pub header: PhysicalHeader,
91    pub pages: Vec<PageEntry>,
92    pub checksum: u64,
93}
94
95impl PhysicalChangeset {
96    /// Create a new physical changeset. Pages are sorted by page_id for determinism.
97    ///
98    /// Panics if any page_id variant doesn't match the declared page_id_size.
99    pub fn new(
100        seq: u64,
101        prev_checksum: u64,
102        page_id_size: PageIdSize,
103        page_size: u32,
104        mut pages: Vec<PageEntry>,
105    ) -> Self {
106        // Validate all page IDs match the declared size
107        for page in &pages {
108            match (page_id_size, &page.page_id) {
109                (PageIdSize::U32, PageId::U32(_)) | (PageIdSize::U64, PageId::U64(_)) => {}
110                (expected, got) => panic!(
111                    "page_id variant mismatch: declared {:?} but got {:?}",
112                    expected, got
113                ),
114            }
115        }
116
117        pages.sort_by_key(|p| p.page_id);
118        let checksum = compute_checksum(prev_checksum, page_id_size, &pages);
119        let created_ms = std::time::SystemTime::now()
120            .duration_since(std::time::UNIX_EPOCH)
121            .map(|d| d.as_millis() as i64)
122            .unwrap_or(0);
123        Self {
124            header: PhysicalHeader {
125                flags: 0,
126                page_id_size,
127                page_size,
128                seq,
129                prev_checksum,
130                page_count: pages.len() as u32,
131                created_ms,
132            },
133            pages,
134            checksum,
135        }
136    }
137}
138
139/// Compute checksum for a physical changeset.
140/// SHA-256(prev_checksum_be || page_id_be || data_len_be || data ...) truncated to u64.
141/// Pages are sorted by page_id for determinism.
142pub fn compute_checksum(prev_checksum: u64, page_id_size: PageIdSize, pages: &[PageEntry]) -> u64 {
143    let mut hasher = Sha256::new();
144    hasher.update(prev_checksum.to_be_bytes());
145
146    let mut sorted_indices: Vec<usize> = (0..pages.len()).collect();
147    sorted_indices.sort_by_key(|&i| pages[i].page_id);
148
149    for &i in &sorted_indices {
150        // Write page_id as the correct width
151        match page_id_size {
152            PageIdSize::U32 => {
153                let id = pages[i].page_id.to_u64() as u32;
154                hasher.update(id.to_be_bytes());
155            }
156            PageIdSize::U64 => {
157                hasher.update(pages[i].page_id.to_u64().to_be_bytes());
158            }
159        }
160        hasher.update((pages[i].data.len() as u32).to_be_bytes());
161        hasher.update(&pages[i].data);
162    }
163
164    let result = hasher.finalize();
165    u64::from_be_bytes(result[0..8].try_into().expect("sha256 is 32 bytes"))
166}
167
168/// Verify that a changeset's checksum matches the expected chain.
169pub fn verify_chain(
170    expected_prev_checksum: u64,
171    changeset: &PhysicalChangeset,
172) -> Result<(), ChangesetError> {
173    if changeset.header.prev_checksum != expected_prev_checksum {
174        return Err(ChangesetError::ChainBroken {
175            expected: expected_prev_checksum,
176            changeset_prev: changeset.header.prev_checksum,
177        });
178    }
179    let computed = compute_checksum(
180        expected_prev_checksum,
181        changeset.header.page_id_size,
182        &changeset.pages,
183    );
184    if computed != changeset.checksum {
185        return Err(ChangesetError::ChecksumMismatch {
186            expected: changeset.checksum,
187            actual: computed,
188        });
189    }
190    Ok(())
191}
192
193/// Encode a physical changeset into binary format.
194pub fn encode(changeset: &PhysicalChangeset) -> Vec<u8> {
195    let pid_len = changeset.header.page_id_size.byte_len();
196    let body_size: usize = changeset
197        .pages
198        .iter()
199        .map(|p| pid_len + 4 + p.data.len())
200        .sum();
201    let mut buf = Vec::with_capacity(HEADER_SIZE + body_size + TRAILER_SIZE);
202
203    // Header
204    buf.extend_from_slice(&HADBP_MAGIC);
205    buf.push(HADBP_VERSION);
206    buf.push(changeset.header.flags);
207    buf.push(changeset.header.page_id_size as u8);
208    buf.extend_from_slice(&changeset.header.page_size.to_be_bytes());
209    buf.extend_from_slice(&changeset.header.seq.to_be_bytes());
210    buf.extend_from_slice(&changeset.header.prev_checksum.to_be_bytes());
211    buf.extend_from_slice(&changeset.header.page_count.to_be_bytes());
212    buf.extend_from_slice(&changeset.header.created_ms.to_be_bytes());
213
214    // Pages (sorted by page_id)
215    let mut sorted_indices: Vec<usize> = (0..changeset.pages.len()).collect();
216    sorted_indices.sort_by_key(|&i| changeset.pages[i].page_id);
217
218    for &i in &sorted_indices {
219        let page = &changeset.pages[i];
220        match page.page_id {
221            PageId::U32(v) => buf.extend_from_slice(&v.to_be_bytes()),
222            PageId::U64(v) => buf.extend_from_slice(&v.to_be_bytes()),
223        }
224        buf.extend_from_slice(&(page.data.len() as u32).to_be_bytes());
225        buf.extend_from_slice(&page.data);
226    }
227
228    // Checksum
229    buf.extend_from_slice(&changeset.checksum.to_be_bytes());
230
231    buf
232}
233
234/// Decode a physical changeset from binary data.
235/// Validates magic, version, and recomputes checksum.
236pub fn decode(data: &[u8]) -> Result<PhysicalChangeset, ChangesetError> {
237    if data.len() < MIN_SIZE {
238        return Err(ChangesetError::Truncated {
239            needed: MIN_SIZE,
240            available: data.len(),
241        });
242    }
243
244    let mut pos = 0;
245
246    // Magic
247    if &data[pos..pos + 5] != &HADBP_MAGIC {
248        return Err(ChangesetError::InvalidMagic);
249    }
250    pos += 5;
251
252    // Version
253    let version = data[pos];
254    if version != HADBP_VERSION {
255        return Err(ChangesetError::UnsupportedVersion(version));
256    }
257    pos += 1;
258
259    // Flags
260    let flags = data[pos];
261    pos += 1;
262
263    // Page ID size
264    let page_id_size = PageIdSize::from_byte(data[pos])?;
265    pos += 1;
266
267    // Page size
268    let page_size = u32::from_be_bytes(data[pos..pos + 4].try_into().expect("4 bytes"));
269    pos += 4;
270
271    // Seq
272    let seq = u64::from_be_bytes(data[pos..pos + 8].try_into().expect("8 bytes"));
273    pos += 8;
274
275    // Prev checksum
276    let prev_checksum = u64::from_be_bytes(data[pos..pos + 8].try_into().expect("8 bytes"));
277    pos += 8;
278
279    // Page count
280    let page_count = u32::from_be_bytes(data[pos..pos + 4].try_into().expect("4 bytes"));
281    pos += 4;
282
283    // Created timestamp
284    let created_ms = i64::from_be_bytes(data[pos..pos + 8].try_into().expect("8 bytes"));
285    pos += 8;
286
287    // Pages
288    let pid_len = page_id_size.byte_len();
289    let mut pages = Vec::with_capacity(page_count as usize);
290
291    for _ in 0..page_count {
292        // Need pid_len + 4 (data_len)
293        if pos + pid_len + 4 > data.len() {
294            return Err(ChangesetError::Truncated {
295                needed: pos + pid_len + 4,
296                available: data.len(),
297            });
298        }
299
300        let page_id = match page_id_size {
301            PageIdSize::U32 => {
302                let v = u32::from_be_bytes(data[pos..pos + 4].try_into().expect("4 bytes"));
303                PageId::U32(v)
304            }
305            PageIdSize::U64 => {
306                let v = u64::from_be_bytes(data[pos..pos + 8].try_into().expect("8 bytes"));
307                PageId::U64(v)
308            }
309        };
310        pos += pid_len;
311
312        let data_len = u32::from_be_bytes(data[pos..pos + 4].try_into().expect("4 bytes"));
313        pos += 4;
314
315        if data_len > page_size {
316            return Err(ChangesetError::PageTooLarge {
317                data_len,
318                page_size,
319            });
320        }
321
322        if pos + data_len as usize > data.len() {
323            return Err(ChangesetError::Truncated {
324                needed: pos + data_len as usize,
325                available: data.len(),
326            });
327        }
328        let page_data = data[pos..pos + data_len as usize].to_vec();
329        pos += data_len as usize;
330
331        pages.push(PageEntry {
332            page_id,
333            data: page_data,
334        });
335    }
336
337    // Checksum
338    if pos + 8 > data.len() {
339        return Err(ChangesetError::Truncated {
340            needed: pos + 8,
341            available: data.len(),
342        });
343    }
344    let stored_checksum = u64::from_be_bytes(data[pos..pos + 8].try_into().expect("8 bytes"));
345    pos += 8;
346
347    // Reject trailing bytes
348    if pos != data.len() {
349        return Err(ChangesetError::Truncated {
350            needed: pos,
351            available: data.len(),
352        });
353    }
354
355    // Verify checksum
356    let computed_checksum = compute_checksum(prev_checksum, page_id_size, &pages);
357    if computed_checksum != stored_checksum {
358        return Err(ChangesetError::ChecksumMismatch {
359            expected: stored_checksum,
360            actual: computed_checksum,
361        });
362    }
363
364    Ok(PhysicalChangeset {
365        header: PhysicalHeader {
366            flags,
367            page_id_size,
368            page_size,
369            seq,
370            prev_checksum,
371            page_count,
372            created_ms,
373        },
374        pages,
375        checksum: stored_checksum,
376    })
377}
378
379#[cfg(test)]
380mod tests {
381    use super::*;
382
383    fn page_u32(id: u32, fill: u8, len: usize) -> PageEntry {
384        PageEntry {
385            page_id: PageId::U32(id),
386            data: vec![fill; len],
387        }
388    }
389
390    fn page_u64(id: u64, fill: u8, len: usize) -> PageEntry {
391        PageEntry {
392            page_id: PageId::U64(id),
393            data: vec![fill; len],
394        }
395    }
396
397    // --- Happy path ---
398
399    #[test]
400    fn test_encode_decode_roundtrip_u32() {
401        let pages = vec![page_u32(1, 0xAA, 256), page_u32(2, 0xBB, 512), page_u32(5, 0xCC, 128)];
402        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U32, 4096, pages);
403        let encoded = encode(&cs);
404        let decoded = decode(&encoded).unwrap();
405        assert_eq!(cs, decoded);
406        assert_eq!(decoded.header.page_id_size, PageIdSize::U32);
407    }
408
409    #[test]
410    fn test_encode_decode_roundtrip_u64() {
411        let pages = vec![page_u64(0, 0xAA, 256), page_u64(1, 0xBB, 512), page_u64(5, 0xCC, 128)];
412        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, pages);
413        let encoded = encode(&cs);
414        let decoded = decode(&encoded).unwrap();
415        assert_eq!(cs, decoded);
416        assert_eq!(decoded.header.page_id_size, PageIdSize::U64);
417    }
418
419    #[test]
420    fn test_single_page() {
421        let cs = PhysicalChangeset::new(42, 12345, PageIdSize::U32, 4096, vec![page_u32(7, 0xFF, 100)]);
422        let decoded = decode(&encode(&cs)).unwrap();
423        assert_eq!(decoded.header.seq, 42);
424        assert_eq!(decoded.header.prev_checksum, 12345);
425        assert_eq!(decoded.pages.len(), 1);
426        assert_eq!(decoded.pages[0].page_id, PageId::U32(7));
427    }
428
429    #[test]
430    fn test_checksum_chain_valid() {
431        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 64)]);
432        verify_chain(0, &cs).unwrap();
433    }
434
435    #[test]
436    fn test_sequential_chain() {
437        let cs1 = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 64)]);
438        verify_chain(0, &cs1).unwrap();
439
440        let cs2 = PhysicalChangeset::new(2, cs1.checksum, PageIdSize::U64, 262144, vec![page_u64(1, 0xBB, 64)]);
441        verify_chain(cs1.checksum, &cs2).unwrap();
442    }
443
444    #[test]
445    fn test_three_changeset_chain() {
446        let cs1 = PhysicalChangeset::new(1, 0, PageIdSize::U32, 4096, vec![page_u32(1, 0x11, 32)]);
447        let cs2 = PhysicalChangeset::new(2, cs1.checksum, PageIdSize::U32, 4096, vec![page_u32(2, 0x22, 32), page_u32(3, 0x33, 32)]);
448        let cs3 = PhysicalChangeset::new(3, cs2.checksum, PageIdSize::U32, 4096, vec![page_u32(1, 0x44, 32)]);
449
450        verify_chain(0, &cs1).unwrap();
451        verify_chain(cs1.checksum, &cs2).unwrap();
452        verify_chain(cs2.checksum, &cs3).unwrap();
453    }
454
455    #[test]
456    fn test_page_id_size_preserved() {
457        let cs_u32 = PhysicalChangeset::new(1, 0, PageIdSize::U32, 4096, vec![page_u32(1, 0xAA, 32)]);
458        let cs_u64 = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(1, 0xAA, 32)]);
459
460        assert_eq!(decode(&encode(&cs_u32)).unwrap().header.page_id_size, PageIdSize::U32);
461        assert_eq!(decode(&encode(&cs_u64)).unwrap().header.page_id_size, PageIdSize::U64);
462    }
463
464    // --- Negative ---
465
466    #[test]
467    fn test_bad_magic() {
468        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 64)]);
469        let mut encoded = encode(&cs);
470        encoded[0] = b'X';
471        assert!(matches!(decode(&encoded).unwrap_err(), ChangesetError::InvalidMagic));
472    }
473
474    #[test]
475    fn test_bad_version() {
476        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 64)]);
477        let mut encoded = encode(&cs);
478        encoded[5] = 99;
479        assert!(matches!(decode(&encoded).unwrap_err(), ChangesetError::UnsupportedVersion(99)));
480    }
481
482    #[test]
483    fn test_checksum_mismatch() {
484        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 64)]);
485        let mut encoded = encode(&cs);
486        let data_offset = HEADER_SIZE + 8 + 4; // past header + page_id(8) + data_len(4)
487        encoded[data_offset] ^= 0xFF;
488        assert!(matches!(decode(&encoded).unwrap_err(), ChangesetError::ChecksumMismatch { .. }));
489    }
490
491    #[test]
492    fn test_truncated_header() {
493        assert!(matches!(decode(&[0u8; 10]).unwrap_err(), ChangesetError::Truncated { .. }));
494    }
495
496    #[test]
497    fn test_truncated_page_data() {
498        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 64)]);
499        let encoded = encode(&cs);
500        assert!(matches!(decode(&encoded[..HEADER_SIZE + 5]).unwrap_err(), ChangesetError::Truncated { .. }));
501    }
502
503    #[test]
504    fn test_chain_broken() {
505        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 64)]);
506        assert!(matches!(verify_chain(999, &cs).unwrap_err(), ChangesetError::ChainBroken { .. }));
507    }
508
509    #[test]
510    fn test_invalid_page_id_size() {
511        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 64)]);
512        let mut encoded = encode(&cs);
513        encoded[7] = 3; // invalid: not 4 or 8
514        assert!(matches!(decode(&encoded).unwrap_err(), ChangesetError::InvalidPageIdSize(3)));
515    }
516
517    #[test]
518    fn test_page_too_large() {
519        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 64)]);
520        let mut encoded = encode(&cs);
521        // Overwrite data_len to exceed page_size
522        let data_len_offset = HEADER_SIZE + 8; // past header + page_id(8)
523        let huge: u32 = 262144 + 1;
524        encoded[data_len_offset..data_len_offset + 4].copy_from_slice(&huge.to_be_bytes());
525        assert!(matches!(decode(&encoded).unwrap_err(), ChangesetError::PageTooLarge { .. }));
526    }
527
528    #[test]
529    fn test_trailing_bytes() {
530        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 64)]);
531        let mut encoded = encode(&cs);
532        encoded.push(0xFF);
533        assert!(matches!(decode(&encoded).unwrap_err(), ChangesetError::Truncated { .. }));
534    }
535
536    // --- Edge cases ---
537
538    #[test]
539    fn test_empty_changeset() {
540        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U32, 4096, vec![]);
541        let decoded = decode(&encode(&cs)).unwrap();
542        assert_eq!(decoded.pages.len(), 0);
543        verify_chain(0, &decoded).unwrap();
544    }
545
546    #[test]
547    fn test_large_changeset() {
548        let pages: Vec<PageEntry> = (0..1000).map(|i| page_u64(i, (i % 256) as u8, 64)).collect();
549        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, pages);
550        let decoded = decode(&encode(&cs)).unwrap();
551        assert_eq!(decoded.pages.len(), 1000);
552        verify_chain(0, &decoded).unwrap();
553    }
554
555    #[test]
556    fn test_partial_page() {
557        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U32, 4096, vec![page_u32(1, 0xAA, 1024)]);
558        let decoded = decode(&encode(&cs)).unwrap();
559        assert_eq!(decoded.pages[0].data.len(), 1024);
560    }
561
562    #[test]
563    fn test_full_size_page_u32() {
564        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U32, 4096, vec![page_u32(1, 0xAA, 4096)]);
565        let decoded = decode(&encode(&cs)).unwrap();
566        assert_eq!(decoded.pages[0].data.len(), 4096);
567    }
568
569    #[test]
570    fn test_full_size_page_u64() {
571        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xBB, 262144)]);
572        let decoded = decode(&encode(&cs)).unwrap();
573        assert_eq!(decoded.pages[0].data.len(), 262144);
574    }
575
576    #[test]
577    fn test_page_ordering_determinism() {
578        let asc = vec![page_u64(0, 0xAA, 32), page_u64(1, 0xBB, 32)];
579        let desc = vec![page_u64(1, 0xBB, 32), page_u64(0, 0xAA, 32)];
580
581        let cs1 = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, asc);
582        let cs2 = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, desc);
583        assert_eq!(encode(&cs1), encode(&cs2));
584    }
585
586    #[test]
587    fn test_duplicate_page_ids() {
588        let pages = vec![page_u64(0, 0xAA, 32), page_u64(0, 0xBB, 32)];
589        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, pages);
590        assert_eq!(cs.pages.len(), 2);
591        let decoded = decode(&encode(&cs)).unwrap();
592        assert_eq!(decoded.pages.len(), 2);
593        verify_chain(0, &decoded).unwrap();
594    }
595
596    #[test]
597    fn test_zero_length_page_data() {
598        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U32, 4096, vec![PageEntry { page_id: PageId::U32(1), data: vec![] }]);
599        let decoded = decode(&encode(&cs)).unwrap();
600        assert_eq!(decoded.pages[0].data.len(), 0);
601        verify_chain(0, &decoded).unwrap();
602    }
603
604    #[test]
605    fn test_unsorted_pages_sorted_on_new() {
606        let pages = vec![page_u64(5, 0xCC, 32), page_u64(0, 0xAA, 32), page_u64(3, 0xBB, 32)];
607        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, pages);
608        assert_eq!(cs.pages[0].page_id, PageId::U64(0));
609        assert_eq!(cs.pages[1].page_id, PageId::U64(3));
610        assert_eq!(cs.pages[2].page_id, PageId::U64(5));
611        assert_eq!(cs, decode(&encode(&cs)).unwrap());
612    }
613
614    #[test]
615    fn test_different_data_different_checksum() {
616        let cs1 = compute_checksum(0, PageIdSize::U64, &[page_u64(0, 0xAA, 32)]);
617        let cs2 = compute_checksum(0, PageIdSize::U64, &[page_u64(0, 0xBB, 32)]);
618        assert_ne!(cs1, cs2);
619    }
620
621    #[test]
622    fn test_different_prev_different_checksum() {
623        let pages = vec![page_u64(0, 0xAA, 32)];
624        let cs1 = compute_checksum(0, PageIdSize::U64, &pages);
625        let cs2 = compute_checksum(1, PageIdSize::U64, &pages);
626        assert_ne!(cs1, cs2);
627    }
628
629    #[test]
630    fn test_u32_max_page_id() {
631        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U32, 4096, vec![page_u32(u32::MAX, 0xAA, 16)]);
632        let decoded = decode(&encode(&cs)).unwrap();
633        assert_eq!(decoded.pages[0].page_id, PageId::U32(u32::MAX));
634    }
635
636    #[test]
637    fn test_u64_max_page_id() {
638        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(u64::MAX, 0xBB, 16)]);
639        let decoded = decode(&encode(&cs)).unwrap();
640        assert_eq!(decoded.pages[0].page_id, PageId::U64(u64::MAX));
641    }
642
643    #[test]
644    fn test_different_page_id_size_different_checksum() {
645        // Same numeric page ID and data, different PageIdSize should produce different checksums
646        // because the byte width of the page_id in the hash input differs
647        let cs_u32 = compute_checksum(0, PageIdSize::U32, &[page_u32(1, 0xAA, 32)]);
648        let cs_u64 = compute_checksum(0, PageIdSize::U64, &[page_u64(1, 0xAA, 32)]);
649        assert_ne!(cs_u32, cs_u64);
650    }
651
652    #[test]
653    fn test_page_size_preserved() {
654        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U32, 8192, vec![page_u32(1, 0xAA, 32)]);
655        let decoded = decode(&encode(&cs)).unwrap();
656        assert_eq!(decoded.header.page_size, 8192);
657    }
658
659    #[test]
660    #[should_panic(expected = "page_id variant mismatch")]
661    fn test_mixed_page_id_variants_panics() {
662        // Mixing U32 and U64 page IDs in a U32 changeset should panic
663        let pages = vec![
664            PageEntry { page_id: PageId::U32(1), data: vec![0xAA; 32] },
665            PageEntry { page_id: PageId::U64(2), data: vec![0xBB; 32] },
666        ];
667        PhysicalChangeset::new(1, 0, PageIdSize::U32, 4096, pages);
668    }
669
670    #[test]
671    fn test_flags_roundtrip() {
672        // Create changeset with non-zero flags (reserved bits)
673        let mut cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 32)]);
674        cs.header.flags = 0x03; // simulate compression + encryption flags
675
676        let encoded = encode(&cs);
677        let decoded = decode(&encoded).unwrap();
678        assert_eq!(decoded.header.flags, 0x03);
679    }
680
681    #[test]
682    fn test_timestamp_preserved() {
683        let cs = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page_u64(0, 0xAA, 32)]);
684        assert!(cs.header.created_ms > 0, "timestamp should be set by new()");
685
686        let decoded = decode(&encode(&cs)).unwrap();
687        assert_eq!(decoded.header.created_ms, cs.header.created_ms);
688    }
689}