Skip to main content

grex_core/
sync.rs

//! Sync orchestrator — M3 Stage B slice 6.
//!
//! Glues the building blocks shipped in slices 1–5b into a single runnable
//! pipeline:
//!
//! 1. Walk a pack tree via [`crate::tree::sync_meta`] +
//!    [`crate::tree::build_graph`] + [`FsPackLoader`] + a `GitBackend`.
//! 2. Run plan-phase validators (manifest-level + graph-level).
//! 3. Execute every action via a pluggable [`ActionExecutor`]
//!    ([`PlanExecutor`] for dry-run, [`FsExecutor`] for wet-run).
//! 4. Record each step as an [`Event::Sync`] entry in the pack-root's
//!    `.grex/events.jsonl` event log.
//!
//! # Traversal order
//!
//! Nodes are executed in **depth-first post-order**: children fully install
//! before their parent. Rationale: parent packs commonly `require:` artifacts
//! created by children (e.g. a parent symlink whose `src` lives inside a
//! child). Running the root last matches the overlay-style dotfile-install
//! intent authors expect, and it matches how `walker.walk` is structured
//! (children are hydrated before the recursion returns).
//!
//! # Decoupling
//!
//! The CLI crate drives this module through a thin `run()` entry point;
//! [`SyncOptions`] is `#[non_exhaustive]` so new knobs (parallelism, filter
//! expressions, ref overrides) can land in later milestones without breaking
//! CLI callers. Errors aggregate into [`SyncError`] with a small, stable
//! variant set.
30
31use std::borrow::Cow;
32use std::fs;
33use std::path::{Path, PathBuf};
34use std::sync::Arc;
35
36use chrono::{DateTime, Utc};
37use globset::{Glob, GlobSet, GlobSetBuilder};
38use thiserror::Error;
39use tokio_util::sync::CancellationToken;
40
41use crate::execute::{
42    ActionExecutor, ExecCtx, ExecError, ExecResult, ExecStep, FsExecutor, MetaVisitedSet,
43    PlanExecutor, Platform, StepKind,
44};
45use crate::fs::{ManifestLock, ScopedLock};
46use crate::git::GixBackend;
47use crate::lockfile::{
48    compute_actions_hash, read_lockfile, write_lockfile, LockEntry, LockfileError,
49};
50use crate::manifest::{append_event, read_all, Event, ACTION_ERROR_SUMMARY_MAX, SCHEMA_VERSION};
51use crate::pack::{Action, PackValidationError};
52use crate::plugin::{PackTypeRegistry, Registry};
53use crate::scheduler::Scheduler;
54use crate::tree::{
55    build_graph, sync_meta, FsPackLoader, PackGraph, PackNode, SyncMetaOptions, TreeError,
56};
57use crate::vars::VarEnv;
58
/// Inputs to [`run`].
///
/// Every field is `pub`, so in-crate call sites can build a value with a
/// struct literal plus `..SyncOptions::default()`. The type is marked
/// `#[non_exhaustive]` so future knobs (parallelism, filter expressions,
/// additional ref strategies) can land without a breaking change.
///
/// Note that `#[non_exhaustive]` forbids struct expressions — including
/// struct-update syntax — outside the defining crate. Downstream crates
/// should start from [`SyncOptions::new`] / [`SyncOptions::default`] and
/// then use the `with_*` builders or assign the public fields directly.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct SyncOptions {
    /// When `true`, use [`PlanExecutor`] (no filesystem mutations).
    pub dry_run: bool,
    /// When `false`, skip plan-phase validators (manifest + graph). Debug
    /// escape hatch; production callers should leave this `true`.
    pub validate: bool,
    /// Override workspace directory. `None` → derived from `pack_root`
    /// (the directory holding `.grex/pack.yaml`).
    ///
    /// **v1.2.1 path (iii) semantics**: when `Some`, this path IS the
    /// canonical meta directory. Children resolve parent-relatively as
    /// `<workspace>/<child.path>` and `<workspace>/.grex/pack.yaml` is
    /// where the root manifest is read from. The path MUST exist;
    /// symlinks are resolved via `fs::canonicalize` to a single
    /// inode-stable form. Pre-v1.2.1 the override only re-anchored
    /// children — that legacy split is retired.
    pub workspace: Option<PathBuf>,
    /// Global ref override (`grex sync --ref <sha|branch|tag>`). When
    /// `Some`, every child pack clone/checkout uses this ref instead of
    /// the declared `child.ref`. Empty strings are rejected at the CLI
    /// layer.
    pub ref_override: Option<String>,
    /// Pack-path filter patterns (`grex sync --only <glob>`). Raw glob
    /// strings — compiled internally via an in-crate `globset` helper so the
    /// `globset` crate version does not leak into the public API.
    /// `None` / empty means every pack runs (M3 semantics). Matching is
    /// against the pack's **workspace-relative** path normalized to
    /// forward-slash form.
    pub only_patterns: Option<Vec<String>>,
    /// Bypass the lockfile hash-match skip (`grex sync --force`). When
    /// `true`, every pack re-executes even if its `actions_hash` is
    /// unchanged from the prior lockfile.
    pub force: bool,
    /// Max parallel pack ops for this sync run (feat-m6-1).
    ///
    /// * `None` → callers default to `num_cpus::get()` at CLI layer.
    ///   Library callers who construct `SyncOptions` directly and leave
    ///   this `None` get `num_cpus::get()` semantics too — the sync
    ///   driver resolves the default in one place so the scheduler slot
    ///   on every `ExecCtx` is always populated.
    /// * `Some(0)` → unbounded (`Semaphore::MAX_PERMITS`).
    /// * `Some(1)` → serial fast-path.
    /// * `Some(n >= 2)` → bounded parallel.
    pub parallel: Option<usize>,
    /// v1.2.0 Stage 1.l prep — when `true`, walker Phase 2 may drop
    /// dirty trees during prune. Still refuses ignored content unless
    /// [`SyncOptions::force_prune_with_ignored`] is also `true`.
    /// Default `false` preserves v1.1.1 behavior (refuse all dirty
    /// drops).
    pub force_prune: bool,
    /// v1.2.0 Stage 1.l prep — when `true` (implies
    /// [`SyncOptions::force_prune`]), walker Phase 2 also drops
    /// ignored content. Hard override — the strongest level. Default
    /// `false` preserves v1.1.1 behavior.
    pub force_prune_with_ignored: bool,
    /// v1.2.1 Item 5b — when `true` AND `force_prune` (or
    /// `force_prune_with_ignored`) is set, divert Phase 2 prunes
    /// through the snapshot-then-unlink quarantine pipeline. The
    /// dest's full subtree is recursively copied to
    /// `<workspace>/.grex/trash/<ISO8601>/<basename>/` BEFORE
    /// `unlink(dest)` fires. Snapshot or audit-fsync failure aborts
    /// the prune (no unlink). Lean theorem
    /// `quarantine_snapshot_precedes_delete` proves the safety
    /// contract. Default `false` preserves v1.2.0 direct-unlink
    /// behavior. Has no effect unless one of the `force_prune*`
    /// flags is also set (the CLI enforces this via
    /// `requires = "force_prune"`; library callers who set this
    /// with neither flag get a no-op since Phase 2 will not enter
    /// the override path at all).
    pub quarantine: bool,
    /// v1.2.0 Stage 1.h opt-in — when `true`, the walker rewrites a
    /// legacy v1.1.1 lockfile in place to the v1.2.0 shape. When
    /// `false` (default), the walker errors on the legacy shape so
    /// migration is always an explicit caller decision.
    pub migrate_lockfile: bool,
    /// v1.2.0 Stage 1.j prep — when `true` (default), the walker
    /// descends into nested meta-children. `doctor --shallow` flips
    /// this to `false` so only the immediate workspace is inspected.
    pub recurse: bool,
    /// v1.2.0 Stage 1.j prep — pairs with
    /// [`SyncOptions::recurse`] for `--shallow=N`. `None` (default)
    /// is unbounded recursion when `recurse` is `true`. `Some(n)`
    /// caps depth at `n` levels of nesting.
    pub max_depth: Option<usize>,
}
154
155impl Default for SyncOptions {
156    fn default() -> Self {
157        Self {
158            dry_run: false,
159            validate: true,
160            workspace: None,
161            ref_override: None,
162            only_patterns: None,
163            force: false,
164            parallel: None,
165            // v1.2.0 Stage 1.m additions — defaults preserve v1.1.1
166            // behavior. Each field is a dormant placeholder until
167            // its corresponding walker stage wires it.
168            force_prune: false,
169            force_prune_with_ignored: false,
170            quarantine: false,
171            migrate_lockfile: false,
172            recurse: true,
173            max_depth: None,
174        }
175    }
176}
177
178/// Compile raw `--only` pattern strings into a [`globset::GlobSet`].
179/// Empty / absent input yields `Ok(None)` so M3's zero-config path
180/// (every pack runs) stays the default.
181fn compile_only_globset(patterns: Option<&Vec<String>>) -> Result<Option<GlobSet>, SyncError> {
182    let Some(pats) = patterns else { return Ok(None) };
183    if pats.is_empty() {
184        return Ok(None);
185    }
186    let mut builder = GlobSetBuilder::new();
187    for p in pats {
188        let glob = Glob::new(p)
189            .map_err(|source| SyncError::InvalidOnlyGlob { pattern: p.clone(), source })?;
190        builder.add(glob);
191    }
192    let set = builder
193        .build()
194        .map_err(|source| SyncError::InvalidOnlyGlob { pattern: pats.join(","), source })?;
195    Ok(Some(set))
196}
197
198impl SyncOptions {
199    /// Default options: wet-run, validators enabled, default workspace path.
200    #[must_use]
201    pub fn new() -> Self {
202        Self::default()
203    }
204
205    /// Set `dry_run`.
206    #[must_use]
207    pub fn with_dry_run(mut self, dry_run: bool) -> Self {
208        self.dry_run = dry_run;
209        self
210    }
211
212    /// Set `validate`.
213    #[must_use]
214    pub fn with_validate(mut self, validate: bool) -> Self {
215        self.validate = validate;
216        self
217    }
218
219    /// Set `workspace` override.
220    #[must_use]
221    pub fn with_workspace(mut self, workspace: Option<PathBuf>) -> Self {
222        self.workspace = workspace;
223        self
224    }
225
226    /// Set `ref_override` (`--ref`).
227    #[must_use]
228    pub fn with_ref_override(mut self, ref_override: Option<String>) -> Self {
229        self.ref_override = ref_override;
230        self
231    }
232
233    /// Set `only_patterns` (`--only`). Empty vector or `None` disables
234    /// the filter.
235    #[must_use]
236    pub fn with_only_patterns(mut self, patterns: Option<Vec<String>>) -> Self {
237        self.only_patterns = patterns;
238        self
239    }
240
241    /// Set `force` (`--force`).
242    #[must_use]
243    pub fn with_force(mut self, force: bool) -> Self {
244        self.force = force;
245        self
246    }
247
248    /// Set `parallel` (`--parallel`). See [`SyncOptions::parallel`] for
249    /// the `None` / `Some(0)` / `Some(1)` / `Some(n)` semantics.
250    #[must_use]
251    pub fn with_parallel(mut self, parallel: Option<usize>) -> Self {
252        self.parallel = parallel;
253        self
254    }
255
256    /// Set `force_prune` (`--force-prune`). See
257    /// [`SyncOptions::force_prune`] for the override matrix.
258    #[must_use]
259    pub fn with_force_prune(mut self, force_prune: bool) -> Self {
260        self.force_prune = force_prune;
261        self
262    }
263
264    /// Set `force_prune_with_ignored` (`--force-prune-with-ignored`).
265    /// See [`SyncOptions::force_prune_with_ignored`] for the override
266    /// matrix.
267    #[must_use]
268    pub fn with_force_prune_with_ignored(mut self, force_prune_with_ignored: bool) -> Self {
269        self.force_prune_with_ignored = force_prune_with_ignored;
270        self
271    }
272
273    /// Set `quarantine` (`--quarantine`). See
274    /// [`SyncOptions::quarantine`] for the snapshot-before-delete
275    /// contract. Has no effect unless [`SyncOptions::force_prune`]
276    /// or [`SyncOptions::force_prune_with_ignored`] is also set.
277    #[must_use]
278    pub fn with_quarantine(mut self, quarantine: bool) -> Self {
279        self.quarantine = quarantine;
280        self
281    }
282}
283
284/// One executed (or planned) action step in a sync run.
285///
286/// Marked `#[non_exhaustive]` so new observability fields (timestamps,
287/// plugin provenance) can land without breaking library consumers who
288/// destructure the struct.
289#[non_exhaustive]
290#[derive(Debug, Clone)]
291pub struct SyncStep {
292    /// Name of the pack that owned the action.
293    pub pack: String,
294    /// 0-based index into the pack's top-level `actions` vector.
295    pub action_idx: usize,
296    /// The [`ExecStep`] record emitted by the executor.
297    pub exec_step: ExecStep,
298}
299
300/// Outcome of a [`run`] invocation.
301///
302/// On fail-fast termination, `halted` carries the error that stopped the
303/// sync; every completed step up to that point is still in `steps` so
304/// callers can render a partial transcript.
305///
306/// Marked `#[non_exhaustive]` so new report-level fields (run id, metrics)
307/// can land without breaking library consumers who destructure the struct.
308#[non_exhaustive]
309#[derive(Debug)]
310pub struct SyncReport {
311    /// Fully-walked pack graph (present even on halted runs).
312    pub graph: PackGraph,
313    /// Steps produced by the executor, in execution order.
314    pub steps: Vec<SyncStep>,
315    /// `Some(e)` if execution stopped before all actions ran.
316    pub halted: Option<SyncError>,
317    /// Non-fatal manifest-append warnings (one per failed event append).
318    /// Kept as a separate field because spec marks event-log write failures
319    /// as non-aborting.
320    pub event_log_warnings: Vec<String>,
321    /// `Some(r)` when the pre-run teardown scan found orphaned backup
322    /// files or dangling [`Event::ActionStarted`] records from a prior
323    /// crashed run. Informational only — the report is still returned and
324    /// the sync proceeds. CLI renderers should surface a warning so the
325    /// operator can decide whether to run a future `grex doctor` verb.
326    pub pre_run_recovery: Option<RecoveryReport>,
327    /// One entry per child whose legacy `.grex/workspace/<name>/` layout
328    /// was relocated (or considered for relocation) on this sync. Empty
329    /// when no legacy directory was found — the common case for any
330    /// workspace built fresh on v1.1.0+. CLI renderers should surface
331    /// the list so operators see what changed.
332    pub workspace_migrations: Vec<WorkspaceMigration>,
333}
334
335/// One legacy-layout migration attempt. `outcome` distinguishes the
336/// move-succeeded case from the don't-clobber-user-data case so CLI
337/// renderers can present different advice to the operator.
338#[non_exhaustive]
339#[derive(Debug, Clone, PartialEq, Eq)]
340pub struct WorkspaceMigration {
341    /// Source path under the legacy `.grex/workspace/<name>/` location,
342    /// rendered relative to the pack root for log readability.
343    pub from: PathBuf,
344    /// Destination flat-sibling path `<pack_root>/<name>/`, relative to
345    /// the pack root.
346    pub to: PathBuf,
347    /// What happened.
348    pub outcome: MigrationOutcome,
349}
350
/// Outcome of one legacy-layout migration attempt.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationOutcome {
    /// Legacy directory was renamed onto the flat-sibling slot.
    Migrated,
    /// Both legacy and flat-sibling slots existed. Skipped — the user
    /// must inspect and reconcile manually so we never silently delete
    /// either.
    SkippedBothExist,
    /// Flat-sibling slot already had a non-grex file or directory in
    /// the way. Skipped — refusing to clobber user data even when the
    /// legacy slot is plainly the source of truth.
    SkippedDestOccupied,
    /// `fs::rename` failed (e.g. cross-volume, ACL denied). The legacy
    /// directory is still in place; surfaced so the operator can move
    /// it manually.
    Failed {
        /// Text of the failure, stored as a `String` so the enum can
        /// derive `Clone`, `PartialEq` and `Eq`.
        error: String,
    },
}
370
371/// Rich context attached to a [`SyncError::Halted`] variant.
372///
373/// Packages the pack + action position together with the underlying
374/// executor error and an optional human-readable recovery hint. Marked
375/// `#[non_exhaustive]` so future fields (step transcript, timestamp) can
376/// land without breaking `match` arms or struct destructures.
377#[non_exhaustive]
378#[derive(Debug)]
379pub struct HaltedContext {
380    /// Name of the pack that owned the halted action.
381    pub pack: String,
382    /// 0-based index into the pack's top-level `actions` vector.
383    pub action_idx: usize,
384    /// Short action kind tag (e.g. `"symlink"`, `"exec"`).
385    pub action_name: String,
386    /// Underlying executor error.
387    pub error: ExecError,
388    /// Optional next-step suggestion for the operator. `None` when no
389    /// generic hint applies — the executor error's own `Display` already
390    /// tells the story.
391    pub recovery_hint: Option<String>,
392}
393
394/// Error taxonomy surfaced by [`run`].
395#[non_exhaustive]
396#[derive(Debug, Error)]
397pub enum SyncError {
398    /// The pack-tree walker failed (loader error, git error, cycle, …).
399    #[error("tree walk failed: {0}")]
400    Tree(#[from] TreeError),
401    /// One or more plan-phase validators flagged the graph.
402    #[error("validation failed: {errors:?}")]
403    Validation {
404        /// Aggregated errors from manifest-level + graph-level validators.
405        errors: Vec<PackValidationError>,
406    },
407    /// An action executor returned an error.
408    ///
409    /// Retained for backward compatibility; new call sites should prefer
410    /// [`SyncError::Halted`] which carries full pack + action context.
411    /// Kept non-deprecated because [`From<ExecError>`] still materialises
412    /// the variant for non-sync-loop callers (e.g. ad-hoc helpers).
413    #[error("action execution failed: {0}")]
414    Exec(#[from] ExecError),
415    /// Action execution halted; full context (pack, action index, error,
416    /// optional recovery hint) lives in [`HaltedContext`]. This is the
417    /// variant the sync driver emits — [`SyncError::Exec`] is only
418    /// surfaced by ancillary code paths.
419    #[error(
420        "sync halted at pack `{}` action #{} ({}): {}",
421        .0.pack, .0.action_idx, .0.action_name, .0.error
422    )]
423    Halted(Box<HaltedContext>),
424    /// Another `grex` process (or thread) already holds the workspace-level
425    /// lock. The running sync refused to start to avoid racing two concurrent
426    /// walkers into the same workspace. If the lock file at `lock_path` is
427    /// stale (no other grex is actually running), remove it by hand.
428    #[error(
429        "workspace `{workspace}` is locked by another grex process (remove {lock_path:?} if stale)"
430    )]
431    WorkspaceBusy {
432        /// Resolved workspace directory that the current run tried to lock.
433        workspace: PathBuf,
434        /// Sidecar lock file that is currently held.
435        lock_path: PathBuf,
436    },
437    /// Reading or parsing the resolved-state lockfile failed. Surfaced as
438    /// its own variant (rather than folded into `Validation`) because a
439    /// corrupt / unreadable lockfile is an I/O or schema fault, not a
440    /// dependency-satisfaction fault. Resolution is operator-level
441    /// (restore a backup, delete the file, re-sync), not author-level.
442    #[error("lockfile `{path}` failed to load: {source}")]
443    Lockfile {
444        /// Lockfile path that failed to load.
445        path: PathBuf,
446        /// Underlying lockfile error.
447        #[source]
448        source: LockfileError,
449    },
450    /// One of the `--only <GLOB>` patterns failed to compile. Surfaced
451    /// as its own variant so the CLI can map it to a dedicated usage
452    /// error exit code instead of the generic sync-failure bucket.
453    #[error("invalid --only glob `{pattern}`: {source}")]
454    InvalidOnlyGlob {
455        /// The raw pattern string that failed to compile.
456        pattern: String,
457        /// Underlying globset error.
458        #[source]
459        source: globset::Error,
460    },
461    /// Migrating the v1.x event log (`grex.jsonl`) to the v2 canonical
462    /// path (`.grex/events.jsonl`) failed. Operator-level resolution
463    /// (check filesystem permissions, free disk space, then retry).
464    #[error("event-log migration failed: {0}")]
465    EventLogMigration(#[source] crate::manifest::ManifestError),
466    /// Cooperative cancellation fired (Ctrl-C / SIGTERM) during a
467    /// parallel sync. v1.2.0 Stage 1.g wires the rayon walker to surface
468    /// this distinct-from-failure variant so the CLI can exit with a
469    /// dedicated cancellation code instead of a generic sync error.
470    /// Dormant until Stage 1.g — the existing CLI does not yet emit it.
471    #[error("sync cancelled by user")]
472    SchedulerCancelled,
473}
474
475impl Clone for SyncError {
476    fn clone(&self) -> Self {
477        // `TreeError` / `ExecError` do not implement `Clone` (they wrap
478        // `std::io::Error`-adjacent values). Halts carry only a display
479        // rendering in the report; we re-materialise via a synthetic
480        // `Validation` variant so `SyncReport` can be `Clone`-safe for
481        // observability tooling without widening the taxonomy.
482        match self {
483            Self::Tree(e) => Self::Validation {
484                errors: vec![PackValidationError::DependsOnUnsatisfied {
485                    pack: "<tree>".into(),
486                    required: e.to_string(),
487                }],
488            },
489            Self::Validation { errors } => Self::Validation { errors: errors.clone() },
490            Self::Exec(e) => Self::Validation {
491                errors: vec![PackValidationError::DependsOnUnsatisfied {
492                    pack: "<exec>".into(),
493                    required: e.to_string(),
494                }],
495            },
496            Self::Halted(ctx) => Self::Validation {
497                errors: vec![PackValidationError::DependsOnUnsatisfied {
498                    pack: ctx.pack.clone(),
499                    required: format!(
500                        "action #{} ({}): {}",
501                        ctx.action_idx, ctx.action_name, ctx.error
502                    ),
503                }],
504            },
505            Self::WorkspaceBusy { workspace, lock_path } => {
506                Self::WorkspaceBusy { workspace: workspace.clone(), lock_path: lock_path.clone() }
507            }
508            Self::Lockfile { path, source } => Self::Validation {
509                errors: vec![PackValidationError::DependsOnUnsatisfied {
510                    pack: "<lockfile>".into(),
511                    required: format!("{}: {source}", path.display()),
512                }],
513            },
514            Self::InvalidOnlyGlob { pattern, source } => Self::Validation {
515                errors: vec![PackValidationError::DependsOnUnsatisfied {
516                    pack: "<only-glob>".into(),
517                    required: format!("{pattern}: {source}"),
518                }],
519            },
520            Self::EventLogMigration(source) => Self::Validation {
521                errors: vec![PackValidationError::DependsOnUnsatisfied {
522                    pack: "<event-log-migration>".into(),
523                    required: source.to_string(),
524                }],
525            },
526            Self::SchedulerCancelled => Self::SchedulerCancelled,
527        }
528    }
529}
530
531/// Run a full sync over the pack tree rooted at `pack_root`.
532///
533/// Resolution rules:
534/// * If `pack_root` is a directory the walker looks for
535///   `<pack_root>/.grex/pack.yaml`.
536/// * If `pack_root` ends in `.yaml` / `.yml` it is loaded verbatim.
537/// * Workspace defaults to the pack root directory itself when
538///   `opts.workspace` is `None`. Children resolve as flat siblings of the
539///   parent pack root (since v1.1.0).
540///
541/// # Errors
542///
543/// Returns the first error that halts the pipeline — see [`SyncError`] for
544/// the taxonomy.
545///
546/// `cancel` is the cooperative cancellation handle threaded through the
547/// pipeline by feat-m7-1 stage 2. Stage 2 only wires the parameter; the
548/// `is_cancelled()` polls land in stages 3-4 (scheduler + pack-lock
549/// acquire). CLI callers pass a never-cancelled sentinel
550/// (`CancellationToken::new()`); the MCP server passes a token tied to
551/// the request lifetime.
552pub fn run(
553    pack_root: &Path,
554    opts: &SyncOptions,
555    cancel: &CancellationToken,
556) -> Result<SyncReport, SyncError> {
557    // Stage 2 is signature-only — silence "unused parameter" without
558    // hiding it behind `_` (downstream stages will read it).
559    let _ = cancel;
560    let workspace = prepare_workspace(pack_root, opts)?;
561    let (mut ws_lock, ws_lock_path) = open_workspace_lock(&workspace)?;
562    let _ws_guard = match ws_lock.try_acquire() {
563        Ok(Some(g)) => g,
564        Ok(None) => {
565            return Err(SyncError::WorkspaceBusy {
566                workspace: workspace.clone(),
567                lock_path: ws_lock_path,
568            });
569        }
570        Err(e) => return Err(workspace_lock_err(&ws_lock_path, &e.to_string())),
571    };
572
573    // Compile `--only` patterns into a GlobSet here so the
574    // `globset` crate version does not leak into `SyncOptions`.
575    let only_set = compile_only_globset(opts.only_patterns.as_ref())?;
576
577    // Auto-migrate legacy `.grex/workspace/<name>/` layout BEFORE the
578    // walker resolves children. Idempotent: a fresh v1.1.0+ workspace
579    // sees no legacy directory and the function no-ops.
580    let workspace_migrations = migrate_legacy_workspace(pack_root);
581
582    // v1.2.1 path (iii) — three-stage composition:
583    //   sync_meta(workspace, prune_candidates) — mutate (rayon parallel)
584    //   build_graph(workspace)                 — read-only graph
585    //   run_actions(graph)                     — consume graph
586    // `Walker::walk` is retired from the prod path; the symbol is kept
587    // for test-suite compat. See `crates/grex-core/src/tree/graph_build.rs`.
588    run_sync_meta(&workspace, opts)?;
589    let graph = build_and_validate_graph(&workspace, opts.validate, opts.ref_override.as_deref())?;
590    let prep = prepare_run_context(pack_root, &graph, &workspace)?;
591    log_force_flag(opts.force);
592
593    let mut report = SyncReport {
594        graph,
595        steps: Vec::new(),
596        halted: None,
597        event_log_warnings: Vec::new(),
598        pre_run_recovery: prep.pre_run_recovery,
599        workspace_migrations,
600    };
601
602    let mut next_lock = prep.prior_lock.clone();
603    // feat-m6 B1: resolve `--parallel` once and build the scheduler
604    // shared across every `ExecCtx` in this run. Library callers who
605    // leave `opts.parallel == None` default to `num_cpus::get()` here
606    // (clamped `>= 1`) so the scheduler slot is always populated —
607    // `ctx.scheduler` being `None` would strand acquire-sites into
608    // unbounded concurrency. See `.omne/cfg/concurrency.md` §Scheduler.
609    let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
610    let scheduler = Arc::new(Scheduler::new(resolved_parallel));
611    run_actions(
612        &mut report,
613        &prep.order,
614        &prep.vars,
615        &workspace,
616        &prep.event_log,
617        &prep.lock_path,
618        opts.dry_run,
619        &prep.prior_lock,
620        &mut next_lock,
621        &prep.registry,
622        &prep.pack_type_registry,
623        only_set.as_ref(),
624        opts.force,
625        resolved_parallel,
626        &scheduler,
627    );
628
629    persist_lockfile_if_clean(&mut report, &prep.lockfile_path, &next_lock, opts.dry_run);
630    Ok(report)
631}
632
633/// Bag of context pieces assembled once at the top of [`run`]. Grouping
634/// them keeps [`run`] under the workspace's 50-LOC function lint without
635/// smearing the read of sequential setup across helpers. Fields are
636/// consumed piecemeal by the actions loop; no getters needed.
637struct RunContext {
638    order: Vec<usize>,
639    vars: VarEnv,
640    event_log: PathBuf,
641    lock_path: PathBuf,
642    lockfile_path: PathBuf,
643    prior_lock: std::collections::HashMap<String, LockEntry>,
644    registry: Arc<Registry>,
645    pack_type_registry: Arc<PackTypeRegistry>,
646    pre_run_recovery: Option<RecoveryReport>,
647}
648
649/// Build the per-run context: traversal order, vars env, event/lockfile
650/// paths, prior lockfile state, bootstrap registry, and (optionally) a
651/// pre-run recovery scan. Kept narrow so [`run`] stays small.
652///
653/// `workspace` is the resolved workspace directory (post `--workspace`
654/// override) so the recovery scan looks for `.grex.bak` artefacts under
655/// the actual on-disk location children were materialised at — not
656/// under the pack root, which differs from the workspace whenever the
657/// CLI's `--workspace` flag is used. Pre-fix this anchoring drift
658/// caused recovery scans to miss every backup left under an override
659/// workspace.
660fn prepare_run_context(
661    pack_root: &Path,
662    graph: &PackGraph,
663    workspace: &Path,
664) -> Result<RunContext, SyncError> {
665    let event_log = event_log_path(pack_root);
666    let lock_path = event_lock_path(&event_log);
667    let vars = VarEnv::from_os();
668    let order = post_order(graph);
669    let pre_run_recovery = scan_recovery(workspace, &event_log).ok().filter(|r| !r.is_empty());
670    let lockfile_path = lockfile_path(pack_root);
671    let prior_lock = load_prior_lock(&lockfile_path)?;
672    let registry = Arc::new(Registry::bootstrap());
673    let pack_type_registry = Arc::new(bootstrap_pack_type_registry());
674    Ok(RunContext {
675        order,
676        vars,
677        event_log,
678        lock_path,
679        lockfile_path,
680        prior_lock,
681        registry,
682        pack_type_registry,
683        pre_run_recovery,
684    })
685}
686
687/// Build the [`PackTypeRegistry`] the sync driver threads into every
688/// [`ExecCtx`] it constructs.
689///
690/// Default path (no `plugin-inventory` feature) hard-codes the three
691/// built-ins via [`PackTypeRegistry::bootstrap`]. With the feature on,
692/// [`PackTypeRegistry::bootstrap_from_inventory`] is preferred so any
693/// externally-submitted plugin types (mirroring the M4-E pattern for
694/// action plugins) shadow the built-ins last-writer-wins. Kept as a free
695/// helper so the `#[cfg]` split lives in one place instead of being
696/// smeared across every sync call-site.
697fn bootstrap_pack_type_registry() -> PackTypeRegistry {
698    #[cfg(feature = "plugin-inventory")]
699    {
700        let mut reg = PackTypeRegistry::bootstrap();
701        reg.register_from_inventory();
702        reg
703    }
704    #[cfg(not(feature = "plugin-inventory"))]
705    {
706        PackTypeRegistry::bootstrap()
707    }
708}
709
710/// Emit a single `tracing::info!` line when `--force` is active so
711/// operators can confirm from logs that the skip short-circuit was
712/// bypassed. Extracted so [`run`] stays small.
713fn log_force_flag(force: bool) {
714    if force {
715        tracing::info!(
716            target: "grex::sync",
717            "--force active: bypassing lockfile skip-on-hash short-circuit"
718        );
719    }
720}
721
722/// v1.2.1 path (iii) — drive the v1.2.0 [`sync_meta`] walker over the
723/// resolved canonical workspace.
724///
725/// This is the SOLE mutating pass in `sync::run`: clones, fetches,
726/// prune dispatches, distributed-lockfile reads, and TOCTOU `BoundedDir`
727/// opens all happen here. The subsequent [`build_and_validate_graph`]
728/// pass is read-only against the disk state this fn leaves behind.
729///
730/// `prune_candidates` is computed from the per-meta lockfile orphans:
731/// every entry in `<workspace>/.grex/grex.lock.jsonl` whose `path` no
732/// longer appears in the live root manifest's `children[]` is fed into
733/// Phase 2 for dispatch (with `--force-prune` / `--force-prune-with-ignored`
734/// overrides honoured by the consent walk). This closes the
735/// "prune-inert" gap from the previous wiring, where `sync::run` passed
736/// `&[]` and `--force-prune` was a CLI flag with no behavioural reach.
737///
738/// `--workspace` semantics: the canonical `workspace` argument is what
739/// `sync_meta` uses as its `meta_dir`. Children land at
740/// `<workspace>/<child.path>` — the v1.2.0 parent-relative model. Prior
741/// to v1.2.1, callers passing `--workspace` skipped the precursor
742/// entirely; that bypass is retired here so override callers see the
743/// same v1.2.0 semantics as the default-cwd path.
744///
745/// `SyncOptions::parallel` mapping (mirrors [`SyncMetaOptions::parallel`]
746/// with the documented `Some(0)` carve-out):
747/// * `None` → `SyncMetaOptions::parallel = None` (rayon default =
748///   `num_cpus::get()`).
749/// * `Some(0)` → `SyncMetaOptions::parallel = None` (the CLI sentinel
750///   for "unbounded" maps to rayon's default; `Some(0)` would be
751///   clamped to `1` inside `build_pool`, which is not what callers
752///   asking for unbounded want).
753/// * `Some(n)` for `n >= 1` → `SyncMetaOptions::parallel = Some(n)`.
754fn run_sync_meta(workspace: &Path, opts: &SyncOptions) -> Result<(), SyncError> {
755    let loader = FsPackLoader::new();
756    let backend = GixBackend::new();
757    let parallel = match opts.parallel {
758        None | Some(0) => None,
759        Some(n) => Some(n),
760    };
761    // v1.2.1 Item 5b — resolve the quarantine config relative to the
762    // canonical workspace (the same `meta_dir` `sync_meta` runs on).
763    // Trash bucket lives at `<workspace>/.grex/trash/`; audit log at
764    // `<workspace>/.grex/events.jsonl` — same path the existing
765    // `ForcePruneExecuted` event uses.
766    let quarantine = opts.quarantine.then(|| crate::tree::QuarantineConfig {
767        trash_root: workspace.join(".grex").join("trash"),
768        audit_log: crate::manifest::event_log_path(workspace),
769    });
770    let meta_opts = SyncMetaOptions {
771        ref_override: opts.ref_override.clone(),
772        recurse: opts.recurse,
773        max_depth: opts.max_depth,
774        force_prune: opts.force_prune,
775        force_prune_with_ignored: opts.force_prune_with_ignored,
776        parallel,
777        quarantine,
778    };
779    let prune_candidates = compute_prune_candidates(workspace, &loader);
780    let report = sync_meta(workspace, &backend, &loader, &meta_opts, &prune_candidates)?;
781    if let Some(first) = report.errors.into_iter().next() {
782        return Err(SyncError::Tree(first));
783    }
784    Ok(())
785}
786
787/// v1.2.1 path (iii) — orphan-prune candidate computation.
788///
789/// Reads `<workspace>/.grex/grex.lock.jsonl` and the root manifest;
790/// returns every lockfile entry path that no longer matches a declared
791/// child in `manifest.children`. Empty in three cases:
792///
793/// * No lockfile (fresh workspace, never synced).
794/// * No manifest at `<workspace>/.grex/pack.yaml` (single-node tree —
795///   `sync_meta` will surface its own diagnostic).
796/// * Lockfile entries are all still declared (steady-state sync).
797///
798/// Lockfile read errors are tolerated as `Vec::new()`: the prune pass
799/// is opportunistic, and a corrupt lockfile is the migrator's concern,
800/// not the prune dispatcher's. Manifest read errors are similarly
801/// tolerated — `sync_meta` will fail loudly on the same condition,
802/// giving the operator a single unambiguous error surface.
803fn compute_prune_candidates(
804    workspace: &Path,
805    loader: &dyn crate::tree::PackLoader,
806) -> Vec<PathBuf> {
807    use crate::lockfile::read_meta_lockfile;
808    let entries = match read_meta_lockfile(workspace) {
809        Ok(e) => e,
810        Err(_) => return Vec::new(),
811    };
812    if entries.is_empty() {
813        return Vec::new();
814    }
815    let manifest = match loader.load(workspace) {
816        Ok(m) => m,
817        Err(_) => return Vec::new(),
818    };
819    let declared: std::collections::HashSet<String> =
820        manifest.children.iter().map(crate::pack::ChildRef::effective_path).collect();
821    entries
822        .into_iter()
823        .filter(|e| !declared.contains(&e.path))
824        .map(|e| PathBuf::from(e.path))
825        .collect()
826}
827
828/// v1.2.1 path (iii) — read-only graph build + plan-phase validation.
829///
830/// Builds the [`PackGraph`] from the on-disk meta tree rooted at
831/// `workspace`. Replaces the legacy `walk_and_validate` (which used
832/// [`crate::tree::Walker::walk`] and re-issued every clone/fetch as a
833/// no-op probe) with the v1.2.1 split:
834///
835/// * The mutating half ran in [`run_sync_meta`] — all clones, fetches,
836///   prune dispatches, and TOCTOU `BoundedDir` opens already happened.
837/// * THIS pass is strictly READ-ONLY. It walks the manifest tree
838///   parent-relatively (matching what `sync_meta` placed on disk),
839///   loads each child's `pack.yaml` (or synthesises a plain-git leaf),
840///   probes `head_sha`, and produces the [`PackGraph`] consumed by
841///   [`run_actions`].
842///
843/// Plan-phase validators run against the assembled graph when
844/// `validate` is true.
845fn build_and_validate_graph(
846    workspace: &Path,
847    validate: bool,
848    ref_override: Option<&str>,
849) -> Result<PackGraph, SyncError> {
850    let loader = FsPackLoader::new();
851    let backend = GixBackend::new();
852    let graph = build_graph(workspace, &backend, &loader, ref_override)?;
853    if validate {
854        validate_graph(&graph)?;
855    }
856    Ok(graph)
857}
858
859/// Load the prior lockfile (`grex.lock.jsonl`). Missing file yields an
860/// empty map; parse errors are fatal since writes are atomic and a torn
861/// lockfile therefore indicates real corruption that must be resolved
862/// before a fresh sync is safe. Parse/IO failures surface as
863/// [`SyncError::Lockfile`] — this is an I/O / schema fault, not a
864/// dependency-satisfaction fault, so it gets its own taxonomy slot.
865fn load_prior_lock(
866    lockfile_path: &Path,
867) -> Result<std::collections::HashMap<String, LockEntry>, SyncError> {
868    read_lockfile(lockfile_path)
869        .map_err(|source| SyncError::Lockfile { path: lockfile_path.to_path_buf(), source })
870}
871
872/// Persist `next_lock` atomically to `lockfile_path` whenever this was
873/// not a dry-run. On a halt the map has already had the halted pack's
874/// entry removed (see `run_actions`), so persisting now preserves every
875/// *successful* pack's fresh entry while guaranteeing absence of an
876/// entry for the halted pack — next sync sees no prior hash there and
877/// re-executes from scratch (route (b) halt-state gating). Write errors
878/// surface as non-fatal warnings on the report.
879fn persist_lockfile_if_clean(
880    report: &mut SyncReport,
881    lockfile_path: &Path,
882    next_lock: &std::collections::HashMap<String, LockEntry>,
883    dry_run: bool,
884) {
885    if dry_run {
886        return;
887    }
888    if let Err(e) = write_lockfile(lockfile_path, next_lock) {
889        tracing::warn!(target: "grex::sync", "lockfile write failed: {e}");
890        report.event_log_warnings.push(format!("{}: {e}", lockfile_path.display()));
891    }
892}
893
894/// Canonical location of the resolved-state lockfile
895/// (`<pack_root>/.grex/grex.lock.jsonl`). Colocated with the event log
896/// so both audit artifacts live under a single `.grex/` sidecar.
897fn lockfile_path(pack_root: &Path) -> PathBuf {
898    pack_root_dir(pack_root).join(".grex").join("grex.lock.jsonl")
899}
900
901/// Create the workspace directory if it does not yet exist.
902fn ensure_workspace_dir(workspace: &Path) -> Result<(), SyncError> {
903    if !workspace.exists() {
904        std::fs::create_dir_all(workspace).map_err(|e| SyncError::Validation {
905            errors: vec![PackValidationError::DependsOnUnsatisfied {
906                pack: "<workspace>".into(),
907                required: format!("{}: {e}", workspace.display()),
908            }],
909        })?;
910    }
911    Ok(())
912}
913
914/// Open (but do not acquire) the workspace-level lock file.
915fn open_workspace_lock(workspace: &Path) -> Result<(ScopedLock, PathBuf), SyncError> {
916    let ws_lock_path = workspace_lock_path(workspace);
917    let ws_lock = ScopedLock::open(&ws_lock_path)
918        .map_err(|e| workspace_lock_err(&ws_lock_path, &e.to_string()))?;
919    Ok((ws_lock, ws_lock_path))
920}
921
922/// Build a `Validation` error describing a workspace-lock failure.
923fn workspace_lock_err(ws_lock_path: &Path, reason: &str) -> SyncError {
924    SyncError::Validation {
925        errors: vec![PackValidationError::DependsOnUnsatisfied {
926            pack: "<workspace-lock>".into(),
927            required: format!("{}: {reason}", ws_lock_path.display()),
928        }],
929    }
930}
931
/// The legacy workspace directory, relative to the pack root.
///
/// Before `v1.1.0`, `resolve_workspace` defaulted to joining
/// `.grex/workspace/` onto the pack root. The literal is kept in exactly
/// one place — this constant, used by [`migrate_legacy_workspace`] — and
/// must not appear anywhere else under `crates/grex-core/src/`; the grep
/// gate in the v1.1.0 release checklist whitelists only this definition.
const LEGACY_WORKSPACE_DIR: &str = ".grex/workspace";
939
940/// Auto-migrate any legacy `.grex/workspace/<name>/` child layout left
941/// over from v1.0.x to the v1.1.0 flat-sibling layout. Idempotent: a
942/// fresh workspace built on v1.1.0+ sees no `.grex/workspace/`
943/// directory and the function no-ops.
944///
945/// Per-child outcomes:
946///
947/// * **Both legacy + flat-sibling exist** → `SkippedBothExist`. The
948///   user needs to inspect (perhaps the legacy is stale, perhaps it is
949///   the source of truth); we never silently delete either.
950/// * **Flat-sibling slot occupied by a non-grex file or non-empty dir**
951///   → `SkippedDestOccupied`. Refuse to clobber user data.
952/// * **Legacy exists, flat-sibling absent** → `Migrated` via atomic
953///   `fs::rename`. Same-volume move is the common case (the migration
954///   stays inside `pack_root`); cross-volume failures surface as
955///   `Failed { error }` with the OS message so the operator can move
956///   manually.
957/// * **Legacy absent** → silent no-op (not recorded in the report).
958///
959/// After all per-child decisions: orphan `.grex.sync.lock` under the
960/// legacy workspace is removed (best-effort) and the empty
961/// `.grex/workspace/` directory is rmdir'd (best-effort). Both are
962/// soft-failures: leaving them on disk is harmless, surfacing the
963/// errors as a sync abort would be over-strict.
964///
965/// Discovery is by directory listing, not by parent-manifest parse —
966/// migration must work even when the parent manifest itself was
967/// rewritten between versions. A child counts as "legacy" iff
968/// `<pack_root>/<LEGACY_WORKSPACE_DIR>/<name>/.git` exists (i.e. it is
969/// an actual git working tree, not stray metadata).
970fn migrate_legacy_workspace(pack_root: &Path) -> Vec<WorkspaceMigration> {
971    let root = pack_root_dir(pack_root);
972    let legacy_root = root.join(LEGACY_WORKSPACE_DIR);
973    if !legacy_root.is_dir() {
974        return Vec::new();
975    }
976    let entries = match fs::read_dir(&legacy_root) {
977        Ok(e) => e,
978        Err(e) => {
979            tracing::warn!(
980                target: "grex::sync::migrate",
981                "legacy workspace `{}` unreadable: {e}",
982                legacy_root.display(),
983            );
984            return Vec::new();
985        }
986    };
987    let mut migrations = Vec::new();
988    for entry_result in entries {
989        let entry = match entry_result {
990            Ok(e) => e,
991            Err(e) => {
992                tracing::warn!(
993                    target: "grex::sync::migrate",
994                    "skipping unreadable entry under `{}`: {e}",
995                    legacy_root.display(),
996                );
997                continue;
998            }
999        };
1000        let Ok(ft) = entry.file_type() else { continue };
1001        // file_type avoids symlink-following; legitimate v1.0.x children
1002        // were always real directories, so anything else is skipped.
1003        if ft.is_symlink() || !ft.is_dir() {
1004            continue;
1005        }
1006        let name_os = entry.file_name();
1007        let Some(name) = name_os.to_str() else { continue };
1008        // Only act on entries that look like real cloned children (have
1009        // a `.git`). The legacy workspace lock file (`.grex.sync.lock`)
1010        // is not a directory and is filtered out by the dir check above;
1011        // we clean it up explicitly after the migration loop completes.
1012        let from_abs = entry.path();
1013        if !from_abs.join(".git").exists() {
1014            continue;
1015        }
1016        let to_abs = root.join(name);
1017        let from_rel = PathBuf::from(LEGACY_WORKSPACE_DIR).join(name);
1018        let to_rel = PathBuf::from(name);
1019        let outcome = decide_and_migrate(&from_abs, &to_abs);
1020        log_migration(&from_rel, &to_rel, &outcome);
1021        migrations.push(WorkspaceMigration { from: from_rel, to: to_rel, outcome });
1022    }
1023    cleanup_legacy_workspace_root(&legacy_root);
1024    migrations
1025}
1026
1027/// Decide what to do with one legacy child + perform the move when
1028/// safe. Returns the outcome to record on the [`WorkspaceMigration`].
1029fn decide_and_migrate(from: &Path, to: &Path) -> MigrationOutcome {
1030    let dest_exists = to.exists();
1031    let dest_is_grex_repo = dest_exists && to.join(".git").exists();
1032    if dest_is_grex_repo {
1033        // Both legacy and flat-sibling are git repos. Refuse to choose
1034        // between them; let the user resolve.
1035        return MigrationOutcome::SkippedBothExist;
1036    }
1037    if dest_exists {
1038        // Some other entry occupies the flat-sibling slot — a stray
1039        // file, an empty dir, an unrelated dir. Treat as user data and
1040        // leave both in place.
1041        return MigrationOutcome::SkippedDestOccupied;
1042    }
1043    match fs::rename(from, to) {
1044        Ok(()) => MigrationOutcome::Migrated,
1045        Err(e) => MigrationOutcome::Failed { error: e.to_string() },
1046    }
1047}
1048
1049/// Emit one structured log line per migration so users see exactly what
1050/// happened during the upgrade. Severity matches outcome: success is
1051/// `info`, skips and failures are `warn` so they surface in the default
1052/// log level without forcing operators to crank verbosity.
1053fn log_migration(from: &Path, to: &Path, outcome: &MigrationOutcome) {
1054    let from_disp = from.display();
1055    let to_disp = to.display();
1056    match outcome {
1057        MigrationOutcome::Migrated => {
1058            tracing::info!(
1059                target: "grex::sync::migrate",
1060                "migrated: legacy={from_disp} -> new={to_disp}",
1061            );
1062        }
1063        MigrationOutcome::SkippedBothExist => {
1064            tracing::warn!(
1065                target: "grex::sync::migrate",
1066                "skipped: both legacy={from_disp} and new={to_disp} exist; resolve manually",
1067            );
1068        }
1069        MigrationOutcome::SkippedDestOccupied => {
1070            tracing::warn!(
1071                target: "grex::sync::migrate",
1072                "skipped: destination={to_disp} occupied; leaving legacy={from_disp} in place",
1073            );
1074        }
1075        MigrationOutcome::Failed { error } => {
1076            tracing::warn!(
1077                target: "grex::sync::migrate",
1078                "failed: legacy={from_disp} -> new={to_disp}: {error}",
1079            );
1080        }
1081    }
1082}
1083
1084/// Best-effort cleanup of the legacy workspace root after migration:
1085/// remove the orphan `.grex.sync.lock` (always safe — the v1.1.0
1086/// workspace lock lives at `<pack_root>/.grex.sync.lock`) and try to
1087/// rmdir the now-empty `.grex/workspace/` directory. Errors are logged
1088/// at trace level only — both leftovers are harmless.
1089fn cleanup_legacy_workspace_root(legacy_root: &Path) {
1090    let orphan_lock = legacy_root.join(".grex.sync.lock");
1091    if orphan_lock.exists() {
1092        if let Err(e) = fs::remove_file(&orphan_lock) {
1093            tracing::warn!(
1094                target: "grex::sync::migrate",
1095                "could not remove orphan lock `{}`: {e}",
1096                orphan_lock.display(),
1097            );
1098        } else {
1099            tracing::info!(
1100                target: "grex::sync::migrate",
1101                "removed orphan lock `{}`",
1102                orphan_lock.display(),
1103            );
1104        }
1105    }
1106    // `remove_dir` only succeeds when the directory is empty — exactly
1107    // what we want; if any unmigrated child remains, the legacy root
1108    // stays put for the operator to inspect.
1109    let _ = fs::remove_dir(legacy_root);
1110}
1111
1112/// Compute the default workspace path when `override_` is absent.
1113///
1114/// The default is the pack root directory itself, so child packs
1115/// resolve as flat siblings of the parent pack root. The rationale —
1116/// alignment with the long-standing pack-spec rule that
1117/// `children[].path` is a bare name — lives in the pack-spec
1118/// "Validation rules" section (`man/concepts/pack-spec.md` /
1119/// `grex-doc/src/concepts/pack-spec.md`).
1120/// v1.2.1 path (iii) — resolve the workspace anchor with canonical
1121/// symlink resolution.
1122///
1123/// Resolution rules:
1124/// * `override_ = None` ⇒ derive workspace from `pack_root_dir(pack_root)`.
1125///   No canonicalize on this branch — the pack-root path was supplied
1126///   directly by the caller and may legitimately reference a not-yet-real
1127///   directory (e.g. integration fixtures that lazily materialise the
1128///   pack root).
1129/// * `override_ = Some(path)`:
1130///   1. **Must-exist** check. A `--workspace` override pointing at a
1131///      non-existent directory is a fail-fast error (we won't silently
1132///      `mkdir -p` someone else's typo).
1133///   2. **Canonicalise.** Resolve symlinks to a real path. This is the
1134///      anchor every downstream pass (`sync_meta`, `build_graph`, the
1135///      lockfile reads, the TOCTOU `BoundedDir` opens) hangs off — they
1136///      MUST agree on a single inode-stable string.
1137///   3. **Log when input != canonical.** Surfaces symlink resolution to
1138///      operators so they can correlate workspace-busy diagnostics with
1139///      what the OS actually opened.
1140fn resolve_workspace(pack_root: &Path, override_: Option<&Path>) -> Result<PathBuf, SyncError> {
1141    let Some(input) = override_ else {
1142        return Ok(pack_root_dir(pack_root));
1143    };
1144    if !input.exists() {
1145        return Err(SyncError::Validation {
1146            errors: vec![PackValidationError::DependsOnUnsatisfied {
1147                pack: "<workspace>".into(),
1148                required: format!("--workspace {}: directory does not exist", input.display()),
1149            }],
1150        });
1151    }
1152    let canonical = match input.canonicalize() {
1153        Ok(p) => p,
1154        Err(e) => {
1155            return Err(SyncError::Validation {
1156                errors: vec![PackValidationError::DependsOnUnsatisfied {
1157                    pack: "<workspace>".into(),
1158                    required: format!("--workspace {}: canonicalize failed: {e}", input.display()),
1159                }],
1160            });
1161        }
1162    };
1163    if canonical != input {
1164        tracing::info!(
1165            target: "grex::sync",
1166            "workspace: {} → {}",
1167            input.display(),
1168            canonical.display(),
1169        );
1170    }
1171    Ok(canonical)
1172}
1173
1174/// Resolve the workspace, ensure the directory exists, and run the v1→v2
1175/// event-log migration. Extracted so [`run`] and [`teardown`] stay under
1176/// the workspace's 50-LOC per-function lint threshold.
1177fn prepare_workspace(pack_root: &Path, opts: &SyncOptions) -> Result<PathBuf, SyncError> {
1178    let workspace = resolve_workspace(pack_root, opts.workspace.as_deref())?;
1179    ensure_workspace_dir(&workspace)?;
1180    crate::manifest::ensure_event_log_migrated(&workspace).map_err(SyncError::EventLogMigration)?;
1181    Ok(workspace)
1182}
1183
/// Normalise a user-supplied pack root to the pack's directory.
///
/// * If `pack_root` has a `yaml` / `yml` extension it is taken to be the
///   manifest file, which lives at `<root>/.grex/pack.yaml`. The pack
///   directory is therefore the manifest's **grandparent** (two levels
///   up), not its immediate parent.
/// * Any other path is returned unchanged.
///
/// When the grandparent is missing or empty the result is `"."`. The
/// empty case matters for relative manifests such as `.grex/pack.yaml`:
/// `Path::parent` reports the parent of a single relative component as
/// the empty path, and an empty path is not a usable directory (it
/// cannot be listed, stat'ed or canonicalised), so it is mapped to the
/// current directory explicitly.
fn pack_root_dir(pack_root: &Path) -> PathBuf {
    let is_yaml = matches!(pack_root.extension().and_then(|e| e.to_str()), Some("yaml" | "yml"));
    if !is_yaml {
        return pack_root.to_path_buf();
    }
    match pack_root.parent().and_then(Path::parent) {
        Some(dir) if !dir.as_os_str().is_empty() => dir.to_path_buf(),
        // No grandparent, or an empty one: fall back to the current directory.
        _ => PathBuf::from("."),
    }
}
1196
1197/// Compute the `.grex/events.jsonl` path next to the pack root.
1198///
1199/// Delegates to [`crate::manifest::event_log_path`] (single source of
1200/// truth for the canonical event-log location).
1201fn event_log_path(pack_root: &Path) -> PathBuf {
1202    crate::manifest::event_log_path(&pack_root_dir(pack_root))
1203}
1204
/// Path of the `.grex.lock` sidecar that sits in the same directory as
/// the event log. There is one such slot per pack root, and cooperating
/// grex processes serialise through it.
///
/// If `event_log` has no parent, a bare relative `.grex.lock` is
/// returned instead.
fn event_lock_path(event_log: &Path) -> PathBuf {
    match event_log.parent() {
        Some(dir) => dir.join(".grex.lock"),
        None => PathBuf::from(".grex.lock"),
    }
}
1210
/// Path of the workspace-level lock sidecar:
/// `<workspace>/.grex.sync.lock`. The `run()` prologue has already
/// created the workspace directory by the time this is used, so the
/// lock file ends up next to the child clones.
fn workspace_lock_path(workspace: &Path) -> PathBuf {
    let mut lock_path = workspace.to_path_buf();
    lock_path.push(".grex.sync.lock");
    lock_path
}
1217
1218/// Aggregate manifest-level + graph-level validators and return their output.
1219fn validate_graph(graph: &PackGraph) -> Result<(), SyncError> {
1220    let mut errors: Vec<PackValidationError> = Vec::new();
1221    for node in graph.nodes() {
1222        if let Err(mut e) = node.manifest.validate_plan() {
1223            errors.append(&mut e);
1224        }
1225    }
1226    if let Err(mut e) = graph.validate() {
1227        errors.append(&mut e);
1228    }
1229    if errors.is_empty() {
1230        Ok(())
1231    } else {
1232        Err(SyncError::Validation { errors })
1233    }
1234}
1235
1236/// Depth-first post-order traversal of the graph starting from root.
1237///
1238/// Children fully precede their parent in the returned vector so downstream
1239/// executors install leaves first and the root last.
1240fn post_order(graph: &PackGraph) -> Vec<usize> {
1241    let mut out = Vec::with_capacity(graph.nodes().len());
1242    visit_post(graph, 0, &mut out);
1243    out
1244}
1245
1246fn visit_post(graph: &PackGraph, id: usize, out: &mut Vec<usize>) {
1247    // Collect child ids first to avoid borrow conflicts with graph iteration.
1248    let kids: Vec<usize> = graph.children_of(id).map(|n| n.id).collect();
1249    for k in kids {
1250        visit_post(graph, k, out);
1251    }
1252    out.push(id);
1253}
1254
1255/// Drive every action for every node; abort on the first [`ExecError`].
1256///
1257/// Each action is bracketed by three manifest events:
1258/// 1. [`Event::ActionStarted`] — appended **before** `execute` returns.
1259/// 2. [`Event::ActionCompleted`] — appended on `Ok(step)`.
1260/// 3. [`Event::ActionHalted`] — appended on `Err(e)` before returning.
1261///
1262/// All three writes go through the same [`ManifestLock`]-wrapped path
1263/// ([`append_manifest_event`]) and failures are recorded as non-fatal
1264/// warnings so the executor's outcome always dominates. The third append
1265/// (`ActionHalted`) lets a future `grex doctor` correlate crash recovery
1266/// with the exact action that halted.
1267// feat-m6 B1 wiring added `parallel` + `scheduler` args; the signature
1268// now pushes past the 50-LOC per-function lint by one line. Silence
1269// that one — the body itself is unchanged in scope.
1270#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1271fn run_actions(
1272    report: &mut SyncReport,
1273    order: &[usize],
1274    vars: &VarEnv,
1275    workspace: &Path,
1276    event_log: &Path,
1277    lock_path: &Path,
1278    dry_run: bool,
1279    prior_lock: &std::collections::HashMap<String, LockEntry>,
1280    next_lock: &mut std::collections::HashMap<String, LockEntry>,
1281    registry: &Arc<Registry>,
1282    pack_type_registry: &Arc<PackTypeRegistry>,
1283    only: Option<&GlobSet>,
1284    force: bool,
1285    parallel: usize,
1286    scheduler: &Arc<Scheduler>,
1287) {
1288    let plan = PlanExecutor::with_registry(registry.clone());
1289    let fs = FsExecutor::with_registry(registry.clone());
1290    let rt = build_pack_type_runtime(parallel);
1291    let visited_meta = new_visited_meta();
1292    for &id in order {
1293        let Some(node) = report.graph.node(id) else { continue };
1294        let pack_name = node.name.clone();
1295        let pack_path = node.path.clone();
1296        let actions = node.manifest.actions.clone();
1297        let manifest = node.manifest.clone();
1298        let commit_sha = node.commit_sha.clone().unwrap_or_default();
1299        let synthetic = node.synthetic;
1300        // `--only` filter + skip-on-hash short-circuits colocated in
1301        // `try_skip_or_filter` so this outer loop stays within the
1302        // 50-LOC per-function budget.
1303        if try_skip_or_filter(
1304            report,
1305            only,
1306            &pack_name,
1307            &pack_path,
1308            &actions,
1309            &commit_sha,
1310            synthetic,
1311            workspace,
1312            prior_lock,
1313            next_lock,
1314            dry_run,
1315            force,
1316        ) {
1317            continue;
1318        }
1319        let pack_halted = run_pack_lifecycle(
1320            report,
1321            vars,
1322            workspace,
1323            event_log,
1324            lock_path,
1325            dry_run,
1326            &plan,
1327            &fs,
1328            registry,
1329            pack_type_registry,
1330            &rt,
1331            &pack_name,
1332            &pack_path,
1333            &manifest,
1334            &visited_meta,
1335            scheduler,
1336        );
1337        if pack_halted {
1338            // Route (b) halt-state gating: drop any prior entry for the
1339            // halted pack so the next sync sees no prior hash and
1340            // re-executes from scratch. Successful packs in this same
1341            // run keep their freshly-upserted entries, and packs we did
1342            // not reach keep their prior entries untouched.
1343            next_lock.remove(&pack_name);
1344            return;
1345        }
1346        // Successful pack — record a fresh lockfile entry so the next
1347        // run's skip-on-hash test can succeed. Commit SHA is now plumbed
1348        // from the walker (M4-D): `PackNode::commit_sha` carries the
1349        // resolved HEAD SHA when the pack's working tree is a git
1350        // repository, otherwise an empty string keeps the hash stable.
1351        let actions_hash = compute_actions_hash(&actions, &commit_sha);
1352        upsert_lock_entry(prior_lock, next_lock, &pack_name, &commit_sha, &actions_hash, synthetic);
1353    }
1354}
1355
1356/// Build the multi-thread tokio runtime used to drive async pack-type
1357/// plugin dispatch. Pack-type plugins expose `async fn` methods via
1358/// `async_trait`, but the sync driver is synchronous end-to-end — we
1359/// block on each plugin future inside the outer action loop. Extracted
1360/// into a standalone helper so the runtime construction does not
1361/// inflate `run_actions` beyond the 50-LOC per-function budget.
1362///
1363/// # Multi-thread rationale (M5-2c)
1364///
1365/// M5-2c enabled real [`crate::plugin::pack_type::MetaPlugin`] recursion
1366/// through [`crate::execute::ExecCtx::pack_type_registry`]. The recursion
1367/// itself is purely `async` / `.await` (no nested `block_on`), but future
1368/// plugin authors may reasonably compose `block_on` calls inside
1369/// lifecycle hooks — and external callers that drive `MetaPlugin` via
1370/// `rt.block_on(...)` within their own runtime would deadlock on a
1371/// current-thread runtime the moment a hook re-enters. A multi-thread
1372/// runtime with a small worker pool lets those re-entries resolve on a
1373/// sibling worker instead of blocking the dispatcher thread.
1374///
1375/// # Worker-thread sizing (feat-m6 H6)
1376///
1377/// The worker pool is sized from the resolved `--parallel` knob so the
1378/// runtime always has enough workers to service every in-flight pack op
1379/// plus at least one sibling for nested `block_on`. Clamped to
1380/// `[2, num_cpus::get()]`: `2` preserves the pre-M6 floor (one driver +
1381/// one sibling so re-entrant hooks never deadlock), and the upper bound
1382/// caps the pool at the host's CPU count so `--parallel 0`
1383/// (unbounded-semantics) does not explode the worker count.
1384fn build_pack_type_runtime(parallel: usize) -> tokio::runtime::Runtime {
1385    let workers = parallel.clamp(2, num_cpus::get().max(2));
1386    tokio::runtime::Builder::new_multi_thread()
1387        .worker_threads(workers)
1388        .enable_all()
1389        .build()
1390        .expect("tokio runtime for pack-type dispatch")
1391}
1392
1393/// Construct a fresh [`MetaVisitedSet`] for one sync run. Walker-driven
1394/// dispatch does not attach it (see `dispatch_pack_type_plugin`), but
1395/// the argument is threaded through so future explicit-install /
1396/// teardown verbs can share the same set shape.
1397fn new_visited_meta() -> MetaVisitedSet {
1398    std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashSet::new()))
1399}
1400
1401/// Combined short-circuit helper: `--only` filter + skip-on-hash. Returns
1402/// `true` when the outer loop should `continue` for this pack.
1403///
1404/// Extracted from `run_actions` so that function stays under the
1405/// workspace's 50-LOC per-function lint. Semantics are unchanged; this
1406/// is a pure structural refactor.
1407#[allow(clippy::too_many_arguments)]
1408fn try_skip_or_filter(
1409    report: &mut SyncReport,
1410    only: Option<&GlobSet>,
1411    pack_name: &str,
1412    pack_path: &Path,
1413    actions: &[Action],
1414    commit_sha: &str,
1415    current_synthetic: bool,
1416    workspace: &Path,
1417    prior_lock: &std::collections::HashMap<String, LockEntry>,
1418    next_lock: &mut std::collections::HashMap<String, LockEntry>,
1419    dry_run: bool,
1420    force: bool,
1421) -> bool {
1422    if skip_for_only_filter(only, pack_name, pack_path, workspace) {
1423        if let Some(prev) = prior_lock.get(pack_name) {
1424            next_lock.insert(pack_name.to_string(), prev.clone());
1425        }
1426        return true;
1427    }
1428    try_skip_pack(
1429        report,
1430        pack_name,
1431        pack_path,
1432        actions,
1433        commit_sha,
1434        current_synthetic,
1435        prior_lock,
1436        next_lock,
1437        dry_run,
1438        force,
1439    )
1440}
1441
1442/// Return `true` when `--only` is active and the pack's
1443/// **workspace-relative path** (normalized to forward-slash form) does
1444/// not match any of the registered globs. Name-fallback matching was
1445/// dropped in the M4-D post-review fix bundle: spec §M4 req 6 says
1446/// "pack paths" and cross-platform consistency requires a single
1447/// normalized representation rather than `display()`-formatted strings
1448/// (which use `\\` on Windows and `/` on POSIX — globset treats `\\`
1449/// as a glob-escape, not a path separator). For the root pack whose
1450/// `pack_path` is not under `workspace`, the fallback is to match
1451/// against the absolute path's forward-slash form.
1452fn skip_for_only_filter(
1453    only: Option<&GlobSet>,
1454    pack_name: &str,
1455    pack_path: &Path,
1456    workspace: &Path,
1457) -> bool {
1458    let Some(set) = only else { return false };
1459    let rel = pack_path.strip_prefix(workspace).unwrap_or(pack_path);
1460    let rel_str = rel.to_string_lossy().replace('\\', "/");
1461    let matches = set.is_match(&rel_str);
1462    if !matches {
1463        tracing::info!(
1464            target: "grex::sync",
1465            "skipping pack `{pack_name}` (rel path `{rel_str}`): does not match --only filter"
1466        );
1467    }
1468    !matches
1469}
1470
1471/// Per-pack lifecycle dispatch. Returns `true` when the sync must halt.
1472///
1473/// M5-1 Stage C replaces the blind `for action in manifest.actions` loop
1474/// with a pack-type-aware dispatch:
1475///
1476/// * [`PackType::Declarative`] retains the per-action execution shape that
1477///   M4 shipped — each action lands its own `ActionStarted` /
1478///   `ActionCompleted` / `ActionHalted` event bracket. The registry is
1479///   still consulted via [`PackTypeRegistry::get`] as a name-oracle so
1480///   mistyped packs fail closed.
1481/// * [`PackType::Meta`] / [`PackType::Scripted`] dispatch once through the
1482///   pack-type plugin's `sync` method (the sync CLI verb is the only
1483///   caller in M5-1; `install` / `update` / `teardown` verbs wire in
1484///   M5-2), returning a single aggregate [`ExecStep`]. A single event
1485///   bracket frames the async call.
1486///
1487/// Declarative is kept on the legacy per-action path because its event log
1488/// semantics (one event per action, per-step rollback context) are exactly
1489/// what plugin authors expect to observe. Unifying declarative under the
1490/// plugin dispatch is M5-2 scope — it requires reshaping the trait surface
1491/// to emit a step stream rather than a single aggregate.
1492#[allow(clippy::too_many_arguments)]
1493fn run_pack_lifecycle(
1494    report: &mut SyncReport,
1495    vars: &VarEnv,
1496    workspace: &Path,
1497    event_log: &Path,
1498    lock_path: &Path,
1499    dry_run: bool,
1500    plan: &PlanExecutor,
1501    fs: &FsExecutor,
1502    registry: &Arc<Registry>,
1503    pack_type_registry: &Arc<PackTypeRegistry>,
1504    rt: &tokio::runtime::Runtime,
1505    pack_name: &str,
1506    pack_path: &Path,
1507    manifest: &crate::pack::PackManifest,
1508    visited_meta: &MetaVisitedSet,
1509    scheduler: &Arc<Scheduler>,
1510) -> bool {
1511    let type_tag = manifest.r#type.as_str();
1512    // Name-oracle check: every pack type must be registered. Unknown
1513    // pack types halt the pack the same way M4 halted unknown actions.
1514    if pack_type_registry.get(type_tag).is_none() {
1515        let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
1516        record_action_err(report, event_log, lock_path, pack_name, 0, "pack-type", err);
1517        return true;
1518    }
1519    match manifest.r#type {
1520        crate::pack::PackType::Declarative => run_declarative_actions(
1521            report,
1522            vars,
1523            workspace,
1524            event_log,
1525            lock_path,
1526            dry_run,
1527            plan,
1528            fs,
1529            pack_name,
1530            pack_path,
1531            manifest,
1532            &manifest.actions,
1533            scheduler,
1534        ),
1535        crate::pack::PackType::Meta | crate::pack::PackType::Scripted => dispatch_pack_type_plugin(
1536            report,
1537            vars,
1538            workspace,
1539            event_log,
1540            lock_path,
1541            registry,
1542            pack_type_registry,
1543            rt,
1544            pack_name,
1545            pack_path,
1546            manifest,
1547            type_tag,
1548            visited_meta,
1549            scheduler,
1550        ),
1551    }
1552}
1553
1554/// Run a declarative pack's actions sequentially. Preserves the M4
1555/// per-action event-log bracket (`ActionStarted` → `ActionCompleted` |
1556/// `ActionHalted`). Returns `true` when the sync must halt.
1557#[allow(clippy::too_many_arguments)]
1558fn run_declarative_actions(
1559    report: &mut SyncReport,
1560    vars: &VarEnv,
1561    workspace: &Path,
1562    event_log: &Path,
1563    lock_path: &Path,
1564    dry_run: bool,
1565    plan: &PlanExecutor,
1566    fs: &FsExecutor,
1567    pack_name: &str,
1568    pack_path: &Path,
1569    manifest: &crate::pack::PackManifest,
1570    actions: &[Action],
1571    scheduler: &Arc<Scheduler>,
1572) -> bool {
1573    // `apply_gitignore` is called per-lifecycle by each PackTypePlugin
1574    // for meta/scripted, and here for declarative (which bypasses the
1575    // plugin in `sync::run`'s per-action driver). Keeping plugins as
1576    // the single apply site everywhere else means the declarative
1577    // per-action path is the only code outside the PackTypePlugin
1578    // surface that needs a direct apply call.
1579    if !dry_run {
1580        let ctx = ExecCtx::new(vars, pack_path, workspace)
1581            .with_platform(Platform::current())
1582            .with_scheduler(scheduler);
1583        if let Err(e) = crate::plugin::pack_type::apply_gitignore(&ctx, manifest) {
1584            record_action_err(report, event_log, lock_path, pack_name, 0, "gitignore", e);
1585            return true;
1586        }
1587    }
1588    for (idx, action) in actions.iter().enumerate() {
1589        let ctx = ExecCtx::new(vars, pack_path, workspace)
1590            .with_platform(Platform::current())
1591            .with_scheduler(scheduler);
1592        let action_tag = action_kind_tag(action);
1593        append_manifest_event(
1594            event_log,
1595            lock_path,
1596            &Event::ActionStarted {
1597                ts: Utc::now(),
1598                pack: pack_name.to_string(),
1599                action_idx: idx,
1600                action_name: action_tag.to_string(),
1601            },
1602            &mut report.event_log_warnings,
1603        );
1604        let step_result =
1605            if dry_run { plan.execute(action, &ctx) } else { fs.execute(action, &ctx) };
1606        if !record_action_outcome(
1607            report,
1608            event_log,
1609            lock_path,
1610            pack_name,
1611            idx,
1612            action_tag,
1613            step_result,
1614        ) {
1615            return true;
1616        }
1617    }
1618    false
1619}
1620
1621/// Dispatch a pack-type plugin (meta / scripted) through the async
1622/// registry. Brackets the call with a single `ActionStarted` /
1623/// `ActionCompleted` / `ActionHalted` trio at index 0. Returns `true`
1624/// when the sync must halt.
1625#[allow(clippy::too_many_arguments)]
1626fn dispatch_pack_type_plugin(
1627    report: &mut SyncReport,
1628    vars: &VarEnv,
1629    workspace: &Path,
1630    event_log: &Path,
1631    lock_path: &Path,
1632    registry: &Arc<Registry>,
1633    pack_type_registry: &Arc<PackTypeRegistry>,
1634    rt: &tokio::runtime::Runtime,
1635    pack_name: &str,
1636    pack_path: &Path,
1637    manifest: &crate::pack::PackManifest,
1638    type_tag: &'static str,
1639    visited_meta: &MetaVisitedSet,
1640    scheduler: &Arc<Scheduler>,
1641) -> bool {
1642    // NB: `visited_meta` is intentionally NOT attached to the ctx here.
1643    // The sync driver already walks children in post-order via the tree
1644    // walker; attaching the visited set would trigger MetaPlugin's
1645    // real-recursion branch and cause double dispatch (walker runs child
1646    // packs as their own graph nodes, then MetaPlugin would recurse into
1647    // them again). The `visited_meta` parameter is kept on the argument
1648    // list so future explicit-install / teardown verbs that invoke
1649    // MetaPlugin directly can share the same set shape.
1650    let _ = visited_meta;
1651    let ctx = ExecCtx::new(vars, pack_path, workspace)
1652        .with_platform(Platform::current())
1653        .with_registry(registry)
1654        .with_pack_type_registry(pack_type_registry)
1655        .with_scheduler(scheduler);
1656    append_manifest_event(
1657        event_log,
1658        lock_path,
1659        &Event::ActionStarted {
1660            ts: Utc::now(),
1661            pack: pack_name.to_string(),
1662            action_idx: 0,
1663            action_name: type_tag.to_string(),
1664        },
1665        &mut report.event_log_warnings,
1666    );
1667    // SAFETY: `get` just confirmed the plugin is registered for
1668    // `type_tag`, so this unwrap cannot panic under the matched arm.
1669    let plugin = pack_type_registry
1670        .get(type_tag)
1671        .expect("pack-type plugin must be registered (guarded above)");
1672    // feat-m6 CI fix — establish a task-local tier stack frame for every
1673    // async dispatch. Without this, `TierGuard::push` (which runs inside
1674    // the plugin lifecycle and may span `.await` / thread hops under the
1675    // multi-thread runtime) has no enforcement frame to push into.
1676    let step_result = rt.block_on(crate::pack_lock::with_tier_scope(plugin.sync(&ctx, manifest)));
1677    !record_action_outcome(report, event_log, lock_path, pack_name, 0, type_tag, step_result)
1678}
1679
1680/// Pure skip-eligibility decision. Returns `Some(hash)` when the pack
1681/// is eligible for the hash-skip short-circuit, `None` otherwise.
1682///
1683/// Splitting the decision out of [`try_skip_pack`] keeps the
1684/// side-effecting transcript bookkeeping testable in isolation: the
1685/// v1.1.1 synthetic-flag-flip regression exercises this helper without
1686/// having to stand up a `SyncReport` / `PackGraph`.
1687fn skip_eligibility(
1688    actions: &[Action],
1689    commit_sha: &str,
1690    current_synthetic: bool,
1691    prior: &LockEntry,
1692    dry_run: bool,
1693    force: bool,
1694) -> Option<String> {
1695    if dry_run || force {
1696        // Dry runs must always produce the planned-step transcript so
1697        // authors can see what `sync` *would* do. `--force` is the
1698        // operator's explicit opt-out from the hash short-circuit.
1699        return None;
1700    }
1701    let hash = compute_actions_hash(actions, commit_sha);
1702    if prior.actions_hash != hash {
1703        return None;
1704    }
1705    if prior.synthetic != current_synthetic {
1706        // Pack-shape flipped between runs (real ↔ synthetic). Even
1707        // when the actions hash matches by coincidence (e.g. a
1708        // declarative pack with empty `actions[]` whose pack.yaml was
1709        // deleted, falling through to a synthetic leaf with the same
1710        // empty actions list and stable commit SHA), we must NOT
1711        // carry the stale `synthetic` flag forward. Forcing the
1712        // upsert path re-emits the entry with the current flag.
1713        return None;
1714    }
1715    Some(hash)
1716}
1717
1718/// Decide whether `pack_name` can be short-circuited via a lockfile
1719/// hash match. When the prior hash matches the freshly-computed hash,
1720/// emit a single [`ExecResult::Skipped`] step and carry the prior
1721/// lockfile entry forward unchanged. Returns `true` when the pack was
1722/// skipped.
1723///
1724/// `current_synthetic` is the walker-derived synthetic flag for this
1725/// pack on the current run. The skip eligibility check requires it to
1726/// match `prior.synthetic` so a pack-shape transition (e.g. user
1727/// deletes `pack.yaml` so a previously-real pack now walks as
1728/// synthetic) invalidates the skip and forces the lockfile entry to
1729/// be re-emitted with the fresh `synthetic` value.
1730#[allow(clippy::too_many_arguments)]
1731fn try_skip_pack(
1732    report: &mut SyncReport,
1733    pack_name: &str,
1734    pack_path: &Path,
1735    actions: &[Action],
1736    commit_sha: &str,
1737    current_synthetic: bool,
1738    prior_lock: &std::collections::HashMap<String, LockEntry>,
1739    next_lock: &mut std::collections::HashMap<String, LockEntry>,
1740    dry_run: bool,
1741    force: bool,
1742) -> bool {
1743    let Some(prior) = prior_lock.get(pack_name) else {
1744        return false;
1745    };
1746    let Some(hash) =
1747        skip_eligibility(actions, commit_sha, current_synthetic, prior, dry_run, force)
1748    else {
1749        return false;
1750    };
1751    let skipped_step = ExecStep {
1752        action_name: Cow::Borrowed("pack"),
1753        result: ExecResult::Skipped {
1754            pack_path: pack_path.to_path_buf(),
1755            actions_hash: hash.clone(),
1756        },
1757        // W4 landed `StepKind::PackSkipped` as the dedicated pack-level
1758        // short-circuit detail; we use it here instead of the prior
1759        // `Require { Satisfied, Skip }` proxy so renderers and consumers
1760        // can match on a single, purpose-built variant.
1761        details: StepKind::PackSkipped { actions_hash: hash },
1762    };
1763    report.steps.push(SyncStep {
1764        pack: pack_name.to_string(),
1765        action_idx: 0,
1766        exec_step: skipped_step,
1767    });
1768    // Carry the prior entry forward so the next-lock snapshot stays
1769    // consistent with what's on disk.
1770    next_lock.insert(pack_name.to_string(), prior.clone());
1771    true
1772}
1773
1774/// Insert or update a lockfile entry for `pack_name` with `actions_hash`.
1775///
1776/// Stores `commit_sha` verbatim — including the empty string when the
1777/// pack is not a git working tree or the HEAD probe failed.
1778/// `actions_hash` is computed over the same `commit_sha`, so the two
1779/// fields stay internally consistent: if probing starts returning a
1780/// non-empty SHA on the next run, the hash differs and the skip is
1781/// correctly invalidated. The prior-preserve carve-out that was
1782/// introduced in M4-D was unsound (hash-vs-sha drift) and is removed
1783/// by the M4-D post-review fix bundle; see spec §M4 req 4a.
1784///
1785/// `prior_lock` is consulted purely for observability: when a
1786/// previously-real pack flips to synthetic between runs (user deleted
1787/// the pack's `pack.yaml` so the walker fell back to v1.1.1
1788/// plain-git-child synthesis), a `tracing::warn!` records the
1789/// downgrade so the operator notices their declarative actions have
1790/// stopped running.
1791fn upsert_lock_entry(
1792    prior_lock: &std::collections::HashMap<String, LockEntry>,
1793    next_lock: &mut std::collections::HashMap<String, LockEntry>,
1794    pack_name: &str,
1795    commit_sha: &str,
1796    actions_hash: &str,
1797    synthetic: bool,
1798) {
1799    if synthetic {
1800        if let Some(prior) = prior_lock.get(pack_name) {
1801            if !prior.synthetic {
1802                tracing::warn!(
1803                    target: "grex::sync",
1804                    pack = pack_name,
1805                    "pack `{pack_name}` downgraded from real to synthetic — \
1806                     pack.yaml missing on disk; only `git pull` will run going forward",
1807                );
1808            }
1809        }
1810    }
1811    let installed_at = Utc::now();
1812    let entry = next_lock.get(pack_name).map_or_else(
1813        || LockEntry {
1814            id: pack_name.to_string(),
1815            // v1.1.1 convention: path == id (1:1 id↔folder). Stage 1.e
1816            // (walker rewrite) will replace this with the parent-relative
1817            // manifest path captured during the walk.
1818            path: pack_name.to_string(),
1819            sha: commit_sha.to_string(),
1820            branch: String::new(),
1821            installed_at,
1822            actions_hash: actions_hash.to_string(),
1823            schema_version: "1".to_string(),
1824            synthetic,
1825        },
1826        |prev| LockEntry {
1827            installed_at,
1828            actions_hash: actions_hash.to_string(),
1829            sha: commit_sha.to_string(),
1830            synthetic,
1831            ..prev.clone()
1832        },
1833    );
1834    next_lock.insert(pack_name.to_string(), entry);
1835}
1836
1837/// Record one action outcome into `report` + event log. Returns `false`
1838/// when the run must halt (on error); `true` otherwise.
1839fn record_action_outcome(
1840    report: &mut SyncReport,
1841    event_log: &Path,
1842    lock_path: &Path,
1843    pack_name: &str,
1844    idx: usize,
1845    action_tag: &'static str,
1846    step_result: Result<ExecStep, ExecError>,
1847) -> bool {
1848    match step_result {
1849        Ok(step) => {
1850            record_action_ok(report, event_log, lock_path, pack_name, idx, step);
1851            true
1852        }
1853        Err(e) => {
1854            record_action_err(report, event_log, lock_path, pack_name, idx, action_tag, e);
1855            false
1856        }
1857    }
1858}
1859
1860/// Success-path bookkeeping: emit legacy `Sync` summary + `ActionCompleted`
1861/// audit event, then push the step onto the report.
1862fn record_action_ok(
1863    report: &mut SyncReport,
1864    event_log: &Path,
1865    lock_path: &Path,
1866    pack_name: &str,
1867    idx: usize,
1868    step: ExecStep,
1869) {
1870    append_step_event(event_log, lock_path, pack_name, &step, &mut report.event_log_warnings);
1871    append_manifest_event(
1872        event_log,
1873        lock_path,
1874        &Event::ActionCompleted {
1875            ts: Utc::now(),
1876            pack: pack_name.to_string(),
1877            action_idx: idx,
1878            result_summary: format!("{:?}", step.result),
1879        },
1880        &mut report.event_log_warnings,
1881    );
1882    report.steps.push(SyncStep { pack: pack_name.to_string(), action_idx: idx, exec_step: step });
1883}
1884
1885/// Halt-path bookkeeping: emit `ActionHalted` audit event, then stash the
1886/// rich `HaltedContext` into `report.halted`.
1887fn record_action_err(
1888    report: &mut SyncReport,
1889    event_log: &Path,
1890    lock_path: &Path,
1891    pack_name: &str,
1892    idx: usize,
1893    action_tag: &'static str,
1894    e: ExecError,
1895) {
1896    let error_summary = truncate_error_summary(&e);
1897    append_manifest_event(
1898        event_log,
1899        lock_path,
1900        &Event::ActionHalted {
1901            ts: Utc::now(),
1902            pack: pack_name.to_string(),
1903            action_idx: idx,
1904            action_name: action_tag.to_string(),
1905            error_summary,
1906        },
1907        &mut report.event_log_warnings,
1908    );
1909    let recovery_hint = recovery_hint_for(&e);
1910    report.halted = Some(SyncError::Halted(Box::new(HaltedContext {
1911        pack: pack_name.to_string(),
1912        action_idx: idx,
1913        action_name: action_tag.to_string(),
1914        error: e,
1915        recovery_hint,
1916    })));
1917}
1918
1919/// Short stable kind-tag for an [`crate::pack::Action`]. Mirrors the
1920/// `ACTION_*` constants used by [`crate::execute::step`] so the audit log
1921/// stays uniform.
1922fn action_kind_tag(action: &crate::pack::Action) -> &'static str {
1923    use crate::pack::Action;
1924    match action {
1925        Action::Symlink(_) => "symlink",
1926        Action::Unlink(_) => "unlink",
1927        Action::Env(_) => "env",
1928        Action::Mkdir(_) => "mkdir",
1929        Action::Rmdir(_) => "rmdir",
1930        Action::Require(_) => "require",
1931        Action::When(_) => "when",
1932        Action::Exec(_) => "exec",
1933    }
1934}
1935
1936/// Produce a bounded human summary of an [`ExecError`] for
1937/// [`Event::ActionHalted::error_summary`]. Keeps the written JSONL line
1938/// from pathological blowup when captured stderr is large.
1939fn truncate_error_summary(err: &ExecError) -> String {
1940    let mut s = err.to_string();
1941    if s.len() > ACTION_ERROR_SUMMARY_MAX {
1942        s.truncate(ACTION_ERROR_SUMMARY_MAX);
1943        s.push_str("…[truncated]");
1944    }
1945    s
1946}
1947
1948/// Best-effort recovery hint for common [`ExecError`] shapes. Returns
1949/// `None` when no generic advice applies; the error's own `Display`
1950/// output is already shown by the `Halted` variant's format string.
1951fn recovery_hint_for(err: &ExecError) -> Option<String> {
1952    match err {
1953        ExecError::SymlinkDestOccupied { .. } => Some(
1954            "set `backup: true` on the symlink action, or remove the conflicting entry by hand"
1955                .into(),
1956        ),
1957        ExecError::SymlinkPrivilegeDenied { .. } => {
1958            Some("enable Windows Developer Mode or re-run grex as administrator".into())
1959        }
1960        ExecError::SymlinkCreateAfterBackupFailed { backup, .. } => {
1961            Some(format!("backup left at `{}`; restore manually then re-run", backup.display()))
1962        }
1963        ExecError::RmdirNotEmpty { .. } => {
1964            Some("set `force: true` on the rmdir action to recurse".into())
1965        }
1966        ExecError::EnvPersistenceDenied { .. } => {
1967            Some("re-run elevated (Machine scope needs admin)".into())
1968        }
1969        _ => None,
1970    }
1971}
1972
1973/// Append one [`Event::Sync`] record summarising an [`ExecStep`].
1974///
1975/// Failures log a warning and are recorded in the report's
1976/// `event_log_warnings`; they do not abort the sync (spec: event-log write
1977/// failures are non-fatal).
1978///
1979/// # Concurrency
1980///
1981/// The append is serialized through a [`ManifestLock`] held across the
1982/// write. The lock is acquired **per action** (not once across the full
1983/// traversal) so cooperating grex processes can observe mid-progress log
1984/// state between actions; fd-lock acquisition is cheap on modern kernels
1985/// and sync runs are dominated by executor side effects, not lock waits.
1986/// This closes the bypass gap surfaced by the M3 concurrency review where
1987/// `append_event` was called without any cross-process serialisation.
1988fn append_step_event(
1989    log: &Path,
1990    lock_path: &Path,
1991    pack: &str,
1992    step: &ExecStep,
1993    warnings: &mut Vec<String>,
1994) {
1995    let summary = format!("{}:{:?}", step.action_name, step.result);
1996    let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: summary };
1997    if let Err(e) = append_event_locked(log, lock_path, &event) {
1998        tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
1999        warnings.push(format!("{}: {e}", log.display()));
2000    }
2001    // Schema version is recorded once at the manifest level by existing
2002    // manifest code; this stub uses the constant to keep a single source of
2003    // truth for forward-compat.
2004    let _ = SCHEMA_VERSION;
2005}
2006
2007/// Append a single [`Event`] under the shared [`ManifestLock`] path.
2008/// Failures are logged and recorded as non-fatal warnings — the spec
2009/// marks event-log write failures as non-aborting so a transient disk
2010/// error must not kill a sync mid-stream.
2011fn append_manifest_event(log: &Path, lock_path: &Path, event: &Event, warnings: &mut Vec<String>) {
2012    if let Err(e) = append_event_locked(log, lock_path, event) {
2013        tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
2014        warnings.push(format!("{}: {e}", log.display()));
2015    }
2016}
2017
2018/// Acquire [`ManifestLock`] and append one event. Parent dir of the log is
2019/// created lazily on first write.
2020fn append_event_locked(log: &Path, lock_path: &Path, event: &Event) -> Result<(), String> {
2021    if let Some(parent) = log.parent() {
2022        std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
2023    }
2024    if let Some(parent) = lock_path.parent() {
2025        std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
2026    }
2027    let mut lock = ManifestLock::open(log, lock_path).map_err(|e| e.to_string())?;
2028    lock.write(|| append_event(log, event)).map_err(|e| e.to_string())?.map_err(|e| e.to_string())
2029}
2030
2031/// Re-export a cheap helper so CLI renderers can label halted steps by node
2032/// name without reaching into the graph twice.
2033#[must_use]
2034pub fn pack_display_name(node: &PackNode) -> &str {
2035    &node.name
2036}
2037
2038/// Run a full teardown over the pack tree rooted at `pack_root`.
2039///
2040/// Mirrors [`run`] but invokes
2041/// [`crate::plugin::PackTypePlugin::teardown`] on every pack in
2042/// **reverse** post-order so a parent tears down before its children
2043/// (the inverse of install). Children composed later by an author
2044/// consequently teardown earlier, matching the declarative
2045/// auto-reverse contract (R-M5-11).
2046///
2047/// All other concerns are identical to [`run`]: workspace lock, plan-
2048/// phase validators, lockfile update skipped (teardown does not
2049/// write a `actions_hash` forward), and event-log bracketing.
2050/// Teardown does NOT consult the lockfile skip-on-hash shortcut — a
2051/// user explicitly asked to remove the pack, so we always dispatch.
2052///
2053/// # Errors
2054///
2055/// Returns the first error that halts the pipeline — see [`SyncError`].
2056///
2057/// See [`run`] for the `cancel` contract — feat-m7-1 stage 2 threads
2058/// the parameter through teardown for parity; stages 3-4 add the polls.
2059pub fn teardown(
2060    pack_root: &Path,
2061    opts: &SyncOptions,
2062    cancel: &CancellationToken,
2063) -> Result<SyncReport, SyncError> {
2064    let _ = cancel;
2065    let workspace = prepare_workspace(pack_root, opts)?;
2066    let (mut ws_lock, ws_lock_path) = open_workspace_lock(&workspace)?;
2067    let _ws_guard = match ws_lock.try_acquire() {
2068        Ok(Some(g)) => g,
2069        Ok(None) => {
2070            return Err(SyncError::WorkspaceBusy {
2071                workspace: workspace.clone(),
2072                lock_path: ws_lock_path,
2073            });
2074        }
2075        Err(e) => return Err(workspace_lock_err(&ws_lock_path, &e.to_string())),
2076    };
2077
2078    // v1.2.1 path (iii) — teardown is read-only against the existing
2079    // disk state (no clones / fetches / prunes). It only needs the
2080    // graph build pass; `sync_meta` is intentionally skipped here.
2081    let graph = build_and_validate_graph(&workspace, opts.validate, opts.ref_override.as_deref())?;
2082    let prep = prepare_run_context(pack_root, &graph, &workspace)?;
2083
2084    let mut report = SyncReport {
2085        graph,
2086        steps: Vec::new(),
2087        halted: None,
2088        event_log_warnings: Vec::new(),
2089        pre_run_recovery: prep.pre_run_recovery,
2090        // teardown does not run the legacy-layout migration — by the time
2091        // a user is tearing down, the layout has already been migrated
2092        // (or was never legacy in the first place). Surfacing an empty
2093        // list keeps the report shape symmetric with `run()`.
2094        workspace_migrations: Vec::new(),
2095    };
2096
2097    // feat-m6 B1: mirror `run()` — resolve `--parallel`, build a
2098    // Scheduler, thread it through every `ExecCtx` the teardown path
2099    // constructs. Teardown is the other user-facing verb that owns a
2100    // runtime, so it gets the same wiring.
2101    let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
2102    let scheduler = Arc::new(Scheduler::new(resolved_parallel));
2103    run_teardown(
2104        &mut report,
2105        &prep.order,
2106        &prep.vars,
2107        &workspace,
2108        &prep.event_log,
2109        &prep.lock_path,
2110        &prep.registry,
2111        &prep.pack_type_registry,
2112        resolved_parallel,
2113        &scheduler,
2114    );
2115    Ok(report)
2116}
2117
2118/// Dispatch `teardown` for every pack in **reverse** post-order.
2119/// Declarative packs go through [`crate::plugin::PackTypePlugin`]
2120/// rather than the per-action M4 path because the trait's
2121/// auto-reverse / explicit-block logic must compose with the
2122/// registry; going through the per-action path would mean
2123/// re-implementing inverse synthesis in the sync loop.
2124#[allow(clippy::too_many_arguments)]
2125fn run_teardown(
2126    report: &mut SyncReport,
2127    order: &[usize],
2128    vars: &VarEnv,
2129    workspace: &Path,
2130    event_log: &Path,
2131    lock_path: &Path,
2132    registry: &Arc<Registry>,
2133    pack_type_registry: &Arc<PackTypeRegistry>,
2134    parallel: usize,
2135    scheduler: &Arc<Scheduler>,
2136) {
2137    let rt = build_pack_type_runtime(parallel);
2138    // Reverse post-order: root first, then children. Pack-type plugin
2139    // teardown methods reverse their own children/actions, so the
2140    // outer loop only flips the inter-pack order.
2141    for &id in order.iter().rev() {
2142        let Some(node) = report.graph.node(id) else { continue };
2143        let pack_name = node.name.clone();
2144        let pack_path = node.path.clone();
2145        let manifest = node.manifest.clone();
2146        let type_tag = manifest.r#type.as_str();
2147        if pack_type_registry.get(type_tag).is_none() {
2148            let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
2149            record_action_err(report, event_log, lock_path, &pack_name, 0, "pack-type", err);
2150            return;
2151        }
2152        let ctx = ExecCtx::new(vars, &pack_path, workspace)
2153            .with_platform(Platform::current())
2154            .with_registry(registry)
2155            .with_pack_type_registry(pack_type_registry)
2156            .with_scheduler(scheduler);
2157        append_manifest_event(
2158            event_log,
2159            lock_path,
2160            &Event::ActionStarted {
2161                ts: Utc::now(),
2162                pack: pack_name.clone(),
2163                action_idx: 0,
2164                action_name: type_tag.to_string(),
2165            },
2166            &mut report.event_log_warnings,
2167        );
2168        let plugin = pack_type_registry
2169            .get(type_tag)
2170            .expect("pack-type plugin must be registered (guarded above)");
2171        // feat-m6 CI fix — see dispatch_pack_type note.
2172        let step_result =
2173            rt.block_on(crate::pack_lock::with_tier_scope(plugin.teardown(&ctx, &manifest)));
2174        if !record_action_outcome(
2175            report,
2176            event_log,
2177            lock_path,
2178            &pack_name,
2179            0,
2180            type_tag,
2181            step_result,
2182        ) {
2183            return;
2184        }
2185    }
2186}
2187
2188/// Test-only hook: append one [`Event::Sync`] through the same
2189/// [`ManifestLock`]-serialised path the sync driver uses.
2190///
2191/// Exposed so integration tests under `tests/` can exercise the locked
2192/// append helper without spinning up a full pack tree. Not intended for
2193/// downstream consumers — the signature may change without notice.
2194#[doc(hidden)]
2195pub fn __test_append_sync_event(
2196    log: &Path,
2197    lock_path: &Path,
2198    pack: &str,
2199    action_name: &str,
2200) -> Result<(), String> {
2201    let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: action_name.to_string() };
2202    append_event_locked(log, lock_path, &event)
2203}
2204
2205// ----------------------------------------------------------------------
2206// PR E — pre-run teardown scan
2207// ----------------------------------------------------------------------
2208
2209/// One `ActionStarted` event in the manifest log that has no matching
2210/// `ActionCompleted` or `ActionHalted` peer.
2211///
2212/// Dangling starts are the primary crash signal: the process wrote the
2213/// pre-action event, then died before the executor returned. Callers
2214/// should surface these to the operator (diagnostics only this PR; a
2215/// future `grex doctor` verb will act on them).
2216#[non_exhaustive]
2217#[derive(Debug, Clone, PartialEq, Eq)]
2218pub struct DanglingStart {
2219    /// Pack that owned the halted action.
2220    pub pack: String,
2221    /// 0-based action index within the pack.
2222    pub action_idx: usize,
2223    /// Short action kind tag.
2224    pub action_name: String,
2225    /// Timestamp the `ActionStarted` event was written.
2226    pub started_at: DateTime<Utc>,
2227}
2228
2229/// Summary of teardown artifacts found under a pack root before a sync
2230/// begins.
2231///
2232/// Built by [`scan_recovery`]. All fields are diagnostic; the sync
2233/// proceeds regardless of what the scan finds.
2234#[non_exhaustive]
2235#[derive(Debug, Clone, Default, PartialEq, Eq)]
2236pub struct RecoveryReport {
2237    /// `<dst>.grex.bak` files sitting next to a non-symlink or missing
2238    /// original (symlink-action rollback orphan).
2239    pub orphan_backups: Vec<PathBuf>,
2240    /// `<path>.grex.bak.<timestamp>` tombstones left by `rmdir` with
2241    /// `backup: true`.
2242    pub orphan_tombstones: Vec<PathBuf>,
2243    /// `ActionStarted` events in the log with no matching
2244    /// `ActionCompleted`/`ActionHalted`.
2245    pub dangling_starts: Vec<DanglingStart>,
2246}
2247
2248impl RecoveryReport {
2249    /// `true` when the scan found nothing worth reporting.
2250    #[must_use]
2251    pub fn is_empty(&self) -> bool {
2252        self.orphan_backups.is_empty()
2253            && self.orphan_tombstones.is_empty()
2254            && self.dangling_starts.is_empty()
2255    }
2256}
2257
2258/// Walk `workspace` and the manifest log to find crash-recovery artifacts.
2259///
2260/// Inspects:
2261///
2262/// * `workspace` for `.grex.bak` orphans and timestamped `.grex.bak.<ts>`
2263///   tombstones. The workspace IS where children materialise (whether
2264///   the default flat-sibling layout under the pack root, or an
2265///   explicit `--workspace` override directory) so this single bounded
2266///   walk covers every backup site.
2267/// * `event_log` (the manifest JSONL) for `ActionStarted` entries that
2268///   have no matching `ActionCompleted` / `ActionHalted` successor.
2269///
2270/// Non-blocking: scan errors are swallowed to an empty report so a
2271/// half-readable directory cannot kill a sync that would otherwise
2272/// succeed. Call sites that want to surface scan failures should read
2273/// the manifest directly.
2274///
2275/// Pre-`v1.1.0` post-review fix this anchored at `pack_root_dir(pack_root)`,
2276/// which missed every backup under a `--workspace` override.
2277///
2278/// # Errors
2279///
2280/// Returns [`SyncError::Validation`] only when the manifest read itself
2281/// reports corruption. Filesystem traversal errors are swallowed.
2282pub fn scan_recovery(workspace: &Path, event_log: &Path) -> Result<RecoveryReport, SyncError> {
2283    let mut report = RecoveryReport::default();
2284    walk_for_backups(workspace, &mut report);
2285    if event_log.exists() {
2286        match read_all(event_log) {
2287            Ok(events) => {
2288                report.dangling_starts = collect_dangling_starts(&events);
2289            }
2290            Err(e) => {
2291                return Err(SyncError::Validation {
2292                    errors: vec![PackValidationError::DependsOnUnsatisfied {
2293                        pack: "<event-log>".into(),
2294                        required: e.to_string(),
2295                    }],
2296                });
2297            }
2298        }
2299    }
2300    Ok(report)
2301}
2302
2303/// Shallow directory walker (bounded depth = 6) that categorizes
2304/// `.grex.bak` and `.grex.bak.<ts>` filenames into the appropriate
2305/// report slot. Depth-limited so a pathological workspace with a deep
2306/// tree cannot stall the scan; realistic layouts are well under six
2307/// levels.
2308fn walk_for_backups(root: &Path, report: &mut RecoveryReport) {
2309    walk_for_backups_inner(root, report, 0);
2310}
2311
2312fn walk_for_backups_inner(dir: &Path, report: &mut RecoveryReport, depth: u32) {
2313    const MAX_DEPTH: u32 = 6;
2314    if depth > MAX_DEPTH {
2315        return;
2316    }
2317    let Ok(entries) = std::fs::read_dir(dir) else { return };
2318    for entry_result in entries {
2319        let entry = match entry_result {
2320            Ok(e) => e,
2321            Err(e) => {
2322                tracing::warn!(
2323                    target: "grex::sync::recover",
2324                    "skipping unreadable entry under `{}`: {e}",
2325                    dir.display(),
2326                );
2327                continue;
2328            }
2329        };
2330        let path = entry.path();
2331        let name = entry.file_name();
2332        let Some(name_str) = name.to_str() else { continue };
2333        if name_str.ends_with(".grex.bak") {
2334            report.orphan_backups.push(path.clone());
2335            continue;
2336        }
2337        if let Some(rest) = name_str.rsplit_once(".grex.bak.") {
2338            // `rsplit_once` returns `(prefix, suffix)`; suffix is the
2339            // timestamp chunk. Accept any non-empty suffix — the exact
2340            // timestamp shape is `fs_executor` internal.
2341            if !rest.1.is_empty() {
2342                report.orphan_tombstones.push(path.clone());
2343                continue;
2344            }
2345        }
2346        // Recurse only into real directories (not symlinks, to avoid
2347        // traversing into the workspace's cloned repos via aliased
2348        // paths). `entry.file_type()` does NOT follow symlinks (unlike
2349        // `entry.metadata()` which would dereference and report the
2350        // target's type — defeating the very check this guards). The
2351        // symlink-skip is also explicit so the intent is recoverable
2352        // from the source: backup-recovery never crosses a symlink.
2353        let Ok(ft) = entry.file_type() else { continue };
2354        if ft.is_symlink() {
2355            continue;
2356        }
2357        if ft.is_dir() {
2358            walk_for_backups_inner(&path, report, depth + 1);
2359        }
2360    }
2361}
2362
2363/// Reduce an event stream to a list of `ActionStarted` records with no
2364/// matching terminator.
2365///
2366/// Matching is positional per `(pack, action_idx)`: a later
2367/// `ActionCompleted` or `ActionHalted` with the same key clears the
2368/// entry. Whatever remains in the map after the pass is dangling.
2369fn collect_dangling_starts(events: &[Event]) -> Vec<DanglingStart> {
2370    use std::collections::HashMap;
2371    let mut open: HashMap<(String, usize), DanglingStart> = HashMap::new();
2372    for ev in events {
2373        match ev {
2374            Event::ActionStarted { ts, pack, action_idx, action_name } => {
2375                open.insert(
2376                    (pack.clone(), *action_idx),
2377                    DanglingStart {
2378                        pack: pack.clone(),
2379                        action_idx: *action_idx,
2380                        action_name: action_name.clone(),
2381                        started_at: *ts,
2382                    },
2383                );
2384            }
2385            Event::ActionCompleted { pack, action_idx, .. }
2386            | Event::ActionHalted { pack, action_idx, .. } => {
2387                open.remove(&(pack.clone(), *action_idx));
2388            }
2389            _ => {}
2390        }
2391    }
2392    let mut out: Vec<DanglingStart> = open.into_values().collect();
2393    out.sort_by_key(|a| a.started_at);
2394    out
2395}
2396
#[cfg(test)]
mod synthetic_transition_tests {
    //! v1.1.1 — regression cover for the pack-shape transition fixes.
    //!
    //! [`skip_eligibility`] and [`upsert_lock_entry`] are called directly
    //! here (no walker, no filesystem), so the assertions target the
    //! plumbing itself: a skip must require the synthetic flag to agree
    //! even when the actions hash happens to match, and the upsert path
    //! must write the real-to-synthetic downgrade into the lockfile map so
    //! the operator's lockfile reflects what just happened.
    use super::{skip_eligibility, upsert_lock_entry, LockEntry};
    use crate::lockfile::compute_actions_hash;
    use chrono::{TimeZone, Utc};
    use std::collections::HashMap;

    /// Commit SHA shared by every fixture in this module.
    const SHA: &str = "deadbeef";

    /// Fixed install timestamp so fixtures are reproducible.
    fn ts() -> chrono::DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 4, 27, 10, 0, 0).unwrap()
    }

    /// Hash of an empty action list at the fixed commit SHA. Both the
    /// prior (real) and the new (synthetic) configuration use these same
    /// inputs, which reproduces the coincidental-hash-match scenario that
    /// FIX 3 has to catch.
    fn stable_hash() -> String {
        compute_actions_hash(&[], SHA)
    }

    /// Lock entry whose `id` and `path` are both `id`, on branch `main`,
    /// with the stable hash and the requested synthetic flag.
    fn entry_named(id: &'static str, synthetic: bool) -> LockEntry {
        LockEntry {
            id: id.into(),
            path: id.into(),
            sha: SHA.into(),
            branch: "main".into(),
            installed_at: ts(),
            actions_hash: stable_hash(),
            schema_version: "1".into(),
            synthetic,
        }
    }

    /// The `alpha` entry used by the skip-eligibility tests.
    fn prior_entry(synthetic: bool) -> LockEntry {
        entry_named("alpha", synthetic)
    }

    /// Seed a prior map with one entry for `id`, run `upsert_lock_entry`
    /// for that pack as synthetic, and hand back the resulting map.
    fn upsert_as_synthetic(id: &'static str, prior_synthetic: bool) -> HashMap<String, LockEntry> {
        let mut prior: HashMap<String, LockEntry> = HashMap::new();
        prior.insert(id.into(), entry_named(id, prior_synthetic));
        let mut next: HashMap<String, LockEntry> = HashMap::new();
        upsert_lock_entry(&prior, &mut next, id, SHA, &stable_hash(), true);
        next
    }

    /// FIX 3 — the pack flips from real to synthetic while `actions_hash`
    /// and `commit_sha` happen to match. The skip MUST be invalidated so
    /// the upsert path re-emits the entry with `synthetic = true`.
    #[test]
    fn skip_eligibility_invalidates_when_synthetic_flag_flips() {
        let was_real = prior_entry(false);
        let decision = skip_eligibility(&[], SHA, true, &was_real, false, false);
        assert!(decision.is_none(), "skip must be invalidated when synthetic flag flips");
    }

    /// Baseline: same hash and same synthetic flag, so the skip is allowed.
    #[test]
    fn skip_eligibility_allows_skip_when_synthetic_matches() {
        let was_synthetic = prior_entry(true);
        let expected = stable_hash();
        let decision = skip_eligibility(&[], SHA, true, &was_synthetic, false, false);
        assert_eq!(
            decision.as_deref(),
            Some(expected.as_str()),
            "skip must be honoured when synthetic flag matches",
        );
    }

    /// `dry_run` and `force` each disable the skip even when the flags
    /// agree.
    #[test]
    fn skip_eligibility_respects_dry_run_and_force() {
        let was_synthetic = prior_entry(true);
        let under_dry_run = skip_eligibility(&[], SHA, true, &was_synthetic, true, false);
        assert!(under_dry_run.is_none());
        let under_force = skip_eligibility(&[], SHA, true, &was_synthetic, false, true);
        assert!(under_force.is_none());
    }

    /// FIX 4 — when the prior entry was real, `upsert_lock_entry` records
    /// the downgrade (the entry flips to `synthetic = true`). The
    /// accompanying `tracing::warn!` is fire-and-forget, but the lockfile
    /// transition is observable and must be right.
    #[test]
    fn upsert_lock_entry_records_real_to_synthetic_downgrade() {
        let next = upsert_as_synthetic("beta", false);

        let entry = next.get("beta").expect("entry must be upserted");
        assert!(entry.synthetic, "downgraded entry must carry synthetic = true");
        assert_eq!(entry.actions_hash, stable_hash(), "actions_hash must reflect current run");
    }

    /// Steady state (synthetic → synthetic): the entry is refreshed for
    /// the current run but the synthetic flag is preserved. Guards against
    /// the downgrade warning firing too eagerly.
    #[test]
    fn upsert_lock_entry_no_op_for_steady_state_synthetic() {
        let next = upsert_as_synthetic("gamma", true);

        let entry = next.get("gamma").expect("entry must be upserted");
        assert!(entry.synthetic, "synthetic must remain true on no-op refresh");
    }
}
2527
#[cfg(test)]
mod error_display_tests {
    //! v1.2.0 Stage 1.k — `Display` output checks for `SyncError`.
    //!
    //! Each test only constructs a variant and renders it. The variants
    //! land dormant: Stage 1.g (rayon scheduler) is what wires
    //! `SchedulerCancelled` once cooperative cancel polls reach the
    //! parallel walker.
    use super::SyncError;

    #[test]
    fn test_sync_error_scheduler_cancelled_display() {
        let rendered = SyncError::SchedulerCancelled.to_string();
        assert_eq!(rendered, "sync cancelled by user");
    }
}
2543
#[cfg(test)]
mod sync_options_v1_2_0_tests {
    //! v1.2.0 Stage 1.m — leaf cover for the newer [`SyncOptions`] fields.
    //!
    //! Mechanical default-value assertions plus simple struct-update and
    //! clone round-trips. They pin down two things:
    //!
    //! 1. Adding the fields keeps v1.1.1 behavior (the defaults leave
    //!    existing call sites observably unchanged).
    //! 2. The field names and types are what the later walker stages
    //!    (1.h / 1.j / 1.l) will consume — a rename or type change has to
    //!    be made in lock-step with those stages.
    //!
    //! At 1.m scope the fields are *dormant placeholders*: no behavior is
    //! wired to them in this stage.
    use super::{pack_root_dir, resolve_workspace, SyncError, SyncOptions};

    /// `force_prune` is off by default, so existing call sites keep
    /// refusing to drop dirty trees (v1.1.1 behavior).
    #[test]
    fn test_sync_options_default_force_prune_false() {
        assert!(!SyncOptions::default().force_prune, "force_prune must default to false");
    }

    /// `force_prune_with_ignored` is off by default, so existing call
    /// sites keep refusing to drop ignored content (v1.1.1 behavior).
    #[test]
    fn test_sync_options_default_force_prune_with_ignored_false() {
        let defaults = SyncOptions::default();
        let with_ignored = defaults.force_prune_with_ignored;
        assert!(!with_ignored, "force_prune_with_ignored must default to false");
    }

    /// `migrate_lockfile` is off by default: the walker errors on legacy
    /// v1.1.1 lockfile shapes unless the caller opts in.
    #[test]
    fn test_sync_options_default_migrate_lockfile_false() {
        assert!(!SyncOptions::default().migrate_lockfile, "migrate_lockfile must default to false");
    }

    /// `recurse` is on by default — the walker descends into nested
    /// meta-children unless `--shallow` is requested.
    #[test]
    fn test_sync_options_default_recurse_true() {
        assert!(SyncOptions::default().recurse, "recurse must default to true");
    }

    /// `max_depth` is `None` by default — recursion is unbounded while
    /// `recurse` is `true`.
    #[test]
    fn test_sync_options_default_max_depth_none() {
        let depth_limit = SyncOptions::default().max_depth;
        assert!(depth_limit.is_none(), "max_depth must default to None");
    }

    /// Setting both `force_prune` and `force_prune_with_ignored` is the
    /// documented "stronger" combination; `with_ignored` is the harder
    /// override and implies the base `force_prune` semantics. The test
    /// guards that both stay plain `bool` fields (not an enum) that can
    /// be set independently without a runtime panic.
    #[test]
    fn test_sync_options_force_prune_with_ignored_implies_force_prune() {
        let both_on = SyncOptions {
            force_prune: true,
            force_prune_with_ignored: true,
            ..SyncOptions::default()
        };
        assert!(both_on.force_prune);
        assert!(both_on.force_prune_with_ignored);
    }

    /// `max_depth = Some(n)` together with `recurse = true` is the
    /// documented `--shallow=N` shape. The two fields are independent
    /// (`bool` / `Option<usize>`), so `max_depth` can be set while
    /// `recurse` stays at its default. Stage 1.j defines the precise
    /// interaction later; this test only locks types and defaults.
    #[test]
    fn test_sync_options_max_depth_pairs_with_recurse() {
        let shallow = SyncOptions { max_depth: Some(2), ..SyncOptions::default() };
        assert_eq!(shallow.max_depth, Some(2));
        assert!(shallow.recurse, "recurse stays at its default (true) when only max_depth is set");
    }

    /// Round-trip through `Clone` — guards that every new field takes
    /// part in the existing `Clone` derive.
    #[test]
    fn test_sync_options_clone_preserves_new_fields() {
        let original = SyncOptions {
            force_prune: true,
            force_prune_with_ignored: true,
            migrate_lockfile: true,
            recurse: false,
            max_depth: Some(7),
            ..SyncOptions::default()
        };
        let copy = original.clone();
        assert_eq!(copy.force_prune, original.force_prune);
        assert_eq!(copy.force_prune_with_ignored, original.force_prune_with_ignored);
        assert_eq!(copy.migrate_lockfile, original.migrate_lockfile);
        assert_eq!(copy.recurse, original.recurse);
        assert_eq!(copy.max_depth, original.max_depth);
    }

    // ------------------------------------------------------------------
    // v1.2.1 path (iii) — `resolve_workspace` canonicalisation tests
    // ------------------------------------------------------------------

    /// A `--workspace` override that names a directory which does not
    /// exist must fail fast with a Validation error citing the problem.
    /// The override is an explicit operator decision, so a missing target
    /// is treated as a configuration mistake rather than created on the
    /// operator's behalf.
    #[test]
    fn test_resolve_workspace_errors_on_missing_override_dir() {
        let sandbox = tempfile::tempdir().unwrap();
        let absent = sandbox.path().join("nope");
        let outcome = resolve_workspace(sandbox.path(), Some(absent.as_path()));
        match outcome.expect_err("must fail") {
            SyncError::Validation { errors } => {
                let cites_missing = errors.iter().any(|e| e.to_string().contains("does not exist"));
                assert!(cites_missing);
            }
            other => panic!("expected Validation, got {other:?}"),
        }
    }

    /// Without an override (`None`) the default path applies: no
    /// canonicalisation and no failure on a missing directory. The result
    /// equals `pack_root_dir` applied to the pack root.
    #[test]
    fn test_resolve_workspace_none_returns_pack_root_dir() {
        let sandbox = tempfile::tempdir().unwrap();
        let not_created = sandbox.path().join("nonexistent-yet");
        let expected = pack_root_dir(&not_created);
        let resolved = resolve_workspace(&not_created, None).expect("None override is always Ok");
        assert_eq!(resolved, expected);
    }

    /// An override that names an existing directory comes back
    /// canonicalised. On Windows that typically adds the `\\?\` long-path
    /// prefix; on Unix it resolves `..` and symlink components. Either
    /// way it is the path every downstream pass anchors against.
    #[test]
    fn test_resolve_workspace_canonicalises_existing_override() {
        let sandbox = tempfile::tempdir().unwrap();
        let override_dir = sandbox.path().join("real-ws");
        std::fs::create_dir_all(&override_dir).unwrap();
        let expected = override_dir.canonicalize().unwrap();
        let resolved = resolve_workspace(sandbox.path(), Some(override_dir.as_path()))
            .expect("existing dir must resolve");
        assert_eq!(resolved, expected);
    }
}