clickhouse_arrow/native/
protocol.rs

1use std::str::FromStr;
2
3use strum::AsRefStr;
4use uuid::Uuid;
5
6use super::block::Block;
7use super::error_codes::map_exception_to_error;
8use super::progress::Progress;
9use crate::prelude::*;
10use crate::{Error, FxIndexMap, Result, ServerError};
11
// Minimum server revisions / protocol versions gating individual native-protocol features.
// Commented-out entries are not referenced by this crate and are kept for reference only.
pub(crate) const DBMS_MIN_REVISION_WITH_CLIENT_INFO: u64 = 54032;
pub(crate) const DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE: u64 = 54058;
pub(crate) const DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO: u64 = 54060;
// pub(crate) const DBMS_MIN_REVISION_WITH_TABLES_STATUS: u64 = 54226;
// pub(crate) const DBMS_MIN_REVISION_WITH_TIME_ZONE_PARAMETER_IN_DATETIME_DATA_TYPE: u64 = 54337;
pub(crate) const DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME: u64 = 54372;
pub(crate) const DBMS_MIN_REVISION_WITH_VERSION_PATCH: u64 = 54401;
pub(crate) const DBMS_MIN_REVISION_WITH_SERVER_LOGS: u64 = 54406;
// pub(crate) const DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA: u64 = 54415;
// pub(crate) const DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD: u64 =
//     54431;
// pub(crate) const DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA: u64 = 54410;
// pub(crate) const DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE: u64 = 54405;
pub(crate) const DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO: u64 = 54420;
pub(crate) const DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS: u64 = 54429;
pub(crate) const DBMS_MIN_REVISION_WITH_OPENTELEMETRY: u64 = 54442;
pub(crate) const DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET: u64 = 54441;
// pub(crate) const DBMS_MIN_REVISION_WITH_X_FORWARDED_FOR_IN_CLIENT_INFO: u64 = 54443;
// pub(crate) const DBMS_MIN_REVISION_WITH_REFERER_IN_CLIENT_INFO: u64 = 54447;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH: u64 = 54448;

pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_QUERY_START_TIME: u64 = 54449;
// pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS: u64 = 54451;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_PARALLEL_REPLICAS: u64 = 54453;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_CUSTOM_SERIALIZATION: u64 = 54454;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT: u64 = 54456;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM: u64 = 54458;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY: u64 = 54458;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS: u64 = 54459;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_SERVER_QUERY_TIME_IN_PROGRESS: u64 = 54460;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_PASSWORD_COMPLEXITY_RULES: u64 = 54461;
pub(crate) const DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2: u64 = 54462;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_TOTAL_BYTES_IN_PROGRESS: u64 = 54463;
// pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_TIMEZONE_UPDATES: u64 = 54464;
// pub(crate) const DBMS_MIN_REVISION_WITH_SPARSE_SERIALIZATION: u64 = 54465;
// pub(crate) const DBMS_MIN_REVISION_WITH_SSH_AUTHENTICATION: u64 = 54466;
// Send read-only flag for Replicated tables as well.
// (Plain comment on purpose: as a `///` doc comment it would attach to the next live constant.)
// pub(crate) const DBMS_MIN_REVISION_WITH_TABLE_READ_ONLY_CHECK: u64 = 54467;
// pub(crate) const DBMS_MIN_REVISION_WITH_SYSTEM_KEYWORDS_TABLE: u64 = 54468;
pub(crate) const DBMS_MIN_REVISION_WITH_ROWS_BEFORE_AGGREGATION: u64 = 54469;
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS: u64 = 54470;
pub(crate) const DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL: u64 = 54471;
/// Push externally granted roles to other nodes
pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_INTERSERVER_EXTERNALLY_GRANTED_ROLES: u64 = 54472;
// TODO: Implement other types of json deserialization
// pub(crate) const DBMS_MIN_REVISION_WITH_V2_DYNAMIC_AND_JSON_SERIALIZATION: u64 = 54473;
pub(crate) const DBMS_MIN_REVISION_WITH_SERVER_SETTINGS: u64 = 54474;
pub(crate) const DBMS_MIN_REVISION_WITH_QUERY_AND_LINE_NUMBERS: u64 = 54475;
pub(crate) const DBMS_MIN_REVISION_WITH_JWT_IN_INTERSERVER: u64 = 54476;
// Current
pub(crate) const DBMS_MIN_REVISION_WITH_QUERY_PLAN_SERIALIZATION: u64 = 54477;

// Active revision: tracks the newest feature revision listed above.
pub(crate) const DBMS_TCP_PROTOCOL_VERSION: u64 = DBMS_MIN_REVISION_WITH_QUERY_PLAN_SERIALIZATION;

pub(crate) const DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION: u64 = 4;

// Max size of string over native (1 GiB)
pub(crate) const MAX_STRING_SIZE: usize = 1 << 30;
70
/// Query processing stages, numbered consecutively from zero in declaration order.
///
/// The discriminants are spelled out so the `u64` value of each stage is visible at a glance.
#[repr(u64)]
#[derive(Clone, Copy, Debug)]
#[expect(unused)]
pub(crate) enum QueryProcessingStage {
    FetchColumns                      = 0,
    WithMergeableState                = 1,
    Complete                          = 2,
    WithMergableStateAfterAggregation = 3,
}
80
/// Identifiers of the packet kinds a client can send, with their `u64` wire values.
#[expect(unused)]
#[repr(u64)]
#[derive(Clone, Copy, Debug)]
pub(crate) enum ClientPacketId {
    /// Name, version, revision, default DB.
    Hello                     = 0,
    /// Query id, query settings, stage up to which the query must be executed, whether the
    /// compression must be used, query text (without data for INSERTs).
    Query                     = 1,
    /// A block of data (compressed or not).
    Data                      = 2,
    /// Cancel the query execution.
    Cancel                    = 3,
    /// Check that connection to the server is alive.
    Ping                      = 4,
    /// Check status of tables on the server.
    TablesStatusRequest       = 5,
    /// Keep the connection alive.
    KeepAlive                 = 6,
    /// A block of data (compressed or not).
    Scalar                    = 7,
    /// List of unique parts ids to exclude from query processing.
    IgnoredPartUUIDs          = 8,
    /// A filename to read from s3 (used in s3Cluster).
    ReadTaskResponse          = 9,
    /// Coordinator's decision with a modified set of mark ranges allowed to read.
    MergeTreeReadTaskResponse = 10,
    /// Request SSH signature challenge.
    SSHChallengeRequest       = 11,
    /// Reply to SSH signature challenge.
    SSHChallengeResponse      = 12,
    /// Query plan.
    QueryPlan                 = 13,
}
103
/// Database and credentials carried by the client's hello.
///
/// No traits are derived for this type.
/// NOTE(review): presumably so the password cannot leak through `Debug` output — confirm before
/// adding derives.
pub(crate) struct ClientHello {
    /// Database selected by default for the session.
    pub(crate) default_database: String,
    /// Login name.
    pub(crate) username:         String,
    /// Login password, held as plain text.
    pub(crate) password:         String,
}
109
110/// `ServerPacketId` is the packet id read from `ClickHouse`.
111///
112/// See `ServerPacket` for how the data is passed out from the tcp stream's reader
113#[repr(u64)]
114#[derive(Clone, Copy, Debug, AsRefStr)]
115pub(crate) enum ServerPacketId {
116    Hello                          = 0,
117    Data                           = 1,
118    Exception                      = 2,
119    Progress                       = 3,
120    Pong                           = 4,
121    EndOfStream                    = 5,
122    ProfileInfo                    = 6,
123    Totals                         = 7,
124    Extremes                       = 8,
125    TablesStatusResponse           = 9,
126    Log                            = 10,
127    TableColumns                   = 11,
128    PartUUIDs                      = 12,
129    ReadTaskRequest                = 13,
130    ProfileEvents                  = 14,
131    MergeTreeAllRangesAnnouncement = 15,
132    MergeTreeReadTaskRequest       = 16, // Request from a MergeTree replica to a coordinator
133    TimezoneUpdate                 = 17, // Receive server's (session-wide) default timezone
134    SSHChallenge                   = 18, // Return challenge for SSH signature signing
135}
136
137impl ServerPacketId {
138    pub(crate) fn from_u64(i: u64) -> Result<Self> {
139        Ok(match i {
140            0 => ServerPacketId::Hello,
141            1 => ServerPacketId::Data,
142            2 => ServerPacketId::Exception,
143            3 => ServerPacketId::Progress,
144            4 => ServerPacketId::Pong,
145            5 => ServerPacketId::EndOfStream,
146            6 => ServerPacketId::ProfileInfo,
147            7 => ServerPacketId::Totals,
148            8 => ServerPacketId::Extremes,
149            9 => ServerPacketId::TablesStatusResponse,
150            10 => ServerPacketId::Log,
151            11 => ServerPacketId::TableColumns,
152            12 => ServerPacketId::PartUUIDs,
153            13 => ServerPacketId::ReadTaskRequest,
154            14 => ServerPacketId::ProfileEvents,
155            15 => ServerPacketId::MergeTreeAllRangesAnnouncement,
156            16 => ServerPacketId::MergeTreeReadTaskRequest,
157            17 => ServerPacketId::TimezoneUpdate,
158            18 => ServerPacketId::SSHChallenge,
159            x => {
160                error!("invalid packet id from server: {}", x);
161                return Err(Error::Protocol(format!("Unknown packet id {i}")));
162            }
163        })
164    }
165}
166
167/// The deserialized information read from the tcp stream after a packet id has been received.
168#[expect(unused)]
169#[derive(Debug, Clone, AsRefStr)]
170pub(crate) enum ServerPacket<T = Block> {
171    Hello(ServerHello),
172    Header(ServerData<Block>),
173    Data(ServerData<T>),
174    QueryData(ServerData<T>),
175    Totals(ServerData<T>),
176    Extremes(ServerData<T>),
177    ProfileEvents(Vec<ProfileEvent>),
178    Log(Vec<LogData>),
179    Exception(ServerException),
180    Progress(Progress),
181    Pong,
182    EndOfStream,
183    ProfileInfo(ProfileInfo),
184    TablesStatusResponse(TablesStatusResponse),
185    TableColumns(TableColumns),
186    PartUUIDs(Vec<Uuid>),
187    ReadTaskRequest(Option<String>),
188    MergeTreeAllRangesAnnouncement,
189    MergeTreeReadTaskRequest,
190    TimezoneUpdate,
191    SSHChallenge,
192    Ignore(ServerPacketId), // Allows ignoring certain packets
193}
194
195#[derive(Debug, Clone, Default)]
196pub(crate) struct ServerHello {
197    #[expect(unused)]
198    pub(crate) server_name:      String,
199    #[expect(unused)]
200    pub(crate) version:          (u64, u64, u64),
201    pub(crate) revision_version: u64,
202    #[expect(unused)]
203    pub(crate) timezone:         Option<String>,
204    #[expect(unused)]
205    pub(crate) display_name:     Option<String>,
206    pub(crate) settings:         Option<Settings>,
207    pub(crate) chunked_send:     ChunkedProtocolMode,
208    pub(crate) chunked_recv:     ChunkedProtocolMode,
209}
210
211impl ServerHello {
212    pub(crate) fn supports_chunked_send(&self) -> bool {
213        matches!(
214            self.chunked_send,
215            ChunkedProtocolMode::Chunked | ChunkedProtocolMode::ChunkedOptional
216        )
217    }
218
219    pub(crate) fn supports_chunked_recv(&self) -> bool {
220        matches!(
221            self.chunked_recv,
222            ChunkedProtocolMode::Chunked | ChunkedProtocolMode::ChunkedOptional
223        )
224    }
225}
226
/// Wrapper around the block payload of a block-carrying server packet.
#[derive(Clone, Debug)]
pub(crate) struct ServerData<T> {
    /// The block, in whatever representation `T` the reader produced.
    pub(crate) block: T,
}
231
/// Exception details reported by the server.
#[derive(Clone, Debug)]
pub(crate) struct ServerException {
    /// Numeric error code.
    pub(crate) code:        i32,
    /// Exception name.
    pub(crate) name:        String,
    /// Exception message text.
    pub(crate) message:     String,
    /// Stack trace text.
    pub(crate) stack_trace: String,
    /// Stored but not read anywhere in the crate.
    #[expect(unused)]
    pub(crate) has_nested:  bool,
}
241
242impl ServerException {
243    pub(crate) fn emit(self) -> ServerError { map_exception_to_error(self) }
244}
245
/// Payload of a `ProfileInfo` server packet: row/block/byte counters plus limit and aggregation
/// flags.
#[expect(unused)]
#[derive(Clone, Debug)]
pub(crate) struct ProfileInfo {
    pub(crate) rows:                         u64,
    pub(crate) blocks:                       u64,
    pub(crate) bytes:                        u64,
    pub(crate) applied_limit:                bool,
    pub(crate) rows_before_limit:            u64,
    pub(crate) calculated_rows_before_limit: bool,
    pub(crate) applied_aggregation:          bool,
    pub(crate) rows_before_aggregation:      u64,
}
258
/// Payload of a `TableColumns` server packet: a name and a description, both as text.
#[expect(unused)]
#[derive(Clone, Debug)]
pub(crate) struct TableColumns {
    pub(crate) name:        String,
    pub(crate) description: String,
}
265
/// Status entry for a single table, stored inside `TablesStatusResponse`.
#[expect(unused)]
#[derive(Clone, Debug)]
pub(crate) struct TableStatus {
    pub(crate) is_replicated:  bool,
    // NOTE(review): unit of the delay is not visible here — presumably seconds; confirm.
    pub(crate) absolute_delay: u32,
}
272
273#[derive(Debug, Clone)]
274pub(crate) struct TablesStatusResponse {
275    pub(crate) database_tables: FxIndexMap<String, FxIndexMap<String, TableStatus>>,
276}
277
/// One row of a server log block.
///
/// Field names mirror the column names matched when a block is converted with
/// `LogData::from_block`.
#[derive(Clone, Debug, Default)]
pub(crate) struct LogData {
    pub(crate) time:       String,
    pub(crate) time_micro: u32,
    pub(crate) host_name:  String,
    pub(crate) query_id:   String,
    pub(crate) thread_id:  u64,
    pub(crate) priority:   i8,
    pub(crate) source:     String,
    pub(crate) text:       String,
}
289
290impl LogData {
291    fn update_value(&mut self, name: &str, value: Value, type_: &Type) -> Result<()> {
292        match name {
293            "time" => self.time = value.to_string(),
294            "time_micro" => self.time_micro = value.to_value(type_)?,
295            "host_name" => self.host_name = value.to_string(),
296            "query_id" => self.query_id = value.to_string(),
297            "thread_id" => self.thread_id = value.to_value(type_)?,
298            "priority" => self.priority = value.to_value(type_)?,
299            "source" => self.source = value.to_string(),
300            "text" => self.text = value.to_string(),
301            _ => {}
302        }
303        Ok(())
304    }
305
306    #[expect(clippy::cast_possible_truncation)]
307    pub(crate) fn from_block(mut block: Block) -> Result<Vec<Self>> {
308        let rows = block.rows as usize;
309        let mut log_data = vec![Self::default(); rows];
310        let mut column_data = std::mem::take(&mut block.column_data);
311        for (name, type_) in &block.column_types {
312            for (i, value) in column_data.drain(..rows).enumerate() {
313                if let Some(log) = log_data.get_mut(i) {
314                    log.update_value(name, value, type_)?;
315                }
316            }
317        }
318        Ok(log_data)
319    }
320}
321
/// Profile event emitted by `ClickHouse` during operations.
///
/// Field names mirror the column names matched when a block is converted with
/// `ProfileEvent::from_block`.
#[derive(Clone, Debug, Default)]
pub struct ProfileEvent {
    pub(crate) host_name:    String,
    pub(crate) current_time: String,
    pub(crate) thread_id:    u64,
    pub(crate) type_code:    i8,
    pub(crate) name:         String,
    pub(crate) value:        i64,
}
332
333impl ProfileEvent {
334    fn update_value(&mut self, name: &str, value: Value, type_: &Type) -> Result<()> {
335        match name {
336            "host_name" => self.host_name = value.to_string(),
337            "current_time" => self.current_time = value.to_string(),
338            "thread_id" => self.thread_id = value.to_value(type_)?,
339            "type_code" => self.type_code = value.to_value(type_)?,
340            "name" => self.name = value.to_string(),
341            "value" => self.value = value.to_value(type_)?,
342            _ => {}
343        }
344        Ok(())
345    }
346
347    #[expect(clippy::cast_possible_truncation)]
348    pub(crate) fn from_block(mut block: Block) -> Result<Vec<Self>> {
349        let rows = block.rows as usize;
350        let mut profile_events = vec![Self::default(); rows];
351        let mut column_data = std::mem::take(&mut block.column_data);
352        for (name, type_) in &block.column_types {
353            for (i, value) in column_data.drain(..rows).enumerate() {
354                if let Some(profile) = profile_events.get_mut(i) {
355                    profile.update_value(name, value, type_).inspect_err(|error| {
356                        error!(?error, "profile event update failed");
357                    })?;
358                }
359            }
360        }
361        Ok(profile_events)
362    }
363}
364
365#[derive(Clone, Default, Copy, Debug, PartialEq, Eq, Hash, AsRefStr)]
366#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
367pub enum ChunkedProtocolMode {
368    #[default]
369    #[strum(serialize = "chunked_optional")]
370    ChunkedOptional,
371    #[strum(serialize = "chunked")]
372    Chunked,
373    #[strum(serialize = "notchunked_optional")]
374    NotChunkedOptional,
375    #[strum(serialize = "notchunked")]
376    NotChunked,
377}
378
379impl ChunkedProtocolMode {
380    /// Negotiates chunked protocol between client and server (based on C++ `is_chunked` function)
381    pub(crate) fn negotiate(
382        server_mode: ChunkedProtocolMode,
383        client_mode: ChunkedProtocolMode,
384        direction: &str,
385    ) -> Result<ChunkedProtocolMode> {
386        let server_chunked = matches!(
387            server_mode,
388            ChunkedProtocolMode::Chunked | ChunkedProtocolMode::ChunkedOptional
389        );
390        let server_optional = matches!(
391            server_mode,
392            ChunkedProtocolMode::ChunkedOptional | ChunkedProtocolMode::NotChunkedOptional
393        );
394        let client_chunked = matches!(
395            client_mode,
396            ChunkedProtocolMode::Chunked | ChunkedProtocolMode::ChunkedOptional
397        );
398        let client_optional = matches!(
399            client_mode,
400            ChunkedProtocolMode::ChunkedOptional | ChunkedProtocolMode::NotChunkedOptional
401        );
402        let result_chunked = if server_optional {
403            client_chunked
404        } else if client_optional {
405            server_chunked
406        } else if client_chunked != server_chunked {
407            return Err(Error::Protocol(format!(
408                "Incompatible protocol: {} set to {}, server requires {}",
409                direction,
410                if client_chunked { "chunked" } else { "notchunked" },
411                if server_chunked { "chunked" } else { "notchunked" }
412            )));
413        } else {
414            server_chunked
415        };
416
417        Ok(if result_chunked {
418            ChunkedProtocolMode::Chunked
419        } else {
420            ChunkedProtocolMode::NotChunked
421        })
422    }
423}
424
425impl FromStr for ChunkedProtocolMode {
426    type Err = Error;
427
428    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
429        Ok(match s {
430            "chunked" => Self::Chunked,
431            "chunked_optional" => Self::ChunkedOptional,
432            "notchunked" => Self::NotChunked,
433            "notchunked_optional" => Self::NotChunkedOptional,
434            _ => {
435                return Err(Error::Protocol(format!(
436                    "Unexpected value for chunked protocol mode: {s}"
437                )));
438            }
439        })
440    }
441}
442
/// Supported compression methods; `LZ4` is the default.
#[derive(Clone, Default, Copy, Debug, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum CompressionMethod {
    None,
    #[default]
    LZ4,
    ZSTD,
}

impl CompressionMethod {
    /// Byte code of this method: `0x02` for `None`, `0x82` for `LZ4`, `0x90` for `ZSTD`.
    pub(crate) fn byte(self) -> u8 {
        match self {
            CompressionMethod::None => 0x02,
            CompressionMethod::LZ4 => 0x82,
            CompressionMethod::ZSTD => 0x90,
        }
    }
}

impl From<&str> for CompressionMethod {
    /// Lenient conversion: recognises `lz4`/`LZ4` and `zstd`/`ZSTD`; every other string maps to
    /// `None`. Use `FromStr` (`str::parse`) when unknown names must be rejected.
    fn from(value: &str) -> Self {
        match value {
            "lz4" | "LZ4" => CompressionMethod::LZ4,
            "zstd" | "ZSTD" => CompressionMethod::ZSTD,
            _ => CompressionMethod::None,
        }
    }
}

impl std::fmt::Display for CompressionMethod {
    /// Writes the same name that `AsRef<str>` returns (`None`, `LZ4` or `ZSTD`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_ref())
    }
}

impl FromStr for CompressionMethod {
    type Err = String;

    /// Strict parse of a method name.
    ///
    /// Accepts everything `From<&str>` recognises plus the explicit spellings `none`, `None` and
    /// `NONE`, so the `Display` output of every variant parses back to that variant.
    ///
    /// # Errors
    /// Returns `"Invalid compression method: <input>"` for any other string.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            // `None` is a real method, not only the lenient conversion's fallback.
            "none" | "None" | "NONE" => Ok(CompressionMethod::None),
            _ => match CompressionMethod::from(s) {
                CompressionMethod::None => Err(format!("Invalid compression method: {s}")),
                method => Ok(method),
            },
        }
    }
}

impl AsRef<str> for CompressionMethod {
    /// Canonical name of the method: `None`, `LZ4` or `ZSTD`.
    fn as_ref(&self) -> &str {
        match self {
            CompressionMethod::None => "None",
            CompressionMethod::LZ4 => "LZ4",
            CompressionMethod::ZSTD => "ZSTD",
        }
    }
}