1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::future::Future;
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15
16use tokio::io::AsyncBufReadExt;
17use tokio::sync::{oneshot, Mutex, Notify};
18
19use harn_parser::diagnostic_codes::Code;
20
21use crate::orchestration::MutationSessionRecord;
22use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
23use crate::visible_text::VisibleTextState;
24use crate::vm::Vm;
25
26const DEFAULT_TIMEOUT: Duration = Duration::from_mins(5);
28
29pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
30
31fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
32 Arc::new(move |line: &str| {
33 let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
34 let mut stdout = std::io::stdout().lock();
35 stdout
36 .write_all(line.as_bytes())
37 .map_err(|e| format!("Bridge write error: {e}"))?;
38 stdout
39 .write_all(b"\n")
40 .map_err(|e| format!("Bridge write error: {e}"))?;
41 stdout
42 .flush()
43 .map_err(|e| format!("Bridge flush error: {e}"))?;
44 Ok(())
45 })
46}
47
48pub struct HostBridge {
55 next_id: AtomicU64,
56 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
58 cancelled: Arc<AtomicBool>,
60 cancel_notify: Arc<Notify>,
62 writer: HostBridgeWriter,
64 session_id: std::sync::Mutex<String>,
66 script_name: std::sync::Mutex<String>,
68 queued_transcript_injections: HostBridgeInjectionState,
70 resume_requested: Arc<AtomicBool>,
72 skills_reload_requested: Arc<AtomicBool>,
77 daemon_idle: Arc<AtomicBool>,
79 prompt_stop_reason: std::sync::Mutex<Option<String>>,
85 visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
87 visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
89 in_process: Option<InProcessHost>,
91}
92
93struct InProcessHost {
94 module_path: PathBuf,
95 exported_functions: BTreeMap<String, Arc<VmClosure>>,
96 vm: Vm,
97}
98
99impl InProcessHost {
100 fn dispatch<'a>(
106 &'a self,
107 method: &'a str,
108 params: serde_json::Value,
109 ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, VmError>> + Send + 'a>> {
110 Box::pin(async move {
111 match method {
112 "builtin_call" => {
113 let name = params
114 .get("name")
115 .and_then(|value| value.as_str())
116 .unwrap_or_default();
117 let args = params
118 .get("args")
119 .and_then(|value| value.as_array())
120 .cloned()
121 .unwrap_or_default()
122 .into_iter()
123 .map(|value| json_result_to_vm_value(&value))
124 .collect::<Vec<_>>();
125 self.invoke_export(name, &args).await
126 }
127 "host/tools/list" => self
128 .invoke_optional_export("host_tools_list", &[])
129 .await
130 .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
131 "session/request_permission" => self.request_permission(params).await,
132 other => Err(VmError::Runtime(format!(
133 "playground host backend does not implement bridge method '{other}'"
134 ))),
135 }
136 })
137 }
138
139 async fn invoke_export(
140 &self,
141 name: &str,
142 args: &[VmValue],
143 ) -> Result<serde_json::Value, VmError> {
144 let Some(closure) = self.exported_functions.get(name) else {
145 return Err(VmError::Runtime(format!(
146 "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
147 self.module_path.display()
148 )));
149 };
150
151 let mut vm = self.vm.child_vm_for_host();
152 let result = vm.call_closure_pub(closure, args).await?;
153 Ok(crate::llm::vm_value_to_json(&result))
154 }
155
156 async fn invoke_optional_export(
157 &self,
158 name: &str,
159 args: &[VmValue],
160 ) -> Result<Option<serde_json::Value>, VmError> {
161 if !self.exported_functions.contains_key(name) {
162 return Ok(None);
163 }
164 self.invoke_export(name, args).await.map(Some)
165 }
166
167 async fn request_permission(
168 &self,
169 params: serde_json::Value,
170 ) -> Result<serde_json::Value, VmError> {
171 let Some(closure) = self.exported_functions.get("request_permission") else {
174 return Ok(crate::llm::acp_permission::allow_response());
175 };
176
177 let tool_call = params.get("toolCall");
178 let tool_name = tool_call
179 .and_then(|tool_call| tool_call.pointer("/_meta/harn/toolName"))
180 .or_else(|| tool_call.and_then(|tool_call| tool_call.get("toolName")))
181 .or_else(|| tool_call.and_then(|tool_call| tool_call.get("title")))
182 .and_then(|value| value.as_str())
183 .unwrap_or_default();
184 let tool_args = tool_call
185 .and_then(|tool_call| tool_call.get("rawInput"))
186 .map(json_result_to_vm_value)
187 .unwrap_or(VmValue::Nil);
188 let full_payload = json_result_to_vm_value(¶ms);
189
190 let arg_count = closure.func.params.len();
191 let args = if arg_count >= 3 {
192 vec![
193 VmValue::String(arcstr::ArcStr::from(tool_name.to_string())),
194 tool_args,
195 full_payload,
196 ]
197 } else if arg_count == 2 {
198 vec![
199 VmValue::String(arcstr::ArcStr::from(tool_name.to_string())),
200 tool_args,
201 ]
202 } else if arg_count == 1 {
203 vec![full_payload]
204 } else {
205 Vec::new()
206 };
207
208 let mut vm = self.vm.child_vm_for_host();
209 let result = vm.call_closure_pub(closure, &args).await?;
210 let payload = match result {
215 VmValue::Bool(granted) => {
216 if granted {
217 crate::llm::acp_permission::allow_response()
218 } else {
219 crate::llm::acp_permission::reject_response(None)
220 }
221 }
222 VmValue::String(reason) if !reason.is_empty() => {
223 crate::llm::acp_permission::reject_response(Some(reason.to_string()))
224 }
225 other => {
226 let json = crate::llm::vm_value_to_json(&other);
227 if let Some(granted) = json.get("granted").and_then(|value| value.as_bool()) {
228 if granted {
229 crate::llm::acp_permission::allow_response()
230 } else {
231 crate::llm::acp_permission::reject_response(
232 json.get("reason")
233 .and_then(|value| value.as_str())
234 .map(str::to_string),
235 )
236 }
237 } else if json.get("outcome").is_some() {
238 json
240 } else if other.is_truthy() {
241 crate::llm::acp_permission::allow_response()
242 } else {
243 crate::llm::acp_permission::reject_response(None)
244 }
245 }
246 };
247 Ok(payload)
248 }
249}
250
251#[derive(Clone, Copy, Debug, PartialEq, Eq)]
261pub enum QueuedUserMessageMode {
262 InterruptImmediate,
263 FinishStep,
264 AuditOnly,
265}
266
267#[derive(Clone, Copy, Debug, PartialEq, Eq)]
268pub enum DeliveryCheckpoint {
269 InterruptImmediate,
270 AfterCurrentOperation,
271 EndOfInteraction,
272}
273
274impl QueuedUserMessageMode {
275 fn from_str(value: &str) -> Self {
276 match value {
277 "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
278 "finish_step" | "after_current_operation" | "steer" => Self::FinishStep,
282 "queue" => Self::AuditOnly,
284 _ => Self::AuditOnly,
289 }
290 }
291
292 fn as_str(self) -> &'static str {
293 match self {
294 Self::InterruptImmediate => "interrupt_immediate",
295 Self::FinishStep => "finish_step",
296 Self::AuditOnly => "audit_only",
297 }
298 }
299}
300
301#[derive(Clone, Debug, PartialEq, Eq)]
302pub struct QueuedUserMessage {
303 pub message_id: String,
304 pub content: String,
305 pub transcript_content: serde_json::Value,
306 pub mode: QueuedUserMessageMode,
307}
308
309#[derive(Clone, Debug, PartialEq, Eq)]
310pub struct QueuedReminder {
311 pub reminder: crate::llm::helpers::SystemReminder,
312 pub mode: QueuedUserMessageMode,
313}
314
315#[derive(Clone, Debug, PartialEq, Eq)]
316pub enum QueuedTranscriptInjection {
317 User(QueuedUserMessage),
318 Reminder(QueuedReminder),
319}
320
321#[derive(Debug, Default)]
322struct QueuedTranscriptInjections {
323 queue: VecDeque<QueuedTranscriptInjection>,
324 revoked_user_message_ids: HashSet<String>,
325 delivered_user_message_ids: HashSet<String>,
326 revoked_reminder_ids: HashSet<String>,
327 delivered_reminder_ids: HashSet<String>,
328}
329
330#[derive(Clone, Debug, Default)]
331pub struct HostBridgeInjectionState {
332 inner: Arc<Mutex<QueuedTranscriptInjections>>,
333}
334
335#[derive(Clone, Copy, Debug, PartialEq, Eq)]
336pub enum PendingUserMessageMutationResult {
337 Mutated,
338 AlreadyRevoked,
339 AlreadyDelivered,
340 UnknownMessageId,
341}
342
343impl QueuedTranscriptInjection {
344 fn mode(&self) -> QueuedUserMessageMode {
345 match self {
346 Self::User(message) => message.mode,
347 Self::Reminder(reminder) => reminder.mode,
348 }
349 }
350
351 fn pending_json(&self, position: usize) -> serde_json::Value {
352 match self {
353 Self::User(message) => serde_json::json!({
354 "kind": "user",
355 "id": message.message_id,
356 "messageId": message.message_id,
357 "mode": message.mode.as_str(),
358 "position": position,
359 "content": message.transcript_content,
360 }),
361 Self::Reminder(reminder) => serde_json::json!({
362 "kind": "reminder",
363 "id": reminder.reminder.id,
364 "reminderId": reminder.reminder.id,
365 "mode": reminder.mode.as_str(),
366 "position": position,
367 "body": reminder.reminder.body,
368 "tags": reminder.reminder.tags,
369 "dedupeKey": reminder.reminder.dedupe_key,
370 "ttlTurns": reminder.reminder.ttl_turns,
371 "preserveOnCompact": reminder.reminder.preserve_on_compact,
372 "propagate": reminder.reminder.propagate.as_str(),
373 "roleHint": reminder.reminder.role_hint.as_str(),
374 "source": reminder.reminder.source.as_str(),
375 "firedAtTurn": reminder.reminder.fired_at_turn,
376 "originatingAgentId": reminder.reminder.originating_agent_id,
377 }),
378 }
379 }
380}
381
382#[derive(Clone, Copy, Debug, PartialEq, Eq)]
383pub enum PendingReminderMutationResult {
384 Mutated,
385 AlreadyRevoked,
386 AlreadyDelivered,
387 UnknownReminderId,
388}
389
390fn new_inject_message_id() -> String {
391 format!("msg_inj_{}", uuid::Uuid::now_v7().simple())
392}
393
394impl HostBridgeInjectionState {
395 pub fn new() -> Self {
396 Self::default()
397 }
398
399 pub async fn push_pending_user_message(
400 &self,
401 content: String,
402 transcript_content: serde_json::Value,
403 mode: &str,
404 ) -> String {
405 let message_id = new_inject_message_id();
406 self.inner
407 .lock()
408 .await
409 .queue
410 .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
411 message_id: message_id.clone(),
412 content,
413 transcript_content,
414 mode: QueuedUserMessageMode::from_str(mode),
415 }));
416 message_id
417 }
418
419 pub async fn revoke_pending_user_message(
420 &self,
421 message_id: &str,
422 ) -> PendingUserMessageMutationResult {
423 let mut state = self.inner.lock().await;
424 let mut retained = VecDeque::new();
425 let mut revoked = false;
426 while let Some(injection) = state.queue.pop_front() {
427 match &injection {
428 QueuedTranscriptInjection::User(message) if message.message_id == message_id => {
429 revoked = true;
430 }
431 _ => retained.push_back(injection),
432 }
433 }
434 state.queue = retained;
435 if revoked {
436 state
437 .revoked_user_message_ids
438 .insert(message_id.to_string());
439 return PendingUserMessageMutationResult::Mutated;
440 }
441 if state.revoked_user_message_ids.contains(message_id) {
442 PendingUserMessageMutationResult::AlreadyRevoked
443 } else if state.delivered_user_message_ids.contains(message_id) {
444 PendingUserMessageMutationResult::AlreadyDelivered
445 } else {
446 PendingUserMessageMutationResult::UnknownMessageId
447 }
448 }
449
450 pub async fn revoke_pending_reminder(
451 &self,
452 reminder_id: &str,
453 ) -> PendingReminderMutationResult {
454 let mut state = self.inner.lock().await;
455 let mut retained = VecDeque::new();
456 let mut revoked = false;
457 while let Some(injection) = state.queue.pop_front() {
458 match &injection {
459 QueuedTranscriptInjection::Reminder(reminder)
460 if reminder.reminder.id == reminder_id =>
461 {
462 revoked = true;
463 }
464 _ => retained.push_back(injection),
465 }
466 }
467 state.queue = retained;
468 if revoked {
469 state.revoked_reminder_ids.insert(reminder_id.to_string());
470 return PendingReminderMutationResult::Mutated;
471 }
472 if state.revoked_reminder_ids.contains(reminder_id) {
473 PendingReminderMutationResult::AlreadyRevoked
474 } else if state.delivered_reminder_ids.contains(reminder_id) {
475 PendingReminderMutationResult::AlreadyDelivered
476 } else {
477 PendingReminderMutationResult::UnknownReminderId
478 }
479 }
480
481 pub async fn replace_pending_user_message(
482 &self,
483 message_id: &str,
484 content: String,
485 transcript_content: serde_json::Value,
486 ) -> PendingUserMessageMutationResult {
487 let mut state = self.inner.lock().await;
488 for injection in &mut state.queue {
489 if let QueuedTranscriptInjection::User(message) = injection {
490 if message.message_id == message_id {
491 message.content = content;
492 message.transcript_content = transcript_content;
493 return PendingUserMessageMutationResult::Mutated;
494 }
495 }
496 }
497 if state.revoked_user_message_ids.contains(message_id) {
498 PendingUserMessageMutationResult::AlreadyRevoked
499 } else if state.delivered_user_message_ids.contains(message_id) {
500 PendingUserMessageMutationResult::AlreadyDelivered
501 } else {
502 PendingUserMessageMutationResult::UnknownMessageId
503 }
504 }
505
506 async fn push_session_reminder(&self, reminder: QueuedReminder) {
507 self.inner
508 .lock()
509 .await
510 .queue
511 .push_back(QueuedTranscriptInjection::Reminder(reminder));
512 }
513
514 pub async fn pending_injections_json(&self) -> serde_json::Value {
515 let state = self.inner.lock().await;
516 let injections = state
517 .queue
518 .iter()
519 .enumerate()
520 .map(|(position, injection)| injection.pending_json(position))
521 .collect::<Vec<_>>();
522 serde_json::json!({
523 "pendingCount": injections.len(),
524 "injections": injections,
525 })
526 }
527}
528
529fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
530 format!(
531 "{}: {}",
532 Code::ReminderUnknownOption.as_str(),
533 message.as_ref()
534 )
535}
536
537fn session_remind_shape_error(message: impl AsRef<str>) -> String {
538 format!(
539 "{}: {}",
540 Code::ReminderInvalidShape.as_str(),
541 message.as_ref()
542 )
543}
544
545fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
546 format!(
547 "{}: {}",
548 Code::ReminderUnknownPropagate.as_str(),
549 message.as_ref()
550 )
551}
552
553fn string_field(
554 map: &serde_json::Map<String, serde_json::Value>,
555 key: &str,
556 required: bool,
557) -> Result<Option<String>, String> {
558 match map.get(key) {
559 None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
560 format!("`{key}` must be a non-empty string"),
561 )),
562 None | Some(serde_json::Value::Null) => Ok(None),
563 Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
564 session_remind_shape_error(format!("`{key}` must be a non-empty string")),
565 ),
566 Some(serde_json::Value::String(value)) => {
567 let trimmed = value.trim();
568 if trimmed.is_empty() {
569 Ok(None)
570 } else {
571 Ok(Some(trimmed.to_string()))
572 }
573 }
574 Some(other) => Err(session_remind_shape_error(format!(
575 "`{key}` must be a string, got {other}"
576 ))),
577 }
578}
579
580fn bool_field(
581 map: &serde_json::Map<String, serde_json::Value>,
582 key: &str,
583) -> Result<Option<bool>, String> {
584 match map.get(key) {
585 None | Some(serde_json::Value::Null) => Ok(None),
586 Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
587 Some(other) => Err(session_remind_shape_error(format!(
588 "`{key}` must be a bool, got {other}"
589 ))),
590 }
591}
592
593fn int_field(
594 map: &serde_json::Map<String, serde_json::Value>,
595 key: &str,
596) -> Result<Option<i64>, String> {
597 match map.get(key) {
598 None | Some(serde_json::Value::Null) => Ok(None),
599 Some(serde_json::Value::Number(value)) => {
600 let Some(value) = value.as_i64() else {
601 return Err(session_remind_shape_error(format!(
602 "`{key}` must be an integer"
603 )));
604 };
605 Ok(Some(value))
606 }
607 Some(other) => Err(session_remind_shape_error(format!(
608 "`{key}` must be an int, got {other}"
609 ))),
610 }
611}
612
613fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
614 let Some(value) = map.get("tags") else {
615 return Ok(Vec::new());
616 };
617 if value.is_null() {
618 return Ok(Vec::new());
619 }
620 let Some(values) = value.as_array() else {
621 return Err(session_remind_shape_error("`tags` must be a list"));
622 };
623 let mut tags = Vec::new();
624 for value in values {
625 let Some(tag) = value.as_str() else {
626 return Err(session_remind_shape_error(format!(
627 "`tags` entries must be strings, got {value}"
628 )));
629 };
630 let tag = tag.trim();
631 if tag.is_empty() {
632 return Err(session_remind_shape_error(
633 "`tags` entries must be non-empty strings",
634 ));
635 }
636 if !tags.iter().any(|existing| existing == tag) {
637 tags.push(tag.to_string());
638 }
639 }
640 Ok(tags)
641}
642
643fn session_remind_payload_from_value(
644 value: &serde_json::Value,
645) -> Result<crate::llm::helpers::SystemReminder, String> {
646 let Some(map) = value.as_object() else {
647 return Err(session_remind_shape_error(
648 "session/remind payload must be a reminder object",
649 ));
650 };
651 const ALLOWED: &[&str] = &[
652 "_meta",
653 "body",
654 "dedupe_key",
655 "fired_at_turn",
656 "id",
657 "preserve_on_compact",
658 "propagate",
659 "role_hint",
660 "source",
661 "tags",
662 "ttl_turns",
663 ];
664 let unknown = map
665 .keys()
666 .filter(|key| !ALLOWED.contains(&key.as_str()))
667 .map(String::as_str)
668 .collect::<Vec<_>>();
669 if !unknown.is_empty() {
670 if unknown.contains(&"content") {
671 return Err(session_remind_shape_error(
672 "session/remind expects reminder `body`, not user-message `content`",
673 ));
674 }
675 return Err(reminder_unknown_option_error(format!(
676 "unknown reminder option(s): {}",
677 unknown.join(", ")
678 )));
679 }
680 if let Some(meta) = map.get("_meta") {
681 if !meta.is_null() && !meta.is_object() {
682 return Err(session_remind_shape_error("`_meta` must be an object"));
683 }
684 }
685 let ttl_turns = int_field(map, "ttl_turns")?;
686 if let Some(value) = ttl_turns {
687 if value <= 0 {
688 return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
689 }
690 }
691 let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
692 if fired_at_turn < 0 {
693 return Err(session_remind_shape_error(
694 "`fired_at_turn` must be >= 0 when provided",
695 ));
696 }
697 match string_field(map, "source", false)?.as_deref() {
698 None | Some("bridge") => {}
699 Some(_) => {
700 return Err(session_remind_shape_error(
701 "`source` for session/remind must be bridge when provided",
702 ))
703 }
704 }
705 let propagate = match string_field(map, "propagate", false)?.as_deref() {
706 None => crate::llm::helpers::ReminderPropagate::Session,
707 Some("all") => crate::llm::helpers::ReminderPropagate::All,
708 Some("session") => crate::llm::helpers::ReminderPropagate::Session,
709 Some("none") => crate::llm::helpers::ReminderPropagate::None,
710 Some(_) => {
711 return Err(reminder_unknown_propagate_error(
712 "`propagate` must be one of all, session, or none",
713 ))
714 }
715 };
716 let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
717 None => crate::llm::helpers::ReminderRoleHint::System,
718 Some("system") => crate::llm::helpers::ReminderRoleHint::System,
719 Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
720 Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
721 Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
722 Some(_) => {
723 return Err(session_remind_shape_error(
724 "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
725 ))
726 }
727 };
728 Ok(crate::llm::helpers::SystemReminder {
729 id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
730 tags: tags_field(map)?,
731 dedupe_key: string_field(map, "dedupe_key", false)?,
732 ttl_turns,
733 preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
734 propagate,
735 role_hint,
736 source: crate::llm::helpers::ReminderSource::Bridge,
737 body: string_field(map, "body", true)?.unwrap_or_default(),
738 fired_at_turn,
739 originating_agent_id: None,
740 })
741}
742
743fn handle_cancel_tool_call_notification(params: &serde_json::Value) {
753 let session_id = params
754 .get("sessionId")
755 .or_else(|| params.get("session_id"))
756 .and_then(|value| value.as_str())
757 .unwrap_or_default();
758 let call_id = params
759 .get("toolCallId")
760 .or_else(|| params.get("tool_call_id"))
761 .or_else(|| params.get("callId"))
762 .or_else(|| params.get("call_id"))
763 .and_then(|value| value.as_str())
764 .unwrap_or_default();
765 if call_id.is_empty() {
766 return;
767 }
768 let reason = params
769 .get("reason")
770 .and_then(|value| value.as_str())
771 .unwrap_or("host cancelled in-flight tool call")
772 .to_string();
773 let inject_reminder = params
774 .get("injectReminder")
775 .or_else(|| params.get("inject_reminder"))
776 .and_then(|value| value.as_bool())
777 .unwrap_or(true);
778 crate::tool_call_cancellations::cancel(session_id, call_id, reason, inject_reminder);
779}
780
781fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
782 let mode = QueuedUserMessageMode::from_str(
783 params
784 .get("mode")
785 .and_then(|value| value.as_str())
786 .unwrap_or("audit_only"),
787 );
788 let reminder_value = if let Some(reminder) = params.get("reminder") {
789 reminder.clone()
790 } else {
791 let Some(params) = params.as_object() else {
792 return Err(session_remind_shape_error(
793 "session/remind params must be an object",
794 ));
795 };
796 let mut reminder = params.clone();
797 reminder.remove("mode");
798 reminder.remove("sessionId");
799 reminder.remove("session_id");
800 serde_json::Value::Object(reminder)
801 };
802 Ok(QueuedReminder {
803 reminder: session_remind_payload_from_value(&reminder_value)?,
804 mode,
805 })
806}
807
808#[allow(clippy::new_without_default)]
810impl HostBridge {
811 pub fn new() -> Self {
816 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
817 Arc::new(Mutex::new(HashMap::new()));
818 let cancelled = Arc::new(AtomicBool::new(false));
819 let cancel_notify = Arc::new(Notify::new());
820 let queued_transcript_injections = HostBridgeInjectionState::default();
821 let resume_requested = Arc::new(AtomicBool::new(false));
822 let skills_reload_requested = Arc::new(AtomicBool::new(false));
823 let daemon_idle = Arc::new(AtomicBool::new(false));
824
825 let pending_clone = pending.clone();
827 let cancelled_clone = cancelled.clone();
828 let cancel_notify_clone = cancel_notify.clone();
829 let queued_clone = queued_transcript_injections.clone();
830 let resume_clone = resume_requested.clone();
831 let skills_reload_clone = skills_reload_requested.clone();
832 tokio::task::spawn_local(async move {
833 let stdin = tokio::io::stdin();
834 let reader = tokio::io::BufReader::new(stdin);
835 let mut lines = reader.lines();
836
837 while let Ok(Some(line)) = lines.next_line().await {
838 let line = line.trim().to_string();
839 if line.is_empty() {
840 continue;
841 }
842
843 let msg: serde_json::Value = match serde_json::from_str(&line) {
844 Ok(v) => v,
845 Err(_) => continue,
846 };
847
848 if msg.get("id").is_none() {
850 if let Some(method) = msg["method"].as_str() {
851 if method == "cancel" {
852 cancelled_clone.store(true, Ordering::SeqCst);
853 cancel_notify_clone.notify_waiters();
854 } else if method == "agent/resume" {
855 resume_clone.store(true, Ordering::SeqCst);
856 } else if method == "skills/update" {
857 skills_reload_clone.store(true, Ordering::SeqCst);
858 } else if method == "session/remind" {
859 let params = &msg["params"];
860 if let Ok(reminder) = queued_session_remind_from_params(params) {
861 queued_clone.push_session_reminder(reminder).await;
862 }
863 } else if method == "session/cancel_tool_call" {
864 handle_cancel_tool_call_notification(&msg["params"]);
865 }
866 }
867 continue;
868 }
869
870 if let Some(id) = msg["id"].as_u64() {
871 let mut pending = pending_clone.lock().await;
872 if let Some(sender) = pending.remove(&id) {
873 let _ = sender.send(msg);
874 }
875 }
876 }
877
878 let mut pending = pending_clone.lock().await;
880 pending.clear();
881 });
882
883 Self {
884 next_id: AtomicU64::new(1),
885 pending,
886 cancelled,
887 cancel_notify,
888 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
889 session_id: std::sync::Mutex::new(String::new()),
890 script_name: std::sync::Mutex::new(String::new()),
891 queued_transcript_injections,
892 resume_requested,
893 skills_reload_requested,
894 daemon_idle,
895 prompt_stop_reason: std::sync::Mutex::new(None),
896 visible_call_states: std::sync::Mutex::new(HashMap::new()),
897 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
898 in_process: None,
899 }
900 }
901
902 pub fn from_parts(
908 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
909 cancelled: Arc<AtomicBool>,
910 stdout_lock: Arc<std::sync::Mutex<()>>,
911 start_id: u64,
912 ) -> Self {
913 Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
914 }
915
916 pub fn from_parts_with_writer(
917 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
918 cancelled: Arc<AtomicBool>,
919 writer: HostBridgeWriter,
920 start_id: u64,
921 ) -> Self {
922 Self::from_parts_with_writer_and_cancel_notify(
923 pending,
924 cancelled,
925 Arc::new(Notify::new()),
926 writer,
927 start_id,
928 )
929 }
930
931 pub fn from_parts_with_writer_and_cancel_notify(
932 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
933 cancelled: Arc<AtomicBool>,
934 cancel_notify: Arc<Notify>,
935 writer: HostBridgeWriter,
936 start_id: u64,
937 ) -> Self {
938 Self::from_parts_with_writer_cancel_notify_and_injection_state(
939 pending,
940 cancelled,
941 cancel_notify,
942 writer,
943 start_id,
944 None,
945 )
946 }
947
948 pub fn from_parts_with_writer_cancel_notify_and_injection_state(
949 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
950 cancelled: Arc<AtomicBool>,
951 cancel_notify: Arc<Notify>,
952 writer: HostBridgeWriter,
953 start_id: u64,
954 injection_state: Option<HostBridgeInjectionState>,
955 ) -> Self {
956 Self {
957 next_id: AtomicU64::new(start_id),
958 pending,
959 cancelled,
960 cancel_notify,
961 writer,
962 session_id: std::sync::Mutex::new(String::new()),
963 script_name: std::sync::Mutex::new(String::new()),
964 queued_transcript_injections: injection_state.unwrap_or_default(),
965 resume_requested: Arc::new(AtomicBool::new(false)),
966 skills_reload_requested: Arc::new(AtomicBool::new(false)),
967 daemon_idle: Arc::new(AtomicBool::new(false)),
968 prompt_stop_reason: std::sync::Mutex::new(None),
969 visible_call_states: std::sync::Mutex::new(HashMap::new()),
970 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
971 in_process: None,
972 }
973 }
974
975 pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
978 let exported_functions = vm.load_module_exports(module_path).await?;
979 Ok(Self {
980 next_id: AtomicU64::new(1),
981 pending: Arc::new(Mutex::new(HashMap::new())),
982 cancelled: Arc::new(AtomicBool::new(false)),
983 cancel_notify: Arc::new(Notify::new()),
984 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
985 session_id: std::sync::Mutex::new(String::new()),
986 script_name: std::sync::Mutex::new(String::new()),
987 queued_transcript_injections: HostBridgeInjectionState::default(),
988 resume_requested: Arc::new(AtomicBool::new(false)),
989 skills_reload_requested: Arc::new(AtomicBool::new(false)),
990 daemon_idle: Arc::new(AtomicBool::new(false)),
991 prompt_stop_reason: std::sync::Mutex::new(None),
992 visible_call_states: std::sync::Mutex::new(HashMap::new()),
993 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
994 in_process: Some(InProcessHost {
995 module_path: module_path.to_path_buf(),
996 exported_functions,
997 vm,
998 }),
999 })
1000 }
1001
1002 pub fn set_session_id(&self, id: &str) {
1004 *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
1005 }
1006
1007 pub fn set_script_name(&self, name: &str) {
1009 *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
1010 }
1011
1012 fn get_script_name(&self) -> String {
1014 self.script_name
1015 .lock()
1016 .unwrap_or_else(|e| e.into_inner())
1017 .clone()
1018 }
1019
1020 pub fn get_session_id(&self) -> String {
1022 self.session_id
1023 .lock()
1024 .unwrap_or_else(|e| e.into_inner())
1025 .clone()
1026 }
1027
1028 fn write_line(&self, line: &str) -> Result<(), VmError> {
1030 (self.writer)(line).map_err(VmError::Runtime)
1031 }
1032
1033 pub async fn call(
1036 &self,
1037 method: &str,
1038 params: serde_json::Value,
1039 ) -> Result<serde_json::Value, VmError> {
1040 if let Some(in_process) = &self.in_process {
1041 return in_process.dispatch(method, params).await;
1042 }
1043
1044 if self.is_cancelled() {
1045 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1046 }
1047
1048 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
1049 let cancel_wait = self.cancel_notify.notified();
1050 tokio::pin!(cancel_wait);
1051
1052 let request = crate::jsonrpc::request(id, method, params);
1053
1054 let (tx, rx) = oneshot::channel();
1055 {
1056 let mut pending = self.pending.lock().await;
1057 pending.insert(id, tx);
1058 }
1059
1060 let line = serde_json::to_string(&request)
1061 .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
1062 if let Err(e) = self.write_line(&line) {
1063 let mut pending = self.pending.lock().await;
1064 pending.remove(&id);
1065 return Err(e);
1066 }
1067
1068 if self.is_cancelled() {
1069 let mut pending = self.pending.lock().await;
1070 pending.remove(&id);
1071 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1072 }
1073
1074 let response = tokio::select! {
1075 result = rx => match result {
1076 Ok(msg) => msg,
1077 Err(_) => {
1078 return Err(VmError::Runtime(
1080 "Bridge: host closed connection before responding".into(),
1081 ));
1082 }
1083 },
1084 _ = &mut cancel_wait => {
1085 let mut pending = self.pending.lock().await;
1086 pending.remove(&id);
1087 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1088 }
1089 _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
1090 let mut pending = self.pending.lock().await;
1091 pending.remove(&id);
1092 return Err(VmError::Runtime(format!(
1093 "Bridge: host did not respond to '{method}' within {}s",
1094 DEFAULT_TIMEOUT.as_secs()
1095 )));
1096 }
1097 };
1098
1099 if let Some(error) = response.get("error") {
1100 let message = error["message"].as_str().unwrap_or("Unknown host error");
1101 let code = error["code"].as_i64().unwrap_or(-1);
1102 if code == -32001 {
1104 return Err(VmError::CategorizedError {
1105 message: message.to_string(),
1106 category: ErrorCategory::ToolRejected,
1107 });
1108 }
1109 return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
1110 }
1111
1112 Ok(response["result"].clone())
1113 }
1114
1115 pub fn notify(&self, method: &str, params: serde_json::Value) {
1118 let notification = crate::jsonrpc::notification(method, params);
1119 if self.in_process.is_some() {
1120 return;
1121 }
1122 if let Ok(line) = serde_json::to_string(¬ification) {
1123 let _ = self.write_line(&line);
1124 }
1125 }
1126
1127 pub fn is_cancelled(&self) -> bool {
1129 self.cancelled.load(Ordering::SeqCst)
1130 }
1131
1132 pub fn take_resume_signal(&self) -> bool {
1133 self.resume_requested.swap(false, Ordering::SeqCst)
1134 }
1135
1136 pub fn signal_resume(&self) {
1137 self.resume_requested.store(true, Ordering::SeqCst);
1138 }
1139
1140 pub fn set_daemon_idle(&self, idle: bool) {
1141 self.daemon_idle.store(idle, Ordering::SeqCst);
1142 }
1143
1144 pub fn is_daemon_idle(&self) -> bool {
1145 self.daemon_idle.load(Ordering::SeqCst)
1146 }
1147
1148 pub fn set_prompt_stop_reason(&self, reason: &str) {
1153 *self
1154 .prompt_stop_reason
1155 .lock()
1156 .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
1157 }
1158
1159 pub fn take_prompt_stop_reason(&self) -> Option<String> {
1164 self.prompt_stop_reason
1165 .lock()
1166 .unwrap_or_else(|e| e.into_inner())
1167 .take()
1168 }
1169
1170 pub fn take_skills_reload_signal(&self) -> bool {
1175 self.skills_reload_requested.swap(false, Ordering::SeqCst)
1176 }
1177
1178 pub fn signal_skills_reload(&self) {
1182 self.skills_reload_requested.store(true, Ordering::SeqCst);
1183 }
1184
1185 pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
1191 let result = self.call("skills/list", serde_json::json!({})).await?;
1192 match result {
1193 serde_json::Value::Array(items) => Ok(items),
1194 serde_json::Value::Object(map) => match map.get("skills") {
1195 Some(serde_json::Value::Array(items)) => Ok(items.clone()),
1196 _ => Err(VmError::Runtime(
1197 "skills/list: host response must be an array or { skills: [...] }".into(),
1198 )),
1199 },
1200 _ => Err(VmError::Runtime(
1201 "skills/list: unexpected response shape".into(),
1202 )),
1203 }
1204 }
1205
1206 pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
1212 let result = self.call("host/tools/list", serde_json::json!({})).await?;
1213 parse_host_tools_list_response(result)
1214 }
1215
1216 pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
1220 self.call("skills/fetch", serde_json::json!({ "id": id }))
1221 .await
1222 }
1223
1224 pub fn injection_state(&self) -> HostBridgeInjectionState {
1225 self.queued_transcript_injections.clone()
1226 }
1227
1228 pub async fn push_pending_user_message(
1229 &self,
1230 content: String,
1231 transcript_content: serde_json::Value,
1232 mode: &str,
1233 ) -> String {
1234 self.queued_transcript_injections
1235 .push_pending_user_message(content, transcript_content, mode)
1236 .await
1237 }
1238
1239 pub async fn push_queued_user_message(&self, content: String, mode: &str) -> String {
1240 self.push_pending_user_message(content.clone(), serde_json::Value::String(content), mode)
1241 .await
1242 }
1243
1244 pub async fn revoke_pending_user_message(
1245 &self,
1246 message_id: &str,
1247 ) -> PendingUserMessageMutationResult {
1248 self.queued_transcript_injections
1249 .revoke_pending_user_message(message_id)
1250 .await
1251 }
1252
1253 pub async fn revoke_pending_reminder(
1254 &self,
1255 reminder_id: &str,
1256 ) -> PendingReminderMutationResult {
1257 self.queued_transcript_injections
1258 .revoke_pending_reminder(reminder_id)
1259 .await
1260 }
1261
1262 pub async fn pending_injections_json(&self) -> serde_json::Value {
1263 self.queued_transcript_injections
1264 .pending_injections_json()
1265 .await
1266 }
1267
1268 pub async fn replace_pending_user_message(
1269 &self,
1270 message_id: &str,
1271 content: String,
1272 transcript_content: serde_json::Value,
1273 ) -> PendingUserMessageMutationResult {
1274 self.queued_transcript_injections
1275 .replace_pending_user_message(message_id, content, transcript_content)
1276 .await
1277 }
1278
1279 pub async fn push_queued_session_remind_from_params(
1280 &self,
1281 params: &serde_json::Value,
1282 ) -> Result<String, String> {
1283 let reminder = queued_session_remind_from_params(params)?;
1284 let reminder_id = reminder.reminder.id.clone();
1285 self.queued_transcript_injections
1286 .push_session_reminder(reminder)
1287 .await;
1288 Ok(reminder_id)
1289 }
1290
1291 pub async fn take_queued_user_messages(
1292 &self,
1293 include_interrupt_immediate: bool,
1294 include_finish_step: bool,
1295 include_audit_only: bool,
1296 ) -> Vec<QueuedUserMessage> {
1297 let mut state = self.queued_transcript_injections.inner.lock().await;
1298 let mut selected = Vec::new();
1299 let mut retained = VecDeque::new();
1300 while let Some(injection) = state.queue.pop_front() {
1301 let should_take = match injection.mode() {
1302 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1303 QueuedUserMessageMode::FinishStep => include_finish_step,
1304 QueuedUserMessageMode::AuditOnly => include_audit_only,
1305 };
1306 match (should_take, injection) {
1307 (true, QueuedTranscriptInjection::User(message)) => {
1308 state
1309 .delivered_user_message_ids
1310 .insert(message.message_id.clone());
1311 selected.push(message);
1312 }
1313 (_, injection) => retained.push_back(injection),
1314 }
1315 }
1316 state.queue = retained;
1317 selected
1318 }
1319
1320 pub async fn take_queued_transcript_injections(
1321 &self,
1322 include_interrupt_immediate: bool,
1323 include_finish_step: bool,
1324 include_audit_only: bool,
1325 ) -> Vec<QueuedTranscriptInjection> {
1326 let mut state = self.queued_transcript_injections.inner.lock().await;
1327 let mut selected = Vec::new();
1328 let mut retained = VecDeque::new();
1329 while let Some(injection) = state.queue.pop_front() {
1330 let should_take = match injection.mode() {
1331 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1332 QueuedUserMessageMode::FinishStep => include_finish_step,
1333 QueuedUserMessageMode::AuditOnly => include_audit_only,
1334 };
1335 if should_take {
1336 match &injection {
1337 QueuedTranscriptInjection::User(message) => {
1338 state
1339 .delivered_user_message_ids
1340 .insert(message.message_id.clone());
1341 }
1342 QueuedTranscriptInjection::Reminder(reminder) => {
1343 state
1344 .delivered_reminder_ids
1345 .insert(reminder.reminder.id.clone());
1346 }
1347 }
1348 selected.push(injection);
1349 } else {
1350 retained.push_back(injection);
1351 }
1352 }
1353 state.queue = retained;
1354 selected
1355 }
1356
1357 pub async fn take_queued_user_messages_for(
1358 &self,
1359 checkpoint: DeliveryCheckpoint,
1360 ) -> Vec<QueuedUserMessage> {
1361 match checkpoint {
1362 DeliveryCheckpoint::InterruptImmediate => {
1363 self.take_queued_user_messages(true, false, false).await
1364 }
1365 DeliveryCheckpoint::AfterCurrentOperation => {
1366 self.take_queued_user_messages(false, true, false).await
1367 }
1368 DeliveryCheckpoint::EndOfInteraction => {
1369 self.take_queued_user_messages(false, false, true).await
1370 }
1371 }
1372 }
1373
1374 pub async fn take_queued_transcript_injections_for(
1375 &self,
1376 checkpoint: DeliveryCheckpoint,
1377 ) -> Vec<QueuedTranscriptInjection> {
1378 match checkpoint {
1379 DeliveryCheckpoint::InterruptImmediate => {
1380 self.take_queued_transcript_injections(true, false, false)
1381 .await
1382 }
1383 DeliveryCheckpoint::AfterCurrentOperation => {
1384 self.take_queued_transcript_injections(false, true, false)
1385 .await
1386 }
1387 DeliveryCheckpoint::EndOfInteraction => {
1388 self.take_queued_transcript_injections(false, false, true)
1389 .await
1390 }
1391 }
1392 }
1393
1394 pub fn send_output(&self, text: &str) {
1396 self.notify("output", serde_json::json!({"text": text}));
1397 }
1398
1399 pub fn send_progress(
1401 &self,
1402 phase: &str,
1403 message: &str,
1404 progress: Option<i64>,
1405 total: Option<i64>,
1406 data: Option<serde_json::Value>,
1407 ) {
1408 let mut payload = serde_json::json!({"phase": phase, "message": message});
1409 if let Some(p) = progress {
1410 payload["progress"] = serde_json::json!(p);
1411 }
1412 if let Some(t) = total {
1413 payload["total"] = serde_json::json!(t);
1414 }
1415 if let Some(d) = data {
1416 payload["data"] = d;
1417 }
1418 self.notify("progress", payload);
1419 }
1420
1421 pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1423 let mut payload = serde_json::json!({"level": level, "message": message});
1424 if let Some(f) = fields {
1425 payload["fields"] = f;
1426 }
1427 self.notify("log", payload);
1428 }
1429
1430 pub fn send_call_start(
1433 &self,
1434 call_id: &str,
1435 call_type: &str,
1436 name: &str,
1437 metadata: serde_json::Value,
1438 ) {
1439 let session_id = self.get_session_id();
1440 let script = self.get_script_name();
1441 let stream_publicly = metadata
1442 .get("stream_publicly")
1443 .and_then(|value| value.as_bool())
1444 .unwrap_or(true);
1445 self.visible_call_streams
1446 .lock()
1447 .unwrap_or_else(|e| e.into_inner())
1448 .insert(call_id.to_string(), stream_publicly);
1449 self.notify(
1450 "session/update",
1451 serde_json::json!({
1452 "sessionId": session_id,
1453 "update": {
1454 "sessionUpdate": "call_start",
1455 "content": {
1456 "toolCallId": call_id,
1457 "call_type": call_type,
1458 "name": name,
1459 "script": script,
1460 "metadata": metadata,
1461 },
1462 },
1463 }),
1464 );
1465 }
1466
1467 pub fn send_call_progress(
1470 &self,
1471 call_id: &str,
1472 delta: &str,
1473 accumulated_tokens: u64,
1474 user_visible: bool,
1475 ) {
1476 let session_id = self.get_session_id();
1477 let (visible_text, visible_delta) = {
1478 let stream_publicly = self
1479 .visible_call_streams
1480 .lock()
1481 .unwrap_or_else(|e| e.into_inner())
1482 .get(call_id)
1483 .copied()
1484 .unwrap_or(true);
1485 let mut states = self
1486 .visible_call_states
1487 .lock()
1488 .unwrap_or_else(|e| e.into_inner());
1489 let state = states.entry(call_id.to_string()).or_default();
1490 state.push(delta, stream_publicly)
1491 };
1492 self.notify(
1493 "session/update",
1494 serde_json::json!({
1495 "sessionId": session_id,
1496 "update": {
1497 "sessionUpdate": "call_progress",
1498 "content": {
1499 "toolCallId": call_id,
1500 "delta": delta,
1501 "accumulated_tokens": accumulated_tokens,
1502 "visible_text": visible_text,
1503 "visible_delta": visible_delta,
1504 "user_visible": user_visible,
1505 },
1506 },
1507 }),
1508 );
1509 }
1510
1511 pub fn send_call_end(
1513 &self,
1514 call_id: &str,
1515 call_type: &str,
1516 name: &str,
1517 duration_ms: u64,
1518 status: &str,
1519 metadata: serde_json::Value,
1520 ) {
1521 let session_id = self.get_session_id();
1522 let script = self.get_script_name();
1523 self.visible_call_states
1524 .lock()
1525 .unwrap_or_else(|e| e.into_inner())
1526 .remove(call_id);
1527 self.visible_call_streams
1528 .lock()
1529 .unwrap_or_else(|e| e.into_inner())
1530 .remove(call_id);
1531 self.notify(
1532 "session/update",
1533 serde_json::json!({
1534 "sessionId": session_id,
1535 "update": {
1536 "sessionUpdate": "call_end",
1537 "content": {
1538 "toolCallId": call_id,
1539 "call_type": call_type,
1540 "name": name,
1541 "script": script,
1542 "duration_ms": duration_ms,
1543 "status": status,
1544 "metadata": metadata,
1545 },
1546 },
1547 }),
1548 );
1549 }
1550
1551 pub fn send_worker_update(
1553 &self,
1554 worker_id: &str,
1555 worker_name: &str,
1556 status: &str,
1557 metadata: serde_json::Value,
1558 audit: Option<&MutationSessionRecord>,
1559 ) {
1560 let session_id = self.get_session_id();
1561 let script = self.get_script_name();
1562 let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1563 let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1564 let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1565 let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1566 let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1567 let lifecycle = serde_json::json!({
1568 "event": status,
1569 "worker_id": worker_id,
1570 "worker_name": worker_name,
1571 "started_at": started_at,
1572 "finished_at": finished_at,
1573 });
1574 self.notify(
1575 "session/update",
1576 serde_json::json!({
1577 "sessionId": session_id,
1578 "update": {
1579 "sessionUpdate": "worker_update",
1580 "content": {
1581 "worker_id": worker_id,
1582 "worker_name": worker_name,
1583 "status": status,
1584 "script": script,
1585 "started_at": started_at,
1586 "finished_at": finished_at,
1587 "snapshot_path": snapshot_path,
1588 "run_id": run_id,
1589 "run_path": run_path,
1590 "lifecycle": lifecycle,
1591 "audit": audit,
1592 "metadata": metadata,
1593 },
1594 },
1595 }),
1596 );
1597 }
1598}
1599
1600pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1602 crate::stdlib::json_to_vm_value(val)
1603}
1604
1605fn parse_host_tools_list_response(
1606 result: serde_json::Value,
1607) -> Result<Vec<serde_json::Value>, VmError> {
1608 let tools = match result {
1609 serde_json::Value::Array(items) => items,
1610 serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1611 map.get("result")
1612 .and_then(|value| value.get("tools"))
1613 .cloned()
1614 }) {
1615 Some(serde_json::Value::Array(items)) => items,
1616 _ => {
1617 return Err(VmError::Runtime(
1618 "host/tools/list: host response must be an array or { tools: [...] }".into(),
1619 ));
1620 }
1621 },
1622 _ => {
1623 return Err(VmError::Runtime(
1624 "host/tools/list: unexpected response shape".into(),
1625 ));
1626 }
1627 };
1628
1629 let mut normalized = Vec::with_capacity(tools.len());
1630 for tool in tools {
1631 let serde_json::Value::Object(map) = tool else {
1632 return Err(VmError::Runtime(
1633 "host/tools/list: every tool must be an object".into(),
1634 ));
1635 };
1636 let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1637 return Err(VmError::Runtime(
1638 "host/tools/list: every tool must include a string `name`".into(),
1639 ));
1640 };
1641 let description = map
1642 .get("description")
1643 .and_then(|value| value.as_str())
1644 .or_else(|| {
1645 map.get("short_description")
1646 .and_then(|value| value.as_str())
1647 })
1648 .unwrap_or_default();
1649 let schema = map
1650 .get("schema")
1651 .cloned()
1652 .or_else(|| map.get("parameters").cloned())
1653 .or_else(|| map.get("input_schema").cloned())
1654 .unwrap_or(serde_json::Value::Null);
1655 let deprecated = map
1656 .get("deprecated")
1657 .and_then(|value| value.as_bool())
1658 .unwrap_or(false);
1659 normalized.push(serde_json::json!({
1660 "name": name,
1661 "description": description,
1662 "schema": schema,
1663 "deprecated": deprecated,
1664 }));
1665 }
1666 Ok(normalized)
1667}
1668
1669#[cfg(test)]
1670mod tests {
1671 use super::*;
1672
1673 fn test_bridge() -> HostBridge {
1674 HostBridge::from_parts(
1675 Arc::new(Mutex::new(HashMap::new())),
1676 Arc::new(AtomicBool::new(false)),
1677 Arc::new(std::sync::Mutex::new(())),
1678 1,
1679 )
1680 }
1681
1682 fn test_bridge_sharing_injection_state(owner: &HostBridge) -> HostBridge {
1683 HostBridge::from_parts_with_writer_cancel_notify_and_injection_state(
1684 Arc::new(Mutex::new(HashMap::new())),
1685 Arc::new(AtomicBool::new(false)),
1686 Arc::new(Notify::new()),
1687 Arc::new(|_| Ok(())),
1688 100,
1689 Some(owner.injection_state()),
1690 )
1691 }
1692
1693 #[test]
1694 fn test_json_rpc_request_format() {
1695 let request = crate::jsonrpc::request(
1696 1,
1697 "llm_call",
1698 serde_json::json!({
1699 "prompt": "Hello",
1700 "system": "Be helpful",
1701 }),
1702 );
1703 let s = serde_json::to_string(&request).unwrap();
1704 assert!(s.contains("\"jsonrpc\":\"2.0\""));
1705 assert!(s.contains("\"id\":1"));
1706 assert!(s.contains("\"method\":\"llm_call\""));
1707 }
1708
1709 #[test]
1710 fn test_json_rpc_notification_format() {
1711 let notification =
1712 crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
1713 let s = serde_json::to_string(¬ification).unwrap();
1714 assert!(s.contains("\"method\":\"output\""));
1715 assert!(!s.contains("\"id\""));
1716 }
1717
1718 #[test]
1719 fn test_json_rpc_error_response_parsing() {
1720 let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
1721 assert!(response.get("error").is_some());
1722 assert_eq!(
1723 response["error"]["message"].as_str().unwrap(),
1724 "Invalid request"
1725 );
1726 }
1727
1728 #[test]
1729 fn test_json_rpc_success_response_parsing() {
1730 let response = crate::jsonrpc::response(
1731 1,
1732 serde_json::json!({
1733 "text": "Hello world",
1734 "input_tokens": 10,
1735 "output_tokens": 5,
1736 }),
1737 );
1738 assert!(response.get("result").is_some());
1739 assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
1740 }
1741
1742 #[test]
1743 fn test_cancelled_flag() {
1744 let cancelled = Arc::new(AtomicBool::new(false));
1745 assert!(!cancelled.load(Ordering::SeqCst));
1746 cancelled.store(true, Ordering::SeqCst);
1747 assert!(cancelled.load(Ordering::SeqCst));
1748 }
1749
1750 #[test]
1751 fn pending_host_calls_return_when_cancellation_arrives() {
1752 let runtime = tokio::runtime::Builder::new_current_thread()
1753 .enable_all()
1754 .build()
1755 .unwrap();
1756 runtime.block_on(async {
1757 let pending = Arc::new(Mutex::new(HashMap::new()));
1758 let cancelled = Arc::new(AtomicBool::new(false));
1759 let bridge = HostBridge::from_parts_with_writer(
1760 pending.clone(),
1761 cancelled.clone(),
1762 Arc::new(|_| Ok(())),
1763 1,
1764 );
1765
1766 let call = bridge.call("host/work", serde_json::json!({}));
1767 tokio::pin!(call);
1768
1769 loop {
1770 tokio::select! {
1771 result = &mut call => panic!("call completed before cancellation: {result:?}"),
1772 _ = tokio::task::yield_now() => {}
1773 }
1774 if !pending.lock().await.is_empty() {
1775 break;
1776 }
1777 }
1778
1779 cancelled.store(true, Ordering::SeqCst);
1780 bridge.cancel_notify.notify_waiters();
1781
1782 let result = tokio::time::timeout(Duration::from_secs(1), call)
1783 .await
1784 .expect("pending call should observe cancellation promptly");
1785 assert!(
1786 matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
1787 );
1788 assert!(pending.lock().await.is_empty());
1789 });
1790 }
1791
1792 #[test]
1793 fn queued_messages_are_filtered_by_delivery_mode() {
1794 let runtime = tokio::runtime::Builder::new_current_thread()
1795 .enable_all()
1796 .build()
1797 .unwrap();
1798 runtime.block_on(async {
1799 let bridge = test_bridge();
1800 bridge
1801 .push_queued_user_message("first".to_string(), "finish_step")
1802 .await;
1803 bridge
1804 .push_queued_user_message("second".to_string(), "audit_only")
1805 .await;
1806
1807 let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1808 assert_eq!(finish_step.len(), 1);
1809 assert_eq!(finish_step[0].content, "first");
1810
1811 let audit_only = bridge.take_queued_user_messages(false, false, true).await;
1812 assert_eq!(audit_only.len(), 1);
1813 assert_eq!(audit_only[0].content, "second");
1814 });
1815 }
1816
1817 #[test]
1818 fn pending_user_messages_support_revoke_replace_and_delivery_states() {
1819 let runtime = tokio::runtime::Builder::new_current_thread()
1820 .enable_all()
1821 .build()
1822 .unwrap();
1823 runtime.block_on(async {
1824 let bridge = test_bridge();
1825 let first_id = bridge
1826 .push_pending_user_message(
1827 "first".to_string(),
1828 serde_json::json!("first"),
1829 "audit_only",
1830 )
1831 .await;
1832 let second_id = bridge
1833 .push_pending_user_message(
1834 "second".to_string(),
1835 serde_json::json!("second"),
1836 "audit_only",
1837 )
1838 .await;
1839
1840 assert_eq!(
1841 bridge
1842 .replace_pending_user_message(
1843 &second_id,
1844 "second edited".to_string(),
1845 serde_json::json!("second edited"),
1846 )
1847 .await,
1848 PendingUserMessageMutationResult::Mutated
1849 );
1850 assert_eq!(
1851 bridge.revoke_pending_user_message(&first_id).await,
1852 PendingUserMessageMutationResult::Mutated
1853 );
1854 assert_eq!(
1855 bridge.revoke_pending_user_message(&first_id).await,
1856 PendingUserMessageMutationResult::AlreadyRevoked
1857 );
1858
1859 let delivered = bridge
1860 .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1861 .await;
1862 assert_eq!(delivered.len(), 1);
1863 assert_eq!(delivered[0].message_id, second_id);
1864 assert_eq!(delivered[0].content, "second edited");
1865
1866 assert_eq!(
1867 bridge.revoke_pending_user_message(&second_id).await,
1868 PendingUserMessageMutationResult::AlreadyDelivered
1869 );
1870 assert_eq!(
1871 bridge
1872 .replace_pending_user_message(
1873 &second_id,
1874 "too late".to_string(),
1875 serde_json::json!("too late"),
1876 )
1877 .await,
1878 PendingUserMessageMutationResult::AlreadyDelivered
1879 );
1880 assert_eq!(
1881 bridge.revoke_pending_user_message("missing").await,
1882 PendingUserMessageMutationResult::UnknownMessageId
1883 );
1884 });
1885 }
1886
1887 #[test]
1888 fn pending_user_message_replace_preserves_fifo_position_and_mode() {
1889 let runtime = tokio::runtime::Builder::new_current_thread()
1890 .enable_all()
1891 .build()
1892 .unwrap();
1893 runtime.block_on(async {
1894 let bridge = test_bridge();
1895 let first_id = bridge
1896 .push_pending_user_message(
1897 "first".to_string(),
1898 serde_json::json!("first"),
1899 "finish_step",
1900 )
1901 .await;
1902 let second_id = bridge
1903 .push_pending_user_message(
1904 "second".to_string(),
1905 serde_json::json!("second"),
1906 "finish_step",
1907 )
1908 .await;
1909 assert_eq!(
1910 bridge
1911 .replace_pending_user_message(
1912 &first_id,
1913 "first edited".to_string(),
1914 serde_json::json!("first edited"),
1915 )
1916 .await,
1917 PendingUserMessageMutationResult::Mutated
1918 );
1919
1920 let delivered = bridge
1921 .take_queued_user_messages_for(DeliveryCheckpoint::AfterCurrentOperation)
1922 .await;
1923 assert_eq!(
1924 delivered
1925 .iter()
1926 .map(|message| (&message.message_id, message.content.as_str(), message.mode))
1927 .collect::<Vec<_>>(),
1928 vec![
1929 (&first_id, "first edited", QueuedUserMessageMode::FinishStep,),
1930 (&second_id, "second", QueuedUserMessageMode::FinishStep),
1931 ]
1932 );
1933 });
1934 }
1935
1936 #[test]
1937 fn pending_user_message_state_survives_bridge_replacement() {
1938 let runtime = tokio::runtime::Builder::new_current_thread()
1939 .enable_all()
1940 .build()
1941 .unwrap();
1942 runtime.block_on(async {
1943 let bridge = test_bridge();
1944 let revoked_id = bridge
1945 .push_pending_user_message(
1946 "revoke me".to_string(),
1947 serde_json::json!("revoke me"),
1948 "audit_only",
1949 )
1950 .await;
1951 let delivered_id = bridge
1952 .push_pending_user_message(
1953 "deliver me".to_string(),
1954 serde_json::json!("deliver me"),
1955 "audit_only",
1956 )
1957 .await;
1958 assert_eq!(
1959 bridge.revoke_pending_user_message(&revoked_id).await,
1960 PendingUserMessageMutationResult::Mutated
1961 );
1962 bridge.cancelled.store(true, Ordering::SeqCst);
1963
1964 let replacement_bridge = test_bridge_sharing_injection_state(&bridge);
1965 assert_eq!(
1966 replacement_bridge
1967 .revoke_pending_user_message(&revoked_id)
1968 .await,
1969 PendingUserMessageMutationResult::AlreadyRevoked
1970 );
1971 let delivered = replacement_bridge
1972 .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1973 .await;
1974 assert_eq!(delivered.len(), 1);
1975 assert_eq!(delivered[0].message_id, delivered_id);
1976 assert_eq!(delivered[0].content, "deliver me");
1977 assert_eq!(
1978 bridge.revoke_pending_user_message(&delivered_id).await,
1979 PendingUserMessageMutationResult::AlreadyDelivered
1980 );
1981 });
1982 }
1983
1984 #[test]
1985 fn queued_transcript_injections_preserve_user_reminder_separation() {
1986 let runtime = tokio::runtime::Builder::new_current_thread()
1987 .enable_all()
1988 .build()
1989 .unwrap();
1990 runtime.block_on(async {
1991 let bridge = test_bridge();
1992 bridge
1993 .push_queued_user_message("human follow-up".to_string(), "finish_step")
1994 .await;
1995 let reminder_id = bridge
1996 .push_queued_session_remind_from_params(&serde_json::json!({
1997 "body": "Host-provided ambient context.",
1998 "tags": ["host"],
1999 "dedupe_key": "host-context",
2000 "ttl_turns": 2,
2001 "mode": "audit_only",
2002 "_meta": {"harn": {"source": "test"}},
2003 }))
2004 .await
2005 .expect("valid reminder");
2006
2007 let finish_step = bridge.take_queued_user_messages(false, true, false).await;
2008 assert_eq!(finish_step.len(), 1);
2009 assert_eq!(finish_step[0].content, "human follow-up");
2010
2011 let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
2012 assert!(no_user_messages.is_empty());
2013
2014 let injections = bridge
2015 .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
2016 .await;
2017 assert_eq!(injections.len(), 1);
2018 let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
2019 panic!("expected queued reminder");
2020 };
2021 assert_eq!(reminder.reminder.id, reminder_id);
2022 assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
2023 assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
2024 assert_eq!(
2025 reminder.reminder.dedupe_key.as_deref(),
2026 Some("host-context")
2027 );
2028 assert_eq!(reminder.reminder.ttl_turns, Some(2));
2029 assert_eq!(
2030 reminder.reminder.source,
2031 crate::llm::helpers::ReminderSource::Bridge
2032 );
2033 });
2034 }
2035
2036 #[test]
2037 fn pending_injections_list_user_messages_and_reminders_in_fifo_order() {
2038 let runtime = tokio::runtime::Builder::new_current_thread()
2039 .enable_all()
2040 .build()
2041 .unwrap();
2042 runtime.block_on(async {
2043 let bridge = test_bridge();
2044 let message_id = bridge
2045 .push_pending_user_message(
2046 "human follow-up".to_string(),
2047 serde_json::json!([{"type": "text", "text": "human follow-up"}]),
2048 "finish_step",
2049 )
2050 .await;
2051 let reminder_id = bridge
2052 .push_queued_session_remind_from_params(&serde_json::json!({
2053 "id": "rem-test",
2054 "body": "Host reminder",
2055 "tags": ["host"],
2056 "dedupe_key": "host-reminder",
2057 "ttl_turns": 2,
2058 "mode": "interrupt_immediate",
2059 }))
2060 .await
2061 .expect("valid session/remind payload");
2062
2063 let pending = bridge.pending_injections_json().await;
2064 assert_eq!(pending["pendingCount"], 2);
2065 assert_eq!(pending["injections"][0]["kind"], "user");
2066 assert_eq!(pending["injections"][0]["id"], message_id);
2067 assert_eq!(pending["injections"][0]["messageId"], message_id);
2068 assert_eq!(pending["injections"][0]["mode"], "finish_step");
2069 assert_eq!(pending["injections"][0]["position"], 0);
2070 assert_eq!(pending["injections"][1]["kind"], "reminder");
2071 assert_eq!(pending["injections"][1]["id"], reminder_id);
2072 assert_eq!(pending["injections"][1]["reminderId"], "rem-test");
2073 assert_eq!(pending["injections"][1]["mode"], "interrupt_immediate");
2074 assert_eq!(pending["injections"][1]["body"], "Host reminder");
2075 assert_eq!(pending["injections"][1]["dedupeKey"], "host-reminder");
2076 assert_eq!(pending["injections"][1]["ttlTurns"], 2);
2077 assert_eq!(pending["injections"][1]["position"], 1);
2078 });
2079 }
2080
2081 #[test]
2082 fn pending_reminders_support_revoke_and_delivery_states() {
2083 let runtime = tokio::runtime::Builder::new_current_thread()
2084 .enable_all()
2085 .build()
2086 .unwrap();
2087 runtime.block_on(async {
2088 let bridge = test_bridge();
2089 let revoked_id = bridge
2090 .push_queued_session_remind_from_params(&serde_json::json!({
2091 "id": "rem-revoke",
2092 "body": "remove me",
2093 "mode": "finish_step",
2094 }))
2095 .await
2096 .expect("valid session/remind payload");
2097 let delivered_id = bridge
2098 .push_queued_session_remind_from_params(&serde_json::json!({
2099 "id": "rem-deliver",
2100 "body": "deliver me",
2101 "mode": "finish_step",
2102 }))
2103 .await
2104 .expect("valid session/remind payload");
2105
2106 assert_eq!(
2107 bridge.revoke_pending_reminder(&revoked_id).await,
2108 PendingReminderMutationResult::Mutated
2109 );
2110 assert_eq!(
2111 bridge.revoke_pending_reminder(&revoked_id).await,
2112 PendingReminderMutationResult::AlreadyRevoked
2113 );
2114
2115 let pending = bridge.pending_injections_json().await;
2116 assert_eq!(pending["pendingCount"], 1);
2117 assert_eq!(pending["injections"][0]["reminderId"], delivered_id);
2118
2119 let delivered = bridge
2120 .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
2121 .await;
2122 assert_eq!(delivered.len(), 1);
2123 let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2124 panic!("expected delivered reminder");
2125 };
2126 assert_eq!(reminder.reminder.id, delivered_id);
2127
2128 assert_eq!(
2129 bridge.revoke_pending_reminder(&delivered_id).await,
2130 PendingReminderMutationResult::AlreadyDelivered
2131 );
2132 assert_eq!(
2133 bridge.revoke_pending_reminder("missing").await,
2134 PendingReminderMutationResult::UnknownReminderId
2135 );
2136 });
2137 }
2138
2139 #[test]
2140 fn bridge_remind_modes_honor_delivery_checkpoints() {
2141 let runtime = tokio::runtime::Builder::new_current_thread()
2142 .enable_all()
2143 .build()
2144 .unwrap();
2145 runtime.block_on(async {
2146 let cases = [
2147 (
2148 "interrupt_immediate",
2149 DeliveryCheckpoint::InterruptImmediate,
2150 DeliveryCheckpoint::AfterCurrentOperation,
2151 ),
2152 (
2153 "finish_step",
2154 DeliveryCheckpoint::AfterCurrentOperation,
2155 DeliveryCheckpoint::EndOfInteraction,
2156 ),
2157 (
2158 "audit_only",
2159 DeliveryCheckpoint::EndOfInteraction,
2160 DeliveryCheckpoint::InterruptImmediate,
2161 ),
2162 ];
2163
2164 for (mode, expected_checkpoint, wrong_checkpoint) in cases {
2165 let bridge = test_bridge();
2166 bridge
2167 .push_queued_session_remind_from_params(&serde_json::json!({
2168 "body": format!("Reminder for {mode}"),
2169 "mode": mode,
2170 }))
2171 .await
2172 .expect("valid session/remind payload");
2173
2174 let premature = bridge
2175 .take_queued_transcript_injections_for(wrong_checkpoint)
2176 .await;
2177 assert!(
2178 premature.is_empty(),
2179 "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
2180 );
2181
2182 let delivered = bridge
2183 .take_queued_transcript_injections_for(expected_checkpoint)
2184 .await;
2185 assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
2186 let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2187 panic!("expected reminder for {mode}");
2188 };
2189 assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
2190 }
2191 });
2192 }
2193
2194 #[test]
2195 fn session_remind_validation_rejects_user_message_shape() {
2196 let err = queued_session_remind_from_params(&serde_json::json!({
2197 "content": "this is still a user message",
2198 "mode": "interrupt_immediate",
2199 }))
2200 .expect_err("session/remind must require a reminder body");
2201 assert!(err.contains(Code::ReminderInvalidShape.as_str()));
2202 assert!(err.contains("body"));
2203 }
2204
2205 #[test]
2206 fn session_remind_validation_rejects_unknown_options_separately() {
2207 let err = queued_session_remind_from_params(&serde_json::json!({
2208 "body": "valid body",
2209 "unknown_host_field": true,
2210 }))
2211 .expect_err("session/remind must reject unknown top-level fields");
2212 assert!(err.contains(Code::ReminderUnknownOption.as_str()));
2213 assert!(err.contains("unknown_host_field"));
2214 }
2215
2216 #[test]
2217 fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
2218 let err = queued_session_remind_from_params(&serde_json::json!({
2219 "body": "valid body",
2220 "propagate": "workspace",
2221 }))
2222 .expect_err("session/remind must reject unknown propagate values");
2223 assert!(err.contains(Code::ReminderUnknownPropagate.as_str()));
2224 assert!(err.contains("propagate"));
2225 }
2226
2227 #[test]
2228 fn test_json_result_to_vm_value_string() {
2229 let val = serde_json::json!("hello");
2230 let vm_val = json_result_to_vm_value(&val);
2231 assert_eq!(vm_val.display(), "hello");
2232 }
2233
2234 #[test]
2235 fn test_json_result_to_vm_value_dict() {
2236 let val = serde_json::json!({"name": "test", "count": 42});
2237 let vm_val = json_result_to_vm_value(&val);
2238 let VmValue::Dict(d) = &vm_val else {
2239 unreachable!("Expected Dict, got {:?}", vm_val);
2240 };
2241 assert_eq!(d.get("name").unwrap().display(), "test");
2242 assert_eq!(d.get("count").unwrap().display(), "42");
2243 }
2244
2245 #[test]
2246 fn test_json_result_to_vm_value_null() {
2247 let val = serde_json::json!(null);
2248 let vm_val = json_result_to_vm_value(&val);
2249 assert!(matches!(vm_val, VmValue::Nil));
2250 }
2251
2252 #[test]
2253 fn test_json_result_to_vm_value_nested() {
2254 let val = serde_json::json!({
2255 "text": "response",
2256 "tool_calls": [
2257 {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
2258 ],
2259 "input_tokens": 100,
2260 "output_tokens": 50,
2261 });
2262 let vm_val = json_result_to_vm_value(&val);
2263 let VmValue::Dict(d) = &vm_val else {
2264 unreachable!("Expected Dict, got {:?}", vm_val);
2265 };
2266 assert_eq!(d.get("text").unwrap().display(), "response");
2267 let VmValue::List(list) = d.get("tool_calls").unwrap() else {
2268 unreachable!("Expected List for tool_calls");
2269 };
2270 assert_eq!(list.len(), 1);
2271 }
2272
2273 #[test]
2274 fn parse_host_tools_list_accepts_object_wrapper() {
2275 let tools = parse_host_tools_list_response(serde_json::json!({
2276 "tools": [
2277 {
2278 "name": "Read",
2279 "description": "Read a file",
2280 "schema": {"type": "object"},
2281 }
2282 ]
2283 }))
2284 .expect("tool list");
2285
2286 assert_eq!(tools.len(), 1);
2287 assert_eq!(tools[0]["name"], "Read");
2288 assert_eq!(tools[0]["deprecated"], false);
2289 }
2290
2291 #[test]
2292 fn parse_host_tools_list_accepts_compat_fields() {
2293 let tools = parse_host_tools_list_response(serde_json::json!({
2294 "result": {
2295 "tools": [
2296 {
2297 "name": "Edit",
2298 "short_description": "Apply an edit",
2299 "input_schema": {"type": "object"},
2300 "deprecated": true,
2301 }
2302 ]
2303 }
2304 }))
2305 .expect("tool list");
2306
2307 assert_eq!(tools[0]["description"], "Apply an edit");
2308 assert_eq!(tools[0]["schema"]["type"], "object");
2309 assert_eq!(tools[0]["deprecated"], true);
2310 }
2311
2312 #[test]
2313 fn parse_host_tools_list_requires_tool_names() {
2314 let err = parse_host_tools_list_response(serde_json::json!({
2315 "tools": [
2316 {"description": "missing name"}
2317 ]
2318 }))
2319 .expect_err("expected error");
2320 assert!(err
2321 .to_string()
2322 .contains("host/tools/list: every tool must include a string `name`"));
2323 }
2324
2325 #[test]
2326 fn test_timeout_duration() {
2327 assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
2328 }
2329}