Skip to main content

harn_vm/
bridge.rs

//! JSON-RPC 2.0 bridge for host communication.
//!
//! When `harn run --bridge` is used, the VM delegates builtins (llm_call,
//! file I/O, tool execution) to a host process over stdin/stdout JSON-RPC.
//! The host application handles these requests using its own providers.
6
7use std::collections::{BTreeMap, HashMap, VecDeque};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::io::AsyncBufReadExt;
16use tokio::sync::{oneshot, Mutex};
17
18use crate::orchestration::MutationSessionRecord;
19use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
20use crate::visible_text::VisibleTextState;
21use crate::vm::Vm;
22
/// Longest time a bridge call waits for the host to answer before it is
/// abandoned: 300 seconds (five minutes).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5 * 60);
25
/// Transport callback that delivers one serialized JSON-RPC message to the
/// host.
///
/// The callback receives the message text and reports any failure as a
/// human-readable string.
pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;

/// Build a [`HostBridgeWriter`] that emits each message as one line on stdout.
///
/// `stdout_lock` is held for the whole write (payload, newline, flush) so that
/// writers sharing the same lock never interleave their output. A poisoned
/// lock is recovered instead of being treated as an error.
fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
    Arc::new(move |line: &str| {
        let _serialized = stdout_lock
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let mut out = std::io::stdout().lock();
        let write_failed = |e: std::io::Error| format!("Bridge write error: {e}");

        out.write_all(line.as_bytes()).map_err(write_failed)?;
        out.write_all(b"\n").map_err(write_failed)?;
        out.flush().map_err(|e| format!("Bridge flush error: {e}"))
    })
}
44
45/// A JSON-RPC 2.0 bridge to a host process over stdin/stdout.
46///
47/// The bridge sends requests to the host on stdout and receives responses
48/// on stdin. A background task reads stdin and dispatches responses to
49/// waiting callers by request ID. All stdout writes are serialized through
50/// a mutex to prevent interleaving.
51pub struct HostBridge {
52    next_id: AtomicU64,
53    /// Pending request waiters, keyed by JSON-RPC id.
54    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
55    /// Whether the host has sent a cancel notification.
56    cancelled: Arc<AtomicBool>,
57    /// Transport writer used to send JSON-RPC to the host.
58    writer: HostBridgeWriter,
59    /// ACP session ID (set in ACP mode for session-scoped notifications).
60    session_id: std::sync::Mutex<String>,
61    /// Name of the currently executing Harn script (without .harn suffix).
62    script_name: std::sync::Mutex<String>,
63    /// User messages injected by the host while a run is active.
64    queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>>,
65    /// Host-triggered resume signal for daemon agents.
66    resume_requested: Arc<AtomicBool>,
67    /// Host-triggered skill-registry invalidation signal. Set when the
68    /// host sends a `skills/update` notification; consumed by the CLI
69    /// between runs (watch mode, long-running agents) to rebuild the
70    /// layered skill catalog from its current filesystem + host state.
71    skills_reload_requested: Arc<AtomicBool>,
72    /// Whether the current daemon-mode agent loop is blocked in idle wait.
73    daemon_idle: Arc<AtomicBool>,
74    /// Canonical ACP `stopReason` recorded by the most recent `agent_loop`
75    /// finalize during this prompt. Read once by the ACP adapter when the
76    /// pipeline returns and populated by `host_agent_session_finalize`.
77    /// Pipelines that don't run an agent loop leave this `None`, in which
78    /// case the adapter falls back to `end_turn`.
79    prompt_stop_reason: std::sync::Mutex<Option<String>>,
80    /// Per-call visible assistant text state for call_progress notifications.
81    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
82    /// Whether an LLM call's deltas should be exposed to end users while streaming.
83    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
84    /// Optional in-process host-module backend used by `harn playground`.
85    in_process: Option<InProcessHost>,
86}
87
88struct InProcessHost {
89    module_path: PathBuf,
90    exported_functions: BTreeMap<String, Rc<VmClosure>>,
91    vm: Vm,
92}
93
94impl InProcessHost {
95    async fn dispatch(
96        &self,
97        method: &str,
98        params: serde_json::Value,
99    ) -> Result<serde_json::Value, VmError> {
100        match method {
101            "builtin_call" => {
102                let name = params
103                    .get("name")
104                    .and_then(|value| value.as_str())
105                    .unwrap_or_default();
106                let args = params
107                    .get("args")
108                    .and_then(|value| value.as_array())
109                    .cloned()
110                    .unwrap_or_default()
111                    .into_iter()
112                    .map(|value| json_result_to_vm_value(&value))
113                    .collect::<Vec<_>>();
114                self.invoke_export(name, &args).await
115            }
116            "host/tools/list" => self
117                .invoke_optional_export("host_tools_list", &[])
118                .await
119                .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
120            "session/request_permission" => self.request_permission(params).await,
121            other => Err(VmError::Runtime(format!(
122                "playground host backend does not implement bridge method '{other}'"
123            ))),
124        }
125    }
126
127    async fn invoke_export(
128        &self,
129        name: &str,
130        args: &[VmValue],
131    ) -> Result<serde_json::Value, VmError> {
132        let Some(closure) = self.exported_functions.get(name) else {
133            return Err(VmError::Runtime(format!(
134                "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
135                self.module_path.display()
136            )));
137        };
138
139        let mut vm = self.vm.child_vm_for_host();
140        let result = vm.call_closure_pub(closure, args).await?;
141        Ok(crate::llm::vm_value_to_json(&result))
142    }
143
144    async fn invoke_optional_export(
145        &self,
146        name: &str,
147        args: &[VmValue],
148    ) -> Result<Option<serde_json::Value>, VmError> {
149        if !self.exported_functions.contains_key(name) {
150            return Ok(None);
151        }
152        self.invoke_export(name, args).await.map(Some)
153    }
154
155    async fn request_permission(
156        &self,
157        params: serde_json::Value,
158    ) -> Result<serde_json::Value, VmError> {
159        let Some(closure) = self.exported_functions.get("request_permission") else {
160            return Ok(serde_json::json!({ "granted": true }));
161        };
162
163        let tool_name = params
164            .get("toolCall")
165            .and_then(|tool_call| tool_call.get("toolName"))
166            .and_then(|value| value.as_str())
167            .unwrap_or_default();
168        let tool_args = params
169            .get("toolCall")
170            .and_then(|tool_call| tool_call.get("rawInput"))
171            .map(json_result_to_vm_value)
172            .unwrap_or(VmValue::Nil);
173        let full_payload = json_result_to_vm_value(&params);
174
175        let arg_count = closure.func.params.len();
176        let args = if arg_count >= 3 {
177            vec![
178                VmValue::String(Rc::from(tool_name.to_string())),
179                tool_args,
180                full_payload,
181            ]
182        } else if arg_count == 2 {
183            vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
184        } else if arg_count == 1 {
185            vec![full_payload]
186        } else {
187            Vec::new()
188        };
189
190        let mut vm = self.vm.child_vm_for_host();
191        let result = vm.call_closure_pub(closure, &args).await?;
192        let payload = match result {
193            VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
194            VmValue::String(reason) if !reason.is_empty() => {
195                serde_json::json!({ "granted": false, "reason": reason.to_string() })
196            }
197            other => {
198                let json = crate::llm::vm_value_to_json(&other);
199                if json
200                    .get("granted")
201                    .and_then(|value| value.as_bool())
202                    .is_some()
203                    || json.get("outcome").is_some()
204                {
205                    json
206                } else {
207                    serde_json::json!({ "granted": other.is_truthy() })
208                }
209            }
210        };
211        Ok(payload)
212    }
213}
214
/// When a queued user message should be delivered to the running agent.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
    InterruptImmediate,
    FinishStep,
    WaitForCompletion,
}

/// Point in a run at which queued user messages are collected.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
    InterruptImmediate,
    AfterCurrentOperation,
    EndOfInteraction,
}

impl QueuedUserMessageMode {
    /// Parse the wire name of a delivery mode.
    ///
    /// `"interrupt_immediate"` / `"interrupt"` and `"finish_step"` /
    /// `"after_current_operation"` select their respective modes; every
    /// other string (including unknown ones) means `WaitForCompletion`.
    fn from_str(value: &str) -> Self {
        if matches!(value, "interrupt_immediate" | "interrupt") {
            Self::InterruptImmediate
        } else if matches!(value, "finish_step" | "after_current_operation") {
            Self::FinishStep
        } else {
            Self::WaitForCompletion
        }
    }
}

/// A user message injected by the host, together with its delivery mode.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedUserMessage {
    pub content: String,
    pub mode: QueuedUserMessageMode,
}
244
245// Default doesn't apply — new() spawns async tasks requiring a tokio LocalSet.
246#[allow(clippy::new_without_default)]
247impl HostBridge {
248    /// Create a new bridge and spawn the stdin reader task.
249    ///
250    /// Must be called within a tokio LocalSet (uses spawn_local for the
251    /// stdin reader since it's single-threaded).
252    pub fn new() -> Self {
253        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
254            Arc::new(Mutex::new(HashMap::new()));
255        let cancelled = Arc::new(AtomicBool::new(false));
256        let queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>> =
257            Arc::new(Mutex::new(VecDeque::new()));
258        let resume_requested = Arc::new(AtomicBool::new(false));
259        let skills_reload_requested = Arc::new(AtomicBool::new(false));
260        let daemon_idle = Arc::new(AtomicBool::new(false));
261
262        // Stdin reader: reads JSON-RPC lines and dispatches responses
263        let pending_clone = pending.clone();
264        let cancelled_clone = cancelled.clone();
265        let queued_clone = queued_user_messages.clone();
266        let resume_clone = resume_requested.clone();
267        let skills_reload_clone = skills_reload_requested.clone();
268        tokio::task::spawn_local(async move {
269            let stdin = tokio::io::stdin();
270            let reader = tokio::io::BufReader::new(stdin);
271            let mut lines = reader.lines();
272
273            while let Ok(Some(line)) = lines.next_line().await {
274                let line = line.trim().to_string();
275                if line.is_empty() {
276                    continue;
277                }
278
279                let msg: serde_json::Value = match serde_json::from_str(&line) {
280                    Ok(v) => v,
281                    Err(_) => continue,
282                };
283
284                // Notifications have no id; responses have one.
285                if msg.get("id").is_none() {
286                    if let Some(method) = msg["method"].as_str() {
287                        if method == "cancel" {
288                            cancelled_clone.store(true, Ordering::SeqCst);
289                        } else if method == "agent/resume" {
290                            resume_clone.store(true, Ordering::SeqCst);
291                        } else if method == "skills/update" {
292                            skills_reload_clone.store(true, Ordering::SeqCst);
293                        } else if method == "user_message"
294                            || method == "session/input"
295                            || method == "agent/user_message"
296                        {
297                            let params = &msg["params"];
298                            let content = params
299                                .get("content")
300                                .and_then(|v| v.as_str())
301                                .unwrap_or("")
302                                .to_string();
303                            if !content.is_empty() {
304                                let mode = QueuedUserMessageMode::from_str(
305                                    params
306                                        .get("mode")
307                                        .and_then(|v| v.as_str())
308                                        .unwrap_or("wait_for_completion"),
309                                );
310                                queued_clone
311                                    .lock()
312                                    .await
313                                    .push_back(QueuedUserMessage { content, mode });
314                            }
315                        }
316                    }
317                    continue;
318                }
319
320                if let Some(id) = msg["id"].as_u64() {
321                    let mut pending = pending_clone.lock().await;
322                    if let Some(sender) = pending.remove(&id) {
323                        let _ = sender.send(msg);
324                    }
325                }
326            }
327
328            // stdin closed: drop pending senders to cancel waiters.
329            let mut pending = pending_clone.lock().await;
330            pending.clear();
331        });
332
333        Self {
334            next_id: AtomicU64::new(1),
335            pending,
336            cancelled,
337            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
338            session_id: std::sync::Mutex::new(String::new()),
339            script_name: std::sync::Mutex::new(String::new()),
340            queued_user_messages,
341            resume_requested,
342            skills_reload_requested,
343            daemon_idle,
344            prompt_stop_reason: std::sync::Mutex::new(None),
345            visible_call_states: std::sync::Mutex::new(HashMap::new()),
346            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
347            in_process: None,
348        }
349    }
350
351    /// Create a bridge from pre-existing shared state.
352    ///
353    /// Unlike `new()`, does **not** spawn a stdin reader — the caller is
354    /// responsible for dispatching responses into `pending`.  This is used
355    /// by ACP mode which already has its own stdin reader.
356    pub fn from_parts(
357        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
358        cancelled: Arc<AtomicBool>,
359        stdout_lock: Arc<std::sync::Mutex<()>>,
360        start_id: u64,
361    ) -> Self {
362        Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
363    }
364
365    pub fn from_parts_with_writer(
366        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
367        cancelled: Arc<AtomicBool>,
368        writer: HostBridgeWriter,
369        start_id: u64,
370    ) -> Self {
371        Self {
372            next_id: AtomicU64::new(start_id),
373            pending,
374            cancelled,
375            writer,
376            session_id: std::sync::Mutex::new(String::new()),
377            script_name: std::sync::Mutex::new(String::new()),
378            queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
379            resume_requested: Arc::new(AtomicBool::new(false)),
380            skills_reload_requested: Arc::new(AtomicBool::new(false)),
381            daemon_idle: Arc::new(AtomicBool::new(false)),
382            prompt_stop_reason: std::sync::Mutex::new(None),
383            visible_call_states: std::sync::Mutex::new(HashMap::new()),
384            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
385            in_process: None,
386        }
387    }
388
389    /// Create an in-process host bridge backed by exported functions from a
390    /// Harn module. Used by `harn playground` to avoid JSON-RPC boilerplate.
391    pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
392        let exported_functions = vm.load_module_exports(module_path).await?;
393        Ok(Self {
394            next_id: AtomicU64::new(1),
395            pending: Arc::new(Mutex::new(HashMap::new())),
396            cancelled: Arc::new(AtomicBool::new(false)),
397            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
398            session_id: std::sync::Mutex::new(String::new()),
399            script_name: std::sync::Mutex::new(String::new()),
400            queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
401            resume_requested: Arc::new(AtomicBool::new(false)),
402            skills_reload_requested: Arc::new(AtomicBool::new(false)),
403            daemon_idle: Arc::new(AtomicBool::new(false)),
404            prompt_stop_reason: std::sync::Mutex::new(None),
405            visible_call_states: std::sync::Mutex::new(HashMap::new()),
406            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
407            in_process: Some(InProcessHost {
408                module_path: module_path.to_path_buf(),
409                exported_functions,
410                vm,
411            }),
412        })
413    }
414
415    /// Set the ACP session ID for session-scoped notifications.
416    pub fn set_session_id(&self, id: &str) {
417        *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
418    }
419
420    /// Set the currently executing script name (without .harn suffix).
421    pub fn set_script_name(&self, name: &str) {
422        *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
423    }
424
425    /// Get the current script name.
426    fn get_script_name(&self) -> String {
427        self.script_name
428            .lock()
429            .unwrap_or_else(|e| e.into_inner())
430            .clone()
431    }
432
433    /// Get the session ID.
434    pub fn get_session_id(&self) -> String {
435        self.session_id
436            .lock()
437            .unwrap_or_else(|e| e.into_inner())
438            .clone()
439    }
440
441    /// Write a complete JSON-RPC line to stdout, serialized through a mutex.
442    fn write_line(&self, line: &str) -> Result<(), VmError> {
443        (self.writer)(line).map_err(VmError::Runtime)
444    }
445
446    /// Send a JSON-RPC request to the host and wait for the response.
447    /// Times out after 5 minutes to prevent deadlocks.
448    pub async fn call(
449        &self,
450        method: &str,
451        params: serde_json::Value,
452    ) -> Result<serde_json::Value, VmError> {
453        if let Some(in_process) = &self.in_process {
454            return in_process.dispatch(method, params).await;
455        }
456
457        if self.is_cancelled() {
458            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
459        }
460
461        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
462
463        let request = crate::jsonrpc::request(id, method, params);
464
465        let (tx, rx) = oneshot::channel();
466        {
467            let mut pending = self.pending.lock().await;
468            pending.insert(id, tx);
469        }
470
471        let line = serde_json::to_string(&request)
472            .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
473        if let Err(e) = self.write_line(&line) {
474            let mut pending = self.pending.lock().await;
475            pending.remove(&id);
476            return Err(e);
477        }
478
479        let response = match tokio::time::timeout(DEFAULT_TIMEOUT, rx).await {
480            Ok(Ok(msg)) => msg,
481            Ok(Err(_)) => {
482                // Sender dropped: host closed or stdin reader exited.
483                return Err(VmError::Runtime(
484                    "Bridge: host closed connection before responding".into(),
485                ));
486            }
487            Err(_) => {
488                let mut pending = self.pending.lock().await;
489                pending.remove(&id);
490                return Err(VmError::Runtime(format!(
491                    "Bridge: host did not respond to '{method}' within {}s",
492                    DEFAULT_TIMEOUT.as_secs()
493                )));
494            }
495        };
496
497        if let Some(error) = response.get("error") {
498            let message = error["message"].as_str().unwrap_or("Unknown host error");
499            let code = error["code"].as_i64().unwrap_or(-1);
500            // JSON-RPC -32001 signals the host rejected the tool (not permitted / not in allowlist).
501            if code == -32001 {
502                return Err(VmError::CategorizedError {
503                    message: message.to_string(),
504                    category: ErrorCategory::ToolRejected,
505                });
506            }
507            return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
508        }
509
510        Ok(response["result"].clone())
511    }
512
513    /// Send a JSON-RPC notification to the host (no response expected).
514    /// Serialized through the stdout mutex to prevent interleaving.
515    pub fn notify(&self, method: &str, params: serde_json::Value) {
516        let notification = crate::jsonrpc::notification(method, params);
517        if self.in_process.is_some() {
518            return;
519        }
520        if let Ok(line) = serde_json::to_string(&notification) {
521            let _ = self.write_line(&line);
522        }
523    }
524
525    /// Check if the host has sent a cancel notification.
526    pub fn is_cancelled(&self) -> bool {
527        self.cancelled.load(Ordering::SeqCst)
528    }
529
530    pub fn take_resume_signal(&self) -> bool {
531        self.resume_requested.swap(false, Ordering::SeqCst)
532    }
533
534    pub fn signal_resume(&self) {
535        self.resume_requested.store(true, Ordering::SeqCst);
536    }
537
538    pub fn set_daemon_idle(&self, idle: bool) {
539        self.daemon_idle.store(idle, Ordering::SeqCst);
540    }
541
542    pub fn is_daemon_idle(&self) -> bool {
543        self.daemon_idle.load(Ordering::SeqCst)
544    }
545
546    /// Record the canonical ACP `stopReason` for the current prompt. The
547    /// last writer wins, which matches the semantic that an outer
548    /// `agent_loop` (the one whose result the user observes) always
549    /// finalizes after any inner loops it spawned.
550    pub fn set_prompt_stop_reason(&self, reason: &str) {
551        *self
552            .prompt_stop_reason
553            .lock()
554            .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
555    }
556
557    /// Consume any prompt stop reason recorded during this prompt. The
558    /// ACP adapter calls this once after the pipeline returns; pipelines
559    /// that didn't run an `agent_loop` see `None` and the adapter falls
560    /// back to `end_turn`.
561    pub fn take_prompt_stop_reason(&self) -> Option<String> {
562        self.prompt_stop_reason
563            .lock()
564            .unwrap_or_else(|e| e.into_inner())
565            .take()
566    }
567
568    /// Consume any pending `skills/update` signal the host has sent.
569    /// Returns `true` exactly once per notification, letting callers
570    /// trigger a layered-discovery rebuild without polling false
571    /// positives. See issue #73 for the hot-reload contract.
572    pub fn take_skills_reload_signal(&self) -> bool {
573        self.skills_reload_requested.swap(false, Ordering::SeqCst)
574    }
575
576    /// Manually mark the skill catalog as stale. Used by tests and by
577    /// the CLI when an internal event (e.g. `harn install`) should
578    /// trigger the same rebuild a `skills/update` notification would.
579    pub fn signal_skills_reload(&self) {
580        self.skills_reload_requested.store(true, Ordering::SeqCst);
581    }
582
583    /// Call the host's `skills/list` RPC and return the raw JSON array
584    /// it responded with. Shape:
585    /// `[{ "id": "...", "name": "...", "description": "...", "source": "..." }, ...]`.
586    /// The CLI adapter converts each entry into a
587    /// [`crate::skills::SkillManifestRef`].
588    pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
589        let result = self.call("skills/list", serde_json::json!({})).await?;
590        match result {
591            serde_json::Value::Array(items) => Ok(items),
592            serde_json::Value::Object(map) => match map.get("skills") {
593                Some(serde_json::Value::Array(items)) => Ok(items.clone()),
594                _ => Err(VmError::Runtime(
595                    "skills/list: host response must be an array or { skills: [...] }".into(),
596                )),
597            },
598            _ => Err(VmError::Runtime(
599                "skills/list: unexpected response shape".into(),
600            )),
601        }
602    }
603
604    /// Call the host's `host/tools/list` RPC and return normalized tool
605    /// descriptors. Shape:
606    /// `[{ "name": "...", "description": "...", "schema": {...}, "deprecated": false }, ...]`.
607    /// The bridge also accepts `{ "tools": [...] }` and
608    /// `{ "result": { "tools": [...] } }` wrappers for lenient hosts.
609    pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
610        let result = self.call("host/tools/list", serde_json::json!({})).await?;
611        parse_host_tools_list_response(result)
612    }
613
614    /// Call the host's `skills/fetch` RPC for one skill id. Returns the
615    /// raw JSON body so the CLI can inspect both the frontmatter fields
616    /// and the skill markdown body in whatever shape the host sends.
617    pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
618        self.call("skills/fetch", serde_json::json!({ "id": id }))
619            .await
620    }
621
622    pub async fn push_queued_user_message(&self, content: String, mode: &str) {
623        self.queued_user_messages
624            .lock()
625            .await
626            .push_back(QueuedUserMessage {
627                content,
628                mode: QueuedUserMessageMode::from_str(mode),
629            });
630    }
631
632    pub async fn take_queued_user_messages(
633        &self,
634        include_interrupt_immediate: bool,
635        include_finish_step: bool,
636        include_wait_for_completion: bool,
637    ) -> Vec<QueuedUserMessage> {
638        let mut queue = self.queued_user_messages.lock().await;
639        let mut selected = Vec::new();
640        let mut retained = VecDeque::new();
641        while let Some(message) = queue.pop_front() {
642            let should_take = match message.mode {
643                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
644                QueuedUserMessageMode::FinishStep => include_finish_step,
645                QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
646            };
647            if should_take {
648                selected.push(message);
649            } else {
650                retained.push_back(message);
651            }
652        }
653        *queue = retained;
654        selected
655    }
656
657    pub async fn take_queued_user_messages_for(
658        &self,
659        checkpoint: DeliveryCheckpoint,
660    ) -> Vec<QueuedUserMessage> {
661        match checkpoint {
662            DeliveryCheckpoint::InterruptImmediate => {
663                self.take_queued_user_messages(true, false, false).await
664            }
665            DeliveryCheckpoint::AfterCurrentOperation => {
666                self.take_queued_user_messages(false, true, false).await
667            }
668            DeliveryCheckpoint::EndOfInteraction => {
669                self.take_queued_user_messages(false, false, true).await
670            }
671        }
672    }
673
674    /// Send an output notification (for log/print in bridge mode).
675    pub fn send_output(&self, text: &str) {
676        self.notify("output", serde_json::json!({"text": text}));
677    }
678
679    /// Send a progress notification with optional numeric progress and structured data.
680    pub fn send_progress(
681        &self,
682        phase: &str,
683        message: &str,
684        progress: Option<i64>,
685        total: Option<i64>,
686        data: Option<serde_json::Value>,
687    ) {
688        let mut payload = serde_json::json!({"phase": phase, "message": message});
689        if let Some(p) = progress {
690            payload["progress"] = serde_json::json!(p);
691        }
692        if let Some(t) = total {
693            payload["total"] = serde_json::json!(t);
694        }
695        if let Some(d) = data {
696            payload["data"] = d;
697        }
698        self.notify("progress", payload);
699    }
700
701    /// Send a structured log notification.
702    pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
703        let mut payload = serde_json::json!({"level": level, "message": message});
704        if let Some(f) = fields {
705            payload["fields"] = f;
706        }
707        self.notify("log", payload);
708    }
709
710    /// Send a `session/update` with `call_start` — signals the beginning of
711    /// an LLM call, tool call, or builtin call for observability.
712    pub fn send_call_start(
713        &self,
714        call_id: &str,
715        call_type: &str,
716        name: &str,
717        metadata: serde_json::Value,
718    ) {
719        let session_id = self.get_session_id();
720        let script = self.get_script_name();
721        let stream_publicly = metadata
722            .get("stream_publicly")
723            .and_then(|value| value.as_bool())
724            .unwrap_or(true);
725        self.visible_call_streams
726            .lock()
727            .unwrap_or_else(|e| e.into_inner())
728            .insert(call_id.to_string(), stream_publicly);
729        self.notify(
730            "session/update",
731            serde_json::json!({
732                "sessionId": session_id,
733                "update": {
734                    "sessionUpdate": "call_start",
735                    "content": {
736                        "toolCallId": call_id,
737                        "call_type": call_type,
738                        "name": name,
739                        "script": script,
740                        "metadata": metadata,
741                    },
742                },
743            }),
744        );
745    }
746
747    /// Send a `session/update` with `call_progress` — a streaming token delta
748    /// from an in-flight LLM call.
749    pub fn send_call_progress(
750        &self,
751        call_id: &str,
752        delta: &str,
753        accumulated_tokens: u64,
754        user_visible: bool,
755    ) {
756        let session_id = self.get_session_id();
757        let (visible_text, visible_delta) = {
758            let stream_publicly = self
759                .visible_call_streams
760                .lock()
761                .unwrap_or_else(|e| e.into_inner())
762                .get(call_id)
763                .copied()
764                .unwrap_or(true);
765            let mut states = self
766                .visible_call_states
767                .lock()
768                .unwrap_or_else(|e| e.into_inner());
769            let state = states.entry(call_id.to_string()).or_default();
770            state.push(delta, stream_publicly)
771        };
772        self.notify(
773            "session/update",
774            serde_json::json!({
775                "sessionId": session_id,
776                "update": {
777                    "sessionUpdate": "call_progress",
778                    "content": {
779                        "toolCallId": call_id,
780                        "delta": delta,
781                        "accumulated_tokens": accumulated_tokens,
782                        "visible_text": visible_text,
783                        "visible_delta": visible_delta,
784                        "user_visible": user_visible,
785                    },
786                },
787            }),
788        );
789    }
790
791    /// Send a `session/update` with `call_end` — signals completion of a call.
792    pub fn send_call_end(
793        &self,
794        call_id: &str,
795        call_type: &str,
796        name: &str,
797        duration_ms: u64,
798        status: &str,
799        metadata: serde_json::Value,
800    ) {
801        let session_id = self.get_session_id();
802        let script = self.get_script_name();
803        self.visible_call_states
804            .lock()
805            .unwrap_or_else(|e| e.into_inner())
806            .remove(call_id);
807        self.visible_call_streams
808            .lock()
809            .unwrap_or_else(|e| e.into_inner())
810            .remove(call_id);
811        self.notify(
812            "session/update",
813            serde_json::json!({
814                "sessionId": session_id,
815                "update": {
816                    "sessionUpdate": "call_end",
817                    "content": {
818                        "toolCallId": call_id,
819                        "call_type": call_type,
820                        "name": name,
821                        "script": script,
822                        "duration_ms": duration_ms,
823                        "status": status,
824                        "metadata": metadata,
825                    },
826                },
827            }),
828        );
829    }
830
831    /// Send a worker lifecycle update for delegated/background execution.
832    pub fn send_worker_update(
833        &self,
834        worker_id: &str,
835        worker_name: &str,
836        status: &str,
837        metadata: serde_json::Value,
838        audit: Option<&MutationSessionRecord>,
839    ) {
840        let session_id = self.get_session_id();
841        let script = self.get_script_name();
842        let started_at = metadata.get("started_at").cloned().unwrap_or_default();
843        let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
844        let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
845        let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
846        let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
847        let lifecycle = serde_json::json!({
848            "event": status,
849            "worker_id": worker_id,
850            "worker_name": worker_name,
851            "started_at": started_at,
852            "finished_at": finished_at,
853        });
854        self.notify(
855            "session/update",
856            serde_json::json!({
857                "sessionId": session_id,
858                "update": {
859                    "sessionUpdate": "worker_update",
860                    "content": {
861                        "worker_id": worker_id,
862                        "worker_name": worker_name,
863                        "status": status,
864                        "script": script,
865                        "started_at": started_at,
866                        "finished_at": finished_at,
867                        "snapshot_path": snapshot_path,
868                        "run_id": run_id,
869                        "run_path": run_path,
870                        "lifecycle": lifecycle,
871                        "audit": audit,
872                        "metadata": metadata,
873                    },
874                },
875            }),
876        );
877    }
878}
879
880/// Convert a serde_json::Value to a VmValue.
881pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
882    crate::stdlib::json_to_vm_value(val)
883}
884
885fn parse_host_tools_list_response(
886    result: serde_json::Value,
887) -> Result<Vec<serde_json::Value>, VmError> {
888    let tools = match result {
889        serde_json::Value::Array(items) => items,
890        serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
891            map.get("result")
892                .and_then(|value| value.get("tools"))
893                .cloned()
894        }) {
895            Some(serde_json::Value::Array(items)) => items,
896            _ => {
897                return Err(VmError::Runtime(
898                    "host/tools/list: host response must be an array or { tools: [...] }".into(),
899                ));
900            }
901        },
902        _ => {
903            return Err(VmError::Runtime(
904                "host/tools/list: unexpected response shape".into(),
905            ));
906        }
907    };
908
909    let mut normalized = Vec::with_capacity(tools.len());
910    for tool in tools {
911        let serde_json::Value::Object(map) = tool else {
912            return Err(VmError::Runtime(
913                "host/tools/list: every tool must be an object".into(),
914            ));
915        };
916        let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
917            return Err(VmError::Runtime(
918                "host/tools/list: every tool must include a string `name`".into(),
919            ));
920        };
921        let description = map
922            .get("description")
923            .and_then(|value| value.as_str())
924            .or_else(|| {
925                map.get("short_description")
926                    .and_then(|value| value.as_str())
927            })
928            .unwrap_or_default();
929        let schema = map
930            .get("schema")
931            .cloned()
932            .or_else(|| map.get("parameters").cloned())
933            .or_else(|| map.get("input_schema").cloned())
934            .unwrap_or(serde_json::Value::Null);
935        let deprecated = map
936            .get("deprecated")
937            .and_then(|value| value.as_bool())
938            .unwrap_or(false);
939        normalized.push(serde_json::json!({
940            "name": name,
941            "description": description,
942            "schema": schema,
943            "deprecated": deprecated,
944        }));
945    }
946    Ok(normalized)
947}
948
#[cfg(test)]
mod tests {
    use super::*;

    /// A serialized request carries the protocol version, the id and the method.
    #[test]
    fn test_json_rpc_request_format() {
        let params = serde_json::json!({
            "prompt": "Hello",
            "system": "Be helpful",
        });
        let encoded =
            serde_json::to_string(&crate::jsonrpc::request(1, "llm_call", params)).unwrap();
        for needle in [
            "\"jsonrpc\":\"2.0\"",
            "\"id\":1",
            "\"method\":\"llm_call\"",
        ] {
            assert!(encoded.contains(needle));
        }
    }

    /// A serialized notification names its method and has no `id` member.
    #[test]
    fn test_json_rpc_notification_format() {
        let params = serde_json::json!({"text": "[harn] hello\n"});
        let encoded =
            serde_json::to_string(&crate::jsonrpc::notification("output", params)).unwrap();
        assert!(encoded.contains("\"method\":\"output\""));
        assert!(!encoded.contains("\"id\""));
    }

    /// An error response exposes its message under `error.message`.
    #[test]
    fn test_json_rpc_error_response_parsing() {
        let reply = crate::jsonrpc::error_response(1, -32600, "Invalid request");
        assert!(reply.get("error").is_some());
        let message = reply["error"]["message"].as_str().unwrap();
        assert_eq!(message, "Invalid request");
    }

    /// A success response exposes its payload under `result`.
    #[test]
    fn test_json_rpc_success_response_parsing() {
        let payload = serde_json::json!({
            "text": "Hello world",
            "input_tokens": 10,
            "output_tokens": 5,
        });
        let reply = crate::jsonrpc::response(1, payload);
        assert!(reply.get("result").is_some());
        let text = reply["result"]["text"].as_str().unwrap();
        assert_eq!(text, "Hello world");
    }

    /// A shared atomic flag starts out clear and reads back as set after a store.
    #[test]
    fn test_cancelled_flag() {
        let flag = Arc::new(AtomicBool::new(false));
        assert!(!flag.load(Ordering::SeqCst));
        flag.store(true, Ordering::SeqCst);
        assert!(flag.load(Ordering::SeqCst));
    }

    /// Each take returns only the queued message whose delivery mode was
    /// selected by the flags passed to `take_queued_user_messages`.
    #[test]
    fn queued_messages_are_filtered_by_delivery_mode() {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async {
            let bridge = HostBridge::from_parts(
                Arc::new(Mutex::new(HashMap::new())),
                Arc::new(AtomicBool::new(false)),
                Arc::new(std::sync::Mutex::new(())),
                1,
            );
            for (text, mode) in [
                ("first", "finish_step"),
                ("second", "wait_for_completion"),
            ] {
                bridge.push_queued_user_message(text.to_string(), mode).await;
            }

            let at_finish_step = bridge.take_queued_user_messages(false, true, false).await;
            assert_eq!(at_finish_step.len(), 1);
            assert_eq!(at_finish_step[0].content, "first");

            let at_turn_end = bridge.take_queued_user_messages(false, false, true).await;
            assert_eq!(at_turn_end.len(), 1);
            assert_eq!(at_turn_end[0].content, "second");
        });
    }

    /// A JSON string converts to a value that displays as the bare string.
    #[test]
    fn test_json_result_to_vm_value_string() {
        let converted = json_result_to_vm_value(&serde_json::json!("hello"));
        assert_eq!(converted.display(), "hello");
    }

    /// A JSON object converts to a `VmValue::Dict` with matching entries.
    #[test]
    fn test_json_result_to_vm_value_dict() {
        let source = serde_json::json!({"name": "test", "count": 42});
        let converted = json_result_to_vm_value(&source);
        let VmValue::Dict(fields) = &converted else {
            unreachable!("Expected Dict, got {:?}", converted);
        };
        for (key, shown) in [("name", "test"), ("count", "42")] {
            assert_eq!(fields.get(key).unwrap().display(), shown);
        }
    }

    /// JSON `null` converts to `VmValue::Nil`.
    #[test]
    fn test_json_result_to_vm_value_null() {
        let converted = json_result_to_vm_value(&serde_json::json!(null));
        assert!(matches!(converted, VmValue::Nil));
    }

    /// Nested arrays inside an object convert to `VmValue::List`.
    #[test]
    fn test_json_result_to_vm_value_nested() {
        let source = serde_json::json!({
            "text": "response",
            "tool_calls": [
                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
            ],
            "input_tokens": 100,
            "output_tokens": 50,
        });
        let converted = json_result_to_vm_value(&source);
        let VmValue::Dict(fields) = &converted else {
            unreachable!("Expected Dict, got {:?}", converted);
        };
        assert_eq!(fields.get("text").unwrap().display(), "response");
        let VmValue::List(calls) = fields.get("tool_calls").unwrap() else {
            unreachable!("Expected List for tool_calls");
        };
        assert_eq!(calls.len(), 1);
    }

    /// A `{ "tools": [...] }` wrapper is accepted and `deprecated` defaults to false.
    #[test]
    fn parse_host_tools_list_accepts_object_wrapper() {
        let response = serde_json::json!({
            "tools": [
                {
                    "name": "Read",
                    "description": "Read a file",
                    "schema": {"type": "object"},
                }
            ]
        });
        let parsed = parse_host_tools_list_response(response).expect("tool list");

        assert_eq!(parsed.len(), 1);
        let first = &parsed[0];
        assert_eq!(first["name"], "Read");
        assert_eq!(first["deprecated"], false);
    }

    /// The `result.tools` wrapper and the `short_description` / `input_schema`
    /// aliases are accepted.
    #[test]
    fn parse_host_tools_list_accepts_compat_fields() {
        let response = serde_json::json!({
            "result": {
                "tools": [
                    {
                        "name": "Edit",
                        "short_description": "Apply an edit",
                        "input_schema": {"type": "object"},
                        "deprecated": true,
                    }
                ]
            }
        });
        let parsed = parse_host_tools_list_response(response).expect("tool list");

        let first = &parsed[0];
        assert_eq!(first["description"], "Apply an edit");
        assert_eq!(first["schema"]["type"], "object");
        assert_eq!(first["deprecated"], true);
    }

    /// A tool without a `name` is rejected with a descriptive error.
    #[test]
    fn parse_host_tools_list_requires_tool_names() {
        let response = serde_json::json!({
            "tools": [
                {"description": "missing name"}
            ]
        });
        let failure = parse_host_tools_list_response(response).expect_err("expected error");
        let rendered = failure.to_string();
        assert!(rendered.contains("host/tools/list: every tool must include a string `name`"));
    }

    /// The default bridge timeout is 300 seconds.
    #[test]
    fn test_timeout_duration() {
        let secs = DEFAULT_TIMEOUT.as_secs();
        assert_eq!(secs, 300);
    }
}