wiki/generators/
rcpatrol.rs1use std::pin::Pin;
2use std::task::Poll;
3
4use chrono::{DateTime, Duration, Utc};
5use futures_util::{Future, Stream};
6
7use super::{GeneratorStream, RecentChangesGenerator, WikiGenerator};
8use crate::api::RecentChangesResult;
9use crate::req::rc::{ListRc, RcProp, RcType};
10use crate::req::Limit;
11use crate::Bot;
12
13#[pin_project::pin_project(project = StateProj)]
14pub enum State {
15 Stream(#[pin] GeneratorStream<RecentChangesGenerator<Bot>>),
16 Sleep(Pin<Box<tokio::time::Sleep>>),
17}
18
19#[pin_project::pin_project]
20pub struct RecentChangesPatroller {
21 bot: Bot,
22 prev_time: DateTime<Utc>,
23 #[pin]
24 state: State,
25 interval: tokio::time::Duration,
26 errored: bool,
27 prop: RcProp,
28 ty: RcType,
29}
30
31impl RecentChangesPatroller {
32 pub fn new(bot: Bot, interval: tokio::time::Duration, prop: RcProp, ty: RcType) -> Self {
33 let prev_time = Self::now();
34 let state = State::Sleep(Box::pin(tokio::time::sleep(interval)));
35 Self {
36 bot,
37 prev_time,
38 state,
39 interval,
40 errored: false,
41 prop,
42 ty,
43 }
44 }
45 fn now() -> DateTime<Utc> {
46 Utc::now() - Duration::seconds(1)
47 }
48}
49
50impl Stream for RecentChangesPatroller {
51 type Item = crate::Result<RecentChangesResult>;
52 fn poll_next(
53 mut self: std::pin::Pin<&mut Self>,
54 cx: &mut std::task::Context<'_>,
55 ) -> std::task::Poll<Option<Self::Item>> {
56 let this = self.as_mut().project();
57 if *this.errored {
58 return Poll::Ready(None);
59 }
60 match this.state.project() {
61 StateProj::Sleep(s) => match s.as_mut().poll(cx) {
62 Poll::Pending => Poll::Pending,
63 Poll::Ready(()) => {
64 let timestamp = Self::now();
65 let gen = RecentChangesGenerator::new(
66 self.bot.clone(),
67 ListRc {
68 start: Some(timestamp.into()),
69 end: Some(self.prev_time.into()),
70 limit: Limit::Max,
71 prop: self.prop,
72 ty: self.ty,
73 },
74 );
75 self.prev_time = timestamp;
76 self.state = State::Stream(gen.into_stream());
77 self.poll_next(cx)
78 }
79 },
80 StateProj::Stream(s) => match s.poll_next(cx) {
81 Poll::Pending => Poll::Pending,
82 Poll::Ready(None) => {
83 self.state = State::Sleep(Box::pin(tokio::time::sleep(self.interval)));
84 self.poll_next(cx)
85 }
86 Poll::Ready(Some(Err(e))) => {
87 *this.errored = true;
88 Poll::Ready(Some(Err(e)))
89 }
90 Poll::Ready(Some(Ok(i))) => Poll::Ready(Some(Ok(i))),
91 },
92 }
93 }
94}