// clickhouse_srv/protocols/protocol_query.rs
use std::io::Read;

use super::*;
use crate::binary::ReadEx;
use crate::errors::DriverError::UnknownSetting;
use crate::errors::Error;
use crate::errors::Result;

/// Value of `QueryClientInfo::interface` that selects the TCP-specific fields.
const TCP: u8 = 1;
/// Value of `QueryClientInfo::interface` that selects the HTTP-specific fields.
const HTTP: u8 = 2;
11
/// Client metadata that accompanies a query packet.
///
/// Every field starts out at its `Default` value (zero / empty string) and is
/// only overwritten when the corresponding bytes are present on the wire.
#[derive(Debug, Default)]
pub struct QueryClientInfo {
    /// Kind of query; `0` means the sender attached no further client info.
    pub query_kind: u8,
    /// User name string sent right after `query_kind`.
    pub initial_user: String,
    /// Query id string sent after `initial_user`.
    pub initial_query_id: String,

    /// Address string sent after `initial_query_id`.
    pub initial_address: String,
    /// Interface code; compared against the `TCP` and `HTTP` constants.
    pub interface: u8,

    // Fields below are filled in only for the TCP interface.
    /// OS-level user name reported by the client.
    pub os_user: String,
    /// Host name reported by the client.
    pub client_hostname: String,
    /// Client program name.
    pub client_name: String,

    /// Major component of the client version.
    pub client_version_major: u64,
    /// Minor component of the client version.
    pub client_version_minor: u64,
    /// Patch component of the client version; mirrors `client_revision`
    /// unless the client transmits a dedicated patch value.
    pub client_version_patch: u64,
    /// Protocol revision announced by the client.
    pub client_revision: u64,

    // Fields below are filled in only for the HTTP interface.
    /// Numeric HTTP method code (meaning of the values is not defined here).
    pub http_method: u8,
    /// HTTP `User-Agent` string.
    pub http_user_agent: String,

    /// Quota key; read only when the revision is recent enough.
    pub quota_key: String,
}
37
38impl QueryClientInfo {
39 pub fn read_from<R: Read>(reader: &mut R) -> Result<QueryClientInfo> {
40 let mut client_info = QueryClientInfo {
41 query_kind: reader.read_scalar()?,
42 ..Default::default()
43 };
44
45 if client_info.query_kind == 0 {
46 return Ok(client_info);
47 }
48
49 client_info.initial_user = reader.read_string()?;
50 client_info.initial_query_id = reader.read_string()?;
51 client_info.initial_address = reader.read_string()?;
52 client_info.interface = reader.read_scalar()?;
53
54 match client_info.interface {
55 TCP => {
56 client_info.os_user = reader.read_string()?;
57 client_info.client_hostname = reader.read_string()?;
58 client_info.client_name = reader.read_string()?;
59
60 client_info.client_version_major = reader.read_uvarint()?;
61 client_info.client_version_minor = reader.read_uvarint()?;
62 let client_revision = reader.read_uvarint()?;
63
64 client_info.client_revision = client_revision;
65 client_info.client_version_patch = client_revision;
66 }
67 HTTP => {
68 client_info.http_method = reader.read_scalar()?;
69 client_info.http_user_agent = reader.read_string()?;
70 }
71 _ => {}
72 }
73
74 if client_info.client_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO {
75 client_info.quota_key = reader.read_string()?;
76 }
77
78 if client_info.interface == TCP
79 && client_info.client_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH
80 {
81 client_info.client_version_patch = reader.read_uvarint()?;
82 }
83
84 Ok(client_info)
92 }
93}
94
95#[derive(Default, Debug)]
96pub struct QueryRequest {
97 pub(crate) query_id: String,
98 pub(crate) client_info: QueryClientInfo,
99 pub(crate) stage: u64,
100 pub(crate) compression: u64,
101 pub(crate) query: String
102}
103
104impl QueryRequest {
105 pub fn read_from<R: Read>(
106 reader: &mut R,
107 hello_request: &HelloRequest
108 ) -> Result<QueryRequest> {
109 let query_id = reader.read_string()?;
110
111 let mut client_info = Default::default();
112 if hello_request.client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO {
113 client_info = QueryClientInfo::read_from(reader)?;
114 }
115
116 if client_info.query_kind == 0 {
117 client_info.query_kind = INITIAL_QUERY;
118 client_info.client_name = hello_request.client_name.clone();
119 client_info.client_version_major = hello_request.client_version_major;
120 client_info.client_version_minor = hello_request.client_version_minor;
121 client_info.client_version_patch = hello_request.client_version_patch;
122 client_info.client_revision = hello_request.client_revision;
123 }
124
125 client_info.interface = TCP;
126
127 loop {
128 let name = reader.read_string()?;
129
130 if name.is_empty() {
131 break;
132 }
133
134 match name.as_str() {
135 "max_block_size" | "max_threads" => {
136 let _ = reader.read_uvarint()?;
137 }
138 _ => {
139 return Err(Error::Driver(UnknownSetting { name }));
140 }
141 }
142 }
143
144 let query_protocol = QueryRequest {
145 query_id,
146 client_info,
147 stage: reader.read_uvarint()?,
148 compression: reader.read_uvarint()?,
149 query: reader.read_string()?
150 };
151
152 Ok(query_protocol)
153 }
154}