Skip to main content

harn_vm/
bridge.rs

1//! JSON-RPC 2.0 bridge for host communication.
2//!
3//! When `harn run --bridge` is used, the VM delegates builtins (llm_call,
4//! file I/O, tool execution) to a host process over stdin/stdout JSON-RPC.
5//! The host application handles these requests using its own providers.
6
7use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::future::Future;
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15
16use tokio::io::AsyncBufReadExt;
17use tokio::sync::{oneshot, Mutex, Notify};
18
19use harn_parser::diagnostic_codes::Code;
20
21use crate::orchestration::MutationSessionRecord;
22use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
23use crate::visible_text::VisibleTextState;
24use crate::vm::Vm;
25
26/// Default timeout for bridge calls (5 minutes).
27const DEFAULT_TIMEOUT: Duration = Duration::from_mins(5);
28
29pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
30
31fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
32    Arc::new(move |line: &str| {
33        let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
34        let mut stdout = std::io::stdout().lock();
35        stdout
36            .write_all(line.as_bytes())
37            .map_err(|e| format!("Bridge write error: {e}"))?;
38        stdout
39            .write_all(b"\n")
40            .map_err(|e| format!("Bridge write error: {e}"))?;
41        stdout
42            .flush()
43            .map_err(|e| format!("Bridge flush error: {e}"))?;
44        Ok(())
45    })
46}
47
48/// A JSON-RPC 2.0 bridge to a host process over stdin/stdout.
49///
50/// The bridge sends requests to the host on stdout and receives responses
51/// on stdin. A background task reads stdin and dispatches responses to
52/// waiting callers by request ID. All stdout writes are serialized through
53/// a mutex to prevent interleaving.
54pub struct HostBridge {
55    next_id: AtomicU64,
56    /// Pending request waiters, keyed by JSON-RPC id.
57    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
58    /// Whether the host has sent a cancel notification.
59    cancelled: Arc<AtomicBool>,
60    /// Wakes pending host calls when cancellation arrives.
61    cancel_notify: Arc<Notify>,
62    /// Transport writer used to send JSON-RPC to the host.
63    writer: HostBridgeWriter,
64    /// ACP session ID (set in ACP mode for session-scoped notifications).
65    session_id: std::sync::Mutex<String>,
66    /// Name of the currently executing Harn script (without .harn suffix).
67    script_name: std::sync::Mutex<String>,
68    /// Transcript injections queued by the host while a run is active.
69    queued_transcript_injections: HostBridgeInjectionState,
70    /// Host-triggered resume signal for daemon agents.
71    resume_requested: Arc<AtomicBool>,
72    /// Host-triggered skill-registry invalidation signal. Set when the
73    /// host sends a `skills/update` notification; consumed by the CLI
74    /// between runs (watch mode, long-running agents) to rebuild the
75    /// layered skill catalog from its current filesystem + host state.
76    skills_reload_requested: Arc<AtomicBool>,
77    /// Whether the current daemon-mode agent loop is blocked in idle wait.
78    daemon_idle: Arc<AtomicBool>,
79    /// Canonical ACP `stopReason` recorded by the most recent `agent_loop`
80    /// finalize during this prompt. Read once by the ACP adapter when the
81    /// pipeline returns and populated by `host_agent_session_finalize`.
82    /// Pipelines that don't run an agent loop leave this `None`, in which
83    /// case the adapter falls back to `end_turn`.
84    prompt_stop_reason: std::sync::Mutex<Option<String>>,
85    /// Per-call visible assistant text state for call_progress notifications.
86    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
87    /// Whether an LLM call's deltas should be exposed to end users while streaming.
88    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
89    /// Optional in-process host-module backend used by `harn playground`.
90    in_process: Option<InProcessHost>,
91}
92
93struct InProcessHost {
94    module_path: PathBuf,
95    exported_functions: BTreeMap<String, Arc<VmClosure>>,
96    vm: Vm,
97}
98
99impl InProcessHost {
100    /// Box-pin'd to break the static recursion between the VM's hot dispatch
101    /// loop and the bridge: a bridge-backed builtin spawns a child VM that
102    /// calls back into the dispatch loop via `call_closure_pub`. Indirecting
103    /// at this slow-path boundary keeps the recursion satisfied without
104    /// allocating per call in the hot per-callback path.
105    fn dispatch<'a>(
106        &'a self,
107        method: &'a str,
108        params: serde_json::Value,
109    ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, VmError>> + Send + 'a>> {
110        Box::pin(async move {
111            match method {
112                "builtin_call" => {
113                    let name = params
114                        .get("name")
115                        .and_then(|value| value.as_str())
116                        .unwrap_or_default();
117                    let args = params
118                        .get("args")
119                        .and_then(|value| value.as_array())
120                        .cloned()
121                        .unwrap_or_default()
122                        .into_iter()
123                        .map(|value| json_result_to_vm_value(&value))
124                        .collect::<Vec<_>>();
125                    self.invoke_export(name, &args).await
126                }
127                "host/tools/list" => self
128                    .invoke_optional_export("host_tools_list", &[])
129                    .await
130                    .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
131                "session/request_permission" => self.request_permission(params).await,
132                other => Err(VmError::Runtime(format!(
133                    "playground host backend does not implement bridge method '{other}'"
134                ))),
135            }
136        })
137    }
138
139    async fn invoke_export(
140        &self,
141        name: &str,
142        args: &[VmValue],
143    ) -> Result<serde_json::Value, VmError> {
144        let Some(closure) = self.exported_functions.get(name) else {
145            return Err(VmError::Runtime(format!(
146                "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
147                self.module_path.display()
148            )));
149        };
150
151        let mut vm = self.vm.child_vm_for_host();
152        let result = vm.call_closure_pub(closure, args).await?;
153        Ok(crate::llm::vm_value_to_json(&result))
154    }
155
156    async fn invoke_optional_export(
157        &self,
158        name: &str,
159        args: &[VmValue],
160    ) -> Result<Option<serde_json::Value>, VmError> {
161        if !self.exported_functions.contains_key(name) {
162            return Ok(None);
163        }
164        self.invoke_export(name, args).await.map(Some)
165    }
166
167    async fn request_permission(
168        &self,
169        params: serde_json::Value,
170    ) -> Result<serde_json::Value, VmError> {
171        // No exported `request_permission` means the playground host has no
172        // approval policy, so it grants through the canonical ACP option.
173        let Some(closure) = self.exported_functions.get("request_permission") else {
174            return Ok(crate::llm::acp_permission::allow_response());
175        };
176
177        let tool_call = params.get("toolCall");
178        let tool_name = tool_call
179            .and_then(|tool_call| tool_call.pointer("/_meta/harn/toolName"))
180            .or_else(|| tool_call.and_then(|tool_call| tool_call.get("toolName")))
181            .or_else(|| tool_call.and_then(|tool_call| tool_call.get("title")))
182            .and_then(|value| value.as_str())
183            .unwrap_or_default();
184        let tool_args = tool_call
185            .and_then(|tool_call| tool_call.get("rawInput"))
186            .map(json_result_to_vm_value)
187            .unwrap_or(VmValue::Nil);
188        let full_payload = json_result_to_vm_value(&params);
189
190        let arg_count = closure.func.params.len();
191        let args = if arg_count >= 3 {
192            vec![
193                VmValue::String(arcstr::ArcStr::from(tool_name.to_string())),
194                tool_args,
195                full_payload,
196            ]
197        } else if arg_count == 2 {
198            vec![
199                VmValue::String(arcstr::ArcStr::from(tool_name.to_string())),
200                tool_args,
201            ]
202        } else if arg_count == 1 {
203            vec![full_payload]
204        } else {
205            Vec::new()
206        };
207
208        let mut vm = self.vm.child_vm_for_host();
209        let result = vm.call_closure_pub(closure, &args).await?;
210        // Translate the script's verdict into a canonical ACP response
211        // (`{ outcome: { outcome: "selected" | "cancelled", optionId? } }`).
212        // The script API stays ergonomic — bool / string-reason / dict — but
213        // the wire shape is canonical.
214        let payload = match result {
215            VmValue::Bool(granted) => {
216                if granted {
217                    crate::llm::acp_permission::allow_response()
218                } else {
219                    crate::llm::acp_permission::reject_response(None)
220                }
221            }
222            VmValue::String(reason) if !reason.is_empty() => {
223                crate::llm::acp_permission::reject_response(Some(reason.to_string()))
224            }
225            other => {
226                let json = crate::llm::vm_value_to_json(&other);
227                if let Some(granted) = json.get("granted").and_then(|value| value.as_bool()) {
228                    if granted {
229                        crate::llm::acp_permission::allow_response()
230                    } else {
231                        crate::llm::acp_permission::reject_response(
232                            json.get("reason")
233                                .and_then(|value| value.as_str())
234                                .map(str::to_string),
235                        )
236                    }
237                } else if json.get("outcome").is_some() {
238                    // The script already returned a canonical-shaped outcome.
239                    json
240                } else if other.is_truthy() {
241                    crate::llm::acp_permission::allow_response()
242                } else {
243                    crate::llm::acp_permission::reject_response(None)
244                }
245            }
246        };
247        Ok(payload)
248    }
249}
250
251/// How a queued bridge injection is delivered into the agent loop.
252///
253/// `AuditOnly` was previously called `WaitForCompletion`; the rename is
254/// truth-in-advertising (harn#2212). These injections drain at
255/// `loop_exit`, *after* the last LLM call has returned — so they land in
256/// the transcript audit but are **never rendered into a model prompt**.
257/// Hosts that want the model to react to the reminder on its final
258/// iteration should use `FinishStep` instead, which drains at every
259/// `iteration_start` / `post_tool_dispatch` / `iteration_end` checkpoint.
260#[derive(Clone, Copy, Debug, PartialEq, Eq)]
261pub enum QueuedUserMessageMode {
262    InterruptImmediate,
263    FinishStep,
264    AuditOnly,
265}
266
267#[derive(Clone, Copy, Debug, PartialEq, Eq)]
268pub enum DeliveryCheckpoint {
269    InterruptImmediate,
270    AfterCurrentOperation,
271    EndOfInteraction,
272}
273
274impl QueuedUserMessageMode {
275    fn from_str(value: &str) -> Self {
276        match value {
277            "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
278            // `steer` is the ACP `session/inject` alias for mid-turn
279            // user-message delivery at the next tool boundary; it maps to
280            // the same `FinishStep` checkpoint as `finish_step`.
281            "finish_step" | "after_current_operation" | "steer" => Self::FinishStep,
282            // `queue` is the explicit ACP alias for the audit-only path.
283            "queue" => Self::AuditOnly,
284            // Unknown / missing modes fall through to the safest option:
285            // record for audit, do not preempt the loop. Pre-#2212 hosts
286            // that send `wait_for_completion` are caught by this arm —
287            // the canonical name is `audit_only` going forward.
288            _ => Self::AuditOnly,
289        }
290    }
291
292    fn as_str(self) -> &'static str {
293        match self {
294            Self::InterruptImmediate => "interrupt_immediate",
295            Self::FinishStep => "finish_step",
296            Self::AuditOnly => "audit_only",
297        }
298    }
299}
300
301#[derive(Clone, Debug, PartialEq, Eq)]
302pub struct QueuedUserMessage {
303    pub message_id: String,
304    pub content: String,
305    pub transcript_content: serde_json::Value,
306    pub mode: QueuedUserMessageMode,
307}
308
309#[derive(Clone, Debug, PartialEq, Eq)]
310pub struct QueuedReminder {
311    pub reminder: crate::llm::helpers::SystemReminder,
312    pub mode: QueuedUserMessageMode,
313}
314
315#[derive(Clone, Debug, PartialEq, Eq)]
316pub enum QueuedTranscriptInjection {
317    User(QueuedUserMessage),
318    Reminder(QueuedReminder),
319}
320
321#[derive(Debug, Default)]
322struct QueuedTranscriptInjections {
323    queue: VecDeque<QueuedTranscriptInjection>,
324    revoked_user_message_ids: HashSet<String>,
325    delivered_user_message_ids: HashSet<String>,
326    revoked_reminder_ids: HashSet<String>,
327    delivered_reminder_ids: HashSet<String>,
328}
329
330#[derive(Clone, Debug, Default)]
331pub struct HostBridgeInjectionState {
332    inner: Arc<Mutex<QueuedTranscriptInjections>>,
333}
334
335#[derive(Clone, Copy, Debug, PartialEq, Eq)]
336pub enum PendingUserMessageMutationResult {
337    Mutated,
338    AlreadyRevoked,
339    AlreadyDelivered,
340    UnknownMessageId,
341}
342
343impl QueuedTranscriptInjection {
344    fn mode(&self) -> QueuedUserMessageMode {
345        match self {
346            Self::User(message) => message.mode,
347            Self::Reminder(reminder) => reminder.mode,
348        }
349    }
350
351    fn pending_json(&self, position: usize) -> serde_json::Value {
352        match self {
353            Self::User(message) => serde_json::json!({
354                "kind": "user",
355                "id": message.message_id,
356                "messageId": message.message_id,
357                "mode": message.mode.as_str(),
358                "position": position,
359                "content": message.transcript_content,
360            }),
361            Self::Reminder(reminder) => serde_json::json!({
362                "kind": "reminder",
363                "id": reminder.reminder.id,
364                "reminderId": reminder.reminder.id,
365                "mode": reminder.mode.as_str(),
366                "position": position,
367                "body": reminder.reminder.body,
368                "tags": reminder.reminder.tags,
369                "dedupeKey": reminder.reminder.dedupe_key,
370                "ttlTurns": reminder.reminder.ttl_turns,
371                "preserveOnCompact": reminder.reminder.preserve_on_compact,
372                "propagate": reminder.reminder.propagate.as_str(),
373                "roleHint": reminder.reminder.role_hint.as_str(),
374                "source": reminder.reminder.source.as_str(),
375                "firedAtTurn": reminder.reminder.fired_at_turn,
376                "originatingAgentId": reminder.reminder.originating_agent_id,
377            }),
378        }
379    }
380}
381
382#[derive(Clone, Copy, Debug, PartialEq, Eq)]
383pub enum PendingReminderMutationResult {
384    Mutated,
385    AlreadyRevoked,
386    AlreadyDelivered,
387    UnknownReminderId,
388}
389
390fn new_inject_message_id() -> String {
391    format!("msg_inj_{}", uuid::Uuid::now_v7().simple())
392}
393
394impl HostBridgeInjectionState {
395    pub fn new() -> Self {
396        Self::default()
397    }
398
399    pub async fn push_pending_user_message(
400        &self,
401        content: String,
402        transcript_content: serde_json::Value,
403        mode: &str,
404    ) -> String {
405        let message_id = new_inject_message_id();
406        self.inner
407            .lock()
408            .await
409            .queue
410            .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
411                message_id: message_id.clone(),
412                content,
413                transcript_content,
414                mode: QueuedUserMessageMode::from_str(mode),
415            }));
416        message_id
417    }
418
419    pub async fn revoke_pending_user_message(
420        &self,
421        message_id: &str,
422    ) -> PendingUserMessageMutationResult {
423        let mut state = self.inner.lock().await;
424        let mut retained = VecDeque::new();
425        let mut revoked = false;
426        while let Some(injection) = state.queue.pop_front() {
427            match &injection {
428                QueuedTranscriptInjection::User(message) if message.message_id == message_id => {
429                    revoked = true;
430                }
431                _ => retained.push_back(injection),
432            }
433        }
434        state.queue = retained;
435        if revoked {
436            state
437                .revoked_user_message_ids
438                .insert(message_id.to_string());
439            return PendingUserMessageMutationResult::Mutated;
440        }
441        if state.revoked_user_message_ids.contains(message_id) {
442            PendingUserMessageMutationResult::AlreadyRevoked
443        } else if state.delivered_user_message_ids.contains(message_id) {
444            PendingUserMessageMutationResult::AlreadyDelivered
445        } else {
446            PendingUserMessageMutationResult::UnknownMessageId
447        }
448    }
449
450    pub async fn revoke_pending_reminder(
451        &self,
452        reminder_id: &str,
453    ) -> PendingReminderMutationResult {
454        let mut state = self.inner.lock().await;
455        let mut retained = VecDeque::new();
456        let mut revoked = false;
457        while let Some(injection) = state.queue.pop_front() {
458            match &injection {
459                QueuedTranscriptInjection::Reminder(reminder)
460                    if reminder.reminder.id == reminder_id =>
461                {
462                    revoked = true;
463                }
464                _ => retained.push_back(injection),
465            }
466        }
467        state.queue = retained;
468        if revoked {
469            state.revoked_reminder_ids.insert(reminder_id.to_string());
470            return PendingReminderMutationResult::Mutated;
471        }
472        if state.revoked_reminder_ids.contains(reminder_id) {
473            PendingReminderMutationResult::AlreadyRevoked
474        } else if state.delivered_reminder_ids.contains(reminder_id) {
475            PendingReminderMutationResult::AlreadyDelivered
476        } else {
477            PendingReminderMutationResult::UnknownReminderId
478        }
479    }
480
481    pub async fn replace_pending_user_message(
482        &self,
483        message_id: &str,
484        content: String,
485        transcript_content: serde_json::Value,
486    ) -> PendingUserMessageMutationResult {
487        let mut state = self.inner.lock().await;
488        for injection in &mut state.queue {
489            if let QueuedTranscriptInjection::User(message) = injection {
490                if message.message_id == message_id {
491                    message.content = content;
492                    message.transcript_content = transcript_content;
493                    return PendingUserMessageMutationResult::Mutated;
494                }
495            }
496        }
497        if state.revoked_user_message_ids.contains(message_id) {
498            PendingUserMessageMutationResult::AlreadyRevoked
499        } else if state.delivered_user_message_ids.contains(message_id) {
500            PendingUserMessageMutationResult::AlreadyDelivered
501        } else {
502            PendingUserMessageMutationResult::UnknownMessageId
503        }
504    }
505
506    async fn push_session_reminder(&self, reminder: QueuedReminder) {
507        self.inner
508            .lock()
509            .await
510            .queue
511            .push_back(QueuedTranscriptInjection::Reminder(reminder));
512    }
513
514    pub async fn pending_injections_json(&self) -> serde_json::Value {
515        let state = self.inner.lock().await;
516        let injections = state
517            .queue
518            .iter()
519            .enumerate()
520            .map(|(position, injection)| injection.pending_json(position))
521            .collect::<Vec<_>>();
522        serde_json::json!({
523            "pendingCount": injections.len(),
524            "injections": injections,
525        })
526    }
527}
528
529fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
530    format!(
531        "{}: {}",
532        Code::ReminderUnknownOption.as_str(),
533        message.as_ref()
534    )
535}
536
537fn session_remind_shape_error(message: impl AsRef<str>) -> String {
538    format!(
539        "{}: {}",
540        Code::ReminderInvalidShape.as_str(),
541        message.as_ref()
542    )
543}
544
545fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
546    format!(
547        "{}: {}",
548        Code::ReminderUnknownPropagate.as_str(),
549        message.as_ref()
550    )
551}
552
553fn string_field(
554    map: &serde_json::Map<String, serde_json::Value>,
555    key: &str,
556    required: bool,
557) -> Result<Option<String>, String> {
558    match map.get(key) {
559        None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
560            format!("`{key}` must be a non-empty string"),
561        )),
562        None | Some(serde_json::Value::Null) => Ok(None),
563        Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
564            session_remind_shape_error(format!("`{key}` must be a non-empty string")),
565        ),
566        Some(serde_json::Value::String(value)) => {
567            let trimmed = value.trim();
568            if trimmed.is_empty() {
569                Ok(None)
570            } else {
571                Ok(Some(trimmed.to_string()))
572            }
573        }
574        Some(other) => Err(session_remind_shape_error(format!(
575            "`{key}` must be a string, got {other}"
576        ))),
577    }
578}
579
580fn bool_field(
581    map: &serde_json::Map<String, serde_json::Value>,
582    key: &str,
583) -> Result<Option<bool>, String> {
584    match map.get(key) {
585        None | Some(serde_json::Value::Null) => Ok(None),
586        Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
587        Some(other) => Err(session_remind_shape_error(format!(
588            "`{key}` must be a bool, got {other}"
589        ))),
590    }
591}
592
593fn int_field(
594    map: &serde_json::Map<String, serde_json::Value>,
595    key: &str,
596) -> Result<Option<i64>, String> {
597    match map.get(key) {
598        None | Some(serde_json::Value::Null) => Ok(None),
599        Some(serde_json::Value::Number(value)) => {
600            let Some(value) = value.as_i64() else {
601                return Err(session_remind_shape_error(format!(
602                    "`{key}` must be an integer"
603                )));
604            };
605            Ok(Some(value))
606        }
607        Some(other) => Err(session_remind_shape_error(format!(
608            "`{key}` must be an int, got {other}"
609        ))),
610    }
611}
612
613fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
614    let Some(value) = map.get("tags") else {
615        return Ok(Vec::new());
616    };
617    if value.is_null() {
618        return Ok(Vec::new());
619    }
620    let Some(values) = value.as_array() else {
621        return Err(session_remind_shape_error("`tags` must be a list"));
622    };
623    let mut tags = Vec::new();
624    for value in values {
625        let Some(tag) = value.as_str() else {
626            return Err(session_remind_shape_error(format!(
627                "`tags` entries must be strings, got {value}"
628            )));
629        };
630        let tag = tag.trim();
631        if tag.is_empty() {
632            return Err(session_remind_shape_error(
633                "`tags` entries must be non-empty strings",
634            ));
635        }
636        if !tags.iter().any(|existing| existing == tag) {
637            tags.push(tag.to_string());
638        }
639    }
640    Ok(tags)
641}
642
643fn session_remind_payload_from_value(
644    value: &serde_json::Value,
645) -> Result<crate::llm::helpers::SystemReminder, String> {
646    let Some(map) = value.as_object() else {
647        return Err(session_remind_shape_error(
648            "session/remind payload must be a reminder object",
649        ));
650    };
651    const ALLOWED: &[&str] = &[
652        "_meta",
653        "body",
654        "dedupe_key",
655        "fired_at_turn",
656        "id",
657        "preserve_on_compact",
658        "propagate",
659        "role_hint",
660        "source",
661        "tags",
662        "ttl_turns",
663    ];
664    let unknown = map
665        .keys()
666        .filter(|key| !ALLOWED.contains(&key.as_str()))
667        .map(String::as_str)
668        .collect::<Vec<_>>();
669    if !unknown.is_empty() {
670        if unknown.contains(&"content") {
671            return Err(session_remind_shape_error(
672                "session/remind expects reminder `body`, not user-message `content`",
673            ));
674        }
675        return Err(reminder_unknown_option_error(format!(
676            "unknown reminder option(s): {}",
677            unknown.join(", ")
678        )));
679    }
680    if let Some(meta) = map.get("_meta") {
681        if !meta.is_null() && !meta.is_object() {
682            return Err(session_remind_shape_error("`_meta` must be an object"));
683        }
684    }
685    let ttl_turns = int_field(map, "ttl_turns")?;
686    if let Some(value) = ttl_turns {
687        if value <= 0 {
688            return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
689        }
690    }
691    let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
692    if fired_at_turn < 0 {
693        return Err(session_remind_shape_error(
694            "`fired_at_turn` must be >= 0 when provided",
695        ));
696    }
697    match string_field(map, "source", false)?.as_deref() {
698        None | Some("bridge") => {}
699        Some(_) => {
700            return Err(session_remind_shape_error(
701                "`source` for session/remind must be bridge when provided",
702            ))
703        }
704    }
705    let propagate = match string_field(map, "propagate", false)?.as_deref() {
706        None => crate::llm::helpers::ReminderPropagate::Session,
707        Some("all") => crate::llm::helpers::ReminderPropagate::All,
708        Some("session") => crate::llm::helpers::ReminderPropagate::Session,
709        Some("none") => crate::llm::helpers::ReminderPropagate::None,
710        Some(_) => {
711            return Err(reminder_unknown_propagate_error(
712                "`propagate` must be one of all, session, or none",
713            ))
714        }
715    };
716    let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
717        None => crate::llm::helpers::ReminderRoleHint::System,
718        Some("system") => crate::llm::helpers::ReminderRoleHint::System,
719        Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
720        Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
721        Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
722        Some(_) => {
723            return Err(session_remind_shape_error(
724                "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
725            ))
726        }
727    };
728    Ok(crate::llm::helpers::SystemReminder {
729        id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
730        tags: tags_field(map)?,
731        dedupe_key: string_field(map, "dedupe_key", false)?,
732        ttl_turns,
733        preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
734        propagate,
735        role_hint,
736        source: crate::llm::helpers::ReminderSource::Bridge,
737        body: string_field(map, "body", true)?.unwrap_or_default(),
738        fired_at_turn,
739        originating_agent_id: None,
740    })
741}
742
743/// Parse the params of a `session/cancel_tool_call` notification and fire
744/// the per-tool-call cancellation. Mirrors the shape used by the public
745/// `cancel_in_flight_tool_call` builtin so hosts have one wire format
746/// regardless of which surface they came through.
747///
748/// Stdio bridges send this as a notification (no id, no response); the
749/// builtin handles request/response semantics in Harn. We deliberately
750/// drop malformed payloads silently because notifications can't reply
751/// with an error — logging would also be too noisy for partial drops.
752fn handle_cancel_tool_call_notification(params: &serde_json::Value) {
753    let session_id = params
754        .get("sessionId")
755        .or_else(|| params.get("session_id"))
756        .and_then(|value| value.as_str())
757        .unwrap_or_default();
758    let call_id = params
759        .get("toolCallId")
760        .or_else(|| params.get("tool_call_id"))
761        .or_else(|| params.get("callId"))
762        .or_else(|| params.get("call_id"))
763        .and_then(|value| value.as_str())
764        .unwrap_or_default();
765    if call_id.is_empty() {
766        return;
767    }
768    let reason = params
769        .get("reason")
770        .and_then(|value| value.as_str())
771        .unwrap_or("host cancelled in-flight tool call")
772        .to_string();
773    let inject_reminder = params
774        .get("injectReminder")
775        .or_else(|| params.get("inject_reminder"))
776        .and_then(|value| value.as_bool())
777        .unwrap_or(true);
778    crate::tool_call_cancellations::cancel(session_id, call_id, reason, inject_reminder);
779}
780
781fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
782    let mode = QueuedUserMessageMode::from_str(
783        params
784            .get("mode")
785            .and_then(|value| value.as_str())
786            .unwrap_or("audit_only"),
787    );
788    let reminder_value = if let Some(reminder) = params.get("reminder") {
789        reminder.clone()
790    } else {
791        let Some(params) = params.as_object() else {
792            return Err(session_remind_shape_error(
793                "session/remind params must be an object",
794            ));
795        };
796        let mut reminder = params.clone();
797        reminder.remove("mode");
798        reminder.remove("sessionId");
799        reminder.remove("session_id");
800        serde_json::Value::Object(reminder)
801    };
802    Ok(QueuedReminder {
803        reminder: session_remind_payload_from_value(&reminder_value)?,
804        mode,
805    })
806}
807
808// Default doesn't apply — new() spawns async tasks requiring a tokio LocalSet.
809#[allow(clippy::new_without_default)]
810impl HostBridge {
811    /// Create a new bridge and spawn the stdin reader task.
812    ///
813    /// Must be called within a tokio LocalSet (uses spawn_local for the
814    /// stdin reader since it's single-threaded).
815    pub fn new() -> Self {
816        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
817            Arc::new(Mutex::new(HashMap::new()));
818        let cancelled = Arc::new(AtomicBool::new(false));
819        let cancel_notify = Arc::new(Notify::new());
820        let queued_transcript_injections = HostBridgeInjectionState::default();
821        let resume_requested = Arc::new(AtomicBool::new(false));
822        let skills_reload_requested = Arc::new(AtomicBool::new(false));
823        let daemon_idle = Arc::new(AtomicBool::new(false));
824
825        // Stdin reader: reads JSON-RPC lines and dispatches responses
826        let pending_clone = pending.clone();
827        let cancelled_clone = cancelled.clone();
828        let cancel_notify_clone = cancel_notify.clone();
829        let queued_clone = queued_transcript_injections.clone();
830        let resume_clone = resume_requested.clone();
831        let skills_reload_clone = skills_reload_requested.clone();
832        tokio::task::spawn_local(async move {
833            let stdin = tokio::io::stdin();
834            let reader = tokio::io::BufReader::new(stdin);
835            let mut lines = reader.lines();
836
837            while let Ok(Some(line)) = lines.next_line().await {
838                let line = line.trim().to_string();
839                if line.is_empty() {
840                    continue;
841                }
842
843                let msg: serde_json::Value = match serde_json::from_str(&line) {
844                    Ok(v) => v,
845                    Err(_) => continue,
846                };
847
848                // Notifications have no id; responses have one.
849                if msg.get("id").is_none() {
850                    if let Some(method) = msg["method"].as_str() {
851                        if method == "cancel" {
852                            cancelled_clone.store(true, Ordering::SeqCst);
853                            cancel_notify_clone.notify_waiters();
854                        } else if method == "agent/resume" {
855                            resume_clone.store(true, Ordering::SeqCst);
856                        } else if method == "skills/update" {
857                            skills_reload_clone.store(true, Ordering::SeqCst);
858                        } else if method == "session/remind" {
859                            let params = &msg["params"];
860                            if let Ok(reminder) = queued_session_remind_from_params(params) {
861                                queued_clone.push_session_reminder(reminder).await;
862                            }
863                        } else if method == "session/cancel_tool_call" {
864                            handle_cancel_tool_call_notification(&msg["params"]);
865                        }
866                    }
867                    continue;
868                }
869
870                if let Some(id) = msg["id"].as_u64() {
871                    let mut pending = pending_clone.lock().await;
872                    if let Some(sender) = pending.remove(&id) {
873                        let _ = sender.send(msg);
874                    }
875                }
876            }
877
878            // stdin closed: drop pending senders to cancel waiters.
879            let mut pending = pending_clone.lock().await;
880            pending.clear();
881        });
882
883        Self {
884            next_id: AtomicU64::new(1),
885            pending,
886            cancelled,
887            cancel_notify,
888            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
889            session_id: std::sync::Mutex::new(String::new()),
890            script_name: std::sync::Mutex::new(String::new()),
891            queued_transcript_injections,
892            resume_requested,
893            skills_reload_requested,
894            daemon_idle,
895            prompt_stop_reason: std::sync::Mutex::new(None),
896            visible_call_states: std::sync::Mutex::new(HashMap::new()),
897            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
898            in_process: None,
899        }
900    }
901
902    /// Create a bridge from pre-existing shared state.
903    ///
904    /// Unlike `new()`, does **not** spawn a stdin reader — the caller is
905    /// responsible for dispatching responses into `pending`.  This is used
906    /// by ACP mode which already has its own stdin reader.
907    pub fn from_parts(
908        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
909        cancelled: Arc<AtomicBool>,
910        stdout_lock: Arc<std::sync::Mutex<()>>,
911        start_id: u64,
912    ) -> Self {
913        Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
914    }
915
916    pub fn from_parts_with_writer(
917        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
918        cancelled: Arc<AtomicBool>,
919        writer: HostBridgeWriter,
920        start_id: u64,
921    ) -> Self {
922        Self::from_parts_with_writer_and_cancel_notify(
923            pending,
924            cancelled,
925            Arc::new(Notify::new()),
926            writer,
927            start_id,
928        )
929    }
930
931    pub fn from_parts_with_writer_and_cancel_notify(
932        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
933        cancelled: Arc<AtomicBool>,
934        cancel_notify: Arc<Notify>,
935        writer: HostBridgeWriter,
936        start_id: u64,
937    ) -> Self {
938        Self::from_parts_with_writer_cancel_notify_and_injection_state(
939            pending,
940            cancelled,
941            cancel_notify,
942            writer,
943            start_id,
944            None,
945        )
946    }
947
948    pub fn from_parts_with_writer_cancel_notify_and_injection_state(
949        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
950        cancelled: Arc<AtomicBool>,
951        cancel_notify: Arc<Notify>,
952        writer: HostBridgeWriter,
953        start_id: u64,
954        injection_state: Option<HostBridgeInjectionState>,
955    ) -> Self {
956        Self {
957            next_id: AtomicU64::new(start_id),
958            pending,
959            cancelled,
960            cancel_notify,
961            writer,
962            session_id: std::sync::Mutex::new(String::new()),
963            script_name: std::sync::Mutex::new(String::new()),
964            queued_transcript_injections: injection_state.unwrap_or_default(),
965            resume_requested: Arc::new(AtomicBool::new(false)),
966            skills_reload_requested: Arc::new(AtomicBool::new(false)),
967            daemon_idle: Arc::new(AtomicBool::new(false)),
968            prompt_stop_reason: std::sync::Mutex::new(None),
969            visible_call_states: std::sync::Mutex::new(HashMap::new()),
970            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
971            in_process: None,
972        }
973    }
974
975    /// Create an in-process host bridge backed by exported functions from a
976    /// Harn module. Used by `harn playground` to avoid JSON-RPC boilerplate.
977    pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
978        let exported_functions = vm.load_module_exports(module_path).await?;
979        Ok(Self {
980            next_id: AtomicU64::new(1),
981            pending: Arc::new(Mutex::new(HashMap::new())),
982            cancelled: Arc::new(AtomicBool::new(false)),
983            cancel_notify: Arc::new(Notify::new()),
984            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
985            session_id: std::sync::Mutex::new(String::new()),
986            script_name: std::sync::Mutex::new(String::new()),
987            queued_transcript_injections: HostBridgeInjectionState::default(),
988            resume_requested: Arc::new(AtomicBool::new(false)),
989            skills_reload_requested: Arc::new(AtomicBool::new(false)),
990            daemon_idle: Arc::new(AtomicBool::new(false)),
991            prompt_stop_reason: std::sync::Mutex::new(None),
992            visible_call_states: std::sync::Mutex::new(HashMap::new()),
993            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
994            in_process: Some(InProcessHost {
995                module_path: module_path.to_path_buf(),
996                exported_functions,
997                vm,
998            }),
999        })
1000    }
1001
1002    /// Set the ACP session ID for session-scoped notifications.
1003    pub fn set_session_id(&self, id: &str) {
1004        *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
1005    }
1006
1007    /// Set the currently executing script name (without .harn suffix).
1008    pub fn set_script_name(&self, name: &str) {
1009        *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
1010    }
1011
1012    /// Get the current script name.
1013    fn get_script_name(&self) -> String {
1014        self.script_name
1015            .lock()
1016            .unwrap_or_else(|e| e.into_inner())
1017            .clone()
1018    }
1019
1020    /// Get the session ID.
1021    pub fn get_session_id(&self) -> String {
1022        self.session_id
1023            .lock()
1024            .unwrap_or_else(|e| e.into_inner())
1025            .clone()
1026    }
1027
1028    /// Write a complete JSON-RPC line to stdout, serialized through a mutex.
1029    fn write_line(&self, line: &str) -> Result<(), VmError> {
1030        (self.writer)(line).map_err(VmError::Runtime)
1031    }
1032
1033    /// Send a JSON-RPC request to the host and wait for the response.
1034    /// Times out after 5 minutes to prevent deadlocks.
1035    pub async fn call(
1036        &self,
1037        method: &str,
1038        params: serde_json::Value,
1039    ) -> Result<serde_json::Value, VmError> {
1040        if let Some(in_process) = &self.in_process {
1041            return in_process.dispatch(method, params).await;
1042        }
1043
1044        if self.is_cancelled() {
1045            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1046        }
1047
1048        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
1049        let cancel_wait = self.cancel_notify.notified();
1050        tokio::pin!(cancel_wait);
1051
1052        let request = crate::jsonrpc::request(id, method, params);
1053
1054        let (tx, rx) = oneshot::channel();
1055        {
1056            let mut pending = self.pending.lock().await;
1057            pending.insert(id, tx);
1058        }
1059
1060        let line = serde_json::to_string(&request)
1061            .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
1062        if let Err(e) = self.write_line(&line) {
1063            let mut pending = self.pending.lock().await;
1064            pending.remove(&id);
1065            return Err(e);
1066        }
1067
1068        if self.is_cancelled() {
1069            let mut pending = self.pending.lock().await;
1070            pending.remove(&id);
1071            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1072        }
1073
1074        let response = tokio::select! {
1075            result = rx => match result {
1076                Ok(msg) => msg,
1077                Err(_) => {
1078                    // Sender dropped: host closed or stdin reader exited.
1079                    return Err(VmError::Runtime(
1080                        "Bridge: host closed connection before responding".into(),
1081                    ));
1082                }
1083            },
1084            _ = &mut cancel_wait => {
1085                let mut pending = self.pending.lock().await;
1086                pending.remove(&id);
1087                return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1088            }
1089            _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
1090                let mut pending = self.pending.lock().await;
1091                pending.remove(&id);
1092                return Err(VmError::Runtime(format!(
1093                    "Bridge: host did not respond to '{method}' within {}s",
1094                    DEFAULT_TIMEOUT.as_secs()
1095                )));
1096            }
1097        };
1098
1099        if let Some(error) = response.get("error") {
1100            let message = error["message"].as_str().unwrap_or("Unknown host error");
1101            let code = error["code"].as_i64().unwrap_or(-1);
1102            // JSON-RPC -32001 signals the host rejected the tool (not permitted / not in allowlist).
1103            if code == -32001 {
1104                return Err(VmError::CategorizedError {
1105                    message: message.to_string(),
1106                    category: ErrorCategory::ToolRejected,
1107                });
1108            }
1109            return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
1110        }
1111
1112        Ok(response["result"].clone())
1113    }
1114
1115    /// Send a JSON-RPC notification to the host (no response expected).
1116    /// Serialized through the stdout mutex to prevent interleaving.
1117    pub fn notify(&self, method: &str, params: serde_json::Value) {
1118        let notification = crate::jsonrpc::notification(method, params);
1119        if self.in_process.is_some() {
1120            return;
1121        }
1122        if let Ok(line) = serde_json::to_string(&notification) {
1123            let _ = self.write_line(&line);
1124        }
1125    }
1126
1127    /// Check if the host has sent a cancel notification.
1128    pub fn is_cancelled(&self) -> bool {
1129        self.cancelled.load(Ordering::SeqCst)
1130    }
1131
1132    pub fn take_resume_signal(&self) -> bool {
1133        self.resume_requested.swap(false, Ordering::SeqCst)
1134    }
1135
1136    pub fn signal_resume(&self) {
1137        self.resume_requested.store(true, Ordering::SeqCst);
1138    }
1139
1140    pub fn set_daemon_idle(&self, idle: bool) {
1141        self.daemon_idle.store(idle, Ordering::SeqCst);
1142    }
1143
1144    pub fn is_daemon_idle(&self) -> bool {
1145        self.daemon_idle.load(Ordering::SeqCst)
1146    }
1147
1148    /// Record the canonical ACP `stopReason` for the current prompt. The
1149    /// last writer wins, which matches the semantic that an outer
1150    /// `agent_loop` (the one whose result the user observes) always
1151    /// finalizes after any inner loops it spawned.
1152    pub fn set_prompt_stop_reason(&self, reason: &str) {
1153        *self
1154            .prompt_stop_reason
1155            .lock()
1156            .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
1157    }
1158
1159    /// Consume any prompt stop reason recorded during this prompt. The
1160    /// ACP adapter calls this once after the pipeline returns; pipelines
1161    /// that didn't run an `agent_loop` see `None` and the adapter falls
1162    /// back to `end_turn`.
1163    pub fn take_prompt_stop_reason(&self) -> Option<String> {
1164        self.prompt_stop_reason
1165            .lock()
1166            .unwrap_or_else(|e| e.into_inner())
1167            .take()
1168    }
1169
1170    /// Consume any pending `skills/update` signal the host has sent.
1171    /// Returns `true` exactly once per notification, letting callers
1172    /// trigger a layered-discovery rebuild without polling false
1173    /// positives. See issue #73 for the hot-reload contract.
1174    pub fn take_skills_reload_signal(&self) -> bool {
1175        self.skills_reload_requested.swap(false, Ordering::SeqCst)
1176    }
1177
1178    /// Manually mark the skill catalog as stale. Used by tests and by
1179    /// the CLI when an internal event (e.g. `harn install`) should
1180    /// trigger the same rebuild a `skills/update` notification would.
1181    pub fn signal_skills_reload(&self) {
1182        self.skills_reload_requested.store(true, Ordering::SeqCst);
1183    }
1184
1185    /// Call the host's `skills/list` RPC and return the raw JSON array
1186    /// it responded with. Shape:
1187    /// `[{ "id": "...", "name": "...", "description": "...", "source": "..." }, ...]`.
1188    /// The CLI adapter converts each entry into a
1189    /// [`crate::skills::SkillManifestRef`].
1190    pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
1191        let result = self.call("skills/list", serde_json::json!({})).await?;
1192        match result {
1193            serde_json::Value::Array(items) => Ok(items),
1194            serde_json::Value::Object(map) => match map.get("skills") {
1195                Some(serde_json::Value::Array(items)) => Ok(items.clone()),
1196                _ => Err(VmError::Runtime(
1197                    "skills/list: host response must be an array or { skills: [...] }".into(),
1198                )),
1199            },
1200            _ => Err(VmError::Runtime(
1201                "skills/list: unexpected response shape".into(),
1202            )),
1203        }
1204    }
1205
1206    /// Call the host's `host/tools/list` RPC and return normalized tool
1207    /// descriptors. Shape:
1208    /// `[{ "name": "...", "description": "...", "schema": {...}, "deprecated": false }, ...]`.
1209    /// The bridge also accepts `{ "tools": [...] }` and
1210    /// `{ "result": { "tools": [...] } }` wrappers for lenient hosts.
1211    pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
1212        let result = self.call("host/tools/list", serde_json::json!({})).await?;
1213        parse_host_tools_list_response(result)
1214    }
1215
1216    /// Call the host's `skills/fetch` RPC for one skill id. Returns the
1217    /// raw JSON body so the CLI can inspect both the frontmatter fields
1218    /// and the skill markdown body in whatever shape the host sends.
1219    pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
1220        self.call("skills/fetch", serde_json::json!({ "id": id }))
1221            .await
1222    }
1223
1224    pub fn injection_state(&self) -> HostBridgeInjectionState {
1225        self.queued_transcript_injections.clone()
1226    }
1227
1228    pub async fn push_pending_user_message(
1229        &self,
1230        content: String,
1231        transcript_content: serde_json::Value,
1232        mode: &str,
1233    ) -> String {
1234        self.queued_transcript_injections
1235            .push_pending_user_message(content, transcript_content, mode)
1236            .await
1237    }
1238
1239    pub async fn push_queued_user_message(&self, content: String, mode: &str) -> String {
1240        self.push_pending_user_message(content.clone(), serde_json::Value::String(content), mode)
1241            .await
1242    }
1243
1244    pub async fn revoke_pending_user_message(
1245        &self,
1246        message_id: &str,
1247    ) -> PendingUserMessageMutationResult {
1248        self.queued_transcript_injections
1249            .revoke_pending_user_message(message_id)
1250            .await
1251    }
1252
1253    pub async fn revoke_pending_reminder(
1254        &self,
1255        reminder_id: &str,
1256    ) -> PendingReminderMutationResult {
1257        self.queued_transcript_injections
1258            .revoke_pending_reminder(reminder_id)
1259            .await
1260    }
1261
1262    pub async fn pending_injections_json(&self) -> serde_json::Value {
1263        self.queued_transcript_injections
1264            .pending_injections_json()
1265            .await
1266    }
1267
1268    pub async fn replace_pending_user_message(
1269        &self,
1270        message_id: &str,
1271        content: String,
1272        transcript_content: serde_json::Value,
1273    ) -> PendingUserMessageMutationResult {
1274        self.queued_transcript_injections
1275            .replace_pending_user_message(message_id, content, transcript_content)
1276            .await
1277    }
1278
1279    pub async fn push_queued_session_remind_from_params(
1280        &self,
1281        params: &serde_json::Value,
1282    ) -> Result<String, String> {
1283        let reminder = queued_session_remind_from_params(params)?;
1284        let reminder_id = reminder.reminder.id.clone();
1285        self.queued_transcript_injections
1286            .push_session_reminder(reminder)
1287            .await;
1288        Ok(reminder_id)
1289    }
1290
1291    pub async fn take_queued_user_messages(
1292        &self,
1293        include_interrupt_immediate: bool,
1294        include_finish_step: bool,
1295        include_audit_only: bool,
1296    ) -> Vec<QueuedUserMessage> {
1297        let mut state = self.queued_transcript_injections.inner.lock().await;
1298        let mut selected = Vec::new();
1299        let mut retained = VecDeque::new();
1300        while let Some(injection) = state.queue.pop_front() {
1301            let should_take = match injection.mode() {
1302                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1303                QueuedUserMessageMode::FinishStep => include_finish_step,
1304                QueuedUserMessageMode::AuditOnly => include_audit_only,
1305            };
1306            match (should_take, injection) {
1307                (true, QueuedTranscriptInjection::User(message)) => {
1308                    state
1309                        .delivered_user_message_ids
1310                        .insert(message.message_id.clone());
1311                    selected.push(message);
1312                }
1313                (_, injection) => retained.push_back(injection),
1314            }
1315        }
1316        state.queue = retained;
1317        selected
1318    }
1319
1320    pub async fn take_queued_transcript_injections(
1321        &self,
1322        include_interrupt_immediate: bool,
1323        include_finish_step: bool,
1324        include_audit_only: bool,
1325    ) -> Vec<QueuedTranscriptInjection> {
1326        let mut state = self.queued_transcript_injections.inner.lock().await;
1327        let mut selected = Vec::new();
1328        let mut retained = VecDeque::new();
1329        while let Some(injection) = state.queue.pop_front() {
1330            let should_take = match injection.mode() {
1331                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1332                QueuedUserMessageMode::FinishStep => include_finish_step,
1333                QueuedUserMessageMode::AuditOnly => include_audit_only,
1334            };
1335            if should_take {
1336                match &injection {
1337                    QueuedTranscriptInjection::User(message) => {
1338                        state
1339                            .delivered_user_message_ids
1340                            .insert(message.message_id.clone());
1341                    }
1342                    QueuedTranscriptInjection::Reminder(reminder) => {
1343                        state
1344                            .delivered_reminder_ids
1345                            .insert(reminder.reminder.id.clone());
1346                    }
1347                }
1348                selected.push(injection);
1349            } else {
1350                retained.push_back(injection);
1351            }
1352        }
1353        state.queue = retained;
1354        selected
1355    }
1356
1357    pub async fn take_queued_user_messages_for(
1358        &self,
1359        checkpoint: DeliveryCheckpoint,
1360    ) -> Vec<QueuedUserMessage> {
1361        match checkpoint {
1362            DeliveryCheckpoint::InterruptImmediate => {
1363                self.take_queued_user_messages(true, false, false).await
1364            }
1365            DeliveryCheckpoint::AfterCurrentOperation => {
1366                self.take_queued_user_messages(false, true, false).await
1367            }
1368            DeliveryCheckpoint::EndOfInteraction => {
1369                self.take_queued_user_messages(false, false, true).await
1370            }
1371        }
1372    }
1373
1374    pub async fn take_queued_transcript_injections_for(
1375        &self,
1376        checkpoint: DeliveryCheckpoint,
1377    ) -> Vec<QueuedTranscriptInjection> {
1378        match checkpoint {
1379            DeliveryCheckpoint::InterruptImmediate => {
1380                self.take_queued_transcript_injections(true, false, false)
1381                    .await
1382            }
1383            DeliveryCheckpoint::AfterCurrentOperation => {
1384                self.take_queued_transcript_injections(false, true, false)
1385                    .await
1386            }
1387            DeliveryCheckpoint::EndOfInteraction => {
1388                self.take_queued_transcript_injections(false, false, true)
1389                    .await
1390            }
1391        }
1392    }
1393
1394    /// Send an output notification (for log/print in bridge mode).
1395    pub fn send_output(&self, text: &str) {
1396        self.notify("output", serde_json::json!({"text": text}));
1397    }
1398
1399    /// Send a progress notification with optional numeric progress and structured data.
1400    pub fn send_progress(
1401        &self,
1402        phase: &str,
1403        message: &str,
1404        progress: Option<i64>,
1405        total: Option<i64>,
1406        data: Option<serde_json::Value>,
1407    ) {
1408        let mut payload = serde_json::json!({"phase": phase, "message": message});
1409        if let Some(p) = progress {
1410            payload["progress"] = serde_json::json!(p);
1411        }
1412        if let Some(t) = total {
1413            payload["total"] = serde_json::json!(t);
1414        }
1415        if let Some(d) = data {
1416            payload["data"] = d;
1417        }
1418        self.notify("progress", payload);
1419    }
1420
1421    /// Send a structured log notification.
1422    pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1423        let mut payload = serde_json::json!({"level": level, "message": message});
1424        if let Some(f) = fields {
1425            payload["fields"] = f;
1426        }
1427        self.notify("log", payload);
1428    }
1429
1430    /// Send a `session/update` with `call_start` — signals the beginning of
1431    /// an LLM call, tool call, or builtin call for observability.
1432    pub fn send_call_start(
1433        &self,
1434        call_id: &str,
1435        call_type: &str,
1436        name: &str,
1437        metadata: serde_json::Value,
1438    ) {
1439        let session_id = self.get_session_id();
1440        let script = self.get_script_name();
1441        let stream_publicly = metadata
1442            .get("stream_publicly")
1443            .and_then(|value| value.as_bool())
1444            .unwrap_or(true);
1445        self.visible_call_streams
1446            .lock()
1447            .unwrap_or_else(|e| e.into_inner())
1448            .insert(call_id.to_string(), stream_publicly);
1449        self.notify(
1450            "session/update",
1451            serde_json::json!({
1452                "sessionId": session_id,
1453                "update": {
1454                    "sessionUpdate": "call_start",
1455                    "content": {
1456                        "toolCallId": call_id,
1457                        "call_type": call_type,
1458                        "name": name,
1459                        "script": script,
1460                        "metadata": metadata,
1461                    },
1462                },
1463            }),
1464        );
1465    }
1466
1467    /// Send a `session/update` with `call_progress` — a streaming token delta
1468    /// from an in-flight LLM call.
1469    pub fn send_call_progress(
1470        &self,
1471        call_id: &str,
1472        delta: &str,
1473        accumulated_tokens: u64,
1474        user_visible: bool,
1475    ) {
1476        let session_id = self.get_session_id();
1477        let (visible_text, visible_delta) = {
1478            let stream_publicly = self
1479                .visible_call_streams
1480                .lock()
1481                .unwrap_or_else(|e| e.into_inner())
1482                .get(call_id)
1483                .copied()
1484                .unwrap_or(true);
1485            let mut states = self
1486                .visible_call_states
1487                .lock()
1488                .unwrap_or_else(|e| e.into_inner());
1489            let state = states.entry(call_id.to_string()).or_default();
1490            state.push(delta, stream_publicly)
1491        };
1492        self.notify(
1493            "session/update",
1494            serde_json::json!({
1495                "sessionId": session_id,
1496                "update": {
1497                    "sessionUpdate": "call_progress",
1498                    "content": {
1499                        "toolCallId": call_id,
1500                        "delta": delta,
1501                        "accumulated_tokens": accumulated_tokens,
1502                        "visible_text": visible_text,
1503                        "visible_delta": visible_delta,
1504                        "user_visible": user_visible,
1505                    },
1506                },
1507            }),
1508        );
1509    }
1510
1511    /// Send a `session/update` with `call_end` — signals completion of a call.
1512    pub fn send_call_end(
1513        &self,
1514        call_id: &str,
1515        call_type: &str,
1516        name: &str,
1517        duration_ms: u64,
1518        status: &str,
1519        metadata: serde_json::Value,
1520    ) {
1521        let session_id = self.get_session_id();
1522        let script = self.get_script_name();
1523        self.visible_call_states
1524            .lock()
1525            .unwrap_or_else(|e| e.into_inner())
1526            .remove(call_id);
1527        self.visible_call_streams
1528            .lock()
1529            .unwrap_or_else(|e| e.into_inner())
1530            .remove(call_id);
1531        self.notify(
1532            "session/update",
1533            serde_json::json!({
1534                "sessionId": session_id,
1535                "update": {
1536                    "sessionUpdate": "call_end",
1537                    "content": {
1538                        "toolCallId": call_id,
1539                        "call_type": call_type,
1540                        "name": name,
1541                        "script": script,
1542                        "duration_ms": duration_ms,
1543                        "status": status,
1544                        "metadata": metadata,
1545                    },
1546                },
1547            }),
1548        );
1549    }
1550
1551    /// Send a worker lifecycle update for delegated/background execution.
1552    pub fn send_worker_update(
1553        &self,
1554        worker_id: &str,
1555        worker_name: &str,
1556        status: &str,
1557        metadata: serde_json::Value,
1558        audit: Option<&MutationSessionRecord>,
1559    ) {
1560        let session_id = self.get_session_id();
1561        let script = self.get_script_name();
1562        let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1563        let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1564        let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1565        let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1566        let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1567        let lifecycle = serde_json::json!({
1568            "event": status,
1569            "worker_id": worker_id,
1570            "worker_name": worker_name,
1571            "started_at": started_at,
1572            "finished_at": finished_at,
1573        });
1574        self.notify(
1575            "session/update",
1576            serde_json::json!({
1577                "sessionId": session_id,
1578                "update": {
1579                    "sessionUpdate": "worker_update",
1580                    "content": {
1581                        "worker_id": worker_id,
1582                        "worker_name": worker_name,
1583                        "status": status,
1584                        "script": script,
1585                        "started_at": started_at,
1586                        "finished_at": finished_at,
1587                        "snapshot_path": snapshot_path,
1588                        "run_id": run_id,
1589                        "run_path": run_path,
1590                        "lifecycle": lifecycle,
1591                        "audit": audit,
1592                        "metadata": metadata,
1593                    },
1594                },
1595            }),
1596        );
1597    }
1598}
1599
1600/// Convert a serde_json::Value to a VmValue.
1601pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1602    crate::stdlib::json_to_vm_value(val)
1603}
1604
1605fn parse_host_tools_list_response(
1606    result: serde_json::Value,
1607) -> Result<Vec<serde_json::Value>, VmError> {
1608    let tools = match result {
1609        serde_json::Value::Array(items) => items,
1610        serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1611            map.get("result")
1612                .and_then(|value| value.get("tools"))
1613                .cloned()
1614        }) {
1615            Some(serde_json::Value::Array(items)) => items,
1616            _ => {
1617                return Err(VmError::Runtime(
1618                    "host/tools/list: host response must be an array or { tools: [...] }".into(),
1619                ));
1620            }
1621        },
1622        _ => {
1623            return Err(VmError::Runtime(
1624                "host/tools/list: unexpected response shape".into(),
1625            ));
1626        }
1627    };
1628
1629    let mut normalized = Vec::with_capacity(tools.len());
1630    for tool in tools {
1631        let serde_json::Value::Object(map) = tool else {
1632            return Err(VmError::Runtime(
1633                "host/tools/list: every tool must be an object".into(),
1634            ));
1635        };
1636        let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1637            return Err(VmError::Runtime(
1638                "host/tools/list: every tool must include a string `name`".into(),
1639            ));
1640        };
1641        let description = map
1642            .get("description")
1643            .and_then(|value| value.as_str())
1644            .or_else(|| {
1645                map.get("short_description")
1646                    .and_then(|value| value.as_str())
1647            })
1648            .unwrap_or_default();
1649        let schema = map
1650            .get("schema")
1651            .cloned()
1652            .or_else(|| map.get("parameters").cloned())
1653            .or_else(|| map.get("input_schema").cloned())
1654            .unwrap_or(serde_json::Value::Null);
1655        let deprecated = map
1656            .get("deprecated")
1657            .and_then(|value| value.as_bool())
1658            .unwrap_or(false);
1659        normalized.push(serde_json::json!({
1660            "name": name,
1661            "description": description,
1662            "schema": schema,
1663            "deprecated": deprecated,
1664        }));
1665    }
1666    Ok(normalized)
1667}
1668
1669#[cfg(test)]
1670mod tests {
1671    use super::*;
1672
1673    fn test_bridge() -> HostBridge {
1674        HostBridge::from_parts(
1675            Arc::new(Mutex::new(HashMap::new())),
1676            Arc::new(AtomicBool::new(false)),
1677            Arc::new(std::sync::Mutex::new(())),
1678            1,
1679        )
1680    }
1681
1682    fn test_bridge_sharing_injection_state(owner: &HostBridge) -> HostBridge {
1683        HostBridge::from_parts_with_writer_cancel_notify_and_injection_state(
1684            Arc::new(Mutex::new(HashMap::new())),
1685            Arc::new(AtomicBool::new(false)),
1686            Arc::new(Notify::new()),
1687            Arc::new(|_| Ok(())),
1688            100,
1689            Some(owner.injection_state()),
1690        )
1691    }
1692
1693    #[test]
1694    fn test_json_rpc_request_format() {
1695        let request = crate::jsonrpc::request(
1696            1,
1697            "llm_call",
1698            serde_json::json!({
1699                "prompt": "Hello",
1700                "system": "Be helpful",
1701            }),
1702        );
1703        let s = serde_json::to_string(&request).unwrap();
1704        assert!(s.contains("\"jsonrpc\":\"2.0\""));
1705        assert!(s.contains("\"id\":1"));
1706        assert!(s.contains("\"method\":\"llm_call\""));
1707    }
1708
1709    #[test]
1710    fn test_json_rpc_notification_format() {
1711        let notification =
1712            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
1713        let s = serde_json::to_string(&notification).unwrap();
1714        assert!(s.contains("\"method\":\"output\""));
1715        assert!(!s.contains("\"id\""));
1716    }
1717
1718    #[test]
1719    fn test_json_rpc_error_response_parsing() {
1720        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
1721        assert!(response.get("error").is_some());
1722        assert_eq!(
1723            response["error"]["message"].as_str().unwrap(),
1724            "Invalid request"
1725        );
1726    }
1727
1728    #[test]
1729    fn test_json_rpc_success_response_parsing() {
1730        let response = crate::jsonrpc::response(
1731            1,
1732            serde_json::json!({
1733                "text": "Hello world",
1734                "input_tokens": 10,
1735                "output_tokens": 5,
1736            }),
1737        );
1738        assert!(response.get("result").is_some());
1739        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
1740    }
1741
1742    #[test]
1743    fn test_cancelled_flag() {
1744        let cancelled = Arc::new(AtomicBool::new(false));
1745        assert!(!cancelled.load(Ordering::SeqCst));
1746        cancelled.store(true, Ordering::SeqCst);
1747        assert!(cancelled.load(Ordering::SeqCst));
1748    }
1749
1750    #[test]
1751    fn pending_host_calls_return_when_cancellation_arrives() {
1752        let runtime = tokio::runtime::Builder::new_current_thread()
1753            .enable_all()
1754            .build()
1755            .unwrap();
1756        runtime.block_on(async {
1757            let pending = Arc::new(Mutex::new(HashMap::new()));
1758            let cancelled = Arc::new(AtomicBool::new(false));
1759            let bridge = HostBridge::from_parts_with_writer(
1760                pending.clone(),
1761                cancelled.clone(),
1762                Arc::new(|_| Ok(())),
1763                1,
1764            );
1765
1766            let call = bridge.call("host/work", serde_json::json!({}));
1767            tokio::pin!(call);
1768
1769            loop {
1770                tokio::select! {
1771                    result = &mut call => panic!("call completed before cancellation: {result:?}"),
1772                    _ = tokio::task::yield_now() => {}
1773                }
1774                if !pending.lock().await.is_empty() {
1775                    break;
1776                }
1777            }
1778
1779            cancelled.store(true, Ordering::SeqCst);
1780            bridge.cancel_notify.notify_waiters();
1781
1782            let result = tokio::time::timeout(Duration::from_secs(1), call)
1783                .await
1784                .expect("pending call should observe cancellation promptly");
1785            assert!(
1786                matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
1787            );
1788            assert!(pending.lock().await.is_empty());
1789        });
1790    }
1791
1792    #[test]
1793    fn queued_messages_are_filtered_by_delivery_mode() {
1794        let runtime = tokio::runtime::Builder::new_current_thread()
1795            .enable_all()
1796            .build()
1797            .unwrap();
1798        runtime.block_on(async {
1799            let bridge = test_bridge();
1800            bridge
1801                .push_queued_user_message("first".to_string(), "finish_step")
1802                .await;
1803            bridge
1804                .push_queued_user_message("second".to_string(), "audit_only")
1805                .await;
1806
1807            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1808            assert_eq!(finish_step.len(), 1);
1809            assert_eq!(finish_step[0].content, "first");
1810
1811            let audit_only = bridge.take_queued_user_messages(false, false, true).await;
1812            assert_eq!(audit_only.len(), 1);
1813            assert_eq!(audit_only[0].content, "second");
1814        });
1815    }
1816
1817    #[test]
1818    fn pending_user_messages_support_revoke_replace_and_delivery_states() {
1819        let runtime = tokio::runtime::Builder::new_current_thread()
1820            .enable_all()
1821            .build()
1822            .unwrap();
1823        runtime.block_on(async {
1824            let bridge = test_bridge();
1825            let first_id = bridge
1826                .push_pending_user_message(
1827                    "first".to_string(),
1828                    serde_json::json!("first"),
1829                    "audit_only",
1830                )
1831                .await;
1832            let second_id = bridge
1833                .push_pending_user_message(
1834                    "second".to_string(),
1835                    serde_json::json!("second"),
1836                    "audit_only",
1837                )
1838                .await;
1839
1840            assert_eq!(
1841                bridge
1842                    .replace_pending_user_message(
1843                        &second_id,
1844                        "second edited".to_string(),
1845                        serde_json::json!("second edited"),
1846                    )
1847                    .await,
1848                PendingUserMessageMutationResult::Mutated
1849            );
1850            assert_eq!(
1851                bridge.revoke_pending_user_message(&first_id).await,
1852                PendingUserMessageMutationResult::Mutated
1853            );
1854            assert_eq!(
1855                bridge.revoke_pending_user_message(&first_id).await,
1856                PendingUserMessageMutationResult::AlreadyRevoked
1857            );
1858
1859            let delivered = bridge
1860                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1861                .await;
1862            assert_eq!(delivered.len(), 1);
1863            assert_eq!(delivered[0].message_id, second_id);
1864            assert_eq!(delivered[0].content, "second edited");
1865
1866            assert_eq!(
1867                bridge.revoke_pending_user_message(&second_id).await,
1868                PendingUserMessageMutationResult::AlreadyDelivered
1869            );
1870            assert_eq!(
1871                bridge
1872                    .replace_pending_user_message(
1873                        &second_id,
1874                        "too late".to_string(),
1875                        serde_json::json!("too late"),
1876                    )
1877                    .await,
1878                PendingUserMessageMutationResult::AlreadyDelivered
1879            );
1880            assert_eq!(
1881                bridge.revoke_pending_user_message("missing").await,
1882                PendingUserMessageMutationResult::UnknownMessageId
1883            );
1884        });
1885    }
1886
1887    #[test]
1888    fn pending_user_message_replace_preserves_fifo_position_and_mode() {
1889        let runtime = tokio::runtime::Builder::new_current_thread()
1890            .enable_all()
1891            .build()
1892            .unwrap();
1893        runtime.block_on(async {
1894            let bridge = test_bridge();
1895            let first_id = bridge
1896                .push_pending_user_message(
1897                    "first".to_string(),
1898                    serde_json::json!("first"),
1899                    "finish_step",
1900                )
1901                .await;
1902            let second_id = bridge
1903                .push_pending_user_message(
1904                    "second".to_string(),
1905                    serde_json::json!("second"),
1906                    "finish_step",
1907                )
1908                .await;
1909            assert_eq!(
1910                bridge
1911                    .replace_pending_user_message(
1912                        &first_id,
1913                        "first edited".to_string(),
1914                        serde_json::json!("first edited"),
1915                    )
1916                    .await,
1917                PendingUserMessageMutationResult::Mutated
1918            );
1919
1920            let delivered = bridge
1921                .take_queued_user_messages_for(DeliveryCheckpoint::AfterCurrentOperation)
1922                .await;
1923            assert_eq!(
1924                delivered
1925                    .iter()
1926                    .map(|message| (&message.message_id, message.content.as_str(), message.mode))
1927                    .collect::<Vec<_>>(),
1928                vec![
1929                    (&first_id, "first edited", QueuedUserMessageMode::FinishStep,),
1930                    (&second_id, "second", QueuedUserMessageMode::FinishStep),
1931                ]
1932            );
1933        });
1934    }
1935
1936    #[test]
1937    fn pending_user_message_state_survives_bridge_replacement() {
1938        let runtime = tokio::runtime::Builder::new_current_thread()
1939            .enable_all()
1940            .build()
1941            .unwrap();
1942        runtime.block_on(async {
1943            let bridge = test_bridge();
1944            let revoked_id = bridge
1945                .push_pending_user_message(
1946                    "revoke me".to_string(),
1947                    serde_json::json!("revoke me"),
1948                    "audit_only",
1949                )
1950                .await;
1951            let delivered_id = bridge
1952                .push_pending_user_message(
1953                    "deliver me".to_string(),
1954                    serde_json::json!("deliver me"),
1955                    "audit_only",
1956                )
1957                .await;
1958            assert_eq!(
1959                bridge.revoke_pending_user_message(&revoked_id).await,
1960                PendingUserMessageMutationResult::Mutated
1961            );
1962            bridge.cancelled.store(true, Ordering::SeqCst);
1963
1964            let replacement_bridge = test_bridge_sharing_injection_state(&bridge);
1965            assert_eq!(
1966                replacement_bridge
1967                    .revoke_pending_user_message(&revoked_id)
1968                    .await,
1969                PendingUserMessageMutationResult::AlreadyRevoked
1970            );
1971            let delivered = replacement_bridge
1972                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1973                .await;
1974            assert_eq!(delivered.len(), 1);
1975            assert_eq!(delivered[0].message_id, delivered_id);
1976            assert_eq!(delivered[0].content, "deliver me");
1977            assert_eq!(
1978                bridge.revoke_pending_user_message(&delivered_id).await,
1979                PendingUserMessageMutationResult::AlreadyDelivered
1980            );
1981        });
1982    }
1983
1984    #[test]
1985    fn queued_transcript_injections_preserve_user_reminder_separation() {
1986        let runtime = tokio::runtime::Builder::new_current_thread()
1987            .enable_all()
1988            .build()
1989            .unwrap();
1990        runtime.block_on(async {
1991            let bridge = test_bridge();
1992            bridge
1993                .push_queued_user_message("human follow-up".to_string(), "finish_step")
1994                .await;
1995            let reminder_id = bridge
1996                .push_queued_session_remind_from_params(&serde_json::json!({
1997                    "body": "Host-provided ambient context.",
1998                    "tags": ["host"],
1999                    "dedupe_key": "host-context",
2000                    "ttl_turns": 2,
2001                    "mode": "audit_only",
2002                    "_meta": {"harn": {"source": "test"}},
2003                }))
2004                .await
2005                .expect("valid reminder");
2006
2007            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
2008            assert_eq!(finish_step.len(), 1);
2009            assert_eq!(finish_step[0].content, "human follow-up");
2010
2011            let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
2012            assert!(no_user_messages.is_empty());
2013
2014            let injections = bridge
2015                .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
2016                .await;
2017            assert_eq!(injections.len(), 1);
2018            let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
2019                panic!("expected queued reminder");
2020            };
2021            assert_eq!(reminder.reminder.id, reminder_id);
2022            assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
2023            assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
2024            assert_eq!(
2025                reminder.reminder.dedupe_key.as_deref(),
2026                Some("host-context")
2027            );
2028            assert_eq!(reminder.reminder.ttl_turns, Some(2));
2029            assert_eq!(
2030                reminder.reminder.source,
2031                crate::llm::helpers::ReminderSource::Bridge
2032            );
2033        });
2034    }
2035
2036    #[test]
2037    fn pending_injections_list_user_messages_and_reminders_in_fifo_order() {
2038        let runtime = tokio::runtime::Builder::new_current_thread()
2039            .enable_all()
2040            .build()
2041            .unwrap();
2042        runtime.block_on(async {
2043            let bridge = test_bridge();
2044            let message_id = bridge
2045                .push_pending_user_message(
2046                    "human follow-up".to_string(),
2047                    serde_json::json!([{"type": "text", "text": "human follow-up"}]),
2048                    "finish_step",
2049                )
2050                .await;
2051            let reminder_id = bridge
2052                .push_queued_session_remind_from_params(&serde_json::json!({
2053                    "id": "rem-test",
2054                    "body": "Host reminder",
2055                    "tags": ["host"],
2056                    "dedupe_key": "host-reminder",
2057                    "ttl_turns": 2,
2058                    "mode": "interrupt_immediate",
2059                }))
2060                .await
2061                .expect("valid session/remind payload");
2062
2063            let pending = bridge.pending_injections_json().await;
2064            assert_eq!(pending["pendingCount"], 2);
2065            assert_eq!(pending["injections"][0]["kind"], "user");
2066            assert_eq!(pending["injections"][0]["id"], message_id);
2067            assert_eq!(pending["injections"][0]["messageId"], message_id);
2068            assert_eq!(pending["injections"][0]["mode"], "finish_step");
2069            assert_eq!(pending["injections"][0]["position"], 0);
2070            assert_eq!(pending["injections"][1]["kind"], "reminder");
2071            assert_eq!(pending["injections"][1]["id"], reminder_id);
2072            assert_eq!(pending["injections"][1]["reminderId"], "rem-test");
2073            assert_eq!(pending["injections"][1]["mode"], "interrupt_immediate");
2074            assert_eq!(pending["injections"][1]["body"], "Host reminder");
2075            assert_eq!(pending["injections"][1]["dedupeKey"], "host-reminder");
2076            assert_eq!(pending["injections"][1]["ttlTurns"], 2);
2077            assert_eq!(pending["injections"][1]["position"], 1);
2078        });
2079    }
2080
2081    #[test]
2082    fn pending_reminders_support_revoke_and_delivery_states() {
2083        let runtime = tokio::runtime::Builder::new_current_thread()
2084            .enable_all()
2085            .build()
2086            .unwrap();
2087        runtime.block_on(async {
2088            let bridge = test_bridge();
2089            let revoked_id = bridge
2090                .push_queued_session_remind_from_params(&serde_json::json!({
2091                    "id": "rem-revoke",
2092                    "body": "remove me",
2093                    "mode": "finish_step",
2094                }))
2095                .await
2096                .expect("valid session/remind payload");
2097            let delivered_id = bridge
2098                .push_queued_session_remind_from_params(&serde_json::json!({
2099                    "id": "rem-deliver",
2100                    "body": "deliver me",
2101                    "mode": "finish_step",
2102                }))
2103                .await
2104                .expect("valid session/remind payload");
2105
2106            assert_eq!(
2107                bridge.revoke_pending_reminder(&revoked_id).await,
2108                PendingReminderMutationResult::Mutated
2109            );
2110            assert_eq!(
2111                bridge.revoke_pending_reminder(&revoked_id).await,
2112                PendingReminderMutationResult::AlreadyRevoked
2113            );
2114
2115            let pending = bridge.pending_injections_json().await;
2116            assert_eq!(pending["pendingCount"], 1);
2117            assert_eq!(pending["injections"][0]["reminderId"], delivered_id);
2118
2119            let delivered = bridge
2120                .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
2121                .await;
2122            assert_eq!(delivered.len(), 1);
2123            let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2124                panic!("expected delivered reminder");
2125            };
2126            assert_eq!(reminder.reminder.id, delivered_id);
2127
2128            assert_eq!(
2129                bridge.revoke_pending_reminder(&delivered_id).await,
2130                PendingReminderMutationResult::AlreadyDelivered
2131            );
2132            assert_eq!(
2133                bridge.revoke_pending_reminder("missing").await,
2134                PendingReminderMutationResult::UnknownReminderId
2135            );
2136        });
2137    }
2138
2139    #[test]
2140    fn bridge_remind_modes_honor_delivery_checkpoints() {
2141        let runtime = tokio::runtime::Builder::new_current_thread()
2142            .enable_all()
2143            .build()
2144            .unwrap();
2145        runtime.block_on(async {
2146            let cases = [
2147                (
2148                    "interrupt_immediate",
2149                    DeliveryCheckpoint::InterruptImmediate,
2150                    DeliveryCheckpoint::AfterCurrentOperation,
2151                ),
2152                (
2153                    "finish_step",
2154                    DeliveryCheckpoint::AfterCurrentOperation,
2155                    DeliveryCheckpoint::EndOfInteraction,
2156                ),
2157                (
2158                    "audit_only",
2159                    DeliveryCheckpoint::EndOfInteraction,
2160                    DeliveryCheckpoint::InterruptImmediate,
2161                ),
2162            ];
2163
2164            for (mode, expected_checkpoint, wrong_checkpoint) in cases {
2165                let bridge = test_bridge();
2166                bridge
2167                    .push_queued_session_remind_from_params(&serde_json::json!({
2168                        "body": format!("Reminder for {mode}"),
2169                        "mode": mode,
2170                    }))
2171                    .await
2172                    .expect("valid session/remind payload");
2173
2174                let premature = bridge
2175                    .take_queued_transcript_injections_for(wrong_checkpoint)
2176                    .await;
2177                assert!(
2178                    premature.is_empty(),
2179                    "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
2180                );
2181
2182                let delivered = bridge
2183                    .take_queued_transcript_injections_for(expected_checkpoint)
2184                    .await;
2185                assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
2186                let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2187                    panic!("expected reminder for {mode}");
2188                };
2189                assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
2190            }
2191        });
2192    }
2193
2194    #[test]
2195    fn session_remind_validation_rejects_user_message_shape() {
2196        let err = queued_session_remind_from_params(&serde_json::json!({
2197            "content": "this is still a user message",
2198            "mode": "interrupt_immediate",
2199        }))
2200        .expect_err("session/remind must require a reminder body");
2201        assert!(err.contains(Code::ReminderInvalidShape.as_str()));
2202        assert!(err.contains("body"));
2203    }
2204
2205    #[test]
2206    fn session_remind_validation_rejects_unknown_options_separately() {
2207        let err = queued_session_remind_from_params(&serde_json::json!({
2208            "body": "valid body",
2209            "unknown_host_field": true,
2210        }))
2211        .expect_err("session/remind must reject unknown top-level fields");
2212        assert!(err.contains(Code::ReminderUnknownOption.as_str()));
2213        assert!(err.contains("unknown_host_field"));
2214    }
2215
2216    #[test]
2217    fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
2218        let err = queued_session_remind_from_params(&serde_json::json!({
2219            "body": "valid body",
2220            "propagate": "workspace",
2221        }))
2222        .expect_err("session/remind must reject unknown propagate values");
2223        assert!(err.contains(Code::ReminderUnknownPropagate.as_str()));
2224        assert!(err.contains("propagate"));
2225    }
2226
2227    #[test]
2228    fn test_json_result_to_vm_value_string() {
2229        let val = serde_json::json!("hello");
2230        let vm_val = json_result_to_vm_value(&val);
2231        assert_eq!(vm_val.display(), "hello");
2232    }
2233
2234    #[test]
2235    fn test_json_result_to_vm_value_dict() {
2236        let val = serde_json::json!({"name": "test", "count": 42});
2237        let vm_val = json_result_to_vm_value(&val);
2238        let VmValue::Dict(d) = &vm_val else {
2239            unreachable!("Expected Dict, got {:?}", vm_val);
2240        };
2241        assert_eq!(d.get("name").unwrap().display(), "test");
2242        assert_eq!(d.get("count").unwrap().display(), "42");
2243    }
2244
2245    #[test]
2246    fn test_json_result_to_vm_value_null() {
2247        let val = serde_json::json!(null);
2248        let vm_val = json_result_to_vm_value(&val);
2249        assert!(matches!(vm_val, VmValue::Nil));
2250    }
2251
2252    #[test]
2253    fn test_json_result_to_vm_value_nested() {
2254        let val = serde_json::json!({
2255            "text": "response",
2256            "tool_calls": [
2257                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
2258            ],
2259            "input_tokens": 100,
2260            "output_tokens": 50,
2261        });
2262        let vm_val = json_result_to_vm_value(&val);
2263        let VmValue::Dict(d) = &vm_val else {
2264            unreachable!("Expected Dict, got {:?}", vm_val);
2265        };
2266        assert_eq!(d.get("text").unwrap().display(), "response");
2267        let VmValue::List(list) = d.get("tool_calls").unwrap() else {
2268            unreachable!("Expected List for tool_calls");
2269        };
2270        assert_eq!(list.len(), 1);
2271    }
2272
2273    #[test]
2274    fn parse_host_tools_list_accepts_object_wrapper() {
2275        let tools = parse_host_tools_list_response(serde_json::json!({
2276            "tools": [
2277                {
2278                    "name": "Read",
2279                    "description": "Read a file",
2280                    "schema": {"type": "object"},
2281                }
2282            ]
2283        }))
2284        .expect("tool list");
2285
2286        assert_eq!(tools.len(), 1);
2287        assert_eq!(tools[0]["name"], "Read");
2288        assert_eq!(tools[0]["deprecated"], false);
2289    }
2290
2291    #[test]
2292    fn parse_host_tools_list_accepts_compat_fields() {
2293        let tools = parse_host_tools_list_response(serde_json::json!({
2294            "result": {
2295                "tools": [
2296                    {
2297                        "name": "Edit",
2298                        "short_description": "Apply an edit",
2299                        "input_schema": {"type": "object"},
2300                        "deprecated": true,
2301                    }
2302                ]
2303            }
2304        }))
2305        .expect("tool list");
2306
2307        assert_eq!(tools[0]["description"], "Apply an edit");
2308        assert_eq!(tools[0]["schema"]["type"], "object");
2309        assert_eq!(tools[0]["deprecated"], true);
2310    }
2311
2312    #[test]
2313    fn parse_host_tools_list_requires_tool_names() {
2314        let err = parse_host_tools_list_response(serde_json::json!({
2315            "tools": [
2316                {"description": "missing name"}
2317            ]
2318        }))
2319        .expect_err("expected error");
2320        assert!(err
2321            .to_string()
2322            .contains("host/tools/list: every tool must include a string `name`"));
2323    }
2324
2325    #[test]
2326    fn test_timeout_duration() {
2327        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
2328    }
2329}