Skip to main content

harn_vm/
bridge.rs

1//! JSON-RPC 2.0 bridge for host communication.
2//!
3//! When `harn run --bridge` is used, the VM delegates builtins (llm_call,
4//! file I/O, tool execution) to a host process over stdin/stdout JSON-RPC.
5//! The host application handles these requests using its own providers.
6
7use std::collections::{BTreeMap, HashMap, VecDeque};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::io::AsyncBufReadExt;
16use tokio::sync::{oneshot, Mutex};
17
18use crate::orchestration::MutationSessionRecord;
19use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
20use crate::visible_text::VisibleTextState;
21use crate::vm::Vm;
22
/// Default timeout for bridge calls (5 minutes).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);

/// Shared callback that delivers one line of text to the host.
///
/// The callback receives the line without a trailing newline and returns
/// `Err` with a human-readable message when the line could not be sent.
pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
27
28fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
29    Arc::new(move |line: &str| {
30        let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
31        let mut stdout = std::io::stdout().lock();
32        stdout
33            .write_all(line.as_bytes())
34            .map_err(|e| format!("Bridge write error: {e}"))?;
35        stdout
36            .write_all(b"\n")
37            .map_err(|e| format!("Bridge write error: {e}"))?;
38        stdout
39            .flush()
40            .map_err(|e| format!("Bridge flush error: {e}"))?;
41        Ok(())
42    })
43}
44
/// A JSON-RPC 2.0 bridge to a host process over stdin/stdout.
///
/// The bridge sends requests to the host on stdout and receives responses
/// on stdin. A background task reads stdin and dispatches responses to
/// waiting callers by request ID. All stdout writes are serialized through
/// a mutex to prevent interleaving.
pub struct HostBridge {
    /// Counter that supplies the JSON-RPC id of the next outgoing request.
    next_id: AtomicU64,
    /// Pending request waiters, keyed by JSON-RPC id.
    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
    /// Whether the host has sent a cancel notification.
    cancelled: Arc<AtomicBool>,
    /// Transport writer used to send JSON-RPC to the host.
    writer: HostBridgeWriter,
    /// ACP session ID (set in ACP mode for session-scoped notifications).
    session_id: std::sync::Mutex<String>,
    /// Name of the currently executing Harn script (without .harn suffix).
    script_name: std::sync::Mutex<String>,
    /// User messages injected by the host while a run is active.
    queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>>,
    /// Host-triggered resume signal for daemon agents.
    resume_requested: Arc<AtomicBool>,
    /// Host-triggered skill-registry invalidation signal. Set when the
    /// host sends a `skills/update` notification; consumed by the CLI
    /// between runs (watch mode, long-running agents) to rebuild the
    /// layered skill catalog from its current filesystem + host state.
    skills_reload_requested: Arc<AtomicBool>,
    /// Whether the current daemon-mode agent loop is blocked in idle wait.
    daemon_idle: Arc<AtomicBool>,
    /// Per-call visible assistant text state for call_progress notifications.
    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
    /// Whether an LLM call's deltas should be exposed to end users while streaming.
    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
    /// Optional in-process host-module backend used by `harn playground`.
    /// When set, `call` dispatches to it instead of writing JSON-RPC.
    in_process: Option<InProcessHost>,
    /// Test-only log of every notification passed to `notify`.
    #[cfg(test)]
    recorded_notifications: Arc<std::sync::Mutex<Vec<serde_json::Value>>>,
}
83
/// In-process replacement for the JSON-RPC host, backed by functions
/// exported from a Harn module.
struct InProcessHost {
    /// Path of the module the exports were loaded from; quoted in the
    /// error raised when a requested export is missing.
    module_path: PathBuf,
    /// Exported closures, looked up by function name.
    exported_functions: BTreeMap<String, Rc<VmClosure>>,
    /// VM from which a child VM is derived (`child_vm_for_host`) for each
    /// export invocation.
    vm: Vm,
}
89
90impl InProcessHost {
91    async fn dispatch(
92        &self,
93        method: &str,
94        params: serde_json::Value,
95    ) -> Result<serde_json::Value, VmError> {
96        match method {
97            "builtin_call" => {
98                let name = params
99                    .get("name")
100                    .and_then(|value| value.as_str())
101                    .unwrap_or_default();
102                let args = params
103                    .get("args")
104                    .and_then(|value| value.as_array())
105                    .cloned()
106                    .unwrap_or_default()
107                    .into_iter()
108                    .map(|value| json_result_to_vm_value(&value))
109                    .collect::<Vec<_>>();
110                self.invoke_export(name, &args).await
111            }
112            "host/tools/list" => self
113                .invoke_optional_export("host_tools_list", &[])
114                .await
115                .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
116            "session/request_permission" => self.request_permission(params).await,
117            other => Err(VmError::Runtime(format!(
118                "playground host backend does not implement bridge method '{other}'"
119            ))),
120        }
121    }
122
123    async fn invoke_export(
124        &self,
125        name: &str,
126        args: &[VmValue],
127    ) -> Result<serde_json::Value, VmError> {
128        let Some(closure) = self.exported_functions.get(name) else {
129            return Err(VmError::Runtime(format!(
130                "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
131                self.module_path.display()
132            )));
133        };
134
135        let mut vm = self.vm.child_vm_for_host();
136        let result = vm.call_closure_pub(closure, args).await?;
137        Ok(crate::llm::vm_value_to_json(&result))
138    }
139
140    async fn invoke_optional_export(
141        &self,
142        name: &str,
143        args: &[VmValue],
144    ) -> Result<Option<serde_json::Value>, VmError> {
145        if !self.exported_functions.contains_key(name) {
146            return Ok(None);
147        }
148        self.invoke_export(name, args).await.map(Some)
149    }
150
151    async fn request_permission(
152        &self,
153        params: serde_json::Value,
154    ) -> Result<serde_json::Value, VmError> {
155        let Some(closure) = self.exported_functions.get("request_permission") else {
156            return Ok(serde_json::json!({ "granted": true }));
157        };
158
159        let tool_name = params
160            .get("toolCall")
161            .and_then(|tool_call| tool_call.get("toolName"))
162            .and_then(|value| value.as_str())
163            .unwrap_or_default();
164        let tool_args = params
165            .get("toolCall")
166            .and_then(|tool_call| tool_call.get("rawInput"))
167            .map(json_result_to_vm_value)
168            .unwrap_or(VmValue::Nil);
169        let full_payload = json_result_to_vm_value(&params);
170
171        let arg_count = closure.func.params.len();
172        let args = if arg_count >= 3 {
173            vec![
174                VmValue::String(Rc::from(tool_name.to_string())),
175                tool_args,
176                full_payload,
177            ]
178        } else if arg_count == 2 {
179            vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
180        } else if arg_count == 1 {
181            vec![full_payload]
182        } else {
183            Vec::new()
184        };
185
186        let mut vm = self.vm.child_vm_for_host();
187        let result = vm.call_closure_pub(closure, &args).await?;
188        let payload = match result {
189            VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
190            VmValue::String(reason) if !reason.is_empty() => {
191                serde_json::json!({ "granted": false, "reason": reason.to_string() })
192            }
193            other => {
194                let json = crate::llm::vm_value_to_json(&other);
195                if json
196                    .get("granted")
197                    .and_then(|value| value.as_bool())
198                    .is_some()
199                    || json.get("outcome").is_some()
200                {
201                    json
202                } else {
203                    serde_json::json!({ "granted": other.is_truthy() })
204                }
205            }
206        };
207        Ok(payload)
208    }
209}
210
/// Delivery mode attached to a user message queued by the host.
///
/// Each variant is selected by one of the `include_*` flags of
/// `HostBridge::take_queued_user_messages`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
    /// Parsed from `"interrupt_immediate"` or `"interrupt"`.
    InterruptImmediate,
    /// Parsed from `"finish_step"` or `"after_current_operation"`.
    FinishStep,
    /// Fallback for every other mode string.
    WaitForCompletion,
}
217
/// Point at which queued user messages are collected; each checkpoint
/// drains exactly one [`QueuedUserMessageMode`] (see
/// `HostBridge::take_queued_user_messages_for`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
    /// Drains `QueuedUserMessageMode::InterruptImmediate` messages.
    InterruptImmediate,
    /// Drains `QueuedUserMessageMode::FinishStep` messages.
    AfterCurrentOperation,
    /// Drains `QueuedUserMessageMode::WaitForCompletion` messages.
    EndOfInteraction,
}
224
225impl QueuedUserMessageMode {
226    fn from_str(value: &str) -> Self {
227        match value {
228            "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
229            "finish_step" | "after_current_operation" => Self::FinishStep,
230            _ => Self::WaitForCompletion,
231        }
232    }
233}
234
/// A user message queued on the bridge until a matching delivery
/// checkpoint collects it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedUserMessage {
    /// Message text. The stdin reader only queues non-empty content.
    pub content: String,
    /// Which checkpoint is allowed to deliver this message.
    pub mode: QueuedUserMessageMode,
}
240
241// Default doesn't apply — new() spawns async tasks requiring a tokio LocalSet.
242#[allow(clippy::new_without_default)]
243impl HostBridge {
244    /// Create a new bridge and spawn the stdin reader task.
245    ///
246    /// Must be called within a tokio LocalSet (uses spawn_local for the
247    /// stdin reader since it's single-threaded).
248    pub fn new() -> Self {
249        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
250            Arc::new(Mutex::new(HashMap::new()));
251        let cancelled = Arc::new(AtomicBool::new(false));
252        let queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>> =
253            Arc::new(Mutex::new(VecDeque::new()));
254        let resume_requested = Arc::new(AtomicBool::new(false));
255        let skills_reload_requested = Arc::new(AtomicBool::new(false));
256        let daemon_idle = Arc::new(AtomicBool::new(false));
257
258        // Stdin reader: reads JSON-RPC lines and dispatches responses
259        let pending_clone = pending.clone();
260        let cancelled_clone = cancelled.clone();
261        let queued_clone = queued_user_messages.clone();
262        let resume_clone = resume_requested.clone();
263        let skills_reload_clone = skills_reload_requested.clone();
264        tokio::task::spawn_local(async move {
265            let stdin = tokio::io::stdin();
266            let reader = tokio::io::BufReader::new(stdin);
267            let mut lines = reader.lines();
268
269            while let Ok(Some(line)) = lines.next_line().await {
270                let line = line.trim().to_string();
271                if line.is_empty() {
272                    continue;
273                }
274
275                let msg: serde_json::Value = match serde_json::from_str(&line) {
276                    Ok(v) => v,
277                    Err(_) => continue,
278                };
279
280                // Notifications have no id; responses have one.
281                if msg.get("id").is_none() {
282                    if let Some(method) = msg["method"].as_str() {
283                        if method == "cancel" {
284                            cancelled_clone.store(true, Ordering::SeqCst);
285                        } else if method == "agent/resume" {
286                            resume_clone.store(true, Ordering::SeqCst);
287                        } else if method == "skills/update" {
288                            skills_reload_clone.store(true, Ordering::SeqCst);
289                        } else if method == "user_message"
290                            || method == "session/input"
291                            || method == "agent/user_message"
292                        {
293                            let params = &msg["params"];
294                            let content = params
295                                .get("content")
296                                .and_then(|v| v.as_str())
297                                .unwrap_or("")
298                                .to_string();
299                            if !content.is_empty() {
300                                let mode = QueuedUserMessageMode::from_str(
301                                    params
302                                        .get("mode")
303                                        .and_then(|v| v.as_str())
304                                        .unwrap_or("wait_for_completion"),
305                                );
306                                queued_clone
307                                    .lock()
308                                    .await
309                                    .push_back(QueuedUserMessage { content, mode });
310                            }
311                        }
312                    }
313                    continue;
314                }
315
316                if let Some(id) = msg["id"].as_u64() {
317                    let mut pending = pending_clone.lock().await;
318                    if let Some(sender) = pending.remove(&id) {
319                        let _ = sender.send(msg);
320                    }
321                }
322            }
323
324            // stdin closed: drop pending senders to cancel waiters.
325            let mut pending = pending_clone.lock().await;
326            pending.clear();
327        });
328
329        Self {
330            next_id: AtomicU64::new(1),
331            pending,
332            cancelled,
333            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
334            session_id: std::sync::Mutex::new(String::new()),
335            script_name: std::sync::Mutex::new(String::new()),
336            queued_user_messages,
337            resume_requested,
338            skills_reload_requested,
339            daemon_idle,
340            visible_call_states: std::sync::Mutex::new(HashMap::new()),
341            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
342            in_process: None,
343            #[cfg(test)]
344            recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
345        }
346    }
347
348    /// Create a bridge from pre-existing shared state.
349    ///
350    /// Unlike `new()`, does **not** spawn a stdin reader — the caller is
351    /// responsible for dispatching responses into `pending`.  This is used
352    /// by ACP mode which already has its own stdin reader.
353    pub fn from_parts(
354        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
355        cancelled: Arc<AtomicBool>,
356        stdout_lock: Arc<std::sync::Mutex<()>>,
357        start_id: u64,
358    ) -> Self {
359        Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
360    }
361
362    pub fn from_parts_with_writer(
363        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
364        cancelled: Arc<AtomicBool>,
365        writer: HostBridgeWriter,
366        start_id: u64,
367    ) -> Self {
368        Self {
369            next_id: AtomicU64::new(start_id),
370            pending,
371            cancelled,
372            writer,
373            session_id: std::sync::Mutex::new(String::new()),
374            script_name: std::sync::Mutex::new(String::new()),
375            queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
376            resume_requested: Arc::new(AtomicBool::new(false)),
377            skills_reload_requested: Arc::new(AtomicBool::new(false)),
378            daemon_idle: Arc::new(AtomicBool::new(false)),
379            visible_call_states: std::sync::Mutex::new(HashMap::new()),
380            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
381            in_process: None,
382            #[cfg(test)]
383            recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
384        }
385    }
386
387    /// Create an in-process host bridge backed by exported functions from a
388    /// Harn module. Used by `harn playground` to avoid JSON-RPC boilerplate.
389    pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
390        let exported_functions = vm.load_module_exports(module_path).await?;
391        Ok(Self {
392            next_id: AtomicU64::new(1),
393            pending: Arc::new(Mutex::new(HashMap::new())),
394            cancelled: Arc::new(AtomicBool::new(false)),
395            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
396            session_id: std::sync::Mutex::new(String::new()),
397            script_name: std::sync::Mutex::new(String::new()),
398            queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
399            resume_requested: Arc::new(AtomicBool::new(false)),
400            skills_reload_requested: Arc::new(AtomicBool::new(false)),
401            daemon_idle: Arc::new(AtomicBool::new(false)),
402            visible_call_states: std::sync::Mutex::new(HashMap::new()),
403            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
404            in_process: Some(InProcessHost {
405                module_path: module_path.to_path_buf(),
406                exported_functions,
407                vm,
408            }),
409            #[cfg(test)]
410            recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
411        })
412    }
413
414    /// Set the ACP session ID for session-scoped notifications.
415    pub fn set_session_id(&self, id: &str) {
416        *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
417    }
418
419    /// Set the currently executing script name (without .harn suffix).
420    pub fn set_script_name(&self, name: &str) {
421        *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
422    }
423
424    /// Get the current script name.
425    fn get_script_name(&self) -> String {
426        self.script_name
427            .lock()
428            .unwrap_or_else(|e| e.into_inner())
429            .clone()
430    }
431
432    /// Get the session ID.
433    fn get_session_id(&self) -> String {
434        self.session_id
435            .lock()
436            .unwrap_or_else(|e| e.into_inner())
437            .clone()
438    }
439
440    /// Write a complete JSON-RPC line to stdout, serialized through a mutex.
441    fn write_line(&self, line: &str) -> Result<(), VmError> {
442        (self.writer)(line).map_err(VmError::Runtime)
443    }
444
445    /// Send a JSON-RPC request to the host and wait for the response.
446    /// Times out after 5 minutes to prevent deadlocks.
447    pub async fn call(
448        &self,
449        method: &str,
450        params: serde_json::Value,
451    ) -> Result<serde_json::Value, VmError> {
452        if let Some(in_process) = &self.in_process {
453            return in_process.dispatch(method, params).await;
454        }
455
456        if self.is_cancelled() {
457            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
458        }
459
460        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
461
462        let request = crate::jsonrpc::request(id, method, params);
463
464        let (tx, rx) = oneshot::channel();
465        {
466            let mut pending = self.pending.lock().await;
467            pending.insert(id, tx);
468        }
469
470        let line = serde_json::to_string(&request)
471            .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
472        if let Err(e) = self.write_line(&line) {
473            let mut pending = self.pending.lock().await;
474            pending.remove(&id);
475            return Err(e);
476        }
477
478        let response = match tokio::time::timeout(DEFAULT_TIMEOUT, rx).await {
479            Ok(Ok(msg)) => msg,
480            Ok(Err(_)) => {
481                // Sender dropped: host closed or stdin reader exited.
482                return Err(VmError::Runtime(
483                    "Bridge: host closed connection before responding".into(),
484                ));
485            }
486            Err(_) => {
487                let mut pending = self.pending.lock().await;
488                pending.remove(&id);
489                return Err(VmError::Runtime(format!(
490                    "Bridge: host did not respond to '{method}' within {}s",
491                    DEFAULT_TIMEOUT.as_secs()
492                )));
493            }
494        };
495
496        if let Some(error) = response.get("error") {
497            let message = error["message"].as_str().unwrap_or("Unknown host error");
498            let code = error["code"].as_i64().unwrap_or(-1);
499            // JSON-RPC -32001 signals the host rejected the tool (not permitted / not in allowlist).
500            if code == -32001 {
501                return Err(VmError::CategorizedError {
502                    message: message.to_string(),
503                    category: ErrorCategory::ToolRejected,
504                });
505            }
506            return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
507        }
508
509        Ok(response["result"].clone())
510    }
511
512    /// Send a JSON-RPC notification to the host (no response expected).
513    /// Serialized through the stdout mutex to prevent interleaving.
514    pub fn notify(&self, method: &str, params: serde_json::Value) {
515        let notification = crate::jsonrpc::notification(method, params);
516        #[cfg(test)]
517        self.recorded_notifications
518            .lock()
519            .unwrap_or_else(|e| e.into_inner())
520            .push(notification.clone());
521        if self.in_process.is_some() {
522            return;
523        }
524        if let Ok(line) = serde_json::to_string(&notification) {
525            let _ = self.write_line(&line);
526        }
527    }
528
529    #[cfg(test)]
530    pub(crate) fn recorded_notifications(&self) -> Vec<serde_json::Value> {
531        self.recorded_notifications
532            .lock()
533            .unwrap_or_else(|e| e.into_inner())
534            .clone()
535    }
536
    /// Check if the host has sent a cancel notification.
    ///
    /// Reads the shared `cancelled` flag, which the stdin reader spawned by
    /// `new` sets on a `cancel` notification. Bridges built with
    /// `from_parts` share the flag with their caller.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }
541
    /// Consume the resume signal: returns whether it was set and clears it
    /// in the same atomic step.
    pub fn take_resume_signal(&self) -> bool {
        self.resume_requested.swap(false, Ordering::SeqCst)
    }
545
    /// Raise the resume signal, exactly as the stdin reader does when it
    /// receives an `agent/resume` notification.
    pub fn signal_resume(&self) {
        self.resume_requested.store(true, Ordering::SeqCst);
    }
549
    /// Record whether the daemon-mode agent loop is currently blocked in
    /// idle wait.
    pub fn set_daemon_idle(&self, idle: bool) {
        self.daemon_idle.store(idle, Ordering::SeqCst);
    }
553
    /// Return the value last stored by `set_daemon_idle` (`false` initially).
    pub fn is_daemon_idle(&self) -> bool {
        self.daemon_idle.load(Ordering::SeqCst)
    }
557
    /// Consume any pending `skills/update` signal the host has sent.
    /// Returns `true` at most once per raised signal, letting callers
    /// trigger a layered-discovery rebuild without polling false
    /// positives. Several notifications that arrive before this is called
    /// are coalesced into a single `true`, because the signal is one flag.
    /// See issue #73 for the hot-reload contract.
    pub fn take_skills_reload_signal(&self) -> bool {
        self.skills_reload_requested.swap(false, Ordering::SeqCst)
    }
565
    /// Manually mark the skill catalog as stale. Used by tests and by
    /// the CLI when an internal event (e.g. `harn install`) should
    /// trigger the same rebuild a `skills/update` notification would.
    /// The flag stays set until `take_skills_reload_signal` consumes it.
    pub fn signal_skills_reload(&self) {
        self.skills_reload_requested.store(true, Ordering::SeqCst);
    }
572
573    /// Call the host's `skills/list` RPC and return the raw JSON array
574    /// it responded with. Shape:
575    /// `[{ "id": "...", "name": "...", "description": "...", "source": "..." }, ...]`.
576    /// The CLI adapter converts each entry into a
577    /// [`crate::skills::SkillManifestRef`].
578    pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
579        let result = self.call("skills/list", serde_json::json!({})).await?;
580        match result {
581            serde_json::Value::Array(items) => Ok(items),
582            serde_json::Value::Object(map) => match map.get("skills") {
583                Some(serde_json::Value::Array(items)) => Ok(items.clone()),
584                _ => Err(VmError::Runtime(
585                    "skills/list: host response must be an array or { skills: [...] }".into(),
586                )),
587            },
588            _ => Err(VmError::Runtime(
589                "skills/list: unexpected response shape".into(),
590            )),
591        }
592    }
593
594    /// Call the host's `host/tools/list` RPC and return normalized tool
595    /// descriptors. Shape:
596    /// `[{ "name": "...", "description": "...", "schema": {...}, "deprecated": false }, ...]`.
597    /// The bridge also accepts `{ "tools": [...] }` and
598    /// `{ "result": { "tools": [...] } }` wrappers for lenient hosts.
599    pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
600        let result = self.call("host/tools/list", serde_json::json!({})).await?;
601        parse_host_tools_list_response(result)
602    }
603
604    /// Call the host's `skills/fetch` RPC for one skill id. Returns the
605    /// raw JSON body so the CLI can inspect both the frontmatter fields
606    /// and the skill markdown body in whatever shape the host sends.
607    pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
608        self.call("skills/fetch", serde_json::json!({ "id": id }))
609            .await
610    }
611
612    pub async fn push_queued_user_message(&self, content: String, mode: &str) {
613        self.queued_user_messages
614            .lock()
615            .await
616            .push_back(QueuedUserMessage {
617                content,
618                mode: QueuedUserMessageMode::from_str(mode),
619            });
620    }
621
622    pub async fn take_queued_user_messages(
623        &self,
624        include_interrupt_immediate: bool,
625        include_finish_step: bool,
626        include_wait_for_completion: bool,
627    ) -> Vec<QueuedUserMessage> {
628        let mut queue = self.queued_user_messages.lock().await;
629        let mut selected = Vec::new();
630        let mut retained = VecDeque::new();
631        while let Some(message) = queue.pop_front() {
632            let should_take = match message.mode {
633                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
634                QueuedUserMessageMode::FinishStep => include_finish_step,
635                QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
636            };
637            if should_take {
638                selected.push(message);
639            } else {
640                retained.push_back(message);
641            }
642        }
643        *queue = retained;
644        selected
645    }
646
647    pub async fn take_queued_user_messages_for(
648        &self,
649        checkpoint: DeliveryCheckpoint,
650    ) -> Vec<QueuedUserMessage> {
651        match checkpoint {
652            DeliveryCheckpoint::InterruptImmediate => {
653                self.take_queued_user_messages(true, false, false).await
654            }
655            DeliveryCheckpoint::AfterCurrentOperation => {
656                self.take_queued_user_messages(false, true, false).await
657            }
658            DeliveryCheckpoint::EndOfInteraction => {
659                self.take_queued_user_messages(false, false, true).await
660            }
661        }
662    }
663
664    /// Send an output notification (for log/print in bridge mode).
665    pub fn send_output(&self, text: &str) {
666        self.notify("output", serde_json::json!({"text": text}));
667    }
668
669    /// Send a progress notification with optional numeric progress and structured data.
670    pub fn send_progress(
671        &self,
672        phase: &str,
673        message: &str,
674        progress: Option<i64>,
675        total: Option<i64>,
676        data: Option<serde_json::Value>,
677    ) {
678        let mut payload = serde_json::json!({"phase": phase, "message": message});
679        if let Some(p) = progress {
680            payload["progress"] = serde_json::json!(p);
681        }
682        if let Some(t) = total {
683            payload["total"] = serde_json::json!(t);
684        }
685        if let Some(d) = data {
686            payload["data"] = d;
687        }
688        self.notify("progress", payload);
689    }
690
691    /// Send a structured log notification.
692    pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
693        let mut payload = serde_json::json!({"level": level, "message": message});
694        if let Some(f) = fields {
695            payload["fields"] = f;
696        }
697        self.notify("log", payload);
698    }
699
700    /// Send a `session/update` with `call_start` — signals the beginning of
701    /// an LLM call, tool call, or builtin call for observability.
702    pub fn send_call_start(
703        &self,
704        call_id: &str,
705        call_type: &str,
706        name: &str,
707        metadata: serde_json::Value,
708    ) {
709        let session_id = self.get_session_id();
710        let script = self.get_script_name();
711        let stream_publicly = metadata
712            .get("stream_publicly")
713            .and_then(|value| value.as_bool())
714            .unwrap_or(true);
715        self.visible_call_streams
716            .lock()
717            .unwrap_or_else(|e| e.into_inner())
718            .insert(call_id.to_string(), stream_publicly);
719        self.notify(
720            "session/update",
721            serde_json::json!({
722                "sessionId": session_id,
723                "update": {
724                    "sessionUpdate": "call_start",
725                    "content": {
726                        "toolCallId": call_id,
727                        "call_type": call_type,
728                        "name": name,
729                        "script": script,
730                        "metadata": metadata,
731                    },
732                },
733            }),
734        );
735    }
736
737    /// Send a `session/update` with `call_progress` — a streaming token delta
738    /// from an in-flight LLM call.
739    pub fn send_call_progress(
740        &self,
741        call_id: &str,
742        delta: &str,
743        accumulated_tokens: u64,
744        user_visible: bool,
745    ) {
746        let session_id = self.get_session_id();
747        let (visible_text, visible_delta) = {
748            let stream_publicly = self
749                .visible_call_streams
750                .lock()
751                .unwrap_or_else(|e| e.into_inner())
752                .get(call_id)
753                .copied()
754                .unwrap_or(true);
755            let mut states = self
756                .visible_call_states
757                .lock()
758                .unwrap_or_else(|e| e.into_inner());
759            let state = states.entry(call_id.to_string()).or_default();
760            state.push(delta, stream_publicly)
761        };
762        self.notify(
763            "session/update",
764            serde_json::json!({
765                "sessionId": session_id,
766                "update": {
767                    "sessionUpdate": "call_progress",
768                    "content": {
769                        "toolCallId": call_id,
770                        "delta": delta,
771                        "accumulated_tokens": accumulated_tokens,
772                        "visible_text": visible_text,
773                        "visible_delta": visible_delta,
774                        "user_visible": user_visible,
775                    },
776                },
777            }),
778        );
779    }
780
781    /// Send a `session/update` with `call_end` — signals completion of a call.
782    pub fn send_call_end(
783        &self,
784        call_id: &str,
785        call_type: &str,
786        name: &str,
787        duration_ms: u64,
788        status: &str,
789        metadata: serde_json::Value,
790    ) {
791        let session_id = self.get_session_id();
792        let script = self.get_script_name();
793        self.visible_call_states
794            .lock()
795            .unwrap_or_else(|e| e.into_inner())
796            .remove(call_id);
797        self.visible_call_streams
798            .lock()
799            .unwrap_or_else(|e| e.into_inner())
800            .remove(call_id);
801        self.notify(
802            "session/update",
803            serde_json::json!({
804                "sessionId": session_id,
805                "update": {
806                    "sessionUpdate": "call_end",
807                    "content": {
808                        "toolCallId": call_id,
809                        "call_type": call_type,
810                        "name": name,
811                        "script": script,
812                        "duration_ms": duration_ms,
813                        "status": status,
814                        "metadata": metadata,
815                    },
816                },
817            }),
818        );
819    }
820
821    /// Send a worker lifecycle update for delegated/background execution.
822    pub fn send_worker_update(
823        &self,
824        worker_id: &str,
825        worker_name: &str,
826        status: &str,
827        metadata: serde_json::Value,
828        audit: Option<&MutationSessionRecord>,
829    ) {
830        let session_id = self.get_session_id();
831        let script = self.get_script_name();
832        let started_at = metadata.get("started_at").cloned().unwrap_or_default();
833        let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
834        let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
835        let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
836        let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
837        let lifecycle = serde_json::json!({
838            "event": status,
839            "worker_id": worker_id,
840            "worker_name": worker_name,
841            "started_at": started_at,
842            "finished_at": finished_at,
843        });
844        self.notify(
845            "session/update",
846            serde_json::json!({
847                "sessionId": session_id,
848                "update": {
849                    "sessionUpdate": "worker_update",
850                    "content": {
851                        "worker_id": worker_id,
852                        "worker_name": worker_name,
853                        "status": status,
854                        "script": script,
855                        "started_at": started_at,
856                        "finished_at": finished_at,
857                        "snapshot_path": snapshot_path,
858                        "run_id": run_id,
859                        "run_path": run_path,
860                        "lifecycle": lifecycle,
861                        "audit": audit,
862                        "metadata": metadata,
863                    },
864                },
865            }),
866        );
867    }
868}
869
870/// Convert a serde_json::Value to a VmValue.
871pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
872    crate::stdlib::json_to_vm_value(val)
873}
874
875fn parse_host_tools_list_response(
876    result: serde_json::Value,
877) -> Result<Vec<serde_json::Value>, VmError> {
878    let tools = match result {
879        serde_json::Value::Array(items) => items,
880        serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
881            map.get("result")
882                .and_then(|value| value.get("tools"))
883                .cloned()
884        }) {
885            Some(serde_json::Value::Array(items)) => items,
886            _ => {
887                return Err(VmError::Runtime(
888                    "host/tools/list: host response must be an array or { tools: [...] }".into(),
889                ));
890            }
891        },
892        _ => {
893            return Err(VmError::Runtime(
894                "host/tools/list: unexpected response shape".into(),
895            ));
896        }
897    };
898
899    let mut normalized = Vec::with_capacity(tools.len());
900    for tool in tools {
901        let serde_json::Value::Object(map) = tool else {
902            return Err(VmError::Runtime(
903                "host/tools/list: every tool must be an object".into(),
904            ));
905        };
906        let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
907            return Err(VmError::Runtime(
908                "host/tools/list: every tool must include a string `name`".into(),
909            ));
910        };
911        let description = map
912            .get("description")
913            .and_then(|value| value.as_str())
914            .or_else(|| {
915                map.get("short_description")
916                    .and_then(|value| value.as_str())
917            })
918            .unwrap_or_default();
919        let schema = map
920            .get("schema")
921            .cloned()
922            .or_else(|| map.get("parameters").cloned())
923            .or_else(|| map.get("input_schema").cloned())
924            .unwrap_or(serde_json::Value::Null);
925        let deprecated = map
926            .get("deprecated")
927            .and_then(|value| value.as_bool())
928            .unwrap_or(false);
929        normalized.push(serde_json::json!({
930            "name": name,
931            "description": description,
932            "schema": schema,
933            "deprecated": deprecated,
934        }));
935    }
936    Ok(normalized)
937}
938
#[cfg(test)]
mod tests {
    use super::*;

    /// A serialized request carries the protocol version, id and method.
    #[test]
    fn test_json_rpc_request_format() {
        let params = serde_json::json!({
            "prompt": "Hello",
            "system": "Be helpful",
        });
        let encoded = serde_json::to_string(&crate::jsonrpc::request(1, "llm_call", params))
            .unwrap();
        for fragment in [
            "\"jsonrpc\":\"2.0\"",
            "\"id\":1",
            "\"method\":\"llm_call\"",
        ] {
            assert!(encoded.contains(fragment));
        }
    }

    /// A serialized notification names its method and has no `id` member.
    #[test]
    fn test_json_rpc_notification_format() {
        let params = serde_json::json!({"text": "[harn] hello\n"});
        let encoded =
            serde_json::to_string(&crate::jsonrpc::notification("output", params)).unwrap();
        assert!(encoded.contains("\"method\":\"output\""));
        assert!(!encoded.contains("\"id\""));
    }

    /// An error response exposes its message under `error.message`.
    #[test]
    fn test_json_rpc_error_response_parsing() {
        let reply = crate::jsonrpc::error_response(1, -32600, "Invalid request");
        assert!(reply.get("error").is_some());
        assert_eq!(reply["error"]["message"].as_str(), Some("Invalid request"));
    }

    /// A success response exposes its payload under `result`.
    #[test]
    fn test_json_rpc_success_response_parsing() {
        let payload = serde_json::json!({
            "text": "Hello world",
            "input_tokens": 10,
            "output_tokens": 5,
        });
        let reply = crate::jsonrpc::response(1, payload);
        assert!(reply.get("result").is_some());
        assert_eq!(reply["result"]["text"].as_str(), Some("Hello world"));
    }

    /// The shared cancellation flag starts cleared and can be set.
    #[test]
    fn test_cancelled_flag() {
        let flag = Arc::new(AtomicBool::new(false));
        assert!(!flag.load(Ordering::SeqCst));
        flag.store(true, Ordering::SeqCst);
        assert!(flag.load(Ordering::SeqCst));
    }

    /// Each `take_queued_user_messages` call only drains the messages whose
    /// delivery mode matches the flags passed in.
    #[test]
    fn queued_messages_are_filtered_by_delivery_mode() {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async {
            let pending = Arc::new(Mutex::new(HashMap::new()));
            let cancelled = Arc::new(AtomicBool::new(false));
            let stdout_lock = Arc::new(std::sync::Mutex::new(()));
            let bridge = HostBridge::from_parts(pending, cancelled, stdout_lock, 1);

            bridge
                .push_queued_user_message("first".to_string(), "finish_step")
                .await;
            bridge
                .push_queued_user_message("second".to_string(), "wait_for_completion")
                .await;

            let at_finish_step = bridge.take_queued_user_messages(false, true, false).await;
            assert_eq!(at_finish_step.len(), 1);
            assert_eq!(at_finish_step[0].content, "first");

            let at_turn_end = bridge.take_queued_user_messages(false, false, true).await;
            assert_eq!(at_turn_end.len(), 1);
            assert_eq!(at_turn_end[0].content, "second");
        });
    }

    /// A JSON string converts to a value that displays as its text.
    #[test]
    fn test_json_result_to_vm_value_string() {
        let converted = json_result_to_vm_value(&serde_json::json!("hello"));
        assert_eq!(converted.display(), "hello");
    }

    /// A JSON object converts to a `Dict` holding the converted members.
    #[test]
    fn test_json_result_to_vm_value_dict() {
        let converted = json_result_to_vm_value(&serde_json::json!({"name": "test", "count": 42}));
        let entries = match &converted {
            VmValue::Dict(entries) => entries,
            other => unreachable!("Expected Dict, got {:?}", other),
        };
        assert_eq!(entries.get("name").unwrap().display(), "test");
        assert_eq!(entries.get("count").unwrap().display(), "42");
    }

    /// JSON `null` converts to `Nil`.
    #[test]
    fn test_json_result_to_vm_value_null() {
        let converted = json_result_to_vm_value(&serde_json::json!(null));
        assert!(matches!(converted, VmValue::Nil));
    }

    /// Nested arrays and objects are converted recursively.
    #[test]
    fn test_json_result_to_vm_value_nested() {
        let source = serde_json::json!({
            "text": "response",
            "tool_calls": [
                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
            ],
            "input_tokens": 100,
            "output_tokens": 50,
        });
        let converted = json_result_to_vm_value(&source);
        let entries = match &converted {
            VmValue::Dict(entries) => entries,
            other => unreachable!("Expected Dict, got {:?}", other),
        };
        assert_eq!(entries.get("text").unwrap().display(), "response");
        let calls = match entries.get("tool_calls").unwrap() {
            VmValue::List(calls) => calls,
            _ => unreachable!("Expected List for tool_calls"),
        };
        assert_eq!(calls.len(), 1);
    }

    /// The `{ "tools": [...] }` wrapper is accepted and `deprecated`
    /// defaults to `false`.
    #[test]
    fn parse_host_tools_list_accepts_object_wrapper() {
        let response = serde_json::json!({
            "tools": [
                {
                    "name": "Read",
                    "description": "Read a file",
                    "schema": {"type": "object"},
                }
            ]
        });
        let tools = parse_host_tools_list_response(response).expect("tool list");

        assert_eq!(tools.len(), 1);
        assert_eq!(tools[0]["name"], "Read");
        assert_eq!(tools[0]["deprecated"], false);
    }

    /// The `result.tools` wrapper plus the `short_description` and
    /// `input_schema` aliases are accepted.
    #[test]
    fn parse_host_tools_list_accepts_compat_fields() {
        let response = serde_json::json!({
            "result": {
                "tools": [
                    {
                        "name": "Edit",
                        "short_description": "Apply an edit",
                        "input_schema": {"type": "object"},
                        "deprecated": true,
                    }
                ]
            }
        });
        let tools = parse_host_tools_list_response(response).expect("tool list");

        assert_eq!(tools[0]["description"], "Apply an edit");
        assert_eq!(tools[0]["schema"]["type"], "object");
        assert_eq!(tools[0]["deprecated"], true);
    }

    /// A tool without a `name` is rejected with a descriptive error.
    #[test]
    fn parse_host_tools_list_requires_tool_names() {
        let response = serde_json::json!({
            "tools": [
                {"description": "missing name"}
            ]
        });
        let failure = parse_host_tools_list_response(response).expect_err("expected error");
        let message = failure.to_string();
        assert!(message.contains("host/tools/list: every tool must include a string `name`"));
    }

    /// The default bridge timeout is five minutes.
    #[test]
    fn test_timeout_duration() {
        let seconds = DEFAULT_TIMEOUT.as_secs();
        assert_eq!(seconds, 300);
    }
}