use std::borrow::Cow;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use chrono::{DateTime, Utc};
use globset::{Glob, GlobSet, GlobSetBuilder};
use thiserror::Error;
use tokio_util::sync::CancellationToken;
use crate::execute::{
ActionExecutor, ExecCtx, ExecError, ExecResult, ExecStep, FsExecutor, MetaVisitedSet,
PlanExecutor, Platform, StepKind,
};
use crate::fs::{ManifestLock, ScopedLock};
use crate::git::GixBackend;
use crate::lockfile::{
compute_actions_hash, read_lockfile, write_lockfile, LockEntry, LockfileError,
};
use crate::manifest::{append_event, read_all, Event, ACTION_ERROR_SUMMARY_MAX, SCHEMA_VERSION};
use crate::pack::{Action, PackValidationError};
use crate::plugin::{PackTypeRegistry, Registry};
use crate::scheduler::Scheduler;
use crate::tree::{FsPackLoader, PackGraph, PackNode, TreeError, Walker};
use crate::vars::VarEnv;
/// Options accepted by [`run`] and [`teardown`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct SyncOptions {
    /// When true, actions are planned (via `PlanExecutor`) but never applied.
    pub dry_run: bool,
    /// When true, the pack graph is validated before anything runs.
    pub validate: bool,
    /// Explicit workspace directory; defaults to the pack root dir when `None`.
    pub workspace: Option<PathBuf>,
    /// Git ref override forwarded to the tree walker.
    pub ref_override: Option<String>,
    /// Glob patterns restricting which packs run, matched against the
    /// workspace-relative pack path; `None` or empty means "all packs".
    pub only_patterns: Option<Vec<String>>,
    /// Bypass the lockfile skip-on-hash short-circuit.
    pub force: bool,
    /// Parallelism hint; defaults to the CPU count when `None`.
    pub parallel: Option<usize>,
}
impl Default for SyncOptions {
    /// Conservative defaults: real run, validation on, no filters or overrides.
    fn default() -> Self {
        Self {
            dry_run: false,
            validate: true,
            workspace: None,
            ref_override: None,
            only_patterns: None,
            force: false,
            parallel: None,
        }
    }
}
/// Compile `--only` patterns into a [`GlobSet`]. `None` or an empty list
/// means "no filter" and yields `Ok(None)`.
///
/// # Errors
/// Returns [`SyncError::InvalidOnlyGlob`] when a single pattern fails to
/// parse (carrying that pattern), or when the combined set fails to build
/// (carrying the comma-joined pattern list).
fn compile_only_globset(patterns: Option<&Vec<String>>) -> Result<Option<GlobSet>, SyncError> {
    let pats = match patterns {
        Some(list) if !list.is_empty() => list,
        _ => return Ok(None),
    };
    let mut builder = GlobSetBuilder::new();
    for pattern in pats {
        let glob = Glob::new(pattern).map_err(|source| SyncError::InvalidOnlyGlob {
            pattern: pattern.clone(),
            source,
        })?;
        builder.add(glob);
    }
    builder
        .build()
        .map(Some)
        .map_err(|source| SyncError::InvalidOnlyGlob { pattern: pats.join(","), source })
}
impl SyncOptions {
    /// Equivalent to [`SyncOptions::default`].
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder-style setter for `dry_run`.
    #[must_use]
    pub fn with_dry_run(self, dry_run: bool) -> Self {
        Self { dry_run, ..self }
    }

    /// Builder-style setter for `validate`.
    #[must_use]
    pub fn with_validate(self, validate: bool) -> Self {
        Self { validate, ..self }
    }

    /// Builder-style setter for `workspace`.
    #[must_use]
    pub fn with_workspace(self, workspace: Option<PathBuf>) -> Self {
        Self { workspace, ..self }
    }

    /// Builder-style setter for `ref_override`.
    #[must_use]
    pub fn with_ref_override(self, ref_override: Option<String>) -> Self {
        Self { ref_override, ..self }
    }

    /// Builder-style setter for `only_patterns`.
    #[must_use]
    pub fn with_only_patterns(self, patterns: Option<Vec<String>>) -> Self {
        Self { only_patterns: patterns, ..self }
    }

    /// Builder-style setter for `force`.
    #[must_use]
    pub fn with_force(self, force: bool) -> Self {
        Self { force, ..self }
    }

    /// Builder-style setter for `parallel`.
    #[must_use]
    pub fn with_parallel(self, parallel: Option<usize>) -> Self {
        Self { parallel, ..self }
    }
}
/// One executed (or planned) step, attributed to its pack and action index.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct SyncStep {
    /// Pack the step belongs to.
    pub pack: String,
    /// Index of the action within the pack's manifest (0 for pack-level steps).
    pub action_idx: usize,
    /// The executor's record of what was done (or would have been done).
    pub exec_step: ExecStep,
}
/// Full outcome of a [`run`] or [`teardown`] invocation.
#[non_exhaustive]
#[derive(Debug)]
pub struct SyncReport {
    /// The walked pack graph the run operated on.
    pub graph: PackGraph,
    /// Steps in execution order, including skip markers.
    pub steps: Vec<SyncStep>,
    /// Set when execution stopped early; `None` for a clean run.
    pub halted: Option<SyncError>,
    /// Non-fatal event-log / lockfile write failures.
    pub event_log_warnings: Vec<String>,
    /// Recovery artifacts found before the run started, if any.
    pub pre_run_recovery: Option<RecoveryReport>,
    /// Legacy-workspace migrations attempted during this run.
    pub workspace_migrations: Vec<WorkspaceMigration>,
}
/// Record of one attempted legacy-workspace directory migration.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WorkspaceMigration {
    /// Source path, relative to the pack root dir.
    pub from: PathBuf,
    /// Destination path, relative to the pack root dir.
    pub to: PathBuf,
    /// What happened to this entry.
    pub outcome: MigrationOutcome,
}
/// Result of a single legacy-workspace migration attempt.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrationOutcome {
    /// Renamed into place successfully.
    Migrated,
    /// Destination already contains a `.git` checkout; both left in place.
    SkippedBothExist,
    /// Destination exists but is not a git checkout; legacy left in place.
    SkippedDestOccupied,
    /// The rename itself failed.
    Failed { error: String },
}
/// Details of the action that halted a run (see [`SyncError::Halted`]).
#[non_exhaustive]
#[derive(Debug)]
pub struct HaltedContext {
    /// Pack whose action failed.
    pub pack: String,
    /// Index of the failing action.
    pub action_idx: usize,
    /// Tag of the failing action (e.g. `symlink`, `exec`).
    pub action_name: String,
    /// The underlying executor error.
    pub error: ExecError,
    /// Optional remediation suggestion (see `recovery_hint_for`).
    pub recovery_hint: Option<String>,
}
/// Errors produced by the sync/teardown entry points.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum SyncError {
    /// Walking the pack tree failed.
    #[error("tree walk failed: {0}")]
    Tree(#[from] TreeError),
    /// Pack/graph validation produced one or more errors.
    #[error("validation failed: {errors:?}")]
    Validation {
        errors: Vec<PackValidationError>,
    },
    /// An executor error escaped the per-action halt handling.
    #[error("action execution failed: {0}")]
    Exec(#[from] ExecError),
    /// Execution stopped at a specific pack/action. Boxed so the variant
    /// (and every `Result<_, SyncError>`) stays small.
    #[error(
        "sync halted at pack `{}` action #{} ({}): {}",
        .0.pack, .0.action_idx, .0.action_name, .0.error
    )]
    Halted(Box<HaltedContext>),
    /// Another process currently holds the workspace lock.
    #[error(
        "workspace `{workspace}` is locked by another grex process (remove {lock_path:?} if stale)"
    )]
    WorkspaceBusy {
        workspace: PathBuf,
        lock_path: PathBuf,
    },
    /// The lockfile exists but could not be read or parsed.
    #[error("lockfile `{path}` failed to load: {source}")]
    Lockfile {
        path: PathBuf,
        #[source]
        source: LockfileError,
    },
    /// A `--only` pattern failed to compile as a glob.
    #[error("invalid --only glob `{pattern}`: {source}")]
    InvalidOnlyGlob {
        pattern: String,
        #[source]
        source: globset::Error,
    },
}
/// Hand-written, deliberately lossy `Clone`.
///
/// NOTE(review): several payload types (`TreeError`, `ExecError`,
/// `LockfileError`) are presumably not `Clone`, so those variants are
/// downgraded to `Validation` wrappers that preserve only the rendered
/// message under a sentinel pack name. Callers must not expect a cloned
/// value to keep its original variant — confirm this is acceptable at
/// every clone site.
impl Clone for SyncError {
    fn clone(&self) -> Self {
        match self {
            Self::Tree(e) => Self::Validation {
                errors: vec![PackValidationError::DependsOnUnsatisfied {
                    pack: "<tree>".into(),
                    required: e.to_string(),
                }],
            },
            Self::Validation { errors } => Self::Validation { errors: errors.clone() },
            Self::Exec(e) => Self::Validation {
                errors: vec![PackValidationError::DependsOnUnsatisfied {
                    pack: "<exec>".into(),
                    required: e.to_string(),
                }],
            },
            // Halted keeps the pack attribution but flattens the ExecError.
            Self::Halted(ctx) => Self::Validation {
                errors: vec![PackValidationError::DependsOnUnsatisfied {
                    pack: ctx.pack.clone(),
                    required: format!(
                        "action #{} ({}): {}",
                        ctx.action_idx, ctx.action_name, ctx.error
                    ),
                }],
            },
            // The only variant cloned faithfully (payload is fully cloneable).
            Self::WorkspaceBusy { workspace, lock_path } => {
                Self::WorkspaceBusy { workspace: workspace.clone(), lock_path: lock_path.clone() }
            }
            Self::Lockfile { path, source } => Self::Validation {
                errors: vec![PackValidationError::DependsOnUnsatisfied {
                    pack: "<lockfile>".into(),
                    required: format!("{}: {source}", path.display()),
                }],
            },
            Self::InvalidOnlyGlob { pattern, source } => Self::Validation {
                errors: vec![PackValidationError::DependsOnUnsatisfied {
                    pack: "<only-glob>".into(),
                    required: format!("{pattern}: {source}"),
                }],
            },
        }
    }
}
/// Top-level sync entry point: lock the workspace, walk and validate the
/// pack tree rooted at `pack_root`, execute packs in dependency order, and
/// persist the lockfile. Action failures that stop the run are recorded in
/// `report.halted` rather than returned as `Err`.
///
/// # Errors
/// Fails for setup problems only: workspace dir/lock, tree walk,
/// validation, lockfile load, or invalid `--only` globs.
pub fn run(
    pack_root: &Path,
    opts: &SyncOptions,
    cancel: &CancellationToken,
) -> Result<SyncReport, SyncError> {
    // NOTE(review): cancellation token is accepted but not yet honored.
    let _ = cancel;
    let workspace = resolve_workspace(pack_root, opts.workspace.as_deref());
    ensure_workspace_dir(&workspace)?;
    // Single-process guard: fail fast if another grex run owns the workspace.
    let (mut ws_lock, ws_lock_path) = open_workspace_lock(&workspace)?;
    let _ws_guard = match ws_lock.try_acquire() {
        Ok(Some(g)) => g,
        Ok(None) => {
            return Err(SyncError::WorkspaceBusy {
                workspace: workspace.clone(),
                lock_path: ws_lock_path,
            });
        }
        Err(e) => return Err(workspace_lock_err(&ws_lock_path, &e.to_string())),
    };
    let only_set = compile_only_globset(opts.only_patterns.as_ref())?;
    let workspace_migrations = migrate_legacy_workspace(pack_root);
    let graph =
        walk_and_validate(pack_root, &workspace, opts.validate, opts.ref_override.as_deref())?;
    let prep = prepare_run_context(pack_root, &graph, &workspace)?;
    log_force_flag(opts.force);
    let mut report = SyncReport {
        graph,
        steps: Vec::new(),
        halted: None,
        event_log_warnings: Vec::new(),
        pre_run_recovery: prep.pre_run_recovery,
        workspace_migrations,
    };
    // next_lock starts as a copy of the prior lockfile so untouched packs
    // keep their entries; run_actions updates/removes entries as it goes.
    let mut next_lock = prep.prior_lock.clone();
    // NOTE(review): a caller-supplied `parallel` of 0 passes through
    // unclamped here — confirm Scheduler::new(0) is valid.
    let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
    let scheduler = Arc::new(Scheduler::new(resolved_parallel));
    run_actions(
        &mut report,
        &prep.order,
        &prep.vars,
        &workspace,
        &prep.event_log,
        &prep.lock_path,
        opts.dry_run,
        &prep.prior_lock,
        &mut next_lock,
        &prep.registry,
        &prep.pack_type_registry,
        only_set.as_ref(),
        opts.force,
        resolved_parallel,
        &scheduler,
    );
    persist_lockfile_if_clean(&mut report, &prep.lockfile_path, &next_lock, opts.dry_run);
    Ok(report)
}
/// Per-run shared state assembled once by `prepare_run_context`.
struct RunContext {
    /// Pack ids in post-order (children before parents), rooted at id 0.
    order: Vec<usize>,
    /// Variable environment snapshotted from the OS environment.
    vars: VarEnv,
    /// Path of the append-only JSONL event log.
    event_log: PathBuf,
    /// Path of the lock guarding event-log appends.
    lock_path: PathBuf,
    /// Path of the lockfile recording per-pack action hashes.
    lockfile_path: PathBuf,
    /// Lockfile contents as read at run start (empty when absent).
    prior_lock: std::collections::HashMap<String, LockEntry>,
    /// Action plugin registry.
    registry: Arc<Registry>,
    /// Pack-type plugin registry.
    pack_type_registry: Arc<PackTypeRegistry>,
    /// Recovery artifacts found before the run, if any.
    pre_run_recovery: Option<RecoveryReport>,
}
/// Assemble the [`RunContext`] shared by [`run`] and [`teardown`].
///
/// # Errors
/// Fails only when the prior lockfile exists but cannot be loaded.
fn prepare_run_context(
    pack_root: &Path,
    graph: &PackGraph,
    workspace: &Path,
) -> Result<RunContext, SyncError> {
    let event_log = event_log_path(pack_root);
    let lock_path = event_lock_path(&event_log);
    let vars = VarEnv::from_os();
    let order = post_order(graph);
    // Best-effort pre-run scan: errors are swallowed, empty reports dropped.
    let pre_run_recovery = scan_recovery(workspace, &event_log).ok().filter(|r| !r.is_empty());
    let lockfile_path = lockfile_path(pack_root);
    let prior_lock = load_prior_lock(&lockfile_path)?;
    let registry = Arc::new(Registry::bootstrap());
    let pack_type_registry = Arc::new(bootstrap_pack_type_registry());
    Ok(RunContext {
        order,
        vars,
        event_log,
        lock_path,
        lockfile_path,
        prior_lock,
        registry,
        pack_type_registry,
        pre_run_recovery,
    })
}
/// Build the pack-type registry, additionally registering inventory-based
/// plugins when the `plugin-inventory` feature is enabled.
fn bootstrap_pack_type_registry() -> PackTypeRegistry {
    #[cfg(feature = "plugin-inventory")]
    {
        let mut reg = PackTypeRegistry::bootstrap();
        reg.register_from_inventory();
        reg
    }
    #[cfg(not(feature = "plugin-inventory"))]
    {
        PackTypeRegistry::bootstrap()
    }
}
/// Emit an informational log line when `--force` is active; no-op otherwise.
fn log_force_flag(force: bool) {
    if !force {
        return;
    }
    tracing::info!(
        target: "grex::sync",
        "--force active: bypassing lockfile skip-on-hash short-circuit"
    );
}
/// Walk the pack tree at `pack_root` and, when requested, validate the
/// resulting graph before returning it.
fn walk_and_validate(
    pack_root: &Path,
    workspace: &Path,
    validate: bool,
    ref_override: Option<&str>,
) -> Result<PackGraph, SyncError> {
    let pack_loader = FsPackLoader::new();
    let git_backend = GixBackend::new();
    let graph = Walker::new(&pack_loader, &git_backend, workspace.to_path_buf())
        .with_ref_override(ref_override.map(str::to_string))
        .walk(pack_root)?;
    if !validate {
        return Ok(graph);
    }
    validate_graph(&graph)?;
    Ok(graph)
}
/// Read the prior lockfile, wrapping load failures with the file's path.
fn load_prior_lock(
    lockfile_path: &Path,
) -> Result<std::collections::HashMap<String, LockEntry>, SyncError> {
    match read_lockfile(lockfile_path) {
        Ok(entries) => Ok(entries),
        Err(source) => Err(SyncError::Lockfile { path: lockfile_path.to_path_buf(), source }),
    }
}
/// Write `next_lock` back to disk unless this was a dry run. Write failures
/// are non-fatal: logged and appended to `report.event_log_warnings`.
///
/// NOTE(review): despite the `_if_clean` name this also writes after a
/// halted run. Because `next_lock` is seeded from the prior lockfile in
/// [`run`] and the halted pack's entry is removed before `run_actions`
/// returns, the write appears intended (completed packs keep progress, the
/// halted pack re-runs next time) — confirm the naming vs. behavior.
fn persist_lockfile_if_clean(
    report: &mut SyncReport,
    lockfile_path: &Path,
    next_lock: &std::collections::HashMap<String, LockEntry>,
    dry_run: bool,
) {
    if dry_run {
        return;
    }
    if let Err(e) = write_lockfile(lockfile_path, next_lock) {
        tracing::warn!(target: "grex::sync", "lockfile write failed: {e}");
        report.event_log_warnings.push(format!("{}: {e}", lockfile_path.display()));
    }
}
/// `<pack root dir>/.grex/grex.lock.jsonl`
fn lockfile_path(pack_root: &Path) -> PathBuf {
    let mut path = pack_root_dir(pack_root);
    path.push(".grex");
    path.push("grex.lock.jsonl");
    path
}
/// Create the workspace directory (and parents) when it does not exist,
/// wrapping any I/O failure as a `Validation` error under `<workspace>`.
fn ensure_workspace_dir(workspace: &Path) -> Result<(), SyncError> {
    if workspace.exists() {
        return Ok(());
    }
    std::fs::create_dir_all(workspace).map_err(|e| SyncError::Validation {
        errors: vec![PackValidationError::DependsOnUnsatisfied {
            pack: "<workspace>".into(),
            required: format!("{}: {e}", workspace.display()),
        }],
    })
}
/// Open (but do not acquire) the workspace's sync lock, returning the lock
/// together with its path for later diagnostics.
fn open_workspace_lock(workspace: &Path) -> Result<(ScopedLock, PathBuf), SyncError> {
    let lock_path = workspace_lock_path(workspace);
    match ScopedLock::open(&lock_path) {
        Ok(lock) => Ok((lock, lock_path)),
        Err(e) => Err(workspace_lock_err(&lock_path, &e.to_string())),
    }
}
/// Wrap a workspace-lock failure as a `Validation` error under the
/// `<workspace-lock>` sentinel pack.
fn workspace_lock_err(ws_lock_path: &Path, reason: &str) -> SyncError {
    let required = format!("{}: {reason}", ws_lock_path.display());
    SyncError::Validation {
        errors: vec![PackValidationError::DependsOnUnsatisfied {
            pack: "<workspace-lock>".into(),
            required,
        }],
    }
}
/// Pre-migration layout kept cloned pack repos under `.grex/workspace/`.
const LEGACY_WORKSPACE_DIR: &str = ".grex/workspace";
/// Move git checkouts out of the legacy `.grex/workspace/` directory into
/// the pack root directory. Best-effort: unreadable entries are logged and
/// skipped, and nothing here can fail the sync.
fn migrate_legacy_workspace(pack_root: &Path) -> Vec<WorkspaceMigration> {
    let root = pack_root_dir(pack_root);
    let legacy_root = root.join(LEGACY_WORKSPACE_DIR);
    if !legacy_root.is_dir() {
        return Vec::new();
    }
    let entries = match fs::read_dir(&legacy_root) {
        Ok(e) => e,
        Err(e) => {
            tracing::warn!(
                target: "grex::sync::migrate",
                "legacy workspace `{}` unreadable: {e}",
                legacy_root.display(),
            );
            return Vec::new();
        }
    };
    let mut migrations = Vec::new();
    for entry_result in entries {
        let entry = match entry_result {
            Ok(e) => e,
            Err(e) => {
                tracing::warn!(
                    target: "grex::sync::migrate",
                    "skipping unreadable entry under `{}`: {e}",
                    legacy_root.display(),
                );
                continue;
            }
        };
        // Only real directories are candidates; symlinks are never followed.
        let Ok(ft) = entry.file_type() else { continue };
        if ft.is_symlink() || !ft.is_dir() {
            continue;
        }
        let name_os = entry.file_name();
        let Some(name) = name_os.to_str() else { continue };
        let from_abs = entry.path();
        // Only migrate actual git checkouts.
        if !from_abs.join(".git").exists() {
            continue;
        }
        let to_abs = root.join(name);
        // Report/log paths relative to the pack root dir for readability.
        let from_rel = PathBuf::from(LEGACY_WORKSPACE_DIR).join(name);
        let to_rel = PathBuf::from(name);
        let outcome = decide_and_migrate(&from_abs, &to_abs);
        log_migration(&from_rel, &to_rel, &outcome);
        migrations.push(WorkspaceMigration { from: from_rel, to: to_rel, outcome });
    }
    cleanup_legacy_workspace_root(&legacy_root);
    migrations
}
fn decide_and_migrate(from: &Path, to: &Path) -> MigrationOutcome {
let dest_exists = to.exists();
let dest_is_grex_repo = dest_exists && to.join(".git").exists();
if dest_is_grex_repo {
return MigrationOutcome::SkippedBothExist;
}
if dest_exists {
return MigrationOutcome::SkippedDestOccupied;
}
match fs::rename(from, to) {
Ok(()) => MigrationOutcome::Migrated,
Err(e) => MigrationOutcome::Failed { error: e.to_string() },
}
}
/// Log one migration outcome: info on success, warn on skips and failures.
fn log_migration(from: &Path, to: &Path, outcome: &MigrationOutcome) {
    let from_disp = from.display();
    let to_disp = to.display();
    match outcome {
        MigrationOutcome::Migrated => {
            tracing::info!(
                target: "grex::sync::migrate",
                "migrated: legacy={from_disp} -> new={to_disp}",
            );
        }
        MigrationOutcome::SkippedBothExist => {
            tracing::warn!(
                target: "grex::sync::migrate",
                "skipped: both legacy={from_disp} and new={to_disp} exist; resolve manually",
            );
        }
        MigrationOutcome::SkippedDestOccupied => {
            tracing::warn!(
                target: "grex::sync::migrate",
                "skipped: destination={to_disp} occupied; leaving legacy={from_disp} in place",
            );
        }
        MigrationOutcome::Failed { error } => {
            tracing::warn!(
                target: "grex::sync::migrate",
                "failed: legacy={from_disp} -> new={to_disp}: {error}",
            );
        }
    }
}
/// Remove leftover artifacts of the legacy workspace: an orphan sync lock
/// (best-effort, logged either way) and the directory itself — but only if
/// it is now empty (`remove_dir` refuses non-empty dirs).
fn cleanup_legacy_workspace_root(legacy_root: &Path) {
    let orphan_lock = legacy_root.join(".grex.sync.lock");
    if orphan_lock.exists() {
        match fs::remove_file(&orphan_lock) {
            Ok(()) => {
                tracing::info!(
                    target: "grex::sync::migrate",
                    "removed orphan lock `{}`",
                    orphan_lock.display(),
                );
            }
            Err(e) => {
                tracing::warn!(
                    target: "grex::sync::migrate",
                    "could not remove orphan lock `{}`: {e}",
                    orphan_lock.display(),
                );
            }
        }
    }
    let _ = fs::remove_dir(legacy_root);
}
/// The workspace to use: the explicit override when given, otherwise the
/// pack root directory.
fn resolve_workspace(pack_root: &Path, override_: Option<&Path>) -> PathBuf {
    override_.map_or_else(|| pack_root_dir(pack_root), Path::to_path_buf)
}
/// Normalize `pack_root` to a directory: a `*.yaml`/`*.yml` manifest path
/// maps to its grandparent directory (manifest → pack dir → root), falling
/// back to `.`; anything else is returned as-is.
fn pack_root_dir(pack_root: &Path) -> PathBuf {
    match pack_root.extension().and_then(|e| e.to_str()) {
        Some("yaml" | "yml") => pack_root
            .parent()
            .and_then(Path::parent)
            .map_or_else(|| PathBuf::from("."), Path::to_path_buf),
        _ => pack_root.to_path_buf(),
    }
}
/// `<pack root dir>/.grex/grex.jsonl`
fn event_log_path(pack_root: &Path) -> PathBuf {
    let mut path = pack_root_dir(pack_root);
    path.push(".grex");
    path.push("grex.jsonl");
    path
}
/// The event-log lock lives next to the log as `.grex.lock`; with no
/// parent directory it falls back to a bare relative path.
fn event_lock_path(event_log: &Path) -> PathBuf {
    match event_log.parent() {
        Some(dir) => dir.join(".grex.lock"),
        None => PathBuf::from(".grex.lock"),
    }
}
/// `<workspace>/.grex.sync.lock`
fn workspace_lock_path(workspace: &Path) -> PathBuf {
    let mut lock = workspace.to_path_buf();
    lock.push(".grex.sync.lock");
    lock
}
/// Collect per-pack `validate_plan` errors plus graph-level validation
/// errors, succeeding only when both passes are clean. All errors are
/// gathered before returning so the caller sees the full list at once.
fn validate_graph(graph: &PackGraph) -> Result<(), SyncError> {
    let mut errors: Vec<PackValidationError> = Vec::new();
    for node in graph.nodes() {
        if let Err(mut e) = node.manifest.validate_plan() {
            errors.append(&mut e);
        }
    }
    if let Err(mut e) = graph.validate() {
        errors.append(&mut e);
    }
    if errors.is_empty() {
        Ok(())
    } else {
        Err(SyncError::Validation { errors })
    }
}
/// Post-order (children before parents) traversal of the pack graph,
/// starting from the root node (id 0).
fn post_order(graph: &PackGraph) -> Vec<usize> {
    let mut out = Vec::with_capacity(graph.nodes().len());
    let mut seen = std::collections::HashSet::with_capacity(graph.nodes().len());
    visit_post(graph, 0, &mut out, &mut seen);
    out
}
/// DFS helper for [`post_order`].
///
/// The `seen` set guards against revisiting: without it a node reachable
/// through multiple parents would be emitted (and later executed) once per
/// path, and a cyclic graph would recurse without bound. Validation can be
/// disabled via `SyncOptions::validate`, so the walk must be robust on its
/// own.
fn visit_post(
    graph: &PackGraph,
    id: usize,
    out: &mut Vec<usize>,
    seen: &mut std::collections::HashSet<usize>,
) {
    if !seen.insert(id) {
        return;
    }
    // Collect child ids first so the graph borrow ends before recursing.
    let kids: Vec<usize> = graph.children_of(id).map(|n| n.id).collect();
    for k in kids {
        visit_post(graph, k, out, seen);
    }
    out.push(id);
}
#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
/// Execute every pack in `order` (children first), applying the `--only`
/// filter and the lockfile skip short-circuit, and recording lock entries
/// for packs that complete. The whole run stops at the first halting pack.
fn run_actions(
    report: &mut SyncReport,
    order: &[usize],
    vars: &VarEnv,
    workspace: &Path,
    event_log: &Path,
    lock_path: &Path,
    dry_run: bool,
    prior_lock: &std::collections::HashMap<String, LockEntry>,
    next_lock: &mut std::collections::HashMap<String, LockEntry>,
    registry: &Arc<Registry>,
    pack_type_registry: &Arc<PackTypeRegistry>,
    only: Option<&GlobSet>,
    force: bool,
    parallel: usize,
    scheduler: &Arc<Scheduler>,
) {
    let plan = PlanExecutor::with_registry(registry.clone());
    let fs = FsExecutor::with_registry(registry.clone());
    let rt = build_pack_type_runtime(parallel);
    let visited_meta = new_visited_meta();
    for &id in order {
        let Some(node) = report.graph.node(id) else { continue };
        // Clone node data up front so `report` is free for mutable borrows.
        let pack_name = node.name.clone();
        let pack_path = node.path.clone();
        let actions = node.manifest.actions.clone();
        let manifest = node.manifest.clone();
        let commit_sha = node.commit_sha.clone().unwrap_or_default();
        let synthetic = node.synthetic;
        if try_skip_or_filter(
            report,
            only,
            &pack_name,
            &pack_path,
            &actions,
            &commit_sha,
            synthetic,
            workspace,
            prior_lock,
            next_lock,
            dry_run,
            force,
        ) {
            continue;
        }
        let pack_halted = run_pack_lifecycle(
            report,
            vars,
            workspace,
            event_log,
            lock_path,
            dry_run,
            &plan,
            &fs,
            registry,
            pack_type_registry,
            &rt,
            &pack_name,
            &pack_path,
            &manifest,
            &visited_meta,
            scheduler,
        );
        if pack_halted {
            // Drop the halted pack's entry so it re-runs next time, then stop.
            next_lock.remove(&pack_name);
            return;
        }
        // Pack completed: record its current action hash for future skips.
        let actions_hash = compute_actions_hash(&actions, &commit_sha);
        upsert_lock_entry(prior_lock, next_lock, &pack_name, &commit_sha, &actions_hash, synthetic);
    }
}
/// Build the multi-thread tokio runtime used to drive pack-type plugins.
/// `parallel` is clamped to `[2, max(num_cpus, 2)]`; both bounds are >= 2,
/// so the clamp never panics and at least two workers always exist.
///
/// # Panics
/// Panics if runtime construction fails (unrecoverable at this point).
fn build_pack_type_runtime(parallel: usize) -> tokio::runtime::Runtime {
    let workers = parallel.clamp(2, num_cpus::get().max(2));
    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(workers)
        .enable_all()
        .build()
        .expect("tokio runtime for pack-type dispatch")
}
/// Fresh shared visited-set used for meta-pack traversal bookkeeping.
fn new_visited_meta() -> MetaVisitedSet {
    use std::collections::HashSet;
    use std::sync::{Arc, Mutex};
    Arc::new(Mutex::new(HashSet::new()))
}
#[allow(clippy::too_many_arguments)]
/// Returns `true` when this pack should not run: either the `--only`
/// filter excludes it (its prior lock entry is carried forward unchanged)
/// or `try_skip_pack` matched its lockfile entry.
fn try_skip_or_filter(
    report: &mut SyncReport,
    only: Option<&GlobSet>,
    pack_name: &str,
    pack_path: &Path,
    actions: &[Action],
    commit_sha: &str,
    current_synthetic: bool,
    workspace: &Path,
    prior_lock: &std::collections::HashMap<String, LockEntry>,
    next_lock: &mut std::collections::HashMap<String, LockEntry>,
    dry_run: bool,
    force: bool,
) -> bool {
    if skip_for_only_filter(only, pack_name, pack_path, workspace) {
        // Filtered-out packs keep their previous lockfile entry untouched.
        if let Some(prev) = prior_lock.get(pack_name) {
            next_lock.insert(pack_name.to_string(), prev.clone());
        }
        return true;
    }
    try_skip_pack(
        report,
        pack_name,
        pack_path,
        actions,
        commit_sha,
        current_synthetic,
        prior_lock,
        next_lock,
        dry_run,
        force,
    )
}
/// Returns `true` when an `--only` filter exists and this pack's
/// workspace-relative path does not match it. Backslashes are rewritten to
/// `/` so the same glob patterns match Windows-style paths.
fn skip_for_only_filter(
    only: Option<&GlobSet>,
    pack_name: &str,
    pack_path: &Path,
    workspace: &Path,
) -> bool {
    let Some(set) = only else { return false };
    // Falls back to the absolute path when the pack lives outside workspace.
    let rel = pack_path.strip_prefix(workspace).unwrap_or(pack_path);
    let rel_str = rel.to_string_lossy().replace('\\', "/");
    let matches = set.is_match(&rel_str);
    if !matches {
        tracing::info!(
            target: "grex::sync",
            "skipping pack `{pack_name}` (rel path `{rel_str}`): does not match --only filter"
        );
    }
    !matches
}
#[allow(clippy::too_many_arguments)]
/// Run a single pack according to its manifest `type`. Returns `true` when
/// the pack halted the run (unknown pack type, or a failing action/plugin).
fn run_pack_lifecycle(
    report: &mut SyncReport,
    vars: &VarEnv,
    workspace: &Path,
    event_log: &Path,
    lock_path: &Path,
    dry_run: bool,
    plan: &PlanExecutor,
    fs: &FsExecutor,
    registry: &Arc<Registry>,
    pack_type_registry: &Arc<PackTypeRegistry>,
    rt: &tokio::runtime::Runtime,
    pack_name: &str,
    pack_path: &Path,
    manifest: &crate::pack::PackManifest,
    visited_meta: &MetaVisitedSet,
    scheduler: &Arc<Scheduler>,
) -> bool {
    let type_tag = manifest.r#type.as_str();
    // A pack type with no registered plugin halts the pack immediately.
    if pack_type_registry.get(type_tag).is_none() {
        let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
        record_action_err(report, event_log, lock_path, pack_name, 0, "pack-type", err);
        return true;
    }
    match manifest.r#type {
        // Declarative packs run their action list in-process.
        crate::pack::PackType::Declarative => run_declarative_actions(
            report,
            vars,
            workspace,
            event_log,
            lock_path,
            dry_run,
            plan,
            fs,
            pack_name,
            pack_path,
            manifest,
            &manifest.actions,
            scheduler,
        ),
        // Meta/scripted packs delegate to their pack-type plugin.
        crate::pack::PackType::Meta | crate::pack::PackType::Scripted => dispatch_pack_type_plugin(
            report,
            vars,
            workspace,
            event_log,
            lock_path,
            registry,
            pack_type_registry,
            rt,
            pack_name,
            pack_path,
            manifest,
            type_tag,
            visited_meta,
            scheduler,
        ),
    }
}
#[allow(clippy::too_many_arguments)]
/// Execute a declarative pack's actions in manifest order, recording one
/// started/completed (or halted) event pair per action. Returns `true`
/// when an action halted the pack.
fn run_declarative_actions(
    report: &mut SyncReport,
    vars: &VarEnv,
    workspace: &Path,
    event_log: &Path,
    lock_path: &Path,
    dry_run: bool,
    plan: &PlanExecutor,
    fs: &FsExecutor,
    pack_name: &str,
    pack_path: &Path,
    manifest: &crate::pack::PackManifest,
    actions: &[Action],
    scheduler: &Arc<Scheduler>,
) -> bool {
    // Gitignore entries are applied only on real runs, before any action.
    if !dry_run {
        let ctx = ExecCtx::new(vars, pack_path, workspace)
            .with_platform(Platform::current())
            .with_scheduler(scheduler);
        if let Err(e) = crate::plugin::pack_type::apply_gitignore(&ctx, manifest) {
            record_action_err(report, event_log, lock_path, pack_name, 0, "gitignore", e);
            return true;
        }
    }
    for (idx, action) in actions.iter().enumerate() {
        let ctx = ExecCtx::new(vars, pack_path, workspace)
            .with_platform(Platform::current())
            .with_scheduler(scheduler);
        let action_tag = action_kind_tag(action);
        append_manifest_event(
            event_log,
            lock_path,
            &Event::ActionStarted {
                ts: Utc::now(),
                pack: pack_name.to_string(),
                action_idx: idx,
                action_name: action_tag.to_string(),
            },
            &mut report.event_log_warnings,
        );
        // Dry runs go through the plan executor; real runs hit the filesystem.
        let step_result =
            if dry_run { plan.execute(action, &ctx) } else { fs.execute(action, &ctx) };
        if !record_action_outcome(
            report,
            event_log,
            lock_path,
            pack_name,
            idx,
            action_tag,
            step_result,
        ) {
            return true;
        }
    }
    false
}
#[allow(clippy::too_many_arguments)]
/// Drive a pack-type plugin's `sync` on the dedicated tokio runtime.
/// Returns `true` when the plugin halted the pack.
fn dispatch_pack_type_plugin(
    report: &mut SyncReport,
    vars: &VarEnv,
    workspace: &Path,
    event_log: &Path,
    lock_path: &Path,
    registry: &Arc<Registry>,
    pack_type_registry: &Arc<PackTypeRegistry>,
    rt: &tokio::runtime::Runtime,
    pack_name: &str,
    pack_path: &Path,
    manifest: &crate::pack::PackManifest,
    type_tag: &'static str,
    visited_meta: &MetaVisitedSet,
    scheduler: &Arc<Scheduler>,
) -> bool {
    // NOTE(review): visited_meta is threaded here but not yet used — confirm
    // whether meta-pack cycle tracking should consult it.
    let _ = visited_meta;
    let ctx = ExecCtx::new(vars, pack_path, workspace)
        .with_platform(Platform::current())
        .with_registry(registry)
        .with_pack_type_registry(pack_type_registry)
        .with_scheduler(scheduler);
    append_manifest_event(
        event_log,
        lock_path,
        &Event::ActionStarted {
            ts: Utc::now(),
            pack: pack_name.to_string(),
            action_idx: 0,
            action_name: type_tag.to_string(),
        },
        &mut report.event_log_warnings,
    );
    // Safe: run_pack_lifecycle already verified the plugin is registered.
    let plugin = pack_type_registry
        .get(type_tag)
        .expect("pack-type plugin must be registered (guarded above)");
    let step_result = rt.block_on(crate::pack_lock::with_tier_scope(plugin.sync(&ctx, manifest)));
    !record_action_outcome(report, event_log, lock_path, pack_name, 0, type_tag, step_result)
}
/// When the pack may be skipped, return the freshly computed actions hash;
/// `None` forces a run. Dry runs and `--force` always run; otherwise the
/// hash and the synthetic flag must both match the prior lock entry.
fn skip_eligibility(
    actions: &[Action],
    commit_sha: &str,
    current_synthetic: bool,
    prior: &LockEntry,
    dry_run: bool,
    force: bool,
) -> Option<String> {
    if dry_run || force {
        return None;
    }
    let hash = compute_actions_hash(actions, commit_sha);
    let unchanged = prior.actions_hash == hash && prior.synthetic == current_synthetic;
    unchanged.then_some(hash)
}
#[allow(clippy::too_many_arguments)]
/// Skip a pack whose prior lock entry still matches its current state.
/// Records a `PackSkipped` step and carries the prior entry into
/// `next_lock`. Returns `true` when the pack was skipped.
fn try_skip_pack(
    report: &mut SyncReport,
    pack_name: &str,
    pack_path: &Path,
    actions: &[Action],
    commit_sha: &str,
    current_synthetic: bool,
    prior_lock: &std::collections::HashMap<String, LockEntry>,
    next_lock: &mut std::collections::HashMap<String, LockEntry>,
    dry_run: bool,
    force: bool,
) -> bool {
    let Some(prior) = prior_lock.get(pack_name) else {
        return false;
    };
    let Some(hash) =
        skip_eligibility(actions, commit_sha, current_synthetic, prior, dry_run, force)
    else {
        return false;
    };
    // Synthesize a step so the report shows the pack was seen but skipped.
    let skipped_step = ExecStep {
        action_name: Cow::Borrowed("pack"),
        result: ExecResult::Skipped {
            pack_path: pack_path.to_path_buf(),
            actions_hash: hash.clone(),
        },
        details: StepKind::PackSkipped { actions_hash: hash },
    };
    report.steps.push(SyncStep {
        pack: pack_name.to_string(),
        action_idx: 0,
        exec_step: skipped_step,
    });
    next_lock.insert(pack_name.to_string(), prior.clone());
    true
}
/// Insert or refresh the lock entry for a pack that just completed.
///
/// `next_lock` is seeded from the prior lockfile by [`run`], so when a
/// previous entry exists its fields not updated here (`id`, `branch`,
/// `schema_version`) carry over; otherwise a fresh entry is created.
fn upsert_lock_entry(
    prior_lock: &std::collections::HashMap<String, LockEntry>,
    next_lock: &mut std::collections::HashMap<String, LockEntry>,
    pack_name: &str,
    commit_sha: &str,
    actions_hash: &str,
    synthetic: bool,
) {
    if synthetic {
        if let Some(prior) = prior_lock.get(pack_name) {
            // A pack that used to be real but lost its manifest on disk.
            if !prior.synthetic {
                tracing::warn!(
                    target: "grex::sync",
                    pack = pack_name,
                    "pack `{pack_name}` downgraded from real to synthetic — \
                    pack.yaml missing on disk; only `git pull` will run going forward",
                );
            }
        }
    }
    let installed_at = Utc::now();
    let entry = next_lock.get(pack_name).map_or_else(
        || LockEntry {
            id: pack_name.to_string(),
            sha: commit_sha.to_string(),
            branch: String::new(),
            installed_at,
            actions_hash: actions_hash.to_string(),
            schema_version: "1".to_string(),
            synthetic,
        },
        |prev| LockEntry {
            installed_at,
            actions_hash: actions_hash.to_string(),
            sha: commit_sha.to_string(),
            synthetic,
            ..prev.clone()
        },
    );
    next_lock.insert(pack_name.to_string(), entry);
}
/// Route a step result to the ok/err recorders. Returns `true` on success
/// (caller continues) and `false` when the action halted the pack.
fn record_action_outcome(
    report: &mut SyncReport,
    event_log: &Path,
    lock_path: &Path,
    pack_name: &str,
    idx: usize,
    action_tag: &'static str,
    step_result: Result<ExecStep, ExecError>,
) -> bool {
    let Err(e) = step_result.map(|step| {
        record_action_ok(report, event_log, lock_path, pack_name, idx, step);
    }) else {
        return true;
    };
    record_action_err(report, event_log, lock_path, pack_name, idx, action_tag, e);
    false
}
/// Record a successful step: append a `Sync` summary event, then an
/// `ActionCompleted` event, then push the step onto the report.
fn record_action_ok(
    report: &mut SyncReport,
    event_log: &Path,
    lock_path: &Path,
    pack_name: &str,
    idx: usize,
    step: ExecStep,
) {
    append_step_event(event_log, lock_path, pack_name, &step, &mut report.event_log_warnings);
    append_manifest_event(
        event_log,
        lock_path,
        &Event::ActionCompleted {
            ts: Utc::now(),
            pack: pack_name.to_string(),
            action_idx: idx,
            result_summary: format!("{:?}", step.result),
        },
        &mut report.event_log_warnings,
    );
    report.steps.push(SyncStep { pack: pack_name.to_string(), action_idx: idx, exec_step: step });
}
/// Record a halting action: append an `ActionHalted` event (with a
/// truncated error summary) and set `report.halted`, which makes the
/// caller stop the run.
fn record_action_err(
    report: &mut SyncReport,
    event_log: &Path,
    lock_path: &Path,
    pack_name: &str,
    idx: usize,
    action_tag: &'static str,
    e: ExecError,
) {
    let error_summary = truncate_error_summary(&e);
    append_manifest_event(
        event_log,
        lock_path,
        &Event::ActionHalted {
            ts: Utc::now(),
            pack: pack_name.to_string(),
            action_idx: idx,
            action_name: action_tag.to_string(),
            error_summary,
        },
        &mut report.event_log_warnings,
    );
    let recovery_hint = recovery_hint_for(&e);
    report.halted = Some(SyncError::Halted(Box::new(HaltedContext {
        pack: pack_name.to_string(),
        action_idx: idx,
        action_name: action_tag.to_string(),
        error: e,
        recovery_hint,
    })));
}
/// Static tag naming an [`crate::pack::Action`] variant; used in events
/// and error contexts.
fn action_kind_tag(action: &crate::pack::Action) -> &'static str {
    use crate::pack::Action;
    match action {
        Action::Symlink(_) => "symlink",
        Action::Unlink(_) => "unlink",
        Action::Env(_) => "env",
        Action::Mkdir(_) => "mkdir",
        Action::Rmdir(_) => "rmdir",
        Action::Require(_) => "require",
        Action::When(_) => "when",
        Action::Exec(_) => "exec",
    }
}
/// Render `err` for the event log, truncated to roughly
/// `ACTION_ERROR_SUMMARY_MAX` bytes.
///
/// `String::truncate` panics when the cut index is not a `char` boundary,
/// and error text routinely carries multi-byte UTF-8 (paths, user-provided
/// strings), so the cut is floored to the nearest boundary first.
fn truncate_error_summary(err: &ExecError) -> String {
    let mut s = err.to_string();
    if s.len() > ACTION_ERROR_SUMMARY_MAX {
        // Walk back to a valid char boundary; 0 is always a boundary.
        let mut cut = ACTION_ERROR_SUMMARY_MAX;
        while !s.is_char_boundary(cut) {
            cut -= 1;
        }
        s.truncate(cut);
        s.push_str("…[truncated]");
    }
    s
}
/// Human-readable remediation advice for well-known executor failures;
/// `None` when there is nothing actionable to suggest.
fn recovery_hint_for(err: &ExecError) -> Option<String> {
    match err {
        ExecError::SymlinkDestOccupied { .. } => Some(
            "set `backup: true` on the symlink action, or remove the conflicting entry by hand"
                .into(),
        ),
        ExecError::SymlinkPrivilegeDenied { .. } => {
            Some("enable Windows Developer Mode or re-run grex as administrator".into())
        }
        ExecError::SymlinkCreateAfterBackupFailed { backup, .. } => {
            Some(format!("backup left at `{}`; restore manually then re-run", backup.display()))
        }
        ExecError::RmdirNotEmpty { .. } => {
            Some("set `force: true` on the rmdir action to recurse".into())
        }
        ExecError::EnvPersistenceDenied { .. } => {
            Some("re-run elevated (Machine scope needs admin)".into())
        }
        _ => None,
    }
}
/// Append a `Sync` event summarizing one executed step; failures are
/// logged and collected in `warnings` rather than propagated.
fn append_step_event(
    log: &Path,
    lock_path: &Path,
    pack: &str,
    step: &ExecStep,
    warnings: &mut Vec<String>,
) {
    // The step summary is carried in the Sync event's `sha` field.
    let summary = format!("{}:{:?}", step.action_name, step.result);
    let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: summary };
    if let Err(e) = append_event_locked(log, lock_path, &event) {
        tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
        warnings.push(format!("{}: {e}", log.display()));
    }
    // NOTE(review): this appears to exist only to keep SCHEMA_VERSION
    // referenced; confirm before removing the import or this line.
    let _ = SCHEMA_VERSION;
}
/// Append `event` to the log; failures are logged and collected in
/// `warnings` rather than propagated.
fn append_manifest_event(log: &Path, lock_path: &Path, event: &Event, warnings: &mut Vec<String>) {
    let Err(e) = append_event_locked(log, lock_path, event) else {
        return;
    };
    tracing::warn!(target: "grex::sync", "manifest append failed: {e}");
    warnings.push(format!("{}: {e}", log.display()));
}
/// Append one event to the JSONL log while holding the manifest lock,
/// creating parent directories for both the log and the lock first. All
/// errors are stringified; the double `map_err` flattens the nested
/// lock-result and append-result.
fn append_event_locked(log: &Path, lock_path: &Path, event: &Event) -> Result<(), String> {
    if let Some(parent) = log.parent() {
        std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
    }
    if let Some(parent) = lock_path.parent() {
        std::fs::create_dir_all(parent).map_err(|e| e.to_string())?;
    }
    let mut lock = ManifestLock::open(log, lock_path).map_err(|e| e.to_string())?;
    lock.write(|| append_event(log, event)).map_err(|e| e.to_string())?.map_err(|e| e.to_string())
}
/// Borrow the human-facing name of a pack node.
#[must_use]
pub fn pack_display_name(node: &PackNode) -> &str {
    node.name.as_str()
}
/// Tear packs down via their pack-type plugins, walking the graph in
/// reverse post-order (parents before children). Mirrors [`run`]'s
/// workspace locking and preparation, but performs no legacy migration and
/// never writes the lockfile.
///
/// # Errors
/// Fails for setup problems only; per-pack failures land in
/// `report.halted`.
pub fn teardown(
    pack_root: &Path,
    opts: &SyncOptions,
    cancel: &CancellationToken,
) -> Result<SyncReport, SyncError> {
    // NOTE(review): cancellation token is accepted but not yet honored.
    let _ = cancel;
    let workspace = resolve_workspace(pack_root, opts.workspace.as_deref());
    ensure_workspace_dir(&workspace)?;
    let (mut ws_lock, ws_lock_path) = open_workspace_lock(&workspace)?;
    let _ws_guard = match ws_lock.try_acquire() {
        Ok(Some(g)) => g,
        Ok(None) => {
            return Err(SyncError::WorkspaceBusy {
                workspace: workspace.clone(),
                lock_path: ws_lock_path,
            });
        }
        Err(e) => return Err(workspace_lock_err(&ws_lock_path, &e.to_string())),
    };
    let graph =
        walk_and_validate(pack_root, &workspace, opts.validate, opts.ref_override.as_deref())?;
    let prep = prepare_run_context(pack_root, &graph, &workspace)?;
    let mut report = SyncReport {
        graph,
        steps: Vec::new(),
        halted: None,
        event_log_warnings: Vec::new(),
        pre_run_recovery: prep.pre_run_recovery,
        workspace_migrations: Vec::new(),
    };
    let resolved_parallel: usize = opts.parallel.unwrap_or_else(|| num_cpus::get().max(1));
    let scheduler = Arc::new(Scheduler::new(resolved_parallel));
    run_teardown(
        &mut report,
        &prep.order,
        &prep.vars,
        &workspace,
        &prep.event_log,
        &prep.lock_path,
        &prep.registry,
        &prep.pack_type_registry,
        resolved_parallel,
        &scheduler,
    );
    Ok(report)
}
#[allow(clippy::too_many_arguments)]
/// Invoke each pack's pack-type `teardown` plugin, iterating `order` in
/// reverse (parents before children). Stops at the first failure, which
/// the shared recorders place in `report.halted`.
fn run_teardown(
    report: &mut SyncReport,
    order: &[usize],
    vars: &VarEnv,
    workspace: &Path,
    event_log: &Path,
    lock_path: &Path,
    registry: &Arc<Registry>,
    pack_type_registry: &Arc<PackTypeRegistry>,
    parallel: usize,
    scheduler: &Arc<Scheduler>,
) {
    let rt = build_pack_type_runtime(parallel);
    // `order` is post-order, so the reverse visits parents before children.
    for &id in order.iter().rev() {
        let Some(node) = report.graph.node(id) else { continue };
        let pack_name = node.name.clone();
        let pack_path = node.path.clone();
        let manifest = node.manifest.clone();
        let type_tag = manifest.r#type.as_str();
        if pack_type_registry.get(type_tag).is_none() {
            let err = ExecError::UnknownAction(format!("pack type `{type_tag}`"));
            record_action_err(report, event_log, lock_path, &pack_name, 0, "pack-type", err);
            return;
        }
        let ctx = ExecCtx::new(vars, &pack_path, workspace)
            .with_platform(Platform::current())
            .with_registry(registry)
            .with_pack_type_registry(pack_type_registry)
            .with_scheduler(scheduler);
        append_manifest_event(
            event_log,
            lock_path,
            &Event::ActionStarted {
                ts: Utc::now(),
                pack: pack_name.clone(),
                action_idx: 0,
                action_name: type_tag.to_string(),
            },
            &mut report.event_log_warnings,
        );
        // Safe: guarded by the registry check above.
        let plugin = pack_type_registry
            .get(type_tag)
            .expect("pack-type plugin must be registered (guarded above)");
        let step_result =
            rt.block_on(crate::pack_lock::with_tier_scope(plugin.teardown(&ctx, &manifest)));
        if !record_action_outcome(
            report,
            event_log,
            lock_path,
            &pack_name,
            0,
            type_tag,
            step_result,
        ) {
            return;
        }
    }
}
#[doc(hidden)]
/// Test-only hook: append a `Sync` event through the same locked code path
/// production uses.
pub fn __test_append_sync_event(
    log: &Path,
    lock_path: &Path,
    pack: &str,
    action_name: &str,
) -> Result<(), String> {
    let event = Event::Sync { ts: Utc::now(), id: pack.to_string(), sha: action_name.to_string() };
    append_event_locked(log, lock_path, &event)
}
/// An `ActionStarted` event with no matching completion/halt — evidence of
/// an interrupted previous run.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DanglingStart {
    /// Pack the unfinished action belongs to.
    pub pack: String,
    /// Index of the unfinished action.
    pub action_idx: usize,
    /// Tag of the unfinished action.
    pub action_name: String,
    /// Timestamp of the unmatched `ActionStarted` event.
    pub started_at: DateTime<Utc>,
}
/// Artifacts left behind by a previous, interrupted run.
#[non_exhaustive]
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RecoveryReport {
    /// Files named `*.grex.bak` found in the workspace.
    pub orphan_backups: Vec<PathBuf>,
    /// Files named `*.grex.bak.<suffix>` found in the workspace.
    pub orphan_tombstones: Vec<PathBuf>,
    /// Actions that started but never completed per the event log.
    pub dangling_starts: Vec<DanglingStart>,
}
impl RecoveryReport {
    /// `true` when the scan found nothing to recover.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        let no_backups = self.orphan_backups.is_empty();
        let no_tombstones = self.orphan_tombstones.is_empty();
        let no_starts = self.dangling_starts.is_empty();
        no_backups && no_tombstones && no_starts
    }
}
/// Scan `workspace` for orphan backup/tombstone files and the event log
/// for actions that started but never completed.
///
/// # Errors
/// Returns a `Validation`-wrapped error only when the event log exists but
/// cannot be parsed; filesystem problems during the walk are skipped.
pub fn scan_recovery(workspace: &Path, event_log: &Path) -> Result<RecoveryReport, SyncError> {
    let mut report = RecoveryReport::default();
    walk_for_backups(workspace, &mut report);
    if event_log.exists() {
        match read_all(event_log) {
            Ok(events) => {
                report.dangling_starts = collect_dangling_starts(&events);
            }
            Err(e) => {
                return Err(SyncError::Validation {
                    errors: vec![PackValidationError::DependsOnUnsatisfied {
                        pack: "<event-log>".into(),
                        required: e.to_string(),
                    }],
                });
            }
        }
    }
    Ok(report)
}
/// Entry point for the depth-bounded workspace walk: records orphan
/// `*.grex.bak` backups and `*.grex.bak.<tag>` tombstones into `report`.
fn walk_for_backups(root: &Path, report: &mut RecoveryReport) {
    walk_for_backups_inner(root, report, 0);
}
/// Depth-bounded recursive walk that classifies directory entries by name:
/// `*.grex.bak` → orphan backup, `*.grex.bak.<non-empty tag>` → orphan
/// tombstone. Symlinks are never followed; unreadable entries are logged
/// and skipped (best-effort scan, never fails).
fn walk_for_backups_inner(dir: &Path, report: &mut RecoveryReport, depth: u32) {
    // Hard cap so a pathological or looping layout can't recurse forever.
    const MAX_DEPTH: u32 = 6;
    if depth > MAX_DEPTH {
        return;
    }
    // An unreadable directory is silently skipped; this is a best-effort scan.
    let Ok(entries) = std::fs::read_dir(dir) else { return };
    for entry_result in entries {
        let entry = match entry_result {
            Ok(e) => e,
            Err(e) => {
                tracing::warn!(
                    target: "grex::sync::recover",
                    "skipping unreadable entry under `{}`: {e}",
                    dir.display(),
                );
                continue;
            }
        };
        let path = entry.path();
        let name = entry.file_name();
        // Non-UTF-8 names can never match the ASCII suffixes below; skip.
        let Some(name_str) = name.to_str() else { continue };
        if name_str.ends_with(".grex.bak") {
            // Fix: `path` was cloned here even though this branch `continue`s
            // immediately — move it instead (clippy::redundant_clone).
            report.orphan_backups.push(path);
            continue;
        }
        if let Some((_, tag)) = name_str.rsplit_once(".grex.bak.") {
            if !tag.is_empty() {
                // Same redundant-clone fix as the backup branch above.
                report.orphan_tombstones.push(path);
                continue;
            }
        }
        let Ok(ft) = entry.file_type() else { continue };
        // Don't descend through symlinks.
        if ft.is_symlink() {
            continue;
        }
        if ft.is_dir() {
            walk_for_backups_inner(&path, report, depth + 1);
        }
    }
}
/// Replays `events` and returns every `ActionStarted` with no later
/// `ActionCompleted`/`ActionHalted` for the same `(pack, action_idx)` key.
///
/// Output is ordered by start time, with ties broken by `(pack,
/// action_idx)` so the result is deterministic across runs.
fn collect_dangling_starts(events: &[Event]) -> Vec<DanglingStart> {
    use std::collections::HashMap;
    // Last-writer-wins per key: a re-started action replaces its older entry.
    let mut open: HashMap<(String, usize), DanglingStart> = HashMap::new();
    for ev in events {
        match ev {
            Event::ActionStarted { ts, pack, action_idx, action_name } => {
                open.insert(
                    (pack.clone(), *action_idx),
                    DanglingStart {
                        pack: pack.clone(),
                        action_idx: *action_idx,
                        action_name: action_name.clone(),
                        started_at: *ts,
                    },
                );
            }
            Event::ActionCompleted { pack, action_idx, .. }
            | Event::ActionHalted { pack, action_idx, .. } => {
                open.remove(&(pack.clone(), *action_idx));
            }
            // Other event kinds don't affect action liveness.
            _ => {}
        }
    }
    let mut out: Vec<DanglingStart> = open.into_values().collect();
    // Fix: sorting by `started_at` alone left equal-timestamp entries in
    // `HashMap` iteration order, which is nondeterministic across runs.
    out.sort_by(|a, b| {
        (a.started_at, a.pack.as_str(), a.action_idx)
            .cmp(&(b.started_at, b.pack.as_str(), b.action_idx))
    });
    out
}
#[cfg(test)]
mod synthetic_transition_tests {
    use super::{skip_eligibility, upsert_lock_entry, LockEntry};
    use crate::lockfile::compute_actions_hash;
    use chrono::{TimeZone, Utc};
    use std::collections::HashMap;

    /// Fixed timestamp so lock entries compare deterministically.
    fn ts() -> chrono::DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 4, 27, 10, 0, 0).unwrap()
    }

    /// Actions hash for an empty action list at the pinned sha.
    fn stable_hash() -> String {
        compute_actions_hash(&[], "deadbeef")
    }

    /// Builds a lock entry with the given `id` and `synthetic` flag; every
    /// other field is pinned so tests vary only what they assert on.
    /// (Dedupes the `LockEntry` literal the upsert tests used to repeat.)
    fn entry(id: &str, synthetic: bool) -> LockEntry {
        LockEntry {
            id: id.into(),
            sha: "deadbeef".into(),
            branch: "main".into(),
            installed_at: ts(),
            actions_hash: stable_hash(),
            schema_version: "1".into(),
            synthetic,
        }
    }

    /// Convenience wrapper kept for the skip-eligibility tests.
    fn prior_entry(synthetic: bool) -> LockEntry {
        entry("alpha", synthetic)
    }

    #[test]
    fn skip_eligibility_invalidates_when_synthetic_flag_flips() {
        let prior = prior_entry(false);
        let decision = skip_eligibility(&[], "deadbeef", true, &prior, false, false);
        assert!(decision.is_none(), "skip must be invalidated when synthetic flag flips");
    }

    #[test]
    fn skip_eligibility_allows_skip_when_synthetic_matches() {
        let prior = prior_entry(true);
        let decision = skip_eligibility(&[], "deadbeef", true, &prior, false, false);
        assert_eq!(
            decision.as_deref(),
            Some(stable_hash().as_str()),
            "skip must be honoured when synthetic flag matches",
        );
    }

    #[test]
    fn skip_eligibility_respects_dry_run_and_force() {
        let prior = prior_entry(true);
        // dry_run set: skipping must be disabled.
        assert!(skip_eligibility(&[], "deadbeef", true, &prior, true, false).is_none());
        // force set: skipping must be disabled.
        assert!(skip_eligibility(&[], "deadbeef", true, &prior, false, true).is_none());
    }

    #[test]
    fn upsert_lock_entry_records_real_to_synthetic_downgrade() {
        let mut prior: HashMap<String, LockEntry> = HashMap::new();
        prior.insert("beta".into(), entry("beta", false));
        let mut next: HashMap<String, LockEntry> = HashMap::new();
        upsert_lock_entry(&prior, &mut next, "beta", "deadbeef", &stable_hash(), true);
        let got = next.get("beta").expect("entry must be upserted");
        assert!(got.synthetic, "downgraded entry must carry synthetic = true");
        assert_eq!(got.actions_hash, stable_hash(), "actions_hash must reflect current run");
    }

    #[test]
    fn upsert_lock_entry_no_op_for_steady_state_synthetic() {
        let mut prior: HashMap<String, LockEntry> = HashMap::new();
        prior.insert("gamma".into(), entry("gamma", true));
        let mut next: HashMap<String, LockEntry> = HashMap::new();
        upsert_lock_entry(&prior, &mut next, "gamma", "deadbeef", &stable_hash(), true);
        let got = next.get("gamma").expect("entry must be upserted");
        assert!(got.synthetic, "synthetic must remain true on no-op refresh");
    }
}