1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::future::Future;
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::rc::Rc;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::Duration;
16
17use tokio::io::AsyncBufReadExt;
18use tokio::sync::{oneshot, Mutex, Notify};
19
20use harn_parser::diagnostic_codes::Code;
21
22use crate::orchestration::MutationSessionRecord;
23use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
24use crate::visible_text::VisibleTextState;
25use crate::vm::Vm;
26
/// Longest time `HostBridge::call` waits for the host to answer a request
/// before failing with a timeout error (300 seconds).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
29
/// Shared callback that delivers one serialized message line to the host.
///
/// Returns `Err` with a human-readable description when the write fails.
pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
31
32fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
33 Arc::new(move |line: &str| {
34 let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
35 let mut stdout = std::io::stdout().lock();
36 stdout
37 .write_all(line.as_bytes())
38 .map_err(|e| format!("Bridge write error: {e}"))?;
39 stdout
40 .write_all(b"\n")
41 .map_err(|e| format!("Bridge write error: {e}"))?;
42 stdout
43 .flush()
44 .map_err(|e| format!("Bridge flush error: {e}"))?;
45 Ok(())
46 })
47}
48
/// Connection between the VM and its host.
///
/// Requests and notifications are serialized as JSON-RPC lines and handed to
/// `writer`; responses are matched back to waiting callers through `pending`.
/// When `in_process` is set, `call` dispatches to an in-process Harn module
/// instead of writing to the host.
pub struct HostBridge {
    /// Source of request ids; `call` takes the next value for every request.
    next_id: AtomicU64,
    /// Waiters keyed by request id; each sender receives the host's full
    /// response message for that id.
    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
    /// Set once the host sends a `cancel` notification.
    cancelled: Arc<AtomicBool>,
    /// Wakes in-flight `call`s when a `cancel` notification arrives.
    cancel_notify: Arc<Notify>,
    /// Sink for outgoing message lines.
    writer: HostBridgeWriter,
    /// Current session id (empty until `set_session_id` is called).
    session_id: std::sync::Mutex<String>,
    /// Current script name (empty until `set_script_name` is called).
    script_name: std::sync::Mutex<String>,
    /// Queue of user messages and reminders waiting to be injected.
    queued_transcript_injections: HostBridgeInjectionState,
    /// Set by an `agent/resume` notification or `signal_resume`; cleared by
    /// `take_resume_signal`.
    resume_requested: Arc<AtomicBool>,
    /// Set by a `skills/update` notification or `signal_skills_reload`;
    /// cleared by `take_skills_reload_signal`.
    skills_reload_requested: Arc<AtomicBool>,
    /// Flag read and written through `is_daemon_idle` / `set_daemon_idle`.
    daemon_idle: Arc<AtomicBool>,
    /// Stop reason stored by `set_prompt_stop_reason` until taken.
    prompt_stop_reason: std::sync::Mutex<Option<String>>,
    // NOTE(review): not used in the visible part of this file; presumably
    // per-call visible-text state keyed by call id — confirm.
    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
    // NOTE(review): not used in the visible part of this file; presumably a
    // per-call streaming flag keyed by call id — confirm.
    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
    /// In-process host backend; when present, `call` bypasses the writer.
    in_process: Option<InProcessHost>,
}
93
/// Host backend implemented by a Harn module loaded into this process.
struct InProcessHost {
    /// Path of the loaded module; quoted in "missing capability" errors.
    module_path: PathBuf,
    /// Functions exported by the module, keyed by export name.
    exported_functions: BTreeMap<String, Rc<VmClosure>>,
    /// VM from which a child VM is created for every export invocation.
    vm: Vm,
}
99
100impl InProcessHost {
101 fn dispatch<'a>(
107 &'a self,
108 method: &'a str,
109 params: serde_json::Value,
110 ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, VmError>> + 'a>> {
111 Box::pin(async move {
112 match method {
113 "builtin_call" => {
114 let name = params
115 .get("name")
116 .and_then(|value| value.as_str())
117 .unwrap_or_default();
118 let args = params
119 .get("args")
120 .and_then(|value| value.as_array())
121 .cloned()
122 .unwrap_or_default()
123 .into_iter()
124 .map(|value| json_result_to_vm_value(&value))
125 .collect::<Vec<_>>();
126 self.invoke_export(name, &args).await
127 }
128 "host/tools/list" => self
129 .invoke_optional_export("host_tools_list", &[])
130 .await
131 .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
132 "session/request_permission" => self.request_permission(params).await,
133 other => Err(VmError::Runtime(format!(
134 "playground host backend does not implement bridge method '{other}'"
135 ))),
136 }
137 })
138 }
139
140 async fn invoke_export(
141 &self,
142 name: &str,
143 args: &[VmValue],
144 ) -> Result<serde_json::Value, VmError> {
145 let Some(closure) = self.exported_functions.get(name) else {
146 return Err(VmError::Runtime(format!(
147 "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
148 self.module_path.display()
149 )));
150 };
151
152 let mut vm = self.vm.child_vm_for_host();
153 let result = vm.call_closure_pub(closure, args).await?;
154 Ok(crate::llm::vm_value_to_json(&result))
155 }
156
157 async fn invoke_optional_export(
158 &self,
159 name: &str,
160 args: &[VmValue],
161 ) -> Result<Option<serde_json::Value>, VmError> {
162 if !self.exported_functions.contains_key(name) {
163 return Ok(None);
164 }
165 self.invoke_export(name, args).await.map(Some)
166 }
167
168 async fn request_permission(
169 &self,
170 params: serde_json::Value,
171 ) -> Result<serde_json::Value, VmError> {
172 let Some(closure) = self.exported_functions.get("request_permission") else {
173 return Ok(serde_json::json!({ "granted": true }));
174 };
175
176 let tool_name = params
177 .get("toolCall")
178 .and_then(|tool_call| tool_call.get("toolName"))
179 .and_then(|value| value.as_str())
180 .unwrap_or_default();
181 let tool_args = params
182 .get("toolCall")
183 .and_then(|tool_call| tool_call.get("rawInput"))
184 .map(json_result_to_vm_value)
185 .unwrap_or(VmValue::Nil);
186 let full_payload = json_result_to_vm_value(¶ms);
187
188 let arg_count = closure.func.params.len();
189 let args = if arg_count >= 3 {
190 vec![
191 VmValue::String(Rc::from(tool_name.to_string())),
192 tool_args,
193 full_payload,
194 ]
195 } else if arg_count == 2 {
196 vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
197 } else if arg_count == 1 {
198 vec![full_payload]
199 } else {
200 Vec::new()
201 };
202
203 let mut vm = self.vm.child_vm_for_host();
204 let result = vm.call_closure_pub(closure, &args).await?;
205 let payload = match result {
206 VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
207 VmValue::String(reason) if !reason.is_empty() => {
208 serde_json::json!({ "granted": false, "reason": reason.to_string() })
209 }
210 other => {
211 let json = crate::llm::vm_value_to_json(&other);
212 if json
213 .get("granted")
214 .and_then(|value| value.as_bool())
215 .is_some()
216 || json.get("outcome").is_some()
217 {
218 json
219 } else {
220 serde_json::json!({ "granted": other.is_truthy() })
221 }
222 }
223 };
224 Ok(payload)
225 }
226}
227
/// Delivery mode requested for a queued transcript injection.
///
/// The string forms are defined by [`QueuedUserMessageMode::from_str`] and
/// [`QueuedUserMessageMode::as_str`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
    InterruptImmediate,
    FinishStep,
    AuditOnly,
}

// NOTE(review): not referenced in the visible part of this file; presumably
// the points at which queued injections may be delivered — confirm.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
    InterruptImmediate,
    AfterCurrentOperation,
    EndOfInteraction,
}

impl QueuedUserMessageMode {
    /// Parses a mode name; anything unrecognized maps to `AuditOnly`.
    ///
    /// `"interrupt"` and `"after_current_operation"` are accepted as aliases
    /// of `"interrupt_immediate"` and `"finish_step"` respectively.
    fn from_str(value: &str) -> Self {
        if matches!(value, "interrupt_immediate" | "interrupt") {
            return Self::InterruptImmediate;
        }
        if matches!(value, "finish_step" | "after_current_operation") {
            return Self::FinishStep;
        }
        Self::AuditOnly
    }

    /// Canonical string form of the mode.
    fn as_str(self) -> &'static str {
        const INTERRUPT_IMMEDIATE: &str = "interrupt_immediate";
        const FINISH_STEP: &str = "finish_step";
        const AUDIT_ONLY: &str = "audit_only";
        match self {
            Self::AuditOnly => AUDIT_ONLY,
            Self::FinishStep => FINISH_STEP,
            Self::InterruptImmediate => INTERRUPT_IMMEDIATE,
        }
    }
}
272
/// A user message waiting in the injection queue.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedUserMessage {
    /// Identifier assigned when the message was queued.
    pub message_id: String,
    /// Message text as supplied by the caller.
    pub content: String,
    /// JSON form of the content; this is what pending listings report.
    pub transcript_content: serde_json::Value,
    /// Requested delivery mode.
    pub mode: QueuedUserMessageMode,
}
280
/// A system reminder waiting in the injection queue.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedReminder {
    /// The reminder payload.
    pub reminder: crate::llm::helpers::SystemReminder,
    /// Requested delivery mode.
    pub mode: QueuedUserMessageMode,
}
286
/// One entry of the injection queue: either a user message or a reminder.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum QueuedTranscriptInjection {
    /// A queued user message.
    User(QueuedUserMessage),
    /// A queued system reminder.
    Reminder(QueuedReminder),
}
292
/// Mutable state behind [`HostBridgeInjectionState`].
#[derive(Debug, Default)]
struct QueuedTranscriptInjections {
    /// Pending injections in arrival order.
    queue: VecDeque<QueuedTranscriptInjection>,
    /// Ids of user messages removed from the queue by a revoke call.
    revoked_user_message_ids: HashSet<String>,
    /// Ids reported as `AlreadyDelivered` by revoke/replace.
    // NOTE(review): nothing in the visible part of this file inserts here;
    // presumably populated where injections are delivered — confirm.
    delivered_user_message_ids: HashSet<String>,
    /// Ids of reminders removed from the queue by a revoke call.
    revoked_reminder_ids: HashSet<String>,
    /// Ids reported as `AlreadyDelivered` by reminder revocation.
    // NOTE(review): nothing in the visible part of this file inserts here —
    // confirm where delivery is recorded.
    delivered_reminder_ids: HashSet<String>,
}
301
/// Cloneable handle to the injection queue; clones share the same state.
#[derive(Clone, Debug, Default)]
pub struct HostBridgeInjectionState {
    /// Shared queue and bookkeeping sets behind an async mutex.
    inner: Arc<Mutex<QueuedTranscriptInjections>>,
}
306
/// Outcome of revoking or replacing a pending user message.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PendingUserMessageMutationResult {
    /// The message was still queued and has been revoked or replaced.
    Mutated,
    /// The id is recorded as previously revoked.
    AlreadyRevoked,
    /// The id is recorded as already delivered.
    AlreadyDelivered,
    /// The id is neither queued nor recorded as revoked or delivered.
    UnknownMessageId,
}
314
315impl QueuedTranscriptInjection {
316 fn mode(&self) -> QueuedUserMessageMode {
317 match self {
318 Self::User(message) => message.mode,
319 Self::Reminder(reminder) => reminder.mode,
320 }
321 }
322
323 fn pending_json(&self, position: usize) -> serde_json::Value {
324 match self {
325 Self::User(message) => serde_json::json!({
326 "kind": "user",
327 "id": message.message_id,
328 "messageId": message.message_id,
329 "mode": message.mode.as_str(),
330 "position": position,
331 "content": message.transcript_content,
332 }),
333 Self::Reminder(reminder) => serde_json::json!({
334 "kind": "reminder",
335 "id": reminder.reminder.id,
336 "reminderId": reminder.reminder.id,
337 "mode": reminder.mode.as_str(),
338 "position": position,
339 "body": reminder.reminder.body,
340 "tags": reminder.reminder.tags,
341 "dedupeKey": reminder.reminder.dedupe_key,
342 "ttlTurns": reminder.reminder.ttl_turns,
343 "preserveOnCompact": reminder.reminder.preserve_on_compact,
344 "propagate": reminder.reminder.propagate.as_str(),
345 "roleHint": reminder.reminder.role_hint.as_str(),
346 "source": reminder.reminder.source.as_str(),
347 "firedAtTurn": reminder.reminder.fired_at_turn,
348 "originatingAgentId": reminder.reminder.originating_agent_id,
349 }),
350 }
351 }
352}
353
/// Outcome of revoking a pending reminder.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PendingReminderMutationResult {
    /// The reminder was still queued and has been revoked.
    Mutated,
    /// The id is recorded as previously revoked.
    AlreadyRevoked,
    /// The id is recorded as already delivered.
    AlreadyDelivered,
    /// The id is neither queued nor recorded as revoked or delivered.
    UnknownReminderId,
}
361
362fn new_inject_message_id() -> String {
363 format!("msg_inj_{}", uuid::Uuid::now_v7().simple())
364}
365
366impl HostBridgeInjectionState {
367 pub fn new() -> Self {
368 Self::default()
369 }
370
371 pub async fn push_pending_user_message(
372 &self,
373 content: String,
374 transcript_content: serde_json::Value,
375 mode: &str,
376 ) -> String {
377 let message_id = new_inject_message_id();
378 self.inner
379 .lock()
380 .await
381 .queue
382 .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
383 message_id: message_id.clone(),
384 content,
385 transcript_content,
386 mode: QueuedUserMessageMode::from_str(mode),
387 }));
388 message_id
389 }
390
391 pub async fn revoke_pending_user_message(
392 &self,
393 message_id: &str,
394 ) -> PendingUserMessageMutationResult {
395 let mut state = self.inner.lock().await;
396 let mut retained = VecDeque::new();
397 let mut revoked = false;
398 while let Some(injection) = state.queue.pop_front() {
399 match &injection {
400 QueuedTranscriptInjection::User(message) if message.message_id == message_id => {
401 revoked = true;
402 }
403 _ => retained.push_back(injection),
404 }
405 }
406 state.queue = retained;
407 if revoked {
408 state
409 .revoked_user_message_ids
410 .insert(message_id.to_string());
411 return PendingUserMessageMutationResult::Mutated;
412 }
413 if state.revoked_user_message_ids.contains(message_id) {
414 PendingUserMessageMutationResult::AlreadyRevoked
415 } else if state.delivered_user_message_ids.contains(message_id) {
416 PendingUserMessageMutationResult::AlreadyDelivered
417 } else {
418 PendingUserMessageMutationResult::UnknownMessageId
419 }
420 }
421
422 pub async fn revoke_pending_reminder(
423 &self,
424 reminder_id: &str,
425 ) -> PendingReminderMutationResult {
426 let mut state = self.inner.lock().await;
427 let mut retained = VecDeque::new();
428 let mut revoked = false;
429 while let Some(injection) = state.queue.pop_front() {
430 match &injection {
431 QueuedTranscriptInjection::Reminder(reminder)
432 if reminder.reminder.id == reminder_id =>
433 {
434 revoked = true;
435 }
436 _ => retained.push_back(injection),
437 }
438 }
439 state.queue = retained;
440 if revoked {
441 state.revoked_reminder_ids.insert(reminder_id.to_string());
442 return PendingReminderMutationResult::Mutated;
443 }
444 if state.revoked_reminder_ids.contains(reminder_id) {
445 PendingReminderMutationResult::AlreadyRevoked
446 } else if state.delivered_reminder_ids.contains(reminder_id) {
447 PendingReminderMutationResult::AlreadyDelivered
448 } else {
449 PendingReminderMutationResult::UnknownReminderId
450 }
451 }
452
453 pub async fn replace_pending_user_message(
454 &self,
455 message_id: &str,
456 content: String,
457 transcript_content: serde_json::Value,
458 ) -> PendingUserMessageMutationResult {
459 let mut state = self.inner.lock().await;
460 for injection in &mut state.queue {
461 if let QueuedTranscriptInjection::User(message) = injection {
462 if message.message_id == message_id {
463 message.content = content;
464 message.transcript_content = transcript_content;
465 return PendingUserMessageMutationResult::Mutated;
466 }
467 }
468 }
469 if state.revoked_user_message_ids.contains(message_id) {
470 PendingUserMessageMutationResult::AlreadyRevoked
471 } else if state.delivered_user_message_ids.contains(message_id) {
472 PendingUserMessageMutationResult::AlreadyDelivered
473 } else {
474 PendingUserMessageMutationResult::UnknownMessageId
475 }
476 }
477
478 async fn push_session_reminder(&self, reminder: QueuedReminder) {
479 self.inner
480 .lock()
481 .await
482 .queue
483 .push_back(QueuedTranscriptInjection::Reminder(reminder));
484 }
485
486 pub async fn pending_injections_json(&self) -> serde_json::Value {
487 let state = self.inner.lock().await;
488 let injections = state
489 .queue
490 .iter()
491 .enumerate()
492 .map(|(position, injection)| injection.pending_json(position))
493 .collect::<Vec<_>>();
494 serde_json::json!({
495 "pendingCount": injections.len(),
496 "injections": injections,
497 })
498 }
499}
500
501fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
502 format!(
503 "{}: {}",
504 Code::ReminderUnknownOption.as_str(),
505 message.as_ref()
506 )
507}
508
509fn session_remind_shape_error(message: impl AsRef<str>) -> String {
510 format!(
511 "{}: {}",
512 Code::ReminderInvalidShape.as_str(),
513 message.as_ref()
514 )
515}
516
517fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
518 format!(
519 "{}: {}",
520 Code::ReminderUnknownPropagate.as_str(),
521 message.as_ref()
522 )
523}
524
525fn string_field(
526 map: &serde_json::Map<String, serde_json::Value>,
527 key: &str,
528 required: bool,
529) -> Result<Option<String>, String> {
530 match map.get(key) {
531 None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
532 format!("`{key}` must be a non-empty string"),
533 )),
534 None | Some(serde_json::Value::Null) => Ok(None),
535 Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
536 session_remind_shape_error(format!("`{key}` must be a non-empty string")),
537 ),
538 Some(serde_json::Value::String(value)) => {
539 let trimmed = value.trim();
540 if trimmed.is_empty() {
541 Ok(None)
542 } else {
543 Ok(Some(trimmed.to_string()))
544 }
545 }
546 Some(other) => Err(session_remind_shape_error(format!(
547 "`{key}` must be a string, got {other}"
548 ))),
549 }
550}
551
552fn bool_field(
553 map: &serde_json::Map<String, serde_json::Value>,
554 key: &str,
555) -> Result<Option<bool>, String> {
556 match map.get(key) {
557 None | Some(serde_json::Value::Null) => Ok(None),
558 Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
559 Some(other) => Err(session_remind_shape_error(format!(
560 "`{key}` must be a bool, got {other}"
561 ))),
562 }
563}
564
565fn int_field(
566 map: &serde_json::Map<String, serde_json::Value>,
567 key: &str,
568) -> Result<Option<i64>, String> {
569 match map.get(key) {
570 None | Some(serde_json::Value::Null) => Ok(None),
571 Some(serde_json::Value::Number(value)) => {
572 let Some(value) = value.as_i64() else {
573 return Err(session_remind_shape_error(format!(
574 "`{key}` must be an integer"
575 )));
576 };
577 Ok(Some(value))
578 }
579 Some(other) => Err(session_remind_shape_error(format!(
580 "`{key}` must be an int, got {other}"
581 ))),
582 }
583}
584
585fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
586 let Some(value) = map.get("tags") else {
587 return Ok(Vec::new());
588 };
589 if value.is_null() {
590 return Ok(Vec::new());
591 }
592 let Some(values) = value.as_array() else {
593 return Err(session_remind_shape_error("`tags` must be a list"));
594 };
595 let mut tags = Vec::new();
596 for value in values {
597 let Some(tag) = value.as_str() else {
598 return Err(session_remind_shape_error(format!(
599 "`tags` entries must be strings, got {value}"
600 )));
601 };
602 let tag = tag.trim();
603 if tag.is_empty() {
604 return Err(session_remind_shape_error(
605 "`tags` entries must be non-empty strings",
606 ));
607 }
608 if !tags.iter().any(|existing| existing == tag) {
609 tags.push(tag.to_string());
610 }
611 }
612 Ok(tags)
613}
614
615fn session_remind_payload_from_value(
616 value: &serde_json::Value,
617) -> Result<crate::llm::helpers::SystemReminder, String> {
618 let Some(map) = value.as_object() else {
619 return Err(session_remind_shape_error(
620 "session/remind payload must be a reminder object",
621 ));
622 };
623 const ALLOWED: &[&str] = &[
624 "_meta",
625 "body",
626 "dedupe_key",
627 "fired_at_turn",
628 "id",
629 "preserve_on_compact",
630 "propagate",
631 "role_hint",
632 "source",
633 "tags",
634 "ttl_turns",
635 ];
636 let unknown = map
637 .keys()
638 .filter(|key| !ALLOWED.contains(&key.as_str()))
639 .map(String::as_str)
640 .collect::<Vec<_>>();
641 if !unknown.is_empty() {
642 if unknown.contains(&"content") {
643 return Err(session_remind_shape_error(
644 "session/remind expects reminder `body`, not user-message `content`",
645 ));
646 }
647 return Err(reminder_unknown_option_error(format!(
648 "unknown reminder option(s): {}",
649 unknown.join(", ")
650 )));
651 }
652 if let Some(meta) = map.get("_meta") {
653 if !meta.is_null() && !meta.is_object() {
654 return Err(session_remind_shape_error("`_meta` must be an object"));
655 }
656 }
657 let ttl_turns = int_field(map, "ttl_turns")?;
658 if let Some(value) = ttl_turns {
659 if value <= 0 {
660 return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
661 }
662 }
663 let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
664 if fired_at_turn < 0 {
665 return Err(session_remind_shape_error(
666 "`fired_at_turn` must be >= 0 when provided",
667 ));
668 }
669 match string_field(map, "source", false)?.as_deref() {
670 None | Some("bridge") => {}
671 Some(_) => {
672 return Err(session_remind_shape_error(
673 "`source` for session/remind must be bridge when provided",
674 ))
675 }
676 }
677 let propagate = match string_field(map, "propagate", false)?.as_deref() {
678 None => crate::llm::helpers::ReminderPropagate::Session,
679 Some("all") => crate::llm::helpers::ReminderPropagate::All,
680 Some("session") => crate::llm::helpers::ReminderPropagate::Session,
681 Some("none") => crate::llm::helpers::ReminderPropagate::None,
682 Some(_) => {
683 return Err(reminder_unknown_propagate_error(
684 "`propagate` must be one of all, session, or none",
685 ))
686 }
687 };
688 let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
689 None => crate::llm::helpers::ReminderRoleHint::System,
690 Some("system") => crate::llm::helpers::ReminderRoleHint::System,
691 Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
692 Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
693 Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
694 Some(_) => {
695 return Err(session_remind_shape_error(
696 "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
697 ))
698 }
699 };
700 Ok(crate::llm::helpers::SystemReminder {
701 id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
702 tags: tags_field(map)?,
703 dedupe_key: string_field(map, "dedupe_key", false)?,
704 ttl_turns,
705 preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
706 propagate,
707 role_hint,
708 source: crate::llm::helpers::ReminderSource::Bridge,
709 body: string_field(map, "body", true)?.unwrap_or_default(),
710 fired_at_turn,
711 originating_agent_id: None,
712 })
713}
714
715fn handle_cancel_tool_call_notification(params: &serde_json::Value) {
725 let session_id = params
726 .get("sessionId")
727 .or_else(|| params.get("session_id"))
728 .and_then(|value| value.as_str())
729 .unwrap_or_default();
730 let call_id = params
731 .get("toolCallId")
732 .or_else(|| params.get("tool_call_id"))
733 .or_else(|| params.get("callId"))
734 .or_else(|| params.get("call_id"))
735 .and_then(|value| value.as_str())
736 .unwrap_or_default();
737 if call_id.is_empty() {
738 return;
739 }
740 let reason = params
741 .get("reason")
742 .and_then(|value| value.as_str())
743 .unwrap_or("host cancelled in-flight tool call")
744 .to_string();
745 let inject_reminder = params
746 .get("injectReminder")
747 .or_else(|| params.get("inject_reminder"))
748 .and_then(|value| value.as_bool())
749 .unwrap_or(true);
750 crate::tool_call_cancellations::cancel(session_id, call_id, reason, inject_reminder);
751}
752
753fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
754 let mode = QueuedUserMessageMode::from_str(
755 params
756 .get("mode")
757 .and_then(|value| value.as_str())
758 .unwrap_or("audit_only"),
759 );
760 let reminder_value = if let Some(reminder) = params.get("reminder") {
761 reminder.clone()
762 } else {
763 let Some(params) = params.as_object() else {
764 return Err(session_remind_shape_error(
765 "session/remind params must be an object",
766 ));
767 };
768 let mut reminder = params.clone();
769 reminder.remove("mode");
770 reminder.remove("sessionId");
771 reminder.remove("session_id");
772 serde_json::Value::Object(reminder)
773 };
774 Ok(QueuedReminder {
775 reminder: session_remind_payload_from_value(&reminder_value)?,
776 mode,
777 })
778}
779
780#[allow(clippy::new_without_default)]
782impl HostBridge {
783 pub fn new() -> Self {
788 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
789 Arc::new(Mutex::new(HashMap::new()));
790 let cancelled = Arc::new(AtomicBool::new(false));
791 let cancel_notify = Arc::new(Notify::new());
792 let queued_transcript_injections = HostBridgeInjectionState::default();
793 let resume_requested = Arc::new(AtomicBool::new(false));
794 let skills_reload_requested = Arc::new(AtomicBool::new(false));
795 let daemon_idle = Arc::new(AtomicBool::new(false));
796
797 let pending_clone = pending.clone();
799 let cancelled_clone = cancelled.clone();
800 let cancel_notify_clone = cancel_notify.clone();
801 let queued_clone = queued_transcript_injections.clone();
802 let resume_clone = resume_requested.clone();
803 let skills_reload_clone = skills_reload_requested.clone();
804 tokio::task::spawn_local(async move {
805 let stdin = tokio::io::stdin();
806 let reader = tokio::io::BufReader::new(stdin);
807 let mut lines = reader.lines();
808
809 while let Ok(Some(line)) = lines.next_line().await {
810 let line = line.trim().to_string();
811 if line.is_empty() {
812 continue;
813 }
814
815 let msg: serde_json::Value = match serde_json::from_str(&line) {
816 Ok(v) => v,
817 Err(_) => continue,
818 };
819
820 if msg.get("id").is_none() {
822 if let Some(method) = msg["method"].as_str() {
823 if method == "cancel" {
824 cancelled_clone.store(true, Ordering::SeqCst);
825 cancel_notify_clone.notify_waiters();
826 } else if method == "agent/resume" {
827 resume_clone.store(true, Ordering::SeqCst);
828 } else if method == "skills/update" {
829 skills_reload_clone.store(true, Ordering::SeqCst);
830 } else if method == "session/remind" {
831 let params = &msg["params"];
832 if let Ok(reminder) = queued_session_remind_from_params(params) {
833 queued_clone.push_session_reminder(reminder).await;
834 }
835 } else if method == "session/cancel_tool_call" {
836 handle_cancel_tool_call_notification(&msg["params"]);
837 }
838 }
839 continue;
840 }
841
842 if let Some(id) = msg["id"].as_u64() {
843 let mut pending = pending_clone.lock().await;
844 if let Some(sender) = pending.remove(&id) {
845 let _ = sender.send(msg);
846 }
847 }
848 }
849
850 let mut pending = pending_clone.lock().await;
852 pending.clear();
853 });
854
855 Self {
856 next_id: AtomicU64::new(1),
857 pending,
858 cancelled,
859 cancel_notify,
860 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
861 session_id: std::sync::Mutex::new(String::new()),
862 script_name: std::sync::Mutex::new(String::new()),
863 queued_transcript_injections,
864 resume_requested,
865 skills_reload_requested,
866 daemon_idle,
867 prompt_stop_reason: std::sync::Mutex::new(None),
868 visible_call_states: std::sync::Mutex::new(HashMap::new()),
869 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
870 in_process: None,
871 }
872 }
873
874 pub fn from_parts(
880 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
881 cancelled: Arc<AtomicBool>,
882 stdout_lock: Arc<std::sync::Mutex<()>>,
883 start_id: u64,
884 ) -> Self {
885 Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
886 }
887
888 pub fn from_parts_with_writer(
889 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
890 cancelled: Arc<AtomicBool>,
891 writer: HostBridgeWriter,
892 start_id: u64,
893 ) -> Self {
894 Self::from_parts_with_writer_and_cancel_notify(
895 pending,
896 cancelled,
897 Arc::new(Notify::new()),
898 writer,
899 start_id,
900 )
901 }
902
903 pub fn from_parts_with_writer_and_cancel_notify(
904 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
905 cancelled: Arc<AtomicBool>,
906 cancel_notify: Arc<Notify>,
907 writer: HostBridgeWriter,
908 start_id: u64,
909 ) -> Self {
910 Self::from_parts_with_writer_cancel_notify_and_injection_state(
911 pending,
912 cancelled,
913 cancel_notify,
914 writer,
915 start_id,
916 None,
917 )
918 }
919
920 pub fn from_parts_with_writer_cancel_notify_and_injection_state(
921 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
922 cancelled: Arc<AtomicBool>,
923 cancel_notify: Arc<Notify>,
924 writer: HostBridgeWriter,
925 start_id: u64,
926 injection_state: Option<HostBridgeInjectionState>,
927 ) -> Self {
928 Self {
929 next_id: AtomicU64::new(start_id),
930 pending,
931 cancelled,
932 cancel_notify,
933 writer,
934 session_id: std::sync::Mutex::new(String::new()),
935 script_name: std::sync::Mutex::new(String::new()),
936 queued_transcript_injections: injection_state.unwrap_or_default(),
937 resume_requested: Arc::new(AtomicBool::new(false)),
938 skills_reload_requested: Arc::new(AtomicBool::new(false)),
939 daemon_idle: Arc::new(AtomicBool::new(false)),
940 prompt_stop_reason: std::sync::Mutex::new(None),
941 visible_call_states: std::sync::Mutex::new(HashMap::new()),
942 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
943 in_process: None,
944 }
945 }
946
947 pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
950 let exported_functions = vm.load_module_exports(module_path).await?;
951 Ok(Self {
952 next_id: AtomicU64::new(1),
953 pending: Arc::new(Mutex::new(HashMap::new())),
954 cancelled: Arc::new(AtomicBool::new(false)),
955 cancel_notify: Arc::new(Notify::new()),
956 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
957 session_id: std::sync::Mutex::new(String::new()),
958 script_name: std::sync::Mutex::new(String::new()),
959 queued_transcript_injections: HostBridgeInjectionState::default(),
960 resume_requested: Arc::new(AtomicBool::new(false)),
961 skills_reload_requested: Arc::new(AtomicBool::new(false)),
962 daemon_idle: Arc::new(AtomicBool::new(false)),
963 prompt_stop_reason: std::sync::Mutex::new(None),
964 visible_call_states: std::sync::Mutex::new(HashMap::new()),
965 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
966 in_process: Some(InProcessHost {
967 module_path: module_path.to_path_buf(),
968 exported_functions,
969 vm,
970 }),
971 })
972 }
973
974 pub fn set_session_id(&self, id: &str) {
976 *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
977 }
978
979 pub fn set_script_name(&self, name: &str) {
981 *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
982 }
983
984 fn get_script_name(&self) -> String {
986 self.script_name
987 .lock()
988 .unwrap_or_else(|e| e.into_inner())
989 .clone()
990 }
991
992 pub fn get_session_id(&self) -> String {
994 self.session_id
995 .lock()
996 .unwrap_or_else(|e| e.into_inner())
997 .clone()
998 }
999
1000 fn write_line(&self, line: &str) -> Result<(), VmError> {
1002 (self.writer)(line).map_err(VmError::Runtime)
1003 }
1004
1005 pub async fn call(
1008 &self,
1009 method: &str,
1010 params: serde_json::Value,
1011 ) -> Result<serde_json::Value, VmError> {
1012 if let Some(in_process) = &self.in_process {
1013 return in_process.dispatch(method, params).await;
1014 }
1015
1016 if self.is_cancelled() {
1017 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1018 }
1019
1020 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
1021 let cancel_wait = self.cancel_notify.notified();
1022 tokio::pin!(cancel_wait);
1023
1024 let request = crate::jsonrpc::request(id, method, params);
1025
1026 let (tx, rx) = oneshot::channel();
1027 {
1028 let mut pending = self.pending.lock().await;
1029 pending.insert(id, tx);
1030 }
1031
1032 let line = serde_json::to_string(&request)
1033 .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
1034 if let Err(e) = self.write_line(&line) {
1035 let mut pending = self.pending.lock().await;
1036 pending.remove(&id);
1037 return Err(e);
1038 }
1039
1040 if self.is_cancelled() {
1041 let mut pending = self.pending.lock().await;
1042 pending.remove(&id);
1043 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1044 }
1045
1046 let response = tokio::select! {
1047 result = rx => match result {
1048 Ok(msg) => msg,
1049 Err(_) => {
1050 return Err(VmError::Runtime(
1052 "Bridge: host closed connection before responding".into(),
1053 ));
1054 }
1055 },
1056 _ = &mut cancel_wait => {
1057 let mut pending = self.pending.lock().await;
1058 pending.remove(&id);
1059 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1060 }
1061 _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
1062 let mut pending = self.pending.lock().await;
1063 pending.remove(&id);
1064 return Err(VmError::Runtime(format!(
1065 "Bridge: host did not respond to '{method}' within {}s",
1066 DEFAULT_TIMEOUT.as_secs()
1067 )));
1068 }
1069 };
1070
1071 if let Some(error) = response.get("error") {
1072 let message = error["message"].as_str().unwrap_or("Unknown host error");
1073 let code = error["code"].as_i64().unwrap_or(-1);
1074 if code == -32001 {
1076 return Err(VmError::CategorizedError {
1077 message: message.to_string(),
1078 category: ErrorCategory::ToolRejected,
1079 });
1080 }
1081 return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
1082 }
1083
1084 Ok(response["result"].clone())
1085 }
1086
1087 pub fn notify(&self, method: &str, params: serde_json::Value) {
1090 let notification = crate::jsonrpc::notification(method, params);
1091 if self.in_process.is_some() {
1092 return;
1093 }
1094 if let Ok(line) = serde_json::to_string(¬ification) {
1095 let _ = self.write_line(&line);
1096 }
1097 }
1098
1099 pub fn is_cancelled(&self) -> bool {
1101 self.cancelled.load(Ordering::SeqCst)
1102 }
1103
1104 pub fn take_resume_signal(&self) -> bool {
1105 self.resume_requested.swap(false, Ordering::SeqCst)
1106 }
1107
1108 pub fn signal_resume(&self) {
1109 self.resume_requested.store(true, Ordering::SeqCst);
1110 }
1111
1112 pub fn set_daemon_idle(&self, idle: bool) {
1113 self.daemon_idle.store(idle, Ordering::SeqCst);
1114 }
1115
1116 pub fn is_daemon_idle(&self) -> bool {
1117 self.daemon_idle.load(Ordering::SeqCst)
1118 }
1119
1120 pub fn set_prompt_stop_reason(&self, reason: &str) {
1125 *self
1126 .prompt_stop_reason
1127 .lock()
1128 .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
1129 }
1130
1131 pub fn take_prompt_stop_reason(&self) -> Option<String> {
1136 self.prompt_stop_reason
1137 .lock()
1138 .unwrap_or_else(|e| e.into_inner())
1139 .take()
1140 }
1141
1142 pub fn take_skills_reload_signal(&self) -> bool {
1147 self.skills_reload_requested.swap(false, Ordering::SeqCst)
1148 }
1149
1150 pub fn signal_skills_reload(&self) {
1154 self.skills_reload_requested.store(true, Ordering::SeqCst);
1155 }
1156
1157 pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
1163 let result = self.call("skills/list", serde_json::json!({})).await?;
1164 match result {
1165 serde_json::Value::Array(items) => Ok(items),
1166 serde_json::Value::Object(map) => match map.get("skills") {
1167 Some(serde_json::Value::Array(items)) => Ok(items.clone()),
1168 _ => Err(VmError::Runtime(
1169 "skills/list: host response must be an array or { skills: [...] }".into(),
1170 )),
1171 },
1172 _ => Err(VmError::Runtime(
1173 "skills/list: unexpected response shape".into(),
1174 )),
1175 }
1176 }
1177
1178 pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
1184 let result = self.call("host/tools/list", serde_json::json!({})).await?;
1185 parse_host_tools_list_response(result)
1186 }
1187
1188 pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
1192 self.call("skills/fetch", serde_json::json!({ "id": id }))
1193 .await
1194 }
1195
1196 pub fn injection_state(&self) -> HostBridgeInjectionState {
1197 self.queued_transcript_injections.clone()
1198 }
1199
1200 pub async fn push_pending_user_message(
1201 &self,
1202 content: String,
1203 transcript_content: serde_json::Value,
1204 mode: &str,
1205 ) -> String {
1206 self.queued_transcript_injections
1207 .push_pending_user_message(content, transcript_content, mode)
1208 .await
1209 }
1210
1211 pub async fn push_queued_user_message(&self, content: String, mode: &str) -> String {
1212 self.push_pending_user_message(content.clone(), serde_json::Value::String(content), mode)
1213 .await
1214 }
1215
1216 pub async fn revoke_pending_user_message(
1217 &self,
1218 message_id: &str,
1219 ) -> PendingUserMessageMutationResult {
1220 self.queued_transcript_injections
1221 .revoke_pending_user_message(message_id)
1222 .await
1223 }
1224
1225 pub async fn revoke_pending_reminder(
1226 &self,
1227 reminder_id: &str,
1228 ) -> PendingReminderMutationResult {
1229 self.queued_transcript_injections
1230 .revoke_pending_reminder(reminder_id)
1231 .await
1232 }
1233
1234 pub async fn pending_injections_json(&self) -> serde_json::Value {
1235 self.queued_transcript_injections
1236 .pending_injections_json()
1237 .await
1238 }
1239
1240 pub async fn replace_pending_user_message(
1241 &self,
1242 message_id: &str,
1243 content: String,
1244 transcript_content: serde_json::Value,
1245 ) -> PendingUserMessageMutationResult {
1246 self.queued_transcript_injections
1247 .replace_pending_user_message(message_id, content, transcript_content)
1248 .await
1249 }
1250
1251 pub async fn push_queued_session_remind_from_params(
1252 &self,
1253 params: &serde_json::Value,
1254 ) -> Result<String, String> {
1255 let reminder = queued_session_remind_from_params(params)?;
1256 let reminder_id = reminder.reminder.id.clone();
1257 self.queued_transcript_injections
1258 .push_session_reminder(reminder)
1259 .await;
1260 Ok(reminder_id)
1261 }
1262
1263 pub async fn take_queued_user_messages(
1264 &self,
1265 include_interrupt_immediate: bool,
1266 include_finish_step: bool,
1267 include_audit_only: bool,
1268 ) -> Vec<QueuedUserMessage> {
1269 let mut state = self.queued_transcript_injections.inner.lock().await;
1270 let mut selected = Vec::new();
1271 let mut retained = VecDeque::new();
1272 while let Some(injection) = state.queue.pop_front() {
1273 let should_take = match injection.mode() {
1274 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1275 QueuedUserMessageMode::FinishStep => include_finish_step,
1276 QueuedUserMessageMode::AuditOnly => include_audit_only,
1277 };
1278 match (should_take, injection) {
1279 (true, QueuedTranscriptInjection::User(message)) => {
1280 state
1281 .delivered_user_message_ids
1282 .insert(message.message_id.clone());
1283 selected.push(message);
1284 }
1285 (_, injection) => retained.push_back(injection),
1286 }
1287 }
1288 state.queue = retained;
1289 selected
1290 }
1291
1292 pub async fn take_queued_transcript_injections(
1293 &self,
1294 include_interrupt_immediate: bool,
1295 include_finish_step: bool,
1296 include_audit_only: bool,
1297 ) -> Vec<QueuedTranscriptInjection> {
1298 let mut state = self.queued_transcript_injections.inner.lock().await;
1299 let mut selected = Vec::new();
1300 let mut retained = VecDeque::new();
1301 while let Some(injection) = state.queue.pop_front() {
1302 let should_take = match injection.mode() {
1303 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1304 QueuedUserMessageMode::FinishStep => include_finish_step,
1305 QueuedUserMessageMode::AuditOnly => include_audit_only,
1306 };
1307 if should_take {
1308 match &injection {
1309 QueuedTranscriptInjection::User(message) => {
1310 state
1311 .delivered_user_message_ids
1312 .insert(message.message_id.clone());
1313 }
1314 QueuedTranscriptInjection::Reminder(reminder) => {
1315 state
1316 .delivered_reminder_ids
1317 .insert(reminder.reminder.id.clone());
1318 }
1319 }
1320 selected.push(injection);
1321 } else {
1322 retained.push_back(injection);
1323 }
1324 }
1325 state.queue = retained;
1326 selected
1327 }
1328
1329 pub async fn take_queued_user_messages_for(
1330 &self,
1331 checkpoint: DeliveryCheckpoint,
1332 ) -> Vec<QueuedUserMessage> {
1333 match checkpoint {
1334 DeliveryCheckpoint::InterruptImmediate => {
1335 self.take_queued_user_messages(true, false, false).await
1336 }
1337 DeliveryCheckpoint::AfterCurrentOperation => {
1338 self.take_queued_user_messages(false, true, false).await
1339 }
1340 DeliveryCheckpoint::EndOfInteraction => {
1341 self.take_queued_user_messages(false, false, true).await
1342 }
1343 }
1344 }
1345
1346 pub async fn take_queued_transcript_injections_for(
1347 &self,
1348 checkpoint: DeliveryCheckpoint,
1349 ) -> Vec<QueuedTranscriptInjection> {
1350 match checkpoint {
1351 DeliveryCheckpoint::InterruptImmediate => {
1352 self.take_queued_transcript_injections(true, false, false)
1353 .await
1354 }
1355 DeliveryCheckpoint::AfterCurrentOperation => {
1356 self.take_queued_transcript_injections(false, true, false)
1357 .await
1358 }
1359 DeliveryCheckpoint::EndOfInteraction => {
1360 self.take_queued_transcript_injections(false, false, true)
1361 .await
1362 }
1363 }
1364 }
1365
1366 pub fn send_output(&self, text: &str) {
1368 self.notify("output", serde_json::json!({"text": text}));
1369 }
1370
1371 pub fn send_progress(
1373 &self,
1374 phase: &str,
1375 message: &str,
1376 progress: Option<i64>,
1377 total: Option<i64>,
1378 data: Option<serde_json::Value>,
1379 ) {
1380 let mut payload = serde_json::json!({"phase": phase, "message": message});
1381 if let Some(p) = progress {
1382 payload["progress"] = serde_json::json!(p);
1383 }
1384 if let Some(t) = total {
1385 payload["total"] = serde_json::json!(t);
1386 }
1387 if let Some(d) = data {
1388 payload["data"] = d;
1389 }
1390 self.notify("progress", payload);
1391 }
1392
1393 pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1395 let mut payload = serde_json::json!({"level": level, "message": message});
1396 if let Some(f) = fields {
1397 payload["fields"] = f;
1398 }
1399 self.notify("log", payload);
1400 }
1401
1402 pub fn send_call_start(
1405 &self,
1406 call_id: &str,
1407 call_type: &str,
1408 name: &str,
1409 metadata: serde_json::Value,
1410 ) {
1411 let session_id = self.get_session_id();
1412 let script = self.get_script_name();
1413 let stream_publicly = metadata
1414 .get("stream_publicly")
1415 .and_then(|value| value.as_bool())
1416 .unwrap_or(true);
1417 self.visible_call_streams
1418 .lock()
1419 .unwrap_or_else(|e| e.into_inner())
1420 .insert(call_id.to_string(), stream_publicly);
1421 self.notify(
1422 "session/update",
1423 serde_json::json!({
1424 "sessionId": session_id,
1425 "update": {
1426 "sessionUpdate": "call_start",
1427 "content": {
1428 "toolCallId": call_id,
1429 "call_type": call_type,
1430 "name": name,
1431 "script": script,
1432 "metadata": metadata,
1433 },
1434 },
1435 }),
1436 );
1437 }
1438
1439 pub fn send_call_progress(
1442 &self,
1443 call_id: &str,
1444 delta: &str,
1445 accumulated_tokens: u64,
1446 user_visible: bool,
1447 ) {
1448 let session_id = self.get_session_id();
1449 let (visible_text, visible_delta) = {
1450 let stream_publicly = self
1451 .visible_call_streams
1452 .lock()
1453 .unwrap_or_else(|e| e.into_inner())
1454 .get(call_id)
1455 .copied()
1456 .unwrap_or(true);
1457 let mut states = self
1458 .visible_call_states
1459 .lock()
1460 .unwrap_or_else(|e| e.into_inner());
1461 let state = states.entry(call_id.to_string()).or_default();
1462 state.push(delta, stream_publicly)
1463 };
1464 self.notify(
1465 "session/update",
1466 serde_json::json!({
1467 "sessionId": session_id,
1468 "update": {
1469 "sessionUpdate": "call_progress",
1470 "content": {
1471 "toolCallId": call_id,
1472 "delta": delta,
1473 "accumulated_tokens": accumulated_tokens,
1474 "visible_text": visible_text,
1475 "visible_delta": visible_delta,
1476 "user_visible": user_visible,
1477 },
1478 },
1479 }),
1480 );
1481 }
1482
1483 pub fn send_call_end(
1485 &self,
1486 call_id: &str,
1487 call_type: &str,
1488 name: &str,
1489 duration_ms: u64,
1490 status: &str,
1491 metadata: serde_json::Value,
1492 ) {
1493 let session_id = self.get_session_id();
1494 let script = self.get_script_name();
1495 self.visible_call_states
1496 .lock()
1497 .unwrap_or_else(|e| e.into_inner())
1498 .remove(call_id);
1499 self.visible_call_streams
1500 .lock()
1501 .unwrap_or_else(|e| e.into_inner())
1502 .remove(call_id);
1503 self.notify(
1504 "session/update",
1505 serde_json::json!({
1506 "sessionId": session_id,
1507 "update": {
1508 "sessionUpdate": "call_end",
1509 "content": {
1510 "toolCallId": call_id,
1511 "call_type": call_type,
1512 "name": name,
1513 "script": script,
1514 "duration_ms": duration_ms,
1515 "status": status,
1516 "metadata": metadata,
1517 },
1518 },
1519 }),
1520 );
1521 }
1522
1523 pub fn send_worker_update(
1525 &self,
1526 worker_id: &str,
1527 worker_name: &str,
1528 status: &str,
1529 metadata: serde_json::Value,
1530 audit: Option<&MutationSessionRecord>,
1531 ) {
1532 let session_id = self.get_session_id();
1533 let script = self.get_script_name();
1534 let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1535 let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1536 let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1537 let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1538 let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1539 let lifecycle = serde_json::json!({
1540 "event": status,
1541 "worker_id": worker_id,
1542 "worker_name": worker_name,
1543 "started_at": started_at,
1544 "finished_at": finished_at,
1545 });
1546 self.notify(
1547 "session/update",
1548 serde_json::json!({
1549 "sessionId": session_id,
1550 "update": {
1551 "sessionUpdate": "worker_update",
1552 "content": {
1553 "worker_id": worker_id,
1554 "worker_name": worker_name,
1555 "status": status,
1556 "script": script,
1557 "started_at": started_at,
1558 "finished_at": finished_at,
1559 "snapshot_path": snapshot_path,
1560 "run_id": run_id,
1561 "run_path": run_path,
1562 "lifecycle": lifecycle,
1563 "audit": audit,
1564 "metadata": metadata,
1565 },
1566 },
1567 }),
1568 );
1569 }
1570}
1571
1572pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1574 crate::stdlib::json_to_vm_value(val)
1575}
1576
1577fn parse_host_tools_list_response(
1578 result: serde_json::Value,
1579) -> Result<Vec<serde_json::Value>, VmError> {
1580 let tools = match result {
1581 serde_json::Value::Array(items) => items,
1582 serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1583 map.get("result")
1584 .and_then(|value| value.get("tools"))
1585 .cloned()
1586 }) {
1587 Some(serde_json::Value::Array(items)) => items,
1588 _ => {
1589 return Err(VmError::Runtime(
1590 "host/tools/list: host response must be an array or { tools: [...] }".into(),
1591 ));
1592 }
1593 },
1594 _ => {
1595 return Err(VmError::Runtime(
1596 "host/tools/list: unexpected response shape".into(),
1597 ));
1598 }
1599 };
1600
1601 let mut normalized = Vec::with_capacity(tools.len());
1602 for tool in tools {
1603 let serde_json::Value::Object(map) = tool else {
1604 return Err(VmError::Runtime(
1605 "host/tools/list: every tool must be an object".into(),
1606 ));
1607 };
1608 let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1609 return Err(VmError::Runtime(
1610 "host/tools/list: every tool must include a string `name`".into(),
1611 ));
1612 };
1613 let description = map
1614 .get("description")
1615 .and_then(|value| value.as_str())
1616 .or_else(|| {
1617 map.get("short_description")
1618 .and_then(|value| value.as_str())
1619 })
1620 .unwrap_or_default();
1621 let schema = map
1622 .get("schema")
1623 .cloned()
1624 .or_else(|| map.get("parameters").cloned())
1625 .or_else(|| map.get("input_schema").cloned())
1626 .unwrap_or(serde_json::Value::Null);
1627 let deprecated = map
1628 .get("deprecated")
1629 .and_then(|value| value.as_bool())
1630 .unwrap_or(false);
1631 normalized.push(serde_json::json!({
1632 "name": name,
1633 "description": description,
1634 "schema": schema,
1635 "deprecated": deprecated,
1636 }));
1637 }
1638 Ok(normalized)
1639}
1640
#[cfg(test)]
mod tests {
    //! Unit tests for the host bridge: JSON-RPC framing, cancellation of
    //! pending calls, queued injection delivery, `session/remind` validation,
    //! JSON-to-VM conversion and `host/tools/list` normalization.

    use super::*;

    /// Drives `future` to completion on a fresh current-thread Tokio runtime.
    ///
    /// Shared by every async test so the runtime setup lives in one place.
    fn run_async<F: std::future::Future>(future: F) -> F::Output {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to build current-thread tokio runtime");
        runtime.block_on(future)
    }

    /// Builds a bridge with empty pending-call state and a cleared cancel flag.
    fn test_bridge() -> HostBridge {
        HostBridge::from_parts(
            Arc::new(Mutex::new(HashMap::new())),
            Arc::new(AtomicBool::new(false)),
            Arc::new(std::sync::Mutex::new(())),
            1,
        )
    }

    /// Builds a second bridge with a no-op writer that reuses `owner`'s injection state.
    fn test_bridge_sharing_injection_state(owner: &HostBridge) -> HostBridge {
        HostBridge::from_parts_with_writer_cancel_notify_and_injection_state(
            Arc::new(Mutex::new(HashMap::new())),
            Arc::new(AtomicBool::new(false)),
            Arc::new(Notify::new()),
            Arc::new(|_| Ok(())),
            100,
            Some(owner.injection_state()),
        )
    }

    #[test]
    fn test_json_rpc_request_format() {
        let request = crate::jsonrpc::request(
            1,
            "llm_call",
            serde_json::json!({
                "prompt": "Hello",
                "system": "Be helpful",
            }),
        );
        let s = serde_json::to_string(&request).unwrap();
        assert!(s.contains("\"jsonrpc\":\"2.0\""));
        assert!(s.contains("\"id\":1"));
        assert!(s.contains("\"method\":\"llm_call\""));
    }

    #[test]
    fn test_json_rpc_notification_format() {
        let notification =
            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
        let s = serde_json::to_string(&notification).unwrap();
        assert!(s.contains("\"method\":\"output\""));
        // Notifications must not carry an id.
        assert!(!s.contains("\"id\""));
    }

    #[test]
    fn test_json_rpc_error_response_parsing() {
        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
        assert!(response.get("error").is_some());
        assert_eq!(
            response["error"]["message"].as_str().unwrap(),
            "Invalid request"
        );
    }

    #[test]
    fn test_json_rpc_success_response_parsing() {
        let response = crate::jsonrpc::response(
            1,
            serde_json::json!({
                "text": "Hello world",
                "input_tokens": 10,
                "output_tokens": 5,
            }),
        );
        assert!(response.get("result").is_some());
        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
    }

    #[test]
    fn test_cancelled_flag() {
        let cancelled = Arc::new(AtomicBool::new(false));
        assert!(!cancelled.load(Ordering::SeqCst));
        cancelled.store(true, Ordering::SeqCst);
        assert!(cancelled.load(Ordering::SeqCst));
    }

    #[test]
    fn pending_host_calls_return_when_cancellation_arrives() {
        run_async(async {
            let pending = Arc::new(Mutex::new(HashMap::new()));
            let cancelled = Arc::new(AtomicBool::new(false));
            let bridge = HostBridge::from_parts_with_writer(
                pending.clone(),
                cancelled.clone(),
                Arc::new(|_| Ok(())),
                1,
            );

            let call = bridge.call("host/work", serde_json::json!({}));
            tokio::pin!(call);

            // Poll the call until it has registered itself as pending.
            loop {
                tokio::select! {
                    result = &mut call => panic!("call completed before cancellation: {result:?}"),
                    _ = tokio::task::yield_now() => {}
                }
                if !pending.lock().await.is_empty() {
                    break;
                }
            }

            cancelled.store(true, Ordering::SeqCst);
            bridge.cancel_notify.notify_waiters();

            let result = tokio::time::timeout(Duration::from_secs(1), call)
                .await
                .expect("pending call should observe cancellation promptly");
            assert!(
                matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
            );
            assert!(pending.lock().await.is_empty());
        });
    }

    #[test]
    fn queued_messages_are_filtered_by_delivery_mode() {
        run_async(async {
            let bridge = test_bridge();
            bridge
                .push_queued_user_message("first".to_string(), "finish_step")
                .await;
            bridge
                .push_queued_user_message("second".to_string(), "audit_only")
                .await;

            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
            assert_eq!(finish_step.len(), 1);
            assert_eq!(finish_step[0].content, "first");

            let audit_only = bridge.take_queued_user_messages(false, false, true).await;
            assert_eq!(audit_only.len(), 1);
            assert_eq!(audit_only[0].content, "second");
        });
    }

    #[test]
    fn pending_user_messages_support_revoke_replace_and_delivery_states() {
        run_async(async {
            let bridge = test_bridge();
            let first_id = bridge
                .push_pending_user_message(
                    "first".to_string(),
                    serde_json::json!("first"),
                    "audit_only",
                )
                .await;
            let second_id = bridge
                .push_pending_user_message(
                    "second".to_string(),
                    serde_json::json!("second"),
                    "audit_only",
                )
                .await;

            assert_eq!(
                bridge
                    .replace_pending_user_message(
                        &second_id,
                        "second edited".to_string(),
                        serde_json::json!("second edited"),
                    )
                    .await,
                PendingUserMessageMutationResult::Mutated
            );
            assert_eq!(
                bridge.revoke_pending_user_message(&first_id).await,
                PendingUserMessageMutationResult::Mutated
            );
            assert_eq!(
                bridge.revoke_pending_user_message(&first_id).await,
                PendingUserMessageMutationResult::AlreadyRevoked
            );

            let delivered = bridge
                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
                .await;
            assert_eq!(delivered.len(), 1);
            assert_eq!(delivered[0].message_id, second_id);
            assert_eq!(delivered[0].content, "second edited");

            assert_eq!(
                bridge.revoke_pending_user_message(&second_id).await,
                PendingUserMessageMutationResult::AlreadyDelivered
            );
            assert_eq!(
                bridge
                    .replace_pending_user_message(
                        &second_id,
                        "too late".to_string(),
                        serde_json::json!("too late"),
                    )
                    .await,
                PendingUserMessageMutationResult::AlreadyDelivered
            );
            assert_eq!(
                bridge.revoke_pending_user_message("missing").await,
                PendingUserMessageMutationResult::UnknownMessageId
            );
        });
    }

    #[test]
    fn pending_user_message_replace_preserves_fifo_position_and_mode() {
        run_async(async {
            let bridge = test_bridge();
            let first_id = bridge
                .push_pending_user_message(
                    "first".to_string(),
                    serde_json::json!("first"),
                    "finish_step",
                )
                .await;
            let second_id = bridge
                .push_pending_user_message(
                    "second".to_string(),
                    serde_json::json!("second"),
                    "finish_step",
                )
                .await;
            assert_eq!(
                bridge
                    .replace_pending_user_message(
                        &first_id,
                        "first edited".to_string(),
                        serde_json::json!("first edited"),
                    )
                    .await,
                PendingUserMessageMutationResult::Mutated
            );

            let delivered = bridge
                .take_queued_user_messages_for(DeliveryCheckpoint::AfterCurrentOperation)
                .await;
            assert_eq!(
                delivered
                    .iter()
                    .map(|message| (&message.message_id, message.content.as_str(), message.mode))
                    .collect::<Vec<_>>(),
                vec![
                    (&first_id, "first edited", QueuedUserMessageMode::FinishStep,),
                    (&second_id, "second", QueuedUserMessageMode::FinishStep),
                ]
            );
        });
    }

    #[test]
    fn pending_user_message_state_survives_bridge_replacement() {
        run_async(async {
            let bridge = test_bridge();
            let revoked_id = bridge
                .push_pending_user_message(
                    "revoke me".to_string(),
                    serde_json::json!("revoke me"),
                    "audit_only",
                )
                .await;
            let delivered_id = bridge
                .push_pending_user_message(
                    "deliver me".to_string(),
                    serde_json::json!("deliver me"),
                    "audit_only",
                )
                .await;
            assert_eq!(
                bridge.revoke_pending_user_message(&revoked_id).await,
                PendingUserMessageMutationResult::Mutated
            );
            bridge.cancelled.store(true, Ordering::SeqCst);

            let replacement_bridge = test_bridge_sharing_injection_state(&bridge);
            assert_eq!(
                replacement_bridge
                    .revoke_pending_user_message(&revoked_id)
                    .await,
                PendingUserMessageMutationResult::AlreadyRevoked
            );
            let delivered = replacement_bridge
                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
                .await;
            assert_eq!(delivered.len(), 1);
            assert_eq!(delivered[0].message_id, delivered_id);
            assert_eq!(delivered[0].content, "deliver me");
            assert_eq!(
                bridge.revoke_pending_user_message(&delivered_id).await,
                PendingUserMessageMutationResult::AlreadyDelivered
            );
        });
    }

    #[test]
    fn queued_transcript_injections_preserve_user_reminder_separation() {
        run_async(async {
            let bridge = test_bridge();
            bridge
                .push_queued_user_message("human follow-up".to_string(), "finish_step")
                .await;
            let reminder_id = bridge
                .push_queued_session_remind_from_params(&serde_json::json!({
                    "body": "Host-provided ambient context.",
                    "tags": ["host"],
                    "dedupe_key": "host-context",
                    "ttl_turns": 2,
                    "mode": "audit_only",
                    "_meta": {"harn": {"source": "test"}},
                }))
                .await
                .expect("valid reminder");

            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
            assert_eq!(finish_step.len(), 1);
            assert_eq!(finish_step[0].content, "human follow-up");

            let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
            assert!(no_user_messages.is_empty());

            let injections = bridge
                .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
                .await;
            assert_eq!(injections.len(), 1);
            let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
                panic!("expected queued reminder");
            };
            assert_eq!(reminder.reminder.id, reminder_id);
            assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
            assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
            assert_eq!(
                reminder.reminder.dedupe_key.as_deref(),
                Some("host-context")
            );
            assert_eq!(reminder.reminder.ttl_turns, Some(2));
            assert_eq!(
                reminder.reminder.source,
                crate::llm::helpers::ReminderSource::Bridge
            );
        });
    }

    #[test]
    fn pending_injections_list_user_messages_and_reminders_in_fifo_order() {
        run_async(async {
            let bridge = test_bridge();
            let message_id = bridge
                .push_pending_user_message(
                    "human follow-up".to_string(),
                    serde_json::json!([{"type": "text", "text": "human follow-up"}]),
                    "finish_step",
                )
                .await;
            let reminder_id = bridge
                .push_queued_session_remind_from_params(&serde_json::json!({
                    "id": "rem-test",
                    "body": "Host reminder",
                    "tags": ["host"],
                    "dedupe_key": "host-reminder",
                    "ttl_turns": 2,
                    "mode": "interrupt_immediate",
                }))
                .await
                .expect("valid session/remind payload");

            let pending = bridge.pending_injections_json().await;
            assert_eq!(pending["pendingCount"], 2);
            assert_eq!(pending["injections"][0]["kind"], "user");
            assert_eq!(pending["injections"][0]["id"], message_id);
            assert_eq!(pending["injections"][0]["messageId"], message_id);
            assert_eq!(pending["injections"][0]["mode"], "finish_step");
            assert_eq!(pending["injections"][0]["position"], 0);
            assert_eq!(pending["injections"][1]["kind"], "reminder");
            assert_eq!(pending["injections"][1]["id"], reminder_id);
            assert_eq!(pending["injections"][1]["reminderId"], "rem-test");
            assert_eq!(pending["injections"][1]["mode"], "interrupt_immediate");
            assert_eq!(pending["injections"][1]["body"], "Host reminder");
            assert_eq!(pending["injections"][1]["dedupeKey"], "host-reminder");
            assert_eq!(pending["injections"][1]["ttlTurns"], 2);
            assert_eq!(pending["injections"][1]["position"], 1);
        });
    }

    #[test]
    fn pending_reminders_support_revoke_and_delivery_states() {
        run_async(async {
            let bridge = test_bridge();
            let revoked_id = bridge
                .push_queued_session_remind_from_params(&serde_json::json!({
                    "id": "rem-revoke",
                    "body": "remove me",
                    "mode": "finish_step",
                }))
                .await
                .expect("valid session/remind payload");
            let delivered_id = bridge
                .push_queued_session_remind_from_params(&serde_json::json!({
                    "id": "rem-deliver",
                    "body": "deliver me",
                    "mode": "finish_step",
                }))
                .await
                .expect("valid session/remind payload");

            assert_eq!(
                bridge.revoke_pending_reminder(&revoked_id).await,
                PendingReminderMutationResult::Mutated
            );
            assert_eq!(
                bridge.revoke_pending_reminder(&revoked_id).await,
                PendingReminderMutationResult::AlreadyRevoked
            );

            let pending = bridge.pending_injections_json().await;
            assert_eq!(pending["pendingCount"], 1);
            assert_eq!(pending["injections"][0]["reminderId"], delivered_id);

            let delivered = bridge
                .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
                .await;
            assert_eq!(delivered.len(), 1);
            let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
                panic!("expected delivered reminder");
            };
            assert_eq!(reminder.reminder.id, delivered_id);

            assert_eq!(
                bridge.revoke_pending_reminder(&delivered_id).await,
                PendingReminderMutationResult::AlreadyDelivered
            );
            assert_eq!(
                bridge.revoke_pending_reminder("missing").await,
                PendingReminderMutationResult::UnknownReminderId
            );
        });
    }

    #[test]
    fn bridge_remind_modes_honor_delivery_checkpoints() {
        run_async(async {
            // (mode, checkpoint that must deliver it, checkpoint that must not)
            let cases = [
                (
                    "interrupt_immediate",
                    DeliveryCheckpoint::InterruptImmediate,
                    DeliveryCheckpoint::AfterCurrentOperation,
                ),
                (
                    "finish_step",
                    DeliveryCheckpoint::AfterCurrentOperation,
                    DeliveryCheckpoint::EndOfInteraction,
                ),
                (
                    "audit_only",
                    DeliveryCheckpoint::EndOfInteraction,
                    DeliveryCheckpoint::InterruptImmediate,
                ),
            ];

            for (mode, expected_checkpoint, wrong_checkpoint) in cases {
                let bridge = test_bridge();
                bridge
                    .push_queued_session_remind_from_params(&serde_json::json!({
                        "body": format!("Reminder for {mode}"),
                        "mode": mode,
                    }))
                    .await
                    .expect("valid session/remind payload");

                let premature = bridge
                    .take_queued_transcript_injections_for(wrong_checkpoint)
                    .await;
                assert!(
                    premature.is_empty(),
                    "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
                );

                let delivered = bridge
                    .take_queued_transcript_injections_for(expected_checkpoint)
                    .await;
                assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
                let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
                    panic!("expected reminder for {mode}");
                };
                assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
            }
        });
    }

    #[test]
    fn session_remind_validation_rejects_user_message_shape() {
        let err = queued_session_remind_from_params(&serde_json::json!({
            "content": "this is still a user message",
            "mode": "interrupt_immediate",
        }))
        .expect_err("session/remind must require a reminder body");
        assert!(err.contains(Code::ReminderInvalidShape.as_str()));
        assert!(err.contains("body"));
    }

    #[test]
    fn session_remind_validation_rejects_unknown_options_separately() {
        let err = queued_session_remind_from_params(&serde_json::json!({
            "body": "valid body",
            "unknown_host_field": true,
        }))
        .expect_err("session/remind must reject unknown top-level fields");
        assert!(err.contains(Code::ReminderUnknownOption.as_str()));
        assert!(err.contains("unknown_host_field"));
    }

    #[test]
    fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
        let err = queued_session_remind_from_params(&serde_json::json!({
            "body": "valid body",
            "propagate": "workspace",
        }))
        .expect_err("session/remind must reject unknown propagate values");
        assert!(err.contains(Code::ReminderUnknownPropagate.as_str()));
        assert!(err.contains("propagate"));
    }

    #[test]
    fn test_json_result_to_vm_value_string() {
        let val = serde_json::json!("hello");
        let vm_val = json_result_to_vm_value(&val);
        assert_eq!(vm_val.display(), "hello");
    }

    #[test]
    fn test_json_result_to_vm_value_dict() {
        let val = serde_json::json!({"name": "test", "count": 42});
        let vm_val = json_result_to_vm_value(&val);
        let VmValue::Dict(d) = &vm_val else {
            unreachable!("Expected Dict, got {:?}", vm_val);
        };
        assert_eq!(d.get("name").unwrap().display(), "test");
        assert_eq!(d.get("count").unwrap().display(), "42");
    }

    #[test]
    fn test_json_result_to_vm_value_null() {
        let val = serde_json::json!(null);
        let vm_val = json_result_to_vm_value(&val);
        assert!(matches!(vm_val, VmValue::Nil));
    }

    #[test]
    fn test_json_result_to_vm_value_nested() {
        let val = serde_json::json!({
            "text": "response",
            "tool_calls": [
                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
            ],
            "input_tokens": 100,
            "output_tokens": 50,
        });
        let vm_val = json_result_to_vm_value(&val);
        let VmValue::Dict(d) = &vm_val else {
            unreachable!("Expected Dict, got {:?}", vm_val);
        };
        assert_eq!(d.get("text").unwrap().display(), "response");
        let VmValue::List(list) = d.get("tool_calls").unwrap() else {
            unreachable!("Expected List for tool_calls");
        };
        assert_eq!(list.len(), 1);
    }

    #[test]
    fn parse_host_tools_list_accepts_object_wrapper() {
        let tools = parse_host_tools_list_response(serde_json::json!({
            "tools": [
                {
                    "name": "Read",
                    "description": "Read a file",
                    "schema": {"type": "object"},
                }
            ]
        }))
        .expect("tool list");

        assert_eq!(tools.len(), 1);
        assert_eq!(tools[0]["name"], "Read");
        assert_eq!(tools[0]["deprecated"], false);
    }

    #[test]
    fn parse_host_tools_list_accepts_compat_fields() {
        let tools = parse_host_tools_list_response(serde_json::json!({
            "result": {
                "tools": [
                    {
                        "name": "Edit",
                        "short_description": "Apply an edit",
                        "input_schema": {"type": "object"},
                        "deprecated": true,
                    }
                ]
            }
        }))
        .expect("tool list");

        assert_eq!(tools[0]["description"], "Apply an edit");
        assert_eq!(tools[0]["schema"]["type"], "object");
        assert_eq!(tools[0]["deprecated"], true);
    }

    #[test]
    fn parse_host_tools_list_requires_tool_names() {
        let err = parse_host_tools_list_response(serde_json::json!({
            "tools": [
                {"description": "missing name"}
            ]
        }))
        .expect_err("expected error");
        assert!(err
            .to_string()
            .contains("host/tools/list: every tool must include a string `name`"));
    }

    #[test]
    fn test_timeout_duration() {
        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
    }
}