use std::time::Duration;
use crate::harness::{vm_string, HarnessKind, HarnessMode, VmHarness};
use crate::stdlib::io::{
prompt_user_value, read_line_legacy_value, read_line_structured_value, write_stderr,
write_stdout,
};
use crate::value::{ErrorCategory, VmError, VmValue};
impl crate::vm::Vm {
pub(super) async fn call_harness_method(
&mut self,
handle: &VmHarness,
method: &str,
args: &[VmValue],
) -> Result<VmValue, VmError> {
if let HarnessMode::Null(state) = handle.inner().mode() {
state.record_deny(handle.kind(), method, args);
return Err(VmError::CategorizedError {
message: format!("NullHarness denied {}::{method}", handle.kind().type_name()),
category: ErrorCategory::ToolRejected,
});
}
if matches!(handle.inner().mode(), HarnessMode::Mock(_)) {
return self.call_mock_harness_method(handle, method, args).await;
}
match handle.kind() {
HarnessKind::Root => self.call_harness_root_method(handle, method, args).await,
HarnessKind::Stdio => self.call_harness_stdio_method(handle, method, args),
HarnessKind::Clock => self.call_harness_clock_method(handle, method, args).await,
HarnessKind::System => self.call_harness_system_method(handle, method, args),
HarnessKind::Fs | HarnessKind::Env | HarnessKind::Random | HarnessKind::Net => {
Err(VmError::TypeError(format!(
"{}::{method} is not yet implemented — wired by the E4.2-E4.4 migration tickets",
handle.type_name(),
)))
}
}
}
fn call_harness_system_method(
&mut self,
handle: &VmHarness,
method: &str,
_args: &[VmValue],
) -> Result<VmValue, VmError> {
use crate::harness_system as sys;
let json = match method {
"cpu" => sys::cpu_snapshot(),
"memory" => sys::memory_snapshot(),
"gpus" | "gpu" => sys::gpus_snapshot(),
"temperature" => sys::temperature_snapshot(),
"platform" => sys::platform_snapshot(),
"processes" => sys::processes_snapshot(),
_ => return Err(method_unsupported(handle, method)),
};
Ok(crate::stdlib::json_to_vm_value(&json))
}
async fn call_harness_root_method(
&mut self,
handle: &VmHarness,
method: &str,
args: &[VmValue],
) -> Result<VmValue, VmError> {
match method {
"unsettled_state" => {
let snapshot = crate::orchestration::unsettled_state_snapshot_async().await;
Ok(crate::stdlib::json_to_vm_value(&snapshot.to_json()))
}
"is_empty" => {
let empty = match args.first() {
Some(state) => state_counts(state)?.is_empty(),
None => crate::orchestration::unsettled_state_snapshot_async()
.await
.is_empty(),
};
Ok(VmValue::Bool(empty))
}
"counts" => match args.first() {
Some(state) => Ok(crate::stdlib::json_to_vm_value(
&state_counts(state)?.to_json(),
)),
None => {
let snapshot = crate::orchestration::unsettled_state_snapshot_async().await;
Ok(crate::stdlib::json_to_vm_value(&snapshot.counts_json()))
}
},
"summary" => match args.first() {
Some(state) => Ok(VmValue::String(std::rc::Rc::from(
state_counts(state)?.summary().as_str(),
))),
None => {
let snapshot = crate::orchestration::unsettled_state_snapshot_async().await;
Ok(VmValue::String(std::rc::Rc::from(snapshot.summary())))
}
},
"resume_subagent" => {
let handle_arg = args.first().cloned().ok_or_else(|| {
VmError::TypeError("Harness.resume_subagent expects a handle".to_string())
})?;
if let Some(input) = args.get(1).cloned() {
match self
.call_named_builtin(
"__host_worker_resume",
vec![handle_arg.clone(), input.clone()],
)
.await
{
Ok(value) => Ok(value),
Err(error) if error.to_string().contains("not suspended") => {
self.call_named_builtin(
"__host_worker_send_input",
vec![handle_arg, input],
)
.await
}
Err(error) => Err(error),
}
} else {
self.call_named_builtin("__host_worker_resume", vec![handle_arg])
.await
}
}
"cancel_subagent" => {
let handle_arg = args.first().cloned().ok_or_else(|| {
VmError::TypeError("Harness.cancel_subagent expects a handle".to_string())
})?;
self.call_named_builtin("__host_worker_close", vec![handle_arg])
.await
}
"wait_for_any_settlement" => {
let snapshot = crate::orchestration::unsettled_state_snapshot_async().await;
let status = if snapshot.is_empty() {
"settled"
} else {
"unsettled"
};
Ok(crate::stdlib::json_to_vm_value(&serde_json::json!({
"status": status,
"timed_out": !snapshot.is_empty(),
"state": snapshot.to_json(),
})))
}
"current_pipeline_id" => Ok(crate::orchestration::current_mutation_session()
.and_then(|session| session.run_id.or(Some(session.session_id)))
.map(|id| VmValue::String(std::rc::Rc::from(id)))
.unwrap_or(VmValue::Nil)),
"handoff_to" => Ok(record_handoff_envelope(args)),
"emit_audit" => Ok(record_emit_audit_with_hooks(args).await),
"acknowledge_trigger" => Ok(acknowledge_trigger(args).await),
"defer_trigger" => Ok(defer_trigger(args).await),
"acknowledge_handoff" => Ok(acknowledge_handoff(args)),
"finalize" => Ok(finalize_pipeline(args)),
"spawn_settlement_agent" => {
let _vm_context = crate::vm::install_async_builtin_child_vm(self.child_vm());
Ok(record_spawn_settlement_agent_with_hooks(args).await)
}
_ => Err(method_unsupported(handle, method)),
}
}
fn call_harness_stdio_method(
&mut self,
handle: &VmHarness,
method: &str,
args: &[VmValue],
) -> Result<VmValue, VmError> {
match method {
"println" => {
let msg = args.first().map(|a| a.display()).unwrap_or_default();
write_stdout(&mut self.output, &format!("{msg}\n"));
Ok(VmValue::Nil)
}
"print" => {
let msg = args.first().map(|a| a.display()).unwrap_or_default();
write_stdout(&mut self.output, &msg);
Ok(VmValue::Nil)
}
"eprintln" => {
let msg = args.first().map(|a| a.display()).unwrap_or_default();
write_stderr(&format!("{msg}\n"));
Ok(VmValue::Nil)
}
"eprint" => {
let msg = args.first().map(|a| a.display()).unwrap_or_default();
write_stderr(&msg);
Ok(VmValue::Nil)
}
"read_line" => {
if args.is_empty() {
Ok(read_line_legacy_value())
} else {
read_line_structured_value(args)
}
}
"prompt" => prompt_user_value(args, &mut self.output),
_ => Err(method_unsupported(handle, method)),
}
}
async fn call_harness_clock_method(
&mut self,
handle: &VmHarness,
method: &str,
args: &[VmValue],
) -> Result<VmValue, VmError> {
let clock = handle.inner().clock();
match method {
"now_ms" => Ok(VmValue::Int(crate::clock::now_wall_ms(clock.as_ref()))),
"timestamp" => Ok(VmValue::Float(
crate::clock::now_wall_ms(clock.as_ref()) as f64 / 1_000.0,
)),
"monotonic_ms" | "elapsed" => Ok(VmValue::Int(clock.monotonic_ms())),
"sleep_ms" => {
let ms = sleep_ms_arg(args)?;
if ms > 0 {
clock.sleep(Duration::from_millis(ms as u64)).await;
}
Ok(VmValue::Nil)
}
_ => Err(method_unsupported(handle, method)),
}
}
async fn call_mock_harness_method(
&mut self,
handle: &VmHarness,
method: &str,
args: &[VmValue],
) -> Result<VmValue, VmError> {
let HarnessMode::Mock(state) = handle.inner().mode() else {
unreachable!("mock dispatch is only called for mock harnesses");
};
state.record_call(handle.kind(), method, args);
match handle.kind() {
HarnessKind::Root => Err(method_unsupported(handle, method)),
HarnessKind::Stdio => match method {
"println" => {
let msg = args.first().map(|a| a.display()).unwrap_or_default();
state.push_stdio(&format!("{msg}\n"));
Ok(VmValue::Nil)
}
"print" => {
let msg = args.first().map(|a| a.display()).unwrap_or_default();
state.push_stdio(&msg);
Ok(VmValue::Nil)
}
"eprintln" => {
let msg = args.first().map(|a| a.display()).unwrap_or_default();
state.push_stderr(&format!("{msg}\n"));
Ok(VmValue::Nil)
}
"eprint" => {
let msg = args.first().map(|a| a.display()).unwrap_or_default();
state.push_stderr(&msg);
Ok(VmValue::Nil)
}
"read_line" => Ok(mock_read_line_value(state, args)),
"prompt" => {
let msg = args.first().map(|a| a.display()).unwrap_or_default();
state.push_stdio(&msg);
Ok(mock_read_line_value(state, &[]))
}
_ => Err(method_unsupported(handle, method)),
},
HarnessKind::Clock => {
let clock = handle.inner().clock();
match method {
"now_ms" => Ok(VmValue::Int(crate::clock::now_wall_ms(clock.as_ref()))),
"timestamp" => Ok(VmValue::Float(
crate::clock::now_wall_ms(clock.as_ref()) as f64 / 1_000.0,
)),
"monotonic_ms" | "elapsed" => Ok(VmValue::Int(clock.monotonic_ms())),
"sleep_ms" => {
let ms = sleep_ms_arg(args)?;
if ms > 0 {
state.advance_clock(Duration::from_millis(ms as u64));
}
Ok(VmValue::Nil)
}
_ => Err(method_unsupported(handle, method)),
}
}
HarnessKind::Fs => match method {
"read_file" | "read" => {
let path = string_arg(args, 0, "HarnessFs.read_file")?;
let bytes = state
.fs_read(path)
.ok_or_else(|| VmError::CategorizedError {
message: format!("MockHarness has no fs_read response for {path}"),
category: ErrorCategory::NotFound,
})?;
Ok(VmValue::Bytes(std::rc::Rc::new(bytes.to_vec())))
}
"read_text" => {
let path = string_arg(args, 0, "HarnessFs.read_text")?;
let bytes = state
.fs_read(path)
.ok_or_else(|| VmError::CategorizedError {
message: format!("MockHarness has no fs_read response for {path}"),
category: ErrorCategory::NotFound,
})?;
let text = std::str::from_utf8(bytes).map_err(|error| {
VmError::TypeError(format!("HarnessFs.read_text: {error}"))
})?;
Ok(vm_string(text))
}
"exists" => {
let path = string_arg(args, 0, "HarnessFs.exists")?;
Ok(VmValue::Bool(state.fs_read(path).is_some()))
}
_ => Err(method_unsupported(handle, method)),
},
HarnessKind::Env => match method {
"get" => {
let key = string_arg(args, 0, "HarnessEnv.get")?;
Ok(state.env_get(key).map(vm_string).unwrap_or(VmValue::Nil))
}
_ => Err(method_unsupported(handle, method)),
},
HarnessKind::Random => match method {
"u64" | "gen_u64" => state
.next_random_u64()
.map(|value| VmValue::Int(value.min(i64::MAX as u64) as i64))
.ok_or_else(|| VmError::CategorizedError {
message: "MockHarness has no random_u64 response".to_string(),
category: ErrorCategory::NotFound,
}),
_ => Err(method_unsupported(handle, method)),
},
HarnessKind::Net => match method {
"get" | "http_get" => {
let url = string_arg(args, 0, "HarnessNet.get")?;
Ok(state.net_get(url).map(vm_string).ok_or_else(|| {
VmError::CategorizedError {
message: format!("MockHarness has no net_get response for {url}"),
category: ErrorCategory::NotFound,
}
})?)
}
_ => Err(method_unsupported(handle, method)),
},
HarnessKind::System => {
let json = match method {
"cpu" => serde_json::json!({
"count": 1,
"physical_count": 1,
"model": "mock-cpu",
"frequency_mhz": 0u64,
"usage_pct": 0.0,
}),
"memory" => serde_json::json!({
"total_bytes": 0u64,
"used_bytes": 0u64,
"available_bytes": 0u64,
"total_gb": 0.0,
"used_gb": 0.0,
"available_gb": 0.0,
"pressure": "unknown",
}),
"gpus" | "gpu" => serde_json::Value::Array(Vec::new()),
"temperature" => serde_json::json!({"components": []}),
"platform" => serde_json::json!({
"os": "mock",
"arch": "mock",
"version": "mock",
"kernel": "mock",
"long_os_version": "mock",
"hostname": "mock",
}),
"processes" => serde_json::Value::Array(vec![serde_json::json!({
"pid": 1,
"parent_pid": serde_json::Value::Null,
"name": "harn",
"cpu_pct": 0.0,
"mem_bytes": 0u64,
"is_harn_owned": true,
"is_self": true,
})]),
_ => return Err(method_unsupported(handle, method)),
};
Ok(crate::stdlib::json_to_vm_value(&json))
}
}
}
}
#[derive(Clone, Copy)]
struct UnsettledCounts {
suspended: usize,
queued: usize,
partial: usize,
in_flight: usize,
pool_pending: usize,
}
impl UnsettledCounts {
fn is_empty(self) -> bool {
self.suspended == 0
&& self.queued == 0
&& self.partial == 0
&& self.in_flight == 0
&& self.pool_pending == 0
}
fn to_json(self) -> serde_json::Value {
serde_json::json!({
"suspended": self.suspended,
"queued": self.queued,
"partial": self.partial,
"in_flight": self.in_flight,
"pool_pending": self.pool_pending,
})
}
fn summary(self) -> String {
if self.is_empty() {
"no unsettled work".to_string()
} else {
format!(
"unsettled work: {} suspended subagents, {} queued triggers, {} partial handoffs, {} in-flight llm calls, {} pool pending tasks",
self.suspended, self.queued, self.partial, self.in_flight, self.pool_pending
)
}
}
}
fn state_counts(state: &VmValue) -> Result<UnsettledCounts, VmError> {
let Some(dict) = state.as_dict() else {
return Err(VmError::TypeError(
"Harness unsettled-state helpers expect a state dict".to_string(),
));
};
Ok(UnsettledCounts {
suspended: state_bucket_len(dict, "suspended_subagents")?,
queued: state_bucket_len(dict, "queued_triggers")?,
partial: state_bucket_len(dict, "partial_handoffs")?,
in_flight: state_bucket_len(dict, "in_flight_llm_calls")?,
pool_pending: state_bucket_len(dict, "pool_pending_tasks")?,
})
}
fn state_bucket_len(
dict: &std::collections::BTreeMap<String, VmValue>,
key: &str,
) -> Result<usize, VmError> {
match dict.get(key) {
Some(VmValue::List(items)) => Ok(items.len()),
Some(other) => Err(VmError::TypeError(format!(
"unsettled-state field `{key}` must be a list, got {}",
other.type_name()
))),
None => Ok(0),
}
}
async fn acknowledge_trigger(args: &[VmValue]) -> VmValue {
let Some(id) = args
.first()
.map(vm_value_string)
.filter(|id| !id.is_empty())
else {
return json_receipt("rejected", "acknowledge_trigger", "missing trigger id");
};
let receipt = acknowledge_trigger_id(&id).await;
crate::stdlib::json_to_vm_value(&receipt)
}
async fn defer_trigger(args: &[VmValue]) -> VmValue {
let Some(id) = args
.first()
.map(vm_value_string)
.filter(|id| !id.is_empty())
else {
return json_receipt("rejected", "defer_trigger", "missing trigger id");
};
let target = args
.get(1)
.map(vm_value_string)
.filter(|target| !target.trim().is_empty())
.unwrap_or_else(|| "deferred-triggers".to_string());
let acknowledgement = acknowledge_trigger_id(&id).await;
if acknowledgement
.get("status")
.and_then(serde_json::Value::as_str)
!= Some("acknowledged")
{
return crate::stdlib::json_to_vm_value(&serde_json::json!({
"status": acknowledgement
.get("status")
.and_then(serde_json::Value::as_str)
.unwrap_or("rejected"),
"method": "defer_trigger",
"trigger_id": id,
"acknowledgement": acknowledgement,
}));
}
let envelope = crate::orchestration::record_partial_handoff(
target,
serde_json::json!({
"deferred_trigger": acknowledgement.get("item").cloned().unwrap_or(serde_json::Value::Null),
"acknowledgement": acknowledgement.clone(),
}),
);
crate::orchestration::record_lifecycle_audit(
"trigger_deferred",
serde_json::json!({
"trigger_id": id,
"envelope_id": envelope.envelope_id.clone(),
}),
);
crate::stdlib::json_to_vm_value(&serde_json::json!({
"status": "deferred",
"method": "defer_trigger",
"trigger_id": id,
"acknowledgement": acknowledgement,
"envelope": envelope.to_json(),
}))
}
async fn acknowledge_trigger_id(id: &str) -> serde_json::Value {
let snapshot = crate::orchestration::unsettled_state_snapshot_async().await;
if !snapshot.suspended_subagents.is_empty() || !snapshot.partial_handoffs.is_empty() {
return serde_json::json!({
"status": "rejected",
"method": "acknowledge_trigger",
"trigger_id": id,
"reason": "HARN-DRN-001: cannot acknowledge trigger while suspended subagents remain",
});
}
let Some(item) = snapshot
.queued_triggers
.iter()
.find(|item| item.get("id").and_then(serde_json::Value::as_str) == Some(id))
.cloned()
else {
return serde_json::json!({
"status": "not_found",
"method": "acknowledge_trigger",
"trigger_id": id,
});
};
let Some(log) = crate::event_log::active_event_log() else {
return serde_json::json!({
"status": "rejected",
"method": "acknowledge_trigger",
"trigger_id": id,
"reason": "no active event log is installed",
"item": item,
});
};
let result = match item.get("source").and_then(serde_json::Value::as_str) {
Some("worker_queue") => {
let queue = item
.get("queue")
.and_then(serde_json::Value::as_str)
.unwrap_or_default();
let job_event_id = item
.get("job_event_id")
.and_then(serde_json::Value::as_u64)
.unwrap_or_default();
match crate::triggers::WorkerQueue::new(log)
.ack_job(queue, job_event_id, "pipeline_lifecycle")
.await
{
Ok(true) => serde_json::json!({"status": "acknowledged"}),
Ok(false) => serde_json::json!({"status": "not_found"}),
Err(error) => serde_json::json!({
"status": "rejected",
"reason": error.to_string(),
}),
}
}
Some("trigger_inbox") => {
let Some(binding_key) = item
.get("binding_key")
.and_then(serde_json::Value::as_str)
.filter(|value| !value.is_empty())
else {
return serde_json::json!({
"status": "rejected",
"method": "acknowledge_trigger",
"trigger_id": id,
"reason": "queued trigger is missing binding_key",
"item": item,
});
};
let Some(event_id) = item
.get("event_id")
.and_then(serde_json::Value::as_str)
.filter(|value| !value.is_empty())
else {
return serde_json::json!({
"status": "rejected",
"method": "acknowledge_trigger",
"trigger_id": id,
"reason": "queued trigger is missing event_id",
"item": item,
});
};
let request = crate::triggers::DispatchCancelRequest {
binding_key: binding_key.to_string(),
event_id: event_id.to_string(),
requested_at: crate::clock_mock::now_utc(),
requested_by: Some("pipeline_lifecycle".to_string()),
audit_id: None,
};
match crate::triggers::append_dispatch_cancel_request(&log, &request).await {
Ok(_) => serde_json::json!({"status": "acknowledged"}),
Err(error) => serde_json::json!({
"status": "rejected",
"reason": error.to_string(),
}),
}
}
Some(source) => serde_json::json!({
"status": "rejected",
"reason": format!("unknown queued trigger source `{source}`"),
}),
None => serde_json::json!({
"status": "rejected",
"reason": "queued trigger is missing source",
}),
};
if result.get("status").and_then(serde_json::Value::as_str) == Some("acknowledged") {
crate::orchestration::record_lifecycle_audit(
"trigger_acknowledged",
serde_json::json!({
"trigger_id": id,
"item": item.clone(),
}),
);
}
let mut receipt = serde_json::Map::new();
receipt.insert(
"status".to_string(),
result
.get("status")
.cloned()
.unwrap_or_else(|| serde_json::json!("rejected")),
);
receipt.insert(
"method".to_string(),
serde_json::json!("acknowledge_trigger"),
);
receipt.insert("trigger_id".to_string(), serde_json::json!(id));
receipt.insert("item".to_string(), item);
if let Some(reason) = result.get("reason").cloned() {
receipt.insert("reason".to_string(), reason);
}
serde_json::Value::Object(receipt)
}
fn acknowledge_handoff(args: &[VmValue]) -> VmValue {
let Some(envelope_id) = args
.first()
.map(vm_value_string)
.filter(|id| !id.is_empty())
else {
return json_receipt("rejected", "acknowledge_handoff", "missing envelope id");
};
let decision = args
.get(1)
.map(crate::llm::vm_value_to_json)
.unwrap_or(serde_json::Value::Null);
let snapshot = crate::orchestration::unsettled_state_snapshot();
if !snapshot.suspended_subagents.is_empty() {
return crate::stdlib::json_to_vm_value(&serde_json::json!({
"status": "rejected",
"method": "acknowledge_handoff",
"envelope_id": envelope_id,
"reason": "HARN-DRN-001: cannot acknowledge handoff while suspended subagents remain",
}));
}
match crate::orchestration::acknowledge_partial_handoff(&envelope_id, decision) {
Some(envelope) => crate::stdlib::json_to_vm_value(&serde_json::json!({
"status": "acknowledged",
"method": "acknowledge_handoff",
"envelope": envelope.to_json(),
})),
None => crate::stdlib::json_to_vm_value(&serde_json::json!({
"status": "not_found",
"method": "acknowledge_handoff",
"envelope_id": envelope_id,
})),
}
}
fn finalize_pipeline(args: &[VmValue]) -> VmValue {
let disposition = args
.first()
.map(crate::llm::vm_value_to_json)
.unwrap_or(serde_json::Value::Null);
let receipt = crate::orchestration::finalize_pipeline_disposition(disposition);
crate::stdlib::json_to_vm_value(&receipt)
}
fn json_receipt(status: &str, method: &str, reason: &str) -> VmValue {
crate::stdlib::json_to_vm_value(&serde_json::json!({
"status": status,
"method": method,
"reason": reason,
}))
}
fn vm_value_string(value: &VmValue) -> String {
match value {
VmValue::String(text) => text.as_ref().to_string(),
other => other.display(),
}
}
async fn record_emit_audit_with_hooks(args: &[VmValue]) -> VmValue {
let kind = args
.first()
.map(|v| match v {
VmValue::String(s) => s.as_ref().to_string(),
other => other.display(),
})
.unwrap_or_default();
let mut payload = args
.get(1)
.map(crate::llm::vm_value_to_json)
.unwrap_or(serde_json::Value::Null);
if kind == "drain_decision" {
let hook_payload = serde_json::json!({
"event": crate::orchestration::HookEvent::OnDrainDecision.as_str(),
"action": payload.get("action").cloned().unwrap_or(serde_json::Value::Null),
"item": payload.get("item").cloned().unwrap_or(serde_json::Value::Null),
"payload": payload.clone(),
});
match crate::orchestration::run_lifecycle_hooks_with_control(
crate::orchestration::HookEvent::OnDrainDecision,
&hook_payload,
)
.await
{
Ok(crate::orchestration::HookControl::Allow) => {}
Ok(crate::orchestration::HookControl::Block { reason }) => {
return crate::stdlib::json_to_vm_value(&serde_json::json!({
"status": "blocked",
"method": "emit_audit",
"kind": kind,
"reason": reason,
}));
}
Ok(crate::orchestration::HookControl::Modify { payload: modified }) => {
if let Some(p) = modified.get("payload") {
payload = p.clone();
}
}
Ok(crate::orchestration::HookControl::Decision { .. }) => {}
Err(err) => {
return crate::stdlib::json_to_vm_value(&serde_json::json!({
"status": "error",
"method": "emit_audit",
"kind": kind,
"error": err.to_string(),
}));
}
}
record_drain_decision_span(&payload);
}
let entry = crate::orchestration::record_lifecycle_audit(kind, payload);
crate::stdlib::json_to_vm_value(&serde_json::json!({
"status": "recorded",
"method": "emit_audit",
"entry": entry.to_json(),
}))
}
async fn record_spawn_settlement_agent_with_hooks(args: &[VmValue]) -> VmValue {
let mut unsettled = args
.first()
.map(crate::llm::vm_value_to_json)
.unwrap_or(serde_json::Value::Null);
let return_value = args
.get(1)
.map(crate::llm::vm_value_to_json)
.unwrap_or(serde_json::Value::Null);
let options = args
.get(2)
.map(crate::llm::vm_value_to_json)
.unwrap_or(serde_json::Value::Null);
let pre_payload = serde_json::json!({
"event": crate::orchestration::HookEvent::PreDrain.as_str(),
"unsettled": unsettled.clone(),
"return_value": return_value.clone(),
"options": options.clone(),
});
match crate::orchestration::run_lifecycle_hooks_with_control(
crate::orchestration::HookEvent::PreDrain,
&pre_payload,
)
.await
{
Ok(crate::orchestration::HookControl::Allow) => {}
Ok(crate::orchestration::HookControl::Block { reason }) => {
return crate::stdlib::json_to_vm_value(&serde_json::json!({
"status": "skipped",
"method": "spawn_settlement_agent",
"reason": reason,
}));
}
Ok(crate::orchestration::HookControl::Modify { payload }) => {
if let Some(new_unsettled) = payload.get("unsettled") {
unsettled = new_unsettled.clone();
}
}
Ok(crate::orchestration::HookControl::Decision { .. }) => {}
Err(err) => {
return crate::stdlib::json_to_vm_value(&serde_json::json!({
"status": "error",
"method": "spawn_settlement_agent",
"error": err.to_string(),
}));
}
}
let span_links = crate::tracing::current_span_link()
.map(|link| {
link.with_attributes(std::collections::BTreeMap::from([(
"harn.link.kind".to_string(),
"pipeline".to_string(),
)]))
})
.into_iter()
.collect();
let span_id = crate::tracing::span_start_detached_with_links(
crate::tracing::SpanKind::Drain,
"settlement_agent".to_string(),
span_links,
);
if span_id != 0 {
if let Ok(counts) = state_counts(&crate::stdlib::json_to_vm_value(&unsettled)) {
crate::tracing::span_set_metadata(span_id, "counts", counts.to_json());
}
}
let outcome_json =
crate::orchestration::run_settlement_agent_loop(unsettled.clone(), return_value, options)
.await;
if span_id != 0 {
if let Some(status) = outcome_json.get("status").cloned() {
crate::tracing::span_set_metadata(span_id, "status", status);
}
if let Some(iterations) = outcome_json.get("iterations").cloned() {
crate::tracing::span_set_metadata(span_id, "iterations", iterations);
}
crate::tracing::span_end(span_id);
}
let outcome = crate::stdlib::json_to_vm_value(&outcome_json);
let post_payload = serde_json::json!({
"event": crate::orchestration::HookEvent::PostDrain.as_str(),
"unsettled": unsettled,
"outcome": outcome_json,
});
if let Err(err) = crate::orchestration::run_lifecycle_hooks(
crate::orchestration::HookEvent::PostDrain,
&post_payload,
)
.await
{
return crate::stdlib::json_to_vm_value(&serde_json::json!({
"status": "error",
"method": "spawn_settlement_agent",
"error": err.to_string(),
}));
}
outcome
}
fn record_drain_decision_span(payload: &serde_json::Value) {
let links = crate::tracing::current_span_link()
.map(|link| {
link.with_attributes(std::collections::BTreeMap::from([(
"harn.link.kind".to_string(),
"drain".to_string(),
)]))
})
.into_iter()
.collect();
let span_id = crate::tracing::span_start_detached_with_links(
crate::tracing::SpanKind::DrainDecision,
payload
.get("action")
.and_then(|value| value.as_str())
.unwrap_or("drain_decision")
.to_string(),
links,
);
if span_id != 0 {
if let Some(action) = payload.get("action").and_then(|value| value.as_str()) {
crate::tracing::span_set_metadata(span_id, "action", serde_json::json!(action));
}
if let Some(item) = payload.pointer("/item/id").and_then(|value| value.as_str()) {
crate::tracing::span_set_metadata(span_id, "item_id", serde_json::json!(item));
}
crate::tracing::span_end(span_id);
}
}
fn record_handoff_envelope(args: &[VmValue]) -> VmValue {
let Some(target_value) = args.first() else {
return crate::stdlib::json_to_vm_value(&serde_json::json!({
"status": "rejected",
"method": "handoff_to",
"reason": "missing target pipeline argument",
}));
};
let target = match target_value {
VmValue::String(s) => s.as_ref().to_string(),
other => other.display(),
};
let payload = args
.get(1)
.map(crate::llm::vm_value_to_json)
.unwrap_or(serde_json::Value::Null);
let envelope = crate::orchestration::record_partial_handoff(target, payload);
crate::stdlib::json_to_vm_value(&serde_json::json!({
"status": "queued",
"method": "handoff_to",
"envelope": envelope.to_json(),
}))
}
fn method_unsupported(handle: &VmHarness, method: &str) -> VmError {
VmError::TypeError(format!(
"value of type {} has no method `{method}`",
handle.type_name()
))
}
fn sleep_ms_arg(args: &[VmValue]) -> Result<i64, VmError> {
args.first()
.and_then(|v| match v {
VmValue::Int(n) => Some(*n),
VmValue::Duration(ms) => Some(*ms),
_ => None,
})
.ok_or_else(|| {
VmError::TypeError("HarnessClock.sleep_ms expects an int or duration argument".into())
})
}
fn string_arg<'a>(args: &'a [VmValue], index: usize, callee: &str) -> Result<&'a str, VmError> {
match args.get(index) {
Some(VmValue::String(value)) => Ok(value.as_ref()),
Some(other) => Err(VmError::TypeError(format!(
"{callee} expects string argument {}, got {}",
index + 1,
other.type_name()
))),
None => Err(VmError::TypeError(format!(
"{callee} expects string argument {}",
index + 1
))),
}
}
fn mock_read_line_value(state: &crate::harness::MockHarnessState, args: &[VmValue]) -> VmValue {
let structured = matches!(args.first(), Some(VmValue::Dict(_)));
match state.pop_stdin_line() {
Some(line) => {
if structured {
let mut out = std::collections::BTreeMap::new();
out.insert("ok".to_string(), VmValue::Bool(true));
out.insert(
"status".to_string(),
VmValue::String(std::rc::Rc::from("ok")),
);
out.insert(
"value".to_string(),
VmValue::String(std::rc::Rc::from(line)),
);
VmValue::Dict(std::rc::Rc::new(out))
} else {
VmValue::String(std::rc::Rc::from(line))
}
}
None => {
if structured {
let mut out = std::collections::BTreeMap::new();
out.insert("ok".to_string(), VmValue::Bool(false));
out.insert(
"status".to_string(),
VmValue::String(std::rc::Rc::from("eof")),
);
VmValue::Dict(std::rc::Rc::new(out))
} else {
VmValue::Nil
}
}
}
}