Skip to main content

grex_core/
sync.rs

1//! Sync orchestrator — M3 Stage B slice 6.
2//!
3//! Glues the building blocks shipped in slices 1–5b into a single runnable
4//! pipeline:
5//!
6//! 1. Walk a pack tree via [`Walker`] + [`FsPackLoader`] + a `GitBackend`.
7//! 2. Run plan-phase validators (manifest-level + graph-level).
8//! 3. Execute every action via a pluggable [`ActionExecutor`]
9//!    ([`PlanExecutor`] for dry-run, [`FsExecutor`] for wet-run).
10//! 4. Record each step as an [`Event::Sync`] entry in the pack-root's
11//!    `.grex/grex.jsonl` event log.
12//!
13//! # Traversal order
14//!
15//! Nodes are executed in **depth-first post-order**: children fully install
16//! before their parent. Rationale: parent packs commonly `require:` artifacts
17//! created by children (e.g. a parent symlink whose `src` lives inside a
18//! child). Running the root last matches the overlay-style dotfile-install
19//! intent authors expect, and it matches how `walker.walk` is structured
20//! (children are hydrated before the recursion returns).
21//!
22//! # Decoupling
23//!
24//! The CLI crate drives this module through a thin `run()` entry point;
25//! [`SyncOptions`] is `#[non_exhaustive]` so new knobs (parallelism, filter
26//! expressions, ref overrides) can land in later milestones without breaking
27//! CLI callers. Errors aggregate into [`SyncError`] with a small, stable
28//! variant set.
29
30use std::borrow::Cow;
31use std::fs;
32use std::path::{Path, PathBuf};
33use std::sync::Arc;
34
35use chrono::{DateTime, Utc};
36use globset::{Glob, GlobSet, GlobSetBuilder};
37use thiserror::Error;
38use tokio_util::sync::CancellationToken;
39
40use crate::execute::{
41    ActionExecutor, ExecCtx, ExecError, ExecResult, ExecStep, FsExecutor, MetaVisitedSet,
42    PlanExecutor, Platform, StepKind,
43};
44use crate::fs::{ManifestLock, ScopedLock};
45use crate::git::GixBackend;
46use crate::lockfile::{
47    compute_actions_hash, read_lockfile, write_lockfile, LockEntry, LockfileError,
48};
49use crate::manifest::{append_event, read_all, Event, ACTION_ERROR_SUMMARY_MAX, SCHEMA_VERSION};
50use crate::pack::{Action, PackValidationError};
51use crate::plugin::{PackTypeRegistry, Registry};
52use crate::scheduler::Scheduler;
53use crate::tree::{FsPackLoader, PackGraph, PackNode, TreeError, Walker};
54use crate::vars::VarEnv;
55
/// Inputs to [`run`].
///
/// Marked `#[non_exhaustive]`: outside this crate that forbids
/// struct-literal construction entirely — *including* functional record
/// update (`..Default::default()`) — so external callers must go through
/// [`SyncOptions::new`] / the `with_*` builders, and future knobs
/// (parallelism, filter expressions, additional ref strategies) can land
/// without breaking them. Within this crate, call sites may still
/// construct with struct literals and `..SyncOptions::default()`.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct SyncOptions {
    /// When `true`, use [`PlanExecutor`] (no filesystem mutations).
    pub dry_run: bool,
    /// When `false`, skip plan-phase validators (manifest + graph). Debug
    /// escape hatch; production callers should leave this `true`.
    pub validate: bool,
    /// Override workspace directory. `None` → the parent pack root itself
    /// (children resolve as flat siblings of the parent pack root).
    pub workspace: Option<PathBuf>,
    /// Global ref override (`grex sync --ref <sha|branch|tag>`). When
    /// `Some`, every child pack clone/checkout uses this ref instead of
    /// the declared `child.ref`. Empty strings are rejected at the CLI
    /// layer.
    pub ref_override: Option<String>,
    /// Pack-path filter patterns (`grex sync --only <glob>`). Raw glob
    /// strings — compiled internally via an in-crate `globset` helper so the
    /// `globset` crate version does not leak into the public API.
    /// `None` / empty means every pack runs (M3 semantics). Matching is
    /// against the pack's **workspace-relative** path normalized to
    /// forward-slash form.
    pub only_patterns: Option<Vec<String>>,
    /// Bypass the lockfile hash-match skip (`grex sync --force`). When
    /// `true`, every pack re-executes even if its `actions_hash` is
    /// unchanged from the prior lockfile.
    pub force: bool,
    /// Max parallel pack ops for this sync run (feat-m6-1).
    ///
    /// * `None` → `num_cpus::get()` semantics. The sync driver resolves
    ///   the default in one place (see [`run`]) so the scheduler slot on
    ///   every `ExecCtx` is always populated, whether the caller is the
    ///   CLI or a direct library consumer.
    /// * `Some(0)` → unbounded (`Semaphore::MAX_PERMITS`).
    /// * `Some(1)` → serial fast-path.
    /// * `Some(n >= 2)` → bounded parallel.
    pub parallel: Option<usize>,
}
103
104impl Default for SyncOptions {
105    fn default() -> Self {
106        Self {
107            dry_run: false,
108            validate: true,
109            workspace: None,
110            ref_override: None,
111            only_patterns: None,
112            force: false,
113            parallel: None,
114        }
115    }
116}
117
118/// Compile raw `--only` pattern strings into a [`globset::GlobSet`].
119/// Empty / absent input yields `Ok(None)` so M3's zero-config path
120/// (every pack runs) stays the default.
121fn compile_only_globset(patterns: Option<&Vec<String>>) -> Result<Option<GlobSet>, SyncError> {
122    let Some(pats) = patterns else { return Ok(None) };
123    if pats.is_empty() {
124        return Ok(None);
125    }
126    let mut builder = GlobSetBuilder::new();
127    for p in pats {
128        let glob = Glob::new(p)
129            .map_err(|source| SyncError::InvalidOnlyGlob { pattern: p.clone(), source })?;
130        builder.add(glob);
131    }
132    let set = builder
133        .build()
134        .map_err(|source| SyncError::InvalidOnlyGlob { pattern: pats.join(","), source })?;
135    Ok(Some(set))
136}
137
138impl SyncOptions {
139    /// Default options: wet-run, validators enabled, default workspace path.
140    #[must_use]
141    pub fn new() -> Self {
142        Self::default()
143    }
144
145    /// Set `dry_run`.
146    #[must_use]
147    pub fn with_dry_run(mut self, dry_run: bool) -> Self {
148        self.dry_run = dry_run;
149        self
150    }
151
152    /// Set `validate`.
153    #[must_use]
154    pub fn with_validate(mut self, validate: bool) -> Self {
155        self.validate = validate;
156        self
157    }
158
159    /// Set `workspace` override.
160    #[must_use]
161    pub fn with_workspace(mut self, workspace: Option<PathBuf>) -> Self {
162        self.workspace = workspace;
163        self
164    }
165
166    /// Set `ref_override` (`--ref`).
167    #[must_use]
168    pub fn with_ref_override(mut self, ref_override: Option<String>) -> Self {
169        self.ref_override = ref_override;
170        self
171    }
172
173    /// Set `only_patterns` (`--only`). Empty vector or `None` disables
174    /// the filter.
175    #[must_use]
176    pub fn with_only_patterns(mut self, patterns: Option<Vec<String>>) -> Self {
177        self.only_patterns = patterns;
178        self
179    }
180
181    /// Set `force` (`--force`).
182    #[must_use]
183    pub fn with_force(mut self, force: bool) -> Self {
184        self.force = force;
185        self
186    }
187
188    /// Set `parallel` (`--parallel`). See [`SyncOptions::parallel`] for
189    /// the `None` / `Some(0)` / `Some(1)` / `Some(n)` semantics.
190    #[must_use]
191    pub fn with_parallel(mut self, parallel: Option<usize>) -> Self {
192        self.parallel = parallel;
193        self
194    }
195}
196
/// One executed (or planned) action step in a sync run.
///
/// Pairs the pack identity / action position with the executor's own
/// record; collected in execution order on [`SyncReport::steps`].
///
/// Marked `#[non_exhaustive]` so new observability fields (timestamps,
/// plugin provenance) can land without breaking library consumers who
/// destructure the struct.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct SyncStep {
    /// Name of the pack that owned the action.
    pub pack: String,
    /// 0-based index into the pack's top-level `actions` vector.
    pub action_idx: usize,
    /// The [`ExecStep`] record emitted by the executor.
    pub exec_step: ExecStep,
}
212
/// Outcome of a [`run`] invocation.
///
/// On fail-fast termination, `halted` carries the error that stopped the
/// sync; every completed step up to that point is still in `steps` so
/// callers can render a partial transcript.
///
/// Marked `#[non_exhaustive]` so new report-level fields (run id, metrics)
/// can land without breaking library consumers who destructure the struct.
#[non_exhaustive]
#[derive(Debug)]
pub struct SyncReport {
    /// Fully-walked pack graph (present even on halted runs).
    pub graph: PackGraph,
    /// Steps produced by the executor, in execution order.
    pub steps: Vec<SyncStep>,
    /// `Some(e)` if execution stopped before all actions ran. Note that
    /// [`run`] still returns `Ok` in this case — only failures *before*
    /// the graph is walked surface as `Err`.
    pub halted: Option<SyncError>,
    /// Non-fatal manifest-append warnings (one per failed event append).
    /// Kept as a separate field because spec marks event-log write failures
    /// as non-aborting. Lockfile write failures are also appended here.
    pub event_log_warnings: Vec<String>,
    /// `Some(r)` when the pre-run teardown scan found orphaned backup
    /// files or dangling [`Event::ActionStarted`] records from a prior
    /// crashed run. Informational only — the report is still returned and
    /// the sync proceeds. CLI renderers should surface a warning so the
    /// operator can decide whether to run a future `grex doctor` verb.
    pub pre_run_recovery: Option<RecoveryReport>,
    /// One entry per child whose legacy `.grex/workspace/<name>/` layout
    /// was relocated (or considered for relocation) on this sync. Empty
    /// when no legacy directory was found — the common case for any
    /// workspace built fresh on v1.1.0+. CLI renderers should surface
    /// the list so operators see what changed.
    pub workspace_migrations: Vec<WorkspaceMigration>,
}
247
/// One legacy-layout migration attempt, reported on
/// [`SyncReport::workspace_migrations`]. `outcome` distinguishes the
/// move-succeeded case from the don't-clobber-user-data cases so CLI
/// renderers can present different advice to the operator.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkspaceMigration {
    /// Source path under the legacy `.grex/workspace/<name>/` location,
    /// rendered relative to the pack root for log readability.
    pub from: PathBuf,
    /// Destination flat-sibling path `<pack_root>/<name>/`, relative to
    /// the pack root.
    pub to: PathBuf,
    /// What happened — see [`MigrationOutcome`] for the per-variant story.
    pub outcome: MigrationOutcome,
}
263
/// Outcome of one legacy-layout migration attempt.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationOutcome {
    /// Legacy directory was renamed onto the flat-sibling slot.
    Migrated,
    /// Both legacy and flat-sibling slots existed. Skipped — the user
    /// must inspect and reconcile manually so we never silently delete
    /// either.
    SkippedBothExist,
    /// Flat-sibling slot already had a non-grex file or directory in
    /// the way. Skipped — refusing to clobber user data even when the
    /// legacy slot is plainly the source of truth.
    SkippedDestOccupied,
    /// `fs::rename` failed (e.g. cross-volume, ACL denied). The legacy
    /// directory is still in place; surfaced so the operator can move
    /// it manually. The payload is the stringified OS error.
    Failed { error: String },
}
283
/// Rich context attached to a [`SyncError::Halted`] variant.
///
/// Packages the pack + action position together with the underlying
/// executor error and an optional human-readable recovery hint. Boxed
/// inside [`SyncError::Halted`] so the error enum itself stays small.
/// Marked `#[non_exhaustive]` so future fields (step transcript,
/// timestamp) can land without breaking `match` arms or struct
/// destructures.
#[non_exhaustive]
#[derive(Debug)]
pub struct HaltedContext {
    /// Name of the pack that owned the halted action.
    pub pack: String,
    /// 0-based index into the pack's top-level `actions` vector.
    pub action_idx: usize,
    /// Short action kind tag (e.g. `"symlink"`, `"exec"`).
    pub action_name: String,
    /// Underlying executor error.
    pub error: ExecError,
    /// Optional next-step suggestion for the operator. `None` when no
    /// generic hint applies — the executor error's own `Display` already
    /// tells the story.
    pub recovery_hint: Option<String>,
}
306
/// Error taxonomy surfaced by [`run`].
///
/// A manual, intentionally *lossy* `Clone` impl (below) collapses the
/// non-cloneable variants into synthetic `Validation` entries so
/// [`SyncReport`]-adjacent tooling can clone errors.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum SyncError {
    /// The pack-tree walker failed (loader error, git error, cycle, …).
    #[error("tree walk failed: {0}")]
    Tree(#[from] TreeError),
    /// One or more plan-phase validators flagged the graph.
    #[error("validation failed: {errors:?}")]
    Validation {
        /// Aggregated errors from manifest-level + graph-level validators.
        errors: Vec<PackValidationError>,
    },
    /// An action executor returned an error.
    ///
    /// Retained for backward compatibility; new call sites should prefer
    /// [`SyncError::Halted`] which carries full pack + action context.
    /// Kept non-deprecated because [`From<ExecError>`] still materialises
    /// the variant for non-sync-loop callers (e.g. ad-hoc helpers).
    #[error("action execution failed: {0}")]
    Exec(#[from] ExecError),
    /// Action execution halted; full context (pack, action index, error,
    /// optional recovery hint) lives in [`HaltedContext`]. This is the
    /// variant the sync driver emits — [`SyncError::Exec`] is only
    /// surfaced by ancillary code paths.
    #[error(
        "sync halted at pack `{}` action #{} ({}): {}",
        .0.pack, .0.action_idx, .0.action_name, .0.error
    )]
    Halted(Box<HaltedContext>),
    /// Another `grex` process (or thread) already holds the workspace-level
    /// lock. The running sync refused to start to avoid racing two concurrent
    /// walkers into the same workspace. If the lock file at `lock_path` is
    /// stale (no other grex is actually running), remove it by hand.
    #[error(
        "workspace `{workspace}` is locked by another grex process (remove {lock_path:?} if stale)"
    )]
    WorkspaceBusy {
        /// Resolved workspace directory that the current run tried to lock.
        workspace: PathBuf,
        /// Sidecar lock file that is currently held.
        lock_path: PathBuf,
    },
    /// Reading or parsing the resolved-state lockfile failed. Surfaced as
    /// its own variant (rather than folded into `Validation`) because a
    /// corrupt / unreadable lockfile is an I/O or schema fault, not a
    /// dependency-satisfaction fault. Resolution is operator-level
    /// (restore a backup, delete the file, re-sync), not author-level.
    #[error("lockfile `{path}` failed to load: {source}")]
    Lockfile {
        /// Lockfile path that failed to load.
        path: PathBuf,
        /// Underlying lockfile error.
        #[source]
        source: LockfileError,
    },
    /// One of the `--only <GLOB>` patterns failed to compile. Surfaced
    /// as its own variant so the CLI can map it to a dedicated usage
    /// error exit code instead of the generic sync-failure bucket.
    #[error("invalid --only glob `{pattern}`: {source}")]
    InvalidOnlyGlob {
        /// The raw pattern string that failed to compile.
        pattern: String,
        /// Underlying globset error.
        #[source]
        source: globset::Error,
    },
}
375
376impl Clone for SyncError {
377    fn clone(&self) -> Self {
378        // `TreeError` / `ExecError` do not implement `Clone` (they wrap
379        // `std::io::Error`-adjacent values). Halts carry only a display
380        // rendering in the report; we re-materialise via a synthetic
381        // `Validation` variant so `SyncReport` can be `Clone`-safe for
382        // observability tooling without widening the taxonomy.
383        match self {
384            Self::Tree(e) => Self::Validation {
385                errors: vec![PackValidationError::DependsOnUnsatisfied {
386                    pack: "<tree>".into(),
387                    required: e.to_string(),
388                }],
389            },
390            Self::Validation { errors } => Self::Validation { errors: errors.clone() },
391            Self::Exec(e) => Self::Validation {
392                errors: vec![PackValidationError::DependsOnUnsatisfied {
393                    pack: "<exec>".into(),
394                    required: e.to_string(),
395                }],
396            },
397            Self::Halted(ctx) => Self::Validation {
398                errors: vec![PackValidationError::DependsOnUnsatisfied {
399                    pack: ctx.pack.clone(),
400                    required: format!(
401                        "action #{} ({}): {}",
402                        ctx.action_idx, ctx.action_name, ctx.error
403                    ),
404                }],
405            },
406            Self::WorkspaceBusy { workspace, lock_path } => {
407                Self::WorkspaceBusy { workspace: workspace.clone(), lock_path: lock_path.clone() }
408            }
409            Self::Lockfile { path, source } => Self::Validation {
410                errors: vec![PackValidationError::DependsOnUnsatisfied {
411                    pack: "<lockfile>".into(),
412                    required: format!("{}: {source}", path.display()),
413                }],
414            },
415            Self::InvalidOnlyGlob { pattern, source } => Self::Validation {
416                errors: vec![PackValidationError::DependsOnUnsatisfied {
417                    pack: "<only-glob>".into(),
418                    required: format!("{pattern}: {source}"),
419                }],
420            },
421        }
422    }
423}
424
/// Run a full sync over the pack tree rooted at `pack_root`.
///
/// Resolution rules:
/// * If `pack_root` is a directory the walker looks for
///   `<pack_root>/.grex/pack.yaml`.
/// * If `pack_root` ends in `.yaml` / `.yml` it is loaded verbatim.
/// * Workspace defaults to the pack root directory itself when
///   `opts.workspace` is `None`. Children resolve as flat siblings of the
///   parent pack root (since v1.1.0).
///
/// # Errors
///
/// Returns the first error that halts the pipeline — see [`SyncError`] for
/// the taxonomy. Once the graph is walked and a [`SyncReport`] exists,
/// action-level failures are recorded on `report.halted` and the function
/// still returns `Ok` so callers can render the partial transcript.
///
/// `cancel` is the cooperative cancellation handle threaded through the
/// pipeline by feat-m7-1 stage 2. Stage 2 only wires the parameter; the
/// `is_cancelled()` polls land in stages 3-4 (scheduler + pack-lock
/// acquire). CLI callers pass a never-cancelled sentinel
/// (`CancellationToken::new()`); the MCP server passes a token tied to
/// the request lifetime.
pub fn run(
    pack_root: &Path,
    opts: &SyncOptions,
    cancel: &CancellationToken,
) -> Result<SyncReport, SyncError> {
    // Stage 2 is signature-only — silence "unused parameter" without
    // hiding it behind `_` (downstream stages will read it).
    let _ = cancel;
    let workspace = resolve_workspace(pack_root, opts.workspace.as_deref());
    ensure_workspace_dir(&workspace)?;
    let (mut ws_lock, ws_lock_path) = open_workspace_lock(&workspace)?;
    // Named `_ws_guard` (not a bare `_`) on purpose: a bare `_` pattern
    // would not bind the guard, dropping it — and releasing the workspace
    // lock — immediately. The named binding holds it until `run` returns.
    let _ws_guard = match ws_lock.try_acquire() {
        Ok(Some(g)) => g,
        Ok(None) => {
            return Err(SyncError::WorkspaceBusy {
                workspace: workspace.clone(),
                lock_path: ws_lock_path,
            });
        }
        Err(e) => return Err(workspace_lock_err(&ws_lock_path, &e.to_string())),
    };

    // Compile `--only` patterns into a GlobSet here so the
    // `globset` crate version does not leak into `SyncOptions`.
    let only_set = compile_only_globset(opts.only_patterns.as_ref())?;

    // Auto-migrate legacy `.grex/workspace/<name>/` layout BEFORE the
    // walker resolves children. Idempotent: a fresh v1.1.0+ workspace
    // sees no legacy directory and the function no-ops.
    let workspace_migrations = migrate_legacy_workspace(pack_root);

    let graph =
        walk_and_validate(pack_root, &workspace, opts.validate, opts.ref_override.as_deref())?;
    let prep = prepare_run_context(pack_root, &graph, &workspace)?;
    log_force_flag(opts.force);

    let mut report = SyncReport {
        graph,
        steps: Vec::new(),
        halted: None,
        event_log_warnings: Vec::new(),
        pre_run_recovery: prep.pre_run_recovery,
        workspace_migrations,
    };

    // From here on, failures land on `report.halted` rather than being
    // returned — the report (with its partial step transcript) is always
    // handed back via `Ok`.
    let mut next_lock = prep.prior_lock.clone();
    // feat-m6 B1: resolve `--parallel` once and build the scheduler
    // shared across every `ExecCtx` in this run. Library callers who
    // leave `opts.parallel == None` default to `num_cpus::get()` here
    // (clamped `>= 1`) so the scheduler slot is always populated —
    // `ctx.scheduler` being `None` would strand acquire-sites into
    // unbounded concurrency. See `.omne/cfg/concurrency.md` §Scheduler.
    let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
    let scheduler = Arc::new(Scheduler::new(resolved_parallel));
    run_actions(
        &mut report,
        &prep.order,
        &prep.vars,
        &workspace,
        &prep.event_log,
        &prep.lock_path,
        opts.dry_run,
        &prep.prior_lock,
        &mut next_lock,
        &prep.registry,
        &prep.pack_type_registry,
        only_set.as_ref(),
        opts.force,
        resolved_parallel,
        &scheduler,
    );

    // Persist the refreshed lockfile (skipped on dry-run). On a halt the
    // halted pack's entry was already pruned by `run_actions`, so the
    // next sync re-executes that pack from scratch.
    persist_lockfile_if_clean(&mut report, &prep.lockfile_path, &next_lock, opts.dry_run);
    Ok(report)
}
521
/// Bag of context pieces assembled once at the top of [`run`]. Grouping
/// them keeps [`run`] under the workspace's 50-LOC function lint without
/// smearing the read of sequential setup across helpers. Fields are
/// consumed piecemeal by the actions loop; no getters needed.
struct RunContext {
    /// Node execution order from `post_order` — children before parents
    /// (see the module docs on traversal order).
    order: Vec<usize>,
    /// Variable environment snapshotted via `VarEnv::from_os` at run start.
    vars: VarEnv,
    /// Path to the `.grex/grex.jsonl` event log under the pack root.
    event_log: PathBuf,
    /// Sidecar lock path derived from the event log (`event_lock_path`).
    lock_path: PathBuf,
    /// Path to the resolved-state lockfile (`grex.lock.jsonl`).
    lockfile_path: PathBuf,
    /// Prior lockfile entries keyed by pack name; empty when no lockfile
    /// existed yet.
    prior_lock: std::collections::HashMap<String, LockEntry>,
    /// Bootstrap action-plugin registry, shared across every `ExecCtx`.
    registry: Arc<Registry>,
    /// Pack-type registry, shared across every `ExecCtx`.
    pack_type_registry: Arc<PackTypeRegistry>,
    /// Non-empty findings of the pre-run recovery scan, when any.
    pre_run_recovery: Option<RecoveryReport>,
}
537
538/// Build the per-run context: traversal order, vars env, event/lockfile
539/// paths, prior lockfile state, bootstrap registry, and (optionally) a
540/// pre-run recovery scan. Kept narrow so [`run`] stays small.
541///
542/// `workspace` is the resolved workspace directory (post `--workspace`
543/// override) so the recovery scan looks for `.grex.bak` artefacts under
544/// the actual on-disk location children were materialised at — not
545/// under the pack root, which differs from the workspace whenever the
546/// CLI's `--workspace` flag is used. Pre-fix this anchoring drift
547/// caused recovery scans to miss every backup left under an override
548/// workspace.
549fn prepare_run_context(
550    pack_root: &Path,
551    graph: &PackGraph,
552    workspace: &Path,
553) -> Result<RunContext, SyncError> {
554    let event_log = event_log_path(pack_root);
555    let lock_path = event_lock_path(&event_log);
556    let vars = VarEnv::from_os();
557    let order = post_order(graph);
558    let pre_run_recovery = scan_recovery(workspace, &event_log).ok().filter(|r| !r.is_empty());
559    let lockfile_path = lockfile_path(pack_root);
560    let prior_lock = load_prior_lock(&lockfile_path)?;
561    let registry = Arc::new(Registry::bootstrap());
562    let pack_type_registry = Arc::new(bootstrap_pack_type_registry());
563    Ok(RunContext {
564        order,
565        vars,
566        event_log,
567        lock_path,
568        lockfile_path,
569        prior_lock,
570        registry,
571        pack_type_registry,
572        pre_run_recovery,
573    })
574}
575
576/// Build the [`PackTypeRegistry`] the sync driver threads into every
577/// [`ExecCtx`] it constructs.
578///
579/// Default path (no `plugin-inventory` feature) hard-codes the three
580/// built-ins via [`PackTypeRegistry::bootstrap`]. With the feature on,
581/// [`PackTypeRegistry::bootstrap_from_inventory`] is preferred so any
582/// externally-submitted plugin types (mirroring the M4-E pattern for
583/// action plugins) shadow the built-ins last-writer-wins. Kept as a free
584/// helper so the `#[cfg]` split lives in one place instead of being
585/// smeared across every sync call-site.
586fn bootstrap_pack_type_registry() -> PackTypeRegistry {
587    #[cfg(feature = "plugin-inventory")]
588    {
589        let mut reg = PackTypeRegistry::bootstrap();
590        reg.register_from_inventory();
591        reg
592    }
593    #[cfg(not(feature = "plugin-inventory"))]
594    {
595        PackTypeRegistry::bootstrap()
596    }
597}
598
599/// Emit a single `tracing::info!` line when `--force` is active so
600/// operators can confirm from logs that the skip short-circuit was
601/// bypassed. Extracted so [`run`] stays small.
602fn log_force_flag(force: bool) {
603    if force {
604        tracing::info!(
605            target: "grex::sync",
606            "--force active: bypassing lockfile skip-on-hash short-circuit"
607        );
608    }
609}
610
611/// Walk the pack tree rooted at `pack_root`, optionally running the
612/// plan-phase validators. Extracted so [`run`] stays under the
613/// workspace's 50-LOC per-function lint threshold.
614fn walk_and_validate(
615    pack_root: &Path,
616    workspace: &Path,
617    validate: bool,
618    ref_override: Option<&str>,
619) -> Result<PackGraph, SyncError> {
620    let loader = FsPackLoader::new();
621    let backend = GixBackend::new();
622    let walker = Walker::new(&loader, &backend, workspace.to_path_buf())
623        .with_ref_override(ref_override.map(str::to_string));
624    let graph = walker.walk(pack_root)?;
625    if validate {
626        validate_graph(&graph)?;
627    }
628    Ok(graph)
629}
630
631/// Load the prior lockfile (`grex.lock.jsonl`). Missing file yields an
632/// empty map; parse errors are fatal since writes are atomic and a torn
633/// lockfile therefore indicates real corruption that must be resolved
634/// before a fresh sync is safe. Parse/IO failures surface as
635/// [`SyncError::Lockfile`] — this is an I/O / schema fault, not a
636/// dependency-satisfaction fault, so it gets its own taxonomy slot.
637fn load_prior_lock(
638    lockfile_path: &Path,
639) -> Result<std::collections::HashMap<String, LockEntry>, SyncError> {
640    read_lockfile(lockfile_path)
641        .map_err(|source| SyncError::Lockfile { path: lockfile_path.to_path_buf(), source })
642}
643
644/// Persist `next_lock` atomically to `lockfile_path` whenever this was
645/// not a dry-run. On a halt the map has already had the halted pack's
646/// entry removed (see `run_actions`), so persisting now preserves every
647/// *successful* pack's fresh entry while guaranteeing absence of an
648/// entry for the halted pack — next sync sees no prior hash there and
649/// re-executes from scratch (route (b) halt-state gating). Write errors
650/// surface as non-fatal warnings on the report.
651fn persist_lockfile_if_clean(
652    report: &mut SyncReport,
653    lockfile_path: &Path,
654    next_lock: &std::collections::HashMap<String, LockEntry>,
655    dry_run: bool,
656) {
657    if dry_run {
658        return;
659    }
660    if let Err(e) = write_lockfile(lockfile_path, next_lock) {
661        tracing::warn!(target: "grex::sync", "lockfile write failed: {e}");
662        report.event_log_warnings.push(format!("{}: {e}", lockfile_path.display()));
663    }
664}
665
666/// Canonical location of the resolved-state lockfile
667/// (`<pack_root>/.grex/grex.lock.jsonl`). Colocated with the event log
668/// so both audit artifacts live under a single `.grex/` sidecar.
669fn lockfile_path(pack_root: &Path) -> PathBuf {
670    pack_root_dir(pack_root).join(".grex").join("grex.lock.jsonl")
671}
672
673/// Create the workspace directory if it does not yet exist.
674fn ensure_workspace_dir(workspace: &Path) -> Result<(), SyncError> {
675    if !workspace.exists() {
676        std::fs::create_dir_all(workspace).map_err(|e| SyncError::Validation {
677            errors: vec![PackValidationError::DependsOnUnsatisfied {
678                pack: "<workspace>".into(),
679                required: format!("{}: {e}", workspace.display()),
680            }],
681        })?;
682    }
683    Ok(())
684}
685
686/// Open (but do not acquire) the workspace-level lock file.
687fn open_workspace_lock(workspace: &Path) -> Result<(ScopedLock, PathBuf), SyncError> {
688    let ws_lock_path = workspace_lock_path(workspace);
689    let ws_lock = ScopedLock::open(&ws_lock_path)
690        .map_err(|e| workspace_lock_err(&ws_lock_path, &e.to_string()))?;
691    Ok((ws_lock, ws_lock_path))
692}
693
694/// Build a `Validation` error describing a workspace-lock failure.
695fn workspace_lock_err(ws_lock_path: &Path, reason: &str) -> SyncError {
696    SyncError::Validation {
697        errors: vec![PackValidationError::DependsOnUnsatisfied {
698            pack: "<workspace-lock>".into(),
699            required: format!("{}: {reason}", ws_lock_path.display()),
700        }],
701    }
702}
703
/// Single source of truth for the legacy workspace directory name.
/// Pre-`v1.1.0`, `resolve_workspace` joined `.grex/workspace/` onto the
/// pack root by default; the auto-migration in
/// [`migrate_legacy_workspace`] is the only place this legacy literal
/// is allowed to appear in `crates/grex-core/src/`. The grep gate in
/// the v1.1.0 release checklist allows exactly this one constant.
const LEGACY_WORKSPACE_DIR: &str = ".grex/workspace";
711
712/// Auto-migrate any legacy `.grex/workspace/<name>/` child layout left
713/// over from v1.0.x to the v1.1.0 flat-sibling layout. Idempotent: a
714/// fresh workspace built on v1.1.0+ sees no `.grex/workspace/`
715/// directory and the function no-ops.
716///
717/// Per-child outcomes:
718///
719/// * **Both legacy + flat-sibling exist** → `SkippedBothExist`. The
720///   user needs to inspect (perhaps the legacy is stale, perhaps it is
721///   the source of truth); we never silently delete either.
722/// * **Flat-sibling slot occupied by a non-grex file or non-empty dir**
723///   → `SkippedDestOccupied`. Refuse to clobber user data.
724/// * **Legacy exists, flat-sibling absent** → `Migrated` via atomic
725///   `fs::rename`. Same-volume move is the common case (the migration
726///   stays inside `pack_root`); cross-volume failures surface as
727///   `Failed { error }` with the OS message so the operator can move
728///   manually.
729/// * **Legacy absent** → silent no-op (not recorded in the report).
730///
731/// After all per-child decisions: orphan `.grex.sync.lock` under the
732/// legacy workspace is removed (best-effort) and the empty
733/// `.grex/workspace/` directory is rmdir'd (best-effort). Both are
734/// soft-failures: leaving them on disk is harmless, surfacing the
735/// errors as a sync abort would be over-strict.
736///
737/// Discovery is by directory listing, not by parent-manifest parse —
738/// migration must work even when the parent manifest itself was
739/// rewritten between versions. A child counts as "legacy" iff
740/// `<pack_root>/<LEGACY_WORKSPACE_DIR>/<name>/.git` exists (i.e. it is
741/// an actual git working tree, not stray metadata).
742fn migrate_legacy_workspace(pack_root: &Path) -> Vec<WorkspaceMigration> {
743    let root = pack_root_dir(pack_root);
744    let legacy_root = root.join(LEGACY_WORKSPACE_DIR);
745    if !legacy_root.is_dir() {
746        return Vec::new();
747    }
748    let entries = match fs::read_dir(&legacy_root) {
749        Ok(e) => e,
750        Err(e) => {
751            tracing::warn!(
752                target: "grex::sync::migrate",
753                "legacy workspace `{}` unreadable: {e}",
754                legacy_root.display(),
755            );
756            return Vec::new();
757        }
758    };
759    let mut migrations = Vec::new();
760    for entry_result in entries {
761        let entry = match entry_result {
762            Ok(e) => e,
763            Err(e) => {
764                tracing::warn!(
765                    target: "grex::sync::migrate",
766                    "skipping unreadable entry under `{}`: {e}",
767                    legacy_root.display(),
768                );
769                continue;
770            }
771        };
772        let Ok(ft) = entry.file_type() else { continue };
773        // file_type avoids symlink-following; legitimate v1.0.x children
774        // were always real directories, so anything else is skipped.
775        if ft.is_symlink() || !ft.is_dir() {
776            continue;
777        }
778        let name_os = entry.file_name();
779        let Some(name) = name_os.to_str() else { continue };
780        // Only act on entries that look like real cloned children (have
781        // a `.git`). The legacy workspace lock file (`.grex.sync.lock`)
782        // is not a directory and is filtered out by the dir check above;
783        // we clean it up explicitly after the migration loop completes.
784        let from_abs = entry.path();
785        if !from_abs.join(".git").exists() {
786            continue;
787        }
788        let to_abs = root.join(name);
789        let from_rel = PathBuf::from(LEGACY_WORKSPACE_DIR).join(name);
790        let to_rel = PathBuf::from(name);
791        let outcome = decide_and_migrate(&from_abs, &to_abs);
792        log_migration(&from_rel, &to_rel, &outcome);
793        migrations.push(WorkspaceMigration { from: from_rel, to: to_rel, outcome });
794    }
795    cleanup_legacy_workspace_root(&legacy_root);
796    migrations
797}
798
799/// Decide what to do with one legacy child + perform the move when
800/// safe. Returns the outcome to record on the [`WorkspaceMigration`].
801fn decide_and_migrate(from: &Path, to: &Path) -> MigrationOutcome {
802    let dest_exists = to.exists();
803    let dest_is_grex_repo = dest_exists && to.join(".git").exists();
804    if dest_is_grex_repo {
805        // Both legacy and flat-sibling are git repos. Refuse to choose
806        // between them; let the user resolve.
807        return MigrationOutcome::SkippedBothExist;
808    }
809    if dest_exists {
810        // Some other entry occupies the flat-sibling slot — a stray
811        // file, an empty dir, an unrelated dir. Treat as user data and
812        // leave both in place.
813        return MigrationOutcome::SkippedDestOccupied;
814    }
815    match fs::rename(from, to) {
816        Ok(()) => MigrationOutcome::Migrated,
817        Err(e) => MigrationOutcome::Failed { error: e.to_string() },
818    }
819}
820
821/// Emit one structured log line per migration so users see exactly what
822/// happened during the upgrade. Severity matches outcome: success is
823/// `info`, skips and failures are `warn` so they surface in the default
824/// log level without forcing operators to crank verbosity.
825fn log_migration(from: &Path, to: &Path, outcome: &MigrationOutcome) {
826    let from_disp = from.display();
827    let to_disp = to.display();
828    match outcome {
829        MigrationOutcome::Migrated => {
830            tracing::info!(
831                target: "grex::sync::migrate",
832                "migrated: legacy={from_disp} -> new={to_disp}",
833            );
834        }
835        MigrationOutcome::SkippedBothExist => {
836            tracing::warn!(
837                target: "grex::sync::migrate",
838                "skipped: both legacy={from_disp} and new={to_disp} exist; resolve manually",
839            );
840        }
841        MigrationOutcome::SkippedDestOccupied => {
842            tracing::warn!(
843                target: "grex::sync::migrate",
844                "skipped: destination={to_disp} occupied; leaving legacy={from_disp} in place",
845            );
846        }
847        MigrationOutcome::Failed { error } => {
848            tracing::warn!(
849                target: "grex::sync::migrate",
850                "failed: legacy={from_disp} -> new={to_disp}: {error}",
851            );
852        }
853    }
854}
855
856/// Best-effort cleanup of the legacy workspace root after migration:
857/// remove the orphan `.grex.sync.lock` (always safe — the v1.1.0
858/// workspace lock lives at `<pack_root>/.grex.sync.lock`) and try to
859/// rmdir the now-empty `.grex/workspace/` directory. Errors are logged
860/// at trace level only — both leftovers are harmless.
861fn cleanup_legacy_workspace_root(legacy_root: &Path) {
862    let orphan_lock = legacy_root.join(".grex.sync.lock");
863    if orphan_lock.exists() {
864        if let Err(e) = fs::remove_file(&orphan_lock) {
865            tracing::warn!(
866                target: "grex::sync::migrate",
867                "could not remove orphan lock `{}`: {e}",
868                orphan_lock.display(),
869            );
870        } else {
871            tracing::info!(
872                target: "grex::sync::migrate",
873                "removed orphan lock `{}`",
874                orphan_lock.display(),
875            );
876        }
877    }
878    // `remove_dir` only succeeds when the directory is empty — exactly
879    // what we want; if any unmigrated child remains, the legacy root
880    // stays put for the operator to inspect.
881    let _ = fs::remove_dir(legacy_root);
882}
883
884/// Compute the default workspace path when `override_` is absent.
885///
886/// The default is the pack root directory itself, so child packs
887/// resolve as flat siblings of the parent pack root. The rationale —
888/// alignment with the long-standing pack-spec rule that
889/// `children[].path` is a bare name — lives in the pack-spec
890/// "Validation rules" section (`man/concepts/pack-spec.md` /
891/// `grex-doc/src/concepts/pack-spec.md`).
892fn resolve_workspace(pack_root: &Path, override_: Option<&Path>) -> PathBuf {
893    if let Some(p) = override_ {
894        return p.to_path_buf();
895    }
896    pack_root_dir(pack_root)
897}
898
/// Resolve the directory that anchors per-pack state for `pack_root`.
///
/// A path with a `.yaml` / `.yml` extension is treated as a manifest
/// file and resolves to its grandparent directory (two `parent()`
/// hops), falling back to `"."` when the path is too shallow.
/// NOTE(review): the two-level hop implies the manifest sits one
/// directory below the pack root — confirm against the walker's
/// on-disk layout (the old one-liner said "parent", which the code
/// never did). Any non-yaml path is returned unchanged.
fn pack_root_dir(pack_root: &Path) -> PathBuf {
    let is_manifest = pack_root
        .extension()
        .and_then(|ext| ext.to_str())
        .map_or(false, |ext| ext == "yaml" || ext == "yml");
    if !is_manifest {
        return pack_root.to_path_buf();
    }
    match pack_root.parent().and_then(Path::parent) {
        Some(dir) => dir.to_path_buf(),
        None => PathBuf::from("."),
    }
}
911
912/// Compute the `.grex/grex.jsonl` path next to the pack root.
913fn event_log_path(pack_root: &Path) -> PathBuf {
914    pack_root_dir(pack_root).join(".grex").join("grex.jsonl")
915}
916
/// Compute the sidecar lock path next to the event log. One canonical
/// slot per pack root — cooperating grex processes serialize through
/// this file.
fn event_lock_path(event_log: &Path) -> PathBuf {
    match event_log.parent() {
        Some(dir) => dir.join(".grex.lock"),
        None => PathBuf::from(".grex.lock"),
    }
}
922
/// Compute the workspace's own sidecar lock path,
/// `<workspace>/.grex.sync.lock`. The workspace directory is created by
/// the `run()` prologue, so the sidecar lands beside the child clones.
fn workspace_lock_path(workspace: &Path) -> PathBuf {
    let mut lock = workspace.to_path_buf();
    lock.push(".grex.sync.lock");
    lock
}
929
930/// Aggregate manifest-level + graph-level validators and return their output.
931fn validate_graph(graph: &PackGraph) -> Result<(), SyncError> {
932    let mut errors: Vec<PackValidationError> = Vec::new();
933    for node in graph.nodes() {
934        if let Err(mut e) = node.manifest.validate_plan() {
935            errors.append(&mut e);
936        }
937    }
938    if let Err(mut e) = graph.validate() {
939        errors.append(&mut e);
940    }
941    if errors.is_empty() {
942        Ok(())
943    } else {
944        Err(SyncError::Validation { errors })
945    }
946}
947
948/// Depth-first post-order traversal of the graph starting from root.
949///
950/// Children fully precede their parent in the returned vector so downstream
951/// executors install leaves first and the root last.
952fn post_order(graph: &PackGraph) -> Vec<usize> {
953    let mut out = Vec::with_capacity(graph.nodes().len());
954    visit_post(graph, 0, &mut out);
955    out
956}
957
958fn visit_post(graph: &PackGraph, id: usize, out: &mut Vec<usize>) {
959    // Collect child ids first to avoid borrow conflicts with graph iteration.
960    let kids: Vec<usize> = graph.children_of(id).map(|n| n.id).collect();
961    for k in kids {
962        visit_post(graph, k, out);
963    }
964    out.push(id);
965}
966
967/// Drive every action for every node; abort on the first [`ExecError`].
968///
969/// Each action is bracketed by three manifest events:
970/// 1. [`Event::ActionStarted`] — appended **before** `execute` returns.
971/// 2. [`Event::ActionCompleted`] — appended on `Ok(step)`.
972/// 3. [`Event::ActionHalted`] — appended on `Err(e)` before returning.
973///
974/// All three writes go through the same [`ManifestLock`]-wrapped path
975/// ([`append_manifest_event`]) and failures are recorded as non-fatal
976/// warnings so the executor's outcome always dominates. The third append
977/// (`ActionHalted`) lets a future `grex doctor` correlate crash recovery
978/// with the exact action that halted.
979// feat-m6 B1 wiring added `parallel` + `scheduler` args; the signature
980// now pushes past the 50-LOC per-function lint by one line. Silence
981// that one — the body itself is unchanged in scope.
982#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
983fn run_actions(
984    report: &mut SyncReport,
985    order: &[usize],
986    vars: &VarEnv,
987    workspace: &Path,
988    event_log: &Path,
989    lock_path: &Path,
990    dry_run: bool,
991    prior_lock: &std::collections::HashMap<String, LockEntry>,
992    next_lock: &mut std::collections::HashMap<String, LockEntry>,
993    registry: &Arc<Registry>,
994    pack_type_registry: &Arc<PackTypeRegistry>,
995    only: Option<&GlobSet>,
996    force: bool,
997    parallel: usize,
998    scheduler: &Arc<Scheduler>,
999) {
1000    let plan = PlanExecutor::with_registry(registry.clone());
1001    let fs = FsExecutor::with_registry(registry.clone());
1002    let rt = build_pack_type_runtime(parallel);
1003    let visited_meta = new_visited_meta();
1004    for &id in order {
1005        let Some(node) = report.graph.node(id) else { continue };
1006        let pack_name = node.name.clone();
1007        let pack_path = node.path.clone();
1008        let actions = node.manifest.actions.clone();
1009        let manifest = node.manifest.clone();
1010        let commit_sha = node.commit_sha.clone().unwrap_or_default();
1011        let synthetic = node.synthetic;
1012        // `--only` filter + skip-on-hash short-circuits colocated in
1013        // `try_skip_or_filter` so this outer loop stays within the
1014        // 50-LOC per-function budget.
1015        if try_skip_or_filter(
1016            report,
1017            only,
1018            &pack_name,
1019            &pack_path,
1020            &actions,
1021            &commit_sha,
1022            synthetic,
1023            workspace,
1024            prior_lock,
1025            next_lock,
1026            dry_run,
1027            force,
1028        ) {
1029            continue;
1030        }
1031        let pack_halted = run_pack_lifecycle(
1032            report,
1033            vars,
1034            workspace,
1035            event_log,
1036            lock_path,
1037            dry_run,
1038            &plan,
1039            &fs,
1040            registry,
1041            pack_type_registry,
1042            &rt,
1043            &pack_name,
1044            &pack_path,
1045            &manifest,
1046            &visited_meta,
1047            scheduler,
1048        );
1049        if pack_halted {
1050            // Route (b) halt-state gating: drop any prior entry for the
1051            // halted pack so the next sync sees no prior hash and
1052            // re-executes from scratch. Successful packs in this same
1053            // run keep their freshly-upserted entries, and packs we did
1054            // not reach keep their prior entries untouched.
1055            next_lock.remove(&pack_name);
1056            return;
1057        }
1058        // Successful pack — record a fresh lockfile entry so the next
1059        // run's skip-on-hash test can succeed. Commit SHA is now plumbed
1060        // from the walker (M4-D): `PackNode::commit_sha` carries the
1061        // resolved HEAD SHA when the pack's working tree is a git
1062        // repository, otherwise an empty string keeps the hash stable.
1063        let actions_hash = compute_actions_hash(&actions, &commit_sha);
1064        upsert_lock_entry(prior_lock, next_lock, &pack_name, &commit_sha, &actions_hash, synthetic);
1065    }
1066}
1067
1068/// Build the multi-thread tokio runtime used to drive async pack-type
1069/// plugin dispatch. Pack-type plugins expose `async fn` methods via
1070/// `async_trait`, but the sync driver is synchronous end-to-end — we
1071/// block on each plugin future inside the outer action loop. Extracted
1072/// into a standalone helper so the runtime construction does not
1073/// inflate `run_actions` beyond the 50-LOC per-function budget.
1074///
1075/// # Multi-thread rationale (M5-2c)
1076///
1077/// M5-2c enabled real [`crate::plugin::pack_type::MetaPlugin`] recursion
1078/// through [`crate::execute::ExecCtx::pack_type_registry`]. The recursion
1079/// itself is purely `async` / `.await` (no nested `block_on`), but future
1080/// plugin authors may reasonably compose `block_on` calls inside
1081/// lifecycle hooks — and external callers that drive `MetaPlugin` via
1082/// `rt.block_on(...)` within their own runtime would deadlock on a
1083/// current-thread runtime the moment a hook re-enters. A multi-thread
1084/// runtime with a small worker pool lets those re-entries resolve on a
1085/// sibling worker instead of blocking the dispatcher thread.
1086///
1087/// # Worker-thread sizing (feat-m6 H6)
1088///
1089/// The worker pool is sized from the resolved `--parallel` knob so the
1090/// runtime always has enough workers to service every in-flight pack op
1091/// plus at least one sibling for nested `block_on`. Clamped to
1092/// `[2, num_cpus::get()]`: `2` preserves the pre-M6 floor (one driver +
1093/// one sibling so re-entrant hooks never deadlock), and the upper bound
1094/// caps the pool at the host's CPU count so `--parallel 0`
1095/// (unbounded-semantics) does not explode the worker count.
1096fn build_pack_type_runtime(parallel: usize) -> tokio::runtime::Runtime {
1097    let workers = parallel.clamp(2, num_cpus::get().max(2));
1098    tokio::runtime::Builder::new_multi_thread()
1099        .worker_threads(workers)
1100        .enable_all()
1101        .build()
1102        .expect("tokio runtime for pack-type dispatch")
1103}
1104
1105/// Construct a fresh [`MetaVisitedSet`] for one sync run. Walker-driven
1106/// dispatch does not attach it (see `dispatch_pack_type_plugin`), but
1107/// the argument is threaded through so future explicit-install /
1108/// teardown verbs can share the same set shape.
1109fn new_visited_meta() -> MetaVisitedSet {
1110    std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashSet::new()))
1111}
1112
1113/// Combined short-circuit helper: `--only` filter + skip-on-hash. Returns
1114/// `true` when the outer loop should `continue` for this pack.
1115///
1116/// Extracted from `run_actions` so that function stays under the
1117/// workspace's 50-LOC per-function lint. Semantics are unchanged; this
1118/// is a pure structural refactor.
1119#[allow(clippy::too_many_arguments)]
1120fn try_skip_or_filter(
1121    report: &mut SyncReport,
1122    only: Option<&GlobSet>,
1123    pack_name: &str,
1124    pack_path: &Path,
1125    actions: &[Action],
1126    commit_sha: &str,
1127    current_synthetic: bool,
1128    workspace: &Path,
1129    prior_lock: &std::collections::HashMap<String, LockEntry>,
1130    next_lock: &mut std::collections::HashMap<String, LockEntry>,
1131    dry_run: bool,
1132    force: bool,
1133) -> bool {
1134    if skip_for_only_filter(only, pack_name, pack_path, workspace) {
1135        if let Some(prev) = prior_lock.get(pack_name) {
1136            next_lock.insert(pack_name.to_string(), prev.clone());
1137        }
1138        return true;
1139    }
1140    try_skip_pack(
1141        report,
1142        pack_name,
1143        pack_path,
1144        actions,
1145        commit_sha,
1146        current_synthetic,
1147        prior_lock,
1148        next_lock,
1149        dry_run,
1150        force,
1151    )
1152}
1153
1154/// Return `true` when `--only` is active and the pack's
1155/// **workspace-relative path** (normalized to forward-slash form) does
1156/// not match any of the registered globs. Name-fallback matching was
1157/// dropped in the M4-D post-review fix bundle: spec §M4 req 6 says
1158/// "pack paths" and cross-platform consistency requires a single
1159/// normalized representation rather than `display()`-formatted strings
1160/// (which use `\\` on Windows and `/` on POSIX — globset treats `\\`
1161/// as a glob-escape, not a path separator). For the root pack whose
1162/// `pack_path` is not under `workspace`, the fallback is to match
1163/// against the absolute path's forward-slash form.
1164fn skip_for_only_filter(
1165    only: Option<&GlobSet>,
1166    pack_name: &str,
1167    pack_path: &Path,
1168    workspace: &Path,
1169) -> bool {
1170    let Some(set) = only else { return false };
1171    let rel = pack_path.strip_prefix(workspace).unwrap_or(pack_path);
1172    let rel_str = rel.to_string_lossy().replace('\\', "/");
1173    let matches = set.is_match(&rel_str);
1174    if !matches {
1175        tracing::info!(
1176            target: "grex::sync",
1177            "skipping pack `{pack_name}` (rel path `{rel_str}`): does not match --only filter"
1178        );
1179    }
1180    !matches
1181}
1182
1183/// Per-pack lifecycle dispatch. Returns `true` when the sync must halt.
1184///
1185/// M5-1 Stage C replaces the blind `for action in manifest.actions` loop
1186/// with a pack-type-aware dispatch:
1187///
1188/// * [`PackType::Declarative`] retains the per-action execution shape that
1189///   M4 shipped — each action lands its own `ActionStarted` /
1190///   `ActionCompleted` / `ActionHalted` event bracket. The registry is
1191///   still consulted via [`PackTypeRegistry::get`] as a name-oracle so
1192///   mistyped packs fail closed.
1193/// * [`PackType::Meta`] / [`PackType::Scripted`] dispatch once through the
1194///   pack-type plugin's `sync` method (the sync CLI verb is the only
1195///   caller in M5-1; `install` / `update` / `teardown` verbs wire in
1196///   M5-2), returning a single aggregate [`ExecStep`]. A single event
1197///   bracket frames the async call.
1198///
1199/// Declarative is kept on the legacy per-action path because its event log
1200/// semantics (one event per action, per-step rollback context) are exactly
1201/// what plugin authors expect to observe. Unifying declarative under the
1202/// plugin dispatch is M5-2 scope — it requires reshaping the trait surface
1203/// to emit a step stream rather than a single aggregate.
1204#[allow(clippy::too_many_arguments)]
1205fn run_pack_lifecycle(
1206    report: &mut SyncReport,
1207    vars: &VarEnv,
1208    workspace: &Path,
1209    event_log: &Path,
1210    lock_path: &Path,
1211    dry_run: bool,
1212    plan: &PlanExecutor,
1213    fs: &FsExecutor,
1214    registry: &Arc<Registry>,
1215    pack_type_registry: &Arc<PackTypeRegistry>,
1216    rt: &tokio::runtime::Runtime,
1217    pack_name: &str,
1218    pack_path: &Path,
1219    manifest: &crate::pack::PackManifest,
1220    visited_meta: &MetaVisitedSet,
1221    scheduler: &Arc<Scheduler>,
1222) -> bool {
1223    let type_tag = manifest.r#type.as_str();
1224    // Name-oracle check: every pack type must be registered. Unknown
1225    // pack types halt the pack the same way M4 halted unknown actions.
1226    if pack_type_registry.get(type_tag).is_none() {
1227        let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
1228        record_action_err(report, event_log, lock_path, pack_name, 0, "pack-type", err);
1229        return true;
1230    }
1231    match manifest.r#type {
1232        crate::pack::PackType::Declarative => run_declarative_actions(
1233            report,
1234            vars,
1235            workspace,
1236            event_log,
1237            lock_path,
1238            dry_run,
1239            plan,
1240            fs,
1241            pack_name,
1242            pack_path,
1243            manifest,
1244            &manifest.actions,
1245            scheduler,
1246        ),
1247        crate::pack::PackType::Meta | crate::pack::PackType::Scripted => dispatch_pack_type_plugin(
1248            report,
1249            vars,
1250            workspace,
1251            event_log,
1252            lock_path,
1253            registry,
1254            pack_type_registry,
1255            rt,
1256            pack_name,
1257            pack_path,
1258            manifest,
1259            type_tag,
1260            visited_meta,
1261            scheduler,
1262        ),
1263    }
1264}
1265
1266/// Run a declarative pack's actions sequentially. Preserves the M4
1267/// per-action event-log bracket (`ActionStarted` → `ActionCompleted` |
1268/// `ActionHalted`). Returns `true` when the sync must halt.
1269#[allow(clippy::too_many_arguments)]
1270fn run_declarative_actions(
1271    report: &mut SyncReport,
1272    vars: &VarEnv,
1273    workspace: &Path,
1274    event_log: &Path,
1275    lock_path: &Path,
1276    dry_run: bool,
1277    plan: &PlanExecutor,
1278    fs: &FsExecutor,
1279    pack_name: &str,
1280    pack_path: &Path,
1281    manifest: &crate::pack::PackManifest,
1282    actions: &[Action],
1283    scheduler: &Arc<Scheduler>,
1284) -> bool {
1285    // `apply_gitignore` is called per-lifecycle by each PackTypePlugin
1286    // for meta/scripted, and here for declarative (which bypasses the
1287    // plugin in `sync::run`'s per-action driver). Keeping plugins as
1288    // the single apply site everywhere else means the declarative
1289    // per-action path is the only code outside the PackTypePlugin
1290    // surface that needs a direct apply call.
1291    if !dry_run {
1292        let ctx = ExecCtx::new(vars, pack_path, workspace)
1293            .with_platform(Platform::current())
1294            .with_scheduler(scheduler);
1295        if let Err(e) = crate::plugin::pack_type::apply_gitignore(&ctx, manifest) {
1296            record_action_err(report, event_log, lock_path, pack_name, 0, "gitignore", e);
1297            return true;
1298        }
1299    }
1300    for (idx, action) in actions.iter().enumerate() {
1301        let ctx = ExecCtx::new(vars, pack_path, workspace)
1302            .with_platform(Platform::current())
1303            .with_scheduler(scheduler);
1304        let action_tag = action_kind_tag(action);
1305        append_manifest_event(
1306            event_log,
1307            lock_path,
1308            &Event::ActionStarted {
1309                ts: Utc::now(),
1310                pack: pack_name.to_string(),
1311                action_idx: idx,
1312                action_name: action_tag.to_string(),
1313            },
1314            &mut report.event_log_warnings,
1315        );
1316        let step_result =
1317            if dry_run { plan.execute(action, &ctx) } else { fs.execute(action, &ctx) };
1318        if !record_action_outcome(
1319            report,
1320            event_log,
1321            lock_path,
1322            pack_name,
1323            idx,
1324            action_tag,
1325            step_result,
1326        ) {
1327            return true;
1328        }
1329    }
1330    false
1331}
1332
1333/// Dispatch a pack-type plugin (meta / scripted) through the async
1334/// registry. Brackets the call with a single `ActionStarted` /
1335/// `ActionCompleted` / `ActionHalted` trio at index 0. Returns `true`
1336/// when the sync must halt.
1337#[allow(clippy::too_many_arguments)]
1338fn dispatch_pack_type_plugin(
1339    report: &mut SyncReport,
1340    vars: &VarEnv,
1341    workspace: &Path,
1342    event_log: &Path,
1343    lock_path: &Path,
1344    registry: &Arc<Registry>,
1345    pack_type_registry: &Arc<PackTypeRegistry>,
1346    rt: &tokio::runtime::Runtime,
1347    pack_name: &str,
1348    pack_path: &Path,
1349    manifest: &crate::pack::PackManifest,
1350    type_tag: &'static str,
1351    visited_meta: &MetaVisitedSet,
1352    scheduler: &Arc<Scheduler>,
1353) -> bool {
1354    // NB: `visited_meta` is intentionally NOT attached to the ctx here.
1355    // The sync driver already walks children in post-order via the tree
1356    // walker; attaching the visited set would trigger MetaPlugin's
1357    // real-recursion branch and cause double dispatch (walker runs child
1358    // packs as their own graph nodes, then MetaPlugin would recurse into
1359    // them again). The `visited_meta` parameter is kept on the argument
1360    // list so future explicit-install / teardown verbs that invoke
1361    // MetaPlugin directly can share the same set shape.
1362    let _ = visited_meta;
1363    let ctx = ExecCtx::new(vars, pack_path, workspace)
1364        .with_platform(Platform::current())
1365        .with_registry(registry)
1366        .with_pack_type_registry(pack_type_registry)
1367        .with_scheduler(scheduler);
1368    append_manifest_event(
1369        event_log,
1370        lock_path,
1371        &Event::ActionStarted {
1372            ts: Utc::now(),
1373            pack: pack_name.to_string(),
1374            action_idx: 0,
1375            action_name: type_tag.to_string(),
1376        },
1377        &mut report.event_log_warnings,
1378    );
1379    // SAFETY: `get` just confirmed the plugin is registered for
1380    // `type_tag`, so this unwrap cannot panic under the matched arm.
1381    let plugin = pack_type_registry
1382        .get(type_tag)
1383        .expect("pack-type plugin must be registered (guarded above)");
1384    // feat-m6 CI fix — establish a task-local tier stack frame for every
1385    // async dispatch. Without this, `TierGuard::push` (which runs inside
1386    // the plugin lifecycle and may span `.await` / thread hops under the
1387    // multi-thread runtime) has no enforcement frame to push into.
1388    let step_result = rt.block_on(crate::pack_lock::with_tier_scope(plugin.sync(&ctx, manifest)));
1389    !record_action_outcome(report, event_log, lock_path, pack_name, 0, type_tag, step_result)
1390}
1391
1392/// Pure skip-eligibility decision. Returns `Some(hash)` when the pack
1393/// is eligible for the hash-skip short-circuit, `None` otherwise.
1394///
1395/// Splitting the decision out of [`try_skip_pack`] keeps the
1396/// side-effecting transcript bookkeeping testable in isolation: the
1397/// v1.1.1 synthetic-flag-flip regression exercises this helper without
1398/// having to stand up a `SyncReport` / `PackGraph`.
1399fn skip_eligibility(
1400    actions: &[Action],
1401    commit_sha: &str,
1402    current_synthetic: bool,
1403    prior: &LockEntry,
1404    dry_run: bool,
1405    force: bool,
1406) -> Option<String> {
1407    if dry_run || force {
1408        // Dry runs must always produce the planned-step transcript so
1409        // authors can see what `sync` *would* do. `--force` is the
1410        // operator's explicit opt-out from the hash short-circuit.
1411        return None;
1412    }
1413    let hash = compute_actions_hash(actions, commit_sha);
1414    if prior.actions_hash != hash {
1415        return None;
1416    }
1417    if prior.synthetic != current_synthetic {
1418        // Pack-shape flipped between runs (real ↔ synthetic). Even
1419        // when the actions hash matches by coincidence (e.g. a
1420        // declarative pack with empty `actions[]` whose pack.yaml was
1421        // deleted, falling through to a synthetic leaf with the same
1422        // empty actions list and stable commit SHA), we must NOT
1423        // carry the stale `synthetic` flag forward. Forcing the
1424        // upsert path re-emits the entry with the current flag.
1425        return None;
1426    }
1427    Some(hash)
1428}
1429
1430/// Decide whether `pack_name` can be short-circuited via a lockfile
1431/// hash match. When the prior hash matches the freshly-computed hash,
1432/// emit a single [`ExecResult::Skipped`] step and carry the prior
1433/// lockfile entry forward unchanged. Returns `true` when the pack was
1434/// skipped.
1435///
1436/// `current_synthetic` is the walker-derived synthetic flag for this
1437/// pack on the current run. The skip eligibility check requires it to
1438/// match `prior.synthetic` so a pack-shape transition (e.g. user
1439/// deletes `pack.yaml` so a previously-real pack now walks as
1440/// synthetic) invalidates the skip and forces the lockfile entry to
1441/// be re-emitted with the fresh `synthetic` value.
1442#[allow(clippy::too_many_arguments)]
1443fn try_skip_pack(
1444    report: &mut SyncReport,
1445    pack_name: &str,
1446    pack_path: &Path,
1447    actions: &[Action],
1448    commit_sha: &str,
1449    current_synthetic: bool,
1450    prior_lock: &std::collections::HashMap<String, LockEntry>,
1451    next_lock: &mut std::collections::HashMap<String, LockEntry>,
1452    dry_run: bool,
1453    force: bool,
1454) -> bool {
1455    let Some(prior) = prior_lock.get(pack_name) else {
1456        return false;
1457    };
1458    let Some(hash) =
1459        skip_eligibility(actions, commit_sha, current_synthetic, prior, dry_run, force)
1460    else {
1461        return false;
1462    };
1463    let skipped_step = ExecStep {
1464        action_name: Cow::Borrowed("pack"),
1465        result: ExecResult::Skipped {
1466            pack_path: pack_path.to_path_buf(),
1467            actions_hash: hash.clone(),
1468        },
1469        // W4 landed `StepKind::PackSkipped` as the dedicated pack-level
1470        // short-circuit detail; we use it here instead of the prior
1471        // `Require { Satisfied, Skip }` proxy so renderers and consumers
1472        // can match on a single, purpose-built variant.
1473        details: StepKind::PackSkipped { actions_hash: hash },
1474    };
1475    report.steps.push(SyncStep {
1476        pack: pack_name.to_string(),
1477        action_idx: 0,
1478        exec_step: skipped_step,
1479    });
1480    // Carry the prior entry forward so the next-lock snapshot stays
1481    // consistent with what's on disk.
1482    next_lock.insert(pack_name.to_string(), prior.clone());
1483    true
1484}
1485
1486/// Insert or update a lockfile entry for `pack_name` with `actions_hash`.
1487///
1488/// Stores `commit_sha` verbatim — including the empty string when the
1489/// pack is not a git working tree or the HEAD probe failed.
1490/// `actions_hash` is computed over the same `commit_sha`, so the two
1491/// fields stay internally consistent: if probing starts returning a
1492/// non-empty SHA on the next run, the hash differs and the skip is
1493/// correctly invalidated. The prior-preserve carve-out that was
1494/// introduced in M4-D was unsound (hash-vs-sha drift) and is removed
1495/// by the M4-D post-review fix bundle; see spec §M4 req 4a.
1496///
1497/// `prior_lock` is consulted purely for observability: when a
1498/// previously-real pack flips to synthetic between runs (user deleted
1499/// the pack's `pack.yaml` so the walker fell back to v1.1.1
1500/// plain-git-child synthesis), a `tracing::warn!` records the
1501/// downgrade so the operator notices their declarative actions have
1502/// stopped running.
1503fn upsert_lock_entry(
1504    prior_lock: &std::collections::HashMap<String, LockEntry>,
1505    next_lock: &mut std::collections::HashMap<String, LockEntry>,
1506    pack_name: &str,
1507    commit_sha: &str,
1508    actions_hash: &str,
1509    synthetic: bool,
1510) {
1511    if synthetic {
1512        if let Some(prior) = prior_lock.get(pack_name) {
1513            if !prior.synthetic {
1514                tracing::warn!(
1515                    target: "grex::sync",
1516                    pack = pack_name,
1517                    "pack `{pack_name}` downgraded from real to synthetic — \
1518                     pack.yaml missing on disk; only `git pull` will run going forward",
1519                );
1520            }
1521        }
1522    }
1523    let installed_at = Utc::now();
1524    let entry = next_lock.get(pack_name).map_or_else(
1525        || LockEntry {
1526            id: pack_name.to_string(),
1527            sha: commit_sha.to_string(),
1528            branch: String::new(),
1529            installed_at,
1530            actions_hash: actions_hash.to_string(),
1531            schema_version: "1".to_string(),
1532            synthetic,
1533        },
1534        |prev| LockEntry {
1535            installed_at,
1536            actions_hash: actions_hash.to_string(),
1537            sha: commit_sha.to_string(),
1538            synthetic,
1539            ..prev.clone()
1540        },
1541    );
1542    next_lock.insert(pack_name.to_string(), entry);
1543}
1544
1545/// Record one action outcome into `report` + event log. Returns `false`
1546/// when the run must halt (on error); `true` otherwise.
1547fn record_action_outcome(
1548    report: &mut SyncReport,
1549    event_log: &Path,
1550    lock_path: &Path,
1551    pack_name: &str,
1552    idx: usize,
1553    action_tag: &'static str,
1554    step_result: Result<ExecStep, ExecError>,
1555) -> bool {
1556    match step_result {
1557        Ok(step) => {
1558            record_action_ok(report, event_log, lock_path, pack_name, idx, step);
1559            true
1560        }
1561        Err(e) => {
1562            record_action_err(report, event_log, lock_path, pack_name, idx, action_tag, e);
1563            false
1564        }
1565    }
1566}
1567
1568/// Success-path bookkeeping: emit legacy `Sync` summary + `ActionCompleted`
1569/// audit event, then push the step onto the report.
1570fn record_action_ok(
1571    report: &mut SyncReport,
1572    event_log: &Path,
1573    lock_path: &Path,
1574    pack_name: &str,
1575    idx: usize,
1576    step: ExecStep,
1577) {
1578    append_step_event(event_log, lock_path, pack_name, &step, &mut report.event_log_warnings);
1579    append_manifest_event(
1580        event_log,
1581        lock_path,
1582        &Event::ActionCompleted {
1583            ts: Utc::now(),
1584            pack: pack_name.to_string(),
1585            action_idx: idx,
1586            result_summary: format!("{:?}", step.result),
1587        },
1588        &mut report.event_log_warnings,
1589    );
1590    report.steps.push(SyncStep { pack: pack_name.to_string(), action_idx: idx, exec_step: step });
1591}
1592
1593/// Halt-path bookkeeping: emit `ActionHalted` audit event, then stash the
1594/// rich `HaltedContext` into `report.halted`.
1595fn record_action_err(
1596    report: &mut SyncReport,
1597    event_log: &Path,
1598    lock_path: &Path,
1599    pack_name: &str,
1600    idx: usize,
1601    action_tag: &'static str,
1602    e: ExecError,
1603) {
1604    let error_summary = truncate_error_summary(&e);
1605    append_manifest_event(
1606        event_log,
1607        lock_path,
1608        &Event::ActionHalted {
1609            ts: Utc::now(),
1610            pack: pack_name.to_string(),
1611            action_idx: idx,
1612            action_name: action_tag.to_string(),
1613            error_summary,
1614        },
1615        &mut report.event_log_warnings,
1616    );
1617    let recovery_hint = recovery_hint_for(&e);
1618    report.halted = Some(SyncError::Halted(Box::new(HaltedContext {
1619        pack: pack_name.to_string(),
1620        action_idx: idx,
1621        action_name: action_tag.to_string(),
1622        error: e,
1623        recovery_hint,
1624    })));
1625}
1626
1627/// Short stable kind-tag for an [`crate::pack::Action`]. Mirrors the
1628/// `ACTION_*` constants used by [`crate::execute::step`] so the audit log
1629/// stays uniform.
1630fn action_kind_tag(action: &crate::pack::Action) -> &'static str {
1631    use crate::pack::Action;
1632    match action {
1633        Action::Symlink(_) => "symlink",
1634        Action::Unlink(_) => "unlink",
1635        Action::Env(_) => "env",
1636        Action::Mkdir(_) => "mkdir",
1637        Action::Rmdir(_) => "rmdir",
1638        Action::Require(_) => "require",
1639        Action::When(_) => "when",
1640        Action::Exec(_) => "exec",
1641    }
1642}
1643
1644/// Produce a bounded human summary of an [`ExecError`] for
1645/// [`Event::ActionHalted::error_summary`]. Keeps the written JSONL line
1646/// from pathological blowup when captured stderr is large.
1647fn truncate_error_summary(err: &ExecError) -> String {
1648    let mut s = err.to_string();
1649    if s.len() > ACTION_ERROR_SUMMARY_MAX {
1650        s.truncate(ACTION_ERROR_SUMMARY_MAX);
1651        s.push_str("…[truncated]");
1652    }
1653    s
1654}
1655
1656/// Best-effort recovery hint for common [`ExecError`] shapes. Returns
1657/// `None` when no generic advice applies; the error's own `Display`
1658/// output is already shown by the `Halted` variant's format string.
1659fn recovery_hint_for(err: &ExecError) -> Option<String> {
1660    match err {
1661        ExecError::SymlinkDestOccupied { .. } => Some(
1662            "set `backup: true` on the symlink action, or remove the conflicting entry by hand"
1663                .into(),
1664        ),
1665        ExecError::SymlinkPrivilegeDenied { .. } => {
1666            Some("enable Windows Developer Mode or re-run grex as administrator".into())
1667        }
1668        ExecError::SymlinkCreateAfterBackupFailed { backup, .. } => {
1669            Some(format!("backup left at `{}`; restore manually then re-run", backup.display()))
1670        }
1671        ExecError::RmdirNotEmpty { .. } => {
1672            Some("set `force: true` on the rmdir action to recurse".into())
1673        }
1674        ExecError::EnvPersistenceDenied { .. } => {
1675            Some("re-run elevated (Machine scope needs admin)".into())
1676        }
1677        _ => None,
1678    }
1679}
1680
1681/// Append one [`Event::Sync`] record summarising an [`ExecStep`].
1682///
1683/// Failures log a warning and are recorded in the report's
1684/// `event_log_warnings`; they do not abort the sync (spec: event-log write
1685/// failures are non-fatal).
1686///
1687/// # Concurrency
1688///
1689/// The append is serialized through a [`ManifestLock`] held across the
1690/// write. The lock is acquired **per action** (not once across the full
1691/// traversal) so cooperating grex processes can observe mid-progress log
1692/// state between actions; fd-lock acquisition is cheap on modern kernels
1693/// and sync runs are dominated by executor side effects, not lock waits.
1694/// This closes the bypass gap surfaced by the M3 concurrency review where
1695/// `append_event` was called without any cross-process serialisation.
1696fn append_step_event(
1697    log: &Path,
1698    lock_path: &Path,
1699    pack: &str,
1700    step: &ExecStep,
1701    warnings: &mut Vec<String>,
1702) {
1703    let summary = format!("{}:{:?}", step.action_name, step.result);
1704    let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: summary };
1705    if let Err(e) = append_event_locked(log, lock_path, &event) {
1706        tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
1707        warnings.push(format!("{}: {e}", log.display()));
1708    }
1709    // Schema version is recorded once at the manifest level by existing
1710    // manifest code; this stub uses the constant to keep a single source of
1711    // truth for forward-compat.
1712    let _ = SCHEMA_VERSION;
1713}
1714
1715/// Append a single [`Event`] under the shared [`ManifestLock`] path.
1716/// Failures are logged and recorded as non-fatal warnings — the spec
1717/// marks event-log write failures as non-aborting so a transient disk
1718/// error must not kill a sync mid-stream.
1719fn append_manifest_event(log: &Path, lock_path: &Path, event: &Event, warnings: &mut Vec<String>) {
1720    if let Err(e) = append_event_locked(log, lock_path, event) {
1721        tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
1722        warnings.push(format!("{}: {e}", log.display()));
1723    }
1724}
1725
1726/// Acquire [`ManifestLock`] and append one event. Parent dir of the log is
1727/// created lazily on first write.
1728fn append_event_locked(log: &Path, lock_path: &Path, event: &Event) -> Result<(), String> {
1729    if let Some(parent) = log.parent() {
1730        std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1731    }
1732    if let Some(parent) = lock_path.parent() {
1733        std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1734    }
1735    let mut lock = ManifestLock::open(log, lock_path).map_err(|e| e.to_string())?;
1736    lock.write(|| append_event(log, event)).map_err(|e| e.to_string())?.map_err(|e| e.to_string())
1737}
1738
1739/// Re-export a cheap helper so CLI renderers can label halted steps by node
1740/// name without reaching into the graph twice.
1741#[must_use]
1742pub fn pack_display_name(node: &PackNode) -> &str {
1743    &node.name
1744}
1745
1746/// Run a full teardown over the pack tree rooted at `pack_root`.
1747///
1748/// Mirrors [`run`] but invokes
1749/// [`crate::plugin::PackTypePlugin::teardown`] on every pack in
1750/// **reverse** post-order so a parent tears down before its children
1751/// (the inverse of install). Children composed later by an author
1752/// consequently teardown earlier, matching the declarative
1753/// auto-reverse contract (R-M5-11).
1754///
1755/// All other concerns are identical to [`run`]: workspace lock, plan-
1756/// phase validators, lockfile update skipped (teardown does not
1757/// write a `actions_hash` forward), and event-log bracketing.
1758/// Teardown does NOT consult the lockfile skip-on-hash shortcut — a
1759/// user explicitly asked to remove the pack, so we always dispatch.
1760///
1761/// # Errors
1762///
1763/// Returns the first error that halts the pipeline — see [`SyncError`].
1764///
1765/// See [`run`] for the `cancel` contract — feat-m7-1 stage 2 threads
1766/// the parameter through teardown for parity; stages 3-4 add the polls.
1767pub fn teardown(
1768    pack_root: &Path,
1769    opts: &SyncOptions,
1770    cancel: &CancellationToken,
1771) -> Result<SyncReport, SyncError> {
1772    let _ = cancel;
1773    let workspace = resolve_workspace(pack_root, opts.workspace.as_deref());
1774    ensure_workspace_dir(&workspace)?;
1775    let (mut ws_lock, ws_lock_path) = open_workspace_lock(&workspace)?;
1776    let _ws_guard = match ws_lock.try_acquire() {
1777        Ok(Some(g)) => g,
1778        Ok(None) => {
1779            return Err(SyncError::WorkspaceBusy {
1780                workspace: workspace.clone(),
1781                lock_path: ws_lock_path,
1782            });
1783        }
1784        Err(e) => return Err(workspace_lock_err(&ws_lock_path, &e.to_string())),
1785    };
1786
1787    let graph =
1788        walk_and_validate(pack_root, &workspace, opts.validate, opts.ref_override.as_deref())?;
1789    let prep = prepare_run_context(pack_root, &graph, &workspace)?;
1790
1791    let mut report = SyncReport {
1792        graph,
1793        steps: Vec::new(),
1794        halted: None,
1795        event_log_warnings: Vec::new(),
1796        pre_run_recovery: prep.pre_run_recovery,
1797        // teardown does not run the legacy-layout migration — by the time
1798        // a user is tearing down, the layout has already been migrated
1799        // (or was never legacy in the first place). Surfacing an empty
1800        // list keeps the report shape symmetric with `run()`.
1801        workspace_migrations: Vec::new(),
1802    };
1803
1804    // feat-m6 B1: mirror `run()` — resolve `--parallel`, build a
1805    // Scheduler, thread it through every `ExecCtx` the teardown path
1806    // constructs. Teardown is the other user-facing verb that owns a
1807    // runtime, so it gets the same wiring.
1808    let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
1809    let scheduler = Arc::new(Scheduler::new(resolved_parallel));
1810    run_teardown(
1811        &mut report,
1812        &prep.order,
1813        &prep.vars,
1814        &workspace,
1815        &prep.event_log,
1816        &prep.lock_path,
1817        &prep.registry,
1818        &prep.pack_type_registry,
1819        resolved_parallel,
1820        &scheduler,
1821    );
1822    Ok(report)
1823}
1824
1825/// Dispatch `teardown` for every pack in **reverse** post-order.
1826/// Declarative packs go through [`crate::plugin::PackTypePlugin`]
1827/// rather than the per-action M4 path because the trait's
1828/// auto-reverse / explicit-block logic must compose with the
1829/// registry; going through the per-action path would mean
1830/// re-implementing inverse synthesis in the sync loop.
1831#[allow(clippy::too_many_arguments)]
1832fn run_teardown(
1833    report: &mut SyncReport,
1834    order: &[usize],
1835    vars: &VarEnv,
1836    workspace: &Path,
1837    event_log: &Path,
1838    lock_path: &Path,
1839    registry: &Arc<Registry>,
1840    pack_type_registry: &Arc<PackTypeRegistry>,
1841    parallel: usize,
1842    scheduler: &Arc<Scheduler>,
1843) {
1844    let rt = build_pack_type_runtime(parallel);
1845    // Reverse post-order: root first, then children. Pack-type plugin
1846    // teardown methods reverse their own children/actions, so the
1847    // outer loop only flips the inter-pack order.
1848    for &id in order.iter().rev() {
1849        let Some(node) = report.graph.node(id) else { continue };
1850        let pack_name = node.name.clone();
1851        let pack_path = node.path.clone();
1852        let manifest = node.manifest.clone();
1853        let type_tag = manifest.r#type.as_str();
1854        if pack_type_registry.get(type_tag).is_none() {
1855            let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
1856            record_action_err(report, event_log, lock_path, &pack_name, 0, "pack-type", err);
1857            return;
1858        }
1859        let ctx = ExecCtx::new(vars, &pack_path, workspace)
1860            .with_platform(Platform::current())
1861            .with_registry(registry)
1862            .with_pack_type_registry(pack_type_registry)
1863            .with_scheduler(scheduler);
1864        append_manifest_event(
1865            event_log,
1866            lock_path,
1867            &Event::ActionStarted {
1868                ts: Utc::now(),
1869                pack: pack_name.clone(),
1870                action_idx: 0,
1871                action_name: type_tag.to_string(),
1872            },
1873            &mut report.event_log_warnings,
1874        );
1875        let plugin = pack_type_registry
1876            .get(type_tag)
1877            .expect("pack-type plugin must be registered (guarded above)");
1878        // feat-m6 CI fix — see dispatch_pack_type note.
1879        let step_result =
1880            rt.block_on(crate::pack_lock::with_tier_scope(plugin.teardown(&ctx, &manifest)));
1881        if !record_action_outcome(
1882            report,
1883            event_log,
1884            lock_path,
1885            &pack_name,
1886            0,
1887            type_tag,
1888            step_result,
1889        ) {
1890            return;
1891        }
1892    }
1893}
1894
1895/// Test-only hook: append one [`Event::Sync`] through the same
1896/// [`ManifestLock`]-serialised path the sync driver uses.
1897///
1898/// Exposed so integration tests under `tests/` can exercise the locked
1899/// append helper without spinning up a full pack tree. Not intended for
1900/// downstream consumers — the signature may change without notice.
1901#[doc(hidden)]
1902pub fn __test_append_sync_event(
1903    log: &Path,
1904    lock_path: &Path,
1905    pack: &str,
1906    action_name: &str,
1907) -> Result<(), String> {
1908    let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: action_name.to_string() };
1909    append_event_locked(log, lock_path, &event)
1910}
1911
1912// ----------------------------------------------------------------------
1913// PR E — pre-run teardown scan
1914// ----------------------------------------------------------------------
1915
/// One `ActionStarted` event in the manifest log that has no matching
/// `ActionCompleted` or `ActionHalted` peer.
///
/// Dangling starts are the primary crash signal: the process wrote the
/// pre-action event, then died before the executor returned. Callers
/// should surface these to the operator (diagnostics only this PR; a
/// future `grex doctor` verb will act on them).
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DanglingStart {
    /// Pack that owned the halted action.
    pub pack: String,
    /// 0-based action index within the pack.
    pub action_idx: usize,
    /// Short action kind tag (the `action_name` recorded on the
    /// `ActionStarted` event, e.g. `symlink` or `exec`).
    pub action_name: String,
    /// Timestamp the `ActionStarted` event was written (UTC).
    pub started_at: DateTime<Utc>,
}
1935
/// Summary of teardown artifacts found under a pack root before a sync
/// begins.
///
/// Built by [`scan_recovery`]. All fields are diagnostic; the sync
/// proceeds regardless of what the scan finds.
#[non_exhaustive]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RecoveryReport {
    /// `<dst>.grex.bak` files sitting next to a non-symlink or missing
    /// original (symlink-action rollback orphan).
    pub orphan_backups: Vec<PathBuf>,
    /// `<path>.grex.bak.<timestamp>` tombstones left by `rmdir` with
    /// `backup: true`.
    pub orphan_tombstones: Vec<PathBuf>,
    /// `ActionStarted` events in the log with no matching
    /// `ActionCompleted`/`ActionHalted`; sorted by start timestamp.
    pub dangling_starts: Vec<DanglingStart>,
}
1954
1955impl RecoveryReport {
1956    /// `true` when the scan found nothing worth reporting.
1957    #[must_use]
1958    pub fn is_empty(&self) -> bool {
1959        self.orphan_backups.is_empty()
1960            && self.orphan_tombstones.is_empty()
1961            && self.dangling_starts.is_empty()
1962    }
1963}
1964
1965/// Walk `workspace` and the manifest log to find crash-recovery artifacts.
1966///
1967/// Inspects:
1968///
1969/// * `workspace` for `.grex.bak` orphans and timestamped `.grex.bak.<ts>`
1970///   tombstones. The workspace IS where children materialise (whether
1971///   the default flat-sibling layout under the pack root, or an
1972///   explicit `--workspace` override directory) so this single bounded
1973///   walk covers every backup site.
1974/// * `event_log` (the manifest JSONL) for `ActionStarted` entries that
1975///   have no matching `ActionCompleted` / `ActionHalted` successor.
1976///
1977/// Non-blocking: scan errors are swallowed to an empty report so a
1978/// half-readable directory cannot kill a sync that would otherwise
1979/// succeed. Call sites that want to surface scan failures should read
1980/// the manifest directly.
1981///
1982/// Pre-`v1.1.0` post-review fix this anchored at `pack_root_dir(pack_root)`,
1983/// which missed every backup under a `--workspace` override.
1984///
1985/// # Errors
1986///
1987/// Returns [`SyncError::Validation`] only when the manifest read itself
1988/// reports corruption. Filesystem traversal errors are swallowed.
1989pub fn scan_recovery(workspace: &Path, event_log: &Path) -> Result<RecoveryReport, SyncError> {
1990    let mut report = RecoveryReport::default();
1991    walk_for_backups(workspace, &mut report);
1992    if event_log.exists() {
1993        match read_all(event_log) {
1994            Ok(events) => {
1995                report.dangling_starts = collect_dangling_starts(&events);
1996            }
1997            Err(e) => {
1998                return Err(SyncError::Validation {
1999                    errors: vec![PackValidationError::DependsOnUnsatisfied {
2000                        pack: "<event-log>".into(),
2001                        required: e.to_string(),
2002                    }],
2003                });
2004            }
2005        }
2006    }
2007    Ok(report)
2008}
2009
2010/// Shallow directory walker (bounded depth = 6) that categorizes
2011/// `.grex.bak` and `.grex.bak.<ts>` filenames into the appropriate
2012/// report slot. Depth-limited so a pathological workspace with a deep
2013/// tree cannot stall the scan; realistic layouts are well under six
2014/// levels.
2015fn walk_for_backups(root: &Path, report: &mut RecoveryReport) {
2016    walk_for_backups_inner(root, report, 0);
2017}
2018
2019fn walk_for_backups_inner(dir: &Path, report: &mut RecoveryReport, depth: u32) {
2020    const MAX_DEPTH: u32 = 6;
2021    if depth > MAX_DEPTH {
2022        return;
2023    }
2024    let Ok(entries) = std::fs::read_dir(dir) else { return };
2025    for entry_result in entries {
2026        let entry = match entry_result {
2027            Ok(e) => e,
2028            Err(e) => {
2029                tracing::warn!(
2030                    target: "grex::sync::recover",
2031                    "skipping unreadable entry under `{}`: {e}",
2032                    dir.display(),
2033                );
2034                continue;
2035            }
2036        };
2037        let path = entry.path();
2038        let name = entry.file_name();
2039        let Some(name_str) = name.to_str() else { continue };
2040        if name_str.ends_with(".grex.bak") {
2041            report.orphan_backups.push(path.clone());
2042            continue;
2043        }
2044        if let Some(rest) = name_str.rsplit_once(".grex.bak.") {
2045            // `rsplit_once` returns `(prefix, suffix)`; suffix is the
2046            // timestamp chunk. Accept any non-empty suffix — the exact
2047            // timestamp shape is `fs_executor` internal.
2048            if !rest.1.is_empty() {
2049                report.orphan_tombstones.push(path.clone());
2050                continue;
2051            }
2052        }
2053        // Recurse only into real directories (not symlinks, to avoid
2054        // traversing into the workspace's cloned repos via aliased
2055        // paths). `entry.file_type()` does NOT follow symlinks (unlike
2056        // `entry.metadata()` which would dereference and report the
2057        // target's type — defeating the very check this guards). The
2058        // symlink-skip is also explicit so the intent is recoverable
2059        // from the source: backup-recovery never crosses a symlink.
2060        let Ok(ft) = entry.file_type() else { continue };
2061        if ft.is_symlink() {
2062            continue;
2063        }
2064        if ft.is_dir() {
2065            walk_for_backups_inner(&path, report, depth + 1);
2066        }
2067    }
2068}
2069
2070/// Reduce an event stream to a list of `ActionStarted` records with no
2071/// matching terminator.
2072///
2073/// Matching is positional per `(pack, action_idx)`: a later
2074/// `ActionCompleted` or `ActionHalted` with the same key clears the
2075/// entry. Whatever remains in the map after the pass is dangling.
2076fn collect_dangling_starts(events: &[Event]) -> Vec<DanglingStart> {
2077    use std::collections::HashMap;
2078    let mut open: HashMap<(String, usize), DanglingStart> = HashMap::new();
2079    for ev in events {
2080        match ev {
2081            Event::ActionStarted { ts, pack, action_idx, action_name } => {
2082                open.insert(
2083                    (pack.clone(), *action_idx),
2084                    DanglingStart {
2085                        pack: pack.clone(),
2086                        action_idx: *action_idx,
2087                        action_name: action_name.clone(),
2088                        started_at: *ts,
2089                    },
2090                );
2091            }
2092            Event::ActionCompleted { pack, action_idx, .. }
2093            | Event::ActionHalted { pack, action_idx, .. } => {
2094                open.remove(&(pack.clone(), *action_idx));
2095            }
2096            _ => {}
2097        }
2098    }
2099    let mut out: Vec<DanglingStart> = open.into_values().collect();
2100    out.sort_by_key(|a| a.started_at);
2101    out
2102}
2103
#[cfg(test)]
mod synthetic_transition_tests {
    //! v1.1.1 — regression cover for the pack-shape transition fixes.
    //!
    //! These tests exercise [`skip_eligibility`] / [`upsert_lock_entry`]
    //! directly (no walker, no fs) so the assertion is on the plumbing
    //! itself: skip eligibility must require synthetic-flag agreement
    //! even when the actions hash matches by coincidence, and the
    //! upsert path must record the real-to-synthetic downgrade in the
    //! lockfile so the operator's lockfile reflects what just happened.
    use super::{skip_eligibility, upsert_lock_entry, LockEntry};
    use crate::lockfile::compute_actions_hash;
    use chrono::{TimeZone, Utc};
    use std::collections::HashMap;

    /// Fixed timestamp so `LockEntry` fixtures are deterministic.
    fn ts() -> chrono::DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 4, 27, 10, 0, 0).unwrap()
    }

    /// Stable empty-actions hash with a fixed commit SHA. The same
    /// inputs feed both the prior (real) and the new (synthetic)
    /// configuration in the regression below, which is exactly the
    /// coincidental-hash-match scenario FIX 3 must catch.
    fn stable_hash() -> String {
        compute_actions_hash(&[], "deadbeef")
    }

    /// Prior lockfile entry fixture; `synthetic` is the only knob the
    /// tests vary.
    fn prior_entry(synthetic: bool) -> LockEntry {
        LockEntry {
            id: "alpha".into(),
            sha: "deadbeef".into(),
            branch: "main".into(),
            installed_at: ts(),
            actions_hash: stable_hash(),
            schema_version: "1".into(),
            synthetic,
        }
    }

    /// FIX 3 — pack flips from real → synthetic but `actions_hash` and
    /// `commit_sha` happen to match. The skip MUST be invalidated so
    /// the upsert path re-emits the lockfile entry with `synthetic =
    /// true`.
    #[test]
    fn skip_eligibility_invalidates_when_synthetic_flag_flips() {
        let prior = prior_entry(false);
        let decision = skip_eligibility(&[], "deadbeef", true, &prior, false, false);
        assert!(decision.is_none(), "skip must be invalidated when synthetic flag flips");
    }

    /// Same hash, same synthetic flag → skip is allowed (baseline).
    #[test]
    fn skip_eligibility_allows_skip_when_synthetic_matches() {
        let prior = prior_entry(true);
        let decision = skip_eligibility(&[], "deadbeef", true, &prior, false, false);
        assert_eq!(
            decision.as_deref(),
            Some(stable_hash().as_str()),
            "skip must be honoured when synthetic flag matches",
        );
    }

    /// `dry_run` and `force` always disable the skip regardless of
    /// flag agreement.
    #[test]
    fn skip_eligibility_respects_dry_run_and_force() {
        let prior = prior_entry(true);
        assert!(skip_eligibility(&[], "deadbeef", true, &prior, true, false).is_none());
        assert!(skip_eligibility(&[], "deadbeef", true, &prior, false, true).is_none());
    }

    /// FIX 4 — `upsert_lock_entry` records the downgrade in the
    /// lockfile (entry flips to `synthetic = true`) when the prior
    /// entry was real. The `tracing::warn!` is fire-and-forget, but
    /// the lockfile transition itself is observable and must be
    /// correct.
    #[test]
    fn upsert_lock_entry_records_real_to_synthetic_downgrade() {
        let mut prior: HashMap<String, LockEntry> = HashMap::new();
        prior.insert(
            "beta".into(),
            LockEntry {
                id: "beta".into(),
                sha: "deadbeef".into(),
                branch: "main".into(),
                installed_at: ts(),
                actions_hash: stable_hash(),
                schema_version: "1".into(),
                synthetic: false,
            },
        );
        let mut next: HashMap<String, LockEntry> = HashMap::new();

        upsert_lock_entry(&prior, &mut next, "beta", "deadbeef", &stable_hash(), true);

        let entry = next.get("beta").expect("entry must be upserted");
        assert!(entry.synthetic, "downgraded entry must carry synthetic = true");
        assert_eq!(entry.actions_hash, stable_hash(), "actions_hash must reflect current run");
    }

    /// Upsert path is a no-op for the steady-state case (synthetic →
    /// synthetic): the entry is replaced with the current run's
    /// timestamp/hash but the synthetic flag is preserved. This
    /// guards against an over-eager warning fire.
    #[test]
    fn upsert_lock_entry_no_op_for_steady_state_synthetic() {
        let mut prior: HashMap<String, LockEntry> = HashMap::new();
        prior.insert(
            "gamma".into(),
            LockEntry {
                id: "gamma".into(),
                sha: "deadbeef".into(),
                branch: "main".into(),
                installed_at: ts(),
                actions_hash: stable_hash(),
                schema_version: "1".into(),
                synthetic: true,
            },
        );
        let mut next: HashMap<String, LockEntry> = HashMap::new();

        upsert_lock_entry(&prior, &mut next, "gamma", "deadbeef", &stable_hash(), true);

        let entry = next.get("gamma").expect("entry must be upserted");
        assert!(entry.synthetic, "synthetic must remain true on no-op refresh");
    }
}