1use crate::client::QueryState;
16use crate::error::Result;
17use crate::response::QueryResponse;
18use crate::{APIClient, Error, QueryStats, SchemaField};
19use arrow_array::RecordBatch;
20use chrono_tz::Tz;
21use log::debug;
22use parking_lot::Mutex;
23use std::collections::BTreeMap;
24use std::future::Future;
25use std::mem;
26use std::pin::Pin;
27use std::str::FromStr;
28use std::sync::Arc;
29use std::task::{Context, Poll};
30use std::time::Instant;
31use tokio_stream::{Stream, StreamExt};
32
33#[derive(Default)]
34pub struct Page {
35 pub raw_schema: Vec<SchemaField>,
36 pub data: Vec<Vec<Option<String>>>,
37 pub batches: Vec<RecordBatch>,
38 pub stats: QueryStats,
39 pub settings: Option<BTreeMap<String, String>>,
40}
41
42impl Page {
43 pub fn from_response(response: QueryResponse, batches: Vec<RecordBatch>) -> Self {
44 Self {
45 raw_schema: response.schema,
46 data: response.data,
47 stats: response.stats,
48 batches,
49 settings: response.settings,
50 }
51 }
52
53 pub fn update(&mut self, p: Page) {
54 self.raw_schema = p.raw_schema;
55 if self.data.is_empty() {
56 self.data = p.data
57 } else {
58 self.data.extend_from_slice(&p.data);
59 }
60 self.stats = p.stats;
61 }
62}
63
64type PageFut = Pin<Box<dyn Future<Output = Result<(QueryResponse, Vec<RecordBatch>)>> + Send>>;
65
66pub struct Pages {
67 query_id: String,
68 client: Arc<APIClient>,
69 first_page: Option<Page>,
70 need_progress: bool,
71
72 next_page_future: Option<PageFut>,
73 node_id: Option<String>,
74 next_uri: Option<String>,
75
76 result_timeout_secs: Option<u64>,
77 last_access_time: Arc<Mutex<Instant>>,
78}
79
80impl Pages {
81 pub fn new(
82 client: Arc<APIClient>,
83 first_response: QueryResponse,
84 record_batches: Vec<RecordBatch>,
85 need_progress: bool,
86 ) -> Result<Self> {
87 let mut s = Self {
88 query_id: first_response.id.clone(),
89 need_progress,
90 client,
91 next_page_future: None,
92 node_id: first_response.node_id.clone(),
93 first_page: None,
94 next_uri: first_response.next_uri.clone(),
95 result_timeout_secs: first_response.result_timeout_secs,
96 last_access_time: Arc::new(Mutex::new(Instant::now())),
97 };
98 let first_page = Page::from_response(first_response, record_batches);
99 s.first_page = Some(first_page);
100 Ok(s)
101 }
102
103 pub fn add_back(&mut self, page: Page) {
104 self.first_page = Some(page);
105 }
106
107 pub async fn wait_for_schema(
108 mut self,
109 need_progress: bool,
110 ) -> Result<(Self, Vec<SchemaField>, Tz)> {
111 while let Some(page) = self.next().await {
112 let page = page?;
113 if !page.raw_schema.is_empty()
114 || !page.data.is_empty()
115 || !page.batches.is_empty()
116 || (need_progress && page.stats.progresses.has_progress())
117 {
118 let schema = page.raw_schema.clone();
119 let utc = "UTC".to_owned();
120 let timezone = page
121 .settings
122 .as_ref()
123 .and_then(|m| m.get("timezone"))
124 .unwrap_or(&utc);
125 let timezone = Tz::from_str(timezone).map_err(|e| Error::Decode(e.to_string()))?;
126 self.add_back(page);
127 let last_access_time = self.last_access_time.clone();
128 if let Some(node_id) = &self.node_id {
129 let state = QueryState {
130 node_id: node_id.to_string(),
131 last_access_time,
132 timeout_secs: self.result_timeout_secs.unwrap_or(60),
133 };
134 self.client
135 .register_query_for_heartbeat(&self.query_id, state)
136 }
137 return Ok((self, schema, timezone));
138 }
139 }
140 Ok((self, vec![], Tz::UTC))
141 }
142}
143
144impl Stream for Pages {
145 type Item = Result<Page>;
146
147 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
148 if let Some(p) = mem::take(&mut self.first_page) {
149 return Poll::Ready(Some(Ok(p)));
150 };
151 match self.next_page_future {
152 Some(ref mut next_page) => match Pin::new(next_page).poll(cx) {
153 Poll::Ready(Ok((resp, batches))) => {
154 self.next_uri = resp.next_uri.clone();
155 self.next_page_future = None;
156 if resp.data.is_empty() && !self.need_progress {
157 self.poll_next(cx)
158 } else {
159 let now = Instant::now();
160 *self.last_access_time.lock() = now;
161 Poll::Ready(Some(Ok(Page::from_response(resp, batches))))
162 }
163 }
164 Poll::Ready(Err(e)) => {
165 self.next_page_future = None;
166 self.next_uri = None;
167 Poll::Ready(Some(Err(e)))
168 }
169 Poll::Pending => Poll::Pending,
170 },
171 None => match self.next_uri {
172 Some(ref next_uri) => {
173 let client = self.client.clone();
174 let next_uri = next_uri.clone();
175 let query_id = self.query_id.clone();
176 let node_id = self.node_id.clone();
177 self.next_page_future = Some(Box::pin(async move {
178 client.query_page(&query_id, &next_uri, &node_id).await
179 }));
180 self.poll_next(cx)
181 }
182 None => Poll::Ready(None),
183 },
184 }
185 }
186}
187
188impl Drop for Pages {
189 fn drop(&mut self) {
190 if let Some(uri) = &self.next_uri {
191 if uri.contains("/page/") || self.next_page_future.is_none() {
192 debug!("Dropping pages for {}", self.query_id);
193 self.client.finalize_query(&self.query_id)
194 }
195 }
196 }
197}