//! Block stream I/O (`clickhouse_native_client/io/block_stream.rs`):
//! reading and writing ClickHouse native-protocol blocks, with optional
//! compressed framing.

1use crate::{
2    block::{
3        Block,
4        BlockInfo,
5    },
6    column::ColumnRef,
7    compression::{
8        compress,
9        decompress,
10    },
11    connection::Connection,
12    io::buffer_utils,
13    protocol::CompressionMethod,
14    types::Type,
15    Error,
16    Result,
17};
18use bytes::{
19    Buf,
20    BufMut,
21    BytesMut,
22};
23use std::sync::Arc;
24use tracing::debug;
25
26/// Minimum revision constants
27const DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES: u64 = 50264;
28const DBMS_MIN_REVISION_WITH_BLOCK_INFO: u64 = 51903;
29const DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION: u64 = 54454;
30
31/// Create a column instance for the given type
32/// This is used internally by column types like Array and Nullable
33pub fn create_column(type_: &Type) -> Result<ColumnRef> {
34    use crate::column::{
35        array::ColumnArray,
36        date::{
37            ColumnDate,
38            ColumnDate32,
39            ColumnDateTime,
40            ColumnDateTime64,
41        },
42        decimal::ColumnDecimal,
43        enum_column::{
44            ColumnEnum16,
45            ColumnEnum8,
46        },
47        ipv4::ColumnIpv4,
48        ipv6::ColumnIpv6,
49        lowcardinality::ColumnLowCardinality,
50        map::ColumnMap,
51        nothing::ColumnNothing,
52        nullable::ColumnNullable,
53        numeric::*,
54        string::{
55            ColumnFixedString,
56            ColumnString,
57        },
58        uuid::ColumnUuid,
59    };
60
61    match type_ {
62        Type::Simple(code) => {
63            use crate::types::TypeCode;
64            match code {
65                TypeCode::UInt8 => Ok(Arc::new(ColumnUInt8::new())),
66                TypeCode::UInt16 => Ok(Arc::new(ColumnUInt16::new())),
67                TypeCode::UInt32 => Ok(Arc::new(ColumnUInt32::new())),
68                TypeCode::UInt64 => Ok(Arc::new(ColumnUInt64::new())),
69                TypeCode::UInt128 => Ok(Arc::new(ColumnUInt128::new())),
70                TypeCode::Int8 => Ok(Arc::new(ColumnInt8::new())),
71                TypeCode::Int16 => Ok(Arc::new(ColumnInt16::new())),
72                TypeCode::Int32 => Ok(Arc::new(ColumnInt32::new())),
73                TypeCode::Int64 => Ok(Arc::new(ColumnInt64::new())),
74                TypeCode::Int128 => Ok(Arc::new(ColumnInt128::new())),
75                TypeCode::Float32 => Ok(Arc::new(ColumnFloat32::new())),
76                TypeCode::Float64 => Ok(Arc::new(ColumnFloat64::new())),
77                TypeCode::String => {
78                    Ok(Arc::new(ColumnString::new(type_.clone())))
79                }
80                TypeCode::Date => Ok(Arc::new(ColumnDate::new(type_.clone()))),
81                TypeCode::Date32 => {
82                    Ok(Arc::new(ColumnDate32::new(type_.clone())))
83                }
84                TypeCode::UUID => Ok(Arc::new(ColumnUuid::new(type_.clone()))),
85                TypeCode::IPv4 => Ok(Arc::new(ColumnIpv4::new(type_.clone()))),
86                TypeCode::IPv6 => Ok(Arc::new(ColumnIpv6::new(type_.clone()))),
87                TypeCode::Void => {
88                    Ok(Arc::new(ColumnNothing::new(type_.clone())))
89                }
90                // Geo types are compound types built from Tuple and Array
91                // They use the same column implementation but preserve the geo
92                // type name
93                TypeCode::Point => {
94                    // Point is Tuple(Float64, Float64)
95                    let columns: Vec<ColumnRef> = vec![
96                        Arc::new(ColumnFloat64::new()) as ColumnRef,
97                        Arc::new(ColumnFloat64::new()) as ColumnRef,
98                    ];
99                    Ok(Arc::new(crate::column::ColumnTuple::new(
100                        type_.clone(),
101                        columns,
102                    )))
103                }
104                TypeCode::Ring => {
105                    // Ring is Array(Point) - manually create with Point nested
106                    // type
107                    let point_type = Type::Simple(TypeCode::Point);
108                    let nested = create_column(&point_type)?;
109                    Ok(Arc::new(ColumnArray::from_parts(
110                        type_.clone(),
111                        nested,
112                    )))
113                }
114                TypeCode::Polygon => {
115                    // Polygon is Array(Ring) - manually create with Ring
116                    // nested type
117                    let ring_type = Type::Simple(TypeCode::Ring);
118                    let nested = create_column(&ring_type)?;
119                    Ok(Arc::new(ColumnArray::from_parts(
120                        type_.clone(),
121                        nested,
122                    )))
123                }
124                TypeCode::MultiPolygon => {
125                    // MultiPolygon is Array(Polygon) - manually create with
126                    // Polygon nested type
127                    let polygon_type = Type::Simple(TypeCode::Polygon);
128                    let nested = create_column(&polygon_type)?;
129                    Ok(Arc::new(ColumnArray::from_parts(
130                        type_.clone(),
131                        nested,
132                    )))
133                }
134                _ => Err(Error::Protocol(format!(
135                    "Unsupported type: {}",
136                    type_.name()
137                ))),
138            }
139        }
140        Type::FixedString { .. } => {
141            Ok(Arc::new(ColumnFixedString::new(type_.clone())))
142        }
143        Type::DateTime { .. } => {
144            // Use specialized ColumnDateTime with timezone support
145            Ok(Arc::new(ColumnDateTime::new(type_.clone())))
146        }
147        Type::DateTime64 { .. } => {
148            // Use specialized ColumnDateTime64 with precision and timezone
149            Ok(Arc::new(ColumnDateTime64::new(type_.clone())))
150        }
151        Type::Enum8 { .. } => {
152            // Use specialized ColumnEnum8 with name-value mapping
153            Ok(Arc::new(ColumnEnum8::new(type_.clone())))
154        }
155        Type::Enum16 { .. } => {
156            // Use specialized ColumnEnum16 with name-value mapping
157            Ok(Arc::new(ColumnEnum16::new(type_.clone())))
158        }
159        Type::Decimal { .. } => {
160            // Use specialized ColumnDecimal with precision and scale
161            Ok(Arc::new(ColumnDecimal::new(type_.clone())))
162        }
163        Type::Nullable { .. } => {
164            Ok(Arc::new(ColumnNullable::new(type_.clone())))
165        }
166        Type::Array { .. } => Ok(Arc::new(ColumnArray::new(type_.clone()))),
167        Type::Map { .. } => Ok(Arc::new(ColumnMap::new(type_.clone()))),
168        Type::LowCardinality { .. } => {
169            Ok(Arc::new(ColumnLowCardinality::new(type_.clone())))
170        }
171        Type::Tuple { item_types } => {
172            // Create empty columns for each tuple element
173            let mut columns = Vec::new();
174            for item_type in item_types {
175                columns.push(create_column(item_type)?);
176            }
177            Ok(Arc::new(crate::column::ColumnTuple::new(
178                type_.clone(),
179                columns,
180            )))
181        }
182    }
183}
184
/// Reader for blocks from network
pub struct BlockReader {
    /// Negotiated server protocol revision; gates optional wire fields
    /// (block info, custom-serialization marker).
    server_revision: u64,
    /// When set, incoming blocks arrive wrapped in compressed frames.
    compression: Option<CompressionMethod>,
}
190
191impl BlockReader {
192    /// Create a new block reader
193    pub fn new(server_revision: u64) -> Self {
194        Self { server_revision, compression: None }
195    }
196
197    /// Enable compression
198    pub fn with_compression(mut self, method: CompressionMethod) -> Self {
199        self.compression = Some(method);
200        self
201    }
202
    /// Read and decompress a single compressed frame from the connection.
    ///
    /// Frame layout on the wire: 16-byte checksum, 1-byte method, 4-byte LE
    /// compressed size (a count that includes the 9 header bytes), 4-byte LE
    /// uncompressed size, then `compressed_size - 9` payload bytes. The
    /// pieces are re-assembled into one contiguous buffer because
    /// `decompress` expects checksum + header + payload as a single slice.
    async fn read_compressed_frame(
        &self,
        conn: &mut Connection,
    ) -> Result<bytes::Bytes> {
        let checksum = conn.read_bytes(16).await?;
        let method = conn.read_u8().await?;
        let compressed_size = conn.read_u32().await? as usize;
        let uncompressed_size = conn.read_u32().await?;

        // saturating_sub guards against a malformed size smaller than the
        // 9-byte header, which would otherwise underflow usize.
        let compressed_data_len = compressed_size.saturating_sub(9);
        let compressed_data = conn.read_bytes(compressed_data_len).await?;

        // Rebuild the frame exactly as it appeared on the wire; checksum
        // verification (if any) is left to `decompress`.
        let mut full_block =
            BytesMut::with_capacity(16 + 9 + compressed_data_len);
        full_block.extend_from_slice(&checksum);
        full_block.put_u8(method);
        full_block.put_u32_le(compressed_size as u32);
        full_block.put_u32_le(uncompressed_size);
        full_block.extend_from_slice(&compressed_data);

        decompress(&full_block)
    }
226
227    /// Read a block from the connection.
228    ///
229    /// For compressed connections, ClickHouse may split a single logical
230    /// block across multiple compressed frames (each frame ≤
231    /// max_compress_block_size, typically 1 MB). This method reads frames
232    /// until the accumulated decompressed data forms a complete block.
233    ///
234    /// Note: Caller is responsible for skipping temp table name if needed
235    /// (matches C++ ReadBlock / CompressedInput).
236    pub async fn read_block(&self, conn: &mut Connection) -> Result<Block> {
237        if self.compression.is_none() {
238            return self.read_block_direct(conn).await;
239        }
240
241        let mut accumulated: Vec<u8> = Vec::new();
242        const MAX_FRAMES: usize = 4096;
243
244        for _ in 0..MAX_FRAMES {
245            let frame = self.read_compressed_frame(conn).await?;
246            accumulated.extend_from_slice(&frame);
247
248            let mut slice: &[u8] = &accumulated;
249            match self.parse_block_from_buffer(&mut slice) {
250                Ok(block) => return Ok(block),
251                Err(e) => {
252                    let msg = e.to_string();
253                    let is_underflow = msg.contains("Not enough data")
254                        || msg.contains("Buffer underflow")
255                        || msg.contains("Unexpected end");
256                    if !is_underflow {
257                        return Err(e);
258                    }
259                }
260            }
261        }
262
263        Err(Error::Protocol(
264            "Compressed block exceeded maximum frame count".to_string(),
265        ))
266    }
267
    /// Read block directly from connection (uncompressed)
    ///
    /// Wire order is strict: optional block info, column count, row count,
    /// then per column: name, type name, optional custom-serialization
    /// marker, column body.
    ///
    /// NOTE(review): `load_column_data_async` consumes the column's bytes
    /// from the stream but does not populate `column`, so the appended
    /// column stays empty even when `num_rows > 0` — confirm the
    /// uncompressed path is intentionally skip-only.
    async fn read_block_direct(&self, conn: &mut Connection) -> Result<Block> {
        let mut block = Block::new();

        // Read block info if supported
        if self.server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO {
            let info = self.read_block_info(conn).await?;
            block.set_info(info);
        }

        // Read column count and row count
        let num_columns = conn.read_varint().await? as usize;
        let num_rows = conn.read_varint().await? as usize;

        // Read each column
        for _ in 0..num_columns {
            let name = conn.read_string().await?;
            let type_name = conn.read_string().await?;

            // Check for custom serialization
            if self.server_revision
                >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION
            {
                // One byte: non-zero means the server used a custom
                // serialization this client cannot decode.
                let custom_len = conn.read_u8().await?;
                if custom_len > 0 {
                    return Err(Error::Protocol(
                        "Custom serialization not supported".to_string(),
                    ));
                }
            }

            // Parse the type
            let column_type = Type::parse(&type_name)?;

            // Create column and load data
            let column = self.create_column_by_type(&column_type)?;

            if num_rows > 0 {
                // Read column data directly from async stream
                // For uncompressed blocks, we can read data type by type
                self.load_column_data_async(conn, &column_type, num_rows)
                    .await?;
            }

            block.append_column(name, column)?;
        }

        Ok(block)
    }
317
318    /// Load column data from async connection (for uncompressed blocks)
319    fn load_column_data_async<'a>(
320        &'a self,
321        conn: &'a mut Connection,
322        type_: &'a Type,
323        num_rows: usize,
324    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + 'a>>
325    {
326        Box::pin(async move {
327            self.load_column_data_impl(conn, type_, num_rows).await
328        })
329    }
330
    /// Implementation of load_column_data_async
    ///
    /// Consumes exactly one column body's worth of bytes from the stream.
    /// All reads are discarded (`let _ = ...`); nothing is decoded into a
    /// column here — NOTE(review): confirm this skip-only behavior is the
    /// intent for the uncompressed path (see `read_block_direct`).
    async fn load_column_data_impl(
        &self,
        conn: &mut Connection,
        type_: &Type,
        num_rows: usize,
    ) -> Result<()> {
        use crate::types::TypeCode;

        // Try to use the storage_size_bytes helper for fixed-size types
        if let Some(size_per_row) = type_.storage_size_bytes() {
            // Fixed-size type - read all rows at once
            let _ = conn.read_bytes(num_rows * size_per_row).await?;
            return Ok(());
        }

        // Handle variable-length and complex types
        match type_ {
            Type::Simple(TypeCode::String) => {
                // String - variable length: varint length prefix followed
                // by that many raw bytes, once per row
                for _ in 0..num_rows {
                    let len = conn.read_varint().await? as usize;
                    let _ = conn.read_bytes(len).await?;
                }
            }
            Type::Nullable { nested_type } => {
                // Read null mask first (one byte per row)
                let _ = conn.read_bytes(num_rows).await?;
                // Then read nested data (recursive call via boxed wrapper)
                self.load_column_data_async(conn, nested_type, num_rows)
                    .await?;
            }
            Type::Array { item_type } => {
                // Array wire format:
                // 1. Offsets array (UInt64 per row, cumulative counts)
                // 2. Nested data (item_type × total_items)

                if num_rows == 0 {
                    return Ok(());
                }

                // Read offsets array (UInt64 per row)
                let offsets_data = conn.read_bytes(num_rows * 8).await?;

                // Parse the last offset to get total item count
                // Offsets are cumulative, so last offset = total items
                let last_offset_bytes =
                    &offsets_data[offsets_data.len() - 8..];
                let total_items = u64::from_le_bytes([
                    last_offset_bytes[0],
                    last_offset_bytes[1],
                    last_offset_bytes[2],
                    last_offset_bytes[3],
                    last_offset_bytes[4],
                    last_offset_bytes[5],
                    last_offset_bytes[6],
                    last_offset_bytes[7],
                ]) as usize;

                // Recursively read nested column data
                if total_items > 0 {
                    self.load_column_data_async(conn, item_type, total_items)
                        .await?;
                }
            }
            Type::Tuple { item_types } => {
                // Tuple wire format: each element serialized sequentially
                // Read each tuple element's column data
                for item_type in item_types {
                    self.load_column_data_async(conn, item_type, num_rows)
                        .await?;
                }
            }
            Type::Map { key_type, value_type } => {
                // Map wire format is Array(Tuple(K, V))
                // We read it as: offsets array + tuple data

                if num_rows == 0 {
                    return Ok(());
                }

                // Read offsets array (UInt64 per row)
                let offsets_data = conn.read_bytes(num_rows * 8).await?;

                // Parse the last offset to get total number of map entries
                let last_offset_bytes =
                    &offsets_data[offsets_data.len() - 8..];
                let total_entries = u64::from_le_bytes([
                    last_offset_bytes[0],
                    last_offset_bytes[1],
                    last_offset_bytes[2],
                    last_offset_bytes[3],
                    last_offset_bytes[4],
                    last_offset_bytes[5],
                    last_offset_bytes[6],
                    last_offset_bytes[7],
                ]) as usize;

                // Read tuple data: key column + value column
                if total_entries > 0 {
                    // Read key column
                    self.load_column_data_async(conn, key_type, total_entries)
                        .await?;
                    // Read value column
                    self.load_column_data_async(
                        conn,
                        value_type,
                        total_entries,
                    )
                    .await?;
                }
            }
            Type::FixedString { size } => {
                // FixedString - fixed size per row
                // NOTE(review): this arm is reached only if
                // storage_size_bytes() returned None for FixedString;
                // otherwise the fast path above consumed these bytes.
                let _ = conn.read_bytes(num_rows * size).await?;
            }
            _ => {
                return Err(Error::Protocol(format!(
                    "Uncompressed reading not implemented for complex type: {}",
                    type_.name()
                )));
            }
        }

        Ok(())
    }
457
    /// Read block info
    ///
    /// The wire format interleaves field-number varints with values:
    /// field 1 → is_overflows (u8), field 2 → bucket_num (i32 LE),
    /// terminated by field number 0 (matches what `write_block_to_buffer`
    /// emits). The field numbers are consumed into `_num*` and assumed to
    /// arrive in exactly this order — NOTE(review): out-of-order fields
    /// would be misparsed; confirm servers always emit 1, 2, 0.
    async fn read_block_info(
        &self,
        conn: &mut Connection,
    ) -> Result<BlockInfo> {
        let _num1 = conn.read_varint().await?;
        let is_overflows = conn.read_u8().await?;
        let _num2 = conn.read_varint().await?;
        let bucket_num = conn.read_i32().await?;
        let _num3 = conn.read_varint().await?;

        Ok(BlockInfo { is_overflows, bucket_num })
    }
471
    /// Parse block from buffer (compressed data)
    ///
    /// Synchronous counterpart of `read_block_direct`, operating on the
    /// accumulated decompressed bytes. `read_block` inspects the error text
    /// produced on truncated input ("Unexpected end …" here, plus underflow
    /// errors from `buffer_utils`/column loaders) to decide whether to fetch
    /// another compressed frame, so a short buffer must fail with such an
    /// error rather than panic.
    fn parse_block_from_buffer(&self, buffer: &mut &[u8]) -> Result<Block> {
        let mut block = Block::new();

        // Read block info if supported
        if self.server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO {
            let info = self.read_block_info_from_buffer(buffer)?;
            block.set_info(info);
        }

        // Read column count and row count
        let num_columns = buffer_utils::read_varint(buffer)? as usize;
        let num_rows = buffer_utils::read_varint(buffer)? as usize;

        // Read each column
        for _ in 0..num_columns {
            let name = buffer_utils::read_string(buffer)?;
            let type_name = buffer_utils::read_string(buffer)?;

            // Check for custom serialization
            if self.server_revision
                >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION
            {
                if buffer.is_empty() {
                    return Err(Error::Protocol(
                        "Unexpected end of block data".to_string(),
                    ));
                }
                let custom_len = buffer[0];
                buffer.advance(1);

                if custom_len > 0 {
                    return Err(Error::Protocol(
                        "Custom serialization not supported".to_string(),
                    ));
                }
            }

            // Parse the type
            let column_type = Type::parse(&type_name)?;

            // Create column and load data
            let mut column = self.create_column_by_type(&column_type)?;

            if num_rows > 0 {
                // The Arc was created just above, so this should be the only
                // reference and get_mut should always succeed.
                let column_mut =
                    Arc::get_mut(&mut column).ok_or_else(|| {
                        Error::Protocol("Column not mutable".to_string())
                    })?;

                // Load prefix data first (for LowCardinality, etc.)
                column_mut.load_prefix(buffer, num_rows)?;

                // Load column body data
                column_mut.load_from_buffer(buffer, num_rows)?;
            }

            block.append_column(name, column)?;
        }

        Ok(block)
    }
534
535    /// Read block info from buffer
536    fn read_block_info_from_buffer(
537        &self,
538        buffer: &mut &[u8],
539    ) -> Result<BlockInfo> {
540        let _num1 = buffer_utils::read_varint(buffer)?;
541
542        if buffer.is_empty() {
543            return Err(Error::Protocol(
544                "Unexpected end reading block info".to_string(),
545            ));
546        }
547        let is_overflows = buffer[0];
548        buffer.advance(1);
549
550        let _num2 = buffer_utils::read_varint(buffer)?;
551
552        if buffer.len() < 4 {
553            return Err(Error::Protocol(
554                "Unexpected end reading bucket_num".to_string(),
555            ));
556        }
557        let bucket_num =
558            i32::from_le_bytes([buffer[0], buffer[1], buffer[2], buffer[3]]);
559        buffer.advance(4);
560
561        let _num3 = buffer_utils::read_varint(buffer)?;
562
563        Ok(BlockInfo { is_overflows, bucket_num })
564    }
565
566    /// Create a column by type
567    fn create_column_by_type(&self, type_: &Type) -> Result<ColumnRef> {
568        use crate::column::{
569            array::ColumnArray,
570            date::{
571                ColumnDate,
572                ColumnDate32,
573                ColumnDateTime,
574                ColumnDateTime64,
575            },
576            decimal::ColumnDecimal,
577            enum_column::{
578                ColumnEnum16,
579                ColumnEnum8,
580            },
581            ipv4::ColumnIpv4,
582            ipv6::ColumnIpv6,
583            lowcardinality::ColumnLowCardinality,
584            map::ColumnMap,
585            nothing::ColumnNothing,
586            nullable::ColumnNullable,
587            numeric::*,
588            string::{
589                ColumnFixedString,
590                ColumnString,
591            },
592            uuid::ColumnUuid,
593        };
594
595        match type_ {
596            Type::Simple(code) => {
597                use crate::types::TypeCode;
598                match code {
599                    TypeCode::UInt8 => Ok(Arc::new(ColumnUInt8::new())),
600                    TypeCode::UInt16 => Ok(Arc::new(ColumnUInt16::new())),
601                    TypeCode::UInt32 => Ok(Arc::new(ColumnUInt32::new())),
602                    TypeCode::UInt64 => Ok(Arc::new(ColumnUInt64::new())),
603                    TypeCode::UInt128 => Ok(Arc::new(ColumnUInt128::new())),
604                    TypeCode::Int8 => Ok(Arc::new(ColumnInt8::new())),
605                    TypeCode::Int16 => Ok(Arc::new(ColumnInt16::new())),
606                    TypeCode::Int32 => Ok(Arc::new(ColumnInt32::new())),
607                    TypeCode::Int64 => Ok(Arc::new(ColumnInt64::new())),
608                    TypeCode::Int128 => Ok(Arc::new(ColumnInt128::new())),
609                    TypeCode::Float32 => Ok(Arc::new(ColumnFloat32::new())),
610                    TypeCode::Float64 => Ok(Arc::new(ColumnFloat64::new())),
611                    TypeCode::String => {
612                        Ok(Arc::new(ColumnString::new(type_.clone())))
613                    }
614                    TypeCode::Date => {
615                        Ok(Arc::new(ColumnDate::new(type_.clone())))
616                    }
617                    TypeCode::Date32 => {
618                        Ok(Arc::new(ColumnDate32::new(type_.clone())))
619                    }
620                    TypeCode::UUID => {
621                        Ok(Arc::new(ColumnUuid::new(type_.clone())))
622                    }
623                    TypeCode::IPv4 => {
624                        Ok(Arc::new(ColumnIpv4::new(type_.clone())))
625                    }
626                    TypeCode::IPv6 => {
627                        Ok(Arc::new(ColumnIpv6::new(type_.clone())))
628                    }
629                    TypeCode::Void => {
630                        Ok(Arc::new(ColumnNothing::new(type_.clone())))
631                    }
632                    _ => Err(Error::Protocol(format!(
633                        "Unsupported type: {}",
634                        type_.name()
635                    ))),
636                }
637            }
638            Type::FixedString { .. } => {
639                Ok(Arc::new(ColumnFixedString::new(type_.clone())))
640            }
641            Type::DateTime { .. } => {
642                // Use specialized ColumnDateTime with timezone support
643                Ok(Arc::new(ColumnDateTime::new(type_.clone())))
644            }
645            Type::DateTime64 { .. } => {
646                // Use specialized ColumnDateTime64 with precision and timezone
647                Ok(Arc::new(ColumnDateTime64::new(type_.clone())))
648            }
649            Type::Enum8 { .. } => {
650                // Use specialized ColumnEnum8 with name-value mapping
651                Ok(Arc::new(ColumnEnum8::new(type_.clone())))
652            }
653            Type::Enum16 { .. } => {
654                // Use specialized ColumnEnum16 with name-value mapping
655                Ok(Arc::new(ColumnEnum16::new(type_.clone())))
656            }
657            Type::Decimal { .. } => {
658                // Use specialized ColumnDecimal with precision and scale
659                Ok(Arc::new(ColumnDecimal::new(type_.clone())))
660            }
661            Type::Nullable { .. } => {
662                Ok(Arc::new(ColumnNullable::new(type_.clone())))
663            }
664            Type::Array { .. } => {
665                Ok(Arc::new(ColumnArray::new(type_.clone())))
666            }
667            Type::Map { .. } => Ok(Arc::new(ColumnMap::new(type_.clone()))),
668            Type::LowCardinality { .. } => {
669                Ok(Arc::new(ColumnLowCardinality::new(type_.clone())))
670            }
671            Type::Tuple { item_types } => {
672                // Create empty columns for each tuple element
673                let mut columns = Vec::new();
674                for item_type in item_types {
675                    columns.push(create_column(item_type)?);
676                }
677                Ok(Arc::new(crate::column::ColumnTuple::new(
678                    type_.clone(),
679                    columns,
680                )))
681            }
682        }
683    }
684}
685
/// Writer for blocks to network
pub struct BlockWriter {
    /// Negotiated server protocol revision; gates optional wire fields
    /// (temp table name, block info, custom-serialization flag).
    server_revision: u64,
    /// When set, serialized blocks are compressed before being written.
    compression: Option<CompressionMethod>,
}
691
692impl BlockWriter {
693    /// Create a new block writer
694    pub fn new(server_revision: u64) -> Self {
695        Self { server_revision, compression: None }
696    }
697
698    /// Enable compression
699    pub fn with_compression(mut self, method: CompressionMethod) -> Self {
700        self.compression = Some(method);
701        self
702    }
703
    /// Write a block to the connection
    ///
    /// Convenience wrapper over [`Self::write_block_with_temp_table`] that
    /// always emits the (empty) temporary table name when the protocol
    /// revision requires one.
    pub async fn write_block(
        &self,
        conn: &mut Connection,
        block: &Block,
    ) -> Result<()> {
        self.write_block_with_temp_table(conn, block, true).await
    }
712
    /// Write a block to the connection (with optional temp table name)
    ///
    /// If `write_temp_table_name` is true, writes an empty temp table name
    /// before the block. For external tables, set to false since the table
    /// name was already written.
    ///
    /// The block is serialized into a scratch buffer first, then either
    /// compressed (compress() already emits checksum + header + payload) or
    /// written as-is, followed by a flush.
    pub async fn write_block_with_temp_table(
        &self,
        conn: &mut Connection,
        block: &Block,
        write_temp_table_name: bool,
    ) -> Result<()> {
        debug!(
            "Writing block: {} columns, {} rows",
            block.column_count(),
            block.row_count()
        );

        // Optionally write temporary table name if protocol supports it
        if write_temp_table_name
            && self.server_revision >= DBMS_MIN_REVISION_WITH_TEMPORARY_TABLES
        {
            debug!("Writing empty temp table name");
            conn.write_string("").await?;
        }

        // Serialize block to buffer
        let mut buffer = BytesMut::new();
        self.write_block_to_buffer(&mut buffer, block)?;
        debug!("Block serialized to {} bytes", buffer.len());

        // Compress if needed
        if let Some(compression_method) = self.compression {
            let compressed = compress(compression_method, &buffer)?;
            debug!("Compressed to {} bytes (includes 16-byte checksum + 9-byte header)", compressed.len());
            // Compressed data already includes checksum + header, write it
            // directly
            conn.write_bytes(&compressed).await?;
        } else {
            // Write uncompressed
            debug!("Writing uncompressed block");
            conn.write_bytes(&buffer).await?;
        }

        conn.flush().await?;
        debug!("Block write complete");
        Ok(())
    }
760
    /// Write block to buffer
    ///
    /// Serialization mirrors the read path byte-for-byte: optional block
    /// info (field-number/value pairs 1 → is_overflows, 2 → bucket_num,
    /// terminated by 0), column count, row count, then per column: name,
    /// type name, custom-serialization flag (always 0), prefix, body.
    fn write_block_to_buffer(
        &self,
        buffer: &mut BytesMut,
        block: &Block,
    ) -> Result<()> {
        // Write block info if supported
        if self.server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO {
            buffer_utils::write_varint(buffer, 1);
            buffer.put_u8(block.info().is_overflows);
            buffer_utils::write_varint(buffer, 2);
            buffer.put_i32_le(block.info().bucket_num);
            buffer_utils::write_varint(buffer, 0);
        }

        // Write column count and row count
        buffer_utils::write_varint(buffer, block.column_count() as u64);
        buffer_utils::write_varint(buffer, block.row_count() as u64);

        // Write each column
        for (name, type_, column) in block.iter() {
            buffer_utils::write_string(buffer, name);
            buffer_utils::write_string(buffer, &type_.name());

            // Custom serialization flag
            if self.server_revision
                >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION
            {
                buffer.put_u8(0); // No custom serialization
            }

            // Write column data (only if rows > 0)
            if block.row_count() > 0 {
                column.save_prefix(buffer)?; // Phase 1: Write prefix data (for LowCardinality, etc.)
                column.save_to_buffer(buffer)?; // Phase 2: Write body data
            }
        }

        Ok(())
    }
801}
802
803// Helper functions - now using centralized buffer_utils
804// (Functions removed - using buffer_utils::{read_varint, write_varint,
805// read_string, write_string})
806
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::*;
    use crate::column::numeric::ColumnUInt64;

    /// Build a single-column block containing the given UInt64 values.
    fn u64_block(name: &str, values: &[u64]) -> Block {
        let mut col = ColumnUInt64::new();
        for &v in values {
            col.append(v);
        }
        let mut block = Block::new();
        block.append_column(name, Arc::new(col)).unwrap();
        block
    }

    #[test]
    fn test_block_writer_serialization() {
        let block = u64_block("id", &[1, 2, 3]);

        let writer = BlockWriter::new(54449);
        let mut buffer = BytesMut::new();
        writer.write_block_to_buffer(&mut buffer, &block).unwrap();

        // Serialized output must contain at least the header bytes.
        assert!(!buffer.is_empty());
    }

    #[test]
    fn test_block_reader_parser() {
        // Round-trip a single column through the writer and the reader.
        let block = u64_block("test_col", &[42, 100]);

        let mut buffer = BytesMut::new();
        BlockWriter::new(54449)
            .write_block_to_buffer(&mut buffer, &block)
            .unwrap();

        let mut raw: &[u8] = &buffer;
        let decoded_block = BlockReader::new(54449)
            .parse_block_from_buffer(&mut raw)
            .unwrap();

        assert_eq!(decoded_block.column_count(), 1);
        assert_eq!(decoded_block.row_count(), 2);
        assert_eq!(decoded_block.column_name(0), Some("test_col"));
    }

    #[test]
    fn test_block_roundtrip_multiple_columns() {
        let mut block = u64_block("id", &[1, 2]);

        let mut values = ColumnUInt64::new();
        values.append(100);
        values.append(200);
        block.append_column("value", Arc::new(values)).unwrap();

        let mut buffer = BytesMut::new();
        BlockWriter::new(54449)
            .write_block_to_buffer(&mut buffer, &block)
            .unwrap();

        let mut raw: &[u8] = &buffer;
        let decoded = BlockReader::new(54449)
            .parse_block_from_buffer(&mut raw)
            .unwrap();

        assert_eq!(decoded.column_count(), 2);
        assert_eq!(decoded.row_count(), 2);
    }
}
889}