1use std::sync::Arc;
2
3use errors::Result;
4use log::debug;
5use protocols::Stage;
6use tokio::net::TcpStream;
7use tokio::sync::mpsc::Sender;
8use tokio::sync::Notify;
9
10use crate::cmd::Cmd;
11use crate::connection::Connection;
12use crate::protocols::HelloRequest;
13use crate::types::Block;
14use crate::types::Progress;
15
16mod binary;
17pub mod cmd;
18pub mod connection;
19pub mod error_codes;
20pub mod errors;
21pub mod protocols;
22pub mod types;
23
24#[async_trait::async_trait]
25pub trait ClickHouseSession: Send + Sync {
26 async fn execute_query(&self, ctx: &mut CHContext, connection: &mut Connection) -> Result<()>;
27
28 fn with_stack_trace(&self) -> bool {
29 false
30 }
31
32 fn dbms_name(&self) -> &str {
33 "clickhouse-server"
34 }
35
36 fn dbms_version_major(&self) -> u64 {
38 19
39 }
40
41 fn dbms_version_minor(&self) -> u64 {
42 17
43 }
44
45 fn dbms_tcp_protocol_version(&self) -> u64 {
46 54428
47 }
48
49 fn timezone(&self) -> &str {
50 "UTC"
51 }
52
53 fn server_display_name(&self) -> &str {
54 "clickhouse-server"
55 }
56
57 fn dbms_version_patch(&self) -> u64 {
58 1
59 }
60
61 fn get_progress(&self) -> Progress {
62 Progress::default()
63 }
64}
65
66#[derive(Default)]
67pub struct QueryState {
68 pub query_id: String,
69 pub stage: Stage,
70 pub compression: u64,
71 pub query: String,
72 pub is_cancelled: bool,
73 pub is_connection_closed: bool,
74 pub is_empty: bool,
76
77 pub sent_all_data: Arc<Notify>,
79 pub out: Option<Sender<Block>>
80}
81
82impl QueryState {
83 fn reset(&mut self) {
84 self.stage = Stage::Default;
85 self.is_cancelled = false;
86 self.is_connection_closed = false;
87 self.is_empty = false;
88 self.out = None;
89 }
90}
91
92pub struct CHContext {
93 pub state: QueryState,
94
95 pub client_revision: u64,
96 pub hello: Option<HelloRequest>
97}
98
99impl CHContext {
100 pub fn new(state: QueryState) -> Self {
101 Self {
102 state,
103 client_revision: 0,
104 hello: None
105 }
106 }
107}
108
/// Field-less entry point of the server; connections are served through
/// `ClickHouseServer::run_on_stream`.
pub struct ClickHouseServer {}
112
113impl ClickHouseServer {
114 pub async fn run_on_stream(
115 session: Arc<dyn ClickHouseSession>,
116 stream: TcpStream
117 ) -> Result<()> {
118 ClickHouseServer::run_on(session, stream.into()).await
119 }
120}
121
122impl ClickHouseServer {
123 async fn run_on(session: Arc<dyn ClickHouseSession>, stream: TcpStream) -> Result<()> {
124 let mut srv = ClickHouseServer {};
125 srv.run(session, stream).await?;
126 Ok(())
127 }
128
129 async fn run(&mut self, session: Arc<dyn ClickHouseSession>, stream: TcpStream) -> Result<()> {
130 debug!("Handle New session");
131 let tz = session.timezone().to_string();
132 let mut ctx = CHContext::new(QueryState::default());
133 let mut connection = Connection::new(stream, session, tz)?;
134
135 loop {
136 let maybe_packet = tokio::select! {
138 res = connection.read_packet(&mut ctx) => res,
139 };
140
141 let packet = match maybe_packet {
142 Ok(Some(packet)) => packet,
143 Err(e) => {
144 ctx.state.reset();
145 connection.write_error(&e).await?;
146 return Err(e);
147 }
148 Ok(None) => {
149 debug!("{:?}", "none data reset");
150 ctx.state.reset();
151 return Ok(());
152 }
153 };
154 let cmd = Cmd::create(packet);
155 cmd.apply(&mut connection, &mut ctx).await?;
156 }
157 }
158}
159
/// Builds a row by chaining `put(key, value)` calls onto `$crate::types::RNil`.
///
/// Supported forms (a trailing comma after the last entry is accepted):
///
/// * `row!()` expands to `RNil`.
/// * `row!(a, b)` uses each identifier's name as the key and its value as
///   the value.
/// * `row!(a: expr, b: expr)` uses the identifier's name as the key.
/// * `row!(key_expr => value_expr, ...)` uses an arbitrary expression as the key.
///
/// Keys and values are passed through `.into()`. The entries that come later
/// in the invocation are `put` first, because every step expands to
/// `row!(rest).put(first_key, first_value)`.
///
/// The recursive invocations go through `$crate::row!` so that the macro also
/// works when a downstream crate calls it by path without importing it.
#[macro_export]
macro_rules! row {
    () => { $crate::types::RNil };

    // Shorthand: the identifier is both the key name and the value.
    ( $i:ident, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put(stringify!($i).into(), $i.into())
    };
    ( $i:ident ) => { $crate::row!($i: $i) };

    // `name: value` pairs.
    ( $k:ident: $v:expr ) => {
        $crate::types::RNil.put(stringify!($k).into(), $v.into())
    };

    ( $k:ident: $v:expr, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put(stringify!($k).into(), $v.into())
    };

    // `key_expr => value` pairs.
    ( $k:expr => $v:expr ) => {
        $crate::types::RNil.put($k.into(), $v.into())
    };

    ( $k:expr => $v:expr, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put($k.into(), $v.into())
    };
}
184
#[cfg(test)]
mod tests {
    /// Placeholder check that only proves the test harness runs.
    #[test]
    fn it_works() {
        let sum = 2 + 2;
        assert_eq!(sum, 4);
    }
}