databend_client/
pages.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::error::Result;
16use crate::response::QueryResponse;
17use crate::{APIClient, QueryStats, SchemaField};
18use std::future::Future;
19use std::mem;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use tokio_stream::{Stream, StreamExt};
24
/// One batch of query results together with the metadata that came with it.
#[derive(Default)]
pub struct Page {
    /// Column descriptions taken from the response's `schema` field.
    pub schema: Vec<SchemaField>,
    /// Result rows; every cell is an optional string.
    pub data: Vec<Vec<Option<String>>>,
    /// Statistics taken from the response's `stats` field.
    pub stats: QueryStats,
}
31
32impl Page {
33    pub fn from_response(response: QueryResponse) -> Self {
34        Self {
35            schema: response.schema,
36            data: response.data,
37            stats: response.stats,
38        }
39    }
40
41    pub fn update(&mut self, p: Page) {
42        self.schema = p.schema;
43        if self.data.is_empty() {
44            self.data = p.data
45        } else {
46            self.data.extend_from_slice(&p.data);
47        }
48        self.stats = p.stats;
49    }
50}
51
/// Boxed, pinned, `Send` future that resolves to the result of fetching one
/// `QueryResponse`; `Pages` stores the in-flight next-page request as this type.
type PageFut = Pin<Box<dyn Future<Output = Result<QueryResponse>> + Send>>;
53
/// Stream of result pages for a single query, fetched one page at a time
/// through the shared `APIClient`.
pub struct Pages {
    // Id copied from the first response; passed to every `query_page` call.
    query_id: String,
    // Client used to request follow-up pages.
    client: Arc<APIClient>,
    // Page that the stream yields before fetching anything else. Seeded from
    // the first response and refilled through `add_back`.
    first_page: Option<Page>,
    // When false, fetched responses whose `data` is empty are skipped instead
    // of being yielded.
    need_progress: bool,

    // Request for the next page that is currently being polled, if any.
    next_page_future: Option<PageFut>,
    // Node id copied from the first response; passed to every `query_page` call.
    node_id: Option<String>,
    // URI of the next page to request; `None` means the stream ends once no
    // request is in flight.
    next_uri: Option<String>,
}
64
65impl Pages {
66    pub fn new(client: Arc<APIClient>, first_response: QueryResponse, need_progress: bool) -> Self {
67        let mut s = Self {
68            query_id: first_response.id.clone(),
69            need_progress,
70            client,
71            next_page_future: None,
72            node_id: first_response.node_id.clone(),
73            first_page: None,
74            next_uri: first_response.next_uri.clone(),
75        };
76        let first_page = Page::from_response(first_response);
77        s.first_page = Some(first_page);
78        s
79    }
80
81    pub fn add_back(&mut self, page: Page) {
82        self.first_page = Some(page);
83    }
84
85    pub async fn wait_for_schema(
86        mut self,
87        need_progress: bool,
88    ) -> Result<(Self, Vec<SchemaField>)> {
89        while let Some(page) = self.next().await {
90            let page = page?;
91            if !page.schema.is_empty()
92                || !page.data.is_empty()
93                || (need_progress && page.stats.progresses.has_progress())
94            {
95                let schema = page.schema.clone();
96                self.add_back(page);
97                return Ok((self, schema));
98            }
99        }
100        Ok((self, vec![]))
101    }
102}
103
104impl Stream for Pages {
105    type Item = Result<Page>;
106
107    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
108        if let Some(p) = mem::take(&mut self.first_page) {
109            return Poll::Ready(Some(Ok(p)));
110        };
111        match self.next_page_future {
112            Some(ref mut next_page) => match Pin::new(next_page).poll(cx) {
113                Poll::Ready(Ok(resp)) => {
114                    self.next_uri = resp.next_uri.clone();
115                    self.next_page_future = None;
116                    if resp.data.is_empty() && !self.need_progress {
117                        self.poll_next(cx)
118                    } else {
119                        Poll::Ready(Some(Ok(Page::from_response(resp))))
120                    }
121                }
122                Poll::Ready(Err(e)) => {
123                    self.next_page_future = None;
124                    self.next_uri = None;
125                    Poll::Ready(Some(Err(e)))
126                }
127                Poll::Pending => Poll::Pending,
128            },
129            None => match self.next_uri {
130                Some(ref next_uri) => {
131                    let client = self.client.clone();
132                    let next_uri = next_uri.clone();
133                    let query_id = self.query_id.clone();
134                    let node_id = self.node_id.clone();
135                    self.next_page_future = Some(Box::pin(async move {
136                        client.query_page(&query_id, &next_uri, &node_id).await
137                    }));
138                    self.poll_next(cx)
139                }
140                None => Poll::Ready(None),
141            },
142        }
143    }
144}