clickhouse_srv/
connection.rs

1use std::io::Cursor;
2use std::sync::Arc;
3
4use bytes::Buf;
5use bytes::BytesMut;
6use chrono_tz::Tz;
7use tokio::io::AsyncReadExt;
8use tokio::io::AsyncWriteExt;
9use tokio::io::BufWriter;
10use tokio::net::TcpStream;
11
12use crate::binary::Encoder;
13use crate::binary::Parser;
14use crate::errors::Error;
15use crate::errors::Result;
16use crate::protocols::ExceptionResponse;
17use crate::protocols::Packet;
18use crate::protocols::SERVER_END_OF_STREAM;
19use crate::types::Block;
20use crate::types::Progress;
21use crate::CHContext;
22use crate::ClickHouseSession;
23
/// Send and receive `Packet` values from a remote peer.
///
/// When implementing networking protocols, a message on that protocol is
/// often composed of several smaller messages known as frames. The purpose of
/// `Connection` is to read and write frames on the underlying `TcpStream`.
///
/// To read frames, the `Connection` uses an internal buffer, which is filled
/// up until there are enough bytes to create a full frame. Once this happens,
/// the `Connection` creates the frame and returns it to the caller.
///
/// When sending frames, the frame is first encoded into a write buffer.
/// The contents of the write buffer are then written to the socket and the
/// stream is flushed.
pub struct Connection {
    // The buffer for reading frames. `read_packet` appends bytes read from
    // the socket here, and `parse_packet` removes the bytes of every packet
    // it successfully parses.
    pub buffer: BytesMut,

    // The `TcpStream`. It is decorated with a `BufWriter`, which provides write
    // level buffering. The `BufWriter` implementation provided by Tokio is
    // sufficient for our needs.
    stream: BufWriter<TcpStream>,
    // Session handle supplied by the caller of `new`. It is only stored here;
    // none of the methods of `Connection` use it directly.
    pub session: Arc<dyn ClickHouseSession>,

    // Time zone parsed from the `timezone` string given to `new`; handed to
    // the `Parser` for every parse attempt.
    tz: Tz,
    // Forwarded to `ExceptionResponse::write` when an error is sent to the
    // peer. Initialized to `false` by `new`.
    with_stack_trace: bool,
    // Compression flag passed to the packet parser and to
    // `Block::send_server_data`. Starts as `true` and is updated from the
    // `compression` field of every `Query` packet that is parsed.
    compress: bool
}
50
51impl Connection {
52    /// Create a new `Connection`, backed by `socket`. Read and write buffers
53    /// are initialized.
54    pub fn new(
55        stream: TcpStream,
56        session: Arc<dyn ClickHouseSession>,
57        timezone: String
58    ) -> Result<Connection> {
59        let tz: Tz = timezone.parse()?;
60        Ok(Connection {
61            stream: BufWriter::new(stream),
62            buffer: BytesMut::with_capacity(4 * 1024),
63            session,
64            tz,
65            with_stack_trace: false,
66            compress: true
67        })
68    }
69
70    /// Read a single `Packet` value from the underlying stream.
71    ///
72    /// The function waits until it has retrieved enough data to parse a frame.
73    /// Any data remaining in the read buffer after the frame has been parsed is
74    /// kept there for the next call to `read_packet`.
75    ///
76    /// # Returns
77    ///
78    /// On success, the received frame is returned. If the `TcpStream`
79    /// is closed in a way that doesn't break a frame in half, it returns
80    /// `None`. Otherwise, an error is returned.
81    pub async fn read_packet(&mut self, ctx: &mut CHContext) -> crate::Result<Option<Packet>> {
82        loop {
83            // Attempt to parse a frame from the buffered data. If enough data
84            // has been buffered, the frame is returned.
85            if let Some(frame) = self.parse_packet(ctx)? {
86                return Ok(Some(frame));
87            }
88
89            // There is not enough buffered data to read a frame. Attempt to
90            // read more data from the socket.
91            //
92            // On success, the number of bytes is returned. `0` indicates "end
93            // of stream".
94            if 0 == self.stream.read_buf(&mut self.buffer).await? {
95                // The remote closed the connection. For this to be a clean
96                // shutdown, there should be no data in the read buffer. If
97                // there is, this means that the peer closed the socket while
98                // sending a frame.
99                if self.buffer.is_empty() {
100                    return Ok(None);
101                } else {
102                    return Err("connection reset by peer".into());
103                }
104            }
105        }
106    }
107
108    /// Tries to parse a frame from the buffer. If the buffer contains enough
109    /// data, the frame is returned and the data removed from the buffer. If not
110    /// enough data has been buffered yet, `Ok(None)` is returned. If the
111    /// buffered data does not represent a valid frame, `Err` is returned.
112    fn parse_packet(&mut self, ctx: &mut CHContext) -> crate::Result<Option<Packet>> {
113        // Cursor is used to track the "current" location in the
114        // buffer. Cursor also implements `Buf` from the `bytes` crate
115        // which provides a number of helpful utilities for working
116        // with bytes.
117        let mut buf = Cursor::new(&self.buffer[..]);
118        let mut parser = Parser::new(&mut buf, self.tz);
119
120        let hello = ctx.hello.clone();
121        let packet = parser.parse_packet(&hello, self.compress);
122
123        match packet {
124            Ok(packet) => {
125                match &packet {
126                    Packet::Query(ref query) => self.compress = query.compression > 0,
127                    _ => {}
128                }
129                // The `check` function will have advanced the cursor until the
130                // end of the frame. Since the cursor had position set to zero
131                // before `Packet::check` was called, we obtain the length of the
132                // frame by checking the cursor position.
133                let len = buf.position() as usize;
134                buf.set_position(0);
135                self.buffer.advance(len);
136                // Return the parsed frame to the caller.
137                Ok(Some(packet))
138            }
139            // There is not enough data present in the read buffer to parse a
140            // single frame. We must wait for more data to be received from the
141            // socket. Reading from the socket will be done in the statement
142            // after this `match`.
143            //
144            // We do not want to return `Err` from here as this "error" is an
145            // expected runtime condition.
146            Err(err) if err.is_would_block() => Ok(None),
147            // An error was encountered while parsing the frame. The connection
148            // is now in an invalid state. Returning `Err` from here will result
149            // in the connection being closed.
150            Err(e) => Err(e.into())
151        }
152    }
153
154    pub async fn write_block(&mut self, block: &Block) -> Result<()> {
155        let mut encoder = Encoder::new();
156        block.send_server_data(&mut encoder, self.compress);
157        self.stream.write_all(&encoder.get_buffer()).await?;
158        self.stream.flush().await?;
159        Ok(())
160    }
161
162    pub async fn write_progress(&mut self, progress: Progress, client_revision: u64) -> Result<()> {
163        let mut encoder = Encoder::new();
164        progress.write(&mut encoder, client_revision);
165        self.stream.write_all(&encoder.get_buffer()).await?;
166        self.stream.flush().await?;
167        Ok(())
168    }
169
170    pub async fn write_end_of_stream(&mut self) -> Result<()> {
171        let mut encoder = Encoder::new();
172        encoder.uvarint(SERVER_END_OF_STREAM);
173        self.write_bytes(encoder.get_buffer()).await?;
174        Ok(())
175    }
176
177    pub async fn write_error(&mut self, err: &Error) -> Result<()> {
178        let mut encoder = Encoder::new();
179        ExceptionResponse::write(&mut encoder, &err, self.with_stack_trace);
180
181        self.stream.write_all(&encoder.get_buffer()).await?;
182        self.stream.flush().await?;
183        Ok(())
184    }
185
186    pub async fn write_bytes(&mut self, bytes: Vec<u8>) -> Result<()> {
187        self.stream.write_all(&bytes).await?;
188        self.stream.flush().await?;
189        Ok(())
190    }
191}