use sandbox_core::{Result, SandboxError};
use std::os::fd::FromRawFd;
use std::os::unix::io::RawFd;
use std::sync::mpsc::{Receiver, Sender, channel};
use std::thread;
#[derive(Debug, Clone)]
pub enum StreamChunk {
Stdout(String),
Stderr(String),
Exit { exit_code: i32, signal: Option<i32> },
}
pub struct ProcessStream {
receiver: Receiver<StreamChunk>,
}
impl ProcessStream {
pub fn new() -> (ProcessStreamWriter, Self) {
let (tx, rx) = channel();
(ProcessStreamWriter { tx }, ProcessStream { receiver: rx })
}
pub fn recv(&self) -> Result<Option<StreamChunk>> {
self.receiver
.recv()
.ok()
.ok_or_else(|| SandboxError::Io(std::io::Error::other("stream closed")))
.map(Some)
}
pub fn try_recv(&self) -> Result<Option<StreamChunk>> {
match self.receiver.try_recv() {
Ok(chunk) => Ok(Some(chunk)),
Err(std::sync::mpsc::TryRecvError::Empty) => Ok(None),
Err(std::sync::mpsc::TryRecvError::Disconnected) => Ok(None),
}
}
}
impl Default for ProcessStream {
fn default() -> Self {
Self::new().1
}
}
pub struct StreamIter {
receiver: Receiver<StreamChunk>,
}
impl Iterator for StreamIter {
type Item = StreamChunk;
fn next(&mut self) -> Option<Self::Item> {
self.receiver.recv().ok()
}
}
impl IntoIterator for ProcessStream {
type Item = StreamChunk;
type IntoIter = StreamIter;
fn into_iter(self) -> Self::IntoIter {
StreamIter {
receiver: self.receiver,
}
}
}
pub struct ProcessStreamWriter {
pub tx: Sender<StreamChunk>,
}
impl ProcessStreamWriter {
pub fn send_stdout(&self, data: String) -> Result<()> {
self.tx
.send(StreamChunk::Stdout(data))
.map_err(|_| SandboxError::Io(std::io::Error::other("failed to send stdout chunk")))
}
pub fn send_stderr(&self, data: String) -> Result<()> {
self.tx
.send(StreamChunk::Stderr(data))
.map_err(|_| SandboxError::Io(std::io::Error::other("failed to send stderr chunk")))
}
pub fn send_exit(&self, exit_code: i32, signal: Option<i32>) -> Result<()> {
self.tx
.send(StreamChunk::Exit { exit_code, signal })
.map_err(|_| SandboxError::Io(std::io::Error::other("failed to send exit chunk")))
}
}
pub fn spawn_fd_reader(
fd: RawFd,
is_stderr: bool,
tx: Sender<StreamChunk>,
) -> Result<thread::JoinHandle<()>> {
let handle = thread::spawn(move || {
use std::io::Read;
use std::os::unix::io::OwnedFd;
let owned_fd = unsafe { OwnedFd::from_raw_fd(fd) };
let mut file = std::fs::File::from(owned_fd);
let mut buffer = vec![0u8; 4096];
loop {
match file.read(&mut buffer) {
Ok(0) => break,
Ok(n) => {
let data = String::from_utf8_lossy(&buffer[..n]).to_string();
let chunk = if is_stderr {
StreamChunk::Stderr(data)
} else {
StreamChunk::Stdout(data)
};
if tx.send(chunk).is_err() {
break;
}
}
Err(_) => break,
}
}
});
Ok(handle)
}