questdb/ingress/
mod.rs

1/*******************************************************************************
2 *     ___                  _   ____  ____
3 *    / _ \ _   _  ___  ___| |_|  _ \| __ )
4 *   | | | | | | |/ _ \/ __| __| | | |  _ \
5 *   | |_| | |_| |  __/\__ \ |_| |_| | |_) |
6 *    \__\_\\__,_|\___||___/\__|____/|____/
7 *
8 *  Copyright (c) 2014-2019 Appsicle
9 *  Copyright (c) 2019-2025 QuestDB
10 *
11 *  Licensed under the Apache License, Version 2.0 (the "License");
12 *  you may not use this file except in compliance with the License.
13 *  You may obtain a copy of the License at
14 *
15 *  http://www.apache.org/licenses/LICENSE-2.0
16 *
17 *  Unless required by applicable law or agreed to in writing, software
18 *  distributed under the License is distributed on an "AS IS" BASIS,
19 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 *  See the License for the specific language governing permissions and
21 *  limitations under the License.
22 *
23 ******************************************************************************/
24
25#![doc = include_str!("mod.md")]
26
27pub use self::ndarr::{ArrayElement, NdArrayView};
28pub use self::timestamp::*;
29use crate::error::{self, fmt, Result};
30use crate::ingress::conf::ConfigSetting;
31use core::time::Duration;
32use std::collections::HashMap;
33use std::fmt::{Debug, Display, Formatter, Write};
34
35use std::ops::Deref;
36use std::path::PathBuf;
37use std::str::FromStr;
38
39mod tls;
40
41#[cfg(all(feature = "_sender-tcp", feature = "aws-lc-crypto"))]
42use aws_lc_rs::{
43    rand::SystemRandom,
44    signature::{EcdsaKeyPair, ECDSA_P256_SHA256_FIXED_SIGNING},
45};
46
47#[cfg(all(feature = "_sender-tcp", feature = "ring-crypto"))]
48use ring::{
49    rand::SystemRandom,
50    signature::{EcdsaKeyPair, ECDSA_P256_SHA256_FIXED_SIGNING},
51};
52
53mod conf;
54
55pub(crate) mod ndarr;
56
57mod timestamp;
58
59mod buffer;
60pub use buffer::*;
61
62mod sender;
63pub use sender::*;
64
65const MAX_NAME_LEN_DEFAULT: usize = 127;
66
67/// The maximum allowed dimensions for arrays.
68pub const MAX_ARRAY_DIMS: usize = 32;
69pub const MAX_ARRAY_BUFFER_SIZE: usize = 512 * 1024 * 1024; // 512MiB
70pub const MAX_ARRAY_DIM_LEN: usize = 0x0FFF_FFFF; // 1 << 28 - 1
71
72pub(crate) const ARRAY_BINARY_FORMAT_TYPE: u8 = 14;
73pub(crate) const DOUBLE_BINARY_FORMAT_TYPE: u8 = 16;
74
75/// The version of InfluxDB Line Protocol used to communicate with the server.
76#[derive(Debug, Copy, Clone, PartialEq)]
77pub enum ProtocolVersion {
78    /// Version 1 of Line Protocol.
79    /// Full-text protocol.
80    /// This version is compatible with the InfluxDB database.
81    V1 = 1,
82
83    /// Version 2 of InfluxDB Line Protocol.
84    /// Uses binary format serialization for f64, and supports the array data type.
85    /// This version is specific to QuestDB and is not compatible with InfluxDB.
86    /// QuestDB server version 9.0.0 or later is required for `V2` supported.
87    V2 = 2,
88}
89
90impl Display for ProtocolVersion {
91    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
92        match self {
93            ProtocolVersion::V1 => write!(f, "v1"),
94            ProtocolVersion::V2 => write!(f, "v2"),
95        }
96    }
97}
98
99#[cfg(feature = "_sender-tcp")]
100fn map_io_to_socket_err(prefix: &str, io_err: std::io::Error) -> error::Error {
101    fmt!(SocketError, "{}{}", prefix, io_err)
102}
103
104/// Possible sources of the root certificates used to validate the server's TLS
105/// certificate.
106#[derive(PartialEq, Debug, Clone, Copy)]
107pub enum CertificateAuthority {
108    /// Use the root certificates provided by the
109    /// [`webpki-roots`](https://crates.io/crates/webpki-roots) crate.
110    #[cfg(feature = "tls-webpki-certs")]
111    WebpkiRoots,
112
113    /// Use the root certificates provided by the OS
114    #[cfg(feature = "tls-native-certs")]
115    OsRoots,
116
117    /// Combine the root certificates provided by the OS and the `webpki-roots` crate.
118    #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
119    WebpkiAndOsRoots,
120
121    /// Use the root certificates provided in a PEM-encoded file.
122    PemFile,
123}
124
125/// A `u16` port number or `String` port service name as is registered with
126/// `/etc/services` or equivalent.
127///
128/// ```
129/// use questdb::ingress::Port;
130/// use std::convert::Into;
131///
132/// let service: Port = 9009.into();
133/// ```
134///
135/// or
136///
137/// ```
138/// use questdb::ingress::Port;
139/// use std::convert::Into;
140///
141/// // Assuming the service name is registered.
142/// let service: Port = "qdb_ilp".into();  // or with a String too.
143/// ```
144pub struct Port(String);
145
146impl From<String> for Port {
147    fn from(s: String) -> Self {
148        Port(s)
149    }
150}
151
152impl From<&str> for Port {
153    fn from(s: &str) -> Self {
154        Port(s.to_owned())
155    }
156}
157
158impl From<u16> for Port {
159    fn from(p: u16) -> Self {
160        Port(p.to_string())
161    }
162}
163
164fn validate_auto_flush_params(params: &HashMap<String, String>) -> Result<()> {
165    if let Some(auto_flush) = params.get("auto_flush") {
166        if auto_flush.as_str() != "off" {
167            return Err(error::fmt!(
168                ConfigError,
169                "Invalid auto_flush value '{auto_flush}'. This client does not \
170                support auto-flush, so the only accepted value is 'off'"
171            ));
172        }
173    }
174
175    for &param in ["auto_flush_rows", "auto_flush_bytes"].iter() {
176        if params.contains_key(param) {
177            return Err(error::fmt!(
178                ConfigError,
179                "Invalid configuration parameter {:?}. This client does not support auto-flush",
180                param
181            ));
182        }
183    }
184    Ok(())
185}
186
187/// Protocol used to communicate with the QuestDB server.
188#[derive(PartialEq, Debug, Clone, Copy)]
189pub enum Protocol {
190    #[cfg(feature = "_sender-tcp")]
191    /// ILP over TCP (streaming).
192    Tcp,
193
194    #[cfg(feature = "_sender-tcp")]
195    /// TCP + TLS
196    Tcps,
197
198    #[cfg(feature = "_sender-http")]
199    /// ILP over HTTP (request-response)
200    /// Version 1 is compatible with the InfluxDB Line Protocol.
201    Http,
202
203    #[cfg(feature = "_sender-http")]
204    /// HTTP + TLS
205    Https,
206}
207
208impl Display for Protocol {
209    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
210        f.write_str(self.schema())
211    }
212}
213
214impl Protocol {
215    fn default_port(&self) -> &str {
216        match self {
217            #[cfg(feature = "_sender-tcp")]
218            Protocol::Tcp | Protocol::Tcps => "9009",
219            #[cfg(feature = "_sender-http")]
220            Protocol::Http | Protocol::Https => "9000",
221        }
222    }
223
224    fn tls_enabled(&self) -> bool {
225        match self {
226            #[cfg(feature = "_sender-tcp")]
227            Protocol::Tcp => false,
228            #[cfg(feature = "_sender-tcp")]
229            Protocol::Tcps => true,
230            #[cfg(feature = "_sender-http")]
231            Protocol::Http => false,
232            #[cfg(feature = "_sender-http")]
233            Protocol::Https => true,
234        }
235    }
236
237    #[cfg(feature = "_sender-tcp")]
238    fn is_tcpx(&self) -> bool {
239        match self {
240            Protocol::Tcp | Protocol::Tcps => true,
241            #[cfg(feature = "_sender-http")]
242            Protocol::Http | Protocol::Https => false,
243        }
244    }
245
246    #[cfg(feature = "_sender-http")]
247    fn is_httpx(&self) -> bool {
248        match self {
249            #[cfg(feature = "_sender-tcp")]
250            Protocol::Tcp | Protocol::Tcps => false,
251            Protocol::Http | Protocol::Https => true,
252        }
253    }
254
255    fn schema(&self) -> &str {
256        match self {
257            #[cfg(feature = "_sender-tcp")]
258            Protocol::Tcp => "tcp",
259            #[cfg(feature = "_sender-tcp")]
260            Protocol::Tcps => "tcps",
261            #[cfg(feature = "_sender-http")]
262            Protocol::Http => "http",
263            #[cfg(feature = "_sender-http")]
264            Protocol::Https => "https",
265        }
266    }
267
268    fn from_schema(schema: &str) -> Result<Self> {
269        match schema {
270            #[cfg(feature = "_sender-tcp")]
271            "tcp" => Ok(Protocol::Tcp),
272            #[cfg(feature = "_sender-tcp")]
273            "tcps" => Ok(Protocol::Tcps),
274            #[cfg(feature = "_sender-http")]
275            "http" => Ok(Protocol::Http),
276            #[cfg(feature = "_sender-http")]
277            "https" => Ok(Protocol::Https),
278            _ => Err(error::fmt!(ConfigError, "Unsupported protocol: {}", schema)),
279        }
280    }
281}
282
283/// Accumulates parameters for a new `Sender` instance.
284///
285/// You can also create the builder from a config string.
286///
287/// ```no_run
288/// # use questdb::Result;
289/// use questdb::ingress::SenderBuilder;
290///
291/// # fn main() -> Result<()> {
292/// let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
293/// # Ok(())
294/// # }
295/// ```
296///
297/// Or create it from the `QDB_CLIENT_CONF` environment variable.
298///
299/// ```no_run
300/// # use questdb::Result;
301/// use questdb::ingress::SenderBuilder;
302///
303/// # fn main() -> Result<()> {
304/// // export QDB_CLIENT_CONF="https::addr=localhost:9000;"
305/// let mut sender = SenderBuilder::from_env()?.build()?;
306/// # Ok(())
307/// # }
308/// ```
309///
310/// The `SenderBuilder` can also be built programmatically.
311/// The minimum required parameters are the protocol, host, and port.
312///
313/// ```no_run
314/// # use questdb::Result;
315/// use questdb::ingress::SenderBuilder;
316/// use questdb::ingress::Protocol;
317///
318/// # fn main() -> Result<()> {
319/// # #[cfg(feature = "sync-sender-http")] {
320/// let mut sender = SenderBuilder::new(Protocol::Http, "localhost", 9000).build()?;
321/// # }
322/// # #[cfg(all(not(feature = "sync-sender-http"), feature = "sync-sender-tcp"))] {
323/// let mut sender = SenderBuilder::new(Protocol::Tcp, "localhost", 9009).build()?;
324/// # }
325/// # Ok(())
326/// # }
327/// ```
328#[derive(Debug, Clone)]
329pub struct SenderBuilder {
330    protocol: Protocol,
331    host: ConfigSetting<String>,
332    port: ConfigSetting<String>,
333    net_interface: ConfigSetting<Option<String>>,
334    max_buf_size: ConfigSetting<usize>,
335    max_name_len: ConfigSetting<usize>,
336    auth_timeout: ConfigSetting<Duration>,
337    username: ConfigSetting<Option<String>>,
338    password: ConfigSetting<Option<String>>,
339    token: ConfigSetting<Option<String>>,
340
341    #[cfg(feature = "_sender-tcp")]
342    token_x: ConfigSetting<Option<String>>,
343
344    #[cfg(feature = "_sender-tcp")]
345    token_y: ConfigSetting<Option<String>>,
346
347    protocol_version: ConfigSetting<Option<ProtocolVersion>>,
348
349    #[cfg(feature = "insecure-skip-verify")]
350    tls_verify: ConfigSetting<bool>,
351
352    tls_ca: ConfigSetting<CertificateAuthority>,
353    tls_roots: ConfigSetting<Option<PathBuf>>,
354
355    #[cfg(feature = "_sender-http")]
356    http: Option<conf::HttpConfig>,
357}
358
359impl SenderBuilder {
360    /// Create a new `SenderBuilder` instance from the configuration string.
361    ///
362    /// The format of the string is: `"http::addr=host:port;key=value;...;"`.
363    ///
364    /// Instead of `"http"`, you can also specify `"https"`, `"tcp"`, and `"tcps"`.
365    ///
366    /// We recommend HTTP for most cases because it provides more features, like
367    /// reporting errors to the client and supporting transaction control. TCP can
368    /// sometimes be faster in higher-latency networks, but misses a number of
369    /// features.
370    ///
371    /// The accepted keys match one-for-one with the methods on `SenderBuilder`.
372    /// For example, this is a valid configuration string:
373    ///
374    /// "https::addr=host:port;username=alice;password=secret;"
375    ///
376    /// and there are matching methods [SenderBuilder::username] and
377    /// [SenderBuilder::password]. The value of `addr=` is supplied directly to the
378    /// `SenderBuilder` constructor, so there's no matching method for that.
379    ///
380    /// You can also load the configuration from an environment variable. See
381    /// [`SenderBuilder::from_env`].
382    ///
383    /// Once you have a `SenderBuilder` instance, you can further customize it
384    /// before calling [`SenderBuilder::build`], but you can't change any settings
385    /// that are already set in the config string.
386    pub fn from_conf<T: AsRef<str>>(conf: T) -> Result<Self> {
387        let conf = conf.as_ref();
388        let conf = questdb_confstr::parse_conf_str(conf)
389            .map_err(|e| error::fmt!(ConfigError, "Config parse error: {}", e))?;
390        let service = conf.service();
391        let params = conf.params();
392
393        let protocol = Protocol::from_schema(service)?;
394
395        let Some(addr) = params.get("addr") else {
396            return Err(error::fmt!(
397                ConfigError,
398                "Missing \"addr\" parameter in config string"
399            ));
400        };
401        let (host, port) = match addr.split_once(':') {
402            Some((h, p)) => (h, p),
403            None => (addr.as_str(), protocol.default_port()),
404        };
405        let mut builder = SenderBuilder::new(protocol, host, port);
406
407        validate_auto_flush_params(params)?;
408
409        for (key, val) in params.iter().map(|(k, v)| (k.as_str(), v.as_str())) {
410            builder = match key {
411                "username" => builder.username(val)?,
412                "password" => builder.password(val)?,
413                "token" => builder.token(val)?,
414                "token_x" => builder.token_x(val)?,
415                "token_y" => builder.token_y(val)?,
416                "bind_interface" => builder.bind_interface(val)?,
417                "protocol_version" => match val {
418                    "1" => builder.protocol_version(ProtocolVersion::V1)?,
419                    "2" => builder.protocol_version(ProtocolVersion::V2)?,
420                    "auto" => builder,
421                    invalid => {
422                        return Err(error::fmt!(
423                            ConfigError,
424                            "invalid \"protocol_version\" [value={invalid}, allowed-values=[auto, 1, 2]]]\"]"
425                        ))
426                    }
427                },
428                "max_name_len" => {
429                    builder.max_name_len(parse_conf_value(key, val)?)?
430                }
431
432                "init_buf_size" => {
433                    return Err(error::fmt!(
434                        ConfigError,
435                        "\"init_buf_size\" is not supported in config string"
436                    ))
437                }
438
439                "max_buf_size" => builder.max_buf_size(parse_conf_value(key, val)?)?,
440
441                "auth_timeout" => {
442                    builder.auth_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
443                }
444
445                "tls_verify" => {
446                    let verify = match val {
447                        "on" => true,
448                        "unsafe_off" => false,
449                        _ => {
450                            return Err(fmt!(
451                                ConfigError,
452                                r##"Config parameter "tls_verify" must be either "on" or "unsafe_off".'"##,
453                            ))
454                        }
455                    };
456
457                    #[cfg(not(feature = "insecure-skip-verify"))]
458                    {
459                        if !verify {
460                            return Err(fmt!(
461                                ConfigError,
462                                r##"The "insecure-skip-verify" feature is not enabled, so "tls_verify=unsafe_off" is not supported"##,
463                            ));
464                        }
465                        builder
466                    }
467
468                    #[cfg(feature = "insecure-skip-verify")]
469                    builder.tls_verify(verify)?
470                }
471
472                "tls_ca" => {
473                    let ca = match val {
474                        #[cfg(feature = "tls-webpki-certs")]
475                        "webpki_roots" => CertificateAuthority::WebpkiRoots,
476
477                        #[cfg(not(feature = "tls-webpki-certs"))]
478                        "webpki_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=webpki_roots\" requires the \"tls-webpki-certs\" feature")),
479
480                        #[cfg(feature = "tls-native-certs")]
481                        "os_roots" => CertificateAuthority::OsRoots,
482
483                        #[cfg(not(feature = "tls-native-certs"))]
484                        "os_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=os_roots\" requires the \"tls-native-certs\" feature")),
485
486                        #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
487                        "webpki_and_os_roots" => CertificateAuthority::WebpkiAndOsRoots,
488
489                        #[cfg(not(all(feature = "tls-webpki-certs", feature = "tls-native-certs")))]
490                        "webpki_and_os_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=webpki_and_os_roots\" requires both the \"tls-webpki-certs\" and \"tls-native-certs\" features")),
491
492                        _ => return Err(error::fmt!(ConfigError, "Invalid value {val:?} for \"tls_ca\"")),
493                    };
494                    builder.tls_ca(ca)?
495                }
496
497                "tls_roots" => {
498                    let path = PathBuf::from_str(val).map_err(|e| {
499                        error::fmt!(
500                            ConfigError,
501                            "Invalid path {:?} for \"tls_roots\": {}",
502                            val,
503                            e
504                        )
505                    })?;
506                    builder.tls_roots(path)?
507                }
508
509                "tls_roots_password" => {
510                    return Err(error::fmt!(
511                        ConfigError,
512                        "\"tls_roots_password\" is not supported."
513                    ))
514                }
515
516                #[cfg(feature = "sync-sender-http")]
517                "request_min_throughput" => {
518                    builder.request_min_throughput(parse_conf_value(key, val)?)?
519                }
520
521                #[cfg(feature = "sync-sender-http")]
522                "request_timeout" => {
523                    builder.request_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
524                }
525
526                #[cfg(feature = "sync-sender-http")]
527                "retry_timeout" => {
528                    builder.retry_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
529                }
530
531                // Ignore other parameters.
532                // We don't want to fail on unknown keys as this would require releasing different
533                // library implementations in lock step as soon as a new parameter is added to any of them,
534                // even if it's not used.
535                _ => builder,
536            };
537        }
538
539        Ok(builder)
540    }
541
542    /// Create a new `SenderBuilder` instance from the configuration from the
543    /// configuration stored in the `QDB_CLIENT_CONF` environment variable.
544    ///
545    /// The format of the string is the same as for [`SenderBuilder::from_conf`].
546    pub fn from_env() -> Result<Self> {
547        let conf = std::env::var("QDB_CLIENT_CONF").map_err(|_| {
548            error::fmt!(ConfigError, "Environment variable QDB_CLIENT_CONF not set.")
549        })?;
550        Self::from_conf(conf)
551    }
552
553    /// Create a new `SenderBuilder` instance with the provided QuestDB
554    /// server and port, using ILP over the specified protocol.
555    ///
556    /// ```no_run
557    /// # use questdb::Result;
558    /// use questdb::ingress::{Protocol, SenderBuilder};
559    ///
560    /// # fn main() -> Result<()> {
561    /// # #[cfg(feature = "sync-sender-tcp")] {
562    /// let mut sender = SenderBuilder::new(
563    ///     Protocol::Tcp, "localhost", 9009).build()?;
564    /// # }
565    /// # #[cfg(all(not(feature = "sync-sender-tcp"), feature = "sync-sender-http"))] {
566    /// let mut sender = SenderBuilder::new(
567    ///     Protocol::Http, "localhost", 9000).build()?;
568    /// # }
569    /// # Ok(())
570    /// # }
571    /// ```
572    pub fn new<H: Into<String>, P: Into<Port>>(protocol: Protocol, host: H, port: P) -> Self {
573        let host = host.into();
574        let port: Port = port.into();
575        let port = port.0;
576
577        #[cfg(feature = "tls-webpki-certs")]
578        let tls_ca = CertificateAuthority::WebpkiRoots;
579
580        #[cfg(all(not(feature = "tls-webpki-certs"), feature = "tls-native-certs"))]
581        let tls_ca = CertificateAuthority::OsRoots;
582
583        #[cfg(not(any(feature = "tls-webpki-certs", feature = "tls-native-certs")))]
584        let tls_ca = CertificateAuthority::PemFile;
585
586        Self {
587            protocol,
588            host: ConfigSetting::new_specified(host),
589            port: ConfigSetting::new_specified(port),
590            net_interface: ConfigSetting::new_default(None),
591            max_buf_size: ConfigSetting::new_default(100 * 1024 * 1024),
592            max_name_len: ConfigSetting::new_default(MAX_NAME_LEN_DEFAULT),
593            auth_timeout: ConfigSetting::new_default(Duration::from_secs(15)),
594            username: ConfigSetting::new_default(None),
595            password: ConfigSetting::new_default(None),
596            token: ConfigSetting::new_default(None),
597
598            #[cfg(feature = "_sender-tcp")]
599            token_x: ConfigSetting::new_default(None),
600
601            #[cfg(feature = "_sender-tcp")]
602            token_y: ConfigSetting::new_default(None),
603
604            protocol_version: ConfigSetting::new_default(None),
605
606            #[cfg(feature = "insecure-skip-verify")]
607            tls_verify: ConfigSetting::new_default(true),
608
609            tls_ca: ConfigSetting::new_default(tls_ca),
610            tls_roots: ConfigSetting::new_default(None),
611
612            #[cfg(feature = "sync-sender-http")]
613            http: if protocol.is_httpx() {
614                Some(conf::HttpConfig::default())
615            } else {
616                None
617            },
618        }
619    }
620
621    /// Select local outbound interface.
622    ///
623    /// This may be relevant if your machine has multiple network interfaces.
624    ///
625    /// The default is `"0.0.0.0"`.
626    pub fn bind_interface<I: Into<String>>(self, addr: I) -> Result<Self> {
627        #[cfg(feature = "_sender-tcp")]
628        {
629            let mut builder = self;
630            builder.ensure_is_tcpx("bind_interface")?;
631            builder
632                .net_interface
633                .set_specified("bind_interface", Some(validate_value(addr.into())?))?;
634            Ok(builder)
635        }
636
637        #[cfg(not(feature = "_sender-tcp"))]
638        {
639            let _ = addr;
640            Err(error::fmt!(
641                ConfigError,
642                "The \"bind_interface\" setting can only be used with the TCP protocol."
643            ))
644        }
645    }
646
647    /// Set the username for authentication.
648    ///
649    /// For TCP, this is the `kid` part of the ECDSA key set.
650    /// The other fields are [`token`](SenderBuilder::token), [`token_x`](SenderBuilder::token_x),
651    /// and [`token_y`](SenderBuilder::token_y).
652    ///
653    /// For HTTP, this is a part of basic authentication.
654    /// See also: [`password`](SenderBuilder::password).
655    pub fn username(mut self, username: &str) -> Result<Self> {
656        self.username
657            .set_specified("username", Some(validate_value(username.to_string())?))?;
658        Ok(self)
659    }
660
661    /// Set the password for basic HTTP authentication.
662    /// See also: [`username`](SenderBuilder::username).
663    pub fn password(mut self, password: &str) -> Result<Self> {
664        self.password
665            .set_specified("password", Some(validate_value(password.to_string())?))?;
666        Ok(self)
667    }
668
669    /// Set the Token (Bearer) Authentication parameter for HTTP,
670    /// or the ECDSA private key for TCP authentication.
671    pub fn token(mut self, token: &str) -> Result<Self> {
672        self.token
673            .set_specified("token", Some(validate_value(token.to_string())?))?;
674        Ok(self)
675    }
676
677    /// Set the ECDSA public key X for TCP authentication.
678    pub fn token_x(self, token_x: &str) -> Result<Self> {
679        #[cfg(feature = "_sender-tcp")]
680        {
681            let mut builder = self;
682            builder
683                .token_x
684                .set_specified("token_x", Some(validate_value(token_x.to_string())?))?;
685            Ok(builder)
686        }
687
688        #[cfg(not(feature = "_sender-tcp"))]
689        {
690            let _ = token_x;
691            Err(error::fmt!(
692                ConfigError,
693                "cannot specify \"token_x\": ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."
694            ))
695        }
696    }
697
698    /// Set the ECDSA public key Y for TCP authentication.
699    pub fn token_y(self, token_y: &str) -> Result<Self> {
700        #[cfg(feature = "_sender-tcp")]
701        {
702            let mut builder = self;
703            builder
704                .token_y
705                .set_specified("token_y", Some(validate_value(token_y.to_string())?))?;
706            Ok(builder)
707        }
708
709        #[cfg(not(feature = "_sender-tcp"))]
710        {
711            let _ = token_y;
712            Err(error::fmt!(
713                ConfigError,
714                "cannot specify \"token_y\": ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."
715            ))
716        }
717    }
718
719    /// Sets the ingestion protocol version.
720    /// - HTTP transport automatically negotiates the protocol version by default(unset, **Strong Recommended**).
721    ///   You can explicitly configure the protocol version to avoid the slight latency cost at connection time.
722    /// - TCP transport does not negotiate the protocol version and uses [`ProtocolVersion::V1`] by
723    ///   default. You must explicitly set [`ProtocolVersion::V2`] in order to ingest
724    ///   arrays.
725    ///
726    /// **Note**: QuestDB server version 9.0.0 or later is required for [`ProtocolVersion::V2`] support.
727    pub fn protocol_version(mut self, protocol_version: ProtocolVersion) -> Result<Self> {
728        self.protocol_version
729            .set_specified("protocol_version", Some(protocol_version))?;
730        Ok(self)
731    }
732
733    /// Configure how long to wait for messages from the QuestDB server during
734    /// the TLS handshake and authentication process. This only applies to TCP.
735    /// The default is 15 seconds.
736    pub fn auth_timeout(mut self, value: Duration) -> Result<Self> {
737        self.auth_timeout.set_specified("auth_timeout", value)?;
738        Ok(self)
739    }
740
741    /// Ensure that TLS is enabled for the protocol.
742    pub fn ensure_tls_enabled(&self, property: &str) -> Result<()> {
743        if !self.protocol.tls_enabled() {
744            return Err(error::fmt!(
745                ConfigError,
746                "Cannot set {property:?}: TLS is not supported for protocol {}",
747                self.protocol
748            ));
749        }
750        Ok(())
751    }
752
753    /// Set to `false` to disable TLS certificate verification.
754    /// This should only be used for debugging purposes as it reduces security.
755    ///
756    /// For testing, consider specifying a path to a `.pem` file instead via
757    /// the [`tls_roots`](SenderBuilder::tls_roots) method.
758    #[cfg(feature = "insecure-skip-verify")]
759    pub fn tls_verify(mut self, verify: bool) -> Result<Self> {
760        self.ensure_tls_enabled("tls_verify")?;
761        self.tls_verify.set_specified("tls_verify", verify)?;
762        Ok(self)
763    }
764
765    /// Specify where to find the root certificate used to validate the
766    /// server's TLS certificate.
767    pub fn tls_ca(mut self, ca: CertificateAuthority) -> Result<Self> {
768        self.ensure_tls_enabled("tls_ca")?;
769        self.tls_ca.set_specified("tls_ca", ca)?;
770        Ok(self)
771    }
772
773    /// Set the path to a custom root certificate `.pem` file.
774    /// This is used to validate the server's certificate during the TLS handshake.
775    ///
776    /// See notes on how to test with [self-signed
777    /// certificates](https://github.com/questdb/c-questdb-client/tree/main/tls_certs).
778    pub fn tls_roots<P: Into<PathBuf>>(self, path: P) -> Result<Self> {
779        let mut builder = self.tls_ca(CertificateAuthority::PemFile)?;
780        let path = path.into();
781        // Attempt to read the file here to catch any issues early.
782        let _file = std::fs::File::open(&path).map_err(|io_err| {
783            error::fmt!(
784                ConfigError,
785                "Could not open root certificate file from path {:?}: {}",
786                path,
787                io_err
788            )
789        })?;
790        builder.tls_roots.set_specified("tls_roots", Some(path))?;
791        Ok(builder)
792    }
793
794    /// The maximum buffer size in bytes that the client will flush to the server.
795    /// The default is 100 MiB.
796    pub fn max_buf_size(mut self, value: usize) -> Result<Self> {
797        let min = 1024;
798        if value < min {
799            return Err(error::fmt!(
800                ConfigError,
801                "max_buf_size\" must be at least {min} bytes."
802            ));
803        }
804        self.max_buf_size.set_specified("max_buf_size", value)?;
805        Ok(self)
806    }
807
808    /// The maximum length of a table or column name in bytes.
809    /// Matches the `cairo.max.file.name.length` setting in the server.
810    /// The default is 127 bytes.
811    /// If running over HTTP and protocol version 2 is auto-negotiated, this
812    /// value is picked up from the server.
813    pub fn max_name_len(mut self, value: usize) -> Result<Self> {
814        if value < 16 {
815            return Err(error::fmt!(
816                ConfigError,
817                "max_name_len must be at least 16 bytes."
818            ));
819        }
820        self.max_name_len.set_specified("max_name_len", value)?;
821        Ok(self)
822    }
823
824    #[cfg(feature = "sync-sender-http")]
825    /// Set the cumulative duration spent in retries.
826    /// The value is in milliseconds, and the default is 10 seconds.
827    pub fn retry_timeout(mut self, value: Duration) -> Result<Self> {
828        if let Some(http) = &mut self.http {
829            http.retry_timeout.set_specified("retry_timeout", value)?;
830        } else {
831            return Err(error::fmt!(
832                ConfigError,
833                "retry_timeout is supported only in ILP over HTTP."
834            ));
835        }
836        Ok(self)
837    }
838
839    #[cfg(feature = "sync-sender-http")]
840    /// Set the minimum acceptable throughput while sending a buffer to the server.
841    /// The sender will divide the payload size by this number to determine for how
842    /// long to keep sending the payload before timing out.
843    /// The value is in bytes per second, and the default is 100 KiB/s.
844    /// The timeout calculated from minimum throughput is adedd to the value of
845    /// [`request_timeout`](SenderBuilder::request_timeout) to get the total timeout
846    /// value.
847    /// A value of 0 disables this feature, so it's similar to setting "infinite"
848    /// minimum throughput. The total timeout will then be equal to `request_timeout`.
849    pub fn request_min_throughput(mut self, value: u64) -> Result<Self> {
850        if let Some(http) = &mut self.http {
851            http.request_min_throughput
852                .set_specified("request_min_throughput", value)?;
853        } else {
854            return Err(error::fmt!(
855                ConfigError,
856                "\"request_min_throughput\" is supported only in ILP over HTTP."
857            ));
858        }
859        Ok(self)
860    }
861
862    #[cfg(feature = "sync-sender-http")]
863    /// Additional time to wait on top of that calculated from the minimum throughput.
864    /// This accounts for the fixed latency of the HTTP request-response roundtrip.
865    /// The default is 10 seconds.
866    /// See also: [`request_min_throughput`](SenderBuilder::request_min_throughput).
867    pub fn request_timeout(mut self, value: Duration) -> Result<Self> {
868        if let Some(http) = &mut self.http {
869            if value.is_zero() {
870                return Err(error::fmt!(
871                    ConfigError,
872                    "\"request_timeout\" must be greater than 0."
873                ));
874            }
875            http.request_timeout
876                .set_specified("request_timeout", value)?;
877        } else {
878            return Err(error::fmt!(
879                ConfigError,
880                "\"request_timeout\" is supported only in ILP over HTTP."
881            ));
882        }
883        Ok(self)
884    }
885
886    #[cfg(feature = "sync-sender-http")]
887    /// Internal API, do not use.
888    /// This is exposed exclusively for the Python client.
889    /// We (QuestDB) use this to help us debug which client is being used if we encounter issues.
890    #[doc(hidden)]
891    pub fn user_agent(mut self, value: &str) -> Result<Self> {
892        let value = validate_value(value)?;
893        if let Some(http) = &mut self.http {
894            http.user_agent = value.to_string();
895        }
896        Ok(self)
897    }
898
899    fn build_auth(&self) -> Result<Option<conf::AuthParams>> {
900        match (
901            self.protocol,
902            self.username.deref(),
903            self.password.deref(),
904            self.token.deref(),
905
906            #[cfg(feature = "_sender-tcp")]
907            self.token_x.deref(),
908
909            #[cfg(not(feature = "_sender-tcp"))]
910            None::<String>,
911
912            #[cfg(feature = "_sender-tcp")]
913            self.token_y.deref(),
914
915            #[cfg(not(feature = "_sender-tcp"))]
916            None::<String>,
917        ) {
918            (_, None, None, None, None, None) => Ok(None),
919
920            #[cfg(feature = "_sender-tcp")]
921            (
922                protocol,
923                Some(username),
924                None,
925                Some(token),
926                Some(token_x),
927                Some(token_y),
928            ) if protocol.is_tcpx() => Ok(Some(conf::AuthParams::Ecdsa(conf::EcdsaAuthParams {
929                key_id: username.to_string(),
930                priv_key: token.to_string(),
931                pub_key_x: token_x.to_string(),
932                pub_key_y: token_y.to_string(),
933            }))),
934
935            #[cfg(feature = "_sender-tcp")]
936            (protocol, Some(_username), Some(_password), None, None, None)
937            if protocol.is_tcpx() => {
938                Err(error::fmt!(ConfigError,
939                    r##"The "basic_auth" setting can only be used with the ILP/HTTP protocol."##,
940                ))
941            }
942
943            #[cfg(feature = "_sender-tcp")]
944            (protocol, None, None, Some(_token), None, None)
945            if protocol.is_tcpx() => {
946                Err(error::fmt!(ConfigError, "Token authentication only be used with the ILP/HTTP protocol."))
947            }
948
949            #[cfg(feature = "_sender-tcp")]
950            (protocol, _username, None, _token, _token_x, _token_y)
951            if protocol.is_tcpx() => {
952                Err(error::fmt!(ConfigError,
953                    r##"Incomplete ECDSA authentication parameters. Specify either all or none of: "username", "token", "token_x", "token_y"."##,
954                ))
955            }
956            #[cfg(feature = "_sender-http")]
957            (protocol, Some(username), Some(password), None, None, None)
958            if protocol.is_httpx() => {
959                Ok(Some(conf::AuthParams::Basic(conf::BasicAuthParams {
960                    username: username.to_string(),
961                    password: password.to_string(),
962                })))
963            }
964            #[cfg(feature = "_sender-http")]
965            (protocol, Some(_username), None, None, None, None)
966            if protocol.is_httpx() => {
967                Err(error::fmt!(ConfigError,
968                    r##"Basic authentication parameter "username" is present, but "password" is missing."##,
969                ))
970            }
971            #[cfg(feature = "_sender-http")]
972            (protocol, None, Some(_password), None, None, None)
973            if protocol.is_httpx() => {
974                Err(error::fmt!(ConfigError,
975                    r##"Basic authentication parameter "password" is present, but "username" is missing."##,
976                ))
977            }
978            #[cfg(feature = "sync-sender-http")]
979            (protocol, None, None, Some(token), None, None)
980            if protocol.is_httpx() => {
981                Ok(Some(conf::AuthParams::Token(conf::TokenAuthParams {
982                    token: token.to_string(),
983                })))
984            }
985            #[cfg(feature = "sync-sender-http")]
986            (
987                protocol,
988                Some(_username),
989                None,
990                Some(_token),
991                Some(_token_x),
992                Some(_token_y),
993            ) if protocol.is_httpx() => {
994                Err(error::fmt!(ConfigError, "ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."))
995            }
996            #[cfg(feature = "_sender-http")]
997            (protocol, _username, _password, _token, None, None)
998            if protocol.is_httpx() => {
999                Err(error::fmt!(ConfigError,
1000                    r##"Inconsistent HTTP authentication parameters. Specify either "username" and "password", or just "token"."##,
1001                ))
1002            }
1003            _ => {
1004                Err(error::fmt!(ConfigError,
1005                    r##"Incomplete authentication parameters. Check "username", "password", "token", "token_x" and "token_y" parameters are set correctly."##,
1006                ))
1007            }
1008        }
1009    }
1010
1011    #[cfg(feature = "_sync-sender")]
1012    /// Build the sender.
1013    ///
1014    /// In the case of TCP, this synchronously establishes the TCP connection, and
1015    /// returns once the connection is fully established. If the connection
1016    /// requires authentication or TLS, these will also be completed before
1017    /// returning.
1018    pub fn build(&self) -> Result<Sender> {
1019        let mut descr = format!("Sender[host={:?},port={:?},", self.host, self.port);
1020
1021        if self.protocol.tls_enabled() {
1022            write!(descr, "tls=enabled,").unwrap();
1023        } else {
1024            write!(descr, "tls=disabled,").unwrap();
1025        }
1026
1027        #[cfg(feature = "insecure-skip-verify")]
1028        let tls_verify = *self.tls_verify;
1029
1030        let tls_settings = tls::TlsSettings::build(
1031            self.protocol.tls_enabled(),
1032            #[cfg(feature = "insecure-skip-verify")]
1033            tls_verify,
1034            *self.tls_ca,
1035            self.tls_roots.deref().as_deref(),
1036        )?;
1037
1038        let auth = self.build_auth()?;
1039
1040        let handler = match self.protocol {
1041            #[cfg(feature = "sync-sender-tcp")]
1042            Protocol::Tcp | Protocol::Tcps => connect_tcp(
1043                self.host.as_str(),
1044                self.port.as_str(),
1045                self.net_interface.deref().as_deref(),
1046                *self.auth_timeout,
1047                tls_settings,
1048                &auth,
1049            )?,
1050            #[cfg(feature = "sync-sender-http")]
1051            Protocol::Http | Protocol::Https => {
1052                use ureq::unversioned::transport::Connector;
1053                use ureq::unversioned::transport::TcpConnector;
1054                if self.net_interface.is_some() {
1055                    // See: https://github.com/algesten/ureq/issues/692
1056                    return Err(error::fmt!(
1057                        InvalidApiCall,
1058                        "net_interface is not supported for ILP over HTTP."
1059                    ));
1060                }
1061
1062                let http_config = self.http.as_ref().unwrap();
1063                let user_agent = http_config.user_agent.as_str();
1064                let connector = TcpConnector::default();
1065
1066                let agent_builder = ureq::Agent::config_builder()
1067                    .user_agent(user_agent)
1068                    .no_delay(true);
1069
1070                let tls_config = match tls_settings {
1071                    Some(tls_settings) => Some(tls::configure_tls(tls_settings)?),
1072                    None => None,
1073                };
1074
1075                let connector = connector.chain(TlsConnector::new(tls_config));
1076
1077                let auth = match auth {
1078                    Some(conf::AuthParams::Basic(ref auth)) => Some(auth.to_header_string()),
1079                    Some(conf::AuthParams::Token(ref auth)) => Some(auth.to_header_string()?),
1080
1081                    #[cfg(feature = "sync-sender-tcp")]
1082                    Some(conf::AuthParams::Ecdsa(_)) => {
1083                        return Err(fmt!(
1084                            AuthError,
1085                            "ECDSA authentication is not supported for ILP over HTTP. \
1086                            Please use basic or token authentication instead."
1087                        ));
1088                    }
1089                    None => None,
1090                };
1091                let agent_builder = agent_builder
1092                    .timeout_connect(Some(*http_config.request_timeout.deref()))
1093                    .http_status_as_error(false);
1094                let agent = ureq::Agent::with_parts(
1095                    agent_builder.build(),
1096                    connector,
1097                    ureq::unversioned::resolver::DefaultResolver::default(),
1098                );
1099                let proto = self.protocol.schema();
1100                let url = format!(
1101                    "{}://{}:{}/write",
1102                    proto,
1103                    self.host.deref(),
1104                    self.port.deref()
1105                );
1106                SyncProtocolHandler::SyncHttp(SyncHttpHandlerState {
1107                    agent,
1108                    url,
1109                    auth,
1110                    config: self.http.as_ref().unwrap().clone(),
1111                })
1112            }
1113        };
1114
1115        #[allow(unused_mut)]
1116        let mut max_name_len = *self.max_name_len;
1117
1118        let protocol_version = match self.protocol_version.deref() {
1119            Some(v) => *v,
1120            None => match self.protocol {
1121                #[cfg(feature = "sync-sender-tcp")]
1122                Protocol::Tcp | Protocol::Tcps => ProtocolVersion::V1,
1123                #[cfg(feature = "sync-sender-http")]
1124                Protocol::Http | Protocol::Https => {
1125                    #[allow(irrefutable_let_patterns)]
1126                    if let SyncProtocolHandler::SyncHttp(http_state) = &handler {
1127                        let settings_url = &format!(
1128                            "{}://{}:{}/settings",
1129                            self.protocol.schema(),
1130                            self.host.deref(),
1131                            self.port.deref()
1132                        );
1133                        let (protocol_versions, server_max_name_len) =
1134                            read_server_settings(http_state, settings_url, max_name_len)?;
1135                        max_name_len = server_max_name_len;
1136                        if protocol_versions.contains(&ProtocolVersion::V2) {
1137                            ProtocolVersion::V2
1138                        } else if protocol_versions.contains(&ProtocolVersion::V1) {
1139                            ProtocolVersion::V1
1140                        } else {
1141                            return Err(fmt!(
1142                                ProtocolVersionError,
1143                                "Server does not support current client"
1144                            ));
1145                        }
1146                    } else {
1147                        unreachable!("HTTP handler should be used for HTTP protocol");
1148                    }
1149                }
1150            },
1151        };
1152
1153        if auth.is_some() {
1154            descr.push_str("auth=on]");
1155        } else {
1156            descr.push_str("auth=off]");
1157        }
1158
1159        let sender = Sender::new(
1160            descr,
1161            handler,
1162            *self.max_buf_size,
1163            protocol_version,
1164            max_name_len,
1165        );
1166
1167        Ok(sender)
1168    }
1169
1170    #[cfg(feature = "_sender-tcp")]
1171    fn ensure_is_tcpx(&mut self, param_name: &str) -> Result<()> {
1172        if self.protocol.is_tcpx() {
1173            Ok(())
1174        } else {
1175            Err(fmt!(
1176                ConfigError,
1177                "The {param_name:?} setting can only be used with the TCP protocol."
1178            ))
1179        }
1180    }
1181}
1182
1183/// When parsing from config, we exclude certain characters.
1184/// Here we repeat the same validation logic for consistency.
1185fn validate_value<T: AsRef<str>>(value: T) -> Result<T> {
1186    let str_ref = value.as_ref();
1187    for (p, c) in str_ref.chars().enumerate() {
1188        if matches!(c, '\u{0}'..='\u{1f}' | '\u{7f}'..='\u{9f}') {
1189            return Err(error::fmt!(
1190                ConfigError,
1191                "Invalid character {c:?} at position {p}"
1192            ));
1193        }
1194    }
1195    Ok(value)
1196}
1197
1198fn parse_conf_value<T>(param_name: &str, str_value: &str) -> Result<T>
1199where
1200    T: FromStr,
1201    T::Err: std::fmt::Debug,
1202{
1203    str_value.parse().map_err(|e| {
1204        fmt!(
1205            ConfigError,
1206            "Could not parse {param_name:?} to number: {e:?}"
1207        )
1208    })
1209}
1210
1211#[cfg(feature = "_sender-tcp")]
1212fn b64_decode(descr: &'static str, buf: &str) -> Result<Vec<u8>> {
1213    use base64ct::{Base64UrlUnpadded, Encoding};
1214    Base64UrlUnpadded::decode_vec(buf).map_err(|b64_err| {
1215        fmt!(
1216            AuthError,
1217            "Misconfigured ILP authentication keys. Could not decode {}: {}. \
1218            Hint: Check the keys for a possible typo.",
1219            descr,
1220            b64_err
1221        )
1222    })
1223}
1224
1225#[cfg(feature = "_sender-tcp")]
1226fn parse_public_key(pub_key_x: &str, pub_key_y: &str) -> Result<Vec<u8>> {
1227    let mut pub_key_x = b64_decode("public key x", pub_key_x)?;
1228    let mut pub_key_y = b64_decode("public key y", pub_key_y)?;
1229
1230    // SEC 1 Uncompressed Octet-String-to-Elliptic-Curve-Point Encoding
1231    let mut encoded = Vec::new();
1232    encoded.push(4u8); // 0x04 magic byte that identifies this as uncompressed.
1233    let pub_key_x_ken = pub_key_x.len();
1234    if pub_key_x_ken > 32 {
1235        return Err(fmt!(
1236            AuthError,
1237            "Misconfigured ILP authentication keys. Public key x is too long. \
1238            Hint: Check the keys for a possible typo."
1239        ));
1240    }
1241    let pub_key_y_len = pub_key_y.len();
1242    if pub_key_y_len > 32 {
1243        return Err(fmt!(
1244            AuthError,
1245            "Misconfigured ILP authentication keys. Public key y is too long. \
1246            Hint: Check the keys for a possible typo."
1247        ));
1248    }
1249    encoded.resize((32 - pub_key_x_ken) + 1, 0u8);
1250    encoded.append(&mut pub_key_x);
1251    encoded.resize((32 - pub_key_y_len) + 1 + 32, 0u8);
1252    encoded.append(&mut pub_key_y);
1253    Ok(encoded)
1254}
1255
1256#[cfg(feature = "_sender-tcp")]
1257fn parse_key_pair(auth: &conf::EcdsaAuthParams) -> Result<EcdsaKeyPair> {
1258    let private_key = b64_decode("private authentication key", auth.priv_key.as_str())?;
1259    let public_key = parse_public_key(auth.pub_key_x.as_str(), auth.pub_key_y.as_str())?;
1260
1261    #[cfg(feature = "aws-lc-crypto")]
1262    let res = EcdsaKeyPair::from_private_key_and_public_key(
1263        &ECDSA_P256_SHA256_FIXED_SIGNING,
1264        &private_key[..],
1265        &public_key[..],
1266    );
1267
1268    #[cfg(feature = "ring-crypto")]
1269    let res = {
1270        let system_random = SystemRandom::new();
1271        EcdsaKeyPair::from_private_key_and_public_key(
1272            &ECDSA_P256_SHA256_FIXED_SIGNING,
1273            &private_key[..],
1274            &public_key[..],
1275            &system_random,
1276        )
1277    };
1278
1279    res.map_err(|key_rejected| {
1280        fmt!(
1281            AuthError,
1282            "Misconfigured ILP authentication keys: {}. Hint: Check the keys for a possible typo.",
1283            key_rejected
1284        )
1285    })
1286}
1287
1288struct DebugBytes<'a>(pub &'a [u8]);
1289
1290impl<'a> Debug for DebugBytes<'a> {
1291    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1292        write!(f, "b\"")?;
1293
1294        for &byte in self.0 {
1295            match byte {
1296                // Printable ASCII characters (except backslash and quote)
1297                0x20..=0x21 | 0x23..=0x5B | 0x5D..=0x7E => {
1298                    write!(f, "{}", byte as char)?;
1299                }
1300                // Common escape sequences
1301                b'\n' => write!(f, "\\n")?,
1302                b'\r' => write!(f, "\\r")?,
1303                b'\t' => write!(f, "\\t")?,
1304                b'\\' => write!(f, "\\\\")?,
1305                b'"' => write!(f, "\\\"")?,
1306                b'\0' => write!(f, "\\0")?,
1307                // Non-printable bytes as hex escapes
1308                _ => write!(f, "\\x{byte:02x}")?,
1309            }
1310        }
1311
1312        write!(f, "\"")
1313    }
1314}
1315
1316#[cfg(test)]
1317mod tests;