databend_driver_core/
raw_rows.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use chrono_tz::Tz;
16use std::pin::Pin;
17use std::task::Context;
18use std::task::Poll;
19use tokio_stream::{Stream, StreamExt};
20
21use crate::error::Error;
22use crate::error::Result;
23use crate::rows::Row;
24use crate::rows::ServerStats;
25use crate::schema::SchemaRef;
26use crate::value::Value;
27
28#[derive(Clone, Debug)]
29pub enum RawRowWithStats {
30    Row(RawRow),
31    Stats(ServerStats),
32}
33
34#[derive(Clone, Debug, Default)]
35pub struct RawRow {
36    pub row: Row,
37    pub raw_row: Vec<Option<String>>,
38}
39
40impl RawRow {
41    pub fn new(row: Row, raw_row: Vec<Option<String>>) -> Self {
42        Self { row, raw_row }
43    }
44
45    pub fn len(&self) -> usize {
46        self.raw_row.len()
47    }
48
49    pub fn is_empty(&self) -> bool {
50        self.raw_row.is_empty()
51    }
52
53    pub fn values(&self) -> &[Option<String>] {
54        &self.raw_row
55    }
56
57    pub fn schema(&self) -> SchemaRef {
58        self.row.schema()
59    }
60}
61
62impl TryFrom<(SchemaRef, Vec<Option<String>>, Tz)> for RawRow {
63    type Error = Error;
64
65    fn try_from((schema, data, tz): (SchemaRef, Vec<Option<String>>, Tz)) -> Result<Self> {
66        let mut values: Vec<Value> = Vec::with_capacity(data.len());
67        for (field, val) in schema.fields().iter().zip(data.clone().into_iter()) {
68            values.push(Value::try_from((&field.data_type, val, tz))?);
69        }
70
71        let row = Row::new(schema, values);
72        Ok(RawRow::new(row, data))
73    }
74}
75
76impl From<Row> for RawRow {
77    fn from(row: Row) -> Self {
78        let mut raw_row: Vec<Option<String>> = Vec::with_capacity(row.values().len());
79        for val in row.values() {
80            raw_row.push(Some(val.to_string()));
81        }
82        RawRow::new(row, raw_row)
83    }
84}
85
86impl IntoIterator for RawRow {
87    type Item = Option<String>;
88    type IntoIter = std::vec::IntoIter<Self::Item>;
89
90    fn into_iter(self) -> Self::IntoIter {
91        self.raw_row.into_iter()
92    }
93}
94
95#[derive(Clone, Debug)]
96pub struct RawRows {
97    rows: Vec<RawRow>,
98}
99
100impl RawRows {
101    pub fn new(rows: Vec<RawRow>) -> Self {
102        Self { rows }
103    }
104
105    pub fn rows(&self) -> &[RawRow] {
106        &self.rows
107    }
108
109    pub fn len(&self) -> usize {
110        self.rows.len()
111    }
112
113    pub fn is_empty(&self) -> bool {
114        self.rows.is_empty()
115    }
116}
117
118impl IntoIterator for RawRows {
119    type Item = RawRow;
120    type IntoIter = std::vec::IntoIter<Self::Item>;
121
122    fn into_iter(self) -> Self::IntoIter {
123        self.rows.into_iter()
124    }
125}
126
127pub struct RawRowIterator {
128    schema: SchemaRef,
129    it: Pin<Box<dyn Stream<Item = Result<RawRow>> + Send>>,
130}
131
132impl RawRowIterator {
133    pub fn new(
134        schema: SchemaRef,
135        it: Pin<Box<dyn Stream<Item = Result<RawRowWithStats>> + Send>>,
136    ) -> Self {
137        let it = it.filter_map(|r| match r {
138            Ok(RawRowWithStats::Row(r)) => Some(Ok(r)),
139            Ok(_) => None,
140            Err(err) => Some(Err(err)),
141        });
142        Self {
143            schema,
144            it: Box::pin(it),
145        }
146    }
147
148    pub fn schema(&self) -> SchemaRef {
149        self.schema.clone()
150    }
151}
152
153impl Stream for RawRowIterator {
154    type Item = Result<RawRow>;
155
156    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
157        Pin::new(&mut self.it).poll_next(cx)
158    }
159}