// clickhouse/cursors/row.rs

1#[cfg(feature = "futures03")]
2use crate::RowOwned;
3use crate::row_metadata::RowMetadata;
4use crate::{
5    RowRead,
6    bytes_ext::BytesExt,
7    cursors::RawCursor,
8    error::{Error, Result},
9    response::Response,
10    rowbinary,
11};
12use bytes::Buf;
13use clickhouse_types::error::TypesError;
14use clickhouse_types::parse_rbwnat_columns_header;
15use polonius_the_crab::prelude::*;
16use std::marker::PhantomData;
17use std::pin::Pin;
18use std::task::{Context, Poll, ready};
19
20/// A cursor that emits rows deserialized as structures from RowBinary.
21#[must_use]
22pub struct RowCursor<T> {
23    raw: RawCursor,
24    bytes: BytesExt,
25    validation: bool,
26    /// [`None`] until the first call to [`RowCursor::next()`],
27    /// as [`RowCursor::new`] is not `async`, so it loads lazily.
28    row_metadata: Option<RowMetadata>,
29    _marker: PhantomData<fn() -> T>,
30}
31
32impl<T> RowCursor<T> {
33    pub(crate) fn new(response: Response, validation: bool) -> Self {
34        Self {
35            _marker: PhantomData,
36            raw: RawCursor::new(response),
37            bytes: BytesExt::default(),
38            row_metadata: None,
39            validation,
40        }
41    }
42
43    #[cold]
44    #[inline(never)]
45    fn poll_read_columns(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>
46    where
47        T: RowRead,
48    {
49        loop {
50            if self.bytes.remaining() > 0 {
51                let mut slice = self.bytes.slice();
52
53                // Can't pass `&mut self.bytes` because the parsing may partially consume the buffer
54                match parse_rbwnat_columns_header(&mut slice) {
55                    Ok(columns) if !columns.is_empty() => {
56                        self.bytes.set_remaining(slice.len());
57                        let row_metadata = RowMetadata::new_for_cursor::<T>(columns)?;
58                        self.row_metadata = Some(row_metadata);
59                        return Poll::Ready(Ok(()));
60                    }
61                    Ok(_) => {
62                        // This does not panic, as it could be a network issue
63                        // or a malformed response from the server or LB,
64                        // and a simple retry might help in certain cases.
65                        return Poll::Ready(Err(Error::BadResponse(
66                            "Expected at least one column in the header".to_string(),
67                        )));
68                    }
69                    Err(TypesError::NotEnoughData(_)) => {}
70                    Err(err) => {
71                        return Poll::Ready(Err(Error::InvalidColumnsHeader(err.into())));
72                    }
73                }
74            }
75            match ready!(self.raw.poll_next(cx))? {
76                Some(chunk) => self.bytes.extend(chunk),
77                None if self.row_metadata.is_none() => {
78                    // Similar to the other BadResponse branch above
79                    return Poll::Ready(Err(Error::BadResponse(
80                        "Could not read columns header".to_string(),
81                    )));
82                }
83                // if the result set is empty, there is only the columns header
84                None => return Poll::Ready(Ok(())),
85            }
86        }
87    }
88
89    /// Emits the next row.
90    ///
91    /// The result is unspecified if it's called after `Err` is returned.
92    ///
93    /// # Cancel safety
94    ///
95    /// This method is cancellation safe.
96    pub async fn next(&mut self) -> Result<Option<T::Value<'_>>>
97    where
98        T: RowRead,
99    {
100        Next::new(self).await
101    }
102
103    #[inline]
104    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<T::Value<'_>>>>
105    where
106        T: RowRead,
107    {
108        if self.validation && self.row_metadata.is_none() {
109            ready!(self.poll_read_columns(cx))?;
110            debug_assert!(self.row_metadata.is_some());
111        }
112
113        let mut bytes = &mut self.bytes;
114
115        loop {
116            polonius!(|bytes| -> Poll<Result<Option<T::Value<'polonius>>>> {
117                if bytes.remaining() > 0 {
118                    let mut slice = bytes.slice();
119                    let result = rowbinary::deserialize_row::<T::Value<'_>>(
120                        &mut slice,
121                        self.row_metadata.as_ref(),
122                    );
123
124                    match result {
125                        Ok(value) => {
126                            bytes.set_remaining(slice.len());
127                            polonius_return!(Poll::Ready(Ok(Some(value))))
128                        }
129                        Err(Error::NotEnoughData) => {}
130                        Err(err) => polonius_return!(Poll::Ready(Err(err))),
131                    }
132                }
133            });
134
135            match ready!(self.raw.poll_next(cx))? {
136                Some(chunk) => bytes.extend(chunk),
137                None if bytes.remaining() > 0 => {
138                    // If some data is left, we have an incomplete row in the buffer.
139                    // This is usually a schema mismatch on the client side.
140                    return Poll::Ready(Err(Error::NotEnoughData));
141                }
142                None => return Poll::Ready(Ok(None)),
143            }
144        }
145    }
146
147    /// Returns the total size in bytes received from the CH server since
148    /// the cursor was created.
149    ///
150    /// This method counts only size without HTTP headers for now.
151    /// It can be changed in the future without notice.
152    #[inline]
153    pub fn received_bytes(&self) -> u64 {
154        self.raw.received_bytes()
155    }
156
157    /// Returns the total size in bytes decompressed since the cursor was created.
158    #[inline]
159    pub fn decoded_bytes(&self) -> u64 {
160        self.raw.decoded_bytes()
161    }
162}
163
/// Exposes the cursor as a stream of owned rows.
#[cfg(feature = "futures03")]
impl<T> futures_util::stream::Stream for RowCursor<T>
where
    T: RowOwned + RowRead,
{
    type Item = Result<T>;

    /// Polls the cursor for the next row and converts the
    /// `Result<Option<_>>` it produces into the `Option<Result<_>>` shape
    /// expected from a stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let cursor = self.get_mut();

        // This calls the inherent `RowCursor::poll_next`, which takes a plain
        // `&mut RowCursor<T>`, not this trait method.
        match Self::poll_next(cursor, cx) {
            Poll::Ready(row) => Poll::Ready(row.transpose()),
            Poll::Pending => Poll::Pending,
        }
    }
}
178
179struct Next<'a, T> {
180    cursor: Option<&'a mut RowCursor<T>>,
181}
182
183impl<'a, T> Next<'a, T> {
184    fn new(cursor: &'a mut RowCursor<T>) -> Self {
185        Self {
186            cursor: Some(cursor),
187        }
188    }
189}
190
191impl<'a, T> std::future::Future for Next<'a, T>
192where
193    T: RowRead,
194{
195    type Output = Result<Option<T::Value<'a>>>;
196
197    #[inline]
198    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
199        // Temporarily take the cursor out in order for `cursor.poll_next` to return a value with
200        // the correct lifetime `'a` rather than the unnamed lifetime of `&mut self`.
201        let mut cursor = self.cursor.take().expect("Future polled after completion");
202
203        polonius!(|cursor| -> Poll<Result<Option<T::Value<'polonius>>>> {
204            match cursor.poll_next(cx) {
205                Poll::Ready(value) => polonius_return!(Poll::Ready(value)),
206                Poll::Pending => {}
207            }
208        });
209
210        self.cursor = Some(cursor);
211        Poll::Pending
212    }
213}