opensrv_clickhouse/
cmd.rs

1// Copyright 2021 Datafuse Labs.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::binary::Encoder;
16use crate::connection::Connection;
17use crate::error_codes::WRONG_PASSWORD;
18use crate::errors::Error;
19use crate::errors::Result;
20use crate::errors::ServerError;
21use crate::protocols::HelloResponse;
22use crate::protocols::Packet;
23use crate::protocols::Stage;
24use crate::protocols::SERVER_PONG;
25use crate::CHContext;
26
27pub struct Cmd {
28    packet: Packet,
29}
30
31impl Cmd {
32    pub fn create(packet: Packet) -> Self {
33        Self { packet }
34    }
35
36    pub async fn apply(self, connection: &mut Connection, ctx: &mut CHContext) -> Result<()> {
37        let mut encoder = Encoder::new();
38        match self.packet {
39            Packet::Ping => {
40                encoder.uvarint(SERVER_PONG);
41            }
42            // todo cancel
43            Packet::Cancel => {}
44            Packet::Hello(hello) => {
45                if !connection
46                    .session
47                    .authenticate(&hello.user, &hello.password, &connection.client_addr)
48                    .await
49                {
50                    let err = Error::Server(ServerError {
51                        code: WRONG_PASSWORD,
52                        name: "AuthenticateException".to_owned(),
53                        message: "Unknown user or wrong password".to_owned(),
54                        stack_trace: "".to_owned(),
55                    });
56                    connection.write_error(&err).await?;
57                    return Err(err);
58                }
59
60                let metadata = connection.session.metadata();
61                let (dbms_version_major, dbms_version_minor, dbms_version_patch) =
62                    metadata.version();
63                let response = HelloResponse {
64                    dbms_name: metadata.name().to_string(),
65                    dbms_version_major,
66                    dbms_version_minor,
67                    dbms_tcp_protocol_version: metadata.tcp_protocol_version(),
68                    timezone: metadata.timezone().to_string(),
69                    server_display_name: metadata.display_name().to_string(),
70                    dbms_version_patch,
71                };
72
73                ctx.client_revision = metadata.tcp_protocol_version.min(hello.client_revision);
74                ctx.hello = Some(hello);
75
76                response.encode(&mut encoder, ctx.client_revision)?;
77            }
78            Packet::Query(query) => {
79                ctx.state.query = query.query.clone();
80                ctx.state.compression = query.compression;
81
82                let session = connection.session.clone();
83                session.execute_query(ctx, connection).await?;
84
85                if ctx.state.out.is_some() {
86                    ctx.state.stage = Stage::InsertPrepare;
87                } else {
88                    connection.write_end_of_stream().await?;
89                }
90            }
91            Packet::Data(block) => {
92                if block.is_empty() {
93                    match ctx.state.stage {
94                        Stage::InsertPrepare => {
95                            ctx.state.stage = Stage::InsertStarted;
96                        }
97                        Stage::InsertStarted => {
98                            // reset will reset the out, so the outer stream will break
99                            ctx.state.reset();
100
101                            ctx.state.sent_all_data.notified().await;
102                            // wait stream finished
103                            connection.write_end_of_stream().await?;
104                            ctx.state.stage = Stage::Default;
105                        }
106                        _ => {}
107                    }
108                } else if let Some(out) = &ctx.state.out {
109                    // out.block_stream.
110                    out.send(block).await.unwrap();
111                }
112            }
113        };
114
115        let bytes = encoder.get_buffer();
116        if !bytes.is_empty() {
117            connection.write_bytes(bytes).await?;
118        }
119        Ok(())
120    }
121}