1use crate::error::Result;
16use crate::response::QueryResponse;
17use crate::{APIClient, QueryStats, SchemaField};
18use std::future::Future;
19use std::mem;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use tokio_stream::{Stream, StreamExt};
24
25#[derive(Default)]
26pub struct Page {
27 pub schema: Vec<SchemaField>,
28 pub data: Vec<Vec<Option<String>>>,
29 pub stats: QueryStats,
30}
31
32impl Page {
33 pub fn from_response(response: QueryResponse) -> Self {
34 Self {
35 schema: response.schema,
36 data: response.data,
37 stats: response.stats,
38 }
39 }
40
41 pub fn update(&mut self, p: Page) {
42 self.schema = p.schema;
43 if self.data.is_empty() {
44 self.data = p.data
45 } else {
46 self.data.extend_from_slice(&p.data);
47 }
48 self.stats = p.stats;
49 }
50}
51
52type PageFut = Pin<Box<dyn Future<Output = Result<QueryResponse>> + Send>>;
53
54pub struct Pages {
55 query_id: String,
56 client: Arc<APIClient>,
57 first_page: Option<Page>,
58 need_progress: bool,
59
60 next_page_future: Option<PageFut>,
61 node_id: Option<String>,
62 next_uri: Option<String>,
63}
64
65impl Pages {
66 pub fn new(client: Arc<APIClient>, first_response: QueryResponse, need_progress: bool) -> Self {
67 let mut s = Self {
68 query_id: first_response.id.clone(),
69 need_progress,
70 client,
71 next_page_future: None,
72 node_id: first_response.node_id.clone(),
73 first_page: None,
74 next_uri: first_response.next_uri.clone(),
75 };
76 let first_page = Page::from_response(first_response);
77 s.first_page = Some(first_page);
78 s
79 }
80
81 pub fn add_back(&mut self, page: Page) {
82 self.first_page = Some(page);
83 }
84
85 pub async fn wait_for_schema(
86 mut self,
87 need_progress: bool,
88 ) -> Result<(Self, Vec<SchemaField>)> {
89 while let Some(page) = self.next().await {
90 let page = page?;
91 if !page.schema.is_empty()
92 || !page.data.is_empty()
93 || (need_progress && page.stats.progresses.has_progress())
94 {
95 let schema = page.schema.clone();
96 self.add_back(page);
97 return Ok((self, schema));
98 }
99 }
100 Ok((self, vec![]))
101 }
102}
103
104impl Stream for Pages {
105 type Item = Result<Page>;
106
107 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
108 if let Some(p) = mem::take(&mut self.first_page) {
109 return Poll::Ready(Some(Ok(p)));
110 };
111 match self.next_page_future {
112 Some(ref mut next_page) => match Pin::new(next_page).poll(cx) {
113 Poll::Ready(Ok(resp)) => {
114 self.next_uri = resp.next_uri.clone();
115 self.next_page_future = None;
116 if resp.data.is_empty() && !self.need_progress {
117 self.poll_next(cx)
118 } else {
119 Poll::Ready(Some(Ok(Page::from_response(resp))))
120 }
121 }
122 Poll::Ready(Err(e)) => {
123 self.next_page_future = None;
124 self.next_uri = None;
125 Poll::Ready(Some(Err(e)))
126 }
127 Poll::Pending => Poll::Pending,
128 },
129 None => match self.next_uri {
130 Some(ref next_uri) => {
131 let client = self.client.clone();
132 let next_uri = next_uri.clone();
133 let query_id = self.query_id.clone();
134 let node_id = self.node_id.clone();
135 self.next_page_future = Some(Box::pin(async move {
136 client.query_page(&query_id, &next_uri, &node_id).await
137 }));
138 self.poll_next(cx)
139 }
140 None => Poll::Ready(None),
141 },
142 }
143 }
144}