questdb/ingress/
mod.rs

1/*******************************************************************************
2 *     ___                  _   ____  ____
3 *    / _ \ _   _  ___  ___| |_|  _ \| __ )
4 *   | | | | | | |/ _ \/ __| __| | | |  _ \
5 *   | |_| | |_| |  __/\__ \ |_| |_| | |_) |
6 *    \__\_\\__,_|\___||___/\__|____/|____/
7 *
8 *  Copyright (c) 2014-2019 Appsicle
9 *  Copyright (c) 2019-2025 QuestDB
10 *
11 *  Licensed under the Apache License, Version 2.0 (the "License");
12 *  you may not use this file except in compliance with the License.
13 *  You may obtain a copy of the License at
14 *
15 *  http://www.apache.org/licenses/LICENSE-2.0
16 *
17 *  Unless required by applicable law or agreed to in writing, software
18 *  distributed under the License is distributed on an "AS IS" BASIS,
19 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 *  See the License for the specific language governing permissions and
21 *  limitations under the License.
22 *
23 ******************************************************************************/
24
25#![doc = include_str!("mod.md")]
26
27pub use self::ndarr::{ArrayElement, NdArrayView};
28pub use self::timestamp::*;
29use crate::error::{self, Result, fmt};
30use crate::ingress::conf::ConfigSetting;
31use core::time::Duration;
32use std::collections::HashMap;
33use std::fmt::{Debug, Display, Formatter, Write};
34
35use std::ops::Deref;
36use std::path::PathBuf;
37use std::str::FromStr;
38
39mod tls;
40
41#[cfg(all(feature = "_sender-tcp", feature = "aws-lc-crypto"))]
42use aws_lc_rs::{
43    rand::SystemRandom,
44    signature::{ECDSA_P256_SHA256_FIXED_SIGNING, EcdsaKeyPair},
45};
46
47#[cfg(all(feature = "_sender-tcp", feature = "ring-crypto"))]
48use ring::{
49    rand::SystemRandom,
50    signature::{ECDSA_P256_SHA256_FIXED_SIGNING, EcdsaKeyPair},
51};
52
53mod conf;
54
55pub(crate) mod ndarr;
56
57mod timestamp;
58
59mod buffer;
60pub use buffer::*;
61
62mod sender;
63pub use sender::*;
64
65mod decimal;
66pub use decimal::DecimalView;
67
/// Default value used for the sender's `max_name_len` setting.
const MAX_NAME_LEN_DEFAULT: usize = 127;

/// The maximum allowed dimensions for arrays.
pub const MAX_ARRAY_DIMS: usize = 32;
/// Upper bound of 512 MiB, expressed in bytes.
pub const MAX_ARRAY_BUFFER_SIZE: usize = 512 * 1024 * 1024;
/// `(1 << 28) - 1`, i.e. `0x0FFF_FFFF`. Note the parentheses: in Rust `-`
/// binds tighter than `<<`, so `1 << 28 - 1` would mean `1 << 27`.
pub const MAX_ARRAY_DIM_LEN: usize = (1 << 28) - 1;

// Numeric tags for the binary format (presumably entity type markers on the
// wire; confirm against the buffer serialization code).
pub(crate) const ARRAY_BINARY_FORMAT_TYPE: u8 = 14;
pub(crate) const DOUBLE_BINARY_FORMAT_TYPE: u8 = 16;
#[allow(dead_code)]
pub const DECIMAL_BINARY_FORMAT_TYPE: u8 = 23;
79
/// The version of InfluxDB Line Protocol used to communicate with the server.
///
/// Each variant's discriminant equals its protocol version number.
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd)]
pub enum ProtocolVersion {
    /// Version 1 of Line Protocol.
    ///
    /// A full-text protocol, compatible with the InfluxDB database.
    V1 = 1,

    /// Version 2 of InfluxDB Line Protocol.
    ///
    /// Serializes `f64` in a binary format and supports the array data type.
    /// Specific to QuestDB (not compatible with InfluxDB); requires QuestDB
    /// server version 9.0.0 or later.
    V2 = 2,

    /// Version 3 of InfluxDB Line Protocol.
    ///
    /// Supports the decimal data type in text and binary formats.
    /// Specific to QuestDB (not compatible with InfluxDB); requires QuestDB
    /// server version 9.2.0 or later.
    V3 = 3,
}

/// Protocol versions supported by this client, most preferred (highest) first.
const SUPPORTED_PROTOCOL_VERSIONS: [ProtocolVersion; 3] = [
    ProtocolVersion::V3,
    ProtocolVersion::V2,
    ProtocolVersion::V1,
];

impl Display for ProtocolVersion {
    /// Writes the short lowercase label of the version: `v1`, `v2` or `v3`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::V1 => "v1",
            Self::V2 => "v2",
            Self::V3 => "v3",
        };
        f.write_str(label)
    }
}
117
/// Builds a `SocketError` whose message is `prefix` immediately followed by
/// the display text of `io_err`.
#[cfg(feature = "_sender-tcp")]
fn map_io_to_socket_err(prefix: &str, io_err: std::io::Error) -> error::Error {
    fmt!(SocketError, "{prefix}{io_err}")
}
122
/// Where the root certificates used to validate the server's TLS certificate
/// come from.
///
/// Which variants exist depends on the enabled crate features.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum CertificateAuthority {
    /// Root certificates bundled by the
    /// [`webpki-roots`](https://crates.io/crates/webpki-roots) crate.
    #[cfg(feature = "tls-webpki-certs")]
    WebpkiRoots,

    /// Root certificates provided by the operating system.
    #[cfg(feature = "tls-native-certs")]
    OsRoots,

    /// The union of the OS-provided roots and the `webpki-roots` crate's roots.
    #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
    WebpkiAndOsRoots,

    /// Root certificates loaded from a PEM-encoded file.
    PemFile,
}
143
/// A port, given either as a `u16` number or as a `String` service name as
/// registered with `/etc/services` or equivalent.
///
/// Internally the port is always kept in its string form.
///
/// ```
/// use questdb::ingress::Port;
/// use std::convert::Into;
///
/// let service: Port = 9009.into();
/// ```
///
/// or
///
/// ```
/// use questdb::ingress::Port;
/// use std::convert::Into;
///
/// // Assuming the service name is registered.
/// let service: Port = "qdb_ilp".into();  // or with a String too.
/// ```
pub struct Port(String);

impl From<String> for Port {
    /// Takes ownership of the string as-is (no validation is performed here).
    fn from(s: String) -> Self {
        Self(s)
    }
}

impl From<&str> for Port {
    /// Copies the borrowed text into an owned port value.
    fn from(s: &str) -> Self {
        Self(String::from(s))
    }
}

impl From<u16> for Port {
    /// Stores the decimal representation of the numeric port.
    fn from(p: u16) -> Self {
        Self(p.to_string())
    }
}
182
183fn validate_auto_flush_params(params: &HashMap<String, String>) -> Result<()> {
184    if let Some(auto_flush) = params.get("auto_flush")
185        && auto_flush.as_str() != "off"
186    {
187        return Err(error::fmt!(
188            ConfigError,
189            "Invalid auto_flush value '{auto_flush}'. This client does not \
190            support auto-flush, so the only accepted value is 'off'"
191        ));
192    }
193
194    for &param in ["auto_flush_rows", "auto_flush_bytes"].iter() {
195        if params.contains_key(param) {
196            return Err(error::fmt!(
197                ConfigError,
198                "Invalid configuration parameter {:?}. This client does not support auto-flush",
199                param
200            ));
201        }
202    }
203    Ok(())
204}
205
/// Transport protocol used to talk to the QuestDB server.
///
/// The available variants depend on which sender features are enabled.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Protocol {
    /// ILP over TCP (streaming).
    #[cfg(feature = "_sender-tcp")]
    Tcp,

    /// TCP + TLS
    #[cfg(feature = "_sender-tcp")]
    Tcps,

    /// ILP over HTTP (request-response)
    /// Version 1 is compatible with the InfluxDB Line Protocol.
    #[cfg(feature = "_sender-http")]
    Http,

    /// HTTP + TLS
    #[cfg(feature = "_sender-http")]
    Https,
}
226
227impl Display for Protocol {
228    fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
229        f.write_str(self.schema())
230    }
231}
232
233impl Protocol {
234    fn default_port(&self) -> &str {
235        match self {
236            #[cfg(feature = "_sender-tcp")]
237            Protocol::Tcp | Protocol::Tcps => "9009",
238            #[cfg(feature = "_sender-http")]
239            Protocol::Http | Protocol::Https => "9000",
240        }
241    }
242
243    fn tls_enabled(&self) -> bool {
244        match self {
245            #[cfg(feature = "_sender-tcp")]
246            Protocol::Tcp => false,
247            #[cfg(feature = "_sender-tcp")]
248            Protocol::Tcps => true,
249            #[cfg(feature = "_sender-http")]
250            Protocol::Http => false,
251            #[cfg(feature = "_sender-http")]
252            Protocol::Https => true,
253        }
254    }
255
256    #[cfg(feature = "_sender-tcp")]
257    fn is_tcpx(&self) -> bool {
258        match self {
259            Protocol::Tcp | Protocol::Tcps => true,
260            #[cfg(feature = "_sender-http")]
261            Protocol::Http | Protocol::Https => false,
262        }
263    }
264
265    #[cfg(feature = "_sender-http")]
266    fn is_httpx(&self) -> bool {
267        match self {
268            #[cfg(feature = "_sender-tcp")]
269            Protocol::Tcp | Protocol::Tcps => false,
270            Protocol::Http | Protocol::Https => true,
271        }
272    }
273
274    fn schema(&self) -> &str {
275        match self {
276            #[cfg(feature = "_sender-tcp")]
277            Protocol::Tcp => "tcp",
278            #[cfg(feature = "_sender-tcp")]
279            Protocol::Tcps => "tcps",
280            #[cfg(feature = "_sender-http")]
281            Protocol::Http => "http",
282            #[cfg(feature = "_sender-http")]
283            Protocol::Https => "https",
284        }
285    }
286
287    fn from_schema(schema: &str) -> Result<Self> {
288        match schema {
289            #[cfg(feature = "_sender-tcp")]
290            "tcp" => Ok(Protocol::Tcp),
291            #[cfg(feature = "_sender-tcp")]
292            "tcps" => Ok(Protocol::Tcps),
293            #[cfg(feature = "_sender-http")]
294            "http" => Ok(Protocol::Http),
295            #[cfg(feature = "_sender-http")]
296            "https" => Ok(Protocol::Https),
297            _ => Err(error::fmt!(ConfigError, "Unsupported protocol: {}", schema)),
298        }
299    }
300}
301
302/// Accumulates parameters for a new `Sender` instance.
303///
304/// You can also create the builder from a config string.
305///
306/// ```no_run
307/// # use questdb::Result;
308/// use questdb::ingress::SenderBuilder;
309///
310/// # fn main() -> Result<()> {
311/// let mut sender = SenderBuilder::from_conf("https::addr=localhost:9000;")?.build()?;
312/// # Ok(())
313/// # }
314/// ```
315///
316/// Or create it from the `QDB_CLIENT_CONF` environment variable.
317///
318/// ```no_run
319/// # use questdb::Result;
320/// use questdb::ingress::SenderBuilder;
321///
322/// # fn main() -> Result<()> {
323/// // export QDB_CLIENT_CONF="https::addr=localhost:9000;"
324/// let mut sender = SenderBuilder::from_env()?.build()?;
325/// # Ok(())
326/// # }
327/// ```
328///
329/// The `SenderBuilder` can also be built programmatically.
330/// The minimum required parameters are the protocol, host, and port.
331///
332/// ```no_run
333/// # use questdb::Result;
334/// use questdb::ingress::SenderBuilder;
335/// use questdb::ingress::Protocol;
336///
337/// # fn main() -> Result<()> {
338/// # #[cfg(feature = "sync-sender-http")] {
339/// let mut sender = SenderBuilder::new(Protocol::Http, "localhost", 9000).build()?;
340/// # }
341/// # #[cfg(all(not(feature = "sync-sender-http"), feature = "sync-sender-tcp"))] {
342/// let mut sender = SenderBuilder::new(Protocol::Tcp, "localhost", 9009).build()?;
343/// # }
344/// # Ok(())
345/// # }
346/// ```
347#[derive(Debug, Clone)]
348pub struct SenderBuilder {
349    protocol: Protocol,
350    host: ConfigSetting<String>,
351    port: ConfigSetting<String>,
352    net_interface: ConfigSetting<Option<String>>,
353    max_buf_size: ConfigSetting<usize>,
354    max_name_len: ConfigSetting<usize>,
355    auth_timeout: ConfigSetting<Duration>,
356    username: ConfigSetting<Option<String>>,
357    password: ConfigSetting<Option<String>>,
358    token: ConfigSetting<Option<String>>,
359
360    #[cfg(feature = "_sender-tcp")]
361    token_x: ConfigSetting<Option<String>>,
362
363    #[cfg(feature = "_sender-tcp")]
364    token_y: ConfigSetting<Option<String>>,
365
366    protocol_version: ConfigSetting<Option<ProtocolVersion>>,
367
368    #[cfg(feature = "insecure-skip-verify")]
369    tls_verify: ConfigSetting<bool>,
370
371    tls_ca: ConfigSetting<CertificateAuthority>,
372    tls_roots: ConfigSetting<Option<PathBuf>>,
373
374    #[cfg(feature = "_sender-http")]
375    http: Option<conf::HttpConfig>,
376}
377
378impl SenderBuilder {
379    /// Create a new `SenderBuilder` instance from the configuration string.
380    ///
381    /// The format of the string is: `"http::addr=host:port;key=value;...;"`.
382    ///
383    /// Instead of `"http"`, you can also specify `"https"`, `"tcp"`, and `"tcps"`.
384    ///
385    /// We recommend HTTP for most cases because it provides more features, like
386    /// reporting errors to the client and supporting transaction control. TCP can
387    /// sometimes be faster in higher-latency networks, but misses a number of
388    /// features.
389    ///
390    /// The accepted keys match one-for-one with the methods on `SenderBuilder`.
391    /// For example, this is a valid configuration string:
392    ///
393    /// "https::addr=host:port;username=alice;password=secret;"
394    ///
395    /// and there are matching methods [SenderBuilder::username] and
396    /// [SenderBuilder::password]. The value of `addr=` is supplied directly to the
397    /// `SenderBuilder` constructor, so there's no matching method for that.
398    ///
399    /// You can also load the configuration from an environment variable. See
400    /// [`SenderBuilder::from_env`].
401    ///
402    /// Once you have a `SenderBuilder` instance, you can further customize it
403    /// before calling [`SenderBuilder::build`], but you can't change any settings
404    /// that are already set in the config string.
405    pub fn from_conf<T: AsRef<str>>(conf: T) -> Result<Self> {
406        let conf = conf.as_ref();
407        let conf = questdb_confstr::parse_conf_str(conf)
408            .map_err(|e| error::fmt!(ConfigError, "Config parse error: {}", e))?;
409        let service = conf.service();
410        let params = conf.params();
411
412        let protocol = Protocol::from_schema(service)?;
413
414        let Some(addr) = params.get("addr") else {
415            return Err(error::fmt!(
416                ConfigError,
417                "Missing \"addr\" parameter in config string"
418            ));
419        };
420        let (host, port) = match addr.split_once(':') {
421            Some((h, p)) => (h, p),
422            None => (addr.as_str(), protocol.default_port()),
423        };
424        let mut builder = SenderBuilder::new(protocol, host, port);
425
426        validate_auto_flush_params(params)?;
427
428        for (key, val) in params.iter().map(|(k, v)| (k.as_str(), v.as_str())) {
429            builder = match key {
430                "username" => builder.username(val)?,
431                "password" => builder.password(val)?,
432                "token" => builder.token(val)?,
433                "token_x" => builder.token_x(val)?,
434                "token_y" => builder.token_y(val)?,
435                "bind_interface" => builder.bind_interface(val)?,
436                "protocol_version" => match val {
437                    "1" => builder.protocol_version(ProtocolVersion::V1)?,
438                    "2" => builder.protocol_version(ProtocolVersion::V2)?,
439                    "3" => builder.protocol_version(ProtocolVersion::V3)?,
440                    "auto" => builder,
441                    invalid => {
442                        return Err(error::fmt!(
443                            ConfigError,
444                            "invalid \"protocol_version\" [value={invalid}, allowed-values=[auto, 1, 2, 3]]"
445                        ));
446                    }
447                },
448                "max_name_len" => builder.max_name_len(parse_conf_value(key, val)?)?,
449
450                "init_buf_size" => {
451                    return Err(error::fmt!(
452                        ConfigError,
453                        "\"init_buf_size\" is not supported in config string"
454                    ));
455                }
456
457                "max_buf_size" => builder.max_buf_size(parse_conf_value(key, val)?)?,
458
459                "auth_timeout" => {
460                    builder.auth_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
461                }
462
463                "tls_verify" => {
464                    let verify = match val {
465                        "on" => true,
466                        "unsafe_off" => false,
467                        _ => {
468                            return Err(fmt!(
469                                ConfigError,
470                                r##"Config parameter "tls_verify" must be either "on" or "unsafe_off".'"##,
471                            ));
472                        }
473                    };
474
475                    #[cfg(not(feature = "insecure-skip-verify"))]
476                    {
477                        if !verify {
478                            return Err(fmt!(
479                                ConfigError,
480                                r##"The "insecure-skip-verify" feature is not enabled, so "tls_verify=unsafe_off" is not supported"##,
481                            ));
482                        }
483                        builder
484                    }
485
486                    #[cfg(feature = "insecure-skip-verify")]
487                    builder.tls_verify(verify)?
488                }
489
490                "tls_ca" => {
491                    let ca = match val {
492                        #[cfg(feature = "tls-webpki-certs")]
493                        "webpki_roots" => CertificateAuthority::WebpkiRoots,
494
495                        #[cfg(not(feature = "tls-webpki-certs"))]
496                        "webpki_roots" => {
497                            return Err(error::fmt!(
498                                ConfigError,
499                                "Config parameter \"tls_ca=webpki_roots\" requires the \"tls-webpki-certs\" feature"
500                            ));
501                        }
502
503                        #[cfg(feature = "tls-native-certs")]
504                        "os_roots" => CertificateAuthority::OsRoots,
505
506                        #[cfg(not(feature = "tls-native-certs"))]
507                        "os_roots" => {
508                            return Err(error::fmt!(
509                                ConfigError,
510                                "Config parameter \"tls_ca=os_roots\" requires the \"tls-native-certs\" feature"
511                            ));
512                        }
513
514                        #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
515                        "webpki_and_os_roots" => CertificateAuthority::WebpkiAndOsRoots,
516
517                        #[cfg(not(all(
518                            feature = "tls-webpki-certs",
519                            feature = "tls-native-certs"
520                        )))]
521                        "webpki_and_os_roots" => {
522                            return Err(error::fmt!(
523                                ConfigError,
524                                "Config parameter \"tls_ca=webpki_and_os_roots\" requires both the \"tls-webpki-certs\" and \"tls-native-certs\" features"
525                            ));
526                        }
527
528                        _ => {
529                            return Err(error::fmt!(
530                                ConfigError,
531                                "Invalid value {val:?} for \"tls_ca\""
532                            ));
533                        }
534                    };
535                    builder.tls_ca(ca)?
536                }
537
538                "tls_roots" => {
539                    let path = PathBuf::from_str(val).map_err(|e| {
540                        error::fmt!(
541                            ConfigError,
542                            "Invalid path {:?} for \"tls_roots\": {}",
543                            val,
544                            e
545                        )
546                    })?;
547                    builder.tls_roots(path)?
548                }
549
550                "tls_roots_password" => {
551                    return Err(error::fmt!(
552                        ConfigError,
553                        "\"tls_roots_password\" is not supported."
554                    ));
555                }
556
557                #[cfg(feature = "sync-sender-http")]
558                "request_min_throughput" => {
559                    builder.request_min_throughput(parse_conf_value(key, val)?)?
560                }
561
562                #[cfg(feature = "sync-sender-http")]
563                "request_timeout" => {
564                    builder.request_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
565                }
566
567                #[cfg(feature = "sync-sender-http")]
568                "retry_timeout" => {
569                    builder.retry_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
570                }
571
572                // Ignore other parameters.
573                // We don't want to fail on unknown keys as this would require releasing different
574                // library implementations in lock step as soon as a new parameter is added to any of them,
575                // even if it's not used.
576                _ => builder,
577            };
578        }
579
580        Ok(builder)
581    }
582
583    /// Create a new `SenderBuilder` instance from the configuration from the
584    /// configuration stored in the `QDB_CLIENT_CONF` environment variable.
585    ///
586    /// The format of the string is the same as for [`SenderBuilder::from_conf`].
587    pub fn from_env() -> Result<Self> {
588        let conf = std::env::var("QDB_CLIENT_CONF").map_err(|_| {
589            error::fmt!(ConfigError, "Environment variable QDB_CLIENT_CONF not set.")
590        })?;
591        Self::from_conf(conf)
592    }
593
594    /// Create a new `SenderBuilder` instance with the provided QuestDB
595    /// server and port, using ILP over the specified protocol.
596    ///
597    /// ```no_run
598    /// # use questdb::Result;
599    /// use questdb::ingress::{Protocol, SenderBuilder};
600    ///
601    /// # fn main() -> Result<()> {
602    /// # #[cfg(feature = "sync-sender-tcp")] {
603    /// let mut sender = SenderBuilder::new(
604    ///     Protocol::Tcp, "localhost", 9009).build()?;
605    /// # }
606    /// # #[cfg(all(not(feature = "sync-sender-tcp"), feature = "sync-sender-http"))] {
607    /// let mut sender = SenderBuilder::new(
608    ///     Protocol::Http, "localhost", 9000).build()?;
609    /// # }
610    /// # Ok(())
611    /// # }
612    /// ```
613    pub fn new<H: Into<String>, P: Into<Port>>(protocol: Protocol, host: H, port: P) -> Self {
614        let host = host.into();
615        let port: Port = port.into();
616        let port = port.0;
617
618        #[cfg(feature = "tls-webpki-certs")]
619        let tls_ca = CertificateAuthority::WebpkiRoots;
620
621        #[cfg(all(not(feature = "tls-webpki-certs"), feature = "tls-native-certs"))]
622        let tls_ca = CertificateAuthority::OsRoots;
623
624        #[cfg(not(any(feature = "tls-webpki-certs", feature = "tls-native-certs")))]
625        let tls_ca = CertificateAuthority::PemFile;
626
627        Self {
628            protocol,
629            host: ConfigSetting::new_specified(host),
630            port: ConfigSetting::new_specified(port),
631            net_interface: ConfigSetting::new_default(None),
632            max_buf_size: ConfigSetting::new_default(100 * 1024 * 1024),
633            max_name_len: ConfigSetting::new_default(MAX_NAME_LEN_DEFAULT),
634            auth_timeout: ConfigSetting::new_default(Duration::from_secs(15)),
635            username: ConfigSetting::new_default(None),
636            password: ConfigSetting::new_default(None),
637            token: ConfigSetting::new_default(None),
638
639            #[cfg(feature = "_sender-tcp")]
640            token_x: ConfigSetting::new_default(None),
641
642            #[cfg(feature = "_sender-tcp")]
643            token_y: ConfigSetting::new_default(None),
644
645            protocol_version: ConfigSetting::new_default(None),
646
647            #[cfg(feature = "insecure-skip-verify")]
648            tls_verify: ConfigSetting::new_default(true),
649
650            tls_ca: ConfigSetting::new_default(tls_ca),
651            tls_roots: ConfigSetting::new_default(None),
652
653            #[cfg(feature = "sync-sender-http")]
654            http: if protocol.is_httpx() {
655                Some(conf::HttpConfig::default())
656            } else {
657                None
658            },
659        }
660    }
661
662    /// Select local outbound interface.
663    ///
664    /// This may be relevant if your machine has multiple network interfaces.
665    ///
666    /// The default is `"0.0.0.0"`.
667    pub fn bind_interface<I: Into<String>>(self, addr: I) -> Result<Self> {
668        #[cfg(feature = "_sender-tcp")]
669        {
670            let mut builder = self;
671            builder.ensure_is_tcpx("bind_interface")?;
672            builder
673                .net_interface
674                .set_specified("bind_interface", Some(validate_value(addr.into())?))?;
675            Ok(builder)
676        }
677
678        #[cfg(not(feature = "_sender-tcp"))]
679        {
680            let _ = addr;
681            Err(error::fmt!(
682                ConfigError,
683                "The \"bind_interface\" setting can only be used with the TCP protocol."
684            ))
685        }
686    }
687
688    /// Set the username for authentication.
689    ///
690    /// For TCP, this is the `kid` part of the ECDSA key set.
691    /// The other fields are [`token`](SenderBuilder::token), [`token_x`](SenderBuilder::token_x),
692    /// and [`token_y`](SenderBuilder::token_y).
693    ///
694    /// For HTTP, this is a part of basic authentication.
695    /// See also: [`password`](SenderBuilder::password).
696    pub fn username(mut self, username: &str) -> Result<Self> {
697        self.username
698            .set_specified("username", Some(validate_value(username.to_string())?))?;
699        Ok(self)
700    }
701
702    /// Set the password for basic HTTP authentication.
703    /// See also: [`username`](SenderBuilder::username).
704    pub fn password(mut self, password: &str) -> Result<Self> {
705        self.password
706            .set_specified("password", Some(validate_value(password.to_string())?))?;
707        Ok(self)
708    }
709
710    /// Set the Token (Bearer) Authentication parameter for HTTP,
711    /// or the ECDSA private key for TCP authentication.
712    pub fn token(mut self, token: &str) -> Result<Self> {
713        self.token
714            .set_specified("token", Some(validate_value(token.to_string())?))?;
715        Ok(self)
716    }
717
718    /// Set the ECDSA public key X for TCP authentication.
719    pub fn token_x(self, token_x: &str) -> Result<Self> {
720        #[cfg(feature = "_sender-tcp")]
721        {
722            let mut builder = self;
723            builder
724                .token_x
725                .set_specified("token_x", Some(validate_value(token_x.to_string())?))?;
726            Ok(builder)
727        }
728
729        #[cfg(not(feature = "_sender-tcp"))]
730        {
731            let _ = token_x;
732            Err(error::fmt!(
733                ConfigError,
734                "cannot specify \"token_x\": ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."
735            ))
736        }
737    }
738
739    /// Set the ECDSA public key Y for TCP authentication.
740    pub fn token_y(self, token_y: &str) -> Result<Self> {
741        #[cfg(feature = "_sender-tcp")]
742        {
743            let mut builder = self;
744            builder
745                .token_y
746                .set_specified("token_y", Some(validate_value(token_y.to_string())?))?;
747            Ok(builder)
748        }
749
750        #[cfg(not(feature = "_sender-tcp"))]
751        {
752            let _ = token_y;
753            Err(error::fmt!(
754                ConfigError,
755                "cannot specify \"token_y\": ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."
756            ))
757        }
758    }
759
760    /// Sets the ingestion protocol version.
761    /// - HTTP transport automatically negotiates the protocol version by default(unset, **Strong Recommended**).
762    ///   You can explicitly configure the protocol version to avoid the slight latency cost at connection time.
763    /// - TCP transport does not negotiate the protocol version and uses [`ProtocolVersion::V1`] by
764    ///   default. You must explicitly set [`ProtocolVersion::V2`] in order to ingest
765    ///   arrays.
766    ///
767    /// **Note**: QuestDB server version 9.0.0 or later is required for [`ProtocolVersion::V2`] support.
768    pub fn protocol_version(mut self, protocol_version: ProtocolVersion) -> Result<Self> {
769        self.protocol_version
770            .set_specified("protocol_version", Some(protocol_version))?;
771        Ok(self)
772    }
773
774    /// Configure how long to wait for messages from the QuestDB server during
775    /// the TLS handshake and authentication process. This only applies to TCP.
776    /// The default is 15 seconds.
777    pub fn auth_timeout(mut self, value: Duration) -> Result<Self> {
778        self.auth_timeout.set_specified("auth_timeout", value)?;
779        Ok(self)
780    }
781
782    /// Ensure that TLS is enabled for the protocol.
783    pub fn ensure_tls_enabled(&self, property: &str) -> Result<()> {
784        if !self.protocol.tls_enabled() {
785            return Err(error::fmt!(
786                ConfigError,
787                "Cannot set {property:?}: TLS is not supported for protocol {}",
788                self.protocol
789            ));
790        }
791        Ok(())
792    }
793
794    /// Set to `false` to disable TLS certificate verification.
795    /// This should only be used for debugging purposes as it reduces security.
796    ///
797    /// For testing, consider specifying a path to a `.pem` file instead via
798    /// the [`tls_roots`](SenderBuilder::tls_roots) method.
799    #[cfg(feature = "insecure-skip-verify")]
800    pub fn tls_verify(mut self, verify: bool) -> Result<Self> {
801        self.ensure_tls_enabled("tls_verify")?;
802        self.tls_verify.set_specified("tls_verify", verify)?;
803        Ok(self)
804    }
805
806    /// Specify where to find the root certificate used to validate the
807    /// server's TLS certificate.
808    pub fn tls_ca(mut self, ca: CertificateAuthority) -> Result<Self> {
809        self.ensure_tls_enabled("tls_ca")?;
810        self.tls_ca.set_specified("tls_ca", ca)?;
811        Ok(self)
812    }
813
814    /// Set the path to a custom root certificate `.pem` file.
815    /// This is used to validate the server's certificate during the TLS handshake.
816    ///
817    /// See notes on how to test with [self-signed
818    /// certificates](https://github.com/questdb/c-questdb-client/tree/main/tls_certs).
819    pub fn tls_roots<P: Into<PathBuf>>(self, path: P) -> Result<Self> {
820        let mut builder = self.tls_ca(CertificateAuthority::PemFile)?;
821        let path = path.into();
822        // Attempt to read the file here to catch any issues early.
823        let _file = std::fs::File::open(&path).map_err(|io_err| {
824            error::fmt!(
825                ConfigError,
826                "Could not open root certificate file from path {:?}: {}",
827                path,
828                io_err
829            )
830        })?;
831        builder.tls_roots.set_specified("tls_roots", Some(path))?;
832        Ok(builder)
833    }
834
835    /// The maximum buffer size in bytes that the client will flush to the server.
836    /// The default is 100 MiB.
837    pub fn max_buf_size(mut self, value: usize) -> Result<Self> {
838        let min = 1024;
839        if value < min {
840            return Err(error::fmt!(
841                ConfigError,
842                "max_buf_size\" must be at least {min} bytes."
843            ));
844        }
845        self.max_buf_size.set_specified("max_buf_size", value)?;
846        Ok(self)
847    }
848
849    /// The maximum length of a table or column name in bytes.
850    /// Matches the `cairo.max.file.name.length` setting in the server.
851    /// The default is 127 bytes.
852    /// If running over HTTP and protocol version 2 is auto-negotiated, this
853    /// value is picked up from the server.
854    pub fn max_name_len(mut self, value: usize) -> Result<Self> {
855        if value < 16 {
856            return Err(error::fmt!(
857                ConfigError,
858                "max_name_len must be at least 16 bytes."
859            ));
860        }
861        self.max_name_len.set_specified("max_name_len", value)?;
862        Ok(self)
863    }
864
865    #[cfg(feature = "sync-sender-http")]
866    /// Set the cumulative duration spent in retries.
867    /// The value is in milliseconds, and the default is 10 seconds.
868    pub fn retry_timeout(mut self, value: Duration) -> Result<Self> {
869        if let Some(http) = &mut self.http {
870            http.retry_timeout.set_specified("retry_timeout", value)?;
871        } else {
872            return Err(error::fmt!(
873                ConfigError,
874                "retry_timeout is supported only in ILP over HTTP."
875            ));
876        }
877        Ok(self)
878    }
879
880    #[cfg(feature = "sync-sender-http")]
881    /// Set the minimum acceptable throughput while sending a buffer to the server.
882    /// The sender will divide the payload size by this number to determine for how
883    /// long to keep sending the payload before timing out.
884    /// The value is in bytes per second, and the default is 100 KiB/s.
885    /// The timeout calculated from minimum throughput is adedd to the value of
886    /// [`request_timeout`](SenderBuilder::request_timeout) to get the total timeout
887    /// value.
888    /// A value of 0 disables this feature, so it's similar to setting "infinite"
889    /// minimum throughput. The total timeout will then be equal to `request_timeout`.
890    pub fn request_min_throughput(mut self, value: u64) -> Result<Self> {
891        if let Some(http) = &mut self.http {
892            http.request_min_throughput
893                .set_specified("request_min_throughput", value)?;
894        } else {
895            return Err(error::fmt!(
896                ConfigError,
897                "\"request_min_throughput\" is supported only in ILP over HTTP."
898            ));
899        }
900        Ok(self)
901    }
902
903    #[cfg(feature = "sync-sender-http")]
904    /// Additional time to wait on top of that calculated from the minimum throughput.
905    /// This accounts for the fixed latency of the HTTP request-response roundtrip.
906    /// The default is 10 seconds.
907    /// See also: [`request_min_throughput`](SenderBuilder::request_min_throughput).
908    pub fn request_timeout(mut self, value: Duration) -> Result<Self> {
909        if let Some(http) = &mut self.http {
910            if value.is_zero() {
911                return Err(error::fmt!(
912                    ConfigError,
913                    "\"request_timeout\" must be greater than 0."
914                ));
915            }
916            http.request_timeout
917                .set_specified("request_timeout", value)?;
918        } else {
919            return Err(error::fmt!(
920                ConfigError,
921                "\"request_timeout\" is supported only in ILP over HTTP."
922            ));
923        }
924        Ok(self)
925    }
926
927    #[cfg(feature = "sync-sender-http")]
928    /// Internal API, do not use.
929    /// This is exposed exclusively for the Python client.
930    /// We (QuestDB) use this to help us debug which client is being used if we encounter issues.
931    #[doc(hidden)]
932    pub fn user_agent(mut self, value: &str) -> Result<Self> {
933        let value = validate_value(value)?;
934        if let Some(http) = &mut self.http {
935            http.user_agent = value.to_string();
936        }
937        Ok(self)
938    }
939
940    fn build_auth(&self) -> Result<Option<conf::AuthParams>> {
941        match (
942            self.protocol,
943            self.username.deref(),
944            self.password.deref(),
945            self.token.deref(),
946            #[cfg(feature = "_sender-tcp")]
947            self.token_x.deref(),
948            #[cfg(not(feature = "_sender-tcp"))]
949            None::<String>,
950            #[cfg(feature = "_sender-tcp")]
951            self.token_y.deref(),
952            #[cfg(not(feature = "_sender-tcp"))]
953            None::<String>,
954        ) {
955            (_, None, None, None, None, None) => Ok(None),
956
957            #[cfg(feature = "_sender-tcp")]
958            (protocol, Some(username), None, Some(token), Some(token_x), Some(token_y))
959                if protocol.is_tcpx() =>
960            {
961                Ok(Some(conf::AuthParams::Ecdsa(conf::EcdsaAuthParams {
962                    key_id: username.to_string(),
963                    priv_key: token.to_string(),
964                    pub_key_x: token_x.to_string(),
965                    pub_key_y: token_y.to_string(),
966                })))
967            }
968
969            #[cfg(feature = "_sender-tcp")]
970            (protocol, Some(_username), Some(_password), None, None, None)
971                if protocol.is_tcpx() =>
972            {
973                Err(error::fmt!(
974                    ConfigError,
975                    r##"The "basic_auth" setting can only be used with the ILP/HTTP protocol."##,
976                ))
977            }
978
979            #[cfg(feature = "_sender-tcp")]
980            (protocol, None, None, Some(_token), None, None) if protocol.is_tcpx() => {
981                Err(error::fmt!(
982                    ConfigError,
983                    "Token authentication only be used with the ILP/HTTP protocol."
984                ))
985            }
986
987            #[cfg(feature = "_sender-tcp")]
988            (protocol, _username, None, _token, _token_x, _token_y) if protocol.is_tcpx() => {
989                Err(error::fmt!(
990                    ConfigError,
991                    r##"Incomplete ECDSA authentication parameters. Specify either all or none of: "username", "token", "token_x", "token_y"."##,
992                ))
993            }
994            #[cfg(feature = "_sender-http")]
995            (protocol, Some(username), Some(password), None, None, None) if protocol.is_httpx() => {
996                Ok(Some(conf::AuthParams::Basic(conf::BasicAuthParams {
997                    username: username.to_string(),
998                    password: password.to_string(),
999                })))
1000            }
1001            #[cfg(feature = "_sender-http")]
1002            (protocol, Some(_username), None, None, None, None) if protocol.is_httpx() => {
1003                Err(error::fmt!(
1004                    ConfigError,
1005                    r##"Basic authentication parameter "username" is present, but "password" is missing."##,
1006                ))
1007            }
1008            #[cfg(feature = "_sender-http")]
1009            (protocol, None, Some(_password), None, None, None) if protocol.is_httpx() => {
1010                Err(error::fmt!(
1011                    ConfigError,
1012                    r##"Basic authentication parameter "password" is present, but "username" is missing."##,
1013                ))
1014            }
1015            #[cfg(feature = "sync-sender-http")]
1016            (protocol, None, None, Some(token), None, None) if protocol.is_httpx() => {
1017                Ok(Some(conf::AuthParams::Token(conf::TokenAuthParams {
1018                    token: token.to_string(),
1019                })))
1020            }
1021            #[cfg(feature = "sync-sender-http")]
1022            (protocol, Some(_username), None, Some(_token), Some(_token_x), Some(_token_y))
1023                if protocol.is_httpx() =>
1024            {
1025                Err(error::fmt!(
1026                    ConfigError,
1027                    "ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."
1028                ))
1029            }
1030            #[cfg(feature = "_sender-http")]
1031            (protocol, _username, _password, _token, None, None) if protocol.is_httpx() => {
1032                Err(error::fmt!(
1033                    ConfigError,
1034                    r##"Inconsistent HTTP authentication parameters. Specify either "username" and "password", or just "token"."##,
1035                ))
1036            }
1037            _ => Err(error::fmt!(
1038                ConfigError,
1039                r##"Incomplete authentication parameters. Check "username", "password", "token", "token_x" and "token_y" parameters are set correctly."##,
1040            )),
1041        }
1042    }
1043
1044    #[cfg(feature = "_sync-sender")]
1045    /// Build the sender.
1046    ///
1047    /// In the case of TCP, this synchronously establishes the TCP connection, and
1048    /// returns once the connection is fully established. If the connection
1049    /// requires authentication or TLS, these will also be completed before
1050    /// returning.
1051    pub fn build(&self) -> Result<Sender> {
1052        let mut descr = format!("Sender[host={:?},port={:?},", self.host, self.port);
1053
1054        if self.protocol.tls_enabled() {
1055            write!(descr, "tls=enabled,").unwrap();
1056        } else {
1057            write!(descr, "tls=disabled,").unwrap();
1058        }
1059
1060        #[cfg(feature = "insecure-skip-verify")]
1061        let tls_verify = *self.tls_verify;
1062
1063        let tls_settings = tls::TlsSettings::build(
1064            self.protocol.tls_enabled(),
1065            #[cfg(feature = "insecure-skip-verify")]
1066            tls_verify,
1067            *self.tls_ca,
1068            self.tls_roots.deref().as_deref(),
1069        )?;
1070
1071        let auth = self.build_auth()?;
1072
1073        let handler = match self.protocol {
1074            #[cfg(feature = "sync-sender-tcp")]
1075            Protocol::Tcp | Protocol::Tcps => connect_tcp(
1076                self.host.as_str(),
1077                self.port.as_str(),
1078                self.net_interface.deref().as_deref(),
1079                *self.auth_timeout,
1080                tls_settings,
1081                &auth,
1082            )?,
1083            #[cfg(feature = "sync-sender-http")]
1084            Protocol::Http | Protocol::Https => {
1085                use ureq::unversioned::transport::Connector;
1086                use ureq::unversioned::transport::TcpConnector;
1087                if self.net_interface.is_some() {
1088                    // See: https://github.com/algesten/ureq/issues/692
1089                    return Err(error::fmt!(
1090                        InvalidApiCall,
1091                        "net_interface is not supported for ILP over HTTP."
1092                    ));
1093                }
1094
1095                let http_config = self.http.as_ref().unwrap();
1096                let user_agent = http_config.user_agent.as_str();
1097                let connector = TcpConnector::default();
1098
1099                let agent_builder = ureq::Agent::config_builder()
1100                    .user_agent(user_agent)
1101                    .no_delay(true);
1102
1103                let tls_config = match tls_settings {
1104                    Some(tls_settings) => Some(tls::configure_tls(tls_settings)?),
1105                    None => None,
1106                };
1107
1108                let connector = connector.chain(TlsConnector::new(tls_config));
1109
1110                let auth = match auth {
1111                    Some(conf::AuthParams::Basic(ref auth)) => Some(auth.to_header_string()),
1112                    Some(conf::AuthParams::Token(ref auth)) => Some(auth.to_header_string()?),
1113
1114                    #[cfg(feature = "sync-sender-tcp")]
1115                    Some(conf::AuthParams::Ecdsa(_)) => {
1116                        return Err(fmt!(
1117                            AuthError,
1118                            "ECDSA authentication is not supported for ILP over HTTP. \
1119                            Please use basic or token authentication instead."
1120                        ));
1121                    }
1122                    None => None,
1123                };
1124                let agent_builder = agent_builder
1125                    .timeout_connect(Some(*http_config.request_timeout.deref()))
1126                    .http_status_as_error(false);
1127                let agent = ureq::Agent::with_parts(
1128                    agent_builder.build(),
1129                    connector,
1130                    ureq::unversioned::resolver::DefaultResolver::default(),
1131                );
1132                let proto = self.protocol.schema();
1133                let url = format!(
1134                    "{}://{}:{}/write",
1135                    proto,
1136                    self.host.deref(),
1137                    self.port.deref()
1138                );
1139                SyncProtocolHandler::SyncHttp(SyncHttpHandlerState {
1140                    agent,
1141                    url,
1142                    auth,
1143                    config: self.http.as_ref().unwrap().clone(),
1144                })
1145            }
1146        };
1147
1148        #[allow(unused_mut)]
1149        let mut max_name_len = *self.max_name_len;
1150
1151        let protocol_version = match self.protocol_version.deref() {
1152            Some(v) => *v,
1153            None => match self.protocol {
1154                #[cfg(feature = "sync-sender-tcp")]
1155                Protocol::Tcp | Protocol::Tcps => ProtocolVersion::V1,
1156                #[cfg(feature = "sync-sender-http")]
1157                Protocol::Http | Protocol::Https => {
1158                    #[allow(irrefutable_let_patterns)]
1159                    if let SyncProtocolHandler::SyncHttp(http_state) = &handler {
1160                        let settings_url = &format!(
1161                            "{}://{}:{}/settings",
1162                            self.protocol.schema(),
1163                            self.host.deref(),
1164                            self.port.deref()
1165                        );
1166                        let (protocol_versions, server_max_name_len) =
1167                            read_server_settings(http_state, settings_url, max_name_len)?;
1168                        max_name_len = server_max_name_len;
1169                        SUPPORTED_PROTOCOL_VERSIONS
1170                            .iter()
1171                            .find(|version| protocol_versions.contains(version))
1172                            .copied()
1173                            .ok_or_else(|| {
1174                                fmt!(
1175                                    ProtocolVersionError,
1176                                    "Server does not support any of the client protocol versions: {:?}",
1177                                    SUPPORTED_PROTOCOL_VERSIONS
1178                                )
1179                            })?
1180                    } else {
1181                        unreachable!("HTTP handler should be used for HTTP protocol");
1182                    }
1183                }
1184            },
1185        };
1186
1187        if auth.is_some() {
1188            descr.push_str("auth=on]");
1189        } else {
1190            descr.push_str("auth=off]");
1191        }
1192
1193        let sender = Sender::new(
1194            descr,
1195            handler,
1196            *self.max_buf_size,
1197            protocol_version,
1198            max_name_len,
1199        );
1200
1201        Ok(sender)
1202    }
1203
1204    #[cfg(feature = "_sender-tcp")]
1205    fn ensure_is_tcpx(&mut self, param_name: &str) -> Result<()> {
1206        if self.protocol.is_tcpx() {
1207            Ok(())
1208        } else {
1209            Err(fmt!(
1210                ConfigError,
1211                "The {param_name:?} setting can only be used with the TCP protocol."
1212            ))
1213        }
1214    }
1215}
1216
1217/// When parsing from config, we exclude certain characters.
1218/// Here we repeat the same validation logic for consistency.
1219fn validate_value<T: AsRef<str>>(value: T) -> Result<T> {
1220    let str_ref = value.as_ref();
1221    for (p, c) in str_ref.chars().enumerate() {
1222        if matches!(c, '\u{0}'..='\u{1f}' | '\u{7f}'..='\u{9f}') {
1223            return Err(error::fmt!(
1224                ConfigError,
1225                "Invalid character {c:?} at position {p}"
1226            ));
1227        }
1228    }
1229    Ok(value)
1230}
1231
1232fn parse_conf_value<T>(param_name: &str, str_value: &str) -> Result<T>
1233where
1234    T: FromStr,
1235    T::Err: std::fmt::Debug,
1236{
1237    str_value.parse().map_err(|e| {
1238        fmt!(
1239            ConfigError,
1240            "Could not parse {param_name:?} to number: {e:?}"
1241        )
1242    })
1243}
1244
/// Decode an unpadded, URL-safe Base64 string into raw bytes.
///
/// `descr` names the item being decoded and appears in the `AuthError`
/// returned when decoding fails.
#[cfg(feature = "_sender-tcp")]
fn b64_decode(descr: &'static str, buf: &str) -> Result<Vec<u8>> {
    use base64ct::{Base64UrlUnpadded, Encoding};

    match Base64UrlUnpadded::decode_vec(buf) {
        Ok(decoded) => Ok(decoded),
        Err(b64_err) => Err(fmt!(
            AuthError,
            "Misconfigured ILP authentication keys. Could not decode {}: {}. \
            Hint: Check the keys for a possible typo.",
            descr,
            b64_err
        )),
    }
}
1258
/// Build the 65-byte uncompressed encoding of a public key from its Base64
/// encoded `x` and `y` coordinates.
///
/// Both coordinates are decoded first; each must then be at most 32 bytes
/// long and is left-padded with zeros to exactly 32 bytes. Decoding failures
/// and over-long coordinates are reported as `AuthError`.
#[cfg(feature = "_sender-tcp")]
fn parse_public_key(pub_key_x: &str, pub_key_y: &str) -> Result<Vec<u8>> {
    const COORD_LEN: usize = 32;

    let x = b64_decode("public key x", pub_key_x)?;
    let y = b64_decode("public key y", pub_key_y)?;

    if x.len() > COORD_LEN {
        return Err(fmt!(
            AuthError,
            "Misconfigured ILP authentication keys. Public key x is too long. \
            Hint: Check the keys for a possible typo."
        ));
    }
    if y.len() > COORD_LEN {
        return Err(fmt!(
            AuthError,
            "Misconfigured ILP authentication keys. Public key y is too long. \
            Hint: Check the keys for a possible typo."
        ));
    }

    // SEC 1 Uncompressed Octet-String-to-Elliptic-Curve-Point Encoding:
    // one 0x04 marker byte followed by the two zero-padded coordinates.
    let mut encoded = Vec::with_capacity(1 + 2 * COORD_LEN);
    encoded.push(4u8);
    for coord in [&x, &y] {
        let padded_start = encoded.len() + (COORD_LEN - coord.len());
        encoded.resize(padded_start, 0u8);
        encoded.extend_from_slice(coord);
    }
    Ok(encoded)
}
1289
/// Assemble an ECDSA P-256 key pair from the Base64 encoded private key and
/// public key coordinates held in `auth`.
///
/// Any decoding problem, or a rejection of the key material by the crypto
/// backend, is reported as an `AuthError`.
#[cfg(feature = "_sender-tcp")]
fn parse_key_pair(auth: &conf::EcdsaAuthParams) -> Result<EcdsaKeyPair> {
    let secret = b64_decode("private authentication key", auth.priv_key.as_str())?;
    let public = parse_public_key(auth.pub_key_x.as_str(), auth.pub_key_y.as_str())?;

    #[cfg(feature = "aws-lc-crypto")]
    let outcome = EcdsaKeyPair::from_private_key_and_public_key(
        &ECDSA_P256_SHA256_FIXED_SIGNING,
        secret.as_slice(),
        public.as_slice(),
    );

    // The ring backend additionally requires a random number generator.
    #[cfg(feature = "ring-crypto")]
    let outcome = {
        let rng = SystemRandom::new();
        EcdsaKeyPair::from_private_key_and_public_key(
            &ECDSA_P256_SHA256_FIXED_SIGNING,
            secret.as_slice(),
            public.as_slice(),
            &rng,
        )
    };

    match outcome {
        Ok(key_pair) => Ok(key_pair),
        Err(key_rejected) => Err(fmt!(
            AuthError,
            "Misconfigured ILP authentication keys: {}. Hint: Check the keys for a possible typo.",
            key_rejected
        )),
    }
}
1321
/// Wrapper that renders a byte slice as a Rust-style byte-string literal
/// (`b"..."`) when formatted with `{:?}`.
struct DebugBytes<'a>(pub &'a [u8]);

impl Debug for DebugBytes<'_> {
    /// Writes the bytes between `b"` and `"`: printable ASCII is emitted
    /// verbatim, a handful of common characters use their short escapes, and
    /// every other byte becomes a two-digit lowercase `\x..` escape.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("b\"")?;

        for &b in self.0 {
            match b {
                // Short escapes; these arms must precede the printable range
                // because `"` and `\` fall inside it.
                b'\n' => f.write_str("\\n")?,
                b'\r' => f.write_str("\\r")?,
                b'\t' => f.write_str("\\t")?,
                b'\\' => f.write_str("\\\\")?,
                b'"' => f.write_str("\\\"")?,
                b'\0' => f.write_str("\\0")?,
                // Remaining printable ASCII (space through tilde).
                b' '..=b'~' => write!(f, "{}", char::from(b))?,
                // Everything else as a hex escape.
                _ => write!(f, "\\x{b:02x}")?,
            }
        }

        f.write_str("\"")
    }
}
1349
1350#[cfg(test)]
1351mod tests;