clickhouse/cursors/
row.rs1#[cfg(feature = "futures03")]
2use crate::RowOwned;
3use crate::row_metadata::RowMetadata;
4use crate::{
5 RowRead,
6 bytes_ext::BytesExt,
7 cursors::RawCursor,
8 error::{Error, Result},
9 response::Response,
10 rowbinary,
11};
12use bytes::Buf;
13use clickhouse_types::error::TypesError;
14use clickhouse_types::parse_rbwnat_columns_header;
15use polonius_the_crab::prelude::*;
16use std::marker::PhantomData;
17use std::pin::Pin;
18use std::task::{Context, Poll, ready};
19
20#[must_use]
22pub struct RowCursor<T> {
23 raw: RawCursor,
24 bytes: BytesExt,
25 validation: bool,
26 row_metadata: Option<RowMetadata>,
29 _marker: PhantomData<fn() -> T>,
30}
31
32impl<T> RowCursor<T> {
33 pub(crate) fn new(response: Response, validation: bool) -> Self {
34 Self {
35 _marker: PhantomData,
36 raw: RawCursor::new(response),
37 bytes: BytesExt::default(),
38 row_metadata: None,
39 validation,
40 }
41 }
42
43 #[cold]
44 #[inline(never)]
45 fn poll_read_columns(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>
46 where
47 T: RowRead,
48 {
49 loop {
50 if self.bytes.remaining() > 0 {
51 let mut slice = self.bytes.slice();
52
53 match parse_rbwnat_columns_header(&mut slice) {
55 Ok(columns) if !columns.is_empty() => {
56 self.bytes.set_remaining(slice.len());
57 let row_metadata = RowMetadata::new_for_cursor::<T>(columns)?;
58 self.row_metadata = Some(row_metadata);
59 return Poll::Ready(Ok(()));
60 }
61 Ok(_) => {
62 return Poll::Ready(Err(Error::BadResponse(
66 "Expected at least one column in the header".to_string(),
67 )));
68 }
69 Err(TypesError::NotEnoughData(_)) => {}
70 Err(err) => {
71 return Poll::Ready(Err(Error::InvalidColumnsHeader(err.into())));
72 }
73 }
74 }
75 match ready!(self.raw.poll_next(cx))? {
76 Some(chunk) => self.bytes.extend(chunk),
77 None if self.row_metadata.is_none() => {
78 return Poll::Ready(Err(Error::BadResponse(
80 "Could not read columns header".to_string(),
81 )));
82 }
83 None => return Poll::Ready(Ok(())),
85 }
86 }
87 }
88
89 pub async fn next(&mut self) -> Result<Option<T::Value<'_>>>
97 where
98 T: RowRead,
99 {
100 Next::new(self).await
101 }
102
103 #[inline]
104 fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<T::Value<'_>>>>
105 where
106 T: RowRead,
107 {
108 if self.validation && self.row_metadata.is_none() {
109 ready!(self.poll_read_columns(cx))?;
110 debug_assert!(self.row_metadata.is_some());
111 }
112
113 let mut bytes = &mut self.bytes;
114
115 loop {
116 polonius!(|bytes| -> Poll<Result<Option<T::Value<'polonius>>>> {
117 if bytes.remaining() > 0 {
118 let mut slice = bytes.slice();
119 let result = rowbinary::deserialize_row::<T::Value<'_>>(
120 &mut slice,
121 self.row_metadata.as_ref(),
122 );
123
124 match result {
125 Ok(value) => {
126 bytes.set_remaining(slice.len());
127 polonius_return!(Poll::Ready(Ok(Some(value))))
128 }
129 Err(Error::NotEnoughData) => {}
130 Err(err) => polonius_return!(Poll::Ready(Err(err))),
131 }
132 }
133 });
134
135 match ready!(self.raw.poll_next(cx))? {
136 Some(chunk) => bytes.extend(chunk),
137 None if bytes.remaining() > 0 => {
138 return Poll::Ready(Err(Error::NotEnoughData));
141 }
142 None => return Poll::Ready(Ok(None)),
143 }
144 }
145 }
146
147 #[inline]
153 pub fn received_bytes(&self) -> u64 {
154 self.raw.received_bytes()
155 }
156
157 #[inline]
159 pub fn decoded_bytes(&self) -> u64 {
160 self.raw.decoded_bytes()
161 }
162}
163
/// Exposes the cursor as a `futures_util` stream of owned rows.
///
/// Only compiled when the `futures03` feature is enabled.
#[cfg(feature = "futures03")]
impl<T> futures_util::stream::Stream for RowCursor<T>
where
    T: RowOwned + RowRead,
{
    type Item = Result<T>;

    /// Delegates to the inherent `RowCursor::poll_next` and converts its
    /// `Result<Option<_>>` into the `Option<Result<_>>` shape a stream expects.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let cursor = self.get_mut();
        let polled = RowCursor::poll_next(cursor, cx);
        polled.map(Result::transpose)
    }
}
178
179struct Next<'a, T> {
180 cursor: Option<&'a mut RowCursor<T>>,
181}
182
183impl<'a, T> Next<'a, T> {
184 fn new(cursor: &'a mut RowCursor<T>) -> Self {
185 Self {
186 cursor: Some(cursor),
187 }
188 }
189}
190
191impl<'a, T> std::future::Future for Next<'a, T>
192where
193 T: RowRead,
194{
195 type Output = Result<Option<T::Value<'a>>>;
196
197 #[inline]
198 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
199 let mut cursor = self.cursor.take().expect("Future polled after completion");
202
203 polonius!(|cursor| -> Poll<Result<Option<T::Value<'polonius>>>> {
204 match cursor.poll_next(cx) {
205 Poll::Ready(value) => polonius_return!(Poll::Ready(value)),
206 Poll::Pending => {}
207 }
208 });
209
210 self.cursor = Some(cursor);
211 Poll::Pending
212 }
213}