1use std::collections::{HashSet, VecDeque};
3
4use objects::{
5 object::{ChangeId, ContentHash, EntryType},
6 store::ObjectStore,
7};
8use serde::{Deserialize, Serialize};
9
10use crate::{ProtocolError, Result};
11
12#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
13pub enum ObjectId {
14 Hash(ContentHash),
15 ChangeId(ChangeId),
16}
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct ObjectInfo {
20 pub id: ObjectId,
21 pub obj_type: ObjectType,
22 pub size: u64,
23 pub delta_base: Option<ContentHash>,
24}
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
27pub struct PlannedObject {
28 pub id: ObjectId,
29 pub obj_type: ObjectType,
30}
31
32#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
33pub enum ObjectType {
34 Blob,
35 Tree,
36 State,
37 Action,
38 Redaction,
44 StateVisibility,
51}
52
53#[derive(Debug, Clone, Default)]
54pub struct StateClosureOptions {
55 pub depth: Option<u32>,
56 pub exclude_states: Vec<ChangeId>,
57}
58
59pub fn enumerate_state_closure(
60 store: &impl ObjectStore,
61 state_id: ChangeId,
62) -> Result<Vec<ObjectInfo>> {
63 enumerate_state_closure_with_options(store, state_id, StateClosureOptions::default())
64}
65
66pub fn enumerate_state_closure_with_options(
67 store: &impl ObjectStore,
68 state_id: ChangeId,
69 options: StateClosureOptions,
70) -> Result<Vec<ObjectInfo>> {
71 let (excluded_states, excluded_hashes) = collect_excluded(store, &options.exclude_states)?;
72
73 let mut out = Vec::new();
74 let mut seen_states: HashSet<ChangeId> = HashSet::new();
75 let mut seen_hashes: HashSet<ContentHash> = HashSet::new();
76 let mut queue: VecDeque<(ChangeId, u32)> = VecDeque::new();
77 queue.push_back((state_id, 0));
78
79 while let Some((id, depth)) = queue.pop_front() {
80 if excluded_states.contains(&id) {
81 continue;
82 }
83 if !seen_states.insert(id) {
84 continue;
85 }
86
87 let state = store
88 .get_state(&id)?
89 .ok_or_else(|| ProtocolError::ObjectNotFound(id.to_string()))?;
90
91 let state_bytes = rmp_serde::to_vec_named(&state)?;
92 out.push(ObjectInfo {
93 id: ObjectId::ChangeId(id),
94 obj_type: ObjectType::State,
95 size: state_bytes.len() as u64,
96 delta_base: None,
97 });
98 emit_state_visibility_info(store, &id, &mut out)?;
99
100 if options.depth.map(|max| depth < max).unwrap_or(true) {
101 for parent in &state.parents {
102 queue.push_back((*parent, depth + 1));
103 }
104 }
105
106 enumerate_tree_closure_filtered(
107 store,
108 state.tree,
109 &excluded_hashes,
110 &mut seen_hashes,
111 &mut out,
112 )?;
113 if let Some(provenance_root) = state.provenance {
114 enumerate_tree_closure_filtered(
115 store,
116 provenance_root,
117 &excluded_hashes,
118 &mut seen_hashes,
119 &mut out,
120 )?;
121 }
122 if let Some(context_root) = state.context {
123 enumerate_tree_closure_filtered(
124 store,
125 context_root,
126 &excluded_hashes,
127 &mut seen_hashes,
128 &mut out,
129 )?;
130 }
131 }
132
133 Ok(out)
134}
135
136pub fn enumerate_state_closure_plan(
137 store: &impl ObjectStore,
138 state_id: ChangeId,
139) -> Result<Vec<PlannedObject>> {
140 enumerate_state_closure_plan_with_options(store, state_id, StateClosureOptions::default())
141}
142
143pub fn enumerate_state_closure_plan_with_options(
144 store: &impl ObjectStore,
145 state_id: ChangeId,
146 options: StateClosureOptions,
147) -> Result<Vec<PlannedObject>> {
148 let (excluded_states, excluded_hashes) = collect_excluded(store, &options.exclude_states)?;
149
150 let mut out = Vec::new();
151 let mut seen_states: HashSet<ChangeId> = HashSet::new();
152 let mut seen_hashes: HashSet<ContentHash> = HashSet::new();
153 let mut queue: VecDeque<(ChangeId, u32)> = VecDeque::new();
154 queue.push_back((state_id, 0));
155
156 while let Some((id, depth)) = queue.pop_front() {
157 if excluded_states.contains(&id) {
158 continue;
159 }
160 if !seen_states.insert(id) {
161 continue;
162 }
163
164 let state = store
165 .get_state(&id)?
166 .ok_or_else(|| ProtocolError::ObjectNotFound(id.to_string()))?;
167
168 out.push(PlannedObject {
169 id: ObjectId::ChangeId(id),
170 obj_type: ObjectType::State,
171 });
172 emit_state_visibility_plan(store, &id, &mut out)?;
173
174 if options.depth.map(|max| depth < max).unwrap_or(true) {
175 for parent in &state.parents {
176 queue.push_back((*parent, depth + 1));
177 }
178 }
179
180 enumerate_tree_plan_filtered(
181 store,
182 state.tree,
183 &excluded_hashes,
184 &mut seen_hashes,
185 &mut out,
186 )?;
187 if let Some(provenance_root) = state.provenance {
188 enumerate_tree_plan_filtered(
189 store,
190 provenance_root,
191 &excluded_hashes,
192 &mut seen_hashes,
193 &mut out,
194 )?;
195 }
196 if let Some(context_root) = state.context {
197 enumerate_tree_plan_filtered(
198 store,
199 context_root,
200 &excluded_hashes,
201 &mut seen_hashes,
202 &mut out,
203 )?;
204 }
205 }
206
207 Ok(out)
208}
209
210fn enumerate_tree_closure_filtered(
211 store: &impl ObjectStore,
212 tree_hash: ContentHash,
213 excluded: &HashSet<ContentHash>,
214 seen: &mut HashSet<ContentHash>,
215 out: &mut Vec<ObjectInfo>,
216) -> Result<()> {
217 if excluded.contains(&tree_hash) {
218 return Ok(());
219 }
220 if !seen.insert(tree_hash) {
221 return Ok(());
222 }
223
224 let tree = store
225 .get_tree(&tree_hash)?
226 .ok_or_else(|| ProtocolError::ObjectNotFound(tree_hash.to_hex()))?;
227
228 let tree_bytes = rmp_serde::to_vec_named(&tree)?;
229 out.push(ObjectInfo {
230 id: ObjectId::Hash(tree_hash),
231 obj_type: ObjectType::Tree,
232 size: tree_bytes.len() as u64,
233 delta_base: None,
234 });
235
236 for entry in tree.entries() {
237 match entry.entry_type {
238 EntryType::Blob => {
239 if excluded.contains(&entry.hash) {
240 continue;
241 }
242 if !seen.insert(entry.hash) {
243 continue;
244 }
245 let blob = store
246 .get_blob(&entry.hash)?
247 .ok_or_else(|| ProtocolError::ObjectNotFound(entry.hash.to_hex()))?;
248 out.push(ObjectInfo {
249 id: ObjectId::Hash(entry.hash),
250 obj_type: ObjectType::Blob,
251 size: blob.size() as u64,
252 delta_base: None,
253 });
254 emit_redaction_info(store, &entry.hash, out)?;
255 }
256 EntryType::Tree => {
257 enumerate_tree_closure_filtered(store, entry.hash, excluded, seen, out)?;
258 }
259 EntryType::Symlink => {
260 if excluded.contains(&entry.hash) {
261 continue;
262 }
263 if !seen.insert(entry.hash) {
264 continue;
265 }
266 let blob = store
267 .get_blob(&entry.hash)?
268 .ok_or_else(|| ProtocolError::ObjectNotFound(entry.hash.to_hex()))?;
269 out.push(ObjectInfo {
270 id: ObjectId::Hash(entry.hash),
271 obj_type: ObjectType::Blob,
272 size: blob.size() as u64,
273 delta_base: None,
274 });
275 emit_redaction_info(store, &entry.hash, out)?;
276 }
277 }
278 }
279
280 Ok(())
281}
282
283fn emit_state_visibility_info(
287 store: &impl ObjectStore,
288 state: &ChangeId,
289 out: &mut Vec<ObjectInfo>,
290) -> Result<()> {
291 if let Some(bytes) = store.get_state_visibility_bytes_for_state(state)? {
292 out.push(ObjectInfo {
293 id: ObjectId::ChangeId(*state),
294 obj_type: ObjectType::StateVisibility,
295 size: bytes.len() as u64,
296 delta_base: None,
297 });
298 }
299 Ok(())
300}
301
302fn emit_state_visibility_plan(
303 store: &impl ObjectStore,
304 state: &ChangeId,
305 out: &mut Vec<PlannedObject>,
306) -> Result<()> {
307 if store.has_state_visibility_for_state(state)? {
308 out.push(PlannedObject {
309 id: ObjectId::ChangeId(*state),
310 obj_type: ObjectType::StateVisibility,
311 });
312 }
313 Ok(())
314}
315
316fn emit_redaction_info(
326 store: &impl ObjectStore,
327 blob: &ContentHash,
328 out: &mut Vec<ObjectInfo>,
329) -> Result<()> {
330 if let Some(bytes) = store.get_redactions_bytes_for_blob(blob)? {
331 out.push(ObjectInfo {
332 id: ObjectId::Hash(*blob),
333 obj_type: ObjectType::Redaction,
334 size: bytes.len() as u64,
335 delta_base: None,
336 });
337 }
338 Ok(())
339}
340
341fn enumerate_tree_plan_filtered(
342 store: &impl ObjectStore,
343 tree_hash: ContentHash,
344 excluded: &HashSet<ContentHash>,
345 seen: &mut HashSet<ContentHash>,
346 out: &mut Vec<PlannedObject>,
347) -> Result<()> {
348 if excluded.contains(&tree_hash) {
349 return Ok(());
350 }
351 if !seen.insert(tree_hash) {
352 return Ok(());
353 }
354
355 let tree = store
356 .get_tree(&tree_hash)?
357 .ok_or_else(|| ProtocolError::ObjectNotFound(tree_hash.to_hex()))?;
358
359 out.push(PlannedObject {
360 id: ObjectId::Hash(tree_hash),
361 obj_type: ObjectType::Tree,
362 });
363
364 for entry in tree.entries() {
365 match entry.entry_type {
366 EntryType::Blob | EntryType::Symlink => {
367 if excluded.contains(&entry.hash) {
368 continue;
369 }
370 if !seen.insert(entry.hash) {
371 continue;
372 }
373 out.push(PlannedObject {
374 id: ObjectId::Hash(entry.hash),
375 obj_type: ObjectType::Blob,
376 });
377 emit_redaction_plan(store, &entry.hash, out)?;
378 }
379 EntryType::Tree => {
380 enumerate_tree_plan_filtered(store, entry.hash, excluded, seen, out)?;
381 }
382 }
383 }
384
385 Ok(())
386}
387
388fn emit_redaction_plan(
389 store: &impl ObjectStore,
390 blob: &ContentHash,
391 out: &mut Vec<PlannedObject>,
392) -> Result<()> {
393 if store.has_redactions_for_blob(blob)? {
394 out.push(PlannedObject {
395 id: ObjectId::Hash(*blob),
396 obj_type: ObjectType::Redaction,
397 });
398 }
399 Ok(())
400}
401
402fn collect_excluded(
403 store: &impl ObjectStore,
404 roots: &[ChangeId],
405) -> Result<(HashSet<ChangeId>, HashSet<ContentHash>)> {
406 if roots.is_empty() {
407 return Ok((HashSet::new(), HashSet::new()));
408 }
409
410 let mut excluded_states: HashSet<ChangeId> = HashSet::new();
411 let mut excluded_hashes: HashSet<ContentHash> = HashSet::new();
412 let mut queue: VecDeque<ChangeId> = VecDeque::new();
413
414 for id in roots {
415 queue.push_back(*id);
416 }
417
418 while let Some(id) = queue.pop_front() {
419 if !excluded_states.insert(id) {
420 continue;
421 }
422
423 let state = match store.get_state(&id)? {
424 Some(state) => state,
425 None => continue,
426 };
427
428 for parent in &state.parents {
429 queue.push_back(*parent);
430 }
431
432 collect_tree_hashes(store, state.tree, &mut excluded_hashes)?;
433 if let Some(provenance_root) = state.provenance {
434 collect_tree_hashes(store, provenance_root, &mut excluded_hashes)?;
435 }
436 if let Some(context_root) = state.context {
437 collect_tree_hashes(store, context_root, &mut excluded_hashes)?;
438 }
439 }
440
441 Ok((excluded_states, excluded_hashes))
442}
443
444fn collect_tree_hashes(
445 store: &impl ObjectStore,
446 tree_hash: ContentHash,
447 excluded: &mut HashSet<ContentHash>,
448) -> Result<()> {
449 if !excluded.insert(tree_hash) {
450 return Ok(());
451 }
452
453 let tree = match store.get_tree(&tree_hash)? {
454 Some(tree) => tree,
455 None => return Ok(()),
456 };
457
458 for entry in tree.entries() {
459 match entry.entry_type {
460 EntryType::Blob | EntryType::Symlink => {
461 excluded.insert(entry.hash);
462 }
463 EntryType::Tree => {
464 collect_tree_hashes(store, entry.hash, excluded)?;
465 }
466 }
467 }
468
469 Ok(())
470}
471
472pub fn is_ancestor(
473 store: &impl ObjectStore,
474 ancestor: ChangeId,
475 descendant: ChangeId,
476) -> Result<bool> {
477 if ancestor == descendant {
478 return Ok(true);
479 }
480
481 let mut seen: HashSet<ChangeId> = HashSet::new();
482 let mut queue: VecDeque<ChangeId> = VecDeque::new();
483 queue.push_back(descendant);
484
485 while let Some(id) = queue.pop_front() {
486 if !seen.insert(id) {
487 continue;
488 }
489 let state = match store.get_state(&id)? {
490 Some(s) => s,
491 None => return Ok(false),
492 };
493 for parent in state.parents {
494 if parent == ancestor {
495 return Ok(true);
496 }
497 queue.push_back(parent);
498 }
499 }
500
501 Ok(false)
502}
503
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use chrono::Utc;
    use objects::{
        object::{ChangeId, Principal, Redaction, StateVisibility, VisibilityTier},
        store::ObjectStore,
    };
    use repo::Repository;
    use tempfile::TempDir;

    use super::{
        ObjectId, ObjectInfo, ObjectType, PlannedObject, StateClosureOptions,
        enumerate_state_closure_plan_with_options, enumerate_state_closure_with_options,
    };

    /// Identity recorded as the author of test redactions and visibility declarations.
    fn tester() -> Principal {
        Principal {
            name: "Tester".into(),
            email: "tester@heddle.sh".into(),
        }
    }

    /// Runs the sized enumerator, then the lean planner, over `id` with default options.
    fn closures_for(
        store: &impl ObjectStore,
        id: ChangeId,
    ) -> (Vec<ObjectInfo>, Vec<PlannedObject>) {
        let full =
            enumerate_state_closure_with_options(store, id, StateClosureOptions::default())
                .unwrap();
        let plan =
            enumerate_state_closure_plan_with_options(store, id, StateClosureOptions::default())
                .unwrap();
        (full, plan)
    }

    #[test]
    fn lean_closure_planner_matches_object_info_ids_and_types() {
        let temp = TempDir::new().unwrap();
        let repo = Repository::init_default(temp.path()).unwrap();
        let root = temp.path();
        std::fs::create_dir_all(root.join("src")).unwrap();
        std::fs::write(root.join("README.md"), "hello\n").unwrap();
        std::fs::write(root.join("src/lib.rs"), "pub fn hi() {}\n").unwrap();
        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();

        let (full, lean) = closures_for(repo.store(), state.change_id);

        let full_pairs: HashSet<(ObjectId, ObjectType)> = full
            .into_iter()
            .map(|info| (info.id, info.obj_type))
            .collect();
        let lean_pairs: HashSet<(ObjectId, ObjectType)> = lean
            .into_iter()
            .map(|planned| (planned.id, planned.obj_type))
            .collect();

        assert_eq!(full_pairs, lean_pairs);
        assert!(
            full_pairs
                .iter()
                .any(|(id, _)| matches!(id, ObjectId::ChangeId(_)))
        );
    }

    #[test]
    fn enumerate_state_closure_emits_redaction_for_redacted_blob() {
        let temp = TempDir::new().unwrap();
        let repo = Repository::init_default(temp.path()).unwrap();
        std::fs::write(temp.path().join("secret.toml"), "api_token = \"x\"\n").unwrap();
        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();

        let tree = repo
            .store()
            .get_tree(&state.tree)
            .unwrap()
            .expect("tree present");
        let blob_hash = tree
            .iter()
            .find(|e| e.name == "secret.toml")
            .expect("entry present")
            .hash;

        repo.put_redaction(Redaction {
            redacted_blob: blob_hash,
            state: state.change_id,
            path: "secret.toml".to_string(),
            reason: "test leak".to_string(),
            redactor: tester(),
            redacted_at: Utc::now(),
            signature: None,
            purged_at: None,
            supersedes: None,
        })
        .unwrap();

        let (full, plan) = closures_for(repo.store(), state.change_id);
        let redacted = ObjectId::Hash(blob_hash);

        assert!(
            full.iter()
                .any(|info| info.obj_type == ObjectType::Redaction && info.id == redacted),
            "full closure must include a Redaction entry for the redacted blob"
        );
        assert!(
            plan.iter()
                .any(|p| p.obj_type == ObjectType::Redaction && p.id == redacted),
            "plan closure must include a Redaction entry for the redacted blob"
        );
    }

    #[test]
    fn enumerate_state_closure_emits_state_visibility_for_visible_state() {
        let temp = TempDir::new().unwrap();
        let repo = Repository::init_default(temp.path()).unwrap();
        std::fs::write(temp.path().join("README.md"), "hello\n").unwrap();
        let state = repo.snapshot(Some("seed".to_string()), None).unwrap();

        repo.put_state_visibility(StateVisibility {
            state: state.change_id,
            tier: VisibilityTier::Restricted {
                scope_label: "security-embargo".into(),
            },
            embargo_until: None,
            declarer: tester(),
            declared_at: Utc::now(),
            signature: None,
            supersedes: None,
        })
        .unwrap();

        let (full, plan) = closures_for(repo.store(), state.change_id);
        let visible = ObjectId::ChangeId(state.change_id);

        assert!(
            full.iter()
                .any(|info| info.obj_type == ObjectType::StateVisibility && info.id == visible),
            "full closure must include a StateVisibility entry for the visible state"
        );
        assert!(
            plan.iter()
                .any(|p| p.obj_type == ObjectType::StateVisibility && p.id == visible),
            "plan closure must include a StateVisibility entry for the visible state"
        );
    }
}