opensrv_clickhouse/
lib.rs

1// Copyright 2021 Datafuse Labs.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// https://github.com/rust-lang/rust-clippy/issues/8334
16#![allow(clippy::ptr_arg)]
17
18use std::sync::Arc;
19
20use errors::Result;
21use protocols::Stage;
22use tokio::net::TcpStream;
23use tokio::sync::mpsc::Sender;
24use tokio::sync::Notify;
25
26use crate::cmd::Cmd;
27use crate::connection::Connection;
28use crate::protocols::HelloRequest;
29use crate::types::Block;
30use crate::types::Progress;
31
32pub mod binary;
33pub mod cmd;
34pub mod connection;
35pub mod error_codes;
36pub mod errors;
37pub mod protocols;
38pub mod types;
39
/// Metadata for ClickHouse
///
/// Holds the identity values (names, version triple, TCP protocol version,
/// timezone, stack-trace flag) that a session exposes through
/// `ClickHouseSession::metadata`. Values are set builder-style with the
/// `with_*` methods, each of which consumes and returns the metadata.
#[derive(Debug, Clone)]
pub struct ClickHouseMetadata {
    /// DBMS name.
    name: String,
    /// DBMS display name.
    display_name: String,
    /// Major component of the version triple.
    major_version: u64,
    /// Minor component of the version triple.
    minor_version: u64,
    /// Patch component of the version triple.
    patch_version: u64,
    /// TCP protocol version.
    tcp_protocol_version: u64,
    /// Timezone name.
    timezone: String,
    /// Whether stack traces are enabled.
    has_stack_trace: bool,
}

impl Default for ClickHouseMetadata {
    /// Builds the default metadata: `clickhouse-server` 19.17.1, TCP protocol
    /// version 54428, timezone `UTC`, stack traces disabled.
    fn default() -> Self {
        ClickHouseMetadata {
            name: String::from("clickhouse-server"),
            display_name: String::from("clickhouse-server"),
            major_version: 19,
            minor_version: 17,
            patch_version: 1,
            tcp_protocol_version: 54428,
            timezone: String::from("UTC"),
            has_stack_trace: false,
        }
    }
}

impl ClickHouseMetadata {
    /// ClickHouse DBMS name.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Returns the metadata with the DBMS name replaced by `name`.
    pub fn with_name(self, name: &str) -> Self {
        Self {
            name: name.to_owned(),
            ..self
        }
    }

    /// ClickHouse DBMS display name.
    pub fn display_name(&self) -> &str {
        self.display_name.as_str()
    }

    /// Returns the metadata with the display name replaced by `name`.
    pub fn with_display_name(self, name: &str) -> Self {
        Self {
            display_name: name.to_owned(),
            ..self
        }
    }

    /// ClickHouse's version as a `(major, minor, patch)` triple.
    pub fn version(&self) -> (u64, u64, u64) {
        let Self {
            major_version,
            minor_version,
            patch_version,
            ..
        } = self;
        (*major_version, *minor_version, *patch_version)
    }

    /// Returns the metadata with the major version replaced by `v`.
    pub fn with_major_version(self, v: u64) -> Self {
        Self {
            major_version: v,
            ..self
        }
    }

    /// Returns the metadata with the minor version replaced by `v`.
    pub fn with_minor_version(self, v: u64) -> Self {
        Self {
            minor_version: v,
            ..self
        }
    }

    /// Returns the metadata with the patch version replaced by `v`.
    pub fn with_patch_version(self, v: u64) -> Self {
        Self {
            patch_version: v,
            ..self
        }
    }

    /// ClickHouse's TCP protocol version.
    pub fn tcp_protocol_version(&self) -> u64 {
        self.tcp_protocol_version
    }

    /// Returns the metadata with the TCP protocol version replaced by `v`.
    pub fn with_tcp_protocol_version(self, v: u64) -> Self {
        Self {
            tcp_protocol_version: v,
            ..self
        }
    }

    /// ClickHouse's timezone.
    pub fn timezone(&self) -> &str {
        self.timezone.as_str()
    }

    /// Returns the metadata with the timezone replaced by `v`.
    pub fn with_timezone(self, v: &str) -> Self {
        Self {
            timezone: v.to_owned(),
            ..self
        }
    }

    /// Whether stack traces are enabled.
    pub fn has_stack_trace(&self) -> bool {
        self.has_stack_trace
    }

    /// Returns the metadata with stack traces enabled.
    pub fn with_enable_stack_trace(self) -> Self {
        Self {
            has_stack_trace: true,
            ..self
        }
    }
}
149
150#[async_trait::async_trait]
151pub trait ClickHouseSession: Send + Sync {
152    async fn authenticate(&self, _username: &str, _password: &[u8], _client_addr: &str) -> bool {
153        true
154    }
155
156    async fn execute_query(&self, ctx: &mut CHContext, connection: &mut Connection) -> Result<()>;
157
158    fn get_progress(&self) -> Progress {
159        Progress::default()
160    }
161
162    /// Get ClickHouse metadata.
163    fn metadata(&self) -> &ClickHouseMetadata;
164
165    #[deprecated = "use ClickHouseMetadata::has_stack_trace() instead"]
166    fn with_stack_trace(&self) -> bool {
167        self.metadata().has_stack_trace()
168    }
169
170    #[deprecated = "use ClickHouseMetadata::name() instead"]
171    fn dbms_name(&self) -> &str {
172        self.metadata().name()
173    }
174
175    // None is by default, which will use same version as client send
176    #[deprecated = "use ClickHouseMetadata::version() instead"]
177    fn dbms_version_major(&self) -> u64 {
178        self.metadata().version().0
179    }
180
181    #[deprecated = "use ClickHouseMetadata::version() instead"]
182    fn dbms_version_minor(&self) -> u64 {
183        self.metadata().version().1
184    }
185
186    #[deprecated = "use ClickHouseMetadata::tcp_protocol_version() instead"]
187    fn dbms_tcp_protocol_version(&self) -> u64 {
188        self.metadata().tcp_protocol_version()
189    }
190
191    #[deprecated = "use ClickHouseMetadata::timezone() instead"]
192    fn timezone(&self) -> &str {
193        self.metadata().timezone()
194    }
195
196    #[deprecated = "use ClickHouseMetadata::display_name() instead"]
197    fn server_display_name(&self) -> &str {
198        self.metadata().display_name()
199    }
200
201    #[deprecated = "use ClickHouseMetadata::version() instead"]
202    fn dbms_version_patch(&self) -> u64 {
203        self.metadata().version().2
204    }
205}
206
/// Mutable per-query state, stored in [`CHContext::state`].
///
/// `reset` restores `stage`, the three `is_*` flags and `out` to their idle
/// values; the remaining fields are left as they are.
#[derive(Default)]
pub struct QueryState {
    /// Identifier of the query.
    pub query_id: String,
    /// Protocol stage; set back to `Stage::Default` by `reset`.
    pub stage: Stage,
    /// Compression setting (presumably the value sent by the client; confirm
    /// against the protocol code).
    pub compression: u64,
    /// Query text.
    pub query: String,
    /// Cancellation flag; cleared by `reset`.
    pub is_cancelled: bool,
    /// Connection-closed flag; cleared by `reset`.
    pub is_connection_closed: bool,
    /// empty or not
    pub is_empty: bool,

    /// Data was sent.
    pub sent_all_data: Arc<Notify>,
    /// Optional channel sender for `Block`s; dropped (set to `None`) by `reset`.
    pub out: Option<Sender<Block>>,
}
222
223impl QueryState {
224    fn reset(&mut self) {
225        self.stage = Stage::Default;
226        self.is_cancelled = false;
227        self.is_connection_closed = false;
228        self.is_empty = false;
229        self.out = None;
230    }
231}
232
/// Per-connection context passed to packet reading, command handling and
/// `ClickHouseSession::execute_query`.
pub struct CHContext {
    /// State of the query currently associated with this connection.
    pub state: QueryState,

    /// Client revision; starts at 0 in [`CHContext::new`].
    pub client_revision: u64,
    /// The client's hello request, if any; starts as `None` in [`CHContext::new`].
    pub hello: Option<HelloRequest>,
}
239
240impl CHContext {
241    pub fn new(state: QueryState) -> Self {
242        Self {
243            state,
244            client_revision: 0,
245            hello: None,
246        }
247    }
248}
249
/// A server that speaks the ClickHouse protocol, and can delegate client commands to a backend
/// that implements [`ClickHouseSession`]
pub struct ClickHouseServer {}
253
254impl ClickHouseServer {
255    pub async fn run_on_stream(
256        session: Arc<dyn ClickHouseSession>,
257        stream: TcpStream,
258    ) -> Result<()> {
259        ClickHouseServer::run_on(session, stream).await
260    }
261}
262
263impl ClickHouseServer {
264    async fn run_on(session: Arc<dyn ClickHouseSession>, stream: TcpStream) -> Result<()> {
265        let mut srv = ClickHouseServer {};
266        srv.run(session, stream).await?;
267        Ok(())
268    }
269
270    async fn run(&mut self, session: Arc<dyn ClickHouseSession>, stream: TcpStream) -> Result<()> {
271        tracing::debug!("Handle New session");
272        let metadata = session.metadata();
273        let tz = metadata.timezone().to_string();
274        let mut ctx = CHContext::new(QueryState::default());
275        let mut connection = Connection::new(stream, session, tz)?;
276
277        loop {
278            // signal.
279            let maybe_packet = tokio::select! {
280               res = connection.read_packet(&mut ctx) => res,
281            };
282
283            let packet = match maybe_packet {
284                Ok(Some(packet)) => packet,
285                Err(e) => {
286                    ctx.state.reset();
287                    connection.write_error(&e).await?;
288                    return Err(e);
289                }
290                Ok(None) => {
291                    tracing::debug!("{:?}", "none data reset");
292                    ctx.state.reset();
293                    return Ok(());
294                }
295            };
296            let cmd = Cmd::create(packet);
297            cmd.apply(&mut connection, &mut ctx).await?;
298        }
299    }
300}
301
/// Builds a row by chaining `.put(key.into(), value.into())` calls onto
/// `$crate::types::RNil`.
///
/// Accepted forms (may be mixed, comma separated, trailing comma allowed):
///
/// * `row!()` expands to `RNil` itself.
/// * `row!(a, b)` uses each identifier as both the key (its name) and the value.
/// * `row!(a: expr)` uses an identifier key and an arbitrary value expression.
/// * `row!(key_expr => value_expr)` uses arbitrary key and value expressions.
///
/// The tail is expanded first, so the last pair is `put` first and the first
/// pair is `put` last.
///
/// Recursive invocations go through `$crate::row!` so the macro also works
/// when it is called by path (`the_crate::row!(...)`) from a crate that has
/// not imported `row` into scope.
#[macro_export]
macro_rules! row {
    () => { $crate::types::RNil };
    ( $i:ident, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put(stringify!($i).into(), $i.into())
    };
    ( $i:ident ) => { $crate::row!($i: $i) };

    ( $k:ident: $v:expr ) => {
        $crate::types::RNil.put(stringify!($k).into(), $v.into())
    };

    ( $k:ident: $v:expr, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put(stringify!($k).into(), $v.into())
    };

    ( $k:expr => $v:expr ) => {
        $crate::types::RNil.put($k.into(), $v.into())
    };

    ( $k:expr => $v:expr, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put($k.into(), $v.into())
    };
}