use crate::runtime::backend::ExecBackend;
use boxlite_shared::errors::BoxliteResult;
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::sync::mpsc;
/// Description of a command to run, assembled through chained builder calls.
///
/// Every builder method takes the value by move and hands back the updated
/// value, so calls can be chained:
/// `BoxCommand::new("ls").arg("-l").working_dir("/tmp")`.
#[derive(Clone, Debug)]
pub struct BoxCommand {
    /// Program name or path, exactly as passed to [`BoxCommand::new`].
    pub(crate) command: String,
    /// Arguments, in the order they were added.
    pub(crate) args: Vec<String>,
    /// Environment pairs in insertion order; `None` until `env` is first called.
    pub(crate) env: Option<Vec<(String, String)>>,
    /// Timeout recorded by `timeout`; `None` when never set.
    pub(crate) timeout: Option<Duration>,
    /// Working directory recorded by `working_dir`; `None` when never set.
    pub(crate) working_dir: Option<String>,
    /// TTY flag recorded by `tty`; starts out `false`.
    pub(crate) tty: bool,
    /// User spec recorded by `user`; `None` when unset or when the last spec was blank.
    pub(crate) user: Option<String>,
}

impl BoxCommand {
    /// Starts a command description for `command` with no arguments, no
    /// environment, no timeout, no working directory, TTY off and no user.
    pub fn new(command: impl Into<String>) -> Self {
        let command = command.into();
        Self {
            command,
            args: Vec::new(),
            env: None,
            timeout: None,
            working_dir: None,
            tty: false,
            user: None,
        }
    }

    /// Appends a single argument after any that were added earlier.
    pub fn arg(mut self, arg: impl Into<String>) -> Self {
        let owned: String = arg.into();
        self.args.push(owned);
        self
    }

    /// Appends every item of `args`, preserving iteration order.
    pub fn args<I, S>(mut self, args: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        for item in args {
            self.args.push(item.into());
        }
        self
    }

    /// Adds one `(key, val)` environment pair.
    ///
    /// Pairs are appended, not de-duplicated: setting the same key twice
    /// stores two entries.
    pub fn env(mut self, key: impl Into<String>, val: impl Into<String>) -> Self {
        let pair = (key.into(), val.into());
        match self.env.as_mut() {
            Some(pairs) => pairs.push(pair),
            None => self.env = Some(vec![pair]),
        }
        self
    }

    /// Records `timeout`, replacing any previously recorded value.
    pub fn timeout(self, timeout: Duration) -> Self {
        Self {
            timeout: Some(timeout),
            ..self
        }
    }

    /// Records the working directory, replacing any previously recorded value.
    pub fn working_dir(self, dir: impl Into<String>) -> Self {
        Self {
            working_dir: Some(dir.into()),
            ..self
        }
    }

    /// Sets the TTY flag to `enable`.
    pub fn tty(self, enable: bool) -> Self {
        Self { tty: enable, ..self }
    }

    /// Records the user spec.
    ///
    /// An empty or whitespace-only `spec` clears the field to `None`. Any
    /// other value is stored exactly as given; surrounding whitespace is not
    /// trimmed.
    pub fn user(mut self, spec: impl Into<String>) -> Self {
        self.user = Some(spec.into()).filter(|candidate| !candidate.trim().is_empty());
        self
    }
}
/// Handle to one execution, identified by an [`ExecutionId`].
///
/// Clones share the same underlying state: `inner` is an `Arc` around a Tokio
/// async mutex, so a stream taken or a result cached through one clone is
/// seen by every other clone.
#[derive(Clone)]
pub struct Execution {
/// Identifier handed to the backend on signal and TTY-resize requests.
id: ExecutionId,
/// State shared between all clones of this handle, guarded by an async mutex.
inner: std::sync::Arc<tokio::sync::Mutex<ExecutionInner>>,
}
/// Mutable state shared by every clone of an [`Execution`] handle.
pub(crate) struct ExecutionInner {
/// Backend used to deliver signals and TTY resize requests.
interface: Box<dyn ExecBackend>,
/// Channel on which the final [`ExecResult`] is received.
result_rx: mpsc::UnboundedReceiver<ExecResult>,
/// Result remembered after the first successful wait, so later waits can
/// return it without reading the channel again.
cached_result: Option<ExecResult>,
/// Stdin handle; `None` if none was supplied or once it has been taken.
stdin: Option<ExecStdin>,
/// Stdout stream; `None` if none was supplied or once it has been taken.
stdout: Option<ExecStdout>,
/// Stderr stream; `None` if none was supplied or once it has been taken.
stderr: Option<ExecStderr>,
}
/// Identifier of an execution; a plain alias for `String`.
pub type ExecutionId = String;
impl Execution {
    /// Builds a handle from its parts.
    ///
    /// The backend, result channel and optional stdio handles are moved into
    /// shared state behind an `Arc`-wrapped async mutex; no result is cached
    /// initially.
    pub(crate) fn new(
        execution_id: ExecutionId,
        interface: Box<dyn ExecBackend>,
        result_rx: mpsc::UnboundedReceiver<ExecResult>,
        stdin: Option<ExecStdin>,
        stdout: Option<ExecStdout>,
        stderr: Option<ExecStderr>,
    ) -> Self {
        let shared = tokio::sync::Mutex::new(ExecutionInner {
            interface,
            result_rx,
            cached_result: None,
            stdin,
            stdout,
            stderr,
        });
        Self {
            id: execution_id,
            inner: std::sync::Arc::new(shared),
        }
    }

    /// Identifier of this execution.
    pub fn id(&self) -> &ExecutionId {
        &self.id
    }

    /// Takes the stdin handle out of the shared state.
    ///
    /// Returns `Some` at most once across all clones; afterwards (or if no
    /// handle was supplied at construction) it returns `None`.
    ///
    /// This is a synchronous call that blocks the current thread (through
    /// `futures::executor::block_on`) until the shared lock is acquired.
    /// NOTE(review): `wait` keeps that lock while it awaits the result, so
    /// calling this while a `wait` on a clone is pending blocks until that
    /// `wait` returns.
    pub fn stdin(&mut self) -> Option<ExecStdin> {
        let mut state = futures::executor::block_on(self.inner.lock());
        state.stdin.take()
    }

    /// Takes the stdout stream out of the shared state.
    ///
    /// Same once-only and thread-blocking behaviour as [`Execution::stdin`].
    pub fn stdout(&mut self) -> Option<ExecStdout> {
        let mut state = futures::executor::block_on(self.inner.lock());
        state.stdout.take()
    }

    /// Takes the stderr stream out of the shared state.
    ///
    /// Same once-only and thread-blocking behaviour as [`Execution::stdin`].
    pub fn stderr(&mut self) -> Option<ExecStderr> {
        let mut state = futures::executor::block_on(self.inner.lock());
        state.stderr.take()
    }

    /// Waits for the execution's result.
    ///
    /// The first result received is cached, so repeated calls (from any
    /// clone) return a copy of the same value.
    ///
    /// The shared lock is held for the whole call, including while awaiting
    /// the result channel; other methods that need the lock wait until this
    /// returns.
    ///
    /// # Errors
    ///
    /// Returns `BoxliteError::Internal("Result channel closed")` when the
    /// channel closes without ever delivering a result.
    pub async fn wait(&mut self) -> BoxliteResult<ExecResult> {
        let mut state = self.inner.lock().await;

        if let Some(done) = state.cached_result.as_ref() {
            return Ok(done.clone());
        }

        let outcome = match state.result_rx.try_recv() {
            // A result was already queued: no need to suspend.
            Ok(ready) => ready,
            // Nothing queued yet (or channel closed): fall back to awaiting.
            Err(_) => state.result_rx.recv().await.ok_or_else(|| {
                boxlite_shared::BoxliteError::Internal("Result channel closed".into())
            })?,
        };

        state.cached_result = Some(outcome.clone());
        Ok(outcome)
    }

    /// Sends signal number 9 (SIGKILL on POSIX systems) via [`Execution::signal`].
    pub async fn kill(&mut self) -> BoxliteResult<()> {
        const KILL_SIGNAL: i32 = 9;
        self.signal(KILL_SIGNAL).await
    }

    /// Asks the backend to deliver `signal` to this execution.
    ///
    /// Acquires the shared lock first, then forwards the execution id and
    /// signal number to the backend's `kill`, returning its result.
    pub async fn signal(&self, signal: i32) -> BoxliteResult<()> {
        let mut state = self.inner.lock().await;
        let backend = &mut state.interface;
        backend.kill(&self.id, signal).await
    }

    /// Asks the backend to resize this execution's TTY to `rows` x `cols`.
    ///
    /// The two trailing backend arguments are always passed as `0`.
    /// NOTE(review): presumably pixel dimensions; confirm against `ExecBackend`.
    pub async fn resize_tty(&self, rows: u32, cols: u32) -> BoxliteResult<()> {
        let mut state = self.inner.lock().await;
        let backend = &mut state.interface;
        backend.resize_tty(&self.id, rows, cols, 0, 0).await
    }
}
/// Final outcome of an execution.
#[derive(Clone, Debug)]
pub struct ExecResult {
    /// Exit code reported for the execution.
    pub exit_code: i32,
    /// Optional error text accompanying the result.
    pub error_message: Option<String>,
}

impl ExecResult {
    /// Returns `true` exactly when the exit code is `0`.
    pub fn success(&self) -> bool {
        matches!(self.exit_code, 0)
    }

    /// Returns the exit code unchanged.
    pub fn code(&self) -> i32 {
        let Self { exit_code, .. } = self;
        *exit_code
    }
}
/// Write side of an execution's standard input.
pub struct ExecStdin {
    /// Channel that carries written bytes; `None` once `close` has been called.
    sender: Option<mpsc::UnboundedSender<Vec<u8>>>,
}

impl ExecStdin {
    /// Wraps `sender` in an open stdin handle.
    pub(crate) fn new(sender: mpsc::UnboundedSender<Vec<u8>>) -> Self {
        let sender = Some(sender);
        Self { sender }
    }

    /// Copies `data` into a new buffer and queues it on the channel.
    ///
    /// The channel is unbounded, so the whole buffer is queued in one step
    /// and this never suspends.
    ///
    /// # Errors
    ///
    /// * `BoxliteError::Internal("stdin already closed")` if `close` was
    ///   called on this handle.
    /// * `BoxliteError::Internal("stdin channel closed")` if the channel
    ///   rejects the send.
    pub async fn write(&mut self, data: &[u8]) -> BoxliteResult<()> {
        let tx = self.sender.as_ref().ok_or_else(|| {
            boxlite_shared::BoxliteError::Internal("stdin already closed".to_string())
        })?;

        tx.send(data.to_vec()).map_err(|_| {
            boxlite_shared::BoxliteError::Internal("stdin channel closed".to_string())
        })
    }

    /// Same as [`ExecStdin::write`]: a single write already queues all of `data`.
    pub async fn write_all(&mut self, data: &[u8]) -> BoxliteResult<()> {
        self.write(data).await
    }

    /// Drops this handle's sender; later writes fail with "stdin already closed".
    pub fn close(&mut self) {
        drop(self.sender.take());
    }

    /// Reports whether `close` has been called on this handle.
    ///
    /// Only local state is checked; whether the receiving side is still alive
    /// is not inspected.
    pub fn is_closed(&self) -> bool {
        self.sender.is_none()
    }
}
/// Stream of an execution's standard output, delivered as `String` items.
pub struct ExecStdout {
    /// Channel the output items arrive on.
    receiver: mpsc::UnboundedReceiver<String>,
}

impl ExecStdout {
    /// Wraps the receiving end of the stdout channel.
    pub(crate) fn new(receiver: mpsc::UnboundedReceiver<String>) -> Self {
        ExecStdout { receiver }
    }
}

impl Stream for ExecStdout {
    type Item = String;

    /// Polls the underlying channel: yields the next item, `None` once the
    /// channel is closed and drained, or `Pending` when nothing is ready.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.receiver.poll_recv(cx)
    }
}
/// Stream of an execution's standard error, delivered as `String` items.
pub struct ExecStderr {
    /// Channel the error-output items arrive on.
    receiver: mpsc::UnboundedReceiver<String>,
}

impl ExecStderr {
    /// Wraps the receiving end of the stderr channel.
    pub(crate) fn new(receiver: mpsc::UnboundedReceiver<String>) -> Self {
        ExecStderr { receiver }
    }
}

impl Stream for ExecStderr {
    type Item = String;

    /// Polls the underlying channel: yields the next item, `None` once the
    /// channel is closed and drained, or `Pending` when nothing is ready.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        this.receiver.poll_recv(cx)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // A "name:group" style spec is stored verbatim.
    #[test]
    fn test_box_command_user_builder() {
        let built = BoxCommand::new("whoami").user("abc:staff");
        assert_eq!(built.user.as_deref(), Some("abc:staff"));
    }

    // Without a call to `user`, the field stays unset.
    #[test]
    fn test_box_command_default_no_user() {
        let built = BoxCommand::new("ls");
        assert!(built.user.is_none());
    }

    // A numeric spec is stored verbatim as well.
    #[test]
    fn test_box_command_user_numeric() {
        let built = BoxCommand::new("id").user("1000:1000");
        assert_eq!(built.user.as_deref(), Some("1000:1000"));
    }

    // An empty spec is treated as "no user".
    #[test]
    fn test_box_command_user_empty_string_becomes_none() {
        let built = BoxCommand::new("id").user("");
        assert!(built.user.is_none());
    }

    // A spec made only of whitespace is treated as "no user".
    #[test]
    fn test_box_command_user_whitespace_only_becomes_none() {
        let built = BoxCommand::new("id").user(" ");
        assert!(built.user.is_none());
    }
}