1use std::collections::{BTreeMap, HashMap, VecDeque};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::io::AsyncBufReadExt;
16use tokio::sync::{oneshot, Mutex, Notify};
17
18use harn_parser::diagnostic_codes::Code;
19
20use crate::orchestration::MutationSessionRecord;
21use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
22use crate::visible_text::VisibleTextState;
23use crate::vm::Vm;
24
/// Longest time a bridge request waits for the host to answer (five minutes).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Thread-safe sink used to emit one serialized bridge message per call.
///
/// The callback receives the message text and reports a failure as a
/// human-readable string.
pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
29
30fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
31 Arc::new(move |line: &str| {
32 let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
33 let mut stdout = std::io::stdout().lock();
34 stdout
35 .write_all(line.as_bytes())
36 .map_err(|e| format!("Bridge write error: {e}"))?;
37 stdout
38 .write_all(b"\n")
39 .map_err(|e| format!("Bridge write error: {e}"))?;
40 stdout
41 .flush()
42 .map_err(|e| format!("Bridge flush error: {e}"))?;
43 Ok(())
44 })
45}
46
47pub struct HostBridge {
54 next_id: AtomicU64,
55 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
57 cancelled: Arc<AtomicBool>,
59 cancel_notify: Arc<Notify>,
61 writer: HostBridgeWriter,
63 session_id: std::sync::Mutex<String>,
65 script_name: std::sync::Mutex<String>,
67 queued_transcript_injections: Arc<Mutex<VecDeque<QueuedTranscriptInjection>>>,
69 resume_requested: Arc<AtomicBool>,
71 skills_reload_requested: Arc<AtomicBool>,
76 daemon_idle: Arc<AtomicBool>,
78 prompt_stop_reason: std::sync::Mutex<Option<String>>,
84 visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
86 visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
88 in_process: Option<InProcessHost>,
90}
91
92struct InProcessHost {
93 module_path: PathBuf,
94 exported_functions: BTreeMap<String, Rc<VmClosure>>,
95 vm: Vm,
96}
97
98impl InProcessHost {
99 async fn dispatch(
100 &self,
101 method: &str,
102 params: serde_json::Value,
103 ) -> Result<serde_json::Value, VmError> {
104 match method {
105 "builtin_call" => {
106 let name = params
107 .get("name")
108 .and_then(|value| value.as_str())
109 .unwrap_or_default();
110 let args = params
111 .get("args")
112 .and_then(|value| value.as_array())
113 .cloned()
114 .unwrap_or_default()
115 .into_iter()
116 .map(|value| json_result_to_vm_value(&value))
117 .collect::<Vec<_>>();
118 self.invoke_export(name, &args).await
119 }
120 "host/tools/list" => self
121 .invoke_optional_export("host_tools_list", &[])
122 .await
123 .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
124 "session/request_permission" => self.request_permission(params).await,
125 other => Err(VmError::Runtime(format!(
126 "playground host backend does not implement bridge method '{other}'"
127 ))),
128 }
129 }
130
131 async fn invoke_export(
132 &self,
133 name: &str,
134 args: &[VmValue],
135 ) -> Result<serde_json::Value, VmError> {
136 let Some(closure) = self.exported_functions.get(name) else {
137 return Err(VmError::Runtime(format!(
138 "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
139 self.module_path.display()
140 )));
141 };
142
143 let mut vm = self.vm.child_vm_for_host();
144 let result = vm.call_closure_pub(closure, args).await?;
145 Ok(crate::llm::vm_value_to_json(&result))
146 }
147
148 async fn invoke_optional_export(
149 &self,
150 name: &str,
151 args: &[VmValue],
152 ) -> Result<Option<serde_json::Value>, VmError> {
153 if !self.exported_functions.contains_key(name) {
154 return Ok(None);
155 }
156 self.invoke_export(name, args).await.map(Some)
157 }
158
159 async fn request_permission(
160 &self,
161 params: serde_json::Value,
162 ) -> Result<serde_json::Value, VmError> {
163 let Some(closure) = self.exported_functions.get("request_permission") else {
164 return Ok(serde_json::json!({ "granted": true }));
165 };
166
167 let tool_name = params
168 .get("toolCall")
169 .and_then(|tool_call| tool_call.get("toolName"))
170 .and_then(|value| value.as_str())
171 .unwrap_or_default();
172 let tool_args = params
173 .get("toolCall")
174 .and_then(|tool_call| tool_call.get("rawInput"))
175 .map(json_result_to_vm_value)
176 .unwrap_or(VmValue::Nil);
177 let full_payload = json_result_to_vm_value(¶ms);
178
179 let arg_count = closure.func.params.len();
180 let args = if arg_count >= 3 {
181 vec![
182 VmValue::String(Rc::from(tool_name.to_string())),
183 tool_args,
184 full_payload,
185 ]
186 } else if arg_count == 2 {
187 vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
188 } else if arg_count == 1 {
189 vec![full_payload]
190 } else {
191 Vec::new()
192 };
193
194 let mut vm = self.vm.child_vm_for_host();
195 let result = vm.call_closure_pub(closure, &args).await?;
196 let payload = match result {
197 VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
198 VmValue::String(reason) if !reason.is_empty() => {
199 serde_json::json!({ "granted": false, "reason": reason.to_string() })
200 }
201 other => {
202 let json = crate::llm::vm_value_to_json(&other);
203 if json
204 .get("granted")
205 .and_then(|value| value.as_bool())
206 .is_some()
207 || json.get("outcome").is_some()
208 {
209 json
210 } else {
211 serde_json::json!({ "granted": other.is_truthy() })
212 }
213 }
214 };
215 Ok(payload)
216 }
217}
218
/// Delivery timing requested for a queued message or reminder.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
    /// Selected by the `include_interrupt_immediate` flag of the `take_queued_*` methods.
    InterruptImmediate,
    /// Selected by the `include_finish_step` flag of the `take_queued_*` methods.
    FinishStep,
    /// Selected by the `include_wait_for_completion` flag; also the parsing fallback.
    WaitForCompletion,
}
225
/// Point in an interaction at which queued items are collected.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
    /// Collects items queued with [`QueuedUserMessageMode::InterruptImmediate`].
    InterruptImmediate,
    /// Collects items queued with [`QueuedUserMessageMode::FinishStep`].
    AfterCurrentOperation,
    /// Collects items queued with [`QueuedUserMessageMode::WaitForCompletion`].
    EndOfInteraction,
}
232
233impl QueuedUserMessageMode {
234 fn from_str(value: &str) -> Self {
235 match value {
236 "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
237 "finish_step" | "after_current_operation" => Self::FinishStep,
238 _ => Self::WaitForCompletion,
239 }
240 }
241}
242
243#[derive(Clone, Debug, PartialEq, Eq)]
244pub struct QueuedUserMessage {
245 pub content: String,
246 pub mode: QueuedUserMessageMode,
247}
248
249#[derive(Clone, Debug, PartialEq, Eq)]
250pub struct QueuedReminder {
251 pub reminder: crate::llm::helpers::SystemReminder,
252 pub mode: QueuedUserMessageMode,
253}
254
255#[derive(Clone, Debug, PartialEq, Eq)]
256pub enum QueuedTranscriptInjection {
257 User(QueuedUserMessage),
258 Reminder(QueuedReminder),
259}
260
261impl QueuedTranscriptInjection {
262 fn mode(&self) -> QueuedUserMessageMode {
263 match self {
264 Self::User(message) => message.mode,
265 Self::Reminder(reminder) => reminder.mode,
266 }
267 }
268}
269
270fn queue_user_message_from_params(params: &serde_json::Value) -> Option<QueuedUserMessage> {
271 let content = params
272 .get("content")
273 .and_then(|v| v.as_str())
274 .unwrap_or("")
275 .to_string();
276 if content.is_empty() {
277 return None;
278 }
279 let mode = QueuedUserMessageMode::from_str(
280 params
281 .get("mode")
282 .and_then(|v| v.as_str())
283 .unwrap_or("wait_for_completion"),
284 );
285 Some(QueuedUserMessage { content, mode })
286}
287
288fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
289 format!(
290 "{}: {}",
291 Code::ReminderUnknownOption.as_str(),
292 message.as_ref()
293 )
294}
295
296fn session_remind_shape_error(message: impl AsRef<str>) -> String {
297 format!(
298 "{}: {}",
299 Code::ReminderInvalidShape.as_str(),
300 message.as_ref()
301 )
302}
303
304fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
305 format!(
306 "{}: {}",
307 Code::ReminderUnknownPropagate.as_str(),
308 message.as_ref()
309 )
310}
311
312fn string_field(
313 map: &serde_json::Map<String, serde_json::Value>,
314 key: &str,
315 required: bool,
316) -> Result<Option<String>, String> {
317 match map.get(key) {
318 None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
319 format!("`{key}` must be a non-empty string"),
320 )),
321 None | Some(serde_json::Value::Null) => Ok(None),
322 Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
323 session_remind_shape_error(format!("`{key}` must be a non-empty string")),
324 ),
325 Some(serde_json::Value::String(value)) => {
326 let trimmed = value.trim();
327 if trimmed.is_empty() {
328 Ok(None)
329 } else {
330 Ok(Some(trimmed.to_string()))
331 }
332 }
333 Some(other) => Err(session_remind_shape_error(format!(
334 "`{key}` must be a string, got {other}"
335 ))),
336 }
337}
338
339fn bool_field(
340 map: &serde_json::Map<String, serde_json::Value>,
341 key: &str,
342) -> Result<Option<bool>, String> {
343 match map.get(key) {
344 None | Some(serde_json::Value::Null) => Ok(None),
345 Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
346 Some(other) => Err(session_remind_shape_error(format!(
347 "`{key}` must be a bool, got {other}"
348 ))),
349 }
350}
351
352fn int_field(
353 map: &serde_json::Map<String, serde_json::Value>,
354 key: &str,
355) -> Result<Option<i64>, String> {
356 match map.get(key) {
357 None | Some(serde_json::Value::Null) => Ok(None),
358 Some(serde_json::Value::Number(value)) => {
359 let Some(value) = value.as_i64() else {
360 return Err(session_remind_shape_error(format!(
361 "`{key}` must be an integer"
362 )));
363 };
364 Ok(Some(value))
365 }
366 Some(other) => Err(session_remind_shape_error(format!(
367 "`{key}` must be an int, got {other}"
368 ))),
369 }
370}
371
372fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
373 let Some(value) = map.get("tags") else {
374 return Ok(Vec::new());
375 };
376 if value.is_null() {
377 return Ok(Vec::new());
378 }
379 let Some(values) = value.as_array() else {
380 return Err(session_remind_shape_error("`tags` must be a list"));
381 };
382 let mut tags = Vec::new();
383 for value in values {
384 let Some(tag) = value.as_str() else {
385 return Err(session_remind_shape_error(format!(
386 "`tags` entries must be strings, got {value}"
387 )));
388 };
389 let tag = tag.trim();
390 if tag.is_empty() {
391 return Err(session_remind_shape_error(
392 "`tags` entries must be non-empty strings",
393 ));
394 }
395 if !tags.iter().any(|existing| existing == tag) {
396 tags.push(tag.to_string());
397 }
398 }
399 Ok(tags)
400}
401
402fn session_remind_payload_from_value(
403 value: &serde_json::Value,
404) -> Result<crate::llm::helpers::SystemReminder, String> {
405 let Some(map) = value.as_object() else {
406 return Err(session_remind_shape_error(
407 "session/remind payload must be a reminder object",
408 ));
409 };
410 const ALLOWED: &[&str] = &[
411 "_meta",
412 "body",
413 "dedupe_key",
414 "fired_at_turn",
415 "id",
416 "preserve_on_compact",
417 "propagate",
418 "role_hint",
419 "source",
420 "tags",
421 "ttl_turns",
422 ];
423 let unknown = map
424 .keys()
425 .filter(|key| !ALLOWED.contains(&key.as_str()))
426 .map(String::as_str)
427 .collect::<Vec<_>>();
428 if !unknown.is_empty() {
429 if unknown.contains(&"content") {
430 return Err(session_remind_shape_error(
431 "session/remind expects reminder `body`, not user-message `content`",
432 ));
433 }
434 return Err(reminder_unknown_option_error(format!(
435 "unknown reminder option(s): {}",
436 unknown.join(", ")
437 )));
438 }
439 if let Some(meta) = map.get("_meta") {
440 if !meta.is_null() && !meta.is_object() {
441 return Err(session_remind_shape_error("`_meta` must be an object"));
442 }
443 }
444 let ttl_turns = int_field(map, "ttl_turns")?;
445 if let Some(value) = ttl_turns {
446 if value <= 0 {
447 return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
448 }
449 }
450 let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
451 if fired_at_turn < 0 {
452 return Err(session_remind_shape_error(
453 "`fired_at_turn` must be >= 0 when provided",
454 ));
455 }
456 match string_field(map, "source", false)?.as_deref() {
457 None | Some("bridge") => {}
458 Some(_) => {
459 return Err(session_remind_shape_error(
460 "`source` for session/remind must be bridge when provided",
461 ))
462 }
463 }
464 let propagate = match string_field(map, "propagate", false)?.as_deref() {
465 None => crate::llm::helpers::ReminderPropagate::Session,
466 Some("all") => crate::llm::helpers::ReminderPropagate::All,
467 Some("session") => crate::llm::helpers::ReminderPropagate::Session,
468 Some("none") => crate::llm::helpers::ReminderPropagate::None,
469 Some(_) => {
470 return Err(reminder_unknown_propagate_error(
471 "`propagate` must be one of all, session, or none",
472 ))
473 }
474 };
475 let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
476 None => crate::llm::helpers::ReminderRoleHint::System,
477 Some("system") => crate::llm::helpers::ReminderRoleHint::System,
478 Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
479 Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
480 Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
481 Some(_) => {
482 return Err(session_remind_shape_error(
483 "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
484 ))
485 }
486 };
487 Ok(crate::llm::helpers::SystemReminder {
488 id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
489 tags: tags_field(map)?,
490 dedupe_key: string_field(map, "dedupe_key", false)?,
491 ttl_turns,
492 preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
493 propagate,
494 role_hint,
495 source: crate::llm::helpers::ReminderSource::Bridge,
496 body: string_field(map, "body", true)?.unwrap_or_default(),
497 fired_at_turn,
498 originating_agent_id: None,
499 })
500}
501
502fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
503 let mode = QueuedUserMessageMode::from_str(
504 params
505 .get("mode")
506 .and_then(|value| value.as_str())
507 .unwrap_or("wait_for_completion"),
508 );
509 let reminder_value = if let Some(reminder) = params.get("reminder") {
510 reminder.clone()
511 } else {
512 let Some(params) = params.as_object() else {
513 return Err(session_remind_shape_error(
514 "session/remind params must be an object",
515 ));
516 };
517 let mut reminder = params.clone();
518 reminder.remove("mode");
519 reminder.remove("sessionId");
520 reminder.remove("session_id");
521 serde_json::Value::Object(reminder)
522 };
523 Ok(QueuedReminder {
524 reminder: session_remind_payload_from_value(&reminder_value)?,
525 mode,
526 })
527}
528
529#[allow(clippy::new_without_default)]
531impl HostBridge {
532 pub fn new() -> Self {
537 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
538 Arc::new(Mutex::new(HashMap::new()));
539 let cancelled = Arc::new(AtomicBool::new(false));
540 let cancel_notify = Arc::new(Notify::new());
541 let queued_transcript_injections: Arc<Mutex<VecDeque<QueuedTranscriptInjection>>> =
542 Arc::new(Mutex::new(VecDeque::new()));
543 let resume_requested = Arc::new(AtomicBool::new(false));
544 let skills_reload_requested = Arc::new(AtomicBool::new(false));
545 let daemon_idle = Arc::new(AtomicBool::new(false));
546
547 let pending_clone = pending.clone();
549 let cancelled_clone = cancelled.clone();
550 let cancel_notify_clone = cancel_notify.clone();
551 let queued_clone = queued_transcript_injections.clone();
552 let resume_clone = resume_requested.clone();
553 let skills_reload_clone = skills_reload_requested.clone();
554 tokio::task::spawn_local(async move {
555 let stdin = tokio::io::stdin();
556 let reader = tokio::io::BufReader::new(stdin);
557 let mut lines = reader.lines();
558
559 while let Ok(Some(line)) = lines.next_line().await {
560 let line = line.trim().to_string();
561 if line.is_empty() {
562 continue;
563 }
564
565 let msg: serde_json::Value = match serde_json::from_str(&line) {
566 Ok(v) => v,
567 Err(_) => continue,
568 };
569
570 if msg.get("id").is_none() {
572 if let Some(method) = msg["method"].as_str() {
573 if method == "cancel" {
574 cancelled_clone.store(true, Ordering::SeqCst);
575 cancel_notify_clone.notify_waiters();
576 } else if method == "agent/resume" {
577 resume_clone.store(true, Ordering::SeqCst);
578 } else if method == "skills/update" {
579 skills_reload_clone.store(true, Ordering::SeqCst);
580 } else if method == "user_message"
581 || method == "session/input"
582 || method == "agent/user_message"
583 {
584 let params = &msg["params"];
585 if let Some(message) = queue_user_message_from_params(params) {
586 queued_clone
587 .lock()
588 .await
589 .push_back(QueuedTranscriptInjection::User(message));
590 }
591 } else if method == "session/remind" {
592 let params = &msg["params"];
593 if let Ok(reminder) = queued_session_remind_from_params(params) {
594 queued_clone
595 .lock()
596 .await
597 .push_back(QueuedTranscriptInjection::Reminder(reminder));
598 }
599 }
600 }
601 continue;
602 }
603
604 if let Some(id) = msg["id"].as_u64() {
605 let mut pending = pending_clone.lock().await;
606 if let Some(sender) = pending.remove(&id) {
607 let _ = sender.send(msg);
608 }
609 }
610 }
611
612 let mut pending = pending_clone.lock().await;
614 pending.clear();
615 });
616
617 Self {
618 next_id: AtomicU64::new(1),
619 pending,
620 cancelled,
621 cancel_notify,
622 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
623 session_id: std::sync::Mutex::new(String::new()),
624 script_name: std::sync::Mutex::new(String::new()),
625 queued_transcript_injections,
626 resume_requested,
627 skills_reload_requested,
628 daemon_idle,
629 prompt_stop_reason: std::sync::Mutex::new(None),
630 visible_call_states: std::sync::Mutex::new(HashMap::new()),
631 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
632 in_process: None,
633 }
634 }
635
636 pub fn from_parts(
642 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
643 cancelled: Arc<AtomicBool>,
644 stdout_lock: Arc<std::sync::Mutex<()>>,
645 start_id: u64,
646 ) -> Self {
647 Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
648 }
649
650 pub fn from_parts_with_writer(
651 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
652 cancelled: Arc<AtomicBool>,
653 writer: HostBridgeWriter,
654 start_id: u64,
655 ) -> Self {
656 Self::from_parts_with_writer_and_cancel_notify(
657 pending,
658 cancelled,
659 Arc::new(Notify::new()),
660 writer,
661 start_id,
662 )
663 }
664
665 pub fn from_parts_with_writer_and_cancel_notify(
666 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
667 cancelled: Arc<AtomicBool>,
668 cancel_notify: Arc<Notify>,
669 writer: HostBridgeWriter,
670 start_id: u64,
671 ) -> Self {
672 Self {
673 next_id: AtomicU64::new(start_id),
674 pending,
675 cancelled,
676 cancel_notify,
677 writer,
678 session_id: std::sync::Mutex::new(String::new()),
679 script_name: std::sync::Mutex::new(String::new()),
680 queued_transcript_injections: Arc::new(Mutex::new(VecDeque::new())),
681 resume_requested: Arc::new(AtomicBool::new(false)),
682 skills_reload_requested: Arc::new(AtomicBool::new(false)),
683 daemon_idle: Arc::new(AtomicBool::new(false)),
684 prompt_stop_reason: std::sync::Mutex::new(None),
685 visible_call_states: std::sync::Mutex::new(HashMap::new()),
686 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
687 in_process: None,
688 }
689 }
690
691 pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
694 let exported_functions = vm.load_module_exports(module_path).await?;
695 Ok(Self {
696 next_id: AtomicU64::new(1),
697 pending: Arc::new(Mutex::new(HashMap::new())),
698 cancelled: Arc::new(AtomicBool::new(false)),
699 cancel_notify: Arc::new(Notify::new()),
700 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
701 session_id: std::sync::Mutex::new(String::new()),
702 script_name: std::sync::Mutex::new(String::new()),
703 queued_transcript_injections: Arc::new(Mutex::new(VecDeque::new())),
704 resume_requested: Arc::new(AtomicBool::new(false)),
705 skills_reload_requested: Arc::new(AtomicBool::new(false)),
706 daemon_idle: Arc::new(AtomicBool::new(false)),
707 prompt_stop_reason: std::sync::Mutex::new(None),
708 visible_call_states: std::sync::Mutex::new(HashMap::new()),
709 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
710 in_process: Some(InProcessHost {
711 module_path: module_path.to_path_buf(),
712 exported_functions,
713 vm,
714 }),
715 })
716 }
717
718 pub fn set_session_id(&self, id: &str) {
720 *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
721 }
722
723 pub fn set_script_name(&self, name: &str) {
725 *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
726 }
727
728 fn get_script_name(&self) -> String {
730 self.script_name
731 .lock()
732 .unwrap_or_else(|e| e.into_inner())
733 .clone()
734 }
735
736 pub fn get_session_id(&self) -> String {
738 self.session_id
739 .lock()
740 .unwrap_or_else(|e| e.into_inner())
741 .clone()
742 }
743
744 fn write_line(&self, line: &str) -> Result<(), VmError> {
746 (self.writer)(line).map_err(VmError::Runtime)
747 }
748
749 pub async fn call(
752 &self,
753 method: &str,
754 params: serde_json::Value,
755 ) -> Result<serde_json::Value, VmError> {
756 if let Some(in_process) = &self.in_process {
757 return in_process.dispatch(method, params).await;
758 }
759
760 if self.is_cancelled() {
761 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
762 }
763
764 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
765 let cancel_wait = self.cancel_notify.notified();
766 tokio::pin!(cancel_wait);
767
768 let request = crate::jsonrpc::request(id, method, params);
769
770 let (tx, rx) = oneshot::channel();
771 {
772 let mut pending = self.pending.lock().await;
773 pending.insert(id, tx);
774 }
775
776 let line = serde_json::to_string(&request)
777 .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
778 if let Err(e) = self.write_line(&line) {
779 let mut pending = self.pending.lock().await;
780 pending.remove(&id);
781 return Err(e);
782 }
783
784 if self.is_cancelled() {
785 let mut pending = self.pending.lock().await;
786 pending.remove(&id);
787 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
788 }
789
790 let response = tokio::select! {
791 result = rx => match result {
792 Ok(msg) => msg,
793 Err(_) => {
794 return Err(VmError::Runtime(
796 "Bridge: host closed connection before responding".into(),
797 ));
798 }
799 },
800 _ = &mut cancel_wait => {
801 let mut pending = self.pending.lock().await;
802 pending.remove(&id);
803 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
804 }
805 _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
806 let mut pending = self.pending.lock().await;
807 pending.remove(&id);
808 return Err(VmError::Runtime(format!(
809 "Bridge: host did not respond to '{method}' within {}s",
810 DEFAULT_TIMEOUT.as_secs()
811 )));
812 }
813 };
814
815 if let Some(error) = response.get("error") {
816 let message = error["message"].as_str().unwrap_or("Unknown host error");
817 let code = error["code"].as_i64().unwrap_or(-1);
818 if code == -32001 {
820 return Err(VmError::CategorizedError {
821 message: message.to_string(),
822 category: ErrorCategory::ToolRejected,
823 });
824 }
825 return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
826 }
827
828 Ok(response["result"].clone())
829 }
830
831 pub fn notify(&self, method: &str, params: serde_json::Value) {
834 let notification = crate::jsonrpc::notification(method, params);
835 if self.in_process.is_some() {
836 return;
837 }
838 if let Ok(line) = serde_json::to_string(¬ification) {
839 let _ = self.write_line(&line);
840 }
841 }
842
843 pub fn is_cancelled(&self) -> bool {
845 self.cancelled.load(Ordering::SeqCst)
846 }
847
848 pub fn take_resume_signal(&self) -> bool {
849 self.resume_requested.swap(false, Ordering::SeqCst)
850 }
851
852 pub fn signal_resume(&self) {
853 self.resume_requested.store(true, Ordering::SeqCst);
854 }
855
856 pub fn set_daemon_idle(&self, idle: bool) {
857 self.daemon_idle.store(idle, Ordering::SeqCst);
858 }
859
860 pub fn is_daemon_idle(&self) -> bool {
861 self.daemon_idle.load(Ordering::SeqCst)
862 }
863
864 pub fn set_prompt_stop_reason(&self, reason: &str) {
869 *self
870 .prompt_stop_reason
871 .lock()
872 .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
873 }
874
875 pub fn take_prompt_stop_reason(&self) -> Option<String> {
880 self.prompt_stop_reason
881 .lock()
882 .unwrap_or_else(|e| e.into_inner())
883 .take()
884 }
885
886 pub fn take_skills_reload_signal(&self) -> bool {
891 self.skills_reload_requested.swap(false, Ordering::SeqCst)
892 }
893
894 pub fn signal_skills_reload(&self) {
898 self.skills_reload_requested.store(true, Ordering::SeqCst);
899 }
900
901 pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
907 let result = self.call("skills/list", serde_json::json!({})).await?;
908 match result {
909 serde_json::Value::Array(items) => Ok(items),
910 serde_json::Value::Object(map) => match map.get("skills") {
911 Some(serde_json::Value::Array(items)) => Ok(items.clone()),
912 _ => Err(VmError::Runtime(
913 "skills/list: host response must be an array or { skills: [...] }".into(),
914 )),
915 },
916 _ => Err(VmError::Runtime(
917 "skills/list: unexpected response shape".into(),
918 )),
919 }
920 }
921
922 pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
928 let result = self.call("host/tools/list", serde_json::json!({})).await?;
929 parse_host_tools_list_response(result)
930 }
931
932 pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
936 self.call("skills/fetch", serde_json::json!({ "id": id }))
937 .await
938 }
939
940 pub async fn push_queued_user_message(&self, content: String, mode: &str) {
941 self.queued_transcript_injections
942 .lock()
943 .await
944 .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
945 content,
946 mode: QueuedUserMessageMode::from_str(mode),
947 }));
948 }
949
950 pub async fn push_queued_session_remind_from_params(
951 &self,
952 params: &serde_json::Value,
953 ) -> Result<String, String> {
954 let reminder = queued_session_remind_from_params(params)?;
955 let reminder_id = reminder.reminder.id.clone();
956 self.queued_transcript_injections
957 .lock()
958 .await
959 .push_back(QueuedTranscriptInjection::Reminder(reminder));
960 Ok(reminder_id)
961 }
962
963 pub async fn take_queued_user_messages(
964 &self,
965 include_interrupt_immediate: bool,
966 include_finish_step: bool,
967 include_wait_for_completion: bool,
968 ) -> Vec<QueuedUserMessage> {
969 let mut queue = self.queued_transcript_injections.lock().await;
970 let mut selected = Vec::new();
971 let mut retained = VecDeque::new();
972 while let Some(injection) = queue.pop_front() {
973 let should_take = match injection.mode() {
974 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
975 QueuedUserMessageMode::FinishStep => include_finish_step,
976 QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
977 };
978 match (should_take, injection) {
979 (true, QueuedTranscriptInjection::User(message)) => selected.push(message),
980 (_, injection) => retained.push_back(injection),
981 }
982 }
983 *queue = retained;
984 selected
985 }
986
987 pub async fn take_queued_transcript_injections(
988 &self,
989 include_interrupt_immediate: bool,
990 include_finish_step: bool,
991 include_wait_for_completion: bool,
992 ) -> Vec<QueuedTranscriptInjection> {
993 let mut queue = self.queued_transcript_injections.lock().await;
994 let mut selected = Vec::new();
995 let mut retained = VecDeque::new();
996 while let Some(injection) = queue.pop_front() {
997 let should_take = match injection.mode() {
998 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
999 QueuedUserMessageMode::FinishStep => include_finish_step,
1000 QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
1001 };
1002 if should_take {
1003 selected.push(injection);
1004 } else {
1005 retained.push_back(injection);
1006 }
1007 }
1008 *queue = retained;
1009 selected
1010 }
1011
1012 pub async fn take_queued_user_messages_for(
1013 &self,
1014 checkpoint: DeliveryCheckpoint,
1015 ) -> Vec<QueuedUserMessage> {
1016 match checkpoint {
1017 DeliveryCheckpoint::InterruptImmediate => {
1018 self.take_queued_user_messages(true, false, false).await
1019 }
1020 DeliveryCheckpoint::AfterCurrentOperation => {
1021 self.take_queued_user_messages(false, true, false).await
1022 }
1023 DeliveryCheckpoint::EndOfInteraction => {
1024 self.take_queued_user_messages(false, false, true).await
1025 }
1026 }
1027 }
1028
1029 pub async fn take_queued_transcript_injections_for(
1030 &self,
1031 checkpoint: DeliveryCheckpoint,
1032 ) -> Vec<QueuedTranscriptInjection> {
1033 match checkpoint {
1034 DeliveryCheckpoint::InterruptImmediate => {
1035 self.take_queued_transcript_injections(true, false, false)
1036 .await
1037 }
1038 DeliveryCheckpoint::AfterCurrentOperation => {
1039 self.take_queued_transcript_injections(false, true, false)
1040 .await
1041 }
1042 DeliveryCheckpoint::EndOfInteraction => {
1043 self.take_queued_transcript_injections(false, false, true)
1044 .await
1045 }
1046 }
1047 }
1048
1049 pub fn send_output(&self, text: &str) {
1051 self.notify("output", serde_json::json!({"text": text}));
1052 }
1053
1054 pub fn send_progress(
1056 &self,
1057 phase: &str,
1058 message: &str,
1059 progress: Option<i64>,
1060 total: Option<i64>,
1061 data: Option<serde_json::Value>,
1062 ) {
1063 let mut payload = serde_json::json!({"phase": phase, "message": message});
1064 if let Some(p) = progress {
1065 payload["progress"] = serde_json::json!(p);
1066 }
1067 if let Some(t) = total {
1068 payload["total"] = serde_json::json!(t);
1069 }
1070 if let Some(d) = data {
1071 payload["data"] = d;
1072 }
1073 self.notify("progress", payload);
1074 }
1075
1076 pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1078 let mut payload = serde_json::json!({"level": level, "message": message});
1079 if let Some(f) = fields {
1080 payload["fields"] = f;
1081 }
1082 self.notify("log", payload);
1083 }
1084
1085 pub fn send_call_start(
1088 &self,
1089 call_id: &str,
1090 call_type: &str,
1091 name: &str,
1092 metadata: serde_json::Value,
1093 ) {
1094 let session_id = self.get_session_id();
1095 let script = self.get_script_name();
1096 let stream_publicly = metadata
1097 .get("stream_publicly")
1098 .and_then(|value| value.as_bool())
1099 .unwrap_or(true);
1100 self.visible_call_streams
1101 .lock()
1102 .unwrap_or_else(|e| e.into_inner())
1103 .insert(call_id.to_string(), stream_publicly);
1104 self.notify(
1105 "session/update",
1106 serde_json::json!({
1107 "sessionId": session_id,
1108 "update": {
1109 "sessionUpdate": "call_start",
1110 "content": {
1111 "toolCallId": call_id,
1112 "call_type": call_type,
1113 "name": name,
1114 "script": script,
1115 "metadata": metadata,
1116 },
1117 },
1118 }),
1119 );
1120 }
1121
1122 pub fn send_call_progress(
1125 &self,
1126 call_id: &str,
1127 delta: &str,
1128 accumulated_tokens: u64,
1129 user_visible: bool,
1130 ) {
1131 let session_id = self.get_session_id();
1132 let (visible_text, visible_delta) = {
1133 let stream_publicly = self
1134 .visible_call_streams
1135 .lock()
1136 .unwrap_or_else(|e| e.into_inner())
1137 .get(call_id)
1138 .copied()
1139 .unwrap_or(true);
1140 let mut states = self
1141 .visible_call_states
1142 .lock()
1143 .unwrap_or_else(|e| e.into_inner());
1144 let state = states.entry(call_id.to_string()).or_default();
1145 state.push(delta, stream_publicly)
1146 };
1147 self.notify(
1148 "session/update",
1149 serde_json::json!({
1150 "sessionId": session_id,
1151 "update": {
1152 "sessionUpdate": "call_progress",
1153 "content": {
1154 "toolCallId": call_id,
1155 "delta": delta,
1156 "accumulated_tokens": accumulated_tokens,
1157 "visible_text": visible_text,
1158 "visible_delta": visible_delta,
1159 "user_visible": user_visible,
1160 },
1161 },
1162 }),
1163 );
1164 }
1165
1166 pub fn send_call_end(
1168 &self,
1169 call_id: &str,
1170 call_type: &str,
1171 name: &str,
1172 duration_ms: u64,
1173 status: &str,
1174 metadata: serde_json::Value,
1175 ) {
1176 let session_id = self.get_session_id();
1177 let script = self.get_script_name();
1178 self.visible_call_states
1179 .lock()
1180 .unwrap_or_else(|e| e.into_inner())
1181 .remove(call_id);
1182 self.visible_call_streams
1183 .lock()
1184 .unwrap_or_else(|e| e.into_inner())
1185 .remove(call_id);
1186 self.notify(
1187 "session/update",
1188 serde_json::json!({
1189 "sessionId": session_id,
1190 "update": {
1191 "sessionUpdate": "call_end",
1192 "content": {
1193 "toolCallId": call_id,
1194 "call_type": call_type,
1195 "name": name,
1196 "script": script,
1197 "duration_ms": duration_ms,
1198 "status": status,
1199 "metadata": metadata,
1200 },
1201 },
1202 }),
1203 );
1204 }
1205
1206 pub fn send_worker_update(
1208 &self,
1209 worker_id: &str,
1210 worker_name: &str,
1211 status: &str,
1212 metadata: serde_json::Value,
1213 audit: Option<&MutationSessionRecord>,
1214 ) {
1215 let session_id = self.get_session_id();
1216 let script = self.get_script_name();
1217 let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1218 let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1219 let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1220 let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1221 let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1222 let lifecycle = serde_json::json!({
1223 "event": status,
1224 "worker_id": worker_id,
1225 "worker_name": worker_name,
1226 "started_at": started_at,
1227 "finished_at": finished_at,
1228 });
1229 self.notify(
1230 "session/update",
1231 serde_json::json!({
1232 "sessionId": session_id,
1233 "update": {
1234 "sessionUpdate": "worker_update",
1235 "content": {
1236 "worker_id": worker_id,
1237 "worker_name": worker_name,
1238 "status": status,
1239 "script": script,
1240 "started_at": started_at,
1241 "finished_at": finished_at,
1242 "snapshot_path": snapshot_path,
1243 "run_id": run_id,
1244 "run_path": run_path,
1245 "lifecycle": lifecycle,
1246 "audit": audit,
1247 "metadata": metadata,
1248 },
1249 },
1250 }),
1251 );
1252 }
1253}
1254
1255pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1257 crate::stdlib::json_to_vm_value(val)
1258}
1259
1260fn parse_host_tools_list_response(
1261 result: serde_json::Value,
1262) -> Result<Vec<serde_json::Value>, VmError> {
1263 let tools = match result {
1264 serde_json::Value::Array(items) => items,
1265 serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1266 map.get("result")
1267 .and_then(|value| value.get("tools"))
1268 .cloned()
1269 }) {
1270 Some(serde_json::Value::Array(items)) => items,
1271 _ => {
1272 return Err(VmError::Runtime(
1273 "host/tools/list: host response must be an array or { tools: [...] }".into(),
1274 ));
1275 }
1276 },
1277 _ => {
1278 return Err(VmError::Runtime(
1279 "host/tools/list: unexpected response shape".into(),
1280 ));
1281 }
1282 };
1283
1284 let mut normalized = Vec::with_capacity(tools.len());
1285 for tool in tools {
1286 let serde_json::Value::Object(map) = tool else {
1287 return Err(VmError::Runtime(
1288 "host/tools/list: every tool must be an object".into(),
1289 ));
1290 };
1291 let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1292 return Err(VmError::Runtime(
1293 "host/tools/list: every tool must include a string `name`".into(),
1294 ));
1295 };
1296 let description = map
1297 .get("description")
1298 .and_then(|value| value.as_str())
1299 .or_else(|| {
1300 map.get("short_description")
1301 .and_then(|value| value.as_str())
1302 })
1303 .unwrap_or_default();
1304 let schema = map
1305 .get("schema")
1306 .cloned()
1307 .or_else(|| map.get("parameters").cloned())
1308 .or_else(|| map.get("input_schema").cloned())
1309 .unwrap_or(serde_json::Value::Null);
1310 let deprecated = map
1311 .get("deprecated")
1312 .and_then(|value| value.as_bool())
1313 .unwrap_or(false);
1314 normalized.push(serde_json::json!({
1315 "name": name,
1316 "description": description,
1317 "schema": schema,
1318 "deprecated": deprecated,
1319 }));
1320 }
1321 Ok(normalized)
1322}
1323
#[cfg(test)]
mod tests {
    use super::*;

    /// Drives `future` to completion on a fresh current-thread Tokio runtime
    /// with all drivers enabled.
    fn run_async<F: std::future::Future>(future: F) -> F::Output {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to build current-thread tokio runtime")
            .block_on(future)
    }

    /// Builds a bridge with no pending calls and the cancellation flag unset.
    fn test_bridge() -> HostBridge {
        HostBridge::from_parts(
            Arc::new(Mutex::new(HashMap::new())),
            Arc::new(AtomicBool::new(false)),
            Arc::new(std::sync::Mutex::new(())),
            1,
        )
    }

    /// A serialized request carries the JSON-RPC version, id, and method.
    #[test]
    fn test_json_rpc_request_format() {
        let request = crate::jsonrpc::request(
            1,
            "llm_call",
            serde_json::json!({
                "prompt": "Hello",
                "system": "Be helpful",
            }),
        );
        let s = serde_json::to_string(&request).unwrap();
        assert!(s.contains("\"jsonrpc\":\"2.0\""));
        assert!(s.contains("\"id\":1"));
        assert!(s.contains("\"method\":\"llm_call\""));
    }

    /// A serialized notification has a method but no `id`.
    #[test]
    fn test_json_rpc_notification_format() {
        let notification =
            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
        let s = serde_json::to_string(&notification).unwrap();
        assert!(s.contains("\"method\":\"output\""));
        assert!(!s.contains("\"id\""));
    }

    /// An error response exposes its message under `error.message`.
    #[test]
    fn test_json_rpc_error_response_parsing() {
        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
        assert!(response.get("error").is_some());
        assert_eq!(
            response["error"]["message"].as_str().unwrap(),
            "Invalid request"
        );
    }

    /// A success response exposes its payload under `result`.
    #[test]
    fn test_json_rpc_success_response_parsing() {
        let response = crate::jsonrpc::response(
            1,
            serde_json::json!({
                "text": "Hello world",
                "input_tokens": 10,
                "output_tokens": 5,
            }),
        );
        assert!(response.get("result").is_some());
        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
    }

    /// Sanity check of the shared `AtomicBool` flag pattern. This only
    /// exercises the atomic itself, not a bridge.
    #[test]
    fn test_cancelled_flag() {
        let cancelled = Arc::new(AtomicBool::new(false));
        assert!(!cancelled.load(Ordering::SeqCst));
        cancelled.store(true, Ordering::SeqCst);
        assert!(cancelled.load(Ordering::SeqCst));
    }

    /// An in-flight `call` must resolve with a "cancelled" runtime error,
    /// and leave no pending entry behind, once cancellation is signalled.
    #[test]
    fn pending_host_calls_return_when_cancellation_arrives() {
        run_async(async {
            let pending = Arc::new(Mutex::new(HashMap::new()));
            let cancelled = Arc::new(AtomicBool::new(false));
            let bridge = HostBridge::from_parts_with_writer(
                pending.clone(),
                cancelled.clone(),
                Arc::new(|_| Ok(())),
                1,
            );

            let call = bridge.call("host/work", serde_json::json!({}));
            tokio::pin!(call);

            // Poll the call until it has registered itself as pending; it
            // must not complete on its own before cancellation.
            loop {
                tokio::select! {
                    result = &mut call => panic!("call completed before cancellation: {result:?}"),
                    _ = tokio::task::yield_now() => {}
                }
                if !pending.lock().await.is_empty() {
                    break;
                }
            }

            cancelled.store(true, Ordering::SeqCst);
            bridge.cancel_notify.notify_waiters();

            let result = tokio::time::timeout(Duration::from_secs(1), call)
                .await
                .expect("pending call should observe cancellation promptly");
            assert!(
                matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
            );
            assert!(pending.lock().await.is_empty());
        });
    }

    /// Each `take_queued_user_messages` call only drains the messages whose
    /// delivery mode it asks for.
    #[test]
    fn queued_messages_are_filtered_by_delivery_mode() {
        run_async(async {
            let bridge = test_bridge();
            bridge
                .push_queued_user_message("first".to_string(), "finish_step")
                .await;
            bridge
                .push_queued_user_message("second".to_string(), "wait_for_completion")
                .await;

            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
            assert_eq!(finish_step.len(), 1);
            assert_eq!(finish_step[0].content, "first");

            let turn_end = bridge.take_queued_user_messages(false, false, true).await;
            assert_eq!(turn_end.len(), 1);
            assert_eq!(turn_end[0].content, "second");
        });
    }

    /// Reminders queued via `session/remind` are never returned as user
    /// messages, and keep all their fields when drained as injections.
    #[test]
    fn queued_transcript_injections_preserve_user_reminder_separation() {
        run_async(async {
            let bridge = test_bridge();
            bridge
                .push_queued_user_message("human follow-up".to_string(), "finish_step")
                .await;
            let reminder_id = bridge
                .push_queued_session_remind_from_params(&serde_json::json!({
                    "body": "Host-provided ambient context.",
                    "tags": ["host"],
                    "dedupe_key": "host-context",
                    "ttl_turns": 2,
                    "mode": "wait_for_completion",
                    "_meta": {"harn": {"source": "test"}},
                }))
                .await
                .expect("valid reminder");

            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
            assert_eq!(finish_step.len(), 1);
            assert_eq!(finish_step[0].content, "human follow-up");

            let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
            assert!(no_user_messages.is_empty());

            let injections = bridge
                .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
                .await;
            assert_eq!(injections.len(), 1);
            let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
                panic!("expected queued reminder");
            };
            assert_eq!(reminder.reminder.id, reminder_id);
            assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
            assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
            assert_eq!(
                reminder.reminder.dedupe_key.as_deref(),
                Some("host-context")
            );
            assert_eq!(reminder.reminder.ttl_turns, Some(2));
            assert_eq!(
                reminder.reminder.source,
                crate::llm::helpers::ReminderSource::Bridge
            );
        });
    }

    /// Each remind mode is delivered at its own checkpoint and withheld at
    /// a different one.
    #[test]
    fn bridge_remind_modes_honor_delivery_checkpoints() {
        run_async(async {
            // (mode, checkpoint that must deliver, checkpoint that must not)
            let cases = [
                (
                    "interrupt_immediate",
                    DeliveryCheckpoint::InterruptImmediate,
                    DeliveryCheckpoint::AfterCurrentOperation,
                ),
                (
                    "finish_step",
                    DeliveryCheckpoint::AfterCurrentOperation,
                    DeliveryCheckpoint::EndOfInteraction,
                ),
                (
                    "wait_for_completion",
                    DeliveryCheckpoint::EndOfInteraction,
                    DeliveryCheckpoint::InterruptImmediate,
                ),
            ];

            for (mode, expected_checkpoint, wrong_checkpoint) in cases {
                let bridge = test_bridge();
                bridge
                    .push_queued_session_remind_from_params(&serde_json::json!({
                        "body": format!("Reminder for {mode}"),
                        "mode": mode,
                    }))
                    .await
                    .expect("valid session/remind payload");

                let premature = bridge
                    .take_queued_transcript_injections_for(wrong_checkpoint)
                    .await;
                assert!(
                    premature.is_empty(),
                    "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
                );

                let delivered = bridge
                    .take_queued_transcript_injections_for(expected_checkpoint)
                    .await;
                assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
                let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
                    panic!("expected reminder for {mode}");
                };
                assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
            }
        });
    }

    /// Messages queued through the user-message path are drained as `User`
    /// injections, never as reminders.
    #[test]
    fn bridge_session_input_path_never_produces_reminder() {
        run_async(async {
            let bridge = test_bridge();
            bridge
                .push_queued_user_message("still user input".to_string(), "finish_step")
                .await;

            let delivered = bridge
                .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
                .await;
            assert_eq!(delivered.len(), 1);
            let QueuedTranscriptInjection::User(message) = &delivered[0] else {
                panic!("session/input queue path must produce a user message");
            };
            assert_eq!(message.content, "still user input");
        });
    }

    /// A payload shaped like a user message (`content`, no `body`) is
    /// rejected with the invalid-shape code.
    #[test]
    fn session_remind_validation_rejects_user_message_shape() {
        let err = queued_session_remind_from_params(&serde_json::json!({
            "content": "this is still a user message",
            "mode": "interrupt_immediate",
        }))
        .expect_err("session/remind must require a reminder body");
        assert!(err.contains(Code::ReminderInvalidShape.as_str()));
        assert!(err.contains("body"));
    }

    /// Unknown top-level fields are rejected with the unknown-option code,
    /// and the error names the offending field.
    #[test]
    fn session_remind_validation_rejects_unknown_options_separately() {
        let err = queued_session_remind_from_params(&serde_json::json!({
            "body": "valid body",
            "unknown_host_field": true,
        }))
        .expect_err("session/remind must reject unknown top-level fields");
        assert!(err.contains(Code::ReminderUnknownOption.as_str()));
        assert!(err.contains("unknown_host_field"));
    }

    /// An unrecognized `propagate` value is rejected with its own
    /// diagnostic code.
    #[test]
    fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
        let err = queued_session_remind_from_params(&serde_json::json!({
            "body": "valid body",
            "propagate": "workspace",
        }))
        .expect_err("session/remind must reject unknown propagate values");
        assert!(err.contains(Code::ReminderUnknownPropagate.as_str()));
        assert!(err.contains("propagate"));
    }

    /// A JSON string converts to a value that displays as the bare string.
    #[test]
    fn test_json_result_to_vm_value_string() {
        let val = serde_json::json!("hello");
        let vm_val = json_result_to_vm_value(&val);
        assert_eq!(vm_val.display(), "hello");
    }

    /// A JSON object converts to a `Dict` holding the converted entries.
    #[test]
    fn test_json_result_to_vm_value_dict() {
        let val = serde_json::json!({"name": "test", "count": 42});
        let vm_val = json_result_to_vm_value(&val);
        let VmValue::Dict(d) = &vm_val else {
            panic!("Expected Dict, got {:?}", vm_val);
        };
        assert_eq!(d.get("name").unwrap().display(), "test");
        assert_eq!(d.get("count").unwrap().display(), "42");
    }

    /// JSON `null` converts to `Nil`.
    #[test]
    fn test_json_result_to_vm_value_null() {
        let val = serde_json::json!(null);
        let vm_val = json_result_to_vm_value(&val);
        assert!(matches!(vm_val, VmValue::Nil));
    }

    /// Nested arrays and objects are converted recursively.
    #[test]
    fn test_json_result_to_vm_value_nested() {
        let val = serde_json::json!({
            "text": "response",
            "tool_calls": [
                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
            ],
            "input_tokens": 100,
            "output_tokens": 50,
        });
        let vm_val = json_result_to_vm_value(&val);
        let VmValue::Dict(d) = &vm_val else {
            panic!("Expected Dict, got {:?}", vm_val);
        };
        assert_eq!(d.get("text").unwrap().display(), "response");
        let VmValue::List(list) = d.get("tool_calls").unwrap() else {
            panic!("Expected List for tool_calls");
        };
        assert_eq!(list.len(), 1);
    }

    /// The `{ "tools": [...] }` wrapper is accepted and `deprecated`
    /// defaults to `false`.
    #[test]
    fn parse_host_tools_list_accepts_object_wrapper() {
        let tools = parse_host_tools_list_response(serde_json::json!({
            "tools": [
                {
                    "name": "Read",
                    "description": "Read a file",
                    "schema": {"type": "object"},
                }
            ]
        }))
        .expect("tool list");

        assert_eq!(tools.len(), 1);
        assert_eq!(tools[0]["name"], "Read");
        assert_eq!(tools[0]["deprecated"], false);
    }

    /// The nested `result.tools` wrapper and the `short_description` /
    /// `input_schema` aliases are honored.
    #[test]
    fn parse_host_tools_list_accepts_compat_fields() {
        let tools = parse_host_tools_list_response(serde_json::json!({
            "result": {
                "tools": [
                    {
                        "name": "Edit",
                        "short_description": "Apply an edit",
                        "input_schema": {"type": "object"},
                        "deprecated": true,
                    }
                ]
            }
        }))
        .expect("tool list");

        assert_eq!(tools[0]["description"], "Apply an edit");
        assert_eq!(tools[0]["schema"]["type"], "object");
        assert_eq!(tools[0]["deprecated"], true);
    }

    /// A tool without a string `name` is rejected with a descriptive error.
    #[test]
    fn parse_host_tools_list_requires_tool_names() {
        let err = parse_host_tools_list_response(serde_json::json!({
            "tools": [
                {"description": "missing name"}
            ]
        }))
        .expect_err("expected error");
        assert!(err
            .to_string()
            .contains("host/tools/list: every tool must include a string `name`"));
    }

    /// Guards against accidental changes to the default host-call timeout.
    #[test]
    fn test_timeout_duration() {
        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
    }
}