expectrl/session/
sync_session.rs

1//! Module contains a Session structure.
2
3use std::{
4    io::{self, BufRead, BufReader, Read, Write},
5    time::{self, Duration},
6};
7
8use crate::{
9    error::Error,
10    needle::Needle,
11    process::{Healthcheck, NonBlocking},
12    Captures,
13};
14
15/// Session represents a spawned process and its streams.
16/// It controlls process and communication with it.
17#[derive(Debug)]
18pub struct Session<P = super::OsProcess, S = super::OsProcessStream> {
19    proc: P,
20    stream: TryStream<S>,
21    expect_timeout: Option<Duration>,
22    expect_lazy: bool,
23}
24
25impl<P, S> Session<P, S>
26where
27    S: Read,
28{
29    /// Creates a new session.
30    pub fn new(process: P, stream: S) -> io::Result<Self> {
31        let stream = TryStream::new(stream)?;
32        Ok(Self {
33            proc: process,
34            stream,
35            expect_timeout: Some(Duration::from_millis(10000)),
36            expect_lazy: false,
37        })
38    }
39
40    pub(crate) fn swap_stream<F, R>(mut self, new_stream: F) -> Result<Session<P, R>, Error>
41    where
42        F: FnOnce(S) -> R,
43        R: Read,
44    {
45        self.stream.flush_in_buffer();
46        let buf = self.stream.get_available().to_owned();
47
48        let stream = self.stream.into_inner();
49        let new_stream = new_stream(stream);
50
51        let mut session = Session::new(self.proc, new_stream)?;
52        session.stream.keep_in_buffer(&buf);
53        Ok(session)
54    }
55}
56
57impl<P, S> Session<P, S> {
58    /// Set the pty session's expect timeout.
59    pub fn set_expect_timeout(&mut self, expect_timeout: Option<Duration>) {
60        self.expect_timeout = expect_timeout;
61    }
62
63    /// Set a expect algorithm to be either gready or lazy.
64    ///
65    /// Default algorithm is gready.
66    ///
67    /// See [Session::expect].
68    pub fn set_expect_lazy(&mut self, lazy: bool) {
69        self.expect_lazy = lazy;
70    }
71
72    /// Get a reference to original stream.
73    pub fn get_stream(&self) -> &S {
74        self.stream.as_ref()
75    }
76
77    /// Get a mut reference to original stream.
78    pub fn get_stream_mut(&mut self) -> &mut S {
79        self.stream.as_mut()
80    }
81
82    /// Get a reference to a process running program.
83    pub fn get_process(&self) -> &P {
84        &self.proc
85    }
86
87    /// Get a mut reference to a process running program.
88    pub fn get_process_mut(&mut self) -> &mut P {
89        &mut self.proc
90    }
91}
92
93impl<P: Healthcheck, S> Session<P, S> {
94    /// Verifies whether process is still alive.
95    pub fn is_alive(&mut self) -> Result<bool, Error> {
96        self.proc.is_alive().map_err(|err| err.into())
97    }
98}
99
100impl<P, S: Read + NonBlocking> Session<P, S> {
101    /// Expect waits until a pattern is matched.
102    ///
103    /// If the method returns [Ok] it is guaranteed that at least 1 match was found.
104    ///
105    /// The match algorthm can be either
106    ///     - gready
107    ///     - lazy
108    ///
109    /// You can set one via [Session::set_expect_lazy].
110    /// Default version is gready.
111    ///
112    /// The implications are.
113    /// Imagine you use [crate::Regex] `"\d+"` to find a match.
114    /// And your process outputs `123`.
115    /// In case of lazy approach we will match `1`.
116    /// Where's in case of gready one we will match `123`.
117    ///
118    /// # Example
119    ///
120    #[cfg_attr(windows, doc = "```no_run")]
121    #[cfg_attr(unix, doc = "```")]
122    /// let mut p = expectrl::spawn("echo 123").unwrap();
123    /// let m = p.expect(expectrl::Regex("\\d+")).unwrap();
124    /// assert_eq!(m.get(0).unwrap(), b"123");
125    /// ```
126    ///
127    #[cfg_attr(windows, doc = "```no_run")]
128    #[cfg_attr(unix, doc = "```")]
129    /// let mut p = expectrl::spawn("echo 123").unwrap();
130    /// p.set_expect_lazy(true);
131    /// let m = p.expect(expectrl::Regex("\\d+")).unwrap();
132    /// assert_eq!(m.get(0).unwrap(), b"1");
133    /// ```
134    ///
135    /// This behaviour is different from [Session::check].
136    ///
137    /// It returns an error if timeout is reached.
138    /// You can specify a timeout value by [Session::set_expect_timeout] method.
139    pub fn expect<N>(&mut self, needle: N) -> Result<Captures, Error>
140    where
141        N: Needle,
142    {
143        match self.expect_lazy {
144            true => self.expect_lazy(needle),
145            false => self.expect_gready(needle),
146        }
147    }
148
149    /// Expect which fills as much as possible to the buffer.
150    ///
151    /// See [Session::expect].
152    fn expect_gready<N>(&mut self, needle: N) -> Result<Captures, Error>
153    where
154        N: Needle,
155    {
156        let start = time::Instant::now();
157        loop {
158            let eof = self.stream.read_available()?;
159            let data = self.stream.get_available();
160
161            let found = needle.check(data, eof)?;
162            if !found.is_empty() {
163                let end_index = Captures::right_most_index(&found);
164                let involved_bytes = data[..end_index].to_vec();
165                self.stream.consume_available(end_index);
166
167                return Ok(Captures::new(involved_bytes, found));
168            }
169
170            if eof {
171                return Err(Error::Eof);
172            }
173
174            if let Some(timeout) = self.expect_timeout {
175                if start.elapsed() > timeout {
176                    return Err(Error::ExpectTimeout);
177                }
178            }
179        }
180    }
181
182    /// Expect which reads byte by byte.
183    ///
184    /// See [Session::expect].
185    fn expect_lazy<N>(&mut self, needle: N) -> Result<Captures, Error>
186    where
187        N: Needle,
188    {
189        let mut checking_data_length = 0;
190        let mut eof = false;
191        let start = time::Instant::now();
192        loop {
193            let mut available = self.stream.get_available();
194            if checking_data_length == available.len() {
195                // We read by byte to make things as lazy as possible.
196                //
197                // It's chose is important in using Regex as a Needle.
198                // Imagine we have a `\d+` regex.
199                // Using such buffer will match string `2` imidiately eventhough right after might be other digit.
200                //
201                // The second reason is
202                // if we wouldn't read by byte EOF indication could be lost.
203                // And next blocking std::io::Read operation could be blocked forever.
204                //
205                // We could read all data available via `read_available` to reduce IO operations,
206                // but in such case we would need to keep a EOF indicator internally in stream,
207                // which is OK if EOF happens onces, but I am not sure if this is a case.
208                eof = self.stream.read_available_once(&mut [0; 1])? == Some(0);
209                available = self.stream.get_available();
210            }
211
212            // We intentinally not increase the counter
213            // and run check one more time even though the data isn't changed.
214            // Because it may be important for custom implementations of Needle.
215            if checking_data_length < available.len() {
216                checking_data_length += 1;
217            }
218
219            let data = &available[..checking_data_length];
220
221            let found = needle.check(data, eof)?;
222            if !found.is_empty() {
223                let end_index = Captures::right_most_index(&found);
224                let involved_bytes = data[..end_index].to_vec();
225                self.stream.consume_available(end_index);
226                return Ok(Captures::new(involved_bytes, found));
227            }
228
229            if eof {
230                return Err(Error::Eof);
231            }
232
233            if let Some(timeout) = self.expect_timeout {
234                if start.elapsed() > timeout {
235                    return Err(Error::ExpectTimeout);
236                }
237            }
238        }
239    }
240
241    /// Check verifies if a pattern is matched.
242    /// Returns empty found structure if nothing found.
243    ///
244    /// Is a non blocking version of [Session::expect].
245    /// But its strategy of matching is different from it.
246    /// It makes search against all bytes available.
247    ///
248    /// # Example
249    ///
250    #[cfg_attr(any(windows, target_os = "macos"), doc = "```no_run")]
251    #[cfg_attr(not(any(target_os = "macos", windows)), doc = "```")]
252    /// use expectrl::{spawn, Regex};
253    /// use std::time::Duration;
254    ///
255    /// let mut p = spawn("echo 123").unwrap();
256    /// #
257    /// # // wait to guarantee that check echo worked out (most likely)
258    /// # std::thread::sleep(Duration::from_millis(500));
259    /// #
260    /// let m = p.check(Regex("\\d+")).unwrap();
261    /// assert_eq!(m.get(0).unwrap(), b"123");
262    /// ```
263    pub fn check<N>(&mut self, needle: N) -> Result<Captures, Error>
264    where
265        N: Needle,
266    {
267        let eof = self.stream.read_available()?;
268        let buf = self.stream.get_available();
269
270        let found = needle.check(buf, eof)?;
271        if !found.is_empty() {
272            let end_index = Captures::right_most_index(&found);
273            let involved_bytes = buf[..end_index].to_vec();
274            self.stream.consume_available(end_index);
275            return Ok(Captures::new(involved_bytes, found));
276        }
277
278        if eof {
279            return Err(Error::Eof);
280        }
281
282        Ok(Captures::new(Vec::new(), Vec::new()))
283    }
284
285    /// The functions checks if a pattern is matched.
286    /// It doesn’t consumes bytes from stream.
287    ///
288    /// Its strategy of matching is different from the one in [Session::expect].
289    /// It makes search agains all bytes available.
290    ///
291    /// If you want to get a matched result [Session::check] and [Session::expect] is a better option.
292    /// Because it is not guaranteed that [Session::check] or [Session::expect] with the same parameters:
293    ///     - will successed even right after Session::is_matched call.
294    ///     - will operate on the same bytes.
295    ///
296    /// IMPORTANT:
297    ///  
298    /// If you call this method with [crate::Eof] pattern be aware that eof
299    /// indication MAY be lost on the next interactions.
300    /// It depends from a process you spawn.
301    /// So it might be better to use [Session::check] or [Session::expect] with Eof.
302    ///
303    /// # Example
304    ///
305    #[cfg_attr(windows, doc = "```no_run")]
306    #[cfg_attr(unix, doc = "```")]
307    /// use expectrl::{spawn, Regex};
308    /// use std::time::Duration;
309    ///
310    /// let mut p = spawn("cat").unwrap();
311    /// p.send_line("123");
312    /// # // wait to guarantee that check echo worked out (most likely)
313    /// # std::thread::sleep(Duration::from_secs(1));
314    /// let m = p.is_matched(Regex("\\d+")).unwrap();
315    /// assert_eq!(m, true);
316    /// ```
317    pub fn is_matched<N>(&mut self, needle: N) -> Result<bool, Error>
318    where
319        N: Needle,
320    {
321        let eof = self.stream.read_available()?;
322        let buf = self.stream.get_available();
323
324        let found = needle.check(buf, eof)?;
325        if !found.is_empty() {
326            return Ok(true);
327        }
328
329        if eof {
330            return Err(Error::Eof);
331        }
332
333        Ok(false)
334    }
335}
336
337impl<Proc, Stream: Write> Session<Proc, Stream> {
338    /// Send text to child’s STDIN.
339    ///
340    /// You can also use methods from [std::io::Write] instead.
341    ///
342    /// # Example
343    ///
344    /// ```
345    /// use expectrl::{spawn, ControlCode};
346    ///
347    /// let mut proc = spawn("cat").unwrap();
348    ///
349    /// proc.send("Hello");
350    /// proc.send(b"World");
351    /// proc.send(ControlCode::try_from("^C").unwrap());
352    /// ```
353    pub fn send<B: AsRef<[u8]>>(&mut self, buf: B) -> io::Result<()> {
354        self.stream.write_all(buf.as_ref())
355    }
356
357    /// Send a line to child’s STDIN.
358    ///
359    /// # Example
360    ///
361    /// ```
362    /// use expectrl::{spawn, ControlCode};
363    ///
364    /// let mut proc = spawn("cat").unwrap();
365    ///
366    /// proc.send_line("Hello");
367    /// proc.send_line(b"World");
368    /// proc.send_line(ControlCode::try_from("^C").unwrap());
369    /// ```
370    pub fn send_line<B: AsRef<[u8]>>(&mut self, buf: B) -> io::Result<()> {
371        #[cfg(windows)]
372        const LINE_ENDING: &[u8] = b"\r\n";
373        #[cfg(not(windows))]
374        const LINE_ENDING: &[u8] = b"\n";
375
376        self.stream.write_all(buf.as_ref())?;
377        self.write_all(LINE_ENDING)?;
378
379        Ok(())
380    }
381}
382
383impl<P, S: Read + NonBlocking> Session<P, S> {
384    /// Try to read in a non-blocking mode.
385    ///
386    /// Returns `[std::io::ErrorKind::WouldBlock]`
387    /// in case if there's nothing to read.
388    pub fn try_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
389        self.stream.try_read(buf)
390    }
391
392    /// Verifyes if stream is empty or not.
393    pub fn is_empty(&mut self) -> io::Result<bool> {
394        self.stream.is_empty()
395    }
396}
397
398impl<P, S: Write> Write for Session<P, S> {
399    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
400        self.stream.write(buf)
401    }
402
403    fn flush(&mut self) -> std::io::Result<()> {
404        self.stream.flush()
405    }
406
407    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
408        self.stream.write_vectored(bufs)
409    }
410}
411
412impl<P, S: Read> Read for Session<P, S> {
413    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
414        self.stream.read(buf)
415    }
416}
417
418impl<P, S: Read> BufRead for Session<P, S> {
419    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
420        self.stream.fill_buf()
421    }
422
423    fn consume(&mut self, amt: usize) {
424        self.stream.consume(amt)
425    }
426}
427
428#[derive(Debug)]
429struct TryStream<S> {
430    stream: ControlledReader<S>,
431}
432
433impl<S> TryStream<S> {
434    fn into_inner(self) -> S {
435        self.stream.inner.into_inner().inner
436    }
437
438    fn as_ref(&self) -> &S {
439        &self.stream.inner.get_ref().inner
440    }
441
442    fn as_mut(&mut self) -> &mut S {
443        &mut self.stream.inner.get_mut().inner
444    }
445}
446
447impl<S: Read> TryStream<S> {
448    /// The function returns a new Stream from a file.
449    fn new(stream: S) -> io::Result<Self> {
450        Ok(Self {
451            stream: ControlledReader::new(stream),
452        })
453    }
454
455    fn flush_in_buffer(&mut self) {
456        self.stream.flush_in_buffer();
457    }
458}
459
460impl<S> TryStream<S> {
461    fn keep_in_buffer(&mut self, v: &[u8]) {
462        self.stream.keep_in_buffer(v);
463    }
464
465    fn get_available(&mut self) -> &[u8] {
466        self.stream.get_available()
467    }
468
469    fn consume_available(&mut self, n: usize) {
470        self.stream.consume_available(n)
471    }
472}
473
474impl<R: Read + NonBlocking> TryStream<R> {
475    /// Try to read in a non-blocking mode.
476    ///
477    /// It raises io::ErrorKind::WouldBlock if there's nothing to read.
478    fn try_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
479        self.stream.get_mut().set_non_blocking()?;
480
481        let result = self.stream.inner.read(buf);
482
483        // As file is DUPed changes in one descriptor affects all ones
484        // so we need to make blocking file after we finished.
485        self.stream.get_mut().set_blocking()?;
486
487        result
488    }
489
490    #[allow(clippy::wrong_self_convention)]
491    fn is_empty(&mut self) -> io::Result<bool> {
492        match self.try_read(&mut []) {
493            Ok(0) => Ok(true),
494            Ok(_) => Ok(false),
495            Err(err) if err.kind() == io::ErrorKind::WouldBlock => Ok(true),
496            Err(err) => Err(err),
497        }
498    }
499
500    fn read_available(&mut self) -> std::io::Result<bool> {
501        self.stream.flush_in_buffer();
502
503        let mut buf = [0; 248];
504        loop {
505            match self.try_read_inner(&mut buf) {
506                Ok(0) => break Ok(true),
507                Ok(n) => {
508                    self.stream.keep_in_buffer(&buf[..n]);
509                }
510                Err(err) if err.kind() == io::ErrorKind::WouldBlock => break Ok(false),
511                Err(err) => break Err(err),
512            }
513        }
514    }
515
516    fn read_available_once(&mut self, buf: &mut [u8]) -> std::io::Result<Option<usize>> {
517        self.stream.flush_in_buffer();
518
519        match self.try_read_inner(buf) {
520            Ok(0) => Ok(Some(0)),
521            Ok(n) => {
522                self.stream.keep_in_buffer(&buf[..n]);
523                Ok(Some(n))
524            }
525            Err(err) if err.kind() == io::ErrorKind::WouldBlock => Ok(None),
526            Err(err) => Err(err),
527        }
528    }
529
530    // non-buffered && non-blocking read
531    fn try_read_inner(&mut self, buf: &mut [u8]) -> io::Result<usize> {
532        self.stream.get_mut().set_non_blocking()?;
533
534        let result = self.stream.get_mut().read(buf);
535
536        // As file is DUPed changes in one descriptor affects all ones
537        // so we need to make blocking file after we finished.
538        self.stream.get_mut().set_blocking()?;
539
540        result
541    }
542}
543
544impl<S: Write> Write for TryStream<S> {
545    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
546        self.stream.inner.get_mut().inner.write(buf)
547    }
548
549    fn flush(&mut self) -> io::Result<()> {
550        self.stream.inner.get_mut().inner.flush()
551    }
552
553    fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
554        self.stream.inner.get_mut().inner.write_vectored(bufs)
555    }
556}
557
558impl<R: Read> Read for TryStream<R> {
559    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
560        self.stream.inner.read(buf)
561    }
562}
563
564impl<R: Read> BufRead for TryStream<R> {
565    fn fill_buf(&mut self) -> io::Result<&[u8]> {
566        self.stream.inner.fill_buf()
567    }
568
569    fn consume(&mut self, amt: usize) {
570        self.stream.inner.consume(amt)
571    }
572}
573
/// Reader made of two buffering layers: a std `BufReader` on top of a
/// [BufferedReader] which keeps bytes that were read but not consumed yet.
#[derive(Debug)]
struct ControlledReader<R> {
    inner: BufReader<BufferedReader<R>>,
}

impl<R: Read> ControlledReader<R> {
    /// Wraps `reader` into both buffering layers.
    fn new(reader: R) -> Self {
        Self {
            inner: BufReader::new(BufferedReader::new(reader)),
        }
    }

    /// Moves the bytes buffered by the `BufReader` into the kept buffer.
    fn flush_in_buffer(&mut self) {
        // Because we have 2 buffered streams there might appear inconsistency
        // in read operations and the data which was kept via `keep_in_buffer`.
        //
        // To eliminate it we move the BufReader buffer to our buffer.
        let mut pending = self.inner.buffer().to_vec();
        if pending.is_empty() {
            return;
        }
        self.inner.consume(pending.len());

        // Everything the BufReader holds was pulled through `BufferedReader::read`,
        // which serves the front of the kept buffer first. So these bytes precede
        // whatever is still kept and must go in front of it, not behind it.
        let kept = &mut self.inner.get_mut().buffer;
        pending.append(kept);
        *kept = pending;
    }
}

impl<R> ControlledReader<R> {
    /// Appends `v` to the kept buffer.
    fn keep_in_buffer(&mut self, v: &[u8]) {
        self.inner.get_mut().buffer.extend(v);
    }

    /// Returns a mutable reference to the original reader, bypassing both buffers.
    fn get_mut(&mut self) -> &mut R {
        &mut self.inner.get_mut().inner
    }

    /// Returns the kept bytes.
    fn get_available(&mut self) -> &[u8] {
        &self.inner.get_ref().buffer
    }

    /// Removes the first `n` kept bytes.
    ///
    /// Panics if `n` is bigger than the number of kept bytes.
    fn consume_available(&mut self, n: usize) {
        let _ = self.inner.get_mut().buffer.drain(..n);
    }
}

/// Reader which serves the bytes kept in `buffer` before reading from `inner`.
#[derive(Debug)]
struct BufferedReader<R> {
    inner: R,
    buffer: Vec<u8>,
}

impl<R> BufferedReader<R> {
    /// Wraps `reader` with an empty kept buffer.
    fn new(reader: R) -> Self {
        Self {
            inner: reader,
            buffer: Vec::new(),
        }
    }
}

impl<R: Read> Read for BufferedReader<R> {
    /// Copies kept bytes into `buf` if there are any, otherwise reads from the
    /// wrapped reader. A single call never mixes both sources.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        if self.buffer.is_empty() {
            return self.inner.read(buf);
        }

        let n = buf.len().min(self.buffer.len());
        buf[..n].copy_from_slice(&self.buffer[..n]);
        let _ = self.buffer.drain(..n);
        Ok(n)
    }
}