use std::collections::{HashMap, VecDeque};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread::JoinHandle;
use super::*;
/// Shared, thread-safe handler that stands in for an external program.
///
/// It is called with the full argv, the command's environment pairs, the
/// working directory and the stdio descriptors for the job; the `i32` it
/// returns is reported as the job's exit status.
// NOTE(review): the environment tuples are presumably `(name, value)` — confirm against `ExternalCommand`.
pub type InMemoryCommand =
Arc<dyn Fn(&[String], &[(String, String)], &Path, SpawnStdio) -> i32 + Send + Sync>;
/// Runtime whose "processes" are registered closures executed on threads.
pub struct InMemoryRuntime {
/// Registered handlers, keyed by the program name looked up from `argv[0]` on spawn.
commands: HashMap<String, InMemoryCommand>,
/// Id assigned to the next spawned job; `new` starts it at 1.
next_job_id: u32,
/// Join handles of spawned jobs that have not been reaped by `wait_process`, keyed by job id.
pending: HashMap<u32, JoinHandle<i32>>,
}
/// Cloning copies the command table and the job-id counter only.
///
/// Join handles cannot be duplicated, so the clone starts with no pending
/// jobs; the jobs stay owned by the runtime they were spawned on.
impl Clone for InMemoryRuntime {
    fn clone(&self) -> Self {
        let Self {
            commands,
            next_job_id,
            pending: _,
        } = self;
        Self {
            pending: HashMap::new(),
            next_job_id: *next_job_id,
            commands: commands.clone(),
        }
    }
}
/// Debug output shows only how many commands are registered; the handlers
/// themselves are closures and have no useful textual form.
impl std::fmt::Debug for InMemoryRuntime {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let registered = self.commands.len();
        let mut repr = f.debug_struct("InMemoryRuntime");
        repr.field("commands_len", &registered);
        repr.finish()
    }
}
impl InMemoryRuntime {
pub fn new() -> Self {
Self {
commands: HashMap::new(),
next_job_id: 1,
pending: HashMap::new(),
}
}
pub fn register_command<F>(&mut self, name: &str, command: F)
where
F: Fn(&[String], &[(String, String)], &Path, SpawnStdio) -> i32 + Send + Sync + 'static,
{
self.commands.insert(name.to_string(), Arc::new(command));
}
}
impl Default for InMemoryRuntime {
fn default() -> Self {
Self::new()
}
}
/// Runs registered closures on threads in place of operating-system processes.
///
/// Exit statuses produced by this implementation itself:
/// * `127` when no command is registered under `argv[0]` (or `argv` is empty),
/// * `128` when the command's thread panicked,
/// * `0` when waiting on a handle this runtime is not tracking.
impl Runtime for InMemoryRuntime {
    /// Foreground control does nothing here, so the guard carries no state.
    type ForegroundGuard = ();

    /// Returns a runtime with the same registered commands and job-id counter
    /// but none of this runtime's pending jobs.
    fn fork(&self) -> Result<Self, io::Error> {
        // `Clone` implements exactly these semantics; delegating keeps the two in sync.
        Ok(self.clone())
    }

    /// Starts the command registered under `command.argv[0]` on a new thread.
    ///
    /// Each valid stdio descriptor is duplicated for the job so the caller can
    /// close its own copy independently; if duplication fails the caller's
    /// descriptor is used as-is and left open. Duplicates are closed when the
    /// job finishes, including when the command panics and unwinds.
    ///
    /// `_close_fds` and `_mode` are ignored.
    ///
    /// # Errors
    ///
    /// Returns the operating-system error if the thread cannot be created. The
    /// duplicated descriptors are closed and no job id is consumed in that case.
    fn spawn_external_command(
        &mut self,
        command: &ExternalCommand,
        stdio: SpawnStdio,
        _close_fds: &[FileDescriptor],
        _mode: SpawnMode,
    ) -> Result<SpawnedProcess, io::Error> {
        /// Descriptors duplicated for one job; closes every one of them on drop.
        struct OwnedStageFds(Vec<FileDescriptor>);

        impl Drop for OwnedStageFds {
            fn drop(&mut self) {
                for fd in self.0.drain(..) {
                    fd.close();
                }
            }
        }

        /// Returns the descriptor the job should use for one stdio slot.
        ///
        /// A successful duplicate is recorded in `owned` so it gets closed later;
        /// an invalid descriptor, or one that cannot be duplicated, is passed
        /// through untouched and stays owned by the caller.
        fn dup_stage_fd(fd: FileDescriptor, owned: &mut OwnedStageFds) -> FileDescriptor {
            if !fd.is_valid() {
                return fd;
            }
            match fd.dup() {
                Ok(dup) => {
                    owned.0.push(dup);
                    dup
                }
                Err(_) => fd,
            }
        }

        let argv = command.argv.clone();
        let env = command.env.clone();
        let cwd = command.cwd.clone();

        // Created before the thread so the duplicates are released on every
        // path: normal completion, a panicking command, or a failed spawn.
        let mut owned_fds = OwnedStageFds(Vec::with_capacity(3));
        let stage_stdio = SpawnStdio {
            stdin_fd: dup_stage_fd(stdio.stdin_fd, &mut owned_fds),
            stdout_fd: dup_stage_fd(stdio.stdout_fd, &mut owned_fds),
            stderr_fd: dup_stage_fd(stdio.stderr_fd, &mut owned_fds),
        };

        let maybe_cmd = argv
            .first()
            .and_then(|program| self.commands.get(program))
            .cloned();

        // `Builder::spawn` reports thread-creation failure as an `io::Error`
        // instead of panicking like `std::thread::spawn`.
        let handle = std::thread::Builder::new().spawn(move || {
            // Bound first so it is dropped last: the descriptors are closed only
            // after the command has returned (or while its panic unwinds).
            let _close_on_exit = owned_fds;
            match maybe_cmd {
                Some(cmd) => cmd(&argv, &env, &cwd, stage_stdio),
                // No handler registered for this program name.
                None => 127,
            }
        })?;

        let child = self.next_job_id;
        self.next_job_id += 1;
        self.pending.insert(child, handle);
        Ok(SpawnedProcess {
            handle: ProcessHandle::new(u64::from(child)),
            display_pid: Some(child),
        })
    }

    /// Reports the state of a job started by `spawn_external_command`.
    ///
    /// With `WaitMode::Poll` a job whose thread is still running yields
    /// `ProcessEvent::Running`; otherwise the thread is joined and its status
    /// returned as `ProcessEvent::Exited`. A panicked thread is reported as
    /// status `128`. A handle that is not pending (never spawned here, or
    /// already reaped) is reported as `Exited(0)`.
    fn wait_process(
        &mut self,
        process: ProcessHandle,
        mode: WaitMode,
    ) -> Result<ProcessEvent, io::Error> {
        // Job ids are `u32`. A handle outside that range cannot belong to this
        // runtime, so it is handled like any other unknown handle rather than
        // being truncated onto a possibly live job id.
        let Ok(child) = u32::try_from(process.as_u64()) else {
            return Ok(ProcessEvent::Exited(0));
        };

        let still_running = match mode {
            WaitMode::Poll => self
                .pending
                .get(&child)
                .map_or(false, |handle| !handle.is_finished()),
            WaitMode::Block => false,
        };
        if still_running {
            return Ok(ProcessEvent::Running);
        }

        let status = match self.pending.remove(&child) {
            // `join` fails only when the thread panicked.
            Some(handle) => handle.join().unwrap_or(128),
            None => 0,
        };
        Ok(ProcessEvent::Exited(status))
    }

    /// Accepts the request and does nothing; no signal reaches the job's thread.
    fn signal_process_group(
        &mut self,
        _process: ProcessHandle,
        _signal: RuntimeSignal,
    ) -> Result<(), io::Error> {
        Ok(())
    }

    /// Always succeeds without touching `_tty`.
    fn claim_foreground(
        &mut self,
        _process: ProcessHandle,
        _tty: FileDescriptor,
    ) -> Result<Self::ForegroundGuard, io::Error> {
        Ok(())
    }

    /// Always succeeds; there is nothing to release.
    fn release_foreground(&mut self, _guard: Self::ForegroundGuard) -> Result<(), io::Error> {
        Ok(())
    }

    /// Returns `true` when a handler is registered under exactly `program`.
    fn has_command(&self, program: &str) -> bool {
        self.commands.contains_key(program)
    }

    /// Resolves a registered command to its own name; every other program is
    /// delegated to the parent module's `resolve_command_path`.
    fn resolve_command_path(
        &self,
        program: &str,
        path_var: &str,
        cwd: &Path,
    ) -> Result<PathBuf, io::Error> {
        if self.commands.contains_key(program) {
            return Ok(PathBuf::from(program));
        }
        super::resolve_command_path(program, path_var, cwd)
    }

    /// Always fails with `io::ErrorKind::Unsupported`.
    fn exec_replace(
        &self,
        _program: &str,
        _argv: &[String],
        _env: &[(String, String)],
        _cwd: &Path,
    ) -> Result<(), io::Error> {
        Err(io::Error::new(
            io::ErrorKind::Unsupported,
            "exec is unavailable in the in-memory runtime",
        ))
    }
}
/// One scripted outcome for a future `spawn_external_command` call on
/// `DeterministicRuntime`; outcomes are consumed in the order they were queued.
#[derive(Clone, Debug)]
enum DeterministicSpawn {
/// The spawn succeeds and yields this scripted process.
Spawn(DeterministicProcess),
/// The spawn fails with an `io::Error` built from `kind` and `message`.
Fail {
kind: io::ErrorKind,
message: String,
},
}
/// A scripted process: the events it will report, in order, for each wait mode.
#[derive(Clone, Debug)]
struct DeterministicProcess {
/// Handle returned to the caller when this process is spawned.
handle: ProcessHandle,
/// Pid reported in `SpawnedProcess::display_pid` for this process, if any.
display_pid: Option<u32>,
/// Events handed out, front first, to `WaitMode::Poll` waits.
poll_events: VecDeque<ProcessEvent>,
/// Events handed out, front first, to `WaitMode::Block` waits.
block_events: VecDeque<ProcessEvent>,
/// Most recently reported event; repeated once the queue for the requested mode is empty.
last_event: ProcessEvent,
}
impl DeterministicProcess {
    /// Returns the next scripted event for `mode`.
    ///
    /// The event is taken from the front of the queue belonging to `mode` and
    /// remembered as the latest event. When that queue is empty the latest
    /// event (from either queue) is returned again.
    fn next_event(&mut self, mode: WaitMode) -> ProcessEvent {
        let queue = match mode {
            WaitMode::Poll => &mut self.poll_events,
            WaitMode::Block => &mut self.block_events,
        };
        if let Some(scripted) = queue.pop_front() {
            self.last_event = scripted;
        }
        self.last_event
    }

    /// Returns `true` once the latest reported event is an exit or a signal death.
    fn is_terminal(&self) -> bool {
        match self.last_event {
            ProcessEvent::Exited(_) | ProcessEvent::Signaled(_) => true,
            _ => false,
        }
    }
}
#[derive(Clone, Debug, Default)]
pub struct DeterministicRuntime {
next_handle: u64,
queued_spawns: VecDeque<DeterministicSpawn>,
processes: HashMap<ProcessHandle, DeterministicProcess>,
display_to_handle: HashMap<u32, ProcessHandle>,
recorded_signals: Vec<(ProcessHandle, RuntimeSignal)>,
foreground_claims: Vec<(ProcessHandle, FileDescriptor)>,
foreground_releases: usize,
}
impl DeterministicRuntime {
    /// Creates an empty runtime whose first queued spawn receives handle value 1.
    pub fn new() -> Self {
        Self {
            next_handle: 1,
            queued_spawns: VecDeque::default(),
            processes: HashMap::default(),
            display_to_handle: HashMap::default(),
            recorded_signals: Vec::default(),
            foreground_claims: Vec::default(),
            foreground_releases: 0,
        }
    }

    /// Queues a spawn that will succeed and returns the handle it will use.
    ///
    /// `poll_events` and `block_events` are the events the process reports to
    /// `WaitMode::Poll` and `WaitMode::Block` waits respectively. Until one of
    /// them is consumed the process counts as `ProcessEvent::Running`.
    pub fn push_spawn(
        &mut self,
        display_pid: Option<u32>,
        poll_events: impl IntoIterator<Item = ProcessEvent>,
        block_events: impl IntoIterator<Item = ProcessEvent>,
    ) -> ProcessHandle {
        let raw = self.next_handle;
        self.next_handle = raw + 1;
        let handle = ProcessHandle::new(raw);

        let scripted = DeterministicSpawn::Spawn(DeterministicProcess {
            handle,
            display_pid,
            poll_events: poll_events.into_iter().collect(),
            block_events: block_events.into_iter().collect(),
            last_event: ProcessEvent::Running,
        });
        self.queued_spawns.push_back(scripted);
        handle
    }

    /// Queues a spawn that will fail with an `io::Error` of the given kind and message.
    pub fn push_spawn_error(&mut self, kind: io::ErrorKind, message: impl Into<String>) {
        let message = message.into();
        let failure = DeterministicSpawn::Fail { kind, message };
        self.queued_spawns.push_back(failure);
    }

    /// Signals passed to `signal_process_group` so far, oldest first.
    pub fn recorded_signals(&self) -> &[(ProcessHandle, RuntimeSignal)] {
        self.recorded_signals.as_slice()
    }

    /// Claims passed to `claim_foreground` so far, oldest first.
    pub fn foreground_claims(&self) -> &[(ProcessHandle, FileDescriptor)] {
        self.foreground_claims.as_slice()
    }

    /// Number of times `release_foreground` has been called.
    pub fn foreground_releases(&self) -> usize {
        self.foreground_releases
    }
}
impl Runtime for DeterministicRuntime {
type ForegroundGuard = (ProcessHandle, FileDescriptor);
fn fork(&self) -> Result<Self, io::Error> {
Ok(Self {
next_handle: self.next_handle,
queued_spawns: VecDeque::new(),
processes: HashMap::new(),
display_to_handle: HashMap::new(),
recorded_signals: Vec::new(),
foreground_claims: Vec::new(),
foreground_releases: 0,
})
}
fn spawn_external_command(
&mut self,
_command: &ExternalCommand,
_stdio: SpawnStdio,
_close_fds: &[FileDescriptor],
_mode: SpawnMode,
) -> Result<SpawnedProcess, io::Error> {
match self.queued_spawns.pop_front() {
Some(DeterministicSpawn::Spawn(process)) => {
let display_pid = process.display_pid;
let handle = process.handle;
if let Some(display_pid) = display_pid {
self.display_to_handle.insert(display_pid, handle);
}
self.processes.insert(handle, process);
Ok(SpawnedProcess {
handle,
display_pid,
})
}
Some(DeterministicSpawn::Fail { kind, message }) => Err(io::Error::new(kind, message)),
None => Err(io::Error::new(
io::ErrorKind::NotFound,
"no queued deterministic spawn",
)),
}
}
fn wait_process(
&mut self,
process: ProcessHandle,
mode: WaitMode,
) -> Result<ProcessEvent, io::Error> {
let Some(proc_state) = self.processes.get_mut(&process) else {
return Err(io::Error::new(
io::ErrorKind::NotFound,
"unknown deterministic process handle",
));
};
let event = proc_state.next_event(mode);
let finished = proc_state.is_terminal();
let finished_display_pid = proc_state.display_pid;
if finished {
if let Some(display_pid) = finished_display_pid {
self.display_to_handle.remove(&display_pid);
}
self.processes.remove(&process);
}
Ok(event)
}
fn signal_process_group(
&mut self,
process: ProcessHandle,
signal: RuntimeSignal,
) -> Result<(), io::Error> {
self.recorded_signals.push((process, signal));
Ok(())
}
fn claim_foreground(
&mut self,
process: ProcessHandle,
tty: FileDescriptor,
) -> Result<Self::ForegroundGuard, io::Error> {
self.foreground_claims.push((process, tty));
Ok((process, tty))
}
fn release_foreground(&mut self, _guard: Self::ForegroundGuard) -> Result<(), io::Error> {
self.foreground_releases += 1;
Ok(())
}
fn resolve_command_path(
&self,
program: &str,
_path_var: &str,
_cwd: &Path,
) -> Result<PathBuf, io::Error> {
Ok(PathBuf::from(program))
}
fn exec_replace(
&self,
_program: &str,
_argv: &[String],
_env: &[(String, String)],
_cwd: &Path,
) -> Result<(), io::Error> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"exec is unavailable in the deterministic runtime",
))
}
}