// git_remote_object_store/manage/compact.rs

1//! `compact` subcommand for the management CLI (issue #67, Phase 5
2//! of #52).
3//!
4//! Drives [`crate::packchain::compact::compact`] against one ref
5//! (when `--ref` is given) or every ref whose chain meets the
6//! compaction heuristic (the default — operator confirms the list
7//! interactively before any work runs).
8//!
9//! All output is human-readable on stdout; the management CLI may
10//! write to stdout per `.claude/rules/protocol-stdout.md`. The
11//! per-ref loop accepts an injected writer so tests can pin the
12//! operator-visible output byte-for-byte (issue #160).
13
14use std::io::Write;
15use std::sync::Arc;
16
17use tracing::{info, warn};
18
19use super::gc_output::{format_mark_outcome, format_sweep_outcome};
20use super::{ManageError, Prompter};
21use crate::git::RefName;
22use crate::keys;
23use crate::object_store::ObjectStore;
24use crate::packchain::PackchainError;
25use crate::packchain::audit::{self, AuditReport, BranchRow};
26use crate::packchain::compact::{
27    self, CompactAction, CompactOpts as PackchainCompactOpts, CompactOutcome,
28};
29use crate::packchain::gc;
30use crate::protocol::push::{resolve_lock_ttl_seconds, saturating_duration_seconds};
31
/// Operator-facing knobs consumed by [`Compact::run`]. Each field
/// corresponds to one flag of the `compact` subcommand.
#[derive(Clone, Debug, Default)]
pub struct CompactOpts {
    /// Restrict the run to this one ref. With `None` the runner falls
    /// back to the audit-driven mode and considers every ref that
    /// meets the compaction heuristic.
    pub ref_name: Option<String>,
    /// Skip the heuristic check and compact regardless of chain shape.
    pub force: bool,
    /// After a successful compact, also run the
    /// [`crate::packchain::gc`] mark+sweep pass over the same bucket.
    pub with_gc: bool,
    /// TTL, in seconds, of the per-ref lock taken by compact. Both
    /// `None` and `Some(0)` defer to
    /// [`crate::protocol::push::lock_ttl_from_env`], which reads
    /// `GIT_REMOTE_OBJECT_STORE_LOCK_TTL_SECONDS`. Zero is never used
    /// literally because it would defeat per-ref locking (issue #208);
    /// the value is routed through
    /// [`crate::protocol::push::resolve_lock_ttl_seconds`] first.
    pub lock_ttl_seconds: Option<u64>,
    /// Grace period, in hours, handed to `gc::sweep` when `with_gc` is
    /// enabled. `None` defers to
    /// [`crate::packchain::gc::grace_hours_from_env`], which reads
    /// `GIT_REMOTE_OBJECT_STORE_GC_GRACE_HOURS` and uses
    /// [`gc::DEFAULT_GRACE_HOURS`] when the variable is unset.
    pub gc_grace_hours: Option<u64>,
}
55
56/// `compact` runner.
57pub struct Compact<'a> {
58    store: Arc<dyn ObjectStore>,
59    prefix: String,
60    opts: CompactOpts,
61    prompter: &'a dyn Prompter,
62}
63
64impl<'a> Compact<'a> {
65    /// Construct a runner. `prefix` is the parsed remote URL's
66    /// repository prefix without a trailing slash; pass an empty
67    /// string for bucket-root repositories.
68    #[must_use]
69    pub fn new(
70        store: Arc<dyn ObjectStore>,
71        prefix: impl Into<String>,
72        opts: CompactOpts,
73        prompter: &'a dyn Prompter,
74    ) -> Self {
75        Self {
76            store,
77            prefix: prefix.into(),
78            opts,
79            prompter,
80        }
81    }
82
83    /// Execute the configured flow.
84    ///
85    /// # Errors
86    ///
87    /// Returns [`ManageError::Store`] for transport failures,
88    /// [`ManageError::Packchain`] for engine-level failures during
89    /// chain install / repack, [`ManageError::InvalidBranch`] when
90    /// `--ref` value fails ref-name validation,
91    /// [`ManageError::BranchNotFound`] when a named ref has no
92    /// chain.json, and [`ManageError::Cancelled`] when the operator
93    /// declines the interactive prompt.
94    pub async fn run(&self) -> Result<(), ManageError> {
95        self.run_into(&mut std::io::stdout()).await
96    }
97
98    /// Same contract as [`run`](Self::run) but writes human-readable
99    /// output to `out`. Tests use this to capture the operator messages
100    /// (e.g. the per-ref "vanished, skipping" notice from #160) so a
101    /// regression that silently turns a concurrent ref deletion into a
102    /// whole-run abort, or into a silent success, is caught.
103    ///
104    /// # Errors
105    ///
106    /// Same as [`run`](Self::run), plus [`ManageError::Io`] if a write
107    /// to `out` fails.
108    pub(crate) async fn run_into<W: Write>(&self, out: &mut W) -> Result<(), ManageError> {
109        // Route Option<u64> through the shared resolver so `Some(0)`
110        // cannot defeat per-ref locking (issue #208), then saturate
111        // via the shared `saturating_duration_seconds` helper so
112        // `u64::MAX`-class TTLs map to the ~292-billion-year sentinel.
113        let ttl_secs = resolve_lock_ttl_seconds(self.opts.lock_ttl_seconds);
114        let lock_ttl = saturating_duration_seconds(ttl_secs);
115        let compact_opts = PackchainCompactOpts {
116            force: self.opts.force,
117            lock_ttl,
118        };
119
120        let targets = self.resolve_targets(out).await?;
121        if targets.is_empty() {
122            writeln!(out, "compact: no refs match the criteria; nothing to do.")?;
123            return Ok(());
124        }
125
126        let summary = self
127            .compact_targets_into(out, &targets, compact_opts)
128            .await?;
129
130        if self.opts.with_gc && summary.any_compacted {
131            self.run_gc(out).await?;
132        } else if self.opts.with_gc {
133            writeln!(out, "compact: no refs were compacted; skipping gc.")?;
134        }
135        Ok(())
136    }
137
138    /// Per-ref compaction loop with the #160 "vanished, skipping"
139    /// downgrade. Returns the aggregate summary so the caller can
140    /// decide whether to run gc and how to phrase the trailing line.
141    ///
142    /// `ChainAbsent` on any single ref is downgraded to a warn-and-skip
143    /// so a concurrent `delete-branch` (or any deletion between the
144    /// `resolve_targets` snapshot and a ref's turn in the loop) does
145    /// not abort the entire run and rob the operator of the work that
146    /// would otherwise complete for the surviving refs. Every other
147    /// `PackchainError` variant — and any `ManageError` — still aborts:
148    /// a transport failure or auth error signals the bucket itself is
149    /// untrustworthy, not just one vanished ref.
150    async fn compact_targets_into<W: Write>(
151        &self,
152        out: &mut W,
153        targets: &[RefName],
154        compact_opts: PackchainCompactOpts,
155    ) -> Result<RunSummary, ManageError> {
156        let mut summary = RunSummary::default();
157        for ref_name in targets {
158            match compact::compact(
159                Arc::clone(&self.store),
160                self.prefix_opt(),
161                ref_name,
162                compact_opts,
163            )
164            .await
165            {
166                Ok(outcome) => {
167                    write_outcome(out, &outcome)?;
168                    if matches!(outcome.action, CompactAction::Compacted) {
169                        summary.any_compacted = true;
170                    }
171                }
172                Err(PackchainError::ChainAbsent { ref_name: r }) => {
173                    // Concurrent `delete-branch` (or any deletion path)
174                    // removed this ref's `chain.json` between the
175                    // `resolve_targets` snapshot and its turn in the
176                    // loop. Without this carve-out, a single vanished
177                    // ref would abort the entire compact run and the
178                    // surviving refs would never be touched (issue
179                    // #160). The skip is operator-visible both on
180                    // stdout and via `tracing::warn` so a vanished ref
181                    // never disappears silently.
182                    summary.skipped_vanished += 1;
183                    warn!(
184                        ref_path = %r,
185                        "compact: chain.json vanished between target selection and per-ref compact \
186                         (concurrent delete?); skipping",
187                    );
188                    writeln!(
189                        out,
190                        "compact: {r} vanished between selection and compact (concurrent delete?); skipping",
191                    )?;
192                }
193                Err(e) => return Err(ManageError::Packchain(e)),
194            }
195        }
196        if summary.skipped_vanished > 0 {
197            writeln!(
198                out,
199                "compact: skipped {} vanished ref(s) (chain.json removed concurrently).",
200                summary.skipped_vanished,
201            )?;
202        }
203        Ok(summary)
204    }
205
206    /// Compute the list of ref names to compact. `--ref` short-
207    /// circuits to a single ref; otherwise scan via the audit and
208    /// prompt the operator to confirm the candidate list.
209    async fn resolve_targets<W: Write>(&self, out: &mut W) -> Result<Vec<RefName>, ManageError> {
210        if let Some(name) = &self.opts.ref_name {
211            let ref_name =
212                RefName::new(name).map_err(|_| ManageError::InvalidBranch(name.clone()))?;
213            return Ok(vec![ref_name]);
214        }
215
216        let report = self.audit_for_compaction_candidates().await?;
217        let candidates: Vec<&BranchRow> = report
218            .branches
219            .iter()
220            .filter(|r| r.recommend_compact)
221            .collect();
222        if candidates.is_empty() {
223            return Ok(Vec::new());
224        }
225
226        writeln!(out, "Branches recommended for compaction:")?;
227        for row in &candidates {
228            writeln!(
229                out,
230                "  - {}: {} segment(s), {} byte(s)",
231                row.ref_path, row.segments_total, row.bytes_total,
232            )?;
233        }
234        if !self.prompter.confirm("Compact all of the above?")? {
235            writeln!(out, "Aborted")?;
236            return Err(ManageError::Cancelled);
237        }
238
239        let mut resolved = Vec::with_capacity(candidates.len());
240        for row in candidates {
241            let ref_name = RefName::new(&row.ref_path)
242                .map_err(|_| ManageError::InvalidBranch(row.ref_path.clone()))?;
243            resolved.push(ref_name);
244        }
245        Ok(resolved)
246    }
247
248    /// Walk the bucket once and derive the audit report we use for
249    /// candidate selection. Mirrors the doctor's audit flow.
250    async fn audit_for_compaction_candidates(&self) -> Result<AuditReport, ManageError> {
251        let list_prefix = keys::join(Some(&self.prefix), "");
252        let objects = self.store.list(&list_prefix).await?;
253        audit::audit(self.store.as_ref(), &self.prefix, &objects)
254            .await
255            .map_err(ManageError::from)
256    }
257
258    async fn run_gc<W: Write>(&self, out: &mut W) -> Result<(), ManageError> {
259        let store_ref = self.store.as_ref();
260        let mark = gc::mark(store_ref, &self.prefix, gc::MarkOpts::default()).await?;
261        format_mark_outcome(out, &mark)?;
262        if mark.orphan_count != 0 {
263            info!(
264                run_id = %mark.run_id,
265                key = %mark.tombstone_key,
266                "compact --with-gc: mark completed",
267            );
268        }
269        let grace_hours = gc::resolve_grace_hours(self.opts.gc_grace_hours);
270        let sweep = gc::sweep(
271            store_ref,
272            &self.prefix,
273            gc::SweepOpts {
274                grace_hours,
275                force: false,
276            },
277        )
278        .await?;
279        format_sweep_outcome(out, &sweep)?;
280        Ok(())
281    }
282
283    fn prefix_opt(&self) -> Option<&str> {
284        if self.prefix.is_empty() {
285            None
286        } else {
287            Some(&self.prefix)
288        }
289    }
290}
291
292fn write_outcome<W: Write>(out: &mut W, outcome: &CompactOutcome) -> std::io::Result<()> {
293    match outcome.action {
294        CompactAction::Compacted => {
295            let new_pack = outcome.new_pack_sha.as_deref().unwrap_or("?");
296            writeln!(
297                out,
298                "compact: {} rewritten to single segment (was {} segment(s), {} byte(s); new pack {} at {} byte(s))",
299                outcome.ref_path,
300                outcome.prior_segments,
301                outcome.prior_bytes,
302                new_pack,
303                outcome.new_pack_bytes,
304            )
305        }
306        CompactAction::SkippedUnderThreshold => writeln!(
307            out,
308            "compact: {} below heuristic ({} segment(s), {} byte(s)); skipping. Use --force to compact unconditionally.",
309            outcome.ref_path, outcome.prior_segments, outcome.prior_bytes,
310        ),
311        CompactAction::AlreadyMinimal => writeln!(
312            out,
313            "compact: {} already a single-segment chain at the tip; nothing to do.",
314            outcome.ref_path,
315        ),
316        CompactAction::LockContended => writeln!(
317            out,
318            "compact: {} per-ref lock is held by another client; try again later.",
319            outcome.ref_path,
320        ),
321    }
322}
323
/// Totals collected by the per-ref loop. They feed the `with_gc`
/// decision and the closing "skipped N vanished ref(s)" line.
#[derive(Default, Debug)]
struct RunSummary {
    /// `true` once any ref reports [`CompactAction::Compacted`]. The
    /// `with_gc` mark/sweep is gated on it, since a run that changed
    /// nothing leaves gc no orphans to reap.
    any_compacted: bool,
    /// How many refs disappeared between target selection and their
    /// per-ref compact (issue #160). Echoed in the closing summary
    /// line so the count survives even if the individual warn lines
    /// scrolled out of view.
    skipped_vanished: usize,
}
338
339#[cfg(test)]
340mod tests {
341    use super::*;
342    use crate::manage::ScriptedPrompter;
343    use crate::object_store::mock::MockStore;
344    use std::sync::Arc;
345    use time::Duration;
346
347    fn store_arc(mock: &MockStore) -> Arc<dyn ObjectStore> {
348        Arc::new(mock.clone())
349    }
350
351    #[tokio::test]
352    async fn run_with_named_ref_propagates_invalid_branch() {
353        // `--ref ../etc/passwd` must surface a typed
354        // `ManageError::InvalidBranch`, not a transport error or a
355        // `ChainAbsent`.
356        let mock = MockStore::new();
357        let prompter = ScriptedPrompter::new([]);
358        let runner = Compact::new(
359            store_arc(&mock),
360            "repo",
361            CompactOpts {
362                ref_name: Some("refs/heads/../etc/passwd".to_owned()),
363                ..CompactOpts::default()
364            },
365            &prompter,
366        );
367        let err = runner.run().await.expect_err("invalid ref must error");
368        assert!(matches!(err, ManageError::InvalidBranch(_)), "{err:?}");
369    }
370
371    #[tokio::test]
372    async fn run_default_with_no_candidates_does_nothing() {
373        // No chain.json in the bucket → audit returns no candidates →
374        // runner prints "nothing to do" and returns Ok without ever
375        // prompting.
376        let mock = MockStore::new();
377        let prompter = ScriptedPrompter::new([]); // no answers queued
378        let runner = Compact::new(store_arc(&mock), "repo", CompactOpts::default(), &prompter);
379        runner.run().await.expect("no-candidate run is Ok");
380    }
381
382    #[tokio::test]
383    async fn vanished_ref_is_skipped_and_other_targets_still_compact() {
384        // Issue #160 regression: `manage compact` snapshots the target
385        // list once and then loops `compact()` per ref. If a ref is
386        // deleted between the snapshot and its turn — exactly the
387        // window a concurrent `delete-branch` opens — the underlying
388        // engine returns `PackchainError::ChainAbsent`. Pre-fix, the
389        // `?` on that call aborted the whole run; surviving refs were
390        // robbed of work the operator had explicitly confirmed.
391        //
392        // Fixture: two refs that are already-minimal single-segment
393        // chains at the tip. The compact engine short-circuits both
394        // through `AlreadyMinimal` without touching packs/bundles —
395        // sufficient to prove the per-ref loop continues across the
396        // ChainAbsent skip. The dev ref's chain.json is deleted
397        // before `compact_targets_into` runs, mirroring the
398        // post-snapshot / pre-per-ref deletion window from the bug.
399        let mock = MockStore::new();
400        let single_segment_chain = |sha: &str| crate::packchain::schema::ChainManifest {
401            v: 1,
402            tip: crate::packchain::schema::Sha40::try_new(sha).unwrap(),
403            full_at: crate::packchain::schema::Sha40::try_new(sha).unwrap(),
404            segments: vec![crate::packchain::schema::ChainSegment {
405                sha: crate::packchain::schema::Sha40::try_new(sha).unwrap(),
406                parent_sha: None,
407                pack: format!("packs/{sha}.pack"),
408                bytes: 1024,
409            }],
410        };
411        let main = crate::git::RefName::new("refs/heads/main").unwrap();
412        let dev = crate::git::RefName::new("refs/heads/dev").unwrap();
413        crate::packchain::manifest::write_chain(
414            &mock,
415            Some("repo"),
416            &main,
417            &single_segment_chain("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
418        )
419        .await
420        .unwrap();
421        crate::packchain::manifest::write_chain(
422            &mock,
423            Some("repo"),
424            &dev,
425            &single_segment_chain("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"),
426        )
427        .await
428        .unwrap();
429
430        // Simulate a concurrent `delete-branch` removing dev's
431        // `chain.json` AFTER the target snapshot but BEFORE its turn
432        // in the loop. The fixture sets force=true to bypass the
433        // heuristic so an already-minimal chain still gets a real
434        // `compact()` call (which short-circuits in the engine).
435        assert!(
436            mock.remove_key("repo/refs/heads/dev/chain.json"),
437            "fixture invariant: dev's chain.json must be present pre-delete",
438        );
439
440        let prompter = ScriptedPrompter::new([]);
441        let runner = Compact::new(
442            store_arc(&mock),
443            "repo",
444            CompactOpts {
445                force: true,
446                ..CompactOpts::default()
447            },
448            &prompter,
449        );
450        let compact_opts = PackchainCompactOpts {
451            force: true,
452            lock_ttl: Duration::seconds(60),
453        };
454        // Drive the per-ref loop directly with a stable target order
455        // so the test asserts deterministic output regardless of
456        // audit-driven ordering.
457        let targets = vec![dev.clone(), main.clone()];
458        let mut out: Vec<u8> = Vec::new();
459        let summary = runner
460            .compact_targets_into(&mut out, &targets, compact_opts)
461            .await
462            .expect("vanished ref must NOT abort the run");
463        let stdout = String::from_utf8(out).expect("operator output is utf-8");
464
465        // Pre-fix: this assertion fails because `compact_targets_into`
466        // returns `Err(Packchain(ChainAbsent { … }))` and we never
467        // even reach the second target. Post-fix: the run completes
468        // with one vanished skip and one AlreadyMinimal.
469        assert_eq!(summary.skipped_vanished, 1);
470        assert!(
471            !summary.any_compacted,
472            "AlreadyMinimal must not flip any_compacted",
473        );
474        assert!(
475            stdout.contains(
476                "compact: refs/heads/dev vanished between selection and compact \
477                 (concurrent delete?); skipping",
478            ),
479            "operator must see the per-ref vanished line for dev; got: {stdout}",
480        );
481        assert!(
482            stdout.contains(
483                "compact: refs/heads/main already a single-segment chain at the tip; nothing to do.",
484            ),
485            "the surviving ref's outcome must still print after the vanished skip; got: {stdout}",
486        );
487        assert!(
488            stdout.contains("compact: skipped 1 vanished ref(s)"),
489            "trailing summary must surface the skip count; got: {stdout}",
490        );
491    }
492
493    #[tokio::test]
494    async fn non_chainabsent_packchain_error_still_aborts_run() {
495        // Mirror of the vanished-ref test that proves the carve-out is
496        // narrow: only `ChainAbsent` downgrades. Any other
497        // `PackchainError` — here, a malformed chain.json that the
498        // engine surfaces as `Json` — must still abort the run so a
499        // genuine bucket-level fault is not silently swallowed.
500        let mock = MockStore::new();
501        let main = crate::git::RefName::new("refs/heads/main").unwrap();
502        // A non-empty, non-JSON body trips the deserialise path in
503        // `load_chain`; the engine surfaces this as
504        // `PackchainError::Json`, which the management layer must
505        // propagate verbatim.
506        mock.insert(
507            "repo/refs/heads/main/chain.json",
508            bytes::Bytes::from_static(b"not valid json"),
509        );
510
511        let prompter = ScriptedPrompter::new([]);
512        let runner = Compact::new(
513            store_arc(&mock),
514            "repo",
515            CompactOpts {
516                force: true,
517                ..CompactOpts::default()
518            },
519            &prompter,
520        );
521        let compact_opts = PackchainCompactOpts {
522            force: true,
523            lock_ttl: Duration::seconds(60),
524        };
525        let mut out: Vec<u8> = Vec::new();
526        let err = runner
527            .compact_targets_into(&mut out, std::slice::from_ref(&main), compact_opts)
528            .await
529            .expect_err("non-ChainAbsent engine error must abort the run");
530        assert!(
531            matches!(err, ManageError::Packchain(_)),
532            "expected ManageError::Packchain, got {err:?}",
533        );
534    }
535
536    #[tokio::test]
537    async fn with_gc_skipped_when_no_compaction_happened() {
538        // ref_name + force=false against a chain that does not meet
539        // the heuristic → SkippedUnderThreshold → with_gc must NOT
540        // run gc (no orphans were produced).
541        let mock = MockStore::new();
542        let chain = crate::packchain::schema::ChainManifest {
543            v: 1,
544            tip: crate::packchain::schema::Sha40::try_new(
545                "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
546            )
547            .unwrap(),
548            full_at: crate::packchain::schema::Sha40::try_new(
549                "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
550            )
551            .unwrap(),
552            segments: vec![crate::packchain::schema::ChainSegment {
553                sha: crate::packchain::schema::Sha40::try_new(
554                    "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
555                )
556                .unwrap(),
557                parent_sha: Some(
558                    crate::packchain::schema::Sha40::try_new(
559                        "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
560                    )
561                    .unwrap(),
562                ),
563                pack: "packs/1111111111111111111111111111111111111111.pack".to_owned(),
564                bytes: 1024,
565            }],
566        };
567        let rn = crate::git::RefName::new("refs/heads/main").unwrap();
568        crate::packchain::manifest::write_chain(&mock, Some("repo"), &rn, &chain)
569            .await
570            .unwrap();
571        // Add a stand-alone pack with NO chain reference. This is a
572        // real orphan: gc::mark would observe it and write a
573        // tombstone if it ran. The original test had no orphans on
574        // the bucket, so the "no `gc/` keys" assertion was
575        // vacuously true regardless of whether the with_gc gate
576        // fired — mutation-verified during /audit-tests
577        // (#67-followup).
578        mock.insert(
579            "repo/packs/9999999999999999999999999999999999999999.pack",
580            bytes::Bytes::from_static(b"orphan"),
581        );
582
583        let prompter = ScriptedPrompter::new([]);
584        let runner = Compact::new(
585            store_arc(&mock),
586            "repo",
587            CompactOpts {
588                ref_name: Some("refs/heads/main".to_owned()),
589                with_gc: true,
590                ..CompactOpts::default()
591            },
592            &prompter,
593        );
594        // Compact under-threshold short-circuits to
595        // `SkippedUnderThreshold`; with_gc must observe
596        // `any_compacted == false` and skip gc. If gc ran, it
597        // would tombstone the orphan above and we would see a
598        // `repo/gc/tombstones-*.json` key.
599        runner.run().await.expect("skip-under-threshold run is Ok");
600        let keys = mock.keys();
601        assert!(
602            !keys.iter().any(|k| k.starts_with("repo/gc/")),
603            "with_gc must NOT run gc when nothing was compacted; \
604             unexpected gc/ keys: {:?}",
605            keys.iter()
606                .filter(|k| k.starts_with("repo/gc/"))
607                .collect::<Vec<_>>(),
608        );
609    }
610}