1#![doc = include_str!("mod.md")]
26
27pub use self::ndarr::{ArrayElement, NdArrayView};
28pub use self::timestamp::*;
29use crate::error::{self, fmt, Result};
30use crate::ingress::conf::ConfigSetting;
31use core::time::Duration;
32use std::collections::HashMap;
33use std::fmt::{Debug, Display, Formatter, Write};
34
35use std::ops::Deref;
36use std::path::PathBuf;
37use std::str::FromStr;
38
39mod tls;
40
41#[cfg(all(feature = "_sender-tcp", feature = "aws-lc-crypto"))]
42use aws_lc_rs::{
43 rand::SystemRandom,
44 signature::{EcdsaKeyPair, ECDSA_P256_SHA256_FIXED_SIGNING},
45};
46
47#[cfg(all(feature = "_sender-tcp", feature = "ring-crypto"))]
48use ring::{
49 rand::SystemRandom,
50 signature::{EcdsaKeyPair, ECDSA_P256_SHA256_FIXED_SIGNING},
51};
52
53mod conf;
54
55pub(crate) mod ndarr;
56
57mod timestamp;
58
59mod buffer;
60pub use buffer::*;
61
62mod sender;
63pub use sender::*;
64
65const MAX_NAME_LEN_DEFAULT: usize = 127;
66
67pub const MAX_ARRAY_DIMS: usize = 32;
69pub const MAX_ARRAY_BUFFER_SIZE: usize = 512 * 1024 * 1024; pub const MAX_ARRAY_DIM_LEN: usize = 0x0FFF_FFFF; pub(crate) const ARRAY_BINARY_FORMAT_TYPE: u8 = 14;
73pub(crate) const DOUBLE_BINARY_FORMAT_TYPE: u8 = 16;
74
75#[derive(Debug, Copy, Clone, PartialEq)]
77pub enum ProtocolVersion {
78 V1 = 1,
82
83 V2 = 2,
88}
89
90impl Display for ProtocolVersion {
91 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
92 match self {
93 ProtocolVersion::V1 => write!(f, "v1"),
94 ProtocolVersion::V2 => write!(f, "v2"),
95 }
96 }
97}
98
99#[cfg(feature = "_sender-tcp")]
100fn map_io_to_socket_err(prefix: &str, io_err: std::io::Error) -> error::Error {
101 fmt!(SocketError, "{}{}", prefix, io_err)
102}
103
104#[derive(PartialEq, Debug, Clone, Copy)]
107pub enum CertificateAuthority {
108 #[cfg(feature = "tls-webpki-certs")]
111 WebpkiRoots,
112
113 #[cfg(feature = "tls-native-certs")]
115 OsRoots,
116
117 #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
119 WebpkiAndOsRoots,
120
121 PemFile,
123}
124
125pub struct Port(String);
145
146impl From<String> for Port {
147 fn from(s: String) -> Self {
148 Port(s)
149 }
150}
151
152impl From<&str> for Port {
153 fn from(s: &str) -> Self {
154 Port(s.to_owned())
155 }
156}
157
158impl From<u16> for Port {
159 fn from(p: u16) -> Self {
160 Port(p.to_string())
161 }
162}
163
164fn validate_auto_flush_params(params: &HashMap<String, String>) -> Result<()> {
165 if let Some(auto_flush) = params.get("auto_flush") {
166 if auto_flush.as_str() != "off" {
167 return Err(error::fmt!(
168 ConfigError,
169 "Invalid auto_flush value '{auto_flush}'. This client does not \
170 support auto-flush, so the only accepted value is 'off'"
171 ));
172 }
173 }
174
175 for ¶m in ["auto_flush_rows", "auto_flush_bytes"].iter() {
176 if params.contains_key(param) {
177 return Err(error::fmt!(
178 ConfigError,
179 "Invalid configuration parameter {:?}. This client does not support auto-flush",
180 param
181 ));
182 }
183 }
184 Ok(())
185}
186
187#[derive(PartialEq, Debug, Clone, Copy)]
189pub enum Protocol {
190 #[cfg(feature = "_sender-tcp")]
191 Tcp,
193
194 #[cfg(feature = "_sender-tcp")]
195 Tcps,
197
198 #[cfg(feature = "_sender-http")]
199 Http,
202
203 #[cfg(feature = "_sender-http")]
204 Https,
206}
207
208impl Display for Protocol {
209 fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
210 f.write_str(self.schema())
211 }
212}
213
214impl Protocol {
215 fn default_port(&self) -> &str {
216 match self {
217 #[cfg(feature = "_sender-tcp")]
218 Protocol::Tcp | Protocol::Tcps => "9009",
219 #[cfg(feature = "_sender-http")]
220 Protocol::Http | Protocol::Https => "9000",
221 }
222 }
223
224 fn tls_enabled(&self) -> bool {
225 match self {
226 #[cfg(feature = "_sender-tcp")]
227 Protocol::Tcp => false,
228 #[cfg(feature = "_sender-tcp")]
229 Protocol::Tcps => true,
230 #[cfg(feature = "_sender-http")]
231 Protocol::Http => false,
232 #[cfg(feature = "_sender-http")]
233 Protocol::Https => true,
234 }
235 }
236
237 #[cfg(feature = "_sender-tcp")]
238 fn is_tcpx(&self) -> bool {
239 match self {
240 Protocol::Tcp | Protocol::Tcps => true,
241 #[cfg(feature = "_sender-http")]
242 Protocol::Http | Protocol::Https => false,
243 }
244 }
245
246 #[cfg(feature = "_sender-http")]
247 fn is_httpx(&self) -> bool {
248 match self {
249 #[cfg(feature = "_sender-tcp")]
250 Protocol::Tcp | Protocol::Tcps => false,
251 Protocol::Http | Protocol::Https => true,
252 }
253 }
254
255 fn schema(&self) -> &str {
256 match self {
257 #[cfg(feature = "_sender-tcp")]
258 Protocol::Tcp => "tcp",
259 #[cfg(feature = "_sender-tcp")]
260 Protocol::Tcps => "tcps",
261 #[cfg(feature = "_sender-http")]
262 Protocol::Http => "http",
263 #[cfg(feature = "_sender-http")]
264 Protocol::Https => "https",
265 }
266 }
267
268 fn from_schema(schema: &str) -> Result<Self> {
269 match schema {
270 #[cfg(feature = "_sender-tcp")]
271 "tcp" => Ok(Protocol::Tcp),
272 #[cfg(feature = "_sender-tcp")]
273 "tcps" => Ok(Protocol::Tcps),
274 #[cfg(feature = "_sender-http")]
275 "http" => Ok(Protocol::Http),
276 #[cfg(feature = "_sender-http")]
277 "https" => Ok(Protocol::Https),
278 _ => Err(error::fmt!(ConfigError, "Unsupported protocol: {}", schema)),
279 }
280 }
281}
282
283#[derive(Debug, Clone)]
329pub struct SenderBuilder {
330 protocol: Protocol,
331 host: ConfigSetting<String>,
332 port: ConfigSetting<String>,
333 net_interface: ConfigSetting<Option<String>>,
334 max_buf_size: ConfigSetting<usize>,
335 max_name_len: ConfigSetting<usize>,
336 auth_timeout: ConfigSetting<Duration>,
337 username: ConfigSetting<Option<String>>,
338 password: ConfigSetting<Option<String>>,
339 token: ConfigSetting<Option<String>>,
340
341 #[cfg(feature = "_sender-tcp")]
342 token_x: ConfigSetting<Option<String>>,
343
344 #[cfg(feature = "_sender-tcp")]
345 token_y: ConfigSetting<Option<String>>,
346
347 protocol_version: ConfigSetting<Option<ProtocolVersion>>,
348
349 #[cfg(feature = "insecure-skip-verify")]
350 tls_verify: ConfigSetting<bool>,
351
352 tls_ca: ConfigSetting<CertificateAuthority>,
353 tls_roots: ConfigSetting<Option<PathBuf>>,
354
355 #[cfg(feature = "_sender-http")]
356 http: Option<conf::HttpConfig>,
357}
358
359impl SenderBuilder {
360 pub fn from_conf<T: AsRef<str>>(conf: T) -> Result<Self> {
387 let conf = conf.as_ref();
388 let conf = questdb_confstr::parse_conf_str(conf)
389 .map_err(|e| error::fmt!(ConfigError, "Config parse error: {}", e))?;
390 let service = conf.service();
391 let params = conf.params();
392
393 let protocol = Protocol::from_schema(service)?;
394
395 let Some(addr) = params.get("addr") else {
396 return Err(error::fmt!(
397 ConfigError,
398 "Missing \"addr\" parameter in config string"
399 ));
400 };
401 let (host, port) = match addr.split_once(':') {
402 Some((h, p)) => (h, p),
403 None => (addr.as_str(), protocol.default_port()),
404 };
405 let mut builder = SenderBuilder::new(protocol, host, port);
406
407 validate_auto_flush_params(params)?;
408
409 for (key, val) in params.iter().map(|(k, v)| (k.as_str(), v.as_str())) {
410 builder = match key {
411 "username" => builder.username(val)?,
412 "password" => builder.password(val)?,
413 "token" => builder.token(val)?,
414 "token_x" => builder.token_x(val)?,
415 "token_y" => builder.token_y(val)?,
416 "bind_interface" => builder.bind_interface(val)?,
417 "protocol_version" => match val {
418 "1" => builder.protocol_version(ProtocolVersion::V1)?,
419 "2" => builder.protocol_version(ProtocolVersion::V2)?,
420 "auto" => builder,
421 invalid => {
422 return Err(error::fmt!(
423 ConfigError,
424 "invalid \"protocol_version\" [value={invalid}, allowed-values=[auto, 1, 2]]]\"]"
425 ))
426 }
427 },
428 "max_name_len" => {
429 builder.max_name_len(parse_conf_value(key, val)?)?
430 }
431
432 "init_buf_size" => {
433 return Err(error::fmt!(
434 ConfigError,
435 "\"init_buf_size\" is not supported in config string"
436 ))
437 }
438
439 "max_buf_size" => builder.max_buf_size(parse_conf_value(key, val)?)?,
440
441 "auth_timeout" => {
442 builder.auth_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
443 }
444
445 "tls_verify" => {
446 let verify = match val {
447 "on" => true,
448 "unsafe_off" => false,
449 _ => {
450 return Err(fmt!(
451 ConfigError,
452 r##"Config parameter "tls_verify" must be either "on" or "unsafe_off".'"##,
453 ))
454 }
455 };
456
457 #[cfg(not(feature = "insecure-skip-verify"))]
458 {
459 if !verify {
460 return Err(fmt!(
461 ConfigError,
462 r##"The "insecure-skip-verify" feature is not enabled, so "tls_verify=unsafe_off" is not supported"##,
463 ));
464 }
465 builder
466 }
467
468 #[cfg(feature = "insecure-skip-verify")]
469 builder.tls_verify(verify)?
470 }
471
472 "tls_ca" => {
473 let ca = match val {
474 #[cfg(feature = "tls-webpki-certs")]
475 "webpki_roots" => CertificateAuthority::WebpkiRoots,
476
477 #[cfg(not(feature = "tls-webpki-certs"))]
478 "webpki_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=webpki_roots\" requires the \"tls-webpki-certs\" feature")),
479
480 #[cfg(feature = "tls-native-certs")]
481 "os_roots" => CertificateAuthority::OsRoots,
482
483 #[cfg(not(feature = "tls-native-certs"))]
484 "os_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=os_roots\" requires the \"tls-native-certs\" feature")),
485
486 #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
487 "webpki_and_os_roots" => CertificateAuthority::WebpkiAndOsRoots,
488
489 #[cfg(not(all(feature = "tls-webpki-certs", feature = "tls-native-certs")))]
490 "webpki_and_os_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=webpki_and_os_roots\" requires both the \"tls-webpki-certs\" and \"tls-native-certs\" features")),
491
492 _ => return Err(error::fmt!(ConfigError, "Invalid value {val:?} for \"tls_ca\"")),
493 };
494 builder.tls_ca(ca)?
495 }
496
497 "tls_roots" => {
498 let path = PathBuf::from_str(val).map_err(|e| {
499 error::fmt!(
500 ConfigError,
501 "Invalid path {:?} for \"tls_roots\": {}",
502 val,
503 e
504 )
505 })?;
506 builder.tls_roots(path)?
507 }
508
509 "tls_roots_password" => {
510 return Err(error::fmt!(
511 ConfigError,
512 "\"tls_roots_password\" is not supported."
513 ))
514 }
515
516 #[cfg(feature = "sync-sender-http")]
517 "request_min_throughput" => {
518 builder.request_min_throughput(parse_conf_value(key, val)?)?
519 }
520
521 #[cfg(feature = "sync-sender-http")]
522 "request_timeout" => {
523 builder.request_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
524 }
525
526 #[cfg(feature = "sync-sender-http")]
527 "retry_timeout" => {
528 builder.retry_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
529 }
530
531 _ => builder,
536 };
537 }
538
539 Ok(builder)
540 }
541
542 pub fn from_env() -> Result<Self> {
547 let conf = std::env::var("QDB_CLIENT_CONF").map_err(|_| {
548 error::fmt!(ConfigError, "Environment variable QDB_CLIENT_CONF not set.")
549 })?;
550 Self::from_conf(conf)
551 }
552
553 pub fn new<H: Into<String>, P: Into<Port>>(protocol: Protocol, host: H, port: P) -> Self {
573 let host = host.into();
574 let port: Port = port.into();
575 let port = port.0;
576
577 #[cfg(feature = "tls-webpki-certs")]
578 let tls_ca = CertificateAuthority::WebpkiRoots;
579
580 #[cfg(all(not(feature = "tls-webpki-certs"), feature = "tls-native-certs"))]
581 let tls_ca = CertificateAuthority::OsRoots;
582
583 #[cfg(not(any(feature = "tls-webpki-certs", feature = "tls-native-certs")))]
584 let tls_ca = CertificateAuthority::PemFile;
585
586 Self {
587 protocol,
588 host: ConfigSetting::new_specified(host),
589 port: ConfigSetting::new_specified(port),
590 net_interface: ConfigSetting::new_default(None),
591 max_buf_size: ConfigSetting::new_default(100 * 1024 * 1024),
592 max_name_len: ConfigSetting::new_default(MAX_NAME_LEN_DEFAULT),
593 auth_timeout: ConfigSetting::new_default(Duration::from_secs(15)),
594 username: ConfigSetting::new_default(None),
595 password: ConfigSetting::new_default(None),
596 token: ConfigSetting::new_default(None),
597
598 #[cfg(feature = "_sender-tcp")]
599 token_x: ConfigSetting::new_default(None),
600
601 #[cfg(feature = "_sender-tcp")]
602 token_y: ConfigSetting::new_default(None),
603
604 protocol_version: ConfigSetting::new_default(None),
605
606 #[cfg(feature = "insecure-skip-verify")]
607 tls_verify: ConfigSetting::new_default(true),
608
609 tls_ca: ConfigSetting::new_default(tls_ca),
610 tls_roots: ConfigSetting::new_default(None),
611
612 #[cfg(feature = "sync-sender-http")]
613 http: if protocol.is_httpx() {
614 Some(conf::HttpConfig::default())
615 } else {
616 None
617 },
618 }
619 }
620
621 pub fn bind_interface<I: Into<String>>(self, addr: I) -> Result<Self> {
627 #[cfg(feature = "_sender-tcp")]
628 {
629 let mut builder = self;
630 builder.ensure_is_tcpx("bind_interface")?;
631 builder
632 .net_interface
633 .set_specified("bind_interface", Some(validate_value(addr.into())?))?;
634 Ok(builder)
635 }
636
637 #[cfg(not(feature = "_sender-tcp"))]
638 {
639 let _ = addr;
640 Err(error::fmt!(
641 ConfigError,
642 "The \"bind_interface\" setting can only be used with the TCP protocol."
643 ))
644 }
645 }
646
647 pub fn username(mut self, username: &str) -> Result<Self> {
656 self.username
657 .set_specified("username", Some(validate_value(username.to_string())?))?;
658 Ok(self)
659 }
660
661 pub fn password(mut self, password: &str) -> Result<Self> {
664 self.password
665 .set_specified("password", Some(validate_value(password.to_string())?))?;
666 Ok(self)
667 }
668
669 pub fn token(mut self, token: &str) -> Result<Self> {
672 self.token
673 .set_specified("token", Some(validate_value(token.to_string())?))?;
674 Ok(self)
675 }
676
677 pub fn token_x(self, token_x: &str) -> Result<Self> {
679 #[cfg(feature = "_sender-tcp")]
680 {
681 let mut builder = self;
682 builder
683 .token_x
684 .set_specified("token_x", Some(validate_value(token_x.to_string())?))?;
685 Ok(builder)
686 }
687
688 #[cfg(not(feature = "_sender-tcp"))]
689 {
690 let _ = token_x;
691 Err(error::fmt!(
692 ConfigError,
693 "cannot specify \"token_x\": ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."
694 ))
695 }
696 }
697
698 pub fn token_y(self, token_y: &str) -> Result<Self> {
700 #[cfg(feature = "_sender-tcp")]
701 {
702 let mut builder = self;
703 builder
704 .token_y
705 .set_specified("token_y", Some(validate_value(token_y.to_string())?))?;
706 Ok(builder)
707 }
708
709 #[cfg(not(feature = "_sender-tcp"))]
710 {
711 let _ = token_y;
712 Err(error::fmt!(
713 ConfigError,
714 "cannot specify \"token_y\": ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."
715 ))
716 }
717 }
718
719 pub fn protocol_version(mut self, protocol_version: ProtocolVersion) -> Result<Self> {
728 self.protocol_version
729 .set_specified("protocol_version", Some(protocol_version))?;
730 Ok(self)
731 }
732
733 pub fn auth_timeout(mut self, value: Duration) -> Result<Self> {
737 self.auth_timeout.set_specified("auth_timeout", value)?;
738 Ok(self)
739 }
740
741 pub fn ensure_tls_enabled(&self, property: &str) -> Result<()> {
743 if !self.protocol.tls_enabled() {
744 return Err(error::fmt!(
745 ConfigError,
746 "Cannot set {property:?}: TLS is not supported for protocol {}",
747 self.protocol
748 ));
749 }
750 Ok(())
751 }
752
753 #[cfg(feature = "insecure-skip-verify")]
759 pub fn tls_verify(mut self, verify: bool) -> Result<Self> {
760 self.ensure_tls_enabled("tls_verify")?;
761 self.tls_verify.set_specified("tls_verify", verify)?;
762 Ok(self)
763 }
764
765 pub fn tls_ca(mut self, ca: CertificateAuthority) -> Result<Self> {
768 self.ensure_tls_enabled("tls_ca")?;
769 self.tls_ca.set_specified("tls_ca", ca)?;
770 Ok(self)
771 }
772
773 pub fn tls_roots<P: Into<PathBuf>>(self, path: P) -> Result<Self> {
779 let mut builder = self.tls_ca(CertificateAuthority::PemFile)?;
780 let path = path.into();
781 let _file = std::fs::File::open(&path).map_err(|io_err| {
783 error::fmt!(
784 ConfigError,
785 "Could not open root certificate file from path {:?}: {}",
786 path,
787 io_err
788 )
789 })?;
790 builder.tls_roots.set_specified("tls_roots", Some(path))?;
791 Ok(builder)
792 }
793
794 pub fn max_buf_size(mut self, value: usize) -> Result<Self> {
797 let min = 1024;
798 if value < min {
799 return Err(error::fmt!(
800 ConfigError,
801 "max_buf_size\" must be at least {min} bytes."
802 ));
803 }
804 self.max_buf_size.set_specified("max_buf_size", value)?;
805 Ok(self)
806 }
807
808 pub fn max_name_len(mut self, value: usize) -> Result<Self> {
814 if value < 16 {
815 return Err(error::fmt!(
816 ConfigError,
817 "max_name_len must be at least 16 bytes."
818 ));
819 }
820 self.max_name_len.set_specified("max_name_len", value)?;
821 Ok(self)
822 }
823
824 #[cfg(feature = "sync-sender-http")]
825 pub fn retry_timeout(mut self, value: Duration) -> Result<Self> {
828 if let Some(http) = &mut self.http {
829 http.retry_timeout.set_specified("retry_timeout", value)?;
830 } else {
831 return Err(error::fmt!(
832 ConfigError,
833 "retry_timeout is supported only in ILP over HTTP."
834 ));
835 }
836 Ok(self)
837 }
838
839 #[cfg(feature = "sync-sender-http")]
840 pub fn request_min_throughput(mut self, value: u64) -> Result<Self> {
850 if let Some(http) = &mut self.http {
851 http.request_min_throughput
852 .set_specified("request_min_throughput", value)?;
853 } else {
854 return Err(error::fmt!(
855 ConfigError,
856 "\"request_min_throughput\" is supported only in ILP over HTTP."
857 ));
858 }
859 Ok(self)
860 }
861
862 #[cfg(feature = "sync-sender-http")]
863 pub fn request_timeout(mut self, value: Duration) -> Result<Self> {
868 if let Some(http) = &mut self.http {
869 if value.is_zero() {
870 return Err(error::fmt!(
871 ConfigError,
872 "\"request_timeout\" must be greater than 0."
873 ));
874 }
875 http.request_timeout
876 .set_specified("request_timeout", value)?;
877 } else {
878 return Err(error::fmt!(
879 ConfigError,
880 "\"request_timeout\" is supported only in ILP over HTTP."
881 ));
882 }
883 Ok(self)
884 }
885
886 #[cfg(feature = "sync-sender-http")]
887 #[doc(hidden)]
891 pub fn user_agent(mut self, value: &str) -> Result<Self> {
892 let value = validate_value(value)?;
893 if let Some(http) = &mut self.http {
894 http.user_agent = value.to_string();
895 }
896 Ok(self)
897 }
898
899 fn build_auth(&self) -> Result<Option<conf::AuthParams>> {
900 match (
901 self.protocol,
902 self.username.deref(),
903 self.password.deref(),
904 self.token.deref(),
905
906 #[cfg(feature = "_sender-tcp")]
907 self.token_x.deref(),
908
909 #[cfg(not(feature = "_sender-tcp"))]
910 None::<String>,
911
912 #[cfg(feature = "_sender-tcp")]
913 self.token_y.deref(),
914
915 #[cfg(not(feature = "_sender-tcp"))]
916 None::<String>,
917 ) {
918 (_, None, None, None, None, None) => Ok(None),
919
920 #[cfg(feature = "_sender-tcp")]
921 (
922 protocol,
923 Some(username),
924 None,
925 Some(token),
926 Some(token_x),
927 Some(token_y),
928 ) if protocol.is_tcpx() => Ok(Some(conf::AuthParams::Ecdsa(conf::EcdsaAuthParams {
929 key_id: username.to_string(),
930 priv_key: token.to_string(),
931 pub_key_x: token_x.to_string(),
932 pub_key_y: token_y.to_string(),
933 }))),
934
935 #[cfg(feature = "_sender-tcp")]
936 (protocol, Some(_username), Some(_password), None, None, None)
937 if protocol.is_tcpx() => {
938 Err(error::fmt!(ConfigError,
939 r##"The "basic_auth" setting can only be used with the ILP/HTTP protocol."##,
940 ))
941 }
942
943 #[cfg(feature = "_sender-tcp")]
944 (protocol, None, None, Some(_token), None, None)
945 if protocol.is_tcpx() => {
946 Err(error::fmt!(ConfigError, "Token authentication only be used with the ILP/HTTP protocol."))
947 }
948
949 #[cfg(feature = "_sender-tcp")]
950 (protocol, _username, None, _token, _token_x, _token_y)
951 if protocol.is_tcpx() => {
952 Err(error::fmt!(ConfigError,
953 r##"Incomplete ECDSA authentication parameters. Specify either all or none of: "username", "token", "token_x", "token_y"."##,
954 ))
955 }
956 #[cfg(feature = "_sender-http")]
957 (protocol, Some(username), Some(password), None, None, None)
958 if protocol.is_httpx() => {
959 Ok(Some(conf::AuthParams::Basic(conf::BasicAuthParams {
960 username: username.to_string(),
961 password: password.to_string(),
962 })))
963 }
964 #[cfg(feature = "_sender-http")]
965 (protocol, Some(_username), None, None, None, None)
966 if protocol.is_httpx() => {
967 Err(error::fmt!(ConfigError,
968 r##"Basic authentication parameter "username" is present, but "password" is missing."##,
969 ))
970 }
971 #[cfg(feature = "_sender-http")]
972 (protocol, None, Some(_password), None, None, None)
973 if protocol.is_httpx() => {
974 Err(error::fmt!(ConfigError,
975 r##"Basic authentication parameter "password" is present, but "username" is missing."##,
976 ))
977 }
978 #[cfg(feature = "sync-sender-http")]
979 (protocol, None, None, Some(token), None, None)
980 if protocol.is_httpx() => {
981 Ok(Some(conf::AuthParams::Token(conf::TokenAuthParams {
982 token: token.to_string(),
983 })))
984 }
985 #[cfg(feature = "sync-sender-http")]
986 (
987 protocol,
988 Some(_username),
989 None,
990 Some(_token),
991 Some(_token_x),
992 Some(_token_y),
993 ) if protocol.is_httpx() => {
994 Err(error::fmt!(ConfigError, "ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."))
995 }
996 #[cfg(feature = "_sender-http")]
997 (protocol, _username, _password, _token, None, None)
998 if protocol.is_httpx() => {
999 Err(error::fmt!(ConfigError,
1000 r##"Inconsistent HTTP authentication parameters. Specify either "username" and "password", or just "token"."##,
1001 ))
1002 }
1003 _ => {
1004 Err(error::fmt!(ConfigError,
1005 r##"Incomplete authentication parameters. Check "username", "password", "token", "token_x" and "token_y" parameters are set correctly."##,
1006 ))
1007 }
1008 }
1009 }
1010
1011 #[cfg(feature = "_sync-sender")]
1012 pub fn build(&self) -> Result<Sender> {
1019 let mut descr = format!("Sender[host={:?},port={:?},", self.host, self.port);
1020
1021 if self.protocol.tls_enabled() {
1022 write!(descr, "tls=enabled,").unwrap();
1023 } else {
1024 write!(descr, "tls=disabled,").unwrap();
1025 }
1026
1027 #[cfg(feature = "insecure-skip-verify")]
1028 let tls_verify = *self.tls_verify;
1029
1030 let tls_settings = tls::TlsSettings::build(
1031 self.protocol.tls_enabled(),
1032 #[cfg(feature = "insecure-skip-verify")]
1033 tls_verify,
1034 *self.tls_ca,
1035 self.tls_roots.deref().as_deref(),
1036 )?;
1037
1038 let auth = self.build_auth()?;
1039
1040 let handler = match self.protocol {
1041 #[cfg(feature = "sync-sender-tcp")]
1042 Protocol::Tcp | Protocol::Tcps => connect_tcp(
1043 self.host.as_str(),
1044 self.port.as_str(),
1045 self.net_interface.deref().as_deref(),
1046 *self.auth_timeout,
1047 tls_settings,
1048 &auth,
1049 )?,
1050 #[cfg(feature = "sync-sender-http")]
1051 Protocol::Http | Protocol::Https => {
1052 use ureq::unversioned::transport::Connector;
1053 use ureq::unversioned::transport::TcpConnector;
1054 if self.net_interface.is_some() {
1055 return Err(error::fmt!(
1057 InvalidApiCall,
1058 "net_interface is not supported for ILP over HTTP."
1059 ));
1060 }
1061
1062 let http_config = self.http.as_ref().unwrap();
1063 let user_agent = http_config.user_agent.as_str();
1064 let connector = TcpConnector::default();
1065
1066 let agent_builder = ureq::Agent::config_builder()
1067 .user_agent(user_agent)
1068 .no_delay(true);
1069
1070 let tls_config = match tls_settings {
1071 Some(tls_settings) => Some(tls::configure_tls(tls_settings)?),
1072 None => None,
1073 };
1074
1075 let connector = connector.chain(TlsConnector::new(tls_config));
1076
1077 let auth = match auth {
1078 Some(conf::AuthParams::Basic(ref auth)) => Some(auth.to_header_string()),
1079 Some(conf::AuthParams::Token(ref auth)) => Some(auth.to_header_string()?),
1080
1081 #[cfg(feature = "sync-sender-tcp")]
1082 Some(conf::AuthParams::Ecdsa(_)) => {
1083 return Err(fmt!(
1084 AuthError,
1085 "ECDSA authentication is not supported for ILP over HTTP. \
1086 Please use basic or token authentication instead."
1087 ));
1088 }
1089 None => None,
1090 };
1091 let agent_builder = agent_builder
1092 .timeout_connect(Some(*http_config.request_timeout.deref()))
1093 .http_status_as_error(false);
1094 let agent = ureq::Agent::with_parts(
1095 agent_builder.build(),
1096 connector,
1097 ureq::unversioned::resolver::DefaultResolver::default(),
1098 );
1099 let proto = self.protocol.schema();
1100 let url = format!(
1101 "{}://{}:{}/write",
1102 proto,
1103 self.host.deref(),
1104 self.port.deref()
1105 );
1106 SyncProtocolHandler::SyncHttp(SyncHttpHandlerState {
1107 agent,
1108 url,
1109 auth,
1110 config: self.http.as_ref().unwrap().clone(),
1111 })
1112 }
1113 };
1114
1115 #[allow(unused_mut)]
1116 let mut max_name_len = *self.max_name_len;
1117
1118 let protocol_version = match self.protocol_version.deref() {
1119 Some(v) => *v,
1120 None => match self.protocol {
1121 #[cfg(feature = "sync-sender-tcp")]
1122 Protocol::Tcp | Protocol::Tcps => ProtocolVersion::V1,
1123 #[cfg(feature = "sync-sender-http")]
1124 Protocol::Http | Protocol::Https => {
1125 #[allow(irrefutable_let_patterns)]
1126 if let SyncProtocolHandler::SyncHttp(http_state) = &handler {
1127 let settings_url = &format!(
1128 "{}://{}:{}/settings",
1129 self.protocol.schema(),
1130 self.host.deref(),
1131 self.port.deref()
1132 );
1133 let (protocol_versions, server_max_name_len) =
1134 read_server_settings(http_state, settings_url, max_name_len)?;
1135 max_name_len = server_max_name_len;
1136 if protocol_versions.contains(&ProtocolVersion::V2) {
1137 ProtocolVersion::V2
1138 } else if protocol_versions.contains(&ProtocolVersion::V1) {
1139 ProtocolVersion::V1
1140 } else {
1141 return Err(fmt!(
1142 ProtocolVersionError,
1143 "Server does not support current client"
1144 ));
1145 }
1146 } else {
1147 unreachable!("HTTP handler should be used for HTTP protocol");
1148 }
1149 }
1150 },
1151 };
1152
1153 if auth.is_some() {
1154 descr.push_str("auth=on]");
1155 } else {
1156 descr.push_str("auth=off]");
1157 }
1158
1159 let sender = Sender::new(
1160 descr,
1161 handler,
1162 *self.max_buf_size,
1163 protocol_version,
1164 max_name_len,
1165 );
1166
1167 Ok(sender)
1168 }
1169
1170 #[cfg(feature = "_sender-tcp")]
1171 fn ensure_is_tcpx(&mut self, param_name: &str) -> Result<()> {
1172 if self.protocol.is_tcpx() {
1173 Ok(())
1174 } else {
1175 Err(fmt!(
1176 ConfigError,
1177 "The {param_name:?} setting can only be used with the TCP protocol."
1178 ))
1179 }
1180 }
1181}
1182
1183fn validate_value<T: AsRef<str>>(value: T) -> Result<T> {
1186 let str_ref = value.as_ref();
1187 for (p, c) in str_ref.chars().enumerate() {
1188 if matches!(c, '\u{0}'..='\u{1f}' | '\u{7f}'..='\u{9f}') {
1189 return Err(error::fmt!(
1190 ConfigError,
1191 "Invalid character {c:?} at position {p}"
1192 ));
1193 }
1194 }
1195 Ok(value)
1196}
1197
1198fn parse_conf_value<T>(param_name: &str, str_value: &str) -> Result<T>
1199where
1200 T: FromStr,
1201 T::Err: std::fmt::Debug,
1202{
1203 str_value.parse().map_err(|e| {
1204 fmt!(
1205 ConfigError,
1206 "Could not parse {param_name:?} to number: {e:?}"
1207 )
1208 })
1209}
1210
1211#[cfg(feature = "_sender-tcp")]
1212fn b64_decode(descr: &'static str, buf: &str) -> Result<Vec<u8>> {
1213 use base64ct::{Base64UrlUnpadded, Encoding};
1214 Base64UrlUnpadded::decode_vec(buf).map_err(|b64_err| {
1215 fmt!(
1216 AuthError,
1217 "Misconfigured ILP authentication keys. Could not decode {}: {}. \
1218 Hint: Check the keys for a possible typo.",
1219 descr,
1220 b64_err
1221 )
1222 })
1223}
1224
1225#[cfg(feature = "_sender-tcp")]
1226fn parse_public_key(pub_key_x: &str, pub_key_y: &str) -> Result<Vec<u8>> {
1227 let mut pub_key_x = b64_decode("public key x", pub_key_x)?;
1228 let mut pub_key_y = b64_decode("public key y", pub_key_y)?;
1229
1230 let mut encoded = Vec::new();
1232 encoded.push(4u8); let pub_key_x_ken = pub_key_x.len();
1234 if pub_key_x_ken > 32 {
1235 return Err(fmt!(
1236 AuthError,
1237 "Misconfigured ILP authentication keys. Public key x is too long. \
1238 Hint: Check the keys for a possible typo."
1239 ));
1240 }
1241 let pub_key_y_len = pub_key_y.len();
1242 if pub_key_y_len > 32 {
1243 return Err(fmt!(
1244 AuthError,
1245 "Misconfigured ILP authentication keys. Public key y is too long. \
1246 Hint: Check the keys for a possible typo."
1247 ));
1248 }
1249 encoded.resize((32 - pub_key_x_ken) + 1, 0u8);
1250 encoded.append(&mut pub_key_x);
1251 encoded.resize((32 - pub_key_y_len) + 1 + 32, 0u8);
1252 encoded.append(&mut pub_key_y);
1253 Ok(encoded)
1254}
1255
1256#[cfg(feature = "_sender-tcp")]
1257fn parse_key_pair(auth: &conf::EcdsaAuthParams) -> Result<EcdsaKeyPair> {
1258 let private_key = b64_decode("private authentication key", auth.priv_key.as_str())?;
1259 let public_key = parse_public_key(auth.pub_key_x.as_str(), auth.pub_key_y.as_str())?;
1260
1261 #[cfg(feature = "aws-lc-crypto")]
1262 let res = EcdsaKeyPair::from_private_key_and_public_key(
1263 &ECDSA_P256_SHA256_FIXED_SIGNING,
1264 &private_key[..],
1265 &public_key[..],
1266 );
1267
1268 #[cfg(feature = "ring-crypto")]
1269 let res = {
1270 let system_random = SystemRandom::new();
1271 EcdsaKeyPair::from_private_key_and_public_key(
1272 &ECDSA_P256_SHA256_FIXED_SIGNING,
1273 &private_key[..],
1274 &public_key[..],
1275 &system_random,
1276 )
1277 };
1278
1279 res.map_err(|key_rejected| {
1280 fmt!(
1281 AuthError,
1282 "Misconfigured ILP authentication keys: {}. Hint: Check the keys for a possible typo.",
1283 key_rejected
1284 )
1285 })
1286}
1287
1288struct DebugBytes<'a>(pub &'a [u8]);
1289
1290impl<'a> Debug for DebugBytes<'a> {
1291 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1292 write!(f, "b\"")?;
1293
1294 for &byte in self.0 {
1295 match byte {
1296 0x20..=0x21 | 0x23..=0x5B | 0x5D..=0x7E => {
1298 write!(f, "{}", byte as char)?;
1299 }
1300 b'\n' => write!(f, "\\n")?,
1302 b'\r' => write!(f, "\\r")?,
1303 b'\t' => write!(f, "\\t")?,
1304 b'\\' => write!(f, "\\\\")?,
1305 b'"' => write!(f, "\\\"")?,
1306 b'\0' => write!(f, "\\0")?,
1307 _ => write!(f, "\\x{byte:02x}")?,
1309 }
1310 }
1311
1312 write!(f, "\"")
1313 }
1314}
1315
1316#[cfg(test)]
1317mod tests;