1use std::collections::{BTreeMap, HashMap, VecDeque};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::io::AsyncBufReadExt;
16use tokio::sync::{oneshot, Mutex, Notify};
17
18use crate::orchestration::MutationSessionRecord;
19use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
20use crate::visible_text::VisibleTextState;
21use crate::vm::Vm;
22
/// Longest time [`HostBridge::call`] waits for the host to answer a request (five minutes).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5 * 60);
25
/// Shared callback that emits one line of bridge output.
///
/// Returns `Err` with a human-readable message when the line could not be written.
pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;

/// Builds a [`HostBridgeWriter`] that writes each line to stdout, appends `\n`, and flushes.
///
/// `stdout_lock` is held for the whole write, so writers built from clones of the same lock
/// emit complete lines one at a time. A poisoned lock is recovered instead of failing.
fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
    Arc::new(move |line: &str| {
        let _serialized = stdout_lock
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let mut out = std::io::stdout().lock();
        let write_failed = |err: std::io::Error| format!("Bridge write error: {err}");
        out.write_all(line.as_bytes()).map_err(write_failed)?;
        out.write_all(b"\n").map_err(write_failed)?;
        out.flush()
            .map_err(|err| format!("Bridge flush error: {err}"))
    })
}
44
45pub struct HostBridge {
52 next_id: AtomicU64,
53 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
55 cancelled: Arc<AtomicBool>,
57 cancel_notify: Arc<Notify>,
59 writer: HostBridgeWriter,
61 session_id: std::sync::Mutex<String>,
63 script_name: std::sync::Mutex<String>,
65 queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>>,
67 resume_requested: Arc<AtomicBool>,
69 skills_reload_requested: Arc<AtomicBool>,
74 daemon_idle: Arc<AtomicBool>,
76 prompt_stop_reason: std::sync::Mutex<Option<String>>,
82 visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
84 visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
86 in_process: Option<InProcessHost>,
88}
89
90struct InProcessHost {
91 module_path: PathBuf,
92 exported_functions: BTreeMap<String, Rc<VmClosure>>,
93 vm: Vm,
94}
95
96impl InProcessHost {
97 async fn dispatch(
98 &self,
99 method: &str,
100 params: serde_json::Value,
101 ) -> Result<serde_json::Value, VmError> {
102 match method {
103 "builtin_call" => {
104 let name = params
105 .get("name")
106 .and_then(|value| value.as_str())
107 .unwrap_or_default();
108 let args = params
109 .get("args")
110 .and_then(|value| value.as_array())
111 .cloned()
112 .unwrap_or_default()
113 .into_iter()
114 .map(|value| json_result_to_vm_value(&value))
115 .collect::<Vec<_>>();
116 self.invoke_export(name, &args).await
117 }
118 "host/tools/list" => self
119 .invoke_optional_export("host_tools_list", &[])
120 .await
121 .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
122 "session/request_permission" => self.request_permission(params).await,
123 other => Err(VmError::Runtime(format!(
124 "playground host backend does not implement bridge method '{other}'"
125 ))),
126 }
127 }
128
129 async fn invoke_export(
130 &self,
131 name: &str,
132 args: &[VmValue],
133 ) -> Result<serde_json::Value, VmError> {
134 let Some(closure) = self.exported_functions.get(name) else {
135 return Err(VmError::Runtime(format!(
136 "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
137 self.module_path.display()
138 )));
139 };
140
141 let mut vm = self.vm.child_vm_for_host();
142 let result = vm.call_closure_pub(closure, args).await?;
143 Ok(crate::llm::vm_value_to_json(&result))
144 }
145
146 async fn invoke_optional_export(
147 &self,
148 name: &str,
149 args: &[VmValue],
150 ) -> Result<Option<serde_json::Value>, VmError> {
151 if !self.exported_functions.contains_key(name) {
152 return Ok(None);
153 }
154 self.invoke_export(name, args).await.map(Some)
155 }
156
157 async fn request_permission(
158 &self,
159 params: serde_json::Value,
160 ) -> Result<serde_json::Value, VmError> {
161 let Some(closure) = self.exported_functions.get("request_permission") else {
162 return Ok(serde_json::json!({ "granted": true }));
163 };
164
165 let tool_name = params
166 .get("toolCall")
167 .and_then(|tool_call| tool_call.get("toolName"))
168 .and_then(|value| value.as_str())
169 .unwrap_or_default();
170 let tool_args = params
171 .get("toolCall")
172 .and_then(|tool_call| tool_call.get("rawInput"))
173 .map(json_result_to_vm_value)
174 .unwrap_or(VmValue::Nil);
175 let full_payload = json_result_to_vm_value(¶ms);
176
177 let arg_count = closure.func.params.len();
178 let args = if arg_count >= 3 {
179 vec![
180 VmValue::String(Rc::from(tool_name.to_string())),
181 tool_args,
182 full_payload,
183 ]
184 } else if arg_count == 2 {
185 vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
186 } else if arg_count == 1 {
187 vec![full_payload]
188 } else {
189 Vec::new()
190 };
191
192 let mut vm = self.vm.child_vm_for_host();
193 let result = vm.call_closure_pub(closure, &args).await?;
194 let payload = match result {
195 VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
196 VmValue::String(reason) if !reason.is_empty() => {
197 serde_json::json!({ "granted": false, "reason": reason.to_string() })
198 }
199 other => {
200 let json = crate::llm::vm_value_to_json(&other);
201 if json
202 .get("granted")
203 .and_then(|value| value.as_bool())
204 .is_some()
205 || json.get("outcome").is_some()
206 {
207 json
208 } else {
209 serde_json::json!({ "granted": other.is_truthy() })
210 }
211 }
212 };
213 Ok(payload)
214 }
215}
216
/// Delivery mode requested for a queued user message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
    /// Wire names: `interrupt_immediate`, `interrupt`.
    InterruptImmediate,
    /// Wire names: `finish_step`, `after_current_operation`.
    FinishStep,
    /// Any other wire name, including `wait_for_completion`.
    WaitForCompletion,
}

/// Point in the run at which queued user messages are collected.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
    /// Collects [`QueuedUserMessageMode::InterruptImmediate`] messages.
    InterruptImmediate,
    /// Collects [`QueuedUserMessageMode::FinishStep`] messages.
    AfterCurrentOperation,
    /// Collects [`QueuedUserMessageMode::WaitForCompletion`] messages.
    EndOfInteraction,
}

impl QueuedUserMessageMode {
    /// Parses a wire-format mode name; unrecognized names map to `WaitForCompletion`.
    fn from_str(value: &str) -> Self {
        if matches!(value, "interrupt_immediate" | "interrupt") {
            Self::InterruptImmediate
        } else if matches!(value, "finish_step" | "after_current_operation") {
            Self::FinishStep
        } else {
            Self::WaitForCompletion
        }
    }
}

/// A user message waiting to be delivered, together with its delivery mode.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedUserMessage {
    /// Message text.
    pub content: String,
    /// When the message should be delivered.
    pub mode: QueuedUserMessageMode,
}
246
247#[allow(clippy::new_without_default)]
249impl HostBridge {
250 pub fn new() -> Self {
255 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
256 Arc::new(Mutex::new(HashMap::new()));
257 let cancelled = Arc::new(AtomicBool::new(false));
258 let cancel_notify = Arc::new(Notify::new());
259 let queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>> =
260 Arc::new(Mutex::new(VecDeque::new()));
261 let resume_requested = Arc::new(AtomicBool::new(false));
262 let skills_reload_requested = Arc::new(AtomicBool::new(false));
263 let daemon_idle = Arc::new(AtomicBool::new(false));
264
265 let pending_clone = pending.clone();
267 let cancelled_clone = cancelled.clone();
268 let cancel_notify_clone = cancel_notify.clone();
269 let queued_clone = queued_user_messages.clone();
270 let resume_clone = resume_requested.clone();
271 let skills_reload_clone = skills_reload_requested.clone();
272 tokio::task::spawn_local(async move {
273 let stdin = tokio::io::stdin();
274 let reader = tokio::io::BufReader::new(stdin);
275 let mut lines = reader.lines();
276
277 while let Ok(Some(line)) = lines.next_line().await {
278 let line = line.trim().to_string();
279 if line.is_empty() {
280 continue;
281 }
282
283 let msg: serde_json::Value = match serde_json::from_str(&line) {
284 Ok(v) => v,
285 Err(_) => continue,
286 };
287
288 if msg.get("id").is_none() {
290 if let Some(method) = msg["method"].as_str() {
291 if method == "cancel" {
292 cancelled_clone.store(true, Ordering::SeqCst);
293 cancel_notify_clone.notify_waiters();
294 } else if method == "agent/resume" {
295 resume_clone.store(true, Ordering::SeqCst);
296 } else if method == "skills/update" {
297 skills_reload_clone.store(true, Ordering::SeqCst);
298 } else if method == "user_message"
299 || method == "session/input"
300 || method == "agent/user_message"
301 {
302 let params = &msg["params"];
303 let content = params
304 .get("content")
305 .and_then(|v| v.as_str())
306 .unwrap_or("")
307 .to_string();
308 if !content.is_empty() {
309 let mode = QueuedUserMessageMode::from_str(
310 params
311 .get("mode")
312 .and_then(|v| v.as_str())
313 .unwrap_or("wait_for_completion"),
314 );
315 queued_clone
316 .lock()
317 .await
318 .push_back(QueuedUserMessage { content, mode });
319 }
320 }
321 }
322 continue;
323 }
324
325 if let Some(id) = msg["id"].as_u64() {
326 let mut pending = pending_clone.lock().await;
327 if let Some(sender) = pending.remove(&id) {
328 let _ = sender.send(msg);
329 }
330 }
331 }
332
333 let mut pending = pending_clone.lock().await;
335 pending.clear();
336 });
337
338 Self {
339 next_id: AtomicU64::new(1),
340 pending,
341 cancelled,
342 cancel_notify,
343 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
344 session_id: std::sync::Mutex::new(String::new()),
345 script_name: std::sync::Mutex::new(String::new()),
346 queued_user_messages,
347 resume_requested,
348 skills_reload_requested,
349 daemon_idle,
350 prompt_stop_reason: std::sync::Mutex::new(None),
351 visible_call_states: std::sync::Mutex::new(HashMap::new()),
352 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
353 in_process: None,
354 }
355 }
356
357 pub fn from_parts(
363 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
364 cancelled: Arc<AtomicBool>,
365 stdout_lock: Arc<std::sync::Mutex<()>>,
366 start_id: u64,
367 ) -> Self {
368 Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
369 }
370
371 pub fn from_parts_with_writer(
372 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
373 cancelled: Arc<AtomicBool>,
374 writer: HostBridgeWriter,
375 start_id: u64,
376 ) -> Self {
377 Self::from_parts_with_writer_and_cancel_notify(
378 pending,
379 cancelled,
380 Arc::new(Notify::new()),
381 writer,
382 start_id,
383 )
384 }
385
386 pub fn from_parts_with_writer_and_cancel_notify(
387 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
388 cancelled: Arc<AtomicBool>,
389 cancel_notify: Arc<Notify>,
390 writer: HostBridgeWriter,
391 start_id: u64,
392 ) -> Self {
393 Self {
394 next_id: AtomicU64::new(start_id),
395 pending,
396 cancelled,
397 cancel_notify,
398 writer,
399 session_id: std::sync::Mutex::new(String::new()),
400 script_name: std::sync::Mutex::new(String::new()),
401 queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
402 resume_requested: Arc::new(AtomicBool::new(false)),
403 skills_reload_requested: Arc::new(AtomicBool::new(false)),
404 daemon_idle: Arc::new(AtomicBool::new(false)),
405 prompt_stop_reason: std::sync::Mutex::new(None),
406 visible_call_states: std::sync::Mutex::new(HashMap::new()),
407 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
408 in_process: None,
409 }
410 }
411
412 pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
415 let exported_functions = vm.load_module_exports(module_path).await?;
416 Ok(Self {
417 next_id: AtomicU64::new(1),
418 pending: Arc::new(Mutex::new(HashMap::new())),
419 cancelled: Arc::new(AtomicBool::new(false)),
420 cancel_notify: Arc::new(Notify::new()),
421 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
422 session_id: std::sync::Mutex::new(String::new()),
423 script_name: std::sync::Mutex::new(String::new()),
424 queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
425 resume_requested: Arc::new(AtomicBool::new(false)),
426 skills_reload_requested: Arc::new(AtomicBool::new(false)),
427 daemon_idle: Arc::new(AtomicBool::new(false)),
428 prompt_stop_reason: std::sync::Mutex::new(None),
429 visible_call_states: std::sync::Mutex::new(HashMap::new()),
430 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
431 in_process: Some(InProcessHost {
432 module_path: module_path.to_path_buf(),
433 exported_functions,
434 vm,
435 }),
436 })
437 }
438
439 pub fn set_session_id(&self, id: &str) {
441 *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
442 }
443
444 pub fn set_script_name(&self, name: &str) {
446 *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
447 }
448
449 fn get_script_name(&self) -> String {
451 self.script_name
452 .lock()
453 .unwrap_or_else(|e| e.into_inner())
454 .clone()
455 }
456
457 pub fn get_session_id(&self) -> String {
459 self.session_id
460 .lock()
461 .unwrap_or_else(|e| e.into_inner())
462 .clone()
463 }
464
465 fn write_line(&self, line: &str) -> Result<(), VmError> {
467 (self.writer)(line).map_err(VmError::Runtime)
468 }
469
470 pub async fn call(
473 &self,
474 method: &str,
475 params: serde_json::Value,
476 ) -> Result<serde_json::Value, VmError> {
477 if let Some(in_process) = &self.in_process {
478 return in_process.dispatch(method, params).await;
479 }
480
481 if self.is_cancelled() {
482 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
483 }
484
485 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
486 let cancel_wait = self.cancel_notify.notified();
487 tokio::pin!(cancel_wait);
488
489 let request = crate::jsonrpc::request(id, method, params);
490
491 let (tx, rx) = oneshot::channel();
492 {
493 let mut pending = self.pending.lock().await;
494 pending.insert(id, tx);
495 }
496
497 let line = serde_json::to_string(&request)
498 .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
499 if let Err(e) = self.write_line(&line) {
500 let mut pending = self.pending.lock().await;
501 pending.remove(&id);
502 return Err(e);
503 }
504
505 if self.is_cancelled() {
506 let mut pending = self.pending.lock().await;
507 pending.remove(&id);
508 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
509 }
510
511 let response = tokio::select! {
512 result = rx => match result {
513 Ok(msg) => msg,
514 Err(_) => {
515 return Err(VmError::Runtime(
517 "Bridge: host closed connection before responding".into(),
518 ));
519 }
520 },
521 _ = &mut cancel_wait => {
522 let mut pending = self.pending.lock().await;
523 pending.remove(&id);
524 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
525 }
526 _ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
527 let mut pending = self.pending.lock().await;
528 pending.remove(&id);
529 return Err(VmError::Runtime(format!(
530 "Bridge: host did not respond to '{method}' within {}s",
531 DEFAULT_TIMEOUT.as_secs()
532 )));
533 }
534 };
535
536 if let Some(error) = response.get("error") {
537 let message = error["message"].as_str().unwrap_or("Unknown host error");
538 let code = error["code"].as_i64().unwrap_or(-1);
539 if code == -32001 {
541 return Err(VmError::CategorizedError {
542 message: message.to_string(),
543 category: ErrorCategory::ToolRejected,
544 });
545 }
546 return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
547 }
548
549 Ok(response["result"].clone())
550 }
551
552 pub fn notify(&self, method: &str, params: serde_json::Value) {
555 let notification = crate::jsonrpc::notification(method, params);
556 if self.in_process.is_some() {
557 return;
558 }
559 if let Ok(line) = serde_json::to_string(¬ification) {
560 let _ = self.write_line(&line);
561 }
562 }
563
564 pub fn is_cancelled(&self) -> bool {
566 self.cancelled.load(Ordering::SeqCst)
567 }
568
569 pub fn take_resume_signal(&self) -> bool {
570 self.resume_requested.swap(false, Ordering::SeqCst)
571 }
572
573 pub fn signal_resume(&self) {
574 self.resume_requested.store(true, Ordering::SeqCst);
575 }
576
577 pub fn set_daemon_idle(&self, idle: bool) {
578 self.daemon_idle.store(idle, Ordering::SeqCst);
579 }
580
581 pub fn is_daemon_idle(&self) -> bool {
582 self.daemon_idle.load(Ordering::SeqCst)
583 }
584
585 pub fn set_prompt_stop_reason(&self, reason: &str) {
590 *self
591 .prompt_stop_reason
592 .lock()
593 .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
594 }
595
596 pub fn take_prompt_stop_reason(&self) -> Option<String> {
601 self.prompt_stop_reason
602 .lock()
603 .unwrap_or_else(|e| e.into_inner())
604 .take()
605 }
606
607 pub fn take_skills_reload_signal(&self) -> bool {
612 self.skills_reload_requested.swap(false, Ordering::SeqCst)
613 }
614
615 pub fn signal_skills_reload(&self) {
619 self.skills_reload_requested.store(true, Ordering::SeqCst);
620 }
621
622 pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
628 let result = self.call("skills/list", serde_json::json!({})).await?;
629 match result {
630 serde_json::Value::Array(items) => Ok(items),
631 serde_json::Value::Object(map) => match map.get("skills") {
632 Some(serde_json::Value::Array(items)) => Ok(items.clone()),
633 _ => Err(VmError::Runtime(
634 "skills/list: host response must be an array or { skills: [...] }".into(),
635 )),
636 },
637 _ => Err(VmError::Runtime(
638 "skills/list: unexpected response shape".into(),
639 )),
640 }
641 }
642
643 pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
649 let result = self.call("host/tools/list", serde_json::json!({})).await?;
650 parse_host_tools_list_response(result)
651 }
652
653 pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
657 self.call("skills/fetch", serde_json::json!({ "id": id }))
658 .await
659 }
660
661 pub async fn push_queued_user_message(&self, content: String, mode: &str) {
662 self.queued_user_messages
663 .lock()
664 .await
665 .push_back(QueuedUserMessage {
666 content,
667 mode: QueuedUserMessageMode::from_str(mode),
668 });
669 }
670
671 pub async fn take_queued_user_messages(
672 &self,
673 include_interrupt_immediate: bool,
674 include_finish_step: bool,
675 include_wait_for_completion: bool,
676 ) -> Vec<QueuedUserMessage> {
677 let mut queue = self.queued_user_messages.lock().await;
678 let mut selected = Vec::new();
679 let mut retained = VecDeque::new();
680 while let Some(message) = queue.pop_front() {
681 let should_take = match message.mode {
682 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
683 QueuedUserMessageMode::FinishStep => include_finish_step,
684 QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
685 };
686 if should_take {
687 selected.push(message);
688 } else {
689 retained.push_back(message);
690 }
691 }
692 *queue = retained;
693 selected
694 }
695
696 pub async fn take_queued_user_messages_for(
697 &self,
698 checkpoint: DeliveryCheckpoint,
699 ) -> Vec<QueuedUserMessage> {
700 match checkpoint {
701 DeliveryCheckpoint::InterruptImmediate => {
702 self.take_queued_user_messages(true, false, false).await
703 }
704 DeliveryCheckpoint::AfterCurrentOperation => {
705 self.take_queued_user_messages(false, true, false).await
706 }
707 DeliveryCheckpoint::EndOfInteraction => {
708 self.take_queued_user_messages(false, false, true).await
709 }
710 }
711 }
712
713 pub fn send_output(&self, text: &str) {
715 self.notify("output", serde_json::json!({"text": text}));
716 }
717
718 pub fn send_progress(
720 &self,
721 phase: &str,
722 message: &str,
723 progress: Option<i64>,
724 total: Option<i64>,
725 data: Option<serde_json::Value>,
726 ) {
727 let mut payload = serde_json::json!({"phase": phase, "message": message});
728 if let Some(p) = progress {
729 payload["progress"] = serde_json::json!(p);
730 }
731 if let Some(t) = total {
732 payload["total"] = serde_json::json!(t);
733 }
734 if let Some(d) = data {
735 payload["data"] = d;
736 }
737 self.notify("progress", payload);
738 }
739
740 pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
742 let mut payload = serde_json::json!({"level": level, "message": message});
743 if let Some(f) = fields {
744 payload["fields"] = f;
745 }
746 self.notify("log", payload);
747 }
748
749 pub fn send_call_start(
752 &self,
753 call_id: &str,
754 call_type: &str,
755 name: &str,
756 metadata: serde_json::Value,
757 ) {
758 let session_id = self.get_session_id();
759 let script = self.get_script_name();
760 let stream_publicly = metadata
761 .get("stream_publicly")
762 .and_then(|value| value.as_bool())
763 .unwrap_or(true);
764 self.visible_call_streams
765 .lock()
766 .unwrap_or_else(|e| e.into_inner())
767 .insert(call_id.to_string(), stream_publicly);
768 self.notify(
769 "session/update",
770 serde_json::json!({
771 "sessionId": session_id,
772 "update": {
773 "sessionUpdate": "call_start",
774 "content": {
775 "toolCallId": call_id,
776 "call_type": call_type,
777 "name": name,
778 "script": script,
779 "metadata": metadata,
780 },
781 },
782 }),
783 );
784 }
785
786 pub fn send_call_progress(
789 &self,
790 call_id: &str,
791 delta: &str,
792 accumulated_tokens: u64,
793 user_visible: bool,
794 ) {
795 let session_id = self.get_session_id();
796 let (visible_text, visible_delta) = {
797 let stream_publicly = self
798 .visible_call_streams
799 .lock()
800 .unwrap_or_else(|e| e.into_inner())
801 .get(call_id)
802 .copied()
803 .unwrap_or(true);
804 let mut states = self
805 .visible_call_states
806 .lock()
807 .unwrap_or_else(|e| e.into_inner());
808 let state = states.entry(call_id.to_string()).or_default();
809 state.push(delta, stream_publicly)
810 };
811 self.notify(
812 "session/update",
813 serde_json::json!({
814 "sessionId": session_id,
815 "update": {
816 "sessionUpdate": "call_progress",
817 "content": {
818 "toolCallId": call_id,
819 "delta": delta,
820 "accumulated_tokens": accumulated_tokens,
821 "visible_text": visible_text,
822 "visible_delta": visible_delta,
823 "user_visible": user_visible,
824 },
825 },
826 }),
827 );
828 }
829
830 pub fn send_call_end(
832 &self,
833 call_id: &str,
834 call_type: &str,
835 name: &str,
836 duration_ms: u64,
837 status: &str,
838 metadata: serde_json::Value,
839 ) {
840 let session_id = self.get_session_id();
841 let script = self.get_script_name();
842 self.visible_call_states
843 .lock()
844 .unwrap_or_else(|e| e.into_inner())
845 .remove(call_id);
846 self.visible_call_streams
847 .lock()
848 .unwrap_or_else(|e| e.into_inner())
849 .remove(call_id);
850 self.notify(
851 "session/update",
852 serde_json::json!({
853 "sessionId": session_id,
854 "update": {
855 "sessionUpdate": "call_end",
856 "content": {
857 "toolCallId": call_id,
858 "call_type": call_type,
859 "name": name,
860 "script": script,
861 "duration_ms": duration_ms,
862 "status": status,
863 "metadata": metadata,
864 },
865 },
866 }),
867 );
868 }
869
870 pub fn send_worker_update(
872 &self,
873 worker_id: &str,
874 worker_name: &str,
875 status: &str,
876 metadata: serde_json::Value,
877 audit: Option<&MutationSessionRecord>,
878 ) {
879 let session_id = self.get_session_id();
880 let script = self.get_script_name();
881 let started_at = metadata.get("started_at").cloned().unwrap_or_default();
882 let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
883 let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
884 let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
885 let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
886 let lifecycle = serde_json::json!({
887 "event": status,
888 "worker_id": worker_id,
889 "worker_name": worker_name,
890 "started_at": started_at,
891 "finished_at": finished_at,
892 });
893 self.notify(
894 "session/update",
895 serde_json::json!({
896 "sessionId": session_id,
897 "update": {
898 "sessionUpdate": "worker_update",
899 "content": {
900 "worker_id": worker_id,
901 "worker_name": worker_name,
902 "status": status,
903 "script": script,
904 "started_at": started_at,
905 "finished_at": finished_at,
906 "snapshot_path": snapshot_path,
907 "run_id": run_id,
908 "run_path": run_path,
909 "lifecycle": lifecycle,
910 "audit": audit,
911 "metadata": metadata,
912 },
913 },
914 }),
915 );
916 }
917}
918
919pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
921 crate::stdlib::json_to_vm_value(val)
922}
923
924fn parse_host_tools_list_response(
925 result: serde_json::Value,
926) -> Result<Vec<serde_json::Value>, VmError> {
927 let tools = match result {
928 serde_json::Value::Array(items) => items,
929 serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
930 map.get("result")
931 .and_then(|value| value.get("tools"))
932 .cloned()
933 }) {
934 Some(serde_json::Value::Array(items)) => items,
935 _ => {
936 return Err(VmError::Runtime(
937 "host/tools/list: host response must be an array or { tools: [...] }".into(),
938 ));
939 }
940 },
941 _ => {
942 return Err(VmError::Runtime(
943 "host/tools/list: unexpected response shape".into(),
944 ));
945 }
946 };
947
948 let mut normalized = Vec::with_capacity(tools.len());
949 for tool in tools {
950 let serde_json::Value::Object(map) = tool else {
951 return Err(VmError::Runtime(
952 "host/tools/list: every tool must be an object".into(),
953 ));
954 };
955 let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
956 return Err(VmError::Runtime(
957 "host/tools/list: every tool must include a string `name`".into(),
958 ));
959 };
960 let description = map
961 .get("description")
962 .and_then(|value| value.as_str())
963 .or_else(|| {
964 map.get("short_description")
965 .and_then(|value| value.as_str())
966 })
967 .unwrap_or_default();
968 let schema = map
969 .get("schema")
970 .cloned()
971 .or_else(|| map.get("parameters").cloned())
972 .or_else(|| map.get("input_schema").cloned())
973 .unwrap_or(serde_json::Value::Null);
974 let deprecated = map
975 .get("deprecated")
976 .and_then(|value| value.as_bool())
977 .unwrap_or(false);
978 normalized.push(serde_json::json!({
979 "name": name,
980 "description": description,
981 "schema": schema,
982 "deprecated": deprecated,
983 }));
984 }
985 Ok(normalized)
986}
987
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the single-threaded runtime used by the async tests in this module.
    fn current_thread_runtime() -> tokio::runtime::Runtime {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
    }

    #[test]
    fn test_json_rpc_request_format() {
        let request = crate::jsonrpc::request(
            1,
            "llm_call",
            serde_json::json!({
                "prompt": "Hello",
                "system": "Be helpful",
            }),
        );
        let s = serde_json::to_string(&request).unwrap();
        assert!(s.contains("\"jsonrpc\":\"2.0\""));
        assert!(s.contains("\"id\":1"));
        assert!(s.contains("\"method\":\"llm_call\""));
    }

    #[test]
    fn test_json_rpc_notification_format() {
        let notification =
            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
        let s = serde_json::to_string(&notification).unwrap();
        assert!(s.contains("\"method\":\"output\""));
        // Notifications must not carry an id.
        assert!(!s.contains("\"id\""));
    }

    #[test]
    fn test_json_rpc_error_response_parsing() {
        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
        assert!(response.get("error").is_some());
        assert_eq!(
            response["error"]["message"].as_str().unwrap(),
            "Invalid request"
        );
    }

    #[test]
    fn test_json_rpc_success_response_parsing() {
        let response = crate::jsonrpc::response(
            1,
            serde_json::json!({
                "text": "Hello world",
                "input_tokens": 10,
                "output_tokens": 5,
            }),
        );
        assert!(response.get("result").is_some());
        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
    }

    /// The bridge must observe cancellation through the shared flag it was built with.
    #[test]
    fn test_cancelled_flag() {
        let cancelled = Arc::new(AtomicBool::new(false));
        let bridge = HostBridge::from_parts_with_writer(
            Arc::new(Mutex::new(HashMap::new())),
            cancelled.clone(),
            Arc::new(|_| Ok(())),
            1,
        );
        assert!(!bridge.is_cancelled());
        cancelled.store(true, Ordering::SeqCst);
        assert!(bridge.is_cancelled());
    }

    #[test]
    fn pending_host_calls_return_when_cancellation_arrives() {
        current_thread_runtime().block_on(async {
            let pending = Arc::new(Mutex::new(HashMap::new()));
            let cancelled = Arc::new(AtomicBool::new(false));
            let bridge = HostBridge::from_parts_with_writer(
                pending.clone(),
                cancelled.clone(),
                Arc::new(|_| Ok(())),
                1,
            );

            let call = bridge.call("host/work", serde_json::json!({}));
            tokio::pin!(call);

            // Drive the call until it has registered itself as pending.
            loop {
                tokio::select! {
                    result = &mut call => panic!("call completed before cancellation: {result:?}"),
                    _ = tokio::task::yield_now() => {}
                }
                if !pending.lock().await.is_empty() {
                    break;
                }
            }

            cancelled.store(true, Ordering::SeqCst);
            bridge.cancel_notify.notify_waiters();

            let result = tokio::time::timeout(Duration::from_secs(1), call)
                .await
                .expect("pending call should observe cancellation promptly");
            assert!(
                matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
            );
            assert!(pending.lock().await.is_empty());
        });
    }

    #[test]
    fn queued_messages_are_filtered_by_delivery_mode() {
        current_thread_runtime().block_on(async {
            let bridge = HostBridge::from_parts(
                Arc::new(Mutex::new(HashMap::new())),
                Arc::new(AtomicBool::new(false)),
                Arc::new(std::sync::Mutex::new(())),
                1,
            );
            bridge
                .push_queued_user_message("first".to_string(), "finish_step")
                .await;
            bridge
                .push_queued_user_message("second".to_string(), "wait_for_completion")
                .await;

            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
            assert_eq!(finish_step.len(), 1);
            assert_eq!(finish_step[0].content, "first");

            let turn_end = bridge.take_queued_user_messages(false, false, true).await;
            assert_eq!(turn_end.len(), 1);
            assert_eq!(turn_end[0].content, "second");
        });
    }

    #[test]
    fn test_json_result_to_vm_value_string() {
        let val = serde_json::json!("hello");
        let vm_val = json_result_to_vm_value(&val);
        assert_eq!(vm_val.display(), "hello");
    }

    #[test]
    fn test_json_result_to_vm_value_dict() {
        let val = serde_json::json!({"name": "test", "count": 42});
        let vm_val = json_result_to_vm_value(&val);
        let VmValue::Dict(d) = &vm_val else {
            panic!("Expected Dict, got {:?}", vm_val);
        };
        assert_eq!(d.get("name").unwrap().display(), "test");
        assert_eq!(d.get("count").unwrap().display(), "42");
    }

    #[test]
    fn test_json_result_to_vm_value_null() {
        let val = serde_json::json!(null);
        let vm_val = json_result_to_vm_value(&val);
        assert!(matches!(vm_val, VmValue::Nil));
    }

    #[test]
    fn test_json_result_to_vm_value_nested() {
        let val = serde_json::json!({
            "text": "response",
            "tool_calls": [
                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
            ],
            "input_tokens": 100,
            "output_tokens": 50,
        });
        let vm_val = json_result_to_vm_value(&val);
        let VmValue::Dict(d) = &vm_val else {
            panic!("Expected Dict, got {:?}", vm_val);
        };
        assert_eq!(d.get("text").unwrap().display(), "response");
        let VmValue::List(list) = d.get("tool_calls").unwrap() else {
            panic!("Expected List for tool_calls");
        };
        assert_eq!(list.len(), 1);
    }

    #[test]
    fn parse_host_tools_list_accepts_object_wrapper() {
        let tools = parse_host_tools_list_response(serde_json::json!({
            "tools": [
                {
                    "name": "Read",
                    "description": "Read a file",
                    "schema": {"type": "object"},
                }
            ]
        }))
        .expect("tool list");

        assert_eq!(tools.len(), 1);
        assert_eq!(tools[0]["name"], "Read");
        assert_eq!(tools[0]["deprecated"], false);
    }

    #[test]
    fn parse_host_tools_list_accepts_compat_fields() {
        let tools = parse_host_tools_list_response(serde_json::json!({
            "result": {
                "tools": [
                    {
                        "name": "Edit",
                        "short_description": "Apply an edit",
                        "input_schema": {"type": "object"},
                        "deprecated": true,
                    }
                ]
            }
        }))
        .expect("tool list");

        assert_eq!(tools[0]["description"], "Apply an edit");
        assert_eq!(tools[0]["schema"]["type"], "object");
        assert_eq!(tools[0]["deprecated"], true);
    }

    #[test]
    fn parse_host_tools_list_requires_tool_names() {
        let err = parse_host_tools_list_response(serde_json::json!({
            "tools": [
                {"description": "missing name"}
            ]
        }))
        .expect_err("expected error");
        assert!(err
            .to_string()
            .contains("host/tools/list: every tool must include a string `name`"));
    }

    #[test]
    fn test_timeout_duration() {
        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
    }
}