1use std::str::FromStr;
2
3use indexmap::IndexMap;
4use tokio::io::{AsyncReadExt, AsyncWriteExt};
5
6use super::block_info::BlockInfo;
7use super::protocol::DBMS_MIN_PROTOCOL_VERSION_WITH_CUSTOM_SERIALIZATION;
8use crate::deserialize::ClickHouseNativeDeserializer;
9use crate::formats::protocol_data::ProtocolData;
10use crate::formats::{DeserializerState, SerializerState};
11use crate::io::{ClickHouseBytesRead, ClickHouseBytesWrite, ClickHouseRead, ClickHouseWrite};
12use crate::native::values::Value;
13use crate::prelude::*;
14use crate::serialize::ClickHouseNativeSerializer;
15use crate::{Error, Result, Row, Type};
16
/// A single ClickHouse native-protocol data block.
///
/// Data is stored column-major: `column_data` holds `rows` consecutive
/// values for the first column in `column_types`, then `rows` values for
/// the second, and so on (see `take_iter_rows`, which drains it in that
/// order).
#[derive(Debug, Clone, Default)]
pub struct Block {
    /// Block header metadata (see `BlockInfo`), written when revision > 0.
    pub info: BlockInfo,
    /// Number of rows in each column of this block.
    pub rows: u64,
    /// Ordered `(name, type)` pairs, one per column.
    pub column_types: Vec<(String, Type)>,
    /// Column-major flat buffer of all values (`columns * rows` entries).
    pub column_data: Vec<Value>,
}
30
/// Row-oriented iterator over a block's column-major data.
///
/// Holds one `(column name, column type, value iterator)` triple per
/// column; each call to `next` pops one value from every column to
/// assemble a row. Produced by `Block::take_iter_rows`.
pub struct BlockRowValueIter<'a, I>
where
    I: Iterator<Item = Value>,
{
    /// One `(name, type, values)` entry per column, in schema order.
    column_data: Vec<(&'a str, &'a Type, I)>,
}
38
39impl<'a, I> Iterator for BlockRowValueIter<'a, I>
40where
41 I: Iterator<Item = Value>,
42{
43 type Item = Vec<(&'a str, &'a Type, Value)>;
44
45 fn next(&mut self) -> Option<Self::Item> {
46 if self.column_data.is_empty() {
47 return None;
48 }
49 let mut out = Vec::new();
50 for (name, type_, pop) in &mut self.column_data {
51 out.push((*name, *type_, pop.next()?));
52 }
53 Some(out)
54 }
55}
56
57impl Block {
58 pub fn take_iter_rows(&mut self) -> BlockRowValueIter<'_, impl Iterator<Item = Value>> {
60 #[allow(clippy::cast_possible_truncation)]
61 let rows = self.rows as usize;
62 let mut column_data = std::mem::take(&mut self.column_data);
63 let mut out = Vec::with_capacity(rows);
64 for (name, type_) in &self.column_types {
65 let mut column = Vec::with_capacity(rows);
66 let column_slice = column_data.drain(..rows);
67 column.extend(column_slice);
68 out.push((&**name, type_.strip_low_cardinality(), column.into_iter()));
69 }
70 BlockRowValueIter { column_data: out }
71 }
72
73 pub fn estimate_size(&self) -> usize {
75 let mut size = 16; #[allow(clippy::cast_possible_truncation)]
78 let rows = self.rows as usize;
79
80 for (name, type_) in &self.column_types {
81 size += name.len() + type_.to_string().len() + 10; size += rows * type_.estimate_capacity();
86 }
87
88 size * 6 / 5
90 }
91
92 pub fn from_rows<T: Row>(rows: Vec<T>, schema: Vec<(String, Type)>) -> Result<Self> {
99 let row_len = rows.len();
100 let row_col_len = schema.len() * rows.len();
101
102 let mut columns = schema
103 .iter()
104 .map(|(name, _)| (name.to_string(), Vec::with_capacity(rows.len())))
105 .collect::<IndexMap<String, Vec<_>>>();
106
107 rows.into_iter()
108 .enumerate()
109 .map(|(i, x)| {
110 x.serialize_row(&schema)
111 .inspect_err(|error| error!(?error, "serialize error during insert (ROW {i})"))
112 .map(|r| (i, r))
113 })
114 .try_for_each(|result| -> Result<()> {
115 let (i, x) = result?;
116 for (key, value) in x {
117 let type_ = &schema
118 .iter()
119 .find(|(n, _)| n == &*key)
120 .ok_or_else(|| {
121 Error::Protocol(format!(
122 "missing type for data in row {i}, column: {key}"
123 ))
124 })?
125 .1;
126 type_.validate_value(&value).inspect_err(|error| {
127 tracing::error!(
128 ?error,
129 ?value,
130 ?key,
131 ?type_,
132 "Value validation failed for row {i}"
133 );
134 })?;
135 let column = columns.get_mut(key.as_ref()).ok_or(Error::Protocol(format!(
136 "missing column for data in row {i}, column: {key}"
137 )))?;
138 column.push(value);
139 }
140 Ok(())
141 })?;
142
143 let mut column_data = Vec::with_capacity(row_col_len);
144
145 for (_, mut values) in columns.drain(..) {
147 column_data.append(&mut values);
148 }
149
150 Ok(Block {
151 info: BlockInfo::default(),
152 rows: row_len as u64,
153 column_types: schema,
154 column_data,
155 })
156 }
157}
158
159impl ProtocolData<Self, ()> for Block {
160 type Options = ();
161
162 async fn write_async<W: ClickHouseWrite>(
163 mut self,
164 writer: &mut W,
165 revision: u64,
166 _header: Option<&[(String, Type)]>,
167 _options: (),
168 ) -> Result<()> {
169 if revision > 0 {
170 self.info.write_async(writer).await?;
171 }
172
173 let columns = self.column_types.len();
174
175 #[allow(clippy::cast_possible_truncation)]
176 let rows = self.rows as usize;
177
178 writer.write_var_uint(columns as u64).await?;
179 writer.write_var_uint(self.rows).await?;
180
181 for (name, type_) in self.column_types {
182 let mut values = Vec::with_capacity(rows);
183 values.extend(self.column_data.drain(..rows));
184
185 if values.len() != rows {
186 return Err(Error::Protocol(format!(
187 "row and column length mismatch. {} != {}",
188 values.len(),
189 rows
190 )));
191 }
192
193 writer.write_string(&name).await?;
195 writer.write_string(type_.to_string()).await?;
196
197 if self.rows > 0 {
198 if revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_CUSTOM_SERIALIZATION {
199 writer.write_u8(0).await?;
200 }
201
202 let mut state = SerializerState::default();
203 type_.serialize_prefix_async(writer, &mut state).await?;
204 type_.serialize_column(values, writer, &mut state).await?;
205 }
206 }
207 Ok(())
208 }
209
210 fn write<W: ClickHouseBytesWrite>(
211 mut self,
212 writer: &mut W,
213 revision: u64,
214 _header: Option<&[(String, Type)]>,
215 _options: (),
216 ) -> Result<()> {
217 if revision > 0 {
218 self.info.write(writer)?;
219 }
220
221 let columns = self.column_types.len();
222
223 #[allow(clippy::cast_possible_truncation)]
224 let rows = self.rows as usize;
225
226 writer.put_var_uint(columns as u64)?;
227 writer.put_var_uint(self.rows)?;
228
229 for (name, type_) in self.column_types {
230 let mut values = Vec::with_capacity(rows);
231 values.extend(self.column_data.drain(..rows));
232
233 if values.len() != rows {
234 return Err(Error::Protocol(format!(
235 "row and column length mismatch. {} != {}",
236 values.len(),
237 rows
238 )));
239 }
240
241 writer.put_string(&name)?;
243 writer.put_string(type_.to_string())?;
244
245 if self.rows > 0 {
246 if revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_CUSTOM_SERIALIZATION {
247 writer.put_u8(0);
248 }
249
250 let mut state = SerializerState::default();
251 type_.serialize_prefix(writer, &mut state);
252 type_.serialize_column_sync(values, writer, &mut state)?;
253 }
254 }
255 Ok(())
256 }
257
258 async fn read_async<R: ClickHouseRead>(
259 reader: &mut R,
260 revision: u64,
261 _options: (),
262 state: &mut DeserializerState,
263 ) -> Result<Self> {
264 let info =
265 if revision > 0 { BlockInfo::read_async(reader).await? } else { BlockInfo::default() };
266
267 #[allow(clippy::cast_possible_truncation)]
268 let columns = reader.read_var_uint().await? as usize;
269 let rows = reader.read_var_uint().await?;
270
271 let mut block = Block {
272 info,
273 rows,
274 column_types: Vec::with_capacity(columns),
275 column_data: Vec::with_capacity(columns),
276 };
277
278 for i in 0..columns {
279 let name = reader
280 .read_utf8_string()
281 .await
282 .inspect_err(|e| error!("reading column name (index {i}): {e}"))?;
283
284 let type_name = reader
285 .read_utf8_string()
286 .await
287 .inspect_err(|e| error!("reading column type (name {name}): {e}"))?;
288
289 let mut _has_custom_serialization = false;
291 if revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_CUSTOM_SERIALIZATION {
292 _has_custom_serialization = reader.read_u8().await? != 0;
293 }
294
295 let type_ = Type::from_str(&type_name).inspect_err(|error| {
296 error!(?error, "Type deserialize failed: name={name}, type={type_name}");
297 })?;
298
299 let mut row_data = if rows > 0 {
300 type_.deserialize_prefix_async(reader, state).await?;
301
302 #[allow(clippy::cast_possible_truncation)]
303 type_
304 .deserialize_column(reader, rows as usize, state)
305 .await
306 .inspect_err(|e| error!("deserialize (name {name}): {e}"))?
307 } else {
308 vec![]
309 };
310
311 block.column_types.push((name, type_));
312 block.column_data.append(&mut row_data);
313 }
314
315 Ok(block)
316 }
317
318 fn read<R: ClickHouseBytesRead + 'static>(
319 reader: &mut R,
320 revision: u64,
321 _options: (),
322 state: &mut DeserializerState,
323 ) -> Result<Self> {
324 let info = if revision > 0 { BlockInfo::read(reader)? } else { BlockInfo::default() };
325
326 #[allow(clippy::cast_possible_truncation)]
327 let columns = reader.try_get_var_uint()? as usize;
328 let rows = reader.try_get_var_uint()?;
329
330 let mut block = Block {
331 info,
332 rows,
333 column_types: Vec::with_capacity(columns),
334 column_data: Vec::with_capacity(columns),
335 };
336
337 for i in 0..columns {
338 let name = String::from_utf8(
339 reader
340 .try_get_string()
341 .inspect_err(|e| error!("reading column name (index {i}): {e}"))?
342 .to_vec(),
343 )?;
344
345 let type_name = String::from_utf8(
346 reader
347 .try_get_string()
348 .inspect_err(|e| error!("reading column type (name {name}): {e}"))?
349 .to_vec(),
350 )?;
351
352 let mut _has_custom_serialization = false;
354 if revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_CUSTOM_SERIALIZATION {
355 _has_custom_serialization = reader.try_get_u8()? != 0;
356 }
357
358 let type_ = Type::from_str(&type_name).inspect_err(|error| {
359 error!(?error, "Type deserialize failed: name={name}, type={type_name}");
360 })?;
361
362 #[allow(clippy::cast_possible_truncation)]
363 let mut row_data = if rows > 0 {
364 type_.deserialize_prefix(reader)?;
365 type_
366 .deserialize_column_sync(reader, rows as usize, state)
367 .inspect_err(|e| error!("deserialize (name {name}): {e}"))?
368 } else {
369 vec![]
370 };
371
372 block.column_types.push((name, type_));
373 block.column_data.append(&mut row_data);
374 }
375
376 Ok(block)
377 }
378}