1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::future::Future;
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
13use std::sync::Arc;
14use std::time::Duration;
15
16use tokio::io::AsyncBufReadExt;
17use tokio::sync::{oneshot, Mutex, Notify};
18
19use harn_parser::diagnostic_codes::Code;
20
21use crate::orchestration::MutationSessionRecord;
22use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
23use crate::visible_text::VisibleTextState;
24use crate::vm::Vm;
25
/// Longest time a bridge request waits for the host to respond (five minutes).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5 * 60);
28
/// Shareable sink that delivers one serialized message line to the host.
///
/// Returns `Err` with a human-readable description when the line could not
/// be written.
pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;

/// Builds a [`HostBridgeWriter`] that prints each line to the process stdout.
///
/// Each invocation holds `stdout_lock` while writing, so writers that share
/// the same lock never interleave their output. A poisoned lock is recovered
/// instead of being reported as an error. The line is followed by a single
/// `\n` and stdout is flushed before the call returns.
fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
    Arc::new(move |line: &str| {
        let _serialized = stdout_lock
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        let mut out = std::io::stdout().lock();
        let written = out
            .write_all(line.as_bytes())
            .and_then(|()| out.write_all(b"\n"));
        match written {
            Ok(()) => out
                .flush()
                .map_err(|err| format!("Bridge flush error: {err}")),
            Err(err) => Err(format!("Bridge write error: {err}")),
        }
    })
}
47
/// Bridge between the VM and its host.
///
/// Requests are serialized as one JSON line each and handed to `writer`;
/// responses are matched back to callers through `pending`. When
/// `in_process` is set, `call` bypasses the wire entirely and dispatches to a
/// loaded module instead.
pub struct HostBridge {
    /// Source of ids for outbound requests; incremented by `call`.
    next_id: AtomicU64,
    /// Response channels keyed by request id, one per in-flight `call`.
    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
    /// Set once the host sends a `cancel` notification.
    cancelled: Arc<AtomicBool>,
    /// Wakes in-flight calls when a `cancel` notification arrives.
    cancel_notify: Arc<Notify>,
    /// Sink that receives each serialized outbound line.
    writer: HostBridgeWriter,
    /// Session id recorded through `set_session_id`.
    session_id: std::sync::Mutex<String>,
    /// Script name recorded through `set_script_name`.
    script_name: std::sync::Mutex<String>,
    /// Queue of user messages and reminders waiting to be injected.
    queued_transcript_injections: HostBridgeInjectionState,
    /// Latched by `agent/resume` or `signal_resume`; cleared by `take_resume_signal`.
    resume_requested: Arc<AtomicBool>,
    /// Latched by `skills/update` or `signal_skills_reload`; cleared by
    /// `take_skills_reload_signal`.
    skills_reload_requested: Arc<AtomicBool>,
    /// Flag toggled through `set_daemon_idle` and read by `is_daemon_idle`.
    daemon_idle: Arc<AtomicBool>,
    /// Stop reason stored by `set_prompt_stop_reason` until it is taken.
    prompt_stop_reason: std::sync::Mutex<Option<String>>,
    /// Per-key visible-text state.
    /// NOTE(review): not used in the visible part of this file — presumably
    /// keyed by call id; confirm against the users of this field.
    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
    /// Per-key boolean flag.
    /// NOTE(review): not used in the visible part of this file — presumably
    /// keyed by call id; confirm against the users of this field.
    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
    /// In-process host backend; `Some` only when built via `from_harn_module`.
    in_process: Option<InProcessHost>,
}
92
/// Host backend that answers bridge calls by invoking functions exported from
/// a loaded module instead of talking to an external process.
struct InProcessHost {
    /// Path of the module the exports came from; used in error messages.
    module_path: PathBuf,
    /// Exported closures keyed by function name.
    exported_functions: BTreeMap<String, Arc<VmClosure>>,
    /// VM from which a child VM is derived for every invocation.
    vm: Vm,
}
98
99impl InProcessHost {
100 fn dispatch<'a>(
106 &'a self,
107 method: &'a str,
108 params: serde_json::Value,
109 ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, VmError>> + Send + 'a>> {
110 Box::pin(async move {
111 match method {
112 "builtin_call" => {
113 let name = params
114 .get("name")
115 .and_then(|value| value.as_str())
116 .unwrap_or_default();
117 let args = params
118 .get("args")
119 .and_then(|value| value.as_array())
120 .cloned()
121 .unwrap_or_default()
122 .into_iter()
123 .map(|value| json_result_to_vm_value(&value))
124 .collect::<Vec<_>>();
125 self.invoke_export(name, &args).await
126 }
127 "host/tools/list" => self
128 .invoke_optional_export("host_tools_list", &[])
129 .await
130 .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
131 "session/request_permission" => self.request_permission(params).await,
132 other => Err(VmError::Runtime(format!(
133 "playground host backend does not implement bridge method '{other}'"
134 ))),
135 }
136 })
137 }
138
139 async fn invoke_export(
140 &self,
141 name: &str,
142 args: &[VmValue],
143 ) -> Result<serde_json::Value, VmError> {
144 let Some(closure) = self.exported_functions.get(name) else {
145 return Err(VmError::Runtime(format!(
146 "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
147 self.module_path.display()
148 )));
149 };
150
151 let mut vm = self.vm.child_vm_for_host();
152 let result = vm.call_closure_pub(closure, args).await?;
153 Ok(crate::llm::vm_value_to_json(&result))
154 }
155
156 async fn invoke_optional_export(
157 &self,
158 name: &str,
159 args: &[VmValue],
160 ) -> Result<Option<serde_json::Value>, VmError> {
161 if !self.exported_functions.contains_key(name) {
162 return Ok(None);
163 }
164 self.invoke_export(name, args).await.map(Some)
165 }
166
167 async fn request_permission(
168 &self,
169 params: serde_json::Value,
170 ) -> Result<serde_json::Value, VmError> {
171 let Some(closure) = self.exported_functions.get("request_permission") else {
174 return Ok(crate::llm::acp_permission::allow_response());
175 };
176
177 let tool_call = params.get("toolCall");
178 let tool_name = tool_call
179 .and_then(|tool_call| tool_call.pointer("/_meta/harn/toolName"))
180 .or_else(|| tool_call.and_then(|tool_call| tool_call.get("toolName")))
181 .or_else(|| tool_call.and_then(|tool_call| tool_call.get("title")))
182 .and_then(|value| value.as_str())
183 .unwrap_or_default();
184 let tool_args = tool_call
185 .and_then(|tool_call| tool_call.get("rawInput"))
186 .map(json_result_to_vm_value)
187 .unwrap_or(VmValue::Nil);
188 let full_payload = json_result_to_vm_value(¶ms);
189
190 let arg_count = closure.func.params.len();
191 let args = if arg_count >= 3 {
192 vec![
193 VmValue::String(arcstr::ArcStr::from(tool_name.to_string())),
194 tool_args,
195 full_payload,
196 ]
197 } else if arg_count == 2 {
198 vec![
199 VmValue::String(arcstr::ArcStr::from(tool_name.to_string())),
200 tool_args,
201 ]
202 } else if arg_count == 1 {
203 vec![full_payload]
204 } else {
205 Vec::new()
206 };
207
208 let mut vm = self.vm.child_vm_for_host();
209 let result = vm.call_closure_pub(closure, &args).await?;
210 let payload = match result {
215 VmValue::Bool(granted) => {
216 if granted {
217 crate::llm::acp_permission::allow_response()
218 } else {
219 crate::llm::acp_permission::reject_response(None)
220 }
221 }
222 VmValue::String(reason) if !reason.is_empty() => {
223 crate::llm::acp_permission::reject_response(Some(reason.to_string()))
224 }
225 other => {
226 let json = crate::llm::vm_value_to_json(&other);
227 if let Some(granted) = json.get("granted").and_then(|value| value.as_bool()) {
228 if granted {
229 crate::llm::acp_permission::allow_response()
230 } else {
231 crate::llm::acp_permission::reject_response(
232 json.get("reason")
233 .and_then(|value| value.as_str())
234 .map(str::to_string),
235 )
236 }
237 } else if json.get("outcome").is_some() {
238 json
240 } else if other.is_truthy() {
241 crate::llm::acp_permission::allow_response()
242 } else {
243 crate::llm::acp_permission::reject_response(None)
244 }
245 }
246 };
247 Ok(payload)
248 }
249}
250
/// When a queued injection should be handed to the running interaction.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
    InterruptImmediate,
    FinishStep,
    AuditOnly,
}

/// Points in an interaction at which something can be delivered.
/// NOTE(review): not referenced in the visible part of this file; the
/// variant semantics are inferred from their names only — confirm.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
    InterruptImmediate,
    AfterCurrentOperation,
    EndOfInteraction,
}

impl QueuedUserMessageMode {
    /// Parses a wire-level mode name.
    ///
    /// `interrupt_immediate` / `interrupt` select [`Self::InterruptImmediate`];
    /// `finish_step` / `after_current_operation` / `steer` select
    /// [`Self::FinishStep`]. Every other input, including `queue` and
    /// unrecognised names, maps to [`Self::AuditOnly`].
    fn from_str(value: &str) -> Self {
        const INTERRUPT_NAMES: [&str; 2] = ["interrupt_immediate", "interrupt"];
        const FINISH_STEP_NAMES: [&str; 3] = ["finish_step", "after_current_operation", "steer"];

        if INTERRUPT_NAMES.contains(&value) {
            Self::InterruptImmediate
        } else if FINISH_STEP_NAMES.contains(&value) {
            Self::FinishStep
        } else {
            Self::AuditOnly
        }
    }

    /// Canonical wire-level name of this mode.
    fn as_str(self) -> &'static str {
        match self {
            Self::AuditOnly => "audit_only",
            Self::FinishStep => "finish_step",
            Self::InterruptImmediate => "interrupt_immediate",
        }
    }
}
299
/// A user message waiting in the injection queue.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedUserMessage {
    /// Identifier handed back to the caller when the message was queued.
    pub message_id: String,
    /// Message content as supplied by the caller.
    pub content: String,
    /// JSON form of the content; reported as `content` in the pending listing.
    pub transcript_content: serde_json::Value,
    /// Delivery mode requested for this message.
    pub mode: QueuedUserMessageMode,
}
307
/// A system reminder waiting in the injection queue.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedReminder {
    /// The reminder to inject.
    pub reminder: crate::llm::helpers::SystemReminder,
    /// Delivery mode requested for this reminder.
    pub mode: QueuedUserMessageMode,
}
313
/// One entry of the injection queue: either a user message or a reminder.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum QueuedTranscriptInjection {
    /// A queued user message.
    User(QueuedUserMessage),
    /// A queued system reminder.
    Reminder(QueuedReminder),
}
319
/// Mutable state behind [`HostBridgeInjectionState`].
#[derive(Debug, Default)]
struct QueuedTranscriptInjections {
    /// Injections still waiting, in arrival order.
    queue: VecDeque<QueuedTranscriptInjection>,
    /// Ids of user messages that were removed from the queue by a revoke.
    revoked_user_message_ids: HashSet<String>,
    /// Ids of user messages reported as already delivered.
    /// NOTE(review): populated outside the visible part of this file — confirm.
    delivered_user_message_ids: HashSet<String>,
    /// Ids of reminders that were removed from the queue by a revoke.
    revoked_reminder_ids: HashSet<String>,
    /// Ids of reminders reported as already delivered.
    /// NOTE(review): populated outside the visible part of this file — confirm.
    delivered_reminder_ids: HashSet<String>,
}
328
/// Cloneable handle to the injection queue; clones share the same state.
#[derive(Clone, Debug, Default)]
pub struct HostBridgeInjectionState {
    /// Shared queue state guarded by an async mutex.
    inner: Arc<Mutex<QueuedTranscriptInjections>>,
}
333
/// Outcome of revoking or replacing a queued user message.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PendingUserMessageMutationResult {
    /// The message was still queued and has been changed or removed.
    Mutated,
    /// The message is no longer queued because it was revoked earlier.
    AlreadyRevoked,
    /// The message is no longer queued because it was already delivered.
    AlreadyDelivered,
    /// No queued, revoked, or delivered message has this id.
    UnknownMessageId,
}
341
342impl QueuedTranscriptInjection {
343 fn mode(&self) -> QueuedUserMessageMode {
344 match self {
345 Self::User(message) => message.mode,
346 Self::Reminder(reminder) => reminder.mode,
347 }
348 }
349
350 fn pending_json(&self, position: usize) -> serde_json::Value {
351 match self {
352 Self::User(message) => serde_json::json!({
353 "kind": "user",
354 "id": message.message_id,
355 "messageId": message.message_id,
356 "mode": message.mode.as_str(),
357 "position": position,
358 "content": message.transcript_content,
359 }),
360 Self::Reminder(reminder) => serde_json::json!({
361 "kind": "reminder",
362 "id": reminder.reminder.id,
363 "reminderId": reminder.reminder.id,
364 "mode": reminder.mode.as_str(),
365 "position": position,
366 "body": reminder.reminder.body,
367 "tags": reminder.reminder.tags,
368 "dedupeKey": reminder.reminder.dedupe_key,
369 "ttlTurns": reminder.reminder.ttl_turns,
370 "preserveOnCompact": reminder.reminder.preserve_on_compact,
371 "propagate": reminder.reminder.propagate.as_str(),
372 "roleHint": reminder.reminder.role_hint.as_str(),
373 "source": reminder.reminder.source.as_str(),
374 "firedAtTurn": reminder.reminder.fired_at_turn,
375 "originatingAgentId": reminder.reminder.originating_agent_id,
376 }),
377 }
378 }
379}
380
/// Outcome of revoking a queued reminder.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PendingReminderMutationResult {
    /// The reminder was still queued and has been removed.
    Mutated,
    /// The reminder is no longer queued because it was revoked earlier.
    AlreadyRevoked,
    /// The reminder is no longer queued because it was already delivered.
    AlreadyDelivered,
    /// No queued, revoked, or delivered reminder has this id.
    UnknownReminderId,
}
388
389fn new_inject_message_id() -> String {
390 format!("msg_inj_{}", uuid::Uuid::now_v7().simple())
391}
392
393impl HostBridgeInjectionState {
394 pub fn new() -> Self {
395 Self::default()
396 }
397
398 pub async fn push_pending_user_message(
399 &self,
400 content: String,
401 transcript_content: serde_json::Value,
402 mode: &str,
403 ) -> String {
404 let message_id = new_inject_message_id();
405 self.inner
406 .lock()
407 .await
408 .queue
409 .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
410 message_id: message_id.clone(),
411 content,
412 transcript_content,
413 mode: QueuedUserMessageMode::from_str(mode),
414 }));
415 message_id
416 }
417
418 pub async fn revoke_pending_user_message(
419 &self,
420 message_id: &str,
421 ) -> PendingUserMessageMutationResult {
422 let mut state = self.inner.lock().await;
423 let mut retained = VecDeque::new();
424 let mut revoked = false;
425 while let Some(injection) = state.queue.pop_front() {
426 match &injection {
427 QueuedTranscriptInjection::User(message) if message.message_id == message_id => {
428 revoked = true;
429 }
430 _ => retained.push_back(injection),
431 }
432 }
433 state.queue = retained;
434 if revoked {
435 state
436 .revoked_user_message_ids
437 .insert(message_id.to_string());
438 return PendingUserMessageMutationResult::Mutated;
439 }
440 if state.revoked_user_message_ids.contains(message_id) {
441 PendingUserMessageMutationResult::AlreadyRevoked
442 } else if state.delivered_user_message_ids.contains(message_id) {
443 PendingUserMessageMutationResult::AlreadyDelivered
444 } else {
445 PendingUserMessageMutationResult::UnknownMessageId
446 }
447 }
448
449 pub async fn revoke_pending_reminder(
450 &self,
451 reminder_id: &str,
452 ) -> PendingReminderMutationResult {
453 let mut state = self.inner.lock().await;
454 let mut retained = VecDeque::new();
455 let mut revoked = false;
456 while let Some(injection) = state.queue.pop_front() {
457 match &injection {
458 QueuedTranscriptInjection::Reminder(reminder)
459 if reminder.reminder.id == reminder_id =>
460 {
461 revoked = true;
462 }
463 _ => retained.push_back(injection),
464 }
465 }
466 state.queue = retained;
467 if revoked {
468 state.revoked_reminder_ids.insert(reminder_id.to_string());
469 return PendingReminderMutationResult::Mutated;
470 }
471 if state.revoked_reminder_ids.contains(reminder_id) {
472 PendingReminderMutationResult::AlreadyRevoked
473 } else if state.delivered_reminder_ids.contains(reminder_id) {
474 PendingReminderMutationResult::AlreadyDelivered
475 } else {
476 PendingReminderMutationResult::UnknownReminderId
477 }
478 }
479
480 pub async fn replace_pending_user_message(
481 &self,
482 message_id: &str,
483 content: String,
484 transcript_content: serde_json::Value,
485 ) -> PendingUserMessageMutationResult {
486 let mut state = self.inner.lock().await;
487 for injection in &mut state.queue {
488 if let QueuedTranscriptInjection::User(message) = injection {
489 if message.message_id == message_id {
490 message.content = content;
491 message.transcript_content = transcript_content;
492 return PendingUserMessageMutationResult::Mutated;
493 }
494 }
495 }
496 if state.revoked_user_message_ids.contains(message_id) {
497 PendingUserMessageMutationResult::AlreadyRevoked
498 } else if state.delivered_user_message_ids.contains(message_id) {
499 PendingUserMessageMutationResult::AlreadyDelivered
500 } else {
501 PendingUserMessageMutationResult::UnknownMessageId
502 }
503 }
504
505 async fn push_session_reminder(&self, reminder: QueuedReminder) {
506 self.inner
507 .lock()
508 .await
509 .queue
510 .push_back(QueuedTranscriptInjection::Reminder(reminder));
511 }
512
513 pub async fn pending_injections_json(&self) -> serde_json::Value {
514 let state = self.inner.lock().await;
515 let injections = state
516 .queue
517 .iter()
518 .enumerate()
519 .map(|(position, injection)| injection.pending_json(position))
520 .collect::<Vec<_>>();
521 serde_json::json!({
522 "pendingCount": injections.len(),
523 "injections": injections,
524 })
525 }
526}
527
528fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
529 format!(
530 "{}: {}",
531 Code::ReminderUnknownOption.as_str(),
532 message.as_ref()
533 )
534}
535
536fn session_remind_shape_error(message: impl AsRef<str>) -> String {
537 format!(
538 "{}: {}",
539 Code::ReminderInvalidShape.as_str(),
540 message.as_ref()
541 )
542}
543
544fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
545 format!(
546 "{}: {}",
547 Code::ReminderUnknownPropagate.as_str(),
548 message.as_ref()
549 )
550}
551
552fn string_field(
553 map: &serde_json::Map<String, serde_json::Value>,
554 key: &str,
555 required: bool,
556) -> Result<Option<String>, String> {
557 match map.get(key) {
558 None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
559 format!("`{key}` must be a non-empty string"),
560 )),
561 None | Some(serde_json::Value::Null) => Ok(None),
562 Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
563 session_remind_shape_error(format!("`{key}` must be a non-empty string")),
564 ),
565 Some(serde_json::Value::String(value)) => {
566 let trimmed = value.trim();
567 if trimmed.is_empty() {
568 Ok(None)
569 } else {
570 Ok(Some(trimmed.to_string()))
571 }
572 }
573 Some(other) => Err(session_remind_shape_error(format!(
574 "`{key}` must be a string, got {other}"
575 ))),
576 }
577}
578
579fn bool_field(
580 map: &serde_json::Map<String, serde_json::Value>,
581 key: &str,
582) -> Result<Option<bool>, String> {
583 match map.get(key) {
584 None | Some(serde_json::Value::Null) => Ok(None),
585 Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
586 Some(other) => Err(session_remind_shape_error(format!(
587 "`{key}` must be a bool, got {other}"
588 ))),
589 }
590}
591
592fn int_field(
593 map: &serde_json::Map<String, serde_json::Value>,
594 key: &str,
595) -> Result<Option<i64>, String> {
596 match map.get(key) {
597 None | Some(serde_json::Value::Null) => Ok(None),
598 Some(serde_json::Value::Number(value)) => {
599 let Some(value) = value.as_i64() else {
600 return Err(session_remind_shape_error(format!(
601 "`{key}` must be an integer"
602 )));
603 };
604 Ok(Some(value))
605 }
606 Some(other) => Err(session_remind_shape_error(format!(
607 "`{key}` must be an int, got {other}"
608 ))),
609 }
610}
611
612fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
613 let Some(value) = map.get("tags") else {
614 return Ok(Vec::new());
615 };
616 if value.is_null() {
617 return Ok(Vec::new());
618 }
619 let Some(values) = value.as_array() else {
620 return Err(session_remind_shape_error("`tags` must be a list"));
621 };
622 let mut tags = Vec::new();
623 for value in values {
624 let Some(tag) = value.as_str() else {
625 return Err(session_remind_shape_error(format!(
626 "`tags` entries must be strings, got {value}"
627 )));
628 };
629 let tag = tag.trim();
630 if tag.is_empty() {
631 return Err(session_remind_shape_error(
632 "`tags` entries must be non-empty strings",
633 ));
634 }
635 if !tags.iter().any(|existing| existing == tag) {
636 tags.push(tag.to_string());
637 }
638 }
639 Ok(tags)
640}
641
642fn session_remind_payload_from_value(
643 value: &serde_json::Value,
644) -> Result<crate::llm::helpers::SystemReminder, String> {
645 let Some(map) = value.as_object() else {
646 return Err(session_remind_shape_error(
647 "session/remind payload must be a reminder object",
648 ));
649 };
650 const ALLOWED: &[&str] = &[
651 "_meta",
652 "body",
653 "dedupe_key",
654 "fired_at_turn",
655 "id",
656 "preserve_on_compact",
657 "propagate",
658 "role_hint",
659 "source",
660 "tags",
661 "ttl_turns",
662 ];
663 let unknown = map
664 .keys()
665 .filter(|key| !ALLOWED.contains(&key.as_str()))
666 .map(String::as_str)
667 .collect::<Vec<_>>();
668 if !unknown.is_empty() {
669 if unknown.contains(&"content") {
670 return Err(session_remind_shape_error(
671 "session/remind expects reminder `body`, not user-message `content`",
672 ));
673 }
674 return Err(reminder_unknown_option_error(format!(
675 "unknown reminder option(s): {}",
676 unknown.join(", ")
677 )));
678 }
679 if let Some(meta) = map.get("_meta") {
680 if !meta.is_null() && !meta.is_object() {
681 return Err(session_remind_shape_error("`_meta` must be an object"));
682 }
683 }
684 let ttl_turns = int_field(map, "ttl_turns")?;
685 if let Some(value) = ttl_turns {
686 if value <= 0 {
687 return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
688 }
689 }
690 let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
691 if fired_at_turn < 0 {
692 return Err(session_remind_shape_error(
693 "`fired_at_turn` must be >= 0 when provided",
694 ));
695 }
696 match string_field(map, "source", false)?.as_deref() {
697 None | Some("bridge") => {}
698 Some(_) => {
699 return Err(session_remind_shape_error(
700 "`source` for session/remind must be bridge when provided",
701 ))
702 }
703 }
704 let propagate = match string_field(map, "propagate", false)?.as_deref() {
705 None => crate::llm::helpers::ReminderPropagate::Session,
706 Some("all") => crate::llm::helpers::ReminderPropagate::All,
707 Some("session") => crate::llm::helpers::ReminderPropagate::Session,
708 Some("none") => crate::llm::helpers::ReminderPropagate::None,
709 Some(_) => {
710 return Err(reminder_unknown_propagate_error(
711 "`propagate` must be one of all, session, or none",
712 ))
713 }
714 };
715 let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
716 None => crate::llm::helpers::ReminderRoleHint::System,
717 Some("system") => crate::llm::helpers::ReminderRoleHint::System,
718 Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
719 Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
720 Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
721 Some(_) => {
722 return Err(session_remind_shape_error(
723 "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
724 ))
725 }
726 };
727 Ok(crate::llm::helpers::SystemReminder {
728 id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
729 tags: tags_field(map)?,
730 dedupe_key: string_field(map, "dedupe_key", false)?,
731 ttl_turns,
732 preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
733 propagate,
734 role_hint,
735 source: crate::llm::helpers::ReminderSource::Bridge,
736 body: string_field(map, "body", true)?.unwrap_or_default(),
737 fired_at_turn,
738 originating_agent_id: None,
739 })
740}
741
742fn handle_cancel_tool_call_notification(params: &serde_json::Value) {
752 let session_id = params
753 .get("sessionId")
754 .or_else(|| params.get("session_id"))
755 .and_then(|value| value.as_str())
756 .unwrap_or_default();
757 let call_id = params
758 .get("toolCallId")
759 .or_else(|| params.get("tool_call_id"))
760 .or_else(|| params.get("callId"))
761 .or_else(|| params.get("call_id"))
762 .and_then(|value| value.as_str())
763 .unwrap_or_default();
764 if call_id.is_empty() {
765 return;
766 }
767 let reason = params
768 .get("reason")
769 .and_then(|value| value.as_str())
770 .unwrap_or("host cancelled in-flight tool call")
771 .to_string();
772 let inject_reminder = params
773 .get("injectReminder")
774 .or_else(|| params.get("inject_reminder"))
775 .and_then(|value| value.as_bool())
776 .unwrap_or(true);
777 crate::tool_call_cancellations::cancel(session_id, call_id, reason, inject_reminder);
778}
779
780fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
781 let mode = QueuedUserMessageMode::from_str(
782 params
783 .get("mode")
784 .and_then(|value| value.as_str())
785 .unwrap_or("audit_only"),
786 );
787 let reminder_value = if let Some(reminder) = params.get("reminder") {
788 reminder.clone()
789 } else {
790 let Some(params) = params.as_object() else {
791 return Err(session_remind_shape_error(
792 "session/remind params must be an object",
793 ));
794 };
795 let mut reminder = params.clone();
796 reminder.remove("mode");
797 reminder.remove("sessionId");
798 reminder.remove("session_id");
799 serde_json::Value::Object(reminder)
800 };
801 Ok(QueuedReminder {
802 reminder: session_remind_payload_from_value(&reminder_value)?,
803 mode,
804 })
805}
806
807#[allow(clippy::new_without_default)]
809impl HostBridge {
810 pub fn new() -> Self {
815 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
816 Arc::new(Mutex::new(HashMap::new()));
817 let cancelled = Arc::new(AtomicBool::new(false));
818 let cancel_notify = Arc::new(Notify::new());
819 let queued_transcript_injections = HostBridgeInjectionState::default();
820 let resume_requested = Arc::new(AtomicBool::new(false));
821 let skills_reload_requested = Arc::new(AtomicBool::new(false));
822 let daemon_idle = Arc::new(AtomicBool::new(false));
823
824 let pending_clone = pending.clone();
826 let cancelled_clone = cancelled.clone();
827 let cancel_notify_clone = cancel_notify.clone();
828 let queued_clone = queued_transcript_injections.clone();
829 let resume_clone = resume_requested.clone();
830 let skills_reload_clone = skills_reload_requested.clone();
831 tokio::task::spawn_local(async move {
832 let stdin = tokio::io::stdin();
833 let reader = tokio::io::BufReader::new(stdin);
834 let mut lines = reader.lines();
835
836 while let Ok(Some(line)) = lines.next_line().await {
837 let line = line.trim().to_string();
838 if line.is_empty() {
839 continue;
840 }
841
842 let msg: serde_json::Value = match serde_json::from_str(&line) {
843 Ok(v) => v,
844 Err(_) => continue,
845 };
846
847 if msg.get("id").is_none() {
849 if let Some(method) = msg["method"].as_str() {
850 if method == "cancel" {
851 cancelled_clone.store(true, Ordering::SeqCst);
852 cancel_notify_clone.notify_waiters();
853 } else if method == "agent/resume" {
854 resume_clone.store(true, Ordering::SeqCst);
855 } else if method == "skills/update" {
856 skills_reload_clone.store(true, Ordering::SeqCst);
857 } else if method == "session/remind" {
858 let params = &msg["params"];
859 if let Ok(reminder) = queued_session_remind_from_params(params) {
860 queued_clone.push_session_reminder(reminder).await;
861 }
862 } else if method == "session/cancel_tool_call" {
863 handle_cancel_tool_call_notification(&msg["params"]);
864 }
865 }
866 continue;
867 }
868
869 if let Some(id) = msg["id"].as_u64() {
870 let mut pending = pending_clone.lock().await;
871 if let Some(sender) = pending.remove(&id) {
872 let _ = sender.send(msg);
873 }
874 }
875 }
876
877 let mut pending = pending_clone.lock().await;
879 pending.clear();
880 });
881
882 Self {
883 next_id: AtomicU64::new(1),
884 pending,
885 cancelled,
886 cancel_notify,
887 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
888 session_id: std::sync::Mutex::new(String::new()),
889 script_name: std::sync::Mutex::new(String::new()),
890 queued_transcript_injections,
891 resume_requested,
892 skills_reload_requested,
893 daemon_idle,
894 prompt_stop_reason: std::sync::Mutex::new(None),
895 visible_call_states: std::sync::Mutex::new(HashMap::new()),
896 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
897 in_process: None,
898 }
899 }
900
901 pub fn from_parts(
907 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
908 cancelled: Arc<AtomicBool>,
909 stdout_lock: Arc<std::sync::Mutex<()>>,
910 start_id: u64,
911 ) -> Self {
912 Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
913 }
914
915 pub fn from_parts_with_writer(
916 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
917 cancelled: Arc<AtomicBool>,
918 writer: HostBridgeWriter,
919 start_id: u64,
920 ) -> Self {
921 Self::from_parts_with_writer_and_cancel_notify(
922 pending,
923 cancelled,
924 Arc::new(Notify::new()),
925 writer,
926 start_id,
927 )
928 }
929
930 pub fn from_parts_with_writer_and_cancel_notify(
931 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
932 cancelled: Arc<AtomicBool>,
933 cancel_notify: Arc<Notify>,
934 writer: HostBridgeWriter,
935 start_id: u64,
936 ) -> Self {
937 Self::from_parts_with_writer_cancel_notify_and_injection_state(
938 pending,
939 cancelled,
940 cancel_notify,
941 writer,
942 start_id,
943 None,
944 )
945 }
946
947 pub fn from_parts_with_writer_cancel_notify_and_injection_state(
948 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
949 cancelled: Arc<AtomicBool>,
950 cancel_notify: Arc<Notify>,
951 writer: HostBridgeWriter,
952 start_id: u64,
953 injection_state: Option<HostBridgeInjectionState>,
954 ) -> Self {
955 Self {
956 next_id: AtomicU64::new(start_id),
957 pending,
958 cancelled,
959 cancel_notify,
960 writer,
961 session_id: std::sync::Mutex::new(String::new()),
962 script_name: std::sync::Mutex::new(String::new()),
963 queued_transcript_injections: injection_state.unwrap_or_default(),
964 resume_requested: Arc::new(AtomicBool::new(false)),
965 skills_reload_requested: Arc::new(AtomicBool::new(false)),
966 daemon_idle: Arc::new(AtomicBool::new(false)),
967 prompt_stop_reason: std::sync::Mutex::new(None),
968 visible_call_states: std::sync::Mutex::new(HashMap::new()),
969 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
970 in_process: None,
971 }
972 }
973
974 pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
977 let exported_functions = vm.load_module_exports(module_path).await?;
978 Ok(Self {
979 next_id: AtomicU64::new(1),
980 pending: Arc::new(Mutex::new(HashMap::new())),
981 cancelled: Arc::new(AtomicBool::new(false)),
982 cancel_notify: Arc::new(Notify::new()),
983 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
984 session_id: std::sync::Mutex::new(String::new()),
985 script_name: std::sync::Mutex::new(String::new()),
986 queued_transcript_injections: HostBridgeInjectionState::default(),
987 resume_requested: Arc::new(AtomicBool::new(false)),
988 skills_reload_requested: Arc::new(AtomicBool::new(false)),
989 daemon_idle: Arc::new(AtomicBool::new(false)),
990 prompt_stop_reason: std::sync::Mutex::new(None),
991 visible_call_states: std::sync::Mutex::new(HashMap::new()),
992 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
993 in_process: Some(InProcessHost {
994 module_path: module_path.to_path_buf(),
995 exported_functions,
996 vm,
997 }),
998 })
999 }
1000
1001 pub fn set_session_id(&self, id: &str) {
1003 *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
1004 }
1005
1006 pub fn set_script_name(&self, name: &str) {
1008 *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
1009 }
1010
1011 fn get_script_name(&self) -> String {
1013 self.script_name
1014 .lock()
1015 .unwrap_or_else(|e| e.into_inner())
1016 .clone()
1017 }
1018
1019 pub fn get_session_id(&self) -> String {
1021 self.session_id
1022 .lock()
1023 .unwrap_or_else(|e| e.into_inner())
1024 .clone()
1025 }
1026
1027 fn write_line(&self, line: &str) -> Result<(), VmError> {
1029 (self.writer)(line).map_err(VmError::Runtime)
1030 }
1031
1032 pub async fn call(
1035 &self,
1036 method: &str,
1037 params: serde_json::Value,
1038 ) -> Result<serde_json::Value, VmError> {
1039 if let Some(in_process) = &self.in_process {
1040 return in_process.dispatch(method, params).await;
1041 }
1042
1043 if self.is_cancelled() {
1044 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1045 }
1046
1047 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
1048 let cancel_wait = self.cancel_notify.notified();
1049 tokio::pin!(cancel_wait);
1050
1051 let request = crate::jsonrpc::request(id, method, params);
1052
1053 let (tx, rx) = oneshot::channel();
1054 {
1055 let mut pending = self.pending.lock().await;
1056 pending.insert(id, tx);
1057 }
1058
1059 let line = serde_json::to_string(&request)
1060 .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
1061 if let Err(e) = self.write_line(&line) {
1062 let mut pending = self.pending.lock().await;
1063 pending.remove(&id);
1064 return Err(e);
1065 }
1066
1067 if self.is_cancelled() {
1068 let mut pending = self.pending.lock().await;
1069 pending.remove(&id);
1070 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1071 }
1072
1073 let response = tokio::select! {
1074 result = rx => match result {
1075 Ok(msg) => msg,
1076 Err(_) => {
1077 return Err(VmError::Runtime(
1079 "Bridge: host closed connection before responding".into(),
1080 ));
1081 }
1082 },
1083 _ = &mut cancel_wait => {
1084 let mut pending = self.pending.lock().await;
1085 pending.remove(&id);
1086 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
1087 }
1088 _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
1089 let mut pending = self.pending.lock().await;
1090 pending.remove(&id);
1091 return Err(VmError::Runtime(format!(
1092 "Bridge: host did not respond to '{method}' within {}s",
1093 DEFAULT_TIMEOUT.as_secs()
1094 )));
1095 }
1096 };
1097
1098 if let Some(error) = response.get("error") {
1099 let message = error["message"].as_str().unwrap_or("Unknown host error");
1100 let code = error["code"].as_i64().unwrap_or(-1);
1101 if code == -32001 {
1103 return Err(VmError::CategorizedError {
1104 message: message.to_string(),
1105 category: ErrorCategory::ToolRejected,
1106 });
1107 }
1108 return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
1109 }
1110
1111 Ok(response["result"].clone())
1112 }
1113
1114 pub fn notify(&self, method: &str, params: serde_json::Value) {
1117 let notification = crate::jsonrpc::notification(method, params);
1118 if self.in_process.is_some() {
1119 return;
1120 }
1121 if let Ok(line) = serde_json::to_string(¬ification) {
1122 let _ = self.write_line(&line);
1123 }
1124 }
1125
1126 pub fn is_cancelled(&self) -> bool {
1128 self.cancelled.load(Ordering::SeqCst)
1129 }
1130
1131 pub fn take_resume_signal(&self) -> bool {
1132 self.resume_requested.swap(false, Ordering::SeqCst)
1133 }
1134
1135 pub fn signal_resume(&self) {
1136 self.resume_requested.store(true, Ordering::SeqCst);
1137 }
1138
1139 pub fn set_daemon_idle(&self, idle: bool) {
1140 self.daemon_idle.store(idle, Ordering::SeqCst);
1141 }
1142
1143 pub fn is_daemon_idle(&self) -> bool {
1144 self.daemon_idle.load(Ordering::SeqCst)
1145 }
1146
1147 pub fn set_prompt_stop_reason(&self, reason: &str) {
1152 *self
1153 .prompt_stop_reason
1154 .lock()
1155 .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
1156 }
1157
1158 pub fn take_prompt_stop_reason(&self) -> Option<String> {
1163 self.prompt_stop_reason
1164 .lock()
1165 .unwrap_or_else(|e| e.into_inner())
1166 .take()
1167 }
1168
1169 pub fn take_skills_reload_signal(&self) -> bool {
1174 self.skills_reload_requested.swap(false, Ordering::SeqCst)
1175 }
1176
1177 pub fn signal_skills_reload(&self) {
1181 self.skills_reload_requested.store(true, Ordering::SeqCst);
1182 }
1183
1184 pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
1190 let result = self.call("skills/list", serde_json::json!({})).await?;
1191 match result {
1192 serde_json::Value::Array(items) => Ok(items),
1193 serde_json::Value::Object(map) => match map.get("skills") {
1194 Some(serde_json::Value::Array(items)) => Ok(items.clone()),
1195 _ => Err(VmError::Runtime(
1196 "skills/list: host response must be an array or { skills: [...] }".into(),
1197 )),
1198 },
1199 _ => Err(VmError::Runtime(
1200 "skills/list: unexpected response shape".into(),
1201 )),
1202 }
1203 }
1204
1205 pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
1211 let result = self.call("host/tools/list", serde_json::json!({})).await?;
1212 parse_host_tools_list_response(result)
1213 }
1214
1215 pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
1219 self.call("skills/fetch", serde_json::json!({ "id": id }))
1220 .await
1221 }
1222
1223 pub fn injection_state(&self) -> HostBridgeInjectionState {
1224 self.queued_transcript_injections.clone()
1225 }
1226
1227 pub async fn push_pending_user_message(
1228 &self,
1229 content: String,
1230 transcript_content: serde_json::Value,
1231 mode: &str,
1232 ) -> String {
1233 self.queued_transcript_injections
1234 .push_pending_user_message(content, transcript_content, mode)
1235 .await
1236 }
1237
1238 pub async fn push_queued_user_message(&self, content: String, mode: &str) -> String {
1239 self.push_pending_user_message(content.clone(), serde_json::Value::String(content), mode)
1240 .await
1241 }
1242
1243 pub async fn revoke_pending_user_message(
1244 &self,
1245 message_id: &str,
1246 ) -> PendingUserMessageMutationResult {
1247 self.queued_transcript_injections
1248 .revoke_pending_user_message(message_id)
1249 .await
1250 }
1251
1252 pub async fn revoke_pending_reminder(
1253 &self,
1254 reminder_id: &str,
1255 ) -> PendingReminderMutationResult {
1256 self.queued_transcript_injections
1257 .revoke_pending_reminder(reminder_id)
1258 .await
1259 }
1260
1261 pub async fn pending_injections_json(&self) -> serde_json::Value {
1262 self.queued_transcript_injections
1263 .pending_injections_json()
1264 .await
1265 }
1266
1267 pub async fn replace_pending_user_message(
1268 &self,
1269 message_id: &str,
1270 content: String,
1271 transcript_content: serde_json::Value,
1272 ) -> PendingUserMessageMutationResult {
1273 self.queued_transcript_injections
1274 .replace_pending_user_message(message_id, content, transcript_content)
1275 .await
1276 }
1277
1278 pub async fn push_queued_session_remind_from_params(
1279 &self,
1280 params: &serde_json::Value,
1281 ) -> Result<String, String> {
1282 let reminder = queued_session_remind_from_params(params)?;
1283 let reminder_id = reminder.reminder.id.clone();
1284 self.queued_transcript_injections
1285 .push_session_reminder(reminder)
1286 .await;
1287 Ok(reminder_id)
1288 }
1289
1290 pub async fn take_queued_user_messages(
1291 &self,
1292 include_interrupt_immediate: bool,
1293 include_finish_step: bool,
1294 include_audit_only: bool,
1295 ) -> Vec<QueuedUserMessage> {
1296 let mut state = self.queued_transcript_injections.inner.lock().await;
1297 let mut selected = Vec::new();
1298 let mut retained = VecDeque::new();
1299 while let Some(injection) = state.queue.pop_front() {
1300 let should_take = match injection.mode() {
1301 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1302 QueuedUserMessageMode::FinishStep => include_finish_step,
1303 QueuedUserMessageMode::AuditOnly => include_audit_only,
1304 };
1305 match (should_take, injection) {
1306 (true, QueuedTranscriptInjection::User(message)) => {
1307 state
1308 .delivered_user_message_ids
1309 .insert(message.message_id.clone());
1310 selected.push(message);
1311 }
1312 (_, injection) => retained.push_back(injection),
1313 }
1314 }
1315 state.queue = retained;
1316 selected
1317 }
1318
1319 pub async fn take_queued_transcript_injections(
1320 &self,
1321 include_interrupt_immediate: bool,
1322 include_finish_step: bool,
1323 include_audit_only: bool,
1324 ) -> Vec<QueuedTranscriptInjection> {
1325 let mut state = self.queued_transcript_injections.inner.lock().await;
1326 let mut selected = Vec::new();
1327 let mut retained = VecDeque::new();
1328 while let Some(injection) = state.queue.pop_front() {
1329 let should_take = match injection.mode() {
1330 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1331 QueuedUserMessageMode::FinishStep => include_finish_step,
1332 QueuedUserMessageMode::AuditOnly => include_audit_only,
1333 };
1334 if should_take {
1335 match &injection {
1336 QueuedTranscriptInjection::User(message) => {
1337 state
1338 .delivered_user_message_ids
1339 .insert(message.message_id.clone());
1340 }
1341 QueuedTranscriptInjection::Reminder(reminder) => {
1342 state
1343 .delivered_reminder_ids
1344 .insert(reminder.reminder.id.clone());
1345 }
1346 }
1347 selected.push(injection);
1348 } else {
1349 retained.push_back(injection);
1350 }
1351 }
1352 state.queue = retained;
1353 selected
1354 }
1355
1356 pub async fn take_queued_user_messages_for(
1357 &self,
1358 checkpoint: DeliveryCheckpoint,
1359 ) -> Vec<QueuedUserMessage> {
1360 match checkpoint {
1361 DeliveryCheckpoint::InterruptImmediate => {
1362 self.take_queued_user_messages(true, false, false).await
1363 }
1364 DeliveryCheckpoint::AfterCurrentOperation => {
1365 self.take_queued_user_messages(false, true, false).await
1366 }
1367 DeliveryCheckpoint::EndOfInteraction => {
1368 self.take_queued_user_messages(false, false, true).await
1369 }
1370 }
1371 }
1372
1373 pub async fn take_queued_transcript_injections_for(
1374 &self,
1375 checkpoint: DeliveryCheckpoint,
1376 ) -> Vec<QueuedTranscriptInjection> {
1377 match checkpoint {
1378 DeliveryCheckpoint::InterruptImmediate => {
1379 self.take_queued_transcript_injections(true, false, false)
1380 .await
1381 }
1382 DeliveryCheckpoint::AfterCurrentOperation => {
1383 self.take_queued_transcript_injections(false, true, false)
1384 .await
1385 }
1386 DeliveryCheckpoint::EndOfInteraction => {
1387 self.take_queued_transcript_injections(false, false, true)
1388 .await
1389 }
1390 }
1391 }
1392
1393 pub fn send_output(&self, text: &str) {
1395 self.notify("output", serde_json::json!({"text": text}));
1396 }
1397
1398 pub fn send_progress(
1400 &self,
1401 phase: &str,
1402 message: &str,
1403 progress: Option<i64>,
1404 total: Option<i64>,
1405 data: Option<serde_json::Value>,
1406 ) {
1407 let mut payload = serde_json::json!({"phase": phase, "message": message});
1408 if let Some(p) = progress {
1409 payload["progress"] = serde_json::json!(p);
1410 }
1411 if let Some(t) = total {
1412 payload["total"] = serde_json::json!(t);
1413 }
1414 if let Some(d) = data {
1415 payload["data"] = d;
1416 }
1417 self.notify("progress", payload);
1418 }
1419
1420 pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1422 let mut payload = serde_json::json!({"level": level, "message": message});
1423 if let Some(f) = fields {
1424 payload["fields"] = f;
1425 }
1426 self.notify("log", payload);
1427 }
1428
1429 pub fn send_call_start(
1432 &self,
1433 call_id: &str,
1434 call_type: &str,
1435 name: &str,
1436 metadata: serde_json::Value,
1437 ) {
1438 let session_id = self.get_session_id();
1439 let script = self.get_script_name();
1440 let stream_publicly = metadata
1441 .get("stream_publicly")
1442 .and_then(|value| value.as_bool())
1443 .unwrap_or(true);
1444 self.visible_call_streams
1445 .lock()
1446 .unwrap_or_else(|e| e.into_inner())
1447 .insert(call_id.to_string(), stream_publicly);
1448 self.notify(
1449 "session/update",
1450 serde_json::json!({
1451 "sessionId": session_id,
1452 "update": {
1453 "sessionUpdate": "call_start",
1454 "content": {
1455 "toolCallId": call_id,
1456 "call_type": call_type,
1457 "name": name,
1458 "script": script,
1459 "metadata": metadata,
1460 },
1461 },
1462 }),
1463 );
1464 }
1465
1466 pub fn send_call_progress(
1469 &self,
1470 call_id: &str,
1471 delta: &str,
1472 accumulated_tokens: u64,
1473 user_visible: bool,
1474 ) {
1475 let session_id = self.get_session_id();
1476 let (visible_text, visible_delta) = {
1477 let stream_publicly = self
1478 .visible_call_streams
1479 .lock()
1480 .unwrap_or_else(|e| e.into_inner())
1481 .get(call_id)
1482 .copied()
1483 .unwrap_or(true);
1484 if !user_visible || !stream_publicly {
1485 (String::new(), String::new())
1486 } else {
1487 let mut states = self
1488 .visible_call_states
1489 .lock()
1490 .unwrap_or_else(|e| e.into_inner());
1491 let state = states.entry(call_id.to_string()).or_default();
1492 state.push(delta, true)
1493 }
1494 };
1495 self.notify(
1496 "session/update",
1497 serde_json::json!({
1498 "sessionId": session_id,
1499 "update": {
1500 "sessionUpdate": "call_progress",
1501 "content": {
1502 "toolCallId": call_id,
1503 "delta": delta,
1504 "accumulated_tokens": accumulated_tokens,
1505 "visible_text": visible_text,
1506 "visible_delta": visible_delta,
1507 "user_visible": user_visible,
1508 },
1509 },
1510 }),
1511 );
1512 }
1513
1514 pub fn send_call_end(
1516 &self,
1517 call_id: &str,
1518 call_type: &str,
1519 name: &str,
1520 duration_ms: u64,
1521 status: &str,
1522 metadata: serde_json::Value,
1523 ) {
1524 let session_id = self.get_session_id();
1525 let script = self.get_script_name();
1526 self.visible_call_states
1527 .lock()
1528 .unwrap_or_else(|e| e.into_inner())
1529 .remove(call_id);
1530 self.visible_call_streams
1531 .lock()
1532 .unwrap_or_else(|e| e.into_inner())
1533 .remove(call_id);
1534 self.notify(
1535 "session/update",
1536 serde_json::json!({
1537 "sessionId": session_id,
1538 "update": {
1539 "sessionUpdate": "call_end",
1540 "content": {
1541 "toolCallId": call_id,
1542 "call_type": call_type,
1543 "name": name,
1544 "script": script,
1545 "duration_ms": duration_ms,
1546 "status": status,
1547 "metadata": metadata,
1548 },
1549 },
1550 }),
1551 );
1552 }
1553
1554 pub fn send_worker_update(
1556 &self,
1557 worker_id: &str,
1558 worker_name: &str,
1559 status: &str,
1560 metadata: serde_json::Value,
1561 audit: Option<&MutationSessionRecord>,
1562 ) {
1563 let session_id = self.get_session_id();
1564 let script = self.get_script_name();
1565 let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1566 let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1567 let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1568 let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1569 let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1570 let lifecycle = serde_json::json!({
1571 "event": status,
1572 "worker_id": worker_id,
1573 "worker_name": worker_name,
1574 "started_at": started_at,
1575 "finished_at": finished_at,
1576 });
1577 self.notify(
1578 "session/update",
1579 serde_json::json!({
1580 "sessionId": session_id,
1581 "update": {
1582 "sessionUpdate": "worker_update",
1583 "content": {
1584 "worker_id": worker_id,
1585 "worker_name": worker_name,
1586 "status": status,
1587 "script": script,
1588 "started_at": started_at,
1589 "finished_at": finished_at,
1590 "snapshot_path": snapshot_path,
1591 "run_id": run_id,
1592 "run_path": run_path,
1593 "lifecycle": lifecycle,
1594 "audit": audit,
1595 "metadata": metadata,
1596 },
1597 },
1598 }),
1599 );
1600 }
1601}
1602
1603pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1605 crate::stdlib::json_to_vm_value(val)
1606}
1607
1608fn parse_host_tools_list_response(
1609 result: serde_json::Value,
1610) -> Result<Vec<serde_json::Value>, VmError> {
1611 let tools = match result {
1612 serde_json::Value::Array(items) => items,
1613 serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1614 map.get("result")
1615 .and_then(|value| value.get("tools"))
1616 .cloned()
1617 }) {
1618 Some(serde_json::Value::Array(items)) => items,
1619 _ => {
1620 return Err(VmError::Runtime(
1621 "host/tools/list: host response must be an array or { tools: [...] }".into(),
1622 ));
1623 }
1624 },
1625 _ => {
1626 return Err(VmError::Runtime(
1627 "host/tools/list: unexpected response shape".into(),
1628 ));
1629 }
1630 };
1631
1632 let mut normalized = Vec::with_capacity(tools.len());
1633 for tool in tools {
1634 let serde_json::Value::Object(map) = tool else {
1635 return Err(VmError::Runtime(
1636 "host/tools/list: every tool must be an object".into(),
1637 ));
1638 };
1639 let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1640 return Err(VmError::Runtime(
1641 "host/tools/list: every tool must include a string `name`".into(),
1642 ));
1643 };
1644 let description = map
1645 .get("description")
1646 .and_then(|value| value.as_str())
1647 .or_else(|| {
1648 map.get("short_description")
1649 .and_then(|value| value.as_str())
1650 })
1651 .unwrap_or_default();
1652 let schema = map
1653 .get("schema")
1654 .cloned()
1655 .or_else(|| map.get("parameters").cloned())
1656 .or_else(|| map.get("input_schema").cloned())
1657 .unwrap_or(serde_json::Value::Null);
1658 let deprecated = map
1659 .get("deprecated")
1660 .and_then(|value| value.as_bool())
1661 .unwrap_or(false);
1662 normalized.push(serde_json::json!({
1663 "name": name,
1664 "description": description,
1665 "schema": schema,
1666 "deprecated": deprecated,
1667 }));
1668 }
1669 Ok(normalized)
1670}
1671
1672#[cfg(test)]
1673mod tests {
1674 use super::*;
1675
1676 fn test_bridge() -> HostBridge {
1677 HostBridge::from_parts(
1678 Arc::new(Mutex::new(HashMap::new())),
1679 Arc::new(AtomicBool::new(false)),
1680 Arc::new(std::sync::Mutex::new(())),
1681 1,
1682 )
1683 }
1684
1685 fn test_bridge_sharing_injection_state(owner: &HostBridge) -> HostBridge {
1686 HostBridge::from_parts_with_writer_cancel_notify_and_injection_state(
1687 Arc::new(Mutex::new(HashMap::new())),
1688 Arc::new(AtomicBool::new(false)),
1689 Arc::new(Notify::new()),
1690 Arc::new(|_| Ok(())),
1691 100,
1692 Some(owner.injection_state()),
1693 )
1694 }
1695
1696 #[test]
1697 fn test_json_rpc_request_format() {
1698 let request = crate::jsonrpc::request(
1699 1,
1700 "llm_call",
1701 serde_json::json!({
1702 "prompt": "Hello",
1703 "system": "Be helpful",
1704 }),
1705 );
1706 let s = serde_json::to_string(&request).unwrap();
1707 assert!(s.contains("\"jsonrpc\":\"2.0\""));
1708 assert!(s.contains("\"id\":1"));
1709 assert!(s.contains("\"method\":\"llm_call\""));
1710 }
1711
1712 #[test]
1713 fn test_json_rpc_notification_format() {
1714 let notification =
1715 crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
1716 let s = serde_json::to_string(¬ification).unwrap();
1717 assert!(s.contains("\"method\":\"output\""));
1718 assert!(!s.contains("\"id\""));
1719 }
1720
1721 #[test]
1722 fn test_json_rpc_error_response_parsing() {
1723 let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
1724 assert!(response.get("error").is_some());
1725 assert_eq!(
1726 response["error"]["message"].as_str().unwrap(),
1727 "Invalid request"
1728 );
1729 }
1730
1731 #[test]
1732 fn test_json_rpc_success_response_parsing() {
1733 let response = crate::jsonrpc::response(
1734 1,
1735 serde_json::json!({
1736 "text": "Hello world",
1737 "input_tokens": 10,
1738 "output_tokens": 5,
1739 }),
1740 );
1741 assert!(response.get("result").is_some());
1742 assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
1743 }
1744
1745 #[test]
1746 fn test_cancelled_flag() {
1747 let cancelled = Arc::new(AtomicBool::new(false));
1748 assert!(!cancelled.load(Ordering::SeqCst));
1749 cancelled.store(true, Ordering::SeqCst);
1750 assert!(cancelled.load(Ordering::SeqCst));
1751 }
1752
1753 #[test]
1754 fn pending_host_calls_return_when_cancellation_arrives() {
1755 let runtime = tokio::runtime::Builder::new_current_thread()
1756 .enable_all()
1757 .build()
1758 .unwrap();
1759 runtime.block_on(async {
1760 let pending = Arc::new(Mutex::new(HashMap::new()));
1761 let cancelled = Arc::new(AtomicBool::new(false));
1762 let bridge = HostBridge::from_parts_with_writer(
1763 pending.clone(),
1764 cancelled.clone(),
1765 Arc::new(|_| Ok(())),
1766 1,
1767 );
1768
1769 let call = bridge.call("host/work", serde_json::json!({}));
1770 tokio::pin!(call);
1771
1772 loop {
1773 tokio::select! {
1774 result = &mut call => panic!("call completed before cancellation: {result:?}"),
1775 _ = tokio::task::yield_now() => {}
1776 }
1777 if !pending.lock().await.is_empty() {
1778 break;
1779 }
1780 }
1781
1782 cancelled.store(true, Ordering::SeqCst);
1783 bridge.cancel_notify.notify_waiters();
1784
1785 let result = tokio::time::timeout(Duration::from_secs(1), call)
1786 .await
1787 .expect("pending call should observe cancellation promptly");
1788 assert!(
1789 matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
1790 );
1791 assert!(pending.lock().await.is_empty());
1792 });
1793 }
1794
1795 #[test]
1796 fn call_progress_hides_non_user_visible_deltas() {
1797 let lines = Arc::new(std::sync::Mutex::new(Vec::<String>::new()));
1798 let captured = lines.clone();
1799 let bridge = HostBridge::from_parts_with_writer(
1800 Arc::new(Mutex::new(HashMap::new())),
1801 Arc::new(AtomicBool::new(false)),
1802 Arc::new(move |line| {
1803 captured
1804 .lock()
1805 .unwrap_or_else(|e| e.into_inner())
1806 .push(line.to_string());
1807 Ok(())
1808 }),
1809 1,
1810 );
1811
1812 bridge.send_call_start(
1813 "call-1",
1814 "llm",
1815 "llm_call",
1816 serde_json::json!({"stream_publicly": true}),
1817 );
1818 bridge.send_call_progress(
1819 "call-1",
1820 r#"{"verdict":"done","reasoning":"internal"}"#,
1821 1,
1822 false,
1823 );
1824
1825 let lines = lines.lock().unwrap_or_else(|e| e.into_inner());
1826 let progress: serde_json::Value =
1827 serde_json::from_str(&lines[1]).expect("call_progress notification json");
1828 let content = &progress["params"]["update"]["content"];
1829 assert_eq!(
1830 content["delta"],
1831 r#"{"verdict":"done","reasoning":"internal"}"#
1832 );
1833 assert_eq!(content["user_visible"], false);
1834 assert_eq!(content["visible_text"], "");
1835 assert_eq!(content["visible_delta"], "");
1836 }
1837
1838 #[test]
1839 fn call_progress_hides_non_public_streams() {
1840 let lines = Arc::new(std::sync::Mutex::new(Vec::<String>::new()));
1841 let captured = lines.clone();
1842 let bridge = HostBridge::from_parts_with_writer(
1843 Arc::new(Mutex::new(HashMap::new())),
1844 Arc::new(AtomicBool::new(false)),
1845 Arc::new(move |line| {
1846 captured
1847 .lock()
1848 .unwrap_or_else(|e| e.into_inner())
1849 .push(line.to_string());
1850 Ok(())
1851 }),
1852 1,
1853 );
1854
1855 bridge.send_call_start(
1856 "call-1",
1857 "llm",
1858 "llm_call",
1859 serde_json::json!({"stream_publicly": false}),
1860 );
1861 bridge.send_call_progress("call-1", "secret schema bytes", 1, true);
1862
1863 let lines = lines.lock().unwrap_or_else(|e| e.into_inner());
1864 let progress: serde_json::Value =
1865 serde_json::from_str(&lines[1]).expect("call_progress notification json");
1866 let content = &progress["params"]["update"]["content"];
1867 assert_eq!(content["delta"], "secret schema bytes");
1868 assert_eq!(content["user_visible"], true);
1869 assert_eq!(content["visible_text"], "");
1870 assert_eq!(content["visible_delta"], "");
1871 }
1872
1873 #[test]
1874 fn queued_messages_are_filtered_by_delivery_mode() {
1875 let runtime = tokio::runtime::Builder::new_current_thread()
1876 .enable_all()
1877 .build()
1878 .unwrap();
1879 runtime.block_on(async {
1880 let bridge = test_bridge();
1881 bridge
1882 .push_queued_user_message("first".to_string(), "finish_step")
1883 .await;
1884 bridge
1885 .push_queued_user_message("second".to_string(), "audit_only")
1886 .await;
1887
1888 let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1889 assert_eq!(finish_step.len(), 1);
1890 assert_eq!(finish_step[0].content, "first");
1891
1892 let audit_only = bridge.take_queued_user_messages(false, false, true).await;
1893 assert_eq!(audit_only.len(), 1);
1894 assert_eq!(audit_only[0].content, "second");
1895 });
1896 }
1897
1898 #[test]
1899 fn pending_user_messages_support_revoke_replace_and_delivery_states() {
1900 let runtime = tokio::runtime::Builder::new_current_thread()
1901 .enable_all()
1902 .build()
1903 .unwrap();
1904 runtime.block_on(async {
1905 let bridge = test_bridge();
1906 let first_id = bridge
1907 .push_pending_user_message(
1908 "first".to_string(),
1909 serde_json::json!("first"),
1910 "audit_only",
1911 )
1912 .await;
1913 let second_id = bridge
1914 .push_pending_user_message(
1915 "second".to_string(),
1916 serde_json::json!("second"),
1917 "audit_only",
1918 )
1919 .await;
1920
1921 assert_eq!(
1922 bridge
1923 .replace_pending_user_message(
1924 &second_id,
1925 "second edited".to_string(),
1926 serde_json::json!("second edited"),
1927 )
1928 .await,
1929 PendingUserMessageMutationResult::Mutated
1930 );
1931 assert_eq!(
1932 bridge.revoke_pending_user_message(&first_id).await,
1933 PendingUserMessageMutationResult::Mutated
1934 );
1935 assert_eq!(
1936 bridge.revoke_pending_user_message(&first_id).await,
1937 PendingUserMessageMutationResult::AlreadyRevoked
1938 );
1939
1940 let delivered = bridge
1941 .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1942 .await;
1943 assert_eq!(delivered.len(), 1);
1944 assert_eq!(delivered[0].message_id, second_id);
1945 assert_eq!(delivered[0].content, "second edited");
1946
1947 assert_eq!(
1948 bridge.revoke_pending_user_message(&second_id).await,
1949 PendingUserMessageMutationResult::AlreadyDelivered
1950 );
1951 assert_eq!(
1952 bridge
1953 .replace_pending_user_message(
1954 &second_id,
1955 "too late".to_string(),
1956 serde_json::json!("too late"),
1957 )
1958 .await,
1959 PendingUserMessageMutationResult::AlreadyDelivered
1960 );
1961 assert_eq!(
1962 bridge.revoke_pending_user_message("missing").await,
1963 PendingUserMessageMutationResult::UnknownMessageId
1964 );
1965 });
1966 }
1967
1968 #[test]
1969 fn pending_user_message_replace_preserves_fifo_position_and_mode() {
1970 let runtime = tokio::runtime::Builder::new_current_thread()
1971 .enable_all()
1972 .build()
1973 .unwrap();
1974 runtime.block_on(async {
1975 let bridge = test_bridge();
1976 let first_id = bridge
1977 .push_pending_user_message(
1978 "first".to_string(),
1979 serde_json::json!("first"),
1980 "finish_step",
1981 )
1982 .await;
1983 let second_id = bridge
1984 .push_pending_user_message(
1985 "second".to_string(),
1986 serde_json::json!("second"),
1987 "finish_step",
1988 )
1989 .await;
1990 assert_eq!(
1991 bridge
1992 .replace_pending_user_message(
1993 &first_id,
1994 "first edited".to_string(),
1995 serde_json::json!("first edited"),
1996 )
1997 .await,
1998 PendingUserMessageMutationResult::Mutated
1999 );
2000
2001 let delivered = bridge
2002 .take_queued_user_messages_for(DeliveryCheckpoint::AfterCurrentOperation)
2003 .await;
2004 assert_eq!(
2005 delivered
2006 .iter()
2007 .map(|message| (&message.message_id, message.content.as_str(), message.mode))
2008 .collect::<Vec<_>>(),
2009 vec![
2010 (&first_id, "first edited", QueuedUserMessageMode::FinishStep,),
2011 (&second_id, "second", QueuedUserMessageMode::FinishStep),
2012 ]
2013 );
2014 });
2015 }
2016
2017 #[test]
2018 fn pending_user_message_state_survives_bridge_replacement() {
2019 let runtime = tokio::runtime::Builder::new_current_thread()
2020 .enable_all()
2021 .build()
2022 .unwrap();
2023 runtime.block_on(async {
2024 let bridge = test_bridge();
2025 let revoked_id = bridge
2026 .push_pending_user_message(
2027 "revoke me".to_string(),
2028 serde_json::json!("revoke me"),
2029 "audit_only",
2030 )
2031 .await;
2032 let delivered_id = bridge
2033 .push_pending_user_message(
2034 "deliver me".to_string(),
2035 serde_json::json!("deliver me"),
2036 "audit_only",
2037 )
2038 .await;
2039 assert_eq!(
2040 bridge.revoke_pending_user_message(&revoked_id).await,
2041 PendingUserMessageMutationResult::Mutated
2042 );
2043 bridge.cancelled.store(true, Ordering::SeqCst);
2044
2045 let replacement_bridge = test_bridge_sharing_injection_state(&bridge);
2046 assert_eq!(
2047 replacement_bridge
2048 .revoke_pending_user_message(&revoked_id)
2049 .await,
2050 PendingUserMessageMutationResult::AlreadyRevoked
2051 );
2052 let delivered = replacement_bridge
2053 .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
2054 .await;
2055 assert_eq!(delivered.len(), 1);
2056 assert_eq!(delivered[0].message_id, delivered_id);
2057 assert_eq!(delivered[0].content, "deliver me");
2058 assert_eq!(
2059 bridge.revoke_pending_user_message(&delivered_id).await,
2060 PendingUserMessageMutationResult::AlreadyDelivered
2061 );
2062 });
2063 }
2064
2065 #[test]
2066 fn queued_transcript_injections_preserve_user_reminder_separation() {
2067 let runtime = tokio::runtime::Builder::new_current_thread()
2068 .enable_all()
2069 .build()
2070 .unwrap();
2071 runtime.block_on(async {
2072 let bridge = test_bridge();
2073 bridge
2074 .push_queued_user_message("human follow-up".to_string(), "finish_step")
2075 .await;
2076 let reminder_id = bridge
2077 .push_queued_session_remind_from_params(&serde_json::json!({
2078 "body": "Host-provided ambient context.",
2079 "tags": ["host"],
2080 "dedupe_key": "host-context",
2081 "ttl_turns": 2,
2082 "mode": "audit_only",
2083 "_meta": {"harn": {"source": "test"}},
2084 }))
2085 .await
2086 .expect("valid reminder");
2087
2088 let finish_step = bridge.take_queued_user_messages(false, true, false).await;
2089 assert_eq!(finish_step.len(), 1);
2090 assert_eq!(finish_step[0].content, "human follow-up");
2091
2092 let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
2093 assert!(no_user_messages.is_empty());
2094
2095 let injections = bridge
2096 .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
2097 .await;
2098 assert_eq!(injections.len(), 1);
2099 let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
2100 panic!("expected queued reminder");
2101 };
2102 assert_eq!(reminder.reminder.id, reminder_id);
2103 assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
2104 assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
2105 assert_eq!(
2106 reminder.reminder.dedupe_key.as_deref(),
2107 Some("host-context")
2108 );
2109 assert_eq!(reminder.reminder.ttl_turns, Some(2));
2110 assert_eq!(
2111 reminder.reminder.source,
2112 crate::llm::helpers::ReminderSource::Bridge
2113 );
2114 });
2115 }
2116
2117 #[test]
2118 fn pending_injections_list_user_messages_and_reminders_in_fifo_order() {
2119 let runtime = tokio::runtime::Builder::new_current_thread()
2120 .enable_all()
2121 .build()
2122 .unwrap();
2123 runtime.block_on(async {
2124 let bridge = test_bridge();
2125 let message_id = bridge
2126 .push_pending_user_message(
2127 "human follow-up".to_string(),
2128 serde_json::json!([{"type": "text", "text": "human follow-up"}]),
2129 "finish_step",
2130 )
2131 .await;
2132 let reminder_id = bridge
2133 .push_queued_session_remind_from_params(&serde_json::json!({
2134 "id": "rem-test",
2135 "body": "Host reminder",
2136 "tags": ["host"],
2137 "dedupe_key": "host-reminder",
2138 "ttl_turns": 2,
2139 "mode": "interrupt_immediate",
2140 }))
2141 .await
2142 .expect("valid session/remind payload");
2143
2144 let pending = bridge.pending_injections_json().await;
2145 assert_eq!(pending["pendingCount"], 2);
2146 assert_eq!(pending["injections"][0]["kind"], "user");
2147 assert_eq!(pending["injections"][0]["id"], message_id);
2148 assert_eq!(pending["injections"][0]["messageId"], message_id);
2149 assert_eq!(pending["injections"][0]["mode"], "finish_step");
2150 assert_eq!(pending["injections"][0]["position"], 0);
2151 assert_eq!(pending["injections"][1]["kind"], "reminder");
2152 assert_eq!(pending["injections"][1]["id"], reminder_id);
2153 assert_eq!(pending["injections"][1]["reminderId"], "rem-test");
2154 assert_eq!(pending["injections"][1]["mode"], "interrupt_immediate");
2155 assert_eq!(pending["injections"][1]["body"], "Host reminder");
2156 assert_eq!(pending["injections"][1]["dedupeKey"], "host-reminder");
2157 assert_eq!(pending["injections"][1]["ttlTurns"], 2);
2158 assert_eq!(pending["injections"][1]["position"], 1);
2159 });
2160 }
2161
2162 #[test]
2163 fn pending_reminders_support_revoke_and_delivery_states() {
2164 let runtime = tokio::runtime::Builder::new_current_thread()
2165 .enable_all()
2166 .build()
2167 .unwrap();
2168 runtime.block_on(async {
2169 let bridge = test_bridge();
2170 let revoked_id = bridge
2171 .push_queued_session_remind_from_params(&serde_json::json!({
2172 "id": "rem-revoke",
2173 "body": "remove me",
2174 "mode": "finish_step",
2175 }))
2176 .await
2177 .expect("valid session/remind payload");
2178 let delivered_id = bridge
2179 .push_queued_session_remind_from_params(&serde_json::json!({
2180 "id": "rem-deliver",
2181 "body": "deliver me",
2182 "mode": "finish_step",
2183 }))
2184 .await
2185 .expect("valid session/remind payload");
2186
2187 assert_eq!(
2188 bridge.revoke_pending_reminder(&revoked_id).await,
2189 PendingReminderMutationResult::Mutated
2190 );
2191 assert_eq!(
2192 bridge.revoke_pending_reminder(&revoked_id).await,
2193 PendingReminderMutationResult::AlreadyRevoked
2194 );
2195
2196 let pending = bridge.pending_injections_json().await;
2197 assert_eq!(pending["pendingCount"], 1);
2198 assert_eq!(pending["injections"][0]["reminderId"], delivered_id);
2199
2200 let delivered = bridge
2201 .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
2202 .await;
2203 assert_eq!(delivered.len(), 1);
2204 let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
2205 panic!("expected delivered reminder");
2206 };
2207 assert_eq!(reminder.reminder.id, delivered_id);
2208
2209 assert_eq!(
2210 bridge.revoke_pending_reminder(&delivered_id).await,
2211 PendingReminderMutationResult::AlreadyDelivered
2212 );
2213 assert_eq!(
2214 bridge.revoke_pending_reminder("missing").await,
2215 PendingReminderMutationResult::UnknownReminderId
2216 );
2217 });
2218 }
2219
/// Queues one `session/remind` payload per `mode` string and checks that the
/// reminder is withheld at an unrelated checkpoint, then handed out exactly
/// once, with its body intact, at the checkpoint paired with that mode.
#[test]
fn bridge_remind_modes_honor_delivery_checkpoints() {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();

    rt.block_on(async {
        // Each row: (mode string sent by the host, checkpoint that must release
        // the reminder, checkpoint at which it must still be held back).
        let scenarios = [
            (
                "interrupt_immediate",
                DeliveryCheckpoint::InterruptImmediate,
                DeliveryCheckpoint::AfterCurrentOperation,
            ),
            (
                "finish_step",
                DeliveryCheckpoint::AfterCurrentOperation,
                DeliveryCheckpoint::EndOfInteraction,
            ),
            (
                "audit_only",
                DeliveryCheckpoint::EndOfInteraction,
                DeliveryCheckpoint::InterruptImmediate,
            ),
        ];

        for (mode, expected_checkpoint, wrong_checkpoint) in scenarios {
            // Built once so the payload and the final assertion share one value.
            let expected_body = format!("Reminder for {mode}");
            let params = serde_json::json!({
                "body": expected_body.as_str(),
                "mode": mode,
            });

            // A fresh bridge per scenario keeps the queues independent.
            let bridge = test_bridge();
            bridge
                .push_queued_session_remind_from_params(&params)
                .await
                .expect("valid session/remind payload");

            let held_back = bridge
                .take_queued_transcript_injections_for(wrong_checkpoint)
                .await;
            assert!(
                held_back.is_empty(),
                "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
            );

            let released = bridge
                .take_queued_transcript_injections_for(expected_checkpoint)
                .await;
            assert_eq!(released.len(), 1, "{mode} reminder was not delivered");
            match &released[0] {
                QueuedTranscriptInjection::Reminder(entry) => {
                    assert_eq!(entry.reminder.body, expected_body);
                }
                _ => panic!("expected reminder for {mode}"),
            }
        }
    });
}
2274
/// A payload that supplies `content` instead of `body` must be rejected with
/// the `ReminderInvalidShape` code, and the error text must mention `body`.
#[test]
fn session_remind_validation_rejects_user_message_shape() {
    let params = serde_json::json!({
        "content": "this is still a user message",
        "mode": "interrupt_immediate",
    });

    let err = queued_session_remind_from_params(&params)
        .expect_err("session/remind must require a reminder body");

    let expected_code = Code::ReminderInvalidShape.as_str();
    assert!(err.contains(expected_code));
    assert!(err.contains("body"));
}
2285
/// An otherwise valid payload carrying an unrecognised top-level key must be
/// rejected with the `ReminderUnknownOption` code, and the error text must
/// name the offending key.
#[test]
fn session_remind_validation_rejects_unknown_options_separately() {
    let params = serde_json::json!({
        "body": "valid body",
        "unknown_host_field": true,
    });

    let err = queued_session_remind_from_params(&params)
        .expect_err("session/remind must reject unknown top-level fields");

    let expected_code = Code::ReminderUnknownOption.as_str();
    assert!(err.contains(expected_code));
    assert!(err.contains("unknown_host_field"));
}
2296
/// A `propagate` value of `"workspace"` must be rejected with the dedicated
/// `ReminderUnknownPropagate` code, and the error text must mention
/// `propagate`.
#[test]
fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
    let params = serde_json::json!({
        "body": "valid body",
        "propagate": "workspace",
    });

    let err = queued_session_remind_from_params(&params)
        .expect_err("session/remind must reject unknown propagate values");

    let expected_code = Code::ReminderUnknownPropagate.as_str();
    assert!(err.contains(expected_code));
    assert!(err.contains("propagate"));
}
2307
/// A JSON string converts to a VM value whose `display()` is the raw text.
#[test]
fn test_json_result_to_vm_value_string() {
    let input = serde_json::json!("hello");
    let converted = json_result_to_vm_value(&input);
    let rendered = converted.display();
    assert_eq!(rendered, "hello");
}
2314
/// A flat JSON object converts to `VmValue::Dict`, with each entry reachable
/// under its original key and rendering via `display()` as the source value.
#[test]
fn test_json_result_to_vm_value_dict() {
    let val = serde_json::json!({"name": "test", "count": 42});
    let vm_val = json_result_to_vm_value(&val);
    // `panic!` rather than `unreachable!`: a wrong variant is an ordinary test
    // failure, and the message should not be prefixed with
    // "internal error: entered unreachable code".
    let VmValue::Dict(d) = &vm_val else {
        panic!("Expected Dict, got {vm_val:?}");
    };
    assert_eq!(
        d.get("name").expect("dict should contain `name`").display(),
        "test"
    );
    assert_eq!(
        d.get("count")
            .expect("dict should contain `count`")
            .display(),
        "42"
    );
}
2325
/// JSON `null` converts to `VmValue::Nil`.
#[test]
fn test_json_result_to_vm_value_null() {
    let input = serde_json::json!(null);
    let converted = json_result_to_vm_value(&input);
    let is_nil = matches!(converted, VmValue::Nil);
    assert!(is_nil);
}
2332
/// A nested JSON object converts to `VmValue::Dict`: the string field keeps
/// its text, the array field becomes a `VmValue::List` of the same length, and
/// the numeric fields render as their decimal values.
#[test]
fn test_json_result_to_vm_value_nested() {
    let val = serde_json::json!({
        "text": "response",
        "tool_calls": [
            {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
        ],
        "input_tokens": 100,
        "output_tokens": 50,
    });
    let vm_val = json_result_to_vm_value(&val);
    // `panic!` rather than `unreachable!`: a wrong variant is an ordinary test
    // failure, not a broken internal invariant.
    let VmValue::Dict(d) = &vm_val else {
        panic!("Expected Dict, got {vm_val:?}");
    };
    assert_eq!(
        d.get("text").expect("dict should contain `text`").display(),
        "response"
    );

    let tool_calls = d
        .get("tool_calls")
        .expect("dict should contain `tool_calls`");
    let VmValue::List(list) = tool_calls else {
        panic!("Expected List for tool_calls");
    };
    assert_eq!(list.len(), 1);

    // The numeric fields of the fixture were previously never checked.
    assert_eq!(
        d.get("input_tokens")
            .expect("dict should contain `input_tokens`")
            .display(),
        "100"
    );
    assert_eq!(
        d.get("output_tokens")
            .expect("dict should contain `output_tokens`")
            .display(),
        "50"
    );
}
2353
/// A response shaped as `{"tools": [...]}` parses into a one-element list
/// that keeps the tool's `name` and reports `deprecated` as `false` when the
/// host did not send that field.
#[test]
fn parse_host_tools_list_accepts_object_wrapper() {
    let response = serde_json::json!({
        "tools": [
            {
                "name": "Read",
                "description": "Read a file",
                "schema": {"type": "object"},
            }
        ]
    });

    let tools = parse_host_tools_list_response(response).expect("tool list");
    assert_eq!(tools.len(), 1);

    let tool = &tools[0];
    assert_eq!(tool["name"], "Read");
    assert_eq!(tool["deprecated"], false);
}
2371
/// A response nested under `result` that uses `short_description` and
/// `input_schema` is accepted; the parsed entry exposes those values as
/// `description` and `schema`, and keeps an explicit `deprecated: true`.
#[test]
fn parse_host_tools_list_accepts_compat_fields() {
    let response = serde_json::json!({
        "result": {
            "tools": [
                {
                    "name": "Edit",
                    "short_description": "Apply an edit",
                    "input_schema": {"type": "object"},
                    "deprecated": true,
                }
            ]
        }
    });

    let tools = parse_host_tools_list_response(response).expect("tool list");

    let tool = &tools[0];
    assert_eq!(tool["description"], "Apply an edit");
    assert_eq!(tool["schema"]["type"], "object");
    assert_eq!(tool["deprecated"], true);
}
2392
/// A tool entry without a `name` makes parsing fail, and the error text states
/// that every tool must include a string `name`.
#[test]
fn parse_host_tools_list_requires_tool_names() {
    let response = serde_json::json!({
        "tools": [
            {"description": "missing name"}
        ]
    });

    let err = parse_host_tools_list_response(response).expect_err("expected error");

    let message = err.to_string();
    assert!(message.contains("host/tools/list: every tool must include a string `name`"));
}
2405
/// Pins `DEFAULT_TIMEOUT` at 300 seconds.
#[test]
fn test_timeout_duration() {
    let timeout_secs = DEFAULT_TIMEOUT.as_secs();
    assert_eq!(timeout_secs, 300);
}
2410}