// grex_core/sync.rs
1//! Sync orchestrator — M3 Stage B slice 6.
2//!
3//! Glues the building blocks shipped in slices 1–5b into a single runnable
4//! pipeline:
5//!
6//! 1. Walk a pack tree via [`Walker`] + [`FsPackLoader`] + a `GitBackend`.
7//! 2. Run plan-phase validators (manifest-level + graph-level).
8//! 3. Execute every action via a pluggable [`ActionExecutor`]
9//! ([`PlanExecutor`] for dry-run, [`FsExecutor`] for wet-run).
10//! 4. Record each step as an [`Event::Sync`] entry in the pack-root's
11//! `.grex/grex.jsonl` event log.
12//!
13//! # Traversal order
14//!
15//! Nodes are executed in **depth-first post-order**: children fully install
16//! before their parent. Rationale: parent packs commonly `require:` artifacts
17//! created by children (e.g. a parent symlink whose `src` lives inside a
18//! child). Running the root last matches the overlay-style dotfile-install
19//! intent authors expect, and it matches how `walker.walk` is structured
20//! (children are hydrated before the recursion returns).
21//!
22//! # Decoupling
23//!
24//! The CLI crate drives this module through a thin `run()` entry point;
25//! [`SyncOptions`] is `#[non_exhaustive]` so new knobs (parallelism, filter
26//! expressions, ref overrides) can land in later milestones without breaking
27//! CLI callers. Errors aggregate into [`SyncError`] with a small, stable
28//! variant set.
29
30use std::borrow::Cow;
31use std::fs;
32use std::path::{Path, PathBuf};
33use std::sync::Arc;
34
35use chrono::{DateTime, Utc};
36use globset::{Glob, GlobSet, GlobSetBuilder};
37use thiserror::Error;
38use tokio_util::sync::CancellationToken;
39
40use crate::execute::{
41 ActionExecutor, ExecCtx, ExecError, ExecResult, ExecStep, FsExecutor, MetaVisitedSet,
42 PlanExecutor, Platform, StepKind,
43};
44use crate::fs::{ManifestLock, ScopedLock};
45use crate::git::GixBackend;
46use crate::lockfile::{
47 compute_actions_hash, read_lockfile, write_lockfile, LockEntry, LockfileError,
48};
49use crate::manifest::{append_event, read_all, Event, ACTION_ERROR_SUMMARY_MAX, SCHEMA_VERSION};
50use crate::pack::{Action, PackValidationError};
51use crate::plugin::{PackTypeRegistry, Registry};
52use crate::scheduler::Scheduler;
53use crate::tree::{FsPackLoader, PackGraph, PackNode, TreeError, Walker};
54use crate::vars::VarEnv;
55
/// Inputs to [`run`].
///
/// Marked `#[non_exhaustive]` so future knobs (parallelism, filter
/// expressions, additional ref strategies) can land without breaking
/// library consumers.
///
/// NOTE(review): `#[non_exhaustive]` forbids struct-literal
/// construction — including `..Default::default()` update syntax —
/// from *outside* this crate. External callers therefore start from
/// [`SyncOptions::new`] / `SyncOptions::default()` and either assign
/// the `pub` fields directly or chain the `with_*` builder methods;
/// struct literals still work within this crate.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct SyncOptions {
    /// When `true`, use [`PlanExecutor`] (no filesystem mutations).
    pub dry_run: bool,
    /// When `false`, skip plan-phase validators (manifest + graph). Debug
    /// escape hatch; production callers should leave this `true`.
    pub validate: bool,
    /// Override workspace directory. `None` → the parent pack root itself
    /// (children resolve as flat siblings of the parent pack root).
    pub workspace: Option<PathBuf>,
    /// Global ref override (`grex sync --ref <sha|branch|tag>`). When
    /// `Some`, every child pack clone/checkout uses this ref instead of
    /// the declared `child.ref`. Empty strings are rejected at the CLI
    /// layer.
    pub ref_override: Option<String>,
    /// Pack-path filter patterns (`grex sync --only <glob>`). Raw glob
    /// strings — compiled internally via an in-crate `globset` helper so
    /// the `globset` crate version does not leak into the public API.
    /// `None` / empty means every pack runs (M3 semantics). Matching is
    /// against the pack's **workspace-relative** path normalized to
    /// forward-slash form.
    pub only_patterns: Option<Vec<String>>,
    /// Bypass the lockfile hash-match skip (`grex sync --force`). When
    /// `true`, every pack re-executes even if its `actions_hash` is
    /// unchanged from the prior lockfile.
    pub force: bool,
    /// Max parallel pack ops for this sync run (feat-m6-1).
    ///
    /// * `None` → the sync driver resolves a default of
    ///   `num_cpus::get()` (clamped `>= 1`) in one place, so the
    ///   scheduler slot on every `ExecCtx` is always populated for
    ///   both CLI and direct library callers.
    /// * `Some(0)` → unbounded (`Semaphore::MAX_PERMITS`).
    /// * `Some(1)` → serial fast-path.
    /// * `Some(n >= 2)` → bounded parallel.
    pub parallel: Option<usize>,
}
103
104impl Default for SyncOptions {
105 fn default() -> Self {
106 Self {
107 dry_run: false,
108 validate: true,
109 workspace: None,
110 ref_override: None,
111 only_patterns: None,
112 force: false,
113 parallel: None,
114 }
115 }
116}
117
118/// Compile raw `--only` pattern strings into a [`globset::GlobSet`].
119/// Empty / absent input yields `Ok(None)` so M3's zero-config path
120/// (every pack runs) stays the default.
121fn compile_only_globset(patterns: Option<&Vec<String>>) -> Result<Option<GlobSet>, SyncError> {
122 let Some(pats) = patterns else { return Ok(None) };
123 if pats.is_empty() {
124 return Ok(None);
125 }
126 let mut builder = GlobSetBuilder::new();
127 for p in pats {
128 let glob = Glob::new(p)
129 .map_err(|source| SyncError::InvalidOnlyGlob { pattern: p.clone(), source })?;
130 builder.add(glob);
131 }
132 let set = builder
133 .build()
134 .map_err(|source| SyncError::InvalidOnlyGlob { pattern: pats.join(","), source })?;
135 Ok(Some(set))
136}
137
138impl SyncOptions {
139 /// Default options: wet-run, validators enabled, default workspace path.
140 #[must_use]
141 pub fn new() -> Self {
142 Self::default()
143 }
144
145 /// Set `dry_run`.
146 #[must_use]
147 pub fn with_dry_run(mut self, dry_run: bool) -> Self {
148 self.dry_run = dry_run;
149 self
150 }
151
152 /// Set `validate`.
153 #[must_use]
154 pub fn with_validate(mut self, validate: bool) -> Self {
155 self.validate = validate;
156 self
157 }
158
159 /// Set `workspace` override.
160 #[must_use]
161 pub fn with_workspace(mut self, workspace: Option<PathBuf>) -> Self {
162 self.workspace = workspace;
163 self
164 }
165
166 /// Set `ref_override` (`--ref`).
167 #[must_use]
168 pub fn with_ref_override(mut self, ref_override: Option<String>) -> Self {
169 self.ref_override = ref_override;
170 self
171 }
172
173 /// Set `only_patterns` (`--only`). Empty vector or `None` disables
174 /// the filter.
175 #[must_use]
176 pub fn with_only_patterns(mut self, patterns: Option<Vec<String>>) -> Self {
177 self.only_patterns = patterns;
178 self
179 }
180
181 /// Set `force` (`--force`).
182 #[must_use]
183 pub fn with_force(mut self, force: bool) -> Self {
184 self.force = force;
185 self
186 }
187
188 /// Set `parallel` (`--parallel`). See [`SyncOptions::parallel`] for
189 /// the `None` / `Some(0)` / `Some(1)` / `Some(n)` semantics.
190 #[must_use]
191 pub fn with_parallel(mut self, parallel: Option<usize>) -> Self {
192 self.parallel = parallel;
193 self
194 }
195}
196
/// One executed (or planned) action step in a sync run.
///
/// Pairs the executor's own [`ExecStep`] record with its pack-level
/// position (pack name + action index) so callers can render a
/// transcript without re-walking the graph.
///
/// Marked `#[non_exhaustive]` so new observability fields (timestamps,
/// plugin provenance) can land without breaking library consumers who
/// destructure the struct.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct SyncStep {
    /// Name of the pack that owned the action.
    pub pack: String,
    /// 0-based index into the pack's top-level `actions` vector.
    pub action_idx: usize,
    /// The [`ExecStep`] record emitted by the executor.
    pub exec_step: ExecStep,
}
212
/// Outcome of a [`run`] invocation.
///
/// On fail-fast termination, `halted` carries the error that stopped the
/// sync; every completed step up to that point is still in `steps` so
/// callers can render a partial transcript.
///
/// Marked `#[non_exhaustive]` so new report-level fields (run id, metrics)
/// can land without breaking library consumers who destructure the struct.
#[non_exhaustive]
#[derive(Debug)]
pub struct SyncReport {
    /// Fully-walked pack graph (present even on halted runs).
    pub graph: PackGraph,
    /// Steps produced by the executor, in execution order.
    pub steps: Vec<SyncStep>,
    /// `Some(e)` if execution stopped before all actions ran.
    pub halted: Option<SyncError>,
    /// Non-fatal sidecar-write warnings: one entry per failed event
    /// append, plus any lockfile-persist failure (see
    /// `persist_lockfile_if_clean`). Kept as a separate field because
    /// spec marks these write failures as non-aborting.
    pub event_log_warnings: Vec<String>,
    /// `Some(r)` when the pre-run teardown scan found orphaned backup
    /// files or dangling [`Event::ActionStarted`] records from a prior
    /// crashed run. Informational only — the report is still returned and
    /// the sync proceeds. CLI renderers should surface a warning so the
    /// operator can decide whether to run a future `grex doctor` verb.
    pub pre_run_recovery: Option<RecoveryReport>,
    /// One entry per child whose legacy `.grex/workspace/<name>/` layout
    /// was relocated (or considered for relocation) on this sync. Empty
    /// when no legacy directory was found — the common case for any
    /// workspace built fresh on v1.1.0+. CLI renderers should surface
    /// the list so operators see what changed.
    pub workspace_migrations: Vec<WorkspaceMigration>,
}
247
/// One legacy-layout migration attempt. `outcome` distinguishes the
/// move-succeeded case from the don't-clobber-user-data cases so CLI
/// renderers can present different advice to the operator.
///
/// Both paths are stored relative to the pack root (see
/// `migrate_legacy_workspace`) for log readability.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkspaceMigration {
    /// Source path under the legacy `.grex/workspace/<name>/` location,
    /// rendered relative to the pack root for log readability.
    pub from: PathBuf,
    /// Destination flat-sibling path `<pack_root>/<name>/`, relative to
    /// the pack root.
    pub to: PathBuf,
    /// What happened — see [`MigrationOutcome`].
    pub outcome: MigrationOutcome,
}
263
/// Outcome of one legacy-layout migration attempt.
///
/// Only the `Migrated` arm mutates the filesystem; every skip/failure
/// arm leaves both the legacy and flat-sibling slots untouched.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationOutcome {
    /// Legacy directory was renamed onto the flat-sibling slot.
    Migrated,
    /// Both legacy and flat-sibling slots existed. Skipped — the user
    /// must inspect and reconcile manually so we never silently delete
    /// either.
    SkippedBothExist,
    /// Flat-sibling slot already had a non-grex file or directory in
    /// the way. Skipped — refusing to clobber user data even when the
    /// legacy slot is plainly the source of truth.
    SkippedDestOccupied,
    /// `fs::rename` failed (e.g. cross-volume, ACL denied). The legacy
    /// directory is still in place; surfaced so the operator can move
    /// it manually.
    Failed { error: String },
}
283
/// Rich context attached to a [`SyncError::Halted`] variant.
///
/// Packages the pack + action position together with the underlying
/// executor error and an optional human-readable recovery hint. Boxed
/// at the `Halted` variant so the common `SyncError` stays small.
/// Marked `#[non_exhaustive]` so future fields (step transcript,
/// timestamp) can land without breaking `match` arms or struct
/// destructures.
#[non_exhaustive]
#[derive(Debug)]
pub struct HaltedContext {
    /// Name of the pack that owned the halted action.
    pub pack: String,
    /// 0-based index into the pack's top-level `actions` vector.
    pub action_idx: usize,
    /// Short action kind tag (e.g. `"symlink"`, `"exec"`).
    pub action_name: String,
    /// Underlying executor error.
    pub error: ExecError,
    /// Optional next-step suggestion for the operator. `None` when no
    /// generic hint applies — the executor error's own `Display` already
    /// tells the story.
    pub recovery_hint: Option<String>,
}
306
/// Error taxonomy surfaced by [`run`].
///
/// NOTE: the manual `Clone` impl is lossy — variants wrapping
/// non-`Clone` errors re-materialise as synthetic
/// [`SyncError::Validation`] entries preserving only the display
/// rendering.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum SyncError {
    /// The pack-tree walker failed (loader error, git error, cycle, …).
    #[error("tree walk failed: {0}")]
    Tree(#[from] TreeError),
    /// One or more plan-phase validators flagged the graph.
    #[error("validation failed: {errors:?}")]
    Validation {
        /// Aggregated errors from manifest-level + graph-level validators.
        errors: Vec<PackValidationError>,
    },
    /// An action executor returned an error.
    ///
    /// Retained for backward compatibility; new call sites should prefer
    /// [`SyncError::Halted`] which carries full pack + action context.
    /// Kept non-deprecated because [`From<ExecError>`] still materialises
    /// the variant for non-sync-loop callers (e.g. ad-hoc helpers).
    #[error("action execution failed: {0}")]
    Exec(#[from] ExecError),
    /// Action execution halted; full context (pack, action index, error,
    /// optional recovery hint) lives in [`HaltedContext`]. This is the
    /// variant the sync driver emits — [`SyncError::Exec`] is only
    /// surfaced by ancillary code paths.
    #[error(
        "sync halted at pack `{}` action #{} ({}): {}",
        .0.pack, .0.action_idx, .0.action_name, .0.error
    )]
    Halted(Box<HaltedContext>),
    /// Another `grex` process (or thread) already holds the workspace-level
    /// lock. The running sync refused to start to avoid racing two concurrent
    /// walkers into the same workspace. If the lock file at `lock_path` is
    /// stale (no other grex is actually running), remove it by hand.
    #[error(
        "workspace `{workspace}` is locked by another grex process (remove {lock_path:?} if stale)"
    )]
    WorkspaceBusy {
        /// Resolved workspace directory that the current run tried to lock.
        workspace: PathBuf,
        /// Sidecar lock file that is currently held.
        lock_path: PathBuf,
    },
    /// Reading or parsing the resolved-state lockfile failed. Surfaced as
    /// its own variant (rather than folded into `Validation`) because a
    /// corrupt / unreadable lockfile is an I/O or schema fault, not a
    /// dependency-satisfaction fault. Resolution is operator-level
    /// (restore a backup, delete the file, re-sync), not author-level.
    #[error("lockfile `{path}` failed to load: {source}")]
    Lockfile {
        /// Lockfile path that failed to load.
        path: PathBuf,
        /// Underlying lockfile error.
        #[source]
        source: LockfileError,
    },
    /// One of the `--only <GLOB>` patterns failed to compile. Surfaced
    /// as its own variant so the CLI can map it to a dedicated usage
    /// error exit code instead of the generic sync-failure bucket.
    #[error("invalid --only glob `{pattern}`: {source}")]
    InvalidOnlyGlob {
        /// The raw pattern string that failed to compile.
        pattern: String,
        /// Underlying globset error.
        #[source]
        source: globset::Error,
    },
}
375
376impl Clone for SyncError {
377 fn clone(&self) -> Self {
378 // `TreeError` / `ExecError` do not implement `Clone` (they wrap
379 // `std::io::Error`-adjacent values). Halts carry only a display
380 // rendering in the report; we re-materialise via a synthetic
381 // `Validation` variant so `SyncReport` can be `Clone`-safe for
382 // observability tooling without widening the taxonomy.
383 match self {
384 Self::Tree(e) => Self::Validation {
385 errors: vec![PackValidationError::DependsOnUnsatisfied {
386 pack: "<tree>".into(),
387 required: e.to_string(),
388 }],
389 },
390 Self::Validation { errors } => Self::Validation { errors: errors.clone() },
391 Self::Exec(e) => Self::Validation {
392 errors: vec![PackValidationError::DependsOnUnsatisfied {
393 pack: "<exec>".into(),
394 required: e.to_string(),
395 }],
396 },
397 Self::Halted(ctx) => Self::Validation {
398 errors: vec![PackValidationError::DependsOnUnsatisfied {
399 pack: ctx.pack.clone(),
400 required: format!(
401 "action #{} ({}): {}",
402 ctx.action_idx, ctx.action_name, ctx.error
403 ),
404 }],
405 },
406 Self::WorkspaceBusy { workspace, lock_path } => {
407 Self::WorkspaceBusy { workspace: workspace.clone(), lock_path: lock_path.clone() }
408 }
409 Self::Lockfile { path, source } => Self::Validation {
410 errors: vec![PackValidationError::DependsOnUnsatisfied {
411 pack: "<lockfile>".into(),
412 required: format!("{}: {source}", path.display()),
413 }],
414 },
415 Self::InvalidOnlyGlob { pattern, source } => Self::Validation {
416 errors: vec![PackValidationError::DependsOnUnsatisfied {
417 pack: "<only-glob>".into(),
418 required: format!("{pattern}: {source}"),
419 }],
420 },
421 }
422 }
423}
424
/// Run a full sync over the pack tree rooted at `pack_root`.
///
/// Resolution rules:
/// * If `pack_root` is a directory the walker looks for
///   `<pack_root>/.grex/pack.yaml`.
/// * If `pack_root` ends in `.yaml` / `.yml` it is loaded verbatim.
/// * Workspace defaults to the pack root directory itself when
///   `opts.workspace` is `None`. Children resolve as flat siblings of the
///   parent pack root (since v1.1.0).
///
/// # Errors
///
/// Returns the first error that halts the pipeline — see [`SyncError`] for
/// the taxonomy. Note that a halted *execution* still returns `Ok`: the
/// error is carried on `SyncReport::halted` so completed steps survive.
///
/// `cancel` is the cooperative cancellation handle threaded through the
/// pipeline by feat-m7-1 stage 2. Stage 2 only wires the parameter; the
/// `is_cancelled()` polls land in stages 3-4 (scheduler + pack-lock
/// acquire). CLI callers pass a never-cancelled sentinel
/// (`CancellationToken::new()`); the MCP server passes a token tied to
/// the request lifetime.
pub fn run(
    pack_root: &Path,
    opts: &SyncOptions,
    cancel: &CancellationToken,
) -> Result<SyncReport, SyncError> {
    // Stage 2 is signature-only — silence "unused parameter" without
    // hiding it behind `_` (downstream stages will read it).
    let _ = cancel;
    let workspace = resolve_workspace(pack_root, opts.workspace.as_deref());
    ensure_workspace_dir(&workspace)?;
    // The guard is bound to a named `_ws_guard` (not `let _ = …`) so the
    // workspace lock is held for the whole function body.
    let (mut ws_lock, ws_lock_path) = open_workspace_lock(&workspace)?;
    let _ws_guard = match ws_lock.try_acquire() {
        Ok(Some(g)) => g,
        Ok(None) => {
            return Err(SyncError::WorkspaceBusy {
                workspace: workspace.clone(),
                lock_path: ws_lock_path,
            });
        }
        Err(e) => return Err(workspace_lock_err(&ws_lock_path, &e.to_string())),
    };

    // Compile `--only` patterns into a GlobSet here so the
    // `globset` crate version does not leak into `SyncOptions`.
    let only_set = compile_only_globset(opts.only_patterns.as_ref())?;

    // Auto-migrate legacy `.grex/workspace/<name>/` layout BEFORE the
    // walker resolves children. Idempotent: a fresh v1.1.0+ workspace
    // sees no legacy directory and the function no-ops.
    let workspace_migrations = migrate_legacy_workspace(pack_root);

    let graph =
        walk_and_validate(pack_root, &workspace, opts.validate, opts.ref_override.as_deref())?;
    let prep = prepare_run_context(pack_root, &graph, &workspace)?;
    log_force_flag(opts.force);

    let mut report = SyncReport {
        graph,
        steps: Vec::new(),
        halted: None,
        event_log_warnings: Vec::new(),
        pre_run_recovery: prep.pre_run_recovery,
        workspace_migrations,
    };

    // Start from the prior lock map; `run_actions` refreshes / removes
    // entries as packs complete or halt.
    let mut next_lock = prep.prior_lock.clone();
    // feat-m6 B1: resolve `--parallel` once and build the scheduler
    // shared across every `ExecCtx` in this run. Library callers who
    // leave `opts.parallel == None` default to `num_cpus::get()` here
    // (clamped `>= 1`) so the scheduler slot is always populated —
    // `ctx.scheduler` being `None` would strand acquire-sites into
    // unbounded concurrency. See `.omne/cfg/concurrency.md` §Scheduler.
    let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
    let scheduler = Arc::new(Scheduler::new(resolved_parallel));
    run_actions(
        &mut report,
        &prep.order,
        &prep.vars,
        &workspace,
        &prep.event_log,
        &prep.lock_path,
        opts.dry_run,
        &prep.prior_lock,
        &mut next_lock,
        &prep.registry,
        &prep.pack_type_registry,
        only_set.as_ref(),
        opts.force,
        resolved_parallel,
        &scheduler,
    );

    persist_lockfile_if_clean(&mut report, &prep.lockfile_path, &next_lock, opts.dry_run);
    Ok(report)
}
521
/// Bag of context pieces assembled once at the top of [`run`]. Grouping
/// them keeps [`run`] under the workspace's 50-LOC function lint without
/// smearing the read of sequential setup across helpers. Fields are
/// consumed piecemeal by the actions loop; no getters needed.
struct RunContext {
    /// Graph node indices in execution order (depth-first post-order,
    /// from `post_order`).
    order: Vec<usize>,
    /// Variable environment snapshotted from the process environment.
    vars: VarEnv,
    /// Path to the `.grex/grex.jsonl` event log under the pack root.
    event_log: PathBuf,
    /// Sidecar lock path derived from `event_log` (guards appends).
    lock_path: PathBuf,
    /// Path to the resolved-state lockfile (`.grex/grex.lock.jsonl`).
    lockfile_path: PathBuf,
    /// Prior lockfile entries keyed by pack name; empty when no
    /// lockfile existed yet.
    prior_lock: std::collections::HashMap<String, LockEntry>,
    /// Bootstrap action-plugin registry, shared via `Arc`.
    registry: Arc<Registry>,
    /// Pack-type registry threaded into every `ExecCtx`.
    pack_type_registry: Arc<PackTypeRegistry>,
    /// Non-empty findings from the pre-run recovery scan, if any.
    pre_run_recovery: Option<RecoveryReport>,
}
537
538/// Build the per-run context: traversal order, vars env, event/lockfile
539/// paths, prior lockfile state, bootstrap registry, and (optionally) a
540/// pre-run recovery scan. Kept narrow so [`run`] stays small.
541///
542/// `workspace` is the resolved workspace directory (post `--workspace`
543/// override) so the recovery scan looks for `.grex.bak` artefacts under
544/// the actual on-disk location children were materialised at — not
545/// under the pack root, which differs from the workspace whenever the
546/// CLI's `--workspace` flag is used. Pre-fix this anchoring drift
547/// caused recovery scans to miss every backup left under an override
548/// workspace.
549fn prepare_run_context(
550 pack_root: &Path,
551 graph: &PackGraph,
552 workspace: &Path,
553) -> Result<RunContext, SyncError> {
554 let event_log = event_log_path(pack_root);
555 let lock_path = event_lock_path(&event_log);
556 let vars = VarEnv::from_os();
557 let order = post_order(graph);
558 let pre_run_recovery = scan_recovery(workspace, &event_log).ok().filter(|r| !r.is_empty());
559 let lockfile_path = lockfile_path(pack_root);
560 let prior_lock = load_prior_lock(&lockfile_path)?;
561 let registry = Arc::new(Registry::bootstrap());
562 let pack_type_registry = Arc::new(bootstrap_pack_type_registry());
563 Ok(RunContext {
564 order,
565 vars,
566 event_log,
567 lock_path,
568 lockfile_path,
569 prior_lock,
570 registry,
571 pack_type_registry,
572 pre_run_recovery,
573 })
574}
575
576/// Build the [`PackTypeRegistry`] the sync driver threads into every
577/// [`ExecCtx`] it constructs.
578///
579/// Default path (no `plugin-inventory` feature) hard-codes the three
580/// built-ins via [`PackTypeRegistry::bootstrap`]. With the feature on,
581/// [`PackTypeRegistry::bootstrap_from_inventory`] is preferred so any
582/// externally-submitted plugin types (mirroring the M4-E pattern for
583/// action plugins) shadow the built-ins last-writer-wins. Kept as a free
584/// helper so the `#[cfg]` split lives in one place instead of being
585/// smeared across every sync call-site.
586fn bootstrap_pack_type_registry() -> PackTypeRegistry {
587 #[cfg(feature = "plugin-inventory")]
588 {
589 let mut reg = PackTypeRegistry::bootstrap();
590 reg.register_from_inventory();
591 reg
592 }
593 #[cfg(not(feature = "plugin-inventory"))]
594 {
595 PackTypeRegistry::bootstrap()
596 }
597}
598
599/// Emit a single `tracing::info!` line when `--force` is active so
600/// operators can confirm from logs that the skip short-circuit was
601/// bypassed. Extracted so [`run`] stays small.
602fn log_force_flag(force: bool) {
603 if force {
604 tracing::info!(
605 target: "grex::sync",
606 "--force active: bypassing lockfile skip-on-hash short-circuit"
607 );
608 }
609}
610
611/// Walk the pack tree rooted at `pack_root`, optionally running the
612/// plan-phase validators. Extracted so [`run`] stays under the
613/// workspace's 50-LOC per-function lint threshold.
614fn walk_and_validate(
615 pack_root: &Path,
616 workspace: &Path,
617 validate: bool,
618 ref_override: Option<&str>,
619) -> Result<PackGraph, SyncError> {
620 let loader = FsPackLoader::new();
621 let backend = GixBackend::new();
622 let walker = Walker::new(&loader, &backend, workspace.to_path_buf())
623 .with_ref_override(ref_override.map(str::to_string));
624 let graph = walker.walk(pack_root)?;
625 if validate {
626 validate_graph(&graph)?;
627 }
628 Ok(graph)
629}
630
631/// Load the prior lockfile (`grex.lock.jsonl`). Missing file yields an
632/// empty map; parse errors are fatal since writes are atomic and a torn
633/// lockfile therefore indicates real corruption that must be resolved
634/// before a fresh sync is safe. Parse/IO failures surface as
635/// [`SyncError::Lockfile`] — this is an I/O / schema fault, not a
636/// dependency-satisfaction fault, so it gets its own taxonomy slot.
637fn load_prior_lock(
638 lockfile_path: &Path,
639) -> Result<std::collections::HashMap<String, LockEntry>, SyncError> {
640 read_lockfile(lockfile_path)
641 .map_err(|source| SyncError::Lockfile { path: lockfile_path.to_path_buf(), source })
642}
643
644/// Persist `next_lock` atomically to `lockfile_path` whenever this was
645/// not a dry-run. On a halt the map has already had the halted pack's
646/// entry removed (see `run_actions`), so persisting now preserves every
647/// *successful* pack's fresh entry while guaranteeing absence of an
648/// entry for the halted pack — next sync sees no prior hash there and
649/// re-executes from scratch (route (b) halt-state gating). Write errors
650/// surface as non-fatal warnings on the report.
651fn persist_lockfile_if_clean(
652 report: &mut SyncReport,
653 lockfile_path: &Path,
654 next_lock: &std::collections::HashMap<String, LockEntry>,
655 dry_run: bool,
656) {
657 if dry_run {
658 return;
659 }
660 if let Err(e) = write_lockfile(lockfile_path, next_lock) {
661 tracing::warn!(target: "grex::sync", "lockfile write failed: {e}");
662 report.event_log_warnings.push(format!("{}: {e}", lockfile_path.display()));
663 }
664}
665
666/// Canonical location of the resolved-state lockfile
667/// (`<pack_root>/.grex/grex.lock.jsonl`). Colocated with the event log
668/// so both audit artifacts live under a single `.grex/` sidecar.
669fn lockfile_path(pack_root: &Path) -> PathBuf {
670 pack_root_dir(pack_root).join(".grex").join("grex.lock.jsonl")
671}
672
673/// Create the workspace directory if it does not yet exist.
674fn ensure_workspace_dir(workspace: &Path) -> Result<(), SyncError> {
675 if !workspace.exists() {
676 std::fs::create_dir_all(workspace).map_err(|e| SyncError::Validation {
677 errors: vec![PackValidationError::DependsOnUnsatisfied {
678 pack: "<workspace>".into(),
679 required: format!("{}: {e}", workspace.display()),
680 }],
681 })?;
682 }
683 Ok(())
684}
685
686/// Open (but do not acquire) the workspace-level lock file.
687fn open_workspace_lock(workspace: &Path) -> Result<(ScopedLock, PathBuf), SyncError> {
688 let ws_lock_path = workspace_lock_path(workspace);
689 let ws_lock = ScopedLock::open(&ws_lock_path)
690 .map_err(|e| workspace_lock_err(&ws_lock_path, &e.to_string()))?;
691 Ok((ws_lock, ws_lock_path))
692}
693
694/// Build a `Validation` error describing a workspace-lock failure.
695fn workspace_lock_err(ws_lock_path: &Path, reason: &str) -> SyncError {
696 SyncError::Validation {
697 errors: vec![PackValidationError::DependsOnUnsatisfied {
698 pack: "<workspace-lock>".into(),
699 required: format!("{}: {reason}", ws_lock_path.display()),
700 }],
701 }
702}
703
/// Single source of truth for the legacy workspace directory name.
/// Pre-`v1.1.0` `resolve_workspace` joined `.grex/workspace/` onto the
/// pack root by default; the auto-migration in
/// [`migrate_legacy_workspace`] is the only place this legacy literal
/// is allowed to appear in `crates/grex-core/src/`. The grep gate in
/// the v1.1.0 release checklist whitelists this one constant.
const LEGACY_WORKSPACE_DIR: &str = ".grex/workspace";
711
712/// Auto-migrate any legacy `.grex/workspace/<name>/` child layout left
713/// over from v1.0.x to the v1.1.0 flat-sibling layout. Idempotent: a
714/// fresh workspace built on v1.1.0+ sees no `.grex/workspace/`
715/// directory and the function no-ops.
716///
717/// Per-child outcomes:
718///
719/// * **Both legacy + flat-sibling exist** → `SkippedBothExist`. The
720/// user needs to inspect (perhaps the legacy is stale, perhaps it is
721/// the source of truth); we never silently delete either.
722/// * **Flat-sibling slot occupied by a non-grex file or non-empty dir**
723/// → `SkippedDestOccupied`. Refuse to clobber user data.
724/// * **Legacy exists, flat-sibling absent** → `Migrated` via atomic
725/// `fs::rename`. Same-volume move is the common case (the migration
726/// stays inside `pack_root`); cross-volume failures surface as
727/// `Failed { error }` with the OS message so the operator can move
728/// manually.
729/// * **Legacy absent** → silent no-op (not recorded in the report).
730///
731/// After all per-child decisions: orphan `.grex.sync.lock` under the
732/// legacy workspace is removed (best-effort) and the empty
733/// `.grex/workspace/` directory is rmdir'd (best-effort). Both are
734/// soft-failures: leaving them on disk is harmless, surfacing the
735/// errors as a sync abort would be over-strict.
736///
737/// Discovery is by directory listing, not by parent-manifest parse —
738/// migration must work even when the parent manifest itself was
739/// rewritten between versions. A child counts as "legacy" iff
740/// `<pack_root>/<LEGACY_WORKSPACE_DIR>/<name>/.git` exists (i.e. it is
741/// an actual git working tree, not stray metadata).
742fn migrate_legacy_workspace(pack_root: &Path) -> Vec<WorkspaceMigration> {
743 let root = pack_root_dir(pack_root);
744 let legacy_root = root.join(LEGACY_WORKSPACE_DIR);
745 if !legacy_root.is_dir() {
746 return Vec::new();
747 }
748 let entries = match fs::read_dir(&legacy_root) {
749 Ok(e) => e,
750 Err(e) => {
751 tracing::warn!(
752 target: "grex::sync::migrate",
753 "legacy workspace `{}` unreadable: {e}",
754 legacy_root.display(),
755 );
756 return Vec::new();
757 }
758 };
759 let mut migrations = Vec::new();
760 for entry_result in entries {
761 let entry = match entry_result {
762 Ok(e) => e,
763 Err(e) => {
764 tracing::warn!(
765 target: "grex::sync::migrate",
766 "skipping unreadable entry under `{}`: {e}",
767 legacy_root.display(),
768 );
769 continue;
770 }
771 };
772 let Ok(ft) = entry.file_type() else { continue };
773 // file_type avoids symlink-following; legitimate v1.0.x children
774 // were always real directories, so anything else is skipped.
775 if ft.is_symlink() || !ft.is_dir() {
776 continue;
777 }
778 let name_os = entry.file_name();
779 let Some(name) = name_os.to_str() else { continue };
780 // Only act on entries that look like real cloned children (have
781 // a `.git`). The legacy workspace lock file (`.grex.sync.lock`)
782 // is not a directory and is filtered out by the dir check above;
783 // we clean it up explicitly after the migration loop completes.
784 let from_abs = entry.path();
785 if !from_abs.join(".git").exists() {
786 continue;
787 }
788 let to_abs = root.join(name);
789 let from_rel = PathBuf::from(LEGACY_WORKSPACE_DIR).join(name);
790 let to_rel = PathBuf::from(name);
791 let outcome = decide_and_migrate(&from_abs, &to_abs);
792 log_migration(&from_rel, &to_rel, &outcome);
793 migrations.push(WorkspaceMigration { from: from_rel, to: to_rel, outcome });
794 }
795 cleanup_legacy_workspace_root(&legacy_root);
796 migrations
797}
798
799/// Decide what to do with one legacy child + perform the move when
800/// safe. Returns the outcome to record on the [`WorkspaceMigration`].
801fn decide_and_migrate(from: &Path, to: &Path) -> MigrationOutcome {
802 let dest_exists = to.exists();
803 let dest_is_grex_repo = dest_exists && to.join(".git").exists();
804 if dest_is_grex_repo {
805 // Both legacy and flat-sibling are git repos. Refuse to choose
806 // between them; let the user resolve.
807 return MigrationOutcome::SkippedBothExist;
808 }
809 if dest_exists {
810 // Some other entry occupies the flat-sibling slot — a stray
811 // file, an empty dir, an unrelated dir. Treat as user data and
812 // leave both in place.
813 return MigrationOutcome::SkippedDestOccupied;
814 }
815 match fs::rename(from, to) {
816 Ok(()) => MigrationOutcome::Migrated,
817 Err(e) => MigrationOutcome::Failed { error: e.to_string() },
818 }
819}
820
821/// Emit one structured log line per migration so users see exactly what
822/// happened during the upgrade. Severity matches outcome: success is
823/// `info`, skips and failures are `warn` so they surface in the default
824/// log level without forcing operators to crank verbosity.
825fn log_migration(from: &Path, to: &Path, outcome: &MigrationOutcome) {
826 let from_disp = from.display();
827 let to_disp = to.display();
828 match outcome {
829 MigrationOutcome::Migrated => {
830 tracing::info!(
831 target: "grex::sync::migrate",
832 "migrated: legacy={from_disp} -> new={to_disp}",
833 );
834 }
835 MigrationOutcome::SkippedBothExist => {
836 tracing::warn!(
837 target: "grex::sync::migrate",
838 "skipped: both legacy={from_disp} and new={to_disp} exist; resolve manually",
839 );
840 }
841 MigrationOutcome::SkippedDestOccupied => {
842 tracing::warn!(
843 target: "grex::sync::migrate",
844 "skipped: destination={to_disp} occupied; leaving legacy={from_disp} in place",
845 );
846 }
847 MigrationOutcome::Failed { error } => {
848 tracing::warn!(
849 target: "grex::sync::migrate",
850 "failed: legacy={from_disp} -> new={to_disp}: {error}",
851 );
852 }
853 }
854}
855
/// Best-effort cleanup of the legacy workspace root after migration:
/// remove the orphan `.grex.sync.lock` (always safe — the v1.1.0
/// workspace lock lives at `<pack_root>/.grex.sync.lock`) and try to
/// rmdir the now-empty `.grex/workspace/` directory.
///
/// Failure handling: a failed lock removal is logged at `warn`, a
/// successful one at `info`, and the final `remove_dir` result is
/// discarded entirely — both possible leftovers are harmless.
fn cleanup_legacy_workspace_root(legacy_root: &Path) {
    let orphan_lock = legacy_root.join(".grex.sync.lock");
    if orphan_lock.exists() {
        if let Err(e) = fs::remove_file(&orphan_lock) {
            tracing::warn!(
                target: "grex::sync::migrate",
                "could not remove orphan lock `{}`: {e}",
                orphan_lock.display(),
            );
        } else {
            tracing::info!(
                target: "grex::sync::migrate",
                "removed orphan lock `{}`",
                orphan_lock.display(),
            );
        }
    }
    // `remove_dir` only succeeds when the directory is empty — exactly
    // what we want; if any unmigrated child remains, the legacy root
    // stays put for the operator to inspect.
    let _ = fs::remove_dir(legacy_root);
}
883
884/// Compute the default workspace path when `override_` is absent.
885///
886/// The default is the pack root directory itself, so child packs
887/// resolve as flat siblings of the parent pack root. The rationale —
888/// alignment with the long-standing pack-spec rule that
889/// `children[].path` is a bare name — lives in the pack-spec
890/// "Validation rules" section (`man/concepts/pack-spec.md` /
891/// `grex-doc/src/concepts/pack-spec.md`).
892fn resolve_workspace(pack_root: &Path, override_: Option<&Path>) -> PathBuf {
893 if let Some(p) = override_ {
894 return p.to_path_buf();
895 }
896 pack_root_dir(pack_root)
897}
898
/// Normalize `pack_root` to a directory. Non-yaml inputs pass through
/// unchanged. For a `*.yaml` / `*.yml` manifest path the code walks up
/// **two** parents (file → containing dir → its parent), defaulting to
/// `"."` when that walk runs off the top of the path.
/// NOTE(review): the double hop suggests manifests live one level below
/// the pack root (e.g. `<root>/<dir>/pack.yaml`) — confirm against the
/// walker's manifest layout.
fn pack_root_dir(pack_root: &Path) -> PathBuf {
    let ext = pack_root.extension().and_then(|e| e.to_str());
    if !matches!(ext, Some("yaml" | "yml")) {
        return pack_root.to_path_buf();
    }
    match pack_root.parent().and_then(Path::parent) {
        Some(root) => root.to_path_buf(),
        None => PathBuf::from("."),
    }
}
911
912/// Compute the `.grex/grex.jsonl` path next to the pack root.
913fn event_log_path(pack_root: &Path) -> PathBuf {
914 pack_root_dir(pack_root).join(".grex").join("grex.jsonl")
915}
916
/// Sidecar lock path next to the event log. One canonical slot per pack
/// root — cooperating grex procs serialize through this file.
fn event_lock_path(event_log: &Path) -> PathBuf {
    match event_log.parent() {
        Some(dir) => dir.join(".grex.lock"),
        None => PathBuf::from(".grex.lock"),
    }
}
922
/// Sidecar lock path for the workspace itself. Lives at
/// `<workspace>/.grex.sync.lock` — the workspace dir is already created
/// by the `run()` prologue, so this lands beside the child clones.
fn workspace_lock_path(workspace: &Path) -> PathBuf {
    let mut lock = workspace.to_path_buf();
    lock.push(".grex.sync.lock");
    lock
}
929
930/// Aggregate manifest-level + graph-level validators and return their output.
931fn validate_graph(graph: &PackGraph) -> Result<(), SyncError> {
932 let mut errors: Vec<PackValidationError> = Vec::new();
933 for node in graph.nodes() {
934 if let Err(mut e) = node.manifest.validate_plan() {
935 errors.append(&mut e);
936 }
937 }
938 if let Err(mut e) = graph.validate() {
939 errors.append(&mut e);
940 }
941 if errors.is_empty() {
942 Ok(())
943 } else {
944 Err(SyncError::Validation { errors })
945 }
946}
947
948/// Depth-first post-order traversal of the graph starting from root.
949///
950/// Children fully precede their parent in the returned vector so downstream
951/// executors install leaves first and the root last.
952fn post_order(graph: &PackGraph) -> Vec<usize> {
953 let mut out = Vec::with_capacity(graph.nodes().len());
954 visit_post(graph, 0, &mut out);
955 out
956}
957
958fn visit_post(graph: &PackGraph, id: usize, out: &mut Vec<usize>) {
959 // Collect child ids first to avoid borrow conflicts with graph iteration.
960 let kids: Vec<usize> = graph.children_of(id).map(|n| n.id).collect();
961 for k in kids {
962 visit_post(graph, k, out);
963 }
964 out.push(id);
965}
966
/// Drive every action for every node; abort on the first [`ExecError`].
///
/// `order` is the post-order node-id list from [`post_order`]; the loop
/// visits packs in exactly that order. `prior_lock` / `next_lock` are the
/// previous and in-progress lockfile snapshots used by the skip-on-hash
/// shortcut; `only` is the optional `--only` glob filter and `force`
/// disables the hash shortcut.
///
/// Each action is bracketed by three manifest events:
/// 1. [`Event::ActionStarted`] — appended **before** `execute` returns.
/// 2. [`Event::ActionCompleted`] — appended on `Ok(step)`.
/// 3. [`Event::ActionHalted`] — appended on `Err(e)` before returning.
///
/// All three writes go through the same [`ManifestLock`]-wrapped path
/// ([`append_manifest_event`]) and failures are recorded as non-fatal
/// warnings so the executor's outcome always dominates. The third append
/// (`ActionHalted`) lets a future `grex doctor` correlate crash recovery
/// with the exact action that halted.
// feat-m6 B1 wiring added `parallel` + `scheduler` args; the signature
// now pushes past the 50-LOC per-function lint by one line. Silence
// that one — the body itself is unchanged in scope.
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
fn run_actions(
    report: &mut SyncReport,
    order: &[usize],
    vars: &VarEnv,
    workspace: &Path,
    event_log: &Path,
    lock_path: &Path,
    dry_run: bool,
    prior_lock: &std::collections::HashMap<String, LockEntry>,
    next_lock: &mut std::collections::HashMap<String, LockEntry>,
    registry: &Arc<Registry>,
    pack_type_registry: &Arc<PackTypeRegistry>,
    only: Option<&GlobSet>,
    force: bool,
    parallel: usize,
    scheduler: &Arc<Scheduler>,
) {
    // Dry-run and wet-run executors are both built up front; the choice
    // between them is made per action inside the declarative path.
    let plan = PlanExecutor::with_registry(registry.clone());
    let fs = FsExecutor::with_registry(registry.clone());
    let rt = build_pack_type_runtime(parallel);
    let visited_meta = new_visited_meta();
    for &id in order {
        let Some(node) = report.graph.node(id) else { continue };
        // Clone the per-node fields out of the graph so the mutable
        // `report` borrow below does not conflict with `report.graph`.
        let pack_name = node.name.clone();
        let pack_path = node.path.clone();
        let actions = node.manifest.actions.clone();
        let manifest = node.manifest.clone();
        let commit_sha = node.commit_sha.clone().unwrap_or_default();
        // `--only` filter + skip-on-hash short-circuits colocated in
        // `try_skip_or_filter` so this outer loop stays within the
        // 50-LOC per-function budget.
        if try_skip_or_filter(
            report,
            only,
            &pack_name,
            &pack_path,
            &actions,
            &commit_sha,
            workspace,
            prior_lock,
            next_lock,
            dry_run,
            force,
        ) {
            continue;
        }
        let pack_halted = run_pack_lifecycle(
            report,
            vars,
            workspace,
            event_log,
            lock_path,
            dry_run,
            &plan,
            &fs,
            registry,
            pack_type_registry,
            &rt,
            &pack_name,
            &pack_path,
            &manifest,
            &visited_meta,
            scheduler,
        );
        if pack_halted {
            // Route (b) halt-state gating: drop any prior entry for the
            // halted pack so the next sync sees no prior hash and
            // re-executes from scratch. Successful packs in this same
            // run keep their freshly-upserted entries, and packs we did
            // not reach keep their prior entries untouched.
            next_lock.remove(&pack_name);
            return;
        }
        // Successful pack — record a fresh lockfile entry so the next
        // run's skip-on-hash test can succeed. Commit SHA is now plumbed
        // from the walker (M4-D): `PackNode::commit_sha` carries the
        // resolved HEAD SHA when the pack's working tree is a git
        // repository, otherwise an empty string keeps the hash stable.
        let actions_hash = compute_actions_hash(&actions, &commit_sha);
        upsert_lock_entry(next_lock, &pack_name, &commit_sha, &actions_hash);
    }
}
1065
1066/// Build the multi-thread tokio runtime used to drive async pack-type
1067/// plugin dispatch. Pack-type plugins expose `async fn` methods via
1068/// `async_trait`, but the sync driver is synchronous end-to-end — we
1069/// block on each plugin future inside the outer action loop. Extracted
1070/// into a standalone helper so the runtime construction does not
1071/// inflate `run_actions` beyond the 50-LOC per-function budget.
1072///
1073/// # Multi-thread rationale (M5-2c)
1074///
1075/// M5-2c enabled real [`crate::plugin::pack_type::MetaPlugin`] recursion
1076/// through [`crate::execute::ExecCtx::pack_type_registry`]. The recursion
1077/// itself is purely `async` / `.await` (no nested `block_on`), but future
1078/// plugin authors may reasonably compose `block_on` calls inside
1079/// lifecycle hooks — and external callers that drive `MetaPlugin` via
1080/// `rt.block_on(...)` within their own runtime would deadlock on a
1081/// current-thread runtime the moment a hook re-enters. A multi-thread
1082/// runtime with a small worker pool lets those re-entries resolve on a
1083/// sibling worker instead of blocking the dispatcher thread.
1084///
1085/// # Worker-thread sizing (feat-m6 H6)
1086///
1087/// The worker pool is sized from the resolved `--parallel` knob so the
1088/// runtime always has enough workers to service every in-flight pack op
1089/// plus at least one sibling for nested `block_on`. Clamped to
1090/// `[2, num_cpus::get()]`: `2` preserves the pre-M6 floor (one driver +
1091/// one sibling so re-entrant hooks never deadlock), and the upper bound
1092/// caps the pool at the host's CPU count so `--parallel 0`
1093/// (unbounded-semantics) does not explode the worker count.
1094fn build_pack_type_runtime(parallel: usize) -> tokio::runtime::Runtime {
1095 let workers = parallel.clamp(2, num_cpus::get().max(2));
1096 tokio::runtime::Builder::new_multi_thread()
1097 .worker_threads(workers)
1098 .enable_all()
1099 .build()
1100 .expect("tokio runtime for pack-type dispatch")
1101}
1102
1103/// Construct a fresh [`MetaVisitedSet`] for one sync run. Walker-driven
1104/// dispatch does not attach it (see `dispatch_pack_type_plugin`), but
1105/// the argument is threaded through so future explicit-install /
1106/// teardown verbs can share the same set shape.
1107fn new_visited_meta() -> MetaVisitedSet {
1108 std::sync::Arc::new(std::sync::Mutex::new(std::collections::HashSet::new()))
1109}
1110
1111/// Combined short-circuit helper: `--only` filter + skip-on-hash. Returns
1112/// `true` when the outer loop should `continue` for this pack.
1113///
1114/// Extracted from `run_actions` so that function stays under the
1115/// workspace's 50-LOC per-function lint. Semantics are unchanged; this
1116/// is a pure structural refactor.
1117#[allow(clippy::too_many_arguments)]
1118fn try_skip_or_filter(
1119 report: &mut SyncReport,
1120 only: Option<&GlobSet>,
1121 pack_name: &str,
1122 pack_path: &Path,
1123 actions: &[Action],
1124 commit_sha: &str,
1125 workspace: &Path,
1126 prior_lock: &std::collections::HashMap<String, LockEntry>,
1127 next_lock: &mut std::collections::HashMap<String, LockEntry>,
1128 dry_run: bool,
1129 force: bool,
1130) -> bool {
1131 if skip_for_only_filter(only, pack_name, pack_path, workspace) {
1132 if let Some(prev) = prior_lock.get(pack_name) {
1133 next_lock.insert(pack_name.to_string(), prev.clone());
1134 }
1135 return true;
1136 }
1137 try_skip_pack(
1138 report, pack_name, pack_path, actions, commit_sha, prior_lock, next_lock, dry_run, force,
1139 )
1140}
1141
1142/// Return `true` when `--only` is active and the pack's
1143/// **workspace-relative path** (normalized to forward-slash form) does
1144/// not match any of the registered globs. Name-fallback matching was
1145/// dropped in the M4-D post-review fix bundle: spec §M4 req 6 says
1146/// "pack paths" and cross-platform consistency requires a single
1147/// normalized representation rather than `display()`-formatted strings
1148/// (which use `\\` on Windows and `/` on POSIX — globset treats `\\`
1149/// as a glob-escape, not a path separator). For the root pack whose
1150/// `pack_path` is not under `workspace`, the fallback is to match
1151/// against the absolute path's forward-slash form.
1152fn skip_for_only_filter(
1153 only: Option<&GlobSet>,
1154 pack_name: &str,
1155 pack_path: &Path,
1156 workspace: &Path,
1157) -> bool {
1158 let Some(set) = only else { return false };
1159 let rel = pack_path.strip_prefix(workspace).unwrap_or(pack_path);
1160 let rel_str = rel.to_string_lossy().replace('\\', "/");
1161 let matches = set.is_match(&rel_str);
1162 if !matches {
1163 tracing::info!(
1164 target: "grex::sync",
1165 "skipping pack `{pack_name}` (rel path `{rel_str}`): does not match --only filter"
1166 );
1167 }
1168 !matches
1169}
1170
/// Per-pack lifecycle dispatch. Returns `true` when the sync must halt.
///
/// M5-1 Stage C replaces the blind `for action in manifest.actions` loop
/// with a pack-type-aware dispatch:
///
/// * [`PackType::Declarative`] retains the per-action execution shape that
///   M4 shipped — each action lands its own `ActionStarted` /
///   `ActionCompleted` / `ActionHalted` event bracket. The registry is
///   still consulted via [`PackTypeRegistry::get`] as a name-oracle so
///   mistyped packs fail closed.
/// * [`PackType::Meta`] / [`PackType::Scripted`] dispatch once through the
///   pack-type plugin's `sync` method (the sync CLI verb is the only
///   caller in M5-1; `install` / `update` / `teardown` verbs wire in
///   M5-2), returning a single aggregate [`ExecStep`]. A single event
///   bracket frames the async call.
///
/// Declarative is kept on the legacy per-action path because its event log
/// semantics (one event per action, per-step rollback context) are exactly
/// what plugin authors expect to observe. Unifying declarative under the
/// plugin dispatch is M5-2 scope — it requires reshaping the trait surface
/// to emit a step stream rather than a single aggregate.
#[allow(clippy::too_many_arguments)]
fn run_pack_lifecycle(
    report: &mut SyncReport,
    vars: &VarEnv,
    workspace: &Path,
    event_log: &Path,
    lock_path: &Path,
    dry_run: bool,
    plan: &PlanExecutor,
    fs: &FsExecutor,
    registry: &Arc<Registry>,
    pack_type_registry: &Arc<PackTypeRegistry>,
    rt: &tokio::runtime::Runtime,
    pack_name: &str,
    pack_path: &Path,
    manifest: &crate::pack::PackManifest,
    visited_meta: &MetaVisitedSet,
    scheduler: &Arc<Scheduler>,
) -> bool {
    let type_tag = manifest.r#type.as_str();
    // Name-oracle check: every pack type must be registered. Unknown
    // pack types halt the pack the same way M4 halted unknown actions.
    // (`dispatch_pack_type_plugin` relies on this guard for its own
    // registry lookup — keep the two in sync.)
    if pack_type_registry.get(type_tag).is_none() {
        let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
        record_action_err(report, event_log, lock_path, pack_name, 0, "pack-type", err);
        return true;
    }
    match manifest.r#type {
        crate::pack::PackType::Declarative => run_declarative_actions(
            report,
            vars,
            workspace,
            event_log,
            lock_path,
            dry_run,
            plan,
            fs,
            pack_name,
            pack_path,
            manifest,
            &manifest.actions,
            scheduler,
        ),
        crate::pack::PackType::Meta | crate::pack::PackType::Scripted => dispatch_pack_type_plugin(
            report,
            vars,
            workspace,
            event_log,
            lock_path,
            registry,
            pack_type_registry,
            rt,
            pack_name,
            pack_path,
            manifest,
            type_tag,
            visited_meta,
            scheduler,
        ),
    }
}
1253
/// Run a declarative pack's actions sequentially. Preserves the M4
/// per-action event-log bracket (`ActionStarted` → `ActionCompleted` |
/// `ActionHalted`). Returns `true` when the sync must halt.
///
/// `dry_run` selects the executor per action: the [`PlanExecutor`] plans
/// without side effects; the [`FsExecutor`] performs the real work. The
/// gitignore application below is also gated on `dry_run` so a plan run
/// stays side-effect free.
#[allow(clippy::too_many_arguments)]
fn run_declarative_actions(
    report: &mut SyncReport,
    vars: &VarEnv,
    workspace: &Path,
    event_log: &Path,
    lock_path: &Path,
    dry_run: bool,
    plan: &PlanExecutor,
    fs: &FsExecutor,
    pack_name: &str,
    pack_path: &Path,
    manifest: &crate::pack::PackManifest,
    actions: &[Action],
    scheduler: &Arc<Scheduler>,
) -> bool {
    // `apply_gitignore` is called per-lifecycle by each PackTypePlugin
    // for meta/scripted, and here for declarative (which bypasses the
    // plugin in `sync::run`'s per-action driver). Keeping plugins as
    // the single apply site everywhere else means the declarative
    // per-action path is the only code outside the PackTypePlugin
    // surface that needs a direct apply call.
    if !dry_run {
        let ctx = ExecCtx::new(vars, pack_path, workspace)
            .with_platform(Platform::current())
            .with_scheduler(scheduler);
        if let Err(e) = crate::plugin::pack_type::apply_gitignore(&ctx, manifest) {
            record_action_err(report, event_log, lock_path, pack_name, 0, "gitignore", e);
            return true;
        }
    }
    for (idx, action) in actions.iter().enumerate() {
        // Fresh ctx per action: ExecCtx construction is cheap and keeps
        // each action isolated from any prior action's ctx state.
        let ctx = ExecCtx::new(vars, pack_path, workspace)
            .with_platform(Platform::current())
            .with_scheduler(scheduler);
        let action_tag = action_kind_tag(action);
        // Started-event is appended BEFORE execution so a crash mid-action
        // leaves an unmatched `ActionStarted` for recovery tooling to find.
        append_manifest_event(
            event_log,
            lock_path,
            &Event::ActionStarted {
                ts: Utc::now(),
                pack: pack_name.to_string(),
                action_idx: idx,
                action_name: action_tag.to_string(),
            },
            &mut report.event_log_warnings,
        );
        let step_result =
            if dry_run { plan.execute(action, &ctx) } else { fs.execute(action, &ctx) };
        if !record_action_outcome(
            report,
            event_log,
            lock_path,
            pack_name,
            idx,
            action_tag,
            step_result,
        ) {
            return true;
        }
    }
    false
}
1320
/// Dispatch a pack-type plugin (meta / scripted) through the async
/// registry. Brackets the call with a single `ActionStarted` /
/// `ActionCompleted` / `ActionHalted` trio at index 0. Returns `true`
/// when the sync must halt.
#[allow(clippy::too_many_arguments)]
fn dispatch_pack_type_plugin(
    report: &mut SyncReport,
    vars: &VarEnv,
    workspace: &Path,
    event_log: &Path,
    lock_path: &Path,
    registry: &Arc<Registry>,
    pack_type_registry: &Arc<PackTypeRegistry>,
    rt: &tokio::runtime::Runtime,
    pack_name: &str,
    pack_path: &Path,
    manifest: &crate::pack::PackManifest,
    type_tag: &'static str,
    visited_meta: &MetaVisitedSet,
    scheduler: &Arc<Scheduler>,
) -> bool {
    // NB: `visited_meta` is intentionally NOT attached to the ctx here.
    // The sync driver already walks children in post-order via the tree
    // walker; attaching the visited set would trigger MetaPlugin's
    // real-recursion branch and cause double dispatch (walker runs child
    // packs as their own graph nodes, then MetaPlugin would recurse into
    // them again). The `visited_meta` parameter is kept on the argument
    // list so future explicit-install / teardown verbs that invoke
    // MetaPlugin directly can share the same set shape.
    let _ = visited_meta;
    let ctx = ExecCtx::new(vars, pack_path, workspace)
        .with_platform(Platform::current())
        .with_registry(registry)
        .with_pack_type_registry(pack_type_registry)
        .with_scheduler(scheduler);
    append_manifest_event(
        event_log,
        lock_path,
        &Event::ActionStarted {
            ts: Utc::now(),
            pack: pack_name.to_string(),
            action_idx: 0,
            action_name: type_tag.to_string(),
        },
        &mut report.event_log_warnings,
    );
    // Invariant: the caller (`run_pack_lifecycle`) verified via
    // `pack_type_registry.get(type_tag)` that a plugin is registered for
    // `type_tag` before dispatching here, so this lookup cannot fail.
    // (Not `unsafe` — "SAFETY" wording deliberately avoided.)
    let plugin = pack_type_registry
        .get(type_tag)
        .expect("pack-type plugin must be registered (guarded above)");
    // feat-m6 CI fix — establish a task-local tier stack frame for every
    // async dispatch. Without this, `TierGuard::push` (which runs inside
    // the plugin lifecycle and may span `.await` / thread hops under the
    // multi-thread runtime) has no enforcement frame to push into.
    let step_result = rt.block_on(crate::pack_lock::with_tier_scope(plugin.sync(&ctx, manifest)));
    !record_action_outcome(report, event_log, lock_path, pack_name, 0, type_tag, step_result)
}
1379
1380/// Decide whether `pack_name` can be short-circuited via a lockfile
1381/// hash match. When the prior hash matches the freshly-computed hash,
1382/// emit a single [`ExecResult::Skipped`] step and carry the prior
1383/// lockfile entry forward unchanged. Returns `true` when the pack was
1384/// skipped.
1385#[allow(clippy::too_many_arguments)]
1386fn try_skip_pack(
1387 report: &mut SyncReport,
1388 pack_name: &str,
1389 pack_path: &Path,
1390 actions: &[Action],
1391 commit_sha: &str,
1392 prior_lock: &std::collections::HashMap<String, LockEntry>,
1393 next_lock: &mut std::collections::HashMap<String, LockEntry>,
1394 dry_run: bool,
1395 force: bool,
1396) -> bool {
1397 if dry_run || force {
1398 // Dry runs must always produce the planned-step transcript so
1399 // authors can see what `sync` *would* do. `--force` is the
1400 // operator's explicit opt-out from the hash short-circuit.
1401 return false;
1402 }
1403 let Some(prior) = prior_lock.get(pack_name) else {
1404 return false;
1405 };
1406 let hash = compute_actions_hash(actions, commit_sha);
1407 if prior.actions_hash != hash {
1408 return false;
1409 }
1410 let skipped_step = ExecStep {
1411 action_name: Cow::Borrowed("pack"),
1412 result: ExecResult::Skipped {
1413 pack_path: pack_path.to_path_buf(),
1414 actions_hash: hash.clone(),
1415 },
1416 // W4 landed `StepKind::PackSkipped` as the dedicated pack-level
1417 // short-circuit detail; we use it here instead of the prior
1418 // `Require { Satisfied, Skip }` proxy so renderers and consumers
1419 // can match on a single, purpose-built variant.
1420 details: StepKind::PackSkipped { actions_hash: hash },
1421 };
1422 report.steps.push(SyncStep {
1423 pack: pack_name.to_string(),
1424 action_idx: 0,
1425 exec_step: skipped_step,
1426 });
1427 // Carry the prior entry forward so the next-lock snapshot stays
1428 // consistent with what's on disk.
1429 next_lock.insert(pack_name.to_string(), prior.clone());
1430 true
1431}
1432
1433/// Insert or update a lockfile entry for `pack_name` with `actions_hash`.
1434///
1435/// Stores `commit_sha` verbatim — including the empty string when the
1436/// pack is not a git working tree or the HEAD probe failed.
1437/// `actions_hash` is computed over the same `commit_sha`, so the two
1438/// fields stay internally consistent: if probing starts returning a
1439/// non-empty SHA on the next run, the hash differs and the skip is
1440/// correctly invalidated. The prior-preserve carve-out that was
1441/// introduced in M4-D was unsound (hash-vs-sha drift) and is removed
1442/// by the M4-D post-review fix bundle; see spec §M4 req 4a.
1443fn upsert_lock_entry(
1444 next_lock: &mut std::collections::HashMap<String, LockEntry>,
1445 pack_name: &str,
1446 commit_sha: &str,
1447 actions_hash: &str,
1448) {
1449 let installed_at = Utc::now();
1450 let entry = next_lock.get(pack_name).map_or_else(
1451 || LockEntry {
1452 id: pack_name.to_string(),
1453 sha: commit_sha.to_string(),
1454 branch: String::new(),
1455 installed_at,
1456 actions_hash: actions_hash.to_string(),
1457 schema_version: "1".to_string(),
1458 },
1459 |prev| LockEntry {
1460 installed_at,
1461 actions_hash: actions_hash.to_string(),
1462 sha: commit_sha.to_string(),
1463 ..prev.clone()
1464 },
1465 );
1466 next_lock.insert(pack_name.to_string(), entry);
1467}
1468
1469/// Record one action outcome into `report` + event log. Returns `false`
1470/// when the run must halt (on error); `true` otherwise.
1471fn record_action_outcome(
1472 report: &mut SyncReport,
1473 event_log: &Path,
1474 lock_path: &Path,
1475 pack_name: &str,
1476 idx: usize,
1477 action_tag: &'static str,
1478 step_result: Result<ExecStep, ExecError>,
1479) -> bool {
1480 match step_result {
1481 Ok(step) => {
1482 record_action_ok(report, event_log, lock_path, pack_name, idx, step);
1483 true
1484 }
1485 Err(e) => {
1486 record_action_err(report, event_log, lock_path, pack_name, idx, action_tag, e);
1487 false
1488 }
1489 }
1490}
1491
1492/// Success-path bookkeeping: emit legacy `Sync` summary + `ActionCompleted`
1493/// audit event, then push the step onto the report.
1494fn record_action_ok(
1495 report: &mut SyncReport,
1496 event_log: &Path,
1497 lock_path: &Path,
1498 pack_name: &str,
1499 idx: usize,
1500 step: ExecStep,
1501) {
1502 append_step_event(event_log, lock_path, pack_name, &step, &mut report.event_log_warnings);
1503 append_manifest_event(
1504 event_log,
1505 lock_path,
1506 &Event::ActionCompleted {
1507 ts: Utc::now(),
1508 pack: pack_name.to_string(),
1509 action_idx: idx,
1510 result_summary: format!("{:?}", step.result),
1511 },
1512 &mut report.event_log_warnings,
1513 );
1514 report.steps.push(SyncStep { pack: pack_name.to_string(), action_idx: idx, exec_step: step });
1515}
1516
1517/// Halt-path bookkeeping: emit `ActionHalted` audit event, then stash the
1518/// rich `HaltedContext` into `report.halted`.
1519fn record_action_err(
1520 report: &mut SyncReport,
1521 event_log: &Path,
1522 lock_path: &Path,
1523 pack_name: &str,
1524 idx: usize,
1525 action_tag: &'static str,
1526 e: ExecError,
1527) {
1528 let error_summary = truncate_error_summary(&e);
1529 append_manifest_event(
1530 event_log,
1531 lock_path,
1532 &Event::ActionHalted {
1533 ts: Utc::now(),
1534 pack: pack_name.to_string(),
1535 action_idx: idx,
1536 action_name: action_tag.to_string(),
1537 error_summary,
1538 },
1539 &mut report.event_log_warnings,
1540 );
1541 let recovery_hint = recovery_hint_for(&e);
1542 report.halted = Some(SyncError::Halted(Box::new(HaltedContext {
1543 pack: pack_name.to_string(),
1544 action_idx: idx,
1545 action_name: action_tag.to_string(),
1546 error: e,
1547 recovery_hint,
1548 })));
1549}
1550
1551/// Short stable kind-tag for an [`crate::pack::Action`]. Mirrors the
1552/// `ACTION_*` constants used by [`crate::execute::step`] so the audit log
1553/// stays uniform.
1554fn action_kind_tag(action: &crate::pack::Action) -> &'static str {
1555 use crate::pack::Action;
1556 match action {
1557 Action::Symlink(_) => "symlink",
1558 Action::Unlink(_) => "unlink",
1559 Action::Env(_) => "env",
1560 Action::Mkdir(_) => "mkdir",
1561 Action::Rmdir(_) => "rmdir",
1562 Action::Require(_) => "require",
1563 Action::When(_) => "when",
1564 Action::Exec(_) => "exec",
1565 }
1566}
1567
1568/// Produce a bounded human summary of an [`ExecError`] for
1569/// [`Event::ActionHalted::error_summary`]. Keeps the written JSONL line
1570/// from pathological blowup when captured stderr is large.
1571fn truncate_error_summary(err: &ExecError) -> String {
1572 let mut s = err.to_string();
1573 if s.len() > ACTION_ERROR_SUMMARY_MAX {
1574 s.truncate(ACTION_ERROR_SUMMARY_MAX);
1575 s.push_str("…[truncated]");
1576 }
1577 s
1578}
1579
1580/// Best-effort recovery hint for common [`ExecError`] shapes. Returns
1581/// `None` when no generic advice applies; the error's own `Display`
1582/// output is already shown by the `Halted` variant's format string.
1583fn recovery_hint_for(err: &ExecError) -> Option<String> {
1584 match err {
1585 ExecError::SymlinkDestOccupied { .. } => Some(
1586 "set `backup: true` on the symlink action, or remove the conflicting entry by hand"
1587 .into(),
1588 ),
1589 ExecError::SymlinkPrivilegeDenied { .. } => {
1590 Some("enable Windows Developer Mode or re-run grex as administrator".into())
1591 }
1592 ExecError::SymlinkCreateAfterBackupFailed { backup, .. } => {
1593 Some(format!("backup left at `{}`; restore manually then re-run", backup.display()))
1594 }
1595 ExecError::RmdirNotEmpty { .. } => {
1596 Some("set `force: true` on the rmdir action to recurse".into())
1597 }
1598 ExecError::EnvPersistenceDenied { .. } => {
1599 Some("re-run elevated (Machine scope needs admin)".into())
1600 }
1601 _ => None,
1602 }
1603}
1604
1605/// Append one [`Event::Sync`] record summarising an [`ExecStep`].
1606///
1607/// Failures log a warning and are recorded in the report's
1608/// `event_log_warnings`; they do not abort the sync (spec: event-log write
1609/// failures are non-fatal).
1610///
1611/// # Concurrency
1612///
1613/// The append is serialized through a [`ManifestLock`] held across the
1614/// write. The lock is acquired **per action** (not once across the full
1615/// traversal) so cooperating grex processes can observe mid-progress log
1616/// state between actions; fd-lock acquisition is cheap on modern kernels
1617/// and sync runs are dominated by executor side effects, not lock waits.
1618/// This closes the bypass gap surfaced by the M3 concurrency review where
1619/// `append_event` was called without any cross-process serialisation.
1620fn append_step_event(
1621 log: &Path,
1622 lock_path: &Path,
1623 pack: &str,
1624 step: &ExecStep,
1625 warnings: &mut Vec<String>,
1626) {
1627 let summary = format!("{}:{:?}", step.action_name, step.result);
1628 let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: summary };
1629 if let Err(e) = append_event_locked(log, lock_path, &event) {
1630 tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
1631 warnings.push(format!("{}: {e}", log.display()));
1632 }
1633 // Schema version is recorded once at the manifest level by existing
1634 // manifest code; this stub uses the constant to keep a single source of
1635 // truth for forward-compat.
1636 let _ = SCHEMA_VERSION;
1637}
1638
1639/// Append a single [`Event`] under the shared [`ManifestLock`] path.
1640/// Failures are logged and recorded as non-fatal warnings — the spec
1641/// marks event-log write failures as non-aborting so a transient disk
1642/// error must not kill a sync mid-stream.
1643fn append_manifest_event(log: &Path, lock_path: &Path, event: &Event, warnings: &mut Vec<String>) {
1644 if let Err(e) = append_event_locked(log, lock_path, event) {
1645 tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
1646 warnings.push(format!("{}: {e}", log.display()));
1647 }
1648}
1649
1650/// Acquire [`ManifestLock`] and append one event. Parent dir of the log is
1651/// created lazily on first write.
1652fn append_event_locked(log: &Path, lock_path: &Path, event: &Event) -> Result<(), String> {
1653 if let Some(parent) = log.parent() {
1654 std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1655 }
1656 if let Some(parent) = lock_path.parent() {
1657 std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
1658 }
1659 let mut lock = ManifestLock::open(log, lock_path).map_err(|e| e.to_string())?;
1660 lock.write(|| append_event(log, event)).map_err(|e| e.to_string())?.map_err(|e| e.to_string())
1661}
1662
1663/// Re-export a cheap helper so CLI renderers can label halted steps by node
1664/// name without reaching into the graph twice.
1665#[must_use]
1666pub fn pack_display_name(node: &PackNode) -> &str {
1667 &node.name
1668}
1669
1670/// Run a full teardown over the pack tree rooted at `pack_root`.
1671///
1672/// Mirrors [`run`] but invokes
1673/// [`crate::plugin::PackTypePlugin::teardown`] on every pack in
1674/// **reverse** post-order so a parent tears down before its children
1675/// (the inverse of install). Children composed later by an author
1676/// consequently teardown earlier, matching the declarative
1677/// auto-reverse contract (R-M5-11).
1678///
1679/// All other concerns are identical to [`run`]: workspace lock, plan-
1680/// phase validators, lockfile update skipped (teardown does not
1681/// write a `actions_hash` forward), and event-log bracketing.
1682/// Teardown does NOT consult the lockfile skip-on-hash shortcut — a
1683/// user explicitly asked to remove the pack, so we always dispatch.
1684///
1685/// # Errors
1686///
1687/// Returns the first error that halts the pipeline — see [`SyncError`].
1688///
1689/// See [`run`] for the `cancel` contract — feat-m7-1 stage 2 threads
1690/// the parameter through teardown for parity; stages 3-4 add the polls.
1691pub fn teardown(
1692 pack_root: &Path,
1693 opts: &SyncOptions,
1694 cancel: &CancellationToken,
1695) -> Result<SyncReport, SyncError> {
1696 let _ = cancel;
1697 let workspace = resolve_workspace(pack_root, opts.workspace.as_deref());
1698 ensure_workspace_dir(&workspace)?;
1699 let (mut ws_lock, ws_lock_path) = open_workspace_lock(&workspace)?;
1700 let _ws_guard = match ws_lock.try_acquire() {
1701 Ok(Some(g)) => g,
1702 Ok(None) => {
1703 return Err(SyncError::WorkspaceBusy {
1704 workspace: workspace.clone(),
1705 lock_path: ws_lock_path,
1706 });
1707 }
1708 Err(e) => return Err(workspace_lock_err(&ws_lock_path, &e.to_string())),
1709 };
1710
1711 let graph =
1712 walk_and_validate(pack_root, &workspace, opts.validate, opts.ref_override.as_deref())?;
1713 let prep = prepare_run_context(pack_root, &graph, &workspace)?;
1714
1715 let mut report = SyncReport {
1716 graph,
1717 steps: Vec::new(),
1718 halted: None,
1719 event_log_warnings: Vec::new(),
1720 pre_run_recovery: prep.pre_run_recovery,
1721 // teardown does not run the legacy-layout migration — by the time
1722 // a user is tearing down, the layout has already been migrated
1723 // (or was never legacy in the first place). Surfacing an empty
1724 // list keeps the report shape symmetric with `run()`.
1725 workspace_migrations: Vec::new(),
1726 };
1727
1728 // feat-m6 B1: mirror `run()` — resolve `--parallel`, build a
1729 // Scheduler, thread it through every `ExecCtx` the teardown path
1730 // constructs. Teardown is the other user-facing verb that owns a
1731 // runtime, so it gets the same wiring.
1732 let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
1733 let scheduler = Arc::new(Scheduler::new(resolved_parallel));
1734 run_teardown(
1735 &mut report,
1736 &prep.order,
1737 &prep.vars,
1738 &workspace,
1739 &prep.event_log,
1740 &prep.lock_path,
1741 &prep.registry,
1742 &prep.pack_type_registry,
1743 resolved_parallel,
1744 &scheduler,
1745 );
1746 Ok(report)
1747}
1748
1749/// Dispatch `teardown` for every pack in **reverse** post-order.
1750/// Declarative packs go through [`crate::plugin::PackTypePlugin`]
1751/// rather than the per-action M4 path because the trait's
1752/// auto-reverse / explicit-block logic must compose with the
1753/// registry; going through the per-action path would mean
1754/// re-implementing inverse synthesis in the sync loop.
1755#[allow(clippy::too_many_arguments)]
1756fn run_teardown(
1757 report: &mut SyncReport,
1758 order: &[usize],
1759 vars: &VarEnv,
1760 workspace: &Path,
1761 event_log: &Path,
1762 lock_path: &Path,
1763 registry: &Arc<Registry>,
1764 pack_type_registry: &Arc<PackTypeRegistry>,
1765 parallel: usize,
1766 scheduler: &Arc<Scheduler>,
1767) {
1768 let rt = build_pack_type_runtime(parallel);
1769 // Reverse post-order: root first, then children. Pack-type plugin
1770 // teardown methods reverse their own children/actions, so the
1771 // outer loop only flips the inter-pack order.
1772 for &id in order.iter().rev() {
1773 let Some(node) = report.graph.node(id) else { continue };
1774 let pack_name = node.name.clone();
1775 let pack_path = node.path.clone();
1776 let manifest = node.manifest.clone();
1777 let type_tag = manifest.r#type.as_str();
1778 if pack_type_registry.get(type_tag).is_none() {
1779 let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
1780 record_action_err(report, event_log, lock_path, &pack_name, 0, "pack-type", err);
1781 return;
1782 }
1783 let ctx = ExecCtx::new(vars, &pack_path, workspace)
1784 .with_platform(Platform::current())
1785 .with_registry(registry)
1786 .with_pack_type_registry(pack_type_registry)
1787 .with_scheduler(scheduler);
1788 append_manifest_event(
1789 event_log,
1790 lock_path,
1791 &Event::ActionStarted {
1792 ts: Utc::now(),
1793 pack: pack_name.clone(),
1794 action_idx: 0,
1795 action_name: type_tag.to_string(),
1796 },
1797 &mut report.event_log_warnings,
1798 );
1799 let plugin = pack_type_registry
1800 .get(type_tag)
1801 .expect("pack-type plugin must be registered (guarded above)");
1802 // feat-m6 CI fix — see dispatch_pack_type note.
1803 let step_result =
1804 rt.block_on(crate::pack_lock::with_tier_scope(plugin.teardown(&ctx, &manifest)));
1805 if !record_action_outcome(
1806 report,
1807 event_log,
1808 lock_path,
1809 &pack_name,
1810 0,
1811 type_tag,
1812 step_result,
1813 ) {
1814 return;
1815 }
1816 }
1817}
1818
1819/// Test-only hook: append one [`Event::Sync`] through the same
1820/// [`ManifestLock`]-serialised path the sync driver uses.
1821///
1822/// Exposed so integration tests under `tests/` can exercise the locked
1823/// append helper without spinning up a full pack tree. Not intended for
1824/// downstream consumers — the signature may change without notice.
1825#[doc(hidden)]
1826pub fn __test_append_sync_event(
1827 log: &Path,
1828 lock_path: &Path,
1829 pack: &str,
1830 action_name: &str,
1831) -> Result<(), String> {
1832 let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: action_name.to_string() };
1833 append_event_locked(log, lock_path, &event)
1834}
1835
1836// ----------------------------------------------------------------------
1837// PR E — pre-run teardown scan
1838// ----------------------------------------------------------------------
1839
/// One `ActionStarted` event in the manifest log that has no matching
/// `ActionCompleted` or `ActionHalted` peer.
///
/// Dangling starts are the primary crash signal: the process wrote the
/// pre-action event, then died before the executor returned. Callers
/// should surface these to the operator (diagnostics only this PR; a
/// future `grex doctor` verb will act on them).
///
/// Produced by [`collect_dangling_starts`] as part of [`scan_recovery`].
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DanglingStart {
    /// Pack that owned the halted action.
    pub pack: String,
    /// 0-based action index within the pack.
    pub action_idx: usize,
    /// Short action kind tag.
    pub action_name: String,
    /// Timestamp the `ActionStarted` event was written.
    pub started_at: DateTime<Utc>,
}
1859
/// Summary of teardown artifacts found under a pack root before a sync
/// begins.
///
/// Built by [`scan_recovery`]. All fields are diagnostic; the sync
/// proceeds regardless of what the scan finds. Use
/// [`RecoveryReport::is_empty`] to check whether anything was found.
#[non_exhaustive]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RecoveryReport {
    /// `<dst>.grex.bak` files sitting next to a non-symlink or missing
    /// original (symlink-action rollback orphan).
    pub orphan_backups: Vec<PathBuf>,
    /// `<path>.grex.bak.<timestamp>` tombstones left by `rmdir` with
    /// `backup: true`.
    pub orphan_tombstones: Vec<PathBuf>,
    /// `ActionStarted` events in the log with no matching
    /// `ActionCompleted`/`ActionHalted` — see [`DanglingStart`].
    pub dangling_starts: Vec<DanglingStart>,
}
1878
1879impl RecoveryReport {
1880 /// `true` when the scan found nothing worth reporting.
1881 #[must_use]
1882 pub fn is_empty(&self) -> bool {
1883 self.orphan_backups.is_empty()
1884 && self.orphan_tombstones.is_empty()
1885 && self.dangling_starts.is_empty()
1886 }
1887}
1888
1889/// Walk `workspace` and the manifest log to find crash-recovery artifacts.
1890///
1891/// Inspects:
1892///
1893/// * `workspace` for `.grex.bak` orphans and timestamped `.grex.bak.<ts>`
1894/// tombstones. The workspace IS where children materialise (whether
1895/// the default flat-sibling layout under the pack root, or an
1896/// explicit `--workspace` override directory) so this single bounded
1897/// walk covers every backup site.
1898/// * `event_log` (the manifest JSONL) for `ActionStarted` entries that
1899/// have no matching `ActionCompleted` / `ActionHalted` successor.
1900///
1901/// Non-blocking: scan errors are swallowed to an empty report so a
1902/// half-readable directory cannot kill a sync that would otherwise
1903/// succeed. Call sites that want to surface scan failures should read
1904/// the manifest directly.
1905///
1906/// Pre-`v1.1.0` post-review fix this anchored at `pack_root_dir(pack_root)`,
1907/// which missed every backup under a `--workspace` override.
1908///
1909/// # Errors
1910///
1911/// Returns [`SyncError::Validation`] only when the manifest read itself
1912/// reports corruption. Filesystem traversal errors are swallowed.
1913pub fn scan_recovery(workspace: &Path, event_log: &Path) -> Result<RecoveryReport, SyncError> {
1914 let mut report = RecoveryReport::default();
1915 walk_for_backups(workspace, &mut report);
1916 if event_log.exists() {
1917 match read_all(event_log) {
1918 Ok(events) => {
1919 report.dangling_starts = collect_dangling_starts(&events);
1920 }
1921 Err(e) => {
1922 return Err(SyncError::Validation {
1923 errors: vec![PackValidationError::DependsOnUnsatisfied {
1924 pack: "<event-log>".into(),
1925 required: e.to_string(),
1926 }],
1927 });
1928 }
1929 }
1930 }
1931 Ok(report)
1932}
1933
/// Shallow directory walker (bounded depth = 6) that categorizes
/// `.grex.bak` and `.grex.bak.<ts>` filenames into the appropriate
/// report slot. Depth-limited so a pathological workspace with a deep
/// tree cannot stall the scan; realistic layouts are well under six
/// levels.
///
/// Thin entry point over [`walk_for_backups_inner`], which carries the
/// depth counter; traversal errors are skipped, never propagated.
fn walk_for_backups(root: &Path, report: &mut RecoveryReport) {
    walk_for_backups_inner(root, report, 0);
}
1942
1943fn walk_for_backups_inner(dir: &Path, report: &mut RecoveryReport, depth: u32) {
1944 const MAX_DEPTH: u32 = 6;
1945 if depth > MAX_DEPTH {
1946 return;
1947 }
1948 let Ok(entries) = std::fs::read_dir(dir) else { return };
1949 for entry_result in entries {
1950 let entry = match entry_result {
1951 Ok(e) => e,
1952 Err(e) => {
1953 tracing::warn!(
1954 target: "grex::sync::recover",
1955 "skipping unreadable entry under `{}`: {e}",
1956 dir.display(),
1957 );
1958 continue;
1959 }
1960 };
1961 let path = entry.path();
1962 let name = entry.file_name();
1963 let Some(name_str) = name.to_str() else { continue };
1964 if name_str.ends_with(".grex.bak") {
1965 report.orphan_backups.push(path.clone());
1966 continue;
1967 }
1968 if let Some(rest) = name_str.rsplit_once(".grex.bak.") {
1969 // `rsplit_once` returns `(prefix, suffix)`; suffix is the
1970 // timestamp chunk. Accept any non-empty suffix — the exact
1971 // timestamp shape is `fs_executor` internal.
1972 if !rest.1.is_empty() {
1973 report.orphan_tombstones.push(path.clone());
1974 continue;
1975 }
1976 }
1977 // Recurse only into real directories (not symlinks, to avoid
1978 // traversing into the workspace's cloned repos via aliased
1979 // paths). `entry.file_type()` does NOT follow symlinks (unlike
1980 // `entry.metadata()` which would dereference and report the
1981 // target's type — defeating the very check this guards). The
1982 // symlink-skip is also explicit so the intent is recoverable
1983 // from the source: backup-recovery never crosses a symlink.
1984 let Ok(ft) = entry.file_type() else { continue };
1985 if ft.is_symlink() {
1986 continue;
1987 }
1988 if ft.is_dir() {
1989 walk_for_backups_inner(&path, report, depth + 1);
1990 }
1991 }
1992}
1993
1994/// Reduce an event stream to a list of `ActionStarted` records with no
1995/// matching terminator.
1996///
1997/// Matching is positional per `(pack, action_idx)`: a later
1998/// `ActionCompleted` or `ActionHalted` with the same key clears the
1999/// entry. Whatever remains in the map after the pass is dangling.
2000fn collect_dangling_starts(events: &[Event]) -> Vec<DanglingStart> {
2001 use std::collections::HashMap;
2002 let mut open: HashMap<(String, usize), DanglingStart> = HashMap::new();
2003 for ev in events {
2004 match ev {
2005 Event::ActionStarted { ts, pack, action_idx, action_name } => {
2006 open.insert(
2007 (pack.clone(), *action_idx),
2008 DanglingStart {
2009 pack: pack.clone(),
2010 action_idx: *action_idx,
2011 action_name: action_name.clone(),
2012 started_at: *ts,
2013 },
2014 );
2015 }
2016 Event::ActionCompleted { pack, action_idx, .. }
2017 | Event::ActionHalted { pack, action_idx, .. } => {
2018 open.remove(&(pack.clone(), *action_idx));
2019 }
2020 _ => {}
2021 }
2022 }
2023 let mut out: Vec<DanglingStart> = open.into_values().collect();
2024 out.sort_by_key(|a| a.started_at);
2025 out
2026}