Skip to main content

harn_vm/
bridge.rs

1//! JSON-RPC 2.0 bridge for host communication.
2//!
3//! When `harn run --bridge` is used, the VM delegates builtins (llm_call,
4//! file I/O, tool execution) to a host process over stdin/stdout JSON-RPC.
5//! The host application handles these requests using its own providers.
6
7use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::future::Future;
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15
16use tokio::io::AsyncBufReadExt;
17use tokio::sync::{oneshot, Mutex, Notify};
18
19use harn_parser::diagnostic_codes::Code;
20
21use crate::orchestration::MutationSessionRecord;
22use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
23use crate::visible_text::VisibleTextState;
24use crate::vm::Vm;
25
/// Default timeout for bridge calls: five minutes.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Shared callback that hands one line of JSON-RPC text to the host
/// transport, reporting a failure as a `String`.
pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
30
31fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
32    Arc::new(move |line: &str| {
33        let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
34        let mut stdout = std::io::stdout().lock();
35        stdout
36            .write_all(line.as_bytes())
37            .map_err(|e| format!("Bridge write error: {e}"))?;
38        stdout
39            .write_all(b"\n")
40            .map_err(|e| format!("Bridge write error: {e}"))?;
41        stdout
42            .flush()
43            .map_err(|e| format!("Bridge flush error: {e}"))?;
44        Ok(())
45    })
46}
47
48/// A JSON-RPC 2.0 bridge to a host process over stdin/stdout.
49///
50/// The bridge sends requests to the host on stdout and receives responses
51/// on stdin. A background task reads stdin and dispatches responses to
52/// waiting callers by request ID. All stdout writes are serialized through
53/// a mutex to prevent interleaving.
54pub struct HostBridge {
55    next_id: AtomicU64,
56    /// Pending request waiters, keyed by JSON-RPC id.
57    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
58    /// Whether the host has sent a cancel notification.
59    cancelled: Arc<AtomicBool>,
60    /// Wakes pending host calls when cancellation arrives.
61    cancel_notify: Arc<Notify>,
62    /// Transport writer used to send JSON-RPC to the host.
63    writer: HostBridgeWriter,
64    /// ACP session ID (set in ACP mode for session-scoped notifications).
65    session_id: std::sync::Mutex<String>,
66    /// Name of the currently executing Harn script (without .harn suffix).
67    script_name: std::sync::Mutex<String>,
68    /// Transcript injections queued by the host while a run is active.
69    queued_transcript_injections: HostBridgeInjectionState,
70    /// Host-triggered resume signal for daemon agents.
71    resume_requested: Arc<AtomicBool>,
72    /// Host-triggered skill-registry invalidation signal. Set when the
73    /// host sends a `skills/update` notification; consumed by the CLI
74    /// between runs (watch mode, long-running agents) to rebuild the
75    /// layered skill catalog from its current filesystem + host state.
76    skills_reload_requested: Arc<AtomicBool>,
77    /// Whether the current daemon-mode agent loop is blocked in idle wait.
78    daemon_idle: Arc<AtomicBool>,
79    /// Canonical ACP `stopReason` recorded by the most recent `agent_loop`
80    /// finalize during this prompt. Read once by the ACP adapter when the
81    /// pipeline returns and populated by `host_agent_session_finalize`.
82    /// Pipelines that don't run an agent loop leave this `None`, in which
83    /// case the adapter falls back to `end_turn`.
84    prompt_stop_reason: std::sync::Mutex<Option<String>>,
85    /// Per-call visible assistant text state for call_progress notifications.
86    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
87    /// Whether an LLM call's deltas should be exposed to end users while streaming.
88    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
89    /// Optional in-process host-module backend used by `harn playground`.
90    in_process: Option<InProcessHost>,
91}
92
93struct InProcessHost {
94    module_path: PathBuf,
95    exported_functions: BTreeMap<String, Arc<VmClosure>>,
96    vm: Vm,
97}
98
99impl InProcessHost {
100    /// Box-pin'd to break the static recursion between the VM's hot dispatch
101    /// loop and the bridge: a bridge-backed builtin spawns a child VM that
102    /// calls back into the dispatch loop via `call_closure_pub`. Indirecting
103    /// at this slow-path boundary keeps the recursion satisfied without
104    /// allocating per call in the hot per-callback path.
105    fn dispatch<'a>(
106        &'a self,
107        method: &'a str,
108        params: serde_json::Value,
109    ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, VmError>> + Send + 'a>> {
110        Box::pin(async move {
111            match method {
112                "builtin_call" => {
113                    let name = params
114                        .get("name")
115                        .and_then(|value| value.as_str())
116                        .unwrap_or_default();
117                    let args = params
118                        .get("args")
119                        .and_then(|value| value.as_array())
120                        .cloned()
121                        .unwrap_or_default()
122                        .into_iter()
123                        .map(|value| json_result_to_vm_value(&value))
124                        .collect::<Vec<_>>();
125                    self.invoke_export(name, &args).await
126                }
127                "host/tools/list" => self
128                    .invoke_optional_export("host_tools_list", &[])
129                    .await
130                    .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
131                "session/request_permission" => self.request_permission(params).await,
132                other => Err(VmError::Runtime(format!(
133                    "playground host backend does not implement bridge method '{other}'"
134                ))),
135            }
136        })
137    }
138
139    async fn invoke_export(
140        &self,
141        name: &str,
142        args: &[VmValue],
143    ) -> Result<serde_json::Value, VmError> {
144        let Some(closure) = self.exported_functions.get(name) else {
145            return Err(VmError::Runtime(format!(
146                "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
147                self.module_path.display()
148            )));
149        };
150
151        let mut vm = self.vm.child_vm_for_host();
152        let result = vm.call_closure_pub(closure, args).await?;
153        Ok(crate::llm::vm_value_to_json(&result))
154    }
155
156    async fn invoke_optional_export(
157        &self,
158        name: &str,
159        args: &[VmValue],
160    ) -> Result<Option<serde_json::Value>, VmError> {
161        if !self.exported_functions.contains_key(name) {
162            return Ok(None);
163        }
164        self.invoke_export(name, args).await.map(Some)
165    }
166
167    async fn request_permission(
168        &self,
169        params: serde_json::Value,
170    ) -> Result<serde_json::Value, VmError> {
171        // No exported `request_permission` means the playground host has no
172        // approval policy, so it grants through the canonical ACP option.
173        let Some(closure) = self.exported_functions.get("request_permission") else {
174            return Ok(crate::llm::acp_permission::allow_response());
175        };
176
177        let tool_call = params.get("toolCall");
178        let tool_name = tool_call
179            .and_then(|tool_call| tool_call.pointer("/_meta/harn/toolName"))
180            .or_else(|| tool_call.and_then(|tool_call| tool_call.get("toolName")))
181            .or_else(|| tool_call.and_then(|tool_call| tool_call.get("title")))
182            .and_then(|value| value.as_str())
183            .unwrap_or_default();
184        let tool_args = tool_call
185            .and_then(|tool_call| tool_call.get("rawInput"))
186            .map(json_result_to_vm_value)
187            .unwrap_or(VmValue::Nil);
188        let full_payload = json_result_to_vm_value(&params);
189
190        let arg_count = closure.func.params.len();
191        let args = if arg_count >= 3 {
192            vec![
193                VmValue::String(arcstr::ArcStr::from(tool_name.to_string())),
194                tool_args,
195                full_payload,
196            ]
197        } else if arg_count == 2 {
198            vec![
199                VmValue::String(arcstr::ArcStr::from(tool_name.to_string())),
200                tool_args,
201            ]
202        } else if arg_count == 1 {
203            vec![full_payload]
204        } else {
205            Vec::new()
206        };
207
208        let mut vm = self.vm.child_vm_for_host();
209        let result = vm.call_closure_pub(closure, &args).await?;
210        // Translate the script's verdict into a canonical ACP response
211        // (`{ outcome: { outcome: "selected" | "cancelled", optionId? } }`).
212        // The script API stays ergonomic — bool / string-reason / dict — but
213        // the wire shape is canonical.
214        let payload = match result {
215            VmValue::Bool(granted) => {
216                if granted {
217                    crate::llm::acp_permission::allow_response()
218                } else {
219                    crate::llm::acp_permission::reject_response(None)
220                }
221            }
222            VmValue::String(reason) if !reason.is_empty() => {
223                crate::llm::acp_permission::reject_response(Some(reason.to_string()))
224            }
225            other => {
226                let json = crate::llm::vm_value_to_json(&other);
227                if let Some(granted) = json.get("granted").and_then(|value| value.as_bool()) {
228                    if granted {
229                        crate::llm::acp_permission::allow_response()
230                    } else {
231                        crate::llm::acp_permission::reject_response(
232                            json.get("reason")
233                                .and_then(|value| value.as_str())
234                                .map(str::to_string),
235                        )
236                    }
237                } else if json.get("outcome").is_some() {
238                    // The script already returned a canonical-shaped outcome.
239                    json
240                } else if other.is_truthy() {
241                    crate::llm::acp_permission::allow_response()
242                } else {
243                    crate::llm::acp_permission::reject_response(None)
244                }
245            }
246        };
247        Ok(payload)
248    }
249}
250
/// Delivery policy for a bridge injection queued into the agent loop.
///
/// `AuditOnly` injections drain at `loop_exit`, *after* the last LLM call has
/// returned. They are recorded in the transcript audit but are **never
/// rendered into a model prompt**.
/// A host that wants the model to react to a reminder on its final iteration
/// should pick `FinishStep`, which drains at every `iteration_start` /
/// `post_tool_dispatch` / `iteration_end` checkpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
    InterruptImmediate,
    FinishStep,
    AuditOnly,
}
265
/// Named checkpoints at which queued input may be delivered.
///
/// NOTE(review): nothing in the visible part of this file uses this enum; how
/// it maps onto [`QueuedUserMessageMode`] is presumably defined by its
/// callers — confirm there.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
    InterruptImmediate,
    AfterCurrentOperation,
    EndOfInteraction,
}
272
273impl QueuedUserMessageMode {
274    fn from_str(value: &str) -> Self {
275        match value {
276            "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
277            // `steer` is the ACP `session/inject` alias for mid-turn
278            // user-message delivery at the next tool boundary; it maps to
279            // the same `FinishStep` checkpoint as `finish_step`.
280            "finish_step" | "after_current_operation" | "steer" => Self::FinishStep,
281            // `queue` is the explicit ACP alias for the audit-only path.
282            "queue" => Self::AuditOnly,
283            // Unknown / missing modes fall through to the safest option:
284            // record for audit, do not preempt the loop. Pre-#2212 hosts
285            // that send `wait_for_completion` are caught by this arm —
286            // the canonical name is `audit_only` going forward.
287            _ => Self::AuditOnly,
288        }
289    }
290
291    fn as_str(self) -> &'static str {
292        match self {
293            Self::InterruptImmediate => "interrupt_immediate",
294            Self::FinishStep => "finish_step",
295            Self::AuditOnly => "audit_only",
296        }
297    }
298}
299
300#[derive(Clone, Debug, PartialEq, Eq)]
301pub struct QueuedUserMessage {
302    pub message_id: String,
303    pub content: String,
304    pub transcript_content: serde_json::Value,
305    pub mode: QueuedUserMessageMode,
306}
307
308#[derive(Clone, Debug, PartialEq, Eq)]
309pub struct QueuedReminder {
310    pub reminder: crate::llm::helpers::SystemReminder,
311    pub mode: QueuedUserMessageMode,
312}
313
314#[derive(Clone, Debug, PartialEq, Eq)]
315pub enum QueuedTranscriptInjection {
316    User(QueuedUserMessage),
317    Reminder(QueuedReminder),
318}
319
320#[derive(Debug, Default)]
321struct QueuedTranscriptInjections {
322    queue: VecDeque<QueuedTranscriptInjection>,
323    revoked_user_message_ids: HashSet<String>,
324    delivered_user_message_ids: HashSet<String>,
325    revoked_reminder_ids: HashSet<String>,
326    delivered_reminder_ids: HashSet<String>,
327}
328
329#[derive(Clone, Debug, Default)]
330pub struct HostBridgeInjectionState {
331    inner: Arc<Mutex<QueuedTranscriptInjections>>,
332}
333
/// Outcome of trying to revoke or replace a queued user message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PendingUserMessageMutationResult {
    /// The message was still queued and the change was applied.
    Mutated,
    /// The id belongs to a message that had already been revoked.
    AlreadyRevoked,
    /// The id belongs to a message that had already been delivered.
    AlreadyDelivered,
    /// The id is not queued, revoked, or delivered.
    UnknownMessageId,
}
341
342impl QueuedTranscriptInjection {
343    fn mode(&self) -> QueuedUserMessageMode {
344        match self {
345            Self::User(message) => message.mode,
346            Self::Reminder(reminder) => reminder.mode,
347        }
348    }
349
350    fn pending_json(&self, position: usize) -> serde_json::Value {
351        match self {
352            Self::User(message) => serde_json::json!({
353                "kind": "user",
354                "id": message.message_id,
355                "messageId": message.message_id,
356                "mode": message.mode.as_str(),
357                "position": position,
358                "content": message.transcript_content,
359            }),
360            Self::Reminder(reminder) => serde_json::json!({
361                "kind": "reminder",
362                "id": reminder.reminder.id,
363                "reminderId": reminder.reminder.id,
364                "mode": reminder.mode.as_str(),
365                "position": position,
366                "body": reminder.reminder.body,
367                "tags": reminder.reminder.tags,
368                "dedupeKey": reminder.reminder.dedupe_key,
369                "ttlTurns": reminder.reminder.ttl_turns,
370                "preserveOnCompact": reminder.reminder.preserve_on_compact,
371                "propagate": reminder.reminder.propagate.as_str(),
372                "roleHint": reminder.reminder.role_hint.as_str(),
373                "source": reminder.reminder.source.as_str(),
374                "firedAtTurn": reminder.reminder.fired_at_turn,
375                "originatingAgentId": reminder.reminder.originating_agent_id,
376            }),
377        }
378    }
379}
380
/// Outcome of trying to revoke a queued reminder.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PendingReminderMutationResult {
    /// The reminder was still queued and has been removed.
    Mutated,
    /// The id belongs to a reminder that had already been revoked.
    AlreadyRevoked,
    /// The id belongs to a reminder that had already been delivered.
    AlreadyDelivered,
    /// The id is not queued, revoked, or delivered.
    UnknownReminderId,
}
388
389fn new_inject_message_id() -> String {
390    format!("msg_inj_{}", uuid::Uuid::now_v7().simple())
391}
392
393impl HostBridgeInjectionState {
394    pub fn new() -> Self {
395        Self::default()
396    }
397
398    pub async fn push_pending_user_message(
399        &self,
400        content: String,
401        transcript_content: serde_json::Value,
402        mode: &str,
403    ) -> String {
404        let message_id = new_inject_message_id();
405        self.inner
406            .lock()
407            .await
408            .queue
409            .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
410                message_id: message_id.clone(),
411                content,
412                transcript_content,
413                mode: QueuedUserMessageMode::from_str(mode),
414            }));
415        message_id
416    }
417
418    pub async fn revoke_pending_user_message(
419        &self,
420        message_id: &str,
421    ) -> PendingUserMessageMutationResult {
422        let mut state = self.inner.lock().await;
423        let mut retained = VecDeque::new();
424        let mut revoked = false;
425        while let Some(injection) = state.queue.pop_front() {
426            match &injection {
427                QueuedTranscriptInjection::User(message) if message.message_id == message_id => {
428                    revoked = true;
429                }
430                _ => retained.push_back(injection),
431            }
432        }
433        state.queue = retained;
434        if revoked {
435            state
436                .revoked_user_message_ids
437                .insert(message_id.to_string());
438            return PendingUserMessageMutationResult::Mutated;
439        }
440        if state.revoked_user_message_ids.contains(message_id) {
441            PendingUserMessageMutationResult::AlreadyRevoked
442        } else if state.delivered_user_message_ids.contains(message_id) {
443            PendingUserMessageMutationResult::AlreadyDelivered
444        } else {
445            PendingUserMessageMutationResult::UnknownMessageId
446        }
447    }
448
449    pub async fn revoke_pending_reminder(
450        &self,
451        reminder_id: &str,
452    ) -> PendingReminderMutationResult {
453        let mut state = self.inner.lock().await;
454        let mut retained = VecDeque::new();
455        let mut revoked = false;
456        while let Some(injection) = state.queue.pop_front() {
457            match &injection {
458                QueuedTranscriptInjection::Reminder(reminder)
459                    if reminder.reminder.id == reminder_id =>
460                {
461                    revoked = true;
462                }
463                _ => retained.push_back(injection),
464            }
465        }
466        state.queue = retained;
467        if revoked {
468            state.revoked_reminder_ids.insert(reminder_id.to_string());
469            return PendingReminderMutationResult::Mutated;
470        }
471        if state.revoked_reminder_ids.contains(reminder_id) {
472            PendingReminderMutationResult::AlreadyRevoked
473        } else if state.delivered_reminder_ids.contains(reminder_id) {
474            PendingReminderMutationResult::AlreadyDelivered
475        } else {
476            PendingReminderMutationResult::UnknownReminderId
477        }
478    }
479
480    pub async fn replace_pending_user_message(
481        &self,
482        message_id: &str,
483        content: String,
484        transcript_content: serde_json::Value,
485    ) -> PendingUserMessageMutationResult {
486        let mut state = self.inner.lock().await;
487        for injection in &mut state.queue {
488            if let QueuedTranscriptInjection::User(message) = injection {
489                if message.message_id == message_id {
490                    message.content = content;
491                    message.transcript_content = transcript_content;
492                    return PendingUserMessageMutationResult::Mutated;
493                }
494            }
495        }
496        if state.revoked_user_message_ids.contains(message_id) {
497            PendingUserMessageMutationResult::AlreadyRevoked
498        } else if state.delivered_user_message_ids.contains(message_id) {
499            PendingUserMessageMutationResult::AlreadyDelivered
500        } else {
501            PendingUserMessageMutationResult::UnknownMessageId
502        }
503    }
504
505    async fn push_session_reminder(&self, reminder: QueuedReminder) {
506        self.inner
507            .lock()
508            .await
509            .queue
510            .push_back(QueuedTranscriptInjection::Reminder(reminder));
511    }
512
513    pub async fn pending_injections_json(&self) -> serde_json::Value {
514        let state = self.inner.lock().await;
515        let injections = state
516            .queue
517            .iter()
518            .enumerate()
519            .map(|(position, injection)| injection.pending_json(position))
520            .collect::<Vec<_>>();
521        serde_json::json!({
522            "pendingCount": injections.len(),
523            "injections": injections,
524        })
525    }
526}
527
528fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
529    format!(
530        "{}: {}",
531        Code::ReminderUnknownOption.as_str(),
532        message.as_ref()
533    )
534}
535
536fn session_remind_shape_error(message: impl AsRef<str>) -> String {
537    format!(
538        "{}: {}",
539        Code::ReminderInvalidShape.as_str(),
540        message.as_ref()
541    )
542}
543
544fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
545    format!(
546        "{}: {}",
547        Code::ReminderUnknownPropagate.as_str(),
548        message.as_ref()
549    )
550}
551
552fn string_field(
553    map: &serde_json::Map<String, serde_json::Value>,
554    key: &str,
555    required: bool,
556) -> Result<Option<String>, String> {
557    match map.get(key) {
558        None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
559            format!("`{key}` must be a non-empty string"),
560        )),
561        None | Some(serde_json::Value::Null) => Ok(None),
562        Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
563            session_remind_shape_error(format!("`{key}` must be a non-empty string")),
564        ),
565        Some(serde_json::Value::String(value)) => {
566            let trimmed = value.trim();
567            if trimmed.is_empty() {
568                Ok(None)
569            } else {
570                Ok(Some(trimmed.to_string()))
571            }
572        }
573        Some(other) => Err(session_remind_shape_error(format!(
574            "`{key}` must be a string, got {other}"
575        ))),
576    }
577}
578
579fn bool_field(
580    map: &serde_json::Map<String, serde_json::Value>,
581    key: &str,
582) -> Result<Option<bool>, String> {
583    match map.get(key) {
584        None | Some(serde_json::Value::Null) => Ok(None),
585        Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
586        Some(other) => Err(session_remind_shape_error(format!(
587            "`{key}` must be a bool, got {other}"
588        ))),
589    }
590}
591
592fn int_field(
593    map: &serde_json::Map<String, serde_json::Value>,
594    key: &str,
595) -> Result<Option<i64>, String> {
596    match map.get(key) {
597        None | Some(serde_json::Value::Null) => Ok(None),
598        Some(serde_json::Value::Number(value)) => {
599            let Some(value) = value.as_i64() else {
600                return Err(session_remind_shape_error(format!(
601                    "`{key}` must be an integer"
602                )));
603            };
604            Ok(Some(value))
605        }
606        Some(other) => Err(session_remind_shape_error(format!(
607            "`{key}` must be an int, got {other}"
608        ))),
609    }
610}
611
612fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
613    let Some(value) = map.get("tags") else {
614        return Ok(Vec::new());
615    };
616    if value.is_null() {
617        return Ok(Vec::new());
618    }
619    let Some(values) = value.as_array() else {
620        return Err(session_remind_shape_error("`tags` must be a list"));
621    };
622    let mut tags = Vec::new();
623    for value in values {
624        let Some(tag) = value.as_str() else {
625            return Err(session_remind_shape_error(format!(
626                "`tags` entries must be strings, got {value}"
627            )));
628        };
629        let tag = tag.trim();
630        if tag.is_empty() {
631            return Err(session_remind_shape_error(
632                "`tags` entries must be non-empty strings",
633            ));
634        }
635        if !tags.iter().any(|existing| existing == tag) {
636            tags.push(tag.to_string());
637        }
638    }
639    Ok(tags)
640}
641
642fn session_remind_payload_from_value(
643    value: &serde_json::Value,
644) -> Result<crate::llm::helpers::SystemReminder, String> {
645    let Some(map) = value.as_object() else {
646        return Err(session_remind_shape_error(
647            "session/remind payload must be a reminder object",
648        ));
649    };
650    const ALLOWED: &[&str] = &[
651        "_meta",
652        "body",
653        "dedupe_key",
654        "fired_at_turn",
655        "id",
656        "preserve_on_compact",
657        "propagate",
658        "role_hint",
659        "source",
660        "tags",
661        "ttl_turns",
662    ];
663    let unknown = map
664        .keys()
665        .filter(|key| !ALLOWED.contains(&key.as_str()))
666        .map(String::as_str)
667        .collect::<Vec<_>>();
668    if !unknown.is_empty() {
669        if unknown.contains(&"content") {
670            return Err(session_remind_shape_error(
671                "session/remind expects reminder `body`, not user-message `content`",
672            ));
673        }
674        return Err(reminder_unknown_option_error(format!(
675            "unknown reminder option(s): {}",
676            unknown.join(", ")
677        )));
678    }
679    if let Some(meta) = map.get("_meta") {
680        if !meta.is_null() && !meta.is_object() {
681            return Err(session_remind_shape_error("`_meta` must be an object"));
682        }
683    }
684    let ttl_turns = int_field(map, "ttl_turns")?;
685    if let Some(value) = ttl_turns {
686        if value <= 0 {
687            return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
688        }
689    }
690    let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
691    if fired_at_turn < 0 {
692        return Err(session_remind_shape_error(
693            "`fired_at_turn` must be >= 0 when provided",
694        ));
695    }
696    match string_field(map, "source", false)?.as_deref() {
697        None | Some("bridge") => {}
698        Some(_) => {
699            return Err(session_remind_shape_error(
700                "`source` for session/remind must be bridge when provided",
701            ))
702        }
703    }
704    let propagate = match string_field(map, "propagate", false)?.as_deref() {
705        None => crate::llm::helpers::ReminderPropagate::Session,
706        Some("all") => crate::llm::helpers::ReminderPropagate::All,
707        Some("session") => crate::llm::helpers::ReminderPropagate::Session,
708        Some("none") => crate::llm::helpers::ReminderPropagate::None,
709        Some(_) => {
710            return Err(reminder_unknown_propagate_error(
711                "`propagate` must be one of all, session, or none",
712            ))
713        }
714    };
715    let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
716        None => crate::llm::helpers::ReminderRoleHint::System,
717        Some("system") => crate::llm::helpers::ReminderRoleHint::System,
718        Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
719        Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
720        Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
721        Some(_) => {
722            return Err(session_remind_shape_error(
723                "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
724            ))
725        }
726    };
727    Ok(crate::llm::helpers::SystemReminder {
728        id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
729        tags: tags_field(map)?,
730        dedupe_key: string_field(map, "dedupe_key", false)?,
731        ttl_turns,
732        preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
733        propagate,
734        role_hint,
735        source: crate::llm::helpers::ReminderSource::Bridge,
736        body: string_field(map, "body", true)?.unwrap_or_default(),
737        fired_at_turn,
738        originating_agent_id: None,
739    })
740}
741
742/// Parse the params of a `session/cancel_tool_call` notification and fire
743/// the per-tool-call cancellation. Mirrors the shape used by the public
744/// `cancel_in_flight_tool_call` builtin so hosts have one wire format
745/// regardless of which surface they came through.
746///
747/// Stdio bridges send this as a notification (no id, no response); the
748/// builtin handles request/response semantics in Harn. We deliberately
749/// drop malformed payloads silently because notifications can't reply
750/// with an error — logging would also be too noisy for partial drops.
751fn handle_cancel_tool_call_notification(params: &serde_json::Value) {
752    let session_id = params
753        .get("sessionId")
754        .or_else(|| params.get("session_id"))
755        .and_then(|value| value.as_str())
756        .unwrap_or_default();
757    let call_id = params
758        .get("toolCallId")
759        .or_else(|| params.get("tool_call_id"))
760        .or_else(|| params.get("callId"))
761        .or_else(|| params.get("call_id"))
762        .and_then(|value| value.as_str())
763        .unwrap_or_default();
764    if call_id.is_empty() {
765        return;
766    }
767    let reason = params
768        .get("reason")
769        .and_then(|value| value.as_str())
770        .unwrap_or("host cancelled in-flight tool call")
771        .to_string();
772    let inject_reminder = params
773        .get("injectReminder")
774        .or_else(|| params.get("inject_reminder"))
775        .and_then(|value| value.as_bool())
776        .unwrap_or(true);
777    crate::tool_call_cancellations::cancel(session_id, call_id, reason, inject_reminder);
778}
779
780fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
781    let mode = QueuedUserMessageMode::from_str(
782        params
783            .get("mode")
784            .and_then(|value| value.as_str())
785            .unwrap_or("audit_only"),
786    );
787    let reminder_value = if let Some(reminder) = params.get("reminder") {
788        reminder.clone()
789    } else {
790        let Some(params) = params.as_object() else {
791            return Err(session_remind_shape_error(
792                "session/remind params must be an object",
793            ));
794        };
795        let mut reminder = params.clone();
796        reminder.remove("mode");
797        reminder.remove("sessionId");
798        reminder.remove("session_id");
799        serde_json::Value::Object(reminder)
800    };
801    Ok(QueuedReminder {
802        reminder: session_remind_payload_from_value(&reminder_value)?,
803        mode,
804    })
805}
806
807// Default doesn't apply — new() spawns async tasks requiring a tokio LocalSet.
808#[allow(clippy::new_without_default)]
809impl HostBridge {
810    /// Create a new bridge and spawn the stdin reader task.
811    ///
812    /// Must be called within a tokio LocalSet (uses spawn_local for the
813    /// stdin reader since it's single-threaded).
814    pub fn new() -> Self {
815        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
816            Arc::new(Mutex::new(HashMap::new()));
817        let cancelled = Arc::new(AtomicBool::new(false));
818        let cancel_notify = Arc::new(Notify::new());
819        let queued_transcript_injections = HostBridgeInjectionState::default();
820        let resume_requested = Arc::new(AtomicBool::new(false));
821        let skills_reload_requested = Arc::new(AtomicBool::new(false));
822        let daemon_idle = Arc::new(AtomicBool::new(false));
823
824        // Stdin reader: reads JSON-RPC lines and dispatches responses
825        let pending_clone = pending.clone();
826        let cancelled_clone = cancelled.clone();
827        let cancel_notify_clone = cancel_notify.clone();
828        let queued_clone = queued_transcript_injections.clone();
829        let resume_clone = resume_requested.clone();
830        let skills_reload_clone = skills_reload_requested.clone();
831        tokio::task::spawn_local(async move {
832            let stdin = tokio::io::stdin();
833            let reader = tokio::io::BufReader::new(stdin);
834            let mut lines = reader.lines();
835
836            while let Ok(Some(line)) = lines.next_line().await {
837                let line = line.trim().to_string();
838                if line.is_empty() {
839                    continue;
840                }
841
842                let msg: serde_json::Value = match serde_json::from_str(&line) {
843                    Ok(v) => v,
844                    Err(_) => continue,
845                };
846
847                // Notifications have no id; responses have one.
848                if msg.get("id").is_none() {
849                    if let Some(method) = msg["method"].as_str() {
850                        if method == "cancel" {
851                            cancelled_clone.store(true, Ordering::SeqCst);
852                            cancel_notify_clone.notify_waiters();
853                        } else if method == "agent/resume" {
854                            resume_clone.store(true, Ordering::SeqCst);
855                        } else if method == "skills/update" {
856                            skills_reload_clone.store(true, Ordering::SeqCst);
857                        } else if method == "session/remind" {
858                            let params = &msg["params"];
859                            if let Ok(reminder) = queued_session_remind_from_params(params) {
860                                queued_clone.push_session_reminder(reminder).await;
861                            }
862                        } else if method == "session/cancel_tool_call" {
863                            handle_cancel_tool_call_notification(&msg["params"]);
864                        }
865                    }
866                    continue;
867                }
868
869                if let Some(id) = msg["id"].as_u64() {
870                    let mut pending = pending_clone.lock().await;
871                    if let Some(sender) = pending.remove(&id) {
872                        let _ = sender.send(msg);
873                    }
874                }
875            }
876
877            // stdin closed: drop pending senders to cancel waiters.
878            let mut pending = pending_clone.lock().await;
879            pending.clear();
880        });
881
882        Self {
883            next_id: AtomicU64::new(1),
884            pending,
885            cancelled,
886            cancel_notify,
887            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
888            session_id: std::sync::Mutex::new(String::new()),
889            script_name: std::sync::Mutex::new(String::new()),
890            queued_transcript_injections,
891            resume_requested,
892            skills_reload_requested,
893            daemon_idle,
894            prompt_stop_reason: std::sync::Mutex::new(None),
895            visible_call_states: std::sync::Mutex::new(HashMap::new()),
896            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
897            in_process: None,
898        }
899    }
900
901    /// Create a bridge from pre-existing shared state.
902    ///
903    /// Unlike `new()`, does **not** spawn a stdin reader — the caller is
904    /// responsible for dispatching responses into `pending`.  This is used
905    /// by ACP mode which already has its own stdin reader.
906    pub fn from_parts(
907        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
908        cancelled: Arc<AtomicBool>,
909        stdout_lock: Arc<std::sync::Mutex<()>>,
910        start_id: u64,
911    ) -> Self {
912        Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
913    }
914
915    pub fn from_parts_with_writer(
916        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
917        cancelled: Arc<AtomicBool>,
918        writer: HostBridgeWriter,
919        start_id: u64,
920    ) -> Self {
921        Self::from_parts_with_writer_and_cancel_notify(
922            pending,
923            cancelled,
924            Arc::new(Notify::new()),
925            writer,
926            start_id,
927        )
928    }
929
930    pub fn from_parts_with_writer_and_cancel_notify(
931        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
932        cancelled: Arc<AtomicBool>,
933        cancel_notify: Arc<Notify>,
934        writer: HostBridgeWriter,
935        start_id: u64,
936    ) -> Self {
937        Self::from_parts_with_writer_cancel_notify_and_injection_state(
938            pending,
939            cancelled,
940            cancel_notify,
941            writer,
942            start_id,
943            None,
944        )
945    }
946
947    pub fn from_parts_with_writer_cancel_notify_and_injection_state(
948        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
949        cancelled: Arc<AtomicBool>,
950        cancel_notify: Arc<Notify>,
951        writer: HostBridgeWriter,
952        start_id: u64,
953        injection_state: Option<HostBridgeInjectionState>,
954    ) -> Self {
955        Self {
956            next_id: AtomicU64::new(start_id),
957            pending,
958            cancelled,
959            cancel_notify,
960            writer,
961            session_id: std::sync::Mutex::new(String::new()),
962            script_name: std::sync::Mutex::new(String::new()),
963            queued_transcript_injections: injection_state.unwrap_or_default(),
964            resume_requested: Arc::new(AtomicBool::new(false)),
965            skills_reload_requested: Arc::new(AtomicBool::new(false)),
966            daemon_idle: Arc::new(AtomicBool::new(false)),
967            prompt_stop_reason: std::sync::Mutex::new(None),
968            visible_call_states: std::sync::Mutex::new(HashMap::new()),
969            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
970            in_process: None,
971        }
972    }
973
974    /// Create an in-process host bridge backed by exported functions from a
975    /// Harn module. Used by `harn playground` to avoid JSON-RPC boilerplate.
976    pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
977        let exported_functions = vm.load_module_exports(module_path).await?;
978        Ok(Self {
979            next_id: AtomicU64::new(1),
980            pending: Arc::new(Mutex::new(HashMap::new())),
981            cancelled: Arc::new(AtomicBool::new(false)),
982            cancel_notify: Arc::new(Notify::new()),
983            writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
984            session_id: std::sync::Mutex::new(String::new()),
985            script_name: std::sync::Mutex::new(String::new()),
986            queued_transcript_injections: HostBridgeInjectionState::default(),
987            resume_requested: Arc::new(AtomicBool::new(false)),
988            skills_reload_requested: Arc::new(AtomicBool::new(false)),
989            daemon_idle: Arc::new(AtomicBool::new(false)),
990            prompt_stop_reason: std::sync::Mutex::new(None),
991            visible_call_states: std::sync::Mutex::new(HashMap::new()),
992            visible_call_streams: std::sync::Mutex::new(HashMap::new()),
993            in_process: Some(InProcessHost {
994                module_path: module_path.to_path_buf(),
995                exported_functions,
996                vm,
997            }),
998        })
999    }
1000
1001    /// Set the ACP session ID for session-scoped notifications.
1002    pub fn set_session_id(&self, id: &str) {
1003        *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
1004    }
1005
1006    /// Set the currently executing script name (without .harn suffix).
1007    pub fn set_script_name(&self, name: &str) {
1008        *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
1009    }
1010
1011    /// Get the current script name.
1012    fn get_script_name(&self) -> String {
1013        self.script_name
1014            .lock()
1015            .unwrap_or_else(|e| e.into_inner())
1016            .clone()
1017    }
1018
1019    /// Get the session ID.
1020    pub fn get_session_id(&self) -> String {
1021        self.session_id
1022            .lock()
1023            .unwrap_or_else(|e| e.into_inner())
1024            .clone()
1025    }
1026
1027    /// Write a complete JSON-RPC line to stdout, serialized through a mutex.
1028    fn write_line(&self, line: &str) -> Result<(), VmError> {
1029        (self.writer)(line).map_err(VmError::Runtime)
1030    }
1031
1032    /// Send a JSON-RPC request to the host and wait for the response.
1033    /// Times out after 5 minutes to prevent deadlocks.
1034    pub async fn call(
1035        &self,
1036        method: &str,
1037        params: serde_json::Value,
1038    ) -> Result<serde_json::Value, VmError> {
1039        if let Some(in_process) = &self.in_process {
1040            return in_process.dispatch(method, params).await;
1041        }
1042
1043        if self.is_cancelled() {
1044            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1045        }
1046
1047        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
1048        let cancel_wait = self.cancel_notify.notified();
1049        tokio::pin!(cancel_wait);
1050
1051        let request = crate::jsonrpc::request(id, method, params);
1052
1053        let (tx, rx) = oneshot::channel();
1054        {
1055            let mut pending = self.pending.lock().await;
1056            pending.insert(id, tx);
1057        }
1058
1059        let line = serde_json::to_string(&request)
1060            .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
1061        if let Err(e) = self.write_line(&line) {
1062            let mut pending = self.pending.lock().await;
1063            pending.remove(&id);
1064            return Err(e);
1065        }
1066
1067        if self.is_cancelled() {
1068            let mut pending = self.pending.lock().await;
1069            pending.remove(&id);
1070            return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1071        }
1072
1073        let response = tokio::select! {
1074            result = rx => match result {
1075                Ok(msg) => msg,
1076                Err(_) => {
1077                    // Sender dropped: host closed or stdin reader exited.
1078                    return Err(VmError::Runtime(
1079                        "Bridge: host closed connection before responding".into(),
1080                    ));
1081                }
1082            },
1083            _ = &mut cancel_wait => {
1084                let mut pending = self.pending.lock().await;
1085                pending.remove(&id);
1086                return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1087            }
1088            _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
1089                let mut pending = self.pending.lock().await;
1090                pending.remove(&id);
1091                return Err(VmError::Runtime(format!(
1092                    "Bridge: host did not respond to '{method}' within {}s",
1093                    DEFAULT_TIMEOUT.as_secs()
1094                )));
1095            }
1096        };
1097
1098        if let Some(error) = response.get("error") {
1099            let message = error["message"].as_str().unwrap_or("Unknown host error");
1100            let code = error["code"].as_i64().unwrap_or(-1);
1101            // JSON-RPC -32001 signals the host rejected the tool (not permitted / not in allowlist).
1102            if code == -32001 {
1103                return Err(VmError::CategorizedError {
1104                    message: message.to_string(),
1105                    category: ErrorCategory::ToolRejected,
1106                });
1107            }
1108            return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
1109        }
1110
1111        Ok(response["result"].clone())
1112    }
1113
1114    /// Send a JSON-RPC notification to the host (no response expected).
1115    /// Serialized through the stdout mutex to prevent interleaving.
1116    pub fn notify(&self, method: &str, params: serde_json::Value) {
1117        let notification = crate::jsonrpc::notification(method, params);
1118        if self.in_process.is_some() {
1119            return;
1120        }
1121        if let Ok(line) = serde_json::to_string(&notification) {
1122            let _ = self.write_line(&line);
1123        }
1124    }
1125
1126    /// Check if the host has sent a cancel notification.
1127    pub fn is_cancelled(&self) -> bool {
1128        self.cancelled.load(Ordering::SeqCst)
1129    }
1130
1131    pub fn take_resume_signal(&self) -> bool {
1132        self.resume_requested.swap(false, Ordering::SeqCst)
1133    }
1134
1135    pub fn signal_resume(&self) {
1136        self.resume_requested.store(true, Ordering::SeqCst);
1137    }
1138
1139    pub fn set_daemon_idle(&self, idle: bool) {
1140        self.daemon_idle.store(idle, Ordering::SeqCst);
1141    }
1142
1143    pub fn is_daemon_idle(&self) -> bool {
1144        self.daemon_idle.load(Ordering::SeqCst)
1145    }
1146
1147    /// Record the canonical ACP `stopReason` for the current prompt. The
1148    /// last writer wins, which matches the semantic that an outer
1149    /// `agent_loop` (the one whose result the user observes) always
1150    /// finalizes after any inner loops it spawned.
1151    pub fn set_prompt_stop_reason(&self, reason: &str) {
1152        *self
1153            .prompt_stop_reason
1154            .lock()
1155            .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
1156    }
1157
1158    /// Consume any prompt stop reason recorded during this prompt. The
1159    /// ACP adapter calls this once after the pipeline returns; pipelines
1160    /// that didn't run an `agent_loop` see `None` and the adapter falls
1161    /// back to `end_turn`.
1162    pub fn take_prompt_stop_reason(&self) -> Option<String> {
1163        self.prompt_stop_reason
1164            .lock()
1165            .unwrap_or_else(|e| e.into_inner())
1166            .take()
1167    }
1168
1169    /// Consume any pending `skills/update` signal the host has sent.
1170    /// Returns `true` exactly once per notification, letting callers
1171    /// trigger a layered-discovery rebuild without polling false
1172    /// positives. See issue #73 for the hot-reload contract.
1173    pub fn take_skills_reload_signal(&self) -> bool {
1174        self.skills_reload_requested.swap(false, Ordering::SeqCst)
1175    }
1176
1177    /// Manually mark the skill catalog as stale. Used by tests and by
1178    /// the CLI when an internal event (e.g. `harn install`) should
1179    /// trigger the same rebuild a `skills/update` notification would.
1180    pub fn signal_skills_reload(&self) {
1181        self.skills_reload_requested.store(true, Ordering::SeqCst);
1182    }
1183
1184    /// Call the host's `skills/list` RPC and return the raw JSON array
1185    /// it responded with. Shape:
1186    /// `[{ "id": "...", "name": "...", "description": "...", "source": "..." }, ...]`.
1187    /// The CLI adapter converts each entry into a
1188    /// [`crate::skills::SkillManifestRef`].
1189    pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
1190        let result = self.call("skills/list", serde_json::json!({})).await?;
1191        match result {
1192            serde_json::Value::Array(items) => Ok(items),
1193            serde_json::Value::Object(map) => match map.get("skills") {
1194                Some(serde_json::Value::Array(items)) => Ok(items.clone()),
1195                _ => Err(VmError::Runtime(
1196                    "skills/list: host response must be an array or { skills: [...] }".into(),
1197                )),
1198            },
1199            _ => Err(VmError::Runtime(
1200                "skills/list: unexpected response shape".into(),
1201            )),
1202        }
1203    }
1204
1205    /// Call the host's `host/tools/list` RPC and return normalized tool
1206    /// descriptors. Shape:
1207    /// `[{ "name": "...", "description": "...", "schema": {...}, "deprecated": false }, ...]`.
1208    /// The bridge also accepts `{ "tools": [...] }` and
1209    /// `{ "result": { "tools": [...] } }` wrappers for lenient hosts.
1210    pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
1211        let result = self.call("host/tools/list", serde_json::json!({})).await?;
1212        parse_host_tools_list_response(result)
1213    }
1214
1215    /// Call the host's `skills/fetch` RPC for one skill id. Returns the
1216    /// raw JSON body so the CLI can inspect both the frontmatter fields
1217    /// and the skill markdown body in whatever shape the host sends.
1218    pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
1219        self.call("skills/fetch", serde_json::json!({ "id": id }))
1220            .await
1221    }
1222
1223    pub fn injection_state(&self) -> HostBridgeInjectionState {
1224        self.queued_transcript_injections.clone()
1225    }
1226
1227    pub async fn push_pending_user_message(
1228        &self,
1229        content: String,
1230        transcript_content: serde_json::Value,
1231        mode: &str,
1232    ) -> String {
1233        self.queued_transcript_injections
1234            .push_pending_user_message(content, transcript_content, mode)
1235            .await
1236    }
1237
1238    pub async fn push_queued_user_message(&self, content: String, mode: &str) -> String {
1239        self.push_pending_user_message(content.clone(), serde_json::Value::String(content), mode)
1240            .await
1241    }
1242
1243    pub async fn revoke_pending_user_message(
1244        &self,
1245        message_id: &str,
1246    ) -> PendingUserMessageMutationResult {
1247        self.queued_transcript_injections
1248            .revoke_pending_user_message(message_id)
1249            .await
1250    }
1251
1252    pub async fn revoke_pending_reminder(
1253        &self,
1254        reminder_id: &str,
1255    ) -> PendingReminderMutationResult {
1256        self.queued_transcript_injections
1257            .revoke_pending_reminder(reminder_id)
1258            .await
1259    }
1260
1261    pub async fn pending_injections_json(&self) -> serde_json::Value {
1262        self.queued_transcript_injections
1263            .pending_injections_json()
1264            .await
1265    }
1266
1267    pub async fn replace_pending_user_message(
1268        &self,
1269        message_id: &str,
1270        content: String,
1271        transcript_content: serde_json::Value,
1272    ) -> PendingUserMessageMutationResult {
1273        self.queued_transcript_injections
1274            .replace_pending_user_message(message_id, content, transcript_content)
1275            .await
1276    }
1277
1278    pub async fn push_queued_session_remind_from_params(
1279        &self,
1280        params: &serde_json::Value,
1281    ) -> Result<String, String> {
1282        let reminder = queued_session_remind_from_params(params)?;
1283        let reminder_id = reminder.reminder.id.clone();
1284        self.queued_transcript_injections
1285            .push_session_reminder(reminder)
1286            .await;
1287        Ok(reminder_id)
1288    }
1289
1290    pub async fn take_queued_user_messages(
1291        &self,
1292        include_interrupt_immediate: bool,
1293        include_finish_step: bool,
1294        include_audit_only: bool,
1295    ) -> Vec<QueuedUserMessage> {
1296        let mut state = self.queued_transcript_injections.inner.lock().await;
1297        let mut selected = Vec::new();
1298        let mut retained = VecDeque::new();
1299        while let Some(injection) = state.queue.pop_front() {
1300            let should_take = match injection.mode() {
1301                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1302                QueuedUserMessageMode::FinishStep => include_finish_step,
1303                QueuedUserMessageMode::AuditOnly => include_audit_only,
1304            };
1305            match (should_take, injection) {
1306                (true, QueuedTranscriptInjection::User(message)) => {
1307                    state
1308                        .delivered_user_message_ids
1309                        .insert(message.message_id.clone());
1310                    selected.push(message);
1311                }
1312                (_, injection) => retained.push_back(injection),
1313            }
1314        }
1315        state.queue = retained;
1316        selected
1317    }
1318
1319    pub async fn take_queued_transcript_injections(
1320        &self,
1321        include_interrupt_immediate: bool,
1322        include_finish_step: bool,
1323        include_audit_only: bool,
1324    ) -> Vec<QueuedTranscriptInjection> {
1325        let mut state = self.queued_transcript_injections.inner.lock().await;
1326        let mut selected = Vec::new();
1327        let mut retained = VecDeque::new();
1328        while let Some(injection) = state.queue.pop_front() {
1329            let should_take = match injection.mode() {
1330                QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1331                QueuedUserMessageMode::FinishStep => include_finish_step,
1332                QueuedUserMessageMode::AuditOnly => include_audit_only,
1333            };
1334            if should_take {
1335                match &injection {
1336                    QueuedTranscriptInjection::User(message) => {
1337                        state
1338                            .delivered_user_message_ids
1339                            .insert(message.message_id.clone());
1340                    }
1341                    QueuedTranscriptInjection::Reminder(reminder) => {
1342                        state
1343                            .delivered_reminder_ids
1344                            .insert(reminder.reminder.id.clone());
1345                    }
1346                }
1347                selected.push(injection);
1348            } else {
1349                retained.push_back(injection);
1350            }
1351        }
1352        state.queue = retained;
1353        selected
1354    }
1355
1356    pub async fn take_queued_user_messages_for(
1357        &self,
1358        checkpoint: DeliveryCheckpoint,
1359    ) -> Vec<QueuedUserMessage> {
1360        match checkpoint {
1361            DeliveryCheckpoint::InterruptImmediate => {
1362                self.take_queued_user_messages(true, false, false).await
1363            }
1364            DeliveryCheckpoint::AfterCurrentOperation => {
1365                self.take_queued_user_messages(false, true, false).await
1366            }
1367            DeliveryCheckpoint::EndOfInteraction => {
1368                self.take_queued_user_messages(false, false, true).await
1369            }
1370        }
1371    }
1372
1373    pub async fn take_queued_transcript_injections_for(
1374        &self,
1375        checkpoint: DeliveryCheckpoint,
1376    ) -> Vec<QueuedTranscriptInjection> {
1377        match checkpoint {
1378            DeliveryCheckpoint::InterruptImmediate => {
1379                self.take_queued_transcript_injections(true, false, false)
1380                    .await
1381            }
1382            DeliveryCheckpoint::AfterCurrentOperation => {
1383                self.take_queued_transcript_injections(false, true, false)
1384                    .await
1385            }
1386            DeliveryCheckpoint::EndOfInteraction => {
1387                self.take_queued_transcript_injections(false, false, true)
1388                    .await
1389            }
1390        }
1391    }
1392
1393    /// Send an output notification (for log/print in bridge mode).
1394    pub fn send_output(&self, text: &str) {
1395        self.notify("output", serde_json::json!({"text": text}));
1396    }
1397
1398    /// Send a progress notification with optional numeric progress and structured data.
1399    pub fn send_progress(
1400        &self,
1401        phase: &str,
1402        message: &str,
1403        progress: Option<i64>,
1404        total: Option<i64>,
1405        data: Option<serde_json::Value>,
1406    ) {
1407        let mut payload = serde_json::json!({"phase": phase, "message": message});
1408        if let Some(p) = progress {
1409            payload["progress"] = serde_json::json!(p);
1410        }
1411        if let Some(t) = total {
1412            payload["total"] = serde_json::json!(t);
1413        }
1414        if let Some(d) = data {
1415            payload["data"] = d;
1416        }
1417        self.notify("progress", payload);
1418    }
1419
1420    /// Send a structured log notification.
1421    pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1422        let mut payload = serde_json::json!({"level": level, "message": message});
1423        if let Some(f) = fields {
1424            payload["fields"] = f;
1425        }
1426        self.notify("log", payload);
1427    }
1428
1429    /// Send a `session/update` with `call_start` — signals the beginning of
1430    /// an LLM call, tool call, or builtin call for observability.
1431    pub fn send_call_start(
1432        &self,
1433        call_id: &str,
1434        call_type: &str,
1435        name: &str,
1436        metadata: serde_json::Value,
1437    ) {
1438        let session_id = self.get_session_id();
1439        let script = self.get_script_name();
1440        let stream_publicly = metadata
1441            .get("stream_publicly")
1442            .and_then(|value| value.as_bool())
1443            .unwrap_or(true);
1444        self.visible_call_streams
1445            .lock()
1446            .unwrap_or_else(|e| e.into_inner())
1447            .insert(call_id.to_string(), stream_publicly);
1448        self.notify(
1449            "session/update",
1450            serde_json::json!({
1451                "sessionId": session_id,
1452                "update": {
1453                    "sessionUpdate": "call_start",
1454                    "content": {
1455                        "toolCallId": call_id,
1456                        "call_type": call_type,
1457                        "name": name,
1458                        "script": script,
1459                        "metadata": metadata,
1460                    },
1461                },
1462            }),
1463        );
1464    }
1465
1466    /// Send a `session/update` with `call_progress` — a streaming token delta
1467    /// from an in-flight LLM call.
1468    pub fn send_call_progress(
1469        &self,
1470        call_id: &str,
1471        delta: &str,
1472        accumulated_tokens: u64,
1473        user_visible: bool,
1474    ) {
1475        let session_id = self.get_session_id();
1476        let (visible_text, visible_delta) = {
1477            let stream_publicly = self
1478                .visible_call_streams
1479                .lock()
1480                .unwrap_or_else(|e| e.into_inner())
1481                .get(call_id)
1482                .copied()
1483                .unwrap_or(true);
1484            let mut states = self
1485                .visible_call_states
1486                .lock()
1487                .unwrap_or_else(|e| e.into_inner());
1488            let state = states.entry(call_id.to_string()).or_default();
1489            state.push(delta, stream_publicly)
1490        };
1491        self.notify(
1492            "session/update",
1493            serde_json::json!({
1494                "sessionId": session_id,
1495                "update": {
1496                    "sessionUpdate": "call_progress",
1497                    "content": {
1498                        "toolCallId": call_id,
1499                        "delta": delta,
1500                        "accumulated_tokens": accumulated_tokens,
1501                        "visible_text": visible_text,
1502                        "visible_delta": visible_delta,
1503                        "user_visible": user_visible,
1504                    },
1505                },
1506            }),
1507        );
1508    }
1509
1510    /// Send a `session/update` with `call_end` — signals completion of a call.
1511    pub fn send_call_end(
1512        &self,
1513        call_id: &str,
1514        call_type: &str,
1515        name: &str,
1516        duration_ms: u64,
1517        status: &str,
1518        metadata: serde_json::Value,
1519    ) {
1520        let session_id = self.get_session_id();
1521        let script = self.get_script_name();
1522        self.visible_call_states
1523            .lock()
1524            .unwrap_or_else(|e| e.into_inner())
1525            .remove(call_id);
1526        self.visible_call_streams
1527            .lock()
1528            .unwrap_or_else(|e| e.into_inner())
1529            .remove(call_id);
1530        self.notify(
1531            "session/update",
1532            serde_json::json!({
1533                "sessionId": session_id,
1534                "update": {
1535                    "sessionUpdate": "call_end",
1536                    "content": {
1537                        "toolCallId": call_id,
1538                        "call_type": call_type,
1539                        "name": name,
1540                        "script": script,
1541                        "duration_ms": duration_ms,
1542                        "status": status,
1543                        "metadata": metadata,
1544                    },
1545                },
1546            }),
1547        );
1548    }
1549
1550    /// Send a worker lifecycle update for delegated/background execution.
1551    pub fn send_worker_update(
1552        &self,
1553        worker_id: &str,
1554        worker_name: &str,
1555        status: &str,
1556        metadata: serde_json::Value,
1557        audit: Option<&MutationSessionRecord>,
1558    ) {
1559        let session_id = self.get_session_id();
1560        let script = self.get_script_name();
1561        let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1562        let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1563        let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1564        let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1565        let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1566        let lifecycle = serde_json::json!({
1567            "event": status,
1568            "worker_id": worker_id,
1569            "worker_name": worker_name,
1570            "started_at": started_at,
1571            "finished_at": finished_at,
1572        });
1573        self.notify(
1574            "session/update",
1575            serde_json::json!({
1576                "sessionId": session_id,
1577                "update": {
1578                    "sessionUpdate": "worker_update",
1579                    "content": {
1580                        "worker_id": worker_id,
1581                        "worker_name": worker_name,
1582                        "status": status,
1583                        "script": script,
1584                        "started_at": started_at,
1585                        "finished_at": finished_at,
1586                        "snapshot_path": snapshot_path,
1587                        "run_id": run_id,
1588                        "run_path": run_path,
1589                        "lifecycle": lifecycle,
1590                        "audit": audit,
1591                        "metadata": metadata,
1592                    },
1593                },
1594            }),
1595        );
1596    }
1597}
1598
1599/// Convert a serde_json::Value to a VmValue.
1600pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1601    crate::stdlib::json_to_vm_value(val)
1602}
1603
1604fn parse_host_tools_list_response(
1605    result: serde_json::Value,
1606) -> Result<Vec<serde_json::Value>, VmError> {
1607    let tools = match result {
1608        serde_json::Value::Array(items) => items,
1609        serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1610            map.get("result")
1611                .and_then(|value| value.get("tools"))
1612                .cloned()
1613        }) {
1614            Some(serde_json::Value::Array(items)) => items,
1615            _ => {
1616                return Err(VmError::Runtime(
1617                    "host/tools/list: host response must be an array or { tools: [...] }".into(),
1618                ));
1619            }
1620        },
1621        _ => {
1622            return Err(VmError::Runtime(
1623                "host/tools/list: unexpected response shape".into(),
1624            ));
1625        }
1626    };
1627
1628    let mut normalized = Vec::with_capacity(tools.len());
1629    for tool in tools {
1630        let serde_json::Value::Object(map) = tool else {
1631            return Err(VmError::Runtime(
1632                "host/tools/list: every tool must be an object".into(),
1633            ));
1634        };
1635        let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1636            return Err(VmError::Runtime(
1637                "host/tools/list: every tool must include a string `name`".into(),
1638            ));
1639        };
1640        let description = map
1641            .get("description")
1642            .and_then(|value| value.as_str())
1643            .or_else(|| {
1644                map.get("short_description")
1645                    .and_then(|value| value.as_str())
1646            })
1647            .unwrap_or_default();
1648        let schema = map
1649            .get("schema")
1650            .cloned()
1651            .or_else(|| map.get("parameters").cloned())
1652            .or_else(|| map.get("input_schema").cloned())
1653            .unwrap_or(serde_json::Value::Null);
1654        let deprecated = map
1655            .get("deprecated")
1656            .and_then(|value| value.as_bool())
1657            .unwrap_or(false);
1658        normalized.push(serde_json::json!({
1659            "name": name,
1660            "description": description,
1661            "schema": schema,
1662            "deprecated": deprecated,
1663        }));
1664    }
1665    Ok(normalized)
1666}
1667
#[cfg(test)]
mod tests {
    use super::*;

    /// Drive `future` to completion on a fresh current-thread Tokio runtime
    /// built with `enable_all()`.
    ///
    /// Shared by every async test in this module so the runtime setup lives
    /// in one place.
    fn run_async<F: std::future::Future>(future: F) -> F::Output {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to build current-thread test runtime")
            .block_on(future)
    }

    /// Build a standalone bridge with empty pending-call state, a cleared
    /// cancellation flag, and request IDs starting at 1.
    fn test_bridge() -> HostBridge {
        HostBridge::from_parts(
            Arc::new(Mutex::new(HashMap::new())),
            Arc::new(AtomicBool::new(false)),
            Arc::new(std::sync::Mutex::new(())),
            1,
        )
    }

    /// Build a second bridge with a no-op writer that reuses `owner`'s
    /// injection state.
    fn test_bridge_sharing_injection_state(owner: &HostBridge) -> HostBridge {
        HostBridge::from_parts_with_writer_cancel_notify_and_injection_state(
            Arc::new(Mutex::new(HashMap::new())),
            Arc::new(AtomicBool::new(false)),
            Arc::new(Notify::new()),
            Arc::new(|_| Ok(())),
            100,
            Some(owner.injection_state()),
        )
    }

    #[test]
    fn test_json_rpc_request_format() {
        let request = crate::jsonrpc::request(
            1,
            "llm_call",
            serde_json::json!({
                "prompt": "Hello",
                "system": "Be helpful",
            }),
        );
        let s = serde_json::to_string(&request).unwrap();
        assert!(s.contains("\"jsonrpc\":\"2.0\""));
        assert!(s.contains("\"id\":1"));
        assert!(s.contains("\"method\":\"llm_call\""));
    }

    #[test]
    fn test_json_rpc_notification_format() {
        let notification =
            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
        let s = serde_json::to_string(&notification).unwrap();
        assert!(s.contains("\"method\":\"output\""));
        assert!(!s.contains("\"id\""));
    }

    #[test]
    fn test_json_rpc_error_response_parsing() {
        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
        assert!(response.get("error").is_some());
        assert_eq!(
            response["error"]["message"].as_str().unwrap(),
            "Invalid request"
        );
    }

    #[test]
    fn test_json_rpc_success_response_parsing() {
        let response = crate::jsonrpc::response(
            1,
            serde_json::json!({
                "text": "Hello world",
                "input_tokens": 10,
                "output_tokens": 5,
            }),
        );
        assert!(response.get("result").is_some());
        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
    }

    #[test]
    fn test_cancelled_flag() {
        let cancelled = Arc::new(AtomicBool::new(false));
        assert!(!cancelled.load(Ordering::SeqCst));
        cancelled.store(true, Ordering::SeqCst);
        assert!(cancelled.load(Ordering::SeqCst));
    }

    #[test]
    fn pending_host_calls_return_when_cancellation_arrives() {
        run_async(async {
            let pending = Arc::new(Mutex::new(HashMap::new()));
            let cancelled = Arc::new(AtomicBool::new(false));
            let bridge = HostBridge::from_parts_with_writer(
                pending.clone(),
                cancelled.clone(),
                Arc::new(|_| Ok(())),
                1,
            );

            let call = bridge.call("host/work", serde_json::json!({}));
            tokio::pin!(call);

            // Poll the call until it has registered itself as pending; it must
            // not complete on its own because no host ever answers.
            loop {
                tokio::select! {
                    result = &mut call => panic!("call completed before cancellation: {result:?}"),
                    _ = tokio::task::yield_now() => {}
                }
                if !pending.lock().await.is_empty() {
                    break;
                }
            }

            cancelled.store(true, Ordering::SeqCst);
            bridge.cancel_notify.notify_waiters();

            let result = tokio::time::timeout(Duration::from_secs(1), call)
                .await
                .expect("pending call should observe cancellation promptly");
            assert!(
                matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
            );
            assert!(pending.lock().await.is_empty());
        });
    }

    #[test]
    fn queued_messages_are_filtered_by_delivery_mode() {
        run_async(async {
            let bridge = test_bridge();
            bridge
                .push_queued_user_message("first".to_string(), "finish_step")
                .await;
            bridge
                .push_queued_user_message("second".to_string(), "audit_only")
                .await;

            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
            assert_eq!(finish_step.len(), 1);
            assert_eq!(finish_step[0].content, "first");

            let audit_only = bridge.take_queued_user_messages(false, false, true).await;
            assert_eq!(audit_only.len(), 1);
            assert_eq!(audit_only[0].content, "second");
        });
    }

    #[test]
    fn pending_user_messages_support_revoke_replace_and_delivery_states() {
        run_async(async {
            let bridge = test_bridge();
            let first_id = bridge
                .push_pending_user_message(
                    "first".to_string(),
                    serde_json::json!("first"),
                    "audit_only",
                )
                .await;
            let second_id = bridge
                .push_pending_user_message(
                    "second".to_string(),
                    serde_json::json!("second"),
                    "audit_only",
                )
                .await;

            assert_eq!(
                bridge
                    .replace_pending_user_message(
                        &second_id,
                        "second edited".to_string(),
                        serde_json::json!("second edited"),
                    )
                    .await,
                PendingUserMessageMutationResult::Mutated
            );
            assert_eq!(
                bridge.revoke_pending_user_message(&first_id).await,
                PendingUserMessageMutationResult::Mutated
            );
            assert_eq!(
                bridge.revoke_pending_user_message(&first_id).await,
                PendingUserMessageMutationResult::AlreadyRevoked
            );

            let delivered = bridge
                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
                .await;
            assert_eq!(delivered.len(), 1);
            assert_eq!(delivered[0].message_id, second_id);
            assert_eq!(delivered[0].content, "second edited");

            assert_eq!(
                bridge.revoke_pending_user_message(&second_id).await,
                PendingUserMessageMutationResult::AlreadyDelivered
            );
            assert_eq!(
                bridge
                    .replace_pending_user_message(
                        &second_id,
                        "too late".to_string(),
                        serde_json::json!("too late"),
                    )
                    .await,
                PendingUserMessageMutationResult::AlreadyDelivered
            );
            assert_eq!(
                bridge.revoke_pending_user_message("missing").await,
                PendingUserMessageMutationResult::UnknownMessageId
            );
        });
    }

    #[test]
    fn pending_user_message_replace_preserves_fifo_position_and_mode() {
        run_async(async {
            let bridge = test_bridge();
            let first_id = bridge
                .push_pending_user_message(
                    "first".to_string(),
                    serde_json::json!("first"),
                    "finish_step",
                )
                .await;
            let second_id = bridge
                .push_pending_user_message(
                    "second".to_string(),
                    serde_json::json!("second"),
                    "finish_step",
                )
                .await;
            assert_eq!(
                bridge
                    .replace_pending_user_message(
                        &first_id,
                        "first edited".to_string(),
                        serde_json::json!("first edited"),
                    )
                    .await,
                PendingUserMessageMutationResult::Mutated
            );

            let delivered = bridge
                .take_queued_user_messages_for(DeliveryCheckpoint::AfterCurrentOperation)
                .await;
            assert_eq!(
                delivered
                    .iter()
                    .map(|message| (&message.message_id, message.content.as_str(), message.mode))
                    .collect::<Vec<_>>(),
                vec![
                    (&first_id, "first edited", QueuedUserMessageMode::FinishStep,),
                    (&second_id, "second", QueuedUserMessageMode::FinishStep),
                ]
            );
        });
    }

    #[test]
    fn pending_user_message_state_survives_bridge_replacement() {
        run_async(async {
            let bridge = test_bridge();
            let revoked_id = bridge
                .push_pending_user_message(
                    "revoke me".to_string(),
                    serde_json::json!("revoke me"),
                    "audit_only",
                )
                .await;
            let delivered_id = bridge
                .push_pending_user_message(
                    "deliver me".to_string(),
                    serde_json::json!("deliver me"),
                    "audit_only",
                )
                .await;
            assert_eq!(
                bridge.revoke_pending_user_message(&revoked_id).await,
                PendingUserMessageMutationResult::Mutated
            );
            bridge.cancelled.store(true, Ordering::SeqCst);

            let replacement_bridge = test_bridge_sharing_injection_state(&bridge);
            assert_eq!(
                replacement_bridge
                    .revoke_pending_user_message(&revoked_id)
                    .await,
                PendingUserMessageMutationResult::AlreadyRevoked
            );
            let delivered = replacement_bridge
                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
                .await;
            assert_eq!(delivered.len(), 1);
            assert_eq!(delivered[0].message_id, delivered_id);
            assert_eq!(delivered[0].content, "deliver me");
            assert_eq!(
                bridge.revoke_pending_user_message(&delivered_id).await,
                PendingUserMessageMutationResult::AlreadyDelivered
            );
        });
    }

    #[test]
    fn queued_transcript_injections_preserve_user_reminder_separation() {
        run_async(async {
            let bridge = test_bridge();
            bridge
                .push_queued_user_message("human follow-up".to_string(), "finish_step")
                .await;
            let reminder_id = bridge
                .push_queued_session_remind_from_params(&serde_json::json!({
                    "body": "Host-provided ambient context.",
                    "tags": ["host"],
                    "dedupe_key": "host-context",
                    "ttl_turns": 2,
                    "mode": "audit_only",
                    "_meta": {"harn": {"source": "test"}},
                }))
                .await
                .expect("valid reminder");

            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
            assert_eq!(finish_step.len(), 1);
            assert_eq!(finish_step[0].content, "human follow-up");

            let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
            assert!(no_user_messages.is_empty());

            let injections = bridge
                .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
                .await;
            assert_eq!(injections.len(), 1);
            let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
                panic!("expected queued reminder");
            };
            assert_eq!(reminder.reminder.id, reminder_id);
            assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
            assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
            assert_eq!(
                reminder.reminder.dedupe_key.as_deref(),
                Some("host-context")
            );
            assert_eq!(reminder.reminder.ttl_turns, Some(2));
            assert_eq!(
                reminder.reminder.source,
                crate::llm::helpers::ReminderSource::Bridge
            );
        });
    }

    #[test]
    fn pending_injections_list_user_messages_and_reminders_in_fifo_order() {
        run_async(async {
            let bridge = test_bridge();
            let message_id = bridge
                .push_pending_user_message(
                    "human follow-up".to_string(),
                    serde_json::json!([{"type": "text", "text": "human follow-up"}]),
                    "finish_step",
                )
                .await;
            let reminder_id = bridge
                .push_queued_session_remind_from_params(&serde_json::json!({
                    "id": "rem-test",
                    "body": "Host reminder",
                    "tags": ["host"],
                    "dedupe_key": "host-reminder",
                    "ttl_turns": 2,
                    "mode": "interrupt_immediate",
                }))
                .await
                .expect("valid session/remind payload");

            let pending = bridge.pending_injections_json().await;
            assert_eq!(pending["pendingCount"], 2);
            assert_eq!(pending["injections"][0]["kind"], "user");
            assert_eq!(pending["injections"][0]["id"], message_id);
            assert_eq!(pending["injections"][0]["messageId"], message_id);
            assert_eq!(pending["injections"][0]["mode"], "finish_step");
            assert_eq!(pending["injections"][0]["position"], 0);
            assert_eq!(pending["injections"][1]["kind"], "reminder");
            assert_eq!(pending["injections"][1]["id"], reminder_id);
            assert_eq!(pending["injections"][1]["reminderId"], "rem-test");
            assert_eq!(pending["injections"][1]["mode"], "interrupt_immediate");
            assert_eq!(pending["injections"][1]["body"], "Host reminder");
            assert_eq!(pending["injections"][1]["dedupeKey"], "host-reminder");
            assert_eq!(pending["injections"][1]["ttlTurns"], 2);
            assert_eq!(pending["injections"][1]["position"], 1);
        });
    }

    #[test]
    fn pending_reminders_support_revoke_and_delivery_states() {
        run_async(async {
            let bridge = test_bridge();
            let revoked_id = bridge
                .push_queued_session_remind_from_params(&serde_json::json!({
                    "id": "rem-revoke",
                    "body": "remove me",
                    "mode": "finish_step",
                }))
                .await
                .expect("valid session/remind payload");
            let delivered_id = bridge
                .push_queued_session_remind_from_params(&serde_json::json!({
                    "id": "rem-deliver",
                    "body": "deliver me",
                    "mode": "finish_step",
                }))
                .await
                .expect("valid session/remind payload");

            assert_eq!(
                bridge.revoke_pending_reminder(&revoked_id).await,
                PendingReminderMutationResult::Mutated
            );
            assert_eq!(
                bridge.revoke_pending_reminder(&revoked_id).await,
                PendingReminderMutationResult::AlreadyRevoked
            );

            let pending = bridge.pending_injections_json().await;
            assert_eq!(pending["pendingCount"], 1);
            assert_eq!(pending["injections"][0]["reminderId"], delivered_id);

            let delivered = bridge
                .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
                .await;
            assert_eq!(delivered.len(), 1);
            let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
                panic!("expected delivered reminder");
            };
            assert_eq!(reminder.reminder.id, delivered_id);

            assert_eq!(
                bridge.revoke_pending_reminder(&delivered_id).await,
                PendingReminderMutationResult::AlreadyDelivered
            );
            assert_eq!(
                bridge.revoke_pending_reminder("missing").await,
                PendingReminderMutationResult::UnknownReminderId
            );
        });
    }

    #[test]
    fn bridge_remind_modes_honor_delivery_checkpoints() {
        run_async(async {
            // (mode, checkpoint that must deliver it, checkpoint that must not).
            let cases = [
                (
                    "interrupt_immediate",
                    DeliveryCheckpoint::InterruptImmediate,
                    DeliveryCheckpoint::AfterCurrentOperation,
                ),
                (
                    "finish_step",
                    DeliveryCheckpoint::AfterCurrentOperation,
                    DeliveryCheckpoint::EndOfInteraction,
                ),
                (
                    "audit_only",
                    DeliveryCheckpoint::EndOfInteraction,
                    DeliveryCheckpoint::InterruptImmediate,
                ),
            ];

            for (mode, expected_checkpoint, wrong_checkpoint) in cases {
                let bridge = test_bridge();
                bridge
                    .push_queued_session_remind_from_params(&serde_json::json!({
                        "body": format!("Reminder for {mode}"),
                        "mode": mode,
                    }))
                    .await
                    .expect("valid session/remind payload");

                let premature = bridge
                    .take_queued_transcript_injections_for(wrong_checkpoint)
                    .await;
                assert!(
                    premature.is_empty(),
                    "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
                );

                let delivered = bridge
                    .take_queued_transcript_injections_for(expected_checkpoint)
                    .await;
                assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
                let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
                    panic!("expected reminder for {mode}");
                };
                assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
            }
        });
    }

    #[test]
    fn session_remind_validation_rejects_user_message_shape() {
        let err = queued_session_remind_from_params(&serde_json::json!({
            "content": "this is still a user message",
            "mode": "interrupt_immediate",
        }))
        .expect_err("session/remind must require a reminder body");
        assert!(err.contains(Code::ReminderInvalidShape.as_str()));
        assert!(err.contains("body"));
    }

    #[test]
    fn session_remind_validation_rejects_unknown_options_separately() {
        let err = queued_session_remind_from_params(&serde_json::json!({
            "body": "valid body",
            "unknown_host_field": true,
        }))
        .expect_err("session/remind must reject unknown top-level fields");
        assert!(err.contains(Code::ReminderUnknownOption.as_str()));
        assert!(err.contains("unknown_host_field"));
    }

    #[test]
    fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
        let err = queued_session_remind_from_params(&serde_json::json!({
            "body": "valid body",
            "propagate": "workspace",
        }))
        .expect_err("session/remind must reject unknown propagate values");
        assert!(err.contains(Code::ReminderUnknownPropagate.as_str()));
        assert!(err.contains("propagate"));
    }

    #[test]
    fn test_json_result_to_vm_value_string() {
        let val = serde_json::json!("hello");
        let vm_val = json_result_to_vm_value(&val);
        assert_eq!(vm_val.display(), "hello");
    }

    #[test]
    fn test_json_result_to_vm_value_dict() {
        let val = serde_json::json!({"name": "test", "count": 42});
        let vm_val = json_result_to_vm_value(&val);
        let VmValue::Dict(d) = &vm_val else {
            panic!("Expected Dict, got {:?}", vm_val);
        };
        assert_eq!(d.get("name").unwrap().display(), "test");
        assert_eq!(d.get("count").unwrap().display(), "42");
    }

    #[test]
    fn test_json_result_to_vm_value_null() {
        let val = serde_json::json!(null);
        let vm_val = json_result_to_vm_value(&val);
        assert!(matches!(vm_val, VmValue::Nil));
    }

    #[test]
    fn test_json_result_to_vm_value_nested() {
        let val = serde_json::json!({
            "text": "response",
            "tool_calls": [
                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
            ],
            "input_tokens": 100,
            "output_tokens": 50,
        });
        let vm_val = json_result_to_vm_value(&val);
        let VmValue::Dict(d) = &vm_val else {
            panic!("Expected Dict, got {:?}", vm_val);
        };
        assert_eq!(d.get("text").unwrap().display(), "response");
        let VmValue::List(list) = d.get("tool_calls").unwrap() else {
            panic!("Expected List for tool_calls");
        };
        assert_eq!(list.len(), 1);
    }

    #[test]
    fn parse_host_tools_list_accepts_object_wrapper() {
        let tools = parse_host_tools_list_response(serde_json::json!({
            "tools": [
                {
                    "name": "Read",
                    "description": "Read a file",
                    "schema": {"type": "object"},
                }
            ]
        }))
        .expect("tool list");

        assert_eq!(tools.len(), 1);
        assert_eq!(tools[0]["name"], "Read");
        assert_eq!(tools[0]["deprecated"], false);
    }

    #[test]
    fn parse_host_tools_list_accepts_compat_fields() {
        let tools = parse_host_tools_list_response(serde_json::json!({
            "result": {
                "tools": [
                    {
                        "name": "Edit",
                        "short_description": "Apply an edit",
                        "input_schema": {"type": "object"},
                        "deprecated": true,
                    }
                ]
            }
        }))
        .expect("tool list");

        assert_eq!(tools[0]["description"], "Apply an edit");
        assert_eq!(tools[0]["schema"]["type"], "object");
        assert_eq!(tools[0]["deprecated"], true);
    }

    #[test]
    fn parse_host_tools_list_requires_tool_names() {
        let err = parse_host_tools_list_response(serde_json::json!({
            "tools": [
                {"description": "missing name"}
            ]
        }))
        .expect_err("expected error");
        assert!(err
            .to_string()
            .contains("host/tools/list: every tool must include a string `name`"));
    }

    #[test]
    fn test_timeout_duration() {
        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
    }
}