1use log::debug;
2
3use crate::binary::Encoder;
4use crate::connection::Connection;
5use crate::errors::Result;
6use crate::protocols::HelloResponse;
7use crate::protocols::Packet;
8use crate::protocols::Stage;
9use crate::protocols::SERVER_PONG;
10use crate::CHContext;
11
12pub struct Cmd {
13 packet: Packet
14}
15
16impl Cmd {
17 pub fn create(packet: Packet) -> Self {
18 Self { packet }
19 }
20
21 pub async fn apply(self, connection: &mut Connection, ctx: &mut CHContext) -> Result<()> {
22 let mut encoder = Encoder::new();
23 match self.packet {
24 Packet::Ping => {
25 encoder.uvarint(SERVER_PONG);
26 }
27 Packet::Cancel => {}
29 Packet::Hello(hello) => {
30 let response = HelloResponse {
31 dbms_name: connection.session.dbms_name().to_string(),
32 dbms_version_major: connection.session.dbms_version_major(),
33 dbms_version_minor: connection.session.dbms_version_minor(),
34 dbms_tcp_protocol_version: connection.session.dbms_tcp_protocol_version(),
35 timezone: connection.session.timezone().to_string(),
36 server_display_name: connection.session.server_display_name().to_string(),
37 dbms_version_patch: connection.session.dbms_version_patch()
38 };
39
40 ctx.client_revision = connection
41 .session
42 .dbms_tcp_protocol_version()
43 .min(hello.client_revision);
44 ctx.hello = Some(hello.clone());
45
46 response.encode(&mut encoder, ctx.client_revision)?;
47 }
48 Packet::Query(query) => {
49 ctx.state.query = query.query.clone();
50 ctx.state.compression = query.compression;
51
52 let session = connection.session.clone();
53 session.execute_query(ctx, connection).await?;
54
55 if let Some(_) = &ctx.state.out {
56 ctx.state.stage = Stage::InsertPrepare;
57 } else {
58 connection.write_end_of_stream().await?;
59 }
60 }
61 Packet::Data(block) => {
62 if block.is_empty() {
63 match ctx.state.stage {
64 Stage::InsertPrepare => {
65 ctx.state.stage = Stage::InsertStarted;
66 }
67 Stage::InsertStarted => {
68 ctx.state.reset();
70
71 ctx.state.sent_all_data.notified().await;
72 connection.write_end_of_stream().await?;
74 ctx.state.stage = Stage::Default;
75 }
76 _ => {}
77 }
78 } else {
79 if let Some(out) = &ctx.state.out {
80 out.send(block).await.unwrap();
82 }
83 }
84 }
85 };
86
87 let bytes = encoder.get_buffer();
88 if !bytes.is_empty() {
89 connection.write_bytes(bytes).await?;
90 }
91 Ok(())
92 }
93}