questdb/ingress/
mod.rs

1/*******************************************************************************
2 *     ___                  _   ____  ____
3 *    / _ \ _   _  ___  ___| |_|  _ \| __ )
4 *   | | | | | | |/ _ \/ __| __| | | |  _ \
5 *   | |_| | |_| |  __/\__ \ |_| |_| | |_) |
6 *    \__\_\\__,_|\___||___/\__|____/|____/
7 *
8 *  Copyright (c) 2014-2019 Appsicle
9 *  Copyright (c) 2019-2024 QuestDB
10 *
11 *  Licensed under the Apache License, Version 2.0 (the "License");
12 *  you may not use this file except in compliance with the License.
13 *  You may obtain a copy of the License at
14 *
15 *  http://www.apache.org/licenses/LICENSE-2.0
16 *
17 *  Unless required by applicable law or agreed to in writing, software
18 *  distributed under the License is distributed on an "AS IS" BASIS,
19 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 *  See the License for the specific language governing permissions and
21 *  limitations under the License.
22 *
23 ******************************************************************************/
24
25#![doc = include_str!("mod.md")]
26
27pub use self::timestamp::*;
28
29use crate::error::{self, Error, Result};
30use crate::gai;
31use crate::ingress::conf::ConfigSetting;
32use core::time::Duration;
33use std::collections::HashMap;
34use std::convert::Infallible;
35use std::fmt::{Debug, Display, Formatter, Write};
36use std::io::{self, BufRead, BufReader, ErrorKind, Write as IoWrite};
37use std::ops::Deref;
38use std::path::PathBuf;
39use std::str::FromStr;
40use std::sync::Arc;
41
42use base64ct::{Base64, Base64UrlUnpadded, Encoding};
43use ring::rand::SystemRandom;
44use ring::signature::{EcdsaKeyPair, ECDSA_P256_SHA256_FIXED_SIGNING};
45use rustls::{ClientConnection, RootCertStore, StreamOwned};
46use rustls_pki_types::ServerName;
47use socket2::{Domain, Protocol as SockProtocol, SockAddr, Socket, Type};
48
49#[derive(Debug, Copy, Clone)]
50enum Op {
51    Table = 1,
52    Symbol = 1 << 1,
53    Column = 1 << 2,
54    At = 1 << 3,
55    Flush = 1 << 4,
56}
57
58impl Op {
59    fn descr(self) -> &'static str {
60        match self {
61            Op::Table => "table",
62            Op::Symbol => "symbol",
63            Op::Column => "column",
64            Op::At => "at",
65            Op::Flush => "flush",
66        }
67    }
68}
69
70fn map_io_to_socket_err(prefix: &str, io_err: io::Error) -> Error {
71    error::fmt!(SocketError, "{}{}", prefix, io_err)
72}
73
74/// A validated table name.
75///
76/// This type simply wraps a `&str`.
77///
78/// When you pass a `TableName` instead of a plain string to a [`Buffer`] method,
79/// it doesn't have to validate it again. This saves CPU cycles.
80#[derive(Clone, Copy)]
81pub struct TableName<'a> {
82    name: &'a str,
83}
84
85impl<'a> TableName<'a> {
86    /// Construct a validated table name.
87    pub fn new(name: &'a str) -> Result<Self> {
88        if name.is_empty() {
89            return Err(error::fmt!(
90                InvalidName,
91                "Table names must have a non-zero length."
92            ));
93        }
94
95        let mut prev = '\0';
96        for (index, c) in name.chars().enumerate() {
97            match c {
98                '.' => {
99                    if index == 0 || index == name.len() - 1 || prev == '.' {
100                        return Err(error::fmt!(
101                            InvalidName,
102                            concat!("Bad string {:?}: ", "Found invalid dot `.` at position {}."),
103                            name,
104                            index
105                        ));
106                    }
107                }
108                '?' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '*' | '%' | '~'
109                | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}' | '\u{0004}'
110                | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}' | '\u{000b}'
111                | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
112                    return Err(error::fmt!(
113                        InvalidName,
114                        concat!(
115                            "Bad string {:?}: ",
116                            "Table names can't contain ",
117                            "a {:?} character, which was found at ",
118                            "byte position {}."
119                        ),
120                        name,
121                        c,
122                        index
123                    ));
124                }
125                '\u{feff}' => {
126                    // Reject unicode char 'ZERO WIDTH NO-BREAK SPACE',
127                    // aka UTF-8 BOM if it appears anywhere in the string.
128                    return Err(error::fmt!(
129                        InvalidName,
130                        concat!(
131                            "Bad string {:?}: ",
132                            "Table names can't contain ",
133                            "a UTF-8 BOM character, which was found at ",
134                            "byte position {}."
135                        ),
136                        name,
137                        index
138                    ));
139                }
140                _ => (),
141            }
142            prev = c;
143        }
144
145        Ok(Self { name })
146    }
147
148    /// Construct a table name without validating it.
149    ///
150    /// This breaks API encapsulation and is only intended for use
151    /// when the the string was already previously validated.
152    ///
153    /// The QuestDB server will reject an invalid table name.
154    pub fn new_unchecked(name: &'a str) -> Self {
155        Self { name }
156    }
157}
158
159/// A validated column name.
160///
161/// This type simply wraps a `&str`.
162///
163/// When you pass a `ColumnName` instead of a plain string to a [`Buffer`] method,
164/// it doesn't have to validate it again. This saves CPU cycles.
165#[derive(Clone, Copy)]
166pub struct ColumnName<'a> {
167    name: &'a str,
168}
169
170impl<'a> ColumnName<'a> {
171    /// Construct a validated table name.
172    pub fn new(name: &'a str) -> Result<Self> {
173        if name.is_empty() {
174            return Err(error::fmt!(
175                InvalidName,
176                "Column names must have a non-zero length."
177            ));
178        }
179
180        for (index, c) in name.chars().enumerate() {
181            match c {
182                '?' | '.' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '-' | '*'
183                | '%' | '~' | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}'
184                | '\u{0004}' | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}'
185                | '\u{000b}' | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
186                    return Err(error::fmt!(
187                        InvalidName,
188                        concat!(
189                            "Bad string {:?}: ",
190                            "Column names can't contain ",
191                            "a {:?} character, which was found at ",
192                            "byte position {}."
193                        ),
194                        name,
195                        c,
196                        index
197                    ));
198                }
199                '\u{FEFF}' => {
200                    // Reject unicode char 'ZERO WIDTH NO-BREAK SPACE',
201                    // aka UTF-8 BOM if it appears anywhere in the string.
202                    return Err(error::fmt!(
203                        InvalidName,
204                        concat!(
205                            "Bad string {:?}: ",
206                            "Column names can't contain ",
207                            "a UTF-8 BOM character, which was found at ",
208                            "byte position {}."
209                        ),
210                        name,
211                        index
212                    ));
213                }
214                _ => (),
215            }
216        }
217
218        Ok(Self { name })
219    }
220
221    /// Construct a column name without validating it.
222    ///
223    /// This breaks API encapsulation and is only intended for use
224    /// when the the string was already previously validated.
225    ///
226    /// The QuestDB server will reject an invalid column name.
227    pub fn new_unchecked(name: &'a str) -> Self {
228        Self { name }
229    }
230}
231
232impl<'a> TryFrom<&'a str> for TableName<'a> {
233    type Error = self::Error;
234
235    fn try_from(name: &'a str) -> Result<Self> {
236        Self::new(name)
237    }
238}
239
240impl<'a> TryFrom<&'a str> for ColumnName<'a> {
241    type Error = self::Error;
242
243    fn try_from(name: &'a str) -> Result<Self> {
244        Self::new(name)
245    }
246}
247
248impl From<Infallible> for Error {
249    fn from(_: Infallible) -> Self {
250        unreachable!()
251    }
252}
253
254fn write_escaped_impl<Q, C>(check_escape_fn: C, quoting_fn: Q, output: &mut String, s: &str)
255where
256    C: Fn(u8) -> bool,
257    Q: Fn(&mut Vec<u8>),
258{
259    let output_vec = unsafe { output.as_mut_vec() };
260    let mut to_escape = 0usize;
261    for b in s.bytes() {
262        if check_escape_fn(b) {
263            to_escape += 1;
264        }
265    }
266
267    quoting_fn(output_vec);
268
269    if to_escape == 0 {
270        // output.push_str(s);
271        output_vec.extend_from_slice(s.as_bytes());
272    } else {
273        let additional = s.len() + to_escape;
274        output_vec.reserve(additional);
275        let mut index = output_vec.len();
276        unsafe { output_vec.set_len(index + additional) };
277        for b in s.bytes() {
278            if check_escape_fn(b) {
279                unsafe {
280                    *output_vec.get_unchecked_mut(index) = b'\\';
281                }
282                index += 1;
283            }
284
285            unsafe {
286                *output_vec.get_unchecked_mut(index) = b;
287            }
288            index += 1;
289        }
290    }
291
292    quoting_fn(output_vec);
293}
294
295fn must_escape_unquoted(c: u8) -> bool {
296    matches!(c, b' ' | b',' | b'=' | b'\n' | b'\r' | b'\\')
297}
298
299fn must_escape_quoted(c: u8) -> bool {
300    matches!(c, b'\n' | b'\r' | b'"' | b'\\')
301}
302
303fn write_escaped_unquoted(output: &mut String, s: &str) {
304    write_escaped_impl(must_escape_unquoted, |_output| (), output, s);
305}
306
307fn write_escaped_quoted(output: &mut String, s: &str) {
308    write_escaped_impl(must_escape_quoted, |output| output.push(b'"'), output, s)
309}
310
311enum Connection {
312    Direct(Socket),
313    Tls(Box<StreamOwned<ClientConnection, Socket>>),
314}
315
316impl Connection {
317    fn send_key_id(&mut self, key_id: &str) -> Result<()> {
318        writeln!(self, "{}", key_id)
319            .map_err(|io_err| map_io_to_socket_err("Failed to send key_id: ", io_err))?;
320        Ok(())
321    }
322
323    fn read_challenge(&mut self) -> Result<Vec<u8>> {
324        let mut buf = Vec::new();
325        let mut reader = BufReader::new(self);
326        reader.read_until(b'\n', &mut buf).map_err(|io_err| {
327            map_io_to_socket_err(
328                "Failed to read authentication challenge (timed out?): ",
329                io_err,
330            )
331        })?;
332        if buf.last().copied().unwrap_or(b'\0') != b'\n' {
333            return Err(if buf.is_empty() {
334                error::fmt!(
335                    AuthError,
336                    concat!(
337                        "Did not receive auth challenge. ",
338                        "Is the database configured to require ",
339                        "authentication?"
340                    )
341                )
342            } else {
343                error::fmt!(AuthError, "Received incomplete auth challenge: {:?}", buf)
344            });
345        }
346        buf.pop(); // b'\n'
347        Ok(buf)
348    }
349
350    fn authenticate(&mut self, auth: &EcdsaAuthParams) -> Result<()> {
351        if auth.key_id.contains('\n') {
352            return Err(error::fmt!(
353                AuthError,
354                "Bad key id {:?}: Should not contain new-line char.",
355                auth.key_id
356            ));
357        }
358        let key_pair = parse_key_pair(auth)?;
359        self.send_key_id(auth.key_id.as_str())?;
360        let challenge = self.read_challenge()?;
361        let rng = SystemRandom::new();
362        let signature = key_pair
363            .sign(&rng, &challenge[..])
364            .map_err(|unspecified_err| {
365                error::fmt!(AuthError, "Failed to sign challenge: {}", unspecified_err)
366            })?;
367        let mut encoded_sig = Base64::encode_string(signature.as_ref());
368        encoded_sig.push('\n');
369        let buf = encoded_sig.as_bytes();
370        if let Err(io_err) = self.write_all(buf) {
371            return Err(map_io_to_socket_err(
372                "Could not send signed challenge: ",
373                io_err,
374            ));
375        }
376        Ok(())
377    }
378}
379
380enum ProtocolHandler {
381    Socket(Connection),
382
383    #[cfg(feature = "ilp-over-http")]
384    Http(HttpHandlerState),
385}
386
387impl io::Read for Connection {
388    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
389        match self {
390            Self::Direct(sock) => sock.read(buf),
391            Self::Tls(stream) => stream.read(buf),
392        }
393    }
394}
395
396impl io::Write for Connection {
397    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
398        match self {
399            Self::Direct(sock) => sock.write(buf),
400            Self::Tls(stream) => stream.write(buf),
401        }
402    }
403
404    fn flush(&mut self) -> io::Result<()> {
405        match self {
406            Self::Direct(sock) => sock.flush(),
407            Self::Tls(stream) => stream.flush(),
408        }
409    }
410}
411
412#[derive(Debug, Copy, Clone, PartialEq)]
413enum OpCase {
414    Init = Op::Table as isize,
415    TableWritten = Op::Symbol as isize | Op::Column as isize,
416    SymbolWritten = Op::Symbol as isize | Op::Column as isize | Op::At as isize,
417    ColumnWritten = Op::Column as isize | Op::At as isize,
418    MayFlushOrTable = Op::Flush as isize | Op::Table as isize,
419}
420
421impl OpCase {
422    fn next_op_descr(self) -> &'static str {
423        match self {
424            OpCase::Init => "should have called `table` instead",
425            OpCase::TableWritten => "should have called `symbol` or `column` instead",
426            OpCase::SymbolWritten => "should have called `symbol`, `column` or `at` instead",
427            OpCase::ColumnWritten => "should have called `column` or `at` instead",
428            OpCase::MayFlushOrTable => "should have called `flush` or `table` instead",
429        }
430    }
431}
432
433#[derive(Debug, Clone)]
434struct BufferState {
435    op_case: OpCase,
436    row_count: usize,
437    first_table: Option<String>,
438    transactional: bool,
439}
440
441impl BufferState {
442    fn new() -> Self {
443        Self {
444            op_case: OpCase::Init,
445            row_count: 0,
446            first_table: None,
447            transactional: true,
448        }
449    }
450
451    fn clear(&mut self) {
452        self.op_case = OpCase::Init;
453        self.row_count = 0;
454        self.first_table = None;
455        self.transactional = true;
456    }
457}
458
459/// A reusable buffer to prepare a batch of ILP messages.
460///
461/// # Example
462///
463/// ```
464/// # use questdb::Result;
465/// use questdb::ingress::{Buffer, TimestampMicros, TimestampNanos};
466///
467/// # fn main() -> Result<()> {
468/// let mut buffer = Buffer::new();
469///
470/// // first row
471/// buffer
472///     .table("table1")?
473///     .symbol("bar", "baz")?
474///     .column_bool("a", false)?
475///     .column_i64("b", 42)?
476///     .column_f64("c", 3.14)?
477///     .column_str("d", "hello")?
478///     .column_ts("e", TimestampMicros::now())?
479///     .at(TimestampNanos::now())?;
480///
481/// // second row
482/// buffer
483///     .table("table2")?
484///     .symbol("foo", "bar")?
485///     .at(TimestampNanos::now())?;
486/// # Ok(())
487/// # }
488/// ```
489///
490/// Send the buffer to QuestDB using [`sender.flush(&mut buffer)`](Sender::flush).
491///
492/// # Sequential Coupling
493/// The Buffer API is sequentially coupled:
494///   * A row always starts with [`table`](Buffer::table).
495///   * A row must contain at least one [`symbol`](Buffer::symbol) or
496///     column (
497///       [`column_bool`](Buffer::column_bool),
498///       [`column_i64`](Buffer::column_i64),
499///       [`column_f64`](Buffer::column_f64),
500///       [`column_str`](Buffer::column_str),
501///       [`column_ts`](Buffer::column_ts)).
502///   * Symbols must appear before columns.
503///   * A row must be terminated with either [`at`](Buffer::at) or
504///     [`at_now`](Buffer::at_now).
505///
506/// This diagram visualizes the sequence:
507///
508/// <img src="https://raw.githubusercontent.com/questdb/c-questdb-client/main/api_seq/seq.svg">
509///
510/// # Buffer method calls, Serialized ILP types and QuestDB types
511///
512/// | Buffer Method | Serialized as ILP type (Click on link to see possible casts) |
513/// |---------------|--------------------------------------------------------------|
514/// | [`symbol`](Buffer::symbol) | [`SYMBOL`](https://questdb.io/docs/concept/symbol/) |
515/// | [`column_bool`](Buffer::column_bool) | [`BOOLEAN`](https://questdb.io/docs/reference/api/ilp/columnset-types#boolean) |
516/// | [`column_i64`](Buffer::column_i64) | [`INTEGER`](https://questdb.io/docs/reference/api/ilp/columnset-types#integer) |
517/// | [`column_f64`](Buffer::column_f64) | [`FLOAT`](https://questdb.io/docs/reference/api/ilp/columnset-types#float) |
518/// | [`column_str`](Buffer::column_str) | [`STRING`](https://questdb.io/docs/reference/api/ilp/columnset-types#string) |
519/// | [`column_ts`](Buffer::column_ts) | [`TIMESTAMP`](https://questdb.io/docs/reference/api/ilp/columnset-types#timestamp) |
520///
521/// QuestDB supports both `STRING` and `SYMBOL` column types.
522///
523/// To understand the difference, refer to the
524/// [QuestDB documentation](https://questdb.io/docs/concept/symbol/). In a nutshell,
525/// symbols are interned strings, most suitable for identifiers that are repeated many
526/// times throughout the column. They offer an advantage in storage space and query
527/// performance.
528///
529/// # Inserting NULL values
530///
531/// To insert a NULL value, skip the symbol or column for that row.
532///
533/// # Recovering from validation errors
534///
535/// If you want to recover from potential validation errors, call
536/// [`buffer.set_marker()`](Buffer::set_marker) to track the last known good state,
537/// append as many rows or parts of rows as you like, and then call
538/// [`buffer.clear_marker()`](Buffer::clear_marker) on success.
539///
540/// If there was an error in one of the rows, use
541/// [`buffer.rewind_to_marker()`](Buffer::rewind_to_marker) to go back to the
542/// marked last known good state.
543///
544#[derive(Debug, Clone)]
545pub struct Buffer {
546    output: String,
547    state: BufferState,
548    marker: Option<(usize, BufferState)>,
549    max_name_len: usize,
550}
551
552impl Buffer {
553    /// Construct a `Buffer` with a `max_name_len` of `127`, which is the same as the
554    /// QuestDB server default.
555    pub fn new() -> Self {
556        Self {
557            output: String::new(),
558            state: BufferState::new(),
559            marker: None,
560            max_name_len: 127,
561        }
562    }
563
564    /// Construct a `Buffer` with a custom maximum length for table and column names.
565    ///
566    /// This should match the `cairo.max.file.name.length` setting of the
567    /// QuestDB instance you're connecting to.
568    ///
569    /// If the server does not configure it, the default is `127` and you can simply
570    /// call [`new`](Buffer::new).
571    pub fn with_max_name_len(max_name_len: usize) -> Self {
572        let mut buf = Self::new();
573        buf.max_name_len = max_name_len;
574        buf
575    }
576
577    /// Pre-allocate to ensure the buffer has enough capacity for at least the
578    /// specified additional byte count. This may be rounded up.
579    /// This does not allocate if such additional capacity is already satisfied.
580    /// See: `capacity`.
581    pub fn reserve(&mut self, additional: usize) {
582        self.output.reserve(additional);
583    }
584
585    /// The number of bytes accumulated in the buffer.
586    pub fn len(&self) -> usize {
587        self.output.len()
588    }
589
590    /// The number of rows accumulated in the buffer.
591    pub fn row_count(&self) -> usize {
592        self.state.row_count
593    }
594
595    /// Tells whether the buffer is transactional. It is transactional iff it contains
596    /// data for at most one table. Additionally, you must send the buffer over HTTP to
597    /// get transactional behavior.
598    pub fn transactional(&self) -> bool {
599        self.state.transactional
600    }
601
602    pub fn is_empty(&self) -> bool {
603        self.output.is_empty()
604    }
605
606    /// The total number of bytes the buffer can hold before it needs to resize.
607    pub fn capacity(&self) -> usize {
608        self.output.capacity()
609    }
610
611    /// A string representation of the buffer's contents. Useful for debugging.
612    pub fn as_str(&self) -> &str {
613        &self.output
614    }
615
616    /// Mark a rewind point.
617    /// This allows undoing accumulated changes to the buffer for one or more
618    /// rows by calling [`rewind_to_marker`](Buffer::rewind_to_marker).
619    /// Any previous marker will be discarded.
620    /// Once the marker is no longer needed, call
621    /// [`clear_marker`](Buffer::clear_marker).
622    pub fn set_marker(&mut self) -> Result<()> {
623        if (self.state.op_case as isize & Op::Table as isize) == 0 {
624            return Err(error::fmt!(
625                InvalidApiCall,
626                concat!(
627                    "Can't set the marker whilst constructing a line. ",
628                    "A marker may only be set on an empty buffer or after ",
629                    "`at` or `at_now` is called."
630                )
631            ));
632        }
633        self.marker = Some((self.output.len(), self.state.clone()));
634        Ok(())
635    }
636
637    /// Undo all changes since the last [`set_marker`](Buffer::set_marker)
638    /// call.
639    ///
640    /// As a side-effect, this also clears the marker.
641    pub fn rewind_to_marker(&mut self) -> Result<()> {
642        if let Some((position, state)) = self.marker.take() {
643            self.output.truncate(position);
644            self.state = state;
645            Ok(())
646        } else {
647            Err(error::fmt!(
648                InvalidApiCall,
649                "Can't rewind to the marker: No marker set."
650            ))
651        }
652    }
653
654    /// Discard any marker as may have been set by
655    /// [`set_marker`](Buffer::set_marker).
656    ///
657    /// Idempotent.
658    pub fn clear_marker(&mut self) {
659        self.marker = None;
660    }
661
662    /// Reset the buffer and clear contents whilst retaining
663    /// [`capacity`](Buffer::capacity).
664    pub fn clear(&mut self) {
665        self.output.clear();
666        self.state.clear();
667        self.marker = None;
668    }
669
670    /// Check if the next API operation is allowed as per the OP case state machine.
671    #[inline(always)]
672    fn check_op(&self, op: Op) -> Result<()> {
673        if (self.state.op_case as isize & op as isize) > 0 {
674            Ok(())
675        } else {
676            Err(error::fmt!(
677                InvalidApiCall,
678                "State error: Bad call to `{}`, {}.",
679                op.descr(),
680                self.state.op_case.next_op_descr()
681            ))
682        }
683    }
684
685    #[inline(always)]
686    fn validate_max_name_len(&self, name: &str) -> Result<()> {
687        if name.len() > self.max_name_len {
688            return Err(error::fmt!(
689                InvalidName,
690                "Bad name: {:?}: Too long (max {} characters)",
691                name,
692                self.max_name_len
693            ));
694        }
695        Ok(())
696    }
697
698    /// Begin recording a new row for the given table.
699    ///
700    /// ```
701    /// # use questdb::Result;
702    /// # use questdb::ingress::Buffer;
703    /// # fn main() -> Result<()> {
704    /// # let mut buffer = Buffer::new();
705    /// buffer.table("table_name")?;
706    /// # Ok(())
707    /// # }
708    /// ```
709    ///
710    /// or
711    ///
712    /// ```
713    /// # use questdb::Result;
714    /// # use questdb::ingress::Buffer;
715    /// use questdb::ingress::TableName;
716    ///
717    /// # fn main() -> Result<()> {
718    /// # let mut buffer = Buffer::new();
719    /// let table_name = TableName::new("table_name")?;
720    /// buffer.table(table_name)?;
721    /// # Ok(())
722    /// # }
723    /// ```
724    pub fn table<'a, N>(&mut self, name: N) -> Result<&mut Self>
725    where
726        N: TryInto<TableName<'a>>,
727        Error: From<N::Error>,
728    {
729        let name: TableName<'a> = name.try_into()?;
730        self.validate_max_name_len(name.name)?;
731        self.check_op(Op::Table)?;
732        write_escaped_unquoted(&mut self.output, name.name);
733        self.state.op_case = OpCase::TableWritten;
734
735        // A buffer stops being transactional if it targets multiple tables.
736        if let Some(first_table) = &self.state.first_table {
737            if first_table != name.name {
738                self.state.transactional = false;
739            }
740        } else {
741            self.state.first_table = Some(name.name.to_owned());
742        }
743        Ok(self)
744    }
745
746    /// Record a symbol for the given column.
747    /// Make sure you record all symbol columns before any other column type.
748    ///
749    /// ```
750    /// # use questdb::Result;
751    /// # use questdb::ingress::Buffer;
752    /// # fn main() -> Result<()> {
753    /// # let mut buffer = Buffer::new();
754    /// # buffer.table("x")?;
755    /// buffer.symbol("col_name", "value")?;
756    /// # Ok(())
757    /// # }
758    /// ```
759    ///
760    /// or
761    ///
762    /// ```
763    /// # use questdb::Result;
764    /// # use questdb::ingress::Buffer;
765    /// # fn main() -> Result<()> {
766    /// # let mut buffer = Buffer::new();
767    /// # buffer.table("x")?;
768    /// let value: String = "value".to_owned();
769    /// buffer.symbol("col_name", value)?;
770    /// # Ok(())
771    /// # }
772    /// ```
773    ///
774    /// or
775    ///
776    /// ```
777    /// # use questdb::Result;
778    /// # use questdb::ingress::Buffer;
779    /// use questdb::ingress::ColumnName;
780    ///
781    /// # fn main() -> Result<()> {
782    /// # let mut buffer = Buffer::new();
783    /// # buffer.table("x")?;
784    /// let col_name = ColumnName::new("col_name")?;
785    /// buffer.symbol(col_name, "value")?;
786    /// # Ok(())
787    /// # }
788    /// ```
789    ///
790    pub fn symbol<'a, N, S>(&mut self, name: N, value: S) -> Result<&mut Self>
791    where
792        N: TryInto<ColumnName<'a>>,
793        S: AsRef<str>,
794        Error: From<N::Error>,
795    {
796        let name: ColumnName<'a> = name.try_into()?;
797        self.validate_max_name_len(name.name)?;
798        self.check_op(Op::Symbol)?;
799        self.output.push(',');
800        write_escaped_unquoted(&mut self.output, name.name);
801        self.output.push('=');
802        write_escaped_unquoted(&mut self.output, value.as_ref());
803        self.state.op_case = OpCase::SymbolWritten;
804        Ok(self)
805    }
806
807    fn write_column_key<'a, N>(&mut self, name: N) -> Result<&mut Self>
808    where
809        N: TryInto<ColumnName<'a>>,
810        Error: From<N::Error>,
811    {
812        let name: ColumnName<'a> = name.try_into()?;
813        self.validate_max_name_len(name.name)?;
814        self.check_op(Op::Column)?;
815        self.output
816            .push(if (self.state.op_case as isize & Op::Symbol as isize) > 0 {
817                ' '
818            } else {
819                ','
820            });
821        write_escaped_unquoted(&mut self.output, name.name);
822        self.output.push('=');
823        self.state.op_case = OpCase::ColumnWritten;
824        Ok(self)
825    }
826
827    /// Record a boolean value for the given column.
828    ///
829    /// ```
830    /// # use questdb::Result;
831    /// # use questdb::ingress::Buffer;
832    /// # fn main() -> Result<()> {
833    /// # let mut buffer = Buffer::new();
834    /// # buffer.table("x")?;
835    /// buffer.column_bool("col_name", true)?;
836    /// # Ok(())
837    /// # }
838    /// ```
839    ///
840    /// or
841    ///
842    /// ```
843    /// # use questdb::Result;
844    /// # use questdb::ingress::Buffer;
845    /// use questdb::ingress::ColumnName;
846    ///
847    /// # fn main() -> Result<()> {
848    /// # let mut buffer = Buffer::new();
849    /// # buffer.table("x")?;
850    /// let col_name = ColumnName::new("col_name")?;
851    /// buffer.column_bool(col_name, true)?;
852    /// # Ok(())
853    /// # }
854    /// ```
855    pub fn column_bool<'a, N>(&mut self, name: N, value: bool) -> Result<&mut Self>
856    where
857        N: TryInto<ColumnName<'a>>,
858        Error: From<N::Error>,
859    {
860        self.write_column_key(name)?;
861        self.output.push(if value { 't' } else { 'f' });
862        Ok(self)
863    }
864
865    /// Record an integer value for the given column.
866    ///
867    /// ```
868    /// # use questdb::Result;
869    /// # use questdb::ingress::Buffer;
870    /// # fn main() -> Result<()> {
871    /// # let mut buffer = Buffer::new();
872    /// # buffer.table("x")?;
873    /// buffer.column_i64("col_name", 42)?;
874    /// # Ok(())
875    /// # }
876    /// ```
877    ///
878    /// or
879    ///
880    /// ```
881    /// # use questdb::Result;
882    /// # use questdb::ingress::Buffer;
883    /// use questdb::ingress::ColumnName;
884    ///
885    /// # fn main() -> Result<()> {
886    /// # let mut buffer = Buffer::new();
887    /// # buffer.table("x")?;
888    /// let col_name = ColumnName::new("col_name")?;
889    /// buffer.column_i64(col_name, 42);
890    /// # Ok(())
891    /// # }
892    /// ```
893    pub fn column_i64<'a, N>(&mut self, name: N, value: i64) -> Result<&mut Self>
894    where
895        N: TryInto<ColumnName<'a>>,
896        Error: From<N::Error>,
897    {
898        self.write_column_key(name)?;
899        let mut buf = itoa::Buffer::new();
900        let printed = buf.format(value);
901        self.output.push_str(printed);
902        self.output.push('i');
903        Ok(self)
904    }
905
906    /// Record a floating point value for the given column.
907    ///
908    /// ```
909    /// # use questdb::Result;
910    /// # use questdb::ingress::Buffer;
911    /// # fn main() -> Result<()> {
912    /// # let mut buffer = Buffer::new();
913    /// # buffer.table("x")?;
914    /// buffer.column_f64("col_name", 3.14)?;
915    /// # Ok(())
916    /// # }
917    /// ```
918    ///
919    /// or
920    ///
921    /// ```
922    /// # use questdb::Result;
923    /// # use questdb::ingress::Buffer;
924    /// use questdb::ingress::ColumnName;
925    ///
926    /// # fn main() -> Result<()> {
927    /// # let mut buffer = Buffer::new();
928    /// # buffer.table("x")?;
929    /// let col_name = ColumnName::new("col_name")?;
930    /// buffer.column_f64(col_name, 3.14)?;
931    /// # Ok(())
932    /// # }
933    /// ```
934    pub fn column_f64<'a, N>(&mut self, name: N, value: f64) -> Result<&mut Self>
935    where
936        N: TryInto<ColumnName<'a>>,
937        Error: From<N::Error>,
938    {
939        self.write_column_key(name)?;
940        let mut ser = F64Serializer::new(value);
941        self.output.push_str(ser.as_str());
942        Ok(self)
943    }
944
945    /// Record a string value for the given column.
946    ///
947    /// ```
948    /// # use questdb::Result;
949    /// # use questdb::ingress::Buffer;
950    /// # fn main() -> Result<()> {
951    /// # let mut buffer = Buffer::new();
952    /// # buffer.table("x")?;
953    /// buffer.column_str("col_name", "value")?;
954    /// # Ok(())
955    /// # }
956    /// ```
957    ///
958    /// or
959    ///
960    /// ```
961    /// # use questdb::Result;
962    /// # use questdb::ingress::Buffer;
963    /// # fn main() -> Result<()> {
964    /// # let mut buffer = Buffer::new();
965    /// # buffer.table("x")?;
966    /// let value: String = "value".to_owned();
967    /// buffer.column_str("col_name", value)?;
968    /// # Ok(())
969    /// # }
970    /// ```
971    ///
972    /// or
973    ///
974    /// ```
975    /// # use questdb::Result;
976    /// # use questdb::ingress::Buffer;
977    /// use questdb::ingress::ColumnName;
978    ///
979    /// # fn main() -> Result<()> {
980    /// # let mut buffer = Buffer::new();
981    /// # buffer.table("x")?;
982    /// let col_name = ColumnName::new("col_name")?;
983    /// buffer.column_str(col_name, "value")?;
984    /// # Ok(())
985    /// # }
986    /// ```
987    pub fn column_str<'a, N, S>(&mut self, name: N, value: S) -> Result<&mut Self>
988    where
989        N: TryInto<ColumnName<'a>>,
990        S: AsRef<str>,
991        Error: From<N::Error>,
992    {
993        self.write_column_key(name)?;
994        write_escaped_quoted(&mut self.output, value.as_ref());
995        Ok(self)
996    }
997
998    /// Record a timestamp value for the given column.
999    ///
1000    /// ```
1001    /// # use questdb::Result;
1002    /// # use questdb::ingress::Buffer;
1003    /// use questdb::ingress::TimestampMicros;
1004    /// # fn main() -> Result<()> {
1005    /// # let mut buffer = Buffer::new();
1006    /// # buffer.table("x")?;
1007    /// buffer.column_ts("col_name", TimestampMicros::now())?;
1008    /// # Ok(())
1009    /// # }
1010    /// ```
1011    ///
1012    /// or
1013    ///
1014    /// ```
1015    /// # use questdb::Result;
1016    /// # use questdb::ingress::Buffer;
1017    /// use questdb::ingress::TimestampMicros;
1018    ///
1019    /// # fn main() -> Result<()> {
1020    /// # let mut buffer = Buffer::new();
1021    /// # buffer.table("x")?;
1022    /// buffer.column_ts("col_name", TimestampMicros::new(1659548204354448))?;
1023    /// # Ok(())
1024    /// # }
1025    /// ```
1026    ///
1027    /// or
1028    ///
1029    /// ```
1030    /// # use questdb::Result;
1031    /// # use questdb::ingress::Buffer;
1032    /// use questdb::ingress::TimestampMicros;
1033    /// use questdb::ingress::ColumnName;
1034    ///
1035    /// # fn main() -> Result<()> {
1036    /// # let mut buffer = Buffer::new();
1037    /// # buffer.table("x")?;
1038    /// let col_name = ColumnName::new("col_name")?;
1039    /// buffer.column_ts(col_name, TimestampMicros::now())?;
1040    /// # Ok(())
1041    /// # }
1042    /// ```
1043    ///
1044    /// or you can also pass in a `TimestampNanos`.
1045    ///
1046    /// Note that both `TimestampMicros` and `TimestampNanos` can be constructed
1047    /// easily from either `chrono::DateTime` and `std::time::SystemTime`.
1048    ///
1049    /// This last option requires the `chrono_timestamp` feature.
1050    pub fn column_ts<'a, N, T>(&mut self, name: N, value: T) -> Result<&mut Self>
1051    where
1052        N: TryInto<ColumnName<'a>>,
1053        T: TryInto<Timestamp>,
1054        Error: From<N::Error>,
1055        Error: From<T::Error>,
1056    {
1057        self.write_column_key(name)?;
1058        let timestamp: Timestamp = value.try_into()?;
1059        let timestamp: TimestampMicros = timestamp.try_into()?;
1060        let mut buf = itoa::Buffer::new();
1061        let printed = buf.format(timestamp.as_i64());
1062        self.output.push_str(printed);
1063        self.output.push('t');
1064        Ok(self)
1065    }
1066
1067    /// Complete the current row with the designated timestamp. After this call, you can
1068    /// start recording the next row by calling [Buffer::table] again, or  you can send
1069    /// the accumulated batch by calling [Sender::flush] or one of its variants.
1070    ///
1071    /// ```
1072    /// # use questdb::Result;
1073    /// # use questdb::ingress::Buffer;
1074    /// use questdb::ingress::TimestampNanos;
1075    /// # fn main() -> Result<()> {
1076    /// # let mut buffer = Buffer::new();
1077    /// # buffer.table("x")?.symbol("a", "b")?;
1078    /// buffer.at(TimestampNanos::now())?;
1079    /// # Ok(())
1080    /// # }
1081    /// ```
1082    ///
1083    /// or
1084    ///
1085    /// ```
1086    /// # use questdb::Result;
1087    /// # use questdb::ingress::Buffer;
1088    /// use questdb::ingress::TimestampNanos;
1089    ///
1090    /// # fn main() -> Result<()> {
1091    /// # let mut buffer = Buffer::new();
1092    /// # buffer.table("x")?.symbol("a", "b")?;
1093    /// buffer.at(TimestampNanos::new(1659548315647406592))?;
1094    /// # Ok(())
1095    /// # }
1096    /// ```
1097    ///
1098    /// You can also pass in a `TimestampMicros`.
1099    ///
1100    /// Note that both `TimestampMicros` and `TimestampNanos` can be constructed
1101    /// easily from either `chrono::DateTime` and `std::time::SystemTime`.
1102    ///
1103    pub fn at<T>(&mut self, timestamp: T) -> Result<()>
1104    where
1105        T: TryInto<Timestamp>,
1106        Error: From<T::Error>,
1107    {
1108        self.check_op(Op::At)?;
1109        let timestamp: Timestamp = timestamp.try_into()?;
1110
1111        // https://github.com/rust-lang/rust/issues/115880
1112        let timestamp: Result<TimestampNanos> = timestamp.try_into();
1113        let timestamp: TimestampNanos = timestamp?;
1114
1115        let epoch_nanos = timestamp.as_i64();
1116        if epoch_nanos < 0 {
1117            return Err(error::fmt!(
1118                InvalidTimestamp,
1119                "Timestamp {} is negative. It must be >= 0.",
1120                epoch_nanos
1121            ));
1122        }
1123        let mut buf = itoa::Buffer::new();
1124        let printed = buf.format(epoch_nanos);
1125        self.output.push(' ');
1126        self.output.push_str(printed);
1127        self.output.push('\n');
1128        self.state.op_case = OpCase::MayFlushOrTable;
1129        self.state.row_count += 1;
1130        Ok(())
1131    }
1132
1133    /// Complete the current row without providing a timestamp. The QuestDB instance
1134    /// will insert its own timestamp.
1135    ///
1136    /// Letting the server assign the timestamp can be faster since it reliably avoids
1137    /// out-of-order operations in the database for maximum ingestion throughput. However,
1138    /// it removes the ability to deduplicate rows.
1139    ///
1140    /// This is NOT equivalent to calling [Buffer::at] with the current time: the QuestDB
1141    /// server will set the timestamp only after receiving the row. If you're flushing
1142    /// infrequently, the server-assigned timestamp may be significantly behind the
1143    /// time the data was recorded in the buffer.
1144    ///
1145    /// In almost all cases, you should prefer the [Buffer::at] function.
1146    ///
1147    /// After this call, you can start recording the next row by calling [Buffer::table]
1148    /// again, or you can send the accumulated batch by calling [Sender::flush] or one of
1149    /// its variants.
1150    ///
1151    /// ```
1152    /// # use questdb::Result;
1153    /// # use questdb::ingress::Buffer;
1154    /// # fn main() -> Result<()> {
1155    /// # let mut buffer = Buffer::new();
1156    /// # buffer.table("x")?.symbol("a", "b")?;
1157    /// buffer.at_now()?;
1158    /// # Ok(())
1159    /// # }
1160    /// ```
1161    pub fn at_now(&mut self) -> Result<()> {
1162        self.check_op(Op::At)?;
1163        self.output.push('\n');
1164        self.state.op_case = OpCase::MayFlushOrTable;
1165        self.state.row_count += 1;
1166        Ok(())
1167    }
1168}
1169
1170impl Default for Buffer {
1171    fn default() -> Self {
1172        Self::new()
1173    }
1174}
1175
1176/// Connects to a QuestDB instance and inserts data via the ILP protocol.
1177///
1178/// * To construct an instance, use [`Sender::from_conf`] or the [`SenderBuilder`].
1179/// * To prepare messages, use [`Buffer`] objects.
1180/// * To send messages, call the [`flush`](Sender::flush) method.
1181pub struct Sender {
1182    descr: String,
1183    handler: ProtocolHandler,
1184    connected: bool,
1185    max_buf_size: usize,
1186}
1187
1188impl std::fmt::Debug for Sender {
1189    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
1190        f.write_str(self.descr.as_str())
1191    }
1192}
1193
1194#[derive(PartialEq, Debug, Clone)]
1195struct EcdsaAuthParams {
1196    key_id: String,
1197    priv_key: String,
1198    pub_key_x: String,
1199    pub_key_y: String,
1200}
1201
1202#[derive(PartialEq, Debug, Clone)]
1203enum AuthParams {
1204    Ecdsa(EcdsaAuthParams),
1205
1206    #[cfg(feature = "ilp-over-http")]
1207    Basic(BasicAuthParams),
1208
1209    #[cfg(feature = "ilp-over-http")]
1210    Token(TokenAuthParams),
1211}
1212
1213/// Possible sources of the root certificates used to validate the server's TLS
1214/// certificate.
1215#[derive(PartialEq, Debug, Clone, Copy)]
1216pub enum CertificateAuthority {
1217    /// Use the root certificates provided by the
1218    /// [`webpki-roots`](https://crates.io/crates/webpki-roots) crate.
1219    #[cfg(feature = "tls-webpki-certs")]
1220    WebpkiRoots,
1221
1222    /// Use the root certificates provided by the OS
1223    #[cfg(feature = "tls-native-certs")]
1224    OsRoots,
1225
1226    /// Combine the root certificates provided by the OS and the `webpki-roots` crate.
1227    #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
1228    WebpkiAndOsRoots,
1229
1230    /// Use the root certificates provided in a PEM-encoded file.
1231    PemFile,
1232}
1233
1234/// A `u16` port number or `String` port service name as is registered with
1235/// `/etc/services` or equivalent.
1236///
1237/// ```
1238/// use questdb::ingress::Port;
1239/// use std::convert::Into;
1240///
1241/// let service: Port = 9009.into();
1242/// ```
1243///
1244/// or
1245///
1246/// ```
1247/// use questdb::ingress::Port;
1248/// use std::convert::Into;
1249///
1250/// // Assuming the service name is registered.
1251/// let service: Port = "qdb_ilp".into();  // or with a String too.
1252/// ```
1253pub struct Port(String);
1254
1255impl From<String> for Port {
1256    fn from(s: String) -> Self {
1257        Port(s)
1258    }
1259}
1260
1261impl From<&str> for Port {
1262    fn from(s: &str) -> Self {
1263        Port(s.to_owned())
1264    }
1265}
1266
1267impl From<u16> for Port {
1268    fn from(p: u16) -> Self {
1269        Port(p.to_string())
1270    }
1271}
1272
1273#[cfg(feature = "insecure-skip-verify")]
1274mod danger {
1275    use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
1276    use rustls::{DigitallySignedStruct, Error, SignatureScheme};
1277    use rustls_pki_types::{CertificateDer, ServerName, UnixTime};
1278
1279    #[derive(Debug)]
1280    pub struct NoCertificateVerification {}
1281
1282    impl ServerCertVerifier for NoCertificateVerification {
1283        fn verify_server_cert(
1284            &self,
1285            _end_entity: &CertificateDer<'_>,
1286            _intermediates: &[CertificateDer<'_>],
1287            _server_name: &ServerName<'_>,
1288            _ocsp_response: &[u8],
1289            _now: UnixTime,
1290        ) -> Result<ServerCertVerified, Error> {
1291            Ok(ServerCertVerified::assertion())
1292        }
1293
1294        fn verify_tls12_signature(
1295            &self,
1296            _message: &[u8],
1297            _cert: &CertificateDer<'_>,
1298            _dss: &DigitallySignedStruct,
1299        ) -> Result<HandshakeSignatureValid, Error> {
1300            Ok(HandshakeSignatureValid::assertion())
1301        }
1302
1303        fn verify_tls13_signature(
1304            &self,
1305            _message: &[u8],
1306            _cert: &CertificateDer<'_>,
1307            _dss: &DigitallySignedStruct,
1308        ) -> Result<HandshakeSignatureValid, Error> {
1309            Ok(HandshakeSignatureValid::assertion())
1310        }
1311
1312        fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
1313            rustls::crypto::ring::default_provider()
1314                .signature_verification_algorithms
1315                .supported_schemes()
1316        }
1317    }
1318}
1319
1320#[cfg(feature = "tls-webpki-certs")]
1321fn add_webpki_roots(root_store: &mut RootCertStore) {
1322    root_store
1323        .roots
1324        .extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned())
1325}
1326
1327#[cfg(feature = "tls-native-certs")]
1328fn add_os_roots(root_store: &mut RootCertStore) -> Result<()> {
1329    let os_certs = rustls_native_certs::load_native_certs().map_err(|io_err| {
1330        error::fmt!(
1331            TlsError,
1332            "Could not load OS native TLS certificates: {}",
1333            io_err
1334        )
1335    })?;
1336
1337    let (valid_count, invalid_count) = root_store.add_parsable_certificates(os_certs);
1338    if valid_count == 0 && invalid_count > 0 {
1339        return Err(error::fmt!(
1340            TlsError,
1341            "No valid certificates found in native root store ({} found but were invalid)",
1342            invalid_count
1343        ));
1344    }
1345    Ok(())
1346}
1347
1348fn configure_tls(
1349    tls_enabled: bool,
1350    tls_verify: bool,
1351    tls_ca: CertificateAuthority,
1352    tls_roots: &Option<PathBuf>,
1353) -> Result<Option<Arc<rustls::ClientConfig>>> {
1354    if !tls_enabled {
1355        return Ok(None);
1356    }
1357
1358    let mut root_store = RootCertStore::empty();
1359    if tls_verify {
1360        match (tls_ca, tls_roots) {
1361            #[cfg(feature = "tls-webpki-certs")]
1362            (CertificateAuthority::WebpkiRoots, None) => {
1363                add_webpki_roots(&mut root_store);
1364            }
1365
1366            #[cfg(feature = "tls-webpki-certs")]
1367            (CertificateAuthority::WebpkiRoots, Some(_)) => {
1368                return Err(error::fmt!(ConfigError, "Config parameter \"tls_roots\" must be unset when \"tls_ca\" is set to \"webpki_roots\"."));
1369            }
1370
1371            #[cfg(feature = "tls-native-certs")]
1372            (CertificateAuthority::OsRoots, None) => {
1373                add_os_roots(&mut root_store)?;
1374            }
1375
1376            #[cfg(feature = "tls-native-certs")]
1377            (CertificateAuthority::OsRoots, Some(_)) => {
1378                return Err(error::fmt!(ConfigError, "Config parameter \"tls_roots\" must be unset when \"tls_ca\" is set to \"os_roots\"."));
1379            }
1380
1381            #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
1382            (CertificateAuthority::WebpkiAndOsRoots, None) => {
1383                add_webpki_roots(&mut root_store);
1384                add_os_roots(&mut root_store)?;
1385            }
1386
1387            #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
1388            (CertificateAuthority::WebpkiAndOsRoots, Some(_)) => {
1389                return Err(error::fmt!(ConfigError, "Config parameter \"tls_roots\" must be unset when \"tls_ca\" is set to \"webpki_and_os_roots\"."));
1390            }
1391
1392            (CertificateAuthority::PemFile, Some(ca_file)) => {
1393                let certfile = std::fs::File::open(ca_file).map_err(|io_err| {
1394                    error::fmt!(
1395                        TlsError,
1396                        concat!(
1397                            "Could not open tls_roots certificate authority ",
1398                            "file from path {:?}: {}"
1399                        ),
1400                        ca_file,
1401                        io_err
1402                    )
1403                })?;
1404                let mut reader = BufReader::new(certfile);
1405                let der_certs = rustls_pemfile::certs(&mut reader)
1406                    .collect::<std::result::Result<Vec<_>, _>>()
1407                    .map_err(|io_err| {
1408                        error::fmt!(
1409                            TlsError,
1410                            concat!(
1411                                "Could not read certificate authority ",
1412                                "file from path {:?}: {}"
1413                            ),
1414                            ca_file,
1415                            io_err
1416                        )
1417                    })?;
1418                root_store.add_parsable_certificates(der_certs);
1419            }
1420
1421            (CertificateAuthority::PemFile, None) => {
1422                return Err(error::fmt!(ConfigError, "Config parameter \"tls_roots\" is required when \"tls_ca\" is set to \"pem_file\"."));
1423            }
1424        }
1425    }
1426
1427    let mut config = rustls::ClientConfig::builder()
1428        .with_root_certificates(root_store)
1429        .with_no_client_auth();
1430
1431    // TLS log file for debugging.
1432    // Set the SSLKEYLOGFILE env variable to a writable location.
1433    config.key_log = Arc::new(rustls::KeyLogFile::new());
1434
1435    #[cfg(feature = "insecure-skip-verify")]
1436    if !tls_verify {
1437        config
1438            .dangerous()
1439            .set_certificate_verifier(Arc::new(danger::NoCertificateVerification {}));
1440    }
1441
1442    Ok(Some(Arc::new(config)))
1443}
1444
1445fn validate_auto_flush_params(params: &HashMap<String, String>) -> Result<()> {
1446    if let Some(auto_flush) = params.get("auto_flush") {
1447        if auto_flush.as_str() != "off" {
1448            return Err(error::fmt!(
1449                ConfigError,
1450                "Invalid auto_flush value '{auto_flush}'. This client does not \
1451                support auto-flush, so the only accepted value is 'off'"
1452            ));
1453        }
1454    }
1455
1456    for &param in ["auto_flush_rows", "auto_flush_bytes"].iter() {
1457        if params.contains_key(param) {
1458            return Err(error::fmt!(
1459                ConfigError,
1460                "Invalid configuration parameter {:?}. This client does not support auto-flush",
1461                param
1462            ));
1463        }
1464    }
1465    Ok(())
1466}
1467
1468/// Protocol used to communicate with the QuestDB server.
1469#[derive(PartialEq, Debug, Clone, Copy)]
1470pub enum Protocol {
1471    /// ILP over TCP (streaming).
1472    Tcp,
1473
1474    /// TCP + TLS
1475    Tcps,
1476
1477    #[cfg(feature = "ilp-over-http")]
1478    /// ILP over HTTP (request-response, InfluxDB-compatible).
1479    Http,
1480
1481    #[cfg(feature = "ilp-over-http")]
1482    /// HTTP + TLS
1483    Https,
1484}
1485
1486impl Display for Protocol {
1487    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
1488        f.write_str(self.schema())
1489    }
1490}
1491
1492impl Protocol {
1493    fn default_port(&self) -> &str {
1494        match self {
1495            Protocol::Tcp | Protocol::Tcps => "9009",
1496            #[cfg(feature = "ilp-over-http")]
1497            Protocol::Http | Protocol::Https => "9000",
1498        }
1499    }
1500
1501    fn tls_enabled(&self) -> bool {
1502        match self {
1503            Protocol::Tcp => false,
1504            Protocol::Tcps => true,
1505            #[cfg(feature = "ilp-over-http")]
1506            Protocol::Http => false,
1507            #[cfg(feature = "ilp-over-http")]
1508            Protocol::Https => true,
1509        }
1510    }
1511
1512    fn is_tcpx(&self) -> bool {
1513        match self {
1514            Protocol::Tcp => true,
1515            Protocol::Tcps => true,
1516            #[cfg(feature = "ilp-over-http")]
1517            Protocol::Http => false,
1518            #[cfg(feature = "ilp-over-http")]
1519            Protocol::Https => false,
1520        }
1521    }
1522
1523    #[cfg(feature = "ilp-over-http")]
1524    fn is_httpx(&self) -> bool {
1525        match self {
1526            Protocol::Tcp => false,
1527            Protocol::Tcps => false,
1528            Protocol::Http => true,
1529            Protocol::Https => true,
1530        }
1531    }
1532
1533    fn schema(&self) -> &str {
1534        match self {
1535            Protocol::Tcp => "tcp",
1536            Protocol::Tcps => "tcps",
1537            #[cfg(feature = "ilp-over-http")]
1538            Protocol::Http => "http",
1539            #[cfg(feature = "ilp-over-http")]
1540            Protocol::Https => "https",
1541        }
1542    }
1543
1544    fn from_schema(schema: &str) -> Result<Self> {
1545        match schema {
1546            "tcp" => Ok(Protocol::Tcp),
1547            "tcps" => Ok(Protocol::Tcps),
1548            #[cfg(feature = "ilp-over-http")]
1549            "http" => Ok(Protocol::Http),
1550            #[cfg(feature = "ilp-over-http")]
1551            "https" => Ok(Protocol::Https),
1552            _ => Err(error::fmt!(ConfigError, "Unsupported protocol: {}", schema)),
1553        }
1554    }
1555}
1556
1557/// Accumulates parameters for a new `Sender` instance.
1558///
1559/// You can also create the builder from a config string or the `QDB_CLIENT_CONF`
1560/// environment variable.
1561///
1562#[cfg_attr(
1563    feature = "ilp-over-http",
1564    doc = r##"
1565```no_run
1566# use questdb::Result;
1567use questdb::ingress::{Protocol, SenderBuilder};
1568# fn main() -> Result<()> {
1569let mut sender = SenderBuilder::new(Protocol::Http, "localhost", 9009).build()?;
1570# Ok(())
1571# }
1572```
1573"##
1574)]
1575///
1576/// ```no_run
1577/// # use questdb::Result;
1578/// use questdb::ingress::{Protocol, SenderBuilder};
1579///
1580/// # fn main() -> Result<()> {
1581/// let mut sender = SenderBuilder::new(Protocol::Tcp, "localhost", 9009).build()?;
1582/// # Ok(())
1583/// # }
1584/// ```
1585///
1586/// ```no_run
1587/// # use questdb::Result;
1588/// use questdb::ingress::SenderBuilder;
1589///
1590/// # fn main() -> Result<()> {
1591/// let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
1592/// # Ok(())
1593/// # }
1594/// ```
1595///
1596/// ```no_run
1597/// # use questdb::Result;
1598/// use questdb::ingress::SenderBuilder;
1599///
1600/// # fn main() -> Result<()> {
1601/// // export QDB_CLIENT_CONF="https::addr=localhost:9000;"
1602/// let mut sender = SenderBuilder::from_env()?.build()?;
1603/// # Ok(())
1604/// # }
1605/// ```
1606///
1607#[derive(Debug, Clone)]
1608pub struct SenderBuilder {
1609    protocol: Protocol,
1610    host: ConfigSetting<String>,
1611    port: ConfigSetting<String>,
1612    net_interface: ConfigSetting<Option<String>>,
1613    max_buf_size: ConfigSetting<usize>,
1614    auth_timeout: ConfigSetting<Duration>,
1615    username: ConfigSetting<Option<String>>,
1616    password: ConfigSetting<Option<String>>,
1617    token: ConfigSetting<Option<String>>,
1618    token_x: ConfigSetting<Option<String>>,
1619    token_y: ConfigSetting<Option<String>>,
1620
1621    #[cfg(feature = "insecure-skip-verify")]
1622    tls_verify: ConfigSetting<bool>,
1623
1624    tls_ca: ConfigSetting<CertificateAuthority>,
1625    tls_roots: ConfigSetting<Option<PathBuf>>,
1626
1627    #[cfg(feature = "ilp-over-http")]
1628    http: Option<HttpConfig>,
1629}
1630
1631impl SenderBuilder {
1632    /// Create a new `SenderBuilder` instance from the configuration string.
1633    ///
1634    /// The format of the string is: `"http::addr=host:port;key=value;...;"`.
1635    ///
1636    /// Instead of `"http"`, you can also specify `"https"`, `"tcp"`, and `"tcps"`.
1637    ///
1638    /// We recommend HTTP for most cases because it provides more features, like
1639    /// reporting errors to the client and supporting transaction control. TCP can
1640    /// sometimes be faster in higher-latency networks, but misses a number of
1641    /// features.
1642    ///
1643    /// The accepted keys match one-for-one with the methods on `SenderBuilder`.
1644    /// For example, this is a valid configuration string:
1645    ///
1646    /// "https::addr=host:port;username=alice;password=secret;"
1647    ///
1648    /// and there are matching methods [SenderBuilder::username] and
1649    /// [SenderBuilder::password]. The value of `addr=` is supplied directly to the
1650    /// `SenderBuilder` constructor, so there's no matching method for that.
1651    ///
1652    /// You can also load the configuration from an environment variable. See
1653    /// [`SenderBuilder::from_env`].
1654    ///
1655    /// Once you have a `SenderBuilder` instance, you can further customize it
1656    /// before calling [`SenderBuilder::build`], but you can't change any settings
1657    /// that are already set in the config string.
1658    pub fn from_conf<T: AsRef<str>>(conf: T) -> Result<Self> {
1659        let conf = conf.as_ref();
1660        let conf = questdb_confstr::parse_conf_str(conf)
1661            .map_err(|e| error::fmt!(ConfigError, "Config parse error: {}", e))?;
1662        let service = conf.service();
1663        let params = conf.params();
1664
1665        let protocol = Protocol::from_schema(service)?;
1666
1667        let Some(addr) = params.get("addr") else {
1668            return Err(error::fmt!(
1669                ConfigError,
1670                "Missing \"addr\" parameter in config string"
1671            ));
1672        };
1673        let (host, port) = match addr.split_once(':') {
1674            Some((h, p)) => (h, p),
1675            None => (addr.as_str(), protocol.default_port()),
1676        };
1677        let mut builder = SenderBuilder::new(protocol, host, port);
1678
1679        validate_auto_flush_params(params)?;
1680
1681        for (key, val) in params.iter().map(|(k, v)| (k.as_str(), v.as_str())) {
1682            builder = match key {
1683                "username" => builder.username(val)?,
1684                "password" => builder.password(val)?,
1685                "token" => builder.token(val)?,
1686                "token_x" => builder.token_x(val)?,
1687                "token_y" => builder.token_y(val)?,
1688                "bind_interface" => builder.bind_interface(val)?,
1689
1690                "init_buf_size" => {
1691                    return Err(error::fmt!(
1692                        ConfigError,
1693                        "\"init_buf_size\" is not supported in config string"
1694                    ))
1695                }
1696
1697                "max_buf_size" => builder.max_buf_size(parse_conf_value(key, val)?)?,
1698
1699                "auth_timeout" => {
1700                    builder.auth_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
1701                }
1702
1703                "tls_verify" => {
1704                    let verify = match val {
1705                        "on" => true,
1706                        "unsafe_off" => false,
1707                        _ => {
1708                            return Err(error::fmt!(
1709                                ConfigError,
1710                                r##"Config parameter "tls_verify" must be either "on" or "unsafe_off".'"##,
1711                            ))
1712                        }
1713                    };
1714
1715                    #[cfg(not(feature = "insecure-skip-verify"))]
1716                    {
1717                        if !verify {
1718                            return Err(error::fmt!(
1719                                ConfigError,
1720                                r##"The "insecure-skip-verify" feature is not enabled, so "tls_verify=unsafe_off" is not supported"##,
1721                            ));
1722                        }
1723                        builder
1724                    }
1725
1726                    #[cfg(feature = "insecure-skip-verify")]
1727                    builder.tls_verify(verify)?
1728                }
1729
1730                "tls_ca" => {
1731                    let ca = match val {
1732                        #[cfg(feature = "tls-webpki-certs")]
1733                        "webpki_roots" => CertificateAuthority::WebpkiRoots,
1734
1735                        #[cfg(not(feature = "tls-webpki-certs"))]
1736                        "webpki_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=webpki_roots\" requires the \"tls-webpki-certs\" feature")),
1737
1738                        #[cfg(feature = "tls-native-certs")]
1739                        "os_roots" => CertificateAuthority::OsRoots,
1740
1741                        #[cfg(not(feature = "tls-native-certs"))]
1742                        "os_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=os_roots\" requires the \"tls-native-certs\" feature")),
1743
1744                        #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
1745                        "webpki_and_os_roots" => CertificateAuthority::WebpkiAndOsRoots,
1746
1747                        #[cfg(not(all(feature = "tls-webpki-certs", feature = "tls-native-certs")))]
1748                        "webpki_and_os_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=webpki_and_os_roots\" requires both the \"tls-webpki-certs\" and \"tls-native-certs\" features")),
1749
1750                        _ => return Err(error::fmt!(ConfigError, "Invalid value {val:?} for \"tls_ca\"")),
1751                    };
1752                    builder.tls_ca(ca)?
1753                }
1754
1755                "tls_roots" => {
1756                    let path = PathBuf::from_str(val).map_err(|e| {
1757                        error::fmt!(
1758                            ConfigError,
1759                            "Invalid path {:?} for \"tls_roots\": {}",
1760                            val,
1761                            e
1762                        )
1763                    })?;
1764                    builder.tls_roots(path)?
1765                }
1766
1767                "tls_roots_password" => {
1768                    return Err(error::fmt!(
1769                        ConfigError,
1770                        "\"tls_roots_password\" is not supported."
1771                    ))
1772                }
1773
1774                #[cfg(feature = "ilp-over-http")]
1775                "request_min_throughput" => {
1776                    builder.request_min_throughput(parse_conf_value(key, val)?)?
1777                }
1778
1779                #[cfg(feature = "ilp-over-http")]
1780                "request_timeout" => {
1781                    builder.request_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
1782                }
1783
1784                #[cfg(feature = "ilp-over-http")]
1785                "retry_timeout" => {
1786                    builder.retry_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
1787                }
1788                // Ignore other parameters.
1789                // We don't want to fail on unknown keys as this would require releasing different
1790                // library implementations in lock step as soon as a new parameter is added to any of them,
1791                // even if it's not used.
1792                _ => builder,
1793            };
1794        }
1795
1796        Ok(builder)
1797    }
1798
1799    /// Create a new `SenderBuilder` instance from the configuration from the
1800    /// configuration stored in the `QDB_CLIENT_CONF` environment variable.
1801    ///
1802    /// The format of the string is the same as for [`SenderBuilder::from_conf`].
1803    pub fn from_env() -> Result<Self> {
1804        let conf = std::env::var("QDB_CLIENT_CONF").map_err(|_| {
1805            error::fmt!(ConfigError, "Environment variable QDB_CLIENT_CONF not set.")
1806        })?;
1807        Self::from_conf(conf)
1808    }
1809
1810    /// Create a new `SenderBuilder` instance with the provided QuestDB
1811    /// server and port, using ILP over the specified protocol.
1812    ///
1813    /// ```no_run
1814    /// # use questdb::Result;
1815    /// use questdb::ingress::{Protocol, SenderBuilder};
1816    ///
1817    /// # fn main() -> Result<()> {
1818    /// let mut sender = SenderBuilder::new(
1819    ///     Protocol::Tcp, "localhost", 9009).build()?;
1820    /// # Ok(())
1821    /// # }
1822    /// ```
1823    pub fn new<H: Into<String>, P: Into<Port>>(protocol: Protocol, host: H, port: P) -> Self {
1824        let host = host.into();
1825        let port: Port = port.into();
1826        let port = port.0;
1827
1828        #[cfg(feature = "tls-webpki-certs")]
1829        let tls_ca = CertificateAuthority::WebpkiRoots;
1830
1831        #[cfg(all(not(feature = "tls-webpki-certs"), feature = "tls-native-certs"))]
1832        let tls_ca = CertificateAuthority::OsRoots;
1833
1834        #[cfg(not(any(feature = "tls-webpki-certs", feature = "tls-native-certs")))]
1835        let tls_ca = CertificateAuthority::PemFile;
1836
1837        Self {
1838            protocol,
1839            host: ConfigSetting::new_specified(host),
1840            port: ConfigSetting::new_specified(port),
1841            net_interface: ConfigSetting::new_default(None),
1842            max_buf_size: ConfigSetting::new_default(100 * 1024 * 1024),
1843            auth_timeout: ConfigSetting::new_default(Duration::from_secs(15)),
1844            username: ConfigSetting::new_default(None),
1845            password: ConfigSetting::new_default(None),
1846            token: ConfigSetting::new_default(None),
1847            token_x: ConfigSetting::new_default(None),
1848            token_y: ConfigSetting::new_default(None),
1849
1850            #[cfg(feature = "insecure-skip-verify")]
1851            tls_verify: ConfigSetting::new_default(true),
1852
1853            tls_ca: ConfigSetting::new_default(tls_ca),
1854            tls_roots: ConfigSetting::new_default(None),
1855
1856            #[cfg(feature = "ilp-over-http")]
1857            http: if protocol.is_httpx() {
1858                Some(HttpConfig::default())
1859            } else {
1860                None
1861            },
1862        }
1863    }
1864
1865    /// Select local outbound interface.
1866    ///
1867    /// This may be relevant if your machine has multiple network interfaces.
1868    ///
1869    /// The default is `"0.0.0.0"`.
1870    pub fn bind_interface<I: Into<String>>(mut self, addr: I) -> Result<Self> {
1871        self.ensure_is_tcpx("bind_interface")?;
1872        self.net_interface
1873            .set_specified("bind_interface", Some(validate_value(addr.into())?))?;
1874        Ok(self)
1875    }
1876
1877    /// Set the username for authentication.
1878    ///
1879    /// For TCP, this is the `kid` part of the ECDSA key set.
1880    /// The other fields are [`token`](SenderBuilder::token), [`token_x`](SenderBuilder::token_x),
1881    /// and [`token_y`](SenderBuilder::token_y).
1882    ///
1883    /// For HTTP, this is a part of basic authentication.
1884    /// See also: [`password`](SenderBuilder::password).
1885    pub fn username(mut self, username: &str) -> Result<Self> {
1886        self.username
1887            .set_specified("username", Some(validate_value(username.to_string())?))?;
1888        Ok(self)
1889    }
1890
1891    /// Set the password for basic HTTP authentication.
1892    /// See also: [`username`](SenderBuilder::username).
1893    pub fn password(mut self, password: &str) -> Result<Self> {
1894        self.password
1895            .set_specified("password", Some(validate_value(password.to_string())?))?;
1896        Ok(self)
1897    }
1898
1899    /// Set the Token (Bearer) Authentication parameter for HTTP,
1900    /// or the ECDSA private key for TCP authentication.
1901    pub fn token(mut self, token: &str) -> Result<Self> {
1902        self.token
1903            .set_specified("token", Some(validate_value(token.to_string())?))?;
1904        Ok(self)
1905    }
1906
1907    /// Set the ECDSA public key X for TCP authentication.
1908    pub fn token_x(mut self, token_x: &str) -> Result<Self> {
1909        self.token_x
1910            .set_specified("token_x", Some(validate_value(token_x.to_string())?))?;
1911        Ok(self)
1912    }
1913
1914    /// Set the ECDSA public key Y for TCP authentication.
1915    pub fn token_y(mut self, token_y: &str) -> Result<Self> {
1916        self.token_y
1917            .set_specified("token_y", Some(validate_value(token_y.to_string())?))?;
1918        Ok(self)
1919    }
1920
1921    /// Configure how long to wait for messages from the QuestDB server during
1922    /// the TLS handshake and authentication process. This only applies to TCP.
1923    /// The default is 15 seconds.
1924    pub fn auth_timeout(mut self, value: Duration) -> Result<Self> {
1925        self.auth_timeout.set_specified("auth_timeout", value)?;
1926        Ok(self)
1927    }
1928
1929    /// Ensure that TLS is enabled for the protocol.
1930    pub fn ensure_tls_enabled(&self, property: &str) -> Result<()> {
1931        if !self.protocol.tls_enabled() {
1932            return Err(error::fmt!(
1933                ConfigError,
1934                "Cannot set {property:?}: TLS is not supported for protocol {}",
1935                self.protocol
1936            ));
1937        }
1938        Ok(())
1939    }
1940
1941    /// Set to `false` to disable TLS certificate verification.
1942    /// This should only be used for debugging purposes as it reduces security.
1943    ///
1944    /// For testing, consider specifying a path to a `.pem` file instead via
1945    /// the [`tls_roots`](SenderBuilder::tls_roots) method.
1946    #[cfg(feature = "insecure-skip-verify")]
1947    pub fn tls_verify(mut self, verify: bool) -> Result<Self> {
1948        self.ensure_tls_enabled("tls_verify")?;
1949        self.tls_verify.set_specified("tls_verify", verify)?;
1950        Ok(self)
1951    }
1952
1953    /// Specify where to find the root certificate used to validate the
1954    /// server's TLS certificate.
1955    pub fn tls_ca(mut self, ca: CertificateAuthority) -> Result<Self> {
1956        self.ensure_tls_enabled("tls_ca")?;
1957        self.tls_ca.set_specified("tls_ca", ca)?;
1958        Ok(self)
1959    }
1960
1961    /// Set the path to a custom root certificate `.pem` file.
1962    /// This is used to validate the server's certificate during the TLS handshake.
1963    ///
1964    /// See notes on how to test with [self-signed
1965    /// certificates](https://github.com/questdb/c-questdb-client/tree/main/tls_certs).
1966    pub fn tls_roots<P: Into<PathBuf>>(self, path: P) -> Result<Self> {
1967        let mut builder = self.tls_ca(CertificateAuthority::PemFile)?;
1968        let path = path.into();
1969        // Attempt to read the file here to catch any issues early.
1970        let _file = std::fs::File::open(&path).map_err(|io_err| {
1971            error::fmt!(
1972                ConfigError,
1973                "Could not open root certificate file from path {:?}: {}",
1974                path,
1975                io_err
1976            )
1977        })?;
1978        builder.tls_roots.set_specified("tls_roots", Some(path))?;
1979        Ok(builder)
1980    }
1981
1982    /// The maximum buffer size in bytes that the client will flush to the server.
1983    /// The default is 100 MiB.
1984    pub fn max_buf_size(mut self, value: usize) -> Result<Self> {
1985        let min = 1024;
1986        if value < min {
1987            return Err(error::fmt!(
1988                ConfigError,
1989                "max_buf_size\" must be at least {min} bytes."
1990            ));
1991        }
1992        self.max_buf_size.set_specified("max_buf_size", value)?;
1993        Ok(self)
1994    }
1995
1996    #[cfg(feature = "ilp-over-http")]
1997    /// Set the cumulative duration spent in retries.
1998    /// The value is in milliseconds, and the default is 10 seconds.
1999    pub fn retry_timeout(mut self, value: Duration) -> Result<Self> {
2000        if let Some(http) = &mut self.http {
2001            http.retry_timeout.set_specified("retry_timeout", value)?;
2002        } else {
2003            return Err(error::fmt!(
2004                ConfigError,
2005                "retry_timeout is supported only in ILP over HTTP."
2006            ));
2007        }
2008        Ok(self)
2009    }
2010
2011    #[cfg(feature = "ilp-over-http")]
2012    /// Set the minimum acceptable throughput while sending a buffer to the server.
2013    /// The sender will divide the payload size by this number to determine for how
2014    /// long to keep sending the payload before timing out.
2015    /// The value is in bytes per second, and the default is 100 KiB/s.
2016    /// The timeout calculated from minimum throughput is adedd to the value of
2017    /// [`request_timeout`](SenderBuilder::request_timeout) to get the total timeout
2018    /// value.
2019    /// A value of 0 disables this feature, so it's similar to setting "infinite"
2020    /// minimum throughput. The total timeout will then be equal to `request_timeout`.
2021    pub fn request_min_throughput(mut self, value: u64) -> Result<Self> {
2022        if let Some(http) = &mut self.http {
2023            http.request_min_throughput
2024                .set_specified("request_min_throughput", value)?;
2025        } else {
2026            return Err(error::fmt!(
2027                ConfigError,
2028                "\"request_min_throughput\" is supported only in ILP over HTTP."
2029            ));
2030        }
2031        Ok(self)
2032    }
2033
2034    #[cfg(feature = "ilp-over-http")]
2035    /// Additional time to wait on top of that calculated from the minimum throughput.
2036    /// This accounts for the fixed latency of the HTTP request-response roundtrip.
2037    /// The default is 10 seconds.
2038    /// See also: [`request_min_throughput`](SenderBuilder::request_min_throughput).
2039    pub fn request_timeout(mut self, value: Duration) -> Result<Self> {
2040        if let Some(http) = &mut self.http {
2041            if value.is_zero() {
2042                return Err(error::fmt!(
2043                    ConfigError,
2044                    "\"request_timeout\" must be greater than 0."
2045                ));
2046            }
2047            http.request_timeout
2048                .set_specified("request_timeout", value)?;
2049        } else {
2050            return Err(error::fmt!(
2051                ConfigError,
2052                "\"request_timeout\" is supported only in ILP over HTTP."
2053            ));
2054        }
2055        Ok(self)
2056    }
2057
2058    #[cfg(feature = "ilp-over-http")]
2059    /// Internal API, do not use.
2060    /// This is exposed exclusively for the Python client.
2061    /// We (QuestDB) use this to help us debug which client is being used if we encounter issues.
2062    #[doc(hidden)]
2063    pub fn user_agent(mut self, value: &str) -> Result<Self> {
2064        let value = validate_value(value)?;
2065        if let Some(http) = &mut self.http {
2066            http.user_agent = value.to_string();
2067        }
2068        Ok(self)
2069    }
2070
2071    fn connect_tcp(&self, auth: &Option<AuthParams>) -> Result<ProtocolHandler> {
2072        let addr: SockAddr = gai::resolve_host_port(self.host.as_str(), self.port.as_str())?;
2073        let mut sock = Socket::new(Domain::IPV4, Type::STREAM, Some(SockProtocol::TCP))
2074            .map_err(|io_err| map_io_to_socket_err("Could not open TCP socket: ", io_err))?;
2075
2076        // See: https://idea.popcount.org/2014-04-03-bind-before-connect/
2077        // We set `SO_REUSEADDR` on the outbound socket to avoid issues where a client may exhaust
2078        // their interface's ports. See: https://github.com/questdb/py-questdb-client/issues/21
2079        sock.set_reuse_address(true)
2080            .map_err(|io_err| map_io_to_socket_err("Could not set SO_REUSEADDR: ", io_err))?;
2081
2082        sock.set_linger(Some(Duration::from_secs(120)))
2083            .map_err(|io_err| map_io_to_socket_err("Could not set socket linger: ", io_err))?;
2084        sock.set_keepalive(true)
2085            .map_err(|io_err| map_io_to_socket_err("Could not set SO_KEEPALIVE: ", io_err))?;
2086        sock.set_nodelay(true)
2087            .map_err(|io_err| map_io_to_socket_err("Could not set TCP_NODELAY: ", io_err))?;
2088        if let Some(ref host) = self.net_interface.deref() {
2089            let bind_addr = gai::resolve_host(host.as_str())?;
2090            sock.bind(&bind_addr).map_err(|io_err| {
2091                map_io_to_socket_err(
2092                    &format!("Could not bind to interface address {:?}: ", host),
2093                    io_err,
2094                )
2095            })?;
2096        }
2097        sock.connect(&addr).map_err(|io_err| {
2098            let host_port = format!("{}:{}", self.host.deref(), *self.port);
2099            let prefix = format!("Could not connect to {:?}: ", host_port);
2100            map_io_to_socket_err(&prefix, io_err)
2101        })?;
2102
2103        // We read during both TLS handshake and authentication.
2104        // We set up a read timeout to prevent the client from "hanging"
2105        // should we be connecting to a server configured in a different way
2106        // from the client.
2107        sock.set_read_timeout(Some(*self.auth_timeout))
2108            .map_err(|io_err| {
2109                map_io_to_socket_err("Failed to set read timeout on socket: ", io_err)
2110            })?;
2111
2112        #[cfg(feature = "insecure-skip-verify")]
2113        let tls_verify = *self.tls_verify;
2114
2115        #[cfg(not(feature = "insecure-skip-verify"))]
2116        let tls_verify = true;
2117
2118        let mut conn = match configure_tls(
2119            self.protocol.tls_enabled(),
2120            tls_verify,
2121            *self.tls_ca,
2122            self.tls_roots.deref(),
2123        )? {
2124            Some(tls_config) => {
2125                let server_name: ServerName = ServerName::try_from(self.host.as_str())
2126                    .map_err(|inv_dns_err| error::fmt!(TlsError, "Bad host: {}", inv_dns_err))?
2127                    .to_owned();
2128                let mut tls_conn =
2129                    ClientConnection::new(tls_config, server_name).map_err(|rustls_err| {
2130                        error::fmt!(TlsError, "Could not create TLS client: {}", rustls_err)
2131                    })?;
2132                while tls_conn.wants_write() || tls_conn.is_handshaking() {
2133                    tls_conn.complete_io(&mut sock).map_err(|io_err| {
2134                        if (io_err.kind() == ErrorKind::TimedOut)
2135                            || (io_err.kind() == ErrorKind::WouldBlock)
2136                        {
2137                            error::fmt!(
2138                                TlsError,
2139                                concat!(
2140                                    "Failed to complete TLS handshake:",
2141                                    " Timed out waiting for server ",
2142                                    "response after {:?}."
2143                                ),
2144                                *self.auth_timeout
2145                            )
2146                        } else {
2147                            error::fmt!(TlsError, "Failed to complete TLS handshake: {}", io_err)
2148                        }
2149                    })?;
2150                }
2151                Connection::Tls(StreamOwned::new(tls_conn, sock).into())
2152            }
2153            None => Connection::Direct(sock),
2154        };
2155
2156        if let Some(AuthParams::Ecdsa(auth)) = auth {
2157            conn.authenticate(auth)?;
2158        }
2159
2160        Ok(ProtocolHandler::Socket(conn))
2161    }
2162
2163    fn build_auth(&self) -> Result<Option<AuthParams>> {
2164        match (
2165            self.protocol,
2166            self.username.deref(),
2167            self.password.deref(),
2168            self.token.deref(),
2169            self.token_x.deref(),
2170            self.token_y.deref(),
2171        ) {
2172            (_, None, None, None, None, None) => Ok(None),
2173            (
2174                protocol,
2175                Some(username),
2176                None,
2177                Some(token),
2178                Some(token_x),
2179                Some(token_y),
2180            ) if protocol.is_tcpx() => Ok(Some(AuthParams::Ecdsa(EcdsaAuthParams {
2181                key_id: username.to_string(),
2182                priv_key: token.to_string(),
2183                pub_key_x: token_x.to_string(),
2184                pub_key_y: token_y.to_string(),
2185            }))),
2186            (protocol, Some(_username), Some(_password), None, None, None)
2187                if protocol.is_tcpx() => {
2188                Err(error::fmt!(ConfigError,
2189                    r##"The "basic_auth" setting can only be used with the ILP/HTTP protocol."##,
2190                ))
2191            }
2192            (protocol, None, None, Some(_token), None, None)
2193                if protocol.is_tcpx() => {
2194                Err(error::fmt!(ConfigError, "Token authentication only be used with the ILP/HTTP protocol."))
2195            }
2196            (protocol, _username, None, _token, _token_x, _token_y)
2197                if protocol.is_tcpx() => {
2198                Err(error::fmt!(ConfigError,
2199                    r##"Incomplete ECDSA authentication parameters. Specify either all or none of: "username", "token", "token_x", "token_y"."##,
2200                ))
2201            }
2202            #[cfg(feature = "ilp-over-http")]
2203            (protocol, Some(username), Some(password), None, None, None)
2204                if protocol.is_httpx() => {
2205                Ok(Some(AuthParams::Basic(BasicAuthParams {
2206                    username: username.to_string(),
2207                    password: password.to_string(),
2208                })))
2209            }
2210            #[cfg(feature = "ilp-over-http")]
2211            (protocol, Some(_username), None, None, None, None)
2212                if protocol.is_httpx() => {
2213                Err(error::fmt!(ConfigError,
2214                    r##"Basic authentication parameter "username" is present, but "password" is missing."##,
2215                ))
2216            }
2217            #[cfg(feature = "ilp-over-http")]
2218            (protocol, None, Some(_password), None, None, None)
2219                if protocol.is_httpx() => {
2220                Err(error::fmt!(ConfigError,
2221                    r##"Basic authentication parameter "password" is present, but "username" is missing."##,
2222                ))
2223            }
2224            #[cfg(feature = "ilp-over-http")]
2225            (protocol, None, None, Some(token), None, None)
2226                if protocol.is_httpx() => {
2227                Ok(Some(AuthParams::Token(TokenAuthParams {
2228                    token: token.to_string(),
2229                })))
2230            }
2231            #[cfg(feature = "ilp-over-http")]
2232            (
2233                protocol,
2234                Some(_username),
2235                None,
2236                Some(_token),
2237                Some(_token_x),
2238                Some(_token_y),
2239            ) if protocol.is_httpx() => {
2240                Err(error::fmt!(ConfigError, "ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."))
2241            }
2242            #[cfg(feature = "ilp-over-http")]
2243            (protocol, _username, _password, _token, None, None)
2244                if protocol.is_httpx() => {
2245                Err(error::fmt!(ConfigError,
2246                    r##"Inconsistent HTTP authentication parameters. Specify either "username" and "password", or just "token"."##,
2247                ))
2248            }
2249            _ => {
2250                Err(error::fmt!(ConfigError,
2251                    r##"Incomplete authentication parameters. Check "username", "password", "token", "token_x" and "token_y" parameters are set correctly."##,
2252                ))
2253            }
2254        }
2255    }
2256
2257    /// Build the sender.
2258    ///
2259    /// In the case of TCP, this synchronously establishes the TCP connection, and
2260    /// returns once the connection is fully established. If the connection
2261    /// requires authentication or TLS, these will also be completed before
2262    /// returning.
2263    pub fn build(&self) -> Result<Sender> {
2264        let mut descr = format!("Sender[host={:?},port={:?},", self.host, self.port);
2265
2266        if self.protocol.tls_enabled() {
2267            write!(descr, "tls=enabled,").unwrap();
2268        } else {
2269            write!(descr, "tls=disabled,").unwrap();
2270        }
2271
2272        let auth = self.build_auth()?;
2273
2274        let handler = match self.protocol {
2275            Protocol::Tcp | Protocol::Tcps => self.connect_tcp(&auth)?,
2276            #[cfg(feature = "ilp-over-http")]
2277            Protocol::Http | Protocol::Https => {
2278                if self.net_interface.is_some() {
2279                    // See: https://github.com/algesten/ureq/issues/692
2280                    return Err(error::fmt!(
2281                        InvalidApiCall,
2282                        "net_interface is not supported for ILP over HTTP."
2283                    ));
2284                }
2285
2286                let http_config = self.http.as_ref().unwrap();
2287                let user_agent = http_config.user_agent.as_str();
2288                let agent_builder = ureq::AgentBuilder::new()
2289                    .user_agent(user_agent)
2290                    .no_delay(true);
2291
2292                #[cfg(feature = "insecure-skip-verify")]
2293                let tls_verify = *self.tls_verify;
2294
2295                #[cfg(not(feature = "insecure-skip-verify"))]
2296                let tls_verify = true;
2297
2298                let agent_builder = match configure_tls(
2299                    self.protocol.tls_enabled(),
2300                    tls_verify,
2301                    *self.tls_ca,
2302                    self.tls_roots.deref(),
2303                )? {
2304                    Some(tls_config) => agent_builder.tls_config(tls_config),
2305                    None => agent_builder,
2306                };
2307                let auth = match auth {
2308                    Some(AuthParams::Basic(ref auth)) => Some(auth.to_header_string()),
2309                    Some(AuthParams::Token(ref auth)) => Some(auth.to_header_string()?),
2310                    Some(AuthParams::Ecdsa(_)) => {
2311                        return Err(error::fmt!(
2312                            AuthError,
2313                            "ECDSA authentication is not supported for ILP over HTTP. \
2314                            Please use basic or token authentication instead."
2315                        ));
2316                    }
2317                    None => None,
2318                };
2319                let agent_builder =
2320                    agent_builder.timeout_connect(*http_config.request_timeout.deref());
2321                let agent = agent_builder.build();
2322                let proto = self.protocol.schema();
2323                let url = format!(
2324                    "{}://{}:{}/write",
2325                    proto,
2326                    self.host.deref(),
2327                    self.port.deref()
2328                );
2329                ProtocolHandler::Http(HttpHandlerState {
2330                    agent,
2331                    url,
2332                    auth,
2333
2334                    config: self.http.as_ref().unwrap().clone(),
2335                })
2336            }
2337        };
2338
2339        if auth.is_some() {
2340            descr.push_str("auth=on]");
2341        } else {
2342            descr.push_str("auth=off]");
2343        }
2344
2345        let sender = Sender {
2346            descr,
2347            handler,
2348            connected: true,
2349            max_buf_size: *self.max_buf_size,
2350        };
2351
2352        Ok(sender)
2353    }
2354
2355    fn ensure_is_tcpx(&mut self, param_name: &str) -> Result<()> {
2356        if self.protocol.is_tcpx() {
2357            Ok(())
2358        } else {
2359            Err(error::fmt!(
2360                ConfigError,
2361                "The {param_name:?} setting can only be used with the TCP protocol."
2362            ))
2363        }
2364    }
2365}
2366
2367/// When parsing from config, we exclude certain characters.
2368/// Here we repeat the same validation logic for consistency.
2369fn validate_value<T: AsRef<str>>(value: T) -> Result<T> {
2370    let str_ref = value.as_ref();
2371    for (p, c) in str_ref.chars().enumerate() {
2372        if matches!(c, '\u{0}'..='\u{1f}' | '\u{7f}'..='\u{9f}') {
2373            return Err(error::fmt!(
2374                ConfigError,
2375                "Invalid character {c:?} at position {p}"
2376            ));
2377        }
2378    }
2379    Ok(value)
2380}
2381
2382fn parse_conf_value<T>(param_name: &str, str_value: &str) -> Result<T>
2383where
2384    T: FromStr,
2385    T::Err: std::fmt::Debug,
2386{
2387    str_value.parse().map_err(|e| {
2388        error::fmt!(
2389            ConfigError,
2390            "Could not parse {param_name:?} to number: {e:?}"
2391        )
2392    })
2393}
2394
2395fn b64_decode(descr: &'static str, buf: &str) -> Result<Vec<u8>> {
2396    Base64UrlUnpadded::decode_vec(buf).map_err(|b64_err| {
2397        error::fmt!(
2398            AuthError,
2399            "Misconfigured ILP authentication keys. Could not decode {}: {}. \
2400            Hint: Check the keys for a possible typo.",
2401            descr,
2402            b64_err
2403        )
2404    })
2405}
2406
2407fn parse_public_key(pub_key_x: &str, pub_key_y: &str) -> Result<Vec<u8>> {
2408    let mut pub_key_x = b64_decode("public key x", pub_key_x)?;
2409    let mut pub_key_y = b64_decode("public key y", pub_key_y)?;
2410
2411    // SEC 1 Uncompressed Octet-String-to-Elliptic-Curve-Point Encoding
2412    let mut encoded = Vec::new();
2413    encoded.push(4u8); // 0x04 magic byte that identifies this as uncompressed.
2414    let pub_key_x_ken = pub_key_x.len();
2415    if pub_key_x_ken > 32 {
2416        return Err(error::fmt!(
2417            AuthError,
2418            "Misconfigured ILP authentication keys. Public key x is too long. \
2419            Hint: Check the keys for a possible typo."
2420        ));
2421    }
2422    let pub_key_y_len = pub_key_y.len();
2423    if pub_key_y_len > 32 {
2424        return Err(error::fmt!(
2425            AuthError,
2426            "Misconfigured ILP authentication keys. Public key y is too long. \
2427            Hint: Check the keys for a possible typo."
2428        ));
2429    }
2430    encoded.resize((32 - pub_key_x_ken) + 1, 0u8);
2431    encoded.append(&mut pub_key_x);
2432    encoded.resize((32 - pub_key_y_len) + 1 + 32, 0u8);
2433    encoded.append(&mut pub_key_y);
2434    Ok(encoded)
2435}
2436
2437fn parse_key_pair(auth: &EcdsaAuthParams) -> Result<EcdsaKeyPair> {
2438    let private_key = b64_decode("private authentication key", auth.priv_key.as_str())?;
2439    let public_key = parse_public_key(auth.pub_key_x.as_str(), auth.pub_key_y.as_str())?;
2440    let system_random = SystemRandom::new();
2441    EcdsaKeyPair::from_private_key_and_public_key(
2442        &ECDSA_P256_SHA256_FIXED_SIGNING,
2443        &private_key[..],
2444        &public_key[..],
2445        &system_random,
2446    )
2447    .map_err(|key_rejected| {
2448        error::fmt!(
2449            AuthError,
2450            "Misconfigured ILP authentication keys: {}. Hint: Check the keys for a possible typo.",
2451            key_rejected
2452        )
2453    })
2454}
2455
2456pub(crate) struct F64Serializer {
2457    buf: ryu::Buffer,
2458    n: f64,
2459}
2460
2461impl F64Serializer {
2462    pub(crate) fn new(n: f64) -> Self {
2463        F64Serializer {
2464            buf: ryu::Buffer::new(),
2465            n,
2466        }
2467    }
2468
2469    // This function was taken and customized from the ryu crate.
2470    #[cold]
2471    #[cfg_attr(feature = "no-panic", inline)]
2472    fn format_nonfinite(&self) -> &'static str {
2473        const MANTISSA_MASK: u64 = 0x000fffffffffffff;
2474        const SIGN_MASK: u64 = 0x8000000000000000;
2475        let bits = self.n.to_bits();
2476        if bits & MANTISSA_MASK != 0 {
2477            "NaN"
2478        } else if bits & SIGN_MASK != 0 {
2479            "-Infinity"
2480        } else {
2481            "Infinity"
2482        }
2483    }
2484
2485    pub(crate) fn as_str(&mut self) -> &str {
2486        if self.n.is_finite() {
2487            self.buf.format_finite(self.n)
2488        } else {
2489            self.format_nonfinite()
2490        }
2491    }
2492}
2493
2494impl Sender {
2495    /// Create a new `Sender` instance from the given configuration string.
2496    ///
2497    /// The format of the string is: `"http::addr=host:port;key=value;...;"`.
2498    ///
2499    /// Instead of `"http"`, you can also specify `"https"`, `"tcp"`, and `"tcps"`.
2500    ///
2501    /// We recommend HTTP for most cases because it provides more features, like
2502    /// reporting errors to the client and supporting transaction control. TCP can
2503    /// sometimes be faster in higher-latency networks, but misses a number of
2504    /// features.
2505    ///
2506    /// Keys in the config string correspond to same-named methods on `SenderBuilder`.
2507    ///
2508    /// For the full list of keys and values, see the docs on [`SenderBuilder`].
2509    ///
2510    /// You can also load the configuration from an environment variable.
2511    /// See [`Sender::from_env`].
2512    ///
2513    /// In the case of TCP, this synchronously establishes the TCP connection, and
2514    /// returns once the connection is fully established. If the connection
2515    /// requires authentication or TLS, these will also be completed before
2516    /// returning.
2517    pub fn from_conf<T: AsRef<str>>(conf: T) -> Result<Self> {
2518        SenderBuilder::from_conf(conf)?.build()
2519    }
2520
2521    /// Create a new `Sender` from the configuration stored in the `QDB_CLIENT_CONF`
2522    /// environment variable. The format is the same as that accepted by
2523    /// [`Sender::from_conf`].
2524    ///
2525    /// In the case of TCP, this synchronously establishes the TCP connection, and
2526    /// returns once the connection is fully established. If the connection
2527    /// requires authentication or TLS, these will also be completed before
2528    /// returning.
2529    pub fn from_env() -> Result<Self> {
2530        SenderBuilder::from_env()?.build()
2531    }
2532
2533    #[allow(unused_variables)]
2534    fn flush_impl(&mut self, buf: &Buffer, transactional: bool) -> Result<()> {
2535        if !self.connected {
2536            return Err(error::fmt!(
2537                SocketError,
2538                "Could not flush buffer: not connected to database."
2539            ));
2540        }
2541        buf.check_op(Op::Flush)?;
2542
2543        if buf.len() > self.max_buf_size {
2544            return Err(error::fmt!(
2545                InvalidApiCall,
2546                "Could not flush buffer: Buffer size of {} exceeds maximum configured allowed size of {} bytes.",
2547                buf.len(),
2548                self.max_buf_size
2549            ));
2550        }
2551
2552        let bytes = buf.as_str().as_bytes();
2553        if bytes.is_empty() {
2554            return Ok(());
2555        }
2556        match self.handler {
2557            ProtocolHandler::Socket(ref mut conn) => {
2558                if transactional {
2559                    return Err(error::fmt!(
2560                        InvalidApiCall,
2561                        "Transactional flushes are not supported for ILP over TCP."
2562                    ));
2563                }
2564                conn.write_all(bytes).map_err(|io_err| {
2565                    self.connected = false;
2566                    map_io_to_socket_err("Could not flush buffer: ", io_err)
2567                })?;
2568            }
2569            #[cfg(feature = "ilp-over-http")]
2570            ProtocolHandler::Http(ref state) => {
2571                if transactional && !buf.transactional() {
2572                    return Err(error::fmt!(
2573                        InvalidApiCall,
2574                        "Buffer contains lines for multiple tables. \
2575                        Transactional flushes are only supported for buffers containing lines for a single table."
2576                    ));
2577                }
2578                let request_min_throughput = *state.config.request_min_throughput;
2579                let extra_time = if request_min_throughput > 0 {
2580                    (bytes.len() as f64) / (request_min_throughput as f64)
2581                } else {
2582                    0.0f64
2583                };
2584                let timeout = *state.config.request_timeout + Duration::from_secs_f64(extra_time);
2585                let request = state
2586                    .agent
2587                    .post(&state.url)
2588                    .query_pairs([("precision", "n")])
2589                    .timeout(timeout)
2590                    .set("Content-Type", "text/plain; charset=utf-8");
2591                let request = match state.auth.as_ref() {
2592                    Some(auth) => request.set("Authorization", auth),
2593                    None => request,
2594                };
2595                let response_or_err =
2596                    http_send_with_retries(request, bytes, *state.config.retry_timeout);
2597                match response_or_err {
2598                    Ok(_response) => {
2599                        // on success, there's no information in the response.
2600                    }
2601                    Err(ureq::Error::Status(http_status_code, response)) => {
2602                        return Err(parse_http_error(http_status_code, response));
2603                    }
2604                    Err(ureq::Error::Transport(transport)) => {
2605                        return Err(error::fmt!(
2606                            SocketError,
2607                            "Could not flush buffer: {}",
2608                            transport
2609                        ));
2610                    }
2611                }
2612            }
2613        }
2614        Ok(())
2615    }
2616
2617    /// Send the batch of rows in the buffer to the QuestDB server, and, if the
2618    /// `transactional` parameter is true, ensure the flush will be transactional.
2619    ///
2620    /// A flush is transactional iff all the rows belong to the same table. This allows
2621    /// QuestDB to treat the flush as a single database transaction, because it doesn't
2622    /// support transactions spanning multiple tables. Additionally, only ILP-over-HTTP
2623    /// supports transactional flushes.
2624    ///
2625    /// If the flush wouldn't be transactional, this function returns an error and
2626    /// doesn't flush any data.
2627    ///
2628    /// The function sends an HTTP request and waits for the response. If the server
2629    /// responds with an error, it returns a descriptive error. In the case of a network
2630    /// error, it retries until it has exhausted the retry time budget.
2631    ///
2632    /// All the data stays in the buffer. Clear the buffer before starting a new batch.
2633    #[cfg(feature = "ilp-over-http")]
2634    pub fn flush_and_keep_with_flags(&mut self, buf: &Buffer, transactional: bool) -> Result<()> {
2635        self.flush_impl(buf, transactional)
2636    }
2637
2638    /// Send the given buffer of rows to the QuestDB server.
2639    ///
2640    /// All the data stays in the buffer. Clear the buffer before starting a new batch.
2641    ///
2642    /// To send and clear in one step, call [Sender::flush] instead.
2643    pub fn flush_and_keep(&mut self, buf: &Buffer) -> Result<()> {
2644        self.flush_impl(buf, false)
2645    }
2646
2647    /// Send the given buffer of rows to the QuestDB server, clearing the buffer.
2648    ///
2649    /// After this function returns, the buffer is empty and ready for the next batch.
2650    /// If you want to preserve the buffer contents, call [Sender::flush_and_keep]. If
2651    /// you want to ensure the flush is transactional, call
2652    /// [Sender::flush_and_keep_with_flags].
2653    ///
2654    /// With ILP-over-HTTP, this function sends an HTTP request and waits for the
2655    /// response. If the server responds with an error, it returns a descriptive error.
2656    /// In the case of a network error, it retries until it has exhausted the retry time
2657    /// budget.
2658    ///
2659    /// With ILP-over-TCP, the function blocks only until the buffer is flushed to the
2660    /// underlying OS-level network socket, without waiting to actually send it to the
2661    /// server. In the case of an error, the server will quietly disconnect: consult the
2662    /// server logs for error messages.
2663    ///
2664    /// HTTP should be the first choice, but use TCP if you need to continuously send
2665    /// data to the server at a high rate.
2666    ///
2667    /// To improve the HTTP performance, send larger buffers (with more rows), and
2668    /// consider parallelizing writes using multiple senders from multiple threads.
2669    pub fn flush(&mut self, buf: &mut Buffer) -> Result<()> {
2670        self.flush_impl(buf, false)?;
2671        buf.clear();
2672        Ok(())
2673    }
2674
2675    /// Tell whether the sender is no longer usable and must be dropped.
2676    ///
2677    /// This happens when there was an earlier failure.
2678    ///
2679    /// This method is specific to ILP-over-TCP and is not relevant for ILP-over-HTTP.
2680    pub fn must_close(&self) -> bool {
2681        !self.connected
2682    }
2683}
2684
2685mod conf;
2686mod timestamp;
2687
2688#[cfg(feature = "ilp-over-http")]
2689mod http;
2690
2691#[cfg(feature = "ilp-over-http")]
2692use http::*;
2693
2694#[cfg(test)]
2695mod tests;