use std::{sync::Arc, time::Duration};
use bytes::Bytes;
use microsandbox_protocol::{
exec::{ExecResize, ExecSignal, ExecStdin},
message::MessageType,
};
use tokio::sync::mpsc;
use crate::{MicrosandboxResult, agent::AgentClient};
use microsandbox_types::EnvVar;
#[derive(Debug, Clone, Default)]
pub struct ExecOptions {
pub args: Vec<String>,
pub cwd: Option<String>,
pub user: Option<String>,
pub env: Vec<EnvVar>,
pub timeout: Option<Duration>,
pub stdin: StdinMode,
pub tty: bool,
pub rlimits: Vec<Rlimit>,
}
#[derive(Default)]
pub struct ExecOptionsBuilder {
options: ExecOptions,
}
#[derive(Debug, Clone, Default)]
pub enum StdinMode {
#[default]
Null,
Pipe,
Bytes(Vec<u8>),
}
#[derive(Debug)]
pub struct ExecOutput {
status: ExitStatus,
stdout: Bytes,
stderr: Bytes,
}
#[derive(Debug, Clone, Copy)]
pub struct ExitStatus {
pub code: i32,
pub success: bool,
}
pub struct ExecHandle {
id: u32,
events: mpsc::UnboundedReceiver<ExecEvent>,
stdin: Option<ExecSink>,
client: Arc<AgentClient>,
}
#[derive(Clone)]
pub struct ExecControl {
id: u32,
client: Arc<AgentClient>,
}
#[derive(Debug)]
pub enum ExecEvent {
Started {
pid: u32,
},
Stdout(Bytes),
Stderr(Bytes),
Exited {
code: i32,
},
Failed(microsandbox_protocol::exec::ExecFailed),
StdinError(microsandbox_protocol::exec::ExecStdinError),
}
pub struct ExecSink {
id: u32,
client: Arc<AgentClient>,
}
impl ExecOptionsBuilder {
pub fn arg(mut self, arg: impl Into<String>) -> Self {
self.options.args.push(arg.into());
self
}
pub fn args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
self.options.args.extend(args.into_iter().map(Into::into));
self
}
pub fn cwd(mut self, cwd: impl Into<String>) -> Self {
self.options.cwd = Some(cwd.into());
self
}
pub fn user(mut self, user: impl Into<String>) -> Self {
self.options.user = Some(user.into());
self
}
pub fn env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.options.env.push(EnvVar::new(key, value));
self
}
pub fn envs(
mut self,
vars: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
) -> Self {
self.options
.env
.extend(vars.into_iter().map(|(key, value)| EnvVar::new(key, value)));
self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.options.timeout = Some(timeout);
self
}
pub fn stdin_null(mut self) -> Self {
self.options.stdin = StdinMode::Null;
self
}
pub fn stdin_pipe(mut self) -> Self {
self.options.stdin = StdinMode::Pipe;
self
}
pub fn stdin_bytes(mut self, data: impl Into<Vec<u8>>) -> Self {
self.options.stdin = StdinMode::Bytes(data.into());
self
}
pub fn tty(mut self, enabled: bool) -> Self {
self.options.tty = enabled;
self
}
pub fn rlimit(mut self, resource: RlimitResource, limit: u64) -> Self {
self.options.rlimits.push(Rlimit {
resource,
soft: limit,
hard: limit,
});
self
}
pub fn rlimit_range(mut self, resource: RlimitResource, soft: u64, hard: u64) -> Self {
self.options.rlimits.push(Rlimit {
resource,
soft,
hard,
});
self
}
pub fn build(self) -> MicrosandboxResult<ExecOptions> {
validate_rlimits(&self.options.rlimits)?;
Ok(self.options)
}
}
pub(crate) fn validate_rlimits(rlimits: &[Rlimit]) -> MicrosandboxResult<()> {
for rlimit in rlimits {
if rlimit.soft > rlimit.hard {
return Err(crate::MicrosandboxError::InvalidConfig(format!(
"rlimit {}: soft ({}) must not exceed hard ({})",
rlimit.resource.as_str(),
rlimit.soft,
rlimit.hard
)));
}
}
Ok(())
}
impl ExecOutput {
pub(crate) fn from_parts(status: ExitStatus, stdout: Bytes, stderr: Bytes) -> Self {
Self {
status,
stdout,
stderr,
}
}
pub fn status(&self) -> ExitStatus {
self.status
}
pub fn stdout(&self) -> Result<String, std::string::FromUtf8Error> {
String::from_utf8(self.stdout.to_vec())
}
pub fn stderr(&self) -> Result<String, std::string::FromUtf8Error> {
String::from_utf8(self.stderr.to_vec())
}
pub fn stdout_bytes(&self) -> &Bytes {
&self.stdout
}
pub fn stderr_bytes(&self) -> &Bytes {
&self.stderr
}
}
impl ExecHandle {
pub(crate) fn new(
id: u32,
events: mpsc::UnboundedReceiver<ExecEvent>,
stdin: Option<ExecSink>,
client: Arc<AgentClient>,
) -> Self {
Self {
id,
events,
stdin,
client,
}
}
pub fn id(&self) -> String {
self.id.to_string()
}
pub fn control(&self) -> ExecControl {
ExecControl {
id: self.id,
client: Arc::clone(&self.client),
}
}
#[cfg(feature = "ssh")]
pub(crate) fn into_parts(
self,
) -> (
ExecControl,
Option<ExecSink>,
mpsc::UnboundedReceiver<ExecEvent>,
) {
(
ExecControl {
id: self.id,
client: Arc::clone(&self.client),
},
self.stdin,
self.events,
)
}
pub async fn recv(&mut self) -> Option<ExecEvent> {
self.events.recv().await
}
pub fn take_stdin(&mut self) -> Option<ExecSink> {
self.stdin.take()
}
pub async fn wait(&mut self) -> MicrosandboxResult<ExitStatus> {
while let Some(event) = self.events.recv().await {
match event {
ExecEvent::Exited { code } => {
return Ok(ExitStatus {
code,
success: code == 0,
});
}
ExecEvent::Failed(payload) => {
return Err(crate::MicrosandboxError::ExecFailed(payload));
}
_ => {}
}
}
Err(crate::MicrosandboxError::Runtime(
"exec session ended without exit event".into(),
))
}
pub async fn collect(&mut self) -> MicrosandboxResult<ExecOutput> {
let mut stdout = Vec::new();
let mut stderr = Vec::new();
let mut exit_code: Option<i32> = None;
while let Some(event) = self.events.recv().await {
match event {
ExecEvent::Started { pid: _ } => {}
ExecEvent::Stdout(data) => {
stdout.extend_from_slice(&data);
}
ExecEvent::Stderr(data) => {
stderr.extend_from_slice(&data);
}
ExecEvent::Exited { code } => {
exit_code = Some(code);
break;
}
ExecEvent::Failed(payload) => {
return Err(crate::MicrosandboxError::ExecFailed(payload));
}
ExecEvent::StdinError(_) => {}
}
}
let code = exit_code.ok_or_else(|| {
crate::MicrosandboxError::Runtime("exec session ended without exit event".into())
})?;
Ok(ExecOutput {
status: ExitStatus {
code,
success: code == 0,
},
stdout: Bytes::from(stdout),
stderr: Bytes::from(stderr),
})
}
pub async fn signal(&self, signal: i32) -> MicrosandboxResult<()> {
self.control().signal(signal).await
}
pub async fn kill(&self) -> MicrosandboxResult<()> {
self.control().kill().await
}
pub async fn resize(&self, rows: u16, cols: u16) -> MicrosandboxResult<()> {
self.control().resize(rows, cols).await
}
}
impl ExecControl {
pub fn id(&self) -> String {
self.id.to_string()
}
pub async fn signal(&self, signal: i32) -> MicrosandboxResult<()> {
let payload = ExecSignal { signal };
self.client
.send(self.id, MessageType::ExecSignal, &payload)
.await?;
Ok(())
}
pub async fn kill(&self) -> MicrosandboxResult<()> {
self.signal(9).await
}
pub async fn resize(&self, rows: u16, cols: u16) -> MicrosandboxResult<()> {
let payload = ExecResize { rows, cols };
self.client
.send(self.id, MessageType::ExecResize, &payload)
.await?;
Ok(())
}
}
impl ExecSink {
pub(crate) fn new(id: u32, client: Arc<AgentClient>) -> Self {
Self { id, client }
}
pub async fn write(&self, data: impl AsRef<[u8]>) -> MicrosandboxResult<()> {
let payload = ExecStdin {
data: data.as_ref().to_vec(),
};
self.client
.send(self.id, MessageType::ExecStdin, &payload)
.await?;
Ok(())
}
pub async fn close(&self) -> MicrosandboxResult<()> {
let payload = ExecStdin { data: Vec::new() };
self.client
.send(self.id, MessageType::ExecStdin, &payload)
.await?;
Ok(())
}
}
pub(crate) mod local {
use std::sync::Arc;
use bytes::Bytes;
use microsandbox_protocol::{
exec::{ExecExited, ExecStarted, ExecStderr, ExecStdin, ExecStdout},
message::{Message, MessageType},
};
use tokio::sync::mpsc;
use crate::{
MicrosandboxError, MicrosandboxResult,
backend::LocalBackend,
sandbox::{SandboxConfig, build_exec_request},
};
use super::{ExecEvent, ExecHandle, ExecOptions, ExecOutput, ExecSink, ExitStatus, StdinMode};
pub(crate) async fn exec_stream(
local: &LocalBackend,
name: &str,
config: &SandboxConfig,
cmd: String,
opts: ExecOptions,
) -> MicrosandboxResult<ExecHandle> {
exec_stream_with_pty_size(local, name, config, cmd, opts, 24, 80).await
}
pub(crate) async fn exec_stream_with_pty_size(
local: &LocalBackend,
name: &str,
config: &SandboxConfig,
cmd: String,
opts: ExecOptions,
rows: u16,
cols: u16,
) -> MicrosandboxResult<ExecHandle> {
let client = Arc::new(super::super::fs::local::connect_agent(local, name).await?);
let ExecOptions {
args,
cwd,
user,
env,
rlimits,
tty,
stdin: stdin_mode,
timeout: _,
} = opts;
tracing::debug!(
sandbox = %name,
cmd = %cmd,
args = ?args,
cwd = ?cwd,
tty,
"exec_stream"
);
let req = build_exec_request(
config, cmd, args, cwd, user, &env, &rlimits, tty, rows, cols,
);
let (id, rx) = client.stream(MessageType::ExecRequest, &req).await?;
let stdin = match &stdin_mode {
StdinMode::Pipe => Some(ExecSink::new(id, Arc::clone(&client))),
_ => None,
};
if let StdinMode::Bytes(ref data) = stdin_mode {
let data = data.clone();
let bridge = Arc::clone(&client);
tokio::spawn(async move {
let payload = ExecStdin { data };
let _ = bridge.send(id, MessageType::ExecStdin, &payload).await;
let close = ExecStdin { data: Vec::new() };
let _ = bridge.send(id, MessageType::ExecStdin, &close).await;
});
}
let (event_tx, event_rx) = mpsc::unbounded_channel();
tokio::spawn(event_mapper_task(rx, event_tx));
Ok(ExecHandle::new(id, event_rx, stdin, client))
}
pub(crate) async fn exec(
local: &LocalBackend,
name: &str,
config: &SandboxConfig,
cmd: String,
opts: ExecOptions,
) -> MicrosandboxResult<ExecOutput> {
let timeout_duration = opts.timeout;
let mut handle = exec_stream(local, name, config, cmd, opts).await?;
match timeout_duration {
Some(duration) => match tokio::time::timeout(duration, handle.collect()).await {
Ok(result) => result,
Err(_) => {
let _ = handle.kill().await;
let _ =
tokio::time::timeout(std::time::Duration::from_secs(5), handle.collect())
.await;
Err(MicrosandboxError::ExecTimeout(duration))
}
},
None => handle.collect().await,
}
}
async fn event_mapper_task(
mut rx: mpsc::Receiver<Message>,
tx: mpsc::UnboundedSender<ExecEvent>,
) {
while let Some(msg) = rx.recv().await {
let event = match msg.t {
MessageType::ExecStarted => match msg.payload::<ExecStarted>() {
Ok(started) => ExecEvent::Started { pid: started.pid },
Err(_) => continue,
},
MessageType::ExecStdout => match msg.payload::<ExecStdout>() {
Ok(out) => ExecEvent::Stdout(Bytes::from(out.data)),
Err(_) => continue,
},
MessageType::ExecStderr => match msg.payload::<ExecStderr>() {
Ok(err) => ExecEvent::Stderr(Bytes::from(err.data)),
Err(_) => continue,
},
MessageType::ExecExited => {
if let Ok(exited) = msg.payload::<ExecExited>() {
let _ = tx.send(ExecEvent::Exited { code: exited.code });
}
break;
}
MessageType::ExecFailed => {
if let Ok(failed) = msg.payload::<microsandbox_protocol::exec::ExecFailed>() {
let _ = tx.send(ExecEvent::Failed(failed));
}
break;
}
MessageType::ExecStdinError => {
match msg.payload::<microsandbox_protocol::exec::ExecStdinError>() {
Ok(payload) => ExecEvent::StdinError(payload),
Err(_) => continue,
}
}
_ => continue,
};
if tx.send(event).is_err() {
break;
}
}
}
#[allow(dead_code)]
pub(crate) fn _exit_status(code: i32) -> ExitStatus {
ExitStatus {
code,
success: code == 0,
}
}
}
pub use microsandbox_types::{Rlimit, RlimitResource};