crb-superagent 0.0.35

CRB | Composable Runtime Blocks | Agent Extensions
Documentation
use anyhow::{Result, anyhow};
use crb_core::mpsc;
use crb_core::time::{Duration, Instant, Sleep, sleep_until};
use futures::{Future, Stream};
use std::pin::Pin;
use std::task::{Context, Poll};

#[derive(Debug)]
pub struct Timeout {
    pub scheduled_at: Instant,
}

#[derive(Debug)]
pub enum ScheduleCommand {
    Schedule { delay: Duration },
    Cancel,
}

struct PendingEvent {
    baseline: Instant,
    delay: Duration,
}

pub struct TimerStream {
    command_rx: mpsc::UnboundedReceiver<ScheduleCommand>,
    pending: Option<PendingEvent>,
    sleep: Option<Pin<Box<Sleep>>>,
}

pub struct Timer {
    command_tx: mpsc::UnboundedSender<ScheduleCommand>,
    stream: Option<TimerStream>,
}

impl Default for Timer {
    fn default() -> Self {
        Self::new()
    }
}

impl Timer {
    pub fn new() -> Self {
        let (tx, rx) = mpsc::unbounded_channel();
        let stream = TimerStream {
            command_rx: rx,
            pending: None,
            sleep: None,
        };
        Self {
            command_tx: tx,
            stream: Some(stream),
        }
    }

    pub fn schedule(&self, delay: Duration) -> Result<()> {
        self.command_tx
            .send(ScheduleCommand::Schedule { delay })
            .map_err(|_| anyhow!("Can't schedule the task."))
    }

    pub fn cancel(&self) -> Result<()> {
        self.command_tx
            .send(ScheduleCommand::Cancel)
            .map_err(|_| anyhow!("Can't cancel the task."))
    }

    pub fn events(&mut self) -> Result<TimerStream> {
        self.stream
            .take()
            .ok_or_else(|| anyhow!("Timer events stream has detached already."))
    }
}

impl TimerStream {
    fn timeout(&mut self, scheduled_at: Instant) -> Poll<Option<Timeout>> {
        let event = Timeout { scheduled_at };
        self.pending = None;
        self.sleep = None;
        Poll::Ready(Some(event))
    }
}

impl Stream for TimerStream {
    type Item = Timeout;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            while let Poll::Ready(cmd) = Pin::new(&mut self.command_rx).poll_recv(cx) {
                match cmd {
                    Some(ScheduleCommand::Schedule { delay }) => {
                        let now = Instant::now();
                        if let Some(pending) = &mut self.pending {
                            pending.delay = delay;
                        } else {
                            self.pending = Some(PendingEvent {
                                baseline: now,
                                delay,
                            });
                        }

                        if let Some(pending) = &self.pending {
                            let scheduled_at = pending.baseline + pending.delay;
                            if scheduled_at <= now {
                                return self.timeout(scheduled_at);
                            } else if let Some(sleep) = &mut self.sleep {
                                sleep.as_mut().reset(scheduled_at.into());
                            } else {
                                self.sleep = Some(Box::pin(sleep_until(scheduled_at.into())));
                            }
                        }
                    }
                    Some(ScheduleCommand::Cancel) => {
                        self.pending = None;
                        self.sleep = None;
                    }
                    None => {
                        // The handle was closed
                        return Poll::Ready(None);
                    }
                }
            }

            if let Some(pending) = &self.pending {
                let scheduled_at = pending.baseline + pending.delay;
                let now = Instant::now();
                if now >= scheduled_at {
                    return self.timeout(scheduled_at);
                }
            }

            if let Some(sleep) = &mut self.sleep {
                match sleep.as_mut().poll(cx) {
                    Poll::Ready(_) => continue,
                    Poll::Pending => break,
                }
            } else {
                break;
            }
        }
        Poll::Pending
    }
}