Skip to main content

grex_core/
sync.rs

1//! Sync orchestrator — M3 Stage B slice 6.
2//!
3//! Glues the building blocks shipped in slices 1–5b into a single runnable
4//! pipeline:
5//!
6//! 1. Walk a pack tree via [`crate::tree::sync_meta`] +
7//!    [`crate::tree::build_graph`] + [`FsPackLoader`] + a `GitBackend`.
8//! 2. Run plan-phase validators (manifest-level + graph-level).
9//! 3. Execute every action via a pluggable [`ActionExecutor`]
10//!    ([`PlanExecutor`] for dry-run, [`FsExecutor`] for wet-run).
11//! 4. Record each step as an [`Event::Sync`] entry in the pack-root's
12//!    `.grex/events.jsonl` event log.
13//!
14//! # Traversal order
15//!
16//! Nodes are executed in **depth-first post-order**: children fully install
17//! before their parent. Rationale: parent packs commonly `require:` artifacts
18//! created by children (e.g. a parent symlink whose `src` lives inside a
19//! child). Running the root last matches the overlay-style dotfile-install
20//! intent authors expect, and it matches how `walker.walk` is structured
21//! (children are hydrated before the recursion returns).
22//!
23//! # Decoupling
24//!
25//! The CLI crate drives this module through a thin `run()` entry point;
26//! [`SyncOptions`] is `#[non_exhaustive]` so new knobs (parallelism, filter
27//! expressions, ref overrides) can land in later milestones without breaking
28//! CLI callers. Errors aggregate into [`SyncError`] with a small, stable
29//! variant set.
30
31use std::borrow::Cow;
32use std::fs;
33use std::path::{Path, PathBuf};
34use std::sync::Arc;
35
36use chrono::{DateTime, Utc};
37use globset::{Glob, GlobSet, GlobSetBuilder};
38use thiserror::Error;
39use tokio_util::sync::CancellationToken;
40
41use crate::execute::{
42    ActionExecutor, ExecCtx, ExecError, ExecResult, ExecStep, FsExecutor, MetaVisitedSet,
43    PlanExecutor, Platform, StepKind,
44};
45use crate::fs::{ManifestLock, ScopedLock};
46use crate::git::GixBackend;
47use crate::lockfile::{
48    branch_of, compute_actions_hash, read_lockfile, write_lockfile, LockEntry, LockfileError,
49};
50use crate::manifest::{append_event, read_all, Event, ACTION_ERROR_SUMMARY_MAX, SCHEMA_VERSION};
51use crate::pack::{Action, PackValidationError};
52use crate::plugin::{PackTypeRegistry, Registry};
53use crate::scheduler::Scheduler;
54use crate::tree::{
55    build_graph, sync_meta, FsPackLoader, PackGraph, PackNode, SyncMetaOptions, TreeError,
56};
57use crate::vars::VarEnv;
58
/// Inputs to [`run`].
///
/// Every field is `pub`, so options can be read and assigned directly.
/// The struct is `#[non_exhaustive]` so future knobs (parallelism, filter
/// expressions, additional ref strategies) can land without a breaking
/// change for library consumers.
///
/// Construction note: inside this crate a struct literal with
/// `..SyncOptions::default()` works as usual. From *other* crates a
/// `#[non_exhaustive]` struct cannot be built with a struct expression at
/// all (functional-update syntax included), so downstream callers start
/// from [`SyncOptions::new`] / [`Default::default`] and then chain the
/// `with_*` builders or assign the public fields.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct SyncOptions {
    /// When `true`, use [`PlanExecutor`] (no filesystem mutations).
    pub dry_run: bool,
    /// When `false`, skip plan-phase validators (manifest + graph). Debug
    /// escape hatch; production callers should leave this `true`.
    pub validate: bool,
    /// Override workspace directory. `None` → derived from `pack_root`
    /// (the directory holding `.grex/pack.yaml`).
    ///
    /// **v1.2.1 path (iii) semantics**: when `Some`, this path IS the
    /// canonical meta directory. Children resolve parent-relatively as
    /// `<workspace>/<child.path>` and `<workspace>/.grex/pack.yaml` is
    /// where the root manifest is read from. The path MUST exist;
    /// symlinks are resolved via `fs::canonicalize` to a single
    /// inode-stable form. Pre-v1.2.1 the override only re-anchored
    /// children — that legacy split is retired.
    pub workspace: Option<PathBuf>,
    /// Global ref override (`grex sync --ref <sha|branch|tag>`). When
    /// `Some`, every child pack clone/checkout uses this ref instead of
    /// the declared `child.ref`. Empty strings are rejected at the CLI
    /// layer.
    pub ref_override: Option<String>,
    /// Pack-path filter patterns (`grex sync --only <glob>`). Raw glob
    /// strings — compiled internally via an in-crate `globset` helper so the
    /// `globset` crate version does not leak into the public API.
    /// `None` / empty means every pack runs (M3 semantics). Matching is
    /// against the pack's **workspace-relative** path normalized to
    /// forward-slash form.
    pub only_patterns: Option<Vec<String>>,
    /// Bypass the lockfile hash-match skip (`grex sync --force`). When
    /// `true`, every pack re-executes even if its `actions_hash` is
    /// unchanged from the prior lockfile.
    pub force: bool,
    /// Max parallel pack ops for this sync run (feat-m6-1).
    ///
    /// * `None` → resolved to `num_cpus::get()` (clamped to at least 1)
    ///   inside [`run`], so library callers who leave this unset get the
    ///   same default as the CLI and the scheduler slot on every
    ///   `ExecCtx` is always populated.
    /// * `Some(0)` → unbounded (`Semaphore::MAX_PERMITS`).
    /// * `Some(1)` → serial fast-path.
    /// * `Some(n >= 2)` → bounded parallel.
    pub parallel: Option<usize>,
    /// v1.2.0 Stage 1.l prep — when `true`, walker Phase 2 may drop
    /// dirty trees during prune. Still refuses ignored content unless
    /// [`SyncOptions::force_prune_with_ignored`] is also `true`.
    /// Default `false` preserves v1.1.1 behavior (refuse all dirty
    /// drops).
    pub force_prune: bool,
    /// v1.2.0 Stage 1.l prep — when `true` (implies
    /// [`SyncOptions::force_prune`]), walker Phase 2 also drops
    /// ignored content. Hard override — the strongest level. Default
    /// `false` preserves v1.1.1 behavior.
    pub force_prune_with_ignored: bool,
    /// v1.2.1 Item 5b — when `true` AND `force_prune` (or
    /// `force_prune_with_ignored`) is set, divert Phase 2 prunes
    /// through the snapshot-then-unlink quarantine pipeline. The
    /// dest's full subtree is recursively copied to
    /// `<workspace>/.grex/trash/<ISO8601>/<basename>/` BEFORE
    /// `unlink(dest)` fires. Snapshot or audit-fsync failure aborts
    /// the prune (no unlink). Lean theorem
    /// `quarantine_snapshot_precedes_delete` proves the safety
    /// contract. Default `false` preserves v1.2.0 direct-unlink
    /// behavior. Has no effect unless one of the `force_prune*`
    /// flags is also set (the CLI enforces this via
    /// `requires = "force_prune"`; library callers who set this
    /// with neither flag get a no-op since Phase 2 will not enter
    /// the override path at all).
    pub quarantine: bool,
    /// v1.2.0 Stage 1.h opt-in — when `true`, the walker rewrites a
    /// legacy v1.1.1 lockfile in place to the v1.2.0 shape. When
    /// `false` (default), the walker errors on the legacy shape so
    /// migration is always an explicit caller decision.
    pub migrate_lockfile: bool,
    /// v1.2.0 Stage 1.j prep — when `true` (default), the walker
    /// descends into nested meta-children. `doctor --shallow` flips
    /// this to `false` so only the immediate workspace is inspected.
    pub recurse: bool,
    /// v1.2.0 Stage 1.j prep — pairs with
    /// [`SyncOptions::recurse`] for `--shallow=N`. `None` (default)
    /// is unbounded recursion when `recurse` is `true`. `Some(n)`
    /// caps depth at `n` levels of nesting.
    pub max_depth: Option<usize>,
    /// v1.2.5 — when `Some(N)`, every meta sync starts with a
    /// best-effort GC sweep over `<meta>/.grex/trash/`, deleting
    /// entries older than `N` days. `None` (default) preserves the
    /// v1.2.1 indefinite-retention behavior. The CLI surfaces this
    /// as `grex sync --retain-days N`; library callers wire it via
    /// [`SyncOptions::with_retain_days`]. Sweep failures log via
    /// `tracing::warn!` and DO NOT halt the sync.
    pub retain_days: Option<u32>,
}
162
163impl Default for SyncOptions {
164    fn default() -> Self {
165        Self {
166            dry_run: false,
167            validate: true,
168            workspace: None,
169            ref_override: None,
170            only_patterns: None,
171            force: false,
172            parallel: None,
173            // v1.2.0 Stage 1.m additions — defaults preserve v1.1.1
174            // behavior. Each field is a dormant placeholder until
175            // its corresponding walker stage wires it.
176            force_prune: false,
177            force_prune_with_ignored: false,
178            quarantine: false,
179            migrate_lockfile: false,
180            recurse: true,
181            max_depth: None,
182            retain_days: None,
183        }
184    }
185}
186
187/// Compile raw `--only` pattern strings into a [`globset::GlobSet`].
188/// Empty / absent input yields `Ok(None)` so M3's zero-config path
189/// (every pack runs) stays the default.
190fn compile_only_globset(patterns: Option<&Vec<String>>) -> Result<Option<GlobSet>, SyncError> {
191    let Some(pats) = patterns else { return Ok(None) };
192    if pats.is_empty() {
193        return Ok(None);
194    }
195    let mut builder = GlobSetBuilder::new();
196    for p in pats {
197        let glob = Glob::new(p)
198            .map_err(|source| SyncError::InvalidOnlyGlob { pattern: p.clone(), source })?;
199        builder.add(glob);
200    }
201    let set = builder
202        .build()
203        .map_err(|source| SyncError::InvalidOnlyGlob { pattern: pats.join(","), source })?;
204    Ok(Some(set))
205}
206
207impl SyncOptions {
208    /// Default options: wet-run, validators enabled, default workspace path.
209    #[must_use]
210    pub fn new() -> Self {
211        Self::default()
212    }
213
214    /// Set `dry_run`.
215    #[must_use]
216    pub fn with_dry_run(mut self, dry_run: bool) -> Self {
217        self.dry_run = dry_run;
218        self
219    }
220
221    /// Set `validate`.
222    #[must_use]
223    pub fn with_validate(mut self, validate: bool) -> Self {
224        self.validate = validate;
225        self
226    }
227
228    /// Set `workspace` override.
229    #[must_use]
230    pub fn with_workspace(mut self, workspace: Option<PathBuf>) -> Self {
231        self.workspace = workspace;
232        self
233    }
234
235    /// Set `ref_override` (`--ref`).
236    #[must_use]
237    pub fn with_ref_override(mut self, ref_override: Option<String>) -> Self {
238        self.ref_override = ref_override;
239        self
240    }
241
242    /// Set `only_patterns` (`--only`). Empty vector or `None` disables
243    /// the filter.
244    #[must_use]
245    pub fn with_only_patterns(mut self, patterns: Option<Vec<String>>) -> Self {
246        self.only_patterns = patterns;
247        self
248    }
249
250    /// Set `force` (`--force`).
251    #[must_use]
252    pub fn with_force(mut self, force: bool) -> Self {
253        self.force = force;
254        self
255    }
256
257    /// Set `parallel` (`--parallel`). See [`SyncOptions::parallel`] for
258    /// the `None` / `Some(0)` / `Some(1)` / `Some(n)` semantics.
259    #[must_use]
260    pub fn with_parallel(mut self, parallel: Option<usize>) -> Self {
261        self.parallel = parallel;
262        self
263    }
264
265    /// Set `force_prune` (`--force-prune`). See
266    /// [`SyncOptions::force_prune`] for the override matrix.
267    #[must_use]
268    pub fn with_force_prune(mut self, force_prune: bool) -> Self {
269        self.force_prune = force_prune;
270        self
271    }
272
273    /// Set `force_prune_with_ignored` (`--force-prune-with-ignored`).
274    /// See [`SyncOptions::force_prune_with_ignored`] for the override
275    /// matrix.
276    #[must_use]
277    pub fn with_force_prune_with_ignored(mut self, force_prune_with_ignored: bool) -> Self {
278        self.force_prune_with_ignored = force_prune_with_ignored;
279        self
280    }
281
282    /// Set `quarantine` (`--quarantine`). See
283    /// [`SyncOptions::quarantine`] for the snapshot-before-delete
284    /// contract. Has no effect unless [`SyncOptions::force_prune`]
285    /// or [`SyncOptions::force_prune_with_ignored`] is also set.
286    #[must_use]
287    pub fn with_quarantine(mut self, quarantine: bool) -> Self {
288        self.quarantine = quarantine;
289        self
290    }
291
292    /// Set `retain_days` (`--retain-days N`). See
293    /// [`SyncOptions::retain_days`] for the GC-sweep contract.
294    /// `None` preserves v1.2.1 indefinite-retention behavior;
295    /// `Some(N)` triggers a best-effort sweep at the start of every
296    /// meta sync.
297    #[must_use]
298    pub fn with_retain_days(mut self, retain_days: Option<u32>) -> Self {
299        self.retain_days = retain_days;
300        self
301    }
302}
303
/// One executed (or planned) action step in a sync run.
///
/// Collected in execution order in [`SyncReport::steps`]. Pairs the
/// executor's own record with the owning pack and the action's position
/// inside that pack.
///
/// Marked `#[non_exhaustive]` so new observability fields (timestamps,
/// plugin provenance) can land without breaking library consumers who
/// destructure the struct.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct SyncStep {
    /// Name of the pack that owned the action.
    pub pack: String,
    /// 0-based index into the pack's top-level `actions` vector.
    pub action_idx: usize,
    /// The [`ExecStep`] record emitted by the executor.
    pub exec_step: ExecStep,
}
319
/// Outcome of a [`run`] invocation.
///
/// On fail-fast termination, `halted` carries the error that stopped the
/// sync; every completed step up to that point is still in `steps` so
/// callers can render a partial transcript. Callers therefore need to
/// inspect `halted` even when [`run`] returned `Ok`.
///
/// Marked `#[non_exhaustive]` so new report-level fields (run id, metrics)
/// can land without breaking library consumers who destructure the struct.
#[non_exhaustive]
#[derive(Debug)]
pub struct SyncReport {
    /// Fully-walked pack graph (present even on halted runs).
    pub graph: PackGraph,
    /// Steps produced by the executor, in execution order.
    pub steps: Vec<SyncStep>,
    /// `Some(e)` if execution stopped before all actions ran.
    pub halted: Option<SyncError>,
    /// Non-fatal manifest-append warnings (one per failed event append).
    /// Kept as a separate field because spec marks event-log write failures
    /// as non-aborting.
    pub event_log_warnings: Vec<String>,
    /// `Some(r)` when the pre-run teardown scan found orphaned backup
    /// files or dangling [`Event::ActionStarted`] records from a prior
    /// crashed run. Informational only — the report is still returned and
    /// the sync proceeds. CLI renderers should surface a warning so the
    /// operator can decide whether to run a future `grex doctor` verb.
    pub pre_run_recovery: Option<RecoveryReport>,
    /// One entry per child whose legacy `.grex/workspace/<name>/` layout
    /// was relocated (or considered for relocation) on this sync. Empty
    /// when no legacy directory was found — the common case for any
    /// workspace built fresh on v1.1.0+. CLI renderers should surface
    /// the list so operators see what changed.
    pub workspace_migrations: Vec<WorkspaceMigration>,
}
354
/// One legacy-layout migration attempt, as listed in
/// [`SyncReport::workspace_migrations`]. `outcome` distinguishes the
/// move-succeeded case from the don't-clobber-user-data case so CLI
/// renderers can present different advice to the operator.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkspaceMigration {
    /// Source path under the legacy `.grex/workspace/<name>/` location,
    /// rendered relative to the pack root for log readability.
    pub from: PathBuf,
    /// Destination flat-sibling path `<pack_root>/<name>/`, relative to
    /// the pack root.
    pub to: PathBuf,
    /// What happened to this entry; see [`MigrationOutcome`] for the
    /// possible results.
    pub outcome: MigrationOutcome,
}
370
/// Outcome of one legacy-layout migration attempt.
///
/// Only [`MigrationOutcome::Migrated`] means the directory moved; every
/// other variant leaves the legacy directory where it was.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationOutcome {
    /// Legacy directory was renamed onto the flat-sibling slot.
    Migrated,
    /// Both legacy and flat-sibling slots existed. Skipped — the user
    /// must inspect and reconcile manually so we never silently delete
    /// either.
    SkippedBothExist,
    /// Flat-sibling slot already had a non-grex file or directory in
    /// the way. Skipped — refusing to clobber user data even when the
    /// legacy slot is plainly the source of truth.
    SkippedDestOccupied,
    /// `fs::rename` failed (e.g. cross-volume, ACL denied). The legacy
    /// directory is still in place; surfaced so the operator can move
    /// it manually.
    Failed {
        /// Text describing the failure. Presumably the rendered
        /// `fs::rename` error — confirm at the construction site.
        error: String,
    },
}
390
/// Rich context attached to a [`SyncError::Halted`] variant.
///
/// Packages the pack + action position together with the underlying
/// executor error and an optional human-readable recovery hint. Marked
/// `#[non_exhaustive]` so future fields (step transcript, timestamp) can
/// land without breaking `match` arms or struct destructures.
///
/// The variant stores this struct boxed (`Halted(Box<HaltedContext>)`).
/// Its `Display` message interpolates `pack`, `action_idx`, `action_name`
/// and `error`; `recovery_hint` is not part of that message and must be
/// rendered separately by callers that want to show it.
#[non_exhaustive]
#[derive(Debug)]
pub struct HaltedContext {
    /// Name of the pack that owned the halted action.
    pub pack: String,
    /// 0-based index into the pack's top-level `actions` vector.
    pub action_idx: usize,
    /// Short action kind tag (e.g. `"symlink"`, `"exec"`).
    pub action_name: String,
    /// Underlying executor error.
    pub error: ExecError,
    /// Optional next-step suggestion for the operator. `None` when no
    /// generic hint applies — the executor error's own `Display` already
    /// tells the story.
    pub recovery_hint: Option<String>,
}
413
/// Error taxonomy surfaced by [`run`].
///
/// `Clone` is implemented by hand for this type and is lossy: only
/// `Validation`, `WorkspaceBusy` and `SchedulerCancelled` clone to the
/// same variant; every other variant is re-materialised as a synthetic
/// `Validation` carrying the original's rendered message. Match on the
/// original value, not on a clone, when the variant matters.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum SyncError {
    /// The pack-tree walker failed (loader error, git error, cycle, …).
    #[error("tree walk failed: {0}")]
    Tree(#[from] TreeError),
    /// One or more plan-phase validators flagged the graph.
    #[error("validation failed: {errors:?}")]
    Validation {
        /// Aggregated errors from manifest-level + graph-level validators.
        errors: Vec<PackValidationError>,
    },
    /// An action executor returned an error.
    ///
    /// Retained for backward compatibility; new call sites should prefer
    /// [`SyncError::Halted`] which carries full pack + action context.
    /// Kept non-deprecated because [`From<ExecError>`] still materialises
    /// the variant for non-sync-loop callers (e.g. ad-hoc helpers).
    #[error("action execution failed: {0}")]
    Exec(#[from] ExecError),
    /// Action execution halted; full context (pack, action index, error,
    /// optional recovery hint) lives in [`HaltedContext`]. This is the
    /// variant the sync driver emits — [`SyncError::Exec`] is only
    /// surfaced by ancillary code paths.
    #[error(
        "sync halted at pack `{}` action #{} ({}): {}",
        .0.pack, .0.action_idx, .0.action_name, .0.error
    )]
    Halted(Box<HaltedContext>),
    /// Another `grex` process (or thread) already holds the workspace-level
    /// lock. The running sync refused to start to avoid racing two concurrent
    /// walkers into the same workspace. If the lock file at `lock_path` is
    /// stale (no other grex is actually running), remove it by hand.
    #[error(
        "workspace `{workspace}` is locked by another grex process (remove {lock_path:?} if stale)"
    )]
    WorkspaceBusy {
        /// Resolved workspace directory that the current run tried to lock.
        workspace: PathBuf,
        /// Sidecar lock file that is currently held.
        lock_path: PathBuf,
    },
    /// Reading or parsing the resolved-state lockfile failed. Surfaced as
    /// its own variant (rather than folded into `Validation`) because a
    /// corrupt / unreadable lockfile is an I/O or schema fault, not a
    /// dependency-satisfaction fault. Resolution is operator-level
    /// (restore a backup, delete the file, re-sync), not author-level.
    #[error("lockfile `{path}` failed to load: {source}")]
    Lockfile {
        /// Lockfile path that failed to load.
        path: PathBuf,
        /// Underlying lockfile error.
        #[source]
        source: LockfileError,
    },
    /// One of the `--only <GLOB>` patterns failed to compile. Surfaced
    /// as its own variant so the CLI can map it to a dedicated usage
    /// error exit code instead of the generic sync-failure bucket.
    #[error("invalid --only glob `{pattern}`: {source}")]
    InvalidOnlyGlob {
        /// The raw pattern string that failed to compile. When the
        /// failure comes from building the combined set rather than from
        /// one pattern, this holds all patterns joined with `,`.
        pattern: String,
        /// Underlying globset error.
        #[source]
        source: globset::Error,
    },
    /// Migrating the v1.x event log (`grex.jsonl`) to the v2 canonical
    /// path (`.grex/events.jsonl`) failed. Operator-level resolution
    /// (check filesystem permissions, free disk space, then retry).
    #[error("event-log migration failed: {0}")]
    EventLogMigration(#[source] crate::manifest::ManifestError),
    /// Cooperative cancellation fired (Ctrl-C / SIGTERM) during a
    /// parallel sync. v1.2.0 Stage 1.g wires the rayon walker to surface
    /// this distinct-from-failure variant so the CLI can exit with a
    /// dedicated cancellation code instead of a generic sync error.
    /// Dormant until Stage 1.g — the existing CLI does not yet emit it.
    #[error("sync cancelled by user")]
    SchedulerCancelled,
}
494
495impl Clone for SyncError {
496    fn clone(&self) -> Self {
497        // `TreeError` / `ExecError` do not implement `Clone` (they wrap
498        // `std::io::Error`-adjacent values). Halts carry only a display
499        // rendering in the report; we re-materialise via a synthetic
500        // `Validation` variant so `SyncReport` can be `Clone`-safe for
501        // observability tooling without widening the taxonomy.
502        match self {
503            Self::Tree(e) => Self::Validation {
504                errors: vec![PackValidationError::DependsOnUnsatisfied {
505                    pack: "<tree>".into(),
506                    required: e.to_string(),
507                }],
508            },
509            Self::Validation { errors } => Self::Validation { errors: errors.clone() },
510            Self::Exec(e) => Self::Validation {
511                errors: vec![PackValidationError::DependsOnUnsatisfied {
512                    pack: "<exec>".into(),
513                    required: e.to_string(),
514                }],
515            },
516            Self::Halted(ctx) => Self::Validation {
517                errors: vec![PackValidationError::DependsOnUnsatisfied {
518                    pack: ctx.pack.clone(),
519                    required: format!(
520                        "action #{} ({}): {}",
521                        ctx.action_idx, ctx.action_name, ctx.error
522                    ),
523                }],
524            },
525            Self::WorkspaceBusy { workspace, lock_path } => {
526                Self::WorkspaceBusy { workspace: workspace.clone(), lock_path: lock_path.clone() }
527            }
528            Self::Lockfile { path, source } => Self::Validation {
529                errors: vec![PackValidationError::DependsOnUnsatisfied {
530                    pack: "<lockfile>".into(),
531                    required: format!("{}: {source}", path.display()),
532                }],
533            },
534            Self::InvalidOnlyGlob { pattern, source } => Self::Validation {
535                errors: vec![PackValidationError::DependsOnUnsatisfied {
536                    pack: "<only-glob>".into(),
537                    required: format!("{pattern}: {source}"),
538                }],
539            },
540            Self::EventLogMigration(source) => Self::Validation {
541                errors: vec![PackValidationError::DependsOnUnsatisfied {
542                    pack: "<event-log-migration>".into(),
543                    required: source.to_string(),
544                }],
545            },
546            Self::SchedulerCancelled => Self::SchedulerCancelled,
547        }
548    }
549}
550
551/// Run a full sync over the pack tree rooted at `pack_root`.
552///
553/// Resolution rules:
554/// * If `pack_root` is a directory the walker looks for
555///   `<pack_root>/.grex/pack.yaml`.
556/// * If `pack_root` ends in `.yaml` / `.yml` it is loaded verbatim.
557/// * Workspace defaults to the pack root directory itself when
558///   `opts.workspace` is `None`. Children resolve as flat siblings of the
559///   parent pack root (since v1.1.0).
560///
561/// # Errors
562///
563/// Returns the first error that halts the pipeline — see [`SyncError`] for
564/// the taxonomy.
565///
566/// `cancel` is the cooperative cancellation handle threaded through the
567/// pipeline by feat-m7-1 stage 2. Stage 2 only wires the parameter; the
568/// `is_cancelled()` polls land in stages 3-4 (scheduler + pack-lock
569/// acquire). CLI callers pass a never-cancelled sentinel
570/// (`CancellationToken::new()`); the MCP server passes a token tied to
571/// the request lifetime.
572pub fn run(
573    pack_root: &Path,
574    opts: &SyncOptions,
575    cancel: &CancellationToken,
576) -> Result<SyncReport, SyncError> {
577    // Stage 2 is signature-only — silence "unused parameter" without
578    // hiding it behind `_` (downstream stages will read it).
579    let _ = cancel;
580    let workspace = prepare_workspace(pack_root, opts)?;
581    // v1.3.1 (B4) — `dry_run = true` is contractually FS-mutation-free.
582    // `open_workspace_lock` (via `ScopedLock::open`) creates a sidecar
583    // file at `<workspace>/.grex.sync.lock`, which would itself violate
584    // the no-FS-mutation contract. Skip lock acquisition entirely in
585    // dry-run; the dry-run path is read-only by construction so
586    // concurrent dry-runs against the same workspace are safe.
587    let mut ws_lock_holder =
588        if !opts.dry_run { Some(open_workspace_lock(&workspace)?) } else { None };
589    let _ws_guard = try_acquire_workspace_guard(ws_lock_holder.as_mut(), &workspace)?;
590
591    // Compile `--only` patterns into a GlobSet here so the
592    // `globset` crate version does not leak into `SyncOptions`.
593    let only_set = compile_only_globset(opts.only_patterns.as_ref())?;
594
595    // Auto-migrate legacy `.grex/workspace/<name>/` layout BEFORE the
596    // walker resolves children. Idempotent: a fresh v1.1.0+ workspace
597    // sees no legacy directory and the function no-ops.
598    let workspace_migrations = migrate_legacy_workspace(pack_root);
599
600    // v1.2.1 path (iii) — three-stage composition:
601    //   sync_meta(workspace, prune_candidates) — mutate (rayon parallel)
602    //   build_graph(workspace)                 — read-only graph
603    //   run_actions(graph)                     — consume graph
604    // `Walker::walk` is retired from the prod path; the symbol is kept
605    // for test-suite compat. See `crates/grex-core/src/tree/graph_build.rs`.
606    run_sync_meta(&workspace, opts)?;
607    let graph = build_and_validate_graph(&workspace, opts.validate, opts.ref_override.as_deref())?;
608    let prep = prepare_run_context(pack_root, &graph, &workspace)?;
609    log_force_flag(opts.force);
610
611    let mut report = SyncReport {
612        graph,
613        steps: Vec::new(),
614        halted: None,
615        event_log_warnings: Vec::new(),
616        pre_run_recovery: prep.pre_run_recovery,
617        workspace_migrations,
618    };
619
620    let mut next_lock = prep.prior_lock.clone();
621    // feat-m6 B1: resolve `--parallel` once and build the scheduler
622    // shared across every `ExecCtx` in this run. Library callers who
623    // leave `opts.parallel == None` default to `num_cpus::get()` here
624    // (clamped `>= 1`) so the scheduler slot is always populated —
625    // `ctx.scheduler` being `None` would strand acquire-sites into
626    // unbounded concurrency. See `.omne/cfg/concurrency.md` §Scheduler.
627    let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
628    let scheduler = Arc::new(Scheduler::new(resolved_parallel));
629    run_actions(
630        &mut report,
631        &prep.order,
632        &prep.vars,
633        &workspace,
634        &prep.event_log,
635        &prep.lock_path,
636        opts.dry_run,
637        &prep.prior_lock,
638        &mut next_lock,
639        &prep.registry,
640        &prep.pack_type_registry,
641        only_set.as_ref(),
642        opts.force,
643        resolved_parallel,
644        &scheduler,
645    );
646
647    persist_lockfile_if_clean(&mut report, &prep.lockfile_path, &next_lock, opts.dry_run);
648    Ok(report)
649}
650
/// Bag of context pieces assembled once at the top of [`run`]. Grouping
/// them keeps [`run`] under the workspace's 50-LOC function lint without
/// smearing the read of sequential setup across helpers. Fields are
/// consumed piecemeal by the actions loop; no getters needed.
///
/// Built by `prepare_run_context`; the per-field notes below state where
/// each value comes from in that function.
struct RunContext {
    /// Traversal order from `post_order(graph)`. Presumably indices of
    /// graph nodes — confirm against `post_order`.
    order: Vec<usize>,
    /// Variable environment from `VarEnv::from_os()`.
    vars: VarEnv,
    /// Event-log path from `event_log_path(pack_root)`.
    event_log: PathBuf,
    /// Lock path for the event log, from `event_lock_path(&event_log)`.
    /// Not to be confused with `lockfile_path` below.
    lock_path: PathBuf,
    /// Resolved-state lockfile path from `lockfile_path(pack_root)`.
    lockfile_path: PathBuf,
    /// Lockfile entries loaded from `lockfile_path` before the run.
    /// Keys are presumably pack names — verify against `load_prior_lock`.
    prior_lock: std::collections::HashMap<String, LockEntry>,
    /// Shared action registry from `Registry::bootstrap()`.
    registry: Arc<Registry>,
    /// Shared pack-type registry from `bootstrap_pack_type_registry()`.
    pack_type_registry: Arc<PackTypeRegistry>,
    /// Result of the pre-run recovery scan; `None` when the scan failed
    /// or came back empty. Moved into [`SyncReport::pre_run_recovery`].
    pre_run_recovery: Option<RecoveryReport>,
}
666
667/// Build the per-run context: traversal order, vars env, event/lockfile
668/// paths, prior lockfile state, bootstrap registry, and (optionally) a
669/// pre-run recovery scan. Kept narrow so [`run`] stays small.
670///
671/// `workspace` is the resolved workspace directory (post `--workspace`
672/// override) so the recovery scan looks for `.grex.bak` artefacts under
673/// the actual on-disk location children were materialised at — not
674/// under the pack root, which differs from the workspace whenever the
675/// CLI's `--workspace` flag is used. Pre-fix this anchoring drift
676/// caused recovery scans to miss every backup left under an override
677/// workspace.
678fn prepare_run_context(
679    pack_root: &Path,
680    graph: &PackGraph,
681    workspace: &Path,
682) -> Result<RunContext, SyncError> {
683    let event_log = event_log_path(pack_root);
684    let lock_path = event_lock_path(&event_log);
685    let vars = VarEnv::from_os();
686    let order = post_order(graph);
687    let pre_run_recovery = scan_recovery(workspace, &event_log).ok().filter(|r| !r.is_empty());
688    let lockfile_path = lockfile_path(pack_root);
689    let prior_lock = load_prior_lock(&lockfile_path)?;
690    let registry = Arc::new(Registry::bootstrap());
691    let pack_type_registry = Arc::new(bootstrap_pack_type_registry());
692    Ok(RunContext {
693        order,
694        vars,
695        event_log,
696        lock_path,
697        lockfile_path,
698        prior_lock,
699        registry,
700        pack_type_registry,
701        pre_run_recovery,
702    })
703}
704
/// Build the [`PackTypeRegistry`] the sync driver threads into every
/// [`ExecCtx`] it constructs.
///
/// Both configurations start from [`PackTypeRegistry::bootstrap`]. With
/// the `plugin-inventory` feature enabled, `register_from_inventory` is
/// then called on that registry to add externally-submitted pack types
/// (mirroring the M4-E pattern for action plugins). Presumably those
/// registrations shadow the built-ins last-writer-wins — confirm against
/// `register_from_inventory`. Kept as a free helper so the `#[cfg]` split
/// lives in one place instead of being smeared across every sync
/// call-site.
fn bootstrap_pack_type_registry() -> PackTypeRegistry {
    // Exactly one of the two blocks below survives `cfg` evaluation and
    // becomes the function's tail expression.
    #[cfg(feature = "plugin-inventory")]
    {
        let mut reg = PackTypeRegistry::bootstrap();
        reg.register_from_inventory();
        reg
    }
    #[cfg(not(feature = "plugin-inventory"))]
    {
        PackTypeRegistry::bootstrap()
    }
}
727
728/// Emit a single `tracing::info!` line when `--force` is active so
729/// operators can confirm from logs that the skip short-circuit was
730/// bypassed. Extracted so [`run`] stays small.
731fn log_force_flag(force: bool) {
732    if force {
733        tracing::info!(
734            target: "grex::sync",
735            "--force active: bypassing lockfile skip-on-hash short-circuit"
736        );
737    }
738}
739
740/// v1.2.1 path (iii) — drive the v1.2.0 [`sync_meta`] walker over the
741/// resolved canonical workspace.
742///
743/// This is the SOLE mutating pass in `sync::run`: clones, fetches,
744/// prune dispatches, distributed-lockfile reads, and TOCTOU `BoundedDir`
745/// opens all happen here. The subsequent [`build_and_validate_graph`]
746/// pass is read-only against the disk state this fn leaves behind.
747///
748/// `prune_candidates` is computed from the per-meta lockfile orphans:
749/// every entry in `<workspace>/.grex/grex.lock.jsonl` whose `path` no
750/// longer appears in the live root manifest's `children[]` is fed into
751/// Phase 2 for dispatch (with `--force-prune` / `--force-prune-with-ignored`
752/// overrides honoured by the consent walk). This closes the
753/// "prune-inert" gap from the previous wiring, where `sync::run` passed
754/// `&[]` and `--force-prune` was a CLI flag with no behavioural reach.
755///
756/// `--workspace` semantics: the canonical `workspace` argument is what
757/// `sync_meta` uses as its `meta_dir`. Children land at
758/// `<workspace>/<child.path>` — the v1.2.0 parent-relative model. Prior
759/// to v1.2.1, callers passing `--workspace` skipped the precursor
760/// entirely; that bypass is retired here so override callers see the
761/// same v1.2.0 semantics as the default-cwd path.
762///
763/// `SyncOptions::parallel` mapping (mirrors [`SyncMetaOptions::parallel`]
764/// with the documented `Some(0)` carve-out):
765/// * `None` → `SyncMetaOptions::parallel = None` (rayon default =
766///   `num_cpus::get()`).
767/// * `Some(0)` → `SyncMetaOptions::parallel = None` (the CLI sentinel
768///   for "unbounded" maps to rayon's default; `Some(0)` would be
769///   clamped to `1` inside `build_pool`, which is not what callers
770///   asking for unbounded want).
771/// * `Some(n)` for `n >= 1` → `SyncMetaOptions::parallel = Some(n)`.
772fn run_sync_meta(workspace: &Path, opts: &SyncOptions) -> Result<(), SyncError> {
773    let loader = FsPackLoader::new();
774    let backend = GixBackend::new();
775    let parallel = match opts.parallel {
776        None | Some(0) => None,
777        Some(n) => Some(n),
778    };
779    // v1.2.1 Item 5b — resolve the quarantine config relative to the
780    // canonical workspace (the same `meta_dir` `sync_meta` runs on).
781    // Trash bucket lives at `<workspace>/.grex/trash/`; audit log at
782    // `<workspace>/.grex/events.jsonl` — same path the existing
783    // `ForcePruneExecuted` event uses.
784    let quarantine = opts.quarantine.then(|| crate::tree::QuarantineConfig {
785        trash_root: workspace.join(".grex").join("trash"),
786        audit_log: crate::manifest::event_log_path(workspace),
787    });
788    // v1.2.5 — thread `--retain-days N` into the per-meta options so
789    // every recursion frame swept its own trash bucket. `None` skips
790    // the GC entirely (v1.2.1 indefinite-retention).
791    let retention =
792        opts.retain_days.map(|retain_days| crate::tree::RetentionConfig { retain_days });
793    let meta_opts = SyncMetaOptions {
794        ref_override: opts.ref_override.clone(),
795        recurse: opts.recurse,
796        max_depth: opts.max_depth,
797        force_prune: opts.force_prune,
798        force_prune_with_ignored: opts.force_prune_with_ignored,
799        parallel,
800        quarantine,
801        retention,
802        // v1.3.1 (B4) — propagate the orchestrator's dry-run flag into
803        // the walker so Phase 1 skips clone/fetch and emits the
804        // would-clone records into `SyncMetaReport::dry_run_would_clone`
805        // instead. The orchestrator already gates lockfile persist via
806        // `persist_lockfile_if_clean`; this wires the walker side.
807        dry_run: opts.dry_run,
808    };
809    let prune_candidates = compute_prune_candidates(workspace, &loader);
810    let report = sync_meta(workspace, &backend, &loader, &meta_opts, &prune_candidates)?;
811    if let Some(first) = report.errors.into_iter().next() {
812        return Err(SyncError::Tree(first));
813    }
814    Ok(())
815}
816
817/// v1.2.1 path (iii) — orphan-prune candidate computation.
818///
819/// Reads `<workspace>/.grex/grex.lock.jsonl` and the root manifest;
820/// returns every lockfile entry path that no longer matches a declared
821/// child in `manifest.children`. Empty in three cases:
822///
823/// * No lockfile (fresh workspace, never synced).
824/// * No manifest at `<workspace>/.grex/pack.yaml` (single-node tree —
825///   `sync_meta` will surface its own diagnostic).
826/// * Lockfile entries are all still declared (steady-state sync).
827///
828/// Lockfile read errors are tolerated as `Vec::new()`: the prune pass
829/// is opportunistic, and a corrupt lockfile is the migrator's concern,
830/// not the prune dispatcher's. Manifest read errors are similarly
831/// tolerated — `sync_meta` will fail loudly on the same condition,
832/// giving the operator a single unambiguous error surface.
833fn compute_prune_candidates(
834    workspace: &Path,
835    loader: &dyn crate::tree::PackLoader,
836) -> Vec<PathBuf> {
837    use crate::lockfile::read_meta_lockfile;
838    let entries = match read_meta_lockfile(workspace) {
839        Ok(e) => e,
840        Err(_) => return Vec::new(),
841    };
842    if entries.is_empty() {
843        return Vec::new();
844    }
845    let manifest = match loader.load(workspace) {
846        Ok(m) => m,
847        Err(_) => return Vec::new(),
848    };
849    let declared: std::collections::HashSet<String> =
850        manifest.children.iter().map(crate::pack::ChildRef::effective_path).collect();
851    entries
852        .into_iter()
853        .filter(|e| !declared.contains(&e.path))
854        .map(|e| PathBuf::from(e.path))
855        .collect()
856}
857
858/// v1.2.1 path (iii) — read-only graph build + plan-phase validation.
859///
860/// Builds the [`PackGraph`] from the on-disk meta tree rooted at
861/// `workspace`. Replaces the legacy `walk_and_validate` (which used
862/// [`crate::tree::Walker::walk`] and re-issued every clone/fetch as a
863/// no-op probe) with the v1.2.1 split:
864///
865/// * The mutating half ran in [`run_sync_meta`] — all clones, fetches,
866///   prune dispatches, and TOCTOU `BoundedDir` opens already happened.
867/// * THIS pass is strictly READ-ONLY. It walks the manifest tree
868///   parent-relatively (matching what `sync_meta` placed on disk),
869///   loads each child's `pack.yaml` (or synthesises a plain-git leaf),
870///   probes `head_sha`, and produces the [`PackGraph`] consumed by
871///   [`run_actions`].
872///
873/// Plan-phase validators run against the assembled graph when
874/// `validate` is true.
875fn build_and_validate_graph(
876    workspace: &Path,
877    validate: bool,
878    ref_override: Option<&str>,
879) -> Result<PackGraph, SyncError> {
880    let loader = FsPackLoader::new();
881    let backend = GixBackend::new();
882    let graph = build_graph(workspace, &backend, &loader, ref_override)?;
883    if validate {
884        validate_graph(&graph)?;
885    }
886    Ok(graph)
887}
888
889/// Load the prior lockfile (`grex.lock.jsonl`). Missing file yields an
890/// empty map; parse errors are fatal since writes are atomic and a torn
891/// lockfile therefore indicates real corruption that must be resolved
892/// before a fresh sync is safe. Parse/IO failures surface as
893/// [`SyncError::Lockfile`] — this is an I/O / schema fault, not a
894/// dependency-satisfaction fault, so it gets its own taxonomy slot.
895fn load_prior_lock(
896    lockfile_path: &Path,
897) -> Result<std::collections::HashMap<String, LockEntry>, SyncError> {
898    read_lockfile(lockfile_path)
899        .map_err(|source| SyncError::Lockfile { path: lockfile_path.to_path_buf(), source })
900}
901
902/// Persist `next_lock` atomically to `lockfile_path` whenever this was
903/// not a dry-run. On a halt the map has already had the halted pack's
904/// entry removed (see `run_actions`), so persisting now preserves every
905/// *successful* pack's fresh entry while guaranteeing absence of an
906/// entry for the halted pack — next sync sees no prior hash there and
907/// re-executes from scratch (route (b) halt-state gating). Write errors
908/// surface as non-fatal warnings on the report.
909fn persist_lockfile_if_clean(
910    report: &mut SyncReport,
911    lockfile_path: &Path,
912    next_lock: &std::collections::HashMap<String, LockEntry>,
913    dry_run: bool,
914) {
915    if dry_run {
916        return;
917    }
918    if let Err(e) = write_lockfile(lockfile_path, next_lock) {
919        tracing::warn!(target: "grex::sync", "lockfile write failed: {e}");
920        report.event_log_warnings.push(format!("{}: {e}", lockfile_path.display()));
921    }
922}
923
924/// Canonical location of the resolved-state lockfile
925/// (`<pack_root>/.grex/grex.lock.jsonl`). Colocated with the event log
926/// so both audit artifacts live under a single `.grex/` sidecar.
927fn lockfile_path(pack_root: &Path) -> PathBuf {
928    pack_root_dir(pack_root).join(".grex").join("grex.lock.jsonl")
929}
930
931/// Create the workspace directory if it does not yet exist.
932fn ensure_workspace_dir(workspace: &Path) -> Result<(), SyncError> {
933    if !workspace.exists() {
934        std::fs::create_dir_all(workspace).map_err(|e| SyncError::Validation {
935            errors: vec![PackValidationError::DependsOnUnsatisfied {
936                pack: "<workspace>".into(),
937                required: format!("{}: {e}", workspace.display()),
938            }],
939        })?;
940    }
941    Ok(())
942}
943
944/// Open (but do not acquire) the workspace-level lock file.
945fn open_workspace_lock(workspace: &Path) -> Result<(ScopedLock, PathBuf), SyncError> {
946    let ws_lock_path = workspace_lock_path(workspace);
947    let ws_lock = ScopedLock::open(&ws_lock_path)
948        .map_err(|e| workspace_lock_err(&ws_lock_path, &e.to_string()))?;
949    Ok((ws_lock, ws_lock_path))
950}
951
952/// Try-acquire the workspace lock guard when the holder is `Some`.
953/// Returns `Ok(None)` when the holder is `None` (e.g. dry-run path skips
954/// lock acquisition entirely; see Blocker B4 v1.3.1). Translates the
955/// busy/error outcomes into the shared [`SyncError`] taxonomy. Extracted
956/// from [`run`] / [`teardown`] to keep both verb entry-points under the
957/// `clippy::too-many-lines` limit while preserving the original lock
958/// semantics.
959fn try_acquire_workspace_guard<'a>(
960    holder: Option<&'a mut (ScopedLock, PathBuf)>,
961    workspace: &Path,
962) -> Result<Option<fd_lock::RwLockWriteGuard<'a, std::fs::File>>, SyncError> {
963    let Some((ws_lock, ws_lock_path)) = holder else {
964        return Ok(None);
965    };
966    match ws_lock.try_acquire() {
967        Ok(Some(g)) => Ok(Some(g)),
968        Ok(None) => Err(SyncError::WorkspaceBusy {
969            workspace: workspace.to_path_buf(),
970            lock_path: ws_lock_path.clone(),
971        }),
972        Err(e) => Err(workspace_lock_err(ws_lock_path, &e.to_string())),
973    }
974}
975
976/// Build a `Validation` error describing a workspace-lock failure.
977fn workspace_lock_err(ws_lock_path: &Path, reason: &str) -> SyncError {
978    SyncError::Validation {
979        errors: vec![PackValidationError::DependsOnUnsatisfied {
980            pack: "<workspace-lock>".into(),
981            required: format!("{}: {reason}", ws_lock_path.display()),
982        }],
983    }
984}
985
/// Single source of truth for the legacy workspace directory name,
/// expressed relative to the pack-root directory.
///
/// Pre-`v1.1.0` `resolve_workspace` joined `.grex/workspace/` onto the
/// pack root by default; the auto-migration in
/// [`migrate_legacy_workspace`] is the only place that legacy literal
/// is allowed to appear in `crates/grex-core/src/`. The grep gate in
/// the v1.1.0 release checklist allows this one constant.
///
/// [`migrate_legacy_workspace`] joins this value onto the pack-root
/// directory to locate the legacy tree, and again when building the
/// relative `from` path recorded for each migrated child.
const LEGACY_WORKSPACE_DIR: &str = ".grex/workspace";
993
994/// Auto-migrate any legacy `.grex/workspace/<name>/` child layout left
995/// over from v1.0.x to the v1.1.0 flat-sibling layout. Idempotent: a
996/// fresh workspace built on v1.1.0+ sees no `.grex/workspace/`
997/// directory and the function no-ops.
998///
999/// Per-child outcomes:
1000///
1001/// * **Both legacy + flat-sibling exist** → `SkippedBothExist`. The
1002///   user needs to inspect (perhaps the legacy is stale, perhaps it is
1003///   the source of truth); we never silently delete either.
1004/// * **Flat-sibling slot occupied by a non-grex file or non-empty dir**
1005///   → `SkippedDestOccupied`. Refuse to clobber user data.
1006/// * **Legacy exists, flat-sibling absent** → `Migrated` via atomic
1007///   `fs::rename`. Same-volume move is the common case (the migration
1008///   stays inside `pack_root`); cross-volume failures surface as
1009///   `Failed { error }` with the OS message so the operator can move
1010///   manually.
1011/// * **Legacy absent** → silent no-op (not recorded in the report).
1012///
1013/// After all per-child decisions: orphan `.grex.sync.lock` under the
1014/// legacy workspace is removed (best-effort) and the empty
1015/// `.grex/workspace/` directory is rmdir'd (best-effort). Both are
1016/// soft-failures: leaving them on disk is harmless, surfacing the
1017/// errors as a sync abort would be over-strict.
1018///
1019/// Discovery is by directory listing, not by parent-manifest parse —
1020/// migration must work even when the parent manifest itself was
1021/// rewritten between versions. A child counts as "legacy" iff
1022/// `<pack_root>/<LEGACY_WORKSPACE_DIR>/<name>/.git` exists (i.e. it is
1023/// an actual git working tree, not stray metadata).
1024fn migrate_legacy_workspace(pack_root: &Path) -> Vec<WorkspaceMigration> {
1025    let root = pack_root_dir(pack_root);
1026    let legacy_root = root.join(LEGACY_WORKSPACE_DIR);
1027    if !legacy_root.is_dir() {
1028        return Vec::new();
1029    }
1030    let entries = match fs::read_dir(&legacy_root) {
1031        Ok(e) => e,
1032        Err(e) => {
1033            tracing::warn!(
1034                target: "grex::sync::migrate",
1035                "legacy workspace `{}` unreadable: {e}",
1036                legacy_root.display(),
1037            );
1038            return Vec::new();
1039        }
1040    };
1041    let mut migrations = Vec::new();
1042    for entry_result in entries {
1043        let entry = match entry_result {
1044            Ok(e) => e,
1045            Err(e) => {
1046                tracing::warn!(
1047                    target: "grex::sync::migrate",
1048                    "skipping unreadable entry under `{}`: {e}",
1049                    legacy_root.display(),
1050                );
1051                continue;
1052            }
1053        };
1054        let Ok(ft) = entry.file_type() else { continue };
1055        // file_type avoids symlink-following; legitimate v1.0.x children
1056        // were always real directories, so anything else is skipped.
1057        if ft.is_symlink() || !ft.is_dir() {
1058            continue;
1059        }
1060        let name_os = entry.file_name();
1061        let Some(name) = name_os.to_str() else { continue };
1062        // Only act on entries that look like real cloned children (have
1063        // a `.git`). The legacy workspace lock file (`.grex.sync.lock`)
1064        // is not a directory and is filtered out by the dir check above;
1065        // we clean it up explicitly after the migration loop completes.
1066        let from_abs = entry.path();
1067        if !from_abs.join(".git").exists() {
1068            continue;
1069        }
1070        let to_abs = root.join(name);
1071        let from_rel = PathBuf::from(LEGACY_WORKSPACE_DIR).join(name);
1072        let to_rel = PathBuf::from(name);
1073        let outcome = decide_and_migrate(&from_abs, &to_abs);
1074        log_migration(&from_rel, &to_rel, &outcome);
1075        migrations.push(WorkspaceMigration { from: from_rel, to: to_rel, outcome });
1076    }
1077    cleanup_legacy_workspace_root(&legacy_root);
1078    migrations
1079}
1080
1081/// Decide what to do with one legacy child + perform the move when
1082/// safe. Returns the outcome to record on the [`WorkspaceMigration`].
1083fn decide_and_migrate(from: &Path, to: &Path) -> MigrationOutcome {
1084    let dest_exists = to.exists();
1085    let dest_is_grex_repo = dest_exists && to.join(".git").exists();
1086    if dest_is_grex_repo {
1087        // Both legacy and flat-sibling are git repos. Refuse to choose
1088        // between them; let the user resolve.
1089        return MigrationOutcome::SkippedBothExist;
1090    }
1091    if dest_exists {
1092        // Some other entry occupies the flat-sibling slot — a stray
1093        // file, an empty dir, an unrelated dir. Treat as user data and
1094        // leave both in place.
1095        return MigrationOutcome::SkippedDestOccupied;
1096    }
1097    match fs::rename(from, to) {
1098        Ok(()) => MigrationOutcome::Migrated,
1099        Err(e) => MigrationOutcome::Failed { error: e.to_string() },
1100    }
1101}
1102
1103/// Emit one structured log line per migration so users see exactly what
1104/// happened during the upgrade. Severity matches outcome: success is
1105/// `info`, skips and failures are `warn` so they surface in the default
1106/// log level without forcing operators to crank verbosity.
1107fn log_migration(from: &Path, to: &Path, outcome: &MigrationOutcome) {
1108    let from_disp = from.display();
1109    let to_disp = to.display();
1110    match outcome {
1111        MigrationOutcome::Migrated => {
1112            tracing::info!(
1113                target: "grex::sync::migrate",
1114                "migrated: legacy={from_disp} -> new={to_disp}",
1115            );
1116        }
1117        MigrationOutcome::SkippedBothExist => {
1118            tracing::warn!(
1119                target: "grex::sync::migrate",
1120                "skipped: both legacy={from_disp} and new={to_disp} exist; resolve manually",
1121            );
1122        }
1123        MigrationOutcome::SkippedDestOccupied => {
1124            tracing::warn!(
1125                target: "grex::sync::migrate",
1126                "skipped: destination={to_disp} occupied; leaving legacy={from_disp} in place",
1127            );
1128        }
1129        MigrationOutcome::Failed { error } => {
1130            tracing::warn!(
1131                target: "grex::sync::migrate",
1132                "failed: legacy={from_disp} -> new={to_disp}: {error}",
1133            );
1134        }
1135    }
1136}
1137
1138/// Best-effort cleanup of the legacy workspace root after migration:
1139/// remove the orphan `.grex.sync.lock` (always safe — the v1.1.0
1140/// workspace lock lives at `<pack_root>/.grex.sync.lock`) and try to
1141/// rmdir the now-empty `.grex/workspace/` directory. Errors are logged
1142/// at trace level only — both leftovers are harmless.
1143fn cleanup_legacy_workspace_root(legacy_root: &Path) {
1144    let orphan_lock = legacy_root.join(".grex.sync.lock");
1145    if orphan_lock.exists() {
1146        if let Err(e) = fs::remove_file(&orphan_lock) {
1147            tracing::warn!(
1148                target: "grex::sync::migrate",
1149                "could not remove orphan lock `{}`: {e}",
1150                orphan_lock.display(),
1151            );
1152        } else {
1153            tracing::info!(
1154                target: "grex::sync::migrate",
1155                "removed orphan lock `{}`",
1156                orphan_lock.display(),
1157            );
1158        }
1159    }
1160    // `remove_dir` only succeeds when the directory is empty — exactly
1161    // what we want; if any unmigrated child remains, the legacy root
1162    // stays put for the operator to inspect.
1163    let _ = fs::remove_dir(legacy_root);
1164}
1165
1166/// Compute the default workspace path when `override_` is absent.
1167///
1168/// The default is the pack root directory itself, so child packs
1169/// resolve as flat siblings of the parent pack root. The rationale —
1170/// alignment with the long-standing pack-spec rule that
1171/// `children[].path` is a bare name — lives in the pack-spec
1172/// "Validation rules" section (`man/concepts/pack-spec.md` /
1173/// `grex-doc/src/concepts/pack-spec.md`).
1174/// v1.2.1 path (iii) — resolve the workspace anchor with canonical
1175/// symlink resolution.
1176///
1177/// Resolution rules:
1178/// * `override_ = None` ⇒ derive workspace from `pack_root_dir(pack_root)`.
1179///   No canonicalize on this branch — the pack-root path was supplied
1180///   directly by the caller and may legitimately reference a not-yet-real
1181///   directory (e.g. integration fixtures that lazily materialise the
1182///   pack root).
1183/// * `override_ = Some(path)`:
1184///   1. **Must-exist** check. A `--workspace` override pointing at a
1185///      non-existent directory is a fail-fast error (we won't silently
1186///      `mkdir -p` someone else's typo).
1187///   2. **Canonicalise.** Resolve symlinks to a real path. This is the
1188///      anchor every downstream pass (`sync_meta`, `build_graph`, the
1189///      lockfile reads, the TOCTOU `BoundedDir` opens) hangs off — they
1190///      MUST agree on a single inode-stable string.
1191///   3. **Log when input != canonical.** Surfaces symlink resolution to
1192///      operators so they can correlate workspace-busy diagnostics with
1193///      what the OS actually opened.
1194fn resolve_workspace(pack_root: &Path, override_: Option<&Path>) -> Result<PathBuf, SyncError> {
1195    let Some(input) = override_ else {
1196        return Ok(pack_root_dir(pack_root));
1197    };
1198    if !input.exists() {
1199        return Err(SyncError::Validation {
1200            errors: vec![PackValidationError::DependsOnUnsatisfied {
1201                pack: "<workspace>".into(),
1202                required: format!("--workspace {}: directory does not exist", input.display()),
1203            }],
1204        });
1205    }
1206    let canonical = match input.canonicalize() {
1207        Ok(p) => p,
1208        Err(e) => {
1209            return Err(SyncError::Validation {
1210                errors: vec![PackValidationError::DependsOnUnsatisfied {
1211                    pack: "<workspace>".into(),
1212                    required: format!("--workspace {}: canonicalize failed: {e}", input.display()),
1213                }],
1214            });
1215        }
1216    };
1217    if canonical != input {
1218        tracing::info!(
1219            target: "grex::sync",
1220            "workspace: {} → {}",
1221            input.display(),
1222            canonical.display(),
1223        );
1224    }
1225    Ok(canonical)
1226}
1227
1228/// Resolve the workspace, ensure the directory exists, and run the v1→v2
1229/// event-log migration. Extracted so [`run`] and [`teardown`] stay under
1230/// the workspace's 50-LOC per-function lint threshold.
1231fn prepare_workspace(pack_root: &Path, opts: &SyncOptions) -> Result<PathBuf, SyncError> {
1232    let workspace = resolve_workspace(pack_root, opts.workspace.as_deref())?;
1233    ensure_workspace_dir(&workspace)?;
1234    crate::manifest::ensure_event_log_migrated(&workspace).map_err(SyncError::EventLogMigration)?;
1235    Ok(workspace)
1236}
1237
/// Resolve the pack-root *directory* from a `pack_root` argument that may
/// name either that directory or a YAML manifest inside it.
///
/// * A path whose extension is exactly `yaml` or `yml` is treated as a
///   manifest file and its **grandparent** is returned — the manifest is
///   expected at `<root>/.grex/pack.yaml`, so two levels up is `<root>`.
/// * Any other path is returned unchanged.
///
/// When the grandparent is absent *or empty* (relative inputs such as
/// `pack.yaml`, `foo/pack.yaml` or `.grex/pack.yaml`), `"."` is returned.
/// An empty `PathBuf` is never produced: it would display as nothing in
/// diagnostics and is rejected by filesystem calls such as
/// `canonicalize`.
fn pack_root_dir(pack_root: &Path) -> PathBuf {
    let is_yaml = matches!(pack_root.extension().and_then(|e| e.to_str()), Some("yaml" | "yml"));
    if !is_yaml {
        return pack_root.to_path_buf();
    }
    match pack_root.parent().and_then(Path::parent) {
        // `Path::parent` yields `Some("")` for a single relative
        // component, so emptiness has to be checked explicitly.
        Some(dir) if !dir.as_os_str().is_empty() => dir.to_path_buf(),
        _ => PathBuf::from("."),
    }
}
1250
1251/// Compute the `.grex/events.jsonl` path next to the pack root.
1252///
1253/// Delegates to [`crate::manifest::event_log_path`] (single source of
1254/// truth for the canonical event-log location).
1255fn event_log_path(pack_root: &Path) -> PathBuf {
1256    crate::manifest::event_log_path(&pack_root_dir(pack_root))
1257}
1258
/// Path of the `.grex.lock` sidecar that accompanies the event log.
///
/// The lock file is placed in the event log's parent directory; when
/// `event_log` has no parent the bare relative name `.grex.lock` is
/// returned.
fn event_lock_path(event_log: &Path) -> PathBuf {
    const LOCK_FILE: &str = ".grex.lock";
    match event_log.parent() {
        Some(dir) => dir.join(LOCK_FILE),
        None => PathBuf::from(LOCK_FILE),
    }
}
1264
/// Path of the workspace-level lock sidecar:
/// `<workspace>/.grex.sync.lock`. The file lives directly inside the
/// workspace directory, beside the child clones.
fn workspace_lock_path(workspace: &Path) -> PathBuf {
    let mut lock_path = workspace.to_path_buf();
    lock_path.push(".grex.sync.lock");
    lock_path
}
1271
1272/// Aggregate manifest-level + graph-level validators and return their output.
1273fn validate_graph(graph: &PackGraph) -> Result<(), SyncError> {
1274    let mut errors: Vec<PackValidationError> = Vec::new();
1275    for node in graph.nodes() {
1276        if let Err(mut e) = node.manifest.validate_plan() {
1277            errors.append(&mut e);
1278        }
1279    }
1280    if let Err(mut e) = graph.validate() {
1281        errors.append(&mut e);
1282    }
1283    if errors.is_empty() {
1284        Ok(())
1285    } else {
1286        Err(SyncError::Validation { errors })
1287    }
1288}
1289
1290/// Depth-first post-order traversal of the graph starting from root.
1291///
1292/// Children fully precede their parent in the returned vector so downstream
1293/// executors install leaves first and the root last.
1294fn post_order(graph: &PackGraph) -> Vec<usize> {
1295    let mut out = Vec::with_capacity(graph.nodes().len());
1296    visit_post(graph, 0, &mut out);
1297    out
1298}
1299
1300fn visit_post(graph: &PackGraph, id: usize, out: &mut Vec<usize>) {
1301    // Collect child ids first to avoid borrow conflicts with graph iteration.
1302    let kids: Vec<usize> = graph.children_of(id).map(|n| n.id).collect();
1303    for k in kids {
1304        visit_post(graph, k, out);
1305    }
1306    out.push(id);
1307}
1308
1309/// Drive every action for every node; abort on the first [`ExecError`].
1310///
1311/// Each action is bracketed by three manifest events:
1312/// 1. [`Event::ActionStarted`] — appended **before** `execute` returns.
1313/// 2. [`Event::ActionCompleted`] — appended on `Ok(step)`.
1314/// 3. [`Event::ActionHalted`] — appended on `Err(e)` before returning.
1315///
1316/// All three writes go through the same [`ManifestLock`]-wrapped path
1317/// ([`append_manifest_event`]) and failures are recorded as non-fatal
1318/// warnings so the executor's outcome always dominates. The third append
1319/// (`ActionHalted`) lets a future `grex doctor` correlate crash recovery
1320/// with the exact action that halted.
1321// feat-m6 B1 wiring added `parallel` + `scheduler` args; the signature
1322// now pushes past the 50-LOC per-function lint by one line. Silence
1323// that one — the body itself is unchanged in scope.
1324#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1325fn run_actions(
1326    report: &mut SyncReport,
1327    order: &[usize],
1328    vars: &VarEnv,
1329    workspace: &Path,
1330    event_log: &Path,
1331    lock_path: &Path,
1332    dry_run: bool,
1333    prior_lock: &std::collections::HashMap<String, LockEntry>,
1334    next_lock: &mut std::collections::HashMap<String, LockEntry>,
1335    registry: &Arc<Registry>,
1336    pack_type_registry: &Arc<PackTypeRegistry>,
1337    only: Option<&GlobSet>,
1338    force: bool,
1339    parallel: usize,
1340    scheduler: &Arc<Scheduler>,
1341) {
1342    let plan = PlanExecutor::with_registry(registry.clone());
1343    let fs = FsExecutor::with_registry(registry.clone());
1344    let rt = build_pack_type_runtime(parallel);
1345    let visited_meta = new_visited_meta();
1346    for &id in order {
1347        let Some(node) = report.graph.node(id) else { continue };
1348        let pack_name = node.name.clone();
1349        let pack_path = node.path.clone();
1350        let actions = node.manifest.actions.clone();
1351        let manifest = node.manifest.clone();
1352        let commit_sha = node.commit_sha.clone().unwrap_or_default();
1353        let synthetic = node.synthetic;
1354        // v1.3.1 B14: parent manifest's `ref:` value for this node,
1355        // captured by the walker. Threaded into `upsert_lock_entry`
1356        // so the lockfile `branch` slot mirrors the manifest verbatim.
1357        let manifest_ref = node.manifest_ref.clone();
1358        // `--only` filter + skip-on-hash short-circuits colocated in
1359        // `try_skip_or_filter` so this outer loop stays within the
1360        // 50-LOC per-function budget.
1361        if try_skip_or_filter(
1362            report,
1363            only,
1364            &pack_name,
1365            &pack_path,
1366            &actions,
1367            &commit_sha,
1368            synthetic,
1369            workspace,
1370            prior_lock,
1371            next_lock,
1372            dry_run,
1373            force,
1374        ) {
1375            continue;
1376        }
1377        let pack_halted = run_pack_lifecycle(
1378            report,
1379            vars,
1380            workspace,
1381            event_log,
1382            lock_path,
1383            dry_run,
1384            &plan,
1385            &fs,
1386            registry,
1387            pack_type_registry,
1388            &rt,
1389            &pack_name,
1390            &pack_path,
1391            &manifest,
1392            &visited_meta,
1393            scheduler,
1394        );
1395        if pack_halted {
1396            // Route (b) halt-state gating: drop any prior entry for the
1397            // halted pack so the next sync sees no prior hash and
1398            // re-executes from scratch. Successful packs in this same
1399            // run keep their freshly-upserted entries, and packs we did
1400            // not reach keep their prior entries untouched.
1401            next_lock.remove(&pack_name);
1402            return;
1403        }
1404        // Successful pack — record a fresh lockfile entry so the next
1405        // run's skip-on-hash test can succeed. Commit SHA is now plumbed
1406        // from the walker (M4-D): `PackNode::commit_sha` carries the
1407        // resolved HEAD SHA when the pack's working tree is a git
1408        // repository, otherwise an empty string keeps the hash stable.
1409        let actions_hash = compute_actions_hash(&actions, &commit_sha);
1410        upsert_lock_entry(
1411            prior_lock,
1412            next_lock,
1413            &pack_name,
1414            &commit_sha,
1415            &actions_hash,
1416            synthetic,
1417            manifest_ref.as_deref(),
1418        );
1419    }
1420}
1421
1422/// Build the multi-thread tokio runtime used to drive async pack-type
1423/// plugin dispatch. Pack-type plugins expose `async fn` methods via
1424/// `async_trait`, but the sync driver is synchronous end-to-end — we
1425/// block on each plugin future inside the outer action loop. Extracted
1426/// into a standalone helper so the runtime construction does not
1427/// inflate `run_actions` beyond the 50-LOC per-function budget.
1428///
1429/// # Multi-thread rationale (M5-2c)
1430///
1431/// M5-2c enabled real [`crate::plugin::pack_type::MetaPlugin`] recursion
1432/// through [`crate::execute::ExecCtx::pack_type_registry`]. The recursion
1433/// itself is purely `async` / `.await` (no nested `block_on`), but future
1434/// plugin authors may reasonably compose `block_on` calls inside
1435/// lifecycle hooks — and external callers that drive `MetaPlugin` via
1436/// `rt.block_on(...)` within their own runtime would deadlock on a
1437/// current-thread runtime the moment a hook re-enters. A multi-thread
1438/// runtime with a small worker pool lets those re-entries resolve on a
1439/// sibling worker instead of blocking the dispatcher thread.
1440///
1441/// # Worker-thread sizing (feat-m6 H6)
1442///
1443/// The worker pool is sized from the resolved `--parallel` knob so the
1444/// runtime always has enough workers to service every in-flight pack op
1445/// plus at least one sibling for nested `block_on`. Clamped to
1446/// `[2, num_cpus::get()]`: `2` preserves the pre-M6 floor (one driver +
1447/// one sibling so re-entrant hooks never deadlock), and the upper bound
1448/// caps the pool at the host's CPU count so `--parallel 0`
1449/// (unbounded-semantics) does not explode the worker count.
1450fn build_pack_type_runtime(parallel: usize) -> tokio::runtime::Runtime {
1451    let workers = parallel.clamp(2, num_cpus::get().max(2));
1452    tokio::runtime::Builder::new_multi_thread()
1453        .worker_threads(workers)
1454        .enable_all()
1455        .build()
1456        .expect("tokio runtime for pack-type dispatch")
1457}
1458
1459/// Construct a fresh [`MetaVisitedSet`] for one sync run. Walker-driven
1460/// dispatch does not attach it (see `dispatch_pack_type_plugin`), but
1461/// the argument is threaded through so future explicit-install /
1462/// teardown verbs can share the same set shape.
1463fn new_visited_meta() -> MetaVisitedSet {
1464    std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashSet::new()))
1465}
1466
1467/// Combined short-circuit helper: `--only` filter + skip-on-hash. Returns
1468/// `true` when the outer loop should `continue` for this pack.
1469///
1470/// Extracted from `run_actions` so that function stays under the
1471/// workspace's 50-LOC per-function lint. Semantics are unchanged; this
1472/// is a pure structural refactor.
1473#[allow(clippy::too_many_arguments)]
1474fn try_skip_or_filter(
1475    report: &mut SyncReport,
1476    only: Option<&GlobSet>,
1477    pack_name: &str,
1478    pack_path: &Path,
1479    actions: &[Action],
1480    commit_sha: &str,
1481    current_synthetic: bool,
1482    workspace: &Path,
1483    prior_lock: &std::collections::HashMap<String, LockEntry>,
1484    next_lock: &mut std::collections::HashMap<String, LockEntry>,
1485    dry_run: bool,
1486    force: bool,
1487) -> bool {
1488    if skip_for_only_filter(only, pack_name, pack_path, workspace) {
1489        if let Some(prev) = prior_lock.get(pack_name) {
1490            next_lock.insert(pack_name.to_string(), prev.clone());
1491        }
1492        return true;
1493    }
1494    try_skip_pack(
1495        report,
1496        pack_name,
1497        pack_path,
1498        actions,
1499        commit_sha,
1500        current_synthetic,
1501        prior_lock,
1502        next_lock,
1503        dry_run,
1504        force,
1505    )
1506}
1507
1508/// Return `true` when `--only` is active and the pack's
1509/// **workspace-relative path** (normalized to forward-slash form) does
1510/// not match any of the registered globs. Name-fallback matching was
1511/// dropped in the M4-D post-review fix bundle: spec §M4 req 6 says
1512/// "pack paths" and cross-platform consistency requires a single
1513/// normalized representation rather than `display()`-formatted strings
1514/// (which use `\\` on Windows and `/` on POSIX — globset treats `\\`
1515/// as a glob-escape, not a path separator). For the root pack whose
1516/// `pack_path` is not under `workspace`, the fallback is to match
1517/// against the absolute path's forward-slash form.
1518fn skip_for_only_filter(
1519    only: Option<&GlobSet>,
1520    pack_name: &str,
1521    pack_path: &Path,
1522    workspace: &Path,
1523) -> bool {
1524    let Some(set) = only else { return false };
1525    let rel = pack_path.strip_prefix(workspace).unwrap_or(pack_path);
1526    let rel_str = rel.to_string_lossy().replace('\\', "/");
1527    let matches = set.is_match(&rel_str);
1528    if !matches {
1529        tracing::info!(
1530            target: "grex::sync",
1531            "skipping pack `{pack_name}` (rel path `{rel_str}`): does not match --only filter"
1532        );
1533    }
1534    !matches
1535}
1536
1537/// Per-pack lifecycle dispatch. Returns `true` when the sync must halt.
1538///
1539/// M5-1 Stage C replaces the blind `for action in manifest.actions` loop
1540/// with a pack-type-aware dispatch:
1541///
1542/// * [`PackType::Declarative`] retains the per-action execution shape that
1543///   M4 shipped — each action lands its own `ActionStarted` /
1544///   `ActionCompleted` / `ActionHalted` event bracket. The registry is
1545///   still consulted via [`PackTypeRegistry::get`] as a name-oracle so
1546///   mistyped packs fail closed.
1547/// * [`PackType::Meta`] / [`PackType::Scripted`] dispatch once through the
1548///   pack-type plugin's `sync` method (the sync CLI verb is the only
1549///   caller in M5-1; `install` / `update` / `teardown` verbs wire in
1550///   M5-2), returning a single aggregate [`ExecStep`]. A single event
1551///   bracket frames the async call.
1552///
1553/// Declarative is kept on the legacy per-action path because its event log
1554/// semantics (one event per action, per-step rollback context) are exactly
1555/// what plugin authors expect to observe. Unifying declarative under the
1556/// plugin dispatch is M5-2 scope — it requires reshaping the trait surface
1557/// to emit a step stream rather than a single aggregate.
1558#[allow(clippy::too_many_arguments)]
1559fn run_pack_lifecycle(
1560    report: &mut SyncReport,
1561    vars: &VarEnv,
1562    workspace: &Path,
1563    event_log: &Path,
1564    lock_path: &Path,
1565    dry_run: bool,
1566    plan: &PlanExecutor,
1567    fs: &FsExecutor,
1568    registry: &Arc<Registry>,
1569    pack_type_registry: &Arc<PackTypeRegistry>,
1570    rt: &tokio::runtime::Runtime,
1571    pack_name: &str,
1572    pack_path: &Path,
1573    manifest: &crate::pack::PackManifest,
1574    visited_meta: &MetaVisitedSet,
1575    scheduler: &Arc<Scheduler>,
1576) -> bool {
1577    let type_tag = manifest.r#type.as_str();
1578    // Name-oracle check: every pack type must be registered. Unknown
1579    // pack types halt the pack the same way M4 halted unknown actions.
1580    if pack_type_registry.get(type_tag).is_none() {
1581        let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
1582        record_action_err(dry_run, report, event_log, lock_path, pack_name, 0, "pack-type", err);
1583        return true;
1584    }
1585    match manifest.r#type {
1586        crate::pack::PackType::Declarative => run_declarative_actions(
1587            report,
1588            vars,
1589            workspace,
1590            event_log,
1591            lock_path,
1592            dry_run,
1593            plan,
1594            fs,
1595            pack_name,
1596            pack_path,
1597            manifest,
1598            &manifest.actions,
1599            scheduler,
1600        ),
1601        crate::pack::PackType::Meta | crate::pack::PackType::Scripted => dispatch_pack_type_plugin(
1602            report,
1603            vars,
1604            workspace,
1605            event_log,
1606            lock_path,
1607            dry_run,
1608            registry,
1609            pack_type_registry,
1610            rt,
1611            pack_name,
1612            pack_path,
1613            manifest,
1614            type_tag,
1615            visited_meta,
1616            scheduler,
1617        ),
1618    }
1619}
1620
1621/// Run a declarative pack's actions sequentially. Preserves the M4
1622/// per-action event-log bracket (`ActionStarted` → `ActionCompleted` |
1623/// `ActionHalted`). Returns `true` when the sync must halt.
1624#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1625fn run_declarative_actions(
1626    report: &mut SyncReport,
1627    vars: &VarEnv,
1628    workspace: &Path,
1629    event_log: &Path,
1630    lock_path: &Path,
1631    dry_run: bool,
1632    plan: &PlanExecutor,
1633    fs: &FsExecutor,
1634    pack_name: &str,
1635    pack_path: &Path,
1636    manifest: &crate::pack::PackManifest,
1637    actions: &[Action],
1638    scheduler: &Arc<Scheduler>,
1639) -> bool {
1640    // B12 v1.3.1: `apply_gitignore` was previously called here for
1641    // declarative packs (the per-action driver bypasses the plugin
1642    // path). Auto-mutation of the parent meta-repo's `.gitignore` was
1643    // removed in v1.3.1; `grex doctor` now surfaces an advisory when
1644    // the parent git index tracks pack content. The function is kept
1645    // as a no-op shim and the call is left in place so the diff stays
1646    // minimal — the reviewer pass will delete it together with the
1647    // other call sites in pack_type.rs.
1648    if !dry_run {
1649        let ctx = ExecCtx::new(vars, pack_path, workspace)
1650            .with_platform(Platform::current())
1651            .with_scheduler(scheduler);
1652        if let Err(e) = crate::plugin::pack_type::apply_gitignore(&ctx, manifest) {
1653            record_action_err(dry_run, report, event_log, lock_path, pack_name, 0, "gitignore", e);
1654            return true;
1655        }
1656    }
1657    for (idx, action) in actions.iter().enumerate() {
1658        let ctx = ExecCtx::new(vars, pack_path, workspace)
1659            .with_platform(Platform::current())
1660            .with_scheduler(scheduler);
1661        let action_tag = action_kind_tag(action);
1662        append_manifest_event(
1663            dry_run,
1664            event_log,
1665            lock_path,
1666            &Event::ActionStarted {
1667                ts: Utc::now(),
1668                id: pack_name.to_string(),
1669                action_idx: idx,
1670                action_name: action_tag.to_string(),
1671                schema_version: SCHEMA_VERSION.to_string(),
1672            },
1673            &mut report.event_log_warnings,
1674        );
1675        let step_result =
1676            if dry_run { plan.execute(action, &ctx) } else { fs.execute(action, &ctx) };
1677        if !record_action_outcome(
1678            dry_run,
1679            report,
1680            event_log,
1681            lock_path,
1682            pack_name,
1683            idx,
1684            action_tag,
1685            step_result,
1686        ) {
1687            return true;
1688        }
1689    }
1690    false
1691}
1692
1693/// Dispatch a pack-type plugin (meta / scripted) through the async
1694/// registry. Brackets the call with a single `ActionStarted` /
1695/// `ActionCompleted` / `ActionHalted` trio at index 0. Returns `true`
1696/// when the sync must halt.
1697#[allow(clippy::too_many_arguments)]
1698fn dispatch_pack_type_plugin(
1699    report: &mut SyncReport,
1700    vars: &VarEnv,
1701    workspace: &Path,
1702    event_log: &Path,
1703    lock_path: &Path,
1704    dry_run: bool,
1705    registry: &Arc<Registry>,
1706    pack_type_registry: &Arc<PackTypeRegistry>,
1707    rt: &tokio::runtime::Runtime,
1708    pack_name: &str,
1709    pack_path: &Path,
1710    manifest: &crate::pack::PackManifest,
1711    type_tag: &'static str,
1712    visited_meta: &MetaVisitedSet,
1713    scheduler: &Arc<Scheduler>,
1714) -> bool {
1715    // NB: `visited_meta` is intentionally NOT attached to the ctx here.
1716    // The sync driver already walks children in post-order via the tree
1717    // walker; attaching the visited set would trigger MetaPlugin's
1718    // real-recursion branch and cause double dispatch (walker runs child
1719    // packs as their own graph nodes, then MetaPlugin would recurse into
1720    // them again). The `visited_meta` parameter is kept on the argument
1721    // list so future explicit-install / teardown verbs that invoke
1722    // MetaPlugin directly can share the same set shape.
1723    let _ = visited_meta;
1724    let ctx = ExecCtx::new(vars, pack_path, workspace)
1725        .with_platform(Platform::current())
1726        .with_registry(registry)
1727        .with_pack_type_registry(pack_type_registry)
1728        .with_scheduler(scheduler);
1729    append_manifest_event(
1730        dry_run,
1731        event_log,
1732        lock_path,
1733        &Event::ActionStarted {
1734            ts: Utc::now(),
1735            id: pack_name.to_string(),
1736            action_idx: 0,
1737            action_name: type_tag.to_string(),
1738            schema_version: SCHEMA_VERSION.to_string(),
1739        },
1740        &mut report.event_log_warnings,
1741    );
1742    // SAFETY: `get` just confirmed the plugin is registered for
1743    // `type_tag`, so this unwrap cannot panic under the matched arm.
1744    let plugin = pack_type_registry
1745        .get(type_tag)
1746        .expect("pack-type plugin must be registered (guarded above)");
1747    // feat-m6 CI fix — establish a task-local tier stack frame for every
1748    // async dispatch. Without this, `TierGuard::push` (which runs inside
1749    // the plugin lifecycle and may span `.await` / thread hops under the
1750    // multi-thread runtime) has no enforcement frame to push into.
1751    let step_result = rt.block_on(crate::pack_lock::with_tier_scope(plugin.sync(&ctx, manifest)));
1752    !record_action_outcome(
1753        dry_run,
1754        report,
1755        event_log,
1756        lock_path,
1757        pack_name,
1758        0,
1759        type_tag,
1760        step_result,
1761    )
1762}
1763
1764/// Pure skip-eligibility decision. Returns `Some(hash)` when the pack
1765/// is eligible for the hash-skip short-circuit, `None` otherwise.
1766///
1767/// Splitting the decision out of [`try_skip_pack`] keeps the
1768/// side-effecting transcript bookkeeping testable in isolation: the
1769/// v1.1.1 synthetic-flag-flip regression exercises this helper without
1770/// having to stand up a `SyncReport` / `PackGraph`.
1771fn skip_eligibility(
1772    actions: &[Action],
1773    commit_sha: &str,
1774    current_synthetic: bool,
1775    prior: &LockEntry,
1776    dry_run: bool,
1777    force: bool,
1778) -> Option<String> {
1779    if dry_run || force {
1780        // Dry runs must always produce the planned-step transcript so
1781        // authors can see what `sync` *would* do. `--force` is the
1782        // operator's explicit opt-out from the hash short-circuit.
1783        return None;
1784    }
1785    let hash = compute_actions_hash(actions, commit_sha);
1786    if prior.actions_hash != hash {
1787        return None;
1788    }
1789    if prior.synthetic != current_synthetic {
1790        // Pack-shape flipped between runs (real ↔ synthetic). Even
1791        // when the actions hash matches by coincidence (e.g. a
1792        // declarative pack with empty `actions[]` whose pack.yaml was
1793        // deleted, falling through to a synthetic leaf with the same
1794        // empty actions list and stable commit SHA), we must NOT
1795        // carry the stale `synthetic` flag forward. Forcing the
1796        // upsert path re-emits the entry with the current flag.
1797        return None;
1798    }
1799    Some(hash)
1800}
1801
1802/// Decide whether `pack_name` can be short-circuited via a lockfile
1803/// hash match. When the prior hash matches the freshly-computed hash,
1804/// emit a single [`ExecResult::Skipped`] step and carry the prior
1805/// lockfile entry forward unchanged. Returns `true` when the pack was
1806/// skipped.
1807///
1808/// `current_synthetic` is the walker-derived synthetic flag for this
1809/// pack on the current run. The skip eligibility check requires it to
1810/// match `prior.synthetic` so a pack-shape transition (e.g. user
1811/// deletes `pack.yaml` so a previously-real pack now walks as
1812/// synthetic) invalidates the skip and forces the lockfile entry to
1813/// be re-emitted with the fresh `synthetic` value.
1814#[allow(clippy::too_many_arguments)]
1815fn try_skip_pack(
1816    report: &mut SyncReport,
1817    pack_name: &str,
1818    pack_path: &Path,
1819    actions: &[Action],
1820    commit_sha: &str,
1821    current_synthetic: bool,
1822    prior_lock: &std::collections::HashMap<String, LockEntry>,
1823    next_lock: &mut std::collections::HashMap<String, LockEntry>,
1824    dry_run: bool,
1825    force: bool,
1826) -> bool {
1827    let Some(prior) = prior_lock.get(pack_name) else {
1828        return false;
1829    };
1830    let Some(hash) =
1831        skip_eligibility(actions, commit_sha, current_synthetic, prior, dry_run, force)
1832    else {
1833        return false;
1834    };
1835    let skipped_step = ExecStep {
1836        action_name: Cow::Borrowed("pack"),
1837        result: ExecResult::Skipped {
1838            pack_path: pack_path.to_path_buf(),
1839            actions_hash: hash.clone(),
1840        },
1841        // W4 landed `StepKind::PackSkipped` as the dedicated pack-level
1842        // short-circuit detail; we use it here instead of the prior
1843        // `Require { Satisfied, Skip }` proxy so renderers and consumers
1844        // can match on a single, purpose-built variant.
1845        details: StepKind::PackSkipped { actions_hash: hash },
1846    };
1847    report.steps.push(SyncStep {
1848        pack: pack_name.to_string(),
1849        action_idx: 0,
1850        exec_step: skipped_step,
1851    });
1852    // Carry the prior entry forward so the next-lock snapshot stays
1853    // consistent with what's on disk.
1854    next_lock.insert(pack_name.to_string(), prior.clone());
1855    true
1856}
1857
1858/// Insert or update a lockfile entry for `pack_name` with `actions_hash`.
1859///
1860/// Stores `commit_sha` verbatim — including the empty string when the
1861/// pack is not a git working tree or the HEAD probe failed.
1862/// `actions_hash` is computed over the same `commit_sha`, so the two
1863/// fields stay internally consistent: if probing starts returning a
1864/// non-empty SHA on the next run, the hash differs and the skip is
1865/// correctly invalidated. The prior-preserve carve-out that was
1866/// introduced in M4-D was unsound (hash-vs-sha drift) and is removed
1867/// by the M4-D post-review fix bundle; see spec §M4 req 4a.
1868///
1869/// `prior_lock` is consulted purely for observability: when a
1870/// previously-real pack flips to synthetic between runs (user deleted
1871/// the pack's `pack.yaml` so the walker fell back to v1.1.1
1872/// plain-git-child synthesis), a `tracing::warn!` records the
1873/// downgrade so the operator notices their declarative actions have
1874/// stopped running.
1875fn upsert_lock_entry(
1876    prior_lock: &std::collections::HashMap<String, LockEntry>,
1877    next_lock: &mut std::collections::HashMap<String, LockEntry>,
1878    pack_name: &str,
1879    commit_sha: &str,
1880    actions_hash: &str,
1881    synthetic: bool,
1882    manifest_ref: Option<&str>,
1883) {
1884    if synthetic {
1885        if let Some(prior) = prior_lock.get(pack_name) {
1886            if !prior.synthetic {
1887                tracing::warn!(
1888                    target: "grex::sync",
1889                    pack = pack_name,
1890                    "pack `{pack_name}` downgraded from real to synthetic — \
1891                     pack.yaml missing on disk; only `git pull` will run going forward",
1892                );
1893            }
1894        }
1895    }
1896    let installed_at = Utc::now();
1897    let entry = next_lock.get(pack_name).map_or_else(
1898        || LockEntry {
1899            id: pack_name.to_string(),
1900            // v1.1.1 convention: path == id (1:1 id↔folder). Stage 1.e
1901            // (walker rewrite) will replace this with the parent-relative
1902            // manifest path captured during the walk.
1903            path: pack_name.to_string(),
1904            sha: commit_sha.to_string(),
1905            // v1.3.1 B14: mirror the parent manifest's `ref:` value
1906            // (or empty when absent), per the Lean theorem
1907            // `Grex.Lockfile.lockfile_branch_mirrors_manifest_ref`.
1908            branch: branch_of(manifest_ref),
1909            installed_at,
1910            actions_hash: actions_hash.to_string(),
1911            schema_version: "1".to_string(),
1912            synthetic,
1913        },
1914        |prev| LockEntry {
1915            installed_at,
1916            actions_hash: actions_hash.to_string(),
1917            sha: commit_sha.to_string(),
1918            synthetic,
1919            ..prev.clone()
1920        },
1921    );
1922    next_lock.insert(pack_name.to_string(), entry);
1923}
1924
1925/// Record one action outcome into `report` + event log. Returns `false`
1926/// when the run must halt (on error); `true` otherwise.
1927#[allow(clippy::too_many_arguments)]
1928fn record_action_outcome(
1929    dry_run: bool,
1930    report: &mut SyncReport,
1931    event_log: &Path,
1932    lock_path: &Path,
1933    pack_name: &str,
1934    idx: usize,
1935    action_tag: &'static str,
1936    step_result: Result<ExecStep, ExecError>,
1937) -> bool {
1938    match step_result {
1939        Ok(step) => {
1940            record_action_ok(dry_run, report, event_log, lock_path, pack_name, idx, step);
1941            true
1942        }
1943        Err(e) => {
1944            record_action_err(dry_run, report, event_log, lock_path, pack_name, idx, action_tag, e);
1945            false
1946        }
1947    }
1948}
1949
1950/// Success-path bookkeeping: emit legacy `Sync` summary + `ActionCompleted`
1951/// audit event, then push the step onto the report.
1952///
1953/// v1.3.1 B4 fix-up: under `dry_run = true`, the on-disk event-log writes
1954/// are skipped. The in-memory `report.steps` push still happens — dry-run
1955/// callers rely on the planned-step transcript for output.
1956#[allow(clippy::too_many_arguments)]
1957fn record_action_ok(
1958    dry_run: bool,
1959    report: &mut SyncReport,
1960    event_log: &Path,
1961    lock_path: &Path,
1962    pack_name: &str,
1963    idx: usize,
1964    step: ExecStep,
1965) {
1966    append_step_event(
1967        dry_run,
1968        event_log,
1969        lock_path,
1970        pack_name,
1971        &step,
1972        &mut report.event_log_warnings,
1973    );
1974    append_manifest_event(
1975        dry_run,
1976        event_log,
1977        lock_path,
1978        &Event::ActionCompleted {
1979            ts: Utc::now(),
1980            id: pack_name.to_string(),
1981            action_idx: idx,
1982            result_summary: format!("{:?}", step.result),
1983            schema_version: SCHEMA_VERSION.to_string(),
1984        },
1985        &mut report.event_log_warnings,
1986    );
1987    report.steps.push(SyncStep { pack: pack_name.to_string(), action_idx: idx, exec_step: step });
1988}
1989
1990/// Halt-path bookkeeping: emit `ActionHalted` audit event, then stash the
1991/// rich `HaltedContext` into `report.halted`.
1992///
1993/// v1.3.1 B4 fix-up: under `dry_run = true`, the on-disk event-log write
1994/// is skipped; the `report.halted` slot still receives the
1995/// [`HaltedContext`] so callers can render the halt reason without
1996/// touching disk.
1997#[allow(clippy::too_many_arguments)]
1998fn record_action_err(
1999    dry_run: bool,
2000    report: &mut SyncReport,
2001    event_log: &Path,
2002    lock_path: &Path,
2003    pack_name: &str,
2004    idx: usize,
2005    action_tag: &'static str,
2006    e: ExecError,
2007) {
2008    let error_summary = truncate_error_summary(&e);
2009    append_manifest_event(
2010        dry_run,
2011        event_log,
2012        lock_path,
2013        &Event::ActionHalted {
2014            ts: Utc::now(),
2015            id: pack_name.to_string(),
2016            action_idx: idx,
2017            action_name: action_tag.to_string(),
2018            error_summary,
2019            schema_version: SCHEMA_VERSION.to_string(),
2020        },
2021        &mut report.event_log_warnings,
2022    );
2023    let recovery_hint = recovery_hint_for(&e);
2024    report.halted = Some(SyncError::Halted(Box::new(HaltedContext {
2025        pack: pack_name.to_string(),
2026        action_idx: idx,
2027        action_name: action_tag.to_string(),
2028        error: e,
2029        recovery_hint,
2030    })));
2031}
2032
2033/// Short stable kind-tag for an [`crate::pack::Action`]. Mirrors the
2034/// `ACTION_*` constants used by [`crate::execute::step`] so the audit log
2035/// stays uniform.
2036fn action_kind_tag(action: &crate::pack::Action) -> &'static str {
2037    use crate::pack::Action;
2038    match action {
2039        Action::Symlink(_) => "symlink",
2040        Action::Unlink(_) => "unlink",
2041        Action::Env(_) => "env",
2042        Action::Mkdir(_) => "mkdir",
2043        Action::Rmdir(_) => "rmdir",
2044        Action::Require(_) => "require",
2045        Action::When(_) => "when",
2046        Action::Exec(_) => "exec",
2047    }
2048}
2049
2050/// Produce a bounded human summary of an [`ExecError`] for
2051/// [`Event::ActionHalted::error_summary`]. Keeps the written JSONL line
2052/// from pathological blowup when captured stderr is large.
2053fn truncate_error_summary(err: &ExecError) -> String {
2054    let mut s = err.to_string();
2055    if s.len() > ACTION_ERROR_SUMMARY_MAX {
2056        s.truncate(ACTION_ERROR_SUMMARY_MAX);
2057        s.push_str("…[truncated]");
2058    }
2059    s
2060}
2061
2062/// Best-effort recovery hint for common [`ExecError`] shapes. Returns
2063/// `None` when no generic advice applies; the error's own `Display`
2064/// output is already shown by the `Halted` variant's format string.
2065fn recovery_hint_for(err: &ExecError) -> Option<String> {
2066    match err {
2067        ExecError::SymlinkDestOccupied { .. } => Some(
2068            "set `backup: true` on the symlink action, or remove the conflicting entry by hand"
2069                .into(),
2070        ),
2071        ExecError::SymlinkPrivilegeDenied { .. } => {
2072            Some("enable Windows Developer Mode or re-run grex as administrator".into())
2073        }
2074        ExecError::SymlinkCreateAfterBackupFailed { backup, .. } => {
2075            Some(format!("backup left at `{}`; restore manually then re-run", backup.display()))
2076        }
2077        ExecError::RmdirNotEmpty { .. } => {
2078            Some("set `force: true` on the rmdir action to recurse".into())
2079        }
2080        ExecError::EnvPersistenceDenied { .. } => {
2081            Some("re-run elevated (Machine scope needs admin)".into())
2082        }
2083        _ => None,
2084    }
2085}
2086
2087/// Append one [`Event::Sync`] record summarising an [`ExecStep`].
2088///
2089/// Failures log a warning and are recorded in the report's
2090/// `event_log_warnings`; they do not abort the sync (spec: event-log write
2091/// failures are non-fatal).
2092///
2093/// # Concurrency
2094///
2095/// The append is serialized through a [`ManifestLock`] held across the
2096/// write. The lock is acquired **per action** (not once across the full
2097/// traversal) so cooperating grex processes can observe mid-progress log
2098/// state between actions; fd-lock acquisition is cheap on modern kernels
2099/// and sync runs are dominated by executor side effects, not lock waits.
2100/// This closes the bypass gap surfaced by the M3 concurrency review where
2101/// `append_event` was called without any cross-process serialisation.
2102fn append_step_event(
2103    dry_run: bool,
2104    log: &Path,
2105    lock_path: &Path,
2106    pack: &str,
2107    step: &ExecStep,
2108    warnings: &mut Vec<String>,
2109) {
2110    if dry_run {
2111        return;
2112    }
2113    let summary = format!("{}:{:?}", step.action_name, step.result);
2114    let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: summary };
2115    if let Err(e) = append_event_locked(log, lock_path, &event) {
2116        tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
2117        warnings.push(format!("{}: {e}", log.display()));
2118    }
2119    // Schema version is recorded once at the manifest level by existing
2120    // manifest code; this stub uses the constant to keep a single source of
2121    // truth for forward-compat.
2122    let _ = SCHEMA_VERSION;
2123}
2124
2125/// Append a single [`Event`] under the shared [`ManifestLock`] path.
2126/// Failures are logged and recorded as non-fatal warnings — the spec
2127/// marks event-log write failures as non-aborting so a transient disk
2128/// error must not kill a sync mid-stream.
2129///
2130/// v1.3.1 B4 fix-up: when `dry_run` is `true`, this function is a no-op
2131/// — the dry-run contract forbids any write to `<workspace>/.grex/`,
2132/// including the audit `events.jsonl`. In-memory `event_log_warnings`
2133/// records remain available; only the on-disk side effect is gated.
2134fn append_manifest_event(
2135    dry_run: bool,
2136    log: &Path,
2137    lock_path: &Path,
2138    event: &Event,
2139    warnings: &mut Vec<String>,
2140) {
2141    if dry_run {
2142        return;
2143    }
2144    if let Err(e) = append_event_locked(log, lock_path, event) {
2145        tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
2146        warnings.push(format!("{}: {e}", log.display()));
2147    }
2148}
2149
2150/// Acquire [`ManifestLock`] and append one event. Parent dir of the log is
2151/// created lazily on first write.
2152fn append_event_locked(log: &Path, lock_path: &Path, event: &Event) -> Result<(), String> {
2153    if let Some(parent) = log.parent() {
2154        std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
2155    }
2156    if let Some(parent) = lock_path.parent() {
2157        std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
2158    }
2159    let mut lock = ManifestLock::open(log, lock_path).map_err(|e| e.to_string())?;
2160    lock.write(|| append_event(log, event)).map_err(|e| e.to_string())?.map_err(|e| e.to_string())
2161}
2162
2163/// Re-export a cheap helper so CLI renderers can label halted steps by node
2164/// name without reaching into the graph twice.
2165#[must_use]
2166pub fn pack_display_name(node: &PackNode) -> &str {
2167    &node.name
2168}
2169
2170/// Run a full teardown over the pack tree rooted at `pack_root`.
2171///
2172/// Mirrors [`run`] but invokes
2173/// [`crate::plugin::PackTypePlugin::teardown`] on every pack in
2174/// **reverse** post-order so a parent tears down before its children
2175/// (the inverse of install). Children composed later by an author
2176/// consequently teardown earlier, matching the declarative
2177/// auto-reverse contract (R-M5-11).
2178///
2179/// All other concerns are identical to [`run`]: workspace lock, plan-
2180/// phase validators, lockfile update skipped (teardown does not
2181/// write a `actions_hash` forward), and event-log bracketing.
2182/// Teardown does NOT consult the lockfile skip-on-hash shortcut — a
2183/// user explicitly asked to remove the pack, so we always dispatch.
2184///
2185/// # Errors
2186///
2187/// Returns the first error that halts the pipeline — see [`SyncError`].
2188///
2189/// See [`run`] for the `cancel` contract — feat-m7-1 stage 2 threads
2190/// the parameter through teardown for parity; stages 3-4 add the polls.
2191pub fn teardown(
2192    pack_root: &Path,
2193    opts: &SyncOptions,
2194    cancel: &CancellationToken,
2195) -> Result<SyncReport, SyncError> {
2196    let _ = cancel;
2197    let workspace = prepare_workspace(pack_root, opts)?;
2198    let (mut ws_lock, ws_lock_path) = open_workspace_lock(&workspace)?;
2199    let _ws_guard = match ws_lock.try_acquire() {
2200        Ok(Some(g)) => g,
2201        Ok(None) => {
2202            return Err(SyncError::WorkspaceBusy {
2203                workspace: workspace.clone(),
2204                lock_path: ws_lock_path,
2205            });
2206        }
2207        Err(e) => return Err(workspace_lock_err(&ws_lock_path, &e.to_string())),
2208    };
2209
2210    // v1.2.1 path (iii) — teardown is read-only against the existing
2211    // disk state (no clones / fetches / prunes). It only needs the
2212    // graph build pass; `sync_meta` is intentionally skipped here.
2213    let graph = build_and_validate_graph(&workspace, opts.validate, opts.ref_override.as_deref())?;
2214    let prep = prepare_run_context(pack_root, &graph, &workspace)?;
2215
2216    let mut report = SyncReport {
2217        graph,
2218        steps: Vec::new(),
2219        halted: None,
2220        event_log_warnings: Vec::new(),
2221        pre_run_recovery: prep.pre_run_recovery,
2222        // teardown does not run the legacy-layout migration — by the time
2223        // a user is tearing down, the layout has already been migrated
2224        // (or was never legacy in the first place). Surfacing an empty
2225        // list keeps the report shape symmetric with `run()`.
2226        workspace_migrations: Vec::new(),
2227    };
2228
2229    // feat-m6 B1: mirror `run()` — resolve `--parallel`, build a
2230    // Scheduler, thread it through every `ExecCtx` the teardown path
2231    // constructs. Teardown is the other user-facing verb that owns a
2232    // runtime, so it gets the same wiring.
2233    let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
2234    let scheduler = Arc::new(Scheduler::new(resolved_parallel));
2235    run_teardown(
2236        &mut report,
2237        &prep.order,
2238        &prep.vars,
2239        &workspace,
2240        &prep.event_log,
2241        &prep.lock_path,
2242        &prep.registry,
2243        &prep.pack_type_registry,
2244        resolved_parallel,
2245        &scheduler,
2246    );
2247    Ok(report)
2248}
2249
2250/// Dispatch `teardown` for every pack in **reverse** post-order.
2251/// Declarative packs go through [`crate::plugin::PackTypePlugin`]
2252/// rather than the per-action M4 path because the trait's
2253/// auto-reverse / explicit-block logic must compose with the
2254/// registry; going through the per-action path would mean
2255/// re-implementing inverse synthesis in the sync loop.
2256#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
2257fn run_teardown(
2258    report: &mut SyncReport,
2259    order: &[usize],
2260    vars: &VarEnv,
2261    workspace: &Path,
2262    event_log: &Path,
2263    lock_path: &Path,
2264    registry: &Arc<Registry>,
2265    pack_type_registry: &Arc<PackTypeRegistry>,
2266    parallel: usize,
2267    scheduler: &Arc<Scheduler>,
2268) {
2269    let rt = build_pack_type_runtime(parallel);
2270    // Reverse post-order: root first, then children. Pack-type plugin
2271    // teardown methods reverse their own children/actions, so the
2272    // outer loop only flips the inter-pack order.
2273    for &id in order.iter().rev() {
2274        let Some(node) = report.graph.node(id) else { continue };
2275        let pack_name = node.name.clone();
2276        let pack_path = node.path.clone();
2277        let manifest = node.manifest.clone();
2278        let type_tag = manifest.r#type.as_str();
2279        if pack_type_registry.get(type_tag).is_none() {
2280            let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
2281            // Teardown has no dry-run mode — pass `false` so the
2282            // event-log writes proceed as before.
2283            record_action_err(false, report, event_log, lock_path, &pack_name, 0, "pack-type", err);
2284            return;
2285        }
2286        let ctx = ExecCtx::new(vars, &pack_path, workspace)
2287            .with_platform(Platform::current())
2288            .with_registry(registry)
2289            .with_pack_type_registry(pack_type_registry)
2290            .with_scheduler(scheduler);
2291        append_manifest_event(
2292            false,
2293            event_log,
2294            lock_path,
2295            &Event::ActionStarted {
2296                ts: Utc::now(),
2297                id: pack_name.clone(),
2298                action_idx: 0,
2299                action_name: type_tag.to_string(),
2300                schema_version: SCHEMA_VERSION.to_string(),
2301            },
2302            &mut report.event_log_warnings,
2303        );
2304        let plugin = pack_type_registry
2305            .get(type_tag)
2306            .expect("pack-type plugin must be registered (guarded above)");
2307        // feat-m6 CI fix — see dispatch_pack_type note.
2308        let step_result =
2309            rt.block_on(crate::pack_lock::with_tier_scope(plugin.teardown(&ctx, &manifest)));
2310        if !record_action_outcome(
2311            false,
2312            report,
2313            event_log,
2314            lock_path,
2315            &pack_name,
2316            0,
2317            type_tag,
2318            step_result,
2319        ) {
2320            return;
2321        }
2322    }
2323}
2324
2325/// Test-only hook: append one [`Event::Sync`] through the same
2326/// [`ManifestLock`]-serialised path the sync driver uses.
2327///
2328/// Exposed so integration tests under `tests/` can exercise the locked
2329/// append helper without spinning up a full pack tree. Not intended for
2330/// downstream consumers — the signature may change without notice.
2331#[doc(hidden)]
2332pub fn __test_append_sync_event(
2333    log: &Path,
2334    lock_path: &Path,
2335    pack: &str,
2336    action_name: &str,
2337) -> Result<(), String> {
2338    let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: action_name.to_string() };
2339    append_event_locked(log, lock_path, &event)
2340}
2341
2342// ----------------------------------------------------------------------
2343// PR E — pre-run teardown scan
2344// ----------------------------------------------------------------------
2345
2346/// One `ActionStarted` event in the manifest log that has no matching
2347/// `ActionCompleted` or `ActionHalted` peer.
2348///
2349/// Dangling starts are the primary crash signal: the process wrote the
2350/// pre-action event, then died before the executor returned. Callers
2351/// should surface these to the operator (diagnostics only this PR; a
2352/// future `grex doctor` verb will act on them).
2353#[non_exhaustive]
2354#[derive(Debug, Clone, PartialEq, Eq)]
2355pub struct DanglingStart {
2356    /// Pack that owned the halted action.
2357    pub pack: String,
2358    /// 0-based action index within the pack.
2359    pub action_idx: usize,
2360    /// Short action kind tag.
2361    pub action_name: String,
2362    /// Timestamp the `ActionStarted` event was written.
2363    pub started_at: DateTime<Utc>,
2364}
2365
2366/// Summary of teardown artifacts found under a pack root before a sync
2367/// begins.
2368///
2369/// Built by [`scan_recovery`]. All fields are diagnostic; the sync
2370/// proceeds regardless of what the scan finds.
2371#[non_exhaustive]
2372#[derive(Debug, Clone, Default, PartialEq, Eq)]
2373pub struct RecoveryReport {
2374    /// `<dst>.grex.bak` files sitting next to a non-symlink or missing
2375    /// original (symlink-action rollback orphan).
2376    pub orphan_backups: Vec<PathBuf>,
2377    /// `<path>.grex.bak.<timestamp>` tombstones left by `rmdir` with
2378    /// `backup: true`.
2379    pub orphan_tombstones: Vec<PathBuf>,
2380    /// `ActionStarted` events in the log with no matching
2381    /// `ActionCompleted`/`ActionHalted`.
2382    pub dangling_starts: Vec<DanglingStart>,
2383}
2384
2385impl RecoveryReport {
2386    /// `true` when the scan found nothing worth reporting.
2387    #[must_use]
2388    pub fn is_empty(&self) -> bool {
2389        self.orphan_backups.is_empty()
2390            && self.orphan_tombstones.is_empty()
2391            && self.dangling_starts.is_empty()
2392    }
2393}
2394
2395/// Walk `workspace` and the manifest log to find crash-recovery artifacts.
2396///
2397/// Inspects:
2398///
2399/// * `workspace` for `.grex.bak` orphans and timestamped `.grex.bak.<ts>`
2400///   tombstones. The workspace IS where children materialise (whether
2401///   the default flat-sibling layout under the pack root, or an
2402///   explicit `--workspace` override directory) so this single bounded
2403///   walk covers every backup site.
2404/// * `event_log` (the manifest JSONL) for `ActionStarted` entries that
2405///   have no matching `ActionCompleted` / `ActionHalted` successor.
2406///
2407/// Non-blocking: scan errors are swallowed to an empty report so a
2408/// half-readable directory cannot kill a sync that would otherwise
2409/// succeed. Call sites that want to surface scan failures should read
2410/// the manifest directly.
2411///
2412/// Pre-`v1.1.0` post-review fix this anchored at `pack_root_dir(pack_root)`,
2413/// which missed every backup under a `--workspace` override.
2414///
2415/// # Errors
2416///
2417/// Returns [`SyncError::Validation`] only when the manifest read itself
2418/// reports corruption. Filesystem traversal errors are swallowed.
2419pub fn scan_recovery(workspace: &Path, event_log: &Path) -> Result<RecoveryReport, SyncError> {
2420    let mut report = RecoveryReport::default();
2421    walk_for_backups(workspace, &mut report);
2422    if event_log.exists() {
2423        match read_all(event_log) {
2424            Ok(events) => {
2425                report.dangling_starts = collect_dangling_starts(&events);
2426            }
2427            Err(e) => {
2428                return Err(SyncError::Validation {
2429                    errors: vec![PackValidationError::DependsOnUnsatisfied {
2430                        pack: "<event-log>".into(),
2431                        required: e.to_string(),
2432                    }],
2433                });
2434            }
2435        }
2436    }
2437    Ok(report)
2438}
2439
2440/// Shallow directory walker (bounded depth = 6) that categorizes
2441/// `.grex.bak` and `.grex.bak.<ts>` filenames into the appropriate
2442/// report slot. Depth-limited so a pathological workspace with a deep
2443/// tree cannot stall the scan; realistic layouts are well under six
2444/// levels.
2445fn walk_for_backups(root: &Path, report: &mut RecoveryReport) {
2446    walk_for_backups_inner(root, report, 0);
2447}
2448
2449fn walk_for_backups_inner(dir: &Path, report: &mut RecoveryReport, depth: u32) {
2450    const MAX_DEPTH: u32 = 6;
2451    if depth > MAX_DEPTH {
2452        return;
2453    }
2454    let Ok(entries) = std::fs::read_dir(dir) else { return };
2455    for entry_result in entries {
2456        let entry = match entry_result {
2457            Ok(e) => e,
2458            Err(e) => {
2459                tracing::warn!(
2460                    target: "grex::sync::recover",
2461                    "skipping unreadable entry under `{}`: {e}",
2462                    dir.display(),
2463                );
2464                continue;
2465            }
2466        };
2467        let path = entry.path();
2468        let name = entry.file_name();
2469        let Some(name_str) = name.to_str() else { continue };
2470        if name_str.ends_with(".grex.bak") {
2471            report.orphan_backups.push(path.clone());
2472            continue;
2473        }
2474        if let Some(rest) = name_str.rsplit_once(".grex.bak.") {
2475            // `rsplit_once` returns `(prefix, suffix)`; suffix is the
2476            // timestamp chunk. Accept any non-empty suffix — the exact
2477            // timestamp shape is `fs_executor` internal.
2478            if !rest.1.is_empty() {
2479                report.orphan_tombstones.push(path.clone());
2480                continue;
2481            }
2482        }
2483        // Recurse only into real directories (not symlinks, to avoid
2484        // traversing into the workspace's cloned repos via aliased
2485        // paths). `entry.file_type()` does NOT follow symlinks (unlike
2486        // `entry.metadata()` which would dereference and report the
2487        // target's type — defeating the very check this guards). The
2488        // symlink-skip is also explicit so the intent is recoverable
2489        // from the source: backup-recovery never crosses a symlink.
2490        let Ok(ft) = entry.file_type() else { continue };
2491        if ft.is_symlink() {
2492            continue;
2493        }
2494        if ft.is_dir() {
2495            walk_for_backups_inner(&path, report, depth + 1);
2496        }
2497    }
2498}
2499
2500/// Reduce an event stream to a list of `ActionStarted` records with no
2501/// matching terminator.
2502///
2503/// Matching is positional per `(pack, action_idx)`: a later
2504/// `ActionCompleted` or `ActionHalted` with the same key clears the
2505/// entry. Whatever remains in the map after the pass is dangling.
2506fn collect_dangling_starts(events: &[Event]) -> Vec<DanglingStart> {
2507    use std::collections::HashMap;
2508    let mut open: HashMap<(String, usize), DanglingStart> = HashMap::new();
2509    for ev in events {
2510        match ev {
2511            // v1.3.1 schema v2: pack-id field is `id`. The destructure
2512            // binds `id` and `schema_version` is ignored via `..`.
2513            Event::ActionStarted { ts, id, action_idx, action_name, .. } => {
2514                open.insert(
2515                    (id.clone(), *action_idx),
2516                    DanglingStart {
2517                        pack: id.clone(),
2518                        action_idx: *action_idx,
2519                        action_name: action_name.clone(),
2520                        started_at: *ts,
2521                    },
2522                );
2523            }
2524            Event::ActionCompleted { id, action_idx, .. }
2525            | Event::ActionHalted { id, action_idx, .. } => {
2526                open.remove(&(id.clone(), *action_idx));
2527            }
2528            _ => {}
2529        }
2530    }
2531    let mut out: Vec<DanglingStart> = open.into_values().collect();
2532    out.sort_by_key(|a| a.started_at);
2533    out
2534}
2535
#[cfg(test)]
mod synthetic_transition_tests {
    //! v1.1.1 — regression cover for the pack-shape transition fixes.
    //!
    //! [`skip_eligibility`] and [`upsert_lock_entry`] are called directly
    //! (no walker, no fs) so the assertions land on the plumbing itself:
    //! skip eligibility must require synthetic-flag agreement even when
    //! the actions hash matches by coincidence, and the upsert path must
    //! record the real-to-synthetic downgrade so the operator's lockfile
    //! reflects what just happened.
    use super::{skip_eligibility, upsert_lock_entry, LockEntry};
    use crate::lockfile::compute_actions_hash;
    use chrono::{TimeZone, Utc};
    use std::collections::HashMap;

    /// Commit SHA shared by every fixture in this module.
    const SHA: &str = "deadbeef";

    /// Fixed install timestamp so fixtures are reproducible.
    fn ts() -> chrono::DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 4, 27, 10, 0, 0).unwrap()
    }

    /// Stable empty-actions hash for the fixed commit SHA. The same
    /// inputs feed both the prior (real) and the new (synthetic)
    /// configuration in the regression below, which is exactly the
    /// coincidental-hash-match scenario FIX 3 must catch.
    fn stable_hash() -> String {
        compute_actions_hash(&[], SHA)
    }

    /// Lock entry whose `id` and `path` are both `id`; everything else is
    /// fixed apart from the `synthetic` flag.
    fn lock_entry(id: &'static str, synthetic: bool) -> LockEntry {
        LockEntry {
            id: id.into(),
            path: id.into(),
            sha: SHA.into(),
            branch: "main".into(),
            installed_at: ts(),
            actions_hash: stable_hash(),
            schema_version: "1".into(),
            synthetic,
        }
    }

    /// The "alpha" entry used as the prior state by the skip tests.
    fn prior_entry(synthetic: bool) -> LockEntry {
        lock_entry("alpha", synthetic)
    }

    /// Prior-lockfile map holding exactly one entry, keyed by its id.
    fn single_prior(id: &'static str, synthetic: bool) -> HashMap<String, LockEntry> {
        let mut prior: HashMap<String, LockEntry> = HashMap::new();
        prior.insert(id.into(), lock_entry(id, synthetic));
        prior
    }

    /// FIX 3 — pack flips from real → synthetic but `actions_hash` and
    /// `commit_sha` happen to match. The skip MUST be invalidated so
    /// the upsert path re-emits the lockfile entry with `synthetic =
    /// true`.
    #[test]
    fn skip_eligibility_invalidates_when_synthetic_flag_flips() {
        let was_real = prior_entry(false);
        let decision = skip_eligibility(&[], SHA, true, &was_real, false, false);
        assert!(decision.is_none(), "skip must be invalidated when synthetic flag flips");
    }

    /// Same hash, same synthetic flag → skip is allowed (baseline).
    #[test]
    fn skip_eligibility_allows_skip_when_synthetic_matches() {
        let was_synthetic = prior_entry(true);
        let expected = stable_hash();
        let decision = skip_eligibility(&[], SHA, true, &was_synthetic, false, false);
        assert_eq!(
            decision.as_deref(),
            Some(expected.as_str()),
            "skip must be honoured when synthetic flag matches",
        );
    }

    /// `dry_run` and `force` always disable the skip regardless of
    /// flag agreement.
    #[test]
    fn skip_eligibility_respects_dry_run_and_force() {
        let was_synthetic = prior_entry(true);
        let under_dry_run = skip_eligibility(&[], SHA, true, &was_synthetic, true, false);
        assert!(under_dry_run.is_none());
        let under_force = skip_eligibility(&[], SHA, true, &was_synthetic, false, true);
        assert!(under_force.is_none());
    }

    /// FIX 4 — `upsert_lock_entry` records the downgrade in the
    /// lockfile (entry flips to `synthetic = true`) when the prior
    /// entry was real. The `tracing::warn!` is fire-and-forget, but
    /// the lockfile transition itself is observable and must be
    /// correct.
    #[test]
    fn upsert_lock_entry_records_real_to_synthetic_downgrade() {
        let prior = single_prior("beta", false);
        let mut next: HashMap<String, LockEntry> = HashMap::new();

        upsert_lock_entry(&prior, &mut next, "beta", SHA, &stable_hash(), true, None);

        let upserted = next.get("beta").expect("entry must be upserted");
        assert!(upserted.synthetic, "downgraded entry must carry synthetic = true");
        assert_eq!(upserted.actions_hash, stable_hash(), "actions_hash must reflect current run");
    }

    /// Upsert path is a no-op for the steady-state case (synthetic →
    /// synthetic): the entry is replaced with the current run's
    /// timestamp/hash but the synthetic flag is preserved. This
    /// guards against an over-eager warning fire.
    #[test]
    fn upsert_lock_entry_no_op_for_steady_state_synthetic() {
        let prior = single_prior("gamma", true);
        let mut next: HashMap<String, LockEntry> = HashMap::new();

        upsert_lock_entry(&prior, &mut next, "gamma", SHA, &stable_hash(), true, None);

        let upserted = next.get("gamma").expect("entry must be upserted");
        assert!(upserted.synthetic, "synthetic must remain true on no-op refresh");
    }
}
2666
#[cfg(test)]
mod error_display_tests {
    //! v1.2.0 Stage 1.k — `SyncError` Display assertions.
    //!
    //! Nothing but construction followed by `to_string()`. The variants
    //! land dormant — Stage 1.g (rayon scheduler) wires
    //! `SchedulerCancelled` once cooperative cancel polls reach the
    //! parallel walker.
    use super::SyncError;

    /// The cancellation variant renders its fixed user-facing message.
    #[test]
    fn test_sync_error_scheduler_cancelled_display() {
        let rendered = SyncError::SchedulerCancelled.to_string();
        assert_eq!(rendered, "sync cancelled by user");
    }
}
2682
#[cfg(test)]
mod sync_options_v1_2_0_tests {
    //! v1.2.0 Stage 1.m — leaf cover for new [`SyncOptions`] fields.
    //!
    //! Mechanical default-value assertions plus simple builder/clone
    //! round-trips. They lock down that:
    //!
    //! 1. Adding the new fields preserves v1.1.1 behavior (defaults
    //!    leave existing call sites observably unchanged).
    //! 2. The shape is what later walker stages (1.h / 1.j / 1.l) will
    //!    consume — if any of these fields are renamed or change type,
    //!    those stages must update in lock-step.
    //!
    //! The fields themselves are *dormant placeholders* at 1.m scope —
    //! no behavior wiring lives in this stage.
    use super::{pack_root_dir, resolve_workspace, SyncError, SyncOptions};

    /// `force_prune` defaults to `false` so existing call sites refuse
    /// to drop dirty trees (v1.1.1 behavior).
    #[test]
    fn test_sync_options_default_force_prune_false() {
        let defaults = SyncOptions::default();
        assert!(!defaults.force_prune, "force_prune must default to false");
    }

    /// `force_prune_with_ignored` defaults to `false` so existing call
    /// sites refuse to drop ignored content (v1.1.1 behavior).
    #[test]
    fn test_sync_options_default_force_prune_with_ignored_false() {
        let defaults = SyncOptions::default();
        assert!(
            !defaults.force_prune_with_ignored,
            "force_prune_with_ignored must default to false"
        );
    }

    /// `migrate_lockfile` defaults to `false` so the walker errors on
    /// legacy v1.1.1 lockfile shapes unless the caller opts in.
    #[test]
    fn test_sync_options_default_migrate_lockfile_false() {
        let defaults = SyncOptions::default();
        assert!(!defaults.migrate_lockfile, "migrate_lockfile must default to false");
    }

    /// `recurse` defaults to `true` — the walker descends into nested
    /// meta-children unless `--shallow` is requested.
    #[test]
    fn test_sync_options_default_recurse_true() {
        let defaults = SyncOptions::default();
        assert!(defaults.recurse, "recurse must default to true");
    }

    /// `max_depth` defaults to `None` — unbounded recursion when
    /// `recurse` is `true`.
    #[test]
    fn test_sync_options_default_max_depth_none() {
        let defaults = SyncOptions::default();
        assert!(defaults.max_depth.is_none(), "max_depth must default to None");
    }

    /// Setting `force_prune_with_ignored = true` alongside
    /// `force_prune = true` is the documented "stronger" combination.
    /// No contradiction: `with_ignored` is the harder override and
    /// implies the base `force_prune` semantics. This test guards the
    /// invariant that both flags coexist as plain `bool` (not enum)
    /// so callers can set them independently without runtime panic.
    #[test]
    fn test_sync_options_force_prune_with_ignored_implies_force_prune() {
        let both_on = SyncOptions {
            force_prune: true,
            force_prune_with_ignored: true,
            ..SyncOptions::default()
        };
        assert!(both_on.force_prune);
        assert!(both_on.force_prune_with_ignored);
    }

    /// `max_depth = Some(n)` paired with `recurse = true` is the
    /// documented `--shallow=N` shape. The fields are independent
    /// `bool` / `Option<usize>` so callers may set `max_depth` while
    /// `recurse` is left at its default (`true`). Stage 1.j will
    /// later define the precise interaction; this test only locks
    /// the two fields' types and defaults.
    #[test]
    fn test_sync_options_max_depth_pairs_with_recurse() {
        let bounded = SyncOptions { max_depth: Some(2), ..SyncOptions::default() };
        assert_eq!(bounded.max_depth, Some(2));
        assert!(bounded.recurse, "recurse stays at its default (true) when only max_depth is set");
    }

    /// Round-trip via `Clone` — guards that all new fields participate
    /// in the existing `Clone` derive (no `#[clone(skip)]` slipped in).
    #[test]
    fn test_sync_options_clone_preserves_new_fields() {
        let original = SyncOptions {
            force_prune: true,
            force_prune_with_ignored: true,
            migrate_lockfile: true,
            recurse: false,
            max_depth: Some(7),
            ..SyncOptions::default()
        };
        let copy = original.clone();
        assert_eq!(copy.force_prune, original.force_prune);
        assert_eq!(copy.force_prune_with_ignored, original.force_prune_with_ignored);
        assert_eq!(copy.migrate_lockfile, original.migrate_lockfile);
        assert_eq!(copy.recurse, original.recurse);
        assert_eq!(copy.max_depth, original.max_depth);
    }

    // ------------------------------------------------------------------
    // v1.2.1 path (iii) — `resolve_workspace` canonicalisation tests
    // ------------------------------------------------------------------

    /// `--workspace` pointing at a non-existent directory must fail
    /// fast with a Validation error citing the offending path. We
    /// explicitly do NOT mkdir-p someone else's typo — `--workspace`
    /// is an opt-in operator decision and a missing target is always
    /// a configuration mistake.
    #[test]
    fn test_resolve_workspace_errors_on_missing_override_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let pack_root = tmp.path();
        let missing = pack_root.join("nope");
        match resolve_workspace(pack_root, Some(missing.as_path())).expect_err("must fail") {
            SyncError::Validation { errors } => {
                let mentions_missing =
                    errors.iter().any(|e| e.to_string().contains("does not exist"));
                assert!(mentions_missing);
            }
            other => panic!("expected Validation, got {other:?}"),
        }
    }

    /// `--workspace = None` is the default cwd-meta path — no
    /// canonicalize, no fail-on-missing. The pack-root path is
    /// returned verbatim (post `pack_root_dir` normalisation).
    #[test]
    fn test_resolve_workspace_none_returns_pack_root_dir() {
        let tmp = tempfile::tempdir().unwrap();
        let pack_root = tmp.path().join("nonexistent-yet");
        let expected = pack_root_dir(&pack_root);
        let resolved = resolve_workspace(&pack_root, None).expect("None override is always Ok");
        assert_eq!(resolved, expected);
    }

    /// `--workspace = Some(<existing>)` returns the canonicalised path.
    /// On Windows this typically inserts the `\\?\` long-path prefix;
    /// on Unix it resolves any `..` / symlink components. Either way
    /// the returned path is what every downstream pass anchors against.
    #[test]
    fn test_resolve_workspace_canonicalises_existing_override() {
        let tmp = tempfile::tempdir().unwrap();
        let pack_root = tmp.path();
        let real = pack_root.join("real-ws");
        std::fs::create_dir_all(&real).unwrap();
        let resolved =
            resolve_workspace(pack_root, Some(real.as_path())).expect("existing dir must resolve");
        assert_eq!(resolved, real.canonicalize().unwrap());
    }
}