egg_mode_extras/limits/
stream.rs1use egg_mode::{
2 cursor::{Cursor, CursorIter},
3 error::{Error, Result},
4 tweet::{Timeline, Tweet},
5 Response,
6};
7use futures::{future::LocalBoxFuture, FutureExt, TryFutureExt};
8use serde::de::DeserializeOwned;
9
10pub(crate) type ResponseFuture<'a, T> = LocalBoxFuture<'a, Result<Response<T>>>;
11
12pub trait Pageable<'a> {
13 type Item: 'static;
14 type Page;
15
16 fn load(&mut self) -> ResponseFuture<'a, Self::Page>;
17 fn update(&mut self, response: &mut Response<Self::Page>) -> bool;
18 fn extract(page: Self::Page) -> Vec<Self::Item>;
19}
20
21impl<T: Cursor + DeserializeOwned + 'static> Pageable<'static> for CursorIter<T> {
22 type Item = T::Item;
23 type Page = T;
24
25 fn load(&mut self) -> ResponseFuture<'static, Self::Page> {
26 self.call().boxed_local()
27 }
28 fn update(&mut self, response: &mut Response<Self::Page>) -> bool {
29 self.previous_cursor = response.previous_cursor_id();
30 self.next_cursor = response.next_cursor_id();
31 self.next_cursor == 0
32 }
33 fn extract(page: Self::Page) -> Vec<Self::Item> {
34 page.into_inner()
35 }
36}
37
38pub struct TimelineScrollback {
39 timeline: Option<Timeline>,
40 is_started: bool,
41}
42
43impl TimelineScrollback {
44 pub fn new(timeline: Timeline) -> TimelineScrollback {
45 TimelineScrollback {
46 timeline: Some(timeline),
47 is_started: false,
48 }
49 }
50
51 fn lift_response(
52 pair: (Timeline, Response<Vec<Tweet>>),
53 ) -> Response<(Option<Timeline>, Vec<Tweet>)> {
54 let (timeline, response) = pair;
55 Response::map(response, |tweets| (Some(timeline), tweets))
56 }
57}
58
59impl Pageable<'static> for TimelineScrollback {
60 type Item = Tweet;
61 type Page = (Option<Timeline>, Vec<Tweet>);
62
63 fn load(&mut self) -> ResponseFuture<'static, Self::Page> {
64 if let Some(timeline) = self.timeline.take() {
65 if self.is_started {
66 timeline
67 .older(None)
68 .map_ok(TimelineScrollback::lift_response)
69 .boxed_local()
70 } else {
71 self.is_started = true;
72 timeline
73 .start()
74 .map_ok(TimelineScrollback::lift_response)
75 .boxed_local()
76 }
77 } else {
78 futures::future::err(Error::InvalidResponse("Problem retrieving timeline", None))
80 .boxed_local()
81 }
82 }
83 fn update(&mut self, response: &mut Response<Self::Page>) -> bool {
84 self.timeline = response.response.0.take();
85 response.response.1.is_empty()
86 }
87 fn extract(page: Self::Page) -> Vec<Self::Item> {
88 page.1
89 }
90}