1use std::collections::VecDeque;
11use std::ops::{Deref, DerefMut};
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use std::time::{Duration, Instant};
15
16use crate::Error;
17
18use futures::{
19 future::{BoxFuture, FutureExt},
20 stream::{FusedStream, Stream, StreamExt},
21};
22use futures_timer::Delay;
23use misskey_api::{OffsetPaginationRequest, PaginationItem, PaginationRequest};
24use misskey_core::model::ApiResult;
25use misskey_core::{Client, Request};
26
27const DEFAULT_PAGE_SIZE: u8 = 30;
28
29enum PagerState<'a, C: Client + ?Sized, R: Request> {
30 Pending {
31 request: R,
32 request_future: BoxFuture<'a, Result<ApiResult<R::Response>, C::Error>>,
33 },
34 Ready {
35 next_request: R,
36 },
37}
38
39pub(crate) struct BackwardPager<'a, C: Client + ?Sized, R: PaginationRequest> {
40 client: &'a C,
41 since_id: Option<<R::Item as PaginationItem>::Id>,
42 state: Option<PagerState<'a, C, R>>,
43}
44
45impl<'a, C: Client + ?Sized, R: PaginationRequest> BackwardPager<'a, C, R> {
46 pub(crate) fn with_since_id(
47 client: &'a C,
48 since_id: Option<<R::Item as PaginationItem>::Id>,
49 mut request: R,
50 ) -> Self {
51 request.set_limit(DEFAULT_PAGE_SIZE);
52 BackwardPager {
53 client,
54 since_id,
55 state: Some(PagerState::Ready {
56 next_request: request,
57 }),
58 }
59 }
60
61 pub(crate) fn new(client: &'a C, request: R) -> Self {
62 BackwardPager::with_since_id(client, None, request)
63 }
64}
65
66impl<'a, C, R> Stream for BackwardPager<'a, C, R>
67where
68 C: Client + ?Sized,
69 R: PaginationRequest + Unpin,
70 R::Response: IntoIterator<Item = R::Item>,
71 <R::Item as PaginationItem>::Id: Clone + Unpin,
72{
73 type Item = Result<Vec<R::Item>, Error<C::Error>>;
74
75 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
76 let state = self.state.take();
77 match state {
78 Some(PagerState::Ready { next_request }) => {
79 let request_future = self.client.request(&next_request);
80 self.state = Some(PagerState::Pending {
81 request: next_request,
82 request_future,
83 });
84 self.poll_next(cx)
85 }
86 Some(PagerState::Pending {
87 mut request,
88 mut request_future,
89 }) => {
90 let response = match request_future.poll_unpin(cx) {
91 Poll::Pending => {
92 self.state = Some(PagerState::Pending {
93 request,
94 request_future,
95 });
96 return Poll::Pending;
97 }
98 Poll::Ready(res) => res.map_err(Error::Client)?.into_result()?,
99 };
100 let mut response: Vec<_> = response.into_iter().collect();
101 if let Some(until) = response.last() {
102 request.set_until_id(until.item_id());
103 if let Some(since_id) = self.since_id.take() {
104 request.set_since_id(since_id.clone());
105 response.retain(|item| item.item_id() > since_id);
106 }
107 self.state = Some(PagerState::Ready {
108 next_request: request,
109 });
110 }
111 Poll::Ready(Some(Ok(response)))
112 }
113 None => Poll::Ready(None),
114 }
115 }
116}
117
118impl<'a, C, R> FusedStream for BackwardPager<'a, C, R>
119where
120 C: Client + ?Sized,
121 R: PaginationRequest + Unpin,
122 R::Response: IntoIterator<Item = R::Item>,
123 <R::Item as PaginationItem>::Id: Clone + Unpin,
124{
125 fn is_terminated(&self) -> bool {
126 self.state.is_none()
127 }
128}
129
130impl<'a, C, R> Pager for BackwardPager<'a, C, R>
131where
132 C: Client + ?Sized,
133 R: PaginationRequest + Unpin,
134 R::Response: IntoIterator<Item = R::Item>,
135 <R::Item as PaginationItem>::Id: Clone + Unpin,
136{
137 type Content = R::Item;
138 type Client = C;
139
140 fn set_page_size(mut self: Pin<&mut Self>, size: u8) {
141 match self.state.as_mut() {
142 Some(PagerState::Ready { next_request, .. }) => next_request.set_limit(size),
143 Some(PagerState::Pending { request, .. }) => request.set_limit(size),
144 None => {}
145 }
146 }
147}
148
149pub(crate) struct ForwardPager<'a, C: Client + ?Sized, R: Request> {
150 client: &'a C,
151 state: Option<PagerState<'a, C, R>>,
152}
153
154impl<'a, C: Client + ?Sized, R: PaginationRequest> ForwardPager<'a, C, R> {
155 pub(crate) fn new(client: &'a C, mut request: R) -> Self {
156 request.set_limit(DEFAULT_PAGE_SIZE);
157 ForwardPager {
158 client,
159 state: Some(PagerState::Ready {
160 next_request: request,
161 }),
162 }
163 }
164}
165
166impl<'a, C, R> Stream for ForwardPager<'a, C, R>
167where
168 C: Client + ?Sized,
169 R: PaginationRequest + Unpin,
170 R::Response: IntoIterator<Item = R::Item>,
171{
172 type Item = Result<Vec<R::Item>, Error<C::Error>>;
173
174 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
175 let state = self.state.take();
176 match state {
177 Some(PagerState::Ready { next_request }) => {
178 let request_future = self.client.request(&next_request);
179 self.state = Some(PagerState::Pending {
180 request: next_request,
181 request_future,
182 });
183 self.poll_next(cx)
184 }
185 Some(PagerState::Pending {
186 mut request,
187 mut request_future,
188 }) => {
189 let response = match request_future.poll_unpin(cx) {
190 Poll::Pending => {
191 self.state = Some(PagerState::Pending {
192 request,
193 request_future,
194 });
195 return Poll::Pending;
196 }
197 Poll::Ready(res) => res.map_err(Error::Client)?.into_result()?,
198 };
199 let response: Vec<_> = response.into_iter().collect();
200 if let Some(since) = response.last() {
201 request.set_since_id(since.item_id());
202 self.state = Some(PagerState::Ready {
203 next_request: request,
204 });
205 }
206 Poll::Ready(Some(Ok(response)))
207 }
208 None => Poll::Ready(None),
209 }
210 }
211}
212
213impl<'a, C, R> FusedStream for ForwardPager<'a, C, R>
214where
215 C: Client + ?Sized,
216 R: PaginationRequest + Unpin,
217 R::Response: IntoIterator<Item = R::Item>,
218{
219 fn is_terminated(&self) -> bool {
220 self.state.is_none()
221 }
222}
223
224impl<'a, C, R> Pager for ForwardPager<'a, C, R>
225where
226 C: Client + ?Sized,
227 R: PaginationRequest + Unpin,
228 R::Response: IntoIterator<Item = R::Item>,
229{
230 type Content = R::Item;
231 type Client = C;
232
233 fn set_page_size(mut self: Pin<&mut Self>, size: u8) {
234 match self.state.as_mut() {
235 Some(PagerState::Ready { next_request, .. }) => next_request.set_limit(size),
236 Some(PagerState::Pending { request, .. }) => request.set_limit(size),
237 None => {}
238 }
239 }
240}
241
242pub(crate) struct OffsetPager<'a, C: Client + ?Sized, R: Request> {
243 client: &'a C,
244 state: Option<PagerState<'a, C, R>>,
245 total_count: u64,
246}
247
248impl<'a, C: Client + ?Sized, R: OffsetPaginationRequest> OffsetPager<'a, C, R> {
249 pub(crate) fn new(client: &'a C, mut request: R) -> Self {
250 request.set_limit(DEFAULT_PAGE_SIZE);
251 OffsetPager {
252 client,
253 state: Some(PagerState::Ready {
254 next_request: request,
255 }),
256 total_count: 0,
257 }
258 }
259}
260
261impl<'a, C, R> Stream for OffsetPager<'a, C, R>
262where
263 C: Client + ?Sized,
264 R: OffsetPaginationRequest + Unpin,
265 R::Response: IntoIterator<Item = R::Item>,
266{
267 type Item = Result<Vec<R::Item>, Error<C::Error>>;
268
269 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
270 let state = self.state.take();
271 match state {
272 Some(PagerState::Ready { next_request }) => {
273 let request_future = self.client.request(&next_request);
274 self.state = Some(PagerState::Pending {
275 request: next_request,
276 request_future,
277 });
278 self.poll_next(cx)
279 }
280 Some(PagerState::Pending {
281 mut request,
282 mut request_future,
283 }) => {
284 let response = match request_future.poll_unpin(cx) {
285 Poll::Pending => {
286 self.state = Some(PagerState::Pending {
287 request,
288 request_future,
289 });
290 return Poll::Pending;
291 }
292 Poll::Ready(res) => res.map_err(Error::Client)?.into_result()?,
293 };
294 let response: Vec<_> = response.into_iter().collect();
295 if !response.is_empty() {
296 self.total_count += response.len() as u64;
297 request.set_offset(self.total_count);
298 self.state = Some(PagerState::Ready {
299 next_request: request,
300 });
301 }
302 Poll::Ready(Some(Ok(response)))
303 }
304 None => Poll::Ready(None),
305 }
306 }
307}
308
309impl<'a, C, R> FusedStream for OffsetPager<'a, C, R>
310where
311 C: Client + ?Sized,
312 R: OffsetPaginationRequest + Unpin,
313 R::Response: IntoIterator<Item = R::Item>,
314{
315 fn is_terminated(&self) -> bool {
316 self.state.is_none()
317 }
318}
319
320impl<'a, C, R> Pager for OffsetPager<'a, C, R>
321where
322 C: Client + ?Sized,
323 R: OffsetPaginationRequest + Unpin,
324 R::Response: IntoIterator<Item = R::Item>,
325{
326 type Content = R::Item;
327 type Client = C;
328
329 fn set_page_size(mut self: Pin<&mut Self>, size: u8) {
330 match self.state.as_mut() {
331 Some(PagerState::Ready { next_request, .. }) => next_request.set_limit(size),
332 Some(PagerState::Pending { request, .. }) => request.set_limit(size),
333 None => {}
334 }
335 }
336}
337
338pub trait Pager:
340 Stream<
341 Item = Result<Vec<<Self as Pager>::Content>, Error<<<Self as Pager>::Client as Client>::Error>>,
342>
343{
344 type Content;
346 type Client: Client + ?Sized;
348
349 fn set_page_size(self: Pin<&mut Self>, size: u8);
351}
352
353impl<P: Pager + Unpin + ?Sized> Pager for &mut P {
354 type Content = P::Content;
355 type Client = P::Client;
356
357 fn set_page_size(mut self: Pin<&mut Self>, size: u8) {
358 P::set_page_size(Pin::new(&mut **self), size)
359 }
360}
361
362impl<P: Pager + Unpin + ?Sized> Pager for Box<P> {
363 type Content = P::Content;
364 type Client = P::Client;
365
366 fn set_page_size(mut self: Pin<&mut Self>, size: u8) {
367 P::set_page_size(Pin::new(&mut **self), size)
368 }
369}
370
371impl<P> Pager for Pin<P>
372where
373 P: DerefMut + Unpin,
374 <P as Deref>::Target: Pager,
375{
376 type Content = <<P as Deref>::Target as Pager>::Content;
377 type Client = <<P as Deref>::Target as Pager>::Client;
378
379 fn set_page_size(self: Pin<&mut Self>, size: u8) {
380 <<P as Deref>::Target as Pager>::set_page_size(self.get_mut().as_mut(), size)
381 }
382}
383
384impl<S, F, T> Pager for futures::stream::MapOk<S, F>
385where
386 S: Pager + Unpin,
387 F: FnMut(Vec<<S as Pager>::Content>) -> Vec<T>,
388{
389 type Content = T;
390 type Client = <S as Pager>::Client;
391
392 fn set_page_size(mut self: Pin<&mut Self>, size: u8) {
393 <S as Pager>::set_page_size(Pin::new(&mut *(*self).get_mut()), size)
394 }
395}
396
397pub type BoxPager<'a, C, T> = Pin<
399 Box<
400 dyn Pager<Content = T, Client = C, Item = Result<Vec<T>, Error<<C as Client>::Error>>>
401 + 'a
402 + Send,
403 >,
404>;
405
406enum PagerStreamState<P: Pager> {
407 Ready {
408 item: P::Content,
409 buffer: VecDeque<P::Content>,
410 last_fetch: Instant,
411 },
412 Fetch,
413 Wait(Delay),
414}
415
416pub struct PagerStream<P: Pager> {
418 pager: P,
419 minimum_interval: Duration,
420 state: Option<PagerStreamState<P>>,
421}
422
423impl<P: Pager> PagerStream<P> {
424 pub(crate) fn new(pager: P) -> Self {
425 PagerStream {
426 pager,
427 minimum_interval: Duration::new(0, 0),
428 state: Some(PagerStreamState::Fetch),
429 }
430 }
431
432 pub fn set_page_size(&mut self, size: u8)
434 where
435 P: Unpin,
436 {
437 Pin::new(&mut self.pager).set_page_size(size);
438 }
439
440 pub fn set_interval(&mut self, minimum_interval: Duration) {
445 self.minimum_interval = minimum_interval;
446 }
447
448 pub fn into_inner(self) -> P {
450 self.pager
451 }
452}
453
454impl<P> Stream for PagerStream<P>
455where
456 P: Pager + Unpin,
457 P::Content: Unpin + std::fmt::Debug,
458{
459 type Item = Result<P::Content, Error<<P::Client as Client>::Error>>;
460
461 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
462 let state = self.state.take();
463 match state {
464 Some(PagerStreamState::Ready {
465 item,
466 mut buffer,
467 last_fetch,
468 }) => {
469 if let Some(next) = buffer.pop_front() {
470 self.state = Some(PagerStreamState::Ready {
471 item: next,
472 buffer,
473 last_fetch,
474 });
475 } else {
476 let until = last_fetch + self.minimum_interval;
477 if let Some(duration) = until.checked_duration_since(Instant::now()) {
478 self.state = Some(PagerStreamState::Wait(Delay::new(duration)));
479 } else {
480 self.state = Some(PagerStreamState::Fetch);
481 }
482 }
483 Poll::Ready(Some(Ok(item)))
484 }
485 Some(PagerStreamState::Fetch) => match self.pager.poll_next_unpin(cx) {
486 Poll::Pending => {
487 self.state = Some(PagerStreamState::Fetch);
488 Poll::Pending
489 }
490 Poll::Ready(None) => Poll::Ready(None),
491 Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))),
492 Poll::Ready(Some(Ok(page))) => {
493 let mut buffer: VecDeque<_> = page.into();
494 if let Some(item) = buffer.pop_front() {
495 self.state = Some(PagerStreamState::Ready {
496 item,
497 buffer,
498 last_fetch: Instant::now(),
499 });
500 self.poll_next(cx)
501 } else {
502 Poll::Ready(None)
503 }
504 }
505 },
506 Some(PagerStreamState::Wait(mut delay)) => match delay.poll_unpin(cx) {
507 Poll::Pending => {
508 self.state = Some(PagerStreamState::Wait(delay));
509 Poll::Pending
510 }
511 Poll::Ready(()) => {
512 self.state = Some(PagerStreamState::Fetch);
513 self.poll_next(cx)
514 }
515 },
516 None => Poll::Ready(None),
517 }
518 }
519}
520
521impl<P> FusedStream for PagerStream<P>
522where
523 P: Pager + FusedStream + Unpin,
524 P::Content: Unpin + std::fmt::Debug,
525{
526 fn is_terminated(&self) -> bool {
527 self.state.is_none()
528 }
529}