Skip to main content

cli/cli/commands/
agent_cmd.rs

1// SPDX-License-Identifier: Apache-2.0
2//! Stable JSON-first agent reservation API.
3
4use anyhow::{Result, anyhow};
5use chrono::Utc;
6use objects::store::{
7    AgentEntry, AgentRegistry, AgentStatus, AgentUsageSummary, ReserveOutcome, current_boot_id,
8};
9use refs::{Head, RefExpectation};
10use repo::{
11    Repository, Thread, ThreadConfidenceSummary, ThreadFreshness, ThreadIntegrationPolicy,
12    ThreadManager, ThreadMode, ThreadState, ThreadVerificationSummary,
13};
14use schemars::JsonSchema;
15use serde::Serialize;
16
17use crate::cli::{
18    Cli,
19    cli_args::{
20        AgentApiListArgs, AgentHeartbeatArgs, AgentReleaseArgs, AgentReleaseStatusArg,
21        AgentReserveArgs,
22    },
23    should_output_json,
24};
25
26#[derive(Serialize, JsonSchema)]
27pub struct AgentReservationOutput {
28    pub session_id: String,
29    pub reservation_token: Option<String>,
30    pub thread: String,
31    pub anchor_state: Option<String>,
32    pub anchor_root: Option<String>,
33    /// Lifecycle status as a stable kebab-case string
34    /// (`active|abandoned|complete|merged`). Mirrors
35    /// `objects::store::AgentStatus` but kept as a `String` here so
36    /// the schema lives entirely in the CLI crate.
37    pub status: String,
38    pub path: Option<String>,
39    pub task: Option<String>,
40}
41
42impl From<&AgentEntry> for AgentReservationOutput {
43    fn from(entry: &AgentEntry) -> Self {
44        Self {
45            session_id: entry.session_id.clone(),
46            reservation_token: entry.reservation_token.clone(),
47            thread: entry.thread.clone(),
48            anchor_state: entry.anchor_state.clone(),
49            anchor_root: entry.anchor_root.clone(),
50            status: entry.status.to_string(),
51            path: entry.path.as_ref().map(|path| path.display().to_string()),
52            task: entry.attach_reason.clone(),
53        }
54    }
55}
56
57/// Stable structured conflict shape emitted on stdout when `agent
58/// reserve` cannot proceed. Orchestrators parse this; humans see the
59/// shorter `Error: ...` message anyhow renders to stderr.
60#[derive(Serialize, JsonSchema)]
61pub struct AgentReservationConflict {
62    /// `"live_owner"` (existing reservation matches the requested
63    /// anchor — wait or release) or `"anchor_drift"` (existing
64    /// reservation is on a different anchor — refresh and retry).
65    pub kind: &'static str,
66    pub thread: String,
67    pub requested_anchor: String,
68    /// `Some` when a live agent already holds the thread; `None` when
69    /// the thread ref exists at a different state with no live owner.
70    pub owner: Option<AgentReservationOutput>,
71    /// Anchor recorded against the existing reservation or thread ref,
72    /// when known. Always present for anchor-drift conflicts so
73    /// orchestrators can decide whether to refresh.
74    pub reserved_anchor: Option<String>,
75    pub message: String,
76}
77
78fn emit_live_owner_conflict(
79    thread: &str,
80    requested_anchor_full: &str,
81    owner: &AgentEntry,
82) -> anyhow::Error {
83    let kind = if owner.anchor_state.as_deref() == Some(requested_anchor_full) {
84        "live_owner"
85    } else {
86        "anchor_drift"
87    };
88    let message = if kind == "live_owner" {
89        format!(
90            "thread '{}' already has a live reservation on session '{}'. Use `heddle thread show {}` or release the session before starting another writer.",
91            thread, owner.session_id, thread
92        )
93    } else {
94        format!(
95            "thread '{}' is reserved by session '{}' on anchor {}, but you requested {}. Refresh the thread or rebase before retrying.",
96            thread,
97            owner.session_id,
98            owner.anchor_state.as_deref().unwrap_or("<unknown>"),
99            requested_anchor_full
100        )
101    };
102    let conflict = AgentReservationConflict {
103        kind,
104        thread: thread.to_string(),
105        requested_anchor: requested_anchor_full.to_string(),
106        owner: Some(AgentReservationOutput::from(owner)),
107        reserved_anchor: owner.anchor_state.clone(),
108        message: message.clone(),
109    };
110    if let Ok(json) = serde_json::to_string(&conflict) {
111        println!("{}", json);
112    }
113    anyhow!(message)
114}
115
116fn emit_anchor_drift_no_owner(
117    thread: &str,
118    requested_anchor_full: &str,
119    reserved_anchor: &str,
120) -> anyhow::Error {
121    let message = format!(
122        "thread '{}' is anchored at {}, but reservation requested {}. Refresh the thread or rebase before retrying.",
123        thread, reserved_anchor, requested_anchor_full
124    );
125    let conflict = AgentReservationConflict {
126        kind: "anchor_drift",
127        thread: thread.to_string(),
128        requested_anchor: requested_anchor_full.to_string(),
129        owner: None,
130        reserved_anchor: Some(reserved_anchor.to_string()),
131        message: message.clone(),
132    };
133    if let Ok(json) = serde_json::to_string(&conflict) {
134        println!("{}", json);
135    }
136    anyhow!(message)
137}
138
139pub fn cmd_agent_reserve(cli: &Cli, args: AgentReserveArgs) -> Result<()> {
140    let repo = Repository::open(cli.repo.as_ref().unwrap_or(&std::env::current_dir()?))?;
141    let anchor = match &args.anchor {
142        Some(spec) => repo
143            .resolve_state(spec)?
144            .ok_or_else(|| anyhow!("anchor state '{}' not found", spec))?,
145        None => repo
146            .head()?
147            .ok_or_else(|| anyhow!("repository has no HEAD state to reserve from"))?,
148    };
149    let anchor_root = repo
150        .store()
151        .get_state(&anchor)?
152        .map(|state| state.tree.short())
153        .unwrap_or_default();
154    let anchor_full = anchor.to_string_full();
155    let thread_name = args.thread.clone();
156
157    // Hard pre-check: a thread ref already pointing at a different
158    // state without any live owner is an anchor-drift case the caller
159    // must resolve before we hand them a fresh reservation. We surface
160    // it here (rather than letting set_thread_cas fail later) so the
161    // structured JSON conflict is emitted on stdout.
162    let existing_ref = repo.refs().get_thread(&thread_name)?;
163    if let Some(existing) = existing_ref
164        && existing != anchor
165    {
166        // Look for a live owner first — if one exists, route through
167        // emit_live_owner_conflict so the caller sees the owner's
168        // session_id alongside the drift.
169        let registry = AgentRegistry::new(repo.heddle_dir());
170        registry.reap_dead_for_thread(&thread_name)?;
171        if let Some(owner) = registry
172            .list()?
173            .into_iter()
174            .find(|entry| entry.status == AgentStatus::Active && entry.thread == thread_name)
175        {
176            return Err(emit_live_owner_conflict(&thread_name, &anchor_full, &owner));
177        }
178        return Err(emit_anchor_drift_no_owner(
179            &thread_name,
180            &anchor_full,
181            &existing.to_string_full(),
182        ));
183    }
184
185    let registry = AgentRegistry::new(repo.heddle_dir());
186    let task = args.task.clone();
187    let anchor_full_for_entry = anchor_full.clone();
188    let anchor_short = anchor.short();
189    // `--hold-for-pid PID` binds the reservation to an external
190    // process (typically the orchestrator that wraps the heddle
191    // CLI). Without it we record this one-shot CLI's pid, which
192    // exits before the next liveness check — fine when the calling
193    // script doesn't care about reaping, but means the dead-pid
194    // reaper would recycle the reservation immediately if a second
195    // agent races in. With `--hold-for-pid` the reservation tracks
196    // the orchestrator's lifetime instead.
197    let recorded_pid = args.hold_for_pid.unwrap_or_else(std::process::id);
198    let outcome = registry.try_reserve_thread(&thread_name, |session_id| {
199        Ok(AgentEntry {
200            session_id: session_id.to_string(),
201            client_instance_id: None,
202            native_actor_key: None,
203            native_parent_actor_key: None,
204            native_instance_key: None,
205            heddle_session_id: None,
206            thread_id: Some(thread_name.clone()),
207            thread: thread_name.clone(),
208            pid: Some(recorded_pid),
209            boot_id: current_boot_id(),
210            liveness_path: Some(
211                repo.heddle_dir()
212                    .join("agents")
213                    .join(format!("{session_id}.live")),
214            ),
215            heartbeat_at: Some(Utc::now()),
216            anchor_state: Some(anchor_full_for_entry.clone()),
217            anchor_root: Some(anchor_root.clone()),
218            reservation_token: Some(objects::store::generate_agent_id()),
219            path: None,
220            base_state: anchor_short.clone(),
221            started_at: Utc::now(),
222            provider: None,
223            model: None,
224            harness: Some("heddle-agent-api".to_string()),
225            thinking_level: None,
226            usage_summary: AgentUsageSummary::default(),
227            last_progress_at: None,
228            report_flush_state: None,
229            attach_reason: task.clone(),
230            attach_precedence: vec!["agent-reserve".to_string()],
231            winning_attach_rule: Some("agent-reserve".to_string()),
232            probe_source: Some("agent_api".to_string()),
233            probe_confidence: Some(1.0),
234            status: AgentStatus::Active,
235            completed_at: None,
236            context_queries: vec![],
237        })
238    })?;
239
240    let entry = match outcome {
241        ReserveOutcome::Reserved(entry) => entry,
242        ReserveOutcome::LiveOwner(existing) => {
243            return Err(emit_live_owner_conflict(
244                &thread_name,
245                &anchor_full,
246                &existing,
247            ));
248        }
249    };
250
251    // We hold the reservation. The remaining steps (CAS, oplog record,
252    // thread metadata, JSON emit) must be all-or-nothing from the
253    // caller's perspective: if any step fails after `try_reserve_thread`
254    // wrote the Active entry, we have to mark that entry Abandoned
255    // before returning the error. Otherwise the caller never sees a
256    // session_id (no successful JSON output), but the registry retains
257    // a live-owner row that ghost-blocks subsequent reservers — which
258    // is exactly what `try_reserve_thread`'s own conflict logic would
259    // hit, until pid-liveness reaping eventually clears it.
260    //
261    // The race the reviewer flagged: another writer advances the
262    // thread ref between the pre-check at line 161 and the CAS below,
263    // causing `set_thread_cas` to return ExpectationViolated. The
264    // fallible closure here ensures we abandon the orphaned reservation
265    // before bubbling that error up.
266    let post_reserve = (|| -> Result<()> {
267        if let Some(existing) = existing_ref {
268            repo.refs()
269                .set_thread_cas(&thread_name, RefExpectation::Value(existing), &anchor)?;
270        } else {
271            repo.refs()
272                .set_thread_cas(&thread_name, RefExpectation::Missing, &anchor)?;
273            repo.oplog()
274                .record_thread_create(&thread_name, &anchor, Some(&repo.op_scope()))?;
275        }
276
277        // Ensure a Thread record exists so downstream commands
278        // (`agent ready`, `thread show`, `ready`, `merge --preview`) have
279        // first-class metadata to work with. `start_thread` does the
280        // same; we mirror just the minimum required for the agent API.
281        ensure_thread_record(&repo, &thread_name, &anchor, &args.task)?;
282
283        println!(
284            "{}",
285            serde_json::to_string(&AgentReservationOutput::from(&entry))?
286        );
287        Ok(())
288    })();
289
290    if let Err(err) = post_reserve {
291        // Best-effort abandon: if the registry write itself fails (FS
292        // error mid-cleanup), surface the original error and let the
293        // pid-based dead-owner reaper recycle the orphan on the next
294        // reserve. Logging the secondary failure would be ideal but
295        // would make the error wire-format dependent on transient FS
296        // state, so we keep the structured surface clean.
297        let _ = registry.update_entry(&entry.session_id, |e| {
298            e.status = AgentStatus::Abandoned;
299            e.completed_at = Some(Utc::now());
300        });
301        return Err(err);
302    }
303
304    Ok(())
305}
306
307/// Persist a minimal `Thread` record for `thread_name` if one does not
308/// already exist. Mirrors the relevant fields from `start_thread`.
309fn ensure_thread_record(
310    repo: &Repository,
311    thread_name: &str,
312    anchor: &objects::object::ChangeId,
313    task: &Option<String>,
314) -> Result<()> {
315    let manager = ThreadManager::new(repo.heddle_dir());
316    if manager.load(thread_name)?.is_some() {
317        return Ok(());
318    }
319    let state = repo
320        .store()
321        .get_state(anchor)?
322        .ok_or_else(|| anyhow!("anchor state '{}' not found", anchor.short()))?;
323    let base_short = anchor.short();
324    let base_root = state.tree.short();
325    let target_thread = match repo.head_ref()? {
326        Head::Attached { thread } if thread != thread_name => Some(thread),
327        _ => None,
328    };
329    let thread_state = Thread {
330        id: thread_name.to_string(),
331        thread: thread_name.to_string(),
332        target_thread,
333        parent_thread: None,
334        mode: ThreadMode::Lightweight,
335        state: ThreadState::Active,
336        base_state: base_short.clone(),
337        base_root,
338        current_state: Some(base_short),
339        merged_state: None,
340        task: task.clone(),
341        execution_path: repo.root().to_path_buf(),
342        materialized_path: None,
343        changed_paths: vec![],
344        impact_categories: vec![],
345        heavy_impact_paths: vec![],
346        promotion_suggested: false,
347        freshness: ThreadFreshness::Current,
348        verification_summary: ThreadVerificationSummary::default(),
349        confidence_summary: ThreadConfidenceSummary::default(),
350        integration_policy_result: ThreadIntegrationPolicy::default(),
351        created_at: Utc::now(),
352        updated_at: Utc::now(),
353        // Reservation-API-created threads aren't ephemeral by
354        // default; orchestrators that want TTL-bounded threads pass
355        // through `heddle thread create --ephemeral` instead.
356        ephemeral: None,
357        // Reservation API threads are user-orchestrated, not
358        // harness-auto-created — leave them visible in the default
359        // `thread list` view.
360        auto: false,
361        shared_target_dir: None,
362    };
363    manager.save(&thread_state)?;
364    Ok(())
365}
366
367pub fn cmd_agent_heartbeat(cli: &Cli, args: AgentHeartbeatArgs) -> Result<()> {
368    let repo = Repository::open(cli.repo.as_ref().unwrap_or(&std::env::current_dir()?))?;
369    let registry = AgentRegistry::new(repo.heddle_dir());
370    let entry = registry
371        .update_entry(&args.session, |entry| {
372            entry.heartbeat_at = Some(Utc::now());
373            entry.last_progress_at = Some(Utc::now());
374        })?
375        .ok_or_else(|| anyhow!("agent session '{}' not found", args.session))?;
376    println!(
377        "{}",
378        serde_json::to_string(&AgentReservationOutput::from(&entry))?
379    );
380    Ok(())
381}
382
383pub fn cmd_agent_release(cli: &Cli, args: AgentReleaseArgs) -> Result<()> {
384    let repo = Repository::open(cli.repo.as_ref().unwrap_or(&std::env::current_dir()?))?;
385    let registry = AgentRegistry::new(repo.heddle_dir());
386    let status = match args.status {
387        AgentReleaseStatusArg::Complete => AgentStatus::Complete,
388        AgentReleaseStatusArg::Abandoned => AgentStatus::Abandoned,
389    };
390    let entry = registry
391        .update_entry(&args.session, |entry| {
392            entry.status = status.clone();
393            entry.completed_at = match entry.status {
394                AgentStatus::Active => None,
395                AgentStatus::Abandoned | AgentStatus::Complete | AgentStatus::Merged => {
396                    Some(Utc::now())
397                }
398            };
399        })?
400        .ok_or_else(|| anyhow!("agent session '{}' not found", args.session))?;
401    println!(
402        "{}",
403        serde_json::to_string(&AgentReservationOutput::from(&entry))?
404    );
405    Ok(())
406}
407
408pub fn cmd_agent_list(cli: &Cli, args: AgentApiListArgs) -> Result<()> {
409    let repo = Repository::open(cli.repo.as_ref().unwrap_or(&std::env::current_dir()?))?;
410    let registry = AgentRegistry::new(repo.heddle_dir());
411    if args.alive_only {
412        // Sweep dead reservations before reporting so callers asking
413        // "who is alive?" see a pid-checked, current view.
414        registry.reap_dead()?;
415    }
416    let entries: Vec<_> = registry
417        .list()?
418        .into_iter()
419        .filter(|entry| {
420            args.thread
421                .as_ref()
422                .is_none_or(|thread| &entry.thread == thread)
423        })
424        .filter(|entry| !args.alive_only || entry.status == AgentStatus::Active)
425        .map(|entry| AgentReservationOutput::from(&entry))
426        .collect();
427    render_agent_list(&entries, should_output_json(cli, Some(repo.config())))
428}
429
430fn render_agent_list(entries: &[AgentReservationOutput], json: bool) -> Result<()> {
431    if json {
432        println!("{}", serde_json::to_string(entries)?);
433        return Ok(());
434    }
435    if entries.is_empty() {
436        println!("No agent reservations.");
437        return Ok(());
438    }
439    println!("Agent reservations ({}):", entries.len());
440    for entry in entries {
441        println!(
442            "  {} [{}] thread={}",
443            crate::cli::style::accent(&entry.session_id),
444            entry.status,
445            entry.thread,
446        );
447        if let Some(task) = &entry.task {
448            println!("    task: {}", crate::cli::style::dim(task));
449        }
450        if let Some(path) = &entry.path
451            && !path.is_empty()
452        {
453            println!("    path: {}", crate::cli::style::dim(path));
454        }
455    }
456    Ok(())
457}
458
459/// Resolve `--session SID` to an Active reservation, refresh its
460/// heartbeat, and return the entry. Errors out cleanly when the
461/// session is missing, terminal, or reaped between calls — that's the
462/// signal the orchestrator must re-reserve before continuing.
463fn validate_active_session(
464    registry: &AgentRegistry,
465    session_id: &str,
466) -> Result<objects::store::AgentEntry> {
467    let entry = registry
468        .update_entry(session_id, |entry| {
469            entry.heartbeat_at = Some(Utc::now());
470            entry.last_progress_at = Some(Utc::now());
471        })?
472        .ok_or_else(|| anyhow!("agent session '{}' not found", session_id))?;
473    if entry.status != AgentStatus::Active {
474        return Err(anyhow!(
475            "agent session '{}' is no longer active (status: {}). Re-reserve the thread before retrying.",
476            session_id,
477            entry.status
478        ));
479    }
480    Ok(entry)
481}
482
483/// `heddle agent capture --session <SID>`: a session-validated
484/// alias for `heddle capture` that proves the caller still owns the
485/// reservation it claims to before writing.
486pub async fn cmd_agent_capture(
487    cli: &Cli,
488    args: crate::cli::cli_args::AgentCaptureArgs,
489) -> Result<()> {
490    let repo_path = cli
491        .repo
492        .clone()
493        .unwrap_or(std::env::current_dir().map_err(anyhow::Error::from)?);
494    let repo = Repository::open(&repo_path)?;
495    let registry = AgentRegistry::new(repo.heddle_dir());
496    let entry = validate_active_session(&registry, &args.session)?;
497
498    // Confirm the reservation still names the thread the caller is
499    // attached to. We don't switch threads here — the agent must
500    // already be on its reserved thread when invoking capture.
501    if let Some(current) = repo.current_lane()?
502        && current != entry.thread
503    {
504        return Err(anyhow!(
505            "agent session '{}' reserved thread '{}', but the current thread is '{}'. Switch threads before capturing.",
506            args.session,
507            entry.thread,
508            current
509        ));
510    }
511
512    super::snapshot::cmd_snapshot(
513        cli,
514        args.message.clone(),
515        args.confidence,
516        false,
517        super::snapshot::SnapshotAgentOverrides {
518            provider: entry.provider.clone(),
519            model: entry.model.clone(),
520            session: Some(args.session.clone()),
521            segment: None,
522            policy: None,
523            no_policy: false,
524            no_agent: entry.provider.is_none() && entry.model.is_none(),
525        },
526    )
527    .await
528}
529
530/// `heddle agent ready --session <SID>`: a session-validated alias
531/// for `heddle ready` that ensures the caller still owns the
532/// reservation it's trying to mark ready.
533pub async fn cmd_agent_ready(cli: &Cli, args: crate::cli::cli_args::AgentReadyArgs) -> Result<()> {
534    let repo_path = cli
535        .repo
536        .clone()
537        .unwrap_or(std::env::current_dir().map_err(anyhow::Error::from)?);
538    let repo = Repository::open(&repo_path)?;
539    let registry = AgentRegistry::new(repo.heddle_dir());
540    let entry = validate_active_session(&registry, &args.session)?;
541
542    super::ready_cmd::cmd_ready(
543        cli,
544        crate::cli::cli_args::ReadyArgs {
545            thread: Some(entry.thread.clone()),
546            message: args.message.clone(),
547        },
548    )
549    .await
550}
551
552/// Return the combined JSON schema for the public agent-API output
553/// types. Snapshot-tested in `tests/agent_api_schema.rs` so any
554/// breaking change to the wire shape is caught at PR review.
555pub fn agent_api_schema() -> serde_json::Value {
556    serde_json::json!({
557        "AgentReservationOutput": schemars::schema_for!(AgentReservationOutput),
558        "AgentReservationConflict": schemars::schema_for!(AgentReservationConflict),
559    })
560}