Skip to main content

wire/
object_graph.rs

1// SPDX-License-Identifier: Apache-2.0
2use std::collections::{HashSet, VecDeque};
3
4use objects::{
5    object::{ChangeId, ContentHash, State, TreeEntryTarget},
6    store::{ObjectStore, pack::ObjectType as PackObjectType},
7};
8use serde::{Deserialize, Serialize};
9
10use crate::{ProtocolError, Result};
11
12#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
13pub enum ObjectId {
14    Hash(ContentHash),
15    ChangeId(ChangeId),
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct ObjectInfo {
20    pub id: ObjectId,
21    pub obj_type: ObjectType,
22    pub size: u64,
23    pub delta_base: Option<ContentHash>,
24}
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
27pub struct PlannedObject {
28    pub id: ObjectId,
29    pub obj_type: ObjectType,
30}
31
32#[derive(Debug, Clone)]
33pub struct StateClosureTransferObjects {
34    pub planned_objects: Vec<PlannedObject>,
35    pub full_objects: Option<Vec<ObjectInfo>>,
36}
37
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
39pub enum ObjectType {
40    Blob,
41    Tree,
42    State,
43    Action,
44    /// A `RedactionsBlob` sidecar — the rmp-encoded record(s) declaring
45    /// that a specific blob has been redacted (and possibly purged) by
46    /// an authorized operator. Keyed on the wire by `ObjectId::Hash` of
47    /// the *redacted blob*, since `Repository`'s sidecar store is
48    /// indexed that way.
49    Redaction,
50    /// A `StateVisibilityBlob` sidecar — the rmp-encoded record(s)
51    /// declaring a non-public audience tier for a specific state. Keyed
52    /// on the wire by `ObjectId::ChangeId` of the *state*, since the
53    /// per-state sidecar store is indexed that way. Like `Redaction`, it
54    /// is a sidecar record that lives outside the content-addressed pack
55    /// and ships via the per-object transfer path, not the pack.
56    StateVisibility,
57}
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
60pub enum ObjectTypeBucket {
61    Blob,
62    Tree,
63    State,
64    Action,
65    Redaction,
66    StateVisibility,
67}
68
69impl ObjectType {
70    pub fn wire_name(self) -> &'static str {
71        match self {
72            ObjectType::Blob => "blob",
73            ObjectType::Tree => "tree",
74            ObjectType::State => "state",
75            ObjectType::Action => "action",
76            ObjectType::Redaction => "redaction",
77            ObjectType::StateVisibility => "state_visibility",
78        }
79    }
80
81    pub fn from_wire(value: &str) -> Result<Self> {
82        match value {
83            "blob" => Ok(ObjectType::Blob),
84            "tree" => Ok(ObjectType::Tree),
85            "state" => Ok(ObjectType::State),
86            "action" => Ok(ObjectType::Action),
87            "redaction" => Ok(ObjectType::Redaction),
88            "state_visibility" => Ok(ObjectType::StateVisibility),
89            _ => Err(ProtocolError::InvalidState(format!(
90                "unknown object type: {value}"
91            ))),
92        }
93    }
94
95    pub fn packable(self) -> bool {
96        !matches!(self, ObjectType::Redaction | ObjectType::StateVisibility)
97    }
98
99    pub fn pack_object_type(self) -> Result<PackObjectType> {
100        match self {
101            ObjectType::Blob => Ok(PackObjectType::Blob),
102            ObjectType::Tree => Ok(PackObjectType::Tree),
103            ObjectType::State => Ok(PackObjectType::State),
104            ObjectType::Action => Ok(PackObjectType::Action),
105            ObjectType::Redaction => Err(ProtocolError::InvalidState(
106                "Redaction sidecar records cannot be packed into the content-addressed object pack"
107                    .to_string(),
108            )),
109            ObjectType::StateVisibility => Err(ProtocolError::InvalidState(
110                "StateVisibility sidecar records cannot be packed into the content-addressed object pack"
111                    .to_string(),
112            )),
113        }
114    }
115
116    pub fn bucket(self) -> ObjectTypeBucket {
117        match self {
118            ObjectType::Blob => ObjectTypeBucket::Blob,
119            ObjectType::Tree => ObjectTypeBucket::Tree,
120            ObjectType::State => ObjectTypeBucket::State,
121            ObjectType::Action => ObjectTypeBucket::Action,
122            ObjectType::Redaction => ObjectTypeBucket::Redaction,
123            ObjectType::StateVisibility => ObjectTypeBucket::StateVisibility,
124        }
125    }
126}
127
128#[derive(Debug, Clone, Default)]
129pub struct StateClosureOptions {
130    pub depth: Option<u32>,
131    pub exclude_states: Vec<ChangeId>,
132}
133
134pub fn enumerate_state_closure(
135    store: &impl ObjectStore,
136    state_id: ChangeId,
137) -> Result<Vec<ObjectInfo>> {
138    enumerate_state_closure_with_options(store, state_id, StateClosureOptions::default())
139}
140
141pub fn enumerate_state_closure_with_options(
142    store: &impl ObjectStore,
143    state_id: ChangeId,
144    options: StateClosureOptions,
145) -> Result<Vec<ObjectInfo>> {
146    let mut out = Vec::new();
147    walk_state_closure(store, state_id, options, |event| {
148        if let Some(info) = object_info_from_event(store, event)? {
149            out.push(info);
150        }
151        Ok(())
152    })?;
153
154    Ok(out)
155}
156
157pub fn enumerate_state_closure_plan(
158    store: &impl ObjectStore,
159    state_id: ChangeId,
160) -> Result<Vec<PlannedObject>> {
161    enumerate_state_closure_plan_with_options(store, state_id, StateClosureOptions::default())
162}
163
164pub fn enumerate_state_closure_plan_with_options(
165    store: &impl ObjectStore,
166    state_id: ChangeId,
167    options: StateClosureOptions,
168) -> Result<Vec<PlannedObject>> {
169    let mut out = Vec::new();
170    walk_state_closure(store, state_id, options, |event| {
171        if let Some(object) = planned_object_from_event(store, event)? {
172            out.push(object);
173        }
174        Ok(())
175    })?;
176
177    Ok(out)
178}
179
180pub fn enumerate_state_closure_transfer_with_options(
181    store: &impl ObjectStore,
182    state_id: ChangeId,
183    options: StateClosureOptions,
184    full_descriptor_object_threshold: usize,
185) -> Result<StateClosureTransferObjects> {
186    let mut planned_objects = Vec::new();
187    let mut full_objects = Some(Vec::new());
188
189    walk_state_closure(store, state_id, options, |event| {
190        if let Some(object) = planned_object_from_event(store, event)? {
191            planned_objects.push(object);
192        }
193
194        if full_objects.is_some() && planned_objects.len() > full_descriptor_object_threshold {
195            full_objects = None;
196        }
197        if let Some(objects) = full_objects.as_mut()
198            && let Some(info) = object_info_from_event(store, event)?
199        {
200            objects.push(info);
201        }
202
203        Ok(())
204    })?;
205
206    Ok(StateClosureTransferObjects {
207        planned_objects,
208        full_objects,
209    })
210}
211
212#[derive(Debug, Clone, Copy, PartialEq, Eq)]
213enum BlobSource {
214    Tree,
215    StateMetadata,
216}
217
218#[derive(Debug, Clone, Copy)]
219enum StateClosureEvent<'a> {
220    State { id: ChangeId, state: &'a State },
221    Tree { hash: ContentHash, tree: &'a objects::object::Tree },
222    Blob { hash: ContentHash, source: BlobSource },
223    Redaction { blob: ContentHash },
224    StateVisibility { state: ChangeId },
225    ExcludedState { id: ChangeId },
226    ExcludedHash { hash: ContentHash },
227}
228
229fn walk_state_closure(
230    store: &impl ObjectStore,
231    state_id: ChangeId,
232    options: StateClosureOptions,
233    mut visit: impl for<'event> FnMut(StateClosureEvent<'event>) -> Result<()>,
234) -> Result<()> {
235    let (excluded_states, excluded_hashes) = collect_excluded(store, &options.exclude_states)?;
236
237    let mut seen_states: HashSet<ChangeId> = HashSet::new();
238    let mut seen_hashes: HashSet<ContentHash> = HashSet::new();
239    let mut queue: VecDeque<(ChangeId, u32)> = VecDeque::new();
240    queue.push_back((state_id, 0));
241
242    while let Some((id, depth)) = queue.pop_front() {
243        if excluded_states.contains(&id) {
244            visit(StateClosureEvent::ExcludedState { id })?;
245            continue;
246        }
247        if !seen_states.insert(id) {
248            continue;
249        }
250
251        let state = store
252            .get_state(&id)?
253            .ok_or_else(|| ProtocolError::ObjectNotFound(id.to_string()))?;
254
255        visit(StateClosureEvent::State { id, state: &state })?;
256        if store.has_state_visibility_for_state(&id)? {
257            visit(StateClosureEvent::StateVisibility { state: id })?;
258        }
259
260        if options.depth.map(|max| depth < max).unwrap_or(true) {
261            for parent in &state.parents {
262                queue.push_back((*parent, depth + 1));
263            }
264        }
265
266        walk_tree_closure_filtered(
267            store,
268            state.tree,
269            &excluded_hashes,
270            &mut seen_hashes,
271            &mut visit,
272        )?;
273        if let Some(provenance_root) = state.provenance {
274            walk_tree_closure_filtered(
275                store,
276                provenance_root,
277                &excluded_hashes,
278                &mut seen_hashes,
279                &mut visit,
280            )?;
281        }
282        if let Some(context_root) = state.context {
283            walk_tree_closure_filtered(
284                store,
285                context_root,
286                &excluded_hashes,
287                &mut seen_hashes,
288                &mut visit,
289            )?;
290        }
291        for metadata_blob in state_blob_dependencies(&state) {
292            walk_blob_filtered(
293                store,
294                metadata_blob,
295                BlobSource::StateMetadata,
296                &excluded_hashes,
297                &mut seen_hashes,
298                &mut visit,
299            )?;
300        }
301    }
302
303    Ok(())
304}
305
306fn walk_tree_closure_filtered(
307    store: &impl ObjectStore,
308    tree_hash: ContentHash,
309    excluded: &HashSet<ContentHash>,
310    seen: &mut HashSet<ContentHash>,
311    visit: &mut impl for<'event> FnMut(StateClosureEvent<'event>) -> Result<()>,
312) -> Result<()> {
313    if excluded.contains(&tree_hash) {
314        visit(StateClosureEvent::ExcludedHash { hash: tree_hash })?;
315        return Ok(());
316    }
317    if !seen.insert(tree_hash) {
318        return Ok(());
319    }
320
321    let tree = store
322        .get_tree(&tree_hash)?
323        .ok_or_else(|| ProtocolError::ObjectNotFound(tree_hash.to_hex()))?;
324
325    visit(StateClosureEvent::Tree {
326        hash: tree_hash,
327        tree: &tree,
328    })?;
329
330    for entry in tree.entries() {
331        match entry.target() {
332            TreeEntryTarget::Blob { hash, .. } | TreeEntryTarget::Symlink { hash } => {
333                walk_blob_filtered(
334                    store,
335                    *hash,
336                    BlobSource::Tree,
337                    excluded,
338                    seen,
339                    visit,
340                )?;
341            }
342            TreeEntryTarget::Tree { hash } => {
343                walk_tree_closure_filtered(store, *hash, excluded, seen, visit)?;
344            }
345            TreeEntryTarget::Gitlink { .. } => {}
346            // Native child-spool edge: its target lives in a separate spool
347            // object graph, not this store, so it is not walked here.
348            TreeEntryTarget::Spoollink { .. } => {}
349        }
350    }
351
352    Ok(())
353}
354
355fn walk_blob_filtered(
356    store: &impl ObjectStore,
357    blob_hash: ContentHash,
358    source: BlobSource,
359    excluded: &HashSet<ContentHash>,
360    seen: &mut HashSet<ContentHash>,
361    visit: &mut impl for<'event> FnMut(StateClosureEvent<'event>) -> Result<()>,
362) -> Result<()> {
363    if excluded.contains(&blob_hash) {
364        visit(StateClosureEvent::ExcludedHash { hash: blob_hash })?;
365        return Ok(());
366    }
367    if !seen.insert(blob_hash) {
368        return Ok(());
369    }
370    visit(StateClosureEvent::Blob {
371        hash: blob_hash,
372        source,
373    })?;
374    if store.has_redactions_for_blob(&blob_hash)? {
375        visit(StateClosureEvent::Redaction { blob: blob_hash })?;
376    }
377    Ok(())
378}
379
380fn object_info_from_event(
381    store: &impl ObjectStore,
382    event: StateClosureEvent<'_>,
383) -> Result<Option<ObjectInfo>> {
384    match event {
385        StateClosureEvent::State { id, state } => {
386            let state_bytes = rmp_serde::to_vec_named(state)?;
387            Ok(Some(ObjectInfo {
388                id: ObjectId::ChangeId(id),
389                obj_type: ObjectType::State,
390                size: state_bytes.len() as u64,
391                delta_base: None,
392            }))
393        }
394        StateClosureEvent::Tree { hash, tree } => {
395            let tree_bytes = rmp_serde::to_vec_named(tree)?;
396            Ok(Some(ObjectInfo {
397                id: ObjectId::Hash(hash),
398                obj_type: ObjectType::Tree,
399                size: tree_bytes.len() as u64,
400                delta_base: None,
401            }))
402        }
403        StateClosureEvent::Blob { hash, .. } => {
404            let blob = store
405                .get_blob(&hash)?
406                .ok_or_else(|| ProtocolError::ObjectNotFound(hash.to_hex()))?;
407            Ok(Some(ObjectInfo {
408                id: ObjectId::Hash(hash),
409                obj_type: ObjectType::Blob,
410                size: blob.size() as u64,
411                delta_base: None,
412            }))
413        }
414        StateClosureEvent::Redaction { blob } => {
415            Ok(store.get_redactions_bytes_for_blob(&blob)?.map(|bytes| ObjectInfo {
416                id: ObjectId::Hash(blob),
417                obj_type: ObjectType::Redaction,
418                size: bytes.len() as u64,
419                delta_base: None,
420            }))
421        }
422        StateClosureEvent::StateVisibility { state } => Ok(store
423            .get_state_visibility_bytes_for_state(&state)?
424            .map(|bytes| ObjectInfo {
425                id: ObjectId::ChangeId(state),
426                obj_type: ObjectType::StateVisibility,
427                size: bytes.len() as u64,
428                delta_base: None,
429            })),
430        StateClosureEvent::ExcludedState { id } => {
431            let _ = id;
432            Ok(None)
433        }
434        StateClosureEvent::ExcludedHash { hash } => {
435            let _ = hash;
436            Ok(None)
437        }
438    }
439}
440
441fn planned_object_from_event(
442    store: &impl ObjectStore,
443    event: StateClosureEvent<'_>,
444) -> Result<Option<PlannedObject>> {
445    match event {
446        StateClosureEvent::State { id, .. } => Ok(Some(PlannedObject {
447            id: ObjectId::ChangeId(id),
448            obj_type: ObjectType::State,
449        })),
450        StateClosureEvent::Tree { hash, .. } => Ok(Some(PlannedObject {
451            id: ObjectId::Hash(hash),
452            obj_type: ObjectType::Tree,
453        })),
454        StateClosureEvent::Blob { hash, source } => {
455            if source == BlobSource::StateMetadata && store.get_blob(&hash)?.is_none() {
456                return Err(ProtocolError::ObjectNotFound(hash.to_hex()));
457            }
458            Ok(Some(PlannedObject {
459                id: ObjectId::Hash(hash),
460                obj_type: ObjectType::Blob,
461            }))
462        }
463        StateClosureEvent::Redaction { blob } => Ok(Some(PlannedObject {
464            id: ObjectId::Hash(blob),
465            obj_type: ObjectType::Redaction,
466        })),
467        StateClosureEvent::StateVisibility { state } => Ok(Some(PlannedObject {
468            id: ObjectId::ChangeId(state),
469            obj_type: ObjectType::StateVisibility,
470        })),
471        StateClosureEvent::ExcludedState { id } => {
472            let _ = id;
473            Ok(None)
474        }
475        StateClosureEvent::ExcludedHash { hash } => {
476            let _ = hash;
477            Ok(None)
478        }
479    }
480}
481
482pub fn missing_blobs_in_tree(
483    store: &impl ObjectStore,
484    tree_hash: ContentHash,
485) -> Result<Vec<ContentHash>> {
486    let mut missing = Vec::new();
487    collect_missing_blobs_recursive(store, &tree_hash, &mut missing)?;
488    Ok(missing)
489}
490
491fn collect_missing_blobs_recursive(
492    store: &impl ObjectStore,
493    tree_hash: &ContentHash,
494    missing: &mut Vec<ContentHash>,
495) -> Result<()> {
496    let Some(tree) = store.get_tree(tree_hash).map_err(|err| {
497        ProtocolError::InvalidState(format!(
498            "load tree {} while collecting lazy hydration missing blobs: {err}",
499            tree_hash.to_hex()
500        ))
501    })?
502    else {
503        return Ok(());
504    };
505
506    for entry in tree.entries() {
507        match entry.target() {
508            TreeEntryTarget::Blob { hash, .. } | TreeEntryTarget::Symlink { hash } => {
509                if !store.has_blob(hash).map_err(|err| {
510                    ProtocolError::InvalidState(format!(
511                        "check blob {} while collecting lazy hydration missing blobs: {err}",
512                        hash.to_hex()
513                    ))
514                })? {
515                    missing.push(*hash);
516                }
517            }
518            TreeEntryTarget::Tree { hash } => {
519                collect_missing_blobs_recursive(store, hash, missing)?;
520            }
521            TreeEntryTarget::Gitlink { .. } => {}
522            // Native child-spool edge: its target lives in a separate spool
523            // object graph, not this store, so it is not walked here.
524            TreeEntryTarget::Spoollink { .. } => {}
525        }
526    }
527    Ok(())
528}
529
530fn collect_excluded(
531    store: &impl ObjectStore,
532    roots: &[ChangeId],
533) -> Result<(HashSet<ChangeId>, HashSet<ContentHash>)> {
534    if roots.is_empty() {
535        return Ok((HashSet::new(), HashSet::new()));
536    }
537
538    let mut excluded_states: HashSet<ChangeId> = HashSet::new();
539    let mut excluded_hashes: HashSet<ContentHash> = HashSet::new();
540    let mut queue: VecDeque<ChangeId> = VecDeque::new();
541
542    for id in roots {
543        queue.push_back(*id);
544    }
545
546    while let Some(id) = queue.pop_front() {
547        if !excluded_states.insert(id) {
548            continue;
549        }
550
551        let state = match store.get_state(&id)? {
552            Some(state) => state,
553            None => continue,
554        };
555
556        for parent in &state.parents {
557            queue.push_back(*parent);
558        }
559
560        collect_tree_hashes(store, state.tree, &mut excluded_hashes)?;
561        if let Some(provenance_root) = state.provenance {
562            collect_tree_hashes(store, provenance_root, &mut excluded_hashes)?;
563        }
564        if let Some(context_root) = state.context {
565            collect_tree_hashes(store, context_root, &mut excluded_hashes)?;
566        }
567        for metadata_blob in state_blob_dependencies(&state) {
568            excluded_hashes.insert(metadata_blob);
569        }
570    }
571
572    Ok((excluded_states, excluded_hashes))
573}
574
575fn state_blob_dependencies(state: &State) -> impl Iterator<Item = ContentHash> + '_ {
576    [
577        state.risk_signals,
578        state.review_signatures,
579        state.discussions,
580        state.structured_conflicts,
581    ]
582    .into_iter()
583    .flatten()
584}
585
586fn collect_tree_hashes(
587    store: &impl ObjectStore,
588    tree_hash: ContentHash,
589    excluded: &mut HashSet<ContentHash>,
590) -> Result<()> {
591    if !excluded.insert(tree_hash) {
592        return Ok(());
593    }
594
595    let tree = match store.get_tree(&tree_hash)? {
596        Some(tree) => tree,
597        None => return Ok(()),
598    };
599
600    for entry in tree.entries() {
601        match entry.target() {
602            TreeEntryTarget::Blob { hash, .. } | TreeEntryTarget::Symlink { hash } => {
603                excluded.insert(*hash);
604            }
605            TreeEntryTarget::Tree { hash } => {
606                collect_tree_hashes(store, *hash, excluded)?;
607            }
608            TreeEntryTarget::Gitlink { .. } => {}
609            // Native child-spool edge: its target lives in a separate spool
610            // object graph, not this store, so it is not walked here.
611            TreeEntryTarget::Spoollink { .. } => {}
612        }
613    }
614
615    Ok(())
616}
617
618pub fn is_ancestor(
619    store: &impl ObjectStore,
620    ancestor: ChangeId,
621    descendant: ChangeId,
622) -> Result<bool> {
623    if ancestor == descendant {
624        return Ok(true);
625    }
626
627    let mut seen: HashSet<ChangeId> = HashSet::new();
628    let mut queue: VecDeque<ChangeId> = VecDeque::new();
629    queue.push_back(descendant);
630
631    while let Some(id) = queue.pop_front() {
632        if !seen.insert(id) {
633            continue;
634        }
635        let state = match store.get_state(&id)? {
636            Some(s) => s,
637            None => return Ok(false),
638        };
639        for parent in state.parents {
640            if parent == ancestor {
641                return Ok(true);
642            }
643            queue.push_back(parent);
644        }
645    }
646
647    Ok(false)
648}
649
650#[cfg(test)]
651mod tests {
652    use std::collections::HashSet;
653
654    use chrono::Utc;
655    use objects::{
656        object::{
657            Action, ActionId, Attribution, Blob, ChangeId, ContentHash, Discussion,
658            DiscussionResolution, DiscussionTurn, DiscussionsBlob, Principal, Redaction, State,
659            StateVisibility, SymbolAnchor, Tree, TreeEntry, VisibilityTier,
660        },
661        store::{ObjectStore, Result as StoreResult},
662    };
663    use repo::Repository;
664    use sley::ObjectId as GitObjectId;
665    use std::sync::atomic::{AtomicUsize, Ordering};
666    use tempfile::TempDir;
667
668    use super::{
669        ObjectId, ObjectInfo, ObjectType, PlannedObject, StateClosureOptions,
670        enumerate_state_closure_plan_with_options, enumerate_state_closure_transfer_with_options,
671        enumerate_state_closure_with_options, missing_blobs_in_tree,
672    };
673
674    fn pairs_from_full(objects: &[ObjectInfo]) -> HashSet<(ObjectId, ObjectType)> {
675        objects
676            .iter()
677            .map(|info| (info.id.clone(), info.obj_type))
678            .collect()
679    }
680
681    fn pairs_from_plan(objects: &[PlannedObject]) -> HashSet<(ObjectId, ObjectType)> {
682        objects
683            .iter()
684            .map(|info| (info.id.clone(), info.obj_type))
685            .collect()
686    }
687
688    fn object_info_fingerprint(
689        objects: &[ObjectInfo],
690    ) -> Vec<(ObjectId, ObjectType, u64, Option<ContentHash>)> {
691        objects
692            .iter()
693            .map(|info| {
694                (
695                    info.id.clone(),
696                    info.obj_type,
697                    info.size,
698                    info.delta_base,
699                )
700            })
701            .collect()
702    }
703
704    fn assert_plan_parity(
705        repo: &Repository,
706        state_id: ChangeId,
707        options: StateClosureOptions,
708    ) -> HashSet<(ObjectId, ObjectType)> {
709        let full =
710            enumerate_state_closure_with_options(repo.store(), state_id, options.clone()).unwrap();
711        let plan =
712            enumerate_state_closure_plan_with_options(repo.store(), state_id, options).unwrap();
713
714        let full_pairs = pairs_from_full(&full);
715        let plan_pairs = pairs_from_plan(&plan);
716        assert_eq!(full_pairs, plan_pairs);
717        full_pairs
718    }
719
720    fn assert_contains_object(
721        objects: &HashSet<(ObjectId, ObjectType)>,
722        id: ObjectId,
723        obj_type: ObjectType,
724    ) {
725        assert!(
726            objects.contains(&(id.clone(), obj_type)),
727            "expected closure to contain {id:?} as {obj_type:?}: {objects:?}"
728        );
729    }
730
731    struct CountingStore<'a, S> {
732        inner: &'a S,
733        state_reads: AtomicUsize,
734    }
735
736    impl<'a, S> CountingStore<'a, S> {
737        fn new(inner: &'a S) -> Self {
738            Self {
739                inner,
740                state_reads: AtomicUsize::new(0),
741            }
742        }
743
744        fn state_reads(&self) -> usize {
745            self.state_reads.load(Ordering::SeqCst)
746        }
747    }
748
749    impl<S: ObjectStore> ObjectStore for CountingStore<'_, S> {
750        fn get_blob(&self, hash: &ContentHash) -> StoreResult<Option<Blob>> {
751            self.inner.get_blob(hash)
752        }
753
754        fn put_blob(&self, blob: &Blob) -> StoreResult<ContentHash> {
755            self.inner.put_blob(blob)
756        }
757
758        fn has_blob(&self, hash: &ContentHash) -> StoreResult<bool> {
759            self.inner.has_blob(hash)
760        }
761
762        fn get_tree(&self, hash: &ContentHash) -> StoreResult<Option<Tree>> {
763            self.inner.get_tree(hash)
764        }
765
766        fn put_tree(&self, tree: &Tree) -> StoreResult<ContentHash> {
767            self.inner.put_tree(tree)
768        }
769
770        fn has_tree(&self, hash: &ContentHash) -> StoreResult<bool> {
771            self.inner.has_tree(hash)
772        }
773
774        fn get_state(&self, id: &ChangeId) -> StoreResult<Option<State>> {
775            self.state_reads.fetch_add(1, Ordering::SeqCst);
776            self.inner.get_state(id)
777        }
778
779        fn put_state(&self, state: &State) -> StoreResult<()> {
780            self.inner.put_state(state)
781        }
782
783        fn has_state(&self, id: &ChangeId) -> StoreResult<bool> {
784            self.inner.has_state(id)
785        }
786
787        fn list_states(&self) -> StoreResult<Vec<ChangeId>> {
788            self.inner.list_states()
789        }
790
791        fn get_action(&self, id: &ActionId) -> StoreResult<Option<Action>> {
792            self.inner.get_action(id)
793        }
794
795        fn put_action(&self, action: &mut Action) -> StoreResult<ActionId> {
796            self.inner.put_action(action)
797        }
798
799        fn list_actions(&self) -> StoreResult<Vec<ActionId>> {
800            self.inner.list_actions()
801        }
802
803        fn list_blobs(&self) -> StoreResult<Vec<ContentHash>> {
804            self.inner.list_blobs()
805        }
806
807        fn list_trees(&self) -> StoreResult<Vec<ContentHash>> {
808            self.inner.list_trees()
809        }
810    }
811
812    fn test_attribution() -> Attribution {
813        Attribution::human(Principal::new("Graph Tester", "graph@example.com"))
814    }
815
816    #[test]
817    fn lean_closure_planner_matches_object_info_ids_and_types() {
818        let temp = TempDir::new().unwrap();
819        let repo = Repository::init_default(temp.path()).unwrap();
820        std::fs::create_dir_all(temp.path().join("src")).unwrap();
821        std::fs::write(temp.path().join("README.md"), "hello\n").unwrap();
822        std::fs::write(temp.path().join("src/lib.rs"), "pub fn hi() {}\n").unwrap();
823        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();
824
825        let full = enumerate_state_closure_with_options(
826            repo.store(),
827            state.change_id,
828            StateClosureOptions::default(),
829        )
830        .unwrap();
831        let lean = enumerate_state_closure_plan_with_options(
832            repo.store(),
833            state.change_id,
834            StateClosureOptions::default(),
835        )
836        .unwrap();
837
838        let full_pairs = full
839            .into_iter()
840            .map(|info| (info.id, info.obj_type))
841            .collect::<std::collections::HashSet<_>>();
842        let lean_pairs = lean
843            .into_iter()
844            .map(|info| (info.id, info.obj_type))
845            .collect::<std::collections::HashSet<_>>();
846
847        assert_eq!(full_pairs, lean_pairs);
848        assert!(
849            full_pairs
850                .iter()
851                .any(|(id, _)| matches!(id, ObjectId::ChangeId(_)))
852        );
853    }
854
855    #[test]
856    fn transfer_projection_matches_full_and_plan_on_mixed_state_closure_fixture() {
857        let temp = TempDir::new().unwrap();
858        let repo = Repository::init_default(temp.path()).unwrap();
859
860        let excluded_blob = repo
861            .store()
862            .put_blob(&Blob::from("excluded"))
863            .expect("put excluded blob");
864        let excluded_tree_hash = repo
865            .store()
866            .put_tree(&Tree::from_entries(vec![
867                TreeEntry::file("excluded.txt", excluded_blob, false).unwrap(),
868            ]))
869            .expect("put excluded tree");
870        let excluded_parent = State::new(excluded_tree_hash, Vec::new(), test_attribution());
871        repo.store()
872            .put_state(&excluded_parent)
873            .expect("put excluded parent");
874
875        let redacted_blob = repo
876            .store()
877            .put_blob(&Blob::from("secret"))
878            .expect("put redacted blob");
879        let nested_blob = repo
880            .store()
881            .put_blob(&Blob::from("nested"))
882            .expect("put nested blob");
883        let symlink_blob = repo
884            .store()
885            .put_blob(&Blob::from("target"))
886            .expect("put symlink blob");
887        let context_blob = repo
888            .store()
889            .put_blob(&Blob::from("context"))
890            .expect("put context blob");
891        let provenance_blob = repo
892            .store()
893            .put_blob(&Blob::from("provenance"))
894            .expect("put provenance blob");
895        let risk_blob = repo
896            .store()
897            .put_blob(&Blob::from("risk"))
898            .expect("put risk blob");
899        let review_blob = repo
900            .store()
901            .put_blob(&Blob::from("review"))
902            .expect("put review blob");
903        let discussions_blob = repo
904            .store()
905            .put_blob(&Blob::from("discussion"))
906            .expect("put discussion blob");
907        let conflicts_blob = repo
908            .store()
909            .put_blob(&Blob::from("conflicts"))
910            .expect("put conflicts blob");
911
912        let nested_tree_hash = repo
913            .store()
914            .put_tree(&Tree::from_entries(vec![
915                TreeEntry::file("nested.txt", nested_blob, false).unwrap(),
916                TreeEntry::symlink("latest", symlink_blob).unwrap(),
917            ]))
918            .expect("put nested tree");
919        let context_tree_hash = repo
920            .store()
921            .put_tree(&Tree::from_entries(vec![
922                TreeEntry::file("context.txt", context_blob, false).unwrap(),
923            ]))
924            .expect("put context tree");
925        let provenance_tree_hash = repo
926            .store()
927            .put_tree(&Tree::from_entries(vec![
928                TreeEntry::file("lineage.txt", provenance_blob, false).unwrap(),
929            ]))
930            .expect("put provenance tree");
931        let gitlink_target: GitObjectId = "0303030303030303030303030303030303030303"
932            .parse()
933            .expect("git oid");
934        let root_tree_hash = repo
935            .store()
936            .put_tree(&Tree::from_entries(vec![
937                TreeEntry::file("secret.txt", redacted_blob, false).unwrap(),
938                TreeEntry::directory("nested", nested_tree_hash).unwrap(),
939                TreeEntry::gitlink("vendor", gitlink_target).unwrap(),
940            ]))
941            .expect("put root tree");
942        let state = State::new(
943            root_tree_hash,
944            vec![excluded_parent.change_id],
945            test_attribution(),
946        )
947        .with_context(context_tree_hash)
948        .with_provenance(provenance_tree_hash)
949        .with_risk_signals(risk_blob)
950        .with_review_signatures(review_blob)
951        .with_discussions(discussions_blob)
952        .with_structured_conflicts(conflicts_blob);
953        repo.store().put_state(&state).expect("put state");
954
955        repo.put_redaction(Redaction {
956            redacted_blob,
957            state: state.change_id,
958            path: "secret.txt".to_string(),
959            reason: "test leak".to_string(),
960            redactor: Principal::new("Tester", "tester@example.test"),
961            redacted_at: Utc::now(),
962            signature: None,
963            purged_at: None,
964            supersedes: None,
965        })
966        .expect("put redaction");
967        repo.put_state_visibility(StateVisibility {
968            state: state.change_id,
969            tier: VisibilityTier::Restricted {
970                scope_label: "security".to_string(),
971            },
972            embargo_until: None,
973            declarer: Principal::new("Tester", "tester@example.test"),
974            declared_at: Utc::now(),
975            signature: None,
976            supersedes: None,
977        })
978        .expect("put visibility");
979
980        let options = StateClosureOptions {
981            depth: None,
982            exclude_states: vec![excluded_parent.change_id],
983        };
984        let transfer = enumerate_state_closure_transfer_with_options(
985            repo.store(),
986            state.change_id,
987            options.clone(),
988            512,
989        )
990        .expect("transfer projection");
991
992        let full = enumerate_state_closure_with_options(
993            repo.store(),
994            state.change_id,
995            options.clone(),
996        )
997        .expect("full closure");
998        let plan = enumerate_state_closure_plan_with_options(
999            repo.store(),
1000            state.change_id,
1001            options,
1002        )
1003        .expect("plan closure");
1004        assert_eq!(
1005            transfer.full_objects.as_deref().map(object_info_fingerprint),
1006            Some(object_info_fingerprint(&full))
1007        );
1008        assert_eq!(transfer.planned_objects, plan);
1009
1010        let full_pairs = pairs_from_full(&full);
1011        assert_eq!(full_pairs, pairs_from_plan(&plan));
1012        assert_contains_object(
1013            &full_pairs,
1014            ObjectId::ChangeId(state.change_id),
1015            ObjectType::State,
1016        );
1017        assert_contains_object(
1018            &full_pairs,
1019            ObjectId::ChangeId(state.change_id),
1020            ObjectType::StateVisibility,
1021        );
1022        assert_contains_object(&full_pairs, ObjectId::Hash(redacted_blob), ObjectType::Blob);
1023        assert_contains_object(&full_pairs, ObjectId::Hash(redacted_blob), ObjectType::Redaction);
1024        for hash in [
1025            root_tree_hash,
1026            nested_tree_hash,
1027            context_tree_hash,
1028            provenance_tree_hash,
1029        ] {
1030            assert_contains_object(&full_pairs, ObjectId::Hash(hash), ObjectType::Tree);
1031        }
1032        for hash in [
1033            nested_blob,
1034            symlink_blob,
1035            context_blob,
1036            provenance_blob,
1037            risk_blob,
1038            review_blob,
1039            discussions_blob,
1040            conflicts_blob,
1041        ] {
1042            assert_contains_object(&full_pairs, ObjectId::Hash(hash), ObjectType::Blob);
1043        }
1044        assert!(!full_pairs.contains(&(
1045            ObjectId::ChangeId(excluded_parent.change_id),
1046            ObjectType::State
1047        )));
1048        assert!(!full_pairs.contains(&(ObjectId::Hash(excluded_tree_hash), ObjectType::Tree)));
1049        assert!(!full_pairs.contains(&(ObjectId::Hash(excluded_blob), ObjectType::Blob)));
1050    }
1051
1052    #[test]
1053    fn transfer_projection_reads_root_state_once_on_small_transfer() {
1054        let temp = TempDir::new().unwrap();
1055        let repo = Repository::init_default(temp.path()).unwrap();
1056        let blob = repo
1057            .store()
1058            .put_blob(&Blob::from("hello\n"))
1059            .expect("put blob");
1060        let tree_hash = repo
1061            .store()
1062            .put_tree(&Tree::from_entries(vec![
1063                TreeEntry::file("README.md", blob, false).unwrap(),
1064            ]))
1065            .expect("put tree");
1066        let state = State::new(tree_hash, Vec::new(), test_attribution());
1067        repo.store().put_state(&state).expect("put state");
1068        let store = CountingStore::new(repo.store());
1069
1070        let transfer = enumerate_state_closure_transfer_with_options(
1071            &store,
1072            state.change_id,
1073            StateClosureOptions::default(),
1074            512,
1075        )
1076        .expect("transfer projection");
1077
1078        assert!(
1079            !transfer.planned_objects.is_empty(),
1080            "lean projection should be available"
1081        );
1082        assert!(transfer.full_objects.is_some());
1083        assert_eq!(
1084            store.state_reads(),
1085            1,
1086            "small transfer projection must not read the root state through a second closure walk"
1087        );
1088    }
1089
1090    #[test]
1091    fn transfer_projection_drops_full_descriptors_after_threshold() {
1092        let temp = TempDir::new().unwrap();
1093        let repo = Repository::init_default(temp.path()).unwrap();
1094        std::fs::write(temp.path().join("README.md"), "hello\n").unwrap();
1095        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();
1096
1097        let transfer = enumerate_state_closure_transfer_with_options(
1098            repo.store(),
1099            state.change_id,
1100            StateClosureOptions::default(),
1101            0,
1102        )
1103        .expect("transfer projection");
1104
1105        assert!(
1106            !transfer.planned_objects.is_empty(),
1107            "lean projection should still be available over the threshold"
1108        );
1109        assert!(transfer.full_objects.is_none());
1110    }
1111
1112    #[test]
1113    fn depth_and_exclude_options_match_between_full_and_plan() {
1114        let temp = TempDir::new().unwrap();
1115        let repo = Repository::init_default(temp.path()).unwrap();
1116        let path = temp.path().join("story.txt");
1117
1118        std::fs::write(&path, "base\n").unwrap();
1119        let base = repo.snapshot(Some("base".to_string()), None).unwrap();
1120        std::fs::write(&path, "middle\n").unwrap();
1121        let middle = repo.snapshot(Some("middle".to_string()), None).unwrap();
1122        std::fs::write(&path, "tip\n").unwrap();
1123        let tip = repo.snapshot(Some("tip".to_string()), None).unwrap();
1124
1125        let depth_zero = assert_plan_parity(
1126            &repo,
1127            tip.change_id,
1128            StateClosureOptions {
1129                depth: Some(0),
1130                exclude_states: Vec::new(),
1131            },
1132        );
1133        assert!(depth_zero.contains(&(ObjectId::ChangeId(tip.change_id), ObjectType::State)));
1134        assert!(!depth_zero.contains(&(ObjectId::ChangeId(middle.change_id), ObjectType::State)));
1135        assert!(!depth_zero.contains(&(ObjectId::ChangeId(base.change_id), ObjectType::State)));
1136
1137        let depth_one = assert_plan_parity(
1138            &repo,
1139            tip.change_id,
1140            StateClosureOptions {
1141                depth: Some(1),
1142                exclude_states: Vec::new(),
1143            },
1144        );
1145        assert!(depth_one.contains(&(ObjectId::ChangeId(tip.change_id), ObjectType::State)));
1146        assert!(depth_one.contains(&(ObjectId::ChangeId(middle.change_id), ObjectType::State)));
1147        assert!(!depth_one.contains(&(ObjectId::ChangeId(base.change_id), ObjectType::State)));
1148
1149        let exclude_middle = assert_plan_parity(
1150            &repo,
1151            tip.change_id,
1152            StateClosureOptions {
1153                depth: None,
1154                exclude_states: vec![middle.change_id],
1155            },
1156        );
1157        assert!(exclude_middle.contains(&(ObjectId::ChangeId(tip.change_id), ObjectType::State)));
1158        assert!(
1159            !exclude_middle.contains(&(ObjectId::ChangeId(middle.change_id), ObjectType::State))
1160        );
1161        assert!(!exclude_middle.contains(&(ObjectId::ChangeId(base.change_id), ObjectType::State)));
1162    }
1163
1164    #[test]
1165    fn shared_tree_and_blob_references_are_emitted_once() {
1166        let temp = TempDir::new().unwrap();
1167        let repo = Repository::init_default(temp.path()).unwrap();
1168
1169        let shared_blob = Blob::from("shared contents\n");
1170        let shared_blob_hash = repo.store().put_blob(&shared_blob).unwrap();
1171        let shared_tree = Tree::from_entries(vec![
1172            TreeEntry::file("shared.txt", shared_blob_hash, false).unwrap(),
1173        ]);
1174        let shared_tree_hash = repo.store().put_tree(&shared_tree).unwrap();
1175        let root = Tree::from_entries(vec![
1176            TreeEntry::directory("left", shared_tree_hash).unwrap(),
1177            TreeEntry::directory("right", shared_tree_hash).unwrap(),
1178        ]);
1179        let root_hash = repo.store().put_tree(&root).unwrap();
1180        let state = State::new(root_hash, Vec::new(), test_attribution());
1181        repo.store().put_state(&state).unwrap();
1182
1183        let full = enumerate_state_closure_with_options(
1184            repo.store(),
1185            state.change_id,
1186            StateClosureOptions::default(),
1187        )
1188        .unwrap();
1189        let plan = enumerate_state_closure_plan_with_options(
1190            repo.store(),
1191            state.change_id,
1192            StateClosureOptions::default(),
1193        )
1194        .unwrap();
1195
1196        assert_eq!(
1197            pairs_from_full(&full),
1198            pairs_from_plan(&plan),
1199            "full and lean closure enumerators must dedup the same objects"
1200        );
1201
1202        assert_eq!(
1203            full.iter()
1204                .filter(|info| info.id == ObjectId::Hash(root_hash)
1205                    && info.obj_type == ObjectType::Tree)
1206                .count(),
1207            1
1208        );
1209        assert_eq!(
1210            full.iter()
1211                .filter(|info| info.id == ObjectId::Hash(shared_tree_hash)
1212                    && info.obj_type == ObjectType::Tree)
1213                .count(),
1214            1
1215        );
1216        assert_eq!(
1217            full.iter()
1218                .filter(|info| info.id == ObjectId::Hash(shared_blob_hash)
1219                    && info.obj_type == ObjectType::Blob)
1220                .count(),
1221            1
1222        );
1223    }
1224
1225    #[test]
1226    fn state_closure_skips_gitlink_targets() {
1227        let temp = TempDir::new().unwrap();
1228        let repo = Repository::init_default(temp.path()).unwrap();
1229        let target: GitObjectId = "0303030303030303030303030303030303030303"
1230            .parse()
1231            .expect("git oid");
1232        let root = Tree::from_entries(vec![
1233            TreeEntry::gitlink("vendor", target).expect("gitlink entry"),
1234        ]);
1235        let root_hash = repo.store().put_tree(&root).unwrap();
1236        let state = State::new(root_hash, Vec::new(), test_attribution());
1237        repo.store().put_state(&state).unwrap();
1238
1239        let full = enumerate_state_closure_with_options(
1240            repo.store(),
1241            state.change_id,
1242            StateClosureOptions::default(),
1243        )
1244        .unwrap();
1245        let plan = enumerate_state_closure_plan_with_options(
1246            repo.store(),
1247            state.change_id,
1248            StateClosureOptions::default(),
1249        )
1250        .unwrap();
1251
1252        assert_eq!(pairs_from_full(&full), pairs_from_plan(&plan));
1253        assert!(
1254            !full.iter().any(|info| info.obj_type == ObjectType::Blob),
1255            "gitlinks carry foreign Git commit ids, not Heddle blob dependencies: {full:?}"
1256        );
1257        assert!(full.iter().any(|info| {
1258            info.id == ObjectId::Hash(root_hash) && info.obj_type == ObjectType::Tree
1259        }));
1260    }
1261
1262    #[test]
1263    fn missing_blobs_in_tree_skips_gitlinks_and_walks_nested_side_paths() {
1264        let temp = TempDir::new().unwrap();
1265        let repo = Repository::init_default(temp.path()).unwrap();
1266        let present_blob = repo
1267            .store()
1268            .put_blob(&Blob::from("already local"))
1269            .expect("put present blob");
1270        let missing_nested = ContentHash::from_bytes([7; 32]);
1271        let missing_symlink = ContentHash::from_bytes([8; 32]);
1272        let nested_tree = Tree::from_entries(vec![
1273            TreeEntry::file("remote.txt", missing_nested, false).unwrap(),
1274            TreeEntry::symlink("remote-link", missing_symlink).unwrap(),
1275        ]);
1276        let nested_tree_hash = repo.store().put_tree(&nested_tree).expect("put nested tree");
1277        let gitlink_target: GitObjectId = "0404040404040404040404040404040404040404"
1278            .parse()
1279            .expect("git oid");
1280        let root = Tree::from_entries(vec![
1281            TreeEntry::file("local.txt", present_blob, false).unwrap(),
1282            TreeEntry::directory("nested", nested_tree_hash).unwrap(),
1283            TreeEntry::gitlink("vendor", gitlink_target).unwrap(),
1284        ]);
1285        let root_hash = repo.store().put_tree(&root).expect("put root tree");
1286
1287        let missing = missing_blobs_in_tree(repo.store(), root_hash).expect("missing blobs");
1288
1289        assert_eq!(
1290            missing.into_iter().collect::<HashSet<_>>(),
1291            HashSet::from([missing_nested, missing_symlink])
1292        );
1293    }
1294
1295    /// Once a redaction is declared for a blob in a snapshot, the
1296    /// state closure must include an `ObjectType::Redaction` entry
1297    /// keyed on that blob's hash — that's the wire-side signal the
1298    /// receiver replays.
1299    #[test]
1300    fn enumerate_state_closure_emits_redaction_for_redacted_blob() {
1301        let temp = TempDir::new().unwrap();
1302        let repo = Repository::init_default(temp.path()).unwrap();
1303        std::fs::write(temp.path().join("secret.toml"), "api_token = \"x\"\n").unwrap();
1304        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();
1305
1306        // Find the blob hash for secret.toml by walking the snapshot's tree.
1307        let tree = repo
1308            .store()
1309            .get_tree(&state.tree)
1310            .unwrap()
1311            .expect("tree present");
1312        let blob_hash = tree
1313            .iter()
1314            .find(|e| e.name() == "secret.toml")
1315            .expect("entry present")
1316            .blob_hash()
1317            .expect("secret.toml is a blob");
1318
1319        let redaction = Redaction {
1320            redacted_blob: blob_hash,
1321            state: state.change_id,
1322            path: "secret.toml".to_string(),
1323            reason: "test leak".to_string(),
1324            redactor: Principal {
1325                name: "Tester".into(),
1326                email: "tester@heddle.sh".into(),
1327            },
1328            redacted_at: Utc::now(),
1329            signature: None,
1330            purged_at: None,
1331            supersedes: None,
1332        };
1333        repo.put_redaction(redaction).unwrap();
1334
1335        let full = enumerate_state_closure_with_options(
1336            repo.store(),
1337            state.change_id,
1338            StateClosureOptions::default(),
1339        )
1340        .unwrap();
1341        let plan = enumerate_state_closure_plan_with_options(
1342            repo.store(),
1343            state.change_id,
1344            StateClosureOptions::default(),
1345        )
1346        .unwrap();
1347
1348        assert!(
1349            full.iter()
1350                .any(|info| info.obj_type == ObjectType::Redaction
1351                    && info.id == ObjectId::Hash(blob_hash)),
1352            "full closure must include a Redaction entry for the redacted blob"
1353        );
1354        assert!(
1355            plan.iter()
1356                .any(|p| p.obj_type == ObjectType::Redaction && p.id == ObjectId::Hash(blob_hash)),
1357            "plan closure must include a Redaction entry for the redacted blob"
1358        );
1359    }
1360
1361    #[test]
1362    fn enumerate_state_closure_emits_state_visibility_for_visible_state() {
1363        let temp = TempDir::new().unwrap();
1364        let repo = Repository::init_default(temp.path()).unwrap();
1365        std::fs::write(temp.path().join("README.md"), "hello\n").unwrap();
1366        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();
1367
1368        repo.put_state_visibility(StateVisibility {
1369            state: state.change_id,
1370            tier: VisibilityTier::Restricted {
1371                scope_label: "security-embargo".into(),
1372            },
1373            embargo_until: None,
1374            declarer: Principal {
1375                name: "Tester".into(),
1376                email: "tester@heddle.sh".into(),
1377            },
1378            declared_at: Utc::now(),
1379            signature: None,
1380            supersedes: None,
1381        })
1382        .unwrap();
1383
1384        let full = enumerate_state_closure_with_options(
1385            repo.store(),
1386            state.change_id,
1387            StateClosureOptions::default(),
1388        )
1389        .unwrap();
1390        let plan = enumerate_state_closure_plan_with_options(
1391            repo.store(),
1392            state.change_id,
1393            StateClosureOptions::default(),
1394        )
1395        .unwrap();
1396
1397        assert!(
1398            full.iter()
1399                .any(|info| info.obj_type == ObjectType::StateVisibility
1400                    && info.id == ObjectId::ChangeId(state.change_id)),
1401            "full closure must include a StateVisibility entry for the visible state"
1402        );
1403        assert!(
1404            plan.iter()
1405                .any(|p| p.obj_type == ObjectType::StateVisibility
1406                    && p.id == ObjectId::ChangeId(state.change_id)),
1407            "plan closure must include a StateVisibility entry for the visible state"
1408        );
1409    }
1410
1411    #[test]
1412    fn enumerate_state_closure_emits_state_metadata_blobs() {
1413        let temp = TempDir::new().unwrap();
1414        let repo = Repository::init_default(temp.path()).unwrap();
1415        std::fs::write(temp.path().join("README.md"), "hello\n").unwrap();
1416        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();
1417
1418        let principal = Principal::new("Tester", "tester@example.test");
1419        let discussion_bytes = DiscussionsBlob::new(vec![Discussion {
1420            id: "disc-1".to_string(),
1421            anchor: SymbolAnchor::new("src/lib.rs", "answer"),
1422            opened_against_state: state.change_id,
1423            opened_at: 1_782_400_000,
1424            thread_ref: None,
1425            turns: vec![DiscussionTurn {
1426                author: principal,
1427                body: "Should this sync?".to_string(),
1428                posted_at: 1_782_400_000,
1429            }],
1430            resolution: DiscussionResolution::Open,
1431            body_changed_since_open: false,
1432            orphaned: false,
1433            visibility: VisibilityTier::default(),
1434            resolved_annotation_id: None,
1435        }])
1436        .encode()
1437        .expect("encode discussions");
1438        let discussion_hash = repo
1439            .store()
1440            .put_blob(&Blob::new(discussion_bytes))
1441            .expect("put discussions blob");
1442        let risk_hash = repo
1443            .store()
1444            .put_blob(&Blob::from_slice(b"risk signals"))
1445            .expect("put risk blob");
1446        let review_hash = repo
1447            .store()
1448            .put_blob(&Blob::from_slice(b"review signatures"))
1449            .expect("put review blob");
1450        let conflicts_hash = repo
1451            .store()
1452            .put_blob(&Blob::from_slice(b"structured conflicts"))
1453            .expect("put conflicts blob");
1454        let state_with_metadata = state
1455            .with_risk_signals(risk_hash)
1456            .with_review_signatures(review_hash)
1457            .with_discussions(discussion_hash)
1458            .with_structured_conflicts(conflicts_hash);
1459        repo.store()
1460            .put_state(&state_with_metadata)
1461            .expect("put state with metadata");
1462
1463        let full = enumerate_state_closure_with_options(
1464            repo.store(),
1465            state_with_metadata.change_id,
1466            StateClosureOptions::default(),
1467        )
1468        .unwrap();
1469        let plan = enumerate_state_closure_plan_with_options(
1470            repo.store(),
1471            state_with_metadata.change_id,
1472            StateClosureOptions::default(),
1473        )
1474        .unwrap();
1475
1476        for metadata_hash in [risk_hash, review_hash, discussion_hash, conflicts_hash] {
1477            assert!(
1478                full.iter().any(|info| info.obj_type == ObjectType::Blob
1479                    && info.id == ObjectId::Hash(metadata_hash)),
1480                "full closure must include state metadata blob {metadata_hash}"
1481            );
1482            assert!(
1483                plan.iter().any(
1484                    |p| p.obj_type == ObjectType::Blob && p.id == ObjectId::Hash(metadata_hash)
1485                ),
1486                "plan closure must include state metadata blob {metadata_hash}"
1487            );
1488        }
1489    }
1490}