Skip to main content

harn_vm/
bridge.rs

1//! JSON-RPC 2.0 bridge for host communication.
2//!
3//! When `harn run --bridge` is used, the VM delegates builtins (llm_call,
4//! file I/O, tool execution) to a host process over stdin/stdout JSON-RPC.
5//! The host application handles these requests using its own providers.
6
7use std::collections::{BTreeMap, HashMap, VecDeque};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::io::AsyncBufReadExt;
16use tokio::sync::{oneshot, Mutex, Notify};
17
18use harn_parser::diagnostic_codes::Code;
19
20use crate::orchestration::MutationSessionRecord;
21use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
22use crate::visible_text::VisibleTextState;
23use crate::vm::Vm;
24
/// Longest time a single bridge call waits for the host to answer:
/// five minutes, spelled out as minutes × seconds.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5 * 60);
27
/// Shared, thread-safe callback used as the bridge's outgoing transport.
///
/// It is handed one serialized message line at a time and reports failure
/// as a human-readable error string.
pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
29
30fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
31    Arc::new(move |line: &str| {
32        let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
33        let mut stdout = std::io::stdout().lock();
34        stdout
35            .write_all(line.as_bytes())
36            .map_err(|e| format!("Bridge write error: {e}"))?;
37        stdout
38            .write_all(b"\n")
39            .map_err(|e| format!("Bridge write error: {e}"))?;
40        stdout
41            .flush()
42            .map_err(|e| format!("Bridge flush error: {e}"))?;
43        Ok(())
44    })
45}
46
/// A JSON-RPC 2.0 bridge to a host process over stdin/stdout.
///
/// The bridge sends requests to the host on stdout and receives responses
/// on stdin. A background task reads stdin and dispatches responses to
/// waiting callers by request ID. All stdout writes are serialized through
/// a mutex to prevent interleaving.
///
/// When `in_process` is set, calls are answered by an in-process Harn
/// module instead and nothing is sent over the transport.
pub struct HostBridge {
    /// Counter that supplies the JSON-RPC id of the next outgoing request.
    next_id: AtomicU64,
    /// Pending request waiters, keyed by JSON-RPC id.
    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
    /// Whether the host has sent a cancel notification.
    cancelled: Arc<AtomicBool>,
    /// Wakes pending host calls when cancellation arrives.
    cancel_notify: Arc<Notify>,
    /// Transport writer used to send JSON-RPC to the host.
    writer: HostBridgeWriter,
    /// ACP session ID (set in ACP mode for session-scoped notifications).
    session_id: std::sync::Mutex<String>,
    /// Name of the currently executing Harn script (without .harn suffix).
    script_name: std::sync::Mutex<String>,
    /// Transcript injections queued by the host while a run is active.
    queued_transcript_injections: Arc<Mutex<VecDeque<QueuedTranscriptInjection>>>,
    /// Host-triggered resume signal for daemon agents.
    resume_requested: Arc<AtomicBool>,
    /// Host-triggered skill-registry invalidation signal. Set when the
    /// host sends a `skills/update` notification; consumed by the CLI
    /// between runs (watch mode, long-running agents) to rebuild the
    /// layered skill catalog from its current filesystem + host state.
    skills_reload_requested: Arc<AtomicBool>,
    /// Whether the current daemon-mode agent loop is blocked in idle wait.
    daemon_idle: Arc<AtomicBool>,
    /// Canonical ACP `stopReason` recorded by the most recent `agent_loop`
    /// finalize during this prompt. Read once by the ACP adapter when the
    /// pipeline returns and populated by `host_agent_session_finalize`.
    /// Pipelines that don't run an agent loop leave this `None`, in which
    /// case the adapter falls back to `end_turn`.
    prompt_stop_reason: std::sync::Mutex<Option<String>>,
    /// Per-call visible assistant text state for call_progress notifications.
    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
    /// Whether an LLM call's deltas should be exposed to end users while streaming.
    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
    /// Optional in-process host-module backend used by `harn playground`.
    in_process: Option<InProcessHost>,
}
91
/// In-process stand-in for the host: bridge calls are answered by invoking
/// functions exported from a Harn module rather than going over JSON-RPC.
struct InProcessHost {
    /// Path of the host module; shown in the "missing capability" error so
    /// the user knows where a function has to be defined.
    module_path: PathBuf,
    /// Functions exported by the module, keyed by name.
    exported_functions: BTreeMap<String, Rc<VmClosure>>,
    /// VM from which a child VM is derived for every export invocation.
    vm: Vm,
}
97
98impl InProcessHost {
99    async fn dispatch(
100        &self,
101        method: &str,
102        params: serde_json::Value,
103    ) -> Result<serde_json::Value, VmError> {
104        match method {
105            "builtin_call" => {
106                let name = params
107                    .get("name")
108                    .and_then(|value| value.as_str())
109                    .unwrap_or_default();
110                let args = params
111                    .get("args")
112                    .and_then(|value| value.as_array())
113                    .cloned()
114                    .unwrap_or_default()
115                    .into_iter()
116                    .map(|value| json_result_to_vm_value(&value))
117                    .collect::<Vec<_>>();
118                self.invoke_export(name, &args).await
119            }
120            "host/tools/list" => self
121                .invoke_optional_export("host_tools_list", &[])
122                .await
123                .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
124            "session/request_permission" => self.request_permission(params).await,
125            other => Err(VmError::Runtime(format!(
126                "playground host backend does not implement bridge method '{other}'"
127            ))),
128        }
129    }
130
131    async fn invoke_export(
132        &self,
133        name: &str,
134        args: &[VmValue],
135    ) -> Result<serde_json::Value, VmError> {
136        let Some(closure) = self.exported_functions.get(name) else {
137            return Err(VmError::Runtime(format!(
138                "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
139                self.module_path.display()
140            )));
141        };
142
143        let mut vm = self.vm.child_vm_for_host();
144        let result = vm.call_closure_pub(closure, args).await?;
145        Ok(crate::llm::vm_value_to_json(&result))
146    }
147
148    async fn invoke_optional_export(
149        &self,
150        name: &str,
151        args: &[VmValue],
152    ) -> Result<Option<serde_json::Value>, VmError> {
153        if !self.exported_functions.contains_key(name) {
154            return Ok(None);
155        }
156        self.invoke_export(name, args).await.map(Some)
157    }
158
159    async fn request_permission(
160        &self,
161        params: serde_json::Value,
162    ) -> Result<serde_json::Value, VmError> {
163        let Some(closure) = self.exported_functions.get("request_permission") else {
164            return Ok(serde_json::json!({ "granted": true }));
165        };
166
167        let tool_name = params
168            .get("toolCall")
169            .and_then(|tool_call| tool_call.get("toolName"))
170            .and_then(|value| value.as_str())
171            .unwrap_or_default();
172        let tool_args = params
173            .get("toolCall")
174            .and_then(|tool_call| tool_call.get("rawInput"))
175            .map(json_result_to_vm_value)
176            .unwrap_or(VmValue::Nil);
177        let full_payload = json_result_to_vm_value(&params);
178
179        let arg_count = closure.func.params.len();
180        let args = if arg_count >= 3 {
181            vec![
182                VmValue::String(Rc::from(tool_name.to_string())),
183                tool_args,
184                full_payload,
185            ]
186        } else if arg_count == 2 {
187            vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
188        } else if arg_count == 1 {
189            vec![full_payload]
190        } else {
191            Vec::new()
192        };
193
194        let mut vm = self.vm.child_vm_for_host();
195        let result = vm.call_closure_pub(closure, &args).await?;
196        let payload = match result {
197            VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
198            VmValue::String(reason) if !reason.is_empty() => {
199                serde_json::json!({ "granted": false, "reason": reason.to_string() })
200            }
201            other => {
202                let json = crate::llm::vm_value_to_json(&other);
203                if json
204                    .get("granted")
205                    .and_then(|value| value.as_bool())
206                    .is_some()
207                    || json.get("outcome").is_some()
208                {
209                    json
210                } else {
211                    serde_json::json!({ "granted": other.is_truthy() })
212                }
213            }
214        };
215        Ok(payload)
216    }
217}
218
/// Delivery mode attached to a transcript injection queued by the host.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
    /// Parsed from the wire values `"interrupt_immediate"` and `"interrupt"`.
    InterruptImmediate,
    /// Parsed from the wire values `"finish_step"` and
    /// `"after_current_operation"`.
    FinishStep,
    /// Fallback for every other wire value, including a missing mode.
    WaitForCompletion,
}
225
/// Named checkpoints for delivering queued input.
///
/// NOTE(review): not referenced in the visible part of this file; the
/// variants appear to parallel [`QueuedUserMessageMode`] — confirm the
/// mapping against the call sites before relying on it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
    InterruptImmediate,
    AfterCurrentOperation,
    EndOfInteraction,
}
232
233impl QueuedUserMessageMode {
234    fn from_str(value: &str) -> Self {
235        match value {
236            "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
237            "finish_step" | "after_current_operation" => Self::FinishStep,
238            _ => Self::WaitForCompletion,
239        }
240    }
241}
242
/// A user message queued by the host for injection into the transcript.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedUserMessage {
    /// Message text; never empty when built from host parameters.
    pub content: String,
    /// Requested delivery mode.
    pub mode: QueuedUserMessageMode,
}
248
/// A system reminder queued by the host via `session/remind`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedReminder {
    /// The validated reminder payload.
    pub reminder: crate::llm::helpers::SystemReminder,
    /// Requested delivery mode.
    pub mode: QueuedUserMessageMode,
}
254
/// One entry of the bridge's transcript-injection queue.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum QueuedTranscriptInjection {
    /// A queued user message.
    User(QueuedUserMessage),
    /// A queued system reminder.
    Reminder(QueuedReminder),
}
260
261impl QueuedTranscriptInjection {
262    fn mode(&self) -> QueuedUserMessageMode {
263        match self {
264            Self::User(message) => message.mode,
265            Self::Reminder(reminder) => reminder.mode,
266        }
267    }
268}
269
270fn queue_user_message_from_params(params: &serde_json::Value) -> Option<QueuedUserMessage> {
271    let content = params
272        .get("content")
273        .and_then(|v| v.as_str())
274        .unwrap_or("")
275        .to_string();
276    if content.is_empty() {
277        return None;
278    }
279    let mode = QueuedUserMessageMode::from_str(
280        params
281            .get("mode")
282            .and_then(|v| v.as_str())
283            .unwrap_or("wait_for_completion"),
284    );
285    Some(QueuedUserMessage { content, mode })
286}
287
288fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
289    format!(
290        "{}: {}",
291        Code::ReminderUnknownOption.as_str(),
292        message.as_ref()
293    )
294}
295
296fn session_remind_shape_error(message: impl AsRef<str>) -> String {
297    format!(
298        "{}: {}",
299        Code::ReminderInvalidShape.as_str(),
300        message.as_ref()
301    )
302}
303
304fn string_field(
305    map: &serde_json::Map<String, serde_json::Value>,
306    key: &str,
307    required: bool,
308) -> Result<Option<String>, String> {
309    match map.get(key) {
310        None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
311            format!("`{key}` must be a non-empty string"),
312        )),
313        None | Some(serde_json::Value::Null) => Ok(None),
314        Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
315            session_remind_shape_error(format!("`{key}` must be a non-empty string")),
316        ),
317        Some(serde_json::Value::String(value)) => {
318            let trimmed = value.trim();
319            if trimmed.is_empty() {
320                Ok(None)
321            } else {
322                Ok(Some(trimmed.to_string()))
323            }
324        }
325        Some(other) => Err(session_remind_shape_error(format!(
326            "`{key}` must be a string, got {other}"
327        ))),
328    }
329}
330
331fn bool_field(
332    map: &serde_json::Map<String, serde_json::Value>,
333    key: &str,
334) -> Result<Option<bool>, String> {
335    match map.get(key) {
336        None | Some(serde_json::Value::Null) => Ok(None),
337        Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
338        Some(other) => Err(session_remind_shape_error(format!(
339            "`{key}` must be a bool, got {other}"
340        ))),
341    }
342}
343
344fn int_field(
345    map: &serde_json::Map<String, serde_json::Value>,
346    key: &str,
347) -> Result<Option<i64>, String> {
348    match map.get(key) {
349        None | Some(serde_json::Value::Null) => Ok(None),
350        Some(serde_json::Value::Number(value)) => {
351            let Some(value) = value.as_i64() else {
352                return Err(session_remind_shape_error(format!(
353                    "`{key}` must be an integer"
354                )));
355            };
356            Ok(Some(value))
357        }
358        Some(other) => Err(session_remind_shape_error(format!(
359            "`{key}` must be an int, got {other}"
360        ))),
361    }
362}
363
364fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
365    let Some(value) = map.get("tags") else {
366        return Ok(Vec::new());
367    };
368    if value.is_null() {
369        return Ok(Vec::new());
370    }
371    let Some(values) = value.as_array() else {
372        return Err(session_remind_shape_error("`tags` must be a list"));
373    };
374    let mut tags = Vec::new();
375    for value in values {
376        let Some(tag) = value.as_str() else {
377            return Err(session_remind_shape_error(format!(
378                "`tags` entries must be strings, got {value}"
379            )));
380        };
381        let tag = tag.trim();
382        if tag.is_empty() {
383            return Err(session_remind_shape_error(
384                "`tags` entries must be non-empty strings",
385            ));
386        }
387        if !tags.iter().any(|existing| existing == tag) {
388            tags.push(tag.to_string());
389        }
390    }
391    Ok(tags)
392}
393
394fn session_remind_payload_from_value(
395    value: &serde_json::Value,
396) -> Result<crate::llm::helpers::SystemReminder, String> {
397    let Some(map) = value.as_object() else {
398        return Err(session_remind_shape_error(
399            "session/remind payload must be a reminder object",
400        ));
401    };
402    const ALLOWED: &[&str] = &[
403        "_meta",
404        "body",
405        "dedupe_key",
406        "fired_at_turn",
407        "id",
408        "preserve_on_compact",
409        "propagate",
410        "role_hint",
411        "source",
412        "tags",
413        "ttl_turns",
414    ];
415    let unknown = map
416        .keys()
417        .filter(|key| !ALLOWED.contains(&key.as_str()))
418        .map(String::as_str)
419        .collect::<Vec<_>>();
420    if !unknown.is_empty() {
421        if unknown.contains(&"content") {
422            return Err(session_remind_shape_error(
423                "session/remind expects reminder `body`, not user-message `content`",
424            ));
425        }
426        return Err(reminder_unknown_option_error(format!(
427            "unknown reminder option(s): {}",
428            unknown.join(", ")
429        )));
430    }
431    if let Some(meta) = map.get("_meta") {
432        if !meta.is_null() && !meta.is_object() {
433            return Err(session_remind_shape_error("`_meta` must be an object"));
434        }
435    }
436    let ttl_turns = int_field(map, "ttl_turns")?;
437    if let Some(value) = ttl_turns {
438        if value <= 0 {
439            return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
440        }
441    }
442    let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
443    if fired_at_turn < 0 {
444        return Err(session_remind_shape_error(
445            "`fired_at_turn` must be >= 0 when provided",
446        ));
447    }
448    match string_field(map, "source", false)?.as_deref() {
449        None | Some("bridge") => {}
450        Some(_) => {
451            return Err(session_remind_shape_error(
452                "`source` for session/remind must be bridge when provided",
453            ))
454        }
455    }
456    let propagate = match string_field(map, "propagate", false)?.as_deref() {
457        None => crate::llm::helpers::ReminderPropagate::Session,
458        Some("all") => crate::llm::helpers::ReminderPropagate::All,
459        Some("session") => crate::llm::helpers::ReminderPropagate::Session,
460        Some("none") => crate::llm::helpers::ReminderPropagate::None,
461        Some(_) => {
462            return Err(session_remind_shape_error(
463                "`propagate` must be one of all, session, or none",
464            ))
465        }
466    };
467    let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
468        None => crate::llm::helpers::ReminderRoleHint::System,
469        Some("system") => crate::llm::helpers::ReminderRoleHint::System,
470        Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
471        Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
472        Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
473        Some(_) => {
474            return Err(session_remind_shape_error(
475                "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
476            ))
477        }
478    };
479    Ok(crate::llm::helpers::SystemReminder {
480        id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
481        tags: tags_field(map)?,
482        dedupe_key: string_field(map, "dedupe_key", false)?,
483        ttl_turns,
484        preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
485        propagate,
486        role_hint,
487        source: crate::llm::helpers::ReminderSource::Bridge,
488        body: string_field(map, "body", true)?.unwrap_or_default(),
489        fired_at_turn,
490        originating_agent_id: None,
491    })
492}
493
494fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
495    let mode = QueuedUserMessageMode::from_str(
496        params
497            .get("mode")
498            .and_then(|value| value.as_str())
499            .unwrap_or("wait_for_completion"),
500    );
501    let reminder_value = if let Some(reminder) = params.get("reminder") {
502        reminder.clone()
503    } else {
504        let Some(params) = params.as_object() else {
505            return Err(session_remind_shape_error(
506                "session/remind params must be an object",
507            ));
508        };
509        let mut reminder = params.clone();
510        reminder.remove("mode");
511        reminder.remove("sessionId");
512        reminder.remove("session_id");
513        serde_json::Value::Object(reminder)
514    };
515    Ok(QueuedReminder {
516        reminder: session_remind_payload_from_value(&reminder_value)?,
517        mode,
518    })
519}
520
521// Default doesn't apply — new() spawns async tasks requiring a tokio LocalSet.
522#[allow(clippy::new_without_default)]
523impl HostBridge {
524    /// Create a new bridge and spawn the stdin reader task.
525    ///
526    /// Must be called within a tokio LocalSet (uses spawn_local for the
527    /// stdin reader since it's single-threaded).
528    pub fn new() -> Self {
529        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
530            Arc::new(Mutex::new(HashMap::new()));
531        let cancelled = Arc::new(AtomicBool::new(false));
532        let cancel_notify = Arc::new(Notify::new());
533        let queued_transcript_injections: Arc<Mutex<VecDeque<QueuedTranscriptInjection>>> =
534            Arc::new(Mutex::new(VecDeque::new()));
535        let resume_requested = Arc::new(AtomicBool::new(false));
536        let skills_reload_requested = Arc::new(AtomicBool::new(false));
537        let daemon_idle = Arc::new(AtomicBool::new(false));
538
539        // Stdin reader: reads JSON-RPC lines and dispatches responses
540        let pending_clone = pending.clone();
541        let cancelled_clone = cancelled.clone();
542        let cancel_notify_clone = cancel_notify.clone();
543        let queued_clone = queued_transcript_injections.clone();
544        let resume_clone = resume_requested.clone();
545        let skills_reload_clone = skills_reload_requested.clone();
546        tokio::task::spawn_local(async move {
547            let stdin = tokio::io::stdin();
548            let reader = tokio::io::BufReader::new(stdin);
549            let mut lines = reader.lines();
550
551            while let Ok(Some(line)) = lines.next_line().await {
552                let line = line.trim().to_string();
553                if line.is_empty() {
554                    continue;
555                }
556
557                let msg: serde_json::Value = match serde_json::from_str(&line) {
558                    Ok(v) => v,
559                    Err(_) => continue,
560                };
561
562                // Notifications have no id; responses have one.
563                if msg.get("id").is_none() {
564                    if let Some(method) = msg["method"].as_str() {
565                        if method == "cancel" {
566                            cancelled_clone.store(true, Ordering::SeqCst);
567                            cancel_notify_clone.notify_waiters();
568                        } else if method == "agent/resume" {
569                            resume_clone.store(true, Ordering::SeqCst);
570                        } else if method == "skills/update" {
571                            skills_reload_clone.store(true, Ordering::SeqCst);
572                        } else if method == "user_message"
573                            || method == "session/input"
574                            || method == "agent/user_message"
575                        {
576                            let params = &msg["params"];
577                            if let Some(message) = queue_user_message_from_params(params) {
578                                queued_clone
579                                    .lock()
580                                    .await
581                                    .push_back(QueuedTranscriptInjection::User(message));
582                            }
583                        } else if method == "session/remind" {
584                            let params = &msg["params"];
585                            if let Ok(reminder) = queued_session_remind_from_params(params) {
586                                queued_clone
587                                    .lock()
588                                    .await
589                                    .push_back(QueuedTranscriptInjection::Reminder(reminder));
590                            }
591                        }
592                    }
593                    continue;
594                }
595
596                if let Some(id) = msg["id"].as_u64() {
597                    let mut pending = pending_clone.lock().await;
598                    if let Some(sender) = pending.remove(&id) {
599                        let _ = sender.send(msg);
600                    }
601                }
602            }
603
604            // stdin closed: drop pending senders to cancel waiters.
605            let mut pending = pending_clone.lock().await;
606            pending.clear();
607        });
608
609        Self {
610            next_id: AtomicU64::new(1),
611            pending,
612            cancelled,
613            cancel_notify,
614            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
615            session_id: std::sync::Mutex::new(String::new()),
616            script_name: std::sync::Mutex::new(String::new()),
617            queued_transcript_injections,
618            resume_requested,
619            skills_reload_requested,
620            daemon_idle,
621            prompt_stop_reason: std::sync::Mutex::new(None),
622            visible_call_states: std::sync::Mutex::new(HashMap::new()),
623            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
624            in_process: None,
625        }
626    }
627
628    /// Create a bridge from pre-existing shared state.
629    ///
630    /// Unlike `new()`, does **not** spawn a stdin reader — the caller is
631    /// responsible for dispatching responses into `pending`.  This is used
632    /// by ACP mode which already has its own stdin reader.
633    pub fn from_parts(
634        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
635        cancelled: Arc<AtomicBool>,
636        stdout_lock: Arc<std::sync::Mutex<()>>,
637        start_id: u64,
638    ) -> Self {
639        Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
640    }
641
642    pub fn from_parts_with_writer(
643        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
644        cancelled: Arc<AtomicBool>,
645        writer: HostBridgeWriter,
646        start_id: u64,
647    ) -> Self {
648        Self::from_parts_with_writer_and_cancel_notify(
649            pending,
650            cancelled,
651            Arc::new(Notify::new()),
652            writer,
653            start_id,
654        )
655    }
656
657    pub fn from_parts_with_writer_and_cancel_notify(
658        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
659        cancelled: Arc<AtomicBool>,
660        cancel_notify: Arc<Notify>,
661        writer: HostBridgeWriter,
662        start_id: u64,
663    ) -> Self {
664        Self {
665            next_id: AtomicU64::new(start_id),
666            pending,
667            cancelled,
668            cancel_notify,
669            writer,
670            session_id: std::sync::Mutex::new(String::new()),
671            script_name: std::sync::Mutex::new(String::new()),
672            queued_transcript_injections: Arc::new(Mutex::new(VecDeque::new())),
673            resume_requested: Arc::new(AtomicBool::new(false)),
674            skills_reload_requested: Arc::new(AtomicBool::new(false)),
675            daemon_idle: Arc::new(AtomicBool::new(false)),
676            prompt_stop_reason: std::sync::Mutex::new(None),
677            visible_call_states: std::sync::Mutex::new(HashMap::new()),
678            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
679            in_process: None,
680        }
681    }
682
683    /// Create an in-process host bridge backed by exported functions from a
684    /// Harn module. Used by `harn playground` to avoid JSON-RPC boilerplate.
685    pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
686        let exported_functions = vm.load_module_exports(module_path).await?;
687        Ok(Self {
688            next_id: AtomicU64::new(1),
689            pending: Arc::new(Mutex::new(HashMap::new())),
690            cancelled: Arc::new(AtomicBool::new(false)),
691            cancel_notify: Arc::new(Notify::new()),
692            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
693            session_id: std::sync::Mutex::new(String::new()),
694            script_name: std::sync::Mutex::new(String::new()),
695            queued_transcript_injections: Arc::new(Mutex::new(VecDeque::new())),
696            resume_requested: Arc::new(AtomicBool::new(false)),
697            skills_reload_requested: Arc::new(AtomicBool::new(false)),
698            daemon_idle: Arc::new(AtomicBool::new(false)),
699            prompt_stop_reason: std::sync::Mutex::new(None),
700            visible_call_states: std::sync::Mutex::new(HashMap::new()),
701            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
702            in_process: Some(InProcessHost {
703                module_path: module_path.to_path_buf(),
704                exported_functions,
705                vm,
706            }),
707        })
708    }
709
710    /// Set the ACP session ID for session-scoped notifications.
711    pub fn set_session_id(&self, id: &str) {
712        *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
713    }
714
715    /// Set the currently executing script name (without .harn suffix).
716    pub fn set_script_name(&self, name: &str) {
717        *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
718    }
719
720    /// Get the current script name.
721    fn get_script_name(&self) -> String {
722        self.script_name
723            .lock()
724            .unwrap_or_else(|e| e.into_inner())
725            .clone()
726    }
727
728    /// Get the session ID.
729    pub fn get_session_id(&self) -> String {
730        self.session_id
731            .lock()
732            .unwrap_or_else(|e| e.into_inner())
733            .clone()
734    }
735
736    /// Write a complete JSON-RPC line to stdout, serialized through a mutex.
737    fn write_line(&self, line: &str) -> Result<(), VmError> {
738        (self.writer)(line).map_err(VmError::Runtime)
739    }
740
741    /// Send a JSON-RPC request to the host and wait for the response.
742    /// Times out after 5 minutes to prevent deadlocks.
743    pub async fn call(
744        &self,
745        method: &str,
746        params: serde_json::Value,
747    ) -> Result<serde_json::Value, VmError> {
748        if let Some(in_process) = &self.in_process {
749            return in_process.dispatch(method, params).await;
750        }
751
752        if self.is_cancelled() {
753            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
754        }
755
756        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
757        let cancel_wait = self.cancel_notify.notified();
758        tokio::pin!(cancel_wait);
759
760        let request = crate::jsonrpc::request(id, method, params);
761
762        let (tx, rx) = oneshot::channel();
763        {
764            let mut pending = self.pending.lock().await;
765            pending.insert(id, tx);
766        }
767
768        let line = serde_json::to_string(&request)
769            .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
770        if let Err(e) = self.write_line(&line) {
771            let mut pending = self.pending.lock().await;
772            pending.remove(&id);
773            return Err(e);
774        }
775
776        if self.is_cancelled() {
777            let mut pending = self.pending.lock().await;
778            pending.remove(&id);
779            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
780        }
781
782        let response = tokio::select! {
783            result = rx => match result {
784                Ok(msg) => msg,
785                Err(_) => {
786                    // Sender dropped: host closed or stdin reader exited.
787                    return Err(VmError::Runtime(
788                        "Bridge: host closed connection before responding".into(),
789                    ));
790                }
791            },
792            _ = &mut cancel_wait => {
793                let mut pending = self.pending.lock().await;
794                pending.remove(&id);
795                return Err(VmError::Runtime("Bridge: operation cancelled".into()));
796            }
797            _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
798                let mut pending = self.pending.lock().await;
799                pending.remove(&id);
800                return Err(VmError::Runtime(format!(
801                    "Bridge: host did not respond to '{method}' within {}s",
802                    DEFAULT_TIMEOUT.as_secs()
803                )));
804            }
805        };
806
807        if let Some(error) = response.get("error") {
808            let message = error["message"].as_str().unwrap_or("Unknown host error");
809            let code = error["code"].as_i64().unwrap_or(-1);
810            // JSON-RPC -32001 signals the host rejected the tool (not permitted / not in allowlist).
811            if code == -32001 {
812                return Err(VmError::CategorizedError {
813                    message: message.to_string(),
814                    category: ErrorCategory::ToolRejected,
815                });
816            }
817            return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
818        }
819
820        Ok(response["result"].clone())
821    }
822
823    /// Send a JSON-RPC notification to the host (no response expected).
824    /// Serialized through the stdout mutex to prevent interleaving.
825    pub fn notify(&self, method: &str, params: serde_json::Value) {
826        let notification = crate::jsonrpc::notification(method, params);
827        if self.in_process.is_some() {
828            return;
829        }
830        if let Ok(line) = serde_json::to_string(&notification) {
831            let _ = self.write_line(&line);
832        }
833    }
834
835    /// Check if the host has sent a cancel notification.
836    pub fn is_cancelled(&self) -> bool {
837        self.cancelled.load(Ordering::SeqCst)
838    }
839
840    pub fn take_resume_signal(&self) -> bool {
841        self.resume_requested.swap(false, Ordering::SeqCst)
842    }
843
844    pub fn signal_resume(&self) {
845        self.resume_requested.store(true, Ordering::SeqCst);
846    }
847
848    pub fn set_daemon_idle(&self, idle: bool) {
849        self.daemon_idle.store(idle, Ordering::SeqCst);
850    }
851
852    pub fn is_daemon_idle(&self) -> bool {
853        self.daemon_idle.load(Ordering::SeqCst)
854    }
855
856    /// Record the canonical ACP `stopReason` for the current prompt. The
857    /// last writer wins, which matches the semantic that an outer
858    /// `agent_loop` (the one whose result the user observes) always
859    /// finalizes after any inner loops it spawned.
860    pub fn set_prompt_stop_reason(&self, reason: &str) {
861        *self
862            .prompt_stop_reason
863            .lock()
864            .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
865    }
866
867    /// Consume any prompt stop reason recorded during this prompt. The
868    /// ACP adapter calls this once after the pipeline returns; pipelines
869    /// that didn't run an `agent_loop` see `None` and the adapter falls
870    /// back to `end_turn`.
871    pub fn take_prompt_stop_reason(&self) -> Option<String> {
872        self.prompt_stop_reason
873            .lock()
874            .unwrap_or_else(|e| e.into_inner())
875            .take()
876    }
877
878    /// Consume any pending `skills/update` signal the host has sent.
879    /// Returns `true` exactly once per notification, letting callers
880    /// trigger a layered-discovery rebuild without polling false
881    /// positives. See issue #73 for the hot-reload contract.
882    pub fn take_skills_reload_signal(&self) -> bool {
883        self.skills_reload_requested.swap(false, Ordering::SeqCst)
884    }
885
886    /// Manually mark the skill catalog as stale. Used by tests and by
887    /// the CLI when an internal event (e.g. `harn install`) should
888    /// trigger the same rebuild a `skills/update` notification would.
889    pub fn signal_skills_reload(&self) {
890        self.skills_reload_requested.store(true, Ordering::SeqCst);
891    }
892
893    /// Call the host's `skills/list` RPC and return the raw JSON array
894    /// it responded with. Shape:
895    /// `[{ "id": "...", "name": "...", "description": "...", "source": "..." }, ...]`.
896    /// The CLI adapter converts each entry into a
897    /// [`crate::skills::SkillManifestRef`].
898    pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
899        let result = self.call("skills/list", serde_json::json!({})).await?;
900        match result {
901            serde_json::Value::Array(items) => Ok(items),
902            serde_json::Value::Object(map) => match map.get("skills") {
903                Some(serde_json::Value::Array(items)) => Ok(items.clone()),
904                _ => Err(VmError::Runtime(
905                    "skills/list: host response must be an array or { skills: [...] }".into(),
906                )),
907            },
908            _ => Err(VmError::Runtime(
909                "skills/list: unexpected response shape".into(),
910            )),
911        }
912    }
913
914    /// Call the host's `host/tools/list` RPC and return normalized tool
915    /// descriptors. Shape:
916    /// `[{ "name": "...", "description": "...", "schema": {...}, "deprecated": false }, ...]`.
917    /// The bridge also accepts `{ "tools": [...] }` and
918    /// `{ "result": { "tools": [...] } }` wrappers for lenient hosts.
919    pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
920        let result = self.call("host/tools/list", serde_json::json!({})).await?;
921        parse_host_tools_list_response(result)
922    }
923
924    /// Call the host's `skills/fetch` RPC for one skill id. Returns the
925    /// raw JSON body so the CLI can inspect both the frontmatter fields
926    /// and the skill markdown body in whatever shape the host sends.
927    pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
928        self.call("skills/fetch", serde_json::json!({ "id": id }))
929            .await
930    }
931
932    pub async fn push_queued_user_message(&self, content: String, mode: &str) {
933        self.queued_transcript_injections
934            .lock()
935            .await
936            .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
937                content,
938                mode: QueuedUserMessageMode::from_str(mode),
939            }));
940    }
941
942    pub async fn push_queued_session_remind_from_params(
943        &self,
944        params: &serde_json::Value,
945    ) -> Result<String, String> {
946        let reminder = queued_session_remind_from_params(params)?;
947        let reminder_id = reminder.reminder.id.clone();
948        self.queued_transcript_injections
949            .lock()
950            .await
951            .push_back(QueuedTranscriptInjection::Reminder(reminder));
952        Ok(reminder_id)
953    }
954
955    pub async fn take_queued_user_messages(
956        &self,
957        include_interrupt_immediate: bool,
958        include_finish_step: bool,
959        include_wait_for_completion: bool,
960    ) -> Vec<QueuedUserMessage> {
961        let mut queue = self.queued_transcript_injections.lock().await;
962        let mut selected = Vec::new();
963        let mut retained = VecDeque::new();
964        while let Some(injection) = queue.pop_front() {
965            let should_take = match injection.mode() {
966                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
967                QueuedUserMessageMode::FinishStep => include_finish_step,
968                QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
969            };
970            match (should_take, injection) {
971                (true, QueuedTranscriptInjection::User(message)) => selected.push(message),
972                (_, injection) => retained.push_back(injection),
973            }
974        }
975        *queue = retained;
976        selected
977    }
978
979    pub async fn take_queued_transcript_injections(
980        &self,
981        include_interrupt_immediate: bool,
982        include_finish_step: bool,
983        include_wait_for_completion: bool,
984    ) -> Vec<QueuedTranscriptInjection> {
985        let mut queue = self.queued_transcript_injections.lock().await;
986        let mut selected = Vec::new();
987        let mut retained = VecDeque::new();
988        while let Some(injection) = queue.pop_front() {
989            let should_take = match injection.mode() {
990                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
991                QueuedUserMessageMode::FinishStep => include_finish_step,
992                QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
993            };
994            if should_take {
995                selected.push(injection);
996            } else {
997                retained.push_back(injection);
998            }
999        }
1000        *queue = retained;
1001        selected
1002    }
1003
1004    pub async fn take_queued_user_messages_for(
1005        &self,
1006        checkpoint: DeliveryCheckpoint,
1007    ) -> Vec<QueuedUserMessage> {
1008        match checkpoint {
1009            DeliveryCheckpoint::InterruptImmediate => {
1010                self.take_queued_user_messages(true, false, false).await
1011            }
1012            DeliveryCheckpoint::AfterCurrentOperation => {
1013                self.take_queued_user_messages(false, true, false).await
1014            }
1015            DeliveryCheckpoint::EndOfInteraction => {
1016                self.take_queued_user_messages(false, false, true).await
1017            }
1018        }
1019    }
1020
1021    pub async fn take_queued_transcript_injections_for(
1022        &self,
1023        checkpoint: DeliveryCheckpoint,
1024    ) -> Vec<QueuedTranscriptInjection> {
1025        match checkpoint {
1026            DeliveryCheckpoint::InterruptImmediate => {
1027                self.take_queued_transcript_injections(true, false, false)
1028                    .await
1029            }
1030            DeliveryCheckpoint::AfterCurrentOperation => {
1031                self.take_queued_transcript_injections(false, true, false)
1032                    .await
1033            }
1034            DeliveryCheckpoint::EndOfInteraction => {
1035                self.take_queued_transcript_injections(false, false, true)
1036                    .await
1037            }
1038        }
1039    }
1040
1041    /// Send an output notification (for log/print in bridge mode).
1042    pub fn send_output(&self, text: &str) {
1043        self.notify("output", serde_json::json!({"text": text}));
1044    }
1045
1046    /// Send a progress notification with optional numeric progress and structured data.
1047    pub fn send_progress(
1048        &self,
1049        phase: &str,
1050        message: &str,
1051        progress: Option<i64>,
1052        total: Option<i64>,
1053        data: Option<serde_json::Value>,
1054    ) {
1055        let mut payload = serde_json::json!({"phase": phase, "message": message});
1056        if let Some(p) = progress {
1057            payload["progress"] = serde_json::json!(p);
1058        }
1059        if let Some(t) = total {
1060            payload["total"] = serde_json::json!(t);
1061        }
1062        if let Some(d) = data {
1063            payload["data"] = d;
1064        }
1065        self.notify("progress", payload);
1066    }
1067
1068    /// Send a structured log notification.
1069    pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1070        let mut payload = serde_json::json!({"level": level, "message": message});
1071        if let Some(f) = fields {
1072            payload["fields"] = f;
1073        }
1074        self.notify("log", payload);
1075    }
1076
1077    /// Send a `session/update` with `call_start` — signals the beginning of
1078    /// an LLM call, tool call, or builtin call for observability.
1079    pub fn send_call_start(
1080        &self,
1081        call_id: &str,
1082        call_type: &str,
1083        name: &str,
1084        metadata: serde_json::Value,
1085    ) {
1086        let session_id = self.get_session_id();
1087        let script = self.get_script_name();
1088        let stream_publicly = metadata
1089            .get("stream_publicly")
1090            .and_then(|value| value.as_bool())
1091            .unwrap_or(true);
1092        self.visible_call_streams
1093            .lock()
1094            .unwrap_or_else(|e| e.into_inner())
1095            .insert(call_id.to_string(), stream_publicly);
1096        self.notify(
1097            "session/update",
1098            serde_json::json!({
1099                "sessionId": session_id,
1100                "update": {
1101                    "sessionUpdate": "call_start",
1102                    "content": {
1103                        "toolCallId": call_id,
1104                        "call_type": call_type,
1105                        "name": name,
1106                        "script": script,
1107                        "metadata": metadata,
1108                    },
1109                },
1110            }),
1111        );
1112    }
1113
1114    /// Send a `session/update` with `call_progress` — a streaming token delta
1115    /// from an in-flight LLM call.
1116    pub fn send_call_progress(
1117        &self,
1118        call_id: &str,
1119        delta: &str,
1120        accumulated_tokens: u64,
1121        user_visible: bool,
1122    ) {
1123        let session_id = self.get_session_id();
1124        let (visible_text, visible_delta) = {
1125            let stream_publicly = self
1126                .visible_call_streams
1127                .lock()
1128                .unwrap_or_else(|e| e.into_inner())
1129                .get(call_id)
1130                .copied()
1131                .unwrap_or(true);
1132            let mut states = self
1133                .visible_call_states
1134                .lock()
1135                .unwrap_or_else(|e| e.into_inner());
1136            let state = states.entry(call_id.to_string()).or_default();
1137            state.push(delta, stream_publicly)
1138        };
1139        self.notify(
1140            "session/update",
1141            serde_json::json!({
1142                "sessionId": session_id,
1143                "update": {
1144                    "sessionUpdate": "call_progress",
1145                    "content": {
1146                        "toolCallId": call_id,
1147                        "delta": delta,
1148                        "accumulated_tokens": accumulated_tokens,
1149                        "visible_text": visible_text,
1150                        "visible_delta": visible_delta,
1151                        "user_visible": user_visible,
1152                    },
1153                },
1154            }),
1155        );
1156    }
1157
1158    /// Send a `session/update` with `call_end` — signals completion of a call.
1159    pub fn send_call_end(
1160        &self,
1161        call_id: &str,
1162        call_type: &str,
1163        name: &str,
1164        duration_ms: u64,
1165        status: &str,
1166        metadata: serde_json::Value,
1167    ) {
1168        let session_id = self.get_session_id();
1169        let script = self.get_script_name();
1170        self.visible_call_states
1171            .lock()
1172            .unwrap_or_else(|e| e.into_inner())
1173            .remove(call_id);
1174        self.visible_call_streams
1175            .lock()
1176            .unwrap_or_else(|e| e.into_inner())
1177            .remove(call_id);
1178        self.notify(
1179            "session/update",
1180            serde_json::json!({
1181                "sessionId": session_id,
1182                "update": {
1183                    "sessionUpdate": "call_end",
1184                    "content": {
1185                        "toolCallId": call_id,
1186                        "call_type": call_type,
1187                        "name": name,
1188                        "script": script,
1189                        "duration_ms": duration_ms,
1190                        "status": status,
1191                        "metadata": metadata,
1192                    },
1193                },
1194            }),
1195        );
1196    }
1197
1198    /// Send a worker lifecycle update for delegated/background execution.
1199    pub fn send_worker_update(
1200        &self,
1201        worker_id: &str,
1202        worker_name: &str,
1203        status: &str,
1204        metadata: serde_json::Value,
1205        audit: Option<&MutationSessionRecord>,
1206    ) {
1207        let session_id = self.get_session_id();
1208        let script = self.get_script_name();
1209        let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1210        let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1211        let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1212        let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1213        let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1214        let lifecycle = serde_json::json!({
1215            "event": status,
1216            "worker_id": worker_id,
1217            "worker_name": worker_name,
1218            "started_at": started_at,
1219            "finished_at": finished_at,
1220        });
1221        self.notify(
1222            "session/update",
1223            serde_json::json!({
1224                "sessionId": session_id,
1225                "update": {
1226                    "sessionUpdate": "worker_update",
1227                    "content": {
1228                        "worker_id": worker_id,
1229                        "worker_name": worker_name,
1230                        "status": status,
1231                        "script": script,
1232                        "started_at": started_at,
1233                        "finished_at": finished_at,
1234                        "snapshot_path": snapshot_path,
1235                        "run_id": run_id,
1236                        "run_path": run_path,
1237                        "lifecycle": lifecycle,
1238                        "audit": audit,
1239                        "metadata": metadata,
1240                    },
1241                },
1242            }),
1243        );
1244    }
1245}
1246
1247/// Convert a serde_json::Value to a VmValue.
1248pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1249    crate::stdlib::json_to_vm_value(val)
1250}
1251
1252fn parse_host_tools_list_response(
1253    result: serde_json::Value,
1254) -> Result<Vec<serde_json::Value>, VmError> {
1255    let tools = match result {
1256        serde_json::Value::Array(items) => items,
1257        serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1258            map.get("result")
1259                .and_then(|value| value.get("tools"))
1260                .cloned()
1261        }) {
1262            Some(serde_json::Value::Array(items)) => items,
1263            _ => {
1264                return Err(VmError::Runtime(
1265                    "host/tools/list: host response must be an array or { tools: [...] }".into(),
1266                ));
1267            }
1268        },
1269        _ => {
1270            return Err(VmError::Runtime(
1271                "host/tools/list: unexpected response shape".into(),
1272            ));
1273        }
1274    };
1275
1276    let mut normalized = Vec::with_capacity(tools.len());
1277    for tool in tools {
1278        let serde_json::Value::Object(map) = tool else {
1279            return Err(VmError::Runtime(
1280                "host/tools/list: every tool must be an object".into(),
1281            ));
1282        };
1283        let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1284            return Err(VmError::Runtime(
1285                "host/tools/list: every tool must include a string `name`".into(),
1286            ));
1287        };
1288        let description = map
1289            .get("description")
1290            .and_then(|value| value.as_str())
1291            .or_else(|| {
1292                map.get("short_description")
1293                    .and_then(|value| value.as_str())
1294            })
1295            .unwrap_or_default();
1296        let schema = map
1297            .get("schema")
1298            .cloned()
1299            .or_else(|| map.get("parameters").cloned())
1300            .or_else(|| map.get("input_schema").cloned())
1301            .unwrap_or(serde_json::Value::Null);
1302        let deprecated = map
1303            .get("deprecated")
1304            .and_then(|value| value.as_bool())
1305            .unwrap_or(false);
1306        normalized.push(serde_json::json!({
1307            "name": name,
1308            "description": description,
1309            "schema": schema,
1310            "deprecated": deprecated,
1311        }));
1312    }
1313    Ok(normalized)
1314}
1315
1316#[cfg(test)]
1317mod tests {
1318    use super::*;
1319
1320    fn test_bridge() -> HostBridge {
1321        HostBridge::from_parts(
1322            Arc::new(Mutex::new(HashMap::new())),
1323            Arc::new(AtomicBool::new(false)),
1324            Arc::new(std::sync::Mutex::new(())),
1325            1,
1326        )
1327    }
1328
1329    #[test]
1330    fn test_json_rpc_request_format() {
1331        let request = crate::jsonrpc::request(
1332            1,
1333            "llm_call",
1334            serde_json::json!({
1335                "prompt": "Hello",
1336                "system": "Be helpful",
1337            }),
1338        );
1339        let s = serde_json::to_string(&request).unwrap();
1340        assert!(s.contains("\"jsonrpc\":\"2.0\""));
1341        assert!(s.contains("\"id\":1"));
1342        assert!(s.contains("\"method\":\"llm_call\""));
1343    }
1344
1345    #[test]
1346    fn test_json_rpc_notification_format() {
1347        let notification =
1348            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
1349        let s = serde_json::to_string(&notification).unwrap();
1350        assert!(s.contains("\"method\":\"output\""));
1351        assert!(!s.contains("\"id\""));
1352    }
1353
1354    #[test]
1355    fn test_json_rpc_error_response_parsing() {
1356        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
1357        assert!(response.get("error").is_some());
1358        assert_eq!(
1359            response["error"]["message"].as_str().unwrap(),
1360            "Invalid request"
1361        );
1362    }
1363
1364    #[test]
1365    fn test_json_rpc_success_response_parsing() {
1366        let response = crate::jsonrpc::response(
1367            1,
1368            serde_json::json!({
1369                "text": "Hello world",
1370                "input_tokens": 10,
1371                "output_tokens": 5,
1372            }),
1373        );
1374        assert!(response.get("result").is_some());
1375        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
1376    }
1377
1378    #[test]
1379    fn test_cancelled_flag() {
1380        let cancelled = Arc::new(AtomicBool::new(false));
1381        assert!(!cancelled.load(Ordering::SeqCst));
1382        cancelled.store(true, Ordering::SeqCst);
1383        assert!(cancelled.load(Ordering::SeqCst));
1384    }
1385
1386    #[test]
1387    fn pending_host_calls_return_when_cancellation_arrives() {
1388        let runtime = tokio::runtime::Builder::new_current_thread()
1389            .enable_all()
1390            .build()
1391            .unwrap();
1392        runtime.block_on(async {
1393            let pending = Arc::new(Mutex::new(HashMap::new()));
1394            let cancelled = Arc::new(AtomicBool::new(false));
1395            let bridge = HostBridge::from_parts_with_writer(
1396                pending.clone(),
1397                cancelled.clone(),
1398                Arc::new(|_| Ok(())),
1399                1,
1400            );
1401
1402            let call = bridge.call("host/work", serde_json::json!({}));
1403            tokio::pin!(call);
1404
1405            loop {
1406                tokio::select! {
1407                    result = &mut call => panic!("call completed before cancellation: {result:?}"),
1408                    _ = tokio::task::yield_now() => {}
1409                }
1410                if !pending.lock().await.is_empty() {
1411                    break;
1412                }
1413            }
1414
1415            cancelled.store(true, Ordering::SeqCst);
1416            bridge.cancel_notify.notify_waiters();
1417
1418            let result = tokio::time::timeout(Duration::from_secs(1), call)
1419                .await
1420                .expect("pending call should observe cancellation promptly");
1421            assert!(
1422                matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
1423            );
1424            assert!(pending.lock().await.is_empty());
1425        });
1426    }
1427
1428    #[test]
1429    fn queued_messages_are_filtered_by_delivery_mode() {
1430        let runtime = tokio::runtime::Builder::new_current_thread()
1431            .enable_all()
1432            .build()
1433            .unwrap();
1434        runtime.block_on(async {
1435            let bridge = test_bridge();
1436            bridge
1437                .push_queued_user_message("first".to_string(), "finish_step")
1438                .await;
1439            bridge
1440                .push_queued_user_message("second".to_string(), "wait_for_completion")
1441                .await;
1442
1443            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1444            assert_eq!(finish_step.len(), 1);
1445            assert_eq!(finish_step[0].content, "first");
1446
1447            let turn_end = bridge.take_queued_user_messages(false, false, true).await;
1448            assert_eq!(turn_end.len(), 1);
1449            assert_eq!(turn_end[0].content, "second");
1450        });
1451    }
1452
1453    #[test]
1454    fn queued_transcript_injections_preserve_user_reminder_separation() {
1455        let runtime = tokio::runtime::Builder::new_current_thread()
1456            .enable_all()
1457            .build()
1458            .unwrap();
1459        runtime.block_on(async {
1460            let bridge = test_bridge();
1461            bridge
1462                .push_queued_user_message("human follow-up".to_string(), "finish_step")
1463                .await;
1464            let reminder_id = bridge
1465                .push_queued_session_remind_from_params(&serde_json::json!({
1466                    "body": "Host-provided ambient context.",
1467                    "tags": ["host"],
1468                    "dedupe_key": "host-context",
1469                    "ttl_turns": 2,
1470                    "mode": "wait_for_completion",
1471                    "_meta": {"harn": {"source": "test"}},
1472                }))
1473                .await
1474                .expect("valid reminder");
1475
1476            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1477            assert_eq!(finish_step.len(), 1);
1478            assert_eq!(finish_step[0].content, "human follow-up");
1479
1480            let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
1481            assert!(no_user_messages.is_empty());
1482
1483            let injections = bridge
1484                .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
1485                .await;
1486            assert_eq!(injections.len(), 1);
1487            let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
1488                panic!("expected queued reminder");
1489            };
1490            assert_eq!(reminder.reminder.id, reminder_id);
1491            assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
1492            assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
1493            assert_eq!(
1494                reminder.reminder.dedupe_key.as_deref(),
1495                Some("host-context")
1496            );
1497            assert_eq!(reminder.reminder.ttl_turns, Some(2));
1498            assert_eq!(
1499                reminder.reminder.source,
1500                crate::llm::helpers::ReminderSource::Bridge
1501            );
1502        });
1503    }
1504
1505    #[test]
1506    fn bridge_remind_modes_honor_delivery_checkpoints() {
1507        let runtime = tokio::runtime::Builder::new_current_thread()
1508            .enable_all()
1509            .build()
1510            .unwrap();
1511        runtime.block_on(async {
1512            let cases = [
1513                (
1514                    "interrupt_immediate",
1515                    DeliveryCheckpoint::InterruptImmediate,
1516                    DeliveryCheckpoint::AfterCurrentOperation,
1517                ),
1518                (
1519                    "finish_step",
1520                    DeliveryCheckpoint::AfterCurrentOperation,
1521                    DeliveryCheckpoint::EndOfInteraction,
1522                ),
1523                (
1524                    "wait_for_completion",
1525                    DeliveryCheckpoint::EndOfInteraction,
1526                    DeliveryCheckpoint::InterruptImmediate,
1527                ),
1528            ];
1529
1530            for (mode, expected_checkpoint, wrong_checkpoint) in cases {
1531                let bridge = test_bridge();
1532                bridge
1533                    .push_queued_session_remind_from_params(&serde_json::json!({
1534                        "body": format!("Reminder for {mode}"),
1535                        "mode": mode,
1536                    }))
1537                    .await
1538                    .expect("valid session/remind payload");
1539
1540                let premature = bridge
1541                    .take_queued_transcript_injections_for(wrong_checkpoint)
1542                    .await;
1543                assert!(
1544                    premature.is_empty(),
1545                    "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
1546                );
1547
1548                let delivered = bridge
1549                    .take_queued_transcript_injections_for(expected_checkpoint)
1550                    .await;
1551                assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
1552                let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
1553                    panic!("expected reminder for {mode}");
1554                };
1555                assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
1556            }
1557        });
1558    }
1559
1560    #[test]
1561    fn bridge_session_input_path_never_produces_reminder() {
1562        let runtime = tokio::runtime::Builder::new_current_thread()
1563            .enable_all()
1564            .build()
1565            .unwrap();
1566        runtime.block_on(async {
1567            let bridge = test_bridge();
1568            bridge
1569                .push_queued_user_message("still user input".to_string(), "finish_step")
1570                .await;
1571
1572            let delivered = bridge
1573                .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
1574                .await;
1575            assert_eq!(delivered.len(), 1);
1576            let QueuedTranscriptInjection::User(message) = &delivered[0] else {
1577                panic!("session/input queue path must produce a user message");
1578            };
1579            assert_eq!(message.content, "still user input");
1580        });
1581    }
1582
1583    #[test]
1584    fn session_remind_validation_rejects_user_message_shape() {
1585        let err = queued_session_remind_from_params(&serde_json::json!({
1586            "content": "this is still a user message",
1587            "mode": "interrupt_immediate",
1588        }))
1589        .expect_err("session/remind must require a reminder body");
1590        assert!(err.contains(Code::ReminderInvalidShape.as_str()));
1591        assert!(err.contains("body"));
1592    }
1593
1594    #[test]
1595    fn session_remind_validation_rejects_unknown_options_separately() {
1596        let err = queued_session_remind_from_params(&serde_json::json!({
1597            "body": "valid body",
1598            "unknown_host_field": true,
1599        }))
1600        .expect_err("session/remind must reject unknown top-level fields");
1601        assert!(err.contains(Code::ReminderUnknownOption.as_str()));
1602        assert!(err.contains("unknown_host_field"));
1603    }
1604
1605    #[test]
1606    fn test_json_result_to_vm_value_string() {
1607        let val = serde_json::json!("hello");
1608        let vm_val = json_result_to_vm_value(&val);
1609        assert_eq!(vm_val.display(), "hello");
1610    }
1611
1612    #[test]
1613    fn test_json_result_to_vm_value_dict() {
1614        let val = serde_json::json!({"name": "test", "count": 42});
1615        let vm_val = json_result_to_vm_value(&val);
1616        let VmValue::Dict(d) = &vm_val else {
1617            unreachable!("Expected Dict, got {:?}", vm_val);
1618        };
1619        assert_eq!(d.get("name").unwrap().display(), "test");
1620        assert_eq!(d.get("count").unwrap().display(), "42");
1621    }
1622
1623    #[test]
1624    fn test_json_result_to_vm_value_null() {
1625        let val = serde_json::json!(null);
1626        let vm_val = json_result_to_vm_value(&val);
1627        assert!(matches!(vm_val, VmValue::Nil));
1628    }
1629
1630    #[test]
1631    fn test_json_result_to_vm_value_nested() {
1632        let val = serde_json::json!({
1633            "text": "response",
1634            "tool_calls": [
1635                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
1636            ],
1637            "input_tokens": 100,
1638            "output_tokens": 50,
1639        });
1640        let vm_val = json_result_to_vm_value(&val);
1641        let VmValue::Dict(d) = &vm_val else {
1642            unreachable!("Expected Dict, got {:?}", vm_val);
1643        };
1644        assert_eq!(d.get("text").unwrap().display(), "response");
1645        let VmValue::List(list) = d.get("tool_calls").unwrap() else {
1646            unreachable!("Expected List for tool_calls");
1647        };
1648        assert_eq!(list.len(), 1);
1649    }
1650
1651    #[test]
1652    fn parse_host_tools_list_accepts_object_wrapper() {
1653        let tools = parse_host_tools_list_response(serde_json::json!({
1654            "tools": [
1655                {
1656                    "name": "Read",
1657                    "description": "Read a file",
1658                    "schema": {"type": "object"},
1659                }
1660            ]
1661        }))
1662        .expect("tool list");
1663
1664        assert_eq!(tools.len(), 1);
1665        assert_eq!(tools[0]["name"], "Read");
1666        assert_eq!(tools[0]["deprecated"], false);
1667    }
1668
1669    #[test]
1670    fn parse_host_tools_list_accepts_compat_fields() {
1671        let tools = parse_host_tools_list_response(serde_json::json!({
1672            "result": {
1673                "tools": [
1674                    {
1675                        "name": "Edit",
1676                        "short_description": "Apply an edit",
1677                        "input_schema": {"type": "object"},
1678                        "deprecated": true,
1679                    }
1680                ]
1681            }
1682        }))
1683        .expect("tool list");
1684
1685        assert_eq!(tools[0]["description"], "Apply an edit");
1686        assert_eq!(tools[0]["schema"]["type"], "object");
1687        assert_eq!(tools[0]["deprecated"], true);
1688    }
1689
1690    #[test]
1691    fn parse_host_tools_list_requires_tool_names() {
1692        let err = parse_host_tools_list_response(serde_json::json!({
1693            "tools": [
1694                {"description": "missing name"}
1695            ]
1696        }))
1697        .expect_err("expected error");
1698        assert!(err
1699            .to_string()
1700            .contains("host/tools/list: every tool must include a string `name`"));
1701    }
1702
1703    #[test]
1704    fn test_timeout_duration() {
1705        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
1706    }
1707}