1use anyhow::{Result, anyhow};
5use chrono::Utc;
6use objects::store::{
7 AgentEntry, AgentRegistry, AgentStatus, AgentUsageSummary, ReserveOutcome, current_boot_id,
8};
9use refs::{Head, RefExpectation};
10use repo::{
11 Repository, Thread, ThreadConfidenceSummary, ThreadFreshness, ThreadIntegrationPolicy,
12 ThreadManager, ThreadMode, ThreadState, ThreadVerificationSummary,
13};
14use schemars::JsonSchema;
15use serde::Serialize;
16
17use crate::cli::{
18 Cli,
19 cli_args::{
20 AgentApiListArgs, AgentHeartbeatArgs, AgentReleaseArgs, AgentReleaseStatusArg,
21 AgentReserveArgs,
22 },
23 should_output_json,
24};
25
// Serializable view of one agent reservation, as printed by the agent
// commands in this file. Built from an `AgentEntry` through the
// `From<&AgentEntry>` impl.
//
// NOTE(review): plain `//` comments are used on purpose. `///` doc comments
// on a `JsonSchema` type are presumably copied into the generated schema as
// descriptions, which would change `agent_api_schema()` output — confirm
// before converting these to doc comments.
#[derive(Serialize, JsonSchema)]
pub struct AgentReservationOutput {
    // Identifier of the agent session that owns the reservation.
    pub session_id: String,
    // Token attached to the reservation, when the entry carries one.
    pub reservation_token: Option<String>,
    // Name of the reserved thread.
    pub thread: String,
    // Anchor state recorded on the entry (the reserve command stores the
    // full state id here).
    pub anchor_state: Option<String>,
    // Root recorded for the anchor state, when known.
    pub anchor_root: Option<String>,
    // Entry status rendered through its `Display`/`to_string` form.
    pub status: String,
    // Entry path rendered with `Path::display`, when the entry has a path.
    pub path: Option<String>,
    // Task description; populated from the entry's `attach_reason`.
    pub task: Option<String>,
}
41
42impl From<&AgentEntry> for AgentReservationOutput {
43 fn from(entry: &AgentEntry) -> Self {
44 Self {
45 session_id: entry.session_id.clone(),
46 reservation_token: entry.reservation_token.clone(),
47 thread: entry.thread.clone(),
48 anchor_state: entry.anchor_state.clone(),
49 anchor_root: entry.anchor_root.clone(),
50 status: entry.status.to_string(),
51 path: entry.path.as_ref().map(|path| path.display().to_string()),
52 task: entry.attach_reason.clone(),
53 }
54 }
55}
56
// Serializable description of a failed reservation attempt. The reserve
// path prints one of these as a JSON line before returning an error.
//
// NOTE(review): plain `//` comments are used on purpose. `///` doc comments
// on a `JsonSchema` type are presumably copied into the generated schema as
// descriptions — confirm before converting these to doc comments.
#[derive(Serialize, JsonSchema)]
pub struct AgentReservationConflict {
    // Conflict category; this file emits "live_owner" or "anchor_drift".
    pub kind: &'static str,
    // Name of the thread the reservation was requested for.
    pub thread: String,
    // Full id of the anchor state the caller asked to reserve from.
    pub requested_anchor: String,
    // The reservation currently holding the thread, if a live owner exists.
    pub owner: Option<AgentReservationOutput>,
    // Anchor the thread (or its owner) is currently reserved at, if known.
    pub reserved_anchor: Option<String>,
    // Human-readable explanation; the same text is returned as the error.
    pub message: String,
}
77
78fn emit_live_owner_conflict(
79 thread: &str,
80 requested_anchor_full: &str,
81 owner: &AgentEntry,
82) -> anyhow::Error {
83 let kind = if owner.anchor_state.as_deref() == Some(requested_anchor_full) {
84 "live_owner"
85 } else {
86 "anchor_drift"
87 };
88 let message = if kind == "live_owner" {
89 format!(
90 "thread '{}' already has a live reservation on session '{}'. Use `heddle thread show {}` or release the session before starting another writer.",
91 thread, owner.session_id, thread
92 )
93 } else {
94 format!(
95 "thread '{}' is reserved by session '{}' on anchor {}, but you requested {}. Refresh the thread or rebase before retrying.",
96 thread,
97 owner.session_id,
98 owner.anchor_state.as_deref().unwrap_or("<unknown>"),
99 requested_anchor_full
100 )
101 };
102 let conflict = AgentReservationConflict {
103 kind,
104 thread: thread.to_string(),
105 requested_anchor: requested_anchor_full.to_string(),
106 owner: Some(AgentReservationOutput::from(owner)),
107 reserved_anchor: owner.anchor_state.clone(),
108 message: message.clone(),
109 };
110 if let Ok(json) = serde_json::to_string(&conflict) {
111 println!("{}", json);
112 }
113 anyhow!(message)
114}
115
116fn emit_anchor_drift_no_owner(
117 thread: &str,
118 requested_anchor_full: &str,
119 reserved_anchor: &str,
120) -> anyhow::Error {
121 let message = format!(
122 "thread '{}' is anchored at {}, but reservation requested {}. Refresh the thread or rebase before retrying.",
123 thread, reserved_anchor, requested_anchor_full
124 );
125 let conflict = AgentReservationConflict {
126 kind: "anchor_drift",
127 thread: thread.to_string(),
128 requested_anchor: requested_anchor_full.to_string(),
129 owner: None,
130 reserved_anchor: Some(reserved_anchor.to_string()),
131 message: message.clone(),
132 };
133 if let Ok(json) = serde_json::to_string(&conflict) {
134 println!("{}", json);
135 }
136 anyhow!(message)
137}
138
139pub fn cmd_agent_reserve(cli: &Cli, args: AgentReserveArgs) -> Result<()> {
140 let repo = Repository::open(cli.repo.as_ref().unwrap_or(&std::env::current_dir()?))?;
141 let anchor = match &args.anchor {
142 Some(spec) => repo
143 .resolve_state(spec)?
144 .ok_or_else(|| anyhow!("anchor state '{}' not found", spec))?,
145 None => repo
146 .head()?
147 .ok_or_else(|| anyhow!("repository has no HEAD state to reserve from"))?,
148 };
149 let anchor_root = repo
150 .store()
151 .get_state(&anchor)?
152 .map(|state| state.tree.short())
153 .unwrap_or_default();
154 let anchor_full = anchor.to_string_full();
155 let thread_name = args.thread.clone();
156
157 let existing_ref = repo.refs().get_thread(&thread_name)?;
163 if let Some(existing) = existing_ref
164 && existing != anchor
165 {
166 let registry = AgentRegistry::new(repo.heddle_dir());
170 registry.reap_dead_for_thread(&thread_name)?;
171 if let Some(owner) = registry
172 .list()?
173 .into_iter()
174 .find(|entry| entry.status == AgentStatus::Active && entry.thread == thread_name)
175 {
176 return Err(emit_live_owner_conflict(&thread_name, &anchor_full, &owner));
177 }
178 return Err(emit_anchor_drift_no_owner(
179 &thread_name,
180 &anchor_full,
181 &existing.to_string_full(),
182 ));
183 }
184
185 let registry = AgentRegistry::new(repo.heddle_dir());
186 let task = args.task.clone();
187 let anchor_full_for_entry = anchor_full.clone();
188 let anchor_short = anchor.short();
189 let recorded_pid = args.hold_for_pid.unwrap_or_else(std::process::id);
198 let outcome = registry.try_reserve_thread(&thread_name, |session_id| {
199 Ok(AgentEntry {
200 session_id: session_id.to_string(),
201 client_instance_id: None,
202 native_actor_key: None,
203 native_parent_actor_key: None,
204 native_instance_key: None,
205 heddle_session_id: None,
206 thread_id: Some(thread_name.clone()),
207 thread: thread_name.clone(),
208 pid: Some(recorded_pid),
209 boot_id: current_boot_id(),
210 liveness_path: Some(
211 repo.heddle_dir()
212 .join("agents")
213 .join(format!("{session_id}.live")),
214 ),
215 heartbeat_at: Some(Utc::now()),
216 anchor_state: Some(anchor_full_for_entry.clone()),
217 anchor_root: Some(anchor_root.clone()),
218 reservation_token: Some(objects::store::generate_agent_id()),
219 path: None,
220 base_state: anchor_short.clone(),
221 started_at: Utc::now(),
222 provider: None,
223 model: None,
224 harness: Some("heddle-agent-api".to_string()),
225 thinking_level: None,
226 usage_summary: AgentUsageSummary::default(),
227 last_progress_at: None,
228 report_flush_state: None,
229 attach_reason: task.clone(),
230 attach_precedence: vec!["agent-reserve".to_string()],
231 winning_attach_rule: Some("agent-reserve".to_string()),
232 probe_source: Some("agent_api".to_string()),
233 probe_confidence: Some(1.0),
234 status: AgentStatus::Active,
235 completed_at: None,
236 context_queries: vec![],
237 })
238 })?;
239
240 let entry = match outcome {
241 ReserveOutcome::Reserved(entry) => entry,
242 ReserveOutcome::LiveOwner(existing) => {
243 return Err(emit_live_owner_conflict(
244 &thread_name,
245 &anchor_full,
246 &existing,
247 ));
248 }
249 };
250
251 let post_reserve = (|| -> Result<()> {
267 if let Some(existing) = existing_ref {
268 repo.refs()
269 .set_thread_cas(&thread_name, RefExpectation::Value(existing), &anchor)?;
270 } else {
271 repo.refs()
272 .set_thread_cas(&thread_name, RefExpectation::Missing, &anchor)?;
273 repo.oplog()
274 .record_thread_create(&thread_name, &anchor, Some(&repo.op_scope()))?;
275 }
276
277 ensure_thread_record(&repo, &thread_name, &anchor, &args.task)?;
282
283 println!(
284 "{}",
285 serde_json::to_string(&AgentReservationOutput::from(&entry))?
286 );
287 Ok(())
288 })();
289
290 if let Err(err) = post_reserve {
291 let _ = registry.update_entry(&entry.session_id, |e| {
298 e.status = AgentStatus::Abandoned;
299 e.completed_at = Some(Utc::now());
300 });
301 return Err(err);
302 }
303
304 Ok(())
305}
306
307fn ensure_thread_record(
310 repo: &Repository,
311 thread_name: &str,
312 anchor: &objects::object::ChangeId,
313 task: &Option<String>,
314) -> Result<()> {
315 let manager = ThreadManager::new(repo.heddle_dir());
316 if manager.load(thread_name)?.is_some() {
317 return Ok(());
318 }
319 let state = repo
320 .store()
321 .get_state(anchor)?
322 .ok_or_else(|| anyhow!("anchor state '{}' not found", anchor.short()))?;
323 let base_short = anchor.short();
324 let base_root = state.tree.short();
325 let target_thread = match repo.head_ref()? {
326 Head::Attached { thread } if thread != thread_name => Some(thread),
327 _ => None,
328 };
329 let thread_state = Thread {
330 id: thread_name.to_string(),
331 thread: thread_name.to_string(),
332 target_thread,
333 parent_thread: None,
334 mode: ThreadMode::Lightweight,
335 state: ThreadState::Active,
336 base_state: base_short.clone(),
337 base_root,
338 current_state: Some(base_short),
339 merged_state: None,
340 task: task.clone(),
341 execution_path: repo.root().to_path_buf(),
342 materialized_path: None,
343 changed_paths: vec![],
344 impact_categories: vec![],
345 heavy_impact_paths: vec![],
346 promotion_suggested: false,
347 freshness: ThreadFreshness::Current,
348 verification_summary: ThreadVerificationSummary::default(),
349 confidence_summary: ThreadConfidenceSummary::default(),
350 integration_policy_result: ThreadIntegrationPolicy::default(),
351 created_at: Utc::now(),
352 updated_at: Utc::now(),
353 ephemeral: None,
357 auto: false,
361 shared_target_dir: None,
362 };
363 manager.save(&thread_state)?;
364 Ok(())
365}
366
367pub fn cmd_agent_heartbeat(cli: &Cli, args: AgentHeartbeatArgs) -> Result<()> {
368 let repo = Repository::open(cli.repo.as_ref().unwrap_or(&std::env::current_dir()?))?;
369 let registry = AgentRegistry::new(repo.heddle_dir());
370 let entry = registry
371 .update_entry(&args.session, |entry| {
372 entry.heartbeat_at = Some(Utc::now());
373 entry.last_progress_at = Some(Utc::now());
374 })?
375 .ok_or_else(|| anyhow!("agent session '{}' not found", args.session))?;
376 println!(
377 "{}",
378 serde_json::to_string(&AgentReservationOutput::from(&entry))?
379 );
380 Ok(())
381}
382
383pub fn cmd_agent_release(cli: &Cli, args: AgentReleaseArgs) -> Result<()> {
384 let repo = Repository::open(cli.repo.as_ref().unwrap_or(&std::env::current_dir()?))?;
385 let registry = AgentRegistry::new(repo.heddle_dir());
386 let status = match args.status {
387 AgentReleaseStatusArg::Complete => AgentStatus::Complete,
388 AgentReleaseStatusArg::Abandoned => AgentStatus::Abandoned,
389 };
390 let entry = registry
391 .update_entry(&args.session, |entry| {
392 entry.status = status.clone();
393 entry.completed_at = match entry.status {
394 AgentStatus::Active => None,
395 AgentStatus::Abandoned | AgentStatus::Complete | AgentStatus::Merged => {
396 Some(Utc::now())
397 }
398 };
399 })?
400 .ok_or_else(|| anyhow!("agent session '{}' not found", args.session))?;
401 println!(
402 "{}",
403 serde_json::to_string(&AgentReservationOutput::from(&entry))?
404 );
405 Ok(())
406}
407
408pub fn cmd_agent_list(cli: &Cli, args: AgentApiListArgs) -> Result<()> {
409 let repo = Repository::open(cli.repo.as_ref().unwrap_or(&std::env::current_dir()?))?;
410 let registry = AgentRegistry::new(repo.heddle_dir());
411 if args.alive_only {
412 registry.reap_dead()?;
415 }
416 let entries: Vec<_> = registry
417 .list()?
418 .into_iter()
419 .filter(|entry| {
420 args.thread
421 .as_ref()
422 .is_none_or(|thread| &entry.thread == thread)
423 })
424 .filter(|entry| !args.alive_only || entry.status == AgentStatus::Active)
425 .map(|entry| AgentReservationOutput::from(&entry))
426 .collect();
427 render_agent_list(&entries, should_output_json(cli, Some(repo.config())))
428}
429
430fn render_agent_list(entries: &[AgentReservationOutput], json: bool) -> Result<()> {
431 if json {
432 println!("{}", serde_json::to_string(entries)?);
433 return Ok(());
434 }
435 if entries.is_empty() {
436 println!("No agent reservations.");
437 return Ok(());
438 }
439 println!("Agent reservations ({}):", entries.len());
440 for entry in entries {
441 println!(
442 " {} [{}] thread={}",
443 crate::cli::style::accent(&entry.session_id),
444 entry.status,
445 entry.thread,
446 );
447 if let Some(task) = &entry.task {
448 println!(" task: {}", crate::cli::style::dim(task));
449 }
450 if let Some(path) = &entry.path
451 && !path.is_empty()
452 {
453 println!(" path: {}", crate::cli::style::dim(path));
454 }
455 }
456 Ok(())
457}
458
459fn validate_active_session(
464 registry: &AgentRegistry,
465 session_id: &str,
466) -> Result<objects::store::AgentEntry> {
467 let entry = registry
468 .update_entry(session_id, |entry| {
469 entry.heartbeat_at = Some(Utc::now());
470 entry.last_progress_at = Some(Utc::now());
471 })?
472 .ok_or_else(|| anyhow!("agent session '{}' not found", session_id))?;
473 if entry.status != AgentStatus::Active {
474 return Err(anyhow!(
475 "agent session '{}' is no longer active (status: {}). Re-reserve the thread before retrying.",
476 session_id,
477 entry.status
478 ));
479 }
480 Ok(entry)
481}
482
483pub async fn cmd_agent_capture(
487 cli: &Cli,
488 args: crate::cli::cli_args::AgentCaptureArgs,
489) -> Result<()> {
490 let repo_path = cli
491 .repo
492 .clone()
493 .unwrap_or(std::env::current_dir().map_err(anyhow::Error::from)?);
494 let repo = Repository::open(&repo_path)?;
495 let registry = AgentRegistry::new(repo.heddle_dir());
496 let entry = validate_active_session(®istry, &args.session)?;
497
498 if let Some(current) = repo.current_lane()?
502 && current != entry.thread
503 {
504 return Err(anyhow!(
505 "agent session '{}' reserved thread '{}', but the current thread is '{}'. Switch threads before capturing.",
506 args.session,
507 entry.thread,
508 current
509 ));
510 }
511
512 super::snapshot::cmd_snapshot(
513 cli,
514 args.message.clone(),
515 args.confidence,
516 false,
517 super::snapshot::SnapshotAgentOverrides {
518 provider: entry.provider.clone(),
519 model: entry.model.clone(),
520 session: Some(args.session.clone()),
521 segment: None,
522 policy: None,
523 no_policy: false,
524 no_agent: entry.provider.is_none() && entry.model.is_none(),
525 },
526 )
527 .await
528}
529
530pub async fn cmd_agent_ready(cli: &Cli, args: crate::cli::cli_args::AgentReadyArgs) -> Result<()> {
534 let repo_path = cli
535 .repo
536 .clone()
537 .unwrap_or(std::env::current_dir().map_err(anyhow::Error::from)?);
538 let repo = Repository::open(&repo_path)?;
539 let registry = AgentRegistry::new(repo.heddle_dir());
540 let entry = validate_active_session(®istry, &args.session)?;
541
542 super::ready_cmd::cmd_ready(
543 cli,
544 crate::cli::cli_args::ReadyArgs {
545 thread: Some(entry.thread.clone()),
546 message: args.message.clone(),
547 },
548 )
549 .await
550}
551
552pub fn agent_api_schema() -> serde_json::Value {
556 serde_json::json!({
557 "AgentReservationOutput": schemars::schema_for!(AgentReservationOutput),
558 "AgentReservationConflict": schemars::schema_for!(AgentReservationConflict),
559 })
560}