Skip to main content

harn_vm/
bridge.rs

1//! JSON-RPC 2.0 bridge for host communication.
2//!
3//! When `harn run --bridge` is used, the VM delegates builtins (llm_call,
4//! file I/O, tool execution) to a host process over stdin/stdout JSON-RPC.
5//! The host application handles these requests using its own providers.
6
7use std::collections::{BTreeMap, HashMap, VecDeque};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::io::AsyncBufReadExt;
16use tokio::sync::{oneshot, Mutex};
17
18use crate::orchestration::MutationSessionRecord;
19use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
20use crate::visible_text::VisibleTextState;
21use crate::vm::Vm;
22
/// Default timeout for bridge calls (5 minutes).
///
/// `HostBridge::call` waits at most this long for the host's response
/// before failing with a timeout error.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
25
/// A JSON-RPC 2.0 bridge to a host process over stdin/stdout.
///
/// The bridge sends requests to the host on stdout and receives responses
/// on stdin. A background task reads stdin and dispatches responses to
/// waiting callers by request ID. All stdout writes are serialized through
/// a mutex to prevent interleaving.
///
/// When `in_process` is set, requests are served by an in-process backend
/// instead and nothing is written to stdout.
pub struct HostBridge {
    /// Id handed to the next outgoing JSON-RPC request; incremented on
    /// every request sent over stdio.
    next_id: AtomicU64,
    /// Pending request waiters, keyed by JSON-RPC id.
    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
    /// Whether the host has sent a cancel notification.
    cancelled: Arc<AtomicBool>,
    /// Mutex protecting stdout writes to prevent interleaving.
    stdout_lock: Arc<std::sync::Mutex<()>>,
    /// ACP session ID (set in ACP mode for session-scoped notifications).
    session_id: std::sync::Mutex<String>,
    /// Name of the currently executing Harn script (without .harn suffix).
    script_name: std::sync::Mutex<String>,
    /// User messages injected by the host while a run is active.
    queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>>,
    /// Host-triggered resume signal for daemon agents.
    resume_requested: Arc<AtomicBool>,
    /// Host-triggered skill-registry invalidation signal. Set when the
    /// host sends a `skills/update` notification; consumed by the CLI
    /// between runs (watch mode, long-running agents) to rebuild the
    /// layered skill catalog from its current filesystem + host state.
    skills_reload_requested: Arc<AtomicBool>,
    /// Per-call visible assistant text state for call_progress notifications.
    /// Keyed by call id; the entry is removed when the call ends.
    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
    /// Whether an LLM call's deltas should be exposed to end users while streaming.
    /// Keyed by call id; a call without an entry is treated as streamed publicly.
    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
    /// Optional in-process host-module backend used by `harn playground`.
    in_process: Option<InProcessHost>,
    /// Test-only log of every notification handed to `notify`.
    #[cfg(test)]
    recorded_notifications: Arc<std::sync::Mutex<Vec<serde_json::Value>>>,
}
62
/// In-process host backend: answers bridge calls by invoking functions
/// exported from a Harn module instead of talking JSON-RPC over stdio.
struct InProcessHost {
    /// Path of the module the exports were loaded from; shown in the
    /// "missing capability" error message.
    module_path: PathBuf,
    /// Exported functions of the host module, looked up by name.
    exported_functions: BTreeMap<String, Rc<VmClosure>>,
    /// VM from which a child VM is derived for every invocation.
    vm: Vm,
}
68
69impl InProcessHost {
70    async fn dispatch(
71        &self,
72        method: &str,
73        params: serde_json::Value,
74    ) -> Result<serde_json::Value, VmError> {
75        match method {
76            "builtin_call" => {
77                let name = params
78                    .get("name")
79                    .and_then(|value| value.as_str())
80                    .unwrap_or_default();
81                let args = params
82                    .get("args")
83                    .and_then(|value| value.as_array())
84                    .cloned()
85                    .unwrap_or_default()
86                    .into_iter()
87                    .map(|value| json_result_to_vm_value(&value))
88                    .collect::<Vec<_>>();
89                self.invoke_export(name, &args).await
90            }
91            "session/request_permission" => self.request_permission(params).await,
92            other => Err(VmError::Runtime(format!(
93                "playground host backend does not implement bridge method '{other}'"
94            ))),
95        }
96    }
97
98    async fn invoke_export(
99        &self,
100        name: &str,
101        args: &[VmValue],
102    ) -> Result<serde_json::Value, VmError> {
103        let Some(closure) = self.exported_functions.get(name) else {
104            return Err(VmError::Runtime(format!(
105                "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
106                self.module_path.display()
107            )));
108        };
109
110        let mut vm = self.vm.child_vm_for_host();
111        let result = vm.call_closure_pub(closure, args, &[]).await?;
112        Ok(crate::llm::vm_value_to_json(&result))
113    }
114
115    async fn request_permission(
116        &self,
117        params: serde_json::Value,
118    ) -> Result<serde_json::Value, VmError> {
119        let Some(closure) = self.exported_functions.get("request_permission") else {
120            return Ok(serde_json::json!({ "granted": true }));
121        };
122
123        let tool_name = params
124            .get("toolCall")
125            .and_then(|tool_call| tool_call.get("toolName"))
126            .and_then(|value| value.as_str())
127            .unwrap_or_default();
128        let tool_args = params
129            .get("toolCall")
130            .and_then(|tool_call| tool_call.get("rawInput"))
131            .map(json_result_to_vm_value)
132            .unwrap_or(VmValue::Nil);
133        let full_payload = json_result_to_vm_value(&params);
134
135        let arg_count = closure.func.params.len();
136        let args = if arg_count >= 3 {
137            vec![
138                VmValue::String(Rc::from(tool_name.to_string())),
139                tool_args,
140                full_payload,
141            ]
142        } else if arg_count == 2 {
143            vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
144        } else if arg_count == 1 {
145            vec![full_payload]
146        } else {
147            Vec::new()
148        };
149
150        let mut vm = self.vm.child_vm_for_host();
151        let result = vm.call_closure_pub(closure, &args, &[]).await?;
152        let payload = match result {
153            VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
154            VmValue::String(reason) if !reason.is_empty() => {
155                serde_json::json!({ "granted": false, "reason": reason.to_string() })
156            }
157            other => {
158                let json = crate::llm::vm_value_to_json(&other);
159                if json
160                    .get("granted")
161                    .and_then(|value| value.as_bool())
162                    .is_some()
163                    || json.get("outcome").is_some()
164                {
165                    json
166                } else {
167                    serde_json::json!({ "granted": other.is_truthy() })
168                }
169            }
170        };
171        Ok(payload)
172    }
173}
174
/// Delivery mode attached to a [`QueuedUserMessage`].
///
/// Parsed from the mode string supplied with a user message; each variant
/// is selected by one of the flags of
/// `HostBridge::take_queued_user_messages`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
    /// Mode strings `interrupt_immediate` and `interrupt`.
    InterruptImmediate,
    /// Mode strings `finish_step` and `after_current_operation`.
    FinishStep,
    /// Every other mode string, including the default `wait_for_completion`.
    WaitForCompletion,
}
181
/// Checkpoint at which queued user messages are collected.
///
/// `HostBridge::take_queued_user_messages_for` maps each checkpoint to
/// exactly one [`QueuedUserMessageMode`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
    /// Collects messages queued with `QueuedUserMessageMode::InterruptImmediate`.
    InterruptImmediate,
    /// Collects messages queued with `QueuedUserMessageMode::FinishStep`.
    AfterCurrentOperation,
    /// Collects messages queued with `QueuedUserMessageMode::WaitForCompletion`.
    EndOfInteraction,
}
188
189impl QueuedUserMessageMode {
190    fn from_str(value: &str) -> Self {
191        match value {
192            "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
193            "finish_step" | "after_current_operation" => Self::FinishStep,
194            _ => Self::WaitForCompletion,
195        }
196    }
197}
198
/// A user message waiting in the bridge's queue until it is taken at a
/// matching delivery checkpoint.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedUserMessage {
    /// Text of the message.
    pub content: String,
    /// Delivery mode that decides which checkpoint picks the message up.
    pub mode: QueuedUserMessageMode,
}
204
205// Default doesn't apply — new() spawns async tasks requiring a tokio LocalSet.
206#[allow(clippy::new_without_default)]
207impl HostBridge {
208    /// Create a new bridge and spawn the stdin reader task.
209    ///
210    /// Must be called within a tokio LocalSet (uses spawn_local for the
211    /// stdin reader since it's single-threaded).
212    pub fn new() -> Self {
213        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
214            Arc::new(Mutex::new(HashMap::new()));
215        let cancelled = Arc::new(AtomicBool::new(false));
216        let queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>> =
217            Arc::new(Mutex::new(VecDeque::new()));
218        let resume_requested = Arc::new(AtomicBool::new(false));
219        let skills_reload_requested = Arc::new(AtomicBool::new(false));
220
221        // Stdin reader: reads JSON-RPC lines and dispatches responses
222        let pending_clone = pending.clone();
223        let cancelled_clone = cancelled.clone();
224        let queued_clone = queued_user_messages.clone();
225        let resume_clone = resume_requested.clone();
226        let skills_reload_clone = skills_reload_requested.clone();
227        tokio::task::spawn_local(async move {
228            let stdin = tokio::io::stdin();
229            let reader = tokio::io::BufReader::new(stdin);
230            let mut lines = reader.lines();
231
232            while let Ok(Some(line)) = lines.next_line().await {
233                let line = line.trim().to_string();
234                if line.is_empty() {
235                    continue;
236                }
237
238                let msg: serde_json::Value = match serde_json::from_str(&line) {
239                    Ok(v) => v,
240                    Err(_) => continue,
241                };
242
243                // Notifications have no id; responses have one.
244                if msg.get("id").is_none() {
245                    if let Some(method) = msg["method"].as_str() {
246                        if method == "cancel" {
247                            cancelled_clone.store(true, Ordering::SeqCst);
248                        } else if method == "agent/resume" {
249                            resume_clone.store(true, Ordering::SeqCst);
250                        } else if method == "skills/update" {
251                            skills_reload_clone.store(true, Ordering::SeqCst);
252                        } else if method == "user_message"
253                            || method == "session/input"
254                            || method == "agent/user_message"
255                        {
256                            let params = &msg["params"];
257                            let content = params
258                                .get("content")
259                                .and_then(|v| v.as_str())
260                                .unwrap_or("")
261                                .to_string();
262                            if !content.is_empty() {
263                                let mode = QueuedUserMessageMode::from_str(
264                                    params
265                                        .get("mode")
266                                        .and_then(|v| v.as_str())
267                                        .unwrap_or("wait_for_completion"),
268                                );
269                                queued_clone
270                                    .lock()
271                                    .await
272                                    .push_back(QueuedUserMessage { content, mode });
273                            }
274                        }
275                    }
276                    continue;
277                }
278
279                if let Some(id) = msg["id"].as_u64() {
280                    let mut pending = pending_clone.lock().await;
281                    if let Some(sender) = pending.remove(&id) {
282                        let _ = sender.send(msg);
283                    }
284                }
285            }
286
287            // stdin closed: drop pending senders to cancel waiters.
288            let mut pending = pending_clone.lock().await;
289            pending.clear();
290        });
291
292        Self {
293            next_id: AtomicU64::new(1),
294            pending,
295            cancelled,
296            stdout_lock: Arc::new(std::sync::Mutex::new(())),
297            session_id: std::sync::Mutex::new(String::new()),
298            script_name: std::sync::Mutex::new(String::new()),
299            queued_user_messages,
300            resume_requested,
301            skills_reload_requested,
302            visible_call_states: std::sync::Mutex::new(HashMap::new()),
303            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
304            in_process: None,
305            #[cfg(test)]
306            recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
307        }
308    }
309
310    /// Create a bridge from pre-existing shared state.
311    ///
312    /// Unlike `new()`, does **not** spawn a stdin reader — the caller is
313    /// responsible for dispatching responses into `pending`.  This is used
314    /// by ACP mode which already has its own stdin reader.
315    pub fn from_parts(
316        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
317        cancelled: Arc<AtomicBool>,
318        stdout_lock: Arc<std::sync::Mutex<()>>,
319        start_id: u64,
320    ) -> Self {
321        Self {
322            next_id: AtomicU64::new(start_id),
323            pending,
324            cancelled,
325            stdout_lock,
326            session_id: std::sync::Mutex::new(String::new()),
327            script_name: std::sync::Mutex::new(String::new()),
328            queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
329            resume_requested: Arc::new(AtomicBool::new(false)),
330            skills_reload_requested: Arc::new(AtomicBool::new(false)),
331            visible_call_states: std::sync::Mutex::new(HashMap::new()),
332            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
333            in_process: None,
334            #[cfg(test)]
335            recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
336        }
337    }
338
339    /// Create an in-process host bridge backed by exported functions from a
340    /// Harn module. Used by `harn playground` to avoid JSON-RPC boilerplate.
341    pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
342        let exported_functions = vm.load_module_exports(module_path).await?;
343        Ok(Self {
344            next_id: AtomicU64::new(1),
345            pending: Arc::new(Mutex::new(HashMap::new())),
346            cancelled: Arc::new(AtomicBool::new(false)),
347            stdout_lock: Arc::new(std::sync::Mutex::new(())),
348            session_id: std::sync::Mutex::new(String::new()),
349            script_name: std::sync::Mutex::new(String::new()),
350            queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
351            resume_requested: Arc::new(AtomicBool::new(false)),
352            skills_reload_requested: Arc::new(AtomicBool::new(false)),
353            visible_call_states: std::sync::Mutex::new(HashMap::new()),
354            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
355            in_process: Some(InProcessHost {
356                module_path: module_path.to_path_buf(),
357                exported_functions,
358                vm,
359            }),
360            #[cfg(test)]
361            recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
362        })
363    }
364
365    /// Set the ACP session ID for session-scoped notifications.
366    pub fn set_session_id(&self, id: &str) {
367        *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
368    }
369
370    /// Set the currently executing script name (without .harn suffix).
371    pub fn set_script_name(&self, name: &str) {
372        *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
373    }
374
375    /// Get the current script name.
376    fn get_script_name(&self) -> String {
377        self.script_name
378            .lock()
379            .unwrap_or_else(|e| e.into_inner())
380            .clone()
381    }
382
383    /// Get the session ID.
384    fn get_session_id(&self) -> String {
385        self.session_id
386            .lock()
387            .unwrap_or_else(|e| e.into_inner())
388            .clone()
389    }
390
391    /// Write a complete JSON-RPC line to stdout, serialized through a mutex.
392    fn write_line(&self, line: &str) -> Result<(), VmError> {
393        let _guard = self.stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
394        let mut stdout = std::io::stdout().lock();
395        stdout
396            .write_all(line.as_bytes())
397            .map_err(|e| VmError::Runtime(format!("Bridge write error: {e}")))?;
398        stdout
399            .write_all(b"\n")
400            .map_err(|e| VmError::Runtime(format!("Bridge write error: {e}")))?;
401        stdout
402            .flush()
403            .map_err(|e| VmError::Runtime(format!("Bridge flush error: {e}")))?;
404        Ok(())
405    }
406
407    /// Send a JSON-RPC request to the host and wait for the response.
408    /// Times out after 5 minutes to prevent deadlocks.
409    pub async fn call(
410        &self,
411        method: &str,
412        params: serde_json::Value,
413    ) -> Result<serde_json::Value, VmError> {
414        if let Some(in_process) = &self.in_process {
415            return in_process.dispatch(method, params).await;
416        }
417
418        if self.is_cancelled() {
419            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
420        }
421
422        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
423
424        let request = crate::jsonrpc::request(id, method, params);
425
426        let (tx, rx) = oneshot::channel();
427        {
428            let mut pending = self.pending.lock().await;
429            pending.insert(id, tx);
430        }
431
432        let line = serde_json::to_string(&request)
433            .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
434        if let Err(e) = self.write_line(&line) {
435            let mut pending = self.pending.lock().await;
436            pending.remove(&id);
437            return Err(e);
438        }
439
440        let response = match tokio::time::timeout(DEFAULT_TIMEOUT, rx).await {
441            Ok(Ok(msg)) => msg,
442            Ok(Err(_)) => {
443                // Sender dropped: host closed or stdin reader exited.
444                return Err(VmError::Runtime(
445                    "Bridge: host closed connection before responding".into(),
446                ));
447            }
448            Err(_) => {
449                let mut pending = self.pending.lock().await;
450                pending.remove(&id);
451                return Err(VmError::Runtime(format!(
452                    "Bridge: host did not respond to '{method}' within {}s",
453                    DEFAULT_TIMEOUT.as_secs()
454                )));
455            }
456        };
457
458        if let Some(error) = response.get("error") {
459            let message = error["message"].as_str().unwrap_or("Unknown host error");
460            let code = error["code"].as_i64().unwrap_or(-1);
461            // JSON-RPC -32001 signals the host rejected the tool (not permitted / not in allowlist).
462            if code == -32001 {
463                return Err(VmError::CategorizedError {
464                    message: message.to_string(),
465                    category: ErrorCategory::ToolRejected,
466                });
467            }
468            return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
469        }
470
471        Ok(response["result"].clone())
472    }
473
474    /// Send a JSON-RPC notification to the host (no response expected).
475    /// Serialized through the stdout mutex to prevent interleaving.
476    pub fn notify(&self, method: &str, params: serde_json::Value) {
477        let notification = crate::jsonrpc::notification(method, params);
478        #[cfg(test)]
479        self.recorded_notifications
480            .lock()
481            .unwrap_or_else(|e| e.into_inner())
482            .push(notification.clone());
483        if self.in_process.is_some() {
484            return;
485        }
486        if let Ok(line) = serde_json::to_string(&notification) {
487            let _ = self.write_line(&line);
488        }
489    }
490
491    #[cfg(test)]
492    pub(crate) fn recorded_notifications(&self) -> Vec<serde_json::Value> {
493        self.recorded_notifications
494            .lock()
495            .unwrap_or_else(|e| e.into_inner())
496            .clone()
497    }
498
    /// Check if the host has sent a cancel notification.
    ///
    /// Reads the shared `cancelled` flag. The stdin reader started by
    /// `new()` sets it on a `cancel` notification; for a bridge built with
    /// `from_parts` the flag is supplied by the caller.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }
503
    /// Consume the resume signal: returns whether it was set and resets it
    /// to `false` in the same atomic step.
    pub fn take_resume_signal(&self) -> bool {
        self.resume_requested.swap(false, Ordering::SeqCst)
    }
507
    /// Set the resume signal, the same flag an `agent/resume` notification
    /// sets. It stays set until `take_resume_signal` consumes it.
    pub fn signal_resume(&self) {
        self.resume_requested.store(true, Ordering::SeqCst);
    }
511
    /// Consume any pending `skills/update` signal the host has sent.
    /// Returns `true` exactly once per notification, letting callers
    /// trigger a layered-discovery rebuild without polling false
    /// positives. See issue #73 for the hot-reload contract.
    ///
    /// The flag is a single boolean, so several signals raised before a
    /// take are reported as one `true`.
    pub fn take_skills_reload_signal(&self) -> bool {
        self.skills_reload_requested.swap(false, Ordering::SeqCst)
    }
519
    /// Manually mark the skill catalog as stale. Used by tests and by
    /// the CLI when an internal event (e.g. `harn install`) should
    /// trigger the same rebuild a `skills/update` notification would.
    ///
    /// Sets the flag that `take_skills_reload_signal` consumes.
    pub fn signal_skills_reload(&self) {
        self.skills_reload_requested.store(true, Ordering::SeqCst);
    }
526
527    /// Call the host's `skills/list` RPC and return the raw JSON array
528    /// it responded with. Shape:
529    /// `[{ "id": "...", "name": "...", "description": "...", "source": "..." }, ...]`.
530    /// The CLI adapter converts each entry into a
531    /// [`crate::skills::SkillManifestRef`].
532    pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
533        let result = self.call("skills/list", serde_json::json!({})).await?;
534        match result {
535            serde_json::Value::Array(items) => Ok(items),
536            serde_json::Value::Object(map) => match map.get("skills") {
537                Some(serde_json::Value::Array(items)) => Ok(items.clone()),
538                _ => Err(VmError::Runtime(
539                    "skills/list: host response must be an array or { skills: [...] }".into(),
540                )),
541            },
542            _ => Err(VmError::Runtime(
543                "skills/list: unexpected response shape".into(),
544            )),
545        }
546    }
547
    /// Call the host's `skills/fetch` RPC for one skill id. Returns the
    /// raw JSON body so the CLI can inspect both the frontmatter fields
    /// and the skill markdown body in whatever shape the host sends.
    ///
    /// The request parameters are `{ "id": id }`.
    ///
    /// # Errors
    /// Propagates any error returned by `call`.
    pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
        self.call("skills/fetch", serde_json::json!({ "id": id }))
            .await
    }
555
556    pub async fn push_queued_user_message(&self, content: String, mode: &str) {
557        self.queued_user_messages
558            .lock()
559            .await
560            .push_back(QueuedUserMessage {
561                content,
562                mode: QueuedUserMessageMode::from_str(mode),
563            });
564    }
565
566    pub async fn take_queued_user_messages(
567        &self,
568        include_interrupt_immediate: bool,
569        include_finish_step: bool,
570        include_wait_for_completion: bool,
571    ) -> Vec<QueuedUserMessage> {
572        let mut queue = self.queued_user_messages.lock().await;
573        let mut selected = Vec::new();
574        let mut retained = VecDeque::new();
575        while let Some(message) = queue.pop_front() {
576            let should_take = match message.mode {
577                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
578                QueuedUserMessageMode::FinishStep => include_finish_step,
579                QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
580            };
581            if should_take {
582                selected.push(message);
583            } else {
584                retained.push_back(message);
585            }
586        }
587        *queue = retained;
588        selected
589    }
590
591    pub async fn take_queued_user_messages_for(
592        &self,
593        checkpoint: DeliveryCheckpoint,
594    ) -> Vec<QueuedUserMessage> {
595        match checkpoint {
596            DeliveryCheckpoint::InterruptImmediate => {
597                self.take_queued_user_messages(true, false, false).await
598            }
599            DeliveryCheckpoint::AfterCurrentOperation => {
600                self.take_queued_user_messages(false, true, false).await
601            }
602            DeliveryCheckpoint::EndOfInteraction => {
603                self.take_queued_user_messages(false, false, true).await
604            }
605        }
606    }
607
    /// Send an output notification (for log/print in bridge mode).
    ///
    /// Emits an `output` notification with the payload `{ "text": text }`.
    pub fn send_output(&self, text: &str) {
        self.notify("output", serde_json::json!({"text": text}));
    }
612
613    /// Send a progress notification with optional numeric progress and structured data.
614    pub fn send_progress(
615        &self,
616        phase: &str,
617        message: &str,
618        progress: Option<i64>,
619        total: Option<i64>,
620        data: Option<serde_json::Value>,
621    ) {
622        let mut payload = serde_json::json!({"phase": phase, "message": message});
623        if let Some(p) = progress {
624            payload["progress"] = serde_json::json!(p);
625        }
626        if let Some(t) = total {
627            payload["total"] = serde_json::json!(t);
628        }
629        if let Some(d) = data {
630            payload["data"] = d;
631        }
632        self.notify("progress", payload);
633    }
634
635    /// Send a structured log notification.
636    pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
637        let mut payload = serde_json::json!({"level": level, "message": message});
638        if let Some(f) = fields {
639            payload["fields"] = f;
640        }
641        self.notify("log", payload);
642    }
643
644    /// Send a `session/update` with `call_start` — signals the beginning of
645    /// an LLM call, tool call, or builtin call for observability.
646    pub fn send_call_start(
647        &self,
648        call_id: &str,
649        call_type: &str,
650        name: &str,
651        metadata: serde_json::Value,
652    ) {
653        let session_id = self.get_session_id();
654        let script = self.get_script_name();
655        let stream_publicly = metadata
656            .get("stream_publicly")
657            .and_then(|value| value.as_bool())
658            .unwrap_or(true);
659        self.visible_call_streams
660            .lock()
661            .unwrap_or_else(|e| e.into_inner())
662            .insert(call_id.to_string(), stream_publicly);
663        self.notify(
664            "session/update",
665            serde_json::json!({
666                "sessionId": session_id,
667                "update": {
668                    "sessionUpdate": "call_start",
669                    "content": {
670                        "toolCallId": call_id,
671                        "call_type": call_type,
672                        "name": name,
673                        "script": script,
674                        "metadata": metadata,
675                    },
676                },
677            }),
678        );
679    }
680
681    /// Send a `session/update` with `call_progress` — a streaming token delta
682    /// from an in-flight LLM call.
683    pub fn send_call_progress(
684        &self,
685        call_id: &str,
686        delta: &str,
687        accumulated_tokens: u64,
688        user_visible: bool,
689    ) {
690        let session_id = self.get_session_id();
691        let (visible_text, visible_delta) = {
692            let stream_publicly = self
693                .visible_call_streams
694                .lock()
695                .unwrap_or_else(|e| e.into_inner())
696                .get(call_id)
697                .copied()
698                .unwrap_or(true);
699            let mut states = self
700                .visible_call_states
701                .lock()
702                .unwrap_or_else(|e| e.into_inner());
703            let state = states.entry(call_id.to_string()).or_default();
704            state.push(delta, stream_publicly)
705        };
706        self.notify(
707            "session/update",
708            serde_json::json!({
709                "sessionId": session_id,
710                "update": {
711                    "sessionUpdate": "call_progress",
712                    "content": {
713                        "toolCallId": call_id,
714                        "delta": delta,
715                        "accumulated_tokens": accumulated_tokens,
716                        "visible_text": visible_text,
717                        "visible_delta": visible_delta,
718                        "user_visible": user_visible,
719                    },
720                },
721            }),
722        );
723    }
724
725    /// Send a `session/update` with `call_end` — signals completion of a call.
726    pub fn send_call_end(
727        &self,
728        call_id: &str,
729        call_type: &str,
730        name: &str,
731        duration_ms: u64,
732        status: &str,
733        metadata: serde_json::Value,
734    ) {
735        let session_id = self.get_session_id();
736        let script = self.get_script_name();
737        self.visible_call_states
738            .lock()
739            .unwrap_or_else(|e| e.into_inner())
740            .remove(call_id);
741        self.visible_call_streams
742            .lock()
743            .unwrap_or_else(|e| e.into_inner())
744            .remove(call_id);
745        self.notify(
746            "session/update",
747            serde_json::json!({
748                "sessionId": session_id,
749                "update": {
750                    "sessionUpdate": "call_end",
751                    "content": {
752                        "toolCallId": call_id,
753                        "call_type": call_type,
754                        "name": name,
755                        "script": script,
756                        "duration_ms": duration_ms,
757                        "status": status,
758                        "metadata": metadata,
759                    },
760                },
761            }),
762        );
763    }
764
765    /// Send a worker lifecycle update for delegated/background execution.
766    pub fn send_worker_update(
767        &self,
768        worker_id: &str,
769        worker_name: &str,
770        status: &str,
771        metadata: serde_json::Value,
772        audit: Option<&MutationSessionRecord>,
773    ) {
774        let session_id = self.get_session_id();
775        let script = self.get_script_name();
776        let started_at = metadata.get("started_at").cloned().unwrap_or_default();
777        let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
778        let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
779        let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
780        let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
781        let lifecycle = serde_json::json!({
782            "event": status,
783            "worker_id": worker_id,
784            "worker_name": worker_name,
785            "started_at": started_at,
786            "finished_at": finished_at,
787        });
788        self.notify(
789            "session/update",
790            serde_json::json!({
791                "sessionId": session_id,
792                "update": {
793                    "sessionUpdate": "worker_update",
794                    "content": {
795                        "worker_id": worker_id,
796                        "worker_name": worker_name,
797                        "status": status,
798                        "script": script,
799                        "started_at": started_at,
800                        "finished_at": finished_at,
801                        "snapshot_path": snapshot_path,
802                        "run_id": run_id,
803                        "run_path": run_path,
804                        "lifecycle": lifecycle,
805                        "audit": audit,
806                        "metadata": metadata,
807                    },
808                },
809            }),
810        );
811    }
812}
813
814/// Convert a serde_json::Value to a VmValue.
815pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
816    crate::stdlib::json_to_vm_value(val)
817}
818
#[cfg(test)]
mod tests {
    use super::*;

    /// A serialized request carries the JSON-RPC version tag, its id and its method.
    #[test]
    fn test_json_rpc_request_format() {
        let params = serde_json::json!({
            "prompt": "Hello",
            "system": "Be helpful",
        });
        let request = crate::jsonrpc::request(1, "llm_call", params);
        let encoded = serde_json::to_string(&request).unwrap();
        for needle in ["\"jsonrpc\":\"2.0\"", "\"id\":1", "\"method\":\"llm_call\""] {
            assert!(encoded.contains(needle));
        }
    }

    /// A serialized notification names its method and has no `id` member.
    #[test]
    fn test_json_rpc_notification_format() {
        let params = serde_json::json!({"text": "[harn] hello\n"});
        let notification = crate::jsonrpc::notification("output", params);
        let encoded = serde_json::to_string(&notification).unwrap();
        assert!(encoded.contains("\"method\":\"output\""));
        assert!(!encoded.contains("\"id\""));
    }

    /// An error response exposes an `error` member holding the given message.
    #[test]
    fn test_json_rpc_error_response_parsing() {
        let reply = crate::jsonrpc::error_response(1, -32600, "Invalid request");
        let error = reply.get("error");
        assert!(error.is_some());
        assert_eq!(reply["error"]["message"].as_str(), Some("Invalid request"));
    }

    /// A success response exposes a `result` member holding the given payload.
    #[test]
    fn test_json_rpc_success_response_parsing() {
        let payload = serde_json::json!({
            "text": "Hello world",
            "input_tokens": 10,
            "output_tokens": 5,
        });
        let reply = crate::jsonrpc::response(1, payload);
        let result = reply.get("result");
        assert!(result.is_some());
        assert_eq!(reply["result"]["text"].as_str(), Some("Hello world"));
    }

    /// A shared atomic flag starts cleared and reads back as set after a store.
    #[test]
    fn test_cancelled_flag() {
        let flag = Arc::new(AtomicBool::new(false));
        assert!(!flag.load(Ordering::SeqCst));
        flag.store(true, Ordering::SeqCst);
        assert!(flag.load(Ordering::SeqCst));
    }

    /// Each queued message is only handed out by the take call matching its delivery mode.
    #[test]
    fn queued_messages_are_filtered_by_delivery_mode() {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async {
            let pending = Arc::new(Mutex::new(HashMap::new()));
            let cancelled = Arc::new(AtomicBool::new(false));
            let stdout_lock = Arc::new(std::sync::Mutex::new(()));
            let bridge = HostBridge::from_parts(pending, cancelled, stdout_lock, 1);

            bridge
                .push_queued_user_message(String::from("first"), "finish_step")
                .await;
            bridge
                .push_queued_user_message(String::from("second"), "wait_for_completion")
                .await;

            let at_finish_step = bridge.take_queued_user_messages(false, true, false).await;
            assert_eq!(at_finish_step.len(), 1);
            assert_eq!(at_finish_step[0].content, "first");

            let at_turn_end = bridge.take_queued_user_messages(false, false, true).await;
            assert_eq!(at_turn_end.len(), 1);
            assert_eq!(at_turn_end[0].content, "second");
        });
    }

    /// A JSON string converts to a value that displays as the bare string.
    #[test]
    fn test_json_result_to_vm_value_string() {
        let json = serde_json::json!("hello");
        let converted = json_result_to_vm_value(&json);
        assert_eq!(converted.display(), "hello");
    }

    /// A JSON object converts to a dict whose entries are converted in turn.
    #[test]
    fn test_json_result_to_vm_value_dict() {
        let json = serde_json::json!({"name": "test", "count": 42});
        let converted = json_result_to_vm_value(&json);
        let entries = match &converted {
            VmValue::Dict(entries) => entries,
            other => unreachable!("Expected Dict, got {:?}", other),
        };
        assert_eq!(entries.get("name").unwrap().display(), "test");
        assert_eq!(entries.get("count").unwrap().display(), "42");
    }

    /// JSON null converts to `VmValue::Nil`.
    #[test]
    fn test_json_result_to_vm_value_null() {
        let json = serde_json::json!(null);
        let converted = json_result_to_vm_value(&json);
        assert!(matches!(converted, VmValue::Nil));
    }

    /// Nested arrays inside an object convert to lists of the same length.
    #[test]
    fn test_json_result_to_vm_value_nested() {
        let json = serde_json::json!({
            "text": "response",
            "tool_calls": [
                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
            ],
            "input_tokens": 100,
            "output_tokens": 50,
        });
        let converted = json_result_to_vm_value(&json);
        let entries = match &converted {
            VmValue::Dict(entries) => entries,
            other => unreachable!("Expected Dict, got {:?}", other),
        };
        assert_eq!(entries.get("text").unwrap().display(), "response");
        let tool_calls = match entries.get("tool_calls").unwrap() {
            VmValue::List(items) => items,
            _ => unreachable!("Expected List for tool_calls"),
        };
        assert_eq!(tool_calls.len(), 1);
    }

    /// The default bridge timeout is 300 whole seconds.
    #[test]
    fn test_timeout_duration() {
        let secs = DEFAULT_TIMEOUT.as_secs();
        assert_eq!(secs, 300);
    }
}