clickhouse_srv/protocols/
protocol_query.rs

1use std::io::Read;
2
3use super::*;
4use crate::binary::ReadEx;
5use crate::errors::DriverError::UnknownSetting;
6use crate::errors::Error;
7use crate::errors::Result;
8
/// Value of `QueryClientInfo::interface` for which the TCP-specific fields
/// (OS user, client host/name and version numbers) are read.
const TCP: u8 = 1;
/// Value of `QueryClientInfo::interface` for which the HTTP-specific fields
/// (method and user agent) are read.
const HTTP: u8 = 2;
11
/// Client metadata attached to a query packet.
///
/// Every field defaults to zero / the empty string. Which fields are populated
/// depends on `query_kind` and `interface`; see `QueryClientInfo::read_from`.
/// The type is plain data, so it can be cloned and compared for equality.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct QueryClientInfo {
    /// Query kind tag; it is the first value read, and `0` means no further
    /// client info follows on the wire.
    pub query_kind: u8,
    /// User name sent with the initial query.
    pub initial_user: String,
    /// Query id sent with the initial query.
    pub initial_query_id: String,

    /// Address string sent with the initial query.
    pub initial_address: String,
    /// Interface tag; `1` selects the TCP fields below, `2` the HTTP fields.
    pub interface: u8,

    // TCP
    /// OS user name reported by the client (TCP only).
    pub os_user: String,
    /// Host name reported by the client (TCP only).
    pub client_hostname: String,
    /// Client program name (TCP only).
    pub client_name: String,

    /// Client version, major component.
    pub client_version_major: u64,
    /// Client version, minor component.
    pub client_version_minor: u64,
    /// Client version, patch component; mirrors `client_revision` unless the
    /// client sends a dedicated patch value.
    pub client_version_patch: u64,
    /// Client protocol revision; gates the optional trailing fields.
    pub client_revision: u64,

    // HTTP
    /// HTTP method tag (HTTP only).
    pub http_method: u8,
    /// HTTP user agent string (HTTP only).
    pub http_user_agent: String,

    /// Quota key; only read for sufficiently recent client revisions.
    pub quota_key: String
}
37
38impl QueryClientInfo {
39    pub fn read_from<R: Read>(reader: &mut R) -> Result<QueryClientInfo> {
40        let mut client_info = QueryClientInfo {
41            query_kind: reader.read_scalar()?,
42            ..Default::default()
43        };
44
45        if client_info.query_kind == 0 {
46            return Ok(client_info);
47        }
48
49        client_info.initial_user = reader.read_string()?;
50        client_info.initial_query_id = reader.read_string()?;
51        client_info.initial_address = reader.read_string()?;
52        client_info.interface = reader.read_scalar()?;
53
54        match client_info.interface {
55            TCP => {
56                client_info.os_user = reader.read_string()?;
57                client_info.client_hostname = reader.read_string()?;
58                client_info.client_name = reader.read_string()?;
59
60                client_info.client_version_major = reader.read_uvarint()?;
61                client_info.client_version_minor = reader.read_uvarint()?;
62                let client_revision = reader.read_uvarint()?;
63
64                client_info.client_revision = client_revision;
65                client_info.client_version_patch = client_revision;
66            }
67            HTTP => {
68                client_info.http_method = reader.read_scalar()?;
69                client_info.http_user_agent = reader.read_string()?;
70            }
71            _ => {}
72        }
73
74        if client_info.client_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO {
75            client_info.quota_key = reader.read_string()?;
76        }
77
78        if client_info.interface == TCP
79            && client_info.client_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH
80        {
81            client_info.client_version_patch = reader.read_uvarint()?;
82        }
83
84        // TODO
85        // if client_info.client_revision >= DBMS_MIN_REVISION_WITH_OPENTELEMETRY {
86        //     let trace_id: u8 = reader.read_scalar()?;
87        //     if trace_id > 0 {
88        //     }
89        // }
90
91        Ok(client_info)
92    }
93}
94
95#[derive(Default, Debug)]
96pub struct QueryRequest {
97    pub(crate) query_id: String,
98    pub(crate) client_info: QueryClientInfo,
99    pub(crate) stage: u64,
100    pub(crate) compression: u64,
101    pub(crate) query: String
102}
103
104impl QueryRequest {
105    pub fn read_from<R: Read>(
106        reader: &mut R,
107        hello_request: &HelloRequest
108    ) -> Result<QueryRequest> {
109        let query_id = reader.read_string()?;
110
111        let mut client_info = Default::default();
112        if hello_request.client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO {
113            client_info = QueryClientInfo::read_from(reader)?;
114        }
115
116        if client_info.query_kind == 0 {
117            client_info.query_kind = INITIAL_QUERY;
118            client_info.client_name = hello_request.client_name.clone();
119            client_info.client_version_major = hello_request.client_version_major;
120            client_info.client_version_minor = hello_request.client_version_minor;
121            client_info.client_version_patch = hello_request.client_version_patch;
122            client_info.client_revision = hello_request.client_revision;
123        }
124
125        client_info.interface = TCP;
126
127        loop {
128            let name = reader.read_string()?;
129
130            if name.is_empty() {
131                break;
132            }
133
134            match name.as_str() {
135                "max_block_size" | "max_threads" => {
136                    let _ = reader.read_uvarint()?;
137                }
138                _ => {
139                    return Err(Error::Driver(UnknownSetting { name }));
140                }
141            }
142        }
143
144        let query_protocol = QueryRequest {
145            query_id,
146            client_info,
147            stage: reader.read_uvarint()?,
148            compression: reader.read_uvarint()?,
149            query: reader.read_string()?
150        };
151
152        Ok(query_protocol)
153    }
154}