batpak 0.8.0

Event sourcing with causal graphs and caller-defined gates. Sync API, no async runtime.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
//! Batpak Substrate Closure attested registry row: stable row identity, canonical row body digest,
//! lifecycle and supersession pointers, drift evidence, and verification reports that compose
//! [`crate::artifact::CanonicalArtifactEnvelope`] without importing [`crate::store`].
//!
//! Public evidence bodies use [`crate::encoding::to_bytes`] (the [`crate::canonical`] alias) for byte identity.
//! Callers supply opaque `row_kind`, `opaque_payload`, and `named_digests`; batpak does not interpret
//! protocol or application meaning in those fields.

use crate::artifact::{
    verify_canonical_artifact_envelope, ArtifactHash, ArtifactVerificationReport,
    CanonicalArtifactEnvelope, SignatureRef,
};
use crate::evidence::{content_hash, sort_findings, sorted_findings};
use serde::{Deserialize, Serialize};

/// Schema version baked into canonical [`RegistryRowBody`] encoding.
///
/// [`verify_registry_attested_row`] reports
/// [`RegistryVerificationFinding::UnsupportedRowSchemaVersion`] when a row body carries any
/// other value.
pub const REGISTRY_ROW_BODY_SCHEMA_VERSION: u32 = 1;

/// Schema version for canonical [`RegistryDriftReportBody`].
pub const REGISTRY_DRIFT_REPORT_SCHEMA_VERSION: u32 = 1;

/// Schema version for canonical [`RegistryVerificationReport`].
///
/// Stamped into every report returned by [`verify_registry_attested_row`].
pub const REGISTRY_VERIFICATION_REPORT_SCHEMA_VERSION: u32 = 1;

// Lifecycle discriminants. `verify_registry_attested_row` accepts exactly these four values and
// reports `RegistryVerificationFinding::InvalidLifecycle` for anything else.

/// Row published but not yet active ([`RegistryRowBody::lifecycle`]).
pub const REGISTRY_LIFECYCLE_ANNOUNCED: u32 = 0;
/// Active row ([`RegistryRowBody::lifecycle`]).
pub const REGISTRY_LIFECYCLE_LIVE: u32 = 1;
/// Superseded or slated for removal; still discoverable ([`RegistryRowBody::lifecycle`]).
pub const REGISTRY_LIFECYCLE_DEPRECATED: u32 = 2;
/// Retired; structural checks flag `supersedes` on removed rows ([`RegistryRowBody::lifecycle`]).
///
/// See [`RegistrySupersessionFinding::RemovedDeclaresSupersedes`].
pub const REGISTRY_LIFECYCLE_REMOVED: u32 = 3;

/// Stable opaque row identifier (digest-sized).
///
/// Newtype over [`ArtifactHash`]. The derived `Ord` is the ordering the drift and supersession
/// helpers in this module compare row ids by.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct RegistryRowId(pub ArtifactHash);

/// Named digest anchor (sorted before row body hashing).
///
/// The derived `Ord` compares `name` first and `digest` second (field declaration order); this is
/// the order [`normalize_registry_row_body`] sorts `named_digests` into.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct NamedDigest {
    /// Caller-defined stable name (sorted lexicographically with ties broken by digest).
    pub name: String,
    /// Content digest for the named attachment or sidecar.
    pub digest: ArtifactHash,
}

/// Canonical immutable row **body** (hashed for `row_hash`; envelope fields stay outside).
///
/// Use [`registry_row_body_hash`] / [`registry_row_body_bytes`] to obtain the digest or bytes;
/// both run the body through [`normalize_registry_row_body`] first, so the order in which
/// `named_digests` is stored does not affect the result.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RegistryRowBody {
    /// Must equal [`REGISTRY_ROW_BODY_SCHEMA_VERSION`] for v1 hashing helpers.
    /// [`verify_registry_attested_row`] reports a finding (not an error) when it differs.
    pub schema_version: u32,
    /// Stable row identity (included in the body so the digest commits to the id).
    pub row_id: RegistryRowId,
    /// Opaque kind discriminant for caller registries.
    pub row_kind: u64,
    /// Layout version for `opaque_payload` interpretation (caller-owned).
    pub row_layout_version: u32,
    /// Opaque payload bytes (caller-owned).
    pub opaque_payload: Vec<u8>,
    /// Named digests; normalized to sorted order before hashing.
    pub named_digests: Vec<NamedDigest>,
    /// Lifecycle lane; must be one of [`REGISTRY_LIFECYCLE_ANNOUNCED`], [`REGISTRY_LIFECYCLE_LIVE`],
    /// [`REGISTRY_LIFECYCLE_DEPRECATED`], or [`REGISTRY_LIFECYCLE_REMOVED`] for clean verification.
    pub lifecycle: u32,
    /// Optional prior row this entry supersedes.
    /// Audited across a catalog by [`registry_supersession_findings_sorted`].
    pub supersedes: Option<RegistryRowId>,
}

/// Normalize row body for canonical digest (sorts `named_digests`).
#[must_use]
pub fn normalize_registry_row_body(body: &RegistryRowBody) -> RegistryRowBody {
    let mut named_digests = body.named_digests.clone();
    named_digests.sort();
    RegistryRowBody {
        named_digests,
        ..body.clone()
    }
}

/// Encode a row body into its canonical MessagePack byte form.
///
/// The body is first passed through [`normalize_registry_row_body`]; the normalized copy is then
/// serialized with [`crate::encoding::to_bytes`] (same encoding plane as
/// [`crate::artifact::artifact_body_bytes`] on the normalized [`RegistryRowBody`]).
///
/// # Errors
/// MessagePack encode failure from `rmp-serde`.
pub fn registry_row_body_bytes(
    body: &RegistryRowBody,
) -> Result<Vec<u8>, rmp_serde::encode::Error> {
    crate::encoding::to_bytes(&normalize_registry_row_body(body))
}

/// Content digest of a row body.
///
/// Hashes exactly the bytes produced by [`registry_row_body_bytes`] (normalized, canonical
/// encoding) with [`content_hash`].
///
/// # Errors
/// MessagePack encode failure from `rmp-serde`.
pub fn registry_row_body_hash(
    body: &RegistryRowBody,
) -> Result<ArtifactHash, rmp_serde::encode::Error> {
    registry_row_body_bytes(body).map(|canonical| content_hash(&canonical))
}

/// Structural drift finding between expected and observed registries.
///
/// Produced by [`registry_drift_findings_sorted`]. The derived `Ord` compares by variant
/// declaration order first (`MissingRow`, `ExtraRow`, `HashMismatch`) and then by fields.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum RegistryDriftFinding {
    /// Expected row missing from observed set.
    MissingRow {
        /// Row id absent on the observed side.
        row_id: RegistryRowId,
    },
    /// Observed row not present in expected set.
    ExtraRow {
        /// Row id only on the observed side.
        row_id: RegistryRowId,
    },
    /// Same `row_id` but digest mismatch.
    HashMismatch {
        /// Conflicting row id.
        row_id: RegistryRowId,
        /// Expected canonical row hash.
        expected: ArtifactHash,
        /// Observed canonical row hash.
        observed: ArtifactHash,
    },
}

/// Deterministic drift report **body** (hash with [`registry_drift_report_body_hash`]).
///
/// The hashing helper sorts copies of `expected`, `observed`, and `findings` before encoding, so
/// the digest does not depend on the order the vectors are stored in here.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RegistryDriftReportBody {
    /// Must equal [`REGISTRY_DRIFT_REPORT_SCHEMA_VERSION`] for v1.
    pub schema_version: u32,
    /// Lexicographically sorted `(row_id, row_hash)` expected side.
    pub expected: Vec<(RegistryRowId, ArtifactHash)>,
    /// Lexicographically sorted `(row_id, row_hash)` observed side.
    pub observed: Vec<(RegistryRowId, ArtifactHash)>,
    /// Drift findings (sorted before body hash).
    pub findings: Vec<RegistryDriftFinding>,
}

/// Order `(row_id, hash)` pairs in place: ascending row id, with equal ids ordered by hash.
///
/// The sort is stable, so pairs that compare equal on both components keep their relative order.
pub fn sort_registry_row_hash_pairs(pairs: &mut [(RegistryRowId, ArtifactHash)]) {
    pairs.sort_by(
        |(lhs_id, lhs_hash), (rhs_id, rhs_hash)| match lhs_id.cmp(rhs_id) {
            // Ids tie: fall back to the hash as the secondary key.
            std::cmp::Ordering::Equal => lhs_hash.cmp(rhs_hash),
            decided => decided,
        },
    );
}

/// Diff two `(row_id, row_hash)` listings in a single merge pass.
///
/// Both slices must already be in ascending `row_id` order (see
/// [`sort_registry_row_hash_pairs`]); the walk only ever compares the current head of each side.
///
/// * id only on the expected side → [`RegistryDriftFinding::MissingRow`]
/// * id only on the observed side → [`RegistryDriftFinding::ExtraRow`]
/// * id on both sides with different hashes → [`RegistryDriftFinding::HashMismatch`]
///
/// Matching ids with equal hashes produce nothing. The collected findings are passed through
/// `sort_findings` before being returned.
#[must_use]
pub fn registry_drift_findings_sorted(
    expected: &[(RegistryRowId, ArtifactHash)],
    observed: &[(RegistryRowId, ArtifactHash)],
) -> Vec<RegistryDriftFinding> {
    use std::cmp::Ordering;

    let mut findings = Vec::new();
    // Unconsumed tails of each side; the head of each is the current merge candidate.
    let mut want = expected;
    let mut seen = observed;

    while let (Some((w, want_tail)), Some((s, seen_tail))) =
        (want.split_first(), seen.split_first())
    {
        match w.0.cmp(&s.0) {
            Ordering::Less => {
                // Expected id is smaller than anything left on the observed side.
                findings.push(RegistryDriftFinding::MissingRow { row_id: w.0 });
                want = want_tail;
            }
            Ordering::Greater => {
                // Observed id is smaller than anything left on the expected side.
                findings.push(RegistryDriftFinding::ExtraRow { row_id: s.0 });
                seen = seen_tail;
            }
            Ordering::Equal => {
                if w.1 != s.1 {
                    findings.push(RegistryDriftFinding::HashMismatch {
                        row_id: w.0,
                        expected: w.1,
                        observed: s.1,
                    });
                }
                want = want_tail;
                seen = seen_tail;
            }
        }
    }

    // At most one side still has entries; whatever remains has no counterpart.
    findings.extend(
        want.iter()
            .map(|(row_id, _)| RegistryDriftFinding::MissingRow { row_id: *row_id }),
    );
    findings.extend(
        seen.iter()
            .map(|(row_id, _)| RegistryDriftFinding::ExtraRow { row_id: *row_id }),
    );

    sort_findings(&mut findings);
    findings
}

/// Deterministic digest over a drift report body.
///
/// Hashes a normalized copy of `report` in which `findings` has gone through `sorted_findings`
/// and both `expected` and `observed` have been ordered with [`sort_registry_row_hash_pairs`].
/// `schema_version` is carried over unchanged. The input itself is not modified.
///
/// # Errors
/// MessagePack encode failure from `rmp-serde`.
pub fn registry_drift_report_body_hash(
    report: &RegistryDriftReportBody,
) -> Result<ArtifactHash, rmp_serde::encode::Error> {
    let findings = sorted_findings(&report.findings);
    let mut expected = report.expected.clone();
    let mut observed = report.observed.clone();
    sort_registry_row_hash_pairs(&mut expected);
    sort_registry_row_hash_pairs(&mut observed);
    // Name every field explicitly: `..report.clone()` would deep-clone all three vectors a
    // second time just to read the one remaining scalar.
    let normalized = RegistryDriftReportBody {
        schema_version: report.schema_version,
        expected,
        observed,
        findings,
    };
    let bytes = crate::encoding::to_bytes(&normalized)?;
    Ok(content_hash(&bytes))
}

/// Registry-specific verification finding layered on [`ArtifactVerificationReport`].
///
/// Produced by [`verify_registry_attested_row`]. Findings are reported as data in the returned
/// report; they do not turn the call into an `Err`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum RegistryVerificationFinding {
    /// [`RegistryRowBody::schema_version`] is not supported by these v1 helpers.
    UnsupportedRowSchemaVersion {
        /// Row id from the decoded body.
        row_id: RegistryRowId,
        /// Observed row body schema version.
        observed: u32,
        /// Supported row body schema version.
        expected: u32,
    },
    /// [`RegistryRowBody::lifecycle`] is not one of the `REGISTRY_LIFECYCLE_*` constants.
    InvalidLifecycle {
        /// Row id from the decoded body.
        row_id: RegistryRowId,
        /// Raw discriminant observed.
        lifecycle: u32,
    },
    /// Declared row hash does not match recomputed canonical body hash.
    RowHashMismatch {
        /// Row id from the body.
        row_id: RegistryRowId,
        /// Caller-claimed digest.
        claimed: ArtifactHash,
        /// Recomputed digest from [`registry_row_body_hash`].
        computed: ArtifactHash,
    },
    /// [`RegistryRowBody::row_id`] disagrees with the claim.
    RowIdMismatch {
        /// Identity in the body.
        body_row_id: RegistryRowId,
        /// Identity supplied by caller.
        claimed_row_id: RegistryRowId,
    },
}

/// Full-stack verification report for an attested row envelope plus structural checks.
///
/// Returned by [`verify_registry_attested_row`]; hash it with
/// [`registry_verification_report_body_hash`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RegistryVerificationReport {
    /// Must equal [`REGISTRY_VERIFICATION_REPORT_SCHEMA_VERSION`] for v1 hashing.
    pub schema_version: u32,
    /// Signature and envelope digest plane from [`verify_canonical_artifact_envelope`].
    pub envelope_plane: ArtifactVerificationReport,
    /// Registry-only findings (sorted before [`registry_verification_report_body_hash`]).
    pub findings: Vec<RegistryVerificationFinding>,
}

/// Deterministic digest over [`RegistryVerificationReport`].
///
/// Hashes a normalized copy of `report`: the registry `findings` go through `sorted_findings`
/// and the findings inside a clone of `envelope_plane` are sorted with `sort_findings`.
/// `schema_version` is carried over unchanged. The input itself is not modified.
///
/// # Errors
/// MessagePack encode failure from `rmp-serde`.
pub fn registry_verification_report_body_hash(
    report: &RegistryVerificationReport,
) -> Result<ArtifactHash, rmp_serde::encode::Error> {
    let findings = sorted_findings(&report.findings);
    let mut envelope_plane = report.envelope_plane.clone();
    sort_findings(&mut envelope_plane.findings);
    // Name every field explicitly: `..report.clone()` would clone `envelope_plane` and
    // `findings` a second time just to read the one remaining scalar.
    let normalized = RegistryVerificationReport {
        schema_version: report.schema_version,
        envelope_plane,
        findings,
    };
    let bytes = crate::encoding::to_bytes(&normalized)?;
    Ok(content_hash(&bytes))
}

/// Supersession graph finding across a closed catalog of row bodies (sorted by `row_id` before calling).
///
/// Produced by [`registry_supersession_findings_sorted`], which returns the findings sorted by
/// this type's derived `Ord` (variant declaration order, then fields).
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum RegistrySupersessionFinding {
    /// `supersedes` points to a row id not present in `catalog`.
    DanglingSupersedes {
        /// Row declaring the pointer.
        from: RegistryRowId,
        /// Missing target id.
        target: RegistryRowId,
    },
    /// A `Removed` row still declares a `supersedes` edge (structural hygiene).
    RemovedDeclaresSupersedes {
        /// Offending row id.
        from: RegistryRowId,
    },
    /// Same `row_id` appears more than once in the sorted catalog input.
    /// Reported once per adjacent pair of equal ids.
    DuplicateRowId {
        /// Repeated id (second and later occurrences are ignored by merge-walk callers).
        row_id: RegistryRowId,
    },
    /// Following `supersedes` edges within the catalog revisits a row on the path.
    SupersedesCycle {
        /// Edge that closes or participates in a cycle (first edge found in stable walk order).
        /// This is the last row on the walk path when the revisit is detected.
        edge_from: RegistryRowId,
        /// Head of the back edge (already on the active walk stack).
        edge_to: RegistryRowId,
    },
}

/// Deterministic supersession audit over a catalog keyed by [`RegistryRowId`].
///
/// `catalog` must be sorted by ascending `row_id`. Ids are expected to be unique; adjacent
/// repeats are not rejected but reported as [`RegistrySupersessionFinding::DuplicateRowId`]
/// (once per adjacent equal pair).
///
/// Checks performed:
/// * [`RegistrySupersessionFinding::DuplicateRowId`] — neighbouring entries share an id.
/// * [`RegistrySupersessionFinding::DanglingSupersedes`] — `supersedes` names an id that is not
///   in `catalog`.
/// * [`RegistrySupersessionFinding::RemovedDeclaresSupersedes`] — a row whose lifecycle is
///   [`REGISTRY_LIFECYCLE_REMOVED`] still carries a `supersedes` pointer.
/// * [`RegistrySupersessionFinding::SupersedesCycle`] — one finding per distinct back edge
///   discovered while following `supersedes` from every row in turn.
///
/// The returned vector is sorted by the finding type's derived `Ord`.
#[must_use]
pub fn registry_supersession_findings_sorted(
    catalog: &[(RegistryRowId, RegistryRowBody)],
) -> Vec<RegistrySupersessionFinding> {
    // Single id → body index; it serves both the membership test and the cycle walk, so no
    // separate id set is needed.
    let by_id: std::collections::BTreeMap<RegistryRowId, &RegistryRowBody> =
        catalog.iter().map(|(id, body)| (*id, body)).collect();

    // Sorted input puts duplicates next to each other, so comparing neighbours is sufficient.
    let mut out: Vec<RegistrySupersessionFinding> = catalog
        .windows(2)
        .filter(|pair| pair[0].0 == pair[1].0)
        .map(|pair| RegistrySupersessionFinding::DuplicateRowId { row_id: pair[1].0 })
        .collect();

    for (id, body) in catalog {
        if let Some(target) = body.supersedes {
            if !by_id.contains_key(&target) {
                out.push(RegistrySupersessionFinding::DanglingSupersedes { from: *id, target });
            }
            if body.lifecycle == REGISTRY_LIFECYCLE_REMOVED {
                out.push(RegistrySupersessionFinding::RemovedDeclaresSupersedes { from: *id });
            }
        }
    }

    // The same cycle is reachable from several start rows; the set collapses repeated edges.
    let mut cycle_edges: std::collections::BTreeSet<(RegistryRowId, RegistryRowId)> =
        std::collections::BTreeSet::new();
    for (start, _) in catalog {
        let mut path: Vec<RegistryRowId> = Vec::new();
        supersession_walk_for_cycles(&by_id, *start, &mut path, &mut cycle_edges);
    }
    out.extend(cycle_edges.into_iter().map(|(edge_from, edge_to)| {
        RegistrySupersessionFinding::SupersedesCycle { edge_from, edge_to }
    }));

    // Push order above is irrelevant: the final sort fixes the output order.
    out.sort();
    out
}

/// Follow `supersedes` pointers starting at `cur` and record the edge that re-enters the walk.
///
/// * `by_id` — catalog index; pointers to ids missing from it end the walk silently.
/// * `cur` — row id to start from.
/// * `path` — ids already on the active walk; ids visited here are appended while walking and
///   removed again before returning, so `path` has its entry length on exit.
/// * `cycle_edges` — receives `(last id on path, revisited id)` when the walk reaches an id that
///   is already on `path`.
///
/// Each row has at most one outgoing `supersedes` edge, so the walk is a plain chain. It is
/// written as a loop rather than recursion so that a very long chain cannot exhaust the call
/// stack.
fn supersession_walk_for_cycles(
    by_id: &std::collections::BTreeMap<RegistryRowId, &RegistryRowBody>,
    mut cur: RegistryRowId,
    path: &mut Vec<RegistryRowId>,
    cycle_edges: &mut std::collections::BTreeSet<(RegistryRowId, RegistryRowId)>,
) {
    // Remember how much of `path` belongs to the caller so only our own entries are removed.
    let entry_len = path.len();
    loop {
        if path.contains(&cur) {
            // `cur` is already on the walk: the hop we just took closes a cycle.
            if let Some(&prev) = path.last() {
                cycle_edges.insert((prev, cur));
            }
            break;
        }
        path.push(cur);
        let successor = by_id.get(&cur).and_then(|body| body.supersedes);
        match successor {
            // Only follow pointers that resolve inside the catalog.
            Some(next) if by_id.contains_key(&next) => cur = next,
            _ => break,
        }
    }
    path.truncate(entry_len);
}

/// Verify signatures on a canonical envelope whose body is [`RegistryRowBody`], then apply structural checks.
///
/// `claimed_row_id` and `claimed_row_hash` let callers pin identity to the envelope body and digest.
///
/// # Errors
/// MessagePack encode failure while hashing or verifying the body.
pub fn verify_registry_attested_row<F>(
    envelope: &CanonicalArtifactEnvelope<RegistryRowBody>,
    claimed_row_id: RegistryRowId,
    claimed_row_hash: ArtifactHash,
    verify_signature: F,
) -> Result<RegistryVerificationReport, rmp_serde::encode::Error>
where
    F: FnMut(&SignatureRef, &[u8]) -> Result<(), String>,
{
    let normalized_body = normalize_registry_row_body(&envelope.body);
    let envelope_norm = CanonicalArtifactEnvelope {
        body: normalized_body,
        envelope_schema_version: envelope.envelope_schema_version,
        generated_at_wall_ms: envelope.generated_at_wall_ms,
        diagnostic_note: envelope.diagnostic_note.clone(),
        signatures: envelope.signatures.clone(),
        attestations: envelope.attestations.clone(),
    };
    let envelope_plane = verify_canonical_artifact_envelope(&envelope_norm, verify_signature)?;
    let mut findings = Vec::new();

    let body = &envelope_norm.body;
    if body.schema_version != REGISTRY_ROW_BODY_SCHEMA_VERSION {
        findings.push(RegistryVerificationFinding::UnsupportedRowSchemaVersion {
            row_id: body.row_id,
            observed: body.schema_version,
            expected: REGISTRY_ROW_BODY_SCHEMA_VERSION,
        });
    }

    if body.row_id != claimed_row_id {
        findings.push(RegistryVerificationFinding::RowIdMismatch {
            body_row_id: body.row_id,
            claimed_row_id,
        });
    }

    let computed = registry_row_body_hash(body)?;
    if computed != claimed_row_hash {
        findings.push(RegistryVerificationFinding::RowHashMismatch {
            row_id: body.row_id,
            claimed: claimed_row_hash,
            computed,
        });
    }

    if body.lifecycle != REGISTRY_LIFECYCLE_ANNOUNCED
        && body.lifecycle != REGISTRY_LIFECYCLE_LIVE
        && body.lifecycle != REGISTRY_LIFECYCLE_DEPRECATED
        && body.lifecycle != REGISTRY_LIFECYCLE_REMOVED
    {
        findings.push(RegistryVerificationFinding::InvalidLifecycle {
            row_id: body.row_id,
            lifecycle: body.lifecycle,
        });
    }

    sort_findings(&mut findings);

    Ok(RegistryVerificationReport {
        schema_version: REGISTRY_VERIFICATION_REPORT_SCHEMA_VERSION,
        envelope_plane,
        findings,
    })
}

/// Expose canonical body bytes used for signature verification (normalized row body).
///
/// Delegates directly to [`registry_row_body_bytes`], so the returned bytes are identical to
/// the ones [`registry_row_body_hash`] digests.
///
/// # Errors
/// MessagePack encode failure from `rmp-serde`.
pub fn registry_row_signing_bytes(
    body: &RegistryRowBody,
) -> Result<Vec<u8>, rmp_serde::encode::Error> {
    registry_row_body_bytes(body)
}

/// Run only the envelope/signature plane for a registry row envelope.
///
/// The body is normalized with [`normalize_registry_row_body`], an envelope is rebuilt around it
/// with every other field copied from `envelope`, and that envelope is passed to
/// [`verify_canonical_artifact_envelope`] (same bytes as the [`registry_row_body_hash`] input
/// plane). No registry-specific structural checks are performed here.
///
/// # Errors
/// MessagePack encode failure from `rmp-serde`.
pub fn verify_registry_row_signatures_only<F>(
    envelope: &CanonicalArtifactEnvelope<RegistryRowBody>,
    verify_signature: F,
) -> Result<ArtifactVerificationReport, rmp_serde::encode::Error>
where
    F: FnMut(&SignatureRef, &[u8]) -> Result<(), String>,
{
    let canonical_envelope = CanonicalArtifactEnvelope {
        body: normalize_registry_row_body(&envelope.body),
        envelope_schema_version: envelope.envelope_schema_version,
        generated_at_wall_ms: envelope.generated_at_wall_ms,
        diagnostic_note: envelope.diagnostic_note.clone(),
        signatures: envelope.signatures.clone(),
        attestations: envelope.attestations.clone(),
    };
    verify_canonical_artifact_envelope(&canonical_envelope, verify_signature)
}

/// Cross-check the two encoding paths for a row body.
///
/// Returns `true` when [`crate::artifact::artifact_body_bytes`] applied to the normalized body
/// yields the same bytes as [`registry_row_body_bytes`] applied to `body`.
///
/// # Errors
/// MessagePack encode failure from `rmp-serde`.
pub fn registry_row_body_hash_matches_signing_bytes(
    body: &RegistryRowBody,
) -> Result<bool, rmp_serde::encode::Error> {
    let normalized = normalize_registry_row_body(body);
    let artifact_plane = crate::artifact::artifact_body_bytes(&normalized)?;
    registry_row_body_bytes(body).map(|registry_plane| artifact_plane == registry_plane)
}