1use std::str::FromStr;
2
3use strum::AsRefStr;
4use uuid::Uuid;
5
6use super::block::Block;
7use super::error_codes::map_exception_to_error;
8use super::progress::Progress;
9use crate::prelude::*;
10use crate::{Error, FxIndexMap, Result, ServerError};
11
12pub(crate) const DBMS_MIN_REVISION_WITH_CLIENT_INFO: u64 = 54032;
13pub(crate) const DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE: u64 = 54058;
14pub(crate) const DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO: u64 = 54060;
15pub(crate) const DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME: u64 = 54372;
18pub(crate) const DBMS_MIN_REVISION_WITH_VERSION_PATCH: u64 = 54401;
19pub(crate) const DBMS_MIN_REVISION_WITH_SERVER_LOGS: u64 = 54406;
20pub(crate) const DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO: u64 = 54420;
25pub(crate) const DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS: u64 = 54429;
26pub(crate) const DBMS_MIN_REVISION_WITH_OPENTELEMETRY: u64 = 54442;
27pub(crate) const DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET: u64 = 54441;
28pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH: u64 = 54448;
31
32pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_QUERY_START_TIME: u64 = 54449;
33pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_PARALLEL_REPLICAS: u64 = 54453;
35pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_CUSTOM_SERIALIZATION: u64 = 54454;
36pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT: u64 = 54456;
37pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM: u64 = 54458;
38pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY: u64 = 54458;
39pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS: u64 = 54459;
40pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_SERVER_QUERY_TIME_IN_PROGRESS: u64 = 54460;
41pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_PASSWORD_COMPLEXITY_RULES: u64 = 54461;
42pub(crate) const DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2: u64 = 54462;
43pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_TOTAL_BYTES_IN_PROGRESS: u64 = 54463;
44pub(crate) const DBMS_MIN_REVISION_WITH_ROWS_BEFORE_AGGREGATION: u64 = 54469;
51pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS: u64 = 54470;
52pub(crate) const DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL: u64 = 54471;
53pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_INTERSERVER_EXTERNALLY_GRANTED_ROLES: u64 = 54472;
55pub(crate) const DBMS_MIN_REVISION_WITH_SERVER_SETTINGS: u64 = 54474;
58pub(crate) const DBMS_MIN_REVISION_WITH_QUERY_AND_LINE_NUMBERS: u64 = 54475;
59pub(crate) const DBMS_MIN_REVISION_WITH_JWT_IN_INTERSERVER: u64 = 54476;
60pub(crate) const DBMS_MIN_REVISION_WITH_QUERY_PLAN_SERIALIZATION: u64 = 54477;
61pub(crate) const DBMS_MIN_REVISION_WITH_VERSIONED_CLUSTER_FUNCTION_PROTOCOL: u64 = 54479;
64
65pub(crate) const DBMS_TCP_PROTOCOL_VERSION: u64 =
67 DBMS_MIN_REVISION_WITH_VERSIONED_CLUSTER_FUNCTION_PROTOCOL;
68
69pub(crate) const DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION: u64 = 4;
70
71pub(crate) const MAX_STRING_SIZE: usize = 1 << 30;
73
74#[repr(u64)]
75#[derive(Clone, Copy, Debug)]
76#[expect(unused)]
77pub(crate) enum QueryProcessingStage {
78 FetchColumns,
79 WithMergeableState,
80 Complete,
81 WithMergableStateAfterAggregation,
82}
83
84#[expect(unused)]
85#[repr(u64)]
86#[derive(Clone, Copy, Debug)]
87pub(crate) enum ClientPacketId {
88 Hello = 0, Query = 1,
92 Data = 2, Cancel = 3, Ping = 4, TablesStatusRequest = 5, KeepAlive = 6, Scalar = 7, IgnoredPartUUIDs = 8, ReadTaskResponse = 9, MergeTreeReadTaskResponse = 10,
102 SSHChallengeRequest = 11, SSHChallengeResponse = 12, QueryPlan = 13, }
106
107pub(crate) struct ClientHello {
108 pub(crate) default_database: String,
109 pub(crate) username: String,
110 pub(crate) password: String,
111}
112
113#[repr(u64)]
117#[derive(Clone, Copy, Debug, AsRefStr)]
118pub(crate) enum ServerPacketId {
119 Hello = 0,
120 Data = 1,
121 Exception = 2,
122 Progress = 3,
123 Pong = 4,
124 EndOfStream = 5,
125 ProfileInfo = 6,
126 Totals = 7,
127 Extremes = 8,
128 TablesStatusResponse = 9,
129 Log = 10,
130 TableColumns = 11,
131 PartUUIDs = 12,
132 ReadTaskRequest = 13,
133 ProfileEvents = 14,
134 MergeTreeAllRangesAnnouncement = 15,
135 MergeTreeReadTaskRequest = 16, TimezoneUpdate = 17, SSHChallenge = 18, }
139
140impl ServerPacketId {
141 pub(crate) fn from_u64(i: u64) -> Result<Self> {
142 Ok(match i {
143 0 => ServerPacketId::Hello,
144 1 => ServerPacketId::Data,
145 2 => ServerPacketId::Exception,
146 3 => ServerPacketId::Progress,
147 4 => ServerPacketId::Pong,
148 5 => ServerPacketId::EndOfStream,
149 6 => ServerPacketId::ProfileInfo,
150 7 => ServerPacketId::Totals,
151 8 => ServerPacketId::Extremes,
152 9 => ServerPacketId::TablesStatusResponse,
153 10 => ServerPacketId::Log,
154 11 => ServerPacketId::TableColumns,
155 12 => ServerPacketId::PartUUIDs,
156 13 => ServerPacketId::ReadTaskRequest,
157 14 => ServerPacketId::ProfileEvents,
158 15 => ServerPacketId::MergeTreeAllRangesAnnouncement,
159 16 => ServerPacketId::MergeTreeReadTaskRequest,
160 17 => ServerPacketId::TimezoneUpdate,
161 18 => ServerPacketId::SSHChallenge,
162 x => {
163 error!("invalid packet id from server: {}", x);
164 return Err(Error::Protocol(format!("Unknown packet id {i}")));
165 }
166 })
167 }
168}
169
170#[expect(unused)]
172#[derive(Debug, Clone, AsRefStr)]
173pub(crate) enum ServerPacket<T = Block> {
174 Hello(ServerHello),
175 Header(ServerData<Block>),
176 Data(ServerData<T>),
177 QueryData(ServerData<T>),
178 Totals(ServerData<T>),
179 Extremes(ServerData<T>),
180 ProfileEvents(Vec<ProfileEvent>),
181 Log(Vec<LogData>),
182 Exception(ServerException),
183 Progress(Progress),
184 Pong,
185 EndOfStream,
186 ProfileInfo(ProfileInfo),
187 TablesStatusResponse(TablesStatusResponse),
188 TableColumns(TableColumns),
189 PartUUIDs(Vec<Uuid>),
190 ReadTaskRequest(Option<String>),
191 MergeTreeAllRangesAnnouncement,
192 MergeTreeReadTaskRequest,
193 TimezoneUpdate,
194 SSHChallenge,
195 Ignore(ServerPacketId), }
197
198#[derive(Debug, Clone, Default)]
199pub(crate) struct ServerHello {
200 #[expect(unused)]
201 pub(crate) server_name: String,
202 #[expect(unused)]
203 pub(crate) version: (u64, u64, u64),
204 pub(crate) revision_version: u64,
205 #[expect(unused)]
206 pub(crate) timezone: Option<String>,
207 #[expect(unused)]
208 pub(crate) display_name: Option<String>,
209 pub(crate) settings: Option<Settings>,
210 pub(crate) chunked_send: ChunkedProtocolMode,
211 pub(crate) chunked_recv: ChunkedProtocolMode,
212}
213
214impl ServerHello {
215 pub(crate) fn supports_chunked_send(&self) -> bool {
216 matches!(
217 self.chunked_send,
218 ChunkedProtocolMode::Chunked | ChunkedProtocolMode::ChunkedOptional
219 )
220 }
221
222 pub(crate) fn supports_chunked_recv(&self) -> bool {
223 matches!(
224 self.chunked_recv,
225 ChunkedProtocolMode::Chunked | ChunkedProtocolMode::ChunkedOptional
226 )
227 }
228}
229
230#[derive(Debug, Clone)]
231pub(crate) struct ServerData<T> {
232 pub(crate) block: T,
233}
234
235#[derive(Debug, Clone)]
236pub(crate) struct ServerException {
237 pub(crate) code: i32,
238 pub(crate) name: String,
239 pub(crate) message: String,
240 pub(crate) stack_trace: String,
241 #[expect(unused)]
242 pub(crate) has_nested: bool,
243}
244
245impl ServerException {
246 pub(crate) fn emit(self) -> ServerError { map_exception_to_error(self) }
247}
248
249#[expect(unused)]
250#[derive(Debug, Clone)]
251pub(crate) struct ProfileInfo {
252 pub(crate) rows: u64,
253 pub(crate) blocks: u64,
254 pub(crate) bytes: u64,
255 pub(crate) applied_limit: bool,
256 pub(crate) rows_before_limit: u64,
257 pub(crate) calculated_rows_before_limit: bool,
258 pub(crate) applied_aggregation: bool,
259 pub(crate) rows_before_aggregation: u64,
260}
261
262#[expect(unused)]
263#[derive(Debug, Clone)]
264pub(crate) struct TableColumns {
265 pub(crate) name: String,
266 pub(crate) description: String,
267}
268
269#[expect(unused)]
270#[derive(Debug, Clone)]
271pub(crate) struct TableStatus {
272 pub(crate) is_replicated: bool,
273 pub(crate) absolute_delay: u32,
274}
275
276#[derive(Debug, Clone)]
277pub(crate) struct TablesStatusResponse {
278 pub(crate) database_tables: FxIndexMap<String, FxIndexMap<String, TableStatus>>,
279}
280
281#[derive(Debug, Clone, Default)]
282pub(crate) struct LogData {
283 pub(crate) time: String,
284 pub(crate) time_micro: u32,
285 pub(crate) host_name: String,
286 pub(crate) query_id: String,
287 pub(crate) thread_id: u64,
288 pub(crate) priority: i8,
289 pub(crate) source: String,
290 pub(crate) text: String,
291}
292
293impl LogData {
294 fn update_value(&mut self, name: &str, value: Value, type_: &Type) -> Result<()> {
295 match name {
296 "time" => self.time = value.to_string(),
297 "time_micro" => self.time_micro = value.to_value(type_)?,
298 "host_name" => self.host_name = value.to_string(),
299 "query_id" => self.query_id = value.to_string(),
300 "thread_id" => self.thread_id = value.to_value(type_)?,
301 "priority" => self.priority = value.to_value(type_)?,
302 "source" => self.source = value.to_string(),
303 "text" => self.text = value.to_string(),
304 _ => {}
305 }
306 Ok(())
307 }
308
309 #[expect(clippy::cast_possible_truncation)]
310 pub(crate) fn from_block(mut block: Block) -> Result<Vec<Self>> {
311 let rows = block.rows as usize;
312 let mut log_data = vec![Self::default(); rows];
313 let mut column_data = std::mem::take(&mut block.column_data);
314 for (name, type_) in &block.column_types {
315 for (i, value) in column_data.drain(..rows).enumerate() {
316 if let Some(log) = log_data.get_mut(i) {
317 log.update_value(name, value, type_)?;
318 }
319 }
320 }
321 Ok(log_data)
322 }
323}
324
325#[derive(Debug, Clone, Default)]
327pub struct ProfileEvent {
328 pub(crate) host_name: String,
329 pub(crate) current_time: String,
330 pub(crate) thread_id: u64,
331 pub(crate) type_code: i8,
332 pub(crate) name: String,
333 pub(crate) value: i64,
334}
335
336impl ProfileEvent {
337 fn update_value(&mut self, name: &str, value: Value, type_: &Type) -> Result<()> {
338 match name {
339 "host_name" => self.host_name = value.to_string(),
340 "current_time" => self.current_time = value.to_string(),
341 "thread_id" => self.thread_id = value.to_value(type_)?,
342 "type_code" => self.type_code = value.to_value(type_)?,
343 "name" => self.name = value.to_string(),
344 "value" => self.value = value.to_value(type_)?,
345 _ => {}
346 }
347 Ok(())
348 }
349
350 #[expect(clippy::cast_possible_truncation)]
351 pub(crate) fn from_block(mut block: Block) -> Result<Vec<Self>> {
352 let rows = block.rows as usize;
353 let mut profile_events = vec![Self::default(); rows];
354 let mut column_data = std::mem::take(&mut block.column_data);
355 for (name, type_) in &block.column_types {
356 for (i, value) in column_data.drain(..rows).enumerate() {
357 if let Some(profile) = profile_events.get_mut(i) {
358 profile.update_value(name, value, type_).inspect_err(|error| {
359 error!(?error, "profile event update failed");
360 })?;
361 }
362 }
363 }
364 Ok(profile_events)
365 }
366}
367
368#[derive(Clone, Default, Copy, Debug, PartialEq, Eq, Hash, AsRefStr)]
369#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
370pub enum ChunkedProtocolMode {
371 #[default]
372 #[strum(serialize = "chunked_optional")]
373 ChunkedOptional,
374 #[strum(serialize = "chunked")]
375 Chunked,
376 #[strum(serialize = "notchunked_optional")]
377 NotChunkedOptional,
378 #[strum(serialize = "notchunked")]
379 NotChunked,
380}
381
382impl ChunkedProtocolMode {
383 pub(crate) fn negotiate(
385 server_mode: ChunkedProtocolMode,
386 client_mode: ChunkedProtocolMode,
387 direction: &str,
388 ) -> Result<ChunkedProtocolMode> {
389 let server_chunked = matches!(
390 server_mode,
391 ChunkedProtocolMode::Chunked | ChunkedProtocolMode::ChunkedOptional
392 );
393 let server_optional = matches!(
394 server_mode,
395 ChunkedProtocolMode::ChunkedOptional | ChunkedProtocolMode::NotChunkedOptional
396 );
397 let client_chunked = matches!(
398 client_mode,
399 ChunkedProtocolMode::Chunked | ChunkedProtocolMode::ChunkedOptional
400 );
401 let client_optional = matches!(
402 client_mode,
403 ChunkedProtocolMode::ChunkedOptional | ChunkedProtocolMode::NotChunkedOptional
404 );
405 let result_chunked = if server_optional {
406 client_chunked
407 } else if client_optional {
408 server_chunked
409 } else if client_chunked != server_chunked {
410 return Err(Error::Protocol(format!(
411 "Incompatible protocol: {} set to {}, server requires {}",
412 direction,
413 if client_chunked { "chunked" } else { "notchunked" },
414 if server_chunked { "chunked" } else { "notchunked" }
415 )));
416 } else {
417 server_chunked
418 };
419
420 Ok(if result_chunked {
421 ChunkedProtocolMode::Chunked
422 } else {
423 ChunkedProtocolMode::NotChunked
424 })
425 }
426}
427
428impl FromStr for ChunkedProtocolMode {
429 type Err = Error;
430
431 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
432 Ok(match s {
433 "chunked" => Self::Chunked,
434 "chunked_optional" => Self::ChunkedOptional,
435 "notchunked" => Self::NotChunked,
436 "notchunked_optional" => Self::NotChunkedOptional,
437 _ => {
438 return Err(Error::Protocol(format!(
439 "Unexpected value for chunked protocol mode: {s}"
440 )));
441 }
442 })
443 }
444}
445
446#[derive(Clone, Default, Copy, Debug, PartialEq, Eq, Hash)]
447#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
448pub enum CompressionMethod {
449 None,
450 #[default]
451 LZ4,
452 ZSTD,
453}
454
455impl CompressionMethod {
456 pub(crate) fn byte(self) -> u8 {
457 match self {
458 CompressionMethod::None => 0x02,
459 CompressionMethod::LZ4 => 0x82,
460 CompressionMethod::ZSTD => 0x90,
461 }
462 }
463}
464
465impl From<&str> for CompressionMethod {
466 fn from(value: &str) -> Self {
467 match value {
468 "lz4" | "LZ4" => CompressionMethod::LZ4,
469 "zstd" | "ZSTD" => CompressionMethod::ZSTD,
470 _ => CompressionMethod::None,
471 }
472 }
473}
474
475impl std::fmt::Display for CompressionMethod {
476 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
477 match self {
478 CompressionMethod::None => write!(f, "None"),
479 CompressionMethod::LZ4 => write!(f, "LZ4"),
480 CompressionMethod::ZSTD => write!(f, "ZSTD"),
481 }
482 }
483}
484
485impl FromStr for CompressionMethod {
486 type Err = String;
487
488 fn from_str(s: &str) -> Result<Self, Self::Err> {
489 let method = CompressionMethod::from(s);
490 if matches!(method, CompressionMethod::None) {
491 return Err(format!("Invalid compression method: {s}"));
492 }
493
494 Ok(method)
495 }
496}
497
498impl AsRef<str> for CompressionMethod {
499 fn as_ref(&self) -> &str {
500 match self {
501 CompressionMethod::None => "None",
502 CompressionMethod::LZ4 => "LZ4",
503 CompressionMethod::ZSTD => "ZSTD",
504 }
505 }
506}