1use crate::client::QueryState;
16use crate::error::Result;
17use crate::response::QueryResponse;
18use crate::schema::Schema;
19use crate::settings::ResultFormatSettings;
20use crate::{APIClient, Error, QueryStats, SchemaField};
21use arrow_array::RecordBatch;
22use log::debug;
23use parking_lot::Mutex;
24use std::collections::BTreeMap;
25use std::future::Future;
26use std::mem;
27use std::pin::Pin;
28use std::sync::Arc;
29use std::task::{Context, Poll};
30use std::time::Instant;
31use tokio_stream::{Stream, StreamExt};
32
33#[derive(Default)]
34pub struct Page {
35 pub raw_schema: Vec<SchemaField>,
36 pub data: Vec<Vec<Option<String>>>,
37 pub batches: Vec<RecordBatch>,
38 pub stats: QueryStats,
39 pub settings: Option<BTreeMap<String, String>>,
40}
41
42impl Page {
43 pub fn from_response(response: QueryResponse, batches: Vec<RecordBatch>) -> Self {
44 Self {
45 raw_schema: response.schema,
46 data: response.data,
47 stats: response.stats,
48 batches,
49 settings: response.settings,
50 }
51 }
52
53 pub fn update(&mut self, p: Page) {
54 self.raw_schema = p.raw_schema;
55 if self.data.is_empty() {
56 self.data = p.data
57 } else {
58 self.data.extend_from_slice(&p.data);
59 }
60 if self.batches.is_empty() {
61 self.batches = p.batches;
62 } else {
63 self.batches.extend_from_slice(&p.batches);
64 }
65 self.stats = p.stats;
66 }
67}
68
69type PageFut = Pin<Box<dyn Future<Output = Result<(QueryResponse, Vec<RecordBatch>)>> + Send>>;
70
71pub struct Pages {
72 query_id: String,
73 client: Arc<APIClient>,
74 first_page: Option<Page>,
75 need_progress: bool,
76
77 next_page_future: Option<PageFut>,
78 node_id: Option<String>,
79 next_uri: Option<String>,
80
81 result_timeout_secs: Option<u64>,
82 last_access_time: Arc<Mutex<Instant>>,
83}
84
85impl Pages {
86 pub fn new(
87 client: Arc<APIClient>,
88 first_response: QueryResponse,
89 record_batches: Vec<RecordBatch>,
90 need_progress: bool,
91 ) -> Result<Self> {
92 let mut s = Self {
93 query_id: first_response.id.clone(),
94 need_progress,
95 client,
96 next_page_future: None,
97 node_id: first_response.node_id.clone(),
98 first_page: None,
99 next_uri: first_response.next_uri.clone(),
100 result_timeout_secs: first_response.result_timeout_secs,
101 last_access_time: Arc::new(Mutex::new(Instant::now())),
102 };
103 let first_page = Page::from_response(first_response, record_batches);
104 s.first_page = Some(first_page);
105 Ok(s)
106 }
107
108 pub fn add_back(&mut self, page: Page) {
109 self.first_page = Some(page);
110 }
111
112 pub async fn wait_for_schema(
113 mut self,
114 need_progress: bool,
115 ) -> Result<(Self, Schema, ResultFormatSettings)> {
116 while let Some(page) = self.next().await {
117 let page = page?;
118 if !page.raw_schema.is_empty()
119 || !page.data.is_empty()
120 || !page.batches.is_empty()
121 || (need_progress && page.stats.progresses.has_progress())
122 {
123 let schema: Schema = if !page.batches.is_empty() {
124 let arrow_schema = page.batches[0].schema().clone();
125 arrow_schema
126 .try_into()
127 .map_err(|e| Error::Decode(format!("fail to decode arrow schema: {e}")))?
128 } else {
129 let s = page.raw_schema.clone();
130 s.try_into()
131 .map_err(|e| Error::Decode(format!("fail to decode string schema: {e}")))?
132 };
133 let settings = ResultFormatSettings::from_map(&page.settings)?;
134
135 self.add_back(page);
136 let last_access_time = self.last_access_time.clone();
137 if let Some(node_id) = &self.node_id {
138 let state = QueryState {
139 node_id: node_id.to_string(),
140 last_access_time,
141 timeout_secs: self.result_timeout_secs.unwrap_or(60),
142 };
143 self.client
144 .register_query_for_heartbeat(&self.query_id, state)
145 }
146 return Ok((self, schema, settings));
147 }
148 }
149 Ok((self, Schema::default(), ResultFormatSettings::default()))
150 }
151}
152
153impl Stream for Pages {
154 type Item = Result<Page>;
155
156 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
157 if let Some(p) = mem::take(&mut self.first_page) {
158 return Poll::Ready(Some(Ok(p)));
159 };
160 match self.next_page_future {
161 Some(ref mut next_page) => match Pin::new(next_page).poll(cx) {
162 Poll::Ready(Ok((resp, batches))) => {
163 self.next_uri = resp.next_uri.clone();
164 self.next_page_future = None;
165 if resp.data.is_empty() && !self.need_progress {
166 self.poll_next(cx)
167 } else {
168 let now = Instant::now();
169 *self.last_access_time.lock() = now;
170 Poll::Ready(Some(Ok(Page::from_response(resp, batches))))
171 }
172 }
173 Poll::Ready(Err(e)) => {
174 self.next_page_future = None;
175 self.next_uri = None;
176 Poll::Ready(Some(Err(e)))
177 }
178 Poll::Pending => Poll::Pending,
179 },
180 None => match self.next_uri {
181 Some(ref next_uri) => {
182 let client = self.client.clone();
183 let next_uri = next_uri.clone();
184 let query_id = self.query_id.clone();
185 let node_id = self.node_id.clone();
186 self.next_page_future = Some(Box::pin(async move {
187 client.query_page(&query_id, &next_uri, &node_id).await
188 }));
189 self.poll_next(cx)
190 }
191 None => Poll::Ready(None),
192 },
193 }
194 }
195}
196
197impl Drop for Pages {
198 fn drop(&mut self) {
199 if let Some(uri) = &self.next_uri {
200 if uri.contains("/page/") || self.next_page_future.is_none() {
201 debug!("Dropping pages for {}", self.query_id);
202 self.client.finalize_query(&self.query_id)
203 }
204 }
205 }
206}