1use std::str::FromStr;
2
3use strum::AsRefStr;
4use uuid::Uuid;
5
6use super::block::Block;
7use super::error_codes::map_exception_to_error;
8use super::progress::Progress;
9use crate::prelude::*;
10use crate::{Error, FxIndexMap, Result, ServerError};
11
12pub(crate) const DBMS_MIN_REVISION_WITH_CLIENT_INFO: u64 = 54032;
13pub(crate) const DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE: u64 = 54058;
14pub(crate) const DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO: u64 = 54060;
15pub(crate) const DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME: u64 = 54372;
18pub(crate) const DBMS_MIN_REVISION_WITH_VERSION_PATCH: u64 = 54401;
19pub(crate) const DBMS_MIN_REVISION_WITH_SERVER_LOGS: u64 = 54406;
20pub(crate) const DBMS_MIN_REVISION_WITH_CLIENT_WRITE_INFO: u64 = 54420;
25pub(crate) const DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS: u64 = 54429;
26pub(crate) const DBMS_MIN_REVISION_WITH_OPENTELEMETRY: u64 = 54442;
27pub(crate) const DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET: u64 = 54441;
28pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH: u64 = 54448;
31
32pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_QUERY_START_TIME: u64 = 54449;
33pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_PARALLEL_REPLICAS: u64 = 54453;
35pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_CUSTOM_SERIALIZATION: u64 = 54454;
36pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_PROFILE_EVENTS_IN_INSERT: u64 = 54456;
37pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM: u64 = 54458;
38pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY: u64 = 54458;
39pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS: u64 = 54459;
40pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_SERVER_QUERY_TIME_IN_PROGRESS: u64 = 54460;
41pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_PASSWORD_COMPLEXITY_RULES: u64 = 54461;
42pub(crate) const DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET_V2: u64 = 54462;
43pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_TOTAL_BYTES_IN_PROGRESS: u64 = 54463;
44pub(crate) const DBMS_MIN_REVISION_WITH_ROWS_BEFORE_AGGREGATION: u64 = 54469;
51pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_CHUNKED_PACKETS: u64 = 54470;
52pub(crate) const DBMS_MIN_REVISION_WITH_VERSIONED_PARALLEL_REPLICAS_PROTOCOL: u64 = 54471;
53pub(crate) const DBMS_MIN_PROTOCOL_VERSION_WITH_INTERSERVER_EXTERNALLY_GRANTED_ROLES: u64 = 54472;
55pub(crate) const DBMS_MIN_REVISION_WITH_SERVER_SETTINGS: u64 = 54474;
58pub(crate) const DBMS_MIN_REVISION_WITH_QUERY_AND_LINE_NUMBERS: u64 = 54475;
59pub(crate) const DBMS_MIN_REVISION_WITH_JWT_IN_INTERSERVER: u64 = 54476;
60pub(crate) const DBMS_MIN_REVISION_WITH_QUERY_PLAN_SERIALIZATION: u64 = 54477;
62
63pub(crate) const DBMS_TCP_PROTOCOL_VERSION: u64 = DBMS_MIN_REVISION_WITH_QUERY_PLAN_SERIALIZATION;
65
66pub(crate) const DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION: u64 = 4;
67
68pub(crate) const MAX_STRING_SIZE: usize = 1 << 30;
70
71#[repr(u64)]
72#[derive(Clone, Copy, Debug)]
73#[expect(unused)]
74pub(crate) enum QueryProcessingStage {
75 FetchColumns,
76 WithMergeableState,
77 Complete,
78 WithMergableStateAfterAggregation,
79}
80
81#[expect(unused)]
82#[repr(u64)]
83#[derive(Clone, Copy, Debug)]
84pub(crate) enum ClientPacketId {
85 Hello = 0, Query = 1,
89 Data = 2, Cancel = 3, Ping = 4, TablesStatusRequest = 5, KeepAlive = 6, Scalar = 7, IgnoredPartUUIDs = 8, ReadTaskResponse = 9, MergeTreeReadTaskResponse = 10,
99 SSHChallengeRequest = 11, SSHChallengeResponse = 12, QueryPlan = 13, }
103
104pub(crate) struct ClientHello {
105 pub(crate) default_database: String,
106 pub(crate) username: String,
107 pub(crate) password: String,
108}
109
110#[repr(u64)]
114#[derive(Clone, Copy, Debug, AsRefStr)]
115pub(crate) enum ServerPacketId {
116 Hello = 0,
117 Data = 1,
118 Exception = 2,
119 Progress = 3,
120 Pong = 4,
121 EndOfStream = 5,
122 ProfileInfo = 6,
123 Totals = 7,
124 Extremes = 8,
125 TablesStatusResponse = 9,
126 Log = 10,
127 TableColumns = 11,
128 PartUUIDs = 12,
129 ReadTaskRequest = 13,
130 ProfileEvents = 14,
131 MergeTreeAllRangesAnnouncement = 15,
132 MergeTreeReadTaskRequest = 16, TimezoneUpdate = 17, SSHChallenge = 18, }
136
137impl ServerPacketId {
138 pub(crate) fn from_u64(i: u64) -> Result<Self> {
139 Ok(match i {
140 0 => ServerPacketId::Hello,
141 1 => ServerPacketId::Data,
142 2 => ServerPacketId::Exception,
143 3 => ServerPacketId::Progress,
144 4 => ServerPacketId::Pong,
145 5 => ServerPacketId::EndOfStream,
146 6 => ServerPacketId::ProfileInfo,
147 7 => ServerPacketId::Totals,
148 8 => ServerPacketId::Extremes,
149 9 => ServerPacketId::TablesStatusResponse,
150 10 => ServerPacketId::Log,
151 11 => ServerPacketId::TableColumns,
152 12 => ServerPacketId::PartUUIDs,
153 13 => ServerPacketId::ReadTaskRequest,
154 14 => ServerPacketId::ProfileEvents,
155 15 => ServerPacketId::MergeTreeAllRangesAnnouncement,
156 16 => ServerPacketId::MergeTreeReadTaskRequest,
157 17 => ServerPacketId::TimezoneUpdate,
158 18 => ServerPacketId::SSHChallenge,
159 x => {
160 error!("invalid packet id from server: {}", x);
161 return Err(Error::Protocol(format!("Unknown packet id {i}")));
162 }
163 })
164 }
165}
166
167#[expect(unused)]
169#[derive(Debug, Clone, AsRefStr)]
170pub(crate) enum ServerPacket<T = Block> {
171 Hello(ServerHello),
172 Header(ServerData<Block>),
173 Data(ServerData<T>),
174 QueryData(ServerData<T>),
175 Totals(ServerData<T>),
176 Extremes(ServerData<T>),
177 ProfileEvents(Vec<ProfileEvent>),
178 Log(Vec<LogData>),
179 Exception(ServerException),
180 Progress(Progress),
181 Pong,
182 EndOfStream,
183 ProfileInfo(ProfileInfo),
184 TablesStatusResponse(TablesStatusResponse),
185 TableColumns(TableColumns),
186 PartUUIDs(Vec<Uuid>),
187 ReadTaskRequest(Option<String>),
188 MergeTreeAllRangesAnnouncement,
189 MergeTreeReadTaskRequest,
190 TimezoneUpdate,
191 SSHChallenge,
192 Ignore(ServerPacketId), }
194
195#[derive(Debug, Clone, Default)]
196pub(crate) struct ServerHello {
197 #[expect(unused)]
198 pub(crate) server_name: String,
199 #[expect(unused)]
200 pub(crate) version: (u64, u64, u64),
201 pub(crate) revision_version: u64,
202 #[expect(unused)]
203 pub(crate) timezone: Option<String>,
204 #[expect(unused)]
205 pub(crate) display_name: Option<String>,
206 pub(crate) settings: Option<Settings>,
207 pub(crate) chunked_send: ChunkedProtocolMode,
208 pub(crate) chunked_recv: ChunkedProtocolMode,
209}
210
211impl ServerHello {
212 pub(crate) fn supports_chunked_send(&self) -> bool {
213 matches!(
214 self.chunked_send,
215 ChunkedProtocolMode::Chunked | ChunkedProtocolMode::ChunkedOptional
216 )
217 }
218
219 pub(crate) fn supports_chunked_recv(&self) -> bool {
220 matches!(
221 self.chunked_recv,
222 ChunkedProtocolMode::Chunked | ChunkedProtocolMode::ChunkedOptional
223 )
224 }
225}
226
227#[derive(Debug, Clone)]
228pub(crate) struct ServerData<T> {
229 pub(crate) block: T,
230}
231
232#[derive(Debug, Clone)]
233pub(crate) struct ServerException {
234 pub(crate) code: i32,
235 pub(crate) name: String,
236 pub(crate) message: String,
237 pub(crate) stack_trace: String,
238 #[expect(unused)]
239 pub(crate) has_nested: bool,
240}
241
242impl ServerException {
243 pub(crate) fn emit(self) -> ServerError { map_exception_to_error(self) }
244}
245
246#[expect(unused)]
247#[derive(Debug, Clone)]
248pub(crate) struct ProfileInfo {
249 pub(crate) rows: u64,
250 pub(crate) blocks: u64,
251 pub(crate) bytes: u64,
252 pub(crate) applied_limit: bool,
253 pub(crate) rows_before_limit: u64,
254 pub(crate) calculated_rows_before_limit: bool,
255 pub(crate) applied_aggregation: bool,
256 pub(crate) rows_before_aggregation: u64,
257}
258
259#[expect(unused)]
260#[derive(Debug, Clone)]
261pub(crate) struct TableColumns {
262 pub(crate) name: String,
263 pub(crate) description: String,
264}
265
266#[expect(unused)]
267#[derive(Debug, Clone)]
268pub(crate) struct TableStatus {
269 pub(crate) is_replicated: bool,
270 pub(crate) absolute_delay: u32,
271}
272
273#[derive(Debug, Clone)]
274pub(crate) struct TablesStatusResponse {
275 pub(crate) database_tables: FxIndexMap<String, FxIndexMap<String, TableStatus>>,
276}
277
278#[derive(Debug, Clone, Default)]
279pub(crate) struct LogData {
280 pub(crate) time: String,
281 pub(crate) time_micro: u32,
282 pub(crate) host_name: String,
283 pub(crate) query_id: String,
284 pub(crate) thread_id: u64,
285 pub(crate) priority: i8,
286 pub(crate) source: String,
287 pub(crate) text: String,
288}
289
290impl LogData {
291 fn update_value(&mut self, name: &str, value: Value, type_: &Type) -> Result<()> {
292 match name {
293 "time" => self.time = value.to_string(),
294 "time_micro" => self.time_micro = value.to_value(type_)?,
295 "host_name" => self.host_name = value.to_string(),
296 "query_id" => self.query_id = value.to_string(),
297 "thread_id" => self.thread_id = value.to_value(type_)?,
298 "priority" => self.priority = value.to_value(type_)?,
299 "source" => self.source = value.to_string(),
300 "text" => self.text = value.to_string(),
301 _ => {}
302 }
303 Ok(())
304 }
305
306 #[expect(clippy::cast_possible_truncation)]
307 pub(crate) fn from_block(mut block: Block) -> Result<Vec<Self>> {
308 let rows = block.rows as usize;
309 let mut log_data = vec![Self::default(); rows];
310 let mut column_data = std::mem::take(&mut block.column_data);
311 for (name, type_) in &block.column_types {
312 for (i, value) in column_data.drain(..rows).enumerate() {
313 if let Some(log) = log_data.get_mut(i) {
314 log.update_value(name, value, type_)?;
315 }
316 }
317 }
318 Ok(log_data)
319 }
320}
321
322#[derive(Debug, Clone, Default)]
324pub struct ProfileEvent {
325 pub(crate) host_name: String,
326 pub(crate) current_time: String,
327 pub(crate) thread_id: u64,
328 pub(crate) type_code: i8,
329 pub(crate) name: String,
330 pub(crate) value: i64,
331}
332
333impl ProfileEvent {
334 fn update_value(&mut self, name: &str, value: Value, type_: &Type) -> Result<()> {
335 match name {
336 "host_name" => self.host_name = value.to_string(),
337 "current_time" => self.current_time = value.to_string(),
338 "thread_id" => self.thread_id = value.to_value(type_)?,
339 "type_code" => self.type_code = value.to_value(type_)?,
340 "name" => self.name = value.to_string(),
341 "value" => self.value = value.to_value(type_)?,
342 _ => {}
343 }
344 Ok(())
345 }
346
347 #[expect(clippy::cast_possible_truncation)]
348 pub(crate) fn from_block(mut block: Block) -> Result<Vec<Self>> {
349 let rows = block.rows as usize;
350 let mut profile_events = vec![Self::default(); rows];
351 let mut column_data = std::mem::take(&mut block.column_data);
352 for (name, type_) in &block.column_types {
353 for (i, value) in column_data.drain(..rows).enumerate() {
354 if let Some(profile) = profile_events.get_mut(i) {
355 profile.update_value(name, value, type_).inspect_err(|error| {
356 error!(?error, "profile event update failed");
357 })?;
358 }
359 }
360 }
361 Ok(profile_events)
362 }
363}
364
365#[derive(Clone, Default, Copy, Debug, PartialEq, Eq, Hash, AsRefStr)]
366#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
367pub enum ChunkedProtocolMode {
368 #[default]
369 #[strum(serialize = "chunked_optional")]
370 ChunkedOptional,
371 #[strum(serialize = "chunked")]
372 Chunked,
373 #[strum(serialize = "notchunked_optional")]
374 NotChunkedOptional,
375 #[strum(serialize = "notchunked")]
376 NotChunked,
377}
378
379impl ChunkedProtocolMode {
380 pub(crate) fn negotiate(
382 server_mode: ChunkedProtocolMode,
383 client_mode: ChunkedProtocolMode,
384 direction: &str,
385 ) -> Result<ChunkedProtocolMode> {
386 let server_chunked = matches!(
387 server_mode,
388 ChunkedProtocolMode::Chunked | ChunkedProtocolMode::ChunkedOptional
389 );
390 let server_optional = matches!(
391 server_mode,
392 ChunkedProtocolMode::ChunkedOptional | ChunkedProtocolMode::NotChunkedOptional
393 );
394 let client_chunked = matches!(
395 client_mode,
396 ChunkedProtocolMode::Chunked | ChunkedProtocolMode::ChunkedOptional
397 );
398 let client_optional = matches!(
399 client_mode,
400 ChunkedProtocolMode::ChunkedOptional | ChunkedProtocolMode::NotChunkedOptional
401 );
402 let result_chunked = if server_optional {
403 client_chunked
404 } else if client_optional {
405 server_chunked
406 } else if client_chunked != server_chunked {
407 return Err(Error::Protocol(format!(
408 "Incompatible protocol: {} set to {}, server requires {}",
409 direction,
410 if client_chunked { "chunked" } else { "notchunked" },
411 if server_chunked { "chunked" } else { "notchunked" }
412 )));
413 } else {
414 server_chunked
415 };
416
417 Ok(if result_chunked {
418 ChunkedProtocolMode::Chunked
419 } else {
420 ChunkedProtocolMode::NotChunked
421 })
422 }
423}
424
425impl FromStr for ChunkedProtocolMode {
426 type Err = Error;
427
428 fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
429 Ok(match s {
430 "chunked" => Self::Chunked,
431 "chunked_optional" => Self::ChunkedOptional,
432 "notchunked" => Self::NotChunked,
433 "notchunked_optional" => Self::NotChunkedOptional,
434 _ => {
435 return Err(Error::Protocol(format!(
436 "Unexpected value for chunked protocol mode: {s}"
437 )));
438 }
439 })
440 }
441}
442
443#[derive(Clone, Default, Copy, Debug, PartialEq, Eq, Hash)]
444#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
445pub enum CompressionMethod {
446 None,
447 #[default]
448 LZ4,
449 ZSTD,
450}
451
452impl CompressionMethod {
453 pub(crate) fn byte(self) -> u8 {
454 match self {
455 CompressionMethod::None => 0x02,
456 CompressionMethod::LZ4 => 0x82,
457 CompressionMethod::ZSTD => 0x90,
458 }
459 }
460}
461
462impl From<&str> for CompressionMethod {
463 fn from(value: &str) -> Self {
464 match value {
465 "lz4" | "LZ4" => CompressionMethod::LZ4,
466 "zstd" | "ZSTD" => CompressionMethod::ZSTD,
467 _ => CompressionMethod::None,
468 }
469 }
470}
471
472impl std::fmt::Display for CompressionMethod {
473 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
474 match self {
475 CompressionMethod::None => write!(f, "None"),
476 CompressionMethod::LZ4 => write!(f, "LZ4"),
477 CompressionMethod::ZSTD => write!(f, "ZSTD"),
478 }
479 }
480}
481
482impl FromStr for CompressionMethod {
483 type Err = String;
484
485 fn from_str(s: &str) -> Result<Self, Self::Err> {
486 let method = CompressionMethod::from(s);
487 if matches!(method, CompressionMethod::None) {
488 return Err(format!("Invalid compression method: {s}"));
489 }
490
491 Ok(method)
492 }
493}
494
495impl AsRef<str> for CompressionMethod {
496 fn as_ref(&self) -> &str {
497 match self {
498 CompressionMethod::None => "None",
499 CompressionMethod::LZ4 => "LZ4",
500 CompressionMethod::ZSTD => "ZSTD",
501 }
502 }
503}