Skip to main content

harn_vm/
bridge.rs

1//! JSON-RPC 2.0 bridge for host communication.
2//!
3//! When `harn run --bridge` is used, the VM delegates builtins (llm_call,
4//! file I/O, tool execution) to a host process over stdin/stdout JSON-RPC.
5//! The host application handles these requests using its own providers.
6
7use std::collections::{BTreeMap, HashMap, VecDeque};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::io::AsyncBufReadExt;
16use tokio::sync::{oneshot, Mutex, Notify};
17
18use crate::orchestration::MutationSessionRecord;
19use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
20use crate::visible_text::VisibleTextState;
21use crate::vm::Vm;
22
/// Default timeout for bridge calls (5 minutes).
///
/// `HostBridge::call` waits at most this long for the host's response to a
/// single request before failing with a runtime error.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
25
/// Transport callback used to send one serialized JSON-RPC message to the host.
///
/// The callback receives the message text and returns `Err` with a
/// human-readable description when the write fails.
pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
27
28fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
29    Arc::new(move |line: &str| {
30        let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
31        let mut stdout = std::io::stdout().lock();
32        stdout
33            .write_all(line.as_bytes())
34            .map_err(|e| format!("Bridge write error: {e}"))?;
35        stdout
36            .write_all(b"\n")
37            .map_err(|e| format!("Bridge write error: {e}"))?;
38        stdout
39            .flush()
40            .map_err(|e| format!("Bridge flush error: {e}"))?;
41        Ok(())
42    })
43}
44
/// A JSON-RPC 2.0 bridge to a host process.
///
/// By default the bridge sends requests to the host on stdout and receives
/// responses on stdin. A background task reads stdin and dispatches responses
/// to waiting callers by request ID. All stdout writes are serialized through
/// a mutex to prevent interleaving.
///
/// The transport is pluggable via `writer`, and when `in_process` is set,
/// calls are dispatched to an in-process host module instead of being sent
/// over the transport.
pub struct HostBridge {
    /// Id assigned to the next outgoing JSON-RPC request.
    next_id: AtomicU64,
    /// Pending request waiters, keyed by JSON-RPC id.
    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
    /// Whether the host has sent a cancel notification.
    cancelled: Arc<AtomicBool>,
    /// Wakes pending host calls when cancellation arrives.
    cancel_notify: Arc<Notify>,
    /// Transport writer used to send JSON-RPC to the host.
    writer: HostBridgeWriter,
    /// ACP session ID (set in ACP mode for session-scoped notifications).
    session_id: std::sync::Mutex<String>,
    /// Name of the currently executing Harn script (without .harn suffix).
    script_name: std::sync::Mutex<String>,
    /// User messages injected by the host while a run is active.
    queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>>,
    /// Host-triggered resume signal for daemon agents.
    resume_requested: Arc<AtomicBool>,
    /// Host-triggered skill-registry invalidation signal. Set when the
    /// host sends a `skills/update` notification; consumed by the CLI
    /// between runs (watch mode, long-running agents) to rebuild the
    /// layered skill catalog from its current filesystem + host state.
    skills_reload_requested: Arc<AtomicBool>,
    /// Whether the current daemon-mode agent loop is blocked in idle wait.
    daemon_idle: Arc<AtomicBool>,
    /// Canonical ACP `stopReason` recorded by the most recent `agent_loop`
    /// finalize during this prompt. Read once by the ACP adapter when the
    /// pipeline returns and populated by `host_agent_session_finalize`.
    /// Pipelines that don't run an agent loop leave this `None`, in which
    /// case the adapter falls back to `end_turn`.
    prompt_stop_reason: std::sync::Mutex<Option<String>>,
    /// Per-call visible assistant text state for call_progress notifications.
    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
    /// Whether an LLM call's deltas should be exposed to end users while streaming.
    /// Keyed by call id; recorded when the call starts.
    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
    /// Optional in-process host-module backend used by `harn playground`.
    in_process: Option<InProcessHost>,
}
89
/// Host backend that answers bridge calls by invoking functions exported
/// from a Harn module, instead of talking JSON-RPC to an external process.
struct InProcessHost {
    /// Path of the module the exports were loaded from (shown in error messages).
    module_path: PathBuf,
    /// Exported closures of the host module, keyed by function name.
    exported_functions: BTreeMap<String, Rc<VmClosure>>,
    /// VM from which a child VM is derived for every export invocation.
    vm: Vm,
}
95
96impl InProcessHost {
97    async fn dispatch(
98        &self,
99        method: &str,
100        params: serde_json::Value,
101    ) -> Result<serde_json::Value, VmError> {
102        match method {
103            "builtin_call" => {
104                let name = params
105                    .get("name")
106                    .and_then(|value| value.as_str())
107                    .unwrap_or_default();
108                let args = params
109                    .get("args")
110                    .and_then(|value| value.as_array())
111                    .cloned()
112                    .unwrap_or_default()
113                    .into_iter()
114                    .map(|value| json_result_to_vm_value(&value))
115                    .collect::<Vec<_>>();
116                self.invoke_export(name, &args).await
117            }
118            "host/tools/list" => self
119                .invoke_optional_export("host_tools_list", &[])
120                .await
121                .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
122            "session/request_permission" => self.request_permission(params).await,
123            other => Err(VmError::Runtime(format!(
124                "playground host backend does not implement bridge method '{other}'"
125            ))),
126        }
127    }
128
129    async fn invoke_export(
130        &self,
131        name: &str,
132        args: &[VmValue],
133    ) -> Result<serde_json::Value, VmError> {
134        let Some(closure) = self.exported_functions.get(name) else {
135            return Err(VmError::Runtime(format!(
136                "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
137                self.module_path.display()
138            )));
139        };
140
141        let mut vm = self.vm.child_vm_for_host();
142        let result = vm.call_closure_pub(closure, args).await?;
143        Ok(crate::llm::vm_value_to_json(&result))
144    }
145
146    async fn invoke_optional_export(
147        &self,
148        name: &str,
149        args: &[VmValue],
150    ) -> Result<Option<serde_json::Value>, VmError> {
151        if !self.exported_functions.contains_key(name) {
152            return Ok(None);
153        }
154        self.invoke_export(name, args).await.map(Some)
155    }
156
157    async fn request_permission(
158        &self,
159        params: serde_json::Value,
160    ) -> Result<serde_json::Value, VmError> {
161        let Some(closure) = self.exported_functions.get("request_permission") else {
162            return Ok(serde_json::json!({ "granted": true }));
163        };
164
165        let tool_name = params
166            .get("toolCall")
167            .and_then(|tool_call| tool_call.get("toolName"))
168            .and_then(|value| value.as_str())
169            .unwrap_or_default();
170        let tool_args = params
171            .get("toolCall")
172            .and_then(|tool_call| tool_call.get("rawInput"))
173            .map(json_result_to_vm_value)
174            .unwrap_or(VmValue::Nil);
175        let full_payload = json_result_to_vm_value(&params);
176
177        let arg_count = closure.func.params.len();
178        let args = if arg_count >= 3 {
179            vec![
180                VmValue::String(Rc::from(tool_name.to_string())),
181                tool_args,
182                full_payload,
183            ]
184        } else if arg_count == 2 {
185            vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
186        } else if arg_count == 1 {
187            vec![full_payload]
188        } else {
189            Vec::new()
190        };
191
192        let mut vm = self.vm.child_vm_for_host();
193        let result = vm.call_closure_pub(closure, &args).await?;
194        let payload = match result {
195            VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
196            VmValue::String(reason) if !reason.is_empty() => {
197                serde_json::json!({ "granted": false, "reason": reason.to_string() })
198            }
199            other => {
200                let json = crate::llm::vm_value_to_json(&other);
201                if json
202                    .get("granted")
203                    .and_then(|value| value.as_bool())
204                    .is_some()
205                    || json.get("outcome").is_some()
206                {
207                    json
208                } else {
209                    serde_json::json!({ "granted": other.is_truthy() })
210                }
211            }
212        };
213        Ok(payload)
214    }
215}
216
/// Delivery mode attached to a queued user message.
///
/// `HostBridge::take_queued_user_messages_for` pairs each mode with one
/// `DeliveryCheckpoint`: `InterruptImmediate` with
/// `DeliveryCheckpoint::InterruptImmediate`, `FinishStep` with
/// `DeliveryCheckpoint::AfterCurrentOperation`, and `WaitForCompletion`
/// with `DeliveryCheckpoint::EndOfInteraction`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
    InterruptImmediate,
    FinishStep,
    WaitForCompletion,
}
223
/// Point at which queued user messages are collected.
///
/// Each checkpoint drains the messages of exactly one
/// `QueuedUserMessageMode` (see `HostBridge::take_queued_user_messages_for`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
    InterruptImmediate,
    AfterCurrentOperation,
    EndOfInteraction,
}
230
231impl QueuedUserMessageMode {
232    fn from_str(value: &str) -> Self {
233        match value {
234            "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
235            "finish_step" | "after_current_operation" => Self::FinishStep,
236            _ => Self::WaitForCompletion,
237        }
238    }
239}
240
/// A user message queued on the bridge until a matching delivery checkpoint
/// collects it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedUserMessage {
    /// Message text.
    pub content: String,
    /// Delivery mode that decides which checkpoint picks the message up.
    pub mode: QueuedUserMessageMode,
}
246
247// Default doesn't apply — new() spawns async tasks requiring a tokio LocalSet.
248#[allow(clippy::new_without_default)]
249impl HostBridge {
250    /// Create a new bridge and spawn the stdin reader task.
251    ///
252    /// Must be called within a tokio LocalSet (uses spawn_local for the
253    /// stdin reader since it's single-threaded).
254    pub fn new() -> Self {
255        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
256            Arc::new(Mutex::new(HashMap::new()));
257        let cancelled = Arc::new(AtomicBool::new(false));
258        let cancel_notify = Arc::new(Notify::new());
259        let queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>> =
260            Arc::new(Mutex::new(VecDeque::new()));
261        let resume_requested = Arc::new(AtomicBool::new(false));
262        let skills_reload_requested = Arc::new(AtomicBool::new(false));
263        let daemon_idle = Arc::new(AtomicBool::new(false));
264
265        // Stdin reader: reads JSON-RPC lines and dispatches responses
266        let pending_clone = pending.clone();
267        let cancelled_clone = cancelled.clone();
268        let cancel_notify_clone = cancel_notify.clone();
269        let queued_clone = queued_user_messages.clone();
270        let resume_clone = resume_requested.clone();
271        let skills_reload_clone = skills_reload_requested.clone();
272        tokio::task::spawn_local(async move {
273            let stdin = tokio::io::stdin();
274            let reader = tokio::io::BufReader::new(stdin);
275            let mut lines = reader.lines();
276
277            while let Ok(Some(line)) = lines.next_line().await {
278                let line = line.trim().to_string();
279                if line.is_empty() {
280                    continue;
281                }
282
283                let msg: serde_json::Value = match serde_json::from_str(&line) {
284                    Ok(v) => v,
285                    Err(_) => continue,
286                };
287
288                // Notifications have no id; responses have one.
289                if msg.get("id").is_none() {
290                    if let Some(method) = msg["method"].as_str() {
291                        if method == "cancel" {
292                            cancelled_clone.store(true, Ordering::SeqCst);
293                            cancel_notify_clone.notify_waiters();
294                        } else if method == "agent/resume" {
295                            resume_clone.store(true, Ordering::SeqCst);
296                        } else if method == "skills/update" {
297                            skills_reload_clone.store(true, Ordering::SeqCst);
298                        } else if method == "user_message"
299                            || method == "session/input"
300                            || method == "agent/user_message"
301                        {
302                            let params = &msg["params"];
303                            let content = params
304                                .get("content")
305                                .and_then(|v| v.as_str())
306                                .unwrap_or("")
307                                .to_string();
308                            if !content.is_empty() {
309                                let mode = QueuedUserMessageMode::from_str(
310                                    params
311                                        .get("mode")
312                                        .and_then(|v| v.as_str())
313                                        .unwrap_or("wait_for_completion"),
314                                );
315                                queued_clone
316                                    .lock()
317                                    .await
318                                    .push_back(QueuedUserMessage { content, mode });
319                            }
320                        }
321                    }
322                    continue;
323                }
324
325                if let Some(id) = msg["id"].as_u64() {
326                    let mut pending = pending_clone.lock().await;
327                    if let Some(sender) = pending.remove(&id) {
328                        let _ = sender.send(msg);
329                    }
330                }
331            }
332
333            // stdin closed: drop pending senders to cancel waiters.
334            let mut pending = pending_clone.lock().await;
335            pending.clear();
336        });
337
338        Self {
339            next_id: AtomicU64::new(1),
340            pending,
341            cancelled,
342            cancel_notify,
343            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
344            session_id: std::sync::Mutex::new(String::new()),
345            script_name: std::sync::Mutex::new(String::new()),
346            queued_user_messages,
347            resume_requested,
348            skills_reload_requested,
349            daemon_idle,
350            prompt_stop_reason: std::sync::Mutex::new(None),
351            visible_call_states: std::sync::Mutex::new(HashMap::new()),
352            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
353            in_process: None,
354        }
355    }
356
357    /// Create a bridge from pre-existing shared state.
358    ///
359    /// Unlike `new()`, does **not** spawn a stdin reader — the caller is
360    /// responsible for dispatching responses into `pending`.  This is used
361    /// by ACP mode which already has its own stdin reader.
362    pub fn from_parts(
363        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
364        cancelled: Arc<AtomicBool>,
365        stdout_lock: Arc<std::sync::Mutex<()>>,
366        start_id: u64,
367    ) -> Self {
368        Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
369    }
370
371    pub fn from_parts_with_writer(
372        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
373        cancelled: Arc<AtomicBool>,
374        writer: HostBridgeWriter,
375        start_id: u64,
376    ) -> Self {
377        Self::from_parts_with_writer_and_cancel_notify(
378            pending,
379            cancelled,
380            Arc::new(Notify::new()),
381            writer,
382            start_id,
383        )
384    }
385
386    pub fn from_parts_with_writer_and_cancel_notify(
387        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
388        cancelled: Arc<AtomicBool>,
389        cancel_notify: Arc<Notify>,
390        writer: HostBridgeWriter,
391        start_id: u64,
392    ) -> Self {
393        Self {
394            next_id: AtomicU64::new(start_id),
395            pending,
396            cancelled,
397            cancel_notify,
398            writer,
399            session_id: std::sync::Mutex::new(String::new()),
400            script_name: std::sync::Mutex::new(String::new()),
401            queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
402            resume_requested: Arc::new(AtomicBool::new(false)),
403            skills_reload_requested: Arc::new(AtomicBool::new(false)),
404            daemon_idle: Arc::new(AtomicBool::new(false)),
405            prompt_stop_reason: std::sync::Mutex::new(None),
406            visible_call_states: std::sync::Mutex::new(HashMap::new()),
407            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
408            in_process: None,
409        }
410    }
411
412    /// Create an in-process host bridge backed by exported functions from a
413    /// Harn module. Used by `harn playground` to avoid JSON-RPC boilerplate.
414    pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
415        let exported_functions = vm.load_module_exports(module_path).await?;
416        Ok(Self {
417            next_id: AtomicU64::new(1),
418            pending: Arc::new(Mutex::new(HashMap::new())),
419            cancelled: Arc::new(AtomicBool::new(false)),
420            cancel_notify: Arc::new(Notify::new()),
421            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
422            session_id: std::sync::Mutex::new(String::new()),
423            script_name: std::sync::Mutex::new(String::new()),
424            queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
425            resume_requested: Arc::new(AtomicBool::new(false)),
426            skills_reload_requested: Arc::new(AtomicBool::new(false)),
427            daemon_idle: Arc::new(AtomicBool::new(false)),
428            prompt_stop_reason: std::sync::Mutex::new(None),
429            visible_call_states: std::sync::Mutex::new(HashMap::new()),
430            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
431            in_process: Some(InProcessHost {
432                module_path: module_path.to_path_buf(),
433                exported_functions,
434                vm,
435            }),
436        })
437    }
438
439    /// Set the ACP session ID for session-scoped notifications.
440    pub fn set_session_id(&self, id: &str) {
441        *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
442    }
443
444    /// Set the currently executing script name (without .harn suffix).
445    pub fn set_script_name(&self, name: &str) {
446        *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
447    }
448
449    /// Get the current script name.
450    fn get_script_name(&self) -> String {
451        self.script_name
452            .lock()
453            .unwrap_or_else(|e| e.into_inner())
454            .clone()
455    }
456
457    /// Get the session ID.
458    pub fn get_session_id(&self) -> String {
459        self.session_id
460            .lock()
461            .unwrap_or_else(|e| e.into_inner())
462            .clone()
463    }
464
465    /// Write a complete JSON-RPC line to stdout, serialized through a mutex.
466    fn write_line(&self, line: &str) -> Result<(), VmError> {
467        (self.writer)(line).map_err(VmError::Runtime)
468    }
469
470    /// Send a JSON-RPC request to the host and wait for the response.
471    /// Times out after 5 minutes to prevent deadlocks.
472    pub async fn call(
473        &self,
474        method: &str,
475        params: serde_json::Value,
476    ) -> Result<serde_json::Value, VmError> {
477        if let Some(in_process) = &self.in_process {
478            return in_process.dispatch(method, params).await;
479        }
480
481        if self.is_cancelled() {
482            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
483        }
484
485        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
486        let cancel_wait = self.cancel_notify.notified();
487        tokio::pin!(cancel_wait);
488
489        let request = crate::jsonrpc::request(id, method, params);
490
491        let (tx, rx) = oneshot::channel();
492        {
493            let mut pending = self.pending.lock().await;
494            pending.insert(id, tx);
495        }
496
497        let line = serde_json::to_string(&request)
498            .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
499        if let Err(e) = self.write_line(&line) {
500            let mut pending = self.pending.lock().await;
501            pending.remove(&id);
502            return Err(e);
503        }
504
505        if self.is_cancelled() {
506            let mut pending = self.pending.lock().await;
507            pending.remove(&id);
508            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
509        }
510
511        let response = tokio::select! {
512            result = rx => match result {
513                Ok(msg) => msg,
514                Err(_) => {
515                    // Sender dropped: host closed or stdin reader exited.
516                    return Err(VmError::Runtime(
517                        "Bridge: host closed connection before responding".into(),
518                    ));
519                }
520            },
521            _ = &mut cancel_wait => {
522                let mut pending = self.pending.lock().await;
523                pending.remove(&id);
524                return Err(VmError::Runtime("Bridge: operation cancelled".into()));
525            }
526            _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
527                let mut pending = self.pending.lock().await;
528                pending.remove(&id);
529                return Err(VmError::Runtime(format!(
530                    "Bridge: host did not respond to '{method}' within {}s",
531                    DEFAULT_TIMEOUT.as_secs()
532                )));
533            }
534        };
535
536        if let Some(error) = response.get("error") {
537            let message = error["message"].as_str().unwrap_or("Unknown host error");
538            let code = error["code"].as_i64().unwrap_or(-1);
539            // JSON-RPC -32001 signals the host rejected the tool (not permitted / not in allowlist).
540            if code == -32001 {
541                return Err(VmError::CategorizedError {
542                    message: message.to_string(),
543                    category: ErrorCategory::ToolRejected,
544                });
545            }
546            return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
547        }
548
549        Ok(response["result"].clone())
550    }
551
552    /// Send a JSON-RPC notification to the host (no response expected).
553    /// Serialized through the stdout mutex to prevent interleaving.
554    pub fn notify(&self, method: &str, params: serde_json::Value) {
555        let notification = crate::jsonrpc::notification(method, params);
556        if self.in_process.is_some() {
557            return;
558        }
559        if let Ok(line) = serde_json::to_string(&notification) {
560            let _ = self.write_line(&line);
561        }
562    }
563
    /// Check if the host has sent a cancel notification.
    ///
    /// Reads the shared `cancelled` flag; this method never clears it.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }
568
    /// Consume the resume signal: returns whether it was set and clears it
    /// in the same atomic step.
    pub fn take_resume_signal(&self) -> bool {
        self.resume_requested.swap(false, Ordering::SeqCst)
    }
572
    /// Set the resume signal, to be observed by the next
    /// `take_resume_signal` call.
    pub fn signal_resume(&self) {
        self.resume_requested.store(true, Ordering::SeqCst);
    }
576
    /// Record whether the daemon-mode agent loop is currently blocked in
    /// idle wait.
    pub fn set_daemon_idle(&self, idle: bool) {
        self.daemon_idle.store(idle, Ordering::SeqCst);
    }
580
    /// Return the idle state last recorded with `set_daemon_idle`
    /// (`false` until it is first set).
    pub fn is_daemon_idle(&self) -> bool {
        self.daemon_idle.load(Ordering::SeqCst)
    }
584
585    /// Record the canonical ACP `stopReason` for the current prompt. The
586    /// last writer wins, which matches the semantic that an outer
587    /// `agent_loop` (the one whose result the user observes) always
588    /// finalizes after any inner loops it spawned.
589    pub fn set_prompt_stop_reason(&self, reason: &str) {
590        *self
591            .prompt_stop_reason
592            .lock()
593            .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
594    }
595
596    /// Consume any prompt stop reason recorded during this prompt. The
597    /// ACP adapter calls this once after the pipeline returns; pipelines
598    /// that didn't run an `agent_loop` see `None` and the adapter falls
599    /// back to `end_turn`.
600    pub fn take_prompt_stop_reason(&self) -> Option<String> {
601        self.prompt_stop_reason
602            .lock()
603            .unwrap_or_else(|e| e.into_inner())
604            .take()
605    }
606
    /// Consume any pending `skills/update` signal the host has sent.
    /// Returns `true` if the signal was set since the previous call and
    /// clears it atomically, so several notifications arriving between two
    /// calls are reported as a single `true`. This lets callers trigger a
    /// layered-discovery rebuild without polling false positives. See
    /// issue #73 for the hot-reload contract.
    pub fn take_skills_reload_signal(&self) -> bool {
        self.skills_reload_requested.swap(false, Ordering::SeqCst)
    }
614
    /// Manually mark the skill catalog as stale. Used by tests and by
    /// the CLI when an internal event (e.g. `harn install`) should
    /// trigger the same rebuild a `skills/update` notification would.
    /// The next `take_skills_reload_signal` call returns `true`.
    pub fn signal_skills_reload(&self) {
        self.skills_reload_requested.store(true, Ordering::SeqCst);
    }
621
622    /// Call the host's `skills/list` RPC and return the raw JSON array
623    /// it responded with. Shape:
624    /// `[{ "id": "...", "name": "...", "description": "...", "source": "..." }, ...]`.
625    /// The CLI adapter converts each entry into a
626    /// [`crate::skills::SkillManifestRef`].
627    pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
628        let result = self.call("skills/list", serde_json::json!({})).await?;
629        match result {
630            serde_json::Value::Array(items) => Ok(items),
631            serde_json::Value::Object(map) => match map.get("skills") {
632                Some(serde_json::Value::Array(items)) => Ok(items.clone()),
633                _ => Err(VmError::Runtime(
634                    "skills/list: host response must be an array or { skills: [...] }".into(),
635                )),
636            },
637            _ => Err(VmError::Runtime(
638                "skills/list: unexpected response shape".into(),
639            )),
640        }
641    }
642
643    /// Call the host's `host/tools/list` RPC and return normalized tool
644    /// descriptors. Shape:
645    /// `[{ "name": "...", "description": "...", "schema": {...}, "deprecated": false }, ...]`.
646    /// The bridge also accepts `{ "tools": [...] }` and
647    /// `{ "result": { "tools": [...] } }` wrappers for lenient hosts.
648    pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
649        let result = self.call("host/tools/list", serde_json::json!({})).await?;
650        parse_host_tools_list_response(result)
651    }
652
653    /// Call the host's `skills/fetch` RPC for one skill id. Returns the
654    /// raw JSON body so the CLI can inspect both the frontmatter fields
655    /// and the skill markdown body in whatever shape the host sends.
656    pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
657        self.call("skills/fetch", serde_json::json!({ "id": id }))
658            .await
659    }
660
661    pub async fn push_queued_user_message(&self, content: String, mode: &str) {
662        self.queued_user_messages
663            .lock()
664            .await
665            .push_back(QueuedUserMessage {
666                content,
667                mode: QueuedUserMessageMode::from_str(mode),
668            });
669    }
670
671    pub async fn take_queued_user_messages(
672        &self,
673        include_interrupt_immediate: bool,
674        include_finish_step: bool,
675        include_wait_for_completion: bool,
676    ) -> Vec<QueuedUserMessage> {
677        let mut queue = self.queued_user_messages.lock().await;
678        let mut selected = Vec::new();
679        let mut retained = VecDeque::new();
680        while let Some(message) = queue.pop_front() {
681            let should_take = match message.mode {
682                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
683                QueuedUserMessageMode::FinishStep => include_finish_step,
684                QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
685            };
686            if should_take {
687                selected.push(message);
688            } else {
689                retained.push_back(message);
690            }
691        }
692        *queue = retained;
693        selected
694    }
695
696    pub async fn take_queued_user_messages_for(
697        &self,
698        checkpoint: DeliveryCheckpoint,
699    ) -> Vec<QueuedUserMessage> {
700        match checkpoint {
701            DeliveryCheckpoint::InterruptImmediate => {
702                self.take_queued_user_messages(true, false, false).await
703            }
704            DeliveryCheckpoint::AfterCurrentOperation => {
705                self.take_queued_user_messages(false, true, false).await
706            }
707            DeliveryCheckpoint::EndOfInteraction => {
708                self.take_queued_user_messages(false, false, true).await
709            }
710        }
711    }
712
713    /// Send an output notification (for log/print in bridge mode).
714    pub fn send_output(&self, text: &str) {
715        self.notify("output", serde_json::json!({"text": text}));
716    }
717
718    /// Send a progress notification with optional numeric progress and structured data.
719    pub fn send_progress(
720        &self,
721        phase: &str,
722        message: &str,
723        progress: Option<i64>,
724        total: Option<i64>,
725        data: Option<serde_json::Value>,
726    ) {
727        let mut payload = serde_json::json!({"phase": phase, "message": message});
728        if let Some(p) = progress {
729            payload["progress"] = serde_json::json!(p);
730        }
731        if let Some(t) = total {
732            payload["total"] = serde_json::json!(t);
733        }
734        if let Some(d) = data {
735            payload["data"] = d;
736        }
737        self.notify("progress", payload);
738    }
739
740    /// Send a structured log notification.
741    pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
742        let mut payload = serde_json::json!({"level": level, "message": message});
743        if let Some(f) = fields {
744            payload["fields"] = f;
745        }
746        self.notify("log", payload);
747    }
748
749    /// Send a `session/update` with `call_start` — signals the beginning of
750    /// an LLM call, tool call, or builtin call for observability.
751    pub fn send_call_start(
752        &self,
753        call_id: &str,
754        call_type: &str,
755        name: &str,
756        metadata: serde_json::Value,
757    ) {
758        let session_id = self.get_session_id();
759        let script = self.get_script_name();
760        let stream_publicly = metadata
761            .get("stream_publicly")
762            .and_then(|value| value.as_bool())
763            .unwrap_or(true);
764        self.visible_call_streams
765            .lock()
766            .unwrap_or_else(|e| e.into_inner())
767            .insert(call_id.to_string(), stream_publicly);
768        self.notify(
769            "session/update",
770            serde_json::json!({
771                "sessionId": session_id,
772                "update": {
773                    "sessionUpdate": "call_start",
774                    "content": {
775                        "toolCallId": call_id,
776                        "call_type": call_type,
777                        "name": name,
778                        "script": script,
779                        "metadata": metadata,
780                    },
781                },
782            }),
783        );
784    }
785
786    /// Send a `session/update` with `call_progress` — a streaming token delta
787    /// from an in-flight LLM call.
788    pub fn send_call_progress(
789        &self,
790        call_id: &str,
791        delta: &str,
792        accumulated_tokens: u64,
793        user_visible: bool,
794    ) {
795        let session_id = self.get_session_id();
796        let (visible_text, visible_delta) = {
797            let stream_publicly = self
798                .visible_call_streams
799                .lock()
800                .unwrap_or_else(|e| e.into_inner())
801                .get(call_id)
802                .copied()
803                .unwrap_or(true);
804            let mut states = self
805                .visible_call_states
806                .lock()
807                .unwrap_or_else(|e| e.into_inner());
808            let state = states.entry(call_id.to_string()).or_default();
809            state.push(delta, stream_publicly)
810        };
811        self.notify(
812            "session/update",
813            serde_json::json!({
814                "sessionId": session_id,
815                "update": {
816                    "sessionUpdate": "call_progress",
817                    "content": {
818                        "toolCallId": call_id,
819                        "delta": delta,
820                        "accumulated_tokens": accumulated_tokens,
821                        "visible_text": visible_text,
822                        "visible_delta": visible_delta,
823                        "user_visible": user_visible,
824                    },
825                },
826            }),
827        );
828    }
829
830    /// Send a `session/update` with `call_end` — signals completion of a call.
831    pub fn send_call_end(
832        &self,
833        call_id: &str,
834        call_type: &str,
835        name: &str,
836        duration_ms: u64,
837        status: &str,
838        metadata: serde_json::Value,
839    ) {
840        let session_id = self.get_session_id();
841        let script = self.get_script_name();
842        self.visible_call_states
843            .lock()
844            .unwrap_or_else(|e| e.into_inner())
845            .remove(call_id);
846        self.visible_call_streams
847            .lock()
848            .unwrap_or_else(|e| e.into_inner())
849            .remove(call_id);
850        self.notify(
851            "session/update",
852            serde_json::json!({
853                "sessionId": session_id,
854                "update": {
855                    "sessionUpdate": "call_end",
856                    "content": {
857                        "toolCallId": call_id,
858                        "call_type": call_type,
859                        "name": name,
860                        "script": script,
861                        "duration_ms": duration_ms,
862                        "status": status,
863                        "metadata": metadata,
864                    },
865                },
866            }),
867        );
868    }
869
870    /// Send a worker lifecycle update for delegated/background execution.
871    pub fn send_worker_update(
872        &self,
873        worker_id: &str,
874        worker_name: &str,
875        status: &str,
876        metadata: serde_json::Value,
877        audit: Option<&MutationSessionRecord>,
878    ) {
879        let session_id = self.get_session_id();
880        let script = self.get_script_name();
881        let started_at = metadata.get("started_at").cloned().unwrap_or_default();
882        let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
883        let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
884        let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
885        let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
886        let lifecycle = serde_json::json!({
887            "event": status,
888            "worker_id": worker_id,
889            "worker_name": worker_name,
890            "started_at": started_at,
891            "finished_at": finished_at,
892        });
893        self.notify(
894            "session/update",
895            serde_json::json!({
896                "sessionId": session_id,
897                "update": {
898                    "sessionUpdate": "worker_update",
899                    "content": {
900                        "worker_id": worker_id,
901                        "worker_name": worker_name,
902                        "status": status,
903                        "script": script,
904                        "started_at": started_at,
905                        "finished_at": finished_at,
906                        "snapshot_path": snapshot_path,
907                        "run_id": run_id,
908                        "run_path": run_path,
909                        "lifecycle": lifecycle,
910                        "audit": audit,
911                        "metadata": metadata,
912                    },
913                },
914            }),
915        );
916    }
917}
918
919/// Convert a serde_json::Value to a VmValue.
920pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
921    crate::stdlib::json_to_vm_value(val)
922}
923
924fn parse_host_tools_list_response(
925    result: serde_json::Value,
926) -> Result<Vec<serde_json::Value>, VmError> {
927    let tools = match result {
928        serde_json::Value::Array(items) => items,
929        serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
930            map.get("result")
931                .and_then(|value| value.get("tools"))
932                .cloned()
933        }) {
934            Some(serde_json::Value::Array(items)) => items,
935            _ => {
936                return Err(VmError::Runtime(
937                    "host/tools/list: host response must be an array or { tools: [...] }".into(),
938                ));
939            }
940        },
941        _ => {
942            return Err(VmError::Runtime(
943                "host/tools/list: unexpected response shape".into(),
944            ));
945        }
946    };
947
948    let mut normalized = Vec::with_capacity(tools.len());
949    for tool in tools {
950        let serde_json::Value::Object(map) = tool else {
951            return Err(VmError::Runtime(
952                "host/tools/list: every tool must be an object".into(),
953            ));
954        };
955        let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
956            return Err(VmError::Runtime(
957                "host/tools/list: every tool must include a string `name`".into(),
958            ));
959        };
960        let description = map
961            .get("description")
962            .and_then(|value| value.as_str())
963            .or_else(|| {
964                map.get("short_description")
965                    .and_then(|value| value.as_str())
966            })
967            .unwrap_or_default();
968        let schema = map
969            .get("schema")
970            .cloned()
971            .or_else(|| map.get("parameters").cloned())
972            .or_else(|| map.get("input_schema").cloned())
973            .unwrap_or(serde_json::Value::Null);
974        let deprecated = map
975            .get("deprecated")
976            .and_then(|value| value.as_bool())
977            .unwrap_or(false);
978        normalized.push(serde_json::json!({
979            "name": name,
980            "description": description,
981            "schema": schema,
982            "deprecated": deprecated,
983        }));
984    }
985    Ok(normalized)
986}
987
#[cfg(test)]
mod tests {
    use super::*;

    /// Build the single-threaded tokio runtime used by the async tests.
    fn current_thread_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
    }

    /// A serialized request carries the protocol version, id, and method.
    #[test]
    fn test_json_rpc_request_format() {
        let params = serde_json::json!({
            "prompt": "Hello",
            "system": "Be helpful",
        });
        let request = crate::jsonrpc::request(1, "llm_call", params);
        let encoded = serde_json::to_string(&request).unwrap();
        assert!(encoded.contains("\"jsonrpc\":\"2.0\""));
        assert!(encoded.contains("\"id\":1"));
        assert!(encoded.contains("\"method\":\"llm_call\""));
    }

    /// A serialized notification names its method and has no `id`.
    #[test]
    fn test_json_rpc_notification_format() {
        let params = serde_json::json!({"text": "[harn] hello\n"});
        let notification = crate::jsonrpc::notification("output", params);
        let encoded = serde_json::to_string(&notification).unwrap();
        assert!(encoded.contains("\"method\":\"output\""));
        assert!(!encoded.contains("\"id\""));
    }

    /// An error response exposes its message under `error.message`.
    #[test]
    fn test_json_rpc_error_response_parsing() {
        let reply = crate::jsonrpc::error_response(1, -32600, "Invalid request");
        assert!(reply.get("error").is_some());
        let message = reply["error"]["message"].as_str().unwrap();
        assert_eq!(message, "Invalid request");
    }

    /// A success response exposes its payload under `result`.
    #[test]
    fn test_json_rpc_success_response_parsing() {
        let payload = serde_json::json!({
            "text": "Hello world",
            "input_tokens": 10,
            "output_tokens": 5,
        });
        let reply = crate::jsonrpc::response(1, payload);
        assert!(reply.get("result").is_some());
        let text = reply["result"]["text"].as_str().unwrap();
        assert_eq!(text, "Hello world");
    }

    /// The shared cancellation flag starts cleared and can be set.
    #[test]
    fn test_cancelled_flag() {
        let flag = Arc::new(AtomicBool::new(false));
        assert!(!flag.load(Ordering::SeqCst));
        flag.store(true, Ordering::SeqCst);
        assert!(flag.load(Ordering::SeqCst));
    }

    /// A call waiting on the host returns a "cancelled" runtime error once
    /// the cancellation flag is raised, and its pending entry is cleaned up.
    #[test]
    fn pending_host_calls_return_when_cancellation_arrives() {
        current_thread_runtime().block_on(async {
            let pending = Arc::new(Mutex::new(HashMap::new()));
            let cancelled = Arc::new(AtomicBool::new(false));
            let bridge = HostBridge::from_parts_with_writer(
                pending.clone(),
                cancelled.clone(),
                Arc::new(|_| Ok(())),
                1,
            );

            let in_flight = bridge.call("host/work", serde_json::json!({}));
            tokio::pin!(in_flight);

            // Drive the call until it has registered itself as pending.
            loop {
                tokio::select! {
                    result = &mut in_flight => panic!("call completed before cancellation: {result:?}"),
                    _ = tokio::task::yield_now() => {}
                }
                let registered = !pending.lock().await.is_empty();
                if registered {
                    break;
                }
            }

            cancelled.store(true, Ordering::SeqCst);
            bridge.cancel_notify.notify_waiters();

            let outcome = tokio::time::timeout(Duration::from_secs(1), in_flight)
                .await
                .expect("pending call should observe cancellation promptly");
            assert!(
                matches!(outcome, Err(VmError::Runtime(message)) if message.contains("cancelled"))
            );
            assert!(pending.lock().await.is_empty());
        });
    }

    /// Queued user messages are handed out according to their delivery mode.
    #[test]
    fn queued_messages_are_filtered_by_delivery_mode() {
        current_thread_runtime().block_on(async {
            let bridge = HostBridge::from_parts(
                Arc::new(Mutex::new(HashMap::new())),
                Arc::new(AtomicBool::new(false)),
                Arc::new(std::sync::Mutex::new(())),
                1,
            );
            bridge
                .push_queued_user_message("first".to_string(), "finish_step")
                .await;
            bridge
                .push_queued_user_message("second".to_string(), "wait_for_completion")
                .await;

            let after_step = bridge.take_queued_user_messages(false, true, false).await;
            assert_eq!(after_step.len(), 1);
            assert_eq!(after_step[0].content, "first");

            let at_turn_end = bridge.take_queued_user_messages(false, false, true).await;
            assert_eq!(at_turn_end.len(), 1);
            assert_eq!(at_turn_end[0].content, "second");
        });
    }

    /// A JSON string converts to a value that displays as the same text.
    #[test]
    fn test_json_result_to_vm_value_string() {
        let converted = json_result_to_vm_value(&serde_json::json!("hello"));
        assert_eq!(converted.display(), "hello");
    }

    /// A JSON object converts to a dict with matching entries.
    #[test]
    fn test_json_result_to_vm_value_dict() {
        let source = serde_json::json!({"name": "test", "count": 42});
        let converted = json_result_to_vm_value(&source);
        let entries = match &converted {
            VmValue::Dict(entries) => entries,
            other => unreachable!("Expected Dict, got {:?}", other),
        };
        assert_eq!(entries.get("name").unwrap().display(), "test");
        assert_eq!(entries.get("count").unwrap().display(), "42");
    }

    /// JSON `null` converts to `VmValue::Nil`.
    #[test]
    fn test_json_result_to_vm_value_null() {
        let converted = json_result_to_vm_value(&serde_json::json!(null));
        assert!(matches!(converted, VmValue::Nil));
    }

    /// Nested arrays and objects survive the conversion.
    #[test]
    fn test_json_result_to_vm_value_nested() {
        let source = serde_json::json!({
            "text": "response",
            "tool_calls": [
                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
            ],
            "input_tokens": 100,
            "output_tokens": 50,
        });
        let converted = json_result_to_vm_value(&source);
        let entries = match &converted {
            VmValue::Dict(entries) => entries,
            other => unreachable!("Expected Dict, got {:?}", other),
        };
        assert_eq!(entries.get("text").unwrap().display(), "response");
        let calls = match entries.get("tool_calls").unwrap() {
            VmValue::List(calls) => calls,
            _ => unreachable!("Expected List for tool_calls"),
        };
        assert_eq!(calls.len(), 1);
    }

    /// The `{ "tools": [...] }` wrapper is accepted; `deprecated` defaults
    /// to `false`.
    #[test]
    fn parse_host_tools_list_accepts_object_wrapper() {
        let response = serde_json::json!({
            "tools": [
                {
                    "name": "Read",
                    "description": "Read a file",
                    "schema": {"type": "object"},
                }
            ]
        });
        let parsed = parse_host_tools_list_response(response).expect("tool list");

        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0]["name"], "Read");
        assert_eq!(parsed[0]["deprecated"], false);
    }

    /// The `result.tools` wrapper and the compatibility field names
    /// (`short_description`, `input_schema`) are accepted.
    #[test]
    fn parse_host_tools_list_accepts_compat_fields() {
        let response = serde_json::json!({
            "result": {
                "tools": [
                    {
                        "name": "Edit",
                        "short_description": "Apply an edit",
                        "input_schema": {"type": "object"},
                        "deprecated": true,
                    }
                ]
            }
        });
        let parsed = parse_host_tools_list_response(response).expect("tool list");

        let first = &parsed[0];
        assert_eq!(first["description"], "Apply an edit");
        assert_eq!(first["schema"]["type"], "object");
        assert_eq!(first["deprecated"], true);
    }

    /// A tool without a `name` is rejected with a descriptive error.
    #[test]
    fn parse_host_tools_list_requires_tool_names() {
        let response = serde_json::json!({
            "tools": [
                {"description": "missing name"}
            ]
        });
        let failure = parse_host_tools_list_response(response).expect_err("expected error");
        let rendered = failure.to_string();
        assert!(rendered.contains("host/tools/list: every tool must include a string `name`"));
    }

    /// The default bridge timeout is five minutes.
    #[test]
    fn test_timeout_duration() {
        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
    }
}