// clickhouse/cursors/data_row.rs

1use std::sync::Arc;
2
3#[cfg(feature = "arrow")]
4use sea_orm_arrow::arrow;
5
6use clickhouse_types::error::TypesError;
7use clickhouse_types::{DataTypeNode, parse_rbwnat_columns_header};
8
9use crate::{
10    bytes_ext::BytesExt,
11    cursors::RawCursor,
12    data_row::{DataRow, RowBatch},
13    error::{Error, Result},
14    response::Response,
15    rowbinary::value_de::decode_row,
16};
17
/// A cursor that emits dynamically-typed [`DataRow`]s decoded from
/// `RowBinaryWithNamesAndTypes`.
///
/// Obtain one via [`crate::query::Query::fetch_rows`].
#[must_use]
pub struct DataRowCursor {
    /// Source of raw response chunks from the server.
    raw: RawCursor,
    /// Buffer of received-but-not-yet-decoded bytes.
    bytes: BytesExt,
    /// Column names, shared across all rows from this cursor.
    /// `None` until the RBWNAT header has been read.
    columns: Option<Arc<[Arc<str>]>>,
    /// Column types parsed once from the RBWNAT header.
    /// `None` until the RBWNAT header has been read.
    column_types: Option<Arc<[DataTypeNode]>>,
    /// Arrow schema derived from the RBWNAT header; populated together
    /// with `column_types` when the header is read.
    #[cfg(feature = "arrow")]
    arrow_schema: Option<Arc<arrow::datatypes::Schema>>,
}
33
34impl DataRowCursor {
35    pub(crate) fn new(response: Response) -> Self {
36        Self {
37            raw: RawCursor::new(response),
38            bytes: BytesExt::default(),
39            columns: None,
40            column_types: None,
41            #[cfg(feature = "arrow")]
42            arrow_schema: None,
43        }
44    }
45
46    /// Returns column names after the header has been read (i.e., after the
47    /// first [`next`] call returns).
48    ///
49    /// [`next`]: DataRowCursor::next
50    pub fn columns(&self) -> Option<&[Arc<str>]> {
51        self.columns.as_deref()
52    }
53
54    /// Returns the total size in bytes received from the CH server since the
55    /// cursor was created.
56    ///
57    /// This counts only the payload size (no HTTP headers).
58    #[inline]
59    pub fn received_bytes(&self) -> u64 {
60        self.raw.received_bytes()
61    }
62
63    /// Returns the total size in bytes decompressed since the cursor was created.
64    #[inline]
65    pub fn decoded_bytes(&self) -> u64 {
66        self.raw.decoded_bytes()
67    }
68
    /// Emits the next row.
    ///
    /// Returns `Ok(None)` when all rows have been consumed.
    /// The result is unspecified if called after an `Err` is returned.
    ///
    /// # Cancel safety
    ///
    /// This method is cancellation safe.
    pub async fn next(&mut self) -> Result<Option<DataRow>> {
        // Lazily parse the RBWNAT columns header on the first call; this
        // also populates `self.columns` (and the Arrow schema when enabled).
        if self.column_types.is_none() {
            self.read_header().await?;
        }

        // Clone the Arcs once per call — two cheap pointer increments — so
        // neither reference borrows `self` across the `.await` below.
        let column_types = self
            .column_types
            .as_ref()
            .expect("just initialised")
            .clone();
        let column_names = self.columns.as_ref().expect("just initialised").clone();

        loop {
            if self.bytes.remaining() > 0 {
                let mut slice = self.bytes.slice();
                match decode_row(&mut slice, &*column_types) {
                    Ok(values) => {
                        // `decode_row` advanced `slice` past the consumed row;
                        // record what's left so the next call resumes there.
                        // `set_remaining` takes `&self` (interior Cell), so holding
                        // the immutable `slice` borrow is fine here.
                        self.bytes.set_remaining(slice.len());
                        return Ok(Some(DataRow {
                            column_names,
                            column_types,
                            values,
                        }));
                    }
                    Err(Error::NotEnoughData) => {
                        // A row straddles a chunk boundary; the buffered bytes
                        // stay untouched. Fall through to fetch more data below.
                    }
                    Err(err) => return Err(err),
                }
            }

            // `slice` is dropped here, so the immutable borrow on `self.bytes`
            // ends before we call `extend` (which needs `&mut self.bytes`).
            match self.raw.next().await? {
                Some(chunk) => self.bytes.extend(chunk),
                None if self.bytes.remaining() > 0 => {
                    // Partial row at EOF — usually a schema/type mismatch.
                    return Err(Error::NotEnoughData);
                }
                None => return Ok(None),
            }
        }
    }
124
125    /// Reads up to `max_rows` rows and returns them as a column-oriented [`RowBatch`].
126    ///
127    /// Returns `Ok(None)` when all rows have been consumed.
128    /// The result is unspecified if called after an `Err` is returned.
129    pub async fn next_batch(&mut self, max_rows: usize) -> Result<Option<RowBatch>> {
130        if self.column_types.is_none() {
131            self.read_header().await?;
132        }
133
134        let column_count = self.column_types.as_ref().expect("just initialised").len();
135
136        let mut column_data: Vec<Vec<sea_query::Value>> = (0..column_count)
137            .map(|_| Vec::with_capacity(max_rows))
138            .collect();
139        let mut num_rows = 0;
140
141        while num_rows < max_rows {
142            match self.next().await? {
143                Some(row) => {
144                    for (col, value) in column_data.iter_mut().zip(row.values) {
145                        col.push(value);
146                    }
147                    num_rows += 1;
148                }
149                None => break,
150            }
151        }
152
153        if num_rows == 0 {
154            return Ok(None);
155        }
156
157        let column_names = self.columns.as_ref().expect("header was read").clone();
158        let column_types = self.column_types.as_ref().expect("header was read").clone();
159        Ok(Some(RowBatch {
160            column_names,
161            column_types,
162            column_data,
163            num_rows,
164        }))
165    }
166
167    /// Reads up to `max_rows` rows and returns them as an Arrow [`RecordBatch`].
168    ///
169    /// The schema is derived from the `RowBinaryWithNamesAndTypes` header using
170    /// [`crate::arrow::schema::from_columns`] and is available on the returned
171    /// batch via [`RecordBatch::schema`].
172    ///
173    /// Returns `Ok(None)` when all rows have been consumed.
174    /// The result is unspecified if called after an `Err` is returned.
175    ///
176    /// [`RecordBatch`]: sea_orm_arrow::arrow::array::RecordBatch
177    #[cfg(feature = "arrow")]
178    pub async fn next_arrow_batch(
179        &mut self,
180        max_rows: usize,
181    ) -> Result<Option<sea_orm_arrow::arrow::array::RecordBatch>> {
182        use std::sync::Arc;
183
184        let batch = match self.next_batch(max_rows).await? {
185            Some(b) => b,
186            None => return Ok(None),
187        };
188
189        let schema = self
190            .arrow_schema
191            .as_ref()
192            .expect("header was read by next_batch")
193            .clone();
194
195        let columns = schema
196            .fields()
197            .iter()
198            .zip(batch.column_data.iter())
199            .map(
200                |(field, values): (&Arc<arrow::datatypes::Field>, &Vec<sea_query::Value>)| {
201                    sea_orm_arrow::values_to_arrow_array(values, field.data_type())
202                        .map_err(|e| crate::error::Error::Other(Box::new(e)))
203                },
204            )
205            .collect::<Result<Vec<_>>>()?;
206
207        sea_orm_arrow::arrow::array::RecordBatch::try_new(schema, columns)
208            .map_err(|e| crate::error::Error::Other(Box::new(e)))
209            .map(Some)
210    }
211
    /// Reads and parses the `RowBinaryWithNamesAndTypes` columns header,
    /// populating `self.columns`, `self.column_types` and (with the `arrow`
    /// feature) `self.arrow_schema`.
    ///
    /// Cold path: runs at most once per cursor, before the first row.
    #[cold]
    #[inline(never)]
    async fn read_header(&mut self) -> Result<()> {
        loop {
            if self.bytes.remaining() > 0 {
                let mut slice = self.bytes.slice();
                match parse_rbwnat_columns_header(&mut slice) {
                    Ok(cols) if !cols.is_empty() => {
                        // Header fully parsed; keep only the bytes after it
                        // (those belong to the row data).
                        self.bytes.set_remaining(slice.len());
                        #[cfg(feature = "arrow")]
                        {
                            self.arrow_schema =
                                Some(Arc::new(crate::arrow::schema::from_columns(&cols)));
                        }
                        // Names are borrowed into shared `Arc<str>`s first,
                        // then `cols` is consumed to move out the types.
                        let columns: Arc<[Arc<str>]> =
                            cols.iter().map(|c| Arc::from(c.name.as_str())).collect();
                        let types: Arc<[DataTypeNode]> =
                            cols.into_iter().map(|c| c.data_type).collect();
                        self.columns = Some(columns);
                        self.column_types = Some(types);
                        return Ok(());
                    }
                    Ok(_) => {
                        // A header with zero columns cannot describe rows.
                        return Err(Error::BadResponse(
                            "Expected at least one column in the header".to_string(),
                        ));
                    }
                    Err(TypesError::NotEnoughData(_)) => {
                        // Header straddles a chunk boundary; need more bytes —
                        // fall through to fetch.
                    }
                    Err(err) => {
                        return Err(Error::InvalidColumnsHeader(err.into()));
                    }
                }
            }

            // Pull the next raw chunk from the response stream.
            match self.raw.next().await? {
                Some(chunk) => self.bytes.extend(chunk),
                None => {
                    // Stream ended before a complete header was received.
                    return Err(Error::BadResponse(
                        "Could not read columns header".to_string(),
                    ));
                }
            }
        }
    }
258}