opensrv_clickhouse/
cmd.rs1use crate::binary::Encoder;
16use crate::connection::Connection;
17use crate::error_codes::WRONG_PASSWORD;
18use crate::errors::Error;
19use crate::errors::Result;
20use crate::errors::ServerError;
21use crate::protocols::HelloResponse;
22use crate::protocols::Packet;
23use crate::protocols::Stage;
24use crate::protocols::SERVER_PONG;
25use crate::CHContext;
26
27pub struct Cmd {
28    packet: Packet,
29}
30
31impl Cmd {
32    pub fn create(packet: Packet) -> Self {
33        Self { packet }
34    }
35
36    pub async fn apply(self, connection: &mut Connection, ctx: &mut CHContext) -> Result<()> {
37        let mut encoder = Encoder::new();
38        match self.packet {
39            Packet::Ping => {
40                encoder.uvarint(SERVER_PONG);
41            }
42            Packet::Cancel => {}
44            Packet::Hello(hello) => {
45                if !connection
46                    .session
47                    .authenticate(&hello.user, &hello.password, &connection.client_addr)
48                    .await
49                {
50                    let err = Error::Server(ServerError {
51                        code: WRONG_PASSWORD,
52                        name: "AuthenticateException".to_owned(),
53                        message: "Unknown user or wrong password".to_owned(),
54                        stack_trace: "".to_owned(),
55                    });
56                    connection.write_error(&err).await?;
57                    return Err(err);
58                }
59
60                let metadata = connection.session.metadata();
61                let (dbms_version_major, dbms_version_minor, dbms_version_patch) =
62                    metadata.version();
63                let response = HelloResponse {
64                    dbms_name: metadata.name().to_string(),
65                    dbms_version_major,
66                    dbms_version_minor,
67                    dbms_tcp_protocol_version: metadata.tcp_protocol_version(),
68                    timezone: metadata.timezone().to_string(),
69                    server_display_name: metadata.display_name().to_string(),
70                    dbms_version_patch,
71                };
72
73                ctx.client_revision = metadata.tcp_protocol_version.min(hello.client_revision);
74                ctx.hello = Some(hello);
75
76                response.encode(&mut encoder, ctx.client_revision)?;
77            }
78            Packet::Query(query) => {
79                ctx.state.query = query.query.clone();
80                ctx.state.compression = query.compression;
81
82                let session = connection.session.clone();
83                session.execute_query(ctx, connection).await?;
84
85                if ctx.state.out.is_some() {
86                    ctx.state.stage = Stage::InsertPrepare;
87                } else {
88                    connection.write_end_of_stream().await?;
89                }
90            }
91            Packet::Data(block) => {
92                if block.is_empty() {
93                    match ctx.state.stage {
94                        Stage::InsertPrepare => {
95                            ctx.state.stage = Stage::InsertStarted;
96                        }
97                        Stage::InsertStarted => {
98                            ctx.state.reset();
100
101                            ctx.state.sent_all_data.notified().await;
102                            connection.write_end_of_stream().await?;
104                            ctx.state.stage = Stage::Default;
105                        }
106                        _ => {}
107                    }
108                } else if let Some(out) = &ctx.state.out {
109                    out.send(block).await.unwrap();
111                }
112            }
113        };
114
115        let bytes = encoder.get_buffer();
116        if !bytes.is_empty() {
117            connection.write_bytes(bytes).await?;
118        }
119        Ok(())
120    }
121}