1use std::io::Write;
15use std::sync::Arc;
16
17use tracing::{info, warn};
18
19use super::gc_output::{format_mark_outcome, format_sweep_outcome};
20use super::{ManageError, Prompter};
21use crate::git::RefName;
22use crate::keys;
23use crate::object_store::ObjectStore;
24use crate::packchain::PackchainError;
25use crate::packchain::audit::{self, AuditReport, BranchRow};
26use crate::packchain::compact::{
27 self, CompactAction, CompactOpts as PackchainCompactOpts, CompactOutcome,
28};
29use crate::packchain::gc;
30use crate::protocol::push::{resolve_lock_ttl_seconds, saturating_duration_seconds};
31
/// Operator-supplied options for a [`Compact`] run.
///
/// The derived `Default` leaves every optional override unset and both flags
/// off.
#[derive(Debug, Clone, Default)]
pub struct CompactOpts {
    /// Explicit ref to compact. When `None`, the run audits the store and asks
    /// the prompter to confirm compacting every branch the audit flags with
    /// `recommend_compact`.
    pub ref_name: Option<String>,
    /// Forwarded unchanged as the `force` field of the packchain compact
    /// options.
    pub force: bool,
    /// When set, a gc mark + sweep follows the compaction, but only if at
    /// least one ref was actually compacted.
    pub with_gc: bool,
    /// Optional lock TTL override in seconds; the effective value is whatever
    /// `resolve_lock_ttl_seconds` returns for it.
    pub lock_ttl_seconds: Option<u64>,
    /// Optional gc grace period override in hours; the effective value is
    /// whatever `gc::resolve_grace_hours` returns for it. Only read when gc
    /// actually runs.
    pub gc_grace_hours: Option<u64>,
}
55
56pub struct Compact<'a> {
58 store: Arc<dyn ObjectStore>,
59 prefix: String,
60 opts: CompactOpts,
61 prompter: &'a dyn Prompter,
62}
63
64impl<'a> Compact<'a> {
65 #[must_use]
69 pub fn new(
70 store: Arc<dyn ObjectStore>,
71 prefix: impl Into<String>,
72 opts: CompactOpts,
73 prompter: &'a dyn Prompter,
74 ) -> Self {
75 Self {
76 store,
77 prefix: prefix.into(),
78 opts,
79 prompter,
80 }
81 }
82
83 pub async fn run(&self) -> Result<(), ManageError> {
95 self.run_into(&mut std::io::stdout()).await
96 }
97
98 pub(crate) async fn run_into<W: Write>(&self, out: &mut W) -> Result<(), ManageError> {
109 let ttl_secs = resolve_lock_ttl_seconds(self.opts.lock_ttl_seconds);
114 let lock_ttl = saturating_duration_seconds(ttl_secs);
115 let compact_opts = PackchainCompactOpts {
116 force: self.opts.force,
117 lock_ttl,
118 };
119
120 let targets = self.resolve_targets(out).await?;
121 if targets.is_empty() {
122 writeln!(out, "compact: no refs match the criteria; nothing to do.")?;
123 return Ok(());
124 }
125
126 let summary = self
127 .compact_targets_into(out, &targets, compact_opts)
128 .await?;
129
130 if self.opts.with_gc && summary.any_compacted {
131 self.run_gc(out).await?;
132 } else if self.opts.with_gc {
133 writeln!(out, "compact: no refs were compacted; skipping gc.")?;
134 }
135 Ok(())
136 }
137
138 async fn compact_targets_into<W: Write>(
151 &self,
152 out: &mut W,
153 targets: &[RefName],
154 compact_opts: PackchainCompactOpts,
155 ) -> Result<RunSummary, ManageError> {
156 let mut summary = RunSummary::default();
157 for ref_name in targets {
158 match compact::compact(
159 Arc::clone(&self.store),
160 self.prefix_opt(),
161 ref_name,
162 compact_opts,
163 )
164 .await
165 {
166 Ok(outcome) => {
167 write_outcome(out, &outcome)?;
168 if matches!(outcome.action, CompactAction::Compacted) {
169 summary.any_compacted = true;
170 }
171 }
172 Err(PackchainError::ChainAbsent { ref_name: r }) => {
173 summary.skipped_vanished += 1;
183 warn!(
184 ref_path = %r,
185 "compact: chain.json vanished between target selection and per-ref compact \
186 (concurrent delete?); skipping",
187 );
188 writeln!(
189 out,
190 "compact: {r} vanished between selection and compact (concurrent delete?); skipping",
191 )?;
192 }
193 Err(e) => return Err(ManageError::Packchain(e)),
194 }
195 }
196 if summary.skipped_vanished > 0 {
197 writeln!(
198 out,
199 "compact: skipped {} vanished ref(s) (chain.json removed concurrently).",
200 summary.skipped_vanished,
201 )?;
202 }
203 Ok(summary)
204 }
205
206 async fn resolve_targets<W: Write>(&self, out: &mut W) -> Result<Vec<RefName>, ManageError> {
210 if let Some(name) = &self.opts.ref_name {
211 let ref_name =
212 RefName::new(name).map_err(|_| ManageError::InvalidBranch(name.clone()))?;
213 return Ok(vec![ref_name]);
214 }
215
216 let report = self.audit_for_compaction_candidates().await?;
217 let candidates: Vec<&BranchRow> = report
218 .branches
219 .iter()
220 .filter(|r| r.recommend_compact)
221 .collect();
222 if candidates.is_empty() {
223 return Ok(Vec::new());
224 }
225
226 writeln!(out, "Branches recommended for compaction:")?;
227 for row in &candidates {
228 writeln!(
229 out,
230 " - {}: {} segment(s), {} byte(s)",
231 row.ref_path, row.segments_total, row.bytes_total,
232 )?;
233 }
234 if !self.prompter.confirm("Compact all of the above?")? {
235 writeln!(out, "Aborted")?;
236 return Err(ManageError::Cancelled);
237 }
238
239 let mut resolved = Vec::with_capacity(candidates.len());
240 for row in candidates {
241 let ref_name = RefName::new(&row.ref_path)
242 .map_err(|_| ManageError::InvalidBranch(row.ref_path.clone()))?;
243 resolved.push(ref_name);
244 }
245 Ok(resolved)
246 }
247
248 async fn audit_for_compaction_candidates(&self) -> Result<AuditReport, ManageError> {
251 let list_prefix = keys::join(Some(&self.prefix), "");
252 let objects = self.store.list(&list_prefix).await?;
253 audit::audit(self.store.as_ref(), &self.prefix, &objects)
254 .await
255 .map_err(ManageError::from)
256 }
257
258 async fn run_gc<W: Write>(&self, out: &mut W) -> Result<(), ManageError> {
259 let store_ref = self.store.as_ref();
260 let mark = gc::mark(store_ref, &self.prefix, gc::MarkOpts::default()).await?;
261 format_mark_outcome(out, &mark)?;
262 if mark.orphan_count != 0 {
263 info!(
264 run_id = %mark.run_id,
265 key = %mark.tombstone_key,
266 "compact --with-gc: mark completed",
267 );
268 }
269 let grace_hours = gc::resolve_grace_hours(self.opts.gc_grace_hours);
270 let sweep = gc::sweep(
271 store_ref,
272 &self.prefix,
273 gc::SweepOpts {
274 grace_hours,
275 force: false,
276 },
277 )
278 .await?;
279 format_sweep_outcome(out, &sweep)?;
280 Ok(())
281 }
282
283 fn prefix_opt(&self) -> Option<&str> {
284 if self.prefix.is_empty() {
285 None
286 } else {
287 Some(&self.prefix)
288 }
289 }
290}
291
292fn write_outcome<W: Write>(out: &mut W, outcome: &CompactOutcome) -> std::io::Result<()> {
293 match outcome.action {
294 CompactAction::Compacted => {
295 let new_pack = outcome.new_pack_sha.as_deref().unwrap_or("?");
296 writeln!(
297 out,
298 "compact: {} rewritten to single segment (was {} segment(s), {} byte(s); new pack {} at {} byte(s))",
299 outcome.ref_path,
300 outcome.prior_segments,
301 outcome.prior_bytes,
302 new_pack,
303 outcome.new_pack_bytes,
304 )
305 }
306 CompactAction::SkippedUnderThreshold => writeln!(
307 out,
308 "compact: {} below heuristic ({} segment(s), {} byte(s)); skipping. Use --force to compact unconditionally.",
309 outcome.ref_path, outcome.prior_segments, outcome.prior_bytes,
310 ),
311 CompactAction::AlreadyMinimal => writeln!(
312 out,
313 "compact: {} already a single-segment chain at the tip; nothing to do.",
314 outcome.ref_path,
315 ),
316 CompactAction::LockContended => writeln!(
317 out,
318 "compact: {} per-ref lock is held by another client; try again later.",
319 outcome.ref_path,
320 ),
321 }
322}
323
/// Tally of what happened across the per-ref compaction loop.
#[derive(Debug, Default)]
struct RunSummary {
    /// Set once any ref reports `CompactAction::Compacted`; gates the
    /// optional gc pass.
    any_compacted: bool,
    /// Number of refs skipped because the engine reported `ChainAbsent` for
    /// them.
    skipped_vanished: usize,
}
338
#[cfg(test)]
mod tests {
    use super::*;
    use crate::git::RefName;
    use crate::manage::ScriptedPrompter;
    use crate::object_store::mock::MockStore;
    use crate::packchain::manifest::write_chain;
    use crate::packchain::schema::{ChainManifest, ChainSegment, Sha40};
    use std::sync::Arc;
    use time::Duration;

    const SHA_A: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
    const SHA_B: &str = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb";

    /// Wraps a clone of the mock in the trait-object handle `Compact::new`
    /// expects.
    fn store_arc(mock: &MockStore) -> Arc<dyn ObjectStore> {
        let shared: Arc<dyn ObjectStore> = Arc::new(mock.clone());
        shared
    }

    /// Parses a 40-hex fixture sha, panicking on a malformed fixture.
    fn sha(hex: &str) -> Sha40 {
        Sha40::try_new(hex).unwrap()
    }

    /// Parses a fixture ref name, panicking on a malformed fixture.
    fn branch(path: &str) -> RefName {
        RefName::new(path).unwrap()
    }

    /// A chain whose tip, `full_at` and only segment all share `hex`.
    fn single_segment_chain(hex: &str) -> ChainManifest {
        ChainManifest {
            v: 1,
            tip: sha(hex),
            full_at: sha(hex),
            segments: vec![ChainSegment {
                sha: sha(hex),
                parent_sha: None,
                pack: format!("packs/{hex}.pack"),
                bytes: 1024,
            }],
        }
    }

    /// Builds a runner over `mock` under the `"repo"` prefix.
    fn runner_over<'p>(
        mock: &MockStore,
        opts: CompactOpts,
        prompter: &'p ScriptedPrompter,
    ) -> Compact<'p> {
        Compact::new(store_arc(mock), "repo", opts, prompter)
    }

    /// Runner options with only `force` switched on.
    fn forced_opts() -> CompactOpts {
        CompactOpts {
            force: true,
            ..CompactOpts::default()
        }
    }

    /// Engine options used when driving `compact_targets_into` directly.
    fn forced_engine_opts() -> PackchainCompactOpts {
        PackchainCompactOpts {
            force: true,
            lock_ttl: Duration::seconds(60),
        }
    }

    #[tokio::test]
    async fn run_with_named_ref_propagates_invalid_branch() {
        let mock = MockStore::new();
        // No scripted answers: the named-ref path never consults the prompter.
        let prompter = ScriptedPrompter::new([]);
        let opts = CompactOpts {
            ref_name: Some("refs/heads/../etc/passwd".to_owned()),
            ..CompactOpts::default()
        };
        let runner = runner_over(&mock, opts, &prompter);

        let err = runner.run().await.expect_err("invalid ref must error");
        assert!(matches!(err, ManageError::InvalidBranch(_)), "{err:?}");
    }

    #[tokio::test]
    async fn run_default_with_no_candidates_does_nothing() {
        let mock = MockStore::new();
        // Empty store: the run is expected to finish without needing a
        // scripted prompter answer.
        let prompter = ScriptedPrompter::new([]);
        let runner = runner_over(&mock, CompactOpts::default(), &prompter);

        runner.run().await.expect("no-candidate run is Ok");
    }

    #[tokio::test]
    async fn vanished_ref_is_skipped_and_other_targets_still_compact() {
        let mock = MockStore::new();
        let main = branch("refs/heads/main");
        let dev = branch("refs/heads/dev");
        write_chain(&mock, Some("repo"), &main, &single_segment_chain(SHA_A))
            .await
            .unwrap();
        write_chain(&mock, Some("repo"), &dev, &single_segment_chain(SHA_B))
            .await
            .unwrap();

        // Drop dev's manifest after it was written, so dev is still in the
        // target list but its chain.json is gone when compaction reaches it.
        assert!(
            mock.remove_key("repo/refs/heads/dev/chain.json"),
            "fixture invariant: dev's chain.json must be present pre-delete",
        );

        let prompter = ScriptedPrompter::new([]);
        let runner = runner_over(&mock, forced_opts(), &prompter);

        // The vanished ref goes first, so the surviving ref proves the loop
        // carried on after the skip.
        let targets = [dev, main];
        let mut out: Vec<u8> = Vec::new();
        let summary = runner
            .compact_targets_into(&mut out, &targets, forced_engine_opts())
            .await
            .expect("vanished ref must NOT abort the run");
        let stdout = String::from_utf8(out).expect("operator output is utf-8");

        assert_eq!(summary.skipped_vanished, 1);
        assert!(
            !summary.any_compacted,
            "AlreadyMinimal must not flip any_compacted",
        );
        assert!(
            stdout.contains(
                "compact: refs/heads/dev vanished between selection and compact \
                 (concurrent delete?); skipping",
            ),
            "operator must see the per-ref vanished line for dev; got: {stdout}",
        );
        assert!(
            stdout.contains(
                "compact: refs/heads/main already a single-segment chain at the tip; nothing to do.",
            ),
            "the surviving ref's outcome must still print after the vanished skip; got: {stdout}",
        );
        assert!(
            stdout.contains("compact: skipped 1 vanished ref(s)"),
            "trailing summary must surface the skip count; got: {stdout}",
        );
    }

    #[tokio::test]
    async fn non_chainabsent_packchain_error_still_aborts_run() {
        let mock = MockStore::new();
        let main = branch("refs/heads/main");
        // A manifest that exists but cannot be parsed: present, so not a
        // `ChainAbsent` case.
        mock.insert(
            "repo/refs/heads/main/chain.json",
            bytes::Bytes::from_static(b"not valid json"),
        );

        let prompter = ScriptedPrompter::new([]);
        let runner = runner_over(&mock, forced_opts(), &prompter);

        let mut out: Vec<u8> = Vec::new();
        let err = runner
            .compact_targets_into(&mut out, std::slice::from_ref(&main), forced_engine_opts())
            .await
            .expect_err("non-ChainAbsent engine error must abort the run");
        assert!(
            matches!(err, ManageError::Packchain(_)),
            "expected ManageError::Packchain, got {err:?}",
        );
    }

    #[tokio::test]
    async fn with_gc_skipped_when_no_compaction_happened() {
        let mock = MockStore::new();
        // One-segment chain with a parent and a `full_at` that differs from
        // the tip; run without `force`, which the expectation message below
        // treats as the skip-under-threshold case.
        let chain = ChainManifest {
            v: 1,
            tip: sha(SHA_A),
            full_at: sha(SHA_B),
            segments: vec![ChainSegment {
                sha: sha(SHA_A),
                parent_sha: Some(sha(SHA_B)),
                pack: "packs/1111111111111111111111111111111111111111.pack".to_owned(),
                bytes: 1024,
            }],
        };
        let rn = branch("refs/heads/main");
        write_chain(&mock, Some("repo"), &rn, &chain)
            .await
            .unwrap();
        // A pack no chain references. Presumably a gc pass would act on it
        // and write under `repo/gc/`, which is what the assertion looks for.
        mock.insert(
            "repo/packs/9999999999999999999999999999999999999999.pack",
            bytes::Bytes::from_static(b"orphan"),
        );

        let prompter = ScriptedPrompter::new([]);
        let opts = CompactOpts {
            ref_name: Some("refs/heads/main".to_owned()),
            with_gc: true,
            ..CompactOpts::default()
        };
        let runner = runner_over(&mock, opts, &prompter);
        runner.run().await.expect("skip-under-threshold run is Ok");

        let all_keys = mock.keys();
        let gc_keys = all_keys
            .iter()
            .filter(|k| k.starts_with("repo/gc/"))
            .collect::<Vec<_>>();
        assert!(
            gc_keys.is_empty(),
            "with_gc must NOT run gc when nothing was compacted; \
             unexpected gc/ keys: {:?}",
            gc_keys,
        );
    }
}