confab 0.3.1

Asynchronous line-oriented interactive TCP client
use crate::errors::InterfaceError;
use async_stream::stream;
use futures_util::Stream;
use pin_project_lite::pin_project;
use rustyline_async::{Readline, ReadlineError, ReadlineEvent};
use std::future::Future;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use std::time::Duration;
use tokio::fs::File as TokioFile;
use tokio::io::{AsyncBufReadExt, BufReader, Lines};
use tokio::time::{sleep, Sleep};

#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum Input {
    Line(String),
    CtrlC,
}

pin_project! {
    #[derive(Debug)]
    pub(crate) struct StartupScript {
        #[pin]
        lines: Lines<BufReader<TokioFile>>,
        #[pin]
        nap: Option<Sleep>,
        next_line: Option<Input>,
        delay: Duration,
    }
}

impl StartupScript {
    pub(crate) fn new(reader: BufReader<TokioFile>, delay: Duration) -> StartupScript {
        StartupScript {
            lines: reader.lines(),
            nap: Some(sleep(delay)),
            next_line: None,
            delay,
        }
    }
}

impl Stream for StartupScript {
    type Item = Result<Input, InterfaceError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        if this.next_line.is_none() {
            match ready!(this.lines.as_mut().poll_next_line(cx)) {
                Ok(Some(line)) => {
                    *this.next_line = Some(Input::Line(line));
                    this.nap.set(Some(sleep(*this.delay)));
                }
                Ok(None) => return None.into(),
                Err(e) => return Some(Err(InterfaceError::ReadScript(e))).into(),
            }
        }
        if let Some(nap) = this.nap.as_mut().as_pin_mut() {
            ready!(nap.poll(cx));
            this.nap.set(None);
        }
        this.next_line.take().map(Ok).into()
    }
}

pub(crate) fn readline_stream(
    rl: &mut Readline,
) -> impl Stream<Item = Result<Input, InterfaceError>> + Send + '_ {
    stream! {
        loop {
            match rl.readline().await {
                Ok(ReadlineEvent::Line(line)) => {
                    rl.add_history_entry(line.clone());
                    yield Ok(Input::Line(line));
                }
                Ok(ReadlineEvent::Eof) | Err(ReadlineError::Closed) => break,
                Ok(ReadlineEvent::Interrupted) => yield Ok(Input::CtrlC),
                Err(ReadlineError::IO(e)) => yield Err(InterfaceError::ReadLine(e)),
            }
        }
    }
}