Skip to main content

wire/
object_graph.rs

1// SPDX-License-Identifier: Apache-2.0
2use std::collections::{HashSet, VecDeque};
3
4use objects::{
5    object::{ChangeId, ContentHash, EntryType},
6    store::ObjectStore,
7};
8use serde::{Deserialize, Serialize};
9
10use crate::{ProtocolError, Result};
11
12#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
13pub enum ObjectId {
14    Hash(ContentHash),
15    ChangeId(ChangeId),
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct ObjectInfo {
20    pub id: ObjectId,
21    pub obj_type: ObjectType,
22    pub size: u64,
23    pub delta_base: Option<ContentHash>,
24}
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
27pub struct PlannedObject {
28    pub id: ObjectId,
29    pub obj_type: ObjectType,
30}
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
33pub enum ObjectType {
34    Blob,
35    Tree,
36    State,
37    Action,
38    /// A `RedactionsBlob` sidecar — the rmp-encoded record(s) declaring
39    /// that a specific blob has been redacted (and possibly purged) by
40    /// an authorized operator. Keyed on the wire by `ObjectId::Hash` of
41    /// the *redacted blob*, since `Repository`'s sidecar store is
42    /// indexed that way.
43    Redaction,
44    /// A `StateVisibilityBlob` sidecar — the rmp-encoded record(s)
45    /// declaring a non-public audience tier for a specific state. Keyed
46    /// on the wire by `ObjectId::ChangeId` of the *state*, since the
47    /// per-state sidecar store is indexed that way. Like `Redaction`, it
48    /// is a sidecar record that lives outside the content-addressed pack
49    /// and ships via the per-object transfer path, not the pack.
50    StateVisibility,
51}
52
53#[derive(Debug, Clone, Default)]
54pub struct StateClosureOptions {
55    pub depth: Option<u32>,
56    pub exclude_states: Vec<ChangeId>,
57}
58
59pub fn enumerate_state_closure(
60    store: &impl ObjectStore,
61    state_id: ChangeId,
62) -> Result<Vec<ObjectInfo>> {
63    enumerate_state_closure_with_options(store, state_id, StateClosureOptions::default())
64}
65
66pub fn enumerate_state_closure_with_options(
67    store: &impl ObjectStore,
68    state_id: ChangeId,
69    options: StateClosureOptions,
70) -> Result<Vec<ObjectInfo>> {
71    let (excluded_states, excluded_hashes) = collect_excluded(store, &options.exclude_states)?;
72
73    let mut out = Vec::new();
74    let mut seen_states: HashSet<ChangeId> = HashSet::new();
75    let mut seen_hashes: HashSet<ContentHash> = HashSet::new();
76    let mut queue: VecDeque<(ChangeId, u32)> = VecDeque::new();
77    queue.push_back((state_id, 0));
78
79    while let Some((id, depth)) = queue.pop_front() {
80        if excluded_states.contains(&id) {
81            continue;
82        }
83        if !seen_states.insert(id) {
84            continue;
85        }
86
87        let state = store
88            .get_state(&id)?
89            .ok_or_else(|| ProtocolError::ObjectNotFound(id.to_string()))?;
90
91        let state_bytes = rmp_serde::to_vec_named(&state)?;
92        out.push(ObjectInfo {
93            id: ObjectId::ChangeId(id),
94            obj_type: ObjectType::State,
95            size: state_bytes.len() as u64,
96            delta_base: None,
97        });
98        emit_state_visibility_info(store, &id, &mut out)?;
99
100        if options.depth.map(|max| depth < max).unwrap_or(true) {
101            for parent in &state.parents {
102                queue.push_back((*parent, depth + 1));
103            }
104        }
105
106        enumerate_tree_closure_filtered(
107            store,
108            state.tree,
109            &excluded_hashes,
110            &mut seen_hashes,
111            &mut out,
112        )?;
113        if let Some(provenance_root) = state.provenance {
114            enumerate_tree_closure_filtered(
115                store,
116                provenance_root,
117                &excluded_hashes,
118                &mut seen_hashes,
119                &mut out,
120            )?;
121        }
122        if let Some(context_root) = state.context {
123            enumerate_tree_closure_filtered(
124                store,
125                context_root,
126                &excluded_hashes,
127                &mut seen_hashes,
128                &mut out,
129            )?;
130        }
131        if let Some(discussions_blob) = state.discussions {
132            enumerate_blob_filtered(
133                store,
134                discussions_blob,
135                &excluded_hashes,
136                &mut seen_hashes,
137                &mut out,
138            )?;
139        }
140    }
141
142    Ok(out)
143}
144
145pub fn enumerate_state_closure_plan(
146    store: &impl ObjectStore,
147    state_id: ChangeId,
148) -> Result<Vec<PlannedObject>> {
149    enumerate_state_closure_plan_with_options(store, state_id, StateClosureOptions::default())
150}
151
152pub fn enumerate_state_closure_plan_with_options(
153    store: &impl ObjectStore,
154    state_id: ChangeId,
155    options: StateClosureOptions,
156) -> Result<Vec<PlannedObject>> {
157    let (excluded_states, excluded_hashes) = collect_excluded(store, &options.exclude_states)?;
158
159    let mut out = Vec::new();
160    let mut seen_states: HashSet<ChangeId> = HashSet::new();
161    let mut seen_hashes: HashSet<ContentHash> = HashSet::new();
162    let mut queue: VecDeque<(ChangeId, u32)> = VecDeque::new();
163    queue.push_back((state_id, 0));
164
165    while let Some((id, depth)) = queue.pop_front() {
166        if excluded_states.contains(&id) {
167            continue;
168        }
169        if !seen_states.insert(id) {
170            continue;
171        }
172
173        let state = store
174            .get_state(&id)?
175            .ok_or_else(|| ProtocolError::ObjectNotFound(id.to_string()))?;
176
177        out.push(PlannedObject {
178            id: ObjectId::ChangeId(id),
179            obj_type: ObjectType::State,
180        });
181        emit_state_visibility_plan(store, &id, &mut out)?;
182
183        if options.depth.map(|max| depth < max).unwrap_or(true) {
184            for parent in &state.parents {
185                queue.push_back((*parent, depth + 1));
186            }
187        }
188
189        enumerate_tree_plan_filtered(
190            store,
191            state.tree,
192            &excluded_hashes,
193            &mut seen_hashes,
194            &mut out,
195        )?;
196        if let Some(provenance_root) = state.provenance {
197            enumerate_tree_plan_filtered(
198                store,
199                provenance_root,
200                &excluded_hashes,
201                &mut seen_hashes,
202                &mut out,
203            )?;
204        }
205        if let Some(context_root) = state.context {
206            enumerate_tree_plan_filtered(
207                store,
208                context_root,
209                &excluded_hashes,
210                &mut seen_hashes,
211                &mut out,
212            )?;
213        }
214        if let Some(discussions_blob) = state.discussions {
215            enumerate_blob_plan_filtered(
216                store,
217                discussions_blob,
218                &excluded_hashes,
219                &mut seen_hashes,
220                &mut out,
221            )?;
222        }
223    }
224
225    Ok(out)
226}
227
228fn enumerate_tree_closure_filtered(
229    store: &impl ObjectStore,
230    tree_hash: ContentHash,
231    excluded: &HashSet<ContentHash>,
232    seen: &mut HashSet<ContentHash>,
233    out: &mut Vec<ObjectInfo>,
234) -> Result<()> {
235    if excluded.contains(&tree_hash) {
236        return Ok(());
237    }
238    if !seen.insert(tree_hash) {
239        return Ok(());
240    }
241
242    let tree = store
243        .get_tree(&tree_hash)?
244        .ok_or_else(|| ProtocolError::ObjectNotFound(tree_hash.to_hex()))?;
245
246    let tree_bytes = rmp_serde::to_vec_named(&tree)?;
247    out.push(ObjectInfo {
248        id: ObjectId::Hash(tree_hash),
249        obj_type: ObjectType::Tree,
250        size: tree_bytes.len() as u64,
251        delta_base: None,
252    });
253
254    for entry in tree.entries() {
255        match entry.entry_type {
256            EntryType::Blob => {
257                if excluded.contains(&entry.hash) {
258                    continue;
259                }
260                if !seen.insert(entry.hash) {
261                    continue;
262                }
263                let blob = store
264                    .get_blob(&entry.hash)?
265                    .ok_or_else(|| ProtocolError::ObjectNotFound(entry.hash.to_hex()))?;
266                out.push(ObjectInfo {
267                    id: ObjectId::Hash(entry.hash),
268                    obj_type: ObjectType::Blob,
269                    size: blob.size() as u64,
270                    delta_base: None,
271                });
272                emit_redaction_info(store, &entry.hash, out)?;
273            }
274            EntryType::Tree => {
275                enumerate_tree_closure_filtered(store, entry.hash, excluded, seen, out)?;
276            }
277            EntryType::Symlink => {
278                if excluded.contains(&entry.hash) {
279                    continue;
280                }
281                if !seen.insert(entry.hash) {
282                    continue;
283                }
284                let blob = store
285                    .get_blob(&entry.hash)?
286                    .ok_or_else(|| ProtocolError::ObjectNotFound(entry.hash.to_hex()))?;
287                out.push(ObjectInfo {
288                    id: ObjectId::Hash(entry.hash),
289                    obj_type: ObjectType::Blob,
290                    size: blob.size() as u64,
291                    delta_base: None,
292                });
293                emit_redaction_info(store, &entry.hash, out)?;
294            }
295        }
296    }
297
298    Ok(())
299}
300
301fn enumerate_blob_filtered(
302    store: &impl ObjectStore,
303    blob_hash: ContentHash,
304    excluded: &HashSet<ContentHash>,
305    seen: &mut HashSet<ContentHash>,
306    out: &mut Vec<ObjectInfo>,
307) -> Result<()> {
308    if excluded.contains(&blob_hash) || !seen.insert(blob_hash) {
309        return Ok(());
310    }
311    let blob = store
312        .get_blob(&blob_hash)?
313        .ok_or_else(|| ProtocolError::ObjectNotFound(blob_hash.to_hex()))?;
314    out.push(ObjectInfo {
315        id: ObjectId::Hash(blob_hash),
316        obj_type: ObjectType::Blob,
317        size: blob.size() as u64,
318        delta_base: None,
319    });
320    emit_redaction_info(store, &blob_hash, out)
321}
322
323/// If `state` carries a state-visibility sidecar, push a StateVisibility
324/// `ObjectInfo` keyed by the state id. No-op when the state is public by
325/// absence.
326fn emit_state_visibility_info(
327    store: &impl ObjectStore,
328    state: &ChangeId,
329    out: &mut Vec<ObjectInfo>,
330) -> Result<()> {
331    if let Some(bytes) = store.get_state_visibility_bytes_for_state(state)? {
332        out.push(ObjectInfo {
333            id: ObjectId::ChangeId(*state),
334            obj_type: ObjectType::StateVisibility,
335            size: bytes.len() as u64,
336            delta_base: None,
337        });
338    }
339    Ok(())
340}
341
342fn emit_state_visibility_plan(
343    store: &impl ObjectStore,
344    state: &ChangeId,
345    out: &mut Vec<PlannedObject>,
346) -> Result<()> {
347    if store.has_state_visibility_for_state(state)? {
348        out.push(PlannedObject {
349            id: ObjectId::ChangeId(*state),
350            obj_type: ObjectType::StateVisibility,
351        });
352    }
353    Ok(())
354}
355
356/// If `blob` carries a redaction sidecar, push a Redaction `ObjectInfo`
357/// keyed by the blob hash. No-op when the blob has no redactions.
358///
359/// Redactions are not deduped via the `seen: HashSet<ContentHash>` used
360/// for blob/tree dedup because the `ObjectId` for a redaction is the
361/// *redacted blob's* hash — and that hash is already inserted into
362/// `seen` by the blob's own emission. A blob can only appear once in
363/// the closure (dedup'd by hash), so its redaction can only be emitted
364/// once too.
365fn emit_redaction_info(
366    store: &impl ObjectStore,
367    blob: &ContentHash,
368    out: &mut Vec<ObjectInfo>,
369) -> Result<()> {
370    if let Some(bytes) = store.get_redactions_bytes_for_blob(blob)? {
371        out.push(ObjectInfo {
372            id: ObjectId::Hash(*blob),
373            obj_type: ObjectType::Redaction,
374            size: bytes.len() as u64,
375            delta_base: None,
376        });
377    }
378    Ok(())
379}
380
381fn enumerate_tree_plan_filtered(
382    store: &impl ObjectStore,
383    tree_hash: ContentHash,
384    excluded: &HashSet<ContentHash>,
385    seen: &mut HashSet<ContentHash>,
386    out: &mut Vec<PlannedObject>,
387) -> Result<()> {
388    if excluded.contains(&tree_hash) {
389        return Ok(());
390    }
391    if !seen.insert(tree_hash) {
392        return Ok(());
393    }
394
395    let tree = store
396        .get_tree(&tree_hash)?
397        .ok_or_else(|| ProtocolError::ObjectNotFound(tree_hash.to_hex()))?;
398
399    out.push(PlannedObject {
400        id: ObjectId::Hash(tree_hash),
401        obj_type: ObjectType::Tree,
402    });
403
404    for entry in tree.entries() {
405        match entry.entry_type {
406            EntryType::Blob | EntryType::Symlink => {
407                if excluded.contains(&entry.hash) {
408                    continue;
409                }
410                if !seen.insert(entry.hash) {
411                    continue;
412                }
413                out.push(PlannedObject {
414                    id: ObjectId::Hash(entry.hash),
415                    obj_type: ObjectType::Blob,
416                });
417                emit_redaction_plan(store, &entry.hash, out)?;
418            }
419            EntryType::Tree => {
420                enumerate_tree_plan_filtered(store, entry.hash, excluded, seen, out)?;
421            }
422        }
423    }
424
425    Ok(())
426}
427
428fn enumerate_blob_plan_filtered(
429    store: &impl ObjectStore,
430    blob_hash: ContentHash,
431    excluded: &HashSet<ContentHash>,
432    seen: &mut HashSet<ContentHash>,
433    out: &mut Vec<PlannedObject>,
434) -> Result<()> {
435    if excluded.contains(&blob_hash) || !seen.insert(blob_hash) {
436        return Ok(());
437    }
438    if store.get_blob(&blob_hash)?.is_none() {
439        return Err(ProtocolError::ObjectNotFound(blob_hash.to_hex()));
440    }
441    out.push(PlannedObject {
442        id: ObjectId::Hash(blob_hash),
443        obj_type: ObjectType::Blob,
444    });
445    emit_redaction_plan(store, &blob_hash, out)
446}
447
448fn emit_redaction_plan(
449    store: &impl ObjectStore,
450    blob: &ContentHash,
451    out: &mut Vec<PlannedObject>,
452) -> Result<()> {
453    if store.has_redactions_for_blob(blob)? {
454        out.push(PlannedObject {
455            id: ObjectId::Hash(*blob),
456            obj_type: ObjectType::Redaction,
457        });
458    }
459    Ok(())
460}
461
462fn collect_excluded(
463    store: &impl ObjectStore,
464    roots: &[ChangeId],
465) -> Result<(HashSet<ChangeId>, HashSet<ContentHash>)> {
466    if roots.is_empty() {
467        return Ok((HashSet::new(), HashSet::new()));
468    }
469
470    let mut excluded_states: HashSet<ChangeId> = HashSet::new();
471    let mut excluded_hashes: HashSet<ContentHash> = HashSet::new();
472    let mut queue: VecDeque<ChangeId> = VecDeque::new();
473
474    for id in roots {
475        queue.push_back(*id);
476    }
477
478    while let Some(id) = queue.pop_front() {
479        if !excluded_states.insert(id) {
480            continue;
481        }
482
483        let state = match store.get_state(&id)? {
484            Some(state) => state,
485            None => continue,
486        };
487
488        for parent in &state.parents {
489            queue.push_back(*parent);
490        }
491
492        collect_tree_hashes(store, state.tree, &mut excluded_hashes)?;
493        if let Some(provenance_root) = state.provenance {
494            collect_tree_hashes(store, provenance_root, &mut excluded_hashes)?;
495        }
496        if let Some(context_root) = state.context {
497            collect_tree_hashes(store, context_root, &mut excluded_hashes)?;
498        }
499        if let Some(discussions_blob) = state.discussions {
500            excluded_hashes.insert(discussions_blob);
501        }
502    }
503
504    Ok((excluded_states, excluded_hashes))
505}
506
507fn collect_tree_hashes(
508    store: &impl ObjectStore,
509    tree_hash: ContentHash,
510    excluded: &mut HashSet<ContentHash>,
511) -> Result<()> {
512    if !excluded.insert(tree_hash) {
513        return Ok(());
514    }
515
516    let tree = match store.get_tree(&tree_hash)? {
517        Some(tree) => tree,
518        None => return Ok(()),
519    };
520
521    for entry in tree.entries() {
522        match entry.entry_type {
523            EntryType::Blob | EntryType::Symlink => {
524                excluded.insert(entry.hash);
525            }
526            EntryType::Tree => {
527                collect_tree_hashes(store, entry.hash, excluded)?;
528            }
529        }
530    }
531
532    Ok(())
533}
534
535pub fn is_ancestor(
536    store: &impl ObjectStore,
537    ancestor: ChangeId,
538    descendant: ChangeId,
539) -> Result<bool> {
540    if ancestor == descendant {
541        return Ok(true);
542    }
543
544    let mut seen: HashSet<ChangeId> = HashSet::new();
545    let mut queue: VecDeque<ChangeId> = VecDeque::new();
546    queue.push_back(descendant);
547
548    while let Some(id) = queue.pop_front() {
549        if !seen.insert(id) {
550            continue;
551        }
552        let state = match store.get_state(&id)? {
553            Some(s) => s,
554            None => return Ok(false),
555        };
556        for parent in state.parents {
557            if parent == ancestor {
558                return Ok(true);
559            }
560            queue.push_back(parent);
561        }
562    }
563
564    Ok(false)
565}
566
567#[cfg(test)]
568mod tests {
569    use std::collections::HashSet;
570
571    use chrono::Utc;
572    use objects::{
573        object::{
574            Attribution, Blob, ChangeId, Discussion, DiscussionResolution, DiscussionTurn,
575            DiscussionsBlob, Principal, Redaction, State, StateVisibility, SymbolAnchor, Tree,
576            TreeEntry, VisibilityTier,
577        },
578        store::ObjectStore,
579    };
580    use repo::Repository;
581    use tempfile::TempDir;
582
583    use super::{
584        ObjectId, ObjectInfo, ObjectType, PlannedObject, StateClosureOptions,
585        enumerate_state_closure_plan_with_options, enumerate_state_closure_with_options,
586    };
587
588    fn pairs_from_full(objects: &[ObjectInfo]) -> HashSet<(ObjectId, ObjectType)> {
589        objects
590            .iter()
591            .map(|info| (info.id.clone(), info.obj_type))
592            .collect()
593    }
594
595    fn pairs_from_plan(objects: &[PlannedObject]) -> HashSet<(ObjectId, ObjectType)> {
596        objects
597            .iter()
598            .map(|info| (info.id.clone(), info.obj_type))
599            .collect()
600    }
601
602    fn assert_plan_parity(
603        repo: &Repository,
604        state_id: ChangeId,
605        options: StateClosureOptions,
606    ) -> HashSet<(ObjectId, ObjectType)> {
607        let full =
608            enumerate_state_closure_with_options(repo.store(), state_id, options.clone()).unwrap();
609        let plan =
610            enumerate_state_closure_plan_with_options(repo.store(), state_id, options).unwrap();
611
612        let full_pairs = pairs_from_full(&full);
613        let plan_pairs = pairs_from_plan(&plan);
614        assert_eq!(full_pairs, plan_pairs);
615        full_pairs
616    }
617
618    fn test_attribution() -> Attribution {
619        Attribution::human(Principal::new("Graph Tester", "graph@example.com"))
620    }
621
622    #[test]
623    fn lean_closure_planner_matches_object_info_ids_and_types() {
624        let temp = TempDir::new().unwrap();
625        let repo = Repository::init_default(temp.path()).unwrap();
626        std::fs::create_dir_all(temp.path().join("src")).unwrap();
627        std::fs::write(temp.path().join("README.md"), "hello\n").unwrap();
628        std::fs::write(temp.path().join("src/lib.rs"), "pub fn hi() {}\n").unwrap();
629        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();
630
631        let full = enumerate_state_closure_with_options(
632            repo.store(),
633            state.change_id,
634            StateClosureOptions::default(),
635        )
636        .unwrap();
637        let lean = enumerate_state_closure_plan_with_options(
638            repo.store(),
639            state.change_id,
640            StateClosureOptions::default(),
641        )
642        .unwrap();
643
644        let full_pairs = full
645            .into_iter()
646            .map(|info| (info.id, info.obj_type))
647            .collect::<std::collections::HashSet<_>>();
648        let lean_pairs = lean
649            .into_iter()
650            .map(|info| (info.id, info.obj_type))
651            .collect::<std::collections::HashSet<_>>();
652
653        assert_eq!(full_pairs, lean_pairs);
654        assert!(
655            full_pairs
656                .iter()
657                .any(|(id, _)| matches!(id, ObjectId::ChangeId(_)))
658        );
659    }
660
661    #[test]
662    fn depth_and_exclude_options_match_between_full_and_plan() {
663        let temp = TempDir::new().unwrap();
664        let repo = Repository::init_default(temp.path()).unwrap();
665        let path = temp.path().join("story.txt");
666
667        std::fs::write(&path, "base\n").unwrap();
668        let base = repo.snapshot(Some("base".to_string()), None).unwrap();
669        std::fs::write(&path, "middle\n").unwrap();
670        let middle = repo.snapshot(Some("middle".to_string()), None).unwrap();
671        std::fs::write(&path, "tip\n").unwrap();
672        let tip = repo.snapshot(Some("tip".to_string()), None).unwrap();
673
674        let depth_zero = assert_plan_parity(
675            &repo,
676            tip.change_id,
677            StateClosureOptions {
678                depth: Some(0),
679                exclude_states: Vec::new(),
680            },
681        );
682        assert!(depth_zero.contains(&(ObjectId::ChangeId(tip.change_id), ObjectType::State)));
683        assert!(!depth_zero.contains(&(ObjectId::ChangeId(middle.change_id), ObjectType::State)));
684        assert!(!depth_zero.contains(&(ObjectId::ChangeId(base.change_id), ObjectType::State)));
685
686        let depth_one = assert_plan_parity(
687            &repo,
688            tip.change_id,
689            StateClosureOptions {
690                depth: Some(1),
691                exclude_states: Vec::new(),
692            },
693        );
694        assert!(depth_one.contains(&(ObjectId::ChangeId(tip.change_id), ObjectType::State)));
695        assert!(depth_one.contains(&(ObjectId::ChangeId(middle.change_id), ObjectType::State)));
696        assert!(!depth_one.contains(&(ObjectId::ChangeId(base.change_id), ObjectType::State)));
697
698        let exclude_middle = assert_plan_parity(
699            &repo,
700            tip.change_id,
701            StateClosureOptions {
702                depth: None,
703                exclude_states: vec![middle.change_id],
704            },
705        );
706        assert!(exclude_middle.contains(&(ObjectId::ChangeId(tip.change_id), ObjectType::State)));
707        assert!(
708            !exclude_middle.contains(&(ObjectId::ChangeId(middle.change_id), ObjectType::State))
709        );
710        assert!(!exclude_middle.contains(&(ObjectId::ChangeId(base.change_id), ObjectType::State)));
711    }
712
713    #[test]
714    fn shared_tree_and_blob_references_are_emitted_once() {
715        let temp = TempDir::new().unwrap();
716        let repo = Repository::init_default(temp.path()).unwrap();
717
718        let shared_blob = Blob::from("shared contents\n");
719        let shared_blob_hash = repo.store().put_blob(&shared_blob).unwrap();
720        let shared_tree = Tree::from_entries(vec![
721            TreeEntry::file("shared.txt", shared_blob_hash, false).unwrap(),
722        ]);
723        let shared_tree_hash = repo.store().put_tree(&shared_tree).unwrap();
724        let root = Tree::from_entries(vec![
725            TreeEntry::directory("left", shared_tree_hash).unwrap(),
726            TreeEntry::directory("right", shared_tree_hash).unwrap(),
727        ]);
728        let root_hash = repo.store().put_tree(&root).unwrap();
729        let state = State::new(root_hash, Vec::new(), test_attribution());
730        repo.store().put_state(&state).unwrap();
731
732        let full = enumerate_state_closure_with_options(
733            repo.store(),
734            state.change_id,
735            StateClosureOptions::default(),
736        )
737        .unwrap();
738        let plan = enumerate_state_closure_plan_with_options(
739            repo.store(),
740            state.change_id,
741            StateClosureOptions::default(),
742        )
743        .unwrap();
744
745        assert_eq!(
746            pairs_from_full(&full),
747            pairs_from_plan(&plan),
748            "full and lean closure enumerators must dedup the same objects"
749        );
750
751        assert_eq!(
752            full.iter()
753                .filter(|info| info.id == ObjectId::Hash(root_hash)
754                    && info.obj_type == ObjectType::Tree)
755                .count(),
756            1
757        );
758        assert_eq!(
759            full.iter()
760                .filter(|info| info.id == ObjectId::Hash(shared_tree_hash)
761                    && info.obj_type == ObjectType::Tree)
762                .count(),
763            1
764        );
765        assert_eq!(
766            full.iter()
767                .filter(|info| info.id == ObjectId::Hash(shared_blob_hash)
768                    && info.obj_type == ObjectType::Blob)
769                .count(),
770            1
771        );
772    }
773
774    /// Once a redaction is declared for a blob in a snapshot, the
775    /// state closure must include an `ObjectType::Redaction` entry
776    /// keyed on that blob's hash — that's the wire-side signal the
777    /// receiver replays.
778    #[test]
779    fn enumerate_state_closure_emits_redaction_for_redacted_blob() {
780        let temp = TempDir::new().unwrap();
781        let repo = Repository::init_default(temp.path()).unwrap();
782        std::fs::write(temp.path().join("secret.toml"), "api_token = \"x\"\n").unwrap();
783        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();
784
785        // Find the blob hash for secret.toml by walking the snapshot's tree.
786        let tree = repo
787            .store()
788            .get_tree(&state.tree)
789            .unwrap()
790            .expect("tree present");
791        let blob_hash = tree
792            .iter()
793            .find(|e| e.name == "secret.toml")
794            .expect("entry present")
795            .hash;
796
797        let redaction = Redaction {
798            redacted_blob: blob_hash,
799            state: state.change_id,
800            path: "secret.toml".to_string(),
801            reason: "test leak".to_string(),
802            redactor: Principal {
803                name: "Tester".into(),
804                email: "tester@heddle.sh".into(),
805            },
806            redacted_at: Utc::now(),
807            signature: None,
808            purged_at: None,
809            supersedes: None,
810        };
811        repo.put_redaction(redaction).unwrap();
812
813        let full = enumerate_state_closure_with_options(
814            repo.store(),
815            state.change_id,
816            StateClosureOptions::default(),
817        )
818        .unwrap();
819        let plan = enumerate_state_closure_plan_with_options(
820            repo.store(),
821            state.change_id,
822            StateClosureOptions::default(),
823        )
824        .unwrap();
825
826        assert!(
827            full.iter()
828                .any(|info| info.obj_type == ObjectType::Redaction
829                    && info.id == ObjectId::Hash(blob_hash)),
830            "full closure must include a Redaction entry for the redacted blob"
831        );
832        assert!(
833            plan.iter()
834                .any(|p| p.obj_type == ObjectType::Redaction && p.id == ObjectId::Hash(blob_hash)),
835            "plan closure must include a Redaction entry for the redacted blob"
836        );
837    }
838
839    #[test]
840    fn enumerate_state_closure_emits_state_visibility_for_visible_state() {
841        let temp = TempDir::new().unwrap();
842        let repo = Repository::init_default(temp.path()).unwrap();
843        std::fs::write(temp.path().join("README.md"), "hello\n").unwrap();
844        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();
845
846        repo.put_state_visibility(StateVisibility {
847            state: state.change_id,
848            tier: VisibilityTier::Restricted {
849                scope_label: "security-embargo".into(),
850            },
851            embargo_until: None,
852            declarer: Principal {
853                name: "Tester".into(),
854                email: "tester@heddle.sh".into(),
855            },
856            declared_at: Utc::now(),
857            signature: None,
858            supersedes: None,
859        })
860        .unwrap();
861
862        let full = enumerate_state_closure_with_options(
863            repo.store(),
864            state.change_id,
865            StateClosureOptions::default(),
866        )
867        .unwrap();
868        let plan = enumerate_state_closure_plan_with_options(
869            repo.store(),
870            state.change_id,
871            StateClosureOptions::default(),
872        )
873        .unwrap();
874
875        assert!(
876            full.iter()
877                .any(|info| info.obj_type == ObjectType::StateVisibility
878                    && info.id == ObjectId::ChangeId(state.change_id)),
879            "full closure must include a StateVisibility entry for the visible state"
880        );
881        assert!(
882            plan.iter()
883                .any(|p| p.obj_type == ObjectType::StateVisibility
884                    && p.id == ObjectId::ChangeId(state.change_id)),
885            "plan closure must include a StateVisibility entry for the visible state"
886        );
887    }
888
889    #[test]
890    fn enumerate_state_closure_emits_discussions_blob() {
891        let temp = TempDir::new().unwrap();
892        let repo = Repository::init_default(temp.path()).unwrap();
893        std::fs::write(temp.path().join("README.md"), "hello\n").unwrap();
894        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();
895
896        let principal = Principal::new("Tester", "tester@example.test");
897        let discussion_bytes = DiscussionsBlob::new(vec![Discussion {
898            id: "disc-1".to_string(),
899            anchor: SymbolAnchor::new("src/lib.rs", "answer"),
900            opened_against_state: state.change_id,
901            opened_at: 1_782_400_000,
902            thread_ref: None,
903            turns: vec![DiscussionTurn {
904                author: principal,
905                body: "Should this sync?".to_string(),
906                posted_at: 1_782_400_000,
907            }],
908            resolution: DiscussionResolution::Open,
909            body_changed_since_open: false,
910            orphaned: false,
911            visibility: VisibilityTier::default(),
912            resolved_annotation_id: None,
913        }])
914        .encode()
915        .expect("encode discussions");
916        let discussion_hash = repo
917            .store()
918            .put_blob(&Blob::new(discussion_bytes))
919            .expect("put discussions blob");
920        let state_with_discussions = state.with_discussions(discussion_hash);
921        repo.store()
922            .put_state(&state_with_discussions)
923            .expect("put state with discussions");
924
925        let full = enumerate_state_closure_with_options(
926            repo.store(),
927            state_with_discussions.change_id,
928            StateClosureOptions::default(),
929        )
930        .unwrap();
931        let plan = enumerate_state_closure_plan_with_options(
932            repo.store(),
933            state_with_discussions.change_id,
934            StateClosureOptions::default(),
935        )
936        .unwrap();
937
938        assert!(
939            full.iter().any(|info| info.obj_type == ObjectType::Blob
940                && info.id == ObjectId::Hash(discussion_hash)),
941            "full closure must include the discussions blob referenced by the state"
942        );
943        assert!(
944            plan.iter()
945                .any(|p| p.obj_type == ObjectType::Blob && p.id == ObjectId::Hash(discussion_hash)),
946            "plan closure must include the discussions blob referenced by the state"
947        );
948    }
949}