1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::future::Future;
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::rc::Rc;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::Duration;
16
17use tokio::io::AsyncBufReadExt;
18use tokio::sync::{oneshot, Mutex, Notify};
19
20use harn_parser::diagnostic_codes::Code;
21
22use crate::orchestration::MutationSessionRecord;
23use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
24use crate::visible_text::VisibleTextState;
25use crate::vm::Vm;
26
/// Upper bound on how long `HostBridge::call` waits for the host to answer a request.
const DEFAULT_TIMEOUT: Duration = Duration::from_mins(5);

/// Shared callback that emits one protocol line; failures are reported as a message string.
pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
31
32fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
33 Arc::new(move |line: &str| {
34 let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
35 let mut stdout = std::io::stdout().lock();
36 stdout
37 .write_all(line.as_bytes())
38 .map_err(|e| format!("Bridge write error: {e}"))?;
39 stdout
40 .write_all(b"\n")
41 .map_err(|e| format!("Bridge write error: {e}"))?;
42 stdout
43 .flush()
44 .map_err(|e| format!("Bridge flush error: {e}"))?;
45 Ok(())
46 })
47}
48
/// JSON-RPC style bridge between the VM and its host.
///
/// Requests are written through `writer` and matched to responses by numeric id, unless
/// `in_process` is set, in which case `call` dispatches to a loaded module instead.
pub struct HostBridge {
    /// Counter that hands out the numeric id of each outgoing request.
    next_id: AtomicU64,
    /// Senders for in-flight requests, keyed by request id; a response is delivered
    /// through the sender registered under its id.
    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
    /// Cancellation flag consulted by `call`.
    cancelled: Arc<AtomicBool>,
    /// Wakes requests that are waiting for a response when cancellation is signalled.
    cancel_notify: Arc<Notify>,
    /// Sink used to emit serialized requests and notifications.
    writer: HostBridgeWriter,
    /// Session id recorded via `set_session_id`.
    session_id: std::sync::Mutex<String>,
    /// Script name recorded via `set_script_name`.
    script_name: std::sync::Mutex<String>,
    /// Queue of user messages and reminders waiting to be injected into the transcript.
    queued_transcript_injections: HostBridgeInjectionState,
    /// One-shot flag consumed by `take_resume_signal`.
    resume_requested: Arc<AtomicBool>,
    /// One-shot flag consumed by `take_skills_reload_signal`.
    skills_reload_requested: Arc<AtomicBool>,
    /// Flag read and written through `is_daemon_idle` / `set_daemon_idle`.
    daemon_idle: Arc<AtomicBool>,
    /// Stop reason stored by `set_prompt_stop_reason` until it is taken.
    prompt_stop_reason: std::sync::Mutex<Option<String>>,
    /// Visible-text state keyed by string; presumably one entry per call id — TODO confirm
    /// against the code that populates it.
    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
    /// Boolean per string key; presumably whether a call is streaming — TODO confirm.
    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
    /// When present, `call` is served by this in-process module instead of the writer.
    in_process: Option<InProcessHost>,
}
93
/// Host backend served by functions exported from a module loaded into a VM.
struct InProcessHost {
    /// Path of the loaded module; quoted in "missing capability" errors.
    module_path: PathBuf,
    /// Exported functions of the module, looked up by name when dispatching.
    exported_functions: BTreeMap<String, Rc<VmClosure>>,
    /// VM from which a child VM is derived for every export invocation.
    vm: Vm,
}
99
100impl InProcessHost {
101 fn dispatch<'a>(
107 &'a self,
108 method: &'a str,
109 params: serde_json::Value,
110 ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, VmError>> + 'a>> {
111 Box::pin(async move {
112 match method {
113 "builtin_call" => {
114 let name = params
115 .get("name")
116 .and_then(|value| value.as_str())
117 .unwrap_or_default();
118 let args = params
119 .get("args")
120 .and_then(|value| value.as_array())
121 .cloned()
122 .unwrap_or_default()
123 .into_iter()
124 .map(|value| json_result_to_vm_value(&value))
125 .collect::<Vec<_>>();
126 self.invoke_export(name, &args).await
127 }
128 "host/tools/list" => self
129 .invoke_optional_export("host_tools_list", &[])
130 .await
131 .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
132 "session/request_permission" => self.request_permission(params).await,
133 other => Err(VmError::Runtime(format!(
134 "playground host backend does not implement bridge method '{other}'"
135 ))),
136 }
137 })
138 }
139
140 async fn invoke_export(
141 &self,
142 name: &str,
143 args: &[VmValue],
144 ) -> Result<serde_json::Value, VmError> {
145 let Some(closure) = self.exported_functions.get(name) else {
146 return Err(VmError::Runtime(format!(
147 "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
148 self.module_path.display()
149 )));
150 };
151
152 let mut vm = self.vm.child_vm_for_host();
153 let result = vm.call_closure_pub(closure, args).await?;
154 Ok(crate::llm::vm_value_to_json(&result))
155 }
156
157 async fn invoke_optional_export(
158 &self,
159 name: &str,
160 args: &[VmValue],
161 ) -> Result<Option<serde_json::Value>, VmError> {
162 if !self.exported_functions.contains_key(name) {
163 return Ok(None);
164 }
165 self.invoke_export(name, args).await.map(Some)
166 }
167
168 async fn request_permission(
169 &self,
170 params: serde_json::Value,
171 ) -> Result<serde_json::Value, VmError> {
172 let Some(closure) = self.exported_functions.get("request_permission") else {
175 return Ok(crate::llm::acp_permission::allow_response());
176 };
177
178 let tool_call = params.get("toolCall");
179 let tool_name = tool_call
180 .and_then(|tool_call| tool_call.pointer("/_meta/harn/toolName"))
181 .or_else(|| tool_call.and_then(|tool_call| tool_call.get("toolName")))
182 .or_else(|| tool_call.and_then(|tool_call| tool_call.get("title")))
183 .and_then(|value| value.as_str())
184 .unwrap_or_default();
185 let tool_args = tool_call
186 .and_then(|tool_call| tool_call.get("rawInput"))
187 .map(json_result_to_vm_value)
188 .unwrap_or(VmValue::Nil);
189 let full_payload = json_result_to_vm_value(¶ms);
190
191 let arg_count = closure.func.params.len();
192 let args = if arg_count >= 3 {
193 vec![
194 VmValue::String(Rc::from(tool_name.to_string())),
195 tool_args,
196 full_payload,
197 ]
198 } else if arg_count == 2 {
199 vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
200 } else if arg_count == 1 {
201 vec![full_payload]
202 } else {
203 Vec::new()
204 };
205
206 let mut vm = self.vm.child_vm_for_host();
207 let result = vm.call_closure_pub(closure, &args).await?;
208 let payload = match result {
213 VmValue::Bool(granted) => {
214 if granted {
215 crate::llm::acp_permission::allow_response()
216 } else {
217 crate::llm::acp_permission::reject_response(None)
218 }
219 }
220 VmValue::String(reason) if !reason.is_empty() => {
221 crate::llm::acp_permission::reject_response(Some(reason.to_string()))
222 }
223 other => {
224 let json = crate::llm::vm_value_to_json(&other);
225 if let Some(granted) = json.get("granted").and_then(|value| value.as_bool()) {
226 if granted {
227 crate::llm::acp_permission::allow_response()
228 } else {
229 crate::llm::acp_permission::reject_response(
230 json.get("reason")
231 .and_then(|value| value.as_str())
232 .map(str::to_string),
233 )
234 }
235 } else if json.get("outcome").is_some() {
236 json
238 } else if other.is_truthy() {
239 crate::llm::acp_permission::allow_response()
240 } else {
241 crate::llm::acp_permission::reject_response(None)
242 }
243 }
244 };
245 Ok(payload)
246 }
247}
248
/// Delivery mode attached to a queued transcript injection.
///
/// The wire names are `interrupt_immediate`, `finish_step` and `audit_only`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
    /// Wire name `interrupt_immediate` (alias `interrupt`).
    InterruptImmediate,
    /// Wire name `finish_step` (alias `after_current_operation`).
    FinishStep,
    /// Wire name `audit_only`; also the fallback for unrecognised mode strings.
    AuditOnly,
}
264
/// Points at which queued injections can be delivered.
// NOTE(review): the consumers of this enum are outside this view; the variant names
// suggest they parallel `QueuedUserMessageMode` — confirm against the delivery code.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
    InterruptImmediate,
    AfterCurrentOperation,
    EndOfInteraction,
}
271
272impl QueuedUserMessageMode {
273 fn from_str(value: &str) -> Self {
274 match value {
275 "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
276 "finish_step" | "after_current_operation" => Self::FinishStep,
277 _ => Self::AuditOnly,
282 }
283 }
284
285 fn as_str(self) -> &'static str {
286 match self {
287 Self::InterruptImmediate => "interrupt_immediate",
288 Self::FinishStep => "finish_step",
289 Self::AuditOnly => "audit_only",
290 }
291 }
292}
293
/// A user message waiting in the injection queue.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedUserMessage {
    /// Identifier generated when the message is queued; used to revoke or replace it.
    pub message_id: String,
    /// Message content as a plain string.
    pub content: String,
    /// JSON form of the content; this is what the pending-injections listing reports.
    pub transcript_content: serde_json::Value,
    /// Delivery mode requested for the message.
    pub mode: QueuedUserMessageMode,
}
301
/// A system reminder waiting in the injection queue.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedReminder {
    /// The validated reminder payload.
    pub reminder: crate::llm::helpers::SystemReminder,
    /// Delivery mode requested for the reminder.
    pub mode: QueuedUserMessageMode,
}
307
/// One entry of the injection queue: either a user message or a reminder.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum QueuedTranscriptInjection {
    /// A queued user message.
    User(QueuedUserMessage),
    /// A queued system reminder.
    Reminder(QueuedReminder),
}
313
/// Mutable state behind `HostBridgeInjectionState`.
#[derive(Debug, Default)]
struct QueuedTranscriptInjections {
    /// Pending injections in arrival order.
    queue: VecDeque<QueuedTranscriptInjection>,
    /// Ids of user messages removed from the queue by a revoke request.
    revoked_user_message_ids: HashSet<String>,
    /// Ids of user messages reported as already delivered; populated outside this view.
    delivered_user_message_ids: HashSet<String>,
    /// Ids of reminders removed from the queue by a revoke request.
    revoked_reminder_ids: HashSet<String>,
    /// Ids of reminders reported as already delivered; populated outside this view.
    delivered_reminder_ids: HashSet<String>,
}
322
/// Cloneable handle to the shared injection queue; clones refer to the same state.
#[derive(Clone, Debug, Default)]
pub struct HostBridgeInjectionState {
    /// Queue and bookkeeping sets, guarded by an async mutex.
    inner: Arc<Mutex<QueuedTranscriptInjections>>,
}
327
/// Outcome of revoking or replacing a queued user message.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PendingUserMessageMutationResult {
    /// The message was still queued and has been changed or removed.
    Mutated,
    /// The id is recorded as revoked.
    AlreadyRevoked,
    /// The id is recorded as delivered.
    AlreadyDelivered,
    /// The id is neither queued nor recorded as revoked or delivered.
    UnknownMessageId,
}
335
336impl QueuedTranscriptInjection {
337 fn mode(&self) -> QueuedUserMessageMode {
338 match self {
339 Self::User(message) => message.mode,
340 Self::Reminder(reminder) => reminder.mode,
341 }
342 }
343
344 fn pending_json(&self, position: usize) -> serde_json::Value {
345 match self {
346 Self::User(message) => serde_json::json!({
347 "kind": "user",
348 "id": message.message_id,
349 "messageId": message.message_id,
350 "mode": message.mode.as_str(),
351 "position": position,
352 "content": message.transcript_content,
353 }),
354 Self::Reminder(reminder) => serde_json::json!({
355 "kind": "reminder",
356 "id": reminder.reminder.id,
357 "reminderId": reminder.reminder.id,
358 "mode": reminder.mode.as_str(),
359 "position": position,
360 "body": reminder.reminder.body,
361 "tags": reminder.reminder.tags,
362 "dedupeKey": reminder.reminder.dedupe_key,
363 "ttlTurns": reminder.reminder.ttl_turns,
364 "preserveOnCompact": reminder.reminder.preserve_on_compact,
365 "propagate": reminder.reminder.propagate.as_str(),
366 "roleHint": reminder.reminder.role_hint.as_str(),
367 "source": reminder.reminder.source.as_str(),
368 "firedAtTurn": reminder.reminder.fired_at_turn,
369 "originatingAgentId": reminder.reminder.originating_agent_id,
370 }),
371 }
372 }
373}
374
/// Outcome of revoking a queued reminder.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PendingReminderMutationResult {
    /// The reminder was still queued and has been removed.
    Mutated,
    /// The id is recorded as revoked.
    AlreadyRevoked,
    /// The id is recorded as delivered.
    AlreadyDelivered,
    /// The id is neither queued nor recorded as revoked or delivered.
    UnknownReminderId,
}
382
383fn new_inject_message_id() -> String {
384 format!("msg_inj_{}", uuid::Uuid::now_v7().simple())
385}
386
387impl HostBridgeInjectionState {
388 pub fn new() -> Self {
389 Self::default()
390 }
391
392 pub async fn push_pending_user_message(
393 &self,
394 content: String,
395 transcript_content: serde_json::Value,
396 mode: &str,
397 ) -> String {
398 let message_id = new_inject_message_id();
399 self.inner
400 .lock()
401 .await
402 .queue
403 .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
404 message_id: message_id.clone(),
405 content,
406 transcript_content,
407 mode: QueuedUserMessageMode::from_str(mode),
408 }));
409 message_id
410 }
411
412 pub async fn revoke_pending_user_message(
413 &self,
414 message_id: &str,
415 ) -> PendingUserMessageMutationResult {
416 let mut state = self.inner.lock().await;
417 let mut retained = VecDeque::new();
418 let mut revoked = false;
419 while let Some(injection) = state.queue.pop_front() {
420 match &injection {
421 QueuedTranscriptInjection::User(message) if message.message_id == message_id => {
422 revoked = true;
423 }
424 _ => retained.push_back(injection),
425 }
426 }
427 state.queue = retained;
428 if revoked {
429 state
430 .revoked_user_message_ids
431 .insert(message_id.to_string());
432 return PendingUserMessageMutationResult::Mutated;
433 }
434 if state.revoked_user_message_ids.contains(message_id) {
435 PendingUserMessageMutationResult::AlreadyRevoked
436 } else if state.delivered_user_message_ids.contains(message_id) {
437 PendingUserMessageMutationResult::AlreadyDelivered
438 } else {
439 PendingUserMessageMutationResult::UnknownMessageId
440 }
441 }
442
443 pub async fn revoke_pending_reminder(
444 &self,
445 reminder_id: &str,
446 ) -> PendingReminderMutationResult {
447 let mut state = self.inner.lock().await;
448 let mut retained = VecDeque::new();
449 let mut revoked = false;
450 while let Some(injection) = state.queue.pop_front() {
451 match &injection {
452 QueuedTranscriptInjection::Reminder(reminder)
453 if reminder.reminder.id == reminder_id =>
454 {
455 revoked = true;
456 }
457 _ => retained.push_back(injection),
458 }
459 }
460 state.queue = retained;
461 if revoked {
462 state.revoked_reminder_ids.insert(reminder_id.to_string());
463 return PendingReminderMutationResult::Mutated;
464 }
465 if state.revoked_reminder_ids.contains(reminder_id) {
466 PendingReminderMutationResult::AlreadyRevoked
467 } else if state.delivered_reminder_ids.contains(reminder_id) {
468 PendingReminderMutationResult::AlreadyDelivered
469 } else {
470 PendingReminderMutationResult::UnknownReminderId
471 }
472 }
473
474 pub async fn replace_pending_user_message(
475 &self,
476 message_id: &str,
477 content: String,
478 transcript_content: serde_json::Value,
479 ) -> PendingUserMessageMutationResult {
480 let mut state = self.inner.lock().await;
481 for injection in &mut state.queue {
482 if let QueuedTranscriptInjection::User(message) = injection {
483 if message.message_id == message_id {
484 message.content = content;
485 message.transcript_content = transcript_content;
486 return PendingUserMessageMutationResult::Mutated;
487 }
488 }
489 }
490 if state.revoked_user_message_ids.contains(message_id) {
491 PendingUserMessageMutationResult::AlreadyRevoked
492 } else if state.delivered_user_message_ids.contains(message_id) {
493 PendingUserMessageMutationResult::AlreadyDelivered
494 } else {
495 PendingUserMessageMutationResult::UnknownMessageId
496 }
497 }
498
499 async fn push_session_reminder(&self, reminder: QueuedReminder) {
500 self.inner
501 .lock()
502 .await
503 .queue
504 .push_back(QueuedTranscriptInjection::Reminder(reminder));
505 }
506
507 pub async fn pending_injections_json(&self) -> serde_json::Value {
508 let state = self.inner.lock().await;
509 let injections = state
510 .queue
511 .iter()
512 .enumerate()
513 .map(|(position, injection)| injection.pending_json(position))
514 .collect::<Vec<_>>();
515 serde_json::json!({
516 "pendingCount": injections.len(),
517 "injections": injections,
518 })
519 }
520}
521
522fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
523 format!(
524 "{}: {}",
525 Code::ReminderUnknownOption.as_str(),
526 message.as_ref()
527 )
528}
529
530fn session_remind_shape_error(message: impl AsRef<str>) -> String {
531 format!(
532 "{}: {}",
533 Code::ReminderInvalidShape.as_str(),
534 message.as_ref()
535 )
536}
537
538fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
539 format!(
540 "{}: {}",
541 Code::ReminderUnknownPropagate.as_str(),
542 message.as_ref()
543 )
544}
545
546fn string_field(
547 map: &serde_json::Map<String, serde_json::Value>,
548 key: &str,
549 required: bool,
550) -> Result<Option<String>, String> {
551 match map.get(key) {
552 None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
553 format!("`{key}` must be a non-empty string"),
554 )),
555 None | Some(serde_json::Value::Null) => Ok(None),
556 Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
557 session_remind_shape_error(format!("`{key}` must be a non-empty string")),
558 ),
559 Some(serde_json::Value::String(value)) => {
560 let trimmed = value.trim();
561 if trimmed.is_empty() {
562 Ok(None)
563 } else {
564 Ok(Some(trimmed.to_string()))
565 }
566 }
567 Some(other) => Err(session_remind_shape_error(format!(
568 "`{key}` must be a string, got {other}"
569 ))),
570 }
571}
572
573fn bool_field(
574 map: &serde_json::Map<String, serde_json::Value>,
575 key: &str,
576) -> Result<Option<bool>, String> {
577 match map.get(key) {
578 None | Some(serde_json::Value::Null) => Ok(None),
579 Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
580 Some(other) => Err(session_remind_shape_error(format!(
581 "`{key}` must be a bool, got {other}"
582 ))),
583 }
584}
585
586fn int_field(
587 map: &serde_json::Map<String, serde_json::Value>,
588 key: &str,
589) -> Result<Option<i64>, String> {
590 match map.get(key) {
591 None | Some(serde_json::Value::Null) => Ok(None),
592 Some(serde_json::Value::Number(value)) => {
593 let Some(value) = value.as_i64() else {
594 return Err(session_remind_shape_error(format!(
595 "`{key}` must be an integer"
596 )));
597 };
598 Ok(Some(value))
599 }
600 Some(other) => Err(session_remind_shape_error(format!(
601 "`{key}` must be an int, got {other}"
602 ))),
603 }
604}
605
606fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
607 let Some(value) = map.get("tags") else {
608 return Ok(Vec::new());
609 };
610 if value.is_null() {
611 return Ok(Vec::new());
612 }
613 let Some(values) = value.as_array() else {
614 return Err(session_remind_shape_error("`tags` must be a list"));
615 };
616 let mut tags = Vec::new();
617 for value in values {
618 let Some(tag) = value.as_str() else {
619 return Err(session_remind_shape_error(format!(
620 "`tags` entries must be strings, got {value}"
621 )));
622 };
623 let tag = tag.trim();
624 if tag.is_empty() {
625 return Err(session_remind_shape_error(
626 "`tags` entries must be non-empty strings",
627 ));
628 }
629 if !tags.iter().any(|existing| existing == tag) {
630 tags.push(tag.to_string());
631 }
632 }
633 Ok(tags)
634}
635
/// Validates a `session/remind` reminder object and converts it into a `SystemReminder`.
///
/// Checks run in a fixed order and the first failure is returned, so the order below
/// decides which error a payload with several problems reports: object shape, unknown
/// keys, `_meta`, `ttl_turns`, `fired_at_turn`, `source`, `propagate`, `role_hint`, and
/// finally the fields read while building the result (`id`, `tags`, `dedupe_key`,
/// `preserve_on_compact`, `body`).
///
/// # Errors
/// Returns a string prefixed with a reminder diagnostic code.
fn session_remind_payload_from_value(
    value: &serde_json::Value,
) -> Result<crate::llm::helpers::SystemReminder, String> {
    let Some(map) = value.as_object() else {
        return Err(session_remind_shape_error(
            "session/remind payload must be a reminder object",
        ));
    };
    // Exhaustive list of accepted keys; anything else is rejected below.
    const ALLOWED: &[&str] = &[
        "_meta",
        "body",
        "dedupe_key",
        "fired_at_turn",
        "id",
        "preserve_on_compact",
        "propagate",
        "role_hint",
        "source",
        "tags",
        "ttl_turns",
    ];
    let unknown = map
        .keys()
        .filter(|key| !ALLOWED.contains(&key.as_str()))
        .map(String::as_str)
        .collect::<Vec<_>>();
    if !unknown.is_empty() {
        // `content` is the user-message field name; give that mix-up a dedicated message.
        if unknown.contains(&"content") {
            return Err(session_remind_shape_error(
                "session/remind expects reminder `body`, not user-message `content`",
            ));
        }
        return Err(reminder_unknown_option_error(format!(
            "unknown reminder option(s): {}",
            unknown.join(", ")
        )));
    }
    // `_meta` is only shape-checked here; its contents are not read.
    if let Some(meta) = map.get("_meta") {
        if !meta.is_null() && !meta.is_object() {
            return Err(session_remind_shape_error("`_meta` must be an object"));
        }
    }
    let ttl_turns = int_field(map, "ttl_turns")?;
    if let Some(value) = ttl_turns {
        if value <= 0 {
            return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
        }
    }
    // A missing `fired_at_turn` defaults to 0.
    let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
    if fired_at_turn < 0 {
        return Err(session_remind_shape_error(
            "`fired_at_turn` must be >= 0 when provided",
        ));
    }
    // `source` may be omitted or spelled "bridge"; the result always uses the bridge source.
    match string_field(map, "source", false)?.as_deref() {
        None | Some("bridge") => {}
        Some(_) => {
            return Err(session_remind_shape_error(
                "`source` for session/remind must be bridge when provided",
            ))
        }
    }
    // A missing `propagate` defaults to session scope.
    let propagate = match string_field(map, "propagate", false)?.as_deref() {
        None => crate::llm::helpers::ReminderPropagate::Session,
        Some("all") => crate::llm::helpers::ReminderPropagate::All,
        Some("session") => crate::llm::helpers::ReminderPropagate::Session,
        Some("none") => crate::llm::helpers::ReminderPropagate::None,
        Some(_) => {
            return Err(reminder_unknown_propagate_error(
                "`propagate` must be one of all, session, or none",
            ))
        }
    };
    // A missing `role_hint` defaults to the system role.
    let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
        None => crate::llm::helpers::ReminderRoleHint::System,
        Some("system") => crate::llm::helpers::ReminderRoleHint::System,
        Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
        Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
        Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
        Some(_) => {
            return Err(session_remind_shape_error(
                "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
            ))
        }
    };
    Ok(crate::llm::helpers::SystemReminder {
        // A missing or blank `id` is replaced by a freshly generated UUID.
        id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
        tags: tags_field(map)?,
        dedupe_key: string_field(map, "dedupe_key", false)?,
        ttl_turns,
        preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
        propagate,
        role_hint,
        source: crate::llm::helpers::ReminderSource::Bridge,
        // `body` is required, so the `?` rejects a missing or blank one before the default applies.
        body: string_field(map, "body", true)?.unwrap_or_default(),
        fired_at_turn,
        originating_agent_id: None,
    })
}
735
736fn handle_cancel_tool_call_notification(params: &serde_json::Value) {
746 let session_id = params
747 .get("sessionId")
748 .or_else(|| params.get("session_id"))
749 .and_then(|value| value.as_str())
750 .unwrap_or_default();
751 let call_id = params
752 .get("toolCallId")
753 .or_else(|| params.get("tool_call_id"))
754 .or_else(|| params.get("callId"))
755 .or_else(|| params.get("call_id"))
756 .and_then(|value| value.as_str())
757 .unwrap_or_default();
758 if call_id.is_empty() {
759 return;
760 }
761 let reason = params
762 .get("reason")
763 .and_then(|value| value.as_str())
764 .unwrap_or("host cancelled in-flight tool call")
765 .to_string();
766 let inject_reminder = params
767 .get("injectReminder")
768 .or_else(|| params.get("inject_reminder"))
769 .and_then(|value| value.as_bool())
770 .unwrap_or(true);
771 crate::tool_call_cancellations::cancel(session_id, call_id, reason, inject_reminder);
772}
773
774fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
775 let mode = QueuedUserMessageMode::from_str(
776 params
777 .get("mode")
778 .and_then(|value| value.as_str())
779 .unwrap_or("audit_only"),
780 );
781 let reminder_value = if let Some(reminder) = params.get("reminder") {
782 reminder.clone()
783 } else {
784 let Some(params) = params.as_object() else {
785 return Err(session_remind_shape_error(
786 "session/remind params must be an object",
787 ));
788 };
789 let mut reminder = params.clone();
790 reminder.remove("mode");
791 reminder.remove("sessionId");
792 reminder.remove("session_id");
793 serde_json::Value::Object(reminder)
794 };
795 Ok(QueuedReminder {
796 reminder: session_remind_payload_from_value(&reminder_value)?,
797 mode,
798 })
799}
800
801#[allow(clippy::new_without_default)]
803impl HostBridge {
804 pub fn new() -> Self {
809 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
810 Arc::new(Mutex::new(HashMap::new()));
811 let cancelled = Arc::new(AtomicBool::new(false));
812 let cancel_notify = Arc::new(Notify::new());
813 let queued_transcript_injections = HostBridgeInjectionState::default();
814 let resume_requested = Arc::new(AtomicBool::new(false));
815 let skills_reload_requested = Arc::new(AtomicBool::new(false));
816 let daemon_idle = Arc::new(AtomicBool::new(false));
817
818 let pending_clone = pending.clone();
820 let cancelled_clone = cancelled.clone();
821 let cancel_notify_clone = cancel_notify.clone();
822 let queued_clone = queued_transcript_injections.clone();
823 let resume_clone = resume_requested.clone();
824 let skills_reload_clone = skills_reload_requested.clone();
825 tokio::task::spawn_local(async move {
826 let stdin = tokio::io::stdin();
827 let reader = tokio::io::BufReader::new(stdin);
828 let mut lines = reader.lines();
829
830 while let Ok(Some(line)) = lines.next_line().await {
831 let line = line.trim().to_string();
832 if line.is_empty() {
833 continue;
834 }
835
836 let msg: serde_json::Value = match serde_json::from_str(&line) {
837 Ok(v) => v,
838 Err(_) => continue,
839 };
840
841 if msg.get("id").is_none() {
843 if let Some(method) = msg["method"].as_str() {
844 if method == "cancel" {
845 cancelled_clone.store(true, Ordering::SeqCst);
846 cancel_notify_clone.notify_waiters();
847 } else if method == "agent/resume" {
848 resume_clone.store(true, Ordering::SeqCst);
849 } else if method == "skills/update" {
850 skills_reload_clone.store(true, Ordering::SeqCst);
851 } else if method == "session/remind" {
852 let params = &msg["params"];
853 if let Ok(reminder) = queued_session_remind_from_params(params) {
854 queued_clone.push_session_reminder(reminder).await;
855 }
856 } else if method == "session/cancel_tool_call" {
857 handle_cancel_tool_call_notification(&msg["params"]);
858 }
859 }
860 continue;
861 }
862
863 if let Some(id) = msg["id"].as_u64() {
864 let mut pending = pending_clone.lock().await;
865 if let Some(sender) = pending.remove(&id) {
866 let _ = sender.send(msg);
867 }
868 }
869 }
870
871 let mut pending = pending_clone.lock().await;
873 pending.clear();
874 });
875
876 Self {
877 next_id: AtomicU64::new(1),
878 pending,
879 cancelled,
880 cancel_notify,
881 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
882 session_id: std::sync::Mutex::new(String::new()),
883 script_name: std::sync::Mutex::new(String::new()),
884 queued_transcript_injections,
885 resume_requested,
886 skills_reload_requested,
887 daemon_idle,
888 prompt_stop_reason: std::sync::Mutex::new(None),
889 visible_call_states: std::sync::Mutex::new(HashMap::new()),
890 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
891 in_process: None,
892 }
893 }
894
895 pub fn from_parts(
901 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
902 cancelled: Arc<AtomicBool>,
903 stdout_lock: Arc<std::sync::Mutex<()>>,
904 start_id: u64,
905 ) -> Self {
906 Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
907 }
908
909 pub fn from_parts_with_writer(
910 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
911 cancelled: Arc<AtomicBool>,
912 writer: HostBridgeWriter,
913 start_id: u64,
914 ) -> Self {
915 Self::from_parts_with_writer_and_cancel_notify(
916 pending,
917 cancelled,
918 Arc::new(Notify::new()),
919 writer,
920 start_id,
921 )
922 }
923
924 pub fn from_parts_with_writer_and_cancel_notify(
925 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
926 cancelled: Arc<AtomicBool>,
927 cancel_notify: Arc<Notify>,
928 writer: HostBridgeWriter,
929 start_id: u64,
930 ) -> Self {
931 Self::from_parts_with_writer_cancel_notify_and_injection_state(
932 pending,
933 cancelled,
934 cancel_notify,
935 writer,
936 start_id,
937 None,
938 )
939 }
940
941 pub fn from_parts_with_writer_cancel_notify_and_injection_state(
942 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
943 cancelled: Arc<AtomicBool>,
944 cancel_notify: Arc<Notify>,
945 writer: HostBridgeWriter,
946 start_id: u64,
947 injection_state: Option<HostBridgeInjectionState>,
948 ) -> Self {
949 Self {
950 next_id: AtomicU64::new(start_id),
951 pending,
952 cancelled,
953 cancel_notify,
954 writer,
955 session_id: std::sync::Mutex::new(String::new()),
956 script_name: std::sync::Mutex::new(String::new()),
957 queued_transcript_injections: injection_state.unwrap_or_default(),
958 resume_requested: Arc::new(AtomicBool::new(false)),
959 skills_reload_requested: Arc::new(AtomicBool::new(false)),
960 daemon_idle: Arc::new(AtomicBool::new(false)),
961 prompt_stop_reason: std::sync::Mutex::new(None),
962 visible_call_states: std::sync::Mutex::new(HashMap::new()),
963 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
964 in_process: None,
965 }
966 }
967
968 pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
971 let exported_functions = vm.load_module_exports(module_path).await?;
972 Ok(Self {
973 next_id: AtomicU64::new(1),
974 pending: Arc::new(Mutex::new(HashMap::new())),
975 cancelled: Arc::new(AtomicBool::new(false)),
976 cancel_notify: Arc::new(Notify::new()),
977 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
978 session_id: std::sync::Mutex::new(String::new()),
979 script_name: std::sync::Mutex::new(String::new()),
980 queued_transcript_injections: HostBridgeInjectionState::default(),
981 resume_requested: Arc::new(AtomicBool::new(false)),
982 skills_reload_requested: Arc::new(AtomicBool::new(false)),
983 daemon_idle: Arc::new(AtomicBool::new(false)),
984 prompt_stop_reason: std::sync::Mutex::new(None),
985 visible_call_states: std::sync::Mutex::new(HashMap::new()),
986 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
987 in_process: Some(InProcessHost {
988 module_path: module_path.to_path_buf(),
989 exported_functions,
990 vm,
991 }),
992 })
993 }
994
995 pub fn set_session_id(&self, id: &str) {
997 *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
998 }
999
1000 pub fn set_script_name(&self, name: &str) {
1002 *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
1003 }
1004
1005 fn get_script_name(&self) -> String {
1007 self.script_name
1008 .lock()
1009 .unwrap_or_else(|e| e.into_inner())
1010 .clone()
1011 }
1012
1013 pub fn get_session_id(&self) -> String {
1015 self.session_id
1016 .lock()
1017 .unwrap_or_else(|e| e.into_inner())
1018 .clone()
1019 }
1020
1021 fn write_line(&self, line: &str) -> Result<(), VmError> {
1023 (self.writer)(line).map_err(VmError::Runtime)
1024 }
1025
1026 pub async fn call(
1029 &self,
1030 method: &str,
1031 params: serde_json::Value,
1032 ) -> Result<serde_json::Value, VmError> {
1033 if let Some(in_process) = &self.in_process {
1034 return in_process.dispatch(method, params).await;
1035 }
1036
1037 if self.is_cancelled() {
1038 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1039 }
1040
1041 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
1042 let cancel_wait = self.cancel_notify.notified();
1043 tokio::pin!(cancel_wait);
1044
1045 let request = crate::jsonrpc::request(id, method, params);
1046
1047 let (tx, rx) = oneshot::channel();
1048 {
1049 let mut pending = self.pending.lock().await;
1050 pending.insert(id, tx);
1051 }
1052
1053 let line = serde_json::to_string(&request)
1054 .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
1055 if let Err(e) = self.write_line(&line) {
1056 let mut pending = self.pending.lock().await;
1057 pending.remove(&id);
1058 return Err(e);
1059 }
1060
1061 if self.is_cancelled() {
1062 let mut pending = self.pending.lock().await;
1063 pending.remove(&id);
1064 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1065 }
1066
1067 let response = tokio::select! {
1068 result = rx => match result {
1069 Ok(msg) => msg,
1070 Err(_) => {
1071 return Err(VmError::Runtime(
1073 "Bridge: host closed connection before responding".into(),
1074 ));
1075 }
1076 },
1077 _ = &mut cancel_wait => {
1078 let mut pending = self.pending.lock().await;
1079 pending.remove(&id);
1080 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1081 }
1082 _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
1083 let mut pending = self.pending.lock().await;
1084 pending.remove(&id);
1085 return Err(VmError::Runtime(format!(
1086 "Bridge: host did not respond to '{method}' within {}s",
1087 DEFAULT_TIMEOUT.as_secs()
1088 )));
1089 }
1090 };
1091
1092 if let Some(error) = response.get("error") {
1093 let message = error["message"].as_str().unwrap_or("Unknown host error");
1094 let code = error["code"].as_i64().unwrap_or(-1);
1095 if code == -32001 {
1097 return Err(VmError::CategorizedError {
1098 message: message.to_string(),
1099 category: ErrorCategory::ToolRejected,
1100 });
1101 }
1102 return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
1103 }
1104
1105 Ok(response["result"].clone())
1106 }
1107
1108 pub fn notify(&self, method: &str, params: serde_json::Value) {
1111 let notification = crate::jsonrpc::notification(method, params);
1112 if self.in_process.is_some() {
1113 return;
1114 }
1115 if let Ok(line) = serde_json::to_string(¬ification) {
1116 let _ = self.write_line(&line);
1117 }
1118 }
1119
    /// Reports whether the cancellation flag is currently set.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }
1124
    /// Returns whether a resume was requested and clears the flag in the same atomic step.
    pub fn take_resume_signal(&self) -> bool {
        self.resume_requested.swap(false, Ordering::SeqCst)
    }
1128
    /// Sets the resume flag that `take_resume_signal` consumes.
    pub fn signal_resume(&self) {
        self.resume_requested.store(true, Ordering::SeqCst);
    }
1132
    /// Records the daemon-idle flag.
    pub fn set_daemon_idle(&self, idle: bool) {
        self.daemon_idle.store(idle, Ordering::SeqCst);
    }
1136
1137 pub fn is_daemon_idle(&self) -> bool {
1138 self.daemon_idle.load(Ordering::SeqCst)
1139 }
1140
1141 pub fn set_prompt_stop_reason(&self, reason: &str) {
1146 *self
1147 .prompt_stop_reason
1148 .lock()
1149 .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
1150 }
1151
1152 pub fn take_prompt_stop_reason(&self) -> Option<String> {
1157 self.prompt_stop_reason
1158 .lock()
1159 .unwrap_or_else(|e| e.into_inner())
1160 .take()
1161 }
1162
1163 pub fn take_skills_reload_signal(&self) -> bool {
1168 self.skills_reload_requested.swap(false, Ordering::SeqCst)
1169 }
1170
1171 pub fn signal_skills_reload(&self) {
1175 self.skills_reload_requested.store(true, Ordering::SeqCst);
1176 }
1177
1178 pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
1184 let result = self.call("skills/list", serde_json::json!({})).await?;
1185 match result {
1186 serde_json::Value::Array(items) => Ok(items),
1187 serde_json::Value::Object(map) => match map.get("skills") {
1188 Some(serde_json::Value::Array(items)) => Ok(items.clone()),
1189 _ => Err(VmError::Runtime(
1190 "skills/list: host response must be an array or { skills: [...] }".into(),
1191 )),
1192 },
1193 _ => Err(VmError::Runtime(
1194 "skills/list: unexpected response shape".into(),
1195 )),
1196 }
1197 }
1198
1199 pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
1205 let result = self.call("host/tools/list", serde_json::json!({})).await?;
1206 parse_host_tools_list_response(result)
1207 }
1208
1209 pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
1213 self.call("skills/fetch", serde_json::json!({ "id": id }))
1214 .await
1215 }
1216
1217 pub fn injection_state(&self) -> HostBridgeInjectionState {
1218 self.queued_transcript_injections.clone()
1219 }
1220
1221 pub async fn push_pending_user_message(
1222 &self,
1223 content: String,
1224 transcript_content: serde_json::Value,
1225 mode: &str,
1226 ) -> String {
1227 self.queued_transcript_injections
1228 .push_pending_user_message(content, transcript_content, mode)
1229 .await
1230 }
1231
1232 pub async fn push_queued_user_message(&self, content: String, mode: &str) -> String {
1233 self.push_pending_user_message(content.clone(), serde_json::Value::String(content), mode)
1234 .await
1235 }
1236
1237 pub async fn revoke_pending_user_message(
1238 &self,
1239 message_id: &str,
1240 ) -> PendingUserMessageMutationResult {
1241 self.queued_transcript_injections
1242 .revoke_pending_user_message(message_id)
1243 .await
1244 }
1245
1246 pub async fn revoke_pending_reminder(
1247 &self,
1248 reminder_id: &str,
1249 ) -> PendingReminderMutationResult {
1250 self.queued_transcript_injections
1251 .revoke_pending_reminder(reminder_id)
1252 .await
1253 }
1254
1255 pub async fn pending_injections_json(&self) -> serde_json::Value {
1256 self.queued_transcript_injections
1257 .pending_injections_json()
1258 .await
1259 }
1260
1261 pub async fn replace_pending_user_message(
1262 &self,
1263 message_id: &str,
1264 content: String,
1265 transcript_content: serde_json::Value,
1266 ) -> PendingUserMessageMutationResult {
1267 self.queued_transcript_injections
1268 .replace_pending_user_message(message_id, content, transcript_content)
1269 .await
1270 }
1271
1272 pub async fn push_queued_session_remind_from_params(
1273 &self,
1274 params: &serde_json::Value,
1275 ) -> Result<String, String> {
1276 let reminder = queued_session_remind_from_params(params)?;
1277 let reminder_id = reminder.reminder.id.clone();
1278 self.queued_transcript_injections
1279 .push_session_reminder(reminder)
1280 .await;
1281 Ok(reminder_id)
1282 }
1283
1284 pub async fn take_queued_user_messages(
1285 &self,
1286 include_interrupt_immediate: bool,
1287 include_finish_step: bool,
1288 include_audit_only: bool,
1289 ) -> Vec<QueuedUserMessage> {
1290 let mut state = self.queued_transcript_injections.inner.lock().await;
1291 let mut selected = Vec::new();
1292 let mut retained = VecDeque::new();
1293 while let Some(injection) = state.queue.pop_front() {
1294 let should_take = match injection.mode() {
1295 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1296 QueuedUserMessageMode::FinishStep => include_finish_step,
1297 QueuedUserMessageMode::AuditOnly => include_audit_only,
1298 };
1299 match (should_take, injection) {
1300 (true, QueuedTranscriptInjection::User(message)) => {
1301 state
1302 .delivered_user_message_ids
1303 .insert(message.message_id.clone());
1304 selected.push(message);
1305 }
1306 (_, injection) => retained.push_back(injection),
1307 }
1308 }
1309 state.queue = retained;
1310 selected
1311 }
1312
1313 pub async fn take_queued_transcript_injections(
1314 &self,
1315 include_interrupt_immediate: bool,
1316 include_finish_step: bool,
1317 include_audit_only: bool,
1318 ) -> Vec<QueuedTranscriptInjection> {
1319 let mut state = self.queued_transcript_injections.inner.lock().await;
1320 let mut selected = Vec::new();
1321 let mut retained = VecDeque::new();
1322 while let Some(injection) = state.queue.pop_front() {
1323 let should_take = match injection.mode() {
1324 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1325 QueuedUserMessageMode::FinishStep => include_finish_step,
1326 QueuedUserMessageMode::AuditOnly => include_audit_only,
1327 };
1328 if should_take {
1329 match &injection {
1330 QueuedTranscriptInjection::User(message) => {
1331 state
1332 .delivered_user_message_ids
1333 .insert(message.message_id.clone());
1334 }
1335 QueuedTranscriptInjection::Reminder(reminder) => {
1336 state
1337 .delivered_reminder_ids
1338 .insert(reminder.reminder.id.clone());
1339 }
1340 }
1341 selected.push(injection);
1342 } else {
1343 retained.push_back(injection);
1344 }
1345 }
1346 state.queue = retained;
1347 selected
1348 }
1349
1350 pub async fn take_queued_user_messages_for(
1351 &self,
1352 checkpoint: DeliveryCheckpoint,
1353 ) -> Vec<QueuedUserMessage> {
1354 match checkpoint {
1355 DeliveryCheckpoint::InterruptImmediate => {
1356 self.take_queued_user_messages(true, false, false).await
1357 }
1358 DeliveryCheckpoint::AfterCurrentOperation => {
1359 self.take_queued_user_messages(false, true, false).await
1360 }
1361 DeliveryCheckpoint::EndOfInteraction => {
1362 self.take_queued_user_messages(false, false, true).await
1363 }
1364 }
1365 }
1366
1367 pub async fn take_queued_transcript_injections_for(
1368 &self,
1369 checkpoint: DeliveryCheckpoint,
1370 ) -> Vec<QueuedTranscriptInjection> {
1371 match checkpoint {
1372 DeliveryCheckpoint::InterruptImmediate => {
1373 self.take_queued_transcript_injections(true, false, false)
1374 .await
1375 }
1376 DeliveryCheckpoint::AfterCurrentOperation => {
1377 self.take_queued_transcript_injections(false, true, false)
1378 .await
1379 }
1380 DeliveryCheckpoint::EndOfInteraction => {
1381 self.take_queued_transcript_injections(false, false, true)
1382 .await
1383 }
1384 }
1385 }
1386
1387 pub fn send_output(&self, text: &str) {
1389 self.notify("output", serde_json::json!({"text": text}));
1390 }
1391
1392 pub fn send_progress(
1394 &self,
1395 phase: &str,
1396 message: &str,
1397 progress: Option<i64>,
1398 total: Option<i64>,
1399 data: Option<serde_json::Value>,
1400 ) {
1401 let mut payload = serde_json::json!({"phase": phase, "message": message});
1402 if let Some(p) = progress {
1403 payload["progress"] = serde_json::json!(p);
1404 }
1405 if let Some(t) = total {
1406 payload["total"] = serde_json::json!(t);
1407 }
1408 if let Some(d) = data {
1409 payload["data"] = d;
1410 }
1411 self.notify("progress", payload);
1412 }
1413
1414 pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1416 let mut payload = serde_json::json!({"level": level, "message": message});
1417 if let Some(f) = fields {
1418 payload["fields"] = f;
1419 }
1420 self.notify("log", payload);
1421 }
1422
1423 pub fn send_call_start(
1426 &self,
1427 call_id: &str,
1428 call_type: &str,
1429 name: &str,
1430 metadata: serde_json::Value,
1431 ) {
1432 let session_id = self.get_session_id();
1433 let script = self.get_script_name();
1434 let stream_publicly = metadata
1435 .get("stream_publicly")
1436 .and_then(|value| value.as_bool())
1437 .unwrap_or(true);
1438 self.visible_call_streams
1439 .lock()
1440 .unwrap_or_else(|e| e.into_inner())
1441 .insert(call_id.to_string(), stream_publicly);
1442 self.notify(
1443 "session/update",
1444 serde_json::json!({
1445 "sessionId": session_id,
1446 "update": {
1447 "sessionUpdate": "call_start",
1448 "content": {
1449 "toolCallId": call_id,
1450 "call_type": call_type,
1451 "name": name,
1452 "script": script,
1453 "metadata": metadata,
1454 },
1455 },
1456 }),
1457 );
1458 }
1459
1460 pub fn send_call_progress(
1463 &self,
1464 call_id: &str,
1465 delta: &str,
1466 accumulated_tokens: u64,
1467 user_visible: bool,
1468 ) {
1469 let session_id = self.get_session_id();
1470 let (visible_text, visible_delta) = {
1471 let stream_publicly = self
1472 .visible_call_streams
1473 .lock()
1474 .unwrap_or_else(|e| e.into_inner())
1475 .get(call_id)
1476 .copied()
1477 .unwrap_or(true);
1478 let mut states = self
1479 .visible_call_states
1480 .lock()
1481 .unwrap_or_else(|e| e.into_inner());
1482 let state = states.entry(call_id.to_string()).or_default();
1483 state.push(delta, stream_publicly)
1484 };
1485 self.notify(
1486 "session/update",
1487 serde_json::json!({
1488 "sessionId": session_id,
1489 "update": {
1490 "sessionUpdate": "call_progress",
1491 "content": {
1492 "toolCallId": call_id,
1493 "delta": delta,
1494 "accumulated_tokens": accumulated_tokens,
1495 "visible_text": visible_text,
1496 "visible_delta": visible_delta,
1497 "user_visible": user_visible,
1498 },
1499 },
1500 }),
1501 );
1502 }
1503
1504 pub fn send_call_end(
1506 &self,
1507 call_id: &str,
1508 call_type: &str,
1509 name: &str,
1510 duration_ms: u64,
1511 status: &str,
1512 metadata: serde_json::Value,
1513 ) {
1514 let session_id = self.get_session_id();
1515 let script = self.get_script_name();
1516 self.visible_call_states
1517 .lock()
1518 .unwrap_or_else(|e| e.into_inner())
1519 .remove(call_id);
1520 self.visible_call_streams
1521 .lock()
1522 .unwrap_or_else(|e| e.into_inner())
1523 .remove(call_id);
1524 self.notify(
1525 "session/update",
1526 serde_json::json!({
1527 "sessionId": session_id,
1528 "update": {
1529 "sessionUpdate": "call_end",
1530 "content": {
1531 "toolCallId": call_id,
1532 "call_type": call_type,
1533 "name": name,
1534 "script": script,
1535 "duration_ms": duration_ms,
1536 "status": status,
1537 "metadata": metadata,
1538 },
1539 },
1540 }),
1541 );
1542 }
1543
1544 pub fn send_worker_update(
1546 &self,
1547 worker_id: &str,
1548 worker_name: &str,
1549 status: &str,
1550 metadata: serde_json::Value,
1551 audit: Option<&MutationSessionRecord>,
1552 ) {
1553 let session_id = self.get_session_id();
1554 let script = self.get_script_name();
1555 let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1556 let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1557 let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1558 let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1559 let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1560 let lifecycle = serde_json::json!({
1561 "event": status,
1562 "worker_id": worker_id,
1563 "worker_name": worker_name,
1564 "started_at": started_at,
1565 "finished_at": finished_at,
1566 });
1567 self.notify(
1568 "session/update",
1569 serde_json::json!({
1570 "sessionId": session_id,
1571 "update": {
1572 "sessionUpdate": "worker_update",
1573 "content": {
1574 "worker_id": worker_id,
1575 "worker_name": worker_name,
1576 "status": status,
1577 "script": script,
1578 "started_at": started_at,
1579 "finished_at": finished_at,
1580 "snapshot_path": snapshot_path,
1581 "run_id": run_id,
1582 "run_path": run_path,
1583 "lifecycle": lifecycle,
1584 "audit": audit,
1585 "metadata": metadata,
1586 },
1587 },
1588 }),
1589 );
1590 }
1591}
1592
1593pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1595 crate::stdlib::json_to_vm_value(val)
1596}
1597
1598fn parse_host_tools_list_response(
1599 result: serde_json::Value,
1600) -> Result<Vec<serde_json::Value>, VmError> {
1601 let tools = match result {
1602 serde_json::Value::Array(items) => items,
1603 serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1604 map.get("result")
1605 .and_then(|value| value.get("tools"))
1606 .cloned()
1607 }) {
1608 Some(serde_json::Value::Array(items)) => items,
1609 _ => {
1610 return Err(VmError::Runtime(
1611 "host/tools/list: host response must be an array or { tools: [...] }".into(),
1612 ));
1613 }
1614 },
1615 _ => {
1616 return Err(VmError::Runtime(
1617 "host/tools/list: unexpected response shape".into(),
1618 ));
1619 }
1620 };
1621
1622 let mut normalized = Vec::with_capacity(tools.len());
1623 for tool in tools {
1624 let serde_json::Value::Object(map) = tool else {
1625 return Err(VmError::Runtime(
1626 "host/tools/list: every tool must be an object".into(),
1627 ));
1628 };
1629 let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1630 return Err(VmError::Runtime(
1631 "host/tools/list: every tool must include a string `name`".into(),
1632 ));
1633 };
1634 let description = map
1635 .get("description")
1636 .and_then(|value| value.as_str())
1637 .or_else(|| {
1638 map.get("short_description")
1639 .and_then(|value| value.as_str())
1640 })
1641 .unwrap_or_default();
1642 let schema = map
1643 .get("schema")
1644 .cloned()
1645 .or_else(|| map.get("parameters").cloned())
1646 .or_else(|| map.get("input_schema").cloned())
1647 .unwrap_or(serde_json::Value::Null);
1648 let deprecated = map
1649 .get("deprecated")
1650 .and_then(|value| value.as_bool())
1651 .unwrap_or(false);
1652 normalized.push(serde_json::json!({
1653 "name": name,
1654 "description": description,
1655 "schema": schema,
1656 "deprecated": deprecated,
1657 }));
1658 }
1659 Ok(normalized)
1660}
1661
1662#[cfg(test)]
1663mod tests {
1664 use super::*;
1665
1666 fn test_bridge() -> HostBridge {
1667 HostBridge::from_parts(
1668 Arc::new(Mutex::new(HashMap::new())),
1669 Arc::new(AtomicBool::new(false)),
1670 Arc::new(std::sync::Mutex::new(())),
1671 1,
1672 )
1673 }
1674
1675 fn test_bridge_sharing_injection_state(owner: &HostBridge) -> HostBridge {
1676 HostBridge::from_parts_with_writer_cancel_notify_and_injection_state(
1677 Arc::new(Mutex::new(HashMap::new())),
1678 Arc::new(AtomicBool::new(false)),
1679 Arc::new(Notify::new()),
1680 Arc::new(|_| Ok(())),
1681 100,
1682 Some(owner.injection_state()),
1683 )
1684 }
1685
#[test]
fn test_json_rpc_request_format() {
    // A serialized request must carry the version tag, the id and the method.
    let params = serde_json::json!({
        "prompt": "Hello",
        "system": "Be helpful",
    });
    let request = crate::jsonrpc::request(1, "llm_call", params);
    let encoded = serde_json::to_string(&request).unwrap();
    let expected_fragments = [
        "\"jsonrpc\":\"2.0\"",
        "\"id\":1",
        "\"method\":\"llm_call\"",
    ];
    for fragment in expected_fragments {
        assert!(encoded.contains(fragment), "missing {fragment} in {encoded}");
    }
}
1701
#[test]
fn test_json_rpc_notification_format() {
    // A notification names its method and, unlike a request, has no `id`.
    let notification =
        crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
    let s = serde_json::to_string(&notification).unwrap();
    assert!(s.contains("\"method\":\"output\""));
    assert!(!s.contains("\"id\""));
}
1710
#[test]
fn test_json_rpc_error_response_parsing() {
    // An error response exposes its message under `error.message`.
    let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
    let error = response.get("error");
    assert!(error.is_some());
    let message = response["error"]["message"].as_str().unwrap();
    assert_eq!(message, "Invalid request");
}
1720
#[test]
fn test_json_rpc_success_response_parsing() {
    // A success response exposes its payload under `result`.
    let payload = serde_json::json!({
        "text": "Hello world",
        "input_tokens": 10,
        "output_tokens": 5,
    });
    let response = crate::jsonrpc::response(1, payload);
    assert!(response.get("result").is_some());
    let text = response["result"]["text"].as_str().unwrap();
    assert_eq!(text, "Hello world");
}
1734
#[test]
fn test_cancelled_flag() {
    // The shared flag starts cleared and reads back as set after a store.
    let flag = Arc::new(AtomicBool::new(false));
    let before = flag.load(Ordering::SeqCst);
    assert!(!before);
    flag.store(true, Ordering::SeqCst);
    let after = flag.load(Ordering::SeqCst);
    assert!(after);
}
1742
#[test]
fn pending_host_calls_return_when_cancellation_arrives() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let pending_calls = Arc::new(Mutex::new(HashMap::new()));
        let cancel_flag = Arc::new(AtomicBool::new(false));
        let bridge = HostBridge::from_parts_with_writer(
            pending_calls.clone(),
            cancel_flag.clone(),
            Arc::new(|_| Ok(())),
            1,
        );

        let in_flight = bridge.call("host/work", serde_json::json!({}));
        tokio::pin!(in_flight);

        // Drive the call until it has registered itself as pending; it must
        // not complete on its own because nothing ever answers it.
        loop {
            tokio::select! {
                result = &mut in_flight => panic!("call completed before cancellation: {result:?}"),
                _ = tokio::task::yield_now() => {}
            }
            if !pending_calls.lock().await.is_empty() {
                break;
            }
        }

        // Raise the flag and wake the waiter.
        cancel_flag.store(true, Ordering::SeqCst);
        bridge.cancel_notify.notify_waiters();

        let outcome = tokio::time::timeout(Duration::from_secs(1), in_flight)
            .await
            .expect("pending call should observe cancellation promptly");
        assert!(
            matches!(outcome, Err(VmError::Runtime(message)) if message.contains("cancelled"))
        );
        // The cancelled call must also have removed its pending entry.
        assert!(pending_calls.lock().await.is_empty());
    });
}
1784
#[test]
fn queued_messages_are_filtered_by_delivery_mode() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let bridge = test_bridge();
        for (text, mode) in [("first", "finish_step"), ("second", "audit_only")] {
            bridge
                .push_queued_user_message(text.to_string(), mode)
                .await;
        }

        // Only the finish-step message is taken when that mode alone is enabled.
        let taken_finish_step = bridge.take_queued_user_messages(false, true, false).await;
        assert_eq!(taken_finish_step.len(), 1);
        assert_eq!(taken_finish_step[0].content, "first");

        // The audit-only message is still queued and is taken separately.
        let taken_audit_only = bridge.take_queued_user_messages(false, false, true).await;
        assert_eq!(taken_audit_only.len(), 1);
        assert_eq!(taken_audit_only[0].content, "second");
    });
}
1809
#[test]
fn pending_user_messages_support_revoke_replace_and_delivery_states() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let bridge = test_bridge();
        let first_id = bridge
            .push_pending_user_message(
                "first".to_string(),
                serde_json::json!("first"),
                "audit_only",
            )
            .await;
        let second_id = bridge
            .push_pending_user_message(
                "second".to_string(),
                serde_json::json!("second"),
                "audit_only",
            )
            .await;

        // While still pending, a message can be edited or revoked.
        let replaced = bridge
            .replace_pending_user_message(
                &second_id,
                "second edited".to_string(),
                serde_json::json!("second edited"),
            )
            .await;
        assert_eq!(replaced, PendingUserMessageMutationResult::Mutated);
        let revoked = bridge.revoke_pending_user_message(&first_id).await;
        assert_eq!(revoked, PendingUserMessageMutationResult::Mutated);
        // A second revoke of the same id reports that it was already revoked.
        let revoked_again = bridge.revoke_pending_user_message(&first_id).await;
        assert_eq!(
            revoked_again,
            PendingUserMessageMutationResult::AlreadyRevoked
        );

        // Delivery yields only the surviving, edited message.
        let delivered = bridge
            .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
            .await;
        assert_eq!(delivered.len(), 1);
        assert_eq!(delivered[0].message_id, second_id);
        assert_eq!(delivered[0].content, "second edited");

        // Once delivered, the message can no longer be revoked or replaced.
        let late_revoke = bridge.revoke_pending_user_message(&second_id).await;
        assert_eq!(
            late_revoke,
            PendingUserMessageMutationResult::AlreadyDelivered
        );
        let late_replace = bridge
            .replace_pending_user_message(
                &second_id,
                "too late".to_string(),
                serde_json::json!("too late"),
            )
            .await;
        assert_eq!(
            late_replace,
            PendingUserMessageMutationResult::AlreadyDelivered
        );

        // Ids that were never queued are reported as unknown.
        let unknown = bridge.revoke_pending_user_message("missing").await;
        assert_eq!(unknown, PendingUserMessageMutationResult::UnknownMessageId);
    });
}
1879
#[test]
fn pending_user_message_replace_preserves_fifo_position_and_mode() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let bridge = test_bridge();
        let first_id = bridge
            .push_pending_user_message(
                "first".to_string(),
                serde_json::json!("first"),
                "finish_step",
            )
            .await;
        let second_id = bridge
            .push_pending_user_message(
                "second".to_string(),
                serde_json::json!("second"),
                "finish_step",
            )
            .await;

        // Edit the message at the head of the queue.
        let replaced = bridge
            .replace_pending_user_message(
                &first_id,
                "first edited".to_string(),
                serde_json::json!("first edited"),
            )
            .await;
        assert_eq!(replaced, PendingUserMessageMutationResult::Mutated);

        // The edited message keeps its position ahead of the second one and
        // keeps its original mode.
        let delivered = bridge
            .take_queued_user_messages_for(DeliveryCheckpoint::AfterCurrentOperation)
            .await;
        let observed = delivered
            .iter()
            .map(|message| (&message.message_id, message.content.as_str(), message.mode))
            .collect::<Vec<_>>();
        let expected = vec![
            (&first_id, "first edited", QueuedUserMessageMode::FinishStep),
            (&second_id, "second", QueuedUserMessageMode::FinishStep),
        ];
        assert_eq!(observed, expected);
    });
}
1928
#[test]
fn pending_user_message_state_survives_bridge_replacement() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let original = test_bridge();
        let revoked_id = original
            .push_pending_user_message(
                "revoke me".to_string(),
                serde_json::json!("revoke me"),
                "audit_only",
            )
            .await;
        let delivered_id = original
            .push_pending_user_message(
                "deliver me".to_string(),
                serde_json::json!("deliver me"),
                "audit_only",
            )
            .await;
        let revoked = original.revoke_pending_user_message(&revoked_id).await;
        assert_eq!(revoked, PendingUserMessageMutationResult::Mutated);
        original.cancelled.store(true, Ordering::SeqCst);

        // The replacement bridge shares the injection state, so it sees the
        // earlier revocation and the still-queued message.
        let replacement = test_bridge_sharing_injection_state(&original);
        let revoked_again = replacement.revoke_pending_user_message(&revoked_id).await;
        assert_eq!(
            revoked_again,
            PendingUserMessageMutationResult::AlreadyRevoked
        );
        let delivered = replacement
            .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
            .await;
        assert_eq!(delivered.len(), 1);
        assert_eq!(delivered[0].message_id, delivered_id);
        assert_eq!(delivered[0].content, "deliver me");

        // Delivery through the replacement is visible from the original too.
        let late_revoke = original.revoke_pending_user_message(&delivered_id).await;
        assert_eq!(
            late_revoke,
            PendingUserMessageMutationResult::AlreadyDelivered
        );
    });
}
1976
#[test]
fn queued_transcript_injections_preserve_user_reminder_separation() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let bridge = test_bridge();
        bridge
            .push_queued_user_message("human follow-up".to_string(), "finish_step")
            .await;
        let remind_params = serde_json::json!({
            "body": "Host-provided ambient context.",
            "tags": ["host"],
            "dedupe_key": "host-context",
            "ttl_turns": 2,
            "mode": "audit_only",
            "_meta": {"harn": {"source": "test"}},
        });
        let reminder_id = bridge
            .push_queued_session_remind_from_params(&remind_params)
            .await
            .expect("valid reminder");

        // The user-message API returns the user message...
        let finish_step = bridge.take_queued_user_messages(false, true, false).await;
        assert_eq!(finish_step.len(), 1);
        assert_eq!(finish_step[0].content, "human follow-up");

        // ...but never a reminder, even when the reminder's mode is enabled.
        let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
        assert!(no_user_messages.is_empty());

        // The reminder is still queued and comes out of the injection API.
        let injections = bridge
            .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
            .await;
        assert_eq!(injections.len(), 1);
        let QueuedTranscriptInjection::Reminder(queued) = &injections[0] else {
            panic!("expected queued reminder");
        };
        let reminder = &queued.reminder;
        assert_eq!(reminder.id, reminder_id);
        assert_eq!(reminder.body, "Host-provided ambient context.");
        assert_eq!(reminder.tags, vec!["host".to_string()]);
        assert_eq!(reminder.dedupe_key.as_deref(), Some("host-context"));
        assert_eq!(reminder.ttl_turns, Some(2));
        assert_eq!(
            reminder.source,
            crate::llm::helpers::ReminderSource::Bridge
        );
    });
}
2028
#[test]
fn pending_injections_list_user_messages_and_reminders_in_fifo_order() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let bridge = test_bridge();
        let message_id = bridge
            .push_pending_user_message(
                "human follow-up".to_string(),
                serde_json::json!([{"type": "text", "text": "human follow-up"}]),
                "finish_step",
            )
            .await;
        let remind_params = serde_json::json!({
            "id": "rem-test",
            "body": "Host reminder",
            "tags": ["host"],
            "dedupe_key": "host-reminder",
            "ttl_turns": 2,
            "mode": "interrupt_immediate",
        });
        let reminder_id = bridge
            .push_queued_session_remind_from_params(&remind_params)
            .await
            .expect("valid session/remind payload");

        let pending = bridge.pending_injections_json().await;
        assert_eq!(pending["pendingCount"], 2);

        // First entry: the user message, in push order.
        let user_entry = &pending["injections"][0];
        assert_eq!(user_entry["kind"], "user");
        assert_eq!(user_entry["id"], message_id);
        assert_eq!(user_entry["messageId"], message_id);
        assert_eq!(user_entry["mode"], "finish_step");
        assert_eq!(user_entry["position"], 0);

        // Second entry: the reminder.
        let reminder_entry = &pending["injections"][1];
        assert_eq!(reminder_entry["kind"], "reminder");
        assert_eq!(reminder_entry["id"], reminder_id);
        assert_eq!(reminder_entry["reminderId"], "rem-test");
        assert_eq!(reminder_entry["mode"], "interrupt_immediate");
        assert_eq!(reminder_entry["body"], "Host reminder");
        assert_eq!(reminder_entry["dedupeKey"], "host-reminder");
        assert_eq!(reminder_entry["ttlTurns"], 2);
        assert_eq!(reminder_entry["position"], 1);
    });
}
2073
#[test]
fn pending_reminders_support_revoke_and_delivery_states() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        let bridge = test_bridge();
        let revoke_params = serde_json::json!({
            "id": "rem-revoke",
            "body": "remove me",
            "mode": "finish_step",
        });
        let revoked_id = bridge
            .push_queued_session_remind_from_params(&revoke_params)
            .await
            .expect("valid session/remind payload");
        let deliver_params = serde_json::json!({
            "id": "rem-deliver",
            "body": "deliver me",
            "mode": "finish_step",
        });
        let delivered_id = bridge
            .push_queued_session_remind_from_params(&deliver_params)
            .await
            .expect("valid session/remind payload");

        // Revoking succeeds once; a second attempt reports the earlier revoke.
        let first_revoke = bridge.revoke_pending_reminder(&revoked_id).await;
        assert_eq!(first_revoke, PendingReminderMutationResult::Mutated);
        let second_revoke = bridge.revoke_pending_reminder(&revoked_id).await;
        assert_eq!(second_revoke, PendingReminderMutationResult::AlreadyRevoked);

        // Only the non-revoked reminder is still listed as pending.
        let pending = bridge.pending_injections_json().await;
        assert_eq!(pending["pendingCount"], 1);
        assert_eq!(pending["injections"][0]["reminderId"], delivered_id);

        let delivered = bridge
            .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
            .await;
        assert_eq!(delivered.len(), 1);
        let QueuedTranscriptInjection::Reminder(queued) = &delivered[0] else {
            panic!("expected delivered reminder");
        };
        assert_eq!(queued.reminder.id, delivered_id);

        // After delivery the reminder can no longer be revoked.
        let late_revoke = bridge.revoke_pending_reminder(&delivered_id).await;
        assert_eq!(late_revoke, PendingReminderMutationResult::AlreadyDelivered);
        // Ids that were never queued are reported as unknown.
        let unknown = bridge.revoke_pending_reminder("missing").await;
        assert_eq!(unknown, PendingReminderMutationResult::UnknownReminderId);
    });
}
2131
#[test]
fn bridge_remind_modes_honor_delivery_checkpoints() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    rt.block_on(async {
        // (mode, checkpoint that must deliver it, checkpoint that must not).
        let cases = [
            (
                "interrupt_immediate",
                DeliveryCheckpoint::InterruptImmediate,
                DeliveryCheckpoint::AfterCurrentOperation,
            ),
            (
                "finish_step",
                DeliveryCheckpoint::AfterCurrentOperation,
                DeliveryCheckpoint::EndOfInteraction,
            ),
            (
                "audit_only",
                DeliveryCheckpoint::EndOfInteraction,
                DeliveryCheckpoint::InterruptImmediate,
            ),
        ];

        for (mode, expected_checkpoint, wrong_checkpoint) in cases {
            // A fresh bridge per case keeps the queues independent.
            let bridge = test_bridge();
            let body = format!("Reminder for {mode}");
            let params = serde_json::json!({
                "body": body,
                "mode": mode,
            });
            bridge
                .push_queued_session_remind_from_params(&params)
                .await
                .expect("valid session/remind payload");

            let premature = bridge
                .take_queued_transcript_injections_for(wrong_checkpoint)
                .await;
            assert!(
                premature.is_empty(),
                "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
            );

            let delivered = bridge
                .take_queued_transcript_injections_for(expected_checkpoint)
                .await;
            assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
            let QueuedTranscriptInjection::Reminder(queued) = &delivered[0] else {
                panic!("expected reminder for {mode}");
            };
            assert_eq!(queued.reminder.body, body);
        }
    });
}
2186
#[test]
fn session_remind_validation_rejects_user_message_shape() {
    // A payload with `content` but no `body` is not a valid reminder.
    let params = serde_json::json!({
        "content": "this is still a user message",
        "mode": "interrupt_immediate",
    });
    let message = queued_session_remind_from_params(&params)
        .expect_err("session/remind must require a reminder body");
    assert!(message.contains(Code::ReminderInvalidShape.as_str()));
    assert!(message.contains("body"));
}
2197
#[test]
fn session_remind_validation_rejects_unknown_options_separately() {
    // An unrecognized top-level key is rejected and named in the error.
    let params = serde_json::json!({
        "body": "valid body",
        "unknown_host_field": true,
    });
    let message = queued_session_remind_from_params(&params)
        .expect_err("session/remind must reject unknown top-level fields");
    assert!(message.contains(Code::ReminderUnknownOption.as_str()));
    assert!(message.contains("unknown_host_field"));
}
2208
#[test]
fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
    // The `propagate` value "workspace" is rejected with a dedicated code.
    let params = serde_json::json!({
        "body": "valid body",
        "propagate": "workspace",
    });
    let message = queued_session_remind_from_params(&params)
        .expect_err("session/remind must reject unknown propagate values");
    assert!(message.contains(Code::ReminderUnknownPropagate.as_str()));
    assert!(message.contains("propagate"));
}
2219
#[test]
fn test_json_result_to_vm_value_string() {
    // A JSON string converts to a value that displays as the bare text.
    let json = serde_json::json!("hello");
    let converted = json_result_to_vm_value(&json);
    assert_eq!(converted.display(), "hello");
}
2226
#[test]
fn test_json_result_to_vm_value_dict() {
    // A JSON object converts to a dict whose entries are converted as well.
    let json = serde_json::json!({"name": "test", "count": 42});
    let vm_val = json_result_to_vm_value(&json);
    let VmValue::Dict(entries) = &vm_val else {
        unreachable!("Expected Dict, got {:?}", vm_val);
    };
    for (key, shown) in [("name", "test"), ("count", "42")] {
        assert_eq!(entries.get(key).unwrap().display(), shown);
    }
}
2237
/// JSON `null` converts to `VmValue::Nil`.
#[test]
fn test_json_result_to_vm_value_null() {
    let converted = json_result_to_vm_value(&serde_json::Value::Null);
    assert!(matches!(converted, VmValue::Nil));
}
2244
/// A nested JSON object converts to a `VmValue::Dict` whose array-valued entry
/// becomes a `VmValue::List` with the same number of elements.
#[test]
fn test_json_result_to_vm_value_nested() {
    let val = serde_json::json!({
        "text": "response",
        "tool_calls": [
            {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
        ],
        "input_tokens": 100,
        "output_tokens": 50,
    });
    let vm_val = json_result_to_vm_value(&val);
    // Wrong variants are failed expectations, not unreachable code, so report
    // them with `panic!` to keep the failure message accurate.
    let VmValue::Dict(d) = &vm_val else {
        panic!("expected Dict, got {vm_val:?}");
    };
    let text = d.get("text").expect("dict should contain `text`");
    assert_eq!(text.display(), "response");
    let tool_calls = d.get("tool_calls").expect("dict should contain `tool_calls`");
    let VmValue::List(list) = tool_calls else {
        panic!("expected List for tool_calls");
    };
    assert_eq!(list.len(), 1);
}
2265
/// A response of the form `{"tools": [...]}` parses to a single tool whose
/// `name` is kept and whose `deprecated` entry reads `false` although the
/// input did not set it.
#[test]
fn parse_host_tools_list_accepts_object_wrapper() {
    let response = serde_json::json!({
        "tools": [
            {
                "name": "Read",
                "description": "Read a file",
                "schema": {"type": "object"},
            }
        ]
    });
    let tools = parse_host_tools_list_response(response).expect("tool list");

    assert_eq!(tools.len(), 1);
    let tool = &tools[0];
    assert_eq!(tool["name"], "Read");
    assert_eq!(tool["deprecated"], false);
}
2283
/// A response nested under `result` that uses the compatibility field names
/// (`short_description`, `input_schema`) parses to a tool exposing them as
/// `description` and `schema`, with an explicit `deprecated` flag kept.
#[test]
fn parse_host_tools_list_accepts_compat_fields() {
    let response = serde_json::json!({
        "result": {
            "tools": [
                {
                    "name": "Edit",
                    "short_description": "Apply an edit",
                    "input_schema": {"type": "object"},
                    "deprecated": true,
                }
            ]
        }
    });
    let tools = parse_host_tools_list_response(response).expect("tool list");

    // Check the length first so an empty result fails with a clear assertion
    // rather than an index-out-of-bounds panic below.
    assert_eq!(tools.len(), 1);
    let tool = &tools[0];
    assert_eq!(tool["description"], "Apply an edit");
    assert_eq!(tool["schema"]["type"], "object");
    assert_eq!(tool["deprecated"], true);
}
2304
/// A tool entry without a `name` must make parsing fail with the
/// "every tool must include a string `name`" message.
#[test]
fn parse_host_tools_list_requires_tool_names() {
    let response = serde_json::json!({
        "tools": [
            {"description": "missing name"}
        ]
    });
    let message = parse_host_tools_list_response(response)
        .expect_err("expected error")
        .to_string();
    assert!(message.contains("host/tools/list: every tool must include a string `name`"));
}
2317
/// Pins the default timeout at 300 whole seconds (five minutes).
#[test]
fn test_timeout_duration() {
    let seconds = DEFAULT_TIMEOUT.as_secs();
    assert_eq!(seconds, 300);
}
2322}