questdb/ingress/sender/
mod.rs

1/*******************************************************************************
2 *     ___                  _   ____  ____
3 *    / _ \ _   _  ___  ___| |_|  _ \| __ )
4 *   | | | | | | |/ _ \/ __| __| | | |  _ \
5 *   | |_| | |_| |  __/\__ \ |_| |_| | |_) |
6 *    \__\_\\__,_|\___||___/\__|____/|____/
7 *
8 *  Copyright (c) 2014-2019 Appsicle
9 *  Copyright (c) 2019-2025 QuestDB
10 *
11 *  Licensed under the Apache License, Version 2.0 (the "License");
12 *  you may not use this file except in compliance with the License.
13 *  You may obtain a copy of the License at
14 *
15 *  http://www.apache.org/licenses/LICENSE-2.0
16 *
17 *  Unless required by applicable law or agreed to in writing, software
18 *  distributed under the License is distributed on an "AS IS" BASIS,
19 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 *  See the License for the specific language governing permissions and
21 *  limitations under the License.
22 *
23 ******************************************************************************/
24
25use crate::error::{self, Result};
26use crate::ingress::{Buffer, ProtocolVersion, SenderBuilder};
27use std::fmt::{Debug, Formatter};
28
29#[cfg(feature = "sync-sender-tcp")]
30mod tcp;
31
32#[cfg(feature = "sync-sender-tcp")]
33pub(crate) use tcp::*;
34
35#[cfg(feature = "sync-sender-tcp")]
36use std::io::Write;
37
38#[cfg(feature = "sync-sender-tcp")]
39use crate::ingress::map_io_to_socket_err;
40
41#[cfg(feature = "sync-sender-http")]
42mod http;
43
44#[cfg(feature = "sync-sender-http")]
45pub(crate) use http::*;
46
/// Transport-specific state owned by a [`Sender`].
///
/// A variant is only compiled in when its cargo feature is enabled.
pub(crate) enum SyncProtocolHandler {
    /// ILP-over-TCP: the connection that flushed bytes are written to.
    #[cfg(feature = "sync-sender-tcp")]
    SyncTcp(SyncConnection),

    /// ILP-over-HTTP: the state (configuration, URL) used to issue requests.
    #[cfg(feature = "sync-sender-http")]
    SyncHttp(SyncHttpHandlerState),
}
54
55/// Connects to a QuestDB instance and inserts data via the ILP protocol.
56///
57/// * To construct an instance, use [`Sender::from_conf`] or the [`SenderBuilder`].
58/// * To prepare messages, use [`Buffer`] objects.
59/// * To send messages, call the [`flush`](Sender::flush) method.
60pub struct Sender {
61    descr: String,
62    handler: SyncProtocolHandler,
63    connected: bool,
64    max_buf_size: usize,
65    protocol_version: ProtocolVersion,
66    max_name_len: usize,
67}
68
69impl Debug for Sender {
70    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
71        f.write_str(self.descr.as_str())
72    }
73}
74
75impl Sender {
76    pub(crate) fn new(
77        descr: String,
78        handler: SyncProtocolHandler,
79        max_buf_size: usize,
80        protocol_version: ProtocolVersion,
81        max_name_len: usize,
82    ) -> Self {
83        Self {
84            descr,
85            handler,
86            connected: true,
87            max_buf_size,
88            protocol_version,
89            max_name_len,
90        }
91    }
92
93    /// Create a new `Sender` instance from the given configuration string.
94    ///
95    /// The format of the string is: `"http::addr=host:port;key=value;...;"`.
96    ///
97    /// Instead of `"http"`, you can also specify `"https"`, `"tcp"`, and `"tcps"`.
98    ///
99    /// We recommend HTTP for most cases because it provides more features, like
100    /// reporting errors to the client and supporting transaction control. TCP can
101    /// sometimes be faster in higher-latency networks, but misses a number of
102    /// features.
103    ///
104    /// Keys in the config string correspond to same-named methods on `SenderBuilder`.
105    ///
106    /// For the full list of keys and values, see the docs on [`SenderBuilder`].
107    ///
108    /// You can also load the configuration from an environment variable.
109    /// See [`Sender::from_env`].
110    ///
111    /// In the case of TCP, this synchronously establishes the TCP connection, and
112    /// returns once the connection is fully established. If the connection
113    /// requires authentication or TLS, these will also be completed before
114    /// returning.
115    pub fn from_conf<T: AsRef<str>>(conf: T) -> Result<Self> {
116        SenderBuilder::from_conf(conf)?.build()
117    }
118
119    /// Create a new `Sender` from the configuration stored in the `QDB_CLIENT_CONF`
120    /// environment variable. The format is the same as that accepted by
121    /// [`Sender::from_conf`].
122    ///
123    /// In the case of TCP, this synchronously establishes the TCP connection, and
124    /// returns once the connection is fully established. If the connection
125    /// requires authentication or TLS, these will also be completed before
126    /// returning.
127    pub fn from_env() -> Result<Self> {
128        SenderBuilder::from_env()?.build()
129    }
130
131    /// Creates a new [`Buffer`] using the sender's protocol settings
132    pub fn new_buffer(&self) -> Buffer {
133        Buffer::with_max_name_len(self.protocol_version, self.max_name_len)
134    }
135
136    #[allow(unused_variables)]
137    fn flush_impl(&mut self, buf: &Buffer, transactional: bool) -> Result<()> {
138        if !self.connected {
139            return Err(error::fmt!(
140                SocketError,
141                "Could not flush buffer: not connected to database."
142            ));
143        }
144        buf.check_can_flush()?;
145
146        if buf.len() > self.max_buf_size {
147            return Err(error::fmt!(
148                InvalidApiCall,
149                "Could not flush buffer: Buffer size of {} exceeds maximum configured allowed size of {} bytes.",
150                buf.len(),
151                self.max_buf_size
152            ));
153        }
154
155        self.check_protocol_version(buf.protocol_version())?;
156
157        let bytes = buf.as_bytes();
158        if bytes.is_empty() {
159            return Ok(());
160        }
161        match self.handler {
162            #[cfg(feature = "sync-sender-tcp")]
163            SyncProtocolHandler::SyncTcp(ref mut conn) => {
164                if transactional {
165                    return Err(error::fmt!(
166                        InvalidApiCall,
167                        "Transactional flushes are not supported for ILP over TCP."
168                    ));
169                }
170                conn.write_all(bytes).map_err(|io_err| {
171                    self.connected = false;
172                    map_io_to_socket_err("Could not flush buffer: ", io_err)
173                })?;
174                conn.flush().map_err(|io_err| {
175                    self.connected = false;
176                    map_io_to_socket_err("Could not flush to network: ", io_err)
177                })?;
178                Ok(())
179            }
180            #[cfg(feature = "sync-sender-http")]
181            SyncProtocolHandler::SyncHttp(ref state) => {
182                if transactional && !buf.transactional() {
183                    return Err(error::fmt!(
184                        InvalidApiCall,
185                        "Buffer contains lines for multiple tables. \
186                        Transactional flushes are only supported for buffers containing lines for a single table."
187                    ));
188                }
189                let request_min_throughput = *state.config.request_min_throughput;
190                let extra_time = if request_min_throughput > 0 {
191                    (bytes.len() as f64) / (request_min_throughput as f64)
192                } else {
193                    0.0f64
194                };
195
196                match http_send_with_retries(
197                    state,
198                    bytes,
199                    *state.config.request_timeout + std::time::Duration::from_secs_f64(extra_time),
200                    *state.config.retry_timeout,
201                ) {
202                    Ok(res) => {
203                        if res.status().is_client_error() || res.status().is_server_error() {
204                            Err(parse_http_error(res.status().as_u16(), res))
205                        } else {
206                            res.into_body();
207                            Ok(())
208                        }
209                    }
210                    Err(err) => Err(crate::error::Error::from_ureq_error(err, &state.url)),
211                }
212            }
213        }
214    }
215
216    /// Send the batch of rows in the buffer to the QuestDB server, and, if the
217    /// `transactional` parameter is true, ensure the flush will be transactional.
218    ///
219    /// A flush is transactional iff all the rows belong to the same table. This allows
220    /// QuestDB to treat the flush as a single database transaction, because it doesn't
221    /// support transactions spanning multiple tables. Additionally, only ILP-over-HTTP
222    /// supports transactional flushes.
223    ///
224    /// If the flush wouldn't be transactional, this function returns an error and
225    /// doesn't flush any data.
226    ///
227    /// The function sends an HTTP request and waits for the response. If the server
228    /// responds with an error, it returns a descriptive error. In the case of a network
229    /// error, it retries until it has exhausted the retry time budget.
230    ///
231    /// All the data stays in the buffer. Clear the buffer before starting a new batch.
232    #[cfg(feature = "sync-sender-http")]
233    pub fn flush_and_keep_with_flags(&mut self, buf: &Buffer, transactional: bool) -> Result<()> {
234        self.flush_impl(buf, transactional)
235    }
236
237    /// Send the given buffer of rows to the QuestDB server.
238    ///
239    /// All the data stays in the buffer. Clear the buffer before starting a new batch.
240    ///
241    /// To send and clear in one step, call [Sender::flush] instead.
242    pub fn flush_and_keep(&mut self, buf: &Buffer) -> Result<()> {
243        self.flush_impl(buf, false)
244    }
245
246    /// Send the given buffer of rows to the QuestDB server, clearing the buffer.
247    ///
248    /// After this function returns, the buffer is empty and ready for the next batch.
249    /// If you want to preserve the buffer contents, call [Sender::flush_and_keep]. If
250    /// you want to ensure the flush is transactional, call
251    /// [Sender::flush_and_keep_with_flags].
252    ///
253    /// With ILP-over-HTTP, this function sends an HTTP request and waits for the
254    /// response. If the server responds with an error, it returns a descriptive error.
255    /// In the case of a network error, it retries until it has exhausted the retry time
256    /// budget.
257    ///
258    /// With ILP-over-TCP, the function blocks only until the buffer is flushed to the
259    /// underlying OS-level network socket, without waiting to actually send it to the
260    /// server. In the case of an error, the server will quietly disconnect: consult the
261    /// server logs for error messages.
262    ///
263    /// HTTP should be the first choice, but use TCP if you need to continuously send
264    /// data to the server at a high rate.
265    ///
266    /// To improve the HTTP performance, send larger buffers (with more rows), and
267    /// consider parallelizing writes using multiple senders from multiple threads.
268    pub fn flush(&mut self, buf: &mut Buffer) -> crate::Result<()> {
269        self.flush_impl(buf, false)?;
270        buf.clear();
271        Ok(())
272    }
273
274    /// Tell whether the sender is no longer usable and must be dropped.
275    ///
276    /// This happens when there was an earlier failure.
277    ///
278    /// This method is specific to ILP-over-TCP and is not relevant for ILP-over-HTTP.
279    pub fn must_close(&self) -> bool {
280        !self.connected
281    }
282
283    /// Returns the sender's protocol version
284    ///
285    /// - Explicitly set version, or
286    /// - Auto-detected for HTTP transport, or [`ProtocolVersion::V1`] for TCP transport.
287    pub fn protocol_version(&self) -> ProtocolVersion {
288        self.protocol_version
289    }
290
291    /// Return the sender's maxinum name length of any column or table name.
292    /// This is either set explicitly when constructing the sender,
293    /// or the default value of 127.
294    /// When unset and using protocol version 2 over HTTP, the value is read
295    /// from the server from the `cairo.max.file.name.length` setting in
296    /// `server.conf` which defaults to 127.
297    pub fn max_name_len(&self) -> usize {
298        self.max_name_len
299    }
300
301    #[inline(always)]
302    fn check_protocol_version(&self, version: ProtocolVersion) -> Result<()> {
303        if self.protocol_version != version {
304            return Err(error::fmt!(
305                ProtocolVersionError,
306                "Attempting to send with protocol version {} \
307                but the sender is configured to use protocol version {}",
308                version,
309                self.protocol_version
310            ));
311        }
312        Ok(())
313    }
314}