clickhouse_arrow/native/
client_info.rs

1use tokio::io::AsyncWriteExt;
2use uuid::Uuid;
3
4use super::protocol::{
5    DBMS_MIN_REVISION_WITH_JWT_IN_INTERSERVER, DBMS_MIN_REVISION_WITH_QUERY_AND_LINE_NUMBERS,
6};
7use crate::io::ClickHouseWrite;
8use crate::native::protocol::{
9    DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH,
10    DBMS_MIN_PROTOCOL_VERSION_WITH_PARALLEL_REPLICAS,
11    DBMS_MIN_PROTOCOL_VERSION_WITH_QUERY_START_TIME, DBMS_MIN_REVISION_WITH_OPENTELEMETRY,
12    DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO, DBMS_MIN_REVISION_WITH_VERSION_PATCH,
13    DBMS_TCP_PROTOCOL_VERSION,
14};
15use crate::prelude::*;
16
/// Kind tag sent as the first byte of a serialized [`ClientInfo`].
///
/// The enum is `#[repr(u8)]` and is cast with `as u8` when written, so the discriminant
/// values below are exactly the bytes that go on the wire.
#[repr(u8)]
#[derive(PartialEq, Clone, Copy, Debug)]
#[allow(unused, clippy::enum_variant_names)]
pub(crate) enum QueryKind {
    /// No query information follows; `ClientInfo::write` stops right after this byte.
    NoQuery = 0,
    /// Kind used by `ClientInfo::default()`.
    /// Presumably a query issued directly by the client (verify against the protocol).
    InitialQuery = 1,
    /// Presumably a query forwarded on behalf of an initial query (verify against the protocol).
    SecondaryQuery = 2,
}
25
26#[derive(Debug)]
27pub(crate) struct OpenTelemetry<'a> {
28    trace_id:    Uuid,
29    span_id:     u64,
30    tracestate:  &'a str,
31    trace_flags: u8,
32}
33
34#[derive(Debug)]
35pub(crate) struct ClientInfo<'a> {
36    pub kind:                        QueryKind,
37    pub initial_user:                &'a str,
38    pub initial_query_id:            &'a str,
39    pub initial_address:             &'a str,
40    // interface = TCP = 1
41    pub os_user:                     &'a str,
42    pub client_hostname:             &'a str,
43    pub client_name:                 &'a str,
44    pub client_version_major:        u64,
45    pub client_version_minor:        u64,
46    pub client_tcp_protocol_version: u64,
47    // DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME
48    pub query_start_time:            u64,
49    // if DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO
50    pub quota_key:                   &'a str,
51    // if DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH
52    pub distributed_depth:           u64,
53    // if DBMS_MIN_REVISION_WITH_VERSION_PATCH
54    pub client_version_patch:        u64,
55    // if DBMS_MIN_REVISION_WITH_OPENTELEMETRY
56    pub open_telemetry:              Option<OpenTelemetry<'a>>,
57}
58
59impl Default for ClientInfo<'_> {
60    fn default() -> Self {
61        ClientInfo {
62            kind: QueryKind::InitialQuery,
63            initial_user: "",
64            initial_query_id: "",
65            initial_address: "0.0.0.0:0",
66            os_user: "",
67            client_hostname: "localhost",
68            client_name: "ClickHouseArrow",
69            client_version_major: crate::constants::VERSION_MAJOR,
70            client_version_minor: crate::constants::VERSION_MINOR,
71            client_version_patch: crate::constants::VERSION_PATCH,
72            client_tcp_protocol_version: DBMS_TCP_PROTOCOL_VERSION,
73            #[expect(clippy::cast_possible_truncation)]
74            query_start_time: std::time::SystemTime::now()
75                .duration_since(std::time::UNIX_EPOCH)
76                .unwrap_or(std::time::Duration::from_secs(0))
77                .as_micros() as u64,
78            quota_key: "",
79            distributed_depth: 1,
80            open_telemetry: None,
81        }
82    }
83}
84
85impl ClientInfo<'_> {
86    pub(crate) async fn write<W: ClickHouseWrite>(&self, to: &mut W, revision: u64) -> Result<()> {
87        to.write_u8(self.kind as u8).await?;
88        if self.kind == QueryKind::NoQuery {
89            return Ok(());
90        }
91        to.write_string(self.initial_user).await?;
92        to.write_string(self.initial_query_id).await?;
93        to.write_string(self.initial_address).await?;
94
95        if revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_QUERY_START_TIME {
96            to.write_u64_le(self.query_start_time).await?;
97        }
98
99        // interface = TCP = 1
100        to.write_u8(1).await?;
101
102        to.write_string(self.os_user).await?;
103        to.write_string(self.client_hostname).await?;
104        to.write_string(self.client_name).await?;
105
106        to.write_var_uint(self.client_version_major).await?;
107        to.write_var_uint(self.client_version_minor).await?;
108        to.write_var_uint(self.client_tcp_protocol_version).await?;
109
110        if revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO {
111            to.write_string(self.quota_key).await?;
112        }
113        if revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_DISTRIBUTED_DEPTH {
114            to.write_var_uint(self.distributed_depth).await?;
115        }
116        if revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH {
117            to.write_var_uint(self.client_version_patch).await?;
118        }
119        if revision >= DBMS_MIN_REVISION_WITH_OPENTELEMETRY {
120            if let Some(telemetry) = &self.open_telemetry {
121                to.write_u8(1u8).await?;
122                to.write_all(&telemetry.trace_id.as_bytes()[..]).await?;
123                to.write_u64(telemetry.span_id).await?;
124                to.write_string(telemetry.tracestate).await?;
125                to.write_u8(telemetry.trace_flags).await?;
126            } else {
127                to.write_u8(0u8).await?;
128            }
129        }
130
131        if revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_PARALLEL_REPLICAS {
132            to.write_var_uint(0).await?; // collaborate_with_initiator
133            to.write_var_uint(0).await?; // count_participating_replicas
134            to.write_var_uint(0).await?; // number_of_current_replica
135        }
136
137        if revision >= DBMS_MIN_REVISION_WITH_QUERY_AND_LINE_NUMBERS {
138            to.write_var_uint(0).await?; // script_query_number
139            to.write_var_uint(0).await?; // script_line_number
140        }
141
142        if revision >= DBMS_MIN_REVISION_WITH_JWT_IN_INTERSERVER {
143            // TODO: Support jwt
144            to.write_u8(0).await?;
145        }
146
147        Ok(())
148    }
149}