Skip to main content

grex_core/
sync.rs

1//! Sync orchestrator — M3 Stage B slice 6.
2//!
3//! Glues the building blocks shipped in slices 1–5b into a single runnable
4//! pipeline:
5//!
6//! 1. Walk a pack tree via [`crate::tree::sync_meta`] +
7//!    [`crate::tree::build_graph`] + [`FsPackLoader`] + a `GitBackend`.
8//! 2. Run plan-phase validators (manifest-level + graph-level).
9//! 3. Execute every action via a pluggable [`ActionExecutor`]
10//!    ([`PlanExecutor`] for dry-run, [`FsExecutor`] for wet-run).
11//! 4. Record each step as an [`Event::Sync`] entry in the pack-root's
12//!    `.grex/events.jsonl` event log.
13//!
14//! # Traversal order
15//!
16//! Nodes are executed in **depth-first post-order**: children fully install
17//! before their parent. Rationale: parent packs commonly `require:` artifacts
18//! created by children (e.g. a parent symlink whose `src` lives inside a
19//! child). Running the root last matches the overlay-style dotfile-install
20//! intent authors expect, and it matches how `walker.walk` is structured
21//! (children are hydrated before the recursion returns).
22//!
23//! # Decoupling
24//!
25//! The CLI crate drives this module through a thin `run()` entry point;
26//! [`SyncOptions`] is `#[non_exhaustive]` so new knobs (parallelism, filter
27//! expressions, ref overrides) can land in later milestones without breaking
28//! CLI callers. Errors aggregate into [`SyncError`] with a small, stable
29//! variant set.
30
31use std::borrow::Cow;
32use std::fs;
33use std::path::{Path, PathBuf};
34use std::sync::Arc;
35
36use chrono::{DateTime, Utc};
37use globset::{Glob, GlobSet, GlobSetBuilder};
38use thiserror::Error;
39use tokio_util::sync::CancellationToken;
40
41use crate::execute::{
42    ActionExecutor, ExecCtx, ExecError, ExecResult, ExecStep, FsExecutor, MetaVisitedSet,
43    PlanExecutor, Platform, StepKind,
44};
45use crate::fs::{ManifestLock, ScopedLock};
46use crate::git::GixBackend;
47use crate::lockfile::{
48    branch_of, compute_actions_hash, read_lockfile, write_lockfile, LockEntry, LockfileError,
49};
50use crate::manifest::{append_event, read_all, Event, ACTION_ERROR_SUMMARY_MAX, SCHEMA_VERSION};
51use crate::pack::{Action, PackValidationError};
52use crate::plugin::{PackTypeRegistry, Registry};
53use crate::scheduler::Scheduler;
54use crate::tree::{
55    build_graph, sync_meta, FsPackLoader, PackGraph, PackNode, SyncMetaOptions, TreeError,
56};
57use crate::vars::VarEnv;
58
/// Inputs to [`run`].
///
/// Every field is `pub`, so values can be read and reassigned directly
/// on an existing instance. The struct is `#[non_exhaustive]` so future
/// knobs (parallelism, filter expressions, additional ref strategies)
/// can land without a breaking change.
///
/// Consequence for code outside this crate: the compiler rejects
/// struct-literal construction of a `#[non_exhaustive]` struct,
/// *including* the functional-update form `..SyncOptions::default()`.
/// External callers therefore start from [`SyncOptions::new`] /
/// [`SyncOptions::default`] and adjust via the `with_*` builders or
/// plain field assignment. Inside this crate the literal forms remain
/// available.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct SyncOptions {
    /// When `true`, use [`PlanExecutor`] (no filesystem mutations).
    pub dry_run: bool,
    /// When `false`, skip plan-phase validators (manifest + graph). Debug
    /// escape hatch; production callers should leave this `true`.
    pub validate: bool,
    /// Override workspace directory. `None` → derived from `pack_root`
    /// (the directory holding `.grex/pack.yaml`).
    ///
    /// **v1.2.1 path (iii) semantics**: when `Some`, this path IS the
    /// canonical meta directory. Children resolve parent-relatively as
    /// `<workspace>/<child.path>` and `<workspace>/.grex/pack.yaml` is
    /// where the root manifest is read from. The path MUST exist;
    /// symlinks are resolved via `fs::canonicalize` to a single
    /// inode-stable form. Pre-v1.2.1 the override only re-anchored
    /// children — that legacy split is retired.
    pub workspace: Option<PathBuf>,
    /// Global ref override (`grex sync --ref <sha|branch|tag>`). When
    /// `Some`, every child pack clone/checkout uses this ref instead of
    /// the declared `child.ref`. Empty strings are rejected at the CLI
    /// layer.
    pub ref_override: Option<String>,
    /// Pack-path filter patterns (`grex sync --only <glob>`). Raw glob
    /// strings — compiled internally via an in-crate `globset` helper so the
    /// `globset` crate version does not leak into the public API.
    /// `None` / empty means every pack runs (M3 semantics). Matching is
    /// against the pack's **workspace-relative** path normalized to
    /// forward-slash form.
    pub only_patterns: Option<Vec<String>>,
    /// Bypass the lockfile hash-match skip (`grex sync --force`). When
    /// `true`, every pack re-executes even if its `actions_hash` is
    /// unchanged from the prior lockfile.
    pub force: bool,
    /// Max parallel pack ops for this sync run (feat-m6-1).
    ///
    /// * `None` → the sync driver resolves the default itself
    ///   (`num_cpus::get()`, clamped to at least 1), so library callers
    ///   who leave this unset get the same behavior as the CLI and the
    ///   scheduler slot on every `ExecCtx` is always populated.
    /// * `Some(0)` → unbounded (`Semaphore::MAX_PERMITS`).
    /// * `Some(1)` → serial fast-path.
    /// * `Some(n >= 2)` → bounded parallel.
    pub parallel: Option<usize>,
    /// v1.2.0 Stage 1.l prep — when `true`, walker Phase 2 may drop
    /// dirty trees during prune. Still refuses ignored content unless
    /// [`SyncOptions::force_prune_with_ignored`] is also `true`.
    /// Default `false` preserves v1.1.1 behavior (refuse all dirty
    /// drops).
    pub force_prune: bool,
    /// v1.2.0 Stage 1.l prep — when `true` (implies
    /// [`SyncOptions::force_prune`]), walker Phase 2 also drops
    /// ignored content. Hard override — the strongest level. Default
    /// `false` preserves v1.1.1 behavior.
    pub force_prune_with_ignored: bool,
    /// v1.2.1 Item 5b — when `true` AND `force_prune` (or
    /// `force_prune_with_ignored`) is set, divert Phase 2 prunes
    /// through the snapshot-then-unlink quarantine pipeline. The
    /// dest's full subtree is recursively copied to
    /// `<workspace>/.grex/trash/<ISO8601>/<basename>/` BEFORE
    /// `unlink(dest)` fires. Snapshot or audit-fsync failure aborts
    /// the prune (no unlink). Lean theorem
    /// `quarantine_snapshot_precedes_delete` proves the safety
    /// contract. Default `false` preserves v1.2.0 direct-unlink
    /// behavior. Has no effect unless one of the `force_prune*`
    /// flags is also set (the CLI enforces this via
    /// `requires = "force_prune"`; library callers who set this
    /// with neither flag get a no-op since Phase 2 will not enter
    /// the override path at all).
    pub quarantine: bool,
    /// v1.2.0 Stage 1.h opt-in — when `true`, the walker rewrites a
    /// legacy v1.1.1 lockfile in place to the v1.2.0 shape. When
    /// `false` (default), the walker errors on the legacy shape so
    /// migration is always an explicit caller decision.
    pub migrate_lockfile: bool,
    /// v1.2.0 Stage 1.j prep — when `true` (default), the walker
    /// descends into nested meta-children. `doctor --shallow` flips
    /// this to `false` so only the immediate workspace is inspected.
    pub recurse: bool,
    /// v1.2.0 Stage 1.j prep — pairs with
    /// [`SyncOptions::recurse`] for `--shallow=N`. `None` (default)
    /// is unbounded recursion when `recurse` is `true`. `Some(n)`
    /// caps depth at `n` levels of nesting.
    pub max_depth: Option<usize>,
    /// v1.2.5 — when `Some(N)`, every meta sync starts with a
    /// best-effort GC sweep over `<meta>/.grex/trash/`, deleting
    /// entries older than `N` days. `None` (default) preserves the
    /// v1.2.1 indefinite-retention behavior. The CLI surfaces this
    /// as `grex sync --retain-days N`; library callers wire it via
    /// [`SyncOptions::with_retain_days`]. Sweep failures log via
    /// `tracing::warn!` and DO NOT halt the sync.
    pub retain_days: Option<u32>,
}
162
163impl Default for SyncOptions {
164    fn default() -> Self {
165        Self {
166            dry_run: false,
167            validate: true,
168            workspace: None,
169            ref_override: None,
170            only_patterns: None,
171            force: false,
172            parallel: None,
173            // v1.2.0 Stage 1.m additions — defaults preserve v1.1.1
174            // behavior. Each field is a dormant placeholder until
175            // its corresponding walker stage wires it.
176            force_prune: false,
177            force_prune_with_ignored: false,
178            quarantine: false,
179            migrate_lockfile: false,
180            recurse: true,
181            max_depth: None,
182            retain_days: None,
183        }
184    }
185}
186
187/// Compile raw `--only` pattern strings into a [`globset::GlobSet`].
188/// Empty / absent input yields `Ok(None)` so M3's zero-config path
189/// (every pack runs) stays the default.
190fn compile_only_globset(patterns: Option<&Vec<String>>) -> Result<Option<GlobSet>, SyncError> {
191    let Some(pats) = patterns else { return Ok(None) };
192    if pats.is_empty() {
193        return Ok(None);
194    }
195    let mut builder = GlobSetBuilder::new();
196    for p in pats {
197        let glob = Glob::new(p)
198            .map_err(|source| SyncError::InvalidOnlyGlob { pattern: p.clone(), source })?;
199        builder.add(glob);
200    }
201    let set = builder
202        .build()
203        .map_err(|source| SyncError::InvalidOnlyGlob { pattern: pats.join(","), source })?;
204    Ok(Some(set))
205}
206
207impl SyncOptions {
208    /// Default options: wet-run, validators enabled, default workspace path.
209    #[must_use]
210    pub fn new() -> Self {
211        Self::default()
212    }
213
214    /// Set `dry_run`.
215    #[must_use]
216    pub fn with_dry_run(mut self, dry_run: bool) -> Self {
217        self.dry_run = dry_run;
218        self
219    }
220
221    /// Set `validate`.
222    #[must_use]
223    pub fn with_validate(mut self, validate: bool) -> Self {
224        self.validate = validate;
225        self
226    }
227
228    /// Set `workspace` override.
229    #[must_use]
230    pub fn with_workspace(mut self, workspace: Option<PathBuf>) -> Self {
231        self.workspace = workspace;
232        self
233    }
234
235    /// Set `ref_override` (`--ref`).
236    #[must_use]
237    pub fn with_ref_override(mut self, ref_override: Option<String>) -> Self {
238        self.ref_override = ref_override;
239        self
240    }
241
242    /// Set `only_patterns` (`--only`). Empty vector or `None` disables
243    /// the filter.
244    #[must_use]
245    pub fn with_only_patterns(mut self, patterns: Option<Vec<String>>) -> Self {
246        self.only_patterns = patterns;
247        self
248    }
249
250    /// Set `force` (`--force`).
251    #[must_use]
252    pub fn with_force(mut self, force: bool) -> Self {
253        self.force = force;
254        self
255    }
256
257    /// Set `parallel` (`--parallel`). See [`SyncOptions::parallel`] for
258    /// the `None` / `Some(0)` / `Some(1)` / `Some(n)` semantics.
259    #[must_use]
260    pub fn with_parallel(mut self, parallel: Option<usize>) -> Self {
261        self.parallel = parallel;
262        self
263    }
264
265    /// Set `force_prune` (`--force-prune`). See
266    /// [`SyncOptions::force_prune`] for the override matrix.
267    #[must_use]
268    pub fn with_force_prune(mut self, force_prune: bool) -> Self {
269        self.force_prune = force_prune;
270        self
271    }
272
273    /// Set `force_prune_with_ignored` (`--force-prune-with-ignored`).
274    /// See [`SyncOptions::force_prune_with_ignored`] for the override
275    /// matrix.
276    #[must_use]
277    pub fn with_force_prune_with_ignored(mut self, force_prune_with_ignored: bool) -> Self {
278        self.force_prune_with_ignored = force_prune_with_ignored;
279        self
280    }
281
282    /// Set `quarantine` (`--quarantine`). See
283    /// [`SyncOptions::quarantine`] for the snapshot-before-delete
284    /// contract. Has no effect unless [`SyncOptions::force_prune`]
285    /// or [`SyncOptions::force_prune_with_ignored`] is also set.
286    #[must_use]
287    pub fn with_quarantine(mut self, quarantine: bool) -> Self {
288        self.quarantine = quarantine;
289        self
290    }
291
292    /// Set `retain_days` (`--retain-days N`). See
293    /// [`SyncOptions::retain_days`] for the GC-sweep contract.
294    /// `None` preserves v1.2.1 indefinite-retention behavior;
295    /// `Some(N)` triggers a best-effort sweep at the start of every
296    /// meta sync.
297    #[must_use]
298    pub fn with_retain_days(mut self, retain_days: Option<u32>) -> Self {
299        self.retain_days = retain_days;
300        self
301    }
302}
303
/// One executed (or, in a dry run, planned) action step in a sync run.
///
/// Pairs the executor's own [`ExecStep`] record with the position of the
/// action that produced it, so a transcript can be attributed back to a
/// pack and an action slot.
///
/// Marked `#[non_exhaustive]` so new observability fields (timestamps,
/// plugin provenance) can land without breaking library consumers who
/// destructure the struct.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct SyncStep {
    /// Name of the pack that owned the action.
    pub pack: String,
    /// 0-based index into the pack's top-level `actions` vector.
    pub action_idx: usize,
    /// The [`ExecStep`] record emitted by the executor.
    pub exec_step: ExecStep,
}
319
/// Outcome of a [`run`] invocation.
///
/// On fail-fast termination, `halted` carries the error that stopped the
/// sync; every completed step up to that point is still in `steps` so
/// callers can render a partial transcript.
///
/// Marked `#[non_exhaustive]` so new report-level fields (run id, metrics)
/// can land without breaking library consumers who destructure the struct.
#[non_exhaustive]
#[derive(Debug)]
pub struct SyncReport {
    /// Fully-walked pack graph (present even on halted runs).
    pub graph: PackGraph,
    /// Steps produced by the executor, in execution order.
    pub steps: Vec<SyncStep>,
    /// `Some(e)` if execution stopped before all actions ran; `None` on
    /// a run that executed to completion.
    pub halted: Option<SyncError>,
    /// Non-fatal manifest-append warnings (one per failed event append).
    /// Kept as a separate field because spec marks event-log write failures
    /// as non-aborting.
    pub event_log_warnings: Vec<String>,
    /// `Some(r)` when the pre-run teardown scan found orphaned backup
    /// files or dangling [`Event::ActionStarted`] records from a prior
    /// crashed run. Informational only — the report is still returned and
    /// the sync proceeds. CLI renderers should surface a warning so the
    /// operator can decide whether to run a future `grex doctor` verb.
    pub pre_run_recovery: Option<RecoveryReport>,
    /// One entry per child whose legacy `.grex/workspace/<name>/` layout
    /// was relocated (or considered for relocation) on this sync. Empty
    /// when no legacy directory was found — the common case for any
    /// workspace built fresh on v1.1.0+. CLI renderers should surface
    /// the list so operators see what changed.
    pub workspace_migrations: Vec<WorkspaceMigration>,
}
354
/// One legacy-layout migration attempt, as reported in
/// [`SyncReport::workspace_migrations`].
///
/// `outcome` distinguishes the move-succeeded case from the
/// don't-clobber-user-data cases so CLI renderers can present different
/// advice to the operator.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkspaceMigration {
    /// Source path under the legacy `.grex/workspace/<name>/` location,
    /// rendered relative to the pack root for log readability.
    pub from: PathBuf,
    /// Destination flat-sibling path `<pack_root>/<name>/`, relative to
    /// the pack root.
    pub to: PathBuf,
    /// What happened to this candidate (moved, skipped, or failed).
    pub outcome: MigrationOutcome,
}
370
/// Outcome of one legacy-layout migration attempt.
///
/// Only [`MigrationOutcome::Migrated`] means the directory moved; every
/// other variant leaves the legacy directory where it was.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationOutcome {
    /// Legacy directory was renamed onto the flat-sibling slot.
    Migrated,
    /// Both legacy and flat-sibling slots existed. Skipped — the user
    /// must inspect and reconcile manually so we never silently delete
    /// either.
    SkippedBothExist,
    /// Flat-sibling slot already had a non-grex file or directory in
    /// the way. Skipped — refusing to clobber user data even when the
    /// legacy slot is plainly the source of truth.
    SkippedDestOccupied,
    /// `fs::rename` failed (e.g. cross-volume, ACL denied). The legacy
    /// directory is still in place; surfaced so the operator can move
    /// it manually.
    Failed {
        /// Failure description as text. Presumably the rendered
        /// `fs::rename` error — confirm at the construction site. Stored
        /// as a `String` so the enum can stay `Clone + PartialEq + Eq`.
        error: String,
    },
}
390
/// Rich context attached to a [`SyncError::Halted`] variant (which holds
/// it boxed).
///
/// Packages the pack + action position together with the underlying
/// executor error and an optional human-readable recovery hint. Marked
/// `#[non_exhaustive]` so future fields (step transcript, timestamp) can
/// land without breaking `match` arms or struct destructures.
#[non_exhaustive]
#[derive(Debug)]
pub struct HaltedContext {
    /// Name of the pack that owned the halted action.
    pub pack: String,
    /// 0-based index into the pack's top-level `actions` vector.
    pub action_idx: usize,
    /// Short action kind tag (e.g. `"symlink"`, `"exec"`).
    pub action_name: String,
    /// Underlying executor error.
    pub error: ExecError,
    /// Optional next-step suggestion for the operator. `None` when no
    /// generic hint applies — the executor error's own `Display` already
    /// tells the story.
    pub recovery_hint: Option<String>,
}
413
/// Error taxonomy surfaced by [`run`].
///
/// `#[non_exhaustive]`: downstream `match`es need a wildcard arm, which
/// lets new variants land without a breaking change.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum SyncError {
    /// The pack-tree walker failed (loader error, git error, cycle, …).
    #[error("tree walk failed: {0}")]
    Tree(#[from] TreeError),
    /// One or more plan-phase validators flagged the graph.
    #[error("validation failed: {errors:?}")]
    Validation {
        /// Aggregated errors from manifest-level + graph-level validators.
        errors: Vec<PackValidationError>,
    },
    /// An action executor returned an error.
    ///
    /// Retained for backward compatibility; new call sites should prefer
    /// [`SyncError::Halted`] which carries full pack + action context.
    /// Kept non-deprecated because [`From<ExecError>`] still materialises
    /// the variant for non-sync-loop callers (e.g. ad-hoc helpers).
    #[error("action execution failed: {0}")]
    Exec(#[from] ExecError),
    /// Action execution halted; full context (pack, action index, error,
    /// optional recovery hint) lives in [`HaltedContext`]. This is the
    /// variant the sync driver emits — [`SyncError::Exec`] is only
    /// surfaced by ancillary code paths. The `Display` rendering covers
    /// pack, index, action name and error; the recovery hint is not part
    /// of the message and must be read from the context.
    #[error(
        "sync halted at pack `{}` action #{} ({}): {}",
        .0.pack, .0.action_idx, .0.action_name, .0.error
    )]
    Halted(Box<HaltedContext>),
    /// Another `grex` process (or thread) already holds the workspace-level
    /// lock. The running sync refused to start to avoid racing two concurrent
    /// walkers into the same workspace. If the lock file at `lock_path` is
    /// stale (no other grex is actually running), remove it by hand.
    #[error(
        "workspace `{workspace}` is locked by another grex process (remove {lock_path:?} if stale)"
    )]
    WorkspaceBusy {
        /// Resolved workspace directory that the current run tried to lock.
        workspace: PathBuf,
        /// Sidecar lock file that is currently held.
        lock_path: PathBuf,
    },
    /// Reading or parsing the resolved-state lockfile failed. Surfaced as
    /// its own variant (rather than folded into `Validation`) because a
    /// corrupt / unreadable lockfile is an I/O or schema fault, not a
    /// dependency-satisfaction fault. Resolution is operator-level
    /// (restore a backup, delete the file, re-sync), not author-level.
    #[error("lockfile `{path}` failed to load: {source}")]
    Lockfile {
        /// Lockfile path that failed to load.
        path: PathBuf,
        /// Underlying lockfile error.
        #[source]
        source: LockfileError,
    },
    /// One of the `--only <GLOB>` patterns failed to compile. Surfaced
    /// as its own variant so the CLI can map it to a dedicated usage
    /// error exit code instead of the generic sync-failure bucket.
    #[error("invalid --only glob `{pattern}`: {source}")]
    InvalidOnlyGlob {
        /// The raw pattern string that failed to compile.
        pattern: String,
        /// Underlying globset error.
        #[source]
        source: globset::Error,
    },
    /// Migrating the v1.x event log (`grex.jsonl`) to the v2 canonical
    /// path (`.grex/events.jsonl`) failed. Operator-level resolution
    /// (check filesystem permissions, free disk space, then retry).
    #[error("event-log migration failed: {0}")]
    EventLogMigration(#[source] crate::manifest::ManifestError),
    /// Cooperative cancellation fired (Ctrl-C / SIGTERM) during a
    /// parallel sync. v1.2.0 Stage 1.g wires the rayon walker to surface
    /// this distinct-from-failure variant so the CLI can exit with a
    /// dedicated cancellation code instead of a generic sync error.
    /// Dormant until Stage 1.g — the existing CLI does not yet emit it.
    #[error("sync cancelled by user")]
    SchedulerCancelled,
}
494
495impl Clone for SyncError {
496    fn clone(&self) -> Self {
497        // `TreeError` / `ExecError` do not implement `Clone` (they wrap
498        // `std::io::Error`-adjacent values). Halts carry only a display
499        // rendering in the report; we re-materialise via a synthetic
500        // `Validation` variant so `SyncReport` can be `Clone`-safe for
501        // observability tooling without widening the taxonomy.
502        match self {
503            Self::Tree(e) => Self::Validation {
504                errors: vec![PackValidationError::DependsOnUnsatisfied {
505                    pack: "<tree>".into(),
506                    required: e.to_string(),
507                }],
508            },
509            Self::Validation { errors } => Self::Validation { errors: errors.clone() },
510            Self::Exec(e) => Self::Validation {
511                errors: vec![PackValidationError::DependsOnUnsatisfied {
512                    pack: "<exec>".into(),
513                    required: e.to_string(),
514                }],
515            },
516            Self::Halted(ctx) => Self::Validation {
517                errors: vec![PackValidationError::DependsOnUnsatisfied {
518                    pack: ctx.pack.clone(),
519                    required: format!(
520                        "action #{} ({}): {}",
521                        ctx.action_idx, ctx.action_name, ctx.error
522                    ),
523                }],
524            },
525            Self::WorkspaceBusy { workspace, lock_path } => {
526                Self::WorkspaceBusy { workspace: workspace.clone(), lock_path: lock_path.clone() }
527            }
528            Self::Lockfile { path, source } => Self::Validation {
529                errors: vec![PackValidationError::DependsOnUnsatisfied {
530                    pack: "<lockfile>".into(),
531                    required: format!("{}: {source}", path.display()),
532                }],
533            },
534            Self::InvalidOnlyGlob { pattern, source } => Self::Validation {
535                errors: vec![PackValidationError::DependsOnUnsatisfied {
536                    pack: "<only-glob>".into(),
537                    required: format!("{pattern}: {source}"),
538                }],
539            },
540            Self::EventLogMigration(source) => Self::Validation {
541                errors: vec![PackValidationError::DependsOnUnsatisfied {
542                    pack: "<event-log-migration>".into(),
543                    required: source.to_string(),
544                }],
545            },
546            Self::SchedulerCancelled => Self::SchedulerCancelled,
547        }
548    }
549}
550
551/// Run a full sync over the pack tree rooted at `pack_root`.
552///
553/// Resolution rules:
554/// * If `pack_root` is a directory the walker looks for
555///   `<pack_root>/.grex/pack.yaml`.
556/// * If `pack_root` ends in `.yaml` / `.yml` it is loaded verbatim.
557/// * Workspace defaults to the pack root directory itself when
558///   `opts.workspace` is `None`. Children resolve as flat siblings of the
559///   parent pack root (since v1.1.0).
560///
561/// # Errors
562///
563/// Returns the first error that halts the pipeline — see [`SyncError`] for
564/// the taxonomy.
565///
566/// `cancel` is the cooperative cancellation handle threaded through the
567/// pipeline by feat-m7-1 stage 2. Stage 2 only wires the parameter; the
568/// `is_cancelled()` polls land in stages 3-4 (scheduler + pack-lock
569/// acquire). CLI callers pass a never-cancelled sentinel
570/// (`CancellationToken::new()`); the MCP server passes a token tied to
571/// the request lifetime.
572pub fn run(
573    pack_root: &Path,
574    opts: &SyncOptions,
575    cancel: &CancellationToken,
576) -> Result<SyncReport, SyncError> {
577    // Stage 2 is signature-only — silence "unused parameter" without
578    // hiding it behind `_` (downstream stages will read it).
579    let _ = cancel;
580    let workspace = prepare_workspace(pack_root, opts)?;
581    // v1.3.1 (B4) — `dry_run = true` is contractually FS-mutation-free.
582    // `open_workspace_lock` (via `ScopedLock::open`) creates a sidecar
583    // file at `<workspace>/.grex/.grex.sync.lock` (v1.3.2 B11), which
584    // would itself violate the no-FS-mutation contract. Skip lock
585    // acquisition entirely in dry-run; the dry-run path is read-only
586    // by construction so concurrent dry-runs against the same
587    // workspace are safe.
588    let mut ws_lock_holder =
589        if !opts.dry_run { Some(open_workspace_lock(&workspace)?) } else { None };
590    let _ws_guard = try_acquire_workspace_guard(ws_lock_holder.as_mut(), &workspace)?;
591
592    // Compile `--only` patterns into a GlobSet here so the
593    // `globset` crate version does not leak into `SyncOptions`.
594    let only_set = compile_only_globset(opts.only_patterns.as_ref())?;
595
596    // Auto-migrate legacy `.grex/workspace/<name>/` layout BEFORE the
597    // walker resolves children. Idempotent: a fresh v1.1.0+ workspace
598    // sees no legacy directory and the function no-ops.
599    let workspace_migrations = migrate_legacy_workspace(pack_root);
600
601    // v1.2.1 path (iii) — three-stage composition:
602    //   sync_meta(workspace, prune_candidates) — mutate (rayon parallel)
603    //   build_graph(workspace)                 — read-only graph
604    //   run_actions(graph)                     — consume graph
605    // `Walker::walk` is retired from the prod path; the symbol is kept
606    // for test-suite compat. See `crates/grex-core/src/tree/graph_build.rs`.
607    run_sync_meta(&workspace, opts)?;
608    let graph = build_and_validate_graph(&workspace, opts.validate, opts.ref_override.as_deref())?;
609    let prep = prepare_run_context(pack_root, &graph, &workspace)?;
610    log_force_flag(opts.force);
611
612    let mut report = SyncReport {
613        graph,
614        steps: Vec::new(),
615        halted: None,
616        event_log_warnings: Vec::new(),
617        pre_run_recovery: prep.pre_run_recovery,
618        workspace_migrations,
619    };
620
621    let mut next_lock = prep.prior_lock.clone();
622    // feat-m6 B1: resolve `--parallel` once and build the scheduler
623    // shared across every `ExecCtx` in this run. Library callers who
624    // leave `opts.parallel == None` default to `num_cpus::get()` here
625    // (clamped `>= 1`) so the scheduler slot is always populated —
626    // `ctx.scheduler` being `None` would strand acquire-sites into
627    // unbounded concurrency. See `inst/concurrency.md` §Scheduler.
628    let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
629    let scheduler = Arc::new(Scheduler::new(resolved_parallel));
630    run_actions(
631        &mut report,
632        &prep.order,
633        &prep.vars,
634        &workspace,
635        &prep.event_log,
636        &prep.lock_path,
637        opts.dry_run,
638        &prep.prior_lock,
639        &mut next_lock,
640        &prep.registry,
641        &prep.pack_type_registry,
642        only_set.as_ref(),
643        opts.force,
644        resolved_parallel,
645        &scheduler,
646    );
647
648    persist_lockfile_if_clean(&mut report, &prep.lockfile_path, &next_lock, opts.dry_run);
649    Ok(report)
650}
651
/// Bag of context pieces assembled once at the top of [`run`]. Grouping
/// them keeps [`run`] under the workspace's 50-LOC function lint without
/// smearing the read of sequential setup across helpers. Fields are
/// consumed piecemeal by the actions loop; no getters needed.
struct RunContext {
    /// Traversal order produced by `post_order` over the graph.
    /// Presumably node indices into the graph — confirm against
    /// `post_order`.
    order: Vec<usize>,
    /// Variable environment captured via `VarEnv::from_os`.
    vars: VarEnv,
    /// Event-log path derived from the pack root.
    event_log: PathBuf,
    /// Lock path derived from `event_log` (guards the event log, not the
    /// workspace).
    lock_path: PathBuf,
    /// Resolved-state lockfile path derived from the pack root.
    lockfile_path: PathBuf,
    /// Lockfile entries loaded before the run, keyed by `String`
    /// (presumably the pack name — confirm against `load_prior_lock`).
    prior_lock: std::collections::HashMap<String, LockEntry>,
    /// Action-plugin registry built by `Registry::bootstrap`.
    registry: Arc<Registry>,
    /// Pack-type registry built by `bootstrap_pack_type_registry`.
    pack_type_registry: Arc<PackTypeRegistry>,
    /// Result of the pre-run recovery scan; `None` when the scan found
    /// nothing or could not be completed.
    pre_run_recovery: Option<RecoveryReport>,
}
667
668/// Build the per-run context: traversal order, vars env, event/lockfile
669/// paths, prior lockfile state, bootstrap registry, and (optionally) a
670/// pre-run recovery scan. Kept narrow so [`run`] stays small.
671///
672/// `workspace` is the resolved workspace directory (post `--workspace`
673/// override) so the recovery scan looks for `.grex.bak` artefacts under
674/// the actual on-disk location children were materialised at — not
675/// under the pack root, which differs from the workspace whenever the
676/// CLI's `--workspace` flag is used. Pre-fix this anchoring drift
677/// caused recovery scans to miss every backup left under an override
678/// workspace.
679fn prepare_run_context(
680    pack_root: &Path,
681    graph: &PackGraph,
682    workspace: &Path,
683) -> Result<RunContext, SyncError> {
684    let event_log = event_log_path(pack_root);
685    let lock_path = event_lock_path(&event_log);
686    let vars = VarEnv::from_os();
687    let order = post_order(graph);
688    let pre_run_recovery = scan_recovery(workspace, &event_log).ok().filter(|r| !r.is_empty());
689    let lockfile_path = lockfile_path(pack_root);
690    let prior_lock = load_prior_lock(&lockfile_path)?;
691    let registry = Arc::new(Registry::bootstrap());
692    let pack_type_registry = Arc::new(bootstrap_pack_type_registry());
693    Ok(RunContext {
694        order,
695        vars,
696        event_log,
697        lock_path,
698        lockfile_path,
699        prior_lock,
700        registry,
701        pack_type_registry,
702        pre_run_recovery,
703    })
704}
705
706/// Build the [`PackTypeRegistry`] the sync driver threads into every
707/// [`ExecCtx`] it constructs.
708///
709/// Default path (no `plugin-inventory` feature) hard-codes the three
710/// built-ins via [`PackTypeRegistry::bootstrap`]. With the feature on,
711/// [`PackTypeRegistry::bootstrap_from_inventory`] is preferred so any
712/// externally-submitted plugin types (mirroring the M4-E pattern for
713/// action plugins) shadow the built-ins last-writer-wins. Kept as a free
714/// helper so the `#[cfg]` split lives in one place instead of being
715/// smeared across every sync call-site.
716fn bootstrap_pack_type_registry() -> PackTypeRegistry {
717    #[cfg(feature = "plugin-inventory")]
718    {
719        let mut reg = PackTypeRegistry::bootstrap();
720        reg.register_from_inventory();
721        reg
722    }
723    #[cfg(not(feature = "plugin-inventory"))]
724    {
725        PackTypeRegistry::bootstrap()
726    }
727}
728
729/// Emit a single `tracing::info!` line when `--force` is active so
730/// operators can confirm from logs that the skip short-circuit was
731/// bypassed. Extracted so [`run`] stays small.
732fn log_force_flag(force: bool) {
733    if force {
734        tracing::info!(
735            target: "grex::sync",
736            "--force active: bypassing lockfile skip-on-hash short-circuit"
737        );
738    }
739}
740
741/// v1.2.1 path (iii) — drive the v1.2.0 [`sync_meta`] walker over the
742/// resolved canonical workspace.
743///
744/// This is the SOLE mutating pass in `sync::run`: clones, fetches,
745/// prune dispatches, distributed-lockfile reads, and TOCTOU `BoundedDir`
746/// opens all happen here. The subsequent [`build_and_validate_graph`]
747/// pass is read-only against the disk state this fn leaves behind.
748///
749/// `prune_candidates` is computed from the per-meta lockfile orphans:
750/// every entry in `<workspace>/.grex/grex.lock.jsonl` whose `path` no
751/// longer appears in the live root manifest's `children[]` is fed into
752/// Phase 2 for dispatch (with `--force-prune` / `--force-prune-with-ignored`
753/// overrides honoured by the consent walk). This closes the
754/// "prune-inert" gap from the previous wiring, where `sync::run` passed
755/// `&[]` and `--force-prune` was a CLI flag with no behavioural reach.
756///
757/// `--workspace` semantics: the canonical `workspace` argument is what
758/// `sync_meta` uses as its `meta_dir`. Children land at
759/// `<workspace>/<child.path>` — the v1.2.0 parent-relative model. Prior
760/// to v1.2.1, callers passing `--workspace` skipped the precursor
761/// entirely; that bypass is retired here so override callers see the
762/// same v1.2.0 semantics as the default-cwd path.
763///
764/// `SyncOptions::parallel` mapping (mirrors [`SyncMetaOptions::parallel`]
765/// with the documented `Some(0)` carve-out):
766/// * `None` → `SyncMetaOptions::parallel = None` (rayon default =
767///   `num_cpus::get()`).
768/// * `Some(0)` → `SyncMetaOptions::parallel = None` (the CLI sentinel
769///   for "unbounded" maps to rayon's default; `Some(0)` would be
770///   clamped to `1` inside `build_pool`, which is not what callers
771///   asking for unbounded want).
772/// * `Some(n)` for `n >= 1` → `SyncMetaOptions::parallel = Some(n)`.
773fn run_sync_meta(workspace: &Path, opts: &SyncOptions) -> Result<(), SyncError> {
774    let loader = FsPackLoader::new();
775    let backend = GixBackend::new();
776    let parallel = match opts.parallel {
777        None | Some(0) => None,
778        Some(n) => Some(n),
779    };
780    // v1.2.1 Item 5b — resolve the quarantine config relative to the
781    // canonical workspace (the same `meta_dir` `sync_meta` runs on).
782    // Trash bucket lives at `<workspace>/.grex/trash/`; audit log at
783    // `<workspace>/.grex/events.jsonl` — same path the existing
784    // `ForcePruneExecuted` event uses.
785    let quarantine = opts.quarantine.then(|| crate::tree::QuarantineConfig {
786        trash_root: workspace.join(".grex").join("trash"),
787        audit_log: crate::manifest::event_log_path(workspace),
788    });
789    // v1.2.5 — thread `--retain-days N` into the per-meta options so
790    // every recursion frame swept its own trash bucket. `None` skips
791    // the GC entirely (v1.2.1 indefinite-retention).
792    let retention =
793        opts.retain_days.map(|retain_days| crate::tree::RetentionConfig { retain_days });
794    let meta_opts = SyncMetaOptions {
795        ref_override: opts.ref_override.clone(),
796        recurse: opts.recurse,
797        max_depth: opts.max_depth,
798        force_prune: opts.force_prune,
799        force_prune_with_ignored: opts.force_prune_with_ignored,
800        parallel,
801        quarantine,
802        retention,
803        // v1.3.1 (B4) — propagate the orchestrator's dry-run flag into
804        // the walker so Phase 1 skips clone/fetch and emits the
805        // would-clone records into `SyncMetaReport::dry_run_would_clone`
806        // instead. The orchestrator already gates lockfile persist via
807        // `persist_lockfile_if_clean`; this wires the walker side.
808        dry_run: opts.dry_run,
809    };
810    let prune_candidates = compute_prune_candidates(workspace, &loader);
811    let report = sync_meta(workspace, &backend, &loader, &meta_opts, &prune_candidates)?;
812    if let Some(first) = report.errors.into_iter().next() {
813        return Err(SyncError::Tree(first));
814    }
815    Ok(())
816}
817
818/// v1.2.1 path (iii) — orphan-prune candidate computation.
819///
820/// Reads `<workspace>/.grex/grex.lock.jsonl` and the root manifest;
821/// returns every lockfile entry path that no longer matches a declared
822/// child in `manifest.children`. Empty in three cases:
823///
824/// * No lockfile (fresh workspace, never synced).
825/// * No manifest at `<workspace>/.grex/pack.yaml` (single-node tree —
826///   `sync_meta` will surface its own diagnostic).
827/// * Lockfile entries are all still declared (steady-state sync).
828///
829/// Lockfile read errors are tolerated as `Vec::new()`: the prune pass
830/// is opportunistic, and a corrupt lockfile is the migrator's concern,
831/// not the prune dispatcher's. Manifest read errors are similarly
832/// tolerated — `sync_meta` will fail loudly on the same condition,
833/// giving the operator a single unambiguous error surface.
834fn compute_prune_candidates(
835    workspace: &Path,
836    loader: &dyn crate::tree::PackLoader,
837) -> Vec<PathBuf> {
838    use crate::lockfile::read_meta_lockfile;
839    let entries = match read_meta_lockfile(workspace) {
840        Ok(e) => e,
841        Err(_) => return Vec::new(),
842    };
843    if entries.is_empty() {
844        return Vec::new();
845    }
846    let manifest = match loader.load(workspace) {
847        Ok(m) => m,
848        Err(_) => return Vec::new(),
849    };
850    let declared: std::collections::HashSet<String> =
851        manifest.children.iter().map(crate::pack::ChildRef::effective_path).collect();
852    entries
853        .into_iter()
854        .filter(|e| !declared.contains(&e.path))
855        .map(|e| PathBuf::from(e.path))
856        .collect()
857}
858
859/// v1.2.1 path (iii) — read-only graph build + plan-phase validation.
860///
861/// Builds the [`PackGraph`] from the on-disk meta tree rooted at
862/// `workspace`. Replaces the legacy `walk_and_validate` (which used
863/// [`crate::tree::Walker::walk`] and re-issued every clone/fetch as a
864/// no-op probe) with the v1.2.1 split:
865///
866/// * The mutating half ran in [`run_sync_meta`] — all clones, fetches,
867///   prune dispatches, and TOCTOU `BoundedDir` opens already happened.
868/// * THIS pass is strictly READ-ONLY. It walks the manifest tree
869///   parent-relatively (matching what `sync_meta` placed on disk),
870///   loads each child's `pack.yaml` (or synthesises a plain-git leaf),
871///   probes `head_sha`, and produces the [`PackGraph`] consumed by
872///   [`run_actions`].
873///
874/// Plan-phase validators run against the assembled graph when
875/// `validate` is true.
876fn build_and_validate_graph(
877    workspace: &Path,
878    validate: bool,
879    ref_override: Option<&str>,
880) -> Result<PackGraph, SyncError> {
881    let loader = FsPackLoader::new();
882    let backend = GixBackend::new();
883    let graph = build_graph(workspace, &backend, &loader, ref_override)?;
884    if validate {
885        validate_graph(&graph)?;
886    }
887    Ok(graph)
888}
889
890/// Load the prior lockfile (`grex.lock.jsonl`). Missing file yields an
891/// empty map; parse errors are fatal since writes are atomic and a torn
892/// lockfile therefore indicates real corruption that must be resolved
893/// before a fresh sync is safe. Parse/IO failures surface as
894/// [`SyncError::Lockfile`] — this is an I/O / schema fault, not a
895/// dependency-satisfaction fault, so it gets its own taxonomy slot.
896fn load_prior_lock(
897    lockfile_path: &Path,
898) -> Result<std::collections::HashMap<String, LockEntry>, SyncError> {
899    read_lockfile(lockfile_path)
900        .map_err(|source| SyncError::Lockfile { path: lockfile_path.to_path_buf(), source })
901}
902
903/// Persist `next_lock` atomically to `lockfile_path` whenever this was
904/// not a dry-run. On a halt the map has already had the halted pack's
905/// entry removed (see `run_actions`), so persisting now preserves every
906/// *successful* pack's fresh entry while guaranteeing absence of an
907/// entry for the halted pack — next sync sees no prior hash there and
908/// re-executes from scratch (route (b) halt-state gating). Write errors
909/// surface as non-fatal warnings on the report.
910fn persist_lockfile_if_clean(
911    report: &mut SyncReport,
912    lockfile_path: &Path,
913    next_lock: &std::collections::HashMap<String, LockEntry>,
914    dry_run: bool,
915) {
916    if dry_run {
917        return;
918    }
919    if let Err(e) = write_lockfile(lockfile_path, next_lock) {
920        tracing::warn!(target: "grex::sync", "lockfile write failed: {e}");
921        report.event_log_warnings.push(format!("{}: {e}", lockfile_path.display()));
922    }
923}
924
925/// Canonical location of the resolved-state lockfile
926/// (`<pack_root>/.grex/grex.lock.jsonl`). Colocated with the event log
927/// so both audit artifacts live under a single `.grex/` sidecar.
928fn lockfile_path(pack_root: &Path) -> PathBuf {
929    pack_root_dir(pack_root).join(".grex").join("grex.lock.jsonl")
930}
931
932/// Create the workspace directory if it does not yet exist.
933fn ensure_workspace_dir(workspace: &Path) -> Result<(), SyncError> {
934    if !workspace.exists() {
935        std::fs::create_dir_all(workspace).map_err(|e| SyncError::Validation {
936            errors: vec![PackValidationError::DependsOnUnsatisfied {
937                pack: "<workspace>".into(),
938                required: format!("{}: {e}", workspace.display()),
939            }],
940        })?;
941    }
942    Ok(())
943}
944
945/// Open (but do not acquire) the workspace-level lock file.
946///
947/// v1.3.2 B11: lives at `<workspace>/.grex/.grex.sync.lock`. The `.grex/`
948/// parent is auto-created here so the lock open does not race against
949/// callers that have not yet seeded a manifest sidecar.
950fn open_workspace_lock(workspace: &Path) -> Result<(ScopedLock, PathBuf), SyncError> {
951    let ws_lock_path = workspace_lock_path(workspace);
952    if let Some(parent) = ws_lock_path.parent() {
953        std::fs::create_dir_all(parent)
954            .map_err(|e| workspace_lock_err(&ws_lock_path, &e.to_string()))?;
955    }
956    let ws_lock = ScopedLock::open(&ws_lock_path)
957        .map_err(|e| workspace_lock_err(&ws_lock_path, &e.to_string()))?;
958    Ok((ws_lock, ws_lock_path))
959}
960
961/// Try-acquire the workspace lock guard when the holder is `Some`.
962/// Returns `Ok(None)` when the holder is `None` (e.g. dry-run path skips
963/// lock acquisition entirely; see Blocker B4 v1.3.1). Translates the
964/// busy/error outcomes into the shared [`SyncError`] taxonomy. Extracted
965/// from [`run`] / [`teardown`] to keep both verb entry-points under the
966/// `clippy::too-many-lines` limit while preserving the original lock
967/// semantics.
968fn try_acquire_workspace_guard<'a>(
969    holder: Option<&'a mut (ScopedLock, PathBuf)>,
970    workspace: &Path,
971) -> Result<Option<fd_lock::RwLockWriteGuard<'a, std::fs::File>>, SyncError> {
972    let Some((ws_lock, ws_lock_path)) = holder else {
973        return Ok(None);
974    };
975    match ws_lock.try_acquire() {
976        Ok(Some(g)) => Ok(Some(g)),
977        Ok(None) => Err(SyncError::WorkspaceBusy {
978            workspace: workspace.to_path_buf(),
979            lock_path: ws_lock_path.clone(),
980        }),
981        Err(e) => Err(workspace_lock_err(ws_lock_path, &e.to_string())),
982    }
983}
984
985/// Build a `Validation` error describing a workspace-lock failure.
986fn workspace_lock_err(ws_lock_path: &Path, reason: &str) -> SyncError {
987    SyncError::Validation {
988        errors: vec![PackValidationError::DependsOnUnsatisfied {
989            pack: "<workspace-lock>".into(),
990            required: format!("{}: {reason}", ws_lock_path.display()),
991        }],
992    }
993}
994
/// Single source of truth for the legacy workspace directory name,
/// expressed relative to the pack root (it is joined onto the pack root
/// directory by [`migrate_legacy_workspace`]).
///
/// Pre-`v1.1.0` `resolve_workspace` joined `.grex/workspace/` onto the
/// pack root by default; the auto-migration in
/// [`migrate_legacy_workspace`] is the only place that legacy literal
/// is allowed to appear in `crates/grex-core/src/`. The grep gate in
/// the v1.1.0 release checklist allows this one constant.
const LEGACY_WORKSPACE_DIR: &str = ".grex/workspace";
1002
1003/// Auto-migrate any legacy `.grex/workspace/<name>/` child layout left
1004/// over from v1.0.x to the v1.1.0 flat-sibling layout. Idempotent: a
1005/// fresh workspace built on v1.1.0+ sees no `.grex/workspace/`
1006/// directory and the function no-ops.
1007///
1008/// Per-child outcomes:
1009///
1010/// * **Both legacy + flat-sibling exist** → `SkippedBothExist`. The
1011///   user needs to inspect (perhaps the legacy is stale, perhaps it is
1012///   the source of truth); we never silently delete either.
1013/// * **Flat-sibling slot occupied by a non-grex file or non-empty dir**
1014///   → `SkippedDestOccupied`. Refuse to clobber user data.
1015/// * **Legacy exists, flat-sibling absent** → `Migrated` via atomic
1016///   `fs::rename`. Same-volume move is the common case (the migration
1017///   stays inside `pack_root`); cross-volume failures surface as
1018///   `Failed { error }` with the OS message so the operator can move
1019///   manually.
1020/// * **Legacy absent** → silent no-op (not recorded in the report).
1021///
1022/// After all per-child decisions: orphan `.grex.sync.lock` under the
1023/// legacy workspace is removed (best-effort) and the empty
1024/// `.grex/workspace/` directory is rmdir'd (best-effort). Both are
1025/// soft-failures: leaving them on disk is harmless, surfacing the
1026/// errors as a sync abort would be over-strict.
1027///
1028/// Discovery is by directory listing, not by parent-manifest parse —
1029/// migration must work even when the parent manifest itself was
1030/// rewritten between versions. A child counts as "legacy" iff
1031/// `<pack_root>/<LEGACY_WORKSPACE_DIR>/<name>/.git` exists (i.e. it is
1032/// an actual git working tree, not stray metadata).
1033fn migrate_legacy_workspace(pack_root: &Path) -> Vec<WorkspaceMigration> {
1034    let root = pack_root_dir(pack_root);
1035    let legacy_root = root.join(LEGACY_WORKSPACE_DIR);
1036    if !legacy_root.is_dir() {
1037        return Vec::new();
1038    }
1039    let entries = match fs::read_dir(&legacy_root) {
1040        Ok(e) => e,
1041        Err(e) => {
1042            tracing::warn!(
1043                target: "grex::sync::migrate",
1044                "legacy workspace `{}` unreadable: {e}",
1045                legacy_root.display(),
1046            );
1047            return Vec::new();
1048        }
1049    };
1050    let mut migrations = Vec::new();
1051    for entry_result in entries {
1052        let entry = match entry_result {
1053            Ok(e) => e,
1054            Err(e) => {
1055                tracing::warn!(
1056                    target: "grex::sync::migrate",
1057                    "skipping unreadable entry under `{}`: {e}",
1058                    legacy_root.display(),
1059                );
1060                continue;
1061            }
1062        };
1063        let Ok(ft) = entry.file_type() else { continue };
1064        // file_type avoids symlink-following; legitimate v1.0.x children
1065        // were always real directories, so anything else is skipped.
1066        if ft.is_symlink() || !ft.is_dir() {
1067            continue;
1068        }
1069        let name_os = entry.file_name();
1070        let Some(name) = name_os.to_str() else { continue };
1071        // Only act on entries that look like real cloned children (have
1072        // a `.git`). The legacy workspace lock file (`.grex.sync.lock`)
1073        // is not a directory and is filtered out by the dir check above;
1074        // we clean it up explicitly after the migration loop completes.
1075        let from_abs = entry.path();
1076        if !from_abs.join(".git").exists() {
1077            continue;
1078        }
1079        let to_abs = root.join(name);
1080        let from_rel = PathBuf::from(LEGACY_WORKSPACE_DIR).join(name);
1081        let to_rel = PathBuf::from(name);
1082        let outcome = decide_and_migrate(&from_abs, &to_abs);
1083        log_migration(&from_rel, &to_rel, &outcome);
1084        migrations.push(WorkspaceMigration { from: from_rel, to: to_rel, outcome });
1085    }
1086    cleanup_legacy_workspace_root(&legacy_root);
1087    migrations
1088}
1089
1090/// Decide what to do with one legacy child + perform the move when
1091/// safe. Returns the outcome to record on the [`WorkspaceMigration`].
1092fn decide_and_migrate(from: &Path, to: &Path) -> MigrationOutcome {
1093    let dest_exists = to.exists();
1094    let dest_is_grex_repo = dest_exists && to.join(".git").exists();
1095    if dest_is_grex_repo {
1096        // Both legacy and flat-sibling are git repos. Refuse to choose
1097        // between them; let the user resolve.
1098        return MigrationOutcome::SkippedBothExist;
1099    }
1100    if dest_exists {
1101        // Some other entry occupies the flat-sibling slot — a stray
1102        // file, an empty dir, an unrelated dir. Treat as user data and
1103        // leave both in place.
1104        return MigrationOutcome::SkippedDestOccupied;
1105    }
1106    match fs::rename(from, to) {
1107        Ok(()) => MigrationOutcome::Migrated,
1108        Err(e) => MigrationOutcome::Failed { error: e.to_string() },
1109    }
1110}
1111
1112/// Emit one structured log line per migration so users see exactly what
1113/// happened during the upgrade. Severity matches outcome: success is
1114/// `info`, skips and failures are `warn` so they surface in the default
1115/// log level without forcing operators to crank verbosity.
1116fn log_migration(from: &Path, to: &Path, outcome: &MigrationOutcome) {
1117    let from_disp = from.display();
1118    let to_disp = to.display();
1119    match outcome {
1120        MigrationOutcome::Migrated => {
1121            tracing::info!(
1122                target: "grex::sync::migrate",
1123                "migrated: legacy={from_disp} -> new={to_disp}",
1124            );
1125        }
1126        MigrationOutcome::SkippedBothExist => {
1127            tracing::warn!(
1128                target: "grex::sync::migrate",
1129                "skipped: both legacy={from_disp} and new={to_disp} exist; resolve manually",
1130            );
1131        }
1132        MigrationOutcome::SkippedDestOccupied => {
1133            tracing::warn!(
1134                target: "grex::sync::migrate",
1135                "skipped: destination={to_disp} occupied; leaving legacy={from_disp} in place",
1136            );
1137        }
1138        MigrationOutcome::Failed { error } => {
1139            tracing::warn!(
1140                target: "grex::sync::migrate",
1141                "failed: legacy={from_disp} -> new={to_disp}: {error}",
1142            );
1143        }
1144    }
1145}
1146
1147/// Best-effort cleanup of the legacy workspace root after migration:
1148/// remove the orphan `.grex.sync.lock` (always safe — the v1.1.0
1149/// workspace lock lives at `<pack_root>/.grex.sync.lock`) and try to
1150/// rmdir the now-empty `.grex/workspace/` directory. Errors are logged
1151/// at trace level only — both leftovers are harmless.
1152fn cleanup_legacy_workspace_root(legacy_root: &Path) {
1153    let orphan_lock = legacy_root.join(".grex.sync.lock");
1154    if orphan_lock.exists() {
1155        if let Err(e) = fs::remove_file(&orphan_lock) {
1156            tracing::warn!(
1157                target: "grex::sync::migrate",
1158                "could not remove orphan lock `{}`: {e}",
1159                orphan_lock.display(),
1160            );
1161        } else {
1162            tracing::info!(
1163                target: "grex::sync::migrate",
1164                "removed orphan lock `{}`",
1165                orphan_lock.display(),
1166            );
1167        }
1168    }
1169    // `remove_dir` only succeeds when the directory is empty — exactly
1170    // what we want; if any unmigrated child remains, the legacy root
1171    // stays put for the operator to inspect.
1172    let _ = fs::remove_dir(legacy_root);
1173}
1174
1175/// Compute the default workspace path when `override_` is absent.
1176///
1177/// The default is the pack root directory itself, so child packs
1178/// resolve as flat siblings of the parent pack root. The rationale —
1179/// alignment with the long-standing pack-spec rule that
1180/// `children[].path` is a bare name — lives in the pack-spec
1181/// "Validation rules" section (`man/concepts/pack-spec.md` /
1182/// `grex-doc/src/concepts/pack-spec.md`).
1183/// v1.2.1 path (iii) — resolve the workspace anchor with canonical
1184/// symlink resolution.
1185///
1186/// Resolution rules:
1187/// * `override_ = None` ⇒ derive workspace from `pack_root_dir(pack_root)`.
1188///   No canonicalize on this branch — the pack-root path was supplied
1189///   directly by the caller and may legitimately reference a not-yet-real
1190///   directory (e.g. integration fixtures that lazily materialise the
1191///   pack root).
1192/// * `override_ = Some(path)`:
1193///   1. **Must-exist** check. A `--workspace` override pointing at a
1194///      non-existent directory is a fail-fast error (we won't silently
1195///      `mkdir -p` someone else's typo).
1196///   2. **Canonicalise.** Resolve symlinks to a real path. This is the
1197///      anchor every downstream pass (`sync_meta`, `build_graph`, the
1198///      lockfile reads, the TOCTOU `BoundedDir` opens) hangs off — they
1199///      MUST agree on a single inode-stable string.
1200///   3. **Log when input != canonical.** Surfaces symlink resolution to
1201///      operators so they can correlate workspace-busy diagnostics with
1202///      what the OS actually opened.
1203fn resolve_workspace(pack_root: &Path, override_: Option<&Path>) -> Result<PathBuf, SyncError> {
1204    let Some(input) = override_ else {
1205        return Ok(pack_root_dir(pack_root));
1206    };
1207    if !input.exists() {
1208        return Err(SyncError::Validation {
1209            errors: vec![PackValidationError::DependsOnUnsatisfied {
1210                pack: "<workspace>".into(),
1211                required: format!("--workspace {}: directory does not exist", input.display()),
1212            }],
1213        });
1214    }
1215    let canonical = match input.canonicalize() {
1216        Ok(p) => p,
1217        Err(e) => {
1218            return Err(SyncError::Validation {
1219                errors: vec![PackValidationError::DependsOnUnsatisfied {
1220                    pack: "<workspace>".into(),
1221                    required: format!("--workspace {}: canonicalize failed: {e}", input.display()),
1222                }],
1223            });
1224        }
1225    };
1226    if canonical != input {
1227        tracing::info!(
1228            target: "grex::sync",
1229            "workspace: {} → {}",
1230            input.display(),
1231            canonical.display(),
1232        );
1233    }
1234    Ok(canonical)
1235}
1236
1237/// Resolve the workspace, ensure the directory exists, and run the v1→v2
1238/// event-log migration. Extracted so [`run`] and [`teardown`] stay under
1239/// the workspace's 50-LOC per-function lint threshold.
1240fn prepare_workspace(pack_root: &Path, opts: &SyncOptions) -> Result<PathBuf, SyncError> {
1241    let workspace = resolve_workspace(pack_root, opts.workspace.as_deref())?;
1242    ensure_workspace_dir(&workspace)?;
1243    crate::manifest::ensure_event_log_migrated(&workspace).map_err(SyncError::EventLogMigration)?;
1244    Ok(workspace)
1245}
1246
/// Resolve the directory that anchors a pack.
///
/// * When `pack_root` names a YAML manifest (extension `yaml` or `yml`,
///   matched case-sensitively), the anchor is the manifest's
///   **grandparent** directory: the file name and its containing
///   directory are both stripped. This matches a manifest stored at
///   `<root>/.grex/pack.yaml`, for which the result is `<root>`.
/// * Otherwise `pack_root` is returned unchanged.
///
/// When the manifest path is too short to have a usable grandparent —
/// e.g. the relative paths `pack.yaml` or `.grex/pack.yaml` — the result
/// is `.` (the current directory), never an empty path.
fn pack_root_dir(pack_root: &Path) -> PathBuf {
    let is_yaml = matches!(pack_root.extension().and_then(|e| e.to_str()), Some("yaml" | "yml"));
    if !is_yaml {
        return pack_root.to_path_buf();
    }
    pack_root
        .parent()
        .and_then(Path::parent)
        // `Path::new(".grex").parent()` is `Some("")`, not `None`; treat
        // that empty grandparent like a missing one so callers always get
        // a usable directory instead of an empty path.
        .filter(|dir| !dir.as_os_str().is_empty())
        .map_or_else(|| PathBuf::from("."), Path::to_path_buf)
}
1259
1260/// Compute the `.grex/events.jsonl` path next to the pack root.
1261///
1262/// Delegates to [`crate::manifest::event_log_path`] (single source of
1263/// truth for the canonical event-log location).
1264fn event_log_path(pack_root: &Path) -> PathBuf {
1265    crate::manifest::event_log_path(&pack_root_dir(pack_root))
1266}
1267
/// Compute the sidecar lock path next to the event log: `.grex.lock` in
/// the event log's parent directory, or a bare relative `.grex.lock` when
/// `event_log` has no parent. One canonical slot per pack root —
/// cooperating grex procs serialize through this file.
fn event_lock_path(event_log: &Path) -> PathBuf {
    const LOCK_FILE_NAME: &str = ".grex.lock";
    match event_log.parent() {
        Some(dir) => dir.join(LOCK_FILE_NAME),
        None => PathBuf::from(LOCK_FILE_NAME),
    }
}
1273
/// Compute the sidecar lock path for the workspace itself:
/// `<workspace>/.grex/.grex.sync.lock` (v1.3.2 B11 hard-cut from
/// `<workspace>/.grex.sync.lock`). Co-locating under `.grex/` matches the
/// per-pack-lock and backend-lock placements so all stateful sidecar
/// files cluster under a single namespace.
fn workspace_lock_path(workspace: &Path) -> PathBuf {
    let mut lock_path = workspace.to_path_buf();
    lock_path.push(".grex");
    lock_path.push(".grex.sync.lock");
    lock_path
}
1282
1283/// Aggregate manifest-level + graph-level validators and return their output.
1284fn validate_graph(graph: &PackGraph) -> Result<(), SyncError> {
1285    let mut errors: Vec<PackValidationError> = Vec::new();
1286    for node in graph.nodes() {
1287        if let Err(mut e) = node.manifest.validate_plan() {
1288            errors.append(&mut e);
1289        }
1290    }
1291    if let Err(mut e) = graph.validate() {
1292        errors.append(&mut e);
1293    }
1294    if errors.is_empty() {
1295        Ok(())
1296    } else {
1297        Err(SyncError::Validation { errors })
1298    }
1299}
1300
1301/// Depth-first post-order traversal of the graph starting from root.
1302///
1303/// Children fully precede their parent in the returned vector so downstream
1304/// executors install leaves first and the root last.
1305fn post_order(graph: &PackGraph) -> Vec<usize> {
1306    let mut out = Vec::with_capacity(graph.nodes().len());
1307    visit_post(graph, 0, &mut out);
1308    out
1309}
1310
1311fn visit_post(graph: &PackGraph, id: usize, out: &mut Vec<usize>) {
1312    // Collect child ids first to avoid borrow conflicts with graph iteration.
1313    let kids: Vec<usize> = graph.children_of(id).map(|n| n.id).collect();
1314    for k in kids {
1315        visit_post(graph, k, out);
1316    }
1317    out.push(id);
1318}
1319
/// Drive every action for every node in `order`; stop at the first pack
/// whose lifecycle reports a halt.
///
/// Per node, in order:
/// 1. Look the node up in `report.graph`; ids with no node are skipped.
/// 2. Ask `try_skip_or_filter` whether the pack is excluded by `--only`
///    or can be skipped on an unchanged hash; if so, move on.
/// 3. Run the pack through `run_pack_lifecycle`.
/// 4. On a halt, drop the pack's entry from `next_lock` and return
///    immediately — later nodes in `order` are not processed.
/// 5. On success, compute the actions hash and upsert a fresh lockfile
///    entry via `upsert_lock_entry`.
///
/// Each action is bracketed by three manifest events:
/// 1. [`Event::ActionStarted`] — appended **before** `execute` returns.
/// 2. [`Event::ActionCompleted`] — appended on `Ok(step)`.
/// 3. [`Event::ActionHalted`] — appended on `Err(e)` before returning.
///
/// All three writes go through the same [`ManifestLock`]-wrapped path
/// ([`append_manifest_event`]) and failures are recorded as non-fatal
/// warnings so the executor's outcome always dominates. The third append
/// (`ActionHalted`) lets a future `grex doctor` correlate crash recovery
/// with the exact action that halted.
///
/// NOTE(review): none of the event appends are performed in this function
/// body; presumably they happen inside `run_pack_lifecycle` — confirm.
///
/// The function returns nothing: outcomes are recorded on `report` and in
/// `next_lock`.
// feat-m6 B1 wiring added `parallel` + `scheduler` args; the signature
// now pushes past the 50-LOC per-function lint by one line. Silence
// that one — the body itself is unchanged in scope.
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
fn run_actions(
    report: &mut SyncReport,
    order: &[usize],
    vars: &VarEnv,
    workspace: &Path,
    event_log: &Path,
    lock_path: &Path,
    dry_run: bool,
    prior_lock: &std::collections::HashMap<String, LockEntry>,
    next_lock: &mut std::collections::HashMap<String, LockEntry>,
    registry: &Arc<Registry>,
    pack_type_registry: &Arc<PackTypeRegistry>,
    only: Option<&GlobSet>,
    force: bool,
    parallel: usize,
    scheduler: &Arc<Scheduler>,
) {
    // Both executors share the same action registry; which one runs is
    // decided downstream (both are handed to `run_pack_lifecycle` along
    // with `dry_run`).
    let plan = PlanExecutor::with_registry(registry.clone());
    let fs = FsExecutor::with_registry(registry.clone());
    // One runtime and one visited-meta set are created up front and
    // shared by every pack in this run.
    let rt = build_pack_type_runtime(parallel);
    let visited_meta = new_visited_meta();
    for &id in order {
        let Some(node) = report.graph.node(id) else { continue };
        // Clone what we need out of the node: `node` borrows `report`,
        // which is handed out mutably to the helpers below.
        let pack_name = node.name.clone();
        let pack_path = node.path.clone();
        let actions = node.manifest.actions.clone();
        let manifest = node.manifest.clone();
        // A node without a commit SHA contributes an empty string.
        let commit_sha = node.commit_sha.clone().unwrap_or_default();
        let synthetic = node.synthetic;
        // v1.3.1 B14: parent manifest's `ref:` value for this node,
        // captured by the walker. Threaded into `upsert_lock_entry`
        // so the lockfile `branch` slot mirrors the manifest verbatim.
        let manifest_ref = node.manifest_ref.clone();
        // `--only` filter + skip-on-hash short-circuits colocated in
        // `try_skip_or_filter` so this outer loop stays within the
        // 50-LOC per-function budget.
        if try_skip_or_filter(
            report,
            only,
            &pack_name,
            &pack_path,
            &actions,
            &commit_sha,
            synthetic,
            workspace,
            prior_lock,
            next_lock,
            dry_run,
            force,
        ) {
            continue;
        }
        let pack_halted = run_pack_lifecycle(
            report,
            vars,
            workspace,
            event_log,
            lock_path,
            dry_run,
            &plan,
            &fs,
            registry,
            pack_type_registry,
            &rt,
            &pack_name,
            &pack_path,
            &manifest,
            &visited_meta,
            scheduler,
        );
        if pack_halted {
            // Route (b) halt-state gating: drop any prior entry for the
            // halted pack so the next sync sees no prior hash and
            // re-executes from scratch. Successful packs in this same
            // run keep their freshly-upserted entries, and packs we did
            // not reach keep their prior entries untouched.
            next_lock.remove(&pack_name);
            return;
        }
        // Successful pack — record a fresh lockfile entry so the next
        // run's skip-on-hash test can succeed. Commit SHA is now plumbed
        // from the walker (M4-D): `PackNode::commit_sha` carries the
        // resolved HEAD SHA when the pack's working tree is a git
        // repository, otherwise an empty string keeps the hash stable.
        let actions_hash = compute_actions_hash(&actions, &commit_sha);
        upsert_lock_entry(
            prior_lock,
            next_lock,
            &pack_name,
            &commit_sha,
            &actions_hash,
            synthetic,
            manifest_ref.as_deref(),
        );
    }
}
1432
1433/// Build the multi-thread tokio runtime used to drive async pack-type
1434/// plugin dispatch. Pack-type plugins expose `async fn` methods via
1435/// `async_trait`, but the sync driver is synchronous end-to-end — we
1436/// block on each plugin future inside the outer action loop. Extracted
1437/// into a standalone helper so the runtime construction does not
1438/// inflate `run_actions` beyond the 50-LOC per-function budget.
1439///
1440/// # Multi-thread rationale (M5-2c)
1441///
1442/// M5-2c enabled real [`crate::plugin::pack_type::MetaPlugin`] recursion
1443/// through [`crate::execute::ExecCtx::pack_type_registry`]. The recursion
1444/// itself is purely `async` / `.await` (no nested `block_on`), but future
1445/// plugin authors may reasonably compose `block_on` calls inside
1446/// lifecycle hooks — and external callers that drive `MetaPlugin` via
1447/// `rt.block_on(...)` within their own runtime would deadlock on a
1448/// current-thread runtime the moment a hook re-enters. A multi-thread
1449/// runtime with a small worker pool lets those re-entries resolve on a
1450/// sibling worker instead of blocking the dispatcher thread.
1451///
1452/// # Worker-thread sizing (feat-m6 H6)
1453///
1454/// The worker pool is sized from the resolved `--parallel` knob so the
1455/// runtime always has enough workers to service every in-flight pack op
1456/// plus at least one sibling for nested `block_on`. Clamped to
1457/// `[2, num_cpus::get()]`: `2` preserves the pre-M6 floor (one driver +
1458/// one sibling so re-entrant hooks never deadlock), and the upper bound
1459/// caps the pool at the host's CPU count so `--parallel 0`
1460/// (unbounded-semantics) does not explode the worker count.
1461fn build_pack_type_runtime(parallel: usize) -> tokio::runtime::Runtime {
1462    let workers = parallel.clamp(2, num_cpus::get().max(2));
1463    tokio::runtime::Builder::new_multi_thread()
1464        .worker_threads(workers)
1465        .enable_all()
1466        .build()
1467        .expect("tokio runtime for pack-type dispatch")
1468}
1469
1470/// Construct a fresh [`MetaVisitedSet`] for one sync run. Walker-driven
1471/// dispatch does not attach it (see `dispatch_pack_type_plugin`), but
1472/// the argument is threaded through so future explicit-install /
1473/// teardown verbs can share the same set shape.
1474fn new_visited_meta() -> MetaVisitedSet {
1475    std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashSet::new()))
1476}
1477
1478/// Combined short-circuit helper: `--only` filter + skip-on-hash. Returns
1479/// `true` when the outer loop should `continue` for this pack.
1480///
1481/// Extracted from `run_actions` so that function stays under the
1482/// workspace's 50-LOC per-function lint. Semantics are unchanged; this
1483/// is a pure structural refactor.
1484#[allow(clippy::too_many_arguments)]
1485fn try_skip_or_filter(
1486    report: &mut SyncReport,
1487    only: Option<&GlobSet>,
1488    pack_name: &str,
1489    pack_path: &Path,
1490    actions: &[Action],
1491    commit_sha: &str,
1492    current_synthetic: bool,
1493    workspace: &Path,
1494    prior_lock: &std::collections::HashMap<String, LockEntry>,
1495    next_lock: &mut std::collections::HashMap<String, LockEntry>,
1496    dry_run: bool,
1497    force: bool,
1498) -> bool {
1499    if skip_for_only_filter(only, pack_name, pack_path, workspace) {
1500        if let Some(prev) = prior_lock.get(pack_name) {
1501            next_lock.insert(pack_name.to_string(), prev.clone());
1502        }
1503        return true;
1504    }
1505    try_skip_pack(
1506        report,
1507        pack_name,
1508        pack_path,
1509        actions,
1510        commit_sha,
1511        current_synthetic,
1512        prior_lock,
1513        next_lock,
1514        dry_run,
1515        force,
1516    )
1517}
1518
1519/// Return `true` when `--only` is active and the pack's
1520/// **workspace-relative path** (normalized to forward-slash form) does
1521/// not match any of the registered globs. Name-fallback matching was
1522/// dropped in the M4-D post-review fix bundle: spec §M4 req 6 says
1523/// "pack paths" and cross-platform consistency requires a single
1524/// normalized representation rather than `display()`-formatted strings
1525/// (which use `\\` on Windows and `/` on POSIX — globset treats `\\`
1526/// as a glob-escape, not a path separator). For the root pack whose
1527/// `pack_path` is not under `workspace`, the fallback is to match
1528/// against the absolute path's forward-slash form.
1529fn skip_for_only_filter(
1530    only: Option<&GlobSet>,
1531    pack_name: &str,
1532    pack_path: &Path,
1533    workspace: &Path,
1534) -> bool {
1535    let Some(set) = only else { return false };
1536    let rel = pack_path.strip_prefix(workspace).unwrap_or(pack_path);
1537    let rel_str = rel.to_string_lossy().replace('\\', "/");
1538    let matches = set.is_match(&rel_str);
1539    if !matches {
1540        tracing::info!(
1541            target: "grex::sync",
1542            "skipping pack `{pack_name}` (rel path `{rel_str}`): does not match --only filter"
1543        );
1544    }
1545    !matches
1546}
1547
1548/// Per-pack lifecycle dispatch. Returns `true` when the sync must halt.
1549///
1550/// M5-1 Stage C replaces the blind `for action in manifest.actions` loop
1551/// with a pack-type-aware dispatch:
1552///
1553/// * [`PackType::Declarative`] retains the per-action execution shape that
1554///   M4 shipped — each action lands its own `ActionStarted` /
1555///   `ActionCompleted` / `ActionHalted` event bracket. The registry is
1556///   still consulted via [`PackTypeRegistry::get`] as a name-oracle so
1557///   mistyped packs fail closed.
1558/// * [`PackType::Meta`] / [`PackType::Scripted`] dispatch once through the
1559///   pack-type plugin's `sync` method (the sync CLI verb is the only
1560///   caller in M5-1; `install` / `update` / `teardown` verbs wire in
1561///   M5-2), returning a single aggregate [`ExecStep`]. A single event
1562///   bracket frames the async call.
1563///
1564/// Declarative is kept on the legacy per-action path because its event log
1565/// semantics (one event per action, per-step rollback context) are exactly
1566/// what plugin authors expect to observe. Unifying declarative under the
1567/// plugin dispatch is M5-2 scope — it requires reshaping the trait surface
1568/// to emit a step stream rather than a single aggregate.
1569#[allow(clippy::too_many_arguments)]
1570fn run_pack_lifecycle(
1571    report: &mut SyncReport,
1572    vars: &VarEnv,
1573    workspace: &Path,
1574    event_log: &Path,
1575    lock_path: &Path,
1576    dry_run: bool,
1577    plan: &PlanExecutor,
1578    fs: &FsExecutor,
1579    registry: &Arc<Registry>,
1580    pack_type_registry: &Arc<PackTypeRegistry>,
1581    rt: &tokio::runtime::Runtime,
1582    pack_name: &str,
1583    pack_path: &Path,
1584    manifest: &crate::pack::PackManifest,
1585    visited_meta: &MetaVisitedSet,
1586    scheduler: &Arc<Scheduler>,
1587) -> bool {
1588    let type_tag = manifest.r#type.as_str();
1589    // Name-oracle check: every pack type must be registered. Unknown
1590    // pack types halt the pack the same way M4 halted unknown actions.
1591    if pack_type_registry.get(type_tag).is_none() {
1592        let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
1593        record_action_err(dry_run, report, event_log, lock_path, pack_name, 0, "pack-type", err);
1594        return true;
1595    }
1596    match manifest.r#type {
1597        crate::pack::PackType::Declarative => run_declarative_actions(
1598            report,
1599            vars,
1600            workspace,
1601            event_log,
1602            lock_path,
1603            dry_run,
1604            plan,
1605            fs,
1606            pack_name,
1607            pack_path,
1608            manifest,
1609            &manifest.actions,
1610            scheduler,
1611        ),
1612        crate::pack::PackType::Meta | crate::pack::PackType::Scripted => dispatch_pack_type_plugin(
1613            report,
1614            vars,
1615            workspace,
1616            event_log,
1617            lock_path,
1618            dry_run,
1619            registry,
1620            pack_type_registry,
1621            rt,
1622            pack_name,
1623            pack_path,
1624            manifest,
1625            type_tag,
1626            visited_meta,
1627            scheduler,
1628        ),
1629    }
1630}
1631
1632/// Run a declarative pack's actions sequentially. Preserves the M4
1633/// per-action event-log bracket (`ActionStarted` → `ActionCompleted` |
1634/// `ActionHalted`). Returns `true` when the sync must halt.
1635#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1636fn run_declarative_actions(
1637    report: &mut SyncReport,
1638    vars: &VarEnv,
1639    workspace: &Path,
1640    event_log: &Path,
1641    lock_path: &Path,
1642    dry_run: bool,
1643    plan: &PlanExecutor,
1644    fs: &FsExecutor,
1645    pack_name: &str,
1646    pack_path: &Path,
1647    manifest: &crate::pack::PackManifest,
1648    actions: &[Action],
1649    scheduler: &Arc<Scheduler>,
1650) -> bool {
1651    // B12 v1.3.1: `apply_gitignore` was previously called here for
1652    // declarative packs (the per-action driver bypasses the plugin
1653    // path). Auto-mutation of the parent meta-repo's `.gitignore` was
1654    // removed in v1.3.1; `grex doctor` now surfaces an advisory when
1655    // the parent git index tracks pack content. The function is kept
1656    // as a no-op shim and the call is left in place so the diff stays
1657    // minimal — the reviewer pass will delete it together with the
1658    // other call sites in pack_type.rs.
1659    if !dry_run {
1660        let ctx = ExecCtx::new(vars, pack_path, workspace)
1661            .with_platform(Platform::current())
1662            .with_scheduler(scheduler);
1663        if let Err(e) = crate::plugin::pack_type::apply_gitignore(&ctx, manifest) {
1664            record_action_err(dry_run, report, event_log, lock_path, pack_name, 0, "gitignore", e);
1665            return true;
1666        }
1667    }
1668    for (idx, action) in actions.iter().enumerate() {
1669        let ctx = ExecCtx::new(vars, pack_path, workspace)
1670            .with_platform(Platform::current())
1671            .with_scheduler(scheduler);
1672        let action_tag = action_kind_tag(action);
1673        append_manifest_event(
1674            dry_run,
1675            event_log,
1676            lock_path,
1677            &Event::ActionStarted {
1678                ts: Utc::now(),
1679                id: pack_name.to_string(),
1680                action_idx: idx,
1681                action_name: action_tag.to_string(),
1682                schema_version: SCHEMA_VERSION.to_string(),
1683            },
1684            &mut report.event_log_warnings,
1685        );
1686        let step_result =
1687            if dry_run { plan.execute(action, &ctx) } else { fs.execute(action, &ctx) };
1688        if !record_action_outcome(
1689            dry_run,
1690            report,
1691            event_log,
1692            lock_path,
1693            pack_name,
1694            idx,
1695            action_tag,
1696            step_result,
1697        ) {
1698            return true;
1699        }
1700    }
1701    false
1702}
1703
1704/// Dispatch a pack-type plugin (meta / scripted) through the async
1705/// registry. Brackets the call with a single `ActionStarted` /
1706/// `ActionCompleted` / `ActionHalted` trio at index 0. Returns `true`
1707/// when the sync must halt.
1708#[allow(clippy::too_many_arguments)]
1709fn dispatch_pack_type_plugin(
1710    report: &mut SyncReport,
1711    vars: &VarEnv,
1712    workspace: &Path,
1713    event_log: &Path,
1714    lock_path: &Path,
1715    dry_run: bool,
1716    registry: &Arc<Registry>,
1717    pack_type_registry: &Arc<PackTypeRegistry>,
1718    rt: &tokio::runtime::Runtime,
1719    pack_name: &str,
1720    pack_path: &Path,
1721    manifest: &crate::pack::PackManifest,
1722    type_tag: &'static str,
1723    visited_meta: &MetaVisitedSet,
1724    scheduler: &Arc<Scheduler>,
1725) -> bool {
1726    // NB: `visited_meta` is intentionally NOT attached to the ctx here.
1727    // The sync driver already walks children in post-order via the tree
1728    // walker; attaching the visited set would trigger MetaPlugin's
1729    // real-recursion branch and cause double dispatch (walker runs child
1730    // packs as their own graph nodes, then MetaPlugin would recurse into
1731    // them again). The `visited_meta` parameter is kept on the argument
1732    // list so future explicit-install / teardown verbs that invoke
1733    // MetaPlugin directly can share the same set shape.
1734    let _ = visited_meta;
1735    let ctx = ExecCtx::new(vars, pack_path, workspace)
1736        .with_platform(Platform::current())
1737        .with_registry(registry)
1738        .with_pack_type_registry(pack_type_registry)
1739        .with_scheduler(scheduler);
1740    append_manifest_event(
1741        dry_run,
1742        event_log,
1743        lock_path,
1744        &Event::ActionStarted {
1745            ts: Utc::now(),
1746            id: pack_name.to_string(),
1747            action_idx: 0,
1748            action_name: type_tag.to_string(),
1749            schema_version: SCHEMA_VERSION.to_string(),
1750        },
1751        &mut report.event_log_warnings,
1752    );
1753    // SAFETY: `get` just confirmed the plugin is registered for
1754    // `type_tag`, so this unwrap cannot panic under the matched arm.
1755    let plugin = pack_type_registry
1756        .get(type_tag)
1757        .expect("pack-type plugin must be registered (guarded above)");
1758    // feat-m6 CI fix — establish a task-local tier stack frame for every
1759    // async dispatch. Without this, `TierGuard::push` (which runs inside
1760    // the plugin lifecycle and may span `.await` / thread hops under the
1761    // multi-thread runtime) has no enforcement frame to push into.
1762    let step_result = rt.block_on(crate::pack_lock::with_tier_scope(plugin.sync(&ctx, manifest)));
1763    !record_action_outcome(
1764        dry_run,
1765        report,
1766        event_log,
1767        lock_path,
1768        pack_name,
1769        0,
1770        type_tag,
1771        step_result,
1772    )
1773}
1774
1775/// Pure skip-eligibility decision. Returns `Some(hash)` when the pack
1776/// is eligible for the hash-skip short-circuit, `None` otherwise.
1777///
1778/// Splitting the decision out of [`try_skip_pack`] keeps the
1779/// side-effecting transcript bookkeeping testable in isolation: the
1780/// v1.1.1 synthetic-flag-flip regression exercises this helper without
1781/// having to stand up a `SyncReport` / `PackGraph`.
1782fn skip_eligibility(
1783    actions: &[Action],
1784    commit_sha: &str,
1785    current_synthetic: bool,
1786    prior: &LockEntry,
1787    dry_run: bool,
1788    force: bool,
1789) -> Option<String> {
1790    if dry_run || force {
1791        // Dry runs must always produce the planned-step transcript so
1792        // authors can see what `sync` *would* do. `--force` is the
1793        // operator's explicit opt-out from the hash short-circuit.
1794        return None;
1795    }
1796    let hash = compute_actions_hash(actions, commit_sha);
1797    if prior.actions_hash != hash {
1798        return None;
1799    }
1800    if prior.synthetic != current_synthetic {
1801        // Pack-shape flipped between runs (real ↔ synthetic). Even
1802        // when the actions hash matches by coincidence (e.g. a
1803        // declarative pack with empty `actions[]` whose pack.yaml was
1804        // deleted, falling through to a synthetic leaf with the same
1805        // empty actions list and stable commit SHA), we must NOT
1806        // carry the stale `synthetic` flag forward. Forcing the
1807        // upsert path re-emits the entry with the current flag.
1808        return None;
1809    }
1810    Some(hash)
1811}
1812
1813/// Decide whether `pack_name` can be short-circuited via a lockfile
1814/// hash match. When the prior hash matches the freshly-computed hash,
1815/// emit a single [`ExecResult::Skipped`] step and carry the prior
1816/// lockfile entry forward unchanged. Returns `true` when the pack was
1817/// skipped.
1818///
1819/// `current_synthetic` is the walker-derived synthetic flag for this
1820/// pack on the current run. The skip eligibility check requires it to
1821/// match `prior.synthetic` so a pack-shape transition (e.g. user
1822/// deletes `pack.yaml` so a previously-real pack now walks as
1823/// synthetic) invalidates the skip and forces the lockfile entry to
1824/// be re-emitted with the fresh `synthetic` value.
1825#[allow(clippy::too_many_arguments)]
1826fn try_skip_pack(
1827    report: &mut SyncReport,
1828    pack_name: &str,
1829    pack_path: &Path,
1830    actions: &[Action],
1831    commit_sha: &str,
1832    current_synthetic: bool,
1833    prior_lock: &std::collections::HashMap<String, LockEntry>,
1834    next_lock: &mut std::collections::HashMap<String, LockEntry>,
1835    dry_run: bool,
1836    force: bool,
1837) -> bool {
1838    let Some(prior) = prior_lock.get(pack_name) else {
1839        return false;
1840    };
1841    let Some(hash) =
1842        skip_eligibility(actions, commit_sha, current_synthetic, prior, dry_run, force)
1843    else {
1844        return false;
1845    };
1846    let skipped_step = ExecStep {
1847        action_name: Cow::Borrowed("pack"),
1848        result: ExecResult::Skipped {
1849            pack_path: pack_path.to_path_buf(),
1850            actions_hash: hash.clone(),
1851        },
1852        // W4 landed `StepKind::PackSkipped` as the dedicated pack-level
1853        // short-circuit detail; we use it here instead of the prior
1854        // `Require { Satisfied, Skip }` proxy so renderers and consumers
1855        // can match on a single, purpose-built variant.
1856        details: StepKind::PackSkipped { actions_hash: hash },
1857    };
1858    report.steps.push(SyncStep {
1859        pack: pack_name.to_string(),
1860        action_idx: 0,
1861        exec_step: skipped_step,
1862    });
1863    // Carry the prior entry forward so the next-lock snapshot stays
1864    // consistent with what's on disk.
1865    next_lock.insert(pack_name.to_string(), prior.clone());
1866    true
1867}
1868
1869/// Insert or update a lockfile entry for `pack_name` with `actions_hash`.
1870///
1871/// Stores `commit_sha` verbatim — including the empty string when the
1872/// pack is not a git working tree or the HEAD probe failed.
1873/// `actions_hash` is computed over the same `commit_sha`, so the two
1874/// fields stay internally consistent: if probing starts returning a
1875/// non-empty SHA on the next run, the hash differs and the skip is
1876/// correctly invalidated. The prior-preserve carve-out that was
1877/// introduced in M4-D was unsound (hash-vs-sha drift) and is removed
1878/// by the M4-D post-review fix bundle; see spec §M4 req 4a.
1879///
1880/// `prior_lock` is consulted purely for observability: when a
1881/// previously-real pack flips to synthetic between runs (user deleted
1882/// the pack's `pack.yaml` so the walker fell back to v1.1.1
1883/// plain-git-child synthesis), a `tracing::warn!` records the
1884/// downgrade so the operator notices their declarative actions have
1885/// stopped running.
1886fn upsert_lock_entry(
1887    prior_lock: &std::collections::HashMap<String, LockEntry>,
1888    next_lock: &mut std::collections::HashMap<String, LockEntry>,
1889    pack_name: &str,
1890    commit_sha: &str,
1891    actions_hash: &str,
1892    synthetic: bool,
1893    manifest_ref: Option<&str>,
1894) {
1895    if synthetic {
1896        if let Some(prior) = prior_lock.get(pack_name) {
1897            if !prior.synthetic {
1898                tracing::warn!(
1899                    target: "grex::sync",
1900                    pack = pack_name,
1901                    "pack `{pack_name}` downgraded from real to synthetic — \
1902                     pack.yaml missing on disk; only `git pull` will run going forward",
1903                );
1904            }
1905        }
1906    }
1907    let installed_at = Utc::now();
1908    let entry = next_lock.get(pack_name).map_or_else(
1909        || LockEntry {
1910            id: pack_name.to_string(),
1911            // v1.1.1 convention: path == id (1:1 id↔folder). Stage 1.e
1912            // (walker rewrite) will replace this with the parent-relative
1913            // manifest path captured during the walk.
1914            path: pack_name.to_string(),
1915            sha: commit_sha.to_string(),
1916            // v1.3.1 B14: mirror the parent manifest's `ref:` value
1917            // (or empty when absent), per the Lean theorem
1918            // `Grex.Lockfile.lockfile_branch_mirrors_manifest_ref`.
1919            branch: branch_of(manifest_ref),
1920            installed_at,
1921            actions_hash: actions_hash.to_string(),
1922            schema_version: "1".to_string(),
1923            synthetic,
1924        },
1925        |prev| LockEntry {
1926            installed_at,
1927            actions_hash: actions_hash.to_string(),
1928            sha: commit_sha.to_string(),
1929            synthetic,
1930            ..prev.clone()
1931        },
1932    );
1933    next_lock.insert(pack_name.to_string(), entry);
1934}
1935
1936/// Record one action outcome into `report` + event log. Returns `false`
1937/// when the run must halt (on error); `true` otherwise.
1938#[allow(clippy::too_many_arguments)]
1939fn record_action_outcome(
1940    dry_run: bool,
1941    report: &mut SyncReport,
1942    event_log: &Path,
1943    lock_path: &Path,
1944    pack_name: &str,
1945    idx: usize,
1946    action_tag: &'static str,
1947    step_result: Result<ExecStep, ExecError>,
1948) -> bool {
1949    match step_result {
1950        Ok(step) => {
1951            record_action_ok(dry_run, report, event_log, lock_path, pack_name, idx, step);
1952            true
1953        }
1954        Err(e) => {
1955            record_action_err(dry_run, report, event_log, lock_path, pack_name, idx, action_tag, e);
1956            false
1957        }
1958    }
1959}
1960
1961/// Success-path bookkeeping: emit legacy `Sync` summary + `ActionCompleted`
1962/// audit event, then push the step onto the report.
1963///
1964/// v1.3.1 B4 fix-up: under `dry_run = true`, the on-disk event-log writes
1965/// are skipped. The in-memory `report.steps` push still happens — dry-run
1966/// callers rely on the planned-step transcript for output.
1967#[allow(clippy::too_many_arguments)]
1968fn record_action_ok(
1969    dry_run: bool,
1970    report: &mut SyncReport,
1971    event_log: &Path,
1972    lock_path: &Path,
1973    pack_name: &str,
1974    idx: usize,
1975    step: ExecStep,
1976) {
1977    append_step_event(
1978        dry_run,
1979        event_log,
1980        lock_path,
1981        pack_name,
1982        &step,
1983        &mut report.event_log_warnings,
1984    );
1985    append_manifest_event(
1986        dry_run,
1987        event_log,
1988        lock_path,
1989        &Event::ActionCompleted {
1990            ts: Utc::now(),
1991            id: pack_name.to_string(),
1992            action_idx: idx,
1993            result_summary: format!("{:?}", step.result),
1994            schema_version: SCHEMA_VERSION.to_string(),
1995        },
1996        &mut report.event_log_warnings,
1997    );
1998    report.steps.push(SyncStep { pack: pack_name.to_string(), action_idx: idx, exec_step: step });
1999}
2000
2001/// Halt-path bookkeeping: emit `ActionHalted` audit event, then stash the
2002/// rich `HaltedContext` into `report.halted`.
2003///
2004/// v1.3.1 B4 fix-up: under `dry_run = true`, the on-disk event-log write
2005/// is skipped; the `report.halted` slot still receives the
2006/// [`HaltedContext`] so callers can render the halt reason without
2007/// touching disk.
2008#[allow(clippy::too_many_arguments)]
2009fn record_action_err(
2010    dry_run: bool,
2011    report: &mut SyncReport,
2012    event_log: &Path,
2013    lock_path: &Path,
2014    pack_name: &str,
2015    idx: usize,
2016    action_tag: &'static str,
2017    e: ExecError,
2018) {
2019    let error_summary = truncate_error_summary(&e);
2020    append_manifest_event(
2021        dry_run,
2022        event_log,
2023        lock_path,
2024        &Event::ActionHalted {
2025            ts: Utc::now(),
2026            id: pack_name.to_string(),
2027            action_idx: idx,
2028            action_name: action_tag.to_string(),
2029            error_summary,
2030            schema_version: SCHEMA_VERSION.to_string(),
2031        },
2032        &mut report.event_log_warnings,
2033    );
2034    let recovery_hint = recovery_hint_for(&e);
2035    report.halted = Some(SyncError::Halted(Box::new(HaltedContext {
2036        pack: pack_name.to_string(),
2037        action_idx: idx,
2038        action_name: action_tag.to_string(),
2039        error: e,
2040        recovery_hint,
2041    })));
2042}
2043
2044/// Short stable kind-tag for an [`crate::pack::Action`]. Mirrors the
2045/// `ACTION_*` constants used by [`crate::execute::step`] so the audit log
2046/// stays uniform.
2047fn action_kind_tag(action: &crate::pack::Action) -> &'static str {
2048    use crate::pack::Action;
2049    match action {
2050        Action::Symlink(_) => "symlink",
2051        Action::Unlink(_) => "unlink",
2052        Action::Env(_) => "env",
2053        Action::Mkdir(_) => "mkdir",
2054        Action::Rmdir(_) => "rmdir",
2055        Action::Require(_) => "require",
2056        Action::When(_) => "when",
2057        Action::Exec(_) => "exec",
2058    }
2059}
2060
2061/// Produce a bounded human summary of an [`ExecError`] for
2062/// [`Event::ActionHalted::error_summary`]. Keeps the written JSONL line
2063/// from pathological blowup when captured stderr is large.
2064fn truncate_error_summary(err: &ExecError) -> String {
2065    let mut s = err.to_string();
2066    if s.len() > ACTION_ERROR_SUMMARY_MAX {
2067        s.truncate(ACTION_ERROR_SUMMARY_MAX);
2068        s.push_str("…[truncated]");
2069    }
2070    s
2071}
2072
2073/// Best-effort recovery hint for common [`ExecError`] shapes. Returns
2074/// `None` when no generic advice applies; the error's own `Display`
2075/// output is already shown by the `Halted` variant's format string.
2076fn recovery_hint_for(err: &ExecError) -> Option<String> {
2077    match err {
2078        ExecError::SymlinkDestOccupied { .. } => Some(
2079            "set `backup: true` on the symlink action, or remove the conflicting entry by hand"
2080                .into(),
2081        ),
2082        ExecError::SymlinkPrivilegeDenied { .. } => {
2083            Some("enable Windows Developer Mode or re-run grex as administrator".into())
2084        }
2085        ExecError::SymlinkCreateAfterBackupFailed { backup, .. } => {
2086            Some(format!("backup left at `{}`; restore manually then re-run", backup.display()))
2087        }
2088        ExecError::RmdirNotEmpty { .. } => {
2089            Some("set `force: true` on the rmdir action to recurse".into())
2090        }
2091        ExecError::EnvPersistenceDenied { .. } => {
2092            Some("re-run elevated (Machine scope needs admin)".into())
2093        }
2094        _ => None,
2095    }
2096}
2097
2098/// Append one [`Event::Sync`] record summarising an [`ExecStep`].
2099///
2100/// Failures log a warning and are recorded in the report's
2101/// `event_log_warnings`; they do not abort the sync (spec: event-log write
2102/// failures are non-fatal).
2103///
2104/// # Concurrency
2105///
2106/// The append is serialized through a [`ManifestLock`] held across the
2107/// write. The lock is acquired **per action** (not once across the full
2108/// traversal) so cooperating grex processes can observe mid-progress log
2109/// state between actions; fd-lock acquisition is cheap on modern kernels
2110/// and sync runs are dominated by executor side effects, not lock waits.
2111/// This closes the bypass gap surfaced by the M3 concurrency review where
2112/// `append_event` was called without any cross-process serialisation.
2113fn append_step_event(
2114    dry_run: bool,
2115    log: &Path,
2116    lock_path: &Path,
2117    pack: &str,
2118    step: &ExecStep,
2119    warnings: &mut Vec<String>,
2120) {
2121    if dry_run {
2122        return;
2123    }
2124    let summary = format!("{}:{:?}", step.action_name, step.result);
2125    let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: summary };
2126    if let Err(e) = append_event_locked(log, lock_path, &event) {
2127        tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
2128        warnings.push(format!("{}: {e}", log.display()));
2129    }
2130    // Schema version is recorded once at the manifest level by existing
2131    // manifest code; this stub uses the constant to keep a single source of
2132    // truth for forward-compat.
2133    let _ = SCHEMA_VERSION;
2134}
2135
2136/// Append a single [`Event`] under the shared [`ManifestLock`] path.
2137/// Failures are logged and recorded as non-fatal warnings — the spec
2138/// marks event-log write failures as non-aborting so a transient disk
2139/// error must not kill a sync mid-stream.
2140///
2141/// v1.3.1 B4 fix-up: when `dry_run` is `true`, this function is a no-op
2142/// — the dry-run contract forbids any write to `<workspace>/.grex/`,
2143/// including the audit `events.jsonl`. In-memory `event_log_warnings`
2144/// records remain available; only the on-disk side effect is gated.
2145fn append_manifest_event(
2146    dry_run: bool,
2147    log: &Path,
2148    lock_path: &Path,
2149    event: &Event,
2150    warnings: &mut Vec<String>,
2151) {
2152    if dry_run {
2153        return;
2154    }
2155    if let Err(e) = append_event_locked(log, lock_path, event) {
2156        tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
2157        warnings.push(format!("{}: {e}", log.display()));
2158    }
2159}
2160
2161/// Acquire [`ManifestLock`] and append one event. Parent dir of the log is
2162/// created lazily on first write.
2163fn append_event_locked(log: &Path, lock_path: &Path, event: &Event) -> Result<(), String> {
2164    if let Some(parent) = log.parent() {
2165        std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
2166    }
2167    if let Some(parent) = lock_path.parent() {
2168        std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
2169    }
2170    let mut lock = ManifestLock::open(log, lock_path).map_err(|e| e.to_string())?;
2171    lock.write(|| append_event(log, event)).map_err(|e| e.to_string())?.map_err(|e| e.to_string())
2172}
2173
2174/// Re-export a cheap helper so CLI renderers can label halted steps by node
2175/// name without reaching into the graph twice.
2176#[must_use]
2177pub fn pack_display_name(node: &PackNode) -> &str {
2178    &node.name
2179}
2180
2181/// Run a full teardown over the pack tree rooted at `pack_root`.
2182///
2183/// Mirrors [`run`] but invokes
2184/// [`crate::plugin::PackTypePlugin::teardown`] on every pack in
2185/// **reverse** post-order so a parent tears down before its children
2186/// (the inverse of install). Children composed later by an author
2187/// consequently teardown earlier, matching the declarative
2188/// auto-reverse contract (R-M5-11).
2189///
2190/// All other concerns are identical to [`run`]: workspace lock, plan-
2191/// phase validators, lockfile update skipped (teardown does not
2192/// write a `actions_hash` forward), and event-log bracketing.
2193/// Teardown does NOT consult the lockfile skip-on-hash shortcut — a
2194/// user explicitly asked to remove the pack, so we always dispatch.
2195///
2196/// # Errors
2197///
2198/// Returns the first error that halts the pipeline — see [`SyncError`].
2199///
2200/// See [`run`] for the `cancel` contract — feat-m7-1 stage 2 threads
2201/// the parameter through teardown for parity; stages 3-4 add the polls.
2202pub fn teardown(
2203    pack_root: &Path,
2204    opts: &SyncOptions,
2205    cancel: &CancellationToken,
2206) -> Result<SyncReport, SyncError> {
2207    let _ = cancel;
2208    let workspace = prepare_workspace(pack_root, opts)?;
2209    let (mut ws_lock, ws_lock_path) = open_workspace_lock(&workspace)?;
2210    let _ws_guard = match ws_lock.try_acquire() {
2211        Ok(Some(g)) => g,
2212        Ok(None) => {
2213            return Err(SyncError::WorkspaceBusy {
2214                workspace: workspace.clone(),
2215                lock_path: ws_lock_path,
2216            });
2217        }
2218        Err(e) => return Err(workspace_lock_err(&ws_lock_path, &e.to_string())),
2219    };
2220
2221    // v1.2.1 path (iii) — teardown is read-only against the existing
2222    // disk state (no clones / fetches / prunes). It only needs the
2223    // graph build pass; `sync_meta` is intentionally skipped here.
2224    let graph = build_and_validate_graph(&workspace, opts.validate, opts.ref_override.as_deref())?;
2225    let prep = prepare_run_context(pack_root, &graph, &workspace)?;
2226
2227    let mut report = SyncReport {
2228        graph,
2229        steps: Vec::new(),
2230        halted: None,
2231        event_log_warnings: Vec::new(),
2232        pre_run_recovery: prep.pre_run_recovery,
2233        // teardown does not run the legacy-layout migration — by the time
2234        // a user is tearing down, the layout has already been migrated
2235        // (or was never legacy in the first place). Surfacing an empty
2236        // list keeps the report shape symmetric with `run()`.
2237        workspace_migrations: Vec::new(),
2238    };
2239
2240    // feat-m6 B1: mirror `run()` — resolve `--parallel`, build a
2241    // Scheduler, thread it through every `ExecCtx` the teardown path
2242    // constructs. Teardown is the other user-facing verb that owns a
2243    // runtime, so it gets the same wiring.
2244    let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
2245    let scheduler = Arc::new(Scheduler::new(resolved_parallel));
2246    run_teardown(
2247        &mut report,
2248        &prep.order,
2249        &prep.vars,
2250        &workspace,
2251        &prep.event_log,
2252        &prep.lock_path,
2253        &prep.registry,
2254        &prep.pack_type_registry,
2255        resolved_parallel,
2256        &scheduler,
2257    );
2258    Ok(report)
2259}
2260
2261/// Dispatch `teardown` for every pack in **reverse** post-order.
2262/// Declarative packs go through [`crate::plugin::PackTypePlugin`]
2263/// rather than the per-action M4 path because the trait's
2264/// auto-reverse / explicit-block logic must compose with the
2265/// registry; going through the per-action path would mean
2266/// re-implementing inverse synthesis in the sync loop.
2267#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
2268fn run_teardown(
2269    report: &mut SyncReport,
2270    order: &[usize],
2271    vars: &VarEnv,
2272    workspace: &Path,
2273    event_log: &Path,
2274    lock_path: &Path,
2275    registry: &Arc<Registry>,
2276    pack_type_registry: &Arc<PackTypeRegistry>,
2277    parallel: usize,
2278    scheduler: &Arc<Scheduler>,
2279) {
2280    let rt = build_pack_type_runtime(parallel);
2281    // Reverse post-order: root first, then children. Pack-type plugin
2282    // teardown methods reverse their own children/actions, so the
2283    // outer loop only flips the inter-pack order.
2284    for &id in order.iter().rev() {
2285        let Some(node) = report.graph.node(id) else { continue };
2286        let pack_name = node.name.clone();
2287        let pack_path = node.path.clone();
2288        let manifest = node.manifest.clone();
2289        let type_tag = manifest.r#type.as_str();
2290        if pack_type_registry.get(type_tag).is_none() {
2291            let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
2292            // Teardown has no dry-run mode — pass `false` so the
2293            // event-log writes proceed as before.
2294            record_action_err(false, report, event_log, lock_path, &pack_name, 0, "pack-type", err);
2295            return;
2296        }
2297        let ctx = ExecCtx::new(vars, &pack_path, workspace)
2298            .with_platform(Platform::current())
2299            .with_registry(registry)
2300            .with_pack_type_registry(pack_type_registry)
2301            .with_scheduler(scheduler);
2302        append_manifest_event(
2303            false,
2304            event_log,
2305            lock_path,
2306            &Event::ActionStarted {
2307                ts: Utc::now(),
2308                id: pack_name.clone(),
2309                action_idx: 0,
2310                action_name: type_tag.to_string(),
2311                schema_version: SCHEMA_VERSION.to_string(),
2312            },
2313            &mut report.event_log_warnings,
2314        );
2315        let plugin = pack_type_registry
2316            .get(type_tag)
2317            .expect("pack-type plugin must be registered (guarded above)");
2318        // feat-m6 CI fix — see dispatch_pack_type note.
2319        let step_result =
2320            rt.block_on(crate::pack_lock::with_tier_scope(plugin.teardown(&ctx, &manifest)));
2321        if !record_action_outcome(
2322            false,
2323            report,
2324            event_log,
2325            lock_path,
2326            &pack_name,
2327            0,
2328            type_tag,
2329            step_result,
2330        ) {
2331            return;
2332        }
2333    }
2334}
2335
2336/// Test-only hook: append one [`Event::Sync`] through the same
2337/// [`ManifestLock`]-serialised path the sync driver uses.
2338///
2339/// Exposed so integration tests under `tests/` can exercise the locked
2340/// append helper without spinning up a full pack tree. Not intended for
2341/// downstream consumers — the signature may change without notice.
2342#[doc(hidden)]
2343pub fn __test_append_sync_event(
2344    log: &Path,
2345    lock_path: &Path,
2346    pack: &str,
2347    action_name: &str,
2348) -> Result<(), String> {
2349    let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: action_name.to_string() };
2350    append_event_locked(log, lock_path, &event)
2351}
2352
2353// ----------------------------------------------------------------------
2354// PR E — pre-run teardown scan
2355// ----------------------------------------------------------------------
2356
/// One `ActionStarted` event in the manifest log that has no matching
/// `ActionCompleted` or `ActionHalted` peer.
///
/// Dangling starts are the primary crash signal: the process wrote the
/// pre-action event, then died before the executor returned. Callers
/// should surface these to the operator (diagnostics only this PR; a
/// future `grex doctor` verb will act on them).
///
/// Each field is copied from the unterminated `ActionStarted` event.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DanglingStart {
    /// Pack that owned the unterminated action (the event's `id`).
    pub pack: String,
    /// 0-based action index within the pack.
    pub action_idx: usize,
    /// Short action kind tag.
    pub action_name: String,
    /// Timestamp the `ActionStarted` event was written.
    pub started_at: DateTime<Utc>,
}
2376
/// Summary of crash-recovery artifacts found in a workspace (and its
/// manifest log) before a sync begins.
///
/// Built by [`scan_recovery`]. All fields are diagnostic; the sync
/// proceeds regardless of what the scan finds.
#[non_exhaustive]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RecoveryReport {
    /// `<dst>.grex.bak` entries (symlink-action rollback orphans). The
    /// walker in this file matches purely on the `.grex.bak` name
    /// suffix; it does not inspect the original next to the backup.
    pub orphan_backups: Vec<PathBuf>,
    /// `<path>.grex.bak.<timestamp>` tombstones left by `rmdir` with
    /// `backup: true`. Any non-empty suffix after `.grex.bak.` counts.
    pub orphan_tombstones: Vec<PathBuf>,
    /// `ActionStarted` events in the log with no matching
    /// `ActionCompleted`/`ActionHalted`.
    pub dangling_starts: Vec<DanglingStart>,
}
2395
2396impl RecoveryReport {
2397    /// `true` when the scan found nothing worth reporting.
2398    #[must_use]
2399    pub fn is_empty(&self) -> bool {
2400        self.orphan_backups.is_empty()
2401            && self.orphan_tombstones.is_empty()
2402            && self.dangling_starts.is_empty()
2403    }
2404}
2405
2406/// Walk `workspace` and the manifest log to find crash-recovery artifacts.
2407///
2408/// Inspects:
2409///
2410/// * `workspace` for `.grex.bak` orphans and timestamped `.grex.bak.<ts>`
2411///   tombstones. The workspace IS where children materialise (whether
2412///   the default flat-sibling layout under the pack root, or an
2413///   explicit `--workspace` override directory) so this single bounded
2414///   walk covers every backup site.
2415/// * `event_log` (the manifest JSONL) for `ActionStarted` entries that
2416///   have no matching `ActionCompleted` / `ActionHalted` successor.
2417///
2418/// Non-blocking: scan errors are swallowed to an empty report so a
2419/// half-readable directory cannot kill a sync that would otherwise
2420/// succeed. Call sites that want to surface scan failures should read
2421/// the manifest directly.
2422///
2423/// Pre-`v1.1.0` post-review fix this anchored at `pack_root_dir(pack_root)`,
2424/// which missed every backup under a `--workspace` override.
2425///
2426/// # Errors
2427///
2428/// Returns [`SyncError::Validation`] only when the manifest read itself
2429/// reports corruption. Filesystem traversal errors are swallowed.
2430pub fn scan_recovery(workspace: &Path, event_log: &Path) -> Result<RecoveryReport, SyncError> {
2431    let mut report = RecoveryReport::default();
2432    walk_for_backups(workspace, &mut report);
2433    if event_log.exists() {
2434        match read_all(event_log) {
2435            Ok(events) => {
2436                report.dangling_starts = collect_dangling_starts(&events);
2437            }
2438            Err(e) => {
2439                return Err(SyncError::Validation {
2440                    errors: vec![PackValidationError::DependsOnUnsatisfied {
2441                        pack: "<event-log>".into(),
2442                        required: e.to_string(),
2443                    }],
2444                });
2445            }
2446        }
2447    }
2448    Ok(report)
2449}
2450
2451/// Shallow directory walker (bounded depth = 6) that categorizes
2452/// `.grex.bak` and `.grex.bak.<ts>` filenames into the appropriate
2453/// report slot. Depth-limited so a pathological workspace with a deep
2454/// tree cannot stall the scan; realistic layouts are well under six
2455/// levels.
2456fn walk_for_backups(root: &Path, report: &mut RecoveryReport) {
2457    walk_for_backups_inner(root, report, 0);
2458}
2459
2460fn walk_for_backups_inner(dir: &Path, report: &mut RecoveryReport, depth: u32) {
2461    const MAX_DEPTH: u32 = 6;
2462    if depth > MAX_DEPTH {
2463        return;
2464    }
2465    let Ok(entries) = std::fs::read_dir(dir) else { return };
2466    for entry_result in entries {
2467        let entry = match entry_result {
2468            Ok(e) => e,
2469            Err(e) => {
2470                tracing::warn!(
2471                    target: "grex::sync::recover",
2472                    "skipping unreadable entry under `{}`: {e}",
2473                    dir.display(),
2474                );
2475                continue;
2476            }
2477        };
2478        let path = entry.path();
2479        let name = entry.file_name();
2480        let Some(name_str) = name.to_str() else { continue };
2481        if name_str.ends_with(".grex.bak") {
2482            report.orphan_backups.push(path.clone());
2483            continue;
2484        }
2485        if let Some(rest) = name_str.rsplit_once(".grex.bak.") {
2486            // `rsplit_once` returns `(prefix, suffix)`; suffix is the
2487            // timestamp chunk. Accept any non-empty suffix — the exact
2488            // timestamp shape is `fs_executor` internal.
2489            if !rest.1.is_empty() {
2490                report.orphan_tombstones.push(path.clone());
2491                continue;
2492            }
2493        }
2494        // Recurse only into real directories (not symlinks, to avoid
2495        // traversing into the workspace's cloned repos via aliased
2496        // paths). `entry.file_type()` does NOT follow symlinks (unlike
2497        // `entry.metadata()` which would dereference and report the
2498        // target's type — defeating the very check this guards). The
2499        // symlink-skip is also explicit so the intent is recoverable
2500        // from the source: backup-recovery never crosses a symlink.
2501        let Ok(ft) = entry.file_type() else { continue };
2502        if ft.is_symlink() {
2503            continue;
2504        }
2505        if ft.is_dir() {
2506            walk_for_backups_inner(&path, report, depth + 1);
2507        }
2508    }
2509}
2510
2511/// Reduce an event stream to a list of `ActionStarted` records with no
2512/// matching terminator.
2513///
2514/// Matching is positional per `(pack, action_idx)`: a later
2515/// `ActionCompleted` or `ActionHalted` with the same key clears the
2516/// entry. Whatever remains in the map after the pass is dangling.
2517fn collect_dangling_starts(events: &[Event]) -> Vec<DanglingStart> {
2518    use std::collections::HashMap;
2519    let mut open: HashMap<(String, usize), DanglingStart> = HashMap::new();
2520    for ev in events {
2521        match ev {
2522            // v1.3.1 schema v2: pack-id field is `id`. The destructure
2523            // binds `id` and `schema_version` is ignored via `..`.
2524            Event::ActionStarted { ts, id, action_idx, action_name, .. } => {
2525                open.insert(
2526                    (id.clone(), *action_idx),
2527                    DanglingStart {
2528                        pack: id.clone(),
2529                        action_idx: *action_idx,
2530                        action_name: action_name.clone(),
2531                        started_at: *ts,
2532                    },
2533                );
2534            }
2535            Event::ActionCompleted { id, action_idx, .. }
2536            | Event::ActionHalted { id, action_idx, .. } => {
2537                open.remove(&(id.clone(), *action_idx));
2538            }
2539            _ => {}
2540        }
2541    }
2542    let mut out: Vec<DanglingStart> = open.into_values().collect();
2543    out.sort_by_key(|a| a.started_at);
2544    out
2545}
2546
#[cfg(test)]
mod synthetic_transition_tests {
    //! v1.1.1 — regression cover for the pack-shape transition fixes.
    //!
    //! [`skip_eligibility`] and [`upsert_lock_entry`] are called directly
    //! (no walker, no fs), so the assertions are about the plumbing
    //! itself: skip eligibility must require synthetic-flag agreement
    //! even when the actions hash matches by coincidence, and the upsert
    //! path must record the real-to-synthetic downgrade so the
    //! operator's lockfile reflects what just happened.
    use super::{skip_eligibility, upsert_lock_entry, LockEntry};
    use crate::lockfile::compute_actions_hash;
    use chrono::{TimeZone, Utc};
    use std::collections::HashMap;

    /// Commit SHA shared by every fixture in this module.
    const SHA: &str = "deadbeef";

    /// Fixed install timestamp used by every fixture entry.
    fn ts() -> chrono::DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 4, 27, 10, 0, 0).unwrap()
    }

    /// Stable empty-actions hash over the fixed commit SHA. The same
    /// inputs feed both the prior (real) and the new (synthetic)
    /// configuration, which is exactly the coincidental-hash-match
    /// scenario FIX 3 must catch.
    fn stable_hash() -> String {
        compute_actions_hash(&[], SHA)
    }

    /// Lockfile entry named `id` (also used as its path) pinned to the
    /// fixture SHA / hash, with the given `synthetic` flag.
    fn lock_entry(id: &'static str, synthetic: bool) -> LockEntry {
        LockEntry {
            id: id.into(),
            path: id.into(),
            sha: SHA.into(),
            branch: "main".into(),
            installed_at: ts(),
            actions_hash: stable_hash(),
            schema_version: "1".into(),
            synthetic,
        }
    }

    /// FIX 3 — the pack flips from real → synthetic while `actions_hash`
    /// and `commit_sha` happen to match. The skip MUST be invalidated so
    /// the upsert path re-emits the entry with `synthetic = true`.
    #[test]
    fn skip_eligibility_invalidates_when_synthetic_flag_flips() {
        let real_prior = lock_entry("alpha", false);
        let decision = skip_eligibility(&[], SHA, true, &real_prior, false, false);
        assert!(decision.is_none(), "skip must be invalidated when synthetic flag flips");
    }

    /// Baseline: same hash and same synthetic flag → the skip is allowed.
    #[test]
    fn skip_eligibility_allows_skip_when_synthetic_matches() {
        let synthetic_prior = lock_entry("alpha", true);
        let expected = stable_hash();
        let decision = skip_eligibility(&[], SHA, true, &synthetic_prior, false, false);
        assert_eq!(
            decision.as_deref(),
            Some(expected.as_str()),
            "skip must be honoured when synthetic flag matches",
        );
    }

    /// `dry_run` and `force` each disable the skip regardless of flag
    /// agreement.
    #[test]
    fn skip_eligibility_respects_dry_run_and_force() {
        let synthetic_prior = lock_entry("alpha", true);
        let with_dry_run = skip_eligibility(&[], SHA, true, &synthetic_prior, true, false);
        assert!(with_dry_run.is_none());
        let with_force = skip_eligibility(&[], SHA, true, &synthetic_prior, false, true);
        assert!(with_force.is_none());
    }

    /// FIX 4 — `upsert_lock_entry` records the downgrade in the lockfile
    /// (the entry flips to `synthetic = true`) when the prior entry was
    /// real. The `tracing::warn!` is fire-and-forget, but the lockfile
    /// transition itself is observable and must be correct.
    #[test]
    fn upsert_lock_entry_records_real_to_synthetic_downgrade() {
        let prior: HashMap<String, LockEntry> =
            HashMap::from([("beta".to_string(), lock_entry("beta", false))]);
        let mut next: HashMap<String, LockEntry> = HashMap::new();

        upsert_lock_entry(&prior, &mut next, "beta", SHA, &stable_hash(), true, None);

        let upserted = next.get("beta").expect("entry must be upserted");
        assert!(upserted.synthetic, "downgraded entry must carry synthetic = true");
        assert_eq!(upserted.actions_hash, stable_hash(), "actions_hash must reflect current run");
    }

    /// Steady state (synthetic → synthetic): the entry is replaced with
    /// the current run's timestamp/hash but the synthetic flag is
    /// preserved. Guards against an over-eager warning fire.
    #[test]
    fn upsert_lock_entry_no_op_for_steady_state_synthetic() {
        let prior: HashMap<String, LockEntry> =
            HashMap::from([("gamma".to_string(), lock_entry("gamma", true))]);
        let mut next: HashMap<String, LockEntry> = HashMap::new();

        upsert_lock_entry(&prior, &mut next, "gamma", SHA, &stable_hash(), true, None);

        let upserted = next.get("gamma").expect("entry must be upserted");
        assert!(upserted.synthetic, "synthetic must remain true on no-op refresh");
    }
}
2677
#[cfg(test)]
mod error_display_tests {
    //! v1.2.0 Stage 1.k — `SyncError` Display assertions.
    //!
    //! Construct a variant, render it, compare the text. The variants
    //! land dormant — Stage 1.g (rayon scheduler) wires
    //! `SchedulerCancelled` once cooperative cancel polls reach the
    //! parallel walker.
    use super::SyncError;

    /// `SchedulerCancelled` renders the fixed user-facing message.
    #[test]
    fn test_sync_error_scheduler_cancelled_display() {
        let rendered = SyncError::SchedulerCancelled.to_string();
        assert_eq!(rendered, "sync cancelled by user");
    }
}
2693
#[cfg(test)]
mod sync_options_v1_2_0_tests {
    //! v1.2.0 Stage 1.m — leaf cover for new [`SyncOptions`] fields.
    //!
    //! Mechanical default-value assertions plus simple builder/clone
    //! round-trips. They pin down two things:
    //!
    //! 1. Adding the new fields preserves v1.1.1 behavior (defaults
    //!    leave existing call sites observably unchanged).
    //! 2. The shape is what later walker stages (1.h / 1.j / 1.l) will
    //!    consume — if any field is renamed or changes type, those
    //!    stages must update in lock-step.
    //!
    //! The fields themselves are *dormant placeholders* at 1.m scope —
    //! no behavior wiring lives in this stage.
    use super::{pack_root_dir, resolve_workspace, SyncError, SyncOptions};

    /// `force_prune` is off by default, so existing call sites keep
    /// refusing to drop dirty trees (v1.1.1 behavior).
    #[test]
    fn test_sync_options_default_force_prune_false() {
        assert!(!SyncOptions::default().force_prune, "force_prune must default to false");
    }

    /// `force_prune_with_ignored` is off by default, so existing call
    /// sites keep refusing to drop ignored content (v1.1.1 behavior).
    #[test]
    fn test_sync_options_default_force_prune_with_ignored_false() {
        let defaults = SyncOptions::default();
        assert!(
            !defaults.force_prune_with_ignored,
            "force_prune_with_ignored must default to false"
        );
    }

    /// `migrate_lockfile` is off by default, so the walker errors on
    /// legacy v1.1.1 lockfile shapes unless the caller opts in.
    #[test]
    fn test_sync_options_default_migrate_lockfile_false() {
        assert!(!SyncOptions::default().migrate_lockfile, "migrate_lockfile must default to false");
    }

    /// `recurse` is on by default — the walker descends into nested
    /// meta-children unless `--shallow` is requested.
    #[test]
    fn test_sync_options_default_recurse_true() {
        assert!(SyncOptions::default().recurse, "recurse must default to true");
    }

    /// `max_depth` is `None` by default — unbounded recursion when
    /// `recurse` is `true`.
    #[test]
    fn test_sync_options_default_max_depth_none() {
        assert!(SyncOptions::default().max_depth.is_none(), "max_depth must default to None");
    }

    /// `force_prune_with_ignored = true` together with
    /// `force_prune = true` is the documented "stronger" combination:
    /// `with_ignored` is the harder override and implies the base
    /// `force_prune` semantics. The test guards that both flags coexist
    /// as plain `bool`s (not an enum), so callers can set them
    /// independently without a runtime panic.
    #[test]
    fn test_sync_options_force_prune_with_ignored_implies_force_prune() {
        let both_on = SyncOptions {
            force_prune: true,
            force_prune_with_ignored: true,
            ..SyncOptions::default()
        };
        assert!(both_on.force_prune);
        assert!(both_on.force_prune_with_ignored);
    }

    /// `max_depth = Some(n)` with `recurse = true` is the documented
    /// `--shallow=N` shape. The two fields are independent
    /// (`Option<usize>` / `bool`), so `max_depth` may be set while
    /// `recurse` stays at its default (`true`). Stage 1.j will define
    /// the precise interaction; this test only locks types and defaults.
    #[test]
    fn test_sync_options_max_depth_pairs_with_recurse() {
        let shallow = SyncOptions { max_depth: Some(2), ..SyncOptions::default() };
        assert_eq!(shallow.max_depth, Some(2));
        assert!(shallow.recurse, "recurse stays at its default (true) when only max_depth is set");
    }

    /// `Clone` round-trip — guards that every new field takes part in
    /// the existing `Clone` derive (no `#[clone(skip)]` slipped in).
    #[test]
    fn test_sync_options_clone_preserves_new_fields() {
        let source = SyncOptions {
            force_prune: true,
            force_prune_with_ignored: true,
            migrate_lockfile: true,
            recurse: false,
            max_depth: Some(7),
            ..SyncOptions::default()
        };
        let copy = source.clone();
        assert_eq!(copy.force_prune, source.force_prune);
        assert_eq!(copy.force_prune_with_ignored, source.force_prune_with_ignored);
        assert_eq!(copy.migrate_lockfile, source.migrate_lockfile);
        assert_eq!(copy.recurse, source.recurse);
        assert_eq!(copy.max_depth, source.max_depth);
    }

    // ------------------------------------------------------------------
    // v1.2.1 path (iii) — `resolve_workspace` canonicalisation tests
    // ------------------------------------------------------------------

    /// A `--workspace` that points at a non-existent directory must fail
    /// fast with a Validation error citing the problem. We deliberately
    /// do NOT mkdir-p someone else's typo — `--workspace` is an opt-in
    /// operator decision and a missing target is always a configuration
    /// mistake.
    #[test]
    fn test_resolve_workspace_errors_on_missing_override_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let absent = tmp.path().join("nope");
        let outcome = resolve_workspace(tmp.path(), Some(absent.as_path()));
        match outcome.expect_err("must fail") {
            SyncError::Validation { errors } => {
                let cites_missing = errors.iter().any(|e| e.to_string().contains("does not exist"));
                assert!(cites_missing);
            }
            other => panic!("expected Validation, got {other:?}"),
        }
    }

    /// `--workspace = None` is the default cwd-meta path — no
    /// canonicalize, no fail-on-missing. The pack-root path comes back
    /// verbatim (after `pack_root_dir` normalisation).
    #[test]
    fn test_resolve_workspace_none_returns_pack_root_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let pack_root = tmp.path().join("nonexistent-yet");
        let expected = pack_root_dir(&pack_root);
        let resolved = resolve_workspace(&pack_root, None).expect("None override is always Ok");
        assert_eq!(resolved, expected);
    }

    /// `--workspace = Some(<existing>)` yields the canonicalised path.
    /// On Windows this typically inserts the `\\?\` long-path prefix;
    /// on Unix it resolves any `..` / symlink components. Either way
    /// the returned path is what every downstream pass anchors against.
    #[test]
    fn test_resolve_workspace_canonicalises_existing_override() {
        let tmp = tempfile::tempdir().unwrap();
        let override_dir = tmp.path().join("real-ws");
        std::fs::create_dir_all(&override_dir).unwrap();
        let expected = override_dir.canonicalize().unwrap();
        let resolved = resolve_workspace(tmp.path(), Some(override_dir.as_path()))
            .expect("existing dir must resolve");
        assert_eq!(resolved, expected);
    }
}