1use std::collections::{BTreeMap, HashMap, VecDeque};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::io::AsyncBufReadExt;
16use tokio::sync::{oneshot, Mutex, Notify};
17
18use harn_parser::diagnostic_codes::Code;
19
20use crate::orchestration::MutationSessionRecord;
21use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
22use crate::visible_text::VisibleTextState;
23use crate::vm::Vm;
24
25const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
27
28pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
29
30fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
31 Arc::new(move |line: &str| {
32 let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
33 let mut stdout = std::io::stdout().lock();
34 stdout
35 .write_all(line.as_bytes())
36 .map_err(|e| format!("Bridge write error: {e}"))?;
37 stdout
38 .write_all(b"\n")
39 .map_err(|e| format!("Bridge write error: {e}"))?;
40 stdout
41 .flush()
42 .map_err(|e| format!("Bridge flush error: {e}"))?;
43 Ok(())
44 })
45}
46
47pub struct HostBridge {
54 next_id: AtomicU64,
55 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
57 cancelled: Arc<AtomicBool>,
59 cancel_notify: Arc<Notify>,
61 writer: HostBridgeWriter,
63 session_id: std::sync::Mutex<String>,
65 script_name: std::sync::Mutex<String>,
67 queued_transcript_injections: Arc<Mutex<VecDeque<QueuedTranscriptInjection>>>,
69 resume_requested: Arc<AtomicBool>,
71 skills_reload_requested: Arc<AtomicBool>,
76 daemon_idle: Arc<AtomicBool>,
78 prompt_stop_reason: std::sync::Mutex<Option<String>>,
84 visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
86 visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
88 in_process: Option<InProcessHost>,
90}
91
92struct InProcessHost {
93 module_path: PathBuf,
94 exported_functions: BTreeMap<String, Rc<VmClosure>>,
95 vm: Vm,
96}
97
98impl InProcessHost {
99 async fn dispatch(
100 &self,
101 method: &str,
102 params: serde_json::Value,
103 ) -> Result<serde_json::Value, VmError> {
104 match method {
105 "builtin_call" => {
106 let name = params
107 .get("name")
108 .and_then(|value| value.as_str())
109 .unwrap_or_default();
110 let args = params
111 .get("args")
112 .and_then(|value| value.as_array())
113 .cloned()
114 .unwrap_or_default()
115 .into_iter()
116 .map(|value| json_result_to_vm_value(&value))
117 .collect::<Vec<_>>();
118 self.invoke_export(name, &args).await
119 }
120 "host/tools/list" => self
121 .invoke_optional_export("host_tools_list", &[])
122 .await
123 .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
124 "session/request_permission" => self.request_permission(params).await,
125 other => Err(VmError::Runtime(format!(
126 "playground host backend does not implement bridge method '{other}'"
127 ))),
128 }
129 }
130
131 async fn invoke_export(
132 &self,
133 name: &str,
134 args: &[VmValue],
135 ) -> Result<serde_json::Value, VmError> {
136 let Some(closure) = self.exported_functions.get(name) else {
137 return Err(VmError::Runtime(format!(
138 "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
139 self.module_path.display()
140 )));
141 };
142
143 let mut vm = self.vm.child_vm_for_host();
144 let result = vm.call_closure_pub(closure, args).await?;
145 Ok(crate::llm::vm_value_to_json(&result))
146 }
147
148 async fn invoke_optional_export(
149 &self,
150 name: &str,
151 args: &[VmValue],
152 ) -> Result<Option<serde_json::Value>, VmError> {
153 if !self.exported_functions.contains_key(name) {
154 return Ok(None);
155 }
156 self.invoke_export(name, args).await.map(Some)
157 }
158
159 async fn request_permission(
160 &self,
161 params: serde_json::Value,
162 ) -> Result<serde_json::Value, VmError> {
163 let Some(closure) = self.exported_functions.get("request_permission") else {
164 return Ok(serde_json::json!({ "granted": true }));
165 };
166
167 let tool_name = params
168 .get("toolCall")
169 .and_then(|tool_call| tool_call.get("toolName"))
170 .and_then(|value| value.as_str())
171 .unwrap_or_default();
172 let tool_args = params
173 .get("toolCall")
174 .and_then(|tool_call| tool_call.get("rawInput"))
175 .map(json_result_to_vm_value)
176 .unwrap_or(VmValue::Nil);
177 let full_payload = json_result_to_vm_value(¶ms);
178
179 let arg_count = closure.func.params.len();
180 let args = if arg_count >= 3 {
181 vec![
182 VmValue::String(Rc::from(tool_name.to_string())),
183 tool_args,
184 full_payload,
185 ]
186 } else if arg_count == 2 {
187 vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
188 } else if arg_count == 1 {
189 vec![full_payload]
190 } else {
191 Vec::new()
192 };
193
194 let mut vm = self.vm.child_vm_for_host();
195 let result = vm.call_closure_pub(closure, &args).await?;
196 let payload = match result {
197 VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
198 VmValue::String(reason) if !reason.is_empty() => {
199 serde_json::json!({ "granted": false, "reason": reason.to_string() })
200 }
201 other => {
202 let json = crate::llm::vm_value_to_json(&other);
203 if json
204 .get("granted")
205 .and_then(|value| value.as_bool())
206 .is_some()
207 || json.get("outcome").is_some()
208 {
209 json
210 } else {
211 serde_json::json!({ "granted": other.is_truthy() })
212 }
213 }
214 };
215 Ok(payload)
216 }
217}
218
219#[derive(Clone, Copy, Debug, PartialEq, Eq)]
220pub enum QueuedUserMessageMode {
221 InterruptImmediate,
222 FinishStep,
223 WaitForCompletion,
224}
225
226#[derive(Clone, Copy, Debug, PartialEq, Eq)]
227pub enum DeliveryCheckpoint {
228 InterruptImmediate,
229 AfterCurrentOperation,
230 EndOfInteraction,
231}
232
233impl QueuedUserMessageMode {
234 fn from_str(value: &str) -> Self {
235 match value {
236 "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
237 "finish_step" | "after_current_operation" => Self::FinishStep,
238 _ => Self::WaitForCompletion,
239 }
240 }
241}
242
243#[derive(Clone, Debug, PartialEq, Eq)]
244pub struct QueuedUserMessage {
245 pub content: String,
246 pub mode: QueuedUserMessageMode,
247}
248
249#[derive(Clone, Debug, PartialEq, Eq)]
250pub struct QueuedReminder {
251 pub reminder: crate::llm::helpers::SystemReminder,
252 pub mode: QueuedUserMessageMode,
253}
254
255#[derive(Clone, Debug, PartialEq, Eq)]
256pub enum QueuedTranscriptInjection {
257 User(QueuedUserMessage),
258 Reminder(QueuedReminder),
259}
260
261impl QueuedTranscriptInjection {
262 fn mode(&self) -> QueuedUserMessageMode {
263 match self {
264 Self::User(message) => message.mode,
265 Self::Reminder(reminder) => reminder.mode,
266 }
267 }
268}
269
270fn queue_user_message_from_params(params: &serde_json::Value) -> Option<QueuedUserMessage> {
271 let content = params
272 .get("content")
273 .and_then(|v| v.as_str())
274 .unwrap_or("")
275 .to_string();
276 if content.is_empty() {
277 return None;
278 }
279 let mode = QueuedUserMessageMode::from_str(
280 params
281 .get("mode")
282 .and_then(|v| v.as_str())
283 .unwrap_or("wait_for_completion"),
284 );
285 Some(QueuedUserMessage { content, mode })
286}
287
288fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
289 format!(
290 "{}: {}",
291 Code::ReminderUnknownOption.as_str(),
292 message.as_ref()
293 )
294}
295
296fn session_remind_shape_error(message: impl AsRef<str>) -> String {
297 format!(
298 "{}: {}",
299 Code::ReminderInvalidShape.as_str(),
300 message.as_ref()
301 )
302}
303
304fn string_field(
305 map: &serde_json::Map<String, serde_json::Value>,
306 key: &str,
307 required: bool,
308) -> Result<Option<String>, String> {
309 match map.get(key) {
310 None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
311 format!("`{key}` must be a non-empty string"),
312 )),
313 None | Some(serde_json::Value::Null) => Ok(None),
314 Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
315 session_remind_shape_error(format!("`{key}` must be a non-empty string")),
316 ),
317 Some(serde_json::Value::String(value)) => {
318 let trimmed = value.trim();
319 if trimmed.is_empty() {
320 Ok(None)
321 } else {
322 Ok(Some(trimmed.to_string()))
323 }
324 }
325 Some(other) => Err(session_remind_shape_error(format!(
326 "`{key}` must be a string, got {other}"
327 ))),
328 }
329}
330
331fn bool_field(
332 map: &serde_json::Map<String, serde_json::Value>,
333 key: &str,
334) -> Result<Option<bool>, String> {
335 match map.get(key) {
336 None | Some(serde_json::Value::Null) => Ok(None),
337 Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
338 Some(other) => Err(session_remind_shape_error(format!(
339 "`{key}` must be a bool, got {other}"
340 ))),
341 }
342}
343
344fn int_field(
345 map: &serde_json::Map<String, serde_json::Value>,
346 key: &str,
347) -> Result<Option<i64>, String> {
348 match map.get(key) {
349 None | Some(serde_json::Value::Null) => Ok(None),
350 Some(serde_json::Value::Number(value)) => {
351 let Some(value) = value.as_i64() else {
352 return Err(session_remind_shape_error(format!(
353 "`{key}` must be an integer"
354 )));
355 };
356 Ok(Some(value))
357 }
358 Some(other) => Err(session_remind_shape_error(format!(
359 "`{key}` must be an int, got {other}"
360 ))),
361 }
362}
363
364fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
365 let Some(value) = map.get("tags") else {
366 return Ok(Vec::new());
367 };
368 if value.is_null() {
369 return Ok(Vec::new());
370 }
371 let Some(values) = value.as_array() else {
372 return Err(session_remind_shape_error("`tags` must be a list"));
373 };
374 let mut tags = Vec::new();
375 for value in values {
376 let Some(tag) = value.as_str() else {
377 return Err(session_remind_shape_error(format!(
378 "`tags` entries must be strings, got {value}"
379 )));
380 };
381 let tag = tag.trim();
382 if tag.is_empty() {
383 return Err(session_remind_shape_error(
384 "`tags` entries must be non-empty strings",
385 ));
386 }
387 if !tags.iter().any(|existing| existing == tag) {
388 tags.push(tag.to_string());
389 }
390 }
391 Ok(tags)
392}
393
394fn session_remind_payload_from_value(
395 value: &serde_json::Value,
396) -> Result<crate::llm::helpers::SystemReminder, String> {
397 let Some(map) = value.as_object() else {
398 return Err(session_remind_shape_error(
399 "session/remind payload must be a reminder object",
400 ));
401 };
402 const ALLOWED: &[&str] = &[
403 "_meta",
404 "body",
405 "dedupe_key",
406 "fired_at_turn",
407 "id",
408 "preserve_on_compact",
409 "propagate",
410 "role_hint",
411 "source",
412 "tags",
413 "ttl_turns",
414 ];
415 let unknown = map
416 .keys()
417 .filter(|key| !ALLOWED.contains(&key.as_str()))
418 .map(String::as_str)
419 .collect::<Vec<_>>();
420 if !unknown.is_empty() {
421 if unknown.contains(&"content") {
422 return Err(session_remind_shape_error(
423 "session/remind expects reminder `body`, not user-message `content`",
424 ));
425 }
426 return Err(reminder_unknown_option_error(format!(
427 "unknown reminder option(s): {}",
428 unknown.join(", ")
429 )));
430 }
431 if let Some(meta) = map.get("_meta") {
432 if !meta.is_null() && !meta.is_object() {
433 return Err(session_remind_shape_error("`_meta` must be an object"));
434 }
435 }
436 let ttl_turns = int_field(map, "ttl_turns")?;
437 if let Some(value) = ttl_turns {
438 if value <= 0 {
439 return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
440 }
441 }
442 let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
443 if fired_at_turn < 0 {
444 return Err(session_remind_shape_error(
445 "`fired_at_turn` must be >= 0 when provided",
446 ));
447 }
448 match string_field(map, "source", false)?.as_deref() {
449 None | Some("bridge") => {}
450 Some(_) => {
451 return Err(session_remind_shape_error(
452 "`source` for session/remind must be bridge when provided",
453 ))
454 }
455 }
456 let propagate = match string_field(map, "propagate", false)?.as_deref() {
457 None => crate::llm::helpers::ReminderPropagate::Session,
458 Some("all") => crate::llm::helpers::ReminderPropagate::All,
459 Some("session") => crate::llm::helpers::ReminderPropagate::Session,
460 Some("none") => crate::llm::helpers::ReminderPropagate::None,
461 Some(_) => {
462 return Err(session_remind_shape_error(
463 "`propagate` must be one of all, session, or none",
464 ))
465 }
466 };
467 let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
468 None => crate::llm::helpers::ReminderRoleHint::System,
469 Some("system") => crate::llm::helpers::ReminderRoleHint::System,
470 Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
471 Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
472 Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
473 Some(_) => {
474 return Err(session_remind_shape_error(
475 "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
476 ))
477 }
478 };
479 Ok(crate::llm::helpers::SystemReminder {
480 id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
481 tags: tags_field(map)?,
482 dedupe_key: string_field(map, "dedupe_key", false)?,
483 ttl_turns,
484 preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
485 propagate,
486 role_hint,
487 source: crate::llm::helpers::ReminderSource::Bridge,
488 body: string_field(map, "body", true)?.unwrap_or_default(),
489 fired_at_turn,
490 originating_agent_id: None,
491 })
492}
493
494fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
495 let mode = QueuedUserMessageMode::from_str(
496 params
497 .get("mode")
498 .and_then(|value| value.as_str())
499 .unwrap_or("wait_for_completion"),
500 );
501 let reminder_value = if let Some(reminder) = params.get("reminder") {
502 reminder.clone()
503 } else {
504 let Some(params) = params.as_object() else {
505 return Err(session_remind_shape_error(
506 "session/remind params must be an object",
507 ));
508 };
509 let mut reminder = params.clone();
510 reminder.remove("mode");
511 reminder.remove("sessionId");
512 reminder.remove("session_id");
513 serde_json::Value::Object(reminder)
514 };
515 Ok(QueuedReminder {
516 reminder: session_remind_payload_from_value(&reminder_value)?,
517 mode,
518 })
519}
520
521#[allow(clippy::new_without_default)]
523impl HostBridge {
524 pub fn new() -> Self {
529 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
530 Arc::new(Mutex::new(HashMap::new()));
531 let cancelled = Arc::new(AtomicBool::new(false));
532 let cancel_notify = Arc::new(Notify::new());
533 let queued_transcript_injections: Arc<Mutex<VecDeque<QueuedTranscriptInjection>>> =
534 Arc::new(Mutex::new(VecDeque::new()));
535 let resume_requested = Arc::new(AtomicBool::new(false));
536 let skills_reload_requested = Arc::new(AtomicBool::new(false));
537 let daemon_idle = Arc::new(AtomicBool::new(false));
538
539 let pending_clone = pending.clone();
541 let cancelled_clone = cancelled.clone();
542 let cancel_notify_clone = cancel_notify.clone();
543 let queued_clone = queued_transcript_injections.clone();
544 let resume_clone = resume_requested.clone();
545 let skills_reload_clone = skills_reload_requested.clone();
546 tokio::task::spawn_local(async move {
547 let stdin = tokio::io::stdin();
548 let reader = tokio::io::BufReader::new(stdin);
549 let mut lines = reader.lines();
550
551 while let Ok(Some(line)) = lines.next_line().await {
552 let line = line.trim().to_string();
553 if line.is_empty() {
554 continue;
555 }
556
557 let msg: serde_json::Value = match serde_json::from_str(&line) {
558 Ok(v) => v,
559 Err(_) => continue,
560 };
561
562 if msg.get("id").is_none() {
564 if let Some(method) = msg["method"].as_str() {
565 if method == "cancel" {
566 cancelled_clone.store(true, Ordering::SeqCst);
567 cancel_notify_clone.notify_waiters();
568 } else if method == "agent/resume" {
569 resume_clone.store(true, Ordering::SeqCst);
570 } else if method == "skills/update" {
571 skills_reload_clone.store(true, Ordering::SeqCst);
572 } else if method == "user_message"
573 || method == "session/input"
574 || method == "agent/user_message"
575 {
576 let params = &msg["params"];
577 if let Some(message) = queue_user_message_from_params(params) {
578 queued_clone
579 .lock()
580 .await
581 .push_back(QueuedTranscriptInjection::User(message));
582 }
583 } else if method == "session/remind" {
584 let params = &msg["params"];
585 if let Ok(reminder) = queued_session_remind_from_params(params) {
586 queued_clone
587 .lock()
588 .await
589 .push_back(QueuedTranscriptInjection::Reminder(reminder));
590 }
591 }
592 }
593 continue;
594 }
595
596 if let Some(id) = msg["id"].as_u64() {
597 let mut pending = pending_clone.lock().await;
598 if let Some(sender) = pending.remove(&id) {
599 let _ = sender.send(msg);
600 }
601 }
602 }
603
604 let mut pending = pending_clone.lock().await;
606 pending.clear();
607 });
608
609 Self {
610 next_id: AtomicU64::new(1),
611 pending,
612 cancelled,
613 cancel_notify,
614 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
615 session_id: std::sync::Mutex::new(String::new()),
616 script_name: std::sync::Mutex::new(String::new()),
617 queued_transcript_injections,
618 resume_requested,
619 skills_reload_requested,
620 daemon_idle,
621 prompt_stop_reason: std::sync::Mutex::new(None),
622 visible_call_states: std::sync::Mutex::new(HashMap::new()),
623 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
624 in_process: None,
625 }
626 }
627
628 pub fn from_parts(
634 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
635 cancelled: Arc<AtomicBool>,
636 stdout_lock: Arc<std::sync::Mutex<()>>,
637 start_id: u64,
638 ) -> Self {
639 Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
640 }
641
642 pub fn from_parts_with_writer(
643 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
644 cancelled: Arc<AtomicBool>,
645 writer: HostBridgeWriter,
646 start_id: u64,
647 ) -> Self {
648 Self::from_parts_with_writer_and_cancel_notify(
649 pending,
650 cancelled,
651 Arc::new(Notify::new()),
652 writer,
653 start_id,
654 )
655 }
656
657 pub fn from_parts_with_writer_and_cancel_notify(
658 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
659 cancelled: Arc<AtomicBool>,
660 cancel_notify: Arc<Notify>,
661 writer: HostBridgeWriter,
662 start_id: u64,
663 ) -> Self {
664 Self {
665 next_id: AtomicU64::new(start_id),
666 pending,
667 cancelled,
668 cancel_notify,
669 writer,
670 session_id: std::sync::Mutex::new(String::new()),
671 script_name: std::sync::Mutex::new(String::new()),
672 queued_transcript_injections: Arc::new(Mutex::new(VecDeque::new())),
673 resume_requested: Arc::new(AtomicBool::new(false)),
674 skills_reload_requested: Arc::new(AtomicBool::new(false)),
675 daemon_idle: Arc::new(AtomicBool::new(false)),
676 prompt_stop_reason: std::sync::Mutex::new(None),
677 visible_call_states: std::sync::Mutex::new(HashMap::new()),
678 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
679 in_process: None,
680 }
681 }
682
683 pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
686 let exported_functions = vm.load_module_exports(module_path).await?;
687 Ok(Self {
688 next_id: AtomicU64::new(1),
689 pending: Arc::new(Mutex::new(HashMap::new())),
690 cancelled: Arc::new(AtomicBool::new(false)),
691 cancel_notify: Arc::new(Notify::new()),
692 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
693 session_id: std::sync::Mutex::new(String::new()),
694 script_name: std::sync::Mutex::new(String::new()),
695 queued_transcript_injections: Arc::new(Mutex::new(VecDeque::new())),
696 resume_requested: Arc::new(AtomicBool::new(false)),
697 skills_reload_requested: Arc::new(AtomicBool::new(false)),
698 daemon_idle: Arc::new(AtomicBool::new(false)),
699 prompt_stop_reason: std::sync::Mutex::new(None),
700 visible_call_states: std::sync::Mutex::new(HashMap::new()),
701 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
702 in_process: Some(InProcessHost {
703 module_path: module_path.to_path_buf(),
704 exported_functions,
705 vm,
706 }),
707 })
708 }
709
710 pub fn set_session_id(&self, id: &str) {
712 *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
713 }
714
715 pub fn set_script_name(&self, name: &str) {
717 *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
718 }
719
720 fn get_script_name(&self) -> String {
722 self.script_name
723 .lock()
724 .unwrap_or_else(|e| e.into_inner())
725 .clone()
726 }
727
728 pub fn get_session_id(&self) -> String {
730 self.session_id
731 .lock()
732 .unwrap_or_else(|e| e.into_inner())
733 .clone()
734 }
735
736 fn write_line(&self, line: &str) -> Result<(), VmError> {
738 (self.writer)(line).map_err(VmError::Runtime)
739 }
740
741 pub async fn call(
744 &self,
745 method: &str,
746 params: serde_json::Value,
747 ) -> Result<serde_json::Value, VmError> {
748 if let Some(in_process) = &self.in_process {
749 return in_process.dispatch(method, params).await;
750 }
751
752 if self.is_cancelled() {
753 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
754 }
755
756 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
757 let cancel_wait = self.cancel_notify.notified();
758 tokio::pin!(cancel_wait);
759
760 let request = crate::jsonrpc::request(id, method, params);
761
762 let (tx, rx) = oneshot::channel();
763 {
764 let mut pending = self.pending.lock().await;
765 pending.insert(id, tx);
766 }
767
768 let line = serde_json::to_string(&request)
769 .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
770 if let Err(e) = self.write_line(&line) {
771 let mut pending = self.pending.lock().await;
772 pending.remove(&id);
773 return Err(e);
774 }
775
776 if self.is_cancelled() {
777 let mut pending = self.pending.lock().await;
778 pending.remove(&id);
779 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
780 }
781
782 let response = tokio::select! {
783 result = rx => match result {
784 Ok(msg) => msg,
785 Err(_) => {
786 return Err(VmError::Runtime(
788 "Bridge: host closed connection before responding".into(),
789 ));
790 }
791 },
792 _ = &mut cancel_wait => {
793 let mut pending = self.pending.lock().await;
794 pending.remove(&id);
795 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
796 }
797 _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
798 let mut pending = self.pending.lock().await;
799 pending.remove(&id);
800 return Err(VmError::Runtime(format!(
801 "Bridge: host did not respond to '{method}' within {}s",
802 DEFAULT_TIMEOUT.as_secs()
803 )));
804 }
805 };
806
807 if let Some(error) = response.get("error") {
808 let message = error["message"].as_str().unwrap_or("Unknown host error");
809 let code = error["code"].as_i64().unwrap_or(-1);
810 if code == -32001 {
812 return Err(VmError::CategorizedError {
813 message: message.to_string(),
814 category: ErrorCategory::ToolRejected,
815 });
816 }
817 return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
818 }
819
820 Ok(response["result"].clone())
821 }
822
823 pub fn notify(&self, method: &str, params: serde_json::Value) {
826 let notification = crate::jsonrpc::notification(method, params);
827 if self.in_process.is_some() {
828 return;
829 }
830 if let Ok(line) = serde_json::to_string(¬ification) {
831 let _ = self.write_line(&line);
832 }
833 }
834
835 pub fn is_cancelled(&self) -> bool {
837 self.cancelled.load(Ordering::SeqCst)
838 }
839
840 pub fn take_resume_signal(&self) -> bool {
841 self.resume_requested.swap(false, Ordering::SeqCst)
842 }
843
844 pub fn signal_resume(&self) {
845 self.resume_requested.store(true, Ordering::SeqCst);
846 }
847
848 pub fn set_daemon_idle(&self, idle: bool) {
849 self.daemon_idle.store(idle, Ordering::SeqCst);
850 }
851
852 pub fn is_daemon_idle(&self) -> bool {
853 self.daemon_idle.load(Ordering::SeqCst)
854 }
855
856 pub fn set_prompt_stop_reason(&self, reason: &str) {
861 *self
862 .prompt_stop_reason
863 .lock()
864 .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
865 }
866
867 pub fn take_prompt_stop_reason(&self) -> Option<String> {
872 self.prompt_stop_reason
873 .lock()
874 .unwrap_or_else(|e| e.into_inner())
875 .take()
876 }
877
878 pub fn take_skills_reload_signal(&self) -> bool {
883 self.skills_reload_requested.swap(false, Ordering::SeqCst)
884 }
885
886 pub fn signal_skills_reload(&self) {
890 self.skills_reload_requested.store(true, Ordering::SeqCst);
891 }
892
893 pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
899 let result = self.call("skills/list", serde_json::json!({})).await?;
900 match result {
901 serde_json::Value::Array(items) => Ok(items),
902 serde_json::Value::Object(map) => match map.get("skills") {
903 Some(serde_json::Value::Array(items)) => Ok(items.clone()),
904 _ => Err(VmError::Runtime(
905 "skills/list: host response must be an array or { skills: [...] }".into(),
906 )),
907 },
908 _ => Err(VmError::Runtime(
909 "skills/list: unexpected response shape".into(),
910 )),
911 }
912 }
913
914 pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
920 let result = self.call("host/tools/list", serde_json::json!({})).await?;
921 parse_host_tools_list_response(result)
922 }
923
924 pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
928 self.call("skills/fetch", serde_json::json!({ "id": id }))
929 .await
930 }
931
932 pub async fn push_queued_user_message(&self, content: String, mode: &str) {
933 self.queued_transcript_injections
934 .lock()
935 .await
936 .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
937 content,
938 mode: QueuedUserMessageMode::from_str(mode),
939 }));
940 }
941
942 pub async fn push_queued_session_remind_from_params(
943 &self,
944 params: &serde_json::Value,
945 ) -> Result<String, String> {
946 let reminder = queued_session_remind_from_params(params)?;
947 let reminder_id = reminder.reminder.id.clone();
948 self.queued_transcript_injections
949 .lock()
950 .await
951 .push_back(QueuedTranscriptInjection::Reminder(reminder));
952 Ok(reminder_id)
953 }
954
955 pub async fn take_queued_user_messages(
956 &self,
957 include_interrupt_immediate: bool,
958 include_finish_step: bool,
959 include_wait_for_completion: bool,
960 ) -> Vec<QueuedUserMessage> {
961 let mut queue = self.queued_transcript_injections.lock().await;
962 let mut selected = Vec::new();
963 let mut retained = VecDeque::new();
964 while let Some(injection) = queue.pop_front() {
965 let should_take = match injection.mode() {
966 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
967 QueuedUserMessageMode::FinishStep => include_finish_step,
968 QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
969 };
970 match (should_take, injection) {
971 (true, QueuedTranscriptInjection::User(message)) => selected.push(message),
972 (_, injection) => retained.push_back(injection),
973 }
974 }
975 *queue = retained;
976 selected
977 }
978
979 pub async fn take_queued_transcript_injections(
980 &self,
981 include_interrupt_immediate: bool,
982 include_finish_step: bool,
983 include_wait_for_completion: bool,
984 ) -> Vec<QueuedTranscriptInjection> {
985 let mut queue = self.queued_transcript_injections.lock().await;
986 let mut selected = Vec::new();
987 let mut retained = VecDeque::new();
988 while let Some(injection) = queue.pop_front() {
989 let should_take = match injection.mode() {
990 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
991 QueuedUserMessageMode::FinishStep => include_finish_step,
992 QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
993 };
994 if should_take {
995 selected.push(injection);
996 } else {
997 retained.push_back(injection);
998 }
999 }
1000 *queue = retained;
1001 selected
1002 }
1003
1004 pub async fn take_queued_user_messages_for(
1005 &self,
1006 checkpoint: DeliveryCheckpoint,
1007 ) -> Vec<QueuedUserMessage> {
1008 match checkpoint {
1009 DeliveryCheckpoint::InterruptImmediate => {
1010 self.take_queued_user_messages(true, false, false).await
1011 }
1012 DeliveryCheckpoint::AfterCurrentOperation => {
1013 self.take_queued_user_messages(false, true, false).await
1014 }
1015 DeliveryCheckpoint::EndOfInteraction => {
1016 self.take_queued_user_messages(false, false, true).await
1017 }
1018 }
1019 }
1020
1021 pub async fn take_queued_transcript_injections_for(
1022 &self,
1023 checkpoint: DeliveryCheckpoint,
1024 ) -> Vec<QueuedTranscriptInjection> {
1025 match checkpoint {
1026 DeliveryCheckpoint::InterruptImmediate => {
1027 self.take_queued_transcript_injections(true, false, false)
1028 .await
1029 }
1030 DeliveryCheckpoint::AfterCurrentOperation => {
1031 self.take_queued_transcript_injections(false, true, false)
1032 .await
1033 }
1034 DeliveryCheckpoint::EndOfInteraction => {
1035 self.take_queued_transcript_injections(false, false, true)
1036 .await
1037 }
1038 }
1039 }
1040
1041 pub fn send_output(&self, text: &str) {
1043 self.notify("output", serde_json::json!({"text": text}));
1044 }
1045
1046 pub fn send_progress(
1048 &self,
1049 phase: &str,
1050 message: &str,
1051 progress: Option<i64>,
1052 total: Option<i64>,
1053 data: Option<serde_json::Value>,
1054 ) {
1055 let mut payload = serde_json::json!({"phase": phase, "message": message});
1056 if let Some(p) = progress {
1057 payload["progress"] = serde_json::json!(p);
1058 }
1059 if let Some(t) = total {
1060 payload["total"] = serde_json::json!(t);
1061 }
1062 if let Some(d) = data {
1063 payload["data"] = d;
1064 }
1065 self.notify("progress", payload);
1066 }
1067
1068 pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1070 let mut payload = serde_json::json!({"level": level, "message": message});
1071 if let Some(f) = fields {
1072 payload["fields"] = f;
1073 }
1074 self.notify("log", payload);
1075 }
1076
1077 pub fn send_call_start(
1080 &self,
1081 call_id: &str,
1082 call_type: &str,
1083 name: &str,
1084 metadata: serde_json::Value,
1085 ) {
1086 let session_id = self.get_session_id();
1087 let script = self.get_script_name();
1088 let stream_publicly = metadata
1089 .get("stream_publicly")
1090 .and_then(|value| value.as_bool())
1091 .unwrap_or(true);
1092 self.visible_call_streams
1093 .lock()
1094 .unwrap_or_else(|e| e.into_inner())
1095 .insert(call_id.to_string(), stream_publicly);
1096 self.notify(
1097 "session/update",
1098 serde_json::json!({
1099 "sessionId": session_id,
1100 "update": {
1101 "sessionUpdate": "call_start",
1102 "content": {
1103 "toolCallId": call_id,
1104 "call_type": call_type,
1105 "name": name,
1106 "script": script,
1107 "metadata": metadata,
1108 },
1109 },
1110 }),
1111 );
1112 }
1113
1114 pub fn send_call_progress(
1117 &self,
1118 call_id: &str,
1119 delta: &str,
1120 accumulated_tokens: u64,
1121 user_visible: bool,
1122 ) {
1123 let session_id = self.get_session_id();
1124 let (visible_text, visible_delta) = {
1125 let stream_publicly = self
1126 .visible_call_streams
1127 .lock()
1128 .unwrap_or_else(|e| e.into_inner())
1129 .get(call_id)
1130 .copied()
1131 .unwrap_or(true);
1132 let mut states = self
1133 .visible_call_states
1134 .lock()
1135 .unwrap_or_else(|e| e.into_inner());
1136 let state = states.entry(call_id.to_string()).or_default();
1137 state.push(delta, stream_publicly)
1138 };
1139 self.notify(
1140 "session/update",
1141 serde_json::json!({
1142 "sessionId": session_id,
1143 "update": {
1144 "sessionUpdate": "call_progress",
1145 "content": {
1146 "toolCallId": call_id,
1147 "delta": delta,
1148 "accumulated_tokens": accumulated_tokens,
1149 "visible_text": visible_text,
1150 "visible_delta": visible_delta,
1151 "user_visible": user_visible,
1152 },
1153 },
1154 }),
1155 );
1156 }
1157
1158 pub fn send_call_end(
1160 &self,
1161 call_id: &str,
1162 call_type: &str,
1163 name: &str,
1164 duration_ms: u64,
1165 status: &str,
1166 metadata: serde_json::Value,
1167 ) {
1168 let session_id = self.get_session_id();
1169 let script = self.get_script_name();
1170 self.visible_call_states
1171 .lock()
1172 .unwrap_or_else(|e| e.into_inner())
1173 .remove(call_id);
1174 self.visible_call_streams
1175 .lock()
1176 .unwrap_or_else(|e| e.into_inner())
1177 .remove(call_id);
1178 self.notify(
1179 "session/update",
1180 serde_json::json!({
1181 "sessionId": session_id,
1182 "update": {
1183 "sessionUpdate": "call_end",
1184 "content": {
1185 "toolCallId": call_id,
1186 "call_type": call_type,
1187 "name": name,
1188 "script": script,
1189 "duration_ms": duration_ms,
1190 "status": status,
1191 "metadata": metadata,
1192 },
1193 },
1194 }),
1195 );
1196 }
1197
1198 pub fn send_worker_update(
1200 &self,
1201 worker_id: &str,
1202 worker_name: &str,
1203 status: &str,
1204 metadata: serde_json::Value,
1205 audit: Option<&MutationSessionRecord>,
1206 ) {
1207 let session_id = self.get_session_id();
1208 let script = self.get_script_name();
1209 let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1210 let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1211 let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1212 let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1213 let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1214 let lifecycle = serde_json::json!({
1215 "event": status,
1216 "worker_id": worker_id,
1217 "worker_name": worker_name,
1218 "started_at": started_at,
1219 "finished_at": finished_at,
1220 });
1221 self.notify(
1222 "session/update",
1223 serde_json::json!({
1224 "sessionId": session_id,
1225 "update": {
1226 "sessionUpdate": "worker_update",
1227 "content": {
1228 "worker_id": worker_id,
1229 "worker_name": worker_name,
1230 "status": status,
1231 "script": script,
1232 "started_at": started_at,
1233 "finished_at": finished_at,
1234 "snapshot_path": snapshot_path,
1235 "run_id": run_id,
1236 "run_path": run_path,
1237 "lifecycle": lifecycle,
1238 "audit": audit,
1239 "metadata": metadata,
1240 },
1241 },
1242 }),
1243 );
1244 }
1245}
1246
1247pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1249 crate::stdlib::json_to_vm_value(val)
1250}
1251
1252fn parse_host_tools_list_response(
1253 result: serde_json::Value,
1254) -> Result<Vec<serde_json::Value>, VmError> {
1255 let tools = match result {
1256 serde_json::Value::Array(items) => items,
1257 serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1258 map.get("result")
1259 .and_then(|value| value.get("tools"))
1260 .cloned()
1261 }) {
1262 Some(serde_json::Value::Array(items)) => items,
1263 _ => {
1264 return Err(VmError::Runtime(
1265 "host/tools/list: host response must be an array or { tools: [...] }".into(),
1266 ));
1267 }
1268 },
1269 _ => {
1270 return Err(VmError::Runtime(
1271 "host/tools/list: unexpected response shape".into(),
1272 ));
1273 }
1274 };
1275
1276 let mut normalized = Vec::with_capacity(tools.len());
1277 for tool in tools {
1278 let serde_json::Value::Object(map) = tool else {
1279 return Err(VmError::Runtime(
1280 "host/tools/list: every tool must be an object".into(),
1281 ));
1282 };
1283 let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1284 return Err(VmError::Runtime(
1285 "host/tools/list: every tool must include a string `name`".into(),
1286 ));
1287 };
1288 let description = map
1289 .get("description")
1290 .and_then(|value| value.as_str())
1291 .or_else(|| {
1292 map.get("short_description")
1293 .and_then(|value| value.as_str())
1294 })
1295 .unwrap_or_default();
1296 let schema = map
1297 .get("schema")
1298 .cloned()
1299 .or_else(|| map.get("parameters").cloned())
1300 .or_else(|| map.get("input_schema").cloned())
1301 .unwrap_or(serde_json::Value::Null);
1302 let deprecated = map
1303 .get("deprecated")
1304 .and_then(|value| value.as_bool())
1305 .unwrap_or(false);
1306 normalized.push(serde_json::json!({
1307 "name": name,
1308 "description": description,
1309 "schema": schema,
1310 "deprecated": deprecated,
1311 }));
1312 }
1313 Ok(normalized)
1314}
1315
1316#[cfg(test)]
1317mod tests {
1318 use super::*;
1319
1320 fn test_bridge() -> HostBridge {
1321 HostBridge::from_parts(
1322 Arc::new(Mutex::new(HashMap::new())),
1323 Arc::new(AtomicBool::new(false)),
1324 Arc::new(std::sync::Mutex::new(())),
1325 1,
1326 )
1327 }
1328
1329 #[test]
1330 fn test_json_rpc_request_format() {
1331 let request = crate::jsonrpc::request(
1332 1,
1333 "llm_call",
1334 serde_json::json!({
1335 "prompt": "Hello",
1336 "system": "Be helpful",
1337 }),
1338 );
1339 let s = serde_json::to_string(&request).unwrap();
1340 assert!(s.contains("\"jsonrpc\":\"2.0\""));
1341 assert!(s.contains("\"id\":1"));
1342 assert!(s.contains("\"method\":\"llm_call\""));
1343 }
1344
1345 #[test]
1346 fn test_json_rpc_notification_format() {
1347 let notification =
1348 crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
1349 let s = serde_json::to_string(¬ification).unwrap();
1350 assert!(s.contains("\"method\":\"output\""));
1351 assert!(!s.contains("\"id\""));
1352 }
1353
1354 #[test]
1355 fn test_json_rpc_error_response_parsing() {
1356 let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
1357 assert!(response.get("error").is_some());
1358 assert_eq!(
1359 response["error"]["message"].as_str().unwrap(),
1360 "Invalid request"
1361 );
1362 }
1363
1364 #[test]
1365 fn test_json_rpc_success_response_parsing() {
1366 let response = crate::jsonrpc::response(
1367 1,
1368 serde_json::json!({
1369 "text": "Hello world",
1370 "input_tokens": 10,
1371 "output_tokens": 5,
1372 }),
1373 );
1374 assert!(response.get("result").is_some());
1375 assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
1376 }
1377
1378 #[test]
1379 fn test_cancelled_flag() {
1380 let cancelled = Arc::new(AtomicBool::new(false));
1381 assert!(!cancelled.load(Ordering::SeqCst));
1382 cancelled.store(true, Ordering::SeqCst);
1383 assert!(cancelled.load(Ordering::SeqCst));
1384 }
1385
1386 #[test]
1387 fn pending_host_calls_return_when_cancellation_arrives() {
1388 let runtime = tokio::runtime::Builder::new_current_thread()
1389 .enable_all()
1390 .build()
1391 .unwrap();
1392 runtime.block_on(async {
1393 let pending = Arc::new(Mutex::new(HashMap::new()));
1394 let cancelled = Arc::new(AtomicBool::new(false));
1395 let bridge = HostBridge::from_parts_with_writer(
1396 pending.clone(),
1397 cancelled.clone(),
1398 Arc::new(|_| Ok(())),
1399 1,
1400 );
1401
1402 let call = bridge.call("host/work", serde_json::json!({}));
1403 tokio::pin!(call);
1404
1405 loop {
1406 tokio::select! {
1407 result = &mut call => panic!("call completed before cancellation: {result:?}"),
1408 _ = tokio::task::yield_now() => {}
1409 }
1410 if !pending.lock().await.is_empty() {
1411 break;
1412 }
1413 }
1414
1415 cancelled.store(true, Ordering::SeqCst);
1416 bridge.cancel_notify.notify_waiters();
1417
1418 let result = tokio::time::timeout(Duration::from_secs(1), call)
1419 .await
1420 .expect("pending call should observe cancellation promptly");
1421 assert!(
1422 matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
1423 );
1424 assert!(pending.lock().await.is_empty());
1425 });
1426 }
1427
1428 #[test]
1429 fn queued_messages_are_filtered_by_delivery_mode() {
1430 let runtime = tokio::runtime::Builder::new_current_thread()
1431 .enable_all()
1432 .build()
1433 .unwrap();
1434 runtime.block_on(async {
1435 let bridge = test_bridge();
1436 bridge
1437 .push_queued_user_message("first".to_string(), "finish_step")
1438 .await;
1439 bridge
1440 .push_queued_user_message("second".to_string(), "wait_for_completion")
1441 .await;
1442
1443 let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1444 assert_eq!(finish_step.len(), 1);
1445 assert_eq!(finish_step[0].content, "first");
1446
1447 let turn_end = bridge.take_queued_user_messages(false, false, true).await;
1448 assert_eq!(turn_end.len(), 1);
1449 assert_eq!(turn_end[0].content, "second");
1450 });
1451 }
1452
1453 #[test]
1454 fn queued_transcript_injections_preserve_user_reminder_separation() {
1455 let runtime = tokio::runtime::Builder::new_current_thread()
1456 .enable_all()
1457 .build()
1458 .unwrap();
1459 runtime.block_on(async {
1460 let bridge = test_bridge();
1461 bridge
1462 .push_queued_user_message("human follow-up".to_string(), "finish_step")
1463 .await;
1464 let reminder_id = bridge
1465 .push_queued_session_remind_from_params(&serde_json::json!({
1466 "body": "Host-provided ambient context.",
1467 "tags": ["host"],
1468 "dedupe_key": "host-context",
1469 "ttl_turns": 2,
1470 "mode": "wait_for_completion",
1471 "_meta": {"harn": {"source": "test"}},
1472 }))
1473 .await
1474 .expect("valid reminder");
1475
1476 let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1477 assert_eq!(finish_step.len(), 1);
1478 assert_eq!(finish_step[0].content, "human follow-up");
1479
1480 let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
1481 assert!(no_user_messages.is_empty());
1482
1483 let injections = bridge
1484 .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
1485 .await;
1486 assert_eq!(injections.len(), 1);
1487 let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
1488 panic!("expected queued reminder");
1489 };
1490 assert_eq!(reminder.reminder.id, reminder_id);
1491 assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
1492 assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
1493 assert_eq!(
1494 reminder.reminder.dedupe_key.as_deref(),
1495 Some("host-context")
1496 );
1497 assert_eq!(reminder.reminder.ttl_turns, Some(2));
1498 assert_eq!(
1499 reminder.reminder.source,
1500 crate::llm::helpers::ReminderSource::Bridge
1501 );
1502 });
1503 }
1504
1505 #[test]
1506 fn bridge_remind_modes_honor_delivery_checkpoints() {
1507 let runtime = tokio::runtime::Builder::new_current_thread()
1508 .enable_all()
1509 .build()
1510 .unwrap();
1511 runtime.block_on(async {
1512 let cases = [
1513 (
1514 "interrupt_immediate",
1515 DeliveryCheckpoint::InterruptImmediate,
1516 DeliveryCheckpoint::AfterCurrentOperation,
1517 ),
1518 (
1519 "finish_step",
1520 DeliveryCheckpoint::AfterCurrentOperation,
1521 DeliveryCheckpoint::EndOfInteraction,
1522 ),
1523 (
1524 "wait_for_completion",
1525 DeliveryCheckpoint::EndOfInteraction,
1526 DeliveryCheckpoint::InterruptImmediate,
1527 ),
1528 ];
1529
1530 for (mode, expected_checkpoint, wrong_checkpoint) in cases {
1531 let bridge = test_bridge();
1532 bridge
1533 .push_queued_session_remind_from_params(&serde_json::json!({
1534 "body": format!("Reminder for {mode}"),
1535 "mode": mode,
1536 }))
1537 .await
1538 .expect("valid session/remind payload");
1539
1540 let premature = bridge
1541 .take_queued_transcript_injections_for(wrong_checkpoint)
1542 .await;
1543 assert!(
1544 premature.is_empty(),
1545 "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
1546 );
1547
1548 let delivered = bridge
1549 .take_queued_transcript_injections_for(expected_checkpoint)
1550 .await;
1551 assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
1552 let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
1553 panic!("expected reminder for {mode}");
1554 };
1555 assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
1556 }
1557 });
1558 }
1559
1560 #[test]
1561 fn bridge_session_input_path_never_produces_reminder() {
1562 let runtime = tokio::runtime::Builder::new_current_thread()
1563 .enable_all()
1564 .build()
1565 .unwrap();
1566 runtime.block_on(async {
1567 let bridge = test_bridge();
1568 bridge
1569 .push_queued_user_message("still user input".to_string(), "finish_step")
1570 .await;
1571
1572 let delivered = bridge
1573 .take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
1574 .await;
1575 assert_eq!(delivered.len(), 1);
1576 let QueuedTranscriptInjection::User(message) = &delivered[0] else {
1577 panic!("session/input queue path must produce a user message");
1578 };
1579 assert_eq!(message.content, "still user input");
1580 });
1581 }
1582
1583 #[test]
1584 fn session_remind_validation_rejects_user_message_shape() {
1585 let err = queued_session_remind_from_params(&serde_json::json!({
1586 "content": "this is still a user message",
1587 "mode": "interrupt_immediate",
1588 }))
1589 .expect_err("session/remind must require a reminder body");
1590 assert!(err.contains(Code::ReminderInvalidShape.as_str()));
1591 assert!(err.contains("body"));
1592 }
1593
1594 #[test]
1595 fn session_remind_validation_rejects_unknown_options_separately() {
1596 let err = queued_session_remind_from_params(&serde_json::json!({
1597 "body": "valid body",
1598 "unknown_host_field": true,
1599 }))
1600 .expect_err("session/remind must reject unknown top-level fields");
1601 assert!(err.contains(Code::ReminderUnknownOption.as_str()));
1602 assert!(err.contains("unknown_host_field"));
1603 }
1604
1605 #[test]
1606 fn test_json_result_to_vm_value_string() {
1607 let val = serde_json::json!("hello");
1608 let vm_val = json_result_to_vm_value(&val);
1609 assert_eq!(vm_val.display(), "hello");
1610 }
1611
1612 #[test]
1613 fn test_json_result_to_vm_value_dict() {
1614 let val = serde_json::json!({"name": "test", "count": 42});
1615 let vm_val = json_result_to_vm_value(&val);
1616 let VmValue::Dict(d) = &vm_val else {
1617 unreachable!("Expected Dict, got {:?}", vm_val);
1618 };
1619 assert_eq!(d.get("name").unwrap().display(), "test");
1620 assert_eq!(d.get("count").unwrap().display(), "42");
1621 }
1622
1623 #[test]
1624 fn test_json_result_to_vm_value_null() {
1625 let val = serde_json::json!(null);
1626 let vm_val = json_result_to_vm_value(&val);
1627 assert!(matches!(vm_val, VmValue::Nil));
1628 }
1629
1630 #[test]
1631 fn test_json_result_to_vm_value_nested() {
1632 let val = serde_json::json!({
1633 "text": "response",
1634 "tool_calls": [
1635 {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
1636 ],
1637 "input_tokens": 100,
1638 "output_tokens": 50,
1639 });
1640 let vm_val = json_result_to_vm_value(&val);
1641 let VmValue::Dict(d) = &vm_val else {
1642 unreachable!("Expected Dict, got {:?}", vm_val);
1643 };
1644 assert_eq!(d.get("text").unwrap().display(), "response");
1645 let VmValue::List(list) = d.get("tool_calls").unwrap() else {
1646 unreachable!("Expected List for tool_calls");
1647 };
1648 assert_eq!(list.len(), 1);
1649 }
1650
1651 #[test]
1652 fn parse_host_tools_list_accepts_object_wrapper() {
1653 let tools = parse_host_tools_list_response(serde_json::json!({
1654 "tools": [
1655 {
1656 "name": "Read",
1657 "description": "Read a file",
1658 "schema": {"type": "object"},
1659 }
1660 ]
1661 }))
1662 .expect("tool list");
1663
1664 assert_eq!(tools.len(), 1);
1665 assert_eq!(tools[0]["name"], "Read");
1666 assert_eq!(tools[0]["deprecated"], false);
1667 }
1668
1669 #[test]
1670 fn parse_host_tools_list_accepts_compat_fields() {
1671 let tools = parse_host_tools_list_response(serde_json::json!({
1672 "result": {
1673 "tools": [
1674 {
1675 "name": "Edit",
1676 "short_description": "Apply an edit",
1677 "input_schema": {"type": "object"},
1678 "deprecated": true,
1679 }
1680 ]
1681 }
1682 }))
1683 .expect("tool list");
1684
1685 assert_eq!(tools[0]["description"], "Apply an edit");
1686 assert_eq!(tools[0]["schema"]["type"], "object");
1687 assert_eq!(tools[0]["deprecated"], true);
1688 }
1689
1690 #[test]
1691 fn parse_host_tools_list_requires_tool_names() {
1692 let err = parse_host_tools_list_response(serde_json::json!({
1693 "tools": [
1694 {"description": "missing name"}
1695 ]
1696 }))
1697 .expect_err("expected error");
1698 assert!(err
1699 .to_string()
1700 .contains("host/tools/list: every tool must include a string `name`"));
1701 }
1702
1703 #[test]
1704 fn test_timeout_duration() {
1705 assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
1706 }
1707}