Skip to main content

harn_vm/
bridge.rs

1//! JSON-RPC 2.0 bridge for host communication.
2//!
3//! When `harn run --bridge` is used, the VM delegates builtins (llm_call,
4//! file I/O, tool execution) to a host process over stdin/stdout JSON-RPC.
5//! The host application handles these requests using its own providers.
6
7use std::collections::{BTreeMap, HashMap, VecDeque};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::io::AsyncBufReadExt;
16use tokio::sync::{oneshot, Mutex, Notify};
17
18use harn_parser::diagnostic_codes::Code;
19
20use crate::orchestration::MutationSessionRecord;
21use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
22use crate::visible_text::VisibleTextState;
23use crate::vm::Vm;
24
/// How long a bridge call waits for the host before giving up: five minutes.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5 * 60);
27
/// Callback that delivers one serialized JSON-RPC message to the host.
///
/// It receives the message without a trailing newline and reports failures
/// as a human-readable string.
pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;

/// Build a [`HostBridgeWriter`] that emits each message as one
/// newline-terminated line on stdout and flushes immediately.
///
/// `stdout_lock` is held for the duration of every write so that callers
/// sharing the same lock cannot interleave their output.
fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
    Arc::new(move |line: &str| {
        // Recover the guard from a poisoned lock instead of panicking.
        let _serialized = stdout_lock
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let mut out = std::io::stdout().lock();
        out.write_all(line.as_bytes())
            .and_then(|()| out.write_all(b"\n"))
            .map_err(|e| format!("Bridge write error: {e}"))?;
        out.flush().map_err(|e| format!("Bridge flush error: {e}"))
    })
}
46
47/// A JSON-RPC 2.0 bridge to a host process over stdin/stdout.
48///
49/// The bridge sends requests to the host on stdout and receives responses
50/// on stdin. A background task reads stdin and dispatches responses to
51/// waiting callers by request ID. All stdout writes are serialized through
52/// a mutex to prevent interleaving.
53pub struct HostBridge {
54    next_id: AtomicU64,
55    /// Pending request waiters, keyed by JSON-RPC id.
56    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
57    /// Whether the host has sent a cancel notification.
58    cancelled: Arc<AtomicBool>,
59    /// Wakes pending host calls when cancellation arrives.
60    cancel_notify: Arc<Notify>,
61    /// Transport writer used to send JSON-RPC to the host.
62    writer: HostBridgeWriter,
63    /// ACP session ID (set in ACP mode for session-scoped notifications).
64    session_id: std::sync::Mutex<String>,
65    /// Name of the currently executing Harn script (without .harn suffix).
66    script_name: std::sync::Mutex<String>,
67    /// Transcript injections queued by the host while a run is active.
68    queued_transcript_injections: Arc<Mutex<VecDeque<QueuedTranscriptInjection>>>,
69    /// Host-triggered resume signal for daemon agents.
70    resume_requested: Arc<AtomicBool>,
71    /// Host-triggered skill-registry invalidation signal. Set when the
72    /// host sends a `skills/update` notification; consumed by the CLI
73    /// between runs (watch mode, long-running agents) to rebuild the
74    /// layered skill catalog from its current filesystem + host state.
75    skills_reload_requested: Arc<AtomicBool>,
76    /// Whether the current daemon-mode agent loop is blocked in idle wait.
77    daemon_idle: Arc<AtomicBool>,
78    /// Canonical ACP `stopReason` recorded by the most recent `agent_loop`
79    /// finalize during this prompt. Read once by the ACP adapter when the
80    /// pipeline returns and populated by `host_agent_session_finalize`.
81    /// Pipelines that don't run an agent loop leave this `None`, in which
82    /// case the adapter falls back to `end_turn`.
83    prompt_stop_reason: std::sync::Mutex<Option<String>>,
84    /// Per-call visible assistant text state for call_progress notifications.
85    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
86    /// Whether an LLM call's deltas should be exposed to end users while streaming.
87    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
88    /// Optional in-process host-module backend used by `harn playground`.
89    in_process: Option<InProcessHost>,
90}
91
92struct InProcessHost {
93    module_path: PathBuf,
94    exported_functions: BTreeMap<String, Rc<VmClosure>>,
95    vm: Vm,
96}
97
98impl InProcessHost {
99    async fn dispatch(
100        &self,
101        method: &str,
102        params: serde_json::Value,
103    ) -> Result<serde_json::Value, VmError> {
104        match method {
105            "builtin_call" => {
106                let name = params
107                    .get("name")
108                    .and_then(|value| value.as_str())
109                    .unwrap_or_default();
110                let args = params
111                    .get("args")
112                    .and_then(|value| value.as_array())
113                    .cloned()
114                    .unwrap_or_default()
115                    .into_iter()
116                    .map(|value| json_result_to_vm_value(&value))
117                    .collect::<Vec<_>>();
118                self.invoke_export(name, &args).await
119            }
120            "host/tools/list" => self
121                .invoke_optional_export("host_tools_list", &[])
122                .await
123                .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
124            "session/request_permission" => self.request_permission(params).await,
125            other => Err(VmError::Runtime(format!(
126                "playground host backend does not implement bridge method '{other}'"
127            ))),
128        }
129    }
130
131    async fn invoke_export(
132        &self,
133        name: &str,
134        args: &[VmValue],
135    ) -> Result<serde_json::Value, VmError> {
136        let Some(closure) = self.exported_functions.get(name) else {
137            return Err(VmError::Runtime(format!(
138                "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
139                self.module_path.display()
140            )));
141        };
142
143        let mut vm = self.vm.child_vm_for_host();
144        let result = vm.call_closure_pub(closure, args).await?;
145        Ok(crate::llm::vm_value_to_json(&result))
146    }
147
148    async fn invoke_optional_export(
149        &self,
150        name: &str,
151        args: &[VmValue],
152    ) -> Result<Option<serde_json::Value>, VmError> {
153        if !self.exported_functions.contains_key(name) {
154            return Ok(None);
155        }
156        self.invoke_export(name, args).await.map(Some)
157    }
158
159    async fn request_permission(
160        &self,
161        params: serde_json::Value,
162    ) -> Result<serde_json::Value, VmError> {
163        let Some(closure) = self.exported_functions.get("request_permission") else {
164            return Ok(serde_json::json!({ "granted": true }));
165        };
166
167        let tool_name = params
168            .get("toolCall")
169            .and_then(|tool_call| tool_call.get("toolName"))
170            .and_then(|value| value.as_str())
171            .unwrap_or_default();
172        let tool_args = params
173            .get("toolCall")
174            .and_then(|tool_call| tool_call.get("rawInput"))
175            .map(json_result_to_vm_value)
176            .unwrap_or(VmValue::Nil);
177        let full_payload = json_result_to_vm_value(&params);
178
179        let arg_count = closure.func.params.len();
180        let args = if arg_count >= 3 {
181            vec![
182                VmValue::String(Rc::from(tool_name.to_string())),
183                tool_args,
184                full_payload,
185            ]
186        } else if arg_count == 2 {
187            vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
188        } else if arg_count == 1 {
189            vec![full_payload]
190        } else {
191            Vec::new()
192        };
193
194        let mut vm = self.vm.child_vm_for_host();
195        let result = vm.call_closure_pub(closure, &args).await?;
196        let payload = match result {
197            VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
198            VmValue::String(reason) if !reason.is_empty() => {
199                serde_json::json!({ "granted": false, "reason": reason.to_string() })
200            }
201            other => {
202                let json = crate::llm::vm_value_to_json(&other);
203                if json
204                    .get("granted")
205                    .and_then(|value| value.as_bool())
206                    .is_some()
207                    || json.get("outcome").is_some()
208                {
209                    json
210                } else {
211                    serde_json::json!({ "granted": other.is_truthy() })
212                }
213            }
214        };
215        Ok(payload)
216    }
217}
218
/// When a queued transcript injection should be delivered.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
    InterruptImmediate,
    FinishStep,
    WaitForCompletion,
}

/// Delivery checkpoint kinds.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
    InterruptImmediate,
    AfterCurrentOperation,
    EndOfInteraction,
}

impl QueuedUserMessageMode {
    /// Parse the wire name of a delivery mode.
    ///
    /// `interrupt_immediate`/`interrupt` and
    /// `finish_step`/`after_current_operation` are accepted as aliases; any
    /// other string maps to [`Self::WaitForCompletion`].
    fn from_str(value: &str) -> Self {
        if matches!(value, "interrupt_immediate" | "interrupt") {
            Self::InterruptImmediate
        } else if matches!(value, "finish_step" | "after_current_operation") {
            Self::FinishStep
        } else {
            Self::WaitForCompletion
        }
    }
}
242
243#[derive(Clone, Debug, PartialEq, Eq)]
244pub struct QueuedUserMessage {
245    pub content: String,
246    pub mode: QueuedUserMessageMode,
247}
248
249#[derive(Clone, Debug, PartialEq, Eq)]
250pub struct QueuedReminder {
251    pub reminder: crate::llm::helpers::SystemReminder,
252    pub mode: QueuedUserMessageMode,
253}
254
255#[derive(Clone, Debug, PartialEq, Eq)]
256pub enum QueuedTranscriptInjection {
257    User(QueuedUserMessage),
258    Reminder(QueuedReminder),
259}
260
261impl QueuedTranscriptInjection {
262    fn mode(&self) -> QueuedUserMessageMode {
263        match self {
264            Self::User(message) => message.mode,
265            Self::Reminder(reminder) => reminder.mode,
266        }
267    }
268}
269
270fn queue_user_message_from_params(params: &serde_json::Value) -> Option<QueuedUserMessage> {
271    let content = params
272        .get("content")
273        .and_then(|v| v.as_str())
274        .unwrap_or("")
275        .to_string();
276    if content.is_empty() {
277        return None;
278    }
279    let mode = QueuedUserMessageMode::from_str(
280        params
281            .get("mode")
282            .and_then(|v| v.as_str())
283            .unwrap_or("wait_for_completion"),
284    );
285    Some(QueuedUserMessage { content, mode })
286}
287
288fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
289    format!(
290        "{}: {}",
291        Code::ReminderUnknownOption.as_str(),
292        message.as_ref()
293    )
294}
295
296fn session_remind_shape_error(message: impl AsRef<str>) -> String {
297    format!(
298        "{}: {}",
299        Code::ReminderInvalidShape.as_str(),
300        message.as_ref()
301    )
302}
303
304fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
305    format!(
306        "{}: {}",
307        Code::ReminderUnknownPropagate.as_str(),
308        message.as_ref()
309    )
310}
311
312fn string_field(
313    map: &serde_json::Map<String, serde_json::Value>,
314    key: &str,
315    required: bool,
316) -> Result<Option<String>, String> {
317    match map.get(key) {
318        None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
319            format!("`{key}` must be a non-empty string"),
320        )),
321        None | Some(serde_json::Value::Null) => Ok(None),
322        Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
323            session_remind_shape_error(format!("`{key}` must be a non-empty string")),
324        ),
325        Some(serde_json::Value::String(value)) => {
326            let trimmed = value.trim();
327            if trimmed.is_empty() {
328                Ok(None)
329            } else {
330                Ok(Some(trimmed.to_string()))
331            }
332        }
333        Some(other) => Err(session_remind_shape_error(format!(
334            "`{key}` must be a string, got {other}"
335        ))),
336    }
337}
338
339fn bool_field(
340    map: &serde_json::Map<String, serde_json::Value>,
341    key: &str,
342) -> Result<Option<bool>, String> {
343    match map.get(key) {
344        None | Some(serde_json::Value::Null) => Ok(None),
345        Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
346        Some(other) => Err(session_remind_shape_error(format!(
347            "`{key}` must be a bool, got {other}"
348        ))),
349    }
350}
351
352fn int_field(
353    map: &serde_json::Map<String, serde_json::Value>,
354    key: &str,
355) -> Result<Option<i64>, String> {
356    match map.get(key) {
357        None | Some(serde_json::Value::Null) => Ok(None),
358        Some(serde_json::Value::Number(value)) => {
359            let Some(value) = value.as_i64() else {
360                return Err(session_remind_shape_error(format!(
361                    "`{key}` must be an integer"
362                )));
363            };
364            Ok(Some(value))
365        }
366        Some(other) => Err(session_remind_shape_error(format!(
367            "`{key}` must be an int, got {other}"
368        ))),
369    }
370}
371
372fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
373    let Some(value) = map.get("tags") else {
374        return Ok(Vec::new());
375    };
376    if value.is_null() {
377        return Ok(Vec::new());
378    }
379    let Some(values) = value.as_array() else {
380        return Err(session_remind_shape_error("`tags` must be a list"));
381    };
382    let mut tags = Vec::new();
383    for value in values {
384        let Some(tag) = value.as_str() else {
385            return Err(session_remind_shape_error(format!(
386                "`tags` entries must be strings, got {value}"
387            )));
388        };
389        let tag = tag.trim();
390        if tag.is_empty() {
391            return Err(session_remind_shape_error(
392                "`tags` entries must be non-empty strings",
393            ));
394        }
395        if !tags.iter().any(|existing| existing == tag) {
396            tags.push(tag.to_string());
397        }
398    }
399    Ok(tags)
400}
401
402fn session_remind_payload_from_value(
403    value: &serde_json::Value,
404) -> Result<crate::llm::helpers::SystemReminder, String> {
405    let Some(map) = value.as_object() else {
406        return Err(session_remind_shape_error(
407            "session/remind payload must be a reminder object",
408        ));
409    };
410    const ALLOWED: &[&str] = &[
411        "_meta",
412        "body",
413        "dedupe_key",
414        "fired_at_turn",
415        "id",
416        "preserve_on_compact",
417        "propagate",
418        "role_hint",
419        "source",
420        "tags",
421        "ttl_turns",
422    ];
423    let unknown = map
424        .keys()
425        .filter(|key| !ALLOWED.contains(&key.as_str()))
426        .map(String::as_str)
427        .collect::<Vec<_>>();
428    if !unknown.is_empty() {
429        if unknown.contains(&"content") {
430            return Err(session_remind_shape_error(
431                "session/remind expects reminder `body`, not user-message `content`",
432            ));
433        }
434        return Err(reminder_unknown_option_error(format!(
435            "unknown reminder option(s): {}",
436            unknown.join(", ")
437        )));
438    }
439    if let Some(meta) = map.get("_meta") {
440        if !meta.is_null() && !meta.is_object() {
441            return Err(session_remind_shape_error("`_meta` must be an object"));
442        }
443    }
444    let ttl_turns = int_field(map, "ttl_turns")?;
445    if let Some(value) = ttl_turns {
446        if value <= 0 {
447            return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
448        }
449    }
450    let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
451    if fired_at_turn < 0 {
452        return Err(session_remind_shape_error(
453            "`fired_at_turn` must be >= 0 when provided",
454        ));
455    }
456    match string_field(map, "source", false)?.as_deref() {
457        None | Some("bridge") => {}
458        Some(_) => {
459            return Err(session_remind_shape_error(
460                "`source` for session/remind must be bridge when provided",
461            ))
462        }
463    }
464    let propagate = match string_field(map, "propagate", false)?.as_deref() {
465        None => crate::llm::helpers::ReminderPropagate::Session,
466        Some("all") => crate::llm::helpers::ReminderPropagate::All,
467        Some("session") => crate::llm::helpers::ReminderPropagate::Session,
468        Some("none") => crate::llm::helpers::ReminderPropagate::None,
469        Some(_) => {
470            return Err(reminder_unknown_propagate_error(
471                "`propagate` must be one of all, session, or none",
472            ))
473        }
474    };
475    let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
476        None => crate::llm::helpers::ReminderRoleHint::System,
477        Some("system") => crate::llm::helpers::ReminderRoleHint::System,
478        Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
479        Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
480        Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
481        Some(_) => {
482            return Err(session_remind_shape_error(
483                "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
484            ))
485        }
486    };
487    Ok(crate::llm::helpers::SystemReminder {
488        id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
489        tags: tags_field(map)?,
490        dedupe_key: string_field(map, "dedupe_key", false)?,
491        ttl_turns,
492        preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
493        propagate,
494        role_hint,
495        source: crate::llm::helpers::ReminderSource::Bridge,
496        body: string_field(map, "body", true)?.unwrap_or_default(),
497        fired_at_turn,
498        originating_agent_id: None,
499    })
500}
501
502fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
503    let mode = QueuedUserMessageMode::from_str(
504        params
505            .get("mode")
506            .and_then(|value| value.as_str())
507            .unwrap_or("wait_for_completion"),
508    );
509    let reminder_value = if let Some(reminder) = params.get("reminder") {
510        reminder.clone()
511    } else {
512        let Some(params) = params.as_object() else {
513            return Err(session_remind_shape_error(
514                "session/remind params must be an object",
515            ));
516        };
517        let mut reminder = params.clone();
518        reminder.remove("mode");
519        reminder.remove("sessionId");
520        reminder.remove("session_id");
521        serde_json::Value::Object(reminder)
522    };
523    Ok(QueuedReminder {
524        reminder: session_remind_payload_from_value(&reminder_value)?,
525        mode,
526    })
527}
528
529// Default doesn't apply — new() spawns async tasks requiring a tokio LocalSet.
530#[allow(clippy::new_without_default)]
531impl HostBridge {
532    /// Create a new bridge and spawn the stdin reader task.
533    ///
534    /// Must be called within a tokio LocalSet (uses spawn_local for the
535    /// stdin reader since it's single-threaded).
536    pub fn new() -> Self {
537        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
538            Arc::new(Mutex::new(HashMap::new()));
539        let cancelled = Arc::new(AtomicBool::new(false));
540        let cancel_notify = Arc::new(Notify::new());
541        let queued_transcript_injections: Arc<Mutex<VecDeque<QueuedTranscriptInjection>>> =
542            Arc::new(Mutex::new(VecDeque::new()));
543        let resume_requested = Arc::new(AtomicBool::new(false));
544        let skills_reload_requested = Arc::new(AtomicBool::new(false));
545        let daemon_idle = Arc::new(AtomicBool::new(false));
546
547        // Stdin reader: reads JSON-RPC lines and dispatches responses
548        let pending_clone = pending.clone();
549        let cancelled_clone = cancelled.clone();
550        let cancel_notify_clone = cancel_notify.clone();
551        let queued_clone = queued_transcript_injections.clone();
552        let resume_clone = resume_requested.clone();
553        let skills_reload_clone = skills_reload_requested.clone();
554        tokio::task::spawn_local(async move {
555            let stdin = tokio::io::stdin();
556            let reader = tokio::io::BufReader::new(stdin);
557            let mut lines = reader.lines();
558
559            while let Ok(Some(line)) = lines.next_line().await {
560                let line = line.trim().to_string();
561                if line.is_empty() {
562                    continue;
563                }
564
565                let msg: serde_json::Value = match serde_json::from_str(&line) {
566                    Ok(v) => v,
567                    Err(_) => continue,
568                };
569
570                // Notifications have no id; responses have one.
571                if msg.get("id").is_none() {
572                    if let Some(method) = msg["method"].as_str() {
573                        if method == "cancel" {
574                            cancelled_clone.store(true, Ordering::SeqCst);
575                            cancel_notify_clone.notify_waiters();
576                        } else if method == "agent/resume" {
577                            resume_clone.store(true, Ordering::SeqCst);
578                        } else if method == "skills/update" {
579                            skills_reload_clone.store(true, Ordering::SeqCst);
580                        } else if method == "user_message"
581                            || method == "session/input"
582                            || method == "agent/user_message"
583                        {
584                            let params = &msg["params"];
585                            if let Some(message) = queue_user_message_from_params(params) {
586                                queued_clone
587                                    .lock()
588                                    .await
589                                    .push_back(QueuedTranscriptInjection::User(message));
590                            }
591                        } else if method == "session/remind" {
592                            let params = &msg["params"];
593                            if let Ok(reminder) = queued_session_remind_from_params(params) {
594                                queued_clone
595                                    .lock()
596                                    .await
597                                    .push_back(QueuedTranscriptInjection::Reminder(reminder));
598                            }
599                        }
600                    }
601                    continue;
602                }
603
604                if let Some(id) = msg["id"].as_u64() {
605                    let mut pending = pending_clone.lock().await;
606                    if let Some(sender) = pending.remove(&id) {
607                        let _ = sender.send(msg);
608                    }
609                }
610            }
611
612            // stdin closed: drop pending senders to cancel waiters.
613            let mut pending = pending_clone.lock().await;
614            pending.clear();
615        });
616
617        Self {
618            next_id: AtomicU64::new(1),
619            pending,
620            cancelled,
621            cancel_notify,
622            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
623            session_id: std::sync::Mutex::new(String::new()),
624            script_name: std::sync::Mutex::new(String::new()),
625            queued_transcript_injections,
626            resume_requested,
627            skills_reload_requested,
628            daemon_idle,
629            prompt_stop_reason: std::sync::Mutex::new(None),
630            visible_call_states: std::sync::Mutex::new(HashMap::new()),
631            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
632            in_process: None,
633        }
634    }
635
636    /// Create a bridge from pre-existing shared state.
637    ///
638    /// Unlike `new()`, does **not** spawn a stdin reader — the caller is
639    /// responsible for dispatching responses into `pending`.  This is used
640    /// by ACP mode which already has its own stdin reader.
641    pub fn from_parts(
642        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
643        cancelled: Arc<AtomicBool>,
644        stdout_lock: Arc<std::sync::Mutex<()>>,
645        start_id: u64,
646    ) -> Self {
647        Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
648    }
649
650    pub fn from_parts_with_writer(
651        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
652        cancelled: Arc<AtomicBool>,
653        writer: HostBridgeWriter,
654        start_id: u64,
655    ) -> Self {
656        Self::from_parts_with_writer_and_cancel_notify(
657            pending,
658            cancelled,
659            Arc::new(Notify::new()),
660            writer,
661            start_id,
662        )
663    }
664
665    pub fn from_parts_with_writer_and_cancel_notify(
666        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
667        cancelled: Arc<AtomicBool>,
668        cancel_notify: Arc<Notify>,
669        writer: HostBridgeWriter,
670        start_id: u64,
671    ) -> Self {
672        Self {
673            next_id: AtomicU64::new(start_id),
674            pending,
675            cancelled,
676            cancel_notify,
677            writer,
678            session_id: std::sync::Mutex::new(String::new()),
679            script_name: std::sync::Mutex::new(String::new()),
680            queued_transcript_injections: Arc::new(Mutex::new(VecDeque::new())),
681            resume_requested: Arc::new(AtomicBool::new(false)),
682            skills_reload_requested: Arc::new(AtomicBool::new(false)),
683            daemon_idle: Arc::new(AtomicBool::new(false)),
684            prompt_stop_reason: std::sync::Mutex::new(None),
685            visible_call_states: std::sync::Mutex::new(HashMap::new()),
686            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
687            in_process: None,
688        }
689    }
690
691    /// Create an in-process host bridge backed by exported functions from a
692    /// Harn module. Used by `harn playground` to avoid JSON-RPC boilerplate.
693    pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
694        let exported_functions = vm.load_module_exports(module_path).await?;
695        Ok(Self {
696            next_id: AtomicU64::new(1),
697            pending: Arc::new(Mutex::new(HashMap::new())),
698            cancelled: Arc::new(AtomicBool::new(false)),
699            cancel_notify: Arc::new(Notify::new()),
700            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
701            session_id: std::sync::Mutex::new(String::new()),
702            script_name: std::sync::Mutex::new(String::new()),
703            queued_transcript_injections: Arc::new(Mutex::new(VecDeque::new())),
704            resume_requested: Arc::new(AtomicBool::new(false)),
705            skills_reload_requested: Arc::new(AtomicBool::new(false)),
706            daemon_idle: Arc::new(AtomicBool::new(false)),
707            prompt_stop_reason: std::sync::Mutex::new(None),
708            visible_call_states: std::sync::Mutex::new(HashMap::new()),
709            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
710            in_process: Some(InProcessHost {
711                module_path: module_path.to_path_buf(),
712                exported_functions,
713                vm,
714            }),
715        })
716    }
717
718    /// Set the ACP session ID for session-scoped notifications.
719    pub fn set_session_id(&self, id: &str) {
720        *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
721    }
722
723    /// Set the currently executing script name (without .harn suffix).
724    pub fn set_script_name(&self, name: &str) {
725        *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
726    }
727
728    /// Get the current script name.
729    fn get_script_name(&self) -> String {
730        self.script_name
731            .lock()
732            .unwrap_or_else(|e| e.into_inner())
733            .clone()
734    }
735
736    /// Get the session ID.
737    pub fn get_session_id(&self) -> String {
738        self.session_id
739            .lock()
740            .unwrap_or_else(|e| e.into_inner())
741            .clone()
742    }
743
744    /// Write a complete JSON-RPC line to stdout, serialized through a mutex.
745    fn write_line(&self, line: &str) -> Result<(), VmError> {
746        (self.writer)(line).map_err(VmError::Runtime)
747    }
748
749    /// Send a JSON-RPC request to the host and wait for the response.
750    /// Times out after 5 minutes to prevent deadlocks.
751    pub async fn call(
752        &self,
753        method: &str,
754        params: serde_json::Value,
755    ) -> Result<serde_json::Value, VmError> {
756        if let Some(in_process) = &self.in_process {
757            return in_process.dispatch(method, params).await;
758        }
759
760        if self.is_cancelled() {
761            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
762        }
763
764        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
765        let cancel_wait = self.cancel_notify.notified();
766        tokio::pin!(cancel_wait);
767
768        let request = crate::jsonrpc::request(id, method, params);
769
770        let (tx, rx) = oneshot::channel();
771        {
772            let mut pending = self.pending.lock().await;
773            pending.insert(id, tx);
774        }
775
776        let line = serde_json::to_string(&request)
777            .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
778        if let Err(e) = self.write_line(&line) {
779            let mut pending = self.pending.lock().await;
780            pending.remove(&id);
781            return Err(e);
782        }
783
784        if self.is_cancelled() {
785            let mut pending = self.pending.lock().await;
786            pending.remove(&id);
787            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
788        }
789
790        let response = tokio::select! {
791            result = rx => match result {
792                Ok(msg) => msg,
793                Err(_) => {
794                    // Sender dropped: host closed or stdin reader exited.
795                    return Err(VmError::Runtime(
796                        "Bridge: host closed connection before responding".into(),
797                    ));
798                }
799            },
800            _ = &mut cancel_wait => {
801                let mut pending = self.pending.lock().await;
802                pending.remove(&id);
803                return Err(VmError::Runtime("Bridge: operation cancelled".into()));
804            }
805            _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
806                let mut pending = self.pending.lock().await;
807                pending.remove(&id);
808                return Err(VmError::Runtime(format!(
809                    "Bridge: host did not respond to '{method}' within {}s",
810                    DEFAULT_TIMEOUT.as_secs()
811                )));
812            }
813        };
814
815        if let Some(error) = response.get("error") {
816            let message = error["message"].as_str().unwrap_or("Unknown host error");
817            let code = error["code"].as_i64().unwrap_or(-1);
818            // JSON-RPC -32001 signals the host rejected the tool (not permitted / not in allowlist).
819            if code == -32001 {
820                return Err(VmError::CategorizedError {
821                    message: message.to_string(),
822                    category: ErrorCategory::ToolRejected,
823                });
824            }
825            return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
826        }
827
828        Ok(response["result"].clone())
829    }
830
831    /// Send a JSON-RPC notification to the host (no response expected).
832    /// Serialized through the stdout mutex to prevent interleaving.
833    pub fn notify(&self, method: &str, params: serde_json::Value) {
834        let notification = crate::jsonrpc::notification(method, params);
835        if self.in_process.is_some() {
836            return;
837        }
838        if let Ok(line) = serde_json::to_string(&notification) {
839            let _ = self.write_line(&line);
840        }
841    }
842
843    /// Check if the host has sent a cancel notification.
844    pub fn is_cancelled(&self) -> bool {
845        self.cancelled.load(Ordering::SeqCst)
846    }
847
848    pub fn take_resume_signal(&self) -> bool {
849        self.resume_requested.swap(false, Ordering::SeqCst)
850    }
851
852    pub fn signal_resume(&self) {
853        self.resume_requested.store(true, Ordering::SeqCst);
854    }
855
856    pub fn set_daemon_idle(&self, idle: bool) {
857        self.daemon_idle.store(idle, Ordering::SeqCst);
858    }
859
860    pub fn is_daemon_idle(&self) -> bool {
861        self.daemon_idle.load(Ordering::SeqCst)
862    }
863
864    /// Record the canonical ACP `stopReason` for the current prompt. The
865    /// last writer wins, which matches the semantic that an outer
866    /// `agent_loop` (the one whose result the user observes) always
867    /// finalizes after any inner loops it spawned.
868    pub fn set_prompt_stop_reason(&self, reason: &str) {
869        *self
870            .prompt_stop_reason
871            .lock()
872            .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
873    }
874
875    /// Consume any prompt stop reason recorded during this prompt. The
876    /// ACP adapter calls this once after the pipeline returns; pipelines
877    /// that didn't run an `agent_loop` see `None` and the adapter falls
878    /// back to `end_turn`.
879    pub fn take_prompt_stop_reason(&self) -> Option<String> {
880        self.prompt_stop_reason
881            .lock()
882            .unwrap_or_else(|e| e.into_inner())
883            .take()
884    }
885
886    /// Consume any pending `skills/update` signal the host has sent.
887    /// Returns `true` exactly once per notification, letting callers
888    /// trigger a layered-discovery rebuild without polling false
889    /// positives. See issue #73 for the hot-reload contract.
890    pub fn take_skills_reload_signal(&self) -> bool {
891        self.skills_reload_requested.swap(false, Ordering::SeqCst)
892    }
893
894    /// Manually mark the skill catalog as stale. Used by tests and by
895    /// the CLI when an internal event (e.g. `harn install`) should
896    /// trigger the same rebuild a `skills/update` notification would.
897    pub fn signal_skills_reload(&self) {
898        self.skills_reload_requested.store(true, Ordering::SeqCst);
899    }
900
901    /// Call the host's `skills/list` RPC and return the raw JSON array
902    /// it responded with. Shape:
903    /// `[{ "id": "...", "name": "...", "description": "...", "source": "..." }, ...]`.
904    /// The CLI adapter converts each entry into a
905    /// [`crate::skills::SkillManifestRef`].
906    pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
907        let result = self.call("skills/list", serde_json::json!({})).await?;
908        match result {
909            serde_json::Value::Array(items) => Ok(items),
910            serde_json::Value::Object(map) => match map.get("skills") {
911                Some(serde_json::Value::Array(items)) => Ok(items.clone()),
912                _ => Err(VmError::Runtime(
913                    "skills/list: host response must be an array or { skills: [...] }".into(),
914                )),
915            },
916            _ => Err(VmError::Runtime(
917                "skills/list: unexpected response shape".into(),
918            )),
919        }
920    }
921
922    /// Call the host's `host/tools/list` RPC and return normalized tool
923    /// descriptors. Shape:
924    /// `[{ "name": "...", "description": "...", "schema": {...}, "deprecated": false }, ...]`.
925    /// The bridge also accepts `{ "tools": [...] }` and
926    /// `{ "result": { "tools": [...] } }` wrappers for lenient hosts.
927    pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
928        let result = self.call("host/tools/list", serde_json::json!({})).await?;
929        parse_host_tools_list_response(result)
930    }
931
932    /// Call the host's `skills/fetch` RPC for one skill id. Returns the
933    /// raw JSON body so the CLI can inspect both the frontmatter fields
934    /// and the skill markdown body in whatever shape the host sends.
935    pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
936        self.call("skills/fetch", serde_json::json!({ "id": id }))
937            .await
938    }
939
940    pub async fn push_queued_user_message(&self, content: String, mode: &str) {
941        self.queued_transcript_injections
942            .lock()
943            .await
944            .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
945                content,
946                mode: QueuedUserMessageMode::from_str(mode),
947            }));
948    }
949
950    pub async fn push_queued_session_remind_from_params(
951        &self,
952        params: &serde_json::Value,
953    ) -> Result<String, String> {
954        let reminder = queued_session_remind_from_params(params)?;
955        let reminder_id = reminder.reminder.id.clone();
956        self.queued_transcript_injections
957            .lock()
958            .await
959            .push_back(QueuedTranscriptInjection::Reminder(reminder));
960        Ok(reminder_id)
961    }
962
963    pub async fn take_queued_user_messages(
964        &self,
965        include_interrupt_immediate: bool,
966        include_finish_step: bool,
967        include_wait_for_completion: bool,
968    ) -> Vec<QueuedUserMessage> {
969        let mut queue = self.queued_transcript_injections.lock().await;
970        let mut selected = Vec::new();
971        let mut retained = VecDeque::new();
972        while let Some(injection) = queue.pop_front() {
973            let should_take = match injection.mode() {
974                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
975                QueuedUserMessageMode::FinishStep => include_finish_step,
976                QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
977            };
978            match (should_take, injection) {
979                (true, QueuedTranscriptInjection::User(message)) => selected.push(message),
980                (_, injection) => retained.push_back(injection),
981            }
982        }
983        *queue = retained;
984        selected
985    }
986
987    pub async fn take_queued_transcript_injections(
988        &self,
989        include_interrupt_immediate: bool,
990        include_finish_step: bool,
991        include_wait_for_completion: bool,
992    ) -> Vec<QueuedTranscriptInjection> {
993        let mut queue = self.queued_transcript_injections.lock().await;
994        let mut selected = Vec::new();
995        let mut retained = VecDeque::new();
996        while let Some(injection) = queue.pop_front() {
997            let should_take = match injection.mode() {
998                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
999                QueuedUserMessageMode::FinishStep => include_finish_step,
1000                QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
1001            };
1002            if should_take {
1003                selected.push(injection);
1004            } else {
1005                retained.push_back(injection);
1006            }
1007        }
1008        *queue = retained;
1009        selected
1010    }
1011
1012    pub async fn take_queued_user_messages_for(
1013        &self,
1014        checkpoint: DeliveryCheckpoint,
1015    ) -> Vec<QueuedUserMessage> {
1016        match checkpoint {
1017            DeliveryCheckpoint::InterruptImmediate => {
1018                self.take_queued_user_messages(true, false, false).await
1019            }
1020            DeliveryCheckpoint::AfterCurrentOperation => {
1021                self.take_queued_user_messages(false, true, false).await
1022            }
1023            DeliveryCheckpoint::EndOfInteraction => {
1024                self.take_queued_user_messages(false, false, true).await
1025            }
1026        }
1027    }
1028
1029    pub async fn take_queued_transcript_injections_for(
1030        &self,
1031        checkpoint: DeliveryCheckpoint,
1032    ) -> Vec<QueuedTranscriptInjection> {
1033        match checkpoint {
1034            DeliveryCheckpoint::InterruptImmediate => {
1035                self.take_queued_transcript_injections(true, false, false)
1036                    .await
1037            }
1038            DeliveryCheckpoint::AfterCurrentOperation => {
1039                self.take_queued_transcript_injections(false, true, false)
1040                    .await
1041            }
1042            DeliveryCheckpoint::EndOfInteraction => {
1043                self.take_queued_transcript_injections(false, false, true)
1044                    .await
1045            }
1046        }
1047    }
1048
1049    /// Send an output notification (for log/print in bridge mode).
1050    pub fn send_output(&self, text: &str) {
1051        self.notify("output", serde_json::json!({"text": text}));
1052    }
1053
1054    /// Send a progress notification with optional numeric progress and structured data.
1055    pub fn send_progress(
1056        &self,
1057        phase: &str,
1058        message: &str,
1059        progress: Option<i64>,
1060        total: Option<i64>,
1061        data: Option<serde_json::Value>,
1062    ) {
1063        let mut payload = serde_json::json!({"phase": phase, "message": message});
1064        if let Some(p) = progress {
1065            payload["progress"] = serde_json::json!(p);
1066        }
1067        if let Some(t) = total {
1068            payload["total"] = serde_json::json!(t);
1069        }
1070        if let Some(d) = data {
1071            payload["data"] = d;
1072        }
1073        self.notify("progress", payload);
1074    }
1075
1076    /// Send a structured log notification.
1077    pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1078        let mut payload = serde_json::json!({"level": level, "message": message});
1079        if let Some(f) = fields {
1080            payload["fields"] = f;
1081        }
1082        self.notify("log", payload);
1083    }
1084
1085    /// Send a `session/update` with `call_start` — signals the beginning of
1086    /// an LLM call, tool call, or builtin call for observability.
1087    pub fn send_call_start(
1088        &self,
1089        call_id: &str,
1090        call_type: &str,
1091        name: &str,
1092        metadata: serde_json::Value,
1093    ) {
1094        let session_id = self.get_session_id();
1095        let script = self.get_script_name();
1096        let stream_publicly = metadata
1097            .get("stream_publicly")
1098            .and_then(|value| value.as_bool())
1099            .unwrap_or(true);
1100        self.visible_call_streams
1101            .lock()
1102            .unwrap_or_else(|e| e.into_inner())
1103            .insert(call_id.to_string(), stream_publicly);
1104        self.notify(
1105            "session/update",
1106            serde_json::json!({
1107                "sessionId": session_id,
1108                "update": {
1109                    "sessionUpdate": "call_start",
1110                    "content": {
1111                        "toolCallId": call_id,
1112                        "call_type": call_type,
1113                        "name": name,
1114                        "script": script,
1115                        "metadata": metadata,
1116                    },
1117                },
1118            }),
1119        );
1120    }
1121
1122    /// Send a `session/update` with `call_progress` — a streaming token delta
1123    /// from an in-flight LLM call.
1124    pub fn send_call_progress(
1125        &self,
1126        call_id: &str,
1127        delta: &str,
1128        accumulated_tokens: u64,
1129        user_visible: bool,
1130    ) {
1131        let session_id = self.get_session_id();
1132        let (visible_text, visible_delta) = {
1133            let stream_publicly = self
1134                .visible_call_streams
1135                .lock()
1136                .unwrap_or_else(|e| e.into_inner())
1137                .get(call_id)
1138                .copied()
1139                .unwrap_or(true);
1140            let mut states = self
1141                .visible_call_states
1142                .lock()
1143                .unwrap_or_else(|e| e.into_inner());
1144            let state = states.entry(call_id.to_string()).or_default();
1145            state.push(delta, stream_publicly)
1146        };
1147        self.notify(
1148            "session/update",
1149            serde_json::json!({
1150                "sessionId": session_id,
1151                "update": {
1152                    "sessionUpdate": "call_progress",
1153                    "content": {
1154                        "toolCallId": call_id,
1155                        "delta": delta,
1156                        "accumulated_tokens": accumulated_tokens,
1157                        "visible_text": visible_text,
1158                        "visible_delta": visible_delta,
1159                        "user_visible": user_visible,
1160                    },
1161                },
1162            }),
1163        );
1164    }
1165
1166    /// Send a `session/update` with `call_end` — signals completion of a call.
1167    pub fn send_call_end(
1168        &self,
1169        call_id: &str,
1170        call_type: &str,
1171        name: &str,
1172        duration_ms: u64,
1173        status: &str,
1174        metadata: serde_json::Value,
1175    ) {
1176        let session_id = self.get_session_id();
1177        let script = self.get_script_name();
1178        self.visible_call_states
1179            .lock()
1180            .unwrap_or_else(|e| e.into_inner())
1181            .remove(call_id);
1182        self.visible_call_streams
1183            .lock()
1184            .unwrap_or_else(|e| e.into_inner())
1185            .remove(call_id);
1186        self.notify(
1187            "session/update",
1188            serde_json::json!({
1189                "sessionId": session_id,
1190                "update": {
1191                    "sessionUpdate": "call_end",
1192                    "content": {
1193                        "toolCallId": call_id,
1194                        "call_type": call_type,
1195                        "name": name,
1196                        "script": script,
1197                        "duration_ms": duration_ms,
1198                        "status": status,
1199                        "metadata": metadata,
1200                    },
1201                },
1202            }),
1203        );
1204    }
1205
1206    /// Send a worker lifecycle update for delegated/background execution.
1207    pub fn send_worker_update(
1208        &self,
1209        worker_id: &str,
1210        worker_name: &str,
1211        status: &str,
1212        metadata: serde_json::Value,
1213        audit: Option<&MutationSessionRecord>,
1214    ) {
1215        let session_id = self.get_session_id();
1216        let script = self.get_script_name();
1217        let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1218        let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1219        let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1220        let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1221        let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1222        let lifecycle = serde_json::json!({
1223            "event": status,
1224            "worker_id": worker_id,
1225            "worker_name": worker_name,
1226            "started_at": started_at,
1227            "finished_at": finished_at,
1228        });
1229        self.notify(
1230            "session/update",
1231            serde_json::json!({
1232                "sessionId": session_id,
1233                "update": {
1234                    "sessionUpdate": "worker_update",
1235                    "content": {
1236                        "worker_id": worker_id,
1237                        "worker_name": worker_name,
1238                        "status": status,
1239                        "script": script,
1240                        "started_at": started_at,
1241                        "finished_at": finished_at,
1242                        "snapshot_path": snapshot_path,
1243                        "run_id": run_id,
1244                        "run_path": run_path,
1245                        "lifecycle": lifecycle,
1246                        "audit": audit,
1247                        "metadata": metadata,
1248                    },
1249                },
1250            }),
1251        );
1252    }
1253}
1254
1255/// Convert a serde_json::Value to a VmValue.
1256pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1257    crate::stdlib::json_to_vm_value(val)
1258}
1259
1260fn parse_host_tools_list_response(
1261    result: serde_json::Value,
1262) -> Result<Vec<serde_json::Value>, VmError> {
1263    let tools = match result {
1264        serde_json::Value::Array(items) => items,
1265        serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1266            map.get("result")
1267                .and_then(|value| value.get("tools"))
1268                .cloned()
1269        }) {
1270            Some(serde_json::Value::Array(items)) => items,
1271            _ => {
1272                return Err(VmError::Runtime(
1273                    "host/tools/list: host response must be an array or { tools: [...] }".into(),
1274                ));
1275            }
1276        },
1277        _ => {
1278            return Err(VmError::Runtime(
1279                "host/tools/list: unexpected response shape".into(),
1280            ));
1281        }
1282    };
1283
1284    let mut normalized = Vec::with_capacity(tools.len());
1285    for tool in tools {
1286        let serde_json::Value::Object(map) = tool else {
1287            return Err(VmError::Runtime(
1288                "host/tools/list: every tool must be an object".into(),
1289            ));
1290        };
1291        let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1292            return Err(VmError::Runtime(
1293                "host/tools/list: every tool must include a string `name`".into(),
1294            ));
1295        };
1296        let description = map
1297            .get("description")
1298            .and_then(|value| value.as_str())
1299            .or_else(|| {
1300                map.get("short_description")
1301                    .and_then(|value| value.as_str())
1302            })
1303            .unwrap_or_default();
1304        let schema = map
1305            .get("schema")
1306            .cloned()
1307            .or_else(|| map.get("parameters").cloned())
1308            .or_else(|| map.get("input_schema").cloned())
1309            .unwrap_or(serde_json::Value::Null);
1310        let deprecated = map
1311            .get("deprecated")
1312            .and_then(|value| value.as_bool())
1313            .unwrap_or(false);
1314        normalized.push(serde_json::json!({
1315            "name": name,
1316            "description": description,
1317            "schema": schema,
1318            "deprecated": deprecated,
1319        }));
1320    }
1321    Ok(normalized)
1322}
1323
1324#[cfg(test)]
1325mod tests {
1326    use super::*;
1327
1328    fn test_bridge() -> HostBridge {
1329        HostBridge::from_parts(
1330            Arc::new(Mutex::new(HashMap::new())),
1331            Arc::new(AtomicBool::new(false)),
1332            Arc::new(std::sync::Mutex::new(())),
1333            1,
1334        )
1335    }
1336
1337    #[test]
1338    fn test_json_rpc_request_format() {
1339        let request = crate::jsonrpc::request(
1340            1,
1341            "llm_call",
1342            serde_json::json!({
1343                "prompt": "Hello",
1344                "system": "Be helpful",
1345            }),
1346        );
1347        let s = serde_json::to_string(&request).unwrap();
1348        assert!(s.contains("\"jsonrpc\":\"2.0\""));
1349        assert!(s.contains("\"id\":1"));
1350        assert!(s.contains("\"method\":\"llm_call\""));
1351    }
1352
1353    #[test]
1354    fn test_json_rpc_notification_format() {
1355        let notification =
1356            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
1357        let s = serde_json::to_string(&notification).unwrap();
1358        assert!(s.contains("\"method\":\"output\""));
1359        assert!(!s.contains("\"id\""));
1360    }
1361
1362    #[test]
1363    fn test_json_rpc_error_response_parsing() {
1364        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
1365        assert!(response.get("error").is_some());
1366        assert_eq!(
1367            response["error"]["message"].as_str().unwrap(),
1368            "Invalid request"
1369        );
1370    }
1371
1372    #[test]
1373    fn test_json_rpc_success_response_parsing() {
1374        let response = crate::jsonrpc::response(
1375            1,
1376            serde_json::json!({
1377                "text": "Hello world",
1378                "input_tokens": 10,
1379                "output_tokens": 5,
1380            }),
1381        );
1382        assert!(response.get("result").is_some());
1383        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
1384    }
1385
1386    #[test]
1387    fn test_cancelled_flag() {
1388        let cancelled = Arc::new(AtomicBool::new(false));
1389        assert!(!cancelled.load(Ordering::SeqCst));
1390        cancelled.store(true, Ordering::SeqCst);
1391        assert!(cancelled.load(Ordering::SeqCst));
1392    }
1393
1394    #[test]
1395    fn pending_host_calls_return_when_cancellation_arrives() {
1396        let runtime = tokio::runtime::Builder::new_current_thread()
1397            .enable_all()
1398            .build()
1399            .unwrap();
1400        runtime.block_on(async {
1401            let pending = Arc::new(Mutex::new(HashMap::new()));
1402            let cancelled = Arc::new(AtomicBool::new(false));
1403            let bridge = HostBridge::from_parts_with_writer(
1404                pending.clone(),
1405                cancelled.clone(),
1406                Arc::new(|_| Ok(())),
1407                1,
1408            );
1409
1410            let call = bridge.call("host/work", serde_json::json!({}));
1411            tokio::pin!(call);
1412
1413            loop {
1414                tokio::select! {
1415                    result = &mut call => panic!("call completed before cancellation: {result:?}"),
1416                    _ = tokio::task::yield_now() => {}
1417                }
1418                if !pending.lock().await.is_empty() {
1419                    break;
1420                }
1421            }
1422
1423            cancelled.store(true, Ordering::SeqCst);
1424            bridge.cancel_notify.notify_waiters();
1425
1426            let result = tokio::time::timeout(Duration::from_secs(1), call)
1427                .await
1428                .expect("pending call should observe cancellation promptly");
1429            assert!(
1430                matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
1431            );
1432            assert!(pending.lock().await.is_empty());
1433        });
1434    }
1435
1436    #[test]
1437    fn queued_messages_are_filtered_by_delivery_mode() {
1438        let runtime = tokio::runtime::Builder::new_current_thread()
1439            .enable_all()
1440            .build()
1441            .unwrap();
1442        runtime.block_on(async {
1443            let bridge = test_bridge();
1444            bridge
1445                .push_queued_user_message("first".to_string(), "finish_step")
1446                .await;
1447            bridge
1448                .push_queued_user_message("second".to_string(), "wait_for_completion")
1449                .await;
1450
1451            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1452            assert_eq!(finish_step.len(), 1);
1453            assert_eq!(finish_step[0].content, "first");
1454
1455            let turn_end = bridge.take_queued_user_messages(false, false, true).await;
1456            assert_eq!(turn_end.len(), 1);
1457            assert_eq!(turn_end[0].content, "second");
1458        });
1459    }
1460
1461    #[test]
1462    fn queued_transcript_injections_preserve_user_reminder_separation() {
1463        let runtime = tokio::runtime::Builder::new_current_thread()
1464            .enable_all()
1465            .build()
1466            .unwrap();
1467        runtime.block_on(async {
1468            let bridge = test_bridge();
1469            bridge
1470                .push_queued_user_message("human follow-up".to_string(), "finish_step")
1471                .await;
1472            let reminder_id = bridge
1473                .push_queued_session_remind_from_params(&serde_json::json!({
1474                    "body": "Host-provided ambient context.",
1475                    "tags": ["host"],
1476                    "dedupe_key": "host-context",
1477                    "ttl_turns": 2,
1478                    "mode": "wait_for_completion",
1479                    "_meta": {"harn": {"source": "test"}},
1480                }))
1481                .await
1482                .expect("valid reminder");
1483
1484            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1485            assert_eq!(finish_step.len(), 1);
1486            assert_eq!(finish_step[0].content, "human follow-up");
1487
1488            let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
1489            assert!(no_user_messages.is_empty());
1490
1491            let injections = bridge
1492                .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
1493                .await;
1494            assert_eq!(injections.len(), 1);
1495            let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
1496                panic!("expected queued reminder");
1497            };
1498            assert_eq!(reminder.reminder.id, reminder_id);
1499            assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
1500            assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
1501            assert_eq!(
1502                reminder.reminder.dedupe_key.as_deref(),
1503                Some("host-context")
1504            );
1505            assert_eq!(reminder.reminder.ttl_turns, Some(2));
1506            assert_eq!(
1507                reminder.reminder.source,
1508                crate::llm::helpers::ReminderSource::Bridge
1509            );
1510        });
1511    }
1512
1513    #[test]
1514    fn bridge_remind_modes_honor_delivery_checkpoints() {
1515        let runtime = tokio::runtime::Builder::new_current_thread()
1516            .enable_all()
1517            .build()
1518            .unwrap();
1519        runtime.block_on(async {
1520            let cases = [
1521                (
1522                    "interrupt_immediate",
1523                    DeliveryCheckpoint::InterruptImmediate,
1524                    DeliveryCheckpoint::AfterCurrentOperation,
1525                ),
1526                (
1527                    "finish_step",
1528                    DeliveryCheckpoint::AfterCurrentOperation,
1529                    DeliveryCheckpoint::EndOfInteraction,
1530                ),
1531                (
1532                    "wait_for_completion",
1533                    DeliveryCheckpoint::EndOfInteraction,
1534                    DeliveryCheckpoint::InterruptImmediate,
1535                ),
1536            ];
1537
1538            for (mode, expected_checkpoint, wrong_checkpoint) in cases {
1539                let bridge = test_bridge();
1540                bridge
1541                    .push_queued_session_remind_from_params(&serde_json::json!({
1542                        "body": format!("Reminder for {mode}"),
1543                        "mode": mode,
1544                    }))
1545                    .await
1546                    .expect("valid session/remind payload");
1547
1548                let premature = bridge
1549                    .take_queued_transcript_injections_for(wrong_checkpoint)
1550                    .await;
1551                assert!(
1552                    premature.is_empty(),
1553                    "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
1554                );
1555
1556                let delivered = bridge
1557                    .take_queued_transcript_injections_for(expected_checkpoint)
1558                    .await;
1559                assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
1560                let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
1561                    panic!("expected reminder for {mode}");
1562                };
1563                assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
1564            }
1565        });
1566    }
1567
/// A message queued through `push_queued_user_message` must come back out as a
/// `QueuedTranscriptInjection::User` entry, not as a reminder.
#[test]
fn bridge_session_input_path_never_produces_reminder() {
    // A current-thread runtime is enough to drive the async queue helpers.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let bridge = test_bridge();
        let text = "still user input".to_string();
        bridge.push_queued_user_message(text, "finish_step").await;

        let injections = bridge
            .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
            .await;
        // Exactly one entry is expected, and it must be the user variant.
        assert_eq!(injections.len(), 1);
        match &injections[0] {
            QueuedTranscriptInjection::User(message) => {
                assert_eq!(message.content, "still user input");
            }
            _ => panic!("session/input queue path must produce a user message"),
        }
    });
}
1590
/// A payload that supplies `content` but no `body` is rejected, and the error
/// carries the invalid-shape diagnostic code and names the `body` field.
#[test]
fn session_remind_validation_rejects_user_message_shape() {
    // User-message shaped params: `content` is present, `body` is absent.
    let params = serde_json::json!({
        "content": "this is still a user message",
        "mode": "interrupt_immediate",
    });
    let err = queued_session_remind_from_params(&params)
        .expect_err("session/remind must require a reminder body");

    assert!(err.contains(Code::ReminderInvalidShape.as_str()));
    assert!(err.contains("body"));
}
1601
/// An unrecognized top-level field is rejected with the unknown-option
/// diagnostic code, and the error names the offending field.
#[test]
fn session_remind_validation_rejects_unknown_options_separately() {
    // Valid `body`, plus one field the validator is expected not to accept.
    let params = serde_json::json!({
        "body": "valid body",
        "unknown_host_field": true,
    });
    let err = queued_session_remind_from_params(&params)
        .expect_err("session/remind must reject unknown top-level fields");

    assert!(err.contains(Code::ReminderUnknownOption.as_str()));
    assert!(err.contains("unknown_host_field"));
}
1612
/// The `propagate` value `"workspace"` is rejected with the dedicated
/// unknown-propagate diagnostic code, and the error mentions `propagate`.
#[test]
fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
    let params = serde_json::json!({
        "body": "valid body",
        "propagate": "workspace",
    });
    let err = queued_session_remind_from_params(&params)
        .expect_err("session/remind must reject unknown propagate values");

    assert!(err.contains(Code::ReminderUnknownPropagate.as_str()));
    assert!(err.contains("propagate"));
}
1623
/// A JSON string converts to a VM value whose display form is the bare text.
#[test]
fn test_json_result_to_vm_value_string() {
    let input = serde_json::json!("hello");
    let converted = json_result_to_vm_value(&input);
    assert_eq!(converted.display(), "hello");
}
1630
/// A JSON object converts to `VmValue::Dict`, with each field reachable by key.
#[test]
fn test_json_result_to_vm_value_dict() {
    let payload = serde_json::json!({"name": "test", "count": 42});
    let vm_val = json_result_to_vm_value(&payload);
    let d = match &vm_val {
        VmValue::Dict(d) => d,
        _ => unreachable!("Expected Dict, got {:?}", vm_val),
    };

    // Each key is checked against the display form of its converted value.
    for (key, expected) in [("name", "test"), ("count", "42")] {
        assert_eq!(d.get(key).unwrap().display(), expected);
    }
}
1641
/// JSON `null` converts to `VmValue::Nil`.
#[test]
fn test_json_result_to_vm_value_null() {
    let input = serde_json::Value::Null;
    let vm_val = json_result_to_vm_value(&input);
    assert!(matches!(vm_val, VmValue::Nil));
}
1648
/// A nested JSON object converts to a dict whose array field becomes a
/// `VmValue::List` with the same number of elements.
#[test]
fn test_json_result_to_vm_value_nested() {
    let response = serde_json::json!({
        "text": "response",
        "tool_calls": [
            {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
        ],
        "input_tokens": 100,
        "output_tokens": 50,
    });
    let vm_val = json_result_to_vm_value(&response);
    let d = match &vm_val {
        VmValue::Dict(d) => d,
        _ => unreachable!("Expected Dict, got {:?}", vm_val),
    };

    assert_eq!(d.get("text").unwrap().display(), "response");
    // The single-element JSON array must survive as a one-element list.
    match d.get("tool_calls").unwrap() {
        VmValue::List(list) => assert_eq!(list.len(), 1),
        _ => unreachable!("Expected List for tool_calls"),
    }
}
1669
/// A response shaped as `{"tools": [...]}` is accepted; the parsed entry keeps
/// its `name` and reports `deprecated` as `false` when the input omits it.
#[test]
fn parse_host_tools_list_accepts_object_wrapper() {
    let response = serde_json::json!({
        "tools": [
            {
                "name": "Read",
                "description": "Read a file",
                "schema": {"type": "object"},
            }
        ]
    });
    let tools = parse_host_tools_list_response(response).expect("tool list");

    assert_eq!(tools.len(), 1);
    let first = &tools[0];
    assert_eq!(first["name"], "Read");
    assert_eq!(first["deprecated"], false);
}
1687
/// A response nested under `result`, using `short_description` and
/// `input_schema`, parses into entries exposing `description` and `schema`,
/// with an explicit `deprecated: true` preserved.
#[test]
fn parse_host_tools_list_accepts_compat_fields() {
    let response = serde_json::json!({
        "result": {
            "tools": [
                {
                    "name": "Edit",
                    "short_description": "Apply an edit",
                    "input_schema": {"type": "object"},
                    "deprecated": true,
                }
            ]
        }
    });
    let tools = parse_host_tools_list_response(response).expect("tool list");

    let first = &tools[0];
    assert_eq!(first["description"], "Apply an edit");
    assert_eq!(first["schema"]["type"], "object");
    assert_eq!(first["deprecated"], true);
}
1708
/// A tool entry without a `name` makes the whole response fail to parse, with
/// an error message that states the requirement.
#[test]
fn parse_host_tools_list_requires_tool_names() {
    let response = serde_json::json!({
        "tools": [
            {"description": "missing name"}
        ]
    });
    let err = parse_host_tools_list_response(response).expect_err("expected error");

    assert!(err
        .to_string()
        .contains("host/tools/list: every tool must include a string `name`"));
}
1721
/// Pins the default bridge timeout at 300 seconds (five minutes).
#[test]
fn test_timeout_duration() {
    let secs = DEFAULT_TIMEOUT.as_secs();
    assert_eq!(secs, 300);
}
1726}