clickhouse_srv/
lib.rs

1use std::sync::Arc;
2
3use errors::Result;
4use log::debug;
5use protocols::Stage;
6use tokio::net::TcpStream;
7use tokio::sync::mpsc::Sender;
8use tokio::sync::Notify;
9
10use crate::cmd::Cmd;
11use crate::connection::Connection;
12use crate::protocols::HelloRequest;
13use crate::types::Block;
14use crate::types::Progress;
15
16mod binary;
17pub mod cmd;
18pub mod connection;
19pub mod error_codes;
20pub mod errors;
21pub mod protocols;
22pub mod types;
23
24#[async_trait::async_trait]
25pub trait ClickHouseSession: Send + Sync {
26    async fn execute_query(&self, ctx: &mut CHContext, connection: &mut Connection) -> Result<()>;
27
28    fn with_stack_trace(&self) -> bool {
29        false
30    }
31
32    fn dbms_name(&self) -> &str {
33        "clickhouse-server"
34    }
35
36    // None is by default, which will use same version as client send
37    fn dbms_version_major(&self) -> u64 {
38        19
39    }
40
41    fn dbms_version_minor(&self) -> u64 {
42        17
43    }
44
45    fn dbms_tcp_protocol_version(&self) -> u64 {
46        54428
47    }
48
49    fn timezone(&self) -> &str {
50        "UTC"
51    }
52
53    fn server_display_name(&self) -> &str {
54        "clickhouse-server"
55    }
56
57    fn dbms_version_patch(&self) -> u64 {
58        1
59    }
60
61    fn get_progress(&self) -> Progress {
62        Progress::default()
63    }
64}
65
66#[derive(Default)]
67pub struct QueryState {
68    pub query_id: String,
69    pub stage: Stage,
70    pub compression: u64,
71    pub query: String,
72    pub is_cancelled: bool,
73    pub is_connection_closed: bool,
74    /// empty or not
75    pub is_empty: bool,
76
77    /// Data was sent.
78    pub sent_all_data: Arc<Notify>,
79    pub out: Option<Sender<Block>>
80}
81
82impl QueryState {
83    fn reset(&mut self) {
84        self.stage = Stage::Default;
85        self.is_cancelled = false;
86        self.is_connection_closed = false;
87        self.is_empty = false;
88        self.out = None;
89    }
90}
91
92pub struct CHContext {
93    pub state: QueryState,
94
95    pub client_revision: u64,
96    pub hello: Option<HelloRequest>
97}
98
99impl CHContext {
100    pub fn new(state: QueryState) -> Self {
101        Self {
102            state,
103            client_revision: 0,
104            hello: None
105        }
106    }
107}
108
109/// A server that speaks the ClickHouseprotocol, and can delegate client commands to a backend
110/// that implements [`ClickHouseSession`]
111pub struct ClickHouseServer {}
112
113impl ClickHouseServer {
114    pub async fn run_on_stream(
115        session: Arc<dyn ClickHouseSession>,
116        stream: TcpStream
117    ) -> Result<()> {
118        ClickHouseServer::run_on(session, stream.into()).await
119    }
120}
121
122impl ClickHouseServer {
123    async fn run_on(session: Arc<dyn ClickHouseSession>, stream: TcpStream) -> Result<()> {
124        let mut srv = ClickHouseServer {};
125        srv.run(session, stream).await?;
126        Ok(())
127    }
128
129    async fn run(&mut self, session: Arc<dyn ClickHouseSession>, stream: TcpStream) -> Result<()> {
130        debug!("Handle New session");
131        let tz = session.timezone().to_string();
132        let mut ctx = CHContext::new(QueryState::default());
133        let mut connection = Connection::new(stream, session, tz)?;
134
135        loop {
136            // signal.
137            let maybe_packet = tokio::select! {
138               res = connection.read_packet(&mut ctx) => res,
139            };
140
141            let packet = match maybe_packet {
142                Ok(Some(packet)) => packet,
143                Err(e) => {
144                    ctx.state.reset();
145                    connection.write_error(&e).await?;
146                    return Err(e);
147                }
148                Ok(None) => {
149                    debug!("{:?}", "none data reset");
150                    ctx.state.reset();
151                    return Ok(());
152                }
153            };
154            let cmd = Cmd::create(packet);
155            cmd.apply(&mut connection, &mut ctx).await?;
156        }
157    }
158}
159
160#[macro_export]
161macro_rules! row {
162    () => { $crate::types::RNil };
163    ( $i:ident, $($tail:tt)* ) => {
164        row!( $($tail)* ).put(stringify!($i).into(), $i.into())
165    };
166    ( $i:ident ) => { row!($i: $i) };
167
168    ( $k:ident: $v:expr ) => {
169        $crate::types::RNil.put(stringify!($k).into(), $v.into())
170    };
171
172    ( $k:ident: $v:expr, $($tail:tt)* ) => {
173        row!( $($tail)* ).put(stringify!($k).into(), $v.into())
174    };
175
176    ( $k:expr => $v:expr ) => {
177        $crate::types::RNil.put($k.into(), $v.into())
178    };
179
180    ( $k:expr => $v:expr, $($tail:tt)* ) => {
181        row!( $($tail)* ).put($k.into(), $v.into())
182    };
183}
184
185#[cfg(test)]
186mod tests {
187    #[test]
188    fn it_works() {
189        assert_eq!(2 + 2, 4);
190    }
191}