databend_client/
pages.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::client::QueryState;
16use crate::error::Result;
17use crate::response::QueryResponse;
18use crate::schema::Schema;
19use crate::settings::ResultFormatSettings;
20use crate::{APIClient, Error, QueryStats, SchemaField};
21use arrow_array::RecordBatch;
22use log::debug;
23use parking_lot::Mutex;
24use std::collections::BTreeMap;
25use std::future::Future;
26use std::mem;
27use std::pin::Pin;
28use std::sync::Arc;
29use std::task::{Context, Poll};
30use std::time::Instant;
31use tokio_stream::{Stream, StreamExt};
32
/// One page of query results.
///
/// A page is assembled by [`Page::from_response`] from a `QueryResponse` plus
/// the record batches passed alongside it, and several pages can be folded
/// into one with [`Page::update`].
#[derive(Default)]
pub struct Page {
    /// Schema fields copied from the response's `schema`.
    pub raw_schema: Vec<SchemaField>,
    /// Rows copied from the response's `data`; each cell is an optional string.
    pub data: Vec<Vec<Option<String>>>,
    /// Record batches supplied together with the response.
    pub batches: Vec<RecordBatch>,
    /// Statistics copied from the response's `stats`.
    pub stats: QueryStats,
    /// Settings map copied from the response's `settings`, if it carried one.
    pub settings: Option<BTreeMap<String, String>>,
}
41
42impl Page {
43    pub fn from_response(response: QueryResponse, batches: Vec<RecordBatch>) -> Self {
44        Self {
45            raw_schema: response.schema,
46            data: response.data,
47            stats: response.stats,
48            batches,
49            settings: response.settings,
50        }
51    }
52
53    pub fn update(&mut self, p: Page) {
54        self.raw_schema = p.raw_schema;
55        if self.data.is_empty() {
56            self.data = p.data
57        } else {
58            self.data.extend_from_slice(&p.data);
59        }
60        if self.batches.is_empty() {
61            self.batches = p.batches;
62        } else {
63            self.batches.extend_from_slice(&p.batches);
64        }
65        self.stats = p.stats;
66    }
67}
68
/// Boxed, sendable future for one in-flight page request; it resolves to the
/// response together with its record batches, or to an error.
type PageFut = Pin<Box<dyn Future<Output = Result<(QueryResponse, Vec<RecordBatch>)>> + Send>>;
70
/// Stream over the result pages of a single query.
///
/// The first page is supplied up front; further pages are requested from the
/// client one at a time for as long as the latest response names a next URI.
pub struct Pages {
    /// Id of the query, taken from the first response.
    query_id: String,
    /// Client used to fetch pages, register the heartbeat and finalize the query.
    client: Arc<APIClient>,
    /// Page that is yielded before anything is fetched (the initial page, or
    /// one pushed back with `add_back`).
    first_page: Option<Page>,
    /// When true, every fetched page is yielded, even one that carries no rows.
    need_progress: bool,

    /// Request for the next page that is currently in flight, if any.
    next_page_future: Option<PageFut>,
    /// Node id from the first response; passed along with each page request.
    node_id: Option<String>,
    /// URI of the next page; `None` once the result is exhausted or a request failed.
    next_uri: Option<String>,

    /// Result timeout reported by the first response, if any.
    result_timeout_secs: Option<u64>,
    /// Timestamp shared with the heartbeat state; refreshed when a fetched page is yielded.
    last_access_time: Arc<Mutex<Instant>>,
}
84
85impl Pages {
86    pub fn new(
87        client: Arc<APIClient>,
88        first_response: QueryResponse,
89        record_batches: Vec<RecordBatch>,
90        need_progress: bool,
91    ) -> Result<Self> {
92        let mut s = Self {
93            query_id: first_response.id.clone(),
94            need_progress,
95            client,
96            next_page_future: None,
97            node_id: first_response.node_id.clone(),
98            first_page: None,
99            next_uri: first_response.next_uri.clone(),
100            result_timeout_secs: first_response.result_timeout_secs,
101            last_access_time: Arc::new(Mutex::new(Instant::now())),
102        };
103        let first_page = Page::from_response(first_response, record_batches);
104        s.first_page = Some(first_page);
105        Ok(s)
106    }
107
108    pub fn add_back(&mut self, page: Page) {
109        self.first_page = Some(page);
110    }
111
112    pub async fn wait_for_schema(
113        mut self,
114        need_progress: bool,
115    ) -> Result<(Self, Schema, ResultFormatSettings)> {
116        while let Some(page) = self.next().await {
117            let page = page?;
118            if !page.raw_schema.is_empty()
119                || !page.data.is_empty()
120                || !page.batches.is_empty()
121                || (need_progress && page.stats.progresses.has_progress())
122            {
123                let schema: Schema = if !page.batches.is_empty() {
124                    let arrow_schema = page.batches[0].schema().clone();
125                    arrow_schema
126                        .try_into()
127                        .map_err(|e| Error::Decode(format!("fail to decode arrow schema: {e}")))?
128                } else {
129                    let s = page.raw_schema.clone();
130                    s.try_into()
131                        .map_err(|e| Error::Decode(format!("fail to decode string schema: {e}")))?
132                };
133                let settings = ResultFormatSettings::from_map(&page.settings)?;
134
135                self.add_back(page);
136                let last_access_time = self.last_access_time.clone();
137                if let Some(node_id) = &self.node_id {
138                    let state = QueryState {
139                        node_id: node_id.to_string(),
140                        last_access_time,
141                        timeout_secs: self.result_timeout_secs.unwrap_or(60),
142                    };
143                    self.client
144                        .register_query_for_heartbeat(&self.query_id, state)
145                }
146                return Ok((self, schema, settings));
147            }
148        }
149        Ok((self, Schema::default(), ResultFormatSettings::default()))
150    }
151}
152
153impl Stream for Pages {
154    type Item = Result<Page>;
155
156    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
157        if let Some(p) = mem::take(&mut self.first_page) {
158            return Poll::Ready(Some(Ok(p)));
159        };
160        match self.next_page_future {
161            Some(ref mut next_page) => match Pin::new(next_page).poll(cx) {
162                Poll::Ready(Ok((resp, batches))) => {
163                    self.next_uri = resp.next_uri.clone();
164                    self.next_page_future = None;
165                    if resp.data.is_empty() && !self.need_progress {
166                        self.poll_next(cx)
167                    } else {
168                        let now = Instant::now();
169                        *self.last_access_time.lock() = now;
170                        Poll::Ready(Some(Ok(Page::from_response(resp, batches))))
171                    }
172                }
173                Poll::Ready(Err(e)) => {
174                    self.next_page_future = None;
175                    self.next_uri = None;
176                    Poll::Ready(Some(Err(e)))
177                }
178                Poll::Pending => Poll::Pending,
179            },
180            None => match self.next_uri {
181                Some(ref next_uri) => {
182                    let client = self.client.clone();
183                    let next_uri = next_uri.clone();
184                    let query_id = self.query_id.clone();
185                    let node_id = self.node_id.clone();
186                    self.next_page_future = Some(Box::pin(async move {
187                        client.query_page(&query_id, &next_uri, &node_id).await
188                    }));
189                    self.poll_next(cx)
190                }
191                None => Poll::Ready(None),
192            },
193        }
194    }
195}
196
197impl Drop for Pages {
198    fn drop(&mut self) {
199        if let Some(uri) = &self.next_uri {
200            if uri.contains("/page/") || self.next_page_future.is_none() {
201                debug!("Dropping pages for {}", self.query_id);
202                self.client.finalize_query(&self.query_id)
203            }
204        }
205    }
206}