clickhouse_srv/
cmd.rs

1use log::debug;
2
3use crate::binary::Encoder;
4use crate::connection::Connection;
5use crate::errors::Result;
6use crate::protocols::HelloResponse;
7use crate::protocols::Packet;
8use crate::protocols::Stage;
9use crate::protocols::SERVER_PONG;
10use crate::CHContext;
11
12pub struct Cmd {
13    packet: Packet
14}
15
16impl Cmd {
17    pub fn create(packet: Packet) -> Self {
18        Self { packet }
19    }
20
21    pub async fn apply(self, connection: &mut Connection, ctx: &mut CHContext) -> Result<()> {
22        let mut encoder = Encoder::new();
23        match self.packet {
24            Packet::Ping => {
25                encoder.uvarint(SERVER_PONG);
26            }
27            // todo cancel
28            Packet::Cancel => {}
29            Packet::Hello(hello) => {
30                let response = HelloResponse {
31                    dbms_name: connection.session.dbms_name().to_string(),
32                    dbms_version_major: connection.session.dbms_version_major(),
33                    dbms_version_minor: connection.session.dbms_version_minor(),
34                    dbms_tcp_protocol_version: connection.session.dbms_tcp_protocol_version(),
35                    timezone: connection.session.timezone().to_string(),
36                    server_display_name: connection.session.server_display_name().to_string(),
37                    dbms_version_patch: connection.session.dbms_version_patch()
38                };
39
40                ctx.client_revision = connection
41                    .session
42                    .dbms_tcp_protocol_version()
43                    .min(hello.client_revision);
44                ctx.hello = Some(hello.clone());
45
46                response.encode(&mut encoder, ctx.client_revision)?;
47            }
48            Packet::Query(query) => {
49                ctx.state.query = query.query.clone();
50                ctx.state.compression = query.compression;
51
52                let session = connection.session.clone();
53                session.execute_query(ctx, connection).await?;
54
55                if let Some(_) = &ctx.state.out {
56                    ctx.state.stage = Stage::InsertPrepare;
57                } else {
58                    connection.write_end_of_stream().await?;
59                }
60            }
61            Packet::Data(block) => {
62                if block.is_empty() {
63                    match ctx.state.stage {
64                        Stage::InsertPrepare => {
65                            ctx.state.stage = Stage::InsertStarted;
66                        }
67                        Stage::InsertStarted => {
68                            // reset will reset the out, so the outer stream will break
69                            ctx.state.reset();
70
71                            ctx.state.sent_all_data.notified().await;
72                            // wait stream finished
73                            connection.write_end_of_stream().await?;
74                            ctx.state.stage = Stage::Default;
75                        }
76                        _ => {}
77                    }
78                } else {
79                    if let Some(out) = &ctx.state.out {
80                        // out.block_stream.
81                        out.send(block).await.unwrap();
82                    }
83                }
84            }
85        };
86
87        let bytes = encoder.get_buffer();
88        if !bytes.is_empty() {
89            connection.write_bytes(bytes).await?;
90        }
91        Ok(())
92    }
93}