clickhouse_arrow/native/
client_info.rs1use tokio::io::AsyncWriteExt;
2use uuid::Uuid;
3
4use super::protocol::{
5 DBMS_MIN_REVISION_WITH_JWT_IN_INTERSERVER, DBMS_MIN_REVISION_WITH_QUERY_AND_LINE_NUMBERS,
6};
7use crate::io::ClickHouseWrite;
8use crate::native::protocol::{
9 DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH,
10 DBMS_MIN_PROTOCOL_VERSION_WITH_PARALLEL_REPLICAS,
11 DBMS_MIN_PROTOCOL_VERSION_WITH_QUERY_START_TIME, DBMS_MIN_REVISION_WITH_OPENTELEMETRY,
12 DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO, DBMS_MIN_REVISION_WITH_VERSION_PATCH,
13 DBMS_TCP_PROTOCOL_VERSION,
14};
15use crate::prelude::*;
16
17#[repr(u8)]
18#[derive(PartialEq, Clone, Copy, Debug)]
19#[allow(unused, clippy::enum_variant_names)]
20pub(crate) enum QueryKind {
21 NoQuery,
22 InitialQuery,
23 SecondaryQuery,
24}
25
26#[derive(Debug)]
27pub(crate) struct OpenTelemetry<'a> {
28 trace_id: Uuid,
29 span_id: u64,
30 tracestate: &'a str,
31 trace_flags: u8,
32}
33
34#[derive(Debug)]
35pub(crate) struct ClientInfo<'a> {
36 pub kind: QueryKind,
37 pub initial_user: &'a str,
38 pub initial_query_id: &'a str,
39 pub initial_address: &'a str,
40 pub os_user: &'a str,
42 pub client_hostname: &'a str,
43 pub client_name: &'a str,
44 pub client_version_major: u64,
45 pub client_version_minor: u64,
46 pub client_tcp_protocol_version: u64,
47 pub query_start_time: u64,
49 pub quota_key: &'a str,
51 pub distributed_depth: u64,
53 pub client_version_patch: u64,
55 pub open_telemetry: Option<OpenTelemetry<'a>>,
57}
58
59impl Default for ClientInfo<'_> {
60 fn default() -> Self {
61 ClientInfo {
62 kind: QueryKind::InitialQuery,
63 initial_user: "",
64 initial_query_id: "",
65 initial_address: "0.0.0.0:0",
66 os_user: "",
67 client_hostname: "localhost",
68 client_name: "ClickHouseArrow",
69 client_version_major: crate::constants::VERSION_MAJOR,
70 client_version_minor: crate::constants::VERSION_MINOR,
71 client_version_patch: crate::constants::VERSION_PATCH,
72 client_tcp_protocol_version: DBMS_TCP_PROTOCOL_VERSION,
73 #[expect(clippy::cast_possible_truncation)]
74 query_start_time: std::time::SystemTime::now()
75 .duration_since(std::time::UNIX_EPOCH)
76 .unwrap_or(std::time::Duration::from_secs(0))
77 .as_micros() as u64,
78 quota_key: "",
79 distributed_depth: 1,
80 open_telemetry: None,
81 }
82 }
83}
84
85impl ClientInfo<'_> {
86 pub(crate) async fn write<W: ClickHouseWrite>(&self, to: &mut W, revision: u64) -> Result<()> {
87 to.write_u8(self.kind as u8).await?;
88 if self.kind == QueryKind::NoQuery {
89 return Ok(());
90 }
91 to.write_string(self.initial_user).await?;
92 to.write_string(self.initial_query_id).await?;
93 to.write_string(self.initial_address).await?;
94
95 if revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_QUERY_START_TIME {
96 to.write_u64_le(self.query_start_time).await?;
97 }
98
99 to.write_u8(1).await?;
101
102 to.write_string(self.os_user).await?;
103 to.write_string(self.client_hostname).await?;
104 to.write_string(self.client_name).await?;
105
106 to.write_var_uint(self.client_version_major).await?;
107 to.write_var_uint(self.client_version_minor).await?;
108 to.write_var_uint(self.client_tcp_protocol_version).await?;
109
110 if revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO {
111 to.write_string(self.quota_key).await?;
112 }
113 if revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH {
114 to.write_var_uint(self.distributed_depth).await?;
115 }
116 if revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH {
117 to.write_var_uint(self.client_version_patch).await?;
118 }
119 if revision >= DBMS_MIN_REVISION_WITH_OPENTELEMETRY {
120 if let Some(telemetry) = &self.open_telemetry {
121 to.write_u8(1u8).await?;
122 to.write_all(&telemetry.trace_id.as_bytes()[..]).await?;
123 to.write_u64(telemetry.span_id).await?;
124 to.write_string(telemetry.tracestate).await?;
125 to.write_u8(telemetry.trace_flags).await?;
126 } else {
127 to.write_u8(0u8).await?;
128 }
129 }
130
131 if revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_PARALLEL_REPLICAS {
132 to.write_var_uint(0).await?; to.write_var_uint(0).await?; to.write_var_uint(0).await?; }
136
137 if revision >= DBMS_MIN_REVISION_WITH_QUERY_AND_LINE_NUMBERS {
138 to.write_var_uint(0).await?; to.write_var_uint(0).await?; }
141
142 if revision >= DBMS_MIN_REVISION_WITH_JWT_IN_INTERSERVER {
143 to.write_u8(0).await?;
145 }
146
147 Ok(())
148 }
149}