clickhouse_srv/connection.rs
1use std::io::Cursor;
2use std::sync::Arc;
3
4use bytes::Buf;
5use bytes::BytesMut;
6use chrono_tz::Tz;
7use tokio::io::AsyncReadExt;
8use tokio::io::AsyncWriteExt;
9use tokio::io::BufWriter;
10use tokio::net::TcpStream;
11
12use crate::binary::Encoder;
13use crate::binary::Parser;
14use crate::errors::Error;
15use crate::errors::Result;
16use crate::protocols::ExceptionResponse;
17use crate::protocols::Packet;
18use crate::protocols::SERVER_END_OF_STREAM;
19use crate::types::Block;
20use crate::types::Progress;
21use crate::CHContext;
22use crate::ClickHouseSession;
23
/// Send and receive `Packet` values from a remote peer.
///
/// When implementing networking protocols, a message on that protocol is
/// often composed of several smaller messages known as frames. The purpose of
/// `Connection` is to read and write frames on the underlying `TcpStream`.
///
/// To read frames, the `Connection` uses an internal buffer, which is filled
/// up until there are enough bytes to create a full frame. Once this happens,
/// the `Connection` creates the frame and returns it to the caller.
///
/// When sending frames, the frame is first encoded into a byte buffer by an
/// `Encoder`. The contents of that buffer are then written to the socket and
/// flushed.
pub struct Connection {
    /// The buffer for reading frames. Bytes read from the socket accumulate
    /// here until a complete packet can be parsed; consumed bytes are removed.
    pub buffer: BytesMut,

    /// The `TcpStream`. It is decorated with a `BufWriter`, which provides write
    /// level buffering. The `BufWriter` implementation provided by Tokio is
    /// sufficient for our needs.
    stream: BufWriter<TcpStream>,
    /// Session object supplied to `new`. It is only stored here; the methods
    /// of `Connection` visible in this file do not call into it.
    pub session: Arc<dyn ClickHouseSession>,

    /// Time zone parsed from the `timezone` argument of `new` and handed to
    /// the packet `Parser`.
    tz: Tz,
    /// Forwarded to `ExceptionResponse::write` when an error is sent to the
    /// peer. Initialized to `false`.
    with_stack_trace: bool,
    /// Compression flag passed to the packet parser and to
    /// `Block::send_server_data`. Starts as `true` and is updated from the
    /// `compression` field of every parsed `Packet::Query`.
    compress: bool
}
50
51impl Connection {
52 /// Create a new `Connection`, backed by `socket`. Read and write buffers
53 /// are initialized.
54 pub fn new(
55 stream: TcpStream,
56 session: Arc<dyn ClickHouseSession>,
57 timezone: String
58 ) -> Result<Connection> {
59 let tz: Tz = timezone.parse()?;
60 Ok(Connection {
61 stream: BufWriter::new(stream),
62 buffer: BytesMut::with_capacity(4 * 1024),
63 session,
64 tz,
65 with_stack_trace: false,
66 compress: true
67 })
68 }
69
70 /// Read a single `Packet` value from the underlying stream.
71 ///
72 /// The function waits until it has retrieved enough data to parse a frame.
73 /// Any data remaining in the read buffer after the frame has been parsed is
74 /// kept there for the next call to `read_packet`.
75 ///
76 /// # Returns
77 ///
78 /// On success, the received frame is returned. If the `TcpStream`
79 /// is closed in a way that doesn't break a frame in half, it returns
80 /// `None`. Otherwise, an error is returned.
81 pub async fn read_packet(&mut self, ctx: &mut CHContext) -> crate::Result<Option<Packet>> {
82 loop {
83 // Attempt to parse a frame from the buffered data. If enough data
84 // has been buffered, the frame is returned.
85 if let Some(frame) = self.parse_packet(ctx)? {
86 return Ok(Some(frame));
87 }
88
89 // There is not enough buffered data to read a frame. Attempt to
90 // read more data from the socket.
91 //
92 // On success, the number of bytes is returned. `0` indicates "end
93 // of stream".
94 if 0 == self.stream.read_buf(&mut self.buffer).await? {
95 // The remote closed the connection. For this to be a clean
96 // shutdown, there should be no data in the read buffer. If
97 // there is, this means that the peer closed the socket while
98 // sending a frame.
99 if self.buffer.is_empty() {
100 return Ok(None);
101 } else {
102 return Err("connection reset by peer".into());
103 }
104 }
105 }
106 }
107
108 /// Tries to parse a frame from the buffer. If the buffer contains enough
109 /// data, the frame is returned and the data removed from the buffer. If not
110 /// enough data has been buffered yet, `Ok(None)` is returned. If the
111 /// buffered data does not represent a valid frame, `Err` is returned.
112 fn parse_packet(&mut self, ctx: &mut CHContext) -> crate::Result<Option<Packet>> {
113 // Cursor is used to track the "current" location in the
114 // buffer. Cursor also implements `Buf` from the `bytes` crate
115 // which provides a number of helpful utilities for working
116 // with bytes.
117 let mut buf = Cursor::new(&self.buffer[..]);
118 let mut parser = Parser::new(&mut buf, self.tz);
119
120 let hello = ctx.hello.clone();
121 let packet = parser.parse_packet(&hello, self.compress);
122
123 match packet {
124 Ok(packet) => {
125 match &packet {
126 Packet::Query(ref query) => self.compress = query.compression > 0,
127 _ => {}
128 }
129 // The `check` function will have advanced the cursor until the
130 // end of the frame. Since the cursor had position set to zero
131 // before `Packet::check` was called, we obtain the length of the
132 // frame by checking the cursor position.
133 let len = buf.position() as usize;
134 buf.set_position(0);
135 self.buffer.advance(len);
136 // Return the parsed frame to the caller.
137 Ok(Some(packet))
138 }
139 // There is not enough data present in the read buffer to parse a
140 // single frame. We must wait for more data to be received from the
141 // socket. Reading from the socket will be done in the statement
142 // after this `match`.
143 //
144 // We do not want to return `Err` from here as this "error" is an
145 // expected runtime condition.
146 Err(err) if err.is_would_block() => Ok(None),
147 // An error was encountered while parsing the frame. The connection
148 // is now in an invalid state. Returning `Err` from here will result
149 // in the connection being closed.
150 Err(e) => Err(e.into())
151 }
152 }
153
154 pub async fn write_block(&mut self, block: &Block) -> Result<()> {
155 let mut encoder = Encoder::new();
156 block.send_server_data(&mut encoder, self.compress);
157 self.stream.write_all(&encoder.get_buffer()).await?;
158 self.stream.flush().await?;
159 Ok(())
160 }
161
162 pub async fn write_progress(&mut self, progress: Progress, client_revision: u64) -> Result<()> {
163 let mut encoder = Encoder::new();
164 progress.write(&mut encoder, client_revision);
165 self.stream.write_all(&encoder.get_buffer()).await?;
166 self.stream.flush().await?;
167 Ok(())
168 }
169
170 pub async fn write_end_of_stream(&mut self) -> Result<()> {
171 let mut encoder = Encoder::new();
172 encoder.uvarint(SERVER_END_OF_STREAM);
173 self.write_bytes(encoder.get_buffer()).await?;
174 Ok(())
175 }
176
177 pub async fn write_error(&mut self, err: &Error) -> Result<()> {
178 let mut encoder = Encoder::new();
179 ExceptionResponse::write(&mut encoder, &err, self.with_stack_trace);
180
181 self.stream.write_all(&encoder.get_buffer()).await?;
182 self.stream.flush().await?;
183 Ok(())
184 }
185
186 pub async fn write_bytes(&mut self, bytes: Vec<u8>) -> Result<()> {
187 self.stream.write_all(&bytes).await?;
188 self.stream.flush().await?;
189 Ok(())
190 }
191}