databend_client/
pages.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::client::QueryState;
16use crate::error::Result;
17use crate::response::QueryResponse;
18use crate::{APIClient, Error, QueryStats, SchemaField};
19use arrow_array::RecordBatch;
20use chrono_tz::Tz;
21use log::debug;
22use parking_lot::Mutex;
23use std::collections::BTreeMap;
24use std::future::Future;
25use std::mem;
26use std::pin::Pin;
27use std::str::FromStr;
28use std::sync::Arc;
29use std::task::{Context, Poll};
30use std::time::Instant;
31use tokio_stream::{Stream, StreamExt};
32
33#[derive(Default)]
34pub struct Page {
35    pub raw_schema: Vec<SchemaField>,
36    pub data: Vec<Vec<Option<String>>>,
37    pub batches: Vec<RecordBatch>,
38    pub stats: QueryStats,
39    pub settings: Option<BTreeMap<String, String>>,
40}
41
42impl Page {
43    pub fn from_response(response: QueryResponse, batches: Vec<RecordBatch>) -> Self {
44        Self {
45            raw_schema: response.schema,
46            data: response.data,
47            stats: response.stats,
48            batches,
49            settings: response.settings,
50        }
51    }
52
53    pub fn update(&mut self, p: Page) {
54        self.raw_schema = p.raw_schema;
55        if self.data.is_empty() {
56            self.data = p.data
57        } else {
58            self.data.extend_from_slice(&p.data);
59        }
60        self.stats = p.stats;
61    }
62}
63
64type PageFut = Pin<Box<dyn Future<Output = Result<(QueryResponse, Vec<RecordBatch>)>> + Send>>;
65
66pub struct Pages {
67    query_id: String,
68    client: Arc<APIClient>,
69    first_page: Option<Page>,
70    need_progress: bool,
71
72    next_page_future: Option<PageFut>,
73    node_id: Option<String>,
74    next_uri: Option<String>,
75
76    result_timeout_secs: Option<u64>,
77    last_access_time: Arc<Mutex<Instant>>,
78}
79
80impl Pages {
81    pub fn new(
82        client: Arc<APIClient>,
83        first_response: QueryResponse,
84        record_batches: Vec<RecordBatch>,
85        need_progress: bool,
86    ) -> Result<Self> {
87        let mut s = Self {
88            query_id: first_response.id.clone(),
89            need_progress,
90            client,
91            next_page_future: None,
92            node_id: first_response.node_id.clone(),
93            first_page: None,
94            next_uri: first_response.next_uri.clone(),
95            result_timeout_secs: first_response.result_timeout_secs,
96            last_access_time: Arc::new(Mutex::new(Instant::now())),
97        };
98        let first_page = Page::from_response(first_response, record_batches);
99        s.first_page = Some(first_page);
100        Ok(s)
101    }
102
103    pub fn add_back(&mut self, page: Page) {
104        self.first_page = Some(page);
105    }
106
107    pub async fn wait_for_schema(
108        mut self,
109        need_progress: bool,
110    ) -> Result<(Self, Vec<SchemaField>, Tz)> {
111        while let Some(page) = self.next().await {
112            let page = page?;
113            if !page.raw_schema.is_empty()
114                || !page.data.is_empty()
115                || !page.batches.is_empty()
116                || (need_progress && page.stats.progresses.has_progress())
117            {
118                let schema = page.raw_schema.clone();
119                let utc = "UTC".to_owned();
120                let timezone = page
121                    .settings
122                    .as_ref()
123                    .and_then(|m| m.get("timezone"))
124                    .unwrap_or(&utc);
125                let timezone = Tz::from_str(timezone).map_err(|e| Error::Decode(e.to_string()))?;
126                self.add_back(page);
127                let last_access_time = self.last_access_time.clone();
128                if let Some(node_id) = &self.node_id {
129                    let state = QueryState {
130                        node_id: node_id.to_string(),
131                        last_access_time,
132                        timeout_secs: self.result_timeout_secs.unwrap_or(60),
133                    };
134                    self.client
135                        .register_query_for_heartbeat(&self.query_id, state)
136                }
137                return Ok((self, schema, timezone));
138            }
139        }
140        Ok((self, vec![], Tz::UTC))
141    }
142}
143
144impl Stream for Pages {
145    type Item = Result<Page>;
146
147    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
148        if let Some(p) = mem::take(&mut self.first_page) {
149            return Poll::Ready(Some(Ok(p)));
150        };
151        match self.next_page_future {
152            Some(ref mut next_page) => match Pin::new(next_page).poll(cx) {
153                Poll::Ready(Ok((resp, batches))) => {
154                    self.next_uri = resp.next_uri.clone();
155                    self.next_page_future = None;
156                    if resp.data.is_empty() && !self.need_progress {
157                        self.poll_next(cx)
158                    } else {
159                        let now = Instant::now();
160                        *self.last_access_time.lock() = now;
161                        Poll::Ready(Some(Ok(Page::from_response(resp, batches))))
162                    }
163                }
164                Poll::Ready(Err(e)) => {
165                    self.next_page_future = None;
166                    self.next_uri = None;
167                    Poll::Ready(Some(Err(e)))
168                }
169                Poll::Pending => Poll::Pending,
170            },
171            None => match self.next_uri {
172                Some(ref next_uri) => {
173                    let client = self.client.clone();
174                    let next_uri = next_uri.clone();
175                    let query_id = self.query_id.clone();
176                    let node_id = self.node_id.clone();
177                    self.next_page_future = Some(Box::pin(async move {
178                        client.query_page(&query_id, &next_uri, &node_id).await
179                    }));
180                    self.poll_next(cx)
181                }
182                None => Poll::Ready(None),
183            },
184        }
185    }
186}
187
188impl Drop for Pages {
189    fn drop(&mut self) {
190        if let Some(uri) = &self.next_uri {
191            if uri.contains("/page/") || self.next_page_future.is_none() {
192                debug!("Dropping pages for {}", self.query_id);
193                self.client.finalize_query(&self.query_id)
194            }
195        }
196    }
197}