Skip to main content

harn_vm/
bridge.rs

1//! JSON-RPC 2.0 bridge for host communication.
2//!
3//! When `harn run --bridge` is used, the VM delegates builtins (llm_call,
4//! file I/O, tool execution) to a host process over stdin/stdout JSON-RPC.
5//! The host application handles these requests using its own providers.
6
7use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::future::Future;
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::rc::Rc;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::Duration;
16
17use tokio::io::AsyncBufReadExt;
18use tokio::sync::{oneshot, Mutex, Notify};
19
20use harn_parser::diagnostic_codes::Code;
21
22use crate::orchestration::MutationSessionRecord;
23use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
24use crate::visible_text::VisibleTextState;
25use crate::vm::Vm;
26
27/// Default timeout for bridge calls (5 minutes).
28const DEFAULT_TIMEOUT: Duration = Duration::from_mins(5);
29
30pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
31
32fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
33    Arc::new(move |line: &str| {
34        let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
35        let mut stdout = std::io::stdout().lock();
36        stdout
37            .write_all(line.as_bytes())
38            .map_err(|e| format!("Bridge write error: {e}"))?;
39        stdout
40            .write_all(b"\n")
41            .map_err(|e| format!("Bridge write error: {e}"))?;
42        stdout
43            .flush()
44            .map_err(|e| format!("Bridge flush error: {e}"))?;
45        Ok(())
46    })
47}
48
49/// A JSON-RPC 2.0 bridge to a host process over stdin/stdout.
50///
51/// The bridge sends requests to the host on stdout and receives responses
52/// on stdin. A background task reads stdin and dispatches responses to
53/// waiting callers by request ID. All stdout writes are serialized through
54/// a mutex to prevent interleaving.
55pub struct HostBridge {
56    next_id: AtomicU64,
57    /// Pending request waiters, keyed by JSON-RPC id.
58    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
59    /// Whether the host has sent a cancel notification.
60    cancelled: Arc<AtomicBool>,
61    /// Wakes pending host calls when cancellation arrives.
62    cancel_notify: Arc<Notify>,
63    /// Transport writer used to send JSON-RPC to the host.
64    writer: HostBridgeWriter,
65    /// ACP session ID (set in ACP mode for session-scoped notifications).
66    session_id: std::sync::Mutex<String>,
67    /// Name of the currently executing Harn script (without .harn suffix).
68    script_name: std::sync::Mutex<String>,
69    /// Transcript injections queued by the host while a run is active.
70    queued_transcript_injections: HostBridgeInjectionState,
71    /// Host-triggered resume signal for daemon agents.
72    resume_requested: Arc<AtomicBool>,
73    /// Host-triggered skill-registry invalidation signal. Set when the
74    /// host sends a `skills/update` notification; consumed by the CLI
75    /// between runs (watch mode, long-running agents) to rebuild the
76    /// layered skill catalog from its current filesystem + host state.
77    skills_reload_requested: Arc<AtomicBool>,
78    /// Whether the current daemon-mode agent loop is blocked in idle wait.
79    daemon_idle: Arc<AtomicBool>,
80    /// Canonical ACP `stopReason` recorded by the most recent `agent_loop`
81    /// finalize during this prompt. Read once by the ACP adapter when the
82    /// pipeline returns and populated by `host_agent_session_finalize`.
83    /// Pipelines that don't run an agent loop leave this `None`, in which
84    /// case the adapter falls back to `end_turn`.
85    prompt_stop_reason: std::sync::Mutex<Option<String>>,
86    /// Per-call visible assistant text state for call_progress notifications.
87    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
88    /// Whether an LLM call's deltas should be exposed to end users while streaming.
89    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
90    /// Optional in-process host-module backend used by `harn playground`.
91    in_process: Option<InProcessHost>,
92}
93
94struct InProcessHost {
95    module_path: PathBuf,
96    exported_functions: BTreeMap<String, Rc<VmClosure>>,
97    vm: Vm,
98}
99
100impl InProcessHost {
101    /// Box-pin'd to break the static recursion between the VM's hot dispatch
102    /// loop and the bridge: a bridge-backed builtin spawns a child VM that
103    /// calls back into the dispatch loop via `call_closure_pub`. Indirecting
104    /// at this slow-path boundary keeps the recursion satisfied without
105    /// allocating per call in the hot per-callback path.
106    fn dispatch<'a>(
107        &'a self,
108        method: &'a str,
109        params: serde_json::Value,
110    ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, VmError>> + 'a>> {
111        Box::pin(async move {
112            match method {
113                "builtin_call" => {
114                    let name = params
115                        .get("name")
116                        .and_then(|value| value.as_str())
117                        .unwrap_or_default();
118                    let args = params
119                        .get("args")
120                        .and_then(|value| value.as_array())
121                        .cloned()
122                        .unwrap_or_default()
123                        .into_iter()
124                        .map(|value| json_result_to_vm_value(&value))
125                        .collect::<Vec<_>>();
126                    self.invoke_export(name, &args).await
127                }
128                "host/tools/list" => self
129                    .invoke_optional_export("host_tools_list", &[])
130                    .await
131                    .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
132                "session/request_permission" => self.request_permission(params).await,
133                other => Err(VmError::Runtime(format!(
134                    "playground host backend does not implement bridge method '{other}'"
135                ))),
136            }
137        })
138    }
139
140    async fn invoke_export(
141        &self,
142        name: &str,
143        args: &[VmValue],
144    ) -> Result<serde_json::Value, VmError> {
145        let Some(closure) = self.exported_functions.get(name) else {
146            return Err(VmError::Runtime(format!(
147                "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
148                self.module_path.display()
149            )));
150        };
151
152        let mut vm = self.vm.child_vm_for_host();
153        let result = vm.call_closure_pub(closure, args).await?;
154        Ok(crate::llm::vm_value_to_json(&result))
155    }
156
157    async fn invoke_optional_export(
158        &self,
159        name: &str,
160        args: &[VmValue],
161    ) -> Result<Option<serde_json::Value>, VmError> {
162        if !self.exported_functions.contains_key(name) {
163            return Ok(None);
164        }
165        self.invoke_export(name, args).await.map(Some)
166    }
167
168    async fn request_permission(
169        &self,
170        params: serde_json::Value,
171    ) -> Result<serde_json::Value, VmError> {
172        let Some(closure) = self.exported_functions.get("request_permission") else {
173            return Ok(serde_json::json!({ "granted": true }));
174        };
175
176        let tool_name = params
177            .get("toolCall")
178            .and_then(|tool_call| tool_call.get("toolName"))
179            .and_then(|value| value.as_str())
180            .unwrap_or_default();
181        let tool_args = params
182            .get("toolCall")
183            .and_then(|tool_call| tool_call.get("rawInput"))
184            .map(json_result_to_vm_value)
185            .unwrap_or(VmValue::Nil);
186        let full_payload = json_result_to_vm_value(&params);
187
188        let arg_count = closure.func.params.len();
189        let args = if arg_count >= 3 {
190            vec![
191                VmValue::String(Rc::from(tool_name.to_string())),
192                tool_args,
193                full_payload,
194            ]
195        } else if arg_count == 2 {
196            vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
197        } else if arg_count == 1 {
198            vec![full_payload]
199        } else {
200            Vec::new()
201        };
202
203        let mut vm = self.vm.child_vm_for_host();
204        let result = vm.call_closure_pub(closure, &args).await?;
205        let payload = match result {
206            VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
207            VmValue::String(reason) if !reason.is_empty() => {
208                serde_json::json!({ "granted": false, "reason": reason.to_string() })
209            }
210            other => {
211                let json = crate::llm::vm_value_to_json(&other);
212                if json
213                    .get("granted")
214                    .and_then(|value| value.as_bool())
215                    .is_some()
216                    || json.get("outcome").is_some()
217                {
218                    json
219                } else {
220                    serde_json::json!({ "granted": other.is_truthy() })
221                }
222            }
223        };
224        Ok(payload)
225    }
226}
227
228/// How a queued bridge injection is delivered into the agent loop.
229///
230/// `AuditOnly` was previously called `WaitForCompletion`; the rename is
231/// truth-in-advertising (harn#2212). These injections drain at
232/// `loop_exit`, *after* the last LLM call has returned — so they land in
233/// the transcript audit but are **never rendered into a model prompt**.
234/// Hosts that want the model to react to the reminder on its final
235/// iteration should use `FinishStep` instead, which drains at every
236/// `iteration_start` / `post_tool_dispatch` / `iteration_end` checkpoint.
237#[derive(Clone, Copy, Debug, PartialEq, Eq)]
238pub enum QueuedUserMessageMode {
239    InterruptImmediate,
240    FinishStep,
241    AuditOnly,
242}
243
244#[derive(Clone, Copy, Debug, PartialEq, Eq)]
245pub enum DeliveryCheckpoint {
246    InterruptImmediate,
247    AfterCurrentOperation,
248    EndOfInteraction,
249}
250
251impl QueuedUserMessageMode {
252    fn from_str(value: &str) -> Self {
253        match value {
254            "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
255            "finish_step" | "after_current_operation" => Self::FinishStep,
256            // Unknown / missing modes fall through to the safest option:
257            // record for audit, do not preempt the loop. Pre-#2212 hosts
258            // that send `wait_for_completion` are caught by this arm —
259            // the canonical name is `audit_only` going forward.
260            _ => Self::AuditOnly,
261        }
262    }
263
264    fn as_str(self) -> &'static str {
265        match self {
266            Self::InterruptImmediate => "interrupt_immediate",
267            Self::FinishStep => "finish_step",
268            Self::AuditOnly => "audit_only",
269        }
270    }
271}
272
273#[derive(Clone, Debug, PartialEq, Eq)]
274pub struct QueuedUserMessage {
275    pub message_id: String,
276    pub content: String,
277    pub transcript_content: serde_json::Value,
278    pub mode: QueuedUserMessageMode,
279}
280
281#[derive(Clone, Debug, PartialEq, Eq)]
282pub struct QueuedReminder {
283    pub reminder: crate::llm::helpers::SystemReminder,
284    pub mode: QueuedUserMessageMode,
285}
286
287#[derive(Clone, Debug, PartialEq, Eq)]
288pub enum QueuedTranscriptInjection {
289    User(QueuedUserMessage),
290    Reminder(QueuedReminder),
291}
292
293#[derive(Debug, Default)]
294struct QueuedTranscriptInjections {
295    queue: VecDeque<QueuedTranscriptInjection>,
296    revoked_user_message_ids: HashSet<String>,
297    delivered_user_message_ids: HashSet<String>,
298    revoked_reminder_ids: HashSet<String>,
299    delivered_reminder_ids: HashSet<String>,
300}
301
302#[derive(Clone, Debug, Default)]
303pub struct HostBridgeInjectionState {
304    inner: Arc<Mutex<QueuedTranscriptInjections>>,
305}
306
307#[derive(Clone, Copy, Debug, PartialEq, Eq)]
308pub enum PendingUserMessageMutationResult {
309    Mutated,
310    AlreadyRevoked,
311    AlreadyDelivered,
312    UnknownMessageId,
313}
314
315impl QueuedTranscriptInjection {
316    fn mode(&self) -> QueuedUserMessageMode {
317        match self {
318            Self::User(message) => message.mode,
319            Self::Reminder(reminder) => reminder.mode,
320        }
321    }
322
323    fn pending_json(&self, position: usize) -> serde_json::Value {
324        match self {
325            Self::User(message) => serde_json::json!({
326                "kind": "user",
327                "id": message.message_id,
328                "messageId": message.message_id,
329                "mode": message.mode.as_str(),
330                "position": position,
331                "content": message.transcript_content,
332            }),
333            Self::Reminder(reminder) => serde_json::json!({
334                "kind": "reminder",
335                "id": reminder.reminder.id,
336                "reminderId": reminder.reminder.id,
337                "mode": reminder.mode.as_str(),
338                "position": position,
339                "body": reminder.reminder.body,
340                "tags": reminder.reminder.tags,
341                "dedupeKey": reminder.reminder.dedupe_key,
342                "ttlTurns": reminder.reminder.ttl_turns,
343                "preserveOnCompact": reminder.reminder.preserve_on_compact,
344                "propagate": reminder.reminder.propagate.as_str(),
345                "roleHint": reminder.reminder.role_hint.as_str(),
346                "source": reminder.reminder.source.as_str(),
347                "firedAtTurn": reminder.reminder.fired_at_turn,
348                "originatingAgentId": reminder.reminder.originating_agent_id,
349            }),
350        }
351    }
352}
353
354#[derive(Clone, Copy, Debug, PartialEq, Eq)]
355pub enum PendingReminderMutationResult {
356    Mutated,
357    AlreadyRevoked,
358    AlreadyDelivered,
359    UnknownReminderId,
360}
361
362fn new_inject_message_id() -> String {
363    format!("msg_inj_{}", uuid::Uuid::now_v7().simple())
364}
365
366impl HostBridgeInjectionState {
367    pub fn new() -> Self {
368        Self::default()
369    }
370
371    pub async fn push_pending_user_message(
372        &self,
373        content: String,
374        transcript_content: serde_json::Value,
375        mode: &str,
376    ) -> String {
377        let message_id = new_inject_message_id();
378        self.inner
379            .lock()
380            .await
381            .queue
382            .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
383                message_id: message_id.clone(),
384                content,
385                transcript_content,
386                mode: QueuedUserMessageMode::from_str(mode),
387            }));
388        message_id
389    }
390
391    pub async fn revoke_pending_user_message(
392        &self,
393        message_id: &str,
394    ) -> PendingUserMessageMutationResult {
395        let mut state = self.inner.lock().await;
396        let mut retained = VecDeque::new();
397        let mut revoked = false;
398        while let Some(injection) = state.queue.pop_front() {
399            match &injection {
400                QueuedTranscriptInjection::User(message) if message.message_id == message_id => {
401                    revoked = true;
402                }
403                _ => retained.push_back(injection),
404            }
405        }
406        state.queue = retained;
407        if revoked {
408            state
409                .revoked_user_message_ids
410                .insert(message_id.to_string());
411            return PendingUserMessageMutationResult::Mutated;
412        }
413        if state.revoked_user_message_ids.contains(message_id) {
414            PendingUserMessageMutationResult::AlreadyRevoked
415        } else if state.delivered_user_message_ids.contains(message_id) {
416            PendingUserMessageMutationResult::AlreadyDelivered
417        } else {
418            PendingUserMessageMutationResult::UnknownMessageId
419        }
420    }
421
422    pub async fn revoke_pending_reminder(
423        &self,
424        reminder_id: &str,
425    ) -> PendingReminderMutationResult {
426        let mut state = self.inner.lock().await;
427        let mut retained = VecDeque::new();
428        let mut revoked = false;
429        while let Some(injection) = state.queue.pop_front() {
430            match &injection {
431                QueuedTranscriptInjection::Reminder(reminder)
432                    if reminder.reminder.id == reminder_id =>
433                {
434                    revoked = true;
435                }
436                _ => retained.push_back(injection),
437            }
438        }
439        state.queue = retained;
440        if revoked {
441            state.revoked_reminder_ids.insert(reminder_id.to_string());
442            return PendingReminderMutationResult::Mutated;
443        }
444        if state.revoked_reminder_ids.contains(reminder_id) {
445            PendingReminderMutationResult::AlreadyRevoked
446        } else if state.delivered_reminder_ids.contains(reminder_id) {
447            PendingReminderMutationResult::AlreadyDelivered
448        } else {
449            PendingReminderMutationResult::UnknownReminderId
450        }
451    }
452
453    pub async fn replace_pending_user_message(
454        &self,
455        message_id: &str,
456        content: String,
457        transcript_content: serde_json::Value,
458    ) -> PendingUserMessageMutationResult {
459        let mut state = self.inner.lock().await;
460        for injection in &mut state.queue {
461            if let QueuedTranscriptInjection::User(message) = injection {
462                if message.message_id == message_id {
463                    message.content = content;
464                    message.transcript_content = transcript_content;
465                    return PendingUserMessageMutationResult::Mutated;
466                }
467            }
468        }
469        if state.revoked_user_message_ids.contains(message_id) {
470            PendingUserMessageMutationResult::AlreadyRevoked
471        } else if state.delivered_user_message_ids.contains(message_id) {
472            PendingUserMessageMutationResult::AlreadyDelivered
473        } else {
474            PendingUserMessageMutationResult::UnknownMessageId
475        }
476    }
477
478    async fn push_session_reminder(&self, reminder: QueuedReminder) {
479        self.inner
480            .lock()
481            .await
482            .queue
483            .push_back(QueuedTranscriptInjection::Reminder(reminder));
484    }
485
486    pub async fn pending_injections_json(&self) -> serde_json::Value {
487        let state = self.inner.lock().await;
488        let injections = state
489            .queue
490            .iter()
491            .enumerate()
492            .map(|(position, injection)| injection.pending_json(position))
493            .collect::<Vec<_>>();
494        serde_json::json!({
495            "pendingCount": injections.len(),
496            "injections": injections,
497        })
498    }
499}
500
501fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
502    format!(
503        "{}: {}",
504        Code::ReminderUnknownOption.as_str(),
505        message.as_ref()
506    )
507}
508
509fn session_remind_shape_error(message: impl AsRef<str>) -> String {
510    format!(
511        "{}: {}",
512        Code::ReminderInvalidShape.as_str(),
513        message.as_ref()
514    )
515}
516
517fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
518    format!(
519        "{}: {}",
520        Code::ReminderUnknownPropagate.as_str(),
521        message.as_ref()
522    )
523}
524
525fn string_field(
526    map: &serde_json::Map<String, serde_json::Value>,
527    key: &str,
528    required: bool,
529) -> Result<Option<String>, String> {
530    match map.get(key) {
531        None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
532            format!("`{key}` must be a non-empty string"),
533        )),
534        None | Some(serde_json::Value::Null) => Ok(None),
535        Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
536            session_remind_shape_error(format!("`{key}` must be a non-empty string")),
537        ),
538        Some(serde_json::Value::String(value)) => {
539            let trimmed = value.trim();
540            if trimmed.is_empty() {
541                Ok(None)
542            } else {
543                Ok(Some(trimmed.to_string()))
544            }
545        }
546        Some(other) => Err(session_remind_shape_error(format!(
547            "`{key}` must be a string, got {other}"
548        ))),
549    }
550}
551
552fn bool_field(
553    map: &serde_json::Map<String, serde_json::Value>,
554    key: &str,
555) -> Result<Option<bool>, String> {
556    match map.get(key) {
557        None | Some(serde_json::Value::Null) => Ok(None),
558        Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
559        Some(other) => Err(session_remind_shape_error(format!(
560            "`{key}` must be a bool, got {other}"
561        ))),
562    }
563}
564
565fn int_field(
566    map: &serde_json::Map<String, serde_json::Value>,
567    key: &str,
568) -> Result<Option<i64>, String> {
569    match map.get(key) {
570        None | Some(serde_json::Value::Null) => Ok(None),
571        Some(serde_json::Value::Number(value)) => {
572            let Some(value) = value.as_i64() else {
573                return Err(session_remind_shape_error(format!(
574                    "`{key}` must be an integer"
575                )));
576            };
577            Ok(Some(value))
578        }
579        Some(other) => Err(session_remind_shape_error(format!(
580            "`{key}` must be an int, got {other}"
581        ))),
582    }
583}
584
585fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
586    let Some(value) = map.get("tags") else {
587        return Ok(Vec::new());
588    };
589    if value.is_null() {
590        return Ok(Vec::new());
591    }
592    let Some(values) = value.as_array() else {
593        return Err(session_remind_shape_error("`tags` must be a list"));
594    };
595    let mut tags = Vec::new();
596    for value in values {
597        let Some(tag) = value.as_str() else {
598            return Err(session_remind_shape_error(format!(
599                "`tags` entries must be strings, got {value}"
600            )));
601        };
602        let tag = tag.trim();
603        if tag.is_empty() {
604            return Err(session_remind_shape_error(
605                "`tags` entries must be non-empty strings",
606            ));
607        }
608        if !tags.iter().any(|existing| existing == tag) {
609            tags.push(tag.to_string());
610        }
611    }
612    Ok(tags)
613}
614
615fn session_remind_payload_from_value(
616    value: &serde_json::Value,
617) -> Result<crate::llm::helpers::SystemReminder, String> {
618    let Some(map) = value.as_object() else {
619        return Err(session_remind_shape_error(
620            "session/remind payload must be a reminder object",
621        ));
622    };
623    const ALLOWED: &[&str] = &[
624        "_meta",
625        "body",
626        "dedupe_key",
627        "fired_at_turn",
628        "id",
629        "preserve_on_compact",
630        "propagate",
631        "role_hint",
632        "source",
633        "tags",
634        "ttl_turns",
635    ];
636    let unknown = map
637        .keys()
638        .filter(|key| !ALLOWED.contains(&key.as_str()))
639        .map(String::as_str)
640        .collect::<Vec<_>>();
641    if !unknown.is_empty() {
642        if unknown.contains(&"content") {
643            return Err(session_remind_shape_error(
644                "session/remind expects reminder `body`, not user-message `content`",
645            ));
646        }
647        return Err(reminder_unknown_option_error(format!(
648            "unknown reminder option(s): {}",
649            unknown.join(", ")
650        )));
651    }
652    if let Some(meta) = map.get("_meta") {
653        if !meta.is_null() && !meta.is_object() {
654            return Err(session_remind_shape_error("`_meta` must be an object"));
655        }
656    }
657    let ttl_turns = int_field(map, "ttl_turns")?;
658    if let Some(value) = ttl_turns {
659        if value <= 0 {
660            return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
661        }
662    }
663    let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
664    if fired_at_turn < 0 {
665        return Err(session_remind_shape_error(
666            "`fired_at_turn` must be >= 0 when provided",
667        ));
668    }
669    match string_field(map, "source", false)?.as_deref() {
670        None | Some("bridge") => {}
671        Some(_) => {
672            return Err(session_remind_shape_error(
673                "`source` for session/remind must be bridge when provided",
674            ))
675        }
676    }
677    let propagate = match string_field(map, "propagate", false)?.as_deref() {
678        None => crate::llm::helpers::ReminderPropagate::Session,
679        Some("all") => crate::llm::helpers::ReminderPropagate::All,
680        Some("session") => crate::llm::helpers::ReminderPropagate::Session,
681        Some("none") => crate::llm::helpers::ReminderPropagate::None,
682        Some(_) => {
683            return Err(reminder_unknown_propagate_error(
684                "`propagate` must be one of all, session, or none",
685            ))
686        }
687    };
688    let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
689        None => crate::llm::helpers::ReminderRoleHint::System,
690        Some("system") => crate::llm::helpers::ReminderRoleHint::System,
691        Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
692        Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
693        Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
694        Some(_) => {
695            return Err(session_remind_shape_error(
696                "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
697            ))
698        }
699    };
700    Ok(crate::llm::helpers::SystemReminder {
701        id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
702        tags: tags_field(map)?,
703        dedupe_key: string_field(map, "dedupe_key", false)?,
704        ttl_turns,
705        preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
706        propagate,
707        role_hint,
708        source: crate::llm::helpers::ReminderSource::Bridge,
709        body: string_field(map, "body", true)?.unwrap_or_default(),
710        fired_at_turn,
711        originating_agent_id: None,
712    })
713}
714
715/// Parse the params of a `session/cancel_tool_call` notification and fire
716/// the per-tool-call cancellation. Mirrors the shape used by the public
717/// `cancel_in_flight_tool_call` builtin so hosts have one wire format
718/// regardless of which surface they came through.
719///
720/// Stdio bridges send this as a notification (no id, no response); the
721/// builtin handles request/response semantics in Harn. We deliberately
722/// drop malformed payloads silently because notifications can't reply
723/// with an error — logging would also be too noisy for partial drops.
724fn handle_cancel_tool_call_notification(params: &serde_json::Value) {
725    let session_id = params
726        .get("sessionId")
727        .or_else(|| params.get("session_id"))
728        .and_then(|value| value.as_str())
729        .unwrap_or_default();
730    let call_id = params
731        .get("toolCallId")
732        .or_else(|| params.get("tool_call_id"))
733        .or_else(|| params.get("callId"))
734        .or_else(|| params.get("call_id"))
735        .and_then(|value| value.as_str())
736        .unwrap_or_default();
737    if call_id.is_empty() {
738        return;
739    }
740    let reason = params
741        .get("reason")
742        .and_then(|value| value.as_str())
743        .unwrap_or("host cancelled in-flight tool call")
744        .to_string();
745    let inject_reminder = params
746        .get("injectReminder")
747        .or_else(|| params.get("inject_reminder"))
748        .and_then(|value| value.as_bool())
749        .unwrap_or(true);
750    crate::tool_call_cancellations::cancel(session_id, call_id, reason, inject_reminder);
751}
752
753fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
754    let mode = QueuedUserMessageMode::from_str(
755        params
756            .get("mode")
757            .and_then(|value| value.as_str())
758            .unwrap_or("audit_only"),
759    );
760    let reminder_value = if let Some(reminder) = params.get("reminder") {
761        reminder.clone()
762    } else {
763        let Some(params) = params.as_object() else {
764            return Err(session_remind_shape_error(
765                "session/remind params must be an object",
766            ));
767        };
768        let mut reminder = params.clone();
769        reminder.remove("mode");
770        reminder.remove("sessionId");
771        reminder.remove("session_id");
772        serde_json::Value::Object(reminder)
773    };
774    Ok(QueuedReminder {
775        reminder: session_remind_payload_from_value(&reminder_value)?,
776        mode,
777    })
778}
779
780// Default doesn't apply — new() spawns async tasks requiring a tokio LocalSet.
781#[allow(clippy::new_without_default)]
782impl HostBridge {
783    /// Create a new bridge and spawn the stdin reader task.
784    ///
785    /// Must be called within a tokio LocalSet (uses spawn_local for the
786    /// stdin reader since it's single-threaded).
787    pub fn new() -> Self {
788        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
789            Arc::new(Mutex::new(HashMap::new()));
790        let cancelled = Arc::new(AtomicBool::new(false));
791        let cancel_notify = Arc::new(Notify::new());
792        let queued_transcript_injections = HostBridgeInjectionState::default();
793        let resume_requested = Arc::new(AtomicBool::new(false));
794        let skills_reload_requested = Arc::new(AtomicBool::new(false));
795        let daemon_idle = Arc::new(AtomicBool::new(false));
796
797        // Stdin reader: reads JSON-RPC lines and dispatches responses
798        let pending_clone = pending.clone();
799        let cancelled_clone = cancelled.clone();
800        let cancel_notify_clone = cancel_notify.clone();
801        let queued_clone = queued_transcript_injections.clone();
802        let resume_clone = resume_requested.clone();
803        let skills_reload_clone = skills_reload_requested.clone();
804        tokio::task::spawn_local(async move {
805            let stdin = tokio::io::stdin();
806            let reader = tokio::io::BufReader::new(stdin);
807            let mut lines = reader.lines();
808
809            while let Ok(Some(line)) = lines.next_line().await {
810                let line = line.trim().to_string();
811                if line.is_empty() {
812                    continue;
813                }
814
815                let msg: serde_json::Value = match serde_json::from_str(&line) {
816                    Ok(v) => v,
817                    Err(_) => continue,
818                };
819
820                // Notifications have no id; responses have one.
821                if msg.get("id").is_none() {
822                    if let Some(method) = msg["method"].as_str() {
823                        if method == "cancel" {
824                            cancelled_clone.store(true, Ordering::SeqCst);
825                            cancel_notify_clone.notify_waiters();
826                        } else if method == "agent/resume" {
827                            resume_clone.store(true, Ordering::SeqCst);
828                        } else if method == "skills/update" {
829                            skills_reload_clone.store(true, Ordering::SeqCst);
830                        } else if method == "session/remind" {
831                            let params = &msg["params"];
832                            if let Ok(reminder) = queued_session_remind_from_params(params) {
833                                queued_clone.push_session_reminder(reminder).await;
834                            }
835                        } else if method == "session/cancel_tool_call" {
836                            handle_cancel_tool_call_notification(&msg["params"]);
837                        }
838                    }
839                    continue;
840                }
841
842                if let Some(id) = msg["id"].as_u64() {
843                    let mut pending = pending_clone.lock().await;
844                    if let Some(sender) = pending.remove(&id) {
845                        let _ = sender.send(msg);
846                    }
847                }
848            }
849
850            // stdin closed: drop pending senders to cancel waiters.
851            let mut pending = pending_clone.lock().await;
852            pending.clear();
853        });
854
855        Self {
856            next_id: AtomicU64::new(1),
857            pending,
858            cancelled,
859            cancel_notify,
860            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
861            session_id: std::sync::Mutex::new(String::new()),
862            script_name: std::sync::Mutex::new(String::new()),
863            queued_transcript_injections,
864            resume_requested,
865            skills_reload_requested,
866            daemon_idle,
867            prompt_stop_reason: std::sync::Mutex::new(None),
868            visible_call_states: std::sync::Mutex::new(HashMap::new()),
869            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
870            in_process: None,
871        }
872    }
873
874    /// Create a bridge from pre-existing shared state.
875    ///
876    /// Unlike `new()`, does **not** spawn a stdin reader — the caller is
877    /// responsible for dispatching responses into `pending`.  This is used
878    /// by ACP mode which already has its own stdin reader.
879    pub fn from_parts(
880        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
881        cancelled: Arc<AtomicBool>,
882        stdout_lock: Arc<std::sync::Mutex<()>>,
883        start_id: u64,
884    ) -> Self {
885        Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
886    }
887
888    pub fn from_parts_with_writer(
889        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
890        cancelled: Arc<AtomicBool>,
891        writer: HostBridgeWriter,
892        start_id: u64,
893    ) -> Self {
894        Self::from_parts_with_writer_and_cancel_notify(
895            pending,
896            cancelled,
897            Arc::new(Notify::new()),
898            writer,
899            start_id,
900        )
901    }
902
903    pub fn from_parts_with_writer_and_cancel_notify(
904        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
905        cancelled: Arc<AtomicBool>,
906        cancel_notify: Arc<Notify>,
907        writer: HostBridgeWriter,
908        start_id: u64,
909    ) -> Self {
910        Self::from_parts_with_writer_cancel_notify_and_injection_state(
911            pending,
912            cancelled,
913            cancel_notify,
914            writer,
915            start_id,
916            None,
917        )
918    }
919
920    pub fn from_parts_with_writer_cancel_notify_and_injection_state(
921        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
922        cancelled: Arc<AtomicBool>,
923        cancel_notify: Arc<Notify>,
924        writer: HostBridgeWriter,
925        start_id: u64,
926        injection_state: Option<HostBridgeInjectionState>,
927    ) -> Self {
928        Self {
929            next_id: AtomicU64::new(start_id),
930            pending,
931            cancelled,
932            cancel_notify,
933            writer,
934            session_id: std::sync::Mutex::new(String::new()),
935            script_name: std::sync::Mutex::new(String::new()),
936            queued_transcript_injections: injection_state.unwrap_or_default(),
937            resume_requested: Arc::new(AtomicBool::new(false)),
938            skills_reload_requested: Arc::new(AtomicBool::new(false)),
939            daemon_idle: Arc::new(AtomicBool::new(false)),
940            prompt_stop_reason: std::sync::Mutex::new(None),
941            visible_call_states: std::sync::Mutex::new(HashMap::new()),
942            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
943            in_process: None,
944        }
945    }
946
947    /// Create an in-process host bridge backed by exported functions from a
948    /// Harn module. Used by `harn playground` to avoid JSON-RPC boilerplate.
949    pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
950        let exported_functions = vm.load_module_exports(module_path).await?;
951        Ok(Self {
952            next_id: AtomicU64::new(1),
953            pending: Arc::new(Mutex::new(HashMap::new())),
954            cancelled: Arc::new(AtomicBool::new(false)),
955            cancel_notify: Arc::new(Notify::new()),
956            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
957            session_id: std::sync::Mutex::new(String::new()),
958            script_name: std::sync::Mutex::new(String::new()),
959            queued_transcript_injections: HostBridgeInjectionState::default(),
960            resume_requested: Arc::new(AtomicBool::new(false)),
961            skills_reload_requested: Arc::new(AtomicBool::new(false)),
962            daemon_idle: Arc::new(AtomicBool::new(false)),
963            prompt_stop_reason: std::sync::Mutex::new(None),
964            visible_call_states: std::sync::Mutex::new(HashMap::new()),
965            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
966            in_process: Some(InProcessHost {
967                module_path: module_path.to_path_buf(),
968                exported_functions,
969                vm,
970            }),
971        })
972    }
973
974    /// Set the ACP session ID for session-scoped notifications.
975    pub fn set_session_id(&self, id: &str) {
976        *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
977    }
978
979    /// Set the currently executing script name (without .harn suffix).
980    pub fn set_script_name(&self, name: &str) {
981        *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
982    }
983
984    /// Get the current script name.
985    fn get_script_name(&self) -> String {
986        self.script_name
987            .lock()
988            .unwrap_or_else(|e| e.into_inner())
989            .clone()
990    }
991
992    /// Get the session ID.
993    pub fn get_session_id(&self) -> String {
994        self.session_id
995            .lock()
996            .unwrap_or_else(|e| e.into_inner())
997            .clone()
998    }
999
1000    /// Write a complete JSON-RPC line to stdout, serialized through a mutex.
1001    fn write_line(&self, line: &str) -> Result<(), VmError> {
1002        (self.writer)(line).map_err(VmError::Runtime)
1003    }
1004
1005    /// Send a JSON-RPC request to the host and wait for the response.
1006    /// Times out after 5 minutes to prevent deadlocks.
1007    pub async fn call(
1008        &self,
1009        method: &str,
1010        params: serde_json::Value,
1011    ) -> Result<serde_json::Value, VmError> {
1012        if let Some(in_process) = &self.in_process {
1013            return in_process.dispatch(method, params).await;
1014        }
1015
1016        if self.is_cancelled() {
1017            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1018        }
1019
1020        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
1021        let cancel_wait = self.cancel_notify.notified();
1022        tokio::pin!(cancel_wait);
1023
1024        let request = crate::jsonrpc::request(id, method, params);
1025
1026        let (tx, rx) = oneshot::channel();
1027        {
1028            let mut pending = self.pending.lock().await;
1029            pending.insert(id, tx);
1030        }
1031
1032        let line = serde_json::to_string(&request)
1033            .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
1034        if let Err(e) = self.write_line(&line) {
1035            let mut pending = self.pending.lock().await;
1036            pending.remove(&id);
1037            return Err(e);
1038        }
1039
1040        if self.is_cancelled() {
1041            let mut pending = self.pending.lock().await;
1042            pending.remove(&id);
1043            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1044        }
1045
1046        let response = tokio::select! {
1047            result = rx => match result {
1048                Ok(msg) => msg,
1049                Err(_) => {
1050                    // Sender dropped: host closed or stdin reader exited.
1051                    return Err(VmError::Runtime(
1052                        "Bridge: host closed connection before responding".into(),
1053                    ));
1054                }
1055            },
1056            _ = &mut cancel_wait => {
1057                let mut pending = self.pending.lock().await;
1058                pending.remove(&id);
1059                return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1060            }
1061            _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
1062                let mut pending = self.pending.lock().await;
1063                pending.remove(&id);
1064                return Err(VmError::Runtime(format!(
1065                    "Bridge: host did not respond to '{method}' within {}s",
1066                    DEFAULT_TIMEOUT.as_secs()
1067                )));
1068            }
1069        };
1070
1071        if let Some(error) = response.get("error") {
1072            let message = error["message"].as_str().unwrap_or("Unknown host error");
1073            let code = error["code"].as_i64().unwrap_or(-1);
1074            // JSON-RPC -32001 signals the host rejected the tool (not permitted / not in allowlist).
1075            if code == -32001 {
1076                return Err(VmError::CategorizedError {
1077                    message: message.to_string(),
1078                    category: ErrorCategory::ToolRejected,
1079                });
1080            }
1081            return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
1082        }
1083
1084        Ok(response["result"].clone())
1085    }
1086
1087    /// Send a JSON-RPC notification to the host (no response expected).
1088    /// Serialized through the stdout mutex to prevent interleaving.
1089    pub fn notify(&self, method: &str, params: serde_json::Value) {
1090        let notification = crate::jsonrpc::notification(method, params);
1091        if self.in_process.is_some() {
1092            return;
1093        }
1094        if let Ok(line) = serde_json::to_string(&notification) {
1095            let _ = self.write_line(&line);
1096        }
1097    }
1098
1099    /// Check if the host has sent a cancel notification.
1100    pub fn is_cancelled(&self) -> bool {
1101        self.cancelled.load(Ordering::SeqCst)
1102    }
1103
1104    pub fn take_resume_signal(&self) -> bool {
1105        self.resume_requested.swap(false, Ordering::SeqCst)
1106    }
1107
1108    pub fn signal_resume(&self) {
1109        self.resume_requested.store(true, Ordering::SeqCst);
1110    }
1111
1112    pub fn set_daemon_idle(&self, idle: bool) {
1113        self.daemon_idle.store(idle, Ordering::SeqCst);
1114    }
1115
1116    pub fn is_daemon_idle(&self) -> bool {
1117        self.daemon_idle.load(Ordering::SeqCst)
1118    }
1119
1120    /// Record the canonical ACP `stopReason` for the current prompt. The
1121    /// last writer wins, which matches the semantic that an outer
1122    /// `agent_loop` (the one whose result the user observes) always
1123    /// finalizes after any inner loops it spawned.
1124    pub fn set_prompt_stop_reason(&self, reason: &str) {
1125        *self
1126            .prompt_stop_reason
1127            .lock()
1128            .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
1129    }
1130
1131    /// Consume any prompt stop reason recorded during this prompt. The
1132    /// ACP adapter calls this once after the pipeline returns; pipelines
1133    /// that didn't run an `agent_loop` see `None` and the adapter falls
1134    /// back to `end_turn`.
1135    pub fn take_prompt_stop_reason(&self) -> Option<String> {
1136        self.prompt_stop_reason
1137            .lock()
1138            .unwrap_or_else(|e| e.into_inner())
1139            .take()
1140    }
1141
1142    /// Consume any pending `skills/update` signal the host has sent.
1143    /// Returns `true` exactly once per notification, letting callers
1144    /// trigger a layered-discovery rebuild without polling false
1145    /// positives. See issue #73 for the hot-reload contract.
1146    pub fn take_skills_reload_signal(&self) -> bool {
1147        self.skills_reload_requested.swap(false, Ordering::SeqCst)
1148    }
1149
1150    /// Manually mark the skill catalog as stale. Used by tests and by
1151    /// the CLI when an internal event (e.g. `harn install`) should
1152    /// trigger the same rebuild a `skills/update` notification would.
1153    pub fn signal_skills_reload(&self) {
1154        self.skills_reload_requested.store(true, Ordering::SeqCst);
1155    }
1156
1157    /// Call the host's `skills/list` RPC and return the raw JSON array
1158    /// it responded with. Shape:
1159    /// `[{ "id": "...", "name": "...", "description": "...", "source": "..." }, ...]`.
1160    /// The CLI adapter converts each entry into a
1161    /// [`crate::skills::SkillManifestRef`].
1162    pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
1163        let result = self.call("skills/list", serde_json::json!({})).await?;
1164        match result {
1165            serde_json::Value::Array(items) => Ok(items),
1166            serde_json::Value::Object(map) => match map.get("skills") {
1167                Some(serde_json::Value::Array(items)) => Ok(items.clone()),
1168                _ => Err(VmError::Runtime(
1169                    "skills/list: host response must be an array or { skills: [...] }".into(),
1170                )),
1171            },
1172            _ => Err(VmError::Runtime(
1173                "skills/list: unexpected response shape".into(),
1174            )),
1175        }
1176    }
1177
1178    /// Call the host's `host/tools/list` RPC and return normalized tool
1179    /// descriptors. Shape:
1180    /// `[{ "name": "...", "description": "...", "schema": {...}, "deprecated": false }, ...]`.
1181    /// The bridge also accepts `{ "tools": [...] }` and
1182    /// `{ "result": { "tools": [...] } }` wrappers for lenient hosts.
1183    pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
1184        let result = self.call("host/tools/list", serde_json::json!({})).await?;
1185        parse_host_tools_list_response(result)
1186    }
1187
1188    /// Call the host's `skills/fetch` RPC for one skill id. Returns the
1189    /// raw JSON body so the CLI can inspect both the frontmatter fields
1190    /// and the skill markdown body in whatever shape the host sends.
1191    pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
1192        self.call("skills/fetch", serde_json::json!({ "id": id }))
1193            .await
1194    }
1195
1196    pub fn injection_state(&self) -> HostBridgeInjectionState {
1197        self.queued_transcript_injections.clone()
1198    }
1199
1200    pub async fn push_pending_user_message(
1201        &self,
1202        content: String,
1203        transcript_content: serde_json::Value,
1204        mode: &str,
1205    ) -> String {
1206        self.queued_transcript_injections
1207            .push_pending_user_message(content, transcript_content, mode)
1208            .await
1209    }
1210
1211    pub async fn push_queued_user_message(&self, content: String, mode: &str) -> String {
1212        self.push_pending_user_message(content.clone(), serde_json::Value::String(content), mode)
1213            .await
1214    }
1215
1216    pub async fn revoke_pending_user_message(
1217        &self,
1218        message_id: &str,
1219    ) -> PendingUserMessageMutationResult {
1220        self.queued_transcript_injections
1221            .revoke_pending_user_message(message_id)
1222            .await
1223    }
1224
1225    pub async fn revoke_pending_reminder(
1226        &self,
1227        reminder_id: &str,
1228    ) -> PendingReminderMutationResult {
1229        self.queued_transcript_injections
1230            .revoke_pending_reminder(reminder_id)
1231            .await
1232    }
1233
1234    pub async fn pending_injections_json(&self) -> serde_json::Value {
1235        self.queued_transcript_injections
1236            .pending_injections_json()
1237            .await
1238    }
1239
1240    pub async fn replace_pending_user_message(
1241        &self,
1242        message_id: &str,
1243        content: String,
1244        transcript_content: serde_json::Value,
1245    ) -> PendingUserMessageMutationResult {
1246        self.queued_transcript_injections
1247            .replace_pending_user_message(message_id, content, transcript_content)
1248            .await
1249    }
1250
1251    pub async fn push_queued_session_remind_from_params(
1252        &self,
1253        params: &serde_json::Value,
1254    ) -> Result<String, String> {
1255        let reminder = queued_session_remind_from_params(params)?;
1256        let reminder_id = reminder.reminder.id.clone();
1257        self.queued_transcript_injections
1258            .push_session_reminder(reminder)
1259            .await;
1260        Ok(reminder_id)
1261    }
1262
1263    pub async fn take_queued_user_messages(
1264        &self,
1265        include_interrupt_immediate: bool,
1266        include_finish_step: bool,
1267        include_audit_only: bool,
1268    ) -> Vec<QueuedUserMessage> {
1269        let mut state = self.queued_transcript_injections.inner.lock().await;
1270        let mut selected = Vec::new();
1271        let mut retained = VecDeque::new();
1272        while let Some(injection) = state.queue.pop_front() {
1273            let should_take = match injection.mode() {
1274                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1275                QueuedUserMessageMode::FinishStep => include_finish_step,
1276                QueuedUserMessageMode::AuditOnly => include_audit_only,
1277            };
1278            match (should_take, injection) {
1279                (true, QueuedTranscriptInjection::User(message)) => {
1280                    state
1281                        .delivered_user_message_ids
1282                        .insert(message.message_id.clone());
1283                    selected.push(message);
1284                }
1285                (_, injection) => retained.push_back(injection),
1286            }
1287        }
1288        state.queue = retained;
1289        selected
1290    }
1291
1292    pub async fn take_queued_transcript_injections(
1293        &self,
1294        include_interrupt_immediate: bool,
1295        include_finish_step: bool,
1296        include_audit_only: bool,
1297    ) -> Vec<QueuedTranscriptInjection> {
1298        let mut state = self.queued_transcript_injections.inner.lock().await;
1299        let mut selected = Vec::new();
1300        let mut retained = VecDeque::new();
1301        while let Some(injection) = state.queue.pop_front() {
1302            let should_take = match injection.mode() {
1303                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1304                QueuedUserMessageMode::FinishStep => include_finish_step,
1305                QueuedUserMessageMode::AuditOnly => include_audit_only,
1306            };
1307            if should_take {
1308                match &injection {
1309                    QueuedTranscriptInjection::User(message) => {
1310                        state
1311                            .delivered_user_message_ids
1312                            .insert(message.message_id.clone());
1313                    }
1314                    QueuedTranscriptInjection::Reminder(reminder) => {
1315                        state
1316                            .delivered_reminder_ids
1317                            .insert(reminder.reminder.id.clone());
1318                    }
1319                }
1320                selected.push(injection);
1321            } else {
1322                retained.push_back(injection);
1323            }
1324        }
1325        state.queue = retained;
1326        selected
1327    }
1328
1329    pub async fn take_queued_user_messages_for(
1330        &self,
1331        checkpoint: DeliveryCheckpoint,
1332    ) -> Vec<QueuedUserMessage> {
1333        match checkpoint {
1334            DeliveryCheckpoint::InterruptImmediate => {
1335                self.take_queued_user_messages(true, false, false).await
1336            }
1337            DeliveryCheckpoint::AfterCurrentOperation => {
1338                self.take_queued_user_messages(false, true, false).await
1339            }
1340            DeliveryCheckpoint::EndOfInteraction => {
1341                self.take_queued_user_messages(false, false, true).await
1342            }
1343        }
1344    }
1345
1346    pub async fn take_queued_transcript_injections_for(
1347        &self,
1348        checkpoint: DeliveryCheckpoint,
1349    ) -> Vec<QueuedTranscriptInjection> {
1350        match checkpoint {
1351            DeliveryCheckpoint::InterruptImmediate => {
1352                self.take_queued_transcript_injections(true, false, false)
1353                    .await
1354            }
1355            DeliveryCheckpoint::AfterCurrentOperation => {
1356                self.take_queued_transcript_injections(false, true, false)
1357                    .await
1358            }
1359            DeliveryCheckpoint::EndOfInteraction => {
1360                self.take_queued_transcript_injections(false, false, true)
1361                    .await
1362            }
1363        }
1364    }
1365
1366    /// Send an output notification (for log/print in bridge mode).
1367    pub fn send_output(&self, text: &str) {
1368        self.notify("output", serde_json::json!({"text": text}));
1369    }
1370
1371    /// Send a progress notification with optional numeric progress and structured data.
1372    pub fn send_progress(
1373        &self,
1374        phase: &str,
1375        message: &str,
1376        progress: Option<i64>,
1377        total: Option<i64>,
1378        data: Option<serde_json::Value>,
1379    ) {
1380        let mut payload = serde_json::json!({"phase": phase, "message": message});
1381        if let Some(p) = progress {
1382            payload["progress"] = serde_json::json!(p);
1383        }
1384        if let Some(t) = total {
1385            payload["total"] = serde_json::json!(t);
1386        }
1387        if let Some(d) = data {
1388            payload["data"] = d;
1389        }
1390        self.notify("progress", payload);
1391    }
1392
1393    /// Send a structured log notification.
1394    pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1395        let mut payload = serde_json::json!({"level": level, "message": message});
1396        if let Some(f) = fields {
1397            payload["fields"] = f;
1398        }
1399        self.notify("log", payload);
1400    }
1401
1402    /// Send a `session/update` with `call_start` — signals the beginning of
1403    /// an LLM call, tool call, or builtin call for observability.
1404    pub fn send_call_start(
1405        &self,
1406        call_id: &str,
1407        call_type: &str,
1408        name: &str,
1409        metadata: serde_json::Value,
1410    ) {
1411        let session_id = self.get_session_id();
1412        let script = self.get_script_name();
1413        let stream_publicly = metadata
1414            .get("stream_publicly")
1415            .and_then(|value| value.as_bool())
1416            .unwrap_or(true);
1417        self.visible_call_streams
1418            .lock()
1419            .unwrap_or_else(|e| e.into_inner())
1420            .insert(call_id.to_string(), stream_publicly);
1421        self.notify(
1422            "session/update",
1423            serde_json::json!({
1424                "sessionId": session_id,
1425                "update": {
1426                    "sessionUpdate": "call_start",
1427                    "content": {
1428                        "toolCallId": call_id,
1429                        "call_type": call_type,
1430                        "name": name,
1431                        "script": script,
1432                        "metadata": metadata,
1433                    },
1434                },
1435            }),
1436        );
1437    }
1438
1439    /// Send a `session/update` with `call_progress` — a streaming token delta
1440    /// from an in-flight LLM call.
1441    pub fn send_call_progress(
1442        &self,
1443        call_id: &str,
1444        delta: &str,
1445        accumulated_tokens: u64,
1446        user_visible: bool,
1447    ) {
1448        let session_id = self.get_session_id();
1449        let (visible_text, visible_delta) = {
1450            let stream_publicly = self
1451                .visible_call_streams
1452                .lock()
1453                .unwrap_or_else(|e| e.into_inner())
1454                .get(call_id)
1455                .copied()
1456                .unwrap_or(true);
1457            let mut states = self
1458                .visible_call_states
1459                .lock()
1460                .unwrap_or_else(|e| e.into_inner());
1461            let state = states.entry(call_id.to_string()).or_default();
1462            state.push(delta, stream_publicly)
1463        };
1464        self.notify(
1465            "session/update",
1466            serde_json::json!({
1467                "sessionId": session_id,
1468                "update": {
1469                    "sessionUpdate": "call_progress",
1470                    "content": {
1471                        "toolCallId": call_id,
1472                        "delta": delta,
1473                        "accumulated_tokens": accumulated_tokens,
1474                        "visible_text": visible_text,
1475                        "visible_delta": visible_delta,
1476                        "user_visible": user_visible,
1477                    },
1478                },
1479            }),
1480        );
1481    }
1482
1483    /// Send a `session/update` with `call_end` — signals completion of a call.
1484    pub fn send_call_end(
1485        &self,
1486        call_id: &str,
1487        call_type: &str,
1488        name: &str,
1489        duration_ms: u64,
1490        status: &str,
1491        metadata: serde_json::Value,
1492    ) {
1493        let session_id = self.get_session_id();
1494        let script = self.get_script_name();
1495        self.visible_call_states
1496            .lock()
1497            .unwrap_or_else(|e| e.into_inner())
1498            .remove(call_id);
1499        self.visible_call_streams
1500            .lock()
1501            .unwrap_or_else(|e| e.into_inner())
1502            .remove(call_id);
1503        self.notify(
1504            "session/update",
1505            serde_json::json!({
1506                "sessionId": session_id,
1507                "update": {
1508                    "sessionUpdate": "call_end",
1509                    "content": {
1510                        "toolCallId": call_id,
1511                        "call_type": call_type,
1512                        "name": name,
1513                        "script": script,
1514                        "duration_ms": duration_ms,
1515                        "status": status,
1516                        "metadata": metadata,
1517                    },
1518                },
1519            }),
1520        );
1521    }
1522
1523    /// Send a worker lifecycle update for delegated/background execution.
1524    pub fn send_worker_update(
1525        &self,
1526        worker_id: &str,
1527        worker_name: &str,
1528        status: &str,
1529        metadata: serde_json::Value,
1530        audit: Option<&MutationSessionRecord>,
1531    ) {
1532        let session_id = self.get_session_id();
1533        let script = self.get_script_name();
1534        let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1535        let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1536        let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1537        let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1538        let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1539        let lifecycle = serde_json::json!({
1540            "event": status,
1541            "worker_id": worker_id,
1542            "worker_name": worker_name,
1543            "started_at": started_at,
1544            "finished_at": finished_at,
1545        });
1546        self.notify(
1547            "session/update",
1548            serde_json::json!({
1549                "sessionId": session_id,
1550                "update": {
1551                    "sessionUpdate": "worker_update",
1552                    "content": {
1553                        "worker_id": worker_id,
1554                        "worker_name": worker_name,
1555                        "status": status,
1556                        "script": script,
1557                        "started_at": started_at,
1558                        "finished_at": finished_at,
1559                        "snapshot_path": snapshot_path,
1560                        "run_id": run_id,
1561                        "run_path": run_path,
1562                        "lifecycle": lifecycle,
1563                        "audit": audit,
1564                        "metadata": metadata,
1565                    },
1566                },
1567            }),
1568        );
1569    }
1570}
1571
1572/// Convert a serde_json::Value to a VmValue.
1573pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1574    crate::stdlib::json_to_vm_value(val)
1575}
1576
1577fn parse_host_tools_list_response(
1578    result: serde_json::Value,
1579) -> Result<Vec<serde_json::Value>, VmError> {
1580    let tools = match result {
1581        serde_json::Value::Array(items) => items,
1582        serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1583            map.get("result")
1584                .and_then(|value| value.get("tools"))
1585                .cloned()
1586        }) {
1587            Some(serde_json::Value::Array(items)) => items,
1588            _ => {
1589                return Err(VmError::Runtime(
1590                    "host/tools/list: host response must be an array or { tools: [...] }".into(),
1591                ));
1592            }
1593        },
1594        _ => {
1595            return Err(VmError::Runtime(
1596                "host/tools/list: unexpected response shape".into(),
1597            ));
1598        }
1599    };
1600
1601    let mut normalized = Vec::with_capacity(tools.len());
1602    for tool in tools {
1603        let serde_json::Value::Object(map) = tool else {
1604            return Err(VmError::Runtime(
1605                "host/tools/list: every tool must be an object".into(),
1606            ));
1607        };
1608        let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1609            return Err(VmError::Runtime(
1610                "host/tools/list: every tool must include a string `name`".into(),
1611            ));
1612        };
1613        let description = map
1614            .get("description")
1615            .and_then(|value| value.as_str())
1616            .or_else(|| {
1617                map.get("short_description")
1618                    .and_then(|value| value.as_str())
1619            })
1620            .unwrap_or_default();
1621        let schema = map
1622            .get("schema")
1623            .cloned()
1624            .or_else(|| map.get("parameters").cloned())
1625            .or_else(|| map.get("input_schema").cloned())
1626            .unwrap_or(serde_json::Value::Null);
1627        let deprecated = map
1628            .get("deprecated")
1629            .and_then(|value| value.as_bool())
1630            .unwrap_or(false);
1631        normalized.push(serde_json::json!({
1632            "name": name,
1633            "description": description,
1634            "schema": schema,
1635            "deprecated": deprecated,
1636        }));
1637    }
1638    Ok(normalized)
1639}
1640
1641#[cfg(test)]
1642mod tests {
1643    use super::*;
1644
1645    fn test_bridge() -> HostBridge {
1646        HostBridge::from_parts(
1647            Arc::new(Mutex::new(HashMap::new())),
1648            Arc::new(AtomicBool::new(false)),
1649            Arc::new(std::sync::Mutex::new(())),
1650            1,
1651        )
1652    }
1653
1654    fn test_bridge_sharing_injection_state(owner: &HostBridge) -> HostBridge {
1655        HostBridge::from_parts_with_writer_cancel_notify_and_injection_state(
1656            Arc::new(Mutex::new(HashMap::new())),
1657            Arc::new(AtomicBool::new(false)),
1658            Arc::new(Notify::new()),
1659            Arc::new(|_| Ok(())),
1660            100,
1661            Some(owner.injection_state()),
1662        )
1663    }
1664
1665    #[test]
1666    fn test_json_rpc_request_format() {
1667        let request = crate::jsonrpc::request(
1668            1,
1669            "llm_call",
1670            serde_json::json!({
1671                "prompt": "Hello",
1672                "system": "Be helpful",
1673            }),
1674        );
1675        let s = serde_json::to_string(&request).unwrap();
1676        assert!(s.contains("\"jsonrpc\":\"2.0\""));
1677        assert!(s.contains("\"id\":1"));
1678        assert!(s.contains("\"method\":\"llm_call\""));
1679    }
1680
1681    #[test]
1682    fn test_json_rpc_notification_format() {
1683        let notification =
1684            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
1685        let s = serde_json::to_string(&notification).unwrap();
1686        assert!(s.contains("\"method\":\"output\""));
1687        assert!(!s.contains("\"id\""));
1688    }
1689
1690    #[test]
1691    fn test_json_rpc_error_response_parsing() {
1692        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
1693        assert!(response.get("error").is_some());
1694        assert_eq!(
1695            response["error"]["message"].as_str().unwrap(),
1696            "Invalid request"
1697        );
1698    }
1699
1700    #[test]
1701    fn test_json_rpc_success_response_parsing() {
1702        let response = crate::jsonrpc::response(
1703            1,
1704            serde_json::json!({
1705                "text": "Hello world",
1706                "input_tokens": 10,
1707                "output_tokens": 5,
1708            }),
1709        );
1710        assert!(response.get("result").is_some());
1711        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
1712    }
1713
1714    #[test]
1715    fn test_cancelled_flag() {
1716        let cancelled = Arc::new(AtomicBool::new(false));
1717        assert!(!cancelled.load(Ordering::SeqCst));
1718        cancelled.store(true, Ordering::SeqCst);
1719        assert!(cancelled.load(Ordering::SeqCst));
1720    }
1721
1722    #[test]
1723    fn pending_host_calls_return_when_cancellation_arrives() {
1724        let runtime = tokio::runtime::Builder::new_current_thread()
1725            .enable_all()
1726            .build()
1727            .unwrap();
1728        runtime.block_on(async {
1729            let pending = Arc::new(Mutex::new(HashMap::new()));
1730            let cancelled = Arc::new(AtomicBool::new(false));
1731            let bridge = HostBridge::from_parts_with_writer(
1732                pending.clone(),
1733                cancelled.clone(),
1734                Arc::new(|_| Ok(())),
1735                1,
1736            );
1737
1738            let call = bridge.call("host/work", serde_json::json!({}));
1739            tokio::pin!(call);
1740
1741            loop {
1742                tokio::select! {
1743                    result = &mut call => panic!("call completed before cancellation: {result:?}"),
1744                    _ = tokio::task::yield_now() => {}
1745                }
1746                if !pending.lock().await.is_empty() {
1747                    break;
1748                }
1749            }
1750
1751            cancelled.store(true, Ordering::SeqCst);
1752            bridge.cancel_notify.notify_waiters();
1753
1754            let result = tokio::time::timeout(Duration::from_secs(1), call)
1755                .await
1756                .expect("pending call should observe cancellation promptly");
1757            assert!(
1758                matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
1759            );
1760            assert!(pending.lock().await.is_empty());
1761        });
1762    }
1763
1764    #[test]
1765    fn queued_messages_are_filtered_by_delivery_mode() {
1766        let runtime = tokio::runtime::Builder::new_current_thread()
1767            .enable_all()
1768            .build()
1769            .unwrap();
1770        runtime.block_on(async {
1771            let bridge = test_bridge();
1772            bridge
1773                .push_queued_user_message("first".to_string(), "finish_step")
1774                .await;
1775            bridge
1776                .push_queued_user_message("second".to_string(), "audit_only")
1777                .await;
1778
1779            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1780            assert_eq!(finish_step.len(), 1);
1781            assert_eq!(finish_step[0].content, "first");
1782
1783            let audit_only = bridge.take_queued_user_messages(false, false, true).await;
1784            assert_eq!(audit_only.len(), 1);
1785            assert_eq!(audit_only[0].content, "second");
1786        });
1787    }
1788
1789    #[test]
1790    fn pending_user_messages_support_revoke_replace_and_delivery_states() {
1791        let runtime = tokio::runtime::Builder::new_current_thread()
1792            .enable_all()
1793            .build()
1794            .unwrap();
1795        runtime.block_on(async {
1796            let bridge = test_bridge();
1797            let first_id = bridge
1798                .push_pending_user_message(
1799                    "first".to_string(),
1800                    serde_json::json!("first"),
1801                    "audit_only",
1802                )
1803                .await;
1804            let second_id = bridge
1805                .push_pending_user_message(
1806                    "second".to_string(),
1807                    serde_json::json!("second"),
1808                    "audit_only",
1809                )
1810                .await;
1811
1812            assert_eq!(
1813                bridge
1814                    .replace_pending_user_message(
1815                        &second_id,
1816                        "second edited".to_string(),
1817                        serde_json::json!("second edited"),
1818                    )
1819                    .await,
1820                PendingUserMessageMutationResult::Mutated
1821            );
1822            assert_eq!(
1823                bridge.revoke_pending_user_message(&first_id).await,
1824                PendingUserMessageMutationResult::Mutated
1825            );
1826            assert_eq!(
1827                bridge.revoke_pending_user_message(&first_id).await,
1828                PendingUserMessageMutationResult::AlreadyRevoked
1829            );
1830
1831            let delivered = bridge
1832                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1833                .await;
1834            assert_eq!(delivered.len(), 1);
1835            assert_eq!(delivered[0].message_id, second_id);
1836            assert_eq!(delivered[0].content, "second edited");
1837
1838            assert_eq!(
1839                bridge.revoke_pending_user_message(&second_id).await,
1840                PendingUserMessageMutationResult::AlreadyDelivered
1841            );
1842            assert_eq!(
1843                bridge
1844                    .replace_pending_user_message(
1845                        &second_id,
1846                        "too late".to_string(),
1847                        serde_json::json!("too late"),
1848                    )
1849                    .await,
1850                PendingUserMessageMutationResult::AlreadyDelivered
1851            );
1852            assert_eq!(
1853                bridge.revoke_pending_user_message("missing").await,
1854                PendingUserMessageMutationResult::UnknownMessageId
1855            );
1856        });
1857    }
1858
1859    #[test]
1860    fn pending_user_message_replace_preserves_fifo_position_and_mode() {
1861        let runtime = tokio::runtime::Builder::new_current_thread()
1862            .enable_all()
1863            .build()
1864            .unwrap();
1865        runtime.block_on(async {
1866            let bridge = test_bridge();
1867            let first_id = bridge
1868                .push_pending_user_message(
1869                    "first".to_string(),
1870                    serde_json::json!("first"),
1871                    "finish_step",
1872                )
1873                .await;
1874            let second_id = bridge
1875                .push_pending_user_message(
1876                    "second".to_string(),
1877                    serde_json::json!("second"),
1878                    "finish_step",
1879                )
1880                .await;
1881            assert_eq!(
1882                bridge
1883                    .replace_pending_user_message(
1884                        &first_id,
1885                        "first edited".to_string(),
1886                        serde_json::json!("first edited"),
1887                    )
1888                    .await,
1889                PendingUserMessageMutationResult::Mutated
1890            );
1891
1892            let delivered = bridge
1893                .take_queued_user_messages_for(DeliveryCheckpoint::AfterCurrentOperation)
1894                .await;
1895            assert_eq!(
1896                delivered
1897                    .iter()
1898                    .map(|message| (&message.message_id, message.content.as_str(), message.mode))
1899                    .collect::<Vec<_>>(),
1900                vec![
1901                    (&first_id, "first edited", QueuedUserMessageMode::FinishStep,),
1902                    (&second_id, "second", QueuedUserMessageMode::FinishStep),
1903                ]
1904            );
1905        });
1906    }
1907
1908    #[test]
1909    fn pending_user_message_state_survives_bridge_replacement() {
1910        let runtime = tokio::runtime::Builder::new_current_thread()
1911            .enable_all()
1912            .build()
1913            .unwrap();
1914        runtime.block_on(async {
1915            let bridge = test_bridge();
1916            let revoked_id = bridge
1917                .push_pending_user_message(
1918                    "revoke me".to_string(),
1919                    serde_json::json!("revoke me"),
1920                    "audit_only",
1921                )
1922                .await;
1923            let delivered_id = bridge
1924                .push_pending_user_message(
1925                    "deliver me".to_string(),
1926                    serde_json::json!("deliver me"),
1927                    "audit_only",
1928                )
1929                .await;
1930            assert_eq!(
1931                bridge.revoke_pending_user_message(&revoked_id).await,
1932                PendingUserMessageMutationResult::Mutated
1933            );
1934            bridge.cancelled.store(true, Ordering::SeqCst);
1935
1936            let replacement_bridge = test_bridge_sharing_injection_state(&bridge);
1937            assert_eq!(
1938                replacement_bridge
1939                    .revoke_pending_user_message(&revoked_id)
1940                    .await,
1941                PendingUserMessageMutationResult::AlreadyRevoked
1942            );
1943            let delivered = replacement_bridge
1944                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1945                .await;
1946            assert_eq!(delivered.len(), 1);
1947            assert_eq!(delivered[0].message_id, delivered_id);
1948            assert_eq!(delivered[0].content, "deliver me");
1949            assert_eq!(
1950                bridge.revoke_pending_user_message(&delivered_id).await,
1951                PendingUserMessageMutationResult::AlreadyDelivered
1952            );
1953        });
1954    }
1955
1956    #[test]
1957    fn queued_transcript_injections_preserve_user_reminder_separation() {
1958        let runtime = tokio::runtime::Builder::new_current_thread()
1959            .enable_all()
1960            .build()
1961            .unwrap();
1962        runtime.block_on(async {
1963            let bridge = test_bridge();
1964            bridge
1965                .push_queued_user_message("human follow-up".to_string(), "finish_step")
1966                .await;
1967            let reminder_id = bridge
1968                .push_queued_session_remind_from_params(&serde_json::json!({
1969                    "body": "Host-provided ambient context.",
1970                    "tags": ["host"],
1971                    "dedupe_key": "host-context",
1972                    "ttl_turns": 2,
1973                    "mode": "audit_only",
1974                    "_meta": {"harn": {"source": "test"}},
1975                }))
1976                .await
1977                .expect("valid reminder");
1978
1979            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1980            assert_eq!(finish_step.len(), 1);
1981            assert_eq!(finish_step[0].content, "human follow-up");
1982
1983            let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
1984            assert!(no_user_messages.is_empty());
1985
1986            let injections = bridge
1987                .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
1988                .await;
1989            assert_eq!(injections.len(), 1);
1990            let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
1991                panic!("expected queued reminder");
1992            };
1993            assert_eq!(reminder.reminder.id, reminder_id);
1994            assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
1995            assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
1996            assert_eq!(
1997                reminder.reminder.dedupe_key.as_deref(),
1998                Some("host-context")
1999            );
2000            assert_eq!(reminder.reminder.ttl_turns, Some(2));
2001            assert_eq!(
2002                reminder.reminder.source,
2003                crate::llm::helpers::ReminderSource::Bridge
2004            );
2005        });
2006    }
2007
2008    #[test]
2009    fn pending_injections_list_user_messages_and_reminders_in_fifo_order() {
2010        let runtime = tokio::runtime::Builder::new_current_thread()
2011            .enable_all()
2012            .build()
2013            .unwrap();
2014        runtime.block_on(async {
2015            let bridge = test_bridge();
2016            let message_id = bridge
2017                .push_pending_user_message(
2018                    "human follow-up".to_string(),
2019                    serde_json::json!([{"type": "text", "text": "human follow-up"}]),
2020                    "finish_step",
2021                )
2022                .await;
2023            let reminder_id = bridge
2024                .push_queued_session_remind_from_params(&serde_json::json!({
2025                    "id": "rem-test",
2026                    "body": "Host reminder",
2027                    "tags": ["host"],
2028                    "dedupe_key": "host-reminder",
2029                    "ttl_turns": 2,
2030                    "mode": "interrupt_immediate",
2031                }))
2032                .await
2033                .expect("valid session/remind payload");
2034
2035            let pending = bridge.pending_injections_json().await;
2036            assert_eq!(pending["pendingCount"], 2);
2037            assert_eq!(pending["injections"][0]["kind"], "user");
2038            assert_eq!(pending["injections"][0]["id"], message_id);
2039            assert_eq!(pending["injections"][0]["messageId"], message_id);
2040            assert_eq!(pending["injections"][0]["mode"], "finish_step");
2041            assert_eq!(pending["injections"][0]["position"], 0);
2042            assert_eq!(pending["injections"][1]["kind"], "reminder");
2043            assert_eq!(pending["injections"][1]["id"], reminder_id);
2044            assert_eq!(pending["injections"][1]["reminderId"], "rem-test");
2045            assert_eq!(pending["injections"][1]["mode"], "interrupt_immediate");
2046            assert_eq!(pending["injections"][1]["body"], "Host reminder");
2047            assert_eq!(pending["injections"][1]["dedupeKey"], "host-reminder");
2048            assert_eq!(pending["injections"][1]["ttlTurns"], 2);
2049            assert_eq!(pending["injections"][1]["position"], 1);
2050        });
2051    }
2052
2053    #[test]
2054    fn pending_reminders_support_revoke_and_delivery_states() {
2055        let runtime = tokio::runtime::Builder::new_current_thread()
2056            .enable_all()
2057            .build()
2058            .unwrap();
2059        runtime.block_on(async {
2060            let bridge = test_bridge();
2061            let revoked_id = bridge
2062                .push_queued_session_remind_from_params(&serde_json::json!({
2063                    "id": "rem-revoke",
2064                    "body": "remove me",
2065                    "mode": "finish_step",
2066                }))
2067                .await
2068                .expect("valid session/remind payload");
2069            let delivered_id = bridge
2070                .push_queued_session_remind_from_params(&serde_json::json!({
2071                    "id": "rem-deliver",
2072                    "body": "deliver me",
2073                    "mode": "finish_step",
2074                }))
2075                .await
2076                .expect("valid session/remind payload");
2077
2078            assert_eq!(
2079                bridge.revoke_pending_reminder(&revoked_id).await,
2080                PendingReminderMutationResult::Mutated
2081            );
2082            assert_eq!(
2083                bridge.revoke_pending_reminder(&revoked_id).await,
2084                PendingReminderMutationResult::AlreadyRevoked
2085            );
2086
2087            let pending = bridge.pending_injections_json().await;
2088            assert_eq!(pending["pendingCount"], 1);
2089            assert_eq!(pending["injections"][0]["reminderId"], delivered_id);
2090
2091            let delivered = bridge
2092                .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
2093                .await;
2094            assert_eq!(delivered.len(), 1);
2095            let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2096                panic!("expected delivered reminder");
2097            };
2098            assert_eq!(reminder.reminder.id, delivered_id);
2099
2100            assert_eq!(
2101                bridge.revoke_pending_reminder(&delivered_id).await,
2102                PendingReminderMutationResult::AlreadyDelivered
2103            );
2104            assert_eq!(
2105                bridge.revoke_pending_reminder("missing").await,
2106                PendingReminderMutationResult::UnknownReminderId
2107            );
2108        });
2109    }
2110
2111    #[test]
2112    fn bridge_remind_modes_honor_delivery_checkpoints() {
2113        let runtime = tokio::runtime::Builder::new_current_thread()
2114            .enable_all()
2115            .build()
2116            .unwrap();
2117        runtime.block_on(async {
2118            let cases = [
2119                (
2120                    "interrupt_immediate",
2121                    DeliveryCheckpoint::InterruptImmediate,
2122                    DeliveryCheckpoint::AfterCurrentOperation,
2123                ),
2124                (
2125                    "finish_step",
2126                    DeliveryCheckpoint::AfterCurrentOperation,
2127                    DeliveryCheckpoint::EndOfInteraction,
2128                ),
2129                (
2130                    "audit_only",
2131                    DeliveryCheckpoint::EndOfInteraction,
2132                    DeliveryCheckpoint::InterruptImmediate,
2133                ),
2134            ];
2135
2136            for (mode, expected_checkpoint, wrong_checkpoint) in cases {
2137                let bridge = test_bridge();
2138                bridge
2139                    .push_queued_session_remind_from_params(&serde_json::json!({
2140                        "body": format!("Reminder for {mode}"),
2141                        "mode": mode,
2142                    }))
2143                    .await
2144                    .expect("valid session/remind payload");
2145
2146                let premature = bridge
2147                    .take_queued_transcript_injections_for(wrong_checkpoint)
2148                    .await;
2149                assert!(
2150                    premature.is_empty(),
2151                    "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
2152                );
2153
2154                let delivered = bridge
2155                    .take_queued_transcript_injections_for(expected_checkpoint)
2156                    .await;
2157                assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
2158                let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2159                    panic!("expected reminder for {mode}");
2160                };
2161                assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
2162            }
2163        });
2164    }
2165
2166    #[test]
2167    fn session_remind_validation_rejects_user_message_shape() {
2168        let err = queued_session_remind_from_params(&serde_json::json!({
2169            "content": "this is still a user message",
2170            "mode": "interrupt_immediate",
2171        }))
2172        .expect_err("session/remind must require a reminder body");
2173        assert!(err.contains(Code::ReminderInvalidShape.as_str()));
2174        assert!(err.contains("body"));
2175    }
2176
2177    #[test]
2178    fn session_remind_validation_rejects_unknown_options_separately() {
2179        let err = queued_session_remind_from_params(&serde_json::json!({
2180            "body": "valid body",
2181            "unknown_host_field": true,
2182        }))
2183        .expect_err("session/remind must reject unknown top-level fields");
2184        assert!(err.contains(Code::ReminderUnknownOption.as_str()));
2185        assert!(err.contains("unknown_host_field"));
2186    }
2187
2188    #[test]
2189    fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
2190        let err = queued_session_remind_from_params(&serde_json::json!({
2191            "body": "valid body",
2192            "propagate": "workspace",
2193        }))
2194        .expect_err("session/remind must reject unknown propagate values");
2195        assert!(err.contains(Code::ReminderUnknownPropagate.as_str()));
2196        assert!(err.contains("propagate"));
2197    }
2198
2199    #[test]
2200    fn test_json_result_to_vm_value_string() {
2201        let val = serde_json::json!("hello");
2202        let vm_val = json_result_to_vm_value(&val);
2203        assert_eq!(vm_val.display(), "hello");
2204    }
2205
2206    #[test]
2207    fn test_json_result_to_vm_value_dict() {
2208        let val = serde_json::json!({"name": "test", "count": 42});
2209        let vm_val = json_result_to_vm_value(&val);
2210        let VmValue::Dict(d) = &vm_val else {
2211            unreachable!("Expected Dict, got {:?}", vm_val);
2212        };
2213        assert_eq!(d.get("name").unwrap().display(), "test");
2214        assert_eq!(d.get("count").unwrap().display(), "42");
2215    }
2216
2217    #[test]
2218    fn test_json_result_to_vm_value_null() {
2219        let val = serde_json::json!(null);
2220        let vm_val = json_result_to_vm_value(&val);
2221        assert!(matches!(vm_val, VmValue::Nil));
2222    }
2223
2224    #[test]
2225    fn test_json_result_to_vm_value_nested() {
2226        let val = serde_json::json!({
2227            "text": "response",
2228            "tool_calls": [
2229                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
2230            ],
2231            "input_tokens": 100,
2232            "output_tokens": 50,
2233        });
2234        let vm_val = json_result_to_vm_value(&val);
2235        let VmValue::Dict(d) = &vm_val else {
2236            unreachable!("Expected Dict, got {:?}", vm_val);
2237        };
2238        assert_eq!(d.get("text").unwrap().display(), "response");
2239        let VmValue::List(list) = d.get("tool_calls").unwrap() else {
2240            unreachable!("Expected List for tool_calls");
2241        };
2242        assert_eq!(list.len(), 1);
2243    }
2244
2245    #[test]
2246    fn parse_host_tools_list_accepts_object_wrapper() {
2247        let tools = parse_host_tools_list_response(serde_json::json!({
2248            "tools": [
2249                {
2250                    "name": "Read",
2251                    "description": "Read a file",
2252                    "schema": {"type": "object"},
2253                }
2254            ]
2255        }))
2256        .expect("tool list");
2257
2258        assert_eq!(tools.len(), 1);
2259        assert_eq!(tools[0]["name"], "Read");
2260        assert_eq!(tools[0]["deprecated"], false);
2261    }
2262
2263    #[test]
2264    fn parse_host_tools_list_accepts_compat_fields() {
2265        let tools = parse_host_tools_list_response(serde_json::json!({
2266            "result": {
2267                "tools": [
2268                    {
2269                        "name": "Edit",
2270                        "short_description": "Apply an edit",
2271                        "input_schema": {"type": "object"},
2272                        "deprecated": true,
2273                    }
2274                ]
2275            }
2276        }))
2277        .expect("tool list");
2278
2279        assert_eq!(tools[0]["description"], "Apply an edit");
2280        assert_eq!(tools[0]["schema"]["type"], "object");
2281        assert_eq!(tools[0]["deprecated"], true);
2282    }
2283
2284    #[test]
2285    fn parse_host_tools_list_requires_tool_names() {
2286        let err = parse_host_tools_list_response(serde_json::json!({
2287            "tools": [
2288                {"description": "missing name"}
2289            ]
2290        }))
2291        .expect_err("expected error");
2292        assert!(err
2293            .to_string()
2294            .contains("host/tools/list: every tool must include a string `name`"));
2295    }
2296
2297    #[test]
2298    fn test_timeout_duration() {
2299        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
2300    }
2301}