use std::{
collections::{BTreeMap, BTreeSet},
fs::{self, OpenOptions},
io::{BufRead, BufReader, BufWriter, Write},
path::{Path, PathBuf},
};
use anyhow::{Result, anyhow};
use base64::Engine as _;
use chrono::Utc;
use objects::{
fs_atomic::write_file_atomic,
object::{DiffKind, Session, Tree},
store::{AgentEntry, AgentRegistry, AgentStatus, AgentUsageSummary},
};
use proto::{
HarnessIdentity, ProgressCheckpoint, SessionDiffSummary, SessionReportEnvelope,
TranscriptAttachmentRef, UsageTotals, WorktreeChangeBaseline,
};
use refs::Head;
use repo::{
Repository, SessionManager, Thread, ThreadFreshness, ThreadIntegrationPolicy, ThreadManager,
ThreadMode, ThreadState,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
mod claude_hook;
mod probe;
use self::probe::{HarnessProbeInput, HarnessProbeResult, probe_harness_actor};
use crate::{
cli::{
Cli,
commands::{
snapshot::{summarize_confidence, summarize_verification},
worktree_cmd::helpers::{prepare_worktree_target, write_isolated_checkout},
},
worktree_status_options,
},
config::{
HarnessMode, HarnessTranscriptMode, HarnessTransport, UserConfig, UserHarnessOverride,
UserHarnessRootThreadPolicy, UserHarnessSubagentThreadPolicy, UserThreadWorkspaceMode,
},
};
/// Runs the harness bridge: a line-delimited JSON request/response loop over stdin/stdout.
///
/// Every non-blank line read from stdin is parsed as a `BridgeRequest` and handed to a
/// `HarnessBridgeRuntime`; one JSON response line is written and flushed per request. A line
/// that cannot be parsed produces an `invalid_request` error response (with no request id)
/// rather than ending the loop. The loop ends when stdin is exhausted.
///
/// The user configuration falls back to its default value when it cannot be loaded.
///
/// # Errors
///
/// Returns an error when the repository cannot be opened, when reading a line from stdin
/// fails, or when a response cannot be serialized, written, or flushed.
pub fn cmd_harness_bridge(cli: &Cli) -> Result<()> {
    // Resolve the working directory lazily: it is only needed when no repository path was
    // given, so an explicit `cli.repo` must not fail just because the cwd is unavailable.
    let repo = match cli.repo.as_ref() {
        Some(path) => Repository::open(path)?,
        None => Repository::open(&std::env::current_dir()?)?,
    };
    let user_config = UserConfig::load_default().unwrap_or_default();
    let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
    let stdin = std::io::stdin();
    let stdout = std::io::stdout();
    let reader = BufReader::new(stdin.lock());
    let mut writer = BufWriter::new(stdout.lock());
    for line in reader.lines() {
        let line = line?;
        if line.trim().is_empty() {
            continue;
        }
        let response = match serde_json::from_str::<BridgeRequest>(&line) {
            Ok(request) => runtime.handle_request(request),
            Err(err) => BridgeResponse::error(
                None,
                "invalid_request",
                format!("failed to parse request: {err}"),
            ),
        };
        serde_json::to_writer(&mut writer, &response)?;
        writer.write_all(b"\n")?;
        // Flush per response instead of waiting for the buffer to fill.
        writer.flush()?;
    }
    Ok(())
}
/// Forwards one harness hook event into a freshly built bridge runtime.
///
/// The repository is re-opened from `repo.root()` and `user_config` is cloned for the
/// runtime. `payload` is treated as JSON text: blank or unparseable text is relayed as JSON
/// `null`. Only the `codex`, `claude-code` and `opencode` harness names are handled.
///
/// # Errors
///
/// Fails when the repository cannot be re-opened, when `harness` is not one of the handled
/// names, or when the harness-specific relay fails.
pub(crate) fn relay_harness_event(
    repo: &Repository,
    user_config: &UserConfig,
    harness: &str,
    event: &str,
    payload: &str,
) -> Result<()> {
    let reopened = Repository::open(repo.root())?;
    let mut runtime = HarnessBridgeRuntime::new(reopened, user_config.clone());

    // Best effort: a payload that is empty or not valid JSON degrades to `null`.
    let json = match payload.trim() {
        "" => Value::Null,
        _ => serde_json::from_str::<Value>(payload).unwrap_or(Value::Null),
    };

    if harness == "codex" {
        relay_codex(&mut runtime, event, &json)
    } else if harness == "claude-code" {
        relay_claude(&mut runtime, event, &json)
    } else if harness == "opencode" {
        relay_opencode(&mut runtime, event, &json)
    } else {
        let other = harness;
        Err(anyhow!("unsupported harness relay: {other}"))
    }
}
/// State shared by the bridge request handlers (`open_session`, `update_progress`, ...).
struct HarnessBridgeRuntime {
/// Repository the handlers operate on.
repo: Repository,
/// User configuration consulted for harness mode, transport, transcript and threading
/// settings.
user_config: UserConfig,
/// Store used to load and save session reports; created from the repository root.
reports: SessionReportStore,
}
/// Borrowed inputs handed to `ensure_registry_entry` while opening a session.
struct RegistryEntryRequest<'a> {
/// Id of the heddle session chosen for this open request.
heddle_session_id: &'a str,
/// Thread name produced by the harness thread binding, if any.
thread_name: Option<&'a str>,
/// Thread id produced by the harness thread binding, if any.
thread_id: Option<&'a str>,
/// Resolved harness identity.
identity: &'a ResolvedIdentity,
/// Result of probing the harness actor.
probe: &'a HarnessProbeResult,
/// Outcome of attachment resolution.
attach: &'a ResolvedAttachment,
/// Client instance id taken from the open request, if supplied.
client_instance_id: Option<&'a str>,
/// Registry entry looked up from the request's agent session / client instance ids, if any.
requested_entry: Option<&'a AgentEntry>,
}
/// Inputs handed to `reuse_canonical_actor_session` after the registry entry is known.
struct CanonicalActorSessionRequest<'a> {
/// Session selected before the registry entry was resolved.
tentative_session: Session,
/// Whether that tentative session was started by this open request.
tentative_owns_session: bool,
/// Registry entry returned by `ensure_registry_entry`.
entry: &'a AgentEntry,
/// Result of probing the harness actor.
probe: &'a HarnessProbeResult,
/// Attachment resolution, passed mutably.
attach: &'a mut ResolvedAttachment,
}
/// Borrowed inputs handed to `resolve_actor_attachment` while opening a session.
struct AttachmentResolutionInput<'a> {
/// Registry entry looked up from the request's agent session / client instance ids, if any.
requested_entry: Option<&'a AgentEntry>,
/// Heddle session id named explicitly in the open request, if any.
explicit_heddle_session_id: Option<&'a str>,
/// Client instance id taken from the open request, if supplied.
client_instance_id: Option<&'a str>,
/// Result of probing the harness actor.
probe: &'a HarnessProbeResult,
/// Token claims derived from the user configuration, if any.
token_claims: Option<&'a TokenClaims>,
}
/// Relays a Codex hook payload.
///
/// Opens a session with harness `codex`, using the payload's `message` as the summary and
/// a few payload fields as probe metadata, then records one progress update carrying the
/// same summary. The event name is not consulted.
fn relay_codex(runtime: &mut HarnessBridgeRuntime, _event: &str, payload: &Value) -> Result<()> {
    let harness = "codex";
    let field = |key: &str| value_string(payload, &[key]);

    // The same summary is sent with both the open and the progress request.
    let summary = field("message");
    let probe_metadata = map_from_pairs([
        (
            "client_name",
            field("client").or_else(|| field("client_name")),
        ),
        ("model", field("model")),
        (
            "model_provider",
            field("model_provider").or_else(|| field("provider")),
        ),
        ("model_reasoning_effort", field("reasoning_effort")),
    ]);

    let opened = runtime.open_session(OpenSessionParams {
        harness: Some(harness.to_string()),
        summary: summary.clone(),
        probe_metadata,
        ..OpenSessionParams::default()
    })?;

    runtime
        .update_progress(UpdateProgressParams {
            heddle_session_id: opened.heddle_session_id,
            summary,
            harness: Some(harness.to_string()),
            ..UpdateProgressParams::default()
        })
        .map(|_| ())
}
fn relay_claude(runtime: &mut HarnessBridgeRuntime, event: &str, payload: &Value) -> Result<()> {
let metadata = map_from_pairs([
("session_id", value_string(payload, &["session_id"])),
("agent_id", value_string(payload, &["agent_id"])),
("session_name", value_string(payload, &["session_name"])),
(
"transcript_path",
value_string(payload, &["transcript_path"]),
),
(
"model",
value_string(payload, &["model", "id"]).or_else(|| value_string(payload, &["model"])),
),
(
"model_display_name",
value_string(payload, &["model", "display_name"]),
),
("effort", value_string(payload, &["effort"])),
("hook_event", Some(event.to_string())),
(
"status_line",
(event == "StatusLine").then(|| "1".to_string()),
),
(
"touched_paths",
value_array_join(payload, &["tool_response", "filePaths"])
.or_else(|| value_string(payload, &["file_path"])),
),
(
"input_tokens",
value_u64_string(payload, &["context_window", "total_input_tokens"]),
),
(
"output_tokens",
value_u64_string(payload, &["context_window", "total_output_tokens"]),
),
(
"cost_micros_usd",
value_cost_micros(payload, &["cost", "total_cost_usd"]),
),
]);
let opened = runtime.open_session(OpenSessionParams {
harness: Some("claude-code".to_string()),
model: value_string(payload, &["model", "display_name"])
.or_else(|| value_string(payload, &["model", "id"]))
.or_else(|| value_string(payload, &["model"])),
summary: value_string(payload, &["message"]).or_else(|| value_string(payload, &["reason"])),
probe_metadata: metadata.clone(),
..OpenSessionParams::default()
})?;
match event {
"SessionEnd" => {
runtime.close_session(CloseSessionParams {
heddle_session_id: opened.heddle_session_id,
summary: value_string(payload, &["reason"])
.or_else(|| value_string(payload, &["stop_hook_active"])),
outcome: Some("completed".to_string()),
..CloseSessionParams::default()
})?;
}
"StatusLine" => {
runtime.update_progress(UpdateProgressParams {
heddle_session_id: opened.heddle_session_id.clone(),
harness: Some("claude-code".to_string()),
status: Some("StatusLine".to_string()),
message: value_string(payload, &["session_name"])
.or_else(|| value_string(payload, &["cwd"]))
.or_else(|| value_string(payload, &["workspace", "current_dir"])),
probe_metadata: metadata.clone(),
..UpdateProgressParams::default()
})?;
runtime.record_usage(RecordUsageParams {
heddle_session_id: opened.heddle_session_id,
input_tokens: value_u64(payload, &["context_window", "total_input_tokens"]),
output_tokens: value_u64(payload, &["context_window", "total_output_tokens"]),
reasoning_tokens: value_u64(payload, &["context_window", "total_reasoning_tokens"]),
cache_creation_tokens: None,
cache_read_tokens: None,
tool_calls: None,
cost_micros_usd: value_cost_micros_u64(payload, &["cost", "total_cost_usd"]),
})?;
}
"Stop" => {
runtime.update_progress(UpdateProgressParams {
heddle_session_id: opened.heddle_session_id,
harness: Some("claude-code".to_string()),
status: Some("Stop".to_string()),
message: value_string(payload, &["message"])
.or_else(|| value_string(payload, &["result"]))
.or_else(|| value_string(payload, &["stop_reason"])),
probe_metadata: metadata,
..UpdateProgressParams::default()
})?;
if let Err(err) = claude_hook::handle_stop_capture(
&runtime.repo,
&runtime.user_config,
payload,
"Claude Code turn",
) {
tracing::warn!(?err, "heddle Stop hook capture failed");
}
}
"SubagentStop" => {
runtime.update_progress(UpdateProgressParams {
heddle_session_id: opened.heddle_session_id,
harness: Some("claude-code".to_string()),
status: Some("SubagentStop".to_string()),
touched_paths: csv_from_value(metadata.get("touched_paths")),
probe_metadata: metadata,
..UpdateProgressParams::default()
})?;
if let Err(err) = claude_hook::handle_stop_capture(
&runtime.repo,
&runtime.user_config,
payload,
"Claude Code subagent turn",
) {
tracing::warn!(?err, "heddle SubagentStop hook capture failed");
}
if let Err(err) = claude_hook::mark_subagent_complete(&runtime.repo, payload) {
tracing::debug!(?err, "heddle SubagentStop mark-complete failed");
}
}
"SubagentStart" => {
runtime.update_progress(UpdateProgressParams {
heddle_session_id: opened.heddle_session_id,
harness: Some("claude-code".to_string()),
status: Some("SubagentStart".to_string()),
touched_paths: csv_from_value(metadata.get("touched_paths")),
probe_metadata: metadata,
..UpdateProgressParams::default()
})?;
}
"UserPromptSubmit" => {
runtime.update_progress(UpdateProgressParams {
heddle_session_id: opened.heddle_session_id.clone(),
harness: Some("claude-code".to_string()),
status: Some("UserPromptSubmit".to_string()),
touched_paths: csv_from_value(metadata.get("touched_paths")),
probe_metadata: metadata,
..UpdateProgressParams::default()
})?;
if let Err(err) = claude_hook::handle_user_prompt_segment_rotate(
&runtime.repo,
&opened.heddle_session_id,
payload,
) {
tracing::debug!(?err, "heddle UserPromptSubmit segment rotation failed");
}
}
"PreToolUse" => {
runtime.update_progress(UpdateProgressParams {
heddle_session_id: opened.heddle_session_id,
harness: Some("claude-code".to_string()),
status: Some("PreToolUse".to_string()),
touched_paths: csv_from_value(metadata.get("touched_paths")),
probe_metadata: metadata,
..UpdateProgressParams::default()
})?;
if let Err(err) = claude_hook::handle_pre_tool_use(&runtime.repo, payload) {
tracing::debug!(?err, "heddle PreToolUse context inject skipped");
}
}
_ => {
runtime.update_progress(UpdateProgressParams {
heddle_session_id: opened.heddle_session_id,
harness: Some("claude-code".to_string()),
status: Some(event.to_string()),
touched_paths: csv_from_value(metadata.get("touched_paths")),
probe_metadata: metadata,
..UpdateProgressParams::default()
})?;
}
}
Ok(())
}
/// Relays an OpenCode hook payload.
///
/// Opens a session with harness `opencode` using the payload's `model` and `provider`,
/// then records one progress update whose status is the event name and whose touched paths
/// come from the payload's `file.path` (or `path`) field.
///
/// The client name falls back to the `OPENCODE_CLIENT` environment variable when the
/// payload has no `client` field.
fn relay_opencode(runtime: &mut HarnessBridgeRuntime, event: &str, payload: &Value) -> Result<()> {
    const HARNESS: &str = "opencode";
    let text = |path: &[&str]| value_string(payload, path);

    let model = text(&["model"]);
    let provider = text(&["provider"]);
    let metadata = map_from_pairs([
        (
            "session_id",
            text(&["sessionID"]).or_else(|| text(&["session_id"])),
        ),
        (
            "parent_id",
            text(&["parentID"]).or_else(|| text(&["parent_id"])),
        ),
        (
            "client_name",
            text(&["client"]).or_else(|| std::env::var("OPENCODE_CLIENT").ok()),
        ),
        ("model", model.clone()),
        ("provider", provider.clone()),
        ("hook_event", Some(event.to_string())),
        (
            "touched_paths",
            text(&["file", "path"]).or_else(|| text(&["path"])),
        ),
    ]);
    let touched_paths = csv_from_value(metadata.get("touched_paths"));

    let opened = runtime.open_session(OpenSessionParams {
        harness: Some(HARNESS.to_string()),
        model,
        provider,
        probe_metadata: metadata.clone(),
        ..OpenSessionParams::default()
    })?;

    runtime
        .update_progress(UpdateProgressParams {
            heddle_session_id: opened.heddle_session_id,
            harness: Some(HARNESS.to_string()),
            status: Some(event.to_string()),
            touched_paths,
            probe_metadata: metadata,
            ..UpdateProgressParams::default()
        })
        .map(|_| ())
}
/// Looks up the value at `path` inside `value` and renders it as a string.
///
/// Strings are returned as-is; booleans and numbers are rendered with `to_string`.
/// Returns `None` when any path segment is missing or the value found is `null`, an array,
/// or an object.
fn value_string(value: &Value, path: &[&str]) -> Option<String> {
    let node = path.iter().try_fold(value, |node, key| node.get(*key))?;
    match node {
        Value::String(text) => Some(text.to_owned()),
        Value::Number(number) => Some(number.to_string()),
        Value::Bool(flag) => Some(flag.to_string()),
        Value::Null | Value::Array(_) | Value::Object(_) => None,
    }
}
fn value_array_join(value: &Value, path: &[&str]) -> Option<String> {
let mut current = value;
for segment in path {
current = current.get(*segment)?;
}
current.as_array().map(|items| {
items
.iter()
.filter_map(|item| item.as_str().map(ToString::to_string))
.collect::<Vec<_>>()
.join(",")
})
}
fn value_u64_string(value: &Value, path: &[&str]) -> Option<String> {
let mut current = value;
for segment in path {
current = current.get(*segment)?;
}
current.as_u64().map(|v| v.to_string())
}
fn value_u64(value: &Value, path: &[&str]) -> Option<u64> {
let mut current = value;
for segment in path {
current = current.get(*segment)?;
}
current.as_u64()
}
fn value_cost_micros(value: &Value, path: &[&str]) -> Option<String> {
let mut current = value;
for segment in path {
current = current.get(*segment)?;
}
current
.as_f64()
.map(|v| ((v * 1_000_000.0).round() as u64).to_string())
}
/// Looks up the number at `path` inside `value`, multiplies it by one million, rounds it,
/// and converts it to `u64`.
///
/// Returns `None` when any path segment is missing or `as_f64` rejects the value found.
fn value_cost_micros_u64(value: &Value, path: &[&str]) -> Option<u64> {
    let amount = path
        .iter()
        .try_fold(value, |node, key| node.get(*key))?
        .as_f64()?;
    // Float-to-integer `as` casts saturate, so a negative amount becomes 0.
    Some((amount * 1_000_000.0).round() as u64)
}
/// Builds a map from `(key, optional value)` pairs, keeping only the pairs that carry a
/// value.
fn map_from_pairs<const N: usize>(pairs: [(&str, Option<String>); N]) -> BTreeMap<String, String> {
    let mut map = BTreeMap::new();
    for (key, value) in pairs {
        if let Some(present) = value {
            map.insert(key.to_string(), present);
        }
    }
    map
}
/// Splits an optional comma-separated string into its trimmed, non-empty items.
///
/// `None` yields an empty vector.
fn csv_from_value(value: Option<&String>) -> Vec<String> {
    match value {
        None => Vec::new(),
        Some(raw) => raw
            .split(',')
            .map(str::trim)
            .filter(|item| !item.is_empty())
            .map(str::to_owned)
            .collect(),
    }
}
impl HarnessBridgeRuntime {
/// Builds a runtime for `repo`, creating the session report store from the repository
/// root.
fn new(repo: Repository, user_config: UserConfig) -> Self {
    Self {
        // Initialised first: it borrows `repo`, which is moved by the next field.
        reports: SessionReportStore::new(repo.root()),
        repo,
        user_config,
    }
}
fn handle_request(&mut self, request: BridgeRequest) -> BridgeResponse {
let response = match request.method.as_str() {
"open_session" => self
.decode_params::<OpenSessionParams>(request.params)
.and_then(|params| self.open_session(params))
.and_then(to_json_value),
"update_progress" => self
.decode_params::<UpdateProgressParams>(request.params)
.and_then(|params| self.update_progress(params))
.and_then(to_json_value),
"record_usage" => self
.decode_params::<RecordUsageParams>(request.params)
.and_then(|params| self.record_usage(params))
.and_then(to_json_value),
"record_touched_paths" => self
.decode_params::<RecordTouchedPathsParams>(request.params)
.and_then(|params| self.record_touched_paths(params))
.and_then(to_json_value),
"close_session" => self
.decode_params::<CloseSessionParams>(request.params)
.and_then(|params| self.close_session(params))
.and_then(to_json_value),
"flush_reports" => self
.decode_params::<FlushReportsParams>(request.params)
.and_then(|params| self.flush_reports(params))
.and_then(to_json_value),
other => Err(anyhow!("unknown method '{other}'")),
};
match response {
Ok(result) => BridgeResponse::ok(request.id, result),
Err(err) => BridgeResponse::error(request.id, "bridge_error", err.to_string()),
}
}
/// Deserializes a JSON value into the parameter type `T`, converting the `serde_json`
/// error into an `anyhow` error on failure.
fn decode_params<T: for<'de> Deserialize<'de>>(&self, value: Value) -> Result<T> {
    match serde_json::from_value(value) {
        Ok(decoded) => Ok(decoded),
        Err(err) => Err(anyhow!(err)),
    }
}
fn open_session(&mut self, params: OpenSessionParams) -> Result<OpenSessionResult> {
if self.user_config.harness.mode == HarnessMode::Off {
return Err(anyhow!("harness integration is disabled in user config"));
}
let requested_transport = params
.transport
.unwrap_or(self.user_config.harness.transport);
let transcript_mode = params
.transcript_mode
.unwrap_or(self.user_config.harness.transcript);
let env_hints = merged_env_hints(¶ms.env_hints);
let token_claims = user_config_token_claims(&self.user_config);
let current_session = SessionManager::new(self.repo.root()).get_current_session()?;
let current_segment = current_session
.as_ref()
.and_then(|session| session.current_segment());
let probe = probe_harness_actor(&HarnessProbeInput {
argv: params.argv.clone(),
env_hints: env_hints.clone(),
explicit_harness: params.harness.clone(),
explicit_provider: params.provider.clone(),
explicit_model: params.model.clone(),
explicit_thinking_level: params.thinking_level.clone(),
explicit_policy: params.policy.clone(),
probe_metadata: params.probe_metadata.clone(),
current_provider: current_segment.map(|segment| segment.provider.clone()),
current_model: current_segment.map(|segment| segment.model.clone()),
current_policy: current_segment.and_then(|segment| segment.policy_id.clone()),
repo_root: self.repo.root().display().to_string(),
})?;
let identity = resolve_identity(
&self.repo,
&self.user_config,
IdentityHints {
harness: params.harness.clone(),
provider: params.provider.clone(),
model: params.model.clone(),
thinking_level: params.thinking_level.clone(),
policy: params.policy.clone(),
probe: probe.clone(),
},
)?;
let registry = AgentRegistry::new(self.repo.heddle_dir());
let requested_entry = resolve_requested_registry_entry(
®istry,
params.agent_session_id.as_deref(),
params.client_instance_id.as_deref(),
)?;
if self.user_config.harness.mode == HarnessMode::Required
&& (identity.harness.is_none()
|| identity.provider.is_none()
|| identity.model.is_none())
{
return Err(anyhow!(
"harness mode is 'required' but harness/provider/model could not be resolved"
));
}
let mut sessions = SessionManager::new(self.repo.root());
let principal = self.repo.get_principal()?;
let mut attach = resolve_actor_attachment(
®istry,
&self.repo,
&mut sessions,
AttachmentResolutionInput {
requested_entry: requested_entry.as_ref(),
explicit_heddle_session_id: params.heddle_session_id.as_deref(),
client_instance_id: params.client_instance_id.as_deref(),
probe: &probe,
token_claims: token_claims.as_ref(),
},
)?;
let (session, owns_session) = match &attach.target {
AttachTarget::ExistingSession(session) => {
let segment_id = session.current_segment_id.clone().unwrap_or_default();
sessions.set_current_session(&session.id, &segment_id)?;
(session.clone(), false)
}
AttachTarget::CreateNew {
_because_claimed: _,
} => {
let session = sessions.start_session(
principal,
identity
.provider
.clone()
.unwrap_or_else(|| "unknown".to_string()),
identity
.model
.clone()
.unwrap_or_else(|| "unknown".to_string()),
identity.policy.clone(),
)?;
(session, true)
}
};
let (thread_name, thread_id) =
self.resolve_harness_thread_binding(¶ms, &probe, &identity)?;
let entry = self.ensure_registry_entry(RegistryEntryRequest {
heddle_session_id: &session.id,
thread_name: thread_name.as_deref(),
thread_id: thread_id.as_deref(),
identity: &identity,
probe: &probe,
attach: &attach,
client_instance_id: params.client_instance_id.as_deref(),
requested_entry: requested_entry.as_ref(),
})?;
let (session, owns_session) = self.reuse_canonical_actor_session(
&mut sessions,
CanonicalActorSessionRequest {
tentative_session: session,
tentative_owns_session: owns_session,
entry: &entry,
probe: &probe,
attach: &mut attach,
},
)?;
let mut segment_id = session.current_segment_id.clone().unwrap_or_default();
if should_rotate_segment(&session, &identity) {
let segment = sessions.add_segment(
&session.id,
identity
.provider
.clone()
.unwrap_or_else(|| "unknown".to_string()),
identity
.model
.clone()
.unwrap_or_else(|| "unknown".to_string()),
identity.policy.clone(),
)?;
segment_id = segment.id;
}
let base_state = self
.repo
.current_state()?
.map(|state| state.change_id.to_string_full())
.or_else(|| {
self.repo
.head()
.ok()
.flatten()
.map(|id| id.to_string_full())
});
let worktree_changes_at_open = capture_worktree_change_snapshot(&self.repo)?;
let opened_at = Utc::now().to_rfc3339();
let mut report = SessionReportEnvelope {
version: 1,
heddle_session_id: session.id.clone(),
heddle_segment_id: (!segment_id.is_empty()).then_some(segment_id.clone()),
agent_session_id: Some(entry.session_id.clone()),
client_instance_id: entry.client_instance_id.clone(),
native_actor_key: entry.native_actor_key.clone(),
native_parent_actor_key: entry.native_parent_actor_key.clone(),
native_instance_key: entry.native_instance_key.clone(),
repo_root: self.repo.root().display().to_string(),
thread: thread_name.clone(),
thread_id,
task: params.task.clone(),
summary: params.summary.clone(),
opened_at,
closed_at: None,
base_state_at_open: base_state.clone(),
worktree_changes_at_open,
head_state_at_close: None,
transport_mode: transport_mode_name(requested_transport).to_string(),
transcript_mode: transcript_mode_name(transcript_mode).to_string(),
outcome: None,
harness: identity.to_transport_identity(),
progress: Vec::new(),
usage: UsageTotals::default(),
touched_paths: Vec::new(),
changed_paths: Vec::new(),
diff_summary: None,
transcript_refs: Vec::new(),
last_progress_at: None,
report_flush_state: Some("pending-local".to_string()),
attach_reason: Some(attach.attach_reason.clone()),
attach_precedence: attach.precedence.clone(),
winning_attach_rule: Some(attach.winning_rule.clone()),
probe_source: probe.probe_source.clone(),
probe_confidence: probe.confidence,
pending_flush: true,
last_flushed_at: None,
owns_session,
};
merge_unique_paths(&mut report.touched_paths, probe.touched_paths.clone());
merge_usage(&mut report.usage, &probe.usage_totals);
if transcript_mode != HarnessTranscriptMode::Off {
report.transcript_refs = probe.transcript_refs.clone();
}
self.reports.save(&report)?;
self.sync_registry_from_report(&report, AgentStatus::Active)?;
if matches!(requested_transport, HarnessTransport::Direct) {
enqueue_report(&self.reports, &mut report)?;
self.sync_registry_from_report(&report, AgentStatus::Active)?;
}
Ok(OpenSessionResult {
heddle_session_id: report.heddle_session_id.clone(),
heddle_segment_id: report.heddle_segment_id.clone(),
agent_session_id: report.agent_session_id.clone(),
created_session: owns_session,
harness: report.harness.harness.clone(),
provider: report.harness.provider.clone(),
model: report.harness.model.clone(),
thinking_level: report.harness.thinking_level.clone(),
report_flush_state: report.report_flush_state.clone(),
attach_reason: report.attach_reason.clone(),
})
}
/// Appends a progress checkpoint to an existing session report and persists it.
///
/// The harness actor is probed again and its identity resolved. The result is used to make
/// sure the report has a segment (`ensure_segment_for_report`) and to fill in harness and
/// probe fields that are still unset on the report; fields already set are left alone.
/// Touched paths from the request and the probe are merged into the report, probe usage is
/// merged, and transcript refs are taken from the probe only when the report's transcript
/// mode is not `"off"` and it has none yet. A summary in the request replaces the report's.
///
/// # Errors
///
/// Fails when no report exists for `params.heddle_session_id`, or when probing, identity
/// resolution, segment handling, or persisting the report fails.
fn update_progress(&mut self, params: UpdateProgressParams) -> Result<SessionMutationResult> {
    let mut report = self
        .reports
        .load(&params.heddle_session_id)?
        .ok_or_else(|| anyhow!("session report not found for {}", params.heddle_session_id))?;
    let current_session = SessionManager::new(self.repo.root()).get_current_session()?;
    let current_segment = current_session
        .as_ref()
        .and_then(|session| session.current_segment());
    let probe = probe_harness_actor(&HarnessProbeInput {
        argv: params.argv.clone(),
        env_hints: merged_env_hints(&params.env_hints),
        explicit_harness: params.harness.clone(),
        explicit_provider: params.provider.clone(),
        explicit_model: params.model.clone(),
        explicit_thinking_level: params.thinking_level.clone(),
        explicit_policy: params.policy.clone(),
        probe_metadata: params.probe_metadata.clone(),
        current_provider: current_segment.map(|segment| segment.provider.clone()),
        current_model: current_segment.map(|segment| segment.model.clone()),
        current_policy: current_segment.and_then(|segment| segment.policy_id.clone()),
        repo_root: self.repo.root().display().to_string(),
    })?;
    let identity = resolve_identity(
        &self.repo,
        &self.user_config,
        IdentityHints {
            harness: params.harness.clone(),
            provider: params.provider.clone(),
            model: params.model.clone(),
            thinking_level: params.thinking_level.clone(),
            policy: params.policy.clone(),
            probe: probe.clone(),
        },
    )?;
    self.ensure_segment_for_report(&mut report, &identity)?;
    // Backfill only: values recorded when the session was opened win over later probes.
    if report.harness.harness.is_none() {
        report.harness.harness = identity.harness.clone();
    }
    if report.harness.provider.is_none() {
        report.harness.provider = identity.provider.clone();
    }
    if report.harness.model.is_none() {
        report.harness.model = identity.model.clone();
    }
    if report.harness.thinking_level.is_none() {
        report.harness.thinking_level = identity.thinking_level.clone();
    }
    if report.harness.policy.is_none() {
        report.harness.policy = identity.policy.clone();
    }
    if report.native_actor_key.is_none() {
        report.native_actor_key = probe.native_actor_key.clone();
    }
    if report.native_parent_actor_key.is_none() {
        report.native_parent_actor_key = probe.native_parent_actor_key.clone();
    }
    if report.native_instance_key.is_none() {
        report.native_instance_key = probe.native_instance_key.clone();
    }
    if report.probe_source.is_none() {
        report.probe_source = probe.probe_source.clone();
    }
    if report.probe_confidence.is_none() {
        report.probe_confidence = probe.confidence;
    }
    let recorded_at = Utc::now().to_rfc3339();
    let checkpoint = ProgressCheckpoint {
        status: params.status.clone(),
        message: params.message.clone(),
        completed_steps: params.completed_steps,
        total_steps: params.total_steps,
        // Paths named in the request come first, followed by paths found by the probe.
        touched_paths: normalize_paths(
            params
                .touched_paths
                .into_iter()
                .chain(probe.touched_paths)
                .collect::<Vec<_>>(),
        ),
        recorded_at: recorded_at.clone(),
    };
    merge_unique_paths(
        &mut report.touched_paths,
        checkpoint.touched_paths.iter().cloned(),
    );
    merge_usage(&mut report.usage, &probe.usage_totals);
    if report.transcript_mode != "off" && report.transcript_refs.is_empty() {
        report.transcript_refs = probe.transcript_refs;
    }
    report.progress.push(checkpoint);
    if let Some(summary) = params.summary {
        report.summary = Some(summary);
    }
    report.last_progress_at = Some(recorded_at);
    mark_pending_flush(&mut report);
    self.persist_report(report)
}
/// Decides which thread a newly opened harness session is bound to.
///
/// Returns `(thread name, thread id)`; both may be `None`. The first rule that applies wins:
///
/// 1. A thread named explicitly in the request is used as-is.
/// 2. For a non-root actor, when the subagent policy is `CreateChild`, a parent thread can
///    be resolved, and `can_create_harness_thread` allows it: a new thread named
///    `<parent>/<slug>` is allocated and created with the parent as both target and parent.
/// 3. For a root actor, when the root policy is `CreateNew`, HEAD is attached to a thread,
///    and `can_create_harness_thread` allows it: a new thread named `<current>/<slug>` is
///    allocated and created with the current thread as target and no parent.
/// 4. Otherwise the thread HEAD is attached to (if any) is used.
///
/// # Errors
///
/// Fails when reading HEAD, resolving thread ids, allocating a name, or creating the
/// thread fails.
fn resolve_harness_thread_binding(
    &self,
    params: &OpenSessionParams,
    probe: &HarnessProbeResult,
    identity: &ResolvedIdentity,
) -> Result<(Option<String>, Option<String>)> {
    if let Some(thread) = params.thread.clone() {
        let thread_id = thread_id_for_name(&self.repo, Some(&thread))?;
        return Ok((Some(thread), thread_id));
    }
    let current_attached = match self.repo.head_ref()? {
        Head::Attached { thread } => Some(thread),
        Head::Detached { .. } => None,
    };
    if !probe.attach_hints.root_actor
        && self.user_config.harness.threading.subagent
            == UserHarnessSubagentThreadPolicy::CreateChild
        && let Some(parent_thread) =
            resolve_parent_thread_for_subagent(&self.repo, probe, current_attached.as_deref())?
        && can_create_harness_thread(&self.repo, Some(&parent_thread), Some(&parent_thread))?
    {
        let name = allocate_thread_name(
            &self.repo,
            &format!(
                "{}/{}",
                parent_thread,
                sanitize_name(&preferred_thread_slug(params, probe, identity))
            ),
        )?;
        self.ensure_harness_thread(
            &name,
            Some(&parent_thread),
            Some(&parent_thread),
            params.task.clone(),
        )?;
        let thread_id = thread_id_for_name(&self.repo, Some(&name))?;
        return Ok((Some(name), thread_id));
    }
    if probe.attach_hints.root_actor
        && self.user_config.harness.threading.root_actor
            == UserHarnessRootThreadPolicy::CreateNew
        && let Some(current) = current_attached.clone()
        && can_create_harness_thread(&self.repo, Some(&current), None)?
    {
        let name = allocate_thread_name(
            &self.repo,
            &format!(
                "{}/{}",
                current,
                sanitize_name(&preferred_thread_slug(params, probe, identity))
            ),
        )?;
        self.ensure_harness_thread(&name, Some(&current), None, params.task.clone())?;
        let thread_id = thread_id_for_name(&self.repo, Some(&name))?;
        return Ok((Some(name), thread_id));
    }
    // No new thread was created: stay on whatever thread HEAD is attached to.
    let thread_id = thread_id_for_name(&self.repo, current_attached.as_deref())?;
    Ok((current_attached, thread_id))
}
/// Creates the thread `name` for a harness session unless a thread record already exists.
///
/// When no record exists for `name`:
///
/// * the base state is resolved from `target_thread` / `parent_thread`;
/// * the thread ref is created (compare-and-set expecting it to be missing) and the
///   creation is recorded in the oplog, unless the ref already exists;
/// * an isolated checkout of the base state is written to the thread's private path;
/// * a `Thread` record marked `auto` is saved through the thread manager.
///
/// # Errors
///
/// Fails when no base state can be resolved, when the base state object is missing from
/// the store, or when any ref, oplog, checkout, or save operation fails.
fn ensure_harness_thread(
    &self,
    name: &str,
    target_thread: Option<&str>,
    parent_thread: Option<&str>,
    task: Option<String>,
) -> Result<()> {
    let manager = ThreadManager::new(self.repo.heddle_dir());
    if manager.load(name)?.is_some() {
        return Ok(());
    }
    let base_state = self
        .resolve_harness_thread_base_state(target_thread, parent_thread)?
        .ok_or_else(|| anyhow!("No current state to start a thread from"))?;
    if self.repo.refs().get_thread(name)?.is_none() {
        self.repo
            .refs()
            .set_thread_cas(name, refs::RefExpectation::Missing, &base_state)?;
        self.repo.oplog().record_thread_create(
            name,
            &base_state,
            Some(&self.repo.op_scope()),
        )?;
    }
    let workspace_mode = self
        .user_config
        .harness
        .threading
        .workspace_default
        .unwrap_or(UserThreadWorkspaceMode::Heavy);
    // NOTE(review): `Heavy` and `Auto` both map to `Lightweight`, so `Materialized` is
    // never selected here; confirm this is intended.
    let thread_mode = match workspace_mode {
        UserThreadWorkspaceMode::Heavy | UserThreadWorkspaceMode::Auto => {
            ThreadMode::Lightweight
        }
        UserThreadWorkspaceMode::Light => ThreadMode::Virtualized,
    };
    // Every thread mode uses the same private path, so no per-mode branching is needed.
    let path = default_private_thread_path(&self.repo, name);
    let abs_path = prepare_worktree_target(&self.repo, &path)?;
    write_isolated_checkout(&self.repo, &abs_path, &base_state, Some(name))?;
    let base_state_obj = self
        .repo
        .store()
        .get_state(&base_state)?
        .ok_or_else(|| anyhow!("Base state '{}' not found", base_state.short()))?;
    // One clock read, so a freshly created thread has identical created/updated times.
    let now = Utc::now();
    let thread = Thread {
        id: name.to_string(),
        thread: name.to_string(),
        target_thread: target_thread.map(ToString::to_string),
        parent_thread: parent_thread.map(ToString::to_string),
        mode: thread_mode.clone(),
        state: ThreadState::Active,
        base_state: base_state.short(),
        base_root: base_state_obj.tree.short(),
        current_state: Some(base_state.short()),
        merged_state: None,
        task,
        execution_path: abs_path.clone(),
        materialized_path: match thread_mode {
            ThreadMode::Materialized => Some(abs_path),
            ThreadMode::Lightweight | ThreadMode::Virtualized => None,
        },
        changed_paths: vec![],
        impact_categories: vec![],
        heavy_impact_paths: vec![],
        promotion_suggested: false,
        freshness: if target_thread.is_some() {
            ThreadFreshness::Current
        } else {
            ThreadFreshness::Unknown
        },
        verification_summary: summarize_verification(base_state_obj.verification.as_ref()),
        confidence_summary: summarize_confidence(base_state_obj.confidence),
        integration_policy_result: ThreadIntegrationPolicy::default(),
        created_at: now,
        updated_at: now,
        ephemeral: None,
        auto: true,
        shared_target_dir: None,
    };
    manager.save(&thread)?;
    Ok(())
}
/// Method form of the free function with the same name: forwards both thread arguments
/// together with this runtime's repository.
fn resolve_harness_thread_base_state(
    &self,
    target_thread: Option<&str>,
    parent_thread: Option<&str>,
) -> Result<Option<objects::object::ChangeId>> {
    let repo = &self.repo;
    resolve_harness_thread_base_state(repo, target_thread, parent_thread)
}
/// Folds usage counters from the request into the session report and persists it.
///
/// Only counters present in the request are touched; each one is combined with the value
/// already stored through `max_u64` / `max_u32`.
///
/// # Errors
///
/// Fails when no report exists for `params.heddle_session_id` or when persisting fails.
fn record_usage(&mut self, params: RecordUsageParams) -> Result<SessionMutationResult> {
    let mut report = self
        .reports
        .load(&params.heddle_session_id)?
        .ok_or_else(|| anyhow!("session report not found for {}", params.heddle_session_id))?;
    if let Some(input) = params.input_tokens {
        report.usage.input_tokens = Some(max_u64(report.usage.input_tokens, input));
    }
    if let Some(output) = params.output_tokens {
        report.usage.output_tokens = Some(max_u64(report.usage.output_tokens, output));
    }
    if let Some(reasoning) = params.reasoning_tokens {
        report.usage.reasoning_tokens = Some(max_u64(report.usage.reasoning_tokens, reasoning));
    }
    if let Some(cache_creation) = params.cache_creation_tokens {
        report.usage.cache_creation_tokens =
            Some(max_u64(report.usage.cache_creation_tokens, cache_creation));
    }
    if let Some(cache_read) = params.cache_read_tokens {
        report.usage.cache_read_tokens =
            Some(max_u64(report.usage.cache_read_tokens, cache_read));
    }
    if let Some(tool_calls) = params.tool_calls {
        report.usage.tool_calls = Some(max_u32(report.usage.tool_calls, tool_calls));
    }
    if let Some(cost) = params.cost_micros_usd {
        report.usage.cost_micros_usd = Some(max_u64(report.usage.cost_micros_usd, cost));
    }
    mark_pending_flush(&mut report);
    self.persist_report(report)
}
/// Merges the request's paths (after normalization) into the session report's touched
/// paths and persists the report.
///
/// # Errors
///
/// Fails when no report exists for `params.heddle_session_id` or when persisting fails.
fn record_touched_paths(
    &mut self,
    params: RecordTouchedPathsParams,
) -> Result<SessionMutationResult> {
    let mut report = self
        .reports
        .load(&params.heddle_session_id)?
        .ok_or_else(|| anyhow!("session report not found for {}", params.heddle_session_id))?;
    merge_unique_paths(&mut report.touched_paths, normalize_paths(params.paths));
    mark_pending_flush(&mut report);
    self.persist_report(report)
}
/// Closes a session report: stamps the close time and outcome, computes the final diff,
/// and stores or enqueues the report.
///
/// A summary or transcript refs supplied in the request replace the report's. When the
/// report owns its heddle session and that session is still active, the session is ended;
/// this step is best effort and its failures are ignored. The report is enqueued for the
/// `Direct` and `End` transports and only saved otherwise. The registry entry is then
/// synced with status `Complete`.
///
/// # Errors
///
/// Fails when no report exists for `params.heddle_session_id`, or when computing the final
/// diff, saving/enqueueing the report, or syncing the registry fails.
fn close_session(&mut self, params: CloseSessionParams) -> Result<CloseSessionResult> {
    let mut report = self
        .reports
        .load(&params.heddle_session_id)?
        .ok_or_else(|| anyhow!("session report not found for {}", params.heddle_session_id))?;
    report.closed_at = Some(Utc::now().to_rfc3339());
    report.outcome = params.outcome.clone();
    if let Some(summary) = params.summary {
        report.summary = Some(summary);
    }
    if let Some(transcript_refs) = params.transcript_refs {
        report.transcript_refs = transcript_refs;
    }
    let final_diff = compute_final_diff(
        &self.repo,
        report.base_state_at_open.as_deref(),
        &report.worktree_changes_at_open,
    )?;
    report.head_state_at_close = final_diff.head_state;
    report.changed_paths = final_diff.changed_paths;
    report.diff_summary = Some(final_diff.diff_summary);
    mark_pending_flush(&mut report);
    if report.owns_session {
        let mut sessions = SessionManager::new(self.repo.root());
        // Best effort: lookup and end-session failures must not block closing the report.
        if let Ok(Some(session)) = sessions.get_session(&report.heddle_session_id)
            && session.is_active()
        {
            let _ = sessions.end_session(Some(&report.heddle_session_id));
        }
    }
    let transport = params
        .transport
        .unwrap_or(self.user_config.harness.transport);
    if matches!(transport, HarnessTransport::Direct | HarnessTransport::End) {
        enqueue_report(&self.reports, &mut report)?;
    } else {
        self.reports.save(&report)?;
    }
    self.sync_registry_from_report(&report, AgentStatus::Complete)?;
    Ok(CloseSessionResult {
        heddle_session_id: report.heddle_session_id,
        changed_paths: report.changed_paths,
        diff_summary: report.diff_summary.unwrap_or_default(),
        report_flush_state: report.report_flush_state,
    })
}
/// Enqueues every report that is still pending flush and returns how many
/// were flushed.
///
/// When `params.heddle_session_id` is set only that session is considered;
/// otherwise all sessions returned by `list_pending` are. Missing reports and
/// reports without `pending_flush` are skipped silently.
fn flush_reports(&mut self, params: FlushReportsParams) -> Result<FlushReportsResult> {
    let targets = if let Some(only) = params.heddle_session_id {
        vec![only]
    } else {
        self.reports.list_pending()?
    };
    let mut flushed = 0usize;
    for target in &targets {
        let mut report = match self.reports.load(target)? {
            Some(report) if report.pending_flush => report,
            _ => continue,
        };
        enqueue_report(&self.reports, &mut report)?;
        // A report that has a close timestamp belongs to a finished actor.
        let status = if report.closed_at.is_none() {
            AgentStatus::Active
        } else {
            AgentStatus::Complete
        };
        self.sync_registry_from_report(&report, status)?;
        flushed += 1;
    }
    Ok(FlushReportsResult { flushed })
}
fn persist_report(
&mut self,
mut report: SessionReportEnvelope,
) -> Result<SessionMutationResult> {
let transport = transport_from_report(&report, self.user_config.harness.transport);
match transport {
HarnessTransport::Direct => {
enqueue_report(&self.reports, &mut report)?;
}
HarnessTransport::Spool | HarnessTransport::End => {
self.reports.save(&report)?;
}
}
self.sync_registry_from_report(&report, AgentStatus::Active)?;
Ok(SessionMutationResult {
heddle_session_id: report.heddle_session_id,
heddle_segment_id: report.heddle_segment_id,
report_flush_state: report.report_flush_state,
})
}
/// Starts a new session segment when the resolved identity's provider or
/// model differs from the session's current segment, and copies the identity
/// onto the report's harness block.
///
/// Does nothing when the session is unknown, inactive, or does not need a
/// new segment (see `should_rotate_segment`).
fn ensure_segment_for_report(
    &self,
    report: &mut SessionReportEnvelope,
    identity: &ResolvedIdentity,
) -> Result<()> {
    let mut sessions = SessionManager::new(self.repo.root());
    let session = match sessions.get_session(&report.heddle_session_id)? {
        Some(session) => session,
        None => return Ok(()),
    };
    let needs_rotation = session.is_active() && should_rotate_segment(&session, identity);
    if !needs_rotation {
        return Ok(());
    }
    // Segments require a provider and model; use a placeholder when unresolved.
    let or_unknown = |value: Option<&str>| value.unwrap_or("unknown").to_string();
    let segment = sessions.add_segment(
        &report.heddle_session_id,
        or_unknown(identity.provider.as_deref()),
        or_unknown(identity.model.as_deref()),
        identity.policy.clone(),
    )?;
    report.heddle_segment_id = Some(segment.id);
    // Only overwrite harness fields the identity actually resolved.
    if let Some(provider) = &identity.provider {
        report.harness.provider = Some(provider.clone());
    }
    if let Some(model) = &identity.model {
        report.harness.model = Some(model.clone());
    }
    if let Some(policy) = &identity.policy {
        report.harness.policy = Some(policy.clone());
    }
    if let Some(thinking_level) = &identity.thinking_level {
        report.harness.thinking_level = Some(thinking_level.clone());
    }
    Ok(())
}
/// Resolves the agent-registry entry for an actor, updating an existing entry
/// or creating a new one.
///
/// Resolution order:
/// 1. An existing entry — the explicitly requested one, else the entry matched
///    during attachment, else a fallback lookup by session/path/thread — is
///    updated in place and returned. The fallback lookup is skipped when the
///    caller supplied a client instance id or the probe carries a native actor
///    or instance key.
/// 2. Otherwise, when the probe has a native actor key, an active entry with
///    the same key is reused (and refreshed) or a new one is created through
///    `find_or_create_active_entry`.
/// 3. Otherwise a fresh generated entry is created.
///
/// Every path leaves the returned entry with `AgentStatus::Active`.
///
/// # Errors
/// Propagates registry and repository errors, and fails when the entry being
/// updated disappears during the update.
fn ensure_registry_entry(&self, request: RegistryEntryRequest<'_>) -> Result<AgentEntry> {
    let RegistryEntryRequest {
        heddle_session_id,
        thread_name,
        thread_id,
        identity,
        probe,
        attach,
        client_instance_id,
        requested_entry,
    } = request;
    let registry = AgentRegistry::new(self.repo.heddle_dir());
    // Only fall back to the loose session/path/thread lookup when there is no
    // stronger identifier to distinguish actors.
    let fallback_entry = if client_instance_id.is_some()
        || probe.native_actor_key.is_some()
        || probe.native_instance_key.is_some()
    {
        None
    } else {
        find_matching_registry_entry(&registry, &self.repo, heddle_session_id, thread_name)?
    };
    if let Some(entry) = requested_entry
        .cloned()
        .or_else(|| attach.matched_entry.clone())
        .or(fallback_entry)
    {
        return registry
            .update_entry(&entry.session_id, |existing| {
                // Optional identifiers only overwrite stored values when present.
                if client_instance_id.is_some() {
                    existing.client_instance_id = client_instance_id.map(ToString::to_string);
                }
                if probe.native_actor_key.is_some() {
                    existing.native_actor_key = probe.native_actor_key.clone();
                }
                if probe.native_parent_actor_key.is_some() {
                    existing.native_parent_actor_key = probe.native_parent_actor_key.clone();
                }
                if probe.native_instance_key.is_some() {
                    existing.native_instance_key = probe.native_instance_key.clone();
                }
                // This path always rebinds the entry to the given session.
                existing.heddle_session_id = Some(heddle_session_id.to_string());
                existing.thread_id = thread_id.map(ToString::to_string);
                if let Some(thread_name) = thread_name {
                    existing.thread = thread_name.to_string();
                }
                existing.path = Some(self.repo.root().to_path_buf());
                if identity.provider.is_some() {
                    existing.provider = identity.provider.clone();
                }
                if identity.model.is_some() {
                    existing.model = identity.model.clone();
                }
                if identity.harness.is_some() {
                    existing.harness = identity.harness.clone();
                }
                if identity.thinking_level.is_some() {
                    existing.thinking_level = identity.thinking_level.clone();
                }
                existing.attach_reason = Some(attach.attach_reason.clone());
                existing.attach_precedence = attach.precedence.clone();
                existing.winning_attach_rule = Some(attach.winning_rule.clone());
                existing.probe_source = probe.probe_source.clone();
                existing.probe_confidence = probe.confidence;
                existing.status = AgentStatus::Active;
            })?
            .ok_or_else(|| anyhow!("registry entry disappeared during update"));
    }
    if probe.native_actor_key.is_some() {
        let (entry, _) = registry.find_or_create_active_entry(
            |entry| {
                claude_actor_compatible(entry, probe, self.repo.root())
                    && entry.native_actor_key == probe.native_actor_key
            },
            |existing| {
                if client_instance_id.is_some() {
                    existing.client_instance_id = client_instance_id.map(ToString::to_string);
                }
                // Unlike the update path above, an existing session binding is kept.
                if existing.heddle_session_id.is_none() {
                    existing.heddle_session_id = Some(heddle_session_id.to_string());
                }
                existing.thread_id = thread_id.map(ToString::to_string);
                if let Some(thread_name) = thread_name {
                    existing.thread = thread_name.to_string();
                }
                existing.path = Some(self.repo.root().to_path_buf());
                if identity.provider.is_some() {
                    existing.provider = identity.provider.clone();
                }
                if identity.model.is_some() {
                    existing.model = identity.model.clone();
                }
                if identity.harness.is_some() {
                    existing.harness = identity.harness.clone();
                }
                if identity.thinking_level.is_some() {
                    existing.thinking_level = identity.thinking_level.clone();
                }
                if probe.native_parent_actor_key.is_some() {
                    existing.native_parent_actor_key = probe.native_parent_actor_key.clone();
                }
                if probe.native_instance_key.is_some() {
                    existing.native_instance_key = probe.native_instance_key.clone();
                }
                existing.attach_reason = Some(attach.attach_reason.clone());
                existing.attach_precedence = attach.precedence.clone();
                existing.winning_attach_rule = Some(attach.winning_rule.clone());
                existing.probe_source = probe.probe_source.clone();
                existing.probe_confidence = probe.confidence;
                existing.status = AgentStatus::Active;
            },
            |session_id| {
                Ok(AgentEntry {
                    session_id: session_id.to_string(),
                    client_instance_id: client_instance_id.map(ToString::to_string),
                    native_actor_key: probe.native_actor_key.clone(),
                    native_parent_actor_key: probe.native_parent_actor_key.clone(),
                    native_instance_key: probe.native_instance_key.clone(),
                    heddle_session_id: Some(heddle_session_id.to_string()),
                    thread_id: thread_id.map(ToString::to_string),
                    thread: thread_name.unwrap_or("detached").to_string(),
                    pid: Some(std::process::id()),
                    boot_id: None,
                    liveness_path: None,
                    heartbeat_at: Some(Utc::now()),
                    anchor_state: self.repo.head()?.map(|id| id.to_string_full()),
                    anchor_root: None,
                    reservation_token: Some(objects::store::generate_agent_id()),
                    path: Some(self.repo.root().to_path_buf()),
                    base_state: self.repo.head()?.map(|id| id.short()).unwrap_or_default(),
                    started_at: Utc::now(),
                    provider: identity.provider.clone(),
                    model: identity.model.clone(),
                    harness: identity.harness.clone(),
                    thinking_level: identity.thinking_level.clone(),
                    usage_summary: AgentUsageSummary::default(),
                    last_progress_at: None,
                    report_flush_state: Some("pending-local".to_string()),
                    attach_reason: Some(attach.attach_reason.clone()),
                    attach_precedence: attach.precedence.clone(),
                    winning_attach_rule: Some(attach.winning_rule.clone()),
                    probe_source: probe.probe_source.clone(),
                    probe_confidence: probe.confidence,
                    status: AgentStatus::Active,
                    completed_at: None,
                    context_queries: vec![],
                })
            },
        )?;
        return Ok(entry);
    }
    // No existing entry and no native actor key: register a brand-new actor.
    Ok(registry.create_generated_entry(|session_id| {
        Ok(AgentEntry {
            session_id: session_id.to_string(),
            client_instance_id: client_instance_id.map(ToString::to_string),
            native_actor_key: probe.native_actor_key.clone(),
            native_parent_actor_key: probe.native_parent_actor_key.clone(),
            native_instance_key: probe.native_instance_key.clone(),
            heddle_session_id: Some(heddle_session_id.to_string()),
            thread_id: thread_id.map(ToString::to_string),
            thread: thread_name.unwrap_or("detached").to_string(),
            pid: Some(std::process::id()),
            boot_id: None,
            liveness_path: None,
            heartbeat_at: Some(Utc::now()),
            anchor_state: self.repo.head()?.map(|id| id.to_string_full()),
            anchor_root: None,
            reservation_token: Some(objects::store::generate_agent_id()),
            path: Some(self.repo.root().to_path_buf()),
            base_state: self.repo.head()?.map(|id| id.short()).unwrap_or_default(),
            started_at: Utc::now(),
            provider: identity.provider.clone(),
            model: identity.model.clone(),
            harness: identity.harness.clone(),
            thinking_level: identity.thinking_level.clone(),
            usage_summary: AgentUsageSummary::default(),
            last_progress_at: None,
            report_flush_state: Some("pending-local".to_string()),
            attach_reason: Some(attach.attach_reason.clone()),
            attach_precedence: attach.precedence.clone(),
            winning_attach_rule: Some(attach.winning_rule.clone()),
            probe_source: probe.probe_source.clone(),
            probe_confidence: probe.confidence,
            status: AgentStatus::Active,
            completed_at: None,
            context_queries: vec![],
        })
    })?)
}
fn reuse_canonical_actor_session(
&self,
sessions: &mut SessionManager,
request: CanonicalActorSessionRequest<'_>,
) -> Result<(Session, bool)> {
let CanonicalActorSessionRequest {
tentative_session,
tentative_owns_session,
entry,
probe,
attach,
} = request;
let Some(canonical_session_id) = entry.heddle_session_id.as_deref() else {
return Ok((tentative_session, tentative_owns_session));
};
if canonical_session_id == tentative_session.id {
return Ok((tentative_session, tentative_owns_session));
}
if tentative_owns_session
&& let Ok(Some(session)) = sessions.get_session(&tentative_session.id)
&& session.is_active()
{
let _ = sessions.end_session(Some(&tentative_session.id));
}
let canonical_session = sessions
.get_session(canonical_session_id)?
.ok_or_else(|| anyhow!("session not found: {canonical_session_id}"))?;
let canonical_segment_id = canonical_session
.current_segment_id
.clone()
.unwrap_or_default();
sessions.set_current_session(canonical_session_id, &canonical_segment_id)?;
if let Some(native_actor_key) = probe
.native_actor_key
.as_deref()
.or(entry.native_actor_key.as_deref())
{
attach.precedence.push(format!(
"post-create-native-actor-key:{native_actor_key}:matched"
));
attach.attach_reason = format!(
"reused existing native actor {} on Heddle session {}",
native_actor_key, canonical_session_id
);
attach.winning_rule = "native-actor-key-post-create".to_string();
}
Ok((canonical_session, false))
}
/// Mirrors a session report into the agent registry.
///
/// When the report already names an agent entry (`agent_session_id`), that
/// entry is refreshed from the report and given `status`. If the report names
/// no entry, or the named entry no longer exists, an entry is resolved or
/// created through `ensure_registry_entry`, the requested `status` is applied
/// to it, and the report is re-saved with the resolved `agent_session_id`.
///
/// # Errors
/// Propagates registry and report-store errors.
fn sync_registry_from_report(
    &self,
    report: &SessionReportEnvelope,
    status: AgentStatus,
) -> Result<()> {
    let registry = AgentRegistry::new(self.repo.heddle_dir());
    let entry = if let Some(agent_session_id) = &report.agent_session_id {
        registry.update_entry(agent_session_id, |entry| {
            // Optional identifiers only overwrite stored values when present.
            if report.client_instance_id.is_some() {
                entry.client_instance_id = report.client_instance_id.clone();
            }
            if report.native_actor_key.is_some() {
                entry.native_actor_key = report.native_actor_key.clone();
            }
            if report.native_parent_actor_key.is_some() {
                entry.native_parent_actor_key = report.native_parent_actor_key.clone();
            }
            if report.native_instance_key.is_some() {
                entry.native_instance_key = report.native_instance_key.clone();
            }
            entry.heddle_session_id = Some(report.heddle_session_id.clone());
            entry.path = Some(self.repo.root().to_path_buf());
            entry.harness = report.harness.harness.clone();
            entry.provider = report.harness.provider.clone();
            entry.model = report.harness.model.clone();
            entry.thinking_level = report.harness.thinking_level.clone();
            entry.usage_summary = usage_to_summary(&report.usage);
            entry.last_progress_at =
                report.last_progress_at.as_deref().and_then(parse_timestamp);
            entry.report_flush_state = report.report_flush_state.clone();
            entry.attach_reason = report.attach_reason.clone();
            entry.attach_precedence = report.attach_precedence.clone();
            entry.winning_attach_rule = report.winning_attach_rule.clone();
            entry.probe_source = report.probe_source.clone();
            entry.probe_confidence = report.probe_confidence;
            entry.status = status.clone();
            entry.completed_at = match status {
                AgentStatus::Active => None,
                AgentStatus::Abandoned | AgentStatus::Complete | AgentStatus::Merged => {
                    Some(Utc::now())
                }
            };
        })?
    } else {
        None
    };
    if entry.is_none() {
        let resolved = self.ensure_registry_entry(RegistryEntryRequest {
            heddle_session_id: &report.heddle_session_id,
            thread_name: report.thread.as_deref(),
            thread_id: report.thread_id.as_deref(),
            identity: &ResolvedIdentity {
                harness: report.harness.harness.clone(),
                provider: report.harness.provider.clone(),
                model: report.harness.model.clone(),
                thinking_level: report.harness.thinking_level.clone(),
                policy: report.harness.policy.clone(),
            },
            probe: &HarnessProbeResult {
                native_actor_key: report.native_actor_key.clone(),
                native_parent_actor_key: report.native_parent_actor_key.clone(),
                native_instance_key: report.native_instance_key.clone(),
                probe_source: report.probe_source.clone(),
                confidence: report.probe_confidence,
                ..HarnessProbeResult::default()
            },
            attach: &ResolvedAttachment {
                target: AttachTarget::CreateNew {
                    _because_claimed: false,
                },
                matched_entry: None,
                attach_reason: report.attach_reason.clone().unwrap_or_else(|| {
                    format!(
                        "created actor for Heddle session {}",
                        report.heddle_session_id
                    )
                }),
                precedence: report.attach_precedence.clone(),
                winning_rule: report
                    .winning_attach_rule
                    .clone()
                    .unwrap_or_else(|| "report-sync".to_string()),
            },
            client_instance_id: report.client_instance_id.as_deref(),
            requested_entry: None,
        })?;
        // `ensure_registry_entry` always leaves the entry Active. Apply the
        // requested terminal status so a closed session does not keep an
        // active-looking actor in the registry.
        if status != AgentStatus::Active {
            registry.update_entry(&resolved.session_id, |entry| {
                entry.status = status.clone();
                entry.completed_at = Some(Utc::now());
            })?;
        }
        // Persist the binding so later syncs take the direct update path.
        let mut report = report.clone();
        report.agent_session_id = Some(resolved.session_id);
        self.reports.save(&report)?;
    }
    Ok(())
}
}
/// Harness identity produced by `resolve_identity` after explicit hints,
/// probe results, and configured fallbacks have been combined.
///
/// Each field is `None` when no source supplied a value for it.
#[derive(Debug, Clone, Default)]
struct ResolvedIdentity {
/// Harness name: the explicit hint, else the probed harness.
harness: Option<String>,
/// Provider name.
provider: Option<String>,
/// Model name.
model: Option<String>,
/// Thinking/reasoning level.
thinking_level: Option<String>,
/// Policy identifier.
policy: Option<String>,
}
impl ResolvedIdentity {
    /// Builds the `HarnessIdentity` transport value, copying every field as is.
    fn to_transport_identity(&self) -> HarnessIdentity {
        let Self {
            harness,
            provider,
            model,
            thinking_level,
            policy,
        } = self.clone();
        HarnessIdentity {
            harness,
            provider,
            model,
            thinking_level,
            policy,
        }
    }
}
/// Inputs to `resolve_identity`: explicit per-field hints plus the harness
/// probe result. An explicit hint takes priority over the matching probe field.
struct IdentityHints {
/// Explicit harness name, if the caller supplied one.
harness: Option<String>,
/// Explicit provider name.
provider: Option<String>,
/// Explicit model name.
model: Option<String>,
/// Explicit thinking level.
thinking_level: Option<String>,
/// Explicit policy identifier.
policy: Option<String>,
/// Probe result used when the matching explicit hint is absent.
probe: HarnessProbeResult,
}
fn resolve_identity(
repo: &Repository,
user_config: &UserConfig,
hints: IdentityHints,
) -> Result<ResolvedIdentity> {
let current_session = SessionManager::new(repo.root()).get_current_session()?;
let current_segment = current_session
.as_ref()
.and_then(|session| session.current_segment());
let token_claims = if user_config.harness.auto_infer {
user_config_token_claims(user_config)
} else {
None
};
let harness_override = resolved_harness_override(
user_config,
hints.harness.as_deref(),
hints.probe.harness.as_deref(),
);
Ok(ResolvedIdentity {
harness: hints.harness.or(hints.probe.harness),
provider: hints
.provider
.or(hints.probe.provider)
.or_else(|| current_segment.map(|segment| segment.provider.clone()))
.or_else(|| {
token_claims
.as_ref()
.and_then(|claims| claims.agent_provider.clone())
})
.or_else(|| harness_override.and_then(|entry| entry.provider.clone()))
.or_else(|| user_config.agent.provider.clone()),
model: hints
.model
.or(hints.probe.model)
.or_else(|| current_segment.map(|segment| segment.model.clone()))
.or_else(|| {
token_claims
.as_ref()
.and_then(|claims| claims.agent_model.clone())
})
.or_else(|| harness_override.and_then(|entry| entry.model.clone()))
.or_else(|| user_config.agent.model.clone()),
thinking_level: hints
.thinking_level
.or(hints.probe.thinking_level)
.or_else(|| harness_override.and_then(|entry| entry.thinking_level.clone())),
policy: hints
.policy
.or(hints.probe.policy)
.or_else(|| current_segment.and_then(|segment| segment.policy_id.clone()))
.or_else(|| harness_override.and_then(|entry| entry.policy.clone()))
.or_else(|| user_config.agent.default_policy.clone()),
})
}
/// Looks up the per-harness override in user config, trying the explicit
/// harness name first and the probed fingerprint second.
fn resolved_harness_override<'a>(
    user_config: &'a UserConfig,
    explicit: Option<&str>,
    fingerprint: Option<&str>,
) -> Option<&'a UserHarnessOverride> {
    [explicit, fingerprint]
        .into_iter()
        .flatten()
        .find_map(|name| user_config.harness.harnesses.get(name))
}
/// Outcome of attachment resolution: reuse a session or start a new one.
enum AttachTarget {
/// Attach to this already existing session.
ExistingSession(objects::object::Session),
/// Start a new session. `_because_claimed` is set to `true` by
/// `resolve_actor_attachment` when the candidate session was already claimed
/// by another active actor. NOTE(review): the underscore prefix suggests the
/// flag is not read anywhere — confirm.
CreateNew { _because_claimed: bool },
}
/// Result of `resolve_actor_attachment`: the chosen target plus a trace of how
/// it was chosen.
struct ResolvedAttachment {
/// Session to attach to, or an instruction to create a new one.
target: AttachTarget,
/// Registry entry that produced the match, when a rule matched through one.
matched_entry: Option<AgentEntry>,
/// Human-readable explanation of the decision.
attach_reason: String,
/// Ordered trace of the rules tried, each suffixed with its outcome
/// (for example `:matched`, `:miss`, `:claimed`).
precedence: Vec<String>,
/// Name of the rule that decided the outcome.
winning_rule: String,
}
/// Decides which Heddle session an actor should attach to, or whether a new
/// session must be created.
///
/// Rules are tried in this order; the first that applies wins and every rule
/// tried is appended to the returned `precedence` trace:
/// 1. `explicit-agent-session` — the requested registry entry is already bound
///    to a session (error if that session is missing or inactive).
/// 2. `explicit-heddle-session` — the caller named a session (error if it is
///    missing, inactive, or conflicts with the requested entry's binding).
/// 3. `native-actor-key` — an active, compatible registry entry with the same
///    native actor key is bound to an active session.
/// 4. `client-instance-id` — an active registry entry with the same client
///    instance id is bound to an active session. When a client instance id was
///    supplied but did not match, a new session is created immediately.
/// 5. A probe with a native actor key that reached this point creates a new
///    session; weaker rules are skipped.
/// 6. `native-instance-key` — like rule 3, keyed on the native instance key
///    at this repository path.
/// 7. `current-worktree-session` — root actors only: the active current
///    session, unless another active actor already claimed it (then create new).
/// 8. `token-sid` — the active session named by the auth token's `sid` claim,
///    unless already claimed (then create new).
/// 9. Otherwise create a new session.
///
/// # Errors
/// Propagates registry/session-store errors, and fails when a registry entry
/// points at a session that no longer exists.
fn resolve_actor_attachment(
    registry: &AgentRegistry,
    repo: &Repository,
    sessions: &mut SessionManager,
    input: AttachmentResolutionInput<'_>,
) -> Result<ResolvedAttachment> {
    let AttachmentResolutionInput {
        requested_entry,
        explicit_heddle_session_id,
        client_instance_id,
        probe,
        token_claims,
    } = input;
    let mut precedence = Vec::new();
    // Rule 1: explicitly requested agent entry that is already bound.
    if let Some(entry) = requested_entry
        && let Some(bound_session_id) = entry.heddle_session_id.as_deref()
    {
        precedence.push(format!(
            "explicit-agent-session:{}:matched",
            entry.session_id
        ));
        let session = sessions
            .get_session(bound_session_id)?
            .ok_or_else(|| anyhow!("session not found: {bound_session_id}"))?;
        if !session.is_active() {
            return Err(anyhow!("session is not active: {bound_session_id}"));
        }
        return Ok(ResolvedAttachment {
            target: AttachTarget::ExistingSession(session),
            matched_entry: Some(entry.clone()),
            attach_reason: format!(
                "reattached actor {} to existing Heddle session {}",
                entry.session_id, bound_session_id
            ),
            precedence,
            winning_rule: "explicit-agent-session".to_string(),
        });
    }
    precedence.push("explicit-agent-session:miss".to_string());
    // Rule 2: explicitly requested Heddle session.
    if let Some(session_id) = explicit_heddle_session_id {
        precedence.push(format!("explicit-heddle-session:{session_id}:matched"));
        ensure_requested_entry_matches_session(requested_entry, session_id)?;
        let session = sessions
            .get_session(session_id)?
            .ok_or_else(|| anyhow!("session not found: {session_id}"))?;
        if !session.is_active() {
            return Err(anyhow!("session is not active: {session_id}"));
        }
        return Ok(ResolvedAttachment {
            target: AttachTarget::ExistingSession(session),
            matched_entry: None,
            attach_reason: format!("attached to explicit Heddle session {session_id}"),
            precedence,
            winning_rule: "explicit-heddle-session".to_string(),
        });
    }
    precedence.push("explicit-heddle-session:miss".to_string());
    // Rule 3: native actor key.
    if let Some(native_actor_key) = probe.native_actor_key.as_deref() {
        if let Some(entry) = registry.find_active_by_native_actor_key(native_actor_key)?
            && claude_actor_compatible(&entry, probe, repo.root())
            && let Some(bound_session_id) = entry.heddle_session_id.clone()
        {
            precedence.push(format!("native-actor-key:{native_actor_key}:matched"));
            let session = sessions
                .get_session(&bound_session_id)?
                .ok_or_else(|| anyhow!("session not found: {bound_session_id}"))?;
            if session.is_active() {
                return Ok(ResolvedAttachment {
                    target: AttachTarget::ExistingSession(session),
                    matched_entry: Some(entry),
                    attach_reason: format!(
                        "reattached native actor {} to Heddle session {}",
                        native_actor_key, bound_session_id
                    ),
                    precedence,
                    winning_rule: "native-actor-key".to_string(),
                });
            }
        }
        precedence.push(format!("native-actor-key:{native_actor_key}:miss"));
    } else {
        precedence.push("native-actor-key:miss".to_string());
    }
    // Rule 4: client instance id. A supplied-but-unmatched id always starts a
    // new session instead of falling through to weaker rules.
    if let Some(client_instance_id) = client_instance_id {
        if let Some(entry) = registry.find_active_by_client_instance_id(client_instance_id)?
            && let Some(bound_session_id) = entry.heddle_session_id.clone()
        {
            precedence.push(format!("client-instance-id:{client_instance_id}:matched"));
            let session = sessions
                .get_session(&bound_session_id)?
                .ok_or_else(|| anyhow!("session not found: {bound_session_id}"))?;
            if session.is_active() {
                return Ok(ResolvedAttachment {
                    target: AttachTarget::ExistingSession(session),
                    matched_entry: Some(entry),
                    attach_reason: format!(
                        "reattached client instance {client_instance_id} to Heddle session {bound_session_id}"
                    ),
                    precedence,
                    winning_rule: "client-instance-id".to_string(),
                });
            }
        }
        precedence.push(format!("client-instance-id:{client_instance_id}:miss"));
        return Ok(ResolvedAttachment {
            target: AttachTarget::CreateNew {
                _because_claimed: false,
            },
            matched_entry: None,
            attach_reason: format!(
                "started new Heddle session for distinct client instance {client_instance_id}"
            ),
            precedence,
            winning_rule: "create-new-session".to_string(),
        });
    } else {
        precedence.push("client-instance-id:miss".to_string());
    }
    // Rule 5: a native actor key that did not match above always gets its own
    // new session; the weaker instance/worktree/token rules are skipped.
    if probe.native_actor_key.is_some() {
        precedence.push("native-instance-key:skipped-strong-native-key".to_string());
        return Ok(ResolvedAttachment {
            target: AttachTarget::CreateNew {
                _because_claimed: false,
            },
            matched_entry: None,
            attach_reason:
                "started new Heddle session because no compatible native actor match was found"
                    .to_string(),
            precedence,
            winning_rule: "create-new-session".to_string(),
        });
    }
    // Rule 6: native instance key scoped to this repository path.
    if let Some(native_instance_key) = probe.native_instance_key.as_deref() {
        if let Some(entry) =
            registry.find_active_by_native_instance_key_at_path(native_instance_key, repo.root())?
            && claude_actor_compatible(&entry, probe, repo.root())
            && let Some(bound_session_id) = entry.heddle_session_id.clone()
        {
            precedence.push(format!("native-instance-key:{native_instance_key}:matched"));
            let session = sessions
                .get_session(&bound_session_id)?
                .ok_or_else(|| anyhow!("session not found: {bound_session_id}"))?;
            if session.is_active() {
                return Ok(ResolvedAttachment {
                    target: AttachTarget::ExistingSession(session),
                    matched_entry: Some(entry),
                    attach_reason: format!(
                        "reattached native instance {} to Heddle session {}",
                        native_instance_key, bound_session_id
                    ),
                    precedence,
                    winning_rule: "native-instance-key".to_string(),
                });
            }
        }
        precedence.push(format!("native-instance-key:{native_instance_key}:miss"));
    } else {
        precedence.push("native-instance-key:miss".to_string());
    }
    // Rule 7: root actors may adopt the active current session unless another
    // active actor already holds it.
    if probe.attach_hints.root_actor
        && let Some(current) = sessions.get_current_session()?
        && current.is_active()
    {
        let claimed = session_claimed_by_other(
            registry,
            &current.id,
            requested_entry,
            client_instance_id,
            probe.native_actor_key.as_deref(),
        )?;
        if !claimed {
            precedence.push(format!("current-worktree-session:{}:matched", current.id));
            return Ok(ResolvedAttachment {
                target: AttachTarget::ExistingSession(current.clone()),
                matched_entry: None,
                attach_reason: format!("attached to active worktree Heddle session {}", current.id),
                precedence,
                winning_rule: "current-worktree-session".to_string(),
            });
        }
        precedence.push(format!("current-worktree-session:{}:claimed", current.id));
        return Ok(ResolvedAttachment {
            target: AttachTarget::CreateNew {
                _because_claimed: true,
            },
            matched_entry: None,
            attach_reason: "started a new Heddle session because the current session was already claimed by another active actor".to_string(),
            precedence,
            winning_rule: "create-new-session".to_string(),
        });
    }
    precedence.push("current-worktree-session:miss".to_string());
    // Rule 8: session named by the auth token's `sid` claim.
    if let Some(claims) = token_claims
        && let Some(token_sid) = claims.sid.as_deref()
        && let Some(session) = sessions.get_session(token_sid)?
        && session.is_active()
    {
        let claimed = session_claimed_by_other(
            registry,
            &session.id,
            requested_entry,
            client_instance_id,
            probe.native_actor_key.as_deref(),
        )?;
        if !claimed {
            precedence.push(format!("token-sid:{token_sid}:matched"));
            return Ok(ResolvedAttachment {
                target: AttachTarget::ExistingSession(session),
                matched_entry: None,
                attach_reason: format!(
                    "attached to Heddle session {token_sid} from auth token sid"
                ),
                precedence,
                winning_rule: "token-sid".to_string(),
            });
        }
        precedence.push(format!("token-sid:{token_sid}:claimed"));
        return Ok(ResolvedAttachment {
            target: AttachTarget::CreateNew {
                _because_claimed: true,
            },
            matched_entry: None,
            attach_reason: "started a new Heddle session because the current session was already claimed by another active actor".to_string(),
            precedence,
            winning_rule: "create-new-session".to_string(),
        });
    }
    precedence.push("token-sid:miss".to_string());
    // Rule 9: nothing matched.
    Ok(ResolvedAttachment {
        target: AttachTarget::CreateNew {
            _because_claimed: false,
        },
        matched_entry: None,
        attach_reason: "started new Heddle session".to_string(),
        precedence,
        winning_rule: "create-new-session".to_string(),
    })
}
/// Decides whether a registry entry may be treated as the same actor as the
/// one described by `probe`.
///
/// - Probes without a native actor key, or with a key outside the
///   `claude-code:` namespace, are always compatible.
/// - `claude-code:agent:` keys require an exact actor-key match.
/// - Other `claude-code:` keys require an exact actor-key match plus either a
///   matching native instance key (when the probe has one) or, failing that,
///   the same canonical repository path and a probe confidence of at least 0.9.
fn claude_actor_compatible(
    entry: &AgentEntry,
    probe: &HarnessProbeResult,
    repo_root: &Path,
) -> bool {
    let native_actor_key = match probe.native_actor_key.as_deref() {
        Some(key) if key.starts_with("claude-code:") => key,
        _ => return true,
    };
    let same_actor = entry.native_actor_key.as_deref() == Some(native_actor_key);
    if native_actor_key.starts_with("claude-code:agent:") {
        return same_actor;
    }
    if let Some(native_instance_key) = probe.native_instance_key.as_deref() {
        return same_actor && entry.native_instance_key.as_deref() == Some(native_instance_key);
    }
    // Compare canonical paths; fall back to the raw path when canonicalization fails.
    let canonical = |path: &Path| path.canonicalize().unwrap_or_else(|_| path.to_path_buf());
    let entry_root = entry
        .path
        .as_deref()
        .map(|path| canonical(path))
        .unwrap_or_default();
    let same_repo = entry_root == canonical(repo_root);
    same_actor && same_repo && probe.confidence.unwrap_or_default() >= 0.9
}
/// Decodes the claims from the second `.`-separated segment of a token.
///
/// The segment is decoded as unpadded URL-safe base64 and parsed as JSON.
/// Returns `None` when the segment is missing or either step fails.
/// No signature verification is performed here.
fn decode_token_claims(token: &str) -> Option<TokenClaims> {
    let mut segments = token.split('.');
    let _first = segments.next()?;
    let payload = segments.next()?;
    let decoded = match base64::engine::general_purpose::URL_SAFE_NO_PAD.decode(payload.as_bytes())
    {
        Ok(bytes) => bytes,
        Err(_) => return None,
    };
    serde_json::from_slice::<TokenClaims>(&decoded).ok()
}
/// Decodes the claims of the remote token stored in the user configuration,
/// if a token is present and decodable.
fn user_config_token_claims(user_config: &UserConfig) -> Option<TokenClaims> {
    let token = user_config.remote_token()?;
    decode_token_claims(&token.id)
}
/// Claims read from a token payload by `decode_token_claims`.
/// Every claim is optional and defaults to `None` when absent.
#[derive(Debug, Deserialize)]
struct TokenClaims {
/// Session id claim; `resolve_actor_attachment` uses it to look up a session.
#[serde(default)]
sid: Option<String>,
/// Provider claim; a fallback source in `resolve_identity`.
#[serde(default)]
agent_provider: Option<String>,
/// Model claim; a fallback source in `resolve_identity`.
#[serde(default)]
agent_model: Option<String>,
}
/// Reports whether the resolved identity names a provider or model that
/// differs from the session's current segment.
///
/// Returns `false` when the session has no current segment; identity fields
/// that are `None` never count as a change.
fn should_rotate_segment(session: &objects::object::Session, identity: &ResolvedIdentity) -> bool {
    match session.current_segment() {
        None => false,
        Some(segment) => {
            matches!(identity.provider.as_deref(), Some(provider) if provider != segment.provider)
                || matches!(identity.model.as_deref(), Some(model) if model != segment.model)
        }
    }
}
/// Returns the id of the named thread, or `None` when no name is given or
/// no such thread is stored.
fn thread_id_for_name(repo: &Repository, thread_name: Option<&str>) -> Result<Option<String>> {
    match thread_name {
        None => Ok(None),
        Some(name) => {
            let stored = ThreadManager::new(repo.heddle_dir()).load(name)?;
            Ok(stored.map(|thread| thread.id))
        }
    }
}
/// Reports whether a base state can be resolved for a new harness thread
/// (see `resolve_harness_thread_base_state`).
fn can_create_harness_thread(
    repo: &Repository,
    target_thread: Option<&str>,
    parent_thread: Option<&str>,
) -> Result<bool> {
    let base_state = resolve_harness_thread_base_state(repo, target_thread, parent_thread)?;
    Ok(base_state.is_some())
}
/// Picks the state a new harness thread should start from.
///
/// The repository head wins when present; otherwise the parent thread and
/// then the target thread are consulted, in that order. Returns `None` when
/// none of them resolves.
fn resolve_harness_thread_base_state(
    repo: &Repository,
    target_thread: Option<&str>,
    parent_thread: Option<&str>,
) -> Result<Option<objects::object::ChangeId>> {
    let head_state = repo.head()?;
    if head_state.is_some() {
        return Ok(head_state);
    }
    for thread_name in parent_thread.into_iter().chain(target_thread) {
        let state = resolve_named_thread_base_state(repo, thread_name)?;
        if state.is_some() {
            return Ok(state);
        }
    }
    Ok(None)
}
/// Resolves the state a named thread currently points at.
///
/// When thread metadata exists, its `current_state` (or `base_state` if that
/// is unset) is resolved through the repository, falling back to parsing the
/// spec directly as a change id. If no metadata exists or the spec resolves
/// to nothing, the thread ref is consulted instead.
fn resolve_named_thread_base_state(
    repo: &Repository,
    thread_name: &str,
) -> Result<Option<objects::object::ChangeId>> {
    if let Some(thread) = ThreadManager::new(repo.heddle_dir()).load(thread_name)? {
        let state_spec = thread
            .current_state
            .as_deref()
            .unwrap_or(thread.base_state.as_str());
        let resolved = match repo.resolve_state(state_spec)? {
            Some(state_id) => Some(state_id),
            None => objects::object::ChangeId::parse(state_spec).ok(),
        };
        if let Some(state_id) = resolved {
            return Ok(Some(state_id));
        }
    }
    Ok(repo.refs().get_thread(thread_name)?)
}
/// Determines the parent thread for a subagent.
///
/// When the probe names a parent actor and an active registry entry exists
/// for it, that entry's thread is used; otherwise the currently attached
/// thread (if any) is returned.
fn resolve_parent_thread_for_subagent(
    repo: &Repository,
    probe: &HarnessProbeResult,
    current_attached: Option<&str>,
) -> Result<Option<String>> {
    let parent_thread = match probe.native_parent_actor_key.as_deref() {
        Some(parent_key) => {
            let registry = AgentRegistry::new(repo.heddle_dir());
            registry
                .find_active_by_native_actor_key(parent_key)?
                .map(|entry| entry.thread)
        }
        None => None,
    };
    Ok(parent_thread.or_else(|| current_attached.map(ToString::to_string)))
}
fn preferred_thread_slug(
params: &OpenSessionParams,
probe: &HarnessProbeResult,
identity: &ResolvedIdentity,
) -> String {
params
.task
.clone()
.or_else(|| params.summary.clone())
.or_else(|| probe.native_actor_key.as_deref().map(native_key_slug))
.or_else(|| probe.native_instance_key.as_deref().map(native_key_slug))
.or_else(|| identity.harness.clone())
.unwrap_or_else(|| "work".to_string())
}
/// Extracts a short slug from a `:`-separated native key.
///
/// Returns the last non-empty segment, so a key with a trailing `:` still
/// yields a usable slug. When every segment is empty (or `value` itself is
/// empty) the whole value is returned unchanged.
fn native_key_slug(value: &str) -> String {
    // `rsplit` always yields at least one item, so taking `.next()` alone can
    // never fall back; searching for a non-empty segment makes the fallback real.
    value
        .rsplit(':')
        .find(|segment| !segment.is_empty())
        .unwrap_or(value)
        .to_string()
}
/// Finds a thread name that is not yet used by thread metadata or a thread ref.
///
/// Tries `base` first, then `base-2` through `base-999`.
///
/// # Errors
/// Fails when a lookup fails or every candidate is taken.
fn allocate_thread_name(repo: &Repository, base: &str) -> Result<String> {
    // A name is free when neither stored metadata nor a ref exists for it.
    let is_unused = |name: &str| -> Result<bool> {
        Ok(ThreadManager::new(repo.heddle_dir()).load(name)?.is_none()
            && repo.refs().get_thread(name)?.is_none())
    };
    if is_unused(base)? {
        return Ok(base.to_string());
    }
    let mut idx = 2;
    while idx < 1000 {
        let candidate = format!("{base}-{idx}");
        if is_unused(&candidate)? {
            return Ok(candidate);
        }
        idx += 1;
    }
    Err(anyhow!(
        "could not allocate a unique thread name from '{base}'"
    ))
}
/// Builds the default checkout path for a private thread:
/// `<workspace parent>/.<repo name>-heddle-threads/<sanitized name>/root`.
///
/// The repo name falls back to `"heddle"` when the workspace root has no
/// usable file name, and the workspace root itself stands in for its parent
/// when it has none.
fn default_private_thread_path(repo: &Repository, name: &str) -> PathBuf {
    let workspace_root = shared_workspace_root(repo);
    let repo_name = match workspace_root.file_name().and_then(|raw| raw.to_str()) {
        Some(text) if !text.is_empty() => text,
        _ => "heddle",
    };
    let mut path = workspace_root
        .parent()
        .unwrap_or(workspace_root)
        .to_path_buf();
    path.push(format!(".{repo_name}-heddle-threads"));
    path.push(sanitize_name(name));
    path.push("root");
    path
}
/// Returns the directory containing the Heddle directory, or the repository
/// root when the Heddle directory has no parent.
fn shared_workspace_root(repo: &Repository) -> &Path {
    match repo.heddle_dir().parent() {
        Some(parent) => parent,
        None => repo.root(),
    }
}
/// Turns an arbitrary name into a lowercase, dash-separated slug.
///
/// ASCII letters and digits are kept (lowercased); every run of other
/// characters becomes a single `-`, and leading/trailing dashes are dropped.
/// The result is empty when the name has no ASCII alphanumerics.
fn sanitize_name(name: &str) -> String {
    name.split(|ch: char| !ch.is_ascii_alphanumeric())
        .filter(|piece| !piece.is_empty())
        .map(|piece| piece.to_ascii_lowercase())
        .collect::<Vec<_>>()
        .join("-")
}
/// Resolves the registry entry a request refers to.
///
/// An explicit agent session id must name an existing, active entry (error
/// otherwise). Without one, an active entry with the given client instance id
/// is looked up; with neither identifier the result is `None`.
fn resolve_requested_registry_entry(
    registry: &AgentRegistry,
    agent_session_id: Option<&str>,
    client_instance_id: Option<&str>,
) -> Result<Option<AgentEntry>> {
    match (agent_session_id, client_instance_id) {
        (Some(agent_session_id), _) => {
            let Some(entry) = registry.load(agent_session_id)? else {
                return Err(anyhow!("agent session not found: {agent_session_id}"));
            };
            if entry.status != AgentStatus::Active {
                return Err(anyhow!("agent session is not active: {agent_session_id}"));
            }
            Ok(Some(entry))
        }
        (None, Some(client_instance_id)) => {
            Ok(registry.find_active_by_client_instance_id(client_instance_id)?)
        }
        (None, None) => Ok(None),
    }
}
/// Rejects a request whose agent entry is already bound to a different
/// Heddle session than the one being attached to.
///
/// Entries that are absent or unbound always pass.
fn ensure_requested_entry_matches_session(
    requested_entry: Option<&AgentEntry>,
    heddle_session_id: &str,
) -> Result<()> {
    let conflicting = requested_entry.filter(|entry| {
        entry
            .heddle_session_id
            .as_deref()
            .is_some_and(|bound_session_id| bound_session_id != heddle_session_id)
    });
    match conflicting {
        Some(entry) => Err(anyhow!(
            "requested agent is already bound to a different heddle session: {}",
            entry.session_id
        )),
        None => Ok(()),
    }
}
/// Reports whether a Heddle session is already held by a different active actor.
///
/// Callers without any identity (no requested entry, client instance id, or
/// native actor key) never see a claim. When an active entry holds the
/// session, a requested entry is compared by session id; otherwise the holder
/// counts as "self" if its client instance id or native actor key matches.
fn session_claimed_by_other(
    registry: &AgentRegistry,
    heddle_session_id: &str,
    requested_entry: Option<&AgentEntry>,
    client_instance_id: Option<&str>,
    native_actor_key: Option<&str>,
) -> Result<bool> {
    let caller_has_identity =
        requested_entry.is_some() || client_instance_id.is_some() || native_actor_key.is_some();
    if !caller_has_identity {
        return Ok(false);
    }
    let holder = match registry.find_active_by_heddle_session_id(heddle_session_id)? {
        Some(entry) => entry,
        None => return Ok(false),
    };
    if let Some(requested) = requested_entry {
        return Ok(requested.session_id != holder.session_id);
    }
    let same_client =
        client_instance_id.is_some_and(|id| holder.client_instance_id.as_deref() == Some(id));
    let same_actor =
        native_actor_key.is_some_and(|key| holder.native_actor_key.as_deref() == Some(key));
    Ok(!(same_client || same_actor))
}
/// Finds an active registry entry for a session using progressively looser
/// criteria.
///
/// An active entry bound to `heddle_session_id` wins. Otherwise the first
/// active entry whose canonical path equals the repository root, or whose
/// thread equals `thread_name`, is returned.
fn find_matching_registry_entry(
    registry: &AgentRegistry,
    repo: &Repository,
    heddle_session_id: &str,
    thread_name: Option<&str>,
) -> Result<Option<AgentEntry>> {
    let bound = registry.find_active_by_heddle_session_id(heddle_session_id)?;
    if bound.is_some() {
        return Ok(bound);
    }
    let canonical_root = match repo.root().canonicalize() {
        Ok(root) => root,
        Err(_) => repo.root().to_path_buf(),
    };
    for entry in registry.list()? {
        if entry.status != AgentStatus::Active {
            continue;
        }
        // Fall back to the stored path when it cannot be canonicalized.
        let same_root = entry.path.as_ref().is_some_and(|path| {
            path.canonicalize().unwrap_or_else(|_| path.clone()) == canonical_root
        });
        if same_root || thread_name.is_some_and(|thread| entry.thread == thread) {
            return Ok(Some(entry));
        }
    }
    Ok(None)
}
/// Builds the environment-hint map for a harness probe.
///
/// Starts from the process environment, keeping only variables accepted by
/// [`inherited_harness_hint`], then overlays `extra` so caller-supplied hints
/// win over inherited ones with the same key.
///
/// The environment is read with `std::env::vars_os()` rather than
/// `std::env::vars()`: the latter panics as soon as *any* variable (even an
/// unrelated one) holds non-Unicode data. Variables whose key or value cannot
/// be decoded as UTF-8 are skipped instead.
fn merged_env_hints(extra: &BTreeMap<String, String>) -> BTreeMap<String, String> {
    let mut merged: BTreeMap<String, String> = std::env::vars_os()
        .filter_map(|(key, value)| {
            let key = key.into_string().ok()?;
            // Filter on the key first so unrelated values are never decoded.
            if !inherited_harness_hint(&key) {
                return None;
            }
            let value = value.into_string().ok()?;
            Some((key, value))
        })
        .collect();
    merged.extend(
        extra
            .iter()
            .map(|(key, value)| (key.clone(), value.clone())),
    );
    merged
}
/// Decides whether an environment variable name may be inherited as a
/// harness hint.
///
/// Ambient model/identity variables (e.g. `OPENAI_MODEL`, `MODEL`,
/// `THINKING_LEVEL`) are always rejected. Otherwise a key is accepted when it
/// is exactly `CLAUDECODE` or starts with `HEDDLE_`, `CODEX_` or `OPENCODE_`.
/// Matching is case-sensitive.
fn inherited_harness_hint(key: &str) -> bool {
    const AMBIENT_IDENTITY_KEYS: [&str; 8] = [
        "OPENAI_MODEL",
        "ANTHROPIC_MODEL",
        "CLAUDE_MODEL",
        "MODEL",
        "OPENAI_REASONING_EFFORT",
        "REASONING_EFFORT",
        "THINKING_LEVEL",
        "PROMPT_POLICY",
    ];
    const INHERITED_PREFIXES: [&str; 3] = ["HEDDLE_", "CODEX_", "OPENCODE_"];

    if AMBIENT_IDENTITY_KEYS.contains(&key) {
        return false;
    }
    key == "CLAUDECODE"
        || INHERITED_PREFIXES
            .iter()
            .any(|prefix| key.starts_with(*prefix))
}
/// Serializes `value` into a `serde_json::Value`.
///
/// # Errors
/// Returns the serializer's error, converted into the crate-wide error type.
fn to_json_value<T: Serialize>(value: T) -> Result<Value> {
    let json = serde_json::to_value(value)?;
    Ok(json)
}
/// Normalizes a collection of path strings.
///
/// Each path is trimmed of surrounding whitespace and has every backslash
/// replaced with a forward slash. Entries that end up empty are dropped.
/// The result is de-duplicated and sorted in ascending string order.
fn normalize_paths<I>(paths: I) -> Vec<String>
where
    I: IntoIterator<Item = String>,
{
    let unique: BTreeSet<String> = paths
        .into_iter()
        .map(|raw| raw.trim().replace('\\', "/"))
        .filter(|candidate| !candidate.is_empty())
        .collect();
    unique.into_iter().collect()
}
/// Merges `paths` into `target`, leaving `target` sorted and free of
/// duplicates.
///
/// Existing entries of `target` take part in the sort and de-duplication too,
/// so the previous ordering of `target` is not preserved.
fn merge_unique_paths<I>(target: &mut Vec<String>, paths: I)
where
    I: IntoIterator<Item = String>,
{
    target.extend(paths);
    // Equal strings are indistinguishable, so an unstable sort is sufficient.
    target.sort_unstable();
    target.dedup();
}
/// Returns the larger of `candidate` and the value in `current`; a missing
/// `current` yields `candidate` itself.
fn max_u64(current: Option<u64>, candidate: u64) -> u64 {
    candidate.max(current.unwrap_or(candidate))
}
/// Returns the larger of `candidate` and the value in `current`; a missing
/// `current` yields `candidate` itself.
fn max_u32(current: Option<u32>, candidate: u32) -> u32 {
    candidate.max(current.unwrap_or(candidate))
}
fn merge_usage(target: &mut UsageTotals, incoming: &UsageTotals) {
if let Some(input) = incoming.input_tokens {
target.input_tokens = Some(max_u64(target.input_tokens, input));
}
if let Some(output) = incoming.output_tokens {
target.output_tokens = Some(max_u64(target.output_tokens, output));
}
if let Some(reasoning) = incoming.reasoning_tokens {
target.reasoning_tokens = Some(max_u64(target.reasoning_tokens, reasoning));
}
if let Some(cache_creation) = incoming.cache_creation_tokens {
target.cache_creation_tokens = Some(max_u64(target.cache_creation_tokens, cache_creation));
}
if let Some(cache_read) = incoming.cache_read_tokens {
target.cache_read_tokens = Some(max_u64(target.cache_read_tokens, cache_read));
}
if let Some(tool_calls) = incoming.tool_calls {
target.tool_calls = Some(max_u32(target.tool_calls, tool_calls));
}
if let Some(cost) = incoming.cost_micros_usd {
target.cost_micros_usd = Some(max_u64(target.cost_micros_usd, cost));
}
}
/// Parses an RFC 3339 timestamp and converts it to UTC.
///
/// Returns `None` when `value` is not valid RFC 3339; the parse error itself
/// is discarded.
fn parse_timestamp(value: &str) -> Option<chrono::DateTime<Utc>> {
    match chrono::DateTime::parse_from_rfc3339(value) {
        Ok(parsed) => Some(parsed.with_timezone(&Utc)),
        Err(_) => None,
    }
}
/// Maps the `transport_mode` string stored on a report back to a
/// `HarnessTransport`.
///
/// Recognizes `"spool"`, `"direct"` and `"end"` (the names produced by
/// `transport_mode_name`); any other string yields `fallback`.
fn transport_from_report(
    report: &SessionReportEnvelope,
    fallback: HarnessTransport,
) -> HarnessTransport {
    let recorded = match report.transport_mode.as_str() {
        "spool" => Some(HarnessTransport::Spool),
        "direct" => Some(HarnessTransport::Direct),
        "end" => Some(HarnessTransport::End),
        _ => None,
    };
    recorded.unwrap_or(fallback)
}
/// Flags a report as needing a flush: sets `pending_flush` and records the
/// `"pending-local"` flush state. Only the in-memory report is changed.
fn mark_pending_flush(report: &mut SessionReportEnvelope) {
    report.report_flush_state = Some(String::from("pending-local"));
    report.pending_flush = true;
}
/// Queues a report in the local outbox and persists its new flush state.
///
/// The report is appended to the outbox *before* its flush fields are
/// updated, so the outbox line carries the pre-flush values. Afterwards the
/// report is marked as no longer pending, stamped with the current UTC time
/// (RFC 3339) and the `"queued-local"` state, and saved to its session file.
///
/// # Errors
/// Propagates failures from the outbox append or from saving the report. If
/// saving fails the in-memory report has already been updated.
fn enqueue_report(store: &SessionReportStore, report: &mut SessionReportEnvelope) -> Result<()> {
    store.append_outbox(report)?;

    report.report_flush_state = Some("queued-local".to_string());
    report.last_flushed_at = Some(Utc::now().to_rfc3339());
    report.pending_flush = false;

    store.save(report)
}
/// Projects full usage totals onto the registry's `AgentUsageSummary`.
///
/// Only input, output and reasoning tokens, tool calls and cost are copied;
/// the cache token counters are not part of the summary built here.
fn usage_to_summary(usage: &UsageTotals) -> AgentUsageSummary {
    let input_tokens = usage.input_tokens;
    let output_tokens = usage.output_tokens;
    let reasoning_tokens = usage.reasoning_tokens;
    let tool_calls = usage.tool_calls;
    let cost_micros_usd = usage.cost_micros_usd;
    AgentUsageSummary {
        input_tokens,
        output_tokens,
        reasoning_tokens,
        tool_calls,
        cost_micros_usd,
    }
}
/// Returns the lowercase wire name of a transcript mode:
/// `"off"`, `"summary"` or `"full"`.
fn transcript_mode_name(mode: HarnessTranscriptMode) -> &'static str {
    let label = match mode {
        HarnessTranscriptMode::Full => "full",
        HarnessTranscriptMode::Summary => "summary",
        HarnessTranscriptMode::Off => "off",
    };
    label
}
/// Returns the lowercase wire name of a transport: `"spool"`, `"direct"` or
/// `"end"`. These are the strings `transport_from_report` parses back.
fn transport_mode_name(mode: HarnessTransport) -> &'static str {
    let label = match mode {
        HarnessTransport::End => "end",
        HarnessTransport::Direct => "direct",
        HarnessTransport::Spool => "spool",
    };
    label
}
/// Outcome of `compute_final_diff`: which paths changed and how many of each kind.
struct FinalDiff {
/// Changed paths in ascending order (they are the keys of a `BTreeMap`).
changed_paths: Vec<String>,
/// Total, added, modified and deleted file counts for `changed_paths`.
diff_summary: SessionDiffSummary,
/// Full string form of the repository head when the diff was computed,
/// or `None` when the repository had no head.
head_state: Option<String>,
}
/// Computes the set of paths changed relative to a base state.
///
/// Two sources are combined into one path-to-kind map:
/// 1. The tree diff between `base_state` and the current head, when both
///    exist and differ. `base_state` is resolved through the repository
///    first and parsed as a raw change id as a fallback; if neither works
///    this step is skipped.
/// 2. Current worktree changes, excluding every `(path, kind)` pair already
///    listed in `worktree_baseline`. A worktree change overrides the kind
///    recorded by step 1 for the same path.
///
/// # Errors
/// Fails when the head cannot be read, when a resolved base or head state is
/// missing from the store, or when diffing / worktree comparison fails.
fn compute_final_diff(
    repo: &Repository,
    base_state: Option<&str>,
    worktree_baseline: &[WorktreeChangeBaseline],
) -> Result<FinalDiff> {
    let head_state = repo.head()?;
    let mut changes: BTreeMap<String, DiffKind> = BTreeMap::new();

    if let (Some(base_spec), Some(head_id)) = (base_state, head_state) {
        let resolved_base = match repo.resolve_state(base_spec)? {
            Some(id) => Some(id),
            None => objects::object::ChangeId::parse(base_spec).ok(),
        };
        match resolved_base {
            Some(base_id) if base_id != head_id => {
                let base_obj = repo
                    .store()
                    .get_state(&base_id)?
                    .ok_or_else(|| anyhow!("base state not found: {base_spec}"))?;
                let head_obj = repo
                    .store()
                    .get_state(&head_id)?
                    .ok_or_else(|| anyhow!("head state not found: {}", head_id.short()))?;
                changes.extend(
                    repo.diff_trees(&base_obj.tree, &head_obj.tree)?
                        .into_iter()
                        .map(|change| (change.path, change.kind)),
                );
            }
            // Unresolvable base, or base identical to head: no committed diff.
            _ => {}
        }
    }

    // Borrowed (path, kind-name) pairs that were already dirty at baseline time.
    let already_dirty: BTreeSet<(&str, &str)> = worktree_baseline
        .iter()
        .map(|entry| (entry.path.as_str(), entry.kind.as_str()))
        .collect();
    for (path, kind) in collect_worktree_changes(repo)? {
        let unchanged_since_baseline = already_dirty.contains(&(path.as_str(), diff_kind_name(kind)));
        if !unchanged_since_baseline {
            changes.insert(path, kind);
        }
    }

    let (mut added, mut modified, mut deleted) = (0usize, 0usize, 0usize);
    for kind in changes.values() {
        match kind {
            DiffKind::Added => added += 1,
            DiffKind::Modified => modified += 1,
            DiffKind::Deleted => deleted += 1,
            // Any other kind only contributes to the overall file count.
            _ => {}
        }
    }
    let diff_summary = SessionDiffSummary {
        changed_file_count: changes.len() as u32,
        added_files: added as u32,
        modified_files: modified as u32,
        deleted_files: deleted as u32,
    };

    Ok(FinalDiff {
        changed_paths: changes.into_keys().collect(),
        diff_summary,
        head_state: head_state.map(|id| id.to_string_full()),
    })
}
/// Records the current worktree changes as baseline entries.
///
/// Each change becomes a `WorktreeChangeBaseline` holding its path and the
/// string name of its kind (see `diff_kind_name`), in ascending path order.
///
/// # Errors
/// Propagates failures from collecting the worktree changes.
fn capture_worktree_change_snapshot(repo: &Repository) -> Result<Vec<WorktreeChangeBaseline>> {
    let current = collect_worktree_changes(repo)?;
    let mut snapshot = Vec::with_capacity(current.len());
    for (path, kind) in current {
        snapshot.push(WorktreeChangeBaseline {
            path,
            kind: diff_kind_name(kind).to_string(),
        });
    }
    Ok(snapshot)
}
/// Compares the worktree against the tree of the current state and returns
/// every added, modified and deleted path with its kind.
///
/// With no current state, or when its tree is missing from the store, the
/// comparison runs against an empty tree. Paths are rendered with
/// `Path::display`. Should one path appear in several status lists, the later
/// list wins (added, then modified, then deleted).
///
/// # Errors
/// Propagates failures from reading the current state, loading its tree, or
/// running the worktree comparison.
fn collect_worktree_changes(repo: &Repository) -> Result<BTreeMap<String, DiffKind>> {
    let status_options = worktree_status_options(Some(repo.config()));
    let base_tree = match repo.current_state()? {
        None => Tree::new(),
        Some(state) => repo.store().get_tree(&state.tree)?.unwrap_or_default(),
    };
    let status = repo.compare_worktree_cached_with_options(&base_tree, &status_options)?;

    let mut changes = BTreeMap::new();
    changes.extend(
        status
            .added
            .into_iter()
            .map(|path| (path.display().to_string(), DiffKind::Added)),
    );
    changes.extend(
        status
            .modified
            .into_iter()
            .map(|path| (path.display().to_string(), DiffKind::Modified)),
    );
    changes.extend(
        status
            .deleted
            .into_iter()
            .map(|path| (path.display().to_string(), DiffKind::Deleted)),
    );
    Ok(changes)
}
/// Returns the lowercase string name of a diff kind: `"added"`,
/// `"modified"`, `"deleted"` or `"unchanged"`.
fn diff_kind_name(kind: DiffKind) -> &'static str {
    let label = match kind {
        DiffKind::Unchanged => "unchanged",
        DiffKind::Deleted => "deleted",
        DiffKind::Modified => "modified",
        DiffKind::Added => "added",
    };
    label
}
/// File-backed store for session report envelopes and the local outbox.
struct SessionReportStore {
/// Directory holding one `<heddle_session_id>.json` file per session plus
/// `outbox.jsonl`; `new` sets it to `<repo_root>/.heddle/state/session-reports`.
dir: PathBuf,
}
impl SessionReportStore {
    /// Creates a store rooted at `<repo_root>/.heddle/state/session-reports`.
    /// Nothing is created on disk until the first write.
    fn new(repo_root: &Path) -> Self {
        Self {
            dir: repo_root.join(".heddle/state").join("session-reports"),
        }
    }

    /// Path of the JSON file that holds the report for `heddle_session_id`.
    // NOTE(review): the id is interpolated into the file name unchecked;
    // confirm callers never pass ids containing path separators.
    fn session_path(&self, heddle_session_id: &str) -> PathBuf {
        self.dir.join(format!("{heddle_session_id}.json"))
    }

    /// Path of the append-only JSON-lines outbox.
    fn outbox_path(&self) -> PathBuf {
        self.dir.join("outbox.jsonl")
    }

    /// Loads the stored report for a session.
    ///
    /// Returns `Ok(None)` when no report file exists.
    ///
    /// # Errors
    /// Fails on any other I/O error or when the file is not a valid report.
    fn load(&self, heddle_session_id: &str) -> Result<Option<SessionReportEnvelope>> {
        // Read directly and inspect the error instead of `exists()` + read:
        // that avoids a race with concurrent writers and keeps permission
        // errors from being mistaken for "no report".
        let bytes = match fs::read(self.session_path(heddle_session_id)) {
            Ok(bytes) => bytes,
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
            Err(err) => return Err(err.into()),
        };
        Ok(Some(serde_json::from_slice(&bytes)?))
    }

    /// Writes the report as pretty-printed JSON to its session file,
    /// creating the store directory if needed. The write goes through
    /// `write_file_atomic`.
    ///
    /// # Errors
    /// Fails on directory creation, serialization or write errors.
    fn save(&self, report: &SessionReportEnvelope) -> Result<()> {
        fs::create_dir_all(&self.dir)?;
        let path = self.session_path(&report.heddle_session_id);
        let bytes = serde_json::to_vec_pretty(report)?;
        write_file_atomic(&path, &bytes)?;
        Ok(())
    }

    /// Appends the report to the outbox as one compact JSON line.
    ///
    /// The line (including its trailing newline) is serialized into memory
    /// first and handed to the file in a single `write_all`. Streaming the
    /// serializer straight into an append-mode file would issue many small
    /// writes, letting concurrent appenders interleave fragments, and would
    /// leave a truncated record behind if serialization failed part-way.
    ///
    /// # Errors
    /// Fails on directory creation, serialization or write errors. Nothing is
    /// written when serialization fails.
    fn append_outbox(&self, report: &SessionReportEnvelope) -> Result<()> {
        fs::create_dir_all(&self.dir)?;
        let mut line = serde_json::to_vec(report)?;
        line.push(b'\n');
        let mut file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(self.outbox_path())?;
        file.write_all(&line)?;
        file.flush()?;
        Ok(())
    }

    /// Lists the session ids of all stored reports whose `pending_flush`
    /// flag is set, sorted ascending.
    ///
    /// Only files with a `.json` extension are inspected, so the
    /// `outbox.jsonl` file is ignored. A missing store directory yields an
    /// empty list.
    ///
    /// # Errors
    /// Fails on I/O errors other than the directory being absent, or when a
    /// `.json` file cannot be parsed as a report.
    fn list_pending(&self) -> Result<Vec<String>> {
        let entries = match fs::read_dir(&self.dir) {
            Ok(entries) => entries,
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
            Err(err) => return Err(err.into()),
        };
        let mut ids = Vec::new();
        for entry in entries {
            let path = entry?.path();
            if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
                continue;
            }
            let bytes = fs::read(&path)?;
            let report: SessionReportEnvelope = serde_json::from_slice(&bytes)?;
            if report.pending_flush {
                ids.push(report.heddle_session_id);
            }
        }
        ids.sort();
        Ok(ids)
    }
}
/// One request read by the bridge: a single JSON object per input line.
#[derive(Debug, Deserialize)]
struct BridgeRequest {
/// Optional caller-chosen request id; `None` when omitted.
#[serde(default)]
id: Option<String>,
/// Name of the operation to run. Required.
method: String,
/// Operation arguments as raw JSON; takes `Value`'s default when omitted.
#[serde(default)]
params: Value,
}
/// One response written by the bridge, serialized as a JSON object.
///
/// Built through `BridgeResponse::ok` (carries `result`) or
/// `BridgeResponse::error` (carries `error`); the unused one of the two is
/// left out of the serialized output.
#[derive(Debug, Serialize)]
struct BridgeResponse {
// NOTE(review): `default` is a deserialization attribute and this struct only
// derives `Serialize`, so it appears to have no effect here — confirm.
/// Request id this response belongs to; `None` when it is unknown
/// (e.g. the request line could not be parsed).
#[serde(default)]
id: Option<String>,
/// `true` for success responses, `false` for error responses.
ok: bool,
/// Success payload; omitted from the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
result: Option<Value>,
/// Failure details; omitted from the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<BridgeError>,
}
impl BridgeResponse {
    /// Builds a success response carrying `result`; `error` stays empty.
    fn ok(id: Option<String>, result: Value) -> Self {
        let result = Some(result);
        Self {
            id,
            ok: true,
            result,
            error: None,
        }
    }

    /// Builds a failure response with a machine-readable `code` and a
    /// human-readable `message`; `result` stays empty.
    fn error(id: Option<String>, code: impl Into<String>, message: impl Into<String>) -> Self {
        let failure = BridgeError {
            code: code.into(),
            message: message.into(),
        };
        Self {
            id,
            ok: false,
            result: None,
            error: Some(failure),
        }
    }
}
/// Error payload embedded in a failed `BridgeResponse`.
#[derive(Debug, Serialize)]
struct BridgeError {
/// Short error code, e.g. `"invalid_request"` for an unparsable request line.
code: String,
/// Human-readable description of the failure.
message: String,
}
/// Arguments for `HarnessBridgeRuntime::open_session`.
///
/// Every field is optional in the request JSON: omitted options become `None`
/// and omitted maps become empty.
#[derive(Debug, Clone, Deserialize, Default)]
struct OpenSessionParams {
/// Heddle session id supplied by the caller, if any.
#[serde(default)]
heddle_session_id: Option<String>,
/// Agent registry session id supplied by the caller, if any.
#[serde(default)]
agent_session_id: Option<String>,
/// Caller-chosen client instance id. The tests in this file show the same id
/// reattaching to its earlier session and different ids getting separate sessions.
#[serde(default)]
client_instance_id: Option<String>,
/// Thread name supplied by the caller, if any.
#[serde(default)]
thread: Option<String>,
/// Task text supplied by the caller, if any.
#[serde(default)]
task: Option<String>,
/// Summary text supplied by the caller, if any.
#[serde(default)]
summary: Option<String>,
/// Harness name supplied by the caller (tests use e.g. `"codex"`, `"claude-code"`, `"opencode"`).
#[serde(default)]
harness: Option<String>,
/// Provider name supplied by the caller (tests use e.g. `"openai"`, `"anthropic"`).
#[serde(default)]
provider: Option<String>,
/// Model name supplied by the caller.
#[serde(default)]
model: Option<String>,
/// Thinking level supplied by the caller.
#[serde(default)]
thinking_level: Option<String>,
/// Policy name supplied by the caller.
#[serde(default)]
policy: Option<String>,
/// Transport requested by the caller, if any.
#[serde(default)]
transport: Option<HarnessTransport>,
/// Transcript mode requested by the caller, if any.
#[serde(default)]
transcript_mode: Option<HarnessTranscriptMode>,
/// Argument vector supplied by the caller — presumably the harness command line; TODO confirm.
#[serde(default)]
argv: Option<Vec<String>>,
/// Caller-supplied environment-style hints, keyed by variable name.
#[serde(default)]
env_hints: BTreeMap<String, String>,
/// Caller-supplied probe metadata; tests pass keys such as `thread_id`,
/// `session_id`, `parent_id`, `transcript_path` and `client_name`.
#[serde(default)]
probe_metadata: BTreeMap<String, String>,
}
/// Arguments for `HarnessBridgeRuntime::update_progress`.
///
/// Only `heddle_session_id` is required in the request JSON; every other
/// field falls back to `None` / empty when omitted.
#[derive(Debug, Clone, Deserialize, Default)]
struct UpdateProgressParams {
/// Session the update applies to. Required (no serde default).
heddle_session_id: String,
/// Status text supplied by the caller, if any.
#[serde(default)]
status: Option<String>,
/// Progress message supplied by the caller, if any.
#[serde(default)]
message: Option<String>,
/// Number of completed steps, if reported.
#[serde(default)]
completed_steps: Option<u32>,
/// Total number of steps, if reported.
#[serde(default)]
total_steps: Option<u32>,
/// Paths reported as touched; empty when omitted.
#[serde(default)]
touched_paths: Vec<String>,
/// Summary text supplied by the caller, if any.
#[serde(default)]
summary: Option<String>,
/// Harness name supplied by the caller, if any.
#[serde(default)]
harness: Option<String>,
/// Provider name supplied by the caller. A test in this file shows that
/// changing provider/model through an update starts a new report segment.
#[serde(default)]
provider: Option<String>,
/// Model name supplied by the caller, if any.
#[serde(default)]
model: Option<String>,
/// Thinking level supplied by the caller, if any.
#[serde(default)]
thinking_level: Option<String>,
/// Policy name supplied by the caller, if any.
#[serde(default)]
policy: Option<String>,
/// Argument vector supplied by the caller — presumably the harness command line; TODO confirm.
#[serde(default)]
argv: Option<Vec<String>>,
/// Caller-supplied environment-style hints, keyed by variable name.
#[serde(default)]
env_hints: BTreeMap<String, String>,
/// Caller-supplied probe metadata key/value pairs.
#[serde(default)]
probe_metadata: BTreeMap<String, String>,
}
/// Usage counters reported for a session.
///
/// Only `heddle_session_id` is required in the request JSON; each counter is
/// `None` when omitted. The field set mirrors the counters handled by
/// `merge_usage`.
#[derive(Debug, Clone, Deserialize, Default)]
struct RecordUsageParams {
/// Session the usage belongs to. Required (no serde default).
heddle_session_id: String,
/// Input token count, if reported.
#[serde(default)]
input_tokens: Option<u64>,
/// Output token count, if reported.
#[serde(default)]
output_tokens: Option<u64>,
/// Reasoning token count, if reported.
#[serde(default)]
reasoning_tokens: Option<u64>,
/// Cache-creation token count, if reported.
#[serde(default)]
cache_creation_tokens: Option<u64>,
/// Cache-read token count, if reported.
#[serde(default)]
cache_read_tokens: Option<u64>,
/// Number of tool calls, if reported.
#[serde(default)]
tool_calls: Option<u32>,
/// Cost in micro-USD (per the field name), if reported.
#[serde(default)]
cost_micros_usd: Option<u64>,
}
/// Arguments for `HarnessBridgeRuntime::record_touched_paths`.
#[derive(Debug, Clone, Deserialize, Default)]
struct RecordTouchedPathsParams {
/// Session the paths belong to. Required (no serde default).
heddle_session_id: String,
/// Paths reported as touched; empty when omitted.
#[serde(default)]
paths: Vec<String>,
}
/// Arguments for `HarnessBridgeRuntime::close_session`.
#[derive(Debug, Clone, Deserialize, Default)]
struct CloseSessionParams {
/// Session to close. Required (no serde default).
heddle_session_id: String,
/// Outcome label supplied by the caller (tests use `"completed"`).
#[serde(default)]
outcome: Option<String>,
/// Summary text supplied by the caller, if any.
#[serde(default)]
summary: Option<String>,
/// Transcript attachment references supplied by the caller, if any.
#[serde(default)]
transcript_refs: Option<Vec<TranscriptAttachmentRef>>,
/// Transport requested by the caller, if any.
#[serde(default)]
transport: Option<HarnessTransport>,
}
/// Arguments for `HarnessBridgeRuntime::flush_reports`.
#[derive(Debug, Clone, Deserialize, Default)]
struct FlushReportsParams {
/// Session whose report should be flushed; `None` when omitted
/// (presumably meaning "all pending reports" — TODO confirm in the handler).
#[serde(default)]
heddle_session_id: Option<String>,
}
/// Result returned by `HarnessBridgeRuntime::open_session`.
#[derive(Debug, Serialize)]
struct OpenSessionResult {
/// Id of the Heddle session that was opened or attached to.
heddle_session_id: String,
/// Id of the current report segment, if any.
heddle_segment_id: Option<String>,
/// Id of the agent registry entry associated with the session, if any.
agent_session_id: Option<String>,
/// `true` when a new session was created, `false` when an existing one was
/// attached to (see the `open_session_creates_or_attaches` test).
created_session: bool,
/// Harness name associated with the session, if known.
harness: Option<String>,
/// Provider name associated with the session, if known.
provider: Option<String>,
/// Model name associated with the session, if known.
model: Option<String>,
/// Thinking level associated with the session, if known.
thinking_level: Option<String>,
/// Flush state of the session report (e.g. `"pending-local"`, `"queued-local"`).
report_flush_state: Option<String>,
/// Explanation of why the session was attached or created, if available.
attach_reason: Option<String>,
}
/// Result shape for operations that mutate an existing session
/// (presumably progress, usage and touched-path updates — TODO confirm).
#[derive(Debug, Serialize)]
struct SessionMutationResult {
/// Id of the session that was updated.
heddle_session_id: String,
/// Id of the current report segment, if any.
heddle_segment_id: Option<String>,
/// Flush state of the session report after the update.
report_flush_state: Option<String>,
}
/// Result returned by `HarnessBridgeRuntime::close_session`.
#[derive(Debug, Serialize)]
struct CloseSessionResult {
/// Id of the session that was closed.
heddle_session_id: String,
/// Paths detected as changed. The tests show that paths which were only
/// reported as touched, or were already dirty at open, are not included.
changed_paths: Vec<String>,
/// Counts of changed, added, modified and deleted files.
diff_summary: SessionDiffSummary,
/// Flush state of the session report after closing.
report_flush_state: Option<String>,
}
/// Result returned by `HarnessBridgeRuntime::flush_reports`.
#[derive(Debug, Serialize)]
struct FlushReportsResult {
/// Number of reports that were flushed.
flushed: usize,
}
#[cfg(test)]
mod tests {
use super::*;
fn init_repo() -> (tempfile::TempDir, Repository) {
let temp = tempfile::TempDir::new().unwrap();
let repo = Repository::init_default(temp.path()).unwrap();
(temp, repo)
}
#[test]
fn inherited_harness_hints_exclude_ambient_model_identity() {
assert!(!inherited_harness_hint("OPENAI_MODEL"));
assert!(!inherited_harness_hint("ANTHROPIC_MODEL"));
assert!(!inherited_harness_hint("CLAUDE_MODEL"));
assert!(!inherited_harness_hint("MODEL"));
assert!(!inherited_harness_hint("OPENAI_REASONING_EFFORT"));
assert!(inherited_harness_hint("HEDDLE_AGENT_MODEL"));
assert!(inherited_harness_hint("CODEX_SANDBOX"));
assert!(inherited_harness_hint("CLAUDECODE"));
}
#[test]
fn open_session_creates_or_attaches() {
let (_temp, repo) = init_repo();
let user_config = UserConfig::default();
let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
let created = runtime
.open_session(OpenSessionParams {
harness: Some("codex".to_string()),
provider: Some("openai".to_string()),
model: Some("gpt-5.4".to_string()),
..OpenSessionParams::default()
})
.unwrap();
assert!(created.created_session);
let attached = runtime
.open_session(OpenSessionParams {
harness: Some("codex".to_string()),
provider: Some("openai".to_string()),
model: Some("gpt-5.4".to_string()),
..OpenSessionParams::default()
})
.unwrap();
assert!(!attached.created_session);
assert_eq!(created.heddle_session_id, attached.heddle_session_id);
}
#[test]
fn same_client_instance_reattaches_to_its_existing_session() {
let (_temp, repo) = init_repo();
let user_config = UserConfig::default();
let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
let first = runtime
.open_session(OpenSessionParams {
client_instance_id: Some("client-a".to_string()),
harness: Some("codex".to_string()),
provider: Some("openai".to_string()),
model: Some("gpt-5.4".to_string()),
..OpenSessionParams::default()
})
.unwrap();
let second = runtime
.open_session(OpenSessionParams {
client_instance_id: Some("client-b".to_string()),
harness: Some("codex".to_string()),
provider: Some("openai".to_string()),
model: Some("gpt-5.4".to_string()),
..OpenSessionParams::default()
})
.unwrap();
let reopened = runtime
.open_session(OpenSessionParams {
client_instance_id: Some("client-a".to_string()),
harness: Some("codex".to_string()),
provider: Some("openai".to_string()),
model: Some("gpt-5.4".to_string()),
..OpenSessionParams::default()
})
.unwrap();
assert_ne!(first.heddle_session_id, second.heddle_session_id);
assert_eq!(first.heddle_session_id, reopened.heddle_session_id);
assert_eq!(first.agent_session_id, reopened.agent_session_id);
}
#[test]
fn different_client_instances_do_not_share_the_current_session() {
let (_temp, repo) = init_repo();
let user_config = UserConfig::default();
let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
let first = runtime
.open_session(OpenSessionParams {
client_instance_id: Some("client-a".to_string()),
harness: Some("codex".to_string()),
provider: Some("openai".to_string()),
model: Some("gpt-5.4".to_string()),
..OpenSessionParams::default()
})
.unwrap();
let second = runtime
.open_session(OpenSessionParams {
client_instance_id: Some("client-b".to_string()),
harness: Some("codex".to_string()),
provider: Some("openai".to_string()),
model: Some("gpt-5.4".to_string()),
..OpenSessionParams::default()
})
.unwrap();
assert_ne!(first.heddle_session_id, second.heddle_session_id);
assert_ne!(first.agent_session_id, second.agent_session_id);
}
#[test]
fn provider_model_change_creates_segment() {
let (_temp, repo) = init_repo();
let user_config = UserConfig::default();
let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
let opened = runtime
.open_session(OpenSessionParams {
harness: Some("claude-code".to_string()),
provider: Some("anthropic".to_string()),
model: Some("claude-sonnet".to_string()),
..OpenSessionParams::default()
})
.unwrap();
runtime
.update_progress(UpdateProgressParams {
heddle_session_id: opened.heddle_session_id.clone(),
provider: Some("openai".to_string()),
model: Some("gpt-5.4".to_string()),
..UpdateProgressParams::default()
})
.unwrap();
let report = runtime
.reports
.load(&opened.heddle_session_id)
.unwrap()
.unwrap();
let expected_segment = format!("{}-seg-2", opened.heddle_session_id);
assert_eq!(
report.heddle_segment_id.as_deref(),
Some(expected_segment.as_str())
);
}
#[test]
fn close_session_captures_changed_paths_from_status_and_hints() {
let (temp, repo) = init_repo();
let config = UserConfig::default();
let mut runtime = HarnessBridgeRuntime::new(repo, config);
let opened = runtime
.open_session(OpenSessionParams {
harness: Some("codex".to_string()),
provider: Some("openai".to_string()),
model: Some("gpt-5.4".to_string()),
..OpenSessionParams::default()
})
.unwrap();
std::fs::write(temp.path().join("src.txt"), "hello\n").unwrap();
runtime
.record_touched_paths(RecordTouchedPathsParams {
heddle_session_id: opened.heddle_session_id.clone(),
paths: vec!["src.txt".to_string(), "notes.md".to_string()],
})
.unwrap();
let closed = runtime
.close_session(CloseSessionParams {
heddle_session_id: opened.heddle_session_id.clone(),
outcome: Some("completed".to_string()),
..CloseSessionParams::default()
})
.unwrap();
let report = runtime
.reports
.load(&opened.heddle_session_id)
.unwrap()
.unwrap();
assert!(closed.changed_paths.iter().any(|path| path == "src.txt"));
assert!(!closed.changed_paths.iter().any(|path| path == "notes.md"));
assert!(report.touched_paths.iter().any(|path| path == "src.txt"));
assert!(report.touched_paths.iter().any(|path| path == "notes.md"));
assert_eq!(
closed.diff_summary.changed_file_count,
closed.changed_paths.len() as u32
);
}
#[test]
fn flush_reports_moves_pending_report_to_outbox() {
let (_temp, repo) = init_repo();
let user_config = UserConfig::default();
let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
let opened = runtime
.open_session(OpenSessionParams {
harness: Some("codex".to_string()),
provider: Some("openai".to_string()),
model: Some("gpt-5.4".to_string()),
..OpenSessionParams::default()
})
.unwrap();
let flushed = runtime
.flush_reports(FlushReportsParams {
heddle_session_id: Some(opened.heddle_session_id.clone()),
})
.unwrap();
assert_eq!(flushed.flushed, 1);
let report = runtime
.reports
.load(&opened.heddle_session_id)
.unwrap()
.unwrap();
assert!(!report.pending_flush);
assert_eq!(report.report_flush_state.as_deref(), Some("queued-local"));
assert!(runtime.reports.outbox_path().exists());
}
#[test]
fn explicit_overrides_beat_fingerprint_and_user_defaults() {
let (_temp, repo) = init_repo();
let mut user_config = UserConfig::default();
user_config.harness.harnesses.insert(
"codex".to_string(),
UserHarnessOverride {
provider: Some("openai".to_string()),
model: Some("gpt-default".to_string()),
thinking_level: Some("medium".to_string()),
policy: Some("default".to_string()),
},
);
let identity = resolve_identity(
&repo,
&user_config,
IdentityHints {
harness: Some("codex".to_string()),
provider: Some("openai".to_string()),
model: Some("gpt-5.4".to_string()),
thinking_level: Some("high".to_string()),
policy: Some("custom".to_string()),
probe: HarnessProbeResult::default(),
},
)
.unwrap();
assert_eq!(identity.model.as_deref(), Some("gpt-5.4"));
assert_eq!(identity.thinking_level.as_deref(), Some("high"));
assert_eq!(identity.policy.as_deref(), Some("custom"));
}
#[test]
fn transcript_mode_defaults_to_off_and_keeps_refs_empty() {
let (_temp, repo) = init_repo();
let user_config = UserConfig::default();
let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
let opened = runtime
.open_session(OpenSessionParams {
harness: Some("codex".to_string()),
provider: Some("openai".to_string()),
model: Some("gpt-5.4".to_string()),
..OpenSessionParams::default()
})
.unwrap();
let report = runtime
.reports
.load(&opened.heddle_session_id)
.unwrap()
.unwrap();
assert_eq!(report.transcript_mode, "off");
assert!(report.transcript_refs.is_empty());
}
#[test]
fn codex_thread_probe_reattaches_same_actor() {
let (_temp, repo) = init_repo();
let user_config = UserConfig::default();
let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
let first = runtime
.open_session(OpenSessionParams {
harness: Some("codex".to_string()),
probe_metadata: BTreeMap::from([
("thread_id".to_string(), "thr_123".to_string()),
("client_name".to_string(), "codex-tui".to_string()),
]),
..OpenSessionParams::default()
})
.unwrap();
let second = runtime
.open_session(OpenSessionParams {
harness: Some("codex".to_string()),
probe_metadata: BTreeMap::from([
("thread_id".to_string(), "thr_123".to_string()),
("client_name".to_string(), "codex-tui".to_string()),
]),
..OpenSessionParams::default()
})
.unwrap();
assert_eq!(first.agent_session_id, second.agent_session_id);
assert_eq!(first.heddle_session_id, second.heddle_session_id);
}
#[test]
fn opencode_child_session_creates_distinct_actor_with_parent_key() {
let (_temp, repo) = init_repo();
let user_config = UserConfig::default();
let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
let root = runtime
.open_session(OpenSessionParams {
harness: Some("opencode".to_string()),
probe_metadata: BTreeMap::from([("session_id".to_string(), "root-1".to_string())]),
..OpenSessionParams::default()
})
.unwrap();
let child = runtime
.open_session(OpenSessionParams {
harness: Some("opencode".to_string()),
probe_metadata: BTreeMap::from([
("session_id".to_string(), "child-1".to_string()),
("parent_id".to_string(), "root-1".to_string()),
]),
..OpenSessionParams::default()
})
.unwrap();
assert_ne!(root.agent_session_id, child.agent_session_id);
let report = runtime
.reports
.load(&child.heddle_session_id)
.unwrap()
.unwrap();
assert_eq!(
report.native_parent_actor_key.as_deref(),
Some("opencode:session:root-1")
);
}
#[test]
fn claude_resume_with_new_session_id_does_not_steal_existing_actor() {
let (_temp, repo) = init_repo();
let user_config = UserConfig::default();
let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
let first = runtime
.open_session(OpenSessionParams {
harness: Some("claude-code".to_string()),
probe_metadata: BTreeMap::from([
("session_id".to_string(), "sess-old".to_string()),
(
"transcript_path".to_string(),
"/tmp/claude/session-a.jsonl".to_string(),
),
]),
..OpenSessionParams::default()
})
.unwrap();
let resumed = runtime
.open_session(OpenSessionParams {
harness: Some("claude-code".to_string()),
probe_metadata: BTreeMap::from([
("session_id".to_string(), "sess-new".to_string()),
(
"transcript_path".to_string(),
"/tmp/claude/session-a.jsonl".to_string(),
),
]),
..OpenSessionParams::default()
})
.unwrap();
assert_ne!(first.agent_session_id, resumed.agent_session_id);
assert_ne!(first.heddle_session_id, resumed.heddle_session_id);
}
#[test]
fn explicit_claude_harness_beats_generic_session_id_probe_match() {
let (_temp, repo) = init_repo();
let user_config = UserConfig::default();
let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
let opened = runtime
.open_session(OpenSessionParams {
harness: Some("claude-code".to_string()),
probe_metadata: BTreeMap::from([
("session_id".to_string(), "claude-sess-1".to_string()),
("hook_event".to_string(), "SubagentStop".to_string()),
]),
..OpenSessionParams::default()
})
.unwrap();
let report = runtime
.reports
.load(&opened.heddle_session_id)
.unwrap()
.unwrap();
assert_eq!(
report.native_actor_key.as_deref(),
Some("claude-code:session:claude-sess-1")
);
assert_eq!(report.harness.harness.as_deref(), Some("claude-code"));
}
#[test]
fn same_native_actor_key_reuses_existing_actor_after_tentative_session_creation() {
let (_temp, repo) = init_repo();
let user_config = UserConfig::default();
let runtime = HarnessBridgeRuntime::new(repo, user_config);
let principal = runtime.repo.get_principal().unwrap();
let mut sessions = SessionManager::new(runtime.repo.root());
let existing_session = sessions
.start_session(
principal.clone(),
"anthropic".to_string(),
"claude-opus-4-7[1m]".to_string(),
None,
)
.unwrap();
let tentative_session = sessions
.start_session(
principal,
"anthropic".to_string(),
"claude-opus-4-7[1m]".to_string(),
None,
)
.unwrap();
let registry = AgentRegistry::new(runtime.repo.heddle_dir());
let existing_entry = registry
.create_generated_entry(|session_id| {
Ok(AgentEntry {
session_id: session_id.to_string(),
client_instance_id: None,
native_actor_key: Some(
"claude-code:session:282396d3-554a-48aa-a9a8-8d1f0bd15fa5".to_string(),
),
native_parent_actor_key: None,
native_instance_key: Some(
"claude-code:transcript:/tmp/claude/282396d3.jsonl".to_string(),
),
heddle_session_id: Some(existing_session.id.clone()),
thread_id: None,
thread: "detached".to_string(),
pid: Some(std::process::id()),
boot_id: None,
liveness_path: None,
heartbeat_at: Some(Utc::now()),
anchor_state: None,
anchor_root: None,
reservation_token: Some(objects::store::generate_agent_id()),
path: Some(runtime.repo.root().to_path_buf()),
base_state: String::new(),
started_at: Utc::now(),
provider: Some("anthropic".to_string()),
model: Some("claude-opus-4-7[1m]".to_string()),
harness: Some("claude-code".to_string()),
thinking_level: None,
usage_summary: AgentUsageSummary::default(),
last_progress_at: None,
report_flush_state: Some("pending-local".to_string()),
attach_reason: None,
attach_precedence: vec![],
winning_attach_rule: None,
probe_source: Some("hook_payload".to_string()),
probe_confidence: Some(1.0),
status: AgentStatus::Active,
completed_at: None,
context_queries: vec![],
})
})
.unwrap();
let probe = HarnessProbeResult {
harness: Some("claude-code".to_string()),
provider: Some("anthropic".to_string()),
model: Some("claude-opus-4-7[1m]".to_string()),
native_actor_key: Some(
"claude-code:session:282396d3-554a-48aa-a9a8-8d1f0bd15fa5".to_string(),
),
native_instance_key: Some(
"claude-code:transcript:/tmp/claude/282396d3.jsonl".to_string(),
),
probe_source: Some("hook_payload".to_string()),
confidence: Some(1.0),
..HarnessProbeResult::default()
};
let identity = ResolvedIdentity {
harness: Some("claude-code".to_string()),
provider: Some("anthropic".to_string()),
model: Some("claude-opus-4-7[1m]".to_string()),
thinking_level: None,
policy: None,
};
let mut attach = ResolvedAttachment {
target: AttachTarget::CreateNew {
_because_claimed: false,
},
matched_entry: None,
attach_reason:
"started new Heddle session because no compatible native actor match was found"
.to_string(),
precedence: vec!["native-actor-key:miss".to_string()],
winning_rule: "create-new-session".to_string(),
};
let resolved_entry = runtime
.ensure_registry_entry(RegistryEntryRequest {
heddle_session_id: &tentative_session.id,
thread_name: None,
thread_id: None,
identity: &identity,
probe: &probe,
attach: &attach,
client_instance_id: None,
requested_entry: None,
})
.unwrap();
assert_eq!(resolved_entry.session_id, existing_entry.session_id);
assert_eq!(
resolved_entry.heddle_session_id.as_deref(),
Some(existing_session.id.as_str())
);
let (canonical_session, owns_session) = runtime
.reuse_canonical_actor_session(
&mut sessions,
CanonicalActorSessionRequest {
tentative_session: tentative_session.clone(),
tentative_owns_session: true,
entry: &resolved_entry,
probe: &probe,
attach: &mut attach,
},
)
.unwrap();
assert_eq!(canonical_session.id, existing_session.id);
assert!(!owns_session);
assert!(
attach
.precedence
.iter()
.any(|step| step.starts_with("post-create-native-actor-key:"))
);
assert_eq!(attach.winning_rule, "native-actor-key-post-create");
assert!(
!sessions
.get_session(&tentative_session.id)
.unwrap()
.unwrap()
.is_active()
);
}
#[test]
fn close_session_does_not_blame_preexisting_dirty_worktree() {
let (temp, repo) = init_repo();
std::fs::write(temp.path().join("preexisting.txt"), "already dirty\n").unwrap();
let user_config = UserConfig::default();
let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
let opened = runtime
.open_session(OpenSessionParams {
harness: Some("claude-code".to_string()),
provider: Some("anthropic".to_string()),
model: Some("claude-opus-4-7[1m]".to_string()),
..OpenSessionParams::default()
})
.unwrap();
let closed = runtime
.close_session(CloseSessionParams {
heddle_session_id: opened.heddle_session_id.clone(),
outcome: Some("completed".to_string()),
..CloseSessionParams::default()
})
.unwrap();
let report = runtime
.reports
.load(&opened.heddle_session_id)
.unwrap()
.unwrap();
assert!(
report
.worktree_changes_at_open
.iter()
.any(|change| change.path == "preexisting.txt")
);
assert!(
!closed
.changed_paths
.iter()
.any(|path| path == "preexisting.txt")
);
assert_eq!(closed.diff_summary.changed_file_count, 0);
}
#[test]
fn relay_claude_stop_captures_state_with_agent_attribution() {
let (temp, repo) = init_repo();
let repo_root = repo.root().to_path_buf();
std::fs::write(repo_root.join("seed.txt"), b"hello").unwrap();
let _ = repo.snapshot(Some("seed".into()), None).unwrap();
std::fs::write(repo_root.join("seed.txt"), b"hello, heddle").unwrap();
drop(repo);
let fresh_repo = Repository::open(temp.path()).unwrap();
let user_config = UserConfig::default();
let mut runtime = HarnessBridgeRuntime::new(fresh_repo, user_config);
let payload = serde_json::json!({
"session_id": "claude-sess-123",
"transcript_path": "/tmp/claude/x.jsonl",
"model": {
"id": "claude-opus-4-7",
"display_name": "Claude Opus 4.7",
},
"message": "hook-driven capture test",
"hook_event_name": "Stop",
});
relay_claude(&mut runtime, "Stop", &payload).unwrap();
drop(runtime);
let verify = Repository::open(temp.path()).unwrap();
let head_id = verify.head().unwrap().expect("HEAD after Stop capture");
let state = verify
.store()
.get_state(&head_id)
.unwrap()
.expect("state for HEAD");
let agent = state.attribution.agent.expect("agent attribution on state");
assert_eq!(agent.provider, "anthropic");
assert_eq!(agent.model, "Claude Opus 4.7");
assert_eq!(
state.intent.as_deref(),
Some("hook-driven capture test"),
"intent should be pulled from payload message",
);
}
#[test]
fn relay_claude_stop_is_idempotent_when_clean() {
let (temp, repo) = init_repo();
let repo_root = repo.root().to_path_buf();
std::fs::write(repo_root.join("seed.txt"), b"hello").unwrap();
let seed = repo.snapshot(Some("seed".into()), None).unwrap();
drop(repo);
let fresh_repo = Repository::open(temp.path()).unwrap();
let mut runtime = HarnessBridgeRuntime::new(fresh_repo, UserConfig::default());
let payload = serde_json::json!({
"session_id": "claude-sess-clean",
"model": {"id": "claude-sonnet-4-6"},
});
relay_claude(&mut runtime, "Stop", &payload).unwrap();
drop(runtime);
let verify = Repository::open(temp.path()).unwrap();
let head_id = verify.head().unwrap().expect("HEAD preserved");
assert_eq!(
head_id, seed.change_id,
"no change expected when worktree is clean",
);
}
/// A `PreToolUse` event naming the `Bash` tool must relay without returning
/// an error. The test asserts nothing beyond that successful return.
#[test]
fn relay_claude_pre_tool_use_ignores_non_file_tool() {
    let (temp, repo) = init_repo();
    drop(repo);

    let payload = serde_json::json!({
        "session_id": "claude-sess-bash",
        "tool_name": "Bash",
        "tool_input": {"command": "ls"},
    });

    let reopened = Repository::open(temp.path()).unwrap();
    let mut runtime = HarnessBridgeRuntime::new(reopened, UserConfig::default());
    relay_claude(&mut runtime, "PreToolUse", &payload).unwrap();
}
/// After a `SubagentStart` relay, the agent registry must hold an active
/// entry keyed by the subagent id whose parent key names the originating
/// session.
#[test]
fn relay_claude_subagent_start_creates_child_entry_with_parent_key() {
    let (temp, repo) = init_repo();
    drop(repo);

    let payload = serde_json::json!({
        "session_id": "parent-claude-sess",
        "agent_id": "child-subagent-xyz",
        "model": {"id": "claude-sonnet-4-6"},
    });

    // Relay inside a scope so the runtime is released before the registry
    // is inspected through a freshly opened repository.
    {
        let reopened = Repository::open(temp.path()).unwrap();
        let mut runtime = HarnessBridgeRuntime::new(reopened, UserConfig::default());
        relay_claude(&mut runtime, "SubagentStart", &payload).unwrap();
    }

    let verify = Repository::open(temp.path()).unwrap();
    let registry = AgentRegistry::new(verify.heddle_dir());
    let lookup = registry.find_active_by_native_actor_key("claude-code:agent:child-subagent-xyz");
    let entry = lookup
        .unwrap()
        .expect("subagent AgentEntry should exist after SubagentStart");

    assert_eq!(
        entry.native_parent_actor_key.as_deref(),
        Some("claude-code:session:parent-claude-sess"),
        "subagent must carry parent session linkage",
    );
    assert_eq!(entry.status, AgentStatus::Active);
}
/// A `SubagentStop` relay that follows a `SubagentStart` for the same agent
/// id must leave that agent's registry entry in the `Complete` status.
#[test]
fn relay_claude_subagent_stop_marks_child_entry_complete() {
    let (temp, repo) = init_repo();
    // Resolve the output location while the initial handle is still open.
    let output_path = repo.root().join("child-output.txt");
    drop(repo);

    // Phase 1: announce the subagent through its own short-lived runtime.
    {
        let reopened = Repository::open(temp.path()).unwrap();
        let mut runtime = HarnessBridgeRuntime::new(reopened, UserConfig::default());
        let start_payload = serde_json::json!({
            "session_id": "parent-sess",
            "agent_id": "worker-1",
            "model": {"id": "claude-sonnet-4-6"},
        });
        relay_claude(&mut runtime, "SubagentStart", &start_payload).unwrap();
    }

    // Between the two events a new file appears in the worktree.
    std::fs::write(output_path, b"subagent produced this").unwrap();

    // Phase 2: report the subagent as stopped through a second runtime.
    {
        let reopened = Repository::open(temp.path()).unwrap();
        let mut runtime = HarnessBridgeRuntime::new(reopened, UserConfig::default());
        let stop_payload = serde_json::json!({
            "session_id": "parent-sess",
            "agent_id": "worker-1",
            "model": {
                "id": "claude-sonnet-4-6",
                "display_name": "Claude Sonnet 4.6",
            },
        });
        relay_claude(&mut runtime, "SubagentStop", &stop_payload).unwrap();
    }

    let verify = Repository::open(temp.path()).unwrap();
    let registry = AgentRegistry::new(verify.heddle_dir());
    let entries = registry.list().unwrap();
    let child_key = "claude-code:agent:worker-1";
    let child = entries
        .into_iter()
        .find(|entry| entry.native_actor_key.as_deref() == Some(child_key))
        .expect("child entry should still exist");

    assert_eq!(
        child.status,
        AgentStatus::Complete,
        "SubagentStop should mark the child entry Complete",
    );
}
/// After `SessionStart`, a `UserPromptSubmit` relay must leave at least one
/// session with more segments than the baseline measured before the prompt.
#[test]
fn relay_claude_user_prompt_submit_rotates_segment() {
    let (temp, repo) = init_repo();
    drop(repo);

    let reopened = Repository::open(temp.path()).unwrap();
    let mut runtime = HarnessBridgeRuntime::new(reopened, UserConfig::default());

    let session_payload = serde_json::json!({
        "session_id": "claude-prompt-sess",
        "model": {"id": "claude-opus-4-7", "display_name": "Claude Opus 4.7"},
    });
    relay_claude(&mut runtime, "SessionStart", &session_payload).unwrap();

    // Baseline: segment count of the first listed session that has any
    // segments, or zero when no session has one yet.
    let sessions_before = SessionManager::new(runtime.repo.root())
        .list_sessions(true)
        .unwrap();
    let initial_segments = sessions_before
        .iter()
        .filter(|session| !session.segments.is_empty())
        .map(|session| session.segments.len())
        .next()
        .unwrap_or(0);

    let prompt_payload = serde_json::json!({
        "session_id": "claude-prompt-sess",
        "model": {"id": "claude-opus-4-7", "display_name": "Claude Opus 4.7"},
        "prompt": "write a new feature",
    });
    relay_claude(&mut runtime, "UserPromptSubmit", &prompt_payload).unwrap();
    drop(runtime);

    // Re-read the sessions through a freshly opened repository.
    let verify = Repository::open(temp.path()).unwrap();
    let sessions_after = SessionManager::new(verify.root())
        .list_sessions(true)
        .unwrap();
    let rotated = sessions_after
        .iter()
        .any(|session| initial_segments < session.segments.len());

    assert!(
        rotated,
        "UserPromptSubmit must add at least one segment beyond the SessionStart baseline \
         (initial={initial_segments}, sessions_after={:?})",
        sessions_after
            .iter()
            .map(|session| (session.id.clone(), session.segments.len()))
            .collect::<Vec<_>>(),
    );
}
}