1use std::collections::{BTreeMap, HashMap, VecDeque};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::io::AsyncBufReadExt;
16use tokio::sync::{oneshot, Mutex};
17
18use crate::orchestration::MutationSessionRecord;
19use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
20use crate::visible_text::VisibleTextState;
21use crate::vm::Vm;
22
/// Upper bound on how long `HostBridge::call` waits for the host to answer a
/// single request (five minutes).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5 * 60);
25
/// Callback that emits one already-serialized message line to the host.
///
/// Returns a human-readable error string when the line could not be written.
pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;

/// Builds a [`HostBridgeWriter`] that prints each message to stdout as a
/// single newline-terminated line and flushes immediately.
///
/// `stdout_lock` is held for the duration of each write so that writers
/// sharing the same lock never interleave their output.
fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
    Arc::new(move |line: &str| {
        // A poisoned lock only means another writer panicked; the guard is
        // still usable for mutual exclusion, so recover it and keep going.
        let _serialized = match stdout_lock.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        let mut out = std::io::stdout().lock();
        let terminator: &[u8] = b"\n";
        for chunk in [line.as_bytes(), terminator] {
            out.write_all(chunk)
                .map_err(|err| format!("Bridge write error: {err}"))?;
        }
        out.flush()
            .map_err(|err| format!("Bridge flush error: {err}"))
    })
}
44
/// JSON-RPC bridge between the VM and its host.
///
/// Requests are written through `writer` and matched to responses by id via
/// `pending`; when `in_process` is set, calls are dispatched to an in-process
/// host instead of being written out.
pub struct HostBridge {
    /// Id assigned to the next outgoing request; incremented on every `call`.
    next_id: AtomicU64,
    /// Response channels for in-flight requests, keyed by request id.
    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
    /// Set when a `cancel` notification arrives; checked before each `call`.
    cancelled: Arc<AtomicBool>,
    /// Sink used to emit serialized request/notification lines.
    writer: HostBridgeWriter,
    /// Session id attached to `session/update` notifications.
    session_id: std::sync::Mutex<String>,
    /// Script name attached to call and worker notifications.
    script_name: std::sync::Mutex<String>,
    /// User messages received from the host that have not been consumed yet.
    queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>>,
    /// Set by an `agent/resume` notification or `signal_resume`.
    resume_requested: Arc<AtomicBool>,
    /// Set by a `skills/update` notification or `signal_skills_reload`.
    skills_reload_requested: Arc<AtomicBool>,
    /// Flag toggled through `set_daemon_idle` / read by `is_daemon_idle`.
    daemon_idle: Arc<AtomicBool>,
    /// One-shot stop reason stored by `set_prompt_stop_reason`.
    prompt_stop_reason: std::sync::Mutex<Option<String>>,
    /// Per-call visible-text accumulator, keyed by call id.
    visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
    /// Per-call `stream_publicly` flag recorded at call start, keyed by call id.
    visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
    /// In-process host backend; when present, `call` bypasses the writer.
    in_process: Option<InProcessHost>,
}
87
/// Host backend that answers bridge calls by invoking functions exported from
/// a module loaded into a VM, instead of talking to an external process.
struct InProcessHost {
    /// Path of the module the exports were loaded from (used in error text).
    module_path: PathBuf,
    /// Exported functions of the module, keyed by export name.
    exported_functions: BTreeMap<String, Rc<VmClosure>>,
    /// VM from which a child VM is derived for every export invocation.
    vm: Vm,
}
93
94impl InProcessHost {
95 async fn dispatch(
96 &self,
97 method: &str,
98 params: serde_json::Value,
99 ) -> Result<serde_json::Value, VmError> {
100 match method {
101 "builtin_call" => {
102 let name = params
103 .get("name")
104 .and_then(|value| value.as_str())
105 .unwrap_or_default();
106 let args = params
107 .get("args")
108 .and_then(|value| value.as_array())
109 .cloned()
110 .unwrap_or_default()
111 .into_iter()
112 .map(|value| json_result_to_vm_value(&value))
113 .collect::<Vec<_>>();
114 self.invoke_export(name, &args).await
115 }
116 "host/tools/list" => self
117 .invoke_optional_export("host_tools_list", &[])
118 .await
119 .map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
120 "session/request_permission" => self.request_permission(params).await,
121 other => Err(VmError::Runtime(format!(
122 "playground host backend does not implement bridge method '{other}'"
123 ))),
124 }
125 }
126
127 async fn invoke_export(
128 &self,
129 name: &str,
130 args: &[VmValue],
131 ) -> Result<serde_json::Value, VmError> {
132 let Some(closure) = self.exported_functions.get(name) else {
133 return Err(VmError::Runtime(format!(
134 "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
135 self.module_path.display()
136 )));
137 };
138
139 let mut vm = self.vm.child_vm_for_host();
140 let result = vm.call_closure_pub(closure, args).await?;
141 Ok(crate::llm::vm_value_to_json(&result))
142 }
143
144 async fn invoke_optional_export(
145 &self,
146 name: &str,
147 args: &[VmValue],
148 ) -> Result<Option<serde_json::Value>, VmError> {
149 if !self.exported_functions.contains_key(name) {
150 return Ok(None);
151 }
152 self.invoke_export(name, args).await.map(Some)
153 }
154
155 async fn request_permission(
156 &self,
157 params: serde_json::Value,
158 ) -> Result<serde_json::Value, VmError> {
159 let Some(closure) = self.exported_functions.get("request_permission") else {
160 return Ok(serde_json::json!({ "granted": true }));
161 };
162
163 let tool_name = params
164 .get("toolCall")
165 .and_then(|tool_call| tool_call.get("toolName"))
166 .and_then(|value| value.as_str())
167 .unwrap_or_default();
168 let tool_args = params
169 .get("toolCall")
170 .and_then(|tool_call| tool_call.get("rawInput"))
171 .map(json_result_to_vm_value)
172 .unwrap_or(VmValue::Nil);
173 let full_payload = json_result_to_vm_value(¶ms);
174
175 let arg_count = closure.func.params.len();
176 let args = if arg_count >= 3 {
177 vec![
178 VmValue::String(Rc::from(tool_name.to_string())),
179 tool_args,
180 full_payload,
181 ]
182 } else if arg_count == 2 {
183 vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
184 } else if arg_count == 1 {
185 vec![full_payload]
186 } else {
187 Vec::new()
188 };
189
190 let mut vm = self.vm.child_vm_for_host();
191 let result = vm.call_closure_pub(closure, &args).await?;
192 let payload = match result {
193 VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
194 VmValue::String(reason) if !reason.is_empty() => {
195 serde_json::json!({ "granted": false, "reason": reason.to_string() })
196 }
197 other => {
198 let json = crate::llm::vm_value_to_json(&other);
199 if json
200 .get("granted")
201 .and_then(|value| value.as_bool())
202 .is_some()
203 || json.get("outcome").is_some()
204 {
205 json
206 } else {
207 serde_json::json!({ "granted": other.is_truthy() })
208 }
209 }
210 };
211 Ok(payload)
212 }
213}
214
/// When a queued user message should be handed to the running interaction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
    InterruptImmediate,
    FinishStep,
    WaitForCompletion,
}

/// Point in the interaction at which queued messages are collected.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
    InterruptImmediate,
    AfterCurrentOperation,
    EndOfInteraction,
}

impl QueuedUserMessageMode {
    /// Parses a wire-level mode label.
    ///
    /// Matching is exact (case-sensitive); any unrecognized label maps to
    /// [`QueuedUserMessageMode::WaitForCompletion`].
    fn from_str(value: &str) -> Self {
        const LABELS: [(&str, QueuedUserMessageMode); 4] = [
            ("interrupt_immediate", QueuedUserMessageMode::InterruptImmediate),
            ("interrupt", QueuedUserMessageMode::InterruptImmediate),
            ("finish_step", QueuedUserMessageMode::FinishStep),
            ("after_current_operation", QueuedUserMessageMode::FinishStep),
        ];
        LABELS
            .iter()
            .find(|(label, _)| *label == value)
            .map(|(_, mode)| mode.clone())
            .unwrap_or(Self::WaitForCompletion)
    }
}

/// A user message received while work is in progress, plus its delivery mode.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedUserMessage {
    /// Text of the message.
    pub content: String,
    /// When the message should be delivered.
    pub mode: QueuedUserMessageMode,
}
244
245#[allow(clippy::new_without_default)]
247impl HostBridge {
248 pub fn new() -> Self {
253 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
254 Arc::new(Mutex::new(HashMap::new()));
255 let cancelled = Arc::new(AtomicBool::new(false));
256 let queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>> =
257 Arc::new(Mutex::new(VecDeque::new()));
258 let resume_requested = Arc::new(AtomicBool::new(false));
259 let skills_reload_requested = Arc::new(AtomicBool::new(false));
260 let daemon_idle = Arc::new(AtomicBool::new(false));
261
262 let pending_clone = pending.clone();
264 let cancelled_clone = cancelled.clone();
265 let queued_clone = queued_user_messages.clone();
266 let resume_clone = resume_requested.clone();
267 let skills_reload_clone = skills_reload_requested.clone();
268 tokio::task::spawn_local(async move {
269 let stdin = tokio::io::stdin();
270 let reader = tokio::io::BufReader::new(stdin);
271 let mut lines = reader.lines();
272
273 while let Ok(Some(line)) = lines.next_line().await {
274 let line = line.trim().to_string();
275 if line.is_empty() {
276 continue;
277 }
278
279 let msg: serde_json::Value = match serde_json::from_str(&line) {
280 Ok(v) => v,
281 Err(_) => continue,
282 };
283
284 if msg.get("id").is_none() {
286 if let Some(method) = msg["method"].as_str() {
287 if method == "cancel" {
288 cancelled_clone.store(true, Ordering::SeqCst);
289 } else if method == "agent/resume" {
290 resume_clone.store(true, Ordering::SeqCst);
291 } else if method == "skills/update" {
292 skills_reload_clone.store(true, Ordering::SeqCst);
293 } else if method == "user_message"
294 || method == "session/input"
295 || method == "agent/user_message"
296 {
297 let params = &msg["params"];
298 let content = params
299 .get("content")
300 .and_then(|v| v.as_str())
301 .unwrap_or("")
302 .to_string();
303 if !content.is_empty() {
304 let mode = QueuedUserMessageMode::from_str(
305 params
306 .get("mode")
307 .and_then(|v| v.as_str())
308 .unwrap_or("wait_for_completion"),
309 );
310 queued_clone
311 .lock()
312 .await
313 .push_back(QueuedUserMessage { content, mode });
314 }
315 }
316 }
317 continue;
318 }
319
320 if let Some(id) = msg["id"].as_u64() {
321 let mut pending = pending_clone.lock().await;
322 if let Some(sender) = pending.remove(&id) {
323 let _ = sender.send(msg);
324 }
325 }
326 }
327
328 let mut pending = pending_clone.lock().await;
330 pending.clear();
331 });
332
333 Self {
334 next_id: AtomicU64::new(1),
335 pending,
336 cancelled,
337 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
338 session_id: std::sync::Mutex::new(String::new()),
339 script_name: std::sync::Mutex::new(String::new()),
340 queued_user_messages,
341 resume_requested,
342 skills_reload_requested,
343 daemon_idle,
344 prompt_stop_reason: std::sync::Mutex::new(None),
345 visible_call_states: std::sync::Mutex::new(HashMap::new()),
346 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
347 in_process: None,
348 }
349 }
350
351 pub fn from_parts(
357 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
358 cancelled: Arc<AtomicBool>,
359 stdout_lock: Arc<std::sync::Mutex<()>>,
360 start_id: u64,
361 ) -> Self {
362 Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
363 }
364
365 pub fn from_parts_with_writer(
366 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
367 cancelled: Arc<AtomicBool>,
368 writer: HostBridgeWriter,
369 start_id: u64,
370 ) -> Self {
371 Self {
372 next_id: AtomicU64::new(start_id),
373 pending,
374 cancelled,
375 writer,
376 session_id: std::sync::Mutex::new(String::new()),
377 script_name: std::sync::Mutex::new(String::new()),
378 queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
379 resume_requested: Arc::new(AtomicBool::new(false)),
380 skills_reload_requested: Arc::new(AtomicBool::new(false)),
381 daemon_idle: Arc::new(AtomicBool::new(false)),
382 prompt_stop_reason: std::sync::Mutex::new(None),
383 visible_call_states: std::sync::Mutex::new(HashMap::new()),
384 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
385 in_process: None,
386 }
387 }
388
389 pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
392 let exported_functions = vm.load_module_exports(module_path).await?;
393 Ok(Self {
394 next_id: AtomicU64::new(1),
395 pending: Arc::new(Mutex::new(HashMap::new())),
396 cancelled: Arc::new(AtomicBool::new(false)),
397 writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
398 session_id: std::sync::Mutex::new(String::new()),
399 script_name: std::sync::Mutex::new(String::new()),
400 queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
401 resume_requested: Arc::new(AtomicBool::new(false)),
402 skills_reload_requested: Arc::new(AtomicBool::new(false)),
403 daemon_idle: Arc::new(AtomicBool::new(false)),
404 prompt_stop_reason: std::sync::Mutex::new(None),
405 visible_call_states: std::sync::Mutex::new(HashMap::new()),
406 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
407 in_process: Some(InProcessHost {
408 module_path: module_path.to_path_buf(),
409 exported_functions,
410 vm,
411 }),
412 })
413 }
414
415 pub fn set_session_id(&self, id: &str) {
417 *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
418 }
419
420 pub fn set_script_name(&self, name: &str) {
422 *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
423 }
424
425 fn get_script_name(&self) -> String {
427 self.script_name
428 .lock()
429 .unwrap_or_else(|e| e.into_inner())
430 .clone()
431 }
432
433 pub fn get_session_id(&self) -> String {
435 self.session_id
436 .lock()
437 .unwrap_or_else(|e| e.into_inner())
438 .clone()
439 }
440
441 fn write_line(&self, line: &str) -> Result<(), VmError> {
443 (self.writer)(line).map_err(VmError::Runtime)
444 }
445
446 pub async fn call(
449 &self,
450 method: &str,
451 params: serde_json::Value,
452 ) -> Result<serde_json::Value, VmError> {
453 if let Some(in_process) = &self.in_process {
454 return in_process.dispatch(method, params).await;
455 }
456
457 if self.is_cancelled() {
458 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
459 }
460
461 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
462
463 let request = crate::jsonrpc::request(id, method, params);
464
465 let (tx, rx) = oneshot::channel();
466 {
467 let mut pending = self.pending.lock().await;
468 pending.insert(id, tx);
469 }
470
471 let line = serde_json::to_string(&request)
472 .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
473 if let Err(e) = self.write_line(&line) {
474 let mut pending = self.pending.lock().await;
475 pending.remove(&id);
476 return Err(e);
477 }
478
479 let response = match tokio::time::timeout(DEFAULT_TIMEOUT, rx).await {
480 Ok(Ok(msg)) => msg,
481 Ok(Err(_)) => {
482 return Err(VmError::Runtime(
484 "Bridge: host closed connection before responding".into(),
485 ));
486 }
487 Err(_) => {
488 let mut pending = self.pending.lock().await;
489 pending.remove(&id);
490 return Err(VmError::Runtime(format!(
491 "Bridge: host did not respond to '{method}' within {}s",
492 DEFAULT_TIMEOUT.as_secs()
493 )));
494 }
495 };
496
497 if let Some(error) = response.get("error") {
498 let message = error["message"].as_str().unwrap_or("Unknown host error");
499 let code = error["code"].as_i64().unwrap_or(-1);
500 if code == -32001 {
502 return Err(VmError::CategorizedError {
503 message: message.to_string(),
504 category: ErrorCategory::ToolRejected,
505 });
506 }
507 return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
508 }
509
510 Ok(response["result"].clone())
511 }
512
513 pub fn notify(&self, method: &str, params: serde_json::Value) {
516 let notification = crate::jsonrpc::notification(method, params);
517 if self.in_process.is_some() {
518 return;
519 }
520 if let Ok(line) = serde_json::to_string(¬ification) {
521 let _ = self.write_line(&line);
522 }
523 }
524
525 pub fn is_cancelled(&self) -> bool {
527 self.cancelled.load(Ordering::SeqCst)
528 }
529
530 pub fn take_resume_signal(&self) -> bool {
531 self.resume_requested.swap(false, Ordering::SeqCst)
532 }
533
534 pub fn signal_resume(&self) {
535 self.resume_requested.store(true, Ordering::SeqCst);
536 }
537
538 pub fn set_daemon_idle(&self, idle: bool) {
539 self.daemon_idle.store(idle, Ordering::SeqCst);
540 }
541
542 pub fn is_daemon_idle(&self) -> bool {
543 self.daemon_idle.load(Ordering::SeqCst)
544 }
545
546 pub fn set_prompt_stop_reason(&self, reason: &str) {
551 *self
552 .prompt_stop_reason
553 .lock()
554 .unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
555 }
556
557 pub fn take_prompt_stop_reason(&self) -> Option<String> {
562 self.prompt_stop_reason
563 .lock()
564 .unwrap_or_else(|e| e.into_inner())
565 .take()
566 }
567
568 pub fn take_skills_reload_signal(&self) -> bool {
573 self.skills_reload_requested.swap(false, Ordering::SeqCst)
574 }
575
576 pub fn signal_skills_reload(&self) {
580 self.skills_reload_requested.store(true, Ordering::SeqCst);
581 }
582
583 pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
589 let result = self.call("skills/list", serde_json::json!({})).await?;
590 match result {
591 serde_json::Value::Array(items) => Ok(items),
592 serde_json::Value::Object(map) => match map.get("skills") {
593 Some(serde_json::Value::Array(items)) => Ok(items.clone()),
594 _ => Err(VmError::Runtime(
595 "skills/list: host response must be an array or { skills: [...] }".into(),
596 )),
597 },
598 _ => Err(VmError::Runtime(
599 "skills/list: unexpected response shape".into(),
600 )),
601 }
602 }
603
604 pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
610 let result = self.call("host/tools/list", serde_json::json!({})).await?;
611 parse_host_tools_list_response(result)
612 }
613
614 pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
618 self.call("skills/fetch", serde_json::json!({ "id": id }))
619 .await
620 }
621
622 pub async fn push_queued_user_message(&self, content: String, mode: &str) {
623 self.queued_user_messages
624 .lock()
625 .await
626 .push_back(QueuedUserMessage {
627 content,
628 mode: QueuedUserMessageMode::from_str(mode),
629 });
630 }
631
632 pub async fn take_queued_user_messages(
633 &self,
634 include_interrupt_immediate: bool,
635 include_finish_step: bool,
636 include_wait_for_completion: bool,
637 ) -> Vec<QueuedUserMessage> {
638 let mut queue = self.queued_user_messages.lock().await;
639 let mut selected = Vec::new();
640 let mut retained = VecDeque::new();
641 while let Some(message) = queue.pop_front() {
642 let should_take = match message.mode {
643 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
644 QueuedUserMessageMode::FinishStep => include_finish_step,
645 QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
646 };
647 if should_take {
648 selected.push(message);
649 } else {
650 retained.push_back(message);
651 }
652 }
653 *queue = retained;
654 selected
655 }
656
657 pub async fn take_queued_user_messages_for(
658 &self,
659 checkpoint: DeliveryCheckpoint,
660 ) -> Vec<QueuedUserMessage> {
661 match checkpoint {
662 DeliveryCheckpoint::InterruptImmediate => {
663 self.take_queued_user_messages(true, false, false).await
664 }
665 DeliveryCheckpoint::AfterCurrentOperation => {
666 self.take_queued_user_messages(false, true, false).await
667 }
668 DeliveryCheckpoint::EndOfInteraction => {
669 self.take_queued_user_messages(false, false, true).await
670 }
671 }
672 }
673
674 pub fn send_output(&self, text: &str) {
676 self.notify("output", serde_json::json!({"text": text}));
677 }
678
679 pub fn send_progress(
681 &self,
682 phase: &str,
683 message: &str,
684 progress: Option<i64>,
685 total: Option<i64>,
686 data: Option<serde_json::Value>,
687 ) {
688 let mut payload = serde_json::json!({"phase": phase, "message": message});
689 if let Some(p) = progress {
690 payload["progress"] = serde_json::json!(p);
691 }
692 if let Some(t) = total {
693 payload["total"] = serde_json::json!(t);
694 }
695 if let Some(d) = data {
696 payload["data"] = d;
697 }
698 self.notify("progress", payload);
699 }
700
701 pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
703 let mut payload = serde_json::json!({"level": level, "message": message});
704 if let Some(f) = fields {
705 payload["fields"] = f;
706 }
707 self.notify("log", payload);
708 }
709
710 pub fn send_call_start(
713 &self,
714 call_id: &str,
715 call_type: &str,
716 name: &str,
717 metadata: serde_json::Value,
718 ) {
719 let session_id = self.get_session_id();
720 let script = self.get_script_name();
721 let stream_publicly = metadata
722 .get("stream_publicly")
723 .and_then(|value| value.as_bool())
724 .unwrap_or(true);
725 self.visible_call_streams
726 .lock()
727 .unwrap_or_else(|e| e.into_inner())
728 .insert(call_id.to_string(), stream_publicly);
729 self.notify(
730 "session/update",
731 serde_json::json!({
732 "sessionId": session_id,
733 "update": {
734 "sessionUpdate": "call_start",
735 "content": {
736 "toolCallId": call_id,
737 "call_type": call_type,
738 "name": name,
739 "script": script,
740 "metadata": metadata,
741 },
742 },
743 }),
744 );
745 }
746
747 pub fn send_call_progress(
750 &self,
751 call_id: &str,
752 delta: &str,
753 accumulated_tokens: u64,
754 user_visible: bool,
755 ) {
756 let session_id = self.get_session_id();
757 let (visible_text, visible_delta) = {
758 let stream_publicly = self
759 .visible_call_streams
760 .lock()
761 .unwrap_or_else(|e| e.into_inner())
762 .get(call_id)
763 .copied()
764 .unwrap_or(true);
765 let mut states = self
766 .visible_call_states
767 .lock()
768 .unwrap_or_else(|e| e.into_inner());
769 let state = states.entry(call_id.to_string()).or_default();
770 state.push(delta, stream_publicly)
771 };
772 self.notify(
773 "session/update",
774 serde_json::json!({
775 "sessionId": session_id,
776 "update": {
777 "sessionUpdate": "call_progress",
778 "content": {
779 "toolCallId": call_id,
780 "delta": delta,
781 "accumulated_tokens": accumulated_tokens,
782 "visible_text": visible_text,
783 "visible_delta": visible_delta,
784 "user_visible": user_visible,
785 },
786 },
787 }),
788 );
789 }
790
791 pub fn send_call_end(
793 &self,
794 call_id: &str,
795 call_type: &str,
796 name: &str,
797 duration_ms: u64,
798 status: &str,
799 metadata: serde_json::Value,
800 ) {
801 let session_id = self.get_session_id();
802 let script = self.get_script_name();
803 self.visible_call_states
804 .lock()
805 .unwrap_or_else(|e| e.into_inner())
806 .remove(call_id);
807 self.visible_call_streams
808 .lock()
809 .unwrap_or_else(|e| e.into_inner())
810 .remove(call_id);
811 self.notify(
812 "session/update",
813 serde_json::json!({
814 "sessionId": session_id,
815 "update": {
816 "sessionUpdate": "call_end",
817 "content": {
818 "toolCallId": call_id,
819 "call_type": call_type,
820 "name": name,
821 "script": script,
822 "duration_ms": duration_ms,
823 "status": status,
824 "metadata": metadata,
825 },
826 },
827 }),
828 );
829 }
830
831 pub fn send_worker_update(
833 &self,
834 worker_id: &str,
835 worker_name: &str,
836 status: &str,
837 metadata: serde_json::Value,
838 audit: Option<&MutationSessionRecord>,
839 ) {
840 let session_id = self.get_session_id();
841 let script = self.get_script_name();
842 let started_at = metadata.get("started_at").cloned().unwrap_or_default();
843 let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
844 let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
845 let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
846 let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
847 let lifecycle = serde_json::json!({
848 "event": status,
849 "worker_id": worker_id,
850 "worker_name": worker_name,
851 "started_at": started_at,
852 "finished_at": finished_at,
853 });
854 self.notify(
855 "session/update",
856 serde_json::json!({
857 "sessionId": session_id,
858 "update": {
859 "sessionUpdate": "worker_update",
860 "content": {
861 "worker_id": worker_id,
862 "worker_name": worker_name,
863 "status": status,
864 "script": script,
865 "started_at": started_at,
866 "finished_at": finished_at,
867 "snapshot_path": snapshot_path,
868 "run_id": run_id,
869 "run_path": run_path,
870 "lifecycle": lifecycle,
871 "audit": audit,
872 "metadata": metadata,
873 },
874 },
875 }),
876 );
877 }
878}
879
880pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
882 crate::stdlib::json_to_vm_value(val)
883}
884
885fn parse_host_tools_list_response(
886 result: serde_json::Value,
887) -> Result<Vec<serde_json::Value>, VmError> {
888 let tools = match result {
889 serde_json::Value::Array(items) => items,
890 serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
891 map.get("result")
892 .and_then(|value| value.get("tools"))
893 .cloned()
894 }) {
895 Some(serde_json::Value::Array(items)) => items,
896 _ => {
897 return Err(VmError::Runtime(
898 "host/tools/list: host response must be an array or { tools: [...] }".into(),
899 ));
900 }
901 },
902 _ => {
903 return Err(VmError::Runtime(
904 "host/tools/list: unexpected response shape".into(),
905 ));
906 }
907 };
908
909 let mut normalized = Vec::with_capacity(tools.len());
910 for tool in tools {
911 let serde_json::Value::Object(map) = tool else {
912 return Err(VmError::Runtime(
913 "host/tools/list: every tool must be an object".into(),
914 ));
915 };
916 let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
917 return Err(VmError::Runtime(
918 "host/tools/list: every tool must include a string `name`".into(),
919 ));
920 };
921 let description = map
922 .get("description")
923 .and_then(|value| value.as_str())
924 .or_else(|| {
925 map.get("short_description")
926 .and_then(|value| value.as_str())
927 })
928 .unwrap_or_default();
929 let schema = map
930 .get("schema")
931 .cloned()
932 .or_else(|| map.get("parameters").cloned())
933 .or_else(|| map.get("input_schema").cloned())
934 .unwrap_or(serde_json::Value::Null);
935 let deprecated = map
936 .get("deprecated")
937 .and_then(|value| value.as_bool())
938 .unwrap_or(false);
939 normalized.push(serde_json::json!({
940 "name": name,
941 "description": description,
942 "schema": schema,
943 "deprecated": deprecated,
944 }));
945 }
946 Ok(normalized)
947}
948
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a bridge with fresh request state that writes to stdout and
    /// shares the given cancellation flag.
    fn bridge_with_flag(cancelled: Arc<AtomicBool>) -> HostBridge {
        HostBridge::from_parts(
            Arc::new(Mutex::new(HashMap::new())),
            cancelled,
            Arc::new(std::sync::Mutex::new(())),
            1,
        )
    }

    #[test]
    fn test_json_rpc_request_format() {
        let request = crate::jsonrpc::request(
            1,
            "llm_call",
            serde_json::json!({
                "prompt": "Hello",
                "system": "Be helpful",
            }),
        );
        let s = serde_json::to_string(&request).unwrap();
        assert!(s.contains("\"jsonrpc\":\"2.0\""));
        assert!(s.contains("\"id\":1"));
        assert!(s.contains("\"method\":\"llm_call\""));
    }

    #[test]
    fn test_json_rpc_notification_format() {
        let notification =
            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
        let s = serde_json::to_string(&notification).unwrap();
        assert!(s.contains("\"method\":\"output\""));
        assert!(!s.contains("\"id\""));
    }

    #[test]
    fn test_json_rpc_error_response_parsing() {
        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
        assert!(response.get("error").is_some());
        assert_eq!(
            response["error"]["message"].as_str().unwrap(),
            "Invalid request"
        );
    }

    #[test]
    fn test_json_rpc_success_response_parsing() {
        let response = crate::jsonrpc::response(
            1,
            serde_json::json!({
                "text": "Hello world",
                "input_tokens": 10,
                "output_tokens": 5,
            }),
        );
        assert!(response.get("result").is_some());
        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
    }

    #[test]
    fn test_cancelled_flag() {
        // Exercise the bridge itself: it must observe the shared flag.
        let cancelled = Arc::new(AtomicBool::new(false));
        let bridge = bridge_with_flag(Arc::clone(&cancelled));
        assert!(!bridge.is_cancelled());
        cancelled.store(true, Ordering::SeqCst);
        assert!(bridge.is_cancelled());
    }

    #[test]
    fn queued_messages_are_filtered_by_delivery_mode() {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        runtime.block_on(async {
            let bridge = bridge_with_flag(Arc::new(AtomicBool::new(false)));
            bridge
                .push_queued_user_message("first".to_string(), "finish_step")
                .await;
            bridge
                .push_queued_user_message("second".to_string(), "wait_for_completion")
                .await;

            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
            assert_eq!(finish_step.len(), 1);
            assert_eq!(finish_step[0].content, "first");

            let turn_end = bridge.take_queued_user_messages(false, false, true).await;
            assert_eq!(turn_end.len(), 1);
            assert_eq!(turn_end[0].content, "second");

            // Everything has been consumed by now.
            let leftover = bridge.take_queued_user_messages(true, true, true).await;
            assert!(leftover.is_empty());
        });
    }

    #[test]
    fn test_json_result_to_vm_value_string() {
        let val = serde_json::json!("hello");
        let vm_val = json_result_to_vm_value(&val);
        assert_eq!(vm_val.display(), "hello");
    }

    #[test]
    fn test_json_result_to_vm_value_dict() {
        let val = serde_json::json!({"name": "test", "count": 42});
        let vm_val = json_result_to_vm_value(&val);
        let VmValue::Dict(d) = &vm_val else {
            panic!("Expected Dict, got {:?}", vm_val);
        };
        assert_eq!(d.get("name").unwrap().display(), "test");
        assert_eq!(d.get("count").unwrap().display(), "42");
    }

    #[test]
    fn test_json_result_to_vm_value_null() {
        let val = serde_json::json!(null);
        let vm_val = json_result_to_vm_value(&val);
        assert!(matches!(vm_val, VmValue::Nil));
    }

    #[test]
    fn test_json_result_to_vm_value_nested() {
        let val = serde_json::json!({
            "text": "response",
            "tool_calls": [
                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
            ],
            "input_tokens": 100,
            "output_tokens": 50,
        });
        let vm_val = json_result_to_vm_value(&val);
        let VmValue::Dict(d) = &vm_val else {
            panic!("Expected Dict, got {:?}", vm_val);
        };
        assert_eq!(d.get("text").unwrap().display(), "response");
        let VmValue::List(list) = d.get("tool_calls").unwrap() else {
            panic!("Expected List for tool_calls");
        };
        assert_eq!(list.len(), 1);
    }

    #[test]
    fn parse_host_tools_list_accepts_object_wrapper() {
        let tools = parse_host_tools_list_response(serde_json::json!({
            "tools": [
                {
                    "name": "Read",
                    "description": "Read a file",
                    "schema": {"type": "object"},
                }
            ]
        }))
        .expect("tool list");

        assert_eq!(tools.len(), 1);
        assert_eq!(tools[0]["name"], "Read");
        assert_eq!(tools[0]["deprecated"], false);
    }

    #[test]
    fn parse_host_tools_list_accepts_compat_fields() {
        let tools = parse_host_tools_list_response(serde_json::json!({
            "result": {
                "tools": [
                    {
                        "name": "Edit",
                        "short_description": "Apply an edit",
                        "input_schema": {"type": "object"},
                        "deprecated": true,
                    }
                ]
            }
        }))
        .expect("tool list");

        assert_eq!(tools[0]["description"], "Apply an edit");
        assert_eq!(tools[0]["schema"]["type"], "object");
        assert_eq!(tools[0]["deprecated"], true);
    }

    #[test]
    fn parse_host_tools_list_requires_tool_names() {
        let err = parse_host_tools_list_response(serde_json::json!({
            "tools": [
                {"description": "missing name"}
            ]
        }))
        .expect_err("expected error");
        assert!(err
            .to_string()
            .contains("host/tools/list: every tool must include a string `name`"));
    }

    #[test]
    fn test_timeout_duration() {
        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
    }
}