1use crate::client::QueryState;
16use crate::error::Result;
17use crate::response::QueryResponse;
18use crate::schema::Schema;
19use crate::settings::{QueryResultFormatSettings, ResultFormatSettings};
20use crate::{APIClient, Error, QueryStats, SchemaField};
21use arrow_array::RecordBatch;
22use log::debug;
23use parking_lot::Mutex;
24use std::future::Future;
25use std::mem;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::task::{Context, Poll};
29use std::time::Instant;
30use tokio_stream::{Stream, StreamExt};
31
32#[derive(Default)]
33pub struct Page {
34 pub raw_schema: Vec<SchemaField>,
35 pub data: Vec<Vec<Option<String>>>,
36 pub batches: Vec<RecordBatch>,
37 pub stats: QueryStats,
38 pub settings: Option<QueryResultFormatSettings>,
39}
40
41impl Page {
42 pub fn from_response(response: QueryResponse, batches: Vec<RecordBatch>) -> Self {
43 Self {
44 raw_schema: response.schema,
45 data: response.data,
46 stats: response.stats,
47 batches,
48 settings: response.settings,
49 }
50 }
51
52 pub fn update(&mut self, p: Page) {
53 if self.settings.is_none() {
54 self.settings = p.settings.clone();
55 }
56 self.raw_schema = p.raw_schema;
57 if self.data.is_empty() {
58 self.data = p.data
59 } else {
60 self.data.extend_from_slice(&p.data);
61 }
62 if self.batches.is_empty() {
63 self.batches = p.batches;
64 } else {
65 self.batches.extend_from_slice(&p.batches);
66 }
67 self.stats = p.stats;
68 }
69}
70
71type PageFut = Pin<Box<dyn Future<Output = Result<(QueryResponse, Vec<RecordBatch>)>> + Send>>;
72
73pub struct Pages {
74 query_id: String,
75 client: Arc<APIClient>,
76 first_page: Option<Page>,
77 need_progress: bool,
78
79 next_page_future: Option<PageFut>,
80 node_id: Option<String>,
81 next_uri: Option<String>,
82
83 result_timeout_secs: Option<u64>,
84 last_access_time: Arc<Mutex<Instant>>,
85}
86
87impl Pages {
88 pub fn new(
89 client: Arc<APIClient>,
90 first_response: QueryResponse,
91 record_batches: Vec<RecordBatch>,
92 need_progress: bool,
93 ) -> Result<Self> {
94 let mut s = Self {
95 query_id: first_response.id.clone(),
96 need_progress,
97 client,
98 next_page_future: None,
99 node_id: first_response.node_id.clone(),
100 first_page: None,
101 next_uri: first_response.next_uri.clone(),
102 result_timeout_secs: first_response.result_timeout_secs,
103 last_access_time: Arc::new(Mutex::new(Instant::now())),
104 };
105 let first_page = Page::from_response(first_response, record_batches);
106 s.first_page = Some(first_page);
107 Ok(s)
108 }
109
110 pub fn add_back(&mut self, page: Page) {
111 self.first_page = Some(page);
112 }
113
114 pub async fn wait_for_schema(
115 mut self,
116 need_progress: bool,
117 ) -> Result<(Self, Schema, ResultFormatSettings)> {
118 while let Some(page) = self.next().await {
119 let page = page?;
120 if !page.raw_schema.is_empty()
121 || !page.data.is_empty()
122 || !page.batches.is_empty()
123 || (need_progress && page.stats.progresses.has_progress())
124 {
125 let schema: Schema = if !page.batches.is_empty() {
126 let arrow_schema = page.batches[0].schema().clone();
127 arrow_schema
128 .try_into()
129 .map_err(|e| Error::Decode(format!("fail to decode arrow schema: {e}")))?
130 } else {
131 let s = page.raw_schema.clone();
132 s.try_into()
133 .map_err(|e| Error::Decode(format!("fail to decode string schema: {e}")))?
134 };
135 let settings = ResultFormatSettings::try_from(&page.settings)?;
136
137 self.add_back(page);
138 let last_access_time = self.last_access_time.clone();
139 if let Some(node_id) = &self.node_id {
140 let state = QueryState {
141 node_id: node_id.to_string(),
142 last_access_time,
143 timeout_secs: self.result_timeout_secs.unwrap_or(60),
144 };
145 self.client
146 .register_query_for_heartbeat(&self.query_id, state)
147 }
148 return Ok((self, schema, settings));
149 }
150 }
151 Ok((self, Schema::default(), ResultFormatSettings::default()))
152 }
153}
154
155impl Stream for Pages {
156 type Item = Result<Page>;
157
158 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
159 if let Some(p) = mem::take(&mut self.first_page) {
160 return Poll::Ready(Some(Ok(p)));
161 };
162 match self.next_page_future {
163 Some(ref mut next_page) => match Pin::new(next_page).poll(cx) {
164 Poll::Ready(Ok((resp, batches))) => {
165 self.next_uri = resp.next_uri.clone();
166 self.next_page_future = None;
167 if resp.data.is_empty() && !self.need_progress {
168 self.poll_next(cx)
169 } else {
170 let now = Instant::now();
171 *self.last_access_time.lock() = now;
172 Poll::Ready(Some(Ok(Page::from_response(resp, batches))))
173 }
174 }
175 Poll::Ready(Err(e)) => {
176 self.next_page_future = None;
177 self.next_uri = None;
178 Poll::Ready(Some(Err(e)))
179 }
180 Poll::Pending => Poll::Pending,
181 },
182 None => match self.next_uri {
183 Some(ref next_uri) => {
184 let client = self.client.clone();
185 let next_uri = next_uri.clone();
186 let query_id = self.query_id.clone();
187 let node_id = self.node_id.clone();
188 self.next_page_future = Some(Box::pin(async move {
189 client.query_page(&query_id, &next_uri, &node_id).await
190 }));
191 self.poll_next(cx)
192 }
193 None => Poll::Ready(None),
194 },
195 }
196 }
197}
198
199impl Drop for Pages {
200 fn drop(&mut self) {
201 if let Some(uri) = &self.next_uri {
202 if uri.contains("/page/") || self.next_page_future.is_none() {
203 debug!("Dropping pages for {}", self.query_id);
204 self.client.finalize_query(&self.query_id)
205 }
206 }
207 }
208}