1use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use std::collections::BTreeMap;
6use std::io::Write;
7use std::path::{Path, PathBuf};
8use std::process::{Child, Command, Output, Stdio};
9use std::thread;
10use std::time::{Duration, Instant};
11use thiserror::Error;
12
13use crate::runner::ChoreographyJson;
14use crate::sim_reference::{
15 SimRunInput, SimRunOutput, SimTraceValidation, SimulationStructuredError,
16};
17use crate::vm_export::TickedObsEvent;
18use crate::vm_trace::{
19 normalize_vm_trace, traces_equivalent, EffectTraceEvent, OutputConditionTraceEvent,
20};
21
22#[path = "vm_runner_json_parsing.rs"]
23mod parsing;
24use parsing::{
25 parse_required_valid, parse_sim_run_output, parse_sim_trace_validation,
26 parse_structured_errors, simulation_trace_payload,
27};
28
29#[derive(Debug, Error)]
31pub enum VmRunnerError {
32 #[error("VM runner binary not found at {0}")]
34 BinaryNotFound(PathBuf),
35 #[error("Failed to create temp file: {0}")]
37 TempFileError(#[from] std::io::Error),
38 #[error("VM runner failed with exit code {code}: {stderr}")]
40 ProcessFailed {
41 code: i32,
43 stderr: String,
45 },
46 #[error("Failed to parse VM runner output: {0}")]
48 ParseError(String),
49 #[error("VM runner operation '{operation}' timed out after {timeout_ms}ms")]
51 TimedOut {
52 operation: String,
54 timeout_ms: u64,
56 },
57}
58
/// Request document for the Lean VM runner; `VmRunner::run` serializes it as JSON to the
/// runner's stdin.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VmRunInput {
    /// Wire-format version. Filled from `crate::schema::default_schema_version` when absent
    /// during deserialization.
    #[serde(default = "crate::schema::default_schema_version")]
    pub schema_version: String,
    /// Choreographies to load into the VM.
    pub choreographies: Vec<ChoreographyJson>,
    /// Concurrency setting forwarded to the runner (semantics defined by the Lean side — confirm).
    pub concurrency: u64,
    /// Step limit forwarded to the runner (presumably an upper bound on executed steps — confirm).
    pub max_steps: u64,
}
72
/// Status of one session as reported in `VmRunOutput::sessions`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VmSessionStatus {
    /// Wire-format version. Filled from `crate::schema::default_schema_version` when absent
    /// during deserialization.
    #[serde(default = "crate::schema::default_schema_version")]
    pub schema_version: String,
    /// Session identifier.
    pub sid: u64,
    /// Whether the session is finished (presumably: reached a terminal state — confirm against
    /// the Lean runner).
    pub terminal: bool,
}
84
/// One observable event in a VM execution trace.
///
/// Only `kind` and `tick` are required. Every other payload field is optional and becomes
/// `None` when absent from the JSON. Which fields are populated presumably depends on `kind`
/// (confirm against the runner).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct VmTraceEvent {
    /// Wire-format version. Filled from `crate::schema::default_schema_version` when absent
    /// during deserialization.
    #[serde(default = "crate::schema::default_schema_version")]
    pub schema_version: String,
    /// Event kind discriminator (e.g. `"sent"`, `"received"`).
    pub kind: String,
    /// Logical tick at which the event was observed.
    pub tick: u64,
    /// Session the event belongs to, if any.
    #[serde(default)]
    pub session: Option<u64>,
    /// Sending role, for communication events.
    #[serde(default)]
    pub sender: Option<String>,
    /// Receiving role, for communication events.
    #[serde(default)]
    pub receiver: Option<String>,
    /// Message label, for communication events.
    #[serde(default)]
    pub label: Option<String>,
    #[serde(default)]
    pub role: Option<String>,
    #[serde(default)]
    pub target: Option<String>,
    #[serde(default)]
    pub permitted: Option<bool>,
    #[serde(default)]
    pub epoch: Option<u64>,
    #[serde(default)]
    pub ghost: Option<u64>,
    #[serde(default)]
    pub from: Option<u64>,
    #[serde(default)]
    pub to: Option<u64>,
    #[serde(default)]
    pub predicate_ref: Option<String>,
    #[serde(default)]
    pub witness_ref: Option<String>,
    #[serde(default)]
    pub output_digest: Option<String>,
    #[serde(default)]
    pub passed: Option<bool>,
}
124
/// Per-step snapshot reported by the runner in `VmRunOutput::step_states`.
///
/// Every field defaults (zero, `None`, or empty) when absent from the JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VmStepState {
    /// Index of the step this snapshot describes.
    #[serde(default)]
    pub step_index: u64,
    /// Coroutine chosen by the scheduler for this step, if reported.
    #[serde(default)]
    pub selected_coro: Option<u64>,
    /// Execution status string for the step, if reported (value set defined by the runner).
    #[serde(default)]
    pub exec_status: Option<String>,
    /// Presumably a map from session id to a count of session types — confirm with the runner.
    #[serde(default)]
    pub session_type_counts: BTreeMap<u64, u64>,
    /// Trace event emitted during this step, if any.
    #[serde(default)]
    pub event: Option<VmTraceEvent>,
}
144
/// Result document produced by the VM runner (decoded from its stdout by `VmRunner::run`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VmRunOutput {
    /// Wire-format version. Filled from `crate::schema::default_schema_version` when absent
    /// during deserialization.
    #[serde(default = "crate::schema::default_schema_version")]
    pub schema_version: String,
    /// Observable events in execution order.
    pub trace: Vec<VmTraceEvent>,
    /// Final status of each session.
    pub sessions: Vec<VmSessionStatus>,
    /// Number of VM steps actually executed.
    pub steps_executed: u64,
    /// Concurrency setting the run used.
    pub concurrency: u64,
    /// Overall run status string (value set defined by the runner).
    pub status: String,
    /// Effect trace; empty when absent from the JSON.
    #[serde(default)]
    pub effect_trace: Vec<EffectTraceEvent>,
    /// Output-condition trace; empty when absent from the JSON.
    #[serde(default)]
    pub output_condition_trace: Vec<OutputConditionTraceEvent>,
    /// Per-step snapshots; empty when absent from the JSON.
    #[serde(default)]
    pub step_states: Vec<VmStepState>,
}
171
/// A machine-readable diagnostic returned by the Lean runner's validation operations.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct LeanStructuredError {
    /// Dotted error code, e.g. `"trace.mismatch"`.
    pub code: String,
    /// Location the error refers to, e.g. `"trace[0]"`; `None` when absent from the JSON.
    #[serde(default)]
    pub path: Option<String>,
    /// Human-readable description.
    pub message: String,
}
180
181#[derive(Debug, Clone, Serialize)]
183pub struct TraceValidation {
184 pub valid: bool,
185 #[serde(default)]
186 pub errors: Vec<LeanStructuredError>,
187}
188
189#[derive(Debug, Clone, Serialize)]
191pub struct ComparisonResult {
192 pub equivalent: bool,
193 pub trace_equivalent: bool,
194 pub rust_normalized: Vec<TickedObsEvent<VmTraceEvent>>,
195 pub lean_normalized: Vec<TickedObsEvent<VmTraceEvent>>,
196 #[serde(default)]
197 pub diff: Option<Value>,
198 pub lean_output: VmRunOutput,
199}
200
201#[derive(Debug, Clone, Serialize)]
203pub struct InvariantVerificationResult {
204 pub valid: bool,
205 #[serde(default)]
206 pub errors: Vec<LeanStructuredError>,
207 #[serde(default)]
208 pub artifacts: Value,
209}
210
/// Handle to the Lean VM runner executable; each operation spawns a fresh process.
#[derive(Debug, Clone)]
pub struct VmRunner {
    /// Path of the runner executable.
    binary_path: PathBuf,
}
215
216impl VmRunner {
217 pub const DEFAULT_BINARY_PATH: &'static str = "lean/.lake/build/bin/vm_runner";
219 pub const DEFAULT_TIMEOUT_MS: u64 = 120_000;
221
222 fn process_timeout() -> Duration {
223 let ms = std::env::var("TELLTALE_VM_TIMEOUT_MS")
224 .ok()
225 .and_then(|raw| raw.parse::<u64>().ok())
226 .unwrap_or(Self::DEFAULT_TIMEOUT_MS);
227 Duration::from_millis(ms.max(1))
228 }
229
230 fn wait_with_timeout(
231 mut child: Child,
232 timeout: Duration,
233 operation: &str,
234 ) -> Result<Output, VmRunnerError> {
235 let start = Instant::now();
236 loop {
237 match child.try_wait()? {
239 Some(_) => return child.wait_with_output().map_err(VmRunnerError::from),
240 None => {
241 if start.elapsed() >= timeout {
242 if let Err(err) = child.kill() {
243 eprintln!(
244 "best-effort child.kill failed during timeout handling: {err}"
245 );
246 }
247 if let Err(err) = child.wait() {
248 eprintln!(
249 "best-effort child.wait failed during timeout handling: {err}"
250 );
251 }
252 return Err(VmRunnerError::TimedOut {
253 operation: operation.to_string(),
254 timeout_ms: u64::try_from(timeout.as_millis()).unwrap_or(u64::MAX),
255 });
256 }
257 thread::sleep(Duration::from_millis(10));
258 }
259 }
260 }
261 }
262
263 fn find_workspace_root() -> Option<PathBuf> {
264 let manifest_dir = env!("CARGO_MANIFEST_DIR");
265 let mut path = PathBuf::from(manifest_dir);
266 for _ in 0..5 {
267 if path.join("lean/.lake").is_dir() {
268 return Some(path);
269 }
270 if !path.pop() {
271 break;
272 }
273 }
274 None
275 }
276
277 fn get_binary_path() -> Option<PathBuf> {
278 Self::find_workspace_root()
279 .map(|root| root.join(Self::DEFAULT_BINARY_PATH))
280 .filter(|p| p.exists())
281 }
282
283 pub fn new() -> Result<Self, VmRunnerError> {
289 match Self::get_binary_path() {
290 Some(path) => Ok(Self { binary_path: path }),
291 None => Err(VmRunnerError::BinaryNotFound(PathBuf::from(
292 Self::DEFAULT_BINARY_PATH,
293 ))),
294 }
295 }
296
297 pub fn with_binary_path(path: impl AsRef<Path>) -> Result<Self, VmRunnerError> {
303 let binary_path = PathBuf::from(path.as_ref());
304 if !binary_path.exists() || !binary_path.is_file() {
305 return Err(VmRunnerError::BinaryNotFound(binary_path));
306 }
307 Ok(Self { binary_path })
308 }
309
310 #[must_use]
312 pub fn try_new() -> Option<Self> {
313 Self::new().ok()
314 }
315
316 pub fn run(&self, input: &VmRunInput) -> Result<VmRunOutput, VmRunnerError> {
322 crate::schema::ensure_supported_schema_version(&input.schema_version, "VmRunInput")
323 .map_err(VmRunnerError::ParseError)?;
324
325 let payload =
326 serde_json::to_vec(input).map_err(|e| VmRunnerError::ParseError(e.to_string()))?;
327
328 let mut cmd = Command::new(&self.binary_path)
329 .stdin(Stdio::piped())
330 .stdout(Stdio::piped())
331 .stderr(Stdio::piped())
332 .spawn()
333 .map_err(VmRunnerError::TempFileError)?;
334
335 if let Some(mut stdin) = cmd.stdin.take() {
336 stdin.write_all(&payload)?;
337 }
338
339 let output = Self::wait_with_timeout(cmd, Self::process_timeout(), "run")?;
340
341 if !output.status.success() {
342 return Err(VmRunnerError::ProcessFailed {
343 code: output.status.code().unwrap_or(-1),
344 stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
345 });
346 }
347
348 let out: VmRunOutput = serde_json::from_slice(&output.stdout)
349 .map_err(|e| VmRunnerError::ParseError(e.to_string()))?;
350 crate::schema::ensure_supported_schema_version(&out.schema_version, "VmRunOutput")
351 .map_err(VmRunnerError::ParseError)?;
352 Ok(out)
353 }
354
355 pub fn run_lean_vm(&self, input: &VmRunInput) -> Result<VmRunOutput, VmRunnerError> {
361 self.run(input)
362 }
363
364 pub fn run_lean_validation(
370 &self,
371 operation: &str,
372 payload: &Value,
373 ) -> Result<Value, VmRunnerError> {
374 let input = serde_json::json!({
375 "schema_version": crate::schema::default_schema_version(),
376 "operation": operation,
377 "payload": payload,
378 });
379 let bytes =
380 serde_json::to_vec(&input).map_err(|e| VmRunnerError::ParseError(e.to_string()))?;
381
382 let mut cmd = Command::new(&self.binary_path)
383 .stdin(Stdio::piped())
384 .stdout(Stdio::piped())
385 .stderr(Stdio::piped())
386 .spawn()
387 .map_err(VmRunnerError::TempFileError)?;
388
389 if let Some(mut stdin) = cmd.stdin.take() {
390 stdin.write_all(&bytes)?;
391 }
392
393 let output = Self::wait_with_timeout(cmd, Self::process_timeout(), operation)?;
394 if !output.status.success() {
395 return Err(VmRunnerError::ProcessFailed {
396 code: output.status.code().unwrap_or(-1),
397 stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
398 });
399 }
400 serde_json::from_slice(&output.stdout).map_err(|e| VmRunnerError::ParseError(e.to_string()))
401 }
402
403 pub fn validate_trace(
409 &self,
410 rust_trace: &[VmTraceEvent],
411 ) -> Result<TraceValidation, VmRunnerError> {
412 let payload = serde_json::json!({
413 "trace": rust_trace,
414 });
415 let response = self.run_lean_validation("validateTrace", &payload)?;
416 Ok(TraceValidation {
417 valid: parse_required_valid(&response, "validateTrace")?,
418 errors: parse_structured_errors(&response),
419 })
420 }
421
422 pub fn run_reference_simulation(
428 &self,
429 input: &SimRunInput,
430 ) -> Result<SimRunOutput, VmRunnerError> {
431 crate::schema::ensure_supported_schema_version(&input.schema_version, "SimRunInput")
432 .map_err(VmRunnerError::ParseError)?;
433 let payload =
434 serde_json::to_value(input).map_err(|e| VmRunnerError::ParseError(e.to_string()))?;
435 let response = self.run_lean_validation("runSimulation", &payload)?;
436 parse_sim_run_output(response)
437 }
438
439 pub fn validate_simulation_trace(
445 &self,
446 trace: &[VmTraceEvent],
447 ) -> Result<SimTraceValidation, VmRunnerError> {
448 let payload = simulation_trace_payload(trace);
449 let response = self.run_lean_validation("validateSimulationTrace", &payload)?;
450 parse_sim_trace_validation(&response)
451 }
452
453 pub fn compare_execution(
459 &self,
460 choreography: &ChoreographyJson,
461 rust_output: &VmRunOutput,
462 ) -> Result<ComparisonResult, VmRunnerError> {
463 let input = VmRunInput {
464 schema_version: crate::schema::default_schema_version(),
465 choreographies: vec![choreography.clone()],
466 concurrency: rust_output.concurrency,
467 max_steps: rust_output.steps_executed.max(1),
468 };
469 let lean_output = self.run_lean_vm(&input)?;
470
471 let rust_ticked: Vec<TickedObsEvent<VmTraceEvent>> = rust_output
472 .trace
473 .iter()
474 .cloned()
475 .map(|event| TickedObsEvent {
476 tick: event.tick,
477 event,
478 })
479 .collect();
480 let lean_ticked: Vec<TickedObsEvent<VmTraceEvent>> = lean_output
481 .trace
482 .iter()
483 .cloned()
484 .map(|event| TickedObsEvent {
485 tick: event.tick,
486 event,
487 })
488 .collect();
489
490 let rust_normalized = normalize_vm_trace(&rust_ticked);
491 let lean_normalized = normalize_vm_trace(&lean_ticked);
492 let trace_equivalent = traces_equivalent(&rust_ticked, &lean_ticked);
493 let diff = compute_trace_diff(&rust_normalized, &lean_normalized);
494
495 Ok(ComparisonResult {
496 equivalent: trace_equivalent,
497 trace_equivalent,
498 rust_normalized,
499 lean_normalized,
500 diff,
501 lean_output,
502 })
503 }
504
505 pub fn verify_invariants(
511 &self,
512 bundle: &crate::invariants::ProtocolBundle,
513 ) -> Result<InvariantVerificationResult, VmRunnerError> {
514 let payload =
515 serde_json::to_value(bundle).map_err(|e| VmRunnerError::ParseError(e.to_string()))?;
516 let response = self.run_lean_validation("verifyProtocolBundle", &payload)?;
517
518 Ok(InvariantVerificationResult {
519 valid: parse_required_valid(&response, "verifyProtocolBundle")?,
520 errors: parse_structured_errors(&response),
521 artifacts: response.get("artifacts").cloned().unwrap_or(Value::Null),
522 })
523 }
524}
525
526#[must_use]
528pub fn compute_trace_diff(
529 rust_trace: &[TickedObsEvent<VmTraceEvent>],
530 lean_trace: &[TickedObsEvent<VmTraceEvent>],
531) -> Option<Value> {
532 if rust_trace == lean_trace {
533 return None;
534 }
535
536 let min_len = rust_trace.len().min(lean_trace.len());
537 for idx in 0..min_len {
538 if rust_trace[idx] != lean_trace[idx] {
539 return Some(serde_json::json!({
540 "kind": "event_mismatch",
541 "index": idx,
542 "rust": rust_trace[idx],
543 "lean": lean_trace[idx],
544 "rust_len": rust_trace.len(),
545 "lean_len": lean_trace.len(),
546 }));
547 }
548 }
549
550 Some(serde_json::json!({
551 "kind": "length_mismatch",
552 "rust_len": rust_trace.len(),
553 "lean_len": lean_trace.len(),
554 }))
555}
556
557pub fn vm_input_from_values(
563 choreographies: Vec<Value>,
564 concurrency: u64,
565 max_steps: u64,
566) -> Result<VmRunInput, VmRunnerError> {
567 let mut choreos = Vec::new();
568 for value in choreographies {
569 let choreo: ChoreographyJson =
570 serde_json::from_value(value).map_err(|e| VmRunnerError::ParseError(e.to_string()))?;
571 choreos.push(choreo);
572 }
573 Ok(VmRunInput {
574 schema_version: crate::schema::default_schema_version(),
575 choreographies: choreos,
576 concurrency,
577 max_steps,
578 })
579}
580
581pub fn output_to_json(output: &VmRunOutput) -> Result<Value, VmRunnerError> {
583 serde_json::to_value(output).map_err(|e| VmRunnerError::ParseError(e.to_string()))
584}
585
#[cfg(test)]
mod tests {
    use super::*;
    use std::process::Command;
    use std::time::Duration;

    /// Builds a trace event with only `kind`, `tick`, and `session` populated.
    fn trace_event(kind: &str, tick: u64, session: Option<u64>) -> VmTraceEvent {
        VmTraceEvent {
            schema_version: crate::schema::default_schema_version(),
            kind: String::from(kind),
            tick,
            session,
            sender: None,
            receiver: None,
            label: None,
            role: None,
            target: None,
            permitted: None,
            epoch: None,
            ghost: None,
            from: None,
            to: None,
            predicate_ref: None,
            witness_ref: None,
            output_digest: None,
            passed: None,
        }
    }

    /// A one-element ticked trace: event `kind` at event tick 1 in session 0, wrapped at tick 0.
    fn single_event_trace(kind: &str) -> Vec<TickedObsEvent<VmTraceEvent>> {
        vec![TickedObsEvent {
            tick: 0,
            event: trace_event(kind, 1, Some(0)),
        }]
    }

    #[test]
    fn compute_trace_diff_none_for_equal_traces() {
        let trace = single_event_trace("sent");
        assert!(compute_trace_diff(&trace, &trace).is_none());
    }

    #[test]
    fn compute_trace_diff_reports_event_mismatch() {
        let rust_trace = single_event_trace("sent");
        let lean_trace = single_event_trace("received");
        let diff = compute_trace_diff(&rust_trace, &lean_trace).expect("expected diff");
        assert_eq!(diff["kind"], "event_mismatch");
        assert_eq!(diff["index"], 0);
    }

    #[test]
    fn parse_structured_errors_reads_codes_and_paths() {
        let response = serde_json::json!({
            "errors": [
                { "code": "trace.mismatch", "path": "trace[0]", "message": "mismatch" }
            ]
        });
        let errors = parse_structured_errors(&response);
        assert_eq!(errors.len(), 1);
        let first = &errors[0];
        assert_eq!(first.code, "trace.mismatch");
        assert_eq!(first.path.as_deref(), Some("trace[0]"));
    }

    #[test]
    fn parse_sim_trace_validation_reads_errors_and_artifacts() {
        let response = serde_json::json!({
            "valid": false,
            "errors": [
                { "code": "sim.trace.mismatch", "path": "trace[1]", "message": "mismatch" }
            ],
            "artifacts": { "kind": "diff" }
        });
        let parsed = parse_sim_trace_validation(&response).expect("parse simulation validation");
        assert!(!parsed.valid);
        assert_eq!(parsed.errors.len(), 1);
        let first = &parsed.errors[0];
        assert_eq!(first.code, "sim.trace.mismatch");
        assert_eq!(first.path.as_deref(), Some("trace[1]"));
        assert_eq!(parsed.artifacts["kind"], "diff");
    }

    #[test]
    fn parse_required_valid_rejects_missing_or_non_boolean() {
        // Each case pairs a malformed response with the reason it must be rejected.
        let cases = [
            (serde_json::json!({ "errors": [] }), "missing valid must fail"),
            (serde_json::json!({ "valid": "true" }), "non-boolean valid must fail"),
        ];
        for (response, why) in cases {
            let err = parse_required_valid(&response, "validateTrace").expect_err(why);
            assert!(matches!(err, VmRunnerError::ParseError(_)));
        }
    }

    #[test]
    fn parse_sim_run_output_checks_schema_version() {
        let expected_version = crate::schema::default_schema_version();
        let payload = serde_json::json!({
            "schema_version": crate::schema::default_schema_version(),
            "trace": [],
            "violations": [],
            "artifacts": {}
        });
        let parsed = parse_sim_run_output(payload).expect("parse sim run output");
        assert_eq!(parsed.schema_version, expected_version);
    }

    #[test]
    fn simulation_trace_payload_has_expected_shape() {
        let payload = simulation_trace_payload(&[trace_event("sent", 1, Some(0))]);
        assert!(payload["trace"].is_array());
        assert_eq!(payload["trace"][0]["kind"], "sent");
    }

    #[test]
    fn wait_with_timeout_returns_timeout_error() {
        let sleeper = Command::new("sh")
            .args(["-c", "sleep 1"])
            .spawn()
            .expect("spawn sleep");
        let outcome =
            VmRunner::wait_with_timeout(sleeper, Duration::from_millis(10), "test_sleep");
        assert!(matches!(outcome, Err(VmRunnerError::TimedOut { .. })));
    }
}