Skip to main content

bigbytes_driver_core/
rows.rs

// Copyright 2024 Digitrans Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::pin::Pin;
16use std::task::Context;
17use std::task::Poll;
18
19use serde::Deserialize;
20use tokio_stream::{Stream, StreamExt};
21
22#[cfg(feature = "flight-sql")]
23use arrow::record_batch::RecordBatch;
24
25use crate::error::{Error, Result};
26use crate::schema::SchemaRef;
27use crate::value::Value;
28
29#[derive(Clone, Debug)]
30pub enum RowWithStats {
31    Row(Row),
32    Stats(ServerStats),
33}
34
35#[derive(Deserialize, Clone, Debug, Default)]
36pub struct ServerStats {
37    #[serde(default)]
38    pub total_rows: usize,
39    #[serde(default)]
40    pub total_bytes: usize,
41
42    #[serde(default)]
43    pub read_rows: usize,
44    #[serde(default)]
45    pub read_bytes: usize,
46
47    #[serde(default)]
48    pub write_rows: usize,
49    #[serde(default)]
50    pub write_bytes: usize,
51
52    #[serde(default)]
53    pub running_time_ms: f64,
54
55    #[serde(default)]
56    pub spill_file_nums: usize,
57
58    #[serde(default)]
59    pub spill_bytes: usize,
60}
61
62impl ServerStats {
63    pub fn normalize(&mut self) {
64        if self.total_rows == 0 {
65            self.total_rows = self.read_rows;
66        }
67        if self.total_bytes == 0 {
68            self.total_bytes = self.read_bytes;
69        }
70    }
71}
72
73impl From<databend_client::QueryStats> for ServerStats {
74    fn from(stats: databend_client::QueryStats) -> Self {
75        let mut p = Self {
76            total_rows: 0,
77            total_bytes: 0,
78            read_rows: stats.progresses.scan_progress.rows,
79            read_bytes: stats.progresses.scan_progress.bytes,
80            write_rows: stats.progresses.write_progress.rows,
81            write_bytes: stats.progresses.write_progress.bytes,
82            spill_file_nums: stats.progresses.spill_progress.file_nums,
83            spill_bytes: stats.progresses.spill_progress.bytes,
84            running_time_ms: stats.running_time_ms,
85        };
86        if let Some(total) = stats.progresses.total_scan {
87            p.total_rows = total.rows;
88            p.total_bytes = total.bytes;
89        }
90        p
91    }
92}
93
94#[derive(Clone, Debug, Default)]
95pub struct Row {
96    schema: SchemaRef,
97    values: Vec<Value>,
98}
99
100impl Row {
101    pub fn new(schema: SchemaRef, values: Vec<Value>) -> Self {
102        Self { schema, values }
103    }
104
105    pub fn len(&self) -> usize {
106        self.values.len()
107    }
108
109    pub fn is_empty(&self) -> bool {
110        self.values.is_empty()
111    }
112
113    pub fn values(&self) -> &[Value] {
114        &self.values
115    }
116
117    pub fn schema(&self) -> SchemaRef {
118        self.schema.clone()
119    }
120
121    pub fn from_vec(schema: SchemaRef, values: Vec<Value>) -> Self {
122        Self { schema, values }
123    }
124}
125
126impl TryFrom<(SchemaRef, Vec<Option<String>>)> for Row {
127    type Error = Error;
128
129    fn try_from((schema, data): (SchemaRef, Vec<Option<String>>)) -> Result<Self> {
130        let mut values: Vec<Value> = Vec::new();
131        for (i, field) in schema.fields().iter().enumerate() {
132            let val: Option<&str> = data.get(i).and_then(|v| v.as_deref());
133            values.push(Value::try_from((&field.data_type, val))?);
134        }
135        Ok(Self::new(schema, values))
136    }
137}
138
139impl IntoIterator for Row {
140    type Item = Value;
141    type IntoIter = std::vec::IntoIter<Self::Item>;
142
143    fn into_iter(self) -> Self::IntoIter {
144        self.values.into_iter()
145    }
146}
147
148#[derive(Clone, Debug)]
149pub struct Rows {
150    rows: Vec<Row>,
151}
152
153impl Rows {
154    pub fn new(rows: Vec<Row>) -> Self {
155        Self { rows }
156    }
157
158    // pub fn schema(&self) -> SchemaRef {
159    //     self.schema.clone()
160    // }
161
162    pub fn rows(&self) -> &[Row] {
163        &self.rows
164    }
165
166    pub fn len(&self) -> usize {
167        self.rows.len()
168    }
169
170    pub fn is_empty(&self) -> bool {
171        self.rows.is_empty()
172    }
173}
174
#[cfg(feature = "flight-sql")]
impl TryFrom<RecordBatch> for Rows {
    type Error = Error;

    /// Converts a columnar Arrow [`RecordBatch`] into row-oriented [`Rows`].
    ///
    /// The batch's Arrow schema is converted once into a driver schema, which every
    /// produced [`Row`] shares. Cells are visited row by row, column by column, and
    /// each one is converted with `Value::try_from((field, column, row_index))`.
    ///
    /// # Errors
    /// Fails if the schema conversion fails, or at the first cell that cannot be
    /// converted. No partial result is returned.
    fn try_from(batch: RecordBatch) -> Result<Self> {
        let arrow_schema = batch.schema();
        let schema = SchemaRef::new(arrow_schema.clone().try_into()?);
        let column_count = arrow_schema.fields().len();

        let rows = (0..batch.num_rows())
            .map(|row_index| -> Result<Row> {
                let values = (0..column_count)
                    .map(|col_index| {
                        Value::try_from((
                            arrow_schema.field(col_index),
                            batch.column(col_index),
                            row_index,
                        ))
                        .map_err(Error::from)
                    })
                    .collect::<Result<Vec<Value>>>()?;
                Ok(Row::new(schema.clone(), values))
            })
            .collect::<Result<Vec<Row>>>()?;

        Ok(Self::new(rows))
    }
}
195
196impl IntoIterator for Rows {
197    type Item = Row;
198    type IntoIter = std::vec::IntoIter<Self::Item>;
199
200    fn into_iter(self) -> Self::IntoIter {
201        self.rows.into_iter()
202    }
203}
204
/// Expands to `$replacement`, discarding the leading token tree.
///
/// Used inside repetitions to emit one expression per repeated item without
/// using the item itself, for example to count the items of a repetition.
macro_rules! replace_expr {
    ($_discarded:tt $replacement:expr) => {
        $replacement
    };
}

/// Implements `TryFrom<Row>` for the tuple made of the given type parameters,
/// each of which must implement `TryFrom<Value>`.
///
/// The conversion fails with a `String` error when the row's length differs from
/// the tuple's arity, or when a column cannot be converted. In the latter case the
/// message names the column index, the value's type and the target type.
macro_rules! impl_tuple_from_row {
    ( $($Ti:tt),+ ) => {
        impl<$($Ti),+> TryFrom<Row> for ($($Ti,)+)
        where
            $($Ti: TryFrom<Value>),+
        {
            type Error = String;

            fn try_from(row: Row) -> Result<Self, String> {
                // `macro_rules!` cannot count repetitions directly
                // (https://github.com/rust-lang/lang-team/issues/28),
                // so add one for every type parameter instead.
                let expected_len: usize = 0 $(+ replace_expr!(($Ti) 1usize))+;
                let actual_len = row.len();
                if actual_len != expected_len {
                    return Err(format!(
                        "row size mismatch: expected {} columns, got {}",
                        expected_len, actual_len
                    ));
                }

                let mut columns = row.into_iter().enumerate();
                Ok((
                    $(
                        {
                            // The length check above guarantees one value per tuple slot.
                            let (index, value) = columns.next().unwrap();
                            let source_type = value.get_type();
                            $Ti::try_from(value).map_err(|_| {
                                format!(
                                    "failed converting column {} from type({:?}) to type({})",
                                    index,
                                    source_type,
                                    std::any::type_name::<$Ti>()
                                )
                            })?
                        },
                    )+
                ))
            }
        }
    };
}
247
248// Implement FromRow for tuples of size up to 16
249impl_tuple_from_row!(T1);
250impl_tuple_from_row!(T1, T2);
251impl_tuple_from_row!(T1, T2, T3);
252impl_tuple_from_row!(T1, T2, T3, T4);
253impl_tuple_from_row!(T1, T2, T3, T4, T5);
254impl_tuple_from_row!(T1, T2, T3, T4, T5, T6);
255impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7);
256impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8);
257impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9);
258impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10);
259impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11);
260impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
261impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13);
262impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14);
263impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15);
264impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16);
265
266pub struct RowIterator {
267    schema: SchemaRef,
268    it: Pin<Box<dyn Stream<Item = Result<Row>> + Send>>,
269}
270
271impl RowIterator {
272    pub fn new(schema: SchemaRef, it: Pin<Box<dyn Stream<Item = Result<Row>> + Send>>) -> Self {
273        Self { schema, it }
274    }
275
276    pub fn schema(&self) -> SchemaRef {
277        self.schema.clone()
278    }
279
280    pub async fn try_collect<T>(mut self) -> Result<Vec<T>>
281    where
282        T: TryFrom<Row>,
283        T::Error: std::fmt::Display,
284    {
285        let mut ret = Vec::new();
286        while let Some(row) = self.it.next().await {
287            let v = T::try_from(row?).map_err(|e| Error::Parsing(e.to_string()))?;
288            ret.push(v)
289        }
290        Ok(ret)
291    }
292}
293
294impl Stream for RowIterator {
295    type Item = Result<Row>;
296
297    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
298        Pin::new(&mut self.it).poll_next(cx)
299    }
300}
301
302pub struct RowStatsIterator {
303    schema: SchemaRef,
304    it: Pin<Box<dyn Stream<Item = Result<RowWithStats>> + Send>>,
305}
306
307impl RowStatsIterator {
308    pub fn new(
309        schema: SchemaRef,
310        it: Pin<Box<dyn Stream<Item = Result<RowWithStats>> + Send>>,
311    ) -> Self {
312        Self { schema, it }
313    }
314
315    pub fn schema(&self) -> SchemaRef {
316        self.schema.clone()
317    }
318
319    pub async fn filter_rows(self) -> RowIterator {
320        let it = self.it.filter_map(|r| match r {
321            Ok(RowWithStats::Row(r)) => Some(Ok(r)),
322            Ok(_) => None,
323            Err(err) => Some(Err(err)),
324        });
325        RowIterator::new(self.schema, Box::pin(it))
326    }
327}
328
329impl Stream for RowStatsIterator {
330    type Item = Result<RowWithStats>;
331
332    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
333        Pin::new(&mut self.it).poll_next(cx)
334    }
335}