// opensrv_clickhouse/lib.rs
#![allow(clippy::ptr_arg)]

use std::sync::Arc;

use errors::Result;
use protocols::Stage;
use tokio::net::TcpStream;
use tokio::sync::mpsc::Sender;
use tokio::sync::Notify;

use crate::cmd::Cmd;
use crate::connection::Connection;
use crate::protocols::HelloRequest;
use crate::types::Block;
use crate::types::Progress;

32pub mod binary;
33pub mod cmd;
34pub mod connection;
35pub mod error_codes;
36pub mod errors;
37pub mod protocols;
38pub mod types;
39
/// Server identity and protocol information returned by
/// [`ClickHouseSession::metadata`].
///
/// Start from [`Default`] and override individual fields with the consuming
/// `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct ClickHouseMetadata {
    name: String,
    display_name: String,
    major_version: u64,
    minor_version: u64,
    patch_version: u64,
    tcp_protocol_version: u64,
    timezone: String,
    has_stack_trace: bool,
}

impl Default for ClickHouseMetadata {
    /// Returns metadata named `clickhouse-server` (also used as display name),
    /// version `19.17.1`, TCP protocol version `54428`, timezone `UTC`, and
    /// stack traces disabled.
    fn default() -> Self {
        Self {
            name: "clickhouse-server".to_string(),
            display_name: "clickhouse-server".to_string(),
            major_version: 19,
            minor_version: 17,
            patch_version: 1,
            tcp_protocol_version: 54428,
            timezone: "UTC".to_string(),
            has_stack_trace: false,
        }
    }
}

impl ClickHouseMetadata {
    /// Returns the server name.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Replaces the server name and returns the updated metadata.
    #[must_use]
    pub fn with_name(mut self, name: &str) -> Self {
        self.name = name.to_string();
        self
    }

    /// Returns the server display name.
    pub fn display_name(&self) -> &str {
        &self.display_name
    }

    /// Replaces the server display name and returns the updated metadata.
    #[must_use]
    pub fn with_display_name(mut self, name: &str) -> Self {
        self.display_name = name.to_string();
        self
    }

    /// Returns the version as a `(major, minor, patch)` tuple.
    pub fn version(&self) -> (u64, u64, u64) {
        (self.major_version, self.minor_version, self.patch_version)
    }

    /// Replaces the major version and returns the updated metadata.
    #[must_use]
    pub fn with_major_version(mut self, v: u64) -> Self {
        self.major_version = v;
        self
    }

    /// Replaces the minor version and returns the updated metadata.
    #[must_use]
    pub fn with_minor_version(mut self, v: u64) -> Self {
        self.minor_version = v;
        self
    }

    /// Replaces the patch version and returns the updated metadata.
    #[must_use]
    pub fn with_patch_version(mut self, v: u64) -> Self {
        self.patch_version = v;
        self
    }

    /// Returns the TCP protocol version.
    pub fn tcp_protocol_version(&self) -> u64 {
        self.tcp_protocol_version
    }

    /// Replaces the TCP protocol version and returns the updated metadata.
    #[must_use]
    pub fn with_tcp_protocol_version(mut self, v: u64) -> Self {
        self.tcp_protocol_version = v;
        self
    }

    /// Returns the timezone name.
    pub fn timezone(&self) -> &str {
        &self.timezone
    }

    /// Replaces the timezone name and returns the updated metadata.
    #[must_use]
    pub fn with_timezone(mut self, v: &str) -> Self {
        self.timezone = v.to_string();
        self
    }

    /// Returns whether the stack-trace flag is set.
    pub fn has_stack_trace(&self) -> bool {
        self.has_stack_trace
    }

    /// Sets the stack-trace flag and returns the updated metadata.
    ///
    /// This impl offers no builder that clears the flag again.
    #[must_use]
    pub fn with_enable_stack_trace(mut self) -> Self {
        self.has_stack_trace = true;
        self
    }
}

150#[async_trait::async_trait]
151pub trait ClickHouseSession: Send + Sync {
152 async fn authenticate(&self, _username: &str, _password: &[u8], _client_addr: &str) -> bool {
153 true
154 }
155
156 async fn execute_query(&self, ctx: &mut CHContext, connection: &mut Connection) -> Result<()>;
157
158 fn get_progress(&self) -> Progress {
159 Progress::default()
160 }
161
162 fn metadata(&self) -> &ClickHouseMetadata;
164
165 #[deprecated = "use ClickHouseMetadata::has_stack_trace() instead"]
166 fn with_stack_trace(&self) -> bool {
167 self.metadata().has_stack_trace()
168 }
169
170 #[deprecated = "use ClickHouseMetadata::name() instead"]
171 fn dbms_name(&self) -> &str {
172 self.metadata().name()
173 }
174
175 #[deprecated = "use ClickHouseMetadata::version() instead"]
177 fn dbms_version_major(&self) -> u64 {
178 self.metadata().version().0
179 }
180
181 #[deprecated = "use ClickHouseMetadata::version() instead"]
182 fn dbms_version_minor(&self) -> u64 {
183 self.metadata().version().1
184 }
185
186 #[deprecated = "use ClickHouseMetadata::tcp_protocol_version() instead"]
187 fn dbms_tcp_protocol_version(&self) -> u64 {
188 self.metadata().tcp_protocol_version()
189 }
190
191 #[deprecated = "use ClickHouseMetadata::timezone() instead"]
192 fn timezone(&self) -> &str {
193 self.metadata().timezone()
194 }
195
196 #[deprecated = "use ClickHouseMetadata::display_name() instead"]
197 fn server_display_name(&self) -> &str {
198 self.metadata().display_name()
199 }
200
201 #[deprecated = "use ClickHouseMetadata::version() instead"]
202 fn dbms_version_patch(&self) -> u64 {
203 self.metadata().version().2
204 }
205}
206
207#[derive(Default)]
208pub struct QueryState {
209 pub query_id: String,
210 pub stage: Stage,
211 pub compression: u64,
212 pub query: String,
213 pub is_cancelled: bool,
214 pub is_connection_closed: bool,
215 pub is_empty: bool,
217
218 pub sent_all_data: Arc<Notify>,
220 pub out: Option<Sender<Block>>,
221}
222
223impl QueryState {
224 fn reset(&mut self) {
225 self.stage = Stage::Default;
226 self.is_cancelled = false;
227 self.is_connection_closed = false;
228 self.is_empty = false;
229 self.out = None;
230 }
231}
232
233pub struct CHContext {
234 pub state: QueryState,
235
236 pub client_revision: u64,
237 pub hello: Option<HelloRequest>,
238}
239
240impl CHContext {
241 pub fn new(state: QueryState) -> Self {
242 Self {
243 state,
244 client_revision: 0,
245 hello: None,
246 }
247 }
248}
249
/// Stateless entry point that serves the ClickHouse protocol on a TCP stream;
/// see [`ClickHouseServer::run_on_stream`].
pub struct ClickHouseServer {}

254impl ClickHouseServer {
255 pub async fn run_on_stream(
256 session: Arc<dyn ClickHouseSession>,
257 stream: TcpStream,
258 ) -> Result<()> {
259 ClickHouseServer::run_on(session, stream).await
260 }
261}
262
263impl ClickHouseServer {
264 async fn run_on(session: Arc<dyn ClickHouseSession>, stream: TcpStream) -> Result<()> {
265 let mut srv = ClickHouseServer {};
266 srv.run(session, stream).await?;
267 Ok(())
268 }
269
270 async fn run(&mut self, session: Arc<dyn ClickHouseSession>, stream: TcpStream) -> Result<()> {
271 tracing::debug!("Handle New session");
272 let metadata = session.metadata();
273 let tz = metadata.timezone().to_string();
274 let mut ctx = CHContext::new(QueryState::default());
275 let mut connection = Connection::new(stream, session, tz)?;
276
277 loop {
278 let maybe_packet = tokio::select! {
280 res = connection.read_packet(&mut ctx) => res,
281 };
282
283 let packet = match maybe_packet {
284 Ok(Some(packet)) => packet,
285 Err(e) => {
286 ctx.state.reset();
287 connection.write_error(&e).await?;
288 return Err(e);
289 }
290 Ok(None) => {
291 tracing::debug!("{:?}", "none data reset");
292 ctx.state.reset();
293 return Ok(());
294 }
295 };
296 let cmd = Cmd::create(packet);
297 cmd.apply(&mut connection, &mut ctx).await?;
298 }
299 }
300}
301
/// Builds a row by chaining `put(name, value)` calls onto `$crate::types::RNil`.
///
/// Accepted forms (they may be followed by more entries after a comma):
/// * `row!()` — just `RNil`;
/// * `row!(a, b)` — each identifier is used as both the name and the value;
/// * `row!(a: 1, b: 2)` — identifier name with an expression value;
/// * `row!("a" => 1, "b" => 2)` — expression name with an expression value.
///
/// Both name and value are converted with `.into()`. The tail is expanded
/// first, so entries are `put` from the last one to the first:
/// `row!(a: 1, b: 2)` becomes `RNil.put("b", 2).put("a", 1)`.
///
/// Recursive invocations go through `$crate::row!` so the macro also works
/// when the caller invokes it by path without importing `row` itself.
#[macro_export]
macro_rules! row {
    () => { $crate::types::RNil };
    ( $i:ident, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put(stringify!($i).into(), $i.into())
    };
    ( $i:ident ) => { $crate::row!($i: $i) };

    ( $k:ident: $v:expr ) => {
        $crate::types::RNil.put(stringify!($k).into(), $v.into())
    };

    ( $k:ident: $v:expr, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put(stringify!($k).into(), $v.into())
    };

    ( $k:expr => $v:expr ) => {
        $crate::types::RNil.put($k.into(), $v.into())
    };

    ( $k:expr => $v:expr, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put($k.into(), $v.into())
    };
}