1use crate::cli::args::{GlobalFlags, SyncArgs};
18use anyhow::Result;
19use grex_core::sync::{self, HaltedContext, SyncError, SyncOptions, SyncReport, SyncStep};
20use tokio_util::sync::CancellationToken;
21
22pub fn run(args: SyncArgs, global: &GlobalFlags, cancel: &CancellationToken) -> Result<()> {
30 let Some(pack_root) = args.pack_root.clone() else {
31 if global.json {
37 emit_json_error(
38 "usage",
39 "`<pack_root>` is required (directory with `.grex/pack.yaml` or the YAML file)",
40 "sync",
41 );
42 } else {
43 eprintln!(
44 "grex sync: <pack_root> required (directory with `.grex/pack.yaml` or the YAML file)"
45 );
46 }
47 std::process::exit(2);
48 };
49 let dry_run = args.dry_run || global.dry_run;
50 let only_patterns = if args.only.is_empty() { None } else { Some(args.only.clone()) };
51 let opts = SyncOptions::new()
52 .with_dry_run(dry_run)
53 .with_validate(!args.no_validate)
54 .with_workspace(args.workspace.clone())
55 .with_ref_override(args.ref_override.clone())
56 .with_only_patterns(only_patterns)
57 .with_force(args.force);
58 match run_impl(&pack_root, &opts, args.quiet, global.json, cancel) {
59 RunOutcome::Ok => Ok(()),
60 RunOutcome::UsageError => std::process::exit(2),
61 RunOutcome::Validation => std::process::exit(1),
62 RunOutcome::Exec => std::process::exit(2),
63 RunOutcome::Tree => std::process::exit(3),
64 }
65}
66
/// Coarse result of running a verb, used to pick the process exit status.
///
/// The exit statuses mentioned below are the ones `run` in this file uses.
pub(super) enum RunOutcome {
    /// The run completed and the report did not record a halt; `run` returns `Ok(())`.
    Ok,
    /// The invocation itself was wrong (`classify_sync_err` uses this for an
    /// invalid `--only` glob); `run` exits with status 2.
    UsageError,
    /// The sync reported validation errors; `run` exits with status 1.
    Validation,
    /// Execution failed or halted part-way (also used when a returned report
    /// has `halted` set); `run` exits with status 2.
    Exec,
    /// The tree walk failed; `classify_sync_err` also uses this for any error
    /// variant it does not recognise. `run` exits with status 3.
    Tree,
}
76
77fn run_impl(
78 pack_root: &std::path::Path,
79 opts: &SyncOptions,
80 quiet: bool,
81 json: bool,
82 cancel: &CancellationToken,
83) -> RunOutcome {
84 match sync::run(pack_root, opts, cancel) {
85 Ok(report) => {
86 if json {
87 emit_json_report(&report, opts.dry_run, "sync");
88 } else {
89 render_report(&report, opts.dry_run, quiet);
90 }
91 if report.halted.is_some() {
92 return RunOutcome::Exec;
93 }
94 RunOutcome::Ok
95 }
96 Err(err) => classify_sync_err(err, json, "sync"),
97 }
98}
99
100pub(super) fn classify_sync_err(err: SyncError, json: bool, verb: &str) -> RunOutcome {
105 match err {
106 SyncError::Validation { errors } => {
107 emit_validation(&errors, json, verb);
108 RunOutcome::Validation
109 }
110 SyncError::Tree(e) => {
111 emit_simple("tree", &e.to_string(), "tree walk failed", json, verb);
112 RunOutcome::Tree
113 }
114 SyncError::Exec(e) => {
115 emit_simple("exec", &e.to_string(), "execution error", json, verb);
116 RunOutcome::Exec
117 }
118 SyncError::Halted(ctx) => {
119 if json {
120 emit_json_halted(&ctx, verb);
121 } else {
122 print_halted_context(&ctx);
123 }
124 RunOutcome::Exec
125 }
126 SyncError::InvalidOnlyGlob { pattern, source } => {
127 let msg = format!("invalid --only glob `{pattern}`: {source}");
128 emit_simple("usage", &msg, "error", json, verb);
129 RunOutcome::UsageError
130 }
131 other => {
132 emit_simple("other", &other.to_string(), &format!("{verb} failed"), json, verb);
133 RunOutcome::Tree
134 }
135 }
136}
137
138fn emit_validation(errors: &[impl std::fmt::Display], json: bool, verb: &str) {
139 if json {
140 let joined = errors.iter().map(|e| e.to_string()).collect::<Vec<_>>().join("; ");
141 emit_json_error("validation", &joined, verb);
142 } else {
143 eprintln!("validation failed:");
144 for e in errors {
145 eprintln!(" - {e}");
146 }
147 }
148}
149
150fn emit_simple(kind: &str, message: &str, human_prefix: &str, json: bool, verb: &str) {
151 if json {
152 emit_json_error(kind, message, verb);
153 } else {
154 eprintln!("{human_prefix}: {message}");
155 }
156}
157
158pub(super) fn emit_json_report(report: &SyncReport, dry_run: bool, verb: &str) {
162 let steps: Vec<serde_json::Value> =
163 report.steps.iter().map(|s| step_to_json(s, dry_run)).collect();
164 let halted = report.halted.as_ref().and_then(|h| match h {
165 SyncError::Halted(ctx) => Some(serde_json::json!({
166 "pack": ctx.pack,
167 "action": ctx.action_name,
168 "idx": ctx.action_idx,
169 "error": ctx.error.to_string(),
170 "recovery_hint": ctx.recovery_hint,
171 })),
172 _ => None,
173 });
174 let migrations: Vec<serde_json::Value> = report
175 .workspace_migrations
176 .iter()
177 .map(|m| {
178 serde_json::json!({
179 "from": m.from.display().to_string(),
180 "to": m.to.display().to_string(),
181 "outcome": migration_outcome_tag(&m.outcome),
182 "error": match &m.outcome {
183 grex_core::sync::MigrationOutcome::Failed { error } => {
184 serde_json::Value::String(error.clone())
185 }
186 _ => serde_json::Value::Null,
187 },
188 })
189 })
190 .collect();
191 let doc = serde_json::json!({
192 "verb": verb,
193 "dry_run": dry_run,
194 "steps": steps,
195 "halted": halted,
196 "event_log_warnings": report.event_log_warnings,
197 "workspace_migrations": migrations,
198 "summary": {"total_steps": report.steps.len()},
199 });
200 if let Ok(s) = serde_json::to_string(&doc) {
201 println!("{s}");
202 }
203}
204
205fn step_to_json(s: &SyncStep, dry_run: bool) -> serde_json::Value {
206 use grex_core::ExecResult;
207 let (result, details) = match &s.exec_step.result {
208 ExecResult::PerformedChange => ("performed_change", serde_json::Value::Null),
209 ExecResult::WouldPerformChange => {
210 if dry_run {
211 ("would_perform_change", serde_json::Value::Null)
212 } else {
213 ("performed_change", serde_json::Value::Null)
214 }
215 }
216 ExecResult::AlreadySatisfied => ("already_satisfied", serde_json::Value::Null),
217 ExecResult::NoOp => ("noop", serde_json::Value::Null),
218 ExecResult::Skipped { pack_path, actions_hash, .. } => (
219 "skipped",
220 serde_json::json!({
221 "pack_path": pack_path.display().to_string(),
222 "actions_hash": actions_hash,
223 }),
224 ),
225 _ => ("other", serde_json::Value::Null),
226 };
227 serde_json::json!({
228 "pack": s.pack,
229 "action": s.exec_step.action_name,
230 "idx": s.action_idx,
231 "result": result,
232 "details": details,
233 })
234}
235
236pub(super) fn emit_json_error(kind: &str, message: &str, verb: &str) {
237 let doc = serde_json::json!({
238 "verb": verb,
239 "error": {
240 "kind": kind,
241 "message": message,
242 },
243 });
244 if let Ok(s) = serde_json::to_string(&doc) {
245 println!("{s}");
246 }
247}
248
249pub(super) fn emit_json_halted(ctx: &HaltedContext, verb: &str) {
250 let doc = serde_json::json!({
251 "verb": verb,
252 "halted": {
253 "pack": ctx.pack,
254 "action": ctx.action_name,
255 "idx": ctx.action_idx,
256 "error": ctx.error.to_string(),
257 "recovery_hint": ctx.recovery_hint,
258 },
259 });
260 if let Ok(s) = serde_json::to_string(&doc) {
261 println!("{s}");
262 }
263}
264
265fn print_halted_context(ctx: &HaltedContext) {
268 eprintln!(
269 "sync halted at pack `{}` action #{} ({}):",
270 ctx.pack, ctx.action_idx, ctx.action_name
271 );
272 eprintln!(" error: {}", ctx.error);
273 if let Some(hint) = &ctx.recovery_hint {
274 eprintln!(" hint: {hint}");
275 }
276}
277
278fn render_report(report: &SyncReport, dry_run: bool, quiet: bool) {
279 if !quiet {
280 if !report.workspace_migrations.is_empty() {
281 print_workspace_migrations(&report.workspace_migrations);
282 }
283 if let Some(rec) = &report.pre_run_recovery {
284 print_recovery_report(rec);
285 }
286 for s in &report.steps {
287 print_step(s, dry_run);
288 }
289 }
290 for w in &report.event_log_warnings {
291 eprintln!("warning: {w}");
292 }
293 if let Some(err) = &report.halted {
294 match err {
295 SyncError::Halted(ctx) => print_halted_context(ctx),
296 other => eprintln!("halted: {other}"),
297 }
298 }
299}
300
301fn print_workspace_migrations(migrations: &[grex_core::sync::WorkspaceMigration]) {
306 use grex_core::sync::MigrationOutcome;
307 for m in migrations {
308 let from = m.from.display();
309 let to = m.to.display();
310 match &m.outcome {
311 MigrationOutcome::Migrated => {
312 eprintln!("[migrated] legacy={from} -> new={to}");
313 }
314 MigrationOutcome::SkippedBothExist => {
315 eprintln!("[skipped] legacy={from} AND new={to} both exist; resolve manually",);
316 }
317 MigrationOutcome::SkippedDestOccupied => {
318 eprintln!("[skipped] destination={to} occupied; legacy={from} kept");
319 }
320 MigrationOutcome::Failed { error } => {
321 eprintln!("[failed] legacy={from} -> new={to}: {error}");
322 }
323 other => eprintln!("[unknown] legacy={from} -> new={to} ({other:?})"),
326 }
327 }
328}
329
330fn migration_outcome_tag(o: &grex_core::sync::MigrationOutcome) -> &'static str {
333 use grex_core::sync::MigrationOutcome;
334 match o {
335 MigrationOutcome::Migrated => "migrated",
336 MigrationOutcome::SkippedBothExist => "skipped_both_exist",
337 MigrationOutcome::SkippedDestOccupied => "skipped_dest_occupied",
338 MigrationOutcome::Failed { .. } => "failed",
339 _ => "other",
342 }
343}
344
345fn print_recovery_report(rec: &grex_core::sync::RecoveryReport) {
348 let total = rec.orphan_backups.len() + rec.orphan_tombstones.len() + rec.dangling_starts.len();
349 if total == 0 {
350 return;
351 }
352 eprintln!("warning: pre-run recovery scan found {total} artifact(s) from prior sync:");
353 for p in &rec.orphan_backups {
354 eprintln!(" orphan backup: {}", p.display());
355 }
356 for p in &rec.orphan_tombstones {
357 eprintln!(" orphan tombstone: {}", p.display());
358 }
359 for d in &rec.dangling_starts {
360 eprintln!(
361 " dangling start: pack `{}` action #{} ({}) at {}",
362 d.pack, d.action_idx, d.action_name, d.started_at
363 );
364 }
365}
366
367fn print_step(s: &SyncStep, dry_run: bool) {
368 use grex_core::ExecResult;
369 if let ExecResult::Skipped { pack_path, actions_hash, .. } = &s.exec_step.result {
378 println!(
379 "[skipped] pack={pack} path={path} hash={hash}",
380 pack = s.pack,
381 path = pack_path.display(),
382 hash = actions_hash,
383 );
384 return;
385 }
386 let tag = match (&s.exec_step.result, dry_run) {
387 (ExecResult::PerformedChange, _) => "ok",
388 (ExecResult::WouldPerformChange, true) => "would",
389 (ExecResult::WouldPerformChange, false) => "ok",
390 (ExecResult::AlreadySatisfied, _) => "skipped",
391 (ExecResult::NoOp, _) => "noop",
392 _ => "other",
395 };
396 println!(
397 "[{tag}] pack={pack} action={kind} idx={idx}",
398 pack = s.pack,
399 kind = s.exec_step.action_name,
400 idx = s.action_idx,
401 );
402}
403
404