Skip to main content

grex_core/
sync.rs

1//! Sync orchestrator — M3 Stage B slice 6.
2//!
3//! Glues the building blocks shipped in slices 1–5b into a single runnable
4//! pipeline:
5//!
6//! 1. Walk a pack tree via [`Walker`] + [`FsPackLoader`] + a `GitBackend`.
7//! 2. Run plan-phase validators (manifest-level + graph-level).
8//! 3. Execute every action via a pluggable [`ActionExecutor`]
9//!    ([`PlanExecutor`] for dry-run, [`FsExecutor`] for wet-run).
10//! 4. Record each step as an [`Event::Sync`] entry in the pack-root's
11//!    `.grex/grex.jsonl` event log.
12//!
13//! # Traversal order
14//!
15//! Nodes are executed in **depth-first post-order**: children fully install
16//! before their parent. Rationale: parent packs commonly `require:` artifacts
17//! created by children (e.g. a parent symlink whose `src` lives inside a
18//! child). Running the root last matches the overlay-style dotfile-install
19//! intent authors expect, and it matches how `walker.walk` is structured
20//! (children are hydrated before the recursion returns).
21//!
22//! # Decoupling
23//!
24//! The CLI crate drives this module through a thin `run()` entry point;
25//! [`SyncOptions`] is `#[non_exhaustive]` so new knobs (parallelism, filter
26//! expressions, ref overrides) can land in later milestones without breaking
27//! CLI callers. Errors aggregate into [`SyncError`] with a small, stable
28//! variant set.
29
30use std::borrow::Cow;
31use std::path::{Path, PathBuf};
32use std::sync::Arc;
33
34use chrono::{DateTime, Utc};
35use globset::{Glob, GlobSet, GlobSetBuilder};
36use thiserror::Error;
37use tokio_util::sync::CancellationToken;
38
39use crate::execute::{
40    ActionExecutor, ExecCtx, ExecError, ExecResult, ExecStep, FsExecutor, MetaVisitedSet,
41    PlanExecutor, Platform, StepKind,
42};
43use crate::fs::{ManifestLock, ScopedLock};
44use crate::git::GixBackend;
45use crate::lockfile::{
46    compute_actions_hash, read_lockfile, write_lockfile, LockEntry, LockfileError,
47};
48use crate::manifest::{append_event, read_all, Event, ACTION_ERROR_SUMMARY_MAX, SCHEMA_VERSION};
49use crate::pack::{Action, PackValidationError};
50use crate::plugin::{PackTypeRegistry, Registry};
51use crate::scheduler::Scheduler;
52use crate::tree::{FsPackLoader, PackGraph, PackNode, TreeError, Walker};
53use crate::vars::VarEnv;
54
/// Inputs to [`run`].
///
/// Every field is `pub`, so values can be read and reassigned directly on
/// an existing instance. The struct is `#[non_exhaustive]` so future knobs
/// (parallelism, filter expressions, additional ref strategies) can land
/// without a breaking change.
///
/// A consequence of `#[non_exhaustive]` is that code **outside this crate**
/// cannot build a `SyncOptions` with a struct expression at all — not even
/// with functional-update syntax (`..SyncOptions::default()`). External
/// callers start from [`SyncOptions::new`] / [`Default::default`] and then
/// chain the `with_*` setters or assign the public fields. Struct literals
/// remain available to code inside this crate.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct SyncOptions {
    /// When `true`, use [`PlanExecutor`] (no filesystem mutations).
    pub dry_run: bool,
    /// When `false`, skip plan-phase validators (manifest + graph). Debug
    /// escape hatch; production callers should leave this `true`.
    pub validate: bool,
    /// Override workspace directory. `None` → `<pack_root>/.grex/workspace`.
    pub workspace: Option<PathBuf>,
    /// Global ref override (`grex sync --ref <sha|branch|tag>`). When
    /// `Some`, every child pack clone/checkout uses this ref instead of
    /// the declared `child.ref`. Empty strings are rejected at the CLI
    /// layer.
    pub ref_override: Option<String>,
    /// Pack-path filter patterns (`grex sync --only <glob>`). Raw glob
    /// strings — compiled internally via an in-crate `globset` helper so the
    /// `globset` crate version does not leak into the public API.
    /// `None` / empty means every pack runs (M3 semantics). Matching is
    /// against the pack's **workspace-relative** path normalized to
    /// forward-slash form.
    pub only_patterns: Option<Vec<String>>,
    /// Bypass the lockfile hash-match skip (`grex sync --force`). When
    /// `true`, every pack re-executes even if its `actions_hash` is
    /// unchanged from the prior lockfile.
    pub force: bool,
    /// Max parallel pack ops for this sync run (feat-m6-1).
    ///
    /// * `None` → [`run`] substitutes `num_cpus::get()` (clamped to at
    ///   least 1), so library callers who never set this get the same
    ///   default the CLI layer applies and the scheduler is always built
    ///   with a concrete value.
    /// * `Some(0)` → unbounded (`Semaphore::MAX_PERMITS`).
    /// * `Some(1)` → serial fast-path.
    /// * `Some(n >= 2)` → bounded parallel.
    pub parallel: Option<usize>,
}
101
102impl Default for SyncOptions {
103    fn default() -> Self {
104        Self {
105            dry_run: false,
106            validate: true,
107            workspace: None,
108            ref_override: None,
109            only_patterns: None,
110            force: false,
111            parallel: None,
112        }
113    }
114}
115
116/// Compile raw `--only` pattern strings into a [`globset::GlobSet`].
117/// Empty / absent input yields `Ok(None)` so M3's zero-config path
118/// (every pack runs) stays the default.
119fn compile_only_globset(patterns: Option<&Vec<String>>) -> Result<Option<GlobSet>, SyncError> {
120    let Some(pats) = patterns else { return Ok(None) };
121    if pats.is_empty() {
122        return Ok(None);
123    }
124    let mut builder = GlobSetBuilder::new();
125    for p in pats {
126        let glob = Glob::new(p)
127            .map_err(|source| SyncError::InvalidOnlyGlob { pattern: p.clone(), source })?;
128        builder.add(glob);
129    }
130    let set = builder
131        .build()
132        .map_err(|source| SyncError::InvalidOnlyGlob { pattern: pats.join(","), source })?;
133    Ok(Some(set))
134}
135
136impl SyncOptions {
137    /// Default options: wet-run, validators enabled, default workspace path.
138    #[must_use]
139    pub fn new() -> Self {
140        Self::default()
141    }
142
143    /// Set `dry_run`.
144    #[must_use]
145    pub fn with_dry_run(mut self, dry_run: bool) -> Self {
146        self.dry_run = dry_run;
147        self
148    }
149
150    /// Set `validate`.
151    #[must_use]
152    pub fn with_validate(mut self, validate: bool) -> Self {
153        self.validate = validate;
154        self
155    }
156
157    /// Set `workspace` override.
158    #[must_use]
159    pub fn with_workspace(mut self, workspace: Option<PathBuf>) -> Self {
160        self.workspace = workspace;
161        self
162    }
163
164    /// Set `ref_override` (`--ref`).
165    #[must_use]
166    pub fn with_ref_override(mut self, ref_override: Option<String>) -> Self {
167        self.ref_override = ref_override;
168        self
169    }
170
171    /// Set `only_patterns` (`--only`). Empty vector or `None` disables
172    /// the filter.
173    #[must_use]
174    pub fn with_only_patterns(mut self, patterns: Option<Vec<String>>) -> Self {
175        self.only_patterns = patterns;
176        self
177    }
178
179    /// Set `force` (`--force`).
180    #[must_use]
181    pub fn with_force(mut self, force: bool) -> Self {
182        self.force = force;
183        self
184    }
185
186    /// Set `parallel` (`--parallel`). See [`SyncOptions::parallel`] for
187    /// the `None` / `Some(0)` / `Some(1)` / `Some(n)` semantics.
188    #[must_use]
189    pub fn with_parallel(mut self, parallel: Option<usize>) -> Self {
190        self.parallel = parallel;
191        self
192    }
193}
194
195/// One executed (or planned) action step in a sync run.
196///
197/// Marked `#[non_exhaustive]` so new observability fields (timestamps,
198/// plugin provenance) can land without breaking library consumers who
199/// destructure the struct.
200#[non_exhaustive]
201#[derive(Debug, Clone)]
202pub struct SyncStep {
203    /// Name of the pack that owned the action.
204    pub pack: String,
205    /// 0-based index into the pack's top-level `actions` vector.
206    pub action_idx: usize,
207    /// The [`ExecStep`] record emitted by the executor.
208    pub exec_step: ExecStep,
209}
210
211/// Outcome of a [`run`] invocation.
212///
213/// On fail-fast termination, `halted` carries the error that stopped the
214/// sync; every completed step up to that point is still in `steps` so
215/// callers can render a partial transcript.
216///
217/// Marked `#[non_exhaustive]` so new report-level fields (run id, metrics)
218/// can land without breaking library consumers who destructure the struct.
219#[non_exhaustive]
220#[derive(Debug)]
221pub struct SyncReport {
222    /// Fully-walked pack graph (present even on halted runs).
223    pub graph: PackGraph,
224    /// Steps produced by the executor, in execution order.
225    pub steps: Vec<SyncStep>,
226    /// `Some(e)` if execution stopped before all actions ran.
227    pub halted: Option<SyncError>,
228    /// Non-fatal manifest-append warnings (one per failed event append).
229    /// Kept as a separate field because spec marks event-log write failures
230    /// as non-aborting.
231    pub event_log_warnings: Vec<String>,
232    /// `Some(r)` when the pre-run teardown scan found orphaned backup
233    /// files or dangling [`Event::ActionStarted`] records from a prior
234    /// crashed run. Informational only — the report is still returned and
235    /// the sync proceeds. CLI renderers should surface a warning so the
236    /// operator can decide whether to run a future `grex doctor` verb.
237    pub pre_run_recovery: Option<RecoveryReport>,
238}
239
240/// Rich context attached to a [`SyncError::Halted`] variant.
241///
242/// Packages the pack + action position together with the underlying
243/// executor error and an optional human-readable recovery hint. Marked
244/// `#[non_exhaustive]` so future fields (step transcript, timestamp) can
245/// land without breaking `match` arms or struct destructures.
246#[non_exhaustive]
247#[derive(Debug)]
248pub struct HaltedContext {
249    /// Name of the pack that owned the halted action.
250    pub pack: String,
251    /// 0-based index into the pack's top-level `actions` vector.
252    pub action_idx: usize,
253    /// Short action kind tag (e.g. `"symlink"`, `"exec"`).
254    pub action_name: String,
255    /// Underlying executor error.
256    pub error: ExecError,
257    /// Optional next-step suggestion for the operator. `None` when no
258    /// generic hint applies — the executor error's own `Display` already
259    /// tells the story.
260    pub recovery_hint: Option<String>,
261}
262
263/// Error taxonomy surfaced by [`run`].
264#[non_exhaustive]
265#[derive(Debug, Error)]
266pub enum SyncError {
267    /// The pack-tree walker failed (loader error, git error, cycle, …).
268    #[error("tree walk failed: {0}")]
269    Tree(#[from] TreeError),
270    /// One or more plan-phase validators flagged the graph.
271    #[error("validation failed: {errors:?}")]
272    Validation {
273        /// Aggregated errors from manifest-level + graph-level validators.
274        errors: Vec<PackValidationError>,
275    },
276    /// An action executor returned an error.
277    ///
278    /// Retained for backward compatibility; new call sites should prefer
279    /// [`SyncError::Halted`] which carries full pack + action context.
280    /// Kept non-deprecated because [`From<ExecError>`] still materialises
281    /// the variant for non-sync-loop callers (e.g. ad-hoc helpers).
282    #[error("action execution failed: {0}")]
283    Exec(#[from] ExecError),
284    /// Action execution halted; full context (pack, action index, error,
285    /// optional recovery hint) lives in [`HaltedContext`]. This is the
286    /// variant the sync driver emits — [`SyncError::Exec`] is only
287    /// surfaced by ancillary code paths.
288    #[error(
289        "sync halted at pack `{}` action #{} ({}): {}",
290        .0.pack, .0.action_idx, .0.action_name, .0.error
291    )]
292    Halted(Box<HaltedContext>),
293    /// Another `grex` process (or thread) already holds the workspace-level
294    /// lock. The running sync refused to start to avoid racing two concurrent
295    /// walkers into the same workspace. If the lock file at `lock_path` is
296    /// stale (no other grex is actually running), remove it by hand.
297    #[error(
298        "workspace `{workspace}` is locked by another grex process (remove {lock_path:?} if stale)"
299    )]
300    WorkspaceBusy {
301        /// Resolved workspace directory that the current run tried to lock.
302        workspace: PathBuf,
303        /// Sidecar lock file that is currently held.
304        lock_path: PathBuf,
305    },
306    /// Reading or parsing the resolved-state lockfile failed. Surfaced as
307    /// its own variant (rather than folded into `Validation`) because a
308    /// corrupt / unreadable lockfile is an I/O or schema fault, not a
309    /// dependency-satisfaction fault. Resolution is operator-level
310    /// (restore a backup, delete the file, re-sync), not author-level.
311    #[error("lockfile `{path}` failed to load: {source}")]
312    Lockfile {
313        /// Lockfile path that failed to load.
314        path: PathBuf,
315        /// Underlying lockfile error.
316        #[source]
317        source: LockfileError,
318    },
319    /// One of the `--only <GLOB>` patterns failed to compile. Surfaced
320    /// as its own variant so the CLI can map it to a dedicated usage
321    /// error exit code instead of the generic sync-failure bucket.
322    #[error("invalid --only glob `{pattern}`: {source}")]
323    InvalidOnlyGlob {
324        /// The raw pattern string that failed to compile.
325        pattern: String,
326        /// Underlying globset error.
327        #[source]
328        source: globset::Error,
329    },
330}
331
332impl Clone for SyncError {
333    fn clone(&self) -> Self {
334        // `TreeError` / `ExecError` do not implement `Clone` (they wrap
335        // `std::io::Error`-adjacent values). Halts carry only a display
336        // rendering in the report; we re-materialise via a synthetic
337        // `Validation` variant so `SyncReport` can be `Clone`-safe for
338        // observability tooling without widening the taxonomy.
339        match self {
340            Self::Tree(e) => Self::Validation {
341                errors: vec![PackValidationError::DependsOnUnsatisfied {
342                    pack: "<tree>".into(),
343                    required: e.to_string(),
344                }],
345            },
346            Self::Validation { errors } => Self::Validation { errors: errors.clone() },
347            Self::Exec(e) => Self::Validation {
348                errors: vec![PackValidationError::DependsOnUnsatisfied {
349                    pack: "<exec>".into(),
350                    required: e.to_string(),
351                }],
352            },
353            Self::Halted(ctx) => Self::Validation {
354                errors: vec![PackValidationError::DependsOnUnsatisfied {
355                    pack: ctx.pack.clone(),
356                    required: format!(
357                        "action #{} ({}): {}",
358                        ctx.action_idx, ctx.action_name, ctx.error
359                    ),
360                }],
361            },
362            Self::WorkspaceBusy { workspace, lock_path } => {
363                Self::WorkspaceBusy { workspace: workspace.clone(), lock_path: lock_path.clone() }
364            }
365            Self::Lockfile { path, source } => Self::Validation {
366                errors: vec![PackValidationError::DependsOnUnsatisfied {
367                    pack: "<lockfile>".into(),
368                    required: format!("{}: {source}", path.display()),
369                }],
370            },
371            Self::InvalidOnlyGlob { pattern, source } => Self::Validation {
372                errors: vec![PackValidationError::DependsOnUnsatisfied {
373                    pack: "<only-glob>".into(),
374                    required: format!("{pattern}: {source}"),
375                }],
376            },
377        }
378    }
379}
380
381/// Run a full sync over the pack tree rooted at `pack_root`.
382///
383/// Resolution rules:
384/// * If `pack_root` is a directory the walker looks for
385///   `<pack_root>/.grex/pack.yaml`.
386/// * If `pack_root` ends in `.yaml` / `.yml` it is loaded verbatim.
387/// * Workspace defaults to `<pack_root>/.grex/workspace` when `opts.workspace`
388///   is `None`.
389///
390/// # Errors
391///
392/// Returns the first error that halts the pipeline — see [`SyncError`] for
393/// the taxonomy.
394///
395/// `cancel` is the cooperative cancellation handle threaded through the
396/// pipeline by feat-m7-1 stage 2. Stage 2 only wires the parameter; the
397/// `is_cancelled()` polls land in stages 3-4 (scheduler + pack-lock
398/// acquire). CLI callers pass a never-cancelled sentinel
399/// (`CancellationToken::new()`); the MCP server passes a token tied to
400/// the request lifetime.
401pub fn run(
402    pack_root: &Path,
403    opts: &SyncOptions,
404    cancel: &CancellationToken,
405) -> Result<SyncReport, SyncError> {
406    // Stage 2 is signature-only — silence "unused parameter" without
407    // hiding it behind `_` (downstream stages will read it).
408    let _ = cancel;
409    let workspace = resolve_workspace(pack_root, opts.workspace.as_deref());
410    ensure_workspace_dir(&workspace)?;
411    let (mut ws_lock, ws_lock_path) = open_workspace_lock(&workspace)?;
412    let _ws_guard = match ws_lock.try_acquire() {
413        Ok(Some(g)) => g,
414        Ok(None) => {
415            return Err(SyncError::WorkspaceBusy {
416                workspace: workspace.clone(),
417                lock_path: ws_lock_path,
418            });
419        }
420        Err(e) => return Err(workspace_lock_err(&ws_lock_path, &e.to_string())),
421    };
422
423    // Compile `--only` patterns into a GlobSet here so the
424    // `globset` crate version does not leak into `SyncOptions`.
425    let only_set = compile_only_globset(opts.only_patterns.as_ref())?;
426
427    let graph =
428        walk_and_validate(pack_root, &workspace, opts.validate, opts.ref_override.as_deref())?;
429    let prep = prepare_run_context(pack_root, &graph)?;
430    log_force_flag(opts.force);
431
432    let mut report = SyncReport {
433        graph,
434        steps: Vec::new(),
435        halted: None,
436        event_log_warnings: Vec::new(),
437        pre_run_recovery: prep.pre_run_recovery,
438    };
439
440    let mut next_lock = prep.prior_lock.clone();
441    // feat-m6 B1: resolve `--parallel` once and build the scheduler
442    // shared across every `ExecCtx` in this run. Library callers who
443    // leave `opts.parallel == None` default to `num_cpus::get()` here
444    // (clamped `>= 1`) so the scheduler slot is always populated —
445    // `ctx.scheduler` being `None` would strand acquire-sites into
446    // unbounded concurrency. See `.omne/cfg/concurrency.md` §Scheduler.
447    let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
448    let scheduler = Arc::new(Scheduler::new(resolved_parallel));
449    run_actions(
450        &mut report,
451        &prep.order,
452        &prep.vars,
453        &workspace,
454        &prep.event_log,
455        &prep.lock_path,
456        opts.dry_run,
457        &prep.prior_lock,
458        &mut next_lock,
459        &prep.registry,
460        &prep.pack_type_registry,
461        only_set.as_ref(),
462        opts.force,
463        resolved_parallel,
464        &scheduler,
465    );
466
467    persist_lockfile_if_clean(&mut report, &prep.lockfile_path, &next_lock, opts.dry_run);
468    Ok(report)
469}
470
471/// Bag of context pieces assembled once at the top of [`run`]. Grouping
472/// them keeps [`run`] under the workspace's 50-LOC function lint without
473/// smearing the read of sequential setup across helpers. Fields are
474/// consumed piecemeal by the actions loop; no getters needed.
475struct RunContext {
476    order: Vec<usize>,
477    vars: VarEnv,
478    event_log: PathBuf,
479    lock_path: PathBuf,
480    lockfile_path: PathBuf,
481    prior_lock: std::collections::HashMap<String, LockEntry>,
482    registry: Arc<Registry>,
483    pack_type_registry: Arc<PackTypeRegistry>,
484    pre_run_recovery: Option<RecoveryReport>,
485}
486
487/// Build the per-run context: traversal order, vars env, event/lockfile
488/// paths, prior lockfile state, bootstrap registry, and (optionally) a
489/// pre-run recovery scan. Kept narrow so [`run`] stays small.
490fn prepare_run_context(pack_root: &Path, graph: &PackGraph) -> Result<RunContext, SyncError> {
491    let event_log = event_log_path(pack_root);
492    let lock_path = event_lock_path(&event_log);
493    let vars = VarEnv::from_os();
494    let order = post_order(graph);
495    let pre_run_recovery =
496        scan_recovery(&pack_root_dir(pack_root), &event_log).ok().filter(|r| !r.is_empty());
497    let lockfile_path = lockfile_path(pack_root);
498    let prior_lock = load_prior_lock(&lockfile_path)?;
499    let registry = Arc::new(Registry::bootstrap());
500    let pack_type_registry = Arc::new(bootstrap_pack_type_registry());
501    Ok(RunContext {
502        order,
503        vars,
504        event_log,
505        lock_path,
506        lockfile_path,
507        prior_lock,
508        registry,
509        pack_type_registry,
510        pre_run_recovery,
511    })
512}
513
514/// Build the [`PackTypeRegistry`] the sync driver threads into every
515/// [`ExecCtx`] it constructs.
516///
517/// Default path (no `plugin-inventory` feature) hard-codes the three
518/// built-ins via [`PackTypeRegistry::bootstrap`]. With the feature on,
519/// [`PackTypeRegistry::bootstrap_from_inventory`] is preferred so any
520/// externally-submitted plugin types (mirroring the M4-E pattern for
521/// action plugins) shadow the built-ins last-writer-wins. Kept as a free
522/// helper so the `#[cfg]` split lives in one place instead of being
523/// smeared across every sync call-site.
524fn bootstrap_pack_type_registry() -> PackTypeRegistry {
525    #[cfg(feature = "plugin-inventory")]
526    {
527        let mut reg = PackTypeRegistry::bootstrap();
528        reg.register_from_inventory();
529        reg
530    }
531    #[cfg(not(feature = "plugin-inventory"))]
532    {
533        PackTypeRegistry::bootstrap()
534    }
535}
536
537/// Emit a single `tracing::info!` line when `--force` is active so
538/// operators can confirm from logs that the skip short-circuit was
539/// bypassed. Extracted so [`run`] stays small.
540fn log_force_flag(force: bool) {
541    if force {
542        tracing::info!(
543            target: "grex::sync",
544            "--force active: bypassing lockfile skip-on-hash short-circuit"
545        );
546    }
547}
548
549/// Walk the pack tree rooted at `pack_root`, optionally running the
550/// plan-phase validators. Extracted so [`run`] stays under the
551/// workspace's 50-LOC per-function lint threshold.
552fn walk_and_validate(
553    pack_root: &Path,
554    workspace: &Path,
555    validate: bool,
556    ref_override: Option<&str>,
557) -> Result<PackGraph, SyncError> {
558    let loader = FsPackLoader::new();
559    let backend = GixBackend::new();
560    let walker = Walker::new(&loader, &backend, workspace.to_path_buf())
561        .with_ref_override(ref_override.map(str::to_string));
562    let graph = walker.walk(pack_root)?;
563    if validate {
564        validate_graph(&graph)?;
565    }
566    Ok(graph)
567}
568
569/// Load the prior lockfile (`grex.lock.jsonl`). Missing file yields an
570/// empty map; parse errors are fatal since writes are atomic and a torn
571/// lockfile therefore indicates real corruption that must be resolved
572/// before a fresh sync is safe. Parse/IO failures surface as
573/// [`SyncError::Lockfile`] — this is an I/O / schema fault, not a
574/// dependency-satisfaction fault, so it gets its own taxonomy slot.
575fn load_prior_lock(
576    lockfile_path: &Path,
577) -> Result<std::collections::HashMap<String, LockEntry>, SyncError> {
578    read_lockfile(lockfile_path)
579        .map_err(|source| SyncError::Lockfile { path: lockfile_path.to_path_buf(), source })
580}
581
582/// Persist `next_lock` atomically to `lockfile_path` whenever this was
583/// not a dry-run. On a halt the map has already had the halted pack's
584/// entry removed (see `run_actions`), so persisting now preserves every
585/// *successful* pack's fresh entry while guaranteeing absence of an
586/// entry for the halted pack — next sync sees no prior hash there and
587/// re-executes from scratch (route (b) halt-state gating). Write errors
588/// surface as non-fatal warnings on the report.
589fn persist_lockfile_if_clean(
590    report: &mut SyncReport,
591    lockfile_path: &Path,
592    next_lock: &std::collections::HashMap<String, LockEntry>,
593    dry_run: bool,
594) {
595    if dry_run {
596        return;
597    }
598    if let Err(e) = write_lockfile(lockfile_path, next_lock) {
599        tracing::warn!(target: "grex::sync", "lockfile write failed: {e}");
600        report.event_log_warnings.push(format!("{}: {e}", lockfile_path.display()));
601    }
602}
603
604/// Canonical location of the resolved-state lockfile
605/// (`<pack_root>/.grex/grex.lock.jsonl`). Colocated with the event log
606/// so both audit artifacts live under a single `.grex/` sidecar.
607fn lockfile_path(pack_root: &Path) -> PathBuf {
608    pack_root_dir(pack_root).join(".grex").join("grex.lock.jsonl")
609}
610
611/// Create the workspace directory if it does not yet exist.
612fn ensure_workspace_dir(workspace: &Path) -> Result<(), SyncError> {
613    if !workspace.exists() {
614        std::fs::create_dir_all(workspace).map_err(|e| SyncError::Validation {
615            errors: vec![PackValidationError::DependsOnUnsatisfied {
616                pack: "<workspace>".into(),
617                required: format!("{}: {e}", workspace.display()),
618            }],
619        })?;
620    }
621    Ok(())
622}
623
624/// Open (but do not acquire) the workspace-level lock file.
625fn open_workspace_lock(workspace: &Path) -> Result<(ScopedLock, PathBuf), SyncError> {
626    let ws_lock_path = workspace_lock_path(workspace);
627    let ws_lock = ScopedLock::open(&ws_lock_path)
628        .map_err(|e| workspace_lock_err(&ws_lock_path, &e.to_string()))?;
629    Ok((ws_lock, ws_lock_path))
630}
631
632/// Build a `Validation` error describing a workspace-lock failure.
633fn workspace_lock_err(ws_lock_path: &Path, reason: &str) -> SyncError {
634    SyncError::Validation {
635        errors: vec![PackValidationError::DependsOnUnsatisfied {
636            pack: "<workspace-lock>".into(),
637            required: format!("{}: {reason}", ws_lock_path.display()),
638        }],
639    }
640}
641
642/// Compute the default workspace path when `override_` is absent.
643fn resolve_workspace(pack_root: &Path, override_: Option<&Path>) -> PathBuf {
644    if let Some(p) = override_ {
645        return p.to_path_buf();
646    }
647    let anchor = pack_root_dir(pack_root);
648    anchor.join(".grex").join("workspace")
649}
650
/// Resolve the directory that owns the `.grex/` sidecar for `pack_root`.
///
/// * A path whose extension is `yaml` or `yml` is taken to be a manifest
///   at `<root>/.grex/pack.yaml`, so the result is its **grandparent**
///   (the directory containing `.grex/`).
/// * Any other path is returned unchanged.
///
/// When the grandparent is missing (`pack.yaml`) or is the empty path
/// (`.grex/pack.yaml`), `"."` is returned. Normalising the empty case
/// keeps the two relative spellings consistent and avoids handing an empty
/// `PathBuf` to callers that use the result directly as a directory.
fn pack_root_dir(pack_root: &Path) -> PathBuf {
    let is_yaml = matches!(pack_root.extension().and_then(|e| e.to_str()), Some("yaml" | "yml"));
    if !is_yaml {
        return pack_root.to_path_buf();
    }
    match pack_root.parent().and_then(Path::parent) {
        // A real, non-empty grandparent: that is the pack root directory.
        Some(dir) if !dir.as_os_str().is_empty() => dir.to_path_buf(),
        // No grandparent, or an empty one: fall back to the current dir.
        _ => PathBuf::from("."),
    }
}
663
664/// Compute the `.grex/grex.jsonl` path next to the pack root.
665fn event_log_path(pack_root: &Path) -> PathBuf {
666    pack_root_dir(pack_root).join(".grex").join("grex.jsonl")
667}
668
/// Path of the sidecar lock that sits beside the event log: `.grex.lock`
/// in the event log's parent directory, or a bare `.grex.lock` when the
/// event log path has no parent. One canonical slot per pack root —
/// cooperating grex procs serialize through this file.
fn event_lock_path(event_log: &Path) -> PathBuf {
    const LOCK_NAME: &str = ".grex.lock";
    match event_log.parent() {
        Some(dir) => dir.join(LOCK_NAME),
        None => PathBuf::from(LOCK_NAME),
    }
}
674
/// Path of the workspace's own sidecar lock:
/// `<workspace>/.grex.sync.lock`. The `run()` prologue has already created
/// the workspace directory, so the lock file lands beside the child clones.
fn workspace_lock_path(workspace: &Path) -> PathBuf {
    let mut lock_file = workspace.to_path_buf();
    lock_file.push(".grex.sync.lock");
    lock_file
}
681
682/// Aggregate manifest-level + graph-level validators and return their output.
683fn validate_graph(graph: &PackGraph) -> Result<(), SyncError> {
684    let mut errors: Vec<PackValidationError> = Vec::new();
685    for node in graph.nodes() {
686        if let Err(mut e) = node.manifest.validate_plan() {
687            errors.append(&mut e);
688        }
689    }
690    if let Err(mut e) = graph.validate() {
691        errors.append(&mut e);
692    }
693    if errors.is_empty() {
694        Ok(())
695    } else {
696        Err(SyncError::Validation { errors })
697    }
698}
699
700/// Depth-first post-order traversal of the graph starting from root.
701///
702/// Children fully precede their parent in the returned vector so downstream
703/// executors install leaves first and the root last.
704fn post_order(graph: &PackGraph) -> Vec<usize> {
705    let mut out = Vec::with_capacity(graph.nodes().len());
706    visit_post(graph, 0, &mut out);
707    out
708}
709
710fn visit_post(graph: &PackGraph, id: usize, out: &mut Vec<usize>) {
711    // Collect child ids first to avoid borrow conflicts with graph iteration.
712    let kids: Vec<usize> = graph.children_of(id).map(|n| n.id).collect();
713    for k in kids {
714        visit_post(graph, k, out);
715    }
716    out.push(id);
717}
718
719/// Drive every action for every node; abort on the first [`ExecError`].
720///
721/// Each action is bracketed by three manifest events:
722/// 1. [`Event::ActionStarted`] — appended **before** `execute` returns.
723/// 2. [`Event::ActionCompleted`] — appended on `Ok(step)`.
724/// 3. [`Event::ActionHalted`] — appended on `Err(e)` before returning.
725///
726/// All three writes go through the same [`ManifestLock`]-wrapped path
727/// ([`append_manifest_event`]) and failures are recorded as non-fatal
728/// warnings so the executor's outcome always dominates. The third append
729/// (`ActionHalted`) lets a future `grex doctor` correlate crash recovery
730/// with the exact action that halted.
731// feat-m6 B1 wiring added `parallel` + `scheduler` args; the signature
732// now pushes past the 50-LOC per-function lint by one line. Silence
733// that one — the body itself is unchanged in scope.
734#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
735fn run_actions(
736    report: &mut SyncReport,
737    order: &[usize],
738    vars: &VarEnv,
739    workspace: &Path,
740    event_log: &Path,
741    lock_path: &Path,
742    dry_run: bool,
743    prior_lock: &std::collections::HashMap<String, LockEntry>,
744    next_lock: &mut std::collections::HashMap<String, LockEntry>,
745    registry: &Arc<Registry>,
746    pack_type_registry: &Arc<PackTypeRegistry>,
747    only: Option<&GlobSet>,
748    force: bool,
749    parallel: usize,
750    scheduler: &Arc<Scheduler>,
751) {
752    let plan = PlanExecutor::with_registry(registry.clone());
753    let fs = FsExecutor::with_registry(registry.clone());
754    let rt = build_pack_type_runtime(parallel);
755    let visited_meta = new_visited_meta();
756    for &id in order {
757        let Some(node) = report.graph.node(id) else { continue };
758        let pack_name = node.name.clone();
759        let pack_path = node.path.clone();
760        let actions = node.manifest.actions.clone();
761        let manifest = node.manifest.clone();
762        let commit_sha = node.commit_sha.clone().unwrap_or_default();
763        // `--only` filter + skip-on-hash short-circuits colocated in
764        // `try_skip_or_filter` so this outer loop stays within the
765        // 50-LOC per-function budget.
766        if try_skip_or_filter(
767            report,
768            only,
769            &pack_name,
770            &pack_path,
771            &actions,
772            &commit_sha,
773            workspace,
774            prior_lock,
775            next_lock,
776            dry_run,
777            force,
778        ) {
779            continue;
780        }
781        let pack_halted = run_pack_lifecycle(
782            report,
783            vars,
784            workspace,
785            event_log,
786            lock_path,
787            dry_run,
788            &plan,
789            &fs,
790            registry,
791            pack_type_registry,
792            &rt,
793            &pack_name,
794            &pack_path,
795            &manifest,
796            &visited_meta,
797            scheduler,
798        );
799        if pack_halted {
800            // Route (b) halt-state gating: drop any prior entry for the
801            // halted pack so the next sync sees no prior hash and
802            // re-executes from scratch. Successful packs in this same
803            // run keep their freshly-upserted entries, and packs we did
804            // not reach keep their prior entries untouched.
805            next_lock.remove(&pack_name);
806            return;
807        }
808        // Successful pack — record a fresh lockfile entry so the next
809        // run's skip-on-hash test can succeed. Commit SHA is now plumbed
810        // from the walker (M4-D): `PackNode::commit_sha` carries the
811        // resolved HEAD SHA when the pack's working tree is a git
812        // repository, otherwise an empty string keeps the hash stable.
813        let actions_hash = compute_actions_hash(&actions, &commit_sha);
814        upsert_lock_entry(next_lock, &pack_name, &commit_sha, &actions_hash);
815    }
816}
817
818/// Build the multi-thread tokio runtime used to drive async pack-type
819/// plugin dispatch. Pack-type plugins expose `async fn` methods via
820/// `async_trait`, but the sync driver is synchronous end-to-end — we
821/// block on each plugin future inside the outer action loop. Extracted
822/// into a standalone helper so the runtime construction does not
823/// inflate `run_actions` beyond the 50-LOC per-function budget.
824///
825/// # Multi-thread rationale (M5-2c)
826///
827/// M5-2c enabled real [`crate::plugin::pack_type::MetaPlugin`] recursion
828/// through [`crate::execute::ExecCtx::pack_type_registry`]. The recursion
829/// itself is purely `async` / `.await` (no nested `block_on`), but future
830/// plugin authors may reasonably compose `block_on` calls inside
831/// lifecycle hooks — and external callers that drive `MetaPlugin` via
832/// `rt.block_on(...)` within their own runtime would deadlock on a
833/// current-thread runtime the moment a hook re-enters. A multi-thread
834/// runtime with a small worker pool lets those re-entries resolve on a
835/// sibling worker instead of blocking the dispatcher thread.
836///
837/// # Worker-thread sizing (feat-m6 H6)
838///
839/// The worker pool is sized from the resolved `--parallel` knob so the
840/// runtime always has enough workers to service every in-flight pack op
841/// plus at least one sibling for nested `block_on`. Clamped to
842/// `[2, num_cpus::get()]`: `2` preserves the pre-M6 floor (one driver +
843/// one sibling so re-entrant hooks never deadlock), and the upper bound
844/// caps the pool at the host's CPU count so `--parallel 0`
845/// (unbounded-semantics) does not explode the worker count.
846fn build_pack_type_runtime(parallel: usize) -> tokio::runtime::Runtime {
847    let workers = parallel.clamp(2, num_cpus::get().max(2));
848    tokio::runtime::Builder::new_multi_thread()
849        .worker_threads(workers)
850        .enable_all()
851        .build()
852        .expect("tokio runtime for pack-type dispatch")
853}
854
855/// Construct a fresh [`MetaVisitedSet`] for one sync run. Walker-driven
856/// dispatch does not attach it (see `dispatch_pack_type_plugin`), but
857/// the argument is threaded through so future explicit-install /
858/// teardown verbs can share the same set shape.
859fn new_visited_meta() -> MetaVisitedSet {
860    std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashSet::new()))
861}
862
863/// Combined short-circuit helper: `--only` filter + skip-on-hash. Returns
864/// `true` when the outer loop should `continue` for this pack.
865///
866/// Extracted from `run_actions` so that function stays under the
867/// workspace's 50-LOC per-function lint. Semantics are unchanged; this
868/// is a pure structural refactor.
869#[allow(clippy::too_many_arguments)]
870fn try_skip_or_filter(
871    report: &mut SyncReport,
872    only: Option<&GlobSet>,
873    pack_name: &str,
874    pack_path: &Path,
875    actions: &[Action],
876    commit_sha: &str,
877    workspace: &Path,
878    prior_lock: &std::collections::HashMap<String, LockEntry>,
879    next_lock: &mut std::collections::HashMap<String, LockEntry>,
880    dry_run: bool,
881    force: bool,
882) -> bool {
883    if skip_for_only_filter(only, pack_name, pack_path, workspace) {
884        if let Some(prev) = prior_lock.get(pack_name) {
885            next_lock.insert(pack_name.to_string(), prev.clone());
886        }
887        return true;
888    }
889    try_skip_pack(
890        report, pack_name, pack_path, actions, commit_sha, prior_lock, next_lock, dry_run, force,
891    )
892}
893
894/// Return `true` when `--only` is active and the pack's
895/// **workspace-relative path** (normalized to forward-slash form) does
896/// not match any of the registered globs. Name-fallback matching was
897/// dropped in the M4-D post-review fix bundle: spec §M4 req 6 says
898/// "pack paths" and cross-platform consistency requires a single
899/// normalized representation rather than `display()`-formatted strings
900/// (which use `\\` on Windows and `/` on POSIX — globset treats `\\`
901/// as a glob-escape, not a path separator). For the root pack whose
902/// `pack_path` is not under `workspace`, the fallback is to match
903/// against the absolute path's forward-slash form.
904fn skip_for_only_filter(
905    only: Option<&GlobSet>,
906    pack_name: &str,
907    pack_path: &Path,
908    workspace: &Path,
909) -> bool {
910    let Some(set) = only else { return false };
911    let rel = pack_path.strip_prefix(workspace).unwrap_or(pack_path);
912    let rel_str = rel.to_string_lossy().replace('\\', "/");
913    let matches = set.is_match(&rel_str);
914    if !matches {
915        tracing::info!(
916            target: "grex::sync",
917            "skipping pack `{pack_name}` (rel path `{rel_str}`): does not match --only filter"
918        );
919    }
920    !matches
921}
922
923/// Per-pack lifecycle dispatch. Returns `true` when the sync must halt.
924///
925/// M5-1 Stage C replaces the blind `for action in manifest.actions` loop
926/// with a pack-type-aware dispatch:
927///
928/// * [`PackType::Declarative`] retains the per-action execution shape that
929///   M4 shipped — each action lands its own `ActionStarted` /
930///   `ActionCompleted` / `ActionHalted` event bracket. The registry is
931///   still consulted via [`PackTypeRegistry::get`] as a name-oracle so
932///   mistyped packs fail closed.
933/// * [`PackType::Meta`] / [`PackType::Scripted`] dispatch once through the
934///   pack-type plugin's `sync` method (the sync CLI verb is the only
935///   caller in M5-1; `install` / `update` / `teardown` verbs wire in
936///   M5-2), returning a single aggregate [`ExecStep`]. A single event
937///   bracket frames the async call.
938///
939/// Declarative is kept on the legacy per-action path because its event log
940/// semantics (one event per action, per-step rollback context) are exactly
941/// what plugin authors expect to observe. Unifying declarative under the
942/// plugin dispatch is M5-2 scope — it requires reshaping the trait surface
943/// to emit a step stream rather than a single aggregate.
944#[allow(clippy::too_many_arguments)]
945fn run_pack_lifecycle(
946    report: &mut SyncReport,
947    vars: &VarEnv,
948    workspace: &Path,
949    event_log: &Path,
950    lock_path: &Path,
951    dry_run: bool,
952    plan: &PlanExecutor,
953    fs: &FsExecutor,
954    registry: &Arc<Registry>,
955    pack_type_registry: &Arc<PackTypeRegistry>,
956    rt: &tokio::runtime::Runtime,
957    pack_name: &str,
958    pack_path: &Path,
959    manifest: &crate::pack::PackManifest,
960    visited_meta: &MetaVisitedSet,
961    scheduler: &Arc<Scheduler>,
962) -> bool {
963    let type_tag = manifest.r#type.as_str();
964    // Name-oracle check: every pack type must be registered. Unknown
965    // pack types halt the pack the same way M4 halted unknown actions.
966    if pack_type_registry.get(type_tag).is_none() {
967        let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
968        record_action_err(report, event_log, lock_path, pack_name, 0, "pack-type", err);
969        return true;
970    }
971    match manifest.r#type {
972        crate::pack::PackType::Declarative => run_declarative_actions(
973            report,
974            vars,
975            workspace,
976            event_log,
977            lock_path,
978            dry_run,
979            plan,
980            fs,
981            pack_name,
982            pack_path,
983            manifest,
984            &manifest.actions,
985            scheduler,
986        ),
987        crate::pack::PackType::Meta | crate::pack::PackType::Scripted => dispatch_pack_type_plugin(
988            report,
989            vars,
990            workspace,
991            event_log,
992            lock_path,
993            registry,
994            pack_type_registry,
995            rt,
996            pack_name,
997            pack_path,
998            manifest,
999            type_tag,
1000            visited_meta,
1001            scheduler,
1002        ),
1003    }
1004}
1005
1006/// Run a declarative pack's actions sequentially. Preserves the M4
1007/// per-action event-log bracket (`ActionStarted` → `ActionCompleted` |
1008/// `ActionHalted`). Returns `true` when the sync must halt.
1009#[allow(clippy::too_many_arguments)]
1010fn run_declarative_actions(
1011    report: &mut SyncReport,
1012    vars: &VarEnv,
1013    workspace: &Path,
1014    event_log: &Path,
1015    lock_path: &Path,
1016    dry_run: bool,
1017    plan: &PlanExecutor,
1018    fs: &FsExecutor,
1019    pack_name: &str,
1020    pack_path: &Path,
1021    manifest: &crate::pack::PackManifest,
1022    actions: &[Action],
1023    scheduler: &Arc<Scheduler>,
1024) -> bool {
1025    // `apply_gitignore` is called per-lifecycle by each PackTypePlugin
1026    // for meta/scripted, and here for declarative (which bypasses the
1027    // plugin in `sync::run`'s per-action driver). Keeping plugins as
1028    // the single apply site everywhere else means the declarative
1029    // per-action path is the only code outside the PackTypePlugin
1030    // surface that needs a direct apply call.
1031    if !dry_run {
1032        let ctx = ExecCtx::new(vars, pack_path, workspace)
1033            .with_platform(Platform::current())
1034            .with_scheduler(scheduler);
1035        if let Err(e) = crate::plugin::pack_type::apply_gitignore(&ctx, manifest) {
1036            record_action_err(report, event_log, lock_path, pack_name, 0, "gitignore", e);
1037            return true;
1038        }
1039    }
1040    for (idx, action) in actions.iter().enumerate() {
1041        let ctx = ExecCtx::new(vars, pack_path, workspace)
1042            .with_platform(Platform::current())
1043            .with_scheduler(scheduler);
1044        let action_tag = action_kind_tag(action);
1045        append_manifest_event(
1046            event_log,
1047            lock_path,
1048            &Event::ActionStarted {
1049                ts: Utc::now(),
1050                pack: pack_name.to_string(),
1051                action_idx: idx,
1052                action_name: action_tag.to_string(),
1053            },
1054            &mut report.event_log_warnings,
1055        );
1056        let step_result =
1057            if dry_run { plan.execute(action, &ctx) } else { fs.execute(action, &ctx) };
1058        if !record_action_outcome(
1059            report,
1060            event_log,
1061            lock_path,
1062            pack_name,
1063            idx,
1064            action_tag,
1065            step_result,
1066        ) {
1067            return true;
1068        }
1069    }
1070    false
1071}
1072
1073/// Dispatch a pack-type plugin (meta / scripted) through the async
1074/// registry. Brackets the call with a single `ActionStarted` /
1075/// `ActionCompleted` / `ActionHalted` trio at index 0. Returns `true`
1076/// when the sync must halt.
1077#[allow(clippy::too_many_arguments)]
1078fn dispatch_pack_type_plugin(
1079    report: &mut SyncReport,
1080    vars: &VarEnv,
1081    workspace: &Path,
1082    event_log: &Path,
1083    lock_path: &Path,
1084    registry: &Arc<Registry>,
1085    pack_type_registry: &Arc<PackTypeRegistry>,
1086    rt: &tokio::runtime::Runtime,
1087    pack_name: &str,
1088    pack_path: &Path,
1089    manifest: &crate::pack::PackManifest,
1090    type_tag: &'static str,
1091    visited_meta: &MetaVisitedSet,
1092    scheduler: &Arc<Scheduler>,
1093) -> bool {
1094    // NB: `visited_meta` is intentionally NOT attached to the ctx here.
1095    // The sync driver already walks children in post-order via the tree
1096    // walker; attaching the visited set would trigger MetaPlugin's
1097    // real-recursion branch and cause double dispatch (walker runs child
1098    // packs as their own graph nodes, then MetaPlugin would recurse into
1099    // them again). The `visited_meta` parameter is kept on the argument
1100    // list so future explicit-install / teardown verbs that invoke
1101    // MetaPlugin directly can share the same set shape.
1102    let _ = visited_meta;
1103    let ctx = ExecCtx::new(vars, pack_path, workspace)
1104        .with_platform(Platform::current())
1105        .with_registry(registry)
1106        .with_pack_type_registry(pack_type_registry)
1107        .with_scheduler(scheduler);
1108    append_manifest_event(
1109        event_log,
1110        lock_path,
1111        &Event::ActionStarted {
1112            ts: Utc::now(),
1113            pack: pack_name.to_string(),
1114            action_idx: 0,
1115            action_name: type_tag.to_string(),
1116        },
1117        &mut report.event_log_warnings,
1118    );
1119    // SAFETY: `get` just confirmed the plugin is registered for
1120    // `type_tag`, so this unwrap cannot panic under the matched arm.
1121    let plugin = pack_type_registry
1122        .get(type_tag)
1123        .expect("pack-type plugin must be registered (guarded above)");
1124    // feat-m6 CI fix — establish a task-local tier stack frame for every
1125    // async dispatch. Without this, `TierGuard::push` (which runs inside
1126    // the plugin lifecycle and may span `.await` / thread hops under the
1127    // multi-thread runtime) has no enforcement frame to push into.
1128    let step_result = rt.block_on(crate::pack_lock::with_tier_scope(plugin.sync(&ctx, manifest)));
1129    !record_action_outcome(report, event_log, lock_path, pack_name, 0, type_tag, step_result)
1130}
1131
1132/// Decide whether `pack_name` can be short-circuited via a lockfile
1133/// hash match. When the prior hash matches the freshly-computed hash,
1134/// emit a single [`ExecResult::Skipped`] step and carry the prior
1135/// lockfile entry forward unchanged. Returns `true` when the pack was
1136/// skipped.
1137#[allow(clippy::too_many_arguments)]
1138fn try_skip_pack(
1139    report: &mut SyncReport,
1140    pack_name: &str,
1141    pack_path: &Path,
1142    actions: &[Action],
1143    commit_sha: &str,
1144    prior_lock: &std::collections::HashMap<String, LockEntry>,
1145    next_lock: &mut std::collections::HashMap<String, LockEntry>,
1146    dry_run: bool,
1147    force: bool,
1148) -> bool {
1149    if dry_run || force {
1150        // Dry runs must always produce the planned-step transcript so
1151        // authors can see what `sync` *would* do. `--force` is the
1152        // operator's explicit opt-out from the hash short-circuit.
1153        return false;
1154    }
1155    let Some(prior) = prior_lock.get(pack_name) else {
1156        return false;
1157    };
1158    let hash = compute_actions_hash(actions, commit_sha);
1159    if prior.actions_hash != hash {
1160        return false;
1161    }
1162    let skipped_step = ExecStep {
1163        action_name: Cow::Borrowed("pack"),
1164        result: ExecResult::Skipped {
1165            pack_path: pack_path.to_path_buf(),
1166            actions_hash: hash.clone(),
1167        },
1168        // W4 landed `StepKind::PackSkipped` as the dedicated pack-level
1169        // short-circuit detail; we use it here instead of the prior
1170        // `Require { Satisfied, Skip }` proxy so renderers and consumers
1171        // can match on a single, purpose-built variant.
1172        details: StepKind::PackSkipped { actions_hash: hash },
1173    };
1174    report.steps.push(SyncStep {
1175        pack: pack_name.to_string(),
1176        action_idx: 0,
1177        exec_step: skipped_step,
1178    });
1179    // Carry the prior entry forward so the next-lock snapshot stays
1180    // consistent with what's on disk.
1181    next_lock.insert(pack_name.to_string(), prior.clone());
1182    true
1183}
1184
1185/// Insert or update a lockfile entry for `pack_name` with `actions_hash`.
1186///
1187/// Stores `commit_sha` verbatim — including the empty string when the
1188/// pack is not a git working tree or the HEAD probe failed.
1189/// `actions_hash` is computed over the same `commit_sha`, so the two
1190/// fields stay internally consistent: if probing starts returning a
1191/// non-empty SHA on the next run, the hash differs and the skip is
1192/// correctly invalidated. The prior-preserve carve-out that was
1193/// introduced in M4-D was unsound (hash-vs-sha drift) and is removed
1194/// by the M4-D post-review fix bundle; see spec §M4 req 4a.
1195fn upsert_lock_entry(
1196    next_lock: &mut std::collections::HashMap<String, LockEntry>,
1197    pack_name: &str,
1198    commit_sha: &str,
1199    actions_hash: &str,
1200) {
1201    let installed_at = Utc::now();
1202    let entry = next_lock.get(pack_name).map_or_else(
1203        || LockEntry {
1204            id: pack_name.to_string(),
1205            sha: commit_sha.to_string(),
1206            branch: String::new(),
1207            installed_at,
1208            actions_hash: actions_hash.to_string(),
1209            schema_version: "1".to_string(),
1210        },
1211        |prev| LockEntry {
1212            installed_at,
1213            actions_hash: actions_hash.to_string(),
1214            sha: commit_sha.to_string(),
1215            ..prev.clone()
1216        },
1217    );
1218    next_lock.insert(pack_name.to_string(), entry);
1219}
1220
1221/// Record one action outcome into `report` + event log. Returns `false`
1222/// when the run must halt (on error); `true` otherwise.
1223fn record_action_outcome(
1224    report: &mut SyncReport,
1225    event_log: &Path,
1226    lock_path: &Path,
1227    pack_name: &str,
1228    idx: usize,
1229    action_tag: &'static str,
1230    step_result: Result<ExecStep, ExecError>,
1231) -> bool {
1232    match step_result {
1233        Ok(step) => {
1234            record_action_ok(report, event_log, lock_path, pack_name, idx, step);
1235            true
1236        }
1237        Err(e) => {
1238            record_action_err(report, event_log, lock_path, pack_name, idx, action_tag, e);
1239            false
1240        }
1241    }
1242}
1243
1244/// Success-path bookkeeping: emit legacy `Sync` summary + `ActionCompleted`
1245/// audit event, then push the step onto the report.
1246fn record_action_ok(
1247    report: &mut SyncReport,
1248    event_log: &Path,
1249    lock_path: &Path,
1250    pack_name: &str,
1251    idx: usize,
1252    step: ExecStep,
1253) {
1254    append_step_event(event_log, lock_path, pack_name, &step, &mut report.event_log_warnings);
1255    append_manifest_event(
1256        event_log,
1257        lock_path,
1258        &Event::ActionCompleted {
1259            ts: Utc::now(),
1260            pack: pack_name.to_string(),
1261            action_idx: idx,
1262            result_summary: format!("{:?}", step.result),
1263        },
1264        &mut report.event_log_warnings,
1265    );
1266    report.steps.push(SyncStep { pack: pack_name.to_string(), action_idx: idx, exec_step: step });
1267}
1268
1269/// Halt-path bookkeeping: emit `ActionHalted` audit event, then stash the
1270/// rich `HaltedContext` into `report.halted`.
1271fn record_action_err(
1272    report: &mut SyncReport,
1273    event_log: &Path,
1274    lock_path: &Path,
1275    pack_name: &str,
1276    idx: usize,
1277    action_tag: &'static str,
1278    e: ExecError,
1279) {
1280    let error_summary = truncate_error_summary(&e);
1281    append_manifest_event(
1282        event_log,
1283        lock_path,
1284        &Event::ActionHalted {
1285            ts: Utc::now(),
1286            pack: pack_name.to_string(),
1287            action_idx: idx,
1288            action_name: action_tag.to_string(),
1289            error_summary,
1290        },
1291        &mut report.event_log_warnings,
1292    );
1293    let recovery_hint = recovery_hint_for(&e);
1294    report.halted = Some(SyncError::Halted(Box::new(HaltedContext {
1295        pack: pack_name.to_string(),
1296        action_idx: idx,
1297        action_name: action_tag.to_string(),
1298        error: e,
1299        recovery_hint,
1300    })));
1301}
1302
1303/// Short stable kind-tag for an [`crate::pack::Action`]. Mirrors the
1304/// `ACTION_*` constants used by [`crate::execute::step`] so the audit log
1305/// stays uniform.
1306fn action_kind_tag(action: &crate::pack::Action) -> &'static str {
1307    use crate::pack::Action;
1308    match action {
1309        Action::Symlink(_) => "symlink",
1310        Action::Unlink(_) => "unlink",
1311        Action::Env(_) => "env",
1312        Action::Mkdir(_) => "mkdir",
1313        Action::Rmdir(_) => "rmdir",
1314        Action::Require(_) => "require",
1315        Action::When(_) => "when",
1316        Action::Exec(_) => "exec",
1317    }
1318}
1319
1320/// Produce a bounded human summary of an [`ExecError`] for
1321/// [`Event::ActionHalted::error_summary`]. Keeps the written JSONL line
1322/// from pathological blowup when captured stderr is large.
1323fn truncate_error_summary(err: &ExecError) -> String {
1324    let mut s = err.to_string();
1325    if s.len() > ACTION_ERROR_SUMMARY_MAX {
1326        s.truncate(ACTION_ERROR_SUMMARY_MAX);
1327        s.push_str("…[truncated]");
1328    }
1329    s
1330}
1331
1332/// Best-effort recovery hint for common [`ExecError`] shapes. Returns
1333/// `None` when no generic advice applies; the error's own `Display`
1334/// output is already shown by the `Halted` variant's format string.
1335fn recovery_hint_for(err: &ExecError) -> Option<String> {
1336    match err {
1337        ExecError::SymlinkDestOccupied { .. } => Some(
1338            "set `backup: true` on the symlink action, or remove the conflicting entry by hand"
1339                .into(),
1340        ),
1341        ExecError::SymlinkPrivilegeDenied { .. } => {
1342            Some("enable Windows Developer Mode or re-run grex as administrator".into())
1343        }
1344        ExecError::SymlinkCreateAfterBackupFailed { backup, .. } => {
1345            Some(format!("backup left at `{}`; restore manually then re-run", backup.display()))
1346        }
1347        ExecError::RmdirNotEmpty { .. } => {
1348            Some("set `force: true` on the rmdir action to recurse".into())
1349        }
1350        ExecError::EnvPersistenceDenied { .. } => {
1351            Some("re-run elevated (Machine scope needs admin)".into())
1352        }
1353        _ => None,
1354    }
1355}
1356
1357/// Append one [`Event::Sync`] record summarising an [`ExecStep`].
1358///
1359/// Failures log a warning and are recorded in the report's
1360/// `event_log_warnings`; they do not abort the sync (spec: event-log write
1361/// failures are non-fatal).
1362///
1363/// # Concurrency
1364///
1365/// The append is serialized through a [`ManifestLock`] held across the
1366/// write. The lock is acquired **per action** (not once across the full
1367/// traversal) so cooperating grex processes can observe mid-progress log
1368/// state between actions; fd-lock acquisition is cheap on modern kernels
1369/// and sync runs are dominated by executor side effects, not lock waits.
1370/// This closes the bypass gap surfaced by the M3 concurrency review where
1371/// `append_event` was called without any cross-process serialisation.
1372fn append_step_event(
1373    log: &Path,
1374    lock_path: &Path,
1375    pack: &str,
1376    step: &ExecStep,
1377    warnings: &mut Vec<String>,
1378) {
1379    let summary = format!("{}:{:?}", step.action_name, step.result);
1380    let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: summary };
1381    if let Err(e) = append_event_locked(log, lock_path, &event) {
1382        tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
1383        warnings.push(format!("{}: {e}", log.display()));
1384    }
1385    // Schema version is recorded once at the manifest level by existing
1386    // manifest code; this stub uses the constant to keep a single source of
1387    // truth for forward-compat.
1388    let _ = SCHEMA_VERSION;
1389}
1390
1391/// Append a single [`Event`] under the shared [`ManifestLock`] path.
1392/// Failures are logged and recorded as non-fatal warnings — the spec
1393/// marks event-log write failures as non-aborting so a transient disk
1394/// error must not kill a sync mid-stream.
1395fn append_manifest_event(log: &Path, lock_path: &Path, event: &Event, warnings: &mut Vec<String>) {
1396    if let Err(e) = append_event_locked(log, lock_path, event) {
1397        tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
1398        warnings.push(format!("{}: {e}", log.display()));
1399    }
1400}
1401
1402/// Acquire [`ManifestLock`] and append one event. Parent dir of the log is
1403/// created lazily on first write.
1404fn append_event_locked(log: &Path, lock_path: &Path, event: &Event) -> Result<(), String> {
1405    if let Some(parent) = log.parent() {
1406        std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1407    }
1408    if let Some(parent) = lock_path.parent() {
1409        std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1410    }
1411    let mut lock = ManifestLock::open(log, lock_path).map_err(|e| e.to_string())?;
1412    lock.write(|| append_event(log, event)).map_err(|e| e.to_string())?.map_err(|e| e.to_string())
1413}
1414
1415/// Re-export a cheap helper so CLI renderers can label halted steps by node
1416/// name without reaching into the graph twice.
1417#[must_use]
1418pub fn pack_display_name(node: &PackNode) -> &str {
1419    &node.name
1420}
1421
1422/// Run a full teardown over the pack tree rooted at `pack_root`.
1423///
1424/// Mirrors [`run`] but invokes
1425/// [`crate::plugin::PackTypePlugin::teardown`] on every pack in
1426/// **reverse** post-order so a parent tears down before its children
1427/// (the inverse of install). Children composed later by an author
1428/// consequently teardown earlier, matching the declarative
1429/// auto-reverse contract (R-M5-11).
1430///
1431/// All other concerns are identical to [`run`]: workspace lock, plan-
1432/// phase validators, lockfile update skipped (teardown does not
1433/// write a `actions_hash` forward), and event-log bracketing.
1434/// Teardown does NOT consult the lockfile skip-on-hash shortcut — a
1435/// user explicitly asked to remove the pack, so we always dispatch.
1436///
1437/// # Errors
1438///
1439/// Returns the first error that halts the pipeline — see [`SyncError`].
1440///
1441/// See [`run`] for the `cancel` contract — feat-m7-1 stage 2 threads
1442/// the parameter through teardown for parity; stages 3-4 add the polls.
1443pub fn teardown(
1444    pack_root: &Path,
1445    opts: &SyncOptions,
1446    cancel: &CancellationToken,
1447) -> Result<SyncReport, SyncError> {
1448    let _ = cancel;
1449    let workspace = resolve_workspace(pack_root, opts.workspace.as_deref());
1450    ensure_workspace_dir(&workspace)?;
1451    let (mut ws_lock, ws_lock_path) = open_workspace_lock(&workspace)?;
1452    let _ws_guard = match ws_lock.try_acquire() {
1453        Ok(Some(g)) => g,
1454        Ok(None) => {
1455            return Err(SyncError::WorkspaceBusy {
1456                workspace: workspace.clone(),
1457                lock_path: ws_lock_path,
1458            });
1459        }
1460        Err(e) => return Err(workspace_lock_err(&ws_lock_path, &e.to_string())),
1461    };
1462
1463    let graph =
1464        walk_and_validate(pack_root, &workspace, opts.validate, opts.ref_override.as_deref())?;
1465    let prep = prepare_run_context(pack_root, &graph)?;
1466
1467    let mut report = SyncReport {
1468        graph,
1469        steps: Vec::new(),
1470        halted: None,
1471        event_log_warnings: Vec::new(),
1472        pre_run_recovery: prep.pre_run_recovery,
1473    };
1474
1475    // feat-m6 B1: mirror `run()` — resolve `--parallel`, build a
1476    // Scheduler, thread it through every `ExecCtx` the teardown path
1477    // constructs. Teardown is the other user-facing verb that owns a
1478    // runtime, so it gets the same wiring.
1479    let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
1480    let scheduler = Arc::new(Scheduler::new(resolved_parallel));
1481    run_teardown(
1482        &mut report,
1483        &prep.order,
1484        &prep.vars,
1485        &workspace,
1486        &prep.event_log,
1487        &prep.lock_path,
1488        &prep.registry,
1489        &prep.pack_type_registry,
1490        resolved_parallel,
1491        &scheduler,
1492    );
1493    Ok(report)
1494}
1495
1496/// Dispatch `teardown` for every pack in **reverse** post-order.
1497/// Declarative packs go through [`crate::plugin::PackTypePlugin`]
1498/// rather than the per-action M4 path because the trait's
1499/// auto-reverse / explicit-block logic must compose with the
1500/// registry; going through the per-action path would mean
1501/// re-implementing inverse synthesis in the sync loop.
1502#[allow(clippy::too_many_arguments)]
1503fn run_teardown(
1504    report: &mut SyncReport,
1505    order: &[usize],
1506    vars: &VarEnv,
1507    workspace: &Path,
1508    event_log: &Path,
1509    lock_path: &Path,
1510    registry: &Arc<Registry>,
1511    pack_type_registry: &Arc<PackTypeRegistry>,
1512    parallel: usize,
1513    scheduler: &Arc<Scheduler>,
1514) {
1515    let rt = build_pack_type_runtime(parallel);
1516    // Reverse post-order: root first, then children. Pack-type plugin
1517    // teardown methods reverse their own children/actions, so the
1518    // outer loop only flips the inter-pack order.
1519    for &id in order.iter().rev() {
1520        let Some(node) = report.graph.node(id) else { continue };
1521        let pack_name = node.name.clone();
1522        let pack_path = node.path.clone();
1523        let manifest = node.manifest.clone();
1524        let type_tag = manifest.r#type.as_str();
1525        if pack_type_registry.get(type_tag).is_none() {
1526            let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
1527            record_action_err(report, event_log, lock_path, &pack_name, 0, "pack-type", err);
1528            return;
1529        }
1530        let ctx = ExecCtx::new(vars, &pack_path, workspace)
1531            .with_platform(Platform::current())
1532            .with_registry(registry)
1533            .with_pack_type_registry(pack_type_registry)
1534            .with_scheduler(scheduler);
1535        append_manifest_event(
1536            event_log,
1537            lock_path,
1538            &Event::ActionStarted {
1539                ts: Utc::now(),
1540                pack: pack_name.clone(),
1541                action_idx: 0,
1542                action_name: type_tag.to_string(),
1543            },
1544            &mut report.event_log_warnings,
1545        );
1546        let plugin = pack_type_registry
1547            .get(type_tag)
1548            .expect("pack-type plugin must be registered (guarded above)");
1549        // feat-m6 CI fix — see dispatch_pack_type note.
1550        let step_result =
1551            rt.block_on(crate::pack_lock::with_tier_scope(plugin.teardown(&ctx, &manifest)));
1552        if !record_action_outcome(
1553            report,
1554            event_log,
1555            lock_path,
1556            &pack_name,
1557            0,
1558            type_tag,
1559            step_result,
1560        ) {
1561            return;
1562        }
1563    }
1564}
1565
1566/// Test-only hook: append one [`Event::Sync`] through the same
1567/// [`ManifestLock`]-serialised path the sync driver uses.
1568///
1569/// Exposed so integration tests under `tests/` can exercise the locked
1570/// append helper without spinning up a full pack tree. Not intended for
1571/// downstream consumers — the signature may change without notice.
1572#[doc(hidden)]
1573pub fn __test_append_sync_event(
1574    log: &Path,
1575    lock_path: &Path,
1576    pack: &str,
1577    action_name: &str,
1578) -> Result<(), String> {
1579    let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: action_name.to_string() };
1580    append_event_locked(log, lock_path, &event)
1581}
1582
1583// ----------------------------------------------------------------------
1584// PR E — pre-run teardown scan
1585// ----------------------------------------------------------------------
1586
/// One `ActionStarted` event in the manifest log that has no matching
/// `ActionCompleted` or `ActionHalted` peer.
///
/// Dangling starts are the primary crash signal: the process wrote the
/// pre-action event, then died before the executor returned. Callers
/// should surface these to the operator (diagnostics only this PR; a
/// future `grex doctor` verb will act on them).
///
/// Every field is copied verbatim from the unterminated `ActionStarted`
/// event.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DanglingStart {
    /// Pack named by the unterminated `ActionStarted` event.
    pub pack: String,
    /// 0-based action index within the pack.
    pub action_idx: usize,
    /// Short action kind tag.
    pub action_name: String,
    /// Timestamp the `ActionStarted` event was written.
    pub started_at: DateTime<Utc>,
}
1606
/// Summary of crash-recovery artifacts found under a pack root before a
/// sync begins.
///
/// Built by [`scan_recovery`]. All fields are diagnostic; the sync
/// proceeds regardless of what the scan finds.
#[non_exhaustive]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RecoveryReport {
    /// `<dst>.grex.bak` entries, intended to flag symlink-action rollback
    /// orphans (a backup left next to a non-symlink or missing original).
    ///
    /// NOTE(review): the directory walker in this file records every entry
    /// whose name ends in `.grex.bak`; it does not inspect the sibling
    /// original, so treat each path as a candidate rather than a confirmed
    /// orphan.
    pub orphan_backups: Vec<PathBuf>,
    /// `<path>.grex.bak.<timestamp>` tombstones left by `rmdir` with
    /// `backup: true`. The walker accepts any non-empty suffix after
    /// `.grex.bak.` and does not validate the timestamp shape.
    pub orphan_tombstones: Vec<PathBuf>,
    /// `ActionStarted` events in the log with no matching
    /// `ActionCompleted`/`ActionHalted`.
    pub dangling_starts: Vec<DanglingStart>,
}
1625
1626impl RecoveryReport {
1627    /// `true` when the scan found nothing worth reporting.
1628    #[must_use]
1629    pub fn is_empty(&self) -> bool {
1630        self.orphan_backups.is_empty()
1631            && self.orphan_tombstones.is_empty()
1632            && self.dangling_starts.is_empty()
1633    }
1634}
1635
1636/// Walk `pack_root` and the manifest log to find crash-recovery artifacts.
1637///
1638/// Inspects:
1639///
1640/// * `<pack_root>/.grex/workspace/**` (and the pack_root itself) for
1641///   `.grex.bak` orphans and timestamped `.grex.bak.<ts>` tombstones.
1642/// * `event_log` (the manifest JSONL) for `ActionStarted` entries that
1643///   have no matching `ActionCompleted` / `ActionHalted` successor.
1644///
1645/// Non-blocking: scan errors are swallowed to an empty report so a
1646/// half-readable directory cannot kill a sync that would otherwise
1647/// succeed. Call sites that want to surface scan failures should read
1648/// the manifest directly.
1649///
1650/// # Errors
1651///
1652/// Returns [`SyncError::Validation`] only when the manifest read itself
1653/// reports corruption. Filesystem traversal errors are swallowed.
1654pub fn scan_recovery(pack_root: &Path, event_log: &Path) -> Result<RecoveryReport, SyncError> {
1655    let mut report = RecoveryReport::default();
1656    let workspace_root = pack_root.join(".grex").join("workspace");
1657    walk_for_backups(&workspace_root, &mut report);
1658    // Also scan the pack root itself — symlink destinations often live at
1659    // the top of the tree (e.g. `~/.config/foo`).
1660    walk_for_backups(pack_root, &mut report);
1661    if event_log.exists() {
1662        match read_all(event_log) {
1663            Ok(events) => {
1664                report.dangling_starts = collect_dangling_starts(&events);
1665            }
1666            Err(e) => {
1667                return Err(SyncError::Validation {
1668                    errors: vec![PackValidationError::DependsOnUnsatisfied {
1669                        pack: "<event-log>".into(),
1670                        required: e.to_string(),
1671                    }],
1672                });
1673            }
1674        }
1675    }
1676    Ok(report)
1677}
1678
1679/// Shallow directory walker (bounded depth = 6) that categorizes
1680/// `.grex.bak` and `.grex.bak.<ts>` filenames into the appropriate
1681/// report slot. Depth-limited so a pathological workspace with a deep
1682/// tree cannot stall the scan; realistic layouts are well under six
1683/// levels.
1684fn walk_for_backups(root: &Path, report: &mut RecoveryReport) {
1685    walk_for_backups_inner(root, report, 0);
1686}
1687
1688fn walk_for_backups_inner(dir: &Path, report: &mut RecoveryReport, depth: u32) {
1689    const MAX_DEPTH: u32 = 6;
1690    if depth > MAX_DEPTH {
1691        return;
1692    }
1693    let Ok(entries) = std::fs::read_dir(dir) else { return };
1694    for entry in entries.flatten() {
1695        let path = entry.path();
1696        let name = entry.file_name();
1697        let Some(name_str) = name.to_str() else { continue };
1698        if name_str.ends_with(".grex.bak") {
1699            report.orphan_backups.push(path.clone());
1700            continue;
1701        }
1702        if let Some(rest) = name_str.rsplit_once(".grex.bak.") {
1703            // `rsplit_once` returns `(prefix, suffix)`; suffix is the
1704            // timestamp chunk. Accept any non-empty suffix — the exact
1705            // timestamp shape is `fs_executor` internal.
1706            if !rest.1.is_empty() {
1707                report.orphan_tombstones.push(path.clone());
1708                continue;
1709            }
1710        }
1711        // Recurse only into real directories (not symlinks, to avoid
1712        // traversing into the workspace's cloned repos).
1713        let Ok(meta) = entry.metadata() else { continue };
1714        if meta.is_dir() {
1715            walk_for_backups_inner(&path, report, depth + 1);
1716        }
1717    }
1718}
1719
1720/// Reduce an event stream to a list of `ActionStarted` records with no
1721/// matching terminator.
1722///
1723/// Matching is positional per `(pack, action_idx)`: a later
1724/// `ActionCompleted` or `ActionHalted` with the same key clears the
1725/// entry. Whatever remains in the map after the pass is dangling.
1726fn collect_dangling_starts(events: &[Event]) -> Vec<DanglingStart> {
1727    use std::collections::HashMap;
1728    let mut open: HashMap<(String, usize), DanglingStart> = HashMap::new();
1729    for ev in events {
1730        match ev {
1731            Event::ActionStarted { ts, pack, action_idx, action_name } => {
1732                open.insert(
1733                    (pack.clone(), *action_idx),
1734                    DanglingStart {
1735                        pack: pack.clone(),
1736                        action_idx: *action_idx,
1737                        action_name: action_name.clone(),
1738                        started_at: *ts,
1739                    },
1740                );
1741            }
1742            Event::ActionCompleted { pack, action_idx, .. }
1743            | Event::ActionHalted { pack, action_idx, .. } => {
1744                open.remove(&(pack.clone(), *action_idx));
1745            }
1746            _ => {}
1747        }
1748    }
1749    let mut out: Vec<DanglingStart> = open.into_values().collect();
1750    out.sort_by_key(|a| a.started_at);
1751    out
1752}