1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::future::Future;
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15
16use tokio::io::AsyncBufReadExt;
17use tokio::sync::{oneshot, Mutex, Notify};
18
19use harn_parser::diagnostic_codes::Code;
20
21use crate::orchestration::MutationSessionRecord;
22use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
23use crate::visible_text::VisibleTextState;
24use crate::vm::Vm;
25
26const DEFAULT_TIMEOUT: Duration = Duration::from_mins(5);
28
29pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
30
31fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
32 Arc::new(move |line: &str| {
33 let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
34 let mut stdout = std::io::stdout().lock();
35 stdout
36 .write_all(line.as_bytes())
37 .map_err(|e| format!("Bridge write error: {e}"))?;
38 stdout
39 .write_all(b"\n")
40 .map_err(|e| format!("Bridge write error: {e}"))?;
41 stdout
42 .flush()
43 .map_err(|e| format!("Bridge flush error: {e}"))?;
44 Ok(())
45 })
46}
47
48pub struct HostBridge {
55 next_id: AtomicU64,
56 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
58 cancelled: Arc<AtomicBool>,
60 cancel_notify: Arc<Notify>,
62 writer: HostBridgeWriter,
64 session_id: std::sync::Mutex<String>,
66 script_name: std::sync::Mutex<String>,
68 queued_transcript_injections: HostBridgeInjectionState,
70 resume_requested: Arc<AtomicBool>,
72 skills_reload_requested: Arc<AtomicBool>,
77 daemon_idle: Arc<AtomicBool>,
79 prompt_stop_reason: std::sync::Mutex<Option<String>>,
85 visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
87 visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
89 in_process: Option<InProcessHost>,
91}
92
93struct InProcessHost {
94 module_path: PathBuf,
95 exported_functions: BTreeMap<String, Arc<VmClosure>>,
96 vm: Vm,
97}
98
99impl InProcessHost {
100 fn dispatch<'a>(
106 &'a self,
107 method: &'a str,
108 params: serde_json::Value,
109 ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, VmError>> + Send + 'a>> {
110 Box::pin(async move {
111 match method {
112 "builtin_call" => {
113 let name = params
114 .get("name")
115 .and_then(|value| value.as_str())
116 .unwrap_or_default();
117 let args = params
118 .get("args")
119 .and_then(|value| value.as_array())
120 .cloned()
121 .unwrap_or_default()
122 .into_iter()
123 .map(|value| json_result_to_vm_value(&value))
124 .collect::<Vec<_>>();
125 self.invoke_export(name, &args).await
126 }
127 "host/tools/list" => self
128 .invoke_optional_export("host_tools_list", &[])
129 .await
130 .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
131 "session/request_permission" => self.request_permission(params).await,
132 other => Err(VmError::Runtime(format!(
133 "playground host backend does not implement bridge method '{other}'"
134 ))),
135 }
136 })
137 }
138
139 async fn invoke_export(
140 &self,
141 name: &str,
142 args: &[VmValue],
143 ) -> Result<serde_json::Value, VmError> {
144 let Some(closure) = self.exported_functions.get(name) else {
145 return Err(VmError::Runtime(format!(
146 "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
147 self.module_path.display()
148 )));
149 };
150
151 let mut vm = self.vm.child_vm_for_host();
152 let result = vm.call_closure_pub(closure, args).await?;
153 Ok(crate::llm::vm_value_to_json(&result))
154 }
155
156 async fn invoke_optional_export(
157 &self,
158 name: &str,
159 args: &[VmValue],
160 ) -> Result<Option<serde_json::Value>, VmError> {
161 if !self.exported_functions.contains_key(name) {
162 return Ok(None);
163 }
164 self.invoke_export(name, args).await.map(Some)
165 }
166
167 async fn request_permission(
168 &self,
169 params: serde_json::Value,
170 ) -> Result<serde_json::Value, VmError> {
171 let Some(closure) = self.exported_functions.get("request_permission") else {
174 return Ok(crate::llm::acp_permission::allow_response());
175 };
176
177 let tool_call = params.get("toolCall");
178 let tool_name = tool_call
179 .and_then(|tool_call| tool_call.pointer("/_meta/harn/toolName"))
180 .or_else(|| tool_call.and_then(|tool_call| tool_call.get("toolName")))
181 .or_else(|| tool_call.and_then(|tool_call| tool_call.get("title")))
182 .and_then(|value| value.as_str())
183 .unwrap_or_default();
184 let tool_args = tool_call
185 .and_then(|tool_call| tool_call.get("rawInput"))
186 .map(json_result_to_vm_value)
187 .unwrap_or(VmValue::Nil);
188 let full_payload = json_result_to_vm_value(¶ms);
189
190 let arg_count = closure.func.params.len();
191 let args = if arg_count >= 3 {
192 vec![
193 VmValue::String(arcstr::ArcStr::from(tool_name.to_string())),
194 tool_args,
195 full_payload,
196 ]
197 } else if arg_count == 2 {
198 vec![
199 VmValue::String(arcstr::ArcStr::from(tool_name.to_string())),
200 tool_args,
201 ]
202 } else if arg_count == 1 {
203 vec![full_payload]
204 } else {
205 Vec::new()
206 };
207
208 let mut vm = self.vm.child_vm_for_host();
209 let result = vm.call_closure_pub(closure, &args).await?;
210 let payload = match result {
215 VmValue::Bool(granted) => {
216 if granted {
217 crate::llm::acp_permission::allow_response()
218 } else {
219 crate::llm::acp_permission::reject_response(None)
220 }
221 }
222 VmValue::String(reason) if !reason.is_empty() => {
223 crate::llm::acp_permission::reject_response(Some(reason.to_string()))
224 }
225 other => {
226 let json = crate::llm::vm_value_to_json(&other);
227 if let Some(granted) = json.get("granted").and_then(|value| value.as_bool()) {
228 if granted {
229 crate::llm::acp_permission::allow_response()
230 } else {
231 crate::llm::acp_permission::reject_response(
232 json.get("reason")
233 .and_then(|value| value.as_str())
234 .map(str::to_string),
235 )
236 }
237 } else if json.get("outcome").is_some() {
238 json
240 } else if other.is_truthy() {
241 crate::llm::acp_permission::allow_response()
242 } else {
243 crate::llm::acp_permission::reject_response(None)
244 }
245 }
246 };
247 Ok(payload)
248 }
249}
250
251#[derive(Clone, Copy, Debug, PartialEq, Eq)]
260pub enum QueuedUserMessageMode {
261 InterruptImmediate,
262 FinishStep,
263 AuditOnly,
264}
265
266#[derive(Clone, Copy, Debug, PartialEq, Eq)]
267pub enum DeliveryCheckpoint {
268 InterruptImmediate,
269 AfterCurrentOperation,
270 EndOfInteraction,
271}
272
273impl QueuedUserMessageMode {
274 fn from_str(value: &str) -> Self {
275 match value {
276 "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
277 "finish_step" | "after_current_operation" | "steer" => Self::FinishStep,
281 "queue" => Self::AuditOnly,
283 _ => Self::AuditOnly,
288 }
289 }
290
291 fn as_str(self) -> &'static str {
292 match self {
293 Self::InterruptImmediate => "interrupt_immediate",
294 Self::FinishStep => "finish_step",
295 Self::AuditOnly => "audit_only",
296 }
297 }
298}
299
300#[derive(Clone, Debug, PartialEq, Eq)]
301pub struct QueuedUserMessage {
302 pub message_id: String,
303 pub content: String,
304 pub transcript_content: serde_json::Value,
305 pub mode: QueuedUserMessageMode,
306}
307
308#[derive(Clone, Debug, PartialEq, Eq)]
309pub struct QueuedReminder {
310 pub reminder: crate::llm::helpers::SystemReminder,
311 pub mode: QueuedUserMessageMode,
312}
313
314#[derive(Clone, Debug, PartialEq, Eq)]
315pub enum QueuedTranscriptInjection {
316 User(QueuedUserMessage),
317 Reminder(QueuedReminder),
318}
319
320#[derive(Debug, Default)]
321struct QueuedTranscriptInjections {
322 queue: VecDeque<QueuedTranscriptInjection>,
323 revoked_user_message_ids: HashSet<String>,
324 delivered_user_message_ids: HashSet<String>,
325 revoked_reminder_ids: HashSet<String>,
326 delivered_reminder_ids: HashSet<String>,
327}
328
329#[derive(Clone, Debug, Default)]
330pub struct HostBridgeInjectionState {
331 inner: Arc<Mutex<QueuedTranscriptInjections>>,
332}
333
334#[derive(Clone, Copy, Debug, PartialEq, Eq)]
335pub enum PendingUserMessageMutationResult {
336 Mutated,
337 AlreadyRevoked,
338 AlreadyDelivered,
339 UnknownMessageId,
340}
341
342impl QueuedTranscriptInjection {
343 fn mode(&self) -> QueuedUserMessageMode {
344 match self {
345 Self::User(message) => message.mode,
346 Self::Reminder(reminder) => reminder.mode,
347 }
348 }
349
350 fn pending_json(&self, position: usize) -> serde_json::Value {
351 match self {
352 Self::User(message) => serde_json::json!({
353 "kind": "user",
354 "id": message.message_id,
355 "messageId": message.message_id,
356 "mode": message.mode.as_str(),
357 "position": position,
358 "content": message.transcript_content,
359 }),
360 Self::Reminder(reminder) => serde_json::json!({
361 "kind": "reminder",
362 "id": reminder.reminder.id,
363 "reminderId": reminder.reminder.id,
364 "mode": reminder.mode.as_str(),
365 "position": position,
366 "body": reminder.reminder.body,
367 "tags": reminder.reminder.tags,
368 "dedupeKey": reminder.reminder.dedupe_key,
369 "ttlTurns": reminder.reminder.ttl_turns,
370 "preserveOnCompact": reminder.reminder.preserve_on_compact,
371 "propagate": reminder.reminder.propagate.as_str(),
372 "roleHint": reminder.reminder.role_hint.as_str(),
373 "source": reminder.reminder.source.as_str(),
374 "firedAtTurn": reminder.reminder.fired_at_turn,
375 "originatingAgentId": reminder.reminder.originating_agent_id,
376 }),
377 }
378 }
379}
380
381#[derive(Clone, Copy, Debug, PartialEq, Eq)]
382pub enum PendingReminderMutationResult {
383 Mutated,
384 AlreadyRevoked,
385 AlreadyDelivered,
386 UnknownReminderId,
387}
388
389fn new_inject_message_id() -> String {
390 format!("msg_inj_{}", uuid::Uuid::now_v7().simple())
391}
392
393impl HostBridgeInjectionState {
394 pub fn new() -> Self {
395 Self::default()
396 }
397
398 pub async fn push_pending_user_message(
399 &self,
400 content: String,
401 transcript_content: serde_json::Value,
402 mode: &str,
403 ) -> String {
404 let message_id = new_inject_message_id();
405 self.inner
406 .lock()
407 .await
408 .queue
409 .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
410 message_id: message_id.clone(),
411 content,
412 transcript_content,
413 mode: QueuedUserMessageMode::from_str(mode),
414 }));
415 message_id
416 }
417
418 pub async fn revoke_pending_user_message(
419 &self,
420 message_id: &str,
421 ) -> PendingUserMessageMutationResult {
422 let mut state = self.inner.lock().await;
423 let mut retained = VecDeque::new();
424 let mut revoked = false;
425 while let Some(injection) = state.queue.pop_front() {
426 match &injection {
427 QueuedTranscriptInjection::User(message) if message.message_id == message_id => {
428 revoked = true;
429 }
430 _ => retained.push_back(injection),
431 }
432 }
433 state.queue = retained;
434 if revoked {
435 state
436 .revoked_user_message_ids
437 .insert(message_id.to_string());
438 return PendingUserMessageMutationResult::Mutated;
439 }
440 if state.revoked_user_message_ids.contains(message_id) {
441 PendingUserMessageMutationResult::AlreadyRevoked
442 } else if state.delivered_user_message_ids.contains(message_id) {
443 PendingUserMessageMutationResult::AlreadyDelivered
444 } else {
445 PendingUserMessageMutationResult::UnknownMessageId
446 }
447 }
448
449 pub async fn revoke_pending_reminder(
450 &self,
451 reminder_id: &str,
452 ) -> PendingReminderMutationResult {
453 let mut state = self.inner.lock().await;
454 let mut retained = VecDeque::new();
455 let mut revoked = false;
456 while let Some(injection) = state.queue.pop_front() {
457 match &injection {
458 QueuedTranscriptInjection::Reminder(reminder)
459 if reminder.reminder.id == reminder_id =>
460 {
461 revoked = true;
462 }
463 _ => retained.push_back(injection),
464 }
465 }
466 state.queue = retained;
467 if revoked {
468 state.revoked_reminder_ids.insert(reminder_id.to_string());
469 return PendingReminderMutationResult::Mutated;
470 }
471 if state.revoked_reminder_ids.contains(reminder_id) {
472 PendingReminderMutationResult::AlreadyRevoked
473 } else if state.delivered_reminder_ids.contains(reminder_id) {
474 PendingReminderMutationResult::AlreadyDelivered
475 } else {
476 PendingReminderMutationResult::UnknownReminderId
477 }
478 }
479
480 pub async fn replace_pending_user_message(
481 &self,
482 message_id: &str,
483 content: String,
484 transcript_content: serde_json::Value,
485 ) -> PendingUserMessageMutationResult {
486 let mut state = self.inner.lock().await;
487 for injection in &mut state.queue {
488 if let QueuedTranscriptInjection::User(message) = injection {
489 if message.message_id == message_id {
490 message.content = content;
491 message.transcript_content = transcript_content;
492 return PendingUserMessageMutationResult::Mutated;
493 }
494 }
495 }
496 if state.revoked_user_message_ids.contains(message_id) {
497 PendingUserMessageMutationResult::AlreadyRevoked
498 } else if state.delivered_user_message_ids.contains(message_id) {
499 PendingUserMessageMutationResult::AlreadyDelivered
500 } else {
501 PendingUserMessageMutationResult::UnknownMessageId
502 }
503 }
504
505 async fn push_session_reminder(&self, reminder: QueuedReminder) {
506 self.inner
507 .lock()
508 .await
509 .queue
510 .push_back(QueuedTranscriptInjection::Reminder(reminder));
511 }
512
513 pub async fn pending_injections_json(&self) -> serde_json::Value {
514 let state = self.inner.lock().await;
515 let injections = state
516 .queue
517 .iter()
518 .enumerate()
519 .map(|(position, injection)| injection.pending_json(position))
520 .collect::<Vec<_>>();
521 serde_json::json!({
522 "pendingCount": injections.len(),
523 "injections": injections,
524 })
525 }
526}
527
528fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
529 format!(
530 "{}: {}",
531 Code::ReminderUnknownOption.as_str(),
532 message.as_ref()
533 )
534}
535
536fn session_remind_shape_error(message: impl AsRef<str>) -> String {
537 format!(
538 "{}: {}",
539 Code::ReminderInvalidShape.as_str(),
540 message.as_ref()
541 )
542}
543
544fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
545 format!(
546 "{}: {}",
547 Code::ReminderUnknownPropagate.as_str(),
548 message.as_ref()
549 )
550}
551
552fn string_field(
553 map: &serde_json::Map<String, serde_json::Value>,
554 key: &str,
555 required: bool,
556) -> Result<Option<String>, String> {
557 match map.get(key) {
558 None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
559 format!("`{key}` must be a non-empty string"),
560 )),
561 None | Some(serde_json::Value::Null) => Ok(None),
562 Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
563 session_remind_shape_error(format!("`{key}` must be a non-empty string")),
564 ),
565 Some(serde_json::Value::String(value)) => {
566 let trimmed = value.trim();
567 if trimmed.is_empty() {
568 Ok(None)
569 } else {
570 Ok(Some(trimmed.to_string()))
571 }
572 }
573 Some(other) => Err(session_remind_shape_error(format!(
574 "`{key}` must be a string, got {other}"
575 ))),
576 }
577}
578
579fn bool_field(
580 map: &serde_json::Map<String, serde_json::Value>,
581 key: &str,
582) -> Result<Option<bool>, String> {
583 match map.get(key) {
584 None | Some(serde_json::Value::Null) => Ok(None),
585 Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
586 Some(other) => Err(session_remind_shape_error(format!(
587 "`{key}` must be a bool, got {other}"
588 ))),
589 }
590}
591
592fn int_field(
593 map: &serde_json::Map<String, serde_json::Value>,
594 key: &str,
595) -> Result<Option<i64>, String> {
596 match map.get(key) {
597 None | Some(serde_json::Value::Null) => Ok(None),
598 Some(serde_json::Value::Number(value)) => {
599 let Some(value) = value.as_i64() else {
600 return Err(session_remind_shape_error(format!(
601 "`{key}` must be an integer"
602 )));
603 };
604 Ok(Some(value))
605 }
606 Some(other) => Err(session_remind_shape_error(format!(
607 "`{key}` must be an int, got {other}"
608 ))),
609 }
610}
611
612fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
613 let Some(value) = map.get("tags") else {
614 return Ok(Vec::new());
615 };
616 if value.is_null() {
617 return Ok(Vec::new());
618 }
619 let Some(values) = value.as_array() else {
620 return Err(session_remind_shape_error("`tags` must be a list"));
621 };
622 let mut tags = Vec::new();
623 for value in values {
624 let Some(tag) = value.as_str() else {
625 return Err(session_remind_shape_error(format!(
626 "`tags` entries must be strings, got {value}"
627 )));
628 };
629 let tag = tag.trim();
630 if tag.is_empty() {
631 return Err(session_remind_shape_error(
632 "`tags` entries must be non-empty strings",
633 ));
634 }
635 if !tags.iter().any(|existing| existing == tag) {
636 tags.push(tag.to_string());
637 }
638 }
639 Ok(tags)
640}
641
642fn session_remind_payload_from_value(
643 value: &serde_json::Value,
644) -> Result<crate::llm::helpers::SystemReminder, String> {
645 let Some(map) = value.as_object() else {
646 return Err(session_remind_shape_error(
647 "session/remind payload must be a reminder object",
648 ));
649 };
650 const ALLOWED: &[&str] = &[
651 "_meta",
652 "body",
653 "dedupe_key",
654 "fired_at_turn",
655 "id",
656 "preserve_on_compact",
657 "propagate",
658 "role_hint",
659 "source",
660 "tags",
661 "ttl_turns",
662 ];
663 let unknown = map
664 .keys()
665 .filter(|key| !ALLOWED.contains(&key.as_str()))
666 .map(String::as_str)
667 .collect::<Vec<_>>();
668 if !unknown.is_empty() {
669 if unknown.contains(&"content") {
670 return Err(session_remind_shape_error(
671 "session/remind expects reminder `body`, not user-message `content`",
672 ));
673 }
674 return Err(reminder_unknown_option_error(format!(
675 "unknown reminder option(s): {}",
676 unknown.join(", ")
677 )));
678 }
679 if let Some(meta) = map.get("_meta") {
680 if !meta.is_null() && !meta.is_object() {
681 return Err(session_remind_shape_error("`_meta` must be an object"));
682 }
683 }
684 let ttl_turns = int_field(map, "ttl_turns")?;
685 if let Some(value) = ttl_turns {
686 if value <= 0 {
687 return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
688 }
689 }
690 let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
691 if fired_at_turn < 0 {
692 return Err(session_remind_shape_error(
693 "`fired_at_turn` must be >= 0 when provided",
694 ));
695 }
696 match string_field(map, "source", false)?.as_deref() {
697 None | Some("bridge") => {}
698 Some(_) => {
699 return Err(session_remind_shape_error(
700 "`source` for session/remind must be bridge when provided",
701 ))
702 }
703 }
704 let propagate = match string_field(map, "propagate", false)?.as_deref() {
705 None => crate::llm::helpers::ReminderPropagate::Session,
706 Some("all") => crate::llm::helpers::ReminderPropagate::All,
707 Some("session") => crate::llm::helpers::ReminderPropagate::Session,
708 Some("none") => crate::llm::helpers::ReminderPropagate::None,
709 Some(_) => {
710 return Err(reminder_unknown_propagate_error(
711 "`propagate` must be one of all, session, or none",
712 ))
713 }
714 };
715 let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
716 None => crate::llm::helpers::ReminderRoleHint::System,
717 Some("system") => crate::llm::helpers::ReminderRoleHint::System,
718 Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
719 Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
720 Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
721 Some(_) => {
722 return Err(session_remind_shape_error(
723 "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
724 ))
725 }
726 };
727 Ok(crate::llm::helpers::SystemReminder {
728 id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
729 tags: tags_field(map)?,
730 dedupe_key: string_field(map, "dedupe_key", false)?,
731 ttl_turns,
732 preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
733 propagate,
734 role_hint,
735 source: crate::llm::helpers::ReminderSource::Bridge,
736 body: string_field(map, "body", true)?.unwrap_or_default(),
737 fired_at_turn,
738 originating_agent_id: None,
739 })
740}
741
742fn handle_cancel_tool_call_notification(params: &serde_json::Value) {
752 let session_id = params
753 .get("sessionId")
754 .or_else(|| params.get("session_id"))
755 .and_then(|value| value.as_str())
756 .unwrap_or_default();
757 let call_id = params
758 .get("toolCallId")
759 .or_else(|| params.get("tool_call_id"))
760 .or_else(|| params.get("callId"))
761 .or_else(|| params.get("call_id"))
762 .and_then(|value| value.as_str())
763 .unwrap_or_default();
764 if call_id.is_empty() {
765 return;
766 }
767 let reason = params
768 .get("reason")
769 .and_then(|value| value.as_str())
770 .unwrap_or("host cancelled in-flight tool call")
771 .to_string();
772 let inject_reminder = params
773 .get("injectReminder")
774 .or_else(|| params.get("inject_reminder"))
775 .and_then(|value| value.as_bool())
776 .unwrap_or(true);
777 crate::tool_call_cancellations::cancel(session_id, call_id, reason, inject_reminder);
778}
779
780fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
781 let mode = QueuedUserMessageMode::from_str(
782 params
783 .get("mode")
784 .and_then(|value| value.as_str())
785 .unwrap_or("audit_only"),
786 );
787 let reminder_value = if let Some(reminder) = params.get("reminder") {
788 reminder.clone()
789 } else {
790 let Some(params) = params.as_object() else {
791 return Err(session_remind_shape_error(
792 "session/remind params must be an object",
793 ));
794 };
795 let mut reminder = params.clone();
796 reminder.remove("mode");
797 reminder.remove("sessionId");
798 reminder.remove("session_id");
799 serde_json::Value::Object(reminder)
800 };
801 Ok(QueuedReminder {
802 reminder: session_remind_payload_from_value(&reminder_value)?,
803 mode,
804 })
805}
806
807#[allow(clippy::new_without_default)]
809impl HostBridge {
810 pub fn new() -> Self {
815 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
816 Arc::new(Mutex::new(HashMap::new()));
817 let cancelled = Arc::new(AtomicBool::new(false));
818 let cancel_notify = Arc::new(Notify::new());
819 let queued_transcript_injections = HostBridgeInjectionState::default();
820 let resume_requested = Arc::new(AtomicBool::new(false));
821 let skills_reload_requested = Arc::new(AtomicBool::new(false));
822 let daemon_idle = Arc::new(AtomicBool::new(false));
823
824 let pending_clone = pending.clone();
826 let cancelled_clone = cancelled.clone();
827 let cancel_notify_clone = cancel_notify.clone();
828 let queued_clone = queued_transcript_injections.clone();
829 let resume_clone = resume_requested.clone();
830 let skills_reload_clone = skills_reload_requested.clone();
831 tokio::task::spawn_local(async move {
832 let stdin = tokio::io::stdin();
833 let reader = tokio::io::BufReader::new(stdin);
834 let mut lines = reader.lines();
835
836 while let Ok(Some(line)) = lines.next_line().await {
837 let line = line.trim().to_string();
838 if line.is_empty() {
839 continue;
840 }
841
842 let msg: serde_json::Value = match serde_json::from_str(&line) {
843 Ok(v) => v,
844 Err(_) => continue,
845 };
846
847 if msg.get("id").is_none() {
849 if let Some(method) = msg["method"].as_str() {
850 if method == "cancel" {
851 cancelled_clone.store(true, Ordering::SeqCst);
852 cancel_notify_clone.notify_waiters();
853 } else if method == "agent/resume" {
854 resume_clone.store(true, Ordering::SeqCst);
855 } else if method == "skills/update" {
856 skills_reload_clone.store(true, Ordering::SeqCst);
857 } else if method == "session/remind" {
858 let params = &msg["params"];
859 if let Ok(reminder) = queued_session_remind_from_params(params) {
860 queued_clone.push_session_reminder(reminder).await;
861 }
862 } else if method == "session/cancel_tool_call" {
863 handle_cancel_tool_call_notification(&msg["params"]);
864 }
865 }
866 continue;
867 }
868
869 if let Some(id) = msg["id"].as_u64() {
870 let mut pending = pending_clone.lock().await;
871 if let Some(sender) = pending.remove(&id) {
872 let _ = sender.send(msg);
873 }
874 }
875 }
876
877 let mut pending = pending_clone.lock().await;
879 pending.clear();
880 });
881
882 Self {
883 next_id: AtomicU64::new(1),
884 pending,
885 cancelled,
886 cancel_notify,
887 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
888 session_id: std::sync::Mutex::new(String::new()),
889 script_name: std::sync::Mutex::new(String::new()),
890 queued_transcript_injections,
891 resume_requested,
892 skills_reload_requested,
893 daemon_idle,
894 prompt_stop_reason: std::sync::Mutex::new(None),
895 visible_call_states: std::sync::Mutex::new(HashMap::new()),
896 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
897 in_process: None,
898 }
899 }
900
901 pub fn from_parts(
907 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
908 cancelled: Arc<AtomicBool>,
909 stdout_lock: Arc<std::sync::Mutex<()>>,
910 start_id: u64,
911 ) -> Self {
912 Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
913 }
914
915 pub fn from_parts_with_writer(
916 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
917 cancelled: Arc<AtomicBool>,
918 writer: HostBridgeWriter,
919 start_id: u64,
920 ) -> Self {
921 Self::from_parts_with_writer_and_cancel_notify(
922 pending,
923 cancelled,
924 Arc::new(Notify::new()),
925 writer,
926 start_id,
927 )
928 }
929
930 pub fn from_parts_with_writer_and_cancel_notify(
931 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
932 cancelled: Arc<AtomicBool>,
933 cancel_notify: Arc<Notify>,
934 writer: HostBridgeWriter,
935 start_id: u64,
936 ) -> Self {
937 Self::from_parts_with_writer_cancel_notify_and_injection_state(
938 pending,
939 cancelled,
940 cancel_notify,
941 writer,
942 start_id,
943 None,
944 )
945 }
946
947 pub fn from_parts_with_writer_cancel_notify_and_injection_state(
948 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
949 cancelled: Arc<AtomicBool>,
950 cancel_notify: Arc<Notify>,
951 writer: HostBridgeWriter,
952 start_id: u64,
953 injection_state: Option<HostBridgeInjectionState>,
954 ) -> Self {
955 Self {
956 next_id: AtomicU64::new(start_id),
957 pending,
958 cancelled,
959 cancel_notify,
960 writer,
961 session_id: std::sync::Mutex::new(String::new()),
962 script_name: std::sync::Mutex::new(String::new()),
963 queued_transcript_injections: injection_state.unwrap_or_default(),
964 resume_requested: Arc::new(AtomicBool::new(false)),
965 skills_reload_requested: Arc::new(AtomicBool::new(false)),
966 daemon_idle: Arc::new(AtomicBool::new(false)),
967 prompt_stop_reason: std::sync::Mutex::new(None),
968 visible_call_states: std::sync::Mutex::new(HashMap::new()),
969 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
970 in_process: None,
971 }
972 }
973
974 pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
977 let exported_functions = vm.load_module_exports(module_path).await?;
978 Ok(Self {
979 next_id: AtomicU64::new(1),
980 pending: Arc::new(Mutex::new(HashMap::new())),
981 cancelled: Arc::new(AtomicBool::new(false)),
982 cancel_notify: Arc::new(Notify::new()),
983 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
984 session_id: std::sync::Mutex::new(String::new()),
985 script_name: std::sync::Mutex::new(String::new()),
986 queued_transcript_injections: HostBridgeInjectionState::default(),
987 resume_requested: Arc::new(AtomicBool::new(false)),
988 skills_reload_requested: Arc::new(AtomicBool::new(false)),
989 daemon_idle: Arc::new(AtomicBool::new(false)),
990 prompt_stop_reason: std::sync::Mutex::new(None),
991 visible_call_states: std::sync::Mutex::new(HashMap::new()),
992 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
993 in_process: Some(InProcessHost {
994 module_path: module_path.to_path_buf(),
995 exported_functions,
996 vm,
997 }),
998 })
999 }
1000
1001 pub fn set_session_id(&self, id: &str) {
1003 *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
1004 }
1005
1006 pub fn set_script_name(&self, name: &str) {
1008 *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
1009 }
1010
1011 fn get_script_name(&self) -> String {
1013 self.script_name
1014 .lock()
1015 .unwrap_or_else(|e| e.into_inner())
1016 .clone()
1017 }
1018
1019 pub fn get_session_id(&self) -> String {
1021 self.session_id
1022 .lock()
1023 .unwrap_or_else(|e| e.into_inner())
1024 .clone()
1025 }
1026
1027 fn write_line(&self, line: &str) -> Result<(), VmError> {
1029 (self.writer)(line).map_err(VmError::Runtime)
1030 }
1031
1032 pub async fn call(
1035 &self,
1036 method: &str,
1037 params: serde_json::Value,
1038 ) -> Result<serde_json::Value, VmError> {
1039 if let Some(in_process) = &self.in_process {
1040 return in_process.dispatch(method, params).await;
1041 }
1042
1043 if self.is_cancelled() {
1044 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1045 }
1046
1047 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
1048 let cancel_wait = self.cancel_notify.notified();
1049 tokio::pin!(cancel_wait);
1050
1051 let request = crate::jsonrpc::request(id, method, params);
1052
1053 let (tx, rx) = oneshot::channel();
1054 {
1055 let mut pending = self.pending.lock().await;
1056 pending.insert(id, tx);
1057 }
1058
1059 let line = serde_json::to_string(&request)
1060 .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
1061 if let Err(e) = self.write_line(&line) {
1062 let mut pending = self.pending.lock().await;
1063 pending.remove(&id);
1064 return Err(e);
1065 }
1066
1067 if self.is_cancelled() {
1068 let mut pending = self.pending.lock().await;
1069 pending.remove(&id);
1070 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1071 }
1072
1073 let response = tokio::select! {
1074 result = rx => match result {
1075 Ok(msg) => msg,
1076 Err(_) => {
1077 return Err(VmError::Runtime(
1079 "Bridge: host closed connection before responding".into(),
1080 ));
1081 }
1082 },
1083 _ = &mut cancel_wait => {
1084 let mut pending = self.pending.lock().await;
1085 pending.remove(&id);
1086 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1087 }
1088 _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
1089 let mut pending = self.pending.lock().await;
1090 pending.remove(&id);
1091 return Err(VmError::Runtime(format!(
1092 "Bridge: host did not respond to '{method}' within {}s",
1093 DEFAULT_TIMEOUT.as_secs()
1094 )));
1095 }
1096 };
1097
1098 if let Some(error) = response.get("error") {
1099 let message = error["message"].as_str().unwrap_or("Unknown host error");
1100 let code = error["code"].as_i64().unwrap_or(-1);
1101 if code == -32001 {
1103 return Err(VmError::CategorizedError {
1104 message: message.to_string(),
1105 category: ErrorCategory::ToolRejected,
1106 });
1107 }
1108 return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
1109 }
1110
1111 Ok(response["result"].clone())
1112 }
1113
1114 pub fn notify(&self, method: &str, params: serde_json::Value) {
1117 let notification = crate::jsonrpc::notification(method, params);
1118 if self.in_process.is_some() {
1119 return;
1120 }
1121 if let Ok(line) = serde_json::to_string(¬ification) {
1122 let _ = self.write_line(&line);
1123 }
1124 }
1125
1126 pub fn is_cancelled(&self) -> bool {
1128 self.cancelled.load(Ordering::SeqCst)
1129 }
1130
1131 pub fn take_resume_signal(&self) -> bool {
1132 self.resume_requested.swap(false, Ordering::SeqCst)
1133 }
1134
1135 pub fn signal_resume(&self) {
1136 self.resume_requested.store(true, Ordering::SeqCst);
1137 }
1138
1139 pub fn set_daemon_idle(&self, idle: bool) {
1140 self.daemon_idle.store(idle, Ordering::SeqCst);
1141 }
1142
1143 pub fn is_daemon_idle(&self) -> bool {
1144 self.daemon_idle.load(Ordering::SeqCst)
1145 }
1146
1147 pub fn set_prompt_stop_reason(&self, reason: &str) {
1152 *self
1153 .prompt_stop_reason
1154 .lock()
1155 .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
1156 }
1157
1158 pub fn take_prompt_stop_reason(&self) -> Option<String> {
1163 self.prompt_stop_reason
1164 .lock()
1165 .unwrap_or_else(|e| e.into_inner())
1166 .take()
1167 }
1168
1169 pub fn take_skills_reload_signal(&self) -> bool {
1174 self.skills_reload_requested.swap(false, Ordering::SeqCst)
1175 }
1176
1177 pub fn signal_skills_reload(&self) {
1181 self.skills_reload_requested.store(true, Ordering::SeqCst);
1182 }
1183
1184 pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
1190 let result = self.call("skills/list", serde_json::json!({})).await?;
1191 match result {
1192 serde_json::Value::Array(items) => Ok(items),
1193 serde_json::Value::Object(map) => match map.get("skills") {
1194 Some(serde_json::Value::Array(items)) => Ok(items.clone()),
1195 _ => Err(VmError::Runtime(
1196 "skills/list: host response must be an array or { skills: [...] }".into(),
1197 )),
1198 },
1199 _ => Err(VmError::Runtime(
1200 "skills/list: unexpected response shape".into(),
1201 )),
1202 }
1203 }
1204
1205 pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
1211 let result = self.call("host/tools/list", serde_json::json!({})).await?;
1212 parse_host_tools_list_response(result)
1213 }
1214
1215 pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
1219 self.call("skills/fetch", serde_json::json!({ "id": id }))
1220 .await
1221 }
1222
1223 pub fn injection_state(&self) -> HostBridgeInjectionState {
1224 self.queued_transcript_injections.clone()
1225 }
1226
1227 pub async fn push_pending_user_message(
1228 &self,
1229 content: String,
1230 transcript_content: serde_json::Value,
1231 mode: &str,
1232 ) -> String {
1233 self.queued_transcript_injections
1234 .push_pending_user_message(content, transcript_content, mode)
1235 .await
1236 }
1237
1238 pub async fn push_queued_user_message(&self, content: String, mode: &str) -> String {
1239 self.push_pending_user_message(content.clone(), serde_json::Value::String(content), mode)
1240 .await
1241 }
1242
1243 pub async fn revoke_pending_user_message(
1244 &self,
1245 message_id: &str,
1246 ) -> PendingUserMessageMutationResult {
1247 self.queued_transcript_injections
1248 .revoke_pending_user_message(message_id)
1249 .await
1250 }
1251
1252 pub async fn revoke_pending_reminder(
1253 &self,
1254 reminder_id: &str,
1255 ) -> PendingReminderMutationResult {
1256 self.queued_transcript_injections
1257 .revoke_pending_reminder(reminder_id)
1258 .await
1259 }
1260
1261 pub async fn pending_injections_json(&self) -> serde_json::Value {
1262 self.queued_transcript_injections
1263 .pending_injections_json()
1264 .await
1265 }
1266
1267 pub async fn replace_pending_user_message(
1268 &self,
1269 message_id: &str,
1270 content: String,
1271 transcript_content: serde_json::Value,
1272 ) -> PendingUserMessageMutationResult {
1273 self.queued_transcript_injections
1274 .replace_pending_user_message(message_id, content, transcript_content)
1275 .await
1276 }
1277
1278 pub async fn push_queued_session_remind_from_params(
1279 &self,
1280 params: &serde_json::Value,
1281 ) -> Result<String, String> {
1282 let reminder = queued_session_remind_from_params(params)?;
1283 let reminder_id = reminder.reminder.id.clone();
1284 self.queued_transcript_injections
1285 .push_session_reminder(reminder)
1286 .await;
1287 Ok(reminder_id)
1288 }
1289
1290 pub async fn take_queued_user_messages(
1291 &self,
1292 include_interrupt_immediate: bool,
1293 include_finish_step: bool,
1294 include_audit_only: bool,
1295 ) -> Vec<QueuedUserMessage> {
1296 let mut state = self.queued_transcript_injections.inner.lock().await;
1297 let mut selected = Vec::new();
1298 let mut retained = VecDeque::new();
1299 while let Some(injection) = state.queue.pop_front() {
1300 let should_take = match injection.mode() {
1301 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1302 QueuedUserMessageMode::FinishStep => include_finish_step,
1303 QueuedUserMessageMode::AuditOnly => include_audit_only,
1304 };
1305 match (should_take, injection) {
1306 (true, QueuedTranscriptInjection::User(message)) => {
1307 state
1308 .delivered_user_message_ids
1309 .insert(message.message_id.clone());
1310 selected.push(message);
1311 }
1312 (_, injection) => retained.push_back(injection),
1313 }
1314 }
1315 state.queue = retained;
1316 selected
1317 }
1318
1319 pub async fn take_queued_transcript_injections(
1320 &self,
1321 include_interrupt_immediate: bool,
1322 include_finish_step: bool,
1323 include_audit_only: bool,
1324 ) -> Vec<QueuedTranscriptInjection> {
1325 let mut state = self.queued_transcript_injections.inner.lock().await;
1326 let mut selected = Vec::new();
1327 let mut retained = VecDeque::new();
1328 while let Some(injection) = state.queue.pop_front() {
1329 let should_take = match injection.mode() {
1330 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1331 QueuedUserMessageMode::FinishStep => include_finish_step,
1332 QueuedUserMessageMode::AuditOnly => include_audit_only,
1333 };
1334 if should_take {
1335 match &injection {
1336 QueuedTranscriptInjection::User(message) => {
1337 state
1338 .delivered_user_message_ids
1339 .insert(message.message_id.clone());
1340 }
1341 QueuedTranscriptInjection::Reminder(reminder) => {
1342 state
1343 .delivered_reminder_ids
1344 .insert(reminder.reminder.id.clone());
1345 }
1346 }
1347 selected.push(injection);
1348 } else {
1349 retained.push_back(injection);
1350 }
1351 }
1352 state.queue = retained;
1353 selected
1354 }
1355
1356 pub async fn take_queued_user_messages_for(
1357 &self,
1358 checkpoint: DeliveryCheckpoint,
1359 ) -> Vec<QueuedUserMessage> {
1360 match checkpoint {
1361 DeliveryCheckpoint::InterruptImmediate => {
1362 self.take_queued_user_messages(true, false, false).await
1363 }
1364 DeliveryCheckpoint::AfterCurrentOperation => {
1365 self.take_queued_user_messages(false, true, false).await
1366 }
1367 DeliveryCheckpoint::EndOfInteraction => {
1368 self.take_queued_user_messages(false, false, true).await
1369 }
1370 }
1371 }
1372
1373 pub async fn take_queued_transcript_injections_for(
1374 &self,
1375 checkpoint: DeliveryCheckpoint,
1376 ) -> Vec<QueuedTranscriptInjection> {
1377 match checkpoint {
1378 DeliveryCheckpoint::InterruptImmediate => {
1379 self.take_queued_transcript_injections(true, false, false)
1380 .await
1381 }
1382 DeliveryCheckpoint::AfterCurrentOperation => {
1383 self.take_queued_transcript_injections(false, true, false)
1384 .await
1385 }
1386 DeliveryCheckpoint::EndOfInteraction => {
1387 self.take_queued_transcript_injections(false, false, true)
1388 .await
1389 }
1390 }
1391 }
1392
1393 pub fn send_output(&self, text: &str) {
1395 self.notify("output", serde_json::json!({"text": text}));
1396 }
1397
1398 pub fn send_progress(
1400 &self,
1401 phase: &str,
1402 message: &str,
1403 progress: Option<i64>,
1404 total: Option<i64>,
1405 data: Option<serde_json::Value>,
1406 ) {
1407 let mut payload = serde_json::json!({"phase": phase, "message": message});
1408 if let Some(p) = progress {
1409 payload["progress"] = serde_json::json!(p);
1410 }
1411 if let Some(t) = total {
1412 payload["total"] = serde_json::json!(t);
1413 }
1414 if let Some(d) = data {
1415 payload["data"] = d;
1416 }
1417 self.notify("progress", payload);
1418 }
1419
1420 pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1422 let mut payload = serde_json::json!({"level": level, "message": message});
1423 if let Some(f) = fields {
1424 payload["fields"] = f;
1425 }
1426 self.notify("log", payload);
1427 }
1428
1429 pub fn send_call_start(
1432 &self,
1433 call_id: &str,
1434 call_type: &str,
1435 name: &str,
1436 metadata: serde_json::Value,
1437 ) {
1438 let session_id = self.get_session_id();
1439 let script = self.get_script_name();
1440 let stream_publicly = metadata
1441 .get("stream_publicly")
1442 .and_then(|value| value.as_bool())
1443 .unwrap_or(true);
1444 self.visible_call_streams
1445 .lock()
1446 .unwrap_or_else(|e| e.into_inner())
1447 .insert(call_id.to_string(), stream_publicly);
1448 self.notify(
1449 "session/update",
1450 serde_json::json!({
1451 "sessionId": session_id,
1452 "update": {
1453 "sessionUpdate": "call_start",
1454 "content": {
1455 "toolCallId": call_id,
1456 "call_type": call_type,
1457 "name": name,
1458 "script": script,
1459 "metadata": metadata,
1460 },
1461 },
1462 }),
1463 );
1464 }
1465
1466 pub fn send_call_progress(
1469 &self,
1470 call_id: &str,
1471 delta: &str,
1472 accumulated_tokens: u64,
1473 user_visible: bool,
1474 ) {
1475 let session_id = self.get_session_id();
1476 let (visible_text, visible_delta) = {
1477 let stream_publicly = self
1478 .visible_call_streams
1479 .lock()
1480 .unwrap_or_else(|e| e.into_inner())
1481 .get(call_id)
1482 .copied()
1483 .unwrap_or(true);
1484 let mut states = self
1485 .visible_call_states
1486 .lock()
1487 .unwrap_or_else(|e| e.into_inner());
1488 let state = states.entry(call_id.to_string()).or_default();
1489 state.push(delta, stream_publicly)
1490 };
1491 self.notify(
1492 "session/update",
1493 serde_json::json!({
1494 "sessionId": session_id,
1495 "update": {
1496 "sessionUpdate": "call_progress",
1497 "content": {
1498 "toolCallId": call_id,
1499 "delta": delta,
1500 "accumulated_tokens": accumulated_tokens,
1501 "visible_text": visible_text,
1502 "visible_delta": visible_delta,
1503 "user_visible": user_visible,
1504 },
1505 },
1506 }),
1507 );
1508 }
1509
1510 pub fn send_call_end(
1512 &self,
1513 call_id: &str,
1514 call_type: &str,
1515 name: &str,
1516 duration_ms: u64,
1517 status: &str,
1518 metadata: serde_json::Value,
1519 ) {
1520 let session_id = self.get_session_id();
1521 let script = self.get_script_name();
1522 self.visible_call_states
1523 .lock()
1524 .unwrap_or_else(|e| e.into_inner())
1525 .remove(call_id);
1526 self.visible_call_streams
1527 .lock()
1528 .unwrap_or_else(|e| e.into_inner())
1529 .remove(call_id);
1530 self.notify(
1531 "session/update",
1532 serde_json::json!({
1533 "sessionId": session_id,
1534 "update": {
1535 "sessionUpdate": "call_end",
1536 "content": {
1537 "toolCallId": call_id,
1538 "call_type": call_type,
1539 "name": name,
1540 "script": script,
1541 "duration_ms": duration_ms,
1542 "status": status,
1543 "metadata": metadata,
1544 },
1545 },
1546 }),
1547 );
1548 }
1549
1550 pub fn send_worker_update(
1552 &self,
1553 worker_id: &str,
1554 worker_name: &str,
1555 status: &str,
1556 metadata: serde_json::Value,
1557 audit: Option<&MutationSessionRecord>,
1558 ) {
1559 let session_id = self.get_session_id();
1560 let script = self.get_script_name();
1561 let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1562 let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1563 let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1564 let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1565 let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1566 let lifecycle = serde_json::json!({
1567 "event": status,
1568 "worker_id": worker_id,
1569 "worker_name": worker_name,
1570 "started_at": started_at,
1571 "finished_at": finished_at,
1572 });
1573 self.notify(
1574 "session/update",
1575 serde_json::json!({
1576 "sessionId": session_id,
1577 "update": {
1578 "sessionUpdate": "worker_update",
1579 "content": {
1580 "worker_id": worker_id,
1581 "worker_name": worker_name,
1582 "status": status,
1583 "script": script,
1584 "started_at": started_at,
1585 "finished_at": finished_at,
1586 "snapshot_path": snapshot_path,
1587 "run_id": run_id,
1588 "run_path": run_path,
1589 "lifecycle": lifecycle,
1590 "audit": audit,
1591 "metadata": metadata,
1592 },
1593 },
1594 }),
1595 );
1596 }
1597}
1598
1599pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1601 crate::stdlib::json_to_vm_value(val)
1602}
1603
1604fn parse_host_tools_list_response(
1605 result: serde_json::Value,
1606) -> Result<Vec<serde_json::Value>, VmError> {
1607 let tools = match result {
1608 serde_json::Value::Array(items) => items,
1609 serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1610 map.get("result")
1611 .and_then(|value| value.get("tools"))
1612 .cloned()
1613 }) {
1614 Some(serde_json::Value::Array(items)) => items,
1615 _ => {
1616 return Err(VmError::Runtime(
1617 "host/tools/list: host response must be an array or { tools: [...] }".into(),
1618 ));
1619 }
1620 },
1621 _ => {
1622 return Err(VmError::Runtime(
1623 "host/tools/list: unexpected response shape".into(),
1624 ));
1625 }
1626 };
1627
1628 let mut normalized = Vec::with_capacity(tools.len());
1629 for tool in tools {
1630 let serde_json::Value::Object(map) = tool else {
1631 return Err(VmError::Runtime(
1632 "host/tools/list: every tool must be an object".into(),
1633 ));
1634 };
1635 let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1636 return Err(VmError::Runtime(
1637 "host/tools/list: every tool must include a string `name`".into(),
1638 ));
1639 };
1640 let description = map
1641 .get("description")
1642 .and_then(|value| value.as_str())
1643 .or_else(|| {
1644 map.get("short_description")
1645 .and_then(|value| value.as_str())
1646 })
1647 .unwrap_or_default();
1648 let schema = map
1649 .get("schema")
1650 .cloned()
1651 .or_else(|| map.get("parameters").cloned())
1652 .or_else(|| map.get("input_schema").cloned())
1653 .unwrap_or(serde_json::Value::Null);
1654 let deprecated = map
1655 .get("deprecated")
1656 .and_then(|value| value.as_bool())
1657 .unwrap_or(false);
1658 normalized.push(serde_json::json!({
1659 "name": name,
1660 "description": description,
1661 "schema": schema,
1662 "deprecated": deprecated,
1663 }));
1664 }
1665 Ok(normalized)
1666}
1667
1668#[cfg(test)]
1669mod tests {
1670 use super::*;
1671
1672 fn test_bridge() -> HostBridge {
1673 HostBridge::from_parts(
1674 Arc::new(Mutex::new(HashMap::new())),
1675 Arc::new(AtomicBool::new(false)),
1676 Arc::new(std::sync::Mutex::new(())),
1677 1,
1678 )
1679 }
1680
1681 fn test_bridge_sharing_injection_state(owner: &HostBridge) -> HostBridge {
1682 HostBridge::from_parts_with_writer_cancel_notify_and_injection_state(
1683 Arc::new(Mutex::new(HashMap::new())),
1684 Arc::new(AtomicBool::new(false)),
1685 Arc::new(Notify::new()),
1686 Arc::new(|_| Ok(())),
1687 100,
1688 Some(owner.injection_state()),
1689 )
1690 }
1691
1692 #[test]
1693 fn test_json_rpc_request_format() {
1694 let request = crate::jsonrpc::request(
1695 1,
1696 "llm_call",
1697 serde_json::json!({
1698 "prompt": "Hello",
1699 "system": "Be helpful",
1700 }),
1701 );
1702 let s = serde_json::to_string(&request).unwrap();
1703 assert!(s.contains("\"jsonrpc\":\"2.0\""));
1704 assert!(s.contains("\"id\":1"));
1705 assert!(s.contains("\"method\":\"llm_call\""));
1706 }
1707
1708 #[test]
1709 fn test_json_rpc_notification_format() {
1710 let notification =
1711 crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
1712 let s = serde_json::to_string(¬ification).unwrap();
1713 assert!(s.contains("\"method\":\"output\""));
1714 assert!(!s.contains("\"id\""));
1715 }
1716
1717 #[test]
1718 fn test_json_rpc_error_response_parsing() {
1719 let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
1720 assert!(response.get("error").is_some());
1721 assert_eq!(
1722 response["error"]["message"].as_str().unwrap(),
1723 "Invalid request"
1724 );
1725 }
1726
1727 #[test]
1728 fn test_json_rpc_success_response_parsing() {
1729 let response = crate::jsonrpc::response(
1730 1,
1731 serde_json::json!({
1732 "text": "Hello world",
1733 "input_tokens": 10,
1734 "output_tokens": 5,
1735 }),
1736 );
1737 assert!(response.get("result").is_some());
1738 assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
1739 }
1740
1741 #[test]
1742 fn test_cancelled_flag() {
1743 let cancelled = Arc::new(AtomicBool::new(false));
1744 assert!(!cancelled.load(Ordering::SeqCst));
1745 cancelled.store(true, Ordering::SeqCst);
1746 assert!(cancelled.load(Ordering::SeqCst));
1747 }
1748
1749 #[test]
1750 fn pending_host_calls_return_when_cancellation_arrives() {
1751 let runtime = tokio::runtime::Builder::new_current_thread()
1752 .enable_all()
1753 .build()
1754 .unwrap();
1755 runtime.block_on(async {
1756 let pending = Arc::new(Mutex::new(HashMap::new()));
1757 let cancelled = Arc::new(AtomicBool::new(false));
1758 let bridge = HostBridge::from_parts_with_writer(
1759 pending.clone(),
1760 cancelled.clone(),
1761 Arc::new(|_| Ok(())),
1762 1,
1763 );
1764
1765 let call = bridge.call("host/work", serde_json::json!({}));
1766 tokio::pin!(call);
1767
1768 loop {
1769 tokio::select! {
1770 result = &mut call => panic!("call completed before cancellation: {result:?}"),
1771 _ = tokio::task::yield_now() => {}
1772 }
1773 if !pending.lock().await.is_empty() {
1774 break;
1775 }
1776 }
1777
1778 cancelled.store(true, Ordering::SeqCst);
1779 bridge.cancel_notify.notify_waiters();
1780
1781 let result = tokio::time::timeout(Duration::from_secs(1), call)
1782 .await
1783 .expect("pending call should observe cancellation promptly");
1784 assert!(
1785 matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
1786 );
1787 assert!(pending.lock().await.is_empty());
1788 });
1789 }
1790
1791 #[test]
1792 fn queued_messages_are_filtered_by_delivery_mode() {
1793 let runtime = tokio::runtime::Builder::new_current_thread()
1794 .enable_all()
1795 .build()
1796 .unwrap();
1797 runtime.block_on(async {
1798 let bridge = test_bridge();
1799 bridge
1800 .push_queued_user_message("first".to_string(), "finish_step")
1801 .await;
1802 bridge
1803 .push_queued_user_message("second".to_string(), "audit_only")
1804 .await;
1805
1806 let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1807 assert_eq!(finish_step.len(), 1);
1808 assert_eq!(finish_step[0].content, "first");
1809
1810 let audit_only = bridge.take_queued_user_messages(false, false, true).await;
1811 assert_eq!(audit_only.len(), 1);
1812 assert_eq!(audit_only[0].content, "second");
1813 });
1814 }
1815
1816 #[test]
1817 fn pending_user_messages_support_revoke_replace_and_delivery_states() {
1818 let runtime = tokio::runtime::Builder::new_current_thread()
1819 .enable_all()
1820 .build()
1821 .unwrap();
1822 runtime.block_on(async {
1823 let bridge = test_bridge();
1824 let first_id = bridge
1825 .push_pending_user_message(
1826 "first".to_string(),
1827 serde_json::json!("first"),
1828 "audit_only",
1829 )
1830 .await;
1831 let second_id = bridge
1832 .push_pending_user_message(
1833 "second".to_string(),
1834 serde_json::json!("second"),
1835 "audit_only",
1836 )
1837 .await;
1838
1839 assert_eq!(
1840 bridge
1841 .replace_pending_user_message(
1842 &second_id,
1843 "second edited".to_string(),
1844 serde_json::json!("second edited"),
1845 )
1846 .await,
1847 PendingUserMessageMutationResult::Mutated
1848 );
1849 assert_eq!(
1850 bridge.revoke_pending_user_message(&first_id).await,
1851 PendingUserMessageMutationResult::Mutated
1852 );
1853 assert_eq!(
1854 bridge.revoke_pending_user_message(&first_id).await,
1855 PendingUserMessageMutationResult::AlreadyRevoked
1856 );
1857
1858 let delivered = bridge
1859 .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1860 .await;
1861 assert_eq!(delivered.len(), 1);
1862 assert_eq!(delivered[0].message_id, second_id);
1863 assert_eq!(delivered[0].content, "second edited");
1864
1865 assert_eq!(
1866 bridge.revoke_pending_user_message(&second_id).await,
1867 PendingUserMessageMutationResult::AlreadyDelivered
1868 );
1869 assert_eq!(
1870 bridge
1871 .replace_pending_user_message(
1872 &second_id,
1873 "too late".to_string(),
1874 serde_json::json!("too late"),
1875 )
1876 .await,
1877 PendingUserMessageMutationResult::AlreadyDelivered
1878 );
1879 assert_eq!(
1880 bridge.revoke_pending_user_message("missing").await,
1881 PendingUserMessageMutationResult::UnknownMessageId
1882 );
1883 });
1884 }
1885
1886 #[test]
1887 fn pending_user_message_replace_preserves_fifo_position_and_mode() {
1888 let runtime = tokio::runtime::Builder::new_current_thread()
1889 .enable_all()
1890 .build()
1891 .unwrap();
1892 runtime.block_on(async {
1893 let bridge = test_bridge();
1894 let first_id = bridge
1895 .push_pending_user_message(
1896 "first".to_string(),
1897 serde_json::json!("first"),
1898 "finish_step",
1899 )
1900 .await;
1901 let second_id = bridge
1902 .push_pending_user_message(
1903 "second".to_string(),
1904 serde_json::json!("second"),
1905 "finish_step",
1906 )
1907 .await;
1908 assert_eq!(
1909 bridge
1910 .replace_pending_user_message(
1911 &first_id,
1912 "first edited".to_string(),
1913 serde_json::json!("first edited"),
1914 )
1915 .await,
1916 PendingUserMessageMutationResult::Mutated
1917 );
1918
1919 let delivered = bridge
1920 .take_queued_user_messages_for(DeliveryCheckpoint::AfterCurrentOperation)
1921 .await;
1922 assert_eq!(
1923 delivered
1924 .iter()
1925 .map(|message| (&message.message_id, message.content.as_str(), message.mode))
1926 .collect::<Vec<_>>(),
1927 vec![
1928 (&first_id, "first edited", QueuedUserMessageMode::FinishStep,),
1929 (&second_id, "second", QueuedUserMessageMode::FinishStep),
1930 ]
1931 );
1932 });
1933 }
1934
1935 #[test]
1936 fn pending_user_message_state_survives_bridge_replacement() {
1937 let runtime = tokio::runtime::Builder::new_current_thread()
1938 .enable_all()
1939 .build()
1940 .unwrap();
1941 runtime.block_on(async {
1942 let bridge = test_bridge();
1943 let revoked_id = bridge
1944 .push_pending_user_message(
1945 "revoke me".to_string(),
1946 serde_json::json!("revoke me"),
1947 "audit_only",
1948 )
1949 .await;
1950 let delivered_id = bridge
1951 .push_pending_user_message(
1952 "deliver me".to_string(),
1953 serde_json::json!("deliver me"),
1954 "audit_only",
1955 )
1956 .await;
1957 assert_eq!(
1958 bridge.revoke_pending_user_message(&revoked_id).await,
1959 PendingUserMessageMutationResult::Mutated
1960 );
1961 bridge.cancelled.store(true, Ordering::SeqCst);
1962
1963 let replacement_bridge = test_bridge_sharing_injection_state(&bridge);
1964 assert_eq!(
1965 replacement_bridge
1966 .revoke_pending_user_message(&revoked_id)
1967 .await,
1968 PendingUserMessageMutationResult::AlreadyRevoked
1969 );
1970 let delivered = replacement_bridge
1971 .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1972 .await;
1973 assert_eq!(delivered.len(), 1);
1974 assert_eq!(delivered[0].message_id, delivered_id);
1975 assert_eq!(delivered[0].content, "deliver me");
1976 assert_eq!(
1977 bridge.revoke_pending_user_message(&delivered_id).await,
1978 PendingUserMessageMutationResult::AlreadyDelivered
1979 );
1980 });
1981 }
1982
1983 #[test]
1984 fn queued_transcript_injections_preserve_user_reminder_separation() {
1985 let runtime = tokio::runtime::Builder::new_current_thread()
1986 .enable_all()
1987 .build()
1988 .unwrap();
1989 runtime.block_on(async {
1990 let bridge = test_bridge();
1991 bridge
1992 .push_queued_user_message("human follow-up".to_string(), "finish_step")
1993 .await;
1994 let reminder_id = bridge
1995 .push_queued_session_remind_from_params(&serde_json::json!({
1996 "body": "Host-provided ambient context.",
1997 "tags": ["host"],
1998 "dedupe_key": "host-context",
1999 "ttl_turns": 2,
2000 "mode": "audit_only",
2001 "_meta": {"harn": {"source": "test"}},
2002 }))
2003 .await
2004 .expect("valid reminder");
2005
2006 let finish_step = bridge.take_queued_user_messages(false, true, false).await;
2007 assert_eq!(finish_step.len(), 1);
2008 assert_eq!(finish_step[0].content, "human follow-up");
2009
2010 let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
2011 assert!(no_user_messages.is_empty());
2012
2013 let injections = bridge
2014 .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
2015 .await;
2016 assert_eq!(injections.len(), 1);
2017 let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
2018 panic!("expected queued reminder");
2019 };
2020 assert_eq!(reminder.reminder.id, reminder_id);
2021 assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
2022 assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
2023 assert_eq!(
2024 reminder.reminder.dedupe_key.as_deref(),
2025 Some("host-context")
2026 );
2027 assert_eq!(reminder.reminder.ttl_turns, Some(2));
2028 assert_eq!(
2029 reminder.reminder.source,
2030 crate::llm::helpers::ReminderSource::Bridge
2031 );
2032 });
2033 }
2034
2035 #[test]
2036 fn pending_injections_list_user_messages_and_reminders_in_fifo_order() {
2037 let runtime = tokio::runtime::Builder::new_current_thread()
2038 .enable_all()
2039 .build()
2040 .unwrap();
2041 runtime.block_on(async {
2042 let bridge = test_bridge();
2043 let message_id = bridge
2044 .push_pending_user_message(
2045 "human follow-up".to_string(),
2046 serde_json::json!([{"type": "text", "text": "human follow-up"}]),
2047 "finish_step",
2048 )
2049 .await;
2050 let reminder_id = bridge
2051 .push_queued_session_remind_from_params(&serde_json::json!({
2052 "id": "rem-test",
2053 "body": "Host reminder",
2054 "tags": ["host"],
2055 "dedupe_key": "host-reminder",
2056 "ttl_turns": 2,
2057 "mode": "interrupt_immediate",
2058 }))
2059 .await
2060 .expect("valid session/remind payload");
2061
2062 let pending = bridge.pending_injections_json().await;
2063 assert_eq!(pending["pendingCount"], 2);
2064 assert_eq!(pending["injections"][0]["kind"], "user");
2065 assert_eq!(pending["injections"][0]["id"], message_id);
2066 assert_eq!(pending["injections"][0]["messageId"], message_id);
2067 assert_eq!(pending["injections"][0]["mode"], "finish_step");
2068 assert_eq!(pending["injections"][0]["position"], 0);
2069 assert_eq!(pending["injections"][1]["kind"], "reminder");
2070 assert_eq!(pending["injections"][1]["id"], reminder_id);
2071 assert_eq!(pending["injections"][1]["reminderId"], "rem-test");
2072 assert_eq!(pending["injections"][1]["mode"], "interrupt_immediate");
2073 assert_eq!(pending["injections"][1]["body"], "Host reminder");
2074 assert_eq!(pending["injections"][1]["dedupeKey"], "host-reminder");
2075 assert_eq!(pending["injections"][1]["ttlTurns"], 2);
2076 assert_eq!(pending["injections"][1]["position"], 1);
2077 });
2078 }
2079
2080 #[test]
2081 fn pending_reminders_support_revoke_and_delivery_states() {
2082 let runtime = tokio::runtime::Builder::new_current_thread()
2083 .enable_all()
2084 .build()
2085 .unwrap();
2086 runtime.block_on(async {
2087 let bridge = test_bridge();
2088 let revoked_id = bridge
2089 .push_queued_session_remind_from_params(&serde_json::json!({
2090 "id": "rem-revoke",
2091 "body": "remove me",
2092 "mode": "finish_step",
2093 }))
2094 .await
2095 .expect("valid session/remind payload");
2096 let delivered_id = bridge
2097 .push_queued_session_remind_from_params(&serde_json::json!({
2098 "id": "rem-deliver",
2099 "body": "deliver me",
2100 "mode": "finish_step",
2101 }))
2102 .await
2103 .expect("valid session/remind payload");
2104
2105 assert_eq!(
2106 bridge.revoke_pending_reminder(&revoked_id).await,
2107 PendingReminderMutationResult::Mutated
2108 );
2109 assert_eq!(
2110 bridge.revoke_pending_reminder(&revoked_id).await,
2111 PendingReminderMutationResult::AlreadyRevoked
2112 );
2113
2114 let pending = bridge.pending_injections_json().await;
2115 assert_eq!(pending["pendingCount"], 1);
2116 assert_eq!(pending["injections"][0]["reminderId"], delivered_id);
2117
2118 let delivered = bridge
2119 .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
2120 .await;
2121 assert_eq!(delivered.len(), 1);
2122 let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2123 panic!("expected delivered reminder");
2124 };
2125 assert_eq!(reminder.reminder.id, delivered_id);
2126
2127 assert_eq!(
2128 bridge.revoke_pending_reminder(&delivered_id).await,
2129 PendingReminderMutationResult::AlreadyDelivered
2130 );
2131 assert_eq!(
2132 bridge.revoke_pending_reminder("missing").await,
2133 PendingReminderMutationResult::UnknownReminderId
2134 );
2135 });
2136 }
2137
2138 #[test]
2139 fn bridge_remind_modes_honor_delivery_checkpoints() {
2140 let runtime = tokio::runtime::Builder::new_current_thread()
2141 .enable_all()
2142 .build()
2143 .unwrap();
2144 runtime.block_on(async {
2145 let cases = [
2146 (
2147 "interrupt_immediate",
2148 DeliveryCheckpoint::InterruptImmediate,
2149 DeliveryCheckpoint::AfterCurrentOperation,
2150 ),
2151 (
2152 "finish_step",
2153 DeliveryCheckpoint::AfterCurrentOperation,
2154 DeliveryCheckpoint::EndOfInteraction,
2155 ),
2156 (
2157 "audit_only",
2158 DeliveryCheckpoint::EndOfInteraction,
2159 DeliveryCheckpoint::InterruptImmediate,
2160 ),
2161 ];
2162
2163 for (mode, expected_checkpoint, wrong_checkpoint) in cases {
2164 let bridge = test_bridge();
2165 bridge
2166 .push_queued_session_remind_from_params(&serde_json::json!({
2167 "body": format!("Reminder for {mode}"),
2168 "mode": mode,
2169 }))
2170 .await
2171 .expect("valid session/remind payload");
2172
2173 let premature = bridge
2174 .take_queued_transcript_injections_for(wrong_checkpoint)
2175 .await;
2176 assert!(
2177 premature.is_empty(),
2178 "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
2179 );
2180
2181 let delivered = bridge
2182 .take_queued_transcript_injections_for(expected_checkpoint)
2183 .await;
2184 assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
2185 let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2186 panic!("expected reminder for {mode}");
2187 };
2188 assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
2189 }
2190 });
2191 }
2192
2193 #[test]
2194 fn session_remind_validation_rejects_user_message_shape() {
2195 let err = queued_session_remind_from_params(&serde_json::json!({
2196 "content": "this is still a user message",
2197 "mode": "interrupt_immediate",
2198 }))
2199 .expect_err("session/remind must require a reminder body");
2200 assert!(err.contains(Code::ReminderInvalidShape.as_str()));
2201 assert!(err.contains("body"));
2202 }
2203
2204 #[test]
2205 fn session_remind_validation_rejects_unknown_options_separately() {
2206 let err = queued_session_remind_from_params(&serde_json::json!({
2207 "body": "valid body",
2208 "unknown_host_field": true,
2209 }))
2210 .expect_err("session/remind must reject unknown top-level fields");
2211 assert!(err.contains(Code::ReminderUnknownOption.as_str()));
2212 assert!(err.contains("unknown_host_field"));
2213 }
2214
2215 #[test]
2216 fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
2217 let err = queued_session_remind_from_params(&serde_json::json!({
2218 "body": "valid body",
2219 "propagate": "workspace",
2220 }))
2221 .expect_err("session/remind must reject unknown propagate values");
2222 assert!(err.contains(Code::ReminderUnknownPropagate.as_str()));
2223 assert!(err.contains("propagate"));
2224 }
2225
2226 #[test]
2227 fn test_json_result_to_vm_value_string() {
2228 let val = serde_json::json!("hello");
2229 let vm_val = json_result_to_vm_value(&val);
2230 assert_eq!(vm_val.display(), "hello");
2231 }
2232
2233 #[test]
2234 fn test_json_result_to_vm_value_dict() {
2235 let val = serde_json::json!({"name": "test", "count": 42});
2236 let vm_val = json_result_to_vm_value(&val);
2237 let VmValue::Dict(d) = &vm_val else {
2238 unreachable!("Expected Dict, got {:?}", vm_val);
2239 };
2240 assert_eq!(d.get("name").unwrap().display(), "test");
2241 assert_eq!(d.get("count").unwrap().display(), "42");
2242 }
2243
2244 #[test]
2245 fn test_json_result_to_vm_value_null() {
2246 let val = serde_json::json!(null);
2247 let vm_val = json_result_to_vm_value(&val);
2248 assert!(matches!(vm_val, VmValue::Nil));
2249 }
2250
2251 #[test]
2252 fn test_json_result_to_vm_value_nested() {
2253 let val = serde_json::json!({
2254 "text": "response",
2255 "tool_calls": [
2256 {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
2257 ],
2258 "input_tokens": 100,
2259 "output_tokens": 50,
2260 });
2261 let vm_val = json_result_to_vm_value(&val);
2262 let VmValue::Dict(d) = &vm_val else {
2263 unreachable!("Expected Dict, got {:?}", vm_val);
2264 };
2265 assert_eq!(d.get("text").unwrap().display(), "response");
2266 let VmValue::List(list) = d.get("tool_calls").unwrap() else {
2267 unreachable!("Expected List for tool_calls");
2268 };
2269 assert_eq!(list.len(), 1);
2270 }
2271
2272 #[test]
2273 fn parse_host_tools_list_accepts_object_wrapper() {
2274 let tools = parse_host_tools_list_response(serde_json::json!({
2275 "tools": [
2276 {
2277 "name": "Read",
2278 "description": "Read a file",
2279 "schema": {"type": "object"},
2280 }
2281 ]
2282 }))
2283 .expect("tool list");
2284
2285 assert_eq!(tools.len(), 1);
2286 assert_eq!(tools[0]["name"], "Read");
2287 assert_eq!(tools[0]["deprecated"], false);
2288 }
2289
2290 #[test]
2291 fn parse_host_tools_list_accepts_compat_fields() {
2292 let tools = parse_host_tools_list_response(serde_json::json!({
2293 "result": {
2294 "tools": [
2295 {
2296 "name": "Edit",
2297 "short_description": "Apply an edit",
2298 "input_schema": {"type": "object"},
2299 "deprecated": true,
2300 }
2301 ]
2302 }
2303 }))
2304 .expect("tool list");
2305
2306 assert_eq!(tools[0]["description"], "Apply an edit");
2307 assert_eq!(tools[0]["schema"]["type"], "object");
2308 assert_eq!(tools[0]["deprecated"], true);
2309 }
2310
2311 #[test]
2312 fn parse_host_tools_list_requires_tool_names() {
2313 let err = parse_host_tools_list_response(serde_json::json!({
2314 "tools": [
2315 {"description": "missing name"}
2316 ]
2317 }))
2318 .expect_err("expected error");
2319 assert!(err
2320 .to_string()
2321 .contains("host/tools/list: every tool must include a string `name`"));
2322 }
2323
2324 #[test]
2325 fn test_timeout_duration() {
2326 assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
2327 }
2328}