Skip to main content

hadb_changeset/
journal.rs

1//! HADBJ (hadb journal) format for logical replication.
2//!
3//! Ships opaque journal entries (rewritten queries, WAL records, etc.) with
4//! per-entry CRC32C + SHA-256 chain integrity. Supports sealed/unsealed
5//! lifecycle and optional zstd compression.
6//!
7//! The entry payload is opaque bytes. Each engine defines its own payload
8//! format (e.g., graphstream uses protobuf Cypher entries).
9//!
10//! ## Format
11//!
12//! ```text
13//! Header (128 bytes):
14//!   magic(5) "HADBJ" | version(1) | flags(1) | compression(1)
15//!   first_seq(8) | last_seq(8) | entry_count(8) | body_len(8)
16//!   body_checksum(32) | prev_segment_checksum(8) | created_ms(8)
17//!   reserved(32)
18//!
19//! Body (variable):
20//!   [entry_crc32c(4) | payload_len(4) | sequence(8) | prev_hash(32) | payload(payload_len)]*
21//!
22//! Trailer (optional, 32 bytes when FLAG_HAS_CHAIN_HASH):
23//!   chain_hash(32)   -- SHA-256 of last entry, enables O(1) recovery
24//! ```
25
26use sha2::{Digest, Sha256};
27
28use crate::error::ChangesetError;
29
/// Magic bytes identifying a HADBJ segment ("HADBJ", header bytes 0-4).
pub const HADBJ_MAGIC: [u8; 5] = *b"HADBJ";
/// Current on-disk format version (header byte 5).
pub const HADBJ_VERSION: u8 = 1;

/// Header size in bytes.
pub const HEADER_SIZE: usize = 128;
/// Per-entry fixed header: crc32c(4) + payload_len(4) + sequence(8) + prev_hash(32) = 48.
pub const ENTRY_HEADER_SIZE: usize = 48;
/// Chain hash trailer size.
pub const CHAIN_HASH_TRAILER_SIZE: usize = 32;

// Flags (bitmask stored in header byte 6).
/// Segment is finalized; last_seq/entry_count/body_len/body_checksum are valid.
pub const FLAG_SEALED: u8 = 0x04;
/// Body bytes are compressed; the algorithm is in the header's compression byte.
pub const FLAG_COMPRESSED: u8 = 0x01;
/// A 32-byte chain hash trailer follows the body.
pub const FLAG_HAS_CHAIN_HASH: u8 = 0x08;

// Compression algorithms (identifier stored in header byte 7).
/// Body stored uncompressed.
pub const COMPRESSION_NONE: u8 = 0;
/// Body compressed with zstd (only available with the "journal" feature).
#[cfg(feature = "journal")]
pub const COMPRESSION_ZSTD: u8 = 1;

/// Zero hash (32 bytes of zeros) used as prev_hash for the first entry.
pub const ZERO_HASH: [u8; 32] = [0u8; 32];
52
/// Header for a journal segment.
///
/// Fixed 128-byte on-disk prefix; see the module docs for the byte layout.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JournalHeader {
    /// Bitmask of FLAG_* values (sealed / compressed / chain-hash trailer).
    pub flags: u8,
    /// Compression algorithm identifier (a COMPRESSION_* constant).
    pub compression: u8,
    /// Sequence number of the first entry in this segment.
    pub first_seq: u64,
    /// Sequence number of the last entry (0 if unsealed).
    pub last_seq: u64,
    /// Number of entries (0 if unsealed).
    pub entry_count: u64,
    /// Length of the body in bytes (0 if unsealed).
    pub body_len: u64,
    /// SHA-256 of the body bytes (zeros if unsealed).
    pub body_checksum: [u8; 32],
    /// Checksum from the previous segment (for cross-segment chain verification).
    pub prev_segment_checksum: u64,
    /// Milliseconds since Unix epoch.
    pub created_ms: i64,
}
73
74impl JournalHeader {
75    /// Returns true if this segment is sealed (finalized).
76    pub fn is_sealed(&self) -> bool {
77        self.flags & FLAG_SEALED != 0
78    }
79
80    /// Returns true if the body is compressed.
81    pub fn is_compressed(&self) -> bool {
82        self.flags & FLAG_COMPRESSED != 0
83    }
84
85    /// Returns true if a chain hash trailer is present.
86    pub fn has_chain_hash(&self) -> bool {
87        self.flags & FLAG_HAS_CHAIN_HASH != 0
88    }
89}
90
/// A single journal entry with opaque payload.
///
/// Entries carry a per-entry CRC32C at encode time and are linked into a
/// SHA-256 hash chain through `prev_hash`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JournalEntry {
    /// Entry sequence number (1-indexed, monotonically increasing).
    pub sequence: u64,
    /// SHA-256 hash of the previous entry (zeros for the first entry).
    pub prev_hash: [u8; 32],
    /// Opaque payload bytes (engine defines the format).
    pub payload: Vec<u8>,
}
101
/// A complete sealed journal segment: header + entries + chain hash.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JournalSegment {
    /// Segment header (sealed; counts and checksums populated).
    pub header: JournalHeader,
    /// Entries in sequence order.
    pub entries: Vec<JournalEntry>,
    /// SHA-256 chain hash of the last entry.
    pub chain_hash: [u8; 32],
}
110
111// ============================================================================
112// Entry-level operations
113// ============================================================================
114
115/// Compute the chain hash for an entry: SHA-256(prev_hash || payload).
116pub fn compute_entry_hash(prev_hash: &[u8; 32], payload: &[u8]) -> [u8; 32] {
117    let mut hasher = Sha256::new();
118    hasher.update(prev_hash);
119    hasher.update(payload);
120    let result = hasher.finalize();
121    let mut hash = [0u8; 32];
122    hash.copy_from_slice(&result);
123    hash
124}
125
126/// Compute CRC32C of entry bytes (everything after the CRC32C field).
127fn compute_entry_crc32c(payload_len: u32, sequence: u64, prev_hash: &[u8; 32], payload: &[u8]) -> u32 {
128    let mut buf = Vec::with_capacity(4 + 8 + 32 + payload.len());
129    buf.extend_from_slice(&payload_len.to_be_bytes());
130    buf.extend_from_slice(&sequence.to_be_bytes());
131    buf.extend_from_slice(prev_hash);
132    buf.extend_from_slice(payload);
133    crc32c::crc32c(&buf)
134}
135
136/// Encode a single journal entry to bytes.
137pub fn encode_entry(entry: &JournalEntry) -> Vec<u8> {
138    let payload_len = entry.payload.len() as u32;
139    let crc = compute_entry_crc32c(payload_len, entry.sequence, &entry.prev_hash, &entry.payload);
140
141    let mut buf = Vec::with_capacity(ENTRY_HEADER_SIZE + entry.payload.len());
142    buf.extend_from_slice(&crc.to_be_bytes());
143    buf.extend_from_slice(&payload_len.to_be_bytes());
144    buf.extend_from_slice(&entry.sequence.to_be_bytes());
145    buf.extend_from_slice(&entry.prev_hash);
146    buf.extend_from_slice(&entry.payload);
147    buf
148}
149
150/// Decode a single journal entry from bytes at the given offset.
151/// Returns the entry and the number of bytes consumed.
152pub fn decode_entry(data: &[u8], offset: usize) -> Result<(JournalEntry, usize), ChangesetError> {
153    if offset + ENTRY_HEADER_SIZE > data.len() {
154        return Err(ChangesetError::Truncated {
155            needed: offset + ENTRY_HEADER_SIZE,
156            available: data.len(),
157        });
158    }
159
160    let pos = offset;
161    let stored_crc = u32::from_be_bytes(data[pos..pos + 4].try_into().expect("4 bytes"));
162    let payload_len = u32::from_be_bytes(data[pos + 4..pos + 8].try_into().expect("4 bytes"));
163    let sequence = u64::from_be_bytes(data[pos + 8..pos + 16].try_into().expect("8 bytes"));
164
165    let mut prev_hash = [0u8; 32];
166    prev_hash.copy_from_slice(&data[pos + 16..pos + 48]);
167
168    let payload_start = pos + ENTRY_HEADER_SIZE;
169    let payload_end = payload_start + payload_len as usize;
170
171    if payload_end > data.len() {
172        return Err(ChangesetError::Truncated {
173            needed: payload_end,
174            available: data.len(),
175        });
176    }
177
178    let payload = data[payload_start..payload_end].to_vec();
179
180    // Verify CRC32C
181    let computed_crc = compute_entry_crc32c(payload_len, sequence, &prev_hash, &payload);
182    if computed_crc != stored_crc {
183        return Err(ChangesetError::ChecksumMismatch {
184            expected: stored_crc as u64,
185            actual: computed_crc as u64,
186        });
187    }
188
189    let entry = JournalEntry {
190        sequence,
191        prev_hash,
192        payload,
193    };
194    let consumed = ENTRY_HEADER_SIZE + payload_len as usize;
195    Ok((entry, consumed))
196}
197
198// ============================================================================
199// Segment-level operations
200// ============================================================================
201
202/// Encode a header to bytes.
203pub fn encode_header(header: &JournalHeader) -> [u8; HEADER_SIZE] {
204    let mut buf = [0u8; HEADER_SIZE];
205    buf[0..5].copy_from_slice(&HADBJ_MAGIC);
206    buf[5] = HADBJ_VERSION;
207    buf[6] = header.flags;
208    buf[7] = header.compression;
209    // 8-11: reserved (zeros)
210    buf[12..20].copy_from_slice(&header.first_seq.to_be_bytes());
211    buf[20..28].copy_from_slice(&header.last_seq.to_be_bytes());
212    buf[28..36].copy_from_slice(&header.entry_count.to_be_bytes());
213    buf[36..44].copy_from_slice(&header.body_len.to_be_bytes());
214    buf[44..76].copy_from_slice(&header.body_checksum);
215    buf[76..84].copy_from_slice(&header.prev_segment_checksum.to_be_bytes());
216    buf[84..92].copy_from_slice(&header.created_ms.to_be_bytes());
217    // 92-127: reserved (zeros)
218    buf
219}
220
221/// Decode a header from bytes.
222pub fn decode_header(data: &[u8]) -> Result<JournalHeader, ChangesetError> {
223    if data.len() < HEADER_SIZE {
224        return Err(ChangesetError::Truncated {
225            needed: HEADER_SIZE,
226            available: data.len(),
227        });
228    }
229
230    if &data[0..5] != &HADBJ_MAGIC {
231        return Err(ChangesetError::InvalidMagic);
232    }
233    if data[5] != HADBJ_VERSION {
234        return Err(ChangesetError::UnsupportedVersion(data[5]));
235    }
236
237    let flags = data[6];
238    let compression = data[7];
239
240    let first_seq = u64::from_be_bytes(data[12..20].try_into().expect("8 bytes"));
241    let last_seq = u64::from_be_bytes(data[20..28].try_into().expect("8 bytes"));
242    let entry_count = u64::from_be_bytes(data[28..36].try_into().expect("8 bytes"));
243    let body_len = u64::from_be_bytes(data[36..44].try_into().expect("8 bytes"));
244
245    let mut body_checksum = [0u8; 32];
246    body_checksum.copy_from_slice(&data[44..76]);
247
248    let prev_segment_checksum = u64::from_be_bytes(data[76..84].try_into().expect("8 bytes"));
249    let created_ms = i64::from_be_bytes(data[84..92].try_into().expect("8 bytes"));
250
251    Ok(JournalHeader {
252        flags,
253        compression,
254        first_seq,
255        last_seq,
256        entry_count,
257        body_len,
258        body_checksum,
259        prev_segment_checksum,
260        created_ms,
261    })
262}
263
264/// Create a new sealed journal segment from entries.
265///
266/// Entries must be provided in sequence order. The chain hash is computed
267/// by walking the entries and verifying/computing prev_hash for each.
268///
269/// `prev_hash` is the chain hash from the previous segment (or ZERO_HASH for the first).
270/// `prev_segment_checksum` is the truncated checksum from the previous segment (or 0).
271pub fn seal(
272    entries: Vec<JournalEntry>,
273    prev_segment_checksum: u64,
274) -> JournalSegment {
275    assert!(!entries.is_empty(), "cannot seal an empty journal segment");
276
277    let first_seq = entries[0].sequence;
278    let last_seq = entries[entries.len() - 1].sequence;
279    let entry_count = entries.len() as u64;
280
281    // Encode raw body
282    let mut body = Vec::new();
283    for entry in &entries {
284        body.extend_from_slice(&encode_entry(entry));
285    }
286
287    // Compute body checksum
288    let body_checksum = {
289        let mut hasher = Sha256::new();
290        hasher.update(&body);
291        let result = hasher.finalize();
292        let mut cs = [0u8; 32];
293        cs.copy_from_slice(&result);
294        cs
295    };
296
297    // Chain hash is the last entry's hash
298    let chain_hash = compute_entry_hash(
299        &entries[entries.len() - 1].prev_hash,
300        &entries[entries.len() - 1].payload,
301    );
302
303    let created_ms = std::time::SystemTime::now()
304        .duration_since(std::time::UNIX_EPOCH)
305        .map(|d| d.as_millis() as i64)
306        .unwrap_or(0);
307
308    let header = JournalHeader {
309        flags: FLAG_SEALED | FLAG_HAS_CHAIN_HASH,
310        compression: COMPRESSION_NONE,
311        first_seq,
312        last_seq,
313        entry_count,
314        body_len: body.len() as u64,
315        body_checksum,
316        prev_segment_checksum,
317        created_ms,
318    };
319
320    JournalSegment {
321        header,
322        entries,
323        chain_hash,
324    }
325}
326
327/// Encode a sealed journal segment to bytes.
328pub fn encode(segment: &JournalSegment) -> Vec<u8> {
329    let mut body = Vec::new();
330    for entry in &segment.entries {
331        body.extend_from_slice(&encode_entry(entry));
332    }
333
334    let total = HEADER_SIZE + body.len() + CHAIN_HASH_TRAILER_SIZE;
335    let mut buf = Vec::with_capacity(total);
336
337    buf.extend_from_slice(&encode_header(&segment.header));
338    buf.extend_from_slice(&body);
339    buf.extend_from_slice(&segment.chain_hash);
340
341    buf
342}
343
/// Encode a sealed journal segment with a zstd-compressed body.
///
/// The emitted header carries FLAG_COMPRESSED, COMPRESSION_ZSTD, and the
/// length/checksum of the compressed bytes (the checksum covers the bytes
/// as written, matching what `decode` verifies).
#[cfg(feature = "journal")]
pub fn encode_compressed(segment: &JournalSegment, zstd_level: i32) -> Vec<u8> {
    let raw_body: Vec<u8> = segment
        .entries
        .iter()
        .flat_map(|entry| encode_entry(entry))
        .collect();

    let compressed = zstd::encode_all(raw_body.as_slice(), zstd_level)
        .expect("zstd compression should not fail");

    let body_checksum = {
        let mut hasher = Sha256::new();
        hasher.update(&compressed);
        let mut cs = [0u8; 32];
        cs.copy_from_slice(&hasher.finalize());
        cs
    };

    // Rewrite only the body-dependent header fields; everything else is kept.
    let mut header = segment.header.clone();
    header.flags |= FLAG_COMPRESSED;
    header.compression = COMPRESSION_ZSTD;
    header.body_len = compressed.len() as u64;
    header.body_checksum = body_checksum;

    let mut out = Vec::with_capacity(HEADER_SIZE + compressed.len() + CHAIN_HASH_TRAILER_SIZE);
    out.extend_from_slice(&encode_header(&header));
    out.extend_from_slice(&compressed);
    out.extend_from_slice(&segment.chain_hash);
    out
}
379
380/// Decode a sealed journal segment from bytes.
381///
382/// Handles both raw and compressed bodies. Verifies body checksum,
383/// per-entry CRC32C, and SHA-256 chain.
384pub fn decode(data: &[u8]) -> Result<JournalSegment, ChangesetError> {
385    let header = decode_header(data)?;
386
387    if !header.is_sealed() {
388        return Err(ChangesetError::InvalidFlags(header.flags));
389    }
390
391    let body_start = HEADER_SIZE;
392    let body_end = body_start + header.body_len as usize;
393
394    if body_end > data.len() {
395        return Err(ChangesetError::Truncated {
396            needed: body_end,
397            available: data.len(),
398        });
399    }
400
401    let body_bytes = &data[body_start..body_end];
402
403    // Verify body checksum
404    let computed_body_checksum = {
405        let mut hasher = Sha256::new();
406        hasher.update(body_bytes);
407        let result = hasher.finalize();
408        let mut cs = [0u8; 32];
409        cs.copy_from_slice(&result);
410        cs
411    };
412    if computed_body_checksum != header.body_checksum {
413        return Err(ChangesetError::ChecksumMismatch {
414            expected: u64::from_be_bytes(header.body_checksum[0..8].try_into().expect("8 bytes")),
415            actual: u64::from_be_bytes(computed_body_checksum[0..8].try_into().expect("8 bytes")),
416        });
417    }
418
419    // Decompress if needed
420    let raw_body: Vec<u8> = if header.is_compressed() {
421        #[cfg(feature = "journal")]
422        {
423            zstd::decode_all(body_bytes)
424                .map_err(|e| ChangesetError::Io(std::io::Error::new(std::io::ErrorKind::InvalidData, e)))?
425        }
426        #[cfg(not(feature = "journal"))]
427        {
428            return Err(ChangesetError::InvalidFlags(header.flags));
429        }
430    } else {
431        body_bytes.to_vec()
432    };
433
434    // Decode entries
435    let mut entries = Vec::with_capacity(header.entry_count as usize);
436    let mut offset = 0;
437    while offset < raw_body.len() {
438        let (entry, consumed) = decode_entry(&raw_body, offset)?;
439        entries.push(entry);
440        offset += consumed;
441    }
442
443    if entries.len() as u64 != header.entry_count {
444        return Err(ChangesetError::Truncated {
445            needed: header.entry_count as usize,
446            available: entries.len(),
447        });
448    }
449
450    // Verify chain: walk entries and check prev_hash linkage
451    let mut running_hash = if !entries.is_empty() {
452        entries[0].prev_hash
453    } else {
454        ZERO_HASH
455    };
456
457    for entry in &entries {
458        if entry.prev_hash != running_hash {
459            return Err(ChangesetError::ChainBroken {
460                expected: u64::from_be_bytes(running_hash[0..8].try_into().expect("8 bytes")),
461                changeset_prev: u64::from_be_bytes(entry.prev_hash[0..8].try_into().expect("8 bytes")),
462            });
463        }
464        running_hash = compute_entry_hash(&entry.prev_hash, &entry.payload);
465    }
466
467    // Read chain hash trailer if present
468    let chain_hash = if header.has_chain_hash() {
469        let trailer_start = body_end;
470        let trailer_end = trailer_start + CHAIN_HASH_TRAILER_SIZE;
471        if trailer_end > data.len() {
472            return Err(ChangesetError::Truncated {
473                needed: trailer_end,
474                available: data.len(),
475            });
476        }
477        let mut hash = [0u8; 32];
478        hash.copy_from_slice(&data[trailer_start..trailer_end]);
479
480        // Verify trailer matches computed chain
481        if hash != running_hash {
482            return Err(ChangesetError::ChecksumMismatch {
483                expected: u64::from_be_bytes(hash[0..8].try_into().expect("8 bytes")),
484                actual: u64::from_be_bytes(running_hash[0..8].try_into().expect("8 bytes")),
485            });
486        }
487        hash
488    } else {
489        running_hash
490    };
491
492    Ok(JournalSegment {
493        header,
494        entries,
495        chain_hash,
496    })
497}
498
499/// Build a chain of journal entries from opaque payloads.
500///
501/// Given a starting prev_hash and a sequence of (seq, payload) pairs,
502/// constructs JournalEntry values with correct prev_hash linkage.
503pub fn build_entry_chain(
504    start_prev_hash: [u8; 32],
505    payloads: Vec<(u64, Vec<u8>)>,
506) -> Vec<JournalEntry> {
507    let mut entries = Vec::with_capacity(payloads.len());
508    let mut prev_hash = start_prev_hash;
509
510    for (seq, payload) in payloads {
511        let entry = JournalEntry {
512            sequence: seq,
513            prev_hash,
514            payload: payload.clone(),
515        };
516        prev_hash = compute_entry_hash(&prev_hash, &payload);
517        entries.push(entry);
518    }
519
520    entries
521}
522
/// Truncate a SHA-256 hash to u64 by taking its first 8 bytes, big-endian
/// (for `prev_segment_checksum` compatibility).
pub fn hash_to_u64(hash: &[u8; 32]) -> u64 {
    // Slicing a fixed [u8; 32] at 0..8 cannot fail; the expect documents the
    // invariant (the previous message incorrectly said "32 bytes").
    u64::from_be_bytes(hash[0..8].try_into().expect("8 bytes"))
}
527
#[cfg(test)]
mod tests {
    use super::*;

    /// Build `count` (seq, payload) pairs with distinct "query_N" payloads.
    fn make_payloads(count: u64) -> Vec<(u64, Vec<u8>)> {
        (1..=count)
            .map(|seq| (seq, format!("query_{}", seq).into_bytes()))
            .collect()
    }

    // --- Entry encode/decode ---

    #[test]
    fn test_entry_roundtrip() {
        let entry = JournalEntry {
            sequence: 42,
            prev_hash: ZERO_HASH,
            payload: b"CREATE TABLE foo (id INT)".to_vec(),
        };

        let encoded = encode_entry(&entry);
        let (decoded, consumed) = decode_entry(&encoded, 0).unwrap();

        assert_eq!(decoded, entry);
        assert_eq!(consumed, encoded.len());
    }

    #[test]
    fn test_entry_crc_detects_corruption() {
        let entry = JournalEntry {
            sequence: 1,
            prev_hash: ZERO_HASH,
            payload: b"hello".to_vec(),
        };

        let mut encoded = encode_entry(&entry);
        // Corrupt a payload byte
        let last = encoded.len() - 1;
        encoded[last] ^= 0xFF;

        let result = decode_entry(&encoded, 0);
        assert!(result.is_err());
    }

    #[test]
    fn test_entry_truncated() {
        let entry = JournalEntry {
            sequence: 1,
            prev_hash: ZERO_HASH,
            payload: b"data".to_vec(),
        };

        // Cut inside the fixed entry header to trigger the first bounds check.
        let encoded = encode_entry(&entry);
        let result = decode_entry(&encoded[..ENTRY_HEADER_SIZE - 1], 0);
        assert!(matches!(result, Err(ChangesetError::Truncated { .. })));
    }

    #[test]
    fn test_entry_empty_payload() {
        let entry = JournalEntry {
            sequence: 1,
            prev_hash: ZERO_HASH,
            payload: vec![],
        };

        let encoded = encode_entry(&entry);
        let (decoded, _) = decode_entry(&encoded, 0).unwrap();
        assert_eq!(decoded.payload.len(), 0);
    }

    // --- Chain hash ---

    #[test]
    fn test_chain_hash_deterministic() {
        let h1 = compute_entry_hash(&ZERO_HASH, b"hello");
        let h2 = compute_entry_hash(&ZERO_HASH, b"hello");
        assert_eq!(h1, h2);
    }

    #[test]
    fn test_chain_hash_different_data() {
        let h1 = compute_entry_hash(&ZERO_HASH, b"hello");
        let h2 = compute_entry_hash(&ZERO_HASH, b"world");
        assert_ne!(h1, h2);
    }

    #[test]
    fn test_chain_hash_different_prev() {
        // Same payload, different prev_hash must produce a different hash.
        let h1 = compute_entry_hash(&ZERO_HASH, b"hello");
        let other_prev = compute_entry_hash(&ZERO_HASH, b"seed");
        let h2 = compute_entry_hash(&other_prev, b"hello");
        assert_ne!(h1, h2);
    }

    #[test]
    fn test_chain_linkage() {
        // Each entry's prev_hash must be the hash of the entry before it.
        let entries = build_entry_chain(ZERO_HASH, make_payloads(3));

        assert_eq!(entries[0].prev_hash, ZERO_HASH);
        let h0 = compute_entry_hash(&ZERO_HASH, &entries[0].payload);
        assert_eq!(entries[1].prev_hash, h0);
        let h1 = compute_entry_hash(&h0, &entries[1].payload);
        assert_eq!(entries[2].prev_hash, h1);
    }

    // --- Segment seal/encode/decode ---

    #[test]
    fn test_segment_roundtrip() {
        let entries = build_entry_chain(ZERO_HASH, make_payloads(5));
        let segment = seal(entries, 0);

        assert_eq!(segment.header.first_seq, 1);
        assert_eq!(segment.header.last_seq, 5);
        assert_eq!(segment.header.entry_count, 5);
        assert!(segment.header.is_sealed());
        assert!(segment.header.has_chain_hash());

        let encoded = encode(&segment);
        let decoded = decode(&encoded).unwrap();

        assert_eq!(decoded.header.first_seq, segment.header.first_seq);
        assert_eq!(decoded.header.last_seq, segment.header.last_seq);
        assert_eq!(decoded.header.entry_count, segment.header.entry_count);
        assert_eq!(decoded.entries.len(), 5);
        assert_eq!(decoded.chain_hash, segment.chain_hash);

        for (orig, dec) in segment.entries.iter().zip(decoded.entries.iter()) {
            assert_eq!(orig.sequence, dec.sequence);
            assert_eq!(orig.payload, dec.payload);
            assert_eq!(orig.prev_hash, dec.prev_hash);
        }
    }

    #[test]
    fn test_segment_single_entry() {
        let entries = build_entry_chain(ZERO_HASH, vec![(1, b"single".to_vec())]);
        let segment = seal(entries, 0);
        let decoded = decode(&encode(&segment)).unwrap();
        assert_eq!(decoded.entries.len(), 1);
        assert_eq!(decoded.header.first_seq, 1);
        assert_eq!(decoded.header.last_seq, 1);
    }

    #[test]
    fn test_segment_large_payloads() {
        // 10 entries x 10 KB payloads; payload byte value encodes the seq.
        let payloads: Vec<(u64, Vec<u8>)> = (1..=10)
            .map(|seq| (seq, vec![seq as u8; 10_000]))
            .collect();
        let entries = build_entry_chain(ZERO_HASH, payloads);
        let segment = seal(entries, 0);
        let decoded = decode(&encode(&segment)).unwrap();
        assert_eq!(decoded.entries.len(), 10);
        for (i, entry) in decoded.entries.iter().enumerate() {
            assert_eq!(entry.payload.len(), 10_000);
            assert_eq!(entry.payload[0], (i + 1) as u8);
        }
    }

    #[test]
    fn test_segment_prev_segment_checksum() {
        let entries = build_entry_chain(ZERO_HASH, make_payloads(3));
        let segment = seal(entries, 0xDEADBEEF);
        assert_eq!(segment.header.prev_segment_checksum, 0xDEADBEEF);

        let decoded = decode(&encode(&segment)).unwrap();
        assert_eq!(decoded.header.prev_segment_checksum, 0xDEADBEEF);
    }

    #[test]
    fn test_segment_chain_across_segments() {
        // Segment 1: entries 1-3
        let entries1 = build_entry_chain(ZERO_HASH, make_payloads(3));
        let seg1 = seal(entries1, 0);

        // Segment 2: entries 4-6, chained from segment 1
        let payloads2: Vec<(u64, Vec<u8>)> = (4..=6)
            .map(|seq| (seq, format!("query_{}", seq).into_bytes()))
            .collect();
        let entries2 = build_entry_chain(seg1.chain_hash, payloads2);
        let seg2 = seal(entries2, hash_to_u64(&seg1.chain_hash));

        assert_eq!(seg2.header.prev_segment_checksum, hash_to_u64(&seg1.chain_hash));
        assert_eq!(seg2.entries[0].prev_hash, seg1.chain_hash);

        // Both decode independently
        let dec1 = decode(&encode(&seg1)).unwrap();
        let dec2 = decode(&encode(&seg2)).unwrap();
        assert_eq!(dec1.chain_hash, seg1.chain_hash);
        assert_eq!(dec2.entries[0].prev_hash, dec1.chain_hash);
    }

    // --- Negative tests ---

    #[test]
    fn test_decode_bad_magic() {
        let entries = build_entry_chain(ZERO_HASH, make_payloads(1));
        let segment = seal(entries, 0);
        let mut encoded = encode(&segment);
        encoded[0] = b'X';
        assert!(matches!(decode(&encoded), Err(ChangesetError::InvalidMagic)));
    }

    #[test]
    fn test_decode_bad_version() {
        let entries = build_entry_chain(ZERO_HASH, make_payloads(1));
        let segment = seal(entries, 0);
        let mut encoded = encode(&segment);
        // Byte 5 is the version field.
        encoded[5] = 99;
        assert!(matches!(decode(&encoded), Err(ChangesetError::UnsupportedVersion(99))));
    }

    #[test]
    fn test_decode_truncated_header() {
        assert!(matches!(
            decode(&[0u8; 10]),
            Err(ChangesetError::Truncated { .. })
        ));
    }

    #[test]
    fn test_decode_truncated_body() {
        let entries = build_entry_chain(ZERO_HASH, make_payloads(1));
        let segment = seal(entries, 0);
        let encoded = encode(&segment);
        // Cut off body
        assert!(matches!(
            decode(&encoded[..HEADER_SIZE + 5]),
            Err(ChangesetError::Truncated { .. })
        ));
    }

    #[test]
    fn test_decode_corrupted_body() {
        let entries = build_entry_chain(ZERO_HASH, make_payloads(1));
        let segment = seal(entries, 0);
        let mut encoded = encode(&segment);
        // Corrupt a byte in the body
        encoded[HEADER_SIZE + 10] ^= 0xFF;
        assert!(matches!(
            decode(&encoded),
            Err(ChangesetError::ChecksumMismatch { .. })
        ));
    }

    #[test]
    fn test_decode_corrupted_chain_trailer() {
        let entries = build_entry_chain(ZERO_HASH, make_payloads(1));
        let segment = seal(entries, 0);
        let mut encoded = encode(&segment);
        // Corrupt last byte (chain hash trailer)
        let last = encoded.len() - 1;
        encoded[last] ^= 0xFF;
        assert!(matches!(
            decode(&encoded),
            Err(ChangesetError::ChecksumMismatch { .. })
        ));
    }

    #[test]
    fn test_decode_broken_entry_chain() {
        // Manually create entries with broken chain
        let entry1 = JournalEntry {
            sequence: 1,
            prev_hash: ZERO_HASH,
            payload: b"first".to_vec(),
        };
        let entry2 = JournalEntry {
            sequence: 2,
            prev_hash: ZERO_HASH, // Wrong! Should be hash of entry1
            payload: b"second".to_vec(),
        };

        // Build raw body manually (bypass seal which would compute correct hashes)
        let mut body = Vec::new();
        body.extend_from_slice(&encode_entry(&entry1));
        body.extend_from_slice(&encode_entry(&entry2));

        // Body checksum must still be valid so decode reaches chain checking.
        let body_checksum = {
            let mut hasher = Sha256::new();
            hasher.update(&body);
            let result = hasher.finalize();
            let mut cs = [0u8; 32];
            cs.copy_from_slice(&result);
            cs
        };

        let header = JournalHeader {
            flags: FLAG_SEALED,
            compression: COMPRESSION_NONE,
            first_seq: 1,
            last_seq: 2,
            entry_count: 2,
            body_len: body.len() as u64,
            body_checksum,
            prev_segment_checksum: 0,
            created_ms: 0,
        };

        let mut buf = Vec::new();
        buf.extend_from_slice(&encode_header(&header));
        buf.extend_from_slice(&body);

        assert!(matches!(
            decode(&buf),
            Err(ChangesetError::ChainBroken { .. })
        ));
    }

    #[test]
    #[should_panic(expected = "cannot seal an empty journal segment")]
    fn test_seal_empty_panics() {
        seal(vec![], 0);
    }

    // --- Compression tests (feature-gated) ---

    #[cfg(feature = "journal")]
    #[test]
    fn test_compressed_roundtrip() {
        let entries = build_entry_chain(ZERO_HASH, make_payloads(20));
        let segment = seal(entries, 0);

        let compressed = encode_compressed(&segment, 3);
        let raw = encode(&segment);

        // Compressed should be smaller (repetitive payloads compress well)
        assert!(compressed.len() < raw.len());

        let decoded = decode(&compressed).unwrap();
        assert_eq!(decoded.entries.len(), 20);
        assert_eq!(decoded.chain_hash, segment.chain_hash);
        assert!(decoded.header.is_compressed());
    }

    #[cfg(feature = "journal")]
    #[test]
    fn test_compressed_large_payloads() {
        let payloads: Vec<(u64, Vec<u8>)> = (1..=50)
            .map(|seq| (seq, vec![0xAA; 1000]))
            .collect();
        let entries = build_entry_chain(ZERO_HASH, payloads);
        let segment = seal(entries, 0);

        let compressed = encode_compressed(&segment, 3);
        let decoded = decode(&compressed).unwrap();
        assert_eq!(decoded.entries.len(), 50);
    }

    // --- build_entry_chain ---

    #[test]
    fn test_build_entry_chain_empty() {
        let entries = build_entry_chain(ZERO_HASH, vec![]);
        assert!(entries.is_empty());
    }

    #[test]
    fn test_build_entry_chain_custom_start_hash() {
        let custom_hash = compute_entry_hash(&ZERO_HASH, b"seed");
        let entries = build_entry_chain(custom_hash, make_payloads(2));
        assert_eq!(entries[0].prev_hash, custom_hash);
    }

    // --- hash_to_u64 ---

    #[test]
    fn test_hash_to_u64() {
        let hash = compute_entry_hash(&ZERO_HASH, b"test");
        let val = hash_to_u64(&hash);
        assert_ne!(val, 0);
        // Deterministic
        assert_eq!(val, hash_to_u64(&hash));
    }
}