use std::{sync::Arc, time::Duration};
use bytes::Bytes;
use microsandbox_protocol::{
exec::{ExecSignal, ExecStdin},
message::{Message, MessageType},
};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use crate::{MicrosandboxResult, agent::AgentClient};
#[derive(Debug, Clone, Default)]
pub struct ExecOptions {
pub args: Vec<String>,
pub cwd: Option<String>,
pub user: Option<String>,
pub env: Vec<(String, String)>,
pub timeout: Option<Duration>,
pub stdin: StdinMode,
pub tty: bool,
pub rlimits: Vec<Rlimit>,
}
#[derive(Default)]
pub struct ExecOptionsBuilder {
options: ExecOptions,
}
#[derive(Debug, Clone, Default)]
pub enum StdinMode {
#[default]
Null,
Pipe,
Bytes(Vec<u8>),
}
#[derive(Debug)]
pub struct ExecOutput {
status: ExitStatus,
stdout: Bytes,
stderr: Bytes,
}
#[derive(Debug, Clone, Copy)]
pub struct ExitStatus {
pub code: i32,
pub success: bool,
}
pub struct ExecHandle {
id: u32,
events: mpsc::UnboundedReceiver<ExecEvent>,
stdin: Option<ExecSink>,
client: Arc<AgentClient>,
}
#[derive(Debug)]
pub enum ExecEvent {
Started {
pid: u32,
},
Stdout(Bytes),
Stderr(Bytes),
Exited {
code: i32,
},
}
pub struct ExecSink {
id: u32,
client: Arc<AgentClient>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Rlimit {
pub resource: RlimitResource,
pub soft: u64,
pub hard: u64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RlimitResource {
Cpu,
Fsize,
Data,
Stack,
Core,
Rss,
Nproc,
Nofile,
Memlock,
As,
Locks,
Sigpending,
Msgqueue,
Nice,
Rtprio,
Rttime,
}
impl ExecOptionsBuilder {
pub fn arg(mut self, arg: impl Into<String>) -> Self {
self.options.args.push(arg.into());
self
}
pub fn args(mut self, args: impl IntoIterator<Item = impl Into<String>>) -> Self {
self.options.args.extend(args.into_iter().map(Into::into));
self
}
pub fn cwd(mut self, cwd: impl Into<String>) -> Self {
self.options.cwd = Some(cwd.into());
self
}
pub fn user(mut self, user: impl Into<String>) -> Self {
self.options.user = Some(user.into());
self
}
pub fn env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.options.env.push((key.into(), value.into()));
self
}
pub fn envs(
mut self,
vars: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
) -> Self {
self.options
.env
.extend(vars.into_iter().map(|(k, v)| (k.into(), v.into())));
self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.options.timeout = Some(timeout);
self
}
pub fn stdin_null(mut self) -> Self {
self.options.stdin = StdinMode::Null;
self
}
pub fn stdin_pipe(mut self) -> Self {
self.options.stdin = StdinMode::Pipe;
self
}
pub fn stdin_bytes(mut self, data: impl Into<Vec<u8>>) -> Self {
self.options.stdin = StdinMode::Bytes(data.into());
self
}
pub fn tty(mut self, enabled: bool) -> Self {
self.options.tty = enabled;
self
}
pub fn rlimit(mut self, resource: RlimitResource, limit: u64) -> Self {
self.options.rlimits.push(Rlimit {
resource,
soft: limit,
hard: limit,
});
self
}
pub fn rlimit_range(mut self, resource: RlimitResource, soft: u64, hard: u64) -> Self {
self.options.rlimits.push(Rlimit {
resource,
soft,
hard,
});
self
}
pub fn build(self) -> ExecOptions {
self.options
}
}
impl ExecOutput {
pub fn status(&self) -> ExitStatus {
self.status
}
pub fn stdout(&self) -> Result<String, std::string::FromUtf8Error> {
String::from_utf8(self.stdout.to_vec())
}
pub fn stderr(&self) -> Result<String, std::string::FromUtf8Error> {
String::from_utf8(self.stderr.to_vec())
}
pub fn stdout_bytes(&self) -> &Bytes {
&self.stdout
}
pub fn stderr_bytes(&self) -> &Bytes {
&self.stderr
}
}
impl ExecHandle {
pub(crate) fn new(
id: u32,
events: mpsc::UnboundedReceiver<ExecEvent>,
stdin: Option<ExecSink>,
client: Arc<AgentClient>,
) -> Self {
Self {
id,
events,
stdin,
client,
}
}
pub fn id(&self) -> String {
self.id.to_string()
}
pub async fn recv(&mut self) -> Option<ExecEvent> {
self.events.recv().await
}
pub fn take_stdin(&mut self) -> Option<ExecSink> {
self.stdin.take()
}
pub async fn wait(&mut self) -> MicrosandboxResult<ExitStatus> {
while let Some(event) = self.events.recv().await {
if let ExecEvent::Exited { code } = event {
return Ok(ExitStatus {
code,
success: code == 0,
});
}
}
Err(crate::MicrosandboxError::Runtime(
"exec session ended without exit event".into(),
))
}
pub async fn collect(&mut self) -> MicrosandboxResult<ExecOutput> {
let mut stdout = Vec::new();
let mut stderr = Vec::new();
let mut exit_code: Option<i32> = None;
while let Some(event) = self.events.recv().await {
match event {
ExecEvent::Started { pid: _ } => {}
ExecEvent::Stdout(data) => {
stdout.extend_from_slice(&data);
}
ExecEvent::Stderr(data) => {
stderr.extend_from_slice(&data);
}
ExecEvent::Exited { code } => {
exit_code = Some(code);
break;
}
}
}
let code = exit_code.ok_or_else(|| {
crate::MicrosandboxError::Runtime("exec session ended without exit event".into())
})?;
Ok(ExecOutput {
status: ExitStatus {
code,
success: code == 0,
},
stdout: Bytes::from(stdout),
stderr: Bytes::from(stderr),
})
}
pub async fn signal(&self, signal: i32) -> MicrosandboxResult<()> {
let payload = ExecSignal { signal };
let msg = Message::with_payload(MessageType::ExecSignal, self.id, &payload)?;
self.client.send(&msg).await
}
pub async fn kill(&self) -> MicrosandboxResult<()> {
self.signal(9).await
}
}
impl ExecSink {
pub(crate) fn new(id: u32, client: Arc<AgentClient>) -> Self {
Self { id, client }
}
pub async fn write(&self, data: impl AsRef<[u8]>) -> MicrosandboxResult<()> {
let payload = ExecStdin {
data: data.as_ref().to_vec(),
};
let msg = Message::with_payload(MessageType::ExecStdin, self.id, &payload)?;
self.client.send(&msg).await
}
pub async fn close(&self) -> MicrosandboxResult<()> {
let payload = ExecStdin { data: Vec::new() };
let msg = Message::with_payload(MessageType::ExecStdin, self.id, &payload)?;
self.client.send(&msg).await
}
}
impl RlimitResource {
pub fn as_str(&self) -> &'static str {
match self {
Self::Cpu => "cpu",
Self::Fsize => "fsize",
Self::Data => "data",
Self::Stack => "stack",
Self::Core => "core",
Self::Rss => "rss",
Self::Nproc => "nproc",
Self::Nofile => "nofile",
Self::Memlock => "memlock",
Self::As => "as",
Self::Locks => "locks",
Self::Sigpending => "sigpending",
Self::Msgqueue => "msgqueue",
Self::Nice => "nice",
Self::Rtprio => "rtprio",
Self::Rttime => "rttime",
}
}
}
impl TryFrom<&str> for RlimitResource {
type Error = String;
fn try_from(s: &str) -> Result<Self, Self::Error> {
match s.to_lowercase().as_str() {
"cpu" => Ok(Self::Cpu),
"fsize" => Ok(Self::Fsize),
"data" => Ok(Self::Data),
"stack" => Ok(Self::Stack),
"core" => Ok(Self::Core),
"rss" => Ok(Self::Rss),
"nproc" => Ok(Self::Nproc),
"nofile" => Ok(Self::Nofile),
"memlock" => Ok(Self::Memlock),
"as" => Ok(Self::As),
"locks" => Ok(Self::Locks),
"sigpending" => Ok(Self::Sigpending),
"msgqueue" => Ok(Self::Msgqueue),
"nice" => Ok(Self::Nice),
"rtprio" => Ok(Self::Rtprio),
"rttime" => Ok(Self::Rttime),
_ => Err(format!("unknown rlimit resource: {s}")),
}
}
}