1use std::pin::Pin;
16use std::task::Context;
17use std::task::Poll;
18
19use serde::Deserialize;
20use tokio_stream::{Stream, StreamExt};
21
22use crate::error::{Error, Result};
23use crate::value::Value;
24use arrow::record_batch::RecordBatch;
25use databend_client::schema::SchemaRef;
26use databend_client::ResultFormatSettings;
27
28#[derive(Clone, Debug)]
29pub enum RowWithStats {
30 Row(Row),
31 Stats(ServerStats),
32}
33
34#[derive(Deserialize, Clone, Debug, Default)]
35pub struct ServerStats {
36 #[serde(default)]
37 pub total_rows: usize,
38 #[serde(default)]
39 pub total_bytes: usize,
40
41 #[serde(default)]
42 pub read_rows: usize,
43 #[serde(default)]
44 pub read_bytes: usize,
45
46 #[serde(default)]
47 pub write_rows: usize,
48 #[serde(default)]
49 pub write_bytes: usize,
50
51 #[serde(default)]
52 pub running_time_ms: f64,
53
54 #[serde(default)]
55 pub spill_file_nums: usize,
56
57 #[serde(default)]
58 pub spill_bytes: usize,
59}
60
61impl ServerStats {
62 pub fn normalize(&mut self) {
63 if self.total_rows == 0 {
64 self.total_rows = self.read_rows;
65 }
66 if self.total_bytes == 0 {
67 self.total_bytes = self.read_bytes;
68 }
69 }
70
71 pub fn merge(&mut self, other: &ServerStats) {
72 self.total_rows += other.total_rows;
73 self.total_bytes += other.total_bytes;
74 self.read_rows += other.read_rows;
75 self.read_bytes += other.read_bytes;
76 self.write_rows += other.write_rows;
77 self.write_bytes += other.write_bytes;
78 self.running_time_ms += other.running_time_ms;
79 self.spill_file_nums += other.spill_file_nums;
80 self.spill_bytes += other.spill_bytes;
81 }
82}
83
84impl From<databend_client::QueryStats> for ServerStats {
85 fn from(stats: databend_client::QueryStats) -> Self {
86 let mut p = Self {
87 total_rows: 0,
88 total_bytes: 0,
89 read_rows: stats.progresses.scan_progress.rows,
90 read_bytes: stats.progresses.scan_progress.bytes,
91 write_rows: stats.progresses.write_progress.rows,
92 write_bytes: stats.progresses.write_progress.bytes,
93 spill_file_nums: stats.progresses.spill_progress.file_nums,
94 spill_bytes: stats.progresses.spill_progress.bytes,
95 running_time_ms: stats.running_time_ms,
96 };
97 if let Some(total) = stats.progresses.total_scan {
98 p.total_rows = total.rows;
99 p.total_bytes = total.bytes;
100 }
101 p
102 }
103}
104
105#[derive(Clone, Debug, Default)]
106pub struct Row {
107 schema: SchemaRef,
108 values: Vec<Value>,
109}
110
111impl Row {
112 pub fn new(schema: SchemaRef, values: Vec<Value>) -> Self {
113 Self { schema, values }
114 }
115
116 pub fn len(&self) -> usize {
117 self.values.len()
118 }
119
120 pub fn is_empty(&self) -> bool {
121 self.values.is_empty()
122 }
123
124 pub fn values(&self) -> &[Value] {
125 &self.values
126 }
127
128 pub fn schema(&self) -> SchemaRef {
129 self.schema.clone()
130 }
131
132 pub fn from_vec(schema: SchemaRef, values: Vec<Value>) -> Self {
133 Self { schema, values }
134 }
135}
136
137impl TryFrom<(SchemaRef, Vec<Option<String>>, &ResultFormatSettings)> for Row {
138 type Error = Error;
139
140 fn try_from(
141 (schema, data, settings): (SchemaRef, Vec<Option<String>>, &ResultFormatSettings),
142 ) -> Result<Self> {
143 let mut values: Vec<Value> = Vec::with_capacity(data.len());
144 for (field, val) in schema.fields().iter().zip(data) {
145 values.push(Value::try_from((&field.data_type, val, settings))?);
146 }
147 Ok(Self::new(schema, values))
148 }
149}
150
151impl IntoIterator for Row {
152 type Item = Value;
153 type IntoIter = std::vec::IntoIter<Self::Item>;
154
155 fn into_iter(self) -> Self::IntoIter {
156 self.values.into_iter()
157 }
158}
159
160#[derive(Clone, Debug)]
161pub struct Rows {
162 rows: Vec<Row>,
163}
164
165impl Rows {
166 pub fn new(rows: Vec<Row>) -> Self {
167 Self { rows }
168 }
169
170 pub fn rows(&self) -> &[Row] {
175 &self.rows
176 }
177
178 pub fn len(&self) -> usize {
179 self.rows.len()
180 }
181
182 pub fn is_empty(&self) -> bool {
183 self.rows.is_empty()
184 }
185}
186
187impl TryFrom<(RecordBatch, ResultFormatSettings)> for Rows {
188 type Error = Error;
189 fn try_from((batch, settings): (RecordBatch, ResultFormatSettings)) -> Result<Self> {
190 let batch_schema = batch.schema();
191 let schema = SchemaRef::new(batch_schema.clone().try_into()?);
192 let mut rows: Vec<Row> = Vec::new();
193 for i in 0..batch.num_rows() {
194 let mut values: Vec<Value> = Vec::new();
195 for j in 0..batch_schema.fields().len() {
196 let v = batch.column(j);
197 let field = batch_schema.field(j);
198 let value = Value::try_from((field, v, i, &settings))?;
199 values.push(value);
200 }
201 rows.push(Row::new(schema.clone(), values));
202 }
203 Ok(Self::new(rows))
204 }
205}
206
207impl IntoIterator for Rows {
208 type Item = Row;
209 type IntoIter = std::vec::IntoIter<Self::Item>;
210
211 fn into_iter(self) -> Self::IntoIter {
212 self.rows.into_iter()
213 }
214}
215
/// Swallows one token tree and expands to the expression that follows it.
///
/// Useful inside a macro repetition to emit a fixed expression once per
/// repeated token, regardless of what that token is.
macro_rules! replace_expr {
    ($_discarded:tt $replacement:expr) => {
        $replacement
    };
}
221
/// Implements `TryFrom<Row>` for a tuple of the given type parameters.
///
/// Every element type must implement `TryFrom<Value>`. The generated
/// conversion fails with a `String` message when the row's length differs
/// from the tuple's arity, or when any column fails to convert to its
/// element type.
macro_rules! impl_tuple_from_row {
    ( $($Ti:tt),+ ) => {
        impl<$($Ti),+> TryFrom<Row> for ($($Ti,)+)
        where
            $($Ti: TryFrom<Value>),+
        {
            type Error = String;

            fn try_from(row: Row) -> Result<Self, String> {
                // One `()` per tuple element: the array's length is the arity.
                let arity = <[()]>::len(&[$(replace_expr!(($Ti) ())),*]);
                let actual = row.len();
                if actual != arity {
                    return Err(format!("row size mismatch: expected {} columns, got {}", arity, actual));
                }

                let mut columns = row.into_iter().enumerate();
                Ok((
                    $(
                        {
                            // The length check above guarantees one value per
                            // tuple element, so `next()` cannot return `None`.
                            let (index, value) = columns.next().unwrap();
                            // Capture the source type before the value is consumed.
                            let source_type = value.get_type();
                            match $Ti::try_from(value) {
                                Ok(converted) => converted,
                                Err(_) => {
                                    return Err(format!("failed converting column {} from type({:?}) to type({})", index, source_type, std::any::type_name::<$Ti>()));
                                }
                            }
                        }
                    ,)+
                ))
            }
        }
    }
}
258
259impl_tuple_from_row!(T1);
261impl_tuple_from_row!(T1, T2);
262impl_tuple_from_row!(T1, T2, T3);
263impl_tuple_from_row!(T1, T2, T3, T4);
264impl_tuple_from_row!(T1, T2, T3, T4, T5);
265impl_tuple_from_row!(T1, T2, T3, T4, T5, T6);
266impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7);
267impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8);
268impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9);
269impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10);
270impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11);
271impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12);
272impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13);
273impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14);
274impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15);
275impl_tuple_from_row!(T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16);
276
277pub struct RowIterator {
278 schema: SchemaRef,
279 it: Option<Pin<Box<dyn Stream<Item = Result<Row>> + Send>>>,
280}
281
282impl RowIterator {
283 pub fn new(schema: SchemaRef, it: Pin<Box<dyn Stream<Item = Result<Row>> + Send>>) -> Self {
284 Self {
285 schema,
286 it: Some(it),
287 }
288 }
289
290 pub fn schema(&self) -> SchemaRef {
291 self.schema.clone()
292 }
293
294 pub async fn try_collect<T>(mut self) -> Result<Vec<T>>
295 where
296 T: TryFrom<Row>,
297 T::Error: std::fmt::Display,
298 {
299 if let Some(it) = &mut self.it {
300 let mut ret = Vec::new();
301 while let Some(row) = it.next().await {
302 let v = T::try_from(row?).map_err(|e| Error::Parsing(e.to_string()))?;
303 ret.push(v)
304 }
305 Ok(ret)
306 } else {
307 Err(Error::BadArgument("RowIterator already closed".to_owned()))
308 }
309 }
310
311 pub fn close(&mut self) {
312 self.it = None;
313 }
314}
315
316impl Stream for RowIterator {
317 type Item = Result<Row>;
318
319 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
320 if let Some(it) = self.it.as_mut() {
321 Pin::new(it).poll_next(cx)
322 } else {
323 Poll::Ready(Some(Err(Error::BadArgument(
324 "RowIterator already closed".to_owned(),
325 ))))
326 }
327 }
328}
329
330pub struct RowStatsIterator {
331 schema: SchemaRef,
332 it: Option<Pin<Box<dyn Stream<Item = Result<RowWithStats>> + Send>>>,
333}
334
335impl RowStatsIterator {
336 pub fn new(
337 schema: SchemaRef,
338 it: Pin<Box<dyn Stream<Item = Result<RowWithStats>> + Send>>,
339 ) -> Self {
340 Self {
341 schema,
342 it: Some(it),
343 }
344 }
345
346 pub fn schema(&self) -> SchemaRef {
347 self.schema.clone()
348 }
349
350 pub async fn filter_rows(self) -> Result<RowIterator> {
351 if let Some(it) = self.it {
352 let it = it.filter_map(|r| match r {
353 Ok(RowWithStats::Row(r)) => Some(Ok(r)),
354 Ok(_) => None,
355 Err(err) => Some(Err(err)),
356 });
357 Ok(RowIterator::new(self.schema, Box::pin(it)))
358 } else {
359 Err(Error::BadArgument("RowIterator already closed".to_owned()))
360 }
361 }
362
363 pub fn close(&mut self) {
364 self.it = None;
365 }
366}
367
368impl Stream for RowStatsIterator {
369 type Item = Result<RowWithStats>;
370
371 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
372 if let Some(it) = self.it.as_mut() {
373 Pin::new(it).poll_next(cx)
374 } else {
375 Poll::Ready(Some(Err(Error::BadArgument(
376 "RowStatsIterator already closed".to_owned(),
377 ))))
378 }
379 }
380}