use std::time::{Duration, Instant};
use harn_vm::VmValue;
use crate::error::HostlibError;
use crate::tools::payload::{optional_string, optional_u64, require_dict_arg, require_string};
use crate::tools::response::ResponseBuilder;
/// Identifier handed to the payload helpers in [`handle`] as their first argument.
///
/// NOTE(review): presumably also the name this builtin is registered under —
/// confirm at the registration site.
pub(crate) const NAME: &str = "hostlib_tools_wait_command";
/// Waits for the result of a previously started command identified by `handle_id`.
///
/// The single dict argument is read in this order:
/// - `handle_id` (required string): the handle whose result is wanted.
/// - `timeout_ms` (optional u64, defaults to `0`): how long to block; `0`
///   means "check the inbox once and return".
/// - `session_id` (optional string): falls back to
///   `harn_vm::current_agent_session_id()`, then to the empty string.
///
/// # Returns
/// The matching result taken from the session inbox if one is (or becomes)
/// available before the deadline. Otherwise a response with
/// `status = "running"`, `completed = false` and `timed_out = false`.
///
/// # Errors
/// Propagates any error produced by the payload helpers while reading the
/// arguments.
pub(crate) fn handle(args: &[VmValue]) -> Result<VmValue, HostlibError> {
    let map = require_dict_arg(NAME, args)?;
    let handle_id = require_string(NAME, &map, "handle_id")?;
    let timeout_ms = optional_u64(NAME, &map, "timeout_ms")?.unwrap_or(0);
    let session_id = match optional_string(NAME, &map, "session_id")? {
        Some(explicit) => explicit,
        None => harn_vm::current_agent_session_id().unwrap_or_default(),
    };

    // Always poll once up front so an already-queued result is returned
    // without blocking, whatever the timeout is.
    let mut found = drain_matching_result(&session_id, &handle_id);

    if found.is_none() && timeout_ms > 0 {
        let deadline = Instant::now() + Duration::from_millis(timeout_ms);
        while found.is_none() {
            let time_left = deadline.saturating_duration_since(Instant::now());
            if time_left.is_zero() {
                break;
            }
            // The wait outcome is ignored on purpose: whatever woke us up, the
            // inbox is re-checked, and entries for other handles simply send
            // us around the loop again until the deadline.
            let _ = harn_vm::orchestration::agent_inbox::wait_sync(&session_id, time_left);
            found = drain_matching_result(&session_id, &handle_id);
        }
    }

    match found {
        Some(result) => Ok(result),
        // NOTE(review): `timed_out` stays `false` even when the wait deadline
        // elapsed; presumably it describes the command, not this wait — confirm.
        None => Ok(ResponseBuilder::new()
            .str("handle_id", handle_id)
            .str("status", "running")
            .bool("completed", false)
            .bool("timed_out", false)
            .build()),
    }
}
fn drain_matching_result(session_id: &str, handle_id: &str) -> Option<VmValue> {
let entries = harn_vm::orchestration::agent_inbox::drain(session_id);
let mut kept = Vec::new();
let mut selected = None;
for entry in entries {
let parsed = serde_json::from_str::<serde_json::Value>(&entry.content).ok();
let matches = entry.kind == "tool_result"
&& parsed
.as_ref()
.and_then(|value| value.get("handle_id"))
.and_then(serde_json::Value::as_str)
== Some(handle_id);
if matches && selected.is_none() {
if let Some(mut payload) = parsed {
if let Some(object) = payload.as_object_mut() {
object.insert(
"feedback_kind".to_string(),
serde_json::Value::String(entry.kind.clone()),
);
object
.entry("timed_out".to_string())
.or_insert(serde_json::Value::Bool(false));
}
selected = Some(harn_vm::json_to_vm_value(&payload));
continue;
}
}
kept.push(entry);
}
for entry in kept.into_iter().rev() {
harn_vm::orchestration::agent_inbox::requeue_front(entry);
}
selected
}
#[cfg(test)]
mod tests {
    use super::*;
    use harn_vm::orchestration::agent_inbox;

    /// Builds a session id from a fresh UUID so each test uses its own inbox key.
    fn fresh_session_id() -> String {
        let unique = uuid::Uuid::now_v7();
        format!("wait-cmd-test-{unique}")
    }

    /// Serialized "completed" tool result payload for `handle_id`.
    fn result_for(handle_id: &str) -> String {
        let payload = serde_json::json!({
            "handle_id": handle_id,
            "status": "completed",
            "exit_code": 0,
        });
        payload.to_string()
    }

    /// Argument list for `handle`: one dict with session, handle and timeout.
    fn wait_args(session_id: &str, handle_id: &str, timeout_ms: u64) -> Vec<VmValue> {
        let request = serde_json::json!({
            "session_id": session_id,
            "handle_id": handle_id,
            "timeout_ms": timeout_ms,
        });
        let as_vm = harn_vm::json_to_vm_value(&request);
        vec![as_vm]
    }

    /// Reads a string field out of a dict value; `None` for anything else.
    fn field(value: &VmValue, key: &str) -> Option<String> {
        if let VmValue::Dict(map) = value {
            if let Some(VmValue::String(text)) = map.get(key) {
                return Some(text.to_string());
            }
        }
        None
    }

    fn status_of(value: &VmValue) -> Option<String> {
        field(value, "status")
    }

    fn handle_of(value: &VmValue) -> Option<String> {
        field(value, "handle_id")
    }

    /// A result for another handle is already queued when the wait starts;
    /// the awaited result only arrives later from a second thread.
    #[test]
    fn wait_skips_foreign_wakeup_and_returns_own_result() {
        let session = fresh_session_id();
        agent_inbox::push(&session, "tool_result", &result_for("H2"), "test");

        let push_session = session.clone();
        let pusher = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(50));
            agent_inbox::push(&push_session, "tool_result", &result_for("H1"), "test");
        });

        let args = wait_args(&session, "H1", 5_000);
        let value = handle(&args).expect("handle ok");
        pusher.join().expect("pusher join");

        assert_eq!(
            status_of(&value).as_deref(),
            Some("completed"),
            "wait must return H1's completed result, not falsely report running"
        );
        assert_eq!(handle_of(&value).as_deref(), Some("H1"));

        // The foreign result must survive the wait untouched.
        let leftover = agent_inbox::drain(&session);
        assert_eq!(leftover.len(), 1, "H2's completion must still be queued");
        let parsed: serde_json::Value = serde_json::from_str(&leftover[0].content).expect("json");
        assert_eq!(parsed["handle_id"].as_str(), Some("H2"));
    }

    /// Both results are queued up front, with the foreign one at the head.
    #[test]
    fn drain_selects_own_handle_past_foreign_head() {
        let session = fresh_session_id();
        for queued in ["H2", "H1"] {
            agent_inbox::push(&session, "tool_result", &result_for(queued), "test");
        }

        let args = wait_args(&session, "H1", 5_000);
        let value = handle(&args).expect("handle ok");
        assert_eq!(status_of(&value).as_deref(), Some("completed"));
        assert_eq!(handle_of(&value).as_deref(), Some("H1"));

        let leftover = agent_inbox::drain(&session);
        assert_eq!(leftover.len(), 1);
        let parsed: serde_json::Value = serde_json::from_str(&leftover[0].content).expect("json");
        assert_eq!(parsed["handle_id"].as_str(), Some("H2"));
    }

    /// With a zero timeout and only a foreign result queued, the call reports
    /// "running" and leaves the inbox as it found it.
    #[test]
    fn zero_timeout_is_nonblocking_poll() {
        let session = fresh_session_id();
        agent_inbox::push(&session, "tool_result", &result_for("H2"), "test");

        let args = wait_args(&session, "H1", 0);
        let value = handle(&args).expect("handle ok");

        assert_eq!(status_of(&value).as_deref(), Some("running"));
        assert_eq!(handle_of(&value).as_deref(), Some("H1"));
        assert_eq!(agent_inbox::pending_count(&session), 1);
    }
}