Skip to main content

cli/
operation_id.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Client-supplied operation-id resolution.
3//!
4//! Commands that advertise `supports_op_id: true` in the command
5//! catalog accept `--op-id <UUID>` or `HEDDLE_OPERATION_ID`. The local
6//! CLI reserves the id before dispatch, records stdout/stderr/exit
7//! status, and replays that recorded result for the same command body.
8//! Reusing the id with different arguments fails with a typed conflict.
9//!
10//! Commands that advertise `persists_op_id: true` may additionally
11//! generate and save an op-id for interrupted retry loops. Current
12//! explicit replay support does not imply generated persistence.
13
14use std::{io::Write, path::PathBuf, process::Command, str::FromStr, sync::Arc};
15
16use anyhow::{Context, Result, anyhow};
17use objects::object::OperationId;
18use repo::{
19    Repository,
20    operation_dedup::{
21        DedupOutcome, OperationDedupStore, hash_request_body, reserve_operation_id_eager,
22    },
23};
24use serde::{Deserialize, Serialize};
25
26use crate::cli::{
27    cli_args::{Cli, OutputMode},
28    commands::RecoveryAdvice,
29};
30
/// Name of the environment variable set (to `"1"`) on the re-executed child
/// process. When it is present, `run_local_idempotency_if_requested` returns
/// `Ok(false)` without reserving anything, so the child does not wrap itself
/// again.
const LOCAL_OP_ID_CHILD_ENV: &str = "HEDDLE_LOCAL_OP_ID_CHILD";
32
33/// Canonical helper used by every state-changing dispatch arm in
34/// `main.rs`. Validates the `--op-id` format eagerly so a malformed
35/// value fails before the verb starts work.
36///
37/// The `op_id_coverage` build-time test grep-asserts a call to this
38/// function in every state-changing arm — keep the name stable.
39pub fn resolve_operation_id(cli: &Cli) -> Result<Option<OperationId>> {
40    let Some(raw) = cli.op_id.as_deref() else {
41        return Ok(None);
42    };
43    if raw.trim().is_empty() {
44        return Ok(None);
45    }
46    OperationId::from_str(raw)
47        .map(Some)
48        .map_err(|err| anyhow!(RecoveryAdvice::op_id_invalid(raw, err)))
49}
50
51pub fn supports_local_op_id(command_name: &str) -> bool {
52    crate::cli::commands::command_runtime_contract(command_name)
53        .map(|contract| contract.supports_op_id)
54        .unwrap_or(false)
55}
56
/// Captured result of one child-process execution. It is JSON-encoded and
/// handed to the dedup store after the child finishes, and decoded again
/// when the same op-id is replayed.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct LocalOpIdResponse {
    /// Exit code of the child; `1` is stored when the child reported no code.
    status_code: i32,
    /// Raw bytes the child wrote to stdout.
    stdout: Vec<u8>,
    /// Raw bytes the child wrote to stderr.
    stderr: Vec<u8>,
}
63
64pub fn run_local_idempotency_if_requested(
65    cli: &Cli,
66    command_name: &str,
67    command_supports_op_id: bool,
68) -> Result<bool> {
69    let Some(op_id) = resolve_operation_id(cli)? else {
70        return Ok(false);
71    };
72    if std::env::var_os(LOCAL_OP_ID_CHILD_ENV).is_some() {
73        return Ok(false);
74    }
75    if !command_supports_op_id {
76        return Err(anyhow!(RecoveryAdvice::op_id_unsupported(command_name)));
77    }
78
79    let bootstrap_store = uses_bootstrap_op_id_store(command_name);
80    let normalized_args = normalized_argv_for_op_id();
81    let bootstrap_scope = if bootstrap_store {
82        Some(bootstrap_op_id_scope(cli)?)
83    } else {
84        None
85    };
86    let request_hash = request_hash_for_op_id(
87        &normalized_args,
88        bootstrap_scope
89            .as_ref()
90            .map(|scope| scope.hash_material.as_str()),
91    )?;
92    let repo_for_eager;
93    let store = if bootstrap_store {
94        let scope = bootstrap_scope
95            .as_ref()
96            .expect("bootstrap scope should be present for bootstrap store");
97        repo_for_eager = None;
98        Arc::new(
99            OperationDedupStore::open(bootstrap_op_id_store_dir(scope))
100                .context("open bootstrap op-id dedup store")?,
101        )
102    } else {
103        let repo = cli.open_repo()?;
104        let bootstrap_scope = bootstrap_op_id_scope_for_root(repo.root().to_path_buf())?;
105        let bootstrap_store =
106            OperationDedupStore::open(bootstrap_op_id_store_dir(&bootstrap_scope))
107                .context("open bootstrap op-id dedup store")?;
108        if let Some(existing) = bootstrap_store.metadata_for(op_id, command_name) {
109            return Err(anyhow!(RecoveryAdvice::op_id_conflict(
110                command_name,
111                &bootstrap_scope.label,
112                &normalized_args,
113                request_hash,
114                Some(existing),
115            )));
116        }
117        let store = Arc::new(
118            OperationDedupStore::open(repo.heddle_dir()).context("open op-id dedup store")?,
119        );
120        repo_for_eager = Some(repo);
121        store
122    };
123    let json_mode = json_display_mode(cli);
124
125    let reserve_outcome = if let Some(repo) = repo_for_eager.as_ref() {
126        reserve_operation_id_eager(repo, Arc::clone(&store), op_id, command_name, request_hash)?
127    } else {
128        store.reserve(op_id, command_name, request_hash)?
129    };
130
131    match reserve_outcome {
132        DedupOutcome::Replay { response } => {
133            let replay: LocalOpIdResponse =
134                serde_json::from_slice(&response).context("decode cached op-id response")?;
135            replay_response(
136                &replay,
137                json_mode.map(|mode| OpIdDisplayContext {
138                    op_id: &op_id,
139                    command_name,
140                    status: "replayed",
141                    replayed: true,
142                    child_succeeded: replay.status_code == 0,
143                    mode,
144                }),
145            )?;
146            if replay.status_code != 0 {
147                std::process::exit(replay.status_code);
148            }
149            Ok(true)
150        }
151        DedupOutcome::Conflict => Err(anyhow!(RecoveryAdvice::op_id_conflict(
152            command_name,
153            bootstrap_scope
154                .as_ref()
155                .map(|scope| scope.label.as_str())
156                .unwrap_or("repository-local .heddle"),
157            &normalized_args,
158            request_hash,
159            store.metadata_for(op_id, command_name),
160        ))),
161        DedupOutcome::InFlight => Err(anyhow!(RecoveryAdvice::op_id_in_flight())),
162        DedupOutcome::Reserved => {
163            let output = Command::new(std::env::current_exe()?)
164                .args(std::env::args_os().skip(1))
165                .env(LOCAL_OP_ID_CHILD_ENV, "1")
166                .output();
167            let output = match output {
168                Ok(output) => output,
169                Err(error) => {
170                    let _ = store.cancel(op_id, command_name);
171                    return Err(error).context("run op-id child process");
172                }
173            };
174            let response = LocalOpIdResponse {
175                status_code: output.status.code().unwrap_or(1),
176                stdout: output.stdout,
177                stderr: output.stderr,
178            };
179            let encoded = serde_json::to_vec(&response).context("encode cached op-id response")?;
180            store.record(op_id, command_name, request_hash, encoded)?;
181            replay_response(
182                &response,
183                json_mode.map(|mode| OpIdDisplayContext {
184                    op_id: &op_id,
185                    command_name,
186                    status: "executed",
187                    replayed: false,
188                    child_succeeded: response.status_code == 0,
189                    mode,
190                }),
191            )?;
192            if response.status_code != 0 {
193                std::process::exit(response.status_code);
194            }
195            Ok(true)
196        }
197    }
198}
199
/// Values needed to decorate a JSON stdout/stderr payload with op-id
/// metadata before it is written out.
#[derive(Clone, Copy)]
struct OpIdDisplayContext<'a> {
    /// Operation id the response belongs to.
    op_id: &'a OperationId,
    /// Name of the command that produced the response.
    command_name: &'a str,
    /// Value emitted as `idempotency_status`; callers in this file pass
    /// `"replayed"` or `"executed"`.
    status: &'a str,
    /// `true` when the response came from the dedup store rather than a
    /// fresh child run.
    replayed: bool,
    /// `true` when the recorded exit status was zero.
    child_succeeded: bool,
    /// Which JSON rendering was requested on the command line.
    mode: JsonDisplayMode,
}
209
210fn replay_response(
211    response: &LocalOpIdResponse,
212    context: Option<OpIdDisplayContext>,
213) -> Result<()> {
214    let stdout = context
215        .map(|context| decorate_json_stream(&response.stdout, context))
216        .transpose()?
217        .unwrap_or_else(|| response.stdout.clone());
218    let stderr = context
219        .map(|context| decorate_json_stream(&response.stderr, context))
220        .transpose()?
221        .unwrap_or_else(|| response.stderr.clone());
222    std::io::stdout().write_all(&stdout)?;
223    std::io::stderr().write_all(&stderr)?;
224    Ok(())
225}
226
/// JSON rendering requested for op-id output.
#[derive(Clone, Copy)]
enum JsonDisplayMode {
    /// Selected by `OutputMode::Json`; the payload is decorated with op-id
    /// metadata fields.
    Full,
    /// Selected by `OutputMode::JsonCompact`; the payload is reduced to its
    /// compact surface fields and no op-id metadata is added.
    Compact,
}
232
233fn json_display_mode(cli: &Cli) -> Option<JsonDisplayMode> {
234    match cli.output {
235        Some(OutputMode::Json) => Some(JsonDisplayMode::Full),
236        Some(OutputMode::JsonCompact) => Some(JsonDisplayMode::Compact),
237        _ => None,
238    }
239}
240
241fn decorate_json_stream(bytes: &[u8], context: OpIdDisplayContext) -> Result<Vec<u8>> {
242    if bytes.is_empty() {
243        return Ok(Vec::new());
244    }
245    if matches!(context.mode, JsonDisplayMode::Compact) && !context.child_succeeded {
246        return Ok(bytes.to_vec());
247    }
248    let Ok(mut value) = serde_json::from_slice::<serde_json::Value>(bytes) else {
249        return Ok(bytes.to_vec());
250    };
251    let Some(object) = value.as_object_mut() else {
252        return Ok(bytes.to_vec());
253    };
254    if matches!(context.mode, JsonDisplayMode::Compact) {
255        crate::cli::commands::compact::retain_compact_surface_fields(&mut value);
256        let mut compact = serde_json::to_vec(&value)?;
257        compact.push(b'\n');
258        return Ok(compact);
259    }
260    let op_id = context.op_id.to_string();
261    object.insert(
262        "op_id".to_string(),
263        serde_json::Value::String(op_id.clone()),
264    );
265    object.insert(
266        "idempotency_status".to_string(),
267        serde_json::Value::String(context.status.to_string()),
268    );
269    object.insert(
270        "replayed".to_string(),
271        serde_json::Value::Bool(context.replayed),
272    );
273    object.insert(
274        "operation_record".to_string(),
275        serde_json::json!({
276            "op_id": op_id,
277            "command": context.command_name,
278            "idempotency_status": context.status,
279            "replayed": context.replayed,
280        }),
281    );
282    let mut decorated = serde_json::to_vec(&value)?;
283    decorated.push(b'\n');
284    Ok(decorated)
285}
286
/// Return this process's arguments (program name excluded) with every
/// `--op-id <value>` pair and `--op-id=<value>` token removed, so the same
/// command body hashes identically whatever op-id accompanies it.
///
/// Arguments are read with `args_os` because `std::env::args` panics on
/// non-Unicode arguments. An argument that is not valid Unicode is converted
/// lossily; valid Unicode arguments are returned unchanged.
fn normalized_argv_for_op_id() -> Vec<String> {
    let mut normalized = Vec::new();
    let mut args = std::env::args_os().skip(1);
    // `while let` rather than `for`: the separate-value form consumes an
    // extra element from the iterator inside the loop body.
    while let Some(raw) = args.next() {
        let arg = raw
            .into_string()
            .unwrap_or_else(|raw| raw.to_string_lossy().into_owned());
        if arg == "--op-id" {
            // Drop the value that follows the flag as well.
            let _ = args.next();
            continue;
        }
        if arg.starts_with("--op-id=") {
            continue;
        }
        normalized.push(arg);
    }
    normalized
}
302
303fn request_hash_for_op_id(
304    normalized_args: &[String],
305    invocation_context: Option<&str>,
306) -> Result<[u8; 32]> {
307    let mut body = normalized_args.join("\0").into_bytes();
308    if let Some(context) = invocation_context {
309        body.extend_from_slice(b"\0context\0");
310        body.extend_from_slice(context.as_bytes());
311    }
312    Ok(hash_request_body(&body))
313}
314
/// Whether `command_name` keeps its op-id records in the bootstrap store
/// rather than the repository-local one. Delegates to the command catalog's
/// `command_uses_bootstrap_op_id_store`.
fn uses_bootstrap_op_id_store(command_name: &str) -> bool {
    crate::cli::commands::command_uses_bootstrap_op_id_store(command_name)
}
318
/// Identity of one bootstrap op-id store, derived from a repository root
/// path by `bootstrap_op_id_scope_for_root`.
struct BootstrapOpIdScope {
    /// First 16 hex characters of the SHA-256 of `hash_material`; used as
    /// the store's directory name.
    id: String,
    /// Display form of the (canonicalized where possible) root path; passed
    /// to op-id conflict errors.
    label: String,
    /// `"bootstrap-repo-root\0"` followed by `label`; also mixed into the
    /// request hash of bootstrap commands.
    hash_material: String,
}
324
325fn bootstrap_op_id_scope(cli: &Cli) -> Result<BootstrapOpIdScope> {
326    let root = match &cli.command {
327        crate::cli::Commands::Init(args) => args.path.clone().or_else(|| cli.repo.clone()),
328        crate::cli::Commands::Adopt(args) => args.path.clone().or_else(|| cli.repo.clone()),
329        // Clone destinations normally don't exist yet, so feeding the
330        // raw string into the hasher (and relying on the canonicalize
331        // fallback) lets two different cwds with `./repo` collide in
332        // the bootstrap cache. Resolve the destination against the
333        // current directory up front so the scope is cwd-specific.
334        crate::cli::Commands::Clone(args) => {
335            let cwd = std::env::current_dir()
336                .context("resolve current directory for clone op-id scope")?;
337            Some(absolutize_clone_destination(&args.local, &cwd))
338        }
339        _ => cli.repo.clone(),
340    }
341    .unwrap_or(std::env::current_dir().context("resolve current directory for op-id scope")?);
342    bootstrap_op_id_scope_for_root(root)
343}
344
/// Turn a clone destination into an absolute path that does not depend on
/// the destination already existing.
///
/// A relative `dest` is joined onto `cwd`. Since `canonicalize` needs an
/// existing path, trailing components are peeled off until an existing
/// prefix is found; that prefix is canonicalized (kept as-is if that fails)
/// and the peeled components are appended again unchanged.
fn absolutize_clone_destination(dest: &str, cwd: &std::path::Path) -> PathBuf {
    let requested = std::path::Path::new(dest);
    let mut prefix = if requested.is_absolute() {
        requested.to_path_buf()
    } else {
        cwd.join(requested)
    };

    // Peeled components, innermost first.
    let mut trailing = Vec::new();
    loop {
        if prefix.exists() {
            break;
        }
        let Some(leaf) = prefix.file_name().map(std::ffi::OsStr::to_os_string) else {
            break;
        };
        if !prefix.pop() {
            break;
        }
        trailing.push(leaf);
    }

    let anchor = std::fs::canonicalize(&prefix).unwrap_or(prefix);
    trailing.into_iter().rev().fold(anchor, |mut path, part| {
        path.push(part);
        path
    })
}
374
375fn bootstrap_op_id_scope_for_root(root: PathBuf) -> Result<BootstrapOpIdScope> {
376    use sha2::{Digest, Sha256};
377
378    let canonical = std::fs::canonicalize(&root).unwrap_or(root);
379    let label = canonical.display().to_string();
380    let hash_material = format!("bootstrap-repo-root\0{label}");
381    let mut hasher = Sha256::new();
382    hasher.update(hash_material.as_bytes());
383    let digest = hex::encode(hasher.finalize());
384    Ok(BootstrapOpIdScope {
385        id: digest[..16.min(digest.len())].to_string(),
386        label,
387        hash_material,
388    })
389}
390
391fn bootstrap_op_id_store_dir(scope: &BootstrapOpIdScope) -> PathBuf {
392    let base = std::env::var_os("HOME")
393        .map(PathBuf::from)
394        .unwrap_or_else(std::env::temp_dir);
395    base.join(".heddle").join("bootstrap-op-id").join(&scope.id)
396}
397
398/// Same as [`resolve_operation_id`] but returns the wire-string form
399/// expected by gRPC requests. `""` means "no idempotency for this call".
400pub fn wire(cli: &Cli) -> String {
401    cli.op_id.clone().unwrap_or_default()
402}
403
404/// Per-repo session directory under `$HOME/.heddle/session/<repo-id>`.
405/// `<repo-id>` is a 16-char SHA-256 of the canonical repo root so two
406/// worktrees of the same repo don't collide.
407fn session_dir_for(repo: &Repository) -> PathBuf {
408    use sha2::{Digest, Sha256};
409    let canonical =
410        std::fs::canonicalize(repo.root()).unwrap_or_else(|_| repo.root().to_path_buf());
411    let mut hasher = Sha256::new();
412    hasher.update(canonical.to_string_lossy().as_bytes());
413    let digest = hex::encode(hasher.finalize());
414    let repo_id = &digest[..16.min(digest.len())];
415    let base = std::env::var_os("HOME")
416        .map(PathBuf::from)
417        .unwrap_or_else(std::env::temp_dir);
418    base.join(".heddle").join("session").join(repo_id)
419}
420
421fn last_op_id_path(repo: &Repository) -> PathBuf {
422    session_dir_for(repo).join("last_op_id.toml")
423}
424
/// On-disk shape of `last_op_id.toml`: the saved op-ids of one repository
/// session directory.
#[derive(serde::Serialize, serde::Deserialize, Default)]
struct LastOpIdFile {
    /// Per-verb most-recent op-id. Verbs whose command contract does
    /// not opt into generated op-id persistence are never read or
    /// written here. A file without this table deserializes to an
    /// empty map.
    #[serde(default)]
    by_verb: std::collections::BTreeMap<String, String>,
}
433
434/// Resolve the op-id for a verb that opts into `^C → re-run`
435/// persistence. Order:
436///   1. Caller passed `--op-id` / `HEDDLE_OPERATION_ID` → use it.
437///   2. The command contract opts into op-id persistence AND a recent
438///      saved id exists for that verb → use it (don't persist; we're
439///      reusing).
440///   3. Otherwise generate a fresh id, persist it for the verb, return.
441///
442/// Call [`clear_persisted_op_id`] after the verb completes
443/// successfully so the next run gets a fresh id.
444pub fn resolve_or_persist_for_verb(
445    cli: &Cli,
446    repo: &Repository,
447    verb: &str,
448) -> Result<OperationId> {
449    if let Some(explicit) = resolve_operation_id(cli)? {
450        return Ok(explicit);
451    }
452    if !crate::cli::commands::command_runtime_contract(verb)
453        .map(|contract| contract.persists_op_id)
454        .unwrap_or(false)
455    {
456        return Ok(OperationId::new());
457    }
458    let path = last_op_id_path(repo);
459    if let Ok(bytes) = std::fs::read(&path)
460        && let Ok(decoded) = toml::from_str::<LastOpIdFile>(&String::from_utf8_lossy(&bytes))
461        && let Some(saved) = decoded.by_verb.get(verb)
462        && let Ok(parsed) = OperationId::from_str(saved)
463    {
464        return Ok(parsed);
465    }
466    let fresh = OperationId::new();
467    persist_op_id(&path, verb, &fresh).context("persist last op id")?;
468    Ok(fresh)
469}
470
471/// Drop the persisted op-id for `verb`. Called after a successful
472/// response — releases the slot so the next run gets a fresh id
473/// rather than replaying.
474pub fn clear_persisted_op_id(repo: &Repository, verb: &str) -> Result<()> {
475    let path = last_op_id_path(repo);
476    let mut file: LastOpIdFile = match std::fs::read(&path) {
477        Ok(bytes) => toml::from_str(&String::from_utf8_lossy(&bytes)).unwrap_or_default(),
478        Err(_) => return Ok(()),
479    };
480    if file.by_verb.remove(verb).is_none() {
481        return Ok(());
482    }
483    if file.by_verb.is_empty() {
484        let _ = std::fs::remove_file(&path);
485        return Ok(());
486    }
487    let serialized = toml::to_string(&file).context("serialize last_op_id.toml")?;
488    if let Some(parent) = path.parent() {
489        std::fs::create_dir_all(parent)?;
490    }
491    std::fs::write(&path, serialized)?;
492    Ok(())
493}
494
495fn persist_op_id(path: &std::path::Path, verb: &str, op_id: &OperationId) -> Result<()> {
496    let mut file: LastOpIdFile = match std::fs::read(path) {
497        Ok(bytes) => toml::from_str(&String::from_utf8_lossy(&bytes)).unwrap_or_default(),
498        Err(_) => LastOpIdFile::default(),
499    };
500    file.by_verb.insert(verb.to_string(), op_id.to_string());
501    let serialized = toml::to_string(&file).context("serialize last_op_id.toml")?;
502    if let Some(parent) = path.parent() {
503        std::fs::create_dir_all(parent)?;
504    }
505    std::fs::write(path, serialized)?;
506    Ok(())
507}
508
// Unit tests for op-id parsing, the wire-string form, and clone-destination
// scoping.
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `Cli` for `heddle status`, then set `op_id` directly.
    fn cli_with(op_id: Option<&str>) -> Cli {
        let mut cli: Cli = clap::Parser::parse_from(["heddle", "status"]);
        cli.op_id = op_id.map(|s| s.to_string());
        cli
    }

    /// No op-id supplied resolves to `None`.
    #[test]
    fn resolve_none_when_unset() {
        let cli = cli_with(None);
        assert!(resolve_operation_id(&cli).unwrap().is_none());
    }

    /// A generated id round-trips through its string form.
    #[test]
    fn resolve_parses_uuid() {
        let id = OperationId::new();
        let cli = cli_with(Some(&id.to_string()));
        assert_eq!(resolve_operation_id(&cli).unwrap(), Some(id));
    }

    /// A value that is not a valid id is an error, not `None`.
    #[test]
    fn resolve_rejects_garbage() {
        let cli = cli_with(Some("not-a-uuid"));
        assert!(resolve_operation_id(&cli).is_err());
    }

    /// The wire form of "no op-id" is the empty string.
    #[test]
    fn wire_is_empty_when_unset() {
        let cli = cli_with(None);
        assert_eq!(wire(&cli), "");
    }

    /// A supplied op-id is passed through to the wire form unchanged.
    #[test]
    fn wire_returns_string_when_set() {
        let id = OperationId::new();
        let cli = cli_with(Some(&id.to_string()));
        assert_eq!(wire(&cli), id.to_string());
    }

    /// Two clones with the same `--op-id` and same relative destination
    /// but launched from different working directories must hash to
    /// different bootstrap scopes, otherwise the cache replays the
    /// wrong checkout.
    #[test]
    fn clone_destination_resolves_relative_to_supplied_cwd() {
        let cwd_a = tempfile::tempdir().expect("tempdir a");
        let cwd_b = tempfile::tempdir().expect("tempdir b");

        let from_a = super::absolutize_clone_destination("./repo", cwd_a.path());
        let from_b = super::absolutize_clone_destination("./repo", cwd_b.path());

        assert_ne!(
            from_a, from_b,
            "same relative dest from different cwds must absolutize differently"
        );
        assert!(from_a.ends_with("repo"));
        assert!(from_b.ends_with("repo"));

        // Same cwd + different relative leaves also stay distinct.
        let leaf_x = super::absolutize_clone_destination("./repo-x", cwd_a.path());
        let leaf_y = super::absolutize_clone_destination("./repo-y", cwd_a.path());
        assert_ne!(leaf_x, leaf_y);
    }

    /// An absolute destination stays absolute and keeps its final
    /// component, whatever cwd is supplied.
    #[test]
    fn clone_destination_preserves_absolute_paths() {
        let cwd = tempfile::tempdir().expect("tempdir");
        let resolved =
            super::absolutize_clone_destination("/var/empty/heddle-clone-target", cwd.path());
        assert!(resolved.is_absolute());
        assert!(resolved.ends_with("heddle-clone-target"));
    }

    /// Bootstrap scopes derived from the absolutized destination must
    /// also disagree, since the hasher consumes the canonical path
    /// label. Confirms the fix flows through to `BootstrapOpIdScope`.
    #[test]
    fn bootstrap_scope_for_relative_clone_dest_is_cwd_specific() {
        let cwd_a = tempfile::tempdir().expect("tempdir a");
        let cwd_b = tempfile::tempdir().expect("tempdir b");

        let dest_a = super::absolutize_clone_destination("./repo", cwd_a.path());
        let dest_b = super::absolutize_clone_destination("./repo", cwd_b.path());
        let scope_a = super::bootstrap_op_id_scope_for_root(dest_a).expect("scope a");
        let scope_b = super::bootstrap_op_id_scope_for_root(dest_b).expect("scope b");

        assert_ne!(scope_a.id, scope_b.id);
        assert_ne!(scope_a.hash_material, scope_b.hash_material);
    }
}
601}