1use std::{
3 collections::{BTreeMap, BTreeSet},
4 fs::{self, OpenOptions},
5 io::{BufRead, BufReader, BufWriter, Write},
6 path::{Path, PathBuf},
7};
8
9use anyhow::{Result, anyhow};
10use base64::Engine as _;
11use chrono::Utc;
12use objects::{
13 fs_atomic::write_file_atomic,
14 object::{
15 ChangeId, ContentHash, DiffKind, NativeToolCallRefV1, Session, ThreadName,
16 TimelineBranchId, TimelineLabel, TimelineOperationBodyV1, TimelineOperationEnvelope,
17 TimelineStepId, TimelineToolCallStatus, TimelineToolPayloadMetadata, ToolCallFinishedV1,
18 ToolCallStartedV1, Tree,
19 },
20 store::{AgentEntry, AgentRegistry, AgentStatus, AgentUsageSummary, ObjectStore},
21};
22use oplog::OpLogRecorder;
23use refs::Head;
24use repo::{
25 Repository, SessionManager, Thread, ThreadFreshness, ThreadIntegrationPolicy, ThreadManager,
26 ThreadMode, ThreadState, TimelineStore, TimelineView,
27};
28use serde::{Deserialize, Serialize};
29use serde_json::Value;
30use wire::{
31 HarnessIdentity, ProgressCheckpoint, SessionDiffSummary, SessionReportEnvelope,
32 TranscriptAttachmentRef, UsageTotals, WorktreeChangeBaseline,
33};
34
35mod claude_hook;
36mod probe;
37
38use self::probe::{HarnessProbeInput, HarnessProbeResult, probe_harness_actor};
39use crate::{
40 cli::{
41 Cli,
42 commands::{
43 snapshot::{
44 SnapshotAgentOverrides, create_snapshot, summarize_confidence,
45 summarize_verification,
46 },
47 worktree_cmd::helpers::{prepare_worktree_target, write_isolated_checkout},
48 },
49 style, worktree_status_options,
50 },
51 config::{
52 HarnessMode, HarnessTranscriptMode, HarnessTransport, UserConfig, UserHarnessOverride,
53 UserHarnessRootThreadPolicy, UserHarnessSubagentThreadPolicy, UserThreadWorkspaceMode,
54 },
55};
56
57pub(crate) fn probe_current_process_harness(
58 repo: &Repository,
59 current_provider: Option<String>,
60 current_model: Option<String>,
61 current_policy: Option<String>,
62) -> Result<HarnessProbeResult> {
63 probe_harness_actor(&HarnessProbeInput {
64 argv: detected_harness_argv().or_else(|| Some(std::env::args().collect())),
65 env_hints: harness_env_hints(),
66 explicit_harness: None,
67 explicit_provider: None,
68 explicit_model: None,
69 explicit_thinking_level: None,
70 explicit_policy: None,
71 probe_metadata: BTreeMap::new(),
72 current_provider,
73 current_model,
74 current_policy,
75 repo_root: repo.root().display().to_string(),
76 })
77}
78
/// Collects the environment variables that hint at the active harness.
///
/// A variable is kept when its value is not blank and its name either starts
/// with one of the known harness prefixes or is exactly `MODEL`,
/// `REASONING_EFFORT`, or `THINKING_LEVEL`.
///
/// The environment is read through `vars_os` rather than `vars`, because
/// `std::env::vars` panics as soon as any key or value is not valid Unicode.
/// Entries that cannot be decoded are skipped instead.
fn harness_env_hints() -> BTreeMap<String, String> {
    const HINT_PREFIXES: [&str; 7] = [
        "HEDDLE_AGENT_",
        "CODEX_",
        "CLAUDE",
        "ANTHROPIC_",
        "OPENAI_",
        "OPENCODE_",
        "AIDER_",
    ];

    std::env::vars_os()
        // Drop entries whose name or value is not valid Unicode.
        .filter_map(|(key, value)| Some((key.into_string().ok()?, value.into_string().ok()?)))
        .filter(|(key, value)| {
            !value.trim().is_empty()
                && (HINT_PREFIXES
                    .iter()
                    .any(|prefix| key.starts_with(*prefix))
                    || matches!(
                        key.as_str(),
                        "MODEL" | "REASONING_EFFORT" | "THINKING_LEVEL"
                    ))
        })
        .collect()
}
97
98fn detected_harness_argv() -> Option<Vec<String>> {
99 detected_harness_argv_impl()
100}
101
/// Extracts the parent PID from the contents of `/proc/<pid>/stat`.
///
/// The second field (`comm`) is wrapped in parentheses and may itself contain
/// spaces or parentheses, so splitting the whole line on whitespace can pick the
/// wrong column. Parsing resumes after the last `)`, where the fields are the
/// process state followed by the parent PID.
#[cfg(target_os = "linux")]
fn parse_ppid_from_stat(stat: &str) -> Option<u32> {
    let (_, after_comm) = stat.rsplit_once(')')?;
    after_comm.split_whitespace().nth(1)?.parse::<u32>().ok()
}

/// Walks up to eight ancestors of the current process through `/proc` and
/// returns the argv of the first one whose program name contains `codex`,
/// `claude`, `opencode`, or `aider` (case-insensitive).
///
/// Returns `None` when the walk reaches PID 0 or a self-parented process, when a
/// `/proc` entry cannot be read or parsed, when an ancestor has an empty command
/// line, or when no ancestor matches within the limit.
#[cfg(target_os = "linux")]
fn detected_harness_argv_impl() -> Option<Vec<String>> {
    const MAX_ANCESTORS: usize = 8;
    const HARNESS_NEEDLES: [&str; 4] = ["codex", "claude", "opencode", "aider"];

    let mut pid = std::process::id();
    for _ in 0..MAX_ANCESTORS {
        let stat = fs::read_to_string(format!("/proc/{pid}/stat")).ok()?;
        let ppid = parse_ppid_from_stat(&stat)?;
        if ppid == 0 || ppid == pid {
            return None;
        }
        pid = ppid;

        // `cmdline` holds the arguments separated by NUL bytes.
        let raw = fs::read(format!("/proc/{pid}/cmdline")).ok()?;
        let argv = raw
            .split(|byte| *byte == 0)
            .filter(|part| !part.is_empty())
            .map(|part| String::from_utf8_lossy(part).to_string())
            .collect::<Vec<_>>();
        let program = argv.first().map(|arg| arg.to_ascii_lowercase())?;
        if HARNESS_NEEDLES
            .iter()
            .any(|needle| program.contains(needle))
        {
            return Some(argv);
        }
    }
    None
}
128
/// Non-Linux fallback: no ancestor inspection is attempted, so no harness
/// command line is ever reported.
#[cfg(not(target_os = "linux"))]
fn detected_harness_argv_impl() -> Option<Vec<String>> {
    Option::None
}
133
134pub fn cmd_harness_bridge(cli: &Cli) -> Result<()> {
135 let repo = cli.open_repo()?;
136 let mut runtime = init_harness_runtime(&repo)?;
137
138 let stdin = std::io::stdin();
139 let stdout = std::io::stdout();
140 let reader = BufReader::new(stdin.lock());
141 let mut writer = BufWriter::new(stdout.lock());
142
143 for line in reader.lines() {
144 let line = line?;
145 if line.trim().is_empty() {
146 continue;
147 }
148 let response = match serde_json::from_str::<BridgeRequest>(&line) {
149 Ok(request) => runtime.handle_request(request),
150 Err(err) => BridgeResponse::error(
151 None,
152 "invalid_request",
153 format!("failed to parse request: {err}"),
154 ),
155 };
156 serde_json::to_writer(&mut writer, &response)?;
157 writer.write_all(b"\n")?;
158 writer.flush()?;
159 }
160
161 Ok(())
162}
163
164pub(crate) fn relay_harness_event(
165 repo: &Repository,
166 harness: &str,
167 event: &str,
168 payload: &str,
169) -> Result<()> {
170 let mut runtime = init_harness_runtime(repo)?;
171 let (json, warning) = parse_relay_payload(payload);
172 if let Some(warning) = warning {
173 eprintln!("{}", style::warn(&warning));
174 }
175 match harness {
176 "codex" => relay_codex(&mut runtime, event, &json),
177 "claude-code" => relay_claude(&mut runtime, event, &json),
178 "opencode" => relay_opencode(&mut runtime, event, &json),
179 other => Err(anyhow!("unsupported harness relay: {other}")),
180 }
181}
182
183fn init_harness_runtime(repo: &Repository) -> Result<HarnessBridgeRuntime> {
184 let (user_config, warning) = load_harness_user_config(UserConfig::default_path());
185 if let Some(warning) = warning {
186 eprintln!("{}", style::warn(&warning));
187 }
188 Ok(HarnessBridgeRuntime::new(
189 Repository::open(repo.root())?,
190 user_config,
191 ))
192}
193
194fn load_harness_user_config(default_path: Option<PathBuf>) -> (UserConfig, Option<String>) {
195 let Some(path) = default_path else {
196 return (UserConfig::default(), None);
197 };
198 match UserConfig::load(&path) {
199 Ok(config) => (config, None),
200 Err(err) if is_not_found(&err) => (UserConfig::default(), None),
201 Err(err) => {
202 let warning = format!(
203 "warning: failed to load user config from {}: {err}; continuing with defaults",
204 path.display()
205 );
206 (UserConfig::default(), Some(warning))
207 }
208 }
209}
210
211fn is_not_found(err: &anyhow::Error) -> bool {
212 err.downcast_ref::<std::io::Error>()
213 .is_some_and(|io| io.kind() == std::io::ErrorKind::NotFound)
214}
215
216fn parse_relay_payload(payload: &str) -> (Value, Option<String>) {
217 if payload.trim().is_empty() {
218 return (Value::Null, None);
219 }
220 match serde_json::from_str::<Value>(payload) {
221 Ok(value) => (value, None),
222 Err(err) => (
223 Value::Null,
224 Some(format!(
225 "warning: failed to parse harness relay payload as JSON: {err}; continuing with null payload"
226 )),
227 ),
228 }
229}
230
/// State shared by the bridge request handlers and the hook relay routines.
struct HarnessBridgeRuntime {
    /// Repository the bridge operates on.
    repo: Repository,
    /// User configuration; `open_session` reads its harness mode, transport and
    /// transcript settings.
    user_config: UserConfig,
    /// Session report store, constructed from the repository root in `new`.
    reports: SessionReportStore,
}
236
/// Borrowed inputs bundled for a registry-entry operation.
///
/// NOTE(review): the consumer of this struct is not visible in this section;
/// the field notes below are inferred from names and types — confirm against
/// the call site.
struct RegistryEntryRequest<'a> {
    /// Heddle session identifier.
    heddle_session_id: &'a str,
    /// Thread name, when one is known.
    thread_name: Option<&'a str>,
    /// Thread identifier, when one is known.
    thread_id: Option<&'a str>,
    /// Resolved identity (presumably of the harness actor — TODO confirm).
    identity: &'a ResolvedIdentity,
    /// Result of the harness probe.
    probe: &'a HarnessProbeResult,
    /// Resolved attachment.
    attach: &'a ResolvedAttachment,
    /// Client instance identifier, when supplied.
    client_instance_id: Option<&'a str>,
    /// Agent entry requested by the caller, if any.
    requested_entry: Option<&'a AgentEntry>,
}
247
/// Inputs bundled for an operation on an actor session.
///
/// NOTE(review): the consumer of this struct is not visible in this section;
/// the field notes below are inferred from names and types — confirm against
/// the call site.
struct CanonicalActorSessionRequest<'a> {
    /// Candidate session, owned by the request.
    tentative_session: Session,
    /// Flag paired with `tentative_session` (presumably whether the caller
    /// created it — TODO confirm).
    tentative_owns_session: bool,
    /// Agent registry entry.
    entry: &'a AgentEntry,
    /// Result of the harness probe.
    probe: &'a HarnessProbeResult,
    /// Resolved attachment; borrowed mutably, so the consumer may update it.
    attach: &'a mut ResolvedAttachment,
}
255
/// Borrowed inputs bundled for attachment resolution.
///
/// NOTE(review): the consumer of this struct is not visible in this section;
/// the field notes below are inferred from names and types — confirm against
/// the call site.
struct AttachmentResolutionInput<'a> {
    /// Agent entry requested by the caller, if any.
    requested_entry: Option<&'a AgentEntry>,
    /// Heddle session identifier given explicitly, if any.
    explicit_heddle_session_id: Option<&'a str>,
    /// Client instance identifier, when supplied.
    client_instance_id: Option<&'a str>,
    /// Result of the harness probe.
    probe: &'a HarnessProbeResult,
    /// Token claims, when available.
    token_claims: Option<&'a TokenClaims>,
}
263
264fn relay_codex(runtime: &mut HarnessBridgeRuntime, _event: &str, payload: &Value) -> Result<()> {
265 let metadata = map_from_pairs([
266 (
267 "client_name",
268 value_string(payload, &["client"]).or_else(|| value_string(payload, &["client_name"])),
269 ),
270 ("model", value_string(payload, &["model"])),
271 (
272 "model_provider",
273 value_string(payload, &["model_provider"])
274 .or_else(|| value_string(payload, &["provider"])),
275 ),
276 (
277 "model_reasoning_effort",
278 value_string(payload, &["reasoning_effort"]),
279 ),
280 ]);
281 let opened = runtime.open_session(OpenSessionParams {
282 harness: Some("codex".to_string()),
283 summary: value_string(payload, &["message"]),
284 probe_metadata: metadata,
285 ..OpenSessionParams::default()
286 })?;
287 runtime.update_progress(UpdateProgressParams {
288 heddle_session_id: opened.heddle_session_id,
289 summary: value_string(payload, &["message"]),
290 harness: Some("codex".to_string()),
291 ..UpdateProgressParams::default()
292 })?;
293 Ok(())
294}
295
/// Relays one Claude Code hook event into the bridge runtime.
///
/// The function first collects probe metadata from the payload, opens a session
/// tagged `claude-code` through `runtime.open_session`, and then dispatches on
/// the hook event name:
///
/// - `SessionEnd` closes the session with outcome `completed`;
/// - `StatusLine` records a progress update followed by a usage record;
/// - `Stop` / `SubagentStop` record progress and then attempt a stop capture;
/// - `SubagentStart`, `UserPromptSubmit`, `PreToolUse` and any other event
///   record a progress update, some with an extra best-effort hook step.
///
/// Failures of the `claude_hook::*` follow-up steps are only logged; failures of
/// `open_session`, `close_session`, `update_progress` or `record_usage` are
/// returned to the caller.
fn relay_claude(runtime: &mut HarnessBridgeRuntime, event: &str, payload: &Value) -> Result<()> {
    // Pairs whose value is `None` are dropped by `map_from_pairs`.
    let metadata = map_from_pairs([
        ("session_id", value_string(payload, &["session_id"])),
        ("agent_id", value_string(payload, &["agent_id"])),
        ("session_name", value_string(payload, &["session_name"])),
        (
            "transcript_path",
            value_string(payload, &["transcript_path"]),
        ),
        (
            "model",
            // `model` may be an object with an `id` or a bare scalar.
            value_string(payload, &["model", "id"]).or_else(|| value_string(payload, &["model"])),
        ),
        (
            "model_display_name",
            value_string(payload, &["model", "display_name"]),
        ),
        ("effort", value_string(payload, &["effort"])),
        ("hook_event", Some(event.to_string())),
        (
            "status_line",
            // Present (as "1") only for StatusLine events.
            (event == "StatusLine").then(|| "1".to_string()),
        ),
        (
            "touched_paths",
            // Comma-joined list from the tool response, else a single file path.
            value_array_join(payload, &["tool_response", "filePaths"])
                .or_else(|| value_string(payload, &["file_path"])),
        ),
        (
            "input_tokens",
            value_u64_string(payload, &["context_window", "total_input_tokens"]),
        ),
        (
            "output_tokens",
            value_u64_string(payload, &["context_window", "total_output_tokens"]),
        ),
        (
            "cost_micros_usd",
            value_cost_micros(payload, &["cost", "total_cost_usd"]),
        ),
    ]);
    let opened = runtime.open_session(OpenSessionParams {
        harness: Some("claude-code".to_string()),
        // Display name is preferred here, unlike the `model` metadata entry above.
        model: value_string(payload, &["model", "display_name"])
            .or_else(|| value_string(payload, &["model", "id"]))
            .or_else(|| value_string(payload, &["model"])),
        summary: value_string(payload, &["message"]).or_else(|| value_string(payload, &["reason"])),
        probe_metadata: metadata.clone(),
        ..OpenSessionParams::default()
    })?;
    match event {
        "SessionEnd" => {
            runtime.close_session(CloseSessionParams {
                heddle_session_id: opened.heddle_session_id,
                summary: value_string(payload, &["reason"])
                    .or_else(|| value_string(payload, &["stop_hook_active"])),
                outcome: Some("completed".to_string()),
                ..CloseSessionParams::default()
            })?;
        }
        "StatusLine" => {
            runtime.update_progress(UpdateProgressParams {
                heddle_session_id: opened.heddle_session_id.clone(),
                harness: Some("claude-code".to_string()),
                status: Some("StatusLine".to_string()),
                message: value_string(payload, &["session_name"])
                    .or_else(|| value_string(payload, &["cwd"]))
                    .or_else(|| value_string(payload, &["workspace", "current_dir"])),
                probe_metadata: metadata.clone(),
                ..UpdateProgressParams::default()
            })?;
            // Usage totals are read from the payload's `context_window` and `cost` objects.
            runtime.record_usage(RecordUsageParams {
                heddle_session_id: opened.heddle_session_id,
                input_tokens: value_u64(payload, &["context_window", "total_input_tokens"]),
                output_tokens: value_u64(payload, &["context_window", "total_output_tokens"]),
                reasoning_tokens: value_u64(payload, &["context_window", "total_reasoning_tokens"]),
                cache_creation_tokens: None,
                cache_read_tokens: None,
                tool_calls: None,
                cost_micros_usd: value_cost_micros_u64(payload, &["cost", "total_cost_usd"]),
            })?;
        }
        "Stop" => {
            runtime.update_progress(UpdateProgressParams {
                heddle_session_id: opened.heddle_session_id,
                harness: Some("claude-code".to_string()),
                status: Some("Stop".to_string()),
                message: value_string(payload, &["message"])
                    .or_else(|| value_string(payload, &["result"]))
                    .or_else(|| value_string(payload, &["stop_reason"])),
                probe_metadata: metadata,
                ..UpdateProgressParams::default()
            })?;
            // Best effort: a capture failure is logged and does not fail the relay.
            if let Err(err) = claude_hook::handle_stop_capture(
                &runtime.repo,
                &runtime.user_config,
                payload,
                "Claude Code turn",
            ) {
                tracing::warn!(?err, "heddle Stop hook capture failed");
            }
        }
        "SubagentStop" => {
            runtime.update_progress(UpdateProgressParams {
                heddle_session_id: opened.heddle_session_id,
                harness: Some("claude-code".to_string()),
                status: Some("SubagentStop".to_string()),
                touched_paths: csv_from_value(metadata.get("touched_paths")),
                probe_metadata: metadata,
                ..UpdateProgressParams::default()
            })?;
            // Best effort: both follow-up steps only log on failure.
            if let Err(err) = claude_hook::handle_stop_capture(
                &runtime.repo,
                &runtime.user_config,
                payload,
                "Claude Code subagent turn",
            ) {
                tracing::warn!(?err, "heddle SubagentStop hook capture failed");
            }
            if let Err(err) = claude_hook::mark_subagent_complete(&runtime.repo, payload) {
                tracing::debug!(?err, "heddle SubagentStop mark-complete failed");
            }
        }
        "SubagentStart" => {
            runtime.update_progress(UpdateProgressParams {
                heddle_session_id: opened.heddle_session_id,
                harness: Some("claude-code".to_string()),
                status: Some("SubagentStart".to_string()),
                touched_paths: csv_from_value(metadata.get("touched_paths")),
                probe_metadata: metadata,
                ..UpdateProgressParams::default()
            })?;
        }
        "UserPromptSubmit" => {
            runtime.update_progress(UpdateProgressParams {
                // Cloned because the session id is needed again for segment rotation below.
                heddle_session_id: opened.heddle_session_id.clone(),
                harness: Some("claude-code".to_string()),
                status: Some("UserPromptSubmit".to_string()),
                touched_paths: csv_from_value(metadata.get("touched_paths")),
                probe_metadata: metadata,
                ..UpdateProgressParams::default()
            })?;
            // Best effort: rotation failure is logged at debug level only.
            if let Err(err) = claude_hook::handle_user_prompt_segment_rotate(
                &runtime.repo,
                &opened.heddle_session_id,
                payload,
            ) {
                tracing::debug!(?err, "heddle UserPromptSubmit segment rotation failed");
            }
        }
        "PreToolUse" => {
            runtime.update_progress(UpdateProgressParams {
                heddle_session_id: opened.heddle_session_id,
                harness: Some("claude-code".to_string()),
                status: Some("PreToolUse".to_string()),
                touched_paths: csv_from_value(metadata.get("touched_paths")),
                probe_metadata: metadata,
                ..UpdateProgressParams::default()
            })?;
            // Best effort: failure is logged at debug level only.
            if let Err(err) = claude_hook::handle_pre_tool_use(&runtime.repo, payload) {
                tracing::debug!(?err, "heddle PreToolUse context inject skipped");
            }
        }
        // Any other hook event is recorded as a progress update named after the event.
        _ => {
            runtime.update_progress(UpdateProgressParams {
                heddle_session_id: opened.heddle_session_id,
                harness: Some("claude-code".to_string()),
                status: Some(event.to_string()),
                touched_paths: csv_from_value(metadata.get("touched_paths")),
                probe_metadata: metadata,
                ..UpdateProgressParams::default()
            })?;
        }
    }
    Ok(())
}
478
479fn relay_opencode(runtime: &mut HarnessBridgeRuntime, event: &str, payload: &Value) -> Result<()> {
480 let metadata = map_from_pairs([
481 (
482 "session_id",
483 value_string(payload, &["sessionID"])
484 .or_else(|| value_string(payload, &["session_id"])),
485 ),
486 (
487 "parent_id",
488 value_string(payload, &["parentID"]).or_else(|| value_string(payload, &["parent_id"])),
489 ),
490 (
491 "client_name",
492 value_string(payload, &["client"]).or_else(|| std::env::var("OPENCODE_CLIENT").ok()),
493 ),
494 ("model", value_string(payload, &["model"])),
495 ("provider", value_string(payload, &["provider"])),
496 ("hook_event", Some(event.to_string())),
497 (
498 "touched_paths",
499 value_string(payload, &["file", "path"]).or_else(|| value_string(payload, &["path"])),
500 ),
501 ]);
502 let opened = runtime.open_session(OpenSessionParams {
503 harness: Some("opencode".to_string()),
504 model: value_string(payload, &["model"]),
505 provider: value_string(payload, &["provider"]),
506 probe_metadata: metadata.clone(),
507 ..OpenSessionParams::default()
508 })?;
509 let session_id = opened.heddle_session_id.clone();
510 runtime.update_progress(UpdateProgressParams {
511 heddle_session_id: session_id.clone(),
512 harness: Some("opencode".to_string()),
513 status: Some(event.to_string()),
514 touched_paths: csv_from_value(metadata.get("touched_paths")),
515 probe_metadata: metadata,
516 ..UpdateProgressParams::default()
517 })?;
518 if let Err(err) = record_opencode_timeline_event(runtime, event, payload, &opened) {
519 tracing::debug!(?err, event, "heddle OpenCode timeline recording skipped");
520 }
521 Ok(())
522}
523
/// Which side of a tool call a harness event represents for timeline recording.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TimelineToolEvent {
    /// The event marks the start of a tool call; recorded as a `ToolCallStarted` operation.
    Started,
    /// The event marks the end of a tool call; recorded as a `ToolCallFinished` operation.
    Finished,
}
529
530trait HarnessTimelineExtractor {
531 fn timeline_event(&self, event: &str) -> Option<TimelineToolEvent>;
532 fn native_tool_call(&self, payload: &Value) -> Option<NativeToolCallRefV1>;
533 fn tool_name(&self, payload: &Value) -> String;
534 fn tool_status(&self, payload: &Value) -> TimelineToolCallStatus;
535 fn payload_metadata(&self, event: &str, payload: &Value)
536 -> Result<TimelineToolPayloadMetadata>;
537 fn touched_paths(&self, payload: &Value) -> Vec<String>;
538 fn capture_intent(&self, native: &NativeToolCallRefV1, payload: &Value) -> String;
539
540 fn timeline_thread(
541 &self,
542 runtime: &HarnessBridgeRuntime,
543 opened: &OpenSessionResult,
544 ) -> Result<String> {
545 if let Some(report) = runtime.reports.load(&opened.heddle_session_id)?
546 && let Some(thread) = report.thread
547 {
548 return Ok(thread);
549 }
550 match runtime.repo.head_ref()? {
551 Head::Attached { thread } => Ok(thread.to_string()),
552 Head::Detached { .. } => Ok("main".to_string()),
553 }
554 }
555
556 fn stable_step_id(&self, native: &NativeToolCallRefV1) -> TimelineStepId {
557 let key = format!(
558 "{}\0{}\0{}\0{}",
559 native.harness,
560 native.session_id.as_deref().unwrap_or(""),
561 native.message_id.as_deref().unwrap_or(""),
562 native.tool_call_id
563 );
564 let hash =
565 ContentHash::compute_typed("timeline-native-tool-call-v1", key.as_bytes()).to_hex();
566 TimelineStepId::new(format!("tls-{}", &hash[..24]))
567 }
568
569 fn started_labels(&self, _payload: &Value) -> Vec<TimelineLabel> {
570 vec![TimelineLabel::ExternalSideEffectsUnknown]
571 }
572
573 fn finished_labels(&self, changed: bool, _payload: &Value) -> Vec<TimelineLabel> {
574 if changed {
575 vec![
576 TimelineLabel::RepoReversible,
577 TimelineLabel::ExternalSideEffectsUnknown,
578 ]
579 } else {
580 vec![TimelineLabel::ExternalSideEffectsUnknown]
581 }
582 }
583}
584
585struct OpenCodeTimelineExtractor;
586
587impl HarnessTimelineExtractor for OpenCodeTimelineExtractor {
588 fn timeline_event(&self, event: &str) -> Option<TimelineToolEvent> {
589 match event {
590 "tool.execute.before" => Some(TimelineToolEvent::Started),
591 "tool.execute.after" => Some(TimelineToolEvent::Finished),
592 _ => None,
593 }
594 }
595
596 fn native_tool_call(&self, payload: &Value) -> Option<NativeToolCallRefV1> {
597 opencode_native_tool_call(payload)
598 }
599
600 fn tool_name(&self, payload: &Value) -> String {
601 opencode_tool_name(payload)
602 }
603
604 fn tool_status(&self, payload: &Value) -> TimelineToolCallStatus {
605 opencode_tool_status(payload)
606 }
607
608 fn payload_metadata(
609 &self,
610 event: &str,
611 payload: &Value,
612 ) -> Result<TimelineToolPayloadMetadata> {
613 opencode_payload_metadata(event, payload)
614 }
615
616 fn touched_paths(&self, payload: &Value) -> Vec<String> {
617 opencode_touched_paths(payload)
618 }
619
620 fn capture_intent(&self, native: &NativeToolCallRefV1, payload: &Value) -> String {
621 format!(
622 "OpenCode {} tool call {}",
623 self.tool_name(payload),
624 native.tool_call_id
625 )
626 }
627}
628
629fn record_opencode_timeline_event(
630 runtime: &mut HarnessBridgeRuntime,
631 event: &str,
632 payload: &Value,
633 opened: &OpenSessionResult,
634) -> Result<()> {
635 record_timeline_event(runtime, event, payload, opened, &OpenCodeTimelineExtractor)
636}
637
638fn record_timeline_event<E: HarnessTimelineExtractor>(
639 runtime: &mut HarnessBridgeRuntime,
640 event: &str,
641 payload: &Value,
642 opened: &OpenSessionResult,
643 extractor: &E,
644) -> Result<()> {
645 match extractor.timeline_event(event) {
646 Some(TimelineToolEvent::Started) => {
647 record_timeline_tool_started(runtime, event, payload, opened, extractor)
648 }
649 Some(TimelineToolEvent::Finished) => {
650 record_timeline_tool_finished(runtime, event, payload, opened, extractor)
651 }
652 None => Ok(()),
653 }
654}
655
/// Writes a `ToolCallStarted` timeline operation for the tool call in `payload`.
///
/// Returns `Ok(())` without writing anything when the payload carries no native
/// tool call reference or when the repository has no current change id.
///
/// # Errors
///
/// Propagates failures from resolving the thread, opening or locking the
/// timeline store, rebuilding the view, building the payload metadata, and
/// writing the operation.
fn record_timeline_tool_started<E: HarnessTimelineExtractor>(
    runtime: &mut HarnessBridgeRuntime,
    event: &str,
    payload: &Value,
    opened: &OpenSessionResult,
    extractor: &E,
) -> Result<()> {
    let Some(native) = extractor.native_tool_call(payload) else {
        return Ok(());
    };
    // The change id seen now is stored as the step's "before" state.
    let Some(before_state) = current_change_id(&runtime.repo)? else {
        return Ok(());
    };
    let thread = extractor.timeline_thread(runtime, opened)?;
    let store = TimelineStore::open(runtime.repo.heddle_dir())?;
    // The guard is held until the function returns, i.e. across the view rebuild
    // and the write (presumably serializing recorders on this thread — TODO confirm).
    let _record_guard = store.lock_recording(&thread)?;
    let view = TimelineView::rebuild(&store)?;
    let step_id = extractor.stable_step_id(&native);
    let (branch_id, parent_step_id) = timeline_position_for_new_tool_step(&view, &thread, &step_id);
    let envelope = TimelineOperationEnvelope::new(
        TimelineOperationBodyV1::ToolCallStarted(ToolCallStartedV1 {
            thread,
            step_id,
            branch_id,
            parent_step_id,
            native,
            tool_name: extractor.tool_name(payload),
            before_state,
            payload: Some(extractor.payload_metadata(event, payload)?),
            started_at_ms: Utc::now().timestamp_millis(),
        }),
        extractor.started_labels(payload),
    );
    store.write_operation(&envelope)?;
    Ok(())
}
692
/// Writes a `ToolCallFinished` timeline operation for the tool call in `payload`.
///
/// When the worktree has changes, a snapshot is attempted first; a snapshot
/// failure is logged, adds the `CaptureFailed` label, and does not abort the
/// recording. The operation records the before/after change ids, whether they
/// differ, and the union of payload-reported and state-derived touched paths.
///
/// Returns `Ok(())` without writing anything when the payload carries no native
/// tool call reference or when the repository has no current change id.
///
/// # Errors
///
/// Propagates failures from resolving the thread, opening or locking the
/// timeline store, rebuilding the view, inspecting the worktree or repository
/// state, building the payload metadata, and writing the operation.
fn record_timeline_tool_finished<E: HarnessTimelineExtractor>(
    runtime: &mut HarnessBridgeRuntime,
    event: &str,
    payload: &Value,
    opened: &OpenSessionResult,
    extractor: &E,
) -> Result<()> {
    let Some(native) = extractor.native_tool_call(payload) else {
        return Ok(());
    };
    // Used wherever a recorded or post-capture change id is unavailable.
    let Some(fallback_state) = current_change_id(&runtime.repo)? else {
        return Ok(());
    };
    let thread = extractor.timeline_thread(runtime, opened)?;
    let store = TimelineStore::open(runtime.repo.heddle_dir())?;
    // The guard is held until the function returns, i.e. across the capture and
    // the write (presumably serializing recorders on this thread — TODO confirm).
    let _record_guard = store.lock_recording(&thread)?;
    let before_view = TimelineView::rebuild(&store)?;
    let step_id = extractor.stable_step_id(&native);
    // Only the branch is needed here; the parent step is not part of the finished record.
    let (branch_id, _) = timeline_position_for_new_tool_step(&before_view, &thread, &step_id);
    // Prefer the "before" state stored on the step already in the view, if any.
    let before_state = before_view
        .step(&thread, &step_id)
        .and_then(|step| step.before_state)
        .unwrap_or(fallback_state);
    let has_worktree_changes_before_capture = !collect_worktree_changes(&runtime.repo)?.is_empty();
    let mut capture_failed = false;
    // A snapshot is only attempted when the worktree actually has changes.
    let capture_state = if !has_worktree_changes_before_capture {
        None
    } else {
        let intent = extractor.capture_intent(&native, payload);
        match create_snapshot(
            &runtime.repo,
            &runtime.user_config,
            Some(intent),
            None,
            SnapshotAgentOverrides {
                provider: opened.provider.clone(),
                model: opened.model.clone(),
                session: native.session_id.clone(),
                segment: None,
                policy: None,
                no_policy: false,
                no_agent: false,
            },
        ) {
            // On success the repository head after the snapshot is recorded.
            Ok(_) => runtime.repo.head()?,
            Err(err) => {
                capture_failed = true;
                tracing::warn!(?err, "heddle timeline tool capture failed");
                None
            }
        }
    };
    // Re-read the change id after the capture attempt.
    let after_state = current_change_id(&runtime.repo)?.unwrap_or(fallback_state);
    let mut touched_paths = extractor.touched_paths(payload);
    merge_string_vec(
        &mut touched_paths,
        changed_paths_between_states(&runtime.repo, before_state, after_state)?,
    );
    let changed = before_state != after_state;
    let mut labels = extractor.finished_labels(changed, payload);
    if capture_failed {
        merge_timeline_labels(&mut labels, vec![TimelineLabel::CaptureFailed]);
    }
    let envelope = TimelineOperationEnvelope::new(
        TimelineOperationBodyV1::ToolCallFinished(ToolCallFinishedV1 {
            thread,
            step_id,
            branch_id,
            native,
            status: extractor.tool_status(payload),
            before_state,
            after_state,
            capture_state,
            // No oplog batch id is recorded by this path.
            capture_oplog_batch_id: None,
            changed,
            touched_paths,
            payload: Some(extractor.payload_metadata(event, payload)?),
            finished_at_ms: Utc::now().timestamp_millis(),
        }),
        labels,
    );
    store.write_operation(&envelope)?;
    Ok(())
}
777
778fn opencode_native_tool_call(payload: &Value) -> Option<NativeToolCallRefV1> {
779 let tool_call_id = first_value_string(
780 payload,
781 &[
782 &["toolCallID"],
783 &["tool_call_id"],
784 &["toolCallId"],
785 &["callID"],
786 &["call_id"],
787 &["tool", "callID"],
788 &["tool", "call_id"],
789 &["tool", "id"],
790 &["toolCall", "id"],
791 &["tool_call", "id"],
792 &["id"],
793 ],
794 )?;
795 Some(NativeToolCallRefV1 {
796 harness: "opencode".to_string(),
797 session_id: value_string(payload, &["sessionID"])
798 .or_else(|| value_string(payload, &["session_id"])),
799 message_id: value_string(payload, &["messageID"])
800 .or_else(|| value_string(payload, &["message_id"]))
801 .or_else(|| value_string(payload, &["message", "id"])),
802 tool_call_id,
803 })
804}
805
806fn timeline_position_for_new_tool_step(
807 view: &TimelineView,
808 thread: &str,
809 step_id: &TimelineStepId,
810) -> (TimelineBranchId, Option<TimelineStepId>) {
811 let branch_id = view
812 .status(thread)
813 .and_then(|status| status.current_branch_id.clone())
814 .unwrap_or_else(|| TimelineBranchId::new("tlb-main"));
815 let parent_step_id = view
816 .status(thread)
817 .and_then(|status| status.current_step_id.clone())
818 .filter(|current| current != step_id);
819 (branch_id, parent_step_id)
820}
821
822fn current_change_id(repo: &Repository) -> Result<Option<ChangeId>> {
823 Ok(repo
824 .current_state()?
825 .map(|state| state.change_id)
826 .or(repo.head()?))
827}
828
829fn opencode_tool_name(payload: &Value) -> String {
830 first_value_string(
831 payload,
832 &[
833 &["tool", "name"],
834 &["toolName"],
835 &["tool_name"],
836 &["tool"],
837 &["name"],
838 ],
839 )
840 .unwrap_or_else(|| "tool".to_string())
841}
842
843fn opencode_tool_status(payload: &Value) -> TimelineToolCallStatus {
844 let status = first_value_string(
845 payload,
846 &[
847 &["status"],
848 &["tool", "status"],
849 &["result", "status"],
850 &["output", "status"],
851 ],
852 )
853 .unwrap_or_default()
854 .to_ascii_lowercase();
855 if status.contains("cancel") {
856 TimelineToolCallStatus::Cancelled
857 } else if status.contains("fail")
858 || status.contains("error")
859 || payload.get("error").is_some()
860 || payload.get("exception").is_some()
861 {
862 TimelineToolCallStatus::Failed
863 } else {
864 TimelineToolCallStatus::Succeeded
865 }
866}
867
868fn opencode_payload_metadata(event: &str, payload: &Value) -> Result<TimelineToolPayloadMetadata> {
869 let tool_name = opencode_tool_name(payload);
870 let tool_call_id = opencode_native_tool_call(payload)
871 .map(|native| native.tool_call_id)
872 .unwrap_or_default();
873 let raw = serde_json::to_vec(payload)?;
874 let hash = ContentHash::compute_typed("timeline-tool-payload", &raw);
875 let summary = if tool_call_id.is_empty() {
876 format!("OpenCode {event}: {tool_name}")
877 } else {
878 format!("OpenCode {event}: {tool_name} ({tool_call_id})")
879 };
880 Ok(TimelineToolPayloadMetadata {
881 summary: Some(summary),
882 hash: Some(hash),
883 })
884}
885
886fn opencode_touched_paths(payload: &Value) -> Vec<String> {
887 let mut paths = Vec::new();
888 for path in [
889 value_string(payload, &["file", "path"]),
890 value_string(payload, &["path"]),
891 value_string(payload, &["tool", "path"]),
892 value_string(payload, &["tool", "input", "file_path"]),
893 value_string(payload, &["input", "file_path"]),
894 ]
895 .into_iter()
896 .flatten()
897 {
898 if !path.trim().is_empty() && !paths.contains(&path) {
899 paths.push(path);
900 }
901 }
902 for value_path in [
903 &["paths"][..],
904 &["files"][..],
905 &["tool", "input", "paths"][..],
906 &["input", "paths"][..],
907 ] {
908 if let Some(items) = value_string_array(payload, value_path) {
909 merge_string_vec(&mut paths, items);
910 }
911 }
912 paths
913}
914
915fn first_value_string(value: &Value, paths: &[&[&str]]) -> Option<String> {
916 paths.iter().find_map(|path| value_string(value, path))
917}
918
919fn value_string_array(value: &Value, path: &[&str]) -> Option<Vec<String>> {
920 let mut current = value;
921 for segment in path {
922 current = current.get(*segment)?;
923 }
924 current.as_array().map(|items| {
925 items
926 .iter()
927 .filter_map(|item| item.as_str().map(ToString::to_string))
928 .collect()
929 })
930}
931
/// Appends each entry of `incoming` to `target`, skipping entries that are
/// blank (empty after trimming) or already present. Entries are stored
/// untrimmed and in the order encountered.
fn merge_string_vec(target: &mut Vec<String>, incoming: Vec<String>) {
    for candidate in incoming {
        let is_blank = candidate.trim().is_empty();
        if is_blank || target.contains(&candidate) {
            continue;
        }
        target.push(candidate);
    }
}
939
940fn merge_timeline_labels(target: &mut Vec<TimelineLabel>, incoming: Vec<TimelineLabel>) {
941 for label in incoming {
942 if !target.contains(&label) {
943 target.push(label);
944 }
945 }
946}
947
948fn value_string(value: &Value, path: &[&str]) -> Option<String> {
949 let mut current = value;
950 for segment in path {
951 current = current.get(*segment)?;
952 }
953 match current {
954 Value::String(s) => Some(s.clone()),
955 Value::Bool(v) => Some(v.to_string()),
956 Value::Number(v) => Some(v.to_string()),
957 _ => None,
958 }
959}
960
961fn value_array_join(value: &Value, path: &[&str]) -> Option<String> {
962 let mut current = value;
963 for segment in path {
964 current = current.get(*segment)?;
965 }
966 current.as_array().map(|items| {
967 items
968 .iter()
969 .filter_map(|item| item.as_str().map(ToString::to_string))
970 .collect::<Vec<_>>()
971 .join(",")
972 })
973}
974
975fn value_u64_string(value: &Value, path: &[&str]) -> Option<String> {
976 let mut current = value;
977 for segment in path {
978 current = current.get(*segment)?;
979 }
980 current.as_u64().map(|v| v.to_string())
981}
982
983fn value_u64(value: &Value, path: &[&str]) -> Option<u64> {
984 let mut current = value;
985 for segment in path {
986 current = current.get(*segment)?;
987 }
988 current.as_u64()
989}
990
991fn value_cost_micros(value: &Value, path: &[&str]) -> Option<String> {
992 let mut current = value;
993 for segment in path {
994 current = current.get(*segment)?;
995 }
996 current
997 .as_f64()
998 .map(|v| ((v * 1_000_000.0).round() as u64).to_string())
999}
1000
1001fn value_cost_micros_u64(value: &Value, path: &[&str]) -> Option<u64> {
1002 let mut current = value;
1003 for segment in path {
1004 current = current.get(*segment)?;
1005 }
1006 current.as_f64().map(|v| (v * 1_000_000.0).round() as u64)
1007}
1008
/// Builds a map from `(key, value)` pairs, dropping every pair whose value is
/// `None`. When a key appears more than once, the last present value wins.
fn map_from_pairs<const N: usize>(pairs: [(&str, Option<String>); N]) -> BTreeMap<String, String> {
    let mut map = BTreeMap::new();
    for (key, value) in pairs {
        if let Some(present) = value {
            map.insert(key.to_string(), present);
        }
    }
    map
}
1015
/// Splits an optional comma-separated string into trimmed, non-empty items.
/// `None` yields an empty list.
fn csv_from_value(value: Option<&String>) -> Vec<String> {
    let raw = match value {
        Some(raw) => raw,
        None => return Vec::new(),
    };

    let mut items = Vec::new();
    for piece in raw.split(',') {
        let trimmed = piece.trim();
        if !trimmed.is_empty() {
            items.push(trimmed.to_string());
        }
    }
    items
}
1027
1028impl HarnessBridgeRuntime {
1029 fn new(repo: Repository, user_config: UserConfig) -> Self {
1030 let reports = SessionReportStore::new(repo.root());
1031 Self {
1032 repo,
1033 user_config,
1034 reports,
1035 }
1036 }
1037
1038 fn handle_request(&mut self, request: BridgeRequest) -> BridgeResponse {
1039 let response = match request.method.as_str() {
1040 "open_session" => self
1041 .decode_params::<OpenSessionParams>(request.params)
1042 .and_then(|params| self.open_session(params))
1043 .and_then(to_json_value),
1044 "update_progress" => self
1045 .decode_params::<UpdateProgressParams>(request.params)
1046 .and_then(|params| self.update_progress(params))
1047 .and_then(to_json_value),
1048 "record_usage" => self
1049 .decode_params::<RecordUsageParams>(request.params)
1050 .and_then(|params| self.record_usage(params))
1051 .and_then(to_json_value),
1052 "record_touched_paths" => self
1053 .decode_params::<RecordTouchedPathsParams>(request.params)
1054 .and_then(|params| self.record_touched_paths(params))
1055 .and_then(to_json_value),
1056 "close_session" => self
1057 .decode_params::<CloseSessionParams>(request.params)
1058 .and_then(|params| self.close_session(params))
1059 .and_then(to_json_value),
1060 "flush_reports" => self
1061 .decode_params::<FlushReportsParams>(request.params)
1062 .and_then(|params| self.flush_reports(params))
1063 .and_then(to_json_value),
1064 other => Err(anyhow!("unknown method '{other}'")),
1065 };
1066
1067 match response {
1068 Ok(result) => BridgeResponse::ok(request.id, result),
1069 Err(err) => BridgeResponse::error(request.id, "bridge_error", err.to_string()),
1070 }
1071 }
1072
1073 fn decode_params<T: for<'de> Deserialize<'de>>(&self, value: Value) -> Result<T> {
1074 serde_json::from_value(value).map_err(|err| anyhow!(err))
1075 }
1076
1077 fn open_session(&mut self, params: OpenSessionParams) -> Result<OpenSessionResult> {
1078 if self.user_config.harness.mode == HarnessMode::Off {
1079 return Err(anyhow!("harness integration is disabled in user config"));
1080 }
1081
1082 let requested_transport = params
1083 .transport
1084 .unwrap_or(self.user_config.harness.transport);
1085 let transcript_mode = params
1086 .transcript_mode
1087 .unwrap_or(self.user_config.harness.transcript);
1088 let env_hints = merged_env_hints(¶ms.env_hints);
1089 let token_claims = user_config_token_claims(&self.user_config);
1090 let current_session = SessionManager::new(self.repo.root()).get_current_session()?;
1091 let current_segment = current_session
1092 .as_ref()
1093 .and_then(|session| session.current_segment());
1094 let probe = probe_harness_actor(&HarnessProbeInput {
1095 argv: params.argv.clone(),
1096 env_hints: env_hints.clone(),
1097 explicit_harness: params.harness.clone(),
1098 explicit_provider: params.provider.clone(),
1099 explicit_model: params.model.clone(),
1100 explicit_thinking_level: params.thinking_level.clone(),
1101 explicit_policy: params.policy.clone(),
1102 probe_metadata: params.probe_metadata.clone(),
1103 current_provider: current_segment.map(|segment| segment.provider.clone()),
1104 current_model: current_segment.map(|segment| segment.model.clone()),
1105 current_policy: current_segment.and_then(|segment| segment.policy_id.clone()),
1106 repo_root: self.repo.root().display().to_string(),
1107 })?;
1108 let identity = resolve_identity(
1109 &self.repo,
1110 &self.user_config,
1111 IdentityHints {
1112 harness: params.harness.clone(),
1113 provider: params.provider.clone(),
1114 model: params.model.clone(),
1115 thinking_level: params.thinking_level.clone(),
1116 policy: params.policy.clone(),
1117 probe: probe.clone(),
1118 },
1119 )?;
1120 let registry = AgentRegistry::new(self.repo.heddle_dir());
1121 let requested_entry = resolve_requested_registry_entry(
1122 ®istry,
1123 params.agent_session_id.as_deref(),
1124 params.client_instance_id.as_deref(),
1125 )?;
1126
1127 if self.user_config.harness.mode == HarnessMode::Required
1128 && (identity.harness.is_none()
1129 || identity.provider.is_none()
1130 || identity.model.is_none())
1131 {
1132 return Err(anyhow!(
1133 "harness mode is 'required' but harness/provider/model could not be resolved"
1134 ));
1135 }
1136
1137 let mut sessions = SessionManager::new(self.repo.root());
1138 let principal = self.repo.get_principal()?;
1139 let mut attach = resolve_actor_attachment(
1140 ®istry,
1141 &self.repo,
1142 &mut sessions,
1143 AttachmentResolutionInput {
1144 requested_entry: requested_entry.as_ref(),
1145 explicit_heddle_session_id: params.heddle_session_id.as_deref(),
1146 client_instance_id: params.client_instance_id.as_deref(),
1147 probe: &probe,
1148 token_claims: token_claims.as_ref(),
1149 },
1150 )?;
1151 let (session, owns_session) = match &attach.target {
1152 AttachTarget::ExistingSession(session) => {
1153 let segment_id = session.current_segment_id.clone().unwrap_or_default();
1154 sessions.set_current_session(&session.id, &segment_id)?;
1155 (session.clone(), false)
1156 }
1157 AttachTarget::CreateNew {
1158 _because_claimed: _,
1159 } => {
1160 let session = sessions.start_session(
1161 principal,
1162 identity
1163 .provider
1164 .clone()
1165 .unwrap_or_else(|| "unknown".to_string()),
1166 identity
1167 .model
1168 .clone()
1169 .unwrap_or_else(|| "unknown".to_string()),
1170 identity.policy.clone(),
1171 )?;
1172 (session, true)
1173 }
1174 };
1175
1176 let (thread_name, thread_id) =
1177 self.resolve_harness_thread_binding(¶ms, &probe, &identity)?;
1178 let entry = self.ensure_registry_entry(RegistryEntryRequest {
1179 heddle_session_id: &session.id,
1180 thread_name: thread_name.as_deref(),
1181 thread_id: thread_id.as_deref(),
1182 identity: &identity,
1183 probe: &probe,
1184 attach: &attach,
1185 client_instance_id: params.client_instance_id.as_deref(),
1186 requested_entry: requested_entry.as_ref(),
1187 })?;
1188 let (session, owns_session) = self.reuse_canonical_actor_session(
1189 &mut sessions,
1190 CanonicalActorSessionRequest {
1191 tentative_session: session,
1192 tentative_owns_session: owns_session,
1193 entry: &entry,
1194 probe: &probe,
1195 attach: &mut attach,
1196 },
1197 )?;
1198
1199 let mut segment_id = session.current_segment_id.clone().unwrap_or_default();
1200 if should_rotate_segment(&session, &identity) {
1201 let segment = sessions.add_segment(
1202 &session.id,
1203 identity
1204 .provider
1205 .clone()
1206 .unwrap_or_else(|| "unknown".to_string()),
1207 identity
1208 .model
1209 .clone()
1210 .unwrap_or_else(|| "unknown".to_string()),
1211 identity.policy.clone(),
1212 )?;
1213 segment_id = segment.id;
1214 }
1215
1216 let base_state = self
1217 .repo
1218 .current_state()?
1219 .map(|state| state.change_id.to_string_full())
1220 .or_else(|| {
1221 self.repo
1222 .head()
1223 .ok()
1224 .flatten()
1225 .map(|id| id.to_string_full())
1226 });
1227 let worktree_changes_at_open = capture_worktree_change_snapshot(&self.repo)?;
1228 let opened_at = Utc::now().to_rfc3339();
1229 let mut report = SessionReportEnvelope {
1230 version: 1,
1231 heddle_session_id: session.id.clone(),
1232 heddle_segment_id: (!segment_id.is_empty()).then_some(segment_id.clone()),
1233 agent_session_id: Some(entry.session_id.clone()),
1234 client_instance_id: entry.client_instance_id.clone(),
1235 native_actor_key: entry.native_actor_key.clone(),
1236 native_parent_actor_key: entry.native_parent_actor_key.clone(),
1237 native_instance_key: entry.native_instance_key.clone(),
1238 repo_root: self.repo.root().display().to_string(),
1239 thread: thread_name.clone(),
1240 thread_id,
1241 task: params.task.clone(),
1242 summary: params.summary.clone(),
1243 opened_at,
1244 closed_at: None,
1245 base_state_at_open: base_state.clone(),
1246 worktree_changes_at_open,
1247 head_state_at_close: None,
1248 transport_mode: transport_mode_name(requested_transport).to_string(),
1249 transcript_mode: transcript_mode_name(transcript_mode).to_string(),
1250 outcome: None,
1251 harness: identity.to_transport_identity(),
1252 progress: Vec::new(),
1253 usage: UsageTotals::default(),
1254 touched_paths: Vec::new(),
1255 changed_paths: Vec::new(),
1256 diff_summary: None,
1257 transcript_refs: Vec::new(),
1258 last_progress_at: None,
1259 report_flush_state: Some("pending-local".to_string()),
1260 attach_reason: Some(attach.attach_reason.clone()),
1261 attach_precedence: attach.precedence.clone(),
1262 winning_attach_rule: Some(attach.winning_rule.clone()),
1263 probe_source: probe.probe_source.clone(),
1264 probe_confidence: probe.confidence,
1265 pending_flush: true,
1266 last_flushed_at: None,
1267 owns_session,
1268 };
1269 merge_unique_paths(&mut report.touched_paths, probe.touched_paths.clone());
1270 merge_usage(&mut report.usage, &probe.usage_totals);
1271 if transcript_mode != HarnessTranscriptMode::Off {
1272 report.transcript_refs = probe.transcript_refs.clone();
1273 }
1274 self.reports.save(&report)?;
1275 self.sync_registry_from_report(&report, AgentStatus::Active)?;
1276 if matches!(requested_transport, HarnessTransport::Direct) {
1277 enqueue_report(&self.reports, &mut report)?;
1278 self.sync_registry_from_report(&report, AgentStatus::Active)?;
1279 }
1280
1281 Ok(OpenSessionResult {
1282 heddle_session_id: report.heddle_session_id.clone(),
1283 heddle_segment_id: report.heddle_segment_id.clone(),
1284 agent_session_id: report.agent_session_id.clone(),
1285 created_session: owns_session,
1286 harness: report.harness.harness.clone(),
1287 provider: report.harness.provider.clone(),
1288 model: report.harness.model.clone(),
1289 thinking_level: report.harness.thinking_level.clone(),
1290 report_flush_state: report.report_flush_state.clone(),
1291 attach_reason: report.attach_reason.clone(),
1292 })
1293 }
1294
1295 fn update_progress(&mut self, params: UpdateProgressParams) -> Result<SessionMutationResult> {
1296 let mut report = self
1297 .reports
1298 .load(¶ms.heddle_session_id)?
1299 .ok_or_else(|| anyhow!("session report not found for {}", params.heddle_session_id))?;
1300 let current_session = SessionManager::new(self.repo.root()).get_current_session()?;
1301 let current_segment = current_session
1302 .as_ref()
1303 .and_then(|session| session.current_segment());
1304 let probe = probe_harness_actor(&HarnessProbeInput {
1305 argv: params.argv.clone(),
1306 env_hints: merged_env_hints(¶ms.env_hints),
1307 explicit_harness: params.harness.clone(),
1308 explicit_provider: params.provider.clone(),
1309 explicit_model: params.model.clone(),
1310 explicit_thinking_level: params.thinking_level.clone(),
1311 explicit_policy: params.policy.clone(),
1312 probe_metadata: params.probe_metadata.clone(),
1313 current_provider: current_segment.map(|segment| segment.provider.clone()),
1314 current_model: current_segment.map(|segment| segment.model.clone()),
1315 current_policy: current_segment.and_then(|segment| segment.policy_id.clone()),
1316 repo_root: self.repo.root().display().to_string(),
1317 })?;
1318 let identity = resolve_identity(
1319 &self.repo,
1320 &self.user_config,
1321 IdentityHints {
1322 harness: params.harness.clone(),
1323 provider: params.provider.clone(),
1324 model: params.model.clone(),
1325 thinking_level: params.thinking_level.clone(),
1326 policy: params.policy.clone(),
1327 probe: probe.clone(),
1328 },
1329 )?;
1330 self.ensure_segment_for_report(&mut report, &identity)?;
1331 if report.harness.harness.is_none() {
1332 report.harness.harness = identity.harness.clone();
1333 }
1334 if report.harness.provider.is_none() {
1335 report.harness.provider = identity.provider.clone();
1336 }
1337 if report.harness.model.is_none() {
1338 report.harness.model = identity.model.clone();
1339 }
1340 if report.harness.thinking_level.is_none() {
1341 report.harness.thinking_level = identity.thinking_level.clone();
1342 }
1343 if report.harness.policy.is_none() {
1344 report.harness.policy = identity.policy.clone();
1345 }
1346 if report.native_actor_key.is_none() {
1347 report.native_actor_key = probe.native_actor_key.clone();
1348 }
1349 if report.native_parent_actor_key.is_none() {
1350 report.native_parent_actor_key = probe.native_parent_actor_key.clone();
1351 }
1352 if report.native_instance_key.is_none() {
1353 report.native_instance_key = probe.native_instance_key.clone();
1354 }
1355 if report.probe_source.is_none() {
1356 report.probe_source = probe.probe_source.clone();
1357 }
1358 if report.probe_confidence.is_none() {
1359 report.probe_confidence = probe.confidence;
1360 }
1361
1362 let recorded_at = Utc::now().to_rfc3339();
1363 let checkpoint = ProgressCheckpoint {
1364 status: params.status.clone(),
1365 message: params.message.clone(),
1366 completed_steps: params.completed_steps,
1367 total_steps: params.total_steps,
1368 touched_paths: normalize_paths(
1369 params
1370 .touched_paths
1371 .into_iter()
1372 .chain(probe.touched_paths)
1373 .collect::<Vec<_>>(),
1374 ),
1375 recorded_at: recorded_at.clone(),
1376 };
1377 merge_unique_paths(
1378 &mut report.touched_paths,
1379 checkpoint.touched_paths.iter().cloned(),
1380 );
1381 merge_usage(&mut report.usage, &probe.usage_totals);
1382 if report.transcript_mode != "off" && report.transcript_refs.is_empty() {
1383 report.transcript_refs = probe.transcript_refs;
1384 }
1385 report.progress.push(checkpoint);
1386 if let Some(summary) = params.summary {
1387 report.summary = Some(summary);
1388 }
1389 report.last_progress_at = Some(recorded_at);
1390 mark_pending_flush(&mut report);
1391 self.persist_report(report)
1392 }
1393
1394 fn resolve_harness_thread_binding(
1395 &self,
1396 params: &OpenSessionParams,
1397 probe: &HarnessProbeResult,
1398 identity: &ResolvedIdentity,
1399 ) -> Result<(Option<String>, Option<String>)> {
1400 if let Some(thread) = params.thread.clone() {
1401 let thread_id = thread_id_for_name(&self.repo, Some(&thread))?;
1402 return Ok((Some(thread), thread_id));
1403 }
1404
1405 let current_attached = match self.repo.head_ref()? {
1406 Head::Attached { thread } => Some(thread.to_string()),
1407 Head::Detached { .. } => None,
1408 };
1409
1410 if !probe.attach_hints.root_actor
1411 && self.user_config.harness.threading.subagent
1412 == UserHarnessSubagentThreadPolicy::CreateChild
1413 && let Some(parent_thread) =
1414 resolve_parent_thread_for_subagent(&self.repo, probe, current_attached.as_deref())?
1415 && can_create_harness_thread(&self.repo, Some(&parent_thread), Some(&parent_thread))?
1416 {
1417 let name = allocate_thread_name(
1418 &self.repo,
1419 &format!(
1420 "{}/{}",
1421 parent_thread,
1422 sanitize_name(&preferred_thread_slug(params, probe, identity))
1423 ),
1424 )?;
1425 self.ensure_harness_thread(
1426 &name,
1427 Some(&parent_thread),
1428 Some(&parent_thread),
1429 params.task.clone(),
1430 )?;
1431 let thread_id = thread_id_for_name(&self.repo, Some(&name))?;
1432 return Ok((Some(name), thread_id));
1433 }
1434
1435 if probe.attach_hints.root_actor
1436 && self.user_config.harness.threading.root_actor
1437 == UserHarnessRootThreadPolicy::CreateNew
1438 && let Some(current) = current_attached.clone()
1439 && can_create_harness_thread(&self.repo, Some(¤t), None)?
1440 {
1441 let name = allocate_thread_name(
1442 &self.repo,
1443 &format!(
1444 "{}/{}",
1445 current,
1446 sanitize_name(&preferred_thread_slug(params, probe, identity))
1447 ),
1448 )?;
1449 self.ensure_harness_thread(&name, Some(¤t), None, params.task.clone())?;
1450 let thread_id = thread_id_for_name(&self.repo, Some(&name))?;
1451 return Ok((Some(name), thread_id));
1452 }
1453
1454 let thread_id = thread_id_for_name(&self.repo, current_attached.as_deref())?;
1455 Ok((current_attached, thread_id))
1456 }
1457
1458 fn ensure_harness_thread(
1459 &self,
1460 name: &str,
1461 target_thread: Option<&str>,
1462 parent_thread: Option<&str>,
1463 task: Option<String>,
1464 ) -> Result<()> {
1465 let manager = ThreadManager::new(self.repo.heddle_dir());
1466 if manager.load(name)?.is_some() {
1467 return Ok(());
1468 }
1469
1470 let base_state = self
1471 .resolve_harness_thread_base_state(target_thread, parent_thread)?
1472 .ok_or_else(|| anyhow!("No current state to start a thread from"))?;
1473 let tn = ThreadName::new(name);
1474 if self.repo.refs().get_thread(&tn)?.is_none() {
1475 self.repo
1476 .refs()
1477 .set_thread_cas(&tn, refs::RefExpectation::Missing, &base_state)?;
1478 self.repo.oplog().record_thread_create(
1483 &tn,
1484 &base_state,
1485 None,
1486 Some(&self.repo.op_scope()),
1487 )?;
1488 }
1489
1490 let workspace_mode = self
1491 .user_config
1492 .harness
1493 .threading
1494 .workspace_default
1495 .unwrap_or(UserThreadWorkspaceMode::Materialized);
1496 let thread_mode = match workspace_mode {
1497 UserThreadWorkspaceMode::Materialized | UserThreadWorkspaceMode::Auto => {
1498 ThreadMode::Materialized
1499 }
1500 UserThreadWorkspaceMode::Virtualized => ThreadMode::Virtualized,
1501 UserThreadWorkspaceMode::Solid => ThreadMode::Solid,
1502 };
1503 let path = match thread_mode {
1504 ThreadMode::Solid | ThreadMode::Materialized => {
1505 default_private_thread_path(&self.repo, name)
1506 }
1507 ThreadMode::Virtualized => default_private_thread_path(&self.repo, name),
1510 };
1511 let abs_path = prepare_worktree_target(&self.repo, &path, Some(name))?.path;
1512 write_isolated_checkout(&self.repo, &abs_path, &base_state, Some(name))?;
1513
1514 let base_state_obj = self
1515 .repo
1516 .store()
1517 .get_state(&base_state)?
1518 .ok_or_else(|| anyhow!("Base state '{}' not found", base_state.short()))?;
1519 let thread = Thread {
1520 id: name.to_string(),
1521 thread: name.to_string(),
1522 target_thread: target_thread.map(ToString::to_string),
1523 parent_thread: parent_thread.map(ToString::to_string),
1524 mode: thread_mode.clone(),
1525 state: ThreadState::Active,
1526 base_state: base_state.short(),
1527 base_root: base_state_obj.tree.short(),
1528 current_state: Some(base_state.short()),
1529 merged_state: None,
1530 task,
1531 execution_path: abs_path.clone(),
1532 materialized_path: match thread_mode {
1533 ThreadMode::Solid => Some(abs_path),
1534 ThreadMode::Materialized | ThreadMode::Virtualized => None,
1538 },
1539 changed_paths: vec![],
1540 impact_categories: vec![],
1541 heavy_impact_paths: vec![],
1542 promotion_suggested: false,
1543 freshness: if target_thread.is_some() {
1544 ThreadFreshness::Current
1545 } else {
1546 ThreadFreshness::Unknown
1547 },
1548 verification_summary: summarize_verification(base_state_obj.verification.as_ref()),
1549 confidence_summary: summarize_confidence(base_state_obj.confidence),
1550 integration_policy_result: ThreadIntegrationPolicy::default(),
1551 created_at: Utc::now(),
1552 updated_at: Utc::now(),
1553 ephemeral: None,
1554 auto: true,
1559 shared_target_dir: None,
1562 };
1563 manager.save(&thread)?;
1564 Ok(())
1565 }
1566
1567 fn resolve_harness_thread_base_state(
1568 &self,
1569 target_thread: Option<&str>,
1570 parent_thread: Option<&str>,
1571 ) -> Result<Option<objects::object::ChangeId>> {
1572 resolve_harness_thread_base_state(&self.repo, target_thread, parent_thread)
1573 }
1574
1575 fn record_usage(&mut self, params: RecordUsageParams) -> Result<SessionMutationResult> {
1576 let mut report = self
1577 .reports
1578 .load(¶ms.heddle_session_id)?
1579 .ok_or_else(|| anyhow!("session report not found for {}", params.heddle_session_id))?;
1580 if let Some(input) = params.input_tokens {
1581 report.usage.input_tokens = Some(max_u64(report.usage.input_tokens, input));
1582 }
1583 if let Some(output) = params.output_tokens {
1584 report.usage.output_tokens = Some(max_u64(report.usage.output_tokens, output));
1585 }
1586 if let Some(reasoning) = params.reasoning_tokens {
1587 report.usage.reasoning_tokens = Some(max_u64(report.usage.reasoning_tokens, reasoning));
1588 }
1589 if let Some(cache_creation) = params.cache_creation_tokens {
1590 report.usage.cache_creation_tokens =
1591 Some(max_u64(report.usage.cache_creation_tokens, cache_creation));
1592 }
1593 if let Some(cache_read) = params.cache_read_tokens {
1594 report.usage.cache_read_tokens =
1595 Some(max_u64(report.usage.cache_read_tokens, cache_read));
1596 }
1597 if let Some(tool_calls) = params.tool_calls {
1598 report.usage.tool_calls = Some(max_u32(report.usage.tool_calls, tool_calls));
1599 }
1600 if let Some(cost) = params.cost_micros_usd {
1601 report.usage.cost_micros_usd = Some(max_u64(report.usage.cost_micros_usd, cost));
1602 }
1603 mark_pending_flush(&mut report);
1604 self.persist_report(report)
1605 }
1606
1607 fn record_touched_paths(
1608 &mut self,
1609 params: RecordTouchedPathsParams,
1610 ) -> Result<SessionMutationResult> {
1611 let mut report = self
1612 .reports
1613 .load(¶ms.heddle_session_id)?
1614 .ok_or_else(|| anyhow!("session report not found for {}", params.heddle_session_id))?;
1615 merge_unique_paths(&mut report.touched_paths, normalize_paths(params.paths));
1616 mark_pending_flush(&mut report);
1617 self.persist_report(report)
1618 }
1619
1620 fn close_session(&mut self, params: CloseSessionParams) -> Result<CloseSessionResult> {
1621 let mut report = self
1622 .reports
1623 .load(¶ms.heddle_session_id)?
1624 .ok_or_else(|| anyhow!("session report not found for {}", params.heddle_session_id))?;
1625 report.closed_at = Some(Utc::now().to_rfc3339());
1626 report.outcome = params.outcome.clone();
1627 if let Some(summary) = params.summary {
1628 report.summary = Some(summary);
1629 }
1630 if let Some(transcript_refs) = params.transcript_refs {
1631 report.transcript_refs = transcript_refs;
1632 }
1633 let final_diff = compute_final_diff(
1634 &self.repo,
1635 report.base_state_at_open.as_deref(),
1636 &report.worktree_changes_at_open,
1637 )?;
1638 report.head_state_at_close = final_diff.head_state;
1639 report.changed_paths = final_diff.changed_paths;
1640 report.diff_summary = Some(final_diff.diff_summary);
1641 mark_pending_flush(&mut report);
1642 if report.owns_session {
1643 let mut sessions = SessionManager::new(self.repo.root());
1644 if let Ok(Some(session)) = sessions.get_session(&report.heddle_session_id)
1645 && session.is_active()
1646 {
1647 let _ = sessions.end_session(Some(&report.heddle_session_id));
1648 }
1649 }
1650
1651 let transport = params
1652 .transport
1653 .unwrap_or(self.user_config.harness.transport);
1654 if matches!(transport, HarnessTransport::Direct | HarnessTransport::End) {
1655 enqueue_report(&self.reports, &mut report)?;
1656 } else {
1657 self.reports.save(&report)?;
1658 }
1659 self.sync_registry_from_report(&report, AgentStatus::Complete)?;
1660 Ok(CloseSessionResult {
1661 heddle_session_id: report.heddle_session_id,
1662 changed_paths: report.changed_paths,
1663 diff_summary: report.diff_summary.unwrap_or_default(),
1664 report_flush_state: report.report_flush_state,
1665 })
1666 }
1667
1668 fn flush_reports(&mut self, params: FlushReportsParams) -> Result<FlushReportsResult> {
1669 let mut flushed = 0usize;
1670 let session_ids = match params.heddle_session_id {
1671 Some(session_id) => vec![session_id],
1672 None => self.reports.list_pending()?,
1673 };
1674 for session_id in session_ids {
1675 let Some(mut report) = self.reports.load(&session_id)? else {
1676 continue;
1677 };
1678 if !report.pending_flush {
1679 continue;
1680 }
1681 enqueue_report(&self.reports, &mut report)?;
1682 let status = if report.closed_at.is_some() {
1683 AgentStatus::Complete
1684 } else {
1685 AgentStatus::Active
1686 };
1687 self.sync_registry_from_report(&report, status)?;
1688 flushed += 1;
1689 }
1690 Ok(FlushReportsResult { flushed })
1691 }
1692
1693 fn persist_report(
1694 &mut self,
1695 mut report: SessionReportEnvelope,
1696 ) -> Result<SessionMutationResult> {
1697 let transport = transport_from_report(&report, self.user_config.harness.transport);
1698 match transport {
1699 HarnessTransport::Direct => {
1700 enqueue_report(&self.reports, &mut report)?;
1701 }
1702 HarnessTransport::Spool | HarnessTransport::End => {
1703 self.reports.save(&report)?;
1704 }
1705 }
1706 self.sync_registry_from_report(&report, AgentStatus::Active)?;
1707 Ok(SessionMutationResult {
1708 heddle_session_id: report.heddle_session_id,
1709 heddle_segment_id: report.heddle_segment_id,
1710 report_flush_state: report.report_flush_state,
1711 })
1712 }
1713
1714 fn ensure_segment_for_report(
1715 &self,
1716 report: &mut SessionReportEnvelope,
1717 identity: &ResolvedIdentity,
1718 ) -> Result<()> {
1719 let mut sessions = SessionManager::new(self.repo.root());
1720 let Some(session) = sessions.get_session(&report.heddle_session_id)? else {
1721 return Ok(());
1722 };
1723 if !session.is_active() || !should_rotate_segment(&session, identity) {
1724 return Ok(());
1725 }
1726 let segment = sessions.add_segment(
1727 &report.heddle_session_id,
1728 identity
1729 .provider
1730 .clone()
1731 .unwrap_or_else(|| "unknown".to_string()),
1732 identity
1733 .model
1734 .clone()
1735 .unwrap_or_else(|| "unknown".to_string()),
1736 identity.policy.clone(),
1737 )?;
1738 report.heddle_segment_id = Some(segment.id);
1739 if identity.provider.is_some() {
1740 report.harness.provider = identity.provider.clone();
1741 }
1742 if identity.model.is_some() {
1743 report.harness.model = identity.model.clone();
1744 }
1745 if identity.policy.is_some() {
1746 report.harness.policy = identity.policy.clone();
1747 }
1748 if identity.thinking_level.is_some() {
1749 report.harness.thinking_level = identity.thinking_level.clone();
1750 }
1751 Ok(())
1752 }
1753
1754 fn ensure_registry_entry(&self, request: RegistryEntryRequest<'_>) -> Result<AgentEntry> {
1755 let RegistryEntryRequest {
1756 heddle_session_id,
1757 thread_name,
1758 thread_id,
1759 identity,
1760 probe,
1761 attach,
1762 client_instance_id,
1763 requested_entry,
1764 } = request;
1765 let registry = AgentRegistry::new(self.repo.heddle_dir());
1766 let fallback_entry = if client_instance_id.is_some()
1767 || probe.native_actor_key.is_some()
1768 || probe.native_instance_key.is_some()
1769 {
1770 None
1771 } else {
1772 find_matching_registry_entry(®istry, &self.repo, heddle_session_id, thread_name)?
1773 };
1774 if let Some(entry) = requested_entry
1775 .cloned()
1776 .or_else(|| attach.matched_entry.clone())
1777 .or(fallback_entry)
1778 {
1779 return registry
1780 .update_entry(&entry.session_id, |existing| {
1781 if client_instance_id.is_some() {
1782 existing.client_instance_id = client_instance_id.map(ToString::to_string);
1783 }
1784 if probe.native_actor_key.is_some() {
1785 existing.native_actor_key = probe.native_actor_key.clone();
1786 }
1787 if probe.native_parent_actor_key.is_some() {
1788 existing.native_parent_actor_key = probe.native_parent_actor_key.clone();
1789 }
1790 if probe.native_instance_key.is_some() {
1791 existing.native_instance_key = probe.native_instance_key.clone();
1792 }
1793 existing.heddle_session_id = Some(heddle_session_id.to_string());
1794 existing.thread_id = thread_id.map(ToString::to_string);
1795 if let Some(thread_name) = thread_name {
1796 existing.thread = thread_name.to_string();
1797 }
1798 existing.path = Some(self.repo.root().to_path_buf());
1799 if identity.provider.is_some() {
1800 existing.provider = identity.provider.clone();
1801 }
1802 if identity.model.is_some() {
1803 existing.model = identity.model.clone();
1804 }
1805 if identity.harness.is_some() {
1806 existing.harness = identity.harness.clone();
1807 }
1808 if identity.thinking_level.is_some() {
1809 existing.thinking_level = identity.thinking_level.clone();
1810 }
1811 existing.attach_reason = Some(attach.attach_reason.clone());
1812 existing.attach_precedence = attach.precedence.clone();
1813 existing.winning_attach_rule = Some(attach.winning_rule.clone());
1814 existing.probe_source = probe.probe_source.clone();
1815 existing.probe_confidence = probe.confidence;
1816 existing.status = AgentStatus::Active;
1817 })?
1818 .ok_or_else(|| anyhow!("registry entry disappeared during update"));
1819 }
1820
1821 if client_instance_id.is_none() && probe.native_actor_key.is_some() {
1822 let (entry, _) = registry.find_or_create_active_entry(
1823 |entry| {
1824 claude_actor_compatible(entry, probe, self.repo.root())
1825 && entry.native_actor_key == probe.native_actor_key
1826 },
1827 |existing| {
1828 if client_instance_id.is_some() {
1829 existing.client_instance_id = client_instance_id.map(ToString::to_string);
1830 }
1831 if existing.heddle_session_id.is_none() {
1832 existing.heddle_session_id = Some(heddle_session_id.to_string());
1833 }
1834 existing.thread_id = thread_id.map(ToString::to_string);
1835 if let Some(thread_name) = thread_name {
1836 existing.thread = thread_name.to_string();
1837 }
1838 existing.path = Some(self.repo.root().to_path_buf());
1839 if identity.provider.is_some() {
1840 existing.provider = identity.provider.clone();
1841 }
1842 if identity.model.is_some() {
1843 existing.model = identity.model.clone();
1844 }
1845 if identity.harness.is_some() {
1846 existing.harness = identity.harness.clone();
1847 }
1848 if identity.thinking_level.is_some() {
1849 existing.thinking_level = identity.thinking_level.clone();
1850 }
1851 if probe.native_parent_actor_key.is_some() {
1852 existing.native_parent_actor_key = probe.native_parent_actor_key.clone();
1853 }
1854 if probe.native_instance_key.is_some() {
1855 existing.native_instance_key = probe.native_instance_key.clone();
1856 }
1857 existing.attach_reason = Some(attach.attach_reason.clone());
1858 existing.attach_precedence = attach.precedence.clone();
1859 existing.winning_attach_rule = Some(attach.winning_rule.clone());
1860 existing.probe_source = probe.probe_source.clone();
1861 existing.probe_confidence = probe.confidence;
1862 existing.status = AgentStatus::Active;
1863 },
1864 |session_id| {
1865 Ok(AgentEntry {
1866 session_id: session_id.to_string(),
1867 client_instance_id: client_instance_id.map(ToString::to_string),
1868 native_actor_key: probe.native_actor_key.clone(),
1869 native_parent_actor_key: probe.native_parent_actor_key.clone(),
1870 native_instance_key: probe.native_instance_key.clone(),
1871 heddle_session_id: Some(heddle_session_id.to_string()),
1872 thread_id: thread_id.map(ToString::to_string),
1873 thread: thread_name.unwrap_or("detached").to_string(),
1874 pid: Some(std::process::id()),
1875 boot_id: None,
1876 liveness_path: None,
1877 heartbeat_at: Some(Utc::now()),
1878 anchor_state: self.repo.head()?.map(|id| id.to_string_full()),
1879 anchor_root: None,
1880 reservation_token: Some(objects::store::generate_agent_id()),
1881 path: Some(self.repo.root().to_path_buf()),
1882 base_state: self.repo.head()?.map(|id| id.short()).unwrap_or_default(),
1883 started_at: Utc::now(),
1884 provider: identity.provider.clone(),
1885 model: identity.model.clone(),
1886 harness: identity.harness.clone(),
1887 thinking_level: identity.thinking_level.clone(),
1888 usage_summary: AgentUsageSummary::default(),
1889 last_progress_at: None,
1890 report_flush_state: Some("pending-local".to_string()),
1891 attach_reason: Some(attach.attach_reason.clone()),
1892 task_assignment_id: None,
1893 attach_precedence: attach.precedence.clone(),
1894 winning_attach_rule: Some(attach.winning_rule.clone()),
1895 probe_source: probe.probe_source.clone(),
1896 probe_confidence: probe.confidence,
1897 status: AgentStatus::Active,
1898 completed_at: None,
1899 context_queries: vec![],
1900 })
1901 },
1902 )?;
1903 return Ok(entry);
1904 }
1905
1906 Ok(registry.create_generated_entry(|session_id| {
1907 Ok(AgentEntry {
1908 session_id: session_id.to_string(),
1909 client_instance_id: client_instance_id.map(ToString::to_string),
1910 native_actor_key: probe.native_actor_key.clone(),
1911 native_parent_actor_key: probe.native_parent_actor_key.clone(),
1912 native_instance_key: probe.native_instance_key.clone(),
1913 heddle_session_id: Some(heddle_session_id.to_string()),
1914 thread_id: thread_id.map(ToString::to_string),
1915 thread: thread_name.unwrap_or("detached").to_string(),
1916 pid: Some(std::process::id()),
1917 boot_id: None,
1918 liveness_path: None,
1919 heartbeat_at: Some(Utc::now()),
1920 anchor_state: self.repo.head()?.map(|id| id.to_string_full()),
1921 anchor_root: None,
1922 reservation_token: Some(objects::store::generate_agent_id()),
1923 path: Some(self.repo.root().to_path_buf()),
1924 base_state: self.repo.head()?.map(|id| id.short()).unwrap_or_default(),
1925 started_at: Utc::now(),
1926 provider: identity.provider.clone(),
1927 model: identity.model.clone(),
1928 harness: identity.harness.clone(),
1929 thinking_level: identity.thinking_level.clone(),
1930 usage_summary: AgentUsageSummary::default(),
1931 last_progress_at: None,
1932 report_flush_state: Some("pending-local".to_string()),
1933 attach_reason: Some(attach.attach_reason.clone()),
1934 task_assignment_id: None,
1935 attach_precedence: attach.precedence.clone(),
1936 winning_attach_rule: Some(attach.winning_rule.clone()),
1937 probe_source: probe.probe_source.clone(),
1938 probe_confidence: probe.confidence,
1939 status: AgentStatus::Active,
1940 completed_at: None,
1941 context_queries: vec![],
1942 })
1943 })?)
1944 }
1945
1946 fn reuse_canonical_actor_session(
1947 &self,
1948 sessions: &mut SessionManager,
1949 request: CanonicalActorSessionRequest<'_>,
1950 ) -> Result<(Session, bool)> {
1951 let CanonicalActorSessionRequest {
1952 tentative_session,
1953 tentative_owns_session,
1954 entry,
1955 probe,
1956 attach,
1957 } = request;
1958 let Some(canonical_session_id) = entry.heddle_session_id.as_deref() else {
1959 return Ok((tentative_session, tentative_owns_session));
1960 };
1961 if canonical_session_id == tentative_session.id {
1962 return Ok((tentative_session, tentative_owns_session));
1963 }
1964
1965 if tentative_owns_session
1966 && let Ok(Some(session)) = sessions.get_session(&tentative_session.id)
1967 && session.is_active()
1968 {
1969 let _ = sessions.end_session(Some(&tentative_session.id));
1970 }
1971
1972 let canonical_session = sessions
1973 .get_session(canonical_session_id)?
1974 .ok_or_else(|| anyhow!("session not found: {canonical_session_id}"))?;
1975 let canonical_segment_id = canonical_session
1976 .current_segment_id
1977 .clone()
1978 .unwrap_or_default();
1979 sessions.set_current_session(canonical_session_id, &canonical_segment_id)?;
1980
1981 if let Some(native_actor_key) = probe
1982 .native_actor_key
1983 .as_deref()
1984 .or(entry.native_actor_key.as_deref())
1985 {
1986 attach.precedence.push(format!(
1987 "post-create-native-actor-key:{native_actor_key}:matched"
1988 ));
1989 attach.attach_reason = format!(
1990 "reused existing native actor {} on Heddle session {}",
1991 native_actor_key, canonical_session_id
1992 );
1993 attach.winning_rule = "native-actor-key-post-create".to_string();
1994 }
1995
1996 Ok((canonical_session, false))
1997 }
1998
1999 fn sync_registry_from_report(
2000 &self,
2001 report: &SessionReportEnvelope,
2002 status: AgentStatus,
2003 ) -> Result<()> {
2004 let registry = AgentRegistry::new(self.repo.heddle_dir());
2005 let entry = if let Some(agent_session_id) = &report.agent_session_id {
2006 registry.update_entry(agent_session_id, |entry| {
2007 if report.client_instance_id.is_some() {
2008 entry.client_instance_id = report.client_instance_id.clone();
2009 }
2010 if report.native_actor_key.is_some() {
2011 entry.native_actor_key = report.native_actor_key.clone();
2012 }
2013 if report.native_parent_actor_key.is_some() {
2014 entry.native_parent_actor_key = report.native_parent_actor_key.clone();
2015 }
2016 if report.native_instance_key.is_some() {
2017 entry.native_instance_key = report.native_instance_key.clone();
2018 }
2019 entry.heddle_session_id = Some(report.heddle_session_id.clone());
2020 entry.path = Some(self.repo.root().to_path_buf());
2021 entry.harness = report.harness.harness.clone();
2022 entry.provider = report.harness.provider.clone();
2023 entry.model = report.harness.model.clone();
2024 entry.thinking_level = report.harness.thinking_level.clone();
2025 entry.usage_summary = usage_to_summary(&report.usage);
2026 entry.last_progress_at =
2027 report.last_progress_at.as_deref().and_then(parse_timestamp);
2028 entry.report_flush_state = report.report_flush_state.clone();
2029 entry.attach_reason = report.attach_reason.clone();
2030 entry.attach_precedence = report.attach_precedence.clone();
2031 entry.winning_attach_rule = report.winning_attach_rule.clone();
2032 entry.probe_source = report.probe_source.clone();
2033 entry.probe_confidence = report.probe_confidence;
2034 entry.status = status.clone();
2035 entry.completed_at = match status {
2036 AgentStatus::Active => None,
2037 AgentStatus::Abandoned | AgentStatus::Complete | AgentStatus::Merged => {
2038 Some(Utc::now())
2039 }
2040 };
2041 })?
2042 } else {
2043 None
2044 };
2045
2046 if entry.is_none() {
2047 let resolved = self.ensure_registry_entry(RegistryEntryRequest {
2048 heddle_session_id: &report.heddle_session_id,
2049 thread_name: report.thread.as_deref(),
2050 thread_id: report.thread_id.as_deref(),
2051 identity: &ResolvedIdentity {
2052 harness: report.harness.harness.clone(),
2053 provider: report.harness.provider.clone(),
2054 model: report.harness.model.clone(),
2055 thinking_level: report.harness.thinking_level.clone(),
2056 policy: report.harness.policy.clone(),
2057 },
2058 probe: &HarnessProbeResult {
2059 native_actor_key: report.native_actor_key.clone(),
2060 native_parent_actor_key: report.native_parent_actor_key.clone(),
2061 native_instance_key: report.native_instance_key.clone(),
2062 probe_source: report.probe_source.clone(),
2063 confidence: report.probe_confidence,
2064 ..HarnessProbeResult::default()
2065 },
2066 attach: &ResolvedAttachment {
2067 target: AttachTarget::CreateNew {
2068 _because_claimed: false,
2069 },
2070 matched_entry: None,
2071 attach_reason: report.attach_reason.clone().unwrap_or_else(|| {
2072 format!(
2073 "created actor for Heddle session {}",
2074 report.heddle_session_id
2075 )
2076 }),
2077 precedence: report.attach_precedence.clone(),
2078 winning_rule: report
2079 .winning_attach_rule
2080 .clone()
2081 .unwrap_or_else(|| "report-sync".to_string()),
2082 },
2083 client_instance_id: report.client_instance_id.as_deref(),
2084 requested_entry: None,
2085 })?;
2086 let mut report = report.clone();
2087 report.agent_session_id = Some(resolved.session_id);
2088 self.reports.save(&report)?;
2089 }
2090 Ok(())
2091 }
2092}
2093
2094#[derive(Debug, Clone, Default)]
2095struct ResolvedIdentity {
2096 harness: Option<String>,
2097 provider: Option<String>,
2098 model: Option<String>,
2099 thinking_level: Option<String>,
2100 policy: Option<String>,
2101}
2102
2103impl ResolvedIdentity {
2104 fn to_transport_identity(&self) -> HarnessIdentity {
2105 HarnessIdentity {
2106 harness: self.harness.clone(),
2107 provider: self.provider.clone(),
2108 model: self.model.clone(),
2109 thinking_level: self.thinking_level.clone(),
2110 policy: self.policy.clone(),
2111 }
2112 }
2113}
2114
/// Inputs to `resolve_identity`: explicitly supplied values plus the harness
/// probe result. In `resolve_identity` each explicit value takes precedence
/// over the matching field of `probe`.
struct IdentityHints {
    /// Explicitly supplied harness name, if any.
    harness: Option<String>,
    /// Explicitly supplied provider, if any.
    provider: Option<String>,
    /// Explicitly supplied model, if any.
    model: Option<String>,
    /// Explicitly supplied thinking level, if any.
    thinking_level: Option<String>,
    /// Explicitly supplied policy, if any.
    policy: Option<String>,
    /// Probe result consulted when the explicit value above is `None`.
    probe: HarnessProbeResult,
}
2123
2124fn resolve_identity(
2125 repo: &Repository,
2126 user_config: &UserConfig,
2127 hints: IdentityHints,
2128) -> Result<ResolvedIdentity> {
2129 let current_session = SessionManager::new(repo.root()).get_current_session()?;
2130 let current_segment = current_session
2131 .as_ref()
2132 .and_then(|session| session.current_segment());
2133 let token_claims = if user_config.harness.auto_infer {
2134 user_config_token_claims(user_config)
2135 } else {
2136 None
2137 };
2138 let harness_override = resolved_harness_override(
2139 user_config,
2140 hints.harness.as_deref(),
2141 hints.probe.harness.as_deref(),
2142 );
2143
2144 Ok(ResolvedIdentity {
2145 harness: hints.harness.or(hints.probe.harness),
2146 provider: hints
2147 .provider
2148 .or(hints.probe.provider)
2149 .or_else(|| current_segment.map(|segment| segment.provider.clone()))
2150 .or_else(|| {
2151 token_claims
2152 .as_ref()
2153 .and_then(|claims| claims.agent_provider.clone())
2154 })
2155 .or_else(|| harness_override.and_then(|entry| entry.provider.clone()))
2156 .or_else(|| user_config.agent.provider.clone()),
2157 model: hints
2158 .model
2159 .or(hints.probe.model)
2160 .or_else(|| current_segment.map(|segment| segment.model.clone()))
2161 .or_else(|| {
2162 token_claims
2163 .as_ref()
2164 .and_then(|claims| claims.agent_model.clone())
2165 })
2166 .or_else(|| harness_override.and_then(|entry| entry.model.clone()))
2167 .or_else(|| user_config.agent.model.clone()),
2168 thinking_level: hints
2169 .thinking_level
2170 .or(hints.probe.thinking_level)
2171 .or_else(|| harness_override.and_then(|entry| entry.thinking_level.clone())),
2172 policy: hints
2173 .policy
2174 .or(hints.probe.policy)
2175 .or_else(|| current_segment.and_then(|segment| segment.policy_id.clone()))
2176 .or_else(|| harness_override.and_then(|entry| entry.policy.clone()))
2177 .or_else(|| user_config.agent.default_policy.clone()),
2178 })
2179}
2180
2181fn resolved_harness_override<'a>(
2182 user_config: &'a UserConfig,
2183 explicit: Option<&str>,
2184 fingerprint: Option<&str>,
2185) -> Option<&'a UserHarnessOverride> {
2186 explicit
2187 .and_then(|name| user_config.harness.harnesses.get(name))
2188 .or_else(|| fingerprint.and_then(|name| user_config.harness.harnesses.get(name)))
2189}
2190
/// Outcome of attachment resolution: reuse a session or start a new one.
enum AttachTarget {
    /// Attach to this already existing session.
    ExistingSession(objects::object::Session),
    /// Start a new session. `resolve_actor_attachment` sets
    /// `_because_claimed` to `true` when the candidate session was already
    /// claimed by another active actor.
    CreateNew { _because_claimed: bool },
}
2195
/// Result of `resolve_actor_attachment`: where to attach and why.
struct ResolvedAttachment {
    /// Session to attach to, or the instruction to create a new one.
    target: AttachTarget,
    /// Registry entry that produced the match, when a rule matched via the registry.
    matched_entry: Option<AgentEntry>,
    /// Human-readable explanation of the decision.
    attach_reason: String,
    /// Ordered trace of the rules that were evaluated, e.g.
    /// `"native-actor-key:miss"` or `"token-sid:<sid>:matched"`.
    precedence: Vec<String>,
    /// Name of the rule that decided the outcome, e.g. `"create-new-session"`.
    winning_rule: String,
}
2203
2204fn resolve_actor_attachment(
2205 registry: &AgentRegistry,
2206 repo: &Repository,
2207 sessions: &mut SessionManager,
2208 input: AttachmentResolutionInput<'_>,
2209) -> Result<ResolvedAttachment> {
2210 let AttachmentResolutionInput {
2211 requested_entry,
2212 explicit_heddle_session_id,
2213 client_instance_id,
2214 probe,
2215 token_claims,
2216 } = input;
2217 let mut precedence = Vec::new();
2218 if let Some(entry) = requested_entry
2219 && let Some(bound_session_id) = entry.heddle_session_id.as_deref()
2220 {
2221 precedence.push(format!(
2222 "explicit-agent-session:{}:matched",
2223 entry.session_id
2224 ));
2225 let session = sessions
2226 .get_session(bound_session_id)?
2227 .ok_or_else(|| anyhow!("session not found: {bound_session_id}"))?;
2228 if !session.is_active() {
2229 return Err(anyhow!("session is not active: {bound_session_id}"));
2230 }
2231 return Ok(ResolvedAttachment {
2232 target: AttachTarget::ExistingSession(session),
2233 matched_entry: Some(entry.clone()),
2234 attach_reason: format!(
2235 "reattached actor {} to existing Heddle session {}",
2236 entry.session_id, bound_session_id
2237 ),
2238 precedence,
2239 winning_rule: "explicit-agent-session".to_string(),
2240 });
2241 }
2242 precedence.push("explicit-agent-session:miss".to_string());
2243
2244 if let Some(session_id) = explicit_heddle_session_id {
2245 precedence.push(format!("explicit-heddle-session:{session_id}:matched"));
2246 ensure_requested_entry_matches_session(requested_entry, session_id)?;
2247 let session = sessions
2248 .get_session(session_id)?
2249 .ok_or_else(|| anyhow!("session not found: {session_id}"))?;
2250 if !session.is_active() {
2251 return Err(anyhow!("session is not active: {session_id}"));
2252 }
2253 return Ok(ResolvedAttachment {
2254 target: AttachTarget::ExistingSession(session),
2255 matched_entry: None,
2256 attach_reason: format!("attached to explicit Heddle session {session_id}"),
2257 precedence,
2258 winning_rule: "explicit-heddle-session".to_string(),
2259 });
2260 }
2261 precedence.push("explicit-heddle-session:miss".to_string());
2262
2263 if client_instance_id.is_none()
2264 && let Some(native_actor_key) = probe.native_actor_key.as_deref()
2265 {
2266 if let Some(entry) = registry.find_active_by_native_actor_key(native_actor_key)?
2267 && claude_actor_compatible(&entry, probe, repo.root())
2268 && let Some(bound_session_id) = entry.heddle_session_id.clone()
2269 {
2270 precedence.push(format!("native-actor-key:{native_actor_key}:matched"));
2271 let session = sessions
2272 .get_session(&bound_session_id)?
2273 .ok_or_else(|| anyhow!("session not found: {bound_session_id}"))?;
2274 if session.is_active() {
2275 return Ok(ResolvedAttachment {
2276 target: AttachTarget::ExistingSession(session),
2277 matched_entry: Some(entry),
2278 attach_reason: format!(
2279 "reattached native actor {} to Heddle session {}",
2280 native_actor_key, bound_session_id
2281 ),
2282 precedence,
2283 winning_rule: "native-actor-key".to_string(),
2284 });
2285 }
2286 }
2287 precedence.push(format!("native-actor-key:{native_actor_key}:miss"));
2288 } else {
2289 precedence.push("native-actor-key:miss".to_string());
2290 }
2291
2292 if let Some(client_instance_id) = client_instance_id {
2293 if let Some(entry) = registry.find_active_by_client_instance_id(client_instance_id)?
2294 && let Some(bound_session_id) = entry.heddle_session_id.clone()
2295 {
2296 precedence.push(format!("client-instance-id:{client_instance_id}:matched"));
2297 let session = sessions
2298 .get_session(&bound_session_id)?
2299 .ok_or_else(|| anyhow!("session not found: {bound_session_id}"))?;
2300 if session.is_active() {
2301 return Ok(ResolvedAttachment {
2302 target: AttachTarget::ExistingSession(session),
2303 matched_entry: Some(entry),
2304 attach_reason: format!(
2305 "reattached client instance {client_instance_id} to Heddle session {bound_session_id}"
2306 ),
2307 precedence,
2308 winning_rule: "client-instance-id".to_string(),
2309 });
2310 }
2311 }
2312 precedence.push(format!("client-instance-id:{client_instance_id}:miss"));
2313 return Ok(ResolvedAttachment {
2314 target: AttachTarget::CreateNew {
2315 _because_claimed: false,
2316 },
2317 matched_entry: None,
2318 attach_reason: format!(
2319 "started new Heddle session for distinct client instance {client_instance_id}"
2320 ),
2321 precedence,
2322 winning_rule: "create-new-session".to_string(),
2323 });
2324 } else {
2325 precedence.push("client-instance-id:miss".to_string());
2326 }
2327
2328 if client_instance_id.is_none() && probe.native_actor_key.is_some() {
2329 precedence.push("native-instance-key:skipped-strong-native-key".to_string());
2330 return Ok(ResolvedAttachment {
2331 target: AttachTarget::CreateNew {
2332 _because_claimed: false,
2333 },
2334 matched_entry: None,
2335 attach_reason:
2336 "started new Heddle session because no compatible native actor match was found"
2337 .to_string(),
2338 precedence,
2339 winning_rule: "create-new-session".to_string(),
2340 });
2341 }
2342
2343 if let Some(native_instance_key) = probe.native_instance_key.as_deref() {
2344 if let Some(entry) =
2345 registry.find_active_by_native_instance_key_at_path(native_instance_key, repo.root())?
2346 && claude_actor_compatible(&entry, probe, repo.root())
2347 && let Some(bound_session_id) = entry.heddle_session_id.clone()
2348 {
2349 precedence.push(format!("native-instance-key:{native_instance_key}:matched"));
2350 let session = sessions
2351 .get_session(&bound_session_id)?
2352 .ok_or_else(|| anyhow!("session not found: {bound_session_id}"))?;
2353 if session.is_active() {
2354 return Ok(ResolvedAttachment {
2355 target: AttachTarget::ExistingSession(session),
2356 matched_entry: Some(entry),
2357 attach_reason: format!(
2358 "reattached native instance {} to Heddle session {}",
2359 native_instance_key, bound_session_id
2360 ),
2361 precedence,
2362 winning_rule: "native-instance-key".to_string(),
2363 });
2364 }
2365 }
2366 precedence.push(format!("native-instance-key:{native_instance_key}:miss"));
2367 } else {
2368 precedence.push("native-instance-key:miss".to_string());
2369 }
2370
2371 if probe.attach_hints.root_actor
2372 && let Some(current) = sessions.get_current_session()?
2373 && current.is_active()
2374 {
2375 let claimed = session_claimed_by_other(
2376 registry,
2377 ¤t.id,
2378 requested_entry,
2379 client_instance_id,
2380 probe.native_actor_key.as_deref(),
2381 )?;
2382 if !claimed {
2383 precedence.push(format!("current-worktree-session:{}:matched", current.id));
2384 return Ok(ResolvedAttachment {
2385 target: AttachTarget::ExistingSession(current.clone()),
2386 matched_entry: None,
2387 attach_reason: format!("attached to active worktree Heddle session {}", current.id),
2388 precedence,
2389 winning_rule: "current-worktree-session".to_string(),
2390 });
2391 }
2392 precedence.push(format!("current-worktree-session:{}:claimed", current.id));
2393 return Ok(ResolvedAttachment {
2394 target: AttachTarget::CreateNew {
2395 _because_claimed: true,
2396 },
2397 matched_entry: None,
2398 attach_reason: "started a new Heddle session because the current session was already claimed by another active actor".to_string(),
2399 precedence,
2400 winning_rule: "create-new-session".to_string(),
2401 });
2402 }
2403 precedence.push("current-worktree-session:miss".to_string());
2404
2405 if let Some(claims) = token_claims
2406 && let Some(token_sid) = claims.sid.as_deref()
2407 && let Some(session) = sessions.get_session(token_sid)?
2408 && session.is_active()
2409 {
2410 let claimed = session_claimed_by_other(
2411 registry,
2412 &session.id,
2413 requested_entry,
2414 client_instance_id,
2415 probe.native_actor_key.as_deref(),
2416 )?;
2417 if !claimed {
2418 precedence.push(format!("token-sid:{token_sid}:matched"));
2419 return Ok(ResolvedAttachment {
2420 target: AttachTarget::ExistingSession(session),
2421 matched_entry: None,
2422 attach_reason: format!(
2423 "attached to Heddle session {token_sid} from auth token sid"
2424 ),
2425 precedence,
2426 winning_rule: "token-sid".to_string(),
2427 });
2428 }
2429 precedence.push(format!("token-sid:{token_sid}:claimed"));
2430 return Ok(ResolvedAttachment {
2431 target: AttachTarget::CreateNew {
2432 _because_claimed: true,
2433 },
2434 matched_entry: None,
2435 attach_reason: "started a new Heddle session because the current session was already claimed by another active actor".to_string(),
2436 precedence,
2437 winning_rule: "create-new-session".to_string(),
2438 });
2439 }
2440 precedence.push("token-sid:miss".to_string());
2441
2442 Ok(ResolvedAttachment {
2443 target: AttachTarget::CreateNew {
2444 _because_claimed: false,
2445 },
2446 matched_entry: None,
2447 attach_reason: "started new Heddle session".to_string(),
2448 precedence,
2449 winning_rule: "create-new-session".to_string(),
2450 })
2451}
2452
2453fn claude_actor_compatible(
2454 entry: &AgentEntry,
2455 probe: &HarnessProbeResult,
2456 repo_root: &Path,
2457) -> bool {
2458 let Some(native_actor_key) = probe.native_actor_key.as_deref() else {
2459 return true;
2460 };
2461 if !native_actor_key.starts_with("claude-code:") {
2462 return true;
2463 }
2464 if native_actor_key.starts_with("claude-code:agent:") {
2465 return entry.native_actor_key.as_deref() == Some(native_actor_key);
2466 }
2467 if let Some(native_instance_key) = probe.native_instance_key.as_deref() {
2468 return entry.native_actor_key.as_deref() == Some(native_actor_key)
2469 && entry.native_instance_key.as_deref() == Some(native_instance_key);
2470 }
2471 let same_repo = entry
2472 .path
2473 .as_ref()
2474 .map(|path| path.canonicalize().unwrap_or_else(|_| path.clone()))
2475 .unwrap_or_default()
2476 == repo_root
2477 .canonicalize()
2478 .unwrap_or_else(|_| repo_root.to_path_buf());
2479 entry.native_actor_key.as_deref() == Some(native_actor_key)
2480 && same_repo
2481 && probe.confidence.unwrap_or_default() >= 0.9
2482}
2483
2484fn decode_token_claims(token: &str) -> Option<TokenClaims> {
2485 let payload = token.split('.').nth(1)?;
2486 let decoded = base64::engine::general_purpose::URL_SAFE_NO_PAD
2487 .decode(payload.as_bytes())
2488 .ok()?;
2489 serde_json::from_slice(&decoded).ok()
2490}
2491
2492fn user_config_token_claims(user_config: &UserConfig) -> Option<TokenClaims> {
2493 user_config
2494 .remote_token()
2495 .ok()
2496 .flatten()
2497 .and_then(|token| decode_token_claims(&token.id))
2498}
2499
/// Subset of auth-token claims read by this module. Every claim is optional
/// and defaults to `None` when absent from the payload.
#[derive(Debug, Deserialize)]
struct TokenClaims {
    /// Session id claim; used by the `token-sid` attachment rule.
    #[serde(default)]
    sid: Option<String>,
    /// Provider claim; a fallback source in `resolve_identity`.
    #[serde(default)]
    agent_provider: Option<String>,
    /// Model claim; a fallback source in `resolve_identity`.
    #[serde(default)]
    agent_model: Option<String>,
}
2509
2510fn should_rotate_segment(session: &objects::object::Session, identity: &ResolvedIdentity) -> bool {
2511 let Some(segment) = session.current_segment() else {
2512 return false;
2513 };
2514 let provider_changed = identity
2515 .provider
2516 .as_deref()
2517 .is_some_and(|provider| provider != segment.provider);
2518 let model_changed = identity
2519 .model
2520 .as_deref()
2521 .is_some_and(|model| model != segment.model);
2522 provider_changed || model_changed
2523}
2524
2525fn thread_id_for_name(repo: &Repository, thread_name: Option<&str>) -> Result<Option<String>> {
2526 let Some(thread_name) = thread_name else {
2527 return Ok(None);
2528 };
2529 Ok(ThreadManager::new(repo.heddle_dir())
2530 .load(thread_name)?
2531 .map(|thread| thread.id))
2532}
2533
2534fn can_create_harness_thread(
2535 repo: &Repository,
2536 target_thread: Option<&str>,
2537 parent_thread: Option<&str>,
2538) -> Result<bool> {
2539 Ok(resolve_harness_thread_base_state(repo, target_thread, parent_thread)?.is_some())
2540}
2541
2542fn resolve_harness_thread_base_state(
2543 repo: &Repository,
2544 target_thread: Option<&str>,
2545 parent_thread: Option<&str>,
2546) -> Result<Option<objects::object::ChangeId>> {
2547 if let Some(head_state) = repo.head()? {
2548 return Ok(Some(head_state));
2549 }
2550
2551 for thread_name in [parent_thread, target_thread].into_iter().flatten() {
2552 if let Some(state) = resolve_named_thread_base_state(repo, thread_name)? {
2553 return Ok(Some(state));
2554 }
2555 }
2556
2557 Ok(None)
2558}
2559
2560fn resolve_named_thread_base_state(
2561 repo: &Repository,
2562 thread_name: &str,
2563) -> Result<Option<objects::object::ChangeId>> {
2564 if let Some(thread) = ThreadManager::new(repo.heddle_dir()).load(thread_name)?
2565 && let Some(state_spec) = thread
2566 .current_state
2567 .as_deref()
2568 .or(Some(thread.base_state.as_str()))
2569 && let Some(state_id) = repo
2570 .resolve_state(state_spec)?
2571 .or_else(|| objects::object::ChangeId::parse(state_spec).ok())
2572 {
2573 return Ok(Some(state_id));
2574 }
2575
2576 Ok(repo.refs().get_thread(&ThreadName::new(thread_name))?)
2577}
2578
2579fn resolve_parent_thread_for_subagent(
2580 repo: &Repository,
2581 probe: &HarnessProbeResult,
2582 current_attached: Option<&str>,
2583) -> Result<Option<String>> {
2584 if let Some(parent_key) = probe.native_parent_actor_key.as_deref() {
2585 let registry = AgentRegistry::new(repo.heddle_dir());
2586 if let Some(entry) = registry.find_active_by_native_actor_key(parent_key)? {
2587 return Ok(Some(entry.thread));
2588 }
2589 }
2590 Ok(current_attached.map(ToString::to_string))
2591}
2592
2593fn preferred_thread_slug(
2594 params: &OpenSessionParams,
2595 probe: &HarnessProbeResult,
2596 identity: &ResolvedIdentity,
2597) -> String {
2598 params
2599 .task
2600 .clone()
2601 .or_else(|| params.summary.clone())
2602 .or_else(|| probe.native_actor_key.as_deref().map(native_key_slug))
2603 .or_else(|| probe.native_instance_key.as_deref().map(native_key_slug))
2604 .or_else(|| identity.harness.clone())
2605 .unwrap_or_else(|| "work".to_string())
2606}
2607
/// Returns the last non-empty `:`-separated segment of a native key.
///
/// Keys without a `:` are returned unchanged. When every segment is empty
/// (for example `""` or `":"`), the whole value is returned as-is, so a key
/// with a trailing `:` no longer produces an empty slug.
fn native_key_slug(value: &str) -> String {
    value
        .rsplit(':')
        .find(|segment| !segment.is_empty())
        .unwrap_or(value)
        .to_string()
}
2615
2616fn allocate_thread_name(repo: &Repository, base: &str) -> Result<String> {
2617 if ThreadManager::new(repo.heddle_dir()).load(base)?.is_none()
2618 && repo.refs().get_thread(&ThreadName::new(base))?.is_none()
2619 {
2620 return Ok(base.to_string());
2621 }
2622 for idx in 2..1000 {
2623 let candidate = format!("{base}-{idx}");
2624 if ThreadManager::new(repo.heddle_dir())
2625 .load(&candidate)?
2626 .is_none()
2627 && repo
2628 .refs()
2629 .get_thread(&ThreadName::new(&candidate))?
2630 .is_none()
2631 {
2632 return Ok(candidate);
2633 }
2634 }
2635 Err(anyhow!(
2636 "could not allocate a unique thread name from '{base}'"
2637 ))
2638}
2639
/// Returns the checkout path for the thread `name`, as reported by
/// `Repository::managed_checkout_path`.
fn default_private_thread_path(repo: &Repository, name: &str) -> PathBuf {
    repo.managed_checkout_path(name)
}
2650
/// Turns arbitrary text into a lowercase, dash-separated slug.
///
/// ASCII letters and digits are kept (lowercased); every run of other
/// characters becomes a single `-`, and leading/trailing dashes are removed.
/// The result is empty when the input has no ASCII alphanumerics.
fn sanitize_name(name: &str) -> String {
    name.split(|ch: char| !ch.is_ascii_alphanumeric())
        .filter(|word| !word.is_empty())
        .map(str::to_ascii_lowercase)
        .collect::<Vec<_>>()
        .join("-")
}
2665
2666fn resolve_requested_registry_entry(
2667 registry: &AgentRegistry,
2668 agent_session_id: Option<&str>,
2669 client_instance_id: Option<&str>,
2670) -> Result<Option<AgentEntry>> {
2671 if let Some(agent_session_id) = agent_session_id {
2672 let entry = registry
2673 .load(agent_session_id)?
2674 .ok_or_else(|| anyhow!("agent session not found: {agent_session_id}"))?;
2675 if entry.status != AgentStatus::Active {
2676 return Err(anyhow!("agent session is not active: {agent_session_id}"));
2677 }
2678 return Ok(Some(entry));
2679 }
2680
2681 if let Some(client_instance_id) = client_instance_id {
2682 return Ok(registry.find_active_by_client_instance_id(client_instance_id)?);
2683 }
2684
2685 Ok(None)
2686}
2687
2688fn ensure_requested_entry_matches_session(
2689 requested_entry: Option<&AgentEntry>,
2690 heddle_session_id: &str,
2691) -> Result<()> {
2692 if let Some(entry) = requested_entry
2693 && let Some(bound_session_id) = entry.heddle_session_id.as_deref()
2694 && bound_session_id != heddle_session_id
2695 {
2696 return Err(anyhow!(
2697 "requested agent is already bound to a different heddle session: {}",
2698 entry.session_id
2699 ));
2700 }
2701 Ok(())
2702}
2703
2704fn session_claimed_by_other(
2705 registry: &AgentRegistry,
2706 heddle_session_id: &str,
2707 requested_entry: Option<&AgentEntry>,
2708 client_instance_id: Option<&str>,
2709 native_actor_key: Option<&str>,
2710) -> Result<bool> {
2711 if requested_entry.is_none() && client_instance_id.is_none() && native_actor_key.is_none() {
2712 return Ok(false);
2713 }
2714
2715 let Some(existing) = registry.find_active_by_heddle_session_id(heddle_session_id)? else {
2716 return Ok(false);
2717 };
2718 if let Some(requested) = requested_entry {
2719 return Ok(requested.session_id != existing.session_id);
2720 }
2721 if let Some(client_instance_id) = client_instance_id
2722 && existing.client_instance_id.as_deref() == Some(client_instance_id)
2723 {
2724 return Ok(false);
2725 }
2726 if let Some(native_actor_key) = native_actor_key
2727 && existing.native_actor_key.as_deref() == Some(native_actor_key)
2728 {
2729 return Ok(false);
2730 }
2731 Ok(true)
2732}
2733
2734fn find_matching_registry_entry(
2735 registry: &AgentRegistry,
2736 repo: &Repository,
2737 heddle_session_id: &str,
2738 thread_name: Option<&str>,
2739) -> Result<Option<AgentEntry>> {
2740 if let Some(entry) = registry.find_active_by_heddle_session_id(heddle_session_id)? {
2741 return Ok(Some(entry));
2742 }
2743 let canonical_root = repo
2744 .root()
2745 .canonicalize()
2746 .unwrap_or_else(|_| repo.root().to_path_buf());
2747 Ok(registry
2748 .list()?
2749 .into_iter()
2750 .filter(|entry| entry.status == AgentStatus::Active)
2751 .find(|entry| {
2752 entry
2753 .path
2754 .as_ref()
2755 .map(|path| path.canonicalize().unwrap_or_else(|_| path.clone()) == canonical_root)
2756 .unwrap_or(false)
2757 || thread_name.is_some_and(|thread| entry.thread == thread)
2758 }))
2759}
2760
2761fn merged_env_hints(extra: &BTreeMap<String, String>) -> BTreeMap<String, String> {
2762 let mut merged: BTreeMap<String, String> = std::env::vars()
2763 .filter(|(key, _)| inherited_harness_hint(key))
2764 .collect();
2765 for (key, value) in extra {
2766 merged.insert(key.clone(), value.clone());
2767 }
2768 merged
2769}
2770
/// Decides whether an environment variable is inherited as a harness hint.
///
/// Generic model, reasoning and policy variables are never inherited.
/// Accepted are `CLAUDECODE` and any name starting with `HEDDLE_`, `CODEX_`
/// or `OPENCODE_`.
fn inherited_harness_hint(key: &str) -> bool {
    const NEVER_INHERITED: [&str; 8] = [
        "OPENAI_MODEL",
        "ANTHROPIC_MODEL",
        "CLAUDE_MODEL",
        "MODEL",
        "OPENAI_REASONING_EFFORT",
        "REASONING_EFFORT",
        "THINKING_LEVEL",
        "PROMPT_POLICY",
    ];
    const INHERITED_PREFIXES: [&str; 3] = ["HEDDLE_", "CODEX_", "OPENCODE_"];

    if NEVER_INHERITED.contains(&key) {
        return false;
    }
    key == "CLAUDECODE"
        || INHERITED_PREFIXES
            .iter()
            .any(|prefix| key.starts_with(*prefix))
}
2791
2792fn to_json_value<T: Serialize>(value: T) -> Result<Value> {
2793 serde_json::to_value(value).map_err(|err| anyhow!(err))
2794}
2795
/// Normalizes a list of paths into a sorted, de-duplicated list.
///
/// Each path is trimmed and its backslashes are replaced with `/`; entries
/// that end up empty are dropped.
fn normalize_paths<I>(paths: I) -> Vec<String>
where
    I: IntoIterator<Item = String>,
{
    paths
        .into_iter()
        .map(|raw| raw.trim().replace('\\', "/"))
        .filter(|cleaned| !cleaned.is_empty())
        .collect::<BTreeSet<String>>()
        .into_iter()
        .collect()
}
2809
/// Merges `paths` into `target`, leaving `target` sorted and free of
/// duplicates (including duplicates it already contained).
fn merge_unique_paths<I>(target: &mut Vec<String>, paths: I)
where
    I: IntoIterator<Item = String>,
{
    let mut unique: BTreeSet<String> = std::mem::take(target).into_iter().collect();
    unique.extend(paths);
    target.extend(unique);
}
2818
/// Returns the larger of `current` and `candidate`, or `candidate` when
/// `current` is `None`.
fn max_u64(current: Option<u64>, candidate: u64) -> u64 {
    match current {
        Some(existing) => existing.max(candidate),
        None => candidate,
    }
}
2824
/// Returns the larger of `current` and `candidate`, or `candidate` when
/// `current` is `None`.
fn max_u32(current: Option<u32>, candidate: u32) -> u32 {
    match current {
        Some(existing) => existing.max(candidate),
        None => candidate,
    }
}
2830
2831fn merge_usage(target: &mut UsageTotals, incoming: &UsageTotals) {
2832 if let Some(input) = incoming.input_tokens {
2833 target.input_tokens = Some(max_u64(target.input_tokens, input));
2834 }
2835 if let Some(output) = incoming.output_tokens {
2836 target.output_tokens = Some(max_u64(target.output_tokens, output));
2837 }
2838 if let Some(reasoning) = incoming.reasoning_tokens {
2839 target.reasoning_tokens = Some(max_u64(target.reasoning_tokens, reasoning));
2840 }
2841 if let Some(cache_creation) = incoming.cache_creation_tokens {
2842 target.cache_creation_tokens = Some(max_u64(target.cache_creation_tokens, cache_creation));
2843 }
2844 if let Some(cache_read) = incoming.cache_read_tokens {
2845 target.cache_read_tokens = Some(max_u64(target.cache_read_tokens, cache_read));
2846 }
2847 if let Some(tool_calls) = incoming.tool_calls {
2848 target.tool_calls = Some(max_u32(target.tool_calls, tool_calls));
2849 }
2850 if let Some(cost) = incoming.cost_micros_usd {
2851 target.cost_micros_usd = Some(max_u64(target.cost_micros_usd, cost));
2852 }
2853}
2854
2855fn parse_timestamp(value: &str) -> Option<chrono::DateTime<Utc>> {
2856 chrono::DateTime::parse_from_rfc3339(value)
2857 .ok()
2858 .map(|dt| dt.with_timezone(&Utc))
2859}
2860
2861fn transport_from_report(
2862 report: &SessionReportEnvelope,
2863 fallback: HarnessTransport,
2864) -> HarnessTransport {
2865 match report.transport_mode.as_str() {
2866 "spool" => HarnessTransport::Spool,
2867 "direct" => HarnessTransport::Direct,
2868 "end" => HarnessTransport::End,
2869 _ => fallback,
2870 }
2871}
2872
2873fn mark_pending_flush(report: &mut SessionReportEnvelope) {
2874 report.pending_flush = true;
2875 report.report_flush_state = Some("pending-local".to_string());
2876}
2877
2878fn enqueue_report(store: &SessionReportStore, report: &mut SessionReportEnvelope) -> Result<()> {
2879 store.append_outbox(report)?;
2880 report.pending_flush = false;
2881 let flushed_at = Utc::now().to_rfc3339();
2882 report.last_flushed_at = Some(flushed_at);
2883 report.report_flush_state = Some("queued-local".to_string());
2884 store.save(report)?;
2885 Ok(())
2886}
2887
2888fn usage_to_summary(usage: &UsageTotals) -> AgentUsageSummary {
2889 AgentUsageSummary {
2890 input_tokens: usage.input_tokens,
2891 output_tokens: usage.output_tokens,
2892 reasoning_tokens: usage.reasoning_tokens,
2893 tool_calls: usage.tool_calls,
2894 cost_micros_usd: usage.cost_micros_usd,
2895 }
2896}
2897
2898fn transcript_mode_name(mode: HarnessTranscriptMode) -> &'static str {
2899 match mode {
2900 HarnessTranscriptMode::Off => "off",
2901 HarnessTranscriptMode::Summary => "summary",
2902 HarnessTranscriptMode::Full => "full",
2903 }
2904}
2905
2906fn transport_mode_name(mode: HarnessTransport) -> &'static str {
2907 match mode {
2908 HarnessTransport::Spool => "spool",
2909 HarnessTransport::Direct => "direct",
2910 HarnessTransport::End => "end",
2911 }
2912}
2913
/// Result of `compute_final_diff`: which paths changed, how many of each
/// kind, and the head state the diff was computed against.
struct FinalDiff {
    /// Keys of the merged change map, so sorted and free of duplicates.
    changed_paths: Vec<String>,
    /// Total and per-kind (added / modified / deleted) file counts.
    diff_summary: SessionDiffSummary,
    /// Full string form of the repository head, or `None` when there is no head.
    head_state: Option<String>,
}
2919
2920fn compute_final_diff(
2921 repo: &Repository,
2922 base_state: Option<&str>,
2923 worktree_baseline: &[WorktreeChangeBaseline],
2924) -> Result<FinalDiff> {
2925 let mut changes: BTreeMap<String, DiffKind> = BTreeMap::new();
2926
2927 let head_state = repo.head()?;
2928 if let (Some(base_spec), Some(head_id)) = (base_state, head_state) {
2929 let base_id = repo
2930 .resolve_state(base_spec)?
2931 .or_else(|| objects::object::ChangeId::parse(base_spec).ok());
2932 if let Some(base_id) = base_id
2933 && base_id != head_id
2934 {
2935 let Some(base_state_obj) = repo.store().get_state(&base_id)? else {
2936 return Err(anyhow!("base state not found: {base_spec}"));
2937 };
2938 let Some(head_state_obj) = repo.store().get_state(&head_id)? else {
2939 return Err(anyhow!("head state not found: {}", head_id.short()));
2940 };
2941 for change in repo.diff_trees(&base_state_obj.tree, &head_state_obj.tree)? {
2942 changes.insert(change.path, change.kind);
2943 }
2944 }
2945 }
2946
2947 let baseline_paths: BTreeSet<(String, String)> = worktree_baseline
2948 .iter()
2949 .map(|change| (change.path.clone(), change.kind.clone()))
2950 .collect();
2951 for (path, kind) in collect_worktree_changes(repo)? {
2952 let kind_name = diff_kind_name(kind);
2953 if !baseline_paths.contains(&(path.clone(), kind_name.to_string())) {
2954 changes.insert(path, kind);
2955 }
2956 }
2957
2958 let diff_summary = SessionDiffSummary {
2959 changed_file_count: changes.len() as u32,
2960 added_files: changes
2961 .values()
2962 .filter(|kind| **kind == DiffKind::Added)
2963 .count() as u32,
2964 modified_files: changes
2965 .values()
2966 .filter(|kind| **kind == DiffKind::Modified)
2967 .count() as u32,
2968 deleted_files: changes
2969 .values()
2970 .filter(|kind| **kind == DiffKind::Deleted)
2971 .count() as u32,
2972 };
2973
2974 Ok(FinalDiff {
2975 changed_paths: changes.into_keys().collect(),
2976 diff_summary,
2977 head_state: head_state.map(|id| id.to_string_full()),
2978 })
2979}
2980
2981fn capture_worktree_change_snapshot(repo: &Repository) -> Result<Vec<WorktreeChangeBaseline>> {
2982 Ok(collect_worktree_changes(repo)?
2983 .into_iter()
2984 .map(|(path, kind)| WorktreeChangeBaseline {
2985 path,
2986 kind: diff_kind_name(kind).to_string(),
2987 })
2988 .collect())
2989}
2990
2991fn collect_worktree_changes(repo: &Repository) -> Result<BTreeMap<String, DiffKind>> {
2992 let status_options = worktree_status_options(Some(repo.config()));
2993 let worktree_tree = match repo.current_state()? {
2994 Some(state) => repo.require_tree(&state.tree)?,
2995 None => Tree::new(),
2996 };
2997 let status = repo.compare_worktree_cached_with_options(&worktree_tree, &status_options)?;
2998 let mut changes = BTreeMap::new();
2999 for path in status.added {
3000 changes.insert(path.display().to_string(), DiffKind::Added);
3001 }
3002 for path in status.modified {
3003 changes.insert(path.display().to_string(), DiffKind::Modified);
3004 }
3005 for path in status.deleted {
3006 changes.insert(path.display().to_string(), DiffKind::Deleted);
3007 }
3008 Ok(changes)
3009}
3010
3011fn changed_paths_between_states(
3012 repo: &Repository,
3013 before_state: ChangeId,
3014 after_state: ChangeId,
3015) -> Result<Vec<String>> {
3016 if before_state == after_state {
3017 return Ok(Vec::new());
3018 }
3019 let Some(before_state_obj) = repo.store().get_state(&before_state)? else {
3020 return Err(anyhow!(
3021 "timeline before state not found: {}",
3022 before_state.short()
3023 ));
3024 };
3025 let Some(after_state_obj) = repo.store().get_state(&after_state)? else {
3026 return Err(anyhow!(
3027 "timeline after state not found: {}",
3028 after_state.short()
3029 ));
3030 };
3031 let mut paths = BTreeSet::new();
3032 for change in repo.diff_trees(&before_state_obj.tree, &after_state_obj.tree)? {
3033 paths.insert(change.path);
3034 }
3035 Ok(paths.into_iter().collect())
3036}
3037
3038fn diff_kind_name(kind: DiffKind) -> &'static str {
3039 match kind {
3040 DiffKind::Added => "added",
3041 DiffKind::Modified => "modified",
3042 DiffKind::Deleted => "deleted",
3043 DiffKind::Unchanged => "unchanged",
3044 }
3045}
3046
/// File-backed store for session reports.
struct SessionReportStore {
    /// Directory holding one `<session id>.json` file per report plus the
    /// shared `outbox.jsonl`.
    dir: PathBuf,
}
3050
3051impl SessionReportStore {
3052 fn new(repo_root: &Path) -> Self {
3053 Self {
3054 dir: repo_root.join(".heddle/state").join("session-reports"),
3055 }
3056 }
3057
3058 fn session_path(&self, heddle_session_id: &str) -> PathBuf {
3059 self.dir.join(format!("{heddle_session_id}.json"))
3060 }
3061
3062 fn outbox_path(&self) -> PathBuf {
3063 self.dir.join("outbox.jsonl")
3064 }
3065
3066 fn load(&self, heddle_session_id: &str) -> Result<Option<SessionReportEnvelope>> {
3067 let path = self.session_path(heddle_session_id);
3068 if !path.exists() {
3069 return Ok(None);
3070 }
3071 let bytes = fs::read(path)?;
3072 Ok(Some(serde_json::from_slice(&bytes)?))
3073 }
3074
3075 fn save(&self, report: &SessionReportEnvelope) -> Result<()> {
3076 fs::create_dir_all(&self.dir)?;
3077 let path = self.session_path(&report.heddle_session_id);
3078 let bytes = serde_json::to_vec_pretty(report)?;
3079 write_file_atomic(&path, &bytes)?;
3080 Ok(())
3081 }
3082
3083 fn append_outbox(&self, report: &SessionReportEnvelope) -> Result<()> {
3084 fs::create_dir_all(&self.dir)?;
3085 let mut file = OpenOptions::new()
3086 .create(true)
3087 .append(true)
3088 .open(self.outbox_path())?;
3089 serde_json::to_writer(&mut file, report)?;
3090 file.write_all(b"\n")?;
3091 file.flush()?;
3092 Ok(())
3093 }
3094
3095 fn list_pending(&self) -> Result<Vec<String>> {
3096 if !self.dir.exists() {
3097 return Ok(Vec::new());
3098 }
3099 let mut ids = Vec::new();
3100 for entry in fs::read_dir(&self.dir)? {
3101 let entry = entry?;
3102 let path = entry.path();
3103 if path.extension().and_then(|ext| ext.to_str()) != Some("json") {
3104 continue;
3105 }
3106 let bytes = fs::read(&path)?;
3107 let report: SessionReportEnvelope = serde_json::from_slice(&bytes)?;
3108 if report.pending_flush {
3109 ids.push(report.heddle_session_id);
3110 }
3111 }
3112 ids.sort();
3113 Ok(ids)
3114 }
3115}
3116
/// Incoming bridge request envelope.
///
/// `method` is the only required field when deserializing.
#[derive(Debug, Deserialize)]
struct BridgeRequest {
    /// Optional caller-chosen request id; `None` when omitted.
    #[serde(default)]
    id: Option<String>,
    /// Name of the operation being requested.
    method: String,
    /// Operation arguments; falls back to `Value`'s default when omitted.
    #[serde(default)]
    params: Value,
}
3125
3126#[derive(Debug, Serialize)]
3127struct BridgeResponse {
3128 #[serde(default)]
3129 id: Option<String>,
3130 ok: bool,
3131 #[serde(skip_serializing_if = "Option::is_none")]
3132 result: Option<Value>,
3133 #[serde(skip_serializing_if = "Option::is_none")]
3134 error: Option<BridgeError>,
3135}
3136
3137impl BridgeResponse {
3138 fn ok(id: Option<String>, result: Value) -> Self {
3139 Self {
3140 id,
3141 ok: true,
3142 result: Some(result),
3143 error: None,
3144 }
3145 }
3146
3147 fn error(id: Option<String>, code: impl Into<String>, message: impl Into<String>) -> Self {
3148 Self {
3149 id,
3150 ok: false,
3151 result: None,
3152 error: Some(BridgeError {
3153 code: code.into(),
3154 message: message.into(),
3155 }),
3156 }
3157 }
3158}
3159
/// Error payload carried in the `error` field of a `BridgeResponse`.
#[derive(Debug, Serialize)]
struct BridgeError {
    /// Error code string supplied by the caller of `BridgeResponse::error`.
    code: String,
    /// Human-readable description supplied alongside the code.
    message: String,
}
3165
/// Parameters for opening (or re-attaching to) a session; the tests in this
/// file pass it to `HarnessBridgeRuntime::open_session`.
///
/// Every field is marked `#[serde(default)]`, so a request may omit any of them.
#[derive(Debug, Clone, Deserialize, Default)]
struct OpenSessionParams {
    #[serde(default)]
    heddle_session_id: Option<String>,
    #[serde(default)]
    agent_session_id: Option<String>,
    // Per the tests, reopening with the same client instance id re-attaches to
    // that client's existing session, while a different id gets its own session.
    #[serde(default)]
    client_instance_id: Option<String>,
    #[serde(default)]
    thread: Option<String>,
    #[serde(default)]
    task: Option<String>,
    #[serde(default)]
    summary: Option<String>,
    // Identity hints; the tests pass values such as harness "codex",
    // provider "openai", model "gpt-5.4".
    #[serde(default)]
    harness: Option<String>,
    #[serde(default)]
    provider: Option<String>,
    #[serde(default)]
    model: Option<String>,
    #[serde(default)]
    thinking_level: Option<String>,
    #[serde(default)]
    policy: Option<String>,
    #[serde(default)]
    transport: Option<HarnessTransport>,
    #[serde(default)]
    transcript_mode: Option<HarnessTranscriptMode>,
    #[serde(default)]
    argv: Option<Vec<String>>,
    // Keyed by environment variable name (tests use e.g. `HEDDLE_AGENT_MODEL`).
    #[serde(default)]
    env_hints: BTreeMap<String, String>,
    // Free-form probe key/value pairs (tests use keys such as `session_id`,
    // `thread_id`, `parent_id`, `transcript_path`, `model`).
    #[serde(default)]
    probe_metadata: BTreeMap<String, String>,
}
3201
/// Parameters for a progress update; the tests in this file pass it to
/// `HarnessBridgeRuntime::update_progress`.
///
/// `heddle_session_id` has no serde default and is therefore required when
/// deserializing; every other field may be omitted.
#[derive(Debug, Clone, Deserialize, Default)]
struct UpdateProgressParams {
    heddle_session_id: String,
    #[serde(default)]
    status: Option<String>,
    #[serde(default)]
    message: Option<String>,
    #[serde(default)]
    completed_steps: Option<u32>,
    #[serde(default)]
    total_steps: Option<u32>,
    #[serde(default)]
    touched_paths: Vec<String>,
    #[serde(default)]
    summary: Option<String>,
    // Identity hints. Per the `provider_model_change_creates_segment` test,
    // sending a provider/model different from the one the session was opened
    // with results in a new segment id on the stored report.
    #[serde(default)]
    harness: Option<String>,
    #[serde(default)]
    provider: Option<String>,
    #[serde(default)]
    model: Option<String>,
    #[serde(default)]
    thinking_level: Option<String>,
    #[serde(default)]
    policy: Option<String>,
    #[serde(default)]
    argv: Option<Vec<String>>,
    // Keyed by environment variable name (tests use e.g. `HEDDLE_AGENT_MODEL`).
    #[serde(default)]
    env_hints: BTreeMap<String, String>,
    // Free-form probe key/value pairs (tests use `session_id` and `model`).
    #[serde(default)]
    probe_metadata: BTreeMap<String, String>,
}
3234
/// Usage counters reported for one session.
///
/// `heddle_session_id` is required when deserializing; each counter is
/// optional and deserializes to `None` when omitted.
// NOTE(review): the handler that consumes this struct is outside this view;
// whether values are cumulative totals or deltas should be confirmed there.
#[derive(Debug, Clone, Deserialize, Default)]
struct RecordUsageParams {
    heddle_session_id: String,
    #[serde(default)]
    input_tokens: Option<u64>,
    #[serde(default)]
    output_tokens: Option<u64>,
    #[serde(default)]
    reasoning_tokens: Option<u64>,
    #[serde(default)]
    cache_creation_tokens: Option<u64>,
    #[serde(default)]
    cache_read_tokens: Option<u64>,
    #[serde(default)]
    tool_calls: Option<u32>,
    // Presumably micro-US-dollars, going by the name — TODO confirm.
    #[serde(default)]
    cost_micros_usd: Option<u64>,
}
3253
/// Parameters for recording paths a session touched; the tests in this file
/// pass it to `HarnessBridgeRuntime::record_touched_paths`.
#[derive(Debug, Clone, Deserialize, Default)]
struct RecordTouchedPathsParams {
    /// Required when deserializing.
    heddle_session_id: String,
    /// Paths to record; an omitted field deserializes to an empty list.
    #[serde(default)]
    paths: Vec<String>,
}
3260
/// Parameters for closing a session; the tests in this file pass it to
/// `HarnessBridgeRuntime::close_session`.
///
/// Only `heddle_session_id` is required when deserializing.
#[derive(Debug, Clone, Deserialize, Default)]
struct CloseSessionParams {
    heddle_session_id: String,
    // The tests pass "completed" here.
    #[serde(default)]
    outcome: Option<String>,
    #[serde(default)]
    summary: Option<String>,
    #[serde(default)]
    transcript_refs: Option<Vec<TranscriptAttachmentRef>>,
    #[serde(default)]
    transport: Option<HarnessTransport>,
}
3273
/// Parameters for flushing reports; the tests in this file pass it to
/// `HarnessBridgeRuntime::flush_reports`.
#[derive(Debug, Clone, Deserialize, Default)]
struct FlushReportsParams {
    // Optional session id; the tests set it to the session they just opened.
    // NOTE(review): behavior when omitted is decided by the handler, which is
    // outside this view.
    #[serde(default)]
    heddle_session_id: Option<String>,
}
3279
/// Serializable outcome of opening a session.
// NOTE(review): appears to be what `open_session` returns — the tests read
// `created_session`, `heddle_session_id`, `heddle_segment_id`,
// `agent_session_id`, and `model` from its result.
#[derive(Debug, Serialize)]
struct OpenSessionResult {
    heddle_session_id: String,
    heddle_segment_id: Option<String>,
    agent_session_id: Option<String>,
    // The tests expect `true` on the first open and `false` when a later open
    // attaches to the same session.
    created_session: bool,
    harness: Option<String>,
    provider: Option<String>,
    model: Option<String>,
    thinking_level: Option<String>,
    report_flush_state: Option<String>,
    attach_reason: Option<String>,
}
3293
/// Serializable summary of a session after a mutation: its id, current
/// segment id (if any), and report flush state (if any).
// NOTE(review): which bridge calls return this type is not visible here.
#[derive(Debug, Serialize)]
struct SessionMutationResult {
    heddle_session_id: String,
    heddle_segment_id: Option<String>,
    report_flush_state: Option<String>,
}
3300
/// Serializable outcome of closing a session.
// NOTE(review): appears to be what `close_session` returns — the tests read
// `changed_paths` and `diff_summary` from its result and expect
// `diff_summary.changed_file_count` to equal `changed_paths.len()`.
#[derive(Debug, Serialize)]
struct CloseSessionResult {
    heddle_session_id: String,
    changed_paths: Vec<String>,
    diff_summary: SessionDiffSummary,
    report_flush_state: Option<String>,
}
3308
/// Serializable outcome of a flush request.
// NOTE(review): appears to be what `flush_reports` returns — the tests expect
// `flushed == 1` after flushing a single pending session.
#[derive(Debug, Serialize)]
struct FlushReportsResult {
    /// Count of flushed reports.
    flushed: usize,
}
3313
3314#[cfg(test)]
3315mod tests {
3316 #[cfg(unix)]
3317 use std::os::unix::fs::PermissionsExt;
3318
3319 use super::*;
3320
3321 fn init_repo() -> (tempfile::TempDir, Repository) {
3322 let temp = tempfile::TempDir::new().unwrap();
3323 let repo = Repository::init_default(temp.path()).unwrap();
3324 (temp, repo)
3325 }
3326
3327 #[test]
3328 fn harness_config_load_missing_path_defaults_without_warning() {
3329 let temp = tempfile::TempDir::new().unwrap();
3330 let missing = temp.path().join("missing-config.toml");
3331
3332 let (config, warning) = load_harness_user_config(Some(missing));
3333
3334 assert_eq!(config.harness.transport, HarnessTransport::Spool);
3335 assert!(warning.is_none());
3336 }
3337
3338 #[test]
3339 fn harness_config_load_malformed_path_warns_and_defaults() {
3340 let temp = tempfile::TempDir::new().unwrap();
3341 let path = temp.path().join("config.toml");
3342 std::fs::write(&path, "[harness\ntransport = \"direct\"\n").unwrap();
3343
3344 let (config, warning) = load_harness_user_config(Some(path.clone()));
3345
3346 assert_eq!(config.harness.transport, HarnessTransport::Spool);
3347 let warning = warning.expect("malformed config should produce a warning");
3348 assert!(warning.contains("failed to load user config"));
3349 assert!(warning.contains(&path.display().to_string()));
3350 assert!(warning.contains("continuing with defaults"));
3351 }
3352
3353 #[test]
3354 fn harness_config_load_valid_path_loads_without_warning() {
3355 let temp = tempfile::TempDir::new().unwrap();
3356 let path = temp.path().join("config.toml");
3357 std::fs::write(
3358 &path,
3359 "[harness]\ntransport = \"direct\"\ntranscript = \"summary\"\n",
3360 )
3361 .unwrap();
3362
3363 let (config, warning) = load_harness_user_config(Some(path));
3364
3365 assert_eq!(config.harness.transport, HarnessTransport::Direct);
3366 assert_eq!(config.harness.transcript, HarnessTranscriptMode::Summary);
3367 assert!(warning.is_none());
3368 }
3369
3370 #[test]
3371 fn relay_payload_parse_invalid_json_warns_and_uses_null() {
3372 let (value, warning) = parse_relay_payload("{not-json");
3373
3374 assert_eq!(value, Value::Null);
3375 let warning = warning.expect("invalid JSON should produce a warning");
3376 assert!(warning.contains("failed to parse harness relay payload as JSON"));
3377 assert!(warning.contains("continuing with null payload"));
3378 }
3379
3380 #[test]
3381 fn relay_payload_parse_empty_payload_uses_null_without_warning() {
3382 let (value, warning) = parse_relay_payload(" \n");
3383
3384 assert_eq!(value, Value::Null);
3385 assert!(warning.is_none());
3386 }
3387
3388 #[test]
3389 fn relay_payload_parse_valid_json_without_warning() {
3390 let (value, warning) = parse_relay_payload(r#"{"message":"hello"}"#);
3391
3392 assert_eq!(value["message"], "hello");
3393 assert!(warning.is_none());
3394 }
3395
3396 #[test]
3404 fn harness_default_path_matches_canonical_thread_dir() {
3405 let (_temp, repo) = init_repo();
3406 for id in ["foo", "parent/task", "feature/foo", "team@scope"] {
3407 let harness_path = default_private_thread_path(&repo, id);
3408 let canonical = repo.managed_checkout_path(id);
3409 assert_eq!(
3410 harness_path, canonical,
3411 "harness default must match the canonical thread_dir for {id:?}"
3412 );
3413 }
3414 }
3415
3416 #[test]
3417 fn inherited_harness_hints_exclude_ambient_model_identity() {
3418 assert!(!inherited_harness_hint("OPENAI_MODEL"));
3419 assert!(!inherited_harness_hint("ANTHROPIC_MODEL"));
3420 assert!(!inherited_harness_hint("CLAUDE_MODEL"));
3421 assert!(!inherited_harness_hint("MODEL"));
3422 assert!(!inherited_harness_hint("OPENAI_REASONING_EFFORT"));
3423 assert!(inherited_harness_hint("HEDDLE_AGENT_MODEL"));
3424 assert!(inherited_harness_hint("CODEX_SANDBOX"));
3425 assert!(inherited_harness_hint("CLAUDECODE"));
3426 }
3427
3428 #[test]
3429 fn open_session_creates_or_attaches() {
3430 let (_temp, repo) = init_repo();
3431 let user_config = UserConfig::default();
3432 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3433
3434 let created = runtime
3435 .open_session(OpenSessionParams {
3436 harness: Some("codex".to_string()),
3437 provider: Some("openai".to_string()),
3438 model: Some("gpt-5.4".to_string()),
3439 ..OpenSessionParams::default()
3440 })
3441 .unwrap();
3442 assert!(created.created_session);
3443
3444 let attached = runtime
3445 .open_session(OpenSessionParams {
3446 harness: Some("codex".to_string()),
3447 provider: Some("openai".to_string()),
3448 model: Some("gpt-5.4".to_string()),
3449 ..OpenSessionParams::default()
3450 })
3451 .unwrap();
3452 assert!(!attached.created_session);
3453 assert_eq!(created.heddle_session_id, attached.heddle_session_id);
3454 }
3455
3456 #[test]
3457 fn same_client_instance_reattaches_to_its_existing_session() {
3458 let (_temp, repo) = init_repo();
3459 let user_config = UserConfig::default();
3460 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3461
3462 let first = runtime
3463 .open_session(OpenSessionParams {
3464 client_instance_id: Some("client-a".to_string()),
3465 harness: Some("codex".to_string()),
3466 provider: Some("openai".to_string()),
3467 model: Some("gpt-5.4".to_string()),
3468 ..OpenSessionParams::default()
3469 })
3470 .unwrap();
3471 let second = runtime
3472 .open_session(OpenSessionParams {
3473 client_instance_id: Some("client-b".to_string()),
3474 harness: Some("codex".to_string()),
3475 provider: Some("openai".to_string()),
3476 model: Some("gpt-5.4".to_string()),
3477 ..OpenSessionParams::default()
3478 })
3479 .unwrap();
3480 let reopened = runtime
3481 .open_session(OpenSessionParams {
3482 client_instance_id: Some("client-a".to_string()),
3483 harness: Some("codex".to_string()),
3484 provider: Some("openai".to_string()),
3485 model: Some("gpt-5.4".to_string()),
3486 ..OpenSessionParams::default()
3487 })
3488 .unwrap();
3489
3490 assert_ne!(first.heddle_session_id, second.heddle_session_id);
3491 assert_eq!(first.heddle_session_id, reopened.heddle_session_id);
3492 assert_eq!(first.agent_session_id, reopened.agent_session_id);
3493 }
3494
3495 #[test]
3496 fn different_client_instances_do_not_share_the_current_session() {
3497 let (_temp, repo) = init_repo();
3498 let user_config = UserConfig::default();
3499 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3500
3501 let first = runtime
3502 .open_session(OpenSessionParams {
3503 client_instance_id: Some("client-a".to_string()),
3504 harness: Some("codex".to_string()),
3505 provider: Some("openai".to_string()),
3506 model: Some("gpt-5.4".to_string()),
3507 ..OpenSessionParams::default()
3508 })
3509 .unwrap();
3510 let second = runtime
3511 .open_session(OpenSessionParams {
3512 client_instance_id: Some("client-b".to_string()),
3513 harness: Some("codex".to_string()),
3514 provider: Some("openai".to_string()),
3515 model: Some("gpt-5.4".to_string()),
3516 ..OpenSessionParams::default()
3517 })
3518 .unwrap();
3519
3520 assert_ne!(first.heddle_session_id, second.heddle_session_id);
3521 assert_ne!(first.agent_session_id, second.agent_session_id);
3522 }
3523
3524 #[test]
3525 fn provider_model_change_creates_segment() {
3526 let (_temp, repo) = init_repo();
3527 let user_config = UserConfig::default();
3528 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3529
3530 let opened = runtime
3531 .open_session(OpenSessionParams {
3532 harness: Some("claude-code".to_string()),
3533 provider: Some("anthropic".to_string()),
3534 model: Some("claude-sonnet".to_string()),
3535 ..OpenSessionParams::default()
3536 })
3537 .unwrap();
3538 runtime
3539 .update_progress(UpdateProgressParams {
3540 heddle_session_id: opened.heddle_session_id.clone(),
3541 provider: Some("openai".to_string()),
3542 model: Some("gpt-5.4".to_string()),
3543 ..UpdateProgressParams::default()
3544 })
3545 .unwrap();
3546
3547 let report = runtime
3548 .reports
3549 .load(&opened.heddle_session_id)
3550 .unwrap()
3551 .unwrap();
3552 let expected_segment = format!("{}-seg-2", opened.heddle_session_id);
3553 assert_eq!(
3554 report.heddle_segment_id.as_deref(),
3555 Some(expected_segment.as_str())
3556 );
3557 }
3558
3559 #[test]
3560 fn blank_agent_model_hint_falls_through_to_detected_model_without_segment_rotation() {
3561 let (_temp, repo) = init_repo();
3562 let user_config = UserConfig::default();
3563 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3564 let blank_model_env = BTreeMap::from([
3565 ("HEDDLE_AGENT_PROVIDER".to_string(), "anthropic".to_string()),
3566 ("HEDDLE_AGENT_MODEL".to_string(), String::new()),
3567 ]);
3568
3569 let opened = runtime
3570 .open_session(OpenSessionParams {
3571 harness: Some("claude-code".to_string()),
3572 env_hints: blank_model_env.clone(),
3573 probe_metadata: BTreeMap::from([
3574 ("session_id".to_string(), "claude-sess-blank".to_string()),
3575 ("model".to_string(), "claude-opus-4-8[1m]".to_string()),
3576 ]),
3577 ..OpenSessionParams::default()
3578 })
3579 .unwrap();
3580 assert_eq!(opened.model.as_deref(), Some("claude-opus-4-8[1m]"));
3581
3582 let original_segment = opened.heddle_segment_id.clone();
3583 runtime
3584 .update_progress(UpdateProgressParams {
3585 heddle_session_id: opened.heddle_session_id.clone(),
3586 env_hints: blank_model_env,
3587 probe_metadata: BTreeMap::from([
3588 ("session_id".to_string(), "claude-sess-blank".to_string()),
3589 ("model".to_string(), "claude-opus-4-8[1m]".to_string()),
3590 ]),
3591 ..UpdateProgressParams::default()
3592 })
3593 .unwrap();
3594
3595 let report = runtime
3596 .reports
3597 .load(&opened.heddle_session_id)
3598 .unwrap()
3599 .unwrap();
3600 assert_eq!(report.harness.model.as_deref(), Some("claude-opus-4-8[1m]"));
3601 assert_eq!(report.heddle_segment_id, original_segment);
3602 }
3603
3604 #[test]
3605 fn close_session_captures_changed_paths_from_status_and_hints() {
3606 let (temp, repo) = init_repo();
3607 let config = UserConfig::default();
3608 let mut runtime = HarnessBridgeRuntime::new(repo, config);
3609
3610 let opened = runtime
3611 .open_session(OpenSessionParams {
3612 harness: Some("codex".to_string()),
3613 provider: Some("openai".to_string()),
3614 model: Some("gpt-5.4".to_string()),
3615 ..OpenSessionParams::default()
3616 })
3617 .unwrap();
3618 std::fs::write(temp.path().join("src.txt"), "hello\n").unwrap();
3619 runtime
3620 .record_touched_paths(RecordTouchedPathsParams {
3621 heddle_session_id: opened.heddle_session_id.clone(),
3622 paths: vec!["src.txt".to_string(), "notes.md".to_string()],
3623 })
3624 .unwrap();
3625 let closed = runtime
3626 .close_session(CloseSessionParams {
3627 heddle_session_id: opened.heddle_session_id.clone(),
3628 outcome: Some("completed".to_string()),
3629 ..CloseSessionParams::default()
3630 })
3631 .unwrap();
3632 let report = runtime
3633 .reports
3634 .load(&opened.heddle_session_id)
3635 .unwrap()
3636 .unwrap();
3637 assert!(closed.changed_paths.iter().any(|path| path == "src.txt"));
3638 assert!(!closed.changed_paths.iter().any(|path| path == "notes.md"));
3639 assert!(report.touched_paths.iter().any(|path| path == "src.txt"));
3640 assert!(report.touched_paths.iter().any(|path| path == "notes.md"));
3641 assert_eq!(
3642 closed.diff_summary.changed_file_count,
3643 closed.changed_paths.len() as u32
3644 );
3645 }
3646
3647 #[test]
3648 fn flush_reports_moves_pending_report_to_outbox() {
3649 let (_temp, repo) = init_repo();
3650 let user_config = UserConfig::default();
3651 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3652
3653 let opened = runtime
3654 .open_session(OpenSessionParams {
3655 harness: Some("codex".to_string()),
3656 provider: Some("openai".to_string()),
3657 model: Some("gpt-5.4".to_string()),
3658 ..OpenSessionParams::default()
3659 })
3660 .unwrap();
3661 let flushed = runtime
3662 .flush_reports(FlushReportsParams {
3663 heddle_session_id: Some(opened.heddle_session_id.clone()),
3664 })
3665 .unwrap();
3666 assert_eq!(flushed.flushed, 1);
3667 let report = runtime
3668 .reports
3669 .load(&opened.heddle_session_id)
3670 .unwrap()
3671 .unwrap();
3672 assert!(!report.pending_flush);
3673 assert_eq!(report.report_flush_state.as_deref(), Some("queued-local"));
3674 assert!(runtime.reports.outbox_path().exists());
3675 }
3676
3677 #[test]
3678 fn explicit_overrides_beat_fingerprint_and_user_defaults() {
3679 let (_temp, repo) = init_repo();
3680 let mut user_config = UserConfig::default();
3681 user_config.harness.harnesses.insert(
3682 "codex".to_string(),
3683 UserHarnessOverride {
3684 provider: Some("openai".to_string()),
3685 model: Some("gpt-default".to_string()),
3686 thinking_level: Some("medium".to_string()),
3687 policy: Some("default".to_string()),
3688 },
3689 );
3690 let identity = resolve_identity(
3691 &repo,
3692 &user_config,
3693 IdentityHints {
3694 harness: Some("codex".to_string()),
3695 provider: Some("openai".to_string()),
3696 model: Some("gpt-5.4".to_string()),
3697 thinking_level: Some("high".to_string()),
3698 policy: Some("custom".to_string()),
3699 probe: HarnessProbeResult::default(),
3700 },
3701 )
3702 .unwrap();
3703 assert_eq!(identity.model.as_deref(), Some("gpt-5.4"));
3704 assert_eq!(identity.thinking_level.as_deref(), Some("high"));
3705 assert_eq!(identity.policy.as_deref(), Some("custom"));
3706 }
3707
3708 #[test]
3709 fn transcript_mode_defaults_to_off_and_keeps_refs_empty() {
3710 let (_temp, repo) = init_repo();
3711 let user_config = UserConfig::default();
3712 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3713
3714 let opened = runtime
3715 .open_session(OpenSessionParams {
3716 harness: Some("codex".to_string()),
3717 provider: Some("openai".to_string()),
3718 model: Some("gpt-5.4".to_string()),
3719 ..OpenSessionParams::default()
3720 })
3721 .unwrap();
3722 let report = runtime
3723 .reports
3724 .load(&opened.heddle_session_id)
3725 .unwrap()
3726 .unwrap();
3727 assert_eq!(report.transcript_mode, "off");
3728 assert!(report.transcript_refs.is_empty());
3729 }
3730
3731 #[test]
3732 fn codex_thread_probe_reattaches_same_actor() {
3733 let (_temp, repo) = init_repo();
3734 let user_config = UserConfig::default();
3735 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3736
3737 let first = runtime
3738 .open_session(OpenSessionParams {
3739 harness: Some("codex".to_string()),
3740 probe_metadata: BTreeMap::from([
3741 ("thread_id".to_string(), "thr_123".to_string()),
3742 ("client_name".to_string(), "codex-tui".to_string()),
3743 ]),
3744 ..OpenSessionParams::default()
3745 })
3746 .unwrap();
3747 let second = runtime
3748 .open_session(OpenSessionParams {
3749 harness: Some("codex".to_string()),
3750 probe_metadata: BTreeMap::from([
3751 ("thread_id".to_string(), "thr_123".to_string()),
3752 ("client_name".to_string(), "codex-tui".to_string()),
3753 ]),
3754 ..OpenSessionParams::default()
3755 })
3756 .unwrap();
3757
3758 assert_eq!(first.agent_session_id, second.agent_session_id);
3759 assert_eq!(first.heddle_session_id, second.heddle_session_id);
3760 }
3761
3762 #[test]
3763 fn opencode_child_session_creates_distinct_actor_with_parent_key() {
3764 let (_temp, repo) = init_repo();
3765 let user_config = UserConfig::default();
3766 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3767
3768 let root = runtime
3769 .open_session(OpenSessionParams {
3770 harness: Some("opencode".to_string()),
3771 probe_metadata: BTreeMap::from([("session_id".to_string(), "root-1".to_string())]),
3772 ..OpenSessionParams::default()
3773 })
3774 .unwrap();
3775 let child = runtime
3776 .open_session(OpenSessionParams {
3777 harness: Some("opencode".to_string()),
3778 probe_metadata: BTreeMap::from([
3779 ("session_id".to_string(), "child-1".to_string()),
3780 ("parent_id".to_string(), "root-1".to_string()),
3781 ]),
3782 ..OpenSessionParams::default()
3783 })
3784 .unwrap();
3785
3786 assert_ne!(root.agent_session_id, child.agent_session_id);
3787 let report = runtime
3788 .reports
3789 .load(&child.heddle_session_id)
3790 .unwrap()
3791 .unwrap();
3792 assert_eq!(
3793 report.native_parent_actor_key.as_deref(),
3794 Some("opencode:session:root-1")
3795 );
3796 }
3797
3798 #[test]
3799 fn claude_resume_with_new_session_id_does_not_steal_existing_actor() {
3800 let (_temp, repo) = init_repo();
3801 let user_config = UserConfig::default();
3802 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3803
3804 let first = runtime
3805 .open_session(OpenSessionParams {
3806 harness: Some("claude-code".to_string()),
3807 probe_metadata: BTreeMap::from([
3808 ("session_id".to_string(), "sess-old".to_string()),
3809 (
3810 "transcript_path".to_string(),
3811 "/tmp/claude/session-a.jsonl".to_string(),
3812 ),
3813 ]),
3814 ..OpenSessionParams::default()
3815 })
3816 .unwrap();
3817 let resumed = runtime
3818 .open_session(OpenSessionParams {
3819 harness: Some("claude-code".to_string()),
3820 probe_metadata: BTreeMap::from([
3821 ("session_id".to_string(), "sess-new".to_string()),
3822 (
3823 "transcript_path".to_string(),
3824 "/tmp/claude/session-a.jsonl".to_string(),
3825 ),
3826 ]),
3827 ..OpenSessionParams::default()
3828 })
3829 .unwrap();
3830
3831 assert_ne!(first.agent_session_id, resumed.agent_session_id);
3832 assert_ne!(first.heddle_session_id, resumed.heddle_session_id);
3833 }
3834
3835 #[test]
3836 fn explicit_claude_harness_beats_generic_session_id_probe_match() {
3837 let (_temp, repo) = init_repo();
3838 let user_config = UserConfig::default();
3839 let mut runtime = HarnessBridgeRuntime::new(repo, user_config);
3840
3841 let opened = runtime
3842 .open_session(OpenSessionParams {
3843 harness: Some("claude-code".to_string()),
3844 probe_metadata: BTreeMap::from([
3845 ("session_id".to_string(), "claude-sess-1".to_string()),
3846 ("hook_event".to_string(), "SubagentStop".to_string()),
3847 ]),
3848 ..OpenSessionParams::default()
3849 })
3850 .unwrap();
3851 let report = runtime
3852 .reports
3853 .load(&opened.heddle_session_id)
3854 .unwrap()
3855 .unwrap();
3856 assert_eq!(
3857 report.native_actor_key.as_deref(),
3858 Some("claude-code:session:claude-sess-1")
3859 );
3860 assert_eq!(report.harness.harness.as_deref(), Some("claude-code"));
3861 }
3862
3863 #[test]
3864 fn same_native_actor_key_reuses_existing_actor_after_tentative_session_creation() {
3865 let (_temp, repo) = init_repo();
3866 let user_config = UserConfig::default();
3867 let runtime = HarnessBridgeRuntime::new(repo, user_config);
3868 let principal = runtime.repo.get_principal().unwrap();
3869 let mut sessions = SessionManager::new(runtime.repo.root());
3870 let existing_session = sessions
3871 .start_session(
3872 principal.clone(),
3873 "anthropic".to_string(),
3874 "claude-opus-4-7[1m]".to_string(),
3875 None,
3876 )
3877 .unwrap();
3878 let tentative_session = sessions
3879 .start_session(
3880 principal,
3881 "anthropic".to_string(),
3882 "claude-opus-4-7[1m]".to_string(),
3883 None,
3884 )
3885 .unwrap();
3886
3887 let registry = AgentRegistry::new(runtime.repo.heddle_dir());
3888 let existing_entry = registry
3889 .create_generated_entry(|session_id| {
3890 Ok(AgentEntry {
3891 session_id: session_id.to_string(),
3892 client_instance_id: None,
3893 native_actor_key: Some(
3894 "claude-code:session:282396d3-554a-48aa-a9a8-8d1f0bd15fa5".to_string(),
3895 ),
3896 native_parent_actor_key: None,
3897 native_instance_key: Some(
3898 "claude-code:transcript:/tmp/claude/282396d3.jsonl".to_string(),
3899 ),
3900 heddle_session_id: Some(existing_session.id.clone()),
3901 thread_id: None,
3902 thread: "detached".to_string(),
3903 pid: Some(std::process::id()),
3904 boot_id: None,
3905 liveness_path: None,
3906 heartbeat_at: Some(Utc::now()),
3907 anchor_state: None,
3908 anchor_root: None,
3909 reservation_token: Some(objects::store::generate_agent_id()),
3910 path: Some(runtime.repo.root().to_path_buf()),
3911 base_state: String::new(),
3912 started_at: Utc::now(),
3913 provider: Some("anthropic".to_string()),
3914 model: Some("claude-opus-4-7[1m]".to_string()),
3915 harness: Some("claude-code".to_string()),
3916 thinking_level: None,
3917 usage_summary: AgentUsageSummary::default(),
3918 last_progress_at: None,
3919 report_flush_state: Some("pending-local".to_string()),
3920 attach_reason: None,
3921 task_assignment_id: None,
3922 attach_precedence: vec![],
3923 winning_attach_rule: None,
3924 probe_source: Some("hook_payload".to_string()),
3925 probe_confidence: Some(1.0),
3926 status: AgentStatus::Active,
3927 completed_at: None,
3928 context_queries: vec![],
3929 })
3930 })
3931 .unwrap();
3932
3933 let probe = HarnessProbeResult {
3934 harness: Some("claude-code".to_string()),
3935 provider: Some("anthropic".to_string()),
3936 model: Some("claude-opus-4-7[1m]".to_string()),
3937 native_actor_key: Some(
3938 "claude-code:session:282396d3-554a-48aa-a9a8-8d1f0bd15fa5".to_string(),
3939 ),
3940 native_instance_key: Some(
3941 "claude-code:transcript:/tmp/claude/282396d3.jsonl".to_string(),
3942 ),
3943 probe_source: Some("hook_payload".to_string()),
3944 confidence: Some(1.0),
3945 ..HarnessProbeResult::default()
3946 };
3947 let identity = ResolvedIdentity {
3948 harness: Some("claude-code".to_string()),
3949 provider: Some("anthropic".to_string()),
3950 model: Some("claude-opus-4-7[1m]".to_string()),
3951 thinking_level: None,
3952 policy: None,
3953 };
3954 let mut attach = ResolvedAttachment {
3955 target: AttachTarget::CreateNew {
3956 _because_claimed: false,
3957 },
3958 matched_entry: None,
3959 attach_reason:
3960 "started new Heddle session because no compatible native actor match was found"
3961 .to_string(),
3962 precedence: vec!["native-actor-key:miss".to_string()],
3963 winning_rule: "create-new-session".to_string(),
3964 };
3965
3966 let resolved_entry = runtime
3967 .ensure_registry_entry(RegistryEntryRequest {
3968 heddle_session_id: &tentative_session.id,
3969 thread_name: None,
3970 thread_id: None,
3971 identity: &identity,
3972 probe: &probe,
3973 attach: &attach,
3974 client_instance_id: None,
3975 requested_entry: None,
3976 })
3977 .unwrap();
3978 assert_eq!(resolved_entry.session_id, existing_entry.session_id);
3979 assert_eq!(
3980 resolved_entry.heddle_session_id.as_deref(),
3981 Some(existing_session.id.as_str())
3982 );
3983
3984 let (canonical_session, owns_session) = runtime
3985 .reuse_canonical_actor_session(
3986 &mut sessions,
3987 CanonicalActorSessionRequest {
3988 tentative_session: tentative_session.clone(),
3989 tentative_owns_session: true,
3990 entry: &resolved_entry,
3991 probe: &probe,
3992 attach: &mut attach,
3993 },
3994 )
3995 .unwrap();
3996 assert_eq!(canonical_session.id, existing_session.id);
3997 assert!(!owns_session);
3998 assert!(
3999 attach
4000 .precedence
4001 .iter()
4002 .any(|step| step.starts_with("post-create-native-actor-key:"))
4003 );
4004 assert_eq!(attach.winning_rule, "native-actor-key-post-create");
4005 assert!(
4006 !sessions
4007 .get_session(&tentative_session.id)
4008 .unwrap()
4009 .unwrap()
4010 .is_active()
4011 );
4012 }
4013
/// Closing a session must not attribute changes that were already present in
/// the worktree when the session was opened.
///
/// The worktree is dirtied before `open_session` is called. The stored report
/// is then expected to list the file under `worktree_changes_at_open`, while
/// the close result must not list it in `changed_paths` and must report a
/// changed-file count of zero.
#[test]
fn close_session_does_not_blame_preexisting_dirty_worktree() {
    const DIRTY_FILE: &str = "preexisting.txt";

    let (workdir, repository) = init_repo();
    // Dirty the worktree before any session exists.
    std::fs::write(workdir.path().join(DIRTY_FILE), "already dirty\n").unwrap();
    let mut bridge = HarnessBridgeRuntime::new(repository, UserConfig::default());

    let open_params = OpenSessionParams {
        harness: Some("claude-code".to_string()),
        provider: Some("anthropic".to_string()),
        model: Some("claude-opus-4-7[1m]".to_string()),
        ..OpenSessionParams::default()
    };
    let session = bridge.open_session(open_params).unwrap();

    let close_params = CloseSessionParams {
        heddle_session_id: session.heddle_session_id.clone(),
        outcome: Some("completed".to_string()),
        ..CloseSessionParams::default()
    };
    let summary = bridge.close_session(close_params).unwrap();

    let stored_report = bridge
        .reports
        .load(&session.heddle_session_id)
        .unwrap()
        .unwrap();

    // The open-time baseline remembers the pre-existing change...
    let baseline_has_file = stored_report
        .worktree_changes_at_open
        .iter()
        .any(|change| change.path == DIRTY_FILE);
    assert!(baseline_has_file);

    // ...and the close summary does not report it as a session change.
    let blamed_on_session = summary.changed_paths.iter().any(|path| path == DIRTY_FILE);
    assert!(!blamed_on_session);
    assert_eq!(summary.diff_summary.changed_file_count, 0);
}
4056
/// `changed_paths_between_states` is expected to report only the paths that
/// differ between the two snapshotted states.
///
/// `tracked.txt` is snapshotted with two different contents. `ambient.txt` is
/// written after the second snapshot and never snapshotted, so the expected
/// result contains `tracked.txt` alone.
#[test]
fn timeline_state_delta_paths_ignore_uncaptured_worktree_changes() {
    let (workdir, repository) = init_repo();
    let tracked_file = repository.root().join("tracked.txt");

    std::fs::write(&tracked_file, b"one\n").unwrap();
    let seed_state = repository.snapshot(Some("seed".into()), None).unwrap();
    std::fs::write(&tracked_file, b"two\n").unwrap();
    let advanced_state = repository.snapshot(Some("advance".into()), None).unwrap();

    // Written after the second snapshot, so it belongs to neither state.
    std::fs::write(
        workdir.path().join("ambient.txt"),
        b"not in the state delta\n",
    )
    .unwrap();

    let delta = changed_paths_between_states(
        &repository,
        seed_state.change_id,
        advanced_state.change_id,
    )
    .unwrap();
    assert_eq!(delta, vec!["tracked.txt"]);
}
4072
/// Relaying a Claude `Stop` hook over a dirty worktree should leave a HEAD
/// state that carries agent attribution and the payload's message as intent.
///
/// The repository is seeded and then modified without snapshotting, so the
/// worktree is dirty when the hook is relayed. The repository is reopened
/// before the relay and again for verification.
#[test]
fn relay_claude_stop_captures_state_with_agent_attribution() {
    let (workdir, seeded_repo) = init_repo();
    let seed_file = seeded_repo.root().to_path_buf().join("seed.txt");

    std::fs::write(&seed_file, b"hello").unwrap();
    let _ = seeded_repo.snapshot(Some("seed".into()), None).unwrap();

    // Modify the tracked file without snapshotting, leaving the worktree dirty.
    std::fs::write(&seed_file, b"hello, heddle").unwrap();

    drop(seeded_repo);

    let reopened = Repository::open(workdir.path()).unwrap();
    let principal = crate::config::UserPrincipalConfig {
        name: "Ada Lovelace".to_string(),
        email: "ada@example.com".to_string(),
    };
    let config = UserConfig {
        principal: Some(principal),
        ..UserConfig::default()
    };
    let mut bridge = HarnessBridgeRuntime::new(reopened, config);
    let stop_payload = serde_json::json!({
        "session_id": "claude-sess-123",
        "transcript_path": "/tmp/claude/x.jsonl",
        "model": {
            "id": "claude-opus-4-7",
            "display_name": "Claude Opus 4.7",
        },
        "message": "hook-driven capture test",
        "hook_event_name": "Stop",
    });
    relay_claude(&mut bridge, "Stop", &stop_payload).unwrap();
    drop(bridge);

    // Inspect the result through a freshly opened repository handle.
    let inspector = Repository::open(workdir.path()).unwrap();
    let head_change = inspector.head().unwrap().expect("HEAD after Stop capture");
    let captured = inspector
        .store()
        .get_state(&head_change)
        .unwrap()
        .expect("state for HEAD");

    let agent = captured
        .attribution
        .agent
        .expect("agent attribution on state");
    assert_eq!(agent.provider, "anthropic");
    assert_eq!(agent.model, "Claude Opus 4.7");

    let intent = captured.intent.as_deref();
    assert_eq!(
        intent,
        Some("hook-driven capture test"),
        "intent should be pulled from payload message",
    );
}
4125
/// Relaying a Claude `Stop` hook when nothing changed since the seed snapshot
/// must leave HEAD at the seed change id.
#[test]
fn relay_claude_stop_is_idempotent_when_clean() {
    let (workdir, seeded_repo) = init_repo();
    let seed_file = seeded_repo.root().to_path_buf().join("seed.txt");
    std::fs::write(seed_file, b"hello").unwrap();
    let seed_state = seeded_repo.snapshot(Some("seed".into()), None).unwrap();
    drop(seeded_repo);

    let reopened = Repository::open(workdir.path()).unwrap();
    let mut bridge = HarnessBridgeRuntime::new(reopened, UserConfig::default());
    let stop_payload = serde_json::json!({
        "session_id": "claude-sess-clean",
        "model": {"id": "claude-sonnet-4-6"},
    });
    relay_claude(&mut bridge, "Stop", &stop_payload).unwrap();
    drop(bridge);

    // Verify through a freshly opened repository handle.
    let inspector = Repository::open(workdir.path()).unwrap();
    let head_after = inspector.head().unwrap().expect("HEAD preserved");
    assert_eq!(
        head_after, seed_state.change_id,
        "no change expected when worktree is clean",
    );
}
4150
/// Relaying a `PreToolUse` hook for a `Bash` tool call must return `Ok`.
///
/// The test makes no assertion beyond the relay succeeding.
#[test]
fn relay_claude_pre_tool_use_ignores_non_file_tool() {
    let (workdir, initial_repo) = init_repo();
    drop(initial_repo);

    let reopened = Repository::open(workdir.path()).unwrap();
    let mut bridge = HarnessBridgeRuntime::new(reopened, UserConfig::default());
    let bash_payload = serde_json::json!({
        "session_id": "claude-sess-bash",
        "tool_name": "Bash",
        "tool_input": {"command": "ls"},
    });

    let outcome = relay_claude(&mut bridge, "PreToolUse", &bash_payload);
    outcome.unwrap();
}
4165
/// A `tool.execute.before` relay from opencode should record exactly one
/// timeline step on thread `main`.
///
/// The step is expected to carry the native harness and tool-call id, the
/// tool name, the seed state as `before_state`, no status yet, a payload
/// summary and hash, and the `ExternalSideEffectsUnknown` label.
#[test]
fn relay_opencode_tool_execute_before_records_timeline_step() {
    let (_workdir, repository) = init_repo();
    let seed_file = repository.root().to_path_buf().join("seed.txt");
    std::fs::write(seed_file, b"hello").unwrap();
    let seed_state = repository.snapshot(Some("seed".into()), None).unwrap();
    let mut bridge = HarnessBridgeRuntime::new(repository, UserConfig::default());
    let tool_payload = opencode_tool_payload("call-1");

    relay_opencode(&mut bridge, "tool.execute.before", &tool_payload).unwrap();

    let timeline = TimelineStore::open(bridge.repo.heddle_dir()).unwrap();
    let rebuilt = TimelineView::rebuild(&timeline).unwrap();
    let main_steps = rebuilt.steps_for_thread("main");
    assert_eq!(main_steps.len(), 1);
    let recorded = main_steps[0];

    // Native identity of the tool call.
    let native = recorded.native.as_ref().unwrap();
    assert_eq!(native.harness, "opencode");
    assert_eq!(native.tool_call_id, "call-1");

    assert_eq!(recorded.tool_name.as_deref(), Some("bash"));
    assert_eq!(recorded.before_state, Some(seed_state.change_id));
    // Only the "before" half was relayed, so no status is set.
    assert!(recorded.status.is_none());

    let summary = recorded.payload_summary.as_deref().unwrap();
    assert!(summary.contains("call-1"));
    assert!(recorded.payload_hash.is_some());

    let side_effects_unknown = recorded
        .labels
        .contains(&TimelineLabel::ExternalSideEffectsUnknown);
    assert!(side_effects_unknown);
}
4194
/// A `tool.execute.before` / `tool.execute.after` pair around a worktree edit
/// should advance HEAD and merge into a single timeline step.
///
/// The merged step is expected to reference both operations, report
/// `Succeeded`, link the seed state as `before_state` and the new HEAD as
/// `after_state` and `capture_state`, list `tracked.txt` as touched, and keep
/// the literal `SECRET` out of the payload summary.
#[test]
fn relay_opencode_tool_execute_after_captures_dirty_worktree() {
    let (_workdir, repository) = init_repo();
    let root = repository.root().to_path_buf();
    let tracked_file = root.join("tracked.txt");
    std::fs::write(&tracked_file, b"one\n").unwrap();
    let seed_state = repository.snapshot(Some("seed".into()), None).unwrap();

    let principal = crate::config::UserPrincipalConfig {
        name: "Ada Lovelace".to_string(),
        email: "ada@example.com".to_string(),
    };
    let config = UserConfig {
        principal: Some(principal),
        ..UserConfig::default()
    };
    let mut bridge = HarnessBridgeRuntime::new(repository, config);
    let tool_payload = opencode_tool_payload("call-2");

    relay_opencode(&mut bridge, "tool.execute.before", &tool_payload).unwrap();
    // Edit the tracked file between the two hook events.
    std::fs::write(&tracked_file, b"two\n").unwrap();
    relay_opencode(&mut bridge, "tool.execute.after", &tool_payload).unwrap();

    let new_head = bridge.repo.head().unwrap().expect("capture advanced HEAD");
    assert_ne!(new_head, seed_state.change_id);

    let timeline = TimelineStore::open(bridge.repo.heddle_dir()).unwrap();
    let rebuilt = TimelineView::rebuild(&timeline).unwrap();
    let main_steps = rebuilt.steps_for_thread("main");
    assert_eq!(main_steps.len(), 1, "before/after should merge by native id");
    let merged = main_steps[0];

    assert_eq!(merged.operation_ids.len(), 2);
    assert_eq!(merged.status, Some(TimelineToolCallStatus::Succeeded));
    assert_eq!(merged.before_state, Some(seed_state.change_id));
    assert_eq!(merged.after_state, Some(new_head));
    assert_eq!(merged.capture_state, Some(new_head));
    assert_eq!(merged.changed, Some(true));

    let expected_path = "tracked.txt".to_string();
    assert!(merged.touched_paths.contains(&expected_path));
    assert!(merged.labels.contains(&TimelineLabel::RepoReversible));
    let side_effects_unknown = merged
        .labels
        .contains(&TimelineLabel::ExternalSideEffectsUnknown);
    assert!(side_effects_unknown);

    // The payload's command is "echo SECRET"; the summary must not contain it.
    let summary = merged.payload_summary.as_deref().unwrap();
    assert!(!summary.contains("SECRET"));
    assert!(merged.payload_hash.is_some());
}
4237
/// When the capture after a tool call fails, the timeline step should be
/// labelled `CaptureFailed` and list only the path hinted by the payload.
///
/// An executable `.heddle/hooks/pre-snapshot` script that exits with status 1
/// is installed before the relays (presumably this is what makes the capture
/// fail; confirm against the snapshot hook handling). `ambient.txt` is
/// written between the two hook events and must not appear in
/// `touched_paths`.
#[cfg(unix)]
#[test]
fn relay_opencode_tool_execute_after_records_capture_failed_without_ambient_paths() {
    let (_workdir, repository) = init_repo();
    let root = repository.root().to_path_buf();
    std::fs::write(root.join("seed.txt"), b"seed\n").unwrap();
    let seed_state = repository.snapshot(Some("seed".into()), None).unwrap();
    let mut bridge = HarnessBridgeRuntime::new(repository, UserConfig::default());

    // Point the payload's path hint at a file other than the default.
    let mut tool_payload = opencode_tool_payload("call-capture-failed");
    tool_payload["tool"]["input"]["file_path"] = serde_json::json!("hinted.txt");

    // Install an executable pre-snapshot hook that always exits non-zero.
    let hooks_dir = root.join(".heddle/hooks");
    std::fs::create_dir_all(&hooks_dir).unwrap();
    let failing_hook = hooks_dir.join("pre-snapshot");
    std::fs::write(&failing_hook, "#!/bin/sh\nexit 1\n").unwrap();
    let mut hook_mode = std::fs::metadata(&failing_hook).unwrap().permissions();
    hook_mode.set_mode(0o755);
    std::fs::set_permissions(&failing_hook, hook_mode).unwrap();

    relay_opencode(&mut bridge, "tool.execute.before", &tool_payload).unwrap();
    std::fs::write(root.join("ambient.txt"), b"dirty but uncaptured\n").unwrap();
    relay_opencode(&mut bridge, "tool.execute.after", &tool_payload).unwrap();

    let head_after = bridge.repo.head().unwrap();
    assert_eq!(
        head_after,
        Some(seed_state.change_id),
        "capture failure must not advance HEAD"
    );

    let timeline = TimelineStore::open(bridge.repo.heddle_dir()).unwrap();
    let rebuilt = TimelineView::rebuild(&timeline).unwrap();
    let main_steps = rebuilt.steps_for_thread("main");
    assert_eq!(main_steps.len(), 1, "before/after should merge by native id");
    let merged = main_steps[0];

    assert_eq!(merged.operation_ids.len(), 2);
    assert_eq!(merged.before_state, Some(seed_state.change_id));
    assert_eq!(merged.after_state, Some(seed_state.change_id));
    assert_eq!(merged.capture_state, None);
    assert_eq!(merged.changed, Some(false));
    assert!(merged.labels.contains(&TimelineLabel::CaptureFailed));

    let marked_reversible = merged.labels.contains(&TimelineLabel::RepoReversible);
    assert!(
        !marked_reversible,
        "failed captures are not repo-reversible"
    );
    // Only the hinted path is recorded, not the ambient worktree change.
    assert_eq!(merged.touched_paths, vec!["hinted.txt"]);
}
4282
/// A `tool.execute.before` payload without a `toolCallID` must relay
/// successfully and record no timeline step on thread `main`.
///
/// At least one entry is still expected under
/// `.heddle/state/session-reports`.
#[test]
fn relay_opencode_tool_execute_missing_tool_id_does_not_fail_or_record_timeline() {
    let (_workdir, repository) = init_repo();
    let root = repository.root().to_path_buf();
    std::fs::write(root.join("seed.txt"), b"hello").unwrap();
    let _ = repository.snapshot(Some("seed".into()), None).unwrap();
    let mut bridge = HarnessBridgeRuntime::new(repository, UserConfig::default());

    // This payload carries no "toolCallID" key.
    let payload_without_call_id = serde_json::json!({
        "sessionID": "opencode-session",
        "model": "gpt-5.4",
        "provider": "openai",
        "tool": {"name": "bash"},
    });

    relay_opencode(&mut bridge, "tool.execute.before", &payload_without_call_id).unwrap();

    let timeline = TimelineStore::open(bridge.repo.heddle_dir()).unwrap();
    let rebuilt = TimelineView::rebuild(&timeline).unwrap();
    let main_steps = rebuilt.steps_for_thread("main");
    assert!(main_steps.is_empty());

    let reports_dir = root.join(".heddle/state/session-reports");
    let report_count = std::fs::read_dir(reports_dir).unwrap().count();
    assert!(
        report_count > 0,
        "session progress should still be recorded"
    );
}
4310
4311 fn opencode_tool_payload(call_id: &str) -> Value {
4312 serde_json::json!({
4313 "sessionID": "opencode-session",
4314 "messageID": "message-1",
4315 "toolCallID": call_id,
4316 "model": "gpt-5.4",
4317 "provider": "openai",
4318 "tool": {
4319 "name": "bash",
4320 "input": {
4321 "command": "echo SECRET",
4322 "file_path": "tracked.txt"
4323 }
4324 },
4325 "status": "success"
4326 })
4327 }
4328
/// Relaying `SubagentStart` should create an active registry entry for the
/// subagent that points back at the parent session.
///
/// The entry is looked up by the key `claude-code:agent:<agent_id>` and must
/// carry `claude-code:session:<session_id>` as its parent actor key.
#[test]
fn relay_claude_subagent_start_creates_child_entry_with_parent_key() {
    let (workdir, initial_repo) = init_repo();
    drop(initial_repo);

    let reopened = Repository::open(workdir.path()).unwrap();
    let mut bridge = HarnessBridgeRuntime::new(reopened, UserConfig::default());
    let start_payload = serde_json::json!({
        "session_id": "parent-claude-sess",
        "agent_id": "child-subagent-xyz",
        "model": {"id": "claude-sonnet-4-6"},
    });
    relay_claude(&mut bridge, "SubagentStart", &start_payload).unwrap();
    drop(bridge);

    // Read the registry back through a freshly opened repository handle.
    let inspector = Repository::open(workdir.path()).unwrap();
    let agents = AgentRegistry::new(inspector.heddle_dir());
    let subagent = agents
        .find_active_by_native_actor_key("claude-code:agent:child-subagent-xyz")
        .unwrap()
        .expect("subagent AgentEntry should exist after SubagentStart");

    let parent_key = subagent.native_parent_actor_key.as_deref();
    assert_eq!(
        parent_key,
        Some("claude-code:session:parent-claude-sess"),
        "subagent must carry parent session linkage",
    );
    assert_eq!(subagent.status, AgentStatus::Active);
}
4356
/// Relaying `SubagentStop` after `SubagentStart` should leave the subagent's
/// registry entry in the `Complete` status.
///
/// Each hook is relayed through its own freshly opened runtime, and a file is
/// written to the worktree between the two events.
#[test]
fn relay_claude_subagent_stop_marks_child_entry_complete() {
    let (workdir, initial_repo) = init_repo();
    let worktree = initial_repo.root().to_path_buf();
    drop(initial_repo);

    // Each hook event gets its own runtime over a newly opened repository.
    let open_bridge = || {
        let reopened = Repository::open(workdir.path()).unwrap();
        HarnessBridgeRuntime::new(reopened, UserConfig::default())
    };

    let mut start_bridge = open_bridge();
    let start_payload = serde_json::json!({
        "session_id": "parent-sess",
        "agent_id": "worker-1",
        "model": {"id": "claude-sonnet-4-6"},
    });
    relay_claude(&mut start_bridge, "SubagentStart", &start_payload).unwrap();
    drop(start_bridge);

    // Dirty the worktree between the start and stop events.
    let output_file = worktree.join("child-output.txt");
    std::fs::write(output_file, b"subagent produced this").unwrap();

    let mut stop_bridge = open_bridge();
    let stop_payload = serde_json::json!({
        "session_id": "parent-sess",
        "agent_id": "worker-1",
        "model": {
            "id": "claude-sonnet-4-6",
            "display_name": "Claude Sonnet 4.6",
        },
    });
    relay_claude(&mut stop_bridge, "SubagentStop", &stop_payload).unwrap();
    drop(stop_bridge);

    let inspector = Repository::open(workdir.path()).unwrap();
    let agents = AgentRegistry::new(inspector.heddle_dir());
    // Search every listed entry for the subagent's native actor key.
    let all_entries = agents.list().unwrap();
    let subagent = all_entries
        .into_iter()
        .find(|entry| entry.native_actor_key.as_deref() == Some("claude-code:agent:worker-1"))
        .expect("child entry should still exist");
    assert_eq!(
        subagent.status,
        AgentStatus::Complete,
        "SubagentStop should mark the child entry Complete",
    );
}
4408
/// Relaying `UserPromptSubmit` after `SessionStart` should leave at least one
/// session with more segments than the baseline taken after `SessionStart`.
///
/// The baseline is the segment count of the first listed session that has
/// any segments, or zero when no session has any.
#[test]
fn relay_claude_user_prompt_submit_rotates_segment() {
    let (workdir, initial_repo) = init_repo();
    drop(initial_repo);

    let reopened = Repository::open(workdir.path()).unwrap();
    let mut bridge = HarnessBridgeRuntime::new(reopened, UserConfig::default());

    let start_payload = serde_json::json!({
        "session_id": "claude-prompt-sess",
        "model": {"id": "claude-opus-4-7", "display_name": "Claude Opus 4.7"},
    });
    relay_claude(&mut bridge, "SessionStart", &start_payload).unwrap();

    // Baseline: segment count of the first session that has any segments.
    let baseline_sessions = SessionManager::new(bridge.repo.root())
        .list_sessions(true)
        .unwrap();
    let initial_segments = baseline_sessions
        .iter()
        .map(|session| session.segments.len())
        .find(|&count| count != 0)
        .unwrap_or(0);

    let submit_payload = serde_json::json!({
        "session_id": "claude-prompt-sess",
        "model": {"id": "claude-opus-4-7", "display_name": "Claude Opus 4.7"},
        "prompt": "write a new feature",
    });
    relay_claude(&mut bridge, "UserPromptSubmit", &submit_payload).unwrap();
    drop(bridge);

    // Re-read the sessions through a freshly opened repository handle.
    let inspector = Repository::open(workdir.path()).unwrap();
    let sessions_after = SessionManager::new(inspector.root())
        .list_sessions(true)
        .unwrap();
    let rotated = sessions_after
        .iter()
        .map(|session| session.segments.len())
        .any(|count| count > initial_segments);
    assert!(
        rotated,
        "UserPromptSubmit must add at least one segment beyond the SessionStart baseline \
         (initial={initial_segments}, sessions_after={:?})",
        sessions_after
            .iter()
            .map(|s| (s.id.clone(), s.segments.len()))
            .collect::<Vec<_>>(),
    );
}
4457}