1use std::{
3 collections::{BTreeMap, BTreeSet},
4 fs::{self, OpenOptions},
5 io::{BufRead, BufReader, BufWriter, Write},
6 path::{Path, PathBuf},
7};
8
9use anyhow::{Result, anyhow};
10use base64::Engine as _;
11use chrono::Utc;
12use objects::{
13 fs_atomic::write_file_atomic,
14 object::{
15 ChangeId, ContentHash, DiffKind, NativeToolCallRefV1, Session, ThreadName,
16 TimelineBranchId, TimelineLabel, TimelineOperationBodyV1, TimelineOperationEnvelope,
17 TimelineStepId, TimelineToolCallStatus, TimelineToolPayloadMetadata, ToolCallFinishedV1,
18 ToolCallStartedV1, Tree,
19 },
20 store::{AgentEntry, AgentRegistry, AgentStatus, AgentUsageSummary, ObjectStore},
21};
22use oplog::OpLogRecorder;
23use wire::{
24 HarnessIdentity, ProgressCheckpoint, SessionDiffSummary, SessionReportEnvelope,
25 TranscriptAttachmentRef, UsageTotals, WorktreeChangeBaseline,
26};
27use refs::Head;
28use repo::{
29 Repository, SessionManager, Thread, ThreadFreshness, ThreadIntegrationPolicy, ThreadManager,
30 ThreadMode, ThreadState, TimelineStore, TimelineView,
31};
32use serde::{Deserialize, Serialize};
33use serde_json::Value;
34
35mod claude_hook;
36mod probe;
37
38use self::probe::{HarnessProbeInput, HarnessProbeResult, probe_harness_actor};
39use crate::{
40 cli::{
41 Cli,
42 commands::{
43 snapshot::{
44 SnapshotAgentOverrides, create_snapshot, summarize_confidence,
45 summarize_verification,
46 },
47 worktree_cmd::helpers::{prepare_worktree_target, write_isolated_checkout},
48 },
49 style, worktree_status_options,
50 },
51 config::{
52 HarnessMode, HarnessTranscriptMode, HarnessTransport, UserConfig, UserHarnessOverride,
53 UserHarnessRootThreadPolicy, UserHarnessSubagentThreadPolicy, UserThreadWorkspaceMode,
54 },
55};
56
57pub(crate) fn probe_current_process_harness(
58 repo: &Repository,
59 current_provider: Option<String>,
60 current_model: Option<String>,
61 current_policy: Option<String>,
62) -> Result<HarnessProbeResult> {
63 probe_harness_actor(&HarnessProbeInput {
64 argv: detected_harness_argv().or_else(|| Some(std::env::args().collect())),
65 env_hints: harness_env_hints(),
66 explicit_harness: None,
67 explicit_provider: None,
68 explicit_model: None,
69 explicit_thinking_level: None,
70 explicit_policy: None,
71 probe_metadata: BTreeMap::new(),
72 current_provider,
73 current_model,
74 current_policy,
75 repo_root: repo.root().display().to_string(),
76 })
77}
78
79fn harness_env_hints() -> BTreeMap<String, String> {
80 std::env::vars()
81 .filter(|(key, value)| {
82 !value.trim().is_empty()
83 && (key.starts_with("HEDDLE_AGENT_")
84 || key.starts_with("CODEX_")
85 || key.starts_with("CLAUDE")
86 || key.starts_with("ANTHROPIC_")
87 || key.starts_with("OPENAI_")
88 || key.starts_with("OPENCODE_")
89 || key.starts_with("AIDER_")
90 || matches!(
91 key.as_str(),
92 "MODEL" | "REASONING_EFFORT" | "THINKING_LEVEL"
93 ))
94 })
95 .collect()
96}
97
98fn detected_harness_argv() -> Option<Vec<String>> {
99 detected_harness_argv_impl()
100}
101
102#[cfg(target_os = "linux")]
103fn detected_harness_argv_impl() -> Option<Vec<String>> {
104 let mut pid = std::process::id();
105 for _ in 0..8 {
106 let stat = fs::read_to_string(format!("/proc/{pid}/stat")).ok()?;
107 let ppid = stat.split_whitespace().nth(3)?.parse::<u32>().ok()?;
108 if ppid == 0 || ppid == pid {
109 return None;
110 }
111 pid = ppid;
112 let raw = fs::read(format!("/proc/{pid}/cmdline")).ok()?;
113 let argv = raw
114 .split(|byte| *byte == 0)
115 .filter(|part| !part.is_empty())
116 .map(|part| String::from_utf8_lossy(part).to_string())
117 .collect::<Vec<_>>();
118 let program = argv.first().map(|arg| arg.to_ascii_lowercase())?;
119 if ["codex", "claude", "opencode", "aider"]
120 .iter()
121 .any(|needle| program.contains(needle))
122 {
123 return Some(argv);
124 }
125 }
126 None
127}
128
129#[cfg(not(target_os = "linux"))]
130fn detected_harness_argv_impl() -> Option<Vec<String>> {
131 None
132}
133
134pub fn cmd_harness_bridge(cli: &Cli) -> Result<()> {
135 let repo = cli.open_repo()?;
136 let mut runtime = init_harness_runtime(&repo)?;
137
138 let stdin = std::io::stdin();
139 let stdout = std::io::stdout();
140 let reader = BufReader::new(stdin.lock());
141 let mut writer = BufWriter::new(stdout.lock());
142
143 for line in reader.lines() {
144 let line = line?;
145 if line.trim().is_empty() {
146 continue;
147 }
148 let response = match serde_json::from_str::<BridgeRequest>(&line) {
149 Ok(request) => runtime.handle_request(request),
150 Err(err) => BridgeResponse::error(
151 None,
152 "invalid_request",
153 format!("failed to parse request: {err}"),
154 ),
155 };
156 serde_json::to_writer(&mut writer, &response)?;
157 writer.write_all(b"\n")?;
158 writer.flush()?;
159 }
160
161 Ok(())
162}
163
164pub(crate) fn relay_harness_event(
165 repo: &Repository,
166 harness: &str,
167 event: &str,
168 payload: &str,
169) -> Result<()> {
170 let mut runtime = init_harness_runtime(repo)?;
171 let (json, warning) = parse_relay_payload(payload);
172 if let Some(warning) = warning {
173 eprintln!("{}", style::warn(&warning));
174 }
175 match harness {
176 "codex" => relay_codex(&mut runtime, event, &json),
177 "claude-code" => relay_claude(&mut runtime, event, &json),
178 "opencode" => relay_opencode(&mut runtime, event, &json),
179 other => Err(anyhow!("unsupported harness relay: {other}")),
180 }
181}
182
183fn init_harness_runtime(repo: &Repository) -> Result<HarnessBridgeRuntime> {
184 let (user_config, warning) = load_harness_user_config(UserConfig::default_path());
185 if let Some(warning) = warning {
186 eprintln!("{}", style::warn(&warning));
187 }
188 Ok(HarnessBridgeRuntime::new(
189 Repository::open(repo.root())?,
190 user_config,
191 ))
192}
193
194fn load_harness_user_config(default_path: Option<PathBuf>) -> (UserConfig, Option<String>) {
195 let Some(path) = default_path else {
196 return (UserConfig::default(), None);
197 };
198 match UserConfig::load(&path) {
199 Ok(config) => (config, None),
200 Err(err) if is_not_found(&err) => (UserConfig::default(), None),
201 Err(err) => {
202 let warning = format!(
203 "warning: failed to load user config from {}: {err}; continuing with defaults",
204 path.display()
205 );
206 (UserConfig::default(), Some(warning))
207 }
208 }
209}
210
211fn is_not_found(err: &anyhow::Error) -> bool {
212 err.downcast_ref::<std::io::Error>()
213 .is_some_and(|io| io.kind() == std::io::ErrorKind::NotFound)
214}
215
216fn parse_relay_payload(payload: &str) -> (Value, Option<String>) {
217 if payload.trim().is_empty() {
218 return (Value::Null, None);
219 }
220 match serde_json::from_str::<Value>(payload) {
221 Ok(value) => (value, None),
222 Err(err) => (
223 Value::Null,
224 Some(format!(
225 "warning: failed to parse harness relay payload as JSON: {err}; continuing with null payload"
226 )),
227 ),
228 }
229}
230
231struct HarnessBridgeRuntime {
232 repo: Repository,
233 user_config: UserConfig,
234 reports: SessionReportStore,
235}
236
237struct RegistryEntryRequest<'a> {
238 heddle_session_id: &'a str,
239 thread_name: Option<&'a str>,
240 thread_id: Option<&'a str>,
241 identity: &'a ResolvedIdentity,
242 probe: &'a HarnessProbeResult,
243 attach: &'a ResolvedAttachment,
244 client_instance_id: Option<&'a str>,
245 requested_entry: Option<&'a AgentEntry>,
246}
247
248struct CanonicalActorSessionRequest<'a> {
249 tentative_session: Session,
250 tentative_owns_session: bool,
251 entry: &'a AgentEntry,
252 probe: &'a HarnessProbeResult,
253 attach: &'a mut ResolvedAttachment,
254}
255
256struct AttachmentResolutionInput<'a> {
257 requested_entry: Option<&'a AgentEntry>,
258 explicit_heddle_session_id: Option<&'a str>,
259 client_instance_id: Option<&'a str>,
260 probe: &'a HarnessProbeResult,
261 token_claims: Option<&'a TokenClaims>,
262}
263
264fn relay_codex(runtime: &mut HarnessBridgeRuntime, _event: &str, payload: &Value) -> Result<()> {
265 let metadata = map_from_pairs([
266 (
267 "client_name",
268 value_string(payload, &["client"]).or_else(|| value_string(payload, &["client_name"])),
269 ),
270 ("model", value_string(payload, &["model"])),
271 (
272 "model_provider",
273 value_string(payload, &["model_provider"])
274 .or_else(|| value_string(payload, &["provider"])),
275 ),
276 (
277 "model_reasoning_effort",
278 value_string(payload, &["reasoning_effort"]),
279 ),
280 ]);
281 let opened = runtime.open_session(OpenSessionParams {
282 harness: Some("codex".to_string()),
283 summary: value_string(payload, &["message"]),
284 probe_metadata: metadata,
285 ..OpenSessionParams::default()
286 })?;
287 runtime.update_progress(UpdateProgressParams {
288 heddle_session_id: opened.heddle_session_id,
289 summary: value_string(payload, &["message"]),
290 harness: Some("codex".to_string()),
291 ..UpdateProgressParams::default()
292 })?;
293 Ok(())
294}
295
296fn relay_claude(runtime: &mut HarnessBridgeRuntime, event: &str, payload: &Value) -> Result<()> {
297 let metadata = map_from_pairs([
298 ("session_id", value_string(payload, &["session_id"])),
299 ("agent_id", value_string(payload, &["agent_id"])),
300 ("session_name", value_string(payload, &["session_name"])),
301 (
302 "transcript_path",
303 value_string(payload, &["transcript_path"]),
304 ),
305 (
306 "model",
307 value_string(payload, &["model", "id"]).or_else(|| value_string(payload, &["model"])),
308 ),
309 (
310 "model_display_name",
311 value_string(payload, &["model", "display_name"]),
312 ),
313 ("effort", value_string(payload, &["effort"])),
314 ("hook_event", Some(event.to_string())),
315 (
316 "status_line",
317 (event == "StatusLine").then(|| "1".to_string()),
318 ),
319 (
320 "touched_paths",
321 value_array_join(payload, &["tool_response", "filePaths"])
322 .or_else(|| value_string(payload, &["file_path"])),
323 ),
324 (
325 "input_tokens",
326 value_u64_string(payload, &["context_window", "total_input_tokens"]),
327 ),
328 (
329 "output_tokens",
330 value_u64_string(payload, &["context_window", "total_output_tokens"]),
331 ),
332 (
333 "cost_micros_usd",
334 value_cost_micros(payload, &["cost", "total_cost_usd"]),
335 ),
336 ]);
337 let opened = runtime.open_session(OpenSessionParams {
338 harness: Some("claude-code".to_string()),
339 model: value_string(payload, &["model", "display_name"])
340 .or_else(|| value_string(payload, &["model", "id"]))
341 .or_else(|| value_string(payload, &["model"])),
342 summary: value_string(payload, &["message"]).or_else(|| value_string(payload, &["reason"])),
343 probe_metadata: metadata.clone(),
344 ..OpenSessionParams::default()
345 })?;
346 match event {
347 "SessionEnd" => {
348 runtime.close_session(CloseSessionParams {
349 heddle_session_id: opened.heddle_session_id,
350 summary: value_string(payload, &["reason"])
351 .or_else(|| value_string(payload, &["stop_hook_active"])),
352 outcome: Some("completed".to_string()),
353 ..CloseSessionParams::default()
354 })?;
355 }
356 "StatusLine" => {
357 runtime.update_progress(UpdateProgressParams {
358 heddle_session_id: opened.heddle_session_id.clone(),
359 harness: Some("claude-code".to_string()),
360 status: Some("StatusLine".to_string()),
361 message: value_string(payload, &["session_name"])
362 .or_else(|| value_string(payload, &["cwd"]))
363 .or_else(|| value_string(payload, &["workspace", "current_dir"])),
364 probe_metadata: metadata.clone(),
365 ..UpdateProgressParams::default()
366 })?;
367 runtime.record_usage(RecordUsageParams {
368 heddle_session_id: opened.heddle_session_id,
369 input_tokens: value_u64(payload, &["context_window", "total_input_tokens"]),
370 output_tokens: value_u64(payload, &["context_window", "total_output_tokens"]),
371 reasoning_tokens: value_u64(payload, &["context_window", "total_reasoning_tokens"]),
372 cache_creation_tokens: None,
373 cache_read_tokens: None,
374 tool_calls: None,
375 cost_micros_usd: value_cost_micros_u64(payload, &["cost", "total_cost_usd"]),
376 })?;
377 }
378 "Stop" => {
379 runtime.update_progress(UpdateProgressParams {
380 heddle_session_id: opened.heddle_session_id,
381 harness: Some("claude-code".to_string()),
382 status: Some("Stop".to_string()),
383 message: value_string(payload, &["message"])
384 .or_else(|| value_string(payload, &["result"]))
385 .or_else(|| value_string(payload, &["stop_reason"])),
386 probe_metadata: metadata,
387 ..UpdateProgressParams::default()
388 })?;
389 if let Err(err) = claude_hook::handle_stop_capture(
390 &runtime.repo,
391 &runtime.user_config,
392 payload,
393 "Claude Code turn",
394 ) {
395 tracing::warn!(?err, "heddle Stop hook capture failed");
396 }
397 }
398 "SubagentStop" => {
399 runtime.update_progress(UpdateProgressParams {
400 heddle_session_id: opened.heddle_session_id,
401 harness: Some("claude-code".to_string()),
402 status: Some("SubagentStop".to_string()),
403 touched_paths: csv_from_value(metadata.get("touched_paths")),
404 probe_metadata: metadata,
405 ..UpdateProgressParams::default()
406 })?;
407 if let Err(err) = claude_hook::handle_stop_capture(
408 &runtime.repo,
409 &runtime.user_config,
410 payload,
411 "Claude Code subagent turn",
412 ) {
413 tracing::warn!(?err, "heddle SubagentStop hook capture failed");
414 }
415 if let Err(err) = claude_hook::mark_subagent_complete(&runtime.repo, payload) {
416 tracing::debug!(?err, "heddle SubagentStop mark-complete failed");
417 }
418 }
419 "SubagentStart" => {
420 runtime.update_progress(UpdateProgressParams {
427 heddle_session_id: opened.heddle_session_id,
428 harness: Some("claude-code".to_string()),
429 status: Some("SubagentStart".to_string()),
430 touched_paths: csv_from_value(metadata.get("touched_paths")),
431 probe_metadata: metadata,
432 ..UpdateProgressParams::default()
433 })?;
434 }
435 "UserPromptSubmit" => {
436 runtime.update_progress(UpdateProgressParams {
437 heddle_session_id: opened.heddle_session_id.clone(),
438 harness: Some("claude-code".to_string()),
439 status: Some("UserPromptSubmit".to_string()),
440 touched_paths: csv_from_value(metadata.get("touched_paths")),
441 probe_metadata: metadata,
442 ..UpdateProgressParams::default()
443 })?;
444 if let Err(err) = claude_hook::handle_user_prompt_segment_rotate(
445 &runtime.repo,
446 &opened.heddle_session_id,
447 payload,
448 ) {
449 tracing::debug!(?err, "heddle UserPromptSubmit segment rotation failed");
450 }
451 }
452 "PreToolUse" => {
453 runtime.update_progress(UpdateProgressParams {
454 heddle_session_id: opened.heddle_session_id,
455 harness: Some("claude-code".to_string()),
456 status: Some("PreToolUse".to_string()),
457 touched_paths: csv_from_value(metadata.get("touched_paths")),
458 probe_metadata: metadata,
459 ..UpdateProgressParams::default()
460 })?;
461 if let Err(err) = claude_hook::handle_pre_tool_use(&runtime.repo, payload) {
462 tracing::debug!(?err, "heddle PreToolUse context inject skipped");
463 }
464 }
465 _ => {
466 runtime.update_progress(UpdateProgressParams {
467 heddle_session_id: opened.heddle_session_id,
468 harness: Some("claude-code".to_string()),
469 status: Some(event.to_string()),
470 touched_paths: csv_from_value(metadata.get("touched_paths")),
471 probe_metadata: metadata,
472 ..UpdateProgressParams::default()
473 })?;
474 }
475 }
476 Ok(())
477}
478
479fn relay_opencode(runtime: &mut HarnessBridgeRuntime, event: &str, payload: &Value) -> Result<()> {
480 let metadata = map_from_pairs([
481 (
482 "session_id",
483 value_string(payload, &["sessionID"])
484 .or_else(|| value_string(payload, &["session_id"])),
485 ),
486 (
487 "parent_id",
488 value_string(payload, &["parentID"]).or_else(|| value_string(payload, &["parent_id"])),
489 ),
490 (
491 "client_name",
492 value_string(payload, &["client"]).or_else(|| std::env::var("OPENCODE_CLIENT").ok()),
493 ),
494 ("model", value_string(payload, &["model"])),
495 ("provider", value_string(payload, &["provider"])),
496 ("hook_event", Some(event.to_string())),
497 (
498 "touched_paths",
499 value_string(payload, &["file", "path"]).or_else(|| value_string(payload, &["path"])),
500 ),
501 ]);
502 let opened = runtime.open_session(OpenSessionParams {
503 harness: Some("opencode".to_string()),
504 model: value_string(payload, &["model"]),
505 provider: value_string(payload, &["provider"]),
506 probe_metadata: metadata.clone(),
507 ..OpenSessionParams::default()
508 })?;
509 let session_id = opened.heddle_session_id.clone();
510 runtime.update_progress(UpdateProgressParams {
511 heddle_session_id: session_id.clone(),
512 harness: Some("opencode".to_string()),
513 status: Some(event.to_string()),
514 touched_paths: csv_from_value(metadata.get("touched_paths")),
515 probe_metadata: metadata,
516 ..UpdateProgressParams::default()
517 })?;
518 if let Err(err) = record_opencode_timeline_event(runtime, event, payload, &opened) {
519 tracing::debug!(?err, event, "heddle OpenCode timeline recording skipped");
520 }
521 Ok(())
522}
523
524#[derive(Clone, Copy, Debug, PartialEq, Eq)]
525enum TimelineToolEvent {
526 Started,
527 Finished,
528}
529
530trait HarnessTimelineExtractor {
531 fn timeline_event(&self, event: &str) -> Option<TimelineToolEvent>;
532 fn native_tool_call(&self, payload: &Value) -> Option<NativeToolCallRefV1>;
533 fn tool_name(&self, payload: &Value) -> String;
534 fn tool_status(&self, payload: &Value) -> TimelineToolCallStatus;
535 fn payload_metadata(
536 &self,
537 event: &str,
538 payload: &Value,
539 ) -> Result<TimelineToolPayloadMetadata>;
540 fn touched_paths(&self, payload: &Value) -> Vec<String>;
541 fn capture_intent(&self, native: &NativeToolCallRefV1, payload: &Value) -> String;
542
543 fn timeline_thread(
544 &self,
545 runtime: &HarnessBridgeRuntime,
546 opened: &OpenSessionResult,
547 ) -> Result<String> {
548 if let Some(report) = runtime.reports.load(&opened.heddle_session_id)?
549 && let Some(thread) = report.thread
550 {
551 return Ok(thread);
552 }
553 match runtime.repo.head_ref()? {
554 Head::Attached { thread } => Ok(thread.to_string()),
555 Head::Detached { .. } => Ok("main".to_string()),
556 }
557 }
558
559 fn stable_step_id(&self, native: &NativeToolCallRefV1) -> TimelineStepId {
560 let key = format!(
561 "{}\0{}\0{}\0{}",
562 native.harness,
563 native.session_id.as_deref().unwrap_or(""),
564 native.message_id.as_deref().unwrap_or(""),
565 native.tool_call_id
566 );
567 let hash =
568 ContentHash::compute_typed("timeline-native-tool-call-v1", key.as_bytes()).to_hex();
569 TimelineStepId::new(format!("tls-{}", &hash[..24]))
570 }
571
572 fn started_labels(&self, _payload: &Value) -> Vec<TimelineLabel> {
573 vec![TimelineLabel::ExternalSideEffectsUnknown]
574 }
575
576 fn finished_labels(&self, changed: bool, _payload: &Value) -> Vec<TimelineLabel> {
577 if changed {
578 vec![
579 TimelineLabel::RepoReversible,
580 TimelineLabel::ExternalSideEffectsUnknown,
581 ]
582 } else {
583 vec![TimelineLabel::ExternalSideEffectsUnknown]
584 }
585 }
586}
587
588struct OpenCodeTimelineExtractor;
589
590impl HarnessTimelineExtractor for OpenCodeTimelineExtractor {
591 fn timeline_event(&self, event: &str) -> Option<TimelineToolEvent> {
592 match event {
593 "tool.execute.before" => Some(TimelineToolEvent::Started),
594 "tool.execute.after" => Some(TimelineToolEvent::Finished),
595 _ => None,
596 }
597 }
598
599 fn native_tool_call(&self, payload: &Value) -> Option<NativeToolCallRefV1> {
600 opencode_native_tool_call(payload)
601 }
602
603 fn tool_name(&self, payload: &Value) -> String {
604 opencode_tool_name(payload)
605 }
606
607 fn tool_status(&self, payload: &Value) -> TimelineToolCallStatus {
608 opencode_tool_status(payload)
609 }
610
611 fn payload_metadata(
612 &self,
613 event: &str,
614 payload: &Value,
615 ) -> Result<TimelineToolPayloadMetadata> {
616 opencode_payload_metadata(event, payload)
617 }
618
619 fn touched_paths(&self, payload: &Value) -> Vec<String> {
620 opencode_touched_paths(payload)
621 }
622
623 fn capture_intent(&self, native: &NativeToolCallRefV1, payload: &Value) -> String {
624 format!(
625 "OpenCode {} tool call {}",
626 self.tool_name(payload),
627 native.tool_call_id
628 )
629 }
630}
631
632fn record_opencode_timeline_event(
633 runtime: &mut HarnessBridgeRuntime,
634 event: &str,
635 payload: &Value,
636 opened: &OpenSessionResult,
637) -> Result<()> {
638 record_timeline_event(runtime, event, payload, opened, &OpenCodeTimelineExtractor)
639}
640
641fn record_timeline_event<E: HarnessTimelineExtractor>(
642 runtime: &mut HarnessBridgeRuntime,
643 event: &str,
644 payload: &Value,
645 opened: &OpenSessionResult,
646 extractor: &E,
647) -> Result<()> {
648 match extractor.timeline_event(event) {
649 Some(TimelineToolEvent::Started) => {
650 record_timeline_tool_started(runtime, event, payload, opened, extractor)
651 }
652 Some(TimelineToolEvent::Finished) => {
653 record_timeline_tool_finished(runtime, event, payload, opened, extractor)
654 }
655 None => Ok(()),
656 }
657}
658
659fn record_timeline_tool_started<E: HarnessTimelineExtractor>(
660 runtime: &mut HarnessBridgeRuntime,
661 event: &str,
662 payload: &Value,
663 opened: &OpenSessionResult,
664 extractor: &E,
665) -> Result<()> {
666 let Some(native) = extractor.native_tool_call(payload) else {
667 return Ok(());
668 };
669 let Some(before_state) = current_change_id(&runtime.repo)? else {
670 return Ok(());
671 };
672 let thread = extractor.timeline_thread(runtime, opened)?;
673 let store = TimelineStore::open(runtime.repo.heddle_dir())?;
674 let _record_guard = store.lock_recording(&thread)?;
675 let view = TimelineView::rebuild(&store)?;
676 let step_id = extractor.stable_step_id(&native);
677 let (branch_id, parent_step_id) = timeline_position_for_new_tool_step(&view, &thread, &step_id);
678 let envelope = TimelineOperationEnvelope::new(
679 TimelineOperationBodyV1::ToolCallStarted(ToolCallStartedV1 {
680 thread,
681 step_id,
682 branch_id,
683 parent_step_id,
684 native,
685 tool_name: extractor.tool_name(payload),
686 before_state,
687 payload: Some(extractor.payload_metadata(event, payload)?),
688 started_at_ms: Utc::now().timestamp_millis(),
689 }),
690 extractor.started_labels(payload),
691 );
692 store.write_operation(&envelope)?;
693 Ok(())
694}
695
696fn record_timeline_tool_finished<E: HarnessTimelineExtractor>(
697 runtime: &mut HarnessBridgeRuntime,
698 event: &str,
699 payload: &Value,
700 opened: &OpenSessionResult,
701 extractor: &E,
702) -> Result<()> {
703 let Some(native) = extractor.native_tool_call(payload) else {
704 return Ok(());
705 };
706 let Some(fallback_state) = current_change_id(&runtime.repo)? else {
707 return Ok(());
708 };
709 let thread = extractor.timeline_thread(runtime, opened)?;
710 let store = TimelineStore::open(runtime.repo.heddle_dir())?;
711 let _record_guard = store.lock_recording(&thread)?;
712 let before_view = TimelineView::rebuild(&store)?;
713 let step_id = extractor.stable_step_id(&native);
714 let (branch_id, _) = timeline_position_for_new_tool_step(&before_view, &thread, &step_id);
715 let before_state = before_view
716 .step(&thread, &step_id)
717 .and_then(|step| step.before_state)
718 .unwrap_or(fallback_state);
719 let has_worktree_changes_before_capture =
720 !collect_worktree_changes(&runtime.repo)?.is_empty();
721 let mut capture_failed = false;
722 let capture_state = if !has_worktree_changes_before_capture {
723 None
724 } else {
725 let intent = extractor.capture_intent(&native, payload);
726 match create_snapshot(
727 &runtime.repo,
728 &runtime.user_config,
729 Some(intent),
730 None,
731 SnapshotAgentOverrides {
732 provider: opened.provider.clone(),
733 model: opened.model.clone(),
734 session: native.session_id.clone(),
735 segment: None,
736 policy: None,
737 no_policy: false,
738 no_agent: false,
739 },
740 ) {
741 Ok(_) => runtime.repo.head()?,
742 Err(err) => {
743 capture_failed = true;
744 tracing::warn!(?err, "heddle timeline tool capture failed");
745 None
746 }
747 }
748 };
749 let after_state = current_change_id(&runtime.repo)?.unwrap_or(fallback_state);
750 let mut touched_paths = extractor.touched_paths(payload);
751 merge_string_vec(
752 &mut touched_paths,
753 changed_paths_between_states(&runtime.repo, before_state, after_state)?,
754 );
755 let changed = before_state != after_state;
756 let mut labels = extractor.finished_labels(changed, payload);
757 if capture_failed {
758 merge_timeline_labels(&mut labels, vec![TimelineLabel::CaptureFailed]);
759 }
760 let envelope = TimelineOperationEnvelope::new(
761 TimelineOperationBodyV1::ToolCallFinished(ToolCallFinishedV1 {
762 thread,
763 step_id,
764 branch_id,
765 native,
766 status: extractor.tool_status(payload),
767 before_state,
768 after_state,
769 capture_state,
770 capture_oplog_batch_id: None,
771 changed,
772 touched_paths,
773 payload: Some(extractor.payload_metadata(event, payload)?),
774 finished_at_ms: Utc::now().timestamp_millis(),
775 }),
776 labels,
777 );
778 store.write_operation(&envelope)?;
779 Ok(())
780}
781
782fn opencode_native_tool_call(payload: &Value) -> Option<NativeToolCallRefV1> {
783 let tool_call_id = first_value_string(
784 payload,
785 &[
786 &["toolCallID"],
787 &["tool_call_id"],
788 &["toolCallId"],
789 &["callID"],
790 &["call_id"],
791 &["tool", "callID"],
792 &["tool", "call_id"],
793 &["tool", "id"],
794 &["toolCall", "id"],
795 &["tool_call", "id"],
796 &["id"],
797 ],
798 )?;
799 Some(NativeToolCallRefV1 {
800 harness: "opencode".to_string(),
801 session_id: value_string(payload, &["sessionID"])
802 .or_else(|| value_string(payload, &["session_id"])),
803 message_id: value_string(payload, &["messageID"])
804 .or_else(|| value_string(payload, &["message_id"]))
805 .or_else(|| value_string(payload, &["message", "id"])),
806 tool_call_id,
807 })
808}
809
810fn timeline_position_for_new_tool_step(
811 view: &TimelineView,
812 thread: &str,
813 step_id: &TimelineStepId,
814) -> (TimelineBranchId, Option<TimelineStepId>) {
815 let branch_id = view
816 .status(thread)
817 .and_then(|status| status.current_branch_id.clone())
818 .unwrap_or_else(|| TimelineBranchId::new("tlb-main"));
819 let parent_step_id = view
820 .status(thread)
821 .and_then(|status| status.current_step_id.clone())
822 .filter(|current| current != step_id);
823 (branch_id, parent_step_id)
824}
825
826fn current_change_id(repo: &Repository) -> Result<Option<ChangeId>> {
827 Ok(repo
828 .current_state()?
829 .map(|state| state.change_id)
830 .or(repo.head()?))
831}
832
833fn opencode_tool_name(payload: &Value) -> String {
834 first_value_string(
835 payload,
836 &[
837 &["tool", "name"],
838 &["toolName"],
839 &["tool_name"],
840 &["tool"],
841 &["name"],
842 ],
843 )
844 .unwrap_or_else(|| "tool".to_string())
845}
846
847fn opencode_tool_status(payload: &Value) -> TimelineToolCallStatus {
848 let status = first_value_string(
849 payload,
850 &[
851 &["status"],
852 &["tool", "status"],
853 &["result", "status"],
854 &["output", "status"],
855 ],
856 )
857 .unwrap_or_default()
858 .to_ascii_lowercase();
859 if status.contains("cancel") {
860 TimelineToolCallStatus::Cancelled
861 } else if status.contains("fail")
862 || status.contains("error")
863 || payload.get("error").is_some()
864 || payload.get("exception").is_some()
865 {
866 TimelineToolCallStatus::Failed
867 } else {
868 TimelineToolCallStatus::Succeeded
869 }
870}
871
872fn opencode_payload_metadata(event: &str, payload: &Value) -> Result<TimelineToolPayloadMetadata> {
873 let tool_name = opencode_tool_name(payload);
874 let tool_call_id = opencode_native_tool_call(payload)
875 .map(|native| native.tool_call_id)
876 .unwrap_or_default();
877 let raw = serde_json::to_vec(payload)?;
878 let hash = ContentHash::compute_typed("timeline-tool-payload", &raw);
879 let summary = if tool_call_id.is_empty() {
880 format!("OpenCode {event}: {tool_name}")
881 } else {
882 format!("OpenCode {event}: {tool_name} ({tool_call_id})")
883 };
884 Ok(TimelineToolPayloadMetadata {
885 summary: Some(summary),
886 hash: Some(hash),
887 })
888}
889
890fn opencode_touched_paths(payload: &Value) -> Vec<String> {
891 let mut paths = Vec::new();
892 for path in [
893 value_string(payload, &["file", "path"]),
894 value_string(payload, &["path"]),
895 value_string(payload, &["tool", "path"]),
896 value_string(payload, &["tool", "input", "file_path"]),
897 value_string(payload, &["input", "file_path"]),
898 ]
899 .into_iter()
900 .flatten()
901 {
902 if !path.trim().is_empty() && !paths.contains(&path) {
903 paths.push(path);
904 }
905 }
906 for value_path in [
907 &["paths"][..],
908 &["files"][..],
909 &["tool", "input", "paths"][..],
910 &["input", "paths"][..],
911 ] {
912 if let Some(items) = value_string_array(payload, value_path) {
913 merge_string_vec(&mut paths, items);
914 }
915 }
916 paths
917}
918
919fn first_value_string(value: &Value, paths: &[&[&str]]) -> Option<String> {
920 paths.iter().find_map(|path| value_string(value, path))
921}
922
923fn value_string_array(value: &Value, path: &[&str]) -> Option<Vec<String>> {
924 let mut current = value;
925 for segment in path {
926 current = current.get(*segment)?;
927 }
928 current.as_array().map(|items| {
929 items
930 .iter()
931 .filter_map(|item| item.as_str().map(ToString::to_string))
932 .collect()
933 })
934}
935
936fn merge_string_vec(target: &mut Vec<String>, incoming: Vec<String>) {
937 for item in incoming {
938 if !item.trim().is_empty() && !target.contains(&item) {
939 target.push(item);
940 }
941 }
942}
943
944fn merge_timeline_labels(target: &mut Vec<TimelineLabel>, incoming: Vec<TimelineLabel>) {
945 for label in incoming {
946 if !target.contains(&label) {
947 target.push(label);
948 }
949 }
950}
951
952fn value_string(value: &Value, path: &[&str]) -> Option<String> {
953 let mut current = value;
954 for segment in path {
955 current = current.get(*segment)?;
956 }
957 match current {
958 Value::String(s) => Some(s.clone()),
959 Value::Bool(v) => Some(v.to_string()),
960 Value::Number(v) => Some(v.to_string()),
961 _ => None,
962 }
963}
964
965fn value_array_join(value: &Value, path: &[&str]) -> Option<String> {
966 let mut current = value;
967 for segment in path {
968 current = current.get(*segment)?;
969 }
970 current.as_array().map(|items| {
971 items
972 .iter()
973 .filter_map(|item| item.as_str().map(ToString::to_string))
974 .collect::<Vec<_>>()
975 .join(",")
976 })
977}
978
979fn value_u64_string(value: &Value, path: &[&str]) -> Option<String> {
980 let mut current = value;
981 for segment in path {
982 current = current.get(*segment)?;
983 }
984 current.as_u64().map(|v| v.to_string())
985}
986
987fn value_u64(value: &Value, path: &[&str]) -> Option<u64> {
988 let mut current = value;
989 for segment in path {
990 current = current.get(*segment)?;
991 }
992 current.as_u64()
993}
994
995fn value_cost_micros(value: &Value, path: &[&str]) -> Option<String> {
996 let mut current = value;
997 for segment in path {
998 current = current.get(*segment)?;
999 }
1000 current
1001 .as_f64()
1002 .map(|v| ((v * 1_000_000.0).round() as u64).to_string())
1003}
1004
1005fn value_cost_micros_u64(value: &Value, path: &[&str]) -> Option<u64> {
1006 let mut current = value;
1007 for segment in path {
1008 current = current.get(*segment)?;
1009 }
1010 current.as_f64().map(|v| (v * 1_000_000.0).round() as u64)
1011}
1012
1013fn map_from_pairs<const N: usize>(pairs: [(&str, Option<String>); N]) -> BTreeMap<String, String> {
1014 pairs
1015 .into_iter()
1016 .filter_map(|(key, value)| value.map(|value| (key.to_string(), value)))
1017 .collect()
1018}
1019
1020fn csv_from_value(value: Option<&String>) -> Vec<String> {
1021 value
1022 .map(|value| {
1023 value
1024 .split(',')
1025 .map(|item| item.trim().to_string())
1026 .filter(|item| !item.is_empty())
1027 .collect()
1028 })
1029 .unwrap_or_default()
1030}
1031
1032impl HarnessBridgeRuntime {
1033 fn new(repo: Repository, user_config: UserConfig) -> Self {
1034 let reports = SessionReportStore::new(repo.root());
1035 Self {
1036 repo,
1037 user_config,
1038 reports,
1039 }
1040 }
1041
1042 fn handle_request(&mut self, request: BridgeRequest) -> BridgeResponse {
1043 let response = match request.method.as_str() {
1044 "open_session" => self
1045 .decode_params::<OpenSessionParams>(request.params)
1046 .and_then(|params| self.open_session(params))
1047 .and_then(to_json_value),
1048 "update_progress" => self
1049 .decode_params::<UpdateProgressParams>(request.params)
1050 .and_then(|params| self.update_progress(params))
1051 .and_then(to_json_value),
1052 "record_usage" => self
1053 .decode_params::<RecordUsageParams>(request.params)
1054 .and_then(|params| self.record_usage(params))
1055 .and_then(to_json_value),
1056 "record_touched_paths" => self
1057 .decode_params::<RecordTouchedPathsParams>(request.params)
1058 .and_then(|params| self.record_touched_paths(params))
1059 .and_then(to_json_value),
1060 "close_session" => self
1061 .decode_params::<CloseSessionParams>(request.params)
1062 .and_then(|params| self.close_session(params))
1063 .and_then(to_json_value),
1064 "flush_reports" => self
1065 .decode_params::<FlushReportsParams>(request.params)
1066 .and_then(|params| self.flush_reports(params))
1067 .and_then(to_json_value),
1068 other => Err(anyhow!("unknown method '{other}'")),
1069 };
1070
1071 match response {
1072 Ok(result) => BridgeResponse::ok(request.id, result),
1073 Err(err) => BridgeResponse::error(request.id, "bridge_error", err.to_string()),
1074 }
1075 }
1076
1077 fn decode_params<T: for<'de> Deserialize<'de>>(&self, value: Value) -> Result<T> {
1078 serde_json::from_value(value).map_err(|err| anyhow!(err))
1079 }
1080
1081 fn open_session(&mut self, params: OpenSessionParams) -> Result<OpenSessionResult> {
1082 if self.user_config.harness.mode == HarnessMode::Off {
1083 return Err(anyhow!("harness integration is disabled in user config"));
1084 }
1085
1086 let requested_transport = params
1087 .transport
1088 .unwrap_or(self.user_config.harness.transport);
1089 let transcript_mode = params
1090 .transcript_mode
1091 .unwrap_or(self.user_config.harness.transcript);
1092 let env_hints = merged_env_hints(¶ms.env_hints);
1093 let token_claims = user_config_token_claims(&self.user_config);
1094 let current_session = SessionManager::new(self.repo.root()).get_current_session()?;
1095 let current_segment = current_session
1096 .as_ref()
1097 .and_then(|session| session.current_segment());
1098 let probe = probe_harness_actor(&HarnessProbeInput {
1099 argv: params.argv.clone(),
1100 env_hints: env_hints.clone(),
1101 explicit_harness: params.harness.clone(),
1102 explicit_provider: params.provider.clone(),
1103 explicit_model: params.model.clone(),
1104 explicit_thinking_level: params.thinking_level.clone(),
1105 explicit_policy: params.policy.clone(),
1106 probe_metadata: params.probe_metadata.clone(),
1107 current_provider: current_segment.map(|segment| segment.provider.clone()),
1108 current_model: current_segment.map(|segment| segment.model.clone()),
1109 current_policy: current_segment.and_then(|segment| segment.policy_id.clone()),
1110 repo_root: self.repo.root().display().to_string(),
1111 })?;
1112 let identity = resolve_identity(
1113 &self.repo,
1114 &self.user_config,
1115 IdentityHints {
1116 harness: params.harness.clone(),
1117 provider: params.provider.clone(),
1118 model: params.model.clone(),
1119 thinking_level: params.thinking_level.clone(),
1120 policy: params.policy.clone(),
1121 probe: probe.clone(),
1122 },
1123 )?;
1124 let registry = AgentRegistry::new(self.repo.heddle_dir());
1125 let requested_entry = resolve_requested_registry_entry(
1126 ®istry,
1127 params.agent_session_id.as_deref(),
1128 params.client_instance_id.as_deref(),
1129 )?;
1130
1131 if self.user_config.harness.mode == HarnessMode::Required
1132 && (identity.harness.is_none()
1133 || identity.provider.is_none()
1134 || identity.model.is_none())
1135 {
1136 return Err(anyhow!(
1137 "harness mode is 'required' but harness/provider/model could not be resolved"
1138 ));
1139 }
1140
1141 let mut sessions = SessionManager::new(self.repo.root());
1142 let principal = self.repo.get_principal()?;
1143 let mut attach = resolve_actor_attachment(
1144 ®istry,
1145 &self.repo,
1146 &mut sessions,
1147 AttachmentResolutionInput {
1148 requested_entry: requested_entry.as_ref(),
1149 explicit_heddle_session_id: params.heddle_session_id.as_deref(),
1150 client_instance_id: params.client_instance_id.as_deref(),
1151 probe: &probe,
1152 token_claims: token_claims.as_ref(),
1153 },
1154 )?;
1155 let (session, owns_session) = match &attach.target {
1156 AttachTarget::ExistingSession(session) => {
1157 let segment_id = session.current_segment_id.clone().unwrap_or_default();
1158 sessions.set_current_session(&session.id, &segment_id)?;
1159 (session.clone(), false)
1160 }
1161 AttachTarget::CreateNew {
1162 _because_claimed: _,
1163 } => {
1164 let session = sessions.start_session(
1165 principal,
1166 identity
1167 .provider
1168 .clone()
1169 .unwrap_or_else(|| "unknown".to_string()),
1170 identity
1171 .model
1172 .clone()
1173 .unwrap_or_else(|| "unknown".to_string()),
1174 identity.policy.clone(),
1175 )?;
1176 (session, true)
1177 }
1178 };
1179
1180 let (thread_name, thread_id) =
1181 self.resolve_harness_thread_binding(¶ms, &probe, &identity)?;
1182 let entry = self.ensure_registry_entry(RegistryEntryRequest {
1183 heddle_session_id: &session.id,
1184 thread_name: thread_name.as_deref(),
1185 thread_id: thread_id.as_deref(),
1186 identity: &identity,
1187 probe: &probe,
1188 attach: &attach,
1189 client_instance_id: params.client_instance_id.as_deref(),
1190 requested_entry: requested_entry.as_ref(),
1191 })?;
1192 let (session, owns_session) = self.reuse_canonical_actor_session(
1193 &mut sessions,
1194 CanonicalActorSessionRequest {
1195 tentative_session: session,
1196 tentative_owns_session: owns_session,
1197 entry: &entry,
1198 probe: &probe,
1199 attach: &mut attach,
1200 },
1201 )?;
1202
1203 let mut segment_id = session.current_segment_id.clone().unwrap_or_default();
1204 if should_rotate_segment(&session, &identity) {
1205 let segment = sessions.add_segment(
1206 &session.id,
1207 identity
1208 .provider
1209 .clone()
1210 .unwrap_or_else(|| "unknown".to_string()),
1211 identity
1212 .model
1213 .clone()
1214 .unwrap_or_else(|| "unknown".to_string()),
1215 identity.policy.clone(),
1216 )?;
1217 segment_id = segment.id;
1218 }
1219
1220 let base_state = self
1221 .repo
1222 .current_state()?
1223 .map(|state| state.change_id.to_string_full())
1224 .or_else(|| {
1225 self.repo
1226 .head()
1227 .ok()
1228 .flatten()
1229 .map(|id| id.to_string_full())
1230 });
1231 let worktree_changes_at_open = capture_worktree_change_snapshot(&self.repo)?;
1232 let opened_at = Utc::now().to_rfc3339();
1233 let mut report = SessionReportEnvelope {
1234 version: 1,
1235 heddle_session_id: session.id.clone(),
1236 heddle_segment_id: (!segment_id.is_empty()).then_some(segment_id.clone()),
1237 agent_session_id: Some(entry.session_id.clone()),
1238 client_instance_id: entry.client_instance_id.clone(),
1239 native_actor_key: entry.native_actor_key.clone(),
1240 native_parent_actor_key: entry.native_parent_actor_key.clone(),
1241 native_instance_key: entry.native_instance_key.clone(),
1242 repo_root: self.repo.root().display().to_string(),
1243 thread: thread_name.clone(),
1244 thread_id,
1245 task: params.task.clone(),
1246 summary: params.summary.clone(),
1247 opened_at,
1248 closed_at: None,
1249 base_state_at_open: base_state.clone(),
1250 worktree_changes_at_open,
1251 head_state_at_close: None,
1252 transport_mode: transport_mode_name(requested_transport).to_string(),
1253 transcript_mode: transcript_mode_name(transcript_mode).to_string(),
1254 outcome: None,
1255 harness: identity.to_transport_identity(),
1256 progress: Vec::new(),
1257 usage: UsageTotals::default(),
1258 touched_paths: Vec::new(),
1259 changed_paths: Vec::new(),
1260 diff_summary: None,
1261 transcript_refs: Vec::new(),
1262 last_progress_at: None,
1263 report_flush_state: Some("pending-local".to_string()),
1264 attach_reason: Some(attach.attach_reason.clone()),
1265 attach_precedence: attach.precedence.clone(),
1266 winning_attach_rule: Some(attach.winning_rule.clone()),
1267 probe_source: probe.probe_source.clone(),
1268 probe_confidence: probe.confidence,
1269 pending_flush: true,
1270 last_flushed_at: None,
1271 owns_session,
1272 };
1273 merge_unique_paths(&mut report.touched_paths, probe.touched_paths.clone());
1274 merge_usage(&mut report.usage, &probe.usage_totals);
1275 if transcript_mode != HarnessTranscriptMode::Off {
1276 report.transcript_refs = probe.transcript_refs.clone();
1277 }
1278 self.reports.save(&report)?;
1279 self.sync_registry_from_report(&report, AgentStatus::Active)?;
1280 if matches!(requested_transport, HarnessTransport::Direct) {
1281 enqueue_report(&self.reports, &mut report)?;
1282 self.sync_registry_from_report(&report, AgentStatus::Active)?;
1283 }
1284
1285 Ok(OpenSessionResult {
1286 heddle_session_id: report.heddle_session_id.clone(),
1287 heddle_segment_id: report.heddle_segment_id.clone(),
1288 agent_session_id: report.agent_session_id.clone(),
1289 created_session: owns_session,
1290 harness: report.harness.harness.clone(),
1291 provider: report.harness.provider.clone(),
1292 model: report.harness.model.clone(),
1293 thinking_level: report.harness.thinking_level.clone(),
1294 report_flush_state: report.report_flush_state.clone(),
1295 attach_reason: report.attach_reason.clone(),
1296 })
1297 }
1298
1299 fn update_progress(&mut self, params: UpdateProgressParams) -> Result<SessionMutationResult> {
1300 let mut report = self
1301 .reports
1302 .load(¶ms.heddle_session_id)?
1303 .ok_or_else(|| anyhow!("session report not found for {}", params.heddle_session_id))?;
1304 let current_session = SessionManager::new(self.repo.root()).get_current_session()?;
1305 let current_segment = current_session
1306 .as_ref()
1307 .and_then(|session| session.current_segment());
1308 let probe = probe_harness_actor(&HarnessProbeInput {
1309 argv: params.argv.clone(),
1310 env_hints: merged_env_hints(¶ms.env_hints),
1311 explicit_harness: params.harness.clone(),
1312 explicit_provider: params.provider.clone(),
1313 explicit_model: params.model.clone(),
1314 explicit_thinking_level: params.thinking_level.clone(),
1315 explicit_policy: params.policy.clone(),
1316 probe_metadata: params.probe_metadata.clone(),
1317 current_provider: current_segment.map(|segment| segment.provider.clone()),
1318 current_model: current_segment.map(|segment| segment.model.clone()),
1319 current_policy: current_segment.and_then(|segment| segment.policy_id.clone()),
1320 repo_root: self.repo.root().display().to_string(),
1321 })?;
1322 let identity = resolve_identity(
1323 &self.repo,
1324 &self.user_config,
1325 IdentityHints {
1326 harness: params.harness.clone(),
1327 provider: params.provider.clone(),
1328 model: params.model.clone(),
1329 thinking_level: params.thinking_level.clone(),
1330 policy: params.policy.clone(),
1331 probe: probe.clone(),
1332 },
1333 )?;
1334 self.ensure_segment_for_report(&mut report, &identity)?;
1335 if report.harness.harness.is_none() {
1336 report.harness.harness = identity.harness.clone();
1337 }
1338 if report.harness.provider.is_none() {
1339 report.harness.provider = identity.provider.clone();
1340 }
1341 if report.harness.model.is_none() {
1342 report.harness.model = identity.model.clone();
1343 }
1344 if report.harness.thinking_level.is_none() {
1345 report.harness.thinking_level = identity.thinking_level.clone();
1346 }
1347 if report.harness.policy.is_none() {
1348 report.harness.policy = identity.policy.clone();
1349 }
1350 if report.native_actor_key.is_none() {
1351 report.native_actor_key = probe.native_actor_key.clone();
1352 }
1353 if report.native_parent_actor_key.is_none() {
1354 report.native_parent_actor_key = probe.native_parent_actor_key.clone();
1355 }
1356 if report.native_instance_key.is_none() {
1357 report.native_instance_key = probe.native_instance_key.clone();
1358 }
1359 if report.probe_source.is_none() {
1360 report.probe_source = probe.probe_source.clone();
1361 }
1362 if report.probe_confidence.is_none() {
1363 report.probe_confidence = probe.confidence;
1364 }
1365
1366 let recorded_at = Utc::now().to_rfc3339();
1367 let checkpoint = ProgressCheckpoint {
1368 status: params.status.clone(),
1369 message: params.message.clone(),
1370 completed_steps: params.completed_steps,
1371 total_steps: params.total_steps,
1372 touched_paths: normalize_paths(
1373 params
1374 .touched_paths
1375 .into_iter()
1376 .chain(probe.touched_paths)
1377 .collect::<Vec<_>>(),
1378 ),
1379 recorded_at: recorded_at.clone(),
1380 };
1381 merge_unique_paths(
1382 &mut report.touched_paths,
1383 checkpoint.touched_paths.iter().cloned(),
1384 );
1385 merge_usage(&mut report.usage, &probe.usage_totals);
1386 if report.transcript_mode != "off" && report.transcript_refs.is_empty() {
1387 report.transcript_refs = probe.transcript_refs;
1388 }
1389 report.progress.push(checkpoint);
1390 if let Some(summary) = params.summary {
1391 report.summary = Some(summary);
1392 }
1393 report.last_progress_at = Some(recorded_at);
1394 mark_pending_flush(&mut report);
1395 self.persist_report(report)
1396 }
1397
1398 fn resolve_harness_thread_binding(
1399 &self,
1400 params: &OpenSessionParams,
1401 probe: &HarnessProbeResult,
1402 identity: &ResolvedIdentity,
1403 ) -> Result<(Option<String>, Option<String>)> {
1404 if let Some(thread) = params.thread.clone() {
1405 let thread_id = thread_id_for_name(&self.repo, Some(&thread))?;
1406 return Ok((Some(thread), thread_id));
1407 }
1408
1409 let current_attached = match self.repo.head_ref()? {
1410 Head::Attached { thread } => Some(thread.to_string()),
1411 Head::Detached { .. } => None,
1412 };
1413
1414 if !probe.attach_hints.root_actor
1415 && self.user_config.harness.threading.subagent
1416 == UserHarnessSubagentThreadPolicy::CreateChild
1417 && let Some(parent_thread) =
1418 resolve_parent_thread_for_subagent(&self.repo, probe, current_attached.as_deref())?
1419 && can_create_harness_thread(&self.repo, Some(&parent_thread), Some(&parent_thread))?
1420 {
1421 let name = allocate_thread_name(
1422 &self.repo,
1423 &format!(
1424 "{}/{}",
1425 parent_thread,
1426 sanitize_name(&preferred_thread_slug(params, probe, identity))
1427 ),
1428 )?;
1429 self.ensure_harness_thread(
1430 &name,
1431 Some(&parent_thread),
1432 Some(&parent_thread),
1433 params.task.clone(),
1434 )?;
1435 let thread_id = thread_id_for_name(&self.repo, Some(&name))?;
1436 return Ok((Some(name), thread_id));
1437 }
1438
1439 if probe.attach_hints.root_actor
1440 && self.user_config.harness.threading.root_actor
1441 == UserHarnessRootThreadPolicy::CreateNew
1442 && let Some(current) = current_attached.clone()
1443 && can_create_harness_thread(&self.repo, Some(¤t), None)?
1444 {
1445 let name = allocate_thread_name(
1446 &self.repo,
1447 &format!(
1448 "{}/{}",
1449 current,
1450 sanitize_name(&preferred_thread_slug(params, probe, identity))
1451 ),
1452 )?;
1453 self.ensure_harness_thread(&name, Some(¤t), None, params.task.clone())?;
1454 let thread_id = thread_id_for_name(&self.repo, Some(&name))?;
1455 return Ok((Some(name), thread_id));
1456 }
1457
1458 let thread_id = thread_id_for_name(&self.repo, current_attached.as_deref())?;
1459 Ok((current_attached, thread_id))
1460 }
1461
1462 fn ensure_harness_thread(
1463 &self,
1464 name: &str,
1465 target_thread: Option<&str>,
1466 parent_thread: Option<&str>,
1467 task: Option<String>,
1468 ) -> Result<()> {
1469 let manager = ThreadManager::new(self.repo.heddle_dir());
1470 if manager.load(name)?.is_some() {
1471 return Ok(());
1472 }
1473
1474 let base_state = self
1475 .resolve_harness_thread_base_state(target_thread, parent_thread)?
1476 .ok_or_else(|| anyhow!("No current state to start a thread from"))?;
1477 let tn = ThreadName::new(name);
1478 if self.repo.refs().get_thread(&tn)?.is_none() {
1479 self.repo
1480 .refs()
1481 .set_thread_cas(&tn, refs::RefExpectation::Missing, &base_state)?;
1482 self.repo.oplog().record_thread_create(
1487 &tn,
1488 &base_state,
1489 None,
1490 Some(&self.repo.op_scope()),
1491 )?;
1492 }
1493
1494 let workspace_mode = self
1495 .user_config
1496 .harness
1497 .threading
1498 .workspace_default
1499 .unwrap_or(UserThreadWorkspaceMode::Materialized);
1500 let thread_mode = match workspace_mode {
1501 UserThreadWorkspaceMode::Materialized | UserThreadWorkspaceMode::Auto => {
1502 ThreadMode::Materialized
1503 }
1504 UserThreadWorkspaceMode::Virtualized => ThreadMode::Virtualized,
1505 UserThreadWorkspaceMode::Solid => ThreadMode::Solid,
1506 };
1507 let path = match thread_mode {
1508 ThreadMode::Solid | ThreadMode::Materialized => {
1509 default_private_thread_path(&self.repo, name)
1510 }
1511 ThreadMode::Virtualized => default_private_thread_path(&self.repo, name),
1514 };
1515 let abs_path = prepare_worktree_target(&self.repo, &path, Some(name))?.path;
1516 write_isolated_checkout(&self.repo, &abs_path, &base_state, Some(name))?;
1517
1518 let base_state_obj = self
1519 .repo
1520 .store()
1521 .get_state(&base_state)?
1522 .ok_or_else(|| anyhow!("Base state '{}' not found", base_state.short()))?;
1523 let thread = Thread {
1524 id: name.to_string(),
1525 thread: name.to_string(),
1526 target_thread: target_thread.map(ToString::to_string),
1527 parent_thread: parent_thread.map(ToString::to_string),
1528 mode: thread_mode.clone(),
1529 state: ThreadState::Active,
1530 base_state: base_state.short(),
1531 base_root: base_state_obj.tree.short(),
1532 current_state: Some(base_state.short()),
1533 merged_state: None,
1534 task,
1535 execution_path: abs_path.clone(),
1536 materialized_path: match thread_mode {
1537 ThreadMode::Solid => Some(abs_path),
1538 ThreadMode::Materialized | ThreadMode::Virtualized => None,
1542 },
1543 changed_paths: vec![],
1544 impact_categories: vec![],
1545 heavy_impact_paths: vec![],
1546 promotion_suggested: false,
1547 freshness: if target_thread.is_some() {
1548 ThreadFreshness::Current
1549 } else {
1550 ThreadFreshness::Unknown
1551 },
1552 verification_summary: summarize_verification(base_state_obj.verification.as_ref()),
1553 confidence_summary: summarize_confidence(base_state_obj.confidence),
1554 integration_policy_result: ThreadIntegrationPolicy::default(),
1555 created_at: Utc::now(),
1556 updated_at: Utc::now(),
1557 ephemeral: None,
1558 auto: true,
1563 shared_target_dir: None,
1566 };
1567 manager.save(&thread)?;
1568 Ok(())
1569 }
1570
1571 fn resolve_harness_thread_base_state(
1572 &self,
1573 target_thread: Option<&str>,
1574 parent_thread: Option<&str>,
1575 ) -> Result<Option<objects::object::ChangeId>> {
1576 resolve_harness_thread_base_state(&self.repo, target_thread, parent_thread)
1577 }
1578
1579 fn record_usage(&mut self, params: RecordUsageParams) -> Result<SessionMutationResult> {
1580 let mut report = self
1581 .reports
1582 .load(¶ms.heddle_session_id)?
1583 .ok_or_else(|| anyhow!("session report not found for {}", params.heddle_session_id))?;
1584 if let Some(input) = params.input_tokens {
1585 report.usage.input_tokens = Some(max_u64(report.usage.input_tokens, input));
1586 }
1587 if let Some(output) = params.output_tokens {
1588 report.usage.output_tokens = Some(max_u64(report.usage.output_tokens, output));
1589 }
1590 if let Some(reasoning) = params.reasoning_tokens {
1591 report.usage.reasoning_tokens = Some(max_u64(report.usage.reasoning_tokens, reasoning));
1592 }
1593 if let Some(cache_creation) = params.cache_creation_tokens {
1594 report.usage.cache_creation_tokens =
1595 Some(max_u64(report.usage.cache_creation_tokens, cache_creation));
1596 }
1597 if let Some(cache_read) = params.cache_read_tokens {
1598 report.usage.cache_read_tokens =
1599 Some(max_u64(report.usage.cache_read_tokens, cache_read));
1600 }
1601 if let Some(tool_calls) = params.tool_calls {
1602 report.usage.tool_calls = Some(max_u32(report.usage.tool_calls, tool_calls));
1603 }
1604 if let Some(cost) = params.cost_micros_usd {
1605 report.usage.cost_micros_usd = Some(max_u64(report.usage.cost_micros_usd, cost));
1606 }
1607 mark_pending_flush(&mut report);
1608 self.persist_report(report)
1609 }
1610
1611 fn record_touched_paths(
1612 &mut self,
1613 params: RecordTouchedPathsParams,
1614 ) -> Result<SessionMutationResult> {
1615 let mut report = self
1616 .reports
1617 .load(¶ms.heddle_session_id)?
1618 .ok_or_else(|| anyhow!("session report not found for {}", params.heddle_session_id))?;
1619 merge_unique_paths(&mut report.touched_paths, normalize_paths(params.paths));
1620 mark_pending_flush(&mut report);
1621 self.persist_report(report)
1622 }
1623
1624 fn close_session(&mut self, params: CloseSessionParams) -> Result<CloseSessionResult> {
1625 let mut report = self
1626 .reports
1627 .load(¶ms.heddle_session_id)?
1628 .ok_or_else(|| anyhow!("session report not found for {}", params.heddle_session_id))?;
1629 report.closed_at = Some(Utc::now().to_rfc3339());
1630 report.outcome = params.outcome.clone();
1631 if let Some(summary) = params.summary {
1632 report.summary = Some(summary);
1633 }
1634 if let Some(transcript_refs) = params.transcript_refs {
1635 report.transcript_refs = transcript_refs;
1636 }
1637 let final_diff = compute_final_diff(
1638 &self.repo,
1639 report.base_state_at_open.as_deref(),
1640 &report.worktree_changes_at_open,
1641 )?;
1642 report.head_state_at_close = final_diff.head_state;
1643 report.changed_paths = final_diff.changed_paths;
1644 report.diff_summary = Some(final_diff.diff_summary);
1645 mark_pending_flush(&mut report);
1646 if report.owns_session {
1647 let mut sessions = SessionManager::new(self.repo.root());
1648 if let Ok(Some(session)) = sessions.get_session(&report.heddle_session_id)
1649 && session.is_active()
1650 {
1651 let _ = sessions.end_session(Some(&report.heddle_session_id));
1652 }
1653 }
1654
1655 let transport = params
1656 .transport
1657 .unwrap_or(self.user_config.harness.transport);
1658 if matches!(transport, HarnessTransport::Direct | HarnessTransport::End) {
1659 enqueue_report(&self.reports, &mut report)?;
1660 } else {
1661 self.reports.save(&report)?;
1662 }
1663 self.sync_registry_from_report(&report, AgentStatus::Complete)?;
1664 Ok(CloseSessionResult {
1665 heddle_session_id: report.heddle_session_id,
1666 changed_paths: report.changed_paths,
1667 diff_summary: report.diff_summary.unwrap_or_default(),
1668 report_flush_state: report.report_flush_state,
1669 })
1670 }
1671
1672 fn flush_reports(&mut self, params: FlushReportsParams) -> Result<FlushReportsResult> {
1673 let mut flushed = 0usize;
1674 let session_ids = match params.heddle_session_id {
1675 Some(session_id) => vec![session_id],
1676 None => self.reports.list_pending()?,
1677 };
1678 for session_id in session_ids {
1679 let Some(mut report) = self.reports.load(&session_id)? else {
1680 continue;
1681 };
1682 if !report.pending_flush {
1683 continue;
1684 }
1685 enqueue_report(&self.reports, &mut report)?;
1686 let status = if report.closed_at.is_some() {
1687 AgentStatus::Complete
1688 } else {
1689 AgentStatus::Active
1690 };
1691 self.sync_registry_from_report(&report, status)?;
1692 flushed += 1;
1693 }
1694 Ok(FlushReportsResult { flushed })
1695 }
1696
1697 fn persist_report(
1698 &mut self,
1699 mut report: SessionReportEnvelope,
1700 ) -> Result<SessionMutationResult> {
1701 let transport = transport_from_report(&report, self.user_config.harness.transport);
1702 match transport {
1703 HarnessTransport::Direct => {
1704 enqueue_report(&self.reports, &mut report)?;
1705 }
1706 HarnessTransport::Spool | HarnessTransport::End => {
1707 self.reports.save(&report)?;
1708 }
1709 }
1710 self.sync_registry_from_report(&report, AgentStatus::Active)?;
1711 Ok(SessionMutationResult {
1712 heddle_session_id: report.heddle_session_id,
1713 heddle_segment_id: report.heddle_segment_id,
1714 report_flush_state: report.report_flush_state,
1715 })
1716 }
1717
1718 fn ensure_segment_for_report(
1719 &self,
1720 report: &mut SessionReportEnvelope,
1721 identity: &ResolvedIdentity,
1722 ) -> Result<()> {
1723 let mut sessions = SessionManager::new(self.repo.root());
1724 let Some(session) = sessions.get_session(&report.heddle_session_id)? else {
1725 return Ok(());
1726 };
1727 if !session.is_active() || !should_rotate_segment(&session, identity) {
1728 return Ok(());
1729 }
1730 let segment = sessions.add_segment(
1731 &report.heddle_session_id,
1732 identity
1733 .provider
1734 .clone()
1735 .unwrap_or_else(|| "unknown".to_string()),
1736 identity
1737 .model
1738 .clone()
1739 .unwrap_or_else(|| "unknown".to_string()),
1740 identity.policy.clone(),
1741 )?;
1742 report.heddle_segment_id = Some(segment.id);
1743 if identity.provider.is_some() {
1744 report.harness.provider = identity.provider.clone();
1745 }
1746 if identity.model.is_some() {
1747 report.harness.model = identity.model.clone();
1748 }
1749 if identity.policy.is_some() {
1750 report.harness.policy = identity.policy.clone();
1751 }
1752 if identity.thinking_level.is_some() {
1753 report.harness.thinking_level = identity.thinking_level.clone();
1754 }
1755 Ok(())
1756 }
1757
1758 fn ensure_registry_entry(&self, request: RegistryEntryRequest<'_>) -> Result<AgentEntry> {
1759 let RegistryEntryRequest {
1760 heddle_session_id,
1761 thread_name,
1762 thread_id,
1763 identity,
1764 probe,
1765 attach,
1766 client_instance_id,
1767 requested_entry,
1768 } = request;
1769 let registry = AgentRegistry::new(self.repo.heddle_dir());
1770 let fallback_entry = if client_instance_id.is_some()
1771 || probe.native_actor_key.is_some()
1772 || probe.native_instance_key.is_some()
1773 {
1774 None
1775 } else {
1776 find_matching_registry_entry(®istry, &self.repo, heddle_session_id, thread_name)?
1777 };
1778 if let Some(entry) = requested_entry
1779 .cloned()
1780 .or_else(|| attach.matched_entry.clone())
1781 .or(fallback_entry)
1782 {
1783 return registry
1784 .update_entry(&entry.session_id, |existing| {
1785 if client_instance_id.is_some() {
1786 existing.client_instance_id = client_instance_id.map(ToString::to_string);
1787 }
1788 if probe.native_actor_key.is_some() {
1789 existing.native_actor_key = probe.native_actor_key.clone();
1790 }
1791 if probe.native_parent_actor_key.is_some() {
1792 existing.native_parent_actor_key = probe.native_parent_actor_key.clone();
1793 }
1794 if probe.native_instance_key.is_some() {
1795 existing.native_instance_key = probe.native_instance_key.clone();
1796 }
1797 existing.heddle_session_id = Some(heddle_session_id.to_string());
1798 existing.thread_id = thread_id.map(ToString::to_string);
1799 if let Some(thread_name) = thread_name {
1800 existing.thread = thread_name.to_string();
1801 }
1802 existing.path = Some(self.repo.root().to_path_buf());
1803 if identity.provider.is_some() {
1804 existing.provider = identity.provider.clone();
1805 }
1806 if identity.model.is_some() {
1807 existing.model = identity.model.clone();
1808 }
1809 if identity.harness.is_some() {
1810 existing.harness = identity.harness.clone();
1811 }
1812 if identity.thinking_level.is_some() {
1813 existing.thinking_level = identity.thinking_level.clone();
1814 }
1815 existing.attach_reason = Some(attach.attach_reason.clone());
1816 existing.attach_precedence = attach.precedence.clone();
1817 existing.winning_attach_rule = Some(attach.winning_rule.clone());
1818 existing.probe_source = probe.probe_source.clone();
1819 existing.probe_confidence = probe.confidence;
1820 existing.status = AgentStatus::Active;
1821 })?
1822 .ok_or_else(|| anyhow!("registry entry disappeared during update"));
1823 }
1824
1825 if client_instance_id.is_none() && probe.native_actor_key.is_some() {
1826 let (entry, _) = registry.find_or_create_active_entry(
1827 |entry| {
1828 claude_actor_compatible(entry, probe, self.repo.root())
1829 && entry.native_actor_key == probe.native_actor_key
1830 },
1831 |existing| {
1832 if client_instance_id.is_some() {
1833 existing.client_instance_id = client_instance_id.map(ToString::to_string);
1834 }
1835 if existing.heddle_session_id.is_none() {
1836 existing.heddle_session_id = Some(heddle_session_id.to_string());
1837 }
1838 existing.thread_id = thread_id.map(ToString::to_string);
1839 if let Some(thread_name) = thread_name {
1840 existing.thread = thread_name.to_string();
1841 }
1842 existing.path = Some(self.repo.root().to_path_buf());
1843 if identity.provider.is_some() {
1844 existing.provider = identity.provider.clone();
1845 }
1846 if identity.model.is_some() {
1847 existing.model = identity.model.clone();
1848 }
1849 if identity.harness.is_some() {
1850 existing.harness = identity.harness.clone();
1851 }
1852 if identity.thinking_level.is_some() {
1853 existing.thinking_level = identity.thinking_level.clone();
1854 }
1855 if probe.native_parent_actor_key.is_some() {
1856 existing.native_parent_actor_key = probe.native_parent_actor_key.clone();
1857 }
1858 if probe.native_instance_key.is_some() {
1859 existing.native_instance_key = probe.native_instance_key.clone();
1860 }
1861 existing.attach_reason = Some(attach.attach_reason.clone());
1862 existing.attach_precedence = attach.precedence.clone();
1863 existing.winning_attach_rule = Some(attach.winning_rule.clone());
1864 existing.probe_source = probe.probe_source.clone();
1865 existing.probe_confidence = probe.confidence;
1866 existing.status = AgentStatus::Active;
1867 },
1868 |session_id| {
1869 Ok(AgentEntry {
1870 session_id: session_id.to_string(),
1871 client_instance_id: client_instance_id.map(ToString::to_string),
1872 native_actor_key: probe.native_actor_key.clone(),
1873 native_parent_actor_key: probe.native_parent_actor_key.clone(),
1874 native_instance_key: probe.native_instance_key.clone(),
1875 heddle_session_id: Some(heddle_session_id.to_string()),
1876 thread_id: thread_id.map(ToString::to_string),
1877 thread: thread_name.unwrap_or("detached").to_string(),
1878 pid: Some(std::process::id()),
1879 boot_id: None,
1880 liveness_path: None,
1881 heartbeat_at: Some(Utc::now()),
1882 anchor_state: self.repo.head()?.map(|id| id.to_string_full()),
1883 anchor_root: None,
1884 reservation_token: Some(objects::store::generate_agent_id()),
1885 path: Some(self.repo.root().to_path_buf()),
1886 base_state: self.repo.head()?.map(|id| id.short()).unwrap_or_default(),
1887 started_at: Utc::now(),
1888 provider: identity.provider.clone(),
1889 model: identity.model.clone(),
1890 harness: identity.harness.clone(),
1891 thinking_level: identity.thinking_level.clone(),
1892 usage_summary: AgentUsageSummary::default(),
1893 last_progress_at: None,
1894 report_flush_state: Some("pending-local".to_string()),
1895 attach_reason: Some(attach.attach_reason.clone()),
1896 attach_precedence: attach.precedence.clone(),
1897 winning_attach_rule: Some(attach.winning_rule.clone()),
1898 probe_source: probe.probe_source.clone(),
1899 probe_confidence: probe.confidence,
1900 status: AgentStatus::Active,
1901 completed_at: None,
1902 context_queries: vec![],
1903 })
1904 },
1905 )?;
1906 return Ok(entry);
1907 }
1908
1909 Ok(registry.create_generated_entry(|session_id| {
1910 Ok(AgentEntry {
1911 session_id: session_id.to_string(),
1912 client_instance_id: client_instance_id.map(ToString::to_string),
1913 native_actor_key: probe.native_actor_key.clone(),
1914 native_parent_actor_key: probe.native_parent_actor_key.clone(),
1915 native_instance_key: probe.native_instance_key.clone(),
1916 heddle_session_id: Some(heddle_session_id.to_string()),
1917 thread_id: thread_id.map(ToString::to_string),
1918 thread: thread_name.unwrap_or("detached").to_string(),
1919 pid: Some(std::process::id()),
1920 boot_id: None,
1921 liveness_path: None,
1922 heartbeat_at: Some(Utc::now()),
1923 anchor_state: self.repo.head()?.map(|id| id.to_string_full()),
1924 anchor_root: None,
1925 reservation_token: Some(objects::store::generate_agent_id()),
1926 path: Some(self.repo.root().to_path_buf()),
1927 base_state: self.repo.head()?.map(|id| id.short()).unwrap_or_default(),
1928 started_at: Utc::now(),
1929 provider: identity.provider.clone(),
1930 model: identity.model.clone(),
1931 harness: identity.harness.clone(),
1932 thinking_level: identity.thinking_level.clone(),
1933 usage_summary: AgentUsageSummary::default(),
1934 last_progress_at: None,
1935 report_flush_state: Some("pending-local".to_string()),
1936 attach_reason: Some(attach.attach_reason.clone()),
1937 attach_precedence: attach.precedence.clone(),
1938 winning_attach_rule: Some(attach.winning_rule.clone()),
1939 probe_source: probe.probe_source.clone(),
1940 probe_confidence: probe.confidence,
1941 status: AgentStatus::Active,
1942 completed_at: None,
1943 context_queries: vec![],
1944 })
1945 })?)
1946 }
1947
1948 fn reuse_canonical_actor_session(
1949 &self,
1950 sessions: &mut SessionManager,
1951 request: CanonicalActorSessionRequest<'_>,
1952 ) -> Result<(Session, bool)> {
1953 let CanonicalActorSessionRequest {
1954 tentative_session,
1955 tentative_owns_session,
1956 entry,
1957 probe,
1958 attach,
1959 } = request;
1960 let Some(canonical_session_id) = entry.heddle_session_id.as_deref() else {
1961 return Ok((tentative_session, tentative_owns_session));
1962 };
1963 if canonical_session_id == tentative_session.id {
1964 return Ok((tentative_session, tentative_owns_session));
1965 }
1966
1967 if tentative_owns_session
1968 && let Ok(Some(session)) = sessions.get_session(&tentative_session.id)
1969 && session.is_active()
1970 {
1971 let _ = sessions.end_session(Some(&tentative_session.id));
1972 }
1973
1974 let canonical_session = sessions
1975 .get_session(canonical_session_id)?
1976 .ok_or_else(|| anyhow!("session not found: {canonical_session_id}"))?;
1977 let canonical_segment_id = canonical_session
1978 .current_segment_id
1979 .clone()
1980 .unwrap_or_default();
1981 sessions.set_current_session(canonical_session_id, &canonical_segment_id)?;
1982
1983 if let Some(native_actor_key) = probe
1984 .native_actor_key
1985 .as_deref()
1986 .or(entry.native_actor_key.as_deref())
1987 {
1988 attach.precedence.push(format!(
1989 "post-create-native-actor-key:{native_actor_key}:matched"
1990 ));
1991 attach.attach_reason = format!(
1992 "reused existing native actor {} on Heddle session {}",
1993 native_actor_key, canonical_session_id
1994 );
1995 attach.winning_rule = "native-actor-key-post-create".to_string();
1996 }
1997
1998 Ok((canonical_session, false))
1999 }
2000
2001 fn sync_registry_from_report(
2002 &self,
2003 report: &SessionReportEnvelope,
2004 status: AgentStatus,
2005 ) -> Result<()> {
2006 let registry = AgentRegistry::new(self.repo.heddle_dir());
2007 let entry = if let Some(agent_session_id) = &report.agent_session_id {
2008 registry.update_entry(agent_session_id, |entry| {
2009 if report.client_instance_id.is_some() {
2010 entry.client_instance_id = report.client_instance_id.clone();
2011 }
2012 if report.native_actor_key.is_some() {
2013 entry.native_actor_key = report.native_actor_key.clone();
2014 }
2015 if report.native_parent_actor_key.is_some() {
2016 entry.native_parent_actor_key = report.native_parent_actor_key.clone();
2017 }
2018 if report.native_instance_key.is_some() {
2019 entry.native_instance_key = report.native_instance_key.clone();
2020 }
2021 entry.heddle_session_id = Some(report.heddle_session_id.clone());
2022 entry.path = Some(self.repo.root().to_path_buf());
2023 entry.harness = report.harness.harness.clone();
2024 entry.provider = report.harness.provider.clone();
2025 entry.model = report.harness.model.clone();
2026 entry.thinking_level = report.harness.thinking_level.clone();
2027 entry.usage_summary = usage_to_summary(&report.usage);
2028 entry.last_progress_at =
2029 report.last_progress_at.as_deref().and_then(parse_timestamp);
2030 entry.report_flush_state = report.report_flush_state.clone();
2031 entry.attach_reason = report.attach_reason.clone();
2032 entry.attach_precedence = report.attach_precedence.clone();
2033 entry.winning_attach_rule = report.winning_attach_rule.clone();
2034 entry.probe_source = report.probe_source.clone();
2035 entry.probe_confidence = report.probe_confidence;
2036 entry.status = status.clone();
2037 entry.completed_at = match status {
2038 AgentStatus::Active => None,
2039 AgentStatus::Abandoned | AgentStatus::Complete | AgentStatus::Merged => {
2040 Some(Utc::now())
2041 }
2042 };
2043 })?
2044 } else {
2045 None
2046 };
2047
2048 if entry.is_none() {
2049 let resolved = self.ensure_registry_entry(RegistryEntryRequest {
2050 heddle_session_id: &report.heddle_session_id,
2051 thread_name: report.thread.as_deref(),
2052 thread_id: report.thread_id.as_deref(),
2053 identity: &ResolvedIdentity {
2054 harness: report.harness.harness.clone(),
2055 provider: report.harness.provider.clone(),
2056 model: report.harness.model.clone(),
2057 thinking_level: report.harness.thinking_level.clone(),
2058 policy: report.harness.policy.clone(),
2059 },
2060 probe: &HarnessProbeResult {
2061 native_actor_key: report.native_actor_key.clone(),
2062 native_parent_actor_key: report.native_parent_actor_key.clone(),
2063 native_instance_key: report.native_instance_key.clone(),
2064 probe_source: report.probe_source.clone(),
2065 confidence: report.probe_confidence,
2066 ..HarnessProbeResult::default()
2067 },
2068 attach: &ResolvedAttachment {
2069 target: AttachTarget::CreateNew {
2070 _because_claimed: false,
2071 },
2072 matched_entry: None,
2073 attach_reason: report.attach_reason.clone().unwrap_or_else(|| {
2074 format!(
2075 "created actor for Heddle session {}",
2076 report.heddle_session_id
2077 )
2078 }),
2079 precedence: report.attach_precedence.clone(),
2080 winning_rule: report
2081 .winning_attach_rule
2082 .clone()
2083 .unwrap_or_else(|| "report-sync".to_string()),
2084 },
2085 client_instance_id: report.client_instance_id.as_deref(),
2086 requested_entry: None,
2087 })?;
2088 let mut report = report.clone();
2089 report.agent_session_id = Some(resolved.session_id);
2090 self.reports.save(&report)?;
2091 }
2092 Ok(())
2093 }
2094}
2095
2096#[derive(Debug, Clone, Default)]
2097struct ResolvedIdentity {
2098 harness: Option<String>,
2099 provider: Option<String>,
2100 model: Option<String>,
2101 thinking_level: Option<String>,
2102 policy: Option<String>,
2103}
2104
2105impl ResolvedIdentity {
2106 fn to_transport_identity(&self) -> HarnessIdentity {
2107 HarnessIdentity {
2108 harness: self.harness.clone(),
2109 provider: self.provider.clone(),
2110 model: self.model.clone(),
2111 thinking_level: self.thinking_level.clone(),
2112 policy: self.policy.clone(),
2113 }
2114 }
2115}
2116
2117struct IdentityHints {
2118 harness: Option<String>,
2119 provider: Option<String>,
2120 model: Option<String>,
2121 thinking_level: Option<String>,
2122 policy: Option<String>,
2123 probe: HarnessProbeResult,
2124}
2125
2126fn resolve_identity(
2127 repo: &Repository,
2128 user_config: &UserConfig,
2129 hints: IdentityHints,
2130) -> Result<ResolvedIdentity> {
2131 let current_session = SessionManager::new(repo.root()).get_current_session()?;
2132 let current_segment = current_session
2133 .as_ref()
2134 .and_then(|session| session.current_segment());
2135 let token_claims = if user_config.harness.auto_infer {
2136 user_config_token_claims(user_config)
2137 } else {
2138 None
2139 };
2140 let harness_override = resolved_harness_override(
2141 user_config,
2142 hints.harness.as_deref(),
2143 hints.probe.harness.as_deref(),
2144 );
2145
2146 Ok(ResolvedIdentity {
2147 harness: hints.harness.or(hints.probe.harness),
2148 provider: hints
2149 .provider
2150 .or(hints.probe.provider)
2151 .or_else(|| current_segment.map(|segment| segment.provider.clone()))
2152 .or_else(|| {
2153 token_claims
2154 .as_ref()
2155 .and_then(|claims| claims.agent_provider.clone())
2156 })
2157 .or_else(|| harness_override.and_then(|entry| entry.provider.clone()))
2158 .or_else(|| user_config.agent.provider.clone()),
2159 model: hints
2160 .model
2161 .or(hints.probe.model)
2162 .or_else(|| current_segment.map(|segment| segment.model.clone()))
2163 .or_else(|| {
2164 token_claims
2165 .as_ref()
2166 .and_then(|claims| claims.agent_model.clone())
2167 })
2168 .or_else(|| harness_override.and_then(|entry| entry.model.clone()))
2169 .or_else(|| user_config.agent.model.clone()),
2170 thinking_level: hints
2171 .thinking_level
2172 .or(hints.probe.thinking_level)
2173 .or_else(|| harness_override.and_then(|entry| entry.thinking_level.clone())),
2174 policy: hints
2175 .policy
2176 .or(hints.probe.policy)
2177 .or_else(|| current_segment.and_then(|segment| segment.policy_id.clone()))
2178 .or_else(|| harness_override.and_then(|entry| entry.policy.clone()))
2179 .or_else(|| user_config.agent.default_policy.clone()),
2180 })
2181}
2182
2183fn resolved_harness_override<'a>(
2184 user_config: &'a UserConfig,
2185 explicit: Option<&str>,
2186 fingerprint: Option<&str>,
2187) -> Option<&'a UserHarnessOverride> {
2188 explicit
2189 .and_then(|name| user_config.harness.harnesses.get(name))
2190 .or_else(|| fingerprint.and_then(|name| user_config.harness.harnesses.get(name)))
2191}
2192
2193enum AttachTarget {
2194 ExistingSession(objects::object::Session),
2195 CreateNew { _because_claimed: bool },
2196}
2197
2198struct ResolvedAttachment {
2199 target: AttachTarget,
2200 matched_entry: Option<AgentEntry>,
2201 attach_reason: String,
2202 precedence: Vec<String>,
2203 winning_rule: String,
2204}
2205
2206fn resolve_actor_attachment(
2207 registry: &AgentRegistry,
2208 repo: &Repository,
2209 sessions: &mut SessionManager,
2210 input: AttachmentResolutionInput<'_>,
2211) -> Result<ResolvedAttachment> {
2212 let AttachmentResolutionInput {
2213 requested_entry,
2214 explicit_heddle_session_id,
2215 client_instance_id,
2216 probe,
2217 token_claims,
2218 } = input;
2219 let mut precedence = Vec::new();
2220 if let Some(entry) = requested_entry
2221 && let Some(bound_session_id) = entry.heddle_session_id.as_deref()
2222 {
2223 precedence.push(format!(
2224 "explicit-agent-session:{}:matched",
2225 entry.session_id
2226 ));
2227 let session = sessions
2228 .get_session(bound_session_id)?
2229 .ok_or_else(|| anyhow!("session not found: {bound_session_id}"))?;
2230 if !session.is_active() {
2231 return Err(anyhow!("session is not active: {bound_session_id}"));
2232 }
2233 return Ok(ResolvedAttachment {
2234 target: AttachTarget::ExistingSession(session),
2235 matched_entry: Some(entry.clone()),
2236 attach_reason: format!(
2237 "reattached actor {} to existing Heddle session {}",
2238 entry.session_id, bound_session_id
2239 ),
2240 precedence,
2241 winning_rule: "explicit-agent-session".to_string(),
2242 });
2243 }
2244 precedence.push("explicit-agent-session:miss".to_string());
2245
2246 if let Some(session_id) = explicit_heddle_session_id {
2247 precedence.push(format!("explicit-heddle-session:{session_id}:matched"));
2248 ensure_requested_entry_matches_session(requested_entry, session_id)?;
2249 let session = sessions
2250 .get_session(session_id)?
2251 .ok_or_else(|| anyhow!("session not found: {session_id}"))?;
2252 if !session.is_active() {
2253 return Err(anyhow!("session is not active: {session_id}"));
2254 }
2255 return Ok(ResolvedAttachment {
2256 target: AttachTarget::ExistingSession(session),
2257 matched_entry: None,
2258 attach_reason: format!("attached to explicit Heddle session {session_id}"),
2259 precedence,
2260 winning_rule: "explicit-heddle-session".to_string(),
2261 });
2262 }
2263 precedence.push("explicit-heddle-session:miss".to_string());
2264
2265 if client_instance_id.is_none()
2266 && let Some(native_actor_key) = probe.native_actor_key.as_deref()
2267 {
2268 if let Some(entry) = registry.find_active_by_native_actor_key(native_actor_key)?
2269 && claude_actor_compatible(&entry, probe, repo.root())
2270 && let Some(bound_session_id) = entry.heddle_session_id.clone()
2271 {
2272 precedence.push(format!("native-actor-key:{native_actor_key}:matched"));
2273 let session = sessions
2274 .get_session(&bound_session_id)?
2275 .ok_or_else(|| anyhow!("session not found: {bound_session_id}"))?;
2276 if session.is_active() {
2277 return Ok(ResolvedAttachment {
2278 target: AttachTarget::ExistingSession(session),
2279 matched_entry: Some(entry),
2280 attach_reason: format!(
2281 "reattached native actor {} to Heddle session {}",
2282 native_actor_key, bound_session_id
2283 ),
2284 precedence,
2285 winning_rule: "native-actor-key".to_string(),
2286 });
2287 }
2288 }
2289 precedence.push(format!("native-actor-key:{native_actor_key}:miss"));
2290 } else {
2291 precedence.push("native-actor-key:miss".to_string());
2292 }
2293
2294 if let Some(client_instance_id) = client_instance_id {
2295 if let Some(entry) = registry.find_active_by_client_instance_id(client_instance_id)?
2296 && let Some(bound_session_id) = entry.heddle_session_id.clone()
2297 {
2298 precedence.push(format!("client-instance-id:{client_instance_id}:matched"));
2299 let session = sessions
2300 .get_session(&bound_session_id)?
2301 .ok_or_else(|| anyhow!("session not found: {bound_session_id}"))?;
2302 if session.is_active() {
2303 return Ok(ResolvedAttachment {
2304 target: AttachTarget::ExistingSession(session),
2305 matched_entry: Some(entry),
2306 attach_reason: format!(
2307 "reattached client instance {client_instance_id} to Heddle session {bound_session_id}"
2308 ),
2309 precedence,
2310 winning_rule: "client-instance-id".to_string(),
2311 });
2312 }
2313 }
2314 precedence.push(format!("client-instance-id:{client_instance_id}:miss"));
2315 return Ok(ResolvedAttachment {
2316 target: AttachTarget::CreateNew {
2317 _because_claimed: false,
2318 },
2319 matched_entry: None,
2320 attach_reason: format!(
2321 "started new Heddle session for distinct client instance {client_instance_id}"
2322 ),
2323 precedence,
2324 winning_rule: "create-new-session".to_string(),
2325 });
2326 } else {
2327 precedence.push("client-instance-id:miss".to_string());
2328 }
2329
2330 if client_instance_id.is_none() && probe.native_actor_key.is_some() {
2331 precedence.push("native-instance-key:skipped-strong-native-key".to_string());
2332 return Ok(ResolvedAttachment {
2333 target: AttachTarget::CreateNew {
2334 _because_claimed: false,
2335 },
2336 matched_entry: None,
2337 attach_reason:
2338 "started new Heddle session because no compatible native actor match was found"
2339 .to_string(),
2340 precedence,
2341 winning_rule: "create-new-session".to_string(),
2342 });
2343 }
2344
2345 if let Some(native_instance_key) = probe.native_instance_key.as_deref() {
2346 if let Some(entry) =
2347 registry.find_active_by_native_instance_key_at_path(native_instance_key, repo.root())?
2348 && claude_actor_compatible(&entry, probe, repo.root())
2349 && let Some(bound_session_id) = entry.heddle_session_id.clone()
2350 {
2351 precedence.push(format!("native-instance-key:{native_instance_key}:matched"));
2352 let session = sessions
2353 .get_session(&bound_session_id)?
2354 .ok_or_else(|| anyhow!("session not found: {bound_session_id}"))?;
2355 if session.is_active() {
2356 return Ok(ResolvedAttachment {
2357 target: AttachTarget::ExistingSession(session),
2358 matched_entry: Some(entry),
2359 attach_reason: format!(
2360 "reattached native instance {} to Heddle session {}",
2361 native_instance_key, bound_session_id
2362 ),
2363 precedence,
2364 winning_rule: "native-instance-key".to_string(),
2365 });
2366 }
2367 }
2368 precedence.push(format!("native-instance-key:{native_instance_key}:miss"));
2369 } else {
2370 precedence.push("native-instance-key:miss".to_string());
2371 }
2372
2373 if probe.attach_hints.root_actor
2374 && let Some(current) = sessions.get_current_session()?
2375 && current.is_active()
2376 {
2377 let claimed = session_claimed_by_other(
2378 registry,
2379 ¤t.id,
2380 requested_entry,
2381 client_instance_id,
2382 probe.native_actor_key.as_deref(),
2383 )?;
2384 if !claimed {
2385 precedence.push(format!("current-worktree-session:{}:matched", current.id));
2386 return Ok(ResolvedAttachment {
2387 target: AttachTarget::ExistingSession(current.clone()),
2388 matched_entry: None,
2389 attach_reason: format!("attached to active worktree Heddle session {}", current.id),
2390 precedence,
2391 winning_rule: "current-worktree-session".to_string(),
2392 });
2393 }
2394 precedence.push(format!("current-worktree-session:{}:claimed", current.id));
2395 return Ok(ResolvedAttachment {
2396 target: AttachTarget::CreateNew {
2397 _because_claimed: true,
2398 },
2399 matched_entry: None,
2400 attach_reason: "started a new Heddle session because the current session was already claimed by another active actor".to_string(),
2401 precedence,
2402 winning_rule: "create-new-session".to_string(),
2403 });
2404 }
2405 precedence.push("current-worktree-session:miss".to_string());
2406
2407 if let Some(claims) = token_claims
2408 && let Some(token_sid) = claims.sid.as_deref()
2409 && let Some(session) = sessions.get_session(token_sid)?
2410 && session.is_active()
2411 {
2412 let claimed = session_claimed_by_other(
2413 registry,
2414 &session.id,
2415 requested_entry,
2416 client_instance_id,
2417 probe.native_actor_key.as_deref(),
2418 )?;
2419 if !claimed {
2420 precedence.push(format!("token-sid:{token_sid}:matched"));
2421 return Ok(ResolvedAttachment {
2422 target: AttachTarget::ExistingSession(session),
2423 matched_entry: None,
2424 attach_reason: format!(
2425 "attached to Heddle session {token_sid} from auth token sid"
2426 ),
2427 precedence,
2428 winning_rule: "token-sid".to_string(),
2429 });
2430 }
2431 precedence.push(format!("token-sid:{token_sid}:claimed"));
2432 return Ok(ResolvedAttachment {
2433 target: AttachTarget::CreateNew {
2434 _because_claimed: true,
2435 },
2436 matched_entry: None,
2437 attach_reason: "started a new Heddle session because the current session was already claimed by another active actor".to_string(),
2438 precedence,
2439 winning_rule: "create-new-session".to_string(),
2440 });
2441 }
2442 precedence.push("token-sid:miss".to_string());
2443
2444 Ok(ResolvedAttachment {
2445 target: AttachTarget::CreateNew {
2446 _because_claimed: false,
2447 },
2448 matched_entry: None,
2449 attach_reason: "started new Heddle session".to_string(),
2450 precedence,
2451 winning_rule: "create-new-session".to_string(),
2452 })
2453}
2454
2455fn claude_actor_compatible(
2456 entry: &AgentEntry,
2457 probe: &HarnessProbeResult,
2458 repo_root: &Path,
2459) -> bool {
2460 let Some(native_actor_key) = probe.native_actor_key.as_deref() else {
2461 return true;
2462 };
2463 if !native_actor_key.starts_with("claude-code:") {
2464 return true;
2465 }
2466 if native_actor_key.starts_with("claude-code:agent:") {
2467 return entry.native_actor_key.as_deref() == Some(native_actor_key);
2468 }
2469 if let Some(native_instance_key) = probe.native_instance_key.as_deref() {
2470 return entry.native_actor_key.as_deref() == Some(native_actor_key)
2471 && entry.native_instance_key.as_deref() == Some(native_instance_key);
2472 }
2473 let same_repo = entry
2474 .path
2475 .as_ref()
2476 .map(|path| path.canonicalize().unwrap_or_else(|_| path.clone()))
2477 .unwrap_or_default()
2478 == repo_root
2479 .canonicalize()
2480 .unwrap_or_else(|_| repo_root.to_path_buf());
2481 entry.native_actor_key.as_deref() == Some(native_actor_key)
2482 && same_repo
2483 && probe.confidence.unwrap_or_default() >= 0.9
2484}
2485
2486fn decode_token_claims(token: &str) -> Option<TokenClaims> {
2487 let payload = token.split('.').nth(1)?;
2488 let decoded = base64::engine::general_purpose::URL_SAFE_NO_PAD
2489 .decode(payload.as_bytes())
2490 .ok()?;
2491 serde_json::from_slice(&decoded).ok()
2492}
2493
2494fn user_config_token_claims(user_config: &UserConfig) -> Option<TokenClaims> {
2495 user_config
2496 .remote_token()
2497 .ok()
2498 .flatten()
2499 .and_then(|token| decode_token_claims(&token.id))
2500}
2501
2502#[derive(Debug, Deserialize)]
2503struct TokenClaims {
2504 #[serde(default)]
2505 sid: Option<String>,
2506 #[serde(default)]
2507 agent_provider: Option<String>,
2508 #[serde(default)]
2509 agent_model: Option<String>,
2510}
2511
2512fn should_rotate_segment(session: &objects::object::Session, identity: &ResolvedIdentity) -> bool {
2513 let Some(segment) = session.current_segment() else {
2514 return false;
2515 };
2516 let provider_changed = identity
2517 .provider
2518 .as_deref()
2519 .is_some_and(|provider| provider != segment.provider);
2520 let model_changed = identity
2521 .model
2522 .as_deref()
2523 .is_some_and(|model| model != segment.model);
2524 provider_changed || model_changed
2525}
2526
2527fn thread_id_for_name(repo: &Repository, thread_name: Option<&str>) -> Result<Option<String>> {
2528 let Some(thread_name) = thread_name else {
2529 return Ok(None);
2530 };
2531 Ok(ThreadManager::new(repo.heddle_dir())
2532 .load(thread_name)?
2533 .map(|thread| thread.id))
2534}
2535
2536fn can_create_harness_thread(
2537 repo: &Repository,
2538 target_thread: Option<&str>,
2539 parent_thread: Option<&str>,
2540) -> Result<bool> {
2541 Ok(resolve_harness_thread_base_state(repo, target_thread, parent_thread)?.is_some())
2542}
2543
2544fn resolve_harness_thread_base_state(
2545 repo: &Repository,
2546 target_thread: Option<&str>,
2547 parent_thread: Option<&str>,
2548) -> Result<Option<objects::object::ChangeId>> {
2549 if let Some(head_state) = repo.head()? {
2550 return Ok(Some(head_state));
2551 }
2552
2553 for thread_name in [parent_thread, target_thread].into_iter().flatten() {
2554 if let Some(state) = resolve_named_thread_base_state(repo, thread_name)? {
2555 return Ok(Some(state));
2556 }
2557 }
2558
2559 Ok(None)
2560}
2561
2562fn resolve_named_thread_base_state(
2563 repo: &Repository,
2564 thread_name: &str,
2565) -> Result<Option<objects::object::ChangeId>> {
2566 if let Some(thread) = ThreadManager::new(repo.heddle_dir()).load(thread_name)?
2567 && let Some(state_spec) = thread
2568 .current_state
2569 .as_deref()
2570 .or(Some(thread.base_state.as_str()))
2571 && let Some(state_id) = repo
2572 .resolve_state(state_spec)?
2573 .or_else(|| objects::object::ChangeId::parse(state_spec).ok())
2574 {
2575 return Ok(Some(state_id));
2576 }
2577
2578 Ok(repo.refs().get_thread(&ThreadName::new(thread_name))?)
2579}
2580
2581fn resolve_parent_thread_for_subagent(
2582 repo: &Repository,
2583 probe: &HarnessProbeResult,
2584 current_attached: Option<&str>,
2585) -> Result<Option<String>> {
2586 if let Some(parent_key) = probe.native_parent_actor_key.as_deref() {
2587 let registry = AgentRegistry::new(repo.heddle_dir());
2588 if let Some(entry) = registry.find_active_by_native_actor_key(parent_key)? {
2589 return Ok(Some(entry.thread));
2590 }
2591 }
2592 Ok(current_attached.map(ToString::to_string))
2593}
2594
2595fn preferred_thread_slug(
2596 params: &OpenSessionParams,
2597 probe: &HarnessProbeResult,
2598 identity: &ResolvedIdentity,
2599) -> String {
2600 params
2601 .task
2602 .clone()
2603 .or_else(|| params.summary.clone())
2604 .or_else(|| probe.native_actor_key.as_deref().map(native_key_slug))
2605 .or_else(|| probe.native_instance_key.as_deref().map(native_key_slug))
2606 .or_else(|| identity.harness.clone())
2607 .unwrap_or_else(|| "work".to_string())
2608}
2609
2610fn native_key_slug(value: &str) -> String {
2611 value
2612 .rsplit(':')
2613 .next()
2614 .map(ToString::to_string)
2615 .unwrap_or_else(|| value.to_string())
2616}
2617
2618fn allocate_thread_name(repo: &Repository, base: &str) -> Result<String> {
2619 if ThreadManager::new(repo.heddle_dir()).load(base)?.is_none()
2620 && repo.refs().get_thread(&ThreadName::new(base))?.is_none()
2621 {
2622 return Ok(base.to_string());
2623 }
2624 for idx in 2..1000 {
2625 let candidate = format!("{base}-{idx}");
2626 if ThreadManager::new(repo.heddle_dir())
2627 .load(&candidate)?
2628 .is_none()
2629 && repo
2630 .refs()
2631 .get_thread(&ThreadName::new(&candidate))?
2632 .is_none()
2633 {
2634 return Ok(candidate);
2635 }
2636 }
2637 Err(anyhow!(
2638 "could not allocate a unique thread name from '{base}'"
2639 ))
2640}
2641
2642fn default_private_thread_path(repo: &Repository, name: &str) -> PathBuf {
2643 repo.managed_checkout_path(name)
2651}
2652
2653fn sanitize_name(name: &str) -> String {
2654 let mut out = String::new();
2655 let mut last_dash = false;
2656 for ch in name.chars() {
2657 if ch.is_ascii_alphanumeric() {
2658 out.push(ch.to_ascii_lowercase());
2659 last_dash = false;
2660 } else if !last_dash {
2661 out.push('-');
2662 last_dash = true;
2663 }
2664 }
2665 out.trim_matches('-').to_string()
2666}
2667
2668fn resolve_requested_registry_entry(
2669 registry: &AgentRegistry,
2670 agent_session_id: Option<&str>,
2671 client_instance_id: Option<&str>,
2672) -> Result<Option<AgentEntry>> {
2673 if let Some(agent_session_id) = agent_session_id {
2674 let entry = registry
2675 .load(agent_session_id)?
2676 .ok_or_else(|| anyhow!("agent session not found: {agent_session_id}"))?;
2677 if entry.status != AgentStatus::Active {
2678 return Err(anyhow!("agent session is not active: {agent_session_id}"));
2679 }
2680 return Ok(Some(entry));
2681 }
2682
2683 if let Some(client_instance_id) = client_instance_id {
2684 return Ok(registry.find_active_by_client_instance_id(client_instance_id)?);
2685 }
2686
2687 Ok(None)
2688}
2689
2690fn ensure_requested_entry_matches_session(
2691 requested_entry: Option<&AgentEntry>,
2692 heddle_session_id: &str,
2693) -> Result<()> {
2694 if let Some(entry) = requested_entry
2695 && let Some(bound_session_id) = entry.heddle_session_id.as_deref()
2696 && bound_session_id != heddle_session_id
2697 {
2698 return Err(anyhow!(
2699 "requested agent is already bound to a different heddle session: {}",
2700 entry.session_id
2701 ));
2702 }
2703 Ok(())
2704}
2705
2706fn session_claimed_by_other(
2707 registry: &AgentRegistry,
2708 heddle_session_id: &str,
2709 requested_entry: Option<&AgentEntry>,
2710 client_instance_id: Option<&str>,
2711 native_actor_key: Option<&str>,
2712) -> Result<bool> {
2713 if requested_entry.is_none() && client_instance_id.is_none() && native_actor_key.is_none() {
2714 return Ok(false);
2715 }
2716
2717 let Some(existing) = registry.find_active_by_heddle_session_id(heddle_session_id)? else {
2718 return Ok(false);
2719 };
2720 if let Some(requested) = requested_entry {
2721 return Ok(requested.session_id != existing.session_id);
2722 }
2723 if let Some(client_instance_id) = client_instance_id
2724 && existing.client_instance_id.as_deref() == Some(client_instance_id)
2725 {
2726 return Ok(false);
2727 }
2728 if let Some(native_actor_key) = native_actor_key
2729 && existing.native_actor_key.as_deref() == Some(native_actor_key)
2730 {
2731 return Ok(false);
2732 }
2733 Ok(true)
2734}
2735
2736fn find_matching_registry_entry(
2737 registry: &AgentRegistry,
2738 repo: &Repository,
2739 heddle_session_id: &str,
2740 thread_name: Option<&str>,
2741) -> Result<Option<AgentEntry>> {
2742 if let Some(entry) = registry.find_active_by_heddle_session_id(heddle_session_id)? {
2743 return Ok(Some(entry));
2744 }
2745 let canonical_root = repo
2746 .root()
2747 .canonicalize()
2748 .unwrap_or_else(|_| repo.root().to_path_buf());
2749 Ok(registry
2750 .list()?
2751 .into_iter()
2752 .filter(|entry| entry.status == AgentStatus::Active)
2753 .find(|entry| {
2754 entry
2755 .path
2756 .as_ref()
2757 .map(|path| path.canonicalize().unwrap_or_else(|_| path.clone()) == canonical_root)
2758 .unwrap_or(false)
2759 || thread_name.is_some_and(|thread| entry.thread == thread)
2760 }))
2761}
2762
2763fn merged_env_hints(extra: &BTreeMap<String, String>) -> BTreeMap<String, String> {
2764 let mut merged: BTreeMap<String, String> = std::env::vars()
2765 .filter(|(key, _)| inherited_harness_hint(key))
2766 .collect();
2767 for (key, value) in extra {
2768 merged.insert(key.clone(), value.clone());
2769 }
2770 merged
2771}
2772
2773fn inherited_harness_hint(key: &str) -> bool {
2774 if matches!(
2775 key,
2776 "OPENAI_MODEL"
2777 | "ANTHROPIC_MODEL"
2778 | "CLAUDE_MODEL"
2779 | "MODEL"
2780 | "OPENAI_REASONING_EFFORT"
2781 | "REASONING_EFFORT"
2782 | "THINKING_LEVEL"
2783 | "PROMPT_POLICY"
2784 ) {
2785 return false;
2786 }
2787
2788 key.starts_with("HEDDLE_")
2789 || key.starts_with("CODEX_")
2790 || key == "CLAUDECODE"
2791 || key.starts_with("OPENCODE_")
2792}
2793
2794fn to_json_value<T: Serialize>(value: T) -> Result<Value> {
2795 serde_json::to_value(value).map_err(|err| anyhow!(err))
2796}
2797
2798fn normalize_paths<I>(paths: I) -> Vec<String>
2799where
2800 I: IntoIterator<Item = String>,
2801{
2802 let mut ordered = BTreeSet::new();
2803 for path in paths {
2804 let normalized = path.trim().replace('\\', "/");
2805 if !normalized.is_empty() {
2806 ordered.insert(normalized);
2807 }
2808 }
2809 ordered.into_iter().collect()
2810}
2811
2812fn merge_unique_paths<I>(target: &mut Vec<String>, paths: I)
2813where
2814 I: IntoIterator<Item = String>,
2815{
2816 let mut merged: BTreeSet<String> = target.iter().cloned().collect();
2817 merged.extend(paths);
2818 *target = merged.into_iter().collect();
2819}
2820
2821fn max_u64(current: Option<u64>, candidate: u64) -> u64 {
2822 current
2823 .map(|value| value.max(candidate))
2824 .unwrap_or(candidate)
2825}
2826
2827fn max_u32(current: Option<u32>, candidate: u32) -> u32 {
2828 current
2829 .map(|value| value.max(candidate))
2830 .unwrap_or(candidate)
2831}
2832
2833fn merge_usage(target: &mut UsageTotals, incoming: &UsageTotals) {
2834 if let Some(input) = incoming.input_tokens {
2835 target.input_tokens = Some(max_u64(target.input_tokens, input));
2836 }
2837 if let Some(output) = incoming.output_tokens {
2838 target.output_tokens = Some(max_u64(target.output_tokens, output));
2839 }
2840 if let Some(reasoning) = incoming.reasoning_tokens {
2841 target.reasoning_tokens = Some(max_u64(target.reasoning_tokens, reasoning));
2842 }
2843 if let Some(cache_creation) = incoming.cache_creation_tokens {
2844 target.cache_creation_tokens = Some(max_u64(target.cache_creation_tokens, cache_creation));
2845 }
2846 if let Some(cache_read) = incoming.cache_read_tokens {
2847 target.cache_read_tokens = Some(max_u64(target.cache_read_tokens, cache_read));
2848 }
2849 if let Some(tool_calls) = incoming.tool_calls {
2850 target.tool_calls = Some(max_u32(target.tool_calls, tool_calls));
2851 }
2852 if let Some(cost) = incoming.cost_micros_usd {
2853 target.cost_micros_usd = Some(max_u64(target.cost_micros_usd, cost));
2854 }
2855}
2856
2857fn parse_timestamp(value: &str) -> Option<chrono::DateTime<Utc>> {
2858 chrono::DateTime::parse_from_rfc3339(value)
2859 .ok()
2860 .map(|dt| dt.with_timezone(&Utc))
2861}
2862
2863fn transport_from_report(
2864 report: &SessionReportEnvelope,
2865 fallback: HarnessTransport,
2866) -> HarnessTransport {
2867 match report.transport_mode.as_str() {
2868 "spool" => HarnessTransport::Spool,
2869 "direct" => HarnessTransport::Direct,
2870 "end" => HarnessTransport::End,
2871 _ => fallback,
2872 }
2873}
2874
2875fn mark_pending_flush(report: &mut SessionReportEnvelope) {
2876 report.pending_flush = true;
2877 report.report_flush_state = Some("pending-local".to_string());
2878}
2879
2880fn enqueue_report(store: &SessionReportStore, report: &mut SessionReportEnvelope) -> Result<()> {
2881 store.append_outbox(report)?;
2882 report.pending_flush = false;
2883 let flushed_at = Utc::now().to_rfc3339();
2884 report.last_flushed_at = Some(flushed_at);
2885 report.report_flush_state = Some("queued-local".to_string());
2886 store.save(report)?;
2887 Ok(())
2888}
2889
2890fn usage_to_summary(usage: &UsageTotals) -> AgentUsageSummary {
2891 AgentUsageSummary {
2892 input_tokens: usage.input_tokens,
2893 output_tokens: usage.output_tokens,
2894 reasoning_tokens: usage.reasoning_tokens,
2895 tool_calls: usage.tool_calls,
2896 cost_micros_usd: usage.cost_micros_usd,
2897 }
2898}
2899
2900fn transcript_mode_name(mode: HarnessTranscriptMode) -> &'static str {
2901 match mode {
2902 HarnessTranscriptMode::Off => "off",
2903 HarnessTranscriptMode::Summary => "summary",
2904 HarnessTranscriptMode::Full => "full",
2905 }
2906}
2907
2908fn transport_mode_name(mode: HarnessTransport) -> &'static str {
2909 match mode {
2910 HarnessTransport::Spool => "spool",
2911 HarnessTransport::Direct => "direct",
2912 HarnessTransport::End => "end",
2913 }
2914}
2915
2916struct FinalDiff {
2917 changed_paths: Vec<String>,
2918 diff_summary: SessionDiffSummary,
2919 head_state: Option<String>,
2920}
2921
2922fn compute_final_diff(
2923 repo: &Repository,
2924 base_state: Option<&str>,
2925 worktree_baseline: &[WorktreeChangeBaseline],
2926) -> Result<FinalDiff> {
2927 let mut changes: BTreeMap<String, DiffKind> = BTreeMap::new();
2928
2929 let head_state = repo.head()?;
2930 if let (Some(base_spec), Some(head_id)) = (base_state, head_state) {
2931 let base_id = repo
2932 .resolve_state(base_spec)?
2933 .or_else(|| objects::object::ChangeId::parse(base_spec).ok());
2934 if let Some(base_id) = base_id
2935 && base_id != head_id
2936 {
2937 let Some(base_state_obj) = repo.store().get_state(&base_id)? else {
2938 return Err(anyhow!("base state not found: {base_spec}"));
2939 };
2940 let Some(head_state_obj) = repo.store().get_state(&head_id)? else {
2941 return Err(anyhow!("head state not found: {}", head_id.short()));
2942 };
2943 for change in repo.diff_trees(&base_state_obj.tree, &head_state_obj.tree)? {
2944 changes.insert(change.path, change.kind);
2945 }
2946 }
2947 }
2948
2949 let baseline_paths: BTreeSet<(String, String)> = worktree_baseline
2950 .iter()
2951 .map(|change| (change.path.clone(), change.kind.clone()))
2952 .collect();
2953 for (path, kind) in collect_worktree_changes(repo)? {
2954 let kind_name = diff_kind_name(kind);
2955 if !baseline_paths.contains(&(path.clone(), kind_name.to_string())) {
2956 changes.insert(path, kind);
2957 }
2958 }
2959
2960 let diff_summary = SessionDiffSummary {
2961 changed_file_count: changes.len() as u32,
2962 added_files: changes
2963 .values()
2964 .filter(|kind| **kind == DiffKind::Added)
2965 .count() as u32,
2966 modified_files: changes
2967 .values()
2968 .filter(|kind| **kind == DiffKind::Modified)
2969 .count() as u32,
2970 deleted_files: changes
2971 .values()
2972 .filter(|kind| **kind == DiffKind::Deleted)
2973 .count() as u32,
2974 };
2975
2976 Ok(FinalDiff {
2977 changed_paths: changes.into_keys().collect(),
2978 diff_summary,
2979 head_state: head_state.map(|id| id.to_string_full()),
2980 })
2981}
2982
2983fn capture_worktree_change_snapshot(repo: &Repository) -> Result<Vec<WorktreeChangeBaseline>> {
2984 Ok(collect_worktree_changes(repo)?
2985 .into_iter()
2986 .map(|(path, kind)| WorktreeChangeBaseline {
2987 path,
2988 kind: diff_kind_name(kind).to_string(),
2989 })
2990 .collect())
2991}
2992
2993fn collect_worktree_changes(repo: &Repository) -> Result<BTreeMap<String, DiffKind>> {
2994 let status_options = worktree_status_options(Some(repo.config()));
2995 let worktree_tree = match repo.current_state()? {
2996 Some(state) => repo.require_tree(&state.tree)?,
2997 None => Tree::new(),
2998 };
2999 let status = repo.compare_worktree_cached_with_options(&worktree_tree, &status_options)?;
3000 let mut changes = BTreeMap::new();
3001 for path in status.added {
3002 changes.insert(path.display().to_string(), DiffKind::Added);
3003 }
3004 for path in status.modified {
3005 changes.insert(path.display().to_string(), DiffKind::Modified);
3006 }
3007 for path in status.deleted {
3008 changes.insert(path.display().to_string(), DiffKind::Deleted);
3009 }
3010 Ok(changes)
3011}
3012
3013fn changed_paths_between_states(
3014 repo: &Repository,
3015 before_state: ChangeId,
3016 after_state: ChangeId,
3017) -> Result<Vec<String>> {
3018 if before_state == after_state {
3019 return Ok(Vec::new());
3020 }
3021 let Some(before_state_obj) = repo.store().get_state(&before_state)? else {
3022 return Err(anyhow!(
3023 "timeline before state not found: {}",
3024 before_state.short()
3025 ));
3026 };
3027 let Some(after_state_obj) = repo.store().get_state(&after_state)? else {
3028 return Err(anyhow!(
3029 "timeline after state not found: {}",
3030 after_state.short()
3031 ));
3032 };
3033 let mut paths = BTreeSet::new();
3034 for change in repo.diff_trees(&before_state_obj.tree, &after_state_obj.tree)? {
3035 paths.insert(change.path);
3036 }
3037 Ok(paths.into_iter().collect())
3038}
3039
3040fn diff_kind_name(kind: DiffKind) -> &'static str {
3041 match kind {
3042 DiffKind::Added => "added",
3043 DiffKind::Modified => "modified",
3044 DiffKind::Deleted => "deleted",
3045 DiffKind::Unchanged => "unchanged",
3046 }
3047}
3048
3049struct SessionReportStore {
3050 dir: PathBuf,
3051}
3052
3053impl SessionReportStore {
3054 fn new(repo_root: &Path) -> Self {
3055 Self {
3056 dir: repo_root.join(".heddle/state").join("session-reports"),
3057 }
3058 }
3059
3060 fn session_path(&self, heddle_session_id: &str) -> PathBuf {
3061 self.dir.join(format!("{heddle_session_id}.json"))
3062 }
3063
3064 fn outbox_path(&self) -> PathBuf {
3065 self.dir.join("outbox.jsonl")
3066 }
3067
3068 fn load(&self, heddle_session_id: &str) -> Result<Option<SessionReportEnvelope>> {
3069 let path = self.session_path(heddle_session_id);
3070 if !path.exists() {
3071 return Ok(None);
3072 }
3073 let bytes = fs::read(path)?;
3074 Ok(Some(serde_json::from_slice(&bytes)?))
3075 }
3076
3077 fn save(&self, report: &SessionReportEnvelope) -> Result<()> {
3078 fs::create_dir_all(&self.dir)?;
3079 let path = self.session_path(&report.heddle_session_id);
3080 let bytes = serde_json::to_vec_pretty(report)?;
3081 write_file_atomic(&path, &bytes)?;
3082 Ok(())
3083 }
3084
3085 fn append_outbox(&self, report: &SessionReportEnvelope) -> Result<()> {
3086 fs::create_dir_all(&self.dir)?;
3087 let mut file = OpenOptions::new()
3088 .create(true)
3089 .append(true)
3090 .open(self.outbox_path())?;
3091 serde_json::to_writer(&mut file, report)?;
3092 file.write_all(b"\n")?;
3093 file.flush()?;
3094 Ok(())
3095 }
3096
3097 fn list_pending(&self) -> Result<Vec<String>> {
3098 if !self.dir.exists() {
3099 return Ok(Vec::new());
3100 }
3101 let mut ids = Vec::new();
3102 for entry in fs::read_dir(&self.dir)? {
3103 let entry = entry?;
3104 let path = entry.path();
3105 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
3106 continue;
3107 }
3108 let bytes = fs::read(&path)?;
3109 let report: SessionReportEnvelope = serde_json::from_slice(&bytes)?;
3110 if report.pending_flush {
3111 ids.push(report.heddle_session_id);
3112 }
3113 }
3114 ids.sort();
3115 Ok(ids)
3116 }
3117}
3118
3119#[derive(Debug, Deserialize)]
3120struct BridgeRequest {
3121 #[serde(default)]
3122 id: Option<String>,
3123 method: String,
3124 #[serde(default)]
3125 params: Value,
3126}
3127
3128#[derive(Debug, Serialize)]
3129struct BridgeResponse {
3130 #[serde(default)]
3131 id: Option<String>,
3132 ok: bool,
3133 #[serde(skip_serializing_if = "Option::is_none")]
3134 result: Option<Value>,
3135 #[serde(skip_serializing_if = "Option::is_none")]
3136 error: Option<BridgeError>,
3137}
3138
3139impl BridgeResponse {
3140 fn ok(id: Option<String>, result: Value) -> Self {
3141 Self {
3142 id,
3143 ok: true,
3144 result: Some(result),
3145 error: None,
3146 }
3147 }
3148
3149 fn error(id: Option<String>, code: impl Into<String>, message: impl Into<String>) -> Self {
3150 Self {
3151 id,
3152 ok: false,
3153 result: None,
3154 error: Some(BridgeError {
3155 code: code.into(),
3156 message: message.into(),
3157 }),
3158 }
3159 }
3160}
3161
3162#[derive(Debug, Serialize)]
3163struct BridgeError {
3164 code: String,
3165 message: String,
3166}
3167
3168#[derive(Debug, Clone, Deserialize, Default)]
3169struct OpenSessionParams {
3170 #[serde(default)]
3171 heddle_session_id: Option<String>,
3172 #[serde(default)]
3173 agent_session_id: Option<String>,
3174 #[serde(default)]
3175 client_instance_id: Option<String>,
3176 #[serde(default)]
3177 thread: Option<String>,
3178 #[serde(default)]
3179 task: Option<String>,
3180 #[serde(default)]
3181 summary: Option<String>,
3182 #[serde(default)]
3183 harness: Option<String>,
3184 #[serde(default)]
3185 provider: Option<String>,
3186 #[serde(default)]
3187 model: Option<String>,
3188 #[serde(default)]
3189 thinking_level: Option<String>,
3190 #[serde(default)]
3191 policy: Option<String>,
3192 #[serde(default)]
3193 transport: Option<HarnessTransport>,
3194 #[serde(default)]
3195 transcript_mode: Option<HarnessTranscriptMode>,
3196 #[serde(default)]
3197 argv: Option<Vec<String>>,
3198 #[serde(default)]
3199 env_hints: BTreeMap<String, String>,
3200 #[serde(default)]
3201 probe_metadata: BTreeMap<String, String>,
3202}
3203
3204#[derive(Debug, Clone, Deserialize, Default)]
3205struct UpdateProgressParams {
3206 heddle_session_id: String,
3207 #[serde(default)]
3208 status: Option<String>,
3209 #[serde(default)]
3210 message: Option<String>,
3211 #[serde(default)]
3212 completed_steps: Option<u32>,
3213 #[serde(default)]
3214 total_steps: Option<u32>,
3215 #[serde(default)]
3216 touched_paths: Vec<String>,
3217 #[serde(default)]
3218 summary: Option<String>,
3219 #[serde(default)]
3220 harness: Option<String>,
3221 #[serde(default)]
3222 provider: Option<String>,
3223 #[serde(default)]
3224 model: Option<String>,
3225 #[serde(default)]
3226 thinking_level: Option<String>,
3227 #[serde(default)]
3228 policy: Option<String>,
3229 #[serde(default)]
3230 argv: Option<Vec<String>>,
3231 #[serde(default)]
3232 env_hints: BTreeMap<String, String>,
3233 #[serde(default)]
3234 probe_metadata: BTreeMap<String, String>,
3235}
3236
3237#[derive(Debug, Clone, Deserialize, Default)]
3238struct RecordUsageParams {
3239 heddle_session_id: String,
3240 #[serde(default)]
3241 input_tokens: Option<u64>,
3242 #[serde(default)]
3243 output_tokens: Option<u64>,
3244 #[serde(default)]
3245 reasoning_tokens: Option<u64>,
3246 #[serde(default)]
3247 cache_creation_tokens: Option<u64>,
3248 #[serde(default)]
3249 cache_read_tokens: Option<u64>,
3250 #[serde(default)]
3251 tool_calls: Option<u32>,
3252 #[serde(default)]
3253 cost_micros_usd: Option<u64>,
3254}
3255
3256#[derive(Debug, Clone, Deserialize, Default)]
3257struct RecordTouchedPathsParams {
3258 heddle_session_id: String,
3259 #[serde(default)]
3260 paths: Vec<String>,
3261}
3262
3263#[derive(Debug, Clone, Deserialize, Default)]
3264struct CloseSessionParams {
3265 heddle_session_id: String,
3266 #[serde(default)]
3267 outcome: Option<String>,
3268 #[serde(default)]
3269 summary: Option<String>,
3270 #[serde(default)]
3271 transcript_refs: Option<Vec<TranscriptAttachmentRef>>,
3272 #[serde(default)]
3273 transport: Option<HarnessTransport>,
3274}
3275
3276#[derive(Debug, Clone, Deserialize, Default)]
3277struct FlushReportsParams {
3278 #[serde(default)]
3279 heddle_session_id: Option<String>,
3280}
3281
3282#[derive(Debug, Serialize)]
3283struct OpenSessionResult {
3284 heddle_session_id: String,
3285 heddle_segment_id: Option<String>,
3286 agent_session_id: Option<String>,
3287 created_session: bool,
3288 harness: Option<String>,
3289 provider: Option<String>,
3290 model: Option<String>,
3291 thinking_level: Option<String>,
3292 report_flush_state: Option<String>,
3293 attach_reason: Option<String>,
3294}
3295
3296#[derive(Debug, Serialize)]
3297struct SessionMutationResult {
3298 heddle_session_id: String,
3299 heddle_segment_id: Option<String>,
3300 report_flush_state: Option<String>,
3301}
3302
3303#[derive(Debug, Serialize)]
3304struct CloseSessionResult {
3305 heddle_session_id: String,
3306 changed_paths: Vec<String>,
3307 diff_summary: SessionDiffSummary,
3308 report_flush_state: Option<String>,
3309}
3310
3311#[derive(Debug, Serialize)]
3312struct FlushReportsResult {
3313 flushed: usize,
3314}
3315
3316#[cfg(test)]
3317mod tests {
3318 use super::*;
3319 #[cfg(unix)]
3320 use std::os::unix::fs::PermissionsExt;
3321
3322 fn init_repo() -> (tempfile::TempDir, Repository) {
3323 let temp = tempfile::TempDir::new().unwrap();
3324 let repo = Repository::init_default(temp.path()).unwrap();
3325 (temp, repo)
3326 }
3327
3328 #[test]
3329 fn harness_config_load_missing_path_defaults_without_warning() {
3330 let temp = tempfile::TempDir::new().unwrap();
3331 let missing = temp.path().join("missing-config.toml");
3332
3333 let (config, warning) = load_harness_user_config(Some(missing));
3334
3335 assert_eq!(config.harness.transport, HarnessTransport::Spool);
3336 assert!(warning.is_none());
3337 }
3338
3339 #[test]
3340 fn harness_config_load_malformed_path_warns_and_defaults() {
3341 let temp = tempfile::TempDir::new().unwrap();
3342 let path = temp.path().join("config.toml");
3343 std::fs::write(&path, "[harness\ntransport = \"direct\"\n").unwrap();
3344
3345 let (config, warning) = load_harness_user_config(Some(path.clone()));
3346
3347 assert_eq!(config.harness.transport, HarnessTransport::Spool);
3348 let warning = warning.expect("malformed config should produce a warning");
3349 assert!(warning.contains("failed to load user config"));
3350 assert!(warning.contains(&path.display().to_string()));
3351 assert!(warning.contains("continuing with defaults"));
3352 }
3353
3354 #[test]
3355 fn harness_config_load_valid_path_loads_without_warning() {
3356 let temp = tempfile::TempDir::new().unwrap();
3357 let path = temp.path().join("config.toml");
3358 std::fs::write(
3359 &path,
3360 "[harness]\ntransport = \"direct\"\ntranscript = \"summary\"\n",
3361 )
3362 .unwrap();
3363
3364 let (config, warning) = load_harness_user_config(Some(path));
3365
3366 assert_eq!(config.harness.transport, HarnessTransport::Direct);
3367 assert_eq!(config.harness.transcript, HarnessTranscriptMode::Summary);
3368 assert!(warning.is_none());
3369 }
3370
3371 #[test]
3372 fn relay_payload_parse_invalid_json_warns_and_uses_null() {
3373 let (value, warning) = parse_relay_payload("{not-json");
3374
3375 assert_eq!(value, Value::Null);
3376 let warning = warning.expect("invalid JSON should produce a warning");
3377 assert!(warning.contains("failed to parse harness relay payload as JSON"));
3378 assert!(warning.contains("continuing with null payload"));
3379 }
3380
3381 #[test]
3382 fn relay_payload_parse_empty_payload_uses_null_without_warning() {
3383 let (value, warning) = parse_relay_payload(" \n");
3384
3385 assert_eq!(value, Value::Null);
3386 assert!(warning.is_none());
3387 }
3388
3389 #[test]
3390 fn relay_payload_parse_valid_json_without_warning() {
3391 let (value, warning) = parse_relay_payload(r#"{"message":"hello"}"#);
3392
3393 assert_eq!(value["message"], "hello");
3394 assert!(warning.is_none());
3395 }
3396
3397 #[test]
3405 fn harness_default_path_matches_canonical_thread_dir() {
3406 let (_temp, repo) = init_repo();
3407 for id in ["foo", "parent/task", "feature/foo", "team@scope"] {
3408 let harness_path = default_private_thread_path(&repo, id);
3409 let canonical = repo.managed_checkout_path(id);
3410 assert_eq!(
3411 harness_path, canonical,
3412 "harness default must match the canonical thread_dir for {id:?}"
3413 );
3414 }
3415 }
3416
3417 #[test]
3418 fn inherited_harness_hints_exclude_ambient_model_identity() {
3419 assert!(!inherited_harness_hint("OPENAI_MODEL"));
3420 assert!(!inherited_harness_hint("ANTHROPIC_MODEL"));
3421 assert!(!inherited_harness_hint("CLAUDE_MODEL"));
3422 assert!(!inherited_harness_hint("MODEL"));
3423 assert!(!inherited_harness_hint("OPENAI_REASONING_EFFORT"));
3424 assert!(inherited_harness_hint("HEDDLE_AGENT_MODEL"));
3425 assert!(inherited_harness_hint("CODEX_SANDBOX"));
3426 assert!(inherited_harness_hint("CLAUDECODE"));
3427 }
3428
3429 #[test]
3430 fn open_session_creates_or_attaches() {
3431 let (_temp, repo) = init_repo();
3432 let user_config = UserConfig::default();
3433 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3434
3435 let created = runtime
3436 .open_session(OpenSessionParams {
3437 harness: Some("codex".to_string()),
3438 provider: Some("openai".to_string()),
3439 model: Some("gpt-5.4".to_string()),
3440 ..OpenSessionParams::default()
3441 })
3442 .unwrap();
3443 assert!(created.created_session);
3444
3445 let attached = runtime
3446 .open_session(OpenSessionParams {
3447 harness: Some("codex".to_string()),
3448 provider: Some("openai".to_string()),
3449 model: Some("gpt-5.4".to_string()),
3450 ..OpenSessionParams::default()
3451 })
3452 .unwrap();
3453 assert!(!attached.created_session);
3454 assert_eq!(created.heddle_session_id, attached.heddle_session_id);
3455 }
3456
3457 #[test]
3458 fn same_client_instance_reattaches_to_its_existing_session() {
3459 let (_temp, repo) = init_repo();
3460 let user_config = UserConfig::default();
3461 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3462
3463 let first = runtime
3464 .open_session(OpenSessionParams {
3465 client_instance_id: Some("client-a".to_string()),
3466 harness: Some("codex".to_string()),
3467 provider: Some("openai".to_string()),
3468 model: Some("gpt-5.4".to_string()),
3469 ..OpenSessionParams::default()
3470 })
3471 .unwrap();
3472 let second = runtime
3473 .open_session(OpenSessionParams {
3474 client_instance_id: Some("client-b".to_string()),
3475 harness: Some("codex".to_string()),
3476 provider: Some("openai".to_string()),
3477 model: Some("gpt-5.4".to_string()),
3478 ..OpenSessionParams::default()
3479 })
3480 .unwrap();
3481 let reopened = runtime
3482 .open_session(OpenSessionParams {
3483 client_instance_id: Some("client-a".to_string()),
3484 harness: Some("codex".to_string()),
3485 provider: Some("openai".to_string()),
3486 model: Some("gpt-5.4".to_string()),
3487 ..OpenSessionParams::default()
3488 })
3489 .unwrap();
3490
3491 assert_ne!(first.heddle_session_id, second.heddle_session_id);
3492 assert_eq!(first.heddle_session_id, reopened.heddle_session_id);
3493 assert_eq!(first.agent_session_id, reopened.agent_session_id);
3494 }
3495
3496 #[test]
3497 fn different_client_instances_do_not_share_the_current_session() {
3498 let (_temp, repo) = init_repo();
3499 let user_config = UserConfig::default();
3500 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3501
3502 let first = runtime
3503 .open_session(OpenSessionParams {
3504 client_instance_id: Some("client-a".to_string()),
3505 harness: Some("codex".to_string()),
3506 provider: Some("openai".to_string()),
3507 model: Some("gpt-5.4".to_string()),
3508 ..OpenSessionParams::default()
3509 })
3510 .unwrap();
3511 let second = runtime
3512 .open_session(OpenSessionParams {
3513 client_instance_id: Some("client-b".to_string()),
3514 harness: Some("codex".to_string()),
3515 provider: Some("openai".to_string()),
3516 model: Some("gpt-5.4".to_string()),
3517 ..OpenSessionParams::default()
3518 })
3519 .unwrap();
3520
3521 assert_ne!(first.heddle_session_id, second.heddle_session_id);
3522 assert_ne!(first.agent_session_id, second.agent_session_id);
3523 }
3524
3525 #[test]
3526 fn provider_model_change_creates_segment() {
3527 let (_temp, repo) = init_repo();
3528 let user_config = UserConfig::default();
3529 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3530
3531 let opened = runtime
3532 .open_session(OpenSessionParams {
3533 harness: Some("claude-code".to_string()),
3534 provider: Some("anthropic".to_string()),
3535 model: Some("claude-sonnet".to_string()),
3536 ..OpenSessionParams::default()
3537 })
3538 .unwrap();
3539 runtime
3540 .update_progress(UpdateProgressParams {
3541 heddle_session_id: opened.heddle_session_id.clone(),
3542 provider: Some("openai".to_string()),
3543 model: Some("gpt-5.4".to_string()),
3544 ..UpdateProgressParams::default()
3545 })
3546 .unwrap();
3547
3548 let report = runtime
3549 .reports
3550 .load(&opened.heddle_session_id)
3551 .unwrap()
3552 .unwrap();
3553 let expected_segment = format!("{}-seg-2", opened.heddle_session_id);
3554 assert_eq!(
3555 report.heddle_segment_id.as_deref(),
3556 Some(expected_segment.as_str())
3557 );
3558 }
3559
3560 #[test]
3561 fn blank_agent_model_hint_falls_through_to_detected_model_without_segment_rotation() {
3562 let (_temp, repo) = init_repo();
3563 let user_config = UserConfig::default();
3564 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3565 let blank_model_env = BTreeMap::from([
3566 ("HEDDLE_AGENT_PROVIDER".to_string(), "anthropic".to_string()),
3567 ("HEDDLE_AGENT_MODEL".to_string(), String::new()),
3568 ]);
3569
3570 let opened = runtime
3571 .open_session(OpenSessionParams {
3572 harness: Some("claude-code".to_string()),
3573 env_hints: blank_model_env.clone(),
3574 probe_metadata: BTreeMap::from([
3575 ("session_id".to_string(), "claude-sess-blank".to_string()),
3576 ("model".to_string(), "claude-opus-4-8[1m]".to_string()),
3577 ]),
3578 ..OpenSessionParams::default()
3579 })
3580 .unwrap();
3581 assert_eq!(opened.model.as_deref(), Some("claude-opus-4-8[1m]"));
3582
3583 let original_segment = opened.heddle_segment_id.clone();
3584 runtime
3585 .update_progress(UpdateProgressParams {
3586 heddle_session_id: opened.heddle_session_id.clone(),
3587 env_hints: blank_model_env,
3588 probe_metadata: BTreeMap::from([
3589 ("session_id".to_string(), "claude-sess-blank".to_string()),
3590 ("model".to_string(), "claude-opus-4-8[1m]".to_string()),
3591 ]),
3592 ..UpdateProgressParams::default()
3593 })
3594 .unwrap();
3595
3596 let report = runtime
3597 .reports
3598 .load(&opened.heddle_session_id)
3599 .unwrap()
3600 .unwrap();
3601 assert_eq!(report.harness.model.as_deref(), Some("claude-opus-4-8[1m]"));
3602 assert_eq!(report.heddle_segment_id, original_segment);
3603 }
3604
3605 #[test]
3606 fn close_session_captures_changed_paths_from_status_and_hints() {
3607 let (temp, repo) = init_repo();
3608 let config = UserConfig::default();
3609 let mut runtime = HarnessBridgeRuntime::new(repo, config);
3610
3611 let opened = runtime
3612 .open_session(OpenSessionParams {
3613 harness: Some("codex".to_string()),
3614 provider: Some("openai".to_string()),
3615 model: Some("gpt-5.4".to_string()),
3616 ..OpenSessionParams::default()
3617 })
3618 .unwrap();
3619 std::fs::write(temp.path().join("src.txt"), "hello\n").unwrap();
3620 runtime
3621 .record_touched_paths(RecordTouchedPathsParams {
3622 heddle_session_id: opened.heddle_session_id.clone(),
3623 paths: vec!["src.txt".to_string(), "notes.md".to_string()],
3624 })
3625 .unwrap();
3626 let closed = runtime
3627 .close_session(CloseSessionParams {
3628 heddle_session_id: opened.heddle_session_id.clone(),
3629 outcome: Some("completed".to_string()),
3630 ..CloseSessionParams::default()
3631 })
3632 .unwrap();
3633 let report = runtime
3634 .reports
3635 .load(&opened.heddle_session_id)
3636 .unwrap()
3637 .unwrap();
3638 assert!(closed.changed_paths.iter().any(|path| path == "src.txt"));
3639 assert!(!closed.changed_paths.iter().any(|path| path == "notes.md"));
3640 assert!(report.touched_paths.iter().any(|path| path == "src.txt"));
3641 assert!(report.touched_paths.iter().any(|path| path == "notes.md"));
3642 assert_eq!(
3643 closed.diff_summary.changed_file_count,
3644 closed.changed_paths.len() as u32
3645 );
3646 }
3647
3648 #[test]
3649 fn flush_reports_moves_pending_report_to_outbox() {
3650 let (_temp, repo) = init_repo();
3651 let user_config = UserConfig::default();
3652 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3653
3654 let opened = runtime
3655 .open_session(OpenSessionParams {
3656 harness: Some("codex".to_string()),
3657 provider: Some("openai".to_string()),
3658 model: Some("gpt-5.4".to_string()),
3659 ..OpenSessionParams::default()
3660 })
3661 .unwrap();
3662 let flushed = runtime
3663 .flush_reports(FlushReportsParams {
3664 heddle_session_id: Some(opened.heddle_session_id.clone()),
3665 })
3666 .unwrap();
3667 assert_eq!(flushed.flushed, 1);
3668 let report = runtime
3669 .reports
3670 .load(&opened.heddle_session_id)
3671 .unwrap()
3672 .unwrap();
3673 assert!(!report.pending_flush);
3674 assert_eq!(report.report_flush_state.as_deref(), Some("queued-local"));
3675 assert!(runtime.reports.outbox_path().exists());
3676 }
3677
3678 #[test]
3679 fn explicit_overrides_beat_fingerprint_and_user_defaults() {
3680 let (_temp, repo) = init_repo();
3681 let mut user_config = UserConfig::default();
3682 user_config.harness.harnesses.insert(
3683 "codex".to_string(),
3684 UserHarnessOverride {
3685 provider: Some("openai".to_string()),
3686 model: Some("gpt-default".to_string()),
3687 thinking_level: Some("medium".to_string()),
3688 policy: Some("default".to_string()),
3689 },
3690 );
3691 let identity = resolve_identity(
3692 &repo,
3693 &user_config,
3694 IdentityHints {
3695 harness: Some("codex".to_string()),
3696 provider: Some("openai".to_string()),
3697 model: Some("gpt-5.4".to_string()),
3698 thinking_level: Some("high".to_string()),
3699 policy: Some("custom".to_string()),
3700 probe: HarnessProbeResult::default(),
3701 },
3702 )
3703 .unwrap();
3704 assert_eq!(identity.model.as_deref(), Some("gpt-5.4"));
3705 assert_eq!(identity.thinking_level.as_deref(), Some("high"));
3706 assert_eq!(identity.policy.as_deref(), Some("custom"));
3707 }
3708
3709 #[test]
3710 fn transcript_mode_defaults_to_off_and_keeps_refs_empty() {
3711 let (_temp, repo) = init_repo();
3712 let user_config = UserConfig::default();
3713 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3714
3715 let opened = runtime
3716 .open_session(OpenSessionParams {
3717 harness: Some("codex".to_string()),
3718 provider: Some("openai".to_string()),
3719 model: Some("gpt-5.4".to_string()),
3720 ..OpenSessionParams::default()
3721 })
3722 .unwrap();
3723 let report = runtime
3724 .reports
3725 .load(&opened.heddle_session_id)
3726 .unwrap()
3727 .unwrap();
3728 assert_eq!(report.transcript_mode, "off");
3729 assert!(report.transcript_refs.is_empty());
3730 }
3731
3732 #[test]
3733 fn codex_thread_probe_reattaches_same_actor() {
3734 let (_temp, repo) = init_repo();
3735 let user_config = UserConfig::default();
3736 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3737
3738 let first = runtime
3739 .open_session(OpenSessionParams {
3740 harness: Some("codex".to_string()),
3741 probe_metadata: BTreeMap::from([
3742 ("thread_id".to_string(), "thr_123".to_string()),
3743 ("client_name".to_string(), "codex-tui".to_string()),
3744 ]),
3745 ..OpenSessionParams::default()
3746 })
3747 .unwrap();
3748 let second = runtime
3749 .open_session(OpenSessionParams {
3750 harness: Some("codex".to_string()),
3751 probe_metadata: BTreeMap::from([
3752 ("thread_id".to_string(), "thr_123".to_string()),
3753 ("client_name".to_string(), "codex-tui".to_string()),
3754 ]),
3755 ..OpenSessionParams::default()
3756 })
3757 .unwrap();
3758
3759 assert_eq!(first.agent_session_id, second.agent_session_id);
3760 assert_eq!(first.heddle_session_id, second.heddle_session_id);
3761 }
3762
3763 #[test]
3764 fn opencode_child_session_creates_distinct_actor_with_parent_key() {
3765 let (_temp, repo) = init_repo();
3766 let user_config = UserConfig::default();
3767 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3768
3769 let root = runtime
3770 .open_session(OpenSessionParams {
3771 harness: Some("opencode".to_string()),
3772 probe_metadata: BTreeMap::from([("session_id".to_string(), "root-1".to_string())]),
3773 ..OpenSessionParams::default()
3774 })
3775 .unwrap();
3776 let child = runtime
3777 .open_session(OpenSessionParams {
3778 harness: Some("opencode".to_string()),
3779 probe_metadata: BTreeMap::from([
3780 ("session_id".to_string(), "child-1".to_string()),
3781 ("parent_id".to_string(), "root-1".to_string()),
3782 ]),
3783 ..OpenSessionParams::default()
3784 })
3785 .unwrap();
3786
3787 assert_ne!(root.agent_session_id, child.agent_session_id);
3788 let report = runtime
3789 .reports
3790 .load(&child.heddle_session_id)
3791 .unwrap()
3792 .unwrap();
3793 assert_eq!(
3794 report.native_parent_actor_key.as_deref(),
3795 Some("opencode:session:root-1")
3796 );
3797 }
3798
3799 #[test]
3800 fn claude_resume_with_new_session_id_does_not_steal_existing_actor() {
3801 let (_temp, repo) = init_repo();
3802 let user_config = UserConfig::default();
3803 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3804
3805 let first = runtime
3806 .open_session(OpenSessionParams {
3807 harness: Some("claude-code".to_string()),
3808 probe_metadata: BTreeMap::from([
3809 ("session_id".to_string(), "sess-old".to_string()),
3810 (
3811 "transcript_path".to_string(),
3812 "/tmp/claude/session-a.jsonl".to_string(),
3813 ),
3814 ]),
3815 ..OpenSessionParams::default()
3816 })
3817 .unwrap();
3818 let resumed = runtime
3819 .open_session(OpenSessionParams {
3820 harness: Some("claude-code".to_string()),
3821 probe_metadata: BTreeMap::from([
3822 ("session_id".to_string(), "sess-new".to_string()),
3823 (
3824 "transcript_path".to_string(),
3825 "/tmp/claude/session-a.jsonl".to_string(),
3826 ),
3827 ]),
3828 ..OpenSessionParams::default()
3829 })
3830 .unwrap();
3831
3832 assert_ne!(first.agent_session_id, resumed.agent_session_id);
3833 assert_ne!(first.heddle_session_id, resumed.heddle_session_id);
3834 }
3835
3836 #[test]
3837 fn explicit_claude_harness_beats_generic_session_id_probe_match() {
3838 let (_temp, repo) = init_repo();
3839 let user_config = UserConfig::default();
3840 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3841
3842 let opened = runtime
3843 .open_session(OpenSessionParams {
3844 harness: Some("claude-code".to_string()),
3845 probe_metadata: BTreeMap::from([
3846 ("session_id".to_string(), "claude-sess-1".to_string()),
3847 ("hook_event".to_string(), "SubagentStop".to_string()),
3848 ]),
3849 ..OpenSessionParams::default()
3850 })
3851 .unwrap();
3852 let report = runtime
3853 .reports
3854 .load(&opened.heddle_session_id)
3855 .unwrap()
3856 .unwrap();
3857 assert_eq!(
3858 report.native_actor_key.as_deref(),
3859 Some("claude-code:session:claude-sess-1")
3860 );
3861 assert_eq!(report.harness.harness.as_deref(), Some("claude-code"));
3862 }
3863
3864 #[test]
3865 fn same_native_actor_key_reuses_existing_actor_after_tentative_session_creation() {
3866 let (_temp, repo) = init_repo();
3867 let user_config = UserConfig::default();
3868 let runtime = HarnessBridgeRuntime::new(repo, user_config);
3869 let principal = runtime.repo.get_principal().unwrap();
3870 let mut sessions = SessionManager::new(runtime.repo.root());
3871 let existing_session = sessions
3872 .start_session(
3873 principal.clone(),
3874 "anthropic".to_string(),
3875 "claude-opus-4-7[1m]".to_string(),
3876 None,
3877 )
3878 .unwrap();
3879 let tentative_session = sessions
3880 .start_session(
3881 principal,
3882 "anthropic".to_string(),
3883 "claude-opus-4-7[1m]".to_string(),
3884 None,
3885 )
3886 .unwrap();
3887
3888 let registry = AgentRegistry::new(runtime.repo.heddle_dir());
3889 let existing_entry = registry
3890 .create_generated_entry(|session_id| {
3891 Ok(AgentEntry {
3892 session_id: session_id.to_string(),
3893 client_instance_id: None,
3894 native_actor_key: Some(
3895 "claude-code:session:282396d3-554a-48aa-a9a8-8d1f0bd15fa5".to_string(),
3896 ),
3897 native_parent_actor_key: None,
3898 native_instance_key: Some(
3899 "claude-code:transcript:/tmp/claude/282396d3.jsonl".to_string(),
3900 ),
3901 heddle_session_id: Some(existing_session.id.clone()),
3902 thread_id: None,
3903 thread: "detached".to_string(),
3904 pid: Some(std::process::id()),
3905 boot_id: None,
3906 liveness_path: None,
3907 heartbeat_at: Some(Utc::now()),
3908 anchor_state: None,
3909 anchor_root: None,
3910 reservation_token: Some(objects::store::generate_agent_id()),
3911 path: Some(runtime.repo.root().to_path_buf()),
3912 base_state: String::new(),
3913 started_at: Utc::now(),
3914 provider: Some("anthropic".to_string()),
3915 model: Some("claude-opus-4-7[1m]".to_string()),
3916 harness: Some("claude-code".to_string()),
3917 thinking_level: None,
3918 usage_summary: AgentUsageSummary::default(),
3919 last_progress_at: None,
3920 report_flush_state: Some("pending-local".to_string()),
3921 attach_reason: None,
3922 attach_precedence: vec![],
3923 winning_attach_rule: None,
3924 probe_source: Some("hook_payload".to_string()),
3925 probe_confidence: Some(1.0),
3926 status: AgentStatus::Active,
3927 completed_at: None,
3928 context_queries: vec![],
3929 })
3930 })
3931 .unwrap();
3932
3933 let probe = HarnessProbeResult {
3934 harness: Some("claude-code".to_string()),
3935 provider: Some("anthropic".to_string()),
3936 model: Some("claude-opus-4-7[1m]".to_string()),
3937 native_actor_key: Some(
3938 "claude-code:session:282396d3-554a-48aa-a9a8-8d1f0bd15fa5".to_string(),
3939 ),
3940 native_instance_key: Some(
3941 "claude-code:transcript:/tmp/claude/282396d3.jsonl".to_string(),
3942 ),
3943 probe_source: Some("hook_payload".to_string()),
3944 confidence: Some(1.0),
3945 ..HarnessProbeResult::default()
3946 };
3947 let identity = ResolvedIdentity {
3948 harness: Some("claude-code".to_string()),
3949 provider: Some("anthropic".to_string()),
3950 model: Some("claude-opus-4-7[1m]".to_string()),
3951 thinking_level: None,
3952 policy: None,
3953 };
3954 let mut attach = ResolvedAttachment {
3955 target: AttachTarget::CreateNew {
3956 _because_claimed: false,
3957 },
3958 matched_entry: None,
3959 attach_reason:
3960 "started new Heddle session because no compatible native actor match was found"
3961 .to_string(),
3962 precedence: vec!["native-actor-key:miss".to_string()],
3963 winning_rule: "create-new-session".to_string(),
3964 };
3965
3966 let resolved_entry = runtime
3967 .ensure_registry_entry(RegistryEntryRequest {
3968 heddle_session_id: &tentative_session.id,
3969 thread_name: None,
3970 thread_id: None,
3971 identity: &identity,
3972 probe: &probe,
3973 attach: &attach,
3974 client_instance_id: None,
3975 requested_entry: None,
3976 })
3977 .unwrap();
3978 assert_eq!(resolved_entry.session_id, existing_entry.session_id);
3979 assert_eq!(
3980 resolved_entry.heddle_session_id.as_deref(),
3981 Some(existing_session.id.as_str())
3982 );
3983
3984 let (canonical_session, owns_session) = runtime
3985 .reuse_canonical_actor_session(
3986 &mut sessions,
3987 CanonicalActorSessionRequest {
3988 tentative_session: tentative_session.clone(),
3989 tentative_owns_session: true,
3990 entry: &resolved_entry,
3991 probe: &probe,
3992 attach: &mut attach,
3993 },
3994 )
3995 .unwrap();
3996 assert_eq!(canonical_session.id, existing_session.id);
3997 assert!(!owns_session);
3998 assert!(
3999 attach
4000 .precedence
4001 .iter()
4002 .any(|step| step.starts_with("post-create-native-actor-key:"))
4003 );
4004 assert_eq!(attach.winning_rule, "native-actor-key-post-create");
4005 assert!(
4006 !sessions
4007 .get_session(&tentative_session.id)
4008 .unwrap()
4009 .unwrap()
4010 .is_active()
4011 );
4012 }
4013
4014 #[test]
4015 fn close_session_does_not_blame_preexisting_dirty_worktree() {
4016 let (temp, repo) = init_repo();
4017 std::fs::write(temp.path().join("preexisting.txt"), "already dirty\n").unwrap();
4018 let user_config = UserConfig::default();
4019 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
4020
4021 let opened = runtime
4022 .open_session(OpenSessionParams {
4023 harness: Some("claude-code".to_string()),
4024 provider: Some("anthropic".to_string()),
4025 model: Some("claude-opus-4-7[1m]".to_string()),
4026 ..OpenSessionParams::default()
4027 })
4028 .unwrap();
4029 let closed = runtime
4030 .close_session(CloseSessionParams {
4031 heddle_session_id: opened.heddle_session_id.clone(),
4032 outcome: Some("completed".to_string()),
4033 ..CloseSessionParams::default()
4034 })
4035 .unwrap();
4036 let report = runtime
4037 .reports
4038 .load(&opened.heddle_session_id)
4039 .unwrap()
4040 .unwrap();
4041
4042 assert!(
4043 report
4044 .worktree_changes_at_open
4045 .iter()
4046 .any(|change| change.path == "preexisting.txt")
4047 );
4048 assert!(
4049 !closed
4050 .changed_paths
4051 .iter()
4052 .any(|path| path == "preexisting.txt")
4053 );
4054 assert_eq!(closed.diff_summary.changed_file_count, 0);
4055 }
4056
4057 #[test]
4058 fn timeline_state_delta_paths_ignore_uncaptured_worktree_changes() {
4059 let (temp, repo) = init_repo();
4060 let repo_root = repo.root().to_path_buf();
4061 std::fs::write(repo_root.join("tracked.txt"), b"one\n").unwrap();
4062 let before = repo.snapshot(Some("seed".into()), None).unwrap();
4063 std::fs::write(repo_root.join("tracked.txt"), b"two\n").unwrap();
4064 let after = repo.snapshot(Some("advance".into()), None).unwrap();
4065 std::fs::write(temp.path().join("ambient.txt"), b"not in the state delta\n").unwrap();
4066
4067 assert_eq!(
4068 changed_paths_between_states(&repo, before.change_id, after.change_id).unwrap(),
4069 vec!["tracked.txt"]
4070 );
4071 }
4072
4073 #[test]
4074 fn relay_claude_stop_captures_state_with_agent_attribution() {
4075 let (temp, repo) = init_repo();
4076 let repo_root = repo.root().to_path_buf();
4077
4078 std::fs::write(repo_root.join("seed.txt"), b"hello").unwrap();
4080 let _ = repo.snapshot(Some("seed".into()), None).unwrap();
4081
4082 std::fs::write(repo_root.join("seed.txt"), b"hello, heddle").unwrap();
4084
4085 drop(repo);
4086
4087 let fresh_repo = Repository::open(temp.path()).unwrap();
4088 let user_config = UserConfig {
4089 principal: Some(crate::config::UserPrincipalConfig {
4090 name: "Ada Lovelace".to_string(),
4091 email: "ada@example.com".to_string(),
4092 }),
4093 ..UserConfig::default()
4094 };
4095 let mut runtime = HarnessBridgeRuntime::new(fresh_repo, user_config);
4096 let payload = serde_json::json!({
4097 "session_id": "claude-sess-123",
4098 "transcript_path": "/tmp/claude/x.jsonl",
4099 "model": {
4100 "id": "claude-opus-4-7",
4101 "display_name": "Claude Opus 4.7",
4102 },
4103 "message": "hook-driven capture test",
4104 "hook_event_name": "Stop",
4105 });
4106 relay_claude(&mut runtime, "Stop", &payload).unwrap();
4107 drop(runtime);
4108
4109 let verify = Repository::open(temp.path()).unwrap();
4110 let head_id = verify.head().unwrap().expect("HEAD after Stop capture");
4111 let state = verify
4112 .store()
4113 .get_state(&head_id)
4114 .unwrap()
4115 .expect("state for HEAD");
4116 let agent = state.attribution.agent.expect("agent attribution on state");
4117 assert_eq!(agent.provider, "anthropic");
4118 assert_eq!(agent.model, "Claude Opus 4.7");
4119 assert_eq!(
4120 state.intent.as_deref(),
4121 Some("hook-driven capture test"),
4122 "intent should be pulled from payload message",
4123 );
4124 }
4125
4126 #[test]
4127 fn relay_claude_stop_is_idempotent_when_clean() {
4128 let (temp, repo) = init_repo();
4129 let repo_root = repo.root().to_path_buf();
4130 std::fs::write(repo_root.join("seed.txt"), b"hello").unwrap();
4131 let seed = repo.snapshot(Some("seed".into()), None).unwrap();
4132 drop(repo);
4133
4134 let fresh_repo = Repository::open(temp.path()).unwrap();
4135 let mut runtime = HarnessBridgeRuntime::new(fresh_repo, UserConfig::default());
4136 let payload = serde_json::json!({
4137 "session_id": "claude-sess-clean",
4138 "model": {"id": "claude-sonnet-4-6"},
4139 });
4140 relay_claude(&mut runtime, "Stop", &payload).unwrap();
4141 drop(runtime);
4142
4143 let verify = Repository::open(temp.path()).unwrap();
4144 let head_id = verify.head().unwrap().expect("HEAD preserved");
4145 assert_eq!(
4146 head_id, seed.change_id,
4147 "no change expected when worktree is clean",
4148 );
4149 }
4150
4151 #[test]
4152 fn relay_claude_pre_tool_use_ignores_non_file_tool() {
4153 let (temp, repo) = init_repo();
4154 drop(repo);
4155 let fresh_repo = Repository::open(temp.path()).unwrap();
4156 let mut runtime = HarnessBridgeRuntime::new(fresh_repo, UserConfig::default());
4157 let payload = serde_json::json!({
4158 "session_id": "claude-sess-bash",
4159 "tool_name": "Bash",
4160 "tool_input": {"command": "ls"},
4161 });
4162 relay_claude(&mut runtime, "PreToolUse", &payload).unwrap();
4164 }
4165
4166 #[test]
4167 fn relay_opencode_tool_execute_before_records_timeline_step() {
4168 let (_temp, repo) = init_repo();
4169 let root = repo.root().to_path_buf();
4170 std::fs::write(root.join("seed.txt"), b"hello").unwrap();
4171 let seed = repo.snapshot(Some("seed".into()), None).unwrap();
4172 let mut runtime = HarnessBridgeRuntime::new(repo, UserConfig::default());
4173 let payload = opencode_tool_payload("call-1");
4174
4175 relay_opencode(&mut runtime, "tool.execute.before", &payload).unwrap();
4176
4177 let store = TimelineStore::open(runtime.repo.heddle_dir()).unwrap();
4178 let view = TimelineView::rebuild(&store).unwrap();
4179 let steps = view.steps_for_thread("main");
4180 assert_eq!(steps.len(), 1);
4181 let step = steps[0];
4182 assert_eq!(step.native.as_ref().unwrap().harness, "opencode");
4183 assert_eq!(step.native.as_ref().unwrap().tool_call_id, "call-1");
4184 assert_eq!(step.tool_name.as_deref(), Some("bash"));
4185 assert_eq!(step.before_state, Some(seed.change_id));
4186 assert!(step.status.is_none());
4187 assert!(step.payload_summary.as_deref().unwrap().contains("call-1"));
4188 assert!(step.payload_hash.is_some());
4189 assert!(
4190 step.labels
4191 .contains(&TimelineLabel::ExternalSideEffectsUnknown)
4192 );
4193 }
4194
4195 #[test]
4196 fn relay_opencode_tool_execute_after_captures_dirty_worktree() {
4197 let (_temp, repo) = init_repo();
4198 let root = repo.root().to_path_buf();
4199 std::fs::write(root.join("tracked.txt"), b"one\n").unwrap();
4200 let seed = repo.snapshot(Some("seed".into()), None).unwrap();
4201 let user_config = UserConfig {
4202 principal: Some(crate::config::UserPrincipalConfig {
4203 name: "Ada Lovelace".to_string(),
4204 email: "ada@example.com".to_string(),
4205 }),
4206 ..UserConfig::default()
4207 };
4208 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
4209 let payload = opencode_tool_payload("call-2");
4210
4211 relay_opencode(&mut runtime, "tool.execute.before", &payload).unwrap();
4212 std::fs::write(root.join("tracked.txt"), b"two\n").unwrap();
4213 relay_opencode(&mut runtime, "tool.execute.after", &payload).unwrap();
4214
4215 let head = runtime.repo.head().unwrap().expect("capture advanced HEAD");
4216 assert_ne!(head, seed.change_id);
4217 let store = TimelineStore::open(runtime.repo.heddle_dir()).unwrap();
4218 let view = TimelineView::rebuild(&store).unwrap();
4219 let steps = view.steps_for_thread("main");
4220 assert_eq!(steps.len(), 1, "before/after should merge by native id");
4221 let step = steps[0];
4222 assert_eq!(step.operation_ids.len(), 2);
4223 assert_eq!(step.status, Some(TimelineToolCallStatus::Succeeded));
4224 assert_eq!(step.before_state, Some(seed.change_id));
4225 assert_eq!(step.after_state, Some(head));
4226 assert_eq!(step.capture_state, Some(head));
4227 assert_eq!(step.changed, Some(true));
4228 assert!(step.touched_paths.contains(&"tracked.txt".to_string()));
4229 assert!(step.labels.contains(&TimelineLabel::RepoReversible));
4230 assert!(
4231 step.labels
4232 .contains(&TimelineLabel::ExternalSideEffectsUnknown)
4233 );
4234 assert!(!step.payload_summary.as_deref().unwrap().contains("SECRET"));
4235 assert!(step.payload_hash.is_some());
4236 }
4237
4238 #[cfg(unix)]
4239 #[test]
4240 fn relay_opencode_tool_execute_after_records_capture_failed_without_ambient_paths() {
4241 let (_temp, repo) = init_repo();
4242 let root = repo.root().to_path_buf();
4243 std::fs::write(root.join("seed.txt"), b"seed\n").unwrap();
4244 let seed = repo.snapshot(Some("seed".into()), None).unwrap();
4245 let mut runtime = HarnessBridgeRuntime::new(repo, UserConfig::default());
4246 let mut payload = opencode_tool_payload("call-capture-failed");
4247 payload["tool"]["input"]["file_path"] = serde_json::json!("hinted.txt");
4248 let hooks_dir = root.join(".heddle/hooks");
4249 std::fs::create_dir_all(&hooks_dir).unwrap();
4250 let hook_path = hooks_dir.join("pre-snapshot");
4251 std::fs::write(&hook_path, "#!/bin/sh\nexit 1\n").unwrap();
4252 let mut perms = std::fs::metadata(&hook_path).unwrap().permissions();
4253 perms.set_mode(0o755);
4254 std::fs::set_permissions(&hook_path, perms).unwrap();
4255
4256 relay_opencode(&mut runtime, "tool.execute.before", &payload).unwrap();
4257 std::fs::write(root.join("ambient.txt"), b"dirty but uncaptured\n").unwrap();
4258 relay_opencode(&mut runtime, "tool.execute.after", &payload).unwrap();
4259
4260 assert_eq!(
4261 runtime.repo.head().unwrap(),
4262 Some(seed.change_id),
4263 "capture failure must not advance HEAD"
4264 );
4265 let store = TimelineStore::open(runtime.repo.heddle_dir()).unwrap();
4266 let view = TimelineView::rebuild(&store).unwrap();
4267 let steps = view.steps_for_thread("main");
4268 assert_eq!(steps.len(), 1, "before/after should merge by native id");
4269 let step = steps[0];
4270 assert_eq!(step.operation_ids.len(), 2);
4271 assert_eq!(step.before_state, Some(seed.change_id));
4272 assert_eq!(step.after_state, Some(seed.change_id));
4273 assert_eq!(step.capture_state, None);
4274 assert_eq!(step.changed, Some(false));
4275 assert!(step.labels.contains(&TimelineLabel::CaptureFailed));
4276 assert!(
4277 !step.labels.contains(&TimelineLabel::RepoReversible),
4278 "failed captures are not repo-reversible"
4279 );
4280 assert_eq!(step.touched_paths, vec!["hinted.txt"]);
4281 }
4282
4283 #[test]
4284 fn relay_opencode_tool_execute_missing_tool_id_does_not_fail_or_record_timeline() {
4285 let (_temp, repo) = init_repo();
4286 let root = repo.root().to_path_buf();
4287 std::fs::write(root.join("seed.txt"), b"hello").unwrap();
4288 let _ = repo.snapshot(Some("seed".into()), None).unwrap();
4289 let mut runtime = HarnessBridgeRuntime::new(repo, UserConfig::default());
4290 let payload = serde_json::json!({
4291 "sessionID": "opencode-session",
4292 "model": "gpt-5.4",
4293 "provider": "openai",
4294 "tool": {"name": "bash"},
4295 });
4296
4297 relay_opencode(&mut runtime, "tool.execute.before", &payload).unwrap();
4298
4299 let store = TimelineStore::open(runtime.repo.heddle_dir()).unwrap();
4300 let view = TimelineView::rebuild(&store).unwrap();
4301 assert!(view.steps_for_thread("main").is_empty());
4302 let report_count = std::fs::read_dir(root.join(".heddle/state/session-reports"))
4303 .unwrap()
4304 .count();
4305 assert!(
4306 report_count > 0,
4307 "session progress should still be recorded"
4308 );
4309 }
4310
4311 fn opencode_tool_payload(call_id: &str) -> Value {
4312 serde_json::json!({
4313 "sessionID": "opencode-session",
4314 "messageID": "message-1",
4315 "toolCallID": call_id,
4316 "model": "gpt-5.4",
4317 "provider": "openai",
4318 "tool": {
4319 "name": "bash",
4320 "input": {
4321 "command": "echo SECRET",
4322 "file_path": "tracked.txt"
4323 }
4324 },
4325 "status": "success"
4326 })
4327 }
4328
4329 #[test]
4330 fn relay_claude_subagent_start_creates_child_entry_with_parent_key() {
4331 let (temp, repo) = init_repo();
4332 drop(repo);
4333 let fresh_repo = Repository::open(temp.path()).unwrap();
4334 let mut runtime = HarnessBridgeRuntime::new(fresh_repo, UserConfig::default());
4335 let payload = serde_json::json!({
4336 "session_id": "parent-claude-sess",
4337 "agent_id": "child-subagent-xyz",
4338 "model": {"id": "claude-sonnet-4-6"},
4339 });
4340 relay_claude(&mut runtime, "SubagentStart", &payload).unwrap();
4341 drop(runtime);
4342
4343 let verify = Repository::open(temp.path()).unwrap();
4344 let registry = AgentRegistry::new(verify.heddle_dir());
4345 let child = registry
4346 .find_active_by_native_actor_key("claude-code:agent:child-subagent-xyz")
4347 .unwrap()
4348 .expect("subagent AgentEntry should exist after SubagentStart");
4349 assert_eq!(
4350 child.native_parent_actor_key.as_deref(),
4351 Some("claude-code:session:parent-claude-sess"),
4352 "subagent must carry parent session linkage",
4353 );
4354 assert_eq!(child.status, AgentStatus::Active);
4355 }
4356
4357 #[test]
4358 fn relay_claude_subagent_stop_marks_child_entry_complete() {
4359 let (temp, repo) = init_repo();
4360 let repo_root = repo.root().to_path_buf();
4361 drop(repo);
4362
4363 let fresh = Repository::open(temp.path()).unwrap();
4365 let mut runtime = HarnessBridgeRuntime::new(fresh, UserConfig::default());
4366 let start_payload = serde_json::json!({
4367 "session_id": "parent-sess",
4368 "agent_id": "worker-1",
4369 "model": {"id": "claude-sonnet-4-6"},
4370 });
4371 relay_claude(&mut runtime, "SubagentStart", &start_payload).unwrap();
4372 drop(runtime);
4373
4374 std::fs::write(
4376 repo_root.join("child-output.txt"),
4377 b"subagent produced this",
4378 )
4379 .unwrap();
4380
4381 let fresh = Repository::open(temp.path()).unwrap();
4382 let mut runtime = HarnessBridgeRuntime::new(fresh, UserConfig::default());
4383 let stop_payload = serde_json::json!({
4384 "session_id": "parent-sess",
4385 "agent_id": "worker-1",
4386 "model": {
4387 "id": "claude-sonnet-4-6",
4388 "display_name": "Claude Sonnet 4.6",
4389 },
4390 });
4391 relay_claude(&mut runtime, "SubagentStop", &stop_payload).unwrap();
4392 drop(runtime);
4393
4394 let verify = Repository::open(temp.path()).unwrap();
4395 let registry = AgentRegistry::new(verify.heddle_dir());
4396 let child = registry
4397 .list()
4398 .unwrap()
4399 .into_iter()
4400 .find(|e| e.native_actor_key.as_deref() == Some("claude-code:agent:worker-1"))
4401 .expect("child entry should still exist");
4402 assert_eq!(
4403 child.status,
4404 AgentStatus::Complete,
4405 "SubagentStop should mark the child entry Complete",
4406 );
4407 }
4408
4409 #[test]
4410 fn relay_claude_user_prompt_submit_rotates_segment() {
4411 let (temp, repo) = init_repo();
4412 drop(repo);
4413
4414 let fresh = Repository::open(temp.path()).unwrap();
4415 let mut runtime = HarnessBridgeRuntime::new(fresh, UserConfig::default());
4416 let session_payload = serde_json::json!({
4418 "session_id": "claude-prompt-sess",
4419 "model": {"id": "claude-opus-4-7", "display_name": "Claude Opus 4.7"},
4420 });
4421 relay_claude(&mut runtime, "SessionStart", &session_payload).unwrap();
4422 let sessions_before = SessionManager::new(runtime.repo.root())
4423 .list_sessions(true)
4424 .unwrap();
4425 let initial_segments = sessions_before
4426 .iter()
4427 .find(|s| !s.segments.is_empty())
4428 .map(|s| s.segments.len())
4429 .unwrap_or(0);
4430
4431 let prompt_payload = serde_json::json!({
4433 "session_id": "claude-prompt-sess",
4434 "model": {"id": "claude-opus-4-7", "display_name": "Claude Opus 4.7"},
4435 "prompt": "write a new feature",
4436 });
4437 relay_claude(&mut runtime, "UserPromptSubmit", &prompt_payload).unwrap();
4438 drop(runtime);
4439
4440 let verify = Repository::open(temp.path()).unwrap();
4441 let sessions_after = SessionManager::new(verify.root())
4442 .list_sessions(true)
4443 .unwrap();
4444 let rotated = sessions_after
4445 .iter()
4446 .any(|s| s.segments.len() > initial_segments);
4447 assert!(
4448 rotated,
4449 "UserPromptSubmit must add at least one segment beyond the SessionStart baseline \
4450 (initial={initial_segments}, sessions_after={:?})",
4451 sessions_after
4452 .iter()
4453 .map(|s| (s.id.clone(), s.segments.len()))
4454 .collect::<Vec<_>>(),
4455 );
4456 }
4457}