use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::BTreeMap;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Output, Stdio};
use std::thread;
use std::time::{Duration, Instant};
use thiserror::Error;
use crate::runner::ChoreographyJson;
use crate::sim_reference::{
SimRunInput, SimRunOutput, SimTraceValidation, SimulationStructuredError,
};
use crate::vm_export::TickedObsEvent;
use crate::vm_trace::{
normalize_vm_trace, traces_equivalent, EffectTraceEvent, OutputConditionTraceEvent,
};
#[path = "vm_runner_json_parsing.rs"]
mod parsing;
use parsing::{
parse_required_valid, parse_sim_run_output, parse_sim_trace_validation,
parse_structured_errors, simulation_trace_payload,
};
#[derive(Debug, Error)]
pub enum VmRunnerError {
#[error("VM runner binary not found at {0}")]
BinaryNotFound(PathBuf),
#[error("Failed to create temp file: {0}")]
TempFileError(#[from] std::io::Error),
#[error("VM runner failed with exit code {code}: {stderr}")]
ProcessFailed {
code: i32,
stderr: String,
},
#[error("Failed to parse VM runner output: {0}")]
ParseError(String),
#[error("VM runner operation '{operation}' timed out after {timeout_ms}ms")]
TimedOut {
operation: String,
timeout_ms: u64,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VmRunInput {
#[serde(default = "crate::schema::default_schema_version")]
pub schema_version: String,
pub choreographies: Vec<ChoreographyJson>,
pub concurrency: u64,
pub max_steps: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VmSessionStatus {
#[serde(default = "crate::schema::default_schema_version")]
pub schema_version: String,
pub sid: u64,
pub terminal: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct VmTraceEvent {
#[serde(default = "crate::schema::default_schema_version")]
pub schema_version: String,
pub kind: String,
pub tick: u64,
#[serde(default)]
pub session: Option<u64>,
#[serde(default)]
pub sender: Option<String>,
#[serde(default)]
pub receiver: Option<String>,
#[serde(default)]
pub label: Option<String>,
#[serde(default)]
pub role: Option<String>,
#[serde(default)]
pub target: Option<String>,
#[serde(default)]
pub permitted: Option<bool>,
#[serde(default)]
pub epoch: Option<u64>,
#[serde(default)]
pub ghost: Option<u64>,
#[serde(default)]
pub from: Option<u64>,
#[serde(default)]
pub to: Option<u64>,
#[serde(default)]
pub predicate_ref: Option<String>,
#[serde(default)]
pub witness_ref: Option<String>,
#[serde(default)]
pub output_digest: Option<String>,
#[serde(default)]
pub passed: Option<bool>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VmStepState {
#[serde(default)]
pub step_index: u64,
#[serde(default)]
pub selected_coro: Option<u64>,
#[serde(default)]
pub exec_status: Option<String>,
#[serde(default)]
pub session_type_counts: BTreeMap<u64, u64>,
#[serde(default)]
pub event: Option<VmTraceEvent>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VmRunOutput {
#[serde(default = "crate::schema::default_schema_version")]
pub schema_version: String,
pub trace: Vec<VmTraceEvent>,
pub sessions: Vec<VmSessionStatus>,
pub steps_executed: u64,
pub concurrency: u64,
pub status: String,
#[serde(default)]
pub effect_trace: Vec<EffectTraceEvent>,
#[serde(default)]
pub output_condition_trace: Vec<OutputConditionTraceEvent>,
#[serde(default)]
pub step_states: Vec<VmStepState>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct LeanStructuredError {
pub code: String,
#[serde(default)]
pub path: Option<String>,
pub message: String,
}
#[derive(Debug, Clone, Serialize)]
pub struct TraceValidation {
pub valid: bool,
#[serde(default)]
pub errors: Vec<LeanStructuredError>,
}
#[derive(Debug, Clone, Serialize)]
pub struct ComparisonResult {
pub equivalent: bool,
pub trace_equivalent: bool,
pub rust_normalized: Vec<TickedObsEvent<VmTraceEvent>>,
pub lean_normalized: Vec<TickedObsEvent<VmTraceEvent>>,
#[serde(default)]
pub diff: Option<Value>,
pub lean_output: VmRunOutput,
}
#[derive(Debug, Clone, Serialize)]
pub struct InvariantVerificationResult {
pub valid: bool,
#[serde(default)]
pub errors: Vec<LeanStructuredError>,
#[serde(default)]
pub artifacts: Value,
}
pub struct VmRunner {
binary_path: PathBuf,
}
impl VmRunner {
pub const DEFAULT_BINARY_PATH: &'static str = "lean/.lake/build/bin/vm_runner";
pub const DEFAULT_TIMEOUT_MS: u64 = 120_000;
fn process_timeout() -> Duration {
let ms = std::env::var("TELLTALE_VM_TIMEOUT_MS")
.ok()
.and_then(|raw| raw.parse::<u64>().ok())
.unwrap_or(Self::DEFAULT_TIMEOUT_MS);
Duration::from_millis(ms.max(1))
}
fn wait_with_timeout(
mut child: Child,
timeout: Duration,
operation: &str,
) -> Result<Output, VmRunnerError> {
let start = Instant::now();
loop {
match child.try_wait()? {
Some(_) => return child.wait_with_output().map_err(VmRunnerError::from),
None => {
if start.elapsed() >= timeout {
if let Err(err) = child.kill() {
eprintln!(
"best-effort child.kill failed during timeout handling: {err}"
);
}
if let Err(err) = child.wait() {
eprintln!(
"best-effort child.wait failed during timeout handling: {err}"
);
}
return Err(VmRunnerError::TimedOut {
operation: operation.to_string(),
timeout_ms: u64::try_from(timeout.as_millis()).unwrap_or(u64::MAX),
});
}
thread::sleep(Duration::from_millis(10));
}
}
}
}
fn find_workspace_root() -> Option<PathBuf> {
let manifest_dir = env!("CARGO_MANIFEST_DIR");
let mut path = PathBuf::from(manifest_dir);
for _ in 0..5 {
if path.join("lean/.lake").is_dir() {
return Some(path);
}
if !path.pop() {
break;
}
}
None
}
fn get_binary_path() -> Option<PathBuf> {
Self::find_workspace_root()
.map(|root| root.join(Self::DEFAULT_BINARY_PATH))
.filter(|p| p.exists())
}
pub fn new() -> Result<Self, VmRunnerError> {
match Self::get_binary_path() {
Some(path) => Ok(Self { binary_path: path }),
None => Err(VmRunnerError::BinaryNotFound(PathBuf::from(
Self::DEFAULT_BINARY_PATH,
))),
}
}
pub fn with_binary_path(path: impl AsRef<Path>) -> Result<Self, VmRunnerError> {
let binary_path = PathBuf::from(path.as_ref());
if !binary_path.exists() || !binary_path.is_file() {
return Err(VmRunnerError::BinaryNotFound(binary_path));
}
Ok(Self { binary_path })
}
#[must_use]
pub fn try_new() -> Option<Self> {
Self::new().ok()
}
pub fn run(&self, input: &VmRunInput) -> Result<VmRunOutput, VmRunnerError> {
crate::schema::ensure_supported_schema_version(&input.schema_version, "VmRunInput")
.map_err(VmRunnerError::ParseError)?;
let payload =
serde_json::to_vec(input).map_err(|e| VmRunnerError::ParseError(e.to_string()))?;
let mut cmd = Command::new(&self.binary_path)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(VmRunnerError::TempFileError)?;
if let Some(mut stdin) = cmd.stdin.take() {
stdin.write_all(&payload)?;
}
let output = Self::wait_with_timeout(cmd, Self::process_timeout(), "run")?;
if !output.status.success() {
return Err(VmRunnerError::ProcessFailed {
code: output.status.code().unwrap_or(-1),
stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
});
}
let out: VmRunOutput = serde_json::from_slice(&output.stdout)
.map_err(|e| VmRunnerError::ParseError(e.to_string()))?;
crate::schema::ensure_supported_schema_version(&out.schema_version, "VmRunOutput")
.map_err(VmRunnerError::ParseError)?;
Ok(out)
}
pub fn run_lean_vm(&self, input: &VmRunInput) -> Result<VmRunOutput, VmRunnerError> {
self.run(input)
}
pub fn run_lean_validation(
&self,
operation: &str,
payload: &Value,
) -> Result<Value, VmRunnerError> {
let input = serde_json::json!({
"schema_version": crate::schema::default_schema_version(),
"operation": operation,
"payload": payload,
});
let bytes =
serde_json::to_vec(&input).map_err(|e| VmRunnerError::ParseError(e.to_string()))?;
let mut cmd = Command::new(&self.binary_path)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(VmRunnerError::TempFileError)?;
if let Some(mut stdin) = cmd.stdin.take() {
stdin.write_all(&bytes)?;
}
let output = Self::wait_with_timeout(cmd, Self::process_timeout(), operation)?;
if !output.status.success() {
return Err(VmRunnerError::ProcessFailed {
code: output.status.code().unwrap_or(-1),
stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
});
}
serde_json::from_slice(&output.stdout).map_err(|e| VmRunnerError::ParseError(e.to_string()))
}
pub fn validate_trace(
&self,
rust_trace: &[VmTraceEvent],
) -> Result<TraceValidation, VmRunnerError> {
let payload = serde_json::json!({
"trace": rust_trace,
});
let response = self.run_lean_validation("validateTrace", &payload)?;
Ok(TraceValidation {
valid: parse_required_valid(&response, "validateTrace")?,
errors: parse_structured_errors(&response),
})
}
pub fn run_reference_simulation(
&self,
input: &SimRunInput,
) -> Result<SimRunOutput, VmRunnerError> {
crate::schema::ensure_supported_schema_version(&input.schema_version, "SimRunInput")
.map_err(VmRunnerError::ParseError)?;
let payload =
serde_json::to_value(input).map_err(|e| VmRunnerError::ParseError(e.to_string()))?;
let response = self.run_lean_validation("runSimulation", &payload)?;
parse_sim_run_output(response)
}
pub fn validate_simulation_trace(
&self,
trace: &[VmTraceEvent],
) -> Result<SimTraceValidation, VmRunnerError> {
let payload = simulation_trace_payload(trace);
let response = self.run_lean_validation("validateSimulationTrace", &payload)?;
parse_sim_trace_validation(&response)
}
pub fn compare_execution(
&self,
choreography: &ChoreographyJson,
rust_output: &VmRunOutput,
) -> Result<ComparisonResult, VmRunnerError> {
let input = VmRunInput {
schema_version: crate::schema::default_schema_version(),
choreographies: vec![choreography.clone()],
concurrency: rust_output.concurrency,
max_steps: rust_output.steps_executed.max(1),
};
let lean_output = self.run_lean_vm(&input)?;
let rust_ticked: Vec<TickedObsEvent<VmTraceEvent>> = rust_output
.trace
.iter()
.cloned()
.map(|event| TickedObsEvent {
tick: event.tick,
event,
})
.collect();
let lean_ticked: Vec<TickedObsEvent<VmTraceEvent>> = lean_output
.trace
.iter()
.cloned()
.map(|event| TickedObsEvent {
tick: event.tick,
event,
})
.collect();
let rust_normalized = normalize_vm_trace(&rust_ticked);
let lean_normalized = normalize_vm_trace(&lean_ticked);
let trace_equivalent = traces_equivalent(&rust_ticked, &lean_ticked);
let diff = compute_trace_diff(&rust_normalized, &lean_normalized);
Ok(ComparisonResult {
equivalent: trace_equivalent,
trace_equivalent,
rust_normalized,
lean_normalized,
diff,
lean_output,
})
}
pub fn verify_invariants(
&self,
bundle: &crate::invariants::ProtocolBundle,
) -> Result<InvariantVerificationResult, VmRunnerError> {
let payload =
serde_json::to_value(bundle).map_err(|e| VmRunnerError::ParseError(e.to_string()))?;
let response = self.run_lean_validation("verifyProtocolBundle", &payload)?;
Ok(InvariantVerificationResult {
valid: parse_required_valid(&response, "verifyProtocolBundle")?,
errors: parse_structured_errors(&response),
artifacts: response.get("artifacts").cloned().unwrap_or(Value::Null),
})
}
}
#[must_use]
pub fn compute_trace_diff(
rust_trace: &[TickedObsEvent<VmTraceEvent>],
lean_trace: &[TickedObsEvent<VmTraceEvent>],
) -> Option<Value> {
if rust_trace == lean_trace {
return None;
}
let min_len = rust_trace.len().min(lean_trace.len());
for idx in 0..min_len {
if rust_trace[idx] != lean_trace[idx] {
return Some(serde_json::json!({
"kind": "event_mismatch",
"index": idx,
"rust": rust_trace[idx],
"lean": lean_trace[idx],
"rust_len": rust_trace.len(),
"lean_len": lean_trace.len(),
}));
}
}
Some(serde_json::json!({
"kind": "length_mismatch",
"rust_len": rust_trace.len(),
"lean_len": lean_trace.len(),
}))
}
pub fn vm_input_from_values(
choreographies: Vec<Value>,
concurrency: u64,
max_steps: u64,
) -> Result<VmRunInput, VmRunnerError> {
let mut choreos = Vec::new();
for value in choreographies {
let choreo: ChoreographyJson =
serde_json::from_value(value).map_err(|e| VmRunnerError::ParseError(e.to_string()))?;
choreos.push(choreo);
}
Ok(VmRunInput {
schema_version: crate::schema::default_schema_version(),
choreographies: choreos,
concurrency,
max_steps,
})
}
pub fn output_to_json(output: &VmRunOutput) -> Result<Value, VmRunnerError> {
serde_json::to_value(output).map_err(|e| VmRunnerError::ParseError(e.to_string()))
}
#[cfg(test)]
mod tests {
use super::*;
use std::process::Command;
use std::time::Duration;
fn trace_event(kind: &str, tick: u64, session: Option<u64>) -> VmTraceEvent {
VmTraceEvent {
schema_version: crate::schema::default_schema_version(),
kind: kind.to_string(),
tick,
session,
sender: None,
receiver: None,
label: None,
role: None,
target: None,
permitted: None,
epoch: None,
ghost: None,
from: None,
to: None,
predicate_ref: None,
witness_ref: None,
output_digest: None,
passed: None,
}
}
#[test]
fn compute_trace_diff_none_for_equal_traces() {
let trace = vec![TickedObsEvent {
tick: 0,
event: trace_event("sent", 1, Some(0)),
}];
assert!(compute_trace_diff(&trace, &trace).is_none());
}
#[test]
fn compute_trace_diff_reports_event_mismatch() {
let rust_trace = vec![TickedObsEvent {
tick: 0,
event: trace_event("sent", 1, Some(0)),
}];
let lean_trace = vec![TickedObsEvent {
tick: 0,
event: trace_event("received", 1, Some(0)),
}];
let diff = compute_trace_diff(&rust_trace, &lean_trace).expect("expected diff");
assert_eq!(diff["kind"], "event_mismatch");
assert_eq!(diff["index"], 0);
}
#[test]
fn parse_structured_errors_reads_codes_and_paths() {
let response = serde_json::json!({
"errors": [
{ "code": "trace.mismatch", "path": "trace[0]", "message": "mismatch" }
]
});
let errors = parse_structured_errors(&response);
assert_eq!(errors.len(), 1);
assert_eq!(errors[0].code, "trace.mismatch");
assert_eq!(errors[0].path.as_deref(), Some("trace[0]"));
}
#[test]
fn parse_sim_trace_validation_reads_errors_and_artifacts() {
let response = serde_json::json!({
"valid": false,
"errors": [
{ "code": "sim.trace.mismatch", "path": "trace[1]", "message": "mismatch" }
],
"artifacts": { "kind": "diff" }
});
let parsed = parse_sim_trace_validation(&response).expect("parse simulation validation");
assert!(!parsed.valid);
assert_eq!(parsed.errors.len(), 1);
assert_eq!(parsed.errors[0].code, "sim.trace.mismatch");
assert_eq!(parsed.errors[0].path.as_deref(), Some("trace[1]"));
assert_eq!(parsed.artifacts["kind"], "diff");
}
#[test]
fn parse_required_valid_rejects_missing_or_non_boolean() {
let missing = serde_json::json!({
"errors": []
});
let missing_err =
parse_required_valid(&missing, "validateTrace").expect_err("missing valid must fail");
assert!(matches!(missing_err, VmRunnerError::ParseError(_)));
let wrong_type = serde_json::json!({
"valid": "true"
});
let wrong_type_err = parse_required_valid(&wrong_type, "validateTrace")
.expect_err("non-boolean valid must fail");
assert!(matches!(wrong_type_err, VmRunnerError::ParseError(_)));
}
#[test]
fn parse_sim_run_output_checks_schema_version() {
let payload = serde_json::json!({
"schema_version": crate::schema::default_schema_version(),
"trace": [],
"violations": [],
"artifacts": {}
});
let parsed = parse_sim_run_output(payload).expect("parse sim run output");
assert_eq!(
parsed.schema_version,
crate::schema::default_schema_version()
);
}
#[test]
fn simulation_trace_payload_has_expected_shape() {
let trace = vec![trace_event("sent", 1, Some(0))];
let payload = simulation_trace_payload(&trace);
assert!(payload["trace"].is_array());
assert_eq!(payload["trace"][0]["kind"], "sent");
}
#[test]
fn wait_with_timeout_returns_timeout_error() {
let child = Command::new("sh")
.arg("-c")
.arg("sleep 1")
.spawn()
.expect("spawn sleep");
let result = VmRunner::wait_with_timeout(child, Duration::from_millis(10), "test_sleep");
assert!(matches!(result, Err(VmRunnerError::TimedOut { .. })));
}
}