Skip to main content

harn_vm/
bridge.rs

1//! JSON-RPC 2.0 bridge for host communication.
2//!
3//! When `harn run --bridge` is used, the VM delegates builtins (llm_call,
4//! file I/O, tool execution) to a host process over stdin/stdout JSON-RPC.
5//! The host application handles these requests using its own providers.
6
7use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::future::Future;
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::rc::Rc;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::Duration;
16
17use tokio::io::AsyncBufReadExt;
18use tokio::sync::{oneshot, Mutex, Notify};
19
20use harn_parser::diagnostic_codes::Code;
21
22use crate::orchestration::MutationSessionRecord;
23use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
24use crate::visible_text::VisibleTextState;
25use crate::vm::Vm;
26
/// Default timeout for bridge calls (5 minutes).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);

/// Shared, thread-safe transport callback used to send one message to the host.
///
/// The callback receives the message text as `&str` and returns `Err` with a
/// human-readable description when the message could not be sent.
pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
31
32fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
33    Arc::new(move |line: &str| {
34        let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
35        let mut stdout = std::io::stdout().lock();
36        stdout
37            .write_all(line.as_bytes())
38            .map_err(|e| format!("Bridge write error: {e}"))?;
39        stdout
40            .write_all(b"\n")
41            .map_err(|e| format!("Bridge write error: {e}"))?;
42        stdout
43            .flush()
44            .map_err(|e| format!("Bridge flush error: {e}"))?;
45        Ok(())
46    })
47}
48
/// A JSON-RPC 2.0 bridge to a host process over stdin/stdout.
///
/// The bridge sends requests to the host on stdout and receives responses
/// on stdin. A background task reads stdin and dispatches responses to
/// waiting callers by request ID. All stdout writes are serialized through
/// a mutex to prevent interleaving.
pub struct HostBridge {
    /// Counter backing JSON-RPC request ids. Starts at 1 when the bridge is
    /// created with `new()`, or at the caller-supplied `start_id` when it is
    /// assembled through one of the `from_parts*` constructors.
    next_id: AtomicU64,
    /// Pending request waiters, keyed by JSON-RPC id.
    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
    /// Whether the host has sent a cancel notification.
    cancelled: Arc<AtomicBool>,
    /// Wakes pending host calls when cancellation arrives.
    cancel_notify: Arc<Notify>,
    /// Transport writer used to send JSON-RPC to the host.
    writer: HostBridgeWriter,
    /// ACP session ID (set in ACP mode for session-scoped notifications).
    session_id: std::sync::Mutex<String>,
    /// Name of the currently executing Harn script (without .harn suffix).
    script_name: std::sync::Mutex<String>,
    /// Transcript injections queued by the host while a run is active.
    queued_transcript_injections: HostBridgeInjectionState,
    /// Host-triggered resume signal for daemon agents.
    resume_requested: Arc<AtomicBool>,
    /// Host-triggered skill-registry invalidation signal. Set when the
    /// host sends a `skills/update` notification; consumed by the CLI
    /// between runs (watch mode, long-running agents) to rebuild the
    /// layered skill catalog from its current filesystem + host state.
    skills_reload_requested: Arc<AtomicBool>,
    /// Whether the current daemon-mode agent loop is blocked in idle wait.
    daemon_idle: Arc<AtomicBool>,
    /// Canonical ACP `stopReason` recorded by the most recent `agent_loop`
    /// finalize during this prompt. Read once by the ACP adapter when the
    /// pipeline returns and populated by `host_agent_session_finalize`.
    /// Pipelines that don't run an agent loop leave this `None`, in which
    /// case the adapter falls back to `end_turn`.
    prompt_stop_reason: std::sync::Mutex<Option<String>>,
    /// Per-call visible assistant text state for call_progress notifications.
    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
    /// Whether an LLM call's deltas should be exposed to end users while streaming.
    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
    /// Optional in-process host-module backend used by `harn playground`.
    /// `None` for every bridge built by the constructors in this file section.
    in_process: Option<InProcessHost>,
}
93
/// In-process replacement for the external host: bridge methods are answered
/// by calling functions exported from a host module instead of going over
/// the transport.
struct InProcessHost {
    /// Path of the host module; shown in the "missing capability" error so
    /// the user knows which file to add the export to.
    module_path: PathBuf,
    /// Exported host functions, keyed by export name.
    exported_functions: BTreeMap<String, Rc<VmClosure>>,
    /// VM from which a child VM is derived (`child_vm_for_host`) for every
    /// export invocation.
    vm: Vm,
}
99
100impl InProcessHost {
101    /// Box-pin'd to break the static recursion between the VM's hot dispatch
102    /// loop and the bridge: a bridge-backed builtin spawns a child VM that
103    /// calls back into the dispatch loop via `call_closure_pub`. Indirecting
104    /// at this slow-path boundary keeps the recursion satisfied without
105    /// allocating per call in the hot per-callback path.
106    fn dispatch<'a>(
107        &'a self,
108        method: &'a str,
109        params: serde_json::Value,
110    ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, VmError>> + 'a>> {
111        Box::pin(async move {
112            match method {
113                "builtin_call" => {
114                    let name = params
115                        .get("name")
116                        .and_then(|value| value.as_str())
117                        .unwrap_or_default();
118                    let args = params
119                        .get("args")
120                        .and_then(|value| value.as_array())
121                        .cloned()
122                        .unwrap_or_default()
123                        .into_iter()
124                        .map(|value| json_result_to_vm_value(&value))
125                        .collect::<Vec<_>>();
126                    self.invoke_export(name, &args).await
127                }
128                "host/tools/list" => self
129                    .invoke_optional_export("host_tools_list", &[])
130                    .await
131                    .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
132                "session/request_permission" => self.request_permission(params).await,
133                other => Err(VmError::Runtime(format!(
134                    "playground host backend does not implement bridge method '{other}'"
135                ))),
136            }
137        })
138    }
139
140    async fn invoke_export(
141        &self,
142        name: &str,
143        args: &[VmValue],
144    ) -> Result<serde_json::Value, VmError> {
145        let Some(closure) = self.exported_functions.get(name) else {
146            return Err(VmError::Runtime(format!(
147                "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
148                self.module_path.display()
149            )));
150        };
151
152        let mut vm = self.vm.child_vm_for_host();
153        let result = vm.call_closure_pub(closure, args).await?;
154        Ok(crate::llm::vm_value_to_json(&result))
155    }
156
157    async fn invoke_optional_export(
158        &self,
159        name: &str,
160        args: &[VmValue],
161    ) -> Result<Option<serde_json::Value>, VmError> {
162        if !self.exported_functions.contains_key(name) {
163            return Ok(None);
164        }
165        self.invoke_export(name, args).await.map(Some)
166    }
167
168    async fn request_permission(
169        &self,
170        params: serde_json::Value,
171    ) -> Result<serde_json::Value, VmError> {
172        let Some(closure) = self.exported_functions.get("request_permission") else {
173            return Ok(serde_json::json!({ "granted": true }));
174        };
175
176        let tool_name = params
177            .get("toolCall")
178            .and_then(|tool_call| tool_call.get("toolName"))
179            .and_then(|value| value.as_str())
180            .unwrap_or_default();
181        let tool_args = params
182            .get("toolCall")
183            .and_then(|tool_call| tool_call.get("rawInput"))
184            .map(json_result_to_vm_value)
185            .unwrap_or(VmValue::Nil);
186        let full_payload = json_result_to_vm_value(&params);
187
188        let arg_count = closure.func.params.len();
189        let args = if arg_count >= 3 {
190            vec![
191                VmValue::String(Rc::from(tool_name.to_string())),
192                tool_args,
193                full_payload,
194            ]
195        } else if arg_count == 2 {
196            vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
197        } else if arg_count == 1 {
198            vec![full_payload]
199        } else {
200            Vec::new()
201        };
202
203        let mut vm = self.vm.child_vm_for_host();
204        let result = vm.call_closure_pub(closure, &args).await?;
205        let payload = match result {
206            VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
207            VmValue::String(reason) if !reason.is_empty() => {
208                serde_json::json!({ "granted": false, "reason": reason.to_string() })
209            }
210            other => {
211                let json = crate::llm::vm_value_to_json(&other);
212                if json
213                    .get("granted")
214                    .and_then(|value| value.as_bool())
215                    .is_some()
216                    || json.get("outcome").is_some()
217                {
218                    json
219                } else {
220                    serde_json::json!({ "granted": other.is_truthy() })
221                }
222            }
223        };
224        Ok(payload)
225    }
226}
227
/// How a queued bridge injection is delivered into the agent loop.
///
/// `AuditOnly` was previously called `WaitForCompletion`; the rename is
/// truth-in-advertising (harn#2212). These injections drain at
/// `loop_exit`, *after* the last LLM call has returned — so they land in
/// the transcript audit but are **never rendered into a model prompt**.
/// Hosts that want the model to react to the reminder on its final
/// iteration should use `FinishStep` instead, which drains at every
/// `iteration_start` / `post_tool_dispatch` / `iteration_end` checkpoint.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
    /// Selected by the wire names `interrupt_immediate` and `interrupt`.
    /// NOTE(review): presumably delivered without waiting for a checkpoint —
    /// confirm against the agent loop.
    InterruptImmediate,
    /// Selected by the wire names `finish_step` and
    /// `after_current_operation`; drains at the loop checkpoints listed above.
    FinishStep,
    /// Recorded for audit only; also the fallback for any unrecognised mode
    /// string.
    AuditOnly,
}
243
/// Point in a run at which something may be delivered.
///
/// NOTE(review): this enum is not referenced in the visible part of the file;
/// the variant names suggest it mirrors the delivery modes of
/// `QueuedUserMessageMode` — confirm against its users before relying on that.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
    InterruptImmediate,
    AfterCurrentOperation,
    EndOfInteraction,
}
250
251impl QueuedUserMessageMode {
252    fn from_str(value: &str) -> Self {
253        match value {
254            "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
255            "finish_step" | "after_current_operation" => Self::FinishStep,
256            // Unknown / missing modes fall through to the safest option:
257            // record for audit, do not preempt the loop. Pre-#2212 hosts
258            // that send `wait_for_completion` are caught by this arm —
259            // the canonical name is `audit_only` going forward.
260            _ => Self::AuditOnly,
261        }
262    }
263}
264
/// A user message waiting in the injection queue.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedUserMessage {
    /// Identifier returned to the caller when the message was queued; used to
    /// revoke or replace the message while it is still pending.
    pub message_id: String,
    /// Message text.
    pub content: String,
    /// JSON form of the message.
    /// NOTE(review): presumably what is written to the transcript, as opposed
    /// to the plain `content` — confirm against the consumer.
    pub transcript_content: serde_json::Value,
    /// Delivery mode requested for this message.
    pub mode: QueuedUserMessageMode,
}
272
/// A system reminder waiting in the injection queue.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedReminder {
    /// The validated reminder payload.
    pub reminder: crate::llm::helpers::SystemReminder,
    /// Delivery mode requested for this reminder.
    pub mode: QueuedUserMessageMode,
}
278
/// One entry of the injection queue: either a user message or a reminder.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum QueuedTranscriptInjection {
    /// A queued user message.
    User(QueuedUserMessage),
    /// A queued system reminder.
    Reminder(QueuedReminder),
}
284
/// Mutable state behind `HostBridgeInjectionState`.
#[derive(Debug, Default)]
struct QueuedTranscriptInjections {
    /// Pending injections; new entries are appended at the back.
    queue: VecDeque<QueuedTranscriptInjection>,
    /// Ids of user messages that were removed from the queue by a revoke.
    revoked_user_message_ids: HashSet<String>,
    /// Ids of user messages that have already been delivered.
    /// NOTE(review): nothing in the visible part of the file inserts into this
    /// set — it is presumably filled by the code that drains the queue.
    delivered_user_message_ids: HashSet<String>,
}
291
/// Cloneable handle to the queue of pending transcript injections.
///
/// Clones share the same underlying state through an `Arc`, so a producer
/// (for example a stdin reader) and the bridge can hold separate handles.
#[derive(Clone, Debug, Default)]
pub struct HostBridgeInjectionState {
    /// Shared queue state, guarded by an async mutex.
    inner: Arc<Mutex<QueuedTranscriptInjections>>,
}
296
/// Outcome of trying to revoke or replace a pending user message.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PendingUserMessageMutationResult {
    /// The message was still queued and the revoke/replace was applied.
    Mutated,
    /// The message is no longer queued because it was revoked earlier.
    AlreadyRevoked,
    /// The message is no longer queued because it was already delivered.
    AlreadyDelivered,
    /// The id is neither queued nor recorded as revoked or delivered.
    UnknownMessageId,
}
304
305impl QueuedTranscriptInjection {
306    fn mode(&self) -> QueuedUserMessageMode {
307        match self {
308            Self::User(message) => message.mode,
309            Self::Reminder(reminder) => reminder.mode,
310        }
311    }
312}
313
314fn new_inject_message_id() -> String {
315    format!("msg_inj_{}", uuid::Uuid::now_v7().simple())
316}
317
318impl HostBridgeInjectionState {
319    pub fn new() -> Self {
320        Self::default()
321    }
322
323    pub async fn push_pending_user_message(
324        &self,
325        content: String,
326        transcript_content: serde_json::Value,
327        mode: &str,
328    ) -> String {
329        let message_id = new_inject_message_id();
330        self.inner
331            .lock()
332            .await
333            .queue
334            .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
335                message_id: message_id.clone(),
336                content,
337                transcript_content,
338                mode: QueuedUserMessageMode::from_str(mode),
339            }));
340        message_id
341    }
342
343    pub async fn revoke_pending_user_message(
344        &self,
345        message_id: &str,
346    ) -> PendingUserMessageMutationResult {
347        let mut state = self.inner.lock().await;
348        let mut retained = VecDeque::new();
349        let mut revoked = false;
350        while let Some(injection) = state.queue.pop_front() {
351            match &injection {
352                QueuedTranscriptInjection::User(message) if message.message_id == message_id => {
353                    revoked = true;
354                }
355                _ => retained.push_back(injection),
356            }
357        }
358        state.queue = retained;
359        if revoked {
360            state
361                .revoked_user_message_ids
362                .insert(message_id.to_string());
363            return PendingUserMessageMutationResult::Mutated;
364        }
365        if state.revoked_user_message_ids.contains(message_id) {
366            PendingUserMessageMutationResult::AlreadyRevoked
367        } else if state.delivered_user_message_ids.contains(message_id) {
368            PendingUserMessageMutationResult::AlreadyDelivered
369        } else {
370            PendingUserMessageMutationResult::UnknownMessageId
371        }
372    }
373
374    pub async fn replace_pending_user_message(
375        &self,
376        message_id: &str,
377        content: String,
378        transcript_content: serde_json::Value,
379    ) -> PendingUserMessageMutationResult {
380        let mut state = self.inner.lock().await;
381        for injection in &mut state.queue {
382            if let QueuedTranscriptInjection::User(message) = injection {
383                if message.message_id == message_id {
384                    message.content = content;
385                    message.transcript_content = transcript_content;
386                    return PendingUserMessageMutationResult::Mutated;
387                }
388            }
389        }
390        if state.revoked_user_message_ids.contains(message_id) {
391            PendingUserMessageMutationResult::AlreadyRevoked
392        } else if state.delivered_user_message_ids.contains(message_id) {
393            PendingUserMessageMutationResult::AlreadyDelivered
394        } else {
395            PendingUserMessageMutationResult::UnknownMessageId
396        }
397    }
398
399    async fn push_session_reminder(&self, reminder: QueuedReminder) {
400        self.inner
401            .lock()
402            .await
403            .queue
404            .push_back(QueuedTranscriptInjection::Reminder(reminder));
405    }
406}
407
408fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
409    format!(
410        "{}: {}",
411        Code::ReminderUnknownOption.as_str(),
412        message.as_ref()
413    )
414}
415
416fn session_remind_shape_error(message: impl AsRef<str>) -> String {
417    format!(
418        "{}: {}",
419        Code::ReminderInvalidShape.as_str(),
420        message.as_ref()
421    )
422}
423
424fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
425    format!(
426        "{}: {}",
427        Code::ReminderUnknownPropagate.as_str(),
428        message.as_ref()
429    )
430}
431
432fn string_field(
433    map: &serde_json::Map<String, serde_json::Value>,
434    key: &str,
435    required: bool,
436) -> Result<Option<String>, String> {
437    match map.get(key) {
438        None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
439            format!("`{key}` must be a non-empty string"),
440        )),
441        None | Some(serde_json::Value::Null) => Ok(None),
442        Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
443            session_remind_shape_error(format!("`{key}` must be a non-empty string")),
444        ),
445        Some(serde_json::Value::String(value)) => {
446            let trimmed = value.trim();
447            if trimmed.is_empty() {
448                Ok(None)
449            } else {
450                Ok(Some(trimmed.to_string()))
451            }
452        }
453        Some(other) => Err(session_remind_shape_error(format!(
454            "`{key}` must be a string, got {other}"
455        ))),
456    }
457}
458
459fn bool_field(
460    map: &serde_json::Map<String, serde_json::Value>,
461    key: &str,
462) -> Result<Option<bool>, String> {
463    match map.get(key) {
464        None | Some(serde_json::Value::Null) => Ok(None),
465        Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
466        Some(other) => Err(session_remind_shape_error(format!(
467            "`{key}` must be a bool, got {other}"
468        ))),
469    }
470}
471
472fn int_field(
473    map: &serde_json::Map<String, serde_json::Value>,
474    key: &str,
475) -> Result<Option<i64>, String> {
476    match map.get(key) {
477        None | Some(serde_json::Value::Null) => Ok(None),
478        Some(serde_json::Value::Number(value)) => {
479            let Some(value) = value.as_i64() else {
480                return Err(session_remind_shape_error(format!(
481                    "`{key}` must be an integer"
482                )));
483            };
484            Ok(Some(value))
485        }
486        Some(other) => Err(session_remind_shape_error(format!(
487            "`{key}` must be an int, got {other}"
488        ))),
489    }
490}
491
492fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
493    let Some(value) = map.get("tags") else {
494        return Ok(Vec::new());
495    };
496    if value.is_null() {
497        return Ok(Vec::new());
498    }
499    let Some(values) = value.as_array() else {
500        return Err(session_remind_shape_error("`tags` must be a list"));
501    };
502    let mut tags = Vec::new();
503    for value in values {
504        let Some(tag) = value.as_str() else {
505            return Err(session_remind_shape_error(format!(
506                "`tags` entries must be strings, got {value}"
507            )));
508        };
509        let tag = tag.trim();
510        if tag.is_empty() {
511            return Err(session_remind_shape_error(
512                "`tags` entries must be non-empty strings",
513            ));
514        }
515        if !tags.iter().any(|existing| existing == tag) {
516            tags.push(tag.to_string());
517        }
518    }
519    Ok(tags)
520}
521
522fn session_remind_payload_from_value(
523    value: &serde_json::Value,
524) -> Result<crate::llm::helpers::SystemReminder, String> {
525    let Some(map) = value.as_object() else {
526        return Err(session_remind_shape_error(
527            "session/remind payload must be a reminder object",
528        ));
529    };
530    const ALLOWED: &[&str] = &[
531        "_meta",
532        "body",
533        "dedupe_key",
534        "fired_at_turn",
535        "id",
536        "preserve_on_compact",
537        "propagate",
538        "role_hint",
539        "source",
540        "tags",
541        "ttl_turns",
542    ];
543    let unknown = map
544        .keys()
545        .filter(|key| !ALLOWED.contains(&key.as_str()))
546        .map(String::as_str)
547        .collect::<Vec<_>>();
548    if !unknown.is_empty() {
549        if unknown.contains(&"content") {
550            return Err(session_remind_shape_error(
551                "session/remind expects reminder `body`, not user-message `content`",
552            ));
553        }
554        return Err(reminder_unknown_option_error(format!(
555            "unknown reminder option(s): {}",
556            unknown.join(", ")
557        )));
558    }
559    if let Some(meta) = map.get("_meta") {
560        if !meta.is_null() && !meta.is_object() {
561            return Err(session_remind_shape_error("`_meta` must be an object"));
562        }
563    }
564    let ttl_turns = int_field(map, "ttl_turns")?;
565    if let Some(value) = ttl_turns {
566        if value <= 0 {
567            return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
568        }
569    }
570    let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
571    if fired_at_turn < 0 {
572        return Err(session_remind_shape_error(
573            "`fired_at_turn` must be >= 0 when provided",
574        ));
575    }
576    match string_field(map, "source", false)?.as_deref() {
577        None | Some("bridge") => {}
578        Some(_) => {
579            return Err(session_remind_shape_error(
580                "`source` for session/remind must be bridge when provided",
581            ))
582        }
583    }
584    let propagate = match string_field(map, "propagate", false)?.as_deref() {
585        None => crate::llm::helpers::ReminderPropagate::Session,
586        Some("all") => crate::llm::helpers::ReminderPropagate::All,
587        Some("session") => crate::llm::helpers::ReminderPropagate::Session,
588        Some("none") => crate::llm::helpers::ReminderPropagate::None,
589        Some(_) => {
590            return Err(reminder_unknown_propagate_error(
591                "`propagate` must be one of all, session, or none",
592            ))
593        }
594    };
595    let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
596        None => crate::llm::helpers::ReminderRoleHint::System,
597        Some("system") => crate::llm::helpers::ReminderRoleHint::System,
598        Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
599        Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
600        Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
601        Some(_) => {
602            return Err(session_remind_shape_error(
603                "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
604            ))
605        }
606    };
607    Ok(crate::llm::helpers::SystemReminder {
608        id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
609        tags: tags_field(map)?,
610        dedupe_key: string_field(map, "dedupe_key", false)?,
611        ttl_turns,
612        preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
613        propagate,
614        role_hint,
615        source: crate::llm::helpers::ReminderSource::Bridge,
616        body: string_field(map, "body", true)?.unwrap_or_default(),
617        fired_at_turn,
618        originating_agent_id: None,
619    })
620}
621
622/// Parse the params of a `session/cancel_tool_call` notification and fire
623/// the per-tool-call cancellation. Mirrors the shape used by the public
624/// `cancel_in_flight_tool_call` builtin so hosts have one wire format
625/// regardless of which surface they came through.
626///
627/// Stdio bridges send this as a notification (no id, no response); the
628/// builtin handles request/response semantics in Harn. We deliberately
629/// drop malformed payloads silently because notifications can't reply
630/// with an error — logging would also be too noisy for partial drops.
631fn handle_cancel_tool_call_notification(params: &serde_json::Value) {
632    let session_id = params
633        .get("sessionId")
634        .or_else(|| params.get("session_id"))
635        .and_then(|value| value.as_str())
636        .unwrap_or_default();
637    let call_id = params
638        .get("toolCallId")
639        .or_else(|| params.get("tool_call_id"))
640        .or_else(|| params.get("callId"))
641        .or_else(|| params.get("call_id"))
642        .and_then(|value| value.as_str())
643        .unwrap_or_default();
644    if call_id.is_empty() {
645        return;
646    }
647    let reason = params
648        .get("reason")
649        .and_then(|value| value.as_str())
650        .unwrap_or("host cancelled in-flight tool call")
651        .to_string();
652    let inject_reminder = params
653        .get("injectReminder")
654        .or_else(|| params.get("inject_reminder"))
655        .and_then(|value| value.as_bool())
656        .unwrap_or(true);
657    crate::tool_call_cancellations::cancel(session_id, call_id, reason, inject_reminder);
658}
659
660fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
661    let mode = QueuedUserMessageMode::from_str(
662        params
663            .get("mode")
664            .and_then(|value| value.as_str())
665            .unwrap_or("audit_only"),
666    );
667    let reminder_value = if let Some(reminder) = params.get("reminder") {
668        reminder.clone()
669    } else {
670        let Some(params) = params.as_object() else {
671            return Err(session_remind_shape_error(
672                "session/remind params must be an object",
673            ));
674        };
675        let mut reminder = params.clone();
676        reminder.remove("mode");
677        reminder.remove("sessionId");
678        reminder.remove("session_id");
679        serde_json::Value::Object(reminder)
680    };
681    Ok(QueuedReminder {
682        reminder: session_remind_payload_from_value(&reminder_value)?,
683        mode,
684    })
685}
686
687// Default doesn't apply — new() spawns async tasks requiring a tokio LocalSet.
688#[allow(clippy::new_without_default)]
689impl HostBridge {
690    /// Create a new bridge and spawn the stdin reader task.
691    ///
692    /// Must be called within a tokio LocalSet (uses spawn_local for the
693    /// stdin reader since it's single-threaded).
694    pub fn new() -> Self {
695        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
696            Arc::new(Mutex::new(HashMap::new()));
697        let cancelled = Arc::new(AtomicBool::new(false));
698        let cancel_notify = Arc::new(Notify::new());
699        let queued_transcript_injections = HostBridgeInjectionState::default();
700        let resume_requested = Arc::new(AtomicBool::new(false));
701        let skills_reload_requested = Arc::new(AtomicBool::new(false));
702        let daemon_idle = Arc::new(AtomicBool::new(false));
703
704        // Stdin reader: reads JSON-RPC lines and dispatches responses
705        let pending_clone = pending.clone();
706        let cancelled_clone = cancelled.clone();
707        let cancel_notify_clone = cancel_notify.clone();
708        let queued_clone = queued_transcript_injections.clone();
709        let resume_clone = resume_requested.clone();
710        let skills_reload_clone = skills_reload_requested.clone();
711        tokio::task::spawn_local(async move {
712            let stdin = tokio::io::stdin();
713            let reader = tokio::io::BufReader::new(stdin);
714            let mut lines = reader.lines();
715
716            while let Ok(Some(line)) = lines.next_line().await {
717                let line = line.trim().to_string();
718                if line.is_empty() {
719                    continue;
720                }
721
722                let msg: serde_json::Value = match serde_json::from_str(&line) {
723                    Ok(v) => v,
724                    Err(_) => continue,
725                };
726
727                // Notifications have no id; responses have one.
728                if msg.get("id").is_none() {
729                    if let Some(method) = msg["method"].as_str() {
730                        if method == "cancel" {
731                            cancelled_clone.store(true, Ordering::SeqCst);
732                            cancel_notify_clone.notify_waiters();
733                        } else if method == "agent/resume" {
734                            resume_clone.store(true, Ordering::SeqCst);
735                        } else if method == "skills/update" {
736                            skills_reload_clone.store(true, Ordering::SeqCst);
737                        } else if method == "session/remind" {
738                            let params = &msg["params"];
739                            if let Ok(reminder) = queued_session_remind_from_params(params) {
740                                queued_clone.push_session_reminder(reminder).await;
741                            }
742                        } else if method == "session/cancel_tool_call" {
743                            handle_cancel_tool_call_notification(&msg["params"]);
744                        }
745                    }
746                    continue;
747                }
748
749                if let Some(id) = msg["id"].as_u64() {
750                    let mut pending = pending_clone.lock().await;
751                    if let Some(sender) = pending.remove(&id) {
752                        let _ = sender.send(msg);
753                    }
754                }
755            }
756
757            // stdin closed: drop pending senders to cancel waiters.
758            let mut pending = pending_clone.lock().await;
759            pending.clear();
760        });
761
762        Self {
763            next_id: AtomicU64::new(1),
764            pending,
765            cancelled,
766            cancel_notify,
767            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
768            session_id: std::sync::Mutex::new(String::new()),
769            script_name: std::sync::Mutex::new(String::new()),
770            queued_transcript_injections,
771            resume_requested,
772            skills_reload_requested,
773            daemon_idle,
774            prompt_stop_reason: std::sync::Mutex::new(None),
775            visible_call_states: std::sync::Mutex::new(HashMap::new()),
776            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
777            in_process: None,
778        }
779    }
780
781    /// Create a bridge from pre-existing shared state.
782    ///
783    /// Unlike `new()`, does **not** spawn a stdin reader — the caller is
784    /// responsible for dispatching responses into `pending`.  This is used
785    /// by ACP mode which already has its own stdin reader.
786    pub fn from_parts(
787        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
788        cancelled: Arc<AtomicBool>,
789        stdout_lock: Arc<std::sync::Mutex<()>>,
790        start_id: u64,
791    ) -> Self {
792        Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
793    }
794
795    pub fn from_parts_with_writer(
796        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
797        cancelled: Arc<AtomicBool>,
798        writer: HostBridgeWriter,
799        start_id: u64,
800    ) -> Self {
801        Self::from_parts_with_writer_and_cancel_notify(
802            pending,
803            cancelled,
804            Arc::new(Notify::new()),
805            writer,
806            start_id,
807        )
808    }
809
810    pub fn from_parts_with_writer_and_cancel_notify(
811        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
812        cancelled: Arc<AtomicBool>,
813        cancel_notify: Arc<Notify>,
814        writer: HostBridgeWriter,
815        start_id: u64,
816    ) -> Self {
817        Self::from_parts_with_writer_cancel_notify_and_injection_state(
818            pending,
819            cancelled,
820            cancel_notify,
821            writer,
822            start_id,
823            None,
824        )
825    }
826
827    pub fn from_parts_with_writer_cancel_notify_and_injection_state(
828        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
829        cancelled: Arc<AtomicBool>,
830        cancel_notify: Arc<Notify>,
831        writer: HostBridgeWriter,
832        start_id: u64,
833        injection_state: Option<HostBridgeInjectionState>,
834    ) -> Self {
835        Self {
836            next_id: AtomicU64::new(start_id),
837            pending,
838            cancelled,
839            cancel_notify,
840            writer,
841            session_id: std::sync::Mutex::new(String::new()),
842            script_name: std::sync::Mutex::new(String::new()),
843            queued_transcript_injections: injection_state.unwrap_or_default(),
844            resume_requested: Arc::new(AtomicBool::new(false)),
845            skills_reload_requested: Arc::new(AtomicBool::new(false)),
846            daemon_idle: Arc::new(AtomicBool::new(false)),
847            prompt_stop_reason: std::sync::Mutex::new(None),
848            visible_call_states: std::sync::Mutex::new(HashMap::new()),
849            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
850            in_process: None,
851        }
852    }
853
854    /// Create an in-process host bridge backed by exported functions from a
855    /// Harn module. Used by `harn playground` to avoid JSON-RPC boilerplate.
856    pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
857        let exported_functions = vm.load_module_exports(module_path).await?;
858        Ok(Self {
859            next_id: AtomicU64::new(1),
860            pending: Arc::new(Mutex::new(HashMap::new())),
861            cancelled: Arc::new(AtomicBool::new(false)),
862            cancel_notify: Arc::new(Notify::new()),
863            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
864            session_id: std::sync::Mutex::new(String::new()),
865            script_name: std::sync::Mutex::new(String::new()),
866            queued_transcript_injections: HostBridgeInjectionState::default(),
867            resume_requested: Arc::new(AtomicBool::new(false)),
868            skills_reload_requested: Arc::new(AtomicBool::new(false)),
869            daemon_idle: Arc::new(AtomicBool::new(false)),
870            prompt_stop_reason: std::sync::Mutex::new(None),
871            visible_call_states: std::sync::Mutex::new(HashMap::new()),
872            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
873            in_process: Some(InProcessHost {
874                module_path: module_path.to_path_buf(),
875                exported_functions,
876                vm,
877            }),
878        })
879    }
880
881    /// Set the ACP session ID for session-scoped notifications.
882    pub fn set_session_id(&self, id: &str) {
883        *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
884    }
885
886    /// Set the currently executing script name (without .harn suffix).
887    pub fn set_script_name(&self, name: &str) {
888        *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
889    }
890
891    /// Get the current script name.
892    fn get_script_name(&self) -> String {
893        self.script_name
894            .lock()
895            .unwrap_or_else(|e| e.into_inner())
896            .clone()
897    }
898
899    /// Get the session ID.
900    pub fn get_session_id(&self) -> String {
901        self.session_id
902            .lock()
903            .unwrap_or_else(|e| e.into_inner())
904            .clone()
905    }
906
907    /// Write a complete JSON-RPC line to stdout, serialized through a mutex.
908    fn write_line(&self, line: &str) -> Result<(), VmError> {
909        (self.writer)(line).map_err(VmError::Runtime)
910    }
911
912    /// Send a JSON-RPC request to the host and wait for the response.
913    /// Times out after 5 minutes to prevent deadlocks.
914    pub async fn call(
915        &self,
916        method: &str,
917        params: serde_json::Value,
918    ) -> Result<serde_json::Value, VmError> {
919        if let Some(in_process) = &self.in_process {
920            return in_process.dispatch(method, params).await;
921        }
922
923        if self.is_cancelled() {
924            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
925        }
926
927        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
928        let cancel_wait = self.cancel_notify.notified();
929        tokio::pin!(cancel_wait);
930
931        let request = crate::jsonrpc::request(id, method, params);
932
933        let (tx, rx) = oneshot::channel();
934        {
935            let mut pending = self.pending.lock().await;
936            pending.insert(id, tx);
937        }
938
939        let line = serde_json::to_string(&request)
940            .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
941        if let Err(e) = self.write_line(&line) {
942            let mut pending = self.pending.lock().await;
943            pending.remove(&id);
944            return Err(e);
945        }
946
947        if self.is_cancelled() {
948            let mut pending = self.pending.lock().await;
949            pending.remove(&id);
950            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
951        }
952
953        let response = tokio::select! {
954            result = rx => match result {
955                Ok(msg) => msg,
956                Err(_) => {
957                    // Sender dropped: host closed or stdin reader exited.
958                    return Err(VmError::Runtime(
959                        "Bridge: host closed connection before responding".into(),
960                    ));
961                }
962            },
963            _ = &mut cancel_wait => {
964                let mut pending = self.pending.lock().await;
965                pending.remove(&id);
966                return Err(VmError::Runtime("Bridge: operation cancelled".into()));
967            }
968            _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
969                let mut pending = self.pending.lock().await;
970                pending.remove(&id);
971                return Err(VmError::Runtime(format!(
972                    "Bridge: host did not respond to '{method}' within {}s",
973                    DEFAULT_TIMEOUT.as_secs()
974                )));
975            }
976        };
977
978        if let Some(error) = response.get("error") {
979            let message = error["message"].as_str().unwrap_or("Unknown host error");
980            let code = error["code"].as_i64().unwrap_or(-1);
981            // JSON-RPC -32001 signals the host rejected the tool (not permitted / not in allowlist).
982            if code == -32001 {
983                return Err(VmError::CategorizedError {
984                    message: message.to_string(),
985                    category: ErrorCategory::ToolRejected,
986                });
987            }
988            return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
989        }
990
991        Ok(response["result"].clone())
992    }
993
994    /// Send a JSON-RPC notification to the host (no response expected).
995    /// Serialized through the stdout mutex to prevent interleaving.
996    pub fn notify(&self, method: &str, params: serde_json::Value) {
997        let notification = crate::jsonrpc::notification(method, params);
998        if self.in_process.is_some() {
999            return;
1000        }
1001        if let Ok(line) = serde_json::to_string(&notification) {
1002            let _ = self.write_line(&line);
1003        }
1004    }
1005
1006    /// Check if the host has sent a cancel notification.
1007    pub fn is_cancelled(&self) -> bool {
1008        self.cancelled.load(Ordering::SeqCst)
1009    }
1010
1011    pub fn take_resume_signal(&self) -> bool {
1012        self.resume_requested.swap(false, Ordering::SeqCst)
1013    }
1014
1015    pub fn signal_resume(&self) {
1016        self.resume_requested.store(true, Ordering::SeqCst);
1017    }
1018
1019    pub fn set_daemon_idle(&self, idle: bool) {
1020        self.daemon_idle.store(idle, Ordering::SeqCst);
1021    }
1022
1023    pub fn is_daemon_idle(&self) -> bool {
1024        self.daemon_idle.load(Ordering::SeqCst)
1025    }
1026
1027    /// Record the canonical ACP `stopReason` for the current prompt. The
1028    /// last writer wins, which matches the semantic that an outer
1029    /// `agent_loop` (the one whose result the user observes) always
1030    /// finalizes after any inner loops it spawned.
1031    pub fn set_prompt_stop_reason(&self, reason: &str) {
1032        *self
1033            .prompt_stop_reason
1034            .lock()
1035            .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
1036    }
1037
1038    /// Consume any prompt stop reason recorded during this prompt. The
1039    /// ACP adapter calls this once after the pipeline returns; pipelines
1040    /// that didn't run an `agent_loop` see `None` and the adapter falls
1041    /// back to `end_turn`.
1042    pub fn take_prompt_stop_reason(&self) -> Option<String> {
1043        self.prompt_stop_reason
1044            .lock()
1045            .unwrap_or_else(|e| e.into_inner())
1046            .take()
1047    }
1048
1049    /// Consume any pending `skills/update` signal the host has sent.
1050    /// Returns `true` exactly once per notification, letting callers
1051    /// trigger a layered-discovery rebuild without polling false
1052    /// positives. See issue #73 for the hot-reload contract.
1053    pub fn take_skills_reload_signal(&self) -> bool {
1054        self.skills_reload_requested.swap(false, Ordering::SeqCst)
1055    }
1056
1057    /// Manually mark the skill catalog as stale. Used by tests and by
1058    /// the CLI when an internal event (e.g. `harn install`) should
1059    /// trigger the same rebuild a `skills/update` notification would.
1060    pub fn signal_skills_reload(&self) {
1061        self.skills_reload_requested.store(true, Ordering::SeqCst);
1062    }
1063
1064    /// Call the host's `skills/list` RPC and return the raw JSON array
1065    /// it responded with. Shape:
1066    /// `[{ "id": "...", "name": "...", "description": "...", "source": "..." }, ...]`.
1067    /// The CLI adapter converts each entry into a
1068    /// [`crate::skills::SkillManifestRef`].
1069    pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
1070        let result = self.call("skills/list", serde_json::json!({})).await?;
1071        match result {
1072            serde_json::Value::Array(items) => Ok(items),
1073            serde_json::Value::Object(map) => match map.get("skills") {
1074                Some(serde_json::Value::Array(items)) => Ok(items.clone()),
1075                _ => Err(VmError::Runtime(
1076                    "skills/list: host response must be an array or { skills: [...] }".into(),
1077                )),
1078            },
1079            _ => Err(VmError::Runtime(
1080                "skills/list: unexpected response shape".into(),
1081            )),
1082        }
1083    }
1084
1085    /// Call the host's `host/tools/list` RPC and return normalized tool
1086    /// descriptors. Shape:
1087    /// `[{ "name": "...", "description": "...", "schema": {...}, "deprecated": false }, ...]`.
1088    /// The bridge also accepts `{ "tools": [...] }` and
1089    /// `{ "result": { "tools": [...] } }` wrappers for lenient hosts.
1090    pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
1091        let result = self.call("host/tools/list", serde_json::json!({})).await?;
1092        parse_host_tools_list_response(result)
1093    }
1094
1095    /// Call the host's `skills/fetch` RPC for one skill id. Returns the
1096    /// raw JSON body so the CLI can inspect both the frontmatter fields
1097    /// and the skill markdown body in whatever shape the host sends.
1098    pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
1099        self.call("skills/fetch", serde_json::json!({ "id": id }))
1100            .await
1101    }
1102
1103    pub fn injection_state(&self) -> HostBridgeInjectionState {
1104        self.queued_transcript_injections.clone()
1105    }
1106
1107    pub async fn push_pending_user_message(
1108        &self,
1109        content: String,
1110        transcript_content: serde_json::Value,
1111        mode: &str,
1112    ) -> String {
1113        self.queued_transcript_injections
1114            .push_pending_user_message(content, transcript_content, mode)
1115            .await
1116    }
1117
1118    pub async fn push_queued_user_message(&self, content: String, mode: &str) -> String {
1119        self.push_pending_user_message(content.clone(), serde_json::Value::String(content), mode)
1120            .await
1121    }
1122
1123    pub async fn revoke_pending_user_message(
1124        &self,
1125        message_id: &str,
1126    ) -> PendingUserMessageMutationResult {
1127        self.queued_transcript_injections
1128            .revoke_pending_user_message(message_id)
1129            .await
1130    }
1131
1132    pub async fn replace_pending_user_message(
1133        &self,
1134        message_id: &str,
1135        content: String,
1136        transcript_content: serde_json::Value,
1137    ) -> PendingUserMessageMutationResult {
1138        self.queued_transcript_injections
1139            .replace_pending_user_message(message_id, content, transcript_content)
1140            .await
1141    }
1142
1143    pub async fn push_queued_session_remind_from_params(
1144        &self,
1145        params: &serde_json::Value,
1146    ) -> Result<String, String> {
1147        let reminder = queued_session_remind_from_params(params)?;
1148        let reminder_id = reminder.reminder.id.clone();
1149        self.queued_transcript_injections
1150            .push_session_reminder(reminder)
1151            .await;
1152        Ok(reminder_id)
1153    }
1154
1155    pub async fn take_queued_user_messages(
1156        &self,
1157        include_interrupt_immediate: bool,
1158        include_finish_step: bool,
1159        include_audit_only: bool,
1160    ) -> Vec<QueuedUserMessage> {
1161        let mut state = self.queued_transcript_injections.inner.lock().await;
1162        let mut selected = Vec::new();
1163        let mut retained = VecDeque::new();
1164        while let Some(injection) = state.queue.pop_front() {
1165            let should_take = match injection.mode() {
1166                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1167                QueuedUserMessageMode::FinishStep => include_finish_step,
1168                QueuedUserMessageMode::AuditOnly => include_audit_only,
1169            };
1170            match (should_take, injection) {
1171                (true, QueuedTranscriptInjection::User(message)) => {
1172                    state
1173                        .delivered_user_message_ids
1174                        .insert(message.message_id.clone());
1175                    selected.push(message);
1176                }
1177                (_, injection) => retained.push_back(injection),
1178            }
1179        }
1180        state.queue = retained;
1181        selected
1182    }
1183
1184    pub async fn take_queued_transcript_injections(
1185        &self,
1186        include_interrupt_immediate: bool,
1187        include_finish_step: bool,
1188        include_audit_only: bool,
1189    ) -> Vec<QueuedTranscriptInjection> {
1190        let mut state = self.queued_transcript_injections.inner.lock().await;
1191        let mut selected = Vec::new();
1192        let mut retained = VecDeque::new();
1193        while let Some(injection) = state.queue.pop_front() {
1194            let should_take = match injection.mode() {
1195                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1196                QueuedUserMessageMode::FinishStep => include_finish_step,
1197                QueuedUserMessageMode::AuditOnly => include_audit_only,
1198            };
1199            if should_take {
1200                if let QueuedTranscriptInjection::User(message) = &injection {
1201                    state
1202                        .delivered_user_message_ids
1203                        .insert(message.message_id.clone());
1204                }
1205                selected.push(injection);
1206            } else {
1207                retained.push_back(injection);
1208            }
1209        }
1210        state.queue = retained;
1211        selected
1212    }
1213
1214    pub async fn take_queued_user_messages_for(
1215        &self,
1216        checkpoint: DeliveryCheckpoint,
1217    ) -> Vec<QueuedUserMessage> {
1218        match checkpoint {
1219            DeliveryCheckpoint::InterruptImmediate => {
1220                self.take_queued_user_messages(true, false, false).await
1221            }
1222            DeliveryCheckpoint::AfterCurrentOperation => {
1223                self.take_queued_user_messages(false, true, false).await
1224            }
1225            DeliveryCheckpoint::EndOfInteraction => {
1226                self.take_queued_user_messages(false, false, true).await
1227            }
1228        }
1229    }
1230
1231    pub async fn take_queued_transcript_injections_for(
1232        &self,
1233        checkpoint: DeliveryCheckpoint,
1234    ) -> Vec<QueuedTranscriptInjection> {
1235        match checkpoint {
1236            DeliveryCheckpoint::InterruptImmediate => {
1237                self.take_queued_transcript_injections(true, false, false)
1238                    .await
1239            }
1240            DeliveryCheckpoint::AfterCurrentOperation => {
1241                self.take_queued_transcript_injections(false, true, false)
1242                    .await
1243            }
1244            DeliveryCheckpoint::EndOfInteraction => {
1245                self.take_queued_transcript_injections(false, false, true)
1246                    .await
1247            }
1248        }
1249    }
1250
1251    /// Send an output notification (for log/print in bridge mode).
1252    pub fn send_output(&self, text: &str) {
1253        self.notify("output", serde_json::json!({"text": text}));
1254    }
1255
1256    /// Send a progress notification with optional numeric progress and structured data.
1257    pub fn send_progress(
1258        &self,
1259        phase: &str,
1260        message: &str,
1261        progress: Option<i64>,
1262        total: Option<i64>,
1263        data: Option<serde_json::Value>,
1264    ) {
1265        let mut payload = serde_json::json!({"phase": phase, "message": message});
1266        if let Some(p) = progress {
1267            payload["progress"] = serde_json::json!(p);
1268        }
1269        if let Some(t) = total {
1270            payload["total"] = serde_json::json!(t);
1271        }
1272        if let Some(d) = data {
1273            payload["data"] = d;
1274        }
1275        self.notify("progress", payload);
1276    }
1277
1278    /// Send a structured log notification.
1279    pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1280        let mut payload = serde_json::json!({"level": level, "message": message});
1281        if let Some(f) = fields {
1282            payload["fields"] = f;
1283        }
1284        self.notify("log", payload);
1285    }
1286
1287    /// Send a `session/update` with `call_start` — signals the beginning of
1288    /// an LLM call, tool call, or builtin call for observability.
1289    pub fn send_call_start(
1290        &self,
1291        call_id: &str,
1292        call_type: &str,
1293        name: &str,
1294        metadata: serde_json::Value,
1295    ) {
1296        let session_id = self.get_session_id();
1297        let script = self.get_script_name();
1298        let stream_publicly = metadata
1299            .get("stream_publicly")
1300            .and_then(|value| value.as_bool())
1301            .unwrap_or(true);
1302        self.visible_call_streams
1303            .lock()
1304            .unwrap_or_else(|e| e.into_inner())
1305            .insert(call_id.to_string(), stream_publicly);
1306        self.notify(
1307            "session/update",
1308            serde_json::json!({
1309                "sessionId": session_id,
1310                "update": {
1311                    "sessionUpdate": "call_start",
1312                    "content": {
1313                        "toolCallId": call_id,
1314                        "call_type": call_type,
1315                        "name": name,
1316                        "script": script,
1317                        "metadata": metadata,
1318                    },
1319                },
1320            }),
1321        );
1322    }
1323
1324    /// Send a `session/update` with `call_progress` — a streaming token delta
1325    /// from an in-flight LLM call.
1326    pub fn send_call_progress(
1327        &self,
1328        call_id: &str,
1329        delta: &str,
1330        accumulated_tokens: u64,
1331        user_visible: bool,
1332    ) {
1333        let session_id = self.get_session_id();
1334        let (visible_text, visible_delta) = {
1335            let stream_publicly = self
1336                .visible_call_streams
1337                .lock()
1338                .unwrap_or_else(|e| e.into_inner())
1339                .get(call_id)
1340                .copied()
1341                .unwrap_or(true);
1342            let mut states = self
1343                .visible_call_states
1344                .lock()
1345                .unwrap_or_else(|e| e.into_inner());
1346            let state = states.entry(call_id.to_string()).or_default();
1347            state.push(delta, stream_publicly)
1348        };
1349        self.notify(
1350            "session/update",
1351            serde_json::json!({
1352                "sessionId": session_id,
1353                "update": {
1354                    "sessionUpdate": "call_progress",
1355                    "content": {
1356                        "toolCallId": call_id,
1357                        "delta": delta,
1358                        "accumulated_tokens": accumulated_tokens,
1359                        "visible_text": visible_text,
1360                        "visible_delta": visible_delta,
1361                        "user_visible": user_visible,
1362                    },
1363                },
1364            }),
1365        );
1366    }
1367
1368    /// Send a `session/update` with `call_end` — signals completion of a call.
1369    pub fn send_call_end(
1370        &self,
1371        call_id: &str,
1372        call_type: &str,
1373        name: &str,
1374        duration_ms: u64,
1375        status: &str,
1376        metadata: serde_json::Value,
1377    ) {
1378        let session_id = self.get_session_id();
1379        let script = self.get_script_name();
1380        self.visible_call_states
1381            .lock()
1382            .unwrap_or_else(|e| e.into_inner())
1383            .remove(call_id);
1384        self.visible_call_streams
1385            .lock()
1386            .unwrap_or_else(|e| e.into_inner())
1387            .remove(call_id);
1388        self.notify(
1389            "session/update",
1390            serde_json::json!({
1391                "sessionId": session_id,
1392                "update": {
1393                    "sessionUpdate": "call_end",
1394                    "content": {
1395                        "toolCallId": call_id,
1396                        "call_type": call_type,
1397                        "name": name,
1398                        "script": script,
1399                        "duration_ms": duration_ms,
1400                        "status": status,
1401                        "metadata": metadata,
1402                    },
1403                },
1404            }),
1405        );
1406    }
1407
1408    /// Send a worker lifecycle update for delegated/background execution.
1409    pub fn send_worker_update(
1410        &self,
1411        worker_id: &str,
1412        worker_name: &str,
1413        status: &str,
1414        metadata: serde_json::Value,
1415        audit: Option<&MutationSessionRecord>,
1416    ) {
1417        let session_id = self.get_session_id();
1418        let script = self.get_script_name();
1419        let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1420        let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1421        let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1422        let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1423        let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1424        let lifecycle = serde_json::json!({
1425            "event": status,
1426            "worker_id": worker_id,
1427            "worker_name": worker_name,
1428            "started_at": started_at,
1429            "finished_at": finished_at,
1430        });
1431        self.notify(
1432            "session/update",
1433            serde_json::json!({
1434                "sessionId": session_id,
1435                "update": {
1436                    "sessionUpdate": "worker_update",
1437                    "content": {
1438                        "worker_id": worker_id,
1439                        "worker_name": worker_name,
1440                        "status": status,
1441                        "script": script,
1442                        "started_at": started_at,
1443                        "finished_at": finished_at,
1444                        "snapshot_path": snapshot_path,
1445                        "run_id": run_id,
1446                        "run_path": run_path,
1447                        "lifecycle": lifecycle,
1448                        "audit": audit,
1449                        "metadata": metadata,
1450                    },
1451                },
1452            }),
1453        );
1454    }
1455}
1456
1457/// Convert a serde_json::Value to a VmValue.
1458pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1459    crate::stdlib::json_to_vm_value(val)
1460}
1461
1462fn parse_host_tools_list_response(
1463    result: serde_json::Value,
1464) -> Result<Vec<serde_json::Value>, VmError> {
1465    let tools = match result {
1466        serde_json::Value::Array(items) => items,
1467        serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1468            map.get("result")
1469                .and_then(|value| value.get("tools"))
1470                .cloned()
1471        }) {
1472            Some(serde_json::Value::Array(items)) => items,
1473            _ => {
1474                return Err(VmError::Runtime(
1475                    "host/tools/list: host response must be an array or { tools: [...] }".into(),
1476                ));
1477            }
1478        },
1479        _ => {
1480            return Err(VmError::Runtime(
1481                "host/tools/list: unexpected response shape".into(),
1482            ));
1483        }
1484    };
1485
1486    let mut normalized = Vec::with_capacity(tools.len());
1487    for tool in tools {
1488        let serde_json::Value::Object(map) = tool else {
1489            return Err(VmError::Runtime(
1490                "host/tools/list: every tool must be an object".into(),
1491            ));
1492        };
1493        let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1494            return Err(VmError::Runtime(
1495                "host/tools/list: every tool must include a string `name`".into(),
1496            ));
1497        };
1498        let description = map
1499            .get("description")
1500            .and_then(|value| value.as_str())
1501            .or_else(|| {
1502                map.get("short_description")
1503                    .and_then(|value| value.as_str())
1504            })
1505            .unwrap_or_default();
1506        let schema = map
1507            .get("schema")
1508            .cloned()
1509            .or_else(|| map.get("parameters").cloned())
1510            .or_else(|| map.get("input_schema").cloned())
1511            .unwrap_or(serde_json::Value::Null);
1512        let deprecated = map
1513            .get("deprecated")
1514            .and_then(|value| value.as_bool())
1515            .unwrap_or(false);
1516        normalized.push(serde_json::json!({
1517            "name": name,
1518            "description": description,
1519            "schema": schema,
1520            "deprecated": deprecated,
1521        }));
1522    }
1523    Ok(normalized)
1524}
1525
1526#[cfg(test)]
1527mod tests {
1528    use super::*;
1529
1530    fn test_bridge() -> HostBridge {
1531        HostBridge::from_parts(
1532            Arc::new(Mutex::new(HashMap::new())),
1533            Arc::new(AtomicBool::new(false)),
1534            Arc::new(std::sync::Mutex::new(())),
1535            1,
1536        )
1537    }
1538
1539    fn test_bridge_sharing_injection_state(owner: &HostBridge) -> HostBridge {
1540        HostBridge::from_parts_with_writer_cancel_notify_and_injection_state(
1541            Arc::new(Mutex::new(HashMap::new())),
1542            Arc::new(AtomicBool::new(false)),
1543            Arc::new(Notify::new()),
1544            Arc::new(|_| Ok(())),
1545            100,
1546            Some(owner.injection_state()),
1547        )
1548    }
1549
1550    #[test]
1551    fn test_json_rpc_request_format() {
1552        let request = crate::jsonrpc::request(
1553            1,
1554            "llm_call",
1555            serde_json::json!({
1556                "prompt": "Hello",
1557                "system": "Be helpful",
1558            }),
1559        );
1560        let s = serde_json::to_string(&request).unwrap();
1561        assert!(s.contains("\"jsonrpc\":\"2.0\""));
1562        assert!(s.contains("\"id\":1"));
1563        assert!(s.contains("\"method\":\"llm_call\""));
1564    }
1565
1566    #[test]
1567    fn test_json_rpc_notification_format() {
1568        let notification =
1569            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
1570        let s = serde_json::to_string(&notification).unwrap();
1571        assert!(s.contains("\"method\":\"output\""));
1572        assert!(!s.contains("\"id\""));
1573    }
1574
1575    #[test]
1576    fn test_json_rpc_error_response_parsing() {
1577        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
1578        assert!(response.get("error").is_some());
1579        assert_eq!(
1580            response["error"]["message"].as_str().unwrap(),
1581            "Invalid request"
1582        );
1583    }
1584
1585    #[test]
1586    fn test_json_rpc_success_response_parsing() {
1587        let response = crate::jsonrpc::response(
1588            1,
1589            serde_json::json!({
1590                "text": "Hello world",
1591                "input_tokens": 10,
1592                "output_tokens": 5,
1593            }),
1594        );
1595        assert!(response.get("result").is_some());
1596        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
1597    }
1598
1599    #[test]
1600    fn test_cancelled_flag() {
1601        let cancelled = Arc::new(AtomicBool::new(false));
1602        assert!(!cancelled.load(Ordering::SeqCst));
1603        cancelled.store(true, Ordering::SeqCst);
1604        assert!(cancelled.load(Ordering::SeqCst));
1605    }
1606
1607    #[test]
1608    fn pending_host_calls_return_when_cancellation_arrives() {
1609        let runtime = tokio::runtime::Builder::new_current_thread()
1610            .enable_all()
1611            .build()
1612            .unwrap();
1613        runtime.block_on(async {
1614            let pending = Arc::new(Mutex::new(HashMap::new()));
1615            let cancelled = Arc::new(AtomicBool::new(false));
1616            let bridge = HostBridge::from_parts_with_writer(
1617                pending.clone(),
1618                cancelled.clone(),
1619                Arc::new(|_| Ok(())),
1620                1,
1621            );
1622
1623            let call = bridge.call("host/work", serde_json::json!({}));
1624            tokio::pin!(call);
1625
1626            loop {
1627                tokio::select! {
1628                    result = &mut call => panic!("call completed before cancellation: {result:?}"),
1629                    _ = tokio::task::yield_now() => {}
1630                }
1631                if !pending.lock().await.is_empty() {
1632                    break;
1633                }
1634            }
1635
1636            cancelled.store(true, Ordering::SeqCst);
1637            bridge.cancel_notify.notify_waiters();
1638
1639            let result = tokio::time::timeout(Duration::from_secs(1), call)
1640                .await
1641                .expect("pending call should observe cancellation promptly");
1642            assert!(
1643                matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
1644            );
1645            assert!(pending.lock().await.is_empty());
1646        });
1647    }
1648
1649    #[test]
1650    fn queued_messages_are_filtered_by_delivery_mode() {
1651        let runtime = tokio::runtime::Builder::new_current_thread()
1652            .enable_all()
1653            .build()
1654            .unwrap();
1655        runtime.block_on(async {
1656            let bridge = test_bridge();
1657            bridge
1658                .push_queued_user_message("first".to_string(), "finish_step")
1659                .await;
1660            bridge
1661                .push_queued_user_message("second".to_string(), "audit_only")
1662                .await;
1663
1664            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1665            assert_eq!(finish_step.len(), 1);
1666            assert_eq!(finish_step[0].content, "first");
1667
1668            let audit_only = bridge.take_queued_user_messages(false, false, true).await;
1669            assert_eq!(audit_only.len(), 1);
1670            assert_eq!(audit_only[0].content, "second");
1671        });
1672    }
1673
1674    #[test]
1675    fn pending_user_messages_support_revoke_replace_and_delivery_states() {
1676        let runtime = tokio::runtime::Builder::new_current_thread()
1677            .enable_all()
1678            .build()
1679            .unwrap();
1680        runtime.block_on(async {
1681            let bridge = test_bridge();
1682            let first_id = bridge
1683                .push_pending_user_message(
1684                    "first".to_string(),
1685                    serde_json::json!("first"),
1686                    "audit_only",
1687                )
1688                .await;
1689            let second_id = bridge
1690                .push_pending_user_message(
1691                    "second".to_string(),
1692                    serde_json::json!("second"),
1693                    "audit_only",
1694                )
1695                .await;
1696
1697            assert_eq!(
1698                bridge
1699                    .replace_pending_user_message(
1700                        &second_id,
1701                        "second edited".to_string(),
1702                        serde_json::json!("second edited"),
1703                    )
1704                    .await,
1705                PendingUserMessageMutationResult::Mutated
1706            );
1707            assert_eq!(
1708                bridge.revoke_pending_user_message(&first_id).await,
1709                PendingUserMessageMutationResult::Mutated
1710            );
1711            assert_eq!(
1712                bridge.revoke_pending_user_message(&first_id).await,
1713                PendingUserMessageMutationResult::AlreadyRevoked
1714            );
1715
1716            let delivered = bridge
1717                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1718                .await;
1719            assert_eq!(delivered.len(), 1);
1720            assert_eq!(delivered[0].message_id, second_id);
1721            assert_eq!(delivered[0].content, "second edited");
1722
1723            assert_eq!(
1724                bridge.revoke_pending_user_message(&second_id).await,
1725                PendingUserMessageMutationResult::AlreadyDelivered
1726            );
1727            assert_eq!(
1728                bridge
1729                    .replace_pending_user_message(
1730                        &second_id,
1731                        "too late".to_string(),
1732                        serde_json::json!("too late"),
1733                    )
1734                    .await,
1735                PendingUserMessageMutationResult::AlreadyDelivered
1736            );
1737            assert_eq!(
1738                bridge.revoke_pending_user_message("missing").await,
1739                PendingUserMessageMutationResult::UnknownMessageId
1740            );
1741        });
1742    }
1743
1744    #[test]
1745    fn pending_user_message_replace_preserves_fifo_position_and_mode() {
1746        let runtime = tokio::runtime::Builder::new_current_thread()
1747            .enable_all()
1748            .build()
1749            .unwrap();
1750        runtime.block_on(async {
1751            let bridge = test_bridge();
1752            let first_id = bridge
1753                .push_pending_user_message(
1754                    "first".to_string(),
1755                    serde_json::json!("first"),
1756                    "finish_step",
1757                )
1758                .await;
1759            let second_id = bridge
1760                .push_pending_user_message(
1761                    "second".to_string(),
1762                    serde_json::json!("second"),
1763                    "finish_step",
1764                )
1765                .await;
1766            assert_eq!(
1767                bridge
1768                    .replace_pending_user_message(
1769                        &first_id,
1770                        "first edited".to_string(),
1771                        serde_json::json!("first edited"),
1772                    )
1773                    .await,
1774                PendingUserMessageMutationResult::Mutated
1775            );
1776
1777            let delivered = bridge
1778                .take_queued_user_messages_for(DeliveryCheckpoint::AfterCurrentOperation)
1779                .await;
1780            assert_eq!(
1781                delivered
1782                    .iter()
1783                    .map(|message| (&message.message_id, message.content.as_str(), message.mode))
1784                    .collect::<Vec<_>>(),
1785                vec![
1786                    (&first_id, "first edited", QueuedUserMessageMode::FinishStep,),
1787                    (&second_id, "second", QueuedUserMessageMode::FinishStep),
1788                ]
1789            );
1790        });
1791    }
1792
1793    #[test]
1794    fn pending_user_message_state_survives_bridge_replacement() {
1795        let runtime = tokio::runtime::Builder::new_current_thread()
1796            .enable_all()
1797            .build()
1798            .unwrap();
1799        runtime.block_on(async {
1800            let bridge = test_bridge();
1801            let revoked_id = bridge
1802                .push_pending_user_message(
1803                    "revoke me".to_string(),
1804                    serde_json::json!("revoke me"),
1805                    "audit_only",
1806                )
1807                .await;
1808            let delivered_id = bridge
1809                .push_pending_user_message(
1810                    "deliver me".to_string(),
1811                    serde_json::json!("deliver me"),
1812                    "audit_only",
1813                )
1814                .await;
1815            assert_eq!(
1816                bridge.revoke_pending_user_message(&revoked_id).await,
1817                PendingUserMessageMutationResult::Mutated
1818            );
1819            bridge.cancelled.store(true, Ordering::SeqCst);
1820
1821            let replacement_bridge = test_bridge_sharing_injection_state(&bridge);
1822            assert_eq!(
1823                replacement_bridge
1824                    .revoke_pending_user_message(&revoked_id)
1825                    .await,
1826                PendingUserMessageMutationResult::AlreadyRevoked
1827            );
1828            let delivered = replacement_bridge
1829                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1830                .await;
1831            assert_eq!(delivered.len(), 1);
1832            assert_eq!(delivered[0].message_id, delivered_id);
1833            assert_eq!(delivered[0].content, "deliver me");
1834            assert_eq!(
1835                bridge.revoke_pending_user_message(&delivered_id).await,
1836                PendingUserMessageMutationResult::AlreadyDelivered
1837            );
1838        });
1839    }
1840
1841    #[test]
1842    fn queued_transcript_injections_preserve_user_reminder_separation() {
1843        let runtime = tokio::runtime::Builder::new_current_thread()
1844            .enable_all()
1845            .build()
1846            .unwrap();
1847        runtime.block_on(async {
1848            let bridge = test_bridge();
1849            bridge
1850                .push_queued_user_message("human follow-up".to_string(), "finish_step")
1851                .await;
1852            let reminder_id = bridge
1853                .push_queued_session_remind_from_params(&serde_json::json!({
1854                    "body": "Host-provided ambient context.",
1855                    "tags": ["host"],
1856                    "dedupe_key": "host-context",
1857                    "ttl_turns": 2,
1858                    "mode": "audit_only",
1859                    "_meta": {"harn": {"source": "test"}},
1860                }))
1861                .await
1862                .expect("valid reminder");
1863
1864            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1865            assert_eq!(finish_step.len(), 1);
1866            assert_eq!(finish_step[0].content, "human follow-up");
1867
1868            let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
1869            assert!(no_user_messages.is_empty());
1870
1871            let injections = bridge
1872                .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
1873                .await;
1874            assert_eq!(injections.len(), 1);
1875            let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
1876                panic!("expected queued reminder");
1877            };
1878            assert_eq!(reminder.reminder.id, reminder_id);
1879            assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
1880            assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
1881            assert_eq!(
1882                reminder.reminder.dedupe_key.as_deref(),
1883                Some("host-context")
1884            );
1885            assert_eq!(reminder.reminder.ttl_turns, Some(2));
1886            assert_eq!(
1887                reminder.reminder.source,
1888                crate::llm::helpers::ReminderSource::Bridge
1889            );
1890        });
1891    }
1892
1893    #[test]
1894    fn bridge_remind_modes_honor_delivery_checkpoints() {
1895        let runtime = tokio::runtime::Builder::new_current_thread()
1896            .enable_all()
1897            .build()
1898            .unwrap();
1899        runtime.block_on(async {
1900            let cases = [
1901                (
1902                    "interrupt_immediate",
1903                    DeliveryCheckpoint::InterruptImmediate,
1904                    DeliveryCheckpoint::AfterCurrentOperation,
1905                ),
1906                (
1907                    "finish_step",
1908                    DeliveryCheckpoint::AfterCurrentOperation,
1909                    DeliveryCheckpoint::EndOfInteraction,
1910                ),
1911                (
1912                    "audit_only",
1913                    DeliveryCheckpoint::EndOfInteraction,
1914                    DeliveryCheckpoint::InterruptImmediate,
1915                ),
1916            ];
1917
1918            for (mode, expected_checkpoint, wrong_checkpoint) in cases {
1919                let bridge = test_bridge();
1920                bridge
1921                    .push_queued_session_remind_from_params(&serde_json::json!({
1922                        "body": format!("Reminder for {mode}"),
1923                        "mode": mode,
1924                    }))
1925                    .await
1926                    .expect("valid session/remind payload");
1927
1928                let premature = bridge
1929                    .take_queued_transcript_injections_for(wrong_checkpoint)
1930                    .await;
1931                assert!(
1932                    premature.is_empty(),
1933                    "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
1934                );
1935
1936                let delivered = bridge
1937                    .take_queued_transcript_injections_for(expected_checkpoint)
1938                    .await;
1939                assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
1940                let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
1941                    panic!("expected reminder for {mode}");
1942                };
1943                assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
1944            }
1945        });
1946    }
1947
1948    #[test]
1949    fn session_remind_validation_rejects_user_message_shape() {
1950        let err = queued_session_remind_from_params(&serde_json::json!({
1951            "content": "this is still a user message",
1952            "mode": "interrupt_immediate",
1953        }))
1954        .expect_err("session/remind must require a reminder body");
1955        assert!(err.contains(Code::ReminderInvalidShape.as_str()));
1956        assert!(err.contains("body"));
1957    }
1958
1959    #[test]
1960    fn session_remind_validation_rejects_unknown_options_separately() {
1961        let err = queued_session_remind_from_params(&serde_json::json!({
1962            "body": "valid body",
1963            "unknown_host_field": true,
1964        }))
1965        .expect_err("session/remind must reject unknown top-level fields");
1966        assert!(err.contains(Code::ReminderUnknownOption.as_str()));
1967        assert!(err.contains("unknown_host_field"));
1968    }
1969
1970    #[test]
1971    fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
1972        let err = queued_session_remind_from_params(&serde_json::json!({
1973            "body": "valid body",
1974            "propagate": "workspace",
1975        }))
1976        .expect_err("session/remind must reject unknown propagate values");
1977        assert!(err.contains(Code::ReminderUnknownPropagate.as_str()));
1978        assert!(err.contains("propagate"));
1979    }
1980
1981    #[test]
1982    fn test_json_result_to_vm_value_string() {
1983        let val = serde_json::json!("hello");
1984        let vm_val = json_result_to_vm_value(&val);
1985        assert_eq!(vm_val.display(), "hello");
1986    }
1987
1988    #[test]
1989    fn test_json_result_to_vm_value_dict() {
1990        let val = serde_json::json!({"name": "test", "count": 42});
1991        let vm_val = json_result_to_vm_value(&val);
1992        let VmValue::Dict(d) = &vm_val else {
1993            unreachable!("Expected Dict, got {:?}", vm_val);
1994        };
1995        assert_eq!(d.get("name").unwrap().display(), "test");
1996        assert_eq!(d.get("count").unwrap().display(), "42");
1997    }
1998
1999    #[test]
2000    fn test_json_result_to_vm_value_null() {
2001        let val = serde_json::json!(null);
2002        let vm_val = json_result_to_vm_value(&val);
2003        assert!(matches!(vm_val, VmValue::Nil));
2004    }
2005
2006    #[test]
2007    fn test_json_result_to_vm_value_nested() {
2008        let val = serde_json::json!({
2009            "text": "response",
2010            "tool_calls": [
2011                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
2012            ],
2013            "input_tokens": 100,
2014            "output_tokens": 50,
2015        });
2016        let vm_val = json_result_to_vm_value(&val);
2017        let VmValue::Dict(d) = &vm_val else {
2018            unreachable!("Expected Dict, got {:?}", vm_val);
2019        };
2020        assert_eq!(d.get("text").unwrap().display(), "response");
2021        let VmValue::List(list) = d.get("tool_calls").unwrap() else {
2022            unreachable!("Expected List for tool_calls");
2023        };
2024        assert_eq!(list.len(), 1);
2025    }
2026
2027    #[test]
2028    fn parse_host_tools_list_accepts_object_wrapper() {
2029        let tools = parse_host_tools_list_response(serde_json::json!({
2030            "tools": [
2031                {
2032                    "name": "Read",
2033                    "description": "Read a file",
2034                    "schema": {"type": "object"},
2035                }
2036            ]
2037        }))
2038        .expect("tool list");
2039
2040        assert_eq!(tools.len(), 1);
2041        assert_eq!(tools[0]["name"], "Read");
2042        assert_eq!(tools[0]["deprecated"], false);
2043    }
2044
2045    #[test]
2046    fn parse_host_tools_list_accepts_compat_fields() {
2047        let tools = parse_host_tools_list_response(serde_json::json!({
2048            "result": {
2049                "tools": [
2050                    {
2051                        "name": "Edit",
2052                        "short_description": "Apply an edit",
2053                        "input_schema": {"type": "object"},
2054                        "deprecated": true,
2055                    }
2056                ]
2057            }
2058        }))
2059        .expect("tool list");
2060
2061        assert_eq!(tools[0]["description"], "Apply an edit");
2062        assert_eq!(tools[0]["schema"]["type"], "object");
2063        assert_eq!(tools[0]["deprecated"], true);
2064    }
2065
2066    #[test]
2067    fn parse_host_tools_list_requires_tool_names() {
2068        let err = parse_host_tools_list_response(serde_json::json!({
2069            "tools": [
2070                {"description": "missing name"}
2071            ]
2072        }))
2073        .expect_err("expected error");
2074        assert!(err
2075            .to_string()
2076            .contains("host/tools/list: every tool must include a string `name`"));
2077    }
2078
2079    #[test]
2080    fn test_timeout_duration() {
2081        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
2082    }
2083}