Skip to main content

clickhouse/cursors/
bytes.rs

1use crate::{cursors::RawCursor, error::Result, response::Response};
2use bytes::{Buf, Bytes, BytesMut};
3use std::{
4    io::Result as IoResult,
5    pin::Pin,
6    task::{Context, Poll, ready},
7};
8use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf};
9
10/// A cursor over raw bytes of the response returned by [`Query::fetch_bytes`].
11///
12/// Unlike [`RowCursor`] which emits rows deserialized as structures from
13/// RowBinary, this cursor emits raw bytes without deserialization.
14///
15/// # Integration
16///
17/// Additionally to [`BytesCursor::next`] and [`BytesCursor::collect`],
18/// this cursor implements:
19/// * [`AsyncRead`] and [`AsyncBufRead`] for `tokio`-based ecosystem.
20/// * [`futures_util::Stream`], [`futures_util::AsyncRead`] and
21///   [`futures_util::AsyncBufRead`] for `futures`-based ecosystem.
22///   (requires the `futures03` feature)
23///
24/// For instance, if the requested format emits each row on a newline
25/// (e.g. `JSONEachRow`, `CSV`, `TSV`, etc.), the cursor can be read line by
26/// line using `AsyncBufReadExt::lines`. Note that this method
27/// produces a new `String` for each line, so it's not the most performant way
28/// to iterate.
29///
30/// Note: methods of these traits use [`std::io::Error`] for errors.
31/// To get an original error from this crate, use `From` conversion.
32///
33/// [`RowCursor`]: crate::query::RowCursor
34/// [`Query::fetch_bytes`]: crate::query::Query::fetch_bytes
35pub struct BytesCursor {
36    raw: RawCursor,
37    bytes: Bytes,
38}
39
// TODO: define what happens when `next()` or any `poll_*` method is called again after an error has already been returned.
41
42impl BytesCursor {
43    pub(crate) fn new(response: Response) -> Self {
44        Self {
45            raw: RawCursor::new(response),
46            bytes: Bytes::default(),
47        }
48    }
49
50    /// Emits the next bytes chunk.
51    ///
52    /// # Cancel safety
53    ///
54    /// This method is cancellation safe.
55    pub async fn next(&mut self) -> Result<Option<Bytes>> {
56        assert!(
57            self.bytes.is_empty(),
58            "mixing `BytesCursor::next()` and `AsyncRead` API methods is not allowed"
59        );
60
61        self.raw.next().await
62    }
63
64    /// Collects the whole response into a single [`Bytes`].
65    ///
66    /// # Cancel safety
67    ///
68    /// This method is NOT cancellation safe.
69    /// If cancelled, already collected bytes are lost.
70    pub async fn collect(&mut self) -> Result<Bytes> {
71        let mut chunks = Vec::new();
72        let mut total_len = 0;
73
74        while let Some(chunk) = self.next().await? {
75            total_len += chunk.len();
76            chunks.push(chunk);
77        }
78
79        // The whole response is in a single chunk.
80        if chunks.len() == 1 {
81            return Ok(chunks.pop().unwrap());
82        }
83
84        let mut collected = BytesMut::with_capacity(total_len);
85        for chunk in chunks {
86            collected.extend_from_slice(&chunk);
87        }
88        debug_assert_eq!(collected.capacity(), total_len);
89
90        Ok(collected.freeze())
91    }
92
93    #[cold]
94    fn poll_refill(&mut self, cx: &mut Context<'_>) -> Poll<IoResult<bool>> {
95        debug_assert_eq!(self.bytes.len(), 0);
96
97        // Theoretically, `self.raw.poll_next(cx)` can return empty chunks.
98        // In this case, we should continue polling until we get a non-empty chunk or
99        // end of stream in order to avoid false positive `Ok(0)` in I/O traits.
100        while self.bytes.is_empty() {
101            match ready!(self.raw.poll_next(cx)?) {
102                Some(chunk) => self.bytes = chunk,
103                None => return Poll::Ready(Ok(false)),
104            }
105        }
106
107        Poll::Ready(Ok(true))
108    }
109
110    /// Returns the total size in bytes received from the CH server since
111    /// the cursor was created.
112    ///
113    /// This method counts only size without HTTP headers for now.
114    /// It can be changed in the future without notice.
115    #[inline]
116    pub fn received_bytes(&self) -> u64 {
117        self.raw.received_bytes()
118    }
119
120    /// Returns the total size in bytes decompressed since the cursor was
121    /// created.
122    #[inline]
123    pub fn decoded_bytes(&self) -> u64 {
124        self.raw.decoded_bytes()
125    }
126}
127
128impl AsyncRead for BytesCursor {
129    #[inline]
130    fn poll_read(
131        mut self: Pin<&mut Self>,
132        cx: &mut Context<'_>,
133        buf: &mut ReadBuf<'_>,
134    ) -> Poll<IoResult<()>> {
135        while buf.remaining() > 0 {
136            if self.bytes.is_empty() && !ready!(self.poll_refill(cx)?) {
137                break;
138            }
139
140            let len = self.bytes.len().min(buf.remaining());
141            let bytes = self.bytes.slice(..len);
142            buf.put_slice(&bytes[0..len]);
143            self.bytes.advance(len);
144        }
145
146        Poll::Ready(Ok(()))
147    }
148}
149
150impl AsyncBufRead for BytesCursor {
151    #[inline]
152    fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<&[u8]>> {
153        if self.bytes.is_empty() {
154            ready!(self.poll_refill(cx)?);
155        }
156
157        Poll::Ready(Ok(&self.get_mut().bytes))
158    }
159
160    #[inline]
161    fn consume(mut self: Pin<&mut Self>, amt: usize) {
162        assert!(
163            amt <= self.bytes.len(),
164            "invalid `AsyncBufRead::consume` usage"
165        );
166        self.bytes.advance(amt);
167    }
168}
169
#[cfg(feature = "futures03")]
impl futures_util::AsyncRead for BytesCursor {
    /// Adapts the `tokio` [`AsyncRead`] implementation to the `futures` trait
    /// by wrapping the destination slice in a [`ReadBuf`] and reporting how
    /// many bytes were written into it.
    #[inline]
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<IoResult<usize>> {
        let mut read_buf = ReadBuf::new(buf);
        AsyncRead::poll_read(self, cx, &mut read_buf).map_ok(|()| read_buf.filled().len())
    }
}
183
#[cfg(feature = "futures03")]
impl futures_util::AsyncBufRead for BytesCursor {
    /// Forwards to the `tokio` [`AsyncBufRead::poll_fill_buf`] implementation.
    #[inline]
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<&[u8]>> {
        <Self as AsyncBufRead>::poll_fill_buf(self, cx)
    }

    /// Forwards to the `tokio` [`AsyncBufRead::consume`] implementation.
    #[inline]
    fn consume(self: Pin<&mut Self>, amt: usize) {
        <Self as AsyncBufRead>::consume(self, amt)
    }
}
196
#[cfg(feature = "futures03")]
impl futures_util::stream::Stream for BytesCursor {
    type Item = crate::error::Result<bytes::Bytes>;

    /// Yields response chunks one by one, ending with `None`.
    ///
    /// Panics if bytes obtained through the `AsyncRead`/`AsyncBufRead` API
    /// are still buffered.
    #[inline]
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if !self.bytes.is_empty() {
            panic!("mixing `Stream` and `AsyncRead` API methods is not allowed");
        }

        match self.raw.poll_next(cx) {
            Poll::Ready(result) => Poll::Ready(result.transpose()),
            Poll::Pending => Poll::Pending,
        }
    }
}
211
#[cfg(feature = "futures03")]
impl futures_util::stream::FusedStream for BytesCursor {
    /// The cursor is terminated only when no bytes remain buffered locally
    /// and the underlying cursor reports termination.
    #[inline]
    fn is_terminated(&self) -> bool {
        if !self.bytes.is_empty() {
            return false;
        }

        self.raw.is_terminated()
    }
}