clickhouse/cursors/
data_row.rs1use std::sync::Arc;
2
3#[cfg(feature = "arrow")]
4use sea_orm_arrow::arrow;
5
6use clickhouse_types::error::TypesError;
7use clickhouse_types::{DataTypeNode, parse_rbwnat_columns_header};
8
9use crate::{
10 bytes_ext::BytesExt,
11 cursors::RawCursor,
12 data_row::{DataRow, RowBatch},
13 error::{Error, Result},
14 response::Response,
15 rowbinary::value_de::decode_row,
16};
17
/// Streaming cursor over the rows of a ClickHouse response.
///
/// Lazily parses the columns header (names + types) via
/// `parse_rbwnat_columns_header` on first use, then decodes rows one at a
/// time from the buffered response bytes.
#[must_use]
pub struct DataRowCursor {
    // Underlying cursor over the raw response byte chunks.
    raw: RawCursor,
    // Buffered, not-yet-decoded bytes carried across chunk boundaries.
    bytes: BytesExt,
    // Column names from the response header; `None` until the header is read.
    columns: Option<Arc<[Arc<str>]>>,
    // Column data types from the response header; `None` until the header is read.
    column_types: Option<Arc<[DataTypeNode]>>,
    // Arrow schema derived from the header columns (only with the `arrow` feature).
    #[cfg(feature = "arrow")]
    arrow_schema: Option<Arc<arrow::datatypes::Schema>>,
}
33
34impl DataRowCursor {
35 pub(crate) fn new(response: Response) -> Self {
36 Self {
37 raw: RawCursor::new(response),
38 bytes: BytesExt::default(),
39 columns: None,
40 column_types: None,
41 #[cfg(feature = "arrow")]
42 arrow_schema: None,
43 }
44 }
45
46 pub fn columns(&self) -> Option<&[Arc<str>]> {
51 self.columns.as_deref()
52 }
53
54 #[inline]
59 pub fn received_bytes(&self) -> u64 {
60 self.raw.received_bytes()
61 }
62
63 #[inline]
65 pub fn decoded_bytes(&self) -> u64 {
66 self.raw.decoded_bytes()
67 }
68
69 pub async fn next(&mut self) -> Result<Option<DataRow>> {
78 if self.column_types.is_none() {
79 self.read_header().await?;
80 }
81
82 let column_types = self
85 .column_types
86 .as_ref()
87 .expect("just initialised")
88 .clone();
89 let column_names = self.columns.as_ref().expect("just initialised").clone();
90
91 loop {
92 if self.bytes.remaining() > 0 {
93 let mut slice = self.bytes.slice();
94 match decode_row(&mut slice, &*column_types) {
95 Ok(values) => {
96 self.bytes.set_remaining(slice.len());
99 return Ok(Some(DataRow {
100 column_names,
101 column_types,
102 values,
103 }));
104 }
105 Err(Error::NotEnoughData) => {
106 }
108 Err(err) => return Err(err),
109 }
110 }
111
112 match self.raw.next().await? {
115 Some(chunk) => self.bytes.extend(chunk),
116 None if self.bytes.remaining() > 0 => {
117 return Err(Error::NotEnoughData);
119 }
120 None => return Ok(None),
121 }
122 }
123 }
124
125 pub async fn next_batch(&mut self, max_rows: usize) -> Result<Option<RowBatch>> {
130 if self.column_types.is_none() {
131 self.read_header().await?;
132 }
133
134 let column_count = self.column_types.as_ref().expect("just initialised").len();
135
136 let mut column_data: Vec<Vec<sea_query::Value>> = (0..column_count)
137 .map(|_| Vec::with_capacity(max_rows))
138 .collect();
139 let mut num_rows = 0;
140
141 while num_rows < max_rows {
142 match self.next().await? {
143 Some(row) => {
144 for (col, value) in column_data.iter_mut().zip(row.values) {
145 col.push(value);
146 }
147 num_rows += 1;
148 }
149 None => break,
150 }
151 }
152
153 if num_rows == 0 {
154 return Ok(None);
155 }
156
157 let column_names = self.columns.as_ref().expect("header was read").clone();
158 let column_types = self.column_types.as_ref().expect("header was read").clone();
159 Ok(Some(RowBatch {
160 column_names,
161 column_types,
162 column_data,
163 num_rows,
164 }))
165 }
166
167 #[cfg(feature = "arrow")]
178 pub async fn next_arrow_batch(
179 &mut self,
180 max_rows: usize,
181 ) -> Result<Option<sea_orm_arrow::arrow::array::RecordBatch>> {
182 use std::sync::Arc;
183
184 let batch = match self.next_batch(max_rows).await? {
185 Some(b) => b,
186 None => return Ok(None),
187 };
188
189 let schema = self
190 .arrow_schema
191 .as_ref()
192 .expect("header was read by next_batch")
193 .clone();
194
195 let columns = schema
196 .fields()
197 .iter()
198 .zip(batch.column_data.iter())
199 .map(
200 |(field, values): (&Arc<arrow::datatypes::Field>, &Vec<sea_query::Value>)| {
201 sea_orm_arrow::values_to_arrow_array(values, field.data_type())
202 .map_err(|e| crate::error::Error::Other(Box::new(e)))
203 },
204 )
205 .collect::<Result<Vec<_>>>()?;
206
207 sea_orm_arrow::arrow::array::RecordBatch::try_new(schema, columns)
208 .map_err(|e| crate::error::Error::Other(Box::new(e)))
209 .map(Some)
210 }
211
212 #[cold]
213 #[inline(never)]
214 async fn read_header(&mut self) -> Result<()> {
215 loop {
216 if self.bytes.remaining() > 0 {
217 let mut slice = self.bytes.slice();
218 match parse_rbwnat_columns_header(&mut slice) {
219 Ok(cols) if !cols.is_empty() => {
220 self.bytes.set_remaining(slice.len());
221 #[cfg(feature = "arrow")]
222 {
223 self.arrow_schema =
224 Some(Arc::new(crate::arrow::schema::from_columns(&cols)));
225 }
226 let columns: Arc<[Arc<str>]> =
227 cols.iter().map(|c| Arc::from(c.name.as_str())).collect();
228 let types: Arc<[DataTypeNode]> =
229 cols.into_iter().map(|c| c.data_type).collect();
230 self.columns = Some(columns);
231 self.column_types = Some(types);
232 return Ok(());
233 }
234 Ok(_) => {
235 return Err(Error::BadResponse(
236 "Expected at least one column in the header".to_string(),
237 ));
238 }
239 Err(TypesError::NotEnoughData(_)) => {
240 }
242 Err(err) => {
243 return Err(Error::InvalidColumnsHeader(err.into()));
244 }
245 }
246 }
247
248 match self.raw.next().await? {
249 Some(chunk) => self.bytes.extend(chunk),
250 None => {
251 return Err(Error::BadResponse(
252 "Could not read columns header".to_string(),
253 ));
254 }
255 }
256 }
257 }
258}