// wiki/generators/rcpatrol.rs

use std::pin::Pin;
use std::task::{Context, Poll};

use chrono::{DateTime, Duration, Utc};
use futures_util::{Future, Stream};

use super::{GeneratorStream, RecentChangesGenerator, WikiGenerator};
use crate::api::RecentChangesResult;
use crate::req::rc::{ListRc, RcProp, RcType};
use crate::req::Limit;
use crate::Bot;
13#[pin_project::pin_project(project = StateProj)]
14pub enum State {
15    Stream(#[pin] GeneratorStream<RecentChangesGenerator<Bot>>),
16    Sleep(Pin<Box<tokio::time::Sleep>>),
17}
19#[pin_project::pin_project]
20pub struct RecentChangesPatroller {
21    bot: Bot,
22    prev_time: DateTime<Utc>,
23    #[pin]
24    state: State,
25    interval: tokio::time::Duration,
26    errored: bool,
27    prop: RcProp,
28    ty: RcType,
29}
31impl RecentChangesPatroller {
32    pub fn new(bot: Bot, interval: tokio::time::Duration, prop: RcProp, ty: RcType) -> Self {
33        let prev_time = Self::now();
34        let state = State::Sleep(Box::pin(tokio::time::sleep(interval)));
35        Self {
36            bot,
37            prev_time,
38            state,
39            interval,
40            errored: false,
41            prop,
42            ty,
43        }
44    }
45    fn now() -> DateTime<Utc> {
46        Utc::now() - Duration::seconds(1)
47    }
48}
50impl Stream for RecentChangesPatroller {
51    type Item = crate::Result<RecentChangesResult>;
52    fn poll_next(
53        mut self: std::pin::Pin<&mut Self>,
54        cx: &mut std::task::Context<'_>,
55    ) -> std::task::Poll<Option<Self::Item>> {
56        let this = self.as_mut().project();
57        if *this.errored {
58            return Poll::Ready(None);
59        }
60        match this.state.project() {
61            StateProj::Sleep(s) => match s.as_mut().poll(cx) {
62                Poll::Pending => Poll::Pending,
63                Poll::Ready(()) => {
64                    let timestamp = Self::now();
65                    let gen = RecentChangesGenerator::new(
66                        self.bot.clone(),
67                        ListRc {
68                            start: Some(timestamp.into()),
69                            end: Some(self.prev_time.into()),
70                            limit: Limit::Max,
71                            prop: self.prop,
72                            ty: self.ty,
73                        },
74                    );
75                    self.prev_time = timestamp;
76                    self.state = State::Stream(gen.into_stream());
77                    self.poll_next(cx)
78                }
79            },
80            StateProj::Stream(s) => match s.poll_next(cx) {
81                Poll::Pending => Poll::Pending,
82                Poll::Ready(None) => {
83                    self.state = State::Sleep(Box::pin(tokio::time::sleep(self.interval)));
84                    self.poll_next(cx)
85                }
86                Poll::Ready(Some(Err(e))) => {
87                    *this.errored = true;
88                    Poll::Ready(Some(Err(e)))
89                }
90                Poll::Ready(Some(Ok(i))) => Poll::Ready(Some(Ok(i))),
91            },
92        }
93    }
94}