grex_core/sync.rs
1//! Sync orchestrator — M3 Stage B slice 6.
2//!
3//! Glues the building blocks shipped in slices 1–5b into a single runnable
4//! pipeline:
5//!
6//! 1. Walk a pack tree via [`crate::tree::sync_meta`] +
7//! [`crate::tree::build_graph`] + [`FsPackLoader`] + a `GitBackend`.
8//! 2. Run plan-phase validators (manifest-level + graph-level).
9//! 3. Execute every action via a pluggable [`ActionExecutor`]
10//! ([`PlanExecutor`] for dry-run, [`FsExecutor`] for wet-run).
11//! 4. Record each step as an [`Event::Sync`] entry in the pack-root's
12//! `.grex/events.jsonl` event log.
13//!
14//! # Traversal order
15//!
16//! Nodes are executed in **depth-first post-order**: children fully install
17//! before their parent. Rationale: parent packs commonly `require:` artifacts
18//! created by children (e.g. a parent symlink whose `src` lives inside a
19//! child). Running the root last matches the overlay-style dotfile-install
20//! intent authors expect, and it matches how `walker.walk` is structured
21//! (children are hydrated before the recursion returns).
22//!
23//! # Decoupling
24//!
25//! The CLI crate drives this module through a thin `run()` entry point;
26//! [`SyncOptions`] is `#[non_exhaustive]` so new knobs (parallelism, filter
27//! expressions, ref overrides) can land in later milestones without breaking
28//! CLI callers. Errors aggregate into [`SyncError`] with a small, stable
29//! variant set.
30
31use std::borrow::Cow;
32use std::fs;
33use std::path::{Path, PathBuf};
34use std::sync::Arc;
35
36use chrono::{DateTime, Utc};
37use globset::{Glob, GlobSet, GlobSetBuilder};
38use thiserror::Error;
39use tokio_util::sync::CancellationToken;
40
41use crate::execute::{
42 ActionExecutor, ExecCtx, ExecError, ExecResult, ExecStep, FsExecutor, MetaVisitedSet,
43 PlanExecutor, Platform, StepKind,
44};
45use crate::fs::{ManifestLock, ScopedLock};
46use crate::git::GixBackend;
47use crate::lockfile::{
48 compute_actions_hash, read_lockfile, write_lockfile, LockEntry, LockfileError,
49};
50use crate::manifest::{append_event, read_all, Event, ACTION_ERROR_SUMMARY_MAX, SCHEMA_VERSION};
51use crate::pack::{Action, PackValidationError};
52use crate::plugin::{PackTypeRegistry, Registry};
53use crate::scheduler::Scheduler;
54use crate::tree::{
55 build_graph, sync_meta, FsPackLoader, PackGraph, PackNode, SyncMetaOptions, TreeError,
56};
57use crate::vars::VarEnv;
58
/// Inputs to [`run`].
///
/// Every field is `pub`, so a value can be tuned either by plain field
/// assignment or through the chainable `with_*` builder methods.
///
/// The struct is `#[non_exhaustive]` so future knobs (parallelism,
/// filter expressions, additional ref strategies) can land without
/// breaking library consumers. Be aware of what the attribute implies
/// for construction: code *outside* this crate cannot build a
/// `SyncOptions` with a struct expression at all — functional-update
/// syntax (`..SyncOptions::default()`) included. External callers start
/// from [`SyncOptions::new`] / [`Default::default`] and then assign
/// fields or chain builders; struct literals with
/// `..SyncOptions::default()` only compile inside this crate.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct SyncOptions {
    /// When `true`, use [`PlanExecutor`] (no filesystem mutations).
    pub dry_run: bool,
    /// When `false`, skip plan-phase validators (manifest + graph). Debug
    /// escape hatch; production callers should leave this `true`.
    pub validate: bool,
    /// Override workspace directory. `None` → derived from `pack_root`
    /// (the directory holding `.grex/pack.yaml`).
    ///
    /// **v1.2.1 path (iii) semantics**: when `Some`, this path IS the
    /// canonical meta directory. Children resolve parent-relatively as
    /// `<workspace>/<child.path>` and `<workspace>/.grex/pack.yaml` is
    /// where the root manifest is read from. The path MUST exist;
    /// symlinks are resolved via `fs::canonicalize` to a single
    /// inode-stable form. Pre-v1.2.1 the override only re-anchored
    /// children — that legacy split is retired.
    pub workspace: Option<PathBuf>,
    /// Global ref override (`grex sync --ref <sha|branch|tag>`). When
    /// `Some`, every child pack clone/checkout uses this ref instead of
    /// the declared `child.ref`. Empty strings are rejected at the CLI
    /// layer.
    pub ref_override: Option<String>,
    /// Pack-path filter patterns (`grex sync --only <glob>`). Raw glob
    /// strings — compiled internally via an in-crate `globset` helper so
    /// the `globset` crate version does not leak into the public API.
    /// `None` / empty means every pack runs (M3 semantics). Matching is
    /// against the pack's **workspace-relative** path normalized to
    /// forward-slash form.
    pub only_patterns: Option<Vec<String>>,
    /// Bypass the lockfile hash-match skip (`grex sync --force`). When
    /// `true`, every pack re-executes even if its `actions_hash` is
    /// unchanged from the prior lockfile.
    pub force: bool,
    /// Max parallel pack ops for this sync run (feat-m6-1).
    ///
    /// * `None` → the CLI layer defaults to `num_cpus::get()`. Library
    ///   callers who leave this `None` get the same `num_cpus::get()`
    ///   semantics — the sync driver resolves the default in one place
    ///   so the scheduler slot on every `ExecCtx` is always populated.
    /// * `Some(0)` → unbounded (`Semaphore::MAX_PERMITS`).
    /// * `Some(1)` → serial fast-path.
    /// * `Some(n >= 2)` → bounded parallel.
    pub parallel: Option<usize>,
    /// v1.2.0 Stage 1.l prep — when `true`, walker Phase 2 may drop
    /// dirty trees during prune. Still refuses ignored content unless
    /// [`SyncOptions::force_prune_with_ignored`] is also `true`.
    /// Default `false` preserves v1.1.1 behavior (refuse all dirty
    /// drops).
    pub force_prune: bool,
    /// v1.2.0 Stage 1.l prep — when `true` (implies
    /// [`SyncOptions::force_prune`]), walker Phase 2 also drops
    /// ignored content. Hard override — the strongest level. Default
    /// `false` preserves v1.1.1 behavior.
    pub force_prune_with_ignored: bool,
    /// v1.2.1 Item 5b — when `true` AND `force_prune` (or
    /// `force_prune_with_ignored`) is set, divert Phase 2 prunes
    /// through the snapshot-then-unlink quarantine pipeline. The
    /// dest's full subtree is recursively copied to
    /// `<workspace>/.grex/trash/<ISO8601>/<basename>/` BEFORE
    /// `unlink(dest)` fires. Snapshot or audit-fsync failure aborts
    /// the prune (no unlink). Lean theorem
    /// `quarantine_snapshot_precedes_delete` proves the safety
    /// contract. Default `false` preserves v1.2.0 direct-unlink
    /// behavior. Has no effect unless one of the `force_prune*`
    /// flags is also set (the CLI enforces this via
    /// `requires = "force_prune"`; library callers who set this
    /// with neither flag get a no-op since Phase 2 will not enter
    /// the override path at all).
    pub quarantine: bool,
    /// v1.2.0 Stage 1.h opt-in — when `true`, the walker rewrites a
    /// legacy v1.1.1 lockfile in place to the v1.2.0 shape. When
    /// `false` (default), the walker errors on the legacy shape so
    /// migration is always an explicit caller decision.
    pub migrate_lockfile: bool,
    /// v1.2.0 Stage 1.j prep — when `true` (default), the walker
    /// descends into nested meta-children. `doctor --shallow` flips
    /// this to `false` so only the immediate workspace is inspected.
    pub recurse: bool,
    /// v1.2.0 Stage 1.j prep — pairs with
    /// [`SyncOptions::recurse`] for `--shallow=N`. `None` (default)
    /// is unbounded recursion when `recurse` is `true`. `Some(n)`
    /// caps depth at `n` levels of nesting.
    pub max_depth: Option<usize>,
    /// v1.2.5 — when `Some(N)`, every meta sync starts with a
    /// best-effort GC sweep over `<meta>/.grex/trash/`, deleting
    /// entries older than `N` days. `None` (default) preserves the
    /// v1.2.1 indefinite-retention behavior. The CLI surfaces this
    /// as `grex sync --retain-days N`; library callers wire it via
    /// [`SyncOptions::with_retain_days`]. Sweep failures log via
    /// `tracing::warn!` and DO NOT halt the sync.
    pub retain_days: Option<u32>,
}
162
163impl Default for SyncOptions {
164 fn default() -> Self {
165 Self {
166 dry_run: false,
167 validate: true,
168 workspace: None,
169 ref_override: None,
170 only_patterns: None,
171 force: false,
172 parallel: None,
173 // v1.2.0 Stage 1.m additions — defaults preserve v1.1.1
174 // behavior. Each field is a dormant placeholder until
175 // its corresponding walker stage wires it.
176 force_prune: false,
177 force_prune_with_ignored: false,
178 quarantine: false,
179 migrate_lockfile: false,
180 recurse: true,
181 max_depth: None,
182 retain_days: None,
183 }
184 }
185}
186
187/// Compile raw `--only` pattern strings into a [`globset::GlobSet`].
188/// Empty / absent input yields `Ok(None)` so M3's zero-config path
189/// (every pack runs) stays the default.
190fn compile_only_globset(patterns: Option<&Vec<String>>) -> Result<Option<GlobSet>, SyncError> {
191 let Some(pats) = patterns else { return Ok(None) };
192 if pats.is_empty() {
193 return Ok(None);
194 }
195 let mut builder = GlobSetBuilder::new();
196 for p in pats {
197 let glob = Glob::new(p)
198 .map_err(|source| SyncError::InvalidOnlyGlob { pattern: p.clone(), source })?;
199 builder.add(glob);
200 }
201 let set = builder
202 .build()
203 .map_err(|source| SyncError::InvalidOnlyGlob { pattern: pats.join(","), source })?;
204 Ok(Some(set))
205}
206
207impl SyncOptions {
208 /// Default options: wet-run, validators enabled, default workspace path.
209 #[must_use]
210 pub fn new() -> Self {
211 Self::default()
212 }
213
214 /// Set `dry_run`.
215 #[must_use]
216 pub fn with_dry_run(mut self, dry_run: bool) -> Self {
217 self.dry_run = dry_run;
218 self
219 }
220
221 /// Set `validate`.
222 #[must_use]
223 pub fn with_validate(mut self, validate: bool) -> Self {
224 self.validate = validate;
225 self
226 }
227
228 /// Set `workspace` override.
229 #[must_use]
230 pub fn with_workspace(mut self, workspace: Option<PathBuf>) -> Self {
231 self.workspace = workspace;
232 self
233 }
234
235 /// Set `ref_override` (`--ref`).
236 #[must_use]
237 pub fn with_ref_override(mut self, ref_override: Option<String>) -> Self {
238 self.ref_override = ref_override;
239 self
240 }
241
242 /// Set `only_patterns` (`--only`). Empty vector or `None` disables
243 /// the filter.
244 #[must_use]
245 pub fn with_only_patterns(mut self, patterns: Option<Vec<String>>) -> Self {
246 self.only_patterns = patterns;
247 self
248 }
249
250 /// Set `force` (`--force`).
251 #[must_use]
252 pub fn with_force(mut self, force: bool) -> Self {
253 self.force = force;
254 self
255 }
256
257 /// Set `parallel` (`--parallel`). See [`SyncOptions::parallel`] for
258 /// the `None` / `Some(0)` / `Some(1)` / `Some(n)` semantics.
259 #[must_use]
260 pub fn with_parallel(mut self, parallel: Option<usize>) -> Self {
261 self.parallel = parallel;
262 self
263 }
264
265 /// Set `force_prune` (`--force-prune`). See
266 /// [`SyncOptions::force_prune`] for the override matrix.
267 #[must_use]
268 pub fn with_force_prune(mut self, force_prune: bool) -> Self {
269 self.force_prune = force_prune;
270 self
271 }
272
273 /// Set `force_prune_with_ignored` (`--force-prune-with-ignored`).
274 /// See [`SyncOptions::force_prune_with_ignored`] for the override
275 /// matrix.
276 #[must_use]
277 pub fn with_force_prune_with_ignored(mut self, force_prune_with_ignored: bool) -> Self {
278 self.force_prune_with_ignored = force_prune_with_ignored;
279 self
280 }
281
282 /// Set `quarantine` (`--quarantine`). See
283 /// [`SyncOptions::quarantine`] for the snapshot-before-delete
284 /// contract. Has no effect unless [`SyncOptions::force_prune`]
285 /// or [`SyncOptions::force_prune_with_ignored`] is also set.
286 #[must_use]
287 pub fn with_quarantine(mut self, quarantine: bool) -> Self {
288 self.quarantine = quarantine;
289 self
290 }
291
292 /// Set `retain_days` (`--retain-days N`). See
293 /// [`SyncOptions::retain_days`] for the GC-sweep contract.
294 /// `None` preserves v1.2.1 indefinite-retention behavior;
295 /// `Some(N)` triggers a best-effort sweep at the start of every
296 /// meta sync.
297 #[must_use]
298 pub fn with_retain_days(mut self, retain_days: Option<u32>) -> Self {
299 self.retain_days = retain_days;
300 self
301 }
302}
303
304/// One executed (or planned) action step in a sync run.
305///
306/// Marked `#[non_exhaustive]` so new observability fields (timestamps,
307/// plugin provenance) can land without breaking library consumers who
308/// destructure the struct.
309#[non_exhaustive]
310#[derive(Debug, Clone)]
311pub struct SyncStep {
312 /// Name of the pack that owned the action.
313 pub pack: String,
314 /// 0-based index into the pack's top-level `actions` vector.
315 pub action_idx: usize,
316 /// The [`ExecStep`] record emitted by the executor.
317 pub exec_step: ExecStep,
318}
319
320/// Outcome of a [`run`] invocation.
321///
322/// On fail-fast termination, `halted` carries the error that stopped the
323/// sync; every completed step up to that point is still in `steps` so
324/// callers can render a partial transcript.
325///
326/// Marked `#[non_exhaustive]` so new report-level fields (run id, metrics)
327/// can land without breaking library consumers who destructure the struct.
328#[non_exhaustive]
329#[derive(Debug)]
330pub struct SyncReport {
331 /// Fully-walked pack graph (present even on halted runs).
332 pub graph: PackGraph,
333 /// Steps produced by the executor, in execution order.
334 pub steps: Vec<SyncStep>,
335 /// `Some(e)` if execution stopped before all actions ran.
336 pub halted: Option<SyncError>,
337 /// Non-fatal manifest-append warnings (one per failed event append).
338 /// Kept as a separate field because spec marks event-log write failures
339 /// as non-aborting.
340 pub event_log_warnings: Vec<String>,
341 /// `Some(r)` when the pre-run teardown scan found orphaned backup
342 /// files or dangling [`Event::ActionStarted`] records from a prior
343 /// crashed run. Informational only — the report is still returned and
344 /// the sync proceeds. CLI renderers should surface a warning so the
345 /// operator can decide whether to run a future `grex doctor` verb.
346 pub pre_run_recovery: Option<RecoveryReport>,
347 /// One entry per child whose legacy `.grex/workspace/<name>/` layout
348 /// was relocated (or considered for relocation) on this sync. Empty
349 /// when no legacy directory was found — the common case for any
350 /// workspace built fresh on v1.1.0+. CLI renderers should surface
351 /// the list so operators see what changed.
352 pub workspace_migrations: Vec<WorkspaceMigration>,
353}
354
355/// One legacy-layout migration attempt. `outcome` distinguishes the
356/// move-succeeded case from the don't-clobber-user-data case so CLI
357/// renderers can present different advice to the operator.
358#[non_exhaustive]
359#[derive(Debug, Clone, PartialEq, Eq)]
360pub struct WorkspaceMigration {
361 /// Source path under the legacy `.grex/workspace/<name>/` location,
362 /// rendered relative to the pack root for log readability.
363 pub from: PathBuf,
364 /// Destination flat-sibling path `<pack_root>/<name>/`, relative to
365 /// the pack root.
366 pub to: PathBuf,
367 /// What happened.
368 pub outcome: MigrationOutcome,
369}
370
/// How a single legacy-layout migration attempt ended.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationOutcome {
    /// The legacy directory was renamed onto the flat-sibling slot.
    Migrated,
    /// The legacy slot and the flat-sibling slot both existed. Nothing
    /// was moved — the user has to inspect and reconcile by hand so that
    /// neither copy is ever deleted silently.
    SkippedBothExist,
    /// A non-grex file or directory already occupied the flat-sibling
    /// slot. Nothing was moved — user data is never clobbered, even when
    /// the legacy slot is plainly the source of truth.
    SkippedDestOccupied,
    /// `fs::rename` failed (e.g. cross-volume, ACL denied). The legacy
    /// directory is left where it was; the failure is surfaced so the
    /// operator can move it manually.
    Failed {
        /// Text of the rename failure.
        error: String,
    },
}
390
391/// Rich context attached to a [`SyncError::Halted`] variant.
392///
393/// Packages the pack + action position together with the underlying
394/// executor error and an optional human-readable recovery hint. Marked
395/// `#[non_exhaustive]` so future fields (step transcript, timestamp) can
396/// land without breaking `match` arms or struct destructures.
397#[non_exhaustive]
398#[derive(Debug)]
399pub struct HaltedContext {
400 /// Name of the pack that owned the halted action.
401 pub pack: String,
402 /// 0-based index into the pack's top-level `actions` vector.
403 pub action_idx: usize,
404 /// Short action kind tag (e.g. `"symlink"`, `"exec"`).
405 pub action_name: String,
406 /// Underlying executor error.
407 pub error: ExecError,
408 /// Optional next-step suggestion for the operator. `None` when no
409 /// generic hint applies — the executor error's own `Display` already
410 /// tells the story.
411 pub recovery_hint: Option<String>,
412}
413
414/// Error taxonomy surfaced by [`run`].
415#[non_exhaustive]
416#[derive(Debug, Error)]
417pub enum SyncError {
418 /// The pack-tree walker failed (loader error, git error, cycle, …).
419 #[error("tree walk failed: {0}")]
420 Tree(#[from] TreeError),
421 /// One or more plan-phase validators flagged the graph.
422 #[error("validation failed: {errors:?}")]
423 Validation {
424 /// Aggregated errors from manifest-level + graph-level validators.
425 errors: Vec<PackValidationError>,
426 },
427 /// An action executor returned an error.
428 ///
429 /// Retained for backward compatibility; new call sites should prefer
430 /// [`SyncError::Halted`] which carries full pack + action context.
431 /// Kept non-deprecated because [`From<ExecError>`] still materialises
432 /// the variant for non-sync-loop callers (e.g. ad-hoc helpers).
433 #[error("action execution failed: {0}")]
434 Exec(#[from] ExecError),
435 /// Action execution halted; full context (pack, action index, error,
436 /// optional recovery hint) lives in [`HaltedContext`]. This is the
437 /// variant the sync driver emits — [`SyncError::Exec`] is only
438 /// surfaced by ancillary code paths.
439 #[error(
440 "sync halted at pack `{}` action #{} ({}): {}",
441 .0.pack, .0.action_idx, .0.action_name, .0.error
442 )]
443 Halted(Box<HaltedContext>),
444 /// Another `grex` process (or thread) already holds the workspace-level
445 /// lock. The running sync refused to start to avoid racing two concurrent
446 /// walkers into the same workspace. If the lock file at `lock_path` is
447 /// stale (no other grex is actually running), remove it by hand.
448 #[error(
449 "workspace `{workspace}` is locked by another grex process (remove {lock_path:?} if stale)"
450 )]
451 WorkspaceBusy {
452 /// Resolved workspace directory that the current run tried to lock.
453 workspace: PathBuf,
454 /// Sidecar lock file that is currently held.
455 lock_path: PathBuf,
456 },
457 /// Reading or parsing the resolved-state lockfile failed. Surfaced as
458 /// its own variant (rather than folded into `Validation`) because a
459 /// corrupt / unreadable lockfile is an I/O or schema fault, not a
460 /// dependency-satisfaction fault. Resolution is operator-level
461 /// (restore a backup, delete the file, re-sync), not author-level.
462 #[error("lockfile `{path}` failed to load: {source}")]
463 Lockfile {
464 /// Lockfile path that failed to load.
465 path: PathBuf,
466 /// Underlying lockfile error.
467 #[source]
468 source: LockfileError,
469 },
470 /// One of the `--only <GLOB>` patterns failed to compile. Surfaced
471 /// as its own variant so the CLI can map it to a dedicated usage
472 /// error exit code instead of the generic sync-failure bucket.
473 #[error("invalid --only glob `{pattern}`: {source}")]
474 InvalidOnlyGlob {
475 /// The raw pattern string that failed to compile.
476 pattern: String,
477 /// Underlying globset error.
478 #[source]
479 source: globset::Error,
480 },
481 /// Migrating the v1.x event log (`grex.jsonl`) to the v2 canonical
482 /// path (`.grex/events.jsonl`) failed. Operator-level resolution
483 /// (check filesystem permissions, free disk space, then retry).
484 #[error("event-log migration failed: {0}")]
485 EventLogMigration(#[source] crate::manifest::ManifestError),
486 /// Cooperative cancellation fired (Ctrl-C / SIGTERM) during a
487 /// parallel sync. v1.2.0 Stage 1.g wires the rayon walker to surface
488 /// this distinct-from-failure variant so the CLI can exit with a
489 /// dedicated cancellation code instead of a generic sync error.
490 /// Dormant until Stage 1.g — the existing CLI does not yet emit it.
491 #[error("sync cancelled by user")]
492 SchedulerCancelled,
493}
494
495impl Clone for SyncError {
496 fn clone(&self) -> Self {
497 // `TreeError` / `ExecError` do not implement `Clone` (they wrap
498 // `std::io::Error`-adjacent values). Halts carry only a display
499 // rendering in the report; we re-materialise via a synthetic
500 // `Validation` variant so `SyncReport` can be `Clone`-safe for
501 // observability tooling without widening the taxonomy.
502 match self {
503 Self::Tree(e) => Self::Validation {
504 errors: vec![PackValidationError::DependsOnUnsatisfied {
505 pack: "<tree>".into(),
506 required: e.to_string(),
507 }],
508 },
509 Self::Validation { errors } => Self::Validation { errors: errors.clone() },
510 Self::Exec(e) => Self::Validation {
511 errors: vec![PackValidationError::DependsOnUnsatisfied {
512 pack: "<exec>".into(),
513 required: e.to_string(),
514 }],
515 },
516 Self::Halted(ctx) => Self::Validation {
517 errors: vec![PackValidationError::DependsOnUnsatisfied {
518 pack: ctx.pack.clone(),
519 required: format!(
520 "action #{} ({}): {}",
521 ctx.action_idx, ctx.action_name, ctx.error
522 ),
523 }],
524 },
525 Self::WorkspaceBusy { workspace, lock_path } => {
526 Self::WorkspaceBusy { workspace: workspace.clone(), lock_path: lock_path.clone() }
527 }
528 Self::Lockfile { path, source } => Self::Validation {
529 errors: vec![PackValidationError::DependsOnUnsatisfied {
530 pack: "<lockfile>".into(),
531 required: format!("{}: {source}", path.display()),
532 }],
533 },
534 Self::InvalidOnlyGlob { pattern, source } => Self::Validation {
535 errors: vec![PackValidationError::DependsOnUnsatisfied {
536 pack: "<only-glob>".into(),
537 required: format!("{pattern}: {source}"),
538 }],
539 },
540 Self::EventLogMigration(source) => Self::Validation {
541 errors: vec![PackValidationError::DependsOnUnsatisfied {
542 pack: "<event-log-migration>".into(),
543 required: source.to_string(),
544 }],
545 },
546 Self::SchedulerCancelled => Self::SchedulerCancelled,
547 }
548 }
549}
550
551/// Run a full sync over the pack tree rooted at `pack_root`.
552///
553/// Resolution rules:
554/// * If `pack_root` is a directory the walker looks for
555/// `<pack_root>/.grex/pack.yaml`.
556/// * If `pack_root` ends in `.yaml` / `.yml` it is loaded verbatim.
557/// * Workspace defaults to the pack root directory itself when
558/// `opts.workspace` is `None`. Children resolve as flat siblings of the
559/// parent pack root (since v1.1.0).
560///
561/// # Errors
562///
563/// Returns the first error that halts the pipeline — see [`SyncError`] for
564/// the taxonomy.
565///
566/// `cancel` is the cooperative cancellation handle threaded through the
567/// pipeline by feat-m7-1 stage 2. Stage 2 only wires the parameter; the
568/// `is_cancelled()` polls land in stages 3-4 (scheduler + pack-lock
569/// acquire). CLI callers pass a never-cancelled sentinel
570/// (`CancellationToken::new()`); the MCP server passes a token tied to
571/// the request lifetime.
572pub fn run(
573 pack_root: &Path,
574 opts: &SyncOptions,
575 cancel: &CancellationToken,
576) -> Result<SyncReport, SyncError> {
577 // Stage 2 is signature-only — silence "unused parameter" without
578 // hiding it behind `_` (downstream stages will read it).
579 let _ = cancel;
580 let workspace = prepare_workspace(pack_root, opts)?;
581 let (mut ws_lock, ws_lock_path) = open_workspace_lock(&workspace)?;
582 let _ws_guard = match ws_lock.try_acquire() {
583 Ok(Some(g)) => g,
584 Ok(None) => {
585 return Err(SyncError::WorkspaceBusy {
586 workspace: workspace.clone(),
587 lock_path: ws_lock_path,
588 });
589 }
590 Err(e) => return Err(workspace_lock_err(&ws_lock_path, &e.to_string())),
591 };
592
593 // Compile `--only` patterns into a GlobSet here so the
594 // `globset` crate version does not leak into `SyncOptions`.
595 let only_set = compile_only_globset(opts.only_patterns.as_ref())?;
596
597 // Auto-migrate legacy `.grex/workspace/<name>/` layout BEFORE the
598 // walker resolves children. Idempotent: a fresh v1.1.0+ workspace
599 // sees no legacy directory and the function no-ops.
600 let workspace_migrations = migrate_legacy_workspace(pack_root);
601
602 // v1.2.1 path (iii) — three-stage composition:
603 // sync_meta(workspace, prune_candidates) — mutate (rayon parallel)
604 // build_graph(workspace) — read-only graph
605 // run_actions(graph) — consume graph
606 // `Walker::walk` is retired from the prod path; the symbol is kept
607 // for test-suite compat. See `crates/grex-core/src/tree/graph_build.rs`.
608 run_sync_meta(&workspace, opts)?;
609 let graph = build_and_validate_graph(&workspace, opts.validate, opts.ref_override.as_deref())?;
610 let prep = prepare_run_context(pack_root, &graph, &workspace)?;
611 log_force_flag(opts.force);
612
613 let mut report = SyncReport {
614 graph,
615 steps: Vec::new(),
616 halted: None,
617 event_log_warnings: Vec::new(),
618 pre_run_recovery: prep.pre_run_recovery,
619 workspace_migrations,
620 };
621
622 let mut next_lock = prep.prior_lock.clone();
623 // feat-m6 B1: resolve `--parallel` once and build the scheduler
624 // shared across every `ExecCtx` in this run. Library callers who
625 // leave `opts.parallel == None` default to `num_cpus::get()` here
626 // (clamped `>= 1`) so the scheduler slot is always populated —
627 // `ctx.scheduler` being `None` would strand acquire-sites into
628 // unbounded concurrency. See `.omne/cfg/concurrency.md` §Scheduler.
629 let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
630 let scheduler = Arc::new(Scheduler::new(resolved_parallel));
631 run_actions(
632 &mut report,
633 &prep.order,
634 &prep.vars,
635 &workspace,
636 &prep.event_log,
637 &prep.lock_path,
638 opts.dry_run,
639 &prep.prior_lock,
640 &mut next_lock,
641 &prep.registry,
642 &prep.pack_type_registry,
643 only_set.as_ref(),
644 opts.force,
645 resolved_parallel,
646 &scheduler,
647 );
648
649 persist_lockfile_if_clean(&mut report, &prep.lockfile_path, &next_lock, opts.dry_run);
650 Ok(report)
651}
652
653/// Bag of context pieces assembled once at the top of [`run`]. Grouping
654/// them keeps [`run`] under the workspace's 50-LOC function lint without
655/// smearing the read of sequential setup across helpers. Fields are
656/// consumed piecemeal by the actions loop; no getters needed.
657struct RunContext {
658 order: Vec<usize>,
659 vars: VarEnv,
660 event_log: PathBuf,
661 lock_path: PathBuf,
662 lockfile_path: PathBuf,
663 prior_lock: std::collections::HashMap<String, LockEntry>,
664 registry: Arc<Registry>,
665 pack_type_registry: Arc<PackTypeRegistry>,
666 pre_run_recovery: Option<RecoveryReport>,
667}
668
669/// Build the per-run context: traversal order, vars env, event/lockfile
670/// paths, prior lockfile state, bootstrap registry, and (optionally) a
671/// pre-run recovery scan. Kept narrow so [`run`] stays small.
672///
673/// `workspace` is the resolved workspace directory (post `--workspace`
674/// override) so the recovery scan looks for `.grex.bak` artefacts under
675/// the actual on-disk location children were materialised at — not
676/// under the pack root, which differs from the workspace whenever the
677/// CLI's `--workspace` flag is used. Pre-fix this anchoring drift
678/// caused recovery scans to miss every backup left under an override
679/// workspace.
680fn prepare_run_context(
681 pack_root: &Path,
682 graph: &PackGraph,
683 workspace: &Path,
684) -> Result<RunContext, SyncError> {
685 let event_log = event_log_path(pack_root);
686 let lock_path = event_lock_path(&event_log);
687 let vars = VarEnv::from_os();
688 let order = post_order(graph);
689 let pre_run_recovery = scan_recovery(workspace, &event_log).ok().filter(|r| !r.is_empty());
690 let lockfile_path = lockfile_path(pack_root);
691 let prior_lock = load_prior_lock(&lockfile_path)?;
692 let registry = Arc::new(Registry::bootstrap());
693 let pack_type_registry = Arc::new(bootstrap_pack_type_registry());
694 Ok(RunContext {
695 order,
696 vars,
697 event_log,
698 lock_path,
699 lockfile_path,
700 prior_lock,
701 registry,
702 pack_type_registry,
703 pre_run_recovery,
704 })
705}
706
707/// Build the [`PackTypeRegistry`] the sync driver threads into every
708/// [`ExecCtx`] it constructs.
709///
710/// Default path (no `plugin-inventory` feature) hard-codes the three
711/// built-ins via [`PackTypeRegistry::bootstrap`]. With the feature on,
712/// [`PackTypeRegistry::bootstrap_from_inventory`] is preferred so any
713/// externally-submitted plugin types (mirroring the M4-E pattern for
714/// action plugins) shadow the built-ins last-writer-wins. Kept as a free
715/// helper so the `#[cfg]` split lives in one place instead of being
716/// smeared across every sync call-site.
717fn bootstrap_pack_type_registry() -> PackTypeRegistry {
718 #[cfg(feature = "plugin-inventory")]
719 {
720 let mut reg = PackTypeRegistry::bootstrap();
721 reg.register_from_inventory();
722 reg
723 }
724 #[cfg(not(feature = "plugin-inventory"))]
725 {
726 PackTypeRegistry::bootstrap()
727 }
728}
729
730/// Emit a single `tracing::info!` line when `--force` is active so
731/// operators can confirm from logs that the skip short-circuit was
732/// bypassed. Extracted so [`run`] stays small.
733fn log_force_flag(force: bool) {
734 if force {
735 tracing::info!(
736 target: "grex::sync",
737 "--force active: bypassing lockfile skip-on-hash short-circuit"
738 );
739 }
740}
741
742/// v1.2.1 path (iii) — drive the v1.2.0 [`sync_meta`] walker over the
743/// resolved canonical workspace.
744///
745/// This is the SOLE mutating pass in `sync::run`: clones, fetches,
746/// prune dispatches, distributed-lockfile reads, and TOCTOU `BoundedDir`
747/// opens all happen here. The subsequent [`build_and_validate_graph`]
748/// pass is read-only against the disk state this fn leaves behind.
749///
750/// `prune_candidates` is computed from the per-meta lockfile orphans:
751/// every entry in `<workspace>/.grex/grex.lock.jsonl` whose `path` no
752/// longer appears in the live root manifest's `children[]` is fed into
753/// Phase 2 for dispatch (with `--force-prune` / `--force-prune-with-ignored`
754/// overrides honoured by the consent walk). This closes the
755/// "prune-inert" gap from the previous wiring, where `sync::run` passed
756/// `&[]` and `--force-prune` was a CLI flag with no behavioural reach.
757///
758/// `--workspace` semantics: the canonical `workspace` argument is what
759/// `sync_meta` uses as its `meta_dir`. Children land at
760/// `<workspace>/<child.path>` — the v1.2.0 parent-relative model. Prior
761/// to v1.2.1, callers passing `--workspace` skipped the precursor
762/// entirely; that bypass is retired here so override callers see the
763/// same v1.2.0 semantics as the default-cwd path.
764///
765/// `SyncOptions::parallel` mapping (mirrors [`SyncMetaOptions::parallel`]
766/// with the documented `Some(0)` carve-out):
767/// * `None` → `SyncMetaOptions::parallel = None` (rayon default =
768/// `num_cpus::get()`).
769/// * `Some(0)` → `SyncMetaOptions::parallel = None` (the CLI sentinel
770/// for "unbounded" maps to rayon's default; `Some(0)` would be
771/// clamped to `1` inside `build_pool`, which is not what callers
772/// asking for unbounded want).
773/// * `Some(n)` for `n >= 1` → `SyncMetaOptions::parallel = Some(n)`.
774fn run_sync_meta(workspace: &Path, opts: &SyncOptions) -> Result<(), SyncError> {
775 let loader = FsPackLoader::new();
776 let backend = GixBackend::new();
777 let parallel = match opts.parallel {
778 None | Some(0) => None,
779 Some(n) => Some(n),
780 };
781 // v1.2.1 Item 5b — resolve the quarantine config relative to the
782 // canonical workspace (the same `meta_dir` `sync_meta` runs on).
783 // Trash bucket lives at `<workspace>/.grex/trash/`; audit log at
784 // `<workspace>/.grex/events.jsonl` — same path the existing
785 // `ForcePruneExecuted` event uses.
786 let quarantine = opts.quarantine.then(|| crate::tree::QuarantineConfig {
787 trash_root: workspace.join(".grex").join("trash"),
788 audit_log: crate::manifest::event_log_path(workspace),
789 });
790 // v1.2.5 — thread `--retain-days N` into the per-meta options so
791 // every recursion frame swept its own trash bucket. `None` skips
792 // the GC entirely (v1.2.1 indefinite-retention).
793 let retention =
794 opts.retain_days.map(|retain_days| crate::tree::RetentionConfig { retain_days });
795 let meta_opts = SyncMetaOptions {
796 ref_override: opts.ref_override.clone(),
797 recurse: opts.recurse,
798 max_depth: opts.max_depth,
799 force_prune: opts.force_prune,
800 force_prune_with_ignored: opts.force_prune_with_ignored,
801 parallel,
802 quarantine,
803 retention,
804 };
805 let prune_candidates = compute_prune_candidates(workspace, &loader);
806 let report = sync_meta(workspace, &backend, &loader, &meta_opts, &prune_candidates)?;
807 if let Some(first) = report.errors.into_iter().next() {
808 return Err(SyncError::Tree(first));
809 }
810 Ok(())
811}
812
813/// v1.2.1 path (iii) — orphan-prune candidate computation.
814///
815/// Reads `<workspace>/.grex/grex.lock.jsonl` and the root manifest;
816/// returns every lockfile entry path that no longer matches a declared
817/// child in `manifest.children`. Empty in three cases:
818///
819/// * No lockfile (fresh workspace, never synced).
820/// * No manifest at `<workspace>/.grex/pack.yaml` (single-node tree —
821/// `sync_meta` will surface its own diagnostic).
822/// * Lockfile entries are all still declared (steady-state sync).
823///
824/// Lockfile read errors are tolerated as `Vec::new()`: the prune pass
825/// is opportunistic, and a corrupt lockfile is the migrator's concern,
826/// not the prune dispatcher's. Manifest read errors are similarly
827/// tolerated — `sync_meta` will fail loudly on the same condition,
828/// giving the operator a single unambiguous error surface.
829fn compute_prune_candidates(
830 workspace: &Path,
831 loader: &dyn crate::tree::PackLoader,
832) -> Vec<PathBuf> {
833 use crate::lockfile::read_meta_lockfile;
834 let entries = match read_meta_lockfile(workspace) {
835 Ok(e) => e,
836 Err(_) => return Vec::new(),
837 };
838 if entries.is_empty() {
839 return Vec::new();
840 }
841 let manifest = match loader.load(workspace) {
842 Ok(m) => m,
843 Err(_) => return Vec::new(),
844 };
845 let declared: std::collections::HashSet<String> =
846 manifest.children.iter().map(crate::pack::ChildRef::effective_path).collect();
847 entries
848 .into_iter()
849 .filter(|e| !declared.contains(&e.path))
850 .map(|e| PathBuf::from(e.path))
851 .collect()
852}
853
854/// v1.2.1 path (iii) — read-only graph build + plan-phase validation.
855///
856/// Builds the [`PackGraph`] from the on-disk meta tree rooted at
857/// `workspace`. Replaces the legacy `walk_and_validate` (which used
858/// [`crate::tree::Walker::walk`] and re-issued every clone/fetch as a
859/// no-op probe) with the v1.2.1 split:
860///
861/// * The mutating half ran in [`run_sync_meta`] — all clones, fetches,
862/// prune dispatches, and TOCTOU `BoundedDir` opens already happened.
863/// * THIS pass is strictly READ-ONLY. It walks the manifest tree
864/// parent-relatively (matching what `sync_meta` placed on disk),
865/// loads each child's `pack.yaml` (or synthesises a plain-git leaf),
866/// probes `head_sha`, and produces the [`PackGraph`] consumed by
867/// [`run_actions`].
868///
869/// Plan-phase validators run against the assembled graph when
870/// `validate` is true.
871fn build_and_validate_graph(
872 workspace: &Path,
873 validate: bool,
874 ref_override: Option<&str>,
875) -> Result<PackGraph, SyncError> {
876 let loader = FsPackLoader::new();
877 let backend = GixBackend::new();
878 let graph = build_graph(workspace, &backend, &loader, ref_override)?;
879 if validate {
880 validate_graph(&graph)?;
881 }
882 Ok(graph)
883}
884
885/// Load the prior lockfile (`grex.lock.jsonl`). Missing file yields an
886/// empty map; parse errors are fatal since writes are atomic and a torn
887/// lockfile therefore indicates real corruption that must be resolved
888/// before a fresh sync is safe. Parse/IO failures surface as
889/// [`SyncError::Lockfile`] — this is an I/O / schema fault, not a
890/// dependency-satisfaction fault, so it gets its own taxonomy slot.
891fn load_prior_lock(
892 lockfile_path: &Path,
893) -> Result<std::collections::HashMap<String, LockEntry>, SyncError> {
894 read_lockfile(lockfile_path)
895 .map_err(|source| SyncError::Lockfile { path: lockfile_path.to_path_buf(), source })
896}
897
898/// Persist `next_lock` atomically to `lockfile_path` whenever this was
899/// not a dry-run. On a halt the map has already had the halted pack's
900/// entry removed (see `run_actions`), so persisting now preserves every
901/// *successful* pack's fresh entry while guaranteeing absence of an
902/// entry for the halted pack — next sync sees no prior hash there and
903/// re-executes from scratch (route (b) halt-state gating). Write errors
904/// surface as non-fatal warnings on the report.
905fn persist_lockfile_if_clean(
906 report: &mut SyncReport,
907 lockfile_path: &Path,
908 next_lock: &std::collections::HashMap<String, LockEntry>,
909 dry_run: bool,
910) {
911 if dry_run {
912 return;
913 }
914 if let Err(e) = write_lockfile(lockfile_path, next_lock) {
915 tracing::warn!(target: "grex::sync", "lockfile write failed: {e}");
916 report.event_log_warnings.push(format!("{}: {e}", lockfile_path.display()));
917 }
918}
919
920/// Canonical location of the resolved-state lockfile
921/// (`<pack_root>/.grex/grex.lock.jsonl`). Colocated with the event log
922/// so both audit artifacts live under a single `.grex/` sidecar.
923fn lockfile_path(pack_root: &Path) -> PathBuf {
924 pack_root_dir(pack_root).join(".grex").join("grex.lock.jsonl")
925}
926
927/// Create the workspace directory if it does not yet exist.
928fn ensure_workspace_dir(workspace: &Path) -> Result<(), SyncError> {
929 if !workspace.exists() {
930 std::fs::create_dir_all(workspace).map_err(|e| SyncError::Validation {
931 errors: vec![PackValidationError::DependsOnUnsatisfied {
932 pack: "<workspace>".into(),
933 required: format!("{}: {e}", workspace.display()),
934 }],
935 })?;
936 }
937 Ok(())
938}
939
940/// Open (but do not acquire) the workspace-level lock file.
941fn open_workspace_lock(workspace: &Path) -> Result<(ScopedLock, PathBuf), SyncError> {
942 let ws_lock_path = workspace_lock_path(workspace);
943 let ws_lock = ScopedLock::open(&ws_lock_path)
944 .map_err(|e| workspace_lock_err(&ws_lock_path, &e.to_string()))?;
945 Ok((ws_lock, ws_lock_path))
946}
947
948/// Build a `Validation` error describing a workspace-lock failure.
949fn workspace_lock_err(ws_lock_path: &Path, reason: &str) -> SyncError {
950 SyncError::Validation {
951 errors: vec![PackValidationError::DependsOnUnsatisfied {
952 pack: "<workspace-lock>".into(),
953 required: format!("{}: {reason}", ws_lock_path.display()),
954 }],
955 }
956}
957
/// The legacy workspace directory name, relative to the pack root.
///
/// Before `v1.1.0`, `resolve_workspace` appended `.grex/workspace/` to
/// the pack root by default. The auto-migration in
/// [`migrate_legacy_workspace`] is the only code in
/// `crates/grex-core/src/` permitted to mention that legacy literal, and
/// it does so solely through this constant — the grep gate in the v1.1.0
/// release checklist allows exactly this one occurrence.
const LEGACY_WORKSPACE_DIR: &str = ".grex/workspace";
965
966/// Auto-migrate any legacy `.grex/workspace/<name>/` child layout left
967/// over from v1.0.x to the v1.1.0 flat-sibling layout. Idempotent: a
968/// fresh workspace built on v1.1.0+ sees no `.grex/workspace/`
969/// directory and the function no-ops.
970///
971/// Per-child outcomes:
972///
973/// * **Both legacy + flat-sibling exist** → `SkippedBothExist`. The
974/// user needs to inspect (perhaps the legacy is stale, perhaps it is
975/// the source of truth); we never silently delete either.
976/// * **Flat-sibling slot occupied by a non-grex file or non-empty dir**
977/// → `SkippedDestOccupied`. Refuse to clobber user data.
978/// * **Legacy exists, flat-sibling absent** → `Migrated` via atomic
979/// `fs::rename`. Same-volume move is the common case (the migration
980/// stays inside `pack_root`); cross-volume failures surface as
981/// `Failed { error }` with the OS message so the operator can move
982/// manually.
983/// * **Legacy absent** → silent no-op (not recorded in the report).
984///
985/// After all per-child decisions: orphan `.grex.sync.lock` under the
986/// legacy workspace is removed (best-effort) and the empty
987/// `.grex/workspace/` directory is rmdir'd (best-effort). Both are
988/// soft-failures: leaving them on disk is harmless, surfacing the
989/// errors as a sync abort would be over-strict.
990///
991/// Discovery is by directory listing, not by parent-manifest parse —
992/// migration must work even when the parent manifest itself was
993/// rewritten between versions. A child counts as "legacy" iff
994/// `<pack_root>/<LEGACY_WORKSPACE_DIR>/<name>/.git` exists (i.e. it is
995/// an actual git working tree, not stray metadata).
996fn migrate_legacy_workspace(pack_root: &Path) -> Vec<WorkspaceMigration> {
997 let root = pack_root_dir(pack_root);
998 let legacy_root = root.join(LEGACY_WORKSPACE_DIR);
999 if !legacy_root.is_dir() {
1000 return Vec::new();
1001 }
1002 let entries = match fs::read_dir(&legacy_root) {
1003 Ok(e) => e,
1004 Err(e) => {
1005 tracing::warn!(
1006 target: "grex::sync::migrate",
1007 "legacy workspace `{}` unreadable: {e}",
1008 legacy_root.display(),
1009 );
1010 return Vec::new();
1011 }
1012 };
1013 let mut migrations = Vec::new();
1014 for entry_result in entries {
1015 let entry = match entry_result {
1016 Ok(e) => e,
1017 Err(e) => {
1018 tracing::warn!(
1019 target: "grex::sync::migrate",
1020 "skipping unreadable entry under `{}`: {e}",
1021 legacy_root.display(),
1022 );
1023 continue;
1024 }
1025 };
1026 let Ok(ft) = entry.file_type() else { continue };
1027 // file_type avoids symlink-following; legitimate v1.0.x children
1028 // were always real directories, so anything else is skipped.
1029 if ft.is_symlink() || !ft.is_dir() {
1030 continue;
1031 }
1032 let name_os = entry.file_name();
1033 let Some(name) = name_os.to_str() else { continue };
1034 // Only act on entries that look like real cloned children (have
1035 // a `.git`). The legacy workspace lock file (`.grex.sync.lock`)
1036 // is not a directory and is filtered out by the dir check above;
1037 // we clean it up explicitly after the migration loop completes.
1038 let from_abs = entry.path();
1039 if !from_abs.join(".git").exists() {
1040 continue;
1041 }
1042 let to_abs = root.join(name);
1043 let from_rel = PathBuf::from(LEGACY_WORKSPACE_DIR).join(name);
1044 let to_rel = PathBuf::from(name);
1045 let outcome = decide_and_migrate(&from_abs, &to_abs);
1046 log_migration(&from_rel, &to_rel, &outcome);
1047 migrations.push(WorkspaceMigration { from: from_rel, to: to_rel, outcome });
1048 }
1049 cleanup_legacy_workspace_root(&legacy_root);
1050 migrations
1051}
1052
1053/// Decide what to do with one legacy child + perform the move when
1054/// safe. Returns the outcome to record on the [`WorkspaceMigration`].
1055fn decide_and_migrate(from: &Path, to: &Path) -> MigrationOutcome {
1056 let dest_exists = to.exists();
1057 let dest_is_grex_repo = dest_exists && to.join(".git").exists();
1058 if dest_is_grex_repo {
1059 // Both legacy and flat-sibling are git repos. Refuse to choose
1060 // between them; let the user resolve.
1061 return MigrationOutcome::SkippedBothExist;
1062 }
1063 if dest_exists {
1064 // Some other entry occupies the flat-sibling slot — a stray
1065 // file, an empty dir, an unrelated dir. Treat as user data and
1066 // leave both in place.
1067 return MigrationOutcome::SkippedDestOccupied;
1068 }
1069 match fs::rename(from, to) {
1070 Ok(()) => MigrationOutcome::Migrated,
1071 Err(e) => MigrationOutcome::Failed { error: e.to_string() },
1072 }
1073}
1074
1075/// Emit one structured log line per migration so users see exactly what
1076/// happened during the upgrade. Severity matches outcome: success is
1077/// `info`, skips and failures are `warn` so they surface in the default
1078/// log level without forcing operators to crank verbosity.
1079fn log_migration(from: &Path, to: &Path, outcome: &MigrationOutcome) {
1080 let from_disp = from.display();
1081 let to_disp = to.display();
1082 match outcome {
1083 MigrationOutcome::Migrated => {
1084 tracing::info!(
1085 target: "grex::sync::migrate",
1086 "migrated: legacy={from_disp} -> new={to_disp}",
1087 );
1088 }
1089 MigrationOutcome::SkippedBothExist => {
1090 tracing::warn!(
1091 target: "grex::sync::migrate",
1092 "skipped: both legacy={from_disp} and new={to_disp} exist; resolve manually",
1093 );
1094 }
1095 MigrationOutcome::SkippedDestOccupied => {
1096 tracing::warn!(
1097 target: "grex::sync::migrate",
1098 "skipped: destination={to_disp} occupied; leaving legacy={from_disp} in place",
1099 );
1100 }
1101 MigrationOutcome::Failed { error } => {
1102 tracing::warn!(
1103 target: "grex::sync::migrate",
1104 "failed: legacy={from_disp} -> new={to_disp}: {error}",
1105 );
1106 }
1107 }
1108}
1109
1110/// Best-effort cleanup of the legacy workspace root after migration:
1111/// remove the orphan `.grex.sync.lock` (always safe — the v1.1.0
1112/// workspace lock lives at `<pack_root>/.grex.sync.lock`) and try to
1113/// rmdir the now-empty `.grex/workspace/` directory. Errors are logged
1114/// at trace level only — both leftovers are harmless.
1115fn cleanup_legacy_workspace_root(legacy_root: &Path) {
1116 let orphan_lock = legacy_root.join(".grex.sync.lock");
1117 if orphan_lock.exists() {
1118 if let Err(e) = fs::remove_file(&orphan_lock) {
1119 tracing::warn!(
1120 target: "grex::sync::migrate",
1121 "could not remove orphan lock `{}`: {e}",
1122 orphan_lock.display(),
1123 );
1124 } else {
1125 tracing::info!(
1126 target: "grex::sync::migrate",
1127 "removed orphan lock `{}`",
1128 orphan_lock.display(),
1129 );
1130 }
1131 }
1132 // `remove_dir` only succeeds when the directory is empty — exactly
1133 // what we want; if any unmigrated child remains, the legacy root
1134 // stays put for the operator to inspect.
1135 let _ = fs::remove_dir(legacy_root);
1136}
1137
1138/// Compute the default workspace path when `override_` is absent.
1139///
1140/// The default is the pack root directory itself, so child packs
1141/// resolve as flat siblings of the parent pack root. The rationale —
1142/// alignment with the long-standing pack-spec rule that
1143/// `children[].path` is a bare name — lives in the pack-spec
1144/// "Validation rules" section (`man/concepts/pack-spec.md` /
1145/// `grex-doc/src/concepts/pack-spec.md`).
1146/// v1.2.1 path (iii) — resolve the workspace anchor with canonical
1147/// symlink resolution.
1148///
1149/// Resolution rules:
1150/// * `override_ = None` ⇒ derive workspace from `pack_root_dir(pack_root)`.
1151/// No canonicalize on this branch — the pack-root path was supplied
1152/// directly by the caller and may legitimately reference a not-yet-real
1153/// directory (e.g. integration fixtures that lazily materialise the
1154/// pack root).
1155/// * `override_ = Some(path)`:
1156/// 1. **Must-exist** check. A `--workspace` override pointing at a
1157/// non-existent directory is a fail-fast error (we won't silently
1158/// `mkdir -p` someone else's typo).
1159/// 2. **Canonicalise.** Resolve symlinks to a real path. This is the
1160/// anchor every downstream pass (`sync_meta`, `build_graph`, the
1161/// lockfile reads, the TOCTOU `BoundedDir` opens) hangs off — they
1162/// MUST agree on a single inode-stable string.
1163/// 3. **Log when input != canonical.** Surfaces symlink resolution to
1164/// operators so they can correlate workspace-busy diagnostics with
1165/// what the OS actually opened.
1166fn resolve_workspace(pack_root: &Path, override_: Option<&Path>) -> Result<PathBuf, SyncError> {
1167 let Some(input) = override_ else {
1168 return Ok(pack_root_dir(pack_root));
1169 };
1170 if !input.exists() {
1171 return Err(SyncError::Validation {
1172 errors: vec![PackValidationError::DependsOnUnsatisfied {
1173 pack: "<workspace>".into(),
1174 required: format!("--workspace {}: directory does not exist", input.display()),
1175 }],
1176 });
1177 }
1178 let canonical = match input.canonicalize() {
1179 Ok(p) => p,
1180 Err(e) => {
1181 return Err(SyncError::Validation {
1182 errors: vec![PackValidationError::DependsOnUnsatisfied {
1183 pack: "<workspace>".into(),
1184 required: format!("--workspace {}: canonicalize failed: {e}", input.display()),
1185 }],
1186 });
1187 }
1188 };
1189 if canonical != input {
1190 tracing::info!(
1191 target: "grex::sync",
1192 "workspace: {} → {}",
1193 input.display(),
1194 canonical.display(),
1195 );
1196 }
1197 Ok(canonical)
1198}
1199
1200/// Resolve the workspace, ensure the directory exists, and run the v1→v2
1201/// event-log migration. Extracted so [`run`] and [`teardown`] stay under
1202/// the workspace's 50-LOC per-function lint threshold.
1203fn prepare_workspace(pack_root: &Path, opts: &SyncOptions) -> Result<PathBuf, SyncError> {
1204 let workspace = resolve_workspace(pack_root, opts.workspace.as_deref())?;
1205 ensure_workspace_dir(&workspace)?;
1206 crate::manifest::ensure_event_log_migrated(&workspace).map_err(SyncError::EventLogMigration)?;
1207 Ok(workspace)
1208}
1209
/// Normalise a `pack_root` argument to the pack's root *directory*.
///
/// * If `pack_root` has a `.yaml` / `.yml` extension it is treated as a
///   manifest file and the directory **two levels up** is returned — the
///   parent of the directory containing the file. Other docs in this
///   file place the manifest at `<root>/.grex/pack.yaml`, so this yields
///   `<root>`.
/// * Any other path is returned unchanged.
///
/// When the manifest path is too short to have a non-empty grandparent
/// (`pack.yaml`, `.grex/pack.yaml`), the current directory `.` is
/// returned. `Path::parent` reports `Some("")` for a single-component
/// relative path, so the empty case is folded into the same fallback
/// rather than handing callers an empty `PathBuf`, which does not
/// resolve as a directory.
fn pack_root_dir(pack_root: &Path) -> PathBuf {
    let is_yaml = matches!(pack_root.extension().and_then(|e| e.to_str()), Some("yaml" | "yml"));
    if !is_yaml {
        return pack_root.to_path_buf();
    }
    match pack_root.parent().and_then(Path::parent) {
        Some(dir) if !dir.as_os_str().is_empty() => dir.to_path_buf(),
        // No grandparent, or an empty one: the pack root is the cwd.
        _ => PathBuf::from("."),
    }
}
1222
1223/// Compute the `.grex/events.jsonl` path next to the pack root.
1224///
1225/// Delegates to [`crate::manifest::event_log_path`] (single source of
1226/// truth for the canonical event-log location).
1227fn event_log_path(pack_root: &Path) -> PathBuf {
1228 crate::manifest::event_log_path(&pack_root_dir(pack_root))
1229}
1230
/// Path of the `.grex.lock` sidecar that sits beside the event log.
///
/// There is one canonical slot per pack root; cooperating grex processes
/// serialise through this file. If `event_log` has no parent, the bare
/// relative name `.grex.lock` is returned.
fn event_lock_path(event_log: &Path) -> PathBuf {
    const LOCK_NAME: &str = ".grex.lock";
    match event_log.parent() {
        Some(dir) => dir.join(LOCK_NAME),
        None => PathBuf::from(LOCK_NAME),
    }
}
1236
/// Path of the workspace's own lock sidecar:
/// `<workspace>/.grex.sync.lock`.
///
/// The `run()` prologue has already created the workspace directory, so
/// the lock file ends up beside the child clones.
fn workspace_lock_path(workspace: &Path) -> PathBuf {
    let mut lock = workspace.to_path_buf();
    lock.push(".grex.sync.lock");
    lock
}
1243
1244/// Aggregate manifest-level + graph-level validators and return their output.
1245fn validate_graph(graph: &PackGraph) -> Result<(), SyncError> {
1246 let mut errors: Vec<PackValidationError> = Vec::new();
1247 for node in graph.nodes() {
1248 if let Err(mut e) = node.manifest.validate_plan() {
1249 errors.append(&mut e);
1250 }
1251 }
1252 if let Err(mut e) = graph.validate() {
1253 errors.append(&mut e);
1254 }
1255 if errors.is_empty() {
1256 Ok(())
1257 } else {
1258 Err(SyncError::Validation { errors })
1259 }
1260}
1261
1262/// Depth-first post-order traversal of the graph starting from root.
1263///
1264/// Children fully precede their parent in the returned vector so downstream
1265/// executors install leaves first and the root last.
1266fn post_order(graph: &PackGraph) -> Vec<usize> {
1267 let mut out = Vec::with_capacity(graph.nodes().len());
1268 visit_post(graph, 0, &mut out);
1269 out
1270}
1271
1272fn visit_post(graph: &PackGraph, id: usize, out: &mut Vec<usize>) {
1273 // Collect child ids first to avoid borrow conflicts with graph iteration.
1274 let kids: Vec<usize> = graph.children_of(id).map(|n| n.id).collect();
1275 for k in kids {
1276 visit_post(graph, k, out);
1277 }
1278 out.push(id);
1279}
1280
1281/// Drive every action for every node; abort on the first [`ExecError`].
1282///
1283/// Each action is bracketed by three manifest events:
1284/// 1. [`Event::ActionStarted`] — appended **before** `execute` returns.
1285/// 2. [`Event::ActionCompleted`] — appended on `Ok(step)`.
1286/// 3. [`Event::ActionHalted`] — appended on `Err(e)` before returning.
1287///
1288/// All three writes go through the same [`ManifestLock`]-wrapped path
1289/// ([`append_manifest_event`]) and failures are recorded as non-fatal
1290/// warnings so the executor's outcome always dominates. The third append
1291/// (`ActionHalted`) lets a future `grex doctor` correlate crash recovery
1292/// with the exact action that halted.
1293// feat-m6 B1 wiring added `parallel` + `scheduler` args; the signature
1294// now pushes past the 50-LOC per-function lint by one line. Silence
1295// that one — the body itself is unchanged in scope.
1296#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
1297fn run_actions(
1298 report: &mut SyncReport,
1299 order: &[usize],
1300 vars: &VarEnv,
1301 workspace: &Path,
1302 event_log: &Path,
1303 lock_path: &Path,
1304 dry_run: bool,
1305 prior_lock: &std::collections::HashMap<String, LockEntry>,
1306 next_lock: &mut std::collections::HashMap<String, LockEntry>,
1307 registry: &Arc<Registry>,
1308 pack_type_registry: &Arc<PackTypeRegistry>,
1309 only: Option<&GlobSet>,
1310 force: bool,
1311 parallel: usize,
1312 scheduler: &Arc<Scheduler>,
1313) {
1314 let plan = PlanExecutor::with_registry(registry.clone());
1315 let fs = FsExecutor::with_registry(registry.clone());
1316 let rt = build_pack_type_runtime(parallel);
1317 let visited_meta = new_visited_meta();
1318 for &id in order {
1319 let Some(node) = report.graph.node(id) else { continue };
1320 let pack_name = node.name.clone();
1321 let pack_path = node.path.clone();
1322 let actions = node.manifest.actions.clone();
1323 let manifest = node.manifest.clone();
1324 let commit_sha = node.commit_sha.clone().unwrap_or_default();
1325 let synthetic = node.synthetic;
1326 // `--only` filter + skip-on-hash short-circuits colocated in
1327 // `try_skip_or_filter` so this outer loop stays within the
1328 // 50-LOC per-function budget.
1329 if try_skip_or_filter(
1330 report,
1331 only,
1332 &pack_name,
1333 &pack_path,
1334 &actions,
1335 &commit_sha,
1336 synthetic,
1337 workspace,
1338 prior_lock,
1339 next_lock,
1340 dry_run,
1341 force,
1342 ) {
1343 continue;
1344 }
1345 let pack_halted = run_pack_lifecycle(
1346 report,
1347 vars,
1348 workspace,
1349 event_log,
1350 lock_path,
1351 dry_run,
1352 &plan,
1353 &fs,
1354 registry,
1355 pack_type_registry,
1356 &rt,
1357 &pack_name,
1358 &pack_path,
1359 &manifest,
1360 &visited_meta,
1361 scheduler,
1362 );
1363 if pack_halted {
1364 // Route (b) halt-state gating: drop any prior entry for the
1365 // halted pack so the next sync sees no prior hash and
1366 // re-executes from scratch. Successful packs in this same
1367 // run keep their freshly-upserted entries, and packs we did
1368 // not reach keep their prior entries untouched.
1369 next_lock.remove(&pack_name);
1370 return;
1371 }
1372 // Successful pack — record a fresh lockfile entry so the next
1373 // run's skip-on-hash test can succeed. Commit SHA is now plumbed
1374 // from the walker (M4-D): `PackNode::commit_sha` carries the
1375 // resolved HEAD SHA when the pack's working tree is a git
1376 // repository, otherwise an empty string keeps the hash stable.
1377 let actions_hash = compute_actions_hash(&actions, &commit_sha);
1378 upsert_lock_entry(prior_lock, next_lock, &pack_name, &commit_sha, &actions_hash, synthetic);
1379 }
1380}
1381
1382/// Build the multi-thread tokio runtime used to drive async pack-type
1383/// plugin dispatch. Pack-type plugins expose `async fn` methods via
1384/// `async_trait`, but the sync driver is synchronous end-to-end — we
1385/// block on each plugin future inside the outer action loop. Extracted
1386/// into a standalone helper so the runtime construction does not
1387/// inflate `run_actions` beyond the 50-LOC per-function budget.
1388///
1389/// # Multi-thread rationale (M5-2c)
1390///
1391/// M5-2c enabled real [`crate::plugin::pack_type::MetaPlugin`] recursion
1392/// through [`crate::execute::ExecCtx::pack_type_registry`]. The recursion
1393/// itself is purely `async` / `.await` (no nested `block_on`), but future
1394/// plugin authors may reasonably compose `block_on` calls inside
1395/// lifecycle hooks — and external callers that drive `MetaPlugin` via
1396/// `rt.block_on(...)` within their own runtime would deadlock on a
1397/// current-thread runtime the moment a hook re-enters. A multi-thread
1398/// runtime with a small worker pool lets those re-entries resolve on a
1399/// sibling worker instead of blocking the dispatcher thread.
1400///
1401/// # Worker-thread sizing (feat-m6 H6)
1402///
1403/// The worker pool is sized from the resolved `--parallel` knob so the
1404/// runtime always has enough workers to service every in-flight pack op
1405/// plus at least one sibling for nested `block_on`. Clamped to
1406/// `[2, num_cpus::get()]`: `2` preserves the pre-M6 floor (one driver +
1407/// one sibling so re-entrant hooks never deadlock), and the upper bound
1408/// caps the pool at the host's CPU count so `--parallel 0`
1409/// (unbounded-semantics) does not explode the worker count.
1410fn build_pack_type_runtime(parallel: usize) -> tokio::runtime::Runtime {
1411 let workers = parallel.clamp(2, num_cpus::get().max(2));
1412 tokio::runtime::Builder::new_multi_thread()
1413 .worker_threads(workers)
1414 .enable_all()
1415 .build()
1416 .expect("tokio runtime for pack-type dispatch")
1417}
1418
1419/// Construct a fresh [`MetaVisitedSet`] for one sync run. Walker-driven
1420/// dispatch does not attach it (see `dispatch_pack_type_plugin`), but
1421/// the argument is threaded through so future explicit-install /
1422/// teardown verbs can share the same set shape.
1423fn new_visited_meta() -> MetaVisitedSet {
1424 std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashSet::new()))
1425}
1426
1427/// Combined short-circuit helper: `--only` filter + skip-on-hash. Returns
1428/// `true` when the outer loop should `continue` for this pack.
1429///
1430/// Extracted from `run_actions` so that function stays under the
1431/// workspace's 50-LOC per-function lint. Semantics are unchanged; this
1432/// is a pure structural refactor.
1433#[allow(clippy::too_many_arguments)]
1434fn try_skip_or_filter(
1435 report: &mut SyncReport,
1436 only: Option<&GlobSet>,
1437 pack_name: &str,
1438 pack_path: &Path,
1439 actions: &[Action],
1440 commit_sha: &str,
1441 current_synthetic: bool,
1442 workspace: &Path,
1443 prior_lock: &std::collections::HashMap<String, LockEntry>,
1444 next_lock: &mut std::collections::HashMap<String, LockEntry>,
1445 dry_run: bool,
1446 force: bool,
1447) -> bool {
1448 if skip_for_only_filter(only, pack_name, pack_path, workspace) {
1449 if let Some(prev) = prior_lock.get(pack_name) {
1450 next_lock.insert(pack_name.to_string(), prev.clone());
1451 }
1452 return true;
1453 }
1454 try_skip_pack(
1455 report,
1456 pack_name,
1457 pack_path,
1458 actions,
1459 commit_sha,
1460 current_synthetic,
1461 prior_lock,
1462 next_lock,
1463 dry_run,
1464 force,
1465 )
1466}
1467
1468/// Return `true` when `--only` is active and the pack's
1469/// **workspace-relative path** (normalized to forward-slash form) does
1470/// not match any of the registered globs. Name-fallback matching was
1471/// dropped in the M4-D post-review fix bundle: spec §M4 req 6 says
1472/// "pack paths" and cross-platform consistency requires a single
1473/// normalized representation rather than `display()`-formatted strings
1474/// (which use `\\` on Windows and `/` on POSIX — globset treats `\\`
1475/// as a glob-escape, not a path separator). For the root pack whose
1476/// `pack_path` is not under `workspace`, the fallback is to match
1477/// against the absolute path's forward-slash form.
1478fn skip_for_only_filter(
1479 only: Option<&GlobSet>,
1480 pack_name: &str,
1481 pack_path: &Path,
1482 workspace: &Path,
1483) -> bool {
1484 let Some(set) = only else { return false };
1485 let rel = pack_path.strip_prefix(workspace).unwrap_or(pack_path);
1486 let rel_str = rel.to_string_lossy().replace('\\', "/");
1487 let matches = set.is_match(&rel_str);
1488 if !matches {
1489 tracing::info!(
1490 target: "grex::sync",
1491 "skipping pack `{pack_name}` (rel path `{rel_str}`): does not match --only filter"
1492 );
1493 }
1494 !matches
1495}
1496
1497/// Per-pack lifecycle dispatch. Returns `true` when the sync must halt.
1498///
1499/// M5-1 Stage C replaces the blind `for action in manifest.actions` loop
1500/// with a pack-type-aware dispatch:
1501///
1502/// * [`PackType::Declarative`] retains the per-action execution shape that
1503/// M4 shipped — each action lands its own `ActionStarted` /
1504/// `ActionCompleted` / `ActionHalted` event bracket. The registry is
1505/// still consulted via [`PackTypeRegistry::get`] as a name-oracle so
1506/// mistyped packs fail closed.
1507/// * [`PackType::Meta`] / [`PackType::Scripted`] dispatch once through the
1508/// pack-type plugin's `sync` method (the sync CLI verb is the only
1509/// caller in M5-1; `install` / `update` / `teardown` verbs wire in
1510/// M5-2), returning a single aggregate [`ExecStep`]. A single event
1511/// bracket frames the async call.
1512///
1513/// Declarative is kept on the legacy per-action path because its event log
1514/// semantics (one event per action, per-step rollback context) are exactly
1515/// what plugin authors expect to observe. Unifying declarative under the
1516/// plugin dispatch is M5-2 scope — it requires reshaping the trait surface
1517/// to emit a step stream rather than a single aggregate.
1518#[allow(clippy::too_many_arguments)]
1519fn run_pack_lifecycle(
1520 report: &mut SyncReport,
1521 vars: &VarEnv,
1522 workspace: &Path,
1523 event_log: &Path,
1524 lock_path: &Path,
1525 dry_run: bool,
1526 plan: &PlanExecutor,
1527 fs: &FsExecutor,
1528 registry: &Arc<Registry>,
1529 pack_type_registry: &Arc<PackTypeRegistry>,
1530 rt: &tokio::runtime::Runtime,
1531 pack_name: &str,
1532 pack_path: &Path,
1533 manifest: &crate::pack::PackManifest,
1534 visited_meta: &MetaVisitedSet,
1535 scheduler: &Arc<Scheduler>,
1536) -> bool {
1537 let type_tag = manifest.r#type.as_str();
1538 // Name-oracle check: every pack type must be registered. Unknown
1539 // pack types halt the pack the same way M4 halted unknown actions.
1540 if pack_type_registry.get(type_tag).is_none() {
1541 let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
1542 record_action_err(report, event_log, lock_path, pack_name, 0, "pack-type", err);
1543 return true;
1544 }
1545 match manifest.r#type {
1546 crate::pack::PackType::Declarative => run_declarative_actions(
1547 report,
1548 vars,
1549 workspace,
1550 event_log,
1551 lock_path,
1552 dry_run,
1553 plan,
1554 fs,
1555 pack_name,
1556 pack_path,
1557 manifest,
1558 &manifest.actions,
1559 scheduler,
1560 ),
1561 crate::pack::PackType::Meta | crate::pack::PackType::Scripted => dispatch_pack_type_plugin(
1562 report,
1563 vars,
1564 workspace,
1565 event_log,
1566 lock_path,
1567 registry,
1568 pack_type_registry,
1569 rt,
1570 pack_name,
1571 pack_path,
1572 manifest,
1573 type_tag,
1574 visited_meta,
1575 scheduler,
1576 ),
1577 }
1578}
1579
1580/// Run a declarative pack's actions sequentially. Preserves the M4
1581/// per-action event-log bracket (`ActionStarted` → `ActionCompleted` |
1582/// `ActionHalted`). Returns `true` when the sync must halt.
1583#[allow(clippy::too_many_arguments)]
1584fn run_declarative_actions(
1585 report: &mut SyncReport,
1586 vars: &VarEnv,
1587 workspace: &Path,
1588 event_log: &Path,
1589 lock_path: &Path,
1590 dry_run: bool,
1591 plan: &PlanExecutor,
1592 fs: &FsExecutor,
1593 pack_name: &str,
1594 pack_path: &Path,
1595 manifest: &crate::pack::PackManifest,
1596 actions: &[Action],
1597 scheduler: &Arc<Scheduler>,
1598) -> bool {
1599 // `apply_gitignore` is called per-lifecycle by each PackTypePlugin
1600 // for meta/scripted, and here for declarative (which bypasses the
1601 // plugin in `sync::run`'s per-action driver). Keeping plugins as
1602 // the single apply site everywhere else means the declarative
1603 // per-action path is the only code outside the PackTypePlugin
1604 // surface that needs a direct apply call.
1605 if !dry_run {
1606 let ctx = ExecCtx::new(vars, pack_path, workspace)
1607 .with_platform(Platform::current())
1608 .with_scheduler(scheduler);
1609 if let Err(e) = crate::plugin::pack_type::apply_gitignore(&ctx, manifest) {
1610 record_action_err(report, event_log, lock_path, pack_name, 0, "gitignore", e);
1611 return true;
1612 }
1613 }
1614 for (idx, action) in actions.iter().enumerate() {
1615 let ctx = ExecCtx::new(vars, pack_path, workspace)
1616 .with_platform(Platform::current())
1617 .with_scheduler(scheduler);
1618 let action_tag = action_kind_tag(action);
1619 append_manifest_event(
1620 event_log,
1621 lock_path,
1622 &Event::ActionStarted {
1623 ts: Utc::now(),
1624 pack: pack_name.to_string(),
1625 action_idx: idx,
1626 action_name: action_tag.to_string(),
1627 },
1628 &mut report.event_log_warnings,
1629 );
1630 let step_result =
1631 if dry_run { plan.execute(action, &ctx) } else { fs.execute(action, &ctx) };
1632 if !record_action_outcome(
1633 report,
1634 event_log,
1635 lock_path,
1636 pack_name,
1637 idx,
1638 action_tag,
1639 step_result,
1640 ) {
1641 return true;
1642 }
1643 }
1644 false
1645}
1646
1647/// Dispatch a pack-type plugin (meta / scripted) through the async
1648/// registry. Brackets the call with a single `ActionStarted` /
1649/// `ActionCompleted` / `ActionHalted` trio at index 0. Returns `true`
1650/// when the sync must halt.
1651#[allow(clippy::too_many_arguments)]
1652fn dispatch_pack_type_plugin(
1653 report: &mut SyncReport,
1654 vars: &VarEnv,
1655 workspace: &Path,
1656 event_log: &Path,
1657 lock_path: &Path,
1658 registry: &Arc<Registry>,
1659 pack_type_registry: &Arc<PackTypeRegistry>,
1660 rt: &tokio::runtime::Runtime,
1661 pack_name: &str,
1662 pack_path: &Path,
1663 manifest: &crate::pack::PackManifest,
1664 type_tag: &'static str,
1665 visited_meta: &MetaVisitedSet,
1666 scheduler: &Arc<Scheduler>,
1667) -> bool {
1668 // NB: `visited_meta` is intentionally NOT attached to the ctx here.
1669 // The sync driver already walks children in post-order via the tree
1670 // walker; attaching the visited set would trigger MetaPlugin's
1671 // real-recursion branch and cause double dispatch (walker runs child
1672 // packs as their own graph nodes, then MetaPlugin would recurse into
1673 // them again). The `visited_meta` parameter is kept on the argument
1674 // list so future explicit-install / teardown verbs that invoke
1675 // MetaPlugin directly can share the same set shape.
1676 let _ = visited_meta;
1677 let ctx = ExecCtx::new(vars, pack_path, workspace)
1678 .with_platform(Platform::current())
1679 .with_registry(registry)
1680 .with_pack_type_registry(pack_type_registry)
1681 .with_scheduler(scheduler);
1682 append_manifest_event(
1683 event_log,
1684 lock_path,
1685 &Event::ActionStarted {
1686 ts: Utc::now(),
1687 pack: pack_name.to_string(),
1688 action_idx: 0,
1689 action_name: type_tag.to_string(),
1690 },
1691 &mut report.event_log_warnings,
1692 );
1693 // SAFETY: `get` just confirmed the plugin is registered for
1694 // `type_tag`, so this unwrap cannot panic under the matched arm.
1695 let plugin = pack_type_registry
1696 .get(type_tag)
1697 .expect("pack-type plugin must be registered (guarded above)");
1698 // feat-m6 CI fix — establish a task-local tier stack frame for every
1699 // async dispatch. Without this, `TierGuard::push` (which runs inside
1700 // the plugin lifecycle and may span `.await` / thread hops under the
1701 // multi-thread runtime) has no enforcement frame to push into.
1702 let step_result = rt.block_on(crate::pack_lock::with_tier_scope(plugin.sync(&ctx, manifest)));
1703 !record_action_outcome(report, event_log, lock_path, pack_name, 0, type_tag, step_result)
1704}
1705
1706/// Pure skip-eligibility decision. Returns `Some(hash)` when the pack
1707/// is eligible for the hash-skip short-circuit, `None` otherwise.
1708///
1709/// Splitting the decision out of [`try_skip_pack`] keeps the
1710/// side-effecting transcript bookkeeping testable in isolation: the
1711/// v1.1.1 synthetic-flag-flip regression exercises this helper without
1712/// having to stand up a `SyncReport` / `PackGraph`.
1713fn skip_eligibility(
1714 actions: &[Action],
1715 commit_sha: &str,
1716 current_synthetic: bool,
1717 prior: &LockEntry,
1718 dry_run: bool,
1719 force: bool,
1720) -> Option<String> {
1721 if dry_run || force {
1722 // Dry runs must always produce the planned-step transcript so
1723 // authors can see what `sync` *would* do. `--force` is the
1724 // operator's explicit opt-out from the hash short-circuit.
1725 return None;
1726 }
1727 let hash = compute_actions_hash(actions, commit_sha);
1728 if prior.actions_hash != hash {
1729 return None;
1730 }
1731 if prior.synthetic != current_synthetic {
1732 // Pack-shape flipped between runs (real ↔ synthetic). Even
1733 // when the actions hash matches by coincidence (e.g. a
1734 // declarative pack with empty `actions[]` whose pack.yaml was
1735 // deleted, falling through to a synthetic leaf with the same
1736 // empty actions list and stable commit SHA), we must NOT
1737 // carry the stale `synthetic` flag forward. Forcing the
1738 // upsert path re-emits the entry with the current flag.
1739 return None;
1740 }
1741 Some(hash)
1742}
1743
1744/// Decide whether `pack_name` can be short-circuited via a lockfile
1745/// hash match. When the prior hash matches the freshly-computed hash,
1746/// emit a single [`ExecResult::Skipped`] step and carry the prior
1747/// lockfile entry forward unchanged. Returns `true` when the pack was
1748/// skipped.
1749///
1750/// `current_synthetic` is the walker-derived synthetic flag for this
1751/// pack on the current run. The skip eligibility check requires it to
1752/// match `prior.synthetic` so a pack-shape transition (e.g. user
1753/// deletes `pack.yaml` so a previously-real pack now walks as
1754/// synthetic) invalidates the skip and forces the lockfile entry to
1755/// be re-emitted with the fresh `synthetic` value.
1756#[allow(clippy::too_many_arguments)]
1757fn try_skip_pack(
1758 report: &mut SyncReport,
1759 pack_name: &str,
1760 pack_path: &Path,
1761 actions: &[Action],
1762 commit_sha: &str,
1763 current_synthetic: bool,
1764 prior_lock: &std::collections::HashMap<String, LockEntry>,
1765 next_lock: &mut std::collections::HashMap<String, LockEntry>,
1766 dry_run: bool,
1767 force: bool,
1768) -> bool {
1769 let Some(prior) = prior_lock.get(pack_name) else {
1770 return false;
1771 };
1772 let Some(hash) =
1773 skip_eligibility(actions, commit_sha, current_synthetic, prior, dry_run, force)
1774 else {
1775 return false;
1776 };
1777 let skipped_step = ExecStep {
1778 action_name: Cow::Borrowed("pack"),
1779 result: ExecResult::Skipped {
1780 pack_path: pack_path.to_path_buf(),
1781 actions_hash: hash.clone(),
1782 },
1783 // W4 landed `StepKind::PackSkipped` as the dedicated pack-level
1784 // short-circuit detail; we use it here instead of the prior
1785 // `Require { Satisfied, Skip }` proxy so renderers and consumers
1786 // can match on a single, purpose-built variant.
1787 details: StepKind::PackSkipped { actions_hash: hash },
1788 };
1789 report.steps.push(SyncStep {
1790 pack: pack_name.to_string(),
1791 action_idx: 0,
1792 exec_step: skipped_step,
1793 });
1794 // Carry the prior entry forward so the next-lock snapshot stays
1795 // consistent with what's on disk.
1796 next_lock.insert(pack_name.to_string(), prior.clone());
1797 true
1798}
1799
1800/// Insert or update a lockfile entry for `pack_name` with `actions_hash`.
1801///
1802/// Stores `commit_sha` verbatim — including the empty string when the
1803/// pack is not a git working tree or the HEAD probe failed.
1804/// `actions_hash` is computed over the same `commit_sha`, so the two
1805/// fields stay internally consistent: if probing starts returning a
1806/// non-empty SHA on the next run, the hash differs and the skip is
1807/// correctly invalidated. The prior-preserve carve-out that was
1808/// introduced in M4-D was unsound (hash-vs-sha drift) and is removed
1809/// by the M4-D post-review fix bundle; see spec §M4 req 4a.
1810///
1811/// `prior_lock` is consulted purely for observability: when a
1812/// previously-real pack flips to synthetic between runs (user deleted
1813/// the pack's `pack.yaml` so the walker fell back to v1.1.1
1814/// plain-git-child synthesis), a `tracing::warn!` records the
1815/// downgrade so the operator notices their declarative actions have
1816/// stopped running.
1817fn upsert_lock_entry(
1818 prior_lock: &std::collections::HashMap<String, LockEntry>,
1819 next_lock: &mut std::collections::HashMap<String, LockEntry>,
1820 pack_name: &str,
1821 commit_sha: &str,
1822 actions_hash: &str,
1823 synthetic: bool,
1824) {
1825 if synthetic {
1826 if let Some(prior) = prior_lock.get(pack_name) {
1827 if !prior.synthetic {
1828 tracing::warn!(
1829 target: "grex::sync",
1830 pack = pack_name,
1831 "pack `{pack_name}` downgraded from real to synthetic — \
1832 pack.yaml missing on disk; only `git pull` will run going forward",
1833 );
1834 }
1835 }
1836 }
1837 let installed_at = Utc::now();
1838 let entry = next_lock.get(pack_name).map_or_else(
1839 || LockEntry {
1840 id: pack_name.to_string(),
1841 // v1.1.1 convention: path == id (1:1 id↔folder). Stage 1.e
1842 // (walker rewrite) will replace this with the parent-relative
1843 // manifest path captured during the walk.
1844 path: pack_name.to_string(),
1845 sha: commit_sha.to_string(),
1846 branch: String::new(),
1847 installed_at,
1848 actions_hash: actions_hash.to_string(),
1849 schema_version: "1".to_string(),
1850 synthetic,
1851 },
1852 |prev| LockEntry {
1853 installed_at,
1854 actions_hash: actions_hash.to_string(),
1855 sha: commit_sha.to_string(),
1856 synthetic,
1857 ..prev.clone()
1858 },
1859 );
1860 next_lock.insert(pack_name.to_string(), entry);
1861}
1862
1863/// Record one action outcome into `report` + event log. Returns `false`
1864/// when the run must halt (on error); `true` otherwise.
1865fn record_action_outcome(
1866 report: &mut SyncReport,
1867 event_log: &Path,
1868 lock_path: &Path,
1869 pack_name: &str,
1870 idx: usize,
1871 action_tag: &'static str,
1872 step_result: Result<ExecStep, ExecError>,
1873) -> bool {
1874 match step_result {
1875 Ok(step) => {
1876 record_action_ok(report, event_log, lock_path, pack_name, idx, step);
1877 true
1878 }
1879 Err(e) => {
1880 record_action_err(report, event_log, lock_path, pack_name, idx, action_tag, e);
1881 false
1882 }
1883 }
1884}
1885
1886/// Success-path bookkeeping: emit legacy `Sync` summary + `ActionCompleted`
1887/// audit event, then push the step onto the report.
1888fn record_action_ok(
1889 report: &mut SyncReport,
1890 event_log: &Path,
1891 lock_path: &Path,
1892 pack_name: &str,
1893 idx: usize,
1894 step: ExecStep,
1895) {
1896 append_step_event(event_log, lock_path, pack_name, &step, &mut report.event_log_warnings);
1897 append_manifest_event(
1898 event_log,
1899 lock_path,
1900 &Event::ActionCompleted {
1901 ts: Utc::now(),
1902 pack: pack_name.to_string(),
1903 action_idx: idx,
1904 result_summary: format!("{:?}", step.result),
1905 },
1906 &mut report.event_log_warnings,
1907 );
1908 report.steps.push(SyncStep { pack: pack_name.to_string(), action_idx: idx, exec_step: step });
1909}
1910
1911/// Halt-path bookkeeping: emit `ActionHalted` audit event, then stash the
1912/// rich `HaltedContext` into `report.halted`.
1913fn record_action_err(
1914 report: &mut SyncReport,
1915 event_log: &Path,
1916 lock_path: &Path,
1917 pack_name: &str,
1918 idx: usize,
1919 action_tag: &'static str,
1920 e: ExecError,
1921) {
1922 let error_summary = truncate_error_summary(&e);
1923 append_manifest_event(
1924 event_log,
1925 lock_path,
1926 &Event::ActionHalted {
1927 ts: Utc::now(),
1928 pack: pack_name.to_string(),
1929 action_idx: idx,
1930 action_name: action_tag.to_string(),
1931 error_summary,
1932 },
1933 &mut report.event_log_warnings,
1934 );
1935 let recovery_hint = recovery_hint_for(&e);
1936 report.halted = Some(SyncError::Halted(Box::new(HaltedContext {
1937 pack: pack_name.to_string(),
1938 action_idx: idx,
1939 action_name: action_tag.to_string(),
1940 error: e,
1941 recovery_hint,
1942 })));
1943}
1944
1945/// Short stable kind-tag for an [`crate::pack::Action`]. Mirrors the
1946/// `ACTION_*` constants used by [`crate::execute::step`] so the audit log
1947/// stays uniform.
1948fn action_kind_tag(action: &crate::pack::Action) -> &'static str {
1949 use crate::pack::Action;
1950 match action {
1951 Action::Symlink(_) => "symlink",
1952 Action::Unlink(_) => "unlink",
1953 Action::Env(_) => "env",
1954 Action::Mkdir(_) => "mkdir",
1955 Action::Rmdir(_) => "rmdir",
1956 Action::Require(_) => "require",
1957 Action::When(_) => "when",
1958 Action::Exec(_) => "exec",
1959 }
1960}
1961
1962/// Produce a bounded human summary of an [`ExecError`] for
1963/// [`Event::ActionHalted::error_summary`]. Keeps the written JSONL line
1964/// from pathological blowup when captured stderr is large.
1965fn truncate_error_summary(err: &ExecError) -> String {
1966 let mut s = err.to_string();
1967 if s.len() > ACTION_ERROR_SUMMARY_MAX {
1968 s.truncate(ACTION_ERROR_SUMMARY_MAX);
1969 s.push_str("…[truncated]");
1970 }
1971 s
1972}
1973
1974/// Best-effort recovery hint for common [`ExecError`] shapes. Returns
1975/// `None` when no generic advice applies; the error's own `Display`
1976/// output is already shown by the `Halted` variant's format string.
1977fn recovery_hint_for(err: &ExecError) -> Option<String> {
1978 match err {
1979 ExecError::SymlinkDestOccupied { .. } => Some(
1980 "set `backup: true` on the symlink action, or remove the conflicting entry by hand"
1981 .into(),
1982 ),
1983 ExecError::SymlinkPrivilegeDenied { .. } => {
1984 Some("enable Windows Developer Mode or re-run grex as administrator".into())
1985 }
1986 ExecError::SymlinkCreateAfterBackupFailed { backup, .. } => {
1987 Some(format!("backup left at `{}`; restore manually then re-run", backup.display()))
1988 }
1989 ExecError::RmdirNotEmpty { .. } => {
1990 Some("set `force: true` on the rmdir action to recurse".into())
1991 }
1992 ExecError::EnvPersistenceDenied { .. } => {
1993 Some("re-run elevated (Machine scope needs admin)".into())
1994 }
1995 _ => None,
1996 }
1997}
1998
1999/// Append one [`Event::Sync`] record summarising an [`ExecStep`].
2000///
2001/// Failures log a warning and are recorded in the report's
2002/// `event_log_warnings`; they do not abort the sync (spec: event-log write
2003/// failures are non-fatal).
2004///
2005/// # Concurrency
2006///
2007/// The append is serialized through a [`ManifestLock`] held across the
2008/// write. The lock is acquired **per action** (not once across the full
2009/// traversal) so cooperating grex processes can observe mid-progress log
2010/// state between actions; fd-lock acquisition is cheap on modern kernels
2011/// and sync runs are dominated by executor side effects, not lock waits.
2012/// This closes the bypass gap surfaced by the M3 concurrency review where
2013/// `append_event` was called without any cross-process serialisation.
2014fn append_step_event(
2015 log: &Path,
2016 lock_path: &Path,
2017 pack: &str,
2018 step: &ExecStep,
2019 warnings: &mut Vec<String>,
2020) {
2021 let summary = format!("{}:{:?}", step.action_name, step.result);
2022 let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: summary };
2023 if let Err(e) = append_event_locked(log, lock_path, &event) {
2024 tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
2025 warnings.push(format!("{}: {e}", log.display()));
2026 }
2027 // Schema version is recorded once at the manifest level by existing
2028 // manifest code; this stub uses the constant to keep a single source of
2029 // truth for forward-compat.
2030 let _ = SCHEMA_VERSION;
2031}
2032
2033/// Append a single [`Event`] under the shared [`ManifestLock`] path.
2034/// Failures are logged and recorded as non-fatal warnings — the spec
2035/// marks event-log write failures as non-aborting so a transient disk
2036/// error must not kill a sync mid-stream.
2037fn append_manifest_event(log: &Path, lock_path: &Path, event: &Event, warnings: &mut Vec<String>) {
2038 if let Err(e) = append_event_locked(log, lock_path, event) {
2039 tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
2040 warnings.push(format!("{}: {e}", log.display()));
2041 }
2042}
2043
2044/// Acquire [`ManifestLock`] and append one event. Parent dir of the log is
2045/// created lazily on first write.
2046fn append_event_locked(log: &Path, lock_path: &Path, event: &Event) -> Result<(), String> {
2047 if let Some(parent) = log.parent() {
2048 std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
2049 }
2050 if let Some(parent) = lock_path.parent() {
2051 std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
2052 }
2053 let mut lock = ManifestLock::open(log, lock_path).map_err(|e| e.to_string())?;
2054 lock.write(|| append_event(log, event)).map_err(|e| e.to_string())?.map_err(|e| e.to_string())
2055}
2056
2057/// Re-export a cheap helper so CLI renderers can label halted steps by node
2058/// name without reaching into the graph twice.
2059#[must_use]
2060pub fn pack_display_name(node: &PackNode) -> &str {
2061 &node.name
2062}
2063
2064/// Run a full teardown over the pack tree rooted at `pack_root`.
2065///
2066/// Mirrors [`run`] but invokes
2067/// [`crate::plugin::PackTypePlugin::teardown`] on every pack in
2068/// **reverse** post-order so a parent tears down before its children
2069/// (the inverse of install). Children composed later by an author
2070/// consequently teardown earlier, matching the declarative
2071/// auto-reverse contract (R-M5-11).
2072///
2073/// All other concerns are identical to [`run`]: workspace lock, plan-
2074/// phase validators, lockfile update skipped (teardown does not
2075/// write a `actions_hash` forward), and event-log bracketing.
2076/// Teardown does NOT consult the lockfile skip-on-hash shortcut — a
2077/// user explicitly asked to remove the pack, so we always dispatch.
2078///
2079/// # Errors
2080///
2081/// Returns the first error that halts the pipeline — see [`SyncError`].
2082///
2083/// See [`run`] for the `cancel` contract — feat-m7-1 stage 2 threads
2084/// the parameter through teardown for parity; stages 3-4 add the polls.
2085pub fn teardown(
2086 pack_root: &Path,
2087 opts: &SyncOptions,
2088 cancel: &CancellationToken,
2089) -> Result<SyncReport, SyncError> {
2090 let _ = cancel;
2091 let workspace = prepare_workspace(pack_root, opts)?;
2092 let (mut ws_lock, ws_lock_path) = open_workspace_lock(&workspace)?;
2093 let _ws_guard = match ws_lock.try_acquire() {
2094 Ok(Some(g)) => g,
2095 Ok(None) => {
2096 return Err(SyncError::WorkspaceBusy {
2097 workspace: workspace.clone(),
2098 lock_path: ws_lock_path,
2099 });
2100 }
2101 Err(e) => return Err(workspace_lock_err(&ws_lock_path, &e.to_string())),
2102 };
2103
2104 // v1.2.1 path (iii) — teardown is read-only against the existing
2105 // disk state (no clones / fetches / prunes). It only needs the
2106 // graph build pass; `sync_meta` is intentionally skipped here.
2107 let graph = build_and_validate_graph(&workspace, opts.validate, opts.ref_override.as_deref())?;
2108 let prep = prepare_run_context(pack_root, &graph, &workspace)?;
2109
2110 let mut report = SyncReport {
2111 graph,
2112 steps: Vec::new(),
2113 halted: None,
2114 event_log_warnings: Vec::new(),
2115 pre_run_recovery: prep.pre_run_recovery,
2116 // teardown does not run the legacy-layout migration — by the time
2117 // a user is tearing down, the layout has already been migrated
2118 // (or was never legacy in the first place). Surfacing an empty
2119 // list keeps the report shape symmetric with `run()`.
2120 workspace_migrations: Vec::new(),
2121 };
2122
2123 // feat-m6 B1: mirror `run()` — resolve `--parallel`, build a
2124 // Scheduler, thread it through every `ExecCtx` the teardown path
2125 // constructs. Teardown is the other user-facing verb that owns a
2126 // runtime, so it gets the same wiring.
2127 let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
2128 let scheduler = Arc::new(Scheduler::new(resolved_parallel));
2129 run_teardown(
2130 &mut report,
2131 &prep.order,
2132 &prep.vars,
2133 &workspace,
2134 &prep.event_log,
2135 &prep.lock_path,
2136 &prep.registry,
2137 &prep.pack_type_registry,
2138 resolved_parallel,
2139 &scheduler,
2140 );
2141 Ok(report)
2142}
2143
2144/// Dispatch `teardown` for every pack in **reverse** post-order.
2145/// Declarative packs go through [`crate::plugin::PackTypePlugin`]
2146/// rather than the per-action M4 path because the trait's
2147/// auto-reverse / explicit-block logic must compose with the
2148/// registry; going through the per-action path would mean
2149/// re-implementing inverse synthesis in the sync loop.
2150#[allow(clippy::too_many_arguments)]
2151fn run_teardown(
2152 report: &mut SyncReport,
2153 order: &[usize],
2154 vars: &VarEnv,
2155 workspace: &Path,
2156 event_log: &Path,
2157 lock_path: &Path,
2158 registry: &Arc<Registry>,
2159 pack_type_registry: &Arc<PackTypeRegistry>,
2160 parallel: usize,
2161 scheduler: &Arc<Scheduler>,
2162) {
2163 let rt = build_pack_type_runtime(parallel);
2164 // Reverse post-order: root first, then children. Pack-type plugin
2165 // teardown methods reverse their own children/actions, so the
2166 // outer loop only flips the inter-pack order.
2167 for &id in order.iter().rev() {
2168 let Some(node) = report.graph.node(id) else { continue };
2169 let pack_name = node.name.clone();
2170 let pack_path = node.path.clone();
2171 let manifest = node.manifest.clone();
2172 let type_tag = manifest.r#type.as_str();
2173 if pack_type_registry.get(type_tag).is_none() {
2174 let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
2175 record_action_err(report, event_log, lock_path, &pack_name, 0, "pack-type", err);
2176 return;
2177 }
2178 let ctx = ExecCtx::new(vars, &pack_path, workspace)
2179 .with_platform(Platform::current())
2180 .with_registry(registry)
2181 .with_pack_type_registry(pack_type_registry)
2182 .with_scheduler(scheduler);
2183 append_manifest_event(
2184 event_log,
2185 lock_path,
2186 &Event::ActionStarted {
2187 ts: Utc::now(),
2188 pack: pack_name.clone(),
2189 action_idx: 0,
2190 action_name: type_tag.to_string(),
2191 },
2192 &mut report.event_log_warnings,
2193 );
2194 let plugin = pack_type_registry
2195 .get(type_tag)
2196 .expect("pack-type plugin must be registered (guarded above)");
2197 // feat-m6 CI fix — see dispatch_pack_type note.
2198 let step_result =
2199 rt.block_on(crate::pack_lock::with_tier_scope(plugin.teardown(&ctx, &manifest)));
2200 if !record_action_outcome(
2201 report,
2202 event_log,
2203 lock_path,
2204 &pack_name,
2205 0,
2206 type_tag,
2207 step_result,
2208 ) {
2209 return;
2210 }
2211 }
2212}
2213
2214/// Test-only hook: append one [`Event::Sync`] through the same
2215/// [`ManifestLock`]-serialised path the sync driver uses.
2216///
2217/// Exposed so integration tests under `tests/` can exercise the locked
2218/// append helper without spinning up a full pack tree. Not intended for
2219/// downstream consumers — the signature may change without notice.
2220#[doc(hidden)]
2221pub fn __test_append_sync_event(
2222 log: &Path,
2223 lock_path: &Path,
2224 pack: &str,
2225 action_name: &str,
2226) -> Result<(), String> {
2227 let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: action_name.to_string() };
2228 append_event_locked(log, lock_path, &event)
2229}
2230
2231// ----------------------------------------------------------------------
2232// PR E — pre-run teardown scan
2233// ----------------------------------------------------------------------
2234
/// One `ActionStarted` event in the manifest log that has no matching
/// `ActionCompleted` or `ActionHalted` peer.
///
/// Dangling starts are the primary crash signal: the process wrote the
/// pre-action event, then died before the executor returned. Callers
/// should surface these to the operator (diagnostics only this PR; a
/// future `grex doctor` verb will act on them).
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DanglingStart {
    /// Pack that owned the unterminated action (the event's `pack`).
    pub pack: String,
    /// 0-based action index within the pack. Plugin-dispatched packs
    /// record their single bracket at index 0.
    pub action_idx: usize,
    /// The event's `action_name`: the action kind tag on the per-action
    /// path, or the pack-type tag for plugin-dispatched packs.
    pub action_name: String,
    /// Timestamp carried by the `ActionStarted` event.
    pub started_at: DateTime<Utc>,
}
2254
/// Summary of crash-recovery artifacts found under the workspace, and in
/// the event log, before a sync begins.
///
/// Built by [`scan_recovery`]. All fields are diagnostic; the sync
/// proceeds regardless of what the scan finds.
#[non_exhaustive]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RecoveryReport {
    /// Paths whose file name ends in `.grex.bak` (symlink-action rollback
    /// backups). The scan matches on the name only; it does not inspect
    /// the original the backup sits next to.
    pub orphan_backups: Vec<PathBuf>,
    /// `<path>.grex.bak.<timestamp>` tombstones left by `rmdir` with
    /// `backup: true`. Any non-empty suffix after `.grex.bak.` is
    /// accepted.
    pub orphan_tombstones: Vec<PathBuf>,
    /// `ActionStarted` events in the log with no matching
    /// `ActionCompleted`/`ActionHalted`.
    pub dangling_starts: Vec<DanglingStart>,
}
2273
2274impl RecoveryReport {
2275 /// `true` when the scan found nothing worth reporting.
2276 #[must_use]
2277 pub fn is_empty(&self) -> bool {
2278 self.orphan_backups.is_empty()
2279 && self.orphan_tombstones.is_empty()
2280 && self.dangling_starts.is_empty()
2281 }
2282}
2283
2284/// Walk `workspace` and the manifest log to find crash-recovery artifacts.
2285///
2286/// Inspects:
2287///
2288/// * `workspace` for `.grex.bak` orphans and timestamped `.grex.bak.<ts>`
2289/// tombstones. The workspace IS where children materialise (whether
2290/// the default flat-sibling layout under the pack root, or an
2291/// explicit `--workspace` override directory) so this single bounded
2292/// walk covers every backup site.
2293/// * `event_log` (the manifest JSONL) for `ActionStarted` entries that
2294/// have no matching `ActionCompleted` / `ActionHalted` successor.
2295///
2296/// Non-blocking: scan errors are swallowed to an empty report so a
2297/// half-readable directory cannot kill a sync that would otherwise
2298/// succeed. Call sites that want to surface scan failures should read
2299/// the manifest directly.
2300///
2301/// Pre-`v1.1.0` post-review fix this anchored at `pack_root_dir(pack_root)`,
2302/// which missed every backup under a `--workspace` override.
2303///
2304/// # Errors
2305///
2306/// Returns [`SyncError::Validation`] only when the manifest read itself
2307/// reports corruption. Filesystem traversal errors are swallowed.
2308pub fn scan_recovery(workspace: &Path, event_log: &Path) -> Result<RecoveryReport, SyncError> {
2309 let mut report = RecoveryReport::default();
2310 walk_for_backups(workspace, &mut report);
2311 if event_log.exists() {
2312 match read_all(event_log) {
2313 Ok(events) => {
2314 report.dangling_starts = collect_dangling_starts(&events);
2315 }
2316 Err(e) => {
2317 return Err(SyncError::Validation {
2318 errors: vec![PackValidationError::DependsOnUnsatisfied {
2319 pack: "<event-log>".into(),
2320 required: e.to_string(),
2321 }],
2322 });
2323 }
2324 }
2325 }
2326 Ok(report)
2327}
2328
2329/// Shallow directory walker (bounded depth = 6) that categorizes
2330/// `.grex.bak` and `.grex.bak.<ts>` filenames into the appropriate
2331/// report slot. Depth-limited so a pathological workspace with a deep
2332/// tree cannot stall the scan; realistic layouts are well under six
2333/// levels.
2334fn walk_for_backups(root: &Path, report: &mut RecoveryReport) {
2335 walk_for_backups_inner(root, report, 0);
2336}
2337
2338fn walk_for_backups_inner(dir: &Path, report: &mut RecoveryReport, depth: u32) {
2339 const MAX_DEPTH: u32 = 6;
2340 if depth > MAX_DEPTH {
2341 return;
2342 }
2343 let Ok(entries) = std::fs::read_dir(dir) else { return };
2344 for entry_result in entries {
2345 let entry = match entry_result {
2346 Ok(e) => e,
2347 Err(e) => {
2348 tracing::warn!(
2349 target: "grex::sync::recover",
2350 "skipping unreadable entry under `{}`: {e}",
2351 dir.display(),
2352 );
2353 continue;
2354 }
2355 };
2356 let path = entry.path();
2357 let name = entry.file_name();
2358 let Some(name_str) = name.to_str() else { continue };
2359 if name_str.ends_with(".grex.bak") {
2360 report.orphan_backups.push(path.clone());
2361 continue;
2362 }
2363 if let Some(rest) = name_str.rsplit_once(".grex.bak.") {
2364 // `rsplit_once` returns `(prefix, suffix)`; suffix is the
2365 // timestamp chunk. Accept any non-empty suffix — the exact
2366 // timestamp shape is `fs_executor` internal.
2367 if !rest.1.is_empty() {
2368 report.orphan_tombstones.push(path.clone());
2369 continue;
2370 }
2371 }
2372 // Recurse only into real directories (not symlinks, to avoid
2373 // traversing into the workspace's cloned repos via aliased
2374 // paths). `entry.file_type()` does NOT follow symlinks (unlike
2375 // `entry.metadata()` which would dereference and report the
2376 // target's type — defeating the very check this guards). The
2377 // symlink-skip is also explicit so the intent is recoverable
2378 // from the source: backup-recovery never crosses a symlink.
2379 let Ok(ft) = entry.file_type() else { continue };
2380 if ft.is_symlink() {
2381 continue;
2382 }
2383 if ft.is_dir() {
2384 walk_for_backups_inner(&path, report, depth + 1);
2385 }
2386 }
2387}
2388
2389/// Reduce an event stream to a list of `ActionStarted` records with no
2390/// matching terminator.
2391///
2392/// Matching is positional per `(pack, action_idx)`: a later
2393/// `ActionCompleted` or `ActionHalted` with the same key clears the
2394/// entry. Whatever remains in the map after the pass is dangling.
2395fn collect_dangling_starts(events: &[Event]) -> Vec<DanglingStart> {
2396 use std::collections::HashMap;
2397 let mut open: HashMap<(String, usize), DanglingStart> = HashMap::new();
2398 for ev in events {
2399 match ev {
2400 Event::ActionStarted { ts, pack, action_idx, action_name } => {
2401 open.insert(
2402 (pack.clone(), *action_idx),
2403 DanglingStart {
2404 pack: pack.clone(),
2405 action_idx: *action_idx,
2406 action_name: action_name.clone(),
2407 started_at: *ts,
2408 },
2409 );
2410 }
2411 Event::ActionCompleted { pack, action_idx, .. }
2412 | Event::ActionHalted { pack, action_idx, .. } => {
2413 open.remove(&(pack.clone(), *action_idx));
2414 }
2415 _ => {}
2416 }
2417 }
2418 let mut out: Vec<DanglingStart> = open.into_values().collect();
2419 out.sort_by_key(|a| a.started_at);
2420 out
2421}
2422
#[cfg(test)]
mod synthetic_transition_tests {
    //! v1.1.1 — regression cover for the pack-shape transition fixes.
    //!
    //! [`skip_eligibility`] and [`upsert_lock_entry`] are driven directly
    //! (no walker, no fs), so each assertion lands on the plumbing itself:
    //! skip eligibility must require synthetic-flag agreement even when the
    //! actions hash matches by coincidence, and the upsert path must record
    //! the real-to-synthetic downgrade so the operator's lockfile reflects
    //! what just happened.
    use super::{skip_eligibility, upsert_lock_entry, LockEntry};
    use crate::lockfile::compute_actions_hash;
    use chrono::{TimeZone, Utc};
    use std::collections::HashMap;

    /// Commit SHA shared by every fixture in this module.
    const COMMIT_SHA: &str = "deadbeef";

    /// Fixed `installed_at` timestamp used by all fixtures.
    fn fixed_install_time() -> chrono::DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 4, 27, 10, 0, 0).unwrap()
    }

    /// Empty-actions hash over the fixed commit SHA. The same inputs feed
    /// both the prior (real) and the new (synthetic) configuration in the
    /// regression below — exactly the coincidental-hash-match scenario
    /// FIX 3 must catch.
    fn actions_hash() -> String {
        compute_actions_hash(&[], COMMIT_SHA)
    }

    /// Lock entry fixture whose `id` and `path` are both `name`; only the
    /// `synthetic` flag varies between tests.
    fn lock_entry(name: &'static str, synthetic: bool) -> LockEntry {
        LockEntry {
            id: name.into(),
            path: name.into(),
            sha: COMMIT_SHA.into(),
            branch: "main".into(),
            installed_at: fixed_install_time(),
            actions_hash: actions_hash(),
            schema_version: "1".into(),
            synthetic,
        }
    }

    /// Prior-lockfile map holding a single fixture entry keyed by `name`.
    fn prior_lockfile(name: &'static str, synthetic: bool) -> HashMap<String, LockEntry> {
        HashMap::from([(String::from(name), lock_entry(name, synthetic))])
    }

    /// FIX 3 — the pack flips from real → synthetic while `actions_hash`
    /// and `commit_sha` happen to match. The skip MUST be invalidated so
    /// the upsert path re-emits the lockfile entry with `synthetic = true`.
    #[test]
    fn skip_eligibility_invalidates_when_synthetic_flag_flips() {
        let real_prior = lock_entry("alpha", false);
        let outcome = skip_eligibility(&[], COMMIT_SHA, true, &real_prior, false, false);
        assert!(outcome.is_none(), "skip must be invalidated when synthetic flag flips");
    }

    /// Baseline: same hash and same synthetic flag → the skip is allowed.
    #[test]
    fn skip_eligibility_allows_skip_when_synthetic_matches() {
        let synthetic_prior = lock_entry("alpha", true);
        let expected = actions_hash();
        let outcome = skip_eligibility(&[], COMMIT_SHA, true, &synthetic_prior, false, false);
        assert_eq!(
            outcome.as_deref(),
            Some(expected.as_str()),
            "skip must be honoured when synthetic flag matches",
        );
    }

    /// `dry_run` and `force` each disable the skip even though the
    /// synthetic flags agree.
    #[test]
    fn skip_eligibility_respects_dry_run_and_force() {
        let synthetic_prior = lock_entry("alpha", true);
        let with_first_flag = skip_eligibility(&[], COMMIT_SHA, true, &synthetic_prior, true, false);
        assert!(with_first_flag.is_none());
        let with_second_flag =
            skip_eligibility(&[], COMMIT_SHA, true, &synthetic_prior, false, true);
        assert!(with_second_flag.is_none());
    }

    /// FIX 4 — `upsert_lock_entry` records the downgrade in the lockfile
    /// (the entry flips to `synthetic = true`) when the prior entry was
    /// real. The `tracing::warn!` is fire-and-forget, but the lockfile
    /// transition itself is observable and must be correct.
    #[test]
    fn upsert_lock_entry_records_real_to_synthetic_downgrade() {
        let before = prior_lockfile("beta", false);
        let mut after: HashMap<String, LockEntry> = HashMap::new();

        upsert_lock_entry(&before, &mut after, "beta", COMMIT_SHA, &actions_hash(), true);

        let upserted = after.get("beta").expect("entry must be upserted");
        assert!(upserted.synthetic, "downgraded entry must carry synthetic = true");
        assert_eq!(upserted.actions_hash, actions_hash(), "actions_hash must reflect current run");
    }

    /// Steady state (synthetic → synthetic): the entry is replaced with
    /// the current run's timestamp/hash but the synthetic flag is
    /// preserved. Guards against an over-eager warning fire.
    #[test]
    fn upsert_lock_entry_no_op_for_steady_state_synthetic() {
        let before = prior_lockfile("gamma", true);
        let mut after: HashMap<String, LockEntry> = HashMap::new();

        upsert_lock_entry(&before, &mut after, "gamma", COMMIT_SHA, &actions_hash(), true);

        let upserted = after.get("gamma").expect("entry must be upserted");
        assert!(upserted.synthetic, "synthetic must remain true on no-op refresh");
    }
}
2553
#[cfg(test)]
mod error_display_tests {
    //! v1.2.0 Stage 1.k — `SyncError` Display assertions.
    //!
    //! Nothing but construction followed by a `to_string()` comparison.
    //! The variants land dormant — Stage 1.g (rayon scheduler) wires
    //! `SchedulerCancelled` once cooperative cancel polls reach the
    //! parallel walker.
    use super::SyncError;

    /// The cancellation variant renders as the fixed user-facing message.
    #[test]
    fn test_sync_error_scheduler_cancelled_display() {
        let rendered = SyncError::SchedulerCancelled.to_string();
        assert_eq!(rendered, "sync cancelled by user");
    }
}
2569
#[cfg(test)]
mod sync_options_v1_2_0_tests {
    //! v1.2.0 Stage 1.m — leaf cover for new [`SyncOptions`] fields.
    //!
    //! Mechanical default-value assertions plus simple struct-update and
    //! clone round-trips. They lock down that:
    //!
    //! 1. Adding the new fields preserves v1.1.1 behavior (the defaults
    //!    leave existing call sites observably unchanged).
    //! 2. The shape is what later walker stages (1.h / 1.j / 1.l) will
    //!    consume — if a field is renamed or changes type, those stages
    //!    must update in lock-step.
    //!
    //! At 1.m scope the fields are *dormant placeholders*: no behavior
    //! wiring lives in this stage.
    use super::{pack_root_dir, resolve_workspace, SyncError, SyncOptions};

    /// `force_prune` is off by default, so existing call sites keep
    /// refusing to drop dirty trees (v1.1.1 behavior).
    #[test]
    fn test_sync_options_default_force_prune_false() {
        assert!(!SyncOptions::default().force_prune, "force_prune must default to false");
    }

    /// `force_prune_with_ignored` is off by default, so existing call
    /// sites keep refusing to drop ignored content (v1.1.1 behavior).
    #[test]
    fn test_sync_options_default_force_prune_with_ignored_false() {
        assert!(
            !SyncOptions::default().force_prune_with_ignored,
            "force_prune_with_ignored must default to false"
        );
    }

    /// `migrate_lockfile` is off by default: the walker errors on legacy
    /// v1.1.1 lockfile shapes unless the caller opts in.
    #[test]
    fn test_sync_options_default_migrate_lockfile_false() {
        assert!(!SyncOptions::default().migrate_lockfile, "migrate_lockfile must default to false");
    }

    /// `recurse` is on by default — the walker descends into nested
    /// meta-children unless `--shallow` is requested.
    #[test]
    fn test_sync_options_default_recurse_true() {
        assert!(SyncOptions::default().recurse, "recurse must default to true");
    }

    /// `max_depth` is `None` by default — unbounded recursion whenever
    /// `recurse` is `true`.
    #[test]
    fn test_sync_options_default_max_depth_none() {
        assert!(SyncOptions::default().max_depth.is_none(), "max_depth must default to None");
    }

    /// `force_prune_with_ignored = true` together with
    /// `force_prune = true` is the documented "stronger" combination.
    /// There is no contradiction: `with_ignored` is the harder override
    /// and implies the base `force_prune` semantics. The test guards the
    /// invariant that both flags coexist as plain `bool` (not an enum),
    /// so callers can set them independently without a runtime panic.
    #[test]
    fn test_sync_options_force_prune_with_ignored_implies_force_prune() {
        let strongest = SyncOptions {
            force_prune_with_ignored: true,
            force_prune: true,
            ..SyncOptions::default()
        };
        assert!(strongest.force_prune);
        assert!(strongest.force_prune_with_ignored);
    }

    /// `max_depth = Some(n)` paired with `recurse = true` is the
    /// documented `--shallow=N` shape. The fields are an independent
    /// `bool` / `Option<usize>` pair, so a caller may set `max_depth`
    /// and leave `recurse` at its default (`true`). Stage 1.j will
    /// later define the precise interaction; this test only locks the
    /// two fields' types and defaults.
    #[test]
    fn test_sync_options_max_depth_pairs_with_recurse() {
        let shallow = SyncOptions { max_depth: Some(2), ..SyncOptions::default() };
        assert_eq!(shallow.max_depth, Some(2));
        assert!(shallow.recurse, "recurse stays at its default (true) when only max_depth is set");
    }

    /// `Clone` round-trip — guards that every new field participates in
    /// the existing `Clone` derive (no `#[clone(skip)]` slipped in).
    #[test]
    fn test_sync_options_clone_preserves_new_fields() {
        let original = SyncOptions {
            max_depth: Some(7),
            recurse: false,
            migrate_lockfile: true,
            force_prune_with_ignored: true,
            force_prune: true,
            ..SyncOptions::default()
        };
        let copy = original.clone();
        assert_eq!(copy.force_prune, original.force_prune);
        assert_eq!(copy.force_prune_with_ignored, original.force_prune_with_ignored);
        assert_eq!(copy.migrate_lockfile, original.migrate_lockfile);
        assert_eq!(copy.recurse, original.recurse);
        assert_eq!(copy.max_depth, original.max_depth);
    }

    // ------------------------------------------------------------------
    // v1.2.1 path (iii) — `resolve_workspace` canonicalisation tests
    // ------------------------------------------------------------------

    /// A `--workspace` override naming a non-existent directory must
    /// fail fast with a Validation error citing the offending path. We
    /// deliberately do NOT mkdir-p someone else's typo — `--workspace`
    /// is an opt-in operator decision and a missing target is always a
    /// configuration mistake.
    #[test]
    fn test_resolve_workspace_errors_on_missing_override_dir() {
        let scratch = tempfile::tempdir().unwrap();
        let absent = scratch.path().join("nope");
        let err = resolve_workspace(scratch.path(), Some(absent.as_path())).expect_err("must fail");
        let SyncError::Validation { errors } = &err else {
            panic!("expected Validation, got {err:?}")
        };
        assert!(errors.iter().any(|e| e.to_string().contains("does not exist")));
    }

    /// `--workspace = None` is the default cwd-meta path — no
    /// canonicalize, no fail-on-missing. The pack-root path comes back
    /// verbatim (post `pack_root_dir` normalisation).
    #[test]
    fn test_resolve_workspace_none_returns_pack_root_dir() {
        let scratch = tempfile::tempdir().unwrap();
        let pack_root = scratch.path().join("nonexistent-yet");
        let expected = pack_root_dir(&pack_root);
        let resolved = resolve_workspace(&pack_root, None).expect("None override is always Ok");
        assert_eq!(resolved, expected);
    }

    /// `--workspace = Some(<existing>)` yields the canonicalised path.
    /// On Windows this typically inserts the `\\?\` long-path prefix;
    /// on Unix it resolves any `..` / symlink components. Either way
    /// the returned path is what every downstream pass anchors against.
    #[test]
    fn test_resolve_workspace_canonicalises_existing_override() {
        let scratch = tempfile::tempdir().unwrap();
        let workspace = scratch.path().join("real-ws");
        std::fs::create_dir_all(&workspace).unwrap();
        let resolved = resolve_workspace(scratch.path(), Some(workspace.as_path()))
            .expect("existing dir must resolve");
        let expected = workspace.canonicalize().unwrap();
        assert_eq!(resolved, expected);
    }
}