// SPDX-License-Identifier: Apache-2.0
// wire/object_graph.rs
use std::collections::{HashSet, VecDeque};

use objects::{
    object::{ChangeId, ContentHash, EntryType},
    store::ObjectStore,
};
use serde::{Deserialize, Serialize};

use crate::{ProtocolError, Result};

12#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
13pub enum ObjectId {
14    Hash(ContentHash),
15    ChangeId(ChangeId),
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct ObjectInfo {
20    pub id: ObjectId,
21    pub obj_type: ObjectType,
22    pub size: u64,
23    pub delta_base: Option<ContentHash>,
24}
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
27pub struct PlannedObject {
28    pub id: ObjectId,
29    pub obj_type: ObjectType,
30}
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
33pub enum ObjectType {
34    Blob,
35    Tree,
36    State,
37    Action,
38    /// A `RedactionsBlob` sidecar — the rmp-encoded record(s) declaring
39    /// that a specific blob has been redacted (and possibly purged) by
40    /// an authorized operator. Keyed on the wire by `ObjectId::Hash` of
41    /// the *redacted blob*, since `Repository`'s sidecar store is
42    /// indexed that way.
43    Redaction,
44    /// A `StateVisibilityBlob` sidecar — the rmp-encoded record(s)
45    /// declaring a non-public audience tier for a specific state. Keyed
46    /// on the wire by `ObjectId::ChangeId` of the *state*, since the
47    /// per-state sidecar store is indexed that way. Like `Redaction`, it
48    /// is a sidecar record that lives outside the content-addressed pack
49    /// and ships via the per-object transfer path, not the pack.
50    StateVisibility,
51}
52
53#[derive(Debug, Clone, Default)]
54pub struct StateClosureOptions {
55    pub depth: Option<u32>,
56    pub exclude_states: Vec<ChangeId>,
57}
58
59pub fn enumerate_state_closure(
60    store: &impl ObjectStore,
61    state_id: ChangeId,
62) -> Result<Vec<ObjectInfo>> {
63    enumerate_state_closure_with_options(store, state_id, StateClosureOptions::default())
64}
65
66pub fn enumerate_state_closure_with_options(
67    store: &impl ObjectStore,
68    state_id: ChangeId,
69    options: StateClosureOptions,
70) -> Result<Vec<ObjectInfo>> {
71    let (excluded_states, excluded_hashes) = collect_excluded(store, &options.exclude_states)?;
72
73    let mut out = Vec::new();
74    let mut seen_states: HashSet<ChangeId> = HashSet::new();
75    let mut seen_hashes: HashSet<ContentHash> = HashSet::new();
76    let mut queue: VecDeque<(ChangeId, u32)> = VecDeque::new();
77    queue.push_back((state_id, 0));
78
79    while let Some((id, depth)) = queue.pop_front() {
80        if excluded_states.contains(&id) {
81            continue;
82        }
83        if !seen_states.insert(id) {
84            continue;
85        }
86
87        let state = store
88            .get_state(&id)?
89            .ok_or_else(|| ProtocolError::ObjectNotFound(id.to_string()))?;
90
91        let state_bytes = rmp_serde::to_vec_named(&state)?;
92        out.push(ObjectInfo {
93            id: ObjectId::ChangeId(id),
94            obj_type: ObjectType::State,
95            size: state_bytes.len() as u64,
96            delta_base: None,
97        });
98        emit_state_visibility_info(store, &id, &mut out)?;
99
100        if options.depth.map(|max| depth < max).unwrap_or(true) {
101            for parent in &state.parents {
102                queue.push_back((*parent, depth + 1));
103            }
104        }
105
106        enumerate_tree_closure_filtered(
107            store,
108            state.tree,
109            &excluded_hashes,
110            &mut seen_hashes,
111            &mut out,
112        )?;
113        if let Some(provenance_root) = state.provenance {
114            enumerate_tree_closure_filtered(
115                store,
116                provenance_root,
117                &excluded_hashes,
118                &mut seen_hashes,
119                &mut out,
120            )?;
121        }
122        if let Some(context_root) = state.context {
123            enumerate_tree_closure_filtered(
124                store,
125                context_root,
126                &excluded_hashes,
127                &mut seen_hashes,
128                &mut out,
129            )?;
130        }
131    }
132
133    Ok(out)
134}
135
136pub fn enumerate_state_closure_plan(
137    store: &impl ObjectStore,
138    state_id: ChangeId,
139) -> Result<Vec<PlannedObject>> {
140    enumerate_state_closure_plan_with_options(store, state_id, StateClosureOptions::default())
141}
142
143pub fn enumerate_state_closure_plan_with_options(
144    store: &impl ObjectStore,
145    state_id: ChangeId,
146    options: StateClosureOptions,
147) -> Result<Vec<PlannedObject>> {
148    let (excluded_states, excluded_hashes) = collect_excluded(store, &options.exclude_states)?;
149
150    let mut out = Vec::new();
151    let mut seen_states: HashSet<ChangeId> = HashSet::new();
152    let mut seen_hashes: HashSet<ContentHash> = HashSet::new();
153    let mut queue: VecDeque<(ChangeId, u32)> = VecDeque::new();
154    queue.push_back((state_id, 0));
155
156    while let Some((id, depth)) = queue.pop_front() {
157        if excluded_states.contains(&id) {
158            continue;
159        }
160        if !seen_states.insert(id) {
161            continue;
162        }
163
164        let state = store
165            .get_state(&id)?
166            .ok_or_else(|| ProtocolError::ObjectNotFound(id.to_string()))?;
167
168        out.push(PlannedObject {
169            id: ObjectId::ChangeId(id),
170            obj_type: ObjectType::State,
171        });
172        emit_state_visibility_plan(store, &id, &mut out)?;
173
174        if options.depth.map(|max| depth < max).unwrap_or(true) {
175            for parent in &state.parents {
176                queue.push_back((*parent, depth + 1));
177            }
178        }
179
180        enumerate_tree_plan_filtered(
181            store,
182            state.tree,
183            &excluded_hashes,
184            &mut seen_hashes,
185            &mut out,
186        )?;
187        if let Some(provenance_root) = state.provenance {
188            enumerate_tree_plan_filtered(
189                store,
190                provenance_root,
191                &excluded_hashes,
192                &mut seen_hashes,
193                &mut out,
194            )?;
195        }
196        if let Some(context_root) = state.context {
197            enumerate_tree_plan_filtered(
198                store,
199                context_root,
200                &excluded_hashes,
201                &mut seen_hashes,
202                &mut out,
203            )?;
204        }
205    }
206
207    Ok(out)
208}
209
210fn enumerate_tree_closure_filtered(
211    store: &impl ObjectStore,
212    tree_hash: ContentHash,
213    excluded: &HashSet<ContentHash>,
214    seen: &mut HashSet<ContentHash>,
215    out: &mut Vec<ObjectInfo>,
216) -> Result<()> {
217    if excluded.contains(&tree_hash) {
218        return Ok(());
219    }
220    if !seen.insert(tree_hash) {
221        return Ok(());
222    }
223
224    let tree = store
225        .get_tree(&tree_hash)?
226        .ok_or_else(|| ProtocolError::ObjectNotFound(tree_hash.to_hex()))?;
227
228    let tree_bytes = rmp_serde::to_vec_named(&tree)?;
229    out.push(ObjectInfo {
230        id: ObjectId::Hash(tree_hash),
231        obj_type: ObjectType::Tree,
232        size: tree_bytes.len() as u64,
233        delta_base: None,
234    });
235
236    for entry in tree.entries() {
237        match entry.entry_type {
238            EntryType::Blob => {
239                if excluded.contains(&entry.hash) {
240                    continue;
241                }
242                if !seen.insert(entry.hash) {
243                    continue;
244                }
245                let blob = store
246                    .get_blob(&entry.hash)?
247                    .ok_or_else(|| ProtocolError::ObjectNotFound(entry.hash.to_hex()))?;
248                out.push(ObjectInfo {
249                    id: ObjectId::Hash(entry.hash),
250                    obj_type: ObjectType::Blob,
251                    size: blob.size() as u64,
252                    delta_base: None,
253                });
254                emit_redaction_info(store, &entry.hash, out)?;
255            }
256            EntryType::Tree => {
257                enumerate_tree_closure_filtered(store, entry.hash, excluded, seen, out)?;
258            }
259            EntryType::Symlink => {
260                if excluded.contains(&entry.hash) {
261                    continue;
262                }
263                if !seen.insert(entry.hash) {
264                    continue;
265                }
266                let blob = store
267                    .get_blob(&entry.hash)?
268                    .ok_or_else(|| ProtocolError::ObjectNotFound(entry.hash.to_hex()))?;
269                out.push(ObjectInfo {
270                    id: ObjectId::Hash(entry.hash),
271                    obj_type: ObjectType::Blob,
272                    size: blob.size() as u64,
273                    delta_base: None,
274                });
275                emit_redaction_info(store, &entry.hash, out)?;
276            }
277        }
278    }
279
280    Ok(())
281}
282
283/// If `state` carries a state-visibility sidecar, push a StateVisibility
284/// `ObjectInfo` keyed by the state id. No-op when the state is public by
285/// absence.
286fn emit_state_visibility_info(
287    store: &impl ObjectStore,
288    state: &ChangeId,
289    out: &mut Vec<ObjectInfo>,
290) -> Result<()> {
291    if let Some(bytes) = store.get_state_visibility_bytes_for_state(state)? {
292        out.push(ObjectInfo {
293            id: ObjectId::ChangeId(*state),
294            obj_type: ObjectType::StateVisibility,
295            size: bytes.len() as u64,
296            delta_base: None,
297        });
298    }
299    Ok(())
300}
301
302fn emit_state_visibility_plan(
303    store: &impl ObjectStore,
304    state: &ChangeId,
305    out: &mut Vec<PlannedObject>,
306) -> Result<()> {
307    if store.has_state_visibility_for_state(state)? {
308        out.push(PlannedObject {
309            id: ObjectId::ChangeId(*state),
310            obj_type: ObjectType::StateVisibility,
311        });
312    }
313    Ok(())
314}
315
316/// If `blob` carries a redaction sidecar, push a Redaction `ObjectInfo`
317/// keyed by the blob hash. No-op when the blob has no redactions.
318///
319/// Redactions are not deduped via the `seen: HashSet<ContentHash>` used
320/// for blob/tree dedup because the `ObjectId` for a redaction is the
321/// *redacted blob's* hash — and that hash is already inserted into
322/// `seen` by the blob's own emission. A blob can only appear once in
323/// the closure (dedup'd by hash), so its redaction can only be emitted
324/// once too.
325fn emit_redaction_info(
326    store: &impl ObjectStore,
327    blob: &ContentHash,
328    out: &mut Vec<ObjectInfo>,
329) -> Result<()> {
330    if let Some(bytes) = store.get_redactions_bytes_for_blob(blob)? {
331        out.push(ObjectInfo {
332            id: ObjectId::Hash(*blob),
333            obj_type: ObjectType::Redaction,
334            size: bytes.len() as u64,
335            delta_base: None,
336        });
337    }
338    Ok(())
339}
340
341fn enumerate_tree_plan_filtered(
342    store: &impl ObjectStore,
343    tree_hash: ContentHash,
344    excluded: &HashSet<ContentHash>,
345    seen: &mut HashSet<ContentHash>,
346    out: &mut Vec<PlannedObject>,
347) -> Result<()> {
348    if excluded.contains(&tree_hash) {
349        return Ok(());
350    }
351    if !seen.insert(tree_hash) {
352        return Ok(());
353    }
354
355    let tree = store
356        .get_tree(&tree_hash)?
357        .ok_or_else(|| ProtocolError::ObjectNotFound(tree_hash.to_hex()))?;
358
359    out.push(PlannedObject {
360        id: ObjectId::Hash(tree_hash),
361        obj_type: ObjectType::Tree,
362    });
363
364    for entry in tree.entries() {
365        match entry.entry_type {
366            EntryType::Blob | EntryType::Symlink => {
367                if excluded.contains(&entry.hash) {
368                    continue;
369                }
370                if !seen.insert(entry.hash) {
371                    continue;
372                }
373                out.push(PlannedObject {
374                    id: ObjectId::Hash(entry.hash),
375                    obj_type: ObjectType::Blob,
376                });
377                emit_redaction_plan(store, &entry.hash, out)?;
378            }
379            EntryType::Tree => {
380                enumerate_tree_plan_filtered(store, entry.hash, excluded, seen, out)?;
381            }
382        }
383    }
384
385    Ok(())
386}
387
388fn emit_redaction_plan(
389    store: &impl ObjectStore,
390    blob: &ContentHash,
391    out: &mut Vec<PlannedObject>,
392) -> Result<()> {
393    if store.has_redactions_for_blob(blob)? {
394        out.push(PlannedObject {
395            id: ObjectId::Hash(*blob),
396            obj_type: ObjectType::Redaction,
397        });
398    }
399    Ok(())
400}
401
402fn collect_excluded(
403    store: &impl ObjectStore,
404    roots: &[ChangeId],
405) -> Result<(HashSet<ChangeId>, HashSet<ContentHash>)> {
406    if roots.is_empty() {
407        return Ok((HashSet::new(), HashSet::new()));
408    }
409
410    let mut excluded_states: HashSet<ChangeId> = HashSet::new();
411    let mut excluded_hashes: HashSet<ContentHash> = HashSet::new();
412    let mut queue: VecDeque<ChangeId> = VecDeque::new();
413
414    for id in roots {
415        queue.push_back(*id);
416    }
417
418    while let Some(id) = queue.pop_front() {
419        if !excluded_states.insert(id) {
420            continue;
421        }
422
423        let state = match store.get_state(&id)? {
424            Some(state) => state,
425            None => continue,
426        };
427
428        for parent in &state.parents {
429            queue.push_back(*parent);
430        }
431
432        collect_tree_hashes(store, state.tree, &mut excluded_hashes)?;
433        if let Some(provenance_root) = state.provenance {
434            collect_tree_hashes(store, provenance_root, &mut excluded_hashes)?;
435        }
436        if let Some(context_root) = state.context {
437            collect_tree_hashes(store, context_root, &mut excluded_hashes)?;
438        }
439    }
440
441    Ok((excluded_states, excluded_hashes))
442}
443
444fn collect_tree_hashes(
445    store: &impl ObjectStore,
446    tree_hash: ContentHash,
447    excluded: &mut HashSet<ContentHash>,
448) -> Result<()> {
449    if !excluded.insert(tree_hash) {
450        return Ok(());
451    }
452
453    let tree = match store.get_tree(&tree_hash)? {
454        Some(tree) => tree,
455        None => return Ok(()),
456    };
457
458    for entry in tree.entries() {
459        match entry.entry_type {
460            EntryType::Blob | EntryType::Symlink => {
461                excluded.insert(entry.hash);
462            }
463            EntryType::Tree => {
464                collect_tree_hashes(store, entry.hash, excluded)?;
465            }
466        }
467    }
468
469    Ok(())
470}
471
472pub fn is_ancestor(
473    store: &impl ObjectStore,
474    ancestor: ChangeId,
475    descendant: ChangeId,
476) -> Result<bool> {
477    if ancestor == descendant {
478        return Ok(true);
479    }
480
481    let mut seen: HashSet<ChangeId> = HashSet::new();
482    let mut queue: VecDeque<ChangeId> = VecDeque::new();
483    queue.push_back(descendant);
484
485    while let Some(id) = queue.pop_front() {
486        if !seen.insert(id) {
487            continue;
488        }
489        let state = match store.get_state(&id)? {
490            Some(s) => s,
491            None => return Ok(false),
492        };
493        for parent in state.parents {
494            if parent == ancestor {
495                return Ok(true);
496            }
497            queue.push_back(parent);
498        }
499    }
500
501    Ok(false)
502}
503
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use objects::{
        object::{Principal, Redaction, StateVisibility, VisibilityTier},
        store::ObjectStore,
    };
    use repo::Repository;
    use tempfile::TempDir;

    use super::{
        ObjectId, ObjectType, StateClosureOptions, enumerate_state_closure_plan_with_options,
        enumerate_state_closure_with_options,
    };

    /// The size-less planner and the sized enumerator must report the same
    /// set of `(id, obj_type)` pairs for the same state, and that set must
    /// contain at least one `ObjectId::ChangeId` entry.
    #[test]
    fn lean_closure_planner_matches_object_info_ids_and_types() {
        let temp = TempDir::new().unwrap();
        let repo = Repository::init_default(temp.path()).unwrap();
        std::fs::create_dir_all(temp.path().join("src")).unwrap();
        std::fs::write(temp.path().join("README.md"), "hello\n").unwrap();
        std::fs::write(temp.path().join("src/lib.rs"), "pub fn hi() {}\n").unwrap();
        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();

        let full = enumerate_state_closure_with_options(
            repo.store(),
            state.change_id,
            StateClosureOptions::default(),
        )
        .unwrap();
        let lean = enumerate_state_closure_plan_with_options(
            repo.store(),
            state.change_id,
            StateClosureOptions::default(),
        )
        .unwrap();

        let full_pairs = full
            .into_iter()
            .map(|info| (info.id, info.obj_type))
            .collect::<std::collections::HashSet<_>>();
        let lean_pairs = lean
            .into_iter()
            .map(|info| (info.id, info.obj_type))
            .collect::<std::collections::HashSet<_>>();

        assert_eq!(full_pairs, lean_pairs);
        assert!(
            full_pairs
                .iter()
                .any(|(id, _)| matches!(id, ObjectId::ChangeId(_)))
        );
    }

    /// Once a redaction is declared for a blob in a snapshot, the
    /// state closure must include an `ObjectType::Redaction` entry
    /// keyed on that blob's hash — that's the wire-side signal the
    /// receiver replays.
    #[test]
    fn enumerate_state_closure_emits_redaction_for_redacted_blob() {
        let temp = TempDir::new().unwrap();
        let repo = Repository::init_default(temp.path()).unwrap();
        std::fs::write(temp.path().join("secret.toml"), "api_token = \"x\"\n").unwrap();
        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();

        // Find the blob hash for secret.toml by walking the snapshot's tree.
        let tree = repo
            .store()
            .get_tree(&state.tree)
            .unwrap()
            .expect("tree present");
        let blob_hash = tree
            .iter()
            .find(|e| e.name == "secret.toml")
            .expect("entry present")
            .hash;

        let redaction = Redaction {
            redacted_blob: blob_hash,
            state: state.change_id,
            path: "secret.toml".to_string(),
            reason: "test leak".to_string(),
            redactor: Principal {
                name: "Tester".into(),
                email: "tester@heddle.sh".into(),
            },
            redacted_at: Utc::now(),
            signature: None,
            purged_at: None,
            supersedes: None,
        };
        repo.put_redaction(redaction).unwrap();

        let full = enumerate_state_closure_with_options(
            repo.store(),
            state.change_id,
            StateClosureOptions::default(),
        )
        .unwrap();
        let plan = enumerate_state_closure_plan_with_options(
            repo.store(),
            state.change_id,
            StateClosureOptions::default(),
        )
        .unwrap();

        assert!(
            full.iter()
                .any(|info| info.obj_type == ObjectType::Redaction
                    && info.id == ObjectId::Hash(blob_hash)),
            "full closure must include a Redaction entry for the redacted blob"
        );
        assert!(
            plan.iter()
                .any(|p| p.obj_type == ObjectType::Redaction && p.id == ObjectId::Hash(blob_hash)),
            "plan closure must include a Redaction entry for the redacted blob"
        );
    }

    /// Once a state-visibility record is declared for a state, both closure
    /// enumerators must include an `ObjectType::StateVisibility` entry keyed
    /// on that state's change id.
    #[test]
    fn enumerate_state_closure_emits_state_visibility_for_visible_state() {
        let temp = TempDir::new().unwrap();
        let repo = Repository::init_default(temp.path()).unwrap();
        std::fs::write(temp.path().join("README.md"), "hello\n").unwrap();
        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();

        repo.put_state_visibility(StateVisibility {
            state: state.change_id,
            tier: VisibilityTier::Restricted {
                scope_label: "security-embargo".into(),
            },
            embargo_until: None,
            declarer: Principal {
                name: "Tester".into(),
                email: "tester@heddle.sh".into(),
            },
            declared_at: Utc::now(),
            signature: None,
            supersedes: None,
        })
        .unwrap();

        let full = enumerate_state_closure_with_options(
            repo.store(),
            state.change_id,
            StateClosureOptions::default(),
        )
        .unwrap();
        let plan = enumerate_state_closure_plan_with_options(
            repo.store(),
            state.change_id,
            StateClosureOptions::default(),
        )
        .unwrap();

        assert!(
            full.iter()
                .any(|info| info.obj_type == ObjectType::StateVisibility
                    && info.id == ObjectId::ChangeId(state.change_id)),
            "full closure must include a StateVisibility entry for the visible state"
        );
        assert!(
            plan.iter()
                .any(|p| p.obj_type == ObjectType::StateVisibility
                    && p.id == ObjectId::ChangeId(state.change_id)),
            "plan closure must include a StateVisibility entry for the visible state"
        );
    }
}