databend_driver_core/
raw_rows.rs1use crate::error::Error;
16use crate::error::Result;
17use crate::rows::Row;
18use crate::rows::ServerStats;
19use crate::value::FormatOptions;
20use crate::value::Value;
21use databend_client::schema::SchemaRef;
22use databend_client::ResultFormatSettings;
23use lexical_core::WriteFloatOptionsBuilder;
24use std::pin::Pin;
25use std::task::Context;
26use std::task::Poll;
27use tokio_stream::{Stream, StreamExt};
28
29pub static HTTP_HANDLER_OPTIONS: FormatOptions = FormatOptions {
30 true_string: b"1",
31 false_string: b"0",
32 float_options: WriteFloatOptionsBuilder::new()
33 .nan_string(Some(b"NaN"))
34 .inf_string(Some(b"Infinity"))
35 .build_unchecked(),
36};
37
38#[derive(Clone, Debug)]
39pub enum RawRowWithStats {
40 Row(RawRow),
41 Stats(ServerStats),
42}
43
44#[derive(Clone, Debug, Default)]
45pub struct RawRow {
46 pub row: Row,
47 pub raw_row: Vec<Option<String>>,
48}
49
50impl RawRow {
51 pub fn new(row: Row, raw_row: Vec<Option<String>>) -> Self {
52 Self { row, raw_row }
53 }
54
55 pub fn len(&self) -> usize {
56 self.raw_row.len()
57 }
58
59 pub fn is_empty(&self) -> bool {
60 self.raw_row.is_empty()
61 }
62
63 pub fn values(&self) -> &[Option<String>] {
64 &self.raw_row
65 }
66
67 pub fn schema(&self) -> SchemaRef {
68 self.row.schema()
69 }
70}
71
72impl TryFrom<(SchemaRef, Vec<Option<String>>, &ResultFormatSettings)> for RawRow {
73 type Error = Error;
74
75 fn try_from(
76 (schema, data, settings): (SchemaRef, Vec<Option<String>>, &ResultFormatSettings),
77 ) -> Result<Self> {
78 let mut values: Vec<Value> = Vec::with_capacity(data.len());
79 for (field, val) in schema.fields().iter().zip(data.clone()) {
80 values.push(Value::try_from((&field.data_type, val, settings))?);
81 }
82
83 let row = Row::new(schema, values);
84 Ok(RawRow::new(row, data))
85 }
86}
87
88impl From<Row> for RawRow {
89 fn from(row: Row) -> Self {
90 let mut raw_row: Vec<Option<String>> = Vec::with_capacity(row.values().len());
91 for val in row.values() {
92 raw_row.push(Some(val.to_string_with_options(&HTTP_HANDLER_OPTIONS)));
93 }
94 RawRow::new(row, raw_row)
95 }
96}
97
98impl IntoIterator for RawRow {
99 type Item = Option<String>;
100 type IntoIter = std::vec::IntoIter<Self::Item>;
101
102 fn into_iter(self) -> Self::IntoIter {
103 self.raw_row.into_iter()
104 }
105}
106
107#[derive(Clone, Debug)]
108pub struct RawRows {
109 rows: Vec<RawRow>,
110}
111
112impl RawRows {
113 pub fn new(rows: Vec<RawRow>) -> Self {
114 Self { rows }
115 }
116
117 pub fn rows(&self) -> &[RawRow] {
118 &self.rows
119 }
120
121 pub fn len(&self) -> usize {
122 self.rows.len()
123 }
124
125 pub fn is_empty(&self) -> bool {
126 self.rows.is_empty()
127 }
128}
129
130impl IntoIterator for RawRows {
131 type Item = RawRow;
132 type IntoIter = std::vec::IntoIter<Self::Item>;
133
134 fn into_iter(self) -> Self::IntoIter {
135 self.rows.into_iter()
136 }
137}
138
139pub struct RawRowIterator {
140 schema: SchemaRef,
141 it: Pin<Box<dyn Stream<Item = Result<RawRow>> + Send>>,
142}
143
144impl RawRowIterator {
145 pub fn new(
146 schema: SchemaRef,
147 it: Pin<Box<dyn Stream<Item = Result<RawRowWithStats>> + Send>>,
148 ) -> Self {
149 let it = it.filter_map(|r| match r {
150 Ok(RawRowWithStats::Row(r)) => Some(Ok(r)),
151 Ok(_) => None,
152 Err(err) => Some(Err(err)),
153 });
154 Self {
155 schema,
156 it: Box::pin(it),
157 }
158 }
159
160 pub fn schema(&self) -> SchemaRef {
161 self.schema.clone()
162 }
163}
164
165impl Stream for RawRowIterator {
166 type Item = Result<RawRow>;
167
168 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
169 Pin::new(&mut self.it).poll_next(cx)
170 }
171}