use crate::runtime::backend::ExecBackend;
use boxlite_shared::errors::BoxliteResult;
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::sync::mpsc;
#[derive(Clone, Debug)]
pub struct BoxCommand {
pub(crate) command: String,
pub(crate) args: Vec<String>,
pub(crate) env: Option<Vec<(String, String)>>,
pub(crate) timeout: Option<Duration>,
pub(crate) working_dir: Option<String>,
pub(crate) tty: bool,
pub(crate) user: Option<String>,
}
impl BoxCommand {
pub fn new(command: impl Into<String>) -> Self {
Self {
command: command.into(),
args: vec![],
env: None,
timeout: None,
working_dir: None,
tty: false,
user: None,
}
}
pub fn arg(mut self, arg: impl Into<String>) -> Self {
self.args.push(arg.into());
self
}
pub fn args<I, S>(mut self, args: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.args.extend(args.into_iter().map(Into::into));
self
}
pub fn env(mut self, key: impl Into<String>, val: impl Into<String>) -> Self {
self.env
.get_or_insert_with(Vec::new)
.push((key.into(), val.into()));
self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn working_dir(mut self, dir: impl Into<String>) -> Self {
self.working_dir = Some(dir.into());
self
}
pub fn tty(mut self, enable: bool) -> Self {
self.tty = enable;
self
}
pub fn user(mut self, spec: impl Into<String>) -> Self {
let s = spec.into();
self.user = if s.trim().is_empty() { None } else { Some(s) };
self
}
}
#[derive(Clone)]
pub struct Execution {
id: ExecutionId,
inner: std::sync::Arc<tokio::sync::Mutex<ExecutionInner>>,
wait_state: std::sync::Arc<WaitState>,
}
pub(crate) struct ExecutionInner {
interface: Box<dyn ExecBackend>,
stdin: Option<ExecStdin>,
stdout: Option<ExecStdout>,
stderr: Option<ExecStderr>,
}
struct WaitState {
cached: tokio::sync::OnceCell<ExecResult>,
rx: tokio::sync::Mutex<mpsc::UnboundedReceiver<ExecResult>>,
}
pub type ExecutionId = String;
impl Execution {
pub(crate) fn new(
execution_id: ExecutionId,
interface: Box<dyn ExecBackend>,
result_rx: mpsc::UnboundedReceiver<ExecResult>,
stdin: Option<ExecStdin>,
stdout: Option<ExecStdout>,
stderr: Option<ExecStderr>,
) -> Self {
let inner = ExecutionInner {
interface,
stdin,
stdout,
stderr,
};
let wait_state = WaitState {
cached: tokio::sync::OnceCell::new(),
rx: tokio::sync::Mutex::new(result_rx),
};
Self {
id: execution_id,
inner: std::sync::Arc::new(tokio::sync::Mutex::new(inner)),
wait_state: std::sync::Arc::new(wait_state),
}
}
pub fn id(&self) -> &ExecutionId {
&self.id
}
pub fn stdin(&mut self) -> Option<ExecStdin> {
futures::executor::block_on(async {
let mut inner = self.inner.lock().await;
inner.stdin.take()
})
}
pub fn stdout(&mut self) -> Option<ExecStdout> {
futures::executor::block_on(async {
let mut inner = self.inner.lock().await;
inner.stdout.take()
})
}
pub fn stderr(&mut self) -> Option<ExecStderr> {
futures::executor::block_on(async {
let mut inner = self.inner.lock().await;
inner.stderr.take()
})
}
pub async fn wait(&self) -> BoxliteResult<ExecResult> {
self.wait_state
.cached
.get_or_try_init(|| async {
let mut rx = self.wait_state.rx.lock().await;
rx.recv().await.ok_or_else(|| {
boxlite_shared::BoxliteError::Internal("Result channel closed".into())
})
})
.await
.cloned()
}
pub async fn kill(&self) -> BoxliteResult<()> {
let mut inner = self.inner.lock().await;
inner.interface.kill(&self.id).await
}
pub async fn signal(&self, signal: i32) -> BoxliteResult<()> {
let mut inner = self.inner.lock().await;
inner.interface.signal(&self.id, signal).await
}
pub async fn resize_tty(&self, rows: u32, cols: u32) -> BoxliteResult<()> {
let mut inner = self.inner.lock().await;
inner.interface.resize_tty(&self.id, rows, cols, 0, 0).await
}
#[cfg(feature = "test-support")]
#[allow(clippy::type_complexity)]
pub fn stub(
id: &str,
) -> (
Self,
mpsc::UnboundedSender<String>,
mpsc::UnboundedSender<String>,
mpsc::UnboundedReceiver<Vec<u8>>,
mpsc::UnboundedSender<ExecResult>,
) {
use async_trait::async_trait;
struct NoopBackend;
#[async_trait]
impl ExecBackend for NoopBackend {
async fn signal(&mut self, _: &str, _: i32) -> BoxliteResult<()> {
Ok(())
}
async fn resize_tty(
&mut self,
_: &str,
_: u32,
_: u32,
_: u32,
_: u32,
) -> BoxliteResult<()> {
Ok(())
}
}
let (stdout_tx, stdout_rx) = mpsc::unbounded_channel::<String>();
let (stderr_tx, stderr_rx) = mpsc::unbounded_channel::<String>();
let (stdin_tx, stdin_rx) = mpsc::unbounded_channel::<Vec<u8>>();
let (result_tx, result_rx) = mpsc::unbounded_channel::<ExecResult>();
let exec = Self::new(
id.to_string(),
Box::new(NoopBackend),
result_rx,
Some(ExecStdin::new(stdin_tx)),
Some(ExecStdout::new(stdout_rx)),
Some(ExecStderr::new(stderr_rx)),
);
(exec, stdout_tx, stderr_tx, stdin_rx, result_tx)
}
}
#[derive(Clone, Debug)]
pub struct ExecResult {
pub exit_code: i32,
pub error_message: Option<String>,
}
impl ExecResult {
pub fn success(&self) -> bool {
self.exit_code == 0
}
pub fn code(&self) -> i32 {
self.exit_code
}
}
pub struct ExecStdin {
sender: Option<mpsc::UnboundedSender<Vec<u8>>>,
}
impl ExecStdin {
pub(crate) fn new(sender: mpsc::UnboundedSender<Vec<u8>>) -> Self {
Self {
sender: Some(sender),
}
}
pub async fn write(&mut self, data: &[u8]) -> BoxliteResult<()> {
match &self.sender {
Some(sender) => sender.send(data.to_vec()).map_err(|_| {
boxlite_shared::BoxliteError::Internal("stdin channel closed".to_string())
}),
None => Err(boxlite_shared::BoxliteError::Internal(
"stdin already closed".to_string(),
)),
}
}
pub async fn write_all(&mut self, data: &[u8]) -> BoxliteResult<()> {
self.write(data).await
}
pub fn close(&mut self) {
self.sender = None;
}
pub fn is_closed(&self) -> bool {
self.sender.is_none()
}
}
pub struct ExecStdout {
receiver: mpsc::UnboundedReceiver<String>,
}
impl ExecStdout {
pub(crate) fn new(receiver: mpsc::UnboundedReceiver<String>) -> Self {
Self { receiver }
}
}
impl Stream for ExecStdout {
type Item = String;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.receiver.poll_recv(cx)
}
}
pub struct ExecStderr {
receiver: mpsc::UnboundedReceiver<String>,
}
impl ExecStderr {
pub(crate) fn new(receiver: mpsc::UnboundedReceiver<String>) -> Self {
Self { receiver }
}
}
impl Stream for ExecStderr {
type Item = String;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.receiver.poll_recv(cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_box_command_user_builder() {
let cmd = BoxCommand::new("whoami").user("abc:staff");
assert_eq!(cmd.user, Some("abc:staff".to_string()));
}
#[test]
fn test_box_command_default_no_user() {
let cmd = BoxCommand::new("ls");
assert_eq!(cmd.user, None);
}
#[test]
fn test_box_command_user_numeric() {
let cmd = BoxCommand::new("id").user("1000:1000");
assert_eq!(cmd.user, Some("1000:1000".to_string()));
}
#[test]
fn test_box_command_user_empty_string_becomes_none() {
let cmd = BoxCommand::new("id").user("");
assert_eq!(cmd.user, None);
}
#[test]
fn test_box_command_user_whitespace_only_becomes_none() {
let cmd = BoxCommand::new("id").user(" ");
assert_eq!(cmd.user, None);
}
use crate::runtime::backend::ExecBackend;
use async_trait::async_trait;
use std::sync::Arc as StdArc;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use tokio::sync::mpsc as tokio_mpsc;
struct StubExecBackend {
kill_observed: StdArc<AtomicBool>,
}
#[async_trait]
impl ExecBackend for StubExecBackend {
async fn signal(&mut self, _execution_id: &str, _signal: i32) -> BoxliteResult<()> {
self.kill_observed.store(true, AtomicOrdering::SeqCst);
Ok(())
}
async fn resize_tty(
&mut self,
_execution_id: &str,
_rows: u32,
_cols: u32,
_x_pixels: u32,
_y_pixels: u32,
) -> BoxliteResult<()> {
Ok(())
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn wait_does_not_block_kill() {
let (_result_tx, result_rx) = tokio_mpsc::unbounded_channel::<ExecResult>();
let kill_observed = StdArc::new(AtomicBool::new(false));
let backend = Box::new(StubExecBackend {
kill_observed: kill_observed.clone(),
});
let exec = Execution::new(
"test-exec".to_string(),
backend,
result_rx,
None,
None,
None,
);
let wait_clone = exec.clone();
tokio::spawn(async move {
let _ = wait_clone.wait().await;
});
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let result = tokio::time::timeout(std::time::Duration::from_millis(500), exec.signal(9))
.await
.expect(
"kill/signal blocked by parked wait — Execution lock split \
regressed; see src/boxlite/src/litebox/exec.rs::wait",
);
assert!(result.is_ok());
assert!(kill_observed.load(AtomicOrdering::SeqCst));
}
}