opensrv_clickhouse/protocols/
protocol_query.rs1use std::io::Read;
16
17use super::*;
18use crate::binary::ReadEx;
19use crate::errors::DriverError::UnknownSetting;
20use crate::errors::Error;
21use crate::errors::Result;
22
/// Value of `QueryClientInfo::interface` that selects the TCP-specific
/// fields (OS user, host name, client name, versions) when parsing.
const TCP: u8 = 1;
/// Value of `QueryClientInfo::interface` that selects the HTTP-specific
/// fields (method, user agent) when parsing.
const HTTP: u8 = 2;
25
/// Client-info section of a query packet, as decoded by
/// `QueryClientInfo::read_from`.
///
/// Every field starts at its `Default` value and is only overwritten when
/// the corresponding part of the stream is actually read.
#[derive(Debug, Default)]
pub struct QueryClientInfo {
    /// First scalar of the section. A value of `0` means nothing else in
    /// this struct was read from the stream.
    pub query_kind: u8,
    /// Read as a string right after `query_kind`.
    pub initial_user: String,
    /// Length-encoded bytes read after `initial_user`.
    pub initial_query_id: Vec<u8>,

    /// Length-encoded bytes read after `initial_query_id`.
    pub initial_address: Vec<u8>,
    /// Interface tag; `1` (TCP) and `2` (HTTP) select which of the groups
    /// below is populated, any other value leaves both untouched.
    pub interface: u8,

    /// TCP only: length-encoded bytes.
    pub os_user: Vec<u8>,
    /// TCP only: length-encoded bytes.
    pub client_hostname: Vec<u8>,
    /// TCP only: length-encoded bytes.
    pub client_name: Vec<u8>,

    /// TCP only: read as an unsigned varint.
    pub client_version_major: u64,
    /// TCP only: read as an unsigned varint.
    pub client_version_minor: u64,
    /// TCP only: first mirrors `client_revision`, then is replaced by a
    /// dedicated varint when the revision is new enough to carry one.
    pub client_version_patch: u64,
    /// TCP only: read as an unsigned varint; also gates the optional
    /// trailing fields.
    pub client_revision: u64,

    /// HTTP only: read as a single scalar.
    pub http_method: u8,
    /// HTTP only: length-encoded bytes.
    pub http_user_agent: Vec<u8>,

    /// Length-encoded bytes, read only when `client_revision` reaches the
    /// quota-key revision threshold.
    pub quota_key: Vec<u8>,
}
51
52impl QueryClientInfo {
53 pub fn read_from<R: Read>(reader: &mut R) -> Result<QueryClientInfo> {
54 let mut client_info = QueryClientInfo {
55 query_kind: reader.read_scalar()?,
56 ..Default::default()
57 };
58
59 if client_info.query_kind == 0 {
60 return Ok(client_info);
61 }
62
63 client_info.initial_user = reader.read_string()?;
64 client_info.initial_query_id = reader.read_len_encode_bytes()?;
65 client_info.initial_address = reader.read_len_encode_bytes()?;
66
67 client_info.interface = reader.read_scalar()?;
68
69 match client_info.interface {
70 TCP => {
71 client_info.os_user = reader.read_len_encode_bytes()?;
72 client_info.client_hostname = reader.read_len_encode_bytes()?;
73 client_info.client_name = reader.read_len_encode_bytes()?;
74
75 client_info.client_version_major = reader.read_uvarint()?;
76 client_info.client_version_minor = reader.read_uvarint()?;
77 let client_revision = reader.read_uvarint()?;
78
79 client_info.client_revision = client_revision;
80 client_info.client_version_patch = client_revision;
81 }
82 HTTP => {
83 client_info.http_method = reader.read_scalar()?;
84 client_info.http_user_agent = reader.read_len_encode_bytes()?;
85 }
86 _ => {}
87 }
88
89 if client_info.client_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO {
90 client_info.quota_key = reader.read_len_encode_bytes()?;
91 }
92
93 if client_info.interface == TCP
94 && client_info.client_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH
95 {
96 client_info.client_version_patch = reader.read_uvarint()?;
97 }
98
99 Ok(client_info)
107 }
108}
109
110#[allow(dead_code)]
111#[derive(Default, Debug)]
112pub struct QueryRequest {
113 pub(crate) query_id: String,
114 pub(crate) client_info: QueryClientInfo,
115 pub(crate) stage: u64,
116 pub(crate) compression: u64,
117 pub(crate) query: String,
118}
119
120impl QueryRequest {
121 pub fn read_from<R: Read>(
122 reader: &mut R,
123 hello_request: &HelloRequest,
124 ) -> Result<QueryRequest> {
125 let query_id = reader.read_string()?;
126
127 let mut client_info = Default::default();
128 if hello_request.client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO {
129 client_info = QueryClientInfo::read_from(reader)?;
130 }
131
132 if client_info.query_kind == 0 {
133 client_info.query_kind = INITIAL_QUERY;
134 client_info.client_name = hello_request.client_name.clone();
135 client_info.client_version_major = hello_request.client_version_major;
136 client_info.client_version_minor = hello_request.client_version_minor;
137 client_info.client_version_patch = hello_request.client_version_patch;
138 client_info.client_revision = hello_request.client_revision;
139 }
140
141 client_info.interface = TCP;
142
143 loop {
144 let name = reader.read_string()?;
145
146 if name.is_empty() {
147 break;
148 }
149
150 match name.as_str() {
151 "max_block_size" | "max_threads" | "readonly" => {
152 let _ = reader.read_uvarint()?;
153 }
154 _ => {
155 return Err(Error::Driver(UnknownSetting { name }));
156 }
157 }
158 }
159
160 let query_protocol = QueryRequest {
161 query_id,
162 client_info,
163 stage: reader.read_uvarint()?,
164 compression: reader.read_uvarint()?,
165 query: reader.read_string()?,
166 };
167
168 Ok(query_protocol)
169 }
170}