1use std::{
4 io::{self, BufRead, BufReader, Read, Write},
5 time::{self, Duration},
6};
7
8use crate::{
9 error::Error,
10 needle::Needle,
11 process::{Healthcheck, NonBlocking},
12 Captures,
13};
14
15#[derive(Debug)]
18pub struct Session<P = super::OsProcess, S = super::OsProcessStream> {
19 proc: P,
20 stream: TryStream<S>,
21 expect_timeout: Option<Duration>,
22 expect_lazy: bool,
23}
24
25impl<P, S> Session<P, S>
26where
27 S: Read,
28{
29 pub fn new(process: P, stream: S) -> io::Result<Self> {
31 let stream = TryStream::new(stream)?;
32 Ok(Self {
33 proc: process,
34 stream,
35 expect_timeout: Some(Duration::from_millis(10000)),
36 expect_lazy: false,
37 })
38 }
39
40 pub(crate) fn swap_stream<F, R>(mut self, new_stream: F) -> Result<Session<P, R>, Error>
41 where
42 F: FnOnce(S) -> R,
43 R: Read,
44 {
45 self.stream.flush_in_buffer();
46 let buf = self.stream.get_available().to_owned();
47
48 let stream = self.stream.into_inner();
49 let new_stream = new_stream(stream);
50
51 let mut session = Session::new(self.proc, new_stream)?;
52 session.stream.keep_in_buffer(&buf);
53 Ok(session)
54 }
55}
56
57impl<P, S> Session<P, S> {
58 pub fn set_expect_timeout(&mut self, expect_timeout: Option<Duration>) {
60 self.expect_timeout = expect_timeout;
61 }
62
63 pub fn set_expect_lazy(&mut self, lazy: bool) {
69 self.expect_lazy = lazy;
70 }
71
72 pub fn get_stream(&self) -> &S {
74 self.stream.as_ref()
75 }
76
77 pub fn get_stream_mut(&mut self) -> &mut S {
79 self.stream.as_mut()
80 }
81
82 pub fn get_process(&self) -> &P {
84 &self.proc
85 }
86
87 pub fn get_process_mut(&mut self) -> &mut P {
89 &mut self.proc
90 }
91}
92
93impl<P: Healthcheck, S> Session<P, S> {
94 pub fn is_alive(&mut self) -> Result<bool, Error> {
96 self.proc.is_alive().map_err(|err| err.into())
97 }
98}
99
100impl<P, S: Read + NonBlocking> Session<P, S> {
101 #[cfg_attr(windows, doc = "```no_run")]
121 #[cfg_attr(unix, doc = "```")]
122 #[cfg_attr(windows, doc = "```no_run")]
128 #[cfg_attr(unix, doc = "```")]
129 pub fn expect<N>(&mut self, needle: N) -> Result<Captures, Error>
140 where
141 N: Needle,
142 {
143 match self.expect_lazy {
144 true => self.expect_lazy(needle),
145 false => self.expect_gready(needle),
146 }
147 }
148
149 fn expect_gready<N>(&mut self, needle: N) -> Result<Captures, Error>
153 where
154 N: Needle,
155 {
156 let start = time::Instant::now();
157 loop {
158 let eof = self.stream.read_available()?;
159 let data = self.stream.get_available();
160
161 let found = needle.check(data, eof)?;
162 if !found.is_empty() {
163 let end_index = Captures::right_most_index(&found);
164 let involved_bytes = data[..end_index].to_vec();
165 self.stream.consume_available(end_index);
166
167 return Ok(Captures::new(involved_bytes, found));
168 }
169
170 if eof {
171 return Err(Error::Eof);
172 }
173
174 if let Some(timeout) = self.expect_timeout {
175 if start.elapsed() > timeout {
176 return Err(Error::ExpectTimeout);
177 }
178 }
179 }
180 }
181
182 fn expect_lazy<N>(&mut self, needle: N) -> Result<Captures, Error>
186 where
187 N: Needle,
188 {
189 let mut checking_data_length = 0;
190 let mut eof = false;
191 let start = time::Instant::now();
192 loop {
193 let mut available = self.stream.get_available();
194 if checking_data_length == available.len() {
195 eof = self.stream.read_available_once(&mut [0; 1])? == Some(0);
209 available = self.stream.get_available();
210 }
211
212 if checking_data_length < available.len() {
216 checking_data_length += 1;
217 }
218
219 let data = &available[..checking_data_length];
220
221 let found = needle.check(data, eof)?;
222 if !found.is_empty() {
223 let end_index = Captures::right_most_index(&found);
224 let involved_bytes = data[..end_index].to_vec();
225 self.stream.consume_available(end_index);
226 return Ok(Captures::new(involved_bytes, found));
227 }
228
229 if eof {
230 return Err(Error::Eof);
231 }
232
233 if let Some(timeout) = self.expect_timeout {
234 if start.elapsed() > timeout {
235 return Err(Error::ExpectTimeout);
236 }
237 }
238 }
239 }
240
241 #[cfg_attr(any(windows, target_os = "macos"), doc = "```no_run")]
251 #[cfg_attr(not(any(target_os = "macos", windows)), doc = "```")]
252 pub fn check<N>(&mut self, needle: N) -> Result<Captures, Error>
264 where
265 N: Needle,
266 {
267 let eof = self.stream.read_available()?;
268 let buf = self.stream.get_available();
269
270 let found = needle.check(buf, eof)?;
271 if !found.is_empty() {
272 let end_index = Captures::right_most_index(&found);
273 let involved_bytes = buf[..end_index].to_vec();
274 self.stream.consume_available(end_index);
275 return Ok(Captures::new(involved_bytes, found));
276 }
277
278 if eof {
279 return Err(Error::Eof);
280 }
281
282 Ok(Captures::new(Vec::new(), Vec::new()))
283 }
284
285 #[cfg_attr(windows, doc = "```no_run")]
306 #[cfg_attr(unix, doc = "```")]
307 pub fn is_matched<N>(&mut self, needle: N) -> Result<bool, Error>
318 where
319 N: Needle,
320 {
321 let eof = self.stream.read_available()?;
322 let buf = self.stream.get_available();
323
324 let found = needle.check(buf, eof)?;
325 if !found.is_empty() {
326 return Ok(true);
327 }
328
329 if eof {
330 return Err(Error::Eof);
331 }
332
333 Ok(false)
334 }
335}
336
337impl<Proc, Stream: Write> Session<Proc, Stream> {
338 pub fn send<B: AsRef<[u8]>>(&mut self, buf: B) -> io::Result<()> {
354 self.stream.write_all(buf.as_ref())
355 }
356
357 pub fn send_line<B: AsRef<[u8]>>(&mut self, buf: B) -> io::Result<()> {
371 #[cfg(windows)]
372 const LINE_ENDING: &[u8] = b"\r\n";
373 #[cfg(not(windows))]
374 const LINE_ENDING: &[u8] = b"\n";
375
376 self.stream.write_all(buf.as_ref())?;
377 self.write_all(LINE_ENDING)?;
378
379 Ok(())
380 }
381}
382
383impl<P, S: Read + NonBlocking> Session<P, S> {
384 pub fn try_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
389 self.stream.try_read(buf)
390 }
391
392 pub fn is_empty(&mut self) -> io::Result<bool> {
394 self.stream.is_empty()
395 }
396}
397
398impl<P, S: Write> Write for Session<P, S> {
399 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
400 self.stream.write(buf)
401 }
402
403 fn flush(&mut self) -> std::io::Result<()> {
404 self.stream.flush()
405 }
406
407 fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
408 self.stream.write_vectored(bufs)
409 }
410}
411
412impl<P, S: Read> Read for Session<P, S> {
413 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
414 self.stream.read(buf)
415 }
416}
417
418impl<P, S: Read> BufRead for Session<P, S> {
419 fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
420 self.stream.fill_buf()
421 }
422
423 fn consume(&mut self, amt: usize) {
424 self.stream.consume(amt)
425 }
426}
427
428#[derive(Debug)]
429struct TryStream<S> {
430 stream: ControlledReader<S>,
431}
432
433impl<S> TryStream<S> {
434 fn into_inner(self) -> S {
435 self.stream.inner.into_inner().inner
436 }
437
438 fn as_ref(&self) -> &S {
439 &self.stream.inner.get_ref().inner
440 }
441
442 fn as_mut(&mut self) -> &mut S {
443 &mut self.stream.inner.get_mut().inner
444 }
445}
446
447impl<S: Read> TryStream<S> {
448 fn new(stream: S) -> io::Result<Self> {
450 Ok(Self {
451 stream: ControlledReader::new(stream),
452 })
453 }
454
455 fn flush_in_buffer(&mut self) {
456 self.stream.flush_in_buffer();
457 }
458}
459
460impl<S> TryStream<S> {
461 fn keep_in_buffer(&mut self, v: &[u8]) {
462 self.stream.keep_in_buffer(v);
463 }
464
465 fn get_available(&mut self) -> &[u8] {
466 self.stream.get_available()
467 }
468
469 fn consume_available(&mut self, n: usize) {
470 self.stream.consume_available(n)
471 }
472}
473
474impl<R: Read + NonBlocking> TryStream<R> {
475 fn try_read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
479 self.stream.get_mut().set_non_blocking()?;
480
481 let result = self.stream.inner.read(buf);
482
483 self.stream.get_mut().set_blocking()?;
486
487 result
488 }
489
490 #[allow(clippy::wrong_self_convention)]
491 fn is_empty(&mut self) -> io::Result<bool> {
492 match self.try_read(&mut []) {
493 Ok(0) => Ok(true),
494 Ok(_) => Ok(false),
495 Err(err) if err.kind() == io::ErrorKind::WouldBlock => Ok(true),
496 Err(err) => Err(err),
497 }
498 }
499
500 fn read_available(&mut self) -> std::io::Result<bool> {
501 self.stream.flush_in_buffer();
502
503 let mut buf = [0; 248];
504 loop {
505 match self.try_read_inner(&mut buf) {
506 Ok(0) => break Ok(true),
507 Ok(n) => {
508 self.stream.keep_in_buffer(&buf[..n]);
509 }
510 Err(err) if err.kind() == io::ErrorKind::WouldBlock => break Ok(false),
511 Err(err) => break Err(err),
512 }
513 }
514 }
515
516 fn read_available_once(&mut self, buf: &mut [u8]) -> std::io::Result<Option<usize>> {
517 self.stream.flush_in_buffer();
518
519 match self.try_read_inner(buf) {
520 Ok(0) => Ok(Some(0)),
521 Ok(n) => {
522 self.stream.keep_in_buffer(&buf[..n]);
523 Ok(Some(n))
524 }
525 Err(err) if err.kind() == io::ErrorKind::WouldBlock => Ok(None),
526 Err(err) => Err(err),
527 }
528 }
529
530 fn try_read_inner(&mut self, buf: &mut [u8]) -> io::Result<usize> {
532 self.stream.get_mut().set_non_blocking()?;
533
534 let result = self.stream.get_mut().read(buf);
535
536 self.stream.get_mut().set_blocking()?;
539
540 result
541 }
542}
543
544impl<S: Write> Write for TryStream<S> {
545 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
546 self.stream.inner.get_mut().inner.write(buf)
547 }
548
549 fn flush(&mut self) -> io::Result<()> {
550 self.stream.inner.get_mut().inner.flush()
551 }
552
553 fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
554 self.stream.inner.get_mut().inner.write_vectored(bufs)
555 }
556}
557
558impl<R: Read> Read for TryStream<R> {
559 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
560 self.stream.inner.read(buf)
561 }
562}
563
564impl<R: Read> BufRead for TryStream<R> {
565 fn fill_buf(&mut self) -> io::Result<&[u8]> {
566 self.stream.inner.fill_buf()
567 }
568
569 fn consume(&mut self, amt: usize) {
570 self.stream.inner.consume(amt)
571 }
572}
573
574#[derive(Debug)]
575struct ControlledReader<R> {
576 inner: BufReader<BufferedReader<R>>,
577}
578
579impl<R: Read> ControlledReader<R> {
580 fn new(reader: R) -> Self {
581 Self {
582 inner: BufReader::new(BufferedReader::new(reader)),
583 }
584 }
585
586 fn flush_in_buffer(&mut self) {
587 let b = self.inner.buffer().to_vec();
592 self.inner.consume(b.len());
593 self.keep_in_buffer(&b);
594 }
595}
596
597impl<R> ControlledReader<R> {
598 fn keep_in_buffer(&mut self, v: &[u8]) {
599 self.inner.get_mut().buffer.extend(v);
600 }
601
602 fn get_mut(&mut self) -> &mut R {
603 &mut self.inner.get_mut().inner
604 }
605
606 fn get_available(&mut self) -> &[u8] {
607 &self.inner.get_ref().buffer
608 }
609
610 fn consume_available(&mut self, n: usize) {
611 let _ = self.inner.get_mut().buffer.drain(..n);
612 }
613}
614
/// Reader with a side buffer whose bytes are served before the wrapped reader's.
#[derive(Debug)]
struct BufferedReader<R> {
    /// Wrapped reader, consulted only while `buffer` is empty.
    inner: R,
    /// Bytes kept for re-reading; handed out first by `read`.
    buffer: Vec<u8>,
}
620
621impl<R> BufferedReader<R> {
622 fn new(reader: R) -> Self {
623 Self {
624 inner: reader,
625 buffer: Vec::new(),
626 }
627 }
628}
629
630impl<R: Read> Read for BufferedReader<R> {
631 fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
632 if self.buffer.is_empty() {
633 self.inner.read(buf)
634 } else {
635 let n = buf.write(&self.buffer)?;
636 let _ = self.buffer.drain(..n);
637 Ok(n)
638 }
639 }
640}