sandbox_rs/execution/
stream.rs1use crate::errors::{Result, SandboxError};
4use std::os::fd::FromRawFd;
5use std::os::unix::io::RawFd;
6use std::sync::mpsc::{Receiver, Sender, channel};
7use std::thread;
8
9#[derive(Debug, Clone)]
11pub enum StreamChunk {
12 Stdout(String),
14 Stderr(String),
16 Exit { exit_code: i32, signal: Option<i32> },
18}
19
20pub struct ProcessStream {
22 receiver: Receiver<StreamChunk>,
23}
24
25impl ProcessStream {
26 pub fn new() -> (ProcessStreamWriter, Self) {
28 let (tx, rx) = channel();
29 (ProcessStreamWriter { tx }, ProcessStream { receiver: rx })
30 }
31
32 pub fn recv(&self) -> Result<Option<StreamChunk>> {
34 self.receiver
35 .recv()
36 .ok()
37 .ok_or_else(|| SandboxError::Io(std::io::Error::other("stream closed")))
38 .map(Some)
39 }
40
41 pub fn try_recv(&self) -> Result<Option<StreamChunk>> {
43 match self.receiver.try_recv() {
44 Ok(chunk) => Ok(Some(chunk)),
45 Err(std::sync::mpsc::TryRecvError::Empty) => Ok(None),
46 Err(std::sync::mpsc::TryRecvError::Disconnected) => Ok(None),
47 }
48 }
49}
50
51impl Default for ProcessStream {
52 fn default() -> Self {
53 Self::new().1
54 }
55}
56
57pub struct StreamIter {
58 receiver: Receiver<StreamChunk>,
59}
60
61impl Iterator for StreamIter {
62 type Item = StreamChunk;
63
64 fn next(&mut self) -> Option<Self::Item> {
65 self.receiver.recv().ok()
66 }
67}
68
69impl IntoIterator for ProcessStream {
70 type Item = StreamChunk;
71 type IntoIter = StreamIter;
72
73 fn into_iter(self) -> Self::IntoIter {
74 StreamIter {
75 receiver: self.receiver,
76 }
77 }
78}
79
80pub struct ProcessStreamWriter {
82 pub tx: Sender<StreamChunk>,
83}
84
85impl ProcessStreamWriter {
86 pub fn send_stdout(&self, data: String) -> Result<()> {
88 self.tx
89 .send(StreamChunk::Stdout(data))
90 .map_err(|_| SandboxError::Io(std::io::Error::other("failed to send stdout chunk")))
91 }
92
93 pub fn send_stderr(&self, data: String) -> Result<()> {
95 self.tx
96 .send(StreamChunk::Stderr(data))
97 .map_err(|_| SandboxError::Io(std::io::Error::other("failed to send stderr chunk")))
98 }
99
100 pub fn send_exit(&self, exit_code: i32, signal: Option<i32>) -> Result<()> {
102 self.tx
103 .send(StreamChunk::Exit { exit_code, signal })
104 .map_err(|_| SandboxError::Io(std::io::Error::other("failed to send exit chunk")))
105 }
106}
107
108pub fn spawn_fd_reader(
110 fd: RawFd,
111 is_stderr: bool,
112 tx: Sender<StreamChunk>,
113) -> Result<thread::JoinHandle<()>> {
114 let handle = thread::spawn(move || {
115 use std::io::Read;
118 use std::os::unix::io::OwnedFd;
119
120 let owned_fd = unsafe { OwnedFd::from_raw_fd(fd) };
121 let mut file = std::fs::File::from(owned_fd);
122
123 let mut buffer = vec![0u8; 4096];
124
125 loop {
126 match file.read(&mut buffer) {
127 Ok(0) => break,
128 Ok(n) => {
129 let data = String::from_utf8_lossy(&buffer[..n]).to_string();
130 let chunk = if is_stderr {
131 StreamChunk::Stderr(data)
132 } else {
133 StreamChunk::Stdout(data)
134 };
135
136 if tx.send(chunk).is_err() {
137 break;
138 }
139 }
140 Err(_) => break,
141 }
142 }
143 });
144
145 Ok(handle)
146}