Skip to main content

wire/
object_graph.rs

1// SPDX-License-Identifier: Apache-2.0
2use std::collections::{HashSet, VecDeque};
3
4use objects::{
5    object::{ChangeId, ContentHash, State, TreeEntryTarget},
6    store::{ObjectStore, pack::ObjectType as PackObjectType},
7};
8use serde::{Deserialize, Serialize};
9
10use crate::{ProtocolError, Result};
11
12#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
13pub enum ObjectId {
14    Hash(ContentHash),
15    ChangeId(ChangeId),
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct ObjectInfo {
20    pub id: ObjectId,
21    pub obj_type: ObjectType,
22    pub size: u64,
23    pub delta_base: Option<ContentHash>,
24}
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
27pub struct PlannedObject {
28    pub id: ObjectId,
29    pub obj_type: ObjectType,
30}
31
32#[derive(Debug, Clone)]
33pub struct StateClosureTransferObjects {
34    pub planned_objects: Vec<PlannedObject>,
35    pub full_objects: Option<Vec<ObjectInfo>>,
36}
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
39pub enum ObjectType {
40    Blob,
41    Tree,
42    State,
43    Action,
44    /// A `RedactionsBlob` sidecar — the rmp-encoded record(s) declaring
45    /// that a specific blob has been redacted (and possibly purged) by
46    /// an authorized operator. Keyed on the wire by `ObjectId::Hash` of
47    /// the *redacted blob*, since `Repository`'s sidecar store is
48    /// indexed that way.
49    Redaction,
50    /// A `StateVisibilityBlob` sidecar — the rmp-encoded record(s)
51    /// declaring a non-public audience tier for a specific state. Keyed
52    /// on the wire by `ObjectId::ChangeId` of the *state*, since the
53    /// per-state sidecar store is indexed that way. Like `Redaction`, it
54    /// is a sidecar record that lives outside the content-addressed pack
55    /// and ships via the per-object transfer path, not the pack.
56    StateVisibility,
57}
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
60pub enum ObjectTypeBucket {
61    Blob,
62    Tree,
63    State,
64    Action,
65    Redaction,
66    StateVisibility,
67}
68
69impl ObjectType {
70    pub fn wire_name(self) -> &'static str {
71        match self {
72            ObjectType::Blob => "blob",
73            ObjectType::Tree => "tree",
74            ObjectType::State => "state",
75            ObjectType::Action => "action",
76            ObjectType::Redaction => "redaction",
77            ObjectType::StateVisibility => "state_visibility",
78        }
79    }
80
81    pub fn from_wire(value: &str) -> Result<Self> {
82        match value {
83            "blob" => Ok(ObjectType::Blob),
84            "tree" => Ok(ObjectType::Tree),
85            "state" => Ok(ObjectType::State),
86            "action" => Ok(ObjectType::Action),
87            "redaction" => Ok(ObjectType::Redaction),
88            "state_visibility" => Ok(ObjectType::StateVisibility),
89            _ => Err(ProtocolError::InvalidState(format!(
90                "unknown object type: {value}"
91            ))),
92        }
93    }
94
95    pub fn packable(self) -> bool {
96        !matches!(self, ObjectType::Redaction | ObjectType::StateVisibility)
97    }
98
99    pub fn pack_object_type(self) -> Result<PackObjectType> {
100        match self {
101            ObjectType::Blob => Ok(PackObjectType::Blob),
102            ObjectType::Tree => Ok(PackObjectType::Tree),
103            ObjectType::State => Ok(PackObjectType::State),
104            ObjectType::Action => Ok(PackObjectType::Action),
105            ObjectType::Redaction => Err(ProtocolError::InvalidState(
106                "Redaction sidecar records cannot be packed into the content-addressed object pack"
107                    .to_string(),
108            )),
109            ObjectType::StateVisibility => Err(ProtocolError::InvalidState(
110                "StateVisibility sidecar records cannot be packed into the content-addressed object pack"
111                    .to_string(),
112            )),
113        }
114    }
115
116    pub fn bucket(self) -> ObjectTypeBucket {
117        match self {
118            ObjectType::Blob => ObjectTypeBucket::Blob,
119            ObjectType::Tree => ObjectTypeBucket::Tree,
120            ObjectType::State => ObjectTypeBucket::State,
121            ObjectType::Action => ObjectTypeBucket::Action,
122            ObjectType::Redaction => ObjectTypeBucket::Redaction,
123            ObjectType::StateVisibility => ObjectTypeBucket::StateVisibility,
124        }
125    }
126}
127
128#[derive(Debug, Clone, Default)]
129pub struct StateClosureOptions {
130    pub depth: Option<u32>,
131    pub exclude_states: Vec<ChangeId>,
132}
133
134pub fn enumerate_state_closure(
135    store: &impl ObjectStore,
136    state_id: ChangeId,
137) -> Result<Vec<ObjectInfo>> {
138    enumerate_state_closure_with_options(store, state_id, StateClosureOptions::default())
139}
140
141pub fn enumerate_state_closure_with_options(
142    store: &impl ObjectStore,
143    state_id: ChangeId,
144    options: StateClosureOptions,
145) -> Result<Vec<ObjectInfo>> {
146    let mut out = Vec::new();
147    walk_state_closure(store, state_id, options, |event| {
148        if let Some(info) = object_info_from_event(store, event)? {
149            out.push(info);
150        }
151        Ok(())
152    })?;
153
154    Ok(out)
155}
156
157pub fn enumerate_state_closure_plan(
158    store: &impl ObjectStore,
159    state_id: ChangeId,
160) -> Result<Vec<PlannedObject>> {
161    enumerate_state_closure_plan_with_options(store, state_id, StateClosureOptions::default())
162}
163
164pub fn enumerate_state_closure_plan_with_options(
165    store: &impl ObjectStore,
166    state_id: ChangeId,
167    options: StateClosureOptions,
168) -> Result<Vec<PlannedObject>> {
169    let mut out = Vec::new();
170    walk_state_closure(store, state_id, options, |event| {
171        if let Some(object) = planned_object_from_event(store, event)? {
172            out.push(object);
173        }
174        Ok(())
175    })?;
176
177    Ok(out)
178}
179
180pub fn enumerate_state_closure_transfer_with_options(
181    store: &impl ObjectStore,
182    state_id: ChangeId,
183    options: StateClosureOptions,
184    full_descriptor_object_threshold: usize,
185) -> Result<StateClosureTransferObjects> {
186    let mut planned_objects = Vec::new();
187    let mut full_objects = Some(Vec::new());
188
189    walk_state_closure(store, state_id, options, |event| {
190        if let Some(object) = planned_object_from_event(store, event)? {
191            planned_objects.push(object);
192        }
193
194        if full_objects.is_some() && planned_objects.len() > full_descriptor_object_threshold {
195            full_objects = None;
196        }
197        if let Some(objects) = full_objects.as_mut()
198            && let Some(info) = object_info_from_event(store, event)?
199        {
200            objects.push(info);
201        }
202
203        Ok(())
204    })?;
205
206    Ok(StateClosureTransferObjects {
207        planned_objects,
208        full_objects,
209    })
210}
211
212#[derive(Debug, Clone, Copy, PartialEq, Eq)]
213enum BlobSource {
214    Tree,
215    StateMetadata,
216}
217
218#[derive(Debug, Clone, Copy)]
219enum StateClosureEvent<'a> {
220    State { id: ChangeId, state: &'a State },
221    Tree { hash: ContentHash, tree: &'a objects::object::Tree },
222    Blob { hash: ContentHash, source: BlobSource },
223    Redaction { blob: ContentHash },
224    StateVisibility { state: ChangeId },
225    ExcludedState { id: ChangeId },
226    ExcludedHash { hash: ContentHash },
227}
228
229fn walk_state_closure(
230    store: &impl ObjectStore,
231    state_id: ChangeId,
232    options: StateClosureOptions,
233    mut visit: impl for<'event> FnMut(StateClosureEvent<'event>) -> Result<()>,
234) -> Result<()> {
235    let (excluded_states, excluded_hashes) = collect_excluded(store, &options.exclude_states)?;
236
237    let mut seen_states: HashSet<ChangeId> = HashSet::new();
238    let mut seen_hashes: HashSet<ContentHash> = HashSet::new();
239    let mut queue: VecDeque<(ChangeId, u32)> = VecDeque::new();
240    queue.push_back((state_id, 0));
241
242    while let Some((id, depth)) = queue.pop_front() {
243        if excluded_states.contains(&id) {
244            visit(StateClosureEvent::ExcludedState { id })?;
245            continue;
246        }
247        if !seen_states.insert(id) {
248            continue;
249        }
250
251        let state = store
252            .get_state(&id)?
253            .ok_or_else(|| ProtocolError::ObjectNotFound(id.to_string()))?;
254
255        visit(StateClosureEvent::State { id, state: &state })?;
256        if store.has_state_visibility_for_state(&id)? {
257            visit(StateClosureEvent::StateVisibility { state: id })?;
258        }
259
260        if options.depth.map(|max| depth < max).unwrap_or(true) {
261            for parent in &state.parents {
262                queue.push_back((*parent, depth + 1));
263            }
264        }
265
266        walk_tree_closure_filtered(
267            store,
268            state.tree,
269            &excluded_hashes,
270            &mut seen_hashes,
271            &mut visit,
272        )?;
273        if let Some(provenance_root) = state.provenance {
274            walk_tree_closure_filtered(
275                store,
276                provenance_root,
277                &excluded_hashes,
278                &mut seen_hashes,
279                &mut visit,
280            )?;
281        }
282        if let Some(context_root) = state.context {
283            walk_tree_closure_filtered(
284                store,
285                context_root,
286                &excluded_hashes,
287                &mut seen_hashes,
288                &mut visit,
289            )?;
290        }
291        for metadata_blob in state_blob_dependencies(&state) {
292            walk_blob_filtered(
293                store,
294                metadata_blob,
295                BlobSource::StateMetadata,
296                &excluded_hashes,
297                &mut seen_hashes,
298                &mut visit,
299            )?;
300        }
301    }
302
303    Ok(())
304}
305
306fn walk_tree_closure_filtered(
307    store: &impl ObjectStore,
308    tree_hash: ContentHash,
309    excluded: &HashSet<ContentHash>,
310    seen: &mut HashSet<ContentHash>,
311    visit: &mut impl for<'event> FnMut(StateClosureEvent<'event>) -> Result<()>,
312) -> Result<()> {
313    if excluded.contains(&tree_hash) {
314        visit(StateClosureEvent::ExcludedHash { hash: tree_hash })?;
315        return Ok(());
316    }
317    if !seen.insert(tree_hash) {
318        return Ok(());
319    }
320
321    let tree = store
322        .get_tree(&tree_hash)?
323        .ok_or_else(|| ProtocolError::ObjectNotFound(tree_hash.to_hex()))?;
324
325    visit(StateClosureEvent::Tree {
326        hash: tree_hash,
327        tree: &tree,
328    })?;
329
330    for entry in tree.entries() {
331        match entry.target() {
332            TreeEntryTarget::Blob { hash, .. } | TreeEntryTarget::Symlink { hash } => {
333                walk_blob_filtered(
334                    store,
335                    *hash,
336                    BlobSource::Tree,
337                    excluded,
338                    seen,
339                    visit,
340                )?;
341            }
342            TreeEntryTarget::Tree { hash } => {
343                walk_tree_closure_filtered(store, *hash, excluded, seen, visit)?;
344            }
345            TreeEntryTarget::Gitlink { .. } => {}
346        }
347    }
348
349    Ok(())
350}
351
352fn walk_blob_filtered(
353    store: &impl ObjectStore,
354    blob_hash: ContentHash,
355    source: BlobSource,
356    excluded: &HashSet<ContentHash>,
357    seen: &mut HashSet<ContentHash>,
358    visit: &mut impl for<'event> FnMut(StateClosureEvent<'event>) -> Result<()>,
359) -> Result<()> {
360    if excluded.contains(&blob_hash) {
361        visit(StateClosureEvent::ExcludedHash { hash: blob_hash })?;
362        return Ok(());
363    }
364    if !seen.insert(blob_hash) {
365        return Ok(());
366    }
367    visit(StateClosureEvent::Blob {
368        hash: blob_hash,
369        source,
370    })?;
371    if store.has_redactions_for_blob(&blob_hash)? {
372        visit(StateClosureEvent::Redaction { blob: blob_hash })?;
373    }
374    Ok(())
375}
376
377fn object_info_from_event(
378    store: &impl ObjectStore,
379    event: StateClosureEvent<'_>,
380) -> Result<Option<ObjectInfo>> {
381    match event {
382        StateClosureEvent::State { id, state } => {
383            let state_bytes = rmp_serde::to_vec_named(state)?;
384            Ok(Some(ObjectInfo {
385                id: ObjectId::ChangeId(id),
386                obj_type: ObjectType::State,
387                size: state_bytes.len() as u64,
388                delta_base: None,
389            }))
390        }
391        StateClosureEvent::Tree { hash, tree } => {
392            let tree_bytes = rmp_serde::to_vec_named(tree)?;
393            Ok(Some(ObjectInfo {
394                id: ObjectId::Hash(hash),
395                obj_type: ObjectType::Tree,
396                size: tree_bytes.len() as u64,
397                delta_base: None,
398            }))
399        }
400        StateClosureEvent::Blob { hash, .. } => {
401            let blob = store
402                .get_blob(&hash)?
403                .ok_or_else(|| ProtocolError::ObjectNotFound(hash.to_hex()))?;
404            Ok(Some(ObjectInfo {
405                id: ObjectId::Hash(hash),
406                obj_type: ObjectType::Blob,
407                size: blob.size() as u64,
408                delta_base: None,
409            }))
410        }
411        StateClosureEvent::Redaction { blob } => {
412            Ok(store.get_redactions_bytes_for_blob(&blob)?.map(|bytes| ObjectInfo {
413                id: ObjectId::Hash(blob),
414                obj_type: ObjectType::Redaction,
415                size: bytes.len() as u64,
416                delta_base: None,
417            }))
418        }
419        StateClosureEvent::StateVisibility { state } => Ok(store
420            .get_state_visibility_bytes_for_state(&state)?
421            .map(|bytes| ObjectInfo {
422                id: ObjectId::ChangeId(state),
423                obj_type: ObjectType::StateVisibility,
424                size: bytes.len() as u64,
425                delta_base: None,
426            })),
427        StateClosureEvent::ExcludedState { id } => {
428            let _ = id;
429            Ok(None)
430        }
431        StateClosureEvent::ExcludedHash { hash } => {
432            let _ = hash;
433            Ok(None)
434        }
435    }
436}
437
438fn planned_object_from_event(
439    store: &impl ObjectStore,
440    event: StateClosureEvent<'_>,
441) -> Result<Option<PlannedObject>> {
442    match event {
443        StateClosureEvent::State { id, .. } => Ok(Some(PlannedObject {
444            id: ObjectId::ChangeId(id),
445            obj_type: ObjectType::State,
446        })),
447        StateClosureEvent::Tree { hash, .. } => Ok(Some(PlannedObject {
448            id: ObjectId::Hash(hash),
449            obj_type: ObjectType::Tree,
450        })),
451        StateClosureEvent::Blob { hash, source } => {
452            if source == BlobSource::StateMetadata && store.get_blob(&hash)?.is_none() {
453                return Err(ProtocolError::ObjectNotFound(hash.to_hex()));
454            }
455            Ok(Some(PlannedObject {
456                id: ObjectId::Hash(hash),
457                obj_type: ObjectType::Blob,
458            }))
459        }
460        StateClosureEvent::Redaction { blob } => Ok(Some(PlannedObject {
461            id: ObjectId::Hash(blob),
462            obj_type: ObjectType::Redaction,
463        })),
464        StateClosureEvent::StateVisibility { state } => Ok(Some(PlannedObject {
465            id: ObjectId::ChangeId(state),
466            obj_type: ObjectType::StateVisibility,
467        })),
468        StateClosureEvent::ExcludedState { id } => {
469            let _ = id;
470            Ok(None)
471        }
472        StateClosureEvent::ExcludedHash { hash } => {
473            let _ = hash;
474            Ok(None)
475        }
476    }
477}
478
479pub fn missing_blobs_in_tree(
480    store: &impl ObjectStore,
481    tree_hash: ContentHash,
482) -> Result<Vec<ContentHash>> {
483    let mut missing = Vec::new();
484    collect_missing_blobs_recursive(store, &tree_hash, &mut missing)?;
485    Ok(missing)
486}
487
488fn collect_missing_blobs_recursive(
489    store: &impl ObjectStore,
490    tree_hash: &ContentHash,
491    missing: &mut Vec<ContentHash>,
492) -> Result<()> {
493    let Some(tree) = store.get_tree(tree_hash).map_err(|err| {
494        ProtocolError::InvalidState(format!(
495            "load tree {} while collecting lazy hydration missing blobs: {err}",
496            tree_hash.to_hex()
497        ))
498    })?
499    else {
500        return Ok(());
501    };
502
503    for entry in tree.entries() {
504        match entry.target() {
505            TreeEntryTarget::Blob { hash, .. } | TreeEntryTarget::Symlink { hash } => {
506                if !store.has_blob(hash).map_err(|err| {
507                    ProtocolError::InvalidState(format!(
508                        "check blob {} while collecting lazy hydration missing blobs: {err}",
509                        hash.to_hex()
510                    ))
511                })? {
512                    missing.push(*hash);
513                }
514            }
515            TreeEntryTarget::Tree { hash } => {
516                collect_missing_blobs_recursive(store, hash, missing)?;
517            }
518            TreeEntryTarget::Gitlink { .. } => {}
519        }
520    }
521    Ok(())
522}
523
524fn collect_excluded(
525    store: &impl ObjectStore,
526    roots: &[ChangeId],
527) -> Result<(HashSet<ChangeId>, HashSet<ContentHash>)> {
528    if roots.is_empty() {
529        return Ok((HashSet::new(), HashSet::new()));
530    }
531
532    let mut excluded_states: HashSet<ChangeId> = HashSet::new();
533    let mut excluded_hashes: HashSet<ContentHash> = HashSet::new();
534    let mut queue: VecDeque<ChangeId> = VecDeque::new();
535
536    for id in roots {
537        queue.push_back(*id);
538    }
539
540    while let Some(id) = queue.pop_front() {
541        if !excluded_states.insert(id) {
542            continue;
543        }
544
545        let state = match store.get_state(&id)? {
546            Some(state) => state,
547            None => continue,
548        };
549
550        for parent in &state.parents {
551            queue.push_back(*parent);
552        }
553
554        collect_tree_hashes(store, state.tree, &mut excluded_hashes)?;
555        if let Some(provenance_root) = state.provenance {
556            collect_tree_hashes(store, provenance_root, &mut excluded_hashes)?;
557        }
558        if let Some(context_root) = state.context {
559            collect_tree_hashes(store, context_root, &mut excluded_hashes)?;
560        }
561        for metadata_blob in state_blob_dependencies(&state) {
562            excluded_hashes.insert(metadata_blob);
563        }
564    }
565
566    Ok((excluded_states, excluded_hashes))
567}
568
569fn state_blob_dependencies(state: &State) -> impl Iterator<Item = ContentHash> + '_ {
570    [
571        state.risk_signals,
572        state.review_signatures,
573        state.discussions,
574        state.structured_conflicts,
575    ]
576    .into_iter()
577    .flatten()
578}
579
580fn collect_tree_hashes(
581    store: &impl ObjectStore,
582    tree_hash: ContentHash,
583    excluded: &mut HashSet<ContentHash>,
584) -> Result<()> {
585    if !excluded.insert(tree_hash) {
586        return Ok(());
587    }
588
589    let tree = match store.get_tree(&tree_hash)? {
590        Some(tree) => tree,
591        None => return Ok(()),
592    };
593
594    for entry in tree.entries() {
595        match entry.target() {
596            TreeEntryTarget::Blob { hash, .. } | TreeEntryTarget::Symlink { hash } => {
597                excluded.insert(*hash);
598            }
599            TreeEntryTarget::Tree { hash } => {
600                collect_tree_hashes(store, *hash, excluded)?;
601            }
602            TreeEntryTarget::Gitlink { .. } => {}
603        }
604    }
605
606    Ok(())
607}
608
609pub fn is_ancestor(
610    store: &impl ObjectStore,
611    ancestor: ChangeId,
612    descendant: ChangeId,
613) -> Result<bool> {
614    if ancestor == descendant {
615        return Ok(true);
616    }
617
618    let mut seen: HashSet<ChangeId> = HashSet::new();
619    let mut queue: VecDeque<ChangeId> = VecDeque::new();
620    queue.push_back(descendant);
621
622    while let Some(id) = queue.pop_front() {
623        if !seen.insert(id) {
624            continue;
625        }
626        let state = match store.get_state(&id)? {
627            Some(s) => s,
628            None => return Ok(false),
629        };
630        for parent in state.parents {
631            if parent == ancestor {
632                return Ok(true);
633            }
634            queue.push_back(parent);
635        }
636    }
637
638    Ok(false)
639}
640
641#[cfg(test)]
642mod tests {
643    use std::collections::HashSet;
644
645    use chrono::Utc;
646    use objects::{
647        object::{
648            Action, ActionId, Attribution, Blob, ChangeId, ContentHash, Discussion,
649            DiscussionResolution, DiscussionTurn, DiscussionsBlob, Principal, Redaction, State,
650            StateVisibility, SymbolAnchor, Tree, TreeEntry, VisibilityTier,
651        },
652        store::{ObjectStore, Result as StoreResult},
653    };
654    use repo::Repository;
655    use sley::ObjectId as GitObjectId;
656    use std::sync::atomic::{AtomicUsize, Ordering};
657    use tempfile::TempDir;
658
659    use super::{
660        ObjectId, ObjectInfo, ObjectType, PlannedObject, StateClosureOptions,
661        enumerate_state_closure_plan_with_options, enumerate_state_closure_transfer_with_options,
662        enumerate_state_closure_with_options, missing_blobs_in_tree,
663    };
664
665    fn pairs_from_full(objects: &[ObjectInfo]) -> HashSet<(ObjectId, ObjectType)> {
666        objects
667            .iter()
668            .map(|info| (info.id.clone(), info.obj_type))
669            .collect()
670    }
671
672    fn pairs_from_plan(objects: &[PlannedObject]) -> HashSet<(ObjectId, ObjectType)> {
673        objects
674            .iter()
675            .map(|info| (info.id.clone(), info.obj_type))
676            .collect()
677    }
678
679    fn object_info_fingerprint(
680        objects: &[ObjectInfo],
681    ) -> Vec<(ObjectId, ObjectType, u64, Option<ContentHash>)> {
682        objects
683            .iter()
684            .map(|info| {
685                (
686                    info.id.clone(),
687                    info.obj_type,
688                    info.size,
689                    info.delta_base,
690                )
691            })
692            .collect()
693    }
694
695    fn assert_plan_parity(
696        repo: &Repository,
697        state_id: ChangeId,
698        options: StateClosureOptions,
699    ) -> HashSet<(ObjectId, ObjectType)> {
700        let full =
701            enumerate_state_closure_with_options(repo.store(), state_id, options.clone()).unwrap();
702        let plan =
703            enumerate_state_closure_plan_with_options(repo.store(), state_id, options).unwrap();
704
705        let full_pairs = pairs_from_full(&full);
706        let plan_pairs = pairs_from_plan(&plan);
707        assert_eq!(full_pairs, plan_pairs);
708        full_pairs
709    }
710
711    fn assert_contains_object(
712        objects: &HashSet<(ObjectId, ObjectType)>,
713        id: ObjectId,
714        obj_type: ObjectType,
715    ) {
716        assert!(
717            objects.contains(&(id.clone(), obj_type)),
718            "expected closure to contain {id:?} as {obj_type:?}: {objects:?}"
719        );
720    }
721
722    struct CountingStore<'a, S> {
723        inner: &'a S,
724        state_reads: AtomicUsize,
725    }
726
727    impl<'a, S> CountingStore<'a, S> {
728        fn new(inner: &'a S) -> Self {
729            Self {
730                inner,
731                state_reads: AtomicUsize::new(0),
732            }
733        }
734
735        fn state_reads(&self) -> usize {
736            self.state_reads.load(Ordering::SeqCst)
737        }
738    }
739
740    impl<S: ObjectStore> ObjectStore for CountingStore<'_, S> {
741        fn get_blob(&self, hash: &ContentHash) -> StoreResult<Option<Blob>> {
742            self.inner.get_blob(hash)
743        }
744
745        fn put_blob(&self, blob: &Blob) -> StoreResult<ContentHash> {
746            self.inner.put_blob(blob)
747        }
748
749        fn has_blob(&self, hash: &ContentHash) -> StoreResult<bool> {
750            self.inner.has_blob(hash)
751        }
752
753        fn get_tree(&self, hash: &ContentHash) -> StoreResult<Option<Tree>> {
754            self.inner.get_tree(hash)
755        }
756
757        fn put_tree(&self, tree: &Tree) -> StoreResult<ContentHash> {
758            self.inner.put_tree(tree)
759        }
760
761        fn has_tree(&self, hash: &ContentHash) -> StoreResult<bool> {
762            self.inner.has_tree(hash)
763        }
764
765        fn get_state(&self, id: &ChangeId) -> StoreResult<Option<State>> {
766            self.state_reads.fetch_add(1, Ordering::SeqCst);
767            self.inner.get_state(id)
768        }
769
770        fn put_state(&self, state: &State) -> StoreResult<()> {
771            self.inner.put_state(state)
772        }
773
774        fn has_state(&self, id: &ChangeId) -> StoreResult<bool> {
775            self.inner.has_state(id)
776        }
777
778        fn list_states(&self) -> StoreResult<Vec<ChangeId>> {
779            self.inner.list_states()
780        }
781
782        fn get_action(&self, id: &ActionId) -> StoreResult<Option<Action>> {
783            self.inner.get_action(id)
784        }
785
786        fn put_action(&self, action: &mut Action) -> StoreResult<ActionId> {
787            self.inner.put_action(action)
788        }
789
790        fn list_actions(&self) -> StoreResult<Vec<ActionId>> {
791            self.inner.list_actions()
792        }
793
794        fn list_blobs(&self) -> StoreResult<Vec<ContentHash>> {
795            self.inner.list_blobs()
796        }
797
798        fn list_trees(&self) -> StoreResult<Vec<ContentHash>> {
799            self.inner.list_trees()
800        }
801    }
802
803    fn test_attribution() -> Attribution {
804        Attribution::human(Principal::new("Graph Tester", "graph@example.com"))
805    }
806
807    #[test]
808    fn lean_closure_planner_matches_object_info_ids_and_types() {
809        let temp = TempDir::new().unwrap();
810        let repo = Repository::init_default(temp.path()).unwrap();
811        std::fs::create_dir_all(temp.path().join("src")).unwrap();
812        std::fs::write(temp.path().join("README.md"), "hello\n").unwrap();
813        std::fs::write(temp.path().join("src/lib.rs"), "pub fn hi() {}\n").unwrap();
814        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();
815
816        let full = enumerate_state_closure_with_options(
817            repo.store(),
818            state.change_id,
819            StateClosureOptions::default(),
820        )
821        .unwrap();
822        let lean = enumerate_state_closure_plan_with_options(
823            repo.store(),
824            state.change_id,
825            StateClosureOptions::default(),
826        )
827        .unwrap();
828
829        let full_pairs = full
830            .into_iter()
831            .map(|info| (info.id, info.obj_type))
832            .collect::<std::collections::HashSet<_>>();
833        let lean_pairs = lean
834            .into_iter()
835            .map(|info| (info.id, info.obj_type))
836            .collect::<std::collections::HashSet<_>>();
837
838        assert_eq!(full_pairs, lean_pairs);
839        assert!(
840            full_pairs
841                .iter()
842                .any(|(id, _)| matches!(id, ObjectId::ChangeId(_)))
843        );
844    }
845
846    #[test]
847    fn transfer_projection_matches_full_and_plan_on_mixed_state_closure_fixture() {
848        let temp = TempDir::new().unwrap();
849        let repo = Repository::init_default(temp.path()).unwrap();
850
851        let excluded_blob = repo
852            .store()
853            .put_blob(&Blob::from("excluded"))
854            .expect("put excluded blob");
855        let excluded_tree_hash = repo
856            .store()
857            .put_tree(&Tree::from_entries(vec![
858                TreeEntry::file("excluded.txt", excluded_blob, false).unwrap(),
859            ]))
860            .expect("put excluded tree");
861        let excluded_parent = State::new(excluded_tree_hash, Vec::new(), test_attribution());
862        repo.store()
863            .put_state(&excluded_parent)
864            .expect("put excluded parent");
865
866        let redacted_blob = repo
867            .store()
868            .put_blob(&Blob::from("secret"))
869            .expect("put redacted blob");
870        let nested_blob = repo
871            .store()
872            .put_blob(&Blob::from("nested"))
873            .expect("put nested blob");
874        let symlink_blob = repo
875            .store()
876            .put_blob(&Blob::from("target"))
877            .expect("put symlink blob");
878        let context_blob = repo
879            .store()
880            .put_blob(&Blob::from("context"))
881            .expect("put context blob");
882        let provenance_blob = repo
883            .store()
884            .put_blob(&Blob::from("provenance"))
885            .expect("put provenance blob");
886        let risk_blob = repo
887            .store()
888            .put_blob(&Blob::from("risk"))
889            .expect("put risk blob");
890        let review_blob = repo
891            .store()
892            .put_blob(&Blob::from("review"))
893            .expect("put review blob");
894        let discussions_blob = repo
895            .store()
896            .put_blob(&Blob::from("discussion"))
897            .expect("put discussion blob");
898        let conflicts_blob = repo
899            .store()
900            .put_blob(&Blob::from("conflicts"))
901            .expect("put conflicts blob");
902
903        let nested_tree_hash = repo
904            .store()
905            .put_tree(&Tree::from_entries(vec![
906                TreeEntry::file("nested.txt", nested_blob, false).unwrap(),
907                TreeEntry::symlink("latest", symlink_blob).unwrap(),
908            ]))
909            .expect("put nested tree");
910        let context_tree_hash = repo
911            .store()
912            .put_tree(&Tree::from_entries(vec![
913                TreeEntry::file("context.txt", context_blob, false).unwrap(),
914            ]))
915            .expect("put context tree");
916        let provenance_tree_hash = repo
917            .store()
918            .put_tree(&Tree::from_entries(vec![
919                TreeEntry::file("lineage.txt", provenance_blob, false).unwrap(),
920            ]))
921            .expect("put provenance tree");
922        let gitlink_target: GitObjectId = "0303030303030303030303030303030303030303"
923            .parse()
924            .expect("git oid");
925        let root_tree_hash = repo
926            .store()
927            .put_tree(&Tree::from_entries(vec![
928                TreeEntry::file("secret.txt", redacted_blob, false).unwrap(),
929                TreeEntry::directory("nested", nested_tree_hash).unwrap(),
930                TreeEntry::gitlink("vendor", gitlink_target).unwrap(),
931            ]))
932            .expect("put root tree");
933        let state = State::new(
934            root_tree_hash,
935            vec![excluded_parent.change_id],
936            test_attribution(),
937        )
938        .with_context(context_tree_hash)
939        .with_provenance(provenance_tree_hash)
940        .with_risk_signals(risk_blob)
941        .with_review_signatures(review_blob)
942        .with_discussions(discussions_blob)
943        .with_structured_conflicts(conflicts_blob);
944        repo.store().put_state(&state).expect("put state");
945
946        repo.put_redaction(Redaction {
947            redacted_blob,
948            state: state.change_id,
949            path: "secret.txt".to_string(),
950            reason: "test leak".to_string(),
951            redactor: Principal::new("Tester", "tester@example.test"),
952            redacted_at: Utc::now(),
953            signature: None,
954            purged_at: None,
955            supersedes: None,
956        })
957        .expect("put redaction");
958        repo.put_state_visibility(StateVisibility {
959            state: state.change_id,
960            tier: VisibilityTier::Restricted {
961                scope_label: "security".to_string(),
962            },
963            embargo_until: None,
964            declarer: Principal::new("Tester", "tester@example.test"),
965            declared_at: Utc::now(),
966            signature: None,
967            supersedes: None,
968        })
969        .expect("put visibility");
970
971        let options = StateClosureOptions {
972            depth: None,
973            exclude_states: vec![excluded_parent.change_id],
974        };
975        let transfer = enumerate_state_closure_transfer_with_options(
976            repo.store(),
977            state.change_id,
978            options.clone(),
979            512,
980        )
981        .expect("transfer projection");
982
983        let full = enumerate_state_closure_with_options(
984            repo.store(),
985            state.change_id,
986            options.clone(),
987        )
988        .expect("full closure");
989        let plan = enumerate_state_closure_plan_with_options(
990            repo.store(),
991            state.change_id,
992            options,
993        )
994        .expect("plan closure");
995        assert_eq!(
996            transfer.full_objects.as_deref().map(object_info_fingerprint),
997            Some(object_info_fingerprint(&full))
998        );
999        assert_eq!(transfer.planned_objects, plan);
1000
1001        let full_pairs = pairs_from_full(&full);
1002        assert_eq!(full_pairs, pairs_from_plan(&plan));
1003        assert_contains_object(
1004            &full_pairs,
1005            ObjectId::ChangeId(state.change_id),
1006            ObjectType::State,
1007        );
1008        assert_contains_object(
1009            &full_pairs,
1010            ObjectId::ChangeId(state.change_id),
1011            ObjectType::StateVisibility,
1012        );
1013        assert_contains_object(&full_pairs, ObjectId::Hash(redacted_blob), ObjectType::Blob);
1014        assert_contains_object(&full_pairs, ObjectId::Hash(redacted_blob), ObjectType::Redaction);
1015        for hash in [
1016            root_tree_hash,
1017            nested_tree_hash,
1018            context_tree_hash,
1019            provenance_tree_hash,
1020        ] {
1021            assert_contains_object(&full_pairs, ObjectId::Hash(hash), ObjectType::Tree);
1022        }
1023        for hash in [
1024            nested_blob,
1025            symlink_blob,
1026            context_blob,
1027            provenance_blob,
1028            risk_blob,
1029            review_blob,
1030            discussions_blob,
1031            conflicts_blob,
1032        ] {
1033            assert_contains_object(&full_pairs, ObjectId::Hash(hash), ObjectType::Blob);
1034        }
1035        assert!(!full_pairs.contains(&(
1036            ObjectId::ChangeId(excluded_parent.change_id),
1037            ObjectType::State
1038        )));
1039        assert!(!full_pairs.contains(&(ObjectId::Hash(excluded_tree_hash), ObjectType::Tree)));
1040        assert!(!full_pairs.contains(&(ObjectId::Hash(excluded_blob), ObjectType::Blob)));
1041    }
1042
1043    #[test]
1044    fn transfer_projection_reads_root_state_once_on_small_transfer() {
1045        let temp = TempDir::new().unwrap();
1046        let repo = Repository::init_default(temp.path()).unwrap();
1047        let blob = repo
1048            .store()
1049            .put_blob(&Blob::from("hello\n"))
1050            .expect("put blob");
1051        let tree_hash = repo
1052            .store()
1053            .put_tree(&Tree::from_entries(vec![
1054                TreeEntry::file("README.md", blob, false).unwrap(),
1055            ]))
1056            .expect("put tree");
1057        let state = State::new(tree_hash, Vec::new(), test_attribution());
1058        repo.store().put_state(&state).expect("put state");
1059        let store = CountingStore::new(repo.store());
1060
1061        let transfer = enumerate_state_closure_transfer_with_options(
1062            &store,
1063            state.change_id,
1064            StateClosureOptions::default(),
1065            512,
1066        )
1067        .expect("transfer projection");
1068
1069        assert!(
1070            !transfer.planned_objects.is_empty(),
1071            "lean projection should be available"
1072        );
1073        assert!(transfer.full_objects.is_some());
1074        assert_eq!(
1075            store.state_reads(),
1076            1,
1077            "small transfer projection must not read the root state through a second closure walk"
1078        );
1079    }
1080
1081    #[test]
1082    fn transfer_projection_drops_full_descriptors_after_threshold() {
1083        let temp = TempDir::new().unwrap();
1084        let repo = Repository::init_default(temp.path()).unwrap();
1085        std::fs::write(temp.path().join("README.md"), "hello\n").unwrap();
1086        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();
1087
1088        let transfer = enumerate_state_closure_transfer_with_options(
1089            repo.store(),
1090            state.change_id,
1091            StateClosureOptions::default(),
1092            0,
1093        )
1094        .expect("transfer projection");
1095
1096        assert!(
1097            !transfer.planned_objects.is_empty(),
1098            "lean projection should still be available over the threshold"
1099        );
1100        assert!(transfer.full_objects.is_none());
1101    }
1102
1103    #[test]
1104    fn depth_and_exclude_options_match_between_full_and_plan() {
1105        let temp = TempDir::new().unwrap();
1106        let repo = Repository::init_default(temp.path()).unwrap();
1107        let path = temp.path().join("story.txt");
1108
1109        std::fs::write(&path, "base\n").unwrap();
1110        let base = repo.snapshot(Some("base".to_string()), None).unwrap();
1111        std::fs::write(&path, "middle\n").unwrap();
1112        let middle = repo.snapshot(Some("middle".to_string()), None).unwrap();
1113        std::fs::write(&path, "tip\n").unwrap();
1114        let tip = repo.snapshot(Some("tip".to_string()), None).unwrap();
1115
1116        let depth_zero = assert_plan_parity(
1117            &repo,
1118            tip.change_id,
1119            StateClosureOptions {
1120                depth: Some(0),
1121                exclude_states: Vec::new(),
1122            },
1123        );
1124        assert!(depth_zero.contains(&(ObjectId::ChangeId(tip.change_id), ObjectType::State)));
1125        assert!(!depth_zero.contains(&(ObjectId::ChangeId(middle.change_id), ObjectType::State)));
1126        assert!(!depth_zero.contains(&(ObjectId::ChangeId(base.change_id), ObjectType::State)));
1127
1128        let depth_one = assert_plan_parity(
1129            &repo,
1130            tip.change_id,
1131            StateClosureOptions {
1132                depth: Some(1),
1133                exclude_states: Vec::new(),
1134            },
1135        );
1136        assert!(depth_one.contains(&(ObjectId::ChangeId(tip.change_id), ObjectType::State)));
1137        assert!(depth_one.contains(&(ObjectId::ChangeId(middle.change_id), ObjectType::State)));
1138        assert!(!depth_one.contains(&(ObjectId::ChangeId(base.change_id), ObjectType::State)));
1139
1140        let exclude_middle = assert_plan_parity(
1141            &repo,
1142            tip.change_id,
1143            StateClosureOptions {
1144                depth: None,
1145                exclude_states: vec![middle.change_id],
1146            },
1147        );
1148        assert!(exclude_middle.contains(&(ObjectId::ChangeId(tip.change_id), ObjectType::State)));
1149        assert!(
1150            !exclude_middle.contains(&(ObjectId::ChangeId(middle.change_id), ObjectType::State))
1151        );
1152        assert!(!exclude_middle.contains(&(ObjectId::ChangeId(base.change_id), ObjectType::State)));
1153    }
1154
1155    #[test]
1156    fn shared_tree_and_blob_references_are_emitted_once() {
1157        let temp = TempDir::new().unwrap();
1158        let repo = Repository::init_default(temp.path()).unwrap();
1159
1160        let shared_blob = Blob::from("shared contents\n");
1161        let shared_blob_hash = repo.store().put_blob(&shared_blob).unwrap();
1162        let shared_tree = Tree::from_entries(vec![
1163            TreeEntry::file("shared.txt", shared_blob_hash, false).unwrap(),
1164        ]);
1165        let shared_tree_hash = repo.store().put_tree(&shared_tree).unwrap();
1166        let root = Tree::from_entries(vec![
1167            TreeEntry::directory("left", shared_tree_hash).unwrap(),
1168            TreeEntry::directory("right", shared_tree_hash).unwrap(),
1169        ]);
1170        let root_hash = repo.store().put_tree(&root).unwrap();
1171        let state = State::new(root_hash, Vec::new(), test_attribution());
1172        repo.store().put_state(&state).unwrap();
1173
1174        let full = enumerate_state_closure_with_options(
1175            repo.store(),
1176            state.change_id,
1177            StateClosureOptions::default(),
1178        )
1179        .unwrap();
1180        let plan = enumerate_state_closure_plan_with_options(
1181            repo.store(),
1182            state.change_id,
1183            StateClosureOptions::default(),
1184        )
1185        .unwrap();
1186
1187        assert_eq!(
1188            pairs_from_full(&full),
1189            pairs_from_plan(&plan),
1190            "full and lean closure enumerators must dedup the same objects"
1191        );
1192
1193        assert_eq!(
1194            full.iter()
1195                .filter(|info| info.id == ObjectId::Hash(root_hash)
1196                    && info.obj_type == ObjectType::Tree)
1197                .count(),
1198            1
1199        );
1200        assert_eq!(
1201            full.iter()
1202                .filter(|info| info.id == ObjectId::Hash(shared_tree_hash)
1203                    && info.obj_type == ObjectType::Tree)
1204                .count(),
1205            1
1206        );
1207        assert_eq!(
1208            full.iter()
1209                .filter(|info| info.id == ObjectId::Hash(shared_blob_hash)
1210                    && info.obj_type == ObjectType::Blob)
1211                .count(),
1212            1
1213        );
1214    }
1215
1216    #[test]
1217    fn state_closure_skips_gitlink_targets() {
1218        let temp = TempDir::new().unwrap();
1219        let repo = Repository::init_default(temp.path()).unwrap();
1220        let target: GitObjectId = "0303030303030303030303030303030303030303"
1221            .parse()
1222            .expect("git oid");
1223        let root = Tree::from_entries(vec![
1224            TreeEntry::gitlink("vendor", target).expect("gitlink entry"),
1225        ]);
1226        let root_hash = repo.store().put_tree(&root).unwrap();
1227        let state = State::new(root_hash, Vec::new(), test_attribution());
1228        repo.store().put_state(&state).unwrap();
1229
1230        let full = enumerate_state_closure_with_options(
1231            repo.store(),
1232            state.change_id,
1233            StateClosureOptions::default(),
1234        )
1235        .unwrap();
1236        let plan = enumerate_state_closure_plan_with_options(
1237            repo.store(),
1238            state.change_id,
1239            StateClosureOptions::default(),
1240        )
1241        .unwrap();
1242
1243        assert_eq!(pairs_from_full(&full), pairs_from_plan(&plan));
1244        assert!(
1245            !full.iter().any(|info| info.obj_type == ObjectType::Blob),
1246            "gitlinks carry foreign Git commit ids, not Heddle blob dependencies: {full:?}"
1247        );
1248        assert!(full.iter().any(|info| {
1249            info.id == ObjectId::Hash(root_hash) && info.obj_type == ObjectType::Tree
1250        }));
1251    }
1252
1253    #[test]
1254    fn missing_blobs_in_tree_skips_gitlinks_and_walks_nested_side_paths() {
1255        let temp = TempDir::new().unwrap();
1256        let repo = Repository::init_default(temp.path()).unwrap();
1257        let present_blob = repo
1258            .store()
1259            .put_blob(&Blob::from("already local"))
1260            .expect("put present blob");
1261        let missing_nested = ContentHash::from_bytes([7; 32]);
1262        let missing_symlink = ContentHash::from_bytes([8; 32]);
1263        let nested_tree = Tree::from_entries(vec![
1264            TreeEntry::file("remote.txt", missing_nested, false).unwrap(),
1265            TreeEntry::symlink("remote-link", missing_symlink).unwrap(),
1266        ]);
1267        let nested_tree_hash = repo.store().put_tree(&nested_tree).expect("put nested tree");
1268        let gitlink_target: GitObjectId = "0404040404040404040404040404040404040404"
1269            .parse()
1270            .expect("git oid");
1271        let root = Tree::from_entries(vec![
1272            TreeEntry::file("local.txt", present_blob, false).unwrap(),
1273            TreeEntry::directory("nested", nested_tree_hash).unwrap(),
1274            TreeEntry::gitlink("vendor", gitlink_target).unwrap(),
1275        ]);
1276        let root_hash = repo.store().put_tree(&root).expect("put root tree");
1277
1278        let missing = missing_blobs_in_tree(repo.store(), root_hash).expect("missing blobs");
1279
1280        assert_eq!(
1281            missing.into_iter().collect::<HashSet<_>>(),
1282            HashSet::from([missing_nested, missing_symlink])
1283        );
1284    }
1285
1286    /// Once a redaction is declared for a blob in a snapshot, the
1287    /// state closure must include an `ObjectType::Redaction` entry
1288    /// keyed on that blob's hash — that's the wire-side signal the
1289    /// receiver replays.
1290    #[test]
1291    fn enumerate_state_closure_emits_redaction_for_redacted_blob() {
1292        let temp = TempDir::new().unwrap();
1293        let repo = Repository::init_default(temp.path()).unwrap();
1294        std::fs::write(temp.path().join("secret.toml"), "api_token = \"x\"\n").unwrap();
1295        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();
1296
1297        // Find the blob hash for secret.toml by walking the snapshot's tree.
1298        let tree = repo
1299            .store()
1300            .get_tree(&state.tree)
1301            .unwrap()
1302            .expect("tree present");
1303        let blob_hash = tree
1304            .iter()
1305            .find(|e| e.name() == "secret.toml")
1306            .expect("entry present")
1307            .blob_hash()
1308            .expect("secret.toml is a blob");
1309
1310        let redaction = Redaction {
1311            redacted_blob: blob_hash,
1312            state: state.change_id,
1313            path: "secret.toml".to_string(),
1314            reason: "test leak".to_string(),
1315            redactor: Principal {
1316                name: "Tester".into(),
1317                email: "tester@heddle.sh".into(),
1318            },
1319            redacted_at: Utc::now(),
1320            signature: None,
1321            purged_at: None,
1322            supersedes: None,
1323        };
1324        repo.put_redaction(redaction).unwrap();
1325
1326        let full = enumerate_state_closure_with_options(
1327            repo.store(),
1328            state.change_id,
1329            StateClosureOptions::default(),
1330        )
1331        .unwrap();
1332        let plan = enumerate_state_closure_plan_with_options(
1333            repo.store(),
1334            state.change_id,
1335            StateClosureOptions::default(),
1336        )
1337        .unwrap();
1338
1339        assert!(
1340            full.iter()
1341                .any(|info| info.obj_type == ObjectType::Redaction
1342                    && info.id == ObjectId::Hash(blob_hash)),
1343            "full closure must include a Redaction entry for the redacted blob"
1344        );
1345        assert!(
1346            plan.iter()
1347                .any(|p| p.obj_type == ObjectType::Redaction && p.id == ObjectId::Hash(blob_hash)),
1348            "plan closure must include a Redaction entry for the redacted blob"
1349        );
1350    }
1351
1352    #[test]
1353    fn enumerate_state_closure_emits_state_visibility_for_visible_state() {
1354        let temp = TempDir::new().unwrap();
1355        let repo = Repository::init_default(temp.path()).unwrap();
1356        std::fs::write(temp.path().join("README.md"), "hello\n").unwrap();
1357        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();
1358
1359        repo.put_state_visibility(StateVisibility {
1360            state: state.change_id,
1361            tier: VisibilityTier::Restricted {
1362                scope_label: "security-embargo".into(),
1363            },
1364            embargo_until: None,
1365            declarer: Principal {
1366                name: "Tester".into(),
1367                email: "tester@heddle.sh".into(),
1368            },
1369            declared_at: Utc::now(),
1370            signature: None,
1371            supersedes: None,
1372        })
1373        .unwrap();
1374
1375        let full = enumerate_state_closure_with_options(
1376            repo.store(),
1377            state.change_id,
1378            StateClosureOptions::default(),
1379        )
1380        .unwrap();
1381        let plan = enumerate_state_closure_plan_with_options(
1382            repo.store(),
1383            state.change_id,
1384            StateClosureOptions::default(),
1385        )
1386        .unwrap();
1387
1388        assert!(
1389            full.iter()
1390                .any(|info| info.obj_type == ObjectType::StateVisibility
1391                    && info.id == ObjectId::ChangeId(state.change_id)),
1392            "full closure must include a StateVisibility entry for the visible state"
1393        );
1394        assert!(
1395            plan.iter()
1396                .any(|p| p.obj_type == ObjectType::StateVisibility
1397                    && p.id == ObjectId::ChangeId(state.change_id)),
1398            "plan closure must include a StateVisibility entry for the visible state"
1399        );
1400    }
1401
1402    #[test]
1403    fn enumerate_state_closure_emits_state_metadata_blobs() {
1404        let temp = TempDir::new().unwrap();
1405        let repo = Repository::init_default(temp.path()).unwrap();
1406        std::fs::write(temp.path().join("README.md"), "hello\n").unwrap();
1407        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();
1408
1409        let principal = Principal::new("Tester", "tester@example.test");
1410        let discussion_bytes = DiscussionsBlob::new(vec![Discussion {
1411            id: "disc-1".to_string(),
1412            anchor: SymbolAnchor::new("src/lib.rs", "answer"),
1413            opened_against_state: state.change_id,
1414            opened_at: 1_782_400_000,
1415            thread_ref: None,
1416            turns: vec![DiscussionTurn {
1417                author: principal,
1418                body: "Should this sync?".to_string(),
1419                posted_at: 1_782_400_000,
1420            }],
1421            resolution: DiscussionResolution::Open,
1422            body_changed_since_open: false,
1423            orphaned: false,
1424            visibility: VisibilityTier::default(),
1425            resolved_annotation_id: None,
1426        }])
1427        .encode()
1428        .expect("encode discussions");
1429        let discussion_hash = repo
1430            .store()
1431            .put_blob(&Blob::new(discussion_bytes))
1432            .expect("put discussions blob");
1433        let risk_hash = repo
1434            .store()
1435            .put_blob(&Blob::from_slice(b"risk signals"))
1436            .expect("put risk blob");
1437        let review_hash = repo
1438            .store()
1439            .put_blob(&Blob::from_slice(b"review signatures"))
1440            .expect("put review blob");
1441        let conflicts_hash = repo
1442            .store()
1443            .put_blob(&Blob::from_slice(b"structured conflicts"))
1444            .expect("put conflicts blob");
1445        let state_with_metadata = state
1446            .with_risk_signals(risk_hash)
1447            .with_review_signatures(review_hash)
1448            .with_discussions(discussion_hash)
1449            .with_structured_conflicts(conflicts_hash);
1450        repo.store()
1451            .put_state(&state_with_metadata)
1452            .expect("put state with metadata");
1453
1454        let full = enumerate_state_closure_with_options(
1455            repo.store(),
1456            state_with_metadata.change_id,
1457            StateClosureOptions::default(),
1458        )
1459        .unwrap();
1460        let plan = enumerate_state_closure_plan_with_options(
1461            repo.store(),
1462            state_with_metadata.change_id,
1463            StateClosureOptions::default(),
1464        )
1465        .unwrap();
1466
1467        for metadata_hash in [risk_hash, review_hash, discussion_hash, conflicts_hash] {
1468            assert!(
1469                full.iter().any(|info| info.obj_type == ObjectType::Blob
1470                    && info.id == ObjectId::Hash(metadata_hash)),
1471                "full closure must include state metadata blob {metadata_hash}"
1472            );
1473            assert!(
1474                plan.iter().any(
1475                    |p| p.obj_type == ObjectType::Blob && p.id == ObjectId::Hash(metadata_hash)
1476                ),
1477                "plan closure must include state metadata blob {metadata_hash}"
1478            );
1479        }
1480    }
1481}