batpak 0.9.0

Event sourcing with causal graphs and caller-defined gates. Sync API, no async runtime.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
//! Deterministic evidence report for a store fork operation.

use crate::evidence::{content_hash, sort_findings};
use crate::store::StoreError;
use serde::{Deserialize, Serialize};
use std::path::Path;

/// Report body schema version for fork evidence.
///
/// Stamped into the `schema_version` of bodies built by this module and into
/// the 2-byte version field of the `fork_evidence.fbev` wire header. The wire
/// decoder rejects any header or body version greater than this value with
/// `StoreError::ForkEvidenceFutureVersion`.
pub const FORK_EVIDENCE_REPORT_SCHEMA_VERSION: u16 = 1;

/// Hash alias for fork evidence report bodies.
///
/// A 32-byte digest produced by `crate::evidence::content_hash`. It is used for
/// `fork_id`, `body_hash`, and the destination-path digest.
pub type ForkEvidenceHash = [u8; 32];

/// Copy strategy actually used for a forked artifact.
///
/// Recorded per file in [`ForkFinding::FileCopied`] and tallied by
/// `ForkStrategyCounts::record_copy`. The derived `PartialOrd`/`Ord` follow
/// declaration order (`Reflink < Hardlink < DeepCopy`) and take part in
/// [`ForkFinding`]'s derived ordering. Reordering the variants therefore changes
/// how findings compare.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ForkCopyStrategy {
    /// Filesystem copy-on-write clone.
    Reflink,
    /// Hardlink to an immutable sealed segment.
    Hardlink,
    /// Ordinary byte-for-byte file copy.
    DeepCopy,
}

/// Copy ladder preference for forked artifacts.
///
/// Selects how aggressively a fork shares storage with its source. Each rung
/// falls back to an ordinary deep copy when the preferred mechanism is not
/// available on the underlying filesystem.
///
/// This is a caller-side option only and is not serialized into the fork
/// report. The strategy actually used for each file is recorded as a
/// [`ForkCopyStrategy`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum CopyPreference {
    /// Default. Try a filesystem reflink, then a hardlink, then a deep copy.
    #[default]
    ReflinkThenHardlink,
    /// Skip reflinks; try a hardlink, then a deep copy.
    HardlinkOnly,
    /// Always perform an ordinary byte-for-byte deep copy.
    DeepCopyOnly,
}

/// Whether a snapshot/fork of a store with payload encryption active may proceed,
/// and how the keyset is treated.
///
/// Keys must never travel with the ciphertext they open. A copy that carried the
/// keyset could, after a `shred_scope`, be restored to resurrect crypto-shredded
/// data. Keyset portability is therefore fail-closed by default. The opt-out
/// produces a keys-excluded copy whose keyset must be managed out-of-band. The
/// policy is inert on a store without payload encryption.
///
/// The policy itself is not serialized into the fork report. A fork taken under
/// `ExcludeKeys` is recorded via [`ForkFinding::KeysExcluded`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum KeysetPolicy {
    /// Default. REFUSE to snapshot/fork a store with payload encryption active
    /// (returns `StoreError::KeysetNotPortable`). A copy without its keys is
    /// silently unrestorable; a copy WITH its keys would defeat crypto-shred.
    #[default]
    Refuse,
    /// Proceed WITHOUT the keyset. The encrypted segments are copied, the
    /// keyset is not, and the report is stamped `KeysExcluded`. The keyset must
    /// be carried out-of-band; restoring the copy without it reports
    /// `StoreError::KeysetMissing`.
    ExcludeKeys,
}

/// Caller options for [`crate::store::Store::fork_with_evidence`].
///
/// `ForkOptions::default()` does three things:
/// - uses the [`CopyPreference::ReflinkThenHardlink`] ladder;
/// - excludes the regenerable caches;
/// - refuses to fork a store with payload encryption active
///   ([`KeysetPolicy::Refuse`]).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[must_use]
pub struct ForkOptions {
    /// Copy ladder preference for immutable sealed segments.
    pub copy_preference: CopyPreference,
    /// Exclude regenerable cold-start caches (`index.ckpt`, `index.fbati`).
    /// Defaults to `true`.
    pub exclude_caches: bool,
    /// Keyset portability policy for an encryption-active store. Default
    /// [`KeysetPolicy::Refuse`] fails closed; [`KeysetPolicy::ExcludeKeys`]
    /// proceeds with a keys-excluded fork. Inert without payload encryption.
    pub keyset_policy: KeysetPolicy,
}

impl Default for ForkOptions {
    /// Conservative defaults:
    /// - reflink-then-hardlink copying;
    /// - regenerable cold-start caches left out;
    /// - keyset portability refused.
    fn default() -> Self {
        // Cold-start caches can be regenerated, so a fork omits them unless the
        // caller opts back in.
        let exclude_caches = true;
        Self {
            keyset_policy: KeysetPolicy::default(),
            copy_preference: CopyPreference::default(),
            exclude_caches,
        }
    }
}

/// Count of fork decisions and copy strategies observed during one fork.
///
/// Embedded both in the structural fingerprint that produces `fork_id` and in
/// [`ForkReportBody`]. Its fields, and their order, are therefore part of the
/// hashed encoding.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct ForkStrategyCounts {
    /// Number of files copied with reflink.
    pub reflink: usize,
    /// Number of files copied with hardlink.
    pub hardlink: usize,
    /// Number of files copied with ordinary deep copy.
    pub deep_copy: usize,
    /// Number of regenerable cache files excluded.
    pub cache_regenerable: usize,
    /// Number of store-shaped artifacts excluded.
    pub excluded: usize,
}

impl ForkStrategyCounts {
    /// Tallies one artifact that was copied using `strategy`.
    pub(crate) fn record_copy(&mut self, strategy: ForkCopyStrategy) {
        // Pick the counter first, then bump it once.
        let counter = match strategy {
            ForkCopyStrategy::Reflink => &mut self.reflink,
            ForkCopyStrategy::Hardlink => &mut self.hardlink,
            ForkCopyStrategy::DeepCopy => &mut self.deep_copy,
        };
        *counter += 1;
    }

    /// Tallies one regenerable cache file that was left out of the fork.
    pub(crate) fn record_cache_regenerable(&mut self) {
        let Self {
            cache_regenerable, ..
        } = self;
        *cache_regenerable += 1;
    }

    /// Tallies one store-shaped artifact that was deliberately excluded.
    pub(crate) fn record_excluded(&mut self) {
        let Self { excluded, .. } = self;
        *excluded += 1;
    }
}

/// Deterministic structural findings for a fork attempt.
///
/// Findings are passed through `crate::evidence::sort_findings` before they are
/// stored in a [`ForkReportBody`], and again on the copy hashed for `body_hash`.
/// They are not part of the structural fingerprint behind `fork_id`.
///
/// The derived `Ord` compares the variant (in declaration order) before its
/// fields. NOTE(review): presumably `sort_findings` relies on this ordering;
/// confirm before reordering variants.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum ForkFinding {
    /// Existing fork destination artifacts were cleared before copying.
    DestinationCleared {
        /// Number of destination artifacts removed.
        artifact_count: usize,
    },
    /// The private visibility fence was explicitly cancelled after copying.
    FenceTokenCancelled,
    /// A regenerable cache file was intentionally excluded.
    CacheRegenerableExcluded {
        /// Stable file name.
        file_name: String,
    },
    /// A store-shaped file was intentionally excluded.
    FileExcluded {
        /// Stable file name.
        file_name: String,
        /// Stable reason string.
        reason: String,
    },
    /// A source artifact was copied, with the concrete strategy recorded.
    FileCopied {
        /// Stable file name.
        file_name: String,
        /// Strategy actually used.
        strategy: ForkCopyStrategy,
    },
    /// Payload encryption was active and the keyset was deliberately EXCLUDED
    /// from this fork under `KeysetPolicy::ExcludeKeys`. The fork carries the
    /// encrypted segments but no keys; the keyset must be managed out-of-band.
    KeysExcluded,
}

/// Structural core of a fork. Its encoded bytes are digested by
/// `fork_id_digest` into `ForkReportBody::fork_id`.
///
/// It mirrors the report body minus `fork_id` itself and `findings`, so
/// findings never influence the fork id. Field names and order are part of the
/// encoded bytes; renaming, reordering, or adding a field changes every
/// `fork_id`.
#[derive(Serialize)]
struct ForkStructuralFingerprint {
    schema_version: u16,
    fence_token: crate::store::SnapshotFenceTokenRef,
    source_watermark: crate::store::SnapshotWatermarkRef,
    active_segment_id: u64,
    shared_segment_ids_sorted: Vec<u64>,
    deep_copied_segment_ids_sorted: Vec<u64>,
    strategy_counts: ForkStrategyCounts,
    copied_visibility_ranges_present: bool,
    copied_pending_compaction_marker_present: bool,
    copied_idempotency_store_present: bool,
    destination_path_digest: ForkEvidenceHash,
}

/// Encodes the structural fingerprint and returns its content hash, which
/// becomes `ForkReportBody::fork_id`.
///
/// Encoding errors from `rmp-serde` are propagated unchanged.
fn fork_id_digest(
    fp: &ForkStructuralFingerprint,
) -> Result<ForkEvidenceHash, rmp_serde::encode::Error> {
    crate::encoding::to_bytes(fp).map(|encoded| content_hash(&encoded))
}

/// Deterministic report body for one store fork.
///
/// `fork_id` digests only the structural fields (everything except `findings`
/// and `fork_id`). `body_hash` digests the whole body, with findings sorted
/// first.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ForkReportBody {
    /// Report-body schema version.
    pub schema_version: u16,
    /// Stable digest over the structural fork core (excludes `findings`).
    pub fork_id: ForkEvidenceHash,
    /// Private visibility-fence token covering the copied segment set.
    pub fence_token: crate::store::SnapshotFenceTokenRef,
    /// Source segment watermark after draining the writer.
    pub source_watermark: crate::store::SnapshotWatermarkRef,
    /// Segment id classified as active at the fork boundary.
    pub active_segment_id: u64,
    /// Segment ids shared by reflink or hardlink, sorted ascending.
    pub shared_segment_ids_sorted: Vec<u64>,
    /// Segment ids deep-copied, sorted ascending.
    pub deep_copied_segment_ids_sorted: Vec<u64>,
    /// Counts of the strategies and exclusions observed.
    pub strategy_counts: ForkStrategyCounts,
    /// Whether `visibility_ranges.fbv` was copied.
    pub copied_visibility_ranges_present: bool,
    /// Whether the pending-compaction marker was copied.
    pub copied_pending_compaction_marker_present: bool,
    /// Whether the durable idempotency store (`index.idemp`) was copied.
    pub copied_idempotency_store_present: bool,
    /// Digest of the destination path bytes.
    pub destination_path_digest: ForkEvidenceHash,
    /// Structural findings sorted before `body_hash`.
    pub findings: Vec<ForkFinding>,
}

impl ForkReportBody {
    /// Full report-body digest, with findings sorted before encoding.
    ///
    /// Thin wrapper over [`fork_report_body_hash`]. The sort happens on an
    /// internal clone, so `self` is left untouched.
    ///
    /// # Errors
    /// MessagePack encoding failure from `rmp-serde`.
    pub fn body_hash(&self) -> Result<ForkEvidenceHash, rmp_serde::encode::Error> {
        fork_report_body_hash(self)
    }
}

/// Fork evidence report envelope.
///
/// Only `body` and `body_hash` are deterministic. The remaining fields are
/// metadata outside the report's identity. The in-crate builder leaves them
/// empty: `None` for the optional fields and no diagnostics.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ForkReport {
    /// Deterministic report body.
    pub body: ForkReportBody,
    /// Canonical hash of `body`.
    pub body_hash: ForkEvidenceHash,
    /// Optional generation timestamp metadata outside deterministic identity.
    pub generated_at_unix_ms: Option<u64>,
    /// Optional producer version metadata outside deterministic identity.
    pub batpak_version: Option<String>,
    /// Optional diagnostics outside deterministic identity.
    pub diagnostics: Vec<String>,
}

/// Computes the canonical `body_hash` of a fork report body.
///
/// The body is cloned and its findings are sorted on the clone before
/// MessagePack encoding. The caller's body is never modified.
///
/// # Errors
/// MessagePack encoding failure from `rmp-serde`.
pub fn fork_report_body_hash(
    body: &ForkReportBody,
) -> Result<ForkEvidenceHash, rmp_serde::encode::Error> {
    let canonical = {
        let mut copy = body.clone();
        sort_findings(&mut copy.findings);
        copy
    };
    crate::encoding::to_bytes(&canonical).map(|encoded| content_hash(&encoded))
}

/// Content hash of the destination path, taken exactly as given.
///
/// The path is not canonicalized. Its bytes come from
/// `OsStr::as_encoded_bytes`, whose encoding is platform-specific, so digests
/// are not guaranteed to be comparable across platforms.
pub(crate) fn destination_path_digest(dest: &Path) -> ForkEvidenceHash {
    let path_bytes = dest.as_os_str().as_encoded_bytes();
    content_hash(path_bytes)
}

/// Raw, crate-internal inputs gathered during a fork and turned into a
/// [`ForkReport`] by `fork_evidence_report`.
///
/// Despite the `_sorted` suffixes, the two segment-id lists and `findings` may
/// be supplied in any order, because `fork_evidence_report` sorts all three.
pub(crate) struct ForkReportInput {
    /// Raw fence token; wrapped into `SnapshotFenceTokenRef`.
    pub(crate) fence_token: u64,
    /// Segment-id half of the source watermark.
    pub(crate) source_watermark_segment_id: u64,
    /// Offset half of the source watermark.
    pub(crate) source_watermark_offset: u64,
    pub(crate) active_segment_id: u64,
    /// Segment ids shared by reflink or hardlink (any order).
    pub(crate) shared_segment_ids_sorted: Vec<u64>,
    /// Segment ids deep-copied (any order).
    pub(crate) deep_copied_segment_ids_sorted: Vec<u64>,
    pub(crate) strategy_counts: ForkStrategyCounts,
    pub(crate) copied_visibility_ranges_present: bool,
    pub(crate) copied_pending_compaction_marker_present: bool,
    pub(crate) copied_idempotency_store_present: bool,
    pub(crate) destination_path_digest: ForkEvidenceHash,
    /// Findings in any order; sorted before hashing.
    pub(crate) findings: Vec<ForkFinding>,
}

/// Builds a deterministic [`ForkReport`] from the raw data gathered during a fork.
///
/// Both segment-id lists and the findings are sorted here, so callers may pass
/// them in any order. Two digests are computed:
/// - `fork_id` is the digest of the structural fingerprint, which is every body
///   field except `fork_id` and `findings`;
/// - `body_hash` is the digest of the full body.
///
/// The optional metadata (`generated_at_unix_ms`, `batpak_version`,
/// `diagnostics`) is left empty.
///
/// # Errors
/// MessagePack encoding failure from `rmp-serde` while hashing the fingerprint
/// or the body.
pub(crate) fn fork_evidence_report(
    input: ForkReportInput,
) -> Result<ForkReport, rmp_serde::encode::Error> {
    let mut shared_segment_ids_sorted = input.shared_segment_ids_sorted;
    shared_segment_ids_sorted.sort_unstable();
    let mut deep_copied_segment_ids_sorted = input.deep_copied_segment_ids_sorted;
    deep_copied_segment_ids_sorted.sort_unstable();
    let mut findings = input.findings;
    sort_findings(&mut findings);

    // Build the fingerprint first and MOVE the sorted id lists into it. They are
    // moved back out below, so neither vector has to be cloned.
    let fingerprint = ForkStructuralFingerprint {
        schema_version: FORK_EVIDENCE_REPORT_SCHEMA_VERSION,
        fence_token: crate::store::SnapshotFenceTokenRef {
            token: input.fence_token,
        },
        source_watermark: crate::store::SnapshotWatermarkRef {
            segment_id: input.source_watermark_segment_id,
            offset: input.source_watermark_offset,
        },
        active_segment_id: input.active_segment_id,
        shared_segment_ids_sorted,
        deep_copied_segment_ids_sorted,
        strategy_counts: input.strategy_counts,
        copied_visibility_ranges_present: input.copied_visibility_ranges_present,
        copied_pending_compaction_marker_present: input.copied_pending_compaction_marker_present,
        copied_idempotency_store_present: input.copied_idempotency_store_present,
        destination_path_digest: input.destination_path_digest,
    };
    let fork_id = fork_id_digest(&fingerprint)?;

    // Exhaustive destructuring: if a field is added to the fingerprint without
    // being carried into the body, this becomes a compile error instead of a
    // silent divergence between `fork_id` and the reported body.
    let ForkStructuralFingerprint {
        schema_version,
        fence_token,
        source_watermark,
        active_segment_id,
        shared_segment_ids_sorted,
        deep_copied_segment_ids_sorted,
        strategy_counts,
        copied_visibility_ranges_present,
        copied_pending_compaction_marker_present,
        copied_idempotency_store_present,
        destination_path_digest: path_digest,
    } = fingerprint;

    let body = ForkReportBody {
        schema_version,
        fork_id,
        fence_token,
        source_watermark,
        active_segment_id,
        shared_segment_ids_sorted,
        deep_copied_segment_ids_sorted,
        strategy_counts,
        copied_visibility_ranges_present,
        copied_pending_compaction_marker_present,
        copied_idempotency_store_present,
        destination_path_digest: path_digest,
        findings,
    };
    let body_hash = fork_report_body_hash(&body)?;
    Ok(ForkReport {
        body,
        body_hash,
        generated_at_unix_ms: None,
        batpak_version: None,
        diagnostics: Vec::new(),
    })
}

/// Wire magic for compat-matrix and forward-compat gates (`fork_evidence.fbev`).
///
/// The six ASCII bytes `FBATFE` that open every frame written by
/// `encode_fork_evidence_wire`. Frames that do not start with them are rejected
/// by `decode_fork_evidence_wire`.
pub(crate) const FORK_EVIDENCE_WIRE_MAGIC: &[u8; 6] = b"FBATFE";

/// Frames a fork evidence report body in the shared compat wire format.
///
/// Layout: `magic(6) | version(2 le) | crc32(4 le over body) | body(to_vec_named)`.
/// The version field is copied from `body.schema_version`. The CRC covers only
/// the encoded body bytes, not the 12-byte header. Findings are encoded in the
/// order the body currently holds them.
///
/// # Errors
/// Returns [`StoreError::Serialization`] wrapping the `rmp-serde` error when the
/// body cannot be encoded.
pub fn encode_fork_evidence_wire(body: &ForkReportBody) -> Result<Vec<u8>, StoreError> {
    let payload = crate::encoding::to_bytes(body)
        .map_err(|error| StoreError::Serialization(Box::new(error)))?;
    let version = body.schema_version.to_le_bytes();
    let checksum = crc32fast::hash(&payload).to_le_bytes();
    // `concat` allocates exactly header + payload bytes in one go.
    let frame: [&[u8]; 4] = [FORK_EVIDENCE_WIRE_MAGIC, &version, &checksum, &payload];
    Ok(frame.concat())
}

/// Parses a fork evidence report body out of compat wire framing.
///
/// Checks run in a fixed order:
/// 1. frame length and magic;
/// 2. header version;
/// 3. CRC over the body bytes;
/// 4. body decode;
/// 5. the decoded body's own `schema_version`.
///
/// Because the header version is checked before the CRC, a frame advertising a
/// newer version is reported as a future version even if its CRC would not
/// match.
///
/// # Errors
/// - [`StoreError::Configuration`] when the frame is shorter than 12 bytes,
///   does not start with the wire magic, or fails the CRC check.
/// - [`StoreError::ForkEvidenceFutureVersion`] when the header or body schema
///   version exceeds [`FORK_EVIDENCE_REPORT_SCHEMA_VERSION`].
/// - [`StoreError::Serialization`] when the body bytes fail to decode as a
///   [`ForkReportBody`].
pub fn decode_fork_evidence_wire(bytes: &[u8]) -> Result<ForkReportBody, StoreError> {
    // magic(6) + version(2) + crc32(4)
    const HEADER_LEN: usize = 12;
    let future_version = |found: u16| StoreError::ForkEvidenceFutureVersion {
        found,
        supported: FORK_EVIDENCE_REPORT_SCHEMA_VERSION,
    };

    if bytes.len() < HEADER_LEN || !bytes.starts_with(FORK_EVIDENCE_WIRE_MAGIC) {
        return Err(StoreError::Configuration(
            "fork evidence wire framing is invalid".into(),
        ));
    }
    let (header, payload) = bytes.split_at(HEADER_LEN);

    let version_bytes: [u8; 2] = header[6..8]
        .try_into()
        .map_err(|_| StoreError::Configuration("fork evidence version slice".into()))?;
    let header_version = u16::from_le_bytes(version_bytes);
    if header_version > FORK_EVIDENCE_REPORT_SCHEMA_VERSION {
        return Err(future_version(header_version));
    }

    let crc_bytes: [u8; 4] = header[8..12]
        .try_into()
        .map_err(|_| StoreError::Configuration("fork evidence crc slice".into()))?;
    if crc32fast::hash(payload) != u32::from_le_bytes(crc_bytes) {
        return Err(StoreError::Configuration(
            "fork evidence wire crc mismatch".into(),
        ));
    }

    let body: ForkReportBody = crate::encoding::from_bytes(payload)
        .map_err(|error| StoreError::Serialization(Box::new(error)))?;
    if body.schema_version > FORK_EVIDENCE_REPORT_SCHEMA_VERSION {
        return Err(future_version(body.schema_version));
    }
    Ok(body)
}

// Additional test-only module kept in `fork_report_mutation_kill.rs` (located
// via `#[path]`). Presumably it targets mutation-testing survivors, as the name
// suggests.
#[cfg(test)]
#[path = "fork_report_mutation_kill.rs"]
mod fork_report_mutation_kill;

#[cfg(test)]
mod tests {
    use super::{decode_fork_evidence_wire, FORK_EVIDENCE_WIRE_MAGIC};
    use crate::store::StoreError;

    /// Kills the `||` -> `&&` mutant of the framing guard in
    /// `decode_fork_evidence_wire`.
    ///
    /// Under `&&`, a frame that is too short but still begins with the magic
    /// slips past the guard. The frame built here is exactly that, 10 bytes in
    /// total:
    /// - the 6-byte magic;
    /// - a version field of 0xFFFF, far above the supported schema;
    /// - two padding bytes.
    ///
    /// The real guard rejects it on length with `Configuration`. The mutant
    /// would instead reach the future-version branch and return a different
    /// variant.
    #[test]
    fn decode_rejects_a_short_frame_before_reading_its_version() {
        let version = u16::MAX.to_le_bytes();
        let padding = [0u8; 2];
        let pieces: [&[u8]; 3] = [FORK_EVIDENCE_WIRE_MAGIC, &version, &padding];
        // 6 + 2 + 2 = 10 bytes, below the 12-byte header length.
        let frame = pieces.concat();

        let err = decode_fork_evidence_wire(&frame)
            .expect_err("a 10-byte frame is too short to be a valid fork-evidence wire");
        assert!(
            matches!(err, StoreError::Configuration(_)),
            "a sub-12-byte frame must be rejected on LENGTH as Configuration, not \
             routed to a later branch; got {err:?}"
        );
    }
}