1use std::collections::{BTreeMap, HashMap, VecDeque};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::io::AsyncBufReadExt;
16use tokio::sync::{oneshot, Mutex};
17
18use crate::orchestration::MutationSessionRecord;
19use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
20use crate::visible_text::VisibleTextState;
21use crate::vm::Vm;
22
23const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
25
26pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
27
28fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
29 Arc::new(move |line: &str| {
30 let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
31 let mut stdout = std::io::stdout().lock();
32 stdout
33 .write_all(line.as_bytes())
34 .map_err(|e| format!("Bridge write error: {e}"))?;
35 stdout
36 .write_all(b"\n")
37 .map_err(|e| format!("Bridge write error: {e}"))?;
38 stdout
39 .flush()
40 .map_err(|e| format!("Bridge flush error: {e}"))?;
41 Ok(())
42 })
43}
44
45pub struct HostBridge {
52 next_id: AtomicU64,
53 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
55 cancelled: Arc<AtomicBool>,
57 writer: HostBridgeWriter,
59 session_id: std::sync::Mutex<String>,
61 script_name: std::sync::Mutex<String>,
63 queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>>,
65 resume_requested: Arc<AtomicBool>,
67 skills_reload_requested: Arc<AtomicBool>,
72 daemon_idle: Arc<AtomicBool>,
74 visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
76 visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
78 in_process: Option<InProcessHost>,
80 #[cfg(test)]
81 recorded_notifications: Arc<std::sync::Mutex<Vec<serde_json::Value>>>,
82}
83
84struct InProcessHost {
85 module_path: PathBuf,
86 exported_functions: BTreeMap<String, Rc<VmClosure>>,
87 vm: Vm,
88}
89
90impl InProcessHost {
91 async fn dispatch(
92 &self,
93 method: &str,
94 params: serde_json::Value,
95 ) -> Result<serde_json::Value, VmError> {
96 match method {
97 "builtin_call" => {
98 let name = params
99 .get("name")
100 .and_then(|value| value.as_str())
101 .unwrap_or_default();
102 let args = params
103 .get("args")
104 .and_then(|value| value.as_array())
105 .cloned()
106 .unwrap_or_default()
107 .into_iter()
108 .map(|value| json_result_to_vm_value(&value))
109 .collect::<Vec<_>>();
110 self.invoke_export(name, &args).await
111 }
112 "host/tools/list" => self
113 .invoke_optional_export("host_tools_list", &[])
114 .await
115 .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
116 "session/request_permission" => self.request_permission(params).await,
117 other => Err(VmError::Runtime(format!(
118 "playground host backend does not implement bridge method '{other}'"
119 ))),
120 }
121 }
122
123 async fn invoke_export(
124 &self,
125 name: &str,
126 args: &[VmValue],
127 ) -> Result<serde_json::Value, VmError> {
128 let Some(closure) = self.exported_functions.get(name) else {
129 return Err(VmError::Runtime(format!(
130 "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
131 self.module_path.display()
132 )));
133 };
134
135 let mut vm = self.vm.child_vm_for_host();
136 let result = vm.call_closure_pub(closure, args).await?;
137 Ok(crate::llm::vm_value_to_json(&result))
138 }
139
140 async fn invoke_optional_export(
141 &self,
142 name: &str,
143 args: &[VmValue],
144 ) -> Result<Option<serde_json::Value>, VmError> {
145 if !self.exported_functions.contains_key(name) {
146 return Ok(None);
147 }
148 self.invoke_export(name, args).await.map(Some)
149 }
150
151 async fn request_permission(
152 &self,
153 params: serde_json::Value,
154 ) -> Result<serde_json::Value, VmError> {
155 let Some(closure) = self.exported_functions.get("request_permission") else {
156 return Ok(serde_json::json!({ "granted": true }));
157 };
158
159 let tool_name = params
160 .get("toolCall")
161 .and_then(|tool_call| tool_call.get("toolName"))
162 .and_then(|value| value.as_str())
163 .unwrap_or_default();
164 let tool_args = params
165 .get("toolCall")
166 .and_then(|tool_call| tool_call.get("rawInput"))
167 .map(json_result_to_vm_value)
168 .unwrap_or(VmValue::Nil);
169 let full_payload = json_result_to_vm_value(¶ms);
170
171 let arg_count = closure.func.params.len();
172 let args = if arg_count >= 3 {
173 vec![
174 VmValue::String(Rc::from(tool_name.to_string())),
175 tool_args,
176 full_payload,
177 ]
178 } else if arg_count == 2 {
179 vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
180 } else if arg_count == 1 {
181 vec![full_payload]
182 } else {
183 Vec::new()
184 };
185
186 let mut vm = self.vm.child_vm_for_host();
187 let result = vm.call_closure_pub(closure, &args).await?;
188 let payload = match result {
189 VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
190 VmValue::String(reason) if !reason.is_empty() => {
191 serde_json::json!({ "granted": false, "reason": reason.to_string() })
192 }
193 other => {
194 let json = crate::llm::vm_value_to_json(&other);
195 if json
196 .get("granted")
197 .and_then(|value| value.as_bool())
198 .is_some()
199 || json.get("outcome").is_some()
200 {
201 json
202 } else {
203 serde_json::json!({ "granted": other.is_truthy() })
204 }
205 }
206 };
207 Ok(payload)
208 }
209}
210
211#[derive(Clone, Debug, PartialEq, Eq)]
212pub enum QueuedUserMessageMode {
213 InterruptImmediate,
214 FinishStep,
215 WaitForCompletion,
216}
217
218#[derive(Clone, Copy, Debug, PartialEq, Eq)]
219pub enum DeliveryCheckpoint {
220 InterruptImmediate,
221 AfterCurrentOperation,
222 EndOfInteraction,
223}
224
225impl QueuedUserMessageMode {
226 fn from_str(value: &str) -> Self {
227 match value {
228 "interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
229 "finish_step" | "after_current_operation" => Self::FinishStep,
230 _ => Self::WaitForCompletion,
231 }
232 }
233}
234
235#[derive(Clone, Debug, PartialEq, Eq)]
236pub struct QueuedUserMessage {
237 pub content: String,
238 pub mode: QueuedUserMessageMode,
239}
240
241#[allow(clippy::new_without_default)]
243impl HostBridge {
244 pub fn new() -> Self {
249 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
250 Arc::new(Mutex::new(HashMap::new()));
251 let cancelled = Arc::new(AtomicBool::new(false));
252 let queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>> =
253 Arc::new(Mutex::new(VecDeque::new()));
254 let resume_requested = Arc::new(AtomicBool::new(false));
255 let skills_reload_requested = Arc::new(AtomicBool::new(false));
256 let daemon_idle = Arc::new(AtomicBool::new(false));
257
258 let pending_clone = pending.clone();
260 let cancelled_clone = cancelled.clone();
261 let queued_clone = queued_user_messages.clone();
262 let resume_clone = resume_requested.clone();
263 let skills_reload_clone = skills_reload_requested.clone();
264 tokio::task::spawn_local(async move {
265 let stdin = tokio::io::stdin();
266 let reader = tokio::io::BufReader::new(stdin);
267 let mut lines = reader.lines();
268
269 while let Ok(Some(line)) = lines.next_line().await {
270 let line = line.trim().to_string();
271 if line.is_empty() {
272 continue;
273 }
274
275 let msg: serde_json::Value = match serde_json::from_str(&line) {
276 Ok(v) => v,
277 Err(_) => continue,
278 };
279
280 if msg.get("id").is_none() {
282 if let Some(method) = msg["method"].as_str() {
283 if method == "cancel" {
284 cancelled_clone.store(true, Ordering::SeqCst);
285 } else if method == "agent/resume" {
286 resume_clone.store(true, Ordering::SeqCst);
287 } else if method == "skills/update" {
288 skills_reload_clone.store(true, Ordering::SeqCst);
289 } else if method == "user_message"
290 || method == "session/input"
291 || method == "agent/user_message"
292 {
293 let params = &msg["params"];
294 let content = params
295 .get("content")
296 .and_then(|v| v.as_str())
297 .unwrap_or("")
298 .to_string();
299 if !content.is_empty() {
300 let mode = QueuedUserMessageMode::from_str(
301 params
302 .get("mode")
303 .and_then(|v| v.as_str())
304 .unwrap_or("wait_for_completion"),
305 );
306 queued_clone
307 .lock()
308 .await
309 .push_back(QueuedUserMessage { content, mode });
310 }
311 }
312 }
313 continue;
314 }
315
316 if let Some(id) = msg["id"].as_u64() {
317 let mut pending = pending_clone.lock().await;
318 if let Some(sender) = pending.remove(&id) {
319 let _ = sender.send(msg);
320 }
321 }
322 }
323
324 let mut pending = pending_clone.lock().await;
326 pending.clear();
327 });
328
329 Self {
330 next_id: AtomicU64::new(1),
331 pending,
332 cancelled,
333 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
334 session_id: std::sync::Mutex::new(String::new()),
335 script_name: std::sync::Mutex::new(String::new()),
336 queued_user_messages,
337 resume_requested,
338 skills_reload_requested,
339 daemon_idle,
340 visible_call_states: std::sync::Mutex::new(HashMap::new()),
341 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
342 in_process: None,
343 #[cfg(test)]
344 recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
345 }
346 }
347
348 pub fn from_parts(
354 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
355 cancelled: Arc<AtomicBool>,
356 stdout_lock: Arc<std::sync::Mutex<()>>,
357 start_id: u64,
358 ) -> Self {
359 Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
360 }
361
362 pub fn from_parts_with_writer(
363 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
364 cancelled: Arc<AtomicBool>,
365 writer: HostBridgeWriter,
366 start_id: u64,
367 ) -> Self {
368 Self {
369 next_id: AtomicU64::new(start_id),
370 pending,
371 cancelled,
372 writer,
373 session_id: std::sync::Mutex::new(String::new()),
374 script_name: std::sync::Mutex::new(String::new()),
375 queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
376 resume_requested: Arc::new(AtomicBool::new(false)),
377 skills_reload_requested: Arc::new(AtomicBool::new(false)),
378 daemon_idle: Arc::new(AtomicBool::new(false)),
379 visible_call_states: std::sync::Mutex::new(HashMap::new()),
380 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
381 in_process: None,
382 #[cfg(test)]
383 recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
384 }
385 }
386
387 pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
390 let exported_functions = vm.load_module_exports(module_path).await?;
391 Ok(Self {
392 next_id: AtomicU64::new(1),
393 pending: Arc::new(Mutex::new(HashMap::new())),
394 cancelled: Arc::new(AtomicBool::new(false)),
395 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
396 session_id: std::sync::Mutex::new(String::new()),
397 script_name: std::sync::Mutex::new(String::new()),
398 queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
399 resume_requested: Arc::new(AtomicBool::new(false)),
400 skills_reload_requested: Arc::new(AtomicBool::new(false)),
401 daemon_idle: Arc::new(AtomicBool::new(false)),
402 visible_call_states: std::sync::Mutex::new(HashMap::new()),
403 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
404 in_process: Some(InProcessHost {
405 module_path: module_path.to_path_buf(),
406 exported_functions,
407 vm,
408 }),
409 #[cfg(test)]
410 recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
411 })
412 }
413
414 pub fn set_session_id(&self, id: &str) {
416 *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
417 }
418
419 pub fn set_script_name(&self, name: &str) {
421 *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
422 }
423
424 fn get_script_name(&self) -> String {
426 self.script_name
427 .lock()
428 .unwrap_or_else(|e| e.into_inner())
429 .clone()
430 }
431
432 pub fn get_session_id(&self) -> String {
434 self.session_id
435 .lock()
436 .unwrap_or_else(|e| e.into_inner())
437 .clone()
438 }
439
440 fn write_line(&self, line: &str) -> Result<(), VmError> {
442 (self.writer)(line).map_err(VmError::Runtime)
443 }
444
445 pub async fn call(
448 &self,
449 method: &str,
450 params: serde_json::Value,
451 ) -> Result<serde_json::Value, VmError> {
452 if let Some(in_process) = &self.in_process {
453 return in_process.dispatch(method, params).await;
454 }
455
456 if self.is_cancelled() {
457 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
458 }
459
460 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
461
462 let request = crate::jsonrpc::request(id, method, params);
463
464 let (tx, rx) = oneshot::channel();
465 {
466 let mut pending = self.pending.lock().await;
467 pending.insert(id, tx);
468 }
469
470 let line = serde_json::to_string(&request)
471 .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
472 if let Err(e) = self.write_line(&line) {
473 let mut pending = self.pending.lock().await;
474 pending.remove(&id);
475 return Err(e);
476 }
477
478 let response = match tokio::time::timeout(DEFAULT_TIMEOUT, rx).await {
479 Ok(Ok(msg)) => msg,
480 Ok(Err(_)) => {
481 return Err(VmError::Runtime(
483 "Bridge: host closed connection before responding".into(),
484 ));
485 }
486 Err(_) => {
487 let mut pending = self.pending.lock().await;
488 pending.remove(&id);
489 return Err(VmError::Runtime(format!(
490 "Bridge: host did not respond to '{method}' within {}s",
491 DEFAULT_TIMEOUT.as_secs()
492 )));
493 }
494 };
495
496 if let Some(error) = response.get("error") {
497 let message = error["message"].as_str().unwrap_or("Unknown host error");
498 let code = error["code"].as_i64().unwrap_or(-1);
499 if code == -32001 {
501 return Err(VmError::CategorizedError {
502 message: message.to_string(),
503 category: ErrorCategory::ToolRejected,
504 });
505 }
506 return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
507 }
508
509 Ok(response["result"].clone())
510 }
511
512 pub fn notify(&self, method: &str, params: serde_json::Value) {
515 let notification = crate::jsonrpc::notification(method, params);
516 #[cfg(test)]
517 self.recorded_notifications
518 .lock()
519 .unwrap_or_else(|e| e.into_inner())
520 .push(notification.clone());
521 if self.in_process.is_some() {
522 return;
523 }
524 if let Ok(line) = serde_json::to_string(¬ification) {
525 let _ = self.write_line(&line);
526 }
527 }
528
529 #[cfg(test)]
530 pub(crate) fn recorded_notifications(&self) -> Vec<serde_json::Value> {
531 self.recorded_notifications
532 .lock()
533 .unwrap_or_else(|e| e.into_inner())
534 .clone()
535 }
536
537 pub fn is_cancelled(&self) -> bool {
539 self.cancelled.load(Ordering::SeqCst)
540 }
541
542 pub fn take_resume_signal(&self) -> bool {
543 self.resume_requested.swap(false, Ordering::SeqCst)
544 }
545
546 pub fn signal_resume(&self) {
547 self.resume_requested.store(true, Ordering::SeqCst);
548 }
549
550 pub fn set_daemon_idle(&self, idle: bool) {
551 self.daemon_idle.store(idle, Ordering::SeqCst);
552 }
553
554 pub fn is_daemon_idle(&self) -> bool {
555 self.daemon_idle.load(Ordering::SeqCst)
556 }
557
558 pub fn take_skills_reload_signal(&self) -> bool {
563 self.skills_reload_requested.swap(false, Ordering::SeqCst)
564 }
565
566 pub fn signal_skills_reload(&self) {
570 self.skills_reload_requested.store(true, Ordering::SeqCst);
571 }
572
573 pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
579 let result = self.call("skills/list", serde_json::json!({})).await?;
580 match result {
581 serde_json::Value::Array(items) => Ok(items),
582 serde_json::Value::Object(map) => match map.get("skills") {
583 Some(serde_json::Value::Array(items)) => Ok(items.clone()),
584 _ => Err(VmError::Runtime(
585 "skills/list: host response must be an array or { skills: [...] }".into(),
586 )),
587 },
588 _ => Err(VmError::Runtime(
589 "skills/list: unexpected response shape".into(),
590 )),
591 }
592 }
593
594 pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
600 let result = self.call("host/tools/list", serde_json::json!({})).await?;
601 parse_host_tools_list_response(result)
602 }
603
604 pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
608 self.call("skills/fetch", serde_json::json!({ "id": id }))
609 .await
610 }
611
612 pub async fn push_queued_user_message(&self, content: String, mode: &str) {
613 self.queued_user_messages
614 .lock()
615 .await
616 .push_back(QueuedUserMessage {
617 content,
618 mode: QueuedUserMessageMode::from_str(mode),
619 });
620 }
621
622 pub async fn take_queued_user_messages(
623 &self,
624 include_interrupt_immediate: bool,
625 include_finish_step: bool,
626 include_wait_for_completion: bool,
627 ) -> Vec<QueuedUserMessage> {
628 let mut queue = self.queued_user_messages.lock().await;
629 let mut selected = Vec::new();
630 let mut retained = VecDeque::new();
631 while let Some(message) = queue.pop_front() {
632 let should_take = match message.mode {
633 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
634 QueuedUserMessageMode::FinishStep => include_finish_step,
635 QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
636 };
637 if should_take {
638 selected.push(message);
639 } else {
640 retained.push_back(message);
641 }
642 }
643 *queue = retained;
644 selected
645 }
646
647 pub async fn take_queued_user_messages_for(
648 &self,
649 checkpoint: DeliveryCheckpoint,
650 ) -> Vec<QueuedUserMessage> {
651 match checkpoint {
652 DeliveryCheckpoint::InterruptImmediate => {
653 self.take_queued_user_messages(true, false, false).await
654 }
655 DeliveryCheckpoint::AfterCurrentOperation => {
656 self.take_queued_user_messages(false, true, false).await
657 }
658 DeliveryCheckpoint::EndOfInteraction => {
659 self.take_queued_user_messages(false, false, true).await
660 }
661 }
662 }
663
664 pub fn send_output(&self, text: &str) {
666 self.notify("output", serde_json::json!({"text": text}));
667 }
668
669 pub fn send_progress(
671 &self,
672 phase: &str,
673 message: &str,
674 progress: Option<i64>,
675 total: Option<i64>,
676 data: Option<serde_json::Value>,
677 ) {
678 let mut payload = serde_json::json!({"phase": phase, "message": message});
679 if let Some(p) = progress {
680 payload["progress"] = serde_json::json!(p);
681 }
682 if let Some(t) = total {
683 payload["total"] = serde_json::json!(t);
684 }
685 if let Some(d) = data {
686 payload["data"] = d;
687 }
688 self.notify("progress", payload);
689 }
690
691 pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
693 let mut payload = serde_json::json!({"level": level, "message": message});
694 if let Some(f) = fields {
695 payload["fields"] = f;
696 }
697 self.notify("log", payload);
698 }
699
700 pub fn send_call_start(
703 &self,
704 call_id: &str,
705 call_type: &str,
706 name: &str,
707 metadata: serde_json::Value,
708 ) {
709 let session_id = self.get_session_id();
710 let script = self.get_script_name();
711 let stream_publicly = metadata
712 .get("stream_publicly")
713 .and_then(|value| value.as_bool())
714 .unwrap_or(true);
715 self.visible_call_streams
716 .lock()
717 .unwrap_or_else(|e| e.into_inner())
718 .insert(call_id.to_string(), stream_publicly);
719 self.notify(
720 "session/update",
721 serde_json::json!({
722 "sessionId": session_id,
723 "update": {
724 "sessionUpdate": "call_start",
725 "content": {
726 "toolCallId": call_id,
727 "call_type": call_type,
728 "name": name,
729 "script": script,
730 "metadata": metadata,
731 },
732 },
733 }),
734 );
735 }
736
737 pub fn send_call_progress(
740 &self,
741 call_id: &str,
742 delta: &str,
743 accumulated_tokens: u64,
744 user_visible: bool,
745 ) {
746 let session_id = self.get_session_id();
747 let (visible_text, visible_delta) = {
748 let stream_publicly = self
749 .visible_call_streams
750 .lock()
751 .unwrap_or_else(|e| e.into_inner())
752 .get(call_id)
753 .copied()
754 .unwrap_or(true);
755 let mut states = self
756 .visible_call_states
757 .lock()
758 .unwrap_or_else(|e| e.into_inner());
759 let state = states.entry(call_id.to_string()).or_default();
760 state.push(delta, stream_publicly)
761 };
762 self.notify(
763 "session/update",
764 serde_json::json!({
765 "sessionId": session_id,
766 "update": {
767 "sessionUpdate": "call_progress",
768 "content": {
769 "toolCallId": call_id,
770 "delta": delta,
771 "accumulated_tokens": accumulated_tokens,
772 "visible_text": visible_text,
773 "visible_delta": visible_delta,
774 "user_visible": user_visible,
775 },
776 },
777 }),
778 );
779 }
780
781 pub fn send_call_end(
783 &self,
784 call_id: &str,
785 call_type: &str,
786 name: &str,
787 duration_ms: u64,
788 status: &str,
789 metadata: serde_json::Value,
790 ) {
791 let session_id = self.get_session_id();
792 let script = self.get_script_name();
793 self.visible_call_states
794 .lock()
795 .unwrap_or_else(|e| e.into_inner())
796 .remove(call_id);
797 self.visible_call_streams
798 .lock()
799 .unwrap_or_else(|e| e.into_inner())
800 .remove(call_id);
801 self.notify(
802 "session/update",
803 serde_json::json!({
804 "sessionId": session_id,
805 "update": {
806 "sessionUpdate": "call_end",
807 "content": {
808 "toolCallId": call_id,
809 "call_type": call_type,
810 "name": name,
811 "script": script,
812 "duration_ms": duration_ms,
813 "status": status,
814 "metadata": metadata,
815 },
816 },
817 }),
818 );
819 }
820
821 pub fn send_worker_update(
823 &self,
824 worker_id: &str,
825 worker_name: &str,
826 status: &str,
827 metadata: serde_json::Value,
828 audit: Option<&MutationSessionRecord>,
829 ) {
830 let session_id = self.get_session_id();
831 let script = self.get_script_name();
832 let started_at = metadata.get("started_at").cloned().unwrap_or_default();
833 let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
834 let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
835 let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
836 let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
837 let lifecycle = serde_json::json!({
838 "event": status,
839 "worker_id": worker_id,
840 "worker_name": worker_name,
841 "started_at": started_at,
842 "finished_at": finished_at,
843 });
844 self.notify(
845 "session/update",
846 serde_json::json!({
847 "sessionId": session_id,
848 "update": {
849 "sessionUpdate": "worker_update",
850 "content": {
851 "worker_id": worker_id,
852 "worker_name": worker_name,
853 "status": status,
854 "script": script,
855 "started_at": started_at,
856 "finished_at": finished_at,
857 "snapshot_path": snapshot_path,
858 "run_id": run_id,
859 "run_path": run_path,
860 "lifecycle": lifecycle,
861 "audit": audit,
862 "metadata": metadata,
863 },
864 },
865 }),
866 );
867 }
868}
869
870pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
872 crate::stdlib::json_to_vm_value(val)
873}
874
875fn parse_host_tools_list_response(
876 result: serde_json::Value,
877) -> Result<Vec<serde_json::Value>, VmError> {
878 let tools = match result {
879 serde_json::Value::Array(items) => items,
880 serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
881 map.get("result")
882 .and_then(|value| value.get("tools"))
883 .cloned()
884 }) {
885 Some(serde_json::Value::Array(items)) => items,
886 _ => {
887 return Err(VmError::Runtime(
888 "host/tools/list: host response must be an array or { tools: [...] }".into(),
889 ));
890 }
891 },
892 _ => {
893 return Err(VmError::Runtime(
894 "host/tools/list: unexpected response shape".into(),
895 ));
896 }
897 };
898
899 let mut normalized = Vec::with_capacity(tools.len());
900 for tool in tools {
901 let serde_json::Value::Object(map) = tool else {
902 return Err(VmError::Runtime(
903 "host/tools/list: every tool must be an object".into(),
904 ));
905 };
906 let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
907 return Err(VmError::Runtime(
908 "host/tools/list: every tool must include a string `name`".into(),
909 ));
910 };
911 let description = map
912 .get("description")
913 .and_then(|value| value.as_str())
914 .or_else(|| {
915 map.get("short_description")
916 .and_then(|value| value.as_str())
917 })
918 .unwrap_or_default();
919 let schema = map
920 .get("schema")
921 .cloned()
922 .or_else(|| map.get("parameters").cloned())
923 .or_else(|| map.get("input_schema").cloned())
924 .unwrap_or(serde_json::Value::Null);
925 let deprecated = map
926 .get("deprecated")
927 .and_then(|value| value.as_bool())
928 .unwrap_or(false);
929 normalized.push(serde_json::json!({
930 "name": name,
931 "description": description,
932 "schema": schema,
933 "deprecated": deprecated,
934 }));
935 }
936 Ok(normalized)
937}
938
939#[cfg(test)]
940mod tests {
941 use super::*;
942
943 #[test]
944 fn test_json_rpc_request_format() {
945 let request = crate::jsonrpc::request(
946 1,
947 "llm_call",
948 serde_json::json!({
949 "prompt": "Hello",
950 "system": "Be helpful",
951 }),
952 );
953 let s = serde_json::to_string(&request).unwrap();
954 assert!(s.contains("\"jsonrpc\":\"2.0\""));
955 assert!(s.contains("\"id\":1"));
956 assert!(s.contains("\"method\":\"llm_call\""));
957 }
958
959 #[test]
960 fn test_json_rpc_notification_format() {
961 let notification =
962 crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
963 let s = serde_json::to_string(¬ification).unwrap();
964 assert!(s.contains("\"method\":\"output\""));
965 assert!(!s.contains("\"id\""));
966 }
967
968 #[test]
969 fn test_json_rpc_error_response_parsing() {
970 let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
971 assert!(response.get("error").is_some());
972 assert_eq!(
973 response["error"]["message"].as_str().unwrap(),
974 "Invalid request"
975 );
976 }
977
978 #[test]
979 fn test_json_rpc_success_response_parsing() {
980 let response = crate::jsonrpc::response(
981 1,
982 serde_json::json!({
983 "text": "Hello world",
984 "input_tokens": 10,
985 "output_tokens": 5,
986 }),
987 );
988 assert!(response.get("result").is_some());
989 assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
990 }
991
992 #[test]
993 fn test_cancelled_flag() {
994 let cancelled = Arc::new(AtomicBool::new(false));
995 assert!(!cancelled.load(Ordering::SeqCst));
996 cancelled.store(true, Ordering::SeqCst);
997 assert!(cancelled.load(Ordering::SeqCst));
998 }
999
1000 #[test]
1001 fn queued_messages_are_filtered_by_delivery_mode() {
1002 let runtime = tokio::runtime::Builder::new_current_thread()
1003 .enable_all()
1004 .build()
1005 .unwrap();
1006 runtime.block_on(async {
1007 let bridge = HostBridge::from_parts(
1008 Arc::new(Mutex::new(HashMap::new())),
1009 Arc::new(AtomicBool::new(false)),
1010 Arc::new(std::sync::Mutex::new(())),
1011 1,
1012 );
1013 bridge
1014 .push_queued_user_message("first".to_string(), "finish_step")
1015 .await;
1016 bridge
1017 .push_queued_user_message("second".to_string(), "wait_for_completion")
1018 .await;
1019
1020 let finish_step = bridge.take_queued_user_messages(false, true, false).await;
1021 assert_eq!(finish_step.len(), 1);
1022 assert_eq!(finish_step[0].content, "first");
1023
1024 let turn_end = bridge.take_queued_user_messages(false, false, true).await;
1025 assert_eq!(turn_end.len(), 1);
1026 assert_eq!(turn_end[0].content, "second");
1027 });
1028 }
1029
1030 #[test]
1031 fn test_json_result_to_vm_value_string() {
1032 let val = serde_json::json!("hello");
1033 let vm_val = json_result_to_vm_value(&val);
1034 assert_eq!(vm_val.display(), "hello");
1035 }
1036
1037 #[test]
1038 fn test_json_result_to_vm_value_dict() {
1039 let val = serde_json::json!({"name": "test", "count": 42});
1040 let vm_val = json_result_to_vm_value(&val);
1041 let VmValue::Dict(d) = &vm_val else {
1042 unreachable!("Expected Dict, got {:?}", vm_val);
1043 };
1044 assert_eq!(d.get("name").unwrap().display(), "test");
1045 assert_eq!(d.get("count").unwrap().display(), "42");
1046 }
1047
1048 #[test]
1049 fn test_json_result_to_vm_value_null() {
1050 let val = serde_json::json!(null);
1051 let vm_val = json_result_to_vm_value(&val);
1052 assert!(matches!(vm_val, VmValue::Nil));
1053 }
1054
1055 #[test]
1056 fn test_json_result_to_vm_value_nested() {
1057 let val = serde_json::json!({
1058 "text": "response",
1059 "tool_calls": [
1060 {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
1061 ],
1062 "input_tokens": 100,
1063 "output_tokens": 50,
1064 });
1065 let vm_val = json_result_to_vm_value(&val);
1066 let VmValue::Dict(d) = &vm_val else {
1067 unreachable!("Expected Dict, got {:?}", vm_val);
1068 };
1069 assert_eq!(d.get("text").unwrap().display(), "response");
1070 let VmValue::List(list) = d.get("tool_calls").unwrap() else {
1071 unreachable!("Expected List for tool_calls");
1072 };
1073 assert_eq!(list.len(), 1);
1074 }
1075
1076 #[test]
1077 fn parse_host_tools_list_accepts_object_wrapper() {
1078 let tools = parse_host_tools_list_response(serde_json::json!({
1079 "tools": [
1080 {
1081 "name": "Read",
1082 "description": "Read a file",
1083 "schema": {"type": "object"},
1084 }
1085 ]
1086 }))
1087 .expect("tool list");
1088
1089 assert_eq!(tools.len(), 1);
1090 assert_eq!(tools[0]["name"], "Read");
1091 assert_eq!(tools[0]["deprecated"], false);
1092 }
1093
1094 #[test]
1095 fn parse_host_tools_list_accepts_compat_fields() {
1096 let tools = parse_host_tools_list_response(serde_json::json!({
1097 "result": {
1098 "tools": [
1099 {
1100 "name": "Edit",
1101 "short_description": "Apply an edit",
1102 "input_schema": {"type": "object"},
1103 "deprecated": true,
1104 }
1105 ]
1106 }
1107 }))
1108 .expect("tool list");
1109
1110 assert_eq!(tools[0]["description"], "Apply an edit");
1111 assert_eq!(tools[0]["schema"]["type"], "object");
1112 assert_eq!(tools[0]["deprecated"], true);
1113 }
1114
1115 #[test]
1116 fn parse_host_tools_list_requires_tool_names() {
1117 let err = parse_host_tools_list_response(serde_json::json!({
1118 "tools": [
1119 {"description": "missing name"}
1120 ]
1121 }))
1122 .expect_err("expected error");
1123 assert!(err
1124 .to_string()
1125 .contains("host/tools/list: every tool must include a string `name`"));
1126 }
1127
1128 #[test]
1129 fn test_timeout_duration() {
1130 assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
1131 }
1132}