#![allow(clippy::ptr_arg)]
use std::sync::Arc;
use errors::Result;
use protocols::Stage;
use tokio::net::TcpStream;
use tokio::sync::mpsc::Sender;
use tokio::sync::Notify;
use crate::cmd::Cmd;
use crate::connection::Connection;
use crate::protocols::HelloRequest;
use crate::types::Block;
use crate::types::Progress;
pub mod binary;
pub mod cmd;
pub mod connection;
pub mod error_codes;
pub mod errors;
pub mod protocols;
pub mod types;
/// Descriptive metadata about the server: its names, version numbers, TCP
/// protocol revision, timezone and whether stack traces are enabled.
///
/// All fields are private. Read them through the accessor methods and change
/// them with the consuming `with_*` builder methods; the `Default`
/// implementation supplies a complete baseline value.
#[derive(Debug, Clone)]
pub struct ClickHouseMetadata {
/// Server name returned by `name()`; defaults to `"clickhouse-server"`.
name: String,
/// Display name returned by `display_name()`; defaults to `"clickhouse-server"`.
display_name: String,
/// First component of the tuple returned by `version()`; defaults to 19.
major_version: u64,
/// Second component of the tuple returned by `version()`; defaults to 17.
minor_version: u64,
/// Third component of the tuple returned by `version()`; defaults to 1.
patch_version: u64,
/// TCP protocol revision returned by `tcp_protocol_version()`; defaults to 54428.
tcp_protocol_version: u64,
/// Timezone name returned by `timezone()`; defaults to `"UTC"`.
timezone: String,
/// Stack-trace flag returned by `has_stack_trace()`; defaults to `false`.
has_stack_trace: bool,
}
impl Default for ClickHouseMetadata {
fn default() -> Self {
Self {
name: "clickhouse-server".to_string(),
display_name: "clickhouse-server".to_string(),
major_version: 19,
minor_version: 17,
patch_version: 1,
tcp_protocol_version: 54428,
timezone: "UTC".to_string(),
has_stack_trace: false,
}
}
}
/// Accessors and consuming builder methods.
///
/// Every `with_*` method takes the value by move, replaces exactly one field
/// and hands the value back, so calls can be chained.
impl ClickHouseMetadata {
    /// Returns the server name.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Replaces the server name with a copy of `name`.
    pub fn with_name(self, name: &str) -> Self {
        Self {
            name: name.to_owned(),
            ..self
        }
    }

    /// Returns the display name.
    pub fn display_name(&self) -> &str {
        self.display_name.as_str()
    }

    /// Replaces the display name with a copy of `name`.
    pub fn with_display_name(self, name: &str) -> Self {
        Self {
            display_name: name.to_owned(),
            ..self
        }
    }

    /// Returns the version as a `(major, minor, patch)` tuple.
    pub fn version(&self) -> (u64, u64, u64) {
        let Self {
            major_version,
            minor_version,
            patch_version,
            ..
        } = self;
        (*major_version, *minor_version, *patch_version)
    }

    /// Replaces the major version number.
    pub fn with_major_version(self, v: u64) -> Self {
        Self {
            major_version: v,
            ..self
        }
    }

    /// Replaces the minor version number.
    pub fn with_minor_version(self, v: u64) -> Self {
        Self {
            minor_version: v,
            ..self
        }
    }

    /// Replaces the patch version number.
    pub fn with_patch_version(self, v: u64) -> Self {
        Self {
            patch_version: v,
            ..self
        }
    }

    /// Returns the TCP protocol revision.
    pub fn tcp_protocol_version(&self) -> u64 {
        self.tcp_protocol_version
    }

    /// Replaces the TCP protocol revision.
    pub fn with_tcp_protocol_version(self, v: u64) -> Self {
        Self {
            tcp_protocol_version: v,
            ..self
        }
    }

    /// Returns the timezone name.
    pub fn timezone(&self) -> &str {
        self.timezone.as_str()
    }

    /// Replaces the timezone name with a copy of `v`.
    pub fn with_timezone(self, v: &str) -> Self {
        Self {
            timezone: v.to_owned(),
            ..self
        }
    }

    /// Returns whether stack traces are enabled.
    pub fn has_stack_trace(&self) -> bool {
        self.has_stack_trace
    }

    /// Turns the stack-trace flag on; there is no builder to turn it off again.
    pub fn with_enable_stack_trace(self) -> Self {
        Self {
            has_stack_trace: true,
            ..self
        }
    }
}
/// Callbacks an embedding application implements to serve ClickHouse clients.
///
/// Implementors must supply [`execute_query`](ClickHouseSession::execute_query)
/// and [`metadata`](ClickHouseSession::metadata). Every other method has a
/// default; the deprecated ones simply forward to the value returned by
/// `metadata()`.
#[async_trait::async_trait]
pub trait ClickHouseSession: Send + Sync {
    /// Decides whether a client is allowed in.
    ///
    /// The default implementation ignores all three arguments and accepts.
    async fn authenticate(&self, _username: &str, _password: &[u8], _client_addr: &str) -> bool {
        true
    }

    /// Handles a query using the given context and connection.
    ///
    /// NOTE(review): presumably the query to run is the one stored in
    /// `ctx.state` and results are written through `connection` — confirm
    /// against the command handling code.
    async fn execute_query(&self, ctx: &mut CHContext, connection: &mut Connection) -> Result<()>;

    /// Returns a progress report; the default is `Progress::default()`.
    fn get_progress(&self) -> Progress {
        Default::default()
    }

    /// Returns the server metadata that backs all the deprecated accessors below.
    fn metadata(&self) -> &ClickHouseMetadata;

    /// Forwards to [`ClickHouseMetadata::has_stack_trace`].
    #[deprecated(note = "use ClickHouseMetadata::has_stack_trace() instead")]
    fn with_stack_trace(&self) -> bool {
        let meta = self.metadata();
        meta.has_stack_trace()
    }

    /// Forwards to [`ClickHouseMetadata::name`].
    #[deprecated(note = "use ClickHouseMetadata::name() instead")]
    fn dbms_name(&self) -> &str {
        let meta = self.metadata();
        meta.name()
    }

    /// Forwards to [`ClickHouseMetadata::display_name`].
    #[deprecated(note = "use ClickHouseMetadata::display_name() instead")]
    fn server_display_name(&self) -> &str {
        let meta = self.metadata();
        meta.display_name()
    }

    /// Major component of [`ClickHouseMetadata::version`].
    #[deprecated(note = "use ClickHouseMetadata::version() instead")]
    fn dbms_version_major(&self) -> u64 {
        let (major, _, _) = self.metadata().version();
        major
    }

    /// Minor component of [`ClickHouseMetadata::version`].
    #[deprecated(note = "use ClickHouseMetadata::version() instead")]
    fn dbms_version_minor(&self) -> u64 {
        let (_, minor, _) = self.metadata().version();
        minor
    }

    /// Patch component of [`ClickHouseMetadata::version`].
    #[deprecated(note = "use ClickHouseMetadata::version() instead")]
    fn dbms_version_patch(&self) -> u64 {
        let (_, _, patch) = self.metadata().version();
        patch
    }

    /// Forwards to [`ClickHouseMetadata::tcp_protocol_version`].
    #[deprecated(note = "use ClickHouseMetadata::tcp_protocol_version() instead")]
    fn dbms_tcp_protocol_version(&self) -> u64 {
        let meta = self.metadata();
        meta.tcp_protocol_version()
    }

    /// Forwards to [`ClickHouseMetadata::timezone`].
    #[deprecated(note = "use ClickHouseMetadata::timezone() instead")]
    fn timezone(&self) -> &str {
        let meta = self.metadata();
        meta.timezone()
    }
}
/// Mutable state tracked for the query currently handled on a connection.
///
/// `reset()` returns `stage`, the three status flags and `out` to their idle
/// values; `query_id`, `compression`, `query` and `sent_all_data` are left
/// untouched by it.
#[derive(Default)]
pub struct QueryState {
/// Identifier of the query. NOTE(review): presumably supplied by the client — confirm.
pub query_id: String,
/// Processing stage; `reset()` sets it back to `Stage::Default`.
pub stage: Stage,
/// Compression setting. NOTE(review): exact encoding of this number is not visible here — confirm against the protocol code.
pub compression: u64,
/// Text of the query.
pub query: String,
/// Cancellation flag; cleared by `reset()`.
pub is_cancelled: bool,
/// Connection-closed flag; cleared by `reset()`.
pub is_connection_closed: bool,
/// Emptiness flag; cleared by `reset()`. NOTE(review): what exactly is "empty" is not visible here — confirm.
pub is_empty: bool,
/// Shared notifier. NOTE(review): the name suggests it signals that all data has been sent — confirm with its users.
pub sent_all_data: Arc<Notify>,
/// Optional channel sender for `Block` values; set to `None` by `reset()`.
pub out: Option<Sender<Block>>,
}
impl QueryState {
    /// Puts the per-query bookkeeping back into its idle state.
    ///
    /// Only `stage`, the three status flags and the `out` sender are touched.
    /// `query_id`, `compression`, `query` and `sent_all_data` keep whatever
    /// values they had.
    fn reset(&mut self) {
        let Self {
            stage,
            is_cancelled,
            is_connection_closed,
            is_empty,
            out,
            ..
        } = self;

        *stage = Stage::Default;
        *is_cancelled = false;
        *is_connection_closed = false;
        *is_empty = false;
        // Assigning `None` drops the sender, if there was one.
        *out = None;
    }
}
/// Context created once per served connection and passed by mutable reference
/// to packet reading, command handling and `ClickHouseSession::execute_query`.
pub struct CHContext {
/// State of the query currently being handled.
pub state: QueryState,
/// Client revision number; `CHContext::new` starts it at 0.
/// NOTE(review): presumably filled in from the client's hello — confirm.
pub client_revision: u64,
/// Hello request, `None` in a freshly created context.
pub hello: Option<HelloRequest>,
}
impl CHContext {
pub fn new(state: QueryState) -> Self {
Self {
state,
client_revision: 0,
hello: None,
}
}
}
/// Field-less server type whose associated function `run_on_stream` serves a
/// single `TcpStream` on behalf of a `ClickHouseSession`.
pub struct ClickHouseServer {}
impl ClickHouseServer {
    /// Serves one client connection on `stream`, delegating to `session`.
    ///
    /// This is the public entry point; it forwards straight to the private
    /// `run_on` and returns whatever that returns.
    pub async fn run_on_stream(
        session: Arc<dyn ClickHouseSession>,
        stream: TcpStream,
    ) -> Result<()> {
        Self::run_on(session, stream).await
    }
}
impl ClickHouseServer {
    /// Creates a server value and runs the packet loop on `stream`.
    async fn run_on(session: Arc<dyn ClickHouseSession>, stream: TcpStream) -> Result<()> {
        let mut srv = ClickHouseServer {};
        // The result already has the right type; no need to unwrap and re-wrap it.
        srv.run(session, stream).await
    }

    /// Reads packets from `stream` and applies them until reading stops.
    ///
    /// A `Connection` is built from the stream, the session and the session's
    /// timezone, then packets are processed one at a time:
    ///
    /// * `Ok(Some(packet))` — the packet is turned into a `Cmd` and applied;
    ///   an error from `apply` ends the loop and is returned.
    /// * `Ok(None)` — the query state is reset and `Ok(())` is returned.
    /// * `Err(e)` — the query state is reset, the error is written to the
    ///   connection, and `e` is returned. If writing the error fails, that
    ///   write failure is returned instead.
    async fn run(&mut self, session: Arc<dyn ClickHouseSession>, stream: TcpStream) -> Result<()> {
        tracing::debug!("Handle New session");

        // Copy the timezone out before `session` is moved into the connection.
        let tz = session.metadata().timezone().to_string();
        let mut ctx = CHContext::new(QueryState::default());
        let mut connection = Connection::new(stream, session, tz)?;

        loop {
            // A single future is awaited directly; a one-armed `select!` adds nothing.
            let packet = match connection.read_packet(&mut ctx).await {
                Ok(Some(packet)) => packet,
                Ok(None) => {
                    tracing::debug!("{:?}", "none data reset");
                    ctx.state.reset();
                    return Ok(());
                }
                Err(e) => {
                    ctx.state.reset();
                    connection.write_error(&e).await?;
                    return Err(e);
                }
            };

            let cmd = Cmd::create(packet);
            cmd.apply(&mut connection, &mut ctx).await?;
        }
    }
}
/// Builds a row by chaining `.put(key, value)` calls onto `$crate::types::RNil`.
///
/// Accepted entry forms, separated by commas (a trailing comma is allowed):
///
/// * `name` — shorthand for `name: name`; the key is the identifier's text.
/// * `name: value` — the key is the identifier's text.
/// * `key => value` — the key is an arbitrary expression.
///
/// An empty invocation yields `RNil` itself. Keys and values are passed
/// through `.into()`. The macro recurses on the tail first, so the last entry
/// is `put` first and the first entry is `put` last.
///
/// Recursive calls go through `$crate::row!` so the macro also works when a
/// caller invokes it by path without importing `row` into scope.
#[macro_export]
macro_rules! row {
    () => { $crate::types::RNil };
    ( $i:ident, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put(stringify!($i).into(), $i.into())
    };
    ( $i:ident ) => { $crate::row!($i: $i) };

    ( $k:ident: $v:expr ) => {
        $crate::types::RNil.put(stringify!($k).into(), $v.into())
    };
    ( $k:ident: $v:expr, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put(stringify!($k).into(), $v.into())
    };

    ( $k:expr => $v:expr ) => {
        $crate::types::RNil.put($k.into(), $v.into())
    };
    ( $k:expr => $v:expr, $($tail:tt)* ) => {
        $crate::row!( $($tail)* ).put($k.into(), $v.into())
    };
}