misskey_util/
pager.rs

1//! Implementation of pagination.
2//!
3//! This module implements pagination using a request that implements
4//! [`PaginationRequest`][pagination_request].
5//! [`Pager`] trait serves as an alias for [`Stream`][stream] that fetches each page one by
6//! one, and [`PagerStream`] wraps it into [`Stream`][stream] that takes it by element.
7//!
8//! [pagination_request]: misskey_api::PaginationRequest
9//! [stream]: futures::stream::Stream
10use std::collections::VecDeque;
11use std::ops::{Deref, DerefMut};
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use std::time::{Duration, Instant};
15
16use crate::Error;
17
18use futures::{
19    future::{BoxFuture, FutureExt},
20    stream::{FusedStream, Stream, StreamExt},
21};
22use futures_timer::Delay;
23use misskey_api::{OffsetPaginationRequest, PaginationItem, PaginationRequest};
24use misskey_core::model::ApiResult;
25use misskey_core::{Client, Request};
26
27const DEFAULT_PAGE_SIZE: u8 = 30;
28
29enum PagerState<'a, C: Client + ?Sized, R: Request> {
30    Pending {
31        request: R,
32        request_future: BoxFuture<'a, Result<ApiResult<R::Response>, C::Error>>,
33    },
34    Ready {
35        next_request: R,
36    },
37}
38
39pub(crate) struct BackwardPager<'a, C: Client + ?Sized, R: PaginationRequest> {
40    client: &'a C,
41    since_id: Option<<R::Item as PaginationItem>::Id>,
42    state: Option<PagerState<'a, C, R>>,
43}
44
45impl<'a, C: Client + ?Sized, R: PaginationRequest> BackwardPager<'a, C, R> {
46    pub(crate) fn with_since_id(
47        client: &'a C,
48        since_id: Option<<R::Item as PaginationItem>::Id>,
49        mut request: R,
50    ) -> Self {
51        request.set_limit(DEFAULT_PAGE_SIZE);
52        BackwardPager {
53            client,
54            since_id,
55            state: Some(PagerState::Ready {
56                next_request: request,
57            }),
58        }
59    }
60
61    pub(crate) fn new(client: &'a C, request: R) -> Self {
62        BackwardPager::with_since_id(client, None, request)
63    }
64}
65
66impl<'a, C, R> Stream for BackwardPager<'a, C, R>
67where
68    C: Client + ?Sized,
69    R: PaginationRequest + Unpin,
70    R::Response: IntoIterator<Item = R::Item>,
71    <R::Item as PaginationItem>::Id: Clone + Unpin,
72{
73    type Item = Result<Vec<R::Item>, Error<C::Error>>;
74
75    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
76        let state = self.state.take();
77        match state {
78            Some(PagerState::Ready { next_request }) => {
79                let request_future = self.client.request(&next_request);
80                self.state = Some(PagerState::Pending {
81                    request: next_request,
82                    request_future,
83                });
84                self.poll_next(cx)
85            }
86            Some(PagerState::Pending {
87                mut request,
88                mut request_future,
89            }) => {
90                let response = match request_future.poll_unpin(cx) {
91                    Poll::Pending => {
92                        self.state = Some(PagerState::Pending {
93                            request,
94                            request_future,
95                        });
96                        return Poll::Pending;
97                    }
98                    Poll::Ready(res) => res.map_err(Error::Client)?.into_result()?,
99                };
100                let mut response: Vec<_> = response.into_iter().collect();
101                if let Some(until) = response.last() {
102                    request.set_until_id(until.item_id());
103                    if let Some(since_id) = self.since_id.take() {
104                        request.set_since_id(since_id.clone());
105                        response.retain(|item| item.item_id() > since_id);
106                    }
107                    self.state = Some(PagerState::Ready {
108                        next_request: request,
109                    });
110                }
111                Poll::Ready(Some(Ok(response)))
112            }
113            None => Poll::Ready(None),
114        }
115    }
116}
117
118impl<'a, C, R> FusedStream for BackwardPager<'a, C, R>
119where
120    C: Client + ?Sized,
121    R: PaginationRequest + Unpin,
122    R::Response: IntoIterator<Item = R::Item>,
123    <R::Item as PaginationItem>::Id: Clone + Unpin,
124{
125    fn is_terminated(&self) -> bool {
126        self.state.is_none()
127    }
128}
129
130impl<'a, C, R> Pager for BackwardPager<'a, C, R>
131where
132    C: Client + ?Sized,
133    R: PaginationRequest + Unpin,
134    R::Response: IntoIterator<Item = R::Item>,
135    <R::Item as PaginationItem>::Id: Clone + Unpin,
136{
137    type Content = R::Item;
138    type Client = C;
139
140    fn set_page_size(mut self: Pin<&mut Self>, size: u8) {
141        match self.state.as_mut() {
142            Some(PagerState::Ready { next_request, .. }) => next_request.set_limit(size),
143            Some(PagerState::Pending { request, .. }) => request.set_limit(size),
144            None => {}
145        }
146    }
147}
148
149pub(crate) struct ForwardPager<'a, C: Client + ?Sized, R: Request> {
150    client: &'a C,
151    state: Option<PagerState<'a, C, R>>,
152}
153
154impl<'a, C: Client + ?Sized, R: PaginationRequest> ForwardPager<'a, C, R> {
155    pub(crate) fn new(client: &'a C, mut request: R) -> Self {
156        request.set_limit(DEFAULT_PAGE_SIZE);
157        ForwardPager {
158            client,
159            state: Some(PagerState::Ready {
160                next_request: request,
161            }),
162        }
163    }
164}
165
166impl<'a, C, R> Stream for ForwardPager<'a, C, R>
167where
168    C: Client + ?Sized,
169    R: PaginationRequest + Unpin,
170    R::Response: IntoIterator<Item = R::Item>,
171{
172    type Item = Result<Vec<R::Item>, Error<C::Error>>;
173
174    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
175        let state = self.state.take();
176        match state {
177            Some(PagerState::Ready { next_request }) => {
178                let request_future = self.client.request(&next_request);
179                self.state = Some(PagerState::Pending {
180                    request: next_request,
181                    request_future,
182                });
183                self.poll_next(cx)
184            }
185            Some(PagerState::Pending {
186                mut request,
187                mut request_future,
188            }) => {
189                let response = match request_future.poll_unpin(cx) {
190                    Poll::Pending => {
191                        self.state = Some(PagerState::Pending {
192                            request,
193                            request_future,
194                        });
195                        return Poll::Pending;
196                    }
197                    Poll::Ready(res) => res.map_err(Error::Client)?.into_result()?,
198                };
199                let response: Vec<_> = response.into_iter().collect();
200                if let Some(since) = response.last() {
201                    request.set_since_id(since.item_id());
202                    self.state = Some(PagerState::Ready {
203                        next_request: request,
204                    });
205                }
206                Poll::Ready(Some(Ok(response)))
207            }
208            None => Poll::Ready(None),
209        }
210    }
211}
212
213impl<'a, C, R> FusedStream for ForwardPager<'a, C, R>
214where
215    C: Client + ?Sized,
216    R: PaginationRequest + Unpin,
217    R::Response: IntoIterator<Item = R::Item>,
218{
219    fn is_terminated(&self) -> bool {
220        self.state.is_none()
221    }
222}
223
224impl<'a, C, R> Pager for ForwardPager<'a, C, R>
225where
226    C: Client + ?Sized,
227    R: PaginationRequest + Unpin,
228    R::Response: IntoIterator<Item = R::Item>,
229{
230    type Content = R::Item;
231    type Client = C;
232
233    fn set_page_size(mut self: Pin<&mut Self>, size: u8) {
234        match self.state.as_mut() {
235            Some(PagerState::Ready { next_request, .. }) => next_request.set_limit(size),
236            Some(PagerState::Pending { request, .. }) => request.set_limit(size),
237            None => {}
238        }
239    }
240}
241
242pub(crate) struct OffsetPager<'a, C: Client + ?Sized, R: Request> {
243    client: &'a C,
244    state: Option<PagerState<'a, C, R>>,
245    total_count: u64,
246}
247
248impl<'a, C: Client + ?Sized, R: OffsetPaginationRequest> OffsetPager<'a, C, R> {
249    pub(crate) fn new(client: &'a C, mut request: R) -> Self {
250        request.set_limit(DEFAULT_PAGE_SIZE);
251        OffsetPager {
252            client,
253            state: Some(PagerState::Ready {
254                next_request: request,
255            }),
256            total_count: 0,
257        }
258    }
259}
260
261impl<'a, C, R> Stream for OffsetPager<'a, C, R>
262where
263    C: Client + ?Sized,
264    R: OffsetPaginationRequest + Unpin,
265    R::Response: IntoIterator<Item = R::Item>,
266{
267    type Item = Result<Vec<R::Item>, Error<C::Error>>;
268
269    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
270        let state = self.state.take();
271        match state {
272            Some(PagerState::Ready { next_request }) => {
273                let request_future = self.client.request(&next_request);
274                self.state = Some(PagerState::Pending {
275                    request: next_request,
276                    request_future,
277                });
278                self.poll_next(cx)
279            }
280            Some(PagerState::Pending {
281                mut request,
282                mut request_future,
283            }) => {
284                let response = match request_future.poll_unpin(cx) {
285                    Poll::Pending => {
286                        self.state = Some(PagerState::Pending {
287                            request,
288                            request_future,
289                        });
290                        return Poll::Pending;
291                    }
292                    Poll::Ready(res) => res.map_err(Error::Client)?.into_result()?,
293                };
294                let response: Vec<_> = response.into_iter().collect();
295                if !response.is_empty() {
296                    self.total_count += response.len() as u64;
297                    request.set_offset(self.total_count);
298                    self.state = Some(PagerState::Ready {
299                        next_request: request,
300                    });
301                }
302                Poll::Ready(Some(Ok(response)))
303            }
304            None => Poll::Ready(None),
305        }
306    }
307}
308
309impl<'a, C, R> FusedStream for OffsetPager<'a, C, R>
310where
311    C: Client + ?Sized,
312    R: OffsetPaginationRequest + Unpin,
313    R::Response: IntoIterator<Item = R::Item>,
314{
315    fn is_terminated(&self) -> bool {
316        self.state.is_none()
317    }
318}
319
320impl<'a, C, R> Pager for OffsetPager<'a, C, R>
321where
322    C: Client + ?Sized,
323    R: OffsetPaginationRequest + Unpin,
324    R::Response: IntoIterator<Item = R::Item>,
325{
326    type Content = R::Item;
327    type Client = C;
328
329    fn set_page_size(mut self: Pin<&mut Self>, size: u8) {
330        match self.state.as_mut() {
331            Some(PagerState::Ready { next_request, .. }) => next_request.set_limit(size),
332            Some(PagerState::Pending { request, .. }) => request.set_limit(size),
333            None => {}
334        }
335    }
336}
337
338/// A stream of pages..
339pub trait Pager:
340    Stream<
341    Item = Result<Vec<<Self as Pager>::Content>, Error<<<Self as Pager>::Client as Client>::Error>>,
342>
343{
344    /// Values yielded by the pager.
345    type Content;
346    /// [`Client`][`misskey_core::Client`] type used in the pager.
347    type Client: Client + ?Sized;
348
349    /// Sets the number of items to be fetched at once.
350    fn set_page_size(self: Pin<&mut Self>, size: u8);
351}
352
353impl<P: Pager + Unpin + ?Sized> Pager for &mut P {
354    type Content = P::Content;
355    type Client = P::Client;
356
357    fn set_page_size(mut self: Pin<&mut Self>, size: u8) {
358        P::set_page_size(Pin::new(&mut **self), size)
359    }
360}
361
362impl<P: Pager + Unpin + ?Sized> Pager for Box<P> {
363    type Content = P::Content;
364    type Client = P::Client;
365
366    fn set_page_size(mut self: Pin<&mut Self>, size: u8) {
367        P::set_page_size(Pin::new(&mut **self), size)
368    }
369}
370
371impl<P> Pager for Pin<P>
372where
373    P: DerefMut + Unpin,
374    <P as Deref>::Target: Pager,
375{
376    type Content = <<P as Deref>::Target as Pager>::Content;
377    type Client = <<P as Deref>::Target as Pager>::Client;
378
379    fn set_page_size(self: Pin<&mut Self>, size: u8) {
380        <<P as Deref>::Target as Pager>::set_page_size(self.get_mut().as_mut(), size)
381    }
382}
383
384impl<S, F, T> Pager for futures::stream::MapOk<S, F>
385where
386    S: Pager + Unpin,
387    F: FnMut(Vec<<S as Pager>::Content>) -> Vec<T>,
388{
389    type Content = T;
390    type Client = <S as Pager>::Client;
391
392    fn set_page_size(mut self: Pin<&mut Self>, size: u8) {
393        <S as Pager>::set_page_size(Pin::new(&mut *(*self).get_mut()), size)
394    }
395}
396
397/// An owned dynamically typed [`Pager`].
398pub type BoxPager<'a, C, T> = Pin<
399    Box<
400        dyn Pager<Content = T, Client = C, Item = Result<Vec<T>, Error<<C as Client>::Error>>>
401            + 'a
402            + Send,
403    >,
404>;
405
406enum PagerStreamState<P: Pager> {
407    Ready {
408        item: P::Content,
409        buffer: VecDeque<P::Content>,
410        last_fetch: Instant,
411    },
412    Fetch,
413    Wait(Delay),
414}
415
416/// A stream of elements in [`Pager`].
417pub struct PagerStream<P: Pager> {
418    pager: P,
419    minimum_interval: Duration,
420    state: Option<PagerStreamState<P>>,
421}
422
423impl<P: Pager> PagerStream<P> {
424    pub(crate) fn new(pager: P) -> Self {
425        PagerStream {
426            pager,
427            minimum_interval: Duration::new(0, 0),
428            state: Some(PagerStreamState::Fetch),
429        }
430    }
431
432    /// Sets the number of items to be fetched at once by the inner pager.
433    pub fn set_page_size(&mut self, size: u8)
434    where
435        P: Unpin,
436    {
437        Pin::new(&mut self.pager).set_page_size(size);
438    }
439
440    /// Sets the minimum time interval between pagination.
441    ///
442    /// It is recommended to set this to reduce the load on the
443    /// server if you expect a lot of pagination.
444    pub fn set_interval(&mut self, minimum_interval: Duration) {
445        self.minimum_interval = minimum_interval;
446    }
447
448    /// Returns the inner pager.
449    pub fn into_inner(self) -> P {
450        self.pager
451    }
452}
453
454impl<P> Stream for PagerStream<P>
455where
456    P: Pager + Unpin,
457    P::Content: Unpin + std::fmt::Debug,
458{
459    type Item = Result<P::Content, Error<<P::Client as Client>::Error>>;
460
461    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
462        let state = self.state.take();
463        match state {
464            Some(PagerStreamState::Ready {
465                item,
466                mut buffer,
467                last_fetch,
468            }) => {
469                if let Some(next) = buffer.pop_front() {
470                    self.state = Some(PagerStreamState::Ready {
471                        item: next,
472                        buffer,
473                        last_fetch,
474                    });
475                } else {
476                    let until = last_fetch + self.minimum_interval;
477                    if let Some(duration) = until.checked_duration_since(Instant::now()) {
478                        self.state = Some(PagerStreamState::Wait(Delay::new(duration)));
479                    } else {
480                        self.state = Some(PagerStreamState::Fetch);
481                    }
482                }
483                Poll::Ready(Some(Ok(item)))
484            }
485            Some(PagerStreamState::Fetch) => match self.pager.poll_next_unpin(cx) {
486                Poll::Pending => {
487                    self.state = Some(PagerStreamState::Fetch);
488                    Poll::Pending
489                }
490                Poll::Ready(None) => Poll::Ready(None),
491                Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
492                Poll::Ready(Some(Ok(page))) => {
493                    let mut buffer: VecDeque<_> = page.into();
494                    if let Some(item) = buffer.pop_front() {
495                        self.state = Some(PagerStreamState::Ready {
496                            item,
497                            buffer,
498                            last_fetch: Instant::now(),
499                        });
500                        self.poll_next(cx)
501                    } else {
502                        Poll::Ready(None)
503                    }
504                }
505            },
506            Some(PagerStreamState::Wait(mut delay)) => match delay.poll_unpin(cx) {
507                Poll::Pending => {
508                    self.state = Some(PagerStreamState::Wait(delay));
509                    Poll::Pending
510                }
511                Poll::Ready(()) => {
512                    self.state = Some(PagerStreamState::Fetch);
513                    self.poll_next(cx)
514                }
515            },
516            None => Poll::Ready(None),
517        }
518    }
519}
520
521impl<P> FusedStream for PagerStream<P>
522where
523    P: Pager + FusedStream + Unpin,
524    P::Content: Unpin + std::fmt::Debug,
525{
526    fn is_terminated(&self) -> bool {
527        self.state.is_none()
528    }
529}