databend_driver_core/
raw_rows.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::error::Error;
16use crate::error::Result;
17use crate::rows::Row;
18use crate::rows::ServerStats;
19use crate::value::FormatOptions;
20use crate::value::Value;
21use databend_client::schema::SchemaRef;
22use jiff::tz::TimeZone;
23use lexical_core::WriteFloatOptionsBuilder;
24use std::pin::Pin;
25use std::task::Context;
26use std::task::Poll;
27use tokio_stream::{Stream, StreamExt};
28
29pub static HTTP_HANDLER_OPTIONS: FormatOptions = FormatOptions {
30    true_string: b"1",
31    false_string: b"0",
32    float_options: WriteFloatOptionsBuilder::new()
33        .nan_string(Some(b"NaN"))
34        .inf_string(Some(b"Infinity"))
35        .build_unchecked(),
36};
37
38#[derive(Clone, Debug)]
39pub enum RawRowWithStats {
40    Row(RawRow),
41    Stats(ServerStats),
42}
43
44#[derive(Clone, Debug, Default)]
45pub struct RawRow {
46    pub row: Row,
47    pub raw_row: Vec<Option<String>>,
48}
49
50impl RawRow {
51    pub fn new(row: Row, raw_row: Vec<Option<String>>) -> Self {
52        Self { row, raw_row }
53    }
54
55    pub fn len(&self) -> usize {
56        self.raw_row.len()
57    }
58
59    pub fn is_empty(&self) -> bool {
60        self.raw_row.is_empty()
61    }
62
63    pub fn values(&self) -> &[Option<String>] {
64        &self.raw_row
65    }
66
67    pub fn schema(&self) -> SchemaRef {
68        self.row.schema()
69    }
70}
71
72impl TryFrom<(SchemaRef, Vec<Option<String>>, &TimeZone)> for RawRow {
73    type Error = Error;
74
75    fn try_from((schema, data, tz): (SchemaRef, Vec<Option<String>>, &TimeZone)) -> Result<Self> {
76        let mut values: Vec<Value> = Vec::with_capacity(data.len());
77        for (field, val) in schema.fields().iter().zip(data.clone().into_iter()) {
78            values.push(Value::try_from((&field.data_type, val, tz))?);
79        }
80
81        let row = Row::new(schema, values);
82        Ok(RawRow::new(row, data))
83    }
84}
85
86impl From<Row> for RawRow {
87    fn from(row: Row) -> Self {
88        let mut raw_row: Vec<Option<String>> = Vec::with_capacity(row.values().len());
89        for val in row.values() {
90            raw_row.push(Some(val.to_string_with_options(&HTTP_HANDLER_OPTIONS)));
91        }
92        RawRow::new(row, raw_row)
93    }
94}
95
96impl IntoIterator for RawRow {
97    type Item = Option<String>;
98    type IntoIter = std::vec::IntoIter<Self::Item>;
99
100    fn into_iter(self) -> Self::IntoIter {
101        self.raw_row.into_iter()
102    }
103}
104
105#[derive(Clone, Debug)]
106pub struct RawRows {
107    rows: Vec<RawRow>,
108}
109
110impl RawRows {
111    pub fn new(rows: Vec<RawRow>) -> Self {
112        Self { rows }
113    }
114
115    pub fn rows(&self) -> &[RawRow] {
116        &self.rows
117    }
118
119    pub fn len(&self) -> usize {
120        self.rows.len()
121    }
122
123    pub fn is_empty(&self) -> bool {
124        self.rows.is_empty()
125    }
126}
127
128impl IntoIterator for RawRows {
129    type Item = RawRow;
130    type IntoIter = std::vec::IntoIter<Self::Item>;
131
132    fn into_iter(self) -> Self::IntoIter {
133        self.rows.into_iter()
134    }
135}
136
137pub struct RawRowIterator {
138    schema: SchemaRef,
139    it: Pin<Box<dyn Stream<Item = Result<RawRow>> + Send>>,
140}
141
142impl RawRowIterator {
143    pub fn new(
144        schema: SchemaRef,
145        it: Pin<Box<dyn Stream<Item = Result<RawRowWithStats>> + Send>>,
146    ) -> Self {
147        let it = it.filter_map(|r| match r {
148            Ok(RawRowWithStats::Row(r)) => Some(Ok(r)),
149            Ok(_) => None,
150            Err(err) => Some(Err(err)),
151        });
152        Self {
153            schema,
154            it: Box::pin(it),
155        }
156    }
157
158    pub fn schema(&self) -> SchemaRef {
159        self.schema.clone()
160    }
161}
162
163impl Stream for RawRowIterator {
164    type Item = Result<RawRow>;
165
166    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
167        Pin::new(&mut self.it).poll_next(cx)
168    }
169}