1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::future::Future;
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::rc::Rc;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::Duration;
16
17use tokio::io::AsyncBufReadExt;
18use tokio::sync::{oneshot, Mutex, Notify};
19
20use harn_parser::diagnostic_codes::Code;
21
22use crate::orchestration::MutationSessionRecord;
23use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
24use crate::visible_text::VisibleTextState;
25use crate::vm::Vm;
26
27const DEFAULT_TIMEOUT: Duration = Duration::from_mins(5);
29
30pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
31
32fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
33 Arc::new(move |line: &str| {
34 let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
35 let mut stdout = std::io::stdout().lock();
36 stdout
37 .write_all(line.as_bytes())
38 .map_err(|e| format!("Bridge write error: {e}"))?;
39 stdout
40 .write_all(b"\n")
41 .map_err(|e| format!("Bridge write error: {e}"))?;
42 stdout
43 .flush()
44 .map_err(|e| format!("Bridge flush error: {e}"))?;
45 Ok(())
46 })
47}
48
49pub struct HostBridge {
56 next_id: AtomicU64,
57 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
59 cancelled: Arc<AtomicBool>,
61 cancel_notify: Arc<Notify>,
63 writer: HostBridgeWriter,
65 session_id: std::sync::Mutex<String>,
67 script_name: std::sync::Mutex<String>,
69 queued_transcript_injections: HostBridgeInjectionState,
71 resume_requested: Arc<AtomicBool>,
73 skills_reload_requested: Arc<AtomicBool>,
78 daemon_idle: Arc<AtomicBool>,
80 prompt_stop_reason: std::sync::Mutex<Option<String>>,
86 visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
88 visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
90 in_process: Option<InProcessHost>,
92}
93
94struct InProcessHost {
95 module_path: PathBuf,
96 exported_functions: BTreeMap<String, Rc<VmClosure>>,
97 vm: Vm,
98}
99
100impl InProcessHost {
101 fn dispatch<'a>(
107 &'a self,
108 method: &'a str,
109 params: serde_json::Value,
110 ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, VmError>> + 'a>> {
111 Box::pin(async move {
112 match method {
113 "builtin_call" => {
114 let name = params
115 .get("name")
116 .and_then(|value| value.as_str())
117 .unwrap_or_default();
118 let args = params
119 .get("args")
120 .and_then(|value| value.as_array())
121 .cloned()
122 .unwrap_or_default()
123 .into_iter()
124 .map(|value| json_result_to_vm_value(&value))
125 .collect::<Vec<_>>();
126 self.invoke_export(name, &args).await
127 }
128 "host/tools/list" => self
129 .invoke_optional_export("host_tools_list", &[])
130 .await
131 .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
132 "session/request_permission" => self.request_permission(params).await,
133 other => Err(VmError::Runtime(format!(
134 "playground host backend does not implement bridge method '{other}'"
135 ))),
136 }
137 })
138 }
139
140 async fn invoke_export(
141 &self,
142 name: &str,
143 args: &[VmValue],
144 ) -> Result<serde_json::Value, VmError> {
145 let Some(closure) = self.exported_functions.get(name) else {
146 return Err(VmError::Runtime(format!(
147 "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
148 self.module_path.display()
149 )));
150 };
151
152 let mut vm = self.vm.child_vm_for_host();
153 let result = vm.call_closure_pub(closure, args).await?;
154 Ok(crate::llm::vm_value_to_json(&result))
155 }
156
157 async fn invoke_optional_export(
158 &self,
159 name: &str,
160 args: &[VmValue],
161 ) -> Result<Option<serde_json::Value>, VmError> {
162 if !self.exported_functions.contains_key(name) {
163 return Ok(None);
164 }
165 self.invoke_export(name, args).await.map(Some)
166 }
167
168 async fn request_permission(
169 &self,
170 params: serde_json::Value,
171 ) -> Result<serde_json::Value, VmError> {
172 let Some(closure) = self.exported_functions.get("request_permission") else {
175 return Ok(canonical_allow_response());
176 };
177
178 let tool_call = params.get("toolCall");
179 let tool_name = tool_call
180 .and_then(|tool_call| tool_call.pointer("/_meta/harn/toolName"))
181 .or_else(|| tool_call.and_then(|tool_call| tool_call.get("toolName")))
182 .or_else(|| tool_call.and_then(|tool_call| tool_call.get("title")))
183 .and_then(|value| value.as_str())
184 .unwrap_or_default();
185 let tool_args = tool_call
186 .and_then(|tool_call| tool_call.get("rawInput"))
187 .map(json_result_to_vm_value)
188 .unwrap_or(VmValue::Nil);
189 let full_payload = json_result_to_vm_value(¶ms);
190
191 let arg_count = closure.func.params.len();
192 let args = if arg_count >= 3 {
193 vec![
194 VmValue::String(Rc::from(tool_name.to_string())),
195 tool_args,
196 full_payload,
197 ]
198 } else if arg_count == 2 {
199 vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
200 } else if arg_count == 1 {
201 vec![full_payload]
202 } else {
203 Vec::new()
204 };
205
206 let mut vm = self.vm.child_vm_for_host();
207 let result = vm.call_closure_pub(closure, &args).await?;
208 let payload = match result {
213 VmValue::Bool(granted) => {
214 if granted {
215 canonical_allow_response()
216 } else {
217 canonical_reject_response(None)
218 }
219 }
220 VmValue::String(reason) if !reason.is_empty() => {
221 canonical_reject_response(Some(reason.to_string()))
222 }
223 other => {
224 let json = crate::llm::vm_value_to_json(&other);
225 if let Some(granted) = json.get("granted").and_then(|value| value.as_bool()) {
226 if granted {
227 canonical_allow_response()
228 } else {
229 canonical_reject_response(
230 json.get("reason")
231 .and_then(|value| value.as_str())
232 .map(str::to_string),
233 )
234 }
235 } else if json.get("outcome").is_some() {
236 json
238 } else if other.is_truthy() {
239 canonical_allow_response()
240 } else {
241 canonical_reject_response(None)
242 }
243 }
244 };
245 Ok(payload)
246 }
247}
248
249fn canonical_allow_response() -> serde_json::Value {
252 serde_json::json!({
253 "outcome": { "outcome": "selected", "optionId": "allow" }
254 })
255}
256
257fn canonical_reject_response(reason: Option<String>) -> serde_json::Value {
261 let mut response = serde_json::json!({
262 "outcome": { "outcome": "selected", "optionId": "reject" }
263 });
264 if let Some(reason) = reason {
265 response
266 .as_object_mut()
267 .expect("object")
268 .insert("reason".to_string(), serde_json::Value::String(reason));
269 }
270 response
271}
272
273#[derive(Clone, Copy, Debug, PartialEq, Eq)]
283pub enum QueuedUserMessageMode {
284 InterruptImmediate,
285 FinishStep,
286 AuditOnly,
287}
288
289#[derive(Clone, Copy, Debug, PartialEq, Eq)]
290pub enum DeliveryCheckpoint {
291 InterruptImmediate,
292 AfterCurrentOperation,
293 EndOfInteraction,
294}
295
296impl QueuedUserMessageMode {
297 fn from_str(value: &str) -> Self {
298 match value {
299 "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
300 "finish_step" | "after_current_operation" => Self::FinishStep,
301 _ => Self::AuditOnly,
306 }
307 }
308
309 fn as_str(self) -> &'static str {
310 match self {
311 Self::InterruptImmediate => "interrupt_immediate",
312 Self::FinishStep => "finish_step",
313 Self::AuditOnly => "audit_only",
314 }
315 }
316}
317
318#[derive(Clone, Debug, PartialEq, Eq)]
319pub struct QueuedUserMessage {
320 pub message_id: String,
321 pub content: String,
322 pub transcript_content: serde_json::Value,
323 pub mode: QueuedUserMessageMode,
324}
325
326#[derive(Clone, Debug, PartialEq, Eq)]
327pub struct QueuedReminder {
328 pub reminder: crate::llm::helpers::SystemReminder,
329 pub mode: QueuedUserMessageMode,
330}
331
332#[derive(Clone, Debug, PartialEq, Eq)]
333pub enum QueuedTranscriptInjection {
334 User(QueuedUserMessage),
335 Reminder(QueuedReminder),
336}
337
338#[derive(Debug, Default)]
339struct QueuedTranscriptInjections {
340 queue: VecDeque<QueuedTranscriptInjection>,
341 revoked_user_message_ids: HashSet<String>,
342 delivered_user_message_ids: HashSet<String>,
343 revoked_reminder_ids: HashSet<String>,
344 delivered_reminder_ids: HashSet<String>,
345}
346
347#[derive(Clone, Debug, Default)]
348pub struct HostBridgeInjectionState {
349 inner: Arc<Mutex<QueuedTranscriptInjections>>,
350}
351
352#[derive(Clone, Copy, Debug, PartialEq, Eq)]
353pub enum PendingUserMessageMutationResult {
354 Mutated,
355 AlreadyRevoked,
356 AlreadyDelivered,
357 UnknownMessageId,
358}
359
360impl QueuedTranscriptInjection {
361 fn mode(&self) -> QueuedUserMessageMode {
362 match self {
363 Self::User(message) => message.mode,
364 Self::Reminder(reminder) => reminder.mode,
365 }
366 }
367
368 fn pending_json(&self, position: usize) -> serde_json::Value {
369 match self {
370 Self::User(message) => serde_json::json!({
371 "kind": "user",
372 "id": message.message_id,
373 "messageId": message.message_id,
374 "mode": message.mode.as_str(),
375 "position": position,
376 "content": message.transcript_content,
377 }),
378 Self::Reminder(reminder) => serde_json::json!({
379 "kind": "reminder",
380 "id": reminder.reminder.id,
381 "reminderId": reminder.reminder.id,
382 "mode": reminder.mode.as_str(),
383 "position": position,
384 "body": reminder.reminder.body,
385 "tags": reminder.reminder.tags,
386 "dedupeKey": reminder.reminder.dedupe_key,
387 "ttlTurns": reminder.reminder.ttl_turns,
388 "preserveOnCompact": reminder.reminder.preserve_on_compact,
389 "propagate": reminder.reminder.propagate.as_str(),
390 "roleHint": reminder.reminder.role_hint.as_str(),
391 "source": reminder.reminder.source.as_str(),
392 "firedAtTurn": reminder.reminder.fired_at_turn,
393 "originatingAgentId": reminder.reminder.originating_agent_id,
394 }),
395 }
396 }
397}
398
399#[derive(Clone, Copy, Debug, PartialEq, Eq)]
400pub enum PendingReminderMutationResult {
401 Mutated,
402 AlreadyRevoked,
403 AlreadyDelivered,
404 UnknownReminderId,
405}
406
407fn new_inject_message_id() -> String {
408 format!("msg_inj_{}", uuid::Uuid::now_v7().simple())
409}
410
411impl HostBridgeInjectionState {
412 pub fn new() -> Self {
413 Self::default()
414 }
415
416 pub async fn push_pending_user_message(
417 &self,
418 content: String,
419 transcript_content: serde_json::Value,
420 mode: &str,
421 ) -> String {
422 let message_id = new_inject_message_id();
423 self.inner
424 .lock()
425 .await
426 .queue
427 .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
428 message_id: message_id.clone(),
429 content,
430 transcript_content,
431 mode: QueuedUserMessageMode::from_str(mode),
432 }));
433 message_id
434 }
435
436 pub async fn revoke_pending_user_message(
437 &self,
438 message_id: &str,
439 ) -> PendingUserMessageMutationResult {
440 let mut state = self.inner.lock().await;
441 let mut retained = VecDeque::new();
442 let mut revoked = false;
443 while let Some(injection) = state.queue.pop_front() {
444 match &injection {
445 QueuedTranscriptInjection::User(message) if message.message_id == message_id => {
446 revoked = true;
447 }
448 _ => retained.push_back(injection),
449 }
450 }
451 state.queue = retained;
452 if revoked {
453 state
454 .revoked_user_message_ids
455 .insert(message_id.to_string());
456 return PendingUserMessageMutationResult::Mutated;
457 }
458 if state.revoked_user_message_ids.contains(message_id) {
459 PendingUserMessageMutationResult::AlreadyRevoked
460 } else if state.delivered_user_message_ids.contains(message_id) {
461 PendingUserMessageMutationResult::AlreadyDelivered
462 } else {
463 PendingUserMessageMutationResult::UnknownMessageId
464 }
465 }
466
467 pub async fn revoke_pending_reminder(
468 &self,
469 reminder_id: &str,
470 ) -> PendingReminderMutationResult {
471 let mut state = self.inner.lock().await;
472 let mut retained = VecDeque::new();
473 let mut revoked = false;
474 while let Some(injection) = state.queue.pop_front() {
475 match &injection {
476 QueuedTranscriptInjection::Reminder(reminder)
477 if reminder.reminder.id == reminder_id =>
478 {
479 revoked = true;
480 }
481 _ => retained.push_back(injection),
482 }
483 }
484 state.queue = retained;
485 if revoked {
486 state.revoked_reminder_ids.insert(reminder_id.to_string());
487 return PendingReminderMutationResult::Mutated;
488 }
489 if state.revoked_reminder_ids.contains(reminder_id) {
490 PendingReminderMutationResult::AlreadyRevoked
491 } else if state.delivered_reminder_ids.contains(reminder_id) {
492 PendingReminderMutationResult::AlreadyDelivered
493 } else {
494 PendingReminderMutationResult::UnknownReminderId
495 }
496 }
497
498 pub async fn replace_pending_user_message(
499 &self,
500 message_id: &str,
501 content: String,
502 transcript_content: serde_json::Value,
503 ) -> PendingUserMessageMutationResult {
504 let mut state = self.inner.lock().await;
505 for injection in &mut state.queue {
506 if let QueuedTranscriptInjection::User(message) = injection {
507 if message.message_id == message_id {
508 message.content = content;
509 message.transcript_content = transcript_content;
510 return PendingUserMessageMutationResult::Mutated;
511 }
512 }
513 }
514 if state.revoked_user_message_ids.contains(message_id) {
515 PendingUserMessageMutationResult::AlreadyRevoked
516 } else if state.delivered_user_message_ids.contains(message_id) {
517 PendingUserMessageMutationResult::AlreadyDelivered
518 } else {
519 PendingUserMessageMutationResult::UnknownMessageId
520 }
521 }
522
523 async fn push_session_reminder(&self, reminder: QueuedReminder) {
524 self.inner
525 .lock()
526 .await
527 .queue
528 .push_back(QueuedTranscriptInjection::Reminder(reminder));
529 }
530
531 pub async fn pending_injections_json(&self) -> serde_json::Value {
532 let state = self.inner.lock().await;
533 let injections = state
534 .queue
535 .iter()
536 .enumerate()
537 .map(|(position, injection)| injection.pending_json(position))
538 .collect::<Vec<_>>();
539 serde_json::json!({
540 "pendingCount": injections.len(),
541 "injections": injections,
542 })
543 }
544}
545
546fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
547 format!(
548 "{}: {}",
549 Code::ReminderUnknownOption.as_str(),
550 message.as_ref()
551 )
552}
553
554fn session_remind_shape_error(message: impl AsRef<str>) -> String {
555 format!(
556 "{}: {}",
557 Code::ReminderInvalidShape.as_str(),
558 message.as_ref()
559 )
560}
561
562fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
563 format!(
564 "{}: {}",
565 Code::ReminderUnknownPropagate.as_str(),
566 message.as_ref()
567 )
568}
569
570fn string_field(
571 map: &serde_json::Map<String, serde_json::Value>,
572 key: &str,
573 required: bool,
574) -> Result<Option<String>, String> {
575 match map.get(key) {
576 None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
577 format!("`{key}` must be a non-empty string"),
578 )),
579 None | Some(serde_json::Value::Null) => Ok(None),
580 Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
581 session_remind_shape_error(format!("`{key}` must be a non-empty string")),
582 ),
583 Some(serde_json::Value::String(value)) => {
584 let trimmed = value.trim();
585 if trimmed.is_empty() {
586 Ok(None)
587 } else {
588 Ok(Some(trimmed.to_string()))
589 }
590 }
591 Some(other) => Err(session_remind_shape_error(format!(
592 "`{key}` must be a string, got {other}"
593 ))),
594 }
595}
596
597fn bool_field(
598 map: &serde_json::Map<String, serde_json::Value>,
599 key: &str,
600) -> Result<Option<bool>, String> {
601 match map.get(key) {
602 None | Some(serde_json::Value::Null) => Ok(None),
603 Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
604 Some(other) => Err(session_remind_shape_error(format!(
605 "`{key}` must be a bool, got {other}"
606 ))),
607 }
608}
609
610fn int_field(
611 map: &serde_json::Map<String, serde_json::Value>,
612 key: &str,
613) -> Result<Option<i64>, String> {
614 match map.get(key) {
615 None | Some(serde_json::Value::Null) => Ok(None),
616 Some(serde_json::Value::Number(value)) => {
617 let Some(value) = value.as_i64() else {
618 return Err(session_remind_shape_error(format!(
619 "`{key}` must be an integer"
620 )));
621 };
622 Ok(Some(value))
623 }
624 Some(other) => Err(session_remind_shape_error(format!(
625 "`{key}` must be an int, got {other}"
626 ))),
627 }
628}
629
630fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
631 let Some(value) = map.get("tags") else {
632 return Ok(Vec::new());
633 };
634 if value.is_null() {
635 return Ok(Vec::new());
636 }
637 let Some(values) = value.as_array() else {
638 return Err(session_remind_shape_error("`tags` must be a list"));
639 };
640 let mut tags = Vec::new();
641 for value in values {
642 let Some(tag) = value.as_str() else {
643 return Err(session_remind_shape_error(format!(
644 "`tags` entries must be strings, got {value}"
645 )));
646 };
647 let tag = tag.trim();
648 if tag.is_empty() {
649 return Err(session_remind_shape_error(
650 "`tags` entries must be non-empty strings",
651 ));
652 }
653 if !tags.iter().any(|existing| existing == tag) {
654 tags.push(tag.to_string());
655 }
656 }
657 Ok(tags)
658}
659
660fn session_remind_payload_from_value(
661 value: &serde_json::Value,
662) -> Result<crate::llm::helpers::SystemReminder, String> {
663 let Some(map) = value.as_object() else {
664 return Err(session_remind_shape_error(
665 "session/remind payload must be a reminder object",
666 ));
667 };
668 const ALLOWED: &[&str] = &[
669 "_meta",
670 "body",
671 "dedupe_key",
672 "fired_at_turn",
673 "id",
674 "preserve_on_compact",
675 "propagate",
676 "role_hint",
677 "source",
678 "tags",
679 "ttl_turns",
680 ];
681 let unknown = map
682 .keys()
683 .filter(|key| !ALLOWED.contains(&key.as_str()))
684 .map(String::as_str)
685 .collect::<Vec<_>>();
686 if !unknown.is_empty() {
687 if unknown.contains(&"content") {
688 return Err(session_remind_shape_error(
689 "session/remind expects reminder `body`, not user-message `content`",
690 ));
691 }
692 return Err(reminder_unknown_option_error(format!(
693 "unknown reminder option(s): {}",
694 unknown.join(", ")
695 )));
696 }
697 if let Some(meta) = map.get("_meta") {
698 if !meta.is_null() && !meta.is_object() {
699 return Err(session_remind_shape_error("`_meta` must be an object"));
700 }
701 }
702 let ttl_turns = int_field(map, "ttl_turns")?;
703 if let Some(value) = ttl_turns {
704 if value <= 0 {
705 return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
706 }
707 }
708 let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
709 if fired_at_turn < 0 {
710 return Err(session_remind_shape_error(
711 "`fired_at_turn` must be >= 0 when provided",
712 ));
713 }
714 match string_field(map, "source", false)?.as_deref() {
715 None | Some("bridge") => {}
716 Some(_) => {
717 return Err(session_remind_shape_error(
718 "`source` for session/remind must be bridge when provided",
719 ))
720 }
721 }
722 let propagate = match string_field(map, "propagate", false)?.as_deref() {
723 None => crate::llm::helpers::ReminderPropagate::Session,
724 Some("all") => crate::llm::helpers::ReminderPropagate::All,
725 Some("session") => crate::llm::helpers::ReminderPropagate::Session,
726 Some("none") => crate::llm::helpers::ReminderPropagate::None,
727 Some(_) => {
728 return Err(reminder_unknown_propagate_error(
729 "`propagate` must be one of all, session, or none",
730 ))
731 }
732 };
733 let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
734 None => crate::llm::helpers::ReminderRoleHint::System,
735 Some("system") => crate::llm::helpers::ReminderRoleHint::System,
736 Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
737 Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
738 Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
739 Some(_) => {
740 return Err(session_remind_shape_error(
741 "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
742 ))
743 }
744 };
745 Ok(crate::llm::helpers::SystemReminder {
746 id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
747 tags: tags_field(map)?,
748 dedupe_key: string_field(map, "dedupe_key", false)?,
749 ttl_turns,
750 preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
751 propagate,
752 role_hint,
753 source: crate::llm::helpers::ReminderSource::Bridge,
754 body: string_field(map, "body", true)?.unwrap_or_default(),
755 fired_at_turn,
756 originating_agent_id: None,
757 })
758}
759
760fn handle_cancel_tool_call_notification(params: &serde_json::Value) {
770 let session_id = params
771 .get("sessionId")
772 .or_else(|| params.get("session_id"))
773 .and_then(|value| value.as_str())
774 .unwrap_or_default();
775 let call_id = params
776 .get("toolCallId")
777 .or_else(|| params.get("tool_call_id"))
778 .or_else(|| params.get("callId"))
779 .or_else(|| params.get("call_id"))
780 .and_then(|value| value.as_str())
781 .unwrap_or_default();
782 if call_id.is_empty() {
783 return;
784 }
785 let reason = params
786 .get("reason")
787 .and_then(|value| value.as_str())
788 .unwrap_or("host cancelled in-flight tool call")
789 .to_string();
790 let inject_reminder = params
791 .get("injectReminder")
792 .or_else(|| params.get("inject_reminder"))
793 .and_then(|value| value.as_bool())
794 .unwrap_or(true);
795 crate::tool_call_cancellations::cancel(session_id, call_id, reason, inject_reminder);
796}
797
798fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
799 let mode = QueuedUserMessageMode::from_str(
800 params
801 .get("mode")
802 .and_then(|value| value.as_str())
803 .unwrap_or("audit_only"),
804 );
805 let reminder_value = if let Some(reminder) = params.get("reminder") {
806 reminder.clone()
807 } else {
808 let Some(params) = params.as_object() else {
809 return Err(session_remind_shape_error(
810 "session/remind params must be an object",
811 ));
812 };
813 let mut reminder = params.clone();
814 reminder.remove("mode");
815 reminder.remove("sessionId");
816 reminder.remove("session_id");
817 serde_json::Value::Object(reminder)
818 };
819 Ok(QueuedReminder {
820 reminder: session_remind_payload_from_value(&reminder_value)?,
821 mode,
822 })
823}
824
825#[allow(clippy::new_without_default)]
827impl HostBridge {
828 pub fn new() -> Self {
833 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
834 Arc::new(Mutex::new(HashMap::new()));
835 let cancelled = Arc::new(AtomicBool::new(false));
836 let cancel_notify = Arc::new(Notify::new());
837 let queued_transcript_injections = HostBridgeInjectionState::default();
838 let resume_requested = Arc::new(AtomicBool::new(false));
839 let skills_reload_requested = Arc::new(AtomicBool::new(false));
840 let daemon_idle = Arc::new(AtomicBool::new(false));
841
842 let pending_clone = pending.clone();
844 let cancelled_clone = cancelled.clone();
845 let cancel_notify_clone = cancel_notify.clone();
846 let queued_clone = queued_transcript_injections.clone();
847 let resume_clone = resume_requested.clone();
848 let skills_reload_clone = skills_reload_requested.clone();
849 tokio::task::spawn_local(async move {
850 let stdin = tokio::io::stdin();
851 let reader = tokio::io::BufReader::new(stdin);
852 let mut lines = reader.lines();
853
854 while let Ok(Some(line)) = lines.next_line().await {
855 let line = line.trim().to_string();
856 if line.is_empty() {
857 continue;
858 }
859
860 let msg: serde_json::Value = match serde_json::from_str(&line) {
861 Ok(v) => v,
862 Err(_) => continue,
863 };
864
865 if msg.get("id").is_none() {
867 if let Some(method) = msg["method"].as_str() {
868 if method == "cancel" {
869 cancelled_clone.store(true, Ordering::SeqCst);
870 cancel_notify_clone.notify_waiters();
871 } else if method == "agent/resume" {
872 resume_clone.store(true, Ordering::SeqCst);
873 } else if method == "skills/update" {
874 skills_reload_clone.store(true, Ordering::SeqCst);
875 } else if method == "session/remind" {
876 let params = &msg["params"];
877 if let Ok(reminder) = queued_session_remind_from_params(params) {
878 queued_clone.push_session_reminder(reminder).await;
879 }
880 } else if method == "session/cancel_tool_call" {
881 handle_cancel_tool_call_notification(&msg["params"]);
882 }
883 }
884 continue;
885 }
886
887 if let Some(id) = msg["id"].as_u64() {
888 let mut pending = pending_clone.lock().await;
889 if let Some(sender) = pending.remove(&id) {
890 let _ = sender.send(msg);
891 }
892 }
893 }
894
895 let mut pending = pending_clone.lock().await;
897 pending.clear();
898 });
899
900 Self {
901 next_id: AtomicU64::new(1),
902 pending,
903 cancelled,
904 cancel_notify,
905 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
906 session_id: std::sync::Mutex::new(String::new()),
907 script_name: std::sync::Mutex::new(String::new()),
908 queued_transcript_injections,
909 resume_requested,
910 skills_reload_requested,
911 daemon_idle,
912 prompt_stop_reason: std::sync::Mutex::new(None),
913 visible_call_states: std::sync::Mutex::new(HashMap::new()),
914 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
915 in_process: None,
916 }
917 }
918
919 pub fn from_parts(
925 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
926 cancelled: Arc<AtomicBool>,
927 stdout_lock: Arc<std::sync::Mutex<()>>,
928 start_id: u64,
929 ) -> Self {
930 Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
931 }
932
933 pub fn from_parts_with_writer(
934 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
935 cancelled: Arc<AtomicBool>,
936 writer: HostBridgeWriter,
937 start_id: u64,
938 ) -> Self {
939 Self::from_parts_with_writer_and_cancel_notify(
940 pending,
941 cancelled,
942 Arc::new(Notify::new()),
943 writer,
944 start_id,
945 )
946 }
947
948 pub fn from_parts_with_writer_and_cancel_notify(
949 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
950 cancelled: Arc<AtomicBool>,
951 cancel_notify: Arc<Notify>,
952 writer: HostBridgeWriter,
953 start_id: u64,
954 ) -> Self {
955 Self::from_parts_with_writer_cancel_notify_and_injection_state(
956 pending,
957 cancelled,
958 cancel_notify,
959 writer,
960 start_id,
961 None,
962 )
963 }
964
965 pub fn from_parts_with_writer_cancel_notify_and_injection_state(
966 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
967 cancelled: Arc<AtomicBool>,
968 cancel_notify: Arc<Notify>,
969 writer: HostBridgeWriter,
970 start_id: u64,
971 injection_state: Option<HostBridgeInjectionState>,
972 ) -> Self {
973 Self {
974 next_id: AtomicU64::new(start_id),
975 pending,
976 cancelled,
977 cancel_notify,
978 writer,
979 session_id: std::sync::Mutex::new(String::new()),
980 script_name: std::sync::Mutex::new(String::new()),
981 queued_transcript_injections: injection_state.unwrap_or_default(),
982 resume_requested: Arc::new(AtomicBool::new(false)),
983 skills_reload_requested: Arc::new(AtomicBool::new(false)),
984 daemon_idle: Arc::new(AtomicBool::new(false)),
985 prompt_stop_reason: std::sync::Mutex::new(None),
986 visible_call_states: std::sync::Mutex::new(HashMap::new()),
987 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
988 in_process: None,
989 }
990 }
991
992 pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
995 let exported_functions = vm.load_module_exports(module_path).await?;
996 Ok(Self {
997 next_id: AtomicU64::new(1),
998 pending: Arc::new(Mutex::new(HashMap::new())),
999 cancelled: Arc::new(AtomicBool::new(false)),
1000 cancel_notify: Arc::new(Notify::new()),
1001 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
1002 session_id: std::sync::Mutex::new(String::new()),
1003 script_name: std::sync::Mutex::new(String::new()),
1004 queued_transcript_injections: HostBridgeInjectionState::default(),
1005 resume_requested: Arc::new(AtomicBool::new(false)),
1006 skills_reload_requested: Arc::new(AtomicBool::new(false)),
1007 daemon_idle: Arc::new(AtomicBool::new(false)),
1008 prompt_stop_reason: std::sync::Mutex::new(None),
1009 visible_call_states: std::sync::Mutex::new(HashMap::new()),
1010 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
1011 in_process: Some(InProcessHost {
1012 module_path: module_path.to_path_buf(),
1013 exported_functions,
1014 vm,
1015 }),
1016 })
1017 }
1018
1019 pub fn set_session_id(&self, id: &str) {
1021 *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
1022 }
1023
1024 pub fn set_script_name(&self, name: &str) {
1026 *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
1027 }
1028
1029 fn get_script_name(&self) -> String {
1031 self.script_name
1032 .lock()
1033 .unwrap_or_else(|e| e.into_inner())
1034 .clone()
1035 }
1036
1037 pub fn get_session_id(&self) -> String {
1039 self.session_id
1040 .lock()
1041 .unwrap_or_else(|e| e.into_inner())
1042 .clone()
1043 }
1044
1045 fn write_line(&self, line: &str) -> Result<(), VmError> {
1047 (self.writer)(line).map_err(VmError::Runtime)
1048 }
1049
1050 pub async fn call(
1053 &self,
1054 method: &str,
1055 params: serde_json::Value,
1056 ) -> Result<serde_json::Value, VmError> {
1057 if let Some(in_process) = &self.in_process {
1058 return in_process.dispatch(method, params).await;
1059 }
1060
1061 if self.is_cancelled() {
1062 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1063 }
1064
1065 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
1066 let cancel_wait = self.cancel_notify.notified();
1067 tokio::pin!(cancel_wait);
1068
1069 let request = crate::jsonrpc::request(id, method, params);
1070
1071 let (tx, rx) = oneshot::channel();
1072 {
1073 let mut pending = self.pending.lock().await;
1074 pending.insert(id, tx);
1075 }
1076
1077 let line = serde_json::to_string(&request)
1078 .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
1079 if let Err(e) = self.write_line(&line) {
1080 let mut pending = self.pending.lock().await;
1081 pending.remove(&id);
1082 return Err(e);
1083 }
1084
1085 if self.is_cancelled() {
1086 let mut pending = self.pending.lock().await;
1087 pending.remove(&id);
1088 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1089 }
1090
1091 let response = tokio::select! {
1092 result = rx => match result {
1093 Ok(msg) => msg,
1094 Err(_) => {
1095 return Err(VmError::Runtime(
1097 "Bridge: host closed connection before responding".into(),
1098 ));
1099 }
1100 },
1101 _ = &mut cancel_wait => {
1102 let mut pending = self.pending.lock().await;
1103 pending.remove(&id);
1104 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1105 }
1106 _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
1107 let mut pending = self.pending.lock().await;
1108 pending.remove(&id);
1109 return Err(VmError::Runtime(format!(
1110 "Bridge: host did not respond to '{method}' within {}s",
1111 DEFAULT_TIMEOUT.as_secs()
1112 )));
1113 }
1114 };
1115
1116 if let Some(error) = response.get("error") {
1117 let message = error["message"].as_str().unwrap_or("Unknown host error");
1118 let code = error["code"].as_i64().unwrap_or(-1);
1119 if code == -32001 {
1121 return Err(VmError::CategorizedError {
1122 message: message.to_string(),
1123 category: ErrorCategory::ToolRejected,
1124 });
1125 }
1126 return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
1127 }
1128
1129 Ok(response["result"].clone())
1130 }
1131
1132 pub fn notify(&self, method: &str, params: serde_json::Value) {
1135 let notification = crate::jsonrpc::notification(method, params);
1136 if self.in_process.is_some() {
1137 return;
1138 }
1139 if let Ok(line) = serde_json::to_string(¬ification) {
1140 let _ = self.write_line(&line);
1141 }
1142 }
1143
1144 pub fn is_cancelled(&self) -> bool {
1146 self.cancelled.load(Ordering::SeqCst)
1147 }
1148
1149 pub fn take_resume_signal(&self) -> bool {
1150 self.resume_requested.swap(false, Ordering::SeqCst)
1151 }
1152
1153 pub fn signal_resume(&self) {
1154 self.resume_requested.store(true, Ordering::SeqCst);
1155 }
1156
1157 pub fn set_daemon_idle(&self, idle: bool) {
1158 self.daemon_idle.store(idle, Ordering::SeqCst);
1159 }
1160
1161 pub fn is_daemon_idle(&self) -> bool {
1162 self.daemon_idle.load(Ordering::SeqCst)
1163 }
1164
1165 pub fn set_prompt_stop_reason(&self, reason: &str) {
1170 *self
1171 .prompt_stop_reason
1172 .lock()
1173 .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
1174 }
1175
1176 pub fn take_prompt_stop_reason(&self) -> Option<String> {
1181 self.prompt_stop_reason
1182 .lock()
1183 .unwrap_or_else(|e| e.into_inner())
1184 .take()
1185 }
1186
1187 pub fn take_skills_reload_signal(&self) -> bool {
1192 self.skills_reload_requested.swap(false, Ordering::SeqCst)
1193 }
1194
1195 pub fn signal_skills_reload(&self) {
1199 self.skills_reload_requested.store(true, Ordering::SeqCst);
1200 }
1201
1202 pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
1208 let result = self.call("skills/list", serde_json::json!({})).await?;
1209 match result {
1210 serde_json::Value::Array(items) => Ok(items),
1211 serde_json::Value::Object(map) => match map.get("skills") {
1212 Some(serde_json::Value::Array(items)) => Ok(items.clone()),
1213 _ => Err(VmError::Runtime(
1214 "skills/list: host response must be an array or { skills: [...] }".into(),
1215 )),
1216 },
1217 _ => Err(VmError::Runtime(
1218 "skills/list: unexpected response shape".into(),
1219 )),
1220 }
1221 }
1222
1223 pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
1229 let result = self.call("host/tools/list", serde_json::json!({})).await?;
1230 parse_host_tools_list_response(result)
1231 }
1232
1233 pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
1237 self.call("skills/fetch", serde_json::json!({ "id": id }))
1238 .await
1239 }
1240
1241 pub fn injection_state(&self) -> HostBridgeInjectionState {
1242 self.queued_transcript_injections.clone()
1243 }
1244
1245 pub async fn push_pending_user_message(
1246 &self,
1247 content: String,
1248 transcript_content: serde_json::Value,
1249 mode: &str,
1250 ) -> String {
1251 self.queued_transcript_injections
1252 .push_pending_user_message(content, transcript_content, mode)
1253 .await
1254 }
1255
1256 pub async fn push_queued_user_message(&self, content: String, mode: &str) -> String {
1257 self.push_pending_user_message(content.clone(), serde_json::Value::String(content), mode)
1258 .await
1259 }
1260
1261 pub async fn revoke_pending_user_message(
1262 &self,
1263 message_id: &str,
1264 ) -> PendingUserMessageMutationResult {
1265 self.queued_transcript_injections
1266 .revoke_pending_user_message(message_id)
1267 .await
1268 }
1269
1270 pub async fn revoke_pending_reminder(
1271 &self,
1272 reminder_id: &str,
1273 ) -> PendingReminderMutationResult {
1274 self.queued_transcript_injections
1275 .revoke_pending_reminder(reminder_id)
1276 .await
1277 }
1278
1279 pub async fn pending_injections_json(&self) -> serde_json::Value {
1280 self.queued_transcript_injections
1281 .pending_injections_json()
1282 .await
1283 }
1284
1285 pub async fn replace_pending_user_message(
1286 &self,
1287 message_id: &str,
1288 content: String,
1289 transcript_content: serde_json::Value,
1290 ) -> PendingUserMessageMutationResult {
1291 self.queued_transcript_injections
1292 .replace_pending_user_message(message_id, content, transcript_content)
1293 .await
1294 }
1295
1296 pub async fn push_queued_session_remind_from_params(
1297 &self,
1298 params: &serde_json::Value,
1299 ) -> Result<String, String> {
1300 let reminder = queued_session_remind_from_params(params)?;
1301 let reminder_id = reminder.reminder.id.clone();
1302 self.queued_transcript_injections
1303 .push_session_reminder(reminder)
1304 .await;
1305 Ok(reminder_id)
1306 }
1307
1308 pub async fn take_queued_user_messages(
1309 &self,
1310 include_interrupt_immediate: bool,
1311 include_finish_step: bool,
1312 include_audit_only: bool,
1313 ) -> Vec<QueuedUserMessage> {
1314 let mut state = self.queued_transcript_injections.inner.lock().await;
1315 let mut selected = Vec::new();
1316 let mut retained = VecDeque::new();
1317 while let Some(injection) = state.queue.pop_front() {
1318 let should_take = match injection.mode() {
1319 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1320 QueuedUserMessageMode::FinishStep => include_finish_step,
1321 QueuedUserMessageMode::AuditOnly => include_audit_only,
1322 };
1323 match (should_take, injection) {
1324 (true, QueuedTranscriptInjection::User(message)) => {
1325 state
1326 .delivered_user_message_ids
1327 .insert(message.message_id.clone());
1328 selected.push(message);
1329 }
1330 (_, injection) => retained.push_back(injection),
1331 }
1332 }
1333 state.queue = retained;
1334 selected
1335 }
1336
1337 pub async fn take_queued_transcript_injections(
1338 &self,
1339 include_interrupt_immediate: bool,
1340 include_finish_step: bool,
1341 include_audit_only: bool,
1342 ) -> Vec<QueuedTranscriptInjection> {
1343 let mut state = self.queued_transcript_injections.inner.lock().await;
1344 let mut selected = Vec::new();
1345 let mut retained = VecDeque::new();
1346 while let Some(injection) = state.queue.pop_front() {
1347 let should_take = match injection.mode() {
1348 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1349 QueuedUserMessageMode::FinishStep => include_finish_step,
1350 QueuedUserMessageMode::AuditOnly => include_audit_only,
1351 };
1352 if should_take {
1353 match &injection {
1354 QueuedTranscriptInjection::User(message) => {
1355 state
1356 .delivered_user_message_ids
1357 .insert(message.message_id.clone());
1358 }
1359 QueuedTranscriptInjection::Reminder(reminder) => {
1360 state
1361 .delivered_reminder_ids
1362 .insert(reminder.reminder.id.clone());
1363 }
1364 }
1365 selected.push(injection);
1366 } else {
1367 retained.push_back(injection);
1368 }
1369 }
1370 state.queue = retained;
1371 selected
1372 }
1373
1374 pub async fn take_queued_user_messages_for(
1375 &self,
1376 checkpoint: DeliveryCheckpoint,
1377 ) -> Vec<QueuedUserMessage> {
1378 match checkpoint {
1379 DeliveryCheckpoint::InterruptImmediate => {
1380 self.take_queued_user_messages(true, false, false).await
1381 }
1382 DeliveryCheckpoint::AfterCurrentOperation => {
1383 self.take_queued_user_messages(false, true, false).await
1384 }
1385 DeliveryCheckpoint::EndOfInteraction => {
1386 self.take_queued_user_messages(false, false, true).await
1387 }
1388 }
1389 }
1390
1391 pub async fn take_queued_transcript_injections_for(
1392 &self,
1393 checkpoint: DeliveryCheckpoint,
1394 ) -> Vec<QueuedTranscriptInjection> {
1395 match checkpoint {
1396 DeliveryCheckpoint::InterruptImmediate => {
1397 self.take_queued_transcript_injections(true, false, false)
1398 .await
1399 }
1400 DeliveryCheckpoint::AfterCurrentOperation => {
1401 self.take_queued_transcript_injections(false, true, false)
1402 .await
1403 }
1404 DeliveryCheckpoint::EndOfInteraction => {
1405 self.take_queued_transcript_injections(false, false, true)
1406 .await
1407 }
1408 }
1409 }
1410
1411 pub fn send_output(&self, text: &str) {
1413 self.notify("output", serde_json::json!({"text": text}));
1414 }
1415
1416 pub fn send_progress(
1418 &self,
1419 phase: &str,
1420 message: &str,
1421 progress: Option<i64>,
1422 total: Option<i64>,
1423 data: Option<serde_json::Value>,
1424 ) {
1425 let mut payload = serde_json::json!({"phase": phase, "message": message});
1426 if let Some(p) = progress {
1427 payload["progress"] = serde_json::json!(p);
1428 }
1429 if let Some(t) = total {
1430 payload["total"] = serde_json::json!(t);
1431 }
1432 if let Some(d) = data {
1433 payload["data"] = d;
1434 }
1435 self.notify("progress", payload);
1436 }
1437
1438 pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1440 let mut payload = serde_json::json!({"level": level, "message": message});
1441 if let Some(f) = fields {
1442 payload["fields"] = f;
1443 }
1444 self.notify("log", payload);
1445 }
1446
1447 pub fn send_call_start(
1450 &self,
1451 call_id: &str,
1452 call_type: &str,
1453 name: &str,
1454 metadata: serde_json::Value,
1455 ) {
1456 let session_id = self.get_session_id();
1457 let script = self.get_script_name();
1458 let stream_publicly = metadata
1459 .get("stream_publicly")
1460 .and_then(|value| value.as_bool())
1461 .unwrap_or(true);
1462 self.visible_call_streams
1463 .lock()
1464 .unwrap_or_else(|e| e.into_inner())
1465 .insert(call_id.to_string(), stream_publicly);
1466 self.notify(
1467 "session/update",
1468 serde_json::json!({
1469 "sessionId": session_id,
1470 "update": {
1471 "sessionUpdate": "call_start",
1472 "content": {
1473 "toolCallId": call_id,
1474 "call_type": call_type,
1475 "name": name,
1476 "script": script,
1477 "metadata": metadata,
1478 },
1479 },
1480 }),
1481 );
1482 }
1483
1484 pub fn send_call_progress(
1487 &self,
1488 call_id: &str,
1489 delta: &str,
1490 accumulated_tokens: u64,
1491 user_visible: bool,
1492 ) {
1493 let session_id = self.get_session_id();
1494 let (visible_text, visible_delta) = {
1495 let stream_publicly = self
1496 .visible_call_streams
1497 .lock()
1498 .unwrap_or_else(|e| e.into_inner())
1499 .get(call_id)
1500 .copied()
1501 .unwrap_or(true);
1502 let mut states = self
1503 .visible_call_states
1504 .lock()
1505 .unwrap_or_else(|e| e.into_inner());
1506 let state = states.entry(call_id.to_string()).or_default();
1507 state.push(delta, stream_publicly)
1508 };
1509 self.notify(
1510 "session/update",
1511 serde_json::json!({
1512 "sessionId": session_id,
1513 "update": {
1514 "sessionUpdate": "call_progress",
1515 "content": {
1516 "toolCallId": call_id,
1517 "delta": delta,
1518 "accumulated_tokens": accumulated_tokens,
1519 "visible_text": visible_text,
1520 "visible_delta": visible_delta,
1521 "user_visible": user_visible,
1522 },
1523 },
1524 }),
1525 );
1526 }
1527
1528 pub fn send_call_end(
1530 &self,
1531 call_id: &str,
1532 call_type: &str,
1533 name: &str,
1534 duration_ms: u64,
1535 status: &str,
1536 metadata: serde_json::Value,
1537 ) {
1538 let session_id = self.get_session_id();
1539 let script = self.get_script_name();
1540 self.visible_call_states
1541 .lock()
1542 .unwrap_or_else(|e| e.into_inner())
1543 .remove(call_id);
1544 self.visible_call_streams
1545 .lock()
1546 .unwrap_or_else(|e| e.into_inner())
1547 .remove(call_id);
1548 self.notify(
1549 "session/update",
1550 serde_json::json!({
1551 "sessionId": session_id,
1552 "update": {
1553 "sessionUpdate": "call_end",
1554 "content": {
1555 "toolCallId": call_id,
1556 "call_type": call_type,
1557 "name": name,
1558 "script": script,
1559 "duration_ms": duration_ms,
1560 "status": status,
1561 "metadata": metadata,
1562 },
1563 },
1564 }),
1565 );
1566 }
1567
1568 pub fn send_worker_update(
1570 &self,
1571 worker_id: &str,
1572 worker_name: &str,
1573 status: &str,
1574 metadata: serde_json::Value,
1575 audit: Option<&MutationSessionRecord>,
1576 ) {
1577 let session_id = self.get_session_id();
1578 let script = self.get_script_name();
1579 let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1580 let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1581 let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1582 let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1583 let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1584 let lifecycle = serde_json::json!({
1585 "event": status,
1586 "worker_id": worker_id,
1587 "worker_name": worker_name,
1588 "started_at": started_at,
1589 "finished_at": finished_at,
1590 });
1591 self.notify(
1592 "session/update",
1593 serde_json::json!({
1594 "sessionId": session_id,
1595 "update": {
1596 "sessionUpdate": "worker_update",
1597 "content": {
1598 "worker_id": worker_id,
1599 "worker_name": worker_name,
1600 "status": status,
1601 "script": script,
1602 "started_at": started_at,
1603 "finished_at": finished_at,
1604 "snapshot_path": snapshot_path,
1605 "run_id": run_id,
1606 "run_path": run_path,
1607 "lifecycle": lifecycle,
1608 "audit": audit,
1609 "metadata": metadata,
1610 },
1611 },
1612 }),
1613 );
1614 }
1615}
1616
1617pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1619 crate::stdlib::json_to_vm_value(val)
1620}
1621
1622fn parse_host_tools_list_response(
1623 result: serde_json::Value,
1624) -> Result<Vec<serde_json::Value>, VmError> {
1625 let tools = match result {
1626 serde_json::Value::Array(items) => items,
1627 serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1628 map.get("result")
1629 .and_then(|value| value.get("tools"))
1630 .cloned()
1631 }) {
1632 Some(serde_json::Value::Array(items)) => items,
1633 _ => {
1634 return Err(VmError::Runtime(
1635 "host/tools/list: host response must be an array or { tools: [...] }".into(),
1636 ));
1637 }
1638 },
1639 _ => {
1640 return Err(VmError::Runtime(
1641 "host/tools/list: unexpected response shape".into(),
1642 ));
1643 }
1644 };
1645
1646 let mut normalized = Vec::with_capacity(tools.len());
1647 for tool in tools {
1648 let serde_json::Value::Object(map) = tool else {
1649 return Err(VmError::Runtime(
1650 "host/tools/list: every tool must be an object".into(),
1651 ));
1652 };
1653 let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1654 return Err(VmError::Runtime(
1655 "host/tools/list: every tool must include a string `name`".into(),
1656 ));
1657 };
1658 let description = map
1659 .get("description")
1660 .and_then(|value| value.as_str())
1661 .or_else(|| {
1662 map.get("short_description")
1663 .and_then(|value| value.as_str())
1664 })
1665 .unwrap_or_default();
1666 let schema = map
1667 .get("schema")
1668 .cloned()
1669 .or_else(|| map.get("parameters").cloned())
1670 .or_else(|| map.get("input_schema").cloned())
1671 .unwrap_or(serde_json::Value::Null);
1672 let deprecated = map
1673 .get("deprecated")
1674 .and_then(|value| value.as_bool())
1675 .unwrap_or(false);
1676 normalized.push(serde_json::json!({
1677 "name": name,
1678 "description": description,
1679 "schema": schema,
1680 "deprecated": deprecated,
1681 }));
1682 }
1683 Ok(normalized)
1684}
1685
1686#[cfg(test)]
1687mod tests {
1688 use super::*;
1689
1690 fn test_bridge() -> HostBridge {
1691 HostBridge::from_parts(
1692 Arc::new(Mutex::new(HashMap::new())),
1693 Arc::new(AtomicBool::new(false)),
1694 Arc::new(std::sync::Mutex::new(())),
1695 1,
1696 )
1697 }
1698
1699 fn test_bridge_sharing_injection_state(owner: &HostBridge) -> HostBridge {
1700 HostBridge::from_parts_with_writer_cancel_notify_and_injection_state(
1701 Arc::new(Mutex::new(HashMap::new())),
1702 Arc::new(AtomicBool::new(false)),
1703 Arc::new(Notify::new()),
1704 Arc::new(|_| Ok(())),
1705 100,
1706 Some(owner.injection_state()),
1707 )
1708 }
1709
1710 #[test]
1711 fn test_json_rpc_request_format() {
1712 let request = crate::jsonrpc::request(
1713 1,
1714 "llm_call",
1715 serde_json::json!({
1716 "prompt": "Hello",
1717 "system": "Be helpful",
1718 }),
1719 );
1720 let s = serde_json::to_string(&request).unwrap();
1721 assert!(s.contains("\"jsonrpc\":\"2.0\""));
1722 assert!(s.contains("\"id\":1"));
1723 assert!(s.contains("\"method\":\"llm_call\""));
1724 }
1725
1726 #[test]
1727 fn test_json_rpc_notification_format() {
1728 let notification =
1729 crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
1730 let s = serde_json::to_string(¬ification).unwrap();
1731 assert!(s.contains("\"method\":\"output\""));
1732 assert!(!s.contains("\"id\""));
1733 }
1734
1735 #[test]
1736 fn test_json_rpc_error_response_parsing() {
1737 let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
1738 assert!(response.get("error").is_some());
1739 assert_eq!(
1740 response["error"]["message"].as_str().unwrap(),
1741 "Invalid request"
1742 );
1743 }
1744
1745 #[test]
1746 fn test_json_rpc_success_response_parsing() {
1747 let response = crate::jsonrpc::response(
1748 1,
1749 serde_json::json!({
1750 "text": "Hello world",
1751 "input_tokens": 10,
1752 "output_tokens": 5,
1753 }),
1754 );
1755 assert!(response.get("result").is_some());
1756 assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
1757 }
1758
1759 #[test]
1760 fn test_cancelled_flag() {
1761 let cancelled = Arc::new(AtomicBool::new(false));
1762 assert!(!cancelled.load(Ordering::SeqCst));
1763 cancelled.store(true, Ordering::SeqCst);
1764 assert!(cancelled.load(Ordering::SeqCst));
1765 }
1766
1767 #[test]
1768 fn pending_host_calls_return_when_cancellation_arrives() {
1769 let runtime = tokio::runtime::Builder::new_current_thread()
1770 .enable_all()
1771 .build()
1772 .unwrap();
1773 runtime.block_on(async {
1774 let pending = Arc::new(Mutex::new(HashMap::new()));
1775 let cancelled = Arc::new(AtomicBool::new(false));
1776 let bridge = HostBridge::from_parts_with_writer(
1777 pending.clone(),
1778 cancelled.clone(),
1779 Arc::new(|_| Ok(())),
1780 1,
1781 );
1782
1783 let call = bridge.call("host/work", serde_json::json!({}));
1784 tokio::pin!(call);
1785
1786 loop {
1787 tokio::select! {
1788 result = &mut call => panic!("call completed before cancellation: {result:?}"),
1789 _ = tokio::task::yield_now() => {}
1790 }
1791 if !pending.lock().await.is_empty() {
1792 break;
1793 }
1794 }
1795
1796 cancelled.store(true, Ordering::SeqCst);
1797 bridge.cancel_notify.notify_waiters();
1798
1799 let result = tokio::time::timeout(Duration::from_secs(1), call)
1800 .await
1801 .expect("pending call should observe cancellation promptly");
1802 assert!(
1803 matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
1804 );
1805 assert!(pending.lock().await.is_empty());
1806 });
1807 }
1808
1809 #[test]
1810 fn queued_messages_are_filtered_by_delivery_mode() {
1811 let runtime = tokio::runtime::Builder::new_current_thread()
1812 .enable_all()
1813 .build()
1814 .unwrap();
1815 runtime.block_on(async {
1816 let bridge = test_bridge();
1817 bridge
1818 .push_queued_user_message("first".to_string(), "finish_step")
1819 .await;
1820 bridge
1821 .push_queued_user_message("second".to_string(), "audit_only")
1822 .await;
1823
1824 let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1825 assert_eq!(finish_step.len(), 1);
1826 assert_eq!(finish_step[0].content, "first");
1827
1828 let audit_only = bridge.take_queued_user_messages(false, false, true).await;
1829 assert_eq!(audit_only.len(), 1);
1830 assert_eq!(audit_only[0].content, "second");
1831 });
1832 }
1833
1834 #[test]
1835 fn pending_user_messages_support_revoke_replace_and_delivery_states() {
1836 let runtime = tokio::runtime::Builder::new_current_thread()
1837 .enable_all()
1838 .build()
1839 .unwrap();
1840 runtime.block_on(async {
1841 let bridge = test_bridge();
1842 let first_id = bridge
1843 .push_pending_user_message(
1844 "first".to_string(),
1845 serde_json::json!("first"),
1846 "audit_only",
1847 )
1848 .await;
1849 let second_id = bridge
1850 .push_pending_user_message(
1851 "second".to_string(),
1852 serde_json::json!("second"),
1853 "audit_only",
1854 )
1855 .await;
1856
1857 assert_eq!(
1858 bridge
1859 .replace_pending_user_message(
1860 &second_id,
1861 "second edited".to_string(),
1862 serde_json::json!("second edited"),
1863 )
1864 .await,
1865 PendingUserMessageMutationResult::Mutated
1866 );
1867 assert_eq!(
1868 bridge.revoke_pending_user_message(&first_id).await,
1869 PendingUserMessageMutationResult::Mutated
1870 );
1871 assert_eq!(
1872 bridge.revoke_pending_user_message(&first_id).await,
1873 PendingUserMessageMutationResult::AlreadyRevoked
1874 );
1875
1876 let delivered = bridge
1877 .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1878 .await;
1879 assert_eq!(delivered.len(), 1);
1880 assert_eq!(delivered[0].message_id, second_id);
1881 assert_eq!(delivered[0].content, "second edited");
1882
1883 assert_eq!(
1884 bridge.revoke_pending_user_message(&second_id).await,
1885 PendingUserMessageMutationResult::AlreadyDelivered
1886 );
1887 assert_eq!(
1888 bridge
1889 .replace_pending_user_message(
1890 &second_id,
1891 "too late".to_string(),
1892 serde_json::json!("too late"),
1893 )
1894 .await,
1895 PendingUserMessageMutationResult::AlreadyDelivered
1896 );
1897 assert_eq!(
1898 bridge.revoke_pending_user_message("missing").await,
1899 PendingUserMessageMutationResult::UnknownMessageId
1900 );
1901 });
1902 }
1903
1904 #[test]
1905 fn pending_user_message_replace_preserves_fifo_position_and_mode() {
1906 let runtime = tokio::runtime::Builder::new_current_thread()
1907 .enable_all()
1908 .build()
1909 .unwrap();
1910 runtime.block_on(async {
1911 let bridge = test_bridge();
1912 let first_id = bridge
1913 .push_pending_user_message(
1914 "first".to_string(),
1915 serde_json::json!("first"),
1916 "finish_step",
1917 )
1918 .await;
1919 let second_id = bridge
1920 .push_pending_user_message(
1921 "second".to_string(),
1922 serde_json::json!("second"),
1923 "finish_step",
1924 )
1925 .await;
1926 assert_eq!(
1927 bridge
1928 .replace_pending_user_message(
1929 &first_id,
1930 "first edited".to_string(),
1931 serde_json::json!("first edited"),
1932 )
1933 .await,
1934 PendingUserMessageMutationResult::Mutated
1935 );
1936
1937 let delivered = bridge
1938 .take_queued_user_messages_for(DeliveryCheckpoint::AfterCurrentOperation)
1939 .await;
1940 assert_eq!(
1941 delivered
1942 .iter()
1943 .map(|message| (&message.message_id, message.content.as_str(), message.mode))
1944 .collect::<Vec<_>>(),
1945 vec![
1946 (&first_id, "first edited", QueuedUserMessageMode::FinishStep,),
1947 (&second_id, "second", QueuedUserMessageMode::FinishStep),
1948 ]
1949 );
1950 });
1951 }
1952
1953 #[test]
1954 fn pending_user_message_state_survives_bridge_replacement() {
1955 let runtime = tokio::runtime::Builder::new_current_thread()
1956 .enable_all()
1957 .build()
1958 .unwrap();
1959 runtime.block_on(async {
1960 let bridge = test_bridge();
1961 let revoked_id = bridge
1962 .push_pending_user_message(
1963 "revoke me".to_string(),
1964 serde_json::json!("revoke me"),
1965 "audit_only",
1966 )
1967 .await;
1968 let delivered_id = bridge
1969 .push_pending_user_message(
1970 "deliver me".to_string(),
1971 serde_json::json!("deliver me"),
1972 "audit_only",
1973 )
1974 .await;
1975 assert_eq!(
1976 bridge.revoke_pending_user_message(&revoked_id).await,
1977 PendingUserMessageMutationResult::Mutated
1978 );
1979 bridge.cancelled.store(true, Ordering::SeqCst);
1980
1981 let replacement_bridge = test_bridge_sharing_injection_state(&bridge);
1982 assert_eq!(
1983 replacement_bridge
1984 .revoke_pending_user_message(&revoked_id)
1985 .await,
1986 PendingUserMessageMutationResult::AlreadyRevoked
1987 );
1988 let delivered = replacement_bridge
1989 .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1990 .await;
1991 assert_eq!(delivered.len(), 1);
1992 assert_eq!(delivered[0].message_id, delivered_id);
1993 assert_eq!(delivered[0].content, "deliver me");
1994 assert_eq!(
1995 bridge.revoke_pending_user_message(&delivered_id).await,
1996 PendingUserMessageMutationResult::AlreadyDelivered
1997 );
1998 });
1999 }
2000
2001 #[test]
2002 fn queued_transcript_injections_preserve_user_reminder_separation() {
2003 let runtime = tokio::runtime::Builder::new_current_thread()
2004 .enable_all()
2005 .build()
2006 .unwrap();
2007 runtime.block_on(async {
2008 let bridge = test_bridge();
2009 bridge
2010 .push_queued_user_message("human follow-up".to_string(), "finish_step")
2011 .await;
2012 let reminder_id = bridge
2013 .push_queued_session_remind_from_params(&serde_json::json!({
2014 "body": "Host-provided ambient context.",
2015 "tags": ["host"],
2016 "dedupe_key": "host-context",
2017 "ttl_turns": 2,
2018 "mode": "audit_only",
2019 "_meta": {"harn": {"source": "test"}},
2020 }))
2021 .await
2022 .expect("valid reminder");
2023
2024 let finish_step = bridge.take_queued_user_messages(false, true, false).await;
2025 assert_eq!(finish_step.len(), 1);
2026 assert_eq!(finish_step[0].content, "human follow-up");
2027
2028 let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
2029 assert!(no_user_messages.is_empty());
2030
2031 let injections = bridge
2032 .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
2033 .await;
2034 assert_eq!(injections.len(), 1);
2035 let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
2036 panic!("expected queued reminder");
2037 };
2038 assert_eq!(reminder.reminder.id, reminder_id);
2039 assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
2040 assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
2041 assert_eq!(
2042 reminder.reminder.dedupe_key.as_deref(),
2043 Some("host-context")
2044 );
2045 assert_eq!(reminder.reminder.ttl_turns, Some(2));
2046 assert_eq!(
2047 reminder.reminder.source,
2048 crate::llm::helpers::ReminderSource::Bridge
2049 );
2050 });
2051 }
2052
2053 #[test]
2054 fn pending_injections_list_user_messages_and_reminders_in_fifo_order() {
2055 let runtime = tokio::runtime::Builder::new_current_thread()
2056 .enable_all()
2057 .build()
2058 .unwrap();
2059 runtime.block_on(async {
2060 let bridge = test_bridge();
2061 let message_id = bridge
2062 .push_pending_user_message(
2063 "human follow-up".to_string(),
2064 serde_json::json!([{"type": "text", "text": "human follow-up"}]),
2065 "finish_step",
2066 )
2067 .await;
2068 let reminder_id = bridge
2069 .push_queued_session_remind_from_params(&serde_json::json!({
2070 "id": "rem-test",
2071 "body": "Host reminder",
2072 "tags": ["host"],
2073 "dedupe_key": "host-reminder",
2074 "ttl_turns": 2,
2075 "mode": "interrupt_immediate",
2076 }))
2077 .await
2078 .expect("valid session/remind payload");
2079
2080 let pending = bridge.pending_injections_json().await;
2081 assert_eq!(pending["pendingCount"], 2);
2082 assert_eq!(pending["injections"][0]["kind"], "user");
2083 assert_eq!(pending["injections"][0]["id"], message_id);
2084 assert_eq!(pending["injections"][0]["messageId"], message_id);
2085 assert_eq!(pending["injections"][0]["mode"], "finish_step");
2086 assert_eq!(pending["injections"][0]["position"], 0);
2087 assert_eq!(pending["injections"][1]["kind"], "reminder");
2088 assert_eq!(pending["injections"][1]["id"], reminder_id);
2089 assert_eq!(pending["injections"][1]["reminderId"], "rem-test");
2090 assert_eq!(pending["injections"][1]["mode"], "interrupt_immediate");
2091 assert_eq!(pending["injections"][1]["body"], "Host reminder");
2092 assert_eq!(pending["injections"][1]["dedupeKey"], "host-reminder");
2093 assert_eq!(pending["injections"][1]["ttlTurns"], 2);
2094 assert_eq!(pending["injections"][1]["position"], 1);
2095 });
2096 }
2097
2098 #[test]
2099 fn pending_reminders_support_revoke_and_delivery_states() {
2100 let runtime = tokio::runtime::Builder::new_current_thread()
2101 .enable_all()
2102 .build()
2103 .unwrap();
2104 runtime.block_on(async {
2105 let bridge = test_bridge();
2106 let revoked_id = bridge
2107 .push_queued_session_remind_from_params(&serde_json::json!({
2108 "id": "rem-revoke",
2109 "body": "remove me",
2110 "mode": "finish_step",
2111 }))
2112 .await
2113 .expect("valid session/remind payload");
2114 let delivered_id = bridge
2115 .push_queued_session_remind_from_params(&serde_json::json!({
2116 "id": "rem-deliver",
2117 "body": "deliver me",
2118 "mode": "finish_step",
2119 }))
2120 .await
2121 .expect("valid session/remind payload");
2122
2123 assert_eq!(
2124 bridge.revoke_pending_reminder(&revoked_id).await,
2125 PendingReminderMutationResult::Mutated
2126 );
2127 assert_eq!(
2128 bridge.revoke_pending_reminder(&revoked_id).await,
2129 PendingReminderMutationResult::AlreadyRevoked
2130 );
2131
2132 let pending = bridge.pending_injections_json().await;
2133 assert_eq!(pending["pendingCount"], 1);
2134 assert_eq!(pending["injections"][0]["reminderId"], delivered_id);
2135
2136 let delivered = bridge
2137 .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
2138 .await;
2139 assert_eq!(delivered.len(), 1);
2140 let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2141 panic!("expected delivered reminder");
2142 };
2143 assert_eq!(reminder.reminder.id, delivered_id);
2144
2145 assert_eq!(
2146 bridge.revoke_pending_reminder(&delivered_id).await,
2147 PendingReminderMutationResult::AlreadyDelivered
2148 );
2149 assert_eq!(
2150 bridge.revoke_pending_reminder("missing").await,
2151 PendingReminderMutationResult::UnknownReminderId
2152 );
2153 });
2154 }
2155
2156 #[test]
2157 fn bridge_remind_modes_honor_delivery_checkpoints() {
2158 let runtime = tokio::runtime::Builder::new_current_thread()
2159 .enable_all()
2160 .build()
2161 .unwrap();
2162 runtime.block_on(async {
2163 let cases = [
2164 (
2165 "interrupt_immediate",
2166 DeliveryCheckpoint::InterruptImmediate,
2167 DeliveryCheckpoint::AfterCurrentOperation,
2168 ),
2169 (
2170 "finish_step",
2171 DeliveryCheckpoint::AfterCurrentOperation,
2172 DeliveryCheckpoint::EndOfInteraction,
2173 ),
2174 (
2175 "audit_only",
2176 DeliveryCheckpoint::EndOfInteraction,
2177 DeliveryCheckpoint::InterruptImmediate,
2178 ),
2179 ];
2180
2181 for (mode, expected_checkpoint, wrong_checkpoint) in cases {
2182 let bridge = test_bridge();
2183 bridge
2184 .push_queued_session_remind_from_params(&serde_json::json!({
2185 "body": format!("Reminder for {mode}"),
2186 "mode": mode,
2187 }))
2188 .await
2189 .expect("valid session/remind payload");
2190
2191 let premature = bridge
2192 .take_queued_transcript_injections_for(wrong_checkpoint)
2193 .await;
2194 assert!(
2195 premature.is_empty(),
2196 "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
2197 );
2198
2199 let delivered = bridge
2200 .take_queued_transcript_injections_for(expected_checkpoint)
2201 .await;
2202 assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
2203 let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2204 panic!("expected reminder for {mode}");
2205 };
2206 assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
2207 }
2208 });
2209 }
2210
2211 #[test]
2212 fn session_remind_validation_rejects_user_message_shape() {
2213 let err = queued_session_remind_from_params(&serde_json::json!({
2214 "content": "this is still a user message",
2215 "mode": "interrupt_immediate",
2216 }))
2217 .expect_err("session/remind must require a reminder body");
2218 assert!(err.contains(Code::ReminderInvalidShape.as_str()));
2219 assert!(err.contains("body"));
2220 }
2221
2222 #[test]
2223 fn session_remind_validation_rejects_unknown_options_separately() {
2224 let err = queued_session_remind_from_params(&serde_json::json!({
2225 "body": "valid body",
2226 "unknown_host_field": true,
2227 }))
2228 .expect_err("session/remind must reject unknown top-level fields");
2229 assert!(err.contains(Code::ReminderUnknownOption.as_str()));
2230 assert!(err.contains("unknown_host_field"));
2231 }
2232
2233 #[test]
2234 fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
2235 let err = queued_session_remind_from_params(&serde_json::json!({
2236 "body": "valid body",
2237 "propagate": "workspace",
2238 }))
2239 .expect_err("session/remind must reject unknown propagate values");
2240 assert!(err.contains(Code::ReminderUnknownPropagate.as_str()));
2241 assert!(err.contains("propagate"));
2242 }
2243
2244 #[test]
2245 fn test_json_result_to_vm_value_string() {
2246 let val = serde_json::json!("hello");
2247 let vm_val = json_result_to_vm_value(&val);
2248 assert_eq!(vm_val.display(), "hello");
2249 }
2250
2251 #[test]
2252 fn test_json_result_to_vm_value_dict() {
2253 let val = serde_json::json!({"name": "test", "count": 42});
2254 let vm_val = json_result_to_vm_value(&val);
2255 let VmValue::Dict(d) = &vm_val else {
2256 unreachable!("Expected Dict, got {:?}", vm_val);
2257 };
2258 assert_eq!(d.get("name").unwrap().display(), "test");
2259 assert_eq!(d.get("count").unwrap().display(), "42");
2260 }
2261
2262 #[test]
2263 fn test_json_result_to_vm_value_null() {
2264 let val = serde_json::json!(null);
2265 let vm_val = json_result_to_vm_value(&val);
2266 assert!(matches!(vm_val, VmValue::Nil));
2267 }
2268
2269 #[test]
2270 fn test_json_result_to_vm_value_nested() {
2271 let val = serde_json::json!({
2272 "text": "response",
2273 "tool_calls": [
2274 {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
2275 ],
2276 "input_tokens": 100,
2277 "output_tokens": 50,
2278 });
2279 let vm_val = json_result_to_vm_value(&val);
2280 let VmValue::Dict(d) = &vm_val else {
2281 unreachable!("Expected Dict, got {:?}", vm_val);
2282 };
2283 assert_eq!(d.get("text").unwrap().display(), "response");
2284 let VmValue::List(list) = d.get("tool_calls").unwrap() else {
2285 unreachable!("Expected List for tool_calls");
2286 };
2287 assert_eq!(list.len(), 1);
2288 }
2289
2290 #[test]
2291 fn parse_host_tools_list_accepts_object_wrapper() {
2292 let tools = parse_host_tools_list_response(serde_json::json!({
2293 "tools": [
2294 {
2295 "name": "Read",
2296 "description": "Read a file",
2297 "schema": {"type": "object"},
2298 }
2299 ]
2300 }))
2301 .expect("tool list");
2302
2303 assert_eq!(tools.len(), 1);
2304 assert_eq!(tools[0]["name"], "Read");
2305 assert_eq!(tools[0]["deprecated"], false);
2306 }
2307
2308 #[test]
2309 fn parse_host_tools_list_accepts_compat_fields() {
2310 let tools = parse_host_tools_list_response(serde_json::json!({
2311 "result": {
2312 "tools": [
2313 {
2314 "name": "Edit",
2315 "short_description": "Apply an edit",
2316 "input_schema": {"type": "object"},
2317 "deprecated": true,
2318 }
2319 ]
2320 }
2321 }))
2322 .expect("tool list");
2323
2324 assert_eq!(tools[0]["description"], "Apply an edit");
2325 assert_eq!(tools[0]["schema"]["type"], "object");
2326 assert_eq!(tools[0]["deprecated"], true);
2327 }
2328
2329 #[test]
2330 fn parse_host_tools_list_requires_tool_names() {
2331 let err = parse_host_tools_list_response(serde_json::json!({
2332 "tools": [
2333 {"description": "missing name"}
2334 ]
2335 }))
2336 .expect_err("expected error");
2337 assert!(err
2338 .to_string()
2339 .contains("host/tools/list: every tool must include a string `name`"));
2340 }
2341
2342 #[test]
2343 fn test_timeout_duration() {
2344 assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
2345 }
2346}