1use std::collections::{BTreeMap, HashMap, VecDeque};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::io::AsyncBufReadExt;
16use tokio::sync::{oneshot, Mutex};
17
18use crate::orchestration::MutationSessionRecord;
19use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
20use crate::visible_text::VisibleTextState;
21use crate::vm::Vm;
22
23const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
25
26pub struct HostBridge {
33 next_id: AtomicU64,
34 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
36 cancelled: Arc<AtomicBool>,
38 stdout_lock: Arc<std::sync::Mutex<()>>,
40 session_id: std::sync::Mutex<String>,
42 script_name: std::sync::Mutex<String>,
44 queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>>,
46 resume_requested: Arc<AtomicBool>,
48 skills_reload_requested: Arc<AtomicBool>,
53 daemon_idle: Arc<AtomicBool>,
55 visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
57 visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
59 in_process: Option<InProcessHost>,
61 #[cfg(test)]
62 recorded_notifications: Arc<std::sync::Mutex<Vec<serde_json::Value>>>,
63}
64
65struct InProcessHost {
66 module_path: PathBuf,
67 exported_functions: BTreeMap<String, Rc<VmClosure>>,
68 vm: Vm,
69}
70
71impl InProcessHost {
72 async fn dispatch(
73 &self,
74 method: &str,
75 params: serde_json::Value,
76 ) -> Result<serde_json::Value, VmError> {
77 match method {
78 "builtin_call" => {
79 let name = params
80 .get("name")
81 .and_then(|value| value.as_str())
82 .unwrap_or_default();
83 let args = params
84 .get("args")
85 .and_then(|value| value.as_array())
86 .cloned()
87 .unwrap_or_default()
88 .into_iter()
89 .map(|value| json_result_to_vm_value(&value))
90 .collect::<Vec<_>>();
91 self.invoke_export(name, &args).await
92 }
93 "host/tools/list" => self
94 .invoke_optional_export("host_tools_list", &[])
95 .await
96 .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
97 "session/request_permission" => self.request_permission(params).await,
98 other => Err(VmError::Runtime(format!(
99 "playground host backend does not implement bridge method '{other}'"
100 ))),
101 }
102 }
103
104 async fn invoke_export(
105 &self,
106 name: &str,
107 args: &[VmValue],
108 ) -> Result<serde_json::Value, VmError> {
109 let Some(closure) = self.exported_functions.get(name) else {
110 return Err(VmError::Runtime(format!(
111 "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
112 self.module_path.display()
113 )));
114 };
115
116 let mut vm = self.vm.child_vm_for_host();
117 let result = vm.call_closure_pub(closure, args, &[]).await?;
118 Ok(crate::llm::vm_value_to_json(&result))
119 }
120
121 async fn invoke_optional_export(
122 &self,
123 name: &str,
124 args: &[VmValue],
125 ) -> Result<Option<serde_json::Value>, VmError> {
126 if !self.exported_functions.contains_key(name) {
127 return Ok(None);
128 }
129 self.invoke_export(name, args).await.map(Some)
130 }
131
132 async fn request_permission(
133 &self,
134 params: serde_json::Value,
135 ) -> Result<serde_json::Value, VmError> {
136 let Some(closure) = self.exported_functions.get("request_permission") else {
137 return Ok(serde_json::json!({ "granted": true }));
138 };
139
140 let tool_name = params
141 .get("toolCall")
142 .and_then(|tool_call| tool_call.get("toolName"))
143 .and_then(|value| value.as_str())
144 .unwrap_or_default();
145 let tool_args = params
146 .get("toolCall")
147 .and_then(|tool_call| tool_call.get("rawInput"))
148 .map(json_result_to_vm_value)
149 .unwrap_or(VmValue::Nil);
150 let full_payload = json_result_to_vm_value(¶ms);
151
152 let arg_count = closure.func.params.len();
153 let args = if arg_count >= 3 {
154 vec![
155 VmValue::String(Rc::from(tool_name.to_string())),
156 tool_args,
157 full_payload,
158 ]
159 } else if arg_count == 2 {
160 vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
161 } else if arg_count == 1 {
162 vec![full_payload]
163 } else {
164 Vec::new()
165 };
166
167 let mut vm = self.vm.child_vm_for_host();
168 let result = vm.call_closure_pub(closure, &args, &[]).await?;
169 let payload = match result {
170 VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
171 VmValue::String(reason) if !reason.is_empty() => {
172 serde_json::json!({ "granted": false, "reason": reason.to_string() })
173 }
174 other => {
175 let json = crate::llm::vm_value_to_json(&other);
176 if json
177 .get("granted")
178 .and_then(|value| value.as_bool())
179 .is_some()
180 || json.get("outcome").is_some()
181 {
182 json
183 } else {
184 serde_json::json!({ "granted": other.is_truthy() })
185 }
186 }
187 };
188 Ok(payload)
189 }
190}
191
192#[derive(Clone, Debug, PartialEq, Eq)]
193pub enum QueuedUserMessageMode {
194 InterruptImmediate,
195 FinishStep,
196 WaitForCompletion,
197}
198
199#[derive(Clone, Copy, Debug, PartialEq, Eq)]
200pub enum DeliveryCheckpoint {
201 InterruptImmediate,
202 AfterCurrentOperation,
203 EndOfInteraction,
204}
205
206impl QueuedUserMessageMode {
207 fn from_str(value: &str) -> Self {
208 match value {
209 "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
210 "finish_step" | "after_current_operation" => Self::FinishStep,
211 _ => Self::WaitForCompletion,
212 }
213 }
214}
215
216#[derive(Clone, Debug, PartialEq, Eq)]
217pub struct QueuedUserMessage {
218 pub content: String,
219 pub mode: QueuedUserMessageMode,
220}
221
222#[allow(clippy::new_without_default)]
224impl HostBridge {
225 pub fn new() -> Self {
230 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
231 Arc::new(Mutex::new(HashMap::new()));
232 let cancelled = Arc::new(AtomicBool::new(false));
233 let queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>> =
234 Arc::new(Mutex::new(VecDeque::new()));
235 let resume_requested = Arc::new(AtomicBool::new(false));
236 let skills_reload_requested = Arc::new(AtomicBool::new(false));
237 let daemon_idle = Arc::new(AtomicBool::new(false));
238
239 let pending_clone = pending.clone();
241 let cancelled_clone = cancelled.clone();
242 let queued_clone = queued_user_messages.clone();
243 let resume_clone = resume_requested.clone();
244 let skills_reload_clone = skills_reload_requested.clone();
245 tokio::task::spawn_local(async move {
246 let stdin = tokio::io::stdin();
247 let reader = tokio::io::BufReader::new(stdin);
248 let mut lines = reader.lines();
249
250 while let Ok(Some(line)) = lines.next_line().await {
251 let line = line.trim().to_string();
252 if line.is_empty() {
253 continue;
254 }
255
256 let msg: serde_json::Value = match serde_json::from_str(&line) {
257 Ok(v) => v,
258 Err(_) => continue,
259 };
260
261 if msg.get("id").is_none() {
263 if let Some(method) = msg["method"].as_str() {
264 if method == "cancel" {
265 cancelled_clone.store(true, Ordering::SeqCst);
266 } else if method == "agent/resume" {
267 resume_clone.store(true, Ordering::SeqCst);
268 } else if method == "skills/update" {
269 skills_reload_clone.store(true, Ordering::SeqCst);
270 } else if method == "user_message"
271 || method == "session/input"
272 || method == "agent/user_message"
273 {
274 let params = &msg["params"];
275 let content = params
276 .get("content")
277 .and_then(|v| v.as_str())
278 .unwrap_or("")
279 .to_string();
280 if !content.is_empty() {
281 let mode = QueuedUserMessageMode::from_str(
282 params
283 .get("mode")
284 .and_then(|v| v.as_str())
285 .unwrap_or("wait_for_completion"),
286 );
287 queued_clone
288 .lock()
289 .await
290 .push_back(QueuedUserMessage { content, mode });
291 }
292 }
293 }
294 continue;
295 }
296
297 if let Some(id) = msg["id"].as_u64() {
298 let mut pending = pending_clone.lock().await;
299 if let Some(sender) = pending.remove(&id) {
300 let _ = sender.send(msg);
301 }
302 }
303 }
304
305 let mut pending = pending_clone.lock().await;
307 pending.clear();
308 });
309
310 Self {
311 next_id: AtomicU64::new(1),
312 pending,
313 cancelled,
314 stdout_lock: Arc::new(std::sync::Mutex::new(())),
315 session_id: std::sync::Mutex::new(String::new()),
316 script_name: std::sync::Mutex::new(String::new()),
317 queued_user_messages,
318 resume_requested,
319 skills_reload_requested,
320 daemon_idle,
321 visible_call_states: std::sync::Mutex::new(HashMap::new()),
322 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
323 in_process: None,
324 #[cfg(test)]
325 recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
326 }
327 }
328
329 pub fn from_parts(
335 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
336 cancelled: Arc<AtomicBool>,
337 stdout_lock: Arc<std::sync::Mutex<()>>,
338 start_id: u64,
339 ) -> Self {
340 Self {
341 next_id: AtomicU64::new(start_id),
342 pending,
343 cancelled,
344 stdout_lock,
345 session_id: std::sync::Mutex::new(String::new()),
346 script_name: std::sync::Mutex::new(String::new()),
347 queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
348 resume_requested: Arc::new(AtomicBool::new(false)),
349 skills_reload_requested: Arc::new(AtomicBool::new(false)),
350 daemon_idle: Arc::new(AtomicBool::new(false)),
351 visible_call_states: std::sync::Mutex::new(HashMap::new()),
352 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
353 in_process: None,
354 #[cfg(test)]
355 recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
356 }
357 }
358
359 pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
362 let exported_functions = vm.load_module_exports(module_path).await?;
363 Ok(Self {
364 next_id: AtomicU64::new(1),
365 pending: Arc::new(Mutex::new(HashMap::new())),
366 cancelled: Arc::new(AtomicBool::new(false)),
367 stdout_lock: Arc::new(std::sync::Mutex::new(())),
368 session_id: std::sync::Mutex::new(String::new()),
369 script_name: std::sync::Mutex::new(String::new()),
370 queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
371 resume_requested: Arc::new(AtomicBool::new(false)),
372 skills_reload_requested: Arc::new(AtomicBool::new(false)),
373 daemon_idle: Arc::new(AtomicBool::new(false)),
374 visible_call_states: std::sync::Mutex::new(HashMap::new()),
375 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
376 in_process: Some(InProcessHost {
377 module_path: module_path.to_path_buf(),
378 exported_functions,
379 vm,
380 }),
381 #[cfg(test)]
382 recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
383 })
384 }
385
386 pub fn set_session_id(&self, id: &str) {
388 *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
389 }
390
391 pub fn set_script_name(&self, name: &str) {
393 *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
394 }
395
396 fn get_script_name(&self) -> String {
398 self.script_name
399 .lock()
400 .unwrap_or_else(|e| e.into_inner())
401 .clone()
402 }
403
404 fn get_session_id(&self) -> String {
406 self.session_id
407 .lock()
408 .unwrap_or_else(|e| e.into_inner())
409 .clone()
410 }
411
412 fn write_line(&self, line: &str) -> Result<(), VmError> {
414 let _guard = self.stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
415 let mut stdout = std::io::stdout().lock();
416 stdout
417 .write_all(line.as_bytes())
418 .map_err(|e| VmError::Runtime(format!("Bridge write error: {e}")))?;
419 stdout
420 .write_all(b"\n")
421 .map_err(|e| VmError::Runtime(format!("Bridge write error: {e}")))?;
422 stdout
423 .flush()
424 .map_err(|e| VmError::Runtime(format!("Bridge flush error: {e}")))?;
425 Ok(())
426 }
427
428 pub async fn call(
431 &self,
432 method: &str,
433 params: serde_json::Value,
434 ) -> Result<serde_json::Value, VmError> {
435 if let Some(in_process) = &self.in_process {
436 return in_process.dispatch(method, params).await;
437 }
438
439 if self.is_cancelled() {
440 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
441 }
442
443 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
444
445 let request = crate::jsonrpc::request(id, method, params);
446
447 let (tx, rx) = oneshot::channel();
448 {
449 let mut pending = self.pending.lock().await;
450 pending.insert(id, tx);
451 }
452
453 let line = serde_json::to_string(&request)
454 .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
455 if let Err(e) = self.write_line(&line) {
456 let mut pending = self.pending.lock().await;
457 pending.remove(&id);
458 return Err(e);
459 }
460
461 let response = match tokio::time::timeout(DEFAULT_TIMEOUT, rx).await {
462 Ok(Ok(msg)) => msg,
463 Ok(Err(_)) => {
464 return Err(VmError::Runtime(
466 "Bridge: host closed connection before responding".into(),
467 ));
468 }
469 Err(_) => {
470 let mut pending = self.pending.lock().await;
471 pending.remove(&id);
472 return Err(VmError::Runtime(format!(
473 "Bridge: host did not respond to '{method}' within {}s",
474 DEFAULT_TIMEOUT.as_secs()
475 )));
476 }
477 };
478
479 if let Some(error) = response.get("error") {
480 let message = error["message"].as_str().unwrap_or("Unknown host error");
481 let code = error["code"].as_i64().unwrap_or(-1);
482 if code == -32001 {
484 return Err(VmError::CategorizedError {
485 message: message.to_string(),
486 category: ErrorCategory::ToolRejected,
487 });
488 }
489 return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
490 }
491
492 Ok(response["result"].clone())
493 }
494
495 pub fn notify(&self, method: &str, params: serde_json::Value) {
498 let notification = crate::jsonrpc::notification(method, params);
499 #[cfg(test)]
500 self.recorded_notifications
501 .lock()
502 .unwrap_or_else(|e| e.into_inner())
503 .push(notification.clone());
504 if self.in_process.is_some() {
505 return;
506 }
507 if let Ok(line) = serde_json::to_string(¬ification) {
508 let _ = self.write_line(&line);
509 }
510 }
511
512 #[cfg(test)]
513 pub(crate) fn recorded_notifications(&self) -> Vec<serde_json::Value> {
514 self.recorded_notifications
515 .lock()
516 .unwrap_or_else(|e| e.into_inner())
517 .clone()
518 }
519
520 pub fn is_cancelled(&self) -> bool {
522 self.cancelled.load(Ordering::SeqCst)
523 }
524
525 pub fn take_resume_signal(&self) -> bool {
526 self.resume_requested.swap(false, Ordering::SeqCst)
527 }
528
529 pub fn signal_resume(&self) {
530 self.resume_requested.store(true, Ordering::SeqCst);
531 }
532
533 pub fn set_daemon_idle(&self, idle: bool) {
534 self.daemon_idle.store(idle, Ordering::SeqCst);
535 }
536
537 pub fn is_daemon_idle(&self) -> bool {
538 self.daemon_idle.load(Ordering::SeqCst)
539 }
540
541 pub fn take_skills_reload_signal(&self) -> bool {
546 self.skills_reload_requested.swap(false, Ordering::SeqCst)
547 }
548
549 pub fn signal_skills_reload(&self) {
553 self.skills_reload_requested.store(true, Ordering::SeqCst);
554 }
555
556 pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
562 let result = self.call("skills/list", serde_json::json!({})).await?;
563 match result {
564 serde_json::Value::Array(items) => Ok(items),
565 serde_json::Value::Object(map) => match map.get("skills") {
566 Some(serde_json::Value::Array(items)) => Ok(items.clone()),
567 _ => Err(VmError::Runtime(
568 "skills/list: host response must be an array or { skills: [...] }".into(),
569 )),
570 },
571 _ => Err(VmError::Runtime(
572 "skills/list: unexpected response shape".into(),
573 )),
574 }
575 }
576
577 pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
583 let result = self.call("host/tools/list", serde_json::json!({})).await?;
584 parse_host_tools_list_response(result)
585 }
586
587 pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
591 self.call("skills/fetch", serde_json::json!({ "id": id }))
592 .await
593 }
594
595 pub async fn push_queued_user_message(&self, content: String, mode: &str) {
596 self.queued_user_messages
597 .lock()
598 .await
599 .push_back(QueuedUserMessage {
600 content,
601 mode: QueuedUserMessageMode::from_str(mode),
602 });
603 }
604
605 pub async fn take_queued_user_messages(
606 &self,
607 include_interrupt_immediate: bool,
608 include_finish_step: bool,
609 include_wait_for_completion: bool,
610 ) -> Vec<QueuedUserMessage> {
611 let mut queue = self.queued_user_messages.lock().await;
612 let mut selected = Vec::new();
613 let mut retained = VecDeque::new();
614 while let Some(message) = queue.pop_front() {
615 let should_take = match message.mode {
616 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
617 QueuedUserMessageMode::FinishStep => include_finish_step,
618 QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
619 };
620 if should_take {
621 selected.push(message);
622 } else {
623 retained.push_back(message);
624 }
625 }
626 *queue = retained;
627 selected
628 }
629
630 pub async fn take_queued_user_messages_for(
631 &self,
632 checkpoint: DeliveryCheckpoint,
633 ) -> Vec<QueuedUserMessage> {
634 match checkpoint {
635 DeliveryCheckpoint::InterruptImmediate => {
636 self.take_queued_user_messages(true, false, false).await
637 }
638 DeliveryCheckpoint::AfterCurrentOperation => {
639 self.take_queued_user_messages(false, true, false).await
640 }
641 DeliveryCheckpoint::EndOfInteraction => {
642 self.take_queued_user_messages(false, false, true).await
643 }
644 }
645 }
646
647 pub fn send_output(&self, text: &str) {
649 self.notify("output", serde_json::json!({"text": text}));
650 }
651
652 pub fn send_progress(
654 &self,
655 phase: &str,
656 message: &str,
657 progress: Option<i64>,
658 total: Option<i64>,
659 data: Option<serde_json::Value>,
660 ) {
661 let mut payload = serde_json::json!({"phase": phase, "message": message});
662 if let Some(p) = progress {
663 payload["progress"] = serde_json::json!(p);
664 }
665 if let Some(t) = total {
666 payload["total"] = serde_json::json!(t);
667 }
668 if let Some(d) = data {
669 payload["data"] = d;
670 }
671 self.notify("progress", payload);
672 }
673
674 pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
676 let mut payload = serde_json::json!({"level": level, "message": message});
677 if let Some(f) = fields {
678 payload["fields"] = f;
679 }
680 self.notify("log", payload);
681 }
682
683 pub fn send_call_start(
686 &self,
687 call_id: &str,
688 call_type: &str,
689 name: &str,
690 metadata: serde_json::Value,
691 ) {
692 let session_id = self.get_session_id();
693 let script = self.get_script_name();
694 let stream_publicly = metadata
695 .get("stream_publicly")
696 .and_then(|value| value.as_bool())
697 .unwrap_or(true);
698 self.visible_call_streams
699 .lock()
700 .unwrap_or_else(|e| e.into_inner())
701 .insert(call_id.to_string(), stream_publicly);
702 self.notify(
703 "session/update",
704 serde_json::json!({
705 "sessionId": session_id,
706 "update": {
707 "sessionUpdate": "call_start",
708 "content": {
709 "toolCallId": call_id,
710 "call_type": call_type,
711 "name": name,
712 "script": script,
713 "metadata": metadata,
714 },
715 },
716 }),
717 );
718 }
719
720 pub fn send_call_progress(
723 &self,
724 call_id: &str,
725 delta: &str,
726 accumulated_tokens: u64,
727 user_visible: bool,
728 ) {
729 let session_id = self.get_session_id();
730 let (visible_text, visible_delta) = {
731 let stream_publicly = self
732 .visible_call_streams
733 .lock()
734 .unwrap_or_else(|e| e.into_inner())
735 .get(call_id)
736 .copied()
737 .unwrap_or(true);
738 let mut states = self
739 .visible_call_states
740 .lock()
741 .unwrap_or_else(|e| e.into_inner());
742 let state = states.entry(call_id.to_string()).or_default();
743 state.push(delta, stream_publicly)
744 };
745 self.notify(
746 "session/update",
747 serde_json::json!({
748 "sessionId": session_id,
749 "update": {
750 "sessionUpdate": "call_progress",
751 "content": {
752 "toolCallId": call_id,
753 "delta": delta,
754 "accumulated_tokens": accumulated_tokens,
755 "visible_text": visible_text,
756 "visible_delta": visible_delta,
757 "user_visible": user_visible,
758 },
759 },
760 }),
761 );
762 }
763
764 pub fn send_call_end(
766 &self,
767 call_id: &str,
768 call_type: &str,
769 name: &str,
770 duration_ms: u64,
771 status: &str,
772 metadata: serde_json::Value,
773 ) {
774 let session_id = self.get_session_id();
775 let script = self.get_script_name();
776 self.visible_call_states
777 .lock()
778 .unwrap_or_else(|e| e.into_inner())
779 .remove(call_id);
780 self.visible_call_streams
781 .lock()
782 .unwrap_or_else(|e| e.into_inner())
783 .remove(call_id);
784 self.notify(
785 "session/update",
786 serde_json::json!({
787 "sessionId": session_id,
788 "update": {
789 "sessionUpdate": "call_end",
790 "content": {
791 "toolCallId": call_id,
792 "call_type": call_type,
793 "name": name,
794 "script": script,
795 "duration_ms": duration_ms,
796 "status": status,
797 "metadata": metadata,
798 },
799 },
800 }),
801 );
802 }
803
804 pub fn send_worker_update(
806 &self,
807 worker_id: &str,
808 worker_name: &str,
809 status: &str,
810 metadata: serde_json::Value,
811 audit: Option<&MutationSessionRecord>,
812 ) {
813 let session_id = self.get_session_id();
814 let script = self.get_script_name();
815 let started_at = metadata.get("started_at").cloned().unwrap_or_default();
816 let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
817 let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
818 let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
819 let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
820 let lifecycle = serde_json::json!({
821 "event": status,
822 "worker_id": worker_id,
823 "worker_name": worker_name,
824 "started_at": started_at,
825 "finished_at": finished_at,
826 });
827 self.notify(
828 "session/update",
829 serde_json::json!({
830 "sessionId": session_id,
831 "update": {
832 "sessionUpdate": "worker_update",
833 "content": {
834 "worker_id": worker_id,
835 "worker_name": worker_name,
836 "status": status,
837 "script": script,
838 "started_at": started_at,
839 "finished_at": finished_at,
840 "snapshot_path": snapshot_path,
841 "run_id": run_id,
842 "run_path": run_path,
843 "lifecycle": lifecycle,
844 "audit": audit,
845 "metadata": metadata,
846 },
847 },
848 }),
849 );
850 }
851}
852
853pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
855 crate::stdlib::json_to_vm_value(val)
856}
857
858fn parse_host_tools_list_response(
859 result: serde_json::Value,
860) -> Result<Vec<serde_json::Value>, VmError> {
861 let tools = match result {
862 serde_json::Value::Array(items) => items,
863 serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
864 map.get("result")
865 .and_then(|value| value.get("tools"))
866 .cloned()
867 }) {
868 Some(serde_json::Value::Array(items)) => items,
869 _ => {
870 return Err(VmError::Runtime(
871 "host/tools/list: host response must be an array or { tools: [...] }".into(),
872 ));
873 }
874 },
875 _ => {
876 return Err(VmError::Runtime(
877 "host/tools/list: unexpected response shape".into(),
878 ));
879 }
880 };
881
882 let mut normalized = Vec::with_capacity(tools.len());
883 for tool in tools {
884 let serde_json::Value::Object(map) = tool else {
885 return Err(VmError::Runtime(
886 "host/tools/list: every tool must be an object".into(),
887 ));
888 };
889 let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
890 return Err(VmError::Runtime(
891 "host/tools/list: every tool must include a string `name`".into(),
892 ));
893 };
894 let description = map
895 .get("description")
896 .and_then(|value| value.as_str())
897 .or_else(|| {
898 map.get("short_description")
899 .and_then(|value| value.as_str())
900 })
901 .unwrap_or_default();
902 let schema = map
903 .get("schema")
904 .cloned()
905 .or_else(|| map.get("parameters").cloned())
906 .or_else(|| map.get("input_schema").cloned())
907 .unwrap_or(serde_json::Value::Null);
908 let deprecated = map
909 .get("deprecated")
910 .and_then(|value| value.as_bool())
911 .unwrap_or(false);
912 normalized.push(serde_json::json!({
913 "name": name,
914 "description": description,
915 "schema": schema,
916 "deprecated": deprecated,
917 }));
918 }
919 Ok(normalized)
920}
921
922#[cfg(test)]
923mod tests {
924 use super::*;
925
926 #[test]
927 fn test_json_rpc_request_format() {
928 let request = crate::jsonrpc::request(
929 1,
930 "llm_call",
931 serde_json::json!({
932 "prompt": "Hello",
933 "system": "Be helpful",
934 }),
935 );
936 let s = serde_json::to_string(&request).unwrap();
937 assert!(s.contains("\"jsonrpc\":\"2.0\""));
938 assert!(s.contains("\"id\":1"));
939 assert!(s.contains("\"method\":\"llm_call\""));
940 }
941
942 #[test]
943 fn test_json_rpc_notification_format() {
944 let notification =
945 crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
946 let s = serde_json::to_string(¬ification).unwrap();
947 assert!(s.contains("\"method\":\"output\""));
948 assert!(!s.contains("\"id\""));
949 }
950
951 #[test]
952 fn test_json_rpc_error_response_parsing() {
953 let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
954 assert!(response.get("error").is_some());
955 assert_eq!(
956 response["error"]["message"].as_str().unwrap(),
957 "Invalid request"
958 );
959 }
960
961 #[test]
962 fn test_json_rpc_success_response_parsing() {
963 let response = crate::jsonrpc::response(
964 1,
965 serde_json::json!({
966 "text": "Hello world",
967 "input_tokens": 10,
968 "output_tokens": 5,
969 }),
970 );
971 assert!(response.get("result").is_some());
972 assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
973 }
974
975 #[test]
976 fn test_cancelled_flag() {
977 let cancelled = Arc::new(AtomicBool::new(false));
978 assert!(!cancelled.load(Ordering::SeqCst));
979 cancelled.store(true, Ordering::SeqCst);
980 assert!(cancelled.load(Ordering::SeqCst));
981 }
982
983 #[test]
984 fn queued_messages_are_filtered_by_delivery_mode() {
985 let runtime = tokio::runtime::Builder::new_current_thread()
986 .enable_all()
987 .build()
988 .unwrap();
989 runtime.block_on(async {
990 let bridge = HostBridge::from_parts(
991 Arc::new(Mutex::new(HashMap::new())),
992 Arc::new(AtomicBool::new(false)),
993 Arc::new(std::sync::Mutex::new(())),
994 1,
995 );
996 bridge
997 .push_queued_user_message("first".to_string(), "finish_step")
998 .await;
999 bridge
1000 .push_queued_user_message("second".to_string(), "wait_for_completion")
1001 .await;
1002
1003 let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1004 assert_eq!(finish_step.len(), 1);
1005 assert_eq!(finish_step[0].content, "first");
1006
1007 let turn_end = bridge.take_queued_user_messages(false, false, true).await;
1008 assert_eq!(turn_end.len(), 1);
1009 assert_eq!(turn_end[0].content, "second");
1010 });
1011 }
1012
1013 #[test]
1014 fn test_json_result_to_vm_value_string() {
1015 let val = serde_json::json!("hello");
1016 let vm_val = json_result_to_vm_value(&val);
1017 assert_eq!(vm_val.display(), "hello");
1018 }
1019
1020 #[test]
1021 fn test_json_result_to_vm_value_dict() {
1022 let val = serde_json::json!({"name": "test", "count": 42});
1023 let vm_val = json_result_to_vm_value(&val);
1024 let VmValue::Dict(d) = &vm_val else {
1025 unreachable!("Expected Dict, got {:?}", vm_val);
1026 };
1027 assert_eq!(d.get("name").unwrap().display(), "test");
1028 assert_eq!(d.get("count").unwrap().display(), "42");
1029 }
1030
1031 #[test]
1032 fn test_json_result_to_vm_value_null() {
1033 let val = serde_json::json!(null);
1034 let vm_val = json_result_to_vm_value(&val);
1035 assert!(matches!(vm_val, VmValue::Nil));
1036 }
1037
1038 #[test]
1039 fn test_json_result_to_vm_value_nested() {
1040 let val = serde_json::json!({
1041 "text": "response",
1042 "tool_calls": [
1043 {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
1044 ],
1045 "input_tokens": 100,
1046 "output_tokens": 50,
1047 });
1048 let vm_val = json_result_to_vm_value(&val);
1049 let VmValue::Dict(d) = &vm_val else {
1050 unreachable!("Expected Dict, got {:?}", vm_val);
1051 };
1052 assert_eq!(d.get("text").unwrap().display(), "response");
1053 let VmValue::List(list) = d.get("tool_calls").unwrap() else {
1054 unreachable!("Expected List for tool_calls");
1055 };
1056 assert_eq!(list.len(), 1);
1057 }
1058
1059 #[test]
1060 fn parse_host_tools_list_accepts_object_wrapper() {
1061 let tools = parse_host_tools_list_response(serde_json::json!({
1062 "tools": [
1063 {
1064 "name": "Read",
1065 "description": "Read a file",
1066 "schema": {"type": "object"},
1067 }
1068 ]
1069 }))
1070 .expect("tool list");
1071
1072 assert_eq!(tools.len(), 1);
1073 assert_eq!(tools[0]["name"], "Read");
1074 assert_eq!(tools[0]["deprecated"], false);
1075 }
1076
1077 #[test]
1078 fn parse_host_tools_list_accepts_compat_fields() {
1079 let tools = parse_host_tools_list_response(serde_json::json!({
1080 "result": {
1081 "tools": [
1082 {
1083 "name": "Edit",
1084 "short_description": "Apply an edit",
1085 "input_schema": {"type": "object"},
1086 "deprecated": true,
1087 }
1088 ]
1089 }
1090 }))
1091 .expect("tool list");
1092
1093 assert_eq!(tools[0]["description"], "Apply an edit");
1094 assert_eq!(tools[0]["schema"]["type"], "object");
1095 assert_eq!(tools[0]["deprecated"], true);
1096 }
1097
1098 #[test]
1099 fn parse_host_tools_list_requires_tool_names() {
1100 let err = parse_host_tools_list_response(serde_json::json!({
1101 "tools": [
1102 {"description": "missing name"}
1103 ]
1104 }))
1105 .expect_err("expected error");
1106 assert!(err
1107 .to_string()
1108 .contains("host/tools/list: every tool must include a string `name`"));
1109 }
1110
1111 #[test]
1112 fn test_timeout_duration() {
1113 assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
1114 }
1115}