use std::collections::{BTreeMap, HashMap, VecDeque};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::io::AsyncBufReadExt;
use tokio::sync::{oneshot, Mutex, Notify};
use harn_parser::diagnostic_codes::Code;
use crate::orchestration::MutationSessionRecord;
use crate::value::{ErrorCategory, VmClosure, VmError, VmValue};
use crate::visible_text::VisibleTextState;
use crate::vm::Vm;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
pub type HostBridgeWriter = Arc<dyn Fn(&str) -> Result<(), String> + Send + Sync>;
fn stdout_writer(stdout_lock: Arc<std::sync::Mutex<()>>) -> HostBridgeWriter {
Arc::new(move |line: &str| {
let _guard = stdout_lock.lock().unwrap_or_else(|e| e.into_inner());
let mut stdout = std::io::stdout().lock();
stdout
.write_all(line.as_bytes())
.map_err(|e| format!("Bridge write error: {e}"))?;
stdout
.write_all(b"\n")
.map_err(|e| format!("Bridge write error: {e}"))?;
stdout
.flush()
.map_err(|e| format!("Bridge flush error: {e}"))?;
Ok(())
})
}
pub struct HostBridge {
next_id: AtomicU64,
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
cancelled: Arc<AtomicBool>,
cancel_notify: Arc<Notify>,
writer: HostBridgeWriter,
session_id: std::sync::Mutex<String>,
script_name: std::sync::Mutex<String>,
queued_transcript_injections: Arc<Mutex<VecDeque<QueuedTranscriptInjection>>>,
resume_requested: Arc<AtomicBool>,
skills_reload_requested: Arc<AtomicBool>,
daemon_idle: Arc<AtomicBool>,
prompt_stop_reason: std::sync::Mutex<Option<String>>,
visible_call_states: std::sync::Mutex<HashMap<String, VisibleTextState>>,
visible_call_streams: std::sync::Mutex<HashMap<String, bool>>,
in_process: Option<InProcessHost>,
}
struct InProcessHost {
module_path: PathBuf,
exported_functions: BTreeMap<String, Rc<VmClosure>>,
vm: Vm,
}
impl InProcessHost {
async fn dispatch(
&self,
method: &str,
params: serde_json::Value,
) -> Result<serde_json::Value, VmError> {
match method {
"builtin_call" => {
let name = params
.get("name")
.and_then(|value| value.as_str())
.unwrap_or_default();
let args = params
.get("args")
.and_then(|value| value.as_array())
.cloned()
.unwrap_or_default()
.into_iter()
.map(|value| json_result_to_vm_value(&value))
.collect::<Vec<_>>();
self.invoke_export(name, &args).await
}
"host/tools/list" => self
.invoke_optional_export("host_tools_list", &[])
.await
.map(|value| value.unwrap_or_else(|| serde_json::json!({ "tools": [] }))),
"session/request_permission" => self.request_permission(params).await,
other => Err(VmError::Runtime(format!(
"playground host backend does not implement bridge method '{other}'"
))),
}
}
async fn invoke_export(
&self,
name: &str,
args: &[VmValue],
) -> Result<serde_json::Value, VmError> {
let Some(closure) = self.exported_functions.get(name) else {
return Err(VmError::Runtime(format!(
"Playground host is missing capability '{name}'. Define `pub fn {name}(...)` in {}",
self.module_path.display()
)));
};
let mut vm = self.vm.child_vm_for_host();
let result = vm.call_closure_pub(closure, args).await?;
Ok(crate::llm::vm_value_to_json(&result))
}
async fn invoke_optional_export(
&self,
name: &str,
args: &[VmValue],
) -> Result<Option<serde_json::Value>, VmError> {
if !self.exported_functions.contains_key(name) {
return Ok(None);
}
self.invoke_export(name, args).await.map(Some)
}
async fn request_permission(
&self,
params: serde_json::Value,
) -> Result<serde_json::Value, VmError> {
let Some(closure) = self.exported_functions.get("request_permission") else {
return Ok(serde_json::json!({ "granted": true }));
};
let tool_name = params
.get("toolCall")
.and_then(|tool_call| tool_call.get("toolName"))
.and_then(|value| value.as_str())
.unwrap_or_default();
let tool_args = params
.get("toolCall")
.and_then(|tool_call| tool_call.get("rawInput"))
.map(json_result_to_vm_value)
.unwrap_or(VmValue::Nil);
let full_payload = json_result_to_vm_value(¶ms);
let arg_count = closure.func.params.len();
let args = if arg_count >= 3 {
vec![
VmValue::String(Rc::from(tool_name.to_string())),
tool_args,
full_payload,
]
} else if arg_count == 2 {
vec![VmValue::String(Rc::from(tool_name.to_string())), tool_args]
} else if arg_count == 1 {
vec![full_payload]
} else {
Vec::new()
};
let mut vm = self.vm.child_vm_for_host();
let result = vm.call_closure_pub(closure, &args).await?;
let payload = match result {
VmValue::Bool(granted) => serde_json::json!({ "granted": granted }),
VmValue::String(reason) if !reason.is_empty() => {
serde_json::json!({ "granted": false, "reason": reason.to_string() })
}
other => {
let json = crate::llm::vm_value_to_json(&other);
if json
.get("granted")
.and_then(|value| value.as_bool())
.is_some()
|| json.get("outcome").is_some()
{
json
} else {
serde_json::json!({ "granted": other.is_truthy() })
}
}
};
Ok(payload)
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum QueuedUserMessageMode {
InterruptImmediate,
FinishStep,
WaitForCompletion,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DeliveryCheckpoint {
InterruptImmediate,
AfterCurrentOperation,
EndOfInteraction,
}
impl QueuedUserMessageMode {
fn from_str(value: &str) -> Self {
match value {
"interrupt_immediate" | "interrupt" => Self::InterruptImmediate,
"finish_step" | "after_current_operation" => Self::FinishStep,
_ => Self::WaitForCompletion,
}
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedUserMessage {
pub content: String,
pub mode: QueuedUserMessageMode,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct QueuedReminder {
pub reminder: crate::llm::helpers::SystemReminder,
pub mode: QueuedUserMessageMode,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum QueuedTranscriptInjection {
User(QueuedUserMessage),
Reminder(QueuedReminder),
}
impl QueuedTranscriptInjection {
fn mode(&self) -> QueuedUserMessageMode {
match self {
Self::User(message) => message.mode,
Self::Reminder(reminder) => reminder.mode,
}
}
}
fn queue_user_message_from_params(params: &serde_json::Value) -> Option<QueuedUserMessage> {
let content = params
.get("content")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
if content.is_empty() {
return None;
}
let mode = QueuedUserMessageMode::from_str(
params
.get("mode")
.and_then(|v| v.as_str())
.unwrap_or("wait_for_completion"),
);
Some(QueuedUserMessage { content, mode })
}
fn reminder_unknown_option_error(message: impl AsRef<str>) -> String {
format!(
"{}: {}",
Code::ReminderUnknownOption.as_str(),
message.as_ref()
)
}
fn session_remind_shape_error(message: impl AsRef<str>) -> String {
format!(
"{}: {}",
Code::ReminderInvalidShape.as_str(),
message.as_ref()
)
}
fn string_field(
map: &serde_json::Map<String, serde_json::Value>,
key: &str,
required: bool,
) -> Result<Option<String>, String> {
match map.get(key) {
None | Some(serde_json::Value::Null) if required => Err(session_remind_shape_error(
format!("`{key}` must be a non-empty string"),
)),
None | Some(serde_json::Value::Null) => Ok(None),
Some(serde_json::Value::String(value)) if required && value.trim().is_empty() => Err(
session_remind_shape_error(format!("`{key}` must be a non-empty string")),
),
Some(serde_json::Value::String(value)) => {
let trimmed = value.trim();
if trimmed.is_empty() {
Ok(None)
} else {
Ok(Some(trimmed.to_string()))
}
}
Some(other) => Err(session_remind_shape_error(format!(
"`{key}` must be a string, got {other}"
))),
}
}
fn bool_field(
map: &serde_json::Map<String, serde_json::Value>,
key: &str,
) -> Result<Option<bool>, String> {
match map.get(key) {
None | Some(serde_json::Value::Null) => Ok(None),
Some(serde_json::Value::Bool(value)) => Ok(Some(*value)),
Some(other) => Err(session_remind_shape_error(format!(
"`{key}` must be a bool, got {other}"
))),
}
}
fn int_field(
map: &serde_json::Map<String, serde_json::Value>,
key: &str,
) -> Result<Option<i64>, String> {
match map.get(key) {
None | Some(serde_json::Value::Null) => Ok(None),
Some(serde_json::Value::Number(value)) => {
let Some(value) = value.as_i64() else {
return Err(session_remind_shape_error(format!(
"`{key}` must be an integer"
)));
};
Ok(Some(value))
}
Some(other) => Err(session_remind_shape_error(format!(
"`{key}` must be an int, got {other}"
))),
}
}
fn tags_field(map: &serde_json::Map<String, serde_json::Value>) -> Result<Vec<String>, String> {
let Some(value) = map.get("tags") else {
return Ok(Vec::new());
};
if value.is_null() {
return Ok(Vec::new());
}
let Some(values) = value.as_array() else {
return Err(session_remind_shape_error("`tags` must be a list"));
};
let mut tags = Vec::new();
for value in values {
let Some(tag) = value.as_str() else {
return Err(session_remind_shape_error(format!(
"`tags` entries must be strings, got {value}"
)));
};
let tag = tag.trim();
if tag.is_empty() {
return Err(session_remind_shape_error(
"`tags` entries must be non-empty strings",
));
}
if !tags.iter().any(|existing| existing == tag) {
tags.push(tag.to_string());
}
}
Ok(tags)
}
fn session_remind_payload_from_value(
value: &serde_json::Value,
) -> Result<crate::llm::helpers::SystemReminder, String> {
let Some(map) = value.as_object() else {
return Err(session_remind_shape_error(
"session/remind payload must be a reminder object",
));
};
const ALLOWED: &[&str] = &[
"_meta",
"body",
"dedupe_key",
"fired_at_turn",
"id",
"preserve_on_compact",
"propagate",
"role_hint",
"source",
"tags",
"ttl_turns",
];
let unknown = map
.keys()
.filter(|key| !ALLOWED.contains(&key.as_str()))
.map(String::as_str)
.collect::<Vec<_>>();
if !unknown.is_empty() {
if unknown.contains(&"content") {
return Err(session_remind_shape_error(
"session/remind expects reminder `body`, not user-message `content`",
));
}
return Err(reminder_unknown_option_error(format!(
"unknown reminder option(s): {}",
unknown.join(", ")
)));
}
if let Some(meta) = map.get("_meta") {
if !meta.is_null() && !meta.is_object() {
return Err(session_remind_shape_error("`_meta` must be an object"));
}
}
let ttl_turns = int_field(map, "ttl_turns")?;
if let Some(value) = ttl_turns {
if value <= 0 {
return Err(session_remind_shape_error("`ttl_turns` must be > 0"));
}
}
let fired_at_turn = int_field(map, "fired_at_turn")?.unwrap_or(0);
if fired_at_turn < 0 {
return Err(session_remind_shape_error(
"`fired_at_turn` must be >= 0 when provided",
));
}
match string_field(map, "source", false)?.as_deref() {
None | Some("bridge") => {}
Some(_) => {
return Err(session_remind_shape_error(
"`source` for session/remind must be bridge when provided",
))
}
}
let propagate = match string_field(map, "propagate", false)?.as_deref() {
None => crate::llm::helpers::ReminderPropagate::Session,
Some("all") => crate::llm::helpers::ReminderPropagate::All,
Some("session") => crate::llm::helpers::ReminderPropagate::Session,
Some("none") => crate::llm::helpers::ReminderPropagate::None,
Some(_) => {
return Err(session_remind_shape_error(
"`propagate` must be one of all, session, or none",
))
}
};
let role_hint = match string_field(map, "role_hint", false)?.as_deref() {
None => crate::llm::helpers::ReminderRoleHint::System,
Some("system") => crate::llm::helpers::ReminderRoleHint::System,
Some("developer") => crate::llm::helpers::ReminderRoleHint::Developer,
Some("user_block") => crate::llm::helpers::ReminderRoleHint::UserBlock,
Some("ephemeral_cache") => crate::llm::helpers::ReminderRoleHint::EphemeralCache,
Some(_) => {
return Err(session_remind_shape_error(
"`role_hint` must be one of system, developer, user_block, or ephemeral_cache",
))
}
};
Ok(crate::llm::helpers::SystemReminder {
id: string_field(map, "id", false)?.unwrap_or_else(|| uuid::Uuid::now_v7().to_string()),
tags: tags_field(map)?,
dedupe_key: string_field(map, "dedupe_key", false)?,
ttl_turns,
preserve_on_compact: bool_field(map, "preserve_on_compact")?.unwrap_or(false),
propagate,
role_hint,
source: crate::llm::helpers::ReminderSource::Bridge,
body: string_field(map, "body", true)?.unwrap_or_default(),
fired_at_turn,
originating_agent_id: None,
})
}
fn queued_session_remind_from_params(params: &serde_json::Value) -> Result<QueuedReminder, String> {
let mode = QueuedUserMessageMode::from_str(
params
.get("mode")
.and_then(|value| value.as_str())
.unwrap_or("wait_for_completion"),
);
let reminder_value = if let Some(reminder) = params.get("reminder") {
reminder.clone()
} else {
let Some(params) = params.as_object() else {
return Err(session_remind_shape_error(
"session/remind params must be an object",
));
};
let mut reminder = params.clone();
reminder.remove("mode");
reminder.remove("sessionId");
reminder.remove("session_id");
serde_json::Value::Object(reminder)
};
Ok(QueuedReminder {
reminder: session_remind_payload_from_value(&reminder_value)?,
mode,
})
}
#[allow(clippy::new_without_default)]
impl HostBridge {
pub fn new() -> Self {
let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>> =
Arc::new(Mutex::new(HashMap::new()));
let cancelled = Arc::new(AtomicBool::new(false));
let cancel_notify = Arc::new(Notify::new());
let queued_transcript_injections: Arc<Mutex<VecDeque<QueuedTranscriptInjection>>> =
Arc::new(Mutex::new(VecDeque::new()));
let resume_requested = Arc::new(AtomicBool::new(false));
let skills_reload_requested = Arc::new(AtomicBool::new(false));
let daemon_idle = Arc::new(AtomicBool::new(false));
let pending_clone = pending.clone();
let cancelled_clone = cancelled.clone();
let cancel_notify_clone = cancel_notify.clone();
let queued_clone = queued_transcript_injections.clone();
let resume_clone = resume_requested.clone();
let skills_reload_clone = skills_reload_requested.clone();
tokio::task::spawn_local(async move {
let stdin = tokio::io::stdin();
let reader = tokio::io::BufReader::new(stdin);
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
let line = line.trim().to_string();
if line.is_empty() {
continue;
}
let msg: serde_json::Value = match serde_json::from_str(&line) {
Ok(v) => v,
Err(_) => continue,
};
if msg.get("id").is_none() {
if let Some(method) = msg["method"].as_str() {
if method == "cancel" {
cancelled_clone.store(true, Ordering::SeqCst);
cancel_notify_clone.notify_waiters();
} else if method == "agent/resume" {
resume_clone.store(true, Ordering::SeqCst);
} else if method == "skills/update" {
skills_reload_clone.store(true, Ordering::SeqCst);
} else if method == "user_message"
|| method == "session/input"
|| method == "agent/user_message"
{
let params = &msg["params"];
if let Some(message) = queue_user_message_from_params(params) {
queued_clone
.lock()
.await
.push_back(QueuedTranscriptInjection::User(message));
}
} else if method == "session/remind" {
let params = &msg["params"];
if let Ok(reminder) = queued_session_remind_from_params(params) {
queued_clone
.lock()
.await
.push_back(QueuedTranscriptInjection::Reminder(reminder));
}
}
}
continue;
}
if let Some(id) = msg["id"].as_u64() {
let mut pending = pending_clone.lock().await;
if let Some(sender) = pending.remove(&id) {
let _ = sender.send(msg);
}
}
}
let mut pending = pending_clone.lock().await;
pending.clear();
});
Self {
next_id: AtomicU64::new(1),
pending,
cancelled,
cancel_notify,
writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
session_id: std::sync::Mutex::new(String::new()),
script_name: std::sync::Mutex::new(String::new()),
queued_transcript_injections,
resume_requested,
skills_reload_requested,
daemon_idle,
prompt_stop_reason: std::sync::Mutex::new(None),
visible_call_states: std::sync::Mutex::new(HashMap::new()),
visible_call_streams: std::sync::Mutex::new(HashMap::new()),
in_process: None,
}
}
pub fn from_parts(
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
cancelled: Arc<AtomicBool>,
stdout_lock: Arc<std::sync::Mutex<()>>,
start_id: u64,
) -> Self {
Self::from_parts_with_writer(pending, cancelled, stdout_writer(stdout_lock), start_id)
}
pub fn from_parts_with_writer(
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
cancelled: Arc<AtomicBool>,
writer: HostBridgeWriter,
start_id: u64,
) -> Self {
Self::from_parts_with_writer_and_cancel_notify(
pending,
cancelled,
Arc::new(Notify::new()),
writer,
start_id,
)
}
pub fn from_parts_with_writer_and_cancel_notify(
pending: Arc<Mutex<HashMap<u64, oneshot::Sender<serde_json::Value>>>>,
cancelled: Arc<AtomicBool>,
cancel_notify: Arc<Notify>,
writer: HostBridgeWriter,
start_id: u64,
) -> Self {
Self {
next_id: AtomicU64::new(start_id),
pending,
cancelled,
cancel_notify,
writer,
session_id: std::sync::Mutex::new(String::new()),
script_name: std::sync::Mutex::new(String::new()),
queued_transcript_injections: Arc::new(Mutex::new(VecDeque::new())),
resume_requested: Arc::new(AtomicBool::new(false)),
skills_reload_requested: Arc::new(AtomicBool::new(false)),
daemon_idle: Arc::new(AtomicBool::new(false)),
prompt_stop_reason: std::sync::Mutex::new(None),
visible_call_states: std::sync::Mutex::new(HashMap::new()),
visible_call_streams: std::sync::Mutex::new(HashMap::new()),
in_process: None,
}
}
pub async fn from_harn_module(mut vm: Vm, module_path: &Path) -> Result<Self, VmError> {
let exported_functions = vm.load_module_exports(module_path).await?;
Ok(Self {
next_id: AtomicU64::new(1),
pending: Arc::new(Mutex::new(HashMap::new())),
cancelled: Arc::new(AtomicBool::new(false)),
cancel_notify: Arc::new(Notify::new()),
writer: stdout_writer(Arc::new(std::sync::Mutex::new(()))),
session_id: std::sync::Mutex::new(String::new()),
script_name: std::sync::Mutex::new(String::new()),
queued_transcript_injections: Arc::new(Mutex::new(VecDeque::new())),
resume_requested: Arc::new(AtomicBool::new(false)),
skills_reload_requested: Arc::new(AtomicBool::new(false)),
daemon_idle: Arc::new(AtomicBool::new(false)),
prompt_stop_reason: std::sync::Mutex::new(None),
visible_call_states: std::sync::Mutex::new(HashMap::new()),
visible_call_streams: std::sync::Mutex::new(HashMap::new()),
in_process: Some(InProcessHost {
module_path: module_path.to_path_buf(),
exported_functions,
vm,
}),
})
}
pub fn set_session_id(&self, id: &str) {
*self.session_id.lock().unwrap_or_else(|e| e.into_inner()) = id.to_string();
}
pub fn set_script_name(&self, name: &str) {
*self.script_name.lock().unwrap_or_else(|e| e.into_inner()) = name.to_string();
}
fn get_script_name(&self) -> String {
self.script_name
.lock()
.unwrap_or_else(|e| e.into_inner())
.clone()
}
pub fn get_session_id(&self) -> String {
self.session_id
.lock()
.unwrap_or_else(|e| e.into_inner())
.clone()
}
fn write_line(&self, line: &str) -> Result<(), VmError> {
(self.writer)(line).map_err(VmError::Runtime)
}
pub async fn call(
&self,
method: &str,
params: serde_json::Value,
) -> Result<serde_json::Value, VmError> {
if let Some(in_process) = &self.in_process {
return in_process.dispatch(method, params).await;
}
if self.is_cancelled() {
return Err(VmError::Runtime("Bridge: operation cancelled".into()));
}
let id = self.next_id.fetch_add(1, Ordering::SeqCst);
let cancel_wait = self.cancel_notify.notified();
tokio::pin!(cancel_wait);
let request = crate::jsonrpc::request(id, method, params);
let (tx, rx) = oneshot::channel();
{
let mut pending = self.pending.lock().await;
pending.insert(id, tx);
}
let line = serde_json::to_string(&request)
.map_err(|e| VmError::Runtime(format!("Bridge serialization error: {e}")))?;
if let Err(e) = self.write_line(&line) {
let mut pending = self.pending.lock().await;
pending.remove(&id);
return Err(e);
}
if self.is_cancelled() {
let mut pending = self.pending.lock().await;
pending.remove(&id);
return Err(VmError::Runtime("Bridge: operation cancelled".into()));
}
let response = tokio::select! {
result = rx => match result {
Ok(msg) => msg,
Err(_) => {
return Err(VmError::Runtime(
"Bridge: host closed connection before responding".into(),
));
}
},
_ = &mut cancel_wait => {
let mut pending = self.pending.lock().await;
pending.remove(&id);
return Err(VmError::Runtime("Bridge: operation cancelled".into()));
}
_ = tokio::time::sleep(DEFAULT_TIMEOUT) => {
let mut pending = self.pending.lock().await;
pending.remove(&id);
return Err(VmError::Runtime(format!(
"Bridge: host did not respond to '{method}' within {}s",
DEFAULT_TIMEOUT.as_secs()
)));
}
};
if let Some(error) = response.get("error") {
let message = error["message"].as_str().unwrap_or("Unknown host error");
let code = error["code"].as_i64().unwrap_or(-1);
if code == -32001 {
return Err(VmError::CategorizedError {
message: message.to_string(),
category: ErrorCategory::ToolRejected,
});
}
return Err(VmError::Runtime(format!("Host error ({code}): {message}")));
}
Ok(response["result"].clone())
}
pub fn notify(&self, method: &str, params: serde_json::Value) {
let notification = crate::jsonrpc::notification(method, params);
if self.in_process.is_some() {
return;
}
if let Ok(line) = serde_json::to_string(¬ification) {
let _ = self.write_line(&line);
}
}
pub fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::SeqCst)
}
pub fn take_resume_signal(&self) -> bool {
self.resume_requested.swap(false, Ordering::SeqCst)
}
pub fn signal_resume(&self) {
self.resume_requested.store(true, Ordering::SeqCst);
}
pub fn set_daemon_idle(&self, idle: bool) {
self.daemon_idle.store(idle, Ordering::SeqCst);
}
pub fn is_daemon_idle(&self) -> bool {
self.daemon_idle.load(Ordering::SeqCst)
}
pub fn set_prompt_stop_reason(&self, reason: &str) {
*self
.prompt_stop_reason
.lock()
.unwrap_or_else(|e| e.into_inner()) = Some(reason.to_string());
}
pub fn take_prompt_stop_reason(&self) -> Option<String> {
self.prompt_stop_reason
.lock()
.unwrap_or_else(|e| e.into_inner())
.take()
}
pub fn take_skills_reload_signal(&self) -> bool {
self.skills_reload_requested.swap(false, Ordering::SeqCst)
}
pub fn signal_skills_reload(&self) {
self.skills_reload_requested.store(true, Ordering::SeqCst);
}
pub async fn list_host_skills(&self) -> Result<Vec<serde_json::Value>, VmError> {
let result = self.call("skills/list", serde_json::json!({})).await?;
match result {
serde_json::Value::Array(items) => Ok(items),
serde_json::Value::Object(map) => match map.get("skills") {
Some(serde_json::Value::Array(items)) => Ok(items.clone()),
_ => Err(VmError::Runtime(
"skills/list: host response must be an array or { skills: [...] }".into(),
)),
},
_ => Err(VmError::Runtime(
"skills/list: unexpected response shape".into(),
)),
}
}
pub async fn list_host_tools(&self) -> Result<Vec<serde_json::Value>, VmError> {
let result = self.call("host/tools/list", serde_json::json!({})).await?;
parse_host_tools_list_response(result)
}
pub async fn fetch_host_skill(&self, id: &str) -> Result<serde_json::Value, VmError> {
self.call("skills/fetch", serde_json::json!({ "id": id }))
.await
}
pub async fn push_queued_user_message(&self, content: String, mode: &str) {
self.queued_transcript_injections
.lock()
.await
.push_back(QueuedTranscriptInjection::User(QueuedUserMessage {
content,
mode: QueuedUserMessageMode::from_str(mode),
}));
}
pub async fn push_queued_session_remind_from_params(
&self,
params: &serde_json::Value,
) -> Result<String, String> {
let reminder = queued_session_remind_from_params(params)?;
let reminder_id = reminder.reminder.id.clone();
self.queued_transcript_injections
.lock()
.await
.push_back(QueuedTranscriptInjection::Reminder(reminder));
Ok(reminder_id)
}
pub async fn take_queued_user_messages(
&self,
include_interrupt_immediate: bool,
include_finish_step: bool,
include_wait_for_completion: bool,
) -> Vec<QueuedUserMessage> {
let mut queue = self.queued_transcript_injections.lock().await;
let mut selected = Vec::new();
let mut retained = VecDeque::new();
while let Some(injection) = queue.pop_front() {
let should_take = match injection.mode() {
QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
QueuedUserMessageMode::FinishStep => include_finish_step,
QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
};
match (should_take, injection) {
(true, QueuedTranscriptInjection::User(message)) => selected.push(message),
(_, injection) => retained.push_back(injection),
}
}
*queue = retained;
selected
}
pub async fn take_queued_transcript_injections(
&self,
include_interrupt_immediate: bool,
include_finish_step: bool,
include_wait_for_completion: bool,
) -> Vec<QueuedTranscriptInjection> {
let mut queue = self.queued_transcript_injections.lock().await;
let mut selected = Vec::new();
let mut retained = VecDeque::new();
while let Some(injection) = queue.pop_front() {
let should_take = match injection.mode() {
QueuedUserMessageMode::InterruptImmediate => include_interrupt_immediate,
QueuedUserMessageMode::FinishStep => include_finish_step,
QueuedUserMessageMode::WaitForCompletion => include_wait_for_completion,
};
if should_take {
selected.push(injection);
} else {
retained.push_back(injection);
}
}
*queue = retained;
selected
}
pub async fn take_queued_user_messages_for(
&self,
checkpoint: DeliveryCheckpoint,
) -> Vec<QueuedUserMessage> {
match checkpoint {
DeliveryCheckpoint::InterruptImmediate => {
self.take_queued_user_messages(true, false, false).await
}
DeliveryCheckpoint::AfterCurrentOperation => {
self.take_queued_user_messages(false, true, false).await
}
DeliveryCheckpoint::EndOfInteraction => {
self.take_queued_user_messages(false, false, true).await
}
}
}
pub async fn take_queued_transcript_injections_for(
&self,
checkpoint: DeliveryCheckpoint,
) -> Vec<QueuedTranscriptInjection> {
match checkpoint {
DeliveryCheckpoint::InterruptImmediate => {
self.take_queued_transcript_injections(true, false, false)
.await
}
DeliveryCheckpoint::AfterCurrentOperation => {
self.take_queued_transcript_injections(false, true, false)
.await
}
DeliveryCheckpoint::EndOfInteraction => {
self.take_queued_transcript_injections(false, false, true)
.await
}
}
}
pub fn send_output(&self, text: &str) {
self.notify("output", serde_json::json!({"text": text}));
}
pub fn send_progress(
&self,
phase: &str,
message: &str,
progress: Option<i64>,
total: Option<i64>,
data: Option<serde_json::Value>,
) {
let mut payload = serde_json::json!({"phase": phase, "message": message});
if let Some(p) = progress {
payload["progress"] = serde_json::json!(p);
}
if let Some(t) = total {
payload["total"] = serde_json::json!(t);
}
if let Some(d) = data {
payload["data"] = d;
}
self.notify("progress", payload);
}
pub fn send_log(&self, level: &str, message: &str, fields: Option<serde_json::Value>) {
let mut payload = serde_json::json!({"level": level, "message": message});
if let Some(f) = fields {
payload["fields"] = f;
}
self.notify("log", payload);
}
pub fn send_call_start(
&self,
call_id: &str,
call_type: &str,
name: &str,
metadata: serde_json::Value,
) {
let session_id = self.get_session_id();
let script = self.get_script_name();
let stream_publicly = metadata
.get("stream_publicly")
.and_then(|value| value.as_bool())
.unwrap_or(true);
self.visible_call_streams
.lock()
.unwrap_or_else(|e| e.into_inner())
.insert(call_id.to_string(), stream_publicly);
self.notify(
"session/update",
serde_json::json!({
"sessionId": session_id,
"update": {
"sessionUpdate": "call_start",
"content": {
"toolCallId": call_id,
"call_type": call_type,
"name": name,
"script": script,
"metadata": metadata,
},
},
}),
);
}
pub fn send_call_progress(
&self,
call_id: &str,
delta: &str,
accumulated_tokens: u64,
user_visible: bool,
) {
let session_id = self.get_session_id();
let (visible_text, visible_delta) = {
let stream_publicly = self
.visible_call_streams
.lock()
.unwrap_or_else(|e| e.into_inner())
.get(call_id)
.copied()
.unwrap_or(true);
let mut states = self
.visible_call_states
.lock()
.unwrap_or_else(|e| e.into_inner());
let state = states.entry(call_id.to_string()).or_default();
state.push(delta, stream_publicly)
};
self.notify(
"session/update",
serde_json::json!({
"sessionId": session_id,
"update": {
"sessionUpdate": "call_progress",
"content": {
"toolCallId": call_id,
"delta": delta,
"accumulated_tokens": accumulated_tokens,
"visible_text": visible_text,
"visible_delta": visible_delta,
"user_visible": user_visible,
},
},
}),
);
}
pub fn send_call_end(
&self,
call_id: &str,
call_type: &str,
name: &str,
duration_ms: u64,
status: &str,
metadata: serde_json::Value,
) {
let session_id = self.get_session_id();
let script = self.get_script_name();
self.visible_call_states
.lock()
.unwrap_or_else(|e| e.into_inner())
.remove(call_id);
self.visible_call_streams
.lock()
.unwrap_or_else(|e| e.into_inner())
.remove(call_id);
self.notify(
"session/update",
serde_json::json!({
"sessionId": session_id,
"update": {
"sessionUpdate": "call_end",
"content": {
"toolCallId": call_id,
"call_type": call_type,
"name": name,
"script": script,
"duration_ms": duration_ms,
"status": status,
"metadata": metadata,
},
},
}),
);
}
pub fn send_worker_update(
&self,
worker_id: &str,
worker_name: &str,
status: &str,
metadata: serde_json::Value,
audit: Option<&MutationSessionRecord>,
) {
let session_id = self.get_session_id();
let script = self.get_script_name();
let started_at = metadata.get("started_at").cloned().unwrap_or_default();
let finished_at = metadata.get("finished_at").cloned().unwrap_or_default();
let snapshot_path = metadata.get("snapshot_path").cloned().unwrap_or_default();
let run_id = metadata.get("child_run_id").cloned().unwrap_or_default();
let run_path = metadata.get("child_run_path").cloned().unwrap_or_default();
let lifecycle = serde_json::json!({
"event": status,
"worker_id": worker_id,
"worker_name": worker_name,
"started_at": started_at,
"finished_at": finished_at,
});
self.notify(
"session/update",
serde_json::json!({
"sessionId": session_id,
"update": {
"sessionUpdate": "worker_update",
"content": {
"worker_id": worker_id,
"worker_name": worker_name,
"status": status,
"script": script,
"started_at": started_at,
"finished_at": finished_at,
"snapshot_path": snapshot_path,
"run_id": run_id,
"run_path": run_path,
"lifecycle": lifecycle,
"audit": audit,
"metadata": metadata,
},
},
}),
);
}
}
pub fn json_result_to_vm_value(val: &serde_json::Value) -> VmValue {
crate::stdlib::json_to_vm_value(val)
}
fn parse_host_tools_list_response(
result: serde_json::Value,
) -> Result<Vec<serde_json::Value>, VmError> {
let tools = match result {
serde_json::Value::Array(items) => items,
serde_json::Value::Object(map) => match map.get("tools").cloned().or_else(|| {
map.get("result")
.and_then(|value| value.get("tools"))
.cloned()
}) {
Some(serde_json::Value::Array(items)) => items,
_ => {
return Err(VmError::Runtime(
"host/tools/list: host response must be an array or { tools: [...] }".into(),
));
}
},
_ => {
return Err(VmError::Runtime(
"host/tools/list: unexpected response shape".into(),
));
}
};
let mut normalized = Vec::with_capacity(tools.len());
for tool in tools {
let serde_json::Value::Object(map) = tool else {
return Err(VmError::Runtime(
"host/tools/list: every tool must be an object".into(),
));
};
let Some(name) = map.get("name").and_then(|value| value.as_str()) else {
return Err(VmError::Runtime(
"host/tools/list: every tool must include a string `name`".into(),
));
};
let description = map
.get("description")
.and_then(|value| value.as_str())
.or_else(|| {
map.get("short_description")
.and_then(|value| value.as_str())
})
.unwrap_or_default();
let schema = map
.get("schema")
.cloned()
.or_else(|| map.get("parameters").cloned())
.or_else(|| map.get("input_schema").cloned())
.unwrap_or(serde_json::Value::Null);
let deprecated = map
.get("deprecated")
.and_then(|value| value.as_bool())
.unwrap_or(false);
normalized.push(serde_json::json!({
"name": name,
"description": description,
"schema": schema,
"deprecated": deprecated,
}));
}
Ok(normalized)
}
#[cfg(test)]
mod tests {
use super::*;
fn test_bridge() -> HostBridge {
HostBridge::from_parts(
Arc::new(Mutex::new(HashMap::new())),
Arc::new(AtomicBool::new(false)),
Arc::new(std::sync::Mutex::new(())),
1,
)
}
#[test]
fn test_json_rpc_request_format() {
let request = crate::jsonrpc::request(
1,
"llm_call",
serde_json::json!({
"prompt": "Hello",
"system": "Be helpful",
}),
);
let s = serde_json::to_string(&request).unwrap();
assert!(s.contains("\"jsonrpc\":\"2.0\""));
assert!(s.contains("\"id\":1"));
assert!(s.contains("\"method\":\"llm_call\""));
}
#[test]
fn test_json_rpc_notification_format() {
let notification =
crate::jsonrpc::notification("output", serde_json::json!({"text": "[harn] hello\n"}));
let s = serde_json::to_string(¬ification).unwrap();
assert!(s.contains("\"method\":\"output\""));
assert!(!s.contains("\"id\""));
}
#[test]
fn test_json_rpc_error_response_parsing() {
let response = crate::jsonrpc::error_response(1, -32600, "Invalid request");
assert!(response.get("error").is_some());
assert_eq!(
response["error"]["message"].as_str().unwrap(),
"Invalid request"
);
}
#[test]
fn test_json_rpc_success_response_parsing() {
let response = crate::jsonrpc::response(
1,
serde_json::json!({
"text": "Hello world",
"input_tokens": 10,
"output_tokens": 5,
}),
);
assert!(response.get("result").is_some());
assert_eq!(response["result"]["text"].as_str().unwrap(), "Hello world");
}
#[test]
fn test_cancelled_flag() {
let cancelled = Arc::new(AtomicBool::new(false));
assert!(!cancelled.load(Ordering::SeqCst));
cancelled.store(true, Ordering::SeqCst);
assert!(cancelled.load(Ordering::SeqCst));
}
#[test]
fn pending_host_calls_return_when_cancellation_arrives() {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(async {
let pending = Arc::new(Mutex::new(HashMap::new()));
let cancelled = Arc::new(AtomicBool::new(false));
let bridge = HostBridge::from_parts_with_writer(
pending.clone(),
cancelled.clone(),
Arc::new(|_| Ok(())),
1,
);
let call = bridge.call("host/work", serde_json::json!({}));
tokio::pin!(call);
loop {
tokio::select! {
result = &mut call => panic!("call completed before cancellation: {result:?}"),
_ = tokio::task::yield_now() => {}
}
if !pending.lock().await.is_empty() {
break;
}
}
cancelled.store(true, Ordering::SeqCst);
bridge.cancel_notify.notify_waiters();
let result = tokio::time::timeout(Duration::from_secs(1), call)
.await
.expect("pending call should observe cancellation promptly");
assert!(
matches!(result, Err(VmError::Runtime(message)) if message.contains("cancelled"))
);
assert!(pending.lock().await.is_empty());
});
}
#[test]
fn queued_messages_are_filtered_by_delivery_mode() {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(async {
let bridge = test_bridge();
bridge
.push_queued_user_message("first".to_string(), "finish_step")
.await;
bridge
.push_queued_user_message("second".to_string(), "wait_for_completion")
.await;
let finish_step = bridge.take_queued_user_messages(false, true, false).await;
assert_eq!(finish_step.len(), 1);
assert_eq!(finish_step[0].content, "first");
let turn_end = bridge.take_queued_user_messages(false, false, true).await;
assert_eq!(turn_end.len(), 1);
assert_eq!(turn_end[0].content, "second");
});
}
#[test]
fn queued_transcript_injections_preserve_user_reminder_separation() {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(async {
let bridge = test_bridge();
bridge
.push_queued_user_message("human follow-up".to_string(), "finish_step")
.await;
let reminder_id = bridge
.push_queued_session_remind_from_params(&serde_json::json!({
"body": "Host-provided ambient context.",
"tags": ["host"],
"dedupe_key": "host-context",
"ttl_turns": 2,
"mode": "wait_for_completion",
"_meta": {"harn": {"source": "test"}},
}))
.await
.expect("valid reminder");
let finish_step = bridge.take_queued_user_messages(false, true, false).await;
assert_eq!(finish_step.len(), 1);
assert_eq!(finish_step[0].content, "human follow-up");
let no_user_messages = bridge.take_queued_user_messages(false, false, true).await;
assert!(no_user_messages.is_empty());
let injections = bridge
.take_queued_transcript_injections_for(DeliveryCheckpoint::EndOfInteraction)
.await;
assert_eq!(injections.len(), 1);
let QueuedTranscriptInjection::Reminder(reminder) = &injections[0] else {
panic!("expected queued reminder");
};
assert_eq!(reminder.reminder.id, reminder_id);
assert_eq!(reminder.reminder.body, "Host-provided ambient context.");
assert_eq!(reminder.reminder.tags, vec!["host".to_string()]);
assert_eq!(
reminder.reminder.dedupe_key.as_deref(),
Some("host-context")
);
assert_eq!(reminder.reminder.ttl_turns, Some(2));
assert_eq!(
reminder.reminder.source,
crate::llm::helpers::ReminderSource::Bridge
);
});
}
#[test]
fn bridge_remind_modes_honor_delivery_checkpoints() {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(async {
let cases = [
(
"interrupt_immediate",
DeliveryCheckpoint::InterruptImmediate,
DeliveryCheckpoint::AfterCurrentOperation,
),
(
"finish_step",
DeliveryCheckpoint::AfterCurrentOperation,
DeliveryCheckpoint::EndOfInteraction,
),
(
"wait_for_completion",
DeliveryCheckpoint::EndOfInteraction,
DeliveryCheckpoint::InterruptImmediate,
),
];
for (mode, expected_checkpoint, wrong_checkpoint) in cases {
let bridge = test_bridge();
bridge
.push_queued_session_remind_from_params(&serde_json::json!({
"body": format!("Reminder for {mode}"),
"mode": mode,
}))
.await
.expect("valid session/remind payload");
let premature = bridge
.take_queued_transcript_injections_for(wrong_checkpoint)
.await;
assert!(
premature.is_empty(),
"{mode} reminder must not be delivered at {wrong_checkpoint:?}"
);
let delivered = bridge
.take_queued_transcript_injections_for(expected_checkpoint)
.await;
assert_eq!(delivered.len(), 1, "{mode} reminder was not delivered");
let QueuedTranscriptInjection::Reminder(reminder) = &delivered[0] else {
panic!("expected reminder for {mode}");
};
assert_eq!(reminder.reminder.body, format!("Reminder for {mode}"));
}
});
}
#[test]
fn bridge_session_input_path_never_produces_reminder() {
let runtime = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
runtime.block_on(async {
let bridge = test_bridge();
bridge
.push_queued_user_message("still user input".to_string(), "finish_step")
.await;
let delivered = bridge
.take_queued_transcript_injections_for(DeliveryCheckpoint::AfterCurrentOperation)
.await;
assert_eq!(delivered.len(), 1);
let QueuedTranscriptInjection::User(message) = &delivered[0] else {
panic!("session/input queue path must produce a user message");
};
assert_eq!(message.content, "still user input");
});
}
#[test]
fn session_remind_validation_rejects_user_message_shape() {
let err = queued_session_remind_from_params(&serde_json::json!({
"content": "this is still a user message",
"mode": "interrupt_immediate",
}))
.expect_err("session/remind must require a reminder body");
assert!(err.contains(Code::ReminderInvalidShape.as_str()));
assert!(err.contains("body"));
}
#[test]
fn session_remind_validation_rejects_unknown_options_separately() {
let err = queued_session_remind_from_params(&serde_json::json!({
"body": "valid body",
"unknown_host_field": true,
}))
.expect_err("session/remind must reject unknown top-level fields");
assert!(err.contains(Code::ReminderUnknownOption.as_str()));
assert!(err.contains("unknown_host_field"));
}
#[test]
fn test_json_result_to_vm_value_string() {
let val = serde_json::json!("hello");
let vm_val = json_result_to_vm_value(&val);
assert_eq!(vm_val.display(), "hello");
}
#[test]
fn test_json_result_to_vm_value_dict() {
let val = serde_json::json!({"name": "test", "count": 42});
let vm_val = json_result_to_vm_value(&val);
let VmValue::Dict(d) = &vm_val else {
unreachable!("Expected Dict, got {:?}", vm_val);
};
assert_eq!(d.get("name").unwrap().display(), "test");
assert_eq!(d.get("count").unwrap().display(), "42");
}
#[test]
fn test_json_result_to_vm_value_null() {
let val = serde_json::json!(null);
let vm_val = json_result_to_vm_value(&val);
assert!(matches!(vm_val, VmValue::Nil));
}
#[test]
fn test_json_result_to_vm_value_nested() {
let val = serde_json::json!({
"text": "response",
"tool_calls": [
{"id": "tc_1", "name": "read_file", "arguments": {"path": "foo.rs"}}
],
"input_tokens": 100,
"output_tokens": 50,
});
let vm_val = json_result_to_vm_value(&val);
let VmValue::Dict(d) = &vm_val else {
unreachable!("Expected Dict, got {:?}", vm_val);
};
assert_eq!(d.get("text").unwrap().display(), "response");
let VmValue::List(list) = d.get("tool_calls").unwrap() else {
unreachable!("Expected List for tool_calls");
};
assert_eq!(list.len(), 1);
}
#[test]
fn parse_host_tools_list_accepts_object_wrapper() {
let tools = parse_host_tools_list_response(serde_json::json!({
"tools": [
{
"name": "Read",
"description": "Read a file",
"schema": {"type": "object"},
}
]
}))
.expect("tool list");
assert_eq!(tools.len(), 1);
assert_eq!(tools[0]["name"], "Read");
assert_eq!(tools[0]["deprecated"], false);
}
#[test]
fn parse_host_tools_list_accepts_compat_fields() {
let tools = parse_host_tools_list_response(serde_json::json!({
"result": {
"tools": [
{
"name": "Edit",
"short_description": "Apply an edit",
"input_schema": {"type": "object"},
"deprecated": true,
}
]
}
}))
.expect("tool list");
assert_eq!(tools[0]["description"], "Apply an edit");
assert_eq!(tools[0]["schema"]["type"], "object");
assert_eq!(tools[0]["deprecated"], true);
}
#[test]
fn parse_host_tools_list_requires_tool_names() {
let err = parse_host_tools_list_response(serde_json::json!({
"tools": [
{"description": "missing name"}
]
}))
.expect_err("expected error");
assert!(err
.to_string()
.contains("host/tools/list: every tool must include a string `name`"));
}
#[test]
fn test_timeout_duration() {
assert_eq!(DEFAULT_TIMEOUT.as_secs(), 300);
}
}