pub mod bifs;
pub mod buffer;
pub mod context;
mod pty;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use regex::Regex;
use regex::RegexBuilder;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use crate::RuntimeContext;
use crate::observe::event_log::BufferSnapshot;
use crate::observe::event_sink::EventSink;
use crate::report::result::Failure;
use crate::vm::buffer::BUFFER_TAIL_LEN;
use crate::vm::buffer::FailPatternHit;
use crate::vm::buffer::regex_error_summary;
use crate::vm::context::Captures;
use crate::vm::context::ExecutionContext;
use crate::vm::context::FailPattern;
use crate::vm::pty::PtyShell;
use relux_core::diagnostics::IrSpan;
use relux_ir::IrCallExpr;
use relux_ir::IrExpr;
use relux_ir::IrFn;
use relux_ir::IrInterpolation;
use relux_ir::IrPureFn;
use relux_ir::IrShellStmt;
use relux_ir::IrStringPart;
use relux_ir::Tables;
/// Returns `true` when `expr` contains at least one part that is substituted
/// at runtime (a variable or a capture reference).
fn has_interpolation(expr: &IrInterpolation) -> bool {
    for part in expr.parts() {
        match part {
            IrStringPart::Var { .. } | IrStringPart::CaptureRef { .. } => return true,
            IrStringPart::Literal { .. } | IrStringPart::EscapedDollar { .. } => {}
        }
    }
    false
}
/// Renders `expr` back into a display template: literals are copied verbatim,
/// variables and capture references are written as `${name}` / `${index}`, and
/// an escaped dollar is written as a single `$`.
fn interpolation_template(expr: &IrInterpolation) -> String {
    let mut template = String::new();
    for part in expr.parts() {
        match part {
            IrStringPart::Literal { value, .. } => template.push_str(value),
            IrStringPart::Var { name, .. } => {
                template.push_str("${");
                template.push_str(name);
                template.push('}');
            }
            IrStringPart::EscapedDollar { .. } => template.push('$'),
            IrStringPart::CaptureRef { index, .. } => {
                template.push_str("${");
                template.push_str(&index.to_string());
                template.push('}');
            }
        }
    }
    template
}
async fn interpolate_ir(expr: &IrInterpolation, ctx: &ExecutionContext) -> String {
let mut out = String::new();
for part in expr.parts() {
match part {
IrStringPart::Literal { value, .. } => out.push_str(value),
IrStringPart::Var { name, .. } => {
if let Some(v) = ctx.lookup(name).await {
out.push_str(&v);
}
}
IrStringPart::EscapedDollar { .. } => out.push('$'),
IrStringPart::CaptureRef { index, .. } => {
if let Some(v) = ctx.capture(*index) {
out.push_str(&v);
}
}
}
}
out
}
/// Executes IR shell statements against a single PTY-backed shell.
///
/// Instances are built by `Vm::new`, which spawns the shell and waits for its
/// prompt before returning.
pub struct Vm {
/// The spawned shell; all sends and output-buffer reads go through it.
pty: PtyShell,
/// Execution state consulted by statements: variables, captures, the current
/// timeout and the active fail pattern.
ctx: ExecutionContext,
/// Function tables (`fns`, `pure_fns`) used to resolve calls; cloned from the
/// runtime context in `new`.
tables: Tables,
/// Sink that receives an event for each observable step (send, match, ...).
pub events: EventSink,
/// Prompt string taken from the runtime shell configuration and passed to
/// the PTY's prompt initialisation.
shell_prompt: String,
/// Cancellation token cloned from the runtime context; checked between
/// statements and while waiting for a match.
pub(crate) cancel: CancellationToken,
/// Multiplier handed to `adjusted_duration_with_flaky` whenever a timeout is
/// turned into a concrete duration.
flaky_timeout_multiplier: f64,
}
impl Vm {
/// Spawns the shell named `shell_name` and waits until it shows its prompt.
///
/// Steps, in order: create the shell logger, spawn the PTY shell with the
/// context's process environment, emit the spawn event, wait for the prompt
/// within the context's (flaky-adjusted) timeout, emit the ready event.
///
/// # Errors
///
/// Returns `Failure::Runtime` (without a span) when the logger cannot be
/// created, the shell cannot be spawned, or prompt initialisation fails.
// NOTE(review): when prompt initialisation fails the already-spawned PTY is
// only dropped, not explicitly shut down — confirm that `PtyShell` cleans up
// the child process on drop.
pub async fn new(
    shell_name: String,
    ctx: ExecutionContext,
    rt_ctx: &RuntimeContext,
) -> Result<Self, Failure> {
    // Every start-up failure has the same shape; only the message differs.
    let startup_failure = |message: String| Failure::Runtime {
        message,
        span: None,
        shell: Some(shell_name.clone()),
    };

    let log_handle = rt_ctx
        .create_shell_logger(&shell_name)
        .map(|logger| Arc::new(Mutex::new(logger)))
        .map_err(|e| startup_failure(format!("failed to create shell log: {e}")))?;

    let shell_command = rt_ctx.shell.command.to_string();
    let shell_prompt = rt_ctx.shell.prompt.to_string();
    let pty = PtyShell::spawn(&shell_command, ctx.process_env(), log_handle)
        .map_err(|e| startup_failure(format!("failed to spawn shell: {e}")))?;

    let mut vm = Self {
        pty,
        ctx,
        tables: rt_ctx.tables.clone(),
        events: rt_ctx.events.clone(),
        shell_prompt,
        cancel: rt_ctx.cancel.clone(),
        flaky_timeout_multiplier: rt_ctx.flaky_timeout_multiplier,
    };
    vm.events.emit_shell_spawn(&shell_name, &shell_command);

    let init_timeout = vm
        .ctx
        .timeout()
        .adjusted_duration_with_flaky(vm.flaky_timeout_multiplier);
    vm.pty
        .init_prompt(&vm.shell_prompt, init_timeout)
        .await
        .map_err(|_| startup_failure("shell did not produce prompt during init".to_string()))?;

    vm.events.emit_shell_ready(vm.ctx.current_name());
    Ok(vm)
}
/// Returns the current name reported by the execution context; this is the
/// name used as the shell identifier in events and failures.
pub fn current_name(&self) -> String {
self.ctx.current_name()
}
/// Forwards to `ExecutionContext::reset_for_export`, handing it `new_scope`.
pub fn reset_for_export(&mut self, new_scope: context::Scope) {
self.ctx.reset_for_export(new_scope);
}
/// Returns the prompt string this VM was configured with.
pub fn shell_prompt(&self) -> &str {
    self.shell_prompt.as_str()
}
/// Executes `stmts` in order and returns the value of the last one
/// (the empty string when `stmts` is empty).
///
/// Cancellation is checked before each statement; once all statements have
/// run, pending receive data is drained one final time.
///
/// # Errors
///
/// Returns `Failure::Cancelled` when the cancellation token has fired, or the
/// first failure produced by a statement.
pub async fn exec_stmts(&mut self, stmts: &[IrShellStmt]) -> Result<String, Failure> {
    let mut last_value = String::new();
    for stmt in stmts {
        if self.cancel.is_cancelled() {
            let shell = self.ctx.current_name();
            return Err(Failure::Cancelled {
                span: None,
                shell: Some(shell),
            });
        }
        last_value = self.exec_stmt(stmt).await?;
    }
    self.drain_recv_event().await;
    Ok(last_value)
}
/// Drains pending receive data from the PTY output buffer via `drain_recv`
/// and discards it.
// NOTE(review): the name suggests a receive event was meant to be emitted
// with the drained data — confirm that dropping it is intentional.
async fn drain_recv_event(&mut self) {
    let _ = self.pty.output_buf.drain_recv().await;
}
/// Emits an interpolation event for `expr` when it contains at least one
/// variable or capture reference; does nothing otherwise.
///
/// The event carries the template form of `expr`, the already-expanded
/// `result`, and one `(name, value)` binding per substituted part (capture
/// references are named by their index). Unresolved parts are reported with
/// an empty value.
async fn emit_interpolation(&mut self, expr: &IrInterpolation, result: &str) {
    if !has_interpolation(expr) {
        return;
    }

    let mut bindings = Vec::new();
    for part in expr.parts() {
        let binding = match part {
            IrStringPart::Var { name, .. } => {
                let value = self.ctx.lookup(name).await.unwrap_or_default();
                (name.clone(), value)
            }
            IrStringPart::CaptureRef { index, .. } => {
                let value = self.ctx.capture(*index).unwrap_or_default();
                (index.to_string(), value)
            }
            _ => continue,
        };
        bindings.push(binding);
    }

    let shell = self.ctx.current_name();
    self.events
        .emit_interpolation(&shell, interpolation_template(expr), result, &bindings);
}
pub async fn exec_stmt(&mut self, stmt: &IrShellStmt) -> Result<String, Failure> {
use relux_ir::IrNode;
let span = stmt.span().clone();
self.drain_recv_event().await;
self.check_fail(span.clone()).await?;
match stmt {
IrShellStmt::Comment { .. } => Ok(String::new()),
IrShellStmt::FailRegex {
pattern,
span: ir_span,
} => {
let pat = interpolate_ir(pattern, &self.ctx).await;
self.emit_interpolation(pattern, &pat).await;
let shell = self.ctx.current_name();
self.events.emit_fail_pattern_set(&shell, &pat);
let re = RegexBuilder::new(&pat)
.multi_line(true)
.crlf(true)
.build()
.map_err(|e| Failure::Runtime {
message: format!("invalid fail regex: {}", regex_error_summary(&e)),
span: Some(ir_span.clone()),
shell: Some(self.ctx.current_name().to_string()),
})?;
let fp = Some(FailPattern::Regex(re));
self.ctx.set_fail_pattern(fp);
self.check_fail(span).await?;
Ok(String::new())
}
IrShellStmt::FailLiteral { pattern, .. } => {
let pat = interpolate_ir(pattern, &self.ctx).await;
self.emit_interpolation(pattern, &pat).await;
let shell = self.ctx.current_name();
self.events.emit_fail_pattern_set(&shell, &pat);
let fp = Some(FailPattern::Literal(pat));
self.ctx.set_fail_pattern(fp);
self.check_fail(span).await?;
Ok(String::new())
}
IrShellStmt::ClearFailPattern { .. } => {
let shell = self.ctx.current_name();
self.events.emit_fail_pattern_cleared(&shell);
self.ctx.set_fail_pattern(None);
Ok(String::new())
}
IrShellStmt::Timeout { timeout, .. } => {
let previous = format!("{:?}", self.ctx.timeout());
self.ctx.set_timeout(timeout.clone());
let new_timeout = format!("{:?}", self.ctx.timeout());
let shell = self.ctx.current_name();
self.events
.emit_timeout_set(&shell, &new_timeout, &previous);
Ok(String::new())
}
IrShellStmt::Let { stmt: let_stmt, .. } => {
let value = if let Some(expr) = let_stmt.value() {
self.eval_expr(expr).await?
} else {
String::new()
};
let shell = self.ctx.current_name();
self.events
.emit_var_let(&shell, let_stmt.name().name(), &value);
self.ctx
.let_insert(let_stmt.name().name().to_string(), value.clone());
Ok(value)
}
IrShellStmt::Assign { stmt: assign, .. } => {
let value = self.eval_expr(assign.value()).await?;
let found = self.ctx.assign(assign.name().name(), value.clone()).await;
if !found {
return Err(Failure::Runtime {
message: format!(
"assignment to undeclared variable `{}`",
assign.name().name()
),
span: Some(assign.name().span().clone()),
shell: Some(self.ctx.current_name().to_string()),
});
}
let shell = self.ctx.current_name();
self.events
.emit_var_assign(&shell, assign.name().name(), &value);
Ok(value)
}
IrShellStmt::Expr { expr, .. } => self.eval_expr(expr).await,
IrShellStmt::Send { payload, .. } => {
let data = interpolate_ir(payload, &self.ctx).await;
self.emit_interpolation(payload, &data).await;
self.send_bytes(format!("{data}\n").as_bytes(), span.clone())
.await?;
let shell = self.ctx.current_name();
self.events.emit_send(&shell, &data);
Ok(data)
}
IrShellStmt::SendRaw { payload, .. } => {
let data = interpolate_ir(payload, &self.ctx).await;
self.emit_interpolation(payload, &data).await;
self.send_bytes(data.as_bytes(), span.clone()).await?;
let shell = self.ctx.current_name();
self.events.emit_send(&shell, &data);
Ok(data)
}
IrShellStmt::MatchLiteral { pattern, .. } => {
let timeout = self
.ctx
.timeout()
.adjusted_duration_with_flaky(self.flaky_timeout_multiplier);
let pat = interpolate_ir(pattern, &self.ctx).await;
self.emit_interpolation(pattern, &pat).await;
let shell = self.ctx.current_name();
self.events.emit_match_start(&shell, &pat, false);
let match_start = Instant::now();
let (mat, snapshot) = self.wait_consume_literal(&pat, timeout, span).await?;
let shell = self.ctx.current_name();
self.events.emit_match_done(
&shell,
&mat.value.0,
match_start.elapsed(),
snapshot,
None,
);
Ok(pat)
}
IrShellStmt::MatchRegex { pattern, .. } => {
let timeout = self
.ctx
.timeout()
.adjusted_duration_with_flaky(self.flaky_timeout_multiplier);
let pat = interpolate_ir(pattern, &self.ctx).await;
self.emit_interpolation(pattern, &pat).await;
let re = RegexBuilder::new(&pat)
.multi_line(true)
.crlf(true)
.build()
.map_err(|e| Failure::Runtime {
message: format!("invalid regex: {}", regex_error_summary(&e)),
span: Some(pattern.span().clone()),
shell: Some(self.ctx.current_name().to_string()),
})?;
let shell = self.ctx.current_name();
self.events.emit_match_start(&shell, &pat, true);
let match_start = Instant::now();
let (mat, snapshot) = self
.wait_consume_regex(&pat, &re, timeout, span.clone())
.await?;
let full = mat.value.0.get("0").cloned().unwrap_or_default();
let captures = mat.value.0.clone();
let shell = self.ctx.current_name();
self.events.emit_match_done(
&shell,
&full,
match_start.elapsed(),
snapshot,
Some(captures.clone()),
);
self.set_captures_from_map(captures);
Ok(full)
}
IrShellStmt::TimedMatchLiteral {
timeout, pattern, ..
} => {
let dur = timeout.adjusted_duration_with_flaky(self.flaky_timeout_multiplier);
let pat = interpolate_ir(pattern, &self.ctx).await;
self.emit_interpolation(pattern, &pat).await;
let shell = self.ctx.current_name();
self.events.emit_match_start(&shell, &pat, false);
let match_start = Instant::now();
let (mat, snapshot) = self.wait_consume_literal(&pat, dur, span).await?;
let shell = self.ctx.current_name();
self.events.emit_match_done(
&shell,
&mat.value.0,
match_start.elapsed(),
snapshot,
None,
);
Ok(pat)
}
IrShellStmt::TimedMatchRegex {
timeout, pattern, ..
} => {
let dur = timeout.adjusted_duration_with_flaky(self.flaky_timeout_multiplier);
let pat = interpolate_ir(pattern, &self.ctx).await;
self.emit_interpolation(pattern, &pat).await;
let re = RegexBuilder::new(&pat)
.multi_line(true)
.crlf(true)
.build()
.map_err(|e| Failure::Runtime {
message: format!("invalid regex: {}", regex_error_summary(&e)),
span: Some(pattern.span().clone()),
shell: Some(self.ctx.current_name().to_string()),
})?;
let shell = self.ctx.current_name();
self.events.emit_match_start(&shell, &pat, true);
let match_start = Instant::now();
let (mat, snapshot) = self
.wait_consume_regex(&pat, &re, dur, span.clone())
.await?;
let full = mat.value.0.get("0").cloned().unwrap_or_default();
let captures = mat.value.0.clone();
let shell = self.ctx.current_name();
self.events.emit_match_done(
&shell,
&full,
match_start.elapsed(),
snapshot,
Some(captures.clone()),
);
self.set_captures_from_map(captures);
Ok(full)
}
IrShellStmt::BufferReset { .. } => {
let snapshot = self.pty.output_buf.snapshot_tail(BUFFER_TAIL_LEN).await;
let shell = self.ctx.current_name();
self.events.emit_buffer_reset(&shell, snapshot);
self.pty.output_buf.clear().await;
Ok(String::new())
}
}
}
/// Replaces the context's captures with the entries of `map`.
fn set_captures_from_map(&mut self, map: HashMap<String, String>) {
    let captures = map
        .into_iter()
        .fold(Captures::new(), |mut acc, (key, value)| {
            acc.set(key, value);
            acc
        });
    self.ctx.set_captures(captures);
}
/// Evaluates `expr` to its string value.
///
/// The active fail pattern is checked first. String expressions are
/// interpolated (and reported through the event sink); variable and capture
/// references that resolve to nothing evaluate to the empty string; calls are
/// delegated to `eval_call`.
#[async_recursion::async_recursion]
async fn eval_expr(&mut self, expr: &IrExpr) -> Result<String, Failure> {
    use relux_ir::IrNode;
    let span = expr.span().clone();
    self.check_fail(span.clone()).await?;
    let value = match expr {
        IrExpr::String { value, .. } => {
            let rendered = interpolate_ir(value, &self.ctx).await;
            self.emit_interpolation(value, &rendered).await;
            let shell = self.ctx.current_name();
            self.events.emit_string_eval(&shell, &rendered);
            rendered
        }
        IrExpr::Var { name, .. } => self.ctx.lookup(name).await.unwrap_or_default(),
        IrExpr::CaptureRef { index, .. } => self.ctx.capture(*index).unwrap_or_default(),
        IrExpr::Call { call, .. } => self.eval_call(call, &span).await?,
    };
    Ok(value)
}
async fn eval_call(&mut self, call: &IrCallExpr, span: &IrSpan) -> Result<String, Failure> {
let fn_id = call.resolved().clone();
let fn_name = call.name().name().to_string();
let mut evaluated_args = Vec::with_capacity(call.args().len());
for arg in call.args() {
evaluated_args.push(self.eval_expr(arg).await?);
}
if let Some(result) = self.tables.fns.get(&fn_id) {
let ir_fn = result.as_ref().map_err(|e| Failure::Runtime {
message: format!("function resolution failed: {e:?}"),
span: Some(span.clone()),
shell: Some(self.ctx.current_name().to_string()),
})?;
match ir_fn {
IrFn::UserDefined { params, body, .. } => {
let params = params.clone();
let body = body.clone();
let named_args: Vec<(String, String)> = params
.iter()
.zip(evaluated_args.iter())
.map(|(p, v)| (p.name().to_string(), v.clone()))
.collect();
let shell = self.ctx.current_name();
self.events.emit_fn_enter(&shell, &fn_name, &named_args);
let pre_timeout = format!("{:?}", self.ctx.timeout());
let pre_fail = self.ctx.fail_pattern().map(|p| format!("{p:?}"));
self.ctx
.push_call(fn_name.clone(), named_args.into_iter().collect());
let mut last = String::new();
for stmt in &body {
match self.exec_stmt(stmt).await {
Ok(v) => last = v,
Err(e) => {
self.ctx.pop_call();
return Err(e);
}
}
}
self.ctx.pop_call();
let post_timeout = format!("{:?}", self.ctx.timeout());
let post_fail = self.ctx.fail_pattern().map(|p| format!("{p:?}"));
let shell = self.ctx.current_name();
self.events.emit_fn_exit(
&shell,
&fn_name,
&last,
if post_timeout != pre_timeout {
Some(&post_timeout)
} else {
None
},
if post_fail != pre_fail {
post_fail.as_deref()
} else {
None
},
);
return Ok(last);
}
IrFn::Builtin { name, arity } => {
if let Some(bif) = bifs::lookup_impure(name, *arity) {
let positional_args: Vec<(String, String)> = evaluated_args
.iter()
.enumerate()
.map(|(i, v)| (format!("${i}"), v.clone()))
.collect();
let shell = self.ctx.current_name();
self.events
.emit_fn_enter(&shell, &fn_name, &positional_args);
let result = bif.call(self, evaluated_args, span).await;
let return_value = result.clone().unwrap_or_default();
let shell = self.ctx.current_name();
self.events.emit_fn_exit(
&shell,
&fn_name,
&return_value,
None::<&str>,
None::<&str>,
);
return result;
}
}
}
}
if let Some(result) = self.tables.pure_fns.get(&fn_id) {
let ir_fn = result.as_ref().map_err(|e| Failure::Runtime {
message: format!("pure function resolution failed: {e:?}"),
span: Some(span.clone()),
shell: Some(self.ctx.current_name().to_string()),
})?;
let named_args: Vec<(String, String)> = match ir_fn {
IrPureFn::UserDefined { params, .. } => params
.iter()
.zip(evaluated_args.iter())
.map(|(p, v)| (p.name().to_string(), v.clone()))
.collect(),
IrPureFn::Builtin { .. } => evaluated_args
.iter()
.enumerate()
.map(|(i, v)| (format!("${i}"), v.clone()))
.collect(),
};
let shell = self.ctx.current_name();
self.events.emit_fn_enter(&shell, &fn_name, &named_args);
let return_value = relux_ir::evaluator::eval_pure_fn(
ir_fn,
evaluated_args,
&self.ctx.env,
&self.tables.pure_fns,
);
let shell = self.ctx.current_name();
self.events
.emit_fn_exit(&shell, &fn_name, &return_value, None::<&str>, None::<&str>);
return Ok(return_value);
}
Err(Failure::Runtime {
message: format!(
"undefined function `{}` with arity {}",
fn_name,
call.args().len()
),
span: Some(span.clone()),
shell: Some(self.ctx.current_name().to_string()),
})
}
/// Waits for the literal `pattern` using the context's current
/// (flaky-adjusted) timeout, emitting match-start and match-done events.
///
/// Returns an owned copy of `pattern` on success.
///
/// # Errors
///
/// Propagates the failure from the wait (timeout, cancellation, or a
/// triggered fail pattern).
pub async fn match_literal(&mut self, pattern: &str, span: &IrSpan) -> Result<String, Failure> {
    let shell = self.ctx.current_name();
    self.events.emit_match_start(&shell, pattern, false);
    let started = Instant::now();
    let budget = self
        .ctx
        .timeout()
        .adjusted_duration_with_flaky(self.flaky_timeout_multiplier);
    let (found, snapshot) = self
        .wait_consume_literal(pattern, budget, span.clone())
        .await?;
    self.events
        .emit_match_done(&shell, &found.value.0, started.elapsed(), snapshot, None);
    Ok(pattern.to_owned())
}
/// Sends `line` followed by a newline to the shell and emits a send event
/// carrying `line` (without the newline).
///
/// # Errors
///
/// Propagates the failure from `send_bytes`.
pub async fn send_line(&mut self, line: &str, span: &IrSpan) -> Result<(), Failure> {
    let mut payload = String::with_capacity(line.len() + 1);
    payload.push_str(line);
    payload.push('\n');
    self.send_bytes(payload.as_bytes(), span.clone()).await?;
    let shell = self.ctx.current_name();
    self.events.emit_send(&shell, line);
    Ok(())
}
/// Sends `data` to the shell unchanged and emits a send event in which every
/// byte is rendered as a `\xNN` hex escape.
///
/// # Errors
///
/// Propagates the failure from `send_bytes`.
pub async fn send_raw(&mut self, data: &[u8], span: &IrSpan) -> Result<(), Failure> {
    self.send_bytes(data, span.clone()).await?;
    // Each byte becomes four characters: `\`, `x` and two hex digits.
    let mut display = String::with_capacity(data.len() * 4);
    for byte in data {
        display.push_str(&format!("\\x{byte:02x}"));
    }
    let shell = self.ctx.current_name();
    self.events.emit_send(&shell, &display);
    Ok(())
}
/// Waits up to `timeout` for the output buffer to yield a match for the
/// literal `pattern`.
///
/// Each round calls `fail_check_consume_literal` with the active fail
/// pattern; when it reports neither a match nor a fail-pattern hit, the
/// method sleeps until the buffer's notifier fires or cancellation is
/// requested.
///
/// # Errors
///
/// - `Failure::FailPatternMatched` when the fail pattern is hit
/// - `Failure::Cancelled` when the cancellation token fires
/// - `Failure::MatchTimeout` when `timeout` elapses (a timeout event with the
///   buffer tail is emitted first)
async fn wait_consume_literal(
    &self,
    pattern: &str,
    timeout: Duration,
    span: IrSpan,
) -> Result<(buffer::Match<buffer::LiteralMatch>, BufferSnapshot), Failure> {
    let poll_until_match = async {
        loop {
            // The notification future is created before the buffer check,
            // matching the check-then-wait ordering this loop relies on.
            let output_changed = self.pty.output_buf.notify.notified();
            let outcome = self
                .pty
                .output_buf
                .fail_check_consume_literal(pattern, self.ctx.fail_pattern())
                .await;
            match outcome {
                Ok(Some(found)) => return Ok::<_, Failure>(found),
                Ok(None) => {}
                Err(hit) => {
                    return Err(self.make_fail_pattern_error(hit, span.clone()).await);
                }
            }
            tokio::select! {
                _ = output_changed => {}
                _ = self.cancel.cancelled() => {
                    return Err(Failure::Cancelled {
                        span: Some(span.clone()),
                        shell: Some(self.ctx.current_name()),
                    });
                }
            }
        }
    };

    if let Ok(outcome) = tokio::time::timeout(timeout, poll_until_match).await {
        return outcome;
    }

    // Timed out: record what the buffer looked like before failing.
    let shell = self.ctx.current_name();
    let buffer = self.pty.output_buf.snapshot_tail(BUFFER_TAIL_LEN).await;
    self.events.emit_timeout(&shell, pattern, buffer);
    Err(Failure::MatchTimeout {
        pattern: pattern.to_string(),
        span,
        shell,
    })
}
/// Waits up to `timeout` for the output buffer to yield a match for `re`.
///
/// `pattern` is the textual form of `re` and is only used for the timeout
/// event and the timeout failure. Each round calls
/// `fail_check_consume_regex` with the active fail pattern; when it reports
/// neither a match nor a fail-pattern hit, the method sleeps until the
/// buffer's notifier fires or cancellation is requested.
///
/// # Errors
///
/// - `Failure::FailPatternMatched` when the fail pattern is hit
/// - `Failure::Cancelled` when the cancellation token fires
/// - `Failure::MatchTimeout` when `timeout` elapses (a timeout event with the
///   buffer tail is emitted first)
async fn wait_consume_regex(
    &self,
    pattern: &str,
    re: &Regex,
    timeout: Duration,
    span: IrSpan,
) -> Result<(buffer::Match<buffer::RegexMatch>, BufferSnapshot), Failure> {
    let poll_until_match = async {
        loop {
            // The notification future is created before the buffer check,
            // matching the check-then-wait ordering this loop relies on.
            let output_changed = self.pty.output_buf.notify.notified();
            let outcome = self
                .pty
                .output_buf
                .fail_check_consume_regex(re, self.ctx.fail_pattern())
                .await;
            match outcome {
                Ok(Some(found)) => return Ok::<_, Failure>(found),
                Ok(None) => {}
                Err(hit) => {
                    return Err(self.make_fail_pattern_error(hit, span.clone()).await);
                }
            }
            tokio::select! {
                _ = output_changed => {}
                _ = self.cancel.cancelled() => {
                    return Err(Failure::Cancelled {
                        span: Some(span.clone()),
                        shell: Some(self.ctx.current_name()),
                    });
                }
            }
        }
    };

    if let Ok(outcome) = tokio::time::timeout(timeout, poll_until_match).await {
        return outcome;
    }

    // Timed out: record what the buffer looked like before failing.
    let shell = self.ctx.current_name();
    let buffer = self.pty.output_buf.snapshot_tail(BUFFER_TAIL_LEN).await;
    self.events.emit_timeout(&shell, pattern, buffer);
    Err(Failure::MatchTimeout {
        pattern: pattern.to_string(),
        span,
        shell,
    })
}
/// Checks the output buffer against the active fail pattern.
///
/// # Errors
///
/// Returns the failure built by `make_fail_pattern_error` (attributed to
/// `span`) when the buffer reports a hit.
async fn check_fail(&self, span: IrSpan) -> Result<(), Failure> {
    let active_pattern = self.ctx.fail_pattern();
    match self.pty.output_buf.check_fail_pattern(active_pattern).await {
        Some(hit) => Err(self.make_fail_pattern_error(hit, span).await),
        None => Ok(()),
    }
}
async fn make_fail_pattern_error(&self, hit: FailPatternHit, span: IrSpan) -> Failure {
let shell = self.ctx.current_name();
let buffer = self.pty.output_buf.snapshot_tail(BUFFER_TAIL_LEN).await;
self.events
.emit_fail_pattern_triggered(&shell, &hit.pattern, &hit.matched_text, buffer);
Failure::FailPatternMatched {
pattern: hit.pattern,
matched_line: hit.matched_text,
span,
shell: self.ctx.current_name().to_string(),
}
}
/// Writes `data` to the PTY.
///
/// # Errors
///
/// A write error is reported as `Failure::ShellExited` attributed to `span`.
// NOTE(review): `exit_code` is filled from `raw_os_error()` of the write
// error, which is presumably an OS error number rather than a process exit
// status — confirm this is the intended meaning.
async fn send_bytes(&mut self, data: &[u8], span: IrSpan) -> Result<(), Failure> {
    match self.pty.send_bytes(data).await {
        Ok(()) => Ok(()),
        Err(e) => Err(Failure::ShellExited {
            shell: self.ctx.current_name(),
            exit_code: e.raw_os_error(),
            span,
        }),
    }
}
/// Emits the shell-terminate event and then shuts the PTY down.
pub async fn shutdown(&mut self) {
    self.events.emit_shell_terminate(&self.ctx.current_name());
    self.pty.shutdown().await;
}
}