opensrv_clickhouse/protocols/
protocol_query.rs

1// Copyright 2021 Datafuse Labs.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::Read;
16
17use super::*;
18use crate::binary::ReadEx;
19use crate::errors::DriverError::UnknownSetting;
20use crate::errors::Error;
21use crate::errors::Result;
22
/// Interface code that selects the TCP-specific client-info fields.
const TCP: u8 = 1;
/// Interface code that selects the HTTP-specific client-info fields.
const HTTP: u8 = 2;
25
/// Client metadata that accompanies a query packet.
///
/// Populated by [`QueryClientInfo::read_from`]. Any field that is not read
/// from the wire keeps its `Default` value (zero or empty).
#[derive(Debug, Default)]
pub struct QueryClientInfo {
    /// Query kind byte; `0` means nothing else was read for this struct.
    pub query_kind: u8,
    /// Initial user, read as a string right after `query_kind`.
    pub initial_user: String,
    /// Initial query id, kept as raw length-prefixed bytes.
    pub initial_query_id: Vec<u8>,

    /// Initial address, kept as raw length-prefixed bytes.
    pub initial_address: Vec<u8>,
    /// Interface code; compared against the `TCP` and `HTTP` constants.
    pub interface: u8,

    // --- Read from the wire only when `interface` is `TCP` ---
    /// OS user name reported by the client (raw bytes).
    pub os_user: Vec<u8>,
    /// Host name reported by the client (raw bytes).
    pub client_hostname: Vec<u8>,
    /// Client name (raw bytes).
    pub client_name: Vec<u8>,

    /// Major component of the client version.
    pub client_version_major: u64,
    /// Minor component of the client version.
    pub client_version_minor: u64,
    /// Patch component of the client version. Mirrors `client_revision`
    /// unless the client's revision is at least
    /// `DBMS_MIN_REVISION_WITH_VERSION_PATCH`, in which case it is read
    /// separately.
    pub client_version_patch: u64,
    /// Revision number sent by the client; gates the optional trailing fields.
    pub client_revision: u64,

    // --- Read from the wire only when `interface` is `HTTP` ---
    /// HTTP method code.
    pub http_method: u8,
    /// HTTP user agent (raw bytes).
    pub http_user_agent: Vec<u8>,

    /// Quota key; read only when `client_revision` is at least
    /// `DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO`.
    pub quota_key: Vec<u8>,
}
51
52impl QueryClientInfo {
53    pub fn read_from<R: Read>(reader: &mut R) -> Result<QueryClientInfo> {
54        let mut client_info = QueryClientInfo {
55            query_kind: reader.read_scalar()?,
56            ..Default::default()
57        };
58
59        if client_info.query_kind == 0 {
60            return Ok(client_info);
61        }
62
63        client_info.initial_user = reader.read_string()?;
64        client_info.initial_query_id = reader.read_len_encode_bytes()?;
65        client_info.initial_address = reader.read_len_encode_bytes()?;
66
67        client_info.interface = reader.read_scalar()?;
68
69        match client_info.interface {
70            TCP => {
71                client_info.os_user = reader.read_len_encode_bytes()?;
72                client_info.client_hostname = reader.read_len_encode_bytes()?;
73                client_info.client_name = reader.read_len_encode_bytes()?;
74
75                client_info.client_version_major = reader.read_uvarint()?;
76                client_info.client_version_minor = reader.read_uvarint()?;
77                let client_revision = reader.read_uvarint()?;
78
79                client_info.client_revision = client_revision;
80                client_info.client_version_patch = client_revision;
81            }
82            HTTP => {
83                client_info.http_method = reader.read_scalar()?;
84                client_info.http_user_agent = reader.read_len_encode_bytes()?;
85            }
86            _ => {}
87        }
88
89        if client_info.client_revision >= DBMS_MIN_REVISION_WITH_QUOTA_KEY_IN_CLIENT_INFO {
90            client_info.quota_key = reader.read_len_encode_bytes()?;
91        }
92
93        if client_info.interface == TCP
94            && client_info.client_revision >= DBMS_MIN_REVISION_WITH_VERSION_PATCH
95        {
96            client_info.client_version_patch = reader.read_uvarint()?;
97        }
98
99        // TODO
100        // if client_info.client_revision >= DBMS_MIN_REVISION_WITH_OPENTELEMETRY {
101        //     let trace_id: u8 = reader.read_scalar()?;
102        //     if trace_id > 0 {
103        //     }
104        // }
105
106        Ok(client_info)
107    }
108}
109
110#[allow(dead_code)]
111#[derive(Default, Debug)]
112pub struct QueryRequest {
113    pub(crate) query_id: String,
114    pub(crate) client_info: QueryClientInfo,
115    pub(crate) stage: u64,
116    pub(crate) compression: u64,
117    pub(crate) query: String,
118}
119
120impl QueryRequest {
121    pub fn read_from<R: Read>(
122        reader: &mut R,
123        hello_request: &HelloRequest,
124    ) -> Result<QueryRequest> {
125        let query_id = reader.read_string()?;
126
127        let mut client_info = Default::default();
128        if hello_request.client_revision >= DBMS_MIN_REVISION_WITH_CLIENT_INFO {
129            client_info = QueryClientInfo::read_from(reader)?;
130        }
131
132        if client_info.query_kind == 0 {
133            client_info.query_kind = INITIAL_QUERY;
134            client_info.client_name = hello_request.client_name.clone();
135            client_info.client_version_major = hello_request.client_version_major;
136            client_info.client_version_minor = hello_request.client_version_minor;
137            client_info.client_version_patch = hello_request.client_version_patch;
138            client_info.client_revision = hello_request.client_revision;
139        }
140
141        client_info.interface = TCP;
142
143        loop {
144            let name = reader.read_string()?;
145
146            if name.is_empty() {
147                break;
148            }
149
150            match name.as_str() {
151                "max_block_size" | "max_threads" | "readonly" => {
152                    let _ = reader.read_uvarint()?;
153                }
154                _ => {
155                    return Err(Error::Driver(UnknownSetting { name }));
156                }
157            }
158        }
159
160        let query_protocol = QueryRequest {
161            query_id,
162            client_info,
163            stage: reader.read_uvarint()?,
164            compression: reader.read_uvarint()?,
165            query: reader.read_string()?,
166        };
167
168        Ok(query_protocol)
169    }
170}