// jquants_api_client/api/shared/traits/pagination.rs

use std::fmt;
use std::future::Future;
use std::pin::pin;

use async_stream::try_stream;
use futures::stream;
use futures::StreamExt;
use serde::de::DeserializeOwned;

use crate::JQuantsBuilder;
use crate::JQuantsError;
15
/// Implemented by response types that may carry a key pointing at a following page.
pub trait HasPaginationKey {
    /// Returns the pagination key carried by this response, if any.
    ///
    /// `Paginatable::fetch_pages_stream` feeds a `Some` key into the next request and
    /// stops requesting further pages when this returns `None`.
    fn get_pagination_key(&self) -> Option<&str>;
}
21
22pub trait MergePage: Sized {
24 fn merge_page(
26 page: Result<Vec<Self>, crate::JQuantsError>,
27 ) -> Result<Self, crate::JQuantsError>;
28}
29
30pub trait Paginatable<R: DeserializeOwned + fmt::Debug + HasPaginationKey + MergePage>:
32 JQuantsBuilder<R> + Clone
33{
34 fn pagination_key(self, pagination_key: impl Into<String>) -> Self;
36
37 fn fetch_pages_stream(self) -> impl stream::Stream<Item = Result<R, JQuantsError>> {
39 let stream = try_stream! {
40 let mut builder = self.clone();
41
42 loop {
43 let response = builder.send_ref().await?;
44 let next_pagination_key = response.get_pagination_key();
45 if let Some(key) = next_pagination_key {
46 builder = builder.pagination_key(key.to_string());
47
48 yield response;
49 continue;
50 } else {
51 yield response;
52 break;
53 }
54 }
55 };
56
57 Box::pin(stream)
58 }
59
60 fn fetch_all(self) -> impl Future<Output = Result<Vec<R>, JQuantsError>> {
62 async {
63 let results: Vec<Result<R, JQuantsError>> = self.fetch_pages_stream().collect().await;
64 let mut final_results = Vec::new();
65 for result in results {
66 match result {
67 Ok(value) => final_results.push(value),
68 Err(e) => return Err(e),
69 }
70 }
71 Ok(final_results)
72 }
73 }
74
75 fn fetch_all_and_merge(self) -> impl Future<Output = Result<R, JQuantsError>> {
77 async {
78 let results = self.fetch_all().await;
79 R::merge_page(results)
80 }
81 }
82}