questdb/ingress/
mod.rs

1/*******************************************************************************
2 *     ___                  _   ____  ____
3 *    / _ \ _   _  ___  ___| |_|  _ \| __ )
4 *   | | | | | | |/ _ \/ __| __| | | |  _ \
5 *   | |_| | |_| |  __/\__ \ |_| |_| | |_) |
6 *    \__\_\\__,_|\___||___/\__|____/|____/
7 *
8 *  Copyright (c) 2014-2019 Appsicle
9 *  Copyright (c) 2019-2024 QuestDB
10 *
11 *  Licensed under the Apache License, Version 2.0 (the "License");
12 *  you may not use this file except in compliance with the License.
13 *  You may obtain a copy of the License at
14 *
15 *  http://www.apache.org/licenses/LICENSE-2.0
16 *
17 *  Unless required by applicable law or agreed to in writing, software
18 *  distributed under the License is distributed on an "AS IS" BASIS,
19 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 *  See the License for the specific language governing permissions and
21 *  limitations under the License.
22 *
23 ******************************************************************************/
24
25#![doc = include_str!("mod.md")]
26
27pub use self::timestamp::*;
28
29use crate::error::{self, Error, Result};
30use crate::gai;
31use crate::ingress::conf::ConfigSetting;
32use core::time::Duration;
33use std::collections::HashMap;
34use std::convert::Infallible;
35use std::fmt::{Debug, Display, Formatter, Write};
36use std::io::{self, BufRead, BufReader, ErrorKind, Write as IoWrite};
37use std::num::NonZeroUsize;
38use std::ops::Deref;
39use std::path::PathBuf;
40use std::str::FromStr;
41use std::sync::Arc;
42
43use base64ct::{Base64, Base64UrlUnpadded, Encoding};
44use ring::rand::SystemRandom;
45use ring::signature::{EcdsaKeyPair, ECDSA_P256_SHA256_FIXED_SIGNING};
46use rustls::{ClientConnection, RootCertStore, StreamOwned};
47use rustls_pki_types::ServerName;
48use socket2::{Domain, Protocol as SockProtocol, SockAddr, Socket, Type};
49
/// The individual operations of the buffer API.
///
/// Each variant occupies its own bit, so a set of operations can be expressed
/// as a bitmask and membership can be tested with a bitwise `&`.
#[derive(Debug, Copy, Clone)]
enum Op {
    Table = 0b0_0001,
    Symbol = 0b0_0010,
    Column = 0b0_0100,
    At = 0b0_1000,
    Flush = 0b1_0000,
}

impl Op {
    /// The lowercase name of the operation.
    fn descr(self) -> &'static str {
        match self {
            Self::Flush => "flush",
            Self::At => "at",
            Self::Column => "column",
            Self::Symbol => "symbol",
            Self::Table => "table",
        }
    }
}
70
71fn map_io_to_socket_err(prefix: &str, io_err: io::Error) -> Error {
72    error::fmt!(SocketError, "{}{}", prefix, io_err)
73}
74
75/// A validated table name.
76///
77/// This type simply wraps a `&str`.
78///
79/// When you pass a `TableName` instead of a plain string to a [`Buffer`] method,
80/// it doesn't have to validate it again. This saves CPU cycles.
81#[derive(Clone, Copy)]
82pub struct TableName<'a> {
83    name: &'a str,
84}
85
86impl<'a> TableName<'a> {
87    /// Construct a validated table name.
88    pub fn new(name: &'a str) -> Result<Self> {
89        if name.is_empty() {
90            return Err(error::fmt!(
91                InvalidName,
92                "Table names must have a non-zero length."
93            ));
94        }
95
96        let mut prev = '\0';
97        for (index, c) in name.chars().enumerate() {
98            match c {
99                '.' => {
100                    if index == 0 || index == name.len() - 1 || prev == '.' {
101                        return Err(error::fmt!(
102                            InvalidName,
103                            concat!("Bad string {:?}: ", "Found invalid dot `.` at position {}."),
104                            name,
105                            index
106                        ));
107                    }
108                }
109                '?' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '*' | '%' | '~'
110                | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}' | '\u{0004}'
111                | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}' | '\u{000b}'
112                | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
113                    return Err(error::fmt!(
114                        InvalidName,
115                        concat!(
116                            "Bad string {:?}: ",
117                            "Table names can't contain ",
118                            "a {:?} character, which was found at ",
119                            "byte position {}."
120                        ),
121                        name,
122                        c,
123                        index
124                    ));
125                }
126                '\u{feff}' => {
127                    // Reject unicode char 'ZERO WIDTH NO-BREAK SPACE',
128                    // aka UTF-8 BOM if it appears anywhere in the string.
129                    return Err(error::fmt!(
130                        InvalidName,
131                        concat!(
132                            "Bad string {:?}: ",
133                            "Table names can't contain ",
134                            "a UTF-8 BOM character, which was found at ",
135                            "byte position {}."
136                        ),
137                        name,
138                        index
139                    ));
140                }
141                _ => (),
142            }
143            prev = c;
144        }
145
146        Ok(Self { name })
147    }
148
149    /// Construct a table name without validating it.
150    ///
151    /// This breaks API encapsulation and is only intended for use
152    /// when the the string was already previously validated.
153    ///
154    /// The QuestDB server will reject an invalid table name.
155    pub fn new_unchecked(name: &'a str) -> Self {
156        Self { name }
157    }
158}
159
160/// A validated column name.
161///
162/// This type simply wraps a `&str`.
163///
164/// When you pass a `ColumnName` instead of a plain string to a [`Buffer`] method,
165/// it doesn't have to validate it again. This saves CPU cycles.
166#[derive(Clone, Copy)]
167pub struct ColumnName<'a> {
168    name: &'a str,
169}
170
171impl<'a> ColumnName<'a> {
172    /// Construct a validated table name.
173    pub fn new(name: &'a str) -> Result<Self> {
174        if name.is_empty() {
175            return Err(error::fmt!(
176                InvalidName,
177                "Column names must have a non-zero length."
178            ));
179        }
180
181        for (index, c) in name.chars().enumerate() {
182            match c {
183                '?' | '.' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '-' | '*'
184                | '%' | '~' | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}'
185                | '\u{0004}' | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}'
186                | '\u{000b}' | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
187                    return Err(error::fmt!(
188                        InvalidName,
189                        concat!(
190                            "Bad string {:?}: ",
191                            "Column names can't contain ",
192                            "a {:?} character, which was found at ",
193                            "byte position {}."
194                        ),
195                        name,
196                        c,
197                        index
198                    ));
199                }
200                '\u{FEFF}' => {
201                    // Reject unicode char 'ZERO WIDTH NO-BREAK SPACE',
202                    // aka UTF-8 BOM if it appears anywhere in the string.
203                    return Err(error::fmt!(
204                        InvalidName,
205                        concat!(
206                            "Bad string {:?}: ",
207                            "Column names can't contain ",
208                            "a UTF-8 BOM character, which was found at ",
209                            "byte position {}."
210                        ),
211                        name,
212                        index
213                    ));
214                }
215                _ => (),
216            }
217        }
218
219        Ok(Self { name })
220    }
221
222    /// Construct a column name without validating it.
223    ///
224    /// This breaks API encapsulation and is only intended for use
225    /// when the the string was already previously validated.
226    ///
227    /// The QuestDB server will reject an invalid column name.
228    pub fn new_unchecked(name: &'a str) -> Self {
229        Self { name }
230    }
231}
232
233impl<'a> TryFrom<&'a str> for TableName<'a> {
234    type Error = self::Error;
235
236    fn try_from(name: &'a str) -> Result<Self> {
237        Self::new(name)
238    }
239}
240
241impl<'a> TryFrom<&'a str> for ColumnName<'a> {
242    type Error = self::Error;
243
244    fn try_from(name: &'a str) -> Result<Self> {
245        Self::new(name)
246    }
247}
248
249impl From<Infallible> for Error {
250    fn from(_: Infallible) -> Self {
251        unreachable!()
252    }
253}
254
/// Append `s` to `output`, inserting a backslash before every byte for which
/// `check_escape_fn` returns `true`.
///
/// `quoting_fn` is invoked on the underlying byte buffer once before and once
/// after the (escaped) text is written, so callers can emit an opening and a
/// closing delimiter.
///
/// Callers must uphold the `String` UTF-8 invariant: `check_escape_fn` may
/// only select ASCII bytes (so a backslash is never inserted in the middle of
/// a multi-byte sequence) and `quoting_fn` may only append valid UTF-8.
fn write_escaped_impl<Q, C>(check_escape_fn: C, quoting_fn: Q, output: &mut String, s: &str)
where
    C: Fn(u8) -> bool,
    Q: Fn(&mut Vec<u8>),
{
    // SAFETY: Only whole copies of `s` (valid UTF-8), ASCII backslashes placed
    // in front of ASCII bytes, and whatever `quoting_fn` appends are written.
    // Given the caller contract above the buffer remains valid UTF-8.
    let output_vec = unsafe { output.as_mut_vec() };
    let bytes = s.as_bytes();

    // Count first so the buffer grows at most once for this string.
    let to_escape = bytes.iter().filter(|&&b| check_escape_fn(b)).count();

    quoting_fn(output_vec);

    if to_escape == 0 {
        // Fast path: nothing to escape, copy the text verbatim.
        output_vec.extend_from_slice(bytes);
    } else {
        output_vec.reserve(bytes.len() + to_escape);
        for &b in bytes {
            if check_escape_fn(b) {
                output_vec.push(b'\\');
            }
            output_vec.push(b);
        }
    }

    quoting_fn(output_vec);
}
295
/// Whether byte `c` needs a preceding backslash when written unquoted:
/// space, comma, `=`, line feed, carriage return and backslash.
fn must_escape_unquoted(c: u8) -> bool {
    const ESCAPED: &[u8] = b" ,=\n\r\\";
    ESCAPED.contains(&c)
}
299
/// Whether byte `c` needs a preceding backslash when written inside double
/// quotes: line feed, carriage return, `"` and backslash.
fn must_escape_quoted(c: u8) -> bool {
    const ESCAPED: &[u8] = b"\n\r\"\\";
    ESCAPED.contains(&c)
}
303
304fn write_escaped_unquoted(output: &mut String, s: &str) {
305    write_escaped_impl(must_escape_unquoted, |_output| (), output, s);
306}
307
308fn write_escaped_quoted(output: &mut String, s: &str) {
309    write_escaped_impl(must_escape_quoted, |output| output.push(b'"'), output, s)
310}
311
312enum Connection {
313    Direct(Socket),
314    Tls(Box<StreamOwned<ClientConnection, Socket>>),
315}
316
317impl Connection {
318    fn send_key_id(&mut self, key_id: &str) -> Result<()> {
319        writeln!(self, "{}", key_id)
320            .map_err(|io_err| map_io_to_socket_err("Failed to send key_id: ", io_err))?;
321        Ok(())
322    }
323
324    fn read_challenge(&mut self) -> Result<Vec<u8>> {
325        let mut buf = Vec::new();
326        let mut reader = BufReader::new(self);
327        reader.read_until(b'\n', &mut buf).map_err(|io_err| {
328            map_io_to_socket_err(
329                "Failed to read authentication challenge (timed out?): ",
330                io_err,
331            )
332        })?;
333        if buf.last().copied().unwrap_or(b'\0') != b'\n' {
334            return Err(if buf.is_empty() {
335                error::fmt!(
336                    AuthError,
337                    concat!(
338                        "Did not receive auth challenge. ",
339                        "Is the database configured to require ",
340                        "authentication?"
341                    )
342                )
343            } else {
344                error::fmt!(AuthError, "Received incomplete auth challenge: {:?}", buf)
345            });
346        }
347        buf.pop(); // b'\n'
348        Ok(buf)
349    }
350
351    fn authenticate(&mut self, auth: &EcdsaAuthParams) -> Result<()> {
352        if auth.key_id.contains('\n') {
353            return Err(error::fmt!(
354                AuthError,
355                "Bad key id {:?}: Should not contain new-line char.",
356                auth.key_id
357            ));
358        }
359        let key_pair = parse_key_pair(auth)?;
360        self.send_key_id(auth.key_id.as_str())?;
361        let challenge = self.read_challenge()?;
362        let rng = SystemRandom::new();
363        let signature = key_pair
364            .sign(&rng, &challenge[..])
365            .map_err(|unspecified_err| {
366                error::fmt!(AuthError, "Failed to sign challenge: {}", unspecified_err)
367            })?;
368        let mut encoded_sig = Base64::encode_string(signature.as_ref());
369        encoded_sig.push('\n');
370        let buf = encoded_sig.as_bytes();
371        if let Err(io_err) = self.write_all(buf) {
372            return Err(map_io_to_socket_err(
373                "Could not send signed challenge: ",
374                io_err,
375            ));
376        }
377        Ok(())
378    }
379}
380
381enum ProtocolHandler {
382    Socket(Connection),
383
384    #[cfg(feature = "ilp-over-http")]
385    Http(HttpHandlerState),
386}
387
388impl io::Read for Connection {
389    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
390        match self {
391            Self::Direct(sock) => sock.read(buf),
392            Self::Tls(stream) => stream.read(buf),
393        }
394    }
395}
396
397impl io::Write for Connection {
398    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
399        match self {
400            Self::Direct(sock) => sock.write(buf),
401            Self::Tls(stream) => stream.write(buf),
402        }
403    }
404
405    fn flush(&mut self) -> io::Result<()> {
406        match self {
407            Self::Direct(sock) => sock.flush(),
408            Self::Tls(stream) => stream.flush(),
409        }
410    }
411}
412
413#[derive(Debug, Copy, Clone, PartialEq)]
414enum OpCase {
415    Init = Op::Table as isize,
416    TableWritten = Op::Symbol as isize | Op::Column as isize,
417    SymbolWritten = Op::Symbol as isize | Op::Column as isize | Op::At as isize,
418    ColumnWritten = Op::Column as isize | Op::At as isize,
419    MayFlushOrTable = Op::Flush as isize | Op::Table as isize,
420}
421
422impl OpCase {
423    fn next_op_descr(self) -> &'static str {
424        match self {
425            OpCase::Init => "should have called `table` instead",
426            OpCase::TableWritten => "should have called `symbol` or `column` instead",
427            OpCase::SymbolWritten => "should have called `symbol`, `column` or `at` instead",
428            OpCase::ColumnWritten => "should have called `column` or `at` instead",
429            OpCase::MayFlushOrTable => "should have called `flush` or `table` instead",
430        }
431    }
432}
433
434// IMPORTANT: This struct MUST remain `Copy` to ensure that
435// there are no heap allocations when performing marker operations.
436#[derive(Debug, Clone, Copy)]
437struct BufferState {
438    op_case: OpCase,
439    row_count: usize,
440    first_table_len: Option<NonZeroUsize>,
441    transactional: bool,
442}
443
444impl BufferState {
445    fn new() -> Self {
446        Self {
447            op_case: OpCase::Init,
448            row_count: 0,
449            first_table_len: None,
450            transactional: true,
451        }
452    }
453}
454
455/// A reusable buffer to prepare a batch of ILP messages.
456///
457/// # Example
458///
459/// ```
460/// # use questdb::Result;
461/// use questdb::ingress::{Buffer, TimestampMicros, TimestampNanos};
462///
463/// # fn main() -> Result<()> {
464/// let mut buffer = Buffer::new();
465///
466/// // first row
467/// buffer
468///     .table("table1")?
469///     .symbol("bar", "baz")?
470///     .column_bool("a", false)?
471///     .column_i64("b", 42)?
472///     .column_f64("c", 3.14)?
473///     .column_str("d", "hello")?
474///     .column_ts("e", TimestampMicros::now())?
475///     .at(TimestampNanos::now())?;
476///
477/// // second row
478/// buffer
479///     .table("table2")?
480///     .symbol("foo", "bar")?
481///     .at(TimestampNanos::now())?;
482/// # Ok(())
483/// # }
484/// ```
485///
486/// Send the buffer to QuestDB using [`sender.flush(&mut buffer)`](Sender::flush).
487///
488/// # Sequential Coupling
489/// The Buffer API is sequentially coupled:
490///   * A row always starts with [`table`](Buffer::table).
491///   * A row must contain at least one [`symbol`](Buffer::symbol) or
492///     column (
493///     [`column_bool`](Buffer::column_bool),
494///     [`column_i64`](Buffer::column_i64),
495///     [`column_f64`](Buffer::column_f64),
496///     [`column_str`](Buffer::column_str),
497///     [`column_ts`](Buffer::column_ts)).
498///   * Symbols must appear before columns.
499///   * A row must be terminated with either [`at`](Buffer::at) or
500///     [`at_now`](Buffer::at_now).
501///
502/// This diagram visualizes the sequence:
503///
504/// <img src="https://raw.githubusercontent.com/questdb/c-questdb-client/main/api_seq/seq.svg">
505///
506/// # Buffer method calls, Serialized ILP types and QuestDB types
507///
508/// | Buffer Method | Serialized as ILP type (Click on link to see possible casts) |
509/// |---------------|--------------------------------------------------------------|
510/// | [`symbol`](Buffer::symbol) | [`SYMBOL`](https://questdb.io/docs/concept/symbol/) |
511/// | [`column_bool`](Buffer::column_bool) | [`BOOLEAN`](https://questdb.io/docs/reference/api/ilp/columnset-types#boolean) |
512/// | [`column_i64`](Buffer::column_i64) | [`INTEGER`](https://questdb.io/docs/reference/api/ilp/columnset-types#integer) |
513/// | [`column_f64`](Buffer::column_f64) | [`FLOAT`](https://questdb.io/docs/reference/api/ilp/columnset-types#float) |
514/// | [`column_str`](Buffer::column_str) | [`STRING`](https://questdb.io/docs/reference/api/ilp/columnset-types#string) |
515/// | [`column_ts`](Buffer::column_ts) | [`TIMESTAMP`](https://questdb.io/docs/reference/api/ilp/columnset-types#timestamp) |
516///
517/// QuestDB supports both `STRING` and `SYMBOL` column types.
518///
519/// To understand the difference, refer to the
520/// [QuestDB documentation](https://questdb.io/docs/concept/symbol/). In a nutshell,
521/// symbols are interned strings, most suitable for identifiers that are repeated many
522/// times throughout the column. They offer an advantage in storage space and query
523/// performance.
524///
525/// # Inserting NULL values
526///
527/// To insert a NULL value, skip the symbol or column for that row.
528///
529/// # Recovering from validation errors
530///
531/// If you want to recover from potential validation errors, call
532/// [`buffer.set_marker()`](Buffer::set_marker) to track the last known good state,
533/// append as many rows or parts of rows as you like, and then call
534/// [`buffer.clear_marker()`](Buffer::clear_marker) on success.
535///
536/// If there was an error in one of the rows, use
537/// [`buffer.rewind_to_marker()`](Buffer::rewind_to_marker) to go back to the
538/// marked last known good state.
539///
540#[derive(Debug, Clone)]
541pub struct Buffer {
542    output: String,
543    state: BufferState,
544    marker: Option<(usize, BufferState)>,
545    max_name_len: usize,
546}
547
548impl Buffer {
549    /// Construct a `Buffer` with a `max_name_len` of `127`, which is the same as the
550    /// QuestDB server default.
551    pub fn new() -> Self {
552        Self {
553            output: String::new(),
554            state: BufferState::new(),
555            marker: None,
556            max_name_len: 127,
557        }
558    }
559
560    /// Construct a `Buffer` with a custom maximum length for table and column names.
561    ///
562    /// This should match the `cairo.max.file.name.length` setting of the
563    /// QuestDB instance you're connecting to.
564    ///
565    /// If the server does not configure it, the default is `127` and you can simply
566    /// call [`new`](Buffer::new).
567    pub fn with_max_name_len(max_name_len: usize) -> Self {
568        let mut buf = Self::new();
569        buf.max_name_len = max_name_len;
570        buf
571    }
572
573    /// Pre-allocate to ensure the buffer has enough capacity for at least the
574    /// specified additional byte count. This may be rounded up.
575    /// This does not allocate if such additional capacity is already satisfied.
576    /// See: `capacity`.
577    pub fn reserve(&mut self, additional: usize) {
578        self.output.reserve(additional);
579    }
580
581    /// The number of bytes accumulated in the buffer.
582    pub fn len(&self) -> usize {
583        self.output.len()
584    }
585
586    /// The number of rows accumulated in the buffer.
587    pub fn row_count(&self) -> usize {
588        self.state.row_count
589    }
590
591    /// Tells whether the buffer is transactional. It is transactional iff it contains
592    /// data for at most one table. Additionally, you must send the buffer over HTTP to
593    /// get transactional behavior.
594    pub fn transactional(&self) -> bool {
595        self.state.transactional
596    }
597
598    pub fn is_empty(&self) -> bool {
599        self.output.is_empty()
600    }
601
602    /// The total number of bytes the buffer can hold before it needs to resize.
603    pub fn capacity(&self) -> usize {
604        self.output.capacity()
605    }
606
607    /// A string representation of the buffer's contents. Useful for debugging.
608    pub fn as_str(&self) -> &str {
609        &self.output
610    }
611
612    /// Mark a rewind point.
613    /// This allows undoing accumulated changes to the buffer for one or more
614    /// rows by calling [`rewind_to_marker`](Buffer::rewind_to_marker).
615    /// Any previous marker will be discarded.
616    /// Once the marker is no longer needed, call
617    /// [`clear_marker`](Buffer::clear_marker).
618    pub fn set_marker(&mut self) -> Result<()> {
619        if (self.state.op_case as isize & Op::Table as isize) == 0 {
620            return Err(error::fmt!(
621                InvalidApiCall,
622                concat!(
623                    "Can't set the marker whilst constructing a line. ",
624                    "A marker may only be set on an empty buffer or after ",
625                    "`at` or `at_now` is called."
626                )
627            ));
628        }
629        self.marker = Some((self.output.len(), self.state));
630        Ok(())
631    }
632
633    /// Undo all changes since the last [`set_marker`](Buffer::set_marker)
634    /// call.
635    ///
636    /// As a side-effect, this also clears the marker.
637    pub fn rewind_to_marker(&mut self) -> Result<()> {
638        if let Some((position, state)) = self.marker.take() {
639            self.output.truncate(position);
640            self.state = state;
641            Ok(())
642        } else {
643            Err(error::fmt!(
644                InvalidApiCall,
645                "Can't rewind to the marker: No marker set."
646            ))
647        }
648    }
649
650    /// Discard any marker as may have been set by
651    /// [`set_marker`](Buffer::set_marker).
652    ///
653    /// Idempotent.
654    pub fn clear_marker(&mut self) {
655        self.marker = None;
656    }
657
658    /// Reset the buffer and clear contents whilst retaining
659    /// [`capacity`](Buffer::capacity).
660    pub fn clear(&mut self) {
661        self.output.clear();
662        self.state = BufferState::new();
663        self.marker = None;
664    }
665
666    /// Check if the next API operation is allowed as per the OP case state machine.
667    #[inline(always)]
668    fn check_op(&self, op: Op) -> Result<()> {
669        if (self.state.op_case as isize & op as isize) > 0 {
670            Ok(())
671        } else {
672            Err(error::fmt!(
673                InvalidApiCall,
674                "State error: Bad call to `{}`, {}.",
675                op.descr(),
676                self.state.op_case.next_op_descr()
677            ))
678        }
679    }
680
681    #[inline(always)]
682    fn validate_max_name_len(&self, name: &str) -> Result<()> {
683        if name.len() > self.max_name_len {
684            return Err(error::fmt!(
685                InvalidName,
686                "Bad name: {:?}: Too long (max {} characters)",
687                name,
688                self.max_name_len
689            ));
690        }
691        Ok(())
692    }
693
694    /// Begin recording a new row for the given table.
695    ///
696    /// ```
697    /// # use questdb::Result;
698    /// # use questdb::ingress::Buffer;
699    /// # fn main() -> Result<()> {
700    /// # let mut buffer = Buffer::new();
701    /// buffer.table("table_name")?;
702    /// # Ok(())
703    /// # }
704    /// ```
705    ///
706    /// or
707    ///
708    /// ```
709    /// # use questdb::Result;
710    /// # use questdb::ingress::Buffer;
711    /// use questdb::ingress::TableName;
712    ///
713    /// # fn main() -> Result<()> {
714    /// # let mut buffer = Buffer::new();
715    /// let table_name = TableName::new("table_name")?;
716    /// buffer.table(table_name)?;
717    /// # Ok(())
718    /// # }
719    /// ```
720    pub fn table<'a, N>(&mut self, name: N) -> Result<&mut Self>
721    where
722        N: TryInto<TableName<'a>>,
723        Error: From<N::Error>,
724    {
725        let name: TableName<'a> = name.try_into()?;
726        self.validate_max_name_len(name.name)?;
727        self.check_op(Op::Table)?;
728        let table_begin = self.output.len();
729        write_escaped_unquoted(&mut self.output, name.name);
730        let table_end = self.output.len();
731        self.state.op_case = OpCase::TableWritten;
732
733        // A buffer stops being transactional if it targets multiple tables.
734        if let Some(first_table_len) = &self.state.first_table_len {
735            let first_table = &self.output[0..(first_table_len.get())];
736            let this_table = &self.output[table_begin..table_end];
737            if first_table != this_table {
738                self.state.transactional = false;
739            }
740        } else {
741            debug_assert!(table_begin == 0);
742
743            // This is a bit confusing, so worth explaining:
744            // `NonZeroUsize::new(table_end)` will return `None` if `table_end` is 0,
745            // but we know that `table_end` is never 0 here, we just need an option type
746            // anyway, so we don't bother unwrapping it to then wrap it again.
747            let first_table_len = NonZeroUsize::new(table_end);
748
749            // Instead we just assert that it's `Some`.
750            debug_assert!(first_table_len.is_some());
751
752            self.state.first_table_len = first_table_len;
753        }
754        Ok(self)
755    }
756
757    /// Record a symbol for the given column.
758    /// Make sure you record all symbol columns before any other column type.
759    ///
760    /// ```
761    /// # use questdb::Result;
762    /// # use questdb::ingress::Buffer;
763    /// # fn main() -> Result<()> {
764    /// # let mut buffer = Buffer::new();
765    /// # buffer.table("x")?;
766    /// buffer.symbol("col_name", "value")?;
767    /// # Ok(())
768    /// # }
769    /// ```
770    ///
771    /// or
772    ///
773    /// ```
774    /// # use questdb::Result;
775    /// # use questdb::ingress::Buffer;
776    /// # fn main() -> Result<()> {
777    /// # let mut buffer = Buffer::new();
778    /// # buffer.table("x")?;
779    /// let value: String = "value".to_owned();
780    /// buffer.symbol("col_name", value)?;
781    /// # Ok(())
782    /// # }
783    /// ```
784    ///
785    /// or
786    ///
787    /// ```
788    /// # use questdb::Result;
789    /// # use questdb::ingress::Buffer;
790    /// use questdb::ingress::ColumnName;
791    ///
792    /// # fn main() -> Result<()> {
793    /// # let mut buffer = Buffer::new();
794    /// # buffer.table("x")?;
795    /// let col_name = ColumnName::new("col_name")?;
796    /// buffer.symbol(col_name, "value")?;
797    /// # Ok(())
798    /// # }
799    /// ```
800    ///
801    pub fn symbol<'a, N, S>(&mut self, name: N, value: S) -> Result<&mut Self>
802    where
803        N: TryInto<ColumnName<'a>>,
804        S: AsRef<str>,
805        Error: From<N::Error>,
806    {
807        let name: ColumnName<'a> = name.try_into()?;
808        self.validate_max_name_len(name.name)?;
809        self.check_op(Op::Symbol)?;
810        self.output.push(',');
811        write_escaped_unquoted(&mut self.output, name.name);
812        self.output.push('=');
813        write_escaped_unquoted(&mut self.output, value.as_ref());
814        self.state.op_case = OpCase::SymbolWritten;
815        Ok(self)
816    }
817
818    fn write_column_key<'a, N>(&mut self, name: N) -> Result<&mut Self>
819    where
820        N: TryInto<ColumnName<'a>>,
821        Error: From<N::Error>,
822    {
823        let name: ColumnName<'a> = name.try_into()?;
824        self.validate_max_name_len(name.name)?;
825        self.check_op(Op::Column)?;
826        self.output
827            .push(if (self.state.op_case as isize & Op::Symbol as isize) > 0 {
828                ' '
829            } else {
830                ','
831            });
832        write_escaped_unquoted(&mut self.output, name.name);
833        self.output.push('=');
834        self.state.op_case = OpCase::ColumnWritten;
835        Ok(self)
836    }
837
838    /// Record a boolean value for the given column.
839    ///
840    /// ```
841    /// # use questdb::Result;
842    /// # use questdb::ingress::Buffer;
843    /// # fn main() -> Result<()> {
844    /// # let mut buffer = Buffer::new();
845    /// # buffer.table("x")?;
846    /// buffer.column_bool("col_name", true)?;
847    /// # Ok(())
848    /// # }
849    /// ```
850    ///
851    /// or
852    ///
853    /// ```
854    /// # use questdb::Result;
855    /// # use questdb::ingress::Buffer;
856    /// use questdb::ingress::ColumnName;
857    ///
858    /// # fn main() -> Result<()> {
859    /// # let mut buffer = Buffer::new();
860    /// # buffer.table("x")?;
861    /// let col_name = ColumnName::new("col_name")?;
862    /// buffer.column_bool(col_name, true)?;
863    /// # Ok(())
864    /// # }
865    /// ```
866    pub fn column_bool<'a, N>(&mut self, name: N, value: bool) -> Result<&mut Self>
867    where
868        N: TryInto<ColumnName<'a>>,
869        Error: From<N::Error>,
870    {
871        self.write_column_key(name)?;
872        self.output.push(if value { 't' } else { 'f' });
873        Ok(self)
874    }
875
876    /// Record an integer value for the given column.
877    ///
878    /// ```
879    /// # use questdb::Result;
880    /// # use questdb::ingress::Buffer;
881    /// # fn main() -> Result<()> {
882    /// # let mut buffer = Buffer::new();
883    /// # buffer.table("x")?;
884    /// buffer.column_i64("col_name", 42)?;
885    /// # Ok(())
886    /// # }
887    /// ```
888    ///
889    /// or
890    ///
891    /// ```
892    /// # use questdb::Result;
893    /// # use questdb::ingress::Buffer;
894    /// use questdb::ingress::ColumnName;
895    ///
896    /// # fn main() -> Result<()> {
897    /// # let mut buffer = Buffer::new();
898    /// # buffer.table("x")?;
899    /// let col_name = ColumnName::new("col_name")?;
900    /// buffer.column_i64(col_name, 42);
901    /// # Ok(())
902    /// # }
903    /// ```
904    pub fn column_i64<'a, N>(&mut self, name: N, value: i64) -> Result<&mut Self>
905    where
906        N: TryInto<ColumnName<'a>>,
907        Error: From<N::Error>,
908    {
909        self.write_column_key(name)?;
910        let mut buf = itoa::Buffer::new();
911        let printed = buf.format(value);
912        self.output.push_str(printed);
913        self.output.push('i');
914        Ok(self)
915    }
916
917    /// Record a floating point value for the given column.
918    ///
919    /// ```
920    /// # use questdb::Result;
921    /// # use questdb::ingress::Buffer;
922    /// # fn main() -> Result<()> {
923    /// # let mut buffer = Buffer::new();
924    /// # buffer.table("x")?;
925    /// buffer.column_f64("col_name", 3.14)?;
926    /// # Ok(())
927    /// # }
928    /// ```
929    ///
930    /// or
931    ///
932    /// ```
933    /// # use questdb::Result;
934    /// # use questdb::ingress::Buffer;
935    /// use questdb::ingress::ColumnName;
936    ///
937    /// # fn main() -> Result<()> {
938    /// # let mut buffer = Buffer::new();
939    /// # buffer.table("x")?;
940    /// let col_name = ColumnName::new("col_name")?;
941    /// buffer.column_f64(col_name, 3.14)?;
942    /// # Ok(())
943    /// # }
944    /// ```
945    pub fn column_f64<'a, N>(&mut self, name: N, value: f64) -> Result<&mut Self>
946    where
947        N: TryInto<ColumnName<'a>>,
948        Error: From<N::Error>,
949    {
950        self.write_column_key(name)?;
951        let mut ser = F64Serializer::new(value);
952        self.output.push_str(ser.as_str());
953        Ok(self)
954    }
955
956    /// Record a string value for the given column.
957    ///
958    /// ```
959    /// # use questdb::Result;
960    /// # use questdb::ingress::Buffer;
961    /// # fn main() -> Result<()> {
962    /// # let mut buffer = Buffer::new();
963    /// # buffer.table("x")?;
964    /// buffer.column_str("col_name", "value")?;
965    /// # Ok(())
966    /// # }
967    /// ```
968    ///
969    /// or
970    ///
971    /// ```
972    /// # use questdb::Result;
973    /// # use questdb::ingress::Buffer;
974    /// # fn main() -> Result<()> {
975    /// # let mut buffer = Buffer::new();
976    /// # buffer.table("x")?;
977    /// let value: String = "value".to_owned();
978    /// buffer.column_str("col_name", value)?;
979    /// # Ok(())
980    /// # }
981    /// ```
982    ///
983    /// or
984    ///
985    /// ```
986    /// # use questdb::Result;
987    /// # use questdb::ingress::Buffer;
988    /// use questdb::ingress::ColumnName;
989    ///
990    /// # fn main() -> Result<()> {
991    /// # let mut buffer = Buffer::new();
992    /// # buffer.table("x")?;
993    /// let col_name = ColumnName::new("col_name")?;
994    /// buffer.column_str(col_name, "value")?;
995    /// # Ok(())
996    /// # }
997    /// ```
998    pub fn column_str<'a, N, S>(&mut self, name: N, value: S) -> Result<&mut Self>
999    where
1000        N: TryInto<ColumnName<'a>>,
1001        S: AsRef<str>,
1002        Error: From<N::Error>,
1003    {
1004        self.write_column_key(name)?;
1005        write_escaped_quoted(&mut self.output, value.as_ref());
1006        Ok(self)
1007    }
1008
1009    /// Record a timestamp value for the given column.
1010    ///
1011    /// ```
1012    /// # use questdb::Result;
1013    /// # use questdb::ingress::Buffer;
1014    /// use questdb::ingress::TimestampMicros;
1015    /// # fn main() -> Result<()> {
1016    /// # let mut buffer = Buffer::new();
1017    /// # buffer.table("x")?;
1018    /// buffer.column_ts("col_name", TimestampMicros::now())?;
1019    /// # Ok(())
1020    /// # }
1021    /// ```
1022    ///
1023    /// or
1024    ///
1025    /// ```
1026    /// # use questdb::Result;
1027    /// # use questdb::ingress::Buffer;
1028    /// use questdb::ingress::TimestampMicros;
1029    ///
1030    /// # fn main() -> Result<()> {
1031    /// # let mut buffer = Buffer::new();
1032    /// # buffer.table("x")?;
1033    /// buffer.column_ts("col_name", TimestampMicros::new(1659548204354448))?;
1034    /// # Ok(())
1035    /// # }
1036    /// ```
1037    ///
1038    /// or
1039    ///
1040    /// ```
1041    /// # use questdb::Result;
1042    /// # use questdb::ingress::Buffer;
1043    /// use questdb::ingress::TimestampMicros;
1044    /// use questdb::ingress::ColumnName;
1045    ///
1046    /// # fn main() -> Result<()> {
1047    /// # let mut buffer = Buffer::new();
1048    /// # buffer.table("x")?;
1049    /// let col_name = ColumnName::new("col_name")?;
1050    /// buffer.column_ts(col_name, TimestampMicros::now())?;
1051    /// # Ok(())
1052    /// # }
1053    /// ```
1054    ///
1055    /// or you can also pass in a `TimestampNanos`.
1056    ///
1057    /// Note that both `TimestampMicros` and `TimestampNanos` can be constructed
1058    /// easily from either `chrono::DateTime` and `std::time::SystemTime`.
1059    ///
1060    /// This last option requires the `chrono_timestamp` feature.
1061    pub fn column_ts<'a, N, T>(&mut self, name: N, value: T) -> Result<&mut Self>
1062    where
1063        N: TryInto<ColumnName<'a>>,
1064        T: TryInto<Timestamp>,
1065        Error: From<N::Error>,
1066        Error: From<T::Error>,
1067    {
1068        self.write_column_key(name)?;
1069        let timestamp: Timestamp = value.try_into()?;
1070        let timestamp: TimestampMicros = timestamp.try_into()?;
1071        let mut buf = itoa::Buffer::new();
1072        let printed = buf.format(timestamp.as_i64());
1073        self.output.push_str(printed);
1074        self.output.push('t');
1075        Ok(self)
1076    }
1077
1078    /// Complete the current row with the designated timestamp. After this call, you can
1079    /// start recording the next row by calling [Buffer::table] again, or  you can send
1080    /// the accumulated batch by calling [Sender::flush] or one of its variants.
1081    ///
1082    /// ```
1083    /// # use questdb::Result;
1084    /// # use questdb::ingress::Buffer;
1085    /// use questdb::ingress::TimestampNanos;
1086    /// # fn main() -> Result<()> {
1087    /// # let mut buffer = Buffer::new();
1088    /// # buffer.table("x")?.symbol("a", "b")?;
1089    /// buffer.at(TimestampNanos::now())?;
1090    /// # Ok(())
1091    /// # }
1092    /// ```
1093    ///
1094    /// or
1095    ///
1096    /// ```
1097    /// # use questdb::Result;
1098    /// # use questdb::ingress::Buffer;
1099    /// use questdb::ingress::TimestampNanos;
1100    ///
1101    /// # fn main() -> Result<()> {
1102    /// # let mut buffer = Buffer::new();
1103    /// # buffer.table("x")?.symbol("a", "b")?;
1104    /// buffer.at(TimestampNanos::new(1659548315647406592))?;
1105    /// # Ok(())
1106    /// # }
1107    /// ```
1108    ///
1109    /// You can also pass in a `TimestampMicros`.
1110    ///
1111    /// Note that both `TimestampMicros` and `TimestampNanos` can be constructed
1112    /// easily from either `chrono::DateTime` and `std::time::SystemTime`.
1113    ///
1114    pub fn at<T>(&mut self, timestamp: T) -> Result<()>
1115    where
1116        T: TryInto<Timestamp>,
1117        Error: From<T::Error>,
1118    {
1119        self.check_op(Op::At)?;
1120        let timestamp: Timestamp = timestamp.try_into()?;
1121
1122        // https://github.com/rust-lang/rust/issues/115880
1123        let timestamp: Result<TimestampNanos> = timestamp.try_into();
1124        let timestamp: TimestampNanos = timestamp?;
1125
1126        let epoch_nanos = timestamp.as_i64();
1127        if epoch_nanos < 0 {
1128            return Err(error::fmt!(
1129                InvalidTimestamp,
1130                "Timestamp {} is negative. It must be >= 0.",
1131                epoch_nanos
1132            ));
1133        }
1134        let mut buf = itoa::Buffer::new();
1135        let printed = buf.format(epoch_nanos);
1136        self.output.push(' ');
1137        self.output.push_str(printed);
1138        self.output.push('\n');
1139        self.state.op_case = OpCase::MayFlushOrTable;
1140        self.state.row_count += 1;
1141        Ok(())
1142    }
1143
1144    /// Complete the current row without providing a timestamp. The QuestDB instance
1145    /// will insert its own timestamp.
1146    ///
1147    /// Letting the server assign the timestamp can be faster since it reliably avoids
1148    /// out-of-order operations in the database for maximum ingestion throughput. However,
1149    /// it removes the ability to deduplicate rows.
1150    ///
1151    /// This is NOT equivalent to calling [Buffer::at] with the current time: the QuestDB
1152    /// server will set the timestamp only after receiving the row. If you're flushing
1153    /// infrequently, the server-assigned timestamp may be significantly behind the
1154    /// time the data was recorded in the buffer.
1155    ///
1156    /// In almost all cases, you should prefer the [Buffer::at] function.
1157    ///
1158    /// After this call, you can start recording the next row by calling [Buffer::table]
1159    /// again, or you can send the accumulated batch by calling [Sender::flush] or one of
1160    /// its variants.
1161    ///
1162    /// ```
1163    /// # use questdb::Result;
1164    /// # use questdb::ingress::Buffer;
1165    /// # fn main() -> Result<()> {
1166    /// # let mut buffer = Buffer::new();
1167    /// # buffer.table("x")?.symbol("a", "b")?;
1168    /// buffer.at_now()?;
1169    /// # Ok(())
1170    /// # }
1171    /// ```
1172    pub fn at_now(&mut self) -> Result<()> {
1173        self.check_op(Op::At)?;
1174        self.output.push('\n');
1175        self.state.op_case = OpCase::MayFlushOrTable;
1176        self.state.row_count += 1;
1177        Ok(())
1178    }
1179}
1180
1181impl Default for Buffer {
1182    fn default() -> Self {
1183        Self::new()
1184    }
1185}
1186
/// Connects to a QuestDB instance and inserts data via the ILP protocol.
///
/// * To construct an instance, use [`Sender::from_conf`] or the [`SenderBuilder`].
/// * To prepare messages, use [`Buffer`] objects.
/// * To send messages, call the [`flush`](Sender::flush) method.
pub struct Sender {
    // Text written verbatim by the `Debug` implementation.
    descr: String,
    // NOTE(review): presumably the protocol-specific transport state — confirm
    // against `ProtocolHandler`.
    handler: ProtocolHandler,
    // NOTE(review): presumably cleared once the connection is considered
    // unusable — confirm against the flush logic.
    connected: bool,
    // NOTE(review): presumably the upper bound, in bytes, on a buffer that may
    // be flushed — confirm against the flush logic.
    max_buf_size: usize,
}
1198
1199impl std::fmt::Debug for Sender {
1200    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
1201        f.write_str(self.descr.as_str())
1202    }
1203}
1204
/// Credentials for the ECDSA variant of [`AuthParams`], all held as strings.
///
/// NOTE(review): the fields presumably carry the key identifier, the private
/// key and the X/Y coordinates of the public key in their textual (encoded)
/// form — confirm against the code that consumes this struct.
#[derive(PartialEq, Debug, Clone)]
struct EcdsaAuthParams {
    key_id: String,
    priv_key: String,
    pub_key_x: String,
    pub_key_y: String,
}
1212
/// The authentication scheme selected for a sender, with its credentials.
///
/// The `Basic` and `Token` variants only exist when the `ilp-over-http`
/// feature is enabled.
#[derive(PartialEq, Debug, Clone)]
enum AuthParams {
    /// ECDSA key-based credentials.
    Ecdsa(EcdsaAuthParams),

    /// Credentials held in a `BasicAuthParams`.
    #[cfg(feature = "ilp-over-http")]
    Basic(BasicAuthParams),

    /// Credentials held in a `TokenAuthParams`.
    #[cfg(feature = "ilp-over-http")]
    Token(TokenAuthParams),
}
1223
/// Possible sources of the root certificates used to validate the server's TLS
/// certificate.
///
/// Which variants exist depends on the enabled crate features. Only
/// [`CertificateAuthority::PemFile`] is always available.
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum CertificateAuthority {
    /// Use the root certificates provided by the
    /// [`webpki-roots`](https://crates.io/crates/webpki-roots) crate.
    ///
    /// The `tls_roots` setting must be unset with this option.
    #[cfg(feature = "tls-webpki-certs")]
    WebpkiRoots,

    /// Use the root certificates provided by the OS.
    ///
    /// The `tls_roots` setting must be unset with this option.
    #[cfg(feature = "tls-native-certs")]
    OsRoots,

    /// Combine the root certificates provided by the OS and the `webpki-roots` crate.
    ///
    /// The `tls_roots` setting must be unset with this option.
    #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
    WebpkiAndOsRoots,

    /// Use the root certificates provided in a PEM-encoded file.
    ///
    /// The `tls_roots` setting is required with this option and names the file.
    PemFile,
}
1244
/// Either a numeric `u16` port or the name of a port service, as registered
/// in `/etc/services` or its equivalent.
///
/// ```
/// use questdb::ingress::Port;
/// use std::convert::Into;
///
/// let service: Port = 9009.into();
/// ```
///
/// or
///
/// ```
/// use questdb::ingress::Port;
/// use std::convert::Into;
///
/// // Assuming the service name is registered.
/// let service: Port = "qdb_ilp".into();  // or with a String too.
/// ```
pub struct Port(String);

/// Wraps an owned string as-is; every other conversion funnels through here.
impl From<String> for Port {
    fn from(s: String) -> Self {
        Self(s)
    }
}

/// Copies the borrowed text into an owned `Port`.
impl From<&str> for Port {
    fn from(s: &str) -> Self {
        Port::from(s.to_owned())
    }
}

/// Stores the decimal rendering of the port number.
impl From<u16> for Port {
    fn from(p: u16) -> Self {
        Port::from(p.to_string())
    }
}
1283
/// Certificate "verification" that accepts everything.
///
/// Only compiled with the `insecure-skip-verify` feature. Every check in here
/// unconditionally reports success, so a connection using this verifier gets
/// no assurance about the identity of its peer.
#[cfg(feature = "insecure-skip-verify")]
mod danger {
    use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
    use rustls::{DigitallySignedStruct, Error, SignatureScheme};
    use rustls_pki_types::{CertificateDer, ServerName, UnixTime};

    /// A [`ServerCertVerifier`] that never rejects a certificate or signature.
    #[derive(Debug)]
    pub struct NoCertificateVerification {}

    impl ServerCertVerifier for NoCertificateVerification {
        /// Accepts any server certificate without inspecting it.
        fn verify_server_cert(
            &self,
            _end_entity: &CertificateDer<'_>,
            _intermediates: &[CertificateDer<'_>],
            _server_name: &ServerName<'_>,
            _ocsp_response: &[u8],
            _now: UnixTime,
        ) -> Result<ServerCertVerified, Error> {
            let accepted = ServerCertVerified::assertion();
            Ok(accepted)
        }

        /// Accepts any TLS 1.2 handshake signature without inspecting it.
        fn verify_tls12_signature(
            &self,
            _message: &[u8],
            _cert: &CertificateDer<'_>,
            _dss: &DigitallySignedStruct,
        ) -> Result<HandshakeSignatureValid, Error> {
            let accepted = HandshakeSignatureValid::assertion();
            Ok(accepted)
        }

        /// Accepts any TLS 1.3 handshake signature without inspecting it.
        fn verify_tls13_signature(
            &self,
            _message: &[u8],
            _cert: &CertificateDer<'_>,
            _dss: &DigitallySignedStruct,
        ) -> Result<HandshakeSignatureValid, Error> {
            let accepted = HandshakeSignatureValid::assertion();
            Ok(accepted)
        }

        /// Advertises the signature schemes of the default `ring` provider.
        fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
            let provider = rustls::crypto::ring::default_provider();
            provider
                .signature_verification_algorithms
                .supported_schemes()
        }
    }
}
1330
/// Append a copy of every trust anchor bundled in the `webpki-roots` crate to
/// `root_store`.
#[cfg(feature = "tls-webpki-certs")]
fn add_webpki_roots(root_store: &mut RootCertStore) {
    let bundled_anchors = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
    root_store.roots.extend(bundled_anchors)
}
1337
/// Load the operating system's native root certificates into `root_store`.
///
/// Fails with a `TlsError` when the certificates cannot be loaded, or when
/// certificates were found but none of them could be parsed.
#[cfg(feature = "tls-native-certs")]
fn add_os_roots(root_store: &mut RootCertStore) -> Result<()> {
    let os_certs = match rustls_native_certs::load_native_certs() {
        Ok(certs) => certs,
        Err(io_err) => {
            return Err(error::fmt!(
                TlsError,
                "Could not load OS native TLS certificates: {}",
                io_err
            ));
        }
    };

    // The store reports how many certificates it accepted and how many it
    // rejected; only "nothing accepted, something rejected" is an error.
    match root_store.add_parsable_certificates(os_certs) {
        (0, invalid_count) if invalid_count > 0 => Err(error::fmt!(
            TlsError,
            "No valid certificates found in native root store ({} found but were invalid)",
            invalid_count
        )),
        _ => Ok(()),
    }
}
1358
1359fn configure_tls(
1360    tls_enabled: bool,
1361    tls_verify: bool,
1362    tls_ca: CertificateAuthority,
1363    tls_roots: &Option<PathBuf>,
1364) -> Result<Option<Arc<rustls::ClientConfig>>> {
1365    if !tls_enabled {
1366        return Ok(None);
1367    }
1368
1369    let mut root_store = RootCertStore::empty();
1370    if tls_verify {
1371        match (tls_ca, tls_roots) {
1372            #[cfg(feature = "tls-webpki-certs")]
1373            (CertificateAuthority::WebpkiRoots, None) => {
1374                add_webpki_roots(&mut root_store);
1375            }
1376
1377            #[cfg(feature = "tls-webpki-certs")]
1378            (CertificateAuthority::WebpkiRoots, Some(_)) => {
1379                return Err(error::fmt!(ConfigError, "Config parameter \"tls_roots\" must be unset when \"tls_ca\" is set to \"webpki_roots\"."));
1380            }
1381
1382            #[cfg(feature = "tls-native-certs")]
1383            (CertificateAuthority::OsRoots, None) => {
1384                add_os_roots(&mut root_store)?;
1385            }
1386
1387            #[cfg(feature = "tls-native-certs")]
1388            (CertificateAuthority::OsRoots, Some(_)) => {
1389                return Err(error::fmt!(ConfigError, "Config parameter \"tls_roots\" must be unset when \"tls_ca\" is set to \"os_roots\"."));
1390            }
1391
1392            #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
1393            (CertificateAuthority::WebpkiAndOsRoots, None) => {
1394                add_webpki_roots(&mut root_store);
1395                add_os_roots(&mut root_store)?;
1396            }
1397
1398            #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
1399            (CertificateAuthority::WebpkiAndOsRoots, Some(_)) => {
1400                return Err(error::fmt!(ConfigError, "Config parameter \"tls_roots\" must be unset when \"tls_ca\" is set to \"webpki_and_os_roots\"."));
1401            }
1402
1403            (CertificateAuthority::PemFile, Some(ca_file)) => {
1404                let certfile = std::fs::File::open(ca_file).map_err(|io_err| {
1405                    error::fmt!(
1406                        TlsError,
1407                        concat!(
1408                            "Could not open tls_roots certificate authority ",
1409                            "file from path {:?}: {}"
1410                        ),
1411                        ca_file,
1412                        io_err
1413                    )
1414                })?;
1415                let mut reader = BufReader::new(certfile);
1416                let der_certs = rustls_pemfile::certs(&mut reader)
1417                    .collect::<std::result::Result<Vec<_>, _>>()
1418                    .map_err(|io_err| {
1419                        error::fmt!(
1420                            TlsError,
1421                            concat!(
1422                                "Could not read certificate authority ",
1423                                "file from path {:?}: {}"
1424                            ),
1425                            ca_file,
1426                            io_err
1427                        )
1428                    })?;
1429                root_store.add_parsable_certificates(der_certs);
1430            }
1431
1432            (CertificateAuthority::PemFile, None) => {
1433                return Err(error::fmt!(ConfigError, "Config parameter \"tls_roots\" is required when \"tls_ca\" is set to \"pem_file\"."));
1434            }
1435        }
1436    }
1437
1438    let mut config = rustls::ClientConfig::builder()
1439        .with_root_certificates(root_store)
1440        .with_no_client_auth();
1441
1442    // TLS log file for debugging.
1443    // Set the SSLKEYLOGFILE env variable to a writable location.
1444    config.key_log = Arc::new(rustls::KeyLogFile::new());
1445
1446    #[cfg(feature = "insecure-skip-verify")]
1447    if !tls_verify {
1448        config
1449            .dangerous()
1450            .set_certificate_verifier(Arc::new(danger::NoCertificateVerification {}));
1451    }
1452
1453    Ok(Some(Arc::new(config)))
1454}
1455
1456fn validate_auto_flush_params(params: &HashMap<String, String>) -> Result<()> {
1457    if let Some(auto_flush) = params.get("auto_flush") {
1458        if auto_flush.as_str() != "off" {
1459            return Err(error::fmt!(
1460                ConfigError,
1461                "Invalid auto_flush value '{auto_flush}'. This client does not \
1462                support auto-flush, so the only accepted value is 'off'"
1463            ));
1464        }
1465    }
1466
1467    for &param in ["auto_flush_rows", "auto_flush_bytes"].iter() {
1468        if params.contains_key(param) {
1469            return Err(error::fmt!(
1470                ConfigError,
1471                "Invalid configuration parameter {:?}. This client does not support auto-flush",
1472                param
1473            ));
1474        }
1475    }
1476    Ok(())
1477}
1478
/// Protocol used to communicate with the QuestDB server.
///
/// In a configuration string the variants are spelled `tcp`, `tcps`, `http`
/// and `https`. The TCP variants default to port 9009 and the HTTP variants
/// to port 9000. The HTTP variants only exist when the `ilp-over-http`
/// feature is enabled.
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum Protocol {
    /// ILP over TCP (streaming).
    Tcp,

    /// TCP + TLS
    Tcps,

    #[cfg(feature = "ilp-over-http")]
    /// ILP over HTTP (request-response, InfluxDB-compatible).
    Http,

    #[cfg(feature = "ilp-over-http")]
    /// HTTP + TLS
    Https,
}
1496
1497impl Display for Protocol {
1498    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
1499        f.write_str(self.schema())
1500    }
1501}
1502
1503impl Protocol {
1504    fn default_port(&self) -> &str {
1505        match self {
1506            Protocol::Tcp | Protocol::Tcps => "9009",
1507            #[cfg(feature = "ilp-over-http")]
1508            Protocol::Http | Protocol::Https => "9000",
1509        }
1510    }
1511
1512    fn tls_enabled(&self) -> bool {
1513        match self {
1514            Protocol::Tcp => false,
1515            Protocol::Tcps => true,
1516            #[cfg(feature = "ilp-over-http")]
1517            Protocol::Http => false,
1518            #[cfg(feature = "ilp-over-http")]
1519            Protocol::Https => true,
1520        }
1521    }
1522
1523    fn is_tcpx(&self) -> bool {
1524        match self {
1525            Protocol::Tcp => true,
1526            Protocol::Tcps => true,
1527            #[cfg(feature = "ilp-over-http")]
1528            Protocol::Http => false,
1529            #[cfg(feature = "ilp-over-http")]
1530            Protocol::Https => false,
1531        }
1532    }
1533
1534    #[cfg(feature = "ilp-over-http")]
1535    fn is_httpx(&self) -> bool {
1536        match self {
1537            Protocol::Tcp => false,
1538            Protocol::Tcps => false,
1539            Protocol::Http => true,
1540            Protocol::Https => true,
1541        }
1542    }
1543
1544    fn schema(&self) -> &str {
1545        match self {
1546            Protocol::Tcp => "tcp",
1547            Protocol::Tcps => "tcps",
1548            #[cfg(feature = "ilp-over-http")]
1549            Protocol::Http => "http",
1550            #[cfg(feature = "ilp-over-http")]
1551            Protocol::Https => "https",
1552        }
1553    }
1554
1555    fn from_schema(schema: &str) -> Result<Self> {
1556        match schema {
1557            "tcp" => Ok(Protocol::Tcp),
1558            "tcps" => Ok(Protocol::Tcps),
1559            #[cfg(feature = "ilp-over-http")]
1560            "http" => Ok(Protocol::Http),
1561            #[cfg(feature = "ilp-over-http")]
1562            "https" => Ok(Protocol::Https),
1563            _ => Err(error::fmt!(ConfigError, "Unsupported protocol: {}", schema)),
1564        }
1565    }
1566}
1567
1568/// Accumulates parameters for a new `Sender` instance.
1569///
1570/// You can also create the builder from a config string or the `QDB_CLIENT_CONF`
1571/// environment variable.
1572///
1573#[cfg_attr(
1574    feature = "ilp-over-http",
1575    doc = r##"
1576```no_run
1577# use questdb::Result;
1578use questdb::ingress::{Protocol, SenderBuilder};
1579# fn main() -> Result<()> {
1580let mut sender = SenderBuilder::new(Protocol::Http, "localhost", 9009).build()?;
1581# Ok(())
1582# }
1583```
1584"##
1585)]
1586///
1587/// ```no_run
1588/// # use questdb::Result;
1589/// use questdb::ingress::{Protocol, SenderBuilder};
1590///
1591/// # fn main() -> Result<()> {
1592/// let mut sender = SenderBuilder::new(Protocol::Tcp, "localhost", 9009).build()?;
1593/// # Ok(())
1594/// # }
1595/// ```
1596///
1597/// ```no_run
1598/// # use questdb::Result;
1599/// use questdb::ingress::SenderBuilder;
1600///
1601/// # fn main() -> Result<()> {
1602/// let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1603/// # Ok(())
1604/// # }
1605/// ```
1606///
1607/// ```no_run
1608/// # use questdb::Result;
1609/// use questdb::ingress::SenderBuilder;
1610///
1611/// # fn main() -> Result<()> {
1612/// // export QDB_CLIENT_CONF="https::addr=localhost:9000;"
1613/// let mut sender = SenderBuilder::from_env()?.build()?;
1614/// # Ok(())
1615/// # }
1616/// ```
1617///
1618#[derive(Debug, Clone)]
1619pub struct SenderBuilder {
1620    protocol: Protocol,
1621    host: ConfigSetting<String>,
1622    port: ConfigSetting<String>,
1623    net_interface: ConfigSetting<Option<String>>,
1624    max_buf_size: ConfigSetting<usize>,
1625    auth_timeout: ConfigSetting<Duration>,
1626    username: ConfigSetting<Option<String>>,
1627    password: ConfigSetting<Option<String>>,
1628    token: ConfigSetting<Option<String>>,
1629    token_x: ConfigSetting<Option<String>>,
1630    token_y: ConfigSetting<Option<String>>,
1631
1632    #[cfg(feature = "insecure-skip-verify")]
1633    tls_verify: ConfigSetting<bool>,
1634
1635    tls_ca: ConfigSetting<CertificateAuthority>,
1636    tls_roots: ConfigSetting<Option<PathBuf>>,
1637
1638    #[cfg(feature = "ilp-over-http")]
1639    http: Option<HttpConfig>,
1640}
1641
1642impl SenderBuilder {
1643    /// Create a new `SenderBuilder` instance from the configuration string.
1644    ///
1645    /// The format of the string is: `"http::addr=host:port;key=value;...;"`.
1646    ///
1647    /// Instead of `"http"`, you can also specify `"https"`, `"tcp"`, and `"tcps"`.
1648    ///
1649    /// We recommend HTTP for most cases because it provides more features, like
1650    /// reporting errors to the client and supporting transaction control. TCP can
1651    /// sometimes be faster in higher-latency networks, but misses a number of
1652    /// features.
1653    ///
1654    /// The accepted keys match one-for-one with the methods on `SenderBuilder`.
1655    /// For example, this is a valid configuration string:
1656    ///
1657    /// "https::addr=host:port;username=alice;password=secret;"
1658    ///
1659    /// and there are matching methods [SenderBuilder::username] and
1660    /// [SenderBuilder::password]. The value of `addr=` is supplied directly to the
1661    /// `SenderBuilder` constructor, so there's no matching method for that.
1662    ///
1663    /// You can also load the configuration from an environment variable. See
1664    /// [`SenderBuilder::from_env`].
1665    ///
1666    /// Once you have a `SenderBuilder` instance, you can further customize it
1667    /// before calling [`SenderBuilder::build`], but you can't change any settings
1668    /// that are already set in the config string.
1669    pub fn from_conf<T: AsRef<str>>(conf: T) -> Result<Self> {
1670        let conf = conf.as_ref();
1671        let conf = questdb_confstr::parse_conf_str(conf)
1672            .map_err(|e| error::fmt!(ConfigError, "Config parse error: {}", e))?;
1673        let service = conf.service();
1674        let params = conf.params();
1675
1676        let protocol = Protocol::from_schema(service)?;
1677
1678        let Some(addr) = params.get("addr") else {
1679            return Err(error::fmt!(
1680                ConfigError,
1681                "Missing \"addr\" parameter in config string"
1682            ));
1683        };
1684        let (host, port) = match addr.split_once(':') {
1685            Some((h, p)) => (h, p),
1686            None => (addr.as_str(), protocol.default_port()),
1687        };
1688        let mut builder = SenderBuilder::new(protocol, host, port);
1689
1690        validate_auto_flush_params(params)?;
1691
1692        for (key, val) in params.iter().map(|(k, v)| (k.as_str(), v.as_str())) {
1693            builder = match key {
1694                "username" => builder.username(val)?,
1695                "password" => builder.password(val)?,
1696                "token" => builder.token(val)?,
1697                "token_x" => builder.token_x(val)?,
1698                "token_y" => builder.token_y(val)?,
1699                "bind_interface" => builder.bind_interface(val)?,
1700
1701                "init_buf_size" => {
1702                    return Err(error::fmt!(
1703                        ConfigError,
1704                        "\"init_buf_size\" is not supported in config string"
1705                    ))
1706                }
1707
1708                "max_buf_size" => builder.max_buf_size(parse_conf_value(key, val)?)?,
1709
1710                "auth_timeout" => {
1711                    builder.auth_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
1712                }
1713
1714                "tls_verify" => {
1715                    let verify = match val {
1716                        "on" => true,
1717                        "unsafe_off" => false,
1718                        _ => {
1719                            return Err(error::fmt!(
1720                                ConfigError,
1721                                r##"Config parameter "tls_verify" must be either "on" or "unsafe_off".'"##,
1722                            ))
1723                        }
1724                    };
1725
1726                    #[cfg(not(feature = "insecure-skip-verify"))]
1727                    {
1728                        if !verify {
1729                            return Err(error::fmt!(
1730                                ConfigError,
1731                                r##"The "insecure-skip-verify" feature is not enabled, so "tls_verify=unsafe_off" is not supported"##,
1732                            ));
1733                        }
1734                        builder
1735                    }
1736
1737                    #[cfg(feature = "insecure-skip-verify")]
1738                    builder.tls_verify(verify)?
1739                }
1740
1741                "tls_ca" => {
1742                    let ca = match val {
1743                        #[cfg(feature = "tls-webpki-certs")]
1744                        "webpki_roots" => CertificateAuthority::WebpkiRoots,
1745
1746                        #[cfg(not(feature = "tls-webpki-certs"))]
1747                        "webpki_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=webpki_roots\" requires the \"tls-webpki-certs\" feature")),
1748
1749                        #[cfg(feature = "tls-native-certs")]
1750                        "os_roots" => CertificateAuthority::OsRoots,
1751
1752                        #[cfg(not(feature = "tls-native-certs"))]
1753                        "os_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=os_roots\" requires the \"tls-native-certs\" feature")),
1754
1755                        #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
1756                        "webpki_and_os_roots" => CertificateAuthority::WebpkiAndOsRoots,
1757
1758                        #[cfg(not(all(feature = "tls-webpki-certs", feature = "tls-native-certs")))]
1759                        "webpki_and_os_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=webpki_and_os_roots\" requires both the \"tls-webpki-certs\" and \"tls-native-certs\" features")),
1760
1761                        _ => return Err(error::fmt!(ConfigError, "Invalid value {val:?} for \"tls_ca\"")),
1762                    };
1763                    builder.tls_ca(ca)?
1764                }
1765
1766                "tls_roots" => {
1767                    let path = PathBuf::from_str(val).map_err(|e| {
1768                        error::fmt!(
1769                            ConfigError,
1770                            "Invalid path {:?} for \"tls_roots\": {}",
1771                            val,
1772                            e
1773                        )
1774                    })?;
1775                    builder.tls_roots(path)?
1776                }
1777
1778                "tls_roots_password" => {
1779                    return Err(error::fmt!(
1780                        ConfigError,
1781                        "\"tls_roots_password\" is not supported."
1782                    ))
1783                }
1784
1785                #[cfg(feature = "ilp-over-http")]
1786                "request_min_throughput" => {
1787                    builder.request_min_throughput(parse_conf_value(key, val)?)?
1788                }
1789
1790                #[cfg(feature = "ilp-over-http")]
1791                "request_timeout" => {
1792                    builder.request_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
1793                }
1794
1795                #[cfg(feature = "ilp-over-http")]
1796                "retry_timeout" => {
1797                    builder.retry_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
1798                }
1799                // Ignore other parameters.
1800                // We don't want to fail on unknown keys as this would require releasing different
1801                // library implementations in lock step as soon as a new parameter is added to any of them,
1802                // even if it's not used.
1803                _ => builder,
1804            };
1805        }
1806
1807        Ok(builder)
1808    }
1809
1810    /// Create a new `SenderBuilder` instance from the configuration from the
1811    /// configuration stored in the `QDB_CLIENT_CONF` environment variable.
1812    ///
1813    /// The format of the string is the same as for [`SenderBuilder::from_conf`].
1814    pub fn from_env() -> Result<Self> {
1815        let conf = std::env::var("QDB_CLIENT_CONF").map_err(|_| {
1816            error::fmt!(ConfigError, "Environment variable QDB_CLIENT_CONF not set.")
1817        })?;
1818        Self::from_conf(conf)
1819    }
1820
1821    /// Create a new `SenderBuilder` instance with the provided QuestDB
1822    /// server and port, using ILP over the specified protocol.
1823    ///
1824    /// ```no_run
1825    /// # use questdb::Result;
1826    /// use questdb::ingress::{Protocol, SenderBuilder};
1827    ///
1828    /// # fn main() -> Result<()> {
1829    /// let mut sender = SenderBuilder::new(
1830    ///     Protocol::Tcp, "localhost", 9009).build()?;
1831    /// # Ok(())
1832    /// # }
1833    /// ```
1834    pub fn new<H: Into<String>, P: Into<Port>>(protocol: Protocol, host: H, port: P) -> Self {
1835        let host = host.into();
1836        let port: Port = port.into();
1837        let port = port.0;
1838
1839        #[cfg(feature = "tls-webpki-certs")]
1840        let tls_ca = CertificateAuthority::WebpkiRoots;
1841
1842        #[cfg(all(not(feature = "tls-webpki-certs"), feature = "tls-native-certs"))]
1843        let tls_ca = CertificateAuthority::OsRoots;
1844
1845        #[cfg(not(any(feature = "tls-webpki-certs", feature = "tls-native-certs")))]
1846        let tls_ca = CertificateAuthority::PemFile;
1847
1848        Self {
1849            protocol,
1850            host: ConfigSetting::new_specified(host),
1851            port: ConfigSetting::new_specified(port),
1852            net_interface: ConfigSetting::new_default(None),
1853            max_buf_size: ConfigSetting::new_default(100 * 1024 * 1024),
1854            auth_timeout: ConfigSetting::new_default(Duration::from_secs(15)),
1855            username: ConfigSetting::new_default(None),
1856            password: ConfigSetting::new_default(None),
1857            token: ConfigSetting::new_default(None),
1858            token_x: ConfigSetting::new_default(None),
1859            token_y: ConfigSetting::new_default(None),
1860
1861            #[cfg(feature = "insecure-skip-verify")]
1862            tls_verify: ConfigSetting::new_default(true),
1863
1864            tls_ca: ConfigSetting::new_default(tls_ca),
1865            tls_roots: ConfigSetting::new_default(None),
1866
1867            #[cfg(feature = "ilp-over-http")]
1868            http: if protocol.is_httpx() {
1869                Some(HttpConfig::default())
1870            } else {
1871                None
1872            },
1873        }
1874    }
1875
1876    /// Select local outbound interface.
1877    ///
1878    /// This may be relevant if your machine has multiple network interfaces.
1879    ///
1880    /// The default is `"0.0.0.0"`.
1881    pub fn bind_interface<I: Into<String>>(mut self, addr: I) -> Result<Self> {
1882        self.ensure_is_tcpx("bind_interface")?;
1883        self.net_interface
1884            .set_specified("bind_interface", Some(validate_value(addr.into())?))?;
1885        Ok(self)
1886    }
1887
1888    /// Set the username for authentication.
1889    ///
1890    /// For TCP, this is the `kid` part of the ECDSA key set.
1891    /// The other fields are [`token`](SenderBuilder::token), [`token_x`](SenderBuilder::token_x),
1892    /// and [`token_y`](SenderBuilder::token_y).
1893    ///
1894    /// For HTTP, this is a part of basic authentication.
1895    /// See also: [`password`](SenderBuilder::password).
1896    pub fn username(mut self, username: &str) -> Result<Self> {
1897        self.username
1898            .set_specified("username", Some(validate_value(username.to_string())?))?;
1899        Ok(self)
1900    }
1901
1902    /// Set the password for basic HTTP authentication.
1903    /// See also: [`username`](SenderBuilder::username).
1904    pub fn password(mut self, password: &str) -> Result<Self> {
1905        self.password
1906            .set_specified("password", Some(validate_value(password.to_string())?))?;
1907        Ok(self)
1908    }
1909
1910    /// Set the Token (Bearer) Authentication parameter for HTTP,
1911    /// or the ECDSA private key for TCP authentication.
1912    pub fn token(mut self, token: &str) -> Result<Self> {
1913        self.token
1914            .set_specified("token", Some(validate_value(token.to_string())?))?;
1915        Ok(self)
1916    }
1917
1918    /// Set the ECDSA public key X for TCP authentication.
1919    pub fn token_x(mut self, token_x: &str) -> Result<Self> {
1920        self.token_x
1921            .set_specified("token_x", Some(validate_value(token_x.to_string())?))?;
1922        Ok(self)
1923    }
1924
1925    /// Set the ECDSA public key Y for TCP authentication.
1926    pub fn token_y(mut self, token_y: &str) -> Result<Self> {
1927        self.token_y
1928            .set_specified("token_y", Some(validate_value(token_y.to_string())?))?;
1929        Ok(self)
1930    }
1931
1932    /// Configure how long to wait for messages from the QuestDB server during
1933    /// the TLS handshake and authentication process. This only applies to TCP.
1934    /// The default is 15 seconds.
1935    pub fn auth_timeout(mut self, value: Duration) -> Result<Self> {
1936        self.auth_timeout.set_specified("auth_timeout", value)?;
1937        Ok(self)
1938    }
1939
1940    /// Ensure that TLS is enabled for the protocol.
1941    pub fn ensure_tls_enabled(&self, property: &str) -> Result<()> {
1942        if !self.protocol.tls_enabled() {
1943            return Err(error::fmt!(
1944                ConfigError,
1945                "Cannot set {property:?}: TLS is not supported for protocol {}",
1946                self.protocol
1947            ));
1948        }
1949        Ok(())
1950    }
1951
1952    /// Set to `false` to disable TLS certificate verification.
1953    /// This should only be used for debugging purposes as it reduces security.
1954    ///
1955    /// For testing, consider specifying a path to a `.pem` file instead via
1956    /// the [`tls_roots`](SenderBuilder::tls_roots) method.
1957    #[cfg(feature = "insecure-skip-verify")]
1958    pub fn tls_verify(mut self, verify: bool) -> Result<Self> {
1959        self.ensure_tls_enabled("tls_verify")?;
1960        self.tls_verify.set_specified("tls_verify", verify)?;
1961        Ok(self)
1962    }
1963
1964    /// Specify where to find the root certificate used to validate the
1965    /// server's TLS certificate.
1966    pub fn tls_ca(mut self, ca: CertificateAuthority) -> Result<Self> {
1967        self.ensure_tls_enabled("tls_ca")?;
1968        self.tls_ca.set_specified("tls_ca", ca)?;
1969        Ok(self)
1970    }
1971
1972    /// Set the path to a custom root certificate `.pem` file.
1973    /// This is used to validate the server's certificate during the TLS handshake.
1974    ///
1975    /// See notes on how to test with [self-signed
1976    /// certificates](https://github.com/questdb/c-questdb-client/tree/main/tls_certs).
1977    pub fn tls_roots<P: Into<PathBuf>>(self, path: P) -> Result<Self> {
1978        let mut builder = self.tls_ca(CertificateAuthority::PemFile)?;
1979        let path = path.into();
1980        // Attempt to read the file here to catch any issues early.
1981        let _file = std::fs::File::open(&path).map_err(|io_err| {
1982            error::fmt!(
1983                ConfigError,
1984                "Could not open root certificate file from path {:?}: {}",
1985                path,
1986                io_err
1987            )
1988        })?;
1989        builder.tls_roots.set_specified("tls_roots", Some(path))?;
1990        Ok(builder)
1991    }
1992
1993    /// The maximum buffer size in bytes that the client will flush to the server.
1994    /// The default is 100 MiB.
1995    pub fn max_buf_size(mut self, value: usize) -> Result<Self> {
1996        let min = 1024;
1997        if value < min {
1998            return Err(error::fmt!(
1999                ConfigError,
2000                "max_buf_size\" must be at least {min} bytes."
2001            ));
2002        }
2003        self.max_buf_size.set_specified("max_buf_size", value)?;
2004        Ok(self)
2005    }
2006
2007    #[cfg(feature = "ilp-over-http")]
2008    /// Set the cumulative duration spent in retries.
2009    /// The value is in milliseconds, and the default is 10 seconds.
2010    pub fn retry_timeout(mut self, value: Duration) -> Result<Self> {
2011        if let Some(http) = &mut self.http {
2012            http.retry_timeout.set_specified("retry_timeout", value)?;
2013        } else {
2014            return Err(error::fmt!(
2015                ConfigError,
2016                "retry_timeout is supported only in ILP over HTTP."
2017            ));
2018        }
2019        Ok(self)
2020    }
2021
2022    #[cfg(feature = "ilp-over-http")]
2023    /// Set the minimum acceptable throughput while sending a buffer to the server.
2024    /// The sender will divide the payload size by this number to determine for how
2025    /// long to keep sending the payload before timing out.
2026    /// The value is in bytes per second, and the default is 100 KiB/s.
2027    /// The timeout calculated from minimum throughput is adedd to the value of
2028    /// [`request_timeout`](SenderBuilder::request_timeout) to get the total timeout
2029    /// value.
2030    /// A value of 0 disables this feature, so it's similar to setting "infinite"
2031    /// minimum throughput. The total timeout will then be equal to `request_timeout`.
2032    pub fn request_min_throughput(mut self, value: u64) -> Result<Self> {
2033        if let Some(http) = &mut self.http {
2034            http.request_min_throughput
2035                .set_specified("request_min_throughput", value)?;
2036        } else {
2037            return Err(error::fmt!(
2038                ConfigError,
2039                "\"request_min_throughput\" is supported only in ILP over HTTP."
2040            ));
2041        }
2042        Ok(self)
2043    }
2044
2045    #[cfg(feature = "ilp-over-http")]
2046    /// Additional time to wait on top of that calculated from the minimum throughput.
2047    /// This accounts for the fixed latency of the HTTP request-response roundtrip.
2048    /// The default is 10 seconds.
2049    /// See also: [`request_min_throughput`](SenderBuilder::request_min_throughput).
2050    pub fn request_timeout(mut self, value: Duration) -> Result<Self> {
2051        if let Some(http) = &mut self.http {
2052            if value.is_zero() {
2053                return Err(error::fmt!(
2054                    ConfigError,
2055                    "\"request_timeout\" must be greater than 0."
2056                ));
2057            }
2058            http.request_timeout
2059                .set_specified("request_timeout", value)?;
2060        } else {
2061            return Err(error::fmt!(
2062                ConfigError,
2063                "\"request_timeout\" is supported only in ILP over HTTP."
2064            ));
2065        }
2066        Ok(self)
2067    }
2068
2069    #[cfg(feature = "ilp-over-http")]
2070    /// Internal API, do not use.
2071    /// This is exposed exclusively for the Python client.
2072    /// We (QuestDB) use this to help us debug which client is being used if we encounter issues.
2073    #[doc(hidden)]
2074    pub fn user_agent(mut self, value: &str) -> Result<Self> {
2075        let value = validate_value(value)?;
2076        if let Some(http) = &mut self.http {
2077            http.user_agent = value.to_string();
2078        }
2079        Ok(self)
2080    }
2081
2082    fn connect_tcp(&self, auth: &Option<AuthParams>) -> Result<ProtocolHandler> {
2083        let addr: SockAddr = gai::resolve_host_port(self.host.as_str(), self.port.as_str())?;
2084        let mut sock = Socket::new(Domain::IPV4, Type::STREAM, Some(SockProtocol::TCP))
2085            .map_err(|io_err| map_io_to_socket_err("Could not open TCP socket: ", io_err))?;
2086
2087        // See: https://idea.popcount.org/2014-04-03-bind-before-connect/
2088        // We set `SO_REUSEADDR` on the outbound socket to avoid issues where a client may exhaust
2089        // their interface's ports. See: https://github.com/questdb/py-questdb-client/issues/21
2090        sock.set_reuse_address(true)
2091            .map_err(|io_err| map_io_to_socket_err("Could not set SO_REUSEADDR: ", io_err))?;
2092
2093        sock.set_linger(Some(Duration::from_secs(120)))
2094            .map_err(|io_err| map_io_to_socket_err("Could not set socket linger: ", io_err))?;
2095        sock.set_keepalive(true)
2096            .map_err(|io_err| map_io_to_socket_err("Could not set SO_KEEPALIVE: ", io_err))?;
2097        sock.set_nodelay(true)
2098            .map_err(|io_err| map_io_to_socket_err("Could not set TCP_NODELAY: ", io_err))?;
2099        if let Some(ref host) = self.net_interface.deref() {
2100            let bind_addr = gai::resolve_host(host.as_str())?;
2101            sock.bind(&bind_addr).map_err(|io_err| {
2102                map_io_to_socket_err(
2103                    &format!("Could not bind to interface address {:?}: ", host),
2104                    io_err,
2105                )
2106            })?;
2107        }
2108        sock.connect(&addr).map_err(|io_err| {
2109            let host_port = format!("{}:{}", self.host.deref(), *self.port);
2110            let prefix = format!("Could not connect to {:?}: ", host_port);
2111            map_io_to_socket_err(&prefix, io_err)
2112        })?;
2113
2114        // We read during both TLS handshake and authentication.
2115        // We set up a read timeout to prevent the client from "hanging"
2116        // should we be connecting to a server configured in a different way
2117        // from the client.
2118        sock.set_read_timeout(Some(*self.auth_timeout))
2119            .map_err(|io_err| {
2120                map_io_to_socket_err("Failed to set read timeout on socket: ", io_err)
2121            })?;
2122
2123        #[cfg(feature = "insecure-skip-verify")]
2124        let tls_verify = *self.tls_verify;
2125
2126        #[cfg(not(feature = "insecure-skip-verify"))]
2127        let tls_verify = true;
2128
2129        let mut conn = match configure_tls(
2130            self.protocol.tls_enabled(),
2131            tls_verify,
2132            *self.tls_ca,
2133            self.tls_roots.deref(),
2134        )? {
2135            Some(tls_config) => {
2136                let server_name: ServerName = ServerName::try_from(self.host.as_str())
2137                    .map_err(|inv_dns_err| error::fmt!(TlsError, "Bad host: {}", inv_dns_err))?
2138                    .to_owned();
2139                let mut tls_conn =
2140                    ClientConnection::new(tls_config, server_name).map_err(|rustls_err| {
2141                        error::fmt!(TlsError, "Could not create TLS client: {}", rustls_err)
2142                    })?;
2143                while tls_conn.wants_write() || tls_conn.is_handshaking() {
2144                    tls_conn.complete_io(&mut sock).map_err(|io_err| {
2145                        if (io_err.kind() == ErrorKind::TimedOut)
2146                            || (io_err.kind() == ErrorKind::WouldBlock)
2147                        {
2148                            error::fmt!(
2149                                TlsError,
2150                                concat!(
2151                                    "Failed to complete TLS handshake:",
2152                                    " Timed out waiting for server ",
2153                                    "response after {:?}."
2154                                ),
2155                                *self.auth_timeout
2156                            )
2157                        } else {
2158                            error::fmt!(TlsError, "Failed to complete TLS handshake: {}", io_err)
2159                        }
2160                    })?;
2161                }
2162                Connection::Tls(StreamOwned::new(tls_conn, sock).into())
2163            }
2164            None => Connection::Direct(sock),
2165        };
2166
2167        if let Some(AuthParams::Ecdsa(auth)) = auth {
2168            conn.authenticate(auth)?;
2169        }
2170
2171        Ok(ProtocolHandler::Socket(conn))
2172    }
2173
2174    fn build_auth(&self) -> Result<Option<AuthParams>> {
2175        match (
2176            self.protocol,
2177            self.username.deref(),
2178            self.password.deref(),
2179            self.token.deref(),
2180            self.token_x.deref(),
2181            self.token_y.deref(),
2182        ) {
2183            (_, None, None, None, None, None) => Ok(None),
2184            (
2185                protocol,
2186                Some(username),
2187                None,
2188                Some(token),
2189                Some(token_x),
2190                Some(token_y),
2191            ) if protocol.is_tcpx() => Ok(Some(AuthParams::Ecdsa(EcdsaAuthParams {
2192                key_id: username.to_string(),
2193                priv_key: token.to_string(),
2194                pub_key_x: token_x.to_string(),
2195                pub_key_y: token_y.to_string(),
2196            }))),
2197            (protocol, Some(_username), Some(_password), None, None, None)
2198                if protocol.is_tcpx() => {
2199                Err(error::fmt!(ConfigError,
2200                    r##"The "basic_auth" setting can only be used with the ILP/HTTP protocol."##,
2201                ))
2202            }
2203            (protocol, None, None, Some(_token), None, None)
2204                if protocol.is_tcpx() => {
2205                Err(error::fmt!(ConfigError, "Token authentication only be used with the ILP/HTTP protocol."))
2206            }
2207            (protocol, _username, None, _token, _token_x, _token_y)
2208                if protocol.is_tcpx() => {
2209                Err(error::fmt!(ConfigError,
2210                    r##"Incomplete ECDSA authentication parameters. Specify either all or none of: "username", "token", "token_x", "token_y"."##,
2211                ))
2212            }
2213            #[cfg(feature = "ilp-over-http")]
2214            (protocol, Some(username), Some(password), None, None, None)
2215                if protocol.is_httpx() => {
2216                Ok(Some(AuthParams::Basic(BasicAuthParams {
2217                    username: username.to_string(),
2218                    password: password.to_string(),
2219                })))
2220            }
2221            #[cfg(feature = "ilp-over-http")]
2222            (protocol, Some(_username), None, None, None, None)
2223                if protocol.is_httpx() => {
2224                Err(error::fmt!(ConfigError,
2225                    r##"Basic authentication parameter "username" is present, but "password" is missing."##,
2226                ))
2227            }
2228            #[cfg(feature = "ilp-over-http")]
2229            (protocol, None, Some(_password), None, None, None)
2230                if protocol.is_httpx() => {
2231                Err(error::fmt!(ConfigError,
2232                    r##"Basic authentication parameter "password" is present, but "username" is missing."##,
2233                ))
2234            }
2235            #[cfg(feature = "ilp-over-http")]
2236            (protocol, None, None, Some(token), None, None)
2237                if protocol.is_httpx() => {
2238                Ok(Some(AuthParams::Token(TokenAuthParams {
2239                    token: token.to_string(),
2240                })))
2241            }
2242            #[cfg(feature = "ilp-over-http")]
2243            (
2244                protocol,
2245                Some(_username),
2246                None,
2247                Some(_token),
2248                Some(_token_x),
2249                Some(_token_y),
2250            ) if protocol.is_httpx() => {
2251                Err(error::fmt!(ConfigError, "ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."))
2252            }
2253            #[cfg(feature = "ilp-over-http")]
2254            (protocol, _username, _password, _token, None, None)
2255                if protocol.is_httpx() => {
2256                Err(error::fmt!(ConfigError,
2257                    r##"Inconsistent HTTP authentication parameters. Specify either "username" and "password", or just "token"."##,
2258                ))
2259            }
2260            _ => {
2261                Err(error::fmt!(ConfigError,
2262                    r##"Incomplete authentication parameters. Check "username", "password", "token", "token_x" and "token_y" parameters are set correctly."##,
2263                ))
2264            }
2265        }
2266    }
2267
2268    /// Build the sender.
2269    ///
2270    /// In the case of TCP, this synchronously establishes the TCP connection, and
2271    /// returns once the connection is fully established. If the connection
2272    /// requires authentication or TLS, these will also be completed before
2273    /// returning.
2274    pub fn build(&self) -> Result<Sender> {
2275        let mut descr = format!("Sender[host={:?},port={:?},", self.host, self.port);
2276
2277        if self.protocol.tls_enabled() {
2278            write!(descr, "tls=enabled,").unwrap();
2279        } else {
2280            write!(descr, "tls=disabled,").unwrap();
2281        }
2282
2283        let auth = self.build_auth()?;
2284
2285        let handler = match self.protocol {
2286            Protocol::Tcp | Protocol::Tcps => self.connect_tcp(&auth)?,
2287            #[cfg(feature = "ilp-over-http")]
2288            Protocol::Http | Protocol::Https => {
2289                if self.net_interface.is_some() {
2290                    // See: https://github.com/algesten/ureq/issues/692
2291                    return Err(error::fmt!(
2292                        InvalidApiCall,
2293                        "net_interface is not supported for ILP over HTTP."
2294                    ));
2295                }
2296
2297                let http_config = self.http.as_ref().unwrap();
2298                let user_agent = http_config.user_agent.as_str();
2299                let agent_builder = ureq::AgentBuilder::new()
2300                    .user_agent(user_agent)
2301                    .no_delay(true);
2302
2303                #[cfg(feature = "insecure-skip-verify")]
2304                let tls_verify = *self.tls_verify;
2305
2306                #[cfg(not(feature = "insecure-skip-verify"))]
2307                let tls_verify = true;
2308
2309                let agent_builder = match configure_tls(
2310                    self.protocol.tls_enabled(),
2311                    tls_verify,
2312                    *self.tls_ca,
2313                    self.tls_roots.deref(),
2314                )? {
2315                    Some(tls_config) => agent_builder.tls_config(tls_config),
2316                    None => agent_builder,
2317                };
2318                let auth = match auth {
2319                    Some(AuthParams::Basic(ref auth)) => Some(auth.to_header_string()),
2320                    Some(AuthParams::Token(ref auth)) => Some(auth.to_header_string()?),
2321                    Some(AuthParams::Ecdsa(_)) => {
2322                        return Err(error::fmt!(
2323                            AuthError,
2324                            "ECDSA authentication is not supported for ILP over HTTP. \
2325                            Please use basic or token authentication instead."
2326                        ));
2327                    }
2328                    None => None,
2329                };
2330                let agent_builder =
2331                    agent_builder.timeout_connect(*http_config.request_timeout.deref());
2332                let agent = agent_builder.build();
2333                let proto = self.protocol.schema();
2334                let url = format!(
2335                    "{}://{}:{}/write",
2336                    proto,
2337                    self.host.deref(),
2338                    self.port.deref()
2339                );
2340                ProtocolHandler::Http(HttpHandlerState {
2341                    agent,
2342                    url,
2343                    auth,
2344
2345                    config: self.http.as_ref().unwrap().clone(),
2346                })
2347            }
2348        };
2349
2350        if auth.is_some() {
2351            descr.push_str("auth=on]");
2352        } else {
2353            descr.push_str("auth=off]");
2354        }
2355
2356        let sender = Sender {
2357            descr,
2358            handler,
2359            connected: true,
2360            max_buf_size: *self.max_buf_size,
2361        };
2362
2363        Ok(sender)
2364    }
2365
2366    fn ensure_is_tcpx(&mut self, param_name: &str) -> Result<()> {
2367        if self.protocol.is_tcpx() {
2368            Ok(())
2369        } else {
2370            Err(error::fmt!(
2371                ConfigError,
2372                "The {param_name:?} setting can only be used with the TCP protocol."
2373            ))
2374        }
2375    }
2376}
2377
2378/// When parsing from config, we exclude certain characters.
2379/// Here we repeat the same validation logic for consistency.
2380fn validate_value<T: AsRef<str>>(value: T) -> Result<T> {
2381    let str_ref = value.as_ref();
2382    for (p, c) in str_ref.chars().enumerate() {
2383        if matches!(c, '\u{0}'..='\u{1f}' | '\u{7f}'..='\u{9f}') {
2384            return Err(error::fmt!(
2385                ConfigError,
2386                "Invalid character {c:?} at position {p}"
2387            ));
2388        }
2389    }
2390    Ok(value)
2391}
2392
2393fn parse_conf_value<T>(param_name: &str, str_value: &str) -> Result<T>
2394where
2395    T: FromStr,
2396    T::Err: std::fmt::Debug,
2397{
2398    str_value.parse().map_err(|e| {
2399        error::fmt!(
2400            ConfigError,
2401            "Could not parse {param_name:?} to number: {e:?}"
2402        )
2403    })
2404}
2405
2406fn b64_decode(descr: &'static str, buf: &str) -> Result<Vec<u8>> {
2407    Base64UrlUnpadded::decode_vec(buf).map_err(|b64_err| {
2408        error::fmt!(
2409            AuthError,
2410            "Misconfigured ILP authentication keys. Could not decode {}: {}. \
2411            Hint: Check the keys for a possible typo.",
2412            descr,
2413            b64_err
2414        )
2415    })
2416}
2417
2418fn parse_public_key(pub_key_x: &str, pub_key_y: &str) -> Result<Vec<u8>> {
2419    let mut pub_key_x = b64_decode("public key x", pub_key_x)?;
2420    let mut pub_key_y = b64_decode("public key y", pub_key_y)?;
2421
2422    // SEC 1 Uncompressed Octet-String-to-Elliptic-Curve-Point Encoding
2423    let mut encoded = Vec::new();
2424    encoded.push(4u8); // 0x04 magic byte that identifies this as uncompressed.
2425    let pub_key_x_ken = pub_key_x.len();
2426    if pub_key_x_ken > 32 {
2427        return Err(error::fmt!(
2428            AuthError,
2429            "Misconfigured ILP authentication keys. Public key x is too long. \
2430            Hint: Check the keys for a possible typo."
2431        ));
2432    }
2433    let pub_key_y_len = pub_key_y.len();
2434    if pub_key_y_len > 32 {
2435        return Err(error::fmt!(
2436            AuthError,
2437            "Misconfigured ILP authentication keys. Public key y is too long. \
2438            Hint: Check the keys for a possible typo."
2439        ));
2440    }
2441    encoded.resize((32 - pub_key_x_ken) + 1, 0u8);
2442    encoded.append(&mut pub_key_x);
2443    encoded.resize((32 - pub_key_y_len) + 1 + 32, 0u8);
2444    encoded.append(&mut pub_key_y);
2445    Ok(encoded)
2446}
2447
2448fn parse_key_pair(auth: &EcdsaAuthParams) -> Result<EcdsaKeyPair> {
2449    let private_key = b64_decode("private authentication key", auth.priv_key.as_str())?;
2450    let public_key = parse_public_key(auth.pub_key_x.as_str(), auth.pub_key_y.as_str())?;
2451    let system_random = SystemRandom::new();
2452    EcdsaKeyPair::from_private_key_and_public_key(
2453        &ECDSA_P256_SHA256_FIXED_SIGNING,
2454        &private_key[..],
2455        &public_key[..],
2456        &system_random,
2457    )
2458    .map_err(|key_rejected| {
2459        error::fmt!(
2460            AuthError,
2461            "Misconfigured ILP authentication keys: {}. Hint: Check the keys for a possible typo.",
2462            key_rejected
2463        )
2464    })
2465}
2466
2467pub(crate) struct F64Serializer {
2468    buf: ryu::Buffer,
2469    n: f64,
2470}
2471
2472impl F64Serializer {
2473    pub(crate) fn new(n: f64) -> Self {
2474        F64Serializer {
2475            buf: ryu::Buffer::new(),
2476            n,
2477        }
2478    }
2479
2480    // This function was taken and customized from the ryu crate.
2481    #[cold]
2482    #[cfg_attr(feature = "no-panic", inline)]
2483    fn format_nonfinite(&self) -> &'static str {
2484        const MANTISSA_MASK: u64 = 0x000fffffffffffff;
2485        const SIGN_MASK: u64 = 0x8000000000000000;
2486        let bits = self.n.to_bits();
2487        if bits & MANTISSA_MASK != 0 {
2488            "NaN"
2489        } else if bits & SIGN_MASK != 0 {
2490            "-Infinity"
2491        } else {
2492            "Infinity"
2493        }
2494    }
2495
2496    pub(crate) fn as_str(&mut self) -> &str {
2497        if self.n.is_finite() {
2498            self.buf.format_finite(self.n)
2499        } else {
2500            self.format_nonfinite()
2501        }
2502    }
2503}
2504
2505impl Sender {
2506    /// Create a new `Sender` instance from the given configuration string.
2507    ///
2508    /// The format of the string is: `"http::addr=host:port;key=value;...;"`.
2509    ///
2510    /// Instead of `"http"`, you can also specify `"https"`, `"tcp"`, and `"tcps"`.
2511    ///
2512    /// We recommend HTTP for most cases because it provides more features, like
2513    /// reporting errors to the client and supporting transaction control. TCP can
2514    /// sometimes be faster in higher-latency networks, but misses a number of
2515    /// features.
2516    ///
2517    /// Keys in the config string correspond to same-named methods on `SenderBuilder`.
2518    ///
2519    /// For the full list of keys and values, see the docs on [`SenderBuilder`].
2520    ///
2521    /// You can also load the configuration from an environment variable.
2522    /// See [`Sender::from_env`].
2523    ///
2524    /// In the case of TCP, this synchronously establishes the TCP connection, and
2525    /// returns once the connection is fully established. If the connection
2526    /// requires authentication or TLS, these will also be completed before
2527    /// returning.
2528    pub fn from_conf<T: AsRef<str>>(conf: T) -> Result<Self> {
2529        SenderBuilder::from_conf(conf)?.build()
2530    }
2531
2532    /// Create a new `Sender` from the configuration stored in the `QDB_CLIENT_CONF`
2533    /// environment variable. The format is the same as that accepted by
2534    /// [`Sender::from_conf`].
2535    ///
2536    /// In the case of TCP, this synchronously establishes the TCP connection, and
2537    /// returns once the connection is fully established. If the connection
2538    /// requires authentication or TLS, these will also be completed before
2539    /// returning.
2540    pub fn from_env() -> Result<Self> {
2541        SenderBuilder::from_env()?.build()
2542    }
2543
2544    #[allow(unused_variables)]
2545    fn flush_impl(&mut self, buf: &Buffer, transactional: bool) -> Result<()> {
2546        if !self.connected {
2547            return Err(error::fmt!(
2548                SocketError,
2549                "Could not flush buffer: not connected to database."
2550            ));
2551        }
2552        buf.check_op(Op::Flush)?;
2553
2554        if buf.len() > self.max_buf_size {
2555            return Err(error::fmt!(
2556                InvalidApiCall,
2557                "Could not flush buffer: Buffer size of {} exceeds maximum configured allowed size of {} bytes.",
2558                buf.len(),
2559                self.max_buf_size
2560            ));
2561        }
2562
2563        let bytes = buf.as_str().as_bytes();
2564        if bytes.is_empty() {
2565            return Ok(());
2566        }
2567        match self.handler {
2568            ProtocolHandler::Socket(ref mut conn) => {
2569                if transactional {
2570                    return Err(error::fmt!(
2571                        InvalidApiCall,
2572                        "Transactional flushes are not supported for ILP over TCP."
2573                    ));
2574                }
2575                conn.write_all(bytes).map_err(|io_err| {
2576                    self.connected = false;
2577                    map_io_to_socket_err("Could not flush buffer: ", io_err)
2578                })?;
2579            }
2580            #[cfg(feature = "ilp-over-http")]
2581            ProtocolHandler::Http(ref state) => {
2582                if transactional && !buf.transactional() {
2583                    return Err(error::fmt!(
2584                        InvalidApiCall,
2585                        "Buffer contains lines for multiple tables. \
2586                        Transactional flushes are only supported for buffers containing lines for a single table."
2587                    ));
2588                }
2589                let request_min_throughput = *state.config.request_min_throughput;
2590                let extra_time = if request_min_throughput > 0 {
2591                    (bytes.len() as f64) / (request_min_throughput as f64)
2592                } else {
2593                    0.0f64
2594                };
2595                let timeout = *state.config.request_timeout + Duration::from_secs_f64(extra_time);
2596                let request = state
2597                    .agent
2598                    .post(&state.url)
2599                    .query_pairs([("precision", "n")])
2600                    .timeout(timeout)
2601                    .set("Content-Type", "text/plain; charset=utf-8");
2602                let request = match state.auth.as_ref() {
2603                    Some(auth) => request.set("Authorization", auth),
2604                    None => request,
2605                };
2606                let response_or_err =
2607                    http_send_with_retries(request, bytes, *state.config.retry_timeout);
2608                match response_or_err {
2609                    Ok(_response) => {
2610                        // on success, there's no information in the response.
2611                    }
2612                    Err(ureq::Error::Status(http_status_code, response)) => {
2613                        return Err(parse_http_error(http_status_code, response));
2614                    }
2615                    Err(ureq::Error::Transport(transport)) => {
2616                        return Err(error::fmt!(
2617                            SocketError,
2618                            "Could not flush buffer: {}",
2619                            transport
2620                        ));
2621                    }
2622                }
2623            }
2624        }
2625        Ok(())
2626    }
2627
2628    /// Send the batch of rows in the buffer to the QuestDB server, and, if the
2629    /// `transactional` parameter is true, ensure the flush will be transactional.
2630    ///
2631    /// A flush is transactional iff all the rows belong to the same table. This allows
2632    /// QuestDB to treat the flush as a single database transaction, because it doesn't
2633    /// support transactions spanning multiple tables. Additionally, only ILP-over-HTTP
2634    /// supports transactional flushes.
2635    ///
2636    /// If the flush wouldn't be transactional, this function returns an error and
2637    /// doesn't flush any data.
2638    ///
2639    /// The function sends an HTTP request and waits for the response. If the server
2640    /// responds with an error, it returns a descriptive error. In the case of a network
2641    /// error, it retries until it has exhausted the retry time budget.
2642    ///
2643    /// All the data stays in the buffer. Clear the buffer before starting a new batch.
2644    #[cfg(feature = "ilp-over-http")]
2645    pub fn flush_and_keep_with_flags(&mut self, buf: &Buffer, transactional: bool) -> Result<()> {
2646        self.flush_impl(buf, transactional)
2647    }
2648
2649    /// Send the given buffer of rows to the QuestDB server.
2650    ///
2651    /// All the data stays in the buffer. Clear the buffer before starting a new batch.
2652    ///
2653    /// To send and clear in one step, call [Sender::flush] instead.
2654    pub fn flush_and_keep(&mut self, buf: &Buffer) -> Result<()> {
2655        self.flush_impl(buf, false)
2656    }
2657
2658    /// Send the given buffer of rows to the QuestDB server, clearing the buffer.
2659    ///
2660    /// After this function returns, the buffer is empty and ready for the next batch.
2661    /// If you want to preserve the buffer contents, call [Sender::flush_and_keep]. If
2662    /// you want to ensure the flush is transactional, call
2663    /// [Sender::flush_and_keep_with_flags].
2664    ///
2665    /// With ILP-over-HTTP, this function sends an HTTP request and waits for the
2666    /// response. If the server responds with an error, it returns a descriptive error.
2667    /// In the case of a network error, it retries until it has exhausted the retry time
2668    /// budget.
2669    ///
2670    /// With ILP-over-TCP, the function blocks only until the buffer is flushed to the
2671    /// underlying OS-level network socket, without waiting to actually send it to the
2672    /// server. In the case of an error, the server will quietly disconnect: consult the
2673    /// server logs for error messages.
2674    ///
2675    /// HTTP should be the first choice, but use TCP if you need to continuously send
2676    /// data to the server at a high rate.
2677    ///
2678    /// To improve the HTTP performance, send larger buffers (with more rows), and
2679    /// consider parallelizing writes using multiple senders from multiple threads.
2680    pub fn flush(&mut self, buf: &mut Buffer) -> Result<()> {
2681        self.flush_impl(buf, false)?;
2682        buf.clear();
2683        Ok(())
2684    }
2685
2686    /// Tell whether the sender is no longer usable and must be dropped.
2687    ///
2688    /// This happens when there was an earlier failure.
2689    ///
2690    /// This method is specific to ILP-over-TCP and is not relevant for ILP-over-HTTP.
2691    pub fn must_close(&self) -> bool {
2692        !self.connected
2693    }
2694}
2695
2696mod conf;
2697mod timestamp;
2698
2699#[cfg(feature = "ilp-over-http")]
2700mod http;
2701
2702#[cfg(feature = "ilp-over-http")]
2703use http::*;
2704
2705#[cfg(test)]
2706mod tests;