questdb/ingress/sender/mod.rs
1/*******************************************************************************
2 * ___ _ ____ ____
3 * / _ \ _ _ ___ ___| |_| _ \| __ )
4 * | | | | | | |/ _ \/ __| __| | | | _ \
5 * | |_| | |_| | __/\__ \ |_| |_| | |_) |
6 * \__\_\\__,_|\___||___/\__|____/|____/
7 *
8 * Copyright (c) 2014-2019 Appsicle
9 * Copyright (c) 2019-2025 QuestDB
10 *
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
22 *
23 ******************************************************************************/
24
25use crate::error::{self, Result};
26use crate::ingress::{Buffer, ProtocolVersion, SenderBuilder};
27use std::fmt::{Debug, Formatter};
28
29#[cfg(feature = "sync-sender-tcp")]
30mod tcp;
31
32#[cfg(feature = "sync-sender-tcp")]
33pub(crate) use tcp::*;
34
35#[cfg(feature = "sync-sender-tcp")]
36use std::io::Write;
37
38#[cfg(feature = "sync-sender-tcp")]
39use crate::ingress::map_io_to_socket_err;
40
41#[cfg(feature = "sync-sender-http")]
42mod http;
43
44#[cfg(feature = "sync-sender-http")]
45pub(crate) use http::*;
46
/// Transport backing a synchronous [`Sender`].
///
/// Each variant exists only when its `sync-sender-*` cargo feature is enabled;
/// with neither feature enabled the enum has no variants at all.
pub(crate) enum SyncProtocolHandler {
    /// ILP over a TCP connection (requires feature `sync-sender-tcp`).
    #[cfg(feature = "sync-sender-tcp")]
    SyncTcp(SyncConnection),

    /// ILP over HTTP (requires feature `sync-sender-http`).
    #[cfg(feature = "sync-sender-http")]
    SyncHttp(SyncHttpHandlerState),
}
54
55/// Connects to a QuestDB instance and inserts data via the ILP protocol.
56///
57/// * To construct an instance, use [`Sender::from_conf`] or the [`SenderBuilder`].
58/// * To prepare messages, use [`Buffer`] objects.
59/// * To send messages, call the [`flush`](Sender::flush) method.
60pub struct Sender {
61 descr: String,
62 handler: SyncProtocolHandler,
63 connected: bool,
64 max_buf_size: usize,
65 protocol_version: ProtocolVersion,
66 max_name_len: usize,
67}
68
69impl Debug for Sender {
70 fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
71 f.write_str(self.descr.as_str())
72 }
73}
74
75impl Sender {
76 pub(crate) fn new(
77 descr: String,
78 handler: SyncProtocolHandler,
79 max_buf_size: usize,
80 protocol_version: ProtocolVersion,
81 max_name_len: usize,
82 ) -> Self {
83 Self {
84 descr,
85 handler,
86 connected: true,
87 max_buf_size,
88 protocol_version,
89 max_name_len,
90 }
91 }
92
93 /// Create a new `Sender` instance from the given configuration string.
94 ///
95 /// The format of the string is: `"http::addr=host:port;key=value;...;"`.
96 ///
97 /// Instead of `"http"`, you can also specify `"https"`, `"tcp"`, and `"tcps"`.
98 ///
99 /// We recommend HTTP for most cases because it provides more features, like
100 /// reporting errors to the client and supporting transaction control. TCP can
101 /// sometimes be faster in higher-latency networks, but misses a number of
102 /// features.
103 ///
104 /// Keys in the config string correspond to same-named methods on `SenderBuilder`.
105 ///
106 /// For the full list of keys and values, see the docs on [`SenderBuilder`].
107 ///
108 /// You can also load the configuration from an environment variable.
109 /// See [`Sender::from_env`].
110 ///
111 /// In the case of TCP, this synchronously establishes the TCP connection, and
112 /// returns once the connection is fully established. If the connection
113 /// requires authentication or TLS, these will also be completed before
114 /// returning.
115 pub fn from_conf<T: AsRef<str>>(conf: T) -> Result<Self> {
116 SenderBuilder::from_conf(conf)?.build()
117 }
118
119 /// Create a new `Sender` from the configuration stored in the `QDB_CLIENT_CONF`
120 /// environment variable. The format is the same as that accepted by
121 /// [`Sender::from_conf`].
122 ///
123 /// In the case of TCP, this synchronously establishes the TCP connection, and
124 /// returns once the connection is fully established. If the connection
125 /// requires authentication or TLS, these will also be completed before
126 /// returning.
127 pub fn from_env() -> Result<Self> {
128 SenderBuilder::from_env()?.build()
129 }
130
131 /// Creates a new [`Buffer`] using the sender's protocol settings
132 pub fn new_buffer(&self) -> Buffer {
133 Buffer::with_max_name_len(self.protocol_version, self.max_name_len)
134 }
135
136 #[allow(unused_variables)]
137 fn flush_impl(&mut self, buf: &Buffer, transactional: bool) -> Result<()> {
138 if !self.connected {
139 return Err(error::fmt!(
140 SocketError,
141 "Could not flush buffer: not connected to database."
142 ));
143 }
144 buf.check_can_flush()?;
145
146 if buf.len() > self.max_buf_size {
147 return Err(error::fmt!(
148 InvalidApiCall,
149 "Could not flush buffer: Buffer size of {} exceeds maximum configured allowed size of {} bytes.",
150 buf.len(),
151 self.max_buf_size
152 ));
153 }
154
155 self.check_protocol_version(buf.protocol_version())?;
156
157 let bytes = buf.as_bytes();
158 if bytes.is_empty() {
159 return Ok(());
160 }
161 match self.handler {
162 #[cfg(feature = "sync-sender-tcp")]
163 SyncProtocolHandler::SyncTcp(ref mut conn) => {
164 if transactional {
165 return Err(error::fmt!(
166 InvalidApiCall,
167 "Transactional flushes are not supported for ILP over TCP."
168 ));
169 }
170 conn.write_all(bytes).map_err(|io_err| {
171 self.connected = false;
172 map_io_to_socket_err("Could not flush buffer: ", io_err)
173 })?;
174 conn.flush().map_err(|io_err| {
175 self.connected = false;
176 map_io_to_socket_err("Could not flush to network: ", io_err)
177 })?;
178 Ok(())
179 }
180 #[cfg(feature = "sync-sender-http")]
181 SyncProtocolHandler::SyncHttp(ref state) => {
182 if transactional && !buf.transactional() {
183 return Err(error::fmt!(
184 InvalidApiCall,
185 "Buffer contains lines for multiple tables. \
186 Transactional flushes are only supported for buffers containing lines for a single table."
187 ));
188 }
189 let request_min_throughput = *state.config.request_min_throughput;
190 let extra_time = if request_min_throughput > 0 {
191 (bytes.len() as f64) / (request_min_throughput as f64)
192 } else {
193 0.0f64
194 };
195
196 match http_send_with_retries(
197 state,
198 bytes,
199 *state.config.request_timeout + std::time::Duration::from_secs_f64(extra_time),
200 *state.config.retry_timeout,
201 ) {
202 Ok(res) => {
203 if res.status().is_client_error() || res.status().is_server_error() {
204 Err(parse_http_error(res.status().as_u16(), res))
205 } else {
206 res.into_body();
207 Ok(())
208 }
209 }
210 Err(err) => Err(crate::error::Error::from_ureq_error(err, &state.url)),
211 }
212 }
213 }
214 }
215
216 /// Send the batch of rows in the buffer to the QuestDB server, and, if the
217 /// `transactional` parameter is true, ensure the flush will be transactional.
218 ///
219 /// A flush is transactional iff all the rows belong to the same table. This allows
220 /// QuestDB to treat the flush as a single database transaction, because it doesn't
221 /// support transactions spanning multiple tables. Additionally, only ILP-over-HTTP
222 /// supports transactional flushes.
223 ///
224 /// If the flush wouldn't be transactional, this function returns an error and
225 /// doesn't flush any data.
226 ///
227 /// The function sends an HTTP request and waits for the response. If the server
228 /// responds with an error, it returns a descriptive error. In the case of a network
229 /// error, it retries until it has exhausted the retry time budget.
230 ///
231 /// All the data stays in the buffer. Clear the buffer before starting a new batch.
232 #[cfg(feature = "sync-sender-http")]
233 pub fn flush_and_keep_with_flags(&mut self, buf: &Buffer, transactional: bool) -> Result<()> {
234 self.flush_impl(buf, transactional)
235 }
236
237 /// Send the given buffer of rows to the QuestDB server.
238 ///
239 /// All the data stays in the buffer. Clear the buffer before starting a new batch.
240 ///
241 /// To send and clear in one step, call [Sender::flush] instead.
242 pub fn flush_and_keep(&mut self, buf: &Buffer) -> Result<()> {
243 self.flush_impl(buf, false)
244 }
245
246 /// Send the given buffer of rows to the QuestDB server, clearing the buffer.
247 ///
248 /// After this function returns, the buffer is empty and ready for the next batch.
249 /// If you want to preserve the buffer contents, call [Sender::flush_and_keep]. If
250 /// you want to ensure the flush is transactional, call
251 /// [Sender::flush_and_keep_with_flags].
252 ///
253 /// With ILP-over-HTTP, this function sends an HTTP request and waits for the
254 /// response. If the server responds with an error, it returns a descriptive error.
255 /// In the case of a network error, it retries until it has exhausted the retry time
256 /// budget.
257 ///
258 /// With ILP-over-TCP, the function blocks only until the buffer is flushed to the
259 /// underlying OS-level network socket, without waiting to actually send it to the
260 /// server. In the case of an error, the server will quietly disconnect: consult the
261 /// server logs for error messages.
262 ///
263 /// HTTP should be the first choice, but use TCP if you need to continuously send
264 /// data to the server at a high rate.
265 ///
266 /// To improve the HTTP performance, send larger buffers (with more rows), and
267 /// consider parallelizing writes using multiple senders from multiple threads.
268 pub fn flush(&mut self, buf: &mut Buffer) -> crate::Result<()> {
269 self.flush_impl(buf, false)?;
270 buf.clear();
271 Ok(())
272 }
273
274 /// Tell whether the sender is no longer usable and must be dropped.
275 ///
276 /// This happens when there was an earlier failure.
277 ///
278 /// This method is specific to ILP-over-TCP and is not relevant for ILP-over-HTTP.
279 pub fn must_close(&self) -> bool {
280 !self.connected
281 }
282
283 /// Returns the sender's protocol version
284 ///
285 /// - Explicitly set version, or
286 /// - Auto-detected for HTTP transport, or [`ProtocolVersion::V1`] for TCP transport.
287 pub fn protocol_version(&self) -> ProtocolVersion {
288 self.protocol_version
289 }
290
291 /// Return the sender's maxinum name length of any column or table name.
292 /// This is either set explicitly when constructing the sender,
293 /// or the default value of 127.
294 /// When unset and using protocol version 2 over HTTP, the value is read
295 /// from the server from the `cairo.max.file.name.length` setting in
296 /// `server.conf` which defaults to 127.
297 pub fn max_name_len(&self) -> usize {
298 self.max_name_len
299 }
300
301 #[inline(always)]
302 fn check_protocol_version(&self, version: ProtocolVersion) -> Result<()> {
303 if self.protocol_version != version {
304 return Err(error::fmt!(
305 ProtocolVersionError,
306 "Attempting to send with protocol version {} \
307 but the sender is configured to use protocol version {}",
308 version,
309 self.protocol_version
310 ));
311 }
312 Ok(())
313 }
314}