1use std::{
3 collections::{BTreeMap, BTreeSet},
4 fs::{self, OpenOptions},
5 io::{BufRead, BufReader, BufWriter, Write},
6 path::{Path, PathBuf},
7};
8
9use anyhow::{Result, anyhow};
10use base64::Engine as _;
11use chrono::Utc;
12use objects::{
13 fs_atomic::write_file_atomic,
14 object::{
15 ChangeId, ContentHash, DiffKind, NativeToolCallRefV1, Session, ThreadName,
16 TimelineBranchId, TimelineLabel, TimelineOperationBodyV1, TimelineOperationEnvelope,
17 TimelineStepId, TimelineToolCallStatus, TimelineToolPayloadMetadata, ToolCallFinishedV1,
18 ToolCallStartedV1, Tree,
19 },
20 store::{AgentEntry, AgentRegistry, AgentStatus, AgentUsageSummary, ObjectStore},
21};
22use oplog::OpLogRecorder;
23use refs::Head;
24use repo::{
25 Repository, SessionManager, Thread, ThreadFreshness, ThreadIntegrationPolicy, ThreadManager,
26 ThreadMode, ThreadState, TimelineStore, TimelineView,
27};
28use serde::{Deserialize, Serialize};
29use serde_json::Value;
30use wire::{
31 HarnessIdentity, ProgressCheckpoint, SessionDiffSummary, SessionReportEnvelope,
32 TranscriptAttachmentRef, UsageTotals, WorktreeChangeBaseline,
33};
34
35mod claude_hook;
36mod probe;
37
38use self::probe::{HarnessProbeInput, HarnessProbeResult, probe_harness_actor};
39use crate::{
40 cli::{
41 Cli,
42 commands::{
43 snapshot::{
44 SnapshotAgentOverrides, create_snapshot, summarize_confidence,
45 summarize_verification,
46 },
47 worktree_cmd::helpers::{prepare_worktree_target, write_isolated_checkout},
48 },
49 style, worktree_status_options,
50 },
51 config::{
52 HarnessMode, HarnessTranscriptMode, HarnessTransport, UserConfig, UserHarnessOverride,
53 UserHarnessRootThreadPolicy, UserHarnessSubagentThreadPolicy, UserThreadWorkspaceMode,
54 },
55};
56
57pub(crate) fn probe_current_process_harness(
58 repo: &Repository,
59 current_provider: Option<String>,
60 current_model: Option<String>,
61 current_policy: Option<String>,
62) -> Result<HarnessProbeResult> {
63 probe_harness_actor(&HarnessProbeInput {
64 argv: detected_harness_argv().or_else(|| Some(std::env::args().collect())),
65 env_hints: harness_env_hints(),
66 explicit_harness: None,
67 explicit_provider: None,
68 explicit_model: None,
69 explicit_thinking_level: None,
70 explicit_policy: None,
71 probe_metadata: BTreeMap::new(),
72 current_provider,
73 current_model,
74 current_policy,
75 repo_root: repo.root().display().to_string(),
76 })
77}
78
79fn harness_env_hints() -> BTreeMap<String, String> {
80 std::env::vars()
81 .filter(|(key, value)| {
82 !value.trim().is_empty()
83 && (key.starts_with("HEDDLE_AGENT_")
84 || key.starts_with("CODEX_")
85 || key.starts_with("CLAUDE")
86 || key.starts_with("ANTHROPIC_")
87 || key.starts_with("OPENAI_")
88 || key.starts_with("OPENCODE_")
89 || key.starts_with("AIDER_")
90 || matches!(
91 key.as_str(),
92 "MODEL" | "REASONING_EFFORT" | "THINKING_LEVEL"
93 ))
94 })
95 .collect()
96}
97
98fn detected_harness_argv() -> Option<Vec<String>> {
99 detected_harness_argv_impl()
100}
101
102#[cfg(target_os = "linux")]
103fn detected_harness_argv_impl() -> Option<Vec<String>> {
104 let mut pid = std::process::id();
105 for _ in 0..8 {
106 let stat = fs::read_to_string(format!("/proc/{pid}/stat")).ok()?;
107 let ppid = stat.split_whitespace().nth(3)?.parse::<u32>().ok()?;
108 if ppid == 0 || ppid == pid {
109 return None;
110 }
111 pid = ppid;
112 let raw = fs::read(format!("/proc/{pid}/cmdline")).ok()?;
113 let argv = raw
114 .split(|byte| *byte == 0)
115 .filter(|part| !part.is_empty())
116 .map(|part| String::from_utf8_lossy(part).to_string())
117 .collect::<Vec<_>>();
118 let program = argv.first().map(|arg| arg.to_ascii_lowercase())?;
119 if ["codex", "claude", "opencode", "aider"]
120 .iter()
121 .any(|needle| program.contains(needle))
122 {
123 return Some(argv);
124 }
125 }
126 None
127}
128
129#[cfg(not(target_os = "linux"))]
130fn detected_harness_argv_impl() -> Option<Vec<String>> {
131 None
132}
133
134pub fn cmd_harness_bridge(cli: &Cli) -> Result<()> {
135 let repo = cli.open_repo()?;
136 let mut runtime = init_harness_runtime(&repo)?;
137
138 let stdin = std::io::stdin();
139 let stdout = std::io::stdout();
140 let reader = BufReader::new(stdin.lock());
141 let mut writer = BufWriter::new(stdout.lock());
142
143 for line in reader.lines() {
144 let line = line?;
145 if line.trim().is_empty() {
146 continue;
147 }
148 let response = match serde_json::from_str::<BridgeRequest>(&line) {
149 Ok(request) => runtime.handle_request(request),
150 Err(err) => BridgeResponse::error(
151 None,
152 "invalid_request",
153 format!("failed to parse request: {err}"),
154 ),
155 };
156 serde_json::to_writer(&mut writer, &response)?;
157 writer.write_all(b"\n")?;
158 writer.flush()?;
159 }
160
161 Ok(())
162}
163
164pub(crate) fn relay_harness_event(
165 repo: &Repository,
166 harness: &str,
167 event: &str,
168 payload: &str,
169) -> Result<()> {
170 let mut runtime = init_harness_runtime(repo)?;
171 let (json, warning) = parse_relay_payload(payload);
172 if let Some(warning) = warning {
173 eprintln!("{}", style::warn(&warning));
174 }
175 match harness {
176 "codex" => relay_codex(&mut runtime, event, &json),
177 "claude-code" => relay_claude(&mut runtime, event, &json),
178 "opencode" => relay_opencode(&mut runtime, event, &json),
179 other => Err(anyhow!("unsupported harness relay: {other}")),
180 }
181}
182
183fn init_harness_runtime(repo: &Repository) -> Result<HarnessBridgeRuntime> {
184 let (user_config, warning) = load_harness_user_config(UserConfig::default_path());
185 if let Some(warning) = warning {
186 eprintln!("{}", style::warn(&warning));
187 }
188 Ok(HarnessBridgeRuntime::new(
189 Repository::open(repo.root())?,
190 user_config,
191 ))
192}
193
194fn load_harness_user_config(default_path: Option<PathBuf>) -> (UserConfig, Option<String>) {
195 let Some(path) = default_path else {
196 return (UserConfig::default(), None);
197 };
198 match UserConfig::load(&path) {
199 Ok(config) => (config, None),
200 Err(err) if is_not_found(&err) => (UserConfig::default(), None),
201 Err(err) => {
202 let warning = format!(
203 "warning: failed to load user config from {}: {err}; continuing with defaults",
204 path.display()
205 );
206 (UserConfig::default(), Some(warning))
207 }
208 }
209}
210
211fn is_not_found(err: &anyhow::Error) -> bool {
212 err.downcast_ref::<std::io::Error>()
213 .is_some_and(|io| io.kind() == std::io::ErrorKind::NotFound)
214}
215
216fn parse_relay_payload(payload: &str) -> (Value, Option<String>) {
217 if payload.trim().is_empty() {
218 return (Value::Null, None);
219 }
220 match serde_json::from_str::<Value>(payload) {
221 Ok(value) => (value, None),
222 Err(err) => (
223 Value::Null,
224 Some(format!(
225 "warning: failed to parse harness relay payload as JSON: {err}; continuing with null payload"
226 )),
227 ),
228 }
229}
230
231struct HarnessBridgeRuntime {
232 repo: Repository,
233 user_config: UserConfig,
234 reports: SessionReportStore,
235}
236
237struct RegistryEntryRequest<'a> {
238 heddle_session_id: &'a str,
239 thread_name: Option<&'a str>,
240 thread_id: Option<&'a str>,
241 identity: &'a ResolvedIdentity,
242 probe: &'a HarnessProbeResult,
243 attach: &'a ResolvedAttachment,
244 client_instance_id: Option<&'a str>,
245 requested_entry: Option<&'a AgentEntry>,
246}
247
248struct CanonicalActorSessionRequest<'a> {
249 tentative_session: Session,
250 tentative_owns_session: bool,
251 entry: &'a AgentEntry,
252 probe: &'a HarnessProbeResult,
253 attach: &'a mut ResolvedAttachment,
254}
255
256struct AttachmentResolutionInput<'a> {
257 requested_entry: Option<&'a AgentEntry>,
258 explicit_heddle_session_id: Option<&'a str>,
259 client_instance_id: Option<&'a str>,
260 probe: &'a HarnessProbeResult,
261 token_claims: Option<&'a TokenClaims>,
262}
263
264fn relay_codex(runtime: &mut HarnessBridgeRuntime, _event: &str, payload: &Value) -> Result<()> {
265 let metadata = map_from_pairs([
266 (
267 "client_name",
268 value_string(payload, &["client"]).or_else(|| value_string(payload, &["client_name"])),
269 ),
270 ("model", value_string(payload, &["model"])),
271 (
272 "model_provider",
273 value_string(payload, &["model_provider"])
274 .or_else(|| value_string(payload, &["provider"])),
275 ),
276 (
277 "model_reasoning_effort",
278 value_string(payload, &["reasoning_effort"]),
279 ),
280 ]);
281 let opened = runtime.open_session(OpenSessionParams {
282 harness: Some("codex".to_string()),
283 summary: value_string(payload, &["message"]),
284 probe_metadata: metadata,
285 ..OpenSessionParams::default()
286 })?;
287 runtime.update_progress(UpdateProgressParams {
288 heddle_session_id: opened.heddle_session_id,
289 summary: value_string(payload, &["message"]),
290 harness: Some("codex".to_string()),
291 ..UpdateProgressParams::default()
292 })?;
293 Ok(())
294}
295
296fn relay_claude(runtime: &mut HarnessBridgeRuntime, event: &str, payload: &Value) -> Result<()> {
297 let metadata = map_from_pairs([
298 ("session_id", value_string(payload, &["session_id"])),
299 ("agent_id", value_string(payload, &["agent_id"])),
300 ("session_name", value_string(payload, &["session_name"])),
301 (
302 "transcript_path",
303 value_string(payload, &["transcript_path"]),
304 ),
305 (
306 "model",
307 value_string(payload, &["model", "id"]).or_else(|| value_string(payload, &["model"])),
308 ),
309 (
310 "model_display_name",
311 value_string(payload, &["model", "display_name"]),
312 ),
313 ("effort", value_string(payload, &["effort"])),
314 ("hook_event", Some(event.to_string())),
315 (
316 "status_line",
317 (event == "StatusLine").then(|| "1".to_string()),
318 ),
319 (
320 "touched_paths",
321 value_array_join(payload, &["tool_response", "filePaths"])
322 .or_else(|| value_string(payload, &["file_path"])),
323 ),
324 (
325 "input_tokens",
326 value_u64_string(payload, &["context_window", "total_input_tokens"]),
327 ),
328 (
329 "output_tokens",
330 value_u64_string(payload, &["context_window", "total_output_tokens"]),
331 ),
332 (
333 "cost_micros_usd",
334 value_cost_micros(payload, &["cost", "total_cost_usd"]),
335 ),
336 ]);
337 let opened = runtime.open_session(OpenSessionParams {
338 harness: Some("claude-code".to_string()),
339 model: value_string(payload, &["model", "display_name"])
340 .or_else(|| value_string(payload, &["model", "id"]))
341 .or_else(|| value_string(payload, &["model"])),
342 summary: value_string(payload, &["message"]).or_else(|| value_string(payload, &["reason"])),
343 probe_metadata: metadata.clone(),
344 ..OpenSessionParams::default()
345 })?;
346 match event {
347 "SessionEnd" => {
348 runtime.close_session(CloseSessionParams {
349 heddle_session_id: opened.heddle_session_id,
350 summary: value_string(payload, &["reason"])
351 .or_else(|| value_string(payload, &["stop_hook_active"])),
352 outcome: Some("completed".to_string()),
353 ..CloseSessionParams::default()
354 })?;
355 }
356 "StatusLine" => {
357 runtime.update_progress(UpdateProgressParams {
358 heddle_session_id: opened.heddle_session_id.clone(),
359 harness: Some("claude-code".to_string()),
360 status: Some("StatusLine".to_string()),
361 message: value_string(payload, &["session_name"])
362 .or_else(|| value_string(payload, &["cwd"]))
363 .or_else(|| value_string(payload, &["workspace", "current_dir"])),
364 probe_metadata: metadata.clone(),
365 ..UpdateProgressParams::default()
366 })?;
367 runtime.record_usage(RecordUsageParams {
368 heddle_session_id: opened.heddle_session_id,
369 input_tokens: value_u64(payload, &["context_window", "total_input_tokens"]),
370 output_tokens: value_u64(payload, &["context_window", "total_output_tokens"]),
371 reasoning_tokens: value_u64(payload, &["context_window", "total_reasoning_tokens"]),
372 cache_creation_tokens: None,
373 cache_read_tokens: None,
374 tool_calls: None,
375 cost_micros_usd: value_cost_micros_u64(payload, &["cost", "total_cost_usd"]),
376 })?;
377 }
378 "Stop" => {
379 runtime.update_progress(UpdateProgressParams {
380 heddle_session_id: opened.heddle_session_id,
381 harness: Some("claude-code".to_string()),
382 status: Some("Stop".to_string()),
383 message: value_string(payload, &["message"])
384 .or_else(|| value_string(payload, &["result"]))
385 .or_else(|| value_string(payload, &["stop_reason"])),
386 probe_metadata: metadata,
387 ..UpdateProgressParams::default()
388 })?;
389 if let Err(err) = claude_hook::handle_stop_capture(
390 &runtime.repo,
391 &runtime.user_config,
392 payload,
393 "Claude Code turn",
394 ) {
395 tracing::warn!(?err, "heddle Stop hook capture failed");
396 }
397 }
398 "SubagentStop" => {
399 runtime.update_progress(UpdateProgressParams {
400 heddle_session_id: opened.heddle_session_id,
401 harness: Some("claude-code".to_string()),
402 status: Some("SubagentStop".to_string()),
403 touched_paths: csv_from_value(metadata.get("touched_paths")),
404 probe_metadata: metadata,
405 ..UpdateProgressParams::default()
406 })?;
407 if let Err(err) = claude_hook::handle_stop_capture(
408 &runtime.repo,
409 &runtime.user_config,
410 payload,
411 "Claude Code subagent turn",
412 ) {
413 tracing::warn!(?err, "heddle SubagentStop hook capture failed");
414 }
415 if let Err(err) = claude_hook::mark_subagent_complete(&runtime.repo, payload) {
416 tracing::debug!(?err, "heddle SubagentStop mark-complete failed");
417 }
418 }
419 "SubagentStart" => {
420 runtime.update_progress(UpdateProgressParams {
427 heddle_session_id: opened.heddle_session_id,
428 harness: Some("claude-code".to_string()),
429 status: Some("SubagentStart".to_string()),
430 touched_paths: csv_from_value(metadata.get("touched_paths")),
431 probe_metadata: metadata,
432 ..UpdateProgressParams::default()
433 })?;
434 }
435 "UserPromptSubmit" => {
436 runtime.update_progress(UpdateProgressParams {
437 heddle_session_id: opened.heddle_session_id.clone(),
438 harness: Some("claude-code".to_string()),
439 status: Some("UserPromptSubmit".to_string()),
440 touched_paths: csv_from_value(metadata.get("touched_paths")),
441 probe_metadata: metadata,
442 ..UpdateProgressParams::default()
443 })?;
444 if let Err(err) = claude_hook::handle_user_prompt_segment_rotate(
445 &runtime.repo,
446 &opened.heddle_session_id,
447 payload,
448 ) {
449 tracing::debug!(?err, "heddle UserPromptSubmit segment rotation failed");
450 }
451 }
452 "PreToolUse" => {
453 runtime.update_progress(UpdateProgressParams {
454 heddle_session_id: opened.heddle_session_id,
455 harness: Some("claude-code".to_string()),
456 status: Some("PreToolUse".to_string()),
457 touched_paths: csv_from_value(metadata.get("touched_paths")),
458 probe_metadata: metadata,
459 ..UpdateProgressParams::default()
460 })?;
461 if let Err(err) = claude_hook::handle_pre_tool_use(&runtime.repo, payload) {
462 tracing::debug!(?err, "heddle PreToolUse context inject skipped");
463 }
464 }
465 _ => {
466 runtime.update_progress(UpdateProgressParams {
467 heddle_session_id: opened.heddle_session_id,
468 harness: Some("claude-code".to_string()),
469 status: Some(event.to_string()),
470 touched_paths: csv_from_value(metadata.get("touched_paths")),
471 probe_metadata: metadata,
472 ..UpdateProgressParams::default()
473 })?;
474 }
475 }
476 Ok(())
477}
478
479fn relay_opencode(runtime: &mut HarnessBridgeRuntime, event: &str, payload: &Value) -> Result<()> {
480 let metadata = map_from_pairs([
481 (
482 "session_id",
483 value_string(payload, &["sessionID"])
484 .or_else(|| value_string(payload, &["session_id"])),
485 ),
486 (
487 "parent_id",
488 value_string(payload, &["parentID"]).or_else(|| value_string(payload, &["parent_id"])),
489 ),
490 (
491 "client_name",
492 value_string(payload, &["client"]).or_else(|| std::env::var("OPENCODE_CLIENT").ok()),
493 ),
494 ("model", value_string(payload, &["model"])),
495 ("provider", value_string(payload, &["provider"])),
496 ("hook_event", Some(event.to_string())),
497 (
498 "touched_paths",
499 value_string(payload, &["file", "path"]).or_else(|| value_string(payload, &["path"])),
500 ),
501 ]);
502 let opened = runtime.open_session(OpenSessionParams {
503 harness: Some("opencode".to_string()),
504 model: value_string(payload, &["model"]),
505 provider: value_string(payload, &["provider"]),
506 probe_metadata: metadata.clone(),
507 ..OpenSessionParams::default()
508 })?;
509 let session_id = opened.heddle_session_id.clone();
510 runtime.update_progress(UpdateProgressParams {
511 heddle_session_id: session_id.clone(),
512 harness: Some("opencode".to_string()),
513 status: Some(event.to_string()),
514 touched_paths: csv_from_value(metadata.get("touched_paths")),
515 probe_metadata: metadata,
516 ..UpdateProgressParams::default()
517 })?;
518 if let Err(err) = record_opencode_timeline_event(runtime, event, payload, &opened) {
519 tracing::debug!(?err, event, "heddle OpenCode timeline recording skipped");
520 }
521 Ok(())
522}
523
524#[derive(Clone, Copy, Debug, PartialEq, Eq)]
525enum TimelineToolEvent {
526 Started,
527 Finished,
528}
529
530trait HarnessTimelineExtractor {
531 fn timeline_event(&self, event: &str) -> Option<TimelineToolEvent>;
532 fn native_tool_call(&self, payload: &Value) -> Option<NativeToolCallRefV1>;
533 fn tool_name(&self, payload: &Value) -> String;
534 fn tool_status(&self, payload: &Value) -> TimelineToolCallStatus;
535 fn payload_metadata(&self, event: &str, payload: &Value)
536 -> Result<TimelineToolPayloadMetadata>;
537 fn touched_paths(&self, payload: &Value) -> Vec<String>;
538 fn capture_intent(&self, native: &NativeToolCallRefV1, payload: &Value) -> String;
539
540 fn timeline_thread(
541 &self,
542 runtime: &HarnessBridgeRuntime,
543 opened: &OpenSessionResult,
544 ) -> Result<String> {
545 if let Some(report) = runtime.reports.load(&opened.heddle_session_id)?
546 && let Some(thread) = report.thread
547 {
548 return Ok(thread);
549 }
550 match runtime.repo.head_ref()? {
551 Head::Attached { thread } => Ok(thread.to_string()),
552 Head::Detached { .. } => Ok("main".to_string()),
553 }
554 }
555
556 fn stable_step_id(&self, native: &NativeToolCallRefV1) -> TimelineStepId {
557 let key = format!(
558 "{}\0{}\0{}\0{}",
559 native.harness,
560 native.session_id.as_deref().unwrap_or(""),
561 native.message_id.as_deref().unwrap_or(""),
562 native.tool_call_id
563 );
564 let hash =
565 ContentHash::compute_typed("timeline-native-tool-call-v1", key.as_bytes()).to_hex();
566 TimelineStepId::new(format!("tls-{}", &hash[..24]))
567 }
568
569 fn started_labels(&self, _payload: &Value) -> Vec<TimelineLabel> {
570 vec![TimelineLabel::ExternalSideEffectsUnknown]
571 }
572
573 fn finished_labels(&self, changed: bool, _payload: &Value) -> Vec<TimelineLabel> {
574 if changed {
575 vec![
576 TimelineLabel::RepoReversible,
577 TimelineLabel::ExternalSideEffectsUnknown,
578 ]
579 } else {
580 vec![TimelineLabel::ExternalSideEffectsUnknown]
581 }
582 }
583}
584
585struct OpenCodeTimelineExtractor;
586
587impl HarnessTimelineExtractor for OpenCodeTimelineExtractor {
588 fn timeline_event(&self, event: &str) -> Option<TimelineToolEvent> {
589 match event {
590 "tool.execute.before" => Some(TimelineToolEvent::Started),
591 "tool.execute.after" => Some(TimelineToolEvent::Finished),
592 _ => None,
593 }
594 }
595
596 fn native_tool_call(&self, payload: &Value) -> Option<NativeToolCallRefV1> {
597 opencode_native_tool_call(payload)
598 }
599
600 fn tool_name(&self, payload: &Value) -> String {
601 opencode_tool_name(payload)
602 }
603
604 fn tool_status(&self, payload: &Value) -> TimelineToolCallStatus {
605 opencode_tool_status(payload)
606 }
607
608 fn payload_metadata(
609 &self,
610 event: &str,
611 payload: &Value,
612 ) -> Result<TimelineToolPayloadMetadata> {
613 opencode_payload_metadata(event, payload)
614 }
615
616 fn touched_paths(&self, payload: &Value) -> Vec<String> {
617 opencode_touched_paths(payload)
618 }
619
620 fn capture_intent(&self, native: &NativeToolCallRefV1, payload: &Value) -> String {
621 format!(
622 "OpenCode {} tool call {}",
623 self.tool_name(payload),
624 native.tool_call_id
625 )
626 }
627}
628
629fn record_opencode_timeline_event(
630 runtime: &mut HarnessBridgeRuntime,
631 event: &str,
632 payload: &Value,
633 opened: &OpenSessionResult,
634) -> Result<()> {
635 record_timeline_event(runtime, event, payload, opened, &OpenCodeTimelineExtractor)
636}
637
638fn record_timeline_event<E: HarnessTimelineExtractor>(
639 runtime: &mut HarnessBridgeRuntime,
640 event: &str,
641 payload: &Value,
642 opened: &OpenSessionResult,
643 extractor: &E,
644) -> Result<()> {
645 match extractor.timeline_event(event) {
646 Some(TimelineToolEvent::Started) => {
647 record_timeline_tool_started(runtime, event, payload, opened, extractor)
648 }
649 Some(TimelineToolEvent::Finished) => {
650 record_timeline_tool_finished(runtime, event, payload, opened, extractor)
651 }
652 None => Ok(()),
653 }
654}
655
656fn record_timeline_tool_started<E: HarnessTimelineExtractor>(
657 runtime: &mut HarnessBridgeRuntime,
658 event: &str,
659 payload: &Value,
660 opened: &OpenSessionResult,
661 extractor: &E,
662) -> Result<()> {
663 let Some(native) = extractor.native_tool_call(payload) else {
664 return Ok(());
665 };
666 let Some(before_state) = current_change_id(&runtime.repo)? else {
667 return Ok(());
668 };
669 let thread = extractor.timeline_thread(runtime, opened)?;
670 let store = TimelineStore::open(runtime.repo.heddle_dir())?;
671 let _record_guard = store.lock_recording(&thread)?;
672 let view = TimelineView::rebuild(&store)?;
673 let step_id = extractor.stable_step_id(&native);
674 let (branch_id, parent_step_id) = timeline_position_for_new_tool_step(&view, &thread, &step_id);
675 let envelope = TimelineOperationEnvelope::new(
676 TimelineOperationBodyV1::ToolCallStarted(ToolCallStartedV1 {
677 thread,
678 step_id,
679 branch_id,
680 parent_step_id,
681 native,
682 tool_name: extractor.tool_name(payload),
683 before_state,
684 payload: Some(extractor.payload_metadata(event, payload)?),
685 started_at_ms: Utc::now().timestamp_millis(),
686 }),
687 extractor.started_labels(payload),
688 );
689 store.write_operation(&envelope)?;
690 Ok(())
691}
692
693fn record_timeline_tool_finished<E: HarnessTimelineExtractor>(
694 runtime: &mut HarnessBridgeRuntime,
695 event: &str,
696 payload: &Value,
697 opened: &OpenSessionResult,
698 extractor: &E,
699) -> Result<()> {
700 let Some(native) = extractor.native_tool_call(payload) else {
701 return Ok(());
702 };
703 let Some(fallback_state) = current_change_id(&runtime.repo)? else {
704 return Ok(());
705 };
706 let thread = extractor.timeline_thread(runtime, opened)?;
707 let store = TimelineStore::open(runtime.repo.heddle_dir())?;
708 let _record_guard = store.lock_recording(&thread)?;
709 let before_view = TimelineView::rebuild(&store)?;
710 let step_id = extractor.stable_step_id(&native);
711 let (branch_id, _) = timeline_position_for_new_tool_step(&before_view, &thread, &step_id);
712 let before_state = before_view
713 .step(&thread, &step_id)
714 .and_then(|step| step.before_state)
715 .unwrap_or(fallback_state);
716 let has_worktree_changes_before_capture = !collect_worktree_changes(&runtime.repo)?.is_empty();
717 let mut capture_failed = false;
718 let capture_state = if !has_worktree_changes_before_capture {
719 None
720 } else {
721 let intent = extractor.capture_intent(&native, payload);
722 match create_snapshot(
723 &runtime.repo,
724 &runtime.user_config,
725 Some(intent),
726 None,
727 SnapshotAgentOverrides {
728 provider: opened.provider.clone(),
729 model: opened.model.clone(),
730 session: native.session_id.clone(),
731 segment: None,
732 policy: None,
733 no_policy: false,
734 no_agent: false,
735 },
736 ) {
737 Ok(_) => runtime.repo.head()?,
738 Err(err) => {
739 capture_failed = true;
740 tracing::warn!(?err, "heddle timeline tool capture failed");
741 None
742 }
743 }
744 };
745 let after_state = current_change_id(&runtime.repo)?.unwrap_or(fallback_state);
746 let mut touched_paths = extractor.touched_paths(payload);
747 merge_string_vec(
748 &mut touched_paths,
749 changed_paths_between_states(&runtime.repo, before_state, after_state)?,
750 );
751 let changed = before_state != after_state;
752 let mut labels = extractor.finished_labels(changed, payload);
753 if capture_failed {
754 merge_timeline_labels(&mut labels, vec![TimelineLabel::CaptureFailed]);
755 }
756 let envelope = TimelineOperationEnvelope::new(
757 TimelineOperationBodyV1::ToolCallFinished(ToolCallFinishedV1 {
758 thread,
759 step_id,
760 branch_id,
761 native,
762 status: extractor.tool_status(payload),
763 before_state,
764 after_state,
765 capture_state,
766 capture_oplog_batch_id: None,
767 changed,
768 touched_paths,
769 payload: Some(extractor.payload_metadata(event, payload)?),
770 finished_at_ms: Utc::now().timestamp_millis(),
771 }),
772 labels,
773 );
774 store.write_operation(&envelope)?;
775 Ok(())
776}
777
778fn opencode_native_tool_call(payload: &Value) -> Option<NativeToolCallRefV1> {
779 let tool_call_id = first_value_string(
780 payload,
781 &[
782 &["toolCallID"],
783 &["tool_call_id"],
784 &["toolCallId"],
785 &["callID"],
786 &["call_id"],
787 &["tool", "callID"],
788 &["tool", "call_id"],
789 &["tool", "id"],
790 &["toolCall", "id"],
791 &["tool_call", "id"],
792 &["id"],
793 ],
794 )?;
795 Some(NativeToolCallRefV1 {
796 harness: "opencode".to_string(),
797 session_id: value_string(payload, &["sessionID"])
798 .or_else(|| value_string(payload, &["session_id"])),
799 message_id: value_string(payload, &["messageID"])
800 .or_else(|| value_string(payload, &["message_id"]))
801 .or_else(|| value_string(payload, &["message", "id"])),
802 tool_call_id,
803 })
804}
805
806fn timeline_position_for_new_tool_step(
807 view: &TimelineView,
808 thread: &str,
809 step_id: &TimelineStepId,
810) -> (TimelineBranchId, Option<TimelineStepId>) {
811 let branch_id = view
812 .status(thread)
813 .and_then(|status| status.current_branch_id.clone())
814 .unwrap_or_else(|| TimelineBranchId::new("tlb-main"));
815 let parent_step_id = view
816 .status(thread)
817 .and_then(|status| status.current_step_id.clone())
818 .filter(|current| current != step_id);
819 (branch_id, parent_step_id)
820}
821
822fn current_change_id(repo: &Repository) -> Result<Option<ChangeId>> {
823 Ok(repo
824 .current_state()?
825 .map(|state| state.change_id)
826 .or(repo.head()?))
827}
828
829fn opencode_tool_name(payload: &Value) -> String {
830 first_value_string(
831 payload,
832 &[
833 &["tool", "name"],
834 &["toolName"],
835 &["tool_name"],
836 &["tool"],
837 &["name"],
838 ],
839 )
840 .unwrap_or_else(|| "tool".to_string())
841}
842
843fn opencode_tool_status(payload: &Value) -> TimelineToolCallStatus {
844 let status = first_value_string(
845 payload,
846 &[
847 &["status"],
848 &["tool", "status"],
849 &["result", "status"],
850 &["output", "status"],
851 ],
852 )
853 .unwrap_or_default()
854 .to_ascii_lowercase();
855 if status.contains("cancel") {
856 TimelineToolCallStatus::Cancelled
857 } else if status.contains("fail")
858 || status.contains("error")
859 || payload.get("error").is_some()
860 || payload.get("exception").is_some()
861 {
862 TimelineToolCallStatus::Failed
863 } else {
864 TimelineToolCallStatus::Succeeded
865 }
866}
867
868fn opencode_payload_metadata(event: &str, payload: &Value) -> Result<TimelineToolPayloadMetadata> {
869 let tool_name = opencode_tool_name(payload);
870 let tool_call_id = opencode_native_tool_call(payload)
871 .map(|native| native.tool_call_id)
872 .unwrap_or_default();
873 let raw = serde_json::to_vec(payload)?;
874 let hash = ContentHash::compute_typed("timeline-tool-payload", &raw);
875 let summary = if tool_call_id.is_empty() {
876 format!("OpenCode {event}: {tool_name}")
877 } else {
878 format!("OpenCode {event}: {tool_name} ({tool_call_id})")
879 };
880 Ok(TimelineToolPayloadMetadata {
881 summary: Some(summary),
882 hash: Some(hash),
883 })
884}
885
886fn opencode_touched_paths(payload: &Value) -> Vec<String> {
887 let mut paths = Vec::new();
888 for path in [
889 value_string(payload, &["file", "path"]),
890 value_string(payload, &["path"]),
891 value_string(payload, &["tool", "path"]),
892 value_string(payload, &["tool", "input", "file_path"]),
893 value_string(payload, &["input", "file_path"]),
894 ]
895 .into_iter()
896 .flatten()
897 {
898 if !path.trim().is_empty() && !paths.contains(&path) {
899 paths.push(path);
900 }
901 }
902 for value_path in [
903 &["paths"][..],
904 &["files"][..],
905 &["tool", "input", "paths"][..],
906 &["input", "paths"][..],
907 ] {
908 if let Some(items) = value_string_array(payload, value_path) {
909 merge_string_vec(&mut paths, items);
910 }
911 }
912 paths
913}
914
915fn first_value_string(value: &Value, paths: &[&[&str]]) -> Option<String> {
916 paths.iter().find_map(|path| value_string(value, path))
917}
918
919fn value_string_array(value: &Value, path: &[&str]) -> Option<Vec<String>> {
920 let mut current = value;
921 for segment in path {
922 current = current.get(*segment)?;
923 }
924 current.as_array().map(|items| {
925 items
926 .iter()
927 .filter_map(|item| item.as_str().map(ToString::to_string))
928 .collect()
929 })
930}
931
932fn merge_string_vec(target: &mut Vec<String>, incoming: Vec<String>) {
933 for item in incoming {
934 if !item.trim().is_empty() && !target.contains(&item) {
935 target.push(item);
936 }
937 }
938}
939
940fn merge_timeline_labels(target: &mut Vec<TimelineLabel>, incoming: Vec<TimelineLabel>) {
941 for label in incoming {
942 if !target.contains(&label) {
943 target.push(label);
944 }
945 }
946}
947
948fn value_string(value: &Value, path: &[&str]) -> Option<String> {
949 let mut current = value;
950 for segment in path {
951 current = current.get(*segment)?;
952 }
953 match current {
954 Value::String(s) => Some(s.clone()),
955 Value::Bool(v) => Some(v.to_string()),
956 Value::Number(v) => Some(v.to_string()),
957 _ => None,
958 }
959}
960
961fn value_array_join(value: &Value, path: &[&str]) -> Option<String> {
962 let mut current = value;
963 for segment in path {
964 current = current.get(*segment)?;
965 }
966 current.as_array().map(|items| {
967 items
968 .iter()
969 .filter_map(|item| item.as_str().map(ToString::to_string))
970 .collect::<Vec<_>>()
971 .join(",")
972 })
973}
974
975fn value_u64_string(value: &Value, path: &[&str]) -> Option<String> {
976 let mut current = value;
977 for segment in path {
978 current = current.get(*segment)?;
979 }
980 current.as_u64().map(|v| v.to_string())
981}
982
983fn value_u64(value: &Value, path: &[&str]) -> Option<u64> {
984 let mut current = value;
985 for segment in path {
986 current = current.get(*segment)?;
987 }
988 current.as_u64()
989}
990
991fn value_cost_micros(value: &Value, path: &[&str]) -> Option<String> {
992 let mut current = value;
993 for segment in path {
994 current = current.get(*segment)?;
995 }
996 current
997 .as_f64()
998 .map(|v| ((v * 1_000_000.0).round() as u64).to_string())
999}
1000
1001fn value_cost_micros_u64(value: &Value, path: &[&str]) -> Option<u64> {
1002 let mut current = value;
1003 for segment in path {
1004 current = current.get(*segment)?;
1005 }
1006 current.as_f64().map(|v| (v * 1_000_000.0).round() as u64)
1007}
1008
1009fn map_from_pairs<const N: usize>(pairs: [(&str, Option<String>); N]) -> BTreeMap<String, String> {
1010 pairs
1011 .into_iter()
1012 .filter_map(|(key, value)| value.map(|value| (key.to_string(), value)))
1013 .collect()
1014}
1015
1016fn csv_from_value(value: Option<&String>) -> Vec<String> {
1017 value
1018 .map(|value| {
1019 value
1020 .split(',')
1021 .map(|item| item.trim().to_string())
1022 .filter(|item| !item.is_empty())
1023 .collect()
1024 })
1025 .unwrap_or_default()
1026}
1027
1028impl HarnessBridgeRuntime {
1029 fn new(repo: Repository, user_config: UserConfig) -> Self {
1030 let reports = SessionReportStore::new(repo.root());
1031 Self {
1032 repo,
1033 user_config,
1034 reports,
1035 }
1036 }
1037
1038 fn handle_request(&mut self, request: BridgeRequest) -> BridgeResponse {
1039 let response = match request.method.as_str() {
1040 "open_session" => self
1041 .decode_params::<OpenSessionParams>(request.params)
1042 .and_then(|params| self.open_session(params))
1043 .and_then(to_json_value),
1044 "update_progress" => self
1045 .decode_params::<UpdateProgressParams>(request.params)
1046 .and_then(|params| self.update_progress(params))
1047 .and_then(to_json_value),
1048 "record_usage" => self
1049 .decode_params::<RecordUsageParams>(request.params)
1050 .and_then(|params| self.record_usage(params))
1051 .and_then(to_json_value),
1052 "record_touched_paths" => self
1053 .decode_params::<RecordTouchedPathsParams>(request.params)
1054 .and_then(|params| self.record_touched_paths(params))
1055 .and_then(to_json_value),
1056 "close_session" => self
1057 .decode_params::<CloseSessionParams>(request.params)
1058 .and_then(|params| self.close_session(params))
1059 .and_then(to_json_value),
1060 "flush_reports" => self
1061 .decode_params::<FlushReportsParams>(request.params)
1062 .and_then(|params| self.flush_reports(params))
1063 .and_then(to_json_value),
1064 other => Err(anyhow!("unknown method '{other}'")),
1065 };
1066
1067 match response {
1068 Ok(result) => BridgeResponse::ok(request.id, result),
1069 Err(err) => BridgeResponse::error(request.id, "bridge_error", err.to_string()),
1070 }
1071 }
1072
1073 fn decode_params<T: for<'de> Deserialize<'de>>(&self, value: Value) -> Result<T> {
1074 serde_json::from_value(value).map_err(|err| anyhow!(err))
1075 }
1076
1077 fn open_session(&mut self, params: OpenSessionParams) -> Result<OpenSessionResult> {
1078 if self.user_config.harness.mode == HarnessMode::Off {
1079 return Err(anyhow!("harness integration is disabled in user config"));
1080 }
1081
1082 let requested_transport = params
1083 .transport
1084 .unwrap_or(self.user_config.harness.transport);
1085 let transcript_mode = params
1086 .transcript_mode
1087 .unwrap_or(self.user_config.harness.transcript);
1088 let env_hints = merged_env_hints(¶ms.env_hints);
1089 let token_claims = user_config_token_claims(&self.user_config);
1090 let current_session = SessionManager::new(self.repo.root()).get_current_session()?;
1091 let current_segment = current_session
1092 .as_ref()
1093 .and_then(|session| session.current_segment());
1094 let probe = probe_harness_actor(&HarnessProbeInput {
1095 argv: params.argv.clone(),
1096 env_hints: env_hints.clone(),
1097 explicit_harness: params.harness.clone(),
1098 explicit_provider: params.provider.clone(),
1099 explicit_model: params.model.clone(),
1100 explicit_thinking_level: params.thinking_level.clone(),
1101 explicit_policy: params.policy.clone(),
1102 probe_metadata: params.probe_metadata.clone(),
1103 current_provider: current_segment.map(|segment| segment.provider.clone()),
1104 current_model: current_segment.map(|segment| segment.model.clone()),
1105 current_policy: current_segment.and_then(|segment| segment.policy_id.clone()),
1106 repo_root: self.repo.root().display().to_string(),
1107 })?;
1108 let identity = resolve_identity(
1109 &self.repo,
1110 &self.user_config,
1111 IdentityHints {
1112 harness: params.harness.clone(),
1113 provider: params.provider.clone(),
1114 model: params.model.clone(),
1115 thinking_level: params.thinking_level.clone(),
1116 policy: params.policy.clone(),
1117 probe: probe.clone(),
1118 },
1119 )?;
1120 let registry = AgentRegistry::new(self.repo.heddle_dir());
1121 let requested_entry = resolve_requested_registry_entry(
1122 ®istry,
1123 params.agent_session_id.as_deref(),
1124 params.client_instance_id.as_deref(),
1125 )?;
1126
1127 if self.user_config.harness.mode == HarnessMode::Required
1128 && (identity.harness.is_none()
1129 || identity.provider.is_none()
1130 || identity.model.is_none())
1131 {
1132 return Err(anyhow!(
1133 "harness mode is 'required' but harness/provider/model could not be resolved"
1134 ));
1135 }
1136
1137 let mut sessions = SessionManager::new(self.repo.root());
1138 let principal = self.repo.get_principal()?;
1139 let mut attach = resolve_actor_attachment(
1140 ®istry,
1141 &self.repo,
1142 &mut sessions,
1143 AttachmentResolutionInput {
1144 requested_entry: requested_entry.as_ref(),
1145 explicit_heddle_session_id: params.heddle_session_id.as_deref(),
1146 client_instance_id: params.client_instance_id.as_deref(),
1147 probe: &probe,
1148 token_claims: token_claims.as_ref(),
1149 },
1150 )?;
1151 let (session, owns_session) = match &attach.target {
1152 AttachTarget::ExistingSession(session) => {
1153 let segment_id = session.current_segment_id.clone().unwrap_or_default();
1154 sessions.set_current_session(&session.id, &segment_id)?;
1155 (session.clone(), false)
1156 }
1157 AttachTarget::CreateNew {
1158 _because_claimed: _,
1159 } => {
1160 let session = sessions.start_session(
1161 principal,
1162 identity
1163 .provider
1164 .clone()
1165 .unwrap_or_else(|| "unknown".to_string()),
1166 identity
1167 .model
1168 .clone()
1169 .unwrap_or_else(|| "unknown".to_string()),
1170 identity.policy.clone(),
1171 )?;
1172 (session, true)
1173 }
1174 };
1175
1176 let (thread_name, thread_id) =
1177 self.resolve_harness_thread_binding(¶ms, &probe, &identity)?;
1178 let entry = self.ensure_registry_entry(RegistryEntryRequest {
1179 heddle_session_id: &session.id,
1180 thread_name: thread_name.as_deref(),
1181 thread_id: thread_id.as_deref(),
1182 identity: &identity,
1183 probe: &probe,
1184 attach: &attach,
1185 client_instance_id: params.client_instance_id.as_deref(),
1186 requested_entry: requested_entry.as_ref(),
1187 })?;
1188 let (session, owns_session) = self.reuse_canonical_actor_session(
1189 &mut sessions,
1190 CanonicalActorSessionRequest {
1191 tentative_session: session,
1192 tentative_owns_session: owns_session,
1193 entry: &entry,
1194 probe: &probe,
1195 attach: &mut attach,
1196 },
1197 )?;
1198
1199 let mut segment_id = session.current_segment_id.clone().unwrap_or_default();
1200 if should_rotate_segment(&session, &identity) {
1201 let segment = sessions.add_segment(
1202 &session.id,
1203 identity
1204 .provider
1205 .clone()
1206 .unwrap_or_else(|| "unknown".to_string()),
1207 identity
1208 .model
1209 .clone()
1210 .unwrap_or_else(|| "unknown".to_string()),
1211 identity.policy.clone(),
1212 )?;
1213 segment_id = segment.id;
1214 }
1215
1216 let base_state = self
1217 .repo
1218 .current_state()?
1219 .map(|state| state.change_id.to_string_full())
1220 .or_else(|| {
1221 self.repo
1222 .head()
1223 .ok()
1224 .flatten()
1225 .map(|id| id.to_string_full())
1226 });
1227 let worktree_changes_at_open = capture_worktree_change_snapshot(&self.repo)?;
1228 let opened_at = Utc::now().to_rfc3339();
1229 let mut report = SessionReportEnvelope {
1230 version: 1,
1231 heddle_session_id: session.id.clone(),
1232 heddle_segment_id: (!segment_id.is_empty()).then_some(segment_id.clone()),
1233 agent_session_id: Some(entry.session_id.clone()),
1234 client_instance_id: entry.client_instance_id.clone(),
1235 native_actor_key: entry.native_actor_key.clone(),
1236 native_parent_actor_key: entry.native_parent_actor_key.clone(),
1237 native_instance_key: entry.native_instance_key.clone(),
1238 repo_root: self.repo.root().display().to_string(),
1239 thread: thread_name.clone(),
1240 thread_id,
1241 task: params.task.clone(),
1242 summary: params.summary.clone(),
1243 opened_at,
1244 closed_at: None,
1245 base_state_at_open: base_state.clone(),
1246 worktree_changes_at_open,
1247 head_state_at_close: None,
1248 transport_mode: transport_mode_name(requested_transport).to_string(),
1249 transcript_mode: transcript_mode_name(transcript_mode).to_string(),
1250 outcome: None,
1251 harness: identity.to_transport_identity(),
1252 progress: Vec::new(),
1253 usage: UsageTotals::default(),
1254 touched_paths: Vec::new(),
1255 changed_paths: Vec::new(),
1256 diff_summary: None,
1257 transcript_refs: Vec::new(),
1258 last_progress_at: None,
1259 report_flush_state: Some("pending-local".to_string()),
1260 attach_reason: Some(attach.attach_reason.clone()),
1261 attach_precedence: attach.precedence.clone(),
1262 winning_attach_rule: Some(attach.winning_rule.clone()),
1263 probe_source: probe.probe_source.clone(),
1264 probe_confidence: probe.confidence,
1265 pending_flush: true,
1266 last_flushed_at: None,
1267 owns_session,
1268 };
1269 merge_unique_paths(&mut report.touched_paths, probe.touched_paths.clone());
1270 merge_usage(&mut report.usage, &probe.usage_totals);
1271 if transcript_mode != HarnessTranscriptMode::Off {
1272 report.transcript_refs = probe.transcript_refs.clone();
1273 }
1274 self.reports.save(&report)?;
1275 self.sync_registry_from_report(&report, AgentStatus::Active)?;
1276 if matches!(requested_transport, HarnessTransport::Direct) {
1277 enqueue_report(&self.reports, &mut report)?;
1278 self.sync_registry_from_report(&report, AgentStatus::Active)?;
1279 }
1280
1281 Ok(OpenSessionResult {
1282 heddle_session_id: report.heddle_session_id.clone(),
1283 heddle_segment_id: report.heddle_segment_id.clone(),
1284 agent_session_id: report.agent_session_id.clone(),
1285 created_session: owns_session,
1286 harness: report.harness.harness.clone(),
1287 provider: report.harness.provider.clone(),
1288 model: report.harness.model.clone(),
1289 thinking_level: report.harness.thinking_level.clone(),
1290 report_flush_state: report.report_flush_state.clone(),
1291 attach_reason: report.attach_reason.clone(),
1292 })
1293 }
1294
1295 fn update_progress(&mut self, params: UpdateProgressParams) -> Result<SessionMutationResult> {
1296 let mut report = self
1297 .reports
1298 .load(¶ms.heddle_session_id)?
1299 .ok_or_else(|| anyhow!("session report not found for {}", params.heddle_session_id))?;
1300 let current_session = SessionManager::new(self.repo.root()).get_current_session()?;
1301 let current_segment = current_session
1302 .as_ref()
1303 .and_then(|session| session.current_segment());
1304 let probe = probe_harness_actor(&HarnessProbeInput {
1305 argv: params.argv.clone(),
1306 env_hints: merged_env_hints(¶ms.env_hints),
1307 explicit_harness: params.harness.clone(),
1308 explicit_provider: params.provider.clone(),
1309 explicit_model: params.model.clone(),
1310 explicit_thinking_level: params.thinking_level.clone(),
1311 explicit_policy: params.policy.clone(),
1312 probe_metadata: params.probe_metadata.clone(),
1313 current_provider: current_segment.map(|segment| segment.provider.clone()),
1314 current_model: current_segment.map(|segment| segment.model.clone()),
1315 current_policy: current_segment.and_then(|segment| segment.policy_id.clone()),
1316 repo_root: self.repo.root().display().to_string(),
1317 })?;
1318 let identity = resolve_identity(
1319 &self.repo,
1320 &self.user_config,
1321 IdentityHints {
1322 harness: params.harness.clone(),
1323 provider: params.provider.clone(),
1324 model: params.model.clone(),
1325 thinking_level: params.thinking_level.clone(),
1326 policy: params.policy.clone(),
1327 probe: probe.clone(),
1328 },
1329 )?;
1330 self.ensure_segment_for_report(&mut report, &identity)?;
1331 if report.harness.harness.is_none() {
1332 report.harness.harness = identity.harness.clone();
1333 }
1334 if report.harness.provider.is_none() {
1335 report.harness.provider = identity.provider.clone();
1336 }
1337 if report.harness.model.is_none() {
1338 report.harness.model = identity.model.clone();
1339 }
1340 if report.harness.thinking_level.is_none() {
1341 report.harness.thinking_level = identity.thinking_level.clone();
1342 }
1343 if report.harness.policy.is_none() {
1344 report.harness.policy = identity.policy.clone();
1345 }
1346 if report.native_actor_key.is_none() {
1347 report.native_actor_key = probe.native_actor_key.clone();
1348 }
1349 if report.native_parent_actor_key.is_none() {
1350 report.native_parent_actor_key = probe.native_parent_actor_key.clone();
1351 }
1352 if report.native_instance_key.is_none() {
1353 report.native_instance_key = probe.native_instance_key.clone();
1354 }
1355 if report.probe_source.is_none() {
1356 report.probe_source = probe.probe_source.clone();
1357 }
1358 if report.probe_confidence.is_none() {
1359 report.probe_confidence = probe.confidence;
1360 }
1361
1362 let recorded_at = Utc::now().to_rfc3339();
1363 let checkpoint = ProgressCheckpoint {
1364 status: params.status.clone(),
1365 message: params.message.clone(),
1366 completed_steps: params.completed_steps,
1367 total_steps: params.total_steps,
1368 touched_paths: normalize_paths(
1369 params
1370 .touched_paths
1371 .into_iter()
1372 .chain(probe.touched_paths)
1373 .collect::<Vec<_>>(),
1374 ),
1375 recorded_at: recorded_at.clone(),
1376 };
1377 merge_unique_paths(
1378 &mut report.touched_paths,
1379 checkpoint.touched_paths.iter().cloned(),
1380 );
1381 merge_usage(&mut report.usage, &probe.usage_totals);
1382 if report.transcript_mode != "off" && report.transcript_refs.is_empty() {
1383 report.transcript_refs = probe.transcript_refs;
1384 }
1385 report.progress.push(checkpoint);
1386 if let Some(summary) = params.summary {
1387 report.summary = Some(summary);
1388 }
1389 report.last_progress_at = Some(recorded_at);
1390 mark_pending_flush(&mut report);
1391 self.persist_report(report)
1392 }
1393
1394 fn resolve_harness_thread_binding(
1395 &self,
1396 params: &OpenSessionParams,
1397 probe: &HarnessProbeResult,
1398 identity: &ResolvedIdentity,
1399 ) -> Result<(Option<String>, Option<String>)> {
1400 if let Some(thread) = params.thread.clone() {
1401 let thread_id = thread_id_for_name(&self.repo, Some(&thread))?;
1402 return Ok((Some(thread), thread_id));
1403 }
1404
1405 let current_attached = match self.repo.head_ref()? {
1406 Head::Attached { thread } => Some(thread.to_string()),
1407 Head::Detached { .. } => None,
1408 };
1409
1410 if !probe.attach_hints.root_actor
1411 && self.user_config.harness.threading.subagent
1412 == UserHarnessSubagentThreadPolicy::CreateChild
1413 && let Some(parent_thread) =
1414 resolve_parent_thread_for_subagent(&self.repo, probe, current_attached.as_deref())?
1415 && can_create_harness_thread(&self.repo, Some(&parent_thread), Some(&parent_thread))?
1416 {
1417 let name = allocate_thread_name(
1418 &self.repo,
1419 &format!(
1420 "{}/{}",
1421 parent_thread,
1422 sanitize_name(&preferred_thread_slug(params, probe, identity))
1423 ),
1424 )?;
1425 self.ensure_harness_thread(
1426 &name,
1427 Some(&parent_thread),
1428 Some(&parent_thread),
1429 params.task.clone(),
1430 )?;
1431 let thread_id = thread_id_for_name(&self.repo, Some(&name))?;
1432 return Ok((Some(name), thread_id));
1433 }
1434
1435 if probe.attach_hints.root_actor
1436 && self.user_config.harness.threading.root_actor
1437 == UserHarnessRootThreadPolicy::CreateNew
1438 && let Some(current) = current_attached.clone()
1439 && can_create_harness_thread(&self.repo, Some(¤t), None)?
1440 {
1441 let name = allocate_thread_name(
1442 &self.repo,
1443 &format!(
1444 "{}/{}",
1445 current,
1446 sanitize_name(&preferred_thread_slug(params, probe, identity))
1447 ),
1448 )?;
1449 self.ensure_harness_thread(&name, Some(¤t), None, params.task.clone())?;
1450 let thread_id = thread_id_for_name(&self.repo, Some(&name))?;
1451 return Ok((Some(name), thread_id));
1452 }
1453
1454 let thread_id = thread_id_for_name(&self.repo, current_attached.as_deref())?;
1455 Ok((current_attached, thread_id))
1456 }
1457
1458 fn ensure_harness_thread(
1459 &self,
1460 name: &str,
1461 target_thread: Option<&str>,
1462 parent_thread: Option<&str>,
1463 task: Option<String>,
1464 ) -> Result<()> {
1465 let manager = ThreadManager::new(self.repo.heddle_dir());
1466 if manager.load(name)?.is_some() {
1467 return Ok(());
1468 }
1469
1470 let base_state = self
1471 .resolve_harness_thread_base_state(target_thread, parent_thread)?
1472 .ok_or_else(|| anyhow!("No current state to start a thread from"))?;
1473 let tn = ThreadName::new(name);
1474 if self.repo.refs().get_thread(&tn)?.is_none() {
1475 self.repo
1476 .refs()
1477 .set_thread_cas(&tn, refs::RefExpectation::Missing, &base_state)?;
1478 self.repo.oplog().record_thread_create(
1483 &tn,
1484 &base_state,
1485 None,
1486 Some(&self.repo.op_scope()),
1487 )?;
1488 }
1489
1490 let workspace_mode = self
1491 .user_config
1492 .harness
1493 .threading
1494 .workspace_default
1495 .unwrap_or(UserThreadWorkspaceMode::Materialized);
1496 let thread_mode = match workspace_mode {
1497 UserThreadWorkspaceMode::Materialized | UserThreadWorkspaceMode::Auto => {
1498 ThreadMode::Materialized
1499 }
1500 UserThreadWorkspaceMode::Virtualized => ThreadMode::Virtualized,
1501 UserThreadWorkspaceMode::Solid => ThreadMode::Solid,
1502 };
1503 let path = match thread_mode {
1504 ThreadMode::Solid | ThreadMode::Materialized => {
1505 default_private_thread_path(&self.repo, name)
1506 }
1507 ThreadMode::Virtualized => default_private_thread_path(&self.repo, name),
1510 };
1511 let abs_path = prepare_worktree_target(&self.repo, &path, Some(name))?.path;
1512 write_isolated_checkout(&self.repo, &abs_path, &base_state, Some(name))?;
1513
1514 let base_state_obj = self
1515 .repo
1516 .store()
1517 .get_state(&base_state)?
1518 .ok_or_else(|| anyhow!("Base state '{}' not found", base_state.short()))?;
1519 let thread = Thread {
1520 id: name.to_string(),
1521 thread: name.to_string(),
1522 target_thread: target_thread.map(ToString::to_string),
1523 parent_thread: parent_thread.map(ToString::to_string),
1524 mode: thread_mode.clone(),
1525 state: ThreadState::Active,
1526 base_state: base_state.short(),
1527 base_root: base_state_obj.tree.short(),
1528 current_state: Some(base_state.short()),
1529 merged_state: None,
1530 task,
1531 execution_path: abs_path.clone(),
1532 materialized_path: match thread_mode {
1533 ThreadMode::Solid => Some(abs_path),
1534 ThreadMode::Materialized | ThreadMode::Virtualized => None,
1538 },
1539 changed_paths: vec![],
1540 impact_categories: vec![],
1541 heavy_impact_paths: vec![],
1542 promotion_suggested: false,
1543 freshness: if target_thread.is_some() {
1544 ThreadFreshness::Current
1545 } else {
1546 ThreadFreshness::Unknown
1547 },
1548 verification_summary: summarize_verification(base_state_obj.verification.as_ref()),
1549 confidence_summary: summarize_confidence(base_state_obj.confidence),
1550 integration_policy_result: ThreadIntegrationPolicy::default(),
1551 created_at: Utc::now(),
1552 updated_at: Utc::now(),
1553 ephemeral: None,
1554 auto: true,
1559 shared_target_dir: None,
1562 };
1563 manager.save(&thread)?;
1564 Ok(())
1565 }
1566
1567 fn resolve_harness_thread_base_state(
1568 &self,
1569 target_thread: Option<&str>,
1570 parent_thread: Option<&str>,
1571 ) -> Result<Option<objects::object::ChangeId>> {
1572 resolve_harness_thread_base_state(&self.repo, target_thread, parent_thread)
1573 }
1574
1575 fn record_usage(&mut self, params: RecordUsageParams) -> Result<SessionMutationResult> {
1576 let mut report = self
1577 .reports
1578 .load(¶ms.heddle_session_id)?
1579 .ok_or_else(|| anyhow!("session report not found for {}", params.heddle_session_id))?;
1580 if let Some(input) = params.input_tokens {
1581 report.usage.input_tokens = Some(max_u64(report.usage.input_tokens, input));
1582 }
1583 if let Some(output) = params.output_tokens {
1584 report.usage.output_tokens = Some(max_u64(report.usage.output_tokens, output));
1585 }
1586 if let Some(reasoning) = params.reasoning_tokens {
1587 report.usage.reasoning_tokens = Some(max_u64(report.usage.reasoning_tokens, reasoning));
1588 }
1589 if let Some(cache_creation) = params.cache_creation_tokens {
1590 report.usage.cache_creation_tokens =
1591 Some(max_u64(report.usage.cache_creation_tokens, cache_creation));
1592 }
1593 if let Some(cache_read) = params.cache_read_tokens {
1594 report.usage.cache_read_tokens =
1595 Some(max_u64(report.usage.cache_read_tokens, cache_read));
1596 }
1597 if let Some(tool_calls) = params.tool_calls {
1598 report.usage.tool_calls = Some(max_u32(report.usage.tool_calls, tool_calls));
1599 }
1600 if let Some(cost) = params.cost_micros_usd {
1601 report.usage.cost_micros_usd = Some(max_u64(report.usage.cost_micros_usd, cost));
1602 }
1603 mark_pending_flush(&mut report);
1604 self.persist_report(report)
1605 }
1606
1607 fn record_touched_paths(
1608 &mut self,
1609 params: RecordTouchedPathsParams,
1610 ) -> Result<SessionMutationResult> {
1611 let mut report = self
1612 .reports
1613 .load(¶ms.heddle_session_id)?
1614 .ok_or_else(|| anyhow!("session report not found for {}", params.heddle_session_id))?;
1615 merge_unique_paths(&mut report.touched_paths, normalize_paths(params.paths));
1616 mark_pending_flush(&mut report);
1617 self.persist_report(report)
1618 }
1619
1620 fn close_session(&mut self, params: CloseSessionParams) -> Result<CloseSessionResult> {
1621 let mut report = self
1622 .reports
1623 .load(¶ms.heddle_session_id)?
1624 .ok_or_else(|| anyhow!("session report not found for {}", params.heddle_session_id))?;
1625 report.closed_at = Some(Utc::now().to_rfc3339());
1626 report.outcome = params.outcome.clone();
1627 if let Some(summary) = params.summary {
1628 report.summary = Some(summary);
1629 }
1630 if let Some(transcript_refs) = params.transcript_refs {
1631 report.transcript_refs = transcript_refs;
1632 }
1633 let final_diff = compute_final_diff(
1634 &self.repo,
1635 report.base_state_at_open.as_deref(),
1636 &report.worktree_changes_at_open,
1637 )?;
1638 report.head_state_at_close = final_diff.head_state;
1639 report.changed_paths = final_diff.changed_paths;
1640 report.diff_summary = Some(final_diff.diff_summary);
1641 mark_pending_flush(&mut report);
1642 if report.owns_session {
1643 let mut sessions = SessionManager::new(self.repo.root());
1644 if let Ok(Some(session)) = sessions.get_session(&report.heddle_session_id)
1645 && session.is_active()
1646 {
1647 let _ = sessions.end_session(Some(&report.heddle_session_id));
1648 }
1649 }
1650
1651 let transport = params
1652 .transport
1653 .unwrap_or(self.user_config.harness.transport);
1654 if matches!(transport, HarnessTransport::Direct | HarnessTransport::End) {
1655 enqueue_report(&self.reports, &mut report)?;
1656 } else {
1657 self.reports.save(&report)?;
1658 }
1659 self.sync_registry_from_report(&report, AgentStatus::Complete)?;
1660 Ok(CloseSessionResult {
1661 heddle_session_id: report.heddle_session_id,
1662 changed_paths: report.changed_paths,
1663 diff_summary: report.diff_summary.unwrap_or_default(),
1664 report_flush_state: report.report_flush_state,
1665 })
1666 }
1667
1668 fn flush_reports(&mut self, params: FlushReportsParams) -> Result<FlushReportsResult> {
1669 let mut flushed = 0usize;
1670 let session_ids = match params.heddle_session_id {
1671 Some(session_id) => vec![session_id],
1672 None => self.reports.list_pending()?,
1673 };
1674 for session_id in session_ids {
1675 let Some(mut report) = self.reports.load(&session_id)? else {
1676 continue;
1677 };
1678 if !report.pending_flush {
1679 continue;
1680 }
1681 enqueue_report(&self.reports, &mut report)?;
1682 let status = if report.closed_at.is_some() {
1683 AgentStatus::Complete
1684 } else {
1685 AgentStatus::Active
1686 };
1687 self.sync_registry_from_report(&report, status)?;
1688 flushed += 1;
1689 }
1690 Ok(FlushReportsResult { flushed })
1691 }
1692
1693 fn persist_report(
1694 &mut self,
1695 mut report: SessionReportEnvelope,
1696 ) -> Result<SessionMutationResult> {
1697 let transport = transport_from_report(&report, self.user_config.harness.transport);
1698 match transport {
1699 HarnessTransport::Direct => {
1700 enqueue_report(&self.reports, &mut report)?;
1701 }
1702 HarnessTransport::Spool | HarnessTransport::End => {
1703 self.reports.save(&report)?;
1704 }
1705 }
1706 self.sync_registry_from_report(&report, AgentStatus::Active)?;
1707 Ok(SessionMutationResult {
1708 heddle_session_id: report.heddle_session_id,
1709 heddle_segment_id: report.heddle_segment_id,
1710 report_flush_state: report.report_flush_state,
1711 })
1712 }
1713
1714 fn ensure_segment_for_report(
1715 &self,
1716 report: &mut SessionReportEnvelope,
1717 identity: &ResolvedIdentity,
1718 ) -> Result<()> {
1719 let mut sessions = SessionManager::new(self.repo.root());
1720 let Some(session) = sessions.get_session(&report.heddle_session_id)? else {
1721 return Ok(());
1722 };
1723 if !session.is_active() || !should_rotate_segment(&session, identity) {
1724 return Ok(());
1725 }
1726 let segment = sessions.add_segment(
1727 &report.heddle_session_id,
1728 identity
1729 .provider
1730 .clone()
1731 .unwrap_or_else(|| "unknown".to_string()),
1732 identity
1733 .model
1734 .clone()
1735 .unwrap_or_else(|| "unknown".to_string()),
1736 identity.policy.clone(),
1737 )?;
1738 report.heddle_segment_id = Some(segment.id);
1739 if identity.provider.is_some() {
1740 report.harness.provider = identity.provider.clone();
1741 }
1742 if identity.model.is_some() {
1743 report.harness.model = identity.model.clone();
1744 }
1745 if identity.policy.is_some() {
1746 report.harness.policy = identity.policy.clone();
1747 }
1748 if identity.thinking_level.is_some() {
1749 report.harness.thinking_level = identity.thinking_level.clone();
1750 }
1751 Ok(())
1752 }
1753
1754 fn ensure_registry_entry(&self, request: RegistryEntryRequest<'_>) -> Result<AgentEntry> {
1755 let RegistryEntryRequest {
1756 heddle_session_id,
1757 thread_name,
1758 thread_id,
1759 identity,
1760 probe,
1761 attach,
1762 client_instance_id,
1763 requested_entry,
1764 } = request;
1765 let registry = AgentRegistry::new(self.repo.heddle_dir());
1766 let fallback_entry = if client_instance_id.is_some()
1767 || probe.native_actor_key.is_some()
1768 || probe.native_instance_key.is_some()
1769 {
1770 None
1771 } else {
1772 find_matching_registry_entry(®istry, &self.repo, heddle_session_id, thread_name)?
1773 };
1774 if let Some(entry) = requested_entry
1775 .cloned()
1776 .or_else(|| attach.matched_entry.clone())
1777 .or(fallback_entry)
1778 {
1779 return registry
1780 .update_entry(&entry.session_id, |existing| {
1781 if client_instance_id.is_some() {
1782 existing.client_instance_id = client_instance_id.map(ToString::to_string);
1783 }
1784 if probe.native_actor_key.is_some() {
1785 existing.native_actor_key = probe.native_actor_key.clone();
1786 }
1787 if probe.native_parent_actor_key.is_some() {
1788 existing.native_parent_actor_key = probe.native_parent_actor_key.clone();
1789 }
1790 if probe.native_instance_key.is_some() {
1791 existing.native_instance_key = probe.native_instance_key.clone();
1792 }
1793 existing.heddle_session_id = Some(heddle_session_id.to_string());
1794 existing.thread_id = thread_id.map(ToString::to_string);
1795 if let Some(thread_name) = thread_name {
1796 existing.thread = thread_name.to_string();
1797 }
1798 existing.path = Some(self.repo.root().to_path_buf());
1799 if identity.provider.is_some() {
1800 existing.provider = identity.provider.clone();
1801 }
1802 if identity.model.is_some() {
1803 existing.model = identity.model.clone();
1804 }
1805 if identity.harness.is_some() {
1806 existing.harness = identity.harness.clone();
1807 }
1808 if identity.thinking_level.is_some() {
1809 existing.thinking_level = identity.thinking_level.clone();
1810 }
1811 existing.attach_reason = Some(attach.attach_reason.clone());
1812 existing.attach_precedence = attach.precedence.clone();
1813 existing.winning_attach_rule = Some(attach.winning_rule.clone());
1814 existing.probe_source = probe.probe_source.clone();
1815 existing.probe_confidence = probe.confidence;
1816 existing.status = AgentStatus::Active;
1817 })?
1818 .ok_or_else(|| anyhow!("registry entry disappeared during update"));
1819 }
1820
1821 if client_instance_id.is_none() && probe.native_actor_key.is_some() {
1822 let (entry, _) = registry.find_or_create_active_entry(
1823 |entry| {
1824 claude_actor_compatible(entry, probe, self.repo.root())
1825 && entry.native_actor_key == probe.native_actor_key
1826 },
1827 |existing| {
1828 if client_instance_id.is_some() {
1829 existing.client_instance_id = client_instance_id.map(ToString::to_string);
1830 }
1831 if existing.heddle_session_id.is_none() {
1832 existing.heddle_session_id = Some(heddle_session_id.to_string());
1833 }
1834 existing.thread_id = thread_id.map(ToString::to_string);
1835 if let Some(thread_name) = thread_name {
1836 existing.thread = thread_name.to_string();
1837 }
1838 existing.path = Some(self.repo.root().to_path_buf());
1839 if identity.provider.is_some() {
1840 existing.provider = identity.provider.clone();
1841 }
1842 if identity.model.is_some() {
1843 existing.model = identity.model.clone();
1844 }
1845 if identity.harness.is_some() {
1846 existing.harness = identity.harness.clone();
1847 }
1848 if identity.thinking_level.is_some() {
1849 existing.thinking_level = identity.thinking_level.clone();
1850 }
1851 if probe.native_parent_actor_key.is_some() {
1852 existing.native_parent_actor_key = probe.native_parent_actor_key.clone();
1853 }
1854 if probe.native_instance_key.is_some() {
1855 existing.native_instance_key = probe.native_instance_key.clone();
1856 }
1857 existing.attach_reason = Some(attach.attach_reason.clone());
1858 existing.attach_precedence = attach.precedence.clone();
1859 existing.winning_attach_rule = Some(attach.winning_rule.clone());
1860 existing.probe_source = probe.probe_source.clone();
1861 existing.probe_confidence = probe.confidence;
1862 existing.status = AgentStatus::Active;
1863 },
1864 |session_id| {
1865 Ok(AgentEntry {
1866 session_id: session_id.to_string(),
1867 client_instance_id: client_instance_id.map(ToString::to_string),
1868 native_actor_key: probe.native_actor_key.clone(),
1869 native_parent_actor_key: probe.native_parent_actor_key.clone(),
1870 native_instance_key: probe.native_instance_key.clone(),
1871 heddle_session_id: Some(heddle_session_id.to_string()),
1872 thread_id: thread_id.map(ToString::to_string),
1873 thread: thread_name.unwrap_or("detached").to_string(),
1874 pid: Some(std::process::id()),
1875 boot_id: None,
1876 liveness_path: None,
1877 heartbeat_at: Some(Utc::now()),
1878 anchor_state: self.repo.head()?.map(|id| id.to_string_full()),
1879 anchor_root: None,
1880 reservation_token: Some(objects::store::generate_agent_id()),
1881 path: Some(self.repo.root().to_path_buf()),
1882 base_state: self.repo.head()?.map(|id| id.short()).unwrap_or_default(),
1883 started_at: Utc::now(),
1884 provider: identity.provider.clone(),
1885 model: identity.model.clone(),
1886 harness: identity.harness.clone(),
1887 thinking_level: identity.thinking_level.clone(),
1888 usage_summary: AgentUsageSummary::default(),
1889 last_progress_at: None,
1890 report_flush_state: Some("pending-local".to_string()),
1891 attach_reason: Some(attach.attach_reason.clone()),
1892 attach_precedence: attach.precedence.clone(),
1893 winning_attach_rule: Some(attach.winning_rule.clone()),
1894 probe_source: probe.probe_source.clone(),
1895 probe_confidence: probe.confidence,
1896 status: AgentStatus::Active,
1897 completed_at: None,
1898 context_queries: vec![],
1899 })
1900 },
1901 )?;
1902 return Ok(entry);
1903 }
1904
1905 Ok(registry.create_generated_entry(|session_id| {
1906 Ok(AgentEntry {
1907 session_id: session_id.to_string(),
1908 client_instance_id: client_instance_id.map(ToString::to_string),
1909 native_actor_key: probe.native_actor_key.clone(),
1910 native_parent_actor_key: probe.native_parent_actor_key.clone(),
1911 native_instance_key: probe.native_instance_key.clone(),
1912 heddle_session_id: Some(heddle_session_id.to_string()),
1913 thread_id: thread_id.map(ToString::to_string),
1914 thread: thread_name.unwrap_or("detached").to_string(),
1915 pid: Some(std::process::id()),
1916 boot_id: None,
1917 liveness_path: None,
1918 heartbeat_at: Some(Utc::now()),
1919 anchor_state: self.repo.head()?.map(|id| id.to_string_full()),
1920 anchor_root: None,
1921 reservation_token: Some(objects::store::generate_agent_id()),
1922 path: Some(self.repo.root().to_path_buf()),
1923 base_state: self.repo.head()?.map(|id| id.short()).unwrap_or_default(),
1924 started_at: Utc::now(),
1925 provider: identity.provider.clone(),
1926 model: identity.model.clone(),
1927 harness: identity.harness.clone(),
1928 thinking_level: identity.thinking_level.clone(),
1929 usage_summary: AgentUsageSummary::default(),
1930 last_progress_at: None,
1931 report_flush_state: Some("pending-local".to_string()),
1932 attach_reason: Some(attach.attach_reason.clone()),
1933 attach_precedence: attach.precedence.clone(),
1934 winning_attach_rule: Some(attach.winning_rule.clone()),
1935 probe_source: probe.probe_source.clone(),
1936 probe_confidence: probe.confidence,
1937 status: AgentStatus::Active,
1938 completed_at: None,
1939 context_queries: vec![],
1940 })
1941 })?)
1942 }
1943
1944 fn reuse_canonical_actor_session(
1945 &self,
1946 sessions: &mut SessionManager,
1947 request: CanonicalActorSessionRequest<'_>,
1948 ) -> Result<(Session, bool)> {
1949 let CanonicalActorSessionRequest {
1950 tentative_session,
1951 tentative_owns_session,
1952 entry,
1953 probe,
1954 attach,
1955 } = request;
1956 let Some(canonical_session_id) = entry.heddle_session_id.as_deref() else {
1957 return Ok((tentative_session, tentative_owns_session));
1958 };
1959 if canonical_session_id == tentative_session.id {
1960 return Ok((tentative_session, tentative_owns_session));
1961 }
1962
1963 if tentative_owns_session
1964 && let Ok(Some(session)) = sessions.get_session(&tentative_session.id)
1965 && session.is_active()
1966 {
1967 let _ = sessions.end_session(Some(&tentative_session.id));
1968 }
1969
1970 let canonical_session = sessions
1971 .get_session(canonical_session_id)?
1972 .ok_or_else(|| anyhow!("session not found: {canonical_session_id}"))?;
1973 let canonical_segment_id = canonical_session
1974 .current_segment_id
1975 .clone()
1976 .unwrap_or_default();
1977 sessions.set_current_session(canonical_session_id, &canonical_segment_id)?;
1978
1979 if let Some(native_actor_key) = probe
1980 .native_actor_key
1981 .as_deref()
1982 .or(entry.native_actor_key.as_deref())
1983 {
1984 attach.precedence.push(format!(
1985 "post-create-native-actor-key:{native_actor_key}:matched"
1986 ));
1987 attach.attach_reason = format!(
1988 "reused existing native actor {} on Heddle session {}",
1989 native_actor_key, canonical_session_id
1990 );
1991 attach.winning_rule = "native-actor-key-post-create".to_string();
1992 }
1993
1994 Ok((canonical_session, false))
1995 }
1996
1997 fn sync_registry_from_report(
1998 &self,
1999 report: &SessionReportEnvelope,
2000 status: AgentStatus,
2001 ) -> Result<()> {
2002 let registry = AgentRegistry::new(self.repo.heddle_dir());
2003 let entry = if let Some(agent_session_id) = &report.agent_session_id {
2004 registry.update_entry(agent_session_id, |entry| {
2005 if report.client_instance_id.is_some() {
2006 entry.client_instance_id = report.client_instance_id.clone();
2007 }
2008 if report.native_actor_key.is_some() {
2009 entry.native_actor_key = report.native_actor_key.clone();
2010 }
2011 if report.native_parent_actor_key.is_some() {
2012 entry.native_parent_actor_key = report.native_parent_actor_key.clone();
2013 }
2014 if report.native_instance_key.is_some() {
2015 entry.native_instance_key = report.native_instance_key.clone();
2016 }
2017 entry.heddle_session_id = Some(report.heddle_session_id.clone());
2018 entry.path = Some(self.repo.root().to_path_buf());
2019 entry.harness = report.harness.harness.clone();
2020 entry.provider = report.harness.provider.clone();
2021 entry.model = report.harness.model.clone();
2022 entry.thinking_level = report.harness.thinking_level.clone();
2023 entry.usage_summary = usage_to_summary(&report.usage);
2024 entry.last_progress_at =
2025 report.last_progress_at.as_deref().and_then(parse_timestamp);
2026 entry.report_flush_state = report.report_flush_state.clone();
2027 entry.attach_reason = report.attach_reason.clone();
2028 entry.attach_precedence = report.attach_precedence.clone();
2029 entry.winning_attach_rule = report.winning_attach_rule.clone();
2030 entry.probe_source = report.probe_source.clone();
2031 entry.probe_confidence = report.probe_confidence;
2032 entry.status = status.clone();
2033 entry.completed_at = match status {
2034 AgentStatus::Active => None,
2035 AgentStatus::Abandoned | AgentStatus::Complete | AgentStatus::Merged => {
2036 Some(Utc::now())
2037 }
2038 };
2039 })?
2040 } else {
2041 None
2042 };
2043
2044 if entry.is_none() {
2045 let resolved = self.ensure_registry_entry(RegistryEntryRequest {
2046 heddle_session_id: &report.heddle_session_id,
2047 thread_name: report.thread.as_deref(),
2048 thread_id: report.thread_id.as_deref(),
2049 identity: &ResolvedIdentity {
2050 harness: report.harness.harness.clone(),
2051 provider: report.harness.provider.clone(),
2052 model: report.harness.model.clone(),
2053 thinking_level: report.harness.thinking_level.clone(),
2054 policy: report.harness.policy.clone(),
2055 },
2056 probe: &HarnessProbeResult {
2057 native_actor_key: report.native_actor_key.clone(),
2058 native_parent_actor_key: report.native_parent_actor_key.clone(),
2059 native_instance_key: report.native_instance_key.clone(),
2060 probe_source: report.probe_source.clone(),
2061 confidence: report.probe_confidence,
2062 ..HarnessProbeResult::default()
2063 },
2064 attach: &ResolvedAttachment {
2065 target: AttachTarget::CreateNew {
2066 _because_claimed: false,
2067 },
2068 matched_entry: None,
2069 attach_reason: report.attach_reason.clone().unwrap_or_else(|| {
2070 format!(
2071 "created actor for Heddle session {}",
2072 report.heddle_session_id
2073 )
2074 }),
2075 precedence: report.attach_precedence.clone(),
2076 winning_rule: report
2077 .winning_attach_rule
2078 .clone()
2079 .unwrap_or_else(|| "report-sync".to_string()),
2080 },
2081 client_instance_id: report.client_instance_id.as_deref(),
2082 requested_entry: None,
2083 })?;
2084 let mut report = report.clone();
2085 report.agent_session_id = Some(resolved.session_id);
2086 self.reports.save(&report)?;
2087 }
2088 Ok(())
2089 }
2090}
2091
2092#[derive(Debug, Clone, Default)]
2093struct ResolvedIdentity {
2094 harness: Option<String>,
2095 provider: Option<String>,
2096 model: Option<String>,
2097 thinking_level: Option<String>,
2098 policy: Option<String>,
2099}
2100
2101impl ResolvedIdentity {
2102 fn to_transport_identity(&self) -> HarnessIdentity {
2103 HarnessIdentity {
2104 harness: self.harness.clone(),
2105 provider: self.provider.clone(),
2106 model: self.model.clone(),
2107 thinking_level: self.thinking_level.clone(),
2108 policy: self.policy.clone(),
2109 }
2110 }
2111}
2112
2113struct IdentityHints {
2114 harness: Option<String>,
2115 provider: Option<String>,
2116 model: Option<String>,
2117 thinking_level: Option<String>,
2118 policy: Option<String>,
2119 probe: HarnessProbeResult,
2120}
2121
2122fn resolve_identity(
2123 repo: &Repository,
2124 user_config: &UserConfig,
2125 hints: IdentityHints,
2126) -> Result<ResolvedIdentity> {
2127 let current_session = SessionManager::new(repo.root()).get_current_session()?;
2128 let current_segment = current_session
2129 .as_ref()
2130 .and_then(|session| session.current_segment());
2131 let token_claims = if user_config.harness.auto_infer {
2132 user_config_token_claims(user_config)
2133 } else {
2134 None
2135 };
2136 let harness_override = resolved_harness_override(
2137 user_config,
2138 hints.harness.as_deref(),
2139 hints.probe.harness.as_deref(),
2140 );
2141
2142 Ok(ResolvedIdentity {
2143 harness: hints.harness.or(hints.probe.harness),
2144 provider: hints
2145 .provider
2146 .or(hints.probe.provider)
2147 .or_else(|| current_segment.map(|segment| segment.provider.clone()))
2148 .or_else(|| {
2149 token_claims
2150 .as_ref()
2151 .and_then(|claims| claims.agent_provider.clone())
2152 })
2153 .or_else(|| harness_override.and_then(|entry| entry.provider.clone()))
2154 .or_else(|| user_config.agent.provider.clone()),
2155 model: hints
2156 .model
2157 .or(hints.probe.model)
2158 .or_else(|| current_segment.map(|segment| segment.model.clone()))
2159 .or_else(|| {
2160 token_claims
2161 .as_ref()
2162 .and_then(|claims| claims.agent_model.clone())
2163 })
2164 .or_else(|| harness_override.and_then(|entry| entry.model.clone()))
2165 .or_else(|| user_config.agent.model.clone()),
2166 thinking_level: hints
2167 .thinking_level
2168 .or(hints.probe.thinking_level)
2169 .or_else(|| harness_override.and_then(|entry| entry.thinking_level.clone())),
2170 policy: hints
2171 .policy
2172 .or(hints.probe.policy)
2173 .or_else(|| current_segment.and_then(|segment| segment.policy_id.clone()))
2174 .or_else(|| harness_override.and_then(|entry| entry.policy.clone()))
2175 .or_else(|| user_config.agent.default_policy.clone()),
2176 })
2177}
2178
2179fn resolved_harness_override<'a>(
2180 user_config: &'a UserConfig,
2181 explicit: Option<&str>,
2182 fingerprint: Option<&str>,
2183) -> Option<&'a UserHarnessOverride> {
2184 explicit
2185 .and_then(|name| user_config.harness.harnesses.get(name))
2186 .or_else(|| fingerprint.and_then(|name| user_config.harness.harnesses.get(name)))
2187}
2188
2189enum AttachTarget {
2190 ExistingSession(objects::object::Session),
2191 CreateNew { _because_claimed: bool },
2192}
2193
2194struct ResolvedAttachment {
2195 target: AttachTarget,
2196 matched_entry: Option<AgentEntry>,
2197 attach_reason: String,
2198 precedence: Vec<String>,
2199 winning_rule: String,
2200}
2201
2202fn resolve_actor_attachment(
2203 registry: &AgentRegistry,
2204 repo: &Repository,
2205 sessions: &mut SessionManager,
2206 input: AttachmentResolutionInput<'_>,
2207) -> Result<ResolvedAttachment> {
2208 let AttachmentResolutionInput {
2209 requested_entry,
2210 explicit_heddle_session_id,
2211 client_instance_id,
2212 probe,
2213 token_claims,
2214 } = input;
2215 let mut precedence = Vec::new();
2216 if let Some(entry) = requested_entry
2217 && let Some(bound_session_id) = entry.heddle_session_id.as_deref()
2218 {
2219 precedence.push(format!(
2220 "explicit-agent-session:{}:matched",
2221 entry.session_id
2222 ));
2223 let session = sessions
2224 .get_session(bound_session_id)?
2225 .ok_or_else(|| anyhow!("session not found: {bound_session_id}"))?;
2226 if !session.is_active() {
2227 return Err(anyhow!("session is not active: {bound_session_id}"));
2228 }
2229 return Ok(ResolvedAttachment {
2230 target: AttachTarget::ExistingSession(session),
2231 matched_entry: Some(entry.clone()),
2232 attach_reason: format!(
2233 "reattached actor {} to existing Heddle session {}",
2234 entry.session_id, bound_session_id
2235 ),
2236 precedence,
2237 winning_rule: "explicit-agent-session".to_string(),
2238 });
2239 }
2240 precedence.push("explicit-agent-session:miss".to_string());
2241
2242 if let Some(session_id) = explicit_heddle_session_id {
2243 precedence.push(format!("explicit-heddle-session:{session_id}:matched"));
2244 ensure_requested_entry_matches_session(requested_entry, session_id)?;
2245 let session = sessions
2246 .get_session(session_id)?
2247 .ok_or_else(|| anyhow!("session not found: {session_id}"))?;
2248 if !session.is_active() {
2249 return Err(anyhow!("session is not active: {session_id}"));
2250 }
2251 return Ok(ResolvedAttachment {
2252 target: AttachTarget::ExistingSession(session),
2253 matched_entry: None,
2254 attach_reason: format!("attached to explicit Heddle session {session_id}"),
2255 precedence,
2256 winning_rule: "explicit-heddle-session".to_string(),
2257 });
2258 }
2259 precedence.push("explicit-heddle-session:miss".to_string());
2260
2261 if client_instance_id.is_none()
2262 && let Some(native_actor_key) = probe.native_actor_key.as_deref()
2263 {
2264 if let Some(entry) = registry.find_active_by_native_actor_key(native_actor_key)?
2265 && claude_actor_compatible(&entry, probe, repo.root())
2266 && let Some(bound_session_id) = entry.heddle_session_id.clone()
2267 {
2268 precedence.push(format!("native-actor-key:{native_actor_key}:matched"));
2269 let session = sessions
2270 .get_session(&bound_session_id)?
2271 .ok_or_else(|| anyhow!("session not found: {bound_session_id}"))?;
2272 if session.is_active() {
2273 return Ok(ResolvedAttachment {
2274 target: AttachTarget::ExistingSession(session),
2275 matched_entry: Some(entry),
2276 attach_reason: format!(
2277 "reattached native actor {} to Heddle session {}",
2278 native_actor_key, bound_session_id
2279 ),
2280 precedence,
2281 winning_rule: "native-actor-key".to_string(),
2282 });
2283 }
2284 }
2285 precedence.push(format!("native-actor-key:{native_actor_key}:miss"));
2286 } else {
2287 precedence.push("native-actor-key:miss".to_string());
2288 }
2289
2290 if let Some(client_instance_id) = client_instance_id {
2291 if let Some(entry) = registry.find_active_by_client_instance_id(client_instance_id)?
2292 && let Some(bound_session_id) = entry.heddle_session_id.clone()
2293 {
2294 precedence.push(format!("client-instance-id:{client_instance_id}:matched"));
2295 let session = sessions
2296 .get_session(&bound_session_id)?
2297 .ok_or_else(|| anyhow!("session not found: {bound_session_id}"))?;
2298 if session.is_active() {
2299 return Ok(ResolvedAttachment {
2300 target: AttachTarget::ExistingSession(session),
2301 matched_entry: Some(entry),
2302 attach_reason: format!(
2303 "reattached client instance {client_instance_id} to Heddle session {bound_session_id}"
2304 ),
2305 precedence,
2306 winning_rule: "client-instance-id".to_string(),
2307 });
2308 }
2309 }
2310 precedence.push(format!("client-instance-id:{client_instance_id}:miss"));
2311 return Ok(ResolvedAttachment {
2312 target: AttachTarget::CreateNew {
2313 _because_claimed: false,
2314 },
2315 matched_entry: None,
2316 attach_reason: format!(
2317 "started new Heddle session for distinct client instance {client_instance_id}"
2318 ),
2319 precedence,
2320 winning_rule: "create-new-session".to_string(),
2321 });
2322 } else {
2323 precedence.push("client-instance-id:miss".to_string());
2324 }
2325
2326 if client_instance_id.is_none() && probe.native_actor_key.is_some() {
2327 precedence.push("native-instance-key:skipped-strong-native-key".to_string());
2328 return Ok(ResolvedAttachment {
2329 target: AttachTarget::CreateNew {
2330 _because_claimed: false,
2331 },
2332 matched_entry: None,
2333 attach_reason:
2334 "started new Heddle session because no compatible native actor match was found"
2335 .to_string(),
2336 precedence,
2337 winning_rule: "create-new-session".to_string(),
2338 });
2339 }
2340
2341 if let Some(native_instance_key) = probe.native_instance_key.as_deref() {
2342 if let Some(entry) =
2343 registry.find_active_by_native_instance_key_at_path(native_instance_key, repo.root())?
2344 && claude_actor_compatible(&entry, probe, repo.root())
2345 && let Some(bound_session_id) = entry.heddle_session_id.clone()
2346 {
2347 precedence.push(format!("native-instance-key:{native_instance_key}:matched"));
2348 let session = sessions
2349 .get_session(&bound_session_id)?
2350 .ok_or_else(|| anyhow!("session not found: {bound_session_id}"))?;
2351 if session.is_active() {
2352 return Ok(ResolvedAttachment {
2353 target: AttachTarget::ExistingSession(session),
2354 matched_entry: Some(entry),
2355 attach_reason: format!(
2356 "reattached native instance {} to Heddle session {}",
2357 native_instance_key, bound_session_id
2358 ),
2359 precedence,
2360 winning_rule: "native-instance-key".to_string(),
2361 });
2362 }
2363 }
2364 precedence.push(format!("native-instance-key:{native_instance_key}:miss"));
2365 } else {
2366 precedence.push("native-instance-key:miss".to_string());
2367 }
2368
2369 if probe.attach_hints.root_actor
2370 && let Some(current) = sessions.get_current_session()?
2371 && current.is_active()
2372 {
2373 let claimed = session_claimed_by_other(
2374 registry,
2375 ¤t.id,
2376 requested_entry,
2377 client_instance_id,
2378 probe.native_actor_key.as_deref(),
2379 )?;
2380 if !claimed {
2381 precedence.push(format!("current-worktree-session:{}:matched", current.id));
2382 return Ok(ResolvedAttachment {
2383 target: AttachTarget::ExistingSession(current.clone()),
2384 matched_entry: None,
2385 attach_reason: format!("attached to active worktree Heddle session {}", current.id),
2386 precedence,
2387 winning_rule: "current-worktree-session".to_string(),
2388 });
2389 }
2390 precedence.push(format!("current-worktree-session:{}:claimed", current.id));
2391 return Ok(ResolvedAttachment {
2392 target: AttachTarget::CreateNew {
2393 _because_claimed: true,
2394 },
2395 matched_entry: None,
2396 attach_reason: "started a new Heddle session because the current session was already claimed by another active actor".to_string(),
2397 precedence,
2398 winning_rule: "create-new-session".to_string(),
2399 });
2400 }
2401 precedence.push("current-worktree-session:miss".to_string());
2402
2403 if let Some(claims) = token_claims
2404 && let Some(token_sid) = claims.sid.as_deref()
2405 && let Some(session) = sessions.get_session(token_sid)?
2406 && session.is_active()
2407 {
2408 let claimed = session_claimed_by_other(
2409 registry,
2410 &session.id,
2411 requested_entry,
2412 client_instance_id,
2413 probe.native_actor_key.as_deref(),
2414 )?;
2415 if !claimed {
2416 precedence.push(format!("token-sid:{token_sid}:matched"));
2417 return Ok(ResolvedAttachment {
2418 target: AttachTarget::ExistingSession(session),
2419 matched_entry: None,
2420 attach_reason: format!(
2421 "attached to Heddle session {token_sid} from auth token sid"
2422 ),
2423 precedence,
2424 winning_rule: "token-sid".to_string(),
2425 });
2426 }
2427 precedence.push(format!("token-sid:{token_sid}:claimed"));
2428 return Ok(ResolvedAttachment {
2429 target: AttachTarget::CreateNew {
2430 _because_claimed: true,
2431 },
2432 matched_entry: None,
2433 attach_reason: "started a new Heddle session because the current session was already claimed by another active actor".to_string(),
2434 precedence,
2435 winning_rule: "create-new-session".to_string(),
2436 });
2437 }
2438 precedence.push("token-sid:miss".to_string());
2439
2440 Ok(ResolvedAttachment {
2441 target: AttachTarget::CreateNew {
2442 _because_claimed: false,
2443 },
2444 matched_entry: None,
2445 attach_reason: "started new Heddle session".to_string(),
2446 precedence,
2447 winning_rule: "create-new-session".to_string(),
2448 })
2449}
2450
2451fn claude_actor_compatible(
2452 entry: &AgentEntry,
2453 probe: &HarnessProbeResult,
2454 repo_root: &Path,
2455) -> bool {
2456 let Some(native_actor_key) = probe.native_actor_key.as_deref() else {
2457 return true;
2458 };
2459 if !native_actor_key.starts_with("claude-code:") {
2460 return true;
2461 }
2462 if native_actor_key.starts_with("claude-code:agent:") {
2463 return entry.native_actor_key.as_deref() == Some(native_actor_key);
2464 }
2465 if let Some(native_instance_key) = probe.native_instance_key.as_deref() {
2466 return entry.native_actor_key.as_deref() == Some(native_actor_key)
2467 && entry.native_instance_key.as_deref() == Some(native_instance_key);
2468 }
2469 let same_repo = entry
2470 .path
2471 .as_ref()
2472 .map(|path| path.canonicalize().unwrap_or_else(|_| path.clone()))
2473 .unwrap_or_default()
2474 == repo_root
2475 .canonicalize()
2476 .unwrap_or_else(|_| repo_root.to_path_buf());
2477 entry.native_actor_key.as_deref() == Some(native_actor_key)
2478 && same_repo
2479 && probe.confidence.unwrap_or_default() >= 0.9
2480}
2481
2482fn decode_token_claims(token: &str) -> Option<TokenClaims> {
2483 let payload = token.split('.').nth(1)?;
2484 let decoded = base64::engine::general_purpose::URL_SAFE_NO_PAD
2485 .decode(payload.as_bytes())
2486 .ok()?;
2487 serde_json::from_slice(&decoded).ok()
2488}
2489
2490fn user_config_token_claims(user_config: &UserConfig) -> Option<TokenClaims> {
2491 user_config
2492 .remote_token()
2493 .ok()
2494 .flatten()
2495 .and_then(|token| decode_token_claims(&token.id))
2496}
2497
2498#[derive(Debug, Deserialize)]
2499struct TokenClaims {
2500 #[serde(default)]
2501 sid: Option<String>,
2502 #[serde(default)]
2503 agent_provider: Option<String>,
2504 #[serde(default)]
2505 agent_model: Option<String>,
2506}
2507
2508fn should_rotate_segment(session: &objects::object::Session, identity: &ResolvedIdentity) -> bool {
2509 let Some(segment) = session.current_segment() else {
2510 return false;
2511 };
2512 let provider_changed = identity
2513 .provider
2514 .as_deref()
2515 .is_some_and(|provider| provider != segment.provider);
2516 let model_changed = identity
2517 .model
2518 .as_deref()
2519 .is_some_and(|model| model != segment.model);
2520 provider_changed || model_changed
2521}
2522
2523fn thread_id_for_name(repo: &Repository, thread_name: Option<&str>) -> Result<Option<String>> {
2524 let Some(thread_name) = thread_name else {
2525 return Ok(None);
2526 };
2527 Ok(ThreadManager::new(repo.heddle_dir())
2528 .load(thread_name)?
2529 .map(|thread| thread.id))
2530}
2531
2532fn can_create_harness_thread(
2533 repo: &Repository,
2534 target_thread: Option<&str>,
2535 parent_thread: Option<&str>,
2536) -> Result<bool> {
2537 Ok(resolve_harness_thread_base_state(repo, target_thread, parent_thread)?.is_some())
2538}
2539
2540fn resolve_harness_thread_base_state(
2541 repo: &Repository,
2542 target_thread: Option<&str>,
2543 parent_thread: Option<&str>,
2544) -> Result<Option<objects::object::ChangeId>> {
2545 if let Some(head_state) = repo.head()? {
2546 return Ok(Some(head_state));
2547 }
2548
2549 for thread_name in [parent_thread, target_thread].into_iter().flatten() {
2550 if let Some(state) = resolve_named_thread_base_state(repo, thread_name)? {
2551 return Ok(Some(state));
2552 }
2553 }
2554
2555 Ok(None)
2556}
2557
2558fn resolve_named_thread_base_state(
2559 repo: &Repository,
2560 thread_name: &str,
2561) -> Result<Option<objects::object::ChangeId>> {
2562 if let Some(thread) = ThreadManager::new(repo.heddle_dir()).load(thread_name)?
2563 && let Some(state_spec) = thread
2564 .current_state
2565 .as_deref()
2566 .or(Some(thread.base_state.as_str()))
2567 && let Some(state_id) = repo
2568 .resolve_state(state_spec)?
2569 .or_else(|| objects::object::ChangeId::parse(state_spec).ok())
2570 {
2571 return Ok(Some(state_id));
2572 }
2573
2574 Ok(repo.refs().get_thread(&ThreadName::new(thread_name))?)
2575}
2576
2577fn resolve_parent_thread_for_subagent(
2578 repo: &Repository,
2579 probe: &HarnessProbeResult,
2580 current_attached: Option<&str>,
2581) -> Result<Option<String>> {
2582 if let Some(parent_key) = probe.native_parent_actor_key.as_deref() {
2583 let registry = AgentRegistry::new(repo.heddle_dir());
2584 if let Some(entry) = registry.find_active_by_native_actor_key(parent_key)? {
2585 return Ok(Some(entry.thread));
2586 }
2587 }
2588 Ok(current_attached.map(ToString::to_string))
2589}
2590
2591fn preferred_thread_slug(
2592 params: &OpenSessionParams,
2593 probe: &HarnessProbeResult,
2594 identity: &ResolvedIdentity,
2595) -> String {
2596 params
2597 .task
2598 .clone()
2599 .or_else(|| params.summary.clone())
2600 .or_else(|| probe.native_actor_key.as_deref().map(native_key_slug))
2601 .or_else(|| probe.native_instance_key.as_deref().map(native_key_slug))
2602 .or_else(|| identity.harness.clone())
2603 .unwrap_or_else(|| "work".to_string())
2604}
2605
2606fn native_key_slug(value: &str) -> String {
2607 value
2608 .rsplit(':')
2609 .next()
2610 .map(ToString::to_string)
2611 .unwrap_or_else(|| value.to_string())
2612}
2613
2614fn allocate_thread_name(repo: &Repository, base: &str) -> Result<String> {
2615 if ThreadManager::new(repo.heddle_dir()).load(base)?.is_none()
2616 && repo.refs().get_thread(&ThreadName::new(base))?.is_none()
2617 {
2618 return Ok(base.to_string());
2619 }
2620 for idx in 2..1000 {
2621 let candidate = format!("{base}-{idx}");
2622 if ThreadManager::new(repo.heddle_dir())
2623 .load(&candidate)?
2624 .is_none()
2625 && repo
2626 .refs()
2627 .get_thread(&ThreadName::new(&candidate))?
2628 .is_none()
2629 {
2630 return Ok(candidate);
2631 }
2632 }
2633 Err(anyhow!(
2634 "could not allocate a unique thread name from '{base}'"
2635 ))
2636}
2637
2638fn default_private_thread_path(repo: &Repository, name: &str) -> PathBuf {
2639 repo.managed_checkout_path(name)
2647}
2648
2649fn sanitize_name(name: &str) -> String {
2650 let mut out = String::new();
2651 let mut last_dash = false;
2652 for ch in name.chars() {
2653 if ch.is_ascii_alphanumeric() {
2654 out.push(ch.to_ascii_lowercase());
2655 last_dash = false;
2656 } else if !last_dash {
2657 out.push('-');
2658 last_dash = true;
2659 }
2660 }
2661 out.trim_matches('-').to_string()
2662}
2663
2664fn resolve_requested_registry_entry(
2665 registry: &AgentRegistry,
2666 agent_session_id: Option<&str>,
2667 client_instance_id: Option<&str>,
2668) -> Result<Option<AgentEntry>> {
2669 if let Some(agent_session_id) = agent_session_id {
2670 let entry = registry
2671 .load(agent_session_id)?
2672 .ok_or_else(|| anyhow!("agent session not found: {agent_session_id}"))?;
2673 if entry.status != AgentStatus::Active {
2674 return Err(anyhow!("agent session is not active: {agent_session_id}"));
2675 }
2676 return Ok(Some(entry));
2677 }
2678
2679 if let Some(client_instance_id) = client_instance_id {
2680 return Ok(registry.find_active_by_client_instance_id(client_instance_id)?);
2681 }
2682
2683 Ok(None)
2684}
2685
2686fn ensure_requested_entry_matches_session(
2687 requested_entry: Option<&AgentEntry>,
2688 heddle_session_id: &str,
2689) -> Result<()> {
2690 if let Some(entry) = requested_entry
2691 && let Some(bound_session_id) = entry.heddle_session_id.as_deref()
2692 && bound_session_id != heddle_session_id
2693 {
2694 return Err(anyhow!(
2695 "requested agent is already bound to a different heddle session: {}",
2696 entry.session_id
2697 ));
2698 }
2699 Ok(())
2700}
2701
2702fn session_claimed_by_other(
2703 registry: &AgentRegistry,
2704 heddle_session_id: &str,
2705 requested_entry: Option<&AgentEntry>,
2706 client_instance_id: Option<&str>,
2707 native_actor_key: Option<&str>,
2708) -> Result<bool> {
2709 if requested_entry.is_none() && client_instance_id.is_none() && native_actor_key.is_none() {
2710 return Ok(false);
2711 }
2712
2713 let Some(existing) = registry.find_active_by_heddle_session_id(heddle_session_id)? else {
2714 return Ok(false);
2715 };
2716 if let Some(requested) = requested_entry {
2717 return Ok(requested.session_id != existing.session_id);
2718 }
2719 if let Some(client_instance_id) = client_instance_id
2720 && existing.client_instance_id.as_deref() == Some(client_instance_id)
2721 {
2722 return Ok(false);
2723 }
2724 if let Some(native_actor_key) = native_actor_key
2725 && existing.native_actor_key.as_deref() == Some(native_actor_key)
2726 {
2727 return Ok(false);
2728 }
2729 Ok(true)
2730}
2731
2732fn find_matching_registry_entry(
2733 registry: &AgentRegistry,
2734 repo: &Repository,
2735 heddle_session_id: &str,
2736 thread_name: Option<&str>,
2737) -> Result<Option<AgentEntry>> {
2738 if let Some(entry) = registry.find_active_by_heddle_session_id(heddle_session_id)? {
2739 return Ok(Some(entry));
2740 }
2741 let canonical_root = repo
2742 .root()
2743 .canonicalize()
2744 .unwrap_or_else(|_| repo.root().to_path_buf());
2745 Ok(registry
2746 .list()?
2747 .into_iter()
2748 .filter(|entry| entry.status == AgentStatus::Active)
2749 .find(|entry| {
2750 entry
2751 .path
2752 .as_ref()
2753 .map(|path| path.canonicalize().unwrap_or_else(|_| path.clone()) == canonical_root)
2754 .unwrap_or(false)
2755 || thread_name.is_some_and(|thread| entry.thread == thread)
2756 }))
2757}
2758
2759fn merged_env_hints(extra: &BTreeMap<String, String>) -> BTreeMap<String, String> {
2760 let mut merged: BTreeMap<String, String> = std::env::vars()
2761 .filter(|(key, _)| inherited_harness_hint(key))
2762 .collect();
2763 for (key, value) in extra {
2764 merged.insert(key.clone(), value.clone());
2765 }
2766 merged
2767}
2768
2769fn inherited_harness_hint(key: &str) -> bool {
2770 if matches!(
2771 key,
2772 "OPENAI_MODEL"
2773 | "ANTHROPIC_MODEL"
2774 | "CLAUDE_MODEL"
2775 | "MODEL"
2776 | "OPENAI_REASONING_EFFORT"
2777 | "REASONING_EFFORT"
2778 | "THINKING_LEVEL"
2779 | "PROMPT_POLICY"
2780 ) {
2781 return false;
2782 }
2783
2784 key.starts_with("HEDDLE_")
2785 || key.starts_with("CODEX_")
2786 || key == "CLAUDECODE"
2787 || key.starts_with("OPENCODE_")
2788}
2789
2790fn to_json_value<T: Serialize>(value: T) -> Result<Value> {
2791 serde_json::to_value(value).map_err(|err| anyhow!(err))
2792}
2793
2794fn normalize_paths<I>(paths: I) -> Vec<String>
2795where
2796 I: IntoIterator<Item = String>,
2797{
2798 let mut ordered = BTreeSet::new();
2799 for path in paths {
2800 let normalized = path.trim().replace('\\', "/");
2801 if !normalized.is_empty() {
2802 ordered.insert(normalized);
2803 }
2804 }
2805 ordered.into_iter().collect()
2806}
2807
2808fn merge_unique_paths<I>(target: &mut Vec<String>, paths: I)
2809where
2810 I: IntoIterator<Item = String>,
2811{
2812 let mut merged: BTreeSet<String> = target.iter().cloned().collect();
2813 merged.extend(paths);
2814 *target = merged.into_iter().collect();
2815}
2816
2817fn max_u64(current: Option<u64>, candidate: u64) -> u64 {
2818 current
2819 .map(|value| value.max(candidate))
2820 .unwrap_or(candidate)
2821}
2822
2823fn max_u32(current: Option<u32>, candidate: u32) -> u32 {
2824 current
2825 .map(|value| value.max(candidate))
2826 .unwrap_or(candidate)
2827}
2828
2829fn merge_usage(target: &mut UsageTotals, incoming: &UsageTotals) {
2830 if let Some(input) = incoming.input_tokens {
2831 target.input_tokens = Some(max_u64(target.input_tokens, input));
2832 }
2833 if let Some(output) = incoming.output_tokens {
2834 target.output_tokens = Some(max_u64(target.output_tokens, output));
2835 }
2836 if let Some(reasoning) = incoming.reasoning_tokens {
2837 target.reasoning_tokens = Some(max_u64(target.reasoning_tokens, reasoning));
2838 }
2839 if let Some(cache_creation) = incoming.cache_creation_tokens {
2840 target.cache_creation_tokens = Some(max_u64(target.cache_creation_tokens, cache_creation));
2841 }
2842 if let Some(cache_read) = incoming.cache_read_tokens {
2843 target.cache_read_tokens = Some(max_u64(target.cache_read_tokens, cache_read));
2844 }
2845 if let Some(tool_calls) = incoming.tool_calls {
2846 target.tool_calls = Some(max_u32(target.tool_calls, tool_calls));
2847 }
2848 if let Some(cost) = incoming.cost_micros_usd {
2849 target.cost_micros_usd = Some(max_u64(target.cost_micros_usd, cost));
2850 }
2851}
2852
2853fn parse_timestamp(value: &str) -> Option<chrono::DateTime<Utc>> {
2854 chrono::DateTime::parse_from_rfc3339(value)
2855 .ok()
2856 .map(|dt| dt.with_timezone(&Utc))
2857}
2858
2859fn transport_from_report(
2860 report: &SessionReportEnvelope,
2861 fallback: HarnessTransport,
2862) -> HarnessTransport {
2863 match report.transport_mode.as_str() {
2864 "spool" => HarnessTransport::Spool,
2865 "direct" => HarnessTransport::Direct,
2866 "end" => HarnessTransport::End,
2867 _ => fallback,
2868 }
2869}
2870
2871fn mark_pending_flush(report: &mut SessionReportEnvelope) {
2872 report.pending_flush = true;
2873 report.report_flush_state = Some("pending-local".to_string());
2874}
2875
2876fn enqueue_report(store: &SessionReportStore, report: &mut SessionReportEnvelope) -> Result<()> {
2877 store.append_outbox(report)?;
2878 report.pending_flush = false;
2879 let flushed_at = Utc::now().to_rfc3339();
2880 report.last_flushed_at = Some(flushed_at);
2881 report.report_flush_state = Some("queued-local".to_string());
2882 store.save(report)?;
2883 Ok(())
2884}
2885
2886fn usage_to_summary(usage: &UsageTotals) -> AgentUsageSummary {
2887 AgentUsageSummary {
2888 input_tokens: usage.input_tokens,
2889 output_tokens: usage.output_tokens,
2890 reasoning_tokens: usage.reasoning_tokens,
2891 tool_calls: usage.tool_calls,
2892 cost_micros_usd: usage.cost_micros_usd,
2893 }
2894}
2895
2896fn transcript_mode_name(mode: HarnessTranscriptMode) -> &'static str {
2897 match mode {
2898 HarnessTranscriptMode::Off => "off",
2899 HarnessTranscriptMode::Summary => "summary",
2900 HarnessTranscriptMode::Full => "full",
2901 }
2902}
2903
2904fn transport_mode_name(mode: HarnessTransport) -> &'static str {
2905 match mode {
2906 HarnessTransport::Spool => "spool",
2907 HarnessTransport::Direct => "direct",
2908 HarnessTransport::End => "end",
2909 }
2910}
2911
2912struct FinalDiff {
2913 changed_paths: Vec<String>,
2914 diff_summary: SessionDiffSummary,
2915 head_state: Option<String>,
2916}
2917
2918fn compute_final_diff(
2919 repo: &Repository,
2920 base_state: Option<&str>,
2921 worktree_baseline: &[WorktreeChangeBaseline],
2922) -> Result<FinalDiff> {
2923 let mut changes: BTreeMap<String, DiffKind> = BTreeMap::new();
2924
2925 let head_state = repo.head()?;
2926 if let (Some(base_spec), Some(head_id)) = (base_state, head_state) {
2927 let base_id = repo
2928 .resolve_state(base_spec)?
2929 .or_else(|| objects::object::ChangeId::parse(base_spec).ok());
2930 if let Some(base_id) = base_id
2931 && base_id != head_id
2932 {
2933 let Some(base_state_obj) = repo.store().get_state(&base_id)? else {
2934 return Err(anyhow!("base state not found: {base_spec}"));
2935 };
2936 let Some(head_state_obj) = repo.store().get_state(&head_id)? else {
2937 return Err(anyhow!("head state not found: {}", head_id.short()));
2938 };
2939 for change in repo.diff_trees(&base_state_obj.tree, &head_state_obj.tree)? {
2940 changes.insert(change.path, change.kind);
2941 }
2942 }
2943 }
2944
2945 let baseline_paths: BTreeSet<(String, String)> = worktree_baseline
2946 .iter()
2947 .map(|change| (change.path.clone(), change.kind.clone()))
2948 .collect();
2949 for (path, kind) in collect_worktree_changes(repo)? {
2950 let kind_name = diff_kind_name(kind);
2951 if !baseline_paths.contains(&(path.clone(), kind_name.to_string())) {
2952 changes.insert(path, kind);
2953 }
2954 }
2955
2956 let diff_summary = SessionDiffSummary {
2957 changed_file_count: changes.len() as u32,
2958 added_files: changes
2959 .values()
2960 .filter(|kind| **kind == DiffKind::Added)
2961 .count() as u32,
2962 modified_files: changes
2963 .values()
2964 .filter(|kind| **kind == DiffKind::Modified)
2965 .count() as u32,
2966 deleted_files: changes
2967 .values()
2968 .filter(|kind| **kind == DiffKind::Deleted)
2969 .count() as u32,
2970 };
2971
2972 Ok(FinalDiff {
2973 changed_paths: changes.into_keys().collect(),
2974 diff_summary,
2975 head_state: head_state.map(|id| id.to_string_full()),
2976 })
2977}
2978
2979fn capture_worktree_change_snapshot(repo: &Repository) -> Result<Vec<WorktreeChangeBaseline>> {
2980 Ok(collect_worktree_changes(repo)?
2981 .into_iter()
2982 .map(|(path, kind)| WorktreeChangeBaseline {
2983 path,
2984 kind: diff_kind_name(kind).to_string(),
2985 })
2986 .collect())
2987}
2988
2989fn collect_worktree_changes(repo: &Repository) -> Result<BTreeMap<String, DiffKind>> {
2990 let status_options = worktree_status_options(Some(repo.config()));
2991 let worktree_tree = match repo.current_state()? {
2992 Some(state) => repo.require_tree(&state.tree)?,
2993 None => Tree::new(),
2994 };
2995 let status = repo.compare_worktree_cached_with_options(&worktree_tree, &status_options)?;
2996 let mut changes = BTreeMap::new();
2997 for path in status.added {
2998 changes.insert(path.display().to_string(), DiffKind::Added);
2999 }
3000 for path in status.modified {
3001 changes.insert(path.display().to_string(), DiffKind::Modified);
3002 }
3003 for path in status.deleted {
3004 changes.insert(path.display().to_string(), DiffKind::Deleted);
3005 }
3006 Ok(changes)
3007}
3008
3009fn changed_paths_between_states(
3010 repo: &Repository,
3011 before_state: ChangeId,
3012 after_state: ChangeId,
3013) -> Result<Vec<String>> {
3014 if before_state == after_state {
3015 return Ok(Vec::new());
3016 }
3017 let Some(before_state_obj) = repo.store().get_state(&before_state)? else {
3018 return Err(anyhow!(
3019 "timeline before state not found: {}",
3020 before_state.short()
3021 ));
3022 };
3023 let Some(after_state_obj) = repo.store().get_state(&after_state)? else {
3024 return Err(anyhow!(
3025 "timeline after state not found: {}",
3026 after_state.short()
3027 ));
3028 };
3029 let mut paths = BTreeSet::new();
3030 for change in repo.diff_trees(&before_state_obj.tree, &after_state_obj.tree)? {
3031 paths.insert(change.path);
3032 }
3033 Ok(paths.into_iter().collect())
3034}
3035
3036fn diff_kind_name(kind: DiffKind) -> &'static str {
3037 match kind {
3038 DiffKind::Added => "added",
3039 DiffKind::Modified => "modified",
3040 DiffKind::Deleted => "deleted",
3041 DiffKind::Unchanged => "unchanged",
3042 }
3043}
3044
3045struct SessionReportStore {
3046 dir: PathBuf,
3047}
3048
3049impl SessionReportStore {
3050 fn new(repo_root: &Path) -> Self {
3051 Self {
3052 dir: repo_root.join(".heddle/state").join("session-reports"),
3053 }
3054 }
3055
3056 fn session_path(&self, heddle_session_id: &str) -> PathBuf {
3057 self.dir.join(format!("{heddle_session_id}.json"))
3058 }
3059
3060 fn outbox_path(&self) -> PathBuf {
3061 self.dir.join("outbox.jsonl")
3062 }
3063
3064 fn load(&self, heddle_session_id: &str) -> Result<Option<SessionReportEnvelope>> {
3065 let path = self.session_path(heddle_session_id);
3066 if !path.exists() {
3067 return Ok(None);
3068 }
3069 let bytes = fs::read(path)?;
3070 Ok(Some(serde_json::from_slice(&bytes)?))
3071 }
3072
3073 fn save(&self, report: &SessionReportEnvelope) -> Result<()> {
3074 fs::create_dir_all(&self.dir)?;
3075 let path = self.session_path(&report.heddle_session_id);
3076 let bytes = serde_json::to_vec_pretty(report)?;
3077 write_file_atomic(&path, &bytes)?;
3078 Ok(())
3079 }
3080
3081 fn append_outbox(&self, report: &SessionReportEnvelope) -> Result<()> {
3082 fs::create_dir_all(&self.dir)?;
3083 let mut file = OpenOptions::new()
3084 .create(true)
3085 .append(true)
3086 .open(self.outbox_path())?;
3087 serde_json::to_writer(&mut file, report)?;
3088 file.write_all(b"\n")?;
3089 file.flush()?;
3090 Ok(())
3091 }
3092
3093 fn list_pending(&self) -> Result<Vec<String>> {
3094 if !self.dir.exists() {
3095 return Ok(Vec::new());
3096 }
3097 let mut ids = Vec::new();
3098 for entry in fs::read_dir(&self.dir)? {
3099 let entry = entry?;
3100 let path = entry.path();
3101 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
3102 continue;
3103 }
3104 let bytes = fs::read(&path)?;
3105 let report: SessionReportEnvelope = serde_json::from_slice(&bytes)?;
3106 if report.pending_flush {
3107 ids.push(report.heddle_session_id);
3108 }
3109 }
3110 ids.sort();
3111 Ok(ids)
3112 }
3113}
3114
3115#[derive(Debug, Deserialize)]
3116struct BridgeRequest {
3117 #[serde(default)]
3118 id: Option<String>,
3119 method: String,
3120 #[serde(default)]
3121 params: Value,
3122}
3123
3124#[derive(Debug, Serialize)]
3125struct BridgeResponse {
3126 #[serde(default)]
3127 id: Option<String>,
3128 ok: bool,
3129 #[serde(skip_serializing_if = "Option::is_none")]
3130 result: Option<Value>,
3131 #[serde(skip_serializing_if = "Option::is_none")]
3132 error: Option<BridgeError>,
3133}
3134
3135impl BridgeResponse {
3136 fn ok(id: Option<String>, result: Value) -> Self {
3137 Self {
3138 id,
3139 ok: true,
3140 result: Some(result),
3141 error: None,
3142 }
3143 }
3144
3145 fn error(id: Option<String>, code: impl Into<String>, message: impl Into<String>) -> Self {
3146 Self {
3147 id,
3148 ok: false,
3149 result: None,
3150 error: Some(BridgeError {
3151 code: code.into(),
3152 message: message.into(),
3153 }),
3154 }
3155 }
3156}
3157
3158#[derive(Debug, Serialize)]
3159struct BridgeError {
3160 code: String,
3161 message: String,
3162}
3163
3164#[derive(Debug, Clone, Deserialize, Default)]
3165struct OpenSessionParams {
3166 #[serde(default)]
3167 heddle_session_id: Option<String>,
3168 #[serde(default)]
3169 agent_session_id: Option<String>,
3170 #[serde(default)]
3171 client_instance_id: Option<String>,
3172 #[serde(default)]
3173 thread: Option<String>,
3174 #[serde(default)]
3175 task: Option<String>,
3176 #[serde(default)]
3177 summary: Option<String>,
3178 #[serde(default)]
3179 harness: Option<String>,
3180 #[serde(default)]
3181 provider: Option<String>,
3182 #[serde(default)]
3183 model: Option<String>,
3184 #[serde(default)]
3185 thinking_level: Option<String>,
3186 #[serde(default)]
3187 policy: Option<String>,
3188 #[serde(default)]
3189 transport: Option<HarnessTransport>,
3190 #[serde(default)]
3191 transcript_mode: Option<HarnessTranscriptMode>,
3192 #[serde(default)]
3193 argv: Option<Vec<String>>,
3194 #[serde(default)]
3195 env_hints: BTreeMap<String, String>,
3196 #[serde(default)]
3197 probe_metadata: BTreeMap<String, String>,
3198}
3199
3200#[derive(Debug, Clone, Deserialize, Default)]
3201struct UpdateProgressParams {
3202 heddle_session_id: String,
3203 #[serde(default)]
3204 status: Option<String>,
3205 #[serde(default)]
3206 message: Option<String>,
3207 #[serde(default)]
3208 completed_steps: Option<u32>,
3209 #[serde(default)]
3210 total_steps: Option<u32>,
3211 #[serde(default)]
3212 touched_paths: Vec<String>,
3213 #[serde(default)]
3214 summary: Option<String>,
3215 #[serde(default)]
3216 harness: Option<String>,
3217 #[serde(default)]
3218 provider: Option<String>,
3219 #[serde(default)]
3220 model: Option<String>,
3221 #[serde(default)]
3222 thinking_level: Option<String>,
3223 #[serde(default)]
3224 policy: Option<String>,
3225 #[serde(default)]
3226 argv: Option<Vec<String>>,
3227 #[serde(default)]
3228 env_hints: BTreeMap<String, String>,
3229 #[serde(default)]
3230 probe_metadata: BTreeMap<String, String>,
3231}
3232
3233#[derive(Debug, Clone, Deserialize, Default)]
3234struct RecordUsageParams {
3235 heddle_session_id: String,
3236 #[serde(default)]
3237 input_tokens: Option<u64>,
3238 #[serde(default)]
3239 output_tokens: Option<u64>,
3240 #[serde(default)]
3241 reasoning_tokens: Option<u64>,
3242 #[serde(default)]
3243 cache_creation_tokens: Option<u64>,
3244 #[serde(default)]
3245 cache_read_tokens: Option<u64>,
3246 #[serde(default)]
3247 tool_calls: Option<u32>,
3248 #[serde(default)]
3249 cost_micros_usd: Option<u64>,
3250}
3251
3252#[derive(Debug, Clone, Deserialize, Default)]
3253struct RecordTouchedPathsParams {
3254 heddle_session_id: String,
3255 #[serde(default)]
3256 paths: Vec<String>,
3257}
3258
3259#[derive(Debug, Clone, Deserialize, Default)]
3260struct CloseSessionParams {
3261 heddle_session_id: String,
3262 #[serde(default)]
3263 outcome: Option<String>,
3264 #[serde(default)]
3265 summary: Option<String>,
3266 #[serde(default)]
3267 transcript_refs: Option<Vec<TranscriptAttachmentRef>>,
3268 #[serde(default)]
3269 transport: Option<HarnessTransport>,
3270}
3271
3272#[derive(Debug, Clone, Deserialize, Default)]
3273struct FlushReportsParams {
3274 #[serde(default)]
3275 heddle_session_id: Option<String>,
3276}
3277
3278#[derive(Debug, Serialize)]
3279struct OpenSessionResult {
3280 heddle_session_id: String,
3281 heddle_segment_id: Option<String>,
3282 agent_session_id: Option<String>,
3283 created_session: bool,
3284 harness: Option<String>,
3285 provider: Option<String>,
3286 model: Option<String>,
3287 thinking_level: Option<String>,
3288 report_flush_state: Option<String>,
3289 attach_reason: Option<String>,
3290}
3291
3292#[derive(Debug, Serialize)]
3293struct SessionMutationResult {
3294 heddle_session_id: String,
3295 heddle_segment_id: Option<String>,
3296 report_flush_state: Option<String>,
3297}
3298
3299#[derive(Debug, Serialize)]
3300struct CloseSessionResult {
3301 heddle_session_id: String,
3302 changed_paths: Vec<String>,
3303 diff_summary: SessionDiffSummary,
3304 report_flush_state: Option<String>,
3305}
3306
3307#[derive(Debug, Serialize)]
3308struct FlushReportsResult {
3309 flushed: usize,
3310}
3311
3312#[cfg(test)]
3313mod tests {
3314 #[cfg(unix)]
3315 use std::os::unix::fs::PermissionsExt;
3316
3317 use super::*;
3318
3319 fn init_repo() -> (tempfile::TempDir, Repository) {
3320 let temp = tempfile::TempDir::new().unwrap();
3321 let repo = Repository::init_default(temp.path()).unwrap();
3322 (temp, repo)
3323 }
3324
3325 #[test]
3326 fn harness_config_load_missing_path_defaults_without_warning() {
3327 let temp = tempfile::TempDir::new().unwrap();
3328 let missing = temp.path().join("missing-config.toml");
3329
3330 let (config, warning) = load_harness_user_config(Some(missing));
3331
3332 assert_eq!(config.harness.transport, HarnessTransport::Spool);
3333 assert!(warning.is_none());
3334 }
3335
3336 #[test]
3337 fn harness_config_load_malformed_path_warns_and_defaults() {
3338 let temp = tempfile::TempDir::new().unwrap();
3339 let path = temp.path().join("config.toml");
3340 std::fs::write(&path, "[harness\ntransport = \"direct\"\n").unwrap();
3341
3342 let (config, warning) = load_harness_user_config(Some(path.clone()));
3343
3344 assert_eq!(config.harness.transport, HarnessTransport::Spool);
3345 let warning = warning.expect("malformed config should produce a warning");
3346 assert!(warning.contains("failed to load user config"));
3347 assert!(warning.contains(&path.display().to_string()));
3348 assert!(warning.contains("continuing with defaults"));
3349 }
3350
3351 #[test]
3352 fn harness_config_load_valid_path_loads_without_warning() {
3353 let temp = tempfile::TempDir::new().unwrap();
3354 let path = temp.path().join("config.toml");
3355 std::fs::write(
3356 &path,
3357 "[harness]\ntransport = \"direct\"\ntranscript = \"summary\"\n",
3358 )
3359 .unwrap();
3360
3361 let (config, warning) = load_harness_user_config(Some(path));
3362
3363 assert_eq!(config.harness.transport, HarnessTransport::Direct);
3364 assert_eq!(config.harness.transcript, HarnessTranscriptMode::Summary);
3365 assert!(warning.is_none());
3366 }
3367
3368 #[test]
3369 fn relay_payload_parse_invalid_json_warns_and_uses_null() {
3370 let (value, warning) = parse_relay_payload("{not-json");
3371
3372 assert_eq!(value, Value::Null);
3373 let warning = warning.expect("invalid JSON should produce a warning");
3374 assert!(warning.contains("failed to parse harness relay payload as JSON"));
3375 assert!(warning.contains("continuing with null payload"));
3376 }
3377
3378 #[test]
3379 fn relay_payload_parse_empty_payload_uses_null_without_warning() {
3380 let (value, warning) = parse_relay_payload(" \n");
3381
3382 assert_eq!(value, Value::Null);
3383 assert!(warning.is_none());
3384 }
3385
3386 #[test]
3387 fn relay_payload_parse_valid_json_without_warning() {
3388 let (value, warning) = parse_relay_payload(r#"{"message":"hello"}"#);
3389
3390 assert_eq!(value["message"], "hello");
3391 assert!(warning.is_none());
3392 }
3393
3394 #[test]
3402 fn harness_default_path_matches_canonical_thread_dir() {
3403 let (_temp, repo) = init_repo();
3404 for id in ["foo", "parent/task", "feature/foo", "team@scope"] {
3405 let harness_path = default_private_thread_path(&repo, id);
3406 let canonical = repo.managed_checkout_path(id);
3407 assert_eq!(
3408 harness_path, canonical,
3409 "harness default must match the canonical thread_dir for {id:?}"
3410 );
3411 }
3412 }
3413
3414 #[test]
3415 fn inherited_harness_hints_exclude_ambient_model_identity() {
3416 assert!(!inherited_harness_hint("OPENAI_MODEL"));
3417 assert!(!inherited_harness_hint("ANTHROPIC_MODEL"));
3418 assert!(!inherited_harness_hint("CLAUDE_MODEL"));
3419 assert!(!inherited_harness_hint("MODEL"));
3420 assert!(!inherited_harness_hint("OPENAI_REASONING_EFFORT"));
3421 assert!(inherited_harness_hint("HEDDLE_AGENT_MODEL"));
3422 assert!(inherited_harness_hint("CODEX_SANDBOX"));
3423 assert!(inherited_harness_hint("CLAUDECODE"));
3424 }
3425
3426 #[test]
3427 fn open_session_creates_or_attaches() {
3428 let (_temp, repo) = init_repo();
3429 let user_config = UserConfig::default();
3430 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3431
3432 let created = runtime
3433 .open_session(OpenSessionParams {
3434 harness: Some("codex".to_string()),
3435 provider: Some("openai".to_string()),
3436 model: Some("gpt-5.4".to_string()),
3437 ..OpenSessionParams::default()
3438 })
3439 .unwrap();
3440 assert!(created.created_session);
3441
3442 let attached = runtime
3443 .open_session(OpenSessionParams {
3444 harness: Some("codex".to_string()),
3445 provider: Some("openai".to_string()),
3446 model: Some("gpt-5.4".to_string()),
3447 ..OpenSessionParams::default()
3448 })
3449 .unwrap();
3450 assert!(!attached.created_session);
3451 assert_eq!(created.heddle_session_id, attached.heddle_session_id);
3452 }
3453
3454 #[test]
3455 fn same_client_instance_reattaches_to_its_existing_session() {
3456 let (_temp, repo) = init_repo();
3457 let user_config = UserConfig::default();
3458 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3459
3460 let first = runtime
3461 .open_session(OpenSessionParams {
3462 client_instance_id: Some("client-a".to_string()),
3463 harness: Some("codex".to_string()),
3464 provider: Some("openai".to_string()),
3465 model: Some("gpt-5.4".to_string()),
3466 ..OpenSessionParams::default()
3467 })
3468 .unwrap();
3469 let second = runtime
3470 .open_session(OpenSessionParams {
3471 client_instance_id: Some("client-b".to_string()),
3472 harness: Some("codex".to_string()),
3473 provider: Some("openai".to_string()),
3474 model: Some("gpt-5.4".to_string()),
3475 ..OpenSessionParams::default()
3476 })
3477 .unwrap();
3478 let reopened = runtime
3479 .open_session(OpenSessionParams {
3480 client_instance_id: Some("client-a".to_string()),
3481 harness: Some("codex".to_string()),
3482 provider: Some("openai".to_string()),
3483 model: Some("gpt-5.4".to_string()),
3484 ..OpenSessionParams::default()
3485 })
3486 .unwrap();
3487
3488 assert_ne!(first.heddle_session_id, second.heddle_session_id);
3489 assert_eq!(first.heddle_session_id, reopened.heddle_session_id);
3490 assert_eq!(first.agent_session_id, reopened.agent_session_id);
3491 }
3492
3493 #[test]
3494 fn different_client_instances_do_not_share_the_current_session() {
3495 let (_temp, repo) = init_repo();
3496 let user_config = UserConfig::default();
3497 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3498
3499 let first = runtime
3500 .open_session(OpenSessionParams {
3501 client_instance_id: Some("client-a".to_string()),
3502 harness: Some("codex".to_string()),
3503 provider: Some("openai".to_string()),
3504 model: Some("gpt-5.4".to_string()),
3505 ..OpenSessionParams::default()
3506 })
3507 .unwrap();
3508 let second = runtime
3509 .open_session(OpenSessionParams {
3510 client_instance_id: Some("client-b".to_string()),
3511 harness: Some("codex".to_string()),
3512 provider: Some("openai".to_string()),
3513 model: Some("gpt-5.4".to_string()),
3514 ..OpenSessionParams::default()
3515 })
3516 .unwrap();
3517
3518 assert_ne!(first.heddle_session_id, second.heddle_session_id);
3519 assert_ne!(first.agent_session_id, second.agent_session_id);
3520 }
3521
3522 #[test]
3523 fn provider_model_change_creates_segment() {
3524 let (_temp, repo) = init_repo();
3525 let user_config = UserConfig::default();
3526 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3527
3528 let opened = runtime
3529 .open_session(OpenSessionParams {
3530 harness: Some("claude-code".to_string()),
3531 provider: Some("anthropic".to_string()),
3532 model: Some("claude-sonnet".to_string()),
3533 ..OpenSessionParams::default()
3534 })
3535 .unwrap();
3536 runtime
3537 .update_progress(UpdateProgressParams {
3538 heddle_session_id: opened.heddle_session_id.clone(),
3539 provider: Some("openai".to_string()),
3540 model: Some("gpt-5.4".to_string()),
3541 ..UpdateProgressParams::default()
3542 })
3543 .unwrap();
3544
3545 let report = runtime
3546 .reports
3547 .load(&opened.heddle_session_id)
3548 .unwrap()
3549 .unwrap();
3550 let expected_segment = format!("{}-seg-2", opened.heddle_session_id);
3551 assert_eq!(
3552 report.heddle_segment_id.as_deref(),
3553 Some(expected_segment.as_str())
3554 );
3555 }
3556
3557 #[test]
3558 fn blank_agent_model_hint_falls_through_to_detected_model_without_segment_rotation() {
3559 let (_temp, repo) = init_repo();
3560 let user_config = UserConfig::default();
3561 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3562 let blank_model_env = BTreeMap::from([
3563 ("HEDDLE_AGENT_PROVIDER".to_string(), "anthropic".to_string()),
3564 ("HEDDLE_AGENT_MODEL".to_string(), String::new()),
3565 ]);
3566
3567 let opened = runtime
3568 .open_session(OpenSessionParams {
3569 harness: Some("claude-code".to_string()),
3570 env_hints: blank_model_env.clone(),
3571 probe_metadata: BTreeMap::from([
3572 ("session_id".to_string(), "claude-sess-blank".to_string()),
3573 ("model".to_string(), "claude-opus-4-8[1m]".to_string()),
3574 ]),
3575 ..OpenSessionParams::default()
3576 })
3577 .unwrap();
3578 assert_eq!(opened.model.as_deref(), Some("claude-opus-4-8[1m]"));
3579
3580 let original_segment = opened.heddle_segment_id.clone();
3581 runtime
3582 .update_progress(UpdateProgressParams {
3583 heddle_session_id: opened.heddle_session_id.clone(),
3584 env_hints: blank_model_env,
3585 probe_metadata: BTreeMap::from([
3586 ("session_id".to_string(), "claude-sess-blank".to_string()),
3587 ("model".to_string(), "claude-opus-4-8[1m]".to_string()),
3588 ]),
3589 ..UpdateProgressParams::default()
3590 })
3591 .unwrap();
3592
3593 let report = runtime
3594 .reports
3595 .load(&opened.heddle_session_id)
3596 .unwrap()
3597 .unwrap();
3598 assert_eq!(report.harness.model.as_deref(), Some("claude-opus-4-8[1m]"));
3599 assert_eq!(report.heddle_segment_id, original_segment);
3600 }
3601
3602 #[test]
3603 fn close_session_captures_changed_paths_from_status_and_hints() {
3604 let (temp, repo) = init_repo();
3605 let config = UserConfig::default();
3606 let mut runtime = HarnessBridgeRuntime::new(repo, config);
3607
3608 let opened = runtime
3609 .open_session(OpenSessionParams {
3610 harness: Some("codex".to_string()),
3611 provider: Some("openai".to_string()),
3612 model: Some("gpt-5.4".to_string()),
3613 ..OpenSessionParams::default()
3614 })
3615 .unwrap();
3616 std::fs::write(temp.path().join("src.txt"), "hello\n").unwrap();
3617 runtime
3618 .record_touched_paths(RecordTouchedPathsParams {
3619 heddle_session_id: opened.heddle_session_id.clone(),
3620 paths: vec!["src.txt".to_string(), "notes.md".to_string()],
3621 })
3622 .unwrap();
3623 let closed = runtime
3624 .close_session(CloseSessionParams {
3625 heddle_session_id: opened.heddle_session_id.clone(),
3626 outcome: Some("completed".to_string()),
3627 ..CloseSessionParams::default()
3628 })
3629 .unwrap();
3630 let report = runtime
3631 .reports
3632 .load(&opened.heddle_session_id)
3633 .unwrap()
3634 .unwrap();
3635 assert!(closed.changed_paths.iter().any(|path| path == "src.txt"));
3636 assert!(!closed.changed_paths.iter().any(|path| path == "notes.md"));
3637 assert!(report.touched_paths.iter().any(|path| path == "src.txt"));
3638 assert!(report.touched_paths.iter().any(|path| path == "notes.md"));
3639 assert_eq!(
3640 closed.diff_summary.changed_file_count,
3641 closed.changed_paths.len() as u32
3642 );
3643 }
3644
3645 #[test]
3646 fn flush_reports_moves_pending_report_to_outbox() {
3647 let (_temp, repo) = init_repo();
3648 let user_config = UserConfig::default();
3649 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3650
3651 let opened = runtime
3652 .open_session(OpenSessionParams {
3653 harness: Some("codex".to_string()),
3654 provider: Some("openai".to_string()),
3655 model: Some("gpt-5.4".to_string()),
3656 ..OpenSessionParams::default()
3657 })
3658 .unwrap();
3659 let flushed = runtime
3660 .flush_reports(FlushReportsParams {
3661 heddle_session_id: Some(opened.heddle_session_id.clone()),
3662 })
3663 .unwrap();
3664 assert_eq!(flushed.flushed, 1);
3665 let report = runtime
3666 .reports
3667 .load(&opened.heddle_session_id)
3668 .unwrap()
3669 .unwrap();
3670 assert!(!report.pending_flush);
3671 assert_eq!(report.report_flush_state.as_deref(), Some("queued-local"));
3672 assert!(runtime.reports.outbox_path().exists());
3673 }
3674
3675 #[test]
3676 fn explicit_overrides_beat_fingerprint_and_user_defaults() {
3677 let (_temp, repo) = init_repo();
3678 let mut user_config = UserConfig::default();
3679 user_config.harness.harnesses.insert(
3680 "codex".to_string(),
3681 UserHarnessOverride {
3682 provider: Some("openai".to_string()),
3683 model: Some("gpt-default".to_string()),
3684 thinking_level: Some("medium".to_string()),
3685 policy: Some("default".to_string()),
3686 },
3687 );
3688 let identity = resolve_identity(
3689 &repo,
3690 &user_config,
3691 IdentityHints {
3692 harness: Some("codex".to_string()),
3693 provider: Some("openai".to_string()),
3694 model: Some("gpt-5.4".to_string()),
3695 thinking_level: Some("high".to_string()),
3696 policy: Some("custom".to_string()),
3697 probe: HarnessProbeResult::default(),
3698 },
3699 )
3700 .unwrap();
3701 assert_eq!(identity.model.as_deref(), Some("gpt-5.4"));
3702 assert_eq!(identity.thinking_level.as_deref(), Some("high"));
3703 assert_eq!(identity.policy.as_deref(), Some("custom"));
3704 }
3705
3706 #[test]
3707 fn transcript_mode_defaults_to_off_and_keeps_refs_empty() {
3708 let (_temp, repo) = init_repo();
3709 let user_config = UserConfig::default();
3710 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3711
3712 let opened = runtime
3713 .open_session(OpenSessionParams {
3714 harness: Some("codex".to_string()),
3715 provider: Some("openai".to_string()),
3716 model: Some("gpt-5.4".to_string()),
3717 ..OpenSessionParams::default()
3718 })
3719 .unwrap();
3720 let report = runtime
3721 .reports
3722 .load(&opened.heddle_session_id)
3723 .unwrap()
3724 .unwrap();
3725 assert_eq!(report.transcript_mode, "off");
3726 assert!(report.transcript_refs.is_empty());
3727 }
3728
3729 #[test]
3730 fn codex_thread_probe_reattaches_same_actor() {
3731 let (_temp, repo) = init_repo();
3732 let user_config = UserConfig::default();
3733 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3734
3735 let first = runtime
3736 .open_session(OpenSessionParams {
3737 harness: Some("codex".to_string()),
3738 probe_metadata: BTreeMap::from([
3739 ("thread_id".to_string(), "thr_123".to_string()),
3740 ("client_name".to_string(), "codex-tui".to_string()),
3741 ]),
3742 ..OpenSessionParams::default()
3743 })
3744 .unwrap();
3745 let second = runtime
3746 .open_session(OpenSessionParams {
3747 harness: Some("codex".to_string()),
3748 probe_metadata: BTreeMap::from([
3749 ("thread_id".to_string(), "thr_123".to_string()),
3750 ("client_name".to_string(), "codex-tui".to_string()),
3751 ]),
3752 ..OpenSessionParams::default()
3753 })
3754 .unwrap();
3755
3756 assert_eq!(first.agent_session_id, second.agent_session_id);
3757 assert_eq!(first.heddle_session_id, second.heddle_session_id);
3758 }
3759
3760 #[test]
3761 fn opencode_child_session_creates_distinct_actor_with_parent_key() {
3762 let (_temp, repo) = init_repo();
3763 let user_config = UserConfig::default();
3764 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3765
3766 let root = runtime
3767 .open_session(OpenSessionParams {
3768 harness: Some("opencode".to_string()),
3769 probe_metadata: BTreeMap::from([("session_id".to_string(), "root-1".to_string())]),
3770 ..OpenSessionParams::default()
3771 })
3772 .unwrap();
3773 let child = runtime
3774 .open_session(OpenSessionParams {
3775 harness: Some("opencode".to_string()),
3776 probe_metadata: BTreeMap::from([
3777 ("session_id".to_string(), "child-1".to_string()),
3778 ("parent_id".to_string(), "root-1".to_string()),
3779 ]),
3780 ..OpenSessionParams::default()
3781 })
3782 .unwrap();
3783
3784 assert_ne!(root.agent_session_id, child.agent_session_id);
3785 let report = runtime
3786 .reports
3787 .load(&child.heddle_session_id)
3788 .unwrap()
3789 .unwrap();
3790 assert_eq!(
3791 report.native_parent_actor_key.as_deref(),
3792 Some("opencode:session:root-1")
3793 );
3794 }
3795
3796 #[test]
3797 fn claude_resume_with_new_session_id_does_not_steal_existing_actor() {
3798 let (_temp, repo) = init_repo();
3799 let user_config = UserConfig::default();
3800 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3801
3802 let first = runtime
3803 .open_session(OpenSessionParams {
3804 harness: Some("claude-code".to_string()),
3805 probe_metadata: BTreeMap::from([
3806 ("session_id".to_string(), "sess-old".to_string()),
3807 (
3808 "transcript_path".to_string(),
3809 "/tmp/claude/session-a.jsonl".to_string(),
3810 ),
3811 ]),
3812 ..OpenSessionParams::default()
3813 })
3814 .unwrap();
3815 let resumed = runtime
3816 .open_session(OpenSessionParams {
3817 harness: Some("claude-code".to_string()),
3818 probe_metadata: BTreeMap::from([
3819 ("session_id".to_string(), "sess-new".to_string()),
3820 (
3821 "transcript_path".to_string(),
3822 "/tmp/claude/session-a.jsonl".to_string(),
3823 ),
3824 ]),
3825 ..OpenSessionParams::default()
3826 })
3827 .unwrap();
3828
3829 assert_ne!(first.agent_session_id, resumed.agent_session_id);
3830 assert_ne!(first.heddle_session_id, resumed.heddle_session_id);
3831 }
3832
3833 #[test]
3834 fn explicit_claude_harness_beats_generic_session_id_probe_match() {
3835 let (_temp, repo) = init_repo();
3836 let user_config = UserConfig::default();
3837 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3838
3839 let opened = runtime
3840 .open_session(OpenSessionParams {
3841 harness: Some("claude-code".to_string()),
3842 probe_metadata: BTreeMap::from([
3843 ("session_id".to_string(), "claude-sess-1".to_string()),
3844 ("hook_event".to_string(), "SubagentStop".to_string()),
3845 ]),
3846 ..OpenSessionParams::default()
3847 })
3848 .unwrap();
3849 let report = runtime
3850 .reports
3851 .load(&opened.heddle_session_id)
3852 .unwrap()
3853 .unwrap();
3854 assert_eq!(
3855 report.native_actor_key.as_deref(),
3856 Some("claude-code:session:claude-sess-1")
3857 );
3858 assert_eq!(report.harness.harness.as_deref(), Some("claude-code"));
3859 }
3860
3861 #[test]
3862 fn same_native_actor_key_reuses_existing_actor_after_tentative_session_creation() {
3863 let (_temp, repo) = init_repo();
3864 let user_config = UserConfig::default();
3865 let runtime = HarnessBridgeRuntime::new(repo, user_config);
3866 let principal = runtime.repo.get_principal().unwrap();
3867 let mut sessions = SessionManager::new(runtime.repo.root());
3868 let existing_session = sessions
3869 .start_session(
3870 principal.clone(),
3871 "anthropic".to_string(),
3872 "claude-opus-4-7[1m]".to_string(),
3873 None,
3874 )
3875 .unwrap();
3876 let tentative_session = sessions
3877 .start_session(
3878 principal,
3879 "anthropic".to_string(),
3880 "claude-opus-4-7[1m]".to_string(),
3881 None,
3882 )
3883 .unwrap();
3884
3885 let registry = AgentRegistry::new(runtime.repo.heddle_dir());
3886 let existing_entry = registry
3887 .create_generated_entry(|session_id| {
3888 Ok(AgentEntry {
3889 session_id: session_id.to_string(),
3890 client_instance_id: None,
3891 native_actor_key: Some(
3892 "claude-code:session:282396d3-554a-48aa-a9a8-8d1f0bd15fa5".to_string(),
3893 ),
3894 native_parent_actor_key: None,
3895 native_instance_key: Some(
3896 "claude-code:transcript:/tmp/claude/282396d3.jsonl".to_string(),
3897 ),
3898 heddle_session_id: Some(existing_session.id.clone()),
3899 thread_id: None,
3900 thread: "detached".to_string(),
3901 pid: Some(std::process::id()),
3902 boot_id: None,
3903 liveness_path: None,
3904 heartbeat_at: Some(Utc::now()),
3905 anchor_state: None,
3906 anchor_root: None,
3907 reservation_token: Some(objects::store::generate_agent_id()),
3908 path: Some(runtime.repo.root().to_path_buf()),
3909 base_state: String::new(),
3910 started_at: Utc::now(),
3911 provider: Some("anthropic".to_string()),
3912 model: Some("claude-opus-4-7[1m]".to_string()),
3913 harness: Some("claude-code".to_string()),
3914 thinking_level: None,
3915 usage_summary: AgentUsageSummary::default(),
3916 last_progress_at: None,
3917 report_flush_state: Some("pending-local".to_string()),
3918 attach_reason: None,
3919 attach_precedence: vec![],
3920 winning_attach_rule: None,
3921 probe_source: Some("hook_payload".to_string()),
3922 probe_confidence: Some(1.0),
3923 status: AgentStatus::Active,
3924 completed_at: None,
3925 context_queries: vec![],
3926 })
3927 })
3928 .unwrap();
3929
3930 let probe = HarnessProbeResult {
3931 harness: Some("claude-code".to_string()),
3932 provider: Some("anthropic".to_string()),
3933 model: Some("claude-opus-4-7[1m]".to_string()),
3934 native_actor_key: Some(
3935 "claude-code:session:282396d3-554a-48aa-a9a8-8d1f0bd15fa5".to_string(),
3936 ),
3937 native_instance_key: Some(
3938 "claude-code:transcript:/tmp/claude/282396d3.jsonl".to_string(),
3939 ),
3940 probe_source: Some("hook_payload".to_string()),
3941 confidence: Some(1.0),
3942 ..HarnessProbeResult::default()
3943 };
3944 let identity = ResolvedIdentity {
3945 harness: Some("claude-code".to_string()),
3946 provider: Some("anthropic".to_string()),
3947 model: Some("claude-opus-4-7[1m]".to_string()),
3948 thinking_level: None,
3949 policy: None,
3950 };
3951 let mut attach = ResolvedAttachment {
3952 target: AttachTarget::CreateNew {
3953 _because_claimed: false,
3954 },
3955 matched_entry: None,
3956 attach_reason:
3957 "started new Heddle session because no compatible native actor match was found"
3958 .to_string(),
3959 precedence: vec!["native-actor-key:miss".to_string()],
3960 winning_rule: "create-new-session".to_string(),
3961 };
3962
3963 let resolved_entry = runtime
3964 .ensure_registry_entry(RegistryEntryRequest {
3965 heddle_session_id: &tentative_session.id,
3966 thread_name: None,
3967 thread_id: None,
3968 identity: &identity,
3969 probe: &probe,
3970 attach: &attach,
3971 client_instance_id: None,
3972 requested_entry: None,
3973 })
3974 .unwrap();
3975 assert_eq!(resolved_entry.session_id, existing_entry.session_id);
3976 assert_eq!(
3977 resolved_entry.heddle_session_id.as_deref(),
3978 Some(existing_session.id.as_str())
3979 );
3980
3981 let (canonical_session, owns_session) = runtime
3982 .reuse_canonical_actor_session(
3983 &mut sessions,
3984 CanonicalActorSessionRequest {
3985 tentative_session: tentative_session.clone(),
3986 tentative_owns_session: true,
3987 entry: &resolved_entry,
3988 probe: &probe,
3989 attach: &mut attach,
3990 },
3991 )
3992 .unwrap();
3993 assert_eq!(canonical_session.id, existing_session.id);
3994 assert!(!owns_session);
3995 assert!(
3996 attach
3997 .precedence
3998 .iter()
3999 .any(|step| step.starts_with("post-create-native-actor-key:"))
4000 );
4001 assert_eq!(attach.winning_rule, "native-actor-key-post-create");
4002 assert!(
4003 !sessions
4004 .get_session(&tentative_session.id)
4005 .unwrap()
4006 .unwrap()
4007 .is_active()
4008 );
4009 }
4010
4011 #[test]
4012 fn close_session_does_not_blame_preexisting_dirty_worktree() {
4013 let (temp, repo) = init_repo();
4014 std::fs::write(temp.path().join("preexisting.txt"), "already dirty\n").unwrap();
4015 let user_config = UserConfig::default();
4016 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
4017
4018 let opened = runtime
4019 .open_session(OpenSessionParams {
4020 harness: Some("claude-code".to_string()),
4021 provider: Some("anthropic".to_string()),
4022 model: Some("claude-opus-4-7[1m]".to_string()),
4023 ..OpenSessionParams::default()
4024 })
4025 .unwrap();
4026 let closed = runtime
4027 .close_session(CloseSessionParams {
4028 heddle_session_id: opened.heddle_session_id.clone(),
4029 outcome: Some("completed".to_string()),
4030 ..CloseSessionParams::default()
4031 })
4032 .unwrap();
4033 let report = runtime
4034 .reports
4035 .load(&opened.heddle_session_id)
4036 .unwrap()
4037 .unwrap();
4038
4039 assert!(
4040 report
4041 .worktree_changes_at_open
4042 .iter()
4043 .any(|change| change.path == "preexisting.txt")
4044 );
4045 assert!(
4046 !closed
4047 .changed_paths
4048 .iter()
4049 .any(|path| path == "preexisting.txt")
4050 );
4051 assert_eq!(closed.diff_summary.changed_file_count, 0);
4052 }
4053
4054 #[test]
4055 fn timeline_state_delta_paths_ignore_uncaptured_worktree_changes() {
4056 let (temp, repo) = init_repo();
4057 let repo_root = repo.root().to_path_buf();
4058 std::fs::write(repo_root.join("tracked.txt"), b"one\n").unwrap();
4059 let before = repo.snapshot(Some("seed".into()), None).unwrap();
4060 std::fs::write(repo_root.join("tracked.txt"), b"two\n").unwrap();
4061 let after = repo.snapshot(Some("advance".into()), None).unwrap();
4062 std::fs::write(temp.path().join("ambient.txt"), b"not in the state delta\n").unwrap();
4063
4064 assert_eq!(
4065 changed_paths_between_states(&repo, before.change_id, after.change_id).unwrap(),
4066 vec!["tracked.txt"]
4067 );
4068 }
4069
4070 #[test]
4071 fn relay_claude_stop_captures_state_with_agent_attribution() {
4072 let (temp, repo) = init_repo();
4073 let repo_root = repo.root().to_path_buf();
4074
4075 std::fs::write(repo_root.join("seed.txt"), b"hello").unwrap();
4077 let _ = repo.snapshot(Some("seed".into()), None).unwrap();
4078
4079 std::fs::write(repo_root.join("seed.txt"), b"hello, heddle").unwrap();
4081
4082 drop(repo);
4083
4084 let fresh_repo = Repository::open(temp.path()).unwrap();
4085 let user_config = UserConfig {
4086 principal: Some(crate::config::UserPrincipalConfig {
4087 name: "Ada Lovelace".to_string(),
4088 email: "ada@example.com".to_string(),
4089 }),
4090 ..UserConfig::default()
4091 };
4092 let mut runtime = HarnessBridgeRuntime::new(fresh_repo, user_config);
4093 let payload = serde_json::json!({
4094 "session_id": "claude-sess-123",
4095 "transcript_path": "/tmp/claude/x.jsonl",
4096 "model": {
4097 "id": "claude-opus-4-7",
4098 "display_name": "Claude Opus 4.7",
4099 },
4100 "message": "hook-driven capture test",
4101 "hook_event_name": "Stop",
4102 });
4103 relay_claude(&mut runtime, "Stop", &payload).unwrap();
4104 drop(runtime);
4105
4106 let verify = Repository::open(temp.path()).unwrap();
4107 let head_id = verify.head().unwrap().expect("HEAD after Stop capture");
4108 let state = verify
4109 .store()
4110 .get_state(&head_id)
4111 .unwrap()
4112 .expect("state for HEAD");
4113 let agent = state.attribution.agent.expect("agent attribution on state");
4114 assert_eq!(agent.provider, "anthropic");
4115 assert_eq!(agent.model, "Claude Opus 4.7");
4116 assert_eq!(
4117 state.intent.as_deref(),
4118 Some("hook-driven capture test"),
4119 "intent should be pulled from payload message",
4120 );
4121 }
4122
4123 #[test]
4124 fn relay_claude_stop_is_idempotent_when_clean() {
4125 let (temp, repo) = init_repo();
4126 let repo_root = repo.root().to_path_buf();
4127 std::fs::write(repo_root.join("seed.txt"), b"hello").unwrap();
4128 let seed = repo.snapshot(Some("seed".into()), None).unwrap();
4129 drop(repo);
4130
4131 let fresh_repo = Repository::open(temp.path()).unwrap();
4132 let mut runtime = HarnessBridgeRuntime::new(fresh_repo, UserConfig::default());
4133 let payload = serde_json::json!({
4134 "session_id": "claude-sess-clean",
4135 "model": {"id": "claude-sonnet-4-6"},
4136 });
4137 relay_claude(&mut runtime, "Stop", &payload).unwrap();
4138 drop(runtime);
4139
4140 let verify = Repository::open(temp.path()).unwrap();
4141 let head_id = verify.head().unwrap().expect("HEAD preserved");
4142 assert_eq!(
4143 head_id, seed.change_id,
4144 "no change expected when worktree is clean",
4145 );
4146 }
4147
4148 #[test]
4149 fn relay_claude_pre_tool_use_ignores_non_file_tool() {
4150 let (temp, repo) = init_repo();
4151 drop(repo);
4152 let fresh_repo = Repository::open(temp.path()).unwrap();
4153 let mut runtime = HarnessBridgeRuntime::new(fresh_repo, UserConfig::default());
4154 let payload = serde_json::json!({
4155 "session_id": "claude-sess-bash",
4156 "tool_name": "Bash",
4157 "tool_input": {"command": "ls"},
4158 });
4159 relay_claude(&mut runtime, "PreToolUse", &payload).unwrap();
4161 }
4162
4163 #[test]
4164 fn relay_opencode_tool_execute_before_records_timeline_step() {
4165 let (_temp, repo) = init_repo();
4166 let root = repo.root().to_path_buf();
4167 std::fs::write(root.join("seed.txt"), b"hello").unwrap();
4168 let seed = repo.snapshot(Some("seed".into()), None).unwrap();
4169 let mut runtime = HarnessBridgeRuntime::new(repo, UserConfig::default());
4170 let payload = opencode_tool_payload("call-1");
4171
4172 relay_opencode(&mut runtime, "tool.execute.before", &payload).unwrap();
4173
4174 let store = TimelineStore::open(runtime.repo.heddle_dir()).unwrap();
4175 let view = TimelineView::rebuild(&store).unwrap();
4176 let steps = view.steps_for_thread("main");
4177 assert_eq!(steps.len(), 1);
4178 let step = steps[0];
4179 assert_eq!(step.native.as_ref().unwrap().harness, "opencode");
4180 assert_eq!(step.native.as_ref().unwrap().tool_call_id, "call-1");
4181 assert_eq!(step.tool_name.as_deref(), Some("bash"));
4182 assert_eq!(step.before_state, Some(seed.change_id));
4183 assert!(step.status.is_none());
4184 assert!(step.payload_summary.as_deref().unwrap().contains("call-1"));
4185 assert!(step.payload_hash.is_some());
4186 assert!(
4187 step.labels
4188 .contains(&TimelineLabel::ExternalSideEffectsUnknown)
4189 );
4190 }
4191
4192 #[test]
4193 fn relay_opencode_tool_execute_after_captures_dirty_worktree() {
4194 let (_temp, repo) = init_repo();
4195 let root = repo.root().to_path_buf();
4196 std::fs::write(root.join("tracked.txt"), b"one\n").unwrap();
4197 let seed = repo.snapshot(Some("seed".into()), None).unwrap();
4198 let user_config = UserConfig {
4199 principal: Some(crate::config::UserPrincipalConfig {
4200 name: "Ada Lovelace".to_string(),
4201 email: "ada@example.com".to_string(),
4202 }),
4203 ..UserConfig::default()
4204 };
4205 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
4206 let payload = opencode_tool_payload("call-2");
4207
4208 relay_opencode(&mut runtime, "tool.execute.before", &payload).unwrap();
4209 std::fs::write(root.join("tracked.txt"), b"two\n").unwrap();
4210 relay_opencode(&mut runtime, "tool.execute.after", &payload).unwrap();
4211
4212 let head = runtime.repo.head().unwrap().expect("capture advanced HEAD");
4213 assert_ne!(head, seed.change_id);
4214 let store = TimelineStore::open(runtime.repo.heddle_dir()).unwrap();
4215 let view = TimelineView::rebuild(&store).unwrap();
4216 let steps = view.steps_for_thread("main");
4217 assert_eq!(steps.len(), 1, "before/after should merge by native id");
4218 let step = steps[0];
4219 assert_eq!(step.operation_ids.len(), 2);
4220 assert_eq!(step.status, Some(TimelineToolCallStatus::Succeeded));
4221 assert_eq!(step.before_state, Some(seed.change_id));
4222 assert_eq!(step.after_state, Some(head));
4223 assert_eq!(step.capture_state, Some(head));
4224 assert_eq!(step.changed, Some(true));
4225 assert!(step.touched_paths.contains(&"tracked.txt".to_string()));
4226 assert!(step.labels.contains(&TimelineLabel::RepoReversible));
4227 assert!(
4228 step.labels
4229 .contains(&TimelineLabel::ExternalSideEffectsUnknown)
4230 );
4231 assert!(!step.payload_summary.as_deref().unwrap().contains("SECRET"));
4232 assert!(step.payload_hash.is_some());
4233 }
4234
4235 #[cfg(unix)]
4236 #[test]
4237 fn relay_opencode_tool_execute_after_records_capture_failed_without_ambient_paths() {
4238 let (_temp, repo) = init_repo();
4239 let root = repo.root().to_path_buf();
4240 std::fs::write(root.join("seed.txt"), b"seed\n").unwrap();
4241 let seed = repo.snapshot(Some("seed".into()), None).unwrap();
4242 let mut runtime = HarnessBridgeRuntime::new(repo, UserConfig::default());
4243 let mut payload = opencode_tool_payload("call-capture-failed");
4244 payload["tool"]["input"]["file_path"] = serde_json::json!("hinted.txt");
4245 let hooks_dir = root.join(".heddle/hooks");
4246 std::fs::create_dir_all(&hooks_dir).unwrap();
4247 let hook_path = hooks_dir.join("pre-snapshot");
4248 std::fs::write(&hook_path, "#!/bin/sh\nexit 1\n").unwrap();
4249 let mut perms = std::fs::metadata(&hook_path).unwrap().permissions();
4250 perms.set_mode(0o755);
4251 std::fs::set_permissions(&hook_path, perms).unwrap();
4252
4253 relay_opencode(&mut runtime, "tool.execute.before", &payload).unwrap();
4254 std::fs::write(root.join("ambient.txt"), b"dirty but uncaptured\n").unwrap();
4255 relay_opencode(&mut runtime, "tool.execute.after", &payload).unwrap();
4256
4257 assert_eq!(
4258 runtime.repo.head().unwrap(),
4259 Some(seed.change_id),
4260 "capture failure must not advance HEAD"
4261 );
4262 let store = TimelineStore::open(runtime.repo.heddle_dir()).unwrap();
4263 let view = TimelineView::rebuild(&store).unwrap();
4264 let steps = view.steps_for_thread("main");
4265 assert_eq!(steps.len(), 1, "before/after should merge by native id");
4266 let step = steps[0];
4267 assert_eq!(step.operation_ids.len(), 2);
4268 assert_eq!(step.before_state, Some(seed.change_id));
4269 assert_eq!(step.after_state, Some(seed.change_id));
4270 assert_eq!(step.capture_state, None);
4271 assert_eq!(step.changed, Some(false));
4272 assert!(step.labels.contains(&TimelineLabel::CaptureFailed));
4273 assert!(
4274 !step.labels.contains(&TimelineLabel::RepoReversible),
4275 "failed captures are not repo-reversible"
4276 );
4277 assert_eq!(step.touched_paths, vec!["hinted.txt"]);
4278 }
4279
4280 #[test]
4281 fn relay_opencode_tool_execute_missing_tool_id_does_not_fail_or_record_timeline() {
4282 let (_temp, repo) = init_repo();
4283 let root = repo.root().to_path_buf();
4284 std::fs::write(root.join("seed.txt"), b"hello").unwrap();
4285 let _ = repo.snapshot(Some("seed".into()), None).unwrap();
4286 let mut runtime = HarnessBridgeRuntime::new(repo, UserConfig::default());
4287 let payload = serde_json::json!({
4288 "sessionID": "opencode-session",
4289 "model": "gpt-5.4",
4290 "provider": "openai",
4291 "tool": {"name": "bash"},
4292 });
4293
4294 relay_opencode(&mut runtime, "tool.execute.before", &payload).unwrap();
4295
4296 let store = TimelineStore::open(runtime.repo.heddle_dir()).unwrap();
4297 let view = TimelineView::rebuild(&store).unwrap();
4298 assert!(view.steps_for_thread("main").is_empty());
4299 let report_count = std::fs::read_dir(root.join(".heddle/state/session-reports"))
4300 .unwrap()
4301 .count();
4302 assert!(
4303 report_count > 0,
4304 "session progress should still be recorded"
4305 );
4306 }
4307
4308 fn opencode_tool_payload(call_id: &str) -> Value {
4309 serde_json::json!({
4310 "sessionID": "opencode-session",
4311 "messageID": "message-1",
4312 "toolCallID": call_id,
4313 "model": "gpt-5.4",
4314 "provider": "openai",
4315 "tool": {
4316 "name": "bash",
4317 "input": {
4318 "command": "echo SECRET",
4319 "file_path": "tracked.txt"
4320 }
4321 },
4322 "status": "success"
4323 })
4324 }
4325
4326 #[test]
4327 fn relay_claude_subagent_start_creates_child_entry_with_parent_key() {
4328 let (temp, repo) = init_repo();
4329 drop(repo);
4330 let fresh_repo = Repository::open(temp.path()).unwrap();
4331 let mut runtime = HarnessBridgeRuntime::new(fresh_repo, UserConfig::default());
4332 let payload = serde_json::json!({
4333 "session_id": "parent-claude-sess",
4334 "agent_id": "child-subagent-xyz",
4335 "model": {"id": "claude-sonnet-4-6"},
4336 });
4337 relay_claude(&mut runtime, "SubagentStart", &payload).unwrap();
4338 drop(runtime);
4339
4340 let verify = Repository::open(temp.path()).unwrap();
4341 let registry = AgentRegistry::new(verify.heddle_dir());
4342 let child = registry
4343 .find_active_by_native_actor_key("claude-code:agent:child-subagent-xyz")
4344 .unwrap()
4345 .expect("subagent AgentEntry should exist after SubagentStart");
4346 assert_eq!(
4347 child.native_parent_actor_key.as_deref(),
4348 Some("claude-code:session:parent-claude-sess"),
4349 "subagent must carry parent session linkage",
4350 );
4351 assert_eq!(child.status, AgentStatus::Active);
4352 }
4353
4354 #[test]
4355 fn relay_claude_subagent_stop_marks_child_entry_complete() {
4356 let (temp, repo) = init_repo();
4357 let repo_root = repo.root().to_path_buf();
4358 drop(repo);
4359
4360 let fresh = Repository::open(temp.path()).unwrap();
4362 let mut runtime = HarnessBridgeRuntime::new(fresh, UserConfig::default());
4363 let start_payload = serde_json::json!({
4364 "session_id": "parent-sess",
4365 "agent_id": "worker-1",
4366 "model": {"id": "claude-sonnet-4-6"},
4367 });
4368 relay_claude(&mut runtime, "SubagentStart", &start_payload).unwrap();
4369 drop(runtime);
4370
4371 std::fs::write(
4373 repo_root.join("child-output.txt"),
4374 b"subagent produced this",
4375 )
4376 .unwrap();
4377
4378 let fresh = Repository::open(temp.path()).unwrap();
4379 let mut runtime = HarnessBridgeRuntime::new(fresh, UserConfig::default());
4380 let stop_payload = serde_json::json!({
4381 "session_id": "parent-sess",
4382 "agent_id": "worker-1",
4383 "model": {
4384 "id": "claude-sonnet-4-6",
4385 "display_name": "Claude Sonnet 4.6",
4386 },
4387 });
4388 relay_claude(&mut runtime, "SubagentStop", &stop_payload).unwrap();
4389 drop(runtime);
4390
4391 let verify = Repository::open(temp.path()).unwrap();
4392 let registry = AgentRegistry::new(verify.heddle_dir());
4393 let child = registry
4394 .list()
4395 .unwrap()
4396 .into_iter()
4397 .find(|e| e.native_actor_key.as_deref() == Some("claude-code:agent:worker-1"))
4398 .expect("child entry should still exist");
4399 assert_eq!(
4400 child.status,
4401 AgentStatus::Complete,
4402 "SubagentStop should mark the child entry Complete",
4403 );
4404 }
4405
4406 #[test]
4407 fn relay_claude_user_prompt_submit_rotates_segment() {
4408 let (temp, repo) = init_repo();
4409 drop(repo);
4410
4411 let fresh = Repository::open(temp.path()).unwrap();
4412 let mut runtime = HarnessBridgeRuntime::new(fresh, UserConfig::default());
4413 let session_payload = serde_json::json!({
4415 "session_id": "claude-prompt-sess",
4416 "model": {"id": "claude-opus-4-7", "display_name": "Claude Opus 4.7"},
4417 });
4418 relay_claude(&mut runtime, "SessionStart", &session_payload).unwrap();
4419 let sessions_before = SessionManager::new(runtime.repo.root())
4420 .list_sessions(true)
4421 .unwrap();
4422 let initial_segments = sessions_before
4423 .iter()
4424 .find(|s| !s.segments.is_empty())
4425 .map(|s| s.segments.len())
4426 .unwrap_or(0);
4427
4428 let prompt_payload = serde_json::json!({
4430 "session_id": "claude-prompt-sess",
4431 "model": {"id": "claude-opus-4-7", "display_name": "Claude Opus 4.7"},
4432 "prompt": "write a new feature",
4433 });
4434 relay_claude(&mut runtime, "UserPromptSubmit", &prompt_payload).unwrap();
4435 drop(runtime);
4436
4437 let verify = Repository::open(temp.path()).unwrap();
4438 let sessions_after = SessionManager::new(verify.root())
4439 .list_sessions(true)
4440 .unwrap();
4441 let rotated = sessions_after
4442 .iter()
4443 .any(|s| s.segments.len() > initial_segments);
4444 assert!(
4445 rotated,
4446 "UserPromptSubmit must add at least one segment beyond the SessionStart baseline \
4447 (initial={initial_segments}, sessions_after={:?})",
4448 sessions_after
4449 .iter()
4450 .map(|s| (s.id.clone(), s.segments.len()))
4451 .collect::<Vec<_>>(),
4452 );
4453 }
4454}