pyc_shell/shell/proc/
pipe.rs1extern crate nix;
27
28use super::{ShellError};
29
30use std::path::PathBuf;
31use std::os::unix::io::RawFd;
32use std::time::{Instant, Duration};
33
34use nix::unistd;
36
/// Handle to a named pipe: where it lives on the filesystem and the raw
/// descriptor used to talk to it.
///
/// Cloning copies the path and the descriptor *number* only, so a clone
/// refers to the same underlying file descriptor as the original.
#[derive(std::fmt::Debug, Clone)]
pub(crate) struct Pipe {
    /// Filesystem path of the pipe.
    pub path: PathBuf,
    /// Raw file descriptor associated with the pipe.
    pub fd: RawFd,
}
42
43impl Pipe {
44
45 pub fn open(path: &PathBuf) -> Result<Pipe, ShellError> {
49 if let Err(err) = unistd::mkfifo(path.as_path(), nix::sys::stat::Mode::S_IRWXU | nix::sys::stat::Mode::S_IRWXG | nix::sys::stat::Mode::S_IRWXO) {
51 match err {
52 nix::Error::Sys(errno) => return Err(ShellError::PipeError(errno)),
53 _ => return Err(ShellError::PipeError(nix::errno::Errno::UnknownErrno))
54 }
55 }
56 match nix::fcntl::open(path.as_path(), nix::fcntl::OFlag::O_RDWR, nix::sys::stat::Mode::S_IRWXU | nix::sys::stat::Mode::S_IRWXG | nix::sys::stat::Mode::S_IRWXO) {
58 Ok(fd) => {
59 Ok(Pipe {
60 path: path.clone(),
61 fd: fd
62 })
63 },
64 Err(err) => {
65 match err {
66 nix::Error::Sys(errno) => Err(ShellError::PipeError(errno)),
67 _ => Err(ShellError::PipeError(nix::errno::Errno::UnknownErrno))
68 }
69 }
70 }
71 }
72
73 pub fn close(&self) -> Result<(), ShellError> {
77 if let Err(err) = unistd::close(self.fd) {
78 match err {
79 nix::Error::Sys(errno) => return Err(ShellError::PipeError(errno)),
80 _ => return Err(ShellError::PipeError(nix::errno::Errno::UnknownErrno))
81 }
82 };
83 let _ = unistd::unlink(self.path.as_path());
85 Ok(())
86 }
87
88 pub fn read(&self, timeout: u64, read_all: bool) -> Result<Option<String>, ShellError> {
94 let mut poll_fds: [nix::poll::PollFd; 1] = [nix::poll::PollFd::new(self.fd, nix::poll::PollFlags::POLLIN | nix::poll::PollFlags::POLLRDBAND | nix::poll::PollFlags::POLLHUP)];
96 let mut data_out: String = String::new();
98 let mut data_size: usize = 0;
99 let timeout: Duration = Duration::from_millis(timeout);
101 let time: Instant = Instant::now();
102 while time.elapsed() < timeout {
103 match nix::poll::poll(&mut poll_fds, 50) {
105 Ok(ret) => {
106 if ret > 0 && poll_fds[0].revents().is_some() { let event: nix::poll::PollFlags = poll_fds[0].revents().unwrap();
108 if event.intersects(nix::poll::PollFlags::POLLIN) || event.intersects(nix::poll::PollFlags::POLLRDBAND) {
109 let mut buffer: [u8; 8192] = [0; 8192];
111 match unistd::read(self.fd, &mut buffer) {
112 Ok(bytes_read) => {
113 data_size += bytes_read;
114 data_out.push_str(match std::str::from_utf8(&buffer[0..bytes_read]) {
116 Ok(s) => s,
117 Err(_) => {
118 return Err(ShellError::InvalidData)
119 }
120 });
121 if ! read_all {
122 break;
123 }
124 },
125 Err(err) => {
126 match err {
127 nix::Error::Sys(errno) => {
128 match errno {
129 nix::errno::Errno::EAGAIN => { if data_size == 0 {
131 continue; } else {
133 break; }
135 },
136 _ => return Err(ShellError::PipeError(errno)) }
138 },
139 _ => return Err(ShellError::PipeError(nix::errno::Errno::UnknownErrno))
140 }
141 }
142 };
143 } else if event.intersects(nix::poll::PollFlags::POLLERR) { return Err(ShellError::PipeError(nix::errno::Errno::EPIPE))
145 } else if event.intersects(nix::poll::PollFlags::POLLHUP) { if data_size == 0 {
148 continue;
149 } else {
150 break;
151 }
152 }
153 } else if ret == 0 {
154 if data_size == 0 {
156 continue;
157 } else {
158 break;
159 }
160 }
161 },
162 Err(err) => { match err {
164 nix::Error::Sys(errno) => {
165 match errno {
166 nix::errno::Errno::EAGAIN => { if data_size == 0 {
168 continue; } else {
170 break; }
172 },
173 _ => return Err(ShellError::PipeError(errno)) }
175 },
176 _ => return Err(ShellError::PipeError(nix::errno::Errno::UnknownErrno))
177 }
178 }
179 }
180 }
181 match data_size {
183 0 => Ok(None),
184 _ => Ok(Some(data_out))
185 }
186 }
187
188 pub fn write(&self, data: String, timeout: u64) -> Result<(), ShellError> {
192 let mut poll_fds: [nix::poll::PollFd; 1] = [nix::poll::PollFd::new(self.fd, nix::poll::PollFlags::POLLOUT)];
194 let timeout: Duration = Duration::from_millis(timeout);
196 let time: Instant = Instant::now();
197 let data_out = data.as_bytes();
199 let total_bytes_amount: usize = data_out.len();
200 let mut bytes_written: usize = 0;
202 while bytes_written < total_bytes_amount {
203 match nix::poll::poll(&mut poll_fds, 50) {
204 Ok(_) => {
205 if let Some(revents) = poll_fds[0].revents() {
206 if revents.intersects(nix::poll::PollFlags::POLLOUT) {
207 let bytes_out = if total_bytes_amount - bytes_written > 8192 {
209 8192
210 } else {
211 total_bytes_amount - bytes_written
212 };
213 match unistd::write(self.fd, &data_out[bytes_written..(bytes_written + bytes_out)]) {
215 Ok(bytes) => {
216 bytes_written += bytes; },
218 Err(err) => {
219 match err {
220 nix::Error::Sys(errno) => return Err(ShellError::PipeError(errno)),
221 _ => return Err(ShellError::PipeError(nix::errno::Errno::UnknownErrno))
222 }
223 }
224 }
225 }
226 }
227 },
228 Err(err) => {
229 match err {
230 nix::Error::Sys(errno) => return Err(ShellError::PipeError(errno)),
231 _ => return Err(ShellError::PipeError(nix::errno::Errno::UnknownErrno))
232 }
233 }
234 };
235 if bytes_written == 0 && time.elapsed() >= timeout {
236 return Err(ShellError::IoTimeout);
238 }
239 }
240 Ok(())
241 }
242
243}
244
#[cfg(test)]
mod tests {

    use super::*;

    use std::thread;
    use std::time::Duration;

    /// A pipe can be created in a fresh temporary directory and closed again.
    #[test]
    fn test_pipe_open_close() {
        let tmpdir: tempfile::TempDir = create_tmp_dir();
        let pipe_path: PathBuf = tmpdir.path().join("test.fifo");
        let pipe: Result<Pipe, ShellError> = Pipe::open(&pipe_path);
        assert!(
            pipe.is_ok(),
            "Pipe ({}) should be OK, but is {:?}",
            pipe_path.display(),
            pipe
        );
        let pipe: Pipe = pipe.unwrap();
        assert_eq!(pipe.path, pipe_path);
        // 0 is a valid descriptor too, so only negative values are rejected.
        assert!(pipe.fd >= 0);
        assert!(pipe.close().is_ok());
    }

    /// Two threads exchange one message in each direction over the same pipe.
    #[test]
    fn test_pipe_io() {
        let tmpdir: tempfile::TempDir = create_tmp_dir();
        let pipe_path: PathBuf = tmpdir.path().join("stdout.fifo");
        let pipe: Result<Pipe, ShellError> = Pipe::open(&pipe_path);
        assert!(
            pipe.is_ok(),
            "Pipe ({}) should be OK, but is {:?}",
            pipe_path.display(),
            pipe
        );
        let pipe: Pipe = pipe.unwrap();
        let pipe_thread: Pipe = pipe.clone();
        let join_hnd: thread::JoinHandle<()> = thread::spawn(move || {
            let input: String = pipe_thread.read(1000, true).unwrap().unwrap();
            assert_eq!(input, String::from("HELLO\n"));
            thread::sleep(Duration::from_millis(100));
            assert!(pipe_thread
                .write(String::from("HI THERE\n"), 1000)
                .is_ok());
        });
        assert!(
            pipe.write(String::from("HELLO\n"), 1000).is_ok(),
            "Write timeout"
        );
        thread::sleep(Duration::from_millis(100));
        let read: Result<Option<String>, ShellError> = pipe.read(1000, true);
        assert!(read.is_ok(), "Read should be Ok, but is {:?}", read);
        let read: Option<String> = read.unwrap();
        assert_eq!(read.unwrap(), String::from("HI THERE\n"));
        assert!(join_hnd.join().is_ok());
        assert!(pipe.close().is_ok());
    }

    /// `read_all == true` drains a payload larger than one chunk in a single
    /// call, while `read_all == false` returns it one chunk at a time.
    #[test]
    fn test_pipe_read_all() {
        let tmpdir: tempfile::TempDir = create_tmp_dir();
        let pipe_path: PathBuf = tmpdir.path().join("stdout.fifo");
        let pipe: Result<Pipe, ShellError> = Pipe::open(&pipe_path);
        assert!(
            pipe.is_ok(),
            "Pipe ({}) should be OK, but is {:?}",
            pipe_path.display(),
            pipe
        );
        let pipe: Pipe = pipe.unwrap();
        let pipe_thread: Pipe = pipe.clone();
        let join_hnd: thread::JoinHandle<()> = thread::spawn(move || {
            let mut data: String = String::with_capacity(10240);
            for _ in 0..10240 {
                data.push('c');
            }
            assert!(pipe_thread.write(data.clone(), 1000).is_ok());
            thread::sleep(Duration::from_millis(500));
            assert!(pipe_thread.write(data, 1000).is_ok());
        });
        assert_eq!(pipe.read(500, true).unwrap().unwrap().len(), 10240);
        thread::sleep(Duration::from_millis(500));
        assert_eq!(pipe.read(500, false).unwrap().unwrap().len(), 8192);
        assert_eq!(pipe.read(500, false).unwrap().unwrap().len(), 2048);
        assert!(join_hnd.join().is_ok());
        assert!(pipe.close().is_ok());
    }

    /// Opening on a path where a FIFO cannot be created fails, and closing
    /// an invalid descriptor fails.
    #[test]
    fn test_pipe_open_close_error() {
        let pipe_path: PathBuf = PathBuf::from("/dev/tty1");
        let pipe: Result<Pipe, ShellError> = Pipe::open(&pipe_path);
        assert!(pipe.is_err());
        let pipe: Pipe = Pipe {
            // -1 is never a valid descriptor. A small positive number could
            // belong to another test running in parallel, and closing it
            // would both succeed and break that test.
            fd: -1,
            path: PathBuf::from("/tmp/stdout.fifo"),
        };
        assert!(pipe.close().is_err());
    }

    /// Reading from a pipe nobody writes to yields `None` after the timeout.
    #[test]
    fn test_pipe_io_error() {
        let tmpdir: tempfile::TempDir = create_tmp_dir();
        let pipe_path: PathBuf = tmpdir.path().join("stdout.fifo");
        let pipe: Result<Pipe, ShellError> = Pipe::open(&pipe_path);
        assert!(
            pipe.is_ok(),
            "Pipe ({}) should be OK, but is {:?}",
            pipe_path.display(),
            pipe
        );
        let pipe: Pipe = pipe.unwrap();
        assert!(
            pipe.read(1000, true).unwrap().is_none(),
            "Read should be None"
        );
        assert!(pipe.close().is_ok());
    }

    /// Creates a fresh temporary directory for a test's FIFO.
    fn create_tmp_dir() -> tempfile::TempDir {
        tempfile::TempDir::new().unwrap()
    }
}