1use std::collections::{BTreeMap, HashMap, VecDeque};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10use std::rc::Rc;
11use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
12use std::sync::Arc;
13use std::time::Duration;
14
15use tokio::io::AsyncBufReadExt;
16use tokio::sync::{oneshot, Mutex};
17
18use crate::orchestration::MutationSessionRecord;
19use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
20use crate::visible_text::VisibleTextState;
21use crate::vm::Vm;
22
/// How long a bridge request waits for the host's response before failing (five minutes).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5 * 60);
25
26pub struct HostBridge {
33 next_id: AtomicU64,
34 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
36 cancelled: Arc<AtomicBool>,
38 stdout_lock: Arc<std::sync::Mutex<()>>,
40 session_id: std::sync::Mutex<String>,
42 script_name: std::sync::Mutex<String>,
44 queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>>,
46 resume_requested: Arc<AtomicBool>,
48 skills_reload_requested: Arc<AtomicBool>,
53 visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
55 visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
57 in_process: Option<InProcessHost>,
59 #[cfg(test)]
60 recorded_notifications: Arc<std::sync::Mutex<Vec<serde_json::Value>>>,
61}
62
63struct InProcessHost {
64 module_path: PathBuf,
65 exported_functions: BTreeMap<String, Rc<VmClosure>>,
66 vm: Vm,
67}
68
69impl InProcessHost {
70 async fn dispatch(
71 &self,
72 method: &str,
73 params: serde_json::Value,
74 ) -> Result<serde_json::Value, VmError> {
75 match method {
76 "builtin_call" => {
77 let name = params
78 .get("name")
79 .and_then(|value| value.as_str())
80 .unwrap_or_default();
81 let args = params
82 .get("args")
83 .and_then(|value| value.as_array())
84 .cloned()
85 .unwrap_or_default()
86 .into_iter()
87 .map(|value| json_result_to_vm_value(&value))
88 .collect::<Vec<_>>();
89 self.invoke_export(name, &args).await
90 }
91 "session/request_permission" => self.request_permission(params).await,
92 other => Err(VmError::Runtime(format!(
93 "playground host backend does not implement bridge method '{other}'"
94 ))),
95 }
96 }
97
98 async fn invoke_export(
99 &self,
100 name: &str,
101 args: &[VmValue],
102 ) -> Result<serde_json::Value, VmError> {
103 let Some(closure) = self.exported_functions.get(name) else {
104 return Err(VmError::Runtime(format!(
105 "Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
106 self.module_path.display()
107 )));
108 };
109
110 let mut vm = self.vm.child_vm_for_host();
111 let result = vm.call_closure_pub(closure, args, &[]).await?;
112 Ok(crate::llm::vm_value_to_json(&result))
113 }
114
115 async fn request_permission(
116 &self,
117 params: serde_json::Value,
118 ) -> Result<serde_json::Value, VmError> {
119 let Some(closure) = self.exported_functions.get("request_permission") else {
120 return Ok(serde_json::json!({ "granted": true }));
121 };
122
123 let tool_name = params
124 .get("toolCall")
125 .and_then(|tool_call| tool_call.get("toolName"))
126 .and_then(|value| value.as_str())
127 .unwrap_or_default();
128 let tool_args = params
129 .get("toolCall")
130 .and_then(|tool_call| tool_call.get("rawInput"))
131 .map(json_result_to_vm_value)
132 .unwrap_or(VmValue::Nil);
133 let full_payload = json_result_to_vm_value(¶ms);
134
135 let arg_count = closure.func.params.len();
136 let args = if arg_count >= 3 {
137 vec![
138 VmValue::String(Rc::from(tool_name.to_string())),
139 tool_args,
140 full_payload,
141 ]
142 } else if arg_count == 2 {
143 vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
144 } else if arg_count == 1 {
145 vec![full_payload]
146 } else {
147 Vec::new()
148 };
149
150 let mut vm = self.vm.child_vm_for_host();
151 let result = vm.call_closure_pub(closure, &args, &[]).await?;
152 let payload = match result {
153 VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
154 VmValue::String(reason) if !reason.is_empty() => {
155 serde_json::json!({ "granted": false, "reason": reason.to_string() })
156 }
157 other => {
158 let json = crate::llm::vm_value_to_json(&other);
159 if json
160 .get("granted")
161 .and_then(|value| value.as_bool())
162 .is_some()
163 || json.get("outcome").is_some()
164 {
165 json
166 } else {
167 serde_json::json!({ "granted": other.is_truthy() })
168 }
169 }
170 };
171 Ok(payload)
172 }
173}
174
/// When a queued user message should be delivered.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
    /// Taken at the `DeliveryCheckpoint::InterruptImmediate` checkpoint.
    InterruptImmediate,
    /// Taken at the `DeliveryCheckpoint::AfterCurrentOperation` checkpoint.
    FinishStep,
    /// Taken at the `DeliveryCheckpoint::EndOfInteraction` checkpoint.
    WaitForCompletion,
}

/// Point at which queued user messages are collected; each checkpoint selects
/// the messages of exactly one `QueuedUserMessageMode`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
    InterruptImmediate,
    AfterCurrentOperation,
    EndOfInteraction,
}

impl QueuedUserMessageMode {
    /// Parses the wire name of a delivery mode.
    ///
    /// `"interrupt_immediate"` / `"interrupt"` and `"finish_step"` /
    /// `"after_current_operation"` select their respective modes; every other
    /// string, including the empty string, means `WaitForCompletion`.
    fn from_str(value: &str) -> Self {
        if matches!(value, "interrupt_immediate" | "interrupt") {
            Self::InterruptImmediate
        } else if matches!(value, "finish_step" | "after_current_operation") {
            Self::FinishStep
        } else {
            Self::WaitForCompletion
        }
    }
}

/// A user message waiting in the bridge's queue.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedUserMessage {
    /// Message text.
    pub content: String,
    /// When the message should be delivered.
    pub mode: QueuedUserMessageMode,
}
204
205#[allow(clippy::new_without_default)]
207impl HostBridge {
208 pub fn new() -> Self {
213 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
214 Arc::new(Mutex::new(HashMap::new()));
215 let cancelled = Arc::new(AtomicBool::new(false));
216 let queued_user_messages: Arc<Mutex<VecDeque<QueuedUserMessage>>> =
217 Arc::new(Mutex::new(VecDeque::new()));
218 let resume_requested = Arc::new(AtomicBool::new(false));
219 let skills_reload_requested = Arc::new(AtomicBool::new(false));
220
221 let pending_clone = pending.clone();
223 let cancelled_clone = cancelled.clone();
224 let queued_clone = queued_user_messages.clone();
225 let resume_clone = resume_requested.clone();
226 let skills_reload_clone = skills_reload_requested.clone();
227 tokio::task::spawn_local(async move {
228 let stdin = tokio::io::stdin();
229 let reader = tokio::io::BufReader::new(stdin);
230 let mut lines = reader.lines();
231
232 while let Ok(Some(line)) = lines.next_line().await {
233 let line = line.trim().to_string();
234 if line.is_empty() {
235 continue;
236 }
237
238 let msg: serde_json::Value = match serde_json::from_str(&line) {
239 Ok(v) => v,
240 Err(_) => continue,
241 };
242
243 if msg.get("id").is_none() {
245 if let Some(method) = msg["method"].as_str() {
246 if method == "cancel" {
247 cancelled_clone.store(true, Ordering::SeqCst);
248 } else if method == "agent/resume" {
249 resume_clone.store(true, Ordering::SeqCst);
250 } else if method == "skills/update" {
251 skills_reload_clone.store(true, Ordering::SeqCst);
252 } else if method == "user_message"
253 || method == "session/input"
254 || method == "agent/user_message"
255 {
256 let params = &msg["params"];
257 let content = params
258 .get("content")
259 .and_then(|v| v.as_str())
260 .unwrap_or("")
261 .to_string();
262 if !content.is_empty() {
263 let mode = QueuedUserMessageMode::from_str(
264 params
265 .get("mode")
266 .and_then(|v| v.as_str())
267 .unwrap_or("wait_for_completion"),
268 );
269 queued_clone
270 .lock()
271 .await
272 .push_back(QueuedUserMessage { content, mode });
273 }
274 }
275 }
276 continue;
277 }
278
279 if let Some(id) = msg["id"].as_u64() {
280 let mut pending = pending_clone.lock().await;
281 if let Some(sender) = pending.remove(&id) {
282 let _ = sender.send(msg);
283 }
284 }
285 }
286
287 let mut pending = pending_clone.lock().await;
289 pending.clear();
290 });
291
292 Self {
293 next_id: AtomicU64::new(1),
294 pending,
295 cancelled,
296 stdout_lock: Arc::new(std::sync::Mutex::new(())),
297 session_id: std::sync::Mutex::new(String::new()),
298 script_name: std::sync::Mutex::new(String::new()),
299 queued_user_messages,
300 resume_requested,
301 skills_reload_requested,
302 visible_call_states: std::sync::Mutex::new(HashMap::new()),
303 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
304 in_process: None,
305 #[cfg(test)]
306 recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
307 }
308 }
309
310 pub fn from_parts(
316 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
317 cancelled: Arc<AtomicBool>,
318 stdout_lock: Arc<std::sync::Mutex<()>>,
319 start_id: u64,
320 ) -> Self {
321 Self {
322 next_id: AtomicU64::new(start_id),
323 pending,
324 cancelled,
325 stdout_lock,
326 session_id: std::sync::Mutex::new(String::new()),
327 script_name: std::sync::Mutex::new(String::new()),
328 queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
329 resume_requested: Arc::new(AtomicBool::new(false)),
330 skills_reload_requested: Arc::new(AtomicBool::new(false)),
331 visible_call_states: std::sync::Mutex::new(HashMap::new()),
332 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
333 in_process: None,
334 #[cfg(test)]
335 recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
336 }
337 }
338
339 pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
342 let exported_functions = vm.load_module_exports(module_path).await?;
343 Ok(Self {
344 next_id: AtomicU64::new(1),
345 pending: Arc::new(Mutex::new(HashMap::new())),
346 cancelled: Arc::new(AtomicBool::new(false)),
347 stdout_lock: Arc::new(std::sync::Mutex::new(())),
348 session_id: std::sync::Mutex::new(String::new()),
349 script_name: std::sync::Mutex::new(String::new()),
350 queued_user_messages: Arc::new(Mutex::new(VecDeque::new())),
351 resume_requested: Arc::new(AtomicBool::new(false)),
352 skills_reload_requested: Arc::new(AtomicBool::new(false)),
353 visible_call_states: std::sync::Mutex::new(HashMap::new()),
354 visible_call_streams: std::sync::Mutex::new(HashMap::new()),
355 in_process: Some(InProcessHost {
356 module_path: module_path.to_path_buf(),
357 exported_functions,
358 vm,
359 }),
360 #[cfg(test)]
361 recorded_notifications: Arc::new(std::sync::Mutex::new(Vec::new())),
362 })
363 }
364
365 pub fn set_session_id(&self, id: &str) {
367 *self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
368 }
369
370 pub fn set_script_name(&self, name: &str) {
372 *self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
373 }
374
375 fn get_script_name(&self) -> String {
377 self.script_name
378 .lock()
379 .unwrap_or_else(|e| e.into_inner())
380 .clone()
381 }
382
383 fn get_session_id(&self) -> String {
385 self.session_id
386 .lock()
387 .unwrap_or_else(|e| e.into_inner())
388 .clone()
389 }
390
391 fn write_line(&self, line: &str) -> Result<(), VmError> {
393 let _guard = self.stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
394 let mut stdout = std::io::stdout().lock();
395 stdout
396 .write_all(line.as_bytes())
397 .map_err(|e| VmError::Runtime(format!("Bridge write error: {e}")))?;
398 stdout
399 .write_all(b"\n")
400 .map_err(|e| VmError::Runtime(format!("Bridge write error: {e}")))?;
401 stdout
402 .flush()
403 .map_err(|e| VmError::Runtime(format!("Bridge flush error: {e}")))?;
404 Ok(())
405 }
406
407 pub async fn call(
410 &self,
411 method: &str,
412 params: serde_json::Value,
413 ) -> Result<serde_json::Value, VmError> {
414 if let Some(in_process) = &self.in_process {
415 return in_process.dispatch(method, params).await;
416 }
417
418 if self.is_cancelled() {
419 return Err(VmError::Runtime("Bridge: operation cancelled".into()));
420 }
421
422 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
423
424 let request = crate::jsonrpc::request(id, method, params);
425
426 let (tx, rx) = oneshot::channel();
427 {
428 let mut pending = self.pending.lock().await;
429 pending.insert(id, tx);
430 }
431
432 let line = serde_json::to_string(&request)
433 .map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
434 if let Err(e) = self.write_line(&line) {
435 let mut pending = self.pending.lock().await;
436 pending.remove(&id);
437 return Err(e);
438 }
439
440 let response = match tokio::time::timeout(DEFAULT_TIMEOUT, rx).await {
441 Ok(Ok(msg)) => msg,
442 Ok(Err(_)) => {
443 return Err(VmError::Runtime(
445 "Bridge: host closed connection before responding".into(),
446 ));
447 }
448 Err(_) => {
449 let mut pending = self.pending.lock().await;
450 pending.remove(&id);
451 return Err(VmError::Runtime(format!(
452 "Bridge: host did not respond to '{method}' within {}s",
453 DEFAULT_TIMEOUT.as_secs()
454 )));
455 }
456 };
457
458 if let Some(error) = response.get("error") {
459 let message = error["message"].as_str().unwrap_or("Unknown host error");
460 let code = error["code"].as_i64().unwrap_or(-1);
461 if code == -32001 {
463 return Err(VmError::CategorizedError {
464 message: message.to_string(),
465 category: ErrorCategory::ToolRejected,
466 });
467 }
468 return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
469 }
470
471 Ok(response["result"].clone())
472 }
473
474 pub fn notify(&self, method: &str, params: serde_json::Value) {
477 let notification = crate::jsonrpc::notification(method, params);
478 #[cfg(test)]
479 self.recorded_notifications
480 .lock()
481 .unwrap_or_else(|e| e.into_inner())
482 .push(notification.clone());
483 if self.in_process.is_some() {
484 return;
485 }
486 if let Ok(line) = serde_json::to_string(¬ification) {
487 let _ = self.write_line(&line);
488 }
489 }
490
/// Returns a snapshot of every notification passed through `notify` so far.
#[cfg(test)]
pub(crate) fn recorded_notifications(&self) -> Vec<serde_json::Value> {
    let recorded = self
        .recorded_notifications
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    recorded.to_vec()
}
498
499 pub fn is_cancelled(&self) -> bool {
501 self.cancelled.load(Ordering::SeqCst)
502 }
503
504 pub fn take_resume_signal(&self) -> bool {
505 self.resume_requested.swap(false, Ordering::SeqCst)
506 }
507
508 pub fn signal_resume(&self) {
509 self.resume_requested.store(true, Ordering::SeqCst);
510 }
511
512 pub fn take_skills_reload_signal(&self) -> bool {
517 self.skills_reload_requested.swap(false, Ordering::SeqCst)
518 }
519
520 pub fn signal_skills_reload(&self) {
524 self.skills_reload_requested.store(true, Ordering::SeqCst);
525 }
526
527 pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
533 let result = self.call("skills/list", serde_json::json!({})).await?;
534 match result {
535 serde_json::Value::Array(items) => Ok(items),
536 serde_json::Value::Object(map) => match map.get("skills") {
537 Some(serde_json::Value::Array(items)) => Ok(items.clone()),
538 _ => Err(VmError::Runtime(
539 "skills/list: host response must be an array or { skills: [...] }".into(),
540 )),
541 },
542 _ => Err(VmError::Runtime(
543 "skills/list: unexpected response shape".into(),
544 )),
545 }
546 }
547
548 pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
552 self.call("skills/fetch", serde_json::json!({ "id": id }))
553 .await
554 }
555
556 pub async fn push_queued_user_message(&self, content: String, mode: &str) {
557 self.queued_user_messages
558 .lock()
559 .await
560 .push_back(QueuedUserMessage {
561 content,
562 mode: QueuedUserMessageMode::from_str(mode),
563 });
564 }
565
566 pub async fn take_queued_user_messages(
567 &self,
568 include_interrupt_immediate: bool,
569 include_finish_step: bool,
570 include_wait_for_completion: bool,
571 ) -> Vec<QueuedUserMessage> {
572 let mut queue = self.queued_user_messages.lock().await;
573 let mut selected = Vec::new();
574 let mut retained = VecDeque::new();
575 while let Some(message) = queue.pop_front() {
576 let should_take = match message.mode {
577 QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
578 QueuedUserMessageMode::FinishStep => include_finish_step,
579 QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
580 };
581 if should_take {
582 selected.push(message);
583 } else {
584 retained.push_back(message);
585 }
586 }
587 *queue = retained;
588 selected
589 }
590
591 pub async fn take_queued_user_messages_for(
592 &self,
593 checkpoint: DeliveryCheckpoint,
594 ) -> Vec<QueuedUserMessage> {
595 match checkpoint {
596 DeliveryCheckpoint::InterruptImmediate => {
597 self.take_queued_user_messages(true, false, false).await
598 }
599 DeliveryCheckpoint::AfterCurrentOperation => {
600 self.take_queued_user_messages(false, true, false).await
601 }
602 DeliveryCheckpoint::EndOfInteraction => {
603 self.take_queued_user_messages(false, false, true).await
604 }
605 }
606 }
607
608 pub fn send_output(&self, text: &str) {
610 self.notify("output", serde_json::json!({"text": text}));
611 }
612
613 pub fn send_progress(
615 &self,
616 phase: &str,
617 message: &str,
618 progress: Option<i64>,
619 total: Option<i64>,
620 data: Option<serde_json::Value>,
621 ) {
622 let mut payload = serde_json::json!({"phase": phase, "message": message});
623 if let Some(p) = progress {
624 payload["progress"] = serde_json::json!(p);
625 }
626 if let Some(t) = total {
627 payload["total"] = serde_json::json!(t);
628 }
629 if let Some(d) = data {
630 payload["data"] = d;
631 }
632 self.notify("progress", payload);
633 }
634
635 pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
637 let mut payload = serde_json::json!({"level": level, "message": message});
638 if let Some(f) = fields {
639 payload["fields"] = f;
640 }
641 self.notify("log", payload);
642 }
643
644 pub fn send_call_start(
647 &self,
648 call_id: &str,
649 call_type: &str,
650 name: &str,
651 metadata: serde_json::Value,
652 ) {
653 let session_id = self.get_session_id();
654 let script = self.get_script_name();
655 let stream_publicly = metadata
656 .get("stream_publicly")
657 .and_then(|value| value.as_bool())
658 .unwrap_or(true);
659 self.visible_call_streams
660 .lock()
661 .unwrap_or_else(|e| e.into_inner())
662 .insert(call_id.to_string(), stream_publicly);
663 self.notify(
664 "session/update",
665 serde_json::json!({
666 "sessionId": session_id,
667 "update": {
668 "sessionUpdate": "call_start",
669 "content": {
670 "toolCallId": call_id,
671 "call_type": call_type,
672 "name": name,
673 "script": script,
674 "metadata": metadata,
675 },
676 },
677 }),
678 );
679 }
680
681 pub fn send_call_progress(
684 &self,
685 call_id: &str,
686 delta: &str,
687 accumulated_tokens: u64,
688 user_visible: bool,
689 ) {
690 let session_id = self.get_session_id();
691 let (visible_text, visible_delta) = {
692 let stream_publicly = self
693 .visible_call_streams
694 .lock()
695 .unwrap_or_else(|e| e.into_inner())
696 .get(call_id)
697 .copied()
698 .unwrap_or(true);
699 let mut states = self
700 .visible_call_states
701 .lock()
702 .unwrap_or_else(|e| e.into_inner());
703 let state = states.entry(call_id.to_string()).or_default();
704 state.push(delta, stream_publicly)
705 };
706 self.notify(
707 "session/update",
708 serde_json::json!({
709 "sessionId": session_id,
710 "update": {
711 "sessionUpdate": "call_progress",
712 "content": {
713 "toolCallId": call_id,
714 "delta": delta,
715 "accumulated_tokens": accumulated_tokens,
716 "visible_text": visible_text,
717 "visible_delta": visible_delta,
718 "user_visible": user_visible,
719 },
720 },
721 }),
722 );
723 }
724
725 pub fn send_call_end(
727 &self,
728 call_id: &str,
729 call_type: &str,
730 name: &str,
731 duration_ms: u64,
732 status: &str,
733 metadata: serde_json::Value,
734 ) {
735 let session_id = self.get_session_id();
736 let script = self.get_script_name();
737 self.visible_call_states
738 .lock()
739 .unwrap_or_else(|e| e.into_inner())
740 .remove(call_id);
741 self.visible_call_streams
742 .lock()
743 .unwrap_or_else(|e| e.into_inner())
744 .remove(call_id);
745 self.notify(
746 "session/update",
747 serde_json::json!({
748 "sessionId": session_id,
749 "update": {
750 "sessionUpdate": "call_end",
751 "content": {
752 "toolCallId": call_id,
753 "call_type": call_type,
754 "name": name,
755 "script": script,
756 "duration_ms": duration_ms,
757 "status": status,
758 "metadata": metadata,
759 },
760 },
761 }),
762 );
763 }
764
765 pub fn send_worker_update(
767 &self,
768 worker_id: &str,
769 worker_name: &str,
770 status: &str,
771 metadata: serde_json::Value,
772 audit: Option<&MutationSessionRecord>,
773 ) {
774 let session_id = self.get_session_id();
775 let script = self.get_script_name();
776 let started_at = metadata.get("started_at").cloned().unwrap_or_default();
777 let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
778 let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
779 let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
780 let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
781 let lifecycle = serde_json::json!({
782 "event": status,
783 "worker_id": worker_id,
784 "worker_name": worker_name,
785 "started_at": started_at,
786 "finished_at": finished_at,
787 });
788 self.notify(
789 "session/update",
790 serde_json::json!({
791 "sessionId": session_id,
792 "update": {
793 "sessionUpdate": "worker_update",
794 "content": {
795 "worker_id": worker_id,
796 "worker_name": worker_name,
797 "status": status,
798 "script": script,
799 "started_at": started_at,
800 "finished_at": finished_at,
801 "snapshot_path": snapshot_path,
802 "run_id": run_id,
803 "run_path": run_path,
804 "lifecycle": lifecycle,
805 "audit": audit,
806 "metadata": metadata,
807 },
808 },
809 }),
810 );
811 }
812}
813
814pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
816 crate::stdlib::json_to_vm_value(val)
817}
818
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a stdio bridge with fresh state and no stdin reader attached.
    fn test_bridge() -> HostBridge {
        HostBridge::from_parts(
            Arc::new(Mutex::new(HashMap::new())),
            Arc::new(AtomicBool::new(false)),
            Arc::new(std::sync::Mutex::new(())),
            1,
        )
    }

    /// Runs `future` to completion on a single-threaded tokio runtime.
    fn block_on<F: std::future::Future>(future: F) -> F::Output {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(future)
    }

    #[test]
    fn test_json_rpc_request_format() {
        let request = crate::jsonrpc::request(
            1,
            "llm_call",
            serde_json::json!({
                "prompt": "Hello",
                "system": "Be helpful",
            }),
        );
        let s = serde_json::to_string(&request).unwrap();
        assert!(s.contains("\"jsonrpc\":\"2.0\""));
        assert!(s.contains("\"id\":1"));
        assert!(s.contains("\"method\":\"llm_call\""));
    }

    #[test]
    fn test_json_rpc_notification_format() {
        let notification =
            crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
        let s = serde_json::to_string(&notification).unwrap();
        assert!(s.contains("\"method\":\"output\""));
        assert!(!s.contains("\"id\""));
    }

    #[test]
    fn test_json_rpc_error_response_parsing() {
        let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
        assert!(response.get("error").is_some());
        assert_eq!(
            response["error"]["message"].as_str().unwrap(),
            "Invalid request"
        );
    }

    #[test]
    fn test_json_rpc_success_response_parsing() {
        let response = crate::jsonrpc::response(
            1,
            serde_json::json!({
                "text": "Hello world",
                "input_tokens": 10,
                "output_tokens": 5,
            }),
        );
        assert!(response.get("result").is_some());
        assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
    }

    #[test]
    fn test_cancelled_flag() {
        // Exercise the bridge itself: it must observe the shared flag.
        let cancelled = Arc::new(AtomicBool::new(false));
        let bridge = HostBridge::from_parts(
            Arc::new(Mutex::new(HashMap::new())),
            Arc::clone(&cancelled),
            Arc::new(std::sync::Mutex::new(())),
            1,
        );
        assert!(!bridge.is_cancelled());
        cancelled.store(true, Ordering::SeqCst);
        assert!(bridge.is_cancelled());
    }

    #[test]
    fn resume_and_skills_reload_signals_are_consumed_once() {
        let bridge = test_bridge();

        assert!(!bridge.take_resume_signal());
        bridge.signal_resume();
        assert!(bridge.take_resume_signal());
        assert!(!bridge.take_resume_signal());

        assert!(!bridge.take_skills_reload_signal());
        bridge.signal_skills_reload();
        assert!(bridge.take_skills_reload_signal());
        assert!(!bridge.take_skills_reload_signal());
    }

    #[test]
    fn queued_message_mode_parses_aliases_and_defaults() {
        use QueuedUserMessageMode::{FinishStep, InterruptImmediate, WaitForCompletion};

        assert_eq!(
            QueuedUserMessageMode::from_str("interrupt_immediate"),
            InterruptImmediate
        );
        assert_eq!(QueuedUserMessageMode::from_str("interrupt"), InterruptImmediate);
        assert_eq!(QueuedUserMessageMode::from_str("finish_step"), FinishStep);
        assert_eq!(
            QueuedUserMessageMode::from_str("after_current_operation"),
            FinishStep
        );
        assert_eq!(
            QueuedUserMessageMode::from_str("wait_for_completion"),
            WaitForCompletion
        );
        assert_eq!(QueuedUserMessageMode::from_str("bogus"), WaitForCompletion);
    }

    #[test]
    fn queued_messages_are_filtered_by_delivery_mode() {
        block_on(async {
            let bridge = test_bridge();
            bridge
                .push_queued_user_message("first".to_string(), "finish_step")
                .await;
            bridge
                .push_queued_user_message("second".to_string(), "wait_for_completion")
                .await;

            let finish_step = bridge.take_queued_user_messages(false, true, false).await;
            assert_eq!(finish_step.len(), 1);
            assert_eq!(finish_step[0].content, "first");

            let turn_end = bridge.take_queued_user_messages(false, false, true).await;
            assert_eq!(turn_end.len(), 1);
            assert_eq!(turn_end[0].content, "second");

            // Everything has been delivered; nothing may be left behind.
            assert!(bridge
                .take_queued_user_messages(true, true, true)
                .await
                .is_empty());
        });
    }

    #[test]
    fn queued_messages_are_delivered_per_checkpoint_in_order() {
        block_on(async {
            let bridge = test_bridge();
            for (content, mode) in [
                ("a", "interrupt"),
                ("b", "finish_step"),
                ("c", "interrupt_immediate"),
                ("d", "wait_for_completion"),
            ] {
                bridge
                    .push_queued_user_message(content.to_string(), mode)
                    .await;
            }

            let contents = |messages: Vec<QueuedUserMessage>| -> Vec<String> {
                messages.into_iter().map(|m| m.content).collect()
            };

            let interrupts = bridge
                .take_queued_user_messages_for(DeliveryCheckpoint::InterruptImmediate)
                .await;
            assert_eq!(contents(interrupts), ["a", "c"]);

            let after_op = bridge
                .take_queued_user_messages_for(DeliveryCheckpoint::AfterCurrentOperation)
                .await;
            assert_eq!(contents(after_op), ["b"]);

            let at_end = bridge
                .take_queued_user_messages_for(DeliveryCheckpoint::EndOfInteraction)
                .await;
            assert_eq!(contents(at_end), ["d"]);
        });
    }

    #[test]
    fn test_json_result_to_vm_value_string() {
        let val = serde_json::json!("hello");
        let vm_val = json_result_to_vm_value(&val);
        assert_eq!(vm_val.display(), "hello");
    }

    #[test]
    fn test_json_result_to_vm_value_dict() {
        let val = serde_json::json!({"name": "test", "count": 42});
        let vm_val = json_result_to_vm_value(&val);
        let VmValue::Dict(d) = &vm_val else {
            panic!("Expected Dict, got {:?}", vm_val);
        };
        assert_eq!(d.get("name").unwrap().display(), "test");
        assert_eq!(d.get("count").unwrap().display(), "42");
    }

    #[test]
    fn test_json_result_to_vm_value_null() {
        let val = serde_json::json!(null);
        let vm_val = json_result_to_vm_value(&val);
        assert!(matches!(vm_val, VmValue::Nil));
    }

    #[test]
    fn test_json_result_to_vm_value_nested() {
        let val = serde_json::json!({
            "text": "response",
            "tool_calls": [
                {"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
            ],
            "input_tokens": 100,
            "output_tokens": 50,
        });
        let vm_val = json_result_to_vm_value(&val);
        let VmValue::Dict(d) = &vm_val else {
            panic!("Expected Dict, got {:?}", vm_val);
        };
        assert_eq!(d.get("text").unwrap().display(), "response");
        let VmValue::List(list) = d.get("tool_calls").unwrap() else {
            panic!("Expected List for tool_calls");
        };
        assert_eq!(list.len(), 1);
    }

    #[test]
    fn test_timeout_duration() {
        assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
    }
}