Skip to main content

harn_vm/
bridge.rs

1//! JSON-RPC 2.0 bridge for host communication.
2//!
3//! When `harn run --bridge` is used, the VM delegates builtins (llm_call,
4//! file I/O, tool execution) to a host process over stdin/stdout JSON-RPC.
5//! The host application handles these requests using its own providers.
6
7use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::future::Future;
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15
16use tokio::io::AsyncBufReadExt;
17use tokio::sync::{oneshot, Mutex, Notify};
18
19use harn_parser::diagnostic_codes::Code;
20
21use crate::orchestration::MutationSessionRecord;
22use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
23use crate::visible_text::VisibleTextState;
24use crate::vm::Vm;
25
26/// Default timeout for bridge calls (5 minutes).
27const DEFAULT_TIMEOUT: Duration = Duration::from_mins(5);
28
29pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
30
31fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
32    Arc::new(move |line: &str| {
33        let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
34        let mut stdout = std::io::stdout().lock();
35        stdout
36            .write_all(line.as_bytes())
37            .map_err(|e| format!("Bridge write error: {e}"))?;
38        stdout
39            .write_all(b"\n")
40            .map_err(|e| format!("Bridge write error: {e}"))?;
41        stdout
42            .flush()
43            .map_err(|e| format!("Bridge flush error: {e}"))?;
44        Ok(())
45    })
46}
47
48/// A JSON-RPC 2.0 bridge to a host process over stdin/stdout.
49///
50/// The bridge sends requests to the host on stdout and receives responses
51/// on stdin. A background task reads stdin and dispatches responses to
52/// waiting callers by request ID. All stdout writes are serialized through
53/// a mutex to prevent interleaving.
54pub struct HostBridge {
55    next_id: AtomicU64,
56    /// Pending request waiters, keyed by JSON-RPC id.
57    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
58    /// Whether the host has sent a cancel notification.
59    cancelled: Arc<AtomicBool>,
60    /// Wakes pending host calls when cancellation arrives.
61    cancel_notify: Arc<Notify>,
62    /// Transport writer used to send JSON-RPC to the host.
63    writer: HostBridgeWriter,
64    /// ACP session ID (set in ACP mode for session-scoped notifications).
65    session_id: std::sync::Mutex<String>,
66    /// Name of the currently executing Harn script (without .harn suffix).
67    script_name: std::sync::Mutex<String>,
68    /// Transcript injections queued by the host while a run is active.
69    queued_transcript_injections: HostBridgeInjectionState,
70    /// Host-triggered resume signal for daemon agents.
71    resume_requested: Arc<AtomicBool>,
72    /// Host-triggered skill-registry invalidation signal. Set when the
73    /// host sends a `skills/update` notification; consumed by the CLI
74    /// between runs (watch mode, long-running agents) to rebuild the
75    /// layered skill catalog from its current filesystem + host state.
76    skills_reload_requested: Arc<AtomicBool>,
77    /// Whether the current daemon-mode agent loop is blocked in idle wait.
78    daemon_idle: Arc<AtomicBool>,
79    /// Canonical ACP `stopReason` recorded by the most recent `agent_loop`
80    /// finalize during this prompt. Read once by the ACP adapter when the
81    /// pipeline returns and populated by `host_agent_session_finalize`.
82    /// Pipelines that don't run an agent loop leave this `None`, in which
83    /// case the adapter falls back to `end_turn`.
84    prompt_stop_reason: std::sync::Mutex<Option<String>>,
85    /// Per-call visible assistant text state for call_progress notifications.
86    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
87    /// Whether an LLM call's deltas should be exposed to end users while streaming.
88    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
89    /// Optional in-process host-module backend used by `harn playground`.
90    in_process: Option<InProcessHost>,
91}
92
93struct InProcessHost {
94    module_path: PathBuf,
95    exported_functions: BTreeMap<String, Arc<VmClosure>>,
96    vm: Vm,
97}
98
99impl InProcessHost {
100    /// Box-pin'd to break the static recursion between the VM's hot dispatch
101    /// loop and the bridge: a bridge-backed builtin spawns a child VM that
102    /// calls back into the dispatch loop via `call_closure_pub`. Indirecting
103    /// at this slow-path boundary keeps the recursion satisfied without
104    /// allocating per call in the hot per-callback path.
105    fn dispatch<'a>(
106        &'a self,
107        method: &'a str,
108        params: serde_json::Value,
109    ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, VmError>> + Send + 'a>> {
110        Box::pin(async move {
111            match method {
112                "builtin_call" => {
113                    let name = params
114                        .get("name")
115                        .and_then(|value| value.as_str())
116                        .unwrap_or_default();
117                    let args = params
118                        .get("args")
119                        .and_then(|value| value.as_array())
120                        .cloned()
121                        .unwrap_or_default()
122                        .into_iter()
123                        .map(|value| json_result_to_vm_value(&value))
124                        .collect::<Vec<_>>();
125                    self.invoke_export(name, &args).await
126                }
127                "host/tools/list" => self
128                    .invoke_optional_export("host_tools_list", &[])
129                    .await
130                    .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
131                "session/request_permission" => self.request_permission(params).await,
132                other => Err(VmError::Runtime(format!(
133                    "playground host backend does not implement bridge method '{other}'"
134                ))),
135            }
136        })
137    }
138
139    async fn invoke_export(
140        &self,
141        name: &str,
142        args: &[VmValue],
143    ) -> Result<serde_json::Value, VmError> {
144        let Some(closure) = self.exported_functions.get(name) else {
145            return Err(VmError::Runtime(format!(
146                "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
147                self.module_path.display()
148            )));
149        };
150
151        let mut vm = self.vm.child_vm_for_host();
152        let result = vm.call_closure_pub(closure, args).await?;
153        Ok(crate::llm::vm_value_to_json(&result))
154    }
155
156    async fn invoke_optional_export(
157        &self,
158        name: &str,
159        args: &[VmValue],
160    ) -> Result<Option<serde_json::Value>, VmError> {
161        if !self.exported_functions.contains_key(name) {
162            return Ok(None);
163        }
164        self.invoke_export(name, args).await.map(Some)
165    }
166
167    async fn request_permission(
168        &self,
169        params: serde_json::Value,
170    ) -> Result<serde_json::Value, VmError> {
171        // No exported `request_permission` means the playground host has no
172        // approval policy, so it grants through the canonical ACP option.
173        let Some(closure) = self.exported_functions.get("request_permission") else {
174            return Ok(crate::llm::acp_permission::allow_response());
175        };
176
177        let tool_call = params.get("toolCall");
178        let tool_name = tool_call
179            .and_then(|tool_call| tool_call.pointer("/_meta/harn/toolName"))
180            .or_else(|| tool_call.and_then(|tool_call| tool_call.get("toolName")))
181            .or_else(|| tool_call.and_then(|tool_call| tool_call.get("title")))
182            .and_then(|value| value.as_str())
183            .unwrap_or_default();
184        let tool_args = tool_call
185            .and_then(|tool_call| tool_call.get("rawInput"))
186            .map(json_result_to_vm_value)
187            .unwrap_or(VmValue::Nil);
188        let full_payload = json_result_to_vm_value(&params);
189
190        let arg_count = closure.func.params.len();
191        let args = if arg_count >= 3 {
192            vec![
193                VmValue::String(arcstr::ArcStr::from(tool_name.to_string())),
194                tool_args,
195                full_payload,
196            ]
197        } else if arg_count == 2 {
198            vec![
199                VmValue::String(arcstr::ArcStr::from(tool_name.to_string())),
200                tool_args,
201            ]
202        } else if arg_count == 1 {
203            vec![full_payload]
204        } else {
205            Vec::new()
206        };
207
208        let mut vm = self.vm.child_vm_for_host();
209        let result = vm.call_closure_pub(closure, &args).await?;
210        // Translate the script's verdict into a canonical ACP response
211        // (`{ outcome: { outcome: "selected" | "cancelled", optionId? } }`).
212        // The script API stays ergonomic — bool / string-reason / dict — but
213        // the wire shape is canonical.
214        let payload = match result {
215            VmValue::Bool(granted) => {
216                if granted {
217                    crate::llm::acp_permission::allow_response()
218                } else {
219                    crate::llm::acp_permission::reject_response(None)
220                }
221            }
222            VmValue::String(reason) if !reason.is_empty() => {
223                crate::llm::acp_permission::reject_response(Some(reason.to_string()))
224            }
225            other => {
226                let json = crate::llm::vm_value_to_json(&other);
227                if let Some(granted) = json.get("granted").and_then(|value| value.as_bool()) {
228                    if granted {
229                        crate::llm::acp_permission::allow_response()
230                    } else {
231                        crate::llm::acp_permission::reject_response(
232                            json.get("reason")
233                                .and_then(|value| value.as_str())
234                                .map(str::to_string),
235                        )
236                    }
237                } else if json.get("outcome").is_some() {
238                    // The script already returned a canonical-shaped outcome.
239                    json
240                } else if other.is_truthy() {
241                    crate::llm::acp_permission::allow_response()
242                } else {
243                    crate::llm::acp_permission::reject_response(None)
244                }
245            }
246        };
247        Ok(payload)
248    }
249}
250
251/// How a queued bridge injection is delivered into the agent loop.
252///
253/// `AuditOnly` injections drain at `loop_exit`, *after* the last LLM call has
254/// returned, so they land in the transcript audit but are **never rendered into
255/// a model prompt**.
256/// Hosts that want the model to react to the reminder on its final
257/// iteration should use `FinishStep` instead, which drains at every
258/// `iteration_start` / `post_tool_dispatch` / `iteration_end` checkpoint.
259#[derive(Clone, Copy, Debug, PartialEq, Eq)]
260pub enum QueuedUserMessageMode {
261    InterruptImmediate,
262    FinishStep,
263    AuditOnly,
264}
265
266#[derive(Clone, Copy, Debug, PartialEq, Eq)]
267pub enum DeliveryCheckpoint {
268    InterruptImmediate,
269    AfterCurrentOperation,
270    EndOfInteraction,
271}
272
273impl QueuedUserMessageMode {
274    fn from_str(value: &str) -> Self {
275        match value {
276            "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
277            // `steer` is the ACP `session/inject` alias for mid-turn
278            // user-message delivery at the next tool boundary; it maps to
279            // the same `FinishStep` checkpoint as `finish_step`.
280            "finish_step" | "after_current_operation" | "steer" => Self::FinishStep,
281            // `queue` is the explicit ACP alias for the audit-only path.
282            "queue" => Self::AuditOnly,
283            // Unknown / missing modes fall through to the safest option:
284            // record for audit, do not preempt the loop. Pre-#2212 hosts
285            // that send `wait_for_completion` are caught by this arm —
286            // the canonical name is `audit_only` going forward.
287            _ => Self::AuditOnly,
288        }
289    }
290
291    fn as_str(self) -> &'static str {
292        match self {
293            Self::InterruptImmediate => "interrupt_immediate",
294            Self::FinishStep => "finish_step",
295            Self::AuditOnly => "audit_only",
296        }
297    }
298}
299
300#[derive(Clone, Debug, PartialEq, Eq)]
301pub struct QueuedUserMessage {
302    pub message_id: String,
303    pub content: String,
304    pub transcript_content: serde_json::Value,
305    pub mode: QueuedUserMessageMode,
306}
307
308#[derive(Clone, Debug, PartialEq, Eq)]
309pub struct QueuedReminder {
310    pub reminder: crate::llm::helpers::SystemReminder,
311    pub mode: QueuedUserMessageMode,
312}
313
314#[derive(Clone, Debug, PartialEq, Eq)]
315pub enum QueuedTranscriptInjection {
316    User(QueuedUserMessage),
317    Reminder(QueuedReminder),
318}
319
320#[derive(Debug, Default)]
321struct QueuedTranscriptInjections {
322    queue: VecDeque<QueuedTranscriptInjection>,
323    revoked_user_message_ids: HashSet<String>,
324    delivered_user_message_ids: HashSet<String>,
325    revoked_reminder_ids: HashSet<String>,
326    delivered_reminder_ids: HashSet<String>,
327}
328
329#[derive(Clone, Debug, Default)]
330pub struct HostBridgeInjectionState {
331    inner: Arc<Mutex<QueuedTranscriptInjections>>,
332}
333
334#[derive(Clone, Copy, Debug, PartialEq, Eq)]
335pub enum PendingUserMessageMutationResult {
336    Mutated,
337    AlreadyRevoked,
338    AlreadyDelivered,
339    UnknownMessageId,
340}
341
342impl QueuedTranscriptInjection {
343    fn mode(&self) -> QueuedUserMessageMode {
344        match self {
345            Self::User(message) => message.mode,
346            Self::Reminder(reminder) => reminder.mode,
347        }
348    }
349
350    fn pending_json(&self, position: usize) -> serde_json::Value {
351        match self {
352            Self::User(message) => serde_json::json!({
353                "kind": "user",
354                "id": message.message_id,
355                "messageId": message.message_id,
356                "mode": message.mode.as_str(),
357                "position": position,
358                "content": message.transcript_content,
359            }),
360            Self::Reminder(reminder) => serde_json::json!({
361                "kind": "reminder",
362                "id": reminder.reminder.id,
363                "reminderId": reminder.reminder.id,
364                "mode": reminder.mode.as_str(),
365                "position": position,
366                "body": reminder.reminder.body,
367                "tags": reminder.reminder.tags,
368                "dedupeKey": reminder.reminder.dedupe_key,
369                "ttlTurns": reminder.reminder.ttl_turns,
370                "preserveOnCompact": reminder.reminder.preserve_on_compact,
371                "propagate": reminder.reminder.propagate.as_str(),
372                "roleHint": reminder.reminder.role_hint.as_str(),
373                "source": reminder.reminder.source.as_str(),
374                "firedAtTurn": reminder.reminder.fired_at_turn,
375                "originatingAgentId": reminder.reminder.originating_agent_id,
376            }),
377        }
378    }
379}
380
381#[derive(Clone, Copy, Debug, PartialEq, Eq)]
382pub enum PendingReminderMutationResult {
383    Mutated,
384    AlreadyRevoked,
385    AlreadyDelivered,
386    UnknownReminderId,
387}
388
389fn new_inject_message_id() -> String {
390    format!("msg_inj_{}", uuid::Uuid::now_v7().simple())
391}
392
393impl HostBridgeInjectionState {
394    pub fn new() -> Self {
395        Self::default()
396    }
397
398    pub async fn push_pending_user_message(
399        &self,
400        content: String,
401        transcript_content: serde_json::Value,
402        mode: &str,
403    ) -> String {
404        let message_id = new_inject_message_id();
405        self.inner
406            .lock()
407            .await
408            .queue
409            .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
410                message_id: message_id.clone(),
411                content,
412                transcript_content,
413                mode: QueuedUserMessageMode::from_str(mode),
414            }));
415        message_id
416    }
417
418    pub async fn revoke_pending_user_message(
419        &self,
420        message_id: &str,
421    ) -> PendingUserMessageMutationResult {
422        let mut state = self.inner.lock().await;
423        let mut retained = VecDeque::new();
424        let mut revoked = false;
425        while let Some(injection) = state.queue.pop_front() {
426            match &injection {
427                QueuedTranscriptInjection::User(message) if message.message_id == message_id => {
428                    revoked = true;
429                }
430                _ => retained.push_back(injection),
431            }
432        }
433        state.queue = retained;
434        if revoked {
435            state
436                .revoked_user_message_ids
437                .insert(message_id.to_string());
438            return PendingUserMessageMutationResult::Mutated;
439        }
440        if state.revoked_user_message_ids.contains(message_id) {
441            PendingUserMessageMutationResult::AlreadyRevoked
442        } else if state.delivered_user_message_ids.contains(message_id) {
443            PendingUserMessageMutationResult::AlreadyDelivered
444        } else {
445            PendingUserMessageMutationResult::UnknownMessageId
446        }
447    }
448
449    pub async fn revoke_pending_reminder(
450        &self,
451        reminder_id: &str,
452    ) -> PendingReminderMutationResult {
453        let mut state = self.inner.lock().await;
454        let mut retained = VecDeque::new();
455        let mut revoked = false;
456        while let Some(injection) = state.queue.pop_front() {
457            match &injection {
458                QueuedTranscriptInjection::Reminder(reminder)
459                    if reminder.reminder.id == reminder_id =>
460                {
461                    revoked = true;
462                }
463                _ => retained.push_back(injection),
464            }
465        }
466        state.queue = retained;
467        if revoked {
468            state.revoked_reminder_ids.insert(reminder_id.to_string());
469            return PendingReminderMutationResult::Mutated;
470        }
471        if state.revoked_reminder_ids.contains(reminder_id) {
472            PendingReminderMutationResult::AlreadyRevoked
473        } else if state.delivered_reminder_ids.contains(reminder_id) {
474            PendingReminderMutationResult::AlreadyDelivered
475        } else {
476            PendingReminderMutationResult::UnknownReminderId
477        }
478    }
479
480    pub async fn replace_pending_user_message(
481        &self,
482        message_id: &str,
483        content: String,
484        transcript_content: serde_json::Value,
485    ) -> PendingUserMessageMutationResult {
486        let mut state = self.inner.lock().await;
487        for injection in &mut state.queue {
488            if let QueuedTranscriptInjection::User(message) = injection {
489                if message.message_id == message_id {
490                    message.content = content;
491                    message.transcript_content = transcript_content;
492                    return PendingUserMessageMutationResult::Mutated;
493                }
494            }
495        }
496        if state.revoked_user_message_ids.contains(message_id) {
497            PendingUserMessageMutationResult::AlreadyRevoked
498        } else if state.delivered_user_message_ids.contains(message_id) {
499            PendingUserMessageMutationResult::AlreadyDelivered
500        } else {
501            PendingUserMessageMutationResult::UnknownMessageId
502        }
503    }
504
505    async fn push_session_reminder(&self, reminder: QueuedReminder) {
506        self.inner
507            .lock()
508            .await
509            .queue
510            .push_back(QueuedTranscriptInjection::Reminder(reminder));
511    }
512
513    pub async fn pending_injections_json(&self) -> serde_json::Value {
514        let state = self.inner.lock().await;
515        let injections = state
516            .queue
517            .iter()
518            .enumerate()
519            .map(|(position, injection)| injection.pending_json(position))
520            .collect::<Vec<_>>();
521        serde_json::json!({
522            "pendingCount": injections.len(),
523            "injections": injections,
524        })
525    }
526}
527
528fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
529    format!(
530        "{}: {}",
531        Code::ReminderUnknownOption.as_str(),
532        message.as_ref()
533    )
534}
535
536fn session_remind_shape_error(message: impl AsRef<str>) -> String {
537    format!(
538        "{}: {}",
539        Code::ReminderInvalidShape.as_str(),
540        message.as_ref()
541    )
542}
543
544fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
545    format!(
546        "{}: {}",
547        Code::ReminderUnknownPropagate.as_str(),
548        message.as_ref()
549    )
550}
551
552fn string_field(
553    map: &serde_json::Map<String, serde_json::Value>,
554    key: &str,
555    required: bool,
556) -> Result<Option<String>, String> {
557    match map.get(key) {
558        None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
559            format!("`{key}` must be a non-empty string"),
560        )),
561        None | Some(serde_json::Value::Null) => Ok(None),
562        Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
563            session_remind_shape_error(format!("`{key}` must be a non-empty string")),
564        ),
565        Some(serde_json::Value::String(value)) => {
566            let trimmed = value.trim();
567            if trimmed.is_empty() {
568                Ok(None)
569            } else {
570                Ok(Some(trimmed.to_string()))
571            }
572        }
573        Some(other) => Err(session_remind_shape_error(format!(
574            "`{key}` must be a string, got {other}"
575        ))),
576    }
577}
578
579fn bool_field(
580    map: &serde_json::Map<String, serde_json::Value>,
581    key: &str,
582) -> Result<Option<bool>, String> {
583    match map.get(key) {
584        None | Some(serde_json::Value::Null) => Ok(None),
585        Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
586        Some(other) => Err(session_remind_shape_error(format!(
587            "`{key}` must be a bool, got {other}"
588        ))),
589    }
590}
591
592fn int_field(
593    map: &serde_json::Map<String, serde_json::Value>,
594    key: &str,
595) -> Result<Option<i64>, String> {
596    match map.get(key) {
597        None | Some(serde_json::Value::Null) => Ok(None),
598        Some(serde_json::Value::Number(value)) => {
599            let Some(value) = value.as_i64() else {
600                return Err(session_remind_shape_error(format!(
601                    "`{key}` must be an integer"
602                )));
603            };
604            Ok(Some(value))
605        }
606        Some(other) => Err(session_remind_shape_error(format!(
607            "`{key}` must be an int, got {other}"
608        ))),
609    }
610}
611
612fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
613    let Some(value) = map.get("tags") else {
614        return Ok(Vec::new());
615    };
616    if value.is_null() {
617        return Ok(Vec::new());
618    }
619    let Some(values) = value.as_array() else {
620        return Err(session_remind_shape_error("`tags` must be a list"));
621    };
622    let mut tags = Vec::new();
623    for value in values {
624        let Some(tag) = value.as_str() else {
625            return Err(session_remind_shape_error(format!(
626                "`tags` entries must be strings, got {value}"
627            )));
628        };
629        let tag = tag.trim();
630        if tag.is_empty() {
631            return Err(session_remind_shape_error(
632                "`tags` entries must be non-empty strings",
633            ));
634        }
635        if !tags.iter().any(|existing| existing == tag) {
636            tags.push(tag.to_string());
637        }
638    }
639    Ok(tags)
640}
641
642fn session_remind_payload_from_value(
643    value: &serde_json::Value,
644) -> Result<crate::llm::helpers::SystemReminder, String> {
645    let Some(map) = value.as_object() else {
646        return Err(session_remind_shape_error(
647            "session/remind payload must be a reminder object",
648        ));
649    };
650    const ALLOWED: &[&str] = &[
651        "_meta",
652        "body",
653        "dedupe_key",
654        "fired_at_turn",
655        "id",
656        "preserve_on_compact",
657        "propagate",
658        "role_hint",
659        "source",
660        "tags",
661        "ttl_turns",
662    ];
663    let unknown = map
664        .keys()
665        .filter(|key| !ALLOWED.contains(&key.as_str()))
666        .map(String::as_str)
667        .collect::<Vec<_>>();
668    if !unknown.is_empty() {
669        if unknown.contains(&"content") {
670            return Err(session_remind_shape_error(
671                "session/remind expects reminder `body`, not user-message `content`",
672            ));
673        }
674        return Err(reminder_unknown_option_error(format!(
675            "unknown reminder option(s): {}",
676            unknown.join(", ")
677        )));
678    }
679    if let Some(meta) = map.get("_meta") {
680        if !meta.is_null() && !meta.is_object() {
681            return Err(session_remind_shape_error("`_meta` must be an object"));
682        }
683    }
684    let ttl_turns = int_field(map, "ttl_turns")?;
685    if let Some(value) = ttl_turns {
686        if value <= 0 {
687            return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
688        }
689    }
690    let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
691    if fired_at_turn < 0 {
692        return Err(session_remind_shape_error(
693            "`fired_at_turn` must be >= 0 when provided",
694        ));
695    }
696    match string_field(map, "source", false)?.as_deref() {
697        None | Some("bridge") => {}
698        Some(_) => {
699            return Err(session_remind_shape_error(
700                "`source` for session/remind must be bridge when provided",
701            ))
702        }
703    }
704    let propagate = match string_field(map, "propagate", false)?.as_deref() {
705        None => crate::llm::helpers::ReminderPropagate::Session,
706        Some("all") => crate::llm::helpers::ReminderPropagate::All,
707        Some("session") => crate::llm::helpers::ReminderPropagate::Session,
708        Some("none") => crate::llm::helpers::ReminderPropagate::None,
709        Some(_) => {
710            return Err(reminder_unknown_propagate_error(
711                "`propagate` must be one of all, session, or none",
712            ))
713        }
714    };
715    let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
716        None => crate::llm::helpers::ReminderRoleHint::System,
717        Some("system") => crate::llm::helpers::ReminderRoleHint::System,
718        Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
719        Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
720        Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
721        Some(_) => {
722            return Err(session_remind_shape_error(
723                "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
724            ))
725        }
726    };
727    Ok(crate::llm::helpers::SystemReminder {
728        id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
729        tags: tags_field(map)?,
730        dedupe_key: string_field(map, "dedupe_key", false)?,
731        ttl_turns,
732        preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
733        propagate,
734        role_hint,
735        source: crate::llm::helpers::ReminderSource::Bridge,
736        body: string_field(map, "body", true)?.unwrap_or_default(),
737        fired_at_turn,
738        originating_agent_id: None,
739    })
740}
741
742/// Parse the params of a `session/cancel_tool_call` notification and fire
743/// the per-tool-call cancellation. Mirrors the shape used by the public
744/// `cancel_in_flight_tool_call` builtin so hosts have one wire format
745/// regardless of which surface they came through.
746///
747/// Stdio bridges send this as a notification (no id, no response); the
748/// builtin handles request/response semantics in Harn. We deliberately
749/// drop malformed payloads silently because notifications can't reply
750/// with an error — logging would also be too noisy for partial drops.
751fn handle_cancel_tool_call_notification(params: &serde_json::Value) {
752    let session_id = params
753        .get("sessionId")
754        .or_else(|| params.get("session_id"))
755        .and_then(|value| value.as_str())
756        .unwrap_or_default();
757    let call_id = params
758        .get("toolCallId")
759        .or_else(|| params.get("tool_call_id"))
760        .or_else(|| params.get("callId"))
761        .or_else(|| params.get("call_id"))
762        .and_then(|value| value.as_str())
763        .unwrap_or_default();
764    if call_id.is_empty() {
765        return;
766    }
767    let reason = params
768        .get("reason")
769        .and_then(|value| value.as_str())
770        .unwrap_or("host cancelled in-flight tool call")
771        .to_string();
772    let inject_reminder = params
773        .get("injectReminder")
774        .or_else(|| params.get("inject_reminder"))
775        .and_then(|value| value.as_bool())
776        .unwrap_or(true);
777    crate::tool_call_cancellations::cancel(session_id, call_id, reason, inject_reminder);
778}
779
780fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
781    let mode = QueuedUserMessageMode::from_str(
782        params
783            .get("mode")
784            .and_then(|value| value.as_str())
785            .unwrap_or("audit_only"),
786    );
787    let reminder_value = if let Some(reminder) = params.get("reminder") {
788        reminder.clone()
789    } else {
790        let Some(params) = params.as_object() else {
791            return Err(session_remind_shape_error(
792                "session/remind params must be an object",
793            ));
794        };
795        let mut reminder = params.clone();
796        reminder.remove("mode");
797        reminder.remove("sessionId");
798        reminder.remove("session_id");
799        serde_json::Value::Object(reminder)
800    };
801    Ok(QueuedReminder {
802        reminder: session_remind_payload_from_value(&reminder_value)?,
803        mode,
804    })
805}
806
807// Default doesn't apply — new() spawns async tasks requiring a tokio LocalSet.
808#[allow(clippy::new_without_default)]
809impl HostBridge {
810    /// Create a new bridge and spawn the stdin reader task.
811    ///
812    /// Must be called within a tokio LocalSet (uses spawn_local for the
813    /// stdin reader since it's single-threaded).
814    pub fn new() -> Self {
815        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
816            Arc::new(Mutex::new(HashMap::new()));
817        let cancelled = Arc::new(AtomicBool::new(false));
818        let cancel_notify = Arc::new(Notify::new());
819        let queued_transcript_injections = HostBridgeInjectionState::default();
820        let resume_requested = Arc::new(AtomicBool::new(false));
821        let skills_reload_requested = Arc::new(AtomicBool::new(false));
822        let daemon_idle = Arc::new(AtomicBool::new(false));
823
824        // Stdin reader: reads JSON-RPC lines and dispatches responses
825        let pending_clone = pending.clone();
826        let cancelled_clone = cancelled.clone();
827        let cancel_notify_clone = cancel_notify.clone();
828        let queued_clone = queued_transcript_injections.clone();
829        let resume_clone = resume_requested.clone();
830        let skills_reload_clone = skills_reload_requested.clone();
831        tokio::task::spawn_local(async move {
832            let stdin = tokio::io::stdin();
833            let reader = tokio::io::BufReader::new(stdin);
834            let mut lines = reader.lines();
835
836            while let Ok(Some(line)) = lines.next_line().await {
837                let line = line.trim().to_string();
838                if line.is_empty() {
839                    continue;
840                }
841
842                let msg: serde_json::Value = match serde_json::from_str(&line) {
843                    Ok(v) => v,
844                    Err(_) => continue,
845                };
846
847                // Notifications have no id; responses have one.
848                if msg.get("id").is_none() {
849                    if let Some(method) = msg["method"].as_str() {
850                        if method == "cancel" {
851                            cancelled_clone.store(true, Ordering::SeqCst);
852                            cancel_notify_clone.notify_waiters();
853                        } else if method == "agent/resume" {
854                            resume_clone.store(true, Ordering::SeqCst);
855                        } else if method == "skills/update" {
856                            skills_reload_clone.store(true, Ordering::SeqCst);
857                        } else if method == "session/remind" {
858                            let params = &msg["params"];
859                            if let Ok(reminder) = queued_session_remind_from_params(params) {
860                                queued_clone.push_session_reminder(reminder).await;
861                            }
862                        } else if method == "session/cancel_tool_call" {
863                            handle_cancel_tool_call_notification(&msg["params"]);
864                        }
865                    }
866                    continue;
867                }
868
869                if let Some(id) = msg["id"].as_u64() {
870                    let mut pending = pending_clone.lock().await;
871                    if let Some(sender) = pending.remove(&id) {
872                        let _ = sender.send(msg);
873                    }
874                }
875            }
876
877            // stdin closed: drop pending senders to cancel waiters.
878            let mut pending = pending_clone.lock().await;
879            pending.clear();
880        });
881
882        Self {
883            next_id: AtomicU64::new(1),
884            pending,
885            cancelled,
886            cancel_notify,
887            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
888            session_id: std::sync::Mutex::new(String::new()),
889            script_name: std::sync::Mutex::new(String::new()),
890            queued_transcript_injections,
891            resume_requested,
892            skills_reload_requested,
893            daemon_idle,
894            prompt_stop_reason: std::sync::Mutex::new(None),
895            visible_call_states: std::sync::Mutex::new(HashMap::new()),
896            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
897            in_process: None,
898        }
899    }
900
901    /// Create a bridge from pre-existing shared state.
902    ///
903    /// Unlike `new()`, does **not** spawn a stdin reader — the caller is
904    /// responsible for dispatching responses into `pending`.  This is used
905    /// by ACP mode which already has its own stdin reader.
906    pub fn from_parts(
907        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
908        cancelled: Arc<AtomicBool>,
909        stdout_lock: Arc<std::sync::Mutex<()>>,
910        start_id: u64,
911    ) -> Self {
912        Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
913    }
914
915    pub fn from_parts_with_writer(
916        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
917        cancelled: Arc<AtomicBool>,
918        writer: HostBridgeWriter,
919        start_id: u64,
920    ) -> Self {
921        Self::from_parts_with_writer_and_cancel_notify(
922            pending,
923            cancelled,
924            Arc::new(Notify::new()),
925            writer,
926            start_id,
927        )
928    }
929
930    pub fn from_parts_with_writer_and_cancel_notify(
931        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
932        cancelled: Arc<AtomicBool>,
933        cancel_notify: Arc<Notify>,
934        writer: HostBridgeWriter,
935        start_id: u64,
936    ) -> Self {
937        Self::from_parts_with_writer_cancel_notify_and_injection_state(
938            pending,
939            cancelled,
940            cancel_notify,
941            writer,
942            start_id,
943            None,
944        )
945    }
946
947    pub fn from_parts_with_writer_cancel_notify_and_injection_state(
948        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
949        cancelled: Arc<AtomicBool>,
950        cancel_notify: Arc<Notify>,
951        writer: HostBridgeWriter,
952        start_id: u64,
953        injection_state: Option<HostBridgeInjectionState>,
954    ) -> Self {
955        Self {
956            next_id: AtomicU64::new(start_id),
957            pending,
958            cancelled,
959            cancel_notify,
960            writer,
961            session_id: std::sync::Mutex::new(String::new()),
962            script_name: std::sync::Mutex::new(String::new()),
963            queued_transcript_injections: injection_state.unwrap_or_default(),
964            resume_requested: Arc::new(AtomicBool::new(false)),
965            skills_reload_requested: Arc::new(AtomicBool::new(false)),
966            daemon_idle: Arc::new(AtomicBool::new(false)),
967            prompt_stop_reason: std::sync::Mutex::new(None),
968            visible_call_states: std::sync::Mutex::new(HashMap::new()),
969            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
970            in_process: None,
971        }
972    }
973
974    /// Create an in-process host bridge backed by exported functions from a
975    /// Harn module. Used by `harn playground` to avoid JSON-RPC boilerplate.
976    pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
977        let exported_functions = vm.load_module_exports(module_path).await?;
978        Ok(Self {
979            next_id: AtomicU64::new(1),
980            pending: Arc::new(Mutex::new(HashMap::new())),
981            cancelled: Arc::new(AtomicBool::new(false)),
982            cancel_notify: Arc::new(Notify::new()),
983            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
984            session_id: std::sync::Mutex::new(String::new()),
985            script_name: std::sync::Mutex::new(String::new()),
986            queued_transcript_injections: HostBridgeInjectionState::default(),
987            resume_requested: Arc::new(AtomicBool::new(false)),
988            skills_reload_requested: Arc::new(AtomicBool::new(false)),
989            daemon_idle: Arc::new(AtomicBool::new(false)),
990            prompt_stop_reason: std::sync::Mutex::new(None),
991            visible_call_states: std::sync::Mutex::new(HashMap::new()),
992            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
993            in_process: Some(InProcessHost {
994                module_path: module_path.to_path_buf(),
995                exported_functions,
996                vm,
997            }),
998        })
999    }
1000
1001    /// Set the ACP session ID for session-scoped notifications.
1002    pub fn set_session_id(&self, id: &str) {
1003        *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
1004    }
1005
1006    /// Set the currently executing script name (without .harn suffix).
1007    pub fn set_script_name(&self, name: &str) {
1008        *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
1009    }
1010
1011    /// Get the current script name.
1012    fn get_script_name(&self) -> String {
1013        self.script_name
1014            .lock()
1015            .unwrap_or_else(|e| e.into_inner())
1016            .clone()
1017    }
1018
1019    /// Get the session ID.
1020    pub fn get_session_id(&self) -> String {
1021        self.session_id
1022            .lock()
1023            .unwrap_or_else(|e| e.into_inner())
1024            .clone()
1025    }
1026
1027    /// Write a complete JSON-RPC line to stdout, serialized through a mutex.
1028    fn write_line(&self, line: &str) -> Result<(), VmError> {
1029        (self.writer)(line).map_err(VmError::Runtime)
1030    }
1031
1032    /// Send a JSON-RPC request to the host and wait for the response.
1033    /// Times out after 5 minutes to prevent deadlocks.
1034    pub async fn call(
1035        &self,
1036        method: &str,
1037        params: serde_json::Value,
1038    ) -> Result<serde_json::Value, VmError> {
1039        if let Some(in_process) = &self.in_process {
1040            return in_process.dispatch(method, params).await;
1041        }
1042
1043        if self.is_cancelled() {
1044            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1045        }
1046
1047        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
1048        let cancel_wait = self.cancel_notify.notified();
1049        tokio::pin!(cancel_wait);
1050
1051        let request = crate::jsonrpc::request(id, method, params);
1052
1053        let (tx, rx) = oneshot::channel();
1054        {
1055            let mut pending = self.pending.lock().await;
1056            pending.insert(id, tx);
1057        }
1058
1059        let line = serde_json::to_string(&request)
1060            .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
1061        if let Err(e) = self.write_line(&line) {
1062            let mut pending = self.pending.lock().await;
1063            pending.remove(&id);
1064            return Err(e);
1065        }
1066
1067        if self.is_cancelled() {
1068            let mut pending = self.pending.lock().await;
1069            pending.remove(&id);
1070            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1071        }
1072
1073        let response = tokio::select! {
1074            result = rx => match result {
1075                Ok(msg) => msg,
1076                Err(_) => {
1077                    // Sender dropped: host closed or stdin reader exited.
1078                    return Err(VmError::Runtime(
1079                        "Bridge: host closed connection before responding".into(),
1080                    ));
1081                }
1082            },
1083            _ = &mut cancel_wait => {
1084                let mut pending = self.pending.lock().await;
1085                pending.remove(&id);
1086                return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1087            }
1088            _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
1089                let mut pending = self.pending.lock().await;
1090                pending.remove(&id);
1091                return Err(VmError::Runtime(format!(
1092                    "Bridge: host did not respond to '{method}' within {}s",
1093                    DEFAULT_TIMEOUT.as_secs()
1094                )));
1095            }
1096        };
1097
1098        if let Some(error) = response.get("error") {
1099            let message = error["message"].as_str().unwrap_or("Unknown host error");
1100            let code = error["code"].as_i64().unwrap_or(-1);
1101            // JSON-RPC -32001 signals the host rejected the tool (not permitted / not in allowlist).
1102            if code == -32001 {
1103                return Err(VmError::CategorizedError {
1104                    message: message.to_string(),
1105                    category: ErrorCategory::ToolRejected,
1106                });
1107            }
1108            return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
1109        }
1110
1111        Ok(response["result"].clone())
1112    }
1113
1114    /// Send a JSON-RPC notification to the host (no response expected).
1115    /// Serialized through the stdout mutex to prevent interleaving.
1116    pub fn notify(&self, method: &str, params: serde_json::Value) {
1117        let notification = crate::jsonrpc::notification(method, params);
1118        if self.in_process.is_some() {
1119            return;
1120        }
1121        if let Ok(line) = serde_json::to_string(&notification) {
1122            let _ = self.write_line(&line);
1123        }
1124    }
1125
1126    /// Check if the host has sent a cancel notification.
1127    pub fn is_cancelled(&self) -> bool {
1128        self.cancelled.load(Ordering::SeqCst)
1129    }
1130
1131    pub fn take_resume_signal(&self) -> bool {
1132        self.resume_requested.swap(false, Ordering::SeqCst)
1133    }
1134
1135    pub fn signal_resume(&self) {
1136        self.resume_requested.store(true, Ordering::SeqCst);
1137    }
1138
1139    pub fn set_daemon_idle(&self, idle: bool) {
1140        self.daemon_idle.store(idle, Ordering::SeqCst);
1141    }
1142
1143    pub fn is_daemon_idle(&self) -> bool {
1144        self.daemon_idle.load(Ordering::SeqCst)
1145    }
1146
1147    /// Record the canonical ACP `stopReason` for the current prompt. The
1148    /// last writer wins, which matches the semantic that an outer
1149    /// `agent_loop` (the one whose result the user observes) always
1150    /// finalizes after any inner loops it spawned.
1151    pub fn set_prompt_stop_reason(&self, reason: &str) {
1152        *self
1153            .prompt_stop_reason
1154            .lock()
1155            .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
1156    }
1157
1158    /// Consume any prompt stop reason recorded during this prompt. The
1159    /// ACP adapter calls this once after the pipeline returns; pipelines
1160    /// that didn't run an `agent_loop` see `None` and the adapter falls
1161    /// back to `end_turn`.
1162    pub fn take_prompt_stop_reason(&self) -> Option<String> {
1163        self.prompt_stop_reason
1164            .lock()
1165            .unwrap_or_else(|e| e.into_inner())
1166            .take()
1167    }
1168
1169    /// Consume any pending `skills/update` signal the host has sent.
1170    /// Returns `true` exactly once per notification, letting callers
1171    /// trigger a layered-discovery rebuild without polling false
1172    /// positives. See issue #73 for the hot-reload contract.
1173    pub fn take_skills_reload_signal(&self) -> bool {
1174        self.skills_reload_requested.swap(false, Ordering::SeqCst)
1175    }
1176
1177    /// Manually mark the skill catalog as stale. Used by tests and by
1178    /// the CLI when an internal event (e.g. `harn install`) should
1179    /// trigger the same rebuild a `skills/update` notification would.
1180    pub fn signal_skills_reload(&self) {
1181        self.skills_reload_requested.store(true, Ordering::SeqCst);
1182    }
1183
1184    /// Call the host's `skills/list` RPC and return the raw JSON array
1185    /// it responded with. Shape:
1186    /// `[{ "id": "...", "name": "...", "description": "...", "source": "..." }, ...]`.
1187    /// The CLI adapter converts each entry into a
1188    /// [`crate::skills::SkillManifestRef`].
1189    pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
1190        let result = self.call("skills/list", serde_json::json!({})).await?;
1191        match result {
1192            serde_json::Value::Array(items) => Ok(items),
1193            serde_json::Value::Object(map) => match map.get("skills") {
1194                Some(serde_json::Value::Array(items)) => Ok(items.clone()),
1195                _ => Err(VmError::Runtime(
1196                    "skills/list: host response must be an array or { skills: [...] }".into(),
1197                )),
1198            },
1199            _ => Err(VmError::Runtime(
1200                "skills/list: unexpected response shape".into(),
1201            )),
1202        }
1203    }
1204
1205    /// Call the host's `host/tools/list` RPC and return normalized tool
1206    /// descriptors. Shape:
1207    /// `[{ "name": "...", "description": "...", "schema": {...}, "deprecated": false }, ...]`.
1208    /// The bridge also accepts `{ "tools": [...] }` and
1209    /// `{ "result": { "tools": [...] } }` wrappers for lenient hosts.
1210    pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
1211        let result = self.call("host/tools/list", serde_json::json!({})).await?;
1212        parse_host_tools_list_response(result)
1213    }
1214
1215    /// Call the host's `skills/fetch` RPC for one skill id. Returns the
1216    /// raw JSON body so the CLI can inspect both the frontmatter fields
1217    /// and the skill markdown body in whatever shape the host sends.
1218    pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
1219        self.call("skills/fetch", serde_json::json!({ "id": id }))
1220            .await
1221    }
1222
1223    pub fn injection_state(&self) -> HostBridgeInjectionState {
1224        self.queued_transcript_injections.clone()
1225    }
1226
1227    pub async fn push_pending_user_message(
1228        &self,
1229        content: String,
1230        transcript_content: serde_json::Value,
1231        mode: &str,
1232    ) -> String {
1233        self.queued_transcript_injections
1234            .push_pending_user_message(content, transcript_content, mode)
1235            .await
1236    }
1237
1238    pub async fn push_queued_user_message(&self, content: String, mode: &str) -> String {
1239        self.push_pending_user_message(content.clone(), serde_json::Value::String(content), mode)
1240            .await
1241    }
1242
1243    pub async fn revoke_pending_user_message(
1244        &self,
1245        message_id: &str,
1246    ) -> PendingUserMessageMutationResult {
1247        self.queued_transcript_injections
1248            .revoke_pending_user_message(message_id)
1249            .await
1250    }
1251
1252    pub async fn revoke_pending_reminder(
1253        &self,
1254        reminder_id: &str,
1255    ) -> PendingReminderMutationResult {
1256        self.queued_transcript_injections
1257            .revoke_pending_reminder(reminder_id)
1258            .await
1259    }
1260
1261    pub async fn pending_injections_json(&self) -> serde_json::Value {
1262        self.queued_transcript_injections
1263            .pending_injections_json()
1264            .await
1265    }
1266
1267    pub async fn replace_pending_user_message(
1268        &self,
1269        message_id: &str,
1270        content: String,
1271        transcript_content: serde_json::Value,
1272    ) -> PendingUserMessageMutationResult {
1273        self.queued_transcript_injections
1274            .replace_pending_user_message(message_id, content, transcript_content)
1275            .await
1276    }
1277
1278    pub async fn push_queued_session_remind_from_params(
1279        &self,
1280        params: &serde_json::Value,
1281    ) -> Result<String, String> {
1282        let reminder = queued_session_remind_from_params(params)?;
1283        let reminder_id = reminder.reminder.id.clone();
1284        self.queued_transcript_injections
1285            .push_session_reminder(reminder)
1286            .await;
1287        Ok(reminder_id)
1288    }
1289
1290    pub async fn take_queued_user_messages(
1291        &self,
1292        include_interrupt_immediate: bool,
1293        include_finish_step: bool,
1294        include_audit_only: bool,
1295    ) -> Vec<QueuedUserMessage> {
1296        let mut state = self.queued_transcript_injections.inner.lock().await;
1297        let mut selected = Vec::new();
1298        let mut retained = VecDeque::new();
1299        while let Some(injection) = state.queue.pop_front() {
1300            let should_take = match injection.mode() {
1301                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1302                QueuedUserMessageMode::FinishStep => include_finish_step,
1303                QueuedUserMessageMode::AuditOnly => include_audit_only,
1304            };
1305            match (should_take, injection) {
1306                (true, QueuedTranscriptInjection::User(message)) => {
1307                    state
1308                        .delivered_user_message_ids
1309                        .insert(message.message_id.clone());
1310                    selected.push(message);
1311                }
1312                (_, injection) => retained.push_back(injection),
1313            }
1314        }
1315        state.queue = retained;
1316        selected
1317    }
1318
1319    pub async fn take_queued_transcript_injections(
1320        &self,
1321        include_interrupt_immediate: bool,
1322        include_finish_step: bool,
1323        include_audit_only: bool,
1324    ) -> Vec<QueuedTranscriptInjection> {
1325        let mut state = self.queued_transcript_injections.inner.lock().await;
1326        let mut selected = Vec::new();
1327        let mut retained = VecDeque::new();
1328        while let Some(injection) = state.queue.pop_front() {
1329            let should_take = match injection.mode() {
1330                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1331                QueuedUserMessageMode::FinishStep => include_finish_step,
1332                QueuedUserMessageMode::AuditOnly => include_audit_only,
1333            };
1334            if should_take {
1335                match &injection {
1336                    QueuedTranscriptInjection::User(message) => {
1337                        state
1338                            .delivered_user_message_ids
1339                            .insert(message.message_id.clone());
1340                    }
1341                    QueuedTranscriptInjection::Reminder(reminder) => {
1342                        state
1343                            .delivered_reminder_ids
1344                            .insert(reminder.reminder.id.clone());
1345                    }
1346                }
1347                selected.push(injection);
1348            } else {
1349                retained.push_back(injection);
1350            }
1351        }
1352        state.queue = retained;
1353        selected
1354    }
1355
1356    pub async fn take_queued_user_messages_for(
1357        &self,
1358        checkpoint: DeliveryCheckpoint,
1359    ) -> Vec<QueuedUserMessage> {
1360        match checkpoint {
1361            DeliveryCheckpoint::InterruptImmediate => {
1362                self.take_queued_user_messages(true, false, false).await
1363            }
1364            DeliveryCheckpoint::AfterCurrentOperation => {
1365                self.take_queued_user_messages(false, true, false).await
1366            }
1367            DeliveryCheckpoint::EndOfInteraction => {
1368                self.take_queued_user_messages(false, false, true).await
1369            }
1370        }
1371    }
1372
1373    pub async fn take_queued_transcript_injections_for(
1374        &self,
1375        checkpoint: DeliveryCheckpoint,
1376    ) -> Vec<QueuedTranscriptInjection> {
1377        match checkpoint {
1378            DeliveryCheckpoint::InterruptImmediate => {
1379                self.take_queued_transcript_injections(true, false, false)
1380                    .await
1381            }
1382            DeliveryCheckpoint::AfterCurrentOperation => {
1383                self.take_queued_transcript_injections(false, true, false)
1384                    .await
1385            }
1386            DeliveryCheckpoint::EndOfInteraction => {
1387                self.take_queued_transcript_injections(false, false, true)
1388                    .await
1389            }
1390        }
1391    }
1392
1393    /// Send an output notification (for log/print in bridge mode).
1394    pub fn send_output(&self, text: &str) {
1395        self.notify("output", serde_json::json!({"text": text}));
1396    }
1397
1398    /// Send a progress notification with optional numeric progress and structured data.
1399    pub fn send_progress(
1400        &self,
1401        phase: &str,
1402        message: &str,
1403        progress: Option<i64>,
1404        total: Option<i64>,
1405        data: Option<serde_json::Value>,
1406    ) {
1407        let mut payload = serde_json::json!({"phase": phase, "message": message});
1408        if let Some(p) = progress {
1409            payload["progress"] = serde_json::json!(p);
1410        }
1411        if let Some(t) = total {
1412            payload["total"] = serde_json::json!(t);
1413        }
1414        if let Some(d) = data {
1415            payload["data"] = d;
1416        }
1417        self.notify("progress", payload);
1418    }
1419
1420    /// Send a structured log notification.
1421    pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1422        let mut payload = serde_json::json!({"level": level, "message": message});
1423        if let Some(f) = fields {
1424            payload["fields"] = f;
1425        }
1426        self.notify("log", payload);
1427    }
1428
1429    /// Send a `session/update` with `call_start` — signals the beginning of
1430    /// an LLM call, tool call, or builtin call for observability.
1431    pub fn send_call_start(
1432        &self,
1433        call_id: &str,
1434        call_type: &str,
1435        name: &str,
1436        metadata: serde_json::Value,
1437    ) {
1438        let session_id = self.get_session_id();
1439        let script = self.get_script_name();
1440        let stream_publicly = metadata
1441            .get("stream_publicly")
1442            .and_then(|value| value.as_bool())
1443            .unwrap_or(true);
1444        self.visible_call_streams
1445            .lock()
1446            .unwrap_or_else(|e| e.into_inner())
1447            .insert(call_id.to_string(), stream_publicly);
1448        self.notify(
1449            "session/update",
1450            serde_json::json!({
1451                "sessionId": session_id,
1452                "update": {
1453                    "sessionUpdate": "call_start",
1454                    "content": {
1455                        "toolCallId": call_id,
1456                        "call_type": call_type,
1457                        "name": name,
1458                        "script": script,
1459                        "metadata": metadata,
1460                    },
1461                },
1462            }),
1463        );
1464    }
1465
1466    /// Send a `session/update` with `call_progress` — a streaming token delta
1467    /// from an in-flight LLM call.
1468    pub fn send_call_progress(
1469        &self,
1470        call_id: &str,
1471        delta: &str,
1472        accumulated_tokens: u64,
1473        user_visible: bool,
1474    ) {
1475        let session_id = self.get_session_id();
1476        let (visible_text, visible_delta) = {
1477            let stream_publicly = self
1478                .visible_call_streams
1479                .lock()
1480                .unwrap_or_else(|e| e.into_inner())
1481                .get(call_id)
1482                .copied()
1483                .unwrap_or(true);
1484            if !user_visible || !stream_publicly {
1485                (String::new(), String::new())
1486            } else {
1487                let mut states = self
1488                    .visible_call_states
1489                    .lock()
1490                    .unwrap_or_else(|e| e.into_inner());
1491                let state = states.entry(call_id.to_string()).or_default();
1492                state.push(delta, true)
1493            }
1494        };
1495        self.notify(
1496            "session/update",
1497            serde_json::json!({
1498                "sessionId": session_id,
1499                "update": {
1500                    "sessionUpdate": "call_progress",
1501                    "content": {
1502                        "toolCallId": call_id,
1503                        "delta": delta,
1504                        "accumulated_tokens": accumulated_tokens,
1505                        "visible_text": visible_text,
1506                        "visible_delta": visible_delta,
1507                        "user_visible": user_visible,
1508                    },
1509                },
1510            }),
1511        );
1512    }
1513
1514    /// Send a `session/update` with `call_end` — signals completion of a call.
1515    pub fn send_call_end(
1516        &self,
1517        call_id: &str,
1518        call_type: &str,
1519        name: &str,
1520        duration_ms: u64,
1521        status: &str,
1522        metadata: serde_json::Value,
1523    ) {
1524        let session_id = self.get_session_id();
1525        let script = self.get_script_name();
1526        self.visible_call_states
1527            .lock()
1528            .unwrap_or_else(|e| e.into_inner())
1529            .remove(call_id);
1530        self.visible_call_streams
1531            .lock()
1532            .unwrap_or_else(|e| e.into_inner())
1533            .remove(call_id);
1534        self.notify(
1535            "session/update",
1536            serde_json::json!({
1537                "sessionId": session_id,
1538                "update": {
1539                    "sessionUpdate": "call_end",
1540                    "content": {
1541                        "toolCallId": call_id,
1542                        "call_type": call_type,
1543                        "name": name,
1544                        "script": script,
1545                        "duration_ms": duration_ms,
1546                        "status": status,
1547                        "metadata": metadata,
1548                    },
1549                },
1550            }),
1551        );
1552    }
1553
1554    /// Send a worker lifecycle update for delegated/background execution.
1555    pub fn send_worker_update(
1556        &self,
1557        worker_id: &str,
1558        worker_name: &str,
1559        status: &str,
1560        metadata: serde_json::Value,
1561        audit: Option<&MutationSessionRecord>,
1562    ) {
1563        let session_id = self.get_session_id();
1564        let script = self.get_script_name();
1565        let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1566        let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1567        let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1568        let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1569        let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1570        let lifecycle = serde_json::json!({
1571            "event": status,
1572            "worker_id": worker_id,
1573            "worker_name": worker_name,
1574            "started_at": started_at,
1575            "finished_at": finished_at,
1576        });
1577        self.notify(
1578            "session/update",
1579            serde_json::json!({
1580                "sessionId": session_id,
1581                "update": {
1582                    "sessionUpdate": "worker_update",
1583                    "content": {
1584                        "worker_id": worker_id,
1585                        "worker_name": worker_name,
1586                        "status": status,
1587                        "script": script,
1588                        "started_at": started_at,
1589                        "finished_at": finished_at,
1590                        "snapshot_path": snapshot_path,
1591                        "run_id": run_id,
1592                        "run_path": run_path,
1593                        "lifecycle": lifecycle,
1594                        "audit": audit,
1595                        "metadata": metadata,
1596                    },
1597                },
1598            }),
1599        );
1600    }
1601}
1602
1603/// Convert a serde_json::Value to a VmValue.
1604pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1605    crate::stdlib::json_to_vm_value(val)
1606}
1607
1608fn parse_host_tools_list_response(
1609    result: serde_json::Value,
1610) -> Result<Vec<serde_json::Value>, VmError> {
1611    let tools = match result {
1612        serde_json::Value::Array(items) => items,
1613        serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1614            map.get("result")
1615                .and_then(|value| value.get("tools"))
1616                .cloned()
1617        }) {
1618            Some(serde_json::Value::Array(items)) => items,
1619            _ => {
1620                return Err(VmError::Runtime(
1621                    "host/tools/list: host response must be an array or { tools: [...] }".into(),
1622                ));
1623            }
1624        },
1625        _ => {
1626            return Err(VmError::Runtime(
1627                "host/tools/list: unexpected response shape".into(),
1628            ));
1629        }
1630    };
1631
1632    let mut normalized = Vec::with_capacity(tools.len());
1633    for tool in tools {
1634        let serde_json::Value::Object(map) = tool else {
1635            return Err(VmError::Runtime(
1636                "host/tools/list: every tool must be an object".into(),
1637            ));
1638        };
1639        let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1640            return Err(VmError::Runtime(
1641                "host/tools/list: every tool must include a string `name`".into(),
1642            ));
1643        };
1644        let description = map
1645            .get("description")
1646            .and_then(|value| value.as_str())
1647            .or_else(|| {
1648                map.get("short_description")
1649                    .and_then(|value| value.as_str())
1650            })
1651            .unwrap_or_default();
1652        let schema = map
1653            .get("schema")
1654            .cloned()
1655            .or_else(|| map.get("parameters").cloned())
1656            .or_else(|| map.get("input_schema").cloned())
1657            .unwrap_or(serde_json::Value::Null);
1658        let deprecated = map
1659            .get("deprecated")
1660            .and_then(|value| value.as_bool())
1661            .unwrap_or(false);
1662        normalized.push(serde_json::json!({
1663            "name": name,
1664            "description": description,
1665            "schema": schema,
1666            "deprecated": deprecated,
1667        }));
1668    }
1669    Ok(normalized)
1670}
1671
1672#[cfg(test)]
1673mod tests {
1674    use super::*;
1675
1676    fn test_bridge() -> HostBridge {
1677        HostBridge::from_parts(
1678            Arc::new(Mutex::new(HashMap::new())),
1679            Arc::new(AtomicBool::new(false)),
1680            Arc::new(std::sync::Mutex::new(())),
1681            1,
1682        )
1683    }
1684
1685    fn test_bridge_sharing_injection_state(owner: &HostBridge) -> HostBridge {
1686        HostBridge::from_parts_with_writer_cancel_notify_and_injection_state(
1687            Arc::new(Mutex::new(HashMap::new())),
1688            Arc::new(AtomicBool::new(false)),
1689            Arc::new(Notify::new()),
1690            Arc::new(|_| Ok(())),
1691            100,
1692            Some(owner.injection_state()),
1693        )
1694    }
1695
1696    #[test]
1697    fn test_json_rpc_request_format() {
1698        let request = crate::jsonrpc::request(
1699            1,
1700            "llm_call",
1701            serde_json::json!({
1702                "prompt": "Hello",
1703                "system": "Be helpful",
1704            }),
1705        );
1706        let s = serde_json::to_string(&request).unwrap();
1707        assert!(s.contains("\"jsonrpc\":\"2.0\""));
1708        assert!(s.contains("\"id\":1"));
1709        assert!(s.contains("\"method\":\"llm_call\""));
1710    }
1711
1712    #[test]
1713    fn test_json_rpc_notification_format() {
1714        let notification =
1715            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
1716        let s = serde_json::to_string(&notification).unwrap();
1717        assert!(s.contains("\"method\":\"output\""));
1718        assert!(!s.contains("\"id\""));
1719    }
1720
1721    #[test]
1722    fn test_json_rpc_error_response_parsing() {
1723        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
1724        assert!(response.get("error").is_some());
1725        assert_eq!(
1726            response["error"]["message"].as_str().unwrap(),
1727            "Invalid request"
1728        );
1729    }
1730
1731    #[test]
1732    fn test_json_rpc_success_response_parsing() {
1733        let response = crate::jsonrpc::response(
1734            1,
1735            serde_json::json!({
1736                "text": "Hello world",
1737                "input_tokens": 10,
1738                "output_tokens": 5,
1739            }),
1740        );
1741        assert!(response.get("result").is_some());
1742        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
1743    }
1744
1745    #[test]
1746    fn test_cancelled_flag() {
1747        let cancelled = Arc::new(AtomicBool::new(false));
1748        assert!(!cancelled.load(Ordering::SeqCst));
1749        cancelled.store(true, Ordering::SeqCst);
1750        assert!(cancelled.load(Ordering::SeqCst));
1751    }
1752
1753    #[test]
1754    fn pending_host_calls_return_when_cancellation_arrives() {
1755        let runtime = tokio::runtime::Builder::new_current_thread()
1756            .enable_all()
1757            .build()
1758            .unwrap();
1759        runtime.block_on(async {
1760            let pending = Arc::new(Mutex::new(HashMap::new()));
1761            let cancelled = Arc::new(AtomicBool::new(false));
1762            let bridge = HostBridge::from_parts_with_writer(
1763                pending.clone(),
1764                cancelled.clone(),
1765                Arc::new(|_| Ok(())),
1766                1,
1767            );
1768
1769            let call = bridge.call("host/work", serde_json::json!({}));
1770            tokio::pin!(call);
1771
1772            loop {
1773                tokio::select! {
1774                    result = &mut call => panic!("call completed before cancellation: {result:?}"),
1775                    _ = tokio::task::yield_now() => {}
1776                }
1777                if !pending.lock().await.is_empty() {
1778                    break;
1779                }
1780            }
1781
1782            cancelled.store(true, Ordering::SeqCst);
1783            bridge.cancel_notify.notify_waiters();
1784
1785            let result = tokio::time::timeout(Duration::from_secs(1), call)
1786                .await
1787                .expect("pending call should observe cancellation promptly");
1788            assert!(
1789                matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
1790            );
1791            assert!(pending.lock().await.is_empty());
1792        });
1793    }
1794
1795    #[test]
1796    fn call_progress_hides_non_user_visible_deltas() {
1797        let lines = Arc::new(std::sync::Mutex::new(Vec::<String>::new()));
1798        let captured = lines.clone();
1799        let bridge = HostBridge::from_parts_with_writer(
1800            Arc::new(Mutex::new(HashMap::new())),
1801            Arc::new(AtomicBool::new(false)),
1802            Arc::new(move |line| {
1803                captured
1804                    .lock()
1805                    .unwrap_or_else(|e| e.into_inner())
1806                    .push(line.to_string());
1807                Ok(())
1808            }),
1809            1,
1810        );
1811
1812        bridge.send_call_start(
1813            "call-1",
1814            "llm",
1815            "llm_call",
1816            serde_json::json!({"stream_publicly": true}),
1817        );
1818        bridge.send_call_progress(
1819            "call-1",
1820            r#"{"verdict":"done","reasoning":"internal"}"#,
1821            1,
1822            false,
1823        );
1824
1825        let lines = lines.lock().unwrap_or_else(|e| e.into_inner());
1826        let progress: serde_json::Value =
1827            serde_json::from_str(&lines[1]).expect("call_progress notification json");
1828        let content = &progress["params"]["update"]["content"];
1829        assert_eq!(
1830            content["delta"],
1831            r#"{"verdict":"done","reasoning":"internal"}"#
1832        );
1833        assert_eq!(content["user_visible"], false);
1834        assert_eq!(content["visible_text"], "");
1835        assert_eq!(content["visible_delta"], "");
1836    }
1837
1838    #[test]
1839    fn call_progress_hides_non_public_streams() {
1840        let lines = Arc::new(std::sync::Mutex::new(Vec::<String>::new()));
1841        let captured = lines.clone();
1842        let bridge = HostBridge::from_parts_with_writer(
1843            Arc::new(Mutex::new(HashMap::new())),
1844            Arc::new(AtomicBool::new(false)),
1845            Arc::new(move |line| {
1846                captured
1847                    .lock()
1848                    .unwrap_or_else(|e| e.into_inner())
1849                    .push(line.to_string());
1850                Ok(())
1851            }),
1852            1,
1853        );
1854
1855        bridge.send_call_start(
1856            "call-1",
1857            "llm",
1858            "llm_call",
1859            serde_json::json!({"stream_publicly": false}),
1860        );
1861        bridge.send_call_progress("call-1", "secret schema bytes", 1, true);
1862
1863        let lines = lines.lock().unwrap_or_else(|e| e.into_inner());
1864        let progress: serde_json::Value =
1865            serde_json::from_str(&lines[1]).expect("call_progress notification json");
1866        let content = &progress["params"]["update"]["content"];
1867        assert_eq!(content["delta"], "secret schema bytes");
1868        assert_eq!(content["user_visible"], true);
1869        assert_eq!(content["visible_text"], "");
1870        assert_eq!(content["visible_delta"], "");
1871    }
1872
1873    #[test]
1874    fn queued_messages_are_filtered_by_delivery_mode() {
1875        let runtime = tokio::runtime::Builder::new_current_thread()
1876            .enable_all()
1877            .build()
1878            .unwrap();
1879        runtime.block_on(async {
1880            let bridge = test_bridge();
1881            bridge
1882                .push_queued_user_message("first".to_string(), "finish_step")
1883                .await;
1884            bridge
1885                .push_queued_user_message("second".to_string(), "audit_only")
1886                .await;
1887
1888            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1889            assert_eq!(finish_step.len(), 1);
1890            assert_eq!(finish_step[0].content, "first");
1891
1892            let audit_only = bridge.take_queued_user_messages(false, false, true).await;
1893            assert_eq!(audit_only.len(), 1);
1894            assert_eq!(audit_only[0].content, "second");
1895        });
1896    }
1897
1898    #[test]
1899    fn pending_user_messages_support_revoke_replace_and_delivery_states() {
1900        let runtime = tokio::runtime::Builder::new_current_thread()
1901            .enable_all()
1902            .build()
1903            .unwrap();
1904        runtime.block_on(async {
1905            let bridge = test_bridge();
1906            let first_id = bridge
1907                .push_pending_user_message(
1908                    "first".to_string(),
1909                    serde_json::json!("first"),
1910                    "audit_only",
1911                )
1912                .await;
1913            let second_id = bridge
1914                .push_pending_user_message(
1915                    "second".to_string(),
1916                    serde_json::json!("second"),
1917                    "audit_only",
1918                )
1919                .await;
1920
1921            assert_eq!(
1922                bridge
1923                    .replace_pending_user_message(
1924                        &second_id,
1925                        "second edited".to_string(),
1926                        serde_json::json!("second edited"),
1927                    )
1928                    .await,
1929                PendingUserMessageMutationResult::Mutated
1930            );
1931            assert_eq!(
1932                bridge.revoke_pending_user_message(&first_id).await,
1933                PendingUserMessageMutationResult::Mutated
1934            );
1935            assert_eq!(
1936                bridge.revoke_pending_user_message(&first_id).await,
1937                PendingUserMessageMutationResult::AlreadyRevoked
1938            );
1939
1940            let delivered = bridge
1941                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1942                .await;
1943            assert_eq!(delivered.len(), 1);
1944            assert_eq!(delivered[0].message_id, second_id);
1945            assert_eq!(delivered[0].content, "second edited");
1946
1947            assert_eq!(
1948                bridge.revoke_pending_user_message(&second_id).await,
1949                PendingUserMessageMutationResult::AlreadyDelivered
1950            );
1951            assert_eq!(
1952                bridge
1953                    .replace_pending_user_message(
1954                        &second_id,
1955                        "too late".to_string(),
1956                        serde_json::json!("too late"),
1957                    )
1958                    .await,
1959                PendingUserMessageMutationResult::AlreadyDelivered
1960            );
1961            assert_eq!(
1962                bridge.revoke_pending_user_message("missing").await,
1963                PendingUserMessageMutationResult::UnknownMessageId
1964            );
1965        });
1966    }
1967
1968    #[test]
1969    fn pending_user_message_replace_preserves_fifo_position_and_mode() {
1970        let runtime = tokio::runtime::Builder::new_current_thread()
1971            .enable_all()
1972            .build()
1973            .unwrap();
1974        runtime.block_on(async {
1975            let bridge = test_bridge();
1976            let first_id = bridge
1977                .push_pending_user_message(
1978                    "first".to_string(),
1979                    serde_json::json!("first"),
1980                    "finish_step",
1981                )
1982                .await;
1983            let second_id = bridge
1984                .push_pending_user_message(
1985                    "second".to_string(),
1986                    serde_json::json!("second"),
1987                    "finish_step",
1988                )
1989                .await;
1990            assert_eq!(
1991                bridge
1992                    .replace_pending_user_message(
1993                        &first_id,
1994                        "first edited".to_string(),
1995                        serde_json::json!("first edited"),
1996                    )
1997                    .await,
1998                PendingUserMessageMutationResult::Mutated
1999            );
2000
2001            let delivered = bridge
2002                .take_queued_user_messages_for(DeliveryCheckpoint::AfterCurrentOperation)
2003                .await;
2004            assert_eq!(
2005                delivered
2006                    .iter()
2007                    .map(|message| (&message.message_id, message.content.as_str(), message.mode))
2008                    .collect::<Vec<_>>(),
2009                vec![
2010                    (&first_id, "first edited", QueuedUserMessageMode::FinishStep,),
2011                    (&second_id, "second", QueuedUserMessageMode::FinishStep),
2012                ]
2013            );
2014        });
2015    }
2016
2017    #[test]
2018    fn pending_user_message_state_survives_bridge_replacement() {
2019        let runtime = tokio::runtime::Builder::new_current_thread()
2020            .enable_all()
2021            .build()
2022            .unwrap();
2023        runtime.block_on(async {
2024            let bridge = test_bridge();
2025            let revoked_id = bridge
2026                .push_pending_user_message(
2027                    "revoke me".to_string(),
2028                    serde_json::json!("revoke me"),
2029                    "audit_only",
2030                )
2031                .await;
2032            let delivered_id = bridge
2033                .push_pending_user_message(
2034                    "deliver me".to_string(),
2035                    serde_json::json!("deliver me"),
2036                    "audit_only",
2037                )
2038                .await;
2039            assert_eq!(
2040                bridge.revoke_pending_user_message(&revoked_id).await,
2041                PendingUserMessageMutationResult::Mutated
2042            );
2043            bridge.cancelled.store(true, Ordering::SeqCst);
2044
2045            let replacement_bridge = test_bridge_sharing_injection_state(&bridge);
2046            assert_eq!(
2047                replacement_bridge
2048                    .revoke_pending_user_message(&revoked_id)
2049                    .await,
2050                PendingUserMessageMutationResult::AlreadyRevoked
2051            );
2052            let delivered = replacement_bridge
2053                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
2054                .await;
2055            assert_eq!(delivered.len(), 1);
2056            assert_eq!(delivered[0].message_id, delivered_id);
2057            assert_eq!(delivered[0].content, "deliver me");
2058            assert_eq!(
2059                bridge.revoke_pending_user_message(&delivered_id).await,
2060                PendingUserMessageMutationResult::AlreadyDelivered
2061            );
2062        });
2063    }
2064
2065    #[test]
2066    fn queued_transcript_injections_preserve_user_reminder_separation() {
2067        let runtime = tokio::runtime::Builder::new_current_thread()
2068            .enable_all()
2069            .build()
2070            .unwrap();
2071        runtime.block_on(async {
2072            let bridge = test_bridge();
2073            bridge
2074                .push_queued_user_message("human follow-up".to_string(), "finish_step")
2075                .await;
2076            let reminder_id = bridge
2077                .push_queued_session_remind_from_params(&serde_json::json!({
2078                    "body": "Host-provided ambient context.",
2079                    "tags": ["host"],
2080                    "dedupe_key": "host-context",
2081                    "ttl_turns": 2,
2082                    "mode": "audit_only",
2083                    "_meta": {"harn": {"source": "test"}},
2084                }))
2085                .await
2086                .expect("valid reminder");
2087
2088            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
2089            assert_eq!(finish_step.len(), 1);
2090            assert_eq!(finish_step[0].content, "human follow-up");
2091
2092            let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
2093            assert!(no_user_messages.is_empty());
2094
2095            let injections = bridge
2096                .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
2097                .await;
2098            assert_eq!(injections.len(), 1);
2099            let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
2100                panic!("expected queued reminder");
2101            };
2102            assert_eq!(reminder.reminder.id, reminder_id);
2103            assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
2104            assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
2105            assert_eq!(
2106                reminder.reminder.dedupe_key.as_deref(),
2107                Some("host-context")
2108            );
2109            assert_eq!(reminder.reminder.ttl_turns, Some(2));
2110            assert_eq!(
2111                reminder.reminder.source,
2112                crate::llm::helpers::ReminderSource::Bridge
2113            );
2114        });
2115    }
2116
2117    #[test]
2118    fn pending_injections_list_user_messages_and_reminders_in_fifo_order() {
2119        let runtime = tokio::runtime::Builder::new_current_thread()
2120            .enable_all()
2121            .build()
2122            .unwrap();
2123        runtime.block_on(async {
2124            let bridge = test_bridge();
2125            let message_id = bridge
2126                .push_pending_user_message(
2127                    "human follow-up".to_string(),
2128                    serde_json::json!([{"type": "text", "text": "human follow-up"}]),
2129                    "finish_step",
2130                )
2131                .await;
2132            let reminder_id = bridge
2133                .push_queued_session_remind_from_params(&serde_json::json!({
2134                    "id": "rem-test",
2135                    "body": "Host reminder",
2136                    "tags": ["host"],
2137                    "dedupe_key": "host-reminder",
2138                    "ttl_turns": 2,
2139                    "mode": "interrupt_immediate",
2140                }))
2141                .await
2142                .expect("valid session/remind payload");
2143
2144            let pending = bridge.pending_injections_json().await;
2145            assert_eq!(pending["pendingCount"], 2);
2146            assert_eq!(pending["injections"][0]["kind"], "user");
2147            assert_eq!(pending["injections"][0]["id"], message_id);
2148            assert_eq!(pending["injections"][0]["messageId"], message_id);
2149            assert_eq!(pending["injections"][0]["mode"], "finish_step");
2150            assert_eq!(pending["injections"][0]["position"], 0);
2151            assert_eq!(pending["injections"][1]["kind"], "reminder");
2152            assert_eq!(pending["injections"][1]["id"], reminder_id);
2153            assert_eq!(pending["injections"][1]["reminderId"], "rem-test");
2154            assert_eq!(pending["injections"][1]["mode"], "interrupt_immediate");
2155            assert_eq!(pending["injections"][1]["body"], "Host reminder");
2156            assert_eq!(pending["injections"][1]["dedupeKey"], "host-reminder");
2157            assert_eq!(pending["injections"][1]["ttlTurns"], 2);
2158            assert_eq!(pending["injections"][1]["position"], 1);
2159        });
2160    }
2161
2162    #[test]
2163    fn pending_reminders_support_revoke_and_delivery_states() {
2164        let runtime = tokio::runtime::Builder::new_current_thread()
2165            .enable_all()
2166            .build()
2167            .unwrap();
2168        runtime.block_on(async {
2169            let bridge = test_bridge();
2170            let revoked_id = bridge
2171                .push_queued_session_remind_from_params(&serde_json::json!({
2172                    "id": "rem-revoke",
2173                    "body": "remove me",
2174                    "mode": "finish_step",
2175                }))
2176                .await
2177                .expect("valid session/remind payload");
2178            let delivered_id = bridge
2179                .push_queued_session_remind_from_params(&serde_json::json!({
2180                    "id": "rem-deliver",
2181                    "body": "deliver me",
2182                    "mode": "finish_step",
2183                }))
2184                .await
2185                .expect("valid session/remind payload");
2186
2187            assert_eq!(
2188                bridge.revoke_pending_reminder(&revoked_id).await,
2189                PendingReminderMutationResult::Mutated
2190            );
2191            assert_eq!(
2192                bridge.revoke_pending_reminder(&revoked_id).await,
2193                PendingReminderMutationResult::AlreadyRevoked
2194            );
2195
2196            let pending = bridge.pending_injections_json().await;
2197            assert_eq!(pending["pendingCount"], 1);
2198            assert_eq!(pending["injections"][0]["reminderId"], delivered_id);
2199
2200            let delivered = bridge
2201                .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
2202                .await;
2203            assert_eq!(delivered.len(), 1);
2204            let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2205                panic!("expected delivered reminder");
2206            };
2207            assert_eq!(reminder.reminder.id, delivered_id);
2208
2209            assert_eq!(
2210                bridge.revoke_pending_reminder(&delivered_id).await,
2211                PendingReminderMutationResult::AlreadyDelivered
2212            );
2213            assert_eq!(
2214                bridge.revoke_pending_reminder("missing").await,
2215                PendingReminderMutationResult::UnknownReminderId
2216            );
2217        });
2218    }
2219
2220    #[test]
2221    fn bridge_remind_modes_honor_delivery_checkpoints() {
2222        let runtime = tokio::runtime::Builder::new_current_thread()
2223            .enable_all()
2224            .build()
2225            .unwrap();
2226        runtime.block_on(async {
2227            let cases = [
2228                (
2229                    "interrupt_immediate",
2230                    DeliveryCheckpoint::InterruptImmediate,
2231                    DeliveryCheckpoint::AfterCurrentOperation,
2232                ),
2233                (
2234                    "finish_step",
2235                    DeliveryCheckpoint::AfterCurrentOperation,
2236                    DeliveryCheckpoint::EndOfInteraction,
2237                ),
2238                (
2239                    "audit_only",
2240                    DeliveryCheckpoint::EndOfInteraction,
2241                    DeliveryCheckpoint::InterruptImmediate,
2242                ),
2243            ];
2244
2245            for (mode, expected_checkpoint, wrong_checkpoint) in cases {
2246                let bridge = test_bridge();
2247                bridge
2248                    .push_queued_session_remind_from_params(&serde_json::json!({
2249                        "body": format!("Reminder for {mode}"),
2250                        "mode": mode,
2251                    }))
2252                    .await
2253                    .expect("valid session/remind payload");
2254
2255                let premature = bridge
2256                    .take_queued_transcript_injections_for(wrong_checkpoint)
2257                    .await;
2258                assert!(
2259                    premature.is_empty(),
2260                    "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
2261                );
2262
2263                let delivered = bridge
2264                    .take_queued_transcript_injections_for(expected_checkpoint)
2265                    .await;
2266                assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
2267                let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2268                    panic!("expected reminder for {mode}");
2269                };
2270                assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
2271            }
2272        });
2273    }
2274
2275    #[test]
2276    fn session_remind_validation_rejects_user_message_shape() {
2277        let err = queued_session_remind_from_params(&serde_json::json!({
2278            "content": "this is still a user message",
2279            "mode": "interrupt_immediate",
2280        }))
2281        .expect_err("session/remind must require a reminder body");
2282        assert!(err.contains(Code::ReminderInvalidShape.as_str()));
2283        assert!(err.contains("body"));
2284    }
2285
2286    #[test]
2287    fn session_remind_validation_rejects_unknown_options_separately() {
2288        let err = queued_session_remind_from_params(&serde_json::json!({
2289            "body": "valid body",
2290            "unknown_host_field": true,
2291        }))
2292        .expect_err("session/remind must reject unknown top-level fields");
2293        assert!(err.contains(Code::ReminderUnknownOption.as_str()));
2294        assert!(err.contains("unknown_host_field"));
2295    }
2296
2297    #[test]
2298    fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
2299        let err = queued_session_remind_from_params(&serde_json::json!({
2300            "body": "valid body",
2301            "propagate": "workspace",
2302        }))
2303        .expect_err("session/remind must reject unknown propagate values");
2304        assert!(err.contains(Code::ReminderUnknownPropagate.as_str()));
2305        assert!(err.contains("propagate"));
2306    }
2307
2308    #[test]
2309    fn test_json_result_to_vm_value_string() {
2310        let val = serde_json::json!("hello");
2311        let vm_val = json_result_to_vm_value(&val);
2312        assert_eq!(vm_val.display(), "hello");
2313    }
2314
2315    #[test]
2316    fn test_json_result_to_vm_value_dict() {
2317        let val = serde_json::json!({"name": "test", "count": 42});
2318        let vm_val = json_result_to_vm_value(&val);
2319        let VmValue::Dict(d) = &vm_val else {
2320            unreachable!("Expected Dict, got {:?}", vm_val);
2321        };
2322        assert_eq!(d.get("name").unwrap().display(), "test");
2323        assert_eq!(d.get("count").unwrap().display(), "42");
2324    }
2325
2326    #[test]
2327    fn test_json_result_to_vm_value_null() {
2328        let val = serde_json::json!(null);
2329        let vm_val = json_result_to_vm_value(&val);
2330        assert!(matches!(vm_val, VmValue::Nil));
2331    }
2332
2333    #[test]
2334    fn test_json_result_to_vm_value_nested() {
2335        let val = serde_json::json!({
2336            "text": "response",
2337            "tool_calls": [
2338                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
2339            ],
2340            "input_tokens": 100,
2341            "output_tokens": 50,
2342        });
2343        let vm_val = json_result_to_vm_value(&val);
2344        let VmValue::Dict(d) = &vm_val else {
2345            unreachable!("Expected Dict, got {:?}", vm_val);
2346        };
2347        assert_eq!(d.get("text").unwrap().display(), "response");
2348        let VmValue::List(list) = d.get("tool_calls").unwrap() else {
2349            unreachable!("Expected List for tool_calls");
2350        };
2351        assert_eq!(list.len(), 1);
2352    }
2353
2354    #[test]
2355    fn parse_host_tools_list_accepts_object_wrapper() {
2356        let tools = parse_host_tools_list_response(serde_json::json!({
2357            "tools": [
2358                {
2359                    "name": "Read",
2360                    "description": "Read a file",
2361                    "schema": {"type": "object"},
2362                }
2363            ]
2364        }))
2365        .expect("tool list");
2366
2367        assert_eq!(tools.len(), 1);
2368        assert_eq!(tools[0]["name"], "Read");
2369        assert_eq!(tools[0]["deprecated"], false);
2370    }
2371
2372    #[test]
2373    fn parse_host_tools_list_accepts_compat_fields() {
2374        let tools = parse_host_tools_list_response(serde_json::json!({
2375            "result": {
2376                "tools": [
2377                    {
2378                        "name": "Edit",
2379                        "short_description": "Apply an edit",
2380                        "input_schema": {"type": "object"},
2381                        "deprecated": true,
2382                    }
2383                ]
2384            }
2385        }))
2386        .expect("tool list");
2387
2388        assert_eq!(tools[0]["description"], "Apply an edit");
2389        assert_eq!(tools[0]["schema"]["type"], "object");
2390        assert_eq!(tools[0]["deprecated"], true);
2391    }
2392
2393    #[test]
2394    fn parse_host_tools_list_requires_tool_names() {
2395        let err = parse_host_tools_list_response(serde_json::json!({
2396            "tools": [
2397                {"description": "missing name"}
2398            ]
2399        }))
2400        .expect_err("expected error");
2401        assert!(err
2402            .to_string()
2403            .contains("host/tools/list: every tool must include a string `name`"));
2404    }
2405
2406    #[test]
2407    fn test_timeout_duration() {
2408        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
2409    }
2410}