// ogle 2.1.8
//
// Execute a command periodically, showing the output only when it changes
// Documentation
// Copyright (C) 2025 Leandro Lisboa Penz <lpenz@lpenz.org>
// This file is subject to the terms and conditions defined in
// file 'LICENSE', which is part of this source code package.

//! Main lower-level module that takes care of running the command and
//! yielding all possible events into a coherent stream of timestamped
//! events.

use color_eyre::Result;
use pin_project::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_stream::Stream;
use tokio_stream::wrappers::IntervalStream;
use tracing::instrument;

use crate::process_wrapper;
use crate::process_wrapper::Cmd;
use crate::process_wrapper::ExitSts;
use crate::process_wrapper::ProcessStream;
use crate::sys::SysApi;
use crate::time_wrapper::Duration;
use crate::time_wrapper::Instant;
use crate::user_wrapper::UserStream;

// EData, EItem //////////////////////////////////////////////////////

/// Payload of an engine event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EData {
    /// The command was started successfully.
    StartRun,
    /// The engine started sleeping; carries the deadline at which it
    /// stops sleeping.
    StartSleep(Instant),
    /// Output coming from the process' stdout.
    LineOut(String),
    /// Output coming from the process' stderr.
    LineErr(String),
    /// The process finished with the given exit status.
    Done(ExitSts),
    /// An I/O error, reduced to its [`std::io::ErrorKind`].
    Err(std::io::ErrorKind),
    /// Timer tick, yielded both while the process runs and while
    /// sleeping between runs.
    Tick,
}

/// Translates a process-level item into the corresponding engine
/// event payload.
impl From<process_wrapper::Item> for EData {
    fn from(item: process_wrapper::Item) -> Self {
        match item {
            process_wrapper::Item::Stdout(line) => Self::LineOut(line),
            process_wrapper::Item::Stderr(line) => Self::LineErr(line),
            // The final item carries either the exit status or the
            // kind of error that prevented us from getting one.
            process_wrapper::Item::Done(outcome) => match outcome {
                Ok(status) => Self::Done(status),
                Err(kind) => Self::Err(kind),
            },
        }
    }
}

impl From<std::io::ErrorKind> for EData {
    fn from(e: std::io::ErrorKind) -> Self {
        EData::Err(e)
    }
}

/// Keeps only the kind of the I/O error and wraps it into
/// [`EData::Err`].
impl From<std::io::Error> for EData {
    fn from(err: std::io::Error) -> Self {
        Self::Err(err.kind())
    }
}

/// A timestamped engine event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EItem {
    /// Timestamp of the event; the engine fills it with the value
    /// returned by `SysApi::now` when it is polled.
    pub time: Instant,
    /// What happened.
    pub data: EData,
}

impl EItem {
    pub fn new<D>(time: Instant, data: D) -> EItem
    where
        D: Into<EData>,
    {
        Self {
            time,
            data: data.into(),
        }
    }
}

// Engine ////////////////////////////////////////////////////////////

/// Internal state machine of the [`Engine`].
///
/// `Start` is the default, which is also what `std::mem::take`
/// leaves behind when the state is temporarily moved out.
#[derive(Debug, Default)]
enum State {
    /// State where we start the process on the next iteration.
    #[default]
    Start,
    /// State where we start sleeping on the next iteration, yielding
    /// the corresponding `StartSleep` event.
    StartSleeping,
    /// The process is running and we are yielding lines and ticks.
    Running {
        /// Events coming from the running process
        process: ProcessStream,
        /// Tick events generated by the [`IntervalStream`] timer
        ticker: IntervalStream,
    },
    /// Sleeping between two process executions, yielding ticks.
    Sleeping {
        /// When to wake up
        deadline: Instant,
        /// 1s ticker
        ticker: IntervalStream,
    },
    /// Don't execute the process again, either because of an exit
    /// condition or an error.
    Done,
}

/// Stream that runs a command repeatedly and yields everything that
/// happens as timestamped [`EItem`]s.
#[pin_project]
#[derive(Default, Debug)]
pub struct Engine<SI: SysApi> {
    /// System interface: provides the clock, spawns the command and
    /// hands out the user input stream.
    sys: SI,
    /// Command to execute; cloned for every run.
    cmd: Cmd,
    /// Period of the tick timer while the command is running.
    refresh: Duration,
    /// How long to sleep between two runs.
    sleep: Duration,
    /// Stop when a run exits successfully.
    exit_on_success: bool,
    /// Stop when a run exits unsuccessfully.
    exit_on_failure: bool,
    /// Current state of the state machine.
    state: State,
    /// User input stream, if any; set to `None` once it ends or after
    /// the user asked to quit.
    user: Option<UserStream>,
    /// Set once the user sent `"q"`.
    exit_by_user: bool,
}

impl<SI: SysApi> Engine<SI> {
    /// Creates an engine that runs `cmd` through `sys`, ticking every
    /// `refresh` while the command runs and sleeping for `sleep`
    /// between runs.
    ///
    /// The engine starts in [`State::Start`] with both exit
    /// conditions disabled.
    pub fn new(mut sys: SI, cmd: Cmd, refresh: Duration, sleep: Duration) -> Result<Self> {
        // Must be fetched before `sys` is moved into the struct.
        let user = sys.user_stream();
        let engine = Self {
            sys,
            cmd,
            refresh,
            sleep,
            exit_on_success: false,
            exit_on_failure: false,
            state: State::Start,
            user,
            exit_by_user: false,
        };
        Ok(engine)
    }

    /// Spawns the command and switches to [`State::Running`].
    ///
    /// If spawning fails the error is returned and the state is left
    /// untouched.
    fn run(&mut self) -> std::result::Result<(), std::io::Error> {
        let cmd = self.cmd.clone();
        let process = self.sys.run_command(cmd)?;
        self.state = State::Running {
            process,
            ticker: IntervalStream::new(self.refresh.into()),
        };
        Ok(())
    }

    /// Switches to [`State::Sleeping`] with a deadline of
    /// `now + self.sleep` and returns the matching `StartSleep` event.
    fn sleep(&mut self, now: Instant) -> EItem {
        let deadline = &now + &self.sleep;
        self.state = State::Sleeping {
            deadline,
            ticker: IntervalStream::new(Duration::seconds(1).into()),
        };
        EItem::new(now, EData::StartSleep(deadline))
    }
}

impl<SI: SysApi> Stream for Engine<SI> {
    type Item = EItem;

    /// Advances the state machine by one step.
    ///
    /// Every call first samples the clock and polls the user input
    /// stream (if still present), then acts according to the current
    /// state:
    ///
    /// - `Start`: spawns the command and yields `StartRun`; if
    ///   spawning fails, yields the error and moves to `Done`.
    /// - `StartSleeping`: enters `Sleeping` and yields `StartSleep`.
    /// - `Sleeping`: ends the stream if the user asked to quit;
    ///   otherwise yields a `Tick` per timer tick and goes back to
    ///   `Start` once the deadline has been reached.
    /// - `Running`: forwards process output; when the process is
    ///   done, moves to `Done` or `StartSleeping` depending on the
    ///   exit conditions. Yields `Tick`s while the process is quiet.
    /// - `Done`: the stream is over.
    ///
    /// # Panics
    ///
    /// Outside of test builds, panics if the process stream ends
    /// without yielding its final `Done` item.
    #[instrument(level = "debug", ret, skip(cx))]
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.as_mut().project();
        let now = this.sys.now();
        // NOTE(review): at most one user item is consumed per poll. If
        // the user stream only registers the waker when it returns
        // `Pending`, a later key press may go unnoticed until another
        // source wakes the task - confirm against `UserStream`.
        if let Some(user) = this.user {
            match Pin::new(user).poll_next(cx) {
                Poll::Ready(Some(s)) if s == "q" => {
                    *this.exit_by_user = true;
                    *this.user = None;
                }
                Poll::Ready(Some(_)) => {}
                Poll::Ready(None) => {
                    *this.user = None;
                }
                Poll::Pending => {}
            };
        }
        // Move the state out so that its streams can be polled; this
        // leaves `State::Start` behind, so every arm below has to
        // store the state that should be in effect for the next poll.
        let mut state = std::mem::take(&mut *this.state);
        return match state {
            State::Start => match self.run() {
                Ok(_) => Poll::Ready(Some(EItem::new(now, EData::StartRun))),
                Err(e) => {
                    // `run` did not touch the state, which would leave
                    // us in `Start` and make every subsequent poll try
                    // to spawn the command again. Errors are terminal,
                    // same as when the running process reports one.
                    self.state = State::Done;
                    Poll::Ready(Some(EItem::new(now, e)))
                }
            },
            State::StartSleeping => {
                let item = self.sleep(now);
                Poll::Ready(Some(item))
            }
            State::Sleeping {
                deadline,
                ref mut ticker,
            } => {
                if *this.exit_by_user {
                    *this.state = State::Done;
                    Poll::Ready(None)
                } else if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
                    let tick = EData::Tick;
                    if now < deadline {
                        // Still sleeping
                        *this.state = state;
                        Poll::Ready(Some(EItem::new(now, tick)))
                    } else {
                        // Deadline reached: run again on the next poll
                        *this.state = State::Start;
                        Poll::Ready(Some(EItem::new(now, tick)))
                    }
                } else {
                    *this.state = state;
                    Poll::Pending
                }
            }
            State::Running {
                ref mut process,
                ref mut ticker,
            } => match Pin::new(process).poll_next(cx) {
                Poll::Ready(Some(item)) => match item {
                    process_wrapper::Item::Stdout(_) => {
                        *this.state = state;
                        Poll::Ready(Some(EItem::new(now, item)))
                    }
                    process_wrapper::Item::Stderr(_) => {
                        *this.state = state;
                        Poll::Ready(Some(EItem::new(now, item)))
                    }
                    process_wrapper::Item::Done(Ok(ref exitsts)) => {
                        let success = exitsts.success();
                        // Stop if the user asked for it or if one of
                        // the exit conditions matches the exit status.
                        if *this.exit_by_user
                            || success && *this.exit_on_success
                            || !success && *this.exit_on_failure
                        {
                            *this.state = State::Done;
                        } else {
                            *this.state = State::StartSleeping;
                        }
                        Poll::Ready(Some(EItem::new(now, item)))
                    }
                    process_wrapper::Item::Done(Err(e)) => {
                        *this.state = State::Done;
                        Poll::Ready(Some(EItem::new(now, e)))
                    }
                },
                Poll::Ready(None) => {
                    #[cfg(not(test))]
                    panic!("We should never see the underlying stream end");
                    #[cfg(test)]
                    {
                        *this.state = State::Done;
                        Poll::Ready(None)
                    }
                }
                Poll::Pending => {
                    // Process doesn't have an item, it must be the ticker
                    if let Poll::Ready(Some(_)) = Pin::new(ticker).poll_next(cx) {
                        *this.state = state;
                        Poll::Ready(Some(EItem::new(now, EData::Tick)))
                    } else {
                        *this.state = state;
                        Poll::Pending
                    }
                }
            },
            State::Done => {
                *this.state = state;
                Poll::Ready(None)
            }
        };
    }
}

// Tests /////////////////////////////////////////////////////////////

#[cfg(test)]
mod tests {
    use color_eyre::Result;
    use std::io;
    use tokio_stream::StreamExt;

    use crate::process_wrapper::Item;
    use crate::sys::SysVirtual;
    use crate::time_wrapper::Instant;

    use super::*;

    impl Engine<SysVirtual> {
        /// Test-only constructor on top of [`SysVirtual`]: default
        /// command, infinite refresh and sleep periods, and
        /// caller-chosen exit conditions.
        pub fn new_virtual(
            mut sys: SysVirtual,
            exit_on_success: bool,
            exit_on_failure: bool,
        ) -> Result<Self> {
            // Must be fetched before `sys` is moved into the struct.
            let user = sys.user_stream();
            let engine = Self {
                sys,
                cmd: Cmd::default(),
                refresh: Duration::INFINITE,
                sleep: Duration::INFINITE,
                exit_on_success,
                exit_on_failure,
                state: State::Start,
                user,
                exit_by_user: false,
            };
            Ok(engine)
        }
    }

    /// A successful run with exit-on-success yields the start event,
    /// both output lines and the exit status, and then ends.
    #[tokio::test]
    async fn test_basic_success() -> Result<()> {
        let mut sys = SysVirtual::default();
        sys.set_items(vec![
            Item::Stdout("stdout".into()),
            Item::Stderr("stderr".into()),
            Item::Done(Ok(ExitSts::default())),
        ]);
        let engine = Engine::new_virtual(sys, true, true)?;
        let events: Vec<EItem> = engine.collect().await;
        let mut clock = Instant::default();
        let expected = vec![
            EItem::new(clock.incr(), EData::StartRun),
            EItem::new(clock.incr(), EData::LineOut("stdout".to_owned())),
            EItem::new(clock.incr(), EData::LineErr("stderr".to_owned())),
            EItem::new(clock.incr(), EData::Done(ExitSts::default())),
        ];
        assert_eq!(events, expected);
        Ok(())
    }

    /// An error reported by the process ends the stream even when no
    /// exit condition is enabled.
    #[tokio::test]
    async fn test_done_err() -> Result<()> {
        let mut sys = SysVirtual::default();
        sys.set_items(vec![Item::Done(Err(io::ErrorKind::UnexpectedEof))]);
        let engine = Engine::new_virtual(sys, false, false)?;
        let events: Vec<EItem> = engine.collect().await;
        let mut clock = Instant::default();
        let expected = vec![
            EItem::new(clock.incr(), EData::StartRun),
            EItem::new(clock.incr(), EData::Err(io::ErrorKind::UnexpectedEof)),
        ];
        assert_eq!(events, expected);
        Ok(())
    }
}