Skip to main content

harn_vm/
bridge.rs

1//! JSON-RPC 2.0 bridge for host communication.
2//!
3//! When `harn run --bridge` is used, the VM delegates builtins (llm_call,
4//! file I/O, tool execution) to a host process over stdin/stdout JSON-RPC.
5//! The host application handles these requests using its own providers.
6
7use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::future::Future;
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15
16use tokio::io::AsyncBufReadExt;
17use tokio::sync::{oneshot, Mutex, Notify};
18
19use harn_parser::diagnostic_codes::Code;
20
21use crate::orchestration::MutationSessionRecord;
22use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
23use crate::visible_text::VisibleTextState;
24use crate::vm::Vm;
25
26/// Default timeout for bridge calls (5 minutes).
27const DEFAULT_TIMEOUT: Duration = Duration::from_mins(5);
28
29pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
30
31fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
32    Arc::new(move |line: &str| {
33        let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
34        let mut stdout = std::io::stdout().lock();
35        stdout
36            .write_all(line.as_bytes())
37            .map_err(|e| format!("Bridge write error: {e}"))?;
38        stdout
39            .write_all(b"\n")
40            .map_err(|e| format!("Bridge write error: {e}"))?;
41        stdout
42            .flush()
43            .map_err(|e| format!("Bridge flush error: {e}"))?;
44        Ok(())
45    })
46}
47
48/// A JSON-RPC 2.0 bridge to a host process over stdin/stdout.
49///
50/// The bridge sends requests to the host on stdout and receives responses
51/// on stdin. A background task reads stdin and dispatches responses to
52/// waiting callers by request ID. All stdout writes are serialized through
53/// a mutex to prevent interleaving.
54pub struct HostBridge {
55    next_id: AtomicU64,
56    /// Pending request waiters, keyed by JSON-RPC id.
57    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
58    /// Whether the host has sent a cancel notification.
59    cancelled: Arc<AtomicBool>,
60    /// Wakes pending host calls when cancellation arrives.
61    cancel_notify: Arc<Notify>,
62    /// Transport writer used to send JSON-RPC to the host.
63    writer: HostBridgeWriter,
64    /// ACP session ID (set in ACP mode for session-scoped notifications).
65    session_id: std::sync::Mutex<String>,
66    /// Name of the currently executing Harn script (without .harn suffix).
67    script_name: std::sync::Mutex<String>,
68    /// Transcript injections queued by the host while a run is active.
69    queued_transcript_injections: HostBridgeInjectionState,
70    /// Host-triggered resume signal for daemon agents.
71    resume_requested: Arc<AtomicBool>,
72    /// Host-triggered skill-registry invalidation signal. Set when the
73    /// host sends a `skills/update` notification; consumed by the CLI
74    /// between runs (watch mode, long-running agents) to rebuild the
75    /// layered skill catalog from its current filesystem + host state.
76    skills_reload_requested: Arc<AtomicBool>,
77    /// Whether the current daemon-mode agent loop is blocked in idle wait.
78    daemon_idle: Arc<AtomicBool>,
79    /// Canonical ACP `stopReason` recorded by the most recent `agent_loop`
80    /// finalize during this prompt. Read once by the ACP adapter when the
81    /// pipeline returns and populated by `host_agent_session_finalize`.
82    /// Pipelines that don't run an agent loop leave this `None`, in which
83    /// case the adapter falls back to `end_turn`.
84    prompt_stop_reason: std::sync::Mutex<Option<String>>,
85    /// Per-call visible assistant text state for call_progress notifications.
86    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
87    /// Whether an LLM call's deltas should be exposed to end users while streaming.
88    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
89    /// Optional in-process host-module backend used by `harn playground`.
90    in_process: Option<InProcessHost>,
91}
92
93struct InProcessHost {
94    module_path: PathBuf,
95    exported_functions: BTreeMap<String, Arc<VmClosure>>,
96    vm: Vm,
97}
98
99impl InProcessHost {
100    /// Box-pin'd to break the static recursion between the VM's hot dispatch
101    /// loop and the bridge: a bridge-backed builtin spawns a child VM that
102    /// calls back into the dispatch loop via `call_closure_pub`. Indirecting
103    /// at this slow-path boundary keeps the recursion satisfied without
104    /// allocating per call in the hot per-callback path.
105    fn dispatch<'a>(
106        &'a self,
107        method: &'a str,
108        params: serde_json::Value,
109    ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, VmError>> + Send + 'a>> {
110        Box::pin(async move {
111            match method {
112                "builtin_call" => {
113                    let name = params
114                        .get("name")
115                        .and_then(|value| value.as_str())
116                        .unwrap_or_default();
117                    let args = params
118                        .get("args")
119                        .and_then(|value| value.as_array())
120                        .cloned()
121                        .unwrap_or_default()
122                        .into_iter()
123                        .map(|value| json_result_to_vm_value(&value))
124                        .collect::<Vec<_>>();
125                    self.invoke_export(name, &args).await
126                }
127                "host/tools/list" => self
128                    .invoke_optional_export("host_tools_list", &[])
129                    .await
130                    .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
131                "session/request_permission" => self.request_permission(params).await,
132                other => Err(VmError::Runtime(format!(
133                    "playground host backend does not implement bridge method '{other}'"
134                ))),
135            }
136        })
137    }
138
139    async fn invoke_export(
140        &self,
141        name: &str,
142        args: &[VmValue],
143    ) -> Result<serde_json::Value, VmError> {
144        let Some(closure) = self.exported_functions.get(name) else {
145            return Err(VmError::Runtime(format!(
146                "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
147                self.module_path.display()
148            )));
149        };
150
151        let mut vm = self.vm.child_vm_for_host();
152        let result = vm.call_closure_pub(closure, args).await?;
153        Ok(crate::llm::vm_value_to_json(&result))
154    }
155
156    async fn invoke_optional_export(
157        &self,
158        name: &str,
159        args: &[VmValue],
160    ) -> Result<Option<serde_json::Value>, VmError> {
161        if !self.exported_functions.contains_key(name) {
162            return Ok(None);
163        }
164        self.invoke_export(name, args).await.map(Some)
165    }
166
167    async fn request_permission(
168        &self,
169        params: serde_json::Value,
170    ) -> Result<serde_json::Value, VmError> {
171        // No exported `request_permission` means the playground host has no
172        // approval policy, so it grants through the canonical ACP option.
173        let Some(closure) = self.exported_functions.get("request_permission") else {
174            return Ok(crate::llm::acp_permission::allow_response());
175        };
176
177        let tool_call = params.get("toolCall");
178        let tool_name = tool_call
179            .and_then(|tool_call| tool_call.pointer("/_meta/harn/toolName"))
180            .or_else(|| tool_call.and_then(|tool_call| tool_call.get("toolName")))
181            .or_else(|| tool_call.and_then(|tool_call| tool_call.get("title")))
182            .and_then(|value| value.as_str())
183            .unwrap_or_default();
184        let tool_args = tool_call
185            .and_then(|tool_call| tool_call.get("rawInput"))
186            .map(json_result_to_vm_value)
187            .unwrap_or(VmValue::Nil);
188        let full_payload = json_result_to_vm_value(&params);
189
190        let arg_count = closure.func.params.len();
191        let args = if arg_count >= 3 {
192            vec![
193                VmValue::String(std::sync::Arc::from(tool_name.to_string())),
194                tool_args,
195                full_payload,
196            ]
197        } else if arg_count == 2 {
198            vec![
199                VmValue::String(std::sync::Arc::from(tool_name.to_string())),
200                tool_args,
201            ]
202        } else if arg_count == 1 {
203            vec![full_payload]
204        } else {
205            Vec::new()
206        };
207
208        let mut vm = self.vm.child_vm_for_host();
209        let result = vm.call_closure_pub(closure, &args).await?;
210        // Translate the script's verdict into a canonical ACP response
211        // (`{ outcome: { outcome: "selected" | "cancelled", optionId? } }`).
212        // The script API stays ergonomic — bool / string-reason / dict — but
213        // the wire shape is canonical.
214        let payload = match result {
215            VmValue::Bool(granted) => {
216                if granted {
217                    crate::llm::acp_permission::allow_response()
218                } else {
219                    crate::llm::acp_permission::reject_response(None)
220                }
221            }
222            VmValue::String(reason) if !reason.is_empty() => {
223                crate::llm::acp_permission::reject_response(Some(reason.to_string()))
224            }
225            other => {
226                let json = crate::llm::vm_value_to_json(&other);
227                if let Some(granted) = json.get("granted").and_then(|value| value.as_bool()) {
228                    if granted {
229                        crate::llm::acp_permission::allow_response()
230                    } else {
231                        crate::llm::acp_permission::reject_response(
232                            json.get("reason")
233                                .and_then(|value| value.as_str())
234                                .map(str::to_string),
235                        )
236                    }
237                } else if json.get("outcome").is_some() {
238                    // The script already returned a canonical-shaped outcome.
239                    json
240                } else if other.is_truthy() {
241                    crate::llm::acp_permission::allow_response()
242                } else {
243                    crate::llm::acp_permission::reject_response(None)
244                }
245            }
246        };
247        Ok(payload)
248    }
249}
250
251/// How a queued bridge injection is delivered into the agent loop.
252///
253/// `AuditOnly` was previously called `WaitForCompletion`; the rename is
254/// truth-in-advertising (harn#2212). These injections drain at
255/// `loop_exit`, *after* the last LLM call has returned — so they land in
256/// the transcript audit but are **never rendered into a model prompt**.
257/// Hosts that want the model to react to the reminder on its final
258/// iteration should use `FinishStep` instead, which drains at every
259/// `iteration_start` / `post_tool_dispatch` / `iteration_end` checkpoint.
260#[derive(Clone, Copy, Debug, PartialEq, Eq)]
261pub enum QueuedUserMessageMode {
262    InterruptImmediate,
263    FinishStep,
264    AuditOnly,
265}
266
267#[derive(Clone, Copy, Debug, PartialEq, Eq)]
268pub enum DeliveryCheckpoint {
269    InterruptImmediate,
270    AfterCurrentOperation,
271    EndOfInteraction,
272}
273
274impl QueuedUserMessageMode {
275    fn from_str(value: &str) -> Self {
276        match value {
277            "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
278            "finish_step" | "after_current_operation" => Self::FinishStep,
279            // Unknown / missing modes fall through to the safest option:
280            // record for audit, do not preempt the loop. Pre-#2212 hosts
281            // that send `wait_for_completion` are caught by this arm —
282            // the canonical name is `audit_only` going forward.
283            _ => Self::AuditOnly,
284        }
285    }
286
287    fn as_str(self) -> &'static str {
288        match self {
289            Self::InterruptImmediate => "interrupt_immediate",
290            Self::FinishStep => "finish_step",
291            Self::AuditOnly => "audit_only",
292        }
293    }
294}
295
296#[derive(Clone, Debug, PartialEq, Eq)]
297pub struct QueuedUserMessage {
298    pub message_id: String,
299    pub content: String,
300    pub transcript_content: serde_json::Value,
301    pub mode: QueuedUserMessageMode,
302}
303
304#[derive(Clone, Debug, PartialEq, Eq)]
305pub struct QueuedReminder {
306    pub reminder: crate::llm::helpers::SystemReminder,
307    pub mode: QueuedUserMessageMode,
308}
309
310#[derive(Clone, Debug, PartialEq, Eq)]
311pub enum QueuedTranscriptInjection {
312    User(QueuedUserMessage),
313    Reminder(QueuedReminder),
314}
315
316#[derive(Debug, Default)]
317struct QueuedTranscriptInjections {
318    queue: VecDeque<QueuedTranscriptInjection>,
319    revoked_user_message_ids: HashSet<String>,
320    delivered_user_message_ids: HashSet<String>,
321    revoked_reminder_ids: HashSet<String>,
322    delivered_reminder_ids: HashSet<String>,
323}
324
325#[derive(Clone, Debug, Default)]
326pub struct HostBridgeInjectionState {
327    inner: Arc<Mutex<QueuedTranscriptInjections>>,
328}
329
330#[derive(Clone, Copy, Debug, PartialEq, Eq)]
331pub enum PendingUserMessageMutationResult {
332    Mutated,
333    AlreadyRevoked,
334    AlreadyDelivered,
335    UnknownMessageId,
336}
337
338impl QueuedTranscriptInjection {
339    fn mode(&self) -> QueuedUserMessageMode {
340        match self {
341            Self::User(message) => message.mode,
342            Self::Reminder(reminder) => reminder.mode,
343        }
344    }
345
346    fn pending_json(&self, position: usize) -> serde_json::Value {
347        match self {
348            Self::User(message) => serde_json::json!({
349                "kind": "user",
350                "id": message.message_id,
351                "messageId": message.message_id,
352                "mode": message.mode.as_str(),
353                "position": position,
354                "content": message.transcript_content,
355            }),
356            Self::Reminder(reminder) => serde_json::json!({
357                "kind": "reminder",
358                "id": reminder.reminder.id,
359                "reminderId": reminder.reminder.id,
360                "mode": reminder.mode.as_str(),
361                "position": position,
362                "body": reminder.reminder.body,
363                "tags": reminder.reminder.tags,
364                "dedupeKey": reminder.reminder.dedupe_key,
365                "ttlTurns": reminder.reminder.ttl_turns,
366                "preserveOnCompact": reminder.reminder.preserve_on_compact,
367                "propagate": reminder.reminder.propagate.as_str(),
368                "roleHint": reminder.reminder.role_hint.as_str(),
369                "source": reminder.reminder.source.as_str(),
370                "firedAtTurn": reminder.reminder.fired_at_turn,
371                "originatingAgentId": reminder.reminder.originating_agent_id,
372            }),
373        }
374    }
375}
376
377#[derive(Clone, Copy, Debug, PartialEq, Eq)]
378pub enum PendingReminderMutationResult {
379    Mutated,
380    AlreadyRevoked,
381    AlreadyDelivered,
382    UnknownReminderId,
383}
384
385fn new_inject_message_id() -> String {
386    format!("msg_inj_{}", uuid::Uuid::now_v7().simple())
387}
388
389impl HostBridgeInjectionState {
390    pub fn new() -> Self {
391        Self::default()
392    }
393
394    pub async fn push_pending_user_message(
395        &self,
396        content: String,
397        transcript_content: serde_json::Value,
398        mode: &str,
399    ) -> String {
400        let message_id = new_inject_message_id();
401        self.inner
402            .lock()
403            .await
404            .queue
405            .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
406                message_id: message_id.clone(),
407                content,
408                transcript_content,
409                mode: QueuedUserMessageMode::from_str(mode),
410            }));
411        message_id
412    }
413
414    pub async fn revoke_pending_user_message(
415        &self,
416        message_id: &str,
417    ) -> PendingUserMessageMutationResult {
418        let mut state = self.inner.lock().await;
419        let mut retained = VecDeque::new();
420        let mut revoked = false;
421        while let Some(injection) = state.queue.pop_front() {
422            match &injection {
423                QueuedTranscriptInjection::User(message) if message.message_id == message_id => {
424                    revoked = true;
425                }
426                _ => retained.push_back(injection),
427            }
428        }
429        state.queue = retained;
430        if revoked {
431            state
432                .revoked_user_message_ids
433                .insert(message_id.to_string());
434            return PendingUserMessageMutationResult::Mutated;
435        }
436        if state.revoked_user_message_ids.contains(message_id) {
437            PendingUserMessageMutationResult::AlreadyRevoked
438        } else if state.delivered_user_message_ids.contains(message_id) {
439            PendingUserMessageMutationResult::AlreadyDelivered
440        } else {
441            PendingUserMessageMutationResult::UnknownMessageId
442        }
443    }
444
445    pub async fn revoke_pending_reminder(
446        &self,
447        reminder_id: &str,
448    ) -> PendingReminderMutationResult {
449        let mut state = self.inner.lock().await;
450        let mut retained = VecDeque::new();
451        let mut revoked = false;
452        while let Some(injection) = state.queue.pop_front() {
453            match &injection {
454                QueuedTranscriptInjection::Reminder(reminder)
455                    if reminder.reminder.id == reminder_id =>
456                {
457                    revoked = true;
458                }
459                _ => retained.push_back(injection),
460            }
461        }
462        state.queue = retained;
463        if revoked {
464            state.revoked_reminder_ids.insert(reminder_id.to_string());
465            return PendingReminderMutationResult::Mutated;
466        }
467        if state.revoked_reminder_ids.contains(reminder_id) {
468            PendingReminderMutationResult::AlreadyRevoked
469        } else if state.delivered_reminder_ids.contains(reminder_id) {
470            PendingReminderMutationResult::AlreadyDelivered
471        } else {
472            PendingReminderMutationResult::UnknownReminderId
473        }
474    }
475
476    pub async fn replace_pending_user_message(
477        &self,
478        message_id: &str,
479        content: String,
480        transcript_content: serde_json::Value,
481    ) -> PendingUserMessageMutationResult {
482        let mut state = self.inner.lock().await;
483        for injection in &mut state.queue {
484            if let QueuedTranscriptInjection::User(message) = injection {
485                if message.message_id == message_id {
486                    message.content = content;
487                    message.transcript_content = transcript_content;
488                    return PendingUserMessageMutationResult::Mutated;
489                }
490            }
491        }
492        if state.revoked_user_message_ids.contains(message_id) {
493            PendingUserMessageMutationResult::AlreadyRevoked
494        } else if state.delivered_user_message_ids.contains(message_id) {
495            PendingUserMessageMutationResult::AlreadyDelivered
496        } else {
497            PendingUserMessageMutationResult::UnknownMessageId
498        }
499    }
500
501    async fn push_session_reminder(&self, reminder: QueuedReminder) {
502        self.inner
503            .lock()
504            .await
505            .queue
506            .push_back(QueuedTranscriptInjection::Reminder(reminder));
507    }
508
509    pub async fn pending_injections_json(&self) -> serde_json::Value {
510        let state = self.inner.lock().await;
511        let injections = state
512            .queue
513            .iter()
514            .enumerate()
515            .map(|(position, injection)| injection.pending_json(position))
516            .collect::<Vec<_>>();
517        serde_json::json!({
518            "pendingCount": injections.len(),
519            "injections": injections,
520        })
521    }
522}
523
524fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
525    format!(
526        "{}: {}",
527        Code::ReminderUnknownOption.as_str(),
528        message.as_ref()
529    )
530}
531
532fn session_remind_shape_error(message: impl AsRef<str>) -> String {
533    format!(
534        "{}: {}",
535        Code::ReminderInvalidShape.as_str(),
536        message.as_ref()
537    )
538}
539
540fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
541    format!(
542        "{}: {}",
543        Code::ReminderUnknownPropagate.as_str(),
544        message.as_ref()
545    )
546}
547
548fn string_field(
549    map: &serde_json::Map<String, serde_json::Value>,
550    key: &str,
551    required: bool,
552) -> Result<Option<String>, String> {
553    match map.get(key) {
554        None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
555            format!("`{key}` must be a non-empty string"),
556        )),
557        None | Some(serde_json::Value::Null) => Ok(None),
558        Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
559            session_remind_shape_error(format!("`{key}` must be a non-empty string")),
560        ),
561        Some(serde_json::Value::String(value)) => {
562            let trimmed = value.trim();
563            if trimmed.is_empty() {
564                Ok(None)
565            } else {
566                Ok(Some(trimmed.to_string()))
567            }
568        }
569        Some(other) => Err(session_remind_shape_error(format!(
570            "`{key}` must be a string, got {other}"
571        ))),
572    }
573}
574
575fn bool_field(
576    map: &serde_json::Map<String, serde_json::Value>,
577    key: &str,
578) -> Result<Option<bool>, String> {
579    match map.get(key) {
580        None | Some(serde_json::Value::Null) => Ok(None),
581        Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
582        Some(other) => Err(session_remind_shape_error(format!(
583            "`{key}` must be a bool, got {other}"
584        ))),
585    }
586}
587
588fn int_field(
589    map: &serde_json::Map<String, serde_json::Value>,
590    key: &str,
591) -> Result<Option<i64>, String> {
592    match map.get(key) {
593        None | Some(serde_json::Value::Null) => Ok(None),
594        Some(serde_json::Value::Number(value)) => {
595            let Some(value) = value.as_i64() else {
596                return Err(session_remind_shape_error(format!(
597                    "`{key}` must be an integer"
598                )));
599            };
600            Ok(Some(value))
601        }
602        Some(other) => Err(session_remind_shape_error(format!(
603            "`{key}` must be an int, got {other}"
604        ))),
605    }
606}
607
608fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
609    let Some(value) = map.get("tags") else {
610        return Ok(Vec::new());
611    };
612    if value.is_null() {
613        return Ok(Vec::new());
614    }
615    let Some(values) = value.as_array() else {
616        return Err(session_remind_shape_error("`tags` must be a list"));
617    };
618    let mut tags = Vec::new();
619    for value in values {
620        let Some(tag) = value.as_str() else {
621            return Err(session_remind_shape_error(format!(
622                "`tags` entries must be strings, got {value}"
623            )));
624        };
625        let tag = tag.trim();
626        if tag.is_empty() {
627            return Err(session_remind_shape_error(
628                "`tags` entries must be non-empty strings",
629            ));
630        }
631        if !tags.iter().any(|existing| existing == tag) {
632            tags.push(tag.to_string());
633        }
634    }
635    Ok(tags)
636}
637
638fn session_remind_payload_from_value(
639    value: &serde_json::Value,
640) -> Result<crate::llm::helpers::SystemReminder, String> {
641    let Some(map) = value.as_object() else {
642        return Err(session_remind_shape_error(
643            "session/remind payload must be a reminder object",
644        ));
645    };
646    const ALLOWED: &[&str] = &[
647        "_meta",
648        "body",
649        "dedupe_key",
650        "fired_at_turn",
651        "id",
652        "preserve_on_compact",
653        "propagate",
654        "role_hint",
655        "source",
656        "tags",
657        "ttl_turns",
658    ];
659    let unknown = map
660        .keys()
661        .filter(|key| !ALLOWED.contains(&key.as_str()))
662        .map(String::as_str)
663        .collect::<Vec<_>>();
664    if !unknown.is_empty() {
665        if unknown.contains(&"content") {
666            return Err(session_remind_shape_error(
667                "session/remind expects reminder `body`, not user-message `content`",
668            ));
669        }
670        return Err(reminder_unknown_option_error(format!(
671            "unknown reminder option(s): {}",
672            unknown.join(", ")
673        )));
674    }
675    if let Some(meta) = map.get("_meta") {
676        if !meta.is_null() && !meta.is_object() {
677            return Err(session_remind_shape_error("`_meta` must be an object"));
678        }
679    }
680    let ttl_turns = int_field(map, "ttl_turns")?;
681    if let Some(value) = ttl_turns {
682        if value <= 0 {
683            return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
684        }
685    }
686    let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
687    if fired_at_turn < 0 {
688        return Err(session_remind_shape_error(
689            "`fired_at_turn` must be >= 0 when provided",
690        ));
691    }
692    match string_field(map, "source", false)?.as_deref() {
693        None | Some("bridge") => {}
694        Some(_) => {
695            return Err(session_remind_shape_error(
696                "`source` for session/remind must be bridge when provided",
697            ))
698        }
699    }
700    let propagate = match string_field(map, "propagate", false)?.as_deref() {
701        None => crate::llm::helpers::ReminderPropagate::Session,
702        Some("all") => crate::llm::helpers::ReminderPropagate::All,
703        Some("session") => crate::llm::helpers::ReminderPropagate::Session,
704        Some("none") => crate::llm::helpers::ReminderPropagate::None,
705        Some(_) => {
706            return Err(reminder_unknown_propagate_error(
707                "`propagate` must be one of all, session, or none",
708            ))
709        }
710    };
711    let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
712        None => crate::llm::helpers::ReminderRoleHint::System,
713        Some("system") => crate::llm::helpers::ReminderRoleHint::System,
714        Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
715        Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
716        Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
717        Some(_) => {
718            return Err(session_remind_shape_error(
719                "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
720            ))
721        }
722    };
723    Ok(crate::llm::helpers::SystemReminder {
724        id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
725        tags: tags_field(map)?,
726        dedupe_key: string_field(map, "dedupe_key", false)?,
727        ttl_turns,
728        preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
729        propagate,
730        role_hint,
731        source: crate::llm::helpers::ReminderSource::Bridge,
732        body: string_field(map, "body", true)?.unwrap_or_default(),
733        fired_at_turn,
734        originating_agent_id: None,
735    })
736}
737
738/// Parse the params of a `session/cancel_tool_call` notification and fire
739/// the per-tool-call cancellation. Mirrors the shape used by the public
740/// `cancel_in_flight_tool_call` builtin so hosts have one wire format
741/// regardless of which surface they came through.
742///
743/// Stdio bridges send this as a notification (no id, no response); the
744/// builtin handles request/response semantics in Harn. We deliberately
745/// drop malformed payloads silently because notifications can't reply
746/// with an error — logging would also be too noisy for partial drops.
747fn handle_cancel_tool_call_notification(params: &serde_json::Value) {
748    let session_id = params
749        .get("sessionId")
750        .or_else(|| params.get("session_id"))
751        .and_then(|value| value.as_str())
752        .unwrap_or_default();
753    let call_id = params
754        .get("toolCallId")
755        .or_else(|| params.get("tool_call_id"))
756        .or_else(|| params.get("callId"))
757        .or_else(|| params.get("call_id"))
758        .and_then(|value| value.as_str())
759        .unwrap_or_default();
760    if call_id.is_empty() {
761        return;
762    }
763    let reason = params
764        .get("reason")
765        .and_then(|value| value.as_str())
766        .unwrap_or("host cancelled in-flight tool call")
767        .to_string();
768    let inject_reminder = params
769        .get("injectReminder")
770        .or_else(|| params.get("inject_reminder"))
771        .and_then(|value| value.as_bool())
772        .unwrap_or(true);
773    crate::tool_call_cancellations::cancel(session_id, call_id, reason, inject_reminder);
774}
775
776fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
777    let mode = QueuedUserMessageMode::from_str(
778        params
779            .get("mode")
780            .and_then(|value| value.as_str())
781            .unwrap_or("audit_only"),
782    );
783    let reminder_value = if let Some(reminder) = params.get("reminder") {
784        reminder.clone()
785    } else {
786        let Some(params) = params.as_object() else {
787            return Err(session_remind_shape_error(
788                "session/remind params must be an object",
789            ));
790        };
791        let mut reminder = params.clone();
792        reminder.remove("mode");
793        reminder.remove("sessionId");
794        reminder.remove("session_id");
795        serde_json::Value::Object(reminder)
796    };
797    Ok(QueuedReminder {
798        reminder: session_remind_payload_from_value(&reminder_value)?,
799        mode,
800    })
801}
802
803// Default doesn't apply — new() spawns async tasks requiring a tokio LocalSet.
804#[allow(clippy::new_without_default)]
805impl HostBridge {
806    /// Create a new bridge and spawn the stdin reader task.
807    ///
808    /// Must be called within a tokio LocalSet (uses spawn_local for the
809    /// stdin reader since it's single-threaded).
810    pub fn new() -> Self {
811        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
812            Arc::new(Mutex::new(HashMap::new()));
813        let cancelled = Arc::new(AtomicBool::new(false));
814        let cancel_notify = Arc::new(Notify::new());
815        let queued_transcript_injections = HostBridgeInjectionState::default();
816        let resume_requested = Arc::new(AtomicBool::new(false));
817        let skills_reload_requested = Arc::new(AtomicBool::new(false));
818        let daemon_idle = Arc::new(AtomicBool::new(false));
819
820        // Stdin reader: reads JSON-RPC lines and dispatches responses
821        let pending_clone = pending.clone();
822        let cancelled_clone = cancelled.clone();
823        let cancel_notify_clone = cancel_notify.clone();
824        let queued_clone = queued_transcript_injections.clone();
825        let resume_clone = resume_requested.clone();
826        let skills_reload_clone = skills_reload_requested.clone();
827        tokio::task::spawn_local(async move {
828            let stdin = tokio::io::stdin();
829            let reader = tokio::io::BufReader::new(stdin);
830            let mut lines = reader.lines();
831
832            while let Ok(Some(line)) = lines.next_line().await {
833                let line = line.trim().to_string();
834                if line.is_empty() {
835                    continue;
836                }
837
838                let msg: serde_json::Value = match serde_json::from_str(&line) {
839                    Ok(v) => v,
840                    Err(_) => continue,
841                };
842
843                // Notifications have no id; responses have one.
844                if msg.get("id").is_none() {
845                    if let Some(method) = msg["method"].as_str() {
846                        if method == "cancel" {
847                            cancelled_clone.store(true, Ordering::SeqCst);
848                            cancel_notify_clone.notify_waiters();
849                        } else if method == "agent/resume" {
850                            resume_clone.store(true, Ordering::SeqCst);
851                        } else if method == "skills/update" {
852                            skills_reload_clone.store(true, Ordering::SeqCst);
853                        } else if method == "session/remind" {
854                            let params = &msg["params"];
855                            if let Ok(reminder) = queued_session_remind_from_params(params) {
856                                queued_clone.push_session_reminder(reminder).await;
857                            }
858                        } else if method == "session/cancel_tool_call" {
859                            handle_cancel_tool_call_notification(&msg["params"]);
860                        }
861                    }
862                    continue;
863                }
864
865                if let Some(id) = msg["id"].as_u64() {
866                    let mut pending = pending_clone.lock().await;
867                    if let Some(sender) = pending.remove(&id) {
868                        let _ = sender.send(msg);
869                    }
870                }
871            }
872
873            // stdin closed: drop pending senders to cancel waiters.
874            let mut pending = pending_clone.lock().await;
875            pending.clear();
876        });
877
878        Self {
879            next_id: AtomicU64::new(1),
880            pending,
881            cancelled,
882            cancel_notify,
883            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
884            session_id: std::sync::Mutex::new(String::new()),
885            script_name: std::sync::Mutex::new(String::new()),
886            queued_transcript_injections,
887            resume_requested,
888            skills_reload_requested,
889            daemon_idle,
890            prompt_stop_reason: std::sync::Mutex::new(None),
891            visible_call_states: std::sync::Mutex::new(HashMap::new()),
892            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
893            in_process: None,
894        }
895    }
896
897    /// Create a bridge from pre-existing shared state.
898    ///
899    /// Unlike `new()`, does **not** spawn a stdin reader — the caller is
900    /// responsible for dispatching responses into `pending`.  This is used
901    /// by ACP mode which already has its own stdin reader.
902    pub fn from_parts(
903        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
904        cancelled: Arc<AtomicBool>,
905        stdout_lock: Arc<std::sync::Mutex<()>>,
906        start_id: u64,
907    ) -> Self {
908        Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
909    }
910
911    pub fn from_parts_with_writer(
912        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
913        cancelled: Arc<AtomicBool>,
914        writer: HostBridgeWriter,
915        start_id: u64,
916    ) -> Self {
917        Self::from_parts_with_writer_and_cancel_notify(
918            pending,
919            cancelled,
920            Arc::new(Notify::new()),
921            writer,
922            start_id,
923        )
924    }
925
926    pub fn from_parts_with_writer_and_cancel_notify(
927        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
928        cancelled: Arc<AtomicBool>,
929        cancel_notify: Arc<Notify>,
930        writer: HostBridgeWriter,
931        start_id: u64,
932    ) -> Self {
933        Self::from_parts_with_writer_cancel_notify_and_injection_state(
934            pending,
935            cancelled,
936            cancel_notify,
937            writer,
938            start_id,
939            None,
940        )
941    }
942
943    pub fn from_parts_with_writer_cancel_notify_and_injection_state(
944        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
945        cancelled: Arc<AtomicBool>,
946        cancel_notify: Arc<Notify>,
947        writer: HostBridgeWriter,
948        start_id: u64,
949        injection_state: Option<HostBridgeInjectionState>,
950    ) -> Self {
951        Self {
952            next_id: AtomicU64::new(start_id),
953            pending,
954            cancelled,
955            cancel_notify,
956            writer,
957            session_id: std::sync::Mutex::new(String::new()),
958            script_name: std::sync::Mutex::new(String::new()),
959            queued_transcript_injections: injection_state.unwrap_or_default(),
960            resume_requested: Arc::new(AtomicBool::new(false)),
961            skills_reload_requested: Arc::new(AtomicBool::new(false)),
962            daemon_idle: Arc::new(AtomicBool::new(false)),
963            prompt_stop_reason: std::sync::Mutex::new(None),
964            visible_call_states: std::sync::Mutex::new(HashMap::new()),
965            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
966            in_process: None,
967        }
968    }
969
970    /// Create an in-process host bridge backed by exported functions from a
971    /// Harn module. Used by `harn playground` to avoid JSON-RPC boilerplate.
972    pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
973        let exported_functions = vm.load_module_exports(module_path).await?;
974        Ok(Self {
975            next_id: AtomicU64::new(1),
976            pending: Arc::new(Mutex::new(HashMap::new())),
977            cancelled: Arc::new(AtomicBool::new(false)),
978            cancel_notify: Arc::new(Notify::new()),
979            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
980            session_id: std::sync::Mutex::new(String::new()),
981            script_name: std::sync::Mutex::new(String::new()),
982            queued_transcript_injections: HostBridgeInjectionState::default(),
983            resume_requested: Arc::new(AtomicBool::new(false)),
984            skills_reload_requested: Arc::new(AtomicBool::new(false)),
985            daemon_idle: Arc::new(AtomicBool::new(false)),
986            prompt_stop_reason: std::sync::Mutex::new(None),
987            visible_call_states: std::sync::Mutex::new(HashMap::new()),
988            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
989            in_process: Some(InProcessHost {
990                module_path: module_path.to_path_buf(),
991                exported_functions,
992                vm,
993            }),
994        })
995    }
996
997    /// Set the ACP session ID for session-scoped notifications.
998    pub fn set_session_id(&self, id: &str) {
999        *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
1000    }
1001
1002    /// Set the currently executing script name (without .harn suffix).
1003    pub fn set_script_name(&self, name: &str) {
1004        *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
1005    }
1006
1007    /// Get the current script name.
1008    fn get_script_name(&self) -> String {
1009        self.script_name
1010            .lock()
1011            .unwrap_or_else(|e| e.into_inner())
1012            .clone()
1013    }
1014
1015    /// Get the session ID.
1016    pub fn get_session_id(&self) -> String {
1017        self.session_id
1018            .lock()
1019            .unwrap_or_else(|e| e.into_inner())
1020            .clone()
1021    }
1022
1023    /// Write a complete JSON-RPC line to stdout, serialized through a mutex.
1024    fn write_line(&self, line: &str) -> Result<(), VmError> {
1025        (self.writer)(line).map_err(VmError::Runtime)
1026    }
1027
1028    /// Send a JSON-RPC request to the host and wait for the response.
1029    /// Times out after 5 minutes to prevent deadlocks.
1030    pub async fn call(
1031        &self,
1032        method: &str,
1033        params: serde_json::Value,
1034    ) -> Result<serde_json::Value, VmError> {
1035        if let Some(in_process) = &self.in_process {
1036            return in_process.dispatch(method, params).await;
1037        }
1038
1039        if self.is_cancelled() {
1040            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1041        }
1042
1043        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
1044        let cancel_wait = self.cancel_notify.notified();
1045        tokio::pin!(cancel_wait);
1046
1047        let request = crate::jsonrpc::request(id, method, params);
1048
1049        let (tx, rx) = oneshot::channel();
1050        {
1051            let mut pending = self.pending.lock().await;
1052            pending.insert(id, tx);
1053        }
1054
1055        let line = serde_json::to_string(&request)
1056            .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
1057        if let Err(e) = self.write_line(&line) {
1058            let mut pending = self.pending.lock().await;
1059            pending.remove(&id);
1060            return Err(e);
1061        }
1062
1063        if self.is_cancelled() {
1064            let mut pending = self.pending.lock().await;
1065            pending.remove(&id);
1066            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1067        }
1068
1069        let response = tokio::select! {
1070            result = rx => match result {
1071                Ok(msg) => msg,
1072                Err(_) => {
1073                    // Sender dropped: host closed or stdin reader exited.
1074                    return Err(VmError::Runtime(
1075                        "Bridge: host closed connection before responding".into(),
1076                    ));
1077                }
1078            },
1079            _ = &mut cancel_wait => {
1080                let mut pending = self.pending.lock().await;
1081                pending.remove(&id);
1082                return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1083            }
1084            _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
1085                let mut pending = self.pending.lock().await;
1086                pending.remove(&id);
1087                return Err(VmError::Runtime(format!(
1088                    "Bridge: host did not respond to '{method}' within {}s",
1089                    DEFAULT_TIMEOUT.as_secs()
1090                )));
1091            }
1092        };
1093
1094        if let Some(error) = response.get("error") {
1095            let message = error["message"].as_str().unwrap_or("Unknown host error");
1096            let code = error["code"].as_i64().unwrap_or(-1);
1097            // JSON-RPC -32001 signals the host rejected the tool (not permitted / not in allowlist).
1098            if code == -32001 {
1099                return Err(VmError::CategorizedError {
1100                    message: message.to_string(),
1101                    category: ErrorCategory::ToolRejected,
1102                });
1103            }
1104            return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
1105        }
1106
1107        Ok(response["result"].clone())
1108    }
1109
1110    /// Send a JSON-RPC notification to the host (no response expected).
1111    /// Serialized through the stdout mutex to prevent interleaving.
1112    pub fn notify(&self, method: &str, params: serde_json::Value) {
1113        let notification = crate::jsonrpc::notification(method, params);
1114        if self.in_process.is_some() {
1115            return;
1116        }
1117        if let Ok(line) = serde_json::to_string(&notification) {
1118            let _ = self.write_line(&line);
1119        }
1120    }
1121
1122    /// Check if the host has sent a cancel notification.
1123    pub fn is_cancelled(&self) -> bool {
1124        self.cancelled.load(Ordering::SeqCst)
1125    }
1126
1127    pub fn take_resume_signal(&self) -> bool {
1128        self.resume_requested.swap(false, Ordering::SeqCst)
1129    }
1130
1131    pub fn signal_resume(&self) {
1132        self.resume_requested.store(true, Ordering::SeqCst);
1133    }
1134
1135    pub fn set_daemon_idle(&self, idle: bool) {
1136        self.daemon_idle.store(idle, Ordering::SeqCst);
1137    }
1138
1139    pub fn is_daemon_idle(&self) -> bool {
1140        self.daemon_idle.load(Ordering::SeqCst)
1141    }
1142
1143    /// Record the canonical ACP `stopReason` for the current prompt. The
1144    /// last writer wins, which matches the semantic that an outer
1145    /// `agent_loop` (the one whose result the user observes) always
1146    /// finalizes after any inner loops it spawned.
1147    pub fn set_prompt_stop_reason(&self, reason: &str) {
1148        *self
1149            .prompt_stop_reason
1150            .lock()
1151            .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
1152    }
1153
1154    /// Consume any prompt stop reason recorded during this prompt. The
1155    /// ACP adapter calls this once after the pipeline returns; pipelines
1156    /// that didn't run an `agent_loop` see `None` and the adapter falls
1157    /// back to `end_turn`.
1158    pub fn take_prompt_stop_reason(&self) -> Option<String> {
1159        self.prompt_stop_reason
1160            .lock()
1161            .unwrap_or_else(|e| e.into_inner())
1162            .take()
1163    }
1164
1165    /// Consume any pending `skills/update` signal the host has sent.
1166    /// Returns `true` exactly once per notification, letting callers
1167    /// trigger a layered-discovery rebuild without polling false
1168    /// positives. See issue #73 for the hot-reload contract.
1169    pub fn take_skills_reload_signal(&self) -> bool {
1170        self.skills_reload_requested.swap(false, Ordering::SeqCst)
1171    }
1172
1173    /// Manually mark the skill catalog as stale. Used by tests and by
1174    /// the CLI when an internal event (e.g. `harn install`) should
1175    /// trigger the same rebuild a `skills/update` notification would.
1176    pub fn signal_skills_reload(&self) {
1177        self.skills_reload_requested.store(true, Ordering::SeqCst);
1178    }
1179
1180    /// Call the host's `skills/list` RPC and return the raw JSON array
1181    /// it responded with. Shape:
1182    /// `[{ "id": "...", "name": "...", "description": "...", "source": "..." }, ...]`.
1183    /// The CLI adapter converts each entry into a
1184    /// [`crate::skills::SkillManifestRef`].
1185    pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
1186        let result = self.call("skills/list", serde_json::json!({})).await?;
1187        match result {
1188            serde_json::Value::Array(items) => Ok(items),
1189            serde_json::Value::Object(map) => match map.get("skills") {
1190                Some(serde_json::Value::Array(items)) => Ok(items.clone()),
1191                _ => Err(VmError::Runtime(
1192                    "skills/list: host response must be an array or { skills: [...] }".into(),
1193                )),
1194            },
1195            _ => Err(VmError::Runtime(
1196                "skills/list: unexpected response shape".into(),
1197            )),
1198        }
1199    }
1200
1201    /// Call the host's `host/tools/list` RPC and return normalized tool
1202    /// descriptors. Shape:
1203    /// `[{ "name": "...", "description": "...", "schema": {...}, "deprecated": false }, ...]`.
1204    /// The bridge also accepts `{ "tools": [...] }` and
1205    /// `{ "result": { "tools": [...] } }` wrappers for lenient hosts.
1206    pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
1207        let result = self.call("host/tools/list", serde_json::json!({})).await?;
1208        parse_host_tools_list_response(result)
1209    }
1210
1211    /// Call the host's `skills/fetch` RPC for one skill id. Returns the
1212    /// raw JSON body so the CLI can inspect both the frontmatter fields
1213    /// and the skill markdown body in whatever shape the host sends.
1214    pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
1215        self.call("skills/fetch", serde_json::json!({ "id": id }))
1216            .await
1217    }
1218
1219    pub fn injection_state(&self) -> HostBridgeInjectionState {
1220        self.queued_transcript_injections.clone()
1221    }
1222
1223    pub async fn push_pending_user_message(
1224        &self,
1225        content: String,
1226        transcript_content: serde_json::Value,
1227        mode: &str,
1228    ) -> String {
1229        self.queued_transcript_injections
1230            .push_pending_user_message(content, transcript_content, mode)
1231            .await
1232    }
1233
1234    pub async fn push_queued_user_message(&self, content: String, mode: &str) -> String {
1235        self.push_pending_user_message(content.clone(), serde_json::Value::String(content), mode)
1236            .await
1237    }
1238
1239    pub async fn revoke_pending_user_message(
1240        &self,
1241        message_id: &str,
1242    ) -> PendingUserMessageMutationResult {
1243        self.queued_transcript_injections
1244            .revoke_pending_user_message(message_id)
1245            .await
1246    }
1247
1248    pub async fn revoke_pending_reminder(
1249        &self,
1250        reminder_id: &str,
1251    ) -> PendingReminderMutationResult {
1252        self.queued_transcript_injections
1253            .revoke_pending_reminder(reminder_id)
1254            .await
1255    }
1256
1257    pub async fn pending_injections_json(&self) -> serde_json::Value {
1258        self.queued_transcript_injections
1259            .pending_injections_json()
1260            .await
1261    }
1262
1263    pub async fn replace_pending_user_message(
1264        &self,
1265        message_id: &str,
1266        content: String,
1267        transcript_content: serde_json::Value,
1268    ) -> PendingUserMessageMutationResult {
1269        self.queued_transcript_injections
1270            .replace_pending_user_message(message_id, content, transcript_content)
1271            .await
1272    }
1273
1274    pub async fn push_queued_session_remind_from_params(
1275        &self,
1276        params: &serde_json::Value,
1277    ) -> Result<String, String> {
1278        let reminder = queued_session_remind_from_params(params)?;
1279        let reminder_id = reminder.reminder.id.clone();
1280        self.queued_transcript_injections
1281            .push_session_reminder(reminder)
1282            .await;
1283        Ok(reminder_id)
1284    }
1285
1286    pub async fn take_queued_user_messages(
1287        &self,
1288        include_interrupt_immediate: bool,
1289        include_finish_step: bool,
1290        include_audit_only: bool,
1291    ) -> Vec<QueuedUserMessage> {
1292        let mut state = self.queued_transcript_injections.inner.lock().await;
1293        let mut selected = Vec::new();
1294        let mut retained = VecDeque::new();
1295        while let Some(injection) = state.queue.pop_front() {
1296            let should_take = match injection.mode() {
1297                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1298                QueuedUserMessageMode::FinishStep => include_finish_step,
1299                QueuedUserMessageMode::AuditOnly => include_audit_only,
1300            };
1301            match (should_take, injection) {
1302                (true, QueuedTranscriptInjection::User(message)) => {
1303                    state
1304                        .delivered_user_message_ids
1305                        .insert(message.message_id.clone());
1306                    selected.push(message);
1307                }
1308                (_, injection) => retained.push_back(injection),
1309            }
1310        }
1311        state.queue = retained;
1312        selected
1313    }
1314
1315    pub async fn take_queued_transcript_injections(
1316        &self,
1317        include_interrupt_immediate: bool,
1318        include_finish_step: bool,
1319        include_audit_only: bool,
1320    ) -> Vec<QueuedTranscriptInjection> {
1321        let mut state = self.queued_transcript_injections.inner.lock().await;
1322        let mut selected = Vec::new();
1323        let mut retained = VecDeque::new();
1324        while let Some(injection) = state.queue.pop_front() {
1325            let should_take = match injection.mode() {
1326                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1327                QueuedUserMessageMode::FinishStep => include_finish_step,
1328                QueuedUserMessageMode::AuditOnly => include_audit_only,
1329            };
1330            if should_take {
1331                match &injection {
1332                    QueuedTranscriptInjection::User(message) => {
1333                        state
1334                            .delivered_user_message_ids
1335                            .insert(message.message_id.clone());
1336                    }
1337                    QueuedTranscriptInjection::Reminder(reminder) => {
1338                        state
1339                            .delivered_reminder_ids
1340                            .insert(reminder.reminder.id.clone());
1341                    }
1342                }
1343                selected.push(injection);
1344            } else {
1345                retained.push_back(injection);
1346            }
1347        }
1348        state.queue = retained;
1349        selected
1350    }
1351
1352    pub async fn take_queued_user_messages_for(
1353        &self,
1354        checkpoint: DeliveryCheckpoint,
1355    ) -> Vec<QueuedUserMessage> {
1356        match checkpoint {
1357            DeliveryCheckpoint::InterruptImmediate => {
1358                self.take_queued_user_messages(true, false, false).await
1359            }
1360            DeliveryCheckpoint::AfterCurrentOperation => {
1361                self.take_queued_user_messages(false, true, false).await
1362            }
1363            DeliveryCheckpoint::EndOfInteraction => {
1364                self.take_queued_user_messages(false, false, true).await
1365            }
1366        }
1367    }
1368
1369    pub async fn take_queued_transcript_injections_for(
1370        &self,
1371        checkpoint: DeliveryCheckpoint,
1372    ) -> Vec<QueuedTranscriptInjection> {
1373        match checkpoint {
1374            DeliveryCheckpoint::InterruptImmediate => {
1375                self.take_queued_transcript_injections(true, false, false)
1376                    .await
1377            }
1378            DeliveryCheckpoint::AfterCurrentOperation => {
1379                self.take_queued_transcript_injections(false, true, false)
1380                    .await
1381            }
1382            DeliveryCheckpoint::EndOfInteraction => {
1383                self.take_queued_transcript_injections(false, false, true)
1384                    .await
1385            }
1386        }
1387    }
1388
1389    /// Send an output notification (for log/print in bridge mode).
1390    pub fn send_output(&self, text: &str) {
1391        self.notify("output", serde_json::json!({"text": text}));
1392    }
1393
1394    /// Send a progress notification with optional numeric progress and structured data.
1395    pub fn send_progress(
1396        &self,
1397        phase: &str,
1398        message: &str,
1399        progress: Option<i64>,
1400        total: Option<i64>,
1401        data: Option<serde_json::Value>,
1402    ) {
1403        let mut payload = serde_json::json!({"phase": phase, "message": message});
1404        if let Some(p) = progress {
1405            payload["progress"] = serde_json::json!(p);
1406        }
1407        if let Some(t) = total {
1408            payload["total"] = serde_json::json!(t);
1409        }
1410        if let Some(d) = data {
1411            payload["data"] = d;
1412        }
1413        self.notify("progress", payload);
1414    }
1415
1416    /// Send a structured log notification.
1417    pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1418        let mut payload = serde_json::json!({"level": level, "message": message});
1419        if let Some(f) = fields {
1420            payload["fields"] = f;
1421        }
1422        self.notify("log", payload);
1423    }
1424
1425    /// Send a `session/update` with `call_start` — signals the beginning of
1426    /// an LLM call, tool call, or builtin call for observability.
1427    pub fn send_call_start(
1428        &self,
1429        call_id: &str,
1430        call_type: &str,
1431        name: &str,
1432        metadata: serde_json::Value,
1433    ) {
1434        let session_id = self.get_session_id();
1435        let script = self.get_script_name();
1436        let stream_publicly = metadata
1437            .get("stream_publicly")
1438            .and_then(|value| value.as_bool())
1439            .unwrap_or(true);
1440        self.visible_call_streams
1441            .lock()
1442            .unwrap_or_else(|e| e.into_inner())
1443            .insert(call_id.to_string(), stream_publicly);
1444        self.notify(
1445            "session/update",
1446            serde_json::json!({
1447                "sessionId": session_id,
1448                "update": {
1449                    "sessionUpdate": "call_start",
1450                    "content": {
1451                        "toolCallId": call_id,
1452                        "call_type": call_type,
1453                        "name": name,
1454                        "script": script,
1455                        "metadata": metadata,
1456                    },
1457                },
1458            }),
1459        );
1460    }
1461
1462    /// Send a `session/update` with `call_progress` — a streaming token delta
1463    /// from an in-flight LLM call.
1464    pub fn send_call_progress(
1465        &self,
1466        call_id: &str,
1467        delta: &str,
1468        accumulated_tokens: u64,
1469        user_visible: bool,
1470    ) {
1471        let session_id = self.get_session_id();
1472        let (visible_text, visible_delta) = {
1473            let stream_publicly = self
1474                .visible_call_streams
1475                .lock()
1476                .unwrap_or_else(|e| e.into_inner())
1477                .get(call_id)
1478                .copied()
1479                .unwrap_or(true);
1480            let mut states = self
1481                .visible_call_states
1482                .lock()
1483                .unwrap_or_else(|e| e.into_inner());
1484            let state = states.entry(call_id.to_string()).or_default();
1485            state.push(delta, stream_publicly)
1486        };
1487        self.notify(
1488            "session/update",
1489            serde_json::json!({
1490                "sessionId": session_id,
1491                "update": {
1492                    "sessionUpdate": "call_progress",
1493                    "content": {
1494                        "toolCallId": call_id,
1495                        "delta": delta,
1496                        "accumulated_tokens": accumulated_tokens,
1497                        "visible_text": visible_text,
1498                        "visible_delta": visible_delta,
1499                        "user_visible": user_visible,
1500                    },
1501                },
1502            }),
1503        );
1504    }
1505
1506    /// Send a `session/update` with `call_end` — signals completion of a call.
1507    pub fn send_call_end(
1508        &self,
1509        call_id: &str,
1510        call_type: &str,
1511        name: &str,
1512        duration_ms: u64,
1513        status: &str,
1514        metadata: serde_json::Value,
1515    ) {
1516        let session_id = self.get_session_id();
1517        let script = self.get_script_name();
1518        self.visible_call_states
1519            .lock()
1520            .unwrap_or_else(|e| e.into_inner())
1521            .remove(call_id);
1522        self.visible_call_streams
1523            .lock()
1524            .unwrap_or_else(|e| e.into_inner())
1525            .remove(call_id);
1526        self.notify(
1527            "session/update",
1528            serde_json::json!({
1529                "sessionId": session_id,
1530                "update": {
1531                    "sessionUpdate": "call_end",
1532                    "content": {
1533                        "toolCallId": call_id,
1534                        "call_type": call_type,
1535                        "name": name,
1536                        "script": script,
1537                        "duration_ms": duration_ms,
1538                        "status": status,
1539                        "metadata": metadata,
1540                    },
1541                },
1542            }),
1543        );
1544    }
1545
1546    /// Send a worker lifecycle update for delegated/background execution.
1547    pub fn send_worker_update(
1548        &self,
1549        worker_id: &str,
1550        worker_name: &str,
1551        status: &str,
1552        metadata: serde_json::Value,
1553        audit: Option<&MutationSessionRecord>,
1554    ) {
1555        let session_id = self.get_session_id();
1556        let script = self.get_script_name();
1557        let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1558        let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1559        let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1560        let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1561        let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1562        let lifecycle = serde_json::json!({
1563            "event": status,
1564            "worker_id": worker_id,
1565            "worker_name": worker_name,
1566            "started_at": started_at,
1567            "finished_at": finished_at,
1568        });
1569        self.notify(
1570            "session/update",
1571            serde_json::json!({
1572                "sessionId": session_id,
1573                "update": {
1574                    "sessionUpdate": "worker_update",
1575                    "content": {
1576                        "worker_id": worker_id,
1577                        "worker_name": worker_name,
1578                        "status": status,
1579                        "script": script,
1580                        "started_at": started_at,
1581                        "finished_at": finished_at,
1582                        "snapshot_path": snapshot_path,
1583                        "run_id": run_id,
1584                        "run_path": run_path,
1585                        "lifecycle": lifecycle,
1586                        "audit": audit,
1587                        "metadata": metadata,
1588                    },
1589                },
1590            }),
1591        );
1592    }
1593}
1594
1595/// Convert a serde_json::Value to a VmValue.
1596pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1597    crate::stdlib::json_to_vm_value(val)
1598}
1599
1600fn parse_host_tools_list_response(
1601    result: serde_json::Value,
1602) -> Result<Vec<serde_json::Value>, VmError> {
1603    let tools = match result {
1604        serde_json::Value::Array(items) => items,
1605        serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1606            map.get("result")
1607                .and_then(|value| value.get("tools"))
1608                .cloned()
1609        }) {
1610            Some(serde_json::Value::Array(items)) => items,
1611            _ => {
1612                return Err(VmError::Runtime(
1613                    "host/tools/list: host response must be an array or { tools: [...] }".into(),
1614                ));
1615            }
1616        },
1617        _ => {
1618            return Err(VmError::Runtime(
1619                "host/tools/list: unexpected response shape".into(),
1620            ));
1621        }
1622    };
1623
1624    let mut normalized = Vec::with_capacity(tools.len());
1625    for tool in tools {
1626        let serde_json::Value::Object(map) = tool else {
1627            return Err(VmError::Runtime(
1628                "host/tools/list: every tool must be an object".into(),
1629            ));
1630        };
1631        let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1632            return Err(VmError::Runtime(
1633                "host/tools/list: every tool must include a string `name`".into(),
1634            ));
1635        };
1636        let description = map
1637            .get("description")
1638            .and_then(|value| value.as_str())
1639            .or_else(|| {
1640                map.get("short_description")
1641                    .and_then(|value| value.as_str())
1642            })
1643            .unwrap_or_default();
1644        let schema = map
1645            .get("schema")
1646            .cloned()
1647            .or_else(|| map.get("parameters").cloned())
1648            .or_else(|| map.get("input_schema").cloned())
1649            .unwrap_or(serde_json::Value::Null);
1650        let deprecated = map
1651            .get("deprecated")
1652            .and_then(|value| value.as_bool())
1653            .unwrap_or(false);
1654        normalized.push(serde_json::json!({
1655            "name": name,
1656            "description": description,
1657            "schema": schema,
1658            "deprecated": deprecated,
1659        }));
1660    }
1661    Ok(normalized)
1662}
1663
1664#[cfg(test)]
1665mod tests {
1666    use super::*;
1667
1668    fn test_bridge() -> HostBridge {
1669        HostBridge::from_parts(
1670            Arc::new(Mutex::new(HashMap::new())),
1671            Arc::new(AtomicBool::new(false)),
1672            Arc::new(std::sync::Mutex::new(())),
1673            1,
1674        )
1675    }
1676
1677    fn test_bridge_sharing_injection_state(owner: &HostBridge) -> HostBridge {
1678        HostBridge::from_parts_with_writer_cancel_notify_and_injection_state(
1679            Arc::new(Mutex::new(HashMap::new())),
1680            Arc::new(AtomicBool::new(false)),
1681            Arc::new(Notify::new()),
1682            Arc::new(|_| Ok(())),
1683            100,
1684            Some(owner.injection_state()),
1685        )
1686    }
1687
1688    #[test]
1689    fn test_json_rpc_request_format() {
1690        let request = crate::jsonrpc::request(
1691            1,
1692            "llm_call",
1693            serde_json::json!({
1694                "prompt": "Hello",
1695                "system": "Be helpful",
1696            }),
1697        );
1698        let s = serde_json::to_string(&request).unwrap();
1699        assert!(s.contains("\"jsonrpc\":\"2.0\""));
1700        assert!(s.contains("\"id\":1"));
1701        assert!(s.contains("\"method\":\"llm_call\""));
1702    }
1703
1704    #[test]
1705    fn test_json_rpc_notification_format() {
1706        let notification =
1707            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
1708        let s = serde_json::to_string(&notification).unwrap();
1709        assert!(s.contains("\"method\":\"output\""));
1710        assert!(!s.contains("\"id\""));
1711    }
1712
1713    #[test]
1714    fn test_json_rpc_error_response_parsing() {
1715        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
1716        assert!(response.get("error").is_some());
1717        assert_eq!(
1718            response["error"]["message"].as_str().unwrap(),
1719            "Invalid request"
1720        );
1721    }
1722
1723    #[test]
1724    fn test_json_rpc_success_response_parsing() {
1725        let response = crate::jsonrpc::response(
1726            1,
1727            serde_json::json!({
1728                "text": "Hello world",
1729                "input_tokens": 10,
1730                "output_tokens": 5,
1731            }),
1732        );
1733        assert!(response.get("result").is_some());
1734        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
1735    }
1736
1737    #[test]
1738    fn test_cancelled_flag() {
1739        let cancelled = Arc::new(AtomicBool::new(false));
1740        assert!(!cancelled.load(Ordering::SeqCst));
1741        cancelled.store(true, Ordering::SeqCst);
1742        assert!(cancelled.load(Ordering::SeqCst));
1743    }
1744
1745    #[test]
1746    fn pending_host_calls_return_when_cancellation_arrives() {
1747        let runtime = tokio::runtime::Builder::new_current_thread()
1748            .enable_all()
1749            .build()
1750            .unwrap();
1751        runtime.block_on(async {
1752            let pending = Arc::new(Mutex::new(HashMap::new()));
1753            let cancelled = Arc::new(AtomicBool::new(false));
1754            let bridge = HostBridge::from_parts_with_writer(
1755                pending.clone(),
1756                cancelled.clone(),
1757                Arc::new(|_| Ok(())),
1758                1,
1759            );
1760
1761            let call = bridge.call("host/work", serde_json::json!({}));
1762            tokio::pin!(call);
1763
1764            loop {
1765                tokio::select! {
1766                    result = &mut call => panic!("call completed before cancellation: {result:?}"),
1767                    _ = tokio::task::yield_now() => {}
1768                }
1769                if !pending.lock().await.is_empty() {
1770                    break;
1771                }
1772            }
1773
1774            cancelled.store(true, Ordering::SeqCst);
1775            bridge.cancel_notify.notify_waiters();
1776
1777            let result = tokio::time::timeout(Duration::from_secs(1), call)
1778                .await
1779                .expect("pending call should observe cancellation promptly");
1780            assert!(
1781                matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
1782            );
1783            assert!(pending.lock().await.is_empty());
1784        });
1785    }
1786
1787    #[test]
1788    fn queued_messages_are_filtered_by_delivery_mode() {
1789        let runtime = tokio::runtime::Builder::new_current_thread()
1790            .enable_all()
1791            .build()
1792            .unwrap();
1793        runtime.block_on(async {
1794            let bridge = test_bridge();
1795            bridge
1796                .push_queued_user_message("first".to_string(), "finish_step")
1797                .await;
1798            bridge
1799                .push_queued_user_message("second".to_string(), "audit_only")
1800                .await;
1801
1802            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1803            assert_eq!(finish_step.len(), 1);
1804            assert_eq!(finish_step[0].content, "first");
1805
1806            let audit_only = bridge.take_queued_user_messages(false, false, true).await;
1807            assert_eq!(audit_only.len(), 1);
1808            assert_eq!(audit_only[0].content, "second");
1809        });
1810    }
1811
1812    #[test]
1813    fn pending_user_messages_support_revoke_replace_and_delivery_states() {
1814        let runtime = tokio::runtime::Builder::new_current_thread()
1815            .enable_all()
1816            .build()
1817            .unwrap();
1818        runtime.block_on(async {
1819            let bridge = test_bridge();
1820            let first_id = bridge
1821                .push_pending_user_message(
1822                    "first".to_string(),
1823                    serde_json::json!("first"),
1824                    "audit_only",
1825                )
1826                .await;
1827            let second_id = bridge
1828                .push_pending_user_message(
1829                    "second".to_string(),
1830                    serde_json::json!("second"),
1831                    "audit_only",
1832                )
1833                .await;
1834
1835            assert_eq!(
1836                bridge
1837                    .replace_pending_user_message(
1838                        &second_id,
1839                        "second edited".to_string(),
1840                        serde_json::json!("second edited"),
1841                    )
1842                    .await,
1843                PendingUserMessageMutationResult::Mutated
1844            );
1845            assert_eq!(
1846                bridge.revoke_pending_user_message(&first_id).await,
1847                PendingUserMessageMutationResult::Mutated
1848            );
1849            assert_eq!(
1850                bridge.revoke_pending_user_message(&first_id).await,
1851                PendingUserMessageMutationResult::AlreadyRevoked
1852            );
1853
1854            let delivered = bridge
1855                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1856                .await;
1857            assert_eq!(delivered.len(), 1);
1858            assert_eq!(delivered[0].message_id, second_id);
1859            assert_eq!(delivered[0].content, "second edited");
1860
1861            assert_eq!(
1862                bridge.revoke_pending_user_message(&second_id).await,
1863                PendingUserMessageMutationResult::AlreadyDelivered
1864            );
1865            assert_eq!(
1866                bridge
1867                    .replace_pending_user_message(
1868                        &second_id,
1869                        "too late".to_string(),
1870                        serde_json::json!("too late"),
1871                    )
1872                    .await,
1873                PendingUserMessageMutationResult::AlreadyDelivered
1874            );
1875            assert_eq!(
1876                bridge.revoke_pending_user_message("missing").await,
1877                PendingUserMessageMutationResult::UnknownMessageId
1878            );
1879        });
1880    }
1881
1882    #[test]
1883    fn pending_user_message_replace_preserves_fifo_position_and_mode() {
1884        let runtime = tokio::runtime::Builder::new_current_thread()
1885            .enable_all()
1886            .build()
1887            .unwrap();
1888        runtime.block_on(async {
1889            let bridge = test_bridge();
1890            let first_id = bridge
1891                .push_pending_user_message(
1892                    "first".to_string(),
1893                    serde_json::json!("first"),
1894                    "finish_step",
1895                )
1896                .await;
1897            let second_id = bridge
1898                .push_pending_user_message(
1899                    "second".to_string(),
1900                    serde_json::json!("second"),
1901                    "finish_step",
1902                )
1903                .await;
1904            assert_eq!(
1905                bridge
1906                    .replace_pending_user_message(
1907                        &first_id,
1908                        "first edited".to_string(),
1909                        serde_json::json!("first edited"),
1910                    )
1911                    .await,
1912                PendingUserMessageMutationResult::Mutated
1913            );
1914
1915            let delivered = bridge
1916                .take_queued_user_messages_for(DeliveryCheckpoint::AfterCurrentOperation)
1917                .await;
1918            assert_eq!(
1919                delivered
1920                    .iter()
1921                    .map(|message| (&message.message_id, message.content.as_str(), message.mode))
1922                    .collect::<Vec<_>>(),
1923                vec![
1924                    (&first_id, "first edited", QueuedUserMessageMode::FinishStep,),
1925                    (&second_id, "second", QueuedUserMessageMode::FinishStep),
1926                ]
1927            );
1928        });
1929    }
1930
1931    #[test]
1932    fn pending_user_message_state_survives_bridge_replacement() {
1933        let runtime = tokio::runtime::Builder::new_current_thread()
1934            .enable_all()
1935            .build()
1936            .unwrap();
1937        runtime.block_on(async {
1938            let bridge = test_bridge();
1939            let revoked_id = bridge
1940                .push_pending_user_message(
1941                    "revoke me".to_string(),
1942                    serde_json::json!("revoke me"),
1943                    "audit_only",
1944                )
1945                .await;
1946            let delivered_id = bridge
1947                .push_pending_user_message(
1948                    "deliver me".to_string(),
1949                    serde_json::json!("deliver me"),
1950                    "audit_only",
1951                )
1952                .await;
1953            assert_eq!(
1954                bridge.revoke_pending_user_message(&revoked_id).await,
1955                PendingUserMessageMutationResult::Mutated
1956            );
1957            bridge.cancelled.store(true, Ordering::SeqCst);
1958
1959            let replacement_bridge = test_bridge_sharing_injection_state(&bridge);
1960            assert_eq!(
1961                replacement_bridge
1962                    .revoke_pending_user_message(&revoked_id)
1963                    .await,
1964                PendingUserMessageMutationResult::AlreadyRevoked
1965            );
1966            let delivered = replacement_bridge
1967                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1968                .await;
1969            assert_eq!(delivered.len(), 1);
1970            assert_eq!(delivered[0].message_id, delivered_id);
1971            assert_eq!(delivered[0].content, "deliver me");
1972            assert_eq!(
1973                bridge.revoke_pending_user_message(&delivered_id).await,
1974                PendingUserMessageMutationResult::AlreadyDelivered
1975            );
1976        });
1977    }
1978
1979    #[test]
1980    fn queued_transcript_injections_preserve_user_reminder_separation() {
1981        let runtime = tokio::runtime::Builder::new_current_thread()
1982            .enable_all()
1983            .build()
1984            .unwrap();
1985        runtime.block_on(async {
1986            let bridge = test_bridge();
1987            bridge
1988                .push_queued_user_message("human follow-up".to_string(), "finish_step")
1989                .await;
1990            let reminder_id = bridge
1991                .push_queued_session_remind_from_params(&serde_json::json!({
1992                    "body": "Host-provided ambient context.",
1993                    "tags": ["host"],
1994                    "dedupe_key": "host-context",
1995                    "ttl_turns": 2,
1996                    "mode": "audit_only",
1997                    "_meta": {"harn": {"source": "test"}},
1998                }))
1999                .await
2000                .expect("valid reminder");
2001
2002            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
2003            assert_eq!(finish_step.len(), 1);
2004            assert_eq!(finish_step[0].content, "human follow-up");
2005
2006            let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
2007            assert!(no_user_messages.is_empty());
2008
2009            let injections = bridge
2010                .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
2011                .await;
2012            assert_eq!(injections.len(), 1);
2013            let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
2014                panic!("expected queued reminder");
2015            };
2016            assert_eq!(reminder.reminder.id, reminder_id);
2017            assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
2018            assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
2019            assert_eq!(
2020                reminder.reminder.dedupe_key.as_deref(),
2021                Some("host-context")
2022            );
2023            assert_eq!(reminder.reminder.ttl_turns, Some(2));
2024            assert_eq!(
2025                reminder.reminder.source,
2026                crate::llm::helpers::ReminderSource::Bridge
2027            );
2028        });
2029    }
2030
2031    #[test]
2032    fn pending_injections_list_user_messages_and_reminders_in_fifo_order() {
2033        let runtime = tokio::runtime::Builder::new_current_thread()
2034            .enable_all()
2035            .build()
2036            .unwrap();
2037        runtime.block_on(async {
2038            let bridge = test_bridge();
2039            let message_id = bridge
2040                .push_pending_user_message(
2041                    "human follow-up".to_string(),
2042                    serde_json::json!([{"type": "text", "text": "human follow-up"}]),
2043                    "finish_step",
2044                )
2045                .await;
2046            let reminder_id = bridge
2047                .push_queued_session_remind_from_params(&serde_json::json!({
2048                    "id": "rem-test",
2049                    "body": "Host reminder",
2050                    "tags": ["host"],
2051                    "dedupe_key": "host-reminder",
2052                    "ttl_turns": 2,
2053                    "mode": "interrupt_immediate",
2054                }))
2055                .await
2056                .expect("valid session/remind payload");
2057
2058            let pending = bridge.pending_injections_json().await;
2059            assert_eq!(pending["pendingCount"], 2);
2060            assert_eq!(pending["injections"][0]["kind"], "user");
2061            assert_eq!(pending["injections"][0]["id"], message_id);
2062            assert_eq!(pending["injections"][0]["messageId"], message_id);
2063            assert_eq!(pending["injections"][0]["mode"], "finish_step");
2064            assert_eq!(pending["injections"][0]["position"], 0);
2065            assert_eq!(pending["injections"][1]["kind"], "reminder");
2066            assert_eq!(pending["injections"][1]["id"], reminder_id);
2067            assert_eq!(pending["injections"][1]["reminderId"], "rem-test");
2068            assert_eq!(pending["injections"][1]["mode"], "interrupt_immediate");
2069            assert_eq!(pending["injections"][1]["body"], "Host reminder");
2070            assert_eq!(pending["injections"][1]["dedupeKey"], "host-reminder");
2071            assert_eq!(pending["injections"][1]["ttlTurns"], 2);
2072            assert_eq!(pending["injections"][1]["position"], 1);
2073        });
2074    }
2075
2076    #[test]
2077    fn pending_reminders_support_revoke_and_delivery_states() {
2078        let runtime = tokio::runtime::Builder::new_current_thread()
2079            .enable_all()
2080            .build()
2081            .unwrap();
2082        runtime.block_on(async {
2083            let bridge = test_bridge();
2084            let revoked_id = bridge
2085                .push_queued_session_remind_from_params(&serde_json::json!({
2086                    "id": "rem-revoke",
2087                    "body": "remove me",
2088                    "mode": "finish_step",
2089                }))
2090                .await
2091                .expect("valid session/remind payload");
2092            let delivered_id = bridge
2093                .push_queued_session_remind_from_params(&serde_json::json!({
2094                    "id": "rem-deliver",
2095                    "body": "deliver me",
2096                    "mode": "finish_step",
2097                }))
2098                .await
2099                .expect("valid session/remind payload");
2100
2101            assert_eq!(
2102                bridge.revoke_pending_reminder(&revoked_id).await,
2103                PendingReminderMutationResult::Mutated
2104            );
2105            assert_eq!(
2106                bridge.revoke_pending_reminder(&revoked_id).await,
2107                PendingReminderMutationResult::AlreadyRevoked
2108            );
2109
2110            let pending = bridge.pending_injections_json().await;
2111            assert_eq!(pending["pendingCount"], 1);
2112            assert_eq!(pending["injections"][0]["reminderId"], delivered_id);
2113
2114            let delivered = bridge
2115                .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
2116                .await;
2117            assert_eq!(delivered.len(), 1);
2118            let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2119                panic!("expected delivered reminder");
2120            };
2121            assert_eq!(reminder.reminder.id, delivered_id);
2122
2123            assert_eq!(
2124                bridge.revoke_pending_reminder(&delivered_id).await,
2125                PendingReminderMutationResult::AlreadyDelivered
2126            );
2127            assert_eq!(
2128                bridge.revoke_pending_reminder("missing").await,
2129                PendingReminderMutationResult::UnknownReminderId
2130            );
2131        });
2132    }
2133
2134    #[test]
2135    fn bridge_remind_modes_honor_delivery_checkpoints() {
2136        let runtime = tokio::runtime::Builder::new_current_thread()
2137            .enable_all()
2138            .build()
2139            .unwrap();
2140        runtime.block_on(async {
2141            let cases = [
2142                (
2143                    "interrupt_immediate",
2144                    DeliveryCheckpoint::InterruptImmediate,
2145                    DeliveryCheckpoint::AfterCurrentOperation,
2146                ),
2147                (
2148                    "finish_step",
2149                    DeliveryCheckpoint::AfterCurrentOperation,
2150                    DeliveryCheckpoint::EndOfInteraction,
2151                ),
2152                (
2153                    "audit_only",
2154                    DeliveryCheckpoint::EndOfInteraction,
2155                    DeliveryCheckpoint::InterruptImmediate,
2156                ),
2157            ];
2158
2159            for (mode, expected_checkpoint, wrong_checkpoint) in cases {
2160                let bridge = test_bridge();
2161                bridge
2162                    .push_queued_session_remind_from_params(&serde_json::json!({
2163                        "body": format!("Reminder for {mode}"),
2164                        "mode": mode,
2165                    }))
2166                    .await
2167                    .expect("valid session/remind payload");
2168
2169                let premature = bridge
2170                    .take_queued_transcript_injections_for(wrong_checkpoint)
2171                    .await;
2172                assert!(
2173                    premature.is_empty(),
2174                    "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
2175                );
2176
2177                let delivered = bridge
2178                    .take_queued_transcript_injections_for(expected_checkpoint)
2179                    .await;
2180                assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
2181                let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2182                    panic!("expected reminder for {mode}");
2183                };
2184                assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
2185            }
2186        });
2187    }
2188
2189    #[test]
2190    fn session_remind_validation_rejects_user_message_shape() {
2191        let err = queued_session_remind_from_params(&serde_json::json!({
2192            "content": "this is still a user message",
2193            "mode": "interrupt_immediate",
2194        }))
2195        .expect_err("session/remind must require a reminder body");
2196        assert!(err.contains(Code::ReminderInvalidShape.as_str()));
2197        assert!(err.contains("body"));
2198    }
2199
2200    #[test]
2201    fn session_remind_validation_rejects_unknown_options_separately() {
2202        let err = queued_session_remind_from_params(&serde_json::json!({
2203            "body": "valid body",
2204            "unknown_host_field": true,
2205        }))
2206        .expect_err("session/remind must reject unknown top-level fields");
2207        assert!(err.contains(Code::ReminderUnknownOption.as_str()));
2208        assert!(err.contains("unknown_host_field"));
2209    }
2210
2211    #[test]
2212    fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
2213        let err = queued_session_remind_from_params(&serde_json::json!({
2214            "body": "valid body",
2215            "propagate": "workspace",
2216        }))
2217        .expect_err("session/remind must reject unknown propagate values");
2218        assert!(err.contains(Code::ReminderUnknownPropagate.as_str()));
2219        assert!(err.contains("propagate"));
2220    }
2221
2222    #[test]
2223    fn test_json_result_to_vm_value_string() {
2224        let val = serde_json::json!("hello");
2225        let vm_val = json_result_to_vm_value(&val);
2226        assert_eq!(vm_val.display(), "hello");
2227    }
2228
2229    #[test]
2230    fn test_json_result_to_vm_value_dict() {
2231        let val = serde_json::json!({"name": "test", "count": 42});
2232        let vm_val = json_result_to_vm_value(&val);
2233        let VmValue::Dict(d) = &vm_val else {
2234            unreachable!("Expected Dict, got {:?}", vm_val);
2235        };
2236        assert_eq!(d.get("name").unwrap().display(), "test");
2237        assert_eq!(d.get("count").unwrap().display(), "42");
2238    }
2239
2240    #[test]
2241    fn test_json_result_to_vm_value_null() {
2242        let val = serde_json::json!(null);
2243        let vm_val = json_result_to_vm_value(&val);
2244        assert!(matches!(vm_val, VmValue::Nil));
2245    }
2246
2247    #[test]
2248    fn test_json_result_to_vm_value_nested() {
2249        let val = serde_json::json!({
2250            "text": "response",
2251            "tool_calls": [
2252                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
2253            ],
2254            "input_tokens": 100,
2255            "output_tokens": 50,
2256        });
2257        let vm_val = json_result_to_vm_value(&val);
2258        let VmValue::Dict(d) = &vm_val else {
2259            unreachable!("Expected Dict, got {:?}", vm_val);
2260        };
2261        assert_eq!(d.get("text").unwrap().display(), "response");
2262        let VmValue::List(list) = d.get("tool_calls").unwrap() else {
2263            unreachable!("Expected List for tool_calls");
2264        };
2265        assert_eq!(list.len(), 1);
2266    }
2267
2268    #[test]
2269    fn parse_host_tools_list_accepts_object_wrapper() {
2270        let tools = parse_host_tools_list_response(serde_json::json!({
2271            "tools": [
2272                {
2273                    "name": "Read",
2274                    "description": "Read a file",
2275                    "schema": {"type": "object"},
2276                }
2277            ]
2278        }))
2279        .expect("tool list");
2280
2281        assert_eq!(tools.len(), 1);
2282        assert_eq!(tools[0]["name"], "Read");
2283        assert_eq!(tools[0]["deprecated"], false);
2284    }
2285
2286    #[test]
2287    fn parse_host_tools_list_accepts_compat_fields() {
2288        let tools = parse_host_tools_list_response(serde_json::json!({
2289            "result": {
2290                "tools": [
2291                    {
2292                        "name": "Edit",
2293                        "short_description": "Apply an edit",
2294                        "input_schema": {"type": "object"},
2295                        "deprecated": true,
2296                    }
2297                ]
2298            }
2299        }))
2300        .expect("tool list");
2301
2302        assert_eq!(tools[0]["description"], "Apply an edit");
2303        assert_eq!(tools[0]["schema"]["type"], "object");
2304        assert_eq!(tools[0]["deprecated"], true);
2305    }
2306
2307    #[test]
2308    fn parse_host_tools_list_requires_tool_names() {
2309        let err = parse_host_tools_list_response(serde_json::json!({
2310            "tools": [
2311                {"description": "missing name"}
2312            ]
2313        }))
2314        .expect_err("expected error");
2315        assert!(err
2316            .to_string()
2317            .contains("host/tools/list: every tool must include a string `name`"));
2318    }
2319
2320    #[test]
2321    fn test_timeout_duration() {
2322        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
2323    }
2324}