1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
// SPDX-License-Identifier: Apache-2.0
//! StateVisibility — a declaration that a state (commit) carries a
//! non-public audience tier.
//!
//! Modeled on [`Redaction`](crate::object::Redaction): an *additive*
//! sidecar record that lives outside the hashed `State` bytes, so changing a
//! state's tier never mutates the state or invalidates its signature. The
//! record is keyed by `ChangeId` (the state), not by a blob hash — commit
//! visibility is a per-state property, where redaction is per-blob.
//!
//! **Absence ≡ public.** A public resolution stays record-free: the public
//! tier is the default, and a state with no `StateVisibility` record is
//! served to every audience. Only resolutions more restrictive than public
//! are persisted, so the per-state sidecar is empty for the common case.
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::object::{ChangeId, ContentHash, Principal, StateSignature, VisibilityTier};
/// Stable byte prefix the signing payload begins with. Bumping this versions
/// the payload format itself; old signatures with the old prefix continue to
/// verify exactly as they did when written.
pub const STATE_VISIBILITY_SIGNING_PAYLOAD_VERSION_TAG: &[u8] = b"hd-statevis-v1\x00";
/// A visibility-tier declaration on a single state.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StateVisibility {
/// The state (commit) this tier applies to.
pub state: ChangeId,
/// The audience tier the state's content is served at.
pub tier: VisibilityTier,
/// When set, the host materializes a superseding public record at this
/// instant (auto-promote). Advisory schedule only — the *effective*
/// tier is always read from the persisted records, never recomputed
/// from wall-clock at read time.
#[serde(default)]
pub embargo_until: Option<DateTime<Utc>>,
/// Who declared the tier.
pub declarer: Principal,
/// When the tier was declared. RFC3339 at the wire boundary;
/// `DateTime<Utc>` internally.
pub declared_at: DateTime<Utc>,
/// Optional cryptographic signature over the canonical signing payload
/// (see [`canonical_signing_payload`](StateVisibility::canonical_signing_payload)).
/// `None` for unsigned declarations.
#[serde(default)]
pub signature: Option<StateSignature>,
/// The record this one supersedes, if any — promotion appends a
/// superseding record rather than mutating a prior one. Identified by
/// the prior record's content hash.
#[serde(default)]
pub supersedes: Option<ContentHash>,
}
impl StateVisibility {
/// Build the canonical bytes a signer covers. The `signature` field is
/// intentionally excluded (a signature can't sign itself).
pub fn canonical_signing_payload(&self) -> Vec<u8> {
let mut buf = Vec::with_capacity(128);
buf.extend_from_slice(STATE_VISIBILITY_SIGNING_PAYLOAD_VERSION_TAG);
buf.extend_from_slice(self.state.as_bytes());
buf.extend_from_slice(self.tier.as_str().as_bytes());
buf.push(0);
match &self.tier {
VisibilityTier::TeamScoped { team_id } => buf.extend_from_slice(team_id.as_bytes()),
VisibilityTier::Restricted { scope_label }
| VisibilityTier::Private { scope_label } => {
buf.extend_from_slice(scope_label.as_bytes())
}
VisibilityTier::Public | VisibilityTier::Internal => {}
}
buf.push(0);
if let Some(embargo_until) = &self.embargo_until {
buf.extend_from_slice(embargo_until.to_rfc3339().as_bytes());
}
buf.push(0);
buf.extend_from_slice(self.declarer.name.as_bytes());
buf.push(0);
buf.extend_from_slice(self.declarer.email.as_bytes());
buf.push(0);
buf.extend_from_slice(self.declared_at.to_rfc3339().as_bytes());
if let Some(supersedes) = &self.supersedes {
buf.extend_from_slice(supersedes.as_bytes());
}
buf
}
/// Per-item validation hook required by [`versioned_msgpack_blob!`].
/// A `TeamScoped`/`Restricted` tier must carry a non-empty label —
/// an empty label is meaningless and would silently widen the
/// audience to "any team / any restricted scope".
pub fn validate(&self) -> Result<(), StateVisibilityError> {
match &self.tier {
VisibilityTier::TeamScoped { team_id } if team_id.trim().is_empty() => {
Err(StateVisibilityError::EmptyTierLabel("team_scoped"))
}
VisibilityTier::Restricted { scope_label } if scope_label.trim().is_empty() => {
Err(StateVisibilityError::EmptyTierLabel("restricted"))
}
VisibilityTier::Private { scope_label } if scope_label.trim().is_empty() => {
Err(StateVisibilityError::EmptyTierLabel("private"))
}
_ => Ok(()),
}
}
/// Content-addressed id of this record: `blake3` over the canonical
/// rmp-encoded bytes of a one-element [`StateVisibilityBlob`]. This is the
/// id a superseding record stores in its [`supersedes`](Self::supersedes)
/// pointer, and the key [`StateVisibilityBlob::latest`] resolves the
/// supersede chain by — so the write path (which sets `supersedes` from the
/// under-lock head) and the read path (which walks the chain) agree by
/// construction. The id covers the single record's bytes embedded in the
/// versioned envelope, so it stays stable across schema additions that only
/// extend the container.
pub fn content_hash(&self) -> Result<ContentHash, StateVisibilityError> {
let single = StateVisibilityBlob::new(vec![self.clone()]);
let bytes = single.encode()?;
Ok(ContentHash::from_bytes(*blake3::hash(&bytes).as_bytes()))
}
}
/// On-disk blob containing all visibility records for a single state. One
/// file per state, encoded with `rmp-serde` — mirrors the
/// [`RedactionsBlob`](crate::object::RedactionsBlob) sidecar pattern.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct StateVisibilityBlob {
pub format_version: u8,
pub records: Vec<StateVisibility>,
}
// `new` / `encode` / `decode` / `validate` + `FORMAT_VERSION`. `decode`
// rejects any blob whose `format_version` isn't the current one.
versioned_msgpack_blob! {
blob: StateVisibilityBlob,
item: StateVisibility,
field: records,
error: StateVisibilityError,
codec_err: Codec,
version: 1,
}
impl StateVisibilityBlob {
pub fn empty() -> Self {
Self::new(Vec::new())
}
pub fn push(&mut self, record: StateVisibility) {
self.records.push(record);
}
/// `true` iff this state carries any visibility record — i.e. it is not
/// public-by-absence. Used by the sidecar's `has_visibility_for_state`.
pub fn has_record(&self) -> bool {
!self.records.is_empty()
}
/// The effective record: the **head of the supersede DAG** — the record in
/// this blob that no other record supersedes — resolved purely from the
/// records' content-intrinsic [`supersedes`](StateVisibility::supersedes)
/// pointers, **never** from wall-clock `declared_at`.
///
/// Each locally-committed declaration links onto the prior head it read
/// under the repo write lock — its `supersedes` points at that head's
/// content hash (heddle#317 / PR #529 P1) — so for serialized local writes
/// the chain is linear and the head is the **last-committed** record,
/// independent of clock skew or whatever order the timestamps happen to
/// carry. `declared_at` is an audit/display field only. This also fixes a
/// latent cross-host bug: wall-clock cannot order records replicated across
/// hosts whose clocks disagree, but the content-hash chain can.
///
/// **Fork tie-break (concurrent / cross-host).** Two records can supersede
/// the *same* prior with neither superseding the other — a genuine
/// concurrent fork, e.g. two hosts that diverged. Both are heads. To make
/// every replica resolve the SAME effective record without consulting
/// wall-clock, the tie is broken by the **lexicographically greatest record
/// content hash** — a content-intrinsic, host-independent key. Cycles are
/// cryptographically unconstructable (a record's hash covers its
/// `supersedes` pointer, so no record can supersede one minted after it), so
/// a non-empty blob always has at least one head.
pub fn latest(&self) -> Result<Option<&StateVisibility>, StateVisibilityError> {
// Every content hash referenced by some record's `supersedes` pointer.
// A record whose own hash appears here has been superseded — it is not
// the head.
let superseded: std::collections::HashSet<ContentHash> =
self.records.iter().filter_map(|r| r.supersedes).collect();
let mut head: Option<(&StateVisibility, ContentHash)> = None;
for record in &self.records {
let hash = record.content_hash()?;
if superseded.contains(&hash) {
continue;
}
// Among multiple heads (a fork), keep the greatest content hash so
// the pick is deterministic and host-independent.
let take = match &head {
Some((_, best)) => hash > *best,
None => true,
};
if take {
head = Some((record, hash));
}
}
Ok(head.map(|(record, _)| record))
}
}
/// Errors produced while encoding/decoding/validating state visibility.
#[derive(Debug, thiserror::Error)]
pub enum StateVisibilityError {
#[error("unsupported state-visibility format version {0}")]
UnsupportedVersion(u8),
#[error("state-visibility codec error: {0}")]
Codec(String),
#[error("{0} tier requires a non-empty label")]
EmptyTierLabel(&'static str),
}
#[cfg(test)]
mod tests {
use chrono::TimeZone;
use super::*;
fn principal() -> Principal {
Principal {
name: "Grace Hopper".into(),
email: "grace@example.com".into(),
}
}
fn record(tier: VisibilityTier) -> StateVisibility {
StateVisibility {
state: ChangeId::from_bytes([3u8; 16]),
tier,
embargo_until: None,
declarer: principal(),
declared_at: Utc.with_ymd_and_hms(2026, 6, 1, 12, 0, 0).unwrap(),
signature: None,
supersedes: None,
}
}
#[test]
fn round_trips_through_msgpack() {
let original = StateVisibilityBlob::new(vec![record(VisibilityTier::Restricted {
scope_label: "security-embargo".into(),
})]);
let encoded = original.encode().expect("encode");
let decoded = StateVisibilityBlob::decode(&encoded).expect("decode");
assert_eq!(decoded, original);
// Format-version is load-bearing: future readers branch on it.
assert_eq!(decoded.format_version, StateVisibilityBlob::FORMAT_VERSION);
}
#[test]
fn round_trips_with_embargo_and_supersedes() {
let mut r = record(VisibilityTier::TeamScoped {
team_id: "infra".into(),
});
r.embargo_until = Some(Utc.with_ymd_and_hms(2026, 7, 1, 0, 0, 0).unwrap());
r.supersedes = Some(ContentHash::from_bytes([9u8; 32]));
let original = StateVisibilityBlob::new(vec![r]);
let encoded = original.encode().expect("encode");
let decoded = StateVisibilityBlob::decode(&encoded).expect("decode");
assert_eq!(decoded, original);
}
#[test]
fn decode_rejects_wrong_version() {
// Hand-encode a blob whose format_version is not the current one;
// decode must reject it via the macro's version-check prologue.
let bad = StateVisibilityBlob {
format_version: StateVisibilityBlob::FORMAT_VERSION + 7,
records: vec![record(VisibilityTier::Internal)],
};
let bytes = rmp_serde::to_vec(&bad).expect("raw encode");
let err = StateVisibilityBlob::decode(&bytes).expect_err("must reject wrong version");
assert!(matches!(
err,
StateVisibilityError::UnsupportedVersion(v) if v == StateVisibilityBlob::FORMAT_VERSION + 7
));
}
#[test]
fn validate_rejects_empty_label() {
let blob = StateVisibilityBlob::new(vec![record(VisibilityTier::Restricted {
scope_label: " ".into(),
})]);
assert!(matches!(
blob.validate(),
Err(StateVisibilityError::EmptyTierLabel("restricted"))
));
}
#[test]
fn canonical_payload_is_versioned_and_carries_tier() {
let r = record(VisibilityTier::Restricted {
scope_label: "security-embargo".into(),
});
let payload = r.canonical_signing_payload();
assert!(payload.starts_with(STATE_VISIBILITY_SIGNING_PAYLOAD_VERSION_TAG));
let text = String::from_utf8_lossy(&payload);
assert!(text.contains("restricted"));
assert!(text.contains("security-embargo"));
assert!(text.contains("grace@example.com"));
}
#[test]
fn latest_resolves_the_supersede_chain_head() {
// The effective record is the head of the supersede chain — the record
// no other record supersedes — not the most recent by wall-clock.
let early = record(VisibilityTier::Internal);
let early_id = early.content_hash().unwrap();
let late = StateVisibility {
declared_at: Utc.with_ymd_and_hms(2026, 6, 2, 9, 0, 0).unwrap(),
supersedes: Some(early_id),
..record(VisibilityTier::Public)
};
let blob = StateVisibilityBlob::new(vec![early, late.clone()]);
assert_eq!(blob.latest().unwrap().unwrap(), &late);
}
#[test]
fn latest_ignores_wall_clock_declared_at() {
// A record with a strictly LATER declared_at but EARLIER in the
// supersede chain must NOT be selected — the chain head wins regardless
// of wall-clock. This is the bug class the redesign closes: selection is
// content-intrinsic, so it can't be skewed by timestamps (or clock
// disagreement across hosts).
let head_tier = VisibilityTier::TeamScoped {
team_id: "infra".into(),
};
// The genesis carries the LATEST timestamp...
let early_in_chain = StateVisibility {
declared_at: Utc.with_ymd_and_hms(2030, 1, 1, 0, 0, 0).unwrap(),
..record(VisibilityTier::Internal)
};
let early_id = early_in_chain.content_hash().unwrap();
// ...the chain head supersedes it but carries an EARLIER timestamp.
let head = StateVisibility {
declared_at: Utc.with_ymd_and_hms(2000, 1, 1, 0, 0, 0).unwrap(),
supersedes: Some(early_id),
..record(head_tier.clone())
};
let blob = StateVisibilityBlob::new(vec![early_in_chain, head.clone()]);
let latest = blob.latest().unwrap().unwrap();
assert_eq!(latest, &head);
assert_eq!(
latest.tier, head_tier,
"the chain head wins even though its declared_at is the earlier of the two"
);
}
#[test]
fn concurrent_fork_resolves_deterministically() {
// Two records supersede the SAME prior with neither superseding the
// other — a genuine concurrent fork (e.g. two hosts). latest() must pick
// the SAME head on every replica via the content-intrinsic tie-break
// (greatest content hash), never wall-clock and never input order.
let genesis = record(VisibilityTier::Internal);
let genesis_id = genesis.content_hash().unwrap();
let fork_a = StateVisibility {
declared_at: Utc.with_ymd_and_hms(2026, 6, 2, 9, 0, 0).unwrap(),
supersedes: Some(genesis_id),
..record(VisibilityTier::TeamScoped {
team_id: "host-a".into(),
})
};
let fork_b = StateVisibility {
declared_at: Utc.with_ymd_and_hms(2026, 6, 3, 9, 0, 0).unwrap(),
supersedes: Some(genesis_id),
..record(VisibilityTier::TeamScoped {
team_id: "host-b".into(),
})
};
// The deterministic winner is the head with the greater content hash.
let expected = if fork_a.content_hash().unwrap() > fork_b.content_hash().unwrap() {
fork_a.clone()
} else {
fork_b.clone()
};
// Resolved identically regardless of the order the records appear in.
let blob1 = StateVisibilityBlob::new(vec![genesis.clone(), fork_a.clone(), fork_b.clone()]);
let blob2 = StateVisibilityBlob::new(vec![genesis, fork_b, fork_a]);
assert_eq!(blob1.latest().unwrap().unwrap(), &expected);
assert_eq!(
blob2.latest().unwrap().unwrap(),
&expected,
"the fork must resolve to the same head independent of record order"
);
}
#[test]
fn empty_blob_has_no_record() {
assert!(!StateVisibilityBlob::empty().has_record());
}
}