Skip to main content

harn_vm/
bridge.rs

1//! JSON-RPC 2.0 bridge for host communication.
2//!
3//! When `harn run --bridge` is used, the VM delegates builtins (llm_call,
4//! file I/O, tool execution) to a host process over stdin/stdout JSON-RPC.
5//! The host application handles these requests using its own providers.
6
7use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::future::Future;
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::rc::Rc;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::Duration;
16
17use tokio::io::AsyncBufReadExt;
18use tokio::sync::{oneshot, Mutex, Notify};
19
20use harn_parser::diagnostic_codes::Code;
21
22use crate::orchestration::MutationSessionRecord;
23use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
24use crate::visible_text::VisibleTextState;
25use crate::vm::Vm;
26
27/// Default timeout for bridge calls (5 minutes).
28const DEFAULT_TIMEOUT: Duration = Duration::from_mins(5);
29
30pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
31
32fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
33    Arc::new(move |line: &str| {
34        let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
35        let mut stdout = std::io::stdout().lock();
36        stdout
37            .write_all(line.as_bytes())
38            .map_err(|e| format!("Bridge write error: {e}"))?;
39        stdout
40            .write_all(b"\n")
41            .map_err(|e| format!("Bridge write error: {e}"))?;
42        stdout
43            .flush()
44            .map_err(|e| format!("Bridge flush error: {e}"))?;
45        Ok(())
46    })
47}
48
49/// A JSON-RPC 2.0 bridge to a host process over stdin/stdout.
50///
51/// The bridge sends requests to the host on stdout and receives responses
52/// on stdin. A background task reads stdin and dispatches responses to
53/// waiting callers by request ID. All stdout writes are serialized through
54/// a mutex to prevent interleaving.
55pub struct HostBridge {
56    next_id: AtomicU64,
57    /// Pending request waiters, keyed by JSON-RPC id.
58    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
59    /// Whether the host has sent a cancel notification.
60    cancelled: Arc<AtomicBool>,
61    /// Wakes pending host calls when cancellation arrives.
62    cancel_notify: Arc<Notify>,
63    /// Transport writer used to send JSON-RPC to the host.
64    writer: HostBridgeWriter,
65    /// ACP session ID (set in ACP mode for session-scoped notifications).
66    session_id: std::sync::Mutex<String>,
67    /// Name of the currently executing Harn script (without .harn suffix).
68    script_name: std::sync::Mutex<String>,
69    /// Transcript injections queued by the host while a run is active.
70    queued_transcript_injections: HostBridgeInjectionState,
71    /// Host-triggered resume signal for daemon agents.
72    resume_requested: Arc<AtomicBool>,
73    /// Host-triggered skill-registry invalidation signal. Set when the
74    /// host sends a `skills/update` notification; consumed by the CLI
75    /// between runs (watch mode, long-running agents) to rebuild the
76    /// layered skill catalog from its current filesystem + host state.
77    skills_reload_requested: Arc<AtomicBool>,
78    /// Whether the current daemon-mode agent loop is blocked in idle wait.
79    daemon_idle: Arc<AtomicBool>,
80    /// Canonical ACP `stopReason` recorded by the most recent `agent_loop`
81    /// finalize during this prompt. Read once by the ACP adapter when the
82    /// pipeline returns and populated by `host_agent_session_finalize`.
83    /// Pipelines that don't run an agent loop leave this `None`, in which
84    /// case the adapter falls back to `end_turn`.
85    prompt_stop_reason: std::sync::Mutex<Option<String>>,
86    /// Per-call visible assistant text state for call_progress notifications.
87    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
88    /// Whether an LLM call's deltas should be exposed to end users while streaming.
89    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
90    /// Optional in-process host-module backend used by `harn playground`.
91    in_process: Option<InProcessHost>,
92}
93
94struct InProcessHost {
95    module_path: PathBuf,
96    exported_functions: BTreeMap<String, Rc<VmClosure>>,
97    vm: Vm,
98}
99
100impl InProcessHost {
101    /// Box-pin'd to break the static recursion between the VM's hot dispatch
102    /// loop and the bridge: a bridge-backed builtin spawns a child VM that
103    /// calls back into the dispatch loop via `call_closure_pub`. Indirecting
104    /// at this slow-path boundary keeps the recursion satisfied without
105    /// allocating per call in the hot per-callback path.
106    fn dispatch<'a>(
107        &'a self,
108        method: &'a str,
109        params: serde_json::Value,
110    ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, VmError>> + 'a>> {
111        Box::pin(async move {
112            match method {
113                "builtin_call" => {
114                    let name = params
115                        .get("name")
116                        .and_then(|value| value.as_str())
117                        .unwrap_or_default();
118                    let args = params
119                        .get("args")
120                        .and_then(|value| value.as_array())
121                        .cloned()
122                        .unwrap_or_default()
123                        .into_iter()
124                        .map(|value| json_result_to_vm_value(&value))
125                        .collect::<Vec<_>>();
126                    self.invoke_export(name, &args).await
127                }
128                "host/tools/list" => self
129                    .invoke_optional_export("host_tools_list", &[])
130                    .await
131                    .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
132                "session/request_permission" => self.request_permission(params).await,
133                other => Err(VmError::Runtime(format!(
134                    "playground host backend does not implement bridge method '{other}'"
135                ))),
136            }
137        })
138    }
139
140    async fn invoke_export(
141        &self,
142        name: &str,
143        args: &[VmValue],
144    ) -> Result<serde_json::Value, VmError> {
145        let Some(closure) = self.exported_functions.get(name) else {
146            return Err(VmError::Runtime(format!(
147                "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
148                self.module_path.display()
149            )));
150        };
151
152        let mut vm = self.vm.child_vm_for_host();
153        let result = vm.call_closure_pub(closure, args).await?;
154        Ok(crate::llm::vm_value_to_json(&result))
155    }
156
157    async fn invoke_optional_export(
158        &self,
159        name: &str,
160        args: &[VmValue],
161    ) -> Result<Option<serde_json::Value>, VmError> {
162        if !self.exported_functions.contains_key(name) {
163            return Ok(None);
164        }
165        self.invoke_export(name, args).await.map(Some)
166    }
167
168    async fn request_permission(
169        &self,
170        params: serde_json::Value,
171    ) -> Result<serde_json::Value, VmError> {
172        // No exported `request_permission` ⇒ default-allow: emit a canonical
173        // `selected` outcome on the allow option (#2639).
174        let Some(closure) = self.exported_functions.get("request_permission") else {
175            return Ok(canonical_allow_response());
176        };
177
178        let tool_call = params.get("toolCall");
179        let tool_name = tool_call
180            .and_then(|tool_call| tool_call.pointer("/_meta/harn/toolName"))
181            .or_else(|| tool_call.and_then(|tool_call| tool_call.get("toolName")))
182            .or_else(|| tool_call.and_then(|tool_call| tool_call.get("title")))
183            .and_then(|value| value.as_str())
184            .unwrap_or_default();
185        let tool_args = tool_call
186            .and_then(|tool_call| tool_call.get("rawInput"))
187            .map(json_result_to_vm_value)
188            .unwrap_or(VmValue::Nil);
189        let full_payload = json_result_to_vm_value(&params);
190
191        let arg_count = closure.func.params.len();
192        let args = if arg_count >= 3 {
193            vec![
194                VmValue::String(Rc::from(tool_name.to_string())),
195                tool_args,
196                full_payload,
197            ]
198        } else if arg_count == 2 {
199            vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
200        } else if arg_count == 1 {
201            vec![full_payload]
202        } else {
203            Vec::new()
204        };
205
206        let mut vm = self.vm.child_vm_for_host();
207        let result = vm.call_closure_pub(closure, &args).await?;
208        // Translate the script's verdict into a canonical ACP response
209        // (`{ outcome: { outcome: "selected" | "cancelled", optionId? } }`).
210        // The script API stays ergonomic — bool / string-reason / dict — but
211        // the wire shape is canonical.
212        let payload = match result {
213            VmValue::Bool(granted) => {
214                if granted {
215                    canonical_allow_response()
216                } else {
217                    canonical_reject_response(None)
218                }
219            }
220            VmValue::String(reason) if !reason.is_empty() => {
221                canonical_reject_response(Some(reason.to_string()))
222            }
223            other => {
224                let json = crate::llm::vm_value_to_json(&other);
225                if let Some(granted) = json.get("granted").and_then(|value| value.as_bool()) {
226                    if granted {
227                        canonical_allow_response()
228                    } else {
229                        canonical_reject_response(
230                            json.get("reason")
231                                .and_then(|value| value.as_str())
232                                .map(str::to_string),
233                        )
234                    }
235                } else if json.get("outcome").is_some() {
236                    // The script already returned a canonical-shaped outcome.
237                    json
238                } else if other.is_truthy() {
239                    canonical_allow_response()
240                } else {
241                    canonical_reject_response(None)
242                }
243            }
244        };
245        Ok(payload)
246    }
247}
248
249/// Canonical ACP `RequestPermissionResponse` granting the call: a
250/// `selected` outcome on the `allow` option (#2639).
251fn canonical_allow_response() -> serde_json::Value {
252    serde_json::json!({
253        "outcome": { "outcome": "selected", "optionId": "allow" }
254    })
255}
256
257/// Canonical ACP `RequestPermissionResponse` rejecting the call: a
258/// `selected` outcome on the `reject` option, with an optional harn-vendor
259/// `reason` extension (#2639).
260fn canonical_reject_response(reason: Option<String>) -> serde_json::Value {
261    let mut response = serde_json::json!({
262        "outcome": { "outcome": "selected", "optionId": "reject" }
263    });
264    if let Some(reason) = reason {
265        response
266            .as_object_mut()
267            .expect("object")
268            .insert("reason".to_string(), serde_json::Value::String(reason));
269    }
270    response
271}
272
/// Delivery policy for a bridge injection queued into the agent loop.
///
/// `AuditOnly` used to be named `WaitForCompletion`; it was renamed for
/// truth-in-advertising (harn#2212). Such injections drain at `loop_exit`,
/// *after* the final LLM call has returned, so they are recorded in the
/// transcript audit but are **never rendered into a model prompt**. A host
/// that wants the model to react to a reminder on its last iteration should
/// pick `FinishStep`, which drains at every `iteration_start` /
/// `post_tool_dispatch` / `iteration_end` checkpoint.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
    InterruptImmediate,
    FinishStep,
    AuditOnly,
}

/// Named points at which a delivery can take place.
// NOTE(review): the variants look parallel to `QueuedUserMessageMode`, but
// no mapping between the two is defined in this section; confirm at the
// use sites.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
    InterruptImmediate,
    AfterCurrentOperation,
    EndOfInteraction,
}

impl QueuedUserMessageMode {
    /// Parses a wire-format mode name.
    ///
    /// `interrupt_immediate` / `interrupt` and `finish_step` /
    /// `after_current_operation` are recognised. Every other input maps to
    /// the safest option, `AuditOnly` (record for audit, do not preempt the
    /// loop). That includes the canonical `audit_only` and the pre-#2212
    /// spelling `wait_for_completion`.
    fn from_str(value: &str) -> Self {
        if matches!(value, "interrupt_immediate" | "interrupt") {
            return Self::InterruptImmediate;
        }
        if matches!(value, "finish_step" | "after_current_operation") {
            return Self::FinishStep;
        }
        Self::AuditOnly
    }

    /// Returns the canonical wire-format name of this mode.
    fn as_str(self) -> &'static str {
        match self {
            Self::AuditOnly => "audit_only",
            Self::FinishStep => "finish_step",
            Self::InterruptImmediate => "interrupt_immediate",
        }
    }
}
317
318#[derive(Clone, Debug, PartialEq, Eq)]
319pub struct QueuedUserMessage {
320    pub message_id: String,
321    pub content: String,
322    pub transcript_content: serde_json::Value,
323    pub mode: QueuedUserMessageMode,
324}
325
326#[derive(Clone, Debug, PartialEq, Eq)]
327pub struct QueuedReminder {
328    pub reminder: crate::llm::helpers::SystemReminder,
329    pub mode: QueuedUserMessageMode,
330}
331
332#[derive(Clone, Debug, PartialEq, Eq)]
333pub enum QueuedTranscriptInjection {
334    User(QueuedUserMessage),
335    Reminder(QueuedReminder),
336}
337
338#[derive(Debug, Default)]
339struct QueuedTranscriptInjections {
340    queue: VecDeque<QueuedTranscriptInjection>,
341    revoked_user_message_ids: HashSet<String>,
342    delivered_user_message_ids: HashSet<String>,
343    revoked_reminder_ids: HashSet<String>,
344    delivered_reminder_ids: HashSet<String>,
345}
346
347#[derive(Clone, Debug, Default)]
348pub struct HostBridgeInjectionState {
349    inner: Arc<Mutex<QueuedTranscriptInjections>>,
350}
351
/// Outcome of trying to revoke or replace a queued user message.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PendingUserMessageMutationResult {
    /// The message was still queued and has been changed or removed.
    Mutated,
    /// The id belongs to a message that was revoked earlier.
    AlreadyRevoked,
    /// The id belongs to a message recorded as delivered.
    AlreadyDelivered,
    /// The id is neither queued, revoked, nor delivered.
    UnknownMessageId,
}
359
360impl QueuedTranscriptInjection {
361    fn mode(&self) -> QueuedUserMessageMode {
362        match self {
363            Self::User(message) => message.mode,
364            Self::Reminder(reminder) => reminder.mode,
365        }
366    }
367
368    fn pending_json(&self, position: usize) -> serde_json::Value {
369        match self {
370            Self::User(message) => serde_json::json!({
371                "kind": "user",
372                "id": message.message_id,
373                "messageId": message.message_id,
374                "mode": message.mode.as_str(),
375                "position": position,
376                "content": message.transcript_content,
377            }),
378            Self::Reminder(reminder) => serde_json::json!({
379                "kind": "reminder",
380                "id": reminder.reminder.id,
381                "reminderId": reminder.reminder.id,
382                "mode": reminder.mode.as_str(),
383                "position": position,
384                "body": reminder.reminder.body,
385                "tags": reminder.reminder.tags,
386                "dedupeKey": reminder.reminder.dedupe_key,
387                "ttlTurns": reminder.reminder.ttl_turns,
388                "preserveOnCompact": reminder.reminder.preserve_on_compact,
389                "propagate": reminder.reminder.propagate.as_str(),
390                "roleHint": reminder.reminder.role_hint.as_str(),
391                "source": reminder.reminder.source.as_str(),
392                "firedAtTurn": reminder.reminder.fired_at_turn,
393                "originatingAgentId": reminder.reminder.originating_agent_id,
394            }),
395        }
396    }
397}
398
/// Outcome of trying to revoke a queued reminder.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PendingReminderMutationResult {
    /// The reminder was still queued and has been removed.
    Mutated,
    /// The id belongs to a reminder that was revoked earlier.
    AlreadyRevoked,
    /// The id belongs to a reminder recorded as delivered.
    AlreadyDelivered,
    /// The id is neither queued, revoked, nor delivered.
    UnknownReminderId,
}
406
407fn new_inject_message_id() -> String {
408    format!("msg_inj_{}", uuid::Uuid::now_v7().simple())
409}
410
411impl HostBridgeInjectionState {
412    pub fn new() -> Self {
413        Self::default()
414    }
415
416    pub async fn push_pending_user_message(
417        &self,
418        content: String,
419        transcript_content: serde_json::Value,
420        mode: &str,
421    ) -> String {
422        let message_id = new_inject_message_id();
423        self.inner
424            .lock()
425            .await
426            .queue
427            .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
428                message_id: message_id.clone(),
429                content,
430                transcript_content,
431                mode: QueuedUserMessageMode::from_str(mode),
432            }));
433        message_id
434    }
435
436    pub async fn revoke_pending_user_message(
437        &self,
438        message_id: &str,
439    ) -> PendingUserMessageMutationResult {
440        let mut state = self.inner.lock().await;
441        let mut retained = VecDeque::new();
442        let mut revoked = false;
443        while let Some(injection) = state.queue.pop_front() {
444            match &injection {
445                QueuedTranscriptInjection::User(message) if message.message_id == message_id => {
446                    revoked = true;
447                }
448                _ => retained.push_back(injection),
449            }
450        }
451        state.queue = retained;
452        if revoked {
453            state
454                .revoked_user_message_ids
455                .insert(message_id.to_string());
456            return PendingUserMessageMutationResult::Mutated;
457        }
458        if state.revoked_user_message_ids.contains(message_id) {
459            PendingUserMessageMutationResult::AlreadyRevoked
460        } else if state.delivered_user_message_ids.contains(message_id) {
461            PendingUserMessageMutationResult::AlreadyDelivered
462        } else {
463            PendingUserMessageMutationResult::UnknownMessageId
464        }
465    }
466
467    pub async fn revoke_pending_reminder(
468        &self,
469        reminder_id: &str,
470    ) -> PendingReminderMutationResult {
471        let mut state = self.inner.lock().await;
472        let mut retained = VecDeque::new();
473        let mut revoked = false;
474        while let Some(injection) = state.queue.pop_front() {
475            match &injection {
476                QueuedTranscriptInjection::Reminder(reminder)
477                    if reminder.reminder.id == reminder_id =>
478                {
479                    revoked = true;
480                }
481                _ => retained.push_back(injection),
482            }
483        }
484        state.queue = retained;
485        if revoked {
486            state.revoked_reminder_ids.insert(reminder_id.to_string());
487            return PendingReminderMutationResult::Mutated;
488        }
489        if state.revoked_reminder_ids.contains(reminder_id) {
490            PendingReminderMutationResult::AlreadyRevoked
491        } else if state.delivered_reminder_ids.contains(reminder_id) {
492            PendingReminderMutationResult::AlreadyDelivered
493        } else {
494            PendingReminderMutationResult::UnknownReminderId
495        }
496    }
497
498    pub async fn replace_pending_user_message(
499        &self,
500        message_id: &str,
501        content: String,
502        transcript_content: serde_json::Value,
503    ) -> PendingUserMessageMutationResult {
504        let mut state = self.inner.lock().await;
505        for injection in &mut state.queue {
506            if let QueuedTranscriptInjection::User(message) = injection {
507                if message.message_id == message_id {
508                    message.content = content;
509                    message.transcript_content = transcript_content;
510                    return PendingUserMessageMutationResult::Mutated;
511                }
512            }
513        }
514        if state.revoked_user_message_ids.contains(message_id) {
515            PendingUserMessageMutationResult::AlreadyRevoked
516        } else if state.delivered_user_message_ids.contains(message_id) {
517            PendingUserMessageMutationResult::AlreadyDelivered
518        } else {
519            PendingUserMessageMutationResult::UnknownMessageId
520        }
521    }
522
523    async fn push_session_reminder(&self, reminder: QueuedReminder) {
524        self.inner
525            .lock()
526            .await
527            .queue
528            .push_back(QueuedTranscriptInjection::Reminder(reminder));
529    }
530
531    pub async fn pending_injections_json(&self) -> serde_json::Value {
532        let state = self.inner.lock().await;
533        let injections = state
534            .queue
535            .iter()
536            .enumerate()
537            .map(|(position, injection)| injection.pending_json(position))
538            .collect::<Vec<_>>();
539        serde_json::json!({
540            "pendingCount": injections.len(),
541            "injections": injections,
542        })
543    }
544}
545
546fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
547    format!(
548        "{}: {}",
549        Code::ReminderUnknownOption.as_str(),
550        message.as_ref()
551    )
552}
553
554fn session_remind_shape_error(message: impl AsRef<str>) -> String {
555    format!(
556        "{}: {}",
557        Code::ReminderInvalidShape.as_str(),
558        message.as_ref()
559    )
560}
561
562fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
563    format!(
564        "{}: {}",
565        Code::ReminderUnknownPropagate.as_str(),
566        message.as_ref()
567    )
568}
569
570fn string_field(
571    map: &serde_json::Map<String, serde_json::Value>,
572    key: &str,
573    required: bool,
574) -> Result<Option<String>, String> {
575    match map.get(key) {
576        None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
577            format!("`{key}` must be a non-empty string"),
578        )),
579        None | Some(serde_json::Value::Null) => Ok(None),
580        Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
581            session_remind_shape_error(format!("`{key}` must be a non-empty string")),
582        ),
583        Some(serde_json::Value::String(value)) => {
584            let trimmed = value.trim();
585            if trimmed.is_empty() {
586                Ok(None)
587            } else {
588                Ok(Some(trimmed.to_string()))
589            }
590        }
591        Some(other) => Err(session_remind_shape_error(format!(
592            "`{key}` must be a string, got {other}"
593        ))),
594    }
595}
596
597fn bool_field(
598    map: &serde_json::Map<String, serde_json::Value>,
599    key: &str,
600) -> Result<Option<bool>, String> {
601    match map.get(key) {
602        None | Some(serde_json::Value::Null) => Ok(None),
603        Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
604        Some(other) => Err(session_remind_shape_error(format!(
605            "`{key}` must be a bool, got {other}"
606        ))),
607    }
608}
609
610fn int_field(
611    map: &serde_json::Map<String, serde_json::Value>,
612    key: &str,
613) -> Result<Option<i64>, String> {
614    match map.get(key) {
615        None | Some(serde_json::Value::Null) => Ok(None),
616        Some(serde_json::Value::Number(value)) => {
617            let Some(value) = value.as_i64() else {
618                return Err(session_remind_shape_error(format!(
619                    "`{key}` must be an integer"
620                )));
621            };
622            Ok(Some(value))
623        }
624        Some(other) => Err(session_remind_shape_error(format!(
625            "`{key}` must be an int, got {other}"
626        ))),
627    }
628}
629
630fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
631    let Some(value) = map.get("tags") else {
632        return Ok(Vec::new());
633    };
634    if value.is_null() {
635        return Ok(Vec::new());
636    }
637    let Some(values) = value.as_array() else {
638        return Err(session_remind_shape_error("`tags` must be a list"));
639    };
640    let mut tags = Vec::new();
641    for value in values {
642        let Some(tag) = value.as_str() else {
643            return Err(session_remind_shape_error(format!(
644                "`tags` entries must be strings, got {value}"
645            )));
646        };
647        let tag = tag.trim();
648        if tag.is_empty() {
649            return Err(session_remind_shape_error(
650                "`tags` entries must be non-empty strings",
651            ));
652        }
653        if !tags.iter().any(|existing| existing == tag) {
654            tags.push(tag.to_string());
655        }
656    }
657    Ok(tags)
658}
659
660fn session_remind_payload_from_value(
661    value: &serde_json::Value,
662) -> Result<crate::llm::helpers::SystemReminder, String> {
663    let Some(map) = value.as_object() else {
664        return Err(session_remind_shape_error(
665            "session/remind payload must be a reminder object",
666        ));
667    };
668    const ALLOWED: &[&str] = &[
669        "_meta",
670        "body",
671        "dedupe_key",
672        "fired_at_turn",
673        "id",
674        "preserve_on_compact",
675        "propagate",
676        "role_hint",
677        "source",
678        "tags",
679        "ttl_turns",
680    ];
681    let unknown = map
682        .keys()
683        .filter(|key| !ALLOWED.contains(&key.as_str()))
684        .map(String::as_str)
685        .collect::<Vec<_>>();
686    if !unknown.is_empty() {
687        if unknown.contains(&"content") {
688            return Err(session_remind_shape_error(
689                "session/remind expects reminder `body`, not user-message `content`",
690            ));
691        }
692        return Err(reminder_unknown_option_error(format!(
693            "unknown reminder option(s): {}",
694            unknown.join(", ")
695        )));
696    }
697    if let Some(meta) = map.get("_meta") {
698        if !meta.is_null() && !meta.is_object() {
699            return Err(session_remind_shape_error("`_meta` must be an object"));
700        }
701    }
702    let ttl_turns = int_field(map, "ttl_turns")?;
703    if let Some(value) = ttl_turns {
704        if value <= 0 {
705            return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
706        }
707    }
708    let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
709    if fired_at_turn < 0 {
710        return Err(session_remind_shape_error(
711            "`fired_at_turn` must be >= 0 when provided",
712        ));
713    }
714    match string_field(map, "source", false)?.as_deref() {
715        None | Some("bridge") => {}
716        Some(_) => {
717            return Err(session_remind_shape_error(
718                "`source` for session/remind must be bridge when provided",
719            ))
720        }
721    }
722    let propagate = match string_field(map, "propagate", false)?.as_deref() {
723        None => crate::llm::helpers::ReminderPropagate::Session,
724        Some("all") => crate::llm::helpers::ReminderPropagate::All,
725        Some("session") => crate::llm::helpers::ReminderPropagate::Session,
726        Some("none") => crate::llm::helpers::ReminderPropagate::None,
727        Some(_) => {
728            return Err(reminder_unknown_propagate_error(
729                "`propagate` must be one of all, session, or none",
730            ))
731        }
732    };
733    let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
734        None => crate::llm::helpers::ReminderRoleHint::System,
735        Some("system") => crate::llm::helpers::ReminderRoleHint::System,
736        Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
737        Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
738        Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
739        Some(_) => {
740            return Err(session_remind_shape_error(
741                "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
742            ))
743        }
744    };
745    Ok(crate::llm::helpers::SystemReminder {
746        id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
747        tags: tags_field(map)?,
748        dedupe_key: string_field(map, "dedupe_key", false)?,
749        ttl_turns,
750        preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
751        propagate,
752        role_hint,
753        source: crate::llm::helpers::ReminderSource::Bridge,
754        body: string_field(map, "body", true)?.unwrap_or_default(),
755        fired_at_turn,
756        originating_agent_id: None,
757    })
758}
759
760/// Parse the params of a `session/cancel_tool_call` notification and fire
761/// the per-tool-call cancellation. Mirrors the shape used by the public
762/// `cancel_in_flight_tool_call` builtin so hosts have one wire format
763/// regardless of which surface they came through.
764///
765/// Stdio bridges send this as a notification (no id, no response); the
766/// builtin handles request/response semantics in Harn. We deliberately
767/// drop malformed payloads silently because notifications can't reply
768/// with an error — logging would also be too noisy for partial drops.
769fn handle_cancel_tool_call_notification(params: &serde_json::Value) {
770    let session_id = params
771        .get("sessionId")
772        .or_else(|| params.get("session_id"))
773        .and_then(|value| value.as_str())
774        .unwrap_or_default();
775    let call_id = params
776        .get("toolCallId")
777        .or_else(|| params.get("tool_call_id"))
778        .or_else(|| params.get("callId"))
779        .or_else(|| params.get("call_id"))
780        .and_then(|value| value.as_str())
781        .unwrap_or_default();
782    if call_id.is_empty() {
783        return;
784    }
785    let reason = params
786        .get("reason")
787        .and_then(|value| value.as_str())
788        .unwrap_or("host cancelled in-flight tool call")
789        .to_string();
790    let inject_reminder = params
791        .get("injectReminder")
792        .or_else(|| params.get("inject_reminder"))
793        .and_then(|value| value.as_bool())
794        .unwrap_or(true);
795    crate::tool_call_cancellations::cancel(session_id, call_id, reason, inject_reminder);
796}
797
798fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
799    let mode = QueuedUserMessageMode::from_str(
800        params
801            .get("mode")
802            .and_then(|value| value.as_str())
803            .unwrap_or("audit_only"),
804    );
805    let reminder_value = if let Some(reminder) = params.get("reminder") {
806        reminder.clone()
807    } else {
808        let Some(params) = params.as_object() else {
809            return Err(session_remind_shape_error(
810                "session/remind params must be an object",
811            ));
812        };
813        let mut reminder = params.clone();
814        reminder.remove("mode");
815        reminder.remove("sessionId");
816        reminder.remove("session_id");
817        serde_json::Value::Object(reminder)
818    };
819    Ok(QueuedReminder {
820        reminder: session_remind_payload_from_value(&reminder_value)?,
821        mode,
822    })
823}
824
825// Default doesn't apply — new() spawns async tasks requiring a tokio LocalSet.
826#[allow(clippy::new_without_default)]
827impl HostBridge {
828    /// Create a new bridge and spawn the stdin reader task.
829    ///
830    /// Must be called within a tokio LocalSet (uses spawn_local for the
831    /// stdin reader since it's single-threaded).
832    pub fn new() -> Self {
833        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
834            Arc::new(Mutex::new(HashMap::new()));
835        let cancelled = Arc::new(AtomicBool::new(false));
836        let cancel_notify = Arc::new(Notify::new());
837        let queued_transcript_injections = HostBridgeInjectionState::default();
838        let resume_requested = Arc::new(AtomicBool::new(false));
839        let skills_reload_requested = Arc::new(AtomicBool::new(false));
840        let daemon_idle = Arc::new(AtomicBool::new(false));
841
842        // Stdin reader: reads JSON-RPC lines and dispatches responses
843        let pending_clone = pending.clone();
844        let cancelled_clone = cancelled.clone();
845        let cancel_notify_clone = cancel_notify.clone();
846        let queued_clone = queued_transcript_injections.clone();
847        let resume_clone = resume_requested.clone();
848        let skills_reload_clone = skills_reload_requested.clone();
849        tokio::task::spawn_local(async move {
850            let stdin = tokio::io::stdin();
851            let reader = tokio::io::BufReader::new(stdin);
852            let mut lines = reader.lines();
853
854            while let Ok(Some(line)) = lines.next_line().await {
855                let line = line.trim().to_string();
856                if line.is_empty() {
857                    continue;
858                }
859
860                let msg: serde_json::Value = match serde_json::from_str(&line) {
861                    Ok(v) => v,
862                    Err(_) => continue,
863                };
864
865                // Notifications have no id; responses have one.
866                if msg.get("id").is_none() {
867                    if let Some(method) = msg["method"].as_str() {
868                        if method == "cancel" {
869                            cancelled_clone.store(true, Ordering::SeqCst);
870                            cancel_notify_clone.notify_waiters();
871                        } else if method == "agent/resume" {
872                            resume_clone.store(true, Ordering::SeqCst);
873                        } else if method == "skills/update" {
874                            skills_reload_clone.store(true, Ordering::SeqCst);
875                        } else if method == "session/remind" {
876                            let params = &msg["params"];
877                            if let Ok(reminder) = queued_session_remind_from_params(params) {
878                                queued_clone.push_session_reminder(reminder).await;
879                            }
880                        } else if method == "session/cancel_tool_call" {
881                            handle_cancel_tool_call_notification(&msg["params"]);
882                        }
883                    }
884                    continue;
885                }
886
887                if let Some(id) = msg["id"].as_u64() {
888                    let mut pending = pending_clone.lock().await;
889                    if let Some(sender) = pending.remove(&id) {
890                        let _ = sender.send(msg);
891                    }
892                }
893            }
894
895            // stdin closed: drop pending senders to cancel waiters.
896            let mut pending = pending_clone.lock().await;
897            pending.clear();
898        });
899
900        Self {
901            next_id: AtomicU64::new(1),
902            pending,
903            cancelled,
904            cancel_notify,
905            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
906            session_id: std::sync::Mutex::new(String::new()),
907            script_name: std::sync::Mutex::new(String::new()),
908            queued_transcript_injections,
909            resume_requested,
910            skills_reload_requested,
911            daemon_idle,
912            prompt_stop_reason: std::sync::Mutex::new(None),
913            visible_call_states: std::sync::Mutex::new(HashMap::new()),
914            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
915            in_process: None,
916        }
917    }
918
919    /// Create a bridge from pre-existing shared state.
920    ///
921    /// Unlike `new()`, does **not** spawn a stdin reader — the caller is
922    /// responsible for dispatching responses into `pending`.  This is used
923    /// by ACP mode which already has its own stdin reader.
924    pub fn from_parts(
925        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
926        cancelled: Arc<AtomicBool>,
927        stdout_lock: Arc<std::sync::Mutex<()>>,
928        start_id: u64,
929    ) -> Self {
930        Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
931    }
932
933    pub fn from_parts_with_writer(
934        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
935        cancelled: Arc<AtomicBool>,
936        writer: HostBridgeWriter,
937        start_id: u64,
938    ) -> Self {
939        Self::from_parts_with_writer_and_cancel_notify(
940            pending,
941            cancelled,
942            Arc::new(Notify::new()),
943            writer,
944            start_id,
945        )
946    }
947
948    pub fn from_parts_with_writer_and_cancel_notify(
949        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
950        cancelled: Arc<AtomicBool>,
951        cancel_notify: Arc<Notify>,
952        writer: HostBridgeWriter,
953        start_id: u64,
954    ) -> Self {
955        Self::from_parts_with_writer_cancel_notify_and_injection_state(
956            pending,
957            cancelled,
958            cancel_notify,
959            writer,
960            start_id,
961            None,
962        )
963    }
964
965    pub fn from_parts_with_writer_cancel_notify_and_injection_state(
966        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
967        cancelled: Arc<AtomicBool>,
968        cancel_notify: Arc<Notify>,
969        writer: HostBridgeWriter,
970        start_id: u64,
971        injection_state: Option<HostBridgeInjectionState>,
972    ) -> Self {
973        Self {
974            next_id: AtomicU64::new(start_id),
975            pending,
976            cancelled,
977            cancel_notify,
978            writer,
979            session_id: std::sync::Mutex::new(String::new()),
980            script_name: std::sync::Mutex::new(String::new()),
981            queued_transcript_injections: injection_state.unwrap_or_default(),
982            resume_requested: Arc::new(AtomicBool::new(false)),
983            skills_reload_requested: Arc::new(AtomicBool::new(false)),
984            daemon_idle: Arc::new(AtomicBool::new(false)),
985            prompt_stop_reason: std::sync::Mutex::new(None),
986            visible_call_states: std::sync::Mutex::new(HashMap::new()),
987            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
988            in_process: None,
989        }
990    }
991
992    /// Create an in-process host bridge backed by exported functions from a
993    /// Harn module. Used by `harn playground` to avoid JSON-RPC boilerplate.
994    pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
995        let exported_functions = vm.load_module_exports(module_path).await?;
996        Ok(Self {
997            next_id: AtomicU64::new(1),
998            pending: Arc::new(Mutex::new(HashMap::new())),
999            cancelled: Arc::new(AtomicBool::new(false)),
1000            cancel_notify: Arc::new(Notify::new()),
1001            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
1002            session_id: std::sync::Mutex::new(String::new()),
1003            script_name: std::sync::Mutex::new(String::new()),
1004            queued_transcript_injections: HostBridgeInjectionState::default(),
1005            resume_requested: Arc::new(AtomicBool::new(false)),
1006            skills_reload_requested: Arc::new(AtomicBool::new(false)),
1007            daemon_idle: Arc::new(AtomicBool::new(false)),
1008            prompt_stop_reason: std::sync::Mutex::new(None),
1009            visible_call_states: std::sync::Mutex::new(HashMap::new()),
1010            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
1011            in_process: Some(InProcessHost {
1012                module_path: module_path.to_path_buf(),
1013                exported_functions,
1014                vm,
1015            }),
1016        })
1017    }
1018
1019    /// Set the ACP session ID for session-scoped notifications.
1020    pub fn set_session_id(&self, id: &str) {
1021        *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
1022    }
1023
1024    /// Set the currently executing script name (without .harn suffix).
1025    pub fn set_script_name(&self, name: &str) {
1026        *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
1027    }
1028
1029    /// Get the current script name.
1030    fn get_script_name(&self) -> String {
1031        self.script_name
1032            .lock()
1033            .unwrap_or_else(|e| e.into_inner())
1034            .clone()
1035    }
1036
1037    /// Get the session ID.
1038    pub fn get_session_id(&self) -> String {
1039        self.session_id
1040            .lock()
1041            .unwrap_or_else(|e| e.into_inner())
1042            .clone()
1043    }
1044
1045    /// Write a complete JSON-RPC line to stdout, serialized through a mutex.
1046    fn write_line(&self, line: &str) -> Result<(), VmError> {
1047        (self.writer)(line).map_err(VmError::Runtime)
1048    }
1049
1050    /// Send a JSON-RPC request to the host and wait for the response.
1051    /// Times out after 5 minutes to prevent deadlocks.
1052    pub async fn call(
1053        &self,
1054        method: &str,
1055        params: serde_json::Value,
1056    ) -> Result<serde_json::Value, VmError> {
1057        if let Some(in_process) = &self.in_process {
1058            return in_process.dispatch(method, params).await;
1059        }
1060
1061        if self.is_cancelled() {
1062            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1063        }
1064
1065        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
1066        let cancel_wait = self.cancel_notify.notified();
1067        tokio::pin!(cancel_wait);
1068
1069        let request = crate::jsonrpc::request(id, method, params);
1070
1071        let (tx, rx) = oneshot::channel();
1072        {
1073            let mut pending = self.pending.lock().await;
1074            pending.insert(id, tx);
1075        }
1076
1077        let line = serde_json::to_string(&request)
1078            .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
1079        if let Err(e) = self.write_line(&line) {
1080            let mut pending = self.pending.lock().await;
1081            pending.remove(&id);
1082            return Err(e);
1083        }
1084
1085        if self.is_cancelled() {
1086            let mut pending = self.pending.lock().await;
1087            pending.remove(&id);
1088            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1089        }
1090
1091        let response = tokio::select! {
1092            result = rx => match result {
1093                Ok(msg) => msg,
1094                Err(_) => {
1095                    // Sender dropped: host closed or stdin reader exited.
1096                    return Err(VmError::Runtime(
1097                        "Bridge: host closed connection before responding".into(),
1098                    ));
1099                }
1100            },
1101            _ = &mut cancel_wait => {
1102                let mut pending = self.pending.lock().await;
1103                pending.remove(&id);
1104                return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1105            }
1106            _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
1107                let mut pending = self.pending.lock().await;
1108                pending.remove(&id);
1109                return Err(VmError::Runtime(format!(
1110                    "Bridge: host did not respond to '{method}' within {}s",
1111                    DEFAULT_TIMEOUT.as_secs()
1112                )));
1113            }
1114        };
1115
1116        if let Some(error) = response.get("error") {
1117            let message = error["message"].as_str().unwrap_or("Unknown host error");
1118            let code = error["code"].as_i64().unwrap_or(-1);
1119            // JSON-RPC -32001 signals the host rejected the tool (not permitted / not in allowlist).
1120            if code == -32001 {
1121                return Err(VmError::CategorizedError {
1122                    message: message.to_string(),
1123                    category: ErrorCategory::ToolRejected,
1124                });
1125            }
1126            return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
1127        }
1128
1129        Ok(response["result"].clone())
1130    }
1131
1132    /// Send a JSON-RPC notification to the host (no response expected).
1133    /// Serialized through the stdout mutex to prevent interleaving.
1134    pub fn notify(&self, method: &str, params: serde_json::Value) {
1135        let notification = crate::jsonrpc::notification(method, params);
1136        if self.in_process.is_some() {
1137            return;
1138        }
1139        if let Ok(line) = serde_json::to_string(&notification) {
1140            let _ = self.write_line(&line);
1141        }
1142    }
1143
1144    /// Check if the host has sent a cancel notification.
1145    pub fn is_cancelled(&self) -> bool {
1146        self.cancelled.load(Ordering::SeqCst)
1147    }
1148
1149    pub fn take_resume_signal(&self) -> bool {
1150        self.resume_requested.swap(false, Ordering::SeqCst)
1151    }
1152
1153    pub fn signal_resume(&self) {
1154        self.resume_requested.store(true, Ordering::SeqCst);
1155    }
1156
1157    pub fn set_daemon_idle(&self, idle: bool) {
1158        self.daemon_idle.store(idle, Ordering::SeqCst);
1159    }
1160
1161    pub fn is_daemon_idle(&self) -> bool {
1162        self.daemon_idle.load(Ordering::SeqCst)
1163    }
1164
1165    /// Record the canonical ACP `stopReason` for the current prompt. The
1166    /// last writer wins, which matches the semantic that an outer
1167    /// `agent_loop` (the one whose result the user observes) always
1168    /// finalizes after any inner loops it spawned.
1169    pub fn set_prompt_stop_reason(&self, reason: &str) {
1170        *self
1171            .prompt_stop_reason
1172            .lock()
1173            .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
1174    }
1175
1176    /// Consume any prompt stop reason recorded during this prompt. The
1177    /// ACP adapter calls this once after the pipeline returns; pipelines
1178    /// that didn't run an `agent_loop` see `None` and the adapter falls
1179    /// back to `end_turn`.
1180    pub fn take_prompt_stop_reason(&self) -> Option<String> {
1181        self.prompt_stop_reason
1182            .lock()
1183            .unwrap_or_else(|e| e.into_inner())
1184            .take()
1185    }
1186
1187    /// Consume any pending `skills/update` signal the host has sent.
1188    /// Returns `true` exactly once per notification, letting callers
1189    /// trigger a layered-discovery rebuild without polling false
1190    /// positives. See issue #73 for the hot-reload contract.
1191    pub fn take_skills_reload_signal(&self) -> bool {
1192        self.skills_reload_requested.swap(false, Ordering::SeqCst)
1193    }
1194
1195    /// Manually mark the skill catalog as stale. Used by tests and by
1196    /// the CLI when an internal event (e.g. `harn install`) should
1197    /// trigger the same rebuild a `skills/update` notification would.
1198    pub fn signal_skills_reload(&self) {
1199        self.skills_reload_requested.store(true, Ordering::SeqCst);
1200    }
1201
1202    /// Call the host's `skills/list` RPC and return the raw JSON array
1203    /// it responded with. Shape:
1204    /// `[{ "id": "...", "name": "...", "description": "...", "source": "..." }, ...]`.
1205    /// The CLI adapter converts each entry into a
1206    /// [`crate::skills::SkillManifestRef`].
1207    pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
1208        let result = self.call("skills/list", serde_json::json!({})).await?;
1209        match result {
1210            serde_json::Value::Array(items) => Ok(items),
1211            serde_json::Value::Object(map) => match map.get("skills") {
1212                Some(serde_json::Value::Array(items)) => Ok(items.clone()),
1213                _ => Err(VmError::Runtime(
1214                    "skills/list: host response must be an array or { skills: [...] }".into(),
1215                )),
1216            },
1217            _ => Err(VmError::Runtime(
1218                "skills/list: unexpected response shape".into(),
1219            )),
1220        }
1221    }
1222
1223    /// Call the host's `host/tools/list` RPC and return normalized tool
1224    /// descriptors. Shape:
1225    /// `[{ "name": "...", "description": "...", "schema": {...}, "deprecated": false }, ...]`.
1226    /// The bridge also accepts `{ "tools": [...] }` and
1227    /// `{ "result": { "tools": [...] } }` wrappers for lenient hosts.
1228    pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
1229        let result = self.call("host/tools/list", serde_json::json!({})).await?;
1230        parse_host_tools_list_response(result)
1231    }
1232
1233    /// Call the host's `skills/fetch` RPC for one skill id. Returns the
1234    /// raw JSON body so the CLI can inspect both the frontmatter fields
1235    /// and the skill markdown body in whatever shape the host sends.
1236    pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
1237        self.call("skills/fetch", serde_json::json!({ "id": id }))
1238            .await
1239    }
1240
1241    pub fn injection_state(&self) -> HostBridgeInjectionState {
1242        self.queued_transcript_injections.clone()
1243    }
1244
1245    pub async fn push_pending_user_message(
1246        &self,
1247        content: String,
1248        transcript_content: serde_json::Value,
1249        mode: &str,
1250    ) -> String {
1251        self.queued_transcript_injections
1252            .push_pending_user_message(content, transcript_content, mode)
1253            .await
1254    }
1255
1256    pub async fn push_queued_user_message(&self, content: String, mode: &str) -> String {
1257        self.push_pending_user_message(content.clone(), serde_json::Value::String(content), mode)
1258            .await
1259    }
1260
1261    pub async fn revoke_pending_user_message(
1262        &self,
1263        message_id: &str,
1264    ) -> PendingUserMessageMutationResult {
1265        self.queued_transcript_injections
1266            .revoke_pending_user_message(message_id)
1267            .await
1268    }
1269
1270    pub async fn revoke_pending_reminder(
1271        &self,
1272        reminder_id: &str,
1273    ) -> PendingReminderMutationResult {
1274        self.queued_transcript_injections
1275            .revoke_pending_reminder(reminder_id)
1276            .await
1277    }
1278
1279    pub async fn pending_injections_json(&self) -> serde_json::Value {
1280        self.queued_transcript_injections
1281            .pending_injections_json()
1282            .await
1283    }
1284
1285    pub async fn replace_pending_user_message(
1286        &self,
1287        message_id: &str,
1288        content: String,
1289        transcript_content: serde_json::Value,
1290    ) -> PendingUserMessageMutationResult {
1291        self.queued_transcript_injections
1292            .replace_pending_user_message(message_id, content, transcript_content)
1293            .await
1294    }
1295
1296    pub async fn push_queued_session_remind_from_params(
1297        &self,
1298        params: &serde_json::Value,
1299    ) -> Result<String, String> {
1300        let reminder = queued_session_remind_from_params(params)?;
1301        let reminder_id = reminder.reminder.id.clone();
1302        self.queued_transcript_injections
1303            .push_session_reminder(reminder)
1304            .await;
1305        Ok(reminder_id)
1306    }
1307
1308    pub async fn take_queued_user_messages(
1309        &self,
1310        include_interrupt_immediate: bool,
1311        include_finish_step: bool,
1312        include_audit_only: bool,
1313    ) -> Vec<QueuedUserMessage> {
1314        let mut state = self.queued_transcript_injections.inner.lock().await;
1315        let mut selected = Vec::new();
1316        let mut retained = VecDeque::new();
1317        while let Some(injection) = state.queue.pop_front() {
1318            let should_take = match injection.mode() {
1319                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1320                QueuedUserMessageMode::FinishStep => include_finish_step,
1321                QueuedUserMessageMode::AuditOnly => include_audit_only,
1322            };
1323            match (should_take, injection) {
1324                (true, QueuedTranscriptInjection::User(message)) => {
1325                    state
1326                        .delivered_user_message_ids
1327                        .insert(message.message_id.clone());
1328                    selected.push(message);
1329                }
1330                (_, injection) => retained.push_back(injection),
1331            }
1332        }
1333        state.queue = retained;
1334        selected
1335    }
1336
1337    pub async fn take_queued_transcript_injections(
1338        &self,
1339        include_interrupt_immediate: bool,
1340        include_finish_step: bool,
1341        include_audit_only: bool,
1342    ) -> Vec<QueuedTranscriptInjection> {
1343        let mut state = self.queued_transcript_injections.inner.lock().await;
1344        let mut selected = Vec::new();
1345        let mut retained = VecDeque::new();
1346        while let Some(injection) = state.queue.pop_front() {
1347            let should_take = match injection.mode() {
1348                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1349                QueuedUserMessageMode::FinishStep => include_finish_step,
1350                QueuedUserMessageMode::AuditOnly => include_audit_only,
1351            };
1352            if should_take {
1353                match &injection {
1354                    QueuedTranscriptInjection::User(message) => {
1355                        state
1356                            .delivered_user_message_ids
1357                            .insert(message.message_id.clone());
1358                    }
1359                    QueuedTranscriptInjection::Reminder(reminder) => {
1360                        state
1361                            .delivered_reminder_ids
1362                            .insert(reminder.reminder.id.clone());
1363                    }
1364                }
1365                selected.push(injection);
1366            } else {
1367                retained.push_back(injection);
1368            }
1369        }
1370        state.queue = retained;
1371        selected
1372    }
1373
1374    pub async fn take_queued_user_messages_for(
1375        &self,
1376        checkpoint: DeliveryCheckpoint,
1377    ) -> Vec<QueuedUserMessage> {
1378        match checkpoint {
1379            DeliveryCheckpoint::InterruptImmediate => {
1380                self.take_queued_user_messages(true, false, false).await
1381            }
1382            DeliveryCheckpoint::AfterCurrentOperation => {
1383                self.take_queued_user_messages(false, true, false).await
1384            }
1385            DeliveryCheckpoint::EndOfInteraction => {
1386                self.take_queued_user_messages(false, false, true).await
1387            }
1388        }
1389    }
1390
1391    pub async fn take_queued_transcript_injections_for(
1392        &self,
1393        checkpoint: DeliveryCheckpoint,
1394    ) -> Vec<QueuedTranscriptInjection> {
1395        match checkpoint {
1396            DeliveryCheckpoint::InterruptImmediate => {
1397                self.take_queued_transcript_injections(true, false, false)
1398                    .await
1399            }
1400            DeliveryCheckpoint::AfterCurrentOperation => {
1401                self.take_queued_transcript_injections(false, true, false)
1402                    .await
1403            }
1404            DeliveryCheckpoint::EndOfInteraction => {
1405                self.take_queued_transcript_injections(false, false, true)
1406                    .await
1407            }
1408        }
1409    }
1410
1411    /// Send an output notification (for log/print in bridge mode).
1412    pub fn send_output(&self, text: &str) {
1413        self.notify("output", serde_json::json!({"text": text}));
1414    }
1415
1416    /// Send a progress notification with optional numeric progress and structured data.
1417    pub fn send_progress(
1418        &self,
1419        phase: &str,
1420        message: &str,
1421        progress: Option<i64>,
1422        total: Option<i64>,
1423        data: Option<serde_json::Value>,
1424    ) {
1425        let mut payload = serde_json::json!({"phase": phase, "message": message});
1426        if let Some(p) = progress {
1427            payload["progress"] = serde_json::json!(p);
1428        }
1429        if let Some(t) = total {
1430            payload["total"] = serde_json::json!(t);
1431        }
1432        if let Some(d) = data {
1433            payload["data"] = d;
1434        }
1435        self.notify("progress", payload);
1436    }
1437
1438    /// Send a structured log notification.
1439    pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1440        let mut payload = serde_json::json!({"level": level, "message": message});
1441        if let Some(f) = fields {
1442            payload["fields"] = f;
1443        }
1444        self.notify("log", payload);
1445    }
1446
1447    /// Send a `session/update` with `call_start` — signals the beginning of
1448    /// an LLM call, tool call, or builtin call for observability.
1449    pub fn send_call_start(
1450        &self,
1451        call_id: &str,
1452        call_type: &str,
1453        name: &str,
1454        metadata: serde_json::Value,
1455    ) {
1456        let session_id = self.get_session_id();
1457        let script = self.get_script_name();
1458        let stream_publicly = metadata
1459            .get("stream_publicly")
1460            .and_then(|value| value.as_bool())
1461            .unwrap_or(true);
1462        self.visible_call_streams
1463            .lock()
1464            .unwrap_or_else(|e| e.into_inner())
1465            .insert(call_id.to_string(), stream_publicly);
1466        self.notify(
1467            "session/update",
1468            serde_json::json!({
1469                "sessionId": session_id,
1470                "update": {
1471                    "sessionUpdate": "call_start",
1472                    "content": {
1473                        "toolCallId": call_id,
1474                        "call_type": call_type,
1475                        "name": name,
1476                        "script": script,
1477                        "metadata": metadata,
1478                    },
1479                },
1480            }),
1481        );
1482    }
1483
1484    /// Send a `session/update` with `call_progress` — a streaming token delta
1485    /// from an in-flight LLM call.
1486    pub fn send_call_progress(
1487        &self,
1488        call_id: &str,
1489        delta: &str,
1490        accumulated_tokens: u64,
1491        user_visible: bool,
1492    ) {
1493        let session_id = self.get_session_id();
1494        let (visible_text, visible_delta) = {
1495            let stream_publicly = self
1496                .visible_call_streams
1497                .lock()
1498                .unwrap_or_else(|e| e.into_inner())
1499                .get(call_id)
1500                .copied()
1501                .unwrap_or(true);
1502            let mut states = self
1503                .visible_call_states
1504                .lock()
1505                .unwrap_or_else(|e| e.into_inner());
1506            let state = states.entry(call_id.to_string()).or_default();
1507            state.push(delta, stream_publicly)
1508        };
1509        self.notify(
1510            "session/update",
1511            serde_json::json!({
1512                "sessionId": session_id,
1513                "update": {
1514                    "sessionUpdate": "call_progress",
1515                    "content": {
1516                        "toolCallId": call_id,
1517                        "delta": delta,
1518                        "accumulated_tokens": accumulated_tokens,
1519                        "visible_text": visible_text,
1520                        "visible_delta": visible_delta,
1521                        "user_visible": user_visible,
1522                    },
1523                },
1524            }),
1525        );
1526    }
1527
1528    /// Send a `session/update` with `call_end` — signals completion of a call.
1529    pub fn send_call_end(
1530        &self,
1531        call_id: &str,
1532        call_type: &str,
1533        name: &str,
1534        duration_ms: u64,
1535        status: &str,
1536        metadata: serde_json::Value,
1537    ) {
1538        let session_id = self.get_session_id();
1539        let script = self.get_script_name();
1540        self.visible_call_states
1541            .lock()
1542            .unwrap_or_else(|e| e.into_inner())
1543            .remove(call_id);
1544        self.visible_call_streams
1545            .lock()
1546            .unwrap_or_else(|e| e.into_inner())
1547            .remove(call_id);
1548        self.notify(
1549            "session/update",
1550            serde_json::json!({
1551                "sessionId": session_id,
1552                "update": {
1553                    "sessionUpdate": "call_end",
1554                    "content": {
1555                        "toolCallId": call_id,
1556                        "call_type": call_type,
1557                        "name": name,
1558                        "script": script,
1559                        "duration_ms": duration_ms,
1560                        "status": status,
1561                        "metadata": metadata,
1562                    },
1563                },
1564            }),
1565        );
1566    }
1567
1568    /// Send a worker lifecycle update for delegated/background execution.
1569    pub fn send_worker_update(
1570        &self,
1571        worker_id: &str,
1572        worker_name: &str,
1573        status: &str,
1574        metadata: serde_json::Value,
1575        audit: Option<&MutationSessionRecord>,
1576    ) {
1577        let session_id = self.get_session_id();
1578        let script = self.get_script_name();
1579        let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1580        let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1581        let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1582        let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1583        let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1584        let lifecycle = serde_json::json!({
1585            "event": status,
1586            "worker_id": worker_id,
1587            "worker_name": worker_name,
1588            "started_at": started_at,
1589            "finished_at": finished_at,
1590        });
1591        self.notify(
1592            "session/update",
1593            serde_json::json!({
1594                "sessionId": session_id,
1595                "update": {
1596                    "sessionUpdate": "worker_update",
1597                    "content": {
1598                        "worker_id": worker_id,
1599                        "worker_name": worker_name,
1600                        "status": status,
1601                        "script": script,
1602                        "started_at": started_at,
1603                        "finished_at": finished_at,
1604                        "snapshot_path": snapshot_path,
1605                        "run_id": run_id,
1606                        "run_path": run_path,
1607                        "lifecycle": lifecycle,
1608                        "audit": audit,
1609                        "metadata": metadata,
1610                    },
1611                },
1612            }),
1613        );
1614    }
1615}
1616
1617/// Convert a serde_json::Value to a VmValue.
1618pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1619    crate::stdlib::json_to_vm_value(val)
1620}
1621
1622fn parse_host_tools_list_response(
1623    result: serde_json::Value,
1624) -> Result<Vec<serde_json::Value>, VmError> {
1625    let tools = match result {
1626        serde_json::Value::Array(items) => items,
1627        serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1628            map.get("result")
1629                .and_then(|value| value.get("tools"))
1630                .cloned()
1631        }) {
1632            Some(serde_json::Value::Array(items)) => items,
1633            _ => {
1634                return Err(VmError::Runtime(
1635                    "host/tools/list: host response must be an array or { tools: [...] }".into(),
1636                ));
1637            }
1638        },
1639        _ => {
1640            return Err(VmError::Runtime(
1641                "host/tools/list: unexpected response shape".into(),
1642            ));
1643        }
1644    };
1645
1646    let mut normalized = Vec::with_capacity(tools.len());
1647    for tool in tools {
1648        let serde_json::Value::Object(map) = tool else {
1649            return Err(VmError::Runtime(
1650                "host/tools/list: every tool must be an object".into(),
1651            ));
1652        };
1653        let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1654            return Err(VmError::Runtime(
1655                "host/tools/list: every tool must include a string `name`".into(),
1656            ));
1657        };
1658        let description = map
1659            .get("description")
1660            .and_then(|value| value.as_str())
1661            .or_else(|| {
1662                map.get("short_description")
1663                    .and_then(|value| value.as_str())
1664            })
1665            .unwrap_or_default();
1666        let schema = map
1667            .get("schema")
1668            .cloned()
1669            .or_else(|| map.get("parameters").cloned())
1670            .or_else(|| map.get("input_schema").cloned())
1671            .unwrap_or(serde_json::Value::Null);
1672        let deprecated = map
1673            .get("deprecated")
1674            .and_then(|value| value.as_bool())
1675            .unwrap_or(false);
1676        normalized.push(serde_json::json!({
1677            "name": name,
1678            "description": description,
1679            "schema": schema,
1680            "deprecated": deprecated,
1681        }));
1682    }
1683    Ok(normalized)
1684}
1685
/// Unit tests for the bridge: JSON-RPC message shapes, cancellation of pending
/// calls, queued user messages and reminders, JSON-to-VM value conversion, and
/// `host/tools/list` response normalization.
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `HostBridge` via `from_parts` with an empty pending map, a cleared
    /// cancellation flag and a fresh stdout lock.
    // NOTE(review): the trailing `1` is presumably the starting request id —
    // confirm against `HostBridge::from_parts`.
    fn test_bridge() -> HostBridge {
        HostBridge::from_parts(
            Arc::new(Mutex::new(HashMap::new())),
            Arc::new(AtomicBool::new(false)),
            Arc::new(std::sync::Mutex::new(())),
            1,
        )
    }

    /// Build a second bridge with its own pending map, cancellation flag and a
    /// no-op writer, but reusing `owner`'s injection state.
    fn test_bridge_sharing_injection_state(owner: &HostBridge) -> HostBridge {
        HostBridge::from_parts_with_writer_cancel_notify_and_injection_state(
            Arc::new(Mutex::new(HashMap::new())),
            Arc::new(AtomicBool::new(false)),
            Arc::new(Notify::new()),
            Arc::new(|_| Ok(())),
            100,
            Some(owner.injection_state()),
        )
    }

    /// A serialized request contains the `jsonrpc`, `id` and `method` fields.
    #[test]
    fn test_json_rpc_request_format() {
        let request = crate::jsonrpc::request(
            1,
            "llm_call",
            serde_json::json!({
                "prompt": "Hello",
                "system": "Be helpful",
            }),
        );
        let s = serde_json::to_string(&request).unwrap();
        assert!(s.contains("\"jsonrpc\":\"2.0\""));
        assert!(s.contains("\"id\":1"));
        assert!(s.contains("\"method\":\"llm_call\""));
    }

    /// A serialized notification carries its method and no `id` field.
    #[test]
    fn test_json_rpc_notification_format() {
        let notification =
            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
        let s = serde_json::to_string(&notification).unwrap();
        assert!(s.contains("\"method\":\"output\""));
        assert!(!s.contains("\"id\""));
    }

    /// An error response exposes the message under `error.message`.
    #[test]
    fn test_json_rpc_error_response_parsing() {
        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
        assert!(response.get("error").is_some());
        assert_eq!(
            response["error"]["message"].as_str().unwrap(),
            "Invalid request"
        );
    }

    /// A success response exposes the payload under `result`.
    #[test]
    fn test_json_rpc_success_response_parsing() {
        let response = crate::jsonrpc::response(
            1,
            serde_json::json!({
                "text": "Hello world",
                "input_tokens": 10,
                "output_tokens": 5,
            }),
        );
        assert!(response.get("result").is_some());
        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
    }

    /// Sanity check that an `AtomicBool` flag round-trips a store and load; the
    /// bridge itself is not exercised here.
    #[test]
    fn test_cancelled_flag() {
        let cancelled = Arc::new(AtomicBool::new(false));
        assert!(!cancelled.load(Ordering::SeqCst));
        cancelled.store(true, Ordering::SeqCst);
        assert!(cancelled.load(Ordering::SeqCst));
    }

    /// An in-flight `call` resolves with a "cancelled" runtime error once the
    /// cancellation flag is set and waiters are notified, and leaves the pending
    /// map empty afterwards.
    #[test]
    fn pending_host_calls_return_when_cancellation_arrives() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(async {
            let pending = Arc::new(Mutex::new(HashMap::new()));
            let cancelled = Arc::new(AtomicBool::new(false));
            let bridge = HostBridge::from_parts_with_writer(
                pending.clone(),
                cancelled.clone(),
                Arc::new(|_| Ok(())),
                1,
            );

            let call = bridge.call("host/work", serde_json::json!({}));
            tokio::pin!(call);

            // Drive the call until its request shows up in `pending`; it must not
            // complete on its own because no response is ever delivered.
            loop {
                tokio::select! {
                    result = &mut call => panic!("call completed before cancellation: {result:?}"),
                    _ = tokio::task::yield_now() => {}
                }
                if !pending.lock().await.is_empty() {
                    break;
                }
            }

            // Flag cancellation and wake anything waiting on the notifier.
            cancelled.store(true, Ordering::SeqCst);
            bridge.cancel_notify.notify_waiters();

            // The one-second timeout guards against the call hanging instead of
            // observing the cancellation.
            let result = tokio::time::timeout(Duration::from_secs(1), call)
                .await
                .expect("pending call should observe cancellation promptly");
            assert!(
                matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
            );
            assert!(pending.lock().await.is_empty());
        });
    }

    /// `take_queued_user_messages` returns only the messages whose delivery mode
    /// matches the flags passed for that call.
    #[test]
    fn queued_messages_are_filtered_by_delivery_mode() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(async {
            let bridge = test_bridge();
            bridge
                .push_queued_user_message("first".to_string(), "finish_step")
                .await;
            bridge
                .push_queued_user_message("second".to_string(), "audit_only")
                .await;

            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
            assert_eq!(finish_step.len(), 1);
            assert_eq!(finish_step[0].content, "first");

            let audit_only = bridge.take_queued_user_messages(false, false, true).await;
            assert_eq!(audit_only.len(), 1);
            assert_eq!(audit_only[0].content, "second");
        });
    }

    /// Walks a pending user message through replace, revoke and delivery, and
    /// checks the mutation result reported at each stage (`Mutated`,
    /// `AlreadyRevoked`, `AlreadyDelivered`, `UnknownMessageId`).
    #[test]
    fn pending_user_messages_support_revoke_replace_and_delivery_states() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(async {
            let bridge = test_bridge();
            let first_id = bridge
                .push_pending_user_message(
                    "first".to_string(),
                    serde_json::json!("first"),
                    "audit_only",
                )
                .await;
            let second_id = bridge
                .push_pending_user_message(
                    "second".to_string(),
                    serde_json::json!("second"),
                    "audit_only",
                )
                .await;

            assert_eq!(
                bridge
                    .replace_pending_user_message(
                        &second_id,
                        "second edited".to_string(),
                        serde_json::json!("second edited"),
                    )
                    .await,
                PendingUserMessageMutationResult::Mutated
            );
            assert_eq!(
                bridge.revoke_pending_user_message(&first_id).await,
                PendingUserMessageMutationResult::Mutated
            );
            // Revoking the same message twice reports the earlier revocation.
            assert_eq!(
                bridge.revoke_pending_user_message(&first_id).await,
                PendingUserMessageMutationResult::AlreadyRevoked
            );

            // Only the edited, non-revoked message is delivered.
            let delivered = bridge
                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
                .await;
            assert_eq!(delivered.len(), 1);
            assert_eq!(delivered[0].message_id, second_id);
            assert_eq!(delivered[0].content, "second edited");

            // After delivery, further mutations are rejected.
            assert_eq!(
                bridge.revoke_pending_user_message(&second_id).await,
                PendingUserMessageMutationResult::AlreadyDelivered
            );
            assert_eq!(
                bridge
                    .replace_pending_user_message(
                        &second_id,
                        "too late".to_string(),
                        serde_json::json!("too late"),
                    )
                    .await,
                PendingUserMessageMutationResult::AlreadyDelivered
            );
            assert_eq!(
                bridge.revoke_pending_user_message("missing").await,
                PendingUserMessageMutationResult::UnknownMessageId
            );
        });
    }

    /// Replacing a pending message keeps its queue position and delivery mode:
    /// the edited first message is still delivered ahead of the second.
    #[test]
    fn pending_user_message_replace_preserves_fifo_position_and_mode() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(async {
            let bridge = test_bridge();
            let first_id = bridge
                .push_pending_user_message(
                    "first".to_string(),
                    serde_json::json!("first"),
                    "finish_step",
                )
                .await;
            let second_id = bridge
                .push_pending_user_message(
                    "second".to_string(),
                    serde_json::json!("second"),
                    "finish_step",
                )
                .await;
            assert_eq!(
                bridge
                    .replace_pending_user_message(
                        &first_id,
                        "first edited".to_string(),
                        serde_json::json!("first edited"),
                    )
                    .await,
                PendingUserMessageMutationResult::Mutated
            );

            let delivered = bridge
                .take_queued_user_messages_for(DeliveryCheckpoint::AfterCurrentOperation)
                .await;
            assert_eq!(
                delivered
                    .iter()
                    .map(|message| (&message.message_id, message.content.as_str(), message.mode))
                    .collect::<Vec<_>>(),
                vec![
                    (&first_id, "first edited", QueuedUserMessageMode::FinishStep,),
                    (&second_id, "second", QueuedUserMessageMode::FinishStep),
                ]
            );
        });
    }

    /// Revoked and queued message state is visible through a replacement bridge
    /// that shares the original's injection state, and a delivery made through
    /// the replacement is visible back on the original bridge.
    #[test]
    fn pending_user_message_state_survives_bridge_replacement() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(async {
            let bridge = test_bridge();
            let revoked_id = bridge
                .push_pending_user_message(
                    "revoke me".to_string(),
                    serde_json::json!("revoke me"),
                    "audit_only",
                )
                .await;
            let delivered_id = bridge
                .push_pending_user_message(
                    "deliver me".to_string(),
                    serde_json::json!("deliver me"),
                    "audit_only",
                )
                .await;
            assert_eq!(
                bridge.revoke_pending_user_message(&revoked_id).await,
                PendingUserMessageMutationResult::Mutated
            );
            // Mark the original bridge as cancelled before handing over.
            bridge.cancelled.store(true, Ordering::SeqCst);

            let replacement_bridge = test_bridge_sharing_injection_state(&bridge);
            assert_eq!(
                replacement_bridge
                    .revoke_pending_user_message(&revoked_id)
                    .await,
                PendingUserMessageMutationResult::AlreadyRevoked
            );
            let delivered = replacement_bridge
                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
                .await;
            assert_eq!(delivered.len(), 1);
            assert_eq!(delivered[0].message_id, delivered_id);
            assert_eq!(delivered[0].content, "deliver me");
            // The original bridge observes the delivery made via the replacement.
            assert_eq!(
                bridge.revoke_pending_user_message(&delivered_id).await,
                PendingUserMessageMutationResult::AlreadyDelivered
            );
        });
    }

    /// A queued reminder is not returned by `take_queued_user_messages`; it is
    /// only returned by `take_queued_transcript_injections_for`, with its body,
    /// tags, dedupe key, TTL and `Bridge` source intact.
    #[test]
    fn queued_transcript_injections_preserve_user_reminder_separation() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(async {
            let bridge = test_bridge();
            bridge
                .push_queued_user_message("human follow-up".to_string(), "finish_step")
                .await;
            let reminder_id = bridge
                .push_queued_session_remind_from_params(&serde_json::json!({
                    "body": "Host-provided ambient context.",
                    "tags": ["host"],
                    "dedupe_key": "host-context",
                    "ttl_turns": 2,
                    "mode": "audit_only",
                    "_meta": {"harn": {"source": "test"}},
                }))
                .await
                .expect("valid reminder");

            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
            assert_eq!(finish_step.len(), 1);
            assert_eq!(finish_step[0].content, "human follow-up");

            // The `audit_only` reminder must not be returned as a user message.
            let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
            assert!(no_user_messages.is_empty());

            let injections = bridge
                .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
                .await;
            assert_eq!(injections.len(), 1);
            let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
                panic!("expected queued reminder");
            };
            assert_eq!(reminder.reminder.id, reminder_id);
            assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
            assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
            assert_eq!(
                reminder.reminder.dedupe_key.as_deref(),
                Some("host-context")
            );
            assert_eq!(reminder.reminder.ttl_turns, Some(2));
            assert_eq!(
                reminder.reminder.source,
                crate::llm::helpers::ReminderSource::Bridge
            );
        });
    }

    /// `pending_injections_json` lists user messages and reminders together in
    /// insertion order, with the expected per-kind fields and positions.
    #[test]
    fn pending_injections_list_user_messages_and_reminders_in_fifo_order() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(async {
            let bridge = test_bridge();
            let message_id = bridge
                .push_pending_user_message(
                    "human follow-up".to_string(),
                    serde_json::json!([{"type": "text", "text": "human follow-up"}]),
                    "finish_step",
                )
                .await;
            let reminder_id = bridge
                .push_queued_session_remind_from_params(&serde_json::json!({
                    "id": "rem-test",
                    "body": "Host reminder",
                    "tags": ["host"],
                    "dedupe_key": "host-reminder",
                    "ttl_turns": 2,
                    "mode": "interrupt_immediate",
                }))
                .await
                .expect("valid session/remind payload");

            let pending = bridge.pending_injections_json().await;
            assert_eq!(pending["pendingCount"], 2);
            assert_eq!(pending["injections"][0]["kind"], "user");
            assert_eq!(pending["injections"][0]["id"], message_id);
            assert_eq!(pending["injections"][0]["messageId"], message_id);
            assert_eq!(pending["injections"][0]["mode"], "finish_step");
            assert_eq!(pending["injections"][0]["position"], 0);
            assert_eq!(pending["injections"][1]["kind"], "reminder");
            assert_eq!(pending["injections"][1]["id"], reminder_id);
            assert_eq!(pending["injections"][1]["reminderId"], "rem-test");
            assert_eq!(pending["injections"][1]["mode"], "interrupt_immediate");
            assert_eq!(pending["injections"][1]["body"], "Host reminder");
            assert_eq!(pending["injections"][1]["dedupeKey"], "host-reminder");
            assert_eq!(pending["injections"][1]["ttlTurns"], 2);
            assert_eq!(pending["injections"][1]["position"], 1);
        });
    }

    /// Walks pending reminders through revoke and delivery, and checks the
    /// mutation result reported at each stage (`Mutated`, `AlreadyRevoked`,
    /// `AlreadyDelivered`, `UnknownReminderId`).
    #[test]
    fn pending_reminders_support_revoke_and_delivery_states() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(async {
            let bridge = test_bridge();
            let revoked_id = bridge
                .push_queued_session_remind_from_params(&serde_json::json!({
                    "id": "rem-revoke",
                    "body": "remove me",
                    "mode": "finish_step",
                }))
                .await
                .expect("valid session/remind payload");
            let delivered_id = bridge
                .push_queued_session_remind_from_params(&serde_json::json!({
                    "id": "rem-deliver",
                    "body": "deliver me",
                    "mode": "finish_step",
                }))
                .await
                .expect("valid session/remind payload");

            assert_eq!(
                bridge.revoke_pending_reminder(&revoked_id).await,
                PendingReminderMutationResult::Mutated
            );
            assert_eq!(
                bridge.revoke_pending_reminder(&revoked_id).await,
                PendingReminderMutationResult::AlreadyRevoked
            );

            // The revoked reminder no longer appears in the pending listing.
            let pending = bridge.pending_injections_json().await;
            assert_eq!(pending["pendingCount"], 1);
            assert_eq!(pending["injections"][0]["reminderId"], delivered_id);

            let delivered = bridge
                .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
                .await;
            assert_eq!(delivered.len(), 1);
            let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
                panic!("expected delivered reminder");
            };
            assert_eq!(reminder.reminder.id, delivered_id);

            assert_eq!(
                bridge.revoke_pending_reminder(&delivered_id).await,
                PendingReminderMutationResult::AlreadyDelivered
            );
            assert_eq!(
                bridge.revoke_pending_reminder("missing").await,
                PendingReminderMutationResult::UnknownReminderId
            );
        });
    }

    /// For each reminder mode, the reminder is withheld at a non-matching
    /// checkpoint and delivered at the matching one.
    #[test]
    fn bridge_remind_modes_honor_delivery_checkpoints() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(async {
            // Each case is (mode, checkpoint that delivers it, checkpoint that
            // must not deliver it).
            let cases = [
                (
                    "interrupt_immediate",
                    DeliveryCheckpoint::InterruptImmediate,
                    DeliveryCheckpoint::AfterCurrentOperation,
                ),
                (
                    "finish_step",
                    DeliveryCheckpoint::AfterCurrentOperation,
                    DeliveryCheckpoint::EndOfInteraction,
                ),
                (
                    "audit_only",
                    DeliveryCheckpoint::EndOfInteraction,
                    DeliveryCheckpoint::InterruptImmediate,
                ),
            ];

            for (mode, expected_checkpoint, wrong_checkpoint) in cases {
                // A fresh bridge per case keeps the queues independent.
                let bridge = test_bridge();
                bridge
                    .push_queued_session_remind_from_params(&serde_json::json!({
                        "body": format!("Reminder for {mode}"),
                        "mode": mode,
                    }))
                    .await
                    .expect("valid session/remind payload");

                let premature = bridge
                    .take_queued_transcript_injections_for(wrong_checkpoint)
                    .await;
                assert!(
                    premature.is_empty(),
                    "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
                );

                let delivered = bridge
                    .take_queued_transcript_injections_for(expected_checkpoint)
                    .await;
                assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
                let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
                    panic!("expected reminder for {mode}");
                };
                assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
            }
        });
    }

    /// A payload shaped like a user message (`content` instead of `body`) is
    /// rejected with the `ReminderInvalidShape` code and a mention of `body`.
    #[test]
    fn session_remind_validation_rejects_user_message_shape() {
        let err = queued_session_remind_from_params(&serde_json::json!({
            "content": "this is still a user message",
            "mode": "interrupt_immediate",
        }))
        .expect_err("session/remind must require a reminder body");
        assert!(err.contains(Code::ReminderInvalidShape.as_str()));
        assert!(err.contains("body"));
    }

    /// An unknown top-level field is rejected with the `ReminderUnknownOption`
    /// code and the offending field name.
    #[test]
    fn session_remind_validation_rejects_unknown_options_separately() {
        let err = queued_session_remind_from_params(&serde_json::json!({
            "body": "valid body",
            "unknown_host_field": true,
        }))
        .expect_err("session/remind must reject unknown top-level fields");
        assert!(err.contains(Code::ReminderUnknownOption.as_str()));
        assert!(err.contains("unknown_host_field"));
    }

    /// An unrecognized `propagate` value is rejected with the dedicated
    /// `ReminderUnknownPropagate` code.
    #[test]
    fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
        let err = queued_session_remind_from_params(&serde_json::json!({
            "body": "valid body",
            "propagate": "workspace",
        }))
        .expect_err("session/remind must reject unknown propagate values");
        assert!(err.contains(Code::ReminderUnknownPropagate.as_str()));
        assert!(err.contains("propagate"));
    }

    /// A JSON string converts to a VM value that displays as the same text.
    #[test]
    fn test_json_result_to_vm_value_string() {
        let val = serde_json::json!("hello");
        let vm_val = json_result_to_vm_value(&val);
        assert_eq!(vm_val.display(), "hello");
    }

    /// A JSON object converts to `VmValue::Dict` with its entries preserved.
    #[test]
    fn test_json_result_to_vm_value_dict() {
        let val = serde_json::json!({"name": "test", "count": 42});
        let vm_val = json_result_to_vm_value(&val);
        let VmValue::Dict(d) = &vm_val else {
            unreachable!("Expected Dict, got {:?}", vm_val);
        };
        assert_eq!(d.get("name").unwrap().display(), "test");
        assert_eq!(d.get("count").unwrap().display(), "42");
    }

    /// JSON `null` converts to `VmValue::Nil`.
    #[test]
    fn test_json_result_to_vm_value_null() {
        let val = serde_json::json!(null);
        let vm_val = json_result_to_vm_value(&val);
        assert!(matches!(vm_val, VmValue::Nil));
    }

    /// Nested JSON converts recursively: the outer object becomes a dict and the
    /// inner array becomes a `VmValue::List` of the same length.
    #[test]
    fn test_json_result_to_vm_value_nested() {
        let val = serde_json::json!({
            "text": "response",
            "tool_calls": [
                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
            ],
            "input_tokens": 100,
            "output_tokens": 50,
        });
        let vm_val = json_result_to_vm_value(&val);
        let VmValue::Dict(d) = &vm_val else {
            unreachable!("Expected Dict, got {:?}", vm_val);
        };
        assert_eq!(d.get("text").unwrap().display(), "response");
        let VmValue::List(list) = d.get("tool_calls").unwrap() else {
            unreachable!("Expected List for tool_calls");
        };
        assert_eq!(list.len(), 1);
    }

    /// A `{ "tools": [...] }` wrapper is accepted and `deprecated` defaults to
    /// `false` when absent.
    #[test]
    fn parse_host_tools_list_accepts_object_wrapper() {
        let tools = parse_host_tools_list_response(serde_json::json!({
            "tools": [
                {
                    "name": "Read",
                    "description": "Read a file",
                    "schema": {"type": "object"},
                }
            ]
        }))
        .expect("tool list");

        assert_eq!(tools.len(), 1);
        assert_eq!(tools[0]["name"], "Read");
        assert_eq!(tools[0]["deprecated"], false);
    }

    /// The nested `result.tools` wrapper and the alternate field names
    /// `short_description` / `input_schema` are accepted and normalized to
    /// `description` / `schema`.
    #[test]
    fn parse_host_tools_list_accepts_compat_fields() {
        let tools = parse_host_tools_list_response(serde_json::json!({
            "result": {
                "tools": [
                    {
                        "name": "Edit",
                        "short_description": "Apply an edit",
                        "input_schema": {"type": "object"},
                        "deprecated": true,
                    }
                ]
            }
        }))
        .expect("tool list");

        assert_eq!(tools[0]["description"], "Apply an edit");
        assert_eq!(tools[0]["schema"]["type"], "object");
        assert_eq!(tools[0]["deprecated"], true);
    }

    /// A tool without a string `name` is rejected with the corresponding error.
    #[test]
    fn parse_host_tools_list_requires_tool_names() {
        let err = parse_host_tools_list_response(serde_json::json!({
            "tools": [
                {"description": "missing name"}
            ]
        }))
        .expect_err("expected error");
        assert!(err
            .to_string()
            .contains("host/tools/list: every tool must include a string `name`"));
    }

    /// `DEFAULT_TIMEOUT` is 300 seconds.
    #[test]
    fn test_timeout_duration() {
        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
    }
}