1use crate::{
2 block::{
3 Block,
4 BlockInfo,
5 },
6 column::ColumnRef,
7 compression::{
8 compress,
9 decompress,
10 },
11 connection::Connection,
12 io::buffer_utils,
13 protocol::CompressionMethod,
14 types::Type,
15 Error,
16 Result,
17};
18use bytes::{
19 Buf,
20 BufMut,
21 BytesMut,
22};
23use std::sync::Arc;
24use tracing::debug;
25
/// Minimum server revision for which the writer emits the (empty) temporary
/// table name ahead of a block.
const DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES: u64 = 50264;
/// Minimum server revision for which a block starts with a `BlockInfo` header.
const DBMS_MIN_REVISION_WITH_BLOCK_INFO: u64 = 51903;
/// Minimum server revision for which every column header carries the one-byte
/// custom-serialization flag.
const DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION: u64 = 54454;
30
31pub fn create_column(type_: &Type) -> Result<ColumnRef> {
34 use crate::column::{
35 array::ColumnArray,
36 date::{
37 ColumnDate,
38 ColumnDate32,
39 ColumnDateTime,
40 ColumnDateTime64,
41 },
42 decimal::ColumnDecimal,
43 enum_column::{
44 ColumnEnum16,
45 ColumnEnum8,
46 },
47 ipv4::ColumnIpv4,
48 ipv6::ColumnIpv6,
49 lowcardinality::ColumnLowCardinality,
50 map::ColumnMap,
51 nothing::ColumnNothing,
52 nullable::ColumnNullable,
53 numeric::*,
54 string::{
55 ColumnFixedString,
56 ColumnString,
57 },
58 uuid::ColumnUuid,
59 };
60
61 match type_ {
62 Type::Simple(code) => {
63 use crate::types::TypeCode;
64 match code {
65 TypeCode::UInt8 => Ok(Arc::new(ColumnUInt8::new())),
66 TypeCode::UInt16 => Ok(Arc::new(ColumnUInt16::new())),
67 TypeCode::UInt32 => Ok(Arc::new(ColumnUInt32::new())),
68 TypeCode::UInt64 => Ok(Arc::new(ColumnUInt64::new())),
69 TypeCode::UInt128 => Ok(Arc::new(ColumnUInt128::new())),
70 TypeCode::Int8 => Ok(Arc::new(ColumnInt8::new())),
71 TypeCode::Int16 => Ok(Arc::new(ColumnInt16::new())),
72 TypeCode::Int32 => Ok(Arc::new(ColumnInt32::new())),
73 TypeCode::Int64 => Ok(Arc::new(ColumnInt64::new())),
74 TypeCode::Int128 => Ok(Arc::new(ColumnInt128::new())),
75 TypeCode::Float32 => Ok(Arc::new(ColumnFloat32::new())),
76 TypeCode::Float64 => Ok(Arc::new(ColumnFloat64::new())),
77 TypeCode::String => {
78 Ok(Arc::new(ColumnString::new(type_.clone())))
79 }
80 TypeCode::Date => Ok(Arc::new(ColumnDate::new(type_.clone()))),
81 TypeCode::Date32 => {
82 Ok(Arc::new(ColumnDate32::new(type_.clone())))
83 }
84 TypeCode::UUID => Ok(Arc::new(ColumnUuid::new(type_.clone()))),
85 TypeCode::IPv4 => Ok(Arc::new(ColumnIpv4::new(type_.clone()))),
86 TypeCode::IPv6 => Ok(Arc::new(ColumnIpv6::new(type_.clone()))),
87 TypeCode::Void => {
88 Ok(Arc::new(ColumnNothing::new(type_.clone())))
89 }
90 TypeCode::Point => {
94 let columns: Vec<ColumnRef> = vec![
96 Arc::new(ColumnFloat64::new()) as ColumnRef,
97 Arc::new(ColumnFloat64::new()) as ColumnRef,
98 ];
99 Ok(Arc::new(crate::column::ColumnTuple::new(
100 type_.clone(),
101 columns,
102 )))
103 }
104 TypeCode::Ring => {
105 let point_type = Type::Simple(TypeCode::Point);
108 let nested = create_column(&point_type)?;
109 Ok(Arc::new(ColumnArray::from_parts(
110 type_.clone(),
111 nested,
112 )))
113 }
114 TypeCode::Polygon => {
115 let ring_type = Type::Simple(TypeCode::Ring);
118 let nested = create_column(&ring_type)?;
119 Ok(Arc::new(ColumnArray::from_parts(
120 type_.clone(),
121 nested,
122 )))
123 }
124 TypeCode::MultiPolygon => {
125 let polygon_type = Type::Simple(TypeCode::Polygon);
128 let nested = create_column(&polygon_type)?;
129 Ok(Arc::new(ColumnArray::from_parts(
130 type_.clone(),
131 nested,
132 )))
133 }
134 _ => Err(Error::Protocol(format!(
135 "Unsupported type: {}",
136 type_.name()
137 ))),
138 }
139 }
140 Type::FixedString { .. } => {
141 Ok(Arc::new(ColumnFixedString::new(type_.clone())))
142 }
143 Type::DateTime { .. } => {
144 Ok(Arc::new(ColumnDateTime::new(type_.clone())))
146 }
147 Type::DateTime64 { .. } => {
148 Ok(Arc::new(ColumnDateTime64::new(type_.clone())))
150 }
151 Type::Enum8 { .. } => {
152 Ok(Arc::new(ColumnEnum8::new(type_.clone())))
154 }
155 Type::Enum16 { .. } => {
156 Ok(Arc::new(ColumnEnum16::new(type_.clone())))
158 }
159 Type::Decimal { .. } => {
160 Ok(Arc::new(ColumnDecimal::new(type_.clone())))
162 }
163 Type::Nullable { .. } => {
164 Ok(Arc::new(ColumnNullable::new(type_.clone())))
165 }
166 Type::Array { .. } => Ok(Arc::new(ColumnArray::new(type_.clone()))),
167 Type::Map { .. } => Ok(Arc::new(ColumnMap::new(type_.clone()))),
168 Type::LowCardinality { .. } => {
169 Ok(Arc::new(ColumnLowCardinality::new(type_.clone())))
170 }
171 Type::Tuple { item_types } => {
172 let mut columns = Vec::new();
174 for item_type in item_types {
175 columns.push(create_column(item_type)?);
176 }
177 Ok(Arc::new(crate::column::ColumnTuple::new(
178 type_.clone(),
179 columns,
180 )))
181 }
182 }
183}
184
/// Reads blocks from a connection, either directly from the stream or from
/// compressed frames when a compression method has been configured.
pub struct BlockReader {
    /// Server protocol revision; compared against the `DBMS_MIN_REVISION_*`
    /// thresholds to decide which optional wire fields are present.
    server_revision: u64,
    /// `None` means blocks are read uncompressed; `Some(_)` switches
    /// `read_block` to the compressed-frame path.
    compression: Option<CompressionMethod>,
}
190
191impl BlockReader {
192 pub fn new(server_revision: u64) -> Self {
194 Self { server_revision, compression: None }
195 }
196
197 pub fn with_compression(mut self, method: CompressionMethod) -> Self {
199 self.compression = Some(method);
200 self
201 }
202
203 async fn read_compressed_frame(
205 &self,
206 conn: &mut Connection,
207 ) -> Result<bytes::Bytes> {
208 let checksum = conn.read_bytes(16).await?;
209 let method = conn.read_u8().await?;
210 let compressed_size = conn.read_u32().await? as usize;
211 let uncompressed_size = conn.read_u32().await?;
212
213 let compressed_data_len = compressed_size.saturating_sub(9);
214 let compressed_data = conn.read_bytes(compressed_data_len).await?;
215
216 let mut full_block =
217 BytesMut::with_capacity(16 + 9 + compressed_data_len);
218 full_block.extend_from_slice(&checksum);
219 full_block.put_u8(method);
220 full_block.put_u32_le(compressed_size as u32);
221 full_block.put_u32_le(uncompressed_size);
222 full_block.extend_from_slice(&compressed_data);
223
224 decompress(&full_block)
225 }
226
227 pub async fn read_block(&self, conn: &mut Connection) -> Result<Block> {
237 if self.compression.is_none() {
238 return self.read_block_direct(conn).await;
239 }
240
241 let mut accumulated: Vec<u8> = Vec::new();
242 const MAX_FRAMES: usize = 4096;
243
244 for _ in 0..MAX_FRAMES {
245 let frame = self.read_compressed_frame(conn).await?;
246 accumulated.extend_from_slice(&frame);
247
248 let mut slice: &[u8] = &accumulated;
249 match self.parse_block_from_buffer(&mut slice) {
250 Ok(block) => return Ok(block),
251 Err(e) => {
252 let msg = e.to_string();
253 let is_underflow = msg.contains("Not enough data")
254 || msg.contains("Buffer underflow")
255 || msg.contains("Unexpected end");
256 if !is_underflow {
257 return Err(e);
258 }
259 }
260 }
261 }
262
263 Err(Error::Protocol(
264 "Compressed block exceeded maximum frame count".to_string(),
265 ))
266 }
267
268 async fn read_block_direct(&self, conn: &mut Connection) -> Result<Block> {
270 let mut block = Block::new();
271
272 if self.server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO {
274 let info = self.read_block_info(conn).await?;
275 block.set_info(info);
276 }
277
278 let num_columns = conn.read_varint().await? as usize;
280 let num_rows = conn.read_varint().await? as usize;
281
282 for _ in 0..num_columns {
284 let name = conn.read_string().await?;
285 let type_name = conn.read_string().await?;
286
287 if self.server_revision
289 >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION
290 {
291 let custom_len = conn.read_u8().await?;
292 if custom_len > 0 {
293 return Err(Error::Protocol(
294 "Custom serialization not supported".to_string(),
295 ));
296 }
297 }
298
299 let column_type = Type::parse(&type_name)?;
301
302 let column = self.create_column_by_type(&column_type)?;
304
305 if num_rows > 0 {
306 self.load_column_data_async(conn, &column_type, num_rows)
309 .await?;
310 }
311
312 block.append_column(name, column)?;
313 }
314
315 Ok(block)
316 }
317
318 fn load_column_data_async<'a>(
320 &'a self,
321 conn: &'a mut Connection,
322 type_: &'a Type,
323 num_rows: usize,
324 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + 'a>>
325 {
326 Box::pin(async move {
327 self.load_column_data_impl(conn, type_, num_rows).await
328 })
329 }
330
331 async fn load_column_data_impl(
333 &self,
334 conn: &mut Connection,
335 type_: &Type,
336 num_rows: usize,
337 ) -> Result<()> {
338 use crate::types::TypeCode;
339
340 if let Some(size_per_row) = type_.storage_size_bytes() {
342 let _ = conn.read_bytes(num_rows * size_per_row).await?;
344 return Ok(());
345 }
346
347 match type_ {
349 Type::Simple(TypeCode::String) => {
350 for _ in 0..num_rows {
352 let len = conn.read_varint().await? as usize;
353 let _ = conn.read_bytes(len).await?;
354 }
355 }
356 Type::Nullable { nested_type } => {
357 let _ = conn.read_bytes(num_rows).await?;
359 self.load_column_data_async(conn, nested_type, num_rows)
361 .await?;
362 }
363 Type::Array { item_type } => {
364 if num_rows == 0 {
369 return Ok(());
370 }
371
372 let offsets_data = conn.read_bytes(num_rows * 8).await?;
374
375 let last_offset_bytes =
378 &offsets_data[offsets_data.len() - 8..];
379 let total_items = u64::from_le_bytes([
380 last_offset_bytes[0],
381 last_offset_bytes[1],
382 last_offset_bytes[2],
383 last_offset_bytes[3],
384 last_offset_bytes[4],
385 last_offset_bytes[5],
386 last_offset_bytes[6],
387 last_offset_bytes[7],
388 ]) as usize;
389
390 if total_items > 0 {
392 self.load_column_data_async(conn, item_type, total_items)
393 .await?;
394 }
395 }
396 Type::Tuple { item_types } => {
397 for item_type in item_types {
400 self.load_column_data_async(conn, item_type, num_rows)
401 .await?;
402 }
403 }
404 Type::Map { key_type, value_type } => {
405 if num_rows == 0 {
409 return Ok(());
410 }
411
412 let offsets_data = conn.read_bytes(num_rows * 8).await?;
414
415 let last_offset_bytes =
417 &offsets_data[offsets_data.len() - 8..];
418 let total_entries = u64::from_le_bytes([
419 last_offset_bytes[0],
420 last_offset_bytes[1],
421 last_offset_bytes[2],
422 last_offset_bytes[3],
423 last_offset_bytes[4],
424 last_offset_bytes[5],
425 last_offset_bytes[6],
426 last_offset_bytes[7],
427 ]) as usize;
428
429 if total_entries > 0 {
431 self.load_column_data_async(conn, key_type, total_entries)
433 .await?;
434 self.load_column_data_async(
436 conn,
437 value_type,
438 total_entries,
439 )
440 .await?;
441 }
442 }
443 Type::FixedString { size } => {
444 let _ = conn.read_bytes(num_rows * size).await?;
446 }
447 _ => {
448 return Err(Error::Protocol(format!(
449 "Uncompressed reading not implemented for complex type: {}",
450 type_.name()
451 )));
452 }
453 }
454
455 Ok(())
456 }
457
458 async fn read_block_info(
460 &self,
461 conn: &mut Connection,
462 ) -> Result<BlockInfo> {
463 let _num1 = conn.read_varint().await?;
464 let is_overflows = conn.read_u8().await?;
465 let _num2 = conn.read_varint().await?;
466 let bucket_num = conn.read_i32().await?;
467 let _num3 = conn.read_varint().await?;
468
469 Ok(BlockInfo { is_overflows, bucket_num })
470 }
471
472 fn parse_block_from_buffer(&self, buffer: &mut &[u8]) -> Result<Block> {
474 let mut block = Block::new();
475
476 if self.server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO {
478 let info = self.read_block_info_from_buffer(buffer)?;
479 block.set_info(info);
480 }
481
482 let num_columns = buffer_utils::read_varint(buffer)? as usize;
484 let num_rows = buffer_utils::read_varint(buffer)? as usize;
485
486 for _ in 0..num_columns {
488 let name = buffer_utils::read_string(buffer)?;
489 let type_name = buffer_utils::read_string(buffer)?;
490
491 if self.server_revision
493 >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION
494 {
495 if buffer.is_empty() {
496 return Err(Error::Protocol(
497 "Unexpected end of block data".to_string(),
498 ));
499 }
500 let custom_len = buffer[0];
501 buffer.advance(1);
502
503 if custom_len > 0 {
504 return Err(Error::Protocol(
505 "Custom serialization not supported".to_string(),
506 ));
507 }
508 }
509
510 let column_type = Type::parse(&type_name)?;
512
513 let mut column = self.create_column_by_type(&column_type)?;
515
516 if num_rows > 0 {
517 let column_mut =
518 Arc::get_mut(&mut column).ok_or_else(|| {
519 Error::Protocol("Column not mutable".to_string())
520 })?;
521
522 column_mut.load_prefix(buffer, num_rows)?;
524
525 column_mut.load_from_buffer(buffer, num_rows)?;
527 }
528
529 block.append_column(name, column)?;
530 }
531
532 Ok(block)
533 }
534
535 fn read_block_info_from_buffer(
537 &self,
538 buffer: &mut &[u8],
539 ) -> Result<BlockInfo> {
540 let _num1 = buffer_utils::read_varint(buffer)?;
541
542 if buffer.is_empty() {
543 return Err(Error::Protocol(
544 "Unexpected end reading block info".to_string(),
545 ));
546 }
547 let is_overflows = buffer[0];
548 buffer.advance(1);
549
550 let _num2 = buffer_utils::read_varint(buffer)?;
551
552 if buffer.len() < 4 {
553 return Err(Error::Protocol(
554 "Unexpected end reading bucket_num".to_string(),
555 ));
556 }
557 let bucket_num =
558 i32::from_le_bytes([buffer[0], buffer[1], buffer[2], buffer[3]]);
559 buffer.advance(4);
560
561 let _num3 = buffer_utils::read_varint(buffer)?;
562
563 Ok(BlockInfo { is_overflows, bucket_num })
564 }
565
566 fn create_column_by_type(&self, type_: &Type) -> Result<ColumnRef> {
568 use crate::column::{
569 array::ColumnArray,
570 date::{
571 ColumnDate,
572 ColumnDate32,
573 ColumnDateTime,
574 ColumnDateTime64,
575 },
576 decimal::ColumnDecimal,
577 enum_column::{
578 ColumnEnum16,
579 ColumnEnum8,
580 },
581 ipv4::ColumnIpv4,
582 ipv6::ColumnIpv6,
583 lowcardinality::ColumnLowCardinality,
584 map::ColumnMap,
585 nothing::ColumnNothing,
586 nullable::ColumnNullable,
587 numeric::*,
588 string::{
589 ColumnFixedString,
590 ColumnString,
591 },
592 uuid::ColumnUuid,
593 };
594
595 match type_ {
596 Type::Simple(code) => {
597 use crate::types::TypeCode;
598 match code {
599 TypeCode::UInt8 => Ok(Arc::new(ColumnUInt8::new())),
600 TypeCode::UInt16 => Ok(Arc::new(ColumnUInt16::new())),
601 TypeCode::UInt32 => Ok(Arc::new(ColumnUInt32::new())),
602 TypeCode::UInt64 => Ok(Arc::new(ColumnUInt64::new())),
603 TypeCode::UInt128 => Ok(Arc::new(ColumnUInt128::new())),
604 TypeCode::Int8 => Ok(Arc::new(ColumnInt8::new())),
605 TypeCode::Int16 => Ok(Arc::new(ColumnInt16::new())),
606 TypeCode::Int32 => Ok(Arc::new(ColumnInt32::new())),
607 TypeCode::Int64 => Ok(Arc::new(ColumnInt64::new())),
608 TypeCode::Int128 => Ok(Arc::new(ColumnInt128::new())),
609 TypeCode::Float32 => Ok(Arc::new(ColumnFloat32::new())),
610 TypeCode::Float64 => Ok(Arc::new(ColumnFloat64::new())),
611 TypeCode::String => {
612 Ok(Arc::new(ColumnString::new(type_.clone())))
613 }
614 TypeCode::Date => {
615 Ok(Arc::new(ColumnDate::new(type_.clone())))
616 }
617 TypeCode::Date32 => {
618 Ok(Arc::new(ColumnDate32::new(type_.clone())))
619 }
620 TypeCode::UUID => {
621 Ok(Arc::new(ColumnUuid::new(type_.clone())))
622 }
623 TypeCode::IPv4 => {
624 Ok(Arc::new(ColumnIpv4::new(type_.clone())))
625 }
626 TypeCode::IPv6 => {
627 Ok(Arc::new(ColumnIpv6::new(type_.clone())))
628 }
629 TypeCode::Void => {
630 Ok(Arc::new(ColumnNothing::new(type_.clone())))
631 }
632 _ => Err(Error::Protocol(format!(
633 "Unsupported type: {}",
634 type_.name()
635 ))),
636 }
637 }
638 Type::FixedString { .. } => {
639 Ok(Arc::new(ColumnFixedString::new(type_.clone())))
640 }
641 Type::DateTime { .. } => {
642 Ok(Arc::new(ColumnDateTime::new(type_.clone())))
644 }
645 Type::DateTime64 { .. } => {
646 Ok(Arc::new(ColumnDateTime64::new(type_.clone())))
648 }
649 Type::Enum8 { .. } => {
650 Ok(Arc::new(ColumnEnum8::new(type_.clone())))
652 }
653 Type::Enum16 { .. } => {
654 Ok(Arc::new(ColumnEnum16::new(type_.clone())))
656 }
657 Type::Decimal { .. } => {
658 Ok(Arc::new(ColumnDecimal::new(type_.clone())))
660 }
661 Type::Nullable { .. } => {
662 Ok(Arc::new(ColumnNullable::new(type_.clone())))
663 }
664 Type::Array { .. } => {
665 Ok(Arc::new(ColumnArray::new(type_.clone())))
666 }
667 Type::Map { .. } => Ok(Arc::new(ColumnMap::new(type_.clone()))),
668 Type::LowCardinality { .. } => {
669 Ok(Arc::new(ColumnLowCardinality::new(type_.clone())))
670 }
671 Type::Tuple { item_types } => {
672 let mut columns = Vec::new();
674 for item_type in item_types {
675 columns.push(create_column(item_type)?);
676 }
677 Ok(Arc::new(crate::column::ColumnTuple::new(
678 type_.clone(),
679 columns,
680 )))
681 }
682 }
683 }
684}
685
/// Serializes blocks and writes them to a connection, optionally compressing
/// the serialized bytes first.
pub struct BlockWriter {
    /// Server protocol revision; compared against the `DBMS_MIN_REVISION_*`
    /// thresholds to decide which optional wire fields are written.
    server_revision: u64,
    /// `None` writes the serialized block as-is; `Some(_)` passes it through
    /// `compress` with that method before writing.
    compression: Option<CompressionMethod>,
}
691
692impl BlockWriter {
693 pub fn new(server_revision: u64) -> Self {
695 Self { server_revision, compression: None }
696 }
697
698 pub fn with_compression(mut self, method: CompressionMethod) -> Self {
700 self.compression = Some(method);
701 self
702 }
703
704 pub async fn write_block(
706 &self,
707 conn: &mut Connection,
708 block: &Block,
709 ) -> Result<()> {
710 self.write_block_with_temp_table(conn, block, true).await
711 }
712
713 pub async fn write_block_with_temp_table(
719 &self,
720 conn: &mut Connection,
721 block: &Block,
722 write_temp_table_name: bool,
723 ) -> Result<()> {
724 debug!(
725 "Writing block: {} columns, {} rows",
726 block.column_count(),
727 block.row_count()
728 );
729
730 if write_temp_table_name
732 && self.server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES
733 {
734 debug!("Writing empty temp table name");
735 conn.write_string("").await?;
736 }
737
738 let mut buffer = BytesMut::new();
740 self.write_block_to_buffer(&mut buffer, block)?;
741 debug!("Block serialized to {} bytes", buffer.len());
742
743 if let Some(compression_method) = self.compression {
745 let compressed = compress(compression_method, &buffer)?;
746 debug!("Compressed to {} bytes (includes 16-byte checksum + 9-byte header)", compressed.len());
747 conn.write_bytes(&compressed).await?;
750 } else {
751 debug!("Writing uncompressed block");
753 conn.write_bytes(&buffer).await?;
754 }
755
756 conn.flush().await?;
757 debug!("Block write complete");
758 Ok(())
759 }
760
761 fn write_block_to_buffer(
763 &self,
764 buffer: &mut BytesMut,
765 block: &Block,
766 ) -> Result<()> {
767 if self.server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO {
769 buffer_utils::write_varint(buffer, 1);
770 buffer.put_u8(block.info().is_overflows);
771 buffer_utils::write_varint(buffer, 2);
772 buffer.put_i32_le(block.info().bucket_num);
773 buffer_utils::write_varint(buffer, 0);
774 }
775
776 buffer_utils::write_varint(buffer, block.column_count() as u64);
778 buffer_utils::write_varint(buffer, block.row_count() as u64);
779
780 for (name, type_, column) in block.iter() {
782 buffer_utils::write_string(buffer, name);
783 buffer_utils::write_string(buffer, &type_.name());
784
785 if self.server_revision
787 >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION
788 {
789 buffer.put_u8(0); }
791
792 if block.row_count() > 0 {
794 column.save_prefix(buffer)?; column.save_to_buffer(buffer)?; }
797 }
798
799 Ok(())
800 }
801}
802
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use crate::column::numeric::ColumnUInt64;

    /// Server revision used by every test in this module.
    const TEST_REVISION: u64 = 54449;

    /// Builds a shared `UInt64` column holding `values` in order.
    fn uint64_column(values: &[u64]) -> Arc<ColumnUInt64> {
        let mut column = ColumnUInt64::new();
        for &value in values {
            column.append(value);
        }
        Arc::new(column)
    }

    /// Serializes `block` with a writer at `TEST_REVISION`.
    fn serialize(block: &Block) -> BytesMut {
        let writer = BlockWriter::new(TEST_REVISION);
        let mut encoded = BytesMut::new();
        writer.write_block_to_buffer(&mut encoded, block).unwrap();
        encoded
    }

    /// Parses `encoded` with a reader at `TEST_REVISION`.
    fn deserialize(encoded: &BytesMut) -> Block {
        let reader = BlockReader::new(TEST_REVISION);
        let mut remaining = &encoded[..];
        reader.parse_block_from_buffer(&mut remaining).unwrap()
    }

    #[test]
    fn test_block_writer_serialization() {
        let mut block = Block::new();
        block.append_column("id", uint64_column(&[1, 2, 3])).unwrap();

        let encoded = serialize(&block);

        assert!(!encoded.is_empty());
    }

    #[test]
    fn test_block_reader_parser() {
        let mut block = Block::new();
        block
            .append_column("test_col", uint64_column(&[42, 100]))
            .unwrap();

        let decoded_block = deserialize(&serialize(&block));

        assert_eq!(decoded_block.column_count(), 1);
        assert_eq!(decoded_block.row_count(), 2);
        assert_eq!(decoded_block.column_name(0), Some("test_col"));
    }

    #[test]
    fn test_block_roundtrip_multiple_columns() {
        let mut block = Block::new();
        block.append_column("id", uint64_column(&[1, 2])).unwrap();
        block
            .append_column("value", uint64_column(&[100, 200]))
            .unwrap();

        let decoded = deserialize(&serialize(&block));

        assert_eq!(decoded.column_count(), 2);
        assert_eq!(decoded.row_count(), 2);
    }
}