Skip to main content

databend_driver_core/
rows.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::task::Context;
17use std::task::Poll;
18
19use serde::Deserialize;
20use tokio_stream::{Stream, StreamExt};
21
22use crate::error::{Error, Result};
23use crate::value::Value;
24use arrow::record_batch::RecordBatch;
25use databend_client::schema::SchemaRef;
26use databend_client::ResultFormatSettings;
27
28#[derive(Clone, Debug)]
29pub enum RowWithStats {
30    Row(Row),
31    Stats(ServerStats),
32}
33
34#[derive(Deserialize, Clone, Debug, Default)]
35pub struct ServerStats {
36    #[serde(default)]
37    pub total_rows: usize,
38    #[serde(default)]
39    pub total_bytes: usize,
40
41    #[serde(default)]
42    pub read_rows: usize,
43    #[serde(default)]
44    pub read_bytes: usize,
45
46    #[serde(default)]
47    pub write_rows: usize,
48    #[serde(default)]
49    pub write_bytes: usize,
50
51    #[serde(default)]
52    pub running_time_ms: f64,
53
54    #[serde(default)]
55    pub spill_file_nums: usize,
56
57    #[serde(default)]
58    pub spill_bytes: usize,
59}
60
61impl ServerStats {
62    pub fn normalize(&mut self) {
63        if self.total_rows == 0 {
64            self.total_rows = self.read_rows;
65        }
66        if self.total_bytes == 0 {
67            self.total_bytes = self.read_bytes;
68        }
69    }
70
71    pub fn merge(&mut self, other: &ServerStats) {
72        self.total_rows += other.total_rows;
73        self.total_bytes += other.total_bytes;
74        self.read_rows += other.read_rows;
75        self.read_bytes += other.read_bytes;
76        self.write_rows += other.write_rows;
77        self.write_bytes += other.write_bytes;
78        self.running_time_ms += other.running_time_ms;
79        self.spill_file_nums += other.spill_file_nums;
80        self.spill_bytes += other.spill_bytes;
81    }
82}
83
84impl From<databend_client::QueryStats> for ServerStats {
85    fn from(stats: databend_client::QueryStats) -> Self {
86        let mut p = Self {
87            total_rows: 0,
88            total_bytes: 0,
89            read_rows: stats.progresses.scan_progress.rows,
90            read_bytes: stats.progresses.scan_progress.bytes,
91            write_rows: stats.progresses.write_progress.rows,
92            write_bytes: stats.progresses.write_progress.bytes,
93            spill_file_nums: stats.progresses.spill_progress.file_nums,
94            spill_bytes: stats.progresses.spill_progress.bytes,
95            running_time_ms: stats.running_time_ms,
96        };
97        if let Some(total) = stats.progresses.total_scan {
98            p.total_rows = total.rows;
99            p.total_bytes = total.bytes;
100        }
101        p
102    }
103}
104
105#[derive(Clone, Debug, Default)]
106pub struct Row {
107    schema: SchemaRef,
108    values: Vec<Value>,
109}
110
111impl Row {
112    pub fn new(schema: SchemaRef, values: Vec<Value>) -> Self {
113        Self { schema, values }
114    }
115
116    pub fn len(&self) -> usize {
117        self.values.len()
118    }
119
120    pub fn is_empty(&self) -> bool {
121        self.values.is_empty()
122    }
123
124    pub fn values(&self) -> &[Value] {
125        &self.values
126    }
127
128    pub fn schema(&self) -> SchemaRef {
129        self.schema.clone()
130    }
131
132    pub fn from_vec(schema: SchemaRef, values: Vec<Value>) -> Self {
133        Self { schema, values }
134    }
135}
136
137impl TryFrom<(SchemaRef, Vec<Option<String>>, &ResultFormatSettings)> for Row {
138    type Error = Error;
139
140    fn try_from(
141        (schema, data, settings): (SchemaRef, Vec<Option<String>>, &ResultFormatSettings),
142    ) -> Result<Self> {
143        let mut values: Vec<Value> = Vec::with_capacity(data.len());
144        for (field, val) in schema.fields().iter().zip(data) {
145            values.push(Value::try_from((&field.data_type, val, settings))?);
146        }
147        Ok(Self::new(schema, values))
148    }
149}
150
151impl IntoIterator for Row {
152    type Item = Value;
153    type IntoIter = std::vec::IntoIter<Self::Item>;
154
155    fn into_iter(self) -> Self::IntoIter {
156        self.values.into_iter()
157    }
158}
159
160#[derive(Clone, Debug)]
161pub struct Rows {
162    rows: Vec<Row>,
163}
164
165impl Rows {
166    pub fn new(rows: Vec<Row>) -> Self {
167        Self { rows }
168    }
169
170    // pub fn schema(&self) -> SchemaRef {
171    //     self.schema.clone()
172    // }
173
174    pub fn rows(&self) -> &[Row] {
175        &self.rows
176    }
177
178    pub fn len(&self) -> usize {
179        self.rows.len()
180    }
181
182    pub fn is_empty(&self) -> bool {
183        self.rows.is_empty()
184    }
185}
186
187impl TryFrom<(RecordBatch, ResultFormatSettings)> for Rows {
188    type Error = Error;
189    fn try_from((batch, settings): (RecordBatch, ResultFormatSettings)) -> Result<Self> {
190        let batch_schema = batch.schema();
191        let schema = SchemaRef::new(batch_schema.clone().try_into()?);
192        let mut rows: Vec<Row> = Vec::new();
193        for i in 0..batch.num_rows() {
194            let mut values: Vec<Value> = Vec::new();
195            for j in 0..batch_schema.fields().len() {
196                let v = batch.column(j);
197                let field = batch_schema.field(j);
198                let value = Value::try_from((field, v, i, &settings))?;
199                values.push(value);
200            }
201            rows.push(Row::new(schema.clone(), values));
202        }
203        Ok(Self::new(rows))
204    }
205}
206
207impl IntoIterator for Rows {
208    type Item = Row;
209    type IntoIter = std::vec::IntoIter<Self::Item>;
210
211    fn into_iter(self) -> Self::IntoIter {
212        self.rows.into_iter()
213    }
214}
215
216macro_rules! replace_expr {
217    ($_t:tt $sub:expr) => {
218        $sub
219    };
220}
221
222// This macro implements TryFrom for tuple of types
223macro_rules! impl_tuple_from_row {
224    ( $($Ti:tt),+ ) => {
225        impl<$($Ti),+> TryFrom<Row> for ($($Ti,)+)
226        where
227            $($Ti: TryFrom<Value>),+
228        {
229            type Error = String;
230            fn try_from(row: Row) -> Result<Self, String> {
231                // It is not possible yet to get the number of metavariable repetitions
232                // ref: https://github.com/rust-lang/lang-team/issues/28#issue-644523674
233                // This is a workaround
234                let expected_len = <[()]>::len(&[$(replace_expr!(($Ti) ())),*]);
235
236                if expected_len != row.len() {
237                    return Err(format!("row size mismatch: expected {} columns, got {}", expected_len, row.len()));
238                }
239                let mut vals_iter = row.into_iter().enumerate();
240
241                Ok((
242                    $(
243                        {
244                            let (col_ix, col_value) = vals_iter
245                                .next()
246                                .unwrap(); // vals_iter size is checked before this code is reached,
247                                           // so it is safe to unwrap
248                            let t = col_value.get_type();
249                            $Ti::try_from(col_value)
250                                .map_err(|_| format!("failed converting column {} from type({:?}) to type({})", col_ix, t, std::any::type_name::<$Ti>()))?
251                        }
252                    ,)+
253                ))
254            }
255        }
256    }
257}
258
259// Implement FromRow for tuples of size up to 16
260impl_tuple_from_row!(T1);
261impl_tuple_from_row!(T1, T2);
262impl_tuple_from_row!(T1, T2, T3);
263impl_tuple_from_row!(T1, T2, T3, T4);
264impl_tuple_from_row!(T1, T2, T3, T4, T5);
265impl_tuple_from_row!(T1, T2, T3, T4, T5, T6);
266impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7);
267impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8);
268impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9);
269impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10);
270impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11);
271impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
272impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13);
273impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14);
274impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15);
275impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16);
276
277pub struct RowIterator {
278    schema: SchemaRef,
279    it: Option<Pin<Box<dyn Stream<Item = Result<Row>> + Send>>>,
280}
281
282impl RowIterator {
283    pub fn new(schema: SchemaRef, it: Pin<Box<dyn Stream<Item = Result<Row>> + Send>>) -> Self {
284        Self {
285            schema,
286            it: Some(it),
287        }
288    }
289
290    pub fn schema(&self) -> SchemaRef {
291        self.schema.clone()
292    }
293
294    pub async fn try_collect<T>(mut self) -> Result<Vec<T>>
295    where
296        T: TryFrom<Row>,
297        T::Error: std::fmt::Display,
298    {
299        if let Some(it) = &mut self.it {
300            let mut ret = Vec::new();
301            while let Some(row) = it.next().await {
302                let v = T::try_from(row?).map_err(|e| Error::Parsing(e.to_string()))?;
303                ret.push(v)
304            }
305            Ok(ret)
306        } else {
307            Err(Error::BadArgument("RowIterator already closed".to_owned()))
308        }
309    }
310
311    pub fn close(&mut self) {
312        self.it = None;
313    }
314}
315
316impl Stream for RowIterator {
317    type Item = Result<Row>;
318
319    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
320        if let Some(it) = self.it.as_mut() {
321            Pin::new(it).poll_next(cx)
322        } else {
323            Poll::Ready(Some(Err(Error::BadArgument(
324                "RowIterator already closed".to_owned(),
325            ))))
326        }
327    }
328}
329
330pub struct RowStatsIterator {
331    schema: SchemaRef,
332    it: Option<Pin<Box<dyn Stream<Item = Result<RowWithStats>> + Send>>>,
333}
334
335impl RowStatsIterator {
336    pub fn new(
337        schema: SchemaRef,
338        it: Pin<Box<dyn Stream<Item = Result<RowWithStats>> + Send>>,
339    ) -> Self {
340        Self {
341            schema,
342            it: Some(it),
343        }
344    }
345
346    pub fn schema(&self) -> SchemaRef {
347        self.schema.clone()
348    }
349
350    pub async fn filter_rows(self) -> Result<RowIterator> {
351        if let Some(it) = self.it {
352            let it = it.filter_map(|r| match r {
353                Ok(RowWithStats::Row(r)) => Some(Ok(r)),
354                Ok(_) => None,
355                Err(err) => Some(Err(err)),
356            });
357            Ok(RowIterator::new(self.schema, Box::pin(it)))
358        } else {
359            Err(Error::BadArgument("RowIterator already closed".to_owned()))
360        }
361    }
362
363    pub fn close(&mut self) {
364        self.it = None;
365    }
366}
367
368impl Stream for RowStatsIterator {
369    type Item = Result<RowWithStats>;
370
371    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
372        if let Some(it) = self.it.as_mut() {
373            Pin::new(it).poll_next(cx)
374        } else {
375            Poll::Ready(Some(Err(Error::BadArgument(
376                "RowStatsIterator already closed".to_owned(),
377            ))))
378        }
379    }
380}