// grex_core/sync.rs
//! Sync orchestrator — M3 Stage B slice 6.
//!
//! Glues the building blocks shipped in slices 1–5b into a single runnable
//! pipeline:
//!
//! 1. Walk a pack tree via [`crate::tree::sync_meta`] +
//!    [`crate::tree::build_graph`] + [`FsPackLoader`] + a `GitBackend`.
//! 2. Run plan-phase validators (manifest-level + graph-level).
//! 3. Execute every action via a pluggable [`ActionExecutor`]
//!    ([`PlanExecutor`] for dry-run, [`FsExecutor`] for wet-run).
//! 4. Record each step as an [`Event::Sync`] entry in the pack-root's
//!    `.grex/events.jsonl` event log.
//!
//! # Traversal order
//!
//! Nodes are executed in **depth-first post-order**: children fully install
//! before their parent. Rationale: parent packs commonly `require:` artifacts
//! created by children (e.g. a parent symlink whose `src` lives inside a
//! child). Running the root last matches the overlay-style dotfile-install
//! intent authors expect, and it matches how the tree walk is structured
//! (children are hydrated before the recursion returns).
//!
//! # Decoupling
//!
//! The CLI crate drives this module through a thin `run()` entry point;
//! [`SyncOptions`] is `#[non_exhaustive]` so new knobs (parallelism, filter
//! expressions, ref overrides) can land in later milestones without breaking
//! CLI callers. Errors aggregate into [`SyncError`] with a small, stable
//! variant set.
31use std::borrow::Cow;
32use std::fs;
33use std::path::{Path, PathBuf};
34use std::sync::Arc;
35
36use chrono::{DateTime, Utc};
37use globset::{Glob, GlobSet, GlobSetBuilder};
38use thiserror::Error;
39use tokio_util::sync::CancellationToken;
40
41use crate::execute::{
42 ActionExecutor, ExecCtx, ExecError, ExecResult, ExecStep, FsExecutor, MetaVisitedSet,
43 PlanExecutor, Platform, StepKind,
44};
45use crate::fs::{ManifestLock, ScopedLock};
46use crate::git::GixBackend;
47use crate::lockfile::{
48 branch_of, compute_actions_hash, read_lockfile, write_lockfile, LockEntry, LockfileError,
49};
50use crate::manifest::{append_event, read_all, Event, ACTION_ERROR_SUMMARY_MAX, SCHEMA_VERSION};
51use crate::pack::{Action, PackValidationError};
52use crate::plugin::{PackTypeRegistry, Registry};
53use crate::scheduler::Scheduler;
54use crate::tree::{
55 build_graph, sync_meta, FsPackLoader, PackGraph, PackNode, SyncMetaOptions, TreeError,
56};
57use crate::vars::VarEnv;
58
/// Inputs to [`run`].
///
/// Fields are public-writable and the type is marked `#[non_exhaustive]`
/// so future knobs (parallelism, filter expressions, additional ref
/// strategies) can land without breaking library consumers.
///
/// NOTE(review): `#[non_exhaustive]` forbids struct-literal construction
/// from *outside* this crate entirely — including functional-update
/// syntax (`SyncOptions { dry_run: true, ..SyncOptions::default() }`).
/// External callers must start from [`SyncOptions::default`] /
/// [`SyncOptions::new`] and use the `with_*` builders or assign the
/// public fields directly; only in-crate call sites can use literals
/// with `..Default::default()`.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct SyncOptions {
    /// When `true`, use [`PlanExecutor`] (no filesystem mutations).
    pub dry_run: bool,
    /// When `false`, skip plan-phase validators (manifest + graph). Debug
    /// escape hatch; production callers should leave this `true`.
    pub validate: bool,
    /// Override workspace directory. `None` → derived from `pack_root`
    /// (the directory holding `.grex/pack.yaml`).
    ///
    /// **v1.2.1 path (iii) semantics**: when `Some`, this path IS the
    /// canonical meta directory. Children resolve parent-relatively as
    /// `<workspace>/<child.path>` and `<workspace>/.grex/pack.yaml` is
    /// where the root manifest is read from. The path MUST exist;
    /// symlinks are resolved via `fs::canonicalize` to a single
    /// inode-stable form. Pre-v1.2.1 the override only re-anchored
    /// children — that legacy split is retired.
    pub workspace: Option<PathBuf>,
    /// Global ref override (`grex sync --ref <sha|branch|tag>`). When
    /// `Some`, every child pack clone/checkout uses this ref instead of
    /// the declared `child.ref`. Empty strings are rejected at the CLI
    /// layer.
    pub ref_override: Option<String>,
    /// Pack-path filter patterns (`grex sync --only <glob>`). Raw glob
    /// strings — compiled internally via an in-crate `globset` helper so the
    /// `globset` crate version does not leak into the public API.
    /// `None` / empty means every pack runs (M3 semantics). Matching is
    /// against the pack's **workspace-relative** path normalized to
    /// forward-slash form.
    pub only_patterns: Option<Vec<String>>,
    /// Bypass the lockfile hash-match skip (`grex sync --force`). When
    /// `true`, every pack re-executes even if its `actions_hash` is
    /// unchanged from the prior lockfile.
    pub force: bool,
    /// Max parallel pack ops for this sync run (feat-m6-1).
    ///
    /// * `None` → callers default to `num_cpus::get()` at CLI layer.
    ///   Library callers who construct `SyncOptions` directly and leave
    ///   this `None` get `num_cpus::get()` semantics too — the sync
    ///   driver resolves the default in one place so the scheduler slot
    ///   on every `ExecCtx` is always populated.
    /// * `Some(0)` → unbounded (`Semaphore::MAX_PERMITS`).
    /// * `Some(1)` → serial fast-path.
    /// * `Some(n >= 2)` → bounded parallel.
    pub parallel: Option<usize>,
    /// v1.2.0 Stage 1.l prep — when `true`, walker Phase 2 may drop
    /// dirty trees during prune. Still refuses ignored content unless
    /// [`SyncOptions::force_prune_with_ignored`] is also `true`.
    /// Default `false` preserves v1.1.1 behavior (refuse all dirty
    /// drops).
    pub force_prune: bool,
    /// v1.2.0 Stage 1.l prep — when `true` (implies
    /// [`SyncOptions::force_prune`]), walker Phase 2 also drops
    /// ignored content. Hard override — the strongest level. Default
    /// `false` preserves v1.1.1 behavior.
    pub force_prune_with_ignored: bool,
    /// v1.2.1 Item 5b — when `true` AND `force_prune` (or
    /// `force_prune_with_ignored`) is set, divert Phase 2 prunes
    /// through the snapshot-then-unlink quarantine pipeline. The
    /// dest's full subtree is recursively copied to
    /// `<workspace>/.grex/trash/<ISO8601>/<basename>/` BEFORE
    /// `unlink(dest)` fires. Snapshot or audit-fsync failure aborts
    /// the prune (no unlink). Lean theorem
    /// `quarantine_snapshot_precedes_delete` proves the safety
    /// contract. Default `false` preserves v1.2.0 direct-unlink
    /// behavior. Has no effect unless one of the `force_prune*`
    /// flags is also set (the CLI enforces this via
    /// `requires = "force_prune"`; library callers who set this
    /// with neither flag get a no-op since Phase 2 will not enter
    /// the override path at all).
    pub quarantine: bool,
    /// v1.2.0 Stage 1.h opt-in — when `true`, the walker rewrites a
    /// legacy v1.1.1 lockfile in place to the v1.2.0 shape. When
    /// `false` (default), the walker errors on the legacy shape so
    /// migration is always an explicit caller decision.
    pub migrate_lockfile: bool,
    /// v1.2.0 Stage 1.j prep — when `true` (default), the walker
    /// descends into nested meta-children. `doctor --shallow` flips
    /// this to `false` so only the immediate workspace is inspected.
    pub recurse: bool,
    /// v1.2.0 Stage 1.j prep — pairs with
    /// [`SyncOptions::recurse`] for `--shallow=N`. `None` (default)
    /// is unbounded recursion when `recurse` is `true`. `Some(n)`
    /// caps depth at `n` levels of nesting.
    pub max_depth: Option<usize>,
    /// v1.2.5 — when `Some(N)`, every meta sync starts with a
    /// best-effort GC sweep over `<meta>/.grex/trash/`, deleting
    /// entries older than `N` days. `None` (default) preserves the
    /// v1.2.1 indefinite-retention behavior. The CLI surfaces this
    /// as `grex sync --retain-days N`; library callers wire it via
    /// [`SyncOptions::with_retain_days`]. Sweep failures log via
    /// `tracing::warn!` and DO NOT halt the sync.
    pub retain_days: Option<u32>,
}
162
163impl Default for SyncOptions {
164 fn default() -> Self {
165 Self {
166 dry_run: false,
167 validate: true,
168 workspace: None,
169 ref_override: None,
170 only_patterns: None,
171 force: false,
172 parallel: None,
173 // v1.2.0 Stage 1.m additions — defaults preserve v1.1.1
174 // behavior. Each field is a dormant placeholder until
175 // its corresponding walker stage wires it.
176 force_prune: false,
177 force_prune_with_ignored: false,
178 quarantine: false,
179 migrate_lockfile: false,
180 recurse: true,
181 max_depth: None,
182 retain_days: None,
183 }
184 }
185}
186
187/// Compile raw `--only` pattern strings into a [`globset::GlobSet`].
188/// Empty / absent input yields `Ok(None)` so M3's zero-config path
189/// (every pack runs) stays the default.
190fn compile_only_globset(patterns: Option<&Vec<String>>) -> Result<Option<GlobSet>, SyncError> {
191 let Some(pats) = patterns else { return Ok(None) };
192 if pats.is_empty() {
193 return Ok(None);
194 }
195 let mut builder = GlobSetBuilder::new();
196 for p in pats {
197 let glob = Glob::new(p)
198 .map_err(|source| SyncError::InvalidOnlyGlob { pattern: p.clone(), source })?;
199 builder.add(glob);
200 }
201 let set = builder
202 .build()
203 .map_err(|source| SyncError::InvalidOnlyGlob { pattern: pats.join(","), source })?;
204 Ok(Some(set))
205}
206
207impl SyncOptions {
208 /// Default options: wet-run, validators enabled, default workspace path.
209 #[must_use]
210 pub fn new() -> Self {
211 Self::default()
212 }
213
214 /// Set `dry_run`.
215 #[must_use]
216 pub fn with_dry_run(mut self, dry_run: bool) -> Self {
217 self.dry_run = dry_run;
218 self
219 }
220
221 /// Set `validate`.
222 #[must_use]
223 pub fn with_validate(mut self, validate: bool) -> Self {
224 self.validate = validate;
225 self
226 }
227
228 /// Set `workspace` override.
229 #[must_use]
230 pub fn with_workspace(mut self, workspace: Option<PathBuf>) -> Self {
231 self.workspace = workspace;
232 self
233 }
234
235 /// Set `ref_override` (`--ref`).
236 #[must_use]
237 pub fn with_ref_override(mut self, ref_override: Option<String>) -> Self {
238 self.ref_override = ref_override;
239 self
240 }
241
242 /// Set `only_patterns` (`--only`). Empty vector or `None` disables
243 /// the filter.
244 #[must_use]
245 pub fn with_only_patterns(mut self, patterns: Option<Vec<String>>) -> Self {
246 self.only_patterns = patterns;
247 self
248 }
249
250 /// Set `force` (`--force`).
251 #[must_use]
252 pub fn with_force(mut self, force: bool) -> Self {
253 self.force = force;
254 self
255 }
256
257 /// Set `parallel` (`--parallel`). See [`SyncOptions::parallel`] for
258 /// the `None` / `Some(0)` / `Some(1)` / `Some(n)` semantics.
259 #[must_use]
260 pub fn with_parallel(mut self, parallel: Option<usize>) -> Self {
261 self.parallel = parallel;
262 self
263 }
264
265 /// Set `force_prune` (`--force-prune`). See
266 /// [`SyncOptions::force_prune`] for the override matrix.
267 #[must_use]
268 pub fn with_force_prune(mut self, force_prune: bool) -> Self {
269 self.force_prune = force_prune;
270 self
271 }
272
273 /// Set `force_prune_with_ignored` (`--force-prune-with-ignored`).
274 /// See [`SyncOptions::force_prune_with_ignored`] for the override
275 /// matrix.
276 #[must_use]
277 pub fn with_force_prune_with_ignored(mut self, force_prune_with_ignored: bool) -> Self {
278 self.force_prune_with_ignored = force_prune_with_ignored;
279 self
280 }
281
282 /// Set `quarantine` (`--quarantine`). See
283 /// [`SyncOptions::quarantine`] for the snapshot-before-delete
284 /// contract. Has no effect unless [`SyncOptions::force_prune`]
285 /// or [`SyncOptions::force_prune_with_ignored`] is also set.
286 #[must_use]
287 pub fn with_quarantine(mut self, quarantine: bool) -> Self {
288 self.quarantine = quarantine;
289 self
290 }
291
292 /// Set `retain_days` (`--retain-days N`). See
293 /// [`SyncOptions::retain_days`] for the GC-sweep contract.
294 /// `None` preserves v1.2.1 indefinite-retention behavior;
295 /// `Some(N)` triggers a best-effort sweep at the start of every
296 /// meta sync.
297 #[must_use]
298 pub fn with_retain_days(mut self, retain_days: Option<u32>) -> Self {
299 self.retain_days = retain_days;
300 self
301 }
302}
303
/// One executed (or planned) action step in a sync run.
///
/// A `SyncStep` pairs an executor-emitted [`ExecStep`] with the pack and
/// action position it came from, so transcript renderers can attribute
/// each step. Marked `#[non_exhaustive]` so new observability fields
/// (timestamps, plugin provenance) can land without breaking library
/// consumers who destructure the struct.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct SyncStep {
    /// Name of the pack that owned the action.
    pub pack: String,
    /// 0-based index into the pack's top-level `actions` vector.
    pub action_idx: usize,
    /// The [`ExecStep`] record emitted by the executor.
    pub exec_step: ExecStep,
}
319
/// Outcome of a [`run`] invocation.
///
/// On fail-fast termination, `halted` carries the error that stopped the
/// sync; every completed step up to that point is still in `steps` so
/// callers can render a partial transcript.
///
/// Marked `#[non_exhaustive]` so new report-level fields (run id, metrics)
/// can land without breaking library consumers who destructure the struct.
#[non_exhaustive]
#[derive(Debug)]
pub struct SyncReport {
    /// Fully-walked pack graph (present even on halted runs).
    pub graph: PackGraph,
    /// Steps produced by the executor, in execution order.
    pub steps: Vec<SyncStep>,
    /// `Some(e)` if execution stopped before all actions ran.
    pub halted: Option<SyncError>,
    /// Non-fatal manifest-append warnings (one per failed event append).
    /// Kept as a separate field because the spec marks event-log write
    /// failures as non-aborting: the sync proceeds, the warning is
    /// surfaced here.
    pub event_log_warnings: Vec<String>,
    /// `Some(r)` when the pre-run teardown scan found orphaned backup
    /// files or dangling [`Event::ActionStarted`] records from a prior
    /// crashed run. Informational only — the report is still returned and
    /// the sync proceeds. CLI renderers should surface a warning so the
    /// operator can decide whether to run a future `grex doctor` verb.
    pub pre_run_recovery: Option<RecoveryReport>,
    /// One entry per child whose legacy `.grex/workspace/<name>/` layout
    /// was relocated (or considered for relocation) on this sync. Empty
    /// when no legacy directory was found — the common case for any
    /// workspace built fresh on v1.1.0+. CLI renderers should surface
    /// the list so operators see what changed.
    pub workspace_migrations: Vec<WorkspaceMigration>,
}
354
/// One legacy-layout migration attempt.
///
/// `outcome` distinguishes the move-succeeded case from the
/// don't-clobber-user-data cases so CLI renderers can present different
/// advice to the operator. Marked `#[non_exhaustive]` so new provenance
/// fields can land without breaking destructuring consumers.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkspaceMigration {
    /// Source path under the legacy `.grex/workspace/<name>/` location,
    /// rendered relative to the pack root for log readability.
    pub from: PathBuf,
    /// Destination flat-sibling path `<pack_root>/<name>/`, relative to
    /// the pack root.
    pub to: PathBuf,
    /// What happened — see [`MigrationOutcome`].
    pub outcome: MigrationOutcome,
}
370
/// Outcome of one legacy-layout migration attempt.
///
/// Only `Migrated` means the rename happened; every other variant leaves
/// the filesystem untouched and exists so the operator can be told why.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationOutcome {
    /// Legacy directory was renamed onto the flat-sibling slot.
    Migrated,
    /// Both legacy and flat-sibling slots existed. Skipped — the user
    /// must inspect and reconcile manually so we never silently delete
    /// either.
    SkippedBothExist,
    /// Flat-sibling slot already had a non-grex file or directory in
    /// the way. Skipped — refusing to clobber user data even when the
    /// legacy slot is plainly the source of truth.
    SkippedDestOccupied,
    /// `fs::rename` failed (e.g. cross-volume, ACL denied). The legacy
    /// directory is still in place; surfaced so the operator can move
    /// it manually. Carries the error's display rendering, not the
    /// error itself, so the enum stays `Clone + Eq`.
    Failed { error: String },
}
390
/// Rich context attached to a [`SyncError::Halted`] variant.
///
/// Packages the pack + action position together with the underlying
/// executor error and an optional human-readable recovery hint. Marked
/// `#[non_exhaustive]` so future fields (step transcript, timestamp) can
/// land without breaking `match` arms or struct destructures.
#[non_exhaustive]
#[derive(Debug)]
pub struct HaltedContext {
    /// Name of the pack that owned the halted action.
    pub pack: String,
    /// 0-based index into the pack's top-level `actions` vector.
    pub action_idx: usize,
    /// Short action kind tag (e.g. `"symlink"`, `"exec"`).
    pub action_name: String,
    /// Underlying executor error.
    pub error: ExecError,
    /// Optional next-step suggestion for the operator. `None` when no
    /// generic hint applies — the executor error's own `Display` already
    /// tells the story.
    pub recovery_hint: Option<String>,
}
413
/// Error taxonomy surfaced by [`run`].
///
/// Marked `#[non_exhaustive]`; each variant maps to a distinct failure
/// domain so the CLI can choose exit codes without string-matching.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum SyncError {
    /// The pack-tree walker failed (loader error, git error, cycle, …).
    #[error("tree walk failed: {0}")]
    Tree(#[from] TreeError),
    /// One or more plan-phase validators flagged the graph.
    #[error("validation failed: {errors:?}")]
    Validation {
        /// Aggregated errors from manifest-level + graph-level validators.
        errors: Vec<PackValidationError>,
    },
    /// An action executor returned an error.
    ///
    /// Retained for backward compatibility; new call sites should prefer
    /// [`SyncError::Halted`] which carries full pack + action context.
    /// Kept non-deprecated because [`From<ExecError>`] still materialises
    /// the variant for non-sync-loop callers (e.g. ad-hoc helpers).
    #[error("action execution failed: {0}")]
    Exec(#[from] ExecError),
    /// Action execution halted; full context (pack, action index, error,
    /// optional recovery hint) lives in [`HaltedContext`]. This is the
    /// variant the sync driver emits — [`SyncError::Exec`] is only
    /// surfaced by ancillary code paths. Boxed to keep the enum small
    /// (the context struct is comparatively large).
    #[error(
        "sync halted at pack `{}` action #{} ({}): {}",
        .0.pack, .0.action_idx, .0.action_name, .0.error
    )]
    Halted(Box<HaltedContext>),
    /// Another `grex` process (or thread) already holds the workspace-level
    /// lock. The running sync refused to start to avoid racing two concurrent
    /// walkers into the same workspace. If the lock file at `lock_path` is
    /// stale (no other grex is actually running), remove it by hand.
    #[error(
        "workspace `{workspace}` is locked by another grex process (remove {lock_path:?} if stale)"
    )]
    WorkspaceBusy {
        /// Resolved workspace directory that the current run tried to lock.
        workspace: PathBuf,
        /// Sidecar lock file that is currently held.
        lock_path: PathBuf,
    },
    /// Reading or parsing the resolved-state lockfile failed. Surfaced as
    /// its own variant (rather than folded into `Validation`) because a
    /// corrupt / unreadable lockfile is an I/O or schema fault, not a
    /// dependency-satisfaction fault. Resolution is operator-level
    /// (restore a backup, delete the file, re-sync), not author-level.
    #[error("lockfile `{path}` failed to load: {source}")]
    Lockfile {
        /// Lockfile path that failed to load.
        path: PathBuf,
        /// Underlying lockfile error.
        #[source]
        source: LockfileError,
    },
    /// One of the `--only <GLOB>` patterns failed to compile. Surfaced
    /// as its own variant so the CLI can map it to a dedicated usage
    /// error exit code instead of the generic sync-failure bucket.
    #[error("invalid --only glob `{pattern}`: {source}")]
    InvalidOnlyGlob {
        /// The raw pattern string that failed to compile.
        pattern: String,
        /// Underlying globset error.
        #[source]
        source: globset::Error,
    },
    /// Migrating the v1.x event log (`grex.jsonl`) to the v2 canonical
    /// path (`.grex/events.jsonl`) failed. Operator-level resolution
    /// (check filesystem permissions, free disk space, then retry).
    #[error("event-log migration failed: {0}")]
    EventLogMigration(#[source] crate::manifest::ManifestError),
    /// Cooperative cancellation fired (Ctrl-C / SIGTERM) during a
    /// parallel sync. v1.2.0 Stage 1.g wires the rayon walker to surface
    /// this distinct-from-failure variant so the CLI can exit with a
    /// dedicated cancellation code instead of a generic sync error.
    /// Dormant until Stage 1.g — the existing CLI does not yet emit it.
    #[error("sync cancelled by user")]
    SchedulerCancelled,
}
494
495impl Clone for SyncError {
496 fn clone(&self) -> Self {
497 // `TreeError` / `ExecError` do not implement `Clone` (they wrap
498 // `std::io::Error`-adjacent values). Halts carry only a display
499 // rendering in the report; we re-materialise via a synthetic
500 // `Validation` variant so `SyncReport` can be `Clone`-safe for
501 // observability tooling without widening the taxonomy.
502 match self {
503 Self::Tree(e) => Self::Validation {
504 errors: vec![PackValidationError::DependsOnUnsatisfied {
505 pack: "<tree>".into(),
506 required: e.to_string(),
507 }],
508 },
509 Self::Validation { errors } => Self::Validation { errors: errors.clone() },
510 Self::Exec(e) => Self::Validation {
511 errors: vec![PackValidationError::DependsOnUnsatisfied {
512 pack: "<exec>".into(),
513 required: e.to_string(),
514 }],
515 },
516 Self::Halted(ctx) => Self::Validation {
517 errors: vec![PackValidationError::DependsOnUnsatisfied {
518 pack: ctx.pack.clone(),
519 required: format!(
520 "action #{} ({}): {}",
521 ctx.action_idx, ctx.action_name, ctx.error
522 ),
523 }],
524 },
525 Self::WorkspaceBusy { workspace, lock_path } => {
526 Self::WorkspaceBusy { workspace: workspace.clone(), lock_path: lock_path.clone() }
527 }
528 Self::Lockfile { path, source } => Self::Validation {
529 errors: vec![PackValidationError::DependsOnUnsatisfied {
530 pack: "<lockfile>".into(),
531 required: format!("{}: {source}", path.display()),
532 }],
533 },
534 Self::InvalidOnlyGlob { pattern, source } => Self::Validation {
535 errors: vec![PackValidationError::DependsOnUnsatisfied {
536 pack: "<only-glob>".into(),
537 required: format!("{pattern}: {source}"),
538 }],
539 },
540 Self::EventLogMigration(source) => Self::Validation {
541 errors: vec![PackValidationError::DependsOnUnsatisfied {
542 pack: "<event-log-migration>".into(),
543 required: source.to_string(),
544 }],
545 },
546 Self::SchedulerCancelled => Self::SchedulerCancelled,
547 }
548 }
549}
550
551/// Run a full sync over the pack tree rooted at `pack_root`.
552///
553/// Resolution rules:
554/// * If `pack_root` is a directory the walker looks for
555/// `<pack_root>/.grex/pack.yaml`.
556/// * If `pack_root` ends in `.yaml` / `.yml` it is loaded verbatim.
557/// * Workspace defaults to the pack root directory itself when
558/// `opts.workspace` is `None`. Children resolve as flat siblings of the
559/// parent pack root (since v1.1.0).
560///
561/// # Errors
562///
563/// Returns the first error that halts the pipeline — see [`SyncError`] for
564/// the taxonomy.
565///
566/// `cancel` is the cooperative cancellation handle threaded through the
567/// pipeline by feat-m7-1 stage 2. Stage 2 only wires the parameter; the
568/// `is_cancelled()` polls land in stages 3-4 (scheduler + pack-lock
569/// acquire). CLI callers pass a never-cancelled sentinel
570/// (`CancellationToken::new()`); the MCP server passes a token tied to
571/// the request lifetime.
572pub fn run(
573 pack_root: &Path,
574 opts: &SyncOptions,
575 cancel: &CancellationToken,
576) -> Result<SyncReport, SyncError> {
577 // Stage 2 is signature-only — silence "unused parameter" without
578 // hiding it behind `_` (downstream stages will read it).
579 let _ = cancel;
580 let workspace = prepare_workspace(pack_root, opts)?;
581 // v1.3.1 (B4) — `dry_run = true` is contractually FS-mutation-free.
582 // `open_workspace_lock` (via `ScopedLock::open`) creates a sidecar
583 // file at `<workspace>/.grex.sync.lock`, which would itself violate
584 // the no-FS-mutation contract. Skip lock acquisition entirely in
585 // dry-run; the dry-run path is read-only by construction so
586 // concurrent dry-runs against the same workspace are safe.
587 let mut ws_lock_holder =
588 if !opts.dry_run { Some(open_workspace_lock(&workspace)?) } else { None };
589 let _ws_guard = try_acquire_workspace_guard(ws_lock_holder.as_mut(), &workspace)?;
590
591 // Compile `--only` patterns into a GlobSet here so the
592 // `globset` crate version does not leak into `SyncOptions`.
593 let only_set = compile_only_globset(opts.only_patterns.as_ref())?;
594
595 // Auto-migrate legacy `.grex/workspace/<name>/` layout BEFORE the
596 // walker resolves children. Idempotent: a fresh v1.1.0+ workspace
597 // sees no legacy directory and the function no-ops.
598 let workspace_migrations = migrate_legacy_workspace(pack_root);
599
600 // v1.2.1 path (iii) — three-stage composition:
601 // sync_meta(workspace, prune_candidates) — mutate (rayon parallel)
602 // build_graph(workspace) — read-only graph
603 // run_actions(graph) — consume graph
604 // `Walker::walk` is retired from the prod path; the symbol is kept
605 // for test-suite compat. See `crates/grex-core/src/tree/graph_build.rs`.
606 run_sync_meta(&workspace, opts)?;
607 let graph = build_and_validate_graph(&workspace, opts.validate, opts.ref_override.as_deref())?;
608 let prep = prepare_run_context(pack_root, &graph, &workspace)?;
609 log_force_flag(opts.force);
610
611 let mut report = SyncReport {
612 graph,
613 steps: Vec::new(),
614 halted: None,
615 event_log_warnings: Vec::new(),
616 pre_run_recovery: prep.pre_run_recovery,
617 workspace_migrations,
618 };
619
620 let mut next_lock = prep.prior_lock.clone();
621 // feat-m6 B1: resolve `--parallel` once and build the scheduler
622 // shared across every `ExecCtx` in this run. Library callers who
623 // leave `opts.parallel == None` default to `num_cpus::get()` here
624 // (clamped `>= 1`) so the scheduler slot is always populated —
625 // `ctx.scheduler` being `None` would strand acquire-sites into
626 // unbounded concurrency. See `.omne/cfg/concurrency.md` §Scheduler.
627 let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
628 let scheduler = Arc::new(Scheduler::new(resolved_parallel));
629 run_actions(
630 &mut report,
631 &prep.order,
632 &prep.vars,
633 &workspace,
634 &prep.event_log,
635 &prep.lock_path,
636 opts.dry_run,
637 &prep.prior_lock,
638 &mut next_lock,
639 &prep.registry,
640 &prep.pack_type_registry,
641 only_set.as_ref(),
642 opts.force,
643 resolved_parallel,
644 &scheduler,
645 );
646
647 persist_lockfile_if_clean(&mut report, &prep.lockfile_path, &next_lock, opts.dry_run);
648 Ok(report)
649}
650
/// Bag of context pieces assembled once at the top of [`run`]. Grouping
/// them keeps [`run`] under the workspace's 50-LOC function lint without
/// smearing the read of sequential setup across helpers. Fields are
/// consumed piecemeal by the actions loop; no getters needed.
struct RunContext {
    // Execution order produced by `post_order` — presumably indices
    // into the graph's node storage; confirm against `post_order`.
    order: Vec<usize>,
    // Variable environment seeded from the OS (`VarEnv::from_os`).
    vars: VarEnv,
    // Pack-root event log path (`event_log_path(pack_root)`).
    event_log: PathBuf,
    // Sidecar lock path derived from `event_log` via `event_lock_path`.
    lock_path: PathBuf,
    // Resolved-state lockfile path (`lockfile_path(pack_root)`).
    lockfile_path: PathBuf,
    // Prior lockfile entries — keyed by string (presumably pack name;
    // verify against `load_prior_lock`).
    prior_lock: std::collections::HashMap<String, LockEntry>,
    // Action-plugin registry shared (via Arc) by every ExecCtx.
    registry: Arc<Registry>,
    // Pack-type registry shared (via Arc) by every ExecCtx.
    pack_type_registry: Arc<PackTypeRegistry>,
    // `Some` only when the pre-run recovery scan found something.
    pre_run_recovery: Option<RecoveryReport>,
}
666
667/// Build the per-run context: traversal order, vars env, event/lockfile
668/// paths, prior lockfile state, bootstrap registry, and (optionally) a
669/// pre-run recovery scan. Kept narrow so [`run`] stays small.
670///
671/// `workspace` is the resolved workspace directory (post `--workspace`
672/// override) so the recovery scan looks for `.grex.bak` artefacts under
673/// the actual on-disk location children were materialised at — not
674/// under the pack root, which differs from the workspace whenever the
675/// CLI's `--workspace` flag is used. Pre-fix this anchoring drift
676/// caused recovery scans to miss every backup left under an override
677/// workspace.
678fn prepare_run_context(
679 pack_root: &Path,
680 graph: &PackGraph,
681 workspace: &Path,
682) -> Result<RunContext, SyncError> {
683 let event_log = event_log_path(pack_root);
684 let lock_path = event_lock_path(&event_log);
685 let vars = VarEnv::from_os();
686 let order = post_order(graph);
687 let pre_run_recovery = scan_recovery(workspace, &event_log).ok().filter(|r| !r.is_empty());
688 let lockfile_path = lockfile_path(pack_root);
689 let prior_lock = load_prior_lock(&lockfile_path)?;
690 let registry = Arc::new(Registry::bootstrap());
691 let pack_type_registry = Arc::new(bootstrap_pack_type_registry());
692 Ok(RunContext {
693 order,
694 vars,
695 event_log,
696 lock_path,
697 lockfile_path,
698 prior_lock,
699 registry,
700 pack_type_registry,
701 pre_run_recovery,
702 })
703}
704
705/// Build the [`PackTypeRegistry`] the sync driver threads into every
706/// [`ExecCtx`] it constructs.
707///
708/// Default path (no `plugin-inventory` feature) hard-codes the three
709/// built-ins via [`PackTypeRegistry::bootstrap`]. With the feature on,
710/// [`PackTypeRegistry::bootstrap_from_inventory`] is preferred so any
711/// externally-submitted plugin types (mirroring the M4-E pattern for
712/// action plugins) shadow the built-ins last-writer-wins. Kept as a free
713/// helper so the `#[cfg]` split lives in one place instead of being
714/// smeared across every sync call-site.
715fn bootstrap_pack_type_registry() -> PackTypeRegistry {
716 #[cfg(feature = "plugin-inventory")]
717 {
718 let mut reg = PackTypeRegistry::bootstrap();
719 reg.register_from_inventory();
720 reg
721 }
722 #[cfg(not(feature = "plugin-inventory"))]
723 {
724 PackTypeRegistry::bootstrap()
725 }
726}
727
728/// Emit a single `tracing::info!` line when `--force` is active so
729/// operators can confirm from logs that the skip short-circuit was
730/// bypassed. Extracted so [`run`] stays small.
731fn log_force_flag(force: bool) {
732 if force {
733 tracing::info!(
734 target: "grex::sync",
735 "--force active: bypassing lockfile skip-on-hash short-circuit"
736 );
737 }
738}
739
740/// v1.2.1 path (iii) — drive the v1.2.0 [`sync_meta`] walker over the
741/// resolved canonical workspace.
742///
743/// This is the SOLE mutating pass in `sync::run`: clones, fetches,
744/// prune dispatches, distributed-lockfile reads, and TOCTOU `BoundedDir`
745/// opens all happen here. The subsequent [`build_and_validate_graph`]
746/// pass is read-only against the disk state this fn leaves behind.
747///
748/// `prune_candidates` is computed from the per-meta lockfile orphans:
749/// every entry in `<workspace>/.grex/grex.lock.jsonl` whose `path` no
750/// longer appears in the live root manifest's `children[]` is fed into
751/// Phase 2 for dispatch (with `--force-prune` / `--force-prune-with-ignored`
752/// overrides honoured by the consent walk). This closes the
753/// "prune-inert" gap from the previous wiring, where `sync::run` passed
754/// `&[]` and `--force-prune` was a CLI flag with no behavioural reach.
755///
756/// `--workspace` semantics: the canonical `workspace` argument is what
757/// `sync_meta` uses as its `meta_dir`. Children land at
758/// `<workspace>/<child.path>` — the v1.2.0 parent-relative model. Prior
759/// to v1.2.1, callers passing `--workspace` skipped the precursor
760/// entirely; that bypass is retired here so override callers see the
761/// same v1.2.0 semantics as the default-cwd path.
762///
763/// `SyncOptions::parallel` mapping (mirrors [`SyncMetaOptions::parallel`]
764/// with the documented `Some(0)` carve-out):
765/// * `None` → `SyncMetaOptions::parallel = None` (rayon default =
766/// `num_cpus::get()`).
767/// * `Some(0)` → `SyncMetaOptions::parallel = None` (the CLI sentinel
768/// for "unbounded" maps to rayon's default; `Some(0)` would be
769/// clamped to `1` inside `build_pool`, which is not what callers
770/// asking for unbounded want).
771/// * `Some(n)` for `n >= 1` → `SyncMetaOptions::parallel = Some(n)`.
772fn run_sync_meta(workspace: &Path, opts: &SyncOptions) -> Result<(), SyncError> {
773 let loader = FsPackLoader::new();
774 let backend = GixBackend::new();
775 let parallel = match opts.parallel {
776 None | Some(0) => None,
777 Some(n) => Some(n),
778 };
779 // v1.2.1 Item 5b — resolve the quarantine config relative to the
780 // canonical workspace (the same `meta_dir` `sync_meta` runs on).
781 // Trash bucket lives at `<workspace>/.grex/trash/`; audit log at
782 // `<workspace>/.grex/events.jsonl` — same path the existing
783 // `ForcePruneExecuted` event uses.
784 let quarantine = opts.quarantine.then(|| crate::tree::QuarantineConfig {
785 trash_root: workspace.join(".grex").join("trash"),
786 audit_log: crate::manifest::event_log_path(workspace),
787 });
788 // v1.2.5 — thread `--retain-days N` into the per-meta options so
789 // every recursion frame swept its own trash bucket. `None` skips
790 // the GC entirely (v1.2.1 indefinite-retention).
791 let retention =
792 opts.retain_days.map(|retain_days| crate::tree::RetentionConfig { retain_days });
793 let meta_opts = SyncMetaOptions {
794 ref_override: opts.ref_override.clone(),
795 recurse: opts.recurse,
796 max_depth: opts.max_depth,
797 force_prune: opts.force_prune,
798 force_prune_with_ignored: opts.force_prune_with_ignored,
799 parallel,
800 quarantine,
801 retention,
802 // v1.3.1 (B4) — propagate the orchestrator's dry-run flag into
803 // the walker so Phase 1 skips clone/fetch and emits the
804 // would-clone records into `SyncMetaReport::dry_run_would_clone`
805 // instead. The orchestrator already gates lockfile persist via
806 // `persist_lockfile_if_clean`; this wires the walker side.
807 dry_run: opts.dry_run,
808 };
809 let prune_candidates = compute_prune_candidates(workspace, &loader);
810 let report = sync_meta(workspace, &backend, &loader, &meta_opts, &prune_candidates)?;
811 if let Some(first) = report.errors.into_iter().next() {
812 return Err(SyncError::Tree(first));
813 }
814 Ok(())
815}
816
817/// v1.2.1 path (iii) — orphan-prune candidate computation.
818///
819/// Reads `<workspace>/.grex/grex.lock.jsonl` and the root manifest;
820/// returns every lockfile entry path that no longer matches a declared
821/// child in `manifest.children`. Empty in three cases:
822///
823/// * No lockfile (fresh workspace, never synced).
824/// * No manifest at `<workspace>/.grex/pack.yaml` (single-node tree —
825/// `sync_meta` will surface its own diagnostic).
826/// * Lockfile entries are all still declared (steady-state sync).
827///
828/// Lockfile read errors are tolerated as `Vec::new()`: the prune pass
829/// is opportunistic, and a corrupt lockfile is the migrator's concern,
830/// not the prune dispatcher's. Manifest read errors are similarly
831/// tolerated — `sync_meta` will fail loudly on the same condition,
832/// giving the operator a single unambiguous error surface.
833fn compute_prune_candidates(
834 workspace: &Path,
835 loader: &dyn crate::tree::PackLoader,
836) -> Vec<PathBuf> {
837 use crate::lockfile::read_meta_lockfile;
838 let entries = match read_meta_lockfile(workspace) {
839 Ok(e) => e,
840 Err(_) => return Vec::new(),
841 };
842 if entries.is_empty() {
843 return Vec::new();
844 }
845 let manifest = match loader.load(workspace) {
846 Ok(m) => m,
847 Err(_) => return Vec::new(),
848 };
849 let declared: std::collections::HashSet<String> =
850 manifest.children.iter().map(crate::pack::ChildRef::effective_path).collect();
851 entries
852 .into_iter()
853 .filter(|e| !declared.contains(&e.path))
854 .map(|e| PathBuf::from(e.path))
855 .collect()
856}
857
858/// v1.2.1 path (iii) — read-only graph build + plan-phase validation.
859///
860/// Builds the [`PackGraph`] from the on-disk meta tree rooted at
861/// `workspace`. Replaces the legacy `walk_and_validate` (which used
862/// [`crate::tree::Walker::walk`] and re-issued every clone/fetch as a
863/// no-op probe) with the v1.2.1 split:
864///
865/// * The mutating half ran in [`run_sync_meta`] — all clones, fetches,
866/// prune dispatches, and TOCTOU `BoundedDir` opens already happened.
867/// * THIS pass is strictly READ-ONLY. It walks the manifest tree
868/// parent-relatively (matching what `sync_meta` placed on disk),
869/// loads each child's `pack.yaml` (or synthesises a plain-git leaf),
870/// probes `head_sha`, and produces the [`PackGraph`] consumed by
871/// [`run_actions`].
872///
873/// Plan-phase validators run against the assembled graph when
874/// `validate` is true.
875fn build_and_validate_graph(
876 workspace: &Path,
877 validate: bool,
878 ref_override: Option<&str>,
879) -> Result<PackGraph, SyncError> {
880 let loader = FsPackLoader::new();
881 let backend = GixBackend::new();
882 let graph = build_graph(workspace, &backend, &loader, ref_override)?;
883 if validate {
884 validate_graph(&graph)?;
885 }
886 Ok(graph)
887}
888
889/// Load the prior lockfile (`grex.lock.jsonl`). Missing file yields an
890/// empty map; parse errors are fatal since writes are atomic and a torn
891/// lockfile therefore indicates real corruption that must be resolved
892/// before a fresh sync is safe. Parse/IO failures surface as
893/// [`SyncError::Lockfile`] — this is an I/O / schema fault, not a
894/// dependency-satisfaction fault, so it gets its own taxonomy slot.
895fn load_prior_lock(
896 lockfile_path: &Path,
897) -> Result<std::collections::HashMap<String, LockEntry>, SyncError> {
898 read_lockfile(lockfile_path)
899 .map_err(|source| SyncError::Lockfile { path: lockfile_path.to_path_buf(), source })
900}
901
902/// Persist `next_lock` atomically to `lockfile_path` whenever this was
903/// not a dry-run. On a halt the map has already had the halted pack's
904/// entry removed (see `run_actions`), so persisting now preserves every
905/// *successful* pack's fresh entry while guaranteeing absence of an
906/// entry for the halted pack — next sync sees no prior hash there and
907/// re-executes from scratch (route (b) halt-state gating). Write errors
908/// surface as non-fatal warnings on the report.
909fn persist_lockfile_if_clean(
910 report: &mut SyncReport,
911 lockfile_path: &Path,
912 next_lock: &std::collections::HashMap<String, LockEntry>,
913 dry_run: bool,
914) {
915 if dry_run {
916 return;
917 }
918 if let Err(e) = write_lockfile(lockfile_path, next_lock) {
919 tracing::warn!(target: "grex::sync", "lockfile write failed: {e}");
920 report.event_log_warnings.push(format!("{}: {e}", lockfile_path.display()));
921 }
922}
923
924/// Canonical location of the resolved-state lockfile
925/// (`<pack_root>/.grex/grex.lock.jsonl`). Colocated with the event log
926/// so both audit artifacts live under a single `.grex/` sidecar.
927fn lockfile_path(pack_root: &Path) -> PathBuf {
928 pack_root_dir(pack_root).join(".grex").join("grex.lock.jsonl")
929}
930
931/// Create the workspace directory if it does not yet exist.
932fn ensure_workspace_dir(workspace: &Path) -> Result<(), SyncError> {
933 if !workspace.exists() {
934 std::fs::create_dir_all(workspace).map_err(|e| SyncError::Validation {
935 errors: vec![PackValidationError::DependsOnUnsatisfied {
936 pack: "<workspace>".into(),
937 required: format!("{}: {e}", workspace.display()),
938 }],
939 })?;
940 }
941 Ok(())
942}
943
944/// Open (but do not acquire) the workspace-level lock file.
945fn open_workspace_lock(workspace: &Path) -> Result<(ScopedLock, PathBuf), SyncError> {
946 let ws_lock_path = workspace_lock_path(workspace);
947 let ws_lock = ScopedLock::open(&ws_lock_path)
948 .map_err(|e| workspace_lock_err(&ws_lock_path, &e.to_string()))?;
949 Ok((ws_lock, ws_lock_path))
950}
951
952/// Try-acquire the workspace lock guard when the holder is `Some`.
953/// Returns `Ok(None)` when the holder is `None` (e.g. dry-run path skips
954/// lock acquisition entirely; see Blocker B4 v1.3.1). Translates the
955/// busy/error outcomes into the shared [`SyncError`] taxonomy. Extracted
956/// from [`run`] / [`teardown`] to keep both verb entry-points under the
957/// `clippy::too-many-lines` limit while preserving the original lock
958/// semantics.
959fn try_acquire_workspace_guard<'a>(
960 holder: Option<&'a mut (ScopedLock, PathBuf)>,
961 workspace: &Path,
962) -> Result<Option<fd_lock::RwLockWriteGuard<'a, std::fs::File>>, SyncError> {
963 let Some((ws_lock, ws_lock_path)) = holder else {
964 return Ok(None);
965 };
966 match ws_lock.try_acquire() {
967 Ok(Some(g)) => Ok(Some(g)),
968 Ok(None) => Err(SyncError::WorkspaceBusy {
969 workspace: workspace.to_path_buf(),
970 lock_path: ws_lock_path.clone(),
971 }),
972 Err(e) => Err(workspace_lock_err(ws_lock_path, &e.to_string())),
973 }
974}
975
976/// Build a `Validation` error describing a workspace-lock failure.
977fn workspace_lock_err(ws_lock_path: &Path, reason: &str) -> SyncError {
978 SyncError::Validation {
979 errors: vec![PackValidationError::DependsOnUnsatisfied {
980 pack: "<workspace-lock>".into(),
981 required: format!("{}: {reason}", ws_lock_path.display()),
982 }],
983 }
984}
985
/// Single source of truth for the legacy workspace directory name.
/// Pre-`v1.1.0` `resolve_workspace` joined `.grex/workspace/` onto the
/// pack root by default. The auto-migration in
/// [`migrate_legacy_workspace`] is the only place this legacy literal
/// is allowed to appear in `crates/grex-core/src/`; the grep gate in
/// the v1.1.0 release checklist whitelists exactly this one constant.
const LEGACY_WORKSPACE_DIR: &str = ".grex/workspace";
993
994/// Auto-migrate any legacy `.grex/workspace/<name>/` child layout left
995/// over from v1.0.x to the v1.1.0 flat-sibling layout. Idempotent: a
996/// fresh workspace built on v1.1.0+ sees no `.grex/workspace/`
997/// directory and the function no-ops.
998///
999/// Per-child outcomes:
1000///
1001/// * **Both legacy + flat-sibling exist** → `SkippedBothExist`. The
1002/// user needs to inspect (perhaps the legacy is stale, perhaps it is
1003/// the source of truth); we never silently delete either.
1004/// * **Flat-sibling slot occupied by a non-grex file or non-empty dir**
1005/// → `SkippedDestOccupied`. Refuse to clobber user data.
1006/// * **Legacy exists, flat-sibling absent** → `Migrated` via atomic
1007/// `fs::rename`. Same-volume move is the common case (the migration
1008/// stays inside `pack_root`); cross-volume failures surface as
1009/// `Failed { error }` with the OS message so the operator can move
1010/// manually.
1011/// * **Legacy absent** → silent no-op (not recorded in the report).
1012///
1013/// After all per-child decisions: orphan `.grex.sync.lock` under the
1014/// legacy workspace is removed (best-effort) and the empty
1015/// `.grex/workspace/` directory is rmdir'd (best-effort). Both are
1016/// soft-failures: leaving them on disk is harmless, surfacing the
1017/// errors as a sync abort would be over-strict.
1018///
1019/// Discovery is by directory listing, not by parent-manifest parse —
1020/// migration must work even when the parent manifest itself was
1021/// rewritten between versions. A child counts as "legacy" iff
1022/// `<pack_root>/<LEGACY_WORKSPACE_DIR>/<name>/.git` exists (i.e. it is
1023/// an actual git working tree, not stray metadata).
1024fn migrate_legacy_workspace(pack_root: &Path) -> Vec<WorkspaceMigration> {
1025 let root = pack_root_dir(pack_root);
1026 let legacy_root = root.join(LEGACY_WORKSPACE_DIR);
1027 if !legacy_root.is_dir() {
1028 return Vec::new();
1029 }
1030 let entries = match fs::read_dir(&legacy_root) {
1031 Ok(e) => e,
1032 Err(e) => {
1033 tracing::warn!(
1034 target: "grex::sync::migrate",
1035 "legacy workspace `{}` unreadable: {e}",
1036 legacy_root.display(),
1037 );
1038 return Vec::new();
1039 }
1040 };
1041 let mut migrations = Vec::new();
1042 for entry_result in entries {
1043 let entry = match entry_result {
1044 Ok(e) => e,
1045 Err(e) => {
1046 tracing::warn!(
1047 target: "grex::sync::migrate",
1048 "skipping unreadable entry under `{}`: {e}",
1049 legacy_root.display(),
1050 );
1051 continue;
1052 }
1053 };
1054 let Ok(ft) = entry.file_type() else { continue };
1055 // file_type avoids symlink-following; legitimate v1.0.x children
1056 // were always real directories, so anything else is skipped.
1057 if ft.is_symlink() || !ft.is_dir() {
1058 continue;
1059 }
1060 let name_os = entry.file_name();
1061 let Some(name) = name_os.to_str() else { continue };
1062 // Only act on entries that look like real cloned children (have
1063 // a `.git`). The legacy workspace lock file (`.grex.sync.lock`)
1064 // is not a directory and is filtered out by the dir check above;
1065 // we clean it up explicitly after the migration loop completes.
1066 let from_abs = entry.path();
1067 if !from_abs.join(".git").exists() {
1068 continue;
1069 }
1070 let to_abs = root.join(name);
1071 let from_rel = PathBuf::from(LEGACY_WORKSPACE_DIR).join(name);
1072 let to_rel = PathBuf::from(name);
1073 let outcome = decide_and_migrate(&from_abs, &to_abs);
1074 log_migration(&from_rel, &to_rel, &outcome);
1075 migrations.push(WorkspaceMigration { from: from_rel, to: to_rel, outcome });
1076 }
1077 cleanup_legacy_workspace_root(&legacy_root);
1078 migrations
1079}
1080
1081/// Decide what to do with one legacy child + perform the move when
1082/// safe. Returns the outcome to record on the [`WorkspaceMigration`].
1083fn decide_and_migrate(from: &Path, to: &Path) -> MigrationOutcome {
1084 let dest_exists = to.exists();
1085 let dest_is_grex_repo = dest_exists && to.join(".git").exists();
1086 if dest_is_grex_repo {
1087 // Both legacy and flat-sibling are git repos. Refuse to choose
1088 // between them; let the user resolve.
1089 return MigrationOutcome::SkippedBothExist;
1090 }
1091 if dest_exists {
1092 // Some other entry occupies the flat-sibling slot — a stray
1093 // file, an empty dir, an unrelated dir. Treat as user data and
1094 // leave both in place.
1095 return MigrationOutcome::SkippedDestOccupied;
1096 }
1097 match fs::rename(from, to) {
1098 Ok(()) => MigrationOutcome::Migrated,
1099 Err(e) => MigrationOutcome::Failed { error: e.to_string() },
1100 }
1101}
1102
1103/// Emit one structured log line per migration so users see exactly what
1104/// happened during the upgrade. Severity matches outcome: success is
1105/// `info`, skips and failures are `warn` so they surface in the default
1106/// log level without forcing operators to crank verbosity.
1107fn log_migration(from: &Path, to: &Path, outcome: &MigrationOutcome) {
1108 let from_disp = from.display();
1109 let to_disp = to.display();
1110 match outcome {
1111 MigrationOutcome::Migrated => {
1112 tracing::info!(
1113 target: "grex::sync::migrate",
1114 "migrated: legacy={from_disp} -> new={to_disp}",
1115 );
1116 }
1117 MigrationOutcome::SkippedBothExist => {
1118 tracing::warn!(
1119 target: "grex::sync::migrate",
1120 "skipped: both legacy={from_disp} and new={to_disp} exist; resolve manually",
1121 );
1122 }
1123 MigrationOutcome::SkippedDestOccupied => {
1124 tracing::warn!(
1125 target: "grex::sync::migrate",
1126 "skipped: destination={to_disp} occupied; leaving legacy={from_disp} in place",
1127 );
1128 }
1129 MigrationOutcome::Failed { error } => {
1130 tracing::warn!(
1131 target: "grex::sync::migrate",
1132 "failed: legacy={from_disp} -> new={to_disp}: {error}",
1133 );
1134 }
1135 }
1136}
1137
/// Best-effort cleanup of the legacy workspace root after migration:
/// remove the orphan `.grex.sync.lock` (always safe — the v1.1.0
/// workspace lock lives at `<pack_root>/.grex.sync.lock`) and try to
/// rmdir the now-empty `.grex/workspace/` directory. Both are
/// soft-failures: a failed lock removal is logged at `warn` (success
/// at `info`), and the rmdir result is discarded outright — leaving
/// either leftover on disk is harmless, and aborting the sync over
/// them would be over-strict.
fn cleanup_legacy_workspace_root(legacy_root: &Path) {
    let orphan_lock = legacy_root.join(".grex.sync.lock");
    if orphan_lock.exists() {
        if let Err(e) = fs::remove_file(&orphan_lock) {
            tracing::warn!(
                target: "grex::sync::migrate",
                "could not remove orphan lock `{}`: {e}",
                orphan_lock.display(),
            );
        } else {
            tracing::info!(
                target: "grex::sync::migrate",
                "removed orphan lock `{}`",
                orphan_lock.display(),
            );
        }
    }
    // `remove_dir` only succeeds when the directory is empty — exactly
    // what we want; if any unmigrated child remains, the legacy root
    // stays put for the operator to inspect.
    let _ = fs::remove_dir(legacy_root);
}
1165
1166/// Compute the default workspace path when `override_` is absent.
1167///
1168/// The default is the pack root directory itself, so child packs
1169/// resolve as flat siblings of the parent pack root. The rationale —
1170/// alignment with the long-standing pack-spec rule that
1171/// `children[].path` is a bare name — lives in the pack-spec
1172/// "Validation rules" section (`man/concepts/pack-spec.md` /
1173/// `grex-doc/src/concepts/pack-spec.md`).
1174/// v1.2.1 path (iii) — resolve the workspace anchor with canonical
1175/// symlink resolution.
1176///
1177/// Resolution rules:
1178/// * `override_ = None` ⇒ derive workspace from `pack_root_dir(pack_root)`.
1179/// No canonicalize on this branch — the pack-root path was supplied
1180/// directly by the caller and may legitimately reference a not-yet-real
1181/// directory (e.g. integration fixtures that lazily materialise the
1182/// pack root).
1183/// * `override_ = Some(path)`:
1184/// 1. **Must-exist** check. A `--workspace` override pointing at a
1185/// non-existent directory is a fail-fast error (we won't silently
1186/// `mkdir -p` someone else's typo).
1187/// 2. **Canonicalise.** Resolve symlinks to a real path. This is the
1188/// anchor every downstream pass (`sync_meta`, `build_graph`, the
1189/// lockfile reads, the TOCTOU `BoundedDir` opens) hangs off — they
1190/// MUST agree on a single inode-stable string.
1191/// 3. **Log when input != canonical.** Surfaces symlink resolution to
1192/// operators so they can correlate workspace-busy diagnostics with
1193/// what the OS actually opened.
1194fn resolve_workspace(pack_root: &Path, override_: Option<&Path>) -> Result<PathBuf, SyncError> {
1195 let Some(input) = override_ else {
1196 return Ok(pack_root_dir(pack_root));
1197 };
1198 if !input.exists() {
1199 return Err(SyncError::Validation {
1200 errors: vec![PackValidationError::DependsOnUnsatisfied {
1201 pack: "<workspace>".into(),
1202 required: format!("--workspace {}: directory does not exist", input.display()),
1203 }],
1204 });
1205 }
1206 let canonical = match input.canonicalize() {
1207 Ok(p) => p,
1208 Err(e) => {
1209 return Err(SyncError::Validation {
1210 errors: vec![PackValidationError::DependsOnUnsatisfied {
1211 pack: "<workspace>".into(),
1212 required: format!("--workspace {}: canonicalize failed: {e}", input.display()),
1213 }],
1214 });
1215 }
1216 };
1217 if canonical != input {
1218 tracing::info!(
1219 target: "grex::sync",
1220 "workspace: {} → {}",
1221 input.display(),
1222 canonical.display(),
1223 );
1224 }
1225 Ok(canonical)
1226}
1227
1228/// Resolve the workspace, ensure the directory exists, and run the v1→v2
1229/// event-log migration. Extracted so [`run`] and [`teardown`] stay under
1230/// the workspace's 50-LOC per-function lint threshold.
1231fn prepare_workspace(pack_root: &Path, opts: &SyncOptions) -> Result<PathBuf, SyncError> {
1232 let workspace = resolve_workspace(pack_root, opts.workspace.as_deref())?;
1233 ensure_workspace_dir(&workspace)?;
1234 crate::manifest::ensure_event_log_migrated(&workspace).map_err(SyncError::EventLogMigration)?;
1235 Ok(workspace)
1236}
1237
/// Normalise a pack-root argument to the pack-root *directory*.
///
/// When `pack_root` points at a `.yaml` / `.yml` manifest file, ascend
/// TWO components, not one: the manifest lives at
/// `<root>/.grex/pack.yaml` (see [`compute_prune_candidates`]), so
/// stripping the filename and the `.grex/` sidecar yields `<root>`.
/// (The previous doc comment said "use its parent", which contradicted
/// the code.) When there are not enough components to ascend, fall
/// back to `.`. Any non-yaml path is taken as the pack-root directory
/// verbatim.
fn pack_root_dir(pack_root: &Path) -> PathBuf {
    let is_yaml = matches!(pack_root.extension().and_then(|e| e.to_str()), Some("yaml" | "yml"));
    if is_yaml {
        // `<root>/.grex/pack.yaml` -> `<root>/.grex` -> `<root>`.
        pack_root
            .parent()
            .and_then(Path::parent)
            .map_or_else(|| PathBuf::from("."), Path::to_path_buf)
    } else {
        pack_root.to_path_buf()
    }
}
1250
1251/// Compute the `.grex/events.jsonl` path next to the pack root.
1252///
1253/// Delegates to [`crate::manifest::event_log_path`] (single source of
1254/// truth for the canonical event-log location).
1255fn event_log_path(pack_root: &Path) -> PathBuf {
1256 crate::manifest::event_log_path(&pack_root_dir(pack_root))
1257}
1258
/// Sidecar lock path next to the event log — one canonical slot per
/// pack root; cooperating grex procs serialize through this file. A
/// log path with no parent falls back to a bare `.grex.lock`.
fn event_lock_path(event_log: &Path) -> PathBuf {
    match event_log.parent() {
        Some(dir) => dir.join(".grex.lock"),
        None => PathBuf::from(".grex.lock"),
    }
}
1264
/// Sidecar lock path for the workspace itself:
/// `<workspace>/.grex.sync.lock`. The workspace directory is already
/// created by the `run()` prologue, so the lock sidecar lands beside
/// the child clones.
fn workspace_lock_path(workspace: &Path) -> PathBuf {
    let mut path = workspace.to_path_buf();
    path.push(".grex.sync.lock");
    path
}
1271
1272/// Aggregate manifest-level + graph-level validators and return their output.
1273fn validate_graph(graph: &PackGraph) -> Result<(), SyncError> {
1274 let mut errors: Vec<PackValidationError> = Vec::new();
1275 for node in graph.nodes() {
1276 if let Err(mut e) = node.manifest.validate_plan() {
1277 errors.append(&mut e);
1278 }
1279 }
1280 if let Err(mut e) = graph.validate() {
1281 errors.append(&mut e);
1282 }
1283 if errors.is_empty() {
1284 Ok(())
1285 } else {
1286 Err(SyncError::Validation { errors })
1287 }
1288}
1289
1290/// Depth-first post-order traversal of the graph starting from root.
1291///
1292/// Children fully precede their parent in the returned vector so downstream
1293/// executors install leaves first and the root last.
1294fn post_order(graph: &PackGraph) -> Vec<usize> {
1295 let mut out = Vec::with_capacity(graph.nodes().len());
1296 visit_post(graph, 0, &mut out);
1297 out
1298}
1299
1300fn visit_post(graph: &PackGraph, id: usize, out: &mut Vec<usize>) {
1301 // Collect child ids first to avoid borrow conflicts with graph iteration.
1302 let kids: Vec<usize> = graph.children_of(id).map(|n| n.id).collect();
1303 for k in kids {
1304 visit_post(graph, k, out);
1305 }
1306 out.push(id);
1307}
1308
/// Drive every action for every node; abort on the first [`ExecError`].
///
/// Each action is bracketed by three manifest events:
/// 1. [`Event::ActionStarted`] — appended **before** `execute` returns.
/// 2. [`Event::ActionCompleted`] — appended on `Ok(step)`.
/// 3. [`Event::ActionHalted`] — appended on `Err(e)` before returning.
///
/// All three writes go through the same [`ManifestLock`]-wrapped path
/// ([`append_manifest_event`]) and failures are recorded as non-fatal
/// warnings so the executor's outcome always dominates. The third append
/// (`ActionHalted`) lets a future `grex doctor` correlate crash recovery
/// with the exact action that halted.
///
/// Lockfile bookkeeping per pack: filtered/skipped packs carry their
/// prior entry forward (inside `try_skip_or_filter`); successful packs
/// get a fresh entry upserted; a halted pack has its entry *removed*
/// and the whole run stops so the next sync re-executes it from
/// scratch (route (b) halt-state gating).
// feat-m6 B1 wiring added `parallel` + `scheduler` args; the signature
// now pushes past the 50-LOC per-function lint by one line. Silence
// that one — the body itself is unchanged in scope.
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
fn run_actions(
    report: &mut SyncReport,
    order: &[usize],
    vars: &VarEnv,
    workspace: &Path,
    event_log: &Path,
    lock_path: &Path,
    dry_run: bool,
    prior_lock: &std::collections::HashMap<String, LockEntry>,
    next_lock: &mut std::collections::HashMap<String, LockEntry>,
    registry: &Arc<Registry>,
    pack_type_registry: &Arc<PackTypeRegistry>,
    only: Option<&GlobSet>,
    force: bool,
    parallel: usize,
    scheduler: &Arc<Scheduler>,
) {
    let plan = PlanExecutor::with_registry(registry.clone());
    let fs = FsExecutor::with_registry(registry.clone());
    let rt = build_pack_type_runtime(parallel);
    let visited_meta = new_visited_meta();
    for &id in order {
        let Some(node) = report.graph.node(id) else { continue };
        // Clone the node fields needed below so the borrow of
        // `report.graph` ends here — the helpers take `&mut report`.
        let pack_name = node.name.clone();
        let pack_path = node.path.clone();
        let actions = node.manifest.actions.clone();
        let manifest = node.manifest.clone();
        let commit_sha = node.commit_sha.clone().unwrap_or_default();
        let synthetic = node.synthetic;
        // v1.3.1 B14: parent manifest's `ref:` value for this node,
        // captured by the walker. Threaded into `upsert_lock_entry`
        // so the lockfile `branch` slot mirrors the manifest verbatim.
        let manifest_ref = node.manifest_ref.clone();
        // `--only` filter + skip-on-hash short-circuits colocated in
        // `try_skip_or_filter` so this outer loop stays within the
        // 50-LOC per-function budget.
        if try_skip_or_filter(
            report,
            only,
            &pack_name,
            &pack_path,
            &actions,
            &commit_sha,
            synthetic,
            workspace,
            prior_lock,
            next_lock,
            dry_run,
            force,
        ) {
            continue;
        }
        let pack_halted = run_pack_lifecycle(
            report,
            vars,
            workspace,
            event_log,
            lock_path,
            dry_run,
            &plan,
            &fs,
            registry,
            pack_type_registry,
            &rt,
            &pack_name,
            &pack_path,
            &manifest,
            &visited_meta,
            scheduler,
        );
        if pack_halted {
            // Route (b) halt-state gating: drop any prior entry for the
            // halted pack so the next sync sees no prior hash and
            // re-executes from scratch. Successful packs in this same
            // run keep their freshly-upserted entries, and packs we did
            // not reach keep their prior entries untouched.
            next_lock.remove(&pack_name);
            return;
        }
        // Successful pack — record a fresh lockfile entry so the next
        // run's skip-on-hash test can succeed. Commit SHA is now plumbed
        // from the walker (M4-D): `PackNode::commit_sha` carries the
        // resolved HEAD SHA when the pack's working tree is a git
        // repository, otherwise an empty string keeps the hash stable.
        let actions_hash = compute_actions_hash(&actions, &commit_sha);
        upsert_lock_entry(
            prior_lock,
            next_lock,
            &pack_name,
            &commit_sha,
            &actions_hash,
            synthetic,
            manifest_ref.as_deref(),
        );
    }
}
1421
1422/// Build the multi-thread tokio runtime used to drive async pack-type
1423/// plugin dispatch. Pack-type plugins expose `async fn` methods via
1424/// `async_trait`, but the sync driver is synchronous end-to-end — we
1425/// block on each plugin future inside the outer action loop. Extracted
1426/// into a standalone helper so the runtime construction does not
1427/// inflate `run_actions` beyond the 50-LOC per-function budget.
1428///
1429/// # Multi-thread rationale (M5-2c)
1430///
1431/// M5-2c enabled real [`crate::plugin::pack_type::MetaPlugin`] recursion
1432/// through [`crate::execute::ExecCtx::pack_type_registry`]. The recursion
1433/// itself is purely `async` / `.await` (no nested `block_on`), but future
1434/// plugin authors may reasonably compose `block_on` calls inside
1435/// lifecycle hooks — and external callers that drive `MetaPlugin` via
1436/// `rt.block_on(...)` within their own runtime would deadlock on a
1437/// current-thread runtime the moment a hook re-enters. A multi-thread
1438/// runtime with a small worker pool lets those re-entries resolve on a
1439/// sibling worker instead of blocking the dispatcher thread.
1440///
1441/// # Worker-thread sizing (feat-m6 H6)
1442///
1443/// The worker pool is sized from the resolved `--parallel` knob so the
1444/// runtime always has enough workers to service every in-flight pack op
1445/// plus at least one sibling for nested `block_on`. Clamped to
1446/// `[2, num_cpus::get()]`: `2` preserves the pre-M6 floor (one driver +
1447/// one sibling so re-entrant hooks never deadlock), and the upper bound
1448/// caps the pool at the host's CPU count so `--parallel 0`
1449/// (unbounded-semantics) does not explode the worker count.
1450fn build_pack_type_runtime(parallel: usize) -> tokio::runtime::Runtime {
1451 let workers = parallel.clamp(2, num_cpus::get().max(2));
1452 tokio::runtime::Builder::new_multi_thread()
1453 .worker_threads(workers)
1454 .enable_all()
1455 .build()
1456 .expect("tokio runtime for pack-type dispatch")
1457}
1458
1459/// Construct a fresh [`MetaVisitedSet`] for one sync run. Walker-driven
1460/// dispatch does not attach it (see `dispatch_pack_type_plugin`), but
1461/// the argument is threaded through so future explicit-install /
1462/// teardown verbs can share the same set shape.
1463fn new_visited_meta() -> MetaVisitedSet {
1464 std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashSet::new()))
1465}
1466
1467/// Combined short-circuit helper: `--only` filter + skip-on-hash. Returns
1468/// `true` when the outer loop should `continue` for this pack.
1469///
1470/// Extracted from `run_actions` so that function stays under the
1471/// workspace's 50-LOC per-function lint. Semantics are unchanged; this
1472/// is a pure structural refactor.
1473#[allow(clippy::too_many_arguments)]
1474fn try_skip_or_filter(
1475 report: &mut SyncReport,
1476 only: Option<&GlobSet>,
1477 pack_name: &str,
1478 pack_path: &Path,
1479 actions: &[Action],
1480 commit_sha: &str,
1481 current_synthetic: bool,
1482 workspace: &Path,
1483 prior_lock: &std::collections::HashMap<String, LockEntry>,
1484 next_lock: &mut std::collections::HashMap<String, LockEntry>,
1485 dry_run: bool,
1486 force: bool,
1487) -> bool {
1488 if skip_for_only_filter(only, pack_name, pack_path, workspace) {
1489 if let Some(prev) = prior_lock.get(pack_name) {
1490 next_lock.insert(pack_name.to_string(), prev.clone());
1491 }
1492 return true;
1493 }
1494 try_skip_pack(
1495 report,
1496 pack_name,
1497 pack_path,
1498 actions,
1499 commit_sha,
1500 current_synthetic,
1501 prior_lock,
1502 next_lock,
1503 dry_run,
1504 force,
1505 )
1506}
1507
1508/// Return `true` when `--only` is active and the pack's
1509/// **workspace-relative path** (normalized to forward-slash form) does
1510/// not match any of the registered globs. Name-fallback matching was
1511/// dropped in the M4-D post-review fix bundle: spec §M4 req 6 says
1512/// "pack paths" and cross-platform consistency requires a single
1513/// normalized representation rather than `display()`-formatted strings
1514/// (which use `\\` on Windows and `/` on POSIX — globset treats `\\`
1515/// as a glob-escape, not a path separator). For the root pack whose
1516/// `pack_path` is not under `workspace`, the fallback is to match
1517/// against the absolute path's forward-slash form.
1518fn skip_for_only_filter(
1519 only: Option<&GlobSet>,
1520 pack_name: &str,
1521 pack_path: &Path,
1522 workspace: &Path,
1523) -> bool {
1524 let Some(set) = only else { return false };
1525 let rel = pack_path.strip_prefix(workspace).unwrap_or(pack_path);
1526 let rel_str = rel.to_string_lossy().replace('\\', "/");
1527 let matches = set.is_match(&rel_str);
1528 if !matches {
1529 tracing::info!(
1530 target: "grex::sync",
1531 "skipping pack `{pack_name}` (rel path `{rel_str}`): does not match --only filter"
1532 );
1533 }
1534 !matches
1535}
1536
1537/// Per-pack lifecycle dispatch. Returns `true` when the sync must halt.
1538///
1539/// M5-1 Stage C replaces the blind `for action in manifest.actions` loop
1540/// with a pack-type-aware dispatch:
1541///
1542/// * [`PackType::Declarative`] retains the per-action execution shape that
1543/// M4 shipped — each action lands its own `ActionStarted` /
1544/// `ActionCompleted` / `ActionHalted` event bracket. The registry is
1545/// still consulted via [`PackTypeRegistry::get`] as a name-oracle so
1546/// mistyped packs fail closed.
1547/// * [`PackType::Meta`] / [`PackType::Scripted`] dispatch once through the
1548/// pack-type plugin's `sync` method (the sync CLI verb is the only
1549/// caller in M5-1; `install` / `update` / `teardown` verbs wire in
1550/// M5-2), returning a single aggregate [`ExecStep`]. A single event
1551/// bracket frames the async call.
1552///
1553/// Declarative is kept on the legacy per-action path because its event log
1554/// semantics (one event per action, per-step rollback context) are exactly
1555/// what plugin authors expect to observe. Unifying declarative under the
1556/// plugin dispatch is M5-2 scope — it requires reshaping the trait surface
1557/// to emit a step stream rather than a single aggregate.
1558#[allow(clippy::too_many_arguments)]
1559fn run_pack_lifecycle(
1560 report: &mut SyncReport,
1561 vars: &VarEnv,
1562 workspace: &Path,
1563 event_log: &Path,
1564 lock_path: &Path,
1565 dry_run: bool,
1566 plan: &PlanExecutor,
1567 fs: &FsExecutor,
1568 registry: &Arc<Registry>,
1569 pack_type_registry: &Arc<PackTypeRegistry>,
1570 rt: &tokio::runtime::Runtime,
1571 pack_name: &str,
1572 pack_path: &Path,
1573 manifest: &crate::pack::PackManifest,
1574 visited_meta: &MetaVisitedSet,
1575 scheduler: &Arc<Scheduler>,
1576) -> bool {
1577 let type_tag = manifest.r#type.as_str();
1578 // Name-oracle check: every pack type must be registered. Unknown
1579 // pack types halt the pack the same way M4 halted unknown actions.
1580 if pack_type_registry.get(type_tag).is_none() {
1581 let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
1582 record_action_err(dry_run, report, event_log, lock_path, pack_name, 0, "pack-type", err);
1583 return true;
1584 }
1585 match manifest.r#type {
1586 crate::pack::PackType::Declarative => run_declarative_actions(
1587 report,
1588 vars,
1589 workspace,
1590 event_log,
1591 lock_path,
1592 dry_run,
1593 plan,
1594 fs,
1595 pack_name,
1596 pack_path,
1597 manifest,
1598 &manifest.actions,
1599 scheduler,
1600 ),
1601 crate::pack::PackType::Meta | crate::pack::PackType::Scripted => dispatch_pack_type_plugin(
1602 report,
1603 vars,
1604 workspace,
1605 event_log,
1606 lock_path,
1607 dry_run,
1608 registry,
1609 pack_type_registry,
1610 rt,
1611 pack_name,
1612 pack_path,
1613 manifest,
1614 type_tag,
1615 visited_meta,
1616 scheduler,
1617 ),
1618 }
1619}
1620
1621/// Run a declarative pack's actions sequentially. Preserves the M4
1622/// per-action event-log bracket (`ActionStarted` → `ActionCompleted` |
1623/// `ActionHalted`). Returns `true` when the sync must halt.
1624#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1625fn run_declarative_actions(
1626 report: &mut SyncReport,
1627 vars: &VarEnv,
1628 workspace: &Path,
1629 event_log: &Path,
1630 lock_path: &Path,
1631 dry_run: bool,
1632 plan: &PlanExecutor,
1633 fs: &FsExecutor,
1634 pack_name: &str,
1635 pack_path: &Path,
1636 manifest: &crate::pack::PackManifest,
1637 actions: &[Action],
1638 scheduler: &Arc<Scheduler>,
1639) -> bool {
1640 // B12 v1.3.1: `apply_gitignore` was previously called here for
1641 // declarative packs (the per-action driver bypasses the plugin
1642 // path). Auto-mutation of the parent meta-repo's `.gitignore` was
1643 // removed in v1.3.1; `grex doctor` now surfaces an advisory when
1644 // the parent git index tracks pack content. The function is kept
1645 // as a no-op shim and the call is left in place so the diff stays
1646 // minimal — the reviewer pass will delete it together with the
1647 // other call sites in pack_type.rs.
1648 if !dry_run {
1649 let ctx = ExecCtx::new(vars, pack_path, workspace)
1650 .with_platform(Platform::current())
1651 .with_scheduler(scheduler);
1652 if let Err(e) = crate::plugin::pack_type::apply_gitignore(&ctx, manifest) {
1653 record_action_err(dry_run, report, event_log, lock_path, pack_name, 0, "gitignore", e);
1654 return true;
1655 }
1656 }
1657 for (idx, action) in actions.iter().enumerate() {
1658 let ctx = ExecCtx::new(vars, pack_path, workspace)
1659 .with_platform(Platform::current())
1660 .with_scheduler(scheduler);
1661 let action_tag = action_kind_tag(action);
1662 append_manifest_event(
1663 dry_run,
1664 event_log,
1665 lock_path,
1666 &Event::ActionStarted {
1667 ts: Utc::now(),
1668 id: pack_name.to_string(),
1669 action_idx: idx,
1670 action_name: action_tag.to_string(),
1671 schema_version: SCHEMA_VERSION.to_string(),
1672 },
1673 &mut report.event_log_warnings,
1674 );
1675 let step_result =
1676 if dry_run { plan.execute(action, &ctx) } else { fs.execute(action, &ctx) };
1677 if !record_action_outcome(
1678 dry_run,
1679 report,
1680 event_log,
1681 lock_path,
1682 pack_name,
1683 idx,
1684 action_tag,
1685 step_result,
1686 ) {
1687 return true;
1688 }
1689 }
1690 false
1691}
1692
1693/// Dispatch a pack-type plugin (meta / scripted) through the async
1694/// registry. Brackets the call with a single `ActionStarted` /
1695/// `ActionCompleted` / `ActionHalted` trio at index 0. Returns `true`
1696/// when the sync must halt.
1697#[allow(clippy::too_many_arguments)]
1698fn dispatch_pack_type_plugin(
1699 report: &mut SyncReport,
1700 vars: &VarEnv,
1701 workspace: &Path,
1702 event_log: &Path,
1703 lock_path: &Path,
1704 dry_run: bool,
1705 registry: &Arc<Registry>,
1706 pack_type_registry: &Arc<PackTypeRegistry>,
1707 rt: &tokio::runtime::Runtime,
1708 pack_name: &str,
1709 pack_path: &Path,
1710 manifest: &crate::pack::PackManifest,
1711 type_tag: &'static str,
1712 visited_meta: &MetaVisitedSet,
1713 scheduler: &Arc<Scheduler>,
1714) -> bool {
1715 // NB: `visited_meta` is intentionally NOT attached to the ctx here.
1716 // The sync driver already walks children in post-order via the tree
1717 // walker; attaching the visited set would trigger MetaPlugin's
1718 // real-recursion branch and cause double dispatch (walker runs child
1719 // packs as their own graph nodes, then MetaPlugin would recurse into
1720 // them again). The `visited_meta` parameter is kept on the argument
1721 // list so future explicit-install / teardown verbs that invoke
1722 // MetaPlugin directly can share the same set shape.
1723 let _ = visited_meta;
1724 let ctx = ExecCtx::new(vars, pack_path, workspace)
1725 .with_platform(Platform::current())
1726 .with_registry(registry)
1727 .with_pack_type_registry(pack_type_registry)
1728 .with_scheduler(scheduler);
1729 append_manifest_event(
1730 dry_run,
1731 event_log,
1732 lock_path,
1733 &Event::ActionStarted {
1734 ts: Utc::now(),
1735 id: pack_name.to_string(),
1736 action_idx: 0,
1737 action_name: type_tag.to_string(),
1738 schema_version: SCHEMA_VERSION.to_string(),
1739 },
1740 &mut report.event_log_warnings,
1741 );
1742 // SAFETY: `get` just confirmed the plugin is registered for
1743 // `type_tag`, so this unwrap cannot panic under the matched arm.
1744 let plugin = pack_type_registry
1745 .get(type_tag)
1746 .expect("pack-type plugin must be registered (guarded above)");
1747 // feat-m6 CI fix — establish a task-local tier stack frame for every
1748 // async dispatch. Without this, `TierGuard::push` (which runs inside
1749 // the plugin lifecycle and may span `.await` / thread hops under the
1750 // multi-thread runtime) has no enforcement frame to push into.
1751 let step_result = rt.block_on(crate::pack_lock::with_tier_scope(plugin.sync(&ctx, manifest)));
1752 !record_action_outcome(
1753 dry_run,
1754 report,
1755 event_log,
1756 lock_path,
1757 pack_name,
1758 0,
1759 type_tag,
1760 step_result,
1761 )
1762}
1763
1764/// Pure skip-eligibility decision. Returns `Some(hash)` when the pack
1765/// is eligible for the hash-skip short-circuit, `None` otherwise.
1766///
1767/// Splitting the decision out of [`try_skip_pack`] keeps the
1768/// side-effecting transcript bookkeeping testable in isolation: the
1769/// v1.1.1 synthetic-flag-flip regression exercises this helper without
1770/// having to stand up a `SyncReport` / `PackGraph`.
1771fn skip_eligibility(
1772 actions: &[Action],
1773 commit_sha: &str,
1774 current_synthetic: bool,
1775 prior: &LockEntry,
1776 dry_run: bool,
1777 force: bool,
1778) -> Option<String> {
1779 if dry_run || force {
1780 // Dry runs must always produce the planned-step transcript so
1781 // authors can see what `sync` *would* do. `--force` is the
1782 // operator's explicit opt-out from the hash short-circuit.
1783 return None;
1784 }
1785 let hash = compute_actions_hash(actions, commit_sha);
1786 if prior.actions_hash != hash {
1787 return None;
1788 }
1789 if prior.synthetic != current_synthetic {
1790 // Pack-shape flipped between runs (real ↔ synthetic). Even
1791 // when the actions hash matches by coincidence (e.g. a
1792 // declarative pack with empty `actions[]` whose pack.yaml was
1793 // deleted, falling through to a synthetic leaf with the same
1794 // empty actions list and stable commit SHA), we must NOT
1795 // carry the stale `synthetic` flag forward. Forcing the
1796 // upsert path re-emits the entry with the current flag.
1797 return None;
1798 }
1799 Some(hash)
1800}
1801
1802/// Decide whether `pack_name` can be short-circuited via a lockfile
1803/// hash match. When the prior hash matches the freshly-computed hash,
1804/// emit a single [`ExecResult::Skipped`] step and carry the prior
1805/// lockfile entry forward unchanged. Returns `true` when the pack was
1806/// skipped.
1807///
1808/// `current_synthetic` is the walker-derived synthetic flag for this
1809/// pack on the current run. The skip eligibility check requires it to
1810/// match `prior.synthetic` so a pack-shape transition (e.g. user
1811/// deletes `pack.yaml` so a previously-real pack now walks as
1812/// synthetic) invalidates the skip and forces the lockfile entry to
1813/// be re-emitted with the fresh `synthetic` value.
1814#[allow(clippy::too_many_arguments)]
1815fn try_skip_pack(
1816 report: &mut SyncReport,
1817 pack_name: &str,
1818 pack_path: &Path,
1819 actions: &[Action],
1820 commit_sha: &str,
1821 current_synthetic: bool,
1822 prior_lock: &std::collections::HashMap<String, LockEntry>,
1823 next_lock: &mut std::collections::HashMap<String, LockEntry>,
1824 dry_run: bool,
1825 force: bool,
1826) -> bool {
1827 let Some(prior) = prior_lock.get(pack_name) else {
1828 return false;
1829 };
1830 let Some(hash) =
1831 skip_eligibility(actions, commit_sha, current_synthetic, prior, dry_run, force)
1832 else {
1833 return false;
1834 };
1835 let skipped_step = ExecStep {
1836 action_name: Cow::Borrowed("pack"),
1837 result: ExecResult::Skipped {
1838 pack_path: pack_path.to_path_buf(),
1839 actions_hash: hash.clone(),
1840 },
1841 // W4 landed `StepKind::PackSkipped` as the dedicated pack-level
1842 // short-circuit detail; we use it here instead of the prior
1843 // `Require { Satisfied, Skip }` proxy so renderers and consumers
1844 // can match on a single, purpose-built variant.
1845 details: StepKind::PackSkipped { actions_hash: hash },
1846 };
1847 report.steps.push(SyncStep {
1848 pack: pack_name.to_string(),
1849 action_idx: 0,
1850 exec_step: skipped_step,
1851 });
1852 // Carry the prior entry forward so the next-lock snapshot stays
1853 // consistent with what's on disk.
1854 next_lock.insert(pack_name.to_string(), prior.clone());
1855 true
1856}
1857
1858/// Insert or update a lockfile entry for `pack_name` with `actions_hash`.
1859///
1860/// Stores `commit_sha` verbatim — including the empty string when the
1861/// pack is not a git working tree or the HEAD probe failed.
1862/// `actions_hash` is computed over the same `commit_sha`, so the two
1863/// fields stay internally consistent: if probing starts returning a
1864/// non-empty SHA on the next run, the hash differs and the skip is
1865/// correctly invalidated. The prior-preserve carve-out that was
1866/// introduced in M4-D was unsound (hash-vs-sha drift) and is removed
1867/// by the M4-D post-review fix bundle; see spec §M4 req 4a.
1868///
1869/// `prior_lock` is consulted purely for observability: when a
1870/// previously-real pack flips to synthetic between runs (user deleted
1871/// the pack's `pack.yaml` so the walker fell back to v1.1.1
1872/// plain-git-child synthesis), a `tracing::warn!` records the
1873/// downgrade so the operator notices their declarative actions have
1874/// stopped running.
1875fn upsert_lock_entry(
1876 prior_lock: &std::collections::HashMap<String, LockEntry>,
1877 next_lock: &mut std::collections::HashMap<String, LockEntry>,
1878 pack_name: &str,
1879 commit_sha: &str,
1880 actions_hash: &str,
1881 synthetic: bool,
1882 manifest_ref: Option<&str>,
1883) {
1884 if synthetic {
1885 if let Some(prior) = prior_lock.get(pack_name) {
1886 if !prior.synthetic {
1887 tracing::warn!(
1888 target: "grex::sync",
1889 pack = pack_name,
1890 "pack `{pack_name}` downgraded from real to synthetic — \
1891 pack.yaml missing on disk; only `git pull` will run going forward",
1892 );
1893 }
1894 }
1895 }
1896 let installed_at = Utc::now();
1897 let entry = next_lock.get(pack_name).map_or_else(
1898 || LockEntry {
1899 id: pack_name.to_string(),
1900 // v1.1.1 convention: path == id (1:1 id↔folder). Stage 1.e
1901 // (walker rewrite) will replace this with the parent-relative
1902 // manifest path captured during the walk.
1903 path: pack_name.to_string(),
1904 sha: commit_sha.to_string(),
1905 // v1.3.1 B14: mirror the parent manifest's `ref:` value
1906 // (or empty when absent), per the Lean theorem
1907 // `Grex.Lockfile.lockfile_branch_mirrors_manifest_ref`.
1908 branch: branch_of(manifest_ref),
1909 installed_at,
1910 actions_hash: actions_hash.to_string(),
1911 schema_version: "1".to_string(),
1912 synthetic,
1913 },
1914 |prev| LockEntry {
1915 installed_at,
1916 actions_hash: actions_hash.to_string(),
1917 sha: commit_sha.to_string(),
1918 synthetic,
1919 ..prev.clone()
1920 },
1921 );
1922 next_lock.insert(pack_name.to_string(), entry);
1923}
1924
1925/// Record one action outcome into `report` + event log. Returns `false`
1926/// when the run must halt (on error); `true` otherwise.
1927#[allow(clippy::too_many_arguments)]
1928fn record_action_outcome(
1929 dry_run: bool,
1930 report: &mut SyncReport,
1931 event_log: &Path,
1932 lock_path: &Path,
1933 pack_name: &str,
1934 idx: usize,
1935 action_tag: &'static str,
1936 step_result: Result<ExecStep, ExecError>,
1937) -> bool {
1938 match step_result {
1939 Ok(step) => {
1940 record_action_ok(dry_run, report, event_log, lock_path, pack_name, idx, step);
1941 true
1942 }
1943 Err(e) => {
1944 record_action_err(dry_run, report, event_log, lock_path, pack_name, idx, action_tag, e);
1945 false
1946 }
1947 }
1948}
1949
1950/// Success-path bookkeeping: emit legacy `Sync` summary + `ActionCompleted`
1951/// audit event, then push the step onto the report.
1952///
1953/// v1.3.1 B4 fix-up: under `dry_run = true`, the on-disk event-log writes
1954/// are skipped. The in-memory `report.steps` push still happens — dry-run
1955/// callers rely on the planned-step transcript for output.
1956#[allow(clippy::too_many_arguments)]
1957fn record_action_ok(
1958 dry_run: bool,
1959 report: &mut SyncReport,
1960 event_log: &Path,
1961 lock_path: &Path,
1962 pack_name: &str,
1963 idx: usize,
1964 step: ExecStep,
1965) {
1966 append_step_event(
1967 dry_run,
1968 event_log,
1969 lock_path,
1970 pack_name,
1971 &step,
1972 &mut report.event_log_warnings,
1973 );
1974 append_manifest_event(
1975 dry_run,
1976 event_log,
1977 lock_path,
1978 &Event::ActionCompleted {
1979 ts: Utc::now(),
1980 id: pack_name.to_string(),
1981 action_idx: idx,
1982 result_summary: format!("{:?}", step.result),
1983 schema_version: SCHEMA_VERSION.to_string(),
1984 },
1985 &mut report.event_log_warnings,
1986 );
1987 report.steps.push(SyncStep { pack: pack_name.to_string(), action_idx: idx, exec_step: step });
1988}
1989
1990/// Halt-path bookkeeping: emit `ActionHalted` audit event, then stash the
1991/// rich `HaltedContext` into `report.halted`.
1992///
1993/// v1.3.1 B4 fix-up: under `dry_run = true`, the on-disk event-log write
1994/// is skipped; the `report.halted` slot still receives the
1995/// [`HaltedContext`] so callers can render the halt reason without
1996/// touching disk.
1997#[allow(clippy::too_many_arguments)]
1998fn record_action_err(
1999 dry_run: bool,
2000 report: &mut SyncReport,
2001 event_log: &Path,
2002 lock_path: &Path,
2003 pack_name: &str,
2004 idx: usize,
2005 action_tag: &'static str,
2006 e: ExecError,
2007) {
2008 let error_summary = truncate_error_summary(&e);
2009 append_manifest_event(
2010 dry_run,
2011 event_log,
2012 lock_path,
2013 &Event::ActionHalted {
2014 ts: Utc::now(),
2015 id: pack_name.to_string(),
2016 action_idx: idx,
2017 action_name: action_tag.to_string(),
2018 error_summary,
2019 schema_version: SCHEMA_VERSION.to_string(),
2020 },
2021 &mut report.event_log_warnings,
2022 );
2023 let recovery_hint = recovery_hint_for(&e);
2024 report.halted = Some(SyncError::Halted(Box::new(HaltedContext {
2025 pack: pack_name.to_string(),
2026 action_idx: idx,
2027 action_name: action_tag.to_string(),
2028 error: e,
2029 recovery_hint,
2030 })));
2031}
2032
2033/// Short stable kind-tag for an [`crate::pack::Action`]. Mirrors the
2034/// `ACTION_*` constants used by [`crate::execute::step`] so the audit log
2035/// stays uniform.
2036fn action_kind_tag(action: &crate::pack::Action) -> &'static str {
2037 use crate::pack::Action;
2038 match action {
2039 Action::Symlink(_) => "symlink",
2040 Action::Unlink(_) => "unlink",
2041 Action::Env(_) => "env",
2042 Action::Mkdir(_) => "mkdir",
2043 Action::Rmdir(_) => "rmdir",
2044 Action::Require(_) => "require",
2045 Action::When(_) => "when",
2046 Action::Exec(_) => "exec",
2047 }
2048}
2049
2050/// Produce a bounded human summary of an [`ExecError`] for
2051/// [`Event::ActionHalted::error_summary`]. Keeps the written JSONL line
2052/// from pathological blowup when captured stderr is large.
2053fn truncate_error_summary(err: &ExecError) -> String {
2054 let mut s = err.to_string();
2055 if s.len() > ACTION_ERROR_SUMMARY_MAX {
2056 s.truncate(ACTION_ERROR_SUMMARY_MAX);
2057 s.push_str("…[truncated]");
2058 }
2059 s
2060}
2061
2062/// Best-effort recovery hint for common [`ExecError`] shapes. Returns
2063/// `None` when no generic advice applies; the error's own `Display`
2064/// output is already shown by the `Halted` variant's format string.
2065fn recovery_hint_for(err: &ExecError) -> Option<String> {
2066 match err {
2067 ExecError::SymlinkDestOccupied { .. } => Some(
2068 "set `backup: true` on the symlink action, or remove the conflicting entry by hand"
2069 .into(),
2070 ),
2071 ExecError::SymlinkPrivilegeDenied { .. } => {
2072 Some("enable Windows Developer Mode or re-run grex as administrator".into())
2073 }
2074 ExecError::SymlinkCreateAfterBackupFailed { backup, .. } => {
2075 Some(format!("backup left at `{}`; restore manually then re-run", backup.display()))
2076 }
2077 ExecError::RmdirNotEmpty { .. } => {
2078 Some("set `force: true` on the rmdir action to recurse".into())
2079 }
2080 ExecError::EnvPersistenceDenied { .. } => {
2081 Some("re-run elevated (Machine scope needs admin)".into())
2082 }
2083 _ => None,
2084 }
2085}
2086
2087/// Append one [`Event::Sync`] record summarising an [`ExecStep`].
2088///
2089/// Failures log a warning and are recorded in the report's
2090/// `event_log_warnings`; they do not abort the sync (spec: event-log write
2091/// failures are non-fatal).
2092///
2093/// # Concurrency
2094///
2095/// The append is serialized through a [`ManifestLock`] held across the
2096/// write. The lock is acquired **per action** (not once across the full
2097/// traversal) so cooperating grex processes can observe mid-progress log
2098/// state between actions; fd-lock acquisition is cheap on modern kernels
2099/// and sync runs are dominated by executor side effects, not lock waits.
2100/// This closes the bypass gap surfaced by the M3 concurrency review where
2101/// `append_event` was called without any cross-process serialisation.
2102fn append_step_event(
2103 dry_run: bool,
2104 log: &Path,
2105 lock_path: &Path,
2106 pack: &str,
2107 step: &ExecStep,
2108 warnings: &mut Vec<String>,
2109) {
2110 if dry_run {
2111 return;
2112 }
2113 let summary = format!("{}:{:?}", step.action_name, step.result);
2114 let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: summary };
2115 if let Err(e) = append_event_locked(log, lock_path, &event) {
2116 tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
2117 warnings.push(format!("{}: {e}", log.display()));
2118 }
2119 // Schema version is recorded once at the manifest level by existing
2120 // manifest code; this stub uses the constant to keep a single source of
2121 // truth for forward-compat.
2122 let _ = SCHEMA_VERSION;
2123}
2124
2125/// Append a single [`Event`] under the shared [`ManifestLock`] path.
2126/// Failures are logged and recorded as non-fatal warnings — the spec
2127/// marks event-log write failures as non-aborting so a transient disk
2128/// error must not kill a sync mid-stream.
2129///
2130/// v1.3.1 B4 fix-up: when `dry_run` is `true`, this function is a no-op
2131/// — the dry-run contract forbids any write to `<workspace>/.grex/`,
2132/// including the audit `events.jsonl`. In-memory `event_log_warnings`
2133/// records remain available; only the on-disk side effect is gated.
2134fn append_manifest_event(
2135 dry_run: bool,
2136 log: &Path,
2137 lock_path: &Path,
2138 event: &Event,
2139 warnings: &mut Vec<String>,
2140) {
2141 if dry_run {
2142 return;
2143 }
2144 if let Err(e) = append_event_locked(log, lock_path, event) {
2145 tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
2146 warnings.push(format!("{}: {e}", log.display()));
2147 }
2148}
2149
2150/// Acquire [`ManifestLock`] and append one event. Parent dir of the log is
2151/// created lazily on first write.
2152fn append_event_locked(log: &Path, lock_path: &Path, event: &Event) -> Result<(), String> {
2153 if let Some(parent) = log.parent() {
2154 std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
2155 }
2156 if let Some(parent) = lock_path.parent() {
2157 std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
2158 }
2159 let mut lock = ManifestLock::open(log, lock_path).map_err(|e| e.to_string())?;
2160 lock.write(|| append_event(log, event)).map_err(|e| e.to_string())?.map_err(|e| e.to_string())
2161}
2162
2163/// Re-export a cheap helper so CLI renderers can label halted steps by node
2164/// name without reaching into the graph twice.
2165#[must_use]
2166pub fn pack_display_name(node: &PackNode) -> &str {
2167 &node.name
2168}
2169
2170/// Run a full teardown over the pack tree rooted at `pack_root`.
2171///
2172/// Mirrors [`run`] but invokes
2173/// [`crate::plugin::PackTypePlugin::teardown`] on every pack in
2174/// **reverse** post-order so a parent tears down before its children
2175/// (the inverse of install). Children composed later by an author
2176/// consequently teardown earlier, matching the declarative
2177/// auto-reverse contract (R-M5-11).
2178///
2179/// All other concerns are identical to [`run`]: workspace lock, plan-
2180/// phase validators, lockfile update skipped (teardown does not
2181/// write a `actions_hash` forward), and event-log bracketing.
2182/// Teardown does NOT consult the lockfile skip-on-hash shortcut — a
2183/// user explicitly asked to remove the pack, so we always dispatch.
2184///
2185/// # Errors
2186///
2187/// Returns the first error that halts the pipeline — see [`SyncError`].
2188///
2189/// See [`run`] for the `cancel` contract — feat-m7-1 stage 2 threads
2190/// the parameter through teardown for parity; stages 3-4 add the polls.
2191pub fn teardown(
2192 pack_root: &Path,
2193 opts: &SyncOptions,
2194 cancel: &CancellationToken,
2195) -> Result<SyncReport, SyncError> {
2196 let _ = cancel;
2197 let workspace = prepare_workspace(pack_root, opts)?;
2198 let (mut ws_lock, ws_lock_path) = open_workspace_lock(&workspace)?;
2199 let _ws_guard = match ws_lock.try_acquire() {
2200 Ok(Some(g)) => g,
2201 Ok(None) => {
2202 return Err(SyncError::WorkspaceBusy {
2203 workspace: workspace.clone(),
2204 lock_path: ws_lock_path,
2205 });
2206 }
2207 Err(e) => return Err(workspace_lock_err(&ws_lock_path, &e.to_string())),
2208 };
2209
2210 // v1.2.1 path (iii) — teardown is read-only against the existing
2211 // disk state (no clones / fetches / prunes). It only needs the
2212 // graph build pass; `sync_meta` is intentionally skipped here.
2213 let graph = build_and_validate_graph(&workspace, opts.validate, opts.ref_override.as_deref())?;
2214 let prep = prepare_run_context(pack_root, &graph, &workspace)?;
2215
2216 let mut report = SyncReport {
2217 graph,
2218 steps: Vec::new(),
2219 halted: None,
2220 event_log_warnings: Vec::new(),
2221 pre_run_recovery: prep.pre_run_recovery,
2222 // teardown does not run the legacy-layout migration — by the time
2223 // a user is tearing down, the layout has already been migrated
2224 // (or was never legacy in the first place). Surfacing an empty
2225 // list keeps the report shape symmetric with `run()`.
2226 workspace_migrations: Vec::new(),
2227 };
2228
2229 // feat-m6 B1: mirror `run()` — resolve `--parallel`, build a
2230 // Scheduler, thread it through every `ExecCtx` the teardown path
2231 // constructs. Teardown is the other user-facing verb that owns a
2232 // runtime, so it gets the same wiring.
2233 let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
2234 let scheduler = Arc::new(Scheduler::new(resolved_parallel));
2235 run_teardown(
2236 &mut report,
2237 &prep.order,
2238 &prep.vars,
2239 &workspace,
2240 &prep.event_log,
2241 &prep.lock_path,
2242 &prep.registry,
2243 &prep.pack_type_registry,
2244 resolved_parallel,
2245 &scheduler,
2246 );
2247 Ok(report)
2248}
2249
2250/// Dispatch `teardown` for every pack in **reverse** post-order.
2251/// Declarative packs go through [`crate::plugin::PackTypePlugin`]
2252/// rather than the per-action M4 path because the trait's
2253/// auto-reverse / explicit-block logic must compose with the
2254/// registry; going through the per-action path would mean
2255/// re-implementing inverse synthesis in the sync loop.
2256#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
2257fn run_teardown(
2258 report: &mut SyncReport,
2259 order: &[usize],
2260 vars: &VarEnv,
2261 workspace: &Path,
2262 event_log: &Path,
2263 lock_path: &Path,
2264 registry: &Arc<Registry>,
2265 pack_type_registry: &Arc<PackTypeRegistry>,
2266 parallel: usize,
2267 scheduler: &Arc<Scheduler>,
2268) {
2269 let rt = build_pack_type_runtime(parallel);
2270 // Reverse post-order: root first, then children. Pack-type plugin
2271 // teardown methods reverse their own children/actions, so the
2272 // outer loop only flips the inter-pack order.
2273 for &id in order.iter().rev() {
2274 let Some(node) = report.graph.node(id) else { continue };
2275 let pack_name = node.name.clone();
2276 let pack_path = node.path.clone();
2277 let manifest = node.manifest.clone();
2278 let type_tag = manifest.r#type.as_str();
2279 if pack_type_registry.get(type_tag).is_none() {
2280 let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
2281 // Teardown has no dry-run mode — pass `false` so the
2282 // event-log writes proceed as before.
2283 record_action_err(false, report, event_log, lock_path, &pack_name, 0, "pack-type", err);
2284 return;
2285 }
2286 let ctx = ExecCtx::new(vars, &pack_path, workspace)
2287 .with_platform(Platform::current())
2288 .with_registry(registry)
2289 .with_pack_type_registry(pack_type_registry)
2290 .with_scheduler(scheduler);
2291 append_manifest_event(
2292 false,
2293 event_log,
2294 lock_path,
2295 &Event::ActionStarted {
2296 ts: Utc::now(),
2297 id: pack_name.clone(),
2298 action_idx: 0,
2299 action_name: type_tag.to_string(),
2300 schema_version: SCHEMA_VERSION.to_string(),
2301 },
2302 &mut report.event_log_warnings,
2303 );
2304 let plugin = pack_type_registry
2305 .get(type_tag)
2306 .expect("pack-type plugin must be registered (guarded above)");
2307 // feat-m6 CI fix — see dispatch_pack_type note.
2308 let step_result =
2309 rt.block_on(crate::pack_lock::with_tier_scope(plugin.teardown(&ctx, &manifest)));
2310 if !record_action_outcome(
2311 false,
2312 report,
2313 event_log,
2314 lock_path,
2315 &pack_name,
2316 0,
2317 type_tag,
2318 step_result,
2319 ) {
2320 return;
2321 }
2322 }
2323}
2324
2325/// Test-only hook: append one [`Event::Sync`] through the same
2326/// [`ManifestLock`]-serialised path the sync driver uses.
2327///
2328/// Exposed so integration tests under `tests/` can exercise the locked
2329/// append helper without spinning up a full pack tree. Not intended for
2330/// downstream consumers — the signature may change without notice.
2331#[doc(hidden)]
2332pub fn __test_append_sync_event(
2333 log: &Path,
2334 lock_path: &Path,
2335 pack: &str,
2336 action_name: &str,
2337) -> Result<(), String> {
2338 let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: action_name.to_string() };
2339 append_event_locked(log, lock_path, &event)
2340}
2341
2342// ----------------------------------------------------------------------
2343// PR E — pre-run teardown scan
2344// ----------------------------------------------------------------------
2345
/// One `ActionStarted` event in the manifest log that has no matching
/// `ActionCompleted` or `ActionHalted` peer.
///
/// Dangling starts are the primary crash signal: the process wrote the
/// pre-action event, then died before the executor returned. Callers
/// should surface these to the operator (diagnostics only this PR; a
/// future `grex doctor` verb will act on them).
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DanglingStart {
    /// Pack that owned the halted action.
    pub pack: String,
    /// 0-based action index within the pack.
    pub action_idx: usize,
    /// Short action kind tag, as recorded on the `ActionStarted` event.
    pub action_name: String,
    /// Timestamp the `ActionStarted` event was written.
    pub started_at: DateTime<Utc>,
}
2365
/// Summary of teardown artifacts found under a pack root before a sync
/// begins.
///
/// Built by [`scan_recovery`]. All fields are diagnostic; the sync
/// proceeds regardless of what the scan finds.
#[non_exhaustive]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RecoveryReport {
    /// `<dst>.grex.bak` files sitting next to a non-symlink or missing
    /// original (symlink-action rollback orphan).
    pub orphan_backups: Vec<PathBuf>,
    /// `<path>.grex.bak.<timestamp>` tombstones left by `rmdir` with
    /// `backup: true`.
    pub orphan_tombstones: Vec<PathBuf>,
    /// `ActionStarted` events in the log with no matching
    /// `ActionCompleted` / `ActionHalted` successor (crash signal).
    pub dangling_starts: Vec<DanglingStart>,
}
2384
2385impl RecoveryReport {
2386 /// `true` when the scan found nothing worth reporting.
2387 #[must_use]
2388 pub fn is_empty(&self) -> bool {
2389 self.orphan_backups.is_empty()
2390 && self.orphan_tombstones.is_empty()
2391 && self.dangling_starts.is_empty()
2392 }
2393}
2394
2395/// Walk `workspace` and the manifest log to find crash-recovery artifacts.
2396///
2397/// Inspects:
2398///
2399/// * `workspace` for `.grex.bak` orphans and timestamped `.grex.bak.<ts>`
2400/// tombstones. The workspace IS where children materialise (whether
2401/// the default flat-sibling layout under the pack root, or an
2402/// explicit `--workspace` override directory) so this single bounded
2403/// walk covers every backup site.
2404/// * `event_log` (the manifest JSONL) for `ActionStarted` entries that
2405/// have no matching `ActionCompleted` / `ActionHalted` successor.
2406///
2407/// Non-blocking: scan errors are swallowed to an empty report so a
2408/// half-readable directory cannot kill a sync that would otherwise
2409/// succeed. Call sites that want to surface scan failures should read
2410/// the manifest directly.
2411///
2412/// Pre-`v1.1.0` post-review fix this anchored at `pack_root_dir(pack_root)`,
2413/// which missed every backup under a `--workspace` override.
2414///
2415/// # Errors
2416///
2417/// Returns [`SyncError::Validation`] only when the manifest read itself
2418/// reports corruption. Filesystem traversal errors are swallowed.
2419pub fn scan_recovery(workspace: &Path, event_log: &Path) -> Result<RecoveryReport, SyncError> {
2420 let mut report = RecoveryReport::default();
2421 walk_for_backups(workspace, &mut report);
2422 if event_log.exists() {
2423 match read_all(event_log) {
2424 Ok(events) => {
2425 report.dangling_starts = collect_dangling_starts(&events);
2426 }
2427 Err(e) => {
2428 return Err(SyncError::Validation {
2429 errors: vec![PackValidationError::DependsOnUnsatisfied {
2430 pack: "<event-log>".into(),
2431 required: e.to_string(),
2432 }],
2433 });
2434 }
2435 }
2436 }
2437 Ok(report)
2438}
2439
2440/// Shallow directory walker (bounded depth = 6) that categorizes
2441/// `.grex.bak` and `.grex.bak.<ts>` filenames into the appropriate
2442/// report slot. Depth-limited so a pathological workspace with a deep
2443/// tree cannot stall the scan; realistic layouts are well under six
2444/// levels.
2445fn walk_for_backups(root: &Path, report: &mut RecoveryReport) {
2446 walk_for_backups_inner(root, report, 0);
2447}
2448
2449fn walk_for_backups_inner(dir: &Path, report: &mut RecoveryReport, depth: u32) {
2450 const MAX_DEPTH: u32 = 6;
2451 if depth > MAX_DEPTH {
2452 return;
2453 }
2454 let Ok(entries) = std::fs::read_dir(dir) else { return };
2455 for entry_result in entries {
2456 let entry = match entry_result {
2457 Ok(e) => e,
2458 Err(e) => {
2459 tracing::warn!(
2460 target: "grex::sync::recover",
2461 "skipping unreadable entry under `{}`: {e}",
2462 dir.display(),
2463 );
2464 continue;
2465 }
2466 };
2467 let path = entry.path();
2468 let name = entry.file_name();
2469 let Some(name_str) = name.to_str() else { continue };
2470 if name_str.ends_with(".grex.bak") {
2471 report.orphan_backups.push(path.clone());
2472 continue;
2473 }
2474 if let Some(rest) = name_str.rsplit_once(".grex.bak.") {
2475 // `rsplit_once` returns `(prefix, suffix)`; suffix is the
2476 // timestamp chunk. Accept any non-empty suffix — the exact
2477 // timestamp shape is `fs_executor` internal.
2478 if !rest.1.is_empty() {
2479 report.orphan_tombstones.push(path.clone());
2480 continue;
2481 }
2482 }
2483 // Recurse only into real directories (not symlinks, to avoid
2484 // traversing into the workspace's cloned repos via aliased
2485 // paths). `entry.file_type()` does NOT follow symlinks (unlike
2486 // `entry.metadata()` which would dereference and report the
2487 // target's type — defeating the very check this guards). The
2488 // symlink-skip is also explicit so the intent is recoverable
2489 // from the source: backup-recovery never crosses a symlink.
2490 let Ok(ft) = entry.file_type() else { continue };
2491 if ft.is_symlink() {
2492 continue;
2493 }
2494 if ft.is_dir() {
2495 walk_for_backups_inner(&path, report, depth + 1);
2496 }
2497 }
2498}
2499
2500/// Reduce an event stream to a list of `ActionStarted` records with no
2501/// matching terminator.
2502///
2503/// Matching is positional per `(pack, action_idx)`: a later
2504/// `ActionCompleted` or `ActionHalted` with the same key clears the
2505/// entry. Whatever remains in the map after the pass is dangling.
2506fn collect_dangling_starts(events: &[Event]) -> Vec<DanglingStart> {
2507 use std::collections::HashMap;
2508 let mut open: HashMap<(String, usize), DanglingStart> = HashMap::new();
2509 for ev in events {
2510 match ev {
2511 // v1.3.1 schema v2: pack-id field is `id`. The destructure
2512 // binds `id` and `schema_version` is ignored via `..`.
2513 Event::ActionStarted { ts, id, action_idx, action_name, .. } => {
2514 open.insert(
2515 (id.clone(), *action_idx),
2516 DanglingStart {
2517 pack: id.clone(),
2518 action_idx: *action_idx,
2519 action_name: action_name.clone(),
2520 started_at: *ts,
2521 },
2522 );
2523 }
2524 Event::ActionCompleted { id, action_idx, .. }
2525 | Event::ActionHalted { id, action_idx, .. } => {
2526 open.remove(&(id.clone(), *action_idx));
2527 }
2528 _ => {}
2529 }
2530 }
2531 let mut out: Vec<DanglingStart> = open.into_values().collect();
2532 out.sort_by_key(|a| a.started_at);
2533 out
2534}
2535
#[cfg(test)]
mod synthetic_transition_tests {
    //! v1.1.1 — regression cover for the pack-shape transition fixes.
    //!
    //! Exercises [`skip_eligibility`] / [`upsert_lock_entry`] in
    //! isolation (no walker, no fs), so the assertions hit the
    //! plumbing itself: skip eligibility must demand synthetic-flag
    //! agreement even when the actions hash matches by coincidence,
    //! and the upsert path must write the real-to-synthetic downgrade
    //! into the lockfile so the operator's lockfile reflects what
    //! just happened.
    use super::{skip_eligibility, upsert_lock_entry, LockEntry};
    use crate::lockfile::compute_actions_hash;
    use chrono::{TimeZone, Utc};
    use std::collections::HashMap;

    /// Fixed timestamp shared by every fixture entry.
    fn fixed_ts() -> chrono::DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 4, 27, 10, 0, 0).unwrap()
    }

    /// Stable empty-actions hash over a fixed commit SHA. The same
    /// inputs feed both the prior (real) and the new (synthetic)
    /// configuration below, reproducing exactly the
    /// coincidental-hash-match scenario FIX 3 must catch.
    fn empty_actions_hash() -> String {
        compute_actions_hash(&[], "deadbeef")
    }

    /// Fixture `LockEntry` for pack "alpha" carrying the given flag.
    fn entry_with_flag(synthetic: bool) -> LockEntry {
        LockEntry {
            id: "alpha".into(),
            path: "alpha".into(),
            sha: "deadbeef".into(),
            branch: "main".into(),
            installed_at: fixed_ts(),
            actions_hash: empty_actions_hash(),
            schema_version: "1".into(),
            synthetic,
        }
    }

    /// FIX 3 — pack flips real → synthetic while `actions_hash` and
    /// `commit_sha` happen to match: the skip MUST be invalidated so
    /// the upsert path re-emits the entry with `synthetic = true`.
    #[test]
    fn skip_eligibility_invalidates_when_synthetic_flag_flips() {
        let previous = entry_with_flag(false);
        let verdict = skip_eligibility(&[], "deadbeef", true, &previous, false, false);
        assert!(verdict.is_none(), "skip must be invalidated when synthetic flag flips");
    }

    /// Baseline: same hash, same synthetic flag → skip is allowed.
    #[test]
    fn skip_eligibility_allows_skip_when_synthetic_matches() {
        let previous = entry_with_flag(true);
        let verdict = skip_eligibility(&[], "deadbeef", true, &previous, false, false);
        let expected = empty_actions_hash();
        assert_eq!(
            verdict.as_deref(),
            Some(expected.as_str()),
            "skip must be honoured when synthetic flag matches",
        );
    }

    /// `dry_run` and `force` each disable the skip regardless of any
    /// flag agreement.
    #[test]
    fn skip_eligibility_respects_dry_run_and_force() {
        let previous = entry_with_flag(true);
        assert!(skip_eligibility(&[], "deadbeef", true, &previous, true, false).is_none());
        assert!(skip_eligibility(&[], "deadbeef", true, &previous, false, true).is_none());
    }

    /// FIX 4 — `upsert_lock_entry` records the downgrade: when the
    /// prior entry was real, the upserted entry flips to
    /// `synthetic = true` with the current run's hash. The
    /// `tracing::warn!` it fires is fire-and-forget; only the lockfile
    /// transition is asserted here.
    #[test]
    fn upsert_lock_entry_records_real_to_synthetic_downgrade() {
        let mut previous: HashMap<String, LockEntry> = HashMap::new();
        previous.insert(
            "beta".into(),
            LockEntry {
                id: "beta".into(),
                path: "beta".into(),
                sha: "deadbeef".into(),
                branch: "main".into(),
                installed_at: fixed_ts(),
                actions_hash: empty_actions_hash(),
                schema_version: "1".into(),
                synthetic: false,
            },
        );
        let mut current: HashMap<String, LockEntry> = HashMap::new();

        upsert_lock_entry(&previous, &mut current, "beta", "deadbeef", &empty_actions_hash(), true, None);

        let entry = current.get("beta").expect("entry must be upserted");
        assert!(entry.synthetic, "downgraded entry must carry synthetic = true");
        assert_eq!(entry.actions_hash, empty_actions_hash(), "actions_hash must reflect current run");
    }

    /// Steady-state synthetic → synthetic refresh: the entry is
    /// replaced with the current run's timestamp/hash and the flag is
    /// preserved. Guards against an over-eager downgrade warning.
    #[test]
    fn upsert_lock_entry_no_op_for_steady_state_synthetic() {
        let mut previous: HashMap<String, LockEntry> = HashMap::new();
        previous.insert(
            "gamma".into(),
            LockEntry {
                id: "gamma".into(),
                path: "gamma".into(),
                sha: "deadbeef".into(),
                branch: "main".into(),
                installed_at: fixed_ts(),
                actions_hash: empty_actions_hash(),
                schema_version: "1".into(),
                synthetic: true,
            },
        );
        let mut current: HashMap<String, LockEntry> = HashMap::new();

        upsert_lock_entry(&previous, &mut current, "gamma", "deadbeef", &empty_actions_hash(), true, None);

        let entry = current.get("gamma").expect("entry must be upserted");
        assert!(entry.synthetic, "synthetic must remain true on no-op refresh");
    }
}
2666
#[cfg(test)]
mod error_display_tests {
    //! v1.2.0 Stage 1.k — `SyncError` Display assertions.
    //!
    //! Pure construction + `to_string()` checks. The variants land
    //! dormant — Stage 1.g (rayon scheduler) wires
    //! `SchedulerCancelled` once cooperative cancel polls reach the
    //! parallel walker.
    use super::SyncError;

    #[test]
    fn test_sync_error_scheduler_cancelled_display() {
        let rendered = SyncError::SchedulerCancelled.to_string();
        assert_eq!(rendered, "sync cancelled by user");
    }
}
2682
#[cfg(test)]
mod sync_options_v1_2_0_tests {
    //! v1.2.0 Stage 1.m — leaf cover for new [`SyncOptions`] fields.
    //!
    //! Mechanical default-value assertions plus simple builder/clone
    //! round-trips, locking down two things:
    //!
    //! 1. Adding the new fields preserves v1.1.1 behavior — the
    //!    defaults leave existing call sites observably unchanged.
    //! 2. The shape is what later walker stages (1.h / 1.j / 1.l)
    //!    will consume; renaming or retyping any field forces those
    //!    stages to update in lock-step.
    //!
    //! At 1.m scope the fields themselves are *dormant placeholders* —
    //! no behavior wiring lives in this stage.
    use super::{pack_root_dir, resolve_workspace, SyncError, SyncOptions};

    /// `force_prune` defaults off: existing call sites keep refusing
    /// to drop dirty trees (v1.1.1 behavior).
    #[test]
    fn test_sync_options_default_force_prune_false() {
        let defaults = SyncOptions::default();
        assert!(!defaults.force_prune, "force_prune must default to false");
    }

    /// `force_prune_with_ignored` defaults off: existing call sites
    /// keep refusing to drop ignored content (v1.1.1 behavior).
    #[test]
    fn test_sync_options_default_force_prune_with_ignored_false() {
        let defaults = SyncOptions::default();
        assert!(!defaults.force_prune_with_ignored, "force_prune_with_ignored must default to false");
    }

    /// `migrate_lockfile` defaults off: the walker errors on legacy
    /// v1.1.1 lockfile shapes unless the caller opts in.
    #[test]
    fn test_sync_options_default_migrate_lockfile_false() {
        let defaults = SyncOptions::default();
        assert!(!defaults.migrate_lockfile, "migrate_lockfile must default to false");
    }

    /// `recurse` defaults on: the walker descends into nested
    /// meta-children unless `--shallow` is requested.
    #[test]
    fn test_sync_options_default_recurse_true() {
        let defaults = SyncOptions::default();
        assert!(defaults.recurse, "recurse must default to true");
    }

    /// `max_depth` defaults to `None`: unbounded recursion while
    /// `recurse` is `true`.
    #[test]
    fn test_sync_options_default_max_depth_none() {
        let defaults = SyncOptions::default();
        assert!(defaults.max_depth.is_none(), "max_depth must default to None");
    }

    /// `force_prune_with_ignored = true` alongside `force_prune = true`
    /// is the documented "stronger" combination — `with_ignored` is the
    /// harder override and implies the base `force_prune` semantics.
    /// This locks the invariant that both flags stay plain `bool` (not
    /// an enum) so callers can set them independently without a
    /// runtime panic.
    #[test]
    fn test_sync_options_force_prune_with_ignored_implies_force_prune() {
        let combined = SyncOptions {
            force_prune_with_ignored: true,
            force_prune: true,
            ..SyncOptions::default()
        };
        assert!(combined.force_prune);
        assert!(combined.force_prune_with_ignored);
    }

    /// `max_depth = Some(n)` with `recurse = true` is the documented
    /// `--shallow=N` shape. The fields are independent
    /// (`bool` / `Option<usize>`), so `max_depth` can be set while
    /// `recurse` stays at its default. Stage 1.j defines the precise
    /// interaction later; only the types and defaults are locked here.
    #[test]
    fn test_sync_options_max_depth_pairs_with_recurse() {
        let shallow = SyncOptions { max_depth: Some(2), ..SyncOptions::default() };
        assert_eq!(shallow.max_depth, Some(2));
        assert!(shallow.recurse, "recurse stays at its default (true) when only max_depth is set");
    }

    /// Clone round-trip: every new field must participate in the
    /// existing `Clone` derive (no `#[clone(skip)]` slipped in).
    #[test]
    fn test_sync_options_clone_preserves_new_fields() {
        let original = SyncOptions {
            max_depth: Some(7),
            recurse: false,
            migrate_lockfile: true,
            force_prune_with_ignored: true,
            force_prune: true,
            ..SyncOptions::default()
        };
        let copy = original.clone();
        assert_eq!(copy.force_prune, original.force_prune);
        assert_eq!(copy.force_prune_with_ignored, original.force_prune_with_ignored);
        assert_eq!(copy.migrate_lockfile, original.migrate_lockfile);
        assert_eq!(copy.recurse, original.recurse);
        assert_eq!(copy.max_depth, original.max_depth);
    }

    // ------------------------------------------------------------------
    // v1.2.1 path (iii) — `resolve_workspace` canonicalisation tests
    // ------------------------------------------------------------------

    /// A `--workspace` override pointing at a non-existent directory
    /// fails fast with a Validation error citing the offending path.
    /// We deliberately do NOT mkdir-p someone else's typo —
    /// `--workspace` is an opt-in operator decision and a missing
    /// target is always a configuration mistake.
    #[test]
    fn test_resolve_workspace_errors_on_missing_override_dir() {
        let dir = tempfile::tempdir().unwrap();
        let absent = dir.path().join("nope");
        let root = dir.path();
        let err = resolve_workspace(root, Some(absent.as_path())).expect_err("must fail");
        match err {
            SyncError::Validation { errors } => {
                let cites_missing =
                    errors.iter().any(|e| format!("{e}").contains("does not exist"));
                assert!(cites_missing);
            }
            other => panic!("expected Validation, got {other:?}"),
        }
    }

    /// `--workspace = None` is the default cwd-meta path: no
    /// canonicalize, no fail-on-missing. The pack-root path comes back
    /// verbatim (post `pack_root_dir` normalisation).
    #[test]
    fn test_resolve_workspace_none_returns_pack_root_dir() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path().join("nonexistent-yet");
        let resolved = resolve_workspace(&root, None).expect("None override is always Ok");
        assert_eq!(resolved, pack_root_dir(&root));
    }

    /// `--workspace = Some(<existing>)` yields the canonicalised path.
    /// On Windows that typically inserts the `\\?\` long-path prefix;
    /// on Unix it resolves `..` / symlink components. Either way, the
    /// returned path is what every downstream pass anchors against.
    #[test]
    fn test_resolve_workspace_canonicalises_existing_override() {
        let dir = tempfile::tempdir().unwrap();
        let override_dir = dir.path().join("real-ws");
        std::fs::create_dir_all(&override_dir).unwrap();
        let root = dir.path();
        let resolved = resolve_workspace(root, Some(override_dir.as_path()))
            .expect("existing dir must resolve");
        let canonical = override_dir.canonicalize().unwrap();
        assert_eq!(resolved, canonical);
    }
}