clickhouse_arrow/native/protocol.rs

1use std::str::FromStr;
2
3use strum::AsRefStr;
4use uuid::Uuid;
5
6use super::block::Block;
7use super::error_codes::map_exception_to_error;
8use super::progress::Progress;
9use crate::prelude::*;
10use crate::{Error, FxIndexMap, Result, ServerError};
11
// Protocol revision thresholds. Each `*_MIN_*` constant is the lowest TCP protocol revision
// at which the named feature is available (names follow ClickHouse's `ProtocolDefines.h`).
// Commented-out entries are known revisions this client does not act on yet.

pub(crate) const DBMS_MIN_REVISION_WITH_CLIENT_INFO: u64 = 54032;
pub(crate) const DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE: u64 = 54058;
pub(crate) const DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO: u64 = 54060;
// pub(crate) const DBMS_MIN_REVISION_WITH_TABLES_STATUS: u64 = 54226;
// pub(crate) const DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE: u64 = 54337;
pub(crate) const DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME: u64 = 54372;
pub(crate) const DBMS_MIN_REVISION_WITH_VERSION_PATCH: u64 = 54401;
// pub(crate) const DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE: u64 = 54405;
pub(crate) const DBMS_MIN_REVISION_WITH_SERVER_LOGS: u64 = 54406;
// pub(crate) const DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA: u64 = 54410;
// pub(crate) const DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA: u64 = 54415;
pub(crate) const DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO: u64 = 54420;
pub(crate) const DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS: u64 = 54429;
// pub(crate) const DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD: u64 =
//     54431;
pub(crate) const DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET: u64 = 54441;
pub(crate) const DBMS_MIN_REVISION_WITH_OPENTELEMETRY: u64 = 54442;
// pub(crate) const DBMS_MIN_REVISION_WITH_X_FORWARDED_FOR_IN_CLIENT_INFO: u64 = 54443;
// pub(crate) const DBMS_MIN_REVISION_WITH_REFERER_IN_CLIENT_INFO: u64 = 54447;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH: u64 = 54448;

pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_QUERY_START_TIME: u64 = 54449;
// pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS: u64 = 54451;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_PARALLEL_REPLICAS: u64 = 54453;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_CUSTOM_SERIALIZATION: u64 = 54454;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT: u64 = 54456;
// Addendum and quota key were introduced in the same revision.
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM: u64 = 54458;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY: u64 = 54458;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS: u64 = 54459;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_SERVER_QUERY_TIME_IN_PROGRESS: u64 = 54460;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_PASSWORD_COMPLEXITY_RULES: u64 = 54461;
pub(crate) const DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2: u64 = 54462;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_TOTAL_BYTES_IN_PROGRESS: u64 = 54463;
// pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_TIMEZONE_UPDATES: u64 = 54464;
// pub(crate) const DBMS_MIN_REVISION_WITH_SPARSE_SERIALIZATION: u64 = 54465;
// pub(crate) const DBMS_MIN_REVISION_WITH_SSH_AUTHENTICATION: u64 = 54466;
// Send read-only flag for Replicated tables as well.
// (Plain `//` on purpose: a `///` here would attach to the next *uncommented* constant.)
// pub(crate) const DBMS_MIN_REVISION_WITH_TABLE_READ_ONLY_CHECK: u64 = 54467;
// pub(crate) const DBMS_MIN_REVISION_WITH_SYSTEM_KEYWORDS_TABLE: u64 = 54468;
pub(crate) const DBMS_MIN_REVISION_WITH_ROWS_BEFORE_AGGREGATION: u64 = 54469;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS: u64 = 54470;
pub(crate) const DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL: u64 = 54471;
/// Push externally granted roles to other nodes.
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_INTERSERVER_EXTERNALLY_GRANTED_ROLES: u64 = 54472;
// TODO: Implement other types of json deserialization
// pub(crate) const DBMS_MIN_REVISION_WITH_V2_DYNAMIC_AND_JSON_SERIALIZATION: u64 = 54473;
pub(crate) const DBMS_MIN_REVISION_WITH_SERVER_SETTINGS: u64 = 54474;
pub(crate) const DBMS_MIN_REVISION_WITH_QUERY_AND_LINE_NUMBERS: u64 = 54475;
pub(crate) const DBMS_MIN_REVISION_WITH_JWT_IN_INTERSERVER: u64 = 54476;
pub(crate) const DBMS_MIN_REVISION_WITH_QUERY_PLAN_SERIALIZATION: u64 = 54477;
// pub(crate) const DBMS_MIN_REVISION_WITH_PARALLEL_BLOCK_MARSHALLING: u64 = 54478;
/// Newest feature revision known to this client.
pub(crate) const DBMS_MIN_REVISION_WITH_VERSIONED_CLUSTER_FUNCTION_PROTOCOL: u64 = 54479;

/// Active TCP protocol revision of this client. It always tracks the newest feature
/// revision above (presumably the value sent in the client `Hello` — confirm in the writer).
pub(crate) const DBMS_TCP_PROTOCOL_VERSION: u64 =
    DBMS_MIN_REVISION_WITH_VERSIONED_CLUSTER_FUNCTION_PROTOCOL;

/// Version number of the parallel-replicas sub-protocol spoken by this client.
pub(crate) const DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION: u64 = 4;

/// Maximum size of a single string read over the native protocol: `1 << 30`
/// (1 GiB, assuming the limit is measured in bytes).
pub(crate) const MAX_STRING_SIZE: usize = 1 << 30;
73
/// Stage up to which the server should execute a query.
///
/// Discriminants are spelled out because the value is presumably sent on the wire as a
/// `u64` (hence `repr(u64)`) — confirm against the query writer.
#[repr(u64)]
#[derive(Clone, Copy, Debug)]
#[expect(unused)]
pub(crate) enum QueryProcessingStage {
    /// Only read the requested columns.
    FetchColumns                      = 0,
    /// Stop at a state that can still be merged with other partial results.
    WithMergeableState                = 1,
    /// Run the query to completion.
    Complete                          = 2,
    /// Mergeable state taken after aggregation.
    WithMergableStateAfterAggregation = 3,
}
83
/// Numeric identifier of a packet sent by the client to the server.
#[expect(unused)]
#[repr(u64)]
#[derive(Clone, Copy, Debug)]
pub(crate) enum ClientPacketId {
    /// Name, version, revision, default DB.
    Hello                     = 0,
    /// Query id, query settings, stage up to which the query must be executed, whether
    /// compression must be used, and the query text (without data for INSERTs).
    Query                     = 1,
    /// A block of data (compressed or not).
    Data                      = 2,
    /// Cancel the query execution.
    Cancel                    = 3,
    /// Check that the connection to the server is alive.
    Ping                      = 4,
    /// Check the status of tables on the server.
    TablesStatusRequest       = 5,
    /// Keep the connection alive.
    KeepAlive                 = 6,
    /// A block of data (compressed or not).
    Scalar                    = 7,
    /// List of unique part ids to exclude from query processing.
    IgnoredPartUUIDs          = 8,
    /// A filename to read from s3 (used in `s3Cluster`).
    ReadTaskResponse          = 9,
    /// Coordinator's decision with a modified set of mark ranges allowed to read.
    MergeTreeReadTaskResponse = 10,
    /// Request an SSH signature challenge.
    SSHChallengeRequest       = 11,
    /// Reply to an SSH signature challenge.
    SSHChallengeResponse      = 12,
    /// Query plan.
    QueryPlan                 = 13,
}
106
/// Connection parameters supplied by the client for the handshake
/// (presumably serialized into a `ClientPacketId::Hello` packet — confirm in the writer).
pub(crate) struct ClientHello {
    /// Database selected when the session starts.
    pub(crate) default_database: String,
    /// Login user name.
    pub(crate) username:         String,
    /// Login password.
    pub(crate) password:         String,
}
112
113/// `ServerPacketId` is the packet id read from `ClickHouse`.
114///
115/// See `ServerPacket` for how the data is passed out from the tcp stream's reader
116#[repr(u64)]
117#[derive(Clone, Copy, Debug, AsRefStr)]
118pub(crate) enum ServerPacketId {
119    Hello                          = 0,
120    Data                           = 1,
121    Exception                      = 2,
122    Progress                       = 3,
123    Pong                           = 4,
124    EndOfStream                    = 5,
125    ProfileInfo                    = 6,
126    Totals                         = 7,
127    Extremes                       = 8,
128    TablesStatusResponse           = 9,
129    Log                            = 10,
130    TableColumns                   = 11,
131    PartUUIDs                      = 12,
132    ReadTaskRequest                = 13,
133    ProfileEvents                  = 14,
134    MergeTreeAllRangesAnnouncement = 15,
135    MergeTreeReadTaskRequest       = 16, // Request from a MergeTree replica to a coordinator
136    TimezoneUpdate                 = 17, // Receive server's (session-wide) default timezone
137    SSHChallenge                   = 18, // Return challenge for SSH signature signing
138}
139
140impl ServerPacketId {
141    pub(crate) fn from_u64(i: u64) -> Result<Self> {
142        Ok(match i {
143            0 => ServerPacketId::Hello,
144            1 => ServerPacketId::Data,
145            2 => ServerPacketId::Exception,
146            3 => ServerPacketId::Progress,
147            4 => ServerPacketId::Pong,
148            5 => ServerPacketId::EndOfStream,
149            6 => ServerPacketId::ProfileInfo,
150            7 => ServerPacketId::Totals,
151            8 => ServerPacketId::Extremes,
152            9 => ServerPacketId::TablesStatusResponse,
153            10 => ServerPacketId::Log,
154            11 => ServerPacketId::TableColumns,
155            12 => ServerPacketId::PartUUIDs,
156            13 => ServerPacketId::ReadTaskRequest,
157            14 => ServerPacketId::ProfileEvents,
158            15 => ServerPacketId::MergeTreeAllRangesAnnouncement,
159            16 => ServerPacketId::MergeTreeReadTaskRequest,
160            17 => ServerPacketId::TimezoneUpdate,
161            18 => ServerPacketId::SSHChallenge,
162            x => {
163                error!("invalid packet id from server: {}", x);
164                return Err(Error::Protocol(format!("Unknown packet id {i}")));
165            }
166        })
167    }
168}
169
170/// The deserialized information read from the tcp stream after a packet id has been received.
171#[expect(unused)]
172#[derive(Debug, Clone, AsRefStr)]
173pub(crate) enum ServerPacket<T = Block> {
174    Hello(ServerHello),
175    Header(ServerData<Block>),
176    Data(ServerData<T>),
177    QueryData(ServerData<T>),
178    Totals(ServerData<T>),
179    Extremes(ServerData<T>),
180    ProfileEvents(Vec<ProfileEvent>),
181    Log(Vec<LogData>),
182    Exception(ServerException),
183    Progress(Progress),
184    Pong,
185    EndOfStream,
186    ProfileInfo(ProfileInfo),
187    TablesStatusResponse(TablesStatusResponse),
188    TableColumns(TableColumns),
189    PartUUIDs(Vec<Uuid>),
190    ReadTaskRequest(Option<String>),
191    MergeTreeAllRangesAnnouncement,
192    MergeTreeReadTaskRequest,
193    TimezoneUpdate,
194    SSHChallenge,
195    Ignore(ServerPacketId), // Allows ignoring certain packets
196}
197
198#[derive(Debug, Clone, Default)]
199pub(crate) struct ServerHello {
200    #[expect(unused)]
201    pub(crate) server_name:      String,
202    #[expect(unused)]
203    pub(crate) version:          (u64, u64, u64),
204    pub(crate) revision_version: u64,
205    #[expect(unused)]
206    pub(crate) timezone:         Option<String>,
207    #[expect(unused)]
208    pub(crate) display_name:     Option<String>,
209    pub(crate) settings:         Option<Settings>,
210    pub(crate) chunked_send:     ChunkedProtocolMode,
211    pub(crate) chunked_recv:     ChunkedProtocolMode,
212}
213
214impl ServerHello {
215    pub(crate) fn supports_chunked_send(&self) -> bool {
216        matches!(
217            self.chunked_send,
218            ChunkedProtocolMode::Chunked | ChunkedProtocolMode::ChunkedOptional
219        )
220    }
221
222    pub(crate) fn supports_chunked_recv(&self) -> bool {
223        matches!(
224            self.chunked_recv,
225            ChunkedProtocolMode::Chunked | ChunkedProtocolMode::ChunkedOptional
226        )
227    }
228}
229
/// Thin wrapper around a block carried by a data-bearing server packet.
#[derive(Debug, Clone)]
pub(crate) struct ServerData<T> {
    /// The decoded block.
    pub(crate) block: T,
}
234
/// Exception reported by the server in an `Exception` packet.
#[derive(Debug, Clone)]
pub(crate) struct ServerException {
    /// Numeric `ClickHouse` error code.
    pub(crate) code:        i32,
    /// Exception name.
    pub(crate) name:        String,
    /// Human-readable message.
    pub(crate) message:     String,
    /// Server-side stack trace.
    pub(crate) stack_trace: String,
    /// Whether a nested exception follows.
    #[expect(unused)]
    pub(crate) has_nested:  bool,
}
244
245impl ServerException {
246    pub(crate) fn emit(self) -> ServerError { map_exception_to_error(self) }
247}
248
/// Query profiling summary decoded from a `ProfileInfo` packet.
///
/// Field meanings presumably mirror ClickHouse's `ProfileInfo` — confirm in the reader.
#[expect(unused)]
#[derive(Debug, Clone)]
pub(crate) struct ProfileInfo {
    /// Row count reported by the server.
    pub(crate) rows:                         u64,
    /// Block count reported by the server.
    pub(crate) blocks:                       u64,
    /// Byte count reported by the server.
    pub(crate) bytes:                        u64,
    /// Whether a `LIMIT` was applied.
    pub(crate) applied_limit:                bool,
    /// Row count before the limit was applied.
    pub(crate) rows_before_limit:            u64,
    /// Whether `rows_before_limit` was calculated.
    pub(crate) calculated_rows_before_limit: bool,
    /// Whether aggregation was applied.
    pub(crate) applied_aggregation:          bool,
    /// Row count before aggregation.
    pub(crate) rows_before_aggregation:      u64,
}
261
/// Table column description decoded from a `TableColumns` packet.
#[expect(unused)]
#[derive(Debug, Clone)]
pub(crate) struct TableColumns {
    /// Table name.
    pub(crate) name:        String,
    /// Column description text.
    pub(crate) description: String,
}
268
/// Status of a single table in a `TablesStatusResponse`.
#[expect(unused)]
#[derive(Debug, Clone)]
pub(crate) struct TableStatus {
    /// Whether the table is replicated.
    pub(crate) is_replicated:  bool,
    /// Replication delay (units not shown here — presumably seconds; confirm).
    pub(crate) absolute_delay: u32,
}
275
276#[derive(Debug, Clone)]
277pub(crate) struct TablesStatusResponse {
278    pub(crate) database_tables: FxIndexMap<String, FxIndexMap<String, TableStatus>>,
279}
280
/// One row of a server `Log` packet.
///
/// Fields are filled by column name. Text-like columns are kept in their `Display` form.
#[derive(Debug, Clone, Default)]
pub(crate) struct LogData {
    /// Event time, as text.
    pub(crate) time:       String,
    /// Microsecond part of the event time.
    pub(crate) time_micro: u32,
    /// Host that produced the entry.
    pub(crate) host_name:  String,
    /// Query the entry belongs to.
    pub(crate) query_id:   String,
    /// Server thread id.
    pub(crate) thread_id:  u64,
    /// Log priority level.
    pub(crate) priority:   i8,
    /// Logger source.
    pub(crate) source:     String,
    /// Log message text.
    pub(crate) text:       String,
}
292
293impl LogData {
294    fn update_value(&mut self, name: &str, value: Value, type_: &Type) -> Result<()> {
295        match name {
296            "time" => self.time = value.to_string(),
297            "time_micro" => self.time_micro = value.to_value(type_)?,
298            "host_name" => self.host_name = value.to_string(),
299            "query_id" => self.query_id = value.to_string(),
300            "thread_id" => self.thread_id = value.to_value(type_)?,
301            "priority" => self.priority = value.to_value(type_)?,
302            "source" => self.source = value.to_string(),
303            "text" => self.text = value.to_string(),
304            _ => {}
305        }
306        Ok(())
307    }
308
309    #[expect(clippy::cast_possible_truncation)]
310    pub(crate) fn from_block(mut block: Block) -> Result<Vec<Self>> {
311        let rows = block.rows as usize;
312        let mut log_data = vec![Self::default(); rows];
313        let mut column_data = std::mem::take(&mut block.column_data);
314        for (name, type_) in &block.column_types {
315            for (i, value) in column_data.drain(..rows).enumerate() {
316                if let Some(log) = log_data.get_mut(i) {
317                    log.update_value(name, value, type_)?;
318                }
319            }
320        }
321        Ok(log_data)
322    }
323}
324
/// A single profile-event row, emitted by `ClickHouse` during operations and decoded
/// from a server `ProfileEvents` packet.
#[derive(Debug, Clone, Default)]
pub struct ProfileEvent {
    /// Host that produced the event.
    pub(crate) host_name:    String,
    /// Event timestamp, as text.
    pub(crate) current_time: String,
    /// Server thread id.
    pub(crate) thread_id:    u64,
    /// Event type code.
    pub(crate) type_code:    i8,
    /// Event name.
    pub(crate) name:         String,
    /// Event value.
    pub(crate) value:        i64,
}
335
336impl ProfileEvent {
337    fn update_value(&mut self, name: &str, value: Value, type_: &Type) -> Result<()> {
338        match name {
339            "host_name" => self.host_name = value.to_string(),
340            "current_time" => self.current_time = value.to_string(),
341            "thread_id" => self.thread_id = value.to_value(type_)?,
342            "type_code" => self.type_code = value.to_value(type_)?,
343            "name" => self.name = value.to_string(),
344            "value" => self.value = value.to_value(type_)?,
345            _ => {}
346        }
347        Ok(())
348    }
349
350    #[expect(clippy::cast_possible_truncation)]
351    pub(crate) fn from_block(mut block: Block) -> Result<Vec<Self>> {
352        let rows = block.rows as usize;
353        let mut profile_events = vec![Self::default(); rows];
354        let mut column_data = std::mem::take(&mut block.column_data);
355        for (name, type_) in &block.column_types {
356            for (i, value) in column_data.drain(..rows).enumerate() {
357                if let Some(profile) = profile_events.get_mut(i) {
358                    profile.update_value(name, value, type_).inspect_err(|error| {
359                        error!(?error, "profile event update failed");
360                    })?;
361                }
362            }
363        }
364        Ok(profile_events)
365    }
366}
367
368#[derive(Clone, Default, Copy, Debug, PartialEq, Eq, Hash, AsRefStr)]
369#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
370pub enum ChunkedProtocolMode {
371    #[default]
372    #[strum(serialize = "chunked_optional")]
373    ChunkedOptional,
374    #[strum(serialize = "chunked")]
375    Chunked,
376    #[strum(serialize = "notchunked_optional")]
377    NotChunkedOptional,
378    #[strum(serialize = "notchunked")]
379    NotChunked,
380}
381
382impl ChunkedProtocolMode {
383    /// Negotiates chunked protocol between client and server (based on C++ `is_chunked` function)
384    pub(crate) fn negotiate(
385        server_mode: ChunkedProtocolMode,
386        client_mode: ChunkedProtocolMode,
387        direction: &str,
388    ) -> Result<ChunkedProtocolMode> {
389        let server_chunked = matches!(
390            server_mode,
391            ChunkedProtocolMode::Chunked | ChunkedProtocolMode::ChunkedOptional
392        );
393        let server_optional = matches!(
394            server_mode,
395            ChunkedProtocolMode::ChunkedOptional | ChunkedProtocolMode::NotChunkedOptional
396        );
397        let client_chunked = matches!(
398            client_mode,
399            ChunkedProtocolMode::Chunked | ChunkedProtocolMode::ChunkedOptional
400        );
401        let client_optional = matches!(
402            client_mode,
403            ChunkedProtocolMode::ChunkedOptional | ChunkedProtocolMode::NotChunkedOptional
404        );
405        let result_chunked = if server_optional {
406            client_chunked
407        } else if client_optional {
408            server_chunked
409        } else if client_chunked != server_chunked {
410            return Err(Error::Protocol(format!(
411                "Incompatible protocol: {} set to {}, server requires {}",
412                direction,
413                if client_chunked { "chunked" } else { "notchunked" },
414                if server_chunked { "chunked" } else { "notchunked" }
415            )));
416        } else {
417            server_chunked
418        };
419
420        Ok(if result_chunked {
421            ChunkedProtocolMode::Chunked
422        } else {
423            ChunkedProtocolMode::NotChunked
424        })
425    }
426}
427
428impl FromStr for ChunkedProtocolMode {
429    type Err = Error;
430
431    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
432        Ok(match s {
433            "chunked" => Self::Chunked,
434            "chunked_optional" => Self::ChunkedOptional,
435            "notchunked" => Self::NotChunked,
436            "notchunked_optional" => Self::NotChunkedOptional,
437            _ => {
438                return Err(Error::Protocol(format!(
439                    "Unexpected value for chunked protocol mode: {s}"
440                )));
441            }
442        })
443    }
444}
445
/// Block compression codec used on the native protocol. `LZ4` is the default.
#[derive(Clone, Default, Copy, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum CompressionMethod {
    /// No compression.
    None,
    /// LZ4 compression (default).
    #[default]
    LZ4,
    /// Zstandard compression.
    ZSTD,
}

impl CompressionMethod {
    /// Returns the codec's method byte: `0x02` (none), `0x82` (LZ4), `0x90` (ZSTD).
    ///
    /// Presumably written into each compressed block header — confirm in the
    /// compression writer.
    pub(crate) fn byte(self) -> u8 {
        match self {
            CompressionMethod::None => 0x02,
            CompressionMethod::LZ4 => 0x82,
            CompressionMethod::ZSTD => 0x90,
        }
    }
}

impl From<&str> for CompressionMethod {
    /// Lenient conversion. It recognises `"lz4"`/`"LZ4"` and `"zstd"`/`"ZSTD"`. Every
    /// other string, including typos, maps to `CompressionMethod::None`.
    /// Use `str::parse` when unknown names must be rejected.
    fn from(value: &str) -> Self {
        match value {
            "lz4" | "LZ4" => CompressionMethod::LZ4,
            "zstd" | "ZSTD" => CompressionMethod::ZSTD,
            _ => CompressionMethod::None,
        }
    }
}

impl std::fmt::Display for CompressionMethod {
    /// Writes the same name as `AsRef<str>` ("None", "LZ4", "ZSTD").
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_ref())
    }
}

impl FromStr for CompressionMethod {
    type Err = String;

    /// Strict parse that accepts every name `Display` produces plus its lowercase form.
    ///
    /// Accepted: `none`/`None`/`NONE`, `lz4`/`LZ4`, `zstd`/`ZSTD`. Accepting the "none"
    /// spellings lets `CompressionMethod::None.to_string().parse()` round-trip. Before,
    /// it was rejected because unknown names and `None` were indistinguishable.
    ///
    /// # Errors
    /// Returns `"Invalid compression method: {s}"` for any other string.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s {
            "none" | "None" | "NONE" => Ok(CompressionMethod::None),
            "lz4" | "LZ4" => Ok(CompressionMethod::LZ4),
            "zstd" | "ZSTD" => Ok(CompressionMethod::ZSTD),
            _ => Err(format!("Invalid compression method: {s}")),
        }
    }
}

impl AsRef<str> for CompressionMethod {
    /// Canonical display name of the codec.
    fn as_ref(&self) -> &str {
        match self {
            CompressionMethod::None => "None",
            CompressionMethod::LZ4 => "LZ4",
            CompressionMethod::ZSTD => "ZSTD",
        }
    }
}