Skip to main content

harn_vm/
bridge.rs

1//! JSON-RPC 2.0 bridge for host communication.
2//!
3//! When `harn run --bridge` is used, the VM delegates builtins (llm_call,
4//! file I/O, tool execution) to a host process over stdin/stdout JSON-RPC.
5//! The host application handles these requests using its own providers.
6
7use std::collections::{BTreeMap, HashMap, VecDeque};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::io::AsyncBufReadExt;
16use tokio::sync::{oneshot, Mutex, Notify};
17
18use crate::orchestration::MutationSessionRecord;
19use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
20use crate::visible_text::VisibleTextState;
21use crate::vm::Vm;
22
23/// Default timeout for bridge calls (5 minutes).
24const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
25
26pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
27
28fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
29    Arc::new(move |line: &str| {
30        let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
31        let mut stdout = std::io::stdout().lock();
32        stdout
33            .write_all(line.as_bytes())
34            .map_err(|e| format!("Bridge write error: {e}"))?;
35        stdout
36            .write_all(b"\n")
37            .map_err(|e| format!("Bridge write error: {e}"))?;
38        stdout
39            .flush()
40            .map_err(|e| format!("Bridge flush error: {e}"))?;
41        Ok(())
42    })
43}
44
45/// A JSON-RPC 2.0 bridge to a host process over stdin/stdout.
46///
47/// The bridge sends requests to the host on stdout and receives responses
48/// on stdin. A background task reads stdin and dispatches responses to
49/// waiting callers by request ID. All stdout writes are serialized through
50/// a mutex to prevent interleaving.
51pub struct HostBridge {
52    next_id: AtomicU64,
53    /// Pending request waiters, keyed by JSON-RPC id.
54    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
55    /// Whether the host has sent a cancel notification.
56    cancelled: Arc<AtomicBool>,
57    /// Wakes pending host calls when cancellation arrives.
58    cancel_notify: Arc<Notify>,
59    /// Transport writer used to send JSON-RPC to the host.
60    writer: HostBridgeWriter,
61    /// ACP session ID (set in ACP mode for session-scoped notifications).
62    session_id: std::sync::Mutex<String>,
63    /// Name of the currently executing Harn script (without .harn suffix).
64    script_name: std::sync::Mutex<String>,
65    /// User messages injected by the host while a run is active.
66    queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>>,
67    /// Host-triggered resume signal for daemon agents.
68    resume_requested: Arc<AtomicBool>,
69    /// Host-triggered skill-registry invalidation signal. Set when the
70    /// host sends a `skills/update` notification; consumed by the CLI
71    /// between runs (watch mode, long-running agents) to rebuild the
72    /// layered skill catalog from its current filesystem + host state.
73    skills_reload_requested: Arc<AtomicBool>,
74    /// Whether the current daemon-mode agent loop is blocked in idle wait.
75    daemon_idle: Arc<AtomicBool>,
76    /// Canonical ACP `stopReason` recorded by the most recent `agent_loop`
77    /// finalize during this prompt. Read once by the ACP adapter when the
78    /// pipeline returns and populated by `host_agent_session_finalize`.
79    /// Pipelines that don't run an agent loop leave this `None`, in which
80    /// case the adapter falls back to `end_turn`.
81    prompt_stop_reason: std::sync::Mutex<Option<String>>,
82    /// Per-call visible assistant text state for call_progress notifications.
83    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
84    /// Whether an LLM call's deltas should be exposed to end users while streaming.
85    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
86    /// Optional in-process host-module backend used by `harn playground`.
87    in_process: Option<InProcessHost>,
88}
89
90struct InProcessHost {
91    module_path: PathBuf,
92    exported_functions: BTreeMap<String, Rc<VmClosure>>,
93    vm: Vm,
94}
95
96impl InProcessHost {
97    async fn dispatch(
98        &self,
99        method: &str,
100        params: serde_json::Value,
101    ) -> Result<serde_json::Value, VmError> {
102        match method {
103            "builtin_call" => {
104                let name = params
105                    .get("name")
106                    .and_then(|value| value.as_str())
107                    .unwrap_or_default();
108                let args = params
109                    .get("args")
110                    .and_then(|value| value.as_array())
111                    .cloned()
112                    .unwrap_or_default()
113                    .into_iter()
114                    .map(|value| json_result_to_vm_value(&value))
115                    .collect::<Vec<_>>();
116                self.invoke_export(name, &args).await
117            }
118            "host/tools/list" => self
119                .invoke_optional_export("host_tools_list", &[])
120                .await
121                .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
122            "session/request_permission" => self.request_permission(params).await,
123            other => Err(VmError::Runtime(format!(
124                "playground host backend does not implement bridge method '{other}'"
125            ))),
126        }
127    }
128
129    async fn invoke_export(
130        &self,
131        name: &str,
132        args: &[VmValue],
133    ) -> Result<serde_json::Value, VmError> {
134        let Some(closure) = self.exported_functions.get(name) else {
135            return Err(VmError::Runtime(format!(
136                "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
137                self.module_path.display()
138            )));
139        };
140
141        let mut vm = self.vm.child_vm_for_host();
142        let result = vm.call_closure_pub(closure, args).await?;
143        Ok(crate::llm::vm_value_to_json(&result))
144    }
145
146    async fn invoke_optional_export(
147        &self,
148        name: &str,
149        args: &[VmValue],
150    ) -> Result<Option<serde_json::Value>, VmError> {
151        if !self.exported_functions.contains_key(name) {
152            return Ok(None);
153        }
154        self.invoke_export(name, args).await.map(Some)
155    }
156
157    async fn request_permission(
158        &self,
159        params: serde_json::Value,
160    ) -> Result<serde_json::Value, VmError> {
161        let Some(closure) = self.exported_functions.get("request_permission") else {
162            return Ok(serde_json::json!({ "granted": true }));
163        };
164
165        let tool_name = params
166            .get("toolCall")
167            .and_then(|tool_call| tool_call.get("toolName"))
168            .and_then(|value| value.as_str())
169            .unwrap_or_default();
170        let tool_args = params
171            .get("toolCall")
172            .and_then(|tool_call| tool_call.get("rawInput"))
173            .map(json_result_to_vm_value)
174            .unwrap_or(VmValue::Nil);
175        let full_payload = json_result_to_vm_value(&params);
176
177        let arg_count = closure.func.params.len();
178        let args = if arg_count >= 3 {
179            vec![
180                VmValue::String(Rc::from(tool_name.to_string())),
181                tool_args,
182                full_payload,
183            ]
184        } else if arg_count == 2 {
185            vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
186        } else if arg_count == 1 {
187            vec![full_payload]
188        } else {
189            Vec::new()
190        };
191
192        let mut vm = self.vm.child_vm_for_host();
193        let result = vm.call_closure_pub(closure, &args).await?;
194        let payload = match result {
195            VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
196            VmValue::String(reason) if !reason.is_empty() => {
197                serde_json::json!({ "granted": false, "reason": reason.to_string() })
198            }
199            other => {
200                let json = crate::llm::vm_value_to_json(&other);
201                if json
202                    .get("granted")
203                    .and_then(|value| value.as_bool())
204                    .is_some()
205                    || json.get("outcome").is_some()
206                {
207                    json
208                } else {
209                    serde_json::json!({ "granted": other.is_truthy() })
210                }
211            }
212        };
213        Ok(payload)
214    }
215}
216
217#[derive(Clone, Debug, PartialEq, Eq)]
218pub enum QueuedUserMessageMode {
219    InterruptImmediate,
220    FinishStep,
221    WaitForCompletion,
222}
223
224#[derive(Clone, Copy, Debug, PartialEq, Eq)]
225pub enum DeliveryCheckpoint {
226    InterruptImmediate,
227    AfterCurrentOperation,
228    EndOfInteraction,
229}
230
231impl QueuedUserMessageMode {
232    fn from_str(value: &str) -> Self {
233        match value {
234            "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
235            "finish_step" | "after_current_operation" => Self::FinishStep,
236            _ => Self::WaitForCompletion,
237        }
238    }
239}
240
241#[derive(Clone, Debug, PartialEq, Eq)]
242pub struct QueuedUserMessage {
243    pub content: String,
244    pub mode: QueuedUserMessageMode,
245}
246
247// Default doesn't apply — new() spawns async tasks requiring a tokio LocalSet.
248#[allow(clippy::new_without_default)]
249impl HostBridge {
250    /// Create a new bridge and spawn the stdin reader task.
251    ///
252    /// Must be called within a tokio LocalSet (uses spawn_local for the
253    /// stdin reader since it's single-threaded).
254    pub fn new() -> Self {
255        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
256            Arc::new(Mutex::new(HashMap::new()));
257        let cancelled = Arc::new(AtomicBool::new(false));
258        let cancel_notify = Arc::new(Notify::new());
259        let queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>> =
260            Arc::new(Mutex::new(VecDeque::new()));
261        let resume_requested = Arc::new(AtomicBool::new(false));
262        let skills_reload_requested = Arc::new(AtomicBool::new(false));
263        let daemon_idle = Arc::new(AtomicBool::new(false));
264
265        // Stdin reader: reads JSON-RPC lines and dispatches responses
266        let pending_clone = pending.clone();
267        let cancelled_clone = cancelled.clone();
268        let cancel_notify_clone = cancel_notify.clone();
269        let queued_clone = queued_user_messages.clone();
270        let resume_clone = resume_requested.clone();
271        let skills_reload_clone = skills_reload_requested.clone();
272        tokio::task::spawn_local(async move {
273            let stdin = tokio::io::stdin();
274            let reader = tokio::io::BufReader::new(stdin);
275            let mut lines = reader.lines();
276
277            while let Ok(Some(line)) = lines.next_line().await {
278                let line = line.trim().to_string();
279                if line.is_empty() {
280                    continue;
281                }
282
283                let msg: serde_json::Value = match serde_json::from_str(&line) {
284                    Ok(v) => v,
285                    Err(_) => continue,
286                };
287
288                // Notifications have no id; responses have one.
289                if msg.get("id").is_none() {
290                    if let Some(method) = msg["method"].as_str() {
291                        if method == "cancel" {
292                            cancelled_clone.store(true, Ordering::SeqCst);
293                            cancel_notify_clone.notify_waiters();
294                        } else if method == "agent/resume" {
295                            resume_clone.store(true, Ordering::SeqCst);
296                        } else if method == "skills/update" {
297                            skills_reload_clone.store(true, Ordering::SeqCst);
298                        } else if method == "user_message"
299                            || method == "session/input"
300                            || method == "agent/user_message"
301                        {
302                            let params = &msg["params"];
303                            let content = params
304                                .get("content")
305                                .and_then(|v| v.as_str())
306                                .unwrap_or("")
307                                .to_string();
308                            if !content.is_empty() {
309                                let mode = QueuedUserMessageMode::from_str(
310                                    params
311                                        .get("mode")
312                                        .and_then(|v| v.as_str())
313                                        .unwrap_or("wait_for_completion"),
314                                );
315                                queued_clone
316                                    .lock()
317                                    .await
318                                    .push_back(QueuedUserMessage { content, mode });
319                            }
320                        }
321                    }
322                    continue;
323                }
324
325                if let Some(id) = msg["id"].as_u64() {
326                    let mut pending = pending_clone.lock().await;
327                    if let Some(sender) = pending.remove(&id) {
328                        let _ = sender.send(msg);
329                    }
330                }
331            }
332
333            // stdin closed: drop pending senders to cancel waiters.
334            let mut pending = pending_clone.lock().await;
335            pending.clear();
336        });
337
338        Self {
339            next_id: AtomicU64::new(1),
340            pending,
341            cancelled,
342            cancel_notify,
343            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
344            session_id: std::sync::Mutex::new(String::new()),
345            script_name: std::sync::Mutex::new(String::new()),
346            queued_user_messages,
347            resume_requested,
348            skills_reload_requested,
349            daemon_idle,
350            prompt_stop_reason: std::sync::Mutex::new(None),
351            visible_call_states: std::sync::Mutex::new(HashMap::new()),
352            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
353            in_process: None,
354        }
355    }
356
357    /// Create a bridge from pre-existing shared state.
358    ///
359    /// Unlike `new()`, does **not** spawn a stdin reader — the caller is
360    /// responsible for dispatching responses into `pending`.  This is used
361    /// by ACP mode which already has its own stdin reader.
362    pub fn from_parts(
363        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
364        cancelled: Arc<AtomicBool>,
365        stdout_lock: Arc<std::sync::Mutex<()>>,
366        start_id: u64,
367    ) -> Self {
368        Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
369    }
370
371    pub fn from_parts_with_writer(
372        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
373        cancelled: Arc<AtomicBool>,
374        writer: HostBridgeWriter,
375        start_id: u64,
376    ) -> Self {
377        Self {
378            next_id: AtomicU64::new(start_id),
379            pending,
380            cancelled,
381            cancel_notify: Arc::new(Notify::new()),
382            writer,
383            session_id: std::sync::Mutex::new(String::new()),
384            script_name: std::sync::Mutex::new(String::new()),
385            queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
386            resume_requested: Arc::new(AtomicBool::new(false)),
387            skills_reload_requested: Arc::new(AtomicBool::new(false)),
388            daemon_idle: Arc::new(AtomicBool::new(false)),
389            prompt_stop_reason: std::sync::Mutex::new(None),
390            visible_call_states: std::sync::Mutex::new(HashMap::new()),
391            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
392            in_process: None,
393        }
394    }
395
396    /// Create an in-process host bridge backed by exported functions from a
397    /// Harn module. Used by `harn playground` to avoid JSON-RPC boilerplate.
398    pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
399        let exported_functions = vm.load_module_exports(module_path).await?;
400        Ok(Self {
401            next_id: AtomicU64::new(1),
402            pending: Arc::new(Mutex::new(HashMap::new())),
403            cancelled: Arc::new(AtomicBool::new(false)),
404            cancel_notify: Arc::new(Notify::new()),
405            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
406            session_id: std::sync::Mutex::new(String::new()),
407            script_name: std::sync::Mutex::new(String::new()),
408            queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
409            resume_requested: Arc::new(AtomicBool::new(false)),
410            skills_reload_requested: Arc::new(AtomicBool::new(false)),
411            daemon_idle: Arc::new(AtomicBool::new(false)),
412            prompt_stop_reason: std::sync::Mutex::new(None),
413            visible_call_states: std::sync::Mutex::new(HashMap::new()),
414            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
415            in_process: Some(InProcessHost {
416                module_path: module_path.to_path_buf(),
417                exported_functions,
418                vm,
419            }),
420        })
421    }
422
423    /// Set the ACP session ID for session-scoped notifications.
424    pub fn set_session_id(&self, id: &str) {
425        *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
426    }
427
428    /// Set the currently executing script name (without .harn suffix).
429    pub fn set_script_name(&self, name: &str) {
430        *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
431    }
432
433    /// Get the current script name.
434    fn get_script_name(&self) -> String {
435        self.script_name
436            .lock()
437            .unwrap_or_else(|e| e.into_inner())
438            .clone()
439    }
440
441    /// Get the session ID.
442    pub fn get_session_id(&self) -> String {
443        self.session_id
444            .lock()
445            .unwrap_or_else(|e| e.into_inner())
446            .clone()
447    }
448
449    /// Write a complete JSON-RPC line to stdout, serialized through a mutex.
450    fn write_line(&self, line: &str) -> Result<(), VmError> {
451        (self.writer)(line).map_err(VmError::Runtime)
452    }
453
454    /// Send a JSON-RPC request to the host and wait for the response.
455    /// Times out after 5 minutes to prevent deadlocks.
456    pub async fn call(
457        &self,
458        method: &str,
459        params: serde_json::Value,
460    ) -> Result<serde_json::Value, VmError> {
461        if let Some(in_process) = &self.in_process {
462            return in_process.dispatch(method, params).await;
463        }
464
465        if self.is_cancelled() {
466            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
467        }
468
469        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
470        let cancel_wait = self.cancel_notify.notified();
471        tokio::pin!(cancel_wait);
472
473        let request = crate::jsonrpc::request(id, method, params);
474
475        let (tx, rx) = oneshot::channel();
476        {
477            let mut pending = self.pending.lock().await;
478            pending.insert(id, tx);
479        }
480
481        let line = serde_json::to_string(&request)
482            .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
483        if let Err(e) = self.write_line(&line) {
484            let mut pending = self.pending.lock().await;
485            pending.remove(&id);
486            return Err(e);
487        }
488
489        if self.is_cancelled() {
490            let mut pending = self.pending.lock().await;
491            pending.remove(&id);
492            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
493        }
494
495        let response = tokio::select! {
496            result = rx => match result {
497                Ok(msg) => msg,
498                Err(_) => {
499                    // Sender dropped: host closed or stdin reader exited.
500                    return Err(VmError::Runtime(
501                        "Bridge: host closed connection before responding".into(),
502                    ));
503                }
504            },
505            _ = &mut cancel_wait => {
506                let mut pending = self.pending.lock().await;
507                pending.remove(&id);
508                return Err(VmError::Runtime("Bridge: operation cancelled".into()));
509            }
510            _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
511                let mut pending = self.pending.lock().await;
512                pending.remove(&id);
513                return Err(VmError::Runtime(format!(
514                    "Bridge: host did not respond to '{method}' within {}s",
515                    DEFAULT_TIMEOUT.as_secs()
516                )));
517            }
518        };
519
520        if let Some(error) = response.get("error") {
521            let message = error["message"].as_str().unwrap_or("Unknown host error");
522            let code = error["code"].as_i64().unwrap_or(-1);
523            // JSON-RPC -32001 signals the host rejected the tool (not permitted / not in allowlist).
524            if code == -32001 {
525                return Err(VmError::CategorizedError {
526                    message: message.to_string(),
527                    category: ErrorCategory::ToolRejected,
528                });
529            }
530            return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
531        }
532
533        Ok(response["result"].clone())
534    }
535
536    /// Send a JSON-RPC notification to the host (no response expected).
537    /// Serialized through the stdout mutex to prevent interleaving.
538    pub fn notify(&self, method: &str, params: serde_json::Value) {
539        let notification = crate::jsonrpc::notification(method, params);
540        if self.in_process.is_some() {
541            return;
542        }
543        if let Ok(line) = serde_json::to_string(&notification) {
544            let _ = self.write_line(&line);
545        }
546    }
547
548    /// Check if the host has sent a cancel notification.
549    pub fn is_cancelled(&self) -> bool {
550        self.cancelled.load(Ordering::SeqCst)
551    }
552
553    pub fn take_resume_signal(&self) -> bool {
554        self.resume_requested.swap(false, Ordering::SeqCst)
555    }
556
557    pub fn signal_resume(&self) {
558        self.resume_requested.store(true, Ordering::SeqCst);
559    }
560
561    pub fn set_daemon_idle(&self, idle: bool) {
562        self.daemon_idle.store(idle, Ordering::SeqCst);
563    }
564
565    pub fn is_daemon_idle(&self) -> bool {
566        self.daemon_idle.load(Ordering::SeqCst)
567    }
568
569    /// Record the canonical ACP `stopReason` for the current prompt. The
570    /// last writer wins, which matches the semantic that an outer
571    /// `agent_loop` (the one whose result the user observes) always
572    /// finalizes after any inner loops it spawned.
573    pub fn set_prompt_stop_reason(&self, reason: &str) {
574        *self
575            .prompt_stop_reason
576            .lock()
577            .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
578    }
579
580    /// Consume any prompt stop reason recorded during this prompt. The
581    /// ACP adapter calls this once after the pipeline returns; pipelines
582    /// that didn't run an `agent_loop` see `None` and the adapter falls
583    /// back to `end_turn`.
584    pub fn take_prompt_stop_reason(&self) -> Option<String> {
585        self.prompt_stop_reason
586            .lock()
587            .unwrap_or_else(|e| e.into_inner())
588            .take()
589    }
590
591    /// Consume any pending `skills/update` signal the host has sent.
592    /// Returns `true` exactly once per notification, letting callers
593    /// trigger a layered-discovery rebuild without polling false
594    /// positives. See issue #73 for the hot-reload contract.
595    pub fn take_skills_reload_signal(&self) -> bool {
596        self.skills_reload_requested.swap(false, Ordering::SeqCst)
597    }
598
599    /// Manually mark the skill catalog as stale. Used by tests and by
600    /// the CLI when an internal event (e.g. `harn install`) should
601    /// trigger the same rebuild a `skills/update` notification would.
602    pub fn signal_skills_reload(&self) {
603        self.skills_reload_requested.store(true, Ordering::SeqCst);
604    }
605
606    /// Call the host's `skills/list` RPC and return the raw JSON array
607    /// it responded with. Shape:
608    /// `[{ "id": "...", "name": "...", "description": "...", "source": "..." }, ...]`.
609    /// The CLI adapter converts each entry into a
610    /// [`crate::skills::SkillManifestRef`].
611    pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
612        let result = self.call("skills/list", serde_json::json!({})).await?;
613        match result {
614            serde_json::Value::Array(items) => Ok(items),
615            serde_json::Value::Object(map) => match map.get("skills") {
616                Some(serde_json::Value::Array(items)) => Ok(items.clone()),
617                _ => Err(VmError::Runtime(
618                    "skills/list: host response must be an array or { skills: [...] }".into(),
619                )),
620            },
621            _ => Err(VmError::Runtime(
622                "skills/list: unexpected response shape".into(),
623            )),
624        }
625    }
626
627    /// Call the host's `host/tools/list` RPC and return normalized tool
628    /// descriptors. Shape:
629    /// `[{ "name": "...", "description": "...", "schema": {...}, "deprecated": false }, ...]`.
630    /// The bridge also accepts `{ "tools": [...] }` and
631    /// `{ "result": { "tools": [...] } }` wrappers for lenient hosts.
632    pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
633        let result = self.call("host/tools/list", serde_json::json!({})).await?;
634        parse_host_tools_list_response(result)
635    }
636
637    /// Call the host's `skills/fetch` RPC for one skill id. Returns the
638    /// raw JSON body so the CLI can inspect both the frontmatter fields
639    /// and the skill markdown body in whatever shape the host sends.
640    pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
641        self.call("skills/fetch", serde_json::json!({ "id": id }))
642            .await
643    }
644
645    pub async fn push_queued_user_message(&self, content: String, mode: &str) {
646        self.queued_user_messages
647            .lock()
648            .await
649            .push_back(QueuedUserMessage {
650                content,
651                mode: QueuedUserMessageMode::from_str(mode),
652            });
653    }
654
655    pub async fn take_queued_user_messages(
656        &self,
657        include_interrupt_immediate: bool,
658        include_finish_step: bool,
659        include_wait_for_completion: bool,
660    ) -> Vec<QueuedUserMessage> {
661        let mut queue = self.queued_user_messages.lock().await;
662        let mut selected = Vec::new();
663        let mut retained = VecDeque::new();
664        while let Some(message) = queue.pop_front() {
665            let should_take = match message.mode {
666                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
667                QueuedUserMessageMode::FinishStep => include_finish_step,
668                QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
669            };
670            if should_take {
671                selected.push(message);
672            } else {
673                retained.push_back(message);
674            }
675        }
676        *queue = retained;
677        selected
678    }
679
680    pub async fn take_queued_user_messages_for(
681        &self,
682        checkpoint: DeliveryCheckpoint,
683    ) -> Vec<QueuedUserMessage> {
684        match checkpoint {
685            DeliveryCheckpoint::InterruptImmediate => {
686                self.take_queued_user_messages(true, false, false).await
687            }
688            DeliveryCheckpoint::AfterCurrentOperation => {
689                self.take_queued_user_messages(false, true, false).await
690            }
691            DeliveryCheckpoint::EndOfInteraction => {
692                self.take_queued_user_messages(false, false, true).await
693            }
694        }
695    }
696
697    /// Send an output notification (for log/print in bridge mode).
698    pub fn send_output(&self, text: &str) {
699        self.notify("output", serde_json::json!({"text": text}));
700    }
701
702    /// Send a progress notification with optional numeric progress and structured data.
703    pub fn send_progress(
704        &self,
705        phase: &str,
706        message: &str,
707        progress: Option<i64>,
708        total: Option<i64>,
709        data: Option<serde_json::Value>,
710    ) {
711        let mut payload = serde_json::json!({"phase": phase, "message": message});
712        if let Some(p) = progress {
713            payload["progress"] = serde_json::json!(p);
714        }
715        if let Some(t) = total {
716            payload["total"] = serde_json::json!(t);
717        }
718        if let Some(d) = data {
719            payload["data"] = d;
720        }
721        self.notify("progress", payload);
722    }
723
724    /// Send a structured log notification.
725    pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
726        let mut payload = serde_json::json!({"level": level, "message": message});
727        if let Some(f) = fields {
728            payload["fields"] = f;
729        }
730        self.notify("log", payload);
731    }
732
733    /// Send a `session/update` with `call_start` — signals the beginning of
734    /// an LLM call, tool call, or builtin call for observability.
735    pub fn send_call_start(
736        &self,
737        call_id: &str,
738        call_type: &str,
739        name: &str,
740        metadata: serde_json::Value,
741    ) {
742        let session_id = self.get_session_id();
743        let script = self.get_script_name();
744        let stream_publicly = metadata
745            .get("stream_publicly")
746            .and_then(|value| value.as_bool())
747            .unwrap_or(true);
748        self.visible_call_streams
749            .lock()
750            .unwrap_or_else(|e| e.into_inner())
751            .insert(call_id.to_string(), stream_publicly);
752        self.notify(
753            "session/update",
754            serde_json::json!({
755                "sessionId": session_id,
756                "update": {
757                    "sessionUpdate": "call_start",
758                    "content": {
759                        "toolCallId": call_id,
760                        "call_type": call_type,
761                        "name": name,
762                        "script": script,
763                        "metadata": metadata,
764                    },
765                },
766            }),
767        );
768    }
769
770    /// Send a `session/update` with `call_progress` — a streaming token delta
771    /// from an in-flight LLM call.
772    pub fn send_call_progress(
773        &self,
774        call_id: &str,
775        delta: &str,
776        accumulated_tokens: u64,
777        user_visible: bool,
778    ) {
779        let session_id = self.get_session_id();
780        let (visible_text, visible_delta) = {
781            let stream_publicly = self
782                .visible_call_streams
783                .lock()
784                .unwrap_or_else(|e| e.into_inner())
785                .get(call_id)
786                .copied()
787                .unwrap_or(true);
788            let mut states = self
789                .visible_call_states
790                .lock()
791                .unwrap_or_else(|e| e.into_inner());
792            let state = states.entry(call_id.to_string()).or_default();
793            state.push(delta, stream_publicly)
794        };
795        self.notify(
796            "session/update",
797            serde_json::json!({
798                "sessionId": session_id,
799                "update": {
800                    "sessionUpdate": "call_progress",
801                    "content": {
802                        "toolCallId": call_id,
803                        "delta": delta,
804                        "accumulated_tokens": accumulated_tokens,
805                        "visible_text": visible_text,
806                        "visible_delta": visible_delta,
807                        "user_visible": user_visible,
808                    },
809                },
810            }),
811        );
812    }
813
814    /// Send a `session/update` with `call_end` — signals completion of a call.
815    pub fn send_call_end(
816        &self,
817        call_id: &str,
818        call_type: &str,
819        name: &str,
820        duration_ms: u64,
821        status: &str,
822        metadata: serde_json::Value,
823    ) {
824        let session_id = self.get_session_id();
825        let script = self.get_script_name();
826        self.visible_call_states
827            .lock()
828            .unwrap_or_else(|e| e.into_inner())
829            .remove(call_id);
830        self.visible_call_streams
831            .lock()
832            .unwrap_or_else(|e| e.into_inner())
833            .remove(call_id);
834        self.notify(
835            "session/update",
836            serde_json::json!({
837                "sessionId": session_id,
838                "update": {
839                    "sessionUpdate": "call_end",
840                    "content": {
841                        "toolCallId": call_id,
842                        "call_type": call_type,
843                        "name": name,
844                        "script": script,
845                        "duration_ms": duration_ms,
846                        "status": status,
847                        "metadata": metadata,
848                    },
849                },
850            }),
851        );
852    }
853
854    /// Send a worker lifecycle update for delegated/background execution.
855    pub fn send_worker_update(
856        &self,
857        worker_id: &str,
858        worker_name: &str,
859        status: &str,
860        metadata: serde_json::Value,
861        audit: Option<&MutationSessionRecord>,
862    ) {
863        let session_id = self.get_session_id();
864        let script = self.get_script_name();
865        let started_at = metadata.get("started_at").cloned().unwrap_or_default();
866        let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
867        let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
868        let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
869        let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
870        let lifecycle = serde_json::json!({
871            "event": status,
872            "worker_id": worker_id,
873            "worker_name": worker_name,
874            "started_at": started_at,
875            "finished_at": finished_at,
876        });
877        self.notify(
878            "session/update",
879            serde_json::json!({
880                "sessionId": session_id,
881                "update": {
882                    "sessionUpdate": "worker_update",
883                    "content": {
884                        "worker_id": worker_id,
885                        "worker_name": worker_name,
886                        "status": status,
887                        "script": script,
888                        "started_at": started_at,
889                        "finished_at": finished_at,
890                        "snapshot_path": snapshot_path,
891                        "run_id": run_id,
892                        "run_path": run_path,
893                        "lifecycle": lifecycle,
894                        "audit": audit,
895                        "metadata": metadata,
896                    },
897                },
898            }),
899        );
900    }
901}
902
903/// Convert a serde_json::Value to a VmValue.
904pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
905    crate::stdlib::json_to_vm_value(val)
906}
907
908fn parse_host_tools_list_response(
909    result: serde_json::Value,
910) -> Result<Vec<serde_json::Value>, VmError> {
911    let tools = match result {
912        serde_json::Value::Array(items) => items,
913        serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
914            map.get("result")
915                .and_then(|value| value.get("tools"))
916                .cloned()
917        }) {
918            Some(serde_json::Value::Array(items)) => items,
919            _ => {
920                return Err(VmError::Runtime(
921                    "host/tools/list: host response must be an array or { tools: [...] }".into(),
922                ));
923            }
924        },
925        _ => {
926            return Err(VmError::Runtime(
927                "host/tools/list: unexpected response shape".into(),
928            ));
929        }
930    };
931
932    let mut normalized = Vec::with_capacity(tools.len());
933    for tool in tools {
934        let serde_json::Value::Object(map) = tool else {
935            return Err(VmError::Runtime(
936                "host/tools/list: every tool must be an object".into(),
937            ));
938        };
939        let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
940            return Err(VmError::Runtime(
941                "host/tools/list: every tool must include a string `name`".into(),
942            ));
943        };
944        let description = map
945            .get("description")
946            .and_then(|value| value.as_str())
947            .or_else(|| {
948                map.get("short_description")
949                    .and_then(|value| value.as_str())
950            })
951            .unwrap_or_default();
952        let schema = map
953            .get("schema")
954            .cloned()
955            .or_else(|| map.get("parameters").cloned())
956            .or_else(|| map.get("input_schema").cloned())
957            .unwrap_or(serde_json::Value::Null);
958        let deprecated = map
959            .get("deprecated")
960            .and_then(|value| value.as_bool())
961            .unwrap_or(false);
962        normalized.push(serde_json::json!({
963            "name": name,
964            "description": description,
965            "schema": schema,
966            "deprecated": deprecated,
967        }));
968    }
969    Ok(normalized)
970}
971
972#[cfg(test)]
973mod tests {
974    use super::*;
975
976    #[test]
977    fn test_json_rpc_request_format() {
978        let request = crate::jsonrpc::request(
979            1,
980            "llm_call",
981            serde_json::json!({
982                "prompt": "Hello",
983                "system": "Be helpful",
984            }),
985        );
986        let s = serde_json::to_string(&request).unwrap();
987        assert!(s.contains("\"jsonrpc\":\"2.0\""));
988        assert!(s.contains("\"id\":1"));
989        assert!(s.contains("\"method\":\"llm_call\""));
990    }
991
992    #[test]
993    fn test_json_rpc_notification_format() {
994        let notification =
995            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
996        let s = serde_json::to_string(&notification).unwrap();
997        assert!(s.contains("\"method\":\"output\""));
998        assert!(!s.contains("\"id\""));
999    }
1000
1001    #[test]
1002    fn test_json_rpc_error_response_parsing() {
1003        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
1004        assert!(response.get("error").is_some());
1005        assert_eq!(
1006            response["error"]["message"].as_str().unwrap(),
1007            "Invalid request"
1008        );
1009    }
1010
1011    #[test]
1012    fn test_json_rpc_success_response_parsing() {
1013        let response = crate::jsonrpc::response(
1014            1,
1015            serde_json::json!({
1016                "text": "Hello world",
1017                "input_tokens": 10,
1018                "output_tokens": 5,
1019            }),
1020        );
1021        assert!(response.get("result").is_some());
1022        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
1023    }
1024
1025    #[test]
1026    fn test_cancelled_flag() {
1027        let cancelled = Arc::new(AtomicBool::new(false));
1028        assert!(!cancelled.load(Ordering::SeqCst));
1029        cancelled.store(true, Ordering::SeqCst);
1030        assert!(cancelled.load(Ordering::SeqCst));
1031    }
1032
1033    #[test]
1034    fn pending_host_calls_return_when_cancellation_arrives() {
1035        let runtime = tokio::runtime::Builder::new_current_thread()
1036            .enable_all()
1037            .build()
1038            .unwrap();
1039        runtime.block_on(async {
1040            let pending = Arc::new(Mutex::new(HashMap::new()));
1041            let cancelled = Arc::new(AtomicBool::new(false));
1042            let bridge = HostBridge::from_parts_with_writer(
1043                pending.clone(),
1044                cancelled.clone(),
1045                Arc::new(|_| Ok(())),
1046                1,
1047            );
1048
1049            let call = bridge.call("host/work", serde_json::json!({}));
1050            tokio::pin!(call);
1051
1052            loop {
1053                tokio::select! {
1054                    result = &mut call => panic!("call completed before cancellation: {result:?}"),
1055                    _ = tokio::task::yield_now() => {}
1056                }
1057                if !pending.lock().await.is_empty() {
1058                    break;
1059                }
1060            }
1061
1062            cancelled.store(true, Ordering::SeqCst);
1063            bridge.cancel_notify.notify_waiters();
1064
1065            let result = tokio::time::timeout(Duration::from_secs(1), call)
1066                .await
1067                .expect("pending call should observe cancellation promptly");
1068            assert!(
1069                matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
1070            );
1071            assert!(pending.lock().await.is_empty());
1072        });
1073    }
1074
1075    #[test]
1076    fn queued_messages_are_filtered_by_delivery_mode() {
1077        let runtime = tokio::runtime::Builder::new_current_thread()
1078            .enable_all()
1079            .build()
1080            .unwrap();
1081        runtime.block_on(async {
1082            let bridge = HostBridge::from_parts(
1083                Arc::new(Mutex::new(HashMap::new())),
1084                Arc::new(AtomicBool::new(false)),
1085                Arc::new(std::sync::Mutex::new(())),
1086                1,
1087            );
1088            bridge
1089                .push_queued_user_message("first".to_string(), "finish_step")
1090                .await;
1091            bridge
1092                .push_queued_user_message("second".to_string(), "wait_for_completion")
1093                .await;
1094
1095            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1096            assert_eq!(finish_step.len(), 1);
1097            assert_eq!(finish_step[0].content, "first");
1098
1099            let turn_end = bridge.take_queued_user_messages(false, false, true).await;
1100            assert_eq!(turn_end.len(), 1);
1101            assert_eq!(turn_end[0].content, "second");
1102        });
1103    }
1104
1105    #[test]
1106    fn test_json_result_to_vm_value_string() {
1107        let val = serde_json::json!("hello");
1108        let vm_val = json_result_to_vm_value(&val);
1109        assert_eq!(vm_val.display(), "hello");
1110    }
1111
1112    #[test]
1113    fn test_json_result_to_vm_value_dict() {
1114        let val = serde_json::json!({"name": "test", "count": 42});
1115        let vm_val = json_result_to_vm_value(&val);
1116        let VmValue::Dict(d) = &vm_val else {
1117            unreachable!("Expected Dict, got {:?}", vm_val);
1118        };
1119        assert_eq!(d.get("name").unwrap().display(), "test");
1120        assert_eq!(d.get("count").unwrap().display(), "42");
1121    }
1122
1123    #[test]
1124    fn test_json_result_to_vm_value_null() {
1125        let val = serde_json::json!(null);
1126        let vm_val = json_result_to_vm_value(&val);
1127        assert!(matches!(vm_val, VmValue::Nil));
1128    }
1129
1130    #[test]
1131    fn test_json_result_to_vm_value_nested() {
1132        let val = serde_json::json!({
1133            "text": "response",
1134            "tool_calls": [
1135                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
1136            ],
1137            "input_tokens": 100,
1138            "output_tokens": 50,
1139        });
1140        let vm_val = json_result_to_vm_value(&val);
1141        let VmValue::Dict(d) = &vm_val else {
1142            unreachable!("Expected Dict, got {:?}", vm_val);
1143        };
1144        assert_eq!(d.get("text").unwrap().display(), "response");
1145        let VmValue::List(list) = d.get("tool_calls").unwrap() else {
1146            unreachable!("Expected List for tool_calls");
1147        };
1148        assert_eq!(list.len(), 1);
1149    }
1150
1151    #[test]
1152    fn parse_host_tools_list_accepts_object_wrapper() {
1153        let tools = parse_host_tools_list_response(serde_json::json!({
1154            "tools": [
1155                {
1156                    "name": "Read",
1157                    "description": "Read a file",
1158                    "schema": {"type": "object"},
1159                }
1160            ]
1161        }))
1162        .expect("tool list");
1163
1164        assert_eq!(tools.len(), 1);
1165        assert_eq!(tools[0]["name"], "Read");
1166        assert_eq!(tools[0]["deprecated"], false);
1167    }
1168
1169    #[test]
1170    fn parse_host_tools_list_accepts_compat_fields() {
1171        let tools = parse_host_tools_list_response(serde_json::json!({
1172            "result": {
1173                "tools": [
1174                    {
1175                        "name": "Edit",
1176                        "short_description": "Apply an edit",
1177                        "input_schema": {"type": "object"},
1178                        "deprecated": true,
1179                    }
1180                ]
1181            }
1182        }))
1183        .expect("tool list");
1184
1185        assert_eq!(tools[0]["description"], "Apply an edit");
1186        assert_eq!(tools[0]["schema"]["type"], "object");
1187        assert_eq!(tools[0]["deprecated"], true);
1188    }
1189
1190    #[test]
1191    fn parse_host_tools_list_requires_tool_names() {
1192        let err = parse_host_tools_list_response(serde_json::json!({
1193            "tools": [
1194                {"description": "missing name"}
1195            ]
1196        }))
1197        .expect_err("expected error");
1198        assert!(err
1199            .to_string()
1200            .contains("host/tools/list: every tool must include a string `name`"));
1201    }
1202
1203    #[test]
1204    fn test_timeout_duration() {
1205        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
1206    }
1207}