sandbox_rs/execution/
stream.rs

1//! Stream handling for process output
2
3use crate::errors::{Result, SandboxError};
4use std::os::fd::FromRawFd;
5use std::os::unix::io::RawFd;
6use std::sync::mpsc::{Receiver, Sender, channel};
7use std::thread;
8
9/// A chunk of process output
10#[derive(Debug, Clone)]
11pub enum StreamChunk {
12    /// Data from stdout
13    Stdout(String),
14    /// Data from stderr
15    Stderr(String),
16    /// Process has exited
17    Exit { exit_code: i32, signal: Option<i32> },
18}
19
20/// Handle for receiving process output streams
21pub struct ProcessStream {
22    receiver: Receiver<StreamChunk>,
23}
24
25impl ProcessStream {
26    /// Create new process stream handler
27    pub fn new() -> (ProcessStreamWriter, Self) {
28        let (tx, rx) = channel();
29        (ProcessStreamWriter { tx }, ProcessStream { receiver: rx })
30    }
31
32    /// Receive next chunk from process streams
33    pub fn recv(&self) -> Result<Option<StreamChunk>> {
34        self.receiver
35            .recv()
36            .ok()
37            .ok_or_else(|| SandboxError::Io(std::io::Error::other("stream closed")))
38            .map(Some)
39    }
40
41    /// Try to receive next chunk without blocking
42    pub fn try_recv(&self) -> Result<Option<StreamChunk>> {
43        match self.receiver.try_recv() {
44            Ok(chunk) => Ok(Some(chunk)),
45            Err(std::sync::mpsc::TryRecvError::Empty) => Ok(None),
46            Err(std::sync::mpsc::TryRecvError::Disconnected) => Ok(None),
47        }
48    }
49}
50
51impl Default for ProcessStream {
52    fn default() -> Self {
53        Self::new().1
54    }
55}
56
57pub struct StreamIter {
58    receiver: Receiver<StreamChunk>,
59}
60
61impl Iterator for StreamIter {
62    type Item = StreamChunk;
63
64    fn next(&mut self) -> Option<Self::Item> {
65        self.receiver.recv().ok()
66    }
67}
68
69impl IntoIterator for ProcessStream {
70    type Item = StreamChunk;
71    type IntoIter = StreamIter;
72
73    fn into_iter(self) -> Self::IntoIter {
74        StreamIter {
75            receiver: self.receiver,
76        }
77    }
78}
79
80/// Writer side for process streams
81pub struct ProcessStreamWriter {
82    pub tx: Sender<StreamChunk>,
83}
84
85impl ProcessStreamWriter {
86    /// Send stdout chunk
87    pub fn send_stdout(&self, data: String) -> Result<()> {
88        self.tx
89            .send(StreamChunk::Stdout(data))
90            .map_err(|_| SandboxError::Io(std::io::Error::other("failed to send stdout chunk")))
91    }
92
93    /// Send stderr chunk
94    pub fn send_stderr(&self, data: String) -> Result<()> {
95        self.tx
96            .send(StreamChunk::Stderr(data))
97            .map_err(|_| SandboxError::Io(std::io::Error::other("failed to send stderr chunk")))
98    }
99
100    /// Send exit status
101    pub fn send_exit(&self, exit_code: i32, signal: Option<i32>) -> Result<()> {
102        self.tx
103            .send(StreamChunk::Exit { exit_code, signal })
104            .map_err(|_| SandboxError::Io(std::io::Error::other("failed to send exit chunk")))
105    }
106}
107
108/// Spawn a reader thread for a file descriptor
109pub fn spawn_fd_reader(
110    fd: RawFd,
111    is_stderr: bool,
112    tx: Sender<StreamChunk>,
113) -> Result<thread::JoinHandle<()>> {
114    let handle = thread::spawn(move || {
115        // SAFETY: This FD comes from a properly-managed pipe created by the parent.
116        // We wrap it in OwnedFd to ensure proper cleanup.
117        use std::io::Read;
118        use std::os::unix::io::OwnedFd;
119
120        let owned_fd = unsafe { OwnedFd::from_raw_fd(fd) };
121        let mut file = std::fs::File::from(owned_fd);
122
123        let mut buffer = vec![0u8; 4096];
124
125        loop {
126            match file.read(&mut buffer) {
127                Ok(0) => break,
128                Ok(n) => {
129                    let data = String::from_utf8_lossy(&buffer[..n]).to_string();
130                    let chunk = if is_stderr {
131                        StreamChunk::Stderr(data)
132                    } else {
133                        StreamChunk::Stdout(data)
134                    };
135
136                    if tx.send(chunk).is_err() {
137                        break;
138                    }
139                }
140                Err(_) => break,
141            }
142        }
143    });
144
145    Ok(handle)
146}