questdb/ingress/
mod.rs

1/*******************************************************************************
2 *     ___                  _   ____  ____
3 *    / _ \ _   _  ___  ___| |_|  _ \| __ )
4 *   | | | | | | |/ _ \/ __| __| | | |  _ \
5 *   | |_| | |_| |  __/\__ \ |_| |_| | |_) |
6 *    \__\_\\__,_|\___||___/\__|____/|____/
7 *
8 *  Copyright (c) 2014-2019 Appsicle
9 *  Copyright (c) 2019-2025 QuestDB
10 *
11 *  Licensed under the Apache License, Version 2.0 (the "License");
12 *  you may not use this file except in compliance with the License.
13 *  You may obtain a copy of the License at
14 *
15 *  http://www.apache.org/licenses/LICENSE-2.0
16 *
17 *  Unless required by applicable law or agreed to in writing, software
18 *  distributed under the License is distributed on an "AS IS" BASIS,
19 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 *  See the License for the specific language governing permissions and
21 *  limitations under the License.
22 *
23 ******************************************************************************/
24
25#![doc = include_str!("mod.md")]
26
27pub use self::ndarr::{ArrayElement, NdArrayView};
28pub use self::timestamp::*;
29use crate::error::{self, Result, fmt};
30use crate::ingress::conf::ConfigSetting;
31use core::time::Duration;
32use std::collections::HashMap;
33use std::fmt::{Debug, Display, Formatter, Write};
34
35use std::ops::Deref;
36use std::path::PathBuf;
37use std::str::FromStr;
38
39mod tls;
40
41#[cfg(all(feature = "_sender-tcp", feature = "aws-lc-crypto"))]
42use aws_lc_rs::{
43    rand::SystemRandom,
44    signature::{ECDSA_P256_SHA256_FIXED_SIGNING, EcdsaKeyPair},
45};
46
47#[cfg(all(feature = "_sender-tcp", feature = "ring-crypto"))]
48use ring::{
49    rand::SystemRandom,
50    signature::{ECDSA_P256_SHA256_FIXED_SIGNING, EcdsaKeyPair},
51};
52
53mod conf;
54
55pub(crate) mod ndarr;
56
57mod timestamp;
58
59mod buffer;
60pub use buffer::*;
61
62mod sender;
63pub use sender::*;
64
/// Default for the `max_name_len` builder setting: the maximum length, in
/// bytes, of a table or column name.
const MAX_NAME_LEN_DEFAULT: usize = 127;

/// The maximum allowed dimensions for arrays.
pub const MAX_ARRAY_DIMS: usize = 32;
/// Upper bound on an array buffer's size, in bytes (512 MiB).
pub const MAX_ARRAY_BUFFER_SIZE: usize = 512 * 1024 * 1024; // 512MiB
/// Upper bound on the length of an array dimension: `(1 << 28) - 1`.
// NOTE: written with explicit parentheses because in Rust `1 << 28 - 1`
// parses as `1 << 27`.
pub const MAX_ARRAY_DIM_LEN: usize = 0x0FFF_FFFF; // (1 << 28) - 1

// Type tags for binary-encoded values.
// NOTE(review): presumably emitted by the buffer module when serializing
// arrays and f64 values under protocol version 2 - confirm against `buffer`.
pub(crate) const ARRAY_BINARY_FORMAT_TYPE: u8 = 14;
pub(crate) const DOUBLE_BINARY_FORMAT_TYPE: u8 = 16;
74
/// The version of InfluxDB Line Protocol used to communicate with the server.
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum ProtocolVersion {
    /// Version 1 of Line Protocol.
    /// This is a full-text protocol and is compatible with the InfluxDB
    /// database.
    V1 = 1,

    /// Version 2 of InfluxDB Line Protocol.
    /// Uses binary format serialization for f64, and supports the array data type.
    /// This version is specific to QuestDB and is not compatible with InfluxDB.
    /// QuestDB server version 9.0.0 or later is required for `V2` support.
    V2 = 2,
}

impl Display for ProtocolVersion {
    /// Writes the short lowercase name of the version: `v1` or `v2`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            ProtocolVersion::V1 => "v1",
            ProtocolVersion::V2 => "v2",
        };
        f.write_str(label)
    }
}
98
/// Convert an I/O error into a crate error with the `SocketError` code.
///
/// The resulting message is `prefix` immediately followed by the display text
/// of `io_err`; no separator is inserted, so `prefix` should carry its own
/// trailing punctuation or whitespace.
#[cfg(feature = "_sender-tcp")]
fn map_io_to_socket_err(prefix: &str, io_err: std::io::Error) -> error::Error {
    fmt!(SocketError, "{}{}", prefix, io_err)
}
103
/// Possible sources of the root certificates used to validate the server's TLS
/// certificate.
///
/// Which variants exist depends on the enabled crate features; `PemFile` is
/// always available.
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum CertificateAuthority {
    /// Use the root certificates provided by the
    /// [`webpki-roots`](https://crates.io/crates/webpki-roots) crate.
    #[cfg(feature = "tls-webpki-certs")]
    WebpkiRoots,

    /// Use the root certificates provided by the OS.
    #[cfg(feature = "tls-native-certs")]
    OsRoots,

    /// Combine the root certificates provided by the OS and the `webpki-roots` crate.
    #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
    WebpkiAndOsRoots,

    /// Use the root certificates provided in a PEM-encoded file.
    /// This is the variant selected by [`SenderBuilder::tls_roots`].
    PemFile,
}
124
/// A port, given either as a `u16` number or as the `String` name of a
/// service registered with `/etc/services` or equivalent.
///
/// Numeric ports are stored in their decimal text form.
///
/// ```
/// use questdb::ingress::Port;
/// use std::convert::Into;
///
/// let service: Port = 9009.into();
/// ```
///
/// or
///
/// ```
/// use questdb::ingress::Port;
/// use std::convert::Into;
///
/// // Assuming the service name is registered.
/// let service: Port = "qdb_ilp".into();  // or with a String too.
/// ```
pub struct Port(String);

impl From<String> for Port {
    /// Takes ownership of an already-allocated port or service name.
    fn from(service: String) -> Self {
        Self(service)
    }
}

impl From<&str> for Port {
    /// Copies a borrowed port or service name.
    fn from(service: &str) -> Self {
        Self(String::from(service))
    }
}

impl From<u16> for Port {
    /// Stores a numeric port as its decimal string.
    fn from(number: u16) -> Self {
        Self(format!("{number}"))
    }
}
163
164fn validate_auto_flush_params(params: &HashMap<String, String>) -> Result<()> {
165    if let Some(auto_flush) = params.get("auto_flush")
166        && auto_flush.as_str() != "off"
167    {
168        return Err(error::fmt!(
169            ConfigError,
170            "Invalid auto_flush value '{auto_flush}'. This client does not \
171            support auto-flush, so the only accepted value is 'off'"
172        ));
173    }
174
175    for &param in ["auto_flush_rows", "auto_flush_bytes"].iter() {
176        if params.contains_key(param) {
177            return Err(error::fmt!(
178                ConfigError,
179                "Invalid configuration parameter {:?}. This client does not support auto-flush",
180                param
181            ));
182        }
183    }
184    Ok(())
185}
186
/// Protocol used to communicate with the QuestDB server.
///
/// The TCP variants exist only when the `_sender-tcp` feature is enabled and
/// the HTTP variants only when the `_sender-http` feature is enabled.
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum Protocol {
    #[cfg(feature = "_sender-tcp")]
    /// ILP over TCP (streaming).
    Tcp,

    #[cfg(feature = "_sender-tcp")]
    /// ILP over TCP with TLS.
    Tcps,

    #[cfg(feature = "_sender-http")]
    /// ILP over HTTP (request-response).
    /// Version 1 is compatible with the InfluxDB Line Protocol.
    Http,

    #[cfg(feature = "_sender-http")]
    /// ILP over HTTP with TLS.
    Https,
}
207
208impl Display for Protocol {
209    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
210        f.write_str(self.schema())
211    }
212}
213
impl Protocol {
    /// Port used when the `addr` config value does not specify one:
    /// `"9009"` for TCP/TCPS and `"9000"` for HTTP/HTTPS.
    fn default_port(&self) -> &str {
        match self {
            #[cfg(feature = "_sender-tcp")]
            Protocol::Tcp | Protocol::Tcps => "9009",
            #[cfg(feature = "_sender-http")]
            Protocol::Http | Protocol::Https => "9000",
        }
    }

    /// Whether this protocol runs over TLS (`Tcps` and `Https`).
    fn tls_enabled(&self) -> bool {
        match self {
            #[cfg(feature = "_sender-tcp")]
            Protocol::Tcp => false,
            #[cfg(feature = "_sender-tcp")]
            Protocol::Tcps => true,
            #[cfg(feature = "_sender-http")]
            Protocol::Http => false,
            #[cfg(feature = "_sender-http")]
            Protocol::Https => true,
        }
    }

    /// Whether this is one of the TCP-based protocols (`Tcp` or `Tcps`).
    #[cfg(feature = "_sender-tcp")]
    fn is_tcpx(&self) -> bool {
        match self {
            Protocol::Tcp | Protocol::Tcps => true,
            #[cfg(feature = "_sender-http")]
            Protocol::Http | Protocol::Https => false,
        }
    }

    /// Whether this is one of the HTTP-based protocols (`Http` or `Https`).
    #[cfg(feature = "_sender-http")]
    fn is_httpx(&self) -> bool {
        match self {
            #[cfg(feature = "_sender-tcp")]
            Protocol::Tcp | Protocol::Tcps => false,
            Protocol::Http | Protocol::Https => true,
        }
    }

    /// Lowercase name of the protocol as it appears at the start of a config
    /// string (`tcp`, `tcps`, `http` or `https`). Also used by `Display`.
    fn schema(&self) -> &str {
        match self {
            #[cfg(feature = "_sender-tcp")]
            Protocol::Tcp => "tcp",
            #[cfg(feature = "_sender-tcp")]
            Protocol::Tcps => "tcps",
            #[cfg(feature = "_sender-http")]
            Protocol::Http => "http",
            #[cfg(feature = "_sender-http")]
            Protocol::Https => "https",
        }
    }

    /// Inverse of [`Protocol::schema`]: parse a protocol from its name.
    ///
    /// The match is exact and case-sensitive. Names of protocols whose feature
    /// is compiled out fall through to the catch-all arm, so they are reported
    /// as unsupported with a `ConfigError`, same as unknown names.
    fn from_schema(schema: &str) -> Result<Self> {
        match schema {
            #[cfg(feature = "_sender-tcp")]
            "tcp" => Ok(Protocol::Tcp),
            #[cfg(feature = "_sender-tcp")]
            "tcps" => Ok(Protocol::Tcps),
            #[cfg(feature = "_sender-http")]
            "http" => Ok(Protocol::Http),
            #[cfg(feature = "_sender-http")]
            "https" => Ok(Protocol::Https),
            _ => Err(error::fmt!(ConfigError, "Unsupported protocol: {}", schema)),
        }
    }
}
282
/// Accumulates parameters for a new `Sender` instance.
///
/// You can also create the builder from a config string.
///
/// ```no_run
/// # use questdb::Result;
/// use questdb::ingress::SenderBuilder;
///
/// # fn main() -> Result<()> {
/// let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
/// # Ok(())
/// # }
/// ```
///
/// Or create it from the `QDB_CLIENT_CONF` environment variable.
///
/// ```no_run
/// # use questdb::Result;
/// use questdb::ingress::SenderBuilder;
///
/// # fn main() -> Result<()> {
/// // export QDB_CLIENT_CONF="https::addr=localhost:9000;"
/// let mut sender = SenderBuilder::from_env()?.build()?;
/// # Ok(())
/// # }
/// ```
///
/// The `SenderBuilder` can also be built programmatically.
/// The minimum required parameters are the protocol, host, and port.
///
/// ```no_run
/// # use questdb::Result;
/// use questdb::ingress::SenderBuilder;
/// use questdb::ingress::Protocol;
///
/// # fn main() -> Result<()> {
/// # #[cfg(feature = "sync-sender-http")] {
/// let mut sender = SenderBuilder::new(Protocol::Http, "localhost", 9000).build()?;
/// # }
/// # #[cfg(all(not(feature = "sync-sender-http"), feature = "sync-sender-tcp"))] {
/// let mut sender = SenderBuilder::new(Protocol::Tcp, "localhost", 9009).build()?;
/// # }
/// # Ok(())
/// # }
/// ```
// NOTE(review): each field is wrapped in `ConfigSetting`, which presumably
// records whether the value is still a default or was explicitly specified
// (see `new_default` / `new_specified` / `set_specified`) - confirm in `conf`.
#[derive(Debug, Clone)]
pub struct SenderBuilder {
    // Transport protocol chosen at construction time.
    protocol: Protocol,
    // Server host and port (or service name), as passed to `new`.
    host: ConfigSetting<String>,
    port: ConfigSetting<String>,
    // Local outbound interface; set via `bind_interface`, `None` by default.
    net_interface: ConfigSetting<Option<String>>,
    // Maximum buffer size in bytes that will be flushed; defaults to 100 MiB.
    max_buf_size: ConfigSetting<usize>,
    // Maximum table/column name length in bytes; defaults to `MAX_NAME_LEN_DEFAULT`.
    max_name_len: ConfigSetting<usize>,
    // Time allowed for TLS handshake and authentication; defaults to 15 seconds.
    auth_timeout: ConfigSetting<Duration>,
    // Credentials; which ones apply depends on the protocol (see the
    // `username`, `password` and `token` builder methods).
    username: ConfigSetting<Option<String>>,
    password: ConfigSetting<Option<String>>,
    token: ConfigSetting<Option<String>>,

    // ECDSA public key X coordinate for TCP authentication.
    #[cfg(feature = "_sender-tcp")]
    token_x: ConfigSetting<Option<String>>,

    // ECDSA public key Y coordinate for TCP authentication.
    #[cfg(feature = "_sender-tcp")]
    token_y: ConfigSetting<Option<String>>,

    // Explicitly requested protocol version; `None` when not set.
    protocol_version: ConfigSetting<Option<ProtocolVersion>>,

    // Whether to verify the server's TLS certificate; defaults to `true`.
    #[cfg(feature = "insecure-skip-verify")]
    tls_verify: ConfigSetting<bool>,

    // Source of root certificates, and the PEM file path when one was given.
    tls_ca: ConfigSetting<CertificateAuthority>,
    tls_roots: ConfigSetting<Option<PathBuf>>,

    // HTTP-specific settings; `new` fills this in only for HTTP/HTTPS.
    #[cfg(feature = "_sender-http")]
    http: Option<conf::HttpConfig>,
}
358
359impl SenderBuilder {
360    /// Create a new `SenderBuilder` instance from the configuration string.
361    ///
362    /// The format of the string is: `"http::addr=host:port;key=value;...;"`.
363    ///
364    /// Instead of `"http"`, you can also specify `"https"`, `"tcp"`, and `"tcps"`.
365    ///
366    /// We recommend HTTP for most cases because it provides more features, like
367    /// reporting errors to the client and supporting transaction control. TCP can
368    /// sometimes be faster in higher-latency networks, but misses a number of
369    /// features.
370    ///
371    /// The accepted keys match one-for-one with the methods on `SenderBuilder`.
372    /// For example, this is a valid configuration string:
373    ///
374    /// "https::addr=host:port;username=alice;password=secret;"
375    ///
376    /// and there are matching methods [SenderBuilder::username] and
377    /// [SenderBuilder::password]. The value of `addr=` is supplied directly to the
378    /// `SenderBuilder` constructor, so there's no matching method for that.
379    ///
380    /// You can also load the configuration from an environment variable. See
381    /// [`SenderBuilder::from_env`].
382    ///
383    /// Once you have a `SenderBuilder` instance, you can further customize it
384    /// before calling [`SenderBuilder::build`], but you can't change any settings
385    /// that are already set in the config string.
386    pub fn from_conf<T: AsRef<str>>(conf: T) -> Result<Self> {
387        let conf = conf.as_ref();
388        let conf = questdb_confstr::parse_conf_str(conf)
389            .map_err(|e| error::fmt!(ConfigError, "Config parse error: {}", e))?;
390        let service = conf.service();
391        let params = conf.params();
392
393        let protocol = Protocol::from_schema(service)?;
394
395        let Some(addr) = params.get("addr") else {
396            return Err(error::fmt!(
397                ConfigError,
398                "Missing \"addr\" parameter in config string"
399            ));
400        };
401        let (host, port) = match addr.split_once(':') {
402            Some((h, p)) => (h, p),
403            None => (addr.as_str(), protocol.default_port()),
404        };
405        let mut builder = SenderBuilder::new(protocol, host, port);
406
407        validate_auto_flush_params(params)?;
408
409        for (key, val) in params.iter().map(|(k, v)| (k.as_str(), v.as_str())) {
410            builder = match key {
411                "username" => builder.username(val)?,
412                "password" => builder.password(val)?,
413                "token" => builder.token(val)?,
414                "token_x" => builder.token_x(val)?,
415                "token_y" => builder.token_y(val)?,
416                "bind_interface" => builder.bind_interface(val)?,
417                "protocol_version" => match val {
418                    "1" => builder.protocol_version(ProtocolVersion::V1)?,
419                    "2" => builder.protocol_version(ProtocolVersion::V2)?,
420                    "auto" => builder,
421                    invalid => {
422                        return Err(error::fmt!(
423                            ConfigError,
424                            "invalid \"protocol_version\" [value={invalid}, allowed-values=[auto, 1, 2]]]\"]"
425                        ));
426                    }
427                },
428                "max_name_len" => builder.max_name_len(parse_conf_value(key, val)?)?,
429
430                "init_buf_size" => {
431                    return Err(error::fmt!(
432                        ConfigError,
433                        "\"init_buf_size\" is not supported in config string"
434                    ));
435                }
436
437                "max_buf_size" => builder.max_buf_size(parse_conf_value(key, val)?)?,
438
439                "auth_timeout" => {
440                    builder.auth_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
441                }
442
443                "tls_verify" => {
444                    let verify = match val {
445                        "on" => true,
446                        "unsafe_off" => false,
447                        _ => {
448                            return Err(fmt!(
449                                ConfigError,
450                                r##"Config parameter "tls_verify" must be either "on" or "unsafe_off".'"##,
451                            ));
452                        }
453                    };
454
455                    #[cfg(not(feature = "insecure-skip-verify"))]
456                    {
457                        if !verify {
458                            return Err(fmt!(
459                                ConfigError,
460                                r##"The "insecure-skip-verify" feature is not enabled, so "tls_verify=unsafe_off" is not supported"##,
461                            ));
462                        }
463                        builder
464                    }
465
466                    #[cfg(feature = "insecure-skip-verify")]
467                    builder.tls_verify(verify)?
468                }
469
470                "tls_ca" => {
471                    let ca = match val {
472                        #[cfg(feature = "tls-webpki-certs")]
473                        "webpki_roots" => CertificateAuthority::WebpkiRoots,
474
475                        #[cfg(not(feature = "tls-webpki-certs"))]
476                        "webpki_roots" => {
477                            return Err(error::fmt!(
478                                ConfigError,
479                                "Config parameter \"tls_ca=webpki_roots\" requires the \"tls-webpki-certs\" feature"
480                            ));
481                        }
482
483                        #[cfg(feature = "tls-native-certs")]
484                        "os_roots" => CertificateAuthority::OsRoots,
485
486                        #[cfg(not(feature = "tls-native-certs"))]
487                        "os_roots" => {
488                            return Err(error::fmt!(
489                                ConfigError,
490                                "Config parameter \"tls_ca=os_roots\" requires the \"tls-native-certs\" feature"
491                            ));
492                        }
493
494                        #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
495                        "webpki_and_os_roots" => CertificateAuthority::WebpkiAndOsRoots,
496
497                        #[cfg(not(all(
498                            feature = "tls-webpki-certs",
499                            feature = "tls-native-certs"
500                        )))]
501                        "webpki_and_os_roots" => {
502                            return Err(error::fmt!(
503                                ConfigError,
504                                "Config parameter \"tls_ca=webpki_and_os_roots\" requires both the \"tls-webpki-certs\" and \"tls-native-certs\" features"
505                            ));
506                        }
507
508                        _ => {
509                            return Err(error::fmt!(
510                                ConfigError,
511                                "Invalid value {val:?} for \"tls_ca\""
512                            ));
513                        }
514                    };
515                    builder.tls_ca(ca)?
516                }
517
518                "tls_roots" => {
519                    let path = PathBuf::from_str(val).map_err(|e| {
520                        error::fmt!(
521                            ConfigError,
522                            "Invalid path {:?} for \"tls_roots\": {}",
523                            val,
524                            e
525                        )
526                    })?;
527                    builder.tls_roots(path)?
528                }
529
530                "tls_roots_password" => {
531                    return Err(error::fmt!(
532                        ConfigError,
533                        "\"tls_roots_password\" is not supported."
534                    ));
535                }
536
537                #[cfg(feature = "sync-sender-http")]
538                "request_min_throughput" => {
539                    builder.request_min_throughput(parse_conf_value(key, val)?)?
540                }
541
542                #[cfg(feature = "sync-sender-http")]
543                "request_timeout" => {
544                    builder.request_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
545                }
546
547                #[cfg(feature = "sync-sender-http")]
548                "retry_timeout" => {
549                    builder.retry_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
550                }
551
552                // Ignore other parameters.
553                // We don't want to fail on unknown keys as this would require releasing different
554                // library implementations in lock step as soon as a new parameter is added to any of them,
555                // even if it's not used.
556                _ => builder,
557            };
558        }
559
560        Ok(builder)
561    }
562
563    /// Create a new `SenderBuilder` instance from the configuration from the
564    /// configuration stored in the `QDB_CLIENT_CONF` environment variable.
565    ///
566    /// The format of the string is the same as for [`SenderBuilder::from_conf`].
567    pub fn from_env() -> Result<Self> {
568        let conf = std::env::var("QDB_CLIENT_CONF").map_err(|_| {
569            error::fmt!(ConfigError, "Environment variable QDB_CLIENT_CONF not set.")
570        })?;
571        Self::from_conf(conf)
572    }
573
574    /// Create a new `SenderBuilder` instance with the provided QuestDB
575    /// server and port, using ILP over the specified protocol.
576    ///
577    /// ```no_run
578    /// # use questdb::Result;
579    /// use questdb::ingress::{Protocol, SenderBuilder};
580    ///
581    /// # fn main() -> Result<()> {
582    /// # #[cfg(feature = "sync-sender-tcp")] {
583    /// let mut sender = SenderBuilder::new(
584    ///     Protocol::Tcp, "localhost", 9009).build()?;
585    /// # }
586    /// # #[cfg(all(not(feature = "sync-sender-tcp"), feature = "sync-sender-http"))] {
587    /// let mut sender = SenderBuilder::new(
588    ///     Protocol::Http, "localhost", 9000).build()?;
589    /// # }
590    /// # Ok(())
591    /// # }
592    /// ```
593    pub fn new<H: Into<String>, P: Into<Port>>(protocol: Protocol, host: H, port: P) -> Self {
594        let host = host.into();
595        let port: Port = port.into();
596        let port = port.0;
597
598        #[cfg(feature = "tls-webpki-certs")]
599        let tls_ca = CertificateAuthority::WebpkiRoots;
600
601        #[cfg(all(not(feature = "tls-webpki-certs"), feature = "tls-native-certs"))]
602        let tls_ca = CertificateAuthority::OsRoots;
603
604        #[cfg(not(any(feature = "tls-webpki-certs", feature = "tls-native-certs")))]
605        let tls_ca = CertificateAuthority::PemFile;
606
607        Self {
608            protocol,
609            host: ConfigSetting::new_specified(host),
610            port: ConfigSetting::new_specified(port),
611            net_interface: ConfigSetting::new_default(None),
612            max_buf_size: ConfigSetting::new_default(100 * 1024 * 1024),
613            max_name_len: ConfigSetting::new_default(MAX_NAME_LEN_DEFAULT),
614            auth_timeout: ConfigSetting::new_default(Duration::from_secs(15)),
615            username: ConfigSetting::new_default(None),
616            password: ConfigSetting::new_default(None),
617            token: ConfigSetting::new_default(None),
618
619            #[cfg(feature = "_sender-tcp")]
620            token_x: ConfigSetting::new_default(None),
621
622            #[cfg(feature = "_sender-tcp")]
623            token_y: ConfigSetting::new_default(None),
624
625            protocol_version: ConfigSetting::new_default(None),
626
627            #[cfg(feature = "insecure-skip-verify")]
628            tls_verify: ConfigSetting::new_default(true),
629
630            tls_ca: ConfigSetting::new_default(tls_ca),
631            tls_roots: ConfigSetting::new_default(None),
632
633            #[cfg(feature = "sync-sender-http")]
634            http: if protocol.is_httpx() {
635                Some(conf::HttpConfig::default())
636            } else {
637                None
638            },
639        }
640    }
641
642    /// Select local outbound interface.
643    ///
644    /// This may be relevant if your machine has multiple network interfaces.
645    ///
646    /// The default is `"0.0.0.0"`.
647    pub fn bind_interface<I: Into<String>>(self, addr: I) -> Result<Self> {
648        #[cfg(feature = "_sender-tcp")]
649        {
650            let mut builder = self;
651            builder.ensure_is_tcpx("bind_interface")?;
652            builder
653                .net_interface
654                .set_specified("bind_interface", Some(validate_value(addr.into())?))?;
655            Ok(builder)
656        }
657
658        #[cfg(not(feature = "_sender-tcp"))]
659        {
660            let _ = addr;
661            Err(error::fmt!(
662                ConfigError,
663                "The \"bind_interface\" setting can only be used with the TCP protocol."
664            ))
665        }
666    }
667
668    /// Set the username for authentication.
669    ///
670    /// For TCP, this is the `kid` part of the ECDSA key set.
671    /// The other fields are [`token`](SenderBuilder::token), [`token_x`](SenderBuilder::token_x),
672    /// and [`token_y`](SenderBuilder::token_y).
673    ///
674    /// For HTTP, this is a part of basic authentication.
675    /// See also: [`password`](SenderBuilder::password).
676    pub fn username(mut self, username: &str) -> Result<Self> {
677        self.username
678            .set_specified("username", Some(validate_value(username.to_string())?))?;
679        Ok(self)
680    }
681
682    /// Set the password for basic HTTP authentication.
683    /// See also: [`username`](SenderBuilder::username).
684    pub fn password(mut self, password: &str) -> Result<Self> {
685        self.password
686            .set_specified("password", Some(validate_value(password.to_string())?))?;
687        Ok(self)
688    }
689
690    /// Set the Token (Bearer) Authentication parameter for HTTP,
691    /// or the ECDSA private key for TCP authentication.
692    pub fn token(mut self, token: &str) -> Result<Self> {
693        self.token
694            .set_specified("token", Some(validate_value(token.to_string())?))?;
695        Ok(self)
696    }
697
698    /// Set the ECDSA public key X for TCP authentication.
699    pub fn token_x(self, token_x: &str) -> Result<Self> {
700        #[cfg(feature = "_sender-tcp")]
701        {
702            let mut builder = self;
703            builder
704                .token_x
705                .set_specified("token_x", Some(validate_value(token_x.to_string())?))?;
706            Ok(builder)
707        }
708
709        #[cfg(not(feature = "_sender-tcp"))]
710        {
711            let _ = token_x;
712            Err(error::fmt!(
713                ConfigError,
714                "cannot specify \"token_x\": ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."
715            ))
716        }
717    }
718
719    /// Set the ECDSA public key Y for TCP authentication.
720    pub fn token_y(self, token_y: &str) -> Result<Self> {
721        #[cfg(feature = "_sender-tcp")]
722        {
723            let mut builder = self;
724            builder
725                .token_y
726                .set_specified("token_y", Some(validate_value(token_y.to_string())?))?;
727            Ok(builder)
728        }
729
730        #[cfg(not(feature = "_sender-tcp"))]
731        {
732            let _ = token_y;
733            Err(error::fmt!(
734                ConfigError,
735                "cannot specify \"token_y\": ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."
736            ))
737        }
738    }
739
740    /// Sets the ingestion protocol version.
741    /// - HTTP transport automatically negotiates the protocol version by default(unset, **Strong Recommended**).
742    ///   You can explicitly configure the protocol version to avoid the slight latency cost at connection time.
743    /// - TCP transport does not negotiate the protocol version and uses [`ProtocolVersion::V1`] by
744    ///   default. You must explicitly set [`ProtocolVersion::V2`] in order to ingest
745    ///   arrays.
746    ///
747    /// **Note**: QuestDB server version 9.0.0 or later is required for [`ProtocolVersion::V2`] support.
748    pub fn protocol_version(mut self, protocol_version: ProtocolVersion) -> Result<Self> {
749        self.protocol_version
750            .set_specified("protocol_version", Some(protocol_version))?;
751        Ok(self)
752    }
753
754    /// Configure how long to wait for messages from the QuestDB server during
755    /// the TLS handshake and authentication process. This only applies to TCP.
756    /// The default is 15 seconds.
757    pub fn auth_timeout(mut self, value: Duration) -> Result<Self> {
758        self.auth_timeout.set_specified("auth_timeout", value)?;
759        Ok(self)
760    }
761
762    /// Ensure that TLS is enabled for the protocol.
763    pub fn ensure_tls_enabled(&self, property: &str) -> Result<()> {
764        if !self.protocol.tls_enabled() {
765            return Err(error::fmt!(
766                ConfigError,
767                "Cannot set {property:?}: TLS is not supported for protocol {}",
768                self.protocol
769            ));
770        }
771        Ok(())
772    }
773
774    /// Set to `false` to disable TLS certificate verification.
775    /// This should only be used for debugging purposes as it reduces security.
776    ///
777    /// For testing, consider specifying a path to a `.pem` file instead via
778    /// the [`tls_roots`](SenderBuilder::tls_roots) method.
779    #[cfg(feature = "insecure-skip-verify")]
780    pub fn tls_verify(mut self, verify: bool) -> Result<Self> {
781        self.ensure_tls_enabled("tls_verify")?;
782        self.tls_verify.set_specified("tls_verify", verify)?;
783        Ok(self)
784    }
785
786    /// Specify where to find the root certificate used to validate the
787    /// server's TLS certificate.
788    pub fn tls_ca(mut self, ca: CertificateAuthority) -> Result<Self> {
789        self.ensure_tls_enabled("tls_ca")?;
790        self.tls_ca.set_specified("tls_ca", ca)?;
791        Ok(self)
792    }
793
794    /// Set the path to a custom root certificate `.pem` file.
795    /// This is used to validate the server's certificate during the TLS handshake.
796    ///
797    /// See notes on how to test with [self-signed
798    /// certificates](https://github.com/questdb/c-questdb-client/tree/main/tls_certs).
799    pub fn tls_roots<P: Into<PathBuf>>(self, path: P) -> Result<Self> {
800        let mut builder = self.tls_ca(CertificateAuthority::PemFile)?;
801        let path = path.into();
802        // Attempt to read the file here to catch any issues early.
803        let _file = std::fs::File::open(&path).map_err(|io_err| {
804            error::fmt!(
805                ConfigError,
806                "Could not open root certificate file from path {:?}: {}",
807                path,
808                io_err
809            )
810        })?;
811        builder.tls_roots.set_specified("tls_roots", Some(path))?;
812        Ok(builder)
813    }
814
815    /// The maximum buffer size in bytes that the client will flush to the server.
816    /// The default is 100 MiB.
817    pub fn max_buf_size(mut self, value: usize) -> Result<Self> {
818        let min = 1024;
819        if value < min {
820            return Err(error::fmt!(
821                ConfigError,
822                "max_buf_size\" must be at least {min} bytes."
823            ));
824        }
825        self.max_buf_size.set_specified("max_buf_size", value)?;
826        Ok(self)
827    }
828
829    /// The maximum length of a table or column name in bytes.
830    /// Matches the `cairo.max.file.name.length` setting in the server.
831    /// The default is 127 bytes.
832    /// If running over HTTP and protocol version 2 is auto-negotiated, this
833    /// value is picked up from the server.
834    pub fn max_name_len(mut self, value: usize) -> Result<Self> {
835        if value < 16 {
836            return Err(error::fmt!(
837                ConfigError,
838                "max_name_len must be at least 16 bytes."
839            ));
840        }
841        self.max_name_len.set_specified("max_name_len", value)?;
842        Ok(self)
843    }
844
845    #[cfg(feature = "sync-sender-http")]
846    /// Set the cumulative duration spent in retries.
847    /// The value is in milliseconds, and the default is 10 seconds.
848    pub fn retry_timeout(mut self, value: Duration) -> Result<Self> {
849        if let Some(http) = &mut self.http {
850            http.retry_timeout.set_specified("retry_timeout", value)?;
851        } else {
852            return Err(error::fmt!(
853                ConfigError,
854                "retry_timeout is supported only in ILP over HTTP."
855            ));
856        }
857        Ok(self)
858    }
859
860    #[cfg(feature = "sync-sender-http")]
861    /// Set the minimum acceptable throughput while sending a buffer to the server.
862    /// The sender will divide the payload size by this number to determine for how
863    /// long to keep sending the payload before timing out.
864    /// The value is in bytes per second, and the default is 100 KiB/s.
865    /// The timeout calculated from minimum throughput is adedd to the value of
866    /// [`request_timeout`](SenderBuilder::request_timeout) to get the total timeout
867    /// value.
868    /// A value of 0 disables this feature, so it's similar to setting "infinite"
869    /// minimum throughput. The total timeout will then be equal to `request_timeout`.
870    pub fn request_min_throughput(mut self, value: u64) -> Result<Self> {
871        if let Some(http) = &mut self.http {
872            http.request_min_throughput
873                .set_specified("request_min_throughput", value)?;
874        } else {
875            return Err(error::fmt!(
876                ConfigError,
877                "\"request_min_throughput\" is supported only in ILP over HTTP."
878            ));
879        }
880        Ok(self)
881    }
882
883    #[cfg(feature = "sync-sender-http")]
884    /// Additional time to wait on top of that calculated from the minimum throughput.
885    /// This accounts for the fixed latency of the HTTP request-response roundtrip.
886    /// The default is 10 seconds.
887    /// See also: [`request_min_throughput`](SenderBuilder::request_min_throughput).
888    pub fn request_timeout(mut self, value: Duration) -> Result<Self> {
889        if let Some(http) = &mut self.http {
890            if value.is_zero() {
891                return Err(error::fmt!(
892                    ConfigError,
893                    "\"request_timeout\" must be greater than 0."
894                ));
895            }
896            http.request_timeout
897                .set_specified("request_timeout", value)?;
898        } else {
899            return Err(error::fmt!(
900                ConfigError,
901                "\"request_timeout\" is supported only in ILP over HTTP."
902            ));
903        }
904        Ok(self)
905    }
906
907    #[cfg(feature = "sync-sender-http")]
908    /// Internal API, do not use.
909    /// This is exposed exclusively for the Python client.
910    /// We (QuestDB) use this to help us debug which client is being used if we encounter issues.
911    #[doc(hidden)]
912    pub fn user_agent(mut self, value: &str) -> Result<Self> {
913        let value = validate_value(value)?;
914        if let Some(http) = &mut self.http {
915            http.user_agent = value.to_string();
916        }
917        Ok(self)
918    }
919
920    fn build_auth(&self) -> Result<Option<conf::AuthParams>> {
921        match (
922            self.protocol,
923            self.username.deref(),
924            self.password.deref(),
925            self.token.deref(),
926            #[cfg(feature = "_sender-tcp")]
927            self.token_x.deref(),
928            #[cfg(not(feature = "_sender-tcp"))]
929            None::<String>,
930            #[cfg(feature = "_sender-tcp")]
931            self.token_y.deref(),
932            #[cfg(not(feature = "_sender-tcp"))]
933            None::<String>,
934        ) {
935            (_, None, None, None, None, None) => Ok(None),
936
937            #[cfg(feature = "_sender-tcp")]
938            (protocol, Some(username), None, Some(token), Some(token_x), Some(token_y))
939                if protocol.is_tcpx() =>
940            {
941                Ok(Some(conf::AuthParams::Ecdsa(conf::EcdsaAuthParams {
942                    key_id: username.to_string(),
943                    priv_key: token.to_string(),
944                    pub_key_x: token_x.to_string(),
945                    pub_key_y: token_y.to_string(),
946                })))
947            }
948
949            #[cfg(feature = "_sender-tcp")]
950            (protocol, Some(_username), Some(_password), None, None, None)
951                if protocol.is_tcpx() =>
952            {
953                Err(error::fmt!(
954                    ConfigError,
955                    r##"The "basic_auth" setting can only be used with the ILP/HTTP protocol."##,
956                ))
957            }
958
959            #[cfg(feature = "_sender-tcp")]
960            (protocol, None, None, Some(_token), None, None) if protocol.is_tcpx() => {
961                Err(error::fmt!(
962                    ConfigError,
963                    "Token authentication only be used with the ILP/HTTP protocol."
964                ))
965            }
966
967            #[cfg(feature = "_sender-tcp")]
968            (protocol, _username, None, _token, _token_x, _token_y) if protocol.is_tcpx() => {
969                Err(error::fmt!(
970                    ConfigError,
971                    r##"Incomplete ECDSA authentication parameters. Specify either all or none of: "username", "token", "token_x", "token_y"."##,
972                ))
973            }
974            #[cfg(feature = "_sender-http")]
975            (protocol, Some(username), Some(password), None, None, None) if protocol.is_httpx() => {
976                Ok(Some(conf::AuthParams::Basic(conf::BasicAuthParams {
977                    username: username.to_string(),
978                    password: password.to_string(),
979                })))
980            }
981            #[cfg(feature = "_sender-http")]
982            (protocol, Some(_username), None, None, None, None) if protocol.is_httpx() => {
983                Err(error::fmt!(
984                    ConfigError,
985                    r##"Basic authentication parameter "username" is present, but "password" is missing."##,
986                ))
987            }
988            #[cfg(feature = "_sender-http")]
989            (protocol, None, Some(_password), None, None, None) if protocol.is_httpx() => {
990                Err(error::fmt!(
991                    ConfigError,
992                    r##"Basic authentication parameter "password" is present, but "username" is missing."##,
993                ))
994            }
995            #[cfg(feature = "sync-sender-http")]
996            (protocol, None, None, Some(token), None, None) if protocol.is_httpx() => {
997                Ok(Some(conf::AuthParams::Token(conf::TokenAuthParams {
998                    token: token.to_string(),
999                })))
1000            }
1001            #[cfg(feature = "sync-sender-http")]
1002            (protocol, Some(_username), None, Some(_token), Some(_token_x), Some(_token_y))
1003                if protocol.is_httpx() =>
1004            {
1005                Err(error::fmt!(
1006                    ConfigError,
1007                    "ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."
1008                ))
1009            }
1010            #[cfg(feature = "_sender-http")]
1011            (protocol, _username, _password, _token, None, None) if protocol.is_httpx() => {
1012                Err(error::fmt!(
1013                    ConfigError,
1014                    r##"Inconsistent HTTP authentication parameters. Specify either "username" and "password", or just "token"."##,
1015                ))
1016            }
1017            _ => Err(error::fmt!(
1018                ConfigError,
1019                r##"Incomplete authentication parameters. Check "username", "password", "token", "token_x" and "token_y" parameters are set correctly."##,
1020            )),
1021        }
1022    }
1023
1024    #[cfg(feature = "_sync-sender")]
1025    /// Build the sender.
1026    ///
1027    /// In the case of TCP, this synchronously establishes the TCP connection, and
1028    /// returns once the connection is fully established. If the connection
1029    /// requires authentication or TLS, these will also be completed before
1030    /// returning.
1031    pub fn build(&self) -> Result<Sender> {
1032        let mut descr = format!("Sender[host={:?},port={:?},", self.host, self.port);
1033
1034        if self.protocol.tls_enabled() {
1035            write!(descr, "tls=enabled,").unwrap();
1036        } else {
1037            write!(descr, "tls=disabled,").unwrap();
1038        }
1039
1040        #[cfg(feature = "insecure-skip-verify")]
1041        let tls_verify = *self.tls_verify;
1042
1043        let tls_settings = tls::TlsSettings::build(
1044            self.protocol.tls_enabled(),
1045            #[cfg(feature = "insecure-skip-verify")]
1046            tls_verify,
1047            *self.tls_ca,
1048            self.tls_roots.deref().as_deref(),
1049        )?;
1050
1051        let auth = self.build_auth()?;
1052
1053        let handler = match self.protocol {
1054            #[cfg(feature = "sync-sender-tcp")]
1055            Protocol::Tcp | Protocol::Tcps => connect_tcp(
1056                self.host.as_str(),
1057                self.port.as_str(),
1058                self.net_interface.deref().as_deref(),
1059                *self.auth_timeout,
1060                tls_settings,
1061                &auth,
1062            )?,
1063            #[cfg(feature = "sync-sender-http")]
1064            Protocol::Http | Protocol::Https => {
1065                use ureq::unversioned::transport::Connector;
1066                use ureq::unversioned::transport::TcpConnector;
1067                if self.net_interface.is_some() {
1068                    // See: https://github.com/algesten/ureq/issues/692
1069                    return Err(error::fmt!(
1070                        InvalidApiCall,
1071                        "net_interface is not supported for ILP over HTTP."
1072                    ));
1073                }
1074
1075                let http_config = self.http.as_ref().unwrap();
1076                let user_agent = http_config.user_agent.as_str();
1077                let connector = TcpConnector::default();
1078
1079                let agent_builder = ureq::Agent::config_builder()
1080                    .user_agent(user_agent)
1081                    .no_delay(true);
1082
1083                let tls_config = match tls_settings {
1084                    Some(tls_settings) => Some(tls::configure_tls(tls_settings)?),
1085                    None => None,
1086                };
1087
1088                let connector = connector.chain(TlsConnector::new(tls_config));
1089
1090                let auth = match auth {
1091                    Some(conf::AuthParams::Basic(ref auth)) => Some(auth.to_header_string()),
1092                    Some(conf::AuthParams::Token(ref auth)) => Some(auth.to_header_string()?),
1093
1094                    #[cfg(feature = "sync-sender-tcp")]
1095                    Some(conf::AuthParams::Ecdsa(_)) => {
1096                        return Err(fmt!(
1097                            AuthError,
1098                            "ECDSA authentication is not supported for ILP over HTTP. \
1099                            Please use basic or token authentication instead."
1100                        ));
1101                    }
1102                    None => None,
1103                };
1104                let agent_builder = agent_builder
1105                    .timeout_connect(Some(*http_config.request_timeout.deref()))
1106                    .http_status_as_error(false);
1107                let agent = ureq::Agent::with_parts(
1108                    agent_builder.build(),
1109                    connector,
1110                    ureq::unversioned::resolver::DefaultResolver::default(),
1111                );
1112                let proto = self.protocol.schema();
1113                let url = format!(
1114                    "{}://{}:{}/write",
1115                    proto,
1116                    self.host.deref(),
1117                    self.port.deref()
1118                );
1119                SyncProtocolHandler::SyncHttp(SyncHttpHandlerState {
1120                    agent,
1121                    url,
1122                    auth,
1123                    config: self.http.as_ref().unwrap().clone(),
1124                })
1125            }
1126        };
1127
1128        #[allow(unused_mut)]
1129        let mut max_name_len = *self.max_name_len;
1130
1131        let protocol_version = match self.protocol_version.deref() {
1132            Some(v) => *v,
1133            None => match self.protocol {
1134                #[cfg(feature = "sync-sender-tcp")]
1135                Protocol::Tcp | Protocol::Tcps => ProtocolVersion::V1,
1136                #[cfg(feature = "sync-sender-http")]
1137                Protocol::Http | Protocol::Https => {
1138                    #[allow(irrefutable_let_patterns)]
1139                    if let SyncProtocolHandler::SyncHttp(http_state) = &handler {
1140                        let settings_url = &format!(
1141                            "{}://{}:{}/settings",
1142                            self.protocol.schema(),
1143                            self.host.deref(),
1144                            self.port.deref()
1145                        );
1146                        let (protocol_versions, server_max_name_len) =
1147                            read_server_settings(http_state, settings_url, max_name_len)?;
1148                        max_name_len = server_max_name_len;
1149                        if protocol_versions.contains(&ProtocolVersion::V2) {
1150                            ProtocolVersion::V2
1151                        } else if protocol_versions.contains(&ProtocolVersion::V1) {
1152                            ProtocolVersion::V1
1153                        } else {
1154                            return Err(fmt!(
1155                                ProtocolVersionError,
1156                                "Server does not support current client"
1157                            ));
1158                        }
1159                    } else {
1160                        unreachable!("HTTP handler should be used for HTTP protocol");
1161                    }
1162                }
1163            },
1164        };
1165
1166        if auth.is_some() {
1167            descr.push_str("auth=on]");
1168        } else {
1169            descr.push_str("auth=off]");
1170        }
1171
1172        let sender = Sender::new(
1173            descr,
1174            handler,
1175            *self.max_buf_size,
1176            protocol_version,
1177            max_name_len,
1178        );
1179
1180        Ok(sender)
1181    }
1182
1183    #[cfg(feature = "_sender-tcp")]
1184    fn ensure_is_tcpx(&mut self, param_name: &str) -> Result<()> {
1185        if self.protocol.is_tcpx() {
1186            Ok(())
1187        } else {
1188            Err(fmt!(
1189                ConfigError,
1190                "The {param_name:?} setting can only be used with the TCP protocol."
1191            ))
1192        }
1193    }
1194}
1195
1196/// When parsing from config, we exclude certain characters.
1197/// Here we repeat the same validation logic for consistency.
1198fn validate_value<T: AsRef<str>>(value: T) -> Result<T> {
1199    let str_ref = value.as_ref();
1200    for (p, c) in str_ref.chars().enumerate() {
1201        if matches!(c, '\u{0}'..='\u{1f}' | '\u{7f}'..='\u{9f}') {
1202            return Err(error::fmt!(
1203                ConfigError,
1204                "Invalid character {c:?} at position {p}"
1205            ));
1206        }
1207    }
1208    Ok(value)
1209}
1210
1211fn parse_conf_value<T>(param_name: &str, str_value: &str) -> Result<T>
1212where
1213    T: FromStr,
1214    T::Err: std::fmt::Debug,
1215{
1216    str_value.parse().map_err(|e| {
1217        fmt!(
1218            ConfigError,
1219            "Could not parse {param_name:?} to number: {e:?}"
1220        )
1221    })
1222}
1223
/// Decode `buf` with the `Base64UrlUnpadded` decoder into raw bytes.
///
/// `descr` names the piece of key material being decoded and is only used to
/// build the `AuthError` message when decoding fails.
#[cfg(feature = "_sender-tcp")]
fn b64_decode(descr: &'static str, buf: &str) -> Result<Vec<u8>> {
    use base64ct::{Base64UrlUnpadded, Encoding};

    match Base64UrlUnpadded::decode_vec(buf) {
        Ok(decoded) => Ok(decoded),
        Err(b64_err) => Err(fmt!(
            AuthError,
            "Misconfigured ILP authentication keys. Could not decode {}: {}. \
            Hint: Check the keys for a possible typo.",
            descr,
            b64_err
        )),
    }
}
1237
/// Assemble the public key bytes from its Base64-encoded `x` and `y`
/// coordinates.
///
/// The result is 65 bytes long: the `0x04` marker byte followed by each
/// coordinate left-padded with zeros to 32 bytes. A coordinate that decodes
/// to more than 32 bytes is rejected with an `AuthError`.
#[cfg(feature = "_sender-tcp")]
fn parse_public_key(pub_key_x: &str, pub_key_y: &str) -> Result<Vec<u8>> {
    // Width of one coordinate inside the encoded point.
    const COORD_LEN: usize = 32;

    let x = b64_decode("public key x", pub_key_x)?;
    let y = b64_decode("public key y", pub_key_y)?;

    if x.len() > COORD_LEN {
        return Err(fmt!(
            AuthError,
            "Misconfigured ILP authentication keys. Public key x is too long. \
            Hint: Check the keys for a possible typo."
        ));
    }
    if y.len() > COORD_LEN {
        return Err(fmt!(
            AuthError,
            "Misconfigured ILP authentication keys. Public key y is too long. \
            Hint: Check the keys for a possible typo."
        ));
    }

    // SEC 1 Uncompressed Octet-String-to-Elliptic-Curve-Point Encoding:
    // the 0x04 magic byte identifies the point as uncompressed.
    let mut encoded = vec![4u8];
    for coord in [&x, &y] {
        // Left-pad with zeros so the coordinate fills its fixed-width slot.
        encoded.resize(encoded.len() + (COORD_LEN - coord.len()), 0u8);
        encoded.extend_from_slice(coord);
    }
    Ok(encoded)
}
1268
/// Build the ECDSA key pair described by `auth`.
///
/// The private key is Base64-decoded and the public key is assembled from
/// its `x`/`y` coordinates; both are then handed to the crypto backend
/// selected at compile time (`aws-lc-crypto` or `ring-crypto`). A rejection
/// by the backend is reported as an `AuthError`.
#[cfg(feature = "_sender-tcp")]
fn parse_key_pair(auth: &conf::EcdsaAuthParams) -> Result<EcdsaKeyPair> {
    let priv_key_bytes = b64_decode("private authentication key", &auth.priv_key)?;
    let pub_key_bytes = parse_public_key(&auth.pub_key_x, &auth.pub_key_y)?;

    #[cfg(feature = "aws-lc-crypto")]
    let key_pair = EcdsaKeyPair::from_private_key_and_public_key(
        &ECDSA_P256_SHA256_FIXED_SIGNING,
        priv_key_bytes.as_slice(),
        pub_key_bytes.as_slice(),
    );

    // The ring backend additionally takes a random number generator.
    #[cfg(feature = "ring-crypto")]
    let key_pair = {
        let rng = SystemRandom::new();
        EcdsaKeyPair::from_private_key_and_public_key(
            &ECDSA_P256_SHA256_FIXED_SIGNING,
            priv_key_bytes.as_slice(),
            pub_key_bytes.as_slice(),
            &rng,
        )
    };

    key_pair.map_err(|key_rejected| {
        fmt!(
            AuthError,
            "Misconfigured ILP authentication keys: {}. Hint: Check the keys for a possible typo.",
            key_rejected
        )
    })
}
1300
/// Wrapper around a byte slice whose `Debug` output looks like a byte-string
/// literal (`b"..."`).
struct DebugBytes<'a>(pub &'a [u8]);

impl Debug for DebugBytes<'_> {
    /// Writes the bytes between `b"` and `"`: printable ASCII is emitted
    /// verbatim, `\n`, `\r`, `\t`, `\\`, `\"` and `\0` use their short escape
    /// sequences, and every other byte is written as a `\xNN` hex escape.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("b\"")?;

        self.0.iter().try_for_each(|&byte| match byte {
            // Common escape sequences; backslash and quote are handled here,
            // before the printable range below.
            b'\n' => f.write_str("\\n"),
            b'\r' => f.write_str("\\r"),
            b'\t' => f.write_str("\\t"),
            b'\\' => f.write_str("\\\\"),
            b'"' => f.write_str("\\\""),
            b'\0' => f.write_str("\\0"),
            // Remaining printable ASCII characters.
            b' '..=b'~' => write!(f, "{}", char::from(byte)),
            // Non-printable bytes as hex escapes.
            _ => write!(f, "\\x{byte:02x}"),
        })?;

        f.write_str("\"")
    }
}
1328
1329#[cfg(test)]
1330mod tests;