slack_morphism_hyper/
scroller_ext.rs

1use futures::future::BoxFuture;
2use futures::stream::BoxStream;
3use futures::TryStreamExt;
4use slack_morphism::{
5    ClientResult, SlackApiResponseScroller, SlackApiScrollableResponse, SlackClientHttpConnector,
6    SlackClientSession,
7};
8use std::time::Duration;
9use tokio_stream::StreamExt;
10
11pub trait SlackApiResponseScrollerExt<SCHC, CT, RT, RIT>:
12    SlackApiResponseScroller<SCHC, CursorType = CT, ResponseType = RT, ResponseItemType = RIT>
13where
14    SCHC: SlackClientHttpConnector + Send + Sync,
15    RT: Send + Clone + Sync + SlackApiScrollableResponse<CursorType = CT, ResponseItemType = RIT>,
16    RIT: Send + Clone,
17{
18    fn collect_items_stream<'a, 's>(
19        &'a self,
20        session: &'a SlackClientSession<'s, SCHC>,
21        throttle_duration: Duration,
22    ) -> BoxFuture<'a, ClientResult<Vec<RIT>>>;
23
24    fn to_items_throttled_stream<'a, 's>(
25        &'a self,
26        session: &'a SlackClientSession<'s, SCHC>,
27        throttle_duration: Duration,
28    ) -> BoxStream<'a, ClientResult<Vec<Self::ResponseItemType>>>;
29}
30
31impl<SCHC, CT, RT, RIT> SlackApiResponseScrollerExt<SCHC, CT, RT, RIT>
32    for dyn SlackApiResponseScroller<SCHC, CursorType = CT, ResponseType = RT, ResponseItemType = RIT>
33        + Send
34        + Sync
35where
36    SCHC: SlackClientHttpConnector + Send + Sync,
37    RT: Send + Clone + Sync + SlackApiScrollableResponse<CursorType = CT, ResponseItemType = RIT>,
38    RIT: Send + Clone,
39{
40    fn collect_items_stream<'a, 's>(
41        &'a self,
42        session: &'a SlackClientSession<'s, SCHC>,
43        throttle_duration: Duration,
44    ) -> BoxFuture<'a, ClientResult<Vec<RIT>>> {
45        Box::pin(
46            self.to_items_throttled_stream(session, throttle_duration)
47                .try_concat(),
48        )
49    }
50
51    fn to_items_throttled_stream<'a, 's>(
52        &'a self,
53        session: &'a SlackClientSession<'s, SCHC>,
54        throttle_duration: Duration,
55    ) -> BoxStream<'a, ClientResult<Vec<Self::ResponseItemType>>> {
56        Box::pin(self.to_items_stream(session).throttle(throttle_duration))
57    }
58}