opensrv_clickhouse/
connection.rs

1// Copyright 2021 Datafuse Labs.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::Cursor;
16use std::sync::Arc;
17
18use bytes::Buf;
19use bytes::BytesMut;
20use chrono_tz::Tz;
21use tokio::io::AsyncReadExt;
22use tokio::io::AsyncWriteExt;
23use tokio::io::BufWriter;
24use tokio::net::TcpStream;
25
26use crate::binary::Encoder;
27use crate::binary::Parser;
28use crate::errors::Error;
29use crate::errors::Result;
30use crate::protocols::ExceptionResponse;
31use crate::protocols::Packet;
32use crate::protocols::SERVER_END_OF_STREAM;
33use crate::types::Block;
34use crate::types::Progress;
35use crate::CHContext;
36use crate::ClickHouseSession;
37
38/// Send and receive `Packet` values from a remote peer.
39///
40/// When implementing networking protocols, a message on that protocol is
41/// often composed of several smaller messages known as frames. The purpose of
42/// `Connection` is to read and write frames on the underlying `TcpStream`.
43///
44/// To read frames, the `Connection` uses an internal buffer, which is filled
45/// up until there are enough bytes to create a full frame. Once this happens,
46/// the `Connection` creates the frame and returns it to the caller.
47///
48/// When sending frames, the frame is first encoded into the write buffer.
49/// The contents of the write buffer are then written to the socket.
50pub struct Connection {
51    // The `TcpStream`. It is decorated with a `BufWriter`, which provides write
52    // level buffering. The `BufWriter` implementation provided by Tokio is
53    // sufficient for our needs.
54    pub buffer: BytesMut,
55
56    stream: BufWriter<TcpStream>,
57    pub session: Arc<dyn ClickHouseSession>,
58
59    // The buffer for reading frames.
60    tz: Tz,
61    with_stack_trace: bool,
62    compress: bool,
63
64    pub client_addr: String,
65}
66
67impl Connection {
68    /// Create a new `Connection`, backed by `socket`. Read and write buffers
69    /// are initialized.
70    pub fn new(
71        stream: TcpStream,
72        session: Arc<dyn ClickHouseSession>,
73        timezone: String,
74    ) -> Result<Connection> {
75        let tz: Tz = timezone.parse()?;
76        let client_addr = stream.peer_addr()?.to_string();
77        Ok(Connection {
78            stream: BufWriter::new(stream),
79            buffer: BytesMut::with_capacity(4 * 1024),
80            session,
81            tz,
82            with_stack_trace: false,
83            compress: true,
84            client_addr,
85        })
86    }
87
88    /// Read a single `Packet` value from the underlying stream.
89    ///
90    /// The function waits until it has retrieved enough data to parse a frame.
91    /// Any data remaining in the read buffer after the frame has been parsed is
92    /// kept there for the next call to `read_packet`.
93    ///
94    /// # Returns
95    ///
96    /// On success, the received frame is returned. If the `TcpStream`
97    /// is closed in a way that doesn't break a frame in half, it returns
98    /// `None`. Otherwise, an error is returned.
99    pub async fn read_packet(&mut self, ctx: &mut CHContext) -> crate::Result<Option<Packet>> {
100        loop {
101            // Attempt to parse a frame from the buffered data. If enough data
102            // has been buffered, the frame is returned.
103            if let Some(frame) = self.parse_packet(ctx)? {
104                return Ok(Some(frame));
105            }
106
107            // There is not enough buffered data to read a frame. Attempt to
108            // read more data from the socket.
109            //
110            // On success, the number of bytes is returned. `0` indicates "end
111            // of stream".
112            if 0 == self.stream.read_buf(&mut self.buffer).await? {
113                // The remote closed the connection. For this to be a clean
114                // shutdown, there should be no data in the read buffer. If
115                // there is, this means that the peer closed the socket while
116                // sending a frame.
117                if self.buffer.is_empty() {
118                    return Ok(None);
119                } else {
120                    return Err("connection reset by peer".into());
121                }
122            }
123        }
124    }
125
126    /// Tries to parse a frame from the buffer. If the buffer contains enough
127    /// data, the frame is returned and the data removed from the buffer. If not
128    /// enough data has been buffered yet, `Ok(None)` is returned. If the
129    /// buffered data does not represent a valid frame, `Err` is returned.
130    fn parse_packet(&mut self, ctx: &mut CHContext) -> crate::Result<Option<Packet>> {
131        // Cursor is used to track the "current" location in the
132        // buffer. Cursor also implements `Buf` from the `bytes` crate
133        // which provides a number of helpful utilities for working
134        // with bytes.
135        let mut buf = Cursor::new(&self.buffer[..]);
136        let mut parser = Parser::new(&mut buf, self.tz);
137
138        let hello = ctx.hello.clone();
139        let packet = parser.parse_packet(&hello, self.compress);
140
141        match packet {
142            Ok(packet) => {
143                if let Packet::Query(ref query) = &packet {
144                    self.compress = query.compression > 0
145                }
146                // The `check` function will have advanced the cursor until the
147                // end of the frame. Since the cursor had position set to zero
148                // before `Packet::check` was called, we obtain the length of the
149                // frame by checking the cursor position.
150                let len = buf.position() as usize;
151                buf.set_position(0);
152                self.buffer.advance(len);
153                // Return the parsed frame to the caller.
154                Ok(Some(packet))
155            }
156            // There is not enough data present in the read buffer to parse a
157            // single frame. We must wait for more data to be received from the
158            // socket. Reading from the socket will be done in the statement
159            // after this `match`.
160            //
161            // We do not want to return `Err` from here as this "error" is an
162            // expected runtime condition.
163            Err(err) if err.is_would_block() => Ok(None),
164            // An error was encountered while parsing the frame. The connection
165            // is now in an invalid state. Returning `Err` from here will result
166            // in the connection being closed.
167            Err(e) => Err(e),
168        }
169    }
170
171    pub async fn write_block(&mut self, block: &Block) -> Result<()> {
172        let mut encoder = Encoder::new();
173        block.send_server_data(&mut encoder, self.compress);
174        self.stream.write_all(&encoder.get_buffer()).await?;
175        self.stream.flush().await?;
176        Ok(())
177    }
178
179    pub async fn write_progress(&mut self, progress: Progress, client_revision: u64) -> Result<()> {
180        let mut encoder = Encoder::new();
181        progress.write(&mut encoder, client_revision);
182        self.stream.write_all(&encoder.get_buffer()).await?;
183        self.stream.flush().await?;
184        Ok(())
185    }
186
187    pub async fn write_end_of_stream(&mut self) -> Result<()> {
188        let mut encoder = Encoder::new();
189        encoder.uvarint(SERVER_END_OF_STREAM);
190        self.write_bytes(encoder.get_buffer()).await?;
191        Ok(())
192    }
193
194    pub async fn write_error(&mut self, err: &Error) -> Result<()> {
195        let mut encoder = Encoder::new();
196        ExceptionResponse::write(&mut encoder, err, self.with_stack_trace);
197
198        self.stream.write_all(&encoder.get_buffer()).await?;
199        self.stream.flush().await?;
200        Ok(())
201    }
202
203    pub async fn write_bytes(&mut self, bytes: Vec<u8>) -> Result<()> {
204        self.stream.write_all(&bytes).await?;
205        self.stream.flush().await?;
206        Ok(())
207    }
208}