boxlite 0.9.4

Embeddable virtual machine runtime for secure, isolated code execution
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
//! Command execution types
//!
//! Type definitions for executing commands in a box.
//! The actual execution logic is in BoxImpl::exec().

use crate::runtime::backend::ExecBackend;
use boxlite_shared::errors::BoxliteResult;
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::sync::mpsc;

/// Command builder for executing programs in a box.
///
/// Provides a builder API similar to `std::process::Command`.
///
/// # Examples
///
/// ```rust,no_run
/// # use boxlite::BoxCommand;
/// # use std::time::Duration;
/// let cmd = BoxCommand::new("python3")
///     .args(["-c", "print('hello')"])
///     .env("PYTHONPATH", "/app")
///     .timeout(Duration::from_secs(30))
///     .working_dir("/workspace");
/// ```
#[derive(Clone, Debug)]
pub struct BoxCommand {
    pub(crate) command: String,
    pub(crate) args: Vec<String>,
    pub(crate) env: Option<Vec<(String, String)>>,
    pub(crate) timeout: Option<Duration>,
    pub(crate) working_dir: Option<String>,
    pub(crate) tty: bool,
    pub(crate) user: Option<String>,
}

impl BoxCommand {
    /// Create a new command.
    pub fn new(command: impl Into<String>) -> Self {
        Self {
            command: command.into(),
            args: vec![],
            env: None,
            timeout: None,
            working_dir: None,
            tty: false,
            user: None,
        }
    }

    /// Add a single argument.
    pub fn arg(mut self, arg: impl Into<String>) -> Self {
        self.args.push(arg.into());
        self
    }

    /// Add multiple arguments.
    pub fn args<I, S>(mut self, args: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.args.extend(args.into_iter().map(Into::into));
        self
    }

    /// Set an environment variable.
    pub fn env(mut self, key: impl Into<String>, val: impl Into<String>) -> Self {
        self.env
            .get_or_insert_with(Vec::new)
            .push((key.into(), val.into()));
        self
    }

    /// Set execution timeout.
    pub fn timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Set working directory.
    pub fn working_dir(mut self, dir: impl Into<String>) -> Self {
        self.working_dir = Some(dir.into());
        self
    }

    /// Enable TTY (pseudo-terminal) for interactive sessions.
    ///
    /// Terminal size is auto-detected from the current terminal.
    pub fn tty(mut self, enable: bool) -> Self {
        self.tty = enable;
        self
    }

    /// Set the user to run the command as.
    ///
    /// Format: `<name|uid>[:<group|gid>]` (same as `docker exec --user`).
    /// If not set, uses the container's default user from image config.
    pub fn user(mut self, spec: impl Into<String>) -> Self {
        let s = spec.into();
        self.user = if s.trim().is_empty() { None } else { Some(s) };
        self
    }
}

/// Handle to a running command execution.
///
/// Similar to `std::process::Child` but for remote execution in a guest.
/// Provides access to stdin, stdout, stderr streams and control operations.
///
/// # Examples
///
/// ```rust,no_run
/// # async fn example(litebox: &boxlite::LiteBox) -> Result<(), Box<dyn std::error::Error>> {
/// use boxlite::BoxCommand;
/// use futures::StreamExt;
///
/// let mut execution = litebox.exec(BoxCommand::new("ls").arg("-la")).await?;
///
/// // Read stdout
/// let mut stdout = execution.stdout.take().unwrap();
/// while let Some(line) = stdout.next().await {
///     println!("{}", line);
/// }
///
/// // Wait for completion
/// let status = execution.wait().await?;
/// println!("Exit code: {}", status.exit_code);
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct Execution {
    id: ExecutionId,
    inner: std::sync::Arc<tokio::sync::Mutex<ExecutionInner>>,
    /// Result-channel state held on a separate lock so a parked `wait()`
    /// can never block `kill()`/`signal()`/`resize_tty()`.
    wait_state: std::sync::Arc<WaitState>,
}

pub(crate) struct ExecutionInner {
    interface: Box<dyn ExecBackend>,

    /// Standard input stream (write-only).
    stdin: Option<ExecStdin>,

    /// Standard output stream (read-only).
    stdout: Option<ExecStdout>,

    /// Standard error stream (read-only).
    stderr: Option<ExecStderr>,
}

/// Independent lock domain for `Execution::wait`. Held only by waiters; never
/// contended with `interface`-using ops (`kill`, `signal`, `resize_tty`).
///
/// `OnceCell::get_or_try_init` ensures the result channel is read at most
/// once; concurrent waiters past the first one observe the cached value
/// without ever locking `rx`.
struct WaitState {
    cached: tokio::sync::OnceCell<ExecResult>,
    rx: tokio::sync::Mutex<mpsc::UnboundedReceiver<ExecResult>>,
}

/// Unique identifier for an execution.
pub type ExecutionId = String;

impl Execution {
    /// Create a new Execution (internal use).
    pub(crate) fn new(
        execution_id: ExecutionId,
        interface: Box<dyn ExecBackend>,
        result_rx: mpsc::UnboundedReceiver<ExecResult>,
        stdin: Option<ExecStdin>,
        stdout: Option<ExecStdout>,
        stderr: Option<ExecStderr>,
    ) -> Self {
        let inner = ExecutionInner {
            interface,
            stdin,
            stdout,
            stderr,
        };
        let wait_state = WaitState {
            cached: tokio::sync::OnceCell::new(),
            rx: tokio::sync::Mutex::new(result_rx),
        };

        Self {
            id: execution_id,
            inner: std::sync::Arc::new(tokio::sync::Mutex::new(inner)),
            wait_state: std::sync::Arc::new(wait_state),
        }
    }

    /// Get the execution ID.
    pub fn id(&self) -> &ExecutionId {
        &self.id
    }

    /// Take the stdin stream (can only be called once).
    pub fn stdin(&mut self) -> Option<ExecStdin> {
        futures::executor::block_on(async {
            let mut inner = self.inner.lock().await;
            inner.stdin.take()
        })
    }

    /// Take the stdout stream (can only be called once).
    pub fn stdout(&mut self) -> Option<ExecStdout> {
        futures::executor::block_on(async {
            let mut inner = self.inner.lock().await;
            inner.stdout.take()
        })
    }

    /// Take the stderr stream (can only be called once).
    pub fn stderr(&mut self) -> Option<ExecStderr> {
        futures::executor::block_on(async {
            let mut inner = self.inner.lock().await;
            inner.stderr.take()
        })
    }

    /// Wait for the execution to complete.
    ///
    /// Returns the exit status once the execution finishes. If the result is
    /// already cached, returns immediately. Otherwise, awaits the next value
    /// from the result channel.
    ///
    /// The lock domain is `wait_state`, independent of `inner`, so a parked
    /// wait does not block `kill`/`signal`/`resize_tty`.
    pub async fn wait(&self) -> BoxliteResult<ExecResult> {
        self.wait_state
            .cached
            .get_or_try_init(|| async {
                let mut rx = self.wait_state.rx.lock().await;
                rx.recv().await.ok_or_else(|| {
                    boxlite_shared::BoxliteError::Internal("Result channel closed".into())
                })
            })
            .await
            .cloned()
    }

    /// Terminate the execution and release server-side resources. For the
    /// REST backend this is `DELETE /executions/{id}` (atomic kill + evict).
    /// For the local backend this falls back to `signal(SIGKILL)`.
    ///
    /// Takes `&self` because state mutation happens through the internal
    /// `inner` mutex — there is no Rust-level invariant requiring exclusive
    /// access to the handle. Callers can share `Execution` via `Arc<Execution>`
    /// and call `kill()` without a wrapping outer mutex.
    pub async fn kill(&self) -> BoxliteResult<()> {
        let mut inner = self.inner.lock().await;
        inner.interface.kill(&self.id).await
    }

    /// Send a Unix signal to the execution. The execution continues running
    /// (or not) based on whether the process honors the signal.
    pub async fn signal(&self, signal: i32) -> BoxliteResult<()> {
        let mut inner = self.inner.lock().await;
        inner.interface.signal(&self.id, signal).await
    }

    /// Resize PTY terminal window.
    ///
    /// Only works for executions started with TTY enabled.
    pub async fn resize_tty(&self, rows: u32, cols: u32) -> BoxliteResult<()> {
        let mut inner = self.inner.lock().await;
        inner.interface.resize_tty(&self.id, rows, cols, 0, 0).await
    }

    /// Build a stub `Execution` for cross-crate tests.
    ///
    /// The stub backend no-ops every control operation. Callers drive
    /// the execution through the returned channel handles:
    ///  - send to `stdout_tx` / `stderr_tx` to produce output
    ///  - send to `result_tx` to signal process completion
    ///  - receive from `stdin_rx` to observe stdin writes
    #[cfg(feature = "test-support")]
    #[allow(clippy::type_complexity)]
    pub fn stub(
        id: &str,
    ) -> (
        Self,
        mpsc::UnboundedSender<String>,
        mpsc::UnboundedSender<String>,
        mpsc::UnboundedReceiver<Vec<u8>>,
        mpsc::UnboundedSender<ExecResult>,
    ) {
        use async_trait::async_trait;

        struct NoopBackend;
        #[async_trait]
        impl ExecBackend for NoopBackend {
            async fn signal(&mut self, _: &str, _: i32) -> BoxliteResult<()> {
                Ok(())
            }
            async fn resize_tty(
                &mut self,
                _: &str,
                _: u32,
                _: u32,
                _: u32,
                _: u32,
            ) -> BoxliteResult<()> {
                Ok(())
            }
        }

        let (stdout_tx, stdout_rx) = mpsc::unbounded_channel::<String>();
        let (stderr_tx, stderr_rx) = mpsc::unbounded_channel::<String>();
        let (stdin_tx, stdin_rx) = mpsc::unbounded_channel::<Vec<u8>>();
        let (result_tx, result_rx) = mpsc::unbounded_channel::<ExecResult>();

        let exec = Self::new(
            id.to_string(),
            Box::new(NoopBackend),
            result_rx,
            Some(ExecStdin::new(stdin_tx)),
            Some(ExecStdout::new(stdout_rx)),
            Some(ExecStderr::new(stderr_rx)),
        );
        (exec, stdout_tx, stderr_tx, stdin_rx, result_tx)
    }
}

/// Exit status of a process.
#[derive(Clone, Debug)]
pub struct ExecResult {
    /// Exit code (0 = success). If terminated by signal, code is negative signal number.
    pub exit_code: i32,
    /// Diagnostic message when process died unexpectedly
    /// (e.g., container init death causing PID namespace teardown).
    /// None if the process exited normally.
    pub error_message: Option<String>,
}

impl ExecResult {
    /// Returns true if the exit code was 0.
    pub fn success(&self) -> bool {
        self.exit_code == 0
    }

    pub fn code(&self) -> i32 {
        self.exit_code
    }
}

/// Standard input stream (write-only).
pub struct ExecStdin {
    sender: Option<mpsc::UnboundedSender<Vec<u8>>>,
}

impl ExecStdin {
    pub(crate) fn new(sender: mpsc::UnboundedSender<Vec<u8>>) -> Self {
        Self {
            sender: Some(sender),
        }
    }

    /// Write data to stdin.
    pub async fn write(&mut self, data: &[u8]) -> BoxliteResult<()> {
        match &self.sender {
            Some(sender) => sender.send(data.to_vec()).map_err(|_| {
                boxlite_shared::BoxliteError::Internal("stdin channel closed".to_string())
            }),
            None => Err(boxlite_shared::BoxliteError::Internal(
                "stdin already closed".to_string(),
            )),
        }
    }

    /// Write all data to stdin.
    pub async fn write_all(&mut self, data: &[u8]) -> BoxliteResult<()> {
        self.write(data).await
    }

    /// Close stdin stream, signaling EOF to the process.
    pub fn close(&mut self) {
        self.sender = None;
    }

    /// Check if stdin is closed.
    pub fn is_closed(&self) -> bool {
        self.sender.is_none()
    }
}

/// Standard output stream (read-only).
pub struct ExecStdout {
    receiver: mpsc::UnboundedReceiver<String>,
}

impl ExecStdout {
    pub(crate) fn new(receiver: mpsc::UnboundedReceiver<String>) -> Self {
        Self { receiver }
    }
}

impl Stream for ExecStdout {
    type Item = String;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.receiver.poll_recv(cx)
    }
}

/// Standard error stream (read-only).
pub struct ExecStderr {
    receiver: mpsc::UnboundedReceiver<String>,
}

impl ExecStderr {
    pub(crate) fn new(receiver: mpsc::UnboundedReceiver<String>) -> Self {
        Self { receiver }
    }
}

impl Stream for ExecStderr {
    type Item = String;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.receiver.poll_recv(cx)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_box_command_user_builder() {
        let cmd = BoxCommand::new("whoami").user("abc:staff");
        assert_eq!(cmd.user, Some("abc:staff".to_string()));
    }

    #[test]
    fn test_box_command_default_no_user() {
        let cmd = BoxCommand::new("ls");
        assert_eq!(cmd.user, None);
    }

    #[test]
    fn test_box_command_user_numeric() {
        let cmd = BoxCommand::new("id").user("1000:1000");
        assert_eq!(cmd.user, Some("1000:1000".to_string()));
    }

    #[test]
    fn test_box_command_user_empty_string_becomes_none() {
        let cmd = BoxCommand::new("id").user("");
        assert_eq!(cmd.user, None);
    }

    #[test]
    fn test_box_command_user_whitespace_only_becomes_none() {
        let cmd = BoxCommand::new("id").user("  ");
        assert_eq!(cmd.user, None);
    }

    // ─── wait must not block kill ─────────────────────────────────────
    //
    // `kill`/`signal`/`resize_tty` need the inner mutex. If `wait`
    // held the inner mutex across `result_rx.recv().await`, a parked
    // wait would block them indefinitely. wait operates on its own
    // `wait_state` lock domain so the inner mutex stays available.

    use crate::runtime::backend::ExecBackend;
    use async_trait::async_trait;
    use std::sync::Arc as StdArc;
    use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
    use tokio::sync::mpsc as tokio_mpsc;

    struct StubExecBackend {
        kill_observed: StdArc<AtomicBool>,
    }

    #[async_trait]
    impl ExecBackend for StubExecBackend {
        async fn signal(&mut self, _execution_id: &str, _signal: i32) -> BoxliteResult<()> {
            self.kill_observed.store(true, AtomicOrdering::SeqCst);
            Ok(())
        }
        async fn resize_tty(
            &mut self,
            _execution_id: &str,
            _rows: u32,
            _cols: u32,
            _x_pixels: u32,
            _y_pixels: u32,
        ) -> BoxliteResult<()> {
            Ok(())
        }
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn wait_does_not_block_kill() {
        let (_result_tx, result_rx) = tokio_mpsc::unbounded_channel::<ExecResult>();
        let kill_observed = StdArc::new(AtomicBool::new(false));
        let backend = Box::new(StubExecBackend {
            kill_observed: kill_observed.clone(),
        });

        let exec = Execution::new(
            "test-exec".to_string(),
            backend,
            result_rx,
            None,
            None,
            None,
        );

        // Park a `wait` future. With the bug present, `wait` would hold
        // `inner.lock()` across `recv().await` and `kill` below would
        // never get the lock.
        let wait_clone = exec.clone();
        tokio::spawn(async move {
            let _ = wait_clone.wait().await;
        });

        // Give the wait task a tick to grab whatever locks it needs.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        // Kill must resolve within bounded time even though wait is
        // parked. With the bug present, this awaits forever.
        let result = tokio::time::timeout(std::time::Duration::from_millis(500), exec.signal(9))
            .await
            .expect(
                "kill/signal blocked by parked wait — Execution lock split \
             regressed; see src/boxlite/src/litebox/exec.rs::wait",
            );
        assert!(result.is_ok());
        assert!(kill_observed.load(AtomicOrdering::SeqCst));
    }
}