pyc_shell/shell/proc/
pipe.rs

1//! ## Pipe
2//!
3//! `Pipe` provides an API to interface with UNIX named pipes (FIFOs)
4
5/*
6*
7*   Copyright (C) 2020 Christian Visintin - christian.visintin1997@gmail.com
8*
9* 	This file is part of "Pyc"
10*
11*   Pyc is free software: you can redistribute it and/or modify
12*   it under the terms of the GNU General Public License as published by
13*   the Free Software Foundation, either version 3 of the License, or
14*   (at your option) any later version.
15*
16*   Pyc is distributed in the hope that it will be useful,
17*   but WITHOUT ANY WARRANTY; without even the implied warranty of
18*   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19*   GNU General Public License for more details.
20*
21*   You should have received a copy of the GNU General Public License
22*   along with Pyc.  If not, see <http://www.gnu.org/licenses/>.
23*
24*/
25
26extern crate nix;
27
28use super::{ShellError};
29
30use std::path::PathBuf;
31use std::os::unix::io::RawFd;
32use std::time::{Instant, Duration};
33
34//UNIX
35use nix::unistd;
36
/// ## Pipe
///
/// Handle to a named pipe (FIFO): the filesystem path it lives at, paired with the
/// raw file descriptor used to talk to it.
#[derive(Debug, Clone)]
pub(crate) struct Pipe {
    /// Filesystem path of the FIFO
    pub path: PathBuf,
    /// Raw file descriptor associated with the FIFO
    pub fd: RawFd,
}
42
43impl Pipe {
44
45    /// ### open
46    /// 
47    /// Open and creates a new pipe. Returns pipe on suceess or shell error
48    pub fn open(path: &PathBuf) -> Result<Pipe, ShellError> {
49        //Mkfifo - Not necessary with O_CREAT
50        if let Err(err) = unistd::mkfifo(path.as_path(), nix::sys::stat::Mode::S_IRWXU | nix::sys::stat::Mode::S_IRWXG | nix::sys::stat::Mode::S_IRWXO) {
51            match err {
52                nix::Error::Sys(errno) => return Err(ShellError::PipeError(errno)),
53                _ => return Err(ShellError::PipeError(nix::errno::Errno::UnknownErrno))
54            }
55        }
56        //Open fifo
57        match nix::fcntl::open(path.as_path(), nix::fcntl::OFlag::O_RDWR, nix::sys::stat::Mode::S_IRWXU | nix::sys::stat::Mode::S_IRWXG | nix::sys::stat::Mode::S_IRWXO) {
58            Ok(fd) => {
59                Ok(Pipe {
60                    path: path.clone(),
61                    fd: fd
62                })
63            },
64            Err(err) => {
65                match err {
66                    nix::Error::Sys(errno) => Err(ShellError::PipeError(errno)),
67                    _ => Err(ShellError::PipeError(nix::errno::Errno::UnknownErrno))
68                }
69            }
70        }
71    }
72
73    /// ### close
74    /// 
75    /// Close and delete pipe
76    pub fn close(&self) -> Result<(), ShellError> {
77        if let Err(err) = unistd::close(self.fd) {
78            match err {
79                nix::Error::Sys(errno) => return Err(ShellError::PipeError(errno)),
80                _ => return Err(ShellError::PipeError(nix::errno::Errno::UnknownErrno))
81            }
82        };
83        //Unlink pipe
84        let _ = unistd::unlink(self.path.as_path());
85        Ok(())
86    }
87
88    /// ### read
89    /// 
90    /// Read from pipe
91    /// If read_all parameter is False, then the function returns after reading 8192 or less
92    /// otherwise, if set to True, reads until there's something available to be read
93    pub fn read(&self, timeout: u64, read_all: bool) -> Result<Option<String>, ShellError> {
94        //Create poll fd wrapper
95        let mut poll_fds: [nix::poll::PollFd; 1] = [nix::poll::PollFd::new(self.fd, nix::poll::PollFlags::POLLIN | nix::poll::PollFlags::POLLRDBAND | nix::poll::PollFlags::POLLHUP)];
96        //Prepare out buffer
97        let mut data_out: String = String::new();
98        let mut data_size: usize = 0;
99        //Prepare times
100        let timeout: Duration = Duration::from_millis(timeout);
101        let time: Instant = Instant::now();
102        while time.elapsed() < timeout {
103            //Poll pipe
104            match nix::poll::poll(&mut poll_fds, 50) {
105                Ok(ret) => {
106                    if ret > 0 && poll_fds[0].revents().is_some() { //Fifo is available to be read
107                        let event: nix::poll::PollFlags = poll_fds[0].revents().unwrap();
108                        if event.intersects(nix::poll::PollFlags::POLLIN) || event.intersects(nix::poll::PollFlags::POLLRDBAND) {
109                            //Read from FIFO
110                            let mut buffer: [u8; 8192] = [0; 8192];
111                            match unistd::read(self.fd, &mut buffer) {
112                                Ok(bytes_read) => {
113                                    data_size += bytes_read;
114                                    //Push bytes converted to string to data out
115                                    data_out.push_str(match std::str::from_utf8(&buffer[0..bytes_read]) {
116                                        Ok(s) => s,
117                                        Err(_) => {
118                                            return Err(ShellError::InvalidData)
119                                        }
120                                    });
121                                    if ! read_all {
122                                        break;
123                                    }
124                                },
125                                Err(err) => {
126                                    match err {
127                                        nix::Error::Sys(errno) => {
128                                            match errno {
129                                                nix::errno::Errno::EAGAIN => { //No more data is available to be read
130                                                    if data_size == 0 {
131                                                        continue; //Keep waiting for data
132                                                    } else {
133                                                        break; //All data has been read
134                                                    }
135                                                },
136                                                _ => return Err(ShellError::PipeError(errno)) //Error
137                                            }
138                                        },
139                                        _ => return Err(ShellError::PipeError(nix::errno::Errno::UnknownErrno))
140                                    }
141                                }
142                            };
143                        } else if event.intersects(nix::poll::PollFlags::POLLERR) { //FIFO is in error state
144                            return Err(ShellError::PipeError(nix::errno::Errno::EPIPE))
145                        } else if event.intersects(nix::poll::PollFlags::POLLHUP) { //No more data
146                            //no data is available; if data is something break; otherwise continue
147                            if data_size == 0 {
148                                continue;
149                            } else {
150                                break;
151                            }
152                        }
153                    } else if ret == 0 {
154                        //no data is available; if data is something break; otherwise continue
155                        if data_size == 0 {
156                            continue;
157                        } else {
158                            break;
159                        }
160                    }
161                },
162                Err(err) => { //Handle poll error
163                    match err {
164                        nix::Error::Sys(errno) => {
165                            match errno {
166                                nix::errno::Errno::EAGAIN => { //No more data is available to be read
167                                    if data_size == 0 {
168                                        continue; //Keep waiting for data
169                                    } else {
170                                        break; //All data has been read
171                                    }
172                                },
173                                _ => return Err(ShellError::PipeError(errno)) //Error
174                            }
175                        },
176                        _ => return Err(ShellError::PipeError(nix::errno::Errno::UnknownErrno))
177                    }
178                }
179            }
180        }
181        //Return data
182        match data_size {
183            0 => Ok(None),
184            _ => Ok(Some(data_out))
185        }
186    }
187
188    /// ### write
189    /// 
190    /// Write data out to pipe
191    pub fn write(&self, data: String, timeout: u64) -> Result<(), ShellError> {
192        //Create poll fd wrapper
193        let mut poll_fds: [nix::poll::PollFd; 1] = [nix::poll::PollFd::new(self.fd, nix::poll::PollFlags::POLLOUT)];
194        //Prepare times
195        let timeout: Duration = Duration::from_millis(timeout);
196        let time: Instant = Instant::now();
197        //Prepare data out
198        let data_out = data.as_bytes();
199        let total_bytes_amount: usize = data_out.len();
200        //Write bytes
201        let mut bytes_written: usize = 0;
202        while bytes_written < total_bytes_amount {
203            match nix::poll::poll(&mut poll_fds, 50) {
204                Ok(_) => {
205                    if let Some(revents) = poll_fds[0].revents() {
206                        if revents.intersects(nix::poll::PollFlags::POLLOUT) {
207                            //Write data out (8192 or remaining bytes)
208                            let bytes_out = if total_bytes_amount - bytes_written > 8192 {
209                                8192
210                            } else {
211                                total_bytes_amount - bytes_written
212                            };
213                            //Write data out
214                            match unistd::write(self.fd, &data_out[bytes_written..(bytes_written + bytes_out)]) {
215                                Ok(bytes) => {
216                                    bytes_written += bytes; //Increment bytes written of bytes
217                                },
218                                Err(err) => {
219                                    match err {
220                                        nix::Error::Sys(errno) => return Err(ShellError::PipeError(errno)),
221                                        _ => return Err(ShellError::PipeError(nix::errno::Errno::UnknownErrno))
222                                    }
223                                }
224                            }
225                        }
226                    }
227                },
228                Err(err) => {
229                    match err {
230                        nix::Error::Sys(errno) => return Err(ShellError::PipeError(errno)),
231                        _ => return Err(ShellError::PipeError(nix::errno::Errno::UnknownErrno))
232                    }
233                }
234            };
235            if bytes_written == 0 && time.elapsed() >= timeout {
236                //Return Io Timeout
237                return Err(ShellError::IoTimeout);
238            }
239        }
240        Ok(())
241    }
242
243}
244
245//@! Test module
246
#[cfg(test)]
mod tests {

    use super::*;

    use std::thread;
    use std::time::Duration;

    /// Create a temporary directory where a test can place its FIFO
    fn create_tmp_dir() -> tempfile::TempDir {
        tempfile::TempDir::new().unwrap()
    }

    /// Open a pipe at `pipe_path`, failing the test with a descriptive message on error
    fn open_pipe(pipe_path: &PathBuf) -> Pipe {
        let pipe: Result<Pipe, ShellError> = Pipe::open(pipe_path);
        //The message is passed as format arguments: a `format!(..)` value as panic message
        //is rejected by Rust 2021
        assert!(
            pipe.is_ok(),
            "Pipe ({}) should be OK, but is {:?}",
            pipe_path.display(),
            pipe
        );
        pipe.unwrap()
    }

    #[test]
    fn test_pipe_open_close() {
        let tmpdir: tempfile::TempDir = create_tmp_dir();
        let pipe_path: PathBuf = tmpdir.path().join("test.fifo");
        let pipe: Pipe = open_pipe(&pipe_path);
        assert_eq!(pipe.path, pipe_path);
        assert!(pipe.fd > 0);
        assert!(pipe.close().is_ok());
    }

    #[test]
    fn test_pipe_io() {
        let tmpdir: tempfile::TempDir = create_tmp_dir();
        let pipe_path: PathBuf = tmpdir.path().join("stdout.fifo");
        //Open Pipe
        let pipe: Pipe = open_pipe(&pipe_path);
        let pipe_thread: Pipe = pipe.clone();
        //Start thread
        let join_hnd: thread::JoinHandle<()> = thread::spawn(move || {
            let input: String = pipe_thread.read(1000, true).unwrap().unwrap();
            assert_eq!(input, String::from("HELLO\n"));
            thread::sleep(Duration::from_millis(100)); //Sleep for 100 msecond
            //Write
            assert!(pipe_thread.write(String::from("HI THERE\n"), 1000).is_ok());
        });
        //Write pipe
        assert!(pipe.write(String::from("HELLO\n"), 1000).is_ok(), "Write timeout");
        //Read pipe
        thread::sleep(Duration::from_millis(100)); //Sleep for 100 msecond
        let read: Result<Option<String>, ShellError> = pipe.read(1000, true);
        assert!(read.is_ok(), "Read should be Ok, but is {:?}", read);
        let read: Option<String> = read.unwrap();
        assert_eq!(read.unwrap(), String::from("HI THERE\n"));
        //Join thread
        assert!(join_hnd.join().is_ok());
        //Close Pipe
        assert!(pipe.close().is_ok());
    }

    #[test]
    fn test_pipe_read_all() {
        let tmpdir: tempfile::TempDir = create_tmp_dir();
        let pipe_path: PathBuf = tmpdir.path().join("stdout.fifo");
        //Open Pipe
        let pipe: Pipe = open_pipe(&pipe_path);
        let pipe_thread: Pipe = pipe.clone();
        //Start thread
        let join_hnd: thread::JoinHandle<()> = thread::spawn(move || {
            let mut data: String = String::with_capacity(10240);
            for _ in 0..10240 {
                data.push('c');
            }
            //Write 10240 bytes
            assert!(pipe_thread.write(data.clone(), 1000).is_ok());
            thread::sleep(Duration::from_millis(500)); //Sleep for 500 msecond
            //Write
            assert!(pipe_thread.write(data, 1000).is_ok());
        });
        //Read all (10240 bytes should be read)
        assert_eq!(pipe.read(500, true).unwrap().unwrap().len(), 10240);
        //Read all set to false
        thread::sleep(Duration::from_millis(500)); //Sleep for 500 msecond
        //Now only 8192 bytes should have been read
        assert_eq!(pipe.read(500, false).unwrap().unwrap().len(), 8192);
        //Now finish to read
        assert_eq!(pipe.read(500, false).unwrap().unwrap().len(), 2048);
        //Join thread
        assert!(join_hnd.join().is_ok());
        //Close Pipe
        assert!(pipe.close().is_ok());
    }

    #[test]
    fn test_pipe_open_close_error() {
        let tmpdir: tempfile::TempDir = create_tmp_dir();
        //Open error: the path already exists (it's the temp directory itself), so the FIFO
        //can't be created there. Unlike a device path, this doesn't depend on the host
        let pipe_path: PathBuf = tmpdir.path().to_path_buf();
        let pipe: Result<Pipe, ShellError> = Pipe::open(&pipe_path);
        assert!(pipe.is_err());
        //Close error: -1 is never a valid file descriptor
        let pipe: Pipe = Pipe {
            fd: -1,
            path: tmpdir.path().join("stdout.fifo")
        };
        assert!(pipe.close().is_err());
    }

    #[test]
    fn test_pipe_io_error() {
        let tmpdir: tempfile::TempDir = create_tmp_dir();
        let pipe_path: PathBuf = tmpdir.path().join("stdout.fifo");
        //Open Pipe
        let pipe: Pipe = open_pipe(&pipe_path);
        //Nothing has been written, so the read must give up after the timeout
        assert!(pipe.read(1000, true).unwrap().is_none(), "Read should be None");
        assert!(pipe.close().is_ok());
    }

}