1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::future::Future;
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15
16use tokio::io::AsyncBufReadExt;
17use tokio::sync::{oneshot, Mutex, Notify};
18
19use harn_parser::diagnostic_codes::Code;
20
21use crate::orchestration::MutationSessionRecord;
22use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
23use crate::visible_text::VisibleTextState;
24use crate::vm::Vm;
25
26const DEFAULT_TIMEOUT: Duration = Duration::from_mins(5);
28
29pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
30
31fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
32 Arc::new(move |line: &str| {
33 let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
34 let mut stdout = std::io::stdout().lock();
35 stdout
36 .write_all(line.as_bytes())
37 .map_err(|e| format!("Bridge write error: {e}"))?;
38 stdout
39 .write_all(b"\n")
40 .map_err(|e| format!("Bridge write error: {e}"))?;
41 stdout
42 .flush()
43 .map_err(|e| format!("Bridge flush error: {e}"))?;
44 Ok(())
45 })
46}
47
48pub struct HostBridge {
55 next_id: AtomicU64,
56 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
58 cancelled: Arc<AtomicBool>,
60 cancel_notify: Arc<Notify>,
62 writer: HostBridgeWriter,
64 session_id: std::sync::Mutex<String>,
66 script_name: std::sync::Mutex<String>,
68 queued_transcript_injections: HostBridgeInjectionState,
70 resume_requested: Arc<AtomicBool>,
72 skills_reload_requested: Arc<AtomicBool>,
77 daemon_idle: Arc<AtomicBool>,
79 prompt_stop_reason: std::sync::Mutex<Option<String>>,
85 visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
87 visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
89 in_process: Option<InProcessHost>,
91}
92
93struct InProcessHost {
94 module_path: PathBuf,
95 exported_functions: BTreeMap<String, Arc<VmClosure>>,
96 vm: Vm,
97}
98
99impl InProcessHost {
100 fn dispatch<'a>(
106 &'a self,
107 method: &'a str,
108 params: serde_json::Value,
109 ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, VmError>> + Send + 'a>> {
110 Box::pin(async move {
111 match method {
112 "builtin_call" => {
113 let name = params
114 .get("name")
115 .and_then(|value| value.as_str())
116 .unwrap_or_default();
117 let args = params
118 .get("args")
119 .and_then(|value| value.as_array())
120 .cloned()
121 .unwrap_or_default()
122 .into_iter()
123 .map(|value| json_result_to_vm_value(&value))
124 .collect::<Vec<_>>();
125 self.invoke_export(name, &args).await
126 }
127 "host/tools/list" => self
128 .invoke_optional_export("host_tools_list", &[])
129 .await
130 .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
131 "session/request_permission" => self.request_permission(params).await,
132 other => Err(VmError::Runtime(format!(
133 "playground host backend does not implement bridge method '{other}'"
134 ))),
135 }
136 })
137 }
138
139 async fn invoke_export(
140 &self,
141 name: &str,
142 args: &[VmValue],
143 ) -> Result<serde_json::Value, VmError> {
144 let Some(closure) = self.exported_functions.get(name) else {
145 return Err(VmError::Runtime(format!(
146 "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
147 self.module_path.display()
148 )));
149 };
150
151 let mut vm = self.vm.child_vm_for_host();
152 let result = vm.call_closure_pub(closure, args).await?;
153 Ok(crate::llm::vm_value_to_json(&result))
154 }
155
156 async fn invoke_optional_export(
157 &self,
158 name: &str,
159 args: &[VmValue],
160 ) -> Result<Option<serde_json::Value>, VmError> {
161 if !self.exported_functions.contains_key(name) {
162 return Ok(None);
163 }
164 self.invoke_export(name, args).await.map(Some)
165 }
166
167 async fn request_permission(
168 &self,
169 params: serde_json::Value,
170 ) -> Result<serde_json::Value, VmError> {
171 let Some(closure) = self.exported_functions.get("request_permission") else {
174 return Ok(crate::llm::acp_permission::allow_response());
175 };
176
177 let tool_call = params.get("toolCall");
178 let tool_name = tool_call
179 .and_then(|tool_call| tool_call.pointer("/_meta/harn/toolName"))
180 .or_else(|| tool_call.and_then(|tool_call| tool_call.get("toolName")))
181 .or_else(|| tool_call.and_then(|tool_call| tool_call.get("title")))
182 .and_then(|value| value.as_str())
183 .unwrap_or_default();
184 let tool_args = tool_call
185 .and_then(|tool_call| tool_call.get("rawInput"))
186 .map(json_result_to_vm_value)
187 .unwrap_or(VmValue::Nil);
188 let full_payload = json_result_to_vm_value(¶ms);
189
190 let arg_count = closure.func.params.len();
191 let args = if arg_count >= 3 {
192 vec![
193 VmValue::String(std::sync::Arc::from(tool_name.to_string())),
194 tool_args,
195 full_payload,
196 ]
197 } else if arg_count == 2 {
198 vec![
199 VmValue::String(std::sync::Arc::from(tool_name.to_string())),
200 tool_args,
201 ]
202 } else if arg_count == 1 {
203 vec![full_payload]
204 } else {
205 Vec::new()
206 };
207
208 let mut vm = self.vm.child_vm_for_host();
209 let result = vm.call_closure_pub(closure, &args).await?;
210 let payload = match result {
215 VmValue::Bool(granted) => {
216 if granted {
217 crate::llm::acp_permission::allow_response()
218 } else {
219 crate::llm::acp_permission::reject_response(None)
220 }
221 }
222 VmValue::String(reason) if !reason.is_empty() => {
223 crate::llm::acp_permission::reject_response(Some(reason.to_string()))
224 }
225 other => {
226 let json = crate::llm::vm_value_to_json(&other);
227 if let Some(granted) = json.get("granted").and_then(|value| value.as_bool()) {
228 if granted {
229 crate::llm::acp_permission::allow_response()
230 } else {
231 crate::llm::acp_permission::reject_response(
232 json.get("reason")
233 .and_then(|value| value.as_str())
234 .map(str::to_string),
235 )
236 }
237 } else if json.get("outcome").is_some() {
238 json
240 } else if other.is_truthy() {
241 crate::llm::acp_permission::allow_response()
242 } else {
243 crate::llm::acp_permission::reject_response(None)
244 }
245 }
246 };
247 Ok(payload)
248 }
249}
250
251#[derive(Clone, Copy, Debug, PartialEq, Eq)]
261pub enum QueuedUserMessageMode {
262 InterruptImmediate,
263 FinishStep,
264 AuditOnly,
265}
266
267#[derive(Clone, Copy, Debug, PartialEq, Eq)]
268pub enum DeliveryCheckpoint {
269 InterruptImmediate,
270 AfterCurrentOperation,
271 EndOfInteraction,
272}
273
274impl QueuedUserMessageMode {
275 fn from_str(value: &str) -> Self {
276 match value {
277 "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
278 "finish_step" | "after_current_operation" => Self::FinishStep,
279 _ => Self::AuditOnly,
284 }
285 }
286
287 fn as_str(self) -> &'static str {
288 match self {
289 Self::InterruptImmediate => "interrupt_immediate",
290 Self::FinishStep => "finish_step",
291 Self::AuditOnly => "audit_only",
292 }
293 }
294}
295
296#[derive(Clone, Debug, PartialEq, Eq)]
297pub struct QueuedUserMessage {
298 pub message_id: String,
299 pub content: String,
300 pub transcript_content: serde_json::Value,
301 pub mode: QueuedUserMessageMode,
302}
303
304#[derive(Clone, Debug, PartialEq, Eq)]
305pub struct QueuedReminder {
306 pub reminder: crate::llm::helpers::SystemReminder,
307 pub mode: QueuedUserMessageMode,
308}
309
310#[derive(Clone, Debug, PartialEq, Eq)]
311pub enum QueuedTranscriptInjection {
312 User(QueuedUserMessage),
313 Reminder(QueuedReminder),
314}
315
316#[derive(Debug, Default)]
317struct QueuedTranscriptInjections {
318 queue: VecDeque<QueuedTranscriptInjection>,
319 revoked_user_message_ids: HashSet<String>,
320 delivered_user_message_ids: HashSet<String>,
321 revoked_reminder_ids: HashSet<String>,
322 delivered_reminder_ids: HashSet<String>,
323}
324
325#[derive(Clone, Debug, Default)]
326pub struct HostBridgeInjectionState {
327 inner: Arc<Mutex<QueuedTranscriptInjections>>,
328}
329
330#[derive(Clone, Copy, Debug, PartialEq, Eq)]
331pub enum PendingUserMessageMutationResult {
332 Mutated,
333 AlreadyRevoked,
334 AlreadyDelivered,
335 UnknownMessageId,
336}
337
338impl QueuedTranscriptInjection {
339 fn mode(&self) -> QueuedUserMessageMode {
340 match self {
341 Self::User(message) => message.mode,
342 Self::Reminder(reminder) => reminder.mode,
343 }
344 }
345
346 fn pending_json(&self, position: usize) -> serde_json::Value {
347 match self {
348 Self::User(message) => serde_json::json!({
349 "kind": "user",
350 "id": message.message_id,
351 "messageId": message.message_id,
352 "mode": message.mode.as_str(),
353 "position": position,
354 "content": message.transcript_content,
355 }),
356 Self::Reminder(reminder) => serde_json::json!({
357 "kind": "reminder",
358 "id": reminder.reminder.id,
359 "reminderId": reminder.reminder.id,
360 "mode": reminder.mode.as_str(),
361 "position": position,
362 "body": reminder.reminder.body,
363 "tags": reminder.reminder.tags,
364 "dedupeKey": reminder.reminder.dedupe_key,
365 "ttlTurns": reminder.reminder.ttl_turns,
366 "preserveOnCompact": reminder.reminder.preserve_on_compact,
367 "propagate": reminder.reminder.propagate.as_str(),
368 "roleHint": reminder.reminder.role_hint.as_str(),
369 "source": reminder.reminder.source.as_str(),
370 "firedAtTurn": reminder.reminder.fired_at_turn,
371 "originatingAgentId": reminder.reminder.originating_agent_id,
372 }),
373 }
374 }
375}
376
377#[derive(Clone, Copy, Debug, PartialEq, Eq)]
378pub enum PendingReminderMutationResult {
379 Mutated,
380 AlreadyRevoked,
381 AlreadyDelivered,
382 UnknownReminderId,
383}
384
385fn new_inject_message_id() -> String {
386 format!("msg_inj_{}", uuid::Uuid::now_v7().simple())
387}
388
389impl HostBridgeInjectionState {
390 pub fn new() -> Self {
391 Self::default()
392 }
393
394 pub async fn push_pending_user_message(
395 &self,
396 content: String,
397 transcript_content: serde_json::Value,
398 mode: &str,
399 ) -> String {
400 let message_id = new_inject_message_id();
401 self.inner
402 .lock()
403 .await
404 .queue
405 .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
406 message_id: message_id.clone(),
407 content,
408 transcript_content,
409 mode: QueuedUserMessageMode::from_str(mode),
410 }));
411 message_id
412 }
413
414 pub async fn revoke_pending_user_message(
415 &self,
416 message_id: &str,
417 ) -> PendingUserMessageMutationResult {
418 let mut state = self.inner.lock().await;
419 let mut retained = VecDeque::new();
420 let mut revoked = false;
421 while let Some(injection) = state.queue.pop_front() {
422 match &injection {
423 QueuedTranscriptInjection::User(message) if message.message_id == message_id => {
424 revoked = true;
425 }
426 _ => retained.push_back(injection),
427 }
428 }
429 state.queue = retained;
430 if revoked {
431 state
432 .revoked_user_message_ids
433 .insert(message_id.to_string());
434 return PendingUserMessageMutationResult::Mutated;
435 }
436 if state.revoked_user_message_ids.contains(message_id) {
437 PendingUserMessageMutationResult::AlreadyRevoked
438 } else if state.delivered_user_message_ids.contains(message_id) {
439 PendingUserMessageMutationResult::AlreadyDelivered
440 } else {
441 PendingUserMessageMutationResult::UnknownMessageId
442 }
443 }
444
445 pub async fn revoke_pending_reminder(
446 &self,
447 reminder_id: &str,
448 ) -> PendingReminderMutationResult {
449 let mut state = self.inner.lock().await;
450 let mut retained = VecDeque::new();
451 let mut revoked = false;
452 while let Some(injection) = state.queue.pop_front() {
453 match &injection {
454 QueuedTranscriptInjection::Reminder(reminder)
455 if reminder.reminder.id == reminder_id =>
456 {
457 revoked = true;
458 }
459 _ => retained.push_back(injection),
460 }
461 }
462 state.queue = retained;
463 if revoked {
464 state.revoked_reminder_ids.insert(reminder_id.to_string());
465 return PendingReminderMutationResult::Mutated;
466 }
467 if state.revoked_reminder_ids.contains(reminder_id) {
468 PendingReminderMutationResult::AlreadyRevoked
469 } else if state.delivered_reminder_ids.contains(reminder_id) {
470 PendingReminderMutationResult::AlreadyDelivered
471 } else {
472 PendingReminderMutationResult::UnknownReminderId
473 }
474 }
475
476 pub async fn replace_pending_user_message(
477 &self,
478 message_id: &str,
479 content: String,
480 transcript_content: serde_json::Value,
481 ) -> PendingUserMessageMutationResult {
482 let mut state = self.inner.lock().await;
483 for injection in &mut state.queue {
484 if let QueuedTranscriptInjection::User(message) = injection {
485 if message.message_id == message_id {
486 message.content = content;
487 message.transcript_content = transcript_content;
488 return PendingUserMessageMutationResult::Mutated;
489 }
490 }
491 }
492 if state.revoked_user_message_ids.contains(message_id) {
493 PendingUserMessageMutationResult::AlreadyRevoked
494 } else if state.delivered_user_message_ids.contains(message_id) {
495 PendingUserMessageMutationResult::AlreadyDelivered
496 } else {
497 PendingUserMessageMutationResult::UnknownMessageId
498 }
499 }
500
501 async fn push_session_reminder(&self, reminder: QueuedReminder) {
502 self.inner
503 .lock()
504 .await
505 .queue
506 .push_back(QueuedTranscriptInjection::Reminder(reminder));
507 }
508
509 pub async fn pending_injections_json(&self) -> serde_json::Value {
510 let state = self.inner.lock().await;
511 let injections = state
512 .queue
513 .iter()
514 .enumerate()
515 .map(|(position, injection)| injection.pending_json(position))
516 .collect::<Vec<_>>();
517 serde_json::json!({
518 "pendingCount": injections.len(),
519 "injections": injections,
520 })
521 }
522}
523
524fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
525 format!(
526 "{}: {}",
527 Code::ReminderUnknownOption.as_str(),
528 message.as_ref()
529 )
530}
531
532fn session_remind_shape_error(message: impl AsRef<str>) -> String {
533 format!(
534 "{}: {}",
535 Code::ReminderInvalidShape.as_str(),
536 message.as_ref()
537 )
538}
539
540fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
541 format!(
542 "{}: {}",
543 Code::ReminderUnknownPropagate.as_str(),
544 message.as_ref()
545 )
546}
547
548fn string_field(
549 map: &serde_json::Map<String, serde_json::Value>,
550 key: &str,
551 required: bool,
552) -> Result<Option<String>, String> {
553 match map.get(key) {
554 None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
555 format!("`{key}` must be a non-empty string"),
556 )),
557 None | Some(serde_json::Value::Null) => Ok(None),
558 Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
559 session_remind_shape_error(format!("`{key}` must be a non-empty string")),
560 ),
561 Some(serde_json::Value::String(value)) => {
562 let trimmed = value.trim();
563 if trimmed.is_empty() {
564 Ok(None)
565 } else {
566 Ok(Some(trimmed.to_string()))
567 }
568 }
569 Some(other) => Err(session_remind_shape_error(format!(
570 "`{key}` must be a string, got {other}"
571 ))),
572 }
573}
574
575fn bool_field(
576 map: &serde_json::Map<String, serde_json::Value>,
577 key: &str,
578) -> Result<Option<bool>, String> {
579 match map.get(key) {
580 None | Some(serde_json::Value::Null) => Ok(None),
581 Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
582 Some(other) => Err(session_remind_shape_error(format!(
583 "`{key}` must be a bool, got {other}"
584 ))),
585 }
586}
587
588fn int_field(
589 map: &serde_json::Map<String, serde_json::Value>,
590 key: &str,
591) -> Result<Option<i64>, String> {
592 match map.get(key) {
593 None | Some(serde_json::Value::Null) => Ok(None),
594 Some(serde_json::Value::Number(value)) => {
595 let Some(value) = value.as_i64() else {
596 return Err(session_remind_shape_error(format!(
597 "`{key}` must be an integer"
598 )));
599 };
600 Ok(Some(value))
601 }
602 Some(other) => Err(session_remind_shape_error(format!(
603 "`{key}` must be an int, got {other}"
604 ))),
605 }
606}
607
608fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
609 let Some(value) = map.get("tags") else {
610 return Ok(Vec::new());
611 };
612 if value.is_null() {
613 return Ok(Vec::new());
614 }
615 let Some(values) = value.as_array() else {
616 return Err(session_remind_shape_error("`tags` must be a list"));
617 };
618 let mut tags = Vec::new();
619 for value in values {
620 let Some(tag) = value.as_str() else {
621 return Err(session_remind_shape_error(format!(
622 "`tags` entries must be strings, got {value}"
623 )));
624 };
625 let tag = tag.trim();
626 if tag.is_empty() {
627 return Err(session_remind_shape_error(
628 "`tags` entries must be non-empty strings",
629 ));
630 }
631 if !tags.iter().any(|existing| existing == tag) {
632 tags.push(tag.to_string());
633 }
634 }
635 Ok(tags)
636}
637
638fn session_remind_payload_from_value(
639 value: &serde_json::Value,
640) -> Result<crate::llm::helpers::SystemReminder, String> {
641 let Some(map) = value.as_object() else {
642 return Err(session_remind_shape_error(
643 "session/remind payload must be a reminder object",
644 ));
645 };
646 const ALLOWED: &[&str] = &[
647 "_meta",
648 "body",
649 "dedupe_key",
650 "fired_at_turn",
651 "id",
652 "preserve_on_compact",
653 "propagate",
654 "role_hint",
655 "source",
656 "tags",
657 "ttl_turns",
658 ];
659 let unknown = map
660 .keys()
661 .filter(|key| !ALLOWED.contains(&key.as_str()))
662 .map(String::as_str)
663 .collect::<Vec<_>>();
664 if !unknown.is_empty() {
665 if unknown.contains(&"content") {
666 return Err(session_remind_shape_error(
667 "session/remind expects reminder `body`, not user-message `content`",
668 ));
669 }
670 return Err(reminder_unknown_option_error(format!(
671 "unknown reminder option(s): {}",
672 unknown.join(", ")
673 )));
674 }
675 if let Some(meta) = map.get("_meta") {
676 if !meta.is_null() && !meta.is_object() {
677 return Err(session_remind_shape_error("`_meta` must be an object"));
678 }
679 }
680 let ttl_turns = int_field(map, "ttl_turns")?;
681 if let Some(value) = ttl_turns {
682 if value <= 0 {
683 return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
684 }
685 }
686 let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
687 if fired_at_turn < 0 {
688 return Err(session_remind_shape_error(
689 "`fired_at_turn` must be >= 0 when provided",
690 ));
691 }
692 match string_field(map, "source", false)?.as_deref() {
693 None | Some("bridge") => {}
694 Some(_) => {
695 return Err(session_remind_shape_error(
696 "`source` for session/remind must be bridge when provided",
697 ))
698 }
699 }
700 let propagate = match string_field(map, "propagate", false)?.as_deref() {
701 None => crate::llm::helpers::ReminderPropagate::Session,
702 Some("all") => crate::llm::helpers::ReminderPropagate::All,
703 Some("session") => crate::llm::helpers::ReminderPropagate::Session,
704 Some("none") => crate::llm::helpers::ReminderPropagate::None,
705 Some(_) => {
706 return Err(reminder_unknown_propagate_error(
707 "`propagate` must be one of all, session, or none",
708 ))
709 }
710 };
711 let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
712 None => crate::llm::helpers::ReminderRoleHint::System,
713 Some("system") => crate::llm::helpers::ReminderRoleHint::System,
714 Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
715 Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
716 Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
717 Some(_) => {
718 return Err(session_remind_shape_error(
719 "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
720 ))
721 }
722 };
723 Ok(crate::llm::helpers::SystemReminder {
724 id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
725 tags: tags_field(map)?,
726 dedupe_key: string_field(map, "dedupe_key", false)?,
727 ttl_turns,
728 preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
729 propagate,
730 role_hint,
731 source: crate::llm::helpers::ReminderSource::Bridge,
732 body: string_field(map, "body", true)?.unwrap_or_default(),
733 fired_at_turn,
734 originating_agent_id: None,
735 })
736}
737
738fn handle_cancel_tool_call_notification(params: &serde_json::Value) {
748 let session_id = params
749 .get("sessionId")
750 .or_else(|| params.get("session_id"))
751 .and_then(|value| value.as_str())
752 .unwrap_or_default();
753 let call_id = params
754 .get("toolCallId")
755 .or_else(|| params.get("tool_call_id"))
756 .or_else(|| params.get("callId"))
757 .or_else(|| params.get("call_id"))
758 .and_then(|value| value.as_str())
759 .unwrap_or_default();
760 if call_id.is_empty() {
761 return;
762 }
763 let reason = params
764 .get("reason")
765 .and_then(|value| value.as_str())
766 .unwrap_or("host cancelled in-flight tool call")
767 .to_string();
768 let inject_reminder = params
769 .get("injectReminder")
770 .or_else(|| params.get("inject_reminder"))
771 .and_then(|value| value.as_bool())
772 .unwrap_or(true);
773 crate::tool_call_cancellations::cancel(session_id, call_id, reason, inject_reminder);
774}
775
776fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
777 let mode = QueuedUserMessageMode::from_str(
778 params
779 .get("mode")
780 .and_then(|value| value.as_str())
781 .unwrap_or("audit_only"),
782 );
783 let reminder_value = if let Some(reminder) = params.get("reminder") {
784 reminder.clone()
785 } else {
786 let Some(params) = params.as_object() else {
787 return Err(session_remind_shape_error(
788 "session/remind params must be an object",
789 ));
790 };
791 let mut reminder = params.clone();
792 reminder.remove("mode");
793 reminder.remove("sessionId");
794 reminder.remove("session_id");
795 serde_json::Value::Object(reminder)
796 };
797 Ok(QueuedReminder {
798 reminder: session_remind_payload_from_value(&reminder_value)?,
799 mode,
800 })
801}
802
803#[allow(clippy::new_without_default)]
805impl HostBridge {
806 pub fn new() -> Self {
811 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
812 Arc::new(Mutex::new(HashMap::new()));
813 let cancelled = Arc::new(AtomicBool::new(false));
814 let cancel_notify = Arc::new(Notify::new());
815 let queued_transcript_injections = HostBridgeInjectionState::default();
816 let resume_requested = Arc::new(AtomicBool::new(false));
817 let skills_reload_requested = Arc::new(AtomicBool::new(false));
818 let daemon_idle = Arc::new(AtomicBool::new(false));
819
820 let pending_clone = pending.clone();
822 let cancelled_clone = cancelled.clone();
823 let cancel_notify_clone = cancel_notify.clone();
824 let queued_clone = queued_transcript_injections.clone();
825 let resume_clone = resume_requested.clone();
826 let skills_reload_clone = skills_reload_requested.clone();
827 tokio::task::spawn_local(async move {
828 let stdin = tokio::io::stdin();
829 let reader = tokio::io::BufReader::new(stdin);
830 let mut lines = reader.lines();
831
832 while let Ok(Some(line)) = lines.next_line().await {
833 let line = line.trim().to_string();
834 if line.is_empty() {
835 continue;
836 }
837
838 let msg: serde_json::Value = match serde_json::from_str(&line) {
839 Ok(v) => v,
840 Err(_) => continue,
841 };
842
843 if msg.get("id").is_none() {
845 if let Some(method) = msg["method"].as_str() {
846 if method == "cancel" {
847 cancelled_clone.store(true, Ordering::SeqCst);
848 cancel_notify_clone.notify_waiters();
849 } else if method == "agent/resume" {
850 resume_clone.store(true, Ordering::SeqCst);
851 } else if method == "skills/update" {
852 skills_reload_clone.store(true, Ordering::SeqCst);
853 } else if method == "session/remind" {
854 let params = &msg["params"];
855 if let Ok(reminder) = queued_session_remind_from_params(params) {
856 queued_clone.push_session_reminder(reminder).await;
857 }
858 } else if method == "session/cancel_tool_call" {
859 handle_cancel_tool_call_notification(&msg["params"]);
860 }
861 }
862 continue;
863 }
864
865 if let Some(id) = msg["id"].as_u64() {
866 let mut pending = pending_clone.lock().await;
867 if let Some(sender) = pending.remove(&id) {
868 let _ = sender.send(msg);
869 }
870 }
871 }
872
873 let mut pending = pending_clone.lock().await;
875 pending.clear();
876 });
877
878 Self {
879 next_id: AtomicU64::new(1),
880 pending,
881 cancelled,
882 cancel_notify,
883 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
884 session_id: std::sync::Mutex::new(String::new()),
885 script_name: std::sync::Mutex::new(String::new()),
886 queued_transcript_injections,
887 resume_requested,
888 skills_reload_requested,
889 daemon_idle,
890 prompt_stop_reason: std::sync::Mutex::new(None),
891 visible_call_states: std::sync::Mutex::new(HashMap::new()),
892 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
893 in_process: None,
894 }
895 }
896
897 pub fn from_parts(
903 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
904 cancelled: Arc<AtomicBool>,
905 stdout_lock: Arc<std::sync::Mutex<()>>,
906 start_id: u64,
907 ) -> Self {
908 Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
909 }
910
911 pub fn from_parts_with_writer(
912 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
913 cancelled: Arc<AtomicBool>,
914 writer: HostBridgeWriter,
915 start_id: u64,
916 ) -> Self {
917 Self::from_parts_with_writer_and_cancel_notify(
918 pending,
919 cancelled,
920 Arc::new(Notify::new()),
921 writer,
922 start_id,
923 )
924 }
925
926 pub fn from_parts_with_writer_and_cancel_notify(
927 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
928 cancelled: Arc<AtomicBool>,
929 cancel_notify: Arc<Notify>,
930 writer: HostBridgeWriter,
931 start_id: u64,
932 ) -> Self {
933 Self::from_parts_with_writer_cancel_notify_and_injection_state(
934 pending,
935 cancelled,
936 cancel_notify,
937 writer,
938 start_id,
939 None,
940 )
941 }
942
943 pub fn from_parts_with_writer_cancel_notify_and_injection_state(
944 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
945 cancelled: Arc<AtomicBool>,
946 cancel_notify: Arc<Notify>,
947 writer: HostBridgeWriter,
948 start_id: u64,
949 injection_state: Option<HostBridgeInjectionState>,
950 ) -> Self {
951 Self {
952 next_id: AtomicU64::new(start_id),
953 pending,
954 cancelled,
955 cancel_notify,
956 writer,
957 session_id: std::sync::Mutex::new(String::new()),
958 script_name: std::sync::Mutex::new(String::new()),
959 queued_transcript_injections: injection_state.unwrap_or_default(),
960 resume_requested: Arc::new(AtomicBool::new(false)),
961 skills_reload_requested: Arc::new(AtomicBool::new(false)),
962 daemon_idle: Arc::new(AtomicBool::new(false)),
963 prompt_stop_reason: std::sync::Mutex::new(None),
964 visible_call_states: std::sync::Mutex::new(HashMap::new()),
965 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
966 in_process: None,
967 }
968 }
969
970 pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
973 let exported_functions = vm.load_module_exports(module_path).await?;
974 Ok(Self {
975 next_id: AtomicU64::new(1),
976 pending: Arc::new(Mutex::new(HashMap::new())),
977 cancelled: Arc::new(AtomicBool::new(false)),
978 cancel_notify: Arc::new(Notify::new()),
979 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
980 session_id: std::sync::Mutex::new(String::new()),
981 script_name: std::sync::Mutex::new(String::new()),
982 queued_transcript_injections: HostBridgeInjectionState::default(),
983 resume_requested: Arc::new(AtomicBool::new(false)),
984 skills_reload_requested: Arc::new(AtomicBool::new(false)),
985 daemon_idle: Arc::new(AtomicBool::new(false)),
986 prompt_stop_reason: std::sync::Mutex::new(None),
987 visible_call_states: std::sync::Mutex::new(HashMap::new()),
988 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
989 in_process: Some(InProcessHost {
990 module_path: module_path.to_path_buf(),
991 exported_functions,
992 vm,
993 }),
994 })
995 }
996
997 pub fn set_session_id(&self, id: &str) {
999 *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
1000 }
1001
1002 pub fn set_script_name(&self, name: &str) {
1004 *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
1005 }
1006
1007 fn get_script_name(&self) -> String {
1009 self.script_name
1010 .lock()
1011 .unwrap_or_else(|e| e.into_inner())
1012 .clone()
1013 }
1014
1015 pub fn get_session_id(&self) -> String {
1017 self.session_id
1018 .lock()
1019 .unwrap_or_else(|e| e.into_inner())
1020 .clone()
1021 }
1022
1023 fn write_line(&self, line: &str) -> Result<(), VmError> {
1025 (self.writer)(line).map_err(VmError::Runtime)
1026 }
1027
1028 pub async fn call(
1031 &self,
1032 method: &str,
1033 params: serde_json::Value,
1034 ) -> Result<serde_json::Value, VmError> {
1035 if let Some(in_process) = &self.in_process {
1036 return in_process.dispatch(method, params).await;
1037 }
1038
1039 if self.is_cancelled() {
1040 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1041 }
1042
1043 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
1044 let cancel_wait = self.cancel_notify.notified();
1045 tokio::pin!(cancel_wait);
1046
1047 let request = crate::jsonrpc::request(id, method, params);
1048
1049 let (tx, rx) = oneshot::channel();
1050 {
1051 let mut pending = self.pending.lock().await;
1052 pending.insert(id, tx);
1053 }
1054
1055 let line = serde_json::to_string(&request)
1056 .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
1057 if let Err(e) = self.write_line(&line) {
1058 let mut pending = self.pending.lock().await;
1059 pending.remove(&id);
1060 return Err(e);
1061 }
1062
1063 if self.is_cancelled() {
1064 let mut pending = self.pending.lock().await;
1065 pending.remove(&id);
1066 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1067 }
1068
1069 let response = tokio::select! {
1070 result = rx => match result {
1071 Ok(msg) => msg,
1072 Err(_) => {
1073 return Err(VmError::Runtime(
1075 "Bridge: host closed connection before responding".into(),
1076 ));
1077 }
1078 },
1079 _ = &mut cancel_wait => {
1080 let mut pending = self.pending.lock().await;
1081 pending.remove(&id);
1082 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1083 }
1084 _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
1085 let mut pending = self.pending.lock().await;
1086 pending.remove(&id);
1087 return Err(VmError::Runtime(format!(
1088 "Bridge: host did not respond to '{method}' within {}s",
1089 DEFAULT_TIMEOUT.as_secs()
1090 )));
1091 }
1092 };
1093
1094 if let Some(error) = response.get("error") {
1095 let message = error["message"].as_str().unwrap_or("Unknown host error");
1096 let code = error["code"].as_i64().unwrap_or(-1);
1097 if code == -32001 {
1099 return Err(VmError::CategorizedError {
1100 message: message.to_string(),
1101 category: ErrorCategory::ToolRejected,
1102 });
1103 }
1104 return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
1105 }
1106
1107 Ok(response["result"].clone())
1108 }
1109
1110 pub fn notify(&self, method: &str, params: serde_json::Value) {
1113 let notification = crate::jsonrpc::notification(method, params);
1114 if self.in_process.is_some() {
1115 return;
1116 }
1117 if let Ok(line) = serde_json::to_string(¬ification) {
1118 let _ = self.write_line(&line);
1119 }
1120 }
1121
1122 pub fn is_cancelled(&self) -> bool {
1124 self.cancelled.load(Ordering::SeqCst)
1125 }
1126
1127 pub fn take_resume_signal(&self) -> bool {
1128 self.resume_requested.swap(false, Ordering::SeqCst)
1129 }
1130
1131 pub fn signal_resume(&self) {
1132 self.resume_requested.store(true, Ordering::SeqCst);
1133 }
1134
1135 pub fn set_daemon_idle(&self, idle: bool) {
1136 self.daemon_idle.store(idle, Ordering::SeqCst);
1137 }
1138
1139 pub fn is_daemon_idle(&self) -> bool {
1140 self.daemon_idle.load(Ordering::SeqCst)
1141 }
1142
1143 pub fn set_prompt_stop_reason(&self, reason: &str) {
1148 *self
1149 .prompt_stop_reason
1150 .lock()
1151 .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
1152 }
1153
1154 pub fn take_prompt_stop_reason(&self) -> Option<String> {
1159 self.prompt_stop_reason
1160 .lock()
1161 .unwrap_or_else(|e| e.into_inner())
1162 .take()
1163 }
1164
1165 pub fn take_skills_reload_signal(&self) -> bool {
1170 self.skills_reload_requested.swap(false, Ordering::SeqCst)
1171 }
1172
1173 pub fn signal_skills_reload(&self) {
1177 self.skills_reload_requested.store(true, Ordering::SeqCst);
1178 }
1179
1180 pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
1186 let result = self.call("skills/list", serde_json::json!({})).await?;
1187 match result {
1188 serde_json::Value::Array(items) => Ok(items),
1189 serde_json::Value::Object(map) => match map.get("skills") {
1190 Some(serde_json::Value::Array(items)) => Ok(items.clone()),
1191 _ => Err(VmError::Runtime(
1192 "skills/list: host response must be an array or { skills: [...] }".into(),
1193 )),
1194 },
1195 _ => Err(VmError::Runtime(
1196 "skills/list: unexpected response shape".into(),
1197 )),
1198 }
1199 }
1200
1201 pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
1207 let result = self.call("host/tools/list", serde_json::json!({})).await?;
1208 parse_host_tools_list_response(result)
1209 }
1210
1211 pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
1215 self.call("skills/fetch", serde_json::json!({ "id": id }))
1216 .await
1217 }
1218
1219 pub fn injection_state(&self) -> HostBridgeInjectionState {
1220 self.queued_transcript_injections.clone()
1221 }
1222
1223 pub async fn push_pending_user_message(
1224 &self,
1225 content: String,
1226 transcript_content: serde_json::Value,
1227 mode: &str,
1228 ) -> String {
1229 self.queued_transcript_injections
1230 .push_pending_user_message(content, transcript_content, mode)
1231 .await
1232 }
1233
1234 pub async fn push_queued_user_message(&self, content: String, mode: &str) -> String {
1235 self.push_pending_user_message(content.clone(), serde_json::Value::String(content), mode)
1236 .await
1237 }
1238
1239 pub async fn revoke_pending_user_message(
1240 &self,
1241 message_id: &str,
1242 ) -> PendingUserMessageMutationResult {
1243 self.queued_transcript_injections
1244 .revoke_pending_user_message(message_id)
1245 .await
1246 }
1247
1248 pub async fn revoke_pending_reminder(
1249 &self,
1250 reminder_id: &str,
1251 ) -> PendingReminderMutationResult {
1252 self.queued_transcript_injections
1253 .revoke_pending_reminder(reminder_id)
1254 .await
1255 }
1256
1257 pub async fn pending_injections_json(&self) -> serde_json::Value {
1258 self.queued_transcript_injections
1259 .pending_injections_json()
1260 .await
1261 }
1262
1263 pub async fn replace_pending_user_message(
1264 &self,
1265 message_id: &str,
1266 content: String,
1267 transcript_content: serde_json::Value,
1268 ) -> PendingUserMessageMutationResult {
1269 self.queued_transcript_injections
1270 .replace_pending_user_message(message_id, content, transcript_content)
1271 .await
1272 }
1273
1274 pub async fn push_queued_session_remind_from_params(
1275 &self,
1276 params: &serde_json::Value,
1277 ) -> Result<String, String> {
1278 let reminder = queued_session_remind_from_params(params)?;
1279 let reminder_id = reminder.reminder.id.clone();
1280 self.queued_transcript_injections
1281 .push_session_reminder(reminder)
1282 .await;
1283 Ok(reminder_id)
1284 }
1285
1286 pub async fn take_queued_user_messages(
1287 &self,
1288 include_interrupt_immediate: bool,
1289 include_finish_step: bool,
1290 include_audit_only: bool,
1291 ) -> Vec<QueuedUserMessage> {
1292 let mut state = self.queued_transcript_injections.inner.lock().await;
1293 let mut selected = Vec::new();
1294 let mut retained = VecDeque::new();
1295 while let Some(injection) = state.queue.pop_front() {
1296 let should_take = match injection.mode() {
1297 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1298 QueuedUserMessageMode::FinishStep => include_finish_step,
1299 QueuedUserMessageMode::AuditOnly => include_audit_only,
1300 };
1301 match (should_take, injection) {
1302 (true, QueuedTranscriptInjection::User(message)) => {
1303 state
1304 .delivered_user_message_ids
1305 .insert(message.message_id.clone());
1306 selected.push(message);
1307 }
1308 (_, injection) => retained.push_back(injection),
1309 }
1310 }
1311 state.queue = retained;
1312 selected
1313 }
1314
1315 pub async fn take_queued_transcript_injections(
1316 &self,
1317 include_interrupt_immediate: bool,
1318 include_finish_step: bool,
1319 include_audit_only: bool,
1320 ) -> Vec<QueuedTranscriptInjection> {
1321 let mut state = self.queued_transcript_injections.inner.lock().await;
1322 let mut selected = Vec::new();
1323 let mut retained = VecDeque::new();
1324 while let Some(injection) = state.queue.pop_front() {
1325 let should_take = match injection.mode() {
1326 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1327 QueuedUserMessageMode::FinishStep => include_finish_step,
1328 QueuedUserMessageMode::AuditOnly => include_audit_only,
1329 };
1330 if should_take {
1331 match &injection {
1332 QueuedTranscriptInjection::User(message) => {
1333 state
1334 .delivered_user_message_ids
1335 .insert(message.message_id.clone());
1336 }
1337 QueuedTranscriptInjection::Reminder(reminder) => {
1338 state
1339 .delivered_reminder_ids
1340 .insert(reminder.reminder.id.clone());
1341 }
1342 }
1343 selected.push(injection);
1344 } else {
1345 retained.push_back(injection);
1346 }
1347 }
1348 state.queue = retained;
1349 selected
1350 }
1351
1352 pub async fn take_queued_user_messages_for(
1353 &self,
1354 checkpoint: DeliveryCheckpoint,
1355 ) -> Vec<QueuedUserMessage> {
1356 match checkpoint {
1357 DeliveryCheckpoint::InterruptImmediate => {
1358 self.take_queued_user_messages(true, false, false).await
1359 }
1360 DeliveryCheckpoint::AfterCurrentOperation => {
1361 self.take_queued_user_messages(false, true, false).await
1362 }
1363 DeliveryCheckpoint::EndOfInteraction => {
1364 self.take_queued_user_messages(false, false, true).await
1365 }
1366 }
1367 }
1368
1369 pub async fn take_queued_transcript_injections_for(
1370 &self,
1371 checkpoint: DeliveryCheckpoint,
1372 ) -> Vec<QueuedTranscriptInjection> {
1373 match checkpoint {
1374 DeliveryCheckpoint::InterruptImmediate => {
1375 self.take_queued_transcript_injections(true, false, false)
1376 .await
1377 }
1378 DeliveryCheckpoint::AfterCurrentOperation => {
1379 self.take_queued_transcript_injections(false, true, false)
1380 .await
1381 }
1382 DeliveryCheckpoint::EndOfInteraction => {
1383 self.take_queued_transcript_injections(false, false, true)
1384 .await
1385 }
1386 }
1387 }
1388
1389 pub fn send_output(&self, text: &str) {
1391 self.notify("output", serde_json::json!({"text": text}));
1392 }
1393
1394 pub fn send_progress(
1396 &self,
1397 phase: &str,
1398 message: &str,
1399 progress: Option<i64>,
1400 total: Option<i64>,
1401 data: Option<serde_json::Value>,
1402 ) {
1403 let mut payload = serde_json::json!({"phase": phase, "message": message});
1404 if let Some(p) = progress {
1405 payload["progress"] = serde_json::json!(p);
1406 }
1407 if let Some(t) = total {
1408 payload["total"] = serde_json::json!(t);
1409 }
1410 if let Some(d) = data {
1411 payload["data"] = d;
1412 }
1413 self.notify("progress", payload);
1414 }
1415
1416 pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1418 let mut payload = serde_json::json!({"level": level, "message": message});
1419 if let Some(f) = fields {
1420 payload["fields"] = f;
1421 }
1422 self.notify("log", payload);
1423 }
1424
1425 pub fn send_call_start(
1428 &self,
1429 call_id: &str,
1430 call_type: &str,
1431 name: &str,
1432 metadata: serde_json::Value,
1433 ) {
1434 let session_id = self.get_session_id();
1435 let script = self.get_script_name();
1436 let stream_publicly = metadata
1437 .get("stream_publicly")
1438 .and_then(|value| value.as_bool())
1439 .unwrap_or(true);
1440 self.visible_call_streams
1441 .lock()
1442 .unwrap_or_else(|e| e.into_inner())
1443 .insert(call_id.to_string(), stream_publicly);
1444 self.notify(
1445 "session/update",
1446 serde_json::json!({
1447 "sessionId": session_id,
1448 "update": {
1449 "sessionUpdate": "call_start",
1450 "content": {
1451 "toolCallId": call_id,
1452 "call_type": call_type,
1453 "name": name,
1454 "script": script,
1455 "metadata": metadata,
1456 },
1457 },
1458 }),
1459 );
1460 }
1461
1462 pub fn send_call_progress(
1465 &self,
1466 call_id: &str,
1467 delta: &str,
1468 accumulated_tokens: u64,
1469 user_visible: bool,
1470 ) {
1471 let session_id = self.get_session_id();
1472 let (visible_text, visible_delta) = {
1473 let stream_publicly = self
1474 .visible_call_streams
1475 .lock()
1476 .unwrap_or_else(|e| e.into_inner())
1477 .get(call_id)
1478 .copied()
1479 .unwrap_or(true);
1480 let mut states = self
1481 .visible_call_states
1482 .lock()
1483 .unwrap_or_else(|e| e.into_inner());
1484 let state = states.entry(call_id.to_string()).or_default();
1485 state.push(delta, stream_publicly)
1486 };
1487 self.notify(
1488 "session/update",
1489 serde_json::json!({
1490 "sessionId": session_id,
1491 "update": {
1492 "sessionUpdate": "call_progress",
1493 "content": {
1494 "toolCallId": call_id,
1495 "delta": delta,
1496 "accumulated_tokens": accumulated_tokens,
1497 "visible_text": visible_text,
1498 "visible_delta": visible_delta,
1499 "user_visible": user_visible,
1500 },
1501 },
1502 }),
1503 );
1504 }
1505
1506 pub fn send_call_end(
1508 &self,
1509 call_id: &str,
1510 call_type: &str,
1511 name: &str,
1512 duration_ms: u64,
1513 status: &str,
1514 metadata: serde_json::Value,
1515 ) {
1516 let session_id = self.get_session_id();
1517 let script = self.get_script_name();
1518 self.visible_call_states
1519 .lock()
1520 .unwrap_or_else(|e| e.into_inner())
1521 .remove(call_id);
1522 self.visible_call_streams
1523 .lock()
1524 .unwrap_or_else(|e| e.into_inner())
1525 .remove(call_id);
1526 self.notify(
1527 "session/update",
1528 serde_json::json!({
1529 "sessionId": session_id,
1530 "update": {
1531 "sessionUpdate": "call_end",
1532 "content": {
1533 "toolCallId": call_id,
1534 "call_type": call_type,
1535 "name": name,
1536 "script": script,
1537 "duration_ms": duration_ms,
1538 "status": status,
1539 "metadata": metadata,
1540 },
1541 },
1542 }),
1543 );
1544 }
1545
1546 pub fn send_worker_update(
1548 &self,
1549 worker_id: &str,
1550 worker_name: &str,
1551 status: &str,
1552 metadata: serde_json::Value,
1553 audit: Option<&MutationSessionRecord>,
1554 ) {
1555 let session_id = self.get_session_id();
1556 let script = self.get_script_name();
1557 let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1558 let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1559 let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1560 let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1561 let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1562 let lifecycle = serde_json::json!({
1563 "event": status,
1564 "worker_id": worker_id,
1565 "worker_name": worker_name,
1566 "started_at": started_at,
1567 "finished_at": finished_at,
1568 });
1569 self.notify(
1570 "session/update",
1571 serde_json::json!({
1572 "sessionId": session_id,
1573 "update": {
1574 "sessionUpdate": "worker_update",
1575 "content": {
1576 "worker_id": worker_id,
1577 "worker_name": worker_name,
1578 "status": status,
1579 "script": script,
1580 "started_at": started_at,
1581 "finished_at": finished_at,
1582 "snapshot_path": snapshot_path,
1583 "run_id": run_id,
1584 "run_path": run_path,
1585 "lifecycle": lifecycle,
1586 "audit": audit,
1587 "metadata": metadata,
1588 },
1589 },
1590 }),
1591 );
1592 }
1593}
1594
1595pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1597 crate::stdlib::json_to_vm_value(val)
1598}
1599
1600fn parse_host_tools_list_response(
1601 result: serde_json::Value,
1602) -> Result<Vec<serde_json::Value>, VmError> {
1603 let tools = match result {
1604 serde_json::Value::Array(items) => items,
1605 serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1606 map.get("result")
1607 .and_then(|value| value.get("tools"))
1608 .cloned()
1609 }) {
1610 Some(serde_json::Value::Array(items)) => items,
1611 _ => {
1612 return Err(VmError::Runtime(
1613 "host/tools/list: host response must be an array or { tools: [...] }".into(),
1614 ));
1615 }
1616 },
1617 _ => {
1618 return Err(VmError::Runtime(
1619 "host/tools/list: unexpected response shape".into(),
1620 ));
1621 }
1622 };
1623
1624 let mut normalized = Vec::with_capacity(tools.len());
1625 for tool in tools {
1626 let serde_json::Value::Object(map) = tool else {
1627 return Err(VmError::Runtime(
1628 "host/tools/list: every tool must be an object".into(),
1629 ));
1630 };
1631 let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1632 return Err(VmError::Runtime(
1633 "host/tools/list: every tool must include a string `name`".into(),
1634 ));
1635 };
1636 let description = map
1637 .get("description")
1638 .and_then(|value| value.as_str())
1639 .or_else(|| {
1640 map.get("short_description")
1641 .and_then(|value| value.as_str())
1642 })
1643 .unwrap_or_default();
1644 let schema = map
1645 .get("schema")
1646 .cloned()
1647 .or_else(|| map.get("parameters").cloned())
1648 .or_else(|| map.get("input_schema").cloned())
1649 .unwrap_or(serde_json::Value::Null);
1650 let deprecated = map
1651 .get("deprecated")
1652 .and_then(|value| value.as_bool())
1653 .unwrap_or(false);
1654 normalized.push(serde_json::json!({
1655 "name": name,
1656 "description": description,
1657 "schema": schema,
1658 "deprecated": deprecated,
1659 }));
1660 }
1661 Ok(normalized)
1662}
1663
1664#[cfg(test)]
1665mod tests {
1666 use super::*;
1667
1668 fn test_bridge() -> HostBridge {
1669 HostBridge::from_parts(
1670 Arc::new(Mutex::new(HashMap::new())),
1671 Arc::new(AtomicBool::new(false)),
1672 Arc::new(std::sync::Mutex::new(())),
1673 1,
1674 )
1675 }
1676
1677 fn test_bridge_sharing_injection_state(owner: &HostBridge) -> HostBridge {
1678 HostBridge::from_parts_with_writer_cancel_notify_and_injection_state(
1679 Arc::new(Mutex::new(HashMap::new())),
1680 Arc::new(AtomicBool::new(false)),
1681 Arc::new(Notify::new()),
1682 Arc::new(|_| Ok(())),
1683 100,
1684 Some(owner.injection_state()),
1685 )
1686 }
1687
1688 #[test]
1689 fn test_json_rpc_request_format() {
1690 let request = crate::jsonrpc::request(
1691 1,
1692 "llm_call",
1693 serde_json::json!({
1694 "prompt": "Hello",
1695 "system": "Be helpful",
1696 }),
1697 );
1698 let s = serde_json::to_string(&request).unwrap();
1699 assert!(s.contains("\"jsonrpc\":\"2.0\""));
1700 assert!(s.contains("\"id\":1"));
1701 assert!(s.contains("\"method\":\"llm_call\""));
1702 }
1703
1704 #[test]
1705 fn test_json_rpc_notification_format() {
1706 let notification =
1707 crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
1708 let s = serde_json::to_string(¬ification).unwrap();
1709 assert!(s.contains("\"method\":\"output\""));
1710 assert!(!s.contains("\"id\""));
1711 }
1712
1713 #[test]
1714 fn test_json_rpc_error_response_parsing() {
1715 let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
1716 assert!(response.get("error").is_some());
1717 assert_eq!(
1718 response["error"]["message"].as_str().unwrap(),
1719 "Invalid request"
1720 );
1721 }
1722
1723 #[test]
1724 fn test_json_rpc_success_response_parsing() {
1725 let response = crate::jsonrpc::response(
1726 1,
1727 serde_json::json!({
1728 "text": "Hello world",
1729 "input_tokens": 10,
1730 "output_tokens": 5,
1731 }),
1732 );
1733 assert!(response.get("result").is_some());
1734 assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
1735 }
1736
1737 #[test]
1738 fn test_cancelled_flag() {
1739 let cancelled = Arc::new(AtomicBool::new(false));
1740 assert!(!cancelled.load(Ordering::SeqCst));
1741 cancelled.store(true, Ordering::SeqCst);
1742 assert!(cancelled.load(Ordering::SeqCst));
1743 }
1744
1745 #[test]
1746 fn pending_host_calls_return_when_cancellation_arrives() {
1747 let runtime = tokio::runtime::Builder::new_current_thread()
1748 .enable_all()
1749 .build()
1750 .unwrap();
1751 runtime.block_on(async {
1752 let pending = Arc::new(Mutex::new(HashMap::new()));
1753 let cancelled = Arc::new(AtomicBool::new(false));
1754 let bridge = HostBridge::from_parts_with_writer(
1755 pending.clone(),
1756 cancelled.clone(),
1757 Arc::new(|_| Ok(())),
1758 1,
1759 );
1760
1761 let call = bridge.call("host/work", serde_json::json!({}));
1762 tokio::pin!(call);
1763
1764 loop {
1765 tokio::select! {
1766 result = &mut call => panic!("call completed before cancellation: {result:?}"),
1767 _ = tokio::task::yield_now() => {}
1768 }
1769 if !pending.lock().await.is_empty() {
1770 break;
1771 }
1772 }
1773
1774 cancelled.store(true, Ordering::SeqCst);
1775 bridge.cancel_notify.notify_waiters();
1776
1777 let result = tokio::time::timeout(Duration::from_secs(1), call)
1778 .await
1779 .expect("pending call should observe cancellation promptly");
1780 assert!(
1781 matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
1782 );
1783 assert!(pending.lock().await.is_empty());
1784 });
1785 }
1786
1787 #[test]
1788 fn queued_messages_are_filtered_by_delivery_mode() {
1789 let runtime = tokio::runtime::Builder::new_current_thread()
1790 .enable_all()
1791 .build()
1792 .unwrap();
1793 runtime.block_on(async {
1794 let bridge = test_bridge();
1795 bridge
1796 .push_queued_user_message("first".to_string(), "finish_step")
1797 .await;
1798 bridge
1799 .push_queued_user_message("second".to_string(), "audit_only")
1800 .await;
1801
1802 let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1803 assert_eq!(finish_step.len(), 1);
1804 assert_eq!(finish_step[0].content, "first");
1805
1806 let audit_only = bridge.take_queued_user_messages(false, false, true).await;
1807 assert_eq!(audit_only.len(), 1);
1808 assert_eq!(audit_only[0].content, "second");
1809 });
1810 }
1811
1812 #[test]
1813 fn pending_user_messages_support_revoke_replace_and_delivery_states() {
1814 let runtime = tokio::runtime::Builder::new_current_thread()
1815 .enable_all()
1816 .build()
1817 .unwrap();
1818 runtime.block_on(async {
1819 let bridge = test_bridge();
1820 let first_id = bridge
1821 .push_pending_user_message(
1822 "first".to_string(),
1823 serde_json::json!("first"),
1824 "audit_only",
1825 )
1826 .await;
1827 let second_id = bridge
1828 .push_pending_user_message(
1829 "second".to_string(),
1830 serde_json::json!("second"),
1831 "audit_only",
1832 )
1833 .await;
1834
1835 assert_eq!(
1836 bridge
1837 .replace_pending_user_message(
1838 &second_id,
1839 "second edited".to_string(),
1840 serde_json::json!("second edited"),
1841 )
1842 .await,
1843 PendingUserMessageMutationResult::Mutated
1844 );
1845 assert_eq!(
1846 bridge.revoke_pending_user_message(&first_id).await,
1847 PendingUserMessageMutationResult::Mutated
1848 );
1849 assert_eq!(
1850 bridge.revoke_pending_user_message(&first_id).await,
1851 PendingUserMessageMutationResult::AlreadyRevoked
1852 );
1853
1854 let delivered = bridge
1855 .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1856 .await;
1857 assert_eq!(delivered.len(), 1);
1858 assert_eq!(delivered[0].message_id, second_id);
1859 assert_eq!(delivered[0].content, "second edited");
1860
1861 assert_eq!(
1862 bridge.revoke_pending_user_message(&second_id).await,
1863 PendingUserMessageMutationResult::AlreadyDelivered
1864 );
1865 assert_eq!(
1866 bridge
1867 .replace_pending_user_message(
1868 &second_id,
1869 "too late".to_string(),
1870 serde_json::json!("too late"),
1871 )
1872 .await,
1873 PendingUserMessageMutationResult::AlreadyDelivered
1874 );
1875 assert_eq!(
1876 bridge.revoke_pending_user_message("missing").await,
1877 PendingUserMessageMutationResult::UnknownMessageId
1878 );
1879 });
1880 }
1881
1882 #[test]
1883 fn pending_user_message_replace_preserves_fifo_position_and_mode() {
1884 let runtime = tokio::runtime::Builder::new_current_thread()
1885 .enable_all()
1886 .build()
1887 .unwrap();
1888 runtime.block_on(async {
1889 let bridge = test_bridge();
1890 let first_id = bridge
1891 .push_pending_user_message(
1892 "first".to_string(),
1893 serde_json::json!("first"),
1894 "finish_step",
1895 )
1896 .await;
1897 let second_id = bridge
1898 .push_pending_user_message(
1899 "second".to_string(),
1900 serde_json::json!("second"),
1901 "finish_step",
1902 )
1903 .await;
1904 assert_eq!(
1905 bridge
1906 .replace_pending_user_message(
1907 &first_id,
1908 "first edited".to_string(),
1909 serde_json::json!("first edited"),
1910 )
1911 .await,
1912 PendingUserMessageMutationResult::Mutated
1913 );
1914
1915 let delivered = bridge
1916 .take_queued_user_messages_for(DeliveryCheckpoint::AfterCurrentOperation)
1917 .await;
1918 assert_eq!(
1919 delivered
1920 .iter()
1921 .map(|message| (&message.message_id, message.content.as_str(), message.mode))
1922 .collect::<Vec<_>>(),
1923 vec![
1924 (&first_id, "first edited", QueuedUserMessageMode::FinishStep,),
1925 (&second_id, "second", QueuedUserMessageMode::FinishStep),
1926 ]
1927 );
1928 });
1929 }
1930
1931 #[test]
1932 fn pending_user_message_state_survives_bridge_replacement() {
1933 let runtime = tokio::runtime::Builder::new_current_thread()
1934 .enable_all()
1935 .build()
1936 .unwrap();
1937 runtime.block_on(async {
1938 let bridge = test_bridge();
1939 let revoked_id = bridge
1940 .push_pending_user_message(
1941 "revoke me".to_string(),
1942 serde_json::json!("revoke me"),
1943 "audit_only",
1944 )
1945 .await;
1946 let delivered_id = bridge
1947 .push_pending_user_message(
1948 "deliver me".to_string(),
1949 serde_json::json!("deliver me"),
1950 "audit_only",
1951 )
1952 .await;
1953 assert_eq!(
1954 bridge.revoke_pending_user_message(&revoked_id).await,
1955 PendingUserMessageMutationResult::Mutated
1956 );
1957 bridge.cancelled.store(true, Ordering::SeqCst);
1958
1959 let replacement_bridge = test_bridge_sharing_injection_state(&bridge);
1960 assert_eq!(
1961 replacement_bridge
1962 .revoke_pending_user_message(&revoked_id)
1963 .await,
1964 PendingUserMessageMutationResult::AlreadyRevoked
1965 );
1966 let delivered = replacement_bridge
1967 .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1968 .await;
1969 assert_eq!(delivered.len(), 1);
1970 assert_eq!(delivered[0].message_id, delivered_id);
1971 assert_eq!(delivered[0].content, "deliver me");
1972 assert_eq!(
1973 bridge.revoke_pending_user_message(&delivered_id).await,
1974 PendingUserMessageMutationResult::AlreadyDelivered
1975 );
1976 });
1977 }
1978
1979 #[test]
1980 fn queued_transcript_injections_preserve_user_reminder_separation() {
1981 let runtime = tokio::runtime::Builder::new_current_thread()
1982 .enable_all()
1983 .build()
1984 .unwrap();
1985 runtime.block_on(async {
1986 let bridge = test_bridge();
1987 bridge
1988 .push_queued_user_message("human follow-up".to_string(), "finish_step")
1989 .await;
1990 let reminder_id = bridge
1991 .push_queued_session_remind_from_params(&serde_json::json!({
1992 "body": "Host-provided ambient context.",
1993 "tags": ["host"],
1994 "dedupe_key": "host-context",
1995 "ttl_turns": 2,
1996 "mode": "audit_only",
1997 "_meta": {"harn": {"source": "test"}},
1998 }))
1999 .await
2000 .expect("valid reminder");
2001
2002 let finish_step = bridge.take_queued_user_messages(false, true, false).await;
2003 assert_eq!(finish_step.len(), 1);
2004 assert_eq!(finish_step[0].content, "human follow-up");
2005
2006 let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
2007 assert!(no_user_messages.is_empty());
2008
2009 let injections = bridge
2010 .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
2011 .await;
2012 assert_eq!(injections.len(), 1);
2013 let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
2014 panic!("expected queued reminder");
2015 };
2016 assert_eq!(reminder.reminder.id, reminder_id);
2017 assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
2018 assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
2019 assert_eq!(
2020 reminder.reminder.dedupe_key.as_deref(),
2021 Some("host-context")
2022 );
2023 assert_eq!(reminder.reminder.ttl_turns, Some(2));
2024 assert_eq!(
2025 reminder.reminder.source,
2026 crate::llm::helpers::ReminderSource::Bridge
2027 );
2028 });
2029 }
2030
2031 #[test]
2032 fn pending_injections_list_user_messages_and_reminders_in_fifo_order() {
2033 let runtime = tokio::runtime::Builder::new_current_thread()
2034 .enable_all()
2035 .build()
2036 .unwrap();
2037 runtime.block_on(async {
2038 let bridge = test_bridge();
2039 let message_id = bridge
2040 .push_pending_user_message(
2041 "human follow-up".to_string(),
2042 serde_json::json!([{"type": "text", "text": "human follow-up"}]),
2043 "finish_step",
2044 )
2045 .await;
2046 let reminder_id = bridge
2047 .push_queued_session_remind_from_params(&serde_json::json!({
2048 "id": "rem-test",
2049 "body": "Host reminder",
2050 "tags": ["host"],
2051 "dedupe_key": "host-reminder",
2052 "ttl_turns": 2,
2053 "mode": "interrupt_immediate",
2054 }))
2055 .await
2056 .expect("valid session/remind payload");
2057
2058 let pending = bridge.pending_injections_json().await;
2059 assert_eq!(pending["pendingCount"], 2);
2060 assert_eq!(pending["injections"][0]["kind"], "user");
2061 assert_eq!(pending["injections"][0]["id"], message_id);
2062 assert_eq!(pending["injections"][0]["messageId"], message_id);
2063 assert_eq!(pending["injections"][0]["mode"], "finish_step");
2064 assert_eq!(pending["injections"][0]["position"], 0);
2065 assert_eq!(pending["injections"][1]["kind"], "reminder");
2066 assert_eq!(pending["injections"][1]["id"], reminder_id);
2067 assert_eq!(pending["injections"][1]["reminderId"], "rem-test");
2068 assert_eq!(pending["injections"][1]["mode"], "interrupt_immediate");
2069 assert_eq!(pending["injections"][1]["body"], "Host reminder");
2070 assert_eq!(pending["injections"][1]["dedupeKey"], "host-reminder");
2071 assert_eq!(pending["injections"][1]["ttlTurns"], 2);
2072 assert_eq!(pending["injections"][1]["position"], 1);
2073 });
2074 }
2075
2076 #[test]
2077 fn pending_reminders_support_revoke_and_delivery_states() {
2078 let runtime = tokio::runtime::Builder::new_current_thread()
2079 .enable_all()
2080 .build()
2081 .unwrap();
2082 runtime.block_on(async {
2083 let bridge = test_bridge();
2084 let revoked_id = bridge
2085 .push_queued_session_remind_from_params(&serde_json::json!({
2086 "id": "rem-revoke",
2087 "body": "remove me",
2088 "mode": "finish_step",
2089 }))
2090 .await
2091 .expect("valid session/remind payload");
2092 let delivered_id = bridge
2093 .push_queued_session_remind_from_params(&serde_json::json!({
2094 "id": "rem-deliver",
2095 "body": "deliver me",
2096 "mode": "finish_step",
2097 }))
2098 .await
2099 .expect("valid session/remind payload");
2100
2101 assert_eq!(
2102 bridge.revoke_pending_reminder(&revoked_id).await,
2103 PendingReminderMutationResult::Mutated
2104 );
2105 assert_eq!(
2106 bridge.revoke_pending_reminder(&revoked_id).await,
2107 PendingReminderMutationResult::AlreadyRevoked
2108 );
2109
2110 let pending = bridge.pending_injections_json().await;
2111 assert_eq!(pending["pendingCount"], 1);
2112 assert_eq!(pending["injections"][0]["reminderId"], delivered_id);
2113
2114 let delivered = bridge
2115 .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
2116 .await;
2117 assert_eq!(delivered.len(), 1);
2118 let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2119 panic!("expected delivered reminder");
2120 };
2121 assert_eq!(reminder.reminder.id, delivered_id);
2122
2123 assert_eq!(
2124 bridge.revoke_pending_reminder(&delivered_id).await,
2125 PendingReminderMutationResult::AlreadyDelivered
2126 );
2127 assert_eq!(
2128 bridge.revoke_pending_reminder("missing").await,
2129 PendingReminderMutationResult::UnknownReminderId
2130 );
2131 });
2132 }
2133
2134 #[test]
2135 fn bridge_remind_modes_honor_delivery_checkpoints() {
2136 let runtime = tokio::runtime::Builder::new_current_thread()
2137 .enable_all()
2138 .build()
2139 .unwrap();
2140 runtime.block_on(async {
2141 let cases = [
2142 (
2143 "interrupt_immediate",
2144 DeliveryCheckpoint::InterruptImmediate,
2145 DeliveryCheckpoint::AfterCurrentOperation,
2146 ),
2147 (
2148 "finish_step",
2149 DeliveryCheckpoint::AfterCurrentOperation,
2150 DeliveryCheckpoint::EndOfInteraction,
2151 ),
2152 (
2153 "audit_only",
2154 DeliveryCheckpoint::EndOfInteraction,
2155 DeliveryCheckpoint::InterruptImmediate,
2156 ),
2157 ];
2158
2159 for (mode, expected_checkpoint, wrong_checkpoint) in cases {
2160 let bridge = test_bridge();
2161 bridge
2162 .push_queued_session_remind_from_params(&serde_json::json!({
2163 "body": format!("Reminder for {mode}"),
2164 "mode": mode,
2165 }))
2166 .await
2167 .expect("valid session/remind payload");
2168
2169 let premature = bridge
2170 .take_queued_transcript_injections_for(wrong_checkpoint)
2171 .await;
2172 assert!(
2173 premature.is_empty(),
2174 "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
2175 );
2176
2177 let delivered = bridge
2178 .take_queued_transcript_injections_for(expected_checkpoint)
2179 .await;
2180 assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
2181 let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2182 panic!("expected reminder for {mode}");
2183 };
2184 assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
2185 }
2186 });
2187 }
2188
2189 #[test]
2190 fn session_remind_validation_rejects_user_message_shape() {
2191 let err = queued_session_remind_from_params(&serde_json::json!({
2192 "content": "this is still a user message",
2193 "mode": "interrupt_immediate",
2194 }))
2195 .expect_err("session/remind must require a reminder body");
2196 assert!(err.contains(Code::ReminderInvalidShape.as_str()));
2197 assert!(err.contains("body"));
2198 }
2199
2200 #[test]
2201 fn session_remind_validation_rejects_unknown_options_separately() {
2202 let err = queued_session_remind_from_params(&serde_json::json!({
2203 "body": "valid body",
2204 "unknown_host_field": true,
2205 }))
2206 .expect_err("session/remind must reject unknown top-level fields");
2207 assert!(err.contains(Code::ReminderUnknownOption.as_str()));
2208 assert!(err.contains("unknown_host_field"));
2209 }
2210
2211 #[test]
2212 fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
2213 let err = queued_session_remind_from_params(&serde_json::json!({
2214 "body": "valid body",
2215 "propagate": "workspace",
2216 }))
2217 .expect_err("session/remind must reject unknown propagate values");
2218 assert!(err.contains(Code::ReminderUnknownPropagate.as_str()));
2219 assert!(err.contains("propagate"));
2220 }
2221
2222 #[test]
2223 fn test_json_result_to_vm_value_string() {
2224 let val = serde_json::json!("hello");
2225 let vm_val = json_result_to_vm_value(&val);
2226 assert_eq!(vm_val.display(), "hello");
2227 }
2228
2229 #[test]
2230 fn test_json_result_to_vm_value_dict() {
2231 let val = serde_json::json!({"name": "test", "count": 42});
2232 let vm_val = json_result_to_vm_value(&val);
2233 let VmValue::Dict(d) = &vm_val else {
2234 unreachable!("Expected Dict, got {:?}", vm_val);
2235 };
2236 assert_eq!(d.get("name").unwrap().display(), "test");
2237 assert_eq!(d.get("count").unwrap().display(), "42");
2238 }
2239
2240 #[test]
2241 fn test_json_result_to_vm_value_null() {
2242 let val = serde_json::json!(null);
2243 let vm_val = json_result_to_vm_value(&val);
2244 assert!(matches!(vm_val, VmValue::Nil));
2245 }
2246
2247 #[test]
2248 fn test_json_result_to_vm_value_nested() {
2249 let val = serde_json::json!({
2250 "text": "response",
2251 "tool_calls": [
2252 {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
2253 ],
2254 "input_tokens": 100,
2255 "output_tokens": 50,
2256 });
2257 let vm_val = json_result_to_vm_value(&val);
2258 let VmValue::Dict(d) = &vm_val else {
2259 unreachable!("Expected Dict, got {:?}", vm_val);
2260 };
2261 assert_eq!(d.get("text").unwrap().display(), "response");
2262 let VmValue::List(list) = d.get("tool_calls").unwrap() else {
2263 unreachable!("Expected List for tool_calls");
2264 };
2265 assert_eq!(list.len(), 1);
2266 }
2267
2268 #[test]
2269 fn parse_host_tools_list_accepts_object_wrapper() {
2270 let tools = parse_host_tools_list_response(serde_json::json!({
2271 "tools": [
2272 {
2273 "name": "Read",
2274 "description": "Read a file",
2275 "schema": {"type": "object"},
2276 }
2277 ]
2278 }))
2279 .expect("tool list");
2280
2281 assert_eq!(tools.len(), 1);
2282 assert_eq!(tools[0]["name"], "Read");
2283 assert_eq!(tools[0]["deprecated"], false);
2284 }
2285
2286 #[test]
2287 fn parse_host_tools_list_accepts_compat_fields() {
2288 let tools = parse_host_tools_list_response(serde_json::json!({
2289 "result": {
2290 "tools": [
2291 {
2292 "name": "Edit",
2293 "short_description": "Apply an edit",
2294 "input_schema": {"type": "object"},
2295 "deprecated": true,
2296 }
2297 ]
2298 }
2299 }))
2300 .expect("tool list");
2301
2302 assert_eq!(tools[0]["description"], "Apply an edit");
2303 assert_eq!(tools[0]["schema"]["type"], "object");
2304 assert_eq!(tools[0]["deprecated"], true);
2305 }
2306
2307 #[test]
2308 fn parse_host_tools_list_requires_tool_names() {
2309 let err = parse_host_tools_list_response(serde_json::json!({
2310 "tools": [
2311 {"description": "missing name"}
2312 ]
2313 }))
2314 .expect_err("expected error");
2315 assert!(err
2316 .to_string()
2317 .contains("host/tools/list: every tool must include a string `name`"));
2318 }
2319
2320 #[test]
2321 fn test_timeout_duration() {
2322 assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
2323 }
2324}