opensrv_clickhouse/connection.rs
1// Copyright 2021 Datafuse Labs.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io::Cursor;
16use std::sync::Arc;
17
18use bytes::Buf;
19use bytes::BytesMut;
20use chrono_tz::Tz;
21use tokio::io::AsyncReadExt;
22use tokio::io::AsyncWriteExt;
23use tokio::io::BufWriter;
24use tokio::net::TcpStream;
25
26use crate::binary::Encoder;
27use crate::binary::Parser;
28use crate::errors::Error;
29use crate::errors::Result;
30use crate::protocols::ExceptionResponse;
31use crate::protocols::Packet;
32use crate::protocols::SERVER_END_OF_STREAM;
33use crate::types::Block;
34use crate::types::Progress;
35use crate::CHContext;
36use crate::ClickHouseSession;
37
38/// Send and receive `Packet` values from a remote peer.
39///
40/// When implementing networking protocols, a message on that protocol is
41/// often composed of several smaller messages known as frames. The purpose of
42/// `Connection` is to read and write frames on the underlying `TcpStream`.
43///
44/// To read frames, the `Connection` uses an internal buffer, which is filled
45/// up until there are enough bytes to create a full frame. Once this happens,
46/// the `Connection` creates the frame and returns it to the caller.
47///
48/// When sending frames, the frame is first encoded into the write buffer.
49/// The contents of the write buffer are then written to the socket.
50pub struct Connection {
51 // The `TcpStream`. It is decorated with a `BufWriter`, which provides write
52 // level buffering. The `BufWriter` implementation provided by Tokio is
53 // sufficient for our needs.
54 pub buffer: BytesMut,
55
56 stream: BufWriter<TcpStream>,
57 pub session: Arc<dyn ClickHouseSession>,
58
59 // The buffer for reading frames.
60 tz: Tz,
61 with_stack_trace: bool,
62 compress: bool,
63
64 pub client_addr: String,
65}
66
67impl Connection {
68 /// Create a new `Connection`, backed by `socket`. Read and write buffers
69 /// are initialized.
70 pub fn new(
71 stream: TcpStream,
72 session: Arc<dyn ClickHouseSession>,
73 timezone: String,
74 ) -> Result<Connection> {
75 let tz: Tz = timezone.parse()?;
76 let client_addr = stream.peer_addr()?.to_string();
77 Ok(Connection {
78 stream: BufWriter::new(stream),
79 buffer: BytesMut::with_capacity(4 * 1024),
80 session,
81 tz,
82 with_stack_trace: false,
83 compress: true,
84 client_addr,
85 })
86 }
87
88 /// Read a single `Packet` value from the underlying stream.
89 ///
90 /// The function waits until it has retrieved enough data to parse a frame.
91 /// Any data remaining in the read buffer after the frame has been parsed is
92 /// kept there for the next call to `read_packet`.
93 ///
94 /// # Returns
95 ///
96 /// On success, the received frame is returned. If the `TcpStream`
97 /// is closed in a way that doesn't break a frame in half, it returns
98 /// `None`. Otherwise, an error is returned.
99 pub async fn read_packet(&mut self, ctx: &mut CHContext) -> crate::Result<Option<Packet>> {
100 loop {
101 // Attempt to parse a frame from the buffered data. If enough data
102 // has been buffered, the frame is returned.
103 if let Some(frame) = self.parse_packet(ctx)? {
104 return Ok(Some(frame));
105 }
106
107 // There is not enough buffered data to read a frame. Attempt to
108 // read more data from the socket.
109 //
110 // On success, the number of bytes is returned. `0` indicates "end
111 // of stream".
112 if 0 == self.stream.read_buf(&mut self.buffer).await? {
113 // The remote closed the connection. For this to be a clean
114 // shutdown, there should be no data in the read buffer. If
115 // there is, this means that the peer closed the socket while
116 // sending a frame.
117 if self.buffer.is_empty() {
118 return Ok(None);
119 } else {
120 return Err("connection reset by peer".into());
121 }
122 }
123 }
124 }
125
126 /// Tries to parse a frame from the buffer. If the buffer contains enough
127 /// data, the frame is returned and the data removed from the buffer. If not
128 /// enough data has been buffered yet, `Ok(None)` is returned. If the
129 /// buffered data does not represent a valid frame, `Err` is returned.
130 fn parse_packet(&mut self, ctx: &mut CHContext) -> crate::Result<Option<Packet>> {
131 // Cursor is used to track the "current" location in the
132 // buffer. Cursor also implements `Buf` from the `bytes` crate
133 // which provides a number of helpful utilities for working
134 // with bytes.
135 let mut buf = Cursor::new(&self.buffer[..]);
136 let mut parser = Parser::new(&mut buf, self.tz);
137
138 let hello = ctx.hello.clone();
139 let packet = parser.parse_packet(&hello, self.compress);
140
141 match packet {
142 Ok(packet) => {
143 if let Packet::Query(ref query) = &packet {
144 self.compress = query.compression > 0
145 }
146 // The `check` function will have advanced the cursor until the
147 // end of the frame. Since the cursor had position set to zero
148 // before `Packet::check` was called, we obtain the length of the
149 // frame by checking the cursor position.
150 let len = buf.position() as usize;
151 buf.set_position(0);
152 self.buffer.advance(len);
153 // Return the parsed frame to the caller.
154 Ok(Some(packet))
155 }
156 // There is not enough data present in the read buffer to parse a
157 // single frame. We must wait for more data to be received from the
158 // socket. Reading from the socket will be done in the statement
159 // after this `match`.
160 //
161 // We do not want to return `Err` from here as this "error" is an
162 // expected runtime condition.
163 Err(err) if err.is_would_block() => Ok(None),
164 // An error was encountered while parsing the frame. The connection
165 // is now in an invalid state. Returning `Err` from here will result
166 // in the connection being closed.
167 Err(e) => Err(e),
168 }
169 }
170
171 pub async fn write_block(&mut self, block: &Block) -> Result<()> {
172 let mut encoder = Encoder::new();
173 block.send_server_data(&mut encoder, self.compress);
174 self.stream.write_all(&encoder.get_buffer()).await?;
175 self.stream.flush().await?;
176 Ok(())
177 }
178
179 pub async fn write_progress(&mut self, progress: Progress, client_revision: u64) -> Result<()> {
180 let mut encoder = Encoder::new();
181 progress.write(&mut encoder, client_revision);
182 self.stream.write_all(&encoder.get_buffer()).await?;
183 self.stream.flush().await?;
184 Ok(())
185 }
186
187 pub async fn write_end_of_stream(&mut self) -> Result<()> {
188 let mut encoder = Encoder::new();
189 encoder.uvarint(SERVER_END_OF_STREAM);
190 self.write_bytes(encoder.get_buffer()).await?;
191 Ok(())
192 }
193
194 pub async fn write_error(&mut self, err: &Error) -> Result<()> {
195 let mut encoder = Encoder::new();
196 ExceptionResponse::write(&mut encoder, err, self.with_stack_trace);
197
198 self.stream.write_all(&encoder.get_buffer()).await?;
199 self.stream.flush().await?;
200 Ok(())
201 }
202
203 pub async fn write_bytes(&mut self, bytes: Vec<u8>) -> Result<()> {
204 self.stream.write_all(&bytes).await?;
205 self.stream.flush().await?;
206 Ok(())
207 }
208}