Skip to main content

harn_vm/
bridge.rs

1//! JSON-RPC 2.0 bridge for host communication.
2//!
3//! When `harn run --bridge` is used, the VM delegates builtins (llm_call,
4//! file I/O, tool execution) to a host process over stdin/stdout JSON-RPC.
5//! The host application handles these requests using its own providers.
6
7use std::collections::{BTreeMap, HashMap, VecDeque};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::io::AsyncBufReadExt;
16use tokio::sync::{oneshot, Mutex};
17
18use crate::orchestration::MutationSessionRecord;
19use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
20use crate::visible_text::VisibleTextState;
21use crate::vm::Vm;
22
23/// Default timeout for bridge calls (5 minutes).
24const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
25
26/// A JSON-RPC 2.0 bridge to a host process over stdin/stdout.
27///
28/// The bridge sends requests to the host on stdout and receives responses
29/// on stdin. A background task reads stdin and dispatches responses to
30/// waiting callers by request ID. All stdout writes are serialized through
31/// a mutex to prevent interleaving.
32pub struct HostBridge {
33    next_id: AtomicU64,
34    /// Pending request waiters, keyed by JSON-RPC id.
35    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
36    /// Whether the host has sent a cancel notification.
37    cancelled: Arc<AtomicBool>,
38    /// Mutex protecting stdout writes to prevent interleaving.
39    stdout_lock: Arc<std::sync::Mutex<()>>,
40    /// ACP session ID (set in ACP mode for session-scoped notifications).
41    session_id: std::sync::Mutex<String>,
42    /// Name of the currently executing Harn script (without .harn suffix).
43    script_name: std::sync::Mutex<String>,
44    /// User messages injected by the host while a run is active.
45    queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>>,
46    /// Host-triggered resume signal for daemon agents.
47    resume_requested: Arc<AtomicBool>,
48    /// Host-triggered skill-registry invalidation signal. Set when the
49    /// host sends a `skills/update` notification; consumed by the CLI
50    /// between runs (watch mode, long-running agents) to rebuild the
51    /// layered skill catalog from its current filesystem + host state.
52    skills_reload_requested: Arc<AtomicBool>,
53    /// Whether the current daemon-mode agent loop is blocked in idle wait.
54    daemon_idle: Arc<AtomicBool>,
55    /// Per-call visible assistant text state for call_progress notifications.
56    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
57    /// Whether an LLM call's deltas should be exposed to end users while streaming.
58    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
59    /// Optional in-process host-module backend used by `harn playground`.
60    in_process: Option<InProcessHost>,
61    #[cfg(test)]
62    recorded_notifications: Arc<std::sync::Mutex<Vec<serde_json::Value>>>,
63}
64
65struct InProcessHost {
66    module_path: PathBuf,
67    exported_functions: BTreeMap<String, Rc<VmClosure>>,
68    vm: Vm,
69}
70
71impl InProcessHost {
72    async fn dispatch(
73        &self,
74        method: &str,
75        params: serde_json::Value,
76    ) -> Result<serde_json::Value, VmError> {
77        match method {
78            "builtin_call" => {
79                let name = params
80                    .get("name")
81                    .and_then(|value| value.as_str())
82                    .unwrap_or_default();
83                let args = params
84                    .get("args")
85                    .and_then(|value| value.as_array())
86                    .cloned()
87                    .unwrap_or_default()
88                    .into_iter()
89                    .map(|value| json_result_to_vm_value(&value))
90                    .collect::<Vec<_>>();
91                self.invoke_export(name, &args).await
92            }
93            "host/tools/list" => self
94                .invoke_optional_export("host_tools_list", &[])
95                .await
96                .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
97            "session/request_permission" => self.request_permission(params).await,
98            other => Err(VmError::Runtime(format!(
99                "playground host backend does not implement bridge method '{other}'"
100            ))),
101        }
102    }
103
104    async fn invoke_export(
105        &self,
106        name: &str,
107        args: &[VmValue],
108    ) -> Result<serde_json::Value, VmError> {
109        let Some(closure) = self.exported_functions.get(name) else {
110            return Err(VmError::Runtime(format!(
111                "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
112                self.module_path.display()
113            )));
114        };
115
116        let mut vm = self.vm.child_vm_for_host();
117        let result = vm.call_closure_pub(closure, args, &[]).await?;
118        Ok(crate::llm::vm_value_to_json(&result))
119    }
120
121    async fn invoke_optional_export(
122        &self,
123        name: &str,
124        args: &[VmValue],
125    ) -> Result<Option<serde_json::Value>, VmError> {
126        if !self.exported_functions.contains_key(name) {
127            return Ok(None);
128        }
129        self.invoke_export(name, args).await.map(Some)
130    }
131
132    async fn request_permission(
133        &self,
134        params: serde_json::Value,
135    ) -> Result<serde_json::Value, VmError> {
136        let Some(closure) = self.exported_functions.get("request_permission") else {
137            return Ok(serde_json::json!({ "granted": true }));
138        };
139
140        let tool_name = params
141            .get("toolCall")
142            .and_then(|tool_call| tool_call.get("toolName"))
143            .and_then(|value| value.as_str())
144            .unwrap_or_default();
145        let tool_args = params
146            .get("toolCall")
147            .and_then(|tool_call| tool_call.get("rawInput"))
148            .map(json_result_to_vm_value)
149            .unwrap_or(VmValue::Nil);
150        let full_payload = json_result_to_vm_value(&params);
151
152        let arg_count = closure.func.params.len();
153        let args = if arg_count >= 3 {
154            vec![
155                VmValue::String(Rc::from(tool_name.to_string())),
156                tool_args,
157                full_payload,
158            ]
159        } else if arg_count == 2 {
160            vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
161        } else if arg_count == 1 {
162            vec![full_payload]
163        } else {
164            Vec::new()
165        };
166
167        let mut vm = self.vm.child_vm_for_host();
168        let result = vm.call_closure_pub(closure, &args, &[]).await?;
169        let payload = match result {
170            VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
171            VmValue::String(reason) if !reason.is_empty() => {
172                serde_json::json!({ "granted": false, "reason": reason.to_string() })
173            }
174            other => {
175                let json = crate::llm::vm_value_to_json(&other);
176                if json
177                    .get("granted")
178                    .and_then(|value| value.as_bool())
179                    .is_some()
180                    || json.get("outcome").is_some()
181                {
182                    json
183                } else {
184                    serde_json::json!({ "granted": other.is_truthy() })
185                }
186            }
187        };
188        Ok(payload)
189    }
190}
191
192#[derive(Clone, Debug, PartialEq, Eq)]
193pub enum QueuedUserMessageMode {
194    InterruptImmediate,
195    FinishStep,
196    WaitForCompletion,
197}
198
199#[derive(Clone, Copy, Debug, PartialEq, Eq)]
200pub enum DeliveryCheckpoint {
201    InterruptImmediate,
202    AfterCurrentOperation,
203    EndOfInteraction,
204}
205
206impl QueuedUserMessageMode {
207    fn from_str(value: &str) -> Self {
208        match value {
209            "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
210            "finish_step" | "after_current_operation" => Self::FinishStep,
211            _ => Self::WaitForCompletion,
212        }
213    }
214}
215
216#[derive(Clone, Debug, PartialEq, Eq)]
217pub struct QueuedUserMessage {
218    pub content: String,
219    pub mode: QueuedUserMessageMode,
220}
221
222// Default doesn't apply — new() spawns async tasks requiring a tokio LocalSet.
223#[allow(clippy::new_without_default)]
224impl HostBridge {
225    /// Create a new bridge and spawn the stdin reader task.
226    ///
227    /// Must be called within a tokio LocalSet (uses spawn_local for the
228    /// stdin reader since it's single-threaded).
229    pub fn new() -> Self {
230        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
231            Arc::new(Mutex::new(HashMap::new()));
232        let cancelled = Arc::new(AtomicBool::new(false));
233        let queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>> =
234            Arc::new(Mutex::new(VecDeque::new()));
235        let resume_requested = Arc::new(AtomicBool::new(false));
236        let skills_reload_requested = Arc::new(AtomicBool::new(false));
237        let daemon_idle = Arc::new(AtomicBool::new(false));
238
239        // Stdin reader: reads JSON-RPC lines and dispatches responses
240        let pending_clone = pending.clone();
241        let cancelled_clone = cancelled.clone();
242        let queued_clone = queued_user_messages.clone();
243        let resume_clone = resume_requested.clone();
244        let skills_reload_clone = skills_reload_requested.clone();
245        tokio::task::spawn_local(async move {
246            let stdin = tokio::io::stdin();
247            let reader = tokio::io::BufReader::new(stdin);
248            let mut lines = reader.lines();
249
250            while let Ok(Some(line)) = lines.next_line().await {
251                let line = line.trim().to_string();
252                if line.is_empty() {
253                    continue;
254                }
255
256                let msg: serde_json::Value = match serde_json::from_str(&line) {
257                    Ok(v) => v,
258                    Err(_) => continue,
259                };
260
261                // Notifications have no id; responses have one.
262                if msg.get("id").is_none() {
263                    if let Some(method) = msg["method"].as_str() {
264                        if method == "cancel" {
265                            cancelled_clone.store(true, Ordering::SeqCst);
266                        } else if method == "agent/resume" {
267                            resume_clone.store(true, Ordering::SeqCst);
268                        } else if method == "skills/update" {
269                            skills_reload_clone.store(true, Ordering::SeqCst);
270                        } else if method == "user_message"
271                            || method == "session/input"
272                            || method == "agent/user_message"
273                        {
274                            let params = &msg["params"];
275                            let content = params
276                                .get("content")
277                                .and_then(|v| v.as_str())
278                                .unwrap_or("")
279                                .to_string();
280                            if !content.is_empty() {
281                                let mode = QueuedUserMessageMode::from_str(
282                                    params
283                                        .get("mode")
284                                        .and_then(|v| v.as_str())
285                                        .unwrap_or("wait_for_completion"),
286                                );
287                                queued_clone
288                                    .lock()
289                                    .await
290                                    .push_back(QueuedUserMessage { content, mode });
291                            }
292                        }
293                    }
294                    continue;
295                }
296
297                if let Some(id) = msg["id"].as_u64() {
298                    let mut pending = pending_clone.lock().await;
299                    if let Some(sender) = pending.remove(&id) {
300                        let _ = sender.send(msg);
301                    }
302                }
303            }
304
305            // stdin closed: drop pending senders to cancel waiters.
306            let mut pending = pending_clone.lock().await;
307            pending.clear();
308        });
309
310        Self {
311            next_id: AtomicU64::new(1),
312            pending,
313            cancelled,
314            stdout_lock: Arc::new(std::sync::Mutex::new(())),
315            session_id: std::sync::Mutex::new(String::new()),
316            script_name: std::sync::Mutex::new(String::new()),
317            queued_user_messages,
318            resume_requested,
319            skills_reload_requested,
320            daemon_idle,
321            visible_call_states: std::sync::Mutex::new(HashMap::new()),
322            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
323            in_process: None,
324            #[cfg(test)]
325            recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
326        }
327    }
328
329    /// Create a bridge from pre-existing shared state.
330    ///
331    /// Unlike `new()`, does **not** spawn a stdin reader — the caller is
332    /// responsible for dispatching responses into `pending`.  This is used
333    /// by ACP mode which already has its own stdin reader.
334    pub fn from_parts(
335        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
336        cancelled: Arc<AtomicBool>,
337        stdout_lock: Arc<std::sync::Mutex<()>>,
338        start_id: u64,
339    ) -> Self {
340        Self {
341            next_id: AtomicU64::new(start_id),
342            pending,
343            cancelled,
344            stdout_lock,
345            session_id: std::sync::Mutex::new(String::new()),
346            script_name: std::sync::Mutex::new(String::new()),
347            queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
348            resume_requested: Arc::new(AtomicBool::new(false)),
349            skills_reload_requested: Arc::new(AtomicBool::new(false)),
350            daemon_idle: Arc::new(AtomicBool::new(false)),
351            visible_call_states: std::sync::Mutex::new(HashMap::new()),
352            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
353            in_process: None,
354            #[cfg(test)]
355            recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
356        }
357    }
358
359    /// Create an in-process host bridge backed by exported functions from a
360    /// Harn module. Used by `harn playground` to avoid JSON-RPC boilerplate.
361    pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
362        let exported_functions = vm.load_module_exports(module_path).await?;
363        Ok(Self {
364            next_id: AtomicU64::new(1),
365            pending: Arc::new(Mutex::new(HashMap::new())),
366            cancelled: Arc::new(AtomicBool::new(false)),
367            stdout_lock: Arc::new(std::sync::Mutex::new(())),
368            session_id: std::sync::Mutex::new(String::new()),
369            script_name: std::sync::Mutex::new(String::new()),
370            queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
371            resume_requested: Arc::new(AtomicBool::new(false)),
372            skills_reload_requested: Arc::new(AtomicBool::new(false)),
373            daemon_idle: Arc::new(AtomicBool::new(false)),
374            visible_call_states: std::sync::Mutex::new(HashMap::new()),
375            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
376            in_process: Some(InProcessHost {
377                module_path: module_path.to_path_buf(),
378                exported_functions,
379                vm,
380            }),
381            #[cfg(test)]
382            recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
383        })
384    }
385
386    /// Set the ACP session ID for session-scoped notifications.
387    pub fn set_session_id(&self, id: &str) {
388        *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
389    }
390
391    /// Set the currently executing script name (without .harn suffix).
392    pub fn set_script_name(&self, name: &str) {
393        *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
394    }
395
396    /// Get the current script name.
397    fn get_script_name(&self) -> String {
398        self.script_name
399            .lock()
400            .unwrap_or_else(|e| e.into_inner())
401            .clone()
402    }
403
404    /// Get the session ID.
405    fn get_session_id(&self) -> String {
406        self.session_id
407            .lock()
408            .unwrap_or_else(|e| e.into_inner())
409            .clone()
410    }
411
412    /// Write a complete JSON-RPC line to stdout, serialized through a mutex.
413    fn write_line(&self, line: &str) -> Result<(), VmError> {
414        let _guard = self.stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
415        let mut stdout = std::io::stdout().lock();
416        stdout
417            .write_all(line.as_bytes())
418            .map_err(|e| VmError::Runtime(format!("Bridge write error: {e}")))?;
419        stdout
420            .write_all(b"\n")
421            .map_err(|e| VmError::Runtime(format!("Bridge write error: {e}")))?;
422        stdout
423            .flush()
424            .map_err(|e| VmError::Runtime(format!("Bridge flush error: {e}")))?;
425        Ok(())
426    }
427
428    /// Send a JSON-RPC request to the host and wait for the response.
429    /// Times out after 5 minutes to prevent deadlocks.
430    pub async fn call(
431        &self,
432        method: &str,
433        params: serde_json::Value,
434    ) -> Result<serde_json::Value, VmError> {
435        if let Some(in_process) = &self.in_process {
436            return in_process.dispatch(method, params).await;
437        }
438
439        if self.is_cancelled() {
440            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
441        }
442
443        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
444
445        let request = crate::jsonrpc::request(id, method, params);
446
447        let (tx, rx) = oneshot::channel();
448        {
449            let mut pending = self.pending.lock().await;
450            pending.insert(id, tx);
451        }
452
453        let line = serde_json::to_string(&request)
454            .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
455        if let Err(e) = self.write_line(&line) {
456            let mut pending = self.pending.lock().await;
457            pending.remove(&id);
458            return Err(e);
459        }
460
461        let response = match tokio::time::timeout(DEFAULT_TIMEOUT, rx).await {
462            Ok(Ok(msg)) => msg,
463            Ok(Err(_)) => {
464                // Sender dropped: host closed or stdin reader exited.
465                return Err(VmError::Runtime(
466                    "Bridge: host closed connection before responding".into(),
467                ));
468            }
469            Err(_) => {
470                let mut pending = self.pending.lock().await;
471                pending.remove(&id);
472                return Err(VmError::Runtime(format!(
473                    "Bridge: host did not respond to '{method}' within {}s",
474                    DEFAULT_TIMEOUT.as_secs()
475                )));
476            }
477        };
478
479        if let Some(error) = response.get("error") {
480            let message = error["message"].as_str().unwrap_or("Unknown host error");
481            let code = error["code"].as_i64().unwrap_or(-1);
482            // JSON-RPC -32001 signals the host rejected the tool (not permitted / not in allowlist).
483            if code == -32001 {
484                return Err(VmError::CategorizedError {
485                    message: message.to_string(),
486                    category: ErrorCategory::ToolRejected,
487                });
488            }
489            return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
490        }
491
492        Ok(response["result"].clone())
493    }
494
495    /// Send a JSON-RPC notification to the host (no response expected).
496    /// Serialized through the stdout mutex to prevent interleaving.
497    pub fn notify(&self, method: &str, params: serde_json::Value) {
498        let notification = crate::jsonrpc::notification(method, params);
499        #[cfg(test)]
500        self.recorded_notifications
501            .lock()
502            .unwrap_or_else(|e| e.into_inner())
503            .push(notification.clone());
504        if self.in_process.is_some() {
505            return;
506        }
507        if let Ok(line) = serde_json::to_string(&notification) {
508            let _ = self.write_line(&line);
509        }
510    }
511
512    #[cfg(test)]
513    pub(crate) fn recorded_notifications(&self) -> Vec<serde_json::Value> {
514        self.recorded_notifications
515            .lock()
516            .unwrap_or_else(|e| e.into_inner())
517            .clone()
518    }
519
520    /// Check if the host has sent a cancel notification.
521    pub fn is_cancelled(&self) -> bool {
522        self.cancelled.load(Ordering::SeqCst)
523    }
524
525    pub fn take_resume_signal(&self) -> bool {
526        self.resume_requested.swap(false, Ordering::SeqCst)
527    }
528
529    pub fn signal_resume(&self) {
530        self.resume_requested.store(true, Ordering::SeqCst);
531    }
532
533    pub fn set_daemon_idle(&self, idle: bool) {
534        self.daemon_idle.store(idle, Ordering::SeqCst);
535    }
536
537    pub fn is_daemon_idle(&self) -> bool {
538        self.daemon_idle.load(Ordering::SeqCst)
539    }
540
541    /// Consume any pending `skills/update` signal the host has sent.
542    /// Returns `true` exactly once per notification, letting callers
543    /// trigger a layered-discovery rebuild without polling false
544    /// positives. See issue #73 for the hot-reload contract.
545    pub fn take_skills_reload_signal(&self) -> bool {
546        self.skills_reload_requested.swap(false, Ordering::SeqCst)
547    }
548
549    /// Manually mark the skill catalog as stale. Used by tests and by
550    /// the CLI when an internal event (e.g. `harn install`) should
551    /// trigger the same rebuild a `skills/update` notification would.
552    pub fn signal_skills_reload(&self) {
553        self.skills_reload_requested.store(true, Ordering::SeqCst);
554    }
555
556    /// Call the host's `skills/list` RPC and return the raw JSON array
557    /// it responded with. Shape:
558    /// `[{ "id": "...", "name": "...", "description": "...", "source": "..." }, ...]`.
559    /// The CLI adapter converts each entry into a
560    /// [`crate::skills::SkillManifestRef`].
561    pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
562        let result = self.call("skills/list", serde_json::json!({})).await?;
563        match result {
564            serde_json::Value::Array(items) => Ok(items),
565            serde_json::Value::Object(map) => match map.get("skills") {
566                Some(serde_json::Value::Array(items)) => Ok(items.clone()),
567                _ => Err(VmError::Runtime(
568                    "skills/list: host response must be an array or { skills: [...] }".into(),
569                )),
570            },
571            _ => Err(VmError::Runtime(
572                "skills/list: unexpected response shape".into(),
573            )),
574        }
575    }
576
577    /// Call the host's `host/tools/list` RPC and return normalized tool
578    /// descriptors. Shape:
579    /// `[{ "name": "...", "description": "...", "schema": {...}, "deprecated": false }, ...]`.
580    /// The bridge also accepts `{ "tools": [...] }` and
581    /// `{ "result": { "tools": [...] } }` wrappers for lenient hosts.
582    pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
583        let result = self.call("host/tools/list", serde_json::json!({})).await?;
584        parse_host_tools_list_response(result)
585    }
586
587    /// Call the host's `skills/fetch` RPC for one skill id. Returns the
588    /// raw JSON body so the CLI can inspect both the frontmatter fields
589    /// and the skill markdown body in whatever shape the host sends.
590    pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
591        self.call("skills/fetch", serde_json::json!({ "id": id }))
592            .await
593    }
594
595    pub async fn push_queued_user_message(&self, content: String, mode: &str) {
596        self.queued_user_messages
597            .lock()
598            .await
599            .push_back(QueuedUserMessage {
600                content,
601                mode: QueuedUserMessageMode::from_str(mode),
602            });
603    }
604
605    pub async fn take_queued_user_messages(
606        &self,
607        include_interrupt_immediate: bool,
608        include_finish_step: bool,
609        include_wait_for_completion: bool,
610    ) -> Vec<QueuedUserMessage> {
611        let mut queue = self.queued_user_messages.lock().await;
612        let mut selected = Vec::new();
613        let mut retained = VecDeque::new();
614        while let Some(message) = queue.pop_front() {
615            let should_take = match message.mode {
616                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
617                QueuedUserMessageMode::FinishStep => include_finish_step,
618                QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
619            };
620            if should_take {
621                selected.push(message);
622            } else {
623                retained.push_back(message);
624            }
625        }
626        *queue = retained;
627        selected
628    }
629
630    pub async fn take_queued_user_messages_for(
631        &self,
632        checkpoint: DeliveryCheckpoint,
633    ) -> Vec<QueuedUserMessage> {
634        match checkpoint {
635            DeliveryCheckpoint::InterruptImmediate => {
636                self.take_queued_user_messages(true, false, false).await
637            }
638            DeliveryCheckpoint::AfterCurrentOperation => {
639                self.take_queued_user_messages(false, true, false).await
640            }
641            DeliveryCheckpoint::EndOfInteraction => {
642                self.take_queued_user_messages(false, false, true).await
643            }
644        }
645    }
646
647    /// Send an output notification (for log/print in bridge mode).
648    pub fn send_output(&self, text: &str) {
649        self.notify("output", serde_json::json!({"text": text}));
650    }
651
652    /// Send a progress notification with optional numeric progress and structured data.
653    pub fn send_progress(
654        &self,
655        phase: &str,
656        message: &str,
657        progress: Option<i64>,
658        total: Option<i64>,
659        data: Option<serde_json::Value>,
660    ) {
661        let mut payload = serde_json::json!({"phase": phase, "message": message});
662        if let Some(p) = progress {
663            payload["progress"] = serde_json::json!(p);
664        }
665        if let Some(t) = total {
666            payload["total"] = serde_json::json!(t);
667        }
668        if let Some(d) = data {
669            payload["data"] = d;
670        }
671        self.notify("progress", payload);
672    }
673
674    /// Send a structured log notification.
675    pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
676        let mut payload = serde_json::json!({"level": level, "message": message});
677        if let Some(f) = fields {
678            payload["fields"] = f;
679        }
680        self.notify("log", payload);
681    }
682
683    /// Send a `session/update` with `call_start` — signals the beginning of
684    /// an LLM call, tool call, or builtin call for observability.
685    pub fn send_call_start(
686        &self,
687        call_id: &str,
688        call_type: &str,
689        name: &str,
690        metadata: serde_json::Value,
691    ) {
692        let session_id = self.get_session_id();
693        let script = self.get_script_name();
694        let stream_publicly = metadata
695            .get("stream_publicly")
696            .and_then(|value| value.as_bool())
697            .unwrap_or(true);
698        self.visible_call_streams
699            .lock()
700            .unwrap_or_else(|e| e.into_inner())
701            .insert(call_id.to_string(), stream_publicly);
702        self.notify(
703            "session/update",
704            serde_json::json!({
705                "sessionId": session_id,
706                "update": {
707                    "sessionUpdate": "call_start",
708                    "content": {
709                        "toolCallId": call_id,
710                        "call_type": call_type,
711                        "name": name,
712                        "script": script,
713                        "metadata": metadata,
714                    },
715                },
716            }),
717        );
718    }
719
720    /// Send a `session/update` with `call_progress` — a streaming token delta
721    /// from an in-flight LLM call.
722    pub fn send_call_progress(
723        &self,
724        call_id: &str,
725        delta: &str,
726        accumulated_tokens: u64,
727        user_visible: bool,
728    ) {
729        let session_id = self.get_session_id();
730        let (visible_text, visible_delta) = {
731            let stream_publicly = self
732                .visible_call_streams
733                .lock()
734                .unwrap_or_else(|e| e.into_inner())
735                .get(call_id)
736                .copied()
737                .unwrap_or(true);
738            let mut states = self
739                .visible_call_states
740                .lock()
741                .unwrap_or_else(|e| e.into_inner());
742            let state = states.entry(call_id.to_string()).or_default();
743            state.push(delta, stream_publicly)
744        };
745        self.notify(
746            "session/update",
747            serde_json::json!({
748                "sessionId": session_id,
749                "update": {
750                    "sessionUpdate": "call_progress",
751                    "content": {
752                        "toolCallId": call_id,
753                        "delta": delta,
754                        "accumulated_tokens": accumulated_tokens,
755                        "visible_text": visible_text,
756                        "visible_delta": visible_delta,
757                        "user_visible": user_visible,
758                    },
759                },
760            }),
761        );
762    }
763
764    /// Send a `session/update` with `call_end` — signals completion of a call.
765    pub fn send_call_end(
766        &self,
767        call_id: &str,
768        call_type: &str,
769        name: &str,
770        duration_ms: u64,
771        status: &str,
772        metadata: serde_json::Value,
773    ) {
774        let session_id = self.get_session_id();
775        let script = self.get_script_name();
776        self.visible_call_states
777            .lock()
778            .unwrap_or_else(|e| e.into_inner())
779            .remove(call_id);
780        self.visible_call_streams
781            .lock()
782            .unwrap_or_else(|e| e.into_inner())
783            .remove(call_id);
784        self.notify(
785            "session/update",
786            serde_json::json!({
787                "sessionId": session_id,
788                "update": {
789                    "sessionUpdate": "call_end",
790                    "content": {
791                        "toolCallId": call_id,
792                        "call_type": call_type,
793                        "name": name,
794                        "script": script,
795                        "duration_ms": duration_ms,
796                        "status": status,
797                        "metadata": metadata,
798                    },
799                },
800            }),
801        );
802    }
803
804    /// Send a worker lifecycle update for delegated/background execution.
805    pub fn send_worker_update(
806        &self,
807        worker_id: &str,
808        worker_name: &str,
809        status: &str,
810        metadata: serde_json::Value,
811        audit: Option<&MutationSessionRecord>,
812    ) {
813        let session_id = self.get_session_id();
814        let script = self.get_script_name();
815        let started_at = metadata.get("started_at").cloned().unwrap_or_default();
816        let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
817        let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
818        let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
819        let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
820        let lifecycle = serde_json::json!({
821            "event": status,
822            "worker_id": worker_id,
823            "worker_name": worker_name,
824            "started_at": started_at,
825            "finished_at": finished_at,
826        });
827        self.notify(
828            "session/update",
829            serde_json::json!({
830                "sessionId": session_id,
831                "update": {
832                    "sessionUpdate": "worker_update",
833                    "content": {
834                        "worker_id": worker_id,
835                        "worker_name": worker_name,
836                        "status": status,
837                        "script": script,
838                        "started_at": started_at,
839                        "finished_at": finished_at,
840                        "snapshot_path": snapshot_path,
841                        "run_id": run_id,
842                        "run_path": run_path,
843                        "lifecycle": lifecycle,
844                        "audit": audit,
845                        "metadata": metadata,
846                    },
847                },
848            }),
849        );
850    }
851}
852
853/// Convert a serde_json::Value to a VmValue.
854pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
855    crate::stdlib::json_to_vm_value(val)
856}
857
858fn parse_host_tools_list_response(
859    result: serde_json::Value,
860) -> Result<Vec<serde_json::Value>, VmError> {
861    let tools = match result {
862        serde_json::Value::Array(items) => items,
863        serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
864            map.get("result")
865                .and_then(|value| value.get("tools"))
866                .cloned()
867        }) {
868            Some(serde_json::Value::Array(items)) => items,
869            _ => {
870                return Err(VmError::Runtime(
871                    "host/tools/list: host response must be an array or { tools: [...] }".into(),
872                ));
873            }
874        },
875        _ => {
876            return Err(VmError::Runtime(
877                "host/tools/list: unexpected response shape".into(),
878            ));
879        }
880    };
881
882    let mut normalized = Vec::with_capacity(tools.len());
883    for tool in tools {
884        let serde_json::Value::Object(map) = tool else {
885            return Err(VmError::Runtime(
886                "host/tools/list: every tool must be an object".into(),
887            ));
888        };
889        let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
890            return Err(VmError::Runtime(
891                "host/tools/list: every tool must include a string `name`".into(),
892            ));
893        };
894        let description = map
895            .get("description")
896            .and_then(|value| value.as_str())
897            .or_else(|| {
898                map.get("short_description")
899                    .and_then(|value| value.as_str())
900            })
901            .unwrap_or_default();
902        let schema = map
903            .get("schema")
904            .cloned()
905            .or_else(|| map.get("parameters").cloned())
906            .or_else(|| map.get("input_schema").cloned())
907            .unwrap_or(serde_json::Value::Null);
908        let deprecated = map
909            .get("deprecated")
910            .and_then(|value| value.as_bool())
911            .unwrap_or(false);
912        normalized.push(serde_json::json!({
913            "name": name,
914            "description": description,
915            "schema": schema,
916            "deprecated": deprecated,
917        }));
918    }
919    Ok(normalized)
920}
921
922#[cfg(test)]
923mod tests {
924    use super::*;
925
926    #[test]
927    fn test_json_rpc_request_format() {
928        let request = crate::jsonrpc::request(
929            1,
930            "llm_call",
931            serde_json::json!({
932                "prompt": "Hello",
933                "system": "Be helpful",
934            }),
935        );
936        let s = serde_json::to_string(&request).unwrap();
937        assert!(s.contains("\"jsonrpc\":\"2.0\""));
938        assert!(s.contains("\"id\":1"));
939        assert!(s.contains("\"method\":\"llm_call\""));
940    }
941
942    #[test]
943    fn test_json_rpc_notification_format() {
944        let notification =
945            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
946        let s = serde_json::to_string(&notification).unwrap();
947        assert!(s.contains("\"method\":\"output\""));
948        assert!(!s.contains("\"id\""));
949    }
950
951    #[test]
952    fn test_json_rpc_error_response_parsing() {
953        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
954        assert!(response.get("error").is_some());
955        assert_eq!(
956            response["error"]["message"].as_str().unwrap(),
957            "Invalid request"
958        );
959    }
960
961    #[test]
962    fn test_json_rpc_success_response_parsing() {
963        let response = crate::jsonrpc::response(
964            1,
965            serde_json::json!({
966                "text": "Hello world",
967                "input_tokens": 10,
968                "output_tokens": 5,
969            }),
970        );
971        assert!(response.get("result").is_some());
972        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
973    }
974
975    #[test]
976    fn test_cancelled_flag() {
977        let cancelled = Arc::new(AtomicBool::new(false));
978        assert!(!cancelled.load(Ordering::SeqCst));
979        cancelled.store(true, Ordering::SeqCst);
980        assert!(cancelled.load(Ordering::SeqCst));
981    }
982
983    #[test]
984    fn queued_messages_are_filtered_by_delivery_mode() {
985        let runtime = tokio::runtime::Builder::new_current_thread()
986            .enable_all()
987            .build()
988            .unwrap();
989        runtime.block_on(async {
990            let bridge = HostBridge::from_parts(
991                Arc::new(Mutex::new(HashMap::new())),
992                Arc::new(AtomicBool::new(false)),
993                Arc::new(std::sync::Mutex::new(())),
994                1,
995            );
996            bridge
997                .push_queued_user_message("first".to_string(), "finish_step")
998                .await;
999            bridge
1000                .push_queued_user_message("second".to_string(), "wait_for_completion")
1001                .await;
1002
1003            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1004            assert_eq!(finish_step.len(), 1);
1005            assert_eq!(finish_step[0].content, "first");
1006
1007            let turn_end = bridge.take_queued_user_messages(false, false, true).await;
1008            assert_eq!(turn_end.len(), 1);
1009            assert_eq!(turn_end[0].content, "second");
1010        });
1011    }
1012
1013    #[test]
1014    fn test_json_result_to_vm_value_string() {
1015        let val = serde_json::json!("hello");
1016        let vm_val = json_result_to_vm_value(&val);
1017        assert_eq!(vm_val.display(), "hello");
1018    }
1019
1020    #[test]
1021    fn test_json_result_to_vm_value_dict() {
1022        let val = serde_json::json!({"name": "test", "count": 42});
1023        let vm_val = json_result_to_vm_value(&val);
1024        let VmValue::Dict(d) = &vm_val else {
1025            unreachable!("Expected Dict, got {:?}", vm_val);
1026        };
1027        assert_eq!(d.get("name").unwrap().display(), "test");
1028        assert_eq!(d.get("count").unwrap().display(), "42");
1029    }
1030
1031    #[test]
1032    fn test_json_result_to_vm_value_null() {
1033        let val = serde_json::json!(null);
1034        let vm_val = json_result_to_vm_value(&val);
1035        assert!(matches!(vm_val, VmValue::Nil));
1036    }
1037
1038    #[test]
1039    fn test_json_result_to_vm_value_nested() {
1040        let val = serde_json::json!({
1041            "text": "response",
1042            "tool_calls": [
1043                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
1044            ],
1045            "input_tokens": 100,
1046            "output_tokens": 50,
1047        });
1048        let vm_val = json_result_to_vm_value(&val);
1049        let VmValue::Dict(d) = &vm_val else {
1050            unreachable!("Expected Dict, got {:?}", vm_val);
1051        };
1052        assert_eq!(d.get("text").unwrap().display(), "response");
1053        let VmValue::List(list) = d.get("tool_calls").unwrap() else {
1054            unreachable!("Expected List for tool_calls");
1055        };
1056        assert_eq!(list.len(), 1);
1057    }
1058
1059    #[test]
1060    fn parse_host_tools_list_accepts_object_wrapper() {
1061        let tools = parse_host_tools_list_response(serde_json::json!({
1062            "tools": [
1063                {
1064                    "name": "Read",
1065                    "description": "Read a file",
1066                    "schema": {"type": "object"},
1067                }
1068            ]
1069        }))
1070        .expect("tool list");
1071
1072        assert_eq!(tools.len(), 1);
1073        assert_eq!(tools[0]["name"], "Read");
1074        assert_eq!(tools[0]["deprecated"], false);
1075    }
1076
1077    #[test]
1078    fn parse_host_tools_list_accepts_compat_fields() {
1079        let tools = parse_host_tools_list_response(serde_json::json!({
1080            "result": {
1081                "tools": [
1082                    {
1083                        "name": "Edit",
1084                        "short_description": "Apply an edit",
1085                        "input_schema": {"type": "object"},
1086                        "deprecated": true,
1087                    }
1088                ]
1089            }
1090        }))
1091        .expect("tool list");
1092
1093        assert_eq!(tools[0]["description"], "Apply an edit");
1094        assert_eq!(tools[0]["schema"]["type"], "object");
1095        assert_eq!(tools[0]["deprecated"], true);
1096    }
1097
1098    #[test]
1099    fn parse_host_tools_list_requires_tool_names() {
1100        let err = parse_host_tools_list_response(serde_json::json!({
1101            "tools": [
1102                {"description": "missing name"}
1103            ]
1104        }))
1105        .expect_err("expected error");
1106        assert!(err
1107            .to_string()
1108            .contains("host/tools/list: every tool must include a string `name`"));
1109    }
1110
1111    #[test]
1112    fn test_timeout_duration() {
1113        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
1114    }
1115}