databend_driver_core/
rows.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::task::Context;
17use std::task::Poll;
18
19use serde::Deserialize;
20use tokio_stream::{Stream, StreamExt};
21
22#[cfg(feature = "flight-sql")]
23use arrow::record_batch::RecordBatch;
24
25use crate::error::{Error, Result};
26use crate::schema::SchemaRef;
27use crate::value::Value;
28
29#[derive(Clone, Debug)]
30pub enum RowWithStats {
31    Row(Row),
32    Stats(ServerStats),
33}
34
35#[derive(Deserialize, Clone, Debug, Default)]
36pub struct ServerStats {
37    #[serde(default)]
38    pub total_rows: usize,
39    #[serde(default)]
40    pub total_bytes: usize,
41
42    #[serde(default)]
43    pub read_rows: usize,
44    #[serde(default)]
45    pub read_bytes: usize,
46
47    #[serde(default)]
48    pub write_rows: usize,
49    #[serde(default)]
50    pub write_bytes: usize,
51
52    #[serde(default)]
53    pub running_time_ms: f64,
54
55    #[serde(default)]
56    pub spill_file_nums: usize,
57
58    #[serde(default)]
59    pub spill_bytes: usize,
60}
61
62impl ServerStats {
63    pub fn normalize(&mut self) {
64        if self.total_rows == 0 {
65            self.total_rows = self.read_rows;
66        }
67        if self.total_bytes == 0 {
68            self.total_bytes = self.read_bytes;
69        }
70    }
71
72    pub fn merge(&mut self, other: &ServerStats) {
73        self.total_rows += other.total_rows;
74        self.total_bytes += other.total_bytes;
75        self.read_rows += other.read_rows;
76        self.read_bytes += other.read_bytes;
77        self.write_rows += other.write_rows;
78        self.write_bytes += other.write_bytes;
79        self.running_time_ms += other.running_time_ms;
80        self.spill_file_nums += other.spill_file_nums;
81        self.spill_bytes += other.spill_bytes;
82    }
83}
84
85impl From<databend_client::QueryStats> for ServerStats {
86    fn from(stats: databend_client::QueryStats) -> Self {
87        let mut p = Self {
88            total_rows: 0,
89            total_bytes: 0,
90            read_rows: stats.progresses.scan_progress.rows,
91            read_bytes: stats.progresses.scan_progress.bytes,
92            write_rows: stats.progresses.write_progress.rows,
93            write_bytes: stats.progresses.write_progress.bytes,
94            spill_file_nums: stats.progresses.spill_progress.file_nums,
95            spill_bytes: stats.progresses.spill_progress.bytes,
96            running_time_ms: stats.running_time_ms,
97        };
98        if let Some(total) = stats.progresses.total_scan {
99            p.total_rows = total.rows;
100            p.total_bytes = total.bytes;
101        }
102        p
103    }
104}
105
106#[derive(Clone, Debug, Default)]
107pub struct Row {
108    schema: SchemaRef,
109    values: Vec<Value>,
110}
111
112impl Row {
113    pub fn new(schema: SchemaRef, values: Vec<Value>) -> Self {
114        Self { schema, values }
115    }
116
117    pub fn len(&self) -> usize {
118        self.values.len()
119    }
120
121    pub fn is_empty(&self) -> bool {
122        self.values.is_empty()
123    }
124
125    pub fn values(&self) -> &[Value] {
126        &self.values
127    }
128
129    pub fn schema(&self) -> SchemaRef {
130        self.schema.clone()
131    }
132
133    pub fn from_vec(schema: SchemaRef, values: Vec<Value>) -> Self {
134        Self { schema, values }
135    }
136}
137
138impl TryFrom<(SchemaRef, Vec<Option<String>>)> for Row {
139    type Error = Error;
140
141    fn try_from((schema, data): (SchemaRef, Vec<Option<String>>)) -> Result<Self> {
142        let mut values: Vec<Value> = Vec::new();
143        for (i, field) in schema.fields().iter().enumerate() {
144            let val: Option<&str> = data.get(i).and_then(|v| v.as_deref());
145            values.push(Value::try_from((&field.data_type, val))?);
146        }
147        Ok(Self::new(schema, values))
148    }
149}
150
151impl IntoIterator for Row {
152    type Item = Value;
153    type IntoIter = std::vec::IntoIter<Self::Item>;
154
155    fn into_iter(self) -> Self::IntoIter {
156        self.values.into_iter()
157    }
158}
159
160#[derive(Clone, Debug)]
161pub struct Rows {
162    rows: Vec<Row>,
163}
164
165impl Rows {
166    pub fn new(rows: Vec<Row>) -> Self {
167        Self { rows }
168    }
169
170    // pub fn schema(&self) -> SchemaRef {
171    //     self.schema.clone()
172    // }
173
174    pub fn rows(&self) -> &[Row] {
175        &self.rows
176    }
177
178    pub fn len(&self) -> usize {
179        self.rows.len()
180    }
181
182    pub fn is_empty(&self) -> bool {
183        self.rows.is_empty()
184    }
185}
186
/// Converts an Arrow `RecordBatch` into [`Rows`].
///
/// The batch schema is converted once and shared by every produced [`Row`].
/// The batch is then walked row by row: for each row index, one `Value` is
/// built from every column via `Value::try_from((field, column, row_index))`.
///
/// # Errors
///
/// Returns an error when the batch schema cannot be converted, or the error
/// of the first value conversion that fails.
#[cfg(feature = "flight-sql")]
impl TryFrom<RecordBatch> for Rows {
    type Error = Error;

    fn try_from(batch: RecordBatch) -> Result<Self> {
        let batch_schema = batch.schema();
        let schema = SchemaRef::new(batch_schema.clone().try_into()?);

        // Both dimensions are known up front, so every vector is allocated
        // once instead of being grown from empty.
        let num_rows = batch.num_rows();
        let num_columns = batch_schema.fields().len();

        let mut rows: Vec<Row> = Vec::with_capacity(num_rows);
        for row_idx in 0..num_rows {
            let mut values: Vec<Value> = Vec::with_capacity(num_columns);
            for col_idx in 0..num_columns {
                let column = batch.column(col_idx);
                let field = batch_schema.field(col_idx);
                values.push(Value::try_from((field, column, row_idx))?);
            }
            rows.push(Row::new(schema.clone(), values));
        }
        Ok(Self::new(rows))
    }
}
207
208impl IntoIterator for Rows {
209    type Item = Row;
210    type IntoIter = std::vec::IntoIter<Self::Item>;
211
212    fn into_iter(self) -> Self::IntoIter {
213        self.rows.into_iter()
214    }
215}
216
// Discards the first token tree and expands to the expression that follows it.
// Used to emit one expression per macro repetition regardless of what the
// repeated token is.
macro_rules! replace_expr {
    ($_discarded:tt $replacement:expr) => {
        $replacement
    };
}
222
// Implements `TryFrom<Row>` for a tuple whose element types each implement
// `TryFrom<Value>`. The conversion fails with a `String` message when the row
// length differs from the tuple arity or when a column cannot be converted.
macro_rules! impl_tuple_from_row {
    ( $($Ti:tt),+ ) => {
        impl<$($Ti),+> TryFrom<Row> for ($($Ti,)+)
        where
            $($Ti: TryFrom<Value>),+
        {
            type Error = String;

            fn try_from(row: Row) -> Result<Self, String> {
                // The number of metavariable repetitions cannot be queried directly yet
                // (ref: https://github.com/rust-lang/lang-team/issues/28#issue-644523674),
                // so build an array holding one `()` per tuple element and take its length.
                let arity = [$(replace_expr!(($Ti) ())),*].len();
                let actual = row.len();
                if arity != actual {
                    return Err(format!("row size mismatch: expected {} columns, got {}", arity, actual));
                }

                let mut columns = row.into_iter().enumerate();
                Ok((
                    $(
                        {
                            // The length check above guarantees one value per tuple
                            // element, so `next()` cannot return `None` here.
                            let (index, value) = columns.next().unwrap();
                            // Captured before the conversion consumes the value.
                            let source_type = value.get_type();
                            match $Ti::try_from(value) {
                                Ok(converted) => converted,
                                Err(_) => {
                                    return Err(format!("failed converting column {} from type({:?}) to type({})", index, source_type, std::any::type_name::<$Ti>()));
                                }
                            }
                        }
                    ,)+
                ))
            }
        }
    }
}
259
260// Implement FromRow for tuples of size up to 16
261impl_tuple_from_row!(T1);
262impl_tuple_from_row!(T1, T2);
263impl_tuple_from_row!(T1, T2, T3);
264impl_tuple_from_row!(T1, T2, T3, T4);
265impl_tuple_from_row!(T1, T2, T3, T4, T5);
266impl_tuple_from_row!(T1, T2, T3, T4, T5, T6);
267impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7);
268impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8);
269impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9);
270impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10);
271impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11);
272impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
273impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13);
274impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14);
275impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15);
276impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16);
277
278pub struct RowIterator {
279    schema: SchemaRef,
280    it: Pin<Box<dyn Stream<Item = Result<Row>> + Send>>,
281}
282
283impl RowIterator {
284    pub fn new(schema: SchemaRef, it: Pin<Box<dyn Stream<Item = Result<Row>> + Send>>) -> Self {
285        Self { schema, it }
286    }
287
288    pub fn schema(&self) -> SchemaRef {
289        self.schema.clone()
290    }
291
292    pub async fn try_collect<T>(mut self) -> Result<Vec<T>>
293    where
294        T: TryFrom<Row>,
295        T::Error: std::fmt::Display,
296    {
297        let mut ret = Vec::new();
298        while let Some(row) = self.it.next().await {
299            let v = T::try_from(row?).map_err(|e| Error::Parsing(e.to_string()))?;
300            ret.push(v)
301        }
302        Ok(ret)
303    }
304}
305
306impl Stream for RowIterator {
307    type Item = Result<Row>;
308
309    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
310        Pin::new(&mut self.it).poll_next(cx)
311    }
312}
313
314pub struct RowStatsIterator {
315    schema: SchemaRef,
316    it: Pin<Box<dyn Stream<Item = Result<RowWithStats>> + Send>>,
317}
318
319impl RowStatsIterator {
320    pub fn new(
321        schema: SchemaRef,
322        it: Pin<Box<dyn Stream<Item = Result<RowWithStats>> + Send>>,
323    ) -> Self {
324        Self { schema, it }
325    }
326
327    pub fn schema(&self) -> SchemaRef {
328        self.schema.clone()
329    }
330
331    pub async fn filter_rows(self) -> RowIterator {
332        let it = self.it.filter_map(|r| match r {
333            Ok(RowWithStats::Row(r)) => Some(Ok(r)),
334            Ok(_) => None,
335            Err(err) => Some(Err(err)),
336        });
337        RowIterator::new(self.schema, Box::pin(it))
338    }
339}
340
341impl Stream for RowStatsIterator {
342    type Item = Result<RowWithStats>;
343
344    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
345        Pin::new(&mut self.it).poll_next(cx)
346    }
347}