use anyhow::{Result, anyhow};
use chrono::Utc;
use objects::store::{
AgentEntry, AgentRegistry, AgentStatus, AgentUsageSummary, ReserveOutcome, current_boot_id,
};
use refs::{Head, RefExpectation};
use repo::{
Repository, Thread, ThreadConfidenceSummary, ThreadFreshness, ThreadIntegrationPolicy,
ThreadManager, ThreadMode, ThreadState, ThreadVerificationSummary,
};
use schemars::JsonSchema;
use serde::Serialize;
use crate::cli::{
Cli,
cli_args::{
AgentApiListArgs, AgentHeartbeatArgs, AgentReleaseArgs, AgentReleaseStatusArg,
AgentReserveArgs,
},
should_output_json,
};
#[derive(Serialize, JsonSchema)]
pub struct AgentReservationOutput {
pub session_id: String,
pub reservation_token: Option<String>,
pub thread: String,
pub anchor_state: Option<String>,
pub anchor_root: Option<String>,
pub status: String,
pub path: Option<String>,
pub task: Option<String>,
}
impl From<&AgentEntry> for AgentReservationOutput {
fn from(entry: &AgentEntry) -> Self {
Self {
session_id: entry.session_id.clone(),
reservation_token: entry.reservation_token.clone(),
thread: entry.thread.clone(),
anchor_state: entry.anchor_state.clone(),
anchor_root: entry.anchor_root.clone(),
status: entry.status.to_string(),
path: entry.path.as_ref().map(|path| path.display().to_string()),
task: entry.attach_reason.clone(),
}
}
}
#[derive(Serialize, JsonSchema)]
pub struct AgentReservationConflict {
pub kind: &'static str,
pub thread: String,
pub requested_anchor: String,
pub owner: Option<AgentReservationOutput>,
pub reserved_anchor: Option<String>,
pub message: String,
}
fn emit_live_owner_conflict(
thread: &str,
requested_anchor_full: &str,
owner: &AgentEntry,
) -> anyhow::Error {
let kind = if owner.anchor_state.as_deref() == Some(requested_anchor_full) {
"live_owner"
} else {
"anchor_drift"
};
let message = if kind == "live_owner" {
format!(
"thread '{}' already has a live reservation on session '{}'. Use `heddle thread show {}` or release the session before starting another writer.",
thread, owner.session_id, thread
)
} else {
format!(
"thread '{}' is reserved by session '{}' on anchor {}, but you requested {}. Refresh the thread or rebase before retrying.",
thread,
owner.session_id,
owner.anchor_state.as_deref().unwrap_or("<unknown>"),
requested_anchor_full
)
};
let conflict = AgentReservationConflict {
kind,
thread: thread.to_string(),
requested_anchor: requested_anchor_full.to_string(),
owner: Some(AgentReservationOutput::from(owner)),
reserved_anchor: owner.anchor_state.clone(),
message: message.clone(),
};
if let Ok(json) = serde_json::to_string(&conflict) {
println!("{}", json);
}
anyhow!(message)
}
fn emit_anchor_drift_no_owner(
thread: &str,
requested_anchor_full: &str,
reserved_anchor: &str,
) -> anyhow::Error {
let message = format!(
"thread '{}' is anchored at {}, but reservation requested {}. Refresh the thread or rebase before retrying.",
thread, reserved_anchor, requested_anchor_full
);
let conflict = AgentReservationConflict {
kind: "anchor_drift",
thread: thread.to_string(),
requested_anchor: requested_anchor_full.to_string(),
owner: None,
reserved_anchor: Some(reserved_anchor.to_string()),
message: message.clone(),
};
if let Ok(json) = serde_json::to_string(&conflict) {
println!("{}", json);
}
anyhow!(message)
}
pub fn cmd_agent_reserve(cli: &Cli, args: AgentReserveArgs) -> Result<()> {
let repo = Repository::open(cli.repo.as_ref().unwrap_or(&std::env::current_dir()?))?;
let anchor = match &args.anchor {
Some(spec) => repo
.resolve_state(spec)?
.ok_or_else(|| anyhow!("anchor state '{}' not found", spec))?,
None => repo
.head()?
.ok_or_else(|| anyhow!("repository has no HEAD state to reserve from"))?,
};
let anchor_root = repo
.store()
.get_state(&anchor)?
.map(|state| state.tree.short())
.unwrap_or_default();
let anchor_full = anchor.to_string_full();
let thread_name = args.thread.clone();
let existing_ref = repo.refs().get_thread(&thread_name)?;
if let Some(existing) = existing_ref
&& existing != anchor
{
let registry = AgentRegistry::new(repo.heddle_dir());
registry.reap_dead_for_thread(&thread_name)?;
if let Some(owner) = registry
.list()?
.into_iter()
.find(|entry| entry.status == AgentStatus::Active && entry.thread == thread_name)
{
return Err(emit_live_owner_conflict(&thread_name, &anchor_full, &owner));
}
return Err(emit_anchor_drift_no_owner(
&thread_name,
&anchor_full,
&existing.to_string_full(),
));
}
let registry = AgentRegistry::new(repo.heddle_dir());
let task = args.task.clone();
let anchor_full_for_entry = anchor_full.clone();
let anchor_short = anchor.short();
let recorded_pid = args.hold_for_pid.unwrap_or_else(std::process::id);
let outcome = registry.try_reserve_thread(&thread_name, |session_id| {
Ok(AgentEntry {
session_id: session_id.to_string(),
client_instance_id: None,
native_actor_key: None,
native_parent_actor_key: None,
native_instance_key: None,
heddle_session_id: None,
thread_id: Some(thread_name.clone()),
thread: thread_name.clone(),
pid: Some(recorded_pid),
boot_id: current_boot_id(),
liveness_path: Some(
repo.heddle_dir()
.join("agents")
.join(format!("{session_id}.live")),
),
heartbeat_at: Some(Utc::now()),
anchor_state: Some(anchor_full_for_entry.clone()),
anchor_root: Some(anchor_root.clone()),
reservation_token: Some(objects::store::generate_agent_id()),
path: None,
base_state: anchor_short.clone(),
started_at: Utc::now(),
provider: None,
model: None,
harness: Some("heddle-agent-api".to_string()),
thinking_level: None,
usage_summary: AgentUsageSummary::default(),
last_progress_at: None,
report_flush_state: None,
attach_reason: task.clone(),
attach_precedence: vec!["agent-reserve".to_string()],
winning_attach_rule: Some("agent-reserve".to_string()),
probe_source: Some("agent_api".to_string()),
probe_confidence: Some(1.0),
status: AgentStatus::Active,
completed_at: None,
context_queries: vec![],
})
})?;
let entry = match outcome {
ReserveOutcome::Reserved(entry) => entry,
ReserveOutcome::LiveOwner(existing) => {
return Err(emit_live_owner_conflict(
&thread_name,
&anchor_full,
&existing,
));
}
};
let post_reserve = (|| -> Result<()> {
if let Some(existing) = existing_ref {
repo.refs()
.set_thread_cas(&thread_name, RefExpectation::Value(existing), &anchor)?;
} else {
repo.refs()
.set_thread_cas(&thread_name, RefExpectation::Missing, &anchor)?;
repo.oplog()
.record_thread_create(&thread_name, &anchor, Some(&repo.op_scope()))?;
}
ensure_thread_record(&repo, &thread_name, &anchor, &args.task)?;
println!(
"{}",
serde_json::to_string(&AgentReservationOutput::from(&entry))?
);
Ok(())
})();
if let Err(err) = post_reserve {
let _ = registry.update_entry(&entry.session_id, |e| {
e.status = AgentStatus::Abandoned;
e.completed_at = Some(Utc::now());
});
return Err(err);
}
Ok(())
}
fn ensure_thread_record(
repo: &Repository,
thread_name: &str,
anchor: &objects::object::ChangeId,
task: &Option<String>,
) -> Result<()> {
let manager = ThreadManager::new(repo.heddle_dir());
if manager.load(thread_name)?.is_some() {
return Ok(());
}
let state = repo
.store()
.get_state(anchor)?
.ok_or_else(|| anyhow!("anchor state '{}' not found", anchor.short()))?;
let base_short = anchor.short();
let base_root = state.tree.short();
let target_thread = match repo.head_ref()? {
Head::Attached { thread } if thread != thread_name => Some(thread),
_ => None,
};
let thread_state = Thread {
id: thread_name.to_string(),
thread: thread_name.to_string(),
target_thread,
parent_thread: None,
mode: ThreadMode::Lightweight,
state: ThreadState::Active,
base_state: base_short.clone(),
base_root,
current_state: Some(base_short),
merged_state: None,
task: task.clone(),
execution_path: repo.root().to_path_buf(),
materialized_path: None,
changed_paths: vec![],
impact_categories: vec![],
heavy_impact_paths: vec![],
promotion_suggested: false,
freshness: ThreadFreshness::Current,
verification_summary: ThreadVerificationSummary::default(),
confidence_summary: ThreadConfidenceSummary::default(),
integration_policy_result: ThreadIntegrationPolicy::default(),
created_at: Utc::now(),
updated_at: Utc::now(),
ephemeral: None,
auto: false,
shared_target_dir: None,
};
manager.save(&thread_state)?;
Ok(())
}
pub fn cmd_agent_heartbeat(cli: &Cli, args: AgentHeartbeatArgs) -> Result<()> {
let repo = Repository::open(cli.repo.as_ref().unwrap_or(&std::env::current_dir()?))?;
let registry = AgentRegistry::new(repo.heddle_dir());
let entry = registry
.update_entry(&args.session, |entry| {
entry.heartbeat_at = Some(Utc::now());
entry.last_progress_at = Some(Utc::now());
})?
.ok_or_else(|| anyhow!("agent session '{}' not found", args.session))?;
println!(
"{}",
serde_json::to_string(&AgentReservationOutput::from(&entry))?
);
Ok(())
}
pub fn cmd_agent_release(cli: &Cli, args: AgentReleaseArgs) -> Result<()> {
let repo = Repository::open(cli.repo.as_ref().unwrap_or(&std::env::current_dir()?))?;
let registry = AgentRegistry::new(repo.heddle_dir());
let status = match args.status {
AgentReleaseStatusArg::Complete => AgentStatus::Complete,
AgentReleaseStatusArg::Abandoned => AgentStatus::Abandoned,
};
let entry = registry
.update_entry(&args.session, |entry| {
entry.status = status.clone();
entry.completed_at = match entry.status {
AgentStatus::Active => None,
AgentStatus::Abandoned | AgentStatus::Complete | AgentStatus::Merged => {
Some(Utc::now())
}
};
})?
.ok_or_else(|| anyhow!("agent session '{}' not found", args.session))?;
println!(
"{}",
serde_json::to_string(&AgentReservationOutput::from(&entry))?
);
Ok(())
}
pub fn cmd_agent_list(cli: &Cli, args: AgentApiListArgs) -> Result<()> {
let repo = Repository::open(cli.repo.as_ref().unwrap_or(&std::env::current_dir()?))?;
let registry = AgentRegistry::new(repo.heddle_dir());
if args.alive_only {
registry.reap_dead()?;
}
let entries: Vec<_> = registry
.list()?
.into_iter()
.filter(|entry| {
args.thread
.as_ref()
.is_none_or(|thread| &entry.thread == thread)
})
.filter(|entry| !args.alive_only || entry.status == AgentStatus::Active)
.map(|entry| AgentReservationOutput::from(&entry))
.collect();
render_agent_list(&entries, should_output_json(cli, Some(repo.config())))
}
fn render_agent_list(entries: &[AgentReservationOutput], json: bool) -> Result<()> {
if json {
println!("{}", serde_json::to_string(entries)?);
return Ok(());
}
if entries.is_empty() {
println!("No agent reservations.");
return Ok(());
}
println!("Agent reservations ({}):", entries.len());
for entry in entries {
println!(
" {} [{}] thread={}",
crate::cli::style::accent(&entry.session_id),
entry.status,
entry.thread,
);
if let Some(task) = &entry.task {
println!(" task: {}", crate::cli::style::dim(task));
}
if let Some(path) = &entry.path
&& !path.is_empty()
{
println!(" path: {}", crate::cli::style::dim(path));
}
}
Ok(())
}
fn validate_active_session(
registry: &AgentRegistry,
session_id: &str,
) -> Result<objects::store::AgentEntry> {
let entry = registry
.update_entry(session_id, |entry| {
entry.heartbeat_at = Some(Utc::now());
entry.last_progress_at = Some(Utc::now());
})?
.ok_or_else(|| anyhow!("agent session '{}' not found", session_id))?;
if entry.status != AgentStatus::Active {
return Err(anyhow!(
"agent session '{}' is no longer active (status: {}). Re-reserve the thread before retrying.",
session_id,
entry.status
));
}
Ok(entry)
}
pub async fn cmd_agent_capture(
cli: &Cli,
args: crate::cli::cli_args::AgentCaptureArgs,
) -> Result<()> {
let repo_path = cli
.repo
.clone()
.unwrap_or(std::env::current_dir().map_err(anyhow::Error::from)?);
let repo = Repository::open(&repo_path)?;
let registry = AgentRegistry::new(repo.heddle_dir());
let entry = validate_active_session(®istry, &args.session)?;
if let Some(current) = repo.current_lane()?
&& current != entry.thread
{
return Err(anyhow!(
"agent session '{}' reserved thread '{}', but the current thread is '{}'. Switch threads before capturing.",
args.session,
entry.thread,
current
));
}
super::snapshot::cmd_snapshot(
cli,
args.message.clone(),
args.confidence,
false,
super::snapshot::SnapshotAgentOverrides {
provider: entry.provider.clone(),
model: entry.model.clone(),
session: Some(args.session.clone()),
segment: None,
policy: None,
no_policy: false,
no_agent: entry.provider.is_none() && entry.model.is_none(),
},
)
.await
}
pub async fn cmd_agent_ready(cli: &Cli, args: crate::cli::cli_args::AgentReadyArgs) -> Result<()> {
let repo_path = cli
.repo
.clone()
.unwrap_or(std::env::current_dir().map_err(anyhow::Error::from)?);
let repo = Repository::open(&repo_path)?;
let registry = AgentRegistry::new(repo.heddle_dir());
let entry = validate_active_session(®istry, &args.session)?;
super::ready_cmd::cmd_ready(
cli,
crate::cli::cli_args::ReadyArgs {
thread: Some(entry.thread.clone()),
message: args.message.clone(),
},
)
.await
}
pub fn agent_api_schema() -> serde_json::Value {
serde_json::json!({
"AgentReservationOutput": schemars::schema_for!(AgentReservationOutput),
"AgentReservationConflict": schemars::schema_for!(AgentReservationConflict),
})
}