jquants_api_client/api/shared/traits/
pagination.rs

1//! Pagination response module.
2//!
3//! See: [Paging of Responses](https://jpx.gitbook.io/j-quants-en/api-reference/attention#paging-of-responses)
4
5use std::fmt;
6use std::future::Future;
7
8use async_stream::try_stream;
9use futures::stream;
10use futures::StreamExt;
11use serde::de::DeserializeOwned;
12
13use crate::JQuantsBuilder;
14use crate::JQuantsError;
15
16/// Trait for types that have a pagination key.
17pub trait HasPaginationKey {
18    /// Get the pagination key.
19    fn get_pagination_key(&self) -> Option<&str>;
20}
21
22/// Trait for types that can merge pages.
23pub trait MergePage: Sized {
24    /// Merge the pages.
25    fn merge_page(
26        page: Result<Vec<Self>, crate::JQuantsError>,
27    ) -> Result<Self, crate::JQuantsError>;
28}
29
30/// Trait for paginatable responses.
31pub trait Paginatable<R: DeserializeOwned + fmt::Debug + HasPaginationKey + MergePage>:
32    JQuantsBuilder<R> + Clone
33{
34    /// Set the pagination key.
35    fn pagination_key(self, pagination_key: impl Into<String>) -> Self;
36
37    /// Fetch the pages stream.
38    fn fetch_pages_stream(self) -> impl stream::Stream<Item = Result<R, JQuantsError>> {
39        let stream = try_stream! {
40            let mut builder = self.clone();
41
42            loop {
43                let response =  builder.send_ref().await?;
44                let next_pagination_key = response.get_pagination_key();
45                if let Some(key) = next_pagination_key {
46                    builder = builder.pagination_key(key.to_string());
47
48                    yield response;
49                    continue;
50                } else {
51                    yield response;
52                    break;
53                }
54            }
55        };
56
57        Box::pin(stream)
58    }
59
60    /// Fetch all pages.
61    fn fetch_all(self) -> impl Future<Output = Result<Vec<R>, JQuantsError>> {
62        async {
63            let results: Vec<Result<R, JQuantsError>> = self.fetch_pages_stream().collect().await;
64            let mut final_results = Vec::new();
65            for result in results {
66                match result {
67                    Ok(value) => final_results.push(value),
68                    Err(e) => return Err(e),
69                }
70            }
71            Ok(final_results)
72        }
73    }
74
75    /// Fetch all pages and merge them.
76    fn fetch_all_and_merge(self) -> impl Future<Output = Result<R, JQuantsError>> {
77        async {
78            let results = self.fetch_all().await;
79            R::merge_page(results)
80        }
81    }
82}