Skip to main content

databend_client/
pages.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::client::QueryState;
16use crate::error::Result;
17use crate::response::QueryResponse;
18use crate::schema::Schema;
19use crate::settings::{QueryResultFormatSettings, ResultFormatSettings};
20use crate::{APIClient, Error, QueryStats, SchemaField};
21use arrow_array::RecordBatch;
22use log::debug;
23use parking_lot::Mutex;
24use std::future::Future;
25use std::mem;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::task::{Context, Poll};
29use std::time::Instant;
30use tokio_stream::{Stream, StreamExt};
31
32#[derive(Default)]
33pub struct Page {
34    pub raw_schema: Vec<SchemaField>,
35    pub data: Vec<Vec<Option<String>>>,
36    pub batches: Vec<RecordBatch>,
37    pub stats: QueryStats,
38    pub settings: Option<QueryResultFormatSettings>,
39}
40
41impl Page {
42    pub fn from_response(response: QueryResponse, batches: Vec<RecordBatch>) -> Self {
43        Self {
44            raw_schema: response.schema,
45            data: response.data,
46            stats: response.stats,
47            batches,
48            settings: response.settings,
49        }
50    }
51
52    pub fn update(&mut self, p: Page) {
53        if self.settings.is_none() {
54            self.settings = p.settings.clone();
55        }
56        self.raw_schema = p.raw_schema;
57        if self.data.is_empty() {
58            self.data = p.data
59        } else {
60            self.data.extend_from_slice(&p.data);
61        }
62        if self.batches.is_empty() {
63            self.batches = p.batches;
64        } else {
65            self.batches.extend_from_slice(&p.batches);
66        }
67        self.stats = p.stats;
68    }
69}
70
71type PageFut = Pin<Box<dyn Future<Output = Result<(QueryResponse, Vec<RecordBatch>)>> + Send>>;
72
73pub struct Pages {
74    query_id: String,
75    client: Arc<APIClient>,
76    first_page: Option<Page>,
77    need_progress: bool,
78
79    next_page_future: Option<PageFut>,
80    node_id: Option<String>,
81    next_uri: Option<String>,
82
83    result_timeout_secs: Option<u64>,
84    last_access_time: Arc<Mutex<Instant>>,
85}
86
87impl Pages {
88    pub fn new(
89        client: Arc<APIClient>,
90        first_response: QueryResponse,
91        record_batches: Vec<RecordBatch>,
92        need_progress: bool,
93    ) -> Result<Self> {
94        let mut s = Self {
95            query_id: first_response.id.clone(),
96            need_progress,
97            client,
98            next_page_future: None,
99            node_id: first_response.node_id.clone(),
100            first_page: None,
101            next_uri: first_response.next_uri.clone(),
102            result_timeout_secs: first_response.result_timeout_secs,
103            last_access_time: Arc::new(Mutex::new(Instant::now())),
104        };
105        let first_page = Page::from_response(first_response, record_batches);
106        s.first_page = Some(first_page);
107        Ok(s)
108    }
109
110    pub fn add_back(&mut self, page: Page) {
111        self.first_page = Some(page);
112    }
113
114    pub async fn wait_for_schema(
115        mut self,
116        need_progress: bool,
117    ) -> Result<(Self, Schema, ResultFormatSettings)> {
118        while let Some(page) = self.next().await {
119            let page = page?;
120            if !page.raw_schema.is_empty()
121                || !page.data.is_empty()
122                || !page.batches.is_empty()
123                || (need_progress && page.stats.progresses.has_progress())
124            {
125                let schema: Schema = if !page.batches.is_empty() {
126                    let arrow_schema = page.batches[0].schema().clone();
127                    arrow_schema
128                        .try_into()
129                        .map_err(|e| Error::Decode(format!("fail to decode arrow schema: {e}")))?
130                } else {
131                    let s = page.raw_schema.clone();
132                    s.try_into()
133                        .map_err(|e| Error::Decode(format!("fail to decode string schema: {e}")))?
134                };
135                let settings = ResultFormatSettings::try_from(&page.settings)?;
136
137                self.add_back(page);
138                let last_access_time = self.last_access_time.clone();
139                if let Some(node_id) = &self.node_id {
140                    let state = QueryState {
141                        node_id: node_id.to_string(),
142                        last_access_time,
143                        timeout_secs: self.result_timeout_secs.unwrap_or(60),
144                    };
145                    self.client
146                        .register_query_for_heartbeat(&self.query_id, state)
147                }
148                return Ok((self, schema, settings));
149            }
150        }
151        Ok((self, Schema::default(), ResultFormatSettings::default()))
152    }
153}
154
155impl Stream for Pages {
156    type Item = Result<Page>;
157
158    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
159        if let Some(p) = mem::take(&mut self.first_page) {
160            return Poll::Ready(Some(Ok(p)));
161        };
162        match self.next_page_future {
163            Some(ref mut next_page) => match Pin::new(next_page).poll(cx) {
164                Poll::Ready(Ok((resp, batches))) => {
165                    self.next_uri = resp.next_uri.clone();
166                    self.next_page_future = None;
167                    if resp.data.is_empty() && !self.need_progress {
168                        self.poll_next(cx)
169                    } else {
170                        let now = Instant::now();
171                        *self.last_access_time.lock() = now;
172                        Poll::Ready(Some(Ok(Page::from_response(resp, batches))))
173                    }
174                }
175                Poll::Ready(Err(e)) => {
176                    self.next_page_future = None;
177                    self.next_uri = None;
178                    Poll::Ready(Some(Err(e)))
179                }
180                Poll::Pending => Poll::Pending,
181            },
182            None => match self.next_uri {
183                Some(ref next_uri) => {
184                    let client = self.client.clone();
185                    let next_uri = next_uri.clone();
186                    let query_id = self.query_id.clone();
187                    let node_id = self.node_id.clone();
188                    self.next_page_future = Some(Box::pin(async move {
189                        client.query_page(&query_id, &next_uri, &node_id).await
190                    }));
191                    self.poll_next(cx)
192                }
193                None => Poll::Ready(None),
194            },
195        }
196    }
197}
198
199impl Drop for Pages {
200    fn drop(&mut self) {
201        if let Some(uri) = &self.next_uri {
202            if uri.contains("/page/") || self.next_page_future.is_none() {
203                debug!("Dropping pages for {}", self.query_id);
204                self.client.finalize_query(&self.query_id)
205            }
206        }
207    }
208}