expectrl 0.6.0

A tool for automating terminal applications on Unix, like Don Libes' expect.
Documentation
//! Module contains a Session structure.

use std::{
    convert::TryInto,
    io::{self, BufRead, BufReader, Read, Write},
    ops::{Deref, DerefMut},
    time::{self, Duration},
};

use crate::{
    control_code::ControlCode,
    error::{to_io_error, Error},
    needle::Needle,
    process::NonBlocking,
    Captures,
};

/// Session represents a spawned process and its streams.
/// It controls the process and the communication with it.
#[derive(Debug)]
pub struct Session<P, S> {
    // The process value; reachable through the `Deref`/`DerefMut` impls of `Session`.
    proc: P,
    // The communication stream wrapped into the internal buffering layer.
    stream: TryStream<S>,
    // Upper bound on how long the expect loops keep polling; `None` means no timeout check.
    expect_timeout: Option<Duration>,
    // When `true`, `expect` dispatches to the lazy strategy; otherwise to the greedy one.
    expect_lazy: bool,
}

impl<P, S> Session<P, S>
where
    S: Read,
{
    /// Creates a session around `process` and its communication `stream`.
    ///
    /// The expect timeout starts at 10 seconds and the matching strategy is greedy
    /// (`expect_lazy` is `false`).
    pub(crate) fn new(process: P, stream: S) -> io::Result<Self> {
        let stream = TryStream::new(stream)?;
        Ok(Self {
            proc: process,
            stream,
            expect_timeout: Some(Duration::from_millis(10000)),
            expect_lazy: false,
        })
    }

    /// Rebuilds the session around the stream produced by `new_stream`.
    ///
    /// `new_stream` receives the original stream by value and returns its replacement.
    /// Bytes which were already read but not yet consumed are carried over to the
    /// new session, and so are the configured expect timeout and matching strategy.
    ///
    /// # Errors
    ///
    /// Returns an error if the new session cannot be constructed.
    pub(crate) fn swap_stream<F: FnOnce(S) -> R, R: Read>(
        mut self,
        new_stream: F,
    ) -> Result<Session<P, R>, Error> {
        // Collect everything that was buffered so far so it is not lost with the old wrapper.
        self.stream.flush_in_buffer();
        let buf = self.stream.get_available().to_owned();

        // `Session::new` resets these to defaults, so remember what the user configured.
        let expect_timeout = self.expect_timeout;
        let expect_lazy = self.expect_lazy;

        let stream = self.stream.into_inner();
        let new_stream = new_stream(stream);

        let mut session = Session::new(self.proc, new_stream)?;
        session.stream.keep_in_buffer(&buf);
        session.expect_timeout = expect_timeout;
        session.expect_lazy = expect_lazy;
        Ok(session)
    }
}

impl<P, S> Session<P, S> {
    /// Set the pty session's expect timeout.
    ///
    /// With `None` the expect loops perform no timeout check at all.
    pub fn set_expect_timeout(&mut self, expect_timeout: Option<Duration>) {
        self.expect_timeout = expect_timeout;
    }

    /// Set the expect algorithm to be either greedy or lazy.
    ///
    /// Default algorithm is greedy.
    ///
    /// See [Session::expect].
    pub fn set_expect_lazy(&mut self, lazy: bool) {
        self.expect_lazy = lazy;
    }

    /// Get a reference to the original stream.
    pub fn get_stream(&self) -> &S {
        self.stream.as_ref()
    }
}

impl<P, S: Read + NonBlocking> Session<P, S> {
    /// Waits until the given pattern matches the process output.
    ///
    /// An [Ok] result guarantees that at least one match was found.
    ///
    /// Two matching algorithms are available:
    ///     - greedy
    ///     - lazy
    ///
    /// Select one with [Session::set_expect_lazy]; greedy is the default.
    ///
    /// To see the difference, imagine matching the [crate::Regex] `"\d+"`
    /// against a process which prints `123`.
    /// The lazy algorithm matches `1`,
    /// whereas the greedy algorithm matches `123`.
    ///
    /// # Example
    ///
    #[cfg_attr(windows, doc = "```no_run")]
    #[cfg_attr(unix, doc = "```")]
    /// let mut p = expectrl::spawn("echo 123").unwrap();
    /// let m = p.expect(expectrl::Regex("\\d+")).unwrap();
    /// assert_eq!(m.get(0).unwrap(), b"123");
    /// ```
    ///
    #[cfg_attr(windows, doc = "```no_run")]
    #[cfg_attr(unix, doc = "```")]
    /// let mut p = expectrl::spawn("echo 123").unwrap();
    /// p.set_expect_lazy(true);
    /// let m = p.expect(expectrl::Regex("\\d+")).unwrap();
    /// assert_eq!(m.get(0).unwrap(), b"1");
    /// ```
    ///
    /// This behaviour is different from [Session::check].
    ///
    /// An error is returned when the timeout is reached.
    /// The timeout can be changed with [Session::set_expect_timeout].
    pub fn expect<N>(&mut self, needle: N) -> Result<Captures, Error>
    where
        N: Needle,
    {
        if self.expect_lazy {
            self.expect_lazy(needle)
        } else {
            self.expect_gready(needle)
        }
    }

    /// Expect which fills as much as possible to the buffer.
    ///
    /// See [Session::expect].
    fn expect_gready<N>(&mut self, needle: N) -> Result<Captures, Error>
    where
        N: Needle,
    {
        let start = time::Instant::now();
        loop {
            let eof = self.stream.read_available()?;
            let data = self.stream.get_available();

            let found = needle.check(data, eof)?;
            if !found.is_empty() {
                let end_index = Captures::right_most_index(&found);
                let involved_bytes = data[..end_index].to_vec();
                self.stream.consume_available(end_index);

                return Ok(Captures::new(involved_bytes, found));
            }

            if eof {
                return Err(Error::Eof);
            }

            if let Some(timeout) = self.expect_timeout {
                if start.elapsed() > timeout {
                    return Err(Error::ExpectTimeout);
                }
            }
        }
    }

    /// Expect which reads byte by byte.
    ///
    /// The needle is checked against a prefix of the buffered data which grows
    /// by at most one byte per iteration. A new byte is requested from the stream
    /// only once the whole buffer has already been checked.
    ///
    /// Returns `Error::Eof` when EOF is reached without a match and
    /// `Error::ExpectTimeout` when the configured timeout elapses.
    ///
    /// See [Session::expect].
    fn expect_lazy<N>(&mut self, needle: N) -> Result<Captures, Error>
    where
        N: Needle,
    {
        let mut checking_data_length = 0;
        let mut eof = false;
        let start = time::Instant::now();
        loop {
            let mut available = self.stream.get_available();
            if checking_data_length == available.len() {
                // We read byte by byte to make things as lazy as possible.
                //
                // This choice is important when a Regex is used as a Needle.
                // Imagine we have a `\d+` regex.
                // With such a buffer the string `2` is matched immediately,
                // even though another digit might follow right after it.
                //
                // The second reason is that
                // if we didn't read byte by byte the EOF indication could be lost,
                // and the next blocking std::io::Read operation could block forever.
                //
                // We could read all data available via `read_available` to reduce IO operations,
                // but in that case we would need to keep an EOF indicator internally in the stream,
                // which is OK if EOF happens once, but I am not sure that is always the case.
                eof = self.stream.read_available_once(&mut [0; 1])? == Some(0);
                available = self.stream.get_available();
            }

            // We intentionally do not increase the counter when no new data arrived
            // and run the check one more time even though the data hasn't changed,
            // because it may be important for custom implementations of Needle.
            if checking_data_length < available.len() {
                checking_data_length += 1;
            }

            let data = &available[..checking_data_length];

            let found = needle.check(data, eof)?;
            if !found.is_empty() {
                // Return the bytes up to the end of the right-most match and drop them from the buffer.
                let end_index = Captures::right_most_index(&found);
                let involved_bytes = data[..end_index].to_vec();
                self.stream.consume_available(end_index);
                return Ok(Captures::new(involved_bytes, found));
            }

            if eof {
                return Err(Error::Eof);
            }

            if let Some(timeout) = self.expect_timeout {
                if start.elapsed() > timeout {
                    return Err(Error::ExpectTimeout);
                }
            }
        }
    }

    /// Check verifies if a pattern is matched.
    /// Returns empty found structure if nothing found.
    ///
    /// Is a non blocking version of [Session::expect].
    /// But its strategy of matching is different from it.
    /// It makes search against all bytes available.
    ///
    /// # Example
    ///
    #[cfg_attr(windows, doc = "```no_run")]
    #[cfg_attr(unix, doc = "```")]
    /// use expectrl::{spawn, Regex};
    /// use std::time::Duration;
    ///
    /// let mut p = spawn("echo 123").unwrap();
    /// #
    /// # // wait to guarantee that check echo worked out (most likely)
    /// # std::thread::sleep(Duration::from_millis(500));
    /// #
    /// let m = p.check(Regex("\\d+")).unwrap();
    /// assert_eq!(m.get(0).unwrap(), b"123");
    /// ```
    pub fn check<N>(&mut self, needle: N) -> Result<Captures, Error>
    where
        N: Needle,
    {
        let eof = self.stream.read_available()?;
        let buf = self.stream.get_available();

        let found = needle.check(buf, eof)?;
        if !found.is_empty() {
            let end_index = Captures::right_most_index(&found);
            let involved_bytes = buf[..end_index].to_vec();
            self.stream.consume_available(end_index);
            return Ok(Captures::new(involved_bytes, found));
        }

        if eof {
            return Err(Error::Eof);
        }

        Ok(Captures::new(Vec::new(), Vec::new()))
    }

    /// The functions checks if a pattern is matched.
    /// It doesn’t consumes bytes from stream.
    ///
    /// Its strategy of matching is different from the one in [Session::expect].
    /// It makes search agains all bytes available.
    ///
    /// If you want to get a matched result [Session::check] and [Session::expect] is a better option.
    /// Because it is not guaranteed that [Session::check] or [Session::expect] with the same parameters:
    ///     - will successed even right after Session::is_matched call.
    ///     - will operate on the same bytes.
    ///
    /// IMPORTANT:
    ///  
    /// If you call this method with [crate::Eof] pattern be aware that eof
    /// indication MAY be lost on the next interactions.
    /// It depends from a process you spawn.
    /// So it might be better to use [Session::check] or [Session::expect] with Eof.
    ///
    /// # Example
    ///
    #[cfg_attr(windows, doc = "```no_run")]
    #[cfg_attr(unix, doc = "```")]
    /// use expectrl::{spawn, Regex};
    /// use std::time::Duration;
    ///
    /// let mut p = spawn("cat").unwrap();
    /// p.send_line("123");
    /// # // wait to guarantee that check echo worked out (most likely)
    /// # std::thread::sleep(Duration::from_secs(1));
    /// let m = p.is_matched(Regex("\\d+")).unwrap();
    /// assert_eq!(m, true);
    /// ```
    pub fn is_matched<N>(&mut self, needle: N) -> Result<bool, Error>
    where
        N: Needle,
    {
        let eof = self.stream.read_available()?;
        let buf = self.stream.get_available();

        let found = needle.check(buf, eof)?;
        if !found.is_empty() {
            return Ok(true);
        }

        if eof {
            return Err(Error::Eof);
        }

        Ok(false)
    }
}

impl<P, S: Write> Session<P, S> {
    /// Writes the given text to the child's STDIN.
    ///
    /// The methods of [std::io::Write] can be used for the same purpose.
    pub fn send(&mut self, s: impl AsRef<str>) -> io::Result<()> {
        let text = s.as_ref();
        self.stream.write_all(text.as_bytes())
    }

    /// Send a line to child's STDIN.
    ///
    /// The text is followed by the platform line ending
    /// (`\r\n` on Windows, `\n` elsewhere) and the stream is flushed afterwards.
    ///
    /// # Errors
    ///
    /// Returns an error if writing or flushing fails, including the case where
    /// the stream stops accepting bytes before the whole line was written.
    pub fn send_line(&mut self, s: impl AsRef<str>) -> io::Result<()> {
        // fixme: move it to a processes stream function.
        #[cfg(windows)]
        {
            // win32 has a WriteFileGather function which could be used as write_vectored,
            // but it is asynchronous, which may cause issues.
            // https://docs.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-writefilegather

            const LINE_ENDING: &[u8] = b"\r\n";
            self.write_all(s.as_ref().as_bytes())?;
            self.write_all(LINE_ENDING)?;
            self.flush()?;
            Ok(())
        }
        #[cfg(not(windows))]
        {
            const LINE_ENDING: &[u8] = b"\n";

            let line = s.as_ref().as_bytes();
            let bufs = [
                std::io::IoSlice::new(line),
                std::io::IoSlice::new(LINE_ENDING),
            ];

            // Try to send the text and the line ending with a single vectored write.
            //
            // `Write::write_vectored` is allowed to write only a part of the data
            // (its default implementation writes just the first non-empty buffer),
            // so whatever was not accepted is written afterwards with `write_all`.
            let written = self.write_vectored(&bufs)?;
            if written < line.len() {
                self.write_all(&line[written..])?;
                self.write_all(LINE_ENDING)?;
            } else {
                let ending_written = written - line.len();
                if ending_written < LINE_ENDING.len() {
                    self.write_all(&LINE_ENDING[ending_written..])?;
                }
            }
            self.flush()?;

            Ok(())
        }
    }

    /// Sends a control character to the child process.
    ///
    /// Be careful when passing a char or &str as the argument:
    /// a value which can't be converted into a control code results in an error.
    /// So it may be better to use [ControlCode].
    ///
    /// # Example
    ///
    /// ```
    /// use expectrl::{spawn, ControlCode};
    ///
    /// let mut process = spawn("cat").unwrap();
    ///
    /// process.send_control(ControlCode::EndOfText); // sends CTRL^C
    /// process.send_control('C'); // sends CTRL^C
    /// process.send_control("^C"); // sends CTRL^C
    /// ```
    pub fn send_control(&mut self, code: impl TryInto<ControlCode>) -> io::Result<()> {
        let parsed = code
            .try_into()
            .map_err(|_| to_io_error("Failed to parse a control character")(""));
        let control: ControlCode = parsed?;

        // A control code is sent as a single byte.
        let byte: u8 = control.into();
        self.stream.write_all(&[byte])
    }
}

impl<P, S: Read + NonBlocking> Session<P, S> {
    /// Try to read in a non-blocking mode.
    ///
    /// Returns `[std::io::ErrorKind::WouldBlock]`
    /// in case there's nothing to read.
    pub fn try_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.stream.try_read(buf)
    }

    /// Verifies if the stream is empty or not.
    ///
    /// The answer is taken from the internal stream wrapper.
    pub fn is_empty(&mut self) -> io::Result<bool> {
        self.stream.is_empty()
    }
}

// Writing to a session writes to its stream; every method forwards to the stream wrapper.
impl<P, S: Write> Write for Session<P, S> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.stream.write(buf)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        self.stream.flush()
    }

    // Forwarded explicitly so the stream's own vectored write is used
    // rather than the default provided by the `Write` trait.
    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
        self.stream.write_vectored(bufs)
    }
}

// Reading from a session forwards to the stream wrapper's `Read` implementation.
impl<P, S: Read> Read for Session<P, S> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.stream.read(buf)
    }
}

// Buffered reading forwards to the stream wrapper's `BufRead` implementation.
impl<P, S: Read> BufRead for Session<P, S> {
    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
        self.stream.fill_buf()
    }

    fn consume(&mut self, amt: usize) {
        self.stream.consume(amt)
    }
}

// A session dereferences to its process value, so the methods of `P`
// can be called directly on the session.
impl<P, S> Deref for Session<P, S> {
    type Target = P;

    fn deref(&self) -> &Self::Target {
        &self.proc
    }
}

// Mutable counterpart of `Deref`: gives mutable access to the process value.
impl<P, S> DerefMut for Session<P, S> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.proc
    }
}

/// Stream wrapper used by `Session`.
///
/// It owns the buffering layer and adds non-blocking read helpers on top of it.
#[derive(Debug)]
struct TryStream<S> {
    // Buffering layer around the original stream.
    stream: ControlledReader<S>,
}

impl<S> TryStream<S> {
    /// Unwraps all the buffering layers and returns the original stream.
    ///
    /// Data still held in the buffers is not returned by this call.
    fn into_inner(self) -> S {
        self.stream.inner.into_inner().inner
    }

    /// Returns a shared reference to the original stream.
    fn as_ref(&self) -> &S {
        &self.stream.inner.get_ref().inner
    }
}

impl<S: Read> TryStream<S> {
    /// The function returns a new stream wrapper around the given stream.
    ///
    /// The construction itself always succeeds here; the result is wrapped in `Ok`.
    fn new(stream: S) -> io::Result<Self> {
        Ok(Self {
            stream: ControlledReader::new(stream),
        })
    }

    /// Forwards to `ControlledReader::flush_in_buffer`.
    fn flush_in_buffer(&mut self) {
        self.stream.flush_in_buffer();
    }
}

// Accessors for the look-ahead buffer; each one forwards to `ControlledReader`.
impl<S> TryStream<S> {
    /// Appends `v` to the buffered data.
    fn keep_in_buffer(&mut self, v: &[u8]) {
        self.stream.keep_in_buffer(v);
    }

    /// Returns the buffered data without consuming it.
    fn get_available(&mut self) -> &[u8] {
        self.stream.get_available()
    }

    /// Drops the first `n` bytes of the buffered data.
    fn consume_available(&mut self, n: usize) {
        self.stream.consume_available(n)
    }
}

impl<R: Read + NonBlocking> TryStream<R> {
    /// Try to read in a non-blocking mode.
    ///
    /// The read goes through the buffered reader, so the destination may be
    /// filled from data which is already buffered.
    ///
    /// It raises io::ErrorKind::WouldBlock if there's nothing to read.
    fn try_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.stream.get_mut().set_non_blocking()?;

        // Keep the outcome for later: the stream must be switched back first.
        let result = self.stream.inner.read(buf);

        // As the file is DUPed, changes in one descriptor affect all of them,
        // so we need to make the file blocking again once we are finished.
        self.stream.get_mut().set_blocking()?;

        result
    }

    #[allow(clippy::wrong_self_convention)]
    fn is_empty(&mut self) -> io::Result<bool> {
        match self.try_read(&mut []) {
            Ok(0) => Ok(true),
            Ok(_) => Ok(false),
            Err(err) if err.kind() == io::ErrorKind::WouldBlock => Ok(true),
            Err(err) => Err(err),
        }
    }

    /// Moves everything which can be read without blocking into the look-ahead buffer.
    ///
    /// Returns `Ok(true)` if EOF was reached and `Ok(false)` if the stream would block.
    fn read_available(&mut self) -> std::io::Result<bool> {
        self.stream.flush_in_buffer();

        let mut chunk = [0; 248];
        loop {
            let n = match self.try_read_inner(&mut chunk) {
                Ok(0) => return Ok(true),
                Ok(n) => n,
                Err(err) if err.kind() == io::ErrorKind::WouldBlock => return Ok(false),
                Err(err) => return Err(err),
            };

            self.stream.keep_in_buffer(&chunk[..n]);
        }
    }

    /// Performs a single non-blocking read into `buf` and keeps the bytes in the look-ahead buffer.
    ///
    /// Returns `Ok(Some(n))` with the number of bytes read (`0` means EOF)
    /// and `Ok(None)` if the stream would block.
    fn read_available_once(&mut self, buf: &mut [u8]) -> std::io::Result<Option<usize>> {
        self.stream.flush_in_buffer();

        let n = match self.try_read_inner(buf) {
            Ok(n) => n,
            Err(err) if err.kind() == io::ErrorKind::WouldBlock => return Ok(None),
            Err(err) => return Err(err),
        };

        if n > 0 {
            self.stream.keep_in_buffer(&buf[..n]);
        }
        Ok(Some(n))
    }

    // Non-buffered && non-blocking read.
    //
    // Reads straight from the original stream, bypassing both buffering layers.
    fn try_read_inner(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.stream.get_mut().set_non_blocking()?;

        // Keep the outcome for later: the stream must be switched back first.
        let result = self.stream.get_mut().read(buf);

        // As the file is DUPed, changes in one descriptor affect all of them,
        // so we need to make the file blocking again once we are finished.
        self.stream.get_mut().set_blocking()?;

        result
    }
}

// Writes bypass the read buffering layers and go straight to the original stream.
impl<S: Write> Write for TryStream<S> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.stream.inner.get_mut().inner.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.stream.inner.get_mut().inner.flush()
    }

    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
        self.stream.inner.get_mut().inner.write_vectored(bufs)
    }
}

// Reads go through the `BufReader` held by the buffering layer.
impl<R: Read> Read for TryStream<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.stream.inner.read(buf)
    }
}

// Buffered reading is served by the `BufReader` held by the buffering layer.
impl<R: Read> BufRead for TryStream<R> {
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        self.stream.inner.fill_buf()
    }

    fn consume(&mut self, amt: usize) {
        self.stream.inner.consume(amt)
    }
}

/// Two-level buffered reader.
///
/// A std `BufReader` serves the regular `Read`/`BufRead` operations, while the
/// `BufferedReader` underneath holds the look-ahead buffer used for matching.
#[derive(Debug)]
struct ControlledReader<R> {
    inner: BufReader<BufferedReader<R>>,
}

impl<R: Read> ControlledReader<R> {
    /// Wraps `reader` into both buffering layers.
    fn new(reader: R) -> Self {
        let inner = BufReader::new(BufferedReader::new(reader));
        Self { inner }
    }

    /// Moves the content of the `BufReader` buffer into the look-ahead buffer.
    fn flush_in_buffer(&mut self) {
        // With two buffered layers the data read through `BufReader` and the data
        // stored via `keep_in_buffer` could get out of sync.
        //
        // To avoid that, everything the `BufReader` holds is appended to our own buffer.
        let pending = self.inner.buffer().to_vec();
        let moved = pending.len();
        self.keep_in_buffer(&pending);
        self.inner.consume(moved);
    }
}

impl<R> ControlledReader<R> {
    /// Appends `v` to the look-ahead buffer.
    fn keep_in_buffer(&mut self, v: &[u8]) {
        let reader = self.inner.get_mut();
        reader.buffer.extend_from_slice(v);
    }

    /// Returns a mutable reference to the original reader.
    fn get_mut(&mut self) -> &mut R {
        let reader = self.inner.get_mut();
        &mut reader.inner
    }

    /// Returns the content of the look-ahead buffer.
    fn get_available(&mut self) -> &[u8] {
        self.inner.get_ref().buffer.as_slice()
    }

    /// Removes the first `n` bytes from the look-ahead buffer.
    fn consume_available(&mut self, n: usize) {
        drop(self.inner.get_mut().buffer.drain(..n));
    }
}

/// Reader with an explicit look-ahead buffer.
///
/// Reads are served from `buffer` while it holds data and from `inner` otherwise.
#[derive(Debug)]
struct BufferedReader<R> {
    inner: R,
    buffer: Vec<u8>,
}

impl<R> BufferedReader<R> {
    /// Wraps `reader` with an empty look-ahead buffer.
    fn new(reader: R) -> Self {
        let buffer = Vec::new();
        Self {
            inner: reader,
            buffer,
        }
    }
}

impl<R: Read> Read for BufferedReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // Nothing is buffered: read straight from the wrapped reader.
        if self.buffer.is_empty() {
            return self.inner.read(buf);
        }

        // Otherwise hand out as much buffered data as fits and remove it from the buffer.
        // The wrapped reader is not touched in this case, even if `buf` has room left.
        let n = buf.len().min(self.buffer.len());
        buf[..n].copy_from_slice(&self.buffer[..n]);
        drop(self.buffer.drain(..n));
        Ok(n)
    }
}