1use std::{io::Write, path::PathBuf, process::Command, str::FromStr, sync::Arc};
15
16use anyhow::{Context, Result, anyhow};
17use objects::object::OperationId;
18use repo::{
19 Repository,
20 operation_dedup::{
21 DedupOutcome, OperationDedupStore, hash_request_body, reserve_operation_id_eager,
22 },
23};
24use serde::{Deserialize, Serialize};
25
26use crate::cli::{
27 cli_args::{Cli, OutputMode},
28 commands::RecoveryAdvice,
29};
30
/// Environment variable set (to `"1"`) on the re-executed child process.
///
/// When this variable is present, `run_local_idempotency_if_requested`
/// returns `Ok(false)` immediately so the child runs the command itself
/// instead of wrapping it again.
const LOCAL_OP_ID_CHILD_ENV: &str = "HEDDLE_LOCAL_OP_ID_CHILD";
32
33pub fn resolve_operation_id(cli: &Cli) -> Result<Option<OperationId>> {
40 let Some(raw) = cli.op_id.as_deref() else {
41 return Ok(None);
42 };
43 if raw.trim().is_empty() {
44 return Ok(None);
45 }
46 OperationId::from_str(raw)
47 .map(Some)
48 .map_err(|err| anyhow!(RecoveryAdvice::op_id_invalid(raw, err)))
49}
50
51pub fn supports_local_op_id(command_name: &str) -> bool {
52 crate::cli::commands::command_runtime_contract(command_name)
53 .map(|contract| contract.supports_op_id)
54 .unwrap_or(false)
55}
56
/// Captured result of one child-process run.
///
/// Encoded as JSON and recorded in the dedup store after the child finishes,
/// then decoded again when the same op-id is replayed.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LocalOpIdResponse {
    /// Child exit code; `1` is stored when the exit status carries no code.
    status_code: i32,
    /// Raw bytes the child wrote to stdout.
    stdout: Vec<u8>,
    /// Raw bytes the child wrote to stderr.
    stderr: Vec<u8>,
}
63
64pub fn run_local_idempotency_if_requested(
65 cli: &Cli,
66 command_name: &str,
67 command_supports_op_id: bool,
68) -> Result<bool> {
69 let Some(op_id) = resolve_operation_id(cli)? else {
70 return Ok(false);
71 };
72 if std::env::var_os(LOCAL_OP_ID_CHILD_ENV).is_some() {
73 return Ok(false);
74 }
75 if !command_supports_op_id {
76 return Err(anyhow!(RecoveryAdvice::op_id_unsupported(command_name)));
77 }
78
79 let bootstrap_store = uses_bootstrap_op_id_store(command_name);
80 let normalized_args = normalized_argv_for_op_id();
81 let bootstrap_scope = if bootstrap_store {
82 Some(bootstrap_op_id_scope(cli)?)
83 } else {
84 None
85 };
86 let request_hash = request_hash_for_op_id(
87 &normalized_args,
88 bootstrap_scope
89 .as_ref()
90 .map(|scope| scope.hash_material.as_str()),
91 )?;
92 let repo_for_eager;
93 let store = if bootstrap_store {
94 let scope = bootstrap_scope
95 .as_ref()
96 .expect("bootstrap scope should be present for bootstrap store");
97 repo_for_eager = None;
98 Arc::new(
99 OperationDedupStore::open(bootstrap_op_id_store_dir(scope))
100 .context("open bootstrap op-id dedup store")?,
101 )
102 } else {
103 let repo = cli.open_repo()?;
104 let bootstrap_scope = bootstrap_op_id_scope_for_root(repo.root().to_path_buf())?;
105 let bootstrap_store =
106 OperationDedupStore::open(bootstrap_op_id_store_dir(&bootstrap_scope))
107 .context("open bootstrap op-id dedup store")?;
108 if let Some(existing) = bootstrap_store.metadata_for(op_id, command_name) {
109 return Err(anyhow!(RecoveryAdvice::op_id_conflict(
110 command_name,
111 &bootstrap_scope.label,
112 &normalized_args,
113 request_hash,
114 Some(existing),
115 )));
116 }
117 let store = Arc::new(
118 OperationDedupStore::open(repo.heddle_dir()).context("open op-id dedup store")?,
119 );
120 repo_for_eager = Some(repo);
121 store
122 };
123 let json_mode = json_display_mode(cli);
124
125 let reserve_outcome = if let Some(repo) = repo_for_eager.as_ref() {
126 reserve_operation_id_eager(repo, Arc::clone(&store), op_id, command_name, request_hash)?
127 } else {
128 store.reserve(op_id, command_name, request_hash)?
129 };
130
131 match reserve_outcome {
132 DedupOutcome::Replay { response } => {
133 let replay: LocalOpIdResponse =
134 serde_json::from_slice(&response).context("decode cached op-id response")?;
135 replay_response(
136 &replay,
137 json_mode.map(|mode| OpIdDisplayContext {
138 op_id: &op_id,
139 command_name,
140 status: "replayed",
141 replayed: true,
142 child_succeeded: replay.status_code == 0,
143 mode,
144 }),
145 )?;
146 if replay.status_code != 0 {
147 std::process::exit(replay.status_code);
148 }
149 Ok(true)
150 }
151 DedupOutcome::Conflict => Err(anyhow!(RecoveryAdvice::op_id_conflict(
152 command_name,
153 bootstrap_scope
154 .as_ref()
155 .map(|scope| scope.label.as_str())
156 .unwrap_or("repository-local .heddle"),
157 &normalized_args,
158 request_hash,
159 store.metadata_for(op_id, command_name),
160 ))),
161 DedupOutcome::InFlight => Err(anyhow!(RecoveryAdvice::op_id_in_flight())),
162 DedupOutcome::Reserved => {
163 let output = Command::new(std::env::current_exe()?)
164 .args(std::env::args_os().skip(1))
165 .env(LOCAL_OP_ID_CHILD_ENV, "1")
166 .output();
167 let output = match output {
168 Ok(output) => output,
169 Err(error) => {
170 let _ = store.cancel(op_id, command_name);
171 return Err(error).context("run op-id child process");
172 }
173 };
174 let response = LocalOpIdResponse {
175 status_code: output.status.code().unwrap_or(1),
176 stdout: output.stdout,
177 stderr: output.stderr,
178 };
179 let encoded = serde_json::to_vec(&response).context("encode cached op-id response")?;
180 store.record(op_id, command_name, request_hash, encoded)?;
181 replay_response(
182 &response,
183 json_mode.map(|mode| OpIdDisplayContext {
184 op_id: &op_id,
185 command_name,
186 status: "executed",
187 replayed: false,
188 child_succeeded: response.status_code == 0,
189 mode,
190 }),
191 )?;
192 if response.status_code != 0 {
193 std::process::exit(response.status_code);
194 }
195 Ok(true)
196 }
197 }
198}
199
/// Everything `decorate_json_stream` needs to annotate a JSON payload with
/// op-id information.
#[derive(Clone, Copy)]
struct OpIdDisplayContext<'a> {
    /// Operation id written into the `op_id` fields.
    op_id: &'a OperationId,
    /// Command name written into `operation_record.command`.
    command_name: &'a str,
    /// Value for `idempotency_status` (`"replayed"` or `"executed"` at the
    /// call sites in this file).
    status: &'a str,
    /// Whether the response came from the dedup store rather than a fresh run.
    replayed: bool,
    /// Whether the stored/child status code was zero.
    child_succeeded: bool,
    /// Which JSON presentation the user asked for.
    mode: JsonDisplayMode,
}
209
210fn replay_response(
211 response: &LocalOpIdResponse,
212 context: Option<OpIdDisplayContext>,
213) -> Result<()> {
214 let stdout = context
215 .map(|context| decorate_json_stream(&response.stdout, context))
216 .transpose()?
217 .unwrap_or_else(|| response.stdout.clone());
218 let stderr = context
219 .map(|context| decorate_json_stream(&response.stderr, context))
220 .transpose()?
221 .unwrap_or_else(|| response.stderr.clone());
222 std::io::stdout().write_all(&stdout)?;
223 std::io::stderr().write_all(&stderr)?;
224 Ok(())
225}
226
/// JSON presentation selected on the command line, as mapped by
/// `json_display_mode`.
#[derive(Clone, Copy)]
enum JsonDisplayMode {
    /// `OutputMode::Json`: object payloads gain the op-id fields.
    Full,
    /// `OutputMode::JsonCompact`: object payloads are reduced via
    /// `retain_compact_surface_fields`; no op-id fields are added.
    Compact,
}
232
233fn json_display_mode(cli: &Cli) -> Option<JsonDisplayMode> {
234 match cli.output {
235 Some(OutputMode::Json) => Some(JsonDisplayMode::Full),
236 Some(OutputMode::JsonCompact) => Some(JsonDisplayMode::Compact),
237 _ => None,
238 }
239}
240
241fn decorate_json_stream(bytes: &[u8], context: OpIdDisplayContext) -> Result<Vec<u8>> {
242 if bytes.is_empty() {
243 return Ok(Vec::new());
244 }
245 if matches!(context.mode, JsonDisplayMode::Compact) && !context.child_succeeded {
246 return Ok(bytes.to_vec());
247 }
248 let Ok(mut value) = serde_json::from_slice::<serde_json::Value>(bytes) else {
249 return Ok(bytes.to_vec());
250 };
251 let Some(object) = value.as_object_mut() else {
252 return Ok(bytes.to_vec());
253 };
254 if matches!(context.mode, JsonDisplayMode::Compact) {
255 crate::cli::commands::compact::retain_compact_surface_fields(&mut value);
256 let mut compact = serde_json::to_vec(&value)?;
257 compact.push(b'\n');
258 return Ok(compact);
259 }
260 let op_id = context.op_id.to_string();
261 object.insert(
262 "op_id".to_string(),
263 serde_json::Value::String(op_id.clone()),
264 );
265 object.insert(
266 "idempotency_status".to_string(),
267 serde_json::Value::String(context.status.to_string()),
268 );
269 object.insert(
270 "replayed".to_string(),
271 serde_json::Value::Bool(context.replayed),
272 );
273 object.insert(
274 "operation_record".to_string(),
275 serde_json::json!({
276 "op_id": op_id,
277 "command": context.command_name,
278 "idempotency_status": context.status,
279 "replayed": context.replayed,
280 }),
281 );
282 let mut decorated = serde_json::to_vec(&value)?;
283 decorated.push(b'\n');
284 Ok(decorated)
285}
286
/// Returns this process's arguments (program name excluded) with every
/// `--op-id <value>` pair and `--op-id=<value>` token removed.
///
/// Arguments are read with `std::env::args_os` and converted lossily, so a
/// non-Unicode argument (for example a path) no longer panics the way
/// `std::env::args` would; valid Unicode arguments are returned unchanged.
fn normalized_argv_for_op_id() -> Vec<String> {
    strip_op_id_args(std::env::args_os().skip(1))
}

/// Drops `--op-id` flags (and their values) from `args`, converting the
/// remaining arguments to `String` lossily.
fn strip_op_id_args(args: impl IntoIterator<Item = std::ffi::OsString>) -> Vec<String> {
    let mut normalized = Vec::new();
    let mut args = args.into_iter();
    // `while let` rather than `for`: the separate-value form consumes the
    // following argument from inside the loop body.
    while let Some(arg) = args.next() {
        let arg = arg.to_string_lossy().into_owned();
        if arg == "--op-id" {
            let _ = args.next();
            continue;
        }
        if arg.starts_with("--op-id=") {
            continue;
        }
        normalized.push(arg);
    }
    normalized
}
302
303fn request_hash_for_op_id(
304 normalized_args: &[String],
305 invocation_context: Option<&str>,
306) -> Result<[u8; 32]> {
307 let mut body = normalized_args.join("\0").into_bytes();
308 if let Some(context) = invocation_context {
309 body.extend_from_slice(b"\0context\0");
310 body.extend_from_slice(context.as_bytes());
311 }
312 Ok(hash_request_body(&body))
313}
314
315fn uses_bootstrap_op_id_store(command_name: &str) -> bool {
316 crate::cli::commands::command_uses_bootstrap_op_id_store(command_name)
317}
318
/// Identity of a bootstrap op-id store, derived from a root path by
/// `bootstrap_op_id_scope_for_root`.
struct BootstrapOpIdScope {
    /// First 16 hex characters of the SHA-256 of `hash_material`; used as the
    /// store's directory name.
    id: String,
    /// Display form of the (canonicalized when possible) root path; shown in
    /// conflict advice.
    label: String,
    /// `"bootstrap-repo-root\0"` followed by `label`; also mixed into the
    /// request hash for bootstrap commands.
    hash_material: String,
}
324
325fn bootstrap_op_id_scope(cli: &Cli) -> Result<BootstrapOpIdScope> {
326 let root = match &cli.command {
327 crate::cli::Commands::Init(args) => args.path.clone().or_else(|| cli.repo.clone()),
328 crate::cli::Commands::Adopt(args) => args.path.clone().or_else(|| cli.repo.clone()),
329 crate::cli::Commands::Clone(args) => {
335 let cwd = std::env::current_dir()
336 .context("resolve current directory for clone op-id scope")?;
337 Some(absolutize_clone_destination(&args.local, &cwd))
338 }
339 _ => cli.repo.clone(),
340 }
341 .unwrap_or(std::env::current_dir().context("resolve current directory for op-id scope")?);
342 bootstrap_op_id_scope_for_root(root)
343}
344
/// Turns a clone destination into an absolute path anchored at `cwd`.
///
/// A relative `dest` is joined onto `cwd`. The longest existing ancestor of
/// the result is then canonicalized (left untouched if canonicalization
/// fails) and the not-yet-existing trailing components are appended back in
/// their original order, so a destination that does not exist yet still
/// resolves to a stable path.
fn absolutize_clone_destination(dest: &str, cwd: &std::path::Path) -> PathBuf {
    let requested = std::path::Path::new(dest);
    let mut anchor = if requested.is_absolute() {
        requested.to_path_buf()
    } else {
        cwd.join(requested)
    };

    // Peel off trailing components until something on disk is reached,
    // remembering them innermost-first.
    let mut missing_tail: Vec<std::ffi::OsString> = Vec::new();
    loop {
        if anchor.exists() {
            break;
        }
        let Some(leaf) = anchor.file_name().map(std::ffi::OsStr::to_os_string) else {
            break;
        };
        if !anchor.pop() {
            break;
        }
        missing_tail.push(leaf);
    }

    let mut resolved = match std::fs::canonicalize(&anchor) {
        Ok(real) => real,
        Err(_) => anchor,
    };
    resolved.extend(missing_tail.into_iter().rev());
    resolved
}
374
375fn bootstrap_op_id_scope_for_root(root: PathBuf) -> Result<BootstrapOpIdScope> {
376 use sha2::{Digest, Sha256};
377
378 let canonical = std::fs::canonicalize(&root).unwrap_or(root);
379 let label = canonical.display().to_string();
380 let hash_material = format!("bootstrap-repo-root\0{label}");
381 let mut hasher = Sha256::new();
382 hasher.update(hash_material.as_bytes());
383 let digest = hex::encode(hasher.finalize());
384 Ok(BootstrapOpIdScope {
385 id: digest[..16.min(digest.len())].to_string(),
386 label,
387 hash_material,
388 })
389}
390
391fn bootstrap_op_id_store_dir(scope: &BootstrapOpIdScope) -> PathBuf {
392 let base = std::env::var_os("HOME")
393 .map(PathBuf::from)
394 .unwrap_or_else(std::env::temp_dir);
395 base.join(".heddle").join("bootstrap-op-id").join(&scope.id)
396}
397
398pub fn wire(cli: &Cli) -> String {
401 cli.op_id.clone().unwrap_or_default()
402}
403
404fn session_dir_for(repo: &Repository) -> PathBuf {
408 use sha2::{Digest, Sha256};
409 let canonical =
410 std::fs::canonicalize(repo.root()).unwrap_or_else(|_| repo.root().to_path_buf());
411 let mut hasher = Sha256::new();
412 hasher.update(canonical.to_string_lossy().as_bytes());
413 let digest = hex::encode(hasher.finalize());
414 let repo_id = &digest[..16.min(digest.len())];
415 let base = std::env::var_os("HOME")
416 .map(PathBuf::from)
417 .unwrap_or_else(std::env::temp_dir);
418 base.join(".heddle").join("session").join(repo_id)
419}
420
421fn last_op_id_path(repo: &Repository) -> PathBuf {
422 session_dir_for(repo).join("last_op_id.toml")
423}
424
/// On-disk shape of `last_op_id.toml`.
#[derive(serde::Serialize, serde::Deserialize, Default)]
struct LastOpIdFile {
    /// Persisted op-id string per verb; a missing table deserializes as an
    /// empty map.
    #[serde(default)]
    by_verb: std::collections::BTreeMap<String, String>,
}
433
434pub fn resolve_or_persist_for_verb(
445 cli: &Cli,
446 repo: &Repository,
447 verb: &str,
448) -> Result<OperationId> {
449 if let Some(explicit) = resolve_operation_id(cli)? {
450 return Ok(explicit);
451 }
452 if !crate::cli::commands::command_runtime_contract(verb)
453 .map(|contract| contract.persists_op_id)
454 .unwrap_or(false)
455 {
456 return Ok(OperationId::new());
457 }
458 let path = last_op_id_path(repo);
459 if let Ok(bytes) = std::fs::read(&path)
460 && let Ok(decoded) = toml::from_str::<LastOpIdFile>(&String::from_utf8_lossy(&bytes))
461 && let Some(saved) = decoded.by_verb.get(verb)
462 && let Ok(parsed) = OperationId::from_str(saved)
463 {
464 return Ok(parsed);
465 }
466 let fresh = OperationId::new();
467 persist_op_id(&path, verb, &fresh).context("persist last op id")?;
468 Ok(fresh)
469}
470
471pub fn clear_persisted_op_id(repo: &Repository, verb: &str) -> Result<()> {
475 let path = last_op_id_path(repo);
476 let mut file: LastOpIdFile = match std::fs::read(&path) {
477 Ok(bytes) => toml::from_str(&String::from_utf8_lossy(&bytes)).unwrap_or_default(),
478 Err(_) => return Ok(()),
479 };
480 if file.by_verb.remove(verb).is_none() {
481 return Ok(());
482 }
483 if file.by_verb.is_empty() {
484 let _ = std::fs::remove_file(&path);
485 return Ok(());
486 }
487 let serialized = toml::to_string(&file).context("serialize last_op_id.toml")?;
488 if let Some(parent) = path.parent() {
489 std::fs::create_dir_all(parent)?;
490 }
491 std::fs::write(&path, serialized)?;
492 Ok(())
493}
494
495fn persist_op_id(path: &std::path::Path, verb: &str, op_id: &OperationId) -> Result<()> {
496 let mut file: LastOpIdFile = match std::fs::read(path) {
497 Ok(bytes) => toml::from_str(&String::from_utf8_lossy(&bytes)).unwrap_or_default(),
498 Err(_) => LastOpIdFile::default(),
499 };
500 file.by_verb.insert(verb.to_string(), op_id.to_string());
501 let serialized = toml::to_string(&file).context("serialize last_op_id.toml")?;
502 if let Some(parent) = path.parent() {
503 std::fs::create_dir_all(parent)?;
504 }
505 std::fs::write(path, serialized)?;
506 Ok(())
507}
508
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Cli` for `heddle status` and overrides its `op_id` field.
    fn cli_with(op_id: Option<&str>) -> Cli {
        let mut cli: Cli = clap::Parser::parse_from(["heddle", "status"]);
        cli.op_id = op_id.map(|s| s.to_string());
        cli
    }

    // No `--op-id` at all resolves to `None`, not an error.
    #[test]
    fn resolve_none_when_unset() {
        let cli = cli_with(None);
        assert!(resolve_operation_id(&cli).unwrap().is_none());
    }

    // A freshly generated id survives a to-string / parse round trip.
    #[test]
    fn resolve_parses_uuid() {
        let id = OperationId::new();
        let cli = cli_with(Some(&id.to_string()));
        assert_eq!(resolve_operation_id(&cli).unwrap(), Some(id));
    }

    // An unparseable value is reported as an error.
    #[test]
    fn resolve_rejects_garbage() {
        let cli = cli_with(Some("not-a-uuid"));
        assert!(resolve_operation_id(&cli).is_err());
    }

    // `wire` falls back to the empty string when no op-id was given.
    #[test]
    fn wire_is_empty_when_unset() {
        let cli = cli_with(None);
        assert_eq!(wire(&cli), "");
    }

    // `wire` passes the supplied op-id string through unchanged.
    #[test]
    fn wire_returns_string_when_set() {
        let id = OperationId::new();
        let cli = cli_with(Some(&id.to_string()));
        assert_eq!(wire(&cli), id.to_string());
    }

    // The same relative destination must resolve differently under different
    // working directories, and different leaf names must stay distinct.
    #[test]
    fn clone_destination_resolves_relative_to_supplied_cwd() {
        let cwd_a = tempfile::tempdir().expect("tempdir a");
        let cwd_b = tempfile::tempdir().expect("tempdir b");

        let from_a = super::absolutize_clone_destination("./repo", cwd_a.path());
        let from_b = super::absolutize_clone_destination("./repo", cwd_b.path());

        assert_ne!(
            from_a, from_b,
            "same relative dest from different cwds must absolutize differently"
        );
        assert!(from_a.ends_with("repo"));
        assert!(from_b.ends_with("repo"));

        // Two not-yet-existing leaves under the same cwd keep their own names.
        let leaf_x = super::absolutize_clone_destination("./repo-x", cwd_a.path());
        let leaf_y = super::absolutize_clone_destination("./repo-y", cwd_a.path());
        assert_ne!(leaf_x, leaf_y);
    }

    // An absolute destination stays absolute and keeps its final component,
    // whatever cwd is supplied.
    #[test]
    fn clone_destination_preserves_absolute_paths() {
        let cwd = tempfile::tempdir().expect("tempdir");
        let resolved =
            super::absolutize_clone_destination("/var/empty/heddle-clone-target", cwd.path());
        assert!(resolved.is_absolute());
        assert!(resolved.ends_with("heddle-clone-target"));
    }

    // Scopes derived from the same relative destination under different
    // working directories must differ in both id and hash material.
    #[test]
    fn bootstrap_scope_for_relative_clone_dest_is_cwd_specific() {
        let cwd_a = tempfile::tempdir().expect("tempdir a");
        let cwd_b = tempfile::tempdir().expect("tempdir b");

        let dest_a = super::absolutize_clone_destination("./repo", cwd_a.path());
        let dest_b = super::absolutize_clone_destination("./repo", cwd_b.path());
        let scope_a = super::bootstrap_op_id_scope_for_root(dest_a).expect("scope a");
        let scope_b = super::bootstrap_op_id_scope_for_root(dest_b).expect("scope b");

        assert_ne!(scope_a.id, scope_b.id);
        assert_ne!(scope_a.hash_material, scope_b.hash_material);
    }
}