opensrv_clickhouse/
cmd.rs1use crate::binary::Encoder;
16use crate::connection::Connection;
17use crate::error_codes::WRONG_PASSWORD;
18use crate::errors::Error;
19use crate::errors::Result;
20use crate::errors::ServerError;
21use crate::protocols::HelloResponse;
22use crate::protocols::Packet;
23use crate::protocols::Stage;
24use crate::protocols::SERVER_PONG;
25use crate::CHContext;
26
27pub struct Cmd {
28 packet: Packet,
29}
30
31impl Cmd {
32 pub fn create(packet: Packet) -> Self {
33 Self { packet }
34 }
35
36 pub async fn apply(self, connection: &mut Connection, ctx: &mut CHContext) -> Result<()> {
37 let mut encoder = Encoder::new();
38 match self.packet {
39 Packet::Ping => {
40 encoder.uvarint(SERVER_PONG);
41 }
42 Packet::Cancel => {}
44 Packet::Hello(hello) => {
45 if !connection
46 .session
47 .authenticate(&hello.user, &hello.password, &connection.client_addr)
48 .await
49 {
50 let err = Error::Server(ServerError {
51 code: WRONG_PASSWORD,
52 name: "AuthenticateException".to_owned(),
53 message: "Unknown user or wrong password".to_owned(),
54 stack_trace: "".to_owned(),
55 });
56 connection.write_error(&err).await?;
57 return Err(err);
58 }
59
60 let metadata = connection.session.metadata();
61 let (dbms_version_major, dbms_version_minor, dbms_version_patch) =
62 metadata.version();
63 let response = HelloResponse {
64 dbms_name: metadata.name().to_string(),
65 dbms_version_major,
66 dbms_version_minor,
67 dbms_tcp_protocol_version: metadata.tcp_protocol_version(),
68 timezone: metadata.timezone().to_string(),
69 server_display_name: metadata.display_name().to_string(),
70 dbms_version_patch,
71 };
72
73 ctx.client_revision = metadata.tcp_protocol_version.min(hello.client_revision);
74 ctx.hello = Some(hello);
75
76 response.encode(&mut encoder, ctx.client_revision)?;
77 }
78 Packet::Query(query) => {
79 ctx.state.query = query.query.clone();
80 ctx.state.compression = query.compression;
81
82 let session = connection.session.clone();
83 session.execute_query(ctx, connection).await?;
84
85 if ctx.state.out.is_some() {
86 ctx.state.stage = Stage::InsertPrepare;
87 } else {
88 connection.write_end_of_stream().await?;
89 }
90 }
91 Packet::Data(block) => {
92 if block.is_empty() {
93 match ctx.state.stage {
94 Stage::InsertPrepare => {
95 ctx.state.stage = Stage::InsertStarted;
96 }
97 Stage::InsertStarted => {
98 ctx.state.reset();
100
101 ctx.state.sent_all_data.notified().await;
102 connection.write_end_of_stream().await?;
104 ctx.state.stage = Stage::Default;
105 }
106 _ => {}
107 }
108 } else if let Some(out) = &ctx.state.out {
109 out.send(block).await.unwrap();
111 }
112 }
113 };
114
115 let bytes = encoder.get_buffer();
116 if !bytes.is_empty() {
117 connection.write_bytes(bytes).await?;
118 }
119 Ok(())
120 }
121}