clickhouse/cursors/
bytes.rs1use crate::{cursors::RawCursor, error::Result, response::Response};
2use bytes::{Buf, Bytes, BytesMut};
3use std::{
4 io::Result as IoResult,
5 pin::Pin,
6 task::{Context, Poll, ready},
7};
8use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf};
9
10pub struct BytesCursor {
36 raw: RawCursor,
37 bytes: Bytes,
38}
39
40impl BytesCursor {
43 pub(crate) fn new(response: Response) -> Self {
44 Self {
45 raw: RawCursor::new(response),
46 bytes: Bytes::default(),
47 }
48 }
49
50 pub async fn next(&mut self) -> Result<Option<Bytes>> {
56 assert!(
57 self.bytes.is_empty(),
58 "mixing `BytesCursor::next()` and `AsyncRead` API methods is not allowed"
59 );
60
61 self.raw.next().await
62 }
63
64 pub async fn collect(&mut self) -> Result<Bytes> {
71 let mut chunks = Vec::new();
72 let mut total_len = 0;
73
74 while let Some(chunk) = self.next().await? {
75 total_len += chunk.len();
76 chunks.push(chunk);
77 }
78
79 if chunks.len() == 1 {
81 return Ok(chunks.pop().unwrap());
82 }
83
84 let mut collected = BytesMut::with_capacity(total_len);
85 for chunk in chunks {
86 collected.extend_from_slice(&chunk);
87 }
88 debug_assert_eq!(collected.capacity(), total_len);
89
90 Ok(collected.freeze())
91 }
92
93 #[cold]
94 fn poll_refill(&mut self, cx: &mut Context<'_>) -> Poll<IoResult<bool>> {
95 debug_assert_eq!(self.bytes.len(), 0);
96
97 while self.bytes.is_empty() {
101 match ready!(self.raw.poll_next(cx)?) {
102 Some(chunk) => self.bytes = chunk,
103 None => return Poll::Ready(Ok(false)),
104 }
105 }
106
107 Poll::Ready(Ok(true))
108 }
109
110 #[inline]
116 pub fn received_bytes(&self) -> u64 {
117 self.raw.received_bytes()
118 }
119
120 #[inline]
123 pub fn decoded_bytes(&self) -> u64 {
124 self.raw.decoded_bytes()
125 }
126}
127
128impl AsyncRead for BytesCursor {
129 #[inline]
130 fn poll_read(
131 mut self: Pin<&mut Self>,
132 cx: &mut Context<'_>,
133 buf: &mut ReadBuf<'_>,
134 ) -> Poll<IoResult<()>> {
135 while buf.remaining() > 0 {
136 if self.bytes.is_empty() && !ready!(self.poll_refill(cx)?) {
137 break;
138 }
139
140 let len = self.bytes.len().min(buf.remaining());
141 let bytes = self.bytes.slice(..len);
142 buf.put_slice(&bytes[0..len]);
143 self.bytes.advance(len);
144 }
145
146 Poll::Ready(Ok(()))
147 }
148}
149
150impl AsyncBufRead for BytesCursor {
151 #[inline]
152 fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<&[u8]>> {
153 if self.bytes.is_empty() {
154 ready!(self.poll_refill(cx)?);
155 }
156
157 Poll::Ready(Ok(&self.get_mut().bytes))
158 }
159
160 #[inline]
161 fn consume(mut self: Pin<&mut Self>, amt: usize) {
162 assert!(
163 amt <= self.bytes.len(),
164 "invalid `AsyncBufRead::consume` usage"
165 );
166 self.bytes.advance(amt);
167 }
168}
169
#[cfg(feature = "futures03")]
impl futures_util::AsyncRead for BytesCursor {
    /// Adapts the tokio `AsyncRead` implementation to the `futures` flavour:
    /// reads into `buf` and reports how many bytes were written.
    #[inline]
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<IoResult<usize>> {
        // A fresh `ReadBuf` is built on every poll, so only a completed
        // tokio-style read contributes to the reported length.
        let mut read_buf = ReadBuf::new(buf);

        match AsyncRead::poll_read(self, cx, &mut read_buf) {
            Poll::Ready(Ok(())) => Poll::Ready(Ok(read_buf.filled().len())),
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => Poll::Pending,
        }
    }
}
183
#[cfg(feature = "futures03")]
impl futures_util::AsyncBufRead for BytesCursor {
    /// Delegates to the tokio `AsyncBufRead::poll_fill_buf` implementation.
    #[inline]
    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<&[u8]>> {
        <Self as AsyncBufRead>::poll_fill_buf(self, cx)
    }

    /// Delegates to the tokio `AsyncBufRead::consume` implementation
    /// (and therefore panics under the same conditions).
    #[inline]
    fn consume(self: Pin<&mut Self>, amt: usize) {
        <Self as AsyncBufRead>::consume(self, amt);
    }
}
196
#[cfg(feature = "futures03")]
impl futures_util::stream::Stream for BytesCursor {
    type Item = Result<Bytes>;

    /// Yields the next chunk from the raw cursor; the stream ends when the
    /// raw cursor reports that nothing is left.
    ///
    /// # Panics
    ///
    /// Panics if a partially consumed chunk is still buffered by the
    /// `AsyncRead`/`AsyncBufRead` API: the two APIs must not be mixed.
    #[inline]
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let buffer_is_drained = self.bytes.is_empty();
        assert!(
            buffer_is_drained,
            "mixing `Stream` and `AsyncRead` API methods is not allowed"
        );

        // Turn `Result<Option<_>>` into the `Option<Result<_>>` shape `Stream` expects.
        match self.raw.poll_next(cx) {
            Poll::Ready(Ok(Some(chunk))) => Poll::Ready(Some(Ok(chunk))),
            Poll::Ready(Ok(None)) => Poll::Ready(None),
            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
            Poll::Pending => Poll::Pending,
        }
    }
}
211
#[cfg(feature = "futures03")]
impl futures_util::stream::FusedStream for BytesCursor {
    /// Reports termination only when no bytes are buffered locally and the
    /// raw cursor itself reports that it is terminated.
    #[inline]
    fn is_terminated(&self) -> bool {
        if !self.bytes.is_empty() {
            return false;
        }

        self.raw.is_terminated()
    }
}