1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
8use std::future::Future;
9use std::io::Write;
10use std::path::{Path, PathBuf};
11use std::pin::Pin;
12use std::rc::Rc;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::Duration;
16
17use tokio::io::AsyncBufReadExt;
18use tokio::sync::{oneshot, Mutex, Notify};
19
20use harn_parser::diagnostic_codes::Code;
21
22use crate::orchestration::MutationSessionRecord;
23use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
24use crate::visible_text::VisibleTextState;
25use crate::vm::Vm;
26
27const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
29
30pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
31
32fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
33 Arc::new(move |line: &str| {
34 let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
35 let mut stdout = std::io::stdout().lock();
36 stdout
37 .write_all(line.as_bytes())
38 .map_err(|e| format!("Bridge write error: {e}"))?;
39 stdout
40 .write_all(b"\n")
41 .map_err(|e| format!("Bridge write error: {e}"))?;
42 stdout
43 .flush()
44 .map_err(|e| format!("Bridge flush error: {e}"))?;
45 Ok(())
46 })
47}
48
49pub struct HostBridge {
56 next_id: AtomicU64,
57 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
59 cancelled: Arc<AtomicBool>,
61 cancel_notify: Arc<Notify>,
63 writer: HostBridgeWriter,
65 session_id: std::sync::Mutex<String>,
67 script_name: std::sync::Mutex<String>,
69 queued_transcript_injections: HostBridgeInjectionState,
71 resume_requested: Arc<AtomicBool>,
73 skills_reload_requested: Arc<AtomicBool>,
78 daemon_idle: Arc<AtomicBool>,
80 prompt_stop_reason: std::sync::Mutex<Option<String>>,
86 visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
88 visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
90 in_process: Option<InProcessHost>,
92}
93
94struct InProcessHost {
95 module_path: PathBuf,
96 exported_functions: BTreeMap<String, Rc<VmClosure>>,
97 vm: Vm,
98}
99
100impl InProcessHost {
101 fn dispatch<'a>(
107 &'a self,
108 method: &'a str,
109 params: serde_json::Value,
110 ) -> Pin<Box<dyn Future<Output = Result<serde_json::Value, VmError>> + 'a>> {
111 Box::pin(async move {
112 match method {
113 "builtin_call" => {
114 let name = params
115 .get("name")
116 .and_then(|value| value.as_str())
117 .unwrap_or_default();
118 let args = params
119 .get("args")
120 .and_then(|value| value.as_array())
121 .cloned()
122 .unwrap_or_default()
123 .into_iter()
124 .map(|value| json_result_to_vm_value(&value))
125 .collect::<Vec<_>>();
126 self.invoke_export(name, &args).await
127 }
128 "host/tools/list" => self
129 .invoke_optional_export("host_tools_list", &[])
130 .await
131 .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
132 "session/request_permission" => self.request_permission(params).await,
133 other => Err(VmError::Runtime(format!(
134 "playground host backend does not implement bridge method '{other}'"
135 ))),
136 }
137 })
138 }
139
140 async fn invoke_export(
141 &self,
142 name: &str,
143 args: &[VmValue],
144 ) -> Result<serde_json::Value, VmError> {
145 let Some(closure) = self.exported_functions.get(name) else {
146 return Err(VmError::Runtime(format!(
147 "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
148 self.module_path.display()
149 )));
150 };
151
152 let mut vm = self.vm.child_vm_for_host();
153 let result = vm.call_closure_pub(closure, args).await?;
154 Ok(crate::llm::vm_value_to_json(&result))
155 }
156
157 async fn invoke_optional_export(
158 &self,
159 name: &str,
160 args: &[VmValue],
161 ) -> Result<Option<serde_json::Value>, VmError> {
162 if !self.exported_functions.contains_key(name) {
163 return Ok(None);
164 }
165 self.invoke_export(name, args).await.map(Some)
166 }
167
168 async fn request_permission(
169 &self,
170 params: serde_json::Value,
171 ) -> Result<serde_json::Value, VmError> {
172 let Some(closure) = self.exported_functions.get("request_permission") else {
173 return Ok(serde_json::json!({ "granted": true }));
174 };
175
176 let tool_name = params
177 .get("toolCall")
178 .and_then(|tool_call| tool_call.get("toolName"))
179 .and_then(|value| value.as_str())
180 .unwrap_or_default();
181 let tool_args = params
182 .get("toolCall")
183 .and_then(|tool_call| tool_call.get("rawInput"))
184 .map(json_result_to_vm_value)
185 .unwrap_or(VmValue::Nil);
186 let full_payload = json_result_to_vm_value(¶ms);
187
188 let arg_count = closure.func.params.len();
189 let args = if arg_count >= 3 {
190 vec![
191 VmValue::String(Rc::from(tool_name.to_string())),
192 tool_args,
193 full_payload,
194 ]
195 } else if arg_count == 2 {
196 vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
197 } else if arg_count == 1 {
198 vec![full_payload]
199 } else {
200 Vec::new()
201 };
202
203 let mut vm = self.vm.child_vm_for_host();
204 let result = vm.call_closure_pub(closure, &args).await?;
205 let payload = match result {
206 VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
207 VmValue::String(reason) if !reason.is_empty() => {
208 serde_json::json!({ "granted": false, "reason": reason.to_string() })
209 }
210 other => {
211 let json = crate::llm::vm_value_to_json(&other);
212 if json
213 .get("granted")
214 .and_then(|value| value.as_bool())
215 .is_some()
216 || json.get("outcome").is_some()
217 {
218 json
219 } else {
220 serde_json::json!({ "granted": other.is_truthy() })
221 }
222 }
223 };
224 Ok(payload)
225 }
226}
227
228#[derive(Clone, Copy, Debug, PartialEq, Eq)]
238pub enum QueuedUserMessageMode {
239 InterruptImmediate,
240 FinishStep,
241 AuditOnly,
242}
243
244#[derive(Clone, Copy, Debug, PartialEq, Eq)]
245pub enum DeliveryCheckpoint {
246 InterruptImmediate,
247 AfterCurrentOperation,
248 EndOfInteraction,
249}
250
251impl QueuedUserMessageMode {
252 fn from_str(value: &str) -> Self {
253 match value {
254 "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
255 "finish_step" | "after_current_operation" => Self::FinishStep,
256 _ => Self::AuditOnly,
261 }
262 }
263}
264
265#[derive(Clone, Debug, PartialEq, Eq)]
266pub struct QueuedUserMessage {
267 pub message_id: String,
268 pub content: String,
269 pub transcript_content: serde_json::Value,
270 pub mode: QueuedUserMessageMode,
271}
272
273#[derive(Clone, Debug, PartialEq, Eq)]
274pub struct QueuedReminder {
275 pub reminder: crate::llm::helpers::SystemReminder,
276 pub mode: QueuedUserMessageMode,
277}
278
279#[derive(Clone, Debug, PartialEq, Eq)]
280pub enum QueuedTranscriptInjection {
281 User(QueuedUserMessage),
282 Reminder(QueuedReminder),
283}
284
285#[derive(Debug, Default)]
286struct QueuedTranscriptInjections {
287 queue: VecDeque<QueuedTranscriptInjection>,
288 revoked_user_message_ids: HashSet<String>,
289 delivered_user_message_ids: HashSet<String>,
290}
291
292#[derive(Clone, Debug, Default)]
293pub struct HostBridgeInjectionState {
294 inner: Arc<Mutex<QueuedTranscriptInjections>>,
295}
296
297#[derive(Clone, Copy, Debug, PartialEq, Eq)]
298pub enum PendingUserMessageMutationResult {
299 Mutated,
300 AlreadyRevoked,
301 AlreadyDelivered,
302 UnknownMessageId,
303}
304
305impl QueuedTranscriptInjection {
306 fn mode(&self) -> QueuedUserMessageMode {
307 match self {
308 Self::User(message) => message.mode,
309 Self::Reminder(reminder) => reminder.mode,
310 }
311 }
312}
313
314fn new_inject_message_id() -> String {
315 format!("msg_inj_{}", uuid::Uuid::now_v7().simple())
316}
317
318impl HostBridgeInjectionState {
319 pub fn new() -> Self {
320 Self::default()
321 }
322
323 pub async fn push_pending_user_message(
324 &self,
325 content: String,
326 transcript_content: serde_json::Value,
327 mode: &str,
328 ) -> String {
329 let message_id = new_inject_message_id();
330 self.inner
331 .lock()
332 .await
333 .queue
334 .push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
335 message_id: message_id.clone(),
336 content,
337 transcript_content,
338 mode: QueuedUserMessageMode::from_str(mode),
339 }));
340 message_id
341 }
342
343 pub async fn revoke_pending_user_message(
344 &self,
345 message_id: &str,
346 ) -> PendingUserMessageMutationResult {
347 let mut state = self.inner.lock().await;
348 let mut retained = VecDeque::new();
349 let mut revoked = false;
350 while let Some(injection) = state.queue.pop_front() {
351 match &injection {
352 QueuedTranscriptInjection::User(message) if message.message_id == message_id => {
353 revoked = true;
354 }
355 _ => retained.push_back(injection),
356 }
357 }
358 state.queue = retained;
359 if revoked {
360 state
361 .revoked_user_message_ids
362 .insert(message_id.to_string());
363 return PendingUserMessageMutationResult::Mutated;
364 }
365 if state.revoked_user_message_ids.contains(message_id) {
366 PendingUserMessageMutationResult::AlreadyRevoked
367 } else if state.delivered_user_message_ids.contains(message_id) {
368 PendingUserMessageMutationResult::AlreadyDelivered
369 } else {
370 PendingUserMessageMutationResult::UnknownMessageId
371 }
372 }
373
374 pub async fn replace_pending_user_message(
375 &self,
376 message_id: &str,
377 content: String,
378 transcript_content: serde_json::Value,
379 ) -> PendingUserMessageMutationResult {
380 let mut state = self.inner.lock().await;
381 for injection in &mut state.queue {
382 if let QueuedTranscriptInjection::User(message) = injection {
383 if message.message_id == message_id {
384 message.content = content;
385 message.transcript_content = transcript_content;
386 return PendingUserMessageMutationResult::Mutated;
387 }
388 }
389 }
390 if state.revoked_user_message_ids.contains(message_id) {
391 PendingUserMessageMutationResult::AlreadyRevoked
392 } else if state.delivered_user_message_ids.contains(message_id) {
393 PendingUserMessageMutationResult::AlreadyDelivered
394 } else {
395 PendingUserMessageMutationResult::UnknownMessageId
396 }
397 }
398
399 async fn push_session_reminder(&self, reminder: QueuedReminder) {
400 self.inner
401 .lock()
402 .await
403 .queue
404 .push_back(QueuedTranscriptInjection::Reminder(reminder));
405 }
406}
407
408fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
409 format!(
410 "{}: {}",
411 Code::ReminderUnknownOption.as_str(),
412 message.as_ref()
413 )
414}
415
416fn session_remind_shape_error(message: impl AsRef<str>) -> String {
417 format!(
418 "{}: {}",
419 Code::ReminderInvalidShape.as_str(),
420 message.as_ref()
421 )
422}
423
424fn reminder_unknown_propagate_error(message: impl AsRef<str>) -> String {
425 format!(
426 "{}: {}",
427 Code::ReminderUnknownPropagate.as_str(),
428 message.as_ref()
429 )
430}
431
432fn string_field(
433 map: &serde_json::Map<String, serde_json::Value>,
434 key: &str,
435 required: bool,
436) -> Result<Option<String>, String> {
437 match map.get(key) {
438 None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
439 format!("`{key}` must be a non-empty string"),
440 )),
441 None | Some(serde_json::Value::Null) => Ok(None),
442 Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
443 session_remind_shape_error(format!("`{key}` must be a non-empty string")),
444 ),
445 Some(serde_json::Value::String(value)) => {
446 let trimmed = value.trim();
447 if trimmed.is_empty() {
448 Ok(None)
449 } else {
450 Ok(Some(trimmed.to_string()))
451 }
452 }
453 Some(other) => Err(session_remind_shape_error(format!(
454 "`{key}` must be a string, got {other}"
455 ))),
456 }
457}
458
459fn bool_field(
460 map: &serde_json::Map<String, serde_json::Value>,
461 key: &str,
462) -> Result<Option<bool>, String> {
463 match map.get(key) {
464 None | Some(serde_json::Value::Null) => Ok(None),
465 Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
466 Some(other) => Err(session_remind_shape_error(format!(
467 "`{key}` must be a bool, got {other}"
468 ))),
469 }
470}
471
472fn int_field(
473 map: &serde_json::Map<String, serde_json::Value>,
474 key: &str,
475) -> Result<Option<i64>, String> {
476 match map.get(key) {
477 None | Some(serde_json::Value::Null) => Ok(None),
478 Some(serde_json::Value::Number(value)) => {
479 let Some(value) = value.as_i64() else {
480 return Err(session_remind_shape_error(format!(
481 "`{key}` must be an integer"
482 )));
483 };
484 Ok(Some(value))
485 }
486 Some(other) => Err(session_remind_shape_error(format!(
487 "`{key}` must be an int, got {other}"
488 ))),
489 }
490}
491
492fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
493 let Some(value) = map.get("tags") else {
494 return Ok(Vec::new());
495 };
496 if value.is_null() {
497 return Ok(Vec::new());
498 }
499 let Some(values) = value.as_array() else {
500 return Err(session_remind_shape_error("`tags` must be a list"));
501 };
502 let mut tags = Vec::new();
503 for value in values {
504 let Some(tag) = value.as_str() else {
505 return Err(session_remind_shape_error(format!(
506 "`tags` entries must be strings, got {value}"
507 )));
508 };
509 let tag = tag.trim();
510 if tag.is_empty() {
511 return Err(session_remind_shape_error(
512 "`tags` entries must be non-empty strings",
513 ));
514 }
515 if !tags.iter().any(|existing| existing == tag) {
516 tags.push(tag.to_string());
517 }
518 }
519 Ok(tags)
520}
521
522fn session_remind_payload_from_value(
523 value: &serde_json::Value,
524) -> Result<crate::llm::helpers::SystemReminder, String> {
525 let Some(map) = value.as_object() else {
526 return Err(session_remind_shape_error(
527 "session/remind payload must be a reminder object",
528 ));
529 };
530 const ALLOWED: &[&str] = &[
531 "_meta",
532 "body",
533 "dedupe_key",
534 "fired_at_turn",
535 "id",
536 "preserve_on_compact",
537 "propagate",
538 "role_hint",
539 "source",
540 "tags",
541 "ttl_turns",
542 ];
543 let unknown = map
544 .keys()
545 .filter(|key| !ALLOWED.contains(&key.as_str()))
546 .map(String::as_str)
547 .collect::<Vec<_>>();
548 if !unknown.is_empty() {
549 if unknown.contains(&"content") {
550 return Err(session_remind_shape_error(
551 "session/remind expects reminder `body`, not user-message `content`",
552 ));
553 }
554 return Err(reminder_unknown_option_error(format!(
555 "unknown reminder option(s): {}",
556 unknown.join(", ")
557 )));
558 }
559 if let Some(meta) = map.get("_meta") {
560 if !meta.is_null() && !meta.is_object() {
561 return Err(session_remind_shape_error("`_meta` must be an object"));
562 }
563 }
564 let ttl_turns = int_field(map, "ttl_turns")?;
565 if let Some(value) = ttl_turns {
566 if value <= 0 {
567 return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
568 }
569 }
570 let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
571 if fired_at_turn < 0 {
572 return Err(session_remind_shape_error(
573 "`fired_at_turn` must be >= 0 when provided",
574 ));
575 }
576 match string_field(map, "source", false)?.as_deref() {
577 None | Some("bridge") => {}
578 Some(_) => {
579 return Err(session_remind_shape_error(
580 "`source` for session/remind must be bridge when provided",
581 ))
582 }
583 }
584 let propagate = match string_field(map, "propagate", false)?.as_deref() {
585 None => crate::llm::helpers::ReminderPropagate::Session,
586 Some("all") => crate::llm::helpers::ReminderPropagate::All,
587 Some("session") => crate::llm::helpers::ReminderPropagate::Session,
588 Some("none") => crate::llm::helpers::ReminderPropagate::None,
589 Some(_) => {
590 return Err(reminder_unknown_propagate_error(
591 "`propagate` must be one of all, session, or none",
592 ))
593 }
594 };
595 let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
596 None => crate::llm::helpers::ReminderRoleHint::System,
597 Some("system") => crate::llm::helpers::ReminderRoleHint::System,
598 Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
599 Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
600 Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
601 Some(_) => {
602 return Err(session_remind_shape_error(
603 "`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
604 ))
605 }
606 };
607 Ok(crate::llm::helpers::SystemReminder {
608 id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
609 tags: tags_field(map)?,
610 dedupe_key: string_field(map, "dedupe_key", false)?,
611 ttl_turns,
612 preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
613 propagate,
614 role_hint,
615 source: crate::llm::helpers::ReminderSource::Bridge,
616 body: string_field(map, "body", true)?.unwrap_or_default(),
617 fired_at_turn,
618 originating_agent_id: None,
619 })
620}
621
622fn handle_cancel_tool_call_notification(params: &serde_json::Value) {
632 let session_id = params
633 .get("sessionId")
634 .or_else(|| params.get("session_id"))
635 .and_then(|value| value.as_str())
636 .unwrap_or_default();
637 let call_id = params
638 .get("toolCallId")
639 .or_else(|| params.get("tool_call_id"))
640 .or_else(|| params.get("callId"))
641 .or_else(|| params.get("call_id"))
642 .and_then(|value| value.as_str())
643 .unwrap_or_default();
644 if call_id.is_empty() {
645 return;
646 }
647 let reason = params
648 .get("reason")
649 .and_then(|value| value.as_str())
650 .unwrap_or("host cancelled in-flight tool call")
651 .to_string();
652 let inject_reminder = params
653 .get("injectReminder")
654 .or_else(|| params.get("inject_reminder"))
655 .and_then(|value| value.as_bool())
656 .unwrap_or(true);
657 crate::tool_call_cancellations::cancel(session_id, call_id, reason, inject_reminder);
658}
659
660fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
661 let mode = QueuedUserMessageMode::from_str(
662 params
663 .get("mode")
664 .and_then(|value| value.as_str())
665 .unwrap_or("audit_only"),
666 );
667 let reminder_value = if let Some(reminder) = params.get("reminder") {
668 reminder.clone()
669 } else {
670 let Some(params) = params.as_object() else {
671 return Err(session_remind_shape_error(
672 "session/remind params must be an object",
673 ));
674 };
675 let mut reminder = params.clone();
676 reminder.remove("mode");
677 reminder.remove("sessionId");
678 reminder.remove("session_id");
679 serde_json::Value::Object(reminder)
680 };
681 Ok(QueuedReminder {
682 reminder: session_remind_payload_from_value(&reminder_value)?,
683 mode,
684 })
685}
686
687#[allow(clippy::new_without_default)]
689impl HostBridge {
690 pub fn new() -> Self {
695 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
696 Arc::new(Mutex::new(HashMap::new()));
697 let cancelled = Arc::new(AtomicBool::new(false));
698 let cancel_notify = Arc::new(Notify::new());
699 let queued_transcript_injections = HostBridgeInjectionState::default();
700 let resume_requested = Arc::new(AtomicBool::new(false));
701 let skills_reload_requested = Arc::new(AtomicBool::new(false));
702 let daemon_idle = Arc::new(AtomicBool::new(false));
703
704 let pending_clone = pending.clone();
706 let cancelled_clone = cancelled.clone();
707 let cancel_notify_clone = cancel_notify.clone();
708 let queued_clone = queued_transcript_injections.clone();
709 let resume_clone = resume_requested.clone();
710 let skills_reload_clone = skills_reload_requested.clone();
711 tokio::task::spawn_local(async move {
712 let stdin = tokio::io::stdin();
713 let reader = tokio::io::BufReader::new(stdin);
714 let mut lines = reader.lines();
715
716 while let Ok(Some(line)) = lines.next_line().await {
717 let line = line.trim().to_string();
718 if line.is_empty() {
719 continue;
720 }
721
722 let msg: serde_json::Value = match serde_json::from_str(&line) {
723 Ok(v) => v,
724 Err(_) => continue,
725 };
726
727 if msg.get("id").is_none() {
729 if let Some(method) = msg["method"].as_str() {
730 if method == "cancel" {
731 cancelled_clone.store(true, Ordering::SeqCst);
732 cancel_notify_clone.notify_waiters();
733 } else if method == "agent/resume" {
734 resume_clone.store(true, Ordering::SeqCst);
735 } else if method == "skills/update" {
736 skills_reload_clone.store(true, Ordering::SeqCst);
737 } else if method == "session/remind" {
738 let params = &msg["params"];
739 if let Ok(reminder) = queued_session_remind_from_params(params) {
740 queued_clone.push_session_reminder(reminder).await;
741 }
742 } else if method == "session/cancel_tool_call" {
743 handle_cancel_tool_call_notification(&msg["params"]);
744 }
745 }
746 continue;
747 }
748
749 if let Some(id) = msg["id"].as_u64() {
750 let mut pending = pending_clone.lock().await;
751 if let Some(sender) = pending.remove(&id) {
752 let _ = sender.send(msg);
753 }
754 }
755 }
756
757 let mut pending = pending_clone.lock().await;
759 pending.clear();
760 });
761
762 Self {
763 next_id: AtomicU64::new(1),
764 pending,
765 cancelled,
766 cancel_notify,
767 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
768 session_id: std::sync::Mutex::new(String::new()),
769 script_name: std::sync::Mutex::new(String::new()),
770 queued_transcript_injections,
771 resume_requested,
772 skills_reload_requested,
773 daemon_idle,
774 prompt_stop_reason: std::sync::Mutex::new(None),
775 visible_call_states: std::sync::Mutex::new(HashMap::new()),
776 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
777 in_process: None,
778 }
779 }
780
781 pub fn from_parts(
787 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
788 cancelled: Arc<AtomicBool>,
789 stdout_lock: Arc<std::sync::Mutex<()>>,
790 start_id: u64,
791 ) -> Self {
792 Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
793 }
794
795 pub fn from_parts_with_writer(
796 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
797 cancelled: Arc<AtomicBool>,
798 writer: HostBridgeWriter,
799 start_id: u64,
800 ) -> Self {
801 Self::from_parts_with_writer_and_cancel_notify(
802 pending,
803 cancelled,
804 Arc::new(Notify::new()),
805 writer,
806 start_id,
807 )
808 }
809
810 pub fn from_parts_with_writer_and_cancel_notify(
811 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
812 cancelled: Arc<AtomicBool>,
813 cancel_notify: Arc<Notify>,
814 writer: HostBridgeWriter,
815 start_id: u64,
816 ) -> Self {
817 Self::from_parts_with_writer_cancel_notify_and_injection_state(
818 pending,
819 cancelled,
820 cancel_notify,
821 writer,
822 start_id,
823 None,
824 )
825 }
826
827 pub fn from_parts_with_writer_cancel_notify_and_injection_state(
828 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
829 cancelled: Arc<AtomicBool>,
830 cancel_notify: Arc<Notify>,
831 writer: HostBridgeWriter,
832 start_id: u64,
833 injection_state: Option<HostBridgeInjectionState>,
834 ) -> Self {
835 Self {
836 next_id: AtomicU64::new(start_id),
837 pending,
838 cancelled,
839 cancel_notify,
840 writer,
841 session_id: std::sync::Mutex::new(String::new()),
842 script_name: std::sync::Mutex::new(String::new()),
843 queued_transcript_injections: injection_state.unwrap_or_default(),
844 resume_requested: Arc::new(AtomicBool::new(false)),
845 skills_reload_requested: Arc::new(AtomicBool::new(false)),
846 daemon_idle: Arc::new(AtomicBool::new(false)),
847 prompt_stop_reason: std::sync::Mutex::new(None),
848 visible_call_states: std::sync::Mutex::new(HashMap::new()),
849 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
850 in_process: None,
851 }
852 }
853
854 pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
857 let exported_functions = vm.load_module_exports(module_path).await?;
858 Ok(Self {
859 next_id: AtomicU64::new(1),
860 pending: Arc::new(Mutex::new(HashMap::new())),
861 cancelled: Arc::new(AtomicBool::new(false)),
862 cancel_notify: Arc::new(Notify::new()),
863 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
864 session_id: std::sync::Mutex::new(String::new()),
865 script_name: std::sync::Mutex::new(String::new()),
866 queued_transcript_injections: HostBridgeInjectionState::default(),
867 resume_requested: Arc::new(AtomicBool::new(false)),
868 skills_reload_requested: Arc::new(AtomicBool::new(false)),
869 daemon_idle: Arc::new(AtomicBool::new(false)),
870 prompt_stop_reason: std::sync::Mutex::new(None),
871 visible_call_states: std::sync::Mutex::new(HashMap::new()),
872 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
873 in_process: Some(InProcessHost {
874 module_path: module_path.to_path_buf(),
875 exported_functions,
876 vm,
877 }),
878 })
879 }
880
881 pub fn set_session_id(&self, id: &str) {
883 *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
884 }
885
886 pub fn set_script_name(&self, name: &str) {
888 *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
889 }
890
891 fn get_script_name(&self) -> String {
893 self.script_name
894 .lock()
895 .unwrap_or_else(|e| e.into_inner())
896 .clone()
897 }
898
899 pub fn get_session_id(&self) -> String {
901 self.session_id
902 .lock()
903 .unwrap_or_else(|e| e.into_inner())
904 .clone()
905 }
906
907 fn write_line(&self, line: &str) -> Result<(), VmError> {
909 (self.writer)(line).map_err(VmError::Runtime)
910 }
911
912 pub async fn call(
915 &self,
916 method: &str,
917 params: serde_json::Value,
918 ) -> Result<serde_json::Value, VmError> {
919 if let Some(in_process) = &self.in_process {
920 return in_process.dispatch(method, params).await;
921 }
922
923 if self.is_cancelled() {
924 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
925 }
926
927 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
928 let cancel_wait = self.cancel_notify.notified();
929 tokio::pin!(cancel_wait);
930
931 let request = crate::jsonrpc::request(id, method, params);
932
933 let (tx, rx) = oneshot::channel();
934 {
935 let mut pending = self.pending.lock().await;
936 pending.insert(id, tx);
937 }
938
939 let line = serde_json::to_string(&request)
940 .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
941 if let Err(e) = self.write_line(&line) {
942 let mut pending = self.pending.lock().await;
943 pending.remove(&id);
944 return Err(e);
945 }
946
947 if self.is_cancelled() {
948 let mut pending = self.pending.lock().await;
949 pending.remove(&id);
950 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
951 }
952
953 let response = tokio::select! {
954 result = rx => match result {
955 Ok(msg) => msg,
956 Err(_) => {
957 return Err(VmError::Runtime(
959 "Bridge: host closed connection before responding".into(),
960 ));
961 }
962 },
963 _ = &mut cancel_wait => {
964 let mut pending = self.pending.lock().await;
965 pending.remove(&id);
966 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
967 }
968 _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
969 let mut pending = self.pending.lock().await;
970 pending.remove(&id);
971 return Err(VmError::Runtime(format!(
972 "Bridge: host did not respond to '{method}' within {}s",
973 DEFAULT_TIMEOUT.as_secs()
974 )));
975 }
976 };
977
978 if let Some(error) = response.get("error") {
979 let message = error["message"].as_str().unwrap_or("Unknown host error");
980 let code = error["code"].as_i64().unwrap_or(-1);
981 if code == -32001 {
983 return Err(VmError::CategorizedError {
984 message: message.to_string(),
985 category: ErrorCategory::ToolRejected,
986 });
987 }
988 return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
989 }
990
991 Ok(response["result"].clone())
992 }
993
994 pub fn notify(&self, method: &str, params: serde_json::Value) {
997 let notification = crate::jsonrpc::notification(method, params);
998 if self.in_process.is_some() {
999 return;
1000 }
1001 if let Ok(line) = serde_json::to_string(¬ification) {
1002 let _ = self.write_line(&line);
1003 }
1004 }
1005
1006 pub fn is_cancelled(&self) -> bool {
1008 self.cancelled.load(Ordering::SeqCst)
1009 }
1010
1011 pub fn take_resume_signal(&self) -> bool {
1012 self.resume_requested.swap(false, Ordering::SeqCst)
1013 }
1014
1015 pub fn signal_resume(&self) {
1016 self.resume_requested.store(true, Ordering::SeqCst);
1017 }
1018
1019 pub fn set_daemon_idle(&self, idle: bool) {
1020 self.daemon_idle.store(idle, Ordering::SeqCst);
1021 }
1022
1023 pub fn is_daemon_idle(&self) -> bool {
1024 self.daemon_idle.load(Ordering::SeqCst)
1025 }
1026
1027 pub fn set_prompt_stop_reason(&self, reason: &str) {
1032 *self
1033 .prompt_stop_reason
1034 .lock()
1035 .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
1036 }
1037
1038 pub fn take_prompt_stop_reason(&self) -> Option<String> {
1043 self.prompt_stop_reason
1044 .lock()
1045 .unwrap_or_else(|e| e.into_inner())
1046 .take()
1047 }
1048
1049 pub fn take_skills_reload_signal(&self) -> bool {
1054 self.skills_reload_requested.swap(false, Ordering::SeqCst)
1055 }
1056
1057 pub fn signal_skills_reload(&self) {
1061 self.skills_reload_requested.store(true, Ordering::SeqCst);
1062 }
1063
1064 pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
1070 let result = self.call("skills/list", serde_json::json!({})).await?;
1071 match result {
1072 serde_json::Value::Array(items) => Ok(items),
1073 serde_json::Value::Object(map) => match map.get("skills") {
1074 Some(serde_json::Value::Array(items)) => Ok(items.clone()),
1075 _ => Err(VmError::Runtime(
1076 "skills/list: host response must be an array or { skills: [...] }".into(),
1077 )),
1078 },
1079 _ => Err(VmError::Runtime(
1080 "skills/list: unexpected response shape".into(),
1081 )),
1082 }
1083 }
1084
1085 pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
1091 let result = self.call("host/tools/list", serde_json::json!({})).await?;
1092 parse_host_tools_list_response(result)
1093 }
1094
1095 pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
1099 self.call("skills/fetch", serde_json::json!({ "id": id }))
1100 .await
1101 }
1102
1103 pub fn injection_state(&self) -> HostBridgeInjectionState {
1104 self.queued_transcript_injections.clone()
1105 }
1106
1107 pub async fn push_pending_user_message(
1108 &self,
1109 content: String,
1110 transcript_content: serde_json::Value,
1111 mode: &str,
1112 ) -> String {
1113 self.queued_transcript_injections
1114 .push_pending_user_message(content, transcript_content, mode)
1115 .await
1116 }
1117
1118 pub async fn push_queued_user_message(&self, content: String, mode: &str) -> String {
1119 self.push_pending_user_message(content.clone(), serde_json::Value::String(content), mode)
1120 .await
1121 }
1122
1123 pub async fn revoke_pending_user_message(
1124 &self,
1125 message_id: &str,
1126 ) -> PendingUserMessageMutationResult {
1127 self.queued_transcript_injections
1128 .revoke_pending_user_message(message_id)
1129 .await
1130 }
1131
1132 pub async fn replace_pending_user_message(
1133 &self,
1134 message_id: &str,
1135 content: String,
1136 transcript_content: serde_json::Value,
1137 ) -> PendingUserMessageMutationResult {
1138 self.queued_transcript_injections
1139 .replace_pending_user_message(message_id, content, transcript_content)
1140 .await
1141 }
1142
1143 pub async fn push_queued_session_remind_from_params(
1144 &self,
1145 params: &serde_json::Value,
1146 ) -> Result<String, String> {
1147 let reminder = queued_session_remind_from_params(params)?;
1148 let reminder_id = reminder.reminder.id.clone();
1149 self.queued_transcript_injections
1150 .push_session_reminder(reminder)
1151 .await;
1152 Ok(reminder_id)
1153 }
1154
1155 pub async fn take_queued_user_messages(
1156 &self,
1157 include_interrupt_immediate: bool,
1158 include_finish_step: bool,
1159 include_audit_only: bool,
1160 ) -> Vec<QueuedUserMessage> {
1161 let mut state = self.queued_transcript_injections.inner.lock().await;
1162 let mut selected = Vec::new();
1163 let mut retained = VecDeque::new();
1164 while let Some(injection) = state.queue.pop_front() {
1165 let should_take = match injection.mode() {
1166 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1167 QueuedUserMessageMode::FinishStep => include_finish_step,
1168 QueuedUserMessageMode::AuditOnly => include_audit_only,
1169 };
1170 match (should_take, injection) {
1171 (true, QueuedTranscriptInjection::User(message)) => {
1172 state
1173 .delivered_user_message_ids
1174 .insert(message.message_id.clone());
1175 selected.push(message);
1176 }
1177 (_, injection) => retained.push_back(injection),
1178 }
1179 }
1180 state.queue = retained;
1181 selected
1182 }
1183
1184 pub async fn take_queued_transcript_injections(
1185 &self,
1186 include_interrupt_immediate: bool,
1187 include_finish_step: bool,
1188 include_audit_only: bool,
1189 ) -> Vec<QueuedTranscriptInjection> {
1190 let mut state = self.queued_transcript_injections.inner.lock().await;
1191 let mut selected = Vec::new();
1192 let mut retained = VecDeque::new();
1193 while let Some(injection) = state.queue.pop_front() {
1194 let should_take = match injection.mode() {
1195 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
1196 QueuedUserMessageMode::FinishStep => include_finish_step,
1197 QueuedUserMessageMode::AuditOnly => include_audit_only,
1198 };
1199 if should_take {
1200 if let QueuedTranscriptInjection::User(message) = &injection {
1201 state
1202 .delivered_user_message_ids
1203 .insert(message.message_id.clone());
1204 }
1205 selected.push(injection);
1206 } else {
1207 retained.push_back(injection);
1208 }
1209 }
1210 state.queue = retained;
1211 selected
1212 }
1213
1214 pub async fn take_queued_user_messages_for(
1215 &self,
1216 checkpoint: DeliveryCheckpoint,
1217 ) -> Vec<QueuedUserMessage> {
1218 match checkpoint {
1219 DeliveryCheckpoint::InterruptImmediate => {
1220 self.take_queued_user_messages(true, false, false).await
1221 }
1222 DeliveryCheckpoint::AfterCurrentOperation => {
1223 self.take_queued_user_messages(false, true, false).await
1224 }
1225 DeliveryCheckpoint::EndOfInteraction => {
1226 self.take_queued_user_messages(false, false, true).await
1227 }
1228 }
1229 }
1230
1231 pub async fn take_queued_transcript_injections_for(
1232 &self,
1233 checkpoint: DeliveryCheckpoint,
1234 ) -> Vec<QueuedTranscriptInjection> {
1235 match checkpoint {
1236 DeliveryCheckpoint::InterruptImmediate => {
1237 self.take_queued_transcript_injections(true, false, false)
1238 .await
1239 }
1240 DeliveryCheckpoint::AfterCurrentOperation => {
1241 self.take_queued_transcript_injections(false, true, false)
1242 .await
1243 }
1244 DeliveryCheckpoint::EndOfInteraction => {
1245 self.take_queued_transcript_injections(false, false, true)
1246 .await
1247 }
1248 }
1249 }
1250
1251 pub fn send_output(&self, text: &str) {
1253 self.notify("output", serde_json::json!({"text": text}));
1254 }
1255
1256 pub fn send_progress(
1258 &self,
1259 phase: &str,
1260 message: &str,
1261 progress: Option<i64>,
1262 total: Option<i64>,
1263 data: Option<serde_json::Value>,
1264 ) {
1265 let mut payload = serde_json::json!({"phase": phase, "message": message});
1266 if let Some(p) = progress {
1267 payload["progress"] = serde_json::json!(p);
1268 }
1269 if let Some(t) = total {
1270 payload["total"] = serde_json::json!(t);
1271 }
1272 if let Some(d) = data {
1273 payload["data"] = d;
1274 }
1275 self.notify("progress", payload);
1276 }
1277
1278 pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
1280 let mut payload = serde_json::json!({"level": level, "message": message});
1281 if let Some(f) = fields {
1282 payload["fields"] = f;
1283 }
1284 self.notify("log", payload);
1285 }
1286
1287 pub fn send_call_start(
1290 &self,
1291 call_id: &str,
1292 call_type: &str,
1293 name: &str,
1294 metadata: serde_json::Value,
1295 ) {
1296 let session_id = self.get_session_id();
1297 let script = self.get_script_name();
1298 let stream_publicly = metadata
1299 .get("stream_publicly")
1300 .and_then(|value| value.as_bool())
1301 .unwrap_or(true);
1302 self.visible_call_streams
1303 .lock()
1304 .unwrap_or_else(|e| e.into_inner())
1305 .insert(call_id.to_string(), stream_publicly);
1306 self.notify(
1307 "session/update",
1308 serde_json::json!({
1309 "sessionId": session_id,
1310 "update": {
1311 "sessionUpdate": "call_start",
1312 "content": {
1313 "toolCallId": call_id,
1314 "call_type": call_type,
1315 "name": name,
1316 "script": script,
1317 "metadata": metadata,
1318 },
1319 },
1320 }),
1321 );
1322 }
1323
1324 pub fn send_call_progress(
1327 &self,
1328 call_id: &str,
1329 delta: &str,
1330 accumulated_tokens: u64,
1331 user_visible: bool,
1332 ) {
1333 let session_id = self.get_session_id();
1334 let (visible_text, visible_delta) = {
1335 let stream_publicly = self
1336 .visible_call_streams
1337 .lock()
1338 .unwrap_or_else(|e| e.into_inner())
1339 .get(call_id)
1340 .copied()
1341 .unwrap_or(true);
1342 let mut states = self
1343 .visible_call_states
1344 .lock()
1345 .unwrap_or_else(|e| e.into_inner());
1346 let state = states.entry(call_id.to_string()).or_default();
1347 state.push(delta, stream_publicly)
1348 };
1349 self.notify(
1350 "session/update",
1351 serde_json::json!({
1352 "sessionId": session_id,
1353 "update": {
1354 "sessionUpdate": "call_progress",
1355 "content": {
1356 "toolCallId": call_id,
1357 "delta": delta,
1358 "accumulated_tokens": accumulated_tokens,
1359 "visible_text": visible_text,
1360 "visible_delta": visible_delta,
1361 "user_visible": user_visible,
1362 },
1363 },
1364 }),
1365 );
1366 }
1367
1368 pub fn send_call_end(
1370 &self,
1371 call_id: &str,
1372 call_type: &str,
1373 name: &str,
1374 duration_ms: u64,
1375 status: &str,
1376 metadata: serde_json::Value,
1377 ) {
1378 let session_id = self.get_session_id();
1379 let script = self.get_script_name();
1380 self.visible_call_states
1381 .lock()
1382 .unwrap_or_else(|e| e.into_inner())
1383 .remove(call_id);
1384 self.visible_call_streams
1385 .lock()
1386 .unwrap_or_else(|e| e.into_inner())
1387 .remove(call_id);
1388 self.notify(
1389 "session/update",
1390 serde_json::json!({
1391 "sessionId": session_id,
1392 "update": {
1393 "sessionUpdate": "call_end",
1394 "content": {
1395 "toolCallId": call_id,
1396 "call_type": call_type,
1397 "name": name,
1398 "script": script,
1399 "duration_ms": duration_ms,
1400 "status": status,
1401 "metadata": metadata,
1402 },
1403 },
1404 }),
1405 );
1406 }
1407
1408 pub fn send_worker_update(
1410 &self,
1411 worker_id: &str,
1412 worker_name: &str,
1413 status: &str,
1414 metadata: serde_json::Value,
1415 audit: Option<&MutationSessionRecord>,
1416 ) {
1417 let session_id = self.get_session_id();
1418 let script = self.get_script_name();
1419 let started_at = metadata.get("started_at").cloned().unwrap_or_default();
1420 let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
1421 let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
1422 let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
1423 let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
1424 let lifecycle = serde_json::json!({
1425 "event": status,
1426 "worker_id": worker_id,
1427 "worker_name": worker_name,
1428 "started_at": started_at,
1429 "finished_at": finished_at,
1430 });
1431 self.notify(
1432 "session/update",
1433 serde_json::json!({
1434 "sessionId": session_id,
1435 "update": {
1436 "sessionUpdate": "worker_update",
1437 "content": {
1438 "worker_id": worker_id,
1439 "worker_name": worker_name,
1440 "status": status,
1441 "script": script,
1442 "started_at": started_at,
1443 "finished_at": finished_at,
1444 "snapshot_path": snapshot_path,
1445 "run_id": run_id,
1446 "run_path": run_path,
1447 "lifecycle": lifecycle,
1448 "audit": audit,
1449 "metadata": metadata,
1450 },
1451 },
1452 }),
1453 );
1454 }
1455}
1456
1457pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
1459 crate::stdlib::json_to_vm_value(val)
1460}
1461
1462fn parse_host_tools_list_response(
1463 result: serde_json::Value,
1464) -> Result<Vec<serde_json::Value>, VmError> {
1465 let tools = match result {
1466 serde_json::Value::Array(items) => items,
1467 serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
1468 map.get("result")
1469 .and_then(|value| value.get("tools"))
1470 .cloned()
1471 }) {
1472 Some(serde_json::Value::Array(items)) => items,
1473 _ => {
1474 return Err(VmError::Runtime(
1475 "host/tools/list: host response must be an array or { tools: [...] }".into(),
1476 ));
1477 }
1478 },
1479 _ => {
1480 return Err(VmError::Runtime(
1481 "host/tools/list: unexpected response shape".into(),
1482 ));
1483 }
1484 };
1485
1486 let mut normalized = Vec::with_capacity(tools.len());
1487 for tool in tools {
1488 let serde_json::Value::Object(map) = tool else {
1489 return Err(VmError::Runtime(
1490 "host/tools/list: every tool must be an object".into(),
1491 ));
1492 };
1493 let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
1494 return Err(VmError::Runtime(
1495 "host/tools/list: every tool must include a string `name`".into(),
1496 ));
1497 };
1498 let description = map
1499 .get("description")
1500 .and_then(|value| value.as_str())
1501 .or_else(|| {
1502 map.get("short_description")
1503 .and_then(|value| value.as_str())
1504 })
1505 .unwrap_or_default();
1506 let schema = map
1507 .get("schema")
1508 .cloned()
1509 .or_else(|| map.get("parameters").cloned())
1510 .or_else(|| map.get("input_schema").cloned())
1511 .unwrap_or(serde_json::Value::Null);
1512 let deprecated = map
1513 .get("deprecated")
1514 .and_then(|value| value.as_bool())
1515 .unwrap_or(false);
1516 normalized.push(serde_json::json!({
1517 "name": name,
1518 "description": description,
1519 "schema": schema,
1520 "deprecated": deprecated,
1521 }));
1522 }
1523 Ok(normalized)
1524}
1525
1526#[cfg(test)]
1527mod tests {
1528 use super::*;
1529
1530 fn test_bridge() -> HostBridge {
1531 HostBridge::from_parts(
1532 Arc::new(Mutex::new(HashMap::new())),
1533 Arc::new(AtomicBool::new(false)),
1534 Arc::new(std::sync::Mutex::new(())),
1535 1,
1536 )
1537 }
1538
1539 fn test_bridge_sharing_injection_state(owner: &HostBridge) -> HostBridge {
1540 HostBridge::from_parts_with_writer_cancel_notify_and_injection_state(
1541 Arc::new(Mutex::new(HashMap::new())),
1542 Arc::new(AtomicBool::new(false)),
1543 Arc::new(Notify::new()),
1544 Arc::new(|_| Ok(())),
1545 100,
1546 Some(owner.injection_state()),
1547 )
1548 }
1549
1550 #[test]
1551 fn test_json_rpc_request_format() {
1552 let request = crate::jsonrpc::request(
1553 1,
1554 "llm_call",
1555 serde_json::json!({
1556 "prompt": "Hello",
1557 "system": "Be helpful",
1558 }),
1559 );
1560 let s = serde_json::to_string(&request).unwrap();
1561 assert!(s.contains("\"jsonrpc\":\"2.0\""));
1562 assert!(s.contains("\"id\":1"));
1563 assert!(s.contains("\"method\":\"llm_call\""));
1564 }
1565
1566 #[test]
1567 fn test_json_rpc_notification_format() {
1568 let notification =
1569 crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
1570 let s = serde_json::to_string(¬ification).unwrap();
1571 assert!(s.contains("\"method\":\"output\""));
1572 assert!(!s.contains("\"id\""));
1573 }
1574
1575 #[test]
1576 fn test_json_rpc_error_response_parsing() {
1577 let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
1578 assert!(response.get("error").is_some());
1579 assert_eq!(
1580 response["error"]["message"].as_str().unwrap(),
1581 "Invalid request"
1582 );
1583 }
1584
1585 #[test]
1586 fn test_json_rpc_success_response_parsing() {
1587 let response = crate::jsonrpc::response(
1588 1,
1589 serde_json::json!({
1590 "text": "Hello world",
1591 "input_tokens": 10,
1592 "output_tokens": 5,
1593 }),
1594 );
1595 assert!(response.get("result").is_some());
1596 assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
1597 }
1598
1599 #[test]
1600 fn test_cancelled_flag() {
1601 let cancelled = Arc::new(AtomicBool::new(false));
1602 assert!(!cancelled.load(Ordering::SeqCst));
1603 cancelled.store(true, Ordering::SeqCst);
1604 assert!(cancelled.load(Ordering::SeqCst));
1605 }
1606
1607 #[test]
1608 fn pending_host_calls_return_when_cancellation_arrives() {
1609 let runtime = tokio::runtime::Builder::new_current_thread()
1610 .enable_all()
1611 .build()
1612 .unwrap();
1613 runtime.block_on(async {
1614 let pending = Arc::new(Mutex::new(HashMap::new()));
1615 let cancelled = Arc::new(AtomicBool::new(false));
1616 let bridge = HostBridge::from_parts_with_writer(
1617 pending.clone(),
1618 cancelled.clone(),
1619 Arc::new(|_| Ok(())),
1620 1,
1621 );
1622
1623 let call = bridge.call("host/work", serde_json::json!({}));
1624 tokio::pin!(call);
1625
1626 loop {
1627 tokio::select! {
1628 result = &mut call => panic!("call completed before cancellation: {result:?}"),
1629 _ = tokio::task::yield_now() => {}
1630 }
1631 if !pending.lock().await.is_empty() {
1632 break;
1633 }
1634 }
1635
1636 cancelled.store(true, Ordering::SeqCst);
1637 bridge.cancel_notify.notify_waiters();
1638
1639 let result = tokio::time::timeout(Duration::from_secs(1), call)
1640 .await
1641 .expect("pending call should observe cancellation promptly");
1642 assert!(
1643 matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
1644 );
1645 assert!(pending.lock().await.is_empty());
1646 });
1647 }
1648
1649 #[test]
1650 fn queued_messages_are_filtered_by_delivery_mode() {
1651 let runtime = tokio::runtime::Builder::new_current_thread()
1652 .enable_all()
1653 .build()
1654 .unwrap();
1655 runtime.block_on(async {
1656 let bridge = test_bridge();
1657 bridge
1658 .push_queued_user_message("first".to_string(), "finish_step")
1659 .await;
1660 bridge
1661 .push_queued_user_message("second".to_string(), "audit_only")
1662 .await;
1663
1664 let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1665 assert_eq!(finish_step.len(), 1);
1666 assert_eq!(finish_step[0].content, "first");
1667
1668 let audit_only = bridge.take_queued_user_messages(false, false, true).await;
1669 assert_eq!(audit_only.len(), 1);
1670 assert_eq!(audit_only[0].content, "second");
1671 });
1672 }
1673
1674 #[test]
1675 fn pending_user_messages_support_revoke_replace_and_delivery_states() {
1676 let runtime = tokio::runtime::Builder::new_current_thread()
1677 .enable_all()
1678 .build()
1679 .unwrap();
1680 runtime.block_on(async {
1681 let bridge = test_bridge();
1682 let first_id = bridge
1683 .push_pending_user_message(
1684 "first".to_string(),
1685 serde_json::json!("first"),
1686 "audit_only",
1687 )
1688 .await;
1689 let second_id = bridge
1690 .push_pending_user_message(
1691 "second".to_string(),
1692 serde_json::json!("second"),
1693 "audit_only",
1694 )
1695 .await;
1696
1697 assert_eq!(
1698 bridge
1699 .replace_pending_user_message(
1700 &second_id,
1701 "second edited".to_string(),
1702 serde_json::json!("second edited"),
1703 )
1704 .await,
1705 PendingUserMessageMutationResult::Mutated
1706 );
1707 assert_eq!(
1708 bridge.revoke_pending_user_message(&first_id).await,
1709 PendingUserMessageMutationResult::Mutated
1710 );
1711 assert_eq!(
1712 bridge.revoke_pending_user_message(&first_id).await,
1713 PendingUserMessageMutationResult::AlreadyRevoked
1714 );
1715
1716 let delivered = bridge
1717 .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1718 .await;
1719 assert_eq!(delivered.len(), 1);
1720 assert_eq!(delivered[0].message_id, second_id);
1721 assert_eq!(delivered[0].content, "second edited");
1722
1723 assert_eq!(
1724 bridge.revoke_pending_user_message(&second_id).await,
1725 PendingUserMessageMutationResult::AlreadyDelivered
1726 );
1727 assert_eq!(
1728 bridge
1729 .replace_pending_user_message(
1730 &second_id,
1731 "too late".to_string(),
1732 serde_json::json!("too late"),
1733 )
1734 .await,
1735 PendingUserMessageMutationResult::AlreadyDelivered
1736 );
1737 assert_eq!(
1738 bridge.revoke_pending_user_message("missing").await,
1739 PendingUserMessageMutationResult::UnknownMessageId
1740 );
1741 });
1742 }
1743
1744 #[test]
1745 fn pending_user_message_replace_preserves_fifo_position_and_mode() {
1746 let runtime = tokio::runtime::Builder::new_current_thread()
1747 .enable_all()
1748 .build()
1749 .unwrap();
1750 runtime.block_on(async {
1751 let bridge = test_bridge();
1752 let first_id = bridge
1753 .push_pending_user_message(
1754 "first".to_string(),
1755 serde_json::json!("first"),
1756 "finish_step",
1757 )
1758 .await;
1759 let second_id = bridge
1760 .push_pending_user_message(
1761 "second".to_string(),
1762 serde_json::json!("second"),
1763 "finish_step",
1764 )
1765 .await;
1766 assert_eq!(
1767 bridge
1768 .replace_pending_user_message(
1769 &first_id,
1770 "first edited".to_string(),
1771 serde_json::json!("first edited"),
1772 )
1773 .await,
1774 PendingUserMessageMutationResult::Mutated
1775 );
1776
1777 let delivered = bridge
1778 .take_queued_user_messages_for(DeliveryCheckpoint::AfterCurrentOperation)
1779 .await;
1780 assert_eq!(
1781 delivered
1782 .iter()
1783 .map(|message| (&message.message_id, message.content.as_str(), message.mode))
1784 .collect::<Vec<_>>(),
1785 vec![
1786 (&first_id, "first edited", QueuedUserMessageMode::FinishStep,),
1787 (&second_id, "second", QueuedUserMessageMode::FinishStep),
1788 ]
1789 );
1790 });
1791 }
1792
1793 #[test]
1794 fn pending_user_message_state_survives_bridge_replacement() {
1795 let runtime = tokio::runtime::Builder::new_current_thread()
1796 .enable_all()
1797 .build()
1798 .unwrap();
1799 runtime.block_on(async {
1800 let bridge = test_bridge();
1801 let revoked_id = bridge
1802 .push_pending_user_message(
1803 "revoke me".to_string(),
1804 serde_json::json!("revoke me"),
1805 "audit_only",
1806 )
1807 .await;
1808 let delivered_id = bridge
1809 .push_pending_user_message(
1810 "deliver me".to_string(),
1811 serde_json::json!("deliver me"),
1812 "audit_only",
1813 )
1814 .await;
1815 assert_eq!(
1816 bridge.revoke_pending_user_message(&revoked_id).await,
1817 PendingUserMessageMutationResult::Mutated
1818 );
1819 bridge.cancelled.store(true, Ordering::SeqCst);
1820
1821 let replacement_bridge = test_bridge_sharing_injection_state(&bridge);
1822 assert_eq!(
1823 replacement_bridge
1824 .revoke_pending_user_message(&revoked_id)
1825 .await,
1826 PendingUserMessageMutationResult::AlreadyRevoked
1827 );
1828 let delivered = replacement_bridge
1829 .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
1830 .await;
1831 assert_eq!(delivered.len(), 1);
1832 assert_eq!(delivered[0].message_id, delivered_id);
1833 assert_eq!(delivered[0].content, "deliver me");
1834 assert_eq!(
1835 bridge.revoke_pending_user_message(&delivered_id).await,
1836 PendingUserMessageMutationResult::AlreadyDelivered
1837 );
1838 });
1839 }
1840
1841 #[test]
1842 fn queued_transcript_injections_preserve_user_reminder_separation() {
1843 let runtime = tokio::runtime::Builder::new_current_thread()
1844 .enable_all()
1845 .build()
1846 .unwrap();
1847 runtime.block_on(async {
1848 let bridge = test_bridge();
1849 bridge
1850 .push_queued_user_message("human follow-up".to_string(), "finish_step")
1851 .await;
1852 let reminder_id = bridge
1853 .push_queued_session_remind_from_params(&serde_json::json!({
1854 "body": "Host-provided ambient context.",
1855 "tags": ["host"],
1856 "dedupe_key": "host-context",
1857 "ttl_turns": 2,
1858 "mode": "audit_only",
1859 "_meta": {"harn": {"source": "test"}},
1860 }))
1861 .await
1862 .expect("valid reminder");
1863
1864 let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1865 assert_eq!(finish_step.len(), 1);
1866 assert_eq!(finish_step[0].content, "human follow-up");
1867
1868 let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
1869 assert!(no_user_messages.is_empty());
1870
1871 let injections = bridge
1872 .take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
1873 .await;
1874 assert_eq!(injections.len(), 1);
1875 let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
1876 panic!("expected queued reminder");
1877 };
1878 assert_eq!(reminder.reminder.id, reminder_id);
1879 assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
1880 assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
1881 assert_eq!(
1882 reminder.reminder.dedupe_key.as_deref(),
1883 Some("host-context")
1884 );
1885 assert_eq!(reminder.reminder.ttl_turns, Some(2));
1886 assert_eq!(
1887 reminder.reminder.source,
1888 crate::llm::helpers::ReminderSource::Bridge
1889 );
1890 });
1891 }
1892
1893 #[test]
1894 fn bridge_remind_modes_honor_delivery_checkpoints() {
1895 let runtime = tokio::runtime::Builder::new_current_thread()
1896 .enable_all()
1897 .build()
1898 .unwrap();
1899 runtime.block_on(async {
1900 let cases = [
1901 (
1902 "interrupt_immediate",
1903 DeliveryCheckpoint::InterruptImmediate,
1904 DeliveryCheckpoint::AfterCurrentOperation,
1905 ),
1906 (
1907 "finish_step",
1908 DeliveryCheckpoint::AfterCurrentOperation,
1909 DeliveryCheckpoint::EndOfInteraction,
1910 ),
1911 (
1912 "audit_only",
1913 DeliveryCheckpoint::EndOfInteraction,
1914 DeliveryCheckpoint::InterruptImmediate,
1915 ),
1916 ];
1917
1918 for (mode, expected_checkpoint, wrong_checkpoint) in cases {
1919 let bridge = test_bridge();
1920 bridge
1921 .push_queued_session_remind_from_params(&serde_json::json!({
1922 "body": format!("Reminder for {mode}"),
1923 "mode": mode,
1924 }))
1925 .await
1926 .expect("valid session/remind payload");
1927
1928 let premature = bridge
1929 .take_queued_transcript_injections_for(wrong_checkpoint)
1930 .await;
1931 assert!(
1932 premature.is_empty(),
1933 "{mode} reminder must not be delivered at {wrong_checkpoint:?}"
1934 );
1935
1936 let delivered = bridge
1937 .take_queued_transcript_injections_for(expected_checkpoint)
1938 .await;
1939 assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
1940 let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
1941 panic!("expected reminder for {mode}");
1942 };
1943 assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
1944 }
1945 });
1946 }
1947
1948 #[test]
1949 fn session_remind_validation_rejects_user_message_shape() {
1950 let err = queued_session_remind_from_params(&serde_json::json!({
1951 "content": "this is still a user message",
1952 "mode": "interrupt_immediate",
1953 }))
1954 .expect_err("session/remind must require a reminder body");
1955 assert!(err.contains(Code::ReminderInvalidShape.as_str()));
1956 assert!(err.contains("body"));
1957 }
1958
1959 #[test]
1960 fn session_remind_validation_rejects_unknown_options_separately() {
1961 let err = queued_session_remind_from_params(&serde_json::json!({
1962 "body": "valid body",
1963 "unknown_host_field": true,
1964 }))
1965 .expect_err("session/remind must reject unknown top-level fields");
1966 assert!(err.contains(Code::ReminderUnknownOption.as_str()));
1967 assert!(err.contains("unknown_host_field"));
1968 }
1969
1970 #[test]
1971 fn session_remind_validation_rejects_unknown_propagate_with_specific_code() {
1972 let err = queued_session_remind_from_params(&serde_json::json!({
1973 "body": "valid body",
1974 "propagate": "workspace",
1975 }))
1976 .expect_err("session/remind must reject unknown propagate values");
1977 assert!(err.contains(Code::ReminderUnknownPropagate.as_str()));
1978 assert!(err.contains("propagate"));
1979 }
1980
1981 #[test]
1982 fn test_json_result_to_vm_value_string() {
1983 let val = serde_json::json!("hello");
1984 let vm_val = json_result_to_vm_value(&val);
1985 assert_eq!(vm_val.display(), "hello");
1986 }
1987
1988 #[test]
1989 fn test_json_result_to_vm_value_dict() {
1990 let val = serde_json::json!({"name": "test", "count": 42});
1991 let vm_val = json_result_to_vm_value(&val);
1992 let VmValue::Dict(d) = &vm_val else {
1993 unreachable!("Expected Dict, got {:?}", vm_val);
1994 };
1995 assert_eq!(d.get("name").unwrap().display(), "test");
1996 assert_eq!(d.get("count").unwrap().display(), "42");
1997 }
1998
1999 #[test]
2000 fn test_json_result_to_vm_value_null() {
2001 let val = serde_json::json!(null);
2002 let vm_val = json_result_to_vm_value(&val);
2003 assert!(matches!(vm_val, VmValue::Nil));
2004 }
2005
2006 #[test]
2007 fn test_json_result_to_vm_value_nested() {
2008 let val = serde_json::json!({
2009 "text": "response",
2010 "tool_calls": [
2011 {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
2012 ],
2013 "input_tokens": 100,
2014 "output_tokens": 50,
2015 });
2016 let vm_val = json_result_to_vm_value(&val);
2017 let VmValue::Dict(d) = &vm_val else {
2018 unreachable!("Expected Dict, got {:?}", vm_val);
2019 };
2020 assert_eq!(d.get("text").unwrap().display(), "response");
2021 let VmValue::List(list) = d.get("tool_calls").unwrap() else {
2022 unreachable!("Expected List for tool_calls");
2023 };
2024 assert_eq!(list.len(), 1);
2025 }
2026
2027 #[test]
2028 fn parse_host_tools_list_accepts_object_wrapper() {
2029 let tools = parse_host_tools_list_response(serde_json::json!({
2030 "tools": [
2031 {
2032 "name": "Read",
2033 "description": "Read a file",
2034 "schema": {"type": "object"},
2035 }
2036 ]
2037 }))
2038 .expect("tool list");
2039
2040 assert_eq!(tools.len(), 1);
2041 assert_eq!(tools[0]["name"], "Read");
2042 assert_eq!(tools[0]["deprecated"], false);
2043 }
2044
2045 #[test]
2046 fn parse_host_tools_list_accepts_compat_fields() {
2047 let tools = parse_host_tools_list_response(serde_json::json!({
2048 "result": {
2049 "tools": [
2050 {
2051 "name": "Edit",
2052 "short_description": "Apply an edit",
2053 "input_schema": {"type": "object"},
2054 "deprecated": true,
2055 }
2056 ]
2057 }
2058 }))
2059 .expect("tool list");
2060
2061 assert_eq!(tools[0]["description"], "Apply an edit");
2062 assert_eq!(tools[0]["schema"]["type"], "object");
2063 assert_eq!(tools[0]["deprecated"], true);
2064 }
2065
2066 #[test]
2067 fn parse_host_tools_list_requires_tool_names() {
2068 let err = parse_host_tools_list_response(serde_json::json!({
2069 "tools": [
2070 {"description": "missing name"}
2071 ]
2072 }))
2073 .expect_err("expected error");
2074 assert!(err
2075 .to_string()
2076 .contains("host/tools/list: every tool must include a string `name`"));
2077 }
2078
2079 #[test]
2080 fn test_timeout_duration() {
2081 assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
2082 }
2083}