Skip to main content

harn_vm/
bridge.rs

1//! JSON-RPC 2.0 bridge for host communication.
2//!
3//! When `harn run --bridge` is used, the VM delegates builtins (llm_call,
4//! file I/O, tool execution) to a host process over stdin/stdout JSON-RPC.
5//! The host application handles these requests using its own providers.
6
7use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::future::Future;
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::rc::Rc;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::Duration;
16
17use tokio::io::AsyncBufReadExt;
18use tokio::sync::{oneshot, Mutex, Notify};
19
20use harn_parser::diagnostic_codes::Code;
21
22use crate::orchestration::MutationSessionRecord;
23use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
24use crate::visible_text::VisibleTextState;
25use crate::vm::Vm;
26
27/// Default timeout for bridge calls (5 minutes).
28const DEFAULT_TIMEOUT: Duration = Duration::from_mins(5);
29
30pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
31
32fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
33    Arc::new(move |line: &str| {
34        let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
35        let mut stdout = std::io::stdout().lock();
36        stdout
37            .write_all(line.as_bytes())
38            .map_err(|e| format!("Bridge write error: {e}"))?;
39        stdout
40            .write_all(b"\n")
41            .map_err(|e| format!("Bridge write error: {e}"))?;
42        stdout
43            .flush()
44            .map_err(|e| format!("Bridge flush error: {e}"))?;
45        Ok(())
46    })
47}
48
49/// A JSON-RPC 2.0 bridge to a host process over stdin/stdout.
50///
51/// The bridge sends requests to the host on stdout and receives responses
52/// on stdin. A background task reads stdin and dispatches responses to
53/// waiting callers by request ID. All stdout writes are serialized through
54/// a mutex to prevent interleaving.
55pub struct HostBridge {
56    next_id: AtomicU64,
57    /// Pending request waiters, keyed by JSON-RPC id.
58    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
59    /// Whether the host has sent a cancel notification.
60    cancelled: Arc<AtomicBool>,
61    /// Wakes pending host calls when cancellation arrives.
62    cancel_notify: Arc<Notify>,
63    /// Transport writer used to send JSON-RPC to the host.
64    writer: HostBridgeWriter,
65    /// ACP session ID (set in ACP mode for session-scoped notifications).
66    session_id: std::sync::Mutex<String>,
67    /// Name of the currently executing Harn script (without .harn suffix).
68    script_name: std::sync::Mutex<String>,
69    /// Transcript injections queued by the host while a run is active.
70    queued_transcript_injections: HostBridgeInjectionState,
71    /// Host-triggered resume signal for daemon agents.
72    resume_requested: Arc<AtomicBool>,
73    /// Host-triggered skill-registry invalidation signal. Set when the
74    /// host sends a `skills/update` notification; consumed by the CLI
75    /// between runs (watch mode, long-running agents) to rebuild the
76    /// layered skill catalog from its current filesystem + host state.
77    skills_reload_requested: Arc<AtomicBool>,
78    /// Whether the current daemon-mode agent loop is blocked in idle wait.
79    daemon_idle: Arc<AtomicBool>,
80    /// Canonical ACP `stopReason` recorded by the most recent `agent_loop`
81    /// finalize during this prompt. Read once by the ACP adapter when the
82    /// pipeline returns and populated by `host_agent_session_finalize`.
83    /// Pipelines that don't run an agent loop leave this `None`, in which
84    /// case the adapter falls back to `end_turn`.
85    prompt_stop_reason: std::sync::Mutex<Option<String>>,
86    /// Per-call visible assistant text state for call_progress notifications.
87    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
88    /// Whether an LLM call's deltas should be exposed to end users while streaming.
89    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
90    /// Optional in-process host-module backend used by `harn playground`.
91    in_process: Option<InProcessHost>,
92}
93
94struct InProcessHost {
95    module_path: PathBuf,
96    exported_functions: BTreeMap<String, Rc<VmClosure>>,
97    vm: Vm,
98}
99
100impl InProcessHost {
101    /// Box-pin'd to break the static recursion between the VM's hot dispatch
102    /// loop and the bridge: a bridge-backed builtin spawns a child VM that
103    /// calls back into the dispatch loop via `call_closure_pub`. Indirecting
104    /// at this slow-path boundary keeps the recursion satisfied without
105    /// allocating per call in the hot per-callback path.
106    fn dispatch<'a>(
107        &'a self,
108        method: &'a str,
109        params: serde_json::Value,
110    ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, VmError>> + 'a>> {
111        Box::pin(async move {
112            match method {
113                "builtin_call" => {
114                    let name = params
115                        .get("name")
116                        .and_then(|value| value.as_str())
117                        .unwrap_or_default();
118                    let args = params
119                        .get("args")
120                        .and_then(|value| value.as_array())
121                        .cloned()
122                        .unwrap_or_default()
123                        .into_iter()
124                        .map(|value| json_result_to_vm_value(&value))
125                        .collect::<Vec<_>>();
126                    self.invoke_export(name, &args).await
127                }
128                "host/tools/list" => self
129                    .invoke_optional_export("host_tools_list", &[])
130                    .await
131                    .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
132                "session/request_permission" => self.request_permission(params).await,
133                other => Err(VmError::Runtime(format!(
134                    "playground host backend does not implement bridge method '{other}'"
135                ))),
136            }
137        })
138    }
139
140    async fn invoke_export(
141        &self,
142        name: &str,
143        args: &[VmValue],
144    ) -> Result<serde_json::Value, VmError> {
145        let Some(closure) = self.exported_functions.get(name) else {
146            return Err(VmError::Runtime(format!(
147                "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
148                self.module_path.display()
149            )));
150        };
151
152        let mut vm = self.vm.child_vm_for_host();
153        let result = vm.call_closure_pub(closure, args).await?;
154        Ok(crate::llm::vm_value_to_json(&result))
155    }
156
157    async fn invoke_optional_export(
158        &self,
159        name: &str,
160        args: &[VmValue],
161    ) -> Result<Option<serde_json::Value>, VmError> {
162        if !self.exported_functions.contains_key(name) {
163            return Ok(None);
164        }
165        self.invoke_export(name, args).await.map(Some)
166    }
167
168    async fn request_permission(
169        &self,
170        params: serde_json::Value,
171    ) -> Result<serde_json::Value, VmError> {
172        // No exported `request_permission` means the playground host has no
173        // approval policy, so it grants through the canonical ACP option.
174        let Some(closure) = self.exported_functions.get("request_permission") else {
175            return Ok(crate::llm::acp_permission::allow_response());
176        };
177
178        let tool_call = params.get("toolCall");
179        let tool_name = tool_call
180            .and_then(|tool_call| tool_call.pointer("/_meta/harn/toolName"))
181            .or_else(|| tool_call.and_then(|tool_call| tool_call.get("toolName")))
182            .or_else(|| tool_call.and_then(|tool_call| tool_call.get("title")))
183            .and_then(|value| value.as_str())
184            .unwrap_or_default();
185        let tool_args = tool_call
186            .and_then(|tool_call| tool_call.get("rawInput"))
187            .map(json_result_to_vm_value)
188            .unwrap_or(VmValue::Nil);
189        let full_payload = json_result_to_vm_value(&params);
190
191        let arg_count = closure.func.params.len();
192        let args = if arg_count >= 3 {
193            vec![
194                VmValue::String(Rc::from(tool_name.to_string())),
195                tool_args,
196                full_payload,
197            ]
198        } else if arg_count == 2 {
199            vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
200        } else if arg_count == 1 {
201            vec![full_payload]
202        } else {
203            Vec::new()
204        };
205
206        let mut vm = self.vm.child_vm_for_host();
207        let result = vm.call_closure_pub(closure, &args).await?;
208        // Translate the script's verdict into a canonical ACP response
209        // (`{ outcome: { outcome: "selected" | "cancelled", optionId? } }`).
210        // The script API stays ergonomic — bool / string-reason / dict — but
211        // the wire shape is canonical.
212        let payload = match result {
213            VmValue::Bool(granted) => {
214                if granted {
215                    crate::llm::acp_permission::allow_response()
216                } else {
217                    crate::llm::acp_permission::reject_response(None)
218                }
219            }
220            VmValue::String(reason) if !reason.is_empty() => {
221                crate::llm::acp_permission::reject_response(Some(reason.to_string()))
222            }
223            other => {
224                let json = crate::llm::vm_value_to_json(&other);
225                if let Some(granted) = json.get("granted").and_then(|value| value.as_bool()) {
226                    if granted {
227                        crate::llm::acp_permission::allow_response()
228                    } else {
229                        crate::llm::acp_permission::reject_response(
230                            json.get("reason")
231                                .and_then(|value| value.as_str())
232                                .map(str::to_string),
233                        )
234                    }
235                } else if json.get("outcome").is_some() {
236                    // The script already returned a canonical-shaped outcome.
237                    json
238                } else if other.is_truthy() {
239                    crate::llm::acp_permission::allow_response()
240                } else {
241                    crate::llm::acp_permission::reject_response(None)
242                }
243            }
244        };
245        Ok(payload)
246    }
247}
248
249/// How a queued bridge injection is delivered into the agent loop.
250///
251/// `AuditOnly` was previously called `WaitForCompletion`; the rename is
252/// truth-in-advertising (harn#2212). These injections drain at
253/// `loop_exit`, *after* the last LLM call has returned — so they land in
254/// the transcript audit but are **never rendered into a model prompt**.
255/// Hosts that want the model to react to the reminder on its final
256/// iteration should use `FinishStep` instead, which drains at every
257/// `iteration_start` / `post_tool_dispatch` / `iteration_end` checkpoint.
258#[derive(Clone, Copy, Debug, PartialEq, Eq)]
259pub enum QueuedUserMessageMode {
260    InterruptImmediate,
261    FinishStep,
262    AuditOnly,
263}
264
265#[derive(Clone, Copy, Debug, PartialEq, Eq)]
266pub enum DeliveryCheckpoint {
267    InterruptImmediate,
268    AfterCurrentOperation,
269    EndOfInteraction,
270}
271
272impl QueuedUserMessageMode {
273    fn from_str(value: &str) -> Self {
274        match value {
275            "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
276            "finish_step" | "after_current_operation" => Self::FinishStep,
277            // Unknown / missing modes fall through to the safest option:
278            // record for audit, do not preempt the loop. Pre-#2212 hosts
279            // that send `wait_for_completion` are caught by this arm —
280            // the canonical name is `audit_only` going forward.
281            _ => Self::AuditOnly,
282        }
283    }
284
285    fn as_str(self) -> &'static str {
286        match self {
287            Self::InterruptImmediate => "interrupt_immediate",
288            Self::FinishStep => "finish_step",
289            Self::AuditOnly => "audit_only",
290        }
291    }
292}
293
294#[derive(Clone, Debug, PartialEq, Eq)]
295pub struct QueuedUserMessage {
296    pub message_id: String,
297    pub content: String,
298    pub transcript_content: serde_json::Value,
299    pub mode: QueuedUserMessageMode,
300}
301
302#[derive(Clone, Debug, PartialEq, Eq)]
303pub struct QueuedReminder {
304    pub reminder: crate::llm::helpers::SystemReminder,
305    pub mode: QueuedUserMessageMode,
306}
307
308#[derive(Clone, Debug, PartialEq, Eq)]
309pub enum QueuedTranscriptInjection {
310    User(QueuedUserMessage),
311    Reminder(QueuedReminder),
312}
313
314#[derive(Debug, Default)]
315struct QueuedTranscriptInjections {
316    queue: VecDeque<QueuedTranscriptInjection>,
317    revoked_user_message_ids: HashSet<String>,
318    delivered_user_message_ids: HashSet<String>,
319    revoked_reminder_ids: HashSet<String>,
320    delivered_reminder_ids: HashSet<String>,
321}
322
323#[derive(Clone, Debug, Default)]
324pub struct HostBridgeInjectionState {
325    inner: Arc<Mutex<QueuedTranscriptInjections>>,
326}
327
328#[derive(Clone, Copy, Debug, PartialEq, Eq)]
329pub enum PendingUserMessageMutationResult {
330    Mutated,
331    AlreadyRevoked,
332    AlreadyDelivered,
333    UnknownMessageId,
334}
335
336impl QueuedTranscriptInjection {
337    fn mode(&self) -> QueuedUserMessageMode {
338        match self {
339            Self::User(message) => message.mode,
340            Self::Reminder(reminder) => reminder.mode,
341        }
342    }
343
344    fn pending_json(&self, position: usize) -> serde_json::Value {
345        match self {
346            Self::User(message) => serde_json::json!({
347                "kind": "user",
348                "id": message.message_id,
349                "messageId": message.message_id,
350                "mode": message.mode.as_str(),
351                "position": position,
352                "content": message.transcript_content,
353            }),
354            Self::Reminder(reminder) => serde_json::json!({
355                "kind": "reminder",
356                "id": reminder.reminder.id,
357                "reminderId": reminder.reminder.id,
358                "mode": reminder.mode.as_str(),
359                "position": position,
360                "body": reminder.reminder.body,
361                "tags": reminder.reminder.tags,
362                "dedupeKey": reminder.reminder.dedupe_key,
363                "ttlTurns": reminder.reminder.ttl_turns,
364                "preserveOnCompact": reminder.reminder.preserve_on_compact,
365                "propagate": reminder.reminder.propagate.as_str(),
366                "roleHint": reminder.reminder.role_hint.as_str(),
367                "source": reminder.reminder.source.as_str(),
368                "firedAtTurn": reminder.reminder.fired_at_turn,
369                "originatingAgentId": reminder.reminder.originating_agent_id,
370            }),
371        }
372    }
373}
374
375#[derive(Clone, Copy, Debug, PartialEq, Eq)]
376pub enum PendingReminderMutationResult {
377    Mutated,
378    AlreadyRevoked,
379    AlreadyDelivered,
380    UnknownReminderId,
381}
382
383fn new_inject_message_id() -> String {
384    format!("msg_inj_{}", uuid::Uuid::now_v7().simple())
385}
386
387impl HostBridgeInjectionState {
388    pub fn new() -> Self {
389        Self::default()
390    }
391
392    pub async fn push_pending_user_message(
393        &self,
394        content: String,
395        transcript_content: serde_json::Value,
396        mode: &str,
397    ) -> String {
398        let message_id = new_inject_message_id();
399        self.inner
400            .lock()
401            .await
402            .queue
403            .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
404                message_id: message_id.clone(),
405                content,
406                transcript_content,
407                mode: QueuedUserMessageMode::from_str(mode),
408            }));
409        message_id
410    }
411
412    pub async fn revoke_pending_user_message(
413        &self,
414        message_id: &str,
415    ) -> PendingUserMessageMutationResult {
416        let mut state = self.inner.lock().await;
417        let mut retained = VecDeque::new();
418        let mut revoked = false;
419        while let Some(injection) = state.queue.pop_front() {
420            match &injection {
421                QueuedTranscriptInjection::User(message) if message.message_id == message_id => {
422                    revoked = true;
423                }
424                _ => retained.push_back(injection),
425            }
426        }
427        state.queue = retained;
428        if revoked {
429            state
430                .revoked_user_message_ids
431                .insert(message_id.to_string());
432            return PendingUserMessageMutationResult::Mutated;
433        }
434        if state.revoked_user_message_ids.contains(message_id) {
435            PendingUserMessageMutationResult::AlreadyRevoked
436        } else if state.delivered_user_message_ids.contains(message_id) {
437            PendingUserMessageMutationResult::AlreadyDelivered
438        } else {
439            PendingUserMessageMutationResult::UnknownMessageId
440        }
441    }
442
443    pub async fn revoke_pending_reminder(
444        &self,
445        reminder_id: &str,
446    ) -> PendingReminderMutationResult {
447        let mut state = self.inner.lock().await;
448        let mut retained = VecDeque::new();
449        let mut revoked = false;
450        while let Some(injection) = state.queue.pop_front() {
451            match &injection {
452                QueuedTranscriptInjection::Reminder(reminder)
453                    if reminder.reminder.id == reminder_id =>
454                {
455                    revoked = true;
456                }
457                _ => retained.push_back(injection),
458            }
459        }
460        state.queue = retained;
461        if revoked {
462            state.revoked_reminder_ids.insert(reminder_id.to_string());
463            return PendingReminderMutationResult::Mutated;
464        }
465        if state.revoked_reminder_ids.contains(reminder_id) {
466            PendingReminderMutationResult::AlreadyRevoked
467        } else if state.delivered_reminder_ids.contains(reminder_id) {
468            PendingReminderMutationResult::AlreadyDelivered
469        } else {
470            PendingReminderMutationResult::UnknownReminderId
471        }
472    }
473
474    pub async fn replace_pending_user_message(
475        &self,
476        message_id: &str,
477        content: String,
478        transcript_content: serde_json::Value,
479    ) -> PendingUserMessageMutationResult {
480        let mut state = self.inner.lock().await;
481        for injection in &mut state.queue {
482            if let QueuedTranscriptInjection::User(message) = injection {
483                if message.message_id == message_id {
484                    message.content = content;
485                    message.transcript_content = transcript_content;
486                    return PendingUserMessageMutationResult::Mutated;
487                }
488            }
489        }
490        if state.revoked_user_message_ids.contains(message_id) {
491            PendingUserMessageMutationResult::AlreadyRevoked
492        } else if state.delivered_user_message_ids.contains(message_id) {
493            PendingUserMessageMutationResult::AlreadyDelivered
494        } else {
495            PendingUserMessageMutationResult::UnknownMessageId
496        }
497    }
498
499    async fn push_session_reminder(&self, reminder: QueuedReminder) {
500        self.inner
501            .lock()
502            .await
503            .queue
504            .push_back(QueuedTranscriptInjection::Reminder(reminder));
505    }
506
507    pub async fn pending_injections_json(&self) -> serde_json::Value {
508        let state = self.inner.lock().await;
509        let injections = state
510            .queue
511            .iter()
512            .enumerate()
513            .map(|(position, injection)| injection.pending_json(position))
514            .collect::<Vec<_>>();
515        serde_json::json!({
516            "pendingCount": injections.len(),
517            "injections": injections,
518        })
519    }
520}
521
522fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
523    format!(
524        "{}: {}",
525        Code::ReminderUnknownOption.as_str(),
526        message.as_ref()
527    )
528}
529
530fn session_remind_shape_error(message: impl AsRef<str>) -> String {
531    format!(
532        "{}: {}",
533        Code::ReminderInvalidShape.as_str(),
534        message.as_ref()
535    )
536}
537
538fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
539    format!(
540        "{}: {}",
541        Code::ReminderUnknownPropagate.as_str(),
542        message.as_ref()
543    )
544}
545
546fn string_field(
547    map: &serde_json::Map<String, serde_json::Value>,
548    key: &str,
549    required: bool,
550) -> Result<Option<String>, String> {
551    match map.get(key) {
552        None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
553            format!("`{key}` must be a non-empty string"),
554        )),
555        None | Some(serde_json::Value::Null) => Ok(None),
556        Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
557            session_remind_shape_error(format!("`{key}` must be a non-empty string")),
558        ),
559        Some(serde_json::Value::String(value)) => {
560            let trimmed = value.trim();
561            if trimmed.is_empty() {
562                Ok(None)
563            } else {
564                Ok(Some(trimmed.to_string()))
565            }
566        }
567        Some(other) => Err(session_remind_shape_error(format!(
568            "`{key}` must be a string, got {other}"
569        ))),
570    }
571}
572
573fn bool_field(
574    map: &serde_json::Map<String, serde_json::Value>,
575    key: &str,
576) -> Result<Option<bool>, String> {
577    match map.get(key) {
578        None | Some(serde_json::Value::Null) => Ok(None),
579        Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
580        Some(other) => Err(session_remind_shape_error(format!(
581            "`{key}` must be a bool, got {other}"
582        ))),
583    }
584}
585
586fn int_field(
587    map: &serde_json::Map<String, serde_json::Value>,
588    key: &str,
589) -> Result<Option<i64>, String> {
590    match map.get(key) {
591        None | Some(serde_json::Value::Null) => Ok(None),
592        Some(serde_json::Value::Number(value)) => {
593            let Some(value) = value.as_i64() else {
594                return Err(session_remind_shape_error(format!(
595                    "`{key}` must be an integer"
596                )));
597            };
598            Ok(Some(value))
599        }
600        Some(other) => Err(session_remind_shape_error(format!(
601            "`{key}` must be an int, got {other}"
602        ))),
603    }
604}
605
606fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
607    let Some(value) = map.get("tags") else {
608        return Ok(Vec::new());
609    };
610    if value.is_null() {
611        return Ok(Vec::new());
612    }
613    let Some(values) = value.as_array() else {
614        return Err(session_remind_shape_error("`tags` must be a list"));
615    };
616    let mut tags = Vec::new();
617    for value in values {
618        let Some(tag) = value.as_str() else {
619            return Err(session_remind_shape_error(format!(
620                "`tags` entries must be strings, got {value}"
621            )));
622        };
623        let tag = tag.trim();
624        if tag.is_empty() {
625            return Err(session_remind_shape_error(
626                "`tags` entries must be non-empty strings",
627            ));
628        }
629        if !tags.iter().any(|existing| existing == tag) {
630            tags.push(tag.to_string());
631        }
632    }
633    Ok(tags)
634}
635
636fn session_remind_payload_from_value(
637    value: &serde_json::Value,
638) -> Result<crate::llm::helpers::SystemReminder, String> {
639    let Some(map) = value.as_object() else {
640        return Err(session_remind_shape_error(
641            "session/remind payload must be a reminder object",
642        ));
643    };
644    const ALLOWED: &[&str] = &[
645        "_meta",
646        "body",
647        "dedupe_key",
648        "fired_at_turn",
649        "id",
650        "preserve_on_compact",
651        "propagate",
652        "role_hint",
653        "source",
654        "tags",
655        "ttl_turns",
656    ];
657    let unknown = map
658        .keys()
659        .filter(|key| !ALLOWED.contains(&key.as_str()))
660        .map(String::as_str)
661        .collect::<Vec<_>>();
662    if !unknown.is_empty() {
663        if unknown.contains(&"content") {
664            return Err(session_remind_shape_error(
665                "session/remind expects reminder `body`, not user-message `content`",
666            ));
667        }
668        return Err(reminder_unknown_option_error(format!(
669            "unknown reminder option(s): {}",
670            unknown.join(", ")
671        )));
672    }
673    if let Some(meta) = map.get("_meta") {
674        if !meta.is_null() && !meta.is_object() {
675            return Err(session_remind_shape_error("`_meta` must be an object"));
676        }
677    }
678    let ttl_turns = int_field(map, "ttl_turns")?;
679    if let Some(value) = ttl_turns {
680        if value <= 0 {
681            return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
682        }
683    }
684    let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
685    if fired_at_turn < 0 {
686        return Err(session_remind_shape_error(
687            "`fired_at_turn` must be >= 0 when provided",
688        ));
689    }
690    match string_field(map, "source", false)?.as_deref() {
691        None | Some("bridge") => {}
692        Some(_) => {
693            return Err(session_remind_shape_error(
694                "`source` for session/remind must be bridge when provided",
695            ))
696        }
697    }
698    let propagate = match string_field(map, "propagate", false)?.as_deref() {
699        None => crate::llm::helpers::ReminderPropagate::Session,
700        Some("all") => crate::llm::helpers::ReminderPropagate::All,
701        Some("session") => crate::llm::helpers::ReminderPropagate::Session,
702        Some("none") => crate::llm::helpers::ReminderPropagate::None,
703        Some(_) => {
704            return Err(reminder_unknown_propagate_error(
705                "`propagate` must be one of all, session, or none",
706            ))
707        }
708    };
709    let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
710        None => crate::llm::helpers::ReminderRoleHint::System,
711        Some("system") => crate::llm::helpers::ReminderRoleHint::System,
712        Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
713        Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
714        Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
715        Some(_) => {
716            return Err(session_remind_shape_error(
717                "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
718            ))
719        }
720    };
721    Ok(crate::llm::helpers::SystemReminder {
722        id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
723        tags: tags_field(map)?,
724        dedupe_key: string_field(map, "dedupe_key", false)?,
725        ttl_turns,
726        preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
727        propagate,
728        role_hint,
729        source: crate::llm::helpers::ReminderSource::Bridge,
730        body: string_field(map, "body", true)?.unwrap_or_default(),
731        fired_at_turn,
732        originating_agent_id: None,
733    })
734}
735
736/// Parse the params of a `session/cancel_tool_call` notification and fire
737/// the per-tool-call cancellation. Mirrors the shape used by the public
738/// `cancel_in_flight_tool_call` builtin so hosts have one wire format
739/// regardless of which surface they came through.
740///
741/// Stdio bridges send this as a notification (no id, no response); the
742/// builtin handles request/response semantics in Harn. We deliberately
743/// drop malformed payloads silently because notifications can't reply
744/// with an error — logging would also be too noisy for partial drops.
745fn handle_cancel_tool_call_notification(params: &serde_json::Value) {
746    let session_id = params
747        .get("sessionId")
748        .or_else(|| params.get("session_id"))
749        .and_then(|value| value.as_str())
750        .unwrap_or_default();
751    let call_id = params
752        .get("toolCallId")
753        .or_else(|| params.get("tool_call_id"))
754        .or_else(|| params.get("callId"))
755        .or_else(|| params.get("call_id"))
756        .and_then(|value| value.as_str())
757        .unwrap_or_default();
758    if call_id.is_empty() {
759        return;
760    }
761    let reason = params
762        .get("reason")
763        .and_then(|value| value.as_str())
764        .unwrap_or("host cancelled in-flight tool call")
765        .to_string();
766    let inject_reminder = params
767        .get("injectReminder")
768        .or_else(|| params.get("inject_reminder"))
769        .and_then(|value| value.as_bool())
770        .unwrap_or(true);
771    crate::tool_call_cancellations::cancel(session_id, call_id, reason, inject_reminder);
772}
773
774fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
775    let mode = QueuedUserMessageMode::from_str(
776        params
777            .get("mode")
778            .and_then(|value| value.as_str())
779            .unwrap_or("audit_only"),
780    );
781    let reminder_value = if let Some(reminder) = params.get("reminder") {
782        reminder.clone()
783    } else {
784        let Some(params) = params.as_object() else {
785            return Err(session_remind_shape_error(
786                "session/remind params must be an object",
787            ));
788        };
789        let mut reminder = params.clone();
790        reminder.remove("mode");
791        reminder.remove("sessionId");
792        reminder.remove("session_id");
793        serde_json::Value::Object(reminder)
794    };
795    Ok(QueuedReminder {
796        reminder: session_remind_payload_from_value(&reminder_value)?,
797        mode,
798    })
799}
800
801// Default doesn't apply — new() spawns async tasks requiring a tokio LocalSet.
802#[allow(clippy::new_without_default)]
803impl HostBridge {
804    /// Create a new bridge and spawn the stdin reader task.
805    ///
806    /// Must be called within a tokio LocalSet (uses spawn_local for the
807    /// stdin reader since it's single-threaded).
808    pub fn new() -> Self {
809        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
810            Arc::new(Mutex::new(HashMap::new()));
811        let cancelled = Arc::new(AtomicBool::new(false));
812        let cancel_notify = Arc::new(Notify::new());
813        let queued_transcript_injections = HostBridgeInjectionState::default();
814        let resume_requested = Arc::new(AtomicBool::new(false));
815        let skills_reload_requested = Arc::new(AtomicBool::new(false));
816        let daemon_idle = Arc::new(AtomicBool::new(false));
817
818        // Stdin reader: reads JSON-RPC lines and dispatches responses
819        let pending_clone = pending.clone();
820        let cancelled_clone = cancelled.clone();
821        let cancel_notify_clone = cancel_notify.clone();
822        let queued_clone = queued_transcript_injections.clone();
823        let resume_clone = resume_requested.clone();
824        let skills_reload_clone = skills_reload_requested.clone();
825        tokio::task::spawn_local(async move {
826            let stdin = tokio::io::stdin();
827            let reader = tokio::io::BufReader::new(stdin);
828            let mut lines = reader.lines();
829
830            while let Ok(Some(line)) = lines.next_line().await {
831                let line = line.trim().to_string();
832                if line.is_empty() {
833                    continue;
834                }
835
836                let msg: serde_json::Value = match serde_json::from_str(&line) {
837                    Ok(v) => v,
838                    Err(_) => continue,
839                };
840
841                // Notifications have no id; responses have one.
842                if msg.get("id").is_none() {
843                    if let Some(method) = msg["method"].as_str() {
844                        if method == "cancel" {
845                            cancelled_clone.store(true, Ordering::SeqCst);
846                            cancel_notify_clone.notify_waiters();
847                        } else if method == "agent/resume" {
848                            resume_clone.store(true, Ordering::SeqCst);
849                        } else if method == "skills/update" {
850                            skills_reload_clone.store(true, Ordering::SeqCst);
851                        } else if method == "session/remind" {
852                            let params = &msg["params"];
853                            if let Ok(reminder) = queued_session_remind_from_params(params) {
854                                queued_clone.push_session_reminder(reminder).await;
855                            }
856                        } else if method == "session/cancel_tool_call" {
857                            handle_cancel_tool_call_notification(&msg["params"]);
858                        }
859                    }
860                    continue;
861                }
862
863                if let Some(id) = msg["id"].as_u64() {
864                    let mut pending = pending_clone.lock().await;
865                    if let Some(sender) = pending.remove(&id) {
866                        let _ = sender.send(msg);
867                    }
868                }
869            }
870
871            // stdin closed: drop pending senders to cancel waiters.
872            let mut pending = pending_clone.lock().await;
873            pending.clear();
874        });
875
876        Self {
877            next_id: AtomicU64::new(1),
878            pending,
879            cancelled,
880            cancel_notify,
881            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
882            session_id: std::sync::Mutex::new(String::new()),
883            script_name: std::sync::Mutex::new(String::new()),
884            queued_transcript_injections,
885            resume_requested,
886            skills_reload_requested,
887            daemon_idle,
888            prompt_stop_reason: std::sync::Mutex::new(None),
889            visible_call_states: std::sync::Mutex::new(HashMap::new()),
890            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
891            in_process: None,
892        }
893    }
894
895    /// Create a bridge from pre-existing shared state.
896    ///
897    /// Unlike `new()`, does **not** spawn a stdin reader — the caller is
898    /// responsible for dispatching responses into `pending`.  This is used
899    /// by ACP mode which already has its own stdin reader.
900    pub fn from_parts(
901        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
902        cancelled: Arc<AtomicBool>,
903        stdout_lock: Arc<std::sync::Mutex<()>>,
904        start_id: u64,
905    ) -> Self {
906        Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
907    }
908
909    pub fn from_parts_with_writer(
910        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
911        cancelled: Arc<AtomicBool>,
912        writer: HostBridgeWriter,
913        start_id: u64,
914    ) -> Self {
915        Self::from_parts_with_writer_and_cancel_notify(
916            pending,
917            cancelled,
918            Arc::new(Notify::new()),
919            writer,
920            start_id,
921        )
922    }
923
924    pub fn from_parts_with_writer_and_cancel_notify(
925        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
926        cancelled: Arc<AtomicBool>,
927        cancel_notify: Arc<Notify>,
928        writer: HostBridgeWriter,
929        start_id: u64,
930    ) -> Self {
931        Self::from_parts_with_writer_cancel_notify_and_injection_state(
932            pending,
933            cancelled,
934            cancel_notify,
935            writer,
936            start_id,
937            None,
938        )
939    }
940
941    pub fn from_parts_with_writer_cancel_notify_and_injection_state(
942        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
943        cancelled: Arc<AtomicBool>,
944        cancel_notify: Arc<Notify>,
945        writer: HostBridgeWriter,
946        start_id: u64,
947        injection_state: Option<HostBridgeInjectionState>,
948    ) -> Self {
949        Self {
950            next_id: AtomicU64::new(start_id),
951            pending,
952            cancelled,
953            cancel_notify,
954            writer,
955            session_id: std::sync::Mutex::new(String::new()),
956            script_name: std::sync::Mutex::new(String::new()),
957            queued_transcript_injections: injection_state.unwrap_or_default(),
958            resume_requested: Arc::new(AtomicBool::new(false)),
959            skills_reload_requested: Arc::new(AtomicBool::new(false)),
960            daemon_idle: Arc::new(AtomicBool::new(false)),
961            prompt_stop_reason: std::sync::Mutex::new(None),
962            visible_call_states: std::sync::Mutex::new(HashMap::new()),
963            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
964            in_process: None,
965        }
966    }
967
968    /// Create an in-process host bridge backed by exported functions from a
969    /// Harn module. Used by `harn playground` to avoid JSON-RPC boilerplate.
970    pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
971        let exported_functions = vm.load_module_exports(module_path).await?;
972        Ok(Self {
973            next_id: AtomicU64::new(1),
974            pending: Arc::new(Mutex::new(HashMap::new())),
975            cancelled: Arc::new(AtomicBool::new(false)),
976            cancel_notify: Arc::new(Notify::new()),
977            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
978            session_id: std::sync::Mutex::new(String::new()),
979            script_name: std::sync::Mutex::new(String::new()),
980            queued_transcript_injections: HostBridgeInjectionState::default(),
981            resume_requested: Arc::new(AtomicBool::new(false)),
982            skills_reload_requested: Arc::new(AtomicBool::new(false)),
983            daemon_idle: Arc::new(AtomicBool::new(false)),
984            prompt_stop_reason: std::sync::Mutex::new(None),
985            visible_call_states: std::sync::Mutex::new(HashMap::new()),
986            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
987            in_process: Some(InProcessHost {
988                module_path: module_path.to_path_buf(),
989                exported_functions,
990                vm,
991            }),
992        })
993    }
994
995    /// Set the ACP session ID for session-scoped notifications.
996    pub fn set_session_id(&self, id: &str) {
997        *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
998    }
999
1000    /// Set the currently executing script name (without .harn suffix).
1001    pub fn set_script_name(&self, name: &str) {
1002        *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
1003    }
1004
1005    /// Get the current script name.
1006    fn get_script_name(&self) -> String {
1007        self.script_name
1008            .lock()
1009            .unwrap_or_else(|e| e.into_inner())
1010            .clone()
1011    }
1012
1013    /// Get the session ID.
1014    pub fn get_session_id(&self) -> String {
1015        self.session_id
1016            .lock()
1017            .unwrap_or_else(|e| e.into_inner())
1018            .clone()
1019    }
1020
1021    /// Write a complete JSON-RPC line to stdout, serialized through a mutex.
1022    fn write_line(&self, line: &str) -> Result<(), VmError> {
1023        (self.writer)(line).map_err(VmError::Runtime)
1024    }
1025
1026    /// Send a JSON-RPC request to the host and wait for the response.
1027    /// Times out after 5 minutes to prevent deadlocks.
1028    pub async fn call(
1029        &self,
1030        method: &str,
1031        params: serde_json::Value,
1032    ) -> Result<serde_json::Value, VmError> {
1033        if let Some(in_process) = &self.in_process {
1034            return in_process.dispatch(method, params).await;
1035        }
1036
1037        if self.is_cancelled() {
1038            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1039        }
1040
1041        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
1042        let cancel_wait = self.cancel_notify.notified();
1043        tokio::pin!(cancel_wait);
1044
1045        let request = crate::jsonrpc::request(id, method, params);
1046
1047        let (tx, rx) = oneshot::channel();
1048        {
1049            let mut pending = self.pending.lock().await;
1050            pending.insert(id, tx);
1051        }
1052
1053        let line = serde_json::to_string(&request)
1054            .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
1055        if let Err(e) = self.write_line(&line) {
1056            let mut pending = self.pending.lock().await;
1057            pending.remove(&id);
1058            return Err(e);
1059        }
1060
1061        if self.is_cancelled() {
1062            let mut pending = self.pending.lock().await;
1063            pending.remove(&id);
1064            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1065        }
1066
1067        let response = tokio::select! {
1068            result = rx => match result {
1069                Ok(msg) => msg,
1070                Err(_) => {
1071                    // Sender dropped: host closed or stdin reader exited.
1072                    return Err(VmError::Runtime(
1073                        "Bridge: host closed connection before responding".into(),
1074                    ));
1075                }
1076            },
1077            _ = &mut cancel_wait => {
1078                let mut pending = self.pending.lock().await;
1079                pending.remove(&id);
1080                return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1081            }
1082            _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
1083                let mut pending = self.pending.lock().await;
1084                pending.remove(&id);
1085                return Err(VmError::Runtime(format!(
1086                    "Bridge: host did not respond to '{method}' within {}s",
1087                    DEFAULT_TIMEOUT.as_secs()
1088                )));
1089            }
1090        };
1091
1092        if let Some(error) = response.get("error") {
1093            let message = error["message"].as_str().unwrap_or("Unknown host error");
1094            let code = error["code"].as_i64().unwrap_or(-1);
1095            // JSON-RPC -32001 signals the host rejected the tool (not permitted / not in allowlist).
1096            if code == -32001 {
1097                return Err(VmError::CategorizedError {
1098                    message: message.to_string(),
1099                    category: ErrorCategory::ToolRejected,
1100                });
1101            }
1102            return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
1103        }
1104
1105        Ok(response["result"].clone())
1106    }
1107
1108    /// Send a JSON-RPC notification to the host (no response expected).
1109    /// Serialized through the stdout mutex to prevent interleaving.
1110    pub fn notify(&self, method: &str, params: serde_json::Value) {
1111        let notification = crate::jsonrpc::notification(method, params);
1112        if self.in_process.is_some() {
1113            return;
1114        }
1115        if let Ok(line) = serde_json::to_string(&notification) {
1116            let _ = self.write_line(&line);
1117        }
1118    }
1119
1120    /// Check if the host has sent a cancel notification.
1121    pub fn is_cancelled(&self) -> bool {
1122        self.cancelled.load(Ordering::SeqCst)
1123    }
1124
1125    pub fn take_resume_signal(&self) -> bool {
1126        self.resume_requested.swap(false, Ordering::SeqCst)
1127    }
1128
1129    pub fn signal_resume(&self) {
1130        self.resume_requested.store(true, Ordering::SeqCst);
1131    }
1132
1133    pub fn set_daemon_idle(&self, idle: bool) {
1134        self.daemon_idle.store(idle, Ordering::SeqCst);
1135    }
1136
1137    pub fn is_daemon_idle(&self) -> bool {
1138        self.daemon_idle.load(Ordering::SeqCst)
1139    }
1140
1141    /// Record the canonical ACP `stopReason` for the current prompt. The
1142    /// last writer wins, which matches the semantic that an outer
1143    /// `agent_loop` (the one whose result the user observes) always
1144    /// finalizes after any inner loops it spawned.
1145    pub fn set_prompt_stop_reason(&self, reason: &str) {
1146        *self
1147            .prompt_stop_reason
1148            .lock()
1149            .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
1150    }
1151
1152    /// Consume any prompt stop reason recorded during this prompt. The
1153    /// ACP adapter calls this once after the pipeline returns; pipelines
1154    /// that didn't run an `agent_loop` see `None` and the adapter falls
1155    /// back to `end_turn`.
1156    pub fn take_prompt_stop_reason(&self) -> Option<String> {
1157        self.prompt_stop_reason
1158            .lock()
1159            .unwrap_or_else(|e| e.into_inner())
1160            .take()
1161    }
1162
1163    /// Consume any pending `skills/update` signal the host has sent.
1164    /// Returns `true` exactly once per notification, letting callers
1165    /// trigger a layered-discovery rebuild without polling false
1166    /// positives. See issue #73 for the hot-reload contract.
1167    pub fn take_skills_reload_signal(&self) -> bool {
1168        self.skills_reload_requested.swap(false, Ordering::SeqCst)
1169    }
1170
1171    /// Manually mark the skill catalog as stale. Used by tests and by
1172    /// the CLI when an internal event (e.g. `harn install`) should
1173    /// trigger the same rebuild a `skills/update` notification would.
1174    pub fn signal_skills_reload(&self) {
1175        self.skills_reload_requested.store(true, Ordering::SeqCst);
1176    }
1177
1178    /// Call the host's `skills/list` RPC and return the raw JSON array
1179    /// it responded with. Shape:
1180    /// `[{ "id": "...", "name": "...", "description": "...", "source": "..." }, ...]`.
1181    /// The CLI adapter converts each entry into a
1182    /// [`crate::skills::SkillManifestRef`].
1183    pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
1184        let result = self.call("skills/list", serde_json::json!({})).await?;
1185        match result {
1186            serde_json::Value::Array(items) => Ok(items),
1187            serde_json::Value::Object(map) => match map.get("skills") {
1188                Some(serde_json::Value::Array(items)) => Ok(items.clone()),
1189                _ => Err(VmError::Runtime(
1190                    "skills/list: host response must be an array or { skills: [...] }".into(),
1191                )),
1192            },
1193            _ => Err(VmError::Runtime(
1194                "skills/list: unexpected response shape".into(),
1195            )),
1196        }
1197    }
1198
1199    /// Call the host's `host/tools/list` RPC and return normalized tool
1200    /// descriptors. Shape:
1201    /// `[{ "name": "...", "description": "...", "schema": {...}, "deprecated": false }, ...]`.
1202    /// The bridge also accepts `{ "tools": [...] }` and
1203    /// `{ "result": { "tools": [...] } }` wrappers for lenient hosts.
1204    pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
1205        let result = self.call("host/tools/list", serde_json::json!({})).await?;
1206        parse_host_tools_list_response(result)
1207    }
1208
1209    /// Call the host's `skills/fetch` RPC for one skill id. Returns the
1210    /// raw JSON body so the CLI can inspect both the frontmatter fields
1211    /// and the skill markdown body in whatever shape the host sends.
1212    pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
1213        self.call("skills/fetch", serde_json::json!({ "id": id }))
1214            .await
1215    }
1216
1217    pub fn injection_state(&self) -> HostBridgeInjectionState {
1218        self.queued_transcript_injections.clone()
1219    }
1220
1221    pub async fn push_pending_user_message(
1222        &self,
1223        content: String,
1224        transcript_content: serde_json::Value,
1225        mode: &str,
1226    ) -> String {
1227        self.queued_transcript_injections
1228            .push_pending_user_message(content, transcript_content, mode)
1229            .await
1230    }
1231
1232    pub async fn push_queued_user_message(&self, content: String, mode: &str) -> String {
1233        self.push_pending_user_message(content.clone(), serde_json::Value::String(content), mode)
1234            .await
1235    }
1236
1237    pub async fn revoke_pending_user_message(
1238        &self,
1239        message_id: &str,
1240    ) -> PendingUserMessageMutationResult {
1241        self.queued_transcript_injections
1242            .revoke_pending_user_message(message_id)
1243            .await
1244    }
1245
1246    pub async fn revoke_pending_reminder(
1247        &self,
1248        reminder_id: &str,
1249    ) -> PendingReminderMutationResult {
1250        self.queued_transcript_injections
1251            .revoke_pending_reminder(reminder_id)
1252            .await
1253    }
1254
1255    pub async fn pending_injections_json(&self) -> serde_json::Value {
1256        self.queued_transcript_injections
1257            .pending_injections_json()
1258            .await
1259    }
1260
1261    pub async fn replace_pending_user_message(
1262        &self,
1263        message_id: &str,
1264        content: String,
1265        transcript_content: serde_json::Value,
1266    ) -> PendingUserMessageMutationResult {
1267        self.queued_transcript_injections
1268            .replace_pending_user_message(message_id, content, transcript_content)
1269            .await
1270    }
1271
1272    pub async fn push_queued_session_remind_from_params(
1273        &self,
1274        params: &serde_json::Value,
1275    ) -> Result<String, String> {
1276        let reminder = queued_session_remind_from_params(params)?;
1277        let reminder_id = reminder.reminder.id.clone();
1278        self.queued_transcript_injections
1279            .push_session_reminder(reminder)
1280            .await;
1281        Ok(reminder_id)
1282    }
1283
1284    pub async fn take_queued_user_messages(
1285        &self,
1286        include_interrupt_immediate: bool,
1287        include_finish_step: bool,
1288        include_audit_only: bool,
1289    ) -> Vec<QueuedUserMessage> {
1290        let mut state = self.queued_transcript_injections.inner.lock().await;
1291        let mut selected = Vec::new();
1292        let mut retained = VecDeque::new();
1293        while let Some(injection) = state.queue.pop_front() {
1294            let should_take = match injection.mode() {
1295                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1296                QueuedUserMessageMode::FinishStep => include_finish_step,
1297                QueuedUserMessageMode::AuditOnly => include_audit_only,
1298            };
1299            match (should_take, injection) {
1300                (true, QueuedTranscriptInjection::User(message)) => {
1301                    state
1302                        .delivered_user_message_ids
1303                        .insert(message.message_id.clone());
1304                    selected.push(message);
1305                }
1306                (_, injection) => retained.push_back(injection),
1307            }
1308        }
1309        state.queue = retained;
1310        selected
1311    }
1312
1313    pub async fn take_queued_transcript_injections(
1314        &self,
1315        include_interrupt_immediate: bool,
1316        include_finish_step: bool,
1317        include_audit_only: bool,
1318    ) -> Vec<QueuedTranscriptInjection> {
1319        let mut state = self.queued_transcript_injections.inner.lock().await;
1320        let mut selected = Vec::new();
1321        let mut retained = VecDeque::new();
1322        while let Some(injection) = state.queue.pop_front() {
1323            let should_take = match injection.mode() {
1324                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1325                QueuedUserMessageMode::FinishStep => include_finish_step,
1326                QueuedUserMessageMode::AuditOnly => include_audit_only,
1327            };
1328            if should_take {
1329                match &injection {
1330                    QueuedTranscriptInjection::User(message) => {
1331                        state
1332                            .delivered_user_message_ids
1333                            .insert(message.message_id.clone());
1334                    }
1335                    QueuedTranscriptInjection::Reminder(reminder) => {
1336                        state
1337                            .delivered_reminder_ids
1338                            .insert(reminder.reminder.id.clone());
1339                    }
1340                }
1341                selected.push(injection);
1342            } else {
1343                retained.push_back(injection);
1344            }
1345        }
1346        state.queue = retained;
1347        selected
1348    }
1349
1350    pub async fn take_queued_user_messages_for(
1351        &self,
1352        checkpoint: DeliveryCheckpoint,
1353    ) -> Vec<QueuedUserMessage> {
1354        match checkpoint {
1355            DeliveryCheckpoint::InterruptImmediate => {
1356                self.take_queued_user_messages(true, false, false).await
1357            }
1358            DeliveryCheckpoint::AfterCurrentOperation => {
1359                self.take_queued_user_messages(false, true, false).await
1360            }
1361            DeliveryCheckpoint::EndOfInteraction => {
1362                self.take_queued_user_messages(false, false, true).await
1363            }
1364        }
1365    }
1366
1367    pub async fn take_queued_transcript_injections_for(
1368        &self,
1369        checkpoint: DeliveryCheckpoint,
1370    ) -> Vec<QueuedTranscriptInjection> {
1371        match checkpoint {
1372            DeliveryCheckpoint::InterruptImmediate => {
1373                self.take_queued_transcript_injections(true, false, false)
1374                    .await
1375            }
1376            DeliveryCheckpoint::AfterCurrentOperation => {
1377                self.take_queued_transcript_injections(false, true, false)
1378                    .await
1379            }
1380            DeliveryCheckpoint::EndOfInteraction => {
1381                self.take_queued_transcript_injections(false, false, true)
1382                    .await
1383            }
1384        }
1385    }
1386
1387    /// Send an output notification (for log/print in bridge mode).
1388    pub fn send_output(&self, text: &str) {
1389        self.notify("output", serde_json::json!({"text": text}));
1390    }
1391
1392    /// Send a progress notification with optional numeric progress and structured data.
1393    pub fn send_progress(
1394        &self,
1395        phase: &str,
1396        message: &str,
1397        progress: Option<i64>,
1398        total: Option<i64>,
1399        data: Option<serde_json::Value>,
1400    ) {
1401        let mut payload = serde_json::json!({"phase": phase, "message": message});
1402        if let Some(p) = progress {
1403            payload["progress"] = serde_json::json!(p);
1404        }
1405        if let Some(t) = total {
1406            payload["total"] = serde_json::json!(t);
1407        }
1408        if let Some(d) = data {
1409            payload["data"] = d;
1410        }
1411        self.notify("progress", payload);
1412    }
1413
1414    /// Send a structured log notification.
1415    pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1416        let mut payload = serde_json::json!({"level": level, "message": message});
1417        if let Some(f) = fields {
1418            payload["fields"] = f;
1419        }
1420        self.notify("log", payload);
1421    }
1422
1423    /// Send a `session/update` with `call_start` — signals the beginning of
1424    /// an LLM call, tool call, or builtin call for observability.
1425    pub fn send_call_start(
1426        &self,
1427        call_id: &str,
1428        call_type: &str,
1429        name: &str,
1430        metadata: serde_json::Value,
1431    ) {
1432        let session_id = self.get_session_id();
1433        let script = self.get_script_name();
1434        let stream_publicly = metadata
1435            .get("stream_publicly")
1436            .and_then(|value| value.as_bool())
1437            .unwrap_or(true);
1438        self.visible_call_streams
1439            .lock()
1440            .unwrap_or_else(|e| e.into_inner())
1441            .insert(call_id.to_string(), stream_publicly);
1442        self.notify(
1443            "session/update",
1444            serde_json::json!({
1445                "sessionId": session_id,
1446                "update": {
1447                    "sessionUpdate": "call_start",
1448                    "content": {
1449                        "toolCallId": call_id,
1450                        "call_type": call_type,
1451                        "name": name,
1452                        "script": script,
1453                        "metadata": metadata,
1454                    },
1455                },
1456            }),
1457        );
1458    }
1459
1460    /// Send a `session/update` with `call_progress` — a streaming token delta
1461    /// from an in-flight LLM call.
1462    pub fn send_call_progress(
1463        &self,
1464        call_id: &str,
1465        delta: &str,
1466        accumulated_tokens: u64,
1467        user_visible: bool,
1468    ) {
1469        let session_id = self.get_session_id();
1470        let (visible_text, visible_delta) = {
1471            let stream_publicly = self
1472                .visible_call_streams
1473                .lock()
1474                .unwrap_or_else(|e| e.into_inner())
1475                .get(call_id)
1476                .copied()
1477                .unwrap_or(true);
1478            let mut states = self
1479                .visible_call_states
1480                .lock()
1481                .unwrap_or_else(|e| e.into_inner());
1482            let state = states.entry(call_id.to_string()).or_default();
1483            state.push(delta, stream_publicly)
1484        };
1485        self.notify(
1486            "session/update",
1487            serde_json::json!({
1488                "sessionId": session_id,
1489                "update": {
1490                    "sessionUpdate": "call_progress",
1491                    "content": {
1492                        "toolCallId": call_id,
1493                        "delta": delta,
1494                        "accumulated_tokens": accumulated_tokens,
1495                        "visible_text": visible_text,
1496                        "visible_delta": visible_delta,
1497                        "user_visible": user_visible,
1498                    },
1499                },
1500            }),
1501        );
1502    }
1503
1504    /// Send a `session/update` with `call_end` — signals completion of a call.
1505    pub fn send_call_end(
1506        &self,
1507        call_id: &str,
1508        call_type: &str,
1509        name: &str,
1510        duration_ms: u64,
1511        status: &str,
1512        metadata: serde_json::Value,
1513    ) {
1514        let session_id = self.get_session_id();
1515        let script = self.get_script_name();
1516        self.visible_call_states
1517            .lock()
1518            .unwrap_or_else(|e| e.into_inner())
1519            .remove(call_id);
1520        self.visible_call_streams
1521            .lock()
1522            .unwrap_or_else(|e| e.into_inner())
1523            .remove(call_id);
1524        self.notify(
1525            "session/update",
1526            serde_json::json!({
1527                "sessionId": session_id,
1528                "update": {
1529                    "sessionUpdate": "call_end",
1530                    "content": {
1531                        "toolCallId": call_id,
1532                        "call_type": call_type,
1533                        "name": name,
1534                        "script": script,
1535                        "duration_ms": duration_ms,
1536                        "status": status,
1537                        "metadata": metadata,
1538                    },
1539                },
1540            }),
1541        );
1542    }
1543
1544    /// Send a worker lifecycle update for delegated/background execution.
1545    pub fn send_worker_update(
1546        &self,
1547        worker_id: &str,
1548        worker_name: &str,
1549        status: &str,
1550        metadata: serde_json::Value,
1551        audit: Option<&MutationSessionRecord>,
1552    ) {
1553        let session_id = self.get_session_id();
1554        let script = self.get_script_name();
1555        let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1556        let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1557        let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1558        let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1559        let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1560        let lifecycle = serde_json::json!({
1561            "event": status,
1562            "worker_id": worker_id,
1563            "worker_name": worker_name,
1564            "started_at": started_at,
1565            "finished_at": finished_at,
1566        });
1567        self.notify(
1568            "session/update",
1569            serde_json::json!({
1570                "sessionId": session_id,
1571                "update": {
1572                    "sessionUpdate": "worker_update",
1573                    "content": {
1574                        "worker_id": worker_id,
1575                        "worker_name": worker_name,
1576                        "status": status,
1577                        "script": script,
1578                        "started_at": started_at,
1579                        "finished_at": finished_at,
1580                        "snapshot_path": snapshot_path,
1581                        "run_id": run_id,
1582                        "run_path": run_path,
1583                        "lifecycle": lifecycle,
1584                        "audit": audit,
1585                        "metadata": metadata,
1586                    },
1587                },
1588            }),
1589        );
1590    }
1591}
1592
1593/// Convert a serde_json::Value to a VmValue.
1594pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1595    crate::stdlib::json_to_vm_value(val)
1596}
1597
1598fn parse_host_tools_list_response(
1599    result: serde_json::Value,
1600) -> Result<Vec<serde_json::Value>, VmError> {
1601    let tools = match result {
1602        serde_json::Value::Array(items) => items,
1603        serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1604            map.get("result")
1605                .and_then(|value| value.get("tools"))
1606                .cloned()
1607        }) {
1608            Some(serde_json::Value::Array(items)) => items,
1609            _ => {
1610                return Err(VmError::Runtime(
1611                    "host/tools/list: host response must be an array or { tools: [...] }".into(),
1612                ));
1613            }
1614        },
1615        _ => {
1616            return Err(VmError::Runtime(
1617                "host/tools/list: unexpected response shape".into(),
1618            ));
1619        }
1620    };
1621
1622    let mut normalized = Vec::with_capacity(tools.len());
1623    for tool in tools {
1624        let serde_json::Value::Object(map) = tool else {
1625            return Err(VmError::Runtime(
1626                "host/tools/list: every tool must be an object".into(),
1627            ));
1628        };
1629        let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1630            return Err(VmError::Runtime(
1631                "host/tools/list: every tool must include a string `name`".into(),
1632            ));
1633        };
1634        let description = map
1635            .get("description")
1636            .and_then(|value| value.as_str())
1637            .or_else(|| {
1638                map.get("short_description")
1639                    .and_then(|value| value.as_str())
1640            })
1641            .unwrap_or_default();
1642        let schema = map
1643            .get("schema")
1644            .cloned()
1645            .or_else(|| map.get("parameters").cloned())
1646            .or_else(|| map.get("input_schema").cloned())
1647            .unwrap_or(serde_json::Value::Null);
1648        let deprecated = map
1649            .get("deprecated")
1650            .and_then(|value| value.as_bool())
1651            .unwrap_or(false);
1652        normalized.push(serde_json::json!({
1653            "name": name,
1654            "description": description,
1655            "schema": schema,
1656            "deprecated": deprecated,
1657        }));
1658    }
1659    Ok(normalized)
1660}
1661
1662#[cfg(test)]
1663mod tests {
1664    use super::*;
1665
1666    fn test_bridge() -> HostBridge {
1667        HostBridge::from_parts(
1668            Arc::new(Mutex::new(HashMap::new())),
1669            Arc::new(AtomicBool::new(false)),
1670            Arc::new(std::sync::Mutex::new(())),
1671            1,
1672        )
1673    }
1674
1675    fn test_bridge_sharing_injection_state(owner: &HostBridge) -> HostBridge {
1676        HostBridge::from_parts_with_writer_cancel_notify_and_injection_state(
1677            Arc::new(Mutex::new(HashMap::new())),
1678            Arc::new(AtomicBool::new(false)),
1679            Arc::new(Notify::new()),
1680            Arc::new(|_| Ok(())),
1681            100,
1682            Some(owner.injection_state()),
1683        )
1684    }
1685
1686    #[test]
1687    fn test_json_rpc_request_format() {
1688        let request = crate::jsonrpc::request(
1689            1,
1690            "llm_call",
1691            serde_json::json!({
1692                "prompt": "Hello",
1693                "system": "Be helpful",
1694            }),
1695        );
1696        let s = serde_json::to_string(&request).unwrap();
1697        assert!(s.contains("\"jsonrpc\":\"2.0\""));
1698        assert!(s.contains("\"id\":1"));
1699        assert!(s.contains("\"method\":\"llm_call\""));
1700    }
1701
1702    #[test]
1703    fn test_json_rpc_notification_format() {
1704        let notification =
1705            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
1706        let s = serde_json::to_string(&notification).unwrap();
1707        assert!(s.contains("\"method\":\"output\""));
1708        assert!(!s.contains("\"id\""));
1709    }
1710
1711    #[test]
1712    fn test_json_rpc_error_response_parsing() {
1713        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
1714        assert!(response.get("error").is_some());
1715        assert_eq!(
1716            response["error"]["message"].as_str().unwrap(),
1717            "Invalid request"
1718        );
1719    }
1720
1721    #[test]
1722    fn test_json_rpc_success_response_parsing() {
1723        let response = crate::jsonrpc::response(
1724            1,
1725            serde_json::json!({
1726                "text": "Hello world",
1727                "input_tokens": 10,
1728                "output_tokens": 5,
1729            }),
1730        );
1731        assert!(response.get("result").is_some());
1732        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
1733    }
1734
1735    #[test]
1736    fn test_cancelled_flag() {
1737        let cancelled = Arc::new(AtomicBool::new(false));
1738        assert!(!cancelled.load(Ordering::SeqCst));
1739        cancelled.store(true, Ordering::SeqCst);
1740        assert!(cancelled.load(Ordering::SeqCst));
1741    }
1742
1743    #[test]
1744    fn pending_host_calls_return_when_cancellation_arrives() {
1745        let runtime = tokio::runtime::Builder::new_current_thread()
1746            .enable_all()
1747            .build()
1748            .unwrap();
1749        runtime.block_on(async {
1750            let pending = Arc::new(Mutex::new(HashMap::new()));
1751            let cancelled = Arc::new(AtomicBool::new(false));
1752            let bridge = HostBridge::from_parts_with_writer(
1753                pending.clone(),
1754                cancelled.clone(),
1755                Arc::new(|_| Ok(())),
1756                1,
1757            );
1758
1759            let call = bridge.call("host/work", serde_json::json!({}));
1760            tokio::pin!(call);
1761
1762            loop {
1763                tokio::select! {
1764                    result = &mut call => panic!("call completed before cancellation: {result:?}"),
1765                    _ = tokio::task::yield_now() => {}
1766                }
1767                if !pending.lock().await.is_empty() {
1768                    break;
1769                }
1770            }
1771
1772            cancelled.store(true, Ordering::SeqCst);
1773            bridge.cancel_notify.notify_waiters();
1774
1775            let result = tokio::time::timeout(Duration::from_secs(1), call)
1776                .await
1777                .expect("pending call should observe cancellation promptly");
1778            assert!(
1779                matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
1780            );
1781            assert!(pending.lock().await.is_empty());
1782        });
1783    }
1784
1785    #[test]
1786    fn queued_messages_are_filtered_by_delivery_mode() {
1787        let runtime = tokio::runtime::Builder::new_current_thread()
1788            .enable_all()
1789            .build()
1790            .unwrap();
1791        runtime.block_on(async {
1792            let bridge = test_bridge();
1793            bridge
1794                .push_queued_user_message("first".to_string(), "finish_step")
1795                .await;
1796            bridge
1797                .push_queued_user_message("second".to_string(), "audit_only")
1798                .await;
1799
1800            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1801            assert_eq!(finish_step.len(), 1);
1802            assert_eq!(finish_step[0].content, "first");
1803
1804            let audit_only = bridge.take_queued_user_messages(false, false, true).await;
1805            assert_eq!(audit_only.len(), 1);
1806            assert_eq!(audit_only[0].content, "second");
1807        });
1808    }
1809
1810    #[test]
1811    fn pending_user_messages_support_revoke_replace_and_delivery_states() {
1812        let runtime = tokio::runtime::Builder::new_current_thread()
1813            .enable_all()
1814            .build()
1815            .unwrap();
1816        runtime.block_on(async {
1817            let bridge = test_bridge();
1818            let first_id = bridge
1819                .push_pending_user_message(
1820                    "first".to_string(),
1821                    serde_json::json!("first"),
1822                    "audit_only",
1823                )
1824                .await;
1825            let second_id = bridge
1826                .push_pending_user_message(
1827                    "second".to_string(),
1828                    serde_json::json!("second"),
1829                    "audit_only",
1830                )
1831                .await;
1832
1833            assert_eq!(
1834                bridge
1835                    .replace_pending_user_message(
1836                        &second_id,
1837                        "second edited".to_string(),
1838                        serde_json::json!("second edited"),
1839                    )
1840                    .await,
1841                PendingUserMessageMutationResult::Mutated
1842            );
1843            assert_eq!(
1844                bridge.revoke_pending_user_message(&first_id).await,
1845                PendingUserMessageMutationResult::Mutated
1846            );
1847            assert_eq!(
1848                bridge.revoke_pending_user_message(&first_id).await,
1849                PendingUserMessageMutationResult::AlreadyRevoked
1850            );
1851
1852            let delivered = bridge
1853                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1854                .await;
1855            assert_eq!(delivered.len(), 1);
1856            assert_eq!(delivered[0].message_id, second_id);
1857            assert_eq!(delivered[0].content, "second edited");
1858
1859            assert_eq!(
1860                bridge.revoke_pending_user_message(&second_id).await,
1861                PendingUserMessageMutationResult::AlreadyDelivered
1862            );
1863            assert_eq!(
1864                bridge
1865                    .replace_pending_user_message(
1866                        &second_id,
1867                        "too late".to_string(),
1868                        serde_json::json!("too late"),
1869                    )
1870                    .await,
1871                PendingUserMessageMutationResult::AlreadyDelivered
1872            );
1873            assert_eq!(
1874                bridge.revoke_pending_user_message("missing").await,
1875                PendingUserMessageMutationResult::UnknownMessageId
1876            );
1877        });
1878    }
1879
1880    #[test]
1881    fn pending_user_message_replace_preserves_fifo_position_and_mode() {
1882        let runtime = tokio::runtime::Builder::new_current_thread()
1883            .enable_all()
1884            .build()
1885            .unwrap();
1886        runtime.block_on(async {
1887            let bridge = test_bridge();
1888            let first_id = bridge
1889                .push_pending_user_message(
1890                    "first".to_string(),
1891                    serde_json::json!("first"),
1892                    "finish_step",
1893                )
1894                .await;
1895            let second_id = bridge
1896                .push_pending_user_message(
1897                    "second".to_string(),
1898                    serde_json::json!("second"),
1899                    "finish_step",
1900                )
1901                .await;
1902            assert_eq!(
1903                bridge
1904                    .replace_pending_user_message(
1905                        &first_id,
1906                        "first edited".to_string(),
1907                        serde_json::json!("first edited"),
1908                    )
1909                    .await,
1910                PendingUserMessageMutationResult::Mutated
1911            );
1912
1913            let delivered = bridge
1914                .take_queued_user_messages_for(DeliveryCheckpoint::AfterCurrentOperation)
1915                .await;
1916            assert_eq!(
1917                delivered
1918                    .iter()
1919                    .map(|message| (&message.message_id, message.content.as_str(), message.mode))
1920                    .collect::<Vec<_>>(),
1921                vec![
1922                    (&first_id, "first edited", QueuedUserMessageMode::FinishStep,),
1923                    (&second_id, "second", QueuedUserMessageMode::FinishStep),
1924                ]
1925            );
1926        });
1927    }
1928
1929    #[test]
1930    fn pending_user_message_state_survives_bridge_replacement() {
1931        let runtime = tokio::runtime::Builder::new_current_thread()
1932            .enable_all()
1933            .build()
1934            .unwrap();
1935        runtime.block_on(async {
1936            let bridge = test_bridge();
1937            let revoked_id = bridge
1938                .push_pending_user_message(
1939                    "revoke me".to_string(),
1940                    serde_json::json!("revoke me"),
1941                    "audit_only",
1942                )
1943                .await;
1944            let delivered_id = bridge
1945                .push_pending_user_message(
1946                    "deliver me".to_string(),
1947                    serde_json::json!("deliver me"),
1948                    "audit_only",
1949                )
1950                .await;
1951            assert_eq!(
1952                bridge.revoke_pending_user_message(&revoked_id).await,
1953                PendingUserMessageMutationResult::Mutated
1954            );
1955            bridge.cancelled.store(true, Ordering::SeqCst);
1956
1957            let replacement_bridge = test_bridge_sharing_injection_state(&bridge);
1958            assert_eq!(
1959                replacement_bridge
1960                    .revoke_pending_user_message(&revoked_id)
1961                    .await,
1962                PendingUserMessageMutationResult::AlreadyRevoked
1963            );
1964            let delivered = replacement_bridge
1965                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1966                .await;
1967            assert_eq!(delivered.len(), 1);
1968            assert_eq!(delivered[0].message_id, delivered_id);
1969            assert_eq!(delivered[0].content, "deliver me");
1970            assert_eq!(
1971                bridge.revoke_pending_user_message(&delivered_id).await,
1972                PendingUserMessageMutationResult::AlreadyDelivered
1973            );
1974        });
1975    }
1976
1977    #[test]
1978    fn queued_transcript_injections_preserve_user_reminder_separation() {
1979        let runtime = tokio::runtime::Builder::new_current_thread()
1980            .enable_all()
1981            .build()
1982            .unwrap();
1983        runtime.block_on(async {
1984            let bridge = test_bridge();
1985            bridge
1986                .push_queued_user_message("human follow-up".to_string(), "finish_step")
1987                .await;
1988            let reminder_id = bridge
1989                .push_queued_session_remind_from_params(&serde_json::json!({
1990                    "body": "Host-provided ambient context.",
1991                    "tags": ["host"],
1992                    "dedupe_key": "host-context",
1993                    "ttl_turns": 2,
1994                    "mode": "audit_only",
1995                    "_meta": {"harn": {"source": "test"}},
1996                }))
1997                .await
1998                .expect("valid reminder");
1999
2000            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
2001            assert_eq!(finish_step.len(), 1);
2002            assert_eq!(finish_step[0].content, "human follow-up");
2003
2004            let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
2005            assert!(no_user_messages.is_empty());
2006
2007            let injections = bridge
2008                .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
2009                .await;
2010            assert_eq!(injections.len(), 1);
2011            let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
2012                panic!("expected queued reminder");
2013            };
2014            assert_eq!(reminder.reminder.id, reminder_id);
2015            assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
2016            assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
2017            assert_eq!(
2018                reminder.reminder.dedupe_key.as_deref(),
2019                Some("host-context")
2020            );
2021            assert_eq!(reminder.reminder.ttl_turns, Some(2));
2022            assert_eq!(
2023                reminder.reminder.source,
2024                crate::llm::helpers::ReminderSource::Bridge
2025            );
2026        });
2027    }
2028
2029    #[test]
2030    fn pending_injections_list_user_messages_and_reminders_in_fifo_order() {
2031        let runtime = tokio::runtime::Builder::new_current_thread()
2032            .enable_all()
2033            .build()
2034            .unwrap();
2035        runtime.block_on(async {
2036            let bridge = test_bridge();
2037            let message_id = bridge
2038                .push_pending_user_message(
2039                    "human follow-up".to_string(),
2040                    serde_json::json!([{"type": "text", "text": "human follow-up"}]),
2041                    "finish_step",
2042                )
2043                .await;
2044            let reminder_id = bridge
2045                .push_queued_session_remind_from_params(&serde_json::json!({
2046                    "id": "rem-test",
2047                    "body": "Host reminder",
2048                    "tags": ["host"],
2049                    "dedupe_key": "host-reminder",
2050                    "ttl_turns": 2,
2051                    "mode": "interrupt_immediate",
2052                }))
2053                .await
2054                .expect("valid session/remind payload");
2055
2056            let pending = bridge.pending_injections_json().await;
2057            assert_eq!(pending["pendingCount"], 2);
2058            assert_eq!(pending["injections"][0]["kind"], "user");
2059            assert_eq!(pending["injections"][0]["id"], message_id);
2060            assert_eq!(pending["injections"][0]["messageId"], message_id);
2061            assert_eq!(pending["injections"][0]["mode"], "finish_step");
2062            assert_eq!(pending["injections"][0]["position"], 0);
2063            assert_eq!(pending["injections"][1]["kind"], "reminder");
2064            assert_eq!(pending["injections"][1]["id"], reminder_id);
2065            assert_eq!(pending["injections"][1]["reminderId"], "rem-test");
2066            assert_eq!(pending["injections"][1]["mode"], "interrupt_immediate");
2067            assert_eq!(pending["injections"][1]["body"], "Host reminder");
2068            assert_eq!(pending["injections"][1]["dedupeKey"], "host-reminder");
2069            assert_eq!(pending["injections"][1]["ttlTurns"], 2);
2070            assert_eq!(pending["injections"][1]["position"], 1);
2071        });
2072    }
2073
2074    #[test]
2075    fn pending_reminders_support_revoke_and_delivery_states() {
2076        let runtime = tokio::runtime::Builder::new_current_thread()
2077            .enable_all()
2078            .build()
2079            .unwrap();
2080        runtime.block_on(async {
2081            let bridge = test_bridge();
2082            let revoked_id = bridge
2083                .push_queued_session_remind_from_params(&serde_json::json!({
2084                    "id": "rem-revoke",
2085                    "body": "remove me",
2086                    "mode": "finish_step",
2087                }))
2088                .await
2089                .expect("valid session/remind payload");
2090            let delivered_id = bridge
2091                .push_queued_session_remind_from_params(&serde_json::json!({
2092                    "id": "rem-deliver",
2093                    "body": "deliver me",
2094                    "mode": "finish_step",
2095                }))
2096                .await
2097                .expect("valid session/remind payload");
2098
2099            assert_eq!(
2100                bridge.revoke_pending_reminder(&revoked_id).await,
2101                PendingReminderMutationResult::Mutated
2102            );
2103            assert_eq!(
2104                bridge.revoke_pending_reminder(&revoked_id).await,
2105                PendingReminderMutationResult::AlreadyRevoked
2106            );
2107
2108            let pending = bridge.pending_injections_json().await;
2109            assert_eq!(pending["pendingCount"], 1);
2110            assert_eq!(pending["injections"][0]["reminderId"], delivered_id);
2111
2112            let delivered = bridge
2113                .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
2114                .await;
2115            assert_eq!(delivered.len(), 1);
2116            let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2117                panic!("expected delivered reminder");
2118            };
2119            assert_eq!(reminder.reminder.id, delivered_id);
2120
2121            assert_eq!(
2122                bridge.revoke_pending_reminder(&delivered_id).await,
2123                PendingReminderMutationResult::AlreadyDelivered
2124            );
2125            assert_eq!(
2126                bridge.revoke_pending_reminder("missing").await,
2127                PendingReminderMutationResult::UnknownReminderId
2128            );
2129        });
2130    }
2131
2132    #[test]
2133    fn bridge_remind_modes_honor_delivery_checkpoints() {
2134        let runtime = tokio::runtime::Builder::new_current_thread()
2135            .enable_all()
2136            .build()
2137            .unwrap();
2138        runtime.block_on(async {
2139            let cases = [
2140                (
2141                    "interrupt_immediate",
2142                    DeliveryCheckpoint::InterruptImmediate,
2143                    DeliveryCheckpoint::AfterCurrentOperation,
2144                ),
2145                (
2146                    "finish_step",
2147                    DeliveryCheckpoint::AfterCurrentOperation,
2148                    DeliveryCheckpoint::EndOfInteraction,
2149                ),
2150                (
2151                    "audit_only",
2152                    DeliveryCheckpoint::EndOfInteraction,
2153                    DeliveryCheckpoint::InterruptImmediate,
2154                ),
2155            ];
2156
2157            for (mode, expected_checkpoint, wrong_checkpoint) in cases {
2158                let bridge = test_bridge();
2159                bridge
2160                    .push_queued_session_remind_from_params(&serde_json::json!({
2161                        "body": format!("Reminder for {mode}"),
2162                        "mode": mode,
2163                    }))
2164                    .await
2165                    .expect("valid session/remind payload");
2166
2167                let premature = bridge
2168                    .take_queued_transcript_injections_for(wrong_checkpoint)
2169                    .await;
2170                assert!(
2171                    premature.is_empty(),
2172                    "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
2173                );
2174
2175                let delivered = bridge
2176                    .take_queued_transcript_injections_for(expected_checkpoint)
2177                    .await;
2178                assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
2179                let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2180                    panic!("expected reminder for {mode}");
2181                };
2182                assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
2183            }
2184        });
2185    }
2186
2187    #[test]
2188    fn session_remind_validation_rejects_user_message_shape() {
2189        let err = queued_session_remind_from_params(&serde_json::json!({
2190            "content": "this is still a user message",
2191            "mode": "interrupt_immediate",
2192        }))
2193        .expect_err("session/remind must require a reminder body");
2194        assert!(err.contains(Code::ReminderInvalidShape.as_str()));
2195        assert!(err.contains("body"));
2196    }
2197
2198    #[test]
2199    fn session_remind_validation_rejects_unknown_options_separately() {
2200        let err = queued_session_remind_from_params(&serde_json::json!({
2201            "body": "valid body",
2202            "unknown_host_field": true,
2203        }))
2204        .expect_err("session/remind must reject unknown top-level fields");
2205        assert!(err.contains(Code::ReminderUnknownOption.as_str()));
2206        assert!(err.contains("unknown_host_field"));
2207    }
2208
2209    #[test]
2210    fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
2211        let err = queued_session_remind_from_params(&serde_json::json!({
2212            "body": "valid body",
2213            "propagate": "workspace",
2214        }))
2215        .expect_err("session/remind must reject unknown propagate values");
2216        assert!(err.contains(Code::ReminderUnknownPropagate.as_str()));
2217        assert!(err.contains("propagate"));
2218    }
2219
2220    #[test]
2221    fn test_json_result_to_vm_value_string() {
2222        let val = serde_json::json!("hello");
2223        let vm_val = json_result_to_vm_value(&val);
2224        assert_eq!(vm_val.display(), "hello");
2225    }
2226
2227    #[test]
2228    fn test_json_result_to_vm_value_dict() {
2229        let val = serde_json::json!({"name": "test", "count": 42});
2230        let vm_val = json_result_to_vm_value(&val);
2231        let VmValue::Dict(d) = &vm_val else {
2232            unreachable!("Expected Dict, got {:?}", vm_val);
2233        };
2234        assert_eq!(d.get("name").unwrap().display(), "test");
2235        assert_eq!(d.get("count").unwrap().display(), "42");
2236    }
2237
2238    #[test]
2239    fn test_json_result_to_vm_value_null() {
2240        let val = serde_json::json!(null);
2241        let vm_val = json_result_to_vm_value(&val);
2242        assert!(matches!(vm_val, VmValue::Nil));
2243    }
2244
2245    #[test]
2246    fn test_json_result_to_vm_value_nested() {
2247        let val = serde_json::json!({
2248            "text": "response",
2249            "tool_calls": [
2250                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
2251            ],
2252            "input_tokens": 100,
2253            "output_tokens": 50,
2254        });
2255        let vm_val = json_result_to_vm_value(&val);
2256        let VmValue::Dict(d) = &vm_val else {
2257            unreachable!("Expected Dict, got {:?}", vm_val);
2258        };
2259        assert_eq!(d.get("text").unwrap().display(), "response");
2260        let VmValue::List(list) = d.get("tool_calls").unwrap() else {
2261            unreachable!("Expected List for tool_calls");
2262        };
2263        assert_eq!(list.len(), 1);
2264    }
2265
2266    #[test]
2267    fn parse_host_tools_list_accepts_object_wrapper() {
2268        let tools = parse_host_tools_list_response(serde_json::json!({
2269            "tools": [
2270                {
2271                    "name": "Read",
2272                    "description": "Read a file",
2273                    "schema": {"type": "object"},
2274                }
2275            ]
2276        }))
2277        .expect("tool list");
2278
2279        assert_eq!(tools.len(), 1);
2280        assert_eq!(tools[0]["name"], "Read");
2281        assert_eq!(tools[0]["deprecated"], false);
2282    }
2283
2284    #[test]
2285    fn parse_host_tools_list_accepts_compat_fields() {
2286        let tools = parse_host_tools_list_response(serde_json::json!({
2287            "result": {
2288                "tools": [
2289                    {
2290                        "name": "Edit",
2291                        "short_description": "Apply an edit",
2292                        "input_schema": {"type": "object"},
2293                        "deprecated": true,
2294                    }
2295                ]
2296            }
2297        }))
2298        .expect("tool list");
2299
2300        assert_eq!(tools[0]["description"], "Apply an edit");
2301        assert_eq!(tools[0]["schema"]["type"], "object");
2302        assert_eq!(tools[0]["deprecated"], true);
2303    }
2304
2305    #[test]
2306    fn parse_host_tools_list_requires_tool_names() {
2307        let err = parse_host_tools_list_response(serde_json::json!({
2308            "tools": [
2309                {"description": "missing name"}
2310            ]
2311        }))
2312        .expect_err("expected error");
2313        assert!(err
2314            .to_string()
2315            .contains("host/tools/list: every tool must include a string `name`"));
2316    }
2317
2318    #[test]
2319    fn test_timeout_duration() {
2320        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
2321    }
2322}