1use std::collections::{BTreeMap, HashMap, VecDeque};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::io::AsyncBufReadExt;
16use tokio::sync::{oneshot, Mutex, Notify};
17
18use crate::orchestration::MutationSessionRecord;
19use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
20use crate::visible_text::VisibleTextState;
21use crate::vm::Vm;
22
/// Upper bound on how long [`HostBridge::call`] waits for the host to answer a
/// request before giving up with a timeout error (300 seconds).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
25
/// Shared callback used by the bridge to emit one protocol line to the host.
///
/// The callback returns a human-readable message when the line could not be
/// delivered.
pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;

/// Builds a [`HostBridgeWriter`] that prints every line to this process's stdout.
///
/// Each invocation takes `stdout_lock` for its whole duration, writes `line`
/// followed by a single `\n`, and flushes. A poisoned `stdout_lock` is
/// recovered rather than treated as an error.
///
/// # Errors
///
/// The returned writer yields `"Bridge write error: …"` when writing fails and
/// `"Bridge flush error: …"` when flushing fails.
fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
    Arc::new(move |line: &str| {
        let _serialized = match stdout_lock.lock() {
            Ok(guard) => guard,
            // The mutex protects `()`, so there is no state that poisoning could have corrupted.
            Err(poisoned) => poisoned.into_inner(),
        };
        let mut out = std::io::stdout().lock();
        for chunk in [line.as_bytes(), &b"\n"[..]] {
            out.write_all(chunk)
                .map_err(|err| format!("Bridge write error: {err}"))?;
        }
        out.flush()
            .map_err(|err| format!("Bridge flush error: {err}"))
    })
}
44
/// JSON-RPC bridge between the VM and its host.
///
/// Requests are written through `writer` and matched to responses by id via
/// `pending`; when `in_process` is set, requests are dispatched to an in-process
/// module instead and nothing is written.
pub struct HostBridge {
    /// Source of request ids; incremented once per outgoing request in `call`.
    next_id: AtomicU64,
    /// Response channels for in-flight requests, keyed by request id.
    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
    /// Set once a `cancel` notification has been received; read by `is_cancelled`.
    cancelled: Arc<AtomicBool>,
    /// Wakes calls that are blocked waiting for a response when a cancel arrives.
    cancel_notify: Arc<Notify>,
    /// Sink used to emit serialized requests and notifications.
    writer: HostBridgeWriter,
    /// Session id included in `session/update` notifications.
    session_id: std::sync::Mutex<String>,
    /// Script name included in call and worker update notifications.
    script_name: std::sync::Mutex<String>,
    /// User messages received from the host (or pushed locally) awaiting delivery.
    queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>>,
    /// Latched when an `agent/resume` notification arrives; consumed by `take_resume_signal`.
    resume_requested: Arc<AtomicBool>,
    /// Latched when a `skills/update` notification arrives; consumed by `take_skills_reload_signal`.
    skills_reload_requested: Arc<AtomicBool>,
    /// Flag stored by `set_daemon_idle` and read by `is_daemon_idle`.
    // NOTE(review): the meaning of "daemon idle" is defined by callers outside this file.
    daemon_idle: Arc<AtomicBool>,
    /// Stop reason stored by `set_prompt_stop_reason` until `take_prompt_stop_reason` consumes it.
    prompt_stop_reason: std::sync::Mutex<Option<String>>,
    /// Per-call `VisibleTextState`, created on the first progress delta and removed at call end.
    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
    /// Per-call `stream_publicly` flag captured from the call-start metadata.
    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
    /// In-process host backend; when present, `call` bypasses the wire protocol.
    in_process: Option<InProcessHost>,
}
89
/// Host backend that serves bridge requests by calling functions exported from
/// a module loaded into a VM, instead of talking to an external process.
struct InProcessHost {
    /// Path of the loaded module; only used to build error messages.
    module_path: PathBuf,
    /// Functions exported by the module, keyed by name.
    exported_functions: BTreeMap<String, Rc<VmClosure>>,
    /// VM from which a child VM is derived for every invocation.
    vm: Vm,
}
95
96impl InProcessHost {
97 async fn dispatch(
98 &self,
99 method: &str,
100 params: serde_json::Value,
101 ) -> Result<serde_json::Value, VmError> {
102 match method {
103 "builtin_call" => {
104 let name = params
105 .get("name")
106 .and_then(|value| value.as_str())
107 .unwrap_or_default();
108 let args = params
109 .get("args")
110 .and_then(|value| value.as_array())
111 .cloned()
112 .unwrap_or_default()
113 .into_iter()
114 .map(|value| json_result_to_vm_value(&value))
115 .collect::<Vec<_>>();
116 self.invoke_export(name, &args).await
117 }
118 "host/tools/list" => self
119 .invoke_optional_export("host_tools_list", &[])
120 .await
121 .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
122 "session/request_permission" => self.request_permission(params).await,
123 other => Err(VmError::Runtime(format!(
124 "playground host backend does not implement bridge method '{other}'"
125 ))),
126 }
127 }
128
129 async fn invoke_export(
130 &self,
131 name: &str,
132 args: &[VmValue],
133 ) -> Result<serde_json::Value, VmError> {
134 let Some(closure) = self.exported_functions.get(name) else {
135 return Err(VmError::Runtime(format!(
136 "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
137 self.module_path.display()
138 )));
139 };
140
141 let mut vm = self.vm.child_vm_for_host();
142 let result = vm.call_closure_pub(closure, args).await?;
143 Ok(crate::llm::vm_value_to_json(&result))
144 }
145
146 async fn invoke_optional_export(
147 &self,
148 name: &str,
149 args: &[VmValue],
150 ) -> Result<Option<serde_json::Value>, VmError> {
151 if !self.exported_functions.contains_key(name) {
152 return Ok(None);
153 }
154 self.invoke_export(name, args).await.map(Some)
155 }
156
157 async fn request_permission(
158 &self,
159 params: serde_json::Value,
160 ) -> Result<serde_json::Value, VmError> {
161 let Some(closure) = self.exported_functions.get("request_permission") else {
162 return Ok(serde_json::json!({ "granted": true }));
163 };
164
165 let tool_name = params
166 .get("toolCall")
167 .and_then(|tool_call| tool_call.get("toolName"))
168 .and_then(|value| value.as_str())
169 .unwrap_or_default();
170 let tool_args = params
171 .get("toolCall")
172 .and_then(|tool_call| tool_call.get("rawInput"))
173 .map(json_result_to_vm_value)
174 .unwrap_or(VmValue::Nil);
175 let full_payload = json_result_to_vm_value(¶ms);
176
177 let arg_count = closure.func.params.len();
178 let args = if arg_count >= 3 {
179 vec![
180 VmValue::String(Rc::from(tool_name.to_string())),
181 tool_args,
182 full_payload,
183 ]
184 } else if arg_count == 2 {
185 vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
186 } else if arg_count == 1 {
187 vec![full_payload]
188 } else {
189 Vec::new()
190 };
191
192 let mut vm = self.vm.child_vm_for_host();
193 let result = vm.call_closure_pub(closure, &args).await?;
194 let payload = match result {
195 VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
196 VmValue::String(reason) if !reason.is_empty() => {
197 serde_json::json!({ "granted": false, "reason": reason.to_string() })
198 }
199 other => {
200 let json = crate::llm::vm_value_to_json(&other);
201 if json
202 .get("granted")
203 .and_then(|value| value.as_bool())
204 .is_some()
205 || json.get("outcome").is_some()
206 {
207 json
208 } else {
209 serde_json::json!({ "granted": other.is_truthy() })
210 }
211 }
212 };
213 Ok(payload)
214 }
215}
216
/// Delivery mode attached to a queued user message.
///
/// Each mode is drained at the [`DeliveryCheckpoint`] that corresponds to it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
    /// Wire names: `interrupt_immediate`, `interrupt`.
    InterruptImmediate,
    /// Wire names: `finish_step`, `after_current_operation`.
    FinishStep,
    /// Default mode; selected for every other wire name.
    WaitForCompletion,
}

/// Point in the agent loop at which queued user messages are collected.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
    /// Collects messages queued as [`QueuedUserMessageMode::InterruptImmediate`].
    InterruptImmediate,
    /// Collects messages queued as [`QueuedUserMessageMode::FinishStep`].
    AfterCurrentOperation,
    /// Collects messages queued as [`QueuedUserMessageMode::WaitForCompletion`].
    EndOfInteraction,
}

impl QueuedUserMessageMode {
    /// Parses a wire-level mode name; unrecognised names (including the empty
    /// string) map to [`QueuedUserMessageMode::WaitForCompletion`].
    fn from_str(value: &str) -> Self {
        if matches!(value, "interrupt_immediate" | "interrupt") {
            Self::InterruptImmediate
        } else if matches!(value, "finish_step" | "after_current_operation") {
            Self::FinishStep
        } else {
            Self::WaitForCompletion
        }
    }
}
240
/// A user message waiting in the bridge queue until the agent loop collects it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedUserMessage {
    /// Message text as supplied by the sender.
    pub content: String,
    /// Delivery mode that decides at which checkpoint the message is handed over.
    pub mode: QueuedUserMessageMode,
}
246
247#[allow(clippy::new_without_default)]
249impl HostBridge {
250 pub fn new() -> Self {
255 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
256 Arc::new(Mutex::new(HashMap::new()));
257 let cancelled = Arc::new(AtomicBool::new(false));
258 let cancel_notify = Arc::new(Notify::new());
259 let queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>> =
260 Arc::new(Mutex::new(VecDeque::new()));
261 let resume_requested = Arc::new(AtomicBool::new(false));
262 let skills_reload_requested = Arc::new(AtomicBool::new(false));
263 let daemon_idle = Arc::new(AtomicBool::new(false));
264
265 let pending_clone = pending.clone();
267 let cancelled_clone = cancelled.clone();
268 let cancel_notify_clone = cancel_notify.clone();
269 let queued_clone = queued_user_messages.clone();
270 let resume_clone = resume_requested.clone();
271 let skills_reload_clone = skills_reload_requested.clone();
272 tokio::task::spawn_local(async move {
273 let stdin = tokio::io::stdin();
274 let reader = tokio::io::BufReader::new(stdin);
275 let mut lines = reader.lines();
276
277 while let Ok(Some(line)) = lines.next_line().await {
278 let line = line.trim().to_string();
279 if line.is_empty() {
280 continue;
281 }
282
283 let msg: serde_json::Value = match serde_json::from_str(&line) {
284 Ok(v) => v,
285 Err(_) => continue,
286 };
287
288 if msg.get("id").is_none() {
290 if let Some(method) = msg["method"].as_str() {
291 if method == "cancel" {
292 cancelled_clone.store(true, Ordering::SeqCst);
293 cancel_notify_clone.notify_waiters();
294 } else if method == "agent/resume" {
295 resume_clone.store(true, Ordering::SeqCst);
296 } else if method == "skills/update" {
297 skills_reload_clone.store(true, Ordering::SeqCst);
298 } else if method == "user_message"
299 || method == "session/input"
300 || method == "agent/user_message"
301 {
302 let params = &msg["params"];
303 let content = params
304 .get("content")
305 .and_then(|v| v.as_str())
306 .unwrap_or("")
307 .to_string();
308 if !content.is_empty() {
309 let mode = QueuedUserMessageMode::from_str(
310 params
311 .get("mode")
312 .and_then(|v| v.as_str())
313 .unwrap_or("wait_for_completion"),
314 );
315 queued_clone
316 .lock()
317 .await
318 .push_back(QueuedUserMessage { content, mode });
319 }
320 }
321 }
322 continue;
323 }
324
325 if let Some(id) = msg["id"].as_u64() {
326 let mut pending = pending_clone.lock().await;
327 if let Some(sender) = pending.remove(&id) {
328 let _ = sender.send(msg);
329 }
330 }
331 }
332
333 let mut pending = pending_clone.lock().await;
335 pending.clear();
336 });
337
338 Self {
339 next_id: AtomicU64::new(1),
340 pending,
341 cancelled,
342 cancel_notify,
343 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
344 session_id: std::sync::Mutex::new(String::new()),
345 script_name: std::sync::Mutex::new(String::new()),
346 queued_user_messages,
347 resume_requested,
348 skills_reload_requested,
349 daemon_idle,
350 prompt_stop_reason: std::sync::Mutex::new(None),
351 visible_call_states: std::sync::Mutex::new(HashMap::new()),
352 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
353 in_process: None,
354 }
355 }
356
357 pub fn from_parts(
363 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
364 cancelled: Arc<AtomicBool>,
365 stdout_lock: Arc<std::sync::Mutex<()>>,
366 start_id: u64,
367 ) -> Self {
368 Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
369 }
370
371 pub fn from_parts_with_writer(
372 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
373 cancelled: Arc<AtomicBool>,
374 writer: HostBridgeWriter,
375 start_id: u64,
376 ) -> Self {
377 Self {
378 next_id: AtomicU64::new(start_id),
379 pending,
380 cancelled,
381 cancel_notify: Arc::new(Notify::new()),
382 writer,
383 session_id: std::sync::Mutex::new(String::new()),
384 script_name: std::sync::Mutex::new(String::new()),
385 queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
386 resume_requested: Arc::new(AtomicBool::new(false)),
387 skills_reload_requested: Arc::new(AtomicBool::new(false)),
388 daemon_idle: Arc::new(AtomicBool::new(false)),
389 prompt_stop_reason: std::sync::Mutex::new(None),
390 visible_call_states: std::sync::Mutex::new(HashMap::new()),
391 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
392 in_process: None,
393 }
394 }
395
396 pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
399 let exported_functions = vm.load_module_exports(module_path).await?;
400 Ok(Self {
401 next_id: AtomicU64::new(1),
402 pending: Arc::new(Mutex::new(HashMap::new())),
403 cancelled: Arc::new(AtomicBool::new(false)),
404 cancel_notify: Arc::new(Notify::new()),
405 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
406 session_id: std::sync::Mutex::new(String::new()),
407 script_name: std::sync::Mutex::new(String::new()),
408 queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
409 resume_requested: Arc::new(AtomicBool::new(false)),
410 skills_reload_requested: Arc::new(AtomicBool::new(false)),
411 daemon_idle: Arc::new(AtomicBool::new(false)),
412 prompt_stop_reason: std::sync::Mutex::new(None),
413 visible_call_states: std::sync::Mutex::new(HashMap::new()),
414 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
415 in_process: Some(InProcessHost {
416 module_path: module_path.to_path_buf(),
417 exported_functions,
418 vm,
419 }),
420 })
421 }
422
423 pub fn set_session_id(&self, id: &str) {
425 *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
426 }
427
428 pub fn set_script_name(&self, name: &str) {
430 *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
431 }
432
433 fn get_script_name(&self) -> String {
435 self.script_name
436 .lock()
437 .unwrap_or_else(|e| e.into_inner())
438 .clone()
439 }
440
441 pub fn get_session_id(&self) -> String {
443 self.session_id
444 .lock()
445 .unwrap_or_else(|e| e.into_inner())
446 .clone()
447 }
448
449 fn write_line(&self, line: &str) -> Result<(), VmError> {
451 (self.writer)(line).map_err(VmError::Runtime)
452 }
453
454 pub async fn call(
457 &self,
458 method: &str,
459 params: serde_json::Value,
460 ) -> Result<serde_json::Value, VmError> {
461 if let Some(in_process) = &self.in_process {
462 return in_process.dispatch(method, params).await;
463 }
464
465 if self.is_cancelled() {
466 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
467 }
468
469 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
470 let cancel_wait = self.cancel_notify.notified();
471 tokio::pin!(cancel_wait);
472
473 let request = crate::jsonrpc::request(id, method, params);
474
475 let (tx, rx) = oneshot::channel();
476 {
477 let mut pending = self.pending.lock().await;
478 pending.insert(id, tx);
479 }
480
481 let line = serde_json::to_string(&request)
482 .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
483 if let Err(e) = self.write_line(&line) {
484 let mut pending = self.pending.lock().await;
485 pending.remove(&id);
486 return Err(e);
487 }
488
489 if self.is_cancelled() {
490 let mut pending = self.pending.lock().await;
491 pending.remove(&id);
492 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
493 }
494
495 let response = tokio::select! {
496 result = rx => match result {
497 Ok(msg) => msg,
498 Err(_) => {
499 return Err(VmError::Runtime(
501 "Bridge: host closed connection before responding".into(),
502 ));
503 }
504 },
505 _ = &mut cancel_wait => {
506 let mut pending = self.pending.lock().await;
507 pending.remove(&id);
508 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
509 }
510 _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
511 let mut pending = self.pending.lock().await;
512 pending.remove(&id);
513 return Err(VmError::Runtime(format!(
514 "Bridge: host did not respond to '{method}' within {}s",
515 DEFAULT_TIMEOUT.as_secs()
516 )));
517 }
518 };
519
520 if let Some(error) = response.get("error") {
521 let message = error["message"].as_str().unwrap_or("Unknown host error");
522 let code = error["code"].as_i64().unwrap_or(-1);
523 if code == -32001 {
525 return Err(VmError::CategorizedError {
526 message: message.to_string(),
527 category: ErrorCategory::ToolRejected,
528 });
529 }
530 return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
531 }
532
533 Ok(response["result"].clone())
534 }
535
536 pub fn notify(&self, method: &str, params: serde_json::Value) {
539 let notification = crate::jsonrpc::notification(method, params);
540 if self.in_process.is_some() {
541 return;
542 }
543 if let Ok(line) = serde_json::to_string(¬ification) {
544 let _ = self.write_line(&line);
545 }
546 }
547
    /// Returns `true` once the shared cancellation flag has been set.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }
552
    /// Returns whether a resume was requested since the last call, and clears
    /// the request in the same atomic step.
    pub fn take_resume_signal(&self) -> bool {
        self.resume_requested.swap(false, Ordering::SeqCst)
    }
556
    /// Latches a resume request, as if the host had sent `agent/resume`.
    pub fn signal_resume(&self) {
        self.resume_requested.store(true, Ordering::SeqCst);
    }
560
    /// Records the daemon-idle flag that `is_daemon_idle` reports.
    pub fn set_daemon_idle(&self, idle: bool) {
        self.daemon_idle.store(idle, Ordering::SeqCst);
    }
564
    /// Returns the flag most recently stored by `set_daemon_idle` (initially `false`).
    pub fn is_daemon_idle(&self) -> bool {
        self.daemon_idle.load(Ordering::SeqCst)
    }
568
569 pub fn set_prompt_stop_reason(&self, reason: &str) {
574 *self
575 .prompt_stop_reason
576 .lock()
577 .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
578 }
579
580 pub fn take_prompt_stop_reason(&self) -> Option<String> {
585 self.prompt_stop_reason
586 .lock()
587 .unwrap_or_else(|e| e.into_inner())
588 .take()
589 }
590
    /// Returns whether a skills reload was requested since the last call, and
    /// clears the request in the same atomic step.
    pub fn take_skills_reload_signal(&self) -> bool {
        self.skills_reload_requested.swap(false, Ordering::SeqCst)
    }
598
    /// Latches a skills reload request, as if the host had sent `skills/update`.
    pub fn signal_skills_reload(&self) {
        self.skills_reload_requested.store(true, Ordering::SeqCst);
    }
605
606 pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
612 let result = self.call("skills/list", serde_json::json!({})).await?;
613 match result {
614 serde_json::Value::Array(items) => Ok(items),
615 serde_json::Value::Object(map) => match map.get("skills") {
616 Some(serde_json::Value::Array(items)) => Ok(items.clone()),
617 _ => Err(VmError::Runtime(
618 "skills/list: host response must be an array or { skills: [...] }".into(),
619 )),
620 },
621 _ => Err(VmError::Runtime(
622 "skills/list: unexpected response shape".into(),
623 )),
624 }
625 }
626
627 pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
633 let result = self.call("host/tools/list", serde_json::json!({})).await?;
634 parse_host_tools_list_response(result)
635 }
636
637 pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
641 self.call("skills/fetch", serde_json::json!({ "id": id }))
642 .await
643 }
644
645 pub async fn push_queued_user_message(&self, content: String, mode: &str) {
646 self.queued_user_messages
647 .lock()
648 .await
649 .push_back(QueuedUserMessage {
650 content,
651 mode: QueuedUserMessageMode::from_str(mode),
652 });
653 }
654
655 pub async fn take_queued_user_messages(
656 &self,
657 include_interrupt_immediate: bool,
658 include_finish_step: bool,
659 include_wait_for_completion: bool,
660 ) -> Vec<QueuedUserMessage> {
661 let mut queue = self.queued_user_messages.lock().await;
662 let mut selected = Vec::new();
663 let mut retained = VecDeque::new();
664 while let Some(message) = queue.pop_front() {
665 let should_take = match message.mode {
666 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
667 QueuedUserMessageMode::FinishStep => include_finish_step,
668 QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
669 };
670 if should_take {
671 selected.push(message);
672 } else {
673 retained.push_back(message);
674 }
675 }
676 *queue = retained;
677 selected
678 }
679
680 pub async fn take_queued_user_messages_for(
681 &self,
682 checkpoint: DeliveryCheckpoint,
683 ) -> Vec<QueuedUserMessage> {
684 match checkpoint {
685 DeliveryCheckpoint::InterruptImmediate => {
686 self.take_queued_user_messages(true, false, false).await
687 }
688 DeliveryCheckpoint::AfterCurrentOperation => {
689 self.take_queued_user_messages(false, true, false).await
690 }
691 DeliveryCheckpoint::EndOfInteraction => {
692 self.take_queued_user_messages(false, false, true).await
693 }
694 }
695 }
696
697 pub fn send_output(&self, text: &str) {
699 self.notify("output", serde_json::json!({"text": text}));
700 }
701
702 pub fn send_progress(
704 &self,
705 phase: &str,
706 message: &str,
707 progress: Option<i64>,
708 total: Option<i64>,
709 data: Option<serde_json::Value>,
710 ) {
711 let mut payload = serde_json::json!({"phase": phase, "message": message});
712 if let Some(p) = progress {
713 payload["progress"] = serde_json::json!(p);
714 }
715 if let Some(t) = total {
716 payload["total"] = serde_json::json!(t);
717 }
718 if let Some(d) = data {
719 payload["data"] = d;
720 }
721 self.notify("progress", payload);
722 }
723
724 pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
726 let mut payload = serde_json::json!({"level": level, "message": message});
727 if let Some(f) = fields {
728 payload["fields"] = f;
729 }
730 self.notify("log", payload);
731 }
732
733 pub fn send_call_start(
736 &self,
737 call_id: &str,
738 call_type: &str,
739 name: &str,
740 metadata: serde_json::Value,
741 ) {
742 let session_id = self.get_session_id();
743 let script = self.get_script_name();
744 let stream_publicly = metadata
745 .get("stream_publicly")
746 .and_then(|value| value.as_bool())
747 .unwrap_or(true);
748 self.visible_call_streams
749 .lock()
750 .unwrap_or_else(|e| e.into_inner())
751 .insert(call_id.to_string(), stream_publicly);
752 self.notify(
753 "session/update",
754 serde_json::json!({
755 "sessionId": session_id,
756 "update": {
757 "sessionUpdate": "call_start",
758 "content": {
759 "toolCallId": call_id,
760 "call_type": call_type,
761 "name": name,
762 "script": script,
763 "metadata": metadata,
764 },
765 },
766 }),
767 );
768 }
769
770 pub fn send_call_progress(
773 &self,
774 call_id: &str,
775 delta: &str,
776 accumulated_tokens: u64,
777 user_visible: bool,
778 ) {
779 let session_id = self.get_session_id();
780 let (visible_text, visible_delta) = {
781 let stream_publicly = self
782 .visible_call_streams
783 .lock()
784 .unwrap_or_else(|e| e.into_inner())
785 .get(call_id)
786 .copied()
787 .unwrap_or(true);
788 let mut states = self
789 .visible_call_states
790 .lock()
791 .unwrap_or_else(|e| e.into_inner());
792 let state = states.entry(call_id.to_string()).or_default();
793 state.push(delta, stream_publicly)
794 };
795 self.notify(
796 "session/update",
797 serde_json::json!({
798 "sessionId": session_id,
799 "update": {
800 "sessionUpdate": "call_progress",
801 "content": {
802 "toolCallId": call_id,
803 "delta": delta,
804 "accumulated_tokens": accumulated_tokens,
805 "visible_text": visible_text,
806 "visible_delta": visible_delta,
807 "user_visible": user_visible,
808 },
809 },
810 }),
811 );
812 }
813
814 pub fn send_call_end(
816 &self,
817 call_id: &str,
818 call_type: &str,
819 name: &str,
820 duration_ms: u64,
821 status: &str,
822 metadata: serde_json::Value,
823 ) {
824 let session_id = self.get_session_id();
825 let script = self.get_script_name();
826 self.visible_call_states
827 .lock()
828 .unwrap_or_else(|e| e.into_inner())
829 .remove(call_id);
830 self.visible_call_streams
831 .lock()
832 .unwrap_or_else(|e| e.into_inner())
833 .remove(call_id);
834 self.notify(
835 "session/update",
836 serde_json::json!({
837 "sessionId": session_id,
838 "update": {
839 "sessionUpdate": "call_end",
840 "content": {
841 "toolCallId": call_id,
842 "call_type": call_type,
843 "name": name,
844 "script": script,
845 "duration_ms": duration_ms,
846 "status": status,
847 "metadata": metadata,
848 },
849 },
850 }),
851 );
852 }
853
854 pub fn send_worker_update(
856 &self,
857 worker_id: &str,
858 worker_name: &str,
859 status: &str,
860 metadata: serde_json::Value,
861 audit: Option<&MutationSessionRecord>,
862 ) {
863 let session_id = self.get_session_id();
864 let script = self.get_script_name();
865 let started_at = metadata.get("started_at").cloned().unwrap_or_default();
866 let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
867 let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
868 let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
869 let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
870 let lifecycle = serde_json::json!({
871 "event": status,
872 "worker_id": worker_id,
873 "worker_name": worker_name,
874 "started_at": started_at,
875 "finished_at": finished_at,
876 });
877 self.notify(
878 "session/update",
879 serde_json::json!({
880 "sessionId": session_id,
881 "update": {
882 "sessionUpdate": "worker_update",
883 "content": {
884 "worker_id": worker_id,
885 "worker_name": worker_name,
886 "status": status,
887 "script": script,
888 "started_at": started_at,
889 "finished_at": finished_at,
890 "snapshot_path": snapshot_path,
891 "run_id": run_id,
892 "run_path": run_path,
893 "lifecycle": lifecycle,
894 "audit": audit,
895 "metadata": metadata,
896 },
897 },
898 }),
899 );
900 }
901}
902
/// Converts a JSON value received over the bridge into a VM value.
///
/// Thin public wrapper around `crate::stdlib::json_to_vm_value`.
pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
    crate::stdlib::json_to_vm_value(val)
}
907
908fn parse_host_tools_list_response(
909 result: serde_json::Value,
910) -> Result<Vec<serde_json::Value>, VmError> {
911 let tools = match result {
912 serde_json::Value::Array(items) => items,
913 serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
914 map.get("result")
915 .and_then(|value| value.get("tools"))
916 .cloned()
917 }) {
918 Some(serde_json::Value::Array(items)) => items,
919 _ => {
920 return Err(VmError::Runtime(
921 "host/tools/list: host response must be an array or { tools: [...] }".into(),
922 ));
923 }
924 },
925 _ => {
926 return Err(VmError::Runtime(
927 "host/tools/list: unexpected response shape".into(),
928 ));
929 }
930 };
931
932 let mut normalized = Vec::with_capacity(tools.len());
933 for tool in tools {
934 let serde_json::Value::Object(map) = tool else {
935 return Err(VmError::Runtime(
936 "host/tools/list: every tool must be an object".into(),
937 ));
938 };
939 let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
940 return Err(VmError::Runtime(
941 "host/tools/list: every tool must include a string `name`".into(),
942 ));
943 };
944 let description = map
945 .get("description")
946 .and_then(|value| value.as_str())
947 .or_else(|| {
948 map.get("short_description")
949 .and_then(|value| value.as_str())
950 })
951 .unwrap_or_default();
952 let schema = map
953 .get("schema")
954 .cloned()
955 .or_else(|| map.get("parameters").cloned())
956 .or_else(|| map.get("input_schema").cloned())
957 .unwrap_or(serde_json::Value::Null);
958 let deprecated = map
959 .get("deprecated")
960 .and_then(|value| value.as_bool())
961 .unwrap_or(false);
962 normalized.push(serde_json::json!({
963 "name": name,
964 "description": description,
965 "schema": schema,
966 "deprecated": deprecated,
967 }));
968 }
969 Ok(normalized)
970}
971
#[cfg(test)]
mod tests {
    //! Unit tests for the bridge's wire format helpers, cancellation handling,
    //! message queue filtering and host tool list normalization.

    use super::*;

    #[test]
    fn test_json_rpc_request_format() {
        let request = crate::jsonrpc::request(
            1,
            "llm_call",
            serde_json::json!({
                "prompt": "Hello",
                "system": "Be helpful",
            }),
        );
        let s = serde_json::to_string(&request).unwrap();
        assert!(s.contains("\"jsonrpc\":\"2.0\""));
        assert!(s.contains("\"id\":1"));
        assert!(s.contains("\"method\":\"llm_call\""));
    }

    #[test]
    fn test_json_rpc_notification_format() {
        let notification =
            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
        let s = serde_json::to_string(&notification).unwrap();
        assert!(s.contains("\"method\":\"output\""));
        // Notifications must not carry an id.
        assert!(!s.contains("\"id\""));
    }

    #[test]
    fn test_json_rpc_error_response_parsing() {
        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
        assert!(response.get("error").is_some());
        assert_eq!(
            response["error"]["message"].as_str().unwrap(),
            "Invalid request"
        );
    }

    #[test]
    fn test_json_rpc_success_response_parsing() {
        let response = crate::jsonrpc::response(
            1,
            serde_json::json!({
                "text": "Hello world",
                "input_tokens": 10,
                "output_tokens": 5,
            }),
        );
        assert!(response.get("result").is_some());
        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
    }

    /// The bridge must report cancellation through the flag it shares with its owner.
    #[test]
    fn test_cancelled_flag() {
        let cancelled = Arc::new(AtomicBool::new(false));
        let bridge = HostBridge::from_parts_with_writer(
            Arc::new(Mutex::new(HashMap::new())),
            cancelled.clone(),
            Arc::new(|_| Ok(())),
            1,
        );
        assert!(!bridge.is_cancelled());
        cancelled.store(true, Ordering::SeqCst);
        assert!(bridge.is_cancelled());
    }

    #[test]
    fn pending_host_calls_return_when_cancellation_arrives() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(async {
            let pending = Arc::new(Mutex::new(HashMap::new()));
            let cancelled = Arc::new(AtomicBool::new(false));
            let bridge = HostBridge::from_parts_with_writer(
                pending.clone(),
                cancelled.clone(),
                Arc::new(|_| Ok(())),
                1,
            );

            let call = bridge.call("host/work", serde_json::json!({}));
            tokio::pin!(call);

            // Drive the call until it has registered itself as pending.
            loop {
                tokio::select! {
                    result = &mut call => panic!("call completed before cancellation: {result:?}"),
                    _ = tokio::task::yield_now() => {}
                }
                if !pending.lock().await.is_empty() {
                    break;
                }
            }

            cancelled.store(true, Ordering::SeqCst);
            bridge.cancel_notify.notify_waiters();

            let result = tokio::time::timeout(Duration::from_secs(1), call)
                .await
                .expect("pending call should observe cancellation promptly");
            assert!(
                matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
            );
            assert!(pending.lock().await.is_empty());
        });
    }

    #[test]
    fn queued_messages_are_filtered_by_delivery_mode() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(async {
            let bridge = HostBridge::from_parts(
                Arc::new(Mutex::new(HashMap::new())),
                Arc::new(AtomicBool::new(false)),
                Arc::new(std::sync::Mutex::new(())),
                1,
            );
            bridge
                .push_queued_user_message("first".to_string(), "finish_step")
                .await;
            bridge
                .push_queued_user_message("second".to_string(), "wait_for_completion")
                .await;

            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
            assert_eq!(finish_step.len(), 1);
            assert_eq!(finish_step[0].content, "first");

            let turn_end = bridge.take_queued_user_messages(false, false, true).await;
            assert_eq!(turn_end.len(), 1);
            assert_eq!(turn_end[0].content, "second");
        });
    }

    #[test]
    fn test_json_result_to_vm_value_string() {
        let val = serde_json::json!("hello");
        let vm_val = json_result_to_vm_value(&val);
        assert_eq!(vm_val.display(), "hello");
    }

    #[test]
    fn test_json_result_to_vm_value_dict() {
        let val = serde_json::json!({"name": "test", "count": 42});
        let vm_val = json_result_to_vm_value(&val);
        // A mismatch is a test failure, not unreachable code, so use `panic!`.
        let VmValue::Dict(d) = &vm_val else {
            panic!("Expected Dict, got {:?}", vm_val);
        };
        assert_eq!(d.get("name").unwrap().display(), "test");
        assert_eq!(d.get("count").unwrap().display(), "42");
    }

    #[test]
    fn test_json_result_to_vm_value_null() {
        let val = serde_json::json!(null);
        let vm_val = json_result_to_vm_value(&val);
        assert!(matches!(vm_val, VmValue::Nil));
    }

    #[test]
    fn test_json_result_to_vm_value_nested() {
        let val = serde_json::json!({
            "text": "response",
            "tool_calls": [
                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
            ],
            "input_tokens": 100,
            "output_tokens": 50,
        });
        let vm_val = json_result_to_vm_value(&val);
        let VmValue::Dict(d) = &vm_val else {
            panic!("Expected Dict, got {:?}", vm_val);
        };
        assert_eq!(d.get("text").unwrap().display(), "response");
        let VmValue::List(list) = d.get("tool_calls").unwrap() else {
            panic!("Expected List for tool_calls");
        };
        assert_eq!(list.len(), 1);
    }

    #[test]
    fn parse_host_tools_list_accepts_object_wrapper() {
        let tools = parse_host_tools_list_response(serde_json::json!({
            "tools": [
                {
                    "name": "Read",
                    "description": "Read a file",
                    "schema": {"type": "object"},
                }
            ]
        }))
        .expect("tool list");

        assert_eq!(tools.len(), 1);
        assert_eq!(tools[0]["name"], "Read");
        assert_eq!(tools[0]["deprecated"], false);
    }

    #[test]
    fn parse_host_tools_list_accepts_compat_fields() {
        let tools = parse_host_tools_list_response(serde_json::json!({
            "result": {
                "tools": [
                    {
                        "name": "Edit",
                        "short_description": "Apply an edit",
                        "input_schema": {"type": "object"},
                        "deprecated": true,
                    }
                ]
            }
        }))
        .expect("tool list");

        assert_eq!(tools[0]["description"], "Apply an edit");
        assert_eq!(tools[0]["schema"]["type"], "object");
        assert_eq!(tools[0]["deprecated"], true);
    }

    #[test]
    fn parse_host_tools_list_requires_tool_names() {
        let err = parse_host_tools_list_response(serde_json::json!({
            "tools": [
                {"description": "missing name"}
            ]
        }))
        .expect_err("expected error");
        assert!(err
            .to_string()
            .contains("host/tools/list: every tool must include a string `name`"));
    }

    #[test]
    fn test_timeout_duration() {
        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
    }
}