1#![doc = include_str!("mod.md")]
26
27pub use self::ndarr::{ArrayElement, NdArrayView};
28pub use self::timestamp::*;
29use crate::error::{self, Result, fmt};
30use crate::ingress::conf::ConfigSetting;
31use core::time::Duration;
32use std::collections::HashMap;
33use std::fmt::{Debug, Display, Formatter, Write};
34
35use std::ops::Deref;
36use std::path::PathBuf;
37use std::str::FromStr;
38
39mod tls;
40
41#[cfg(all(feature = "_sender-tcp", feature = "aws-lc-crypto"))]
42use aws_lc_rs::{
43 rand::SystemRandom,
44 signature::{ECDSA_P256_SHA256_FIXED_SIGNING, EcdsaKeyPair},
45};
46
47#[cfg(all(feature = "_sender-tcp", feature = "ring-crypto"))]
48use ring::{
49 rand::SystemRandom,
50 signature::{ECDSA_P256_SHA256_FIXED_SIGNING, EcdsaKeyPair},
51};
52
53mod conf;
54
55pub(crate) mod ndarr;
56
57mod timestamp;
58
59mod buffer;
60pub use buffer::*;
61
62mod sender;
63pub use sender::*;
64
/// Default for the builder's `max_name_len` setting; `SenderBuilder::new`
/// installs it as the initial (non user-specified) value.
const MAX_NAME_LEN_DEFAULT: usize = 127;

// NOTE(review): the array/binary-format constants below are not used in this
// part of the file; the descriptions are inferred from their names and should
// be confirmed against the `ndarr` / `buffer` modules.

/// Presumably the maximum number of dimensions an array may have.
pub const MAX_ARRAY_DIMS: usize = 32;
/// Presumably the maximum size, in bytes, of an array's data: 512 MiB.
pub const MAX_ARRAY_BUFFER_SIZE: usize = 512 * 1024 * 1024;
/// Presumably the maximum length of a single array dimension (`2^28 - 1`).
pub const MAX_ARRAY_DIM_LEN: usize = 0x0FFF_FFFF;
/// Presumably the type tag identifying an array value in the binary format.
pub(crate) const ARRAY_BINARY_FORMAT_TYPE: u8 = 14;
/// Presumably the type tag identifying a double value in the binary format.
pub(crate) const DOUBLE_BINARY_FORMAT_TYPE: u8 = 16;
74
/// Protocol version selectable through the `protocol_version` setting.
///
/// The numeric discriminants match the values accepted in the configuration
/// string (`1` and `2`).
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum ProtocolVersion {
    /// Version 1.
    V1 = 1,

    /// Version 2.
    V2 = 2,
}

impl Display for ProtocolVersion {
    /// Writes the short textual label of the version: `v1` or `v2`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::V1 => "v1",
            Self::V2 => "v2",
        };
        f.write_str(label)
    }
}
98
/// Converts an I/O error into a `SocketError`.
///
/// The resulting message is `prefix` immediately followed by the display
/// text of `io_err`; no separator is inserted, so callers supply their own.
#[cfg(feature = "_sender-tcp")]
fn map_io_to_socket_err(prefix: &str, io_err: std::io::Error) -> error::Error {
    error::fmt!(SocketError, "{}{}", prefix, io_err)
}
103
/// Source of the root certificates, as stored in the builder's `tls_ca` setting.
///
/// Which variants exist depends on the enabled `tls-*-certs` crate features.
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum CertificateAuthority {
    /// Selected by `tls_ca=webpki_roots` in the config string.
    /// This is the builder default when the `tls-webpki-certs` feature is enabled.
    #[cfg(feature = "tls-webpki-certs")]
    WebpkiRoots,

    /// Selected by `tls_ca=os_roots` in the config string.
    /// This is the builder default when only `tls-native-certs` is enabled.
    #[cfg(feature = "tls-native-certs")]
    OsRoots,

    /// Selected by `tls_ca=webpki_and_os_roots`; requires both certificate features.
    #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
    WebpkiAndOsRoots,

    /// Set by `SenderBuilder::tls_roots`, which also records the file path.
    /// This is the builder default when neither certificate feature is enabled.
    PemFile,
}
124
/// Port of a remote endpoint, kept in its textual form.
///
/// Exists so that `SenderBuilder::new` can take the port as a `String`,
/// a `&str` or a `u16` through `Into<Port>`.
pub struct Port(String);

impl From<String> for Port {
    /// Wraps an owned string without copying it.
    fn from(s: String) -> Self {
        Self(s)
    }
}

impl From<&str> for Port {
    /// Copies the borrowed text into a new `Port`.
    fn from(s: &str) -> Self {
        Self(String::from(s))
    }
}

impl From<u16> for Port {
    /// Stores the decimal representation of the port number.
    fn from(p: u16) -> Self {
        let text = p.to_string();
        Self(text)
    }
}
163
164fn validate_auto_flush_params(params: &HashMap<String, String>) -> Result<()> {
165 if let Some(auto_flush) = params.get("auto_flush")
166 && auto_flush.as_str() != "off"
167 {
168 return Err(error::fmt!(
169 ConfigError,
170 "Invalid auto_flush value '{auto_flush}'. This client does not \
171 support auto-flush, so the only accepted value is 'off'"
172 ));
173 }
174
175 for ¶m in ["auto_flush_rows", "auto_flush_bytes"].iter() {
176 if params.contains_key(param) {
177 return Err(error::fmt!(
178 ConfigError,
179 "Invalid configuration parameter {:?}. This client does not support auto-flush",
180 param
181 ));
182 }
183 }
184 Ok(())
185}
186
/// Transport used by the sender.
///
/// Variants are compiled in only when the matching `_sender-tcp` /
/// `_sender-http` feature is enabled.
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum Protocol {
    /// Config-string schema `tcp`; TLS disabled, default port `9009`.
    #[cfg(feature = "_sender-tcp")]
    Tcp,

    /// Config-string schema `tcps`; TLS enabled, default port `9009`.
    #[cfg(feature = "_sender-tcp")]
    Tcps,

    /// Config-string schema `http`; TLS disabled, default port `9000`.
    #[cfg(feature = "_sender-http")]
    Http,

    /// Config-string schema `https`; TLS enabled, default port `9000`.
    #[cfg(feature = "_sender-http")]
    Https,
}
207
208impl Display for Protocol {
209 fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
210 f.write_str(self.schema())
211 }
212}
213
214impl Protocol {
215 fn default_port(&self) -> &str {
216 match self {
217 #[cfg(feature = "_sender-tcp")]
218 Protocol::Tcp | Protocol::Tcps => "9009",
219 #[cfg(feature = "_sender-http")]
220 Protocol::Http | Protocol::Https => "9000",
221 }
222 }
223
224 fn tls_enabled(&self) -> bool {
225 match self {
226 #[cfg(feature = "_sender-tcp")]
227 Protocol::Tcp => false,
228 #[cfg(feature = "_sender-tcp")]
229 Protocol::Tcps => true,
230 #[cfg(feature = "_sender-http")]
231 Protocol::Http => false,
232 #[cfg(feature = "_sender-http")]
233 Protocol::Https => true,
234 }
235 }
236
237 #[cfg(feature = "_sender-tcp")]
238 fn is_tcpx(&self) -> bool {
239 match self {
240 Protocol::Tcp | Protocol::Tcps => true,
241 #[cfg(feature = "_sender-http")]
242 Protocol::Http | Protocol::Https => false,
243 }
244 }
245
246 #[cfg(feature = "_sender-http")]
247 fn is_httpx(&self) -> bool {
248 match self {
249 #[cfg(feature = "_sender-tcp")]
250 Protocol::Tcp | Protocol::Tcps => false,
251 Protocol::Http | Protocol::Https => true,
252 }
253 }
254
255 fn schema(&self) -> &str {
256 match self {
257 #[cfg(feature = "_sender-tcp")]
258 Protocol::Tcp => "tcp",
259 #[cfg(feature = "_sender-tcp")]
260 Protocol::Tcps => "tcps",
261 #[cfg(feature = "_sender-http")]
262 Protocol::Http => "http",
263 #[cfg(feature = "_sender-http")]
264 Protocol::Https => "https",
265 }
266 }
267
268 fn from_schema(schema: &str) -> Result<Self> {
269 match schema {
270 #[cfg(feature = "_sender-tcp")]
271 "tcp" => Ok(Protocol::Tcp),
272 #[cfg(feature = "_sender-tcp")]
273 "tcps" => Ok(Protocol::Tcps),
274 #[cfg(feature = "_sender-http")]
275 "http" => Ok(Protocol::Http),
276 #[cfg(feature = "_sender-http")]
277 "https" => Ok(Protocol::Https),
278 _ => Err(error::fmt!(ConfigError, "Unsupported protocol: {}", schema)),
279 }
280 }
281}
282
/// Accumulates the settings from which a `Sender` is built.
///
/// A builder is created by `new`, `from_conf` or `from_env`. Each setter
/// consumes the builder and returns it wrapped in a `Result`. Settings are
/// stored as `ConfigSetting` values: `new` creates `host` and `port` with
/// `new_specified` and every other setting with `new_default`.
#[derive(Debug, Clone)]
pub struct SenderBuilder {
    /// Transport chosen at construction time.
    protocol: Protocol,
    /// Remote host, as passed to `new`.
    host: ConfigSetting<String>,
    /// Remote port in textual form, as passed to `new`.
    port: ConfigSetting<String>,
    /// Set through `bind_interface`, which is accepted for the TCP protocols only.
    net_interface: ConfigSetting<Option<String>>,
    /// Defaults to 100 MiB; the setter rejects values below 1024.
    max_buf_size: ConfigSetting<usize>,
    /// Defaults to `MAX_NAME_LEN_DEFAULT`; the setter rejects values below 16.
    max_name_len: ConfigSetting<usize>,
    /// Defaults to 15 seconds.
    auth_timeout: ConfigSetting<Duration>,
    /// ECDSA key id for TCP, or basic-auth user name for HTTP (see `build_auth`).
    username: ConfigSetting<Option<String>>,
    /// Basic-auth password (HTTP only, see `build_auth`).
    password: ConfigSetting<Option<String>>,
    /// ECDSA private key for TCP, or auth token for HTTP (see `build_auth`).
    token: ConfigSetting<Option<String>>,

    /// ECDSA public key X component (see `build_auth`).
    #[cfg(feature = "_sender-tcp")]
    token_x: ConfigSetting<Option<String>>,

    /// ECDSA public key Y component (see `build_auth`).
    #[cfg(feature = "_sender-tcp")]
    token_y: ConfigSetting<Option<String>>,

    /// `None` until a version is set explicitly; `build` then chooses one itself.
    protocol_version: ConfigSetting<Option<ProtocolVersion>>,

    /// Defaults to `true`.
    #[cfg(feature = "insecure-skip-verify")]
    tls_verify: ConfigSetting<bool>,

    /// Root-certificate source; the default depends on the enabled TLS features.
    tls_ca: ConfigSetting<CertificateAuthority>,
    /// Path recorded by `tls_roots`, which also switches `tls_ca` to `PemFile`.
    tls_roots: ConfigSetting<Option<PathBuf>>,

    /// HTTP-specific settings; `Some` only when the protocol is HTTP or HTTPS.
    // NOTE(review): this field is gated on `_sender-http`, but `new` initialises
    // it under `sync-sender-http` — confirm the two features always coincide.
    #[cfg(feature = "_sender-http")]
    http: Option<conf::HttpConfig>,
}
358
359impl SenderBuilder {
    /// Creates a builder from a configuration string.
    ///
    /// The string is parsed with `questdb_confstr::parse_conf_str`. Its
    /// service name selects the [`Protocol`], and the mandatory `addr`
    /// parameter supplies the host and, optionally, the port. Each remaining
    /// recognised parameter is forwarded to the matching builder method.
    /// Keys not listed in the `match` below are ignored.
    ///
    /// # Errors
    ///
    /// Returns a `ConfigError` when the string cannot be parsed, the protocol
    /// is unsupported, `addr` is missing, an auto-flush parameter is used, or
    /// a parameter carries an invalid or unsupported value.
    pub fn from_conf<T: AsRef<str>>(conf: T) -> Result<Self> {
        let conf = conf.as_ref();
        let conf = questdb_confstr::parse_conf_str(conf)
            .map_err(|e| error::fmt!(ConfigError, "Config parse error: {}", e))?;
        let service = conf.service();
        let params = conf.params();

        let protocol = Protocol::from_schema(service)?;

        let Some(addr) = params.get("addr") else {
            return Err(error::fmt!(
                ConfigError,
                "Missing \"addr\" parameter in config string"
            ));
        };
        // Everything after the first ':' is taken as the port; without a ':'
        // the protocol's default port is used.
        let (host, port) = match addr.split_once(':') {
            Some((h, p)) => (h, p),
            None => (addr.as_str(), protocol.default_port()),
        };
        let mut builder = SenderBuilder::new(protocol, host, port);

        validate_auto_flush_params(params)?;

        // Apply the parameters one by one; every arm yields the updated builder.
        for (key, val) in params.iter().map(|(k, v)| (k.as_str(), v.as_str())) {
            builder = match key {
                "username" => builder.username(val)?,
                "password" => builder.password(val)?,
                "token" => builder.token(val)?,
                "token_x" => builder.token_x(val)?,
                "token_y" => builder.token_y(val)?,
                "bind_interface" => builder.bind_interface(val)?,
                "protocol_version" => match val {
                    "1" => builder.protocol_version(ProtocolVersion::V1)?,
                    "2" => builder.protocol_version(ProtocolVersion::V2)?,
                    // "auto" leaves the version unset so that `build` picks it.
                    "auto" => builder,
                    invalid => {
                        // NOTE(review): the tail of this message (`]]]\"]`) looks
                        // malformed; left untouched because callers or tests may
                        // match on the exact text.
                        return Err(error::fmt!(
                            ConfigError,
                            "invalid \"protocol_version\" [value={invalid}, allowed-values=[auto, 1, 2]]]\"]"
                        ));
                    }
                },
                "max_name_len" => builder.max_name_len(parse_conf_value(key, val)?)?,

                "init_buf_size" => {
                    return Err(error::fmt!(
                        ConfigError,
                        "\"init_buf_size\" is not supported in config string"
                    ));
                }

                "max_buf_size" => builder.max_buf_size(parse_conf_value(key, val)?)?,

                // The value is interpreted as a number of milliseconds.
                "auth_timeout" => {
                    builder.auth_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
                }

                "tls_verify" => {
                    let verify = match val {
                        "on" => true,
                        "unsafe_off" => false,
                        _ => {
                            // NOTE(review): the message ends with a stray `'`.
                            return Err(fmt!(
                                ConfigError,
                                r##"Config parameter "tls_verify" must be either "on" or "unsafe_off".'"##,
                            ));
                        }
                    };

                    // Without the feature, only "on" is accepted and the builder
                    // is returned unchanged.
                    #[cfg(not(feature = "insecure-skip-verify"))]
                    {
                        if !verify {
                            return Err(fmt!(
                                ConfigError,
                                r##"The "insecure-skip-verify" feature is not enabled, so "tls_verify=unsafe_off" is not supported"##,
                            ));
                        }
                        builder
                    }

                    #[cfg(feature = "insecure-skip-verify")]
                    builder.tls_verify(verify)?
                }

                "tls_ca" => {
                    // Each known value has two arms: one yielding the variant when
                    // the needed feature is enabled, and one reporting the missing
                    // feature otherwise.
                    let ca = match val {
                        #[cfg(feature = "tls-webpki-certs")]
                        "webpki_roots" => CertificateAuthority::WebpkiRoots,

                        #[cfg(not(feature = "tls-webpki-certs"))]
                        "webpki_roots" => {
                            return Err(error::fmt!(
                                ConfigError,
                                "Config parameter \"tls_ca=webpki_roots\" requires the \"tls-webpki-certs\" feature"
                            ));
                        }

                        #[cfg(feature = "tls-native-certs")]
                        "os_roots" => CertificateAuthority::OsRoots,

                        #[cfg(not(feature = "tls-native-certs"))]
                        "os_roots" => {
                            return Err(error::fmt!(
                                ConfigError,
                                "Config parameter \"tls_ca=os_roots\" requires the \"tls-native-certs\" feature"
                            ));
                        }

                        #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
                        "webpki_and_os_roots" => CertificateAuthority::WebpkiAndOsRoots,

                        #[cfg(not(all(
                            feature = "tls-webpki-certs",
                            feature = "tls-native-certs"
                        )))]
                        "webpki_and_os_roots" => {
                            return Err(error::fmt!(
                                ConfigError,
                                "Config parameter \"tls_ca=webpki_and_os_roots\" requires both the \"tls-webpki-certs\" and \"tls-native-certs\" features"
                            ));
                        }

                        _ => {
                            return Err(error::fmt!(
                                ConfigError,
                                "Invalid value {val:?} for \"tls_ca\""
                            ));
                        }
                    };
                    builder.tls_ca(ca)?
                }

                "tls_roots" => {
                    let path = PathBuf::from_str(val).map_err(|e| {
                        error::fmt!(
                            ConfigError,
                            "Invalid path {:?} for \"tls_roots\": {}",
                            val,
                            e
                        )
                    })?;
                    builder.tls_roots(path)?
                }

                "tls_roots_password" => {
                    return Err(error::fmt!(
                        ConfigError,
                        "\"tls_roots_password\" is not supported."
                    ));
                }

                #[cfg(feature = "sync-sender-http")]
                "request_min_throughput" => {
                    builder.request_min_throughput(parse_conf_value(key, val)?)?
                }

                // The value is interpreted as a number of milliseconds.
                #[cfg(feature = "sync-sender-http")]
                "request_timeout" => {
                    builder.request_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
                }

                // The value is interpreted as a number of milliseconds.
                #[cfg(feature = "sync-sender-http")]
                "retry_timeout" => {
                    builder.retry_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
                }

                // Any other key, including "addr" and the auto-flush keys
                // handled above, leaves the builder untouched.
                _ => builder,
            };
        }

        Ok(builder)
    }
562
563 pub fn from_env() -> Result<Self> {
568 let conf = std::env::var("QDB_CLIENT_CONF").map_err(|_| {
569 error::fmt!(ConfigError, "Environment variable QDB_CLIENT_CONF not set.")
570 })?;
571 Self::from_conf(conf)
572 }
573
574 pub fn new<H: Into<String>, P: Into<Port>>(protocol: Protocol, host: H, port: P) -> Self {
594 let host = host.into();
595 let port: Port = port.into();
596 let port = port.0;
597
598 #[cfg(feature = "tls-webpki-certs")]
599 let tls_ca = CertificateAuthority::WebpkiRoots;
600
601 #[cfg(all(not(feature = "tls-webpki-certs"), feature = "tls-native-certs"))]
602 let tls_ca = CertificateAuthority::OsRoots;
603
604 #[cfg(not(any(feature = "tls-webpki-certs", feature = "tls-native-certs")))]
605 let tls_ca = CertificateAuthority::PemFile;
606
607 Self {
608 protocol,
609 host: ConfigSetting::new_specified(host),
610 port: ConfigSetting::new_specified(port),
611 net_interface: ConfigSetting::new_default(None),
612 max_buf_size: ConfigSetting::new_default(100 * 1024 * 1024),
613 max_name_len: ConfigSetting::new_default(MAX_NAME_LEN_DEFAULT),
614 auth_timeout: ConfigSetting::new_default(Duration::from_secs(15)),
615 username: ConfigSetting::new_default(None),
616 password: ConfigSetting::new_default(None),
617 token: ConfigSetting::new_default(None),
618
619 #[cfg(feature = "_sender-tcp")]
620 token_x: ConfigSetting::new_default(None),
621
622 #[cfg(feature = "_sender-tcp")]
623 token_y: ConfigSetting::new_default(None),
624
625 protocol_version: ConfigSetting::new_default(None),
626
627 #[cfg(feature = "insecure-skip-verify")]
628 tls_verify: ConfigSetting::new_default(true),
629
630 tls_ca: ConfigSetting::new_default(tls_ca),
631 tls_roots: ConfigSetting::new_default(None),
632
633 #[cfg(feature = "sync-sender-http")]
634 http: if protocol.is_httpx() {
635 Some(conf::HttpConfig::default())
636 } else {
637 None
638 },
639 }
640 }
641
642 pub fn bind_interface<I: Into<String>>(self, addr: I) -> Result<Self> {
648 #[cfg(feature = "_sender-tcp")]
649 {
650 let mut builder = self;
651 builder.ensure_is_tcpx("bind_interface")?;
652 builder
653 .net_interface
654 .set_specified("bind_interface", Some(validate_value(addr.into())?))?;
655 Ok(builder)
656 }
657
658 #[cfg(not(feature = "_sender-tcp"))]
659 {
660 let _ = addr;
661 Err(error::fmt!(
662 ConfigError,
663 "The \"bind_interface\" setting can only be used with the TCP protocol."
664 ))
665 }
666 }
667
668 pub fn username(mut self, username: &str) -> Result<Self> {
677 self.username
678 .set_specified("username", Some(validate_value(username.to_string())?))?;
679 Ok(self)
680 }
681
682 pub fn password(mut self, password: &str) -> Result<Self> {
685 self.password
686 .set_specified("password", Some(validate_value(password.to_string())?))?;
687 Ok(self)
688 }
689
690 pub fn token(mut self, token: &str) -> Result<Self> {
693 self.token
694 .set_specified("token", Some(validate_value(token.to_string())?))?;
695 Ok(self)
696 }
697
698 pub fn token_x(self, token_x: &str) -> Result<Self> {
700 #[cfg(feature = "_sender-tcp")]
701 {
702 let mut builder = self;
703 builder
704 .token_x
705 .set_specified("token_x", Some(validate_value(token_x.to_string())?))?;
706 Ok(builder)
707 }
708
709 #[cfg(not(feature = "_sender-tcp"))]
710 {
711 let _ = token_x;
712 Err(error::fmt!(
713 ConfigError,
714 "cannot specify \"token_x\": ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."
715 ))
716 }
717 }
718
719 pub fn token_y(self, token_y: &str) -> Result<Self> {
721 #[cfg(feature = "_sender-tcp")]
722 {
723 let mut builder = self;
724 builder
725 .token_y
726 .set_specified("token_y", Some(validate_value(token_y.to_string())?))?;
727 Ok(builder)
728 }
729
730 #[cfg(not(feature = "_sender-tcp"))]
731 {
732 let _ = token_y;
733 Err(error::fmt!(
734 ConfigError,
735 "cannot specify \"token_y\": ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."
736 ))
737 }
738 }
739
740 pub fn protocol_version(mut self, protocol_version: ProtocolVersion) -> Result<Self> {
749 self.protocol_version
750 .set_specified("protocol_version", Some(protocol_version))?;
751 Ok(self)
752 }
753
754 pub fn auth_timeout(mut self, value: Duration) -> Result<Self> {
758 self.auth_timeout.set_specified("auth_timeout", value)?;
759 Ok(self)
760 }
761
762 pub fn ensure_tls_enabled(&self, property: &str) -> Result<()> {
764 if !self.protocol.tls_enabled() {
765 return Err(error::fmt!(
766 ConfigError,
767 "Cannot set {property:?}: TLS is not supported for protocol {}",
768 self.protocol
769 ));
770 }
771 Ok(())
772 }
773
774 #[cfg(feature = "insecure-skip-verify")]
780 pub fn tls_verify(mut self, verify: bool) -> Result<Self> {
781 self.ensure_tls_enabled("tls_verify")?;
782 self.tls_verify.set_specified("tls_verify", verify)?;
783 Ok(self)
784 }
785
786 pub fn tls_ca(mut self, ca: CertificateAuthority) -> Result<Self> {
789 self.ensure_tls_enabled("tls_ca")?;
790 self.tls_ca.set_specified("tls_ca", ca)?;
791 Ok(self)
792 }
793
794 pub fn tls_roots<P: Into<PathBuf>>(self, path: P) -> Result<Self> {
800 let mut builder = self.tls_ca(CertificateAuthority::PemFile)?;
801 let path = path.into();
802 let _file = std::fs::File::open(&path).map_err(|io_err| {
804 error::fmt!(
805 ConfigError,
806 "Could not open root certificate file from path {:?}: {}",
807 path,
808 io_err
809 )
810 })?;
811 builder.tls_roots.set_specified("tls_roots", Some(path))?;
812 Ok(builder)
813 }
814
815 pub fn max_buf_size(mut self, value: usize) -> Result<Self> {
818 let min = 1024;
819 if value < min {
820 return Err(error::fmt!(
821 ConfigError,
822 "max_buf_size\" must be at least {min} bytes."
823 ));
824 }
825 self.max_buf_size.set_specified("max_buf_size", value)?;
826 Ok(self)
827 }
828
829 pub fn max_name_len(mut self, value: usize) -> Result<Self> {
835 if value < 16 {
836 return Err(error::fmt!(
837 ConfigError,
838 "max_name_len must be at least 16 bytes."
839 ));
840 }
841 self.max_name_len.set_specified("max_name_len", value)?;
842 Ok(self)
843 }
844
845 #[cfg(feature = "sync-sender-http")]
846 pub fn retry_timeout(mut self, value: Duration) -> Result<Self> {
849 if let Some(http) = &mut self.http {
850 http.retry_timeout.set_specified("retry_timeout", value)?;
851 } else {
852 return Err(error::fmt!(
853 ConfigError,
854 "retry_timeout is supported only in ILP over HTTP."
855 ));
856 }
857 Ok(self)
858 }
859
860 #[cfg(feature = "sync-sender-http")]
861 pub fn request_min_throughput(mut self, value: u64) -> Result<Self> {
871 if let Some(http) = &mut self.http {
872 http.request_min_throughput
873 .set_specified("request_min_throughput", value)?;
874 } else {
875 return Err(error::fmt!(
876 ConfigError,
877 "\"request_min_throughput\" is supported only in ILP over HTTP."
878 ));
879 }
880 Ok(self)
881 }
882
883 #[cfg(feature = "sync-sender-http")]
884 pub fn request_timeout(mut self, value: Duration) -> Result<Self> {
889 if let Some(http) = &mut self.http {
890 if value.is_zero() {
891 return Err(error::fmt!(
892 ConfigError,
893 "\"request_timeout\" must be greater than 0."
894 ));
895 }
896 http.request_timeout
897 .set_specified("request_timeout", value)?;
898 } else {
899 return Err(error::fmt!(
900 ConfigError,
901 "\"request_timeout\" is supported only in ILP over HTTP."
902 ));
903 }
904 Ok(self)
905 }
906
907 #[cfg(feature = "sync-sender-http")]
908 #[doc(hidden)]
912 pub fn user_agent(mut self, value: &str) -> Result<Self> {
913 let value = validate_value(value)?;
914 if let Some(http) = &mut self.http {
915 http.user_agent = value.to_string();
916 }
917 Ok(self)
918 }
919
    /// Derives the authentication parameters from the protocol and the
    /// `username`, `password`, `token`, `token_x` and `token_y` settings.
    ///
    /// Returns `Ok(None)` when none of them is set. Otherwise the combination
    /// must form one complete scheme that is valid for the protocol:
    /// * TCP/TCPS: ECDSA (`username` + `token` + `token_x` + `token_y`);
    /// * HTTP/HTTPS: basic (`username` + `password`) or token (`token` alone).
    ///
    /// Arms are tried top to bottom, so the specific combinations come before
    /// the per-protocol catch-alls.
    ///
    /// # Errors
    ///
    /// Returns a `ConfigError` for incomplete, inconsistent or
    /// protocol-incompatible combinations.
    fn build_auth(&self) -> Result<Option<conf::AuthParams>> {
        match (
            self.protocol,
            self.username.deref(),
            self.password.deref(),
            self.token.deref(),
            // Without TCP support the ECDSA settings do not exist; `None`
            // placeholders keep the tuple shape identical in both builds.
            #[cfg(feature = "_sender-tcp")]
            self.token_x.deref(),
            #[cfg(not(feature = "_sender-tcp"))]
            None::<String>,
            #[cfg(feature = "_sender-tcp")]
            self.token_y.deref(),
            #[cfg(not(feature = "_sender-tcp"))]
            None::<String>,
        ) {
            // Nothing set: no authentication.
            (_, None, None, None, None, None) => Ok(None),

            // TCP: complete ECDSA parameter set.
            #[cfg(feature = "_sender-tcp")]
            (protocol, Some(username), None, Some(token), Some(token_x), Some(token_y))
                if protocol.is_tcpx() =>
            {
                Ok(Some(conf::AuthParams::Ecdsa(conf::EcdsaAuthParams {
                    key_id: username.to_string(),
                    priv_key: token.to_string(),
                    pub_key_x: token_x.to_string(),
                    pub_key_y: token_y.to_string(),
                })))
            }

            // TCP: username + password is basic auth, which is HTTP-only.
            #[cfg(feature = "_sender-tcp")]
            (protocol, Some(_username), Some(_password), None, None, None)
                if protocol.is_tcpx() =>
            {
                Err(error::fmt!(
                    ConfigError,
                    r##"The "basic_auth" setting can only be used with the ILP/HTTP protocol."##,
                ))
            }

            // TCP: a lone token is token auth, which is HTTP-only.
            // NOTE(review): the message reads "only be used" (missing "can").
            #[cfg(feature = "_sender-tcp")]
            (protocol, None, None, Some(_token), None, None) if protocol.is_tcpx() => {
                Err(error::fmt!(
                    ConfigError,
                    "Token authentication only be used with the ILP/HTTP protocol."
                ))
            }

            // TCP: any other combination without a password is treated as a
            // partial ECDSA configuration.
            #[cfg(feature = "_sender-tcp")]
            (protocol, _username, None, _token, _token_x, _token_y) if protocol.is_tcpx() => {
                Err(error::fmt!(
                    ConfigError,
                    r##"Incomplete ECDSA authentication parameters. Specify either all or none of: "username", "token", "token_x", "token_y"."##,
                ))
            }
            // HTTP: basic authentication.
            #[cfg(feature = "_sender-http")]
            (protocol, Some(username), Some(password), None, None, None) if protocol.is_httpx() => {
                Ok(Some(conf::AuthParams::Basic(conf::BasicAuthParams {
                    username: username.to_string(),
                    password: password.to_string(),
                })))
            }
            // HTTP: username without password.
            #[cfg(feature = "_sender-http")]
            (protocol, Some(_username), None, None, None, None) if protocol.is_httpx() => {
                Err(error::fmt!(
                    ConfigError,
                    r##"Basic authentication parameter "username" is present, but "password" is missing."##,
                ))
            }
            // HTTP: password without username.
            #[cfg(feature = "_sender-http")]
            (protocol, None, Some(_password), None, None, None) if protocol.is_httpx() => {
                Err(error::fmt!(
                    ConfigError,
                    r##"Basic authentication parameter "password" is present, but "username" is missing."##,
                ))
            }
            // HTTP: token authentication.
            // NOTE(review): this arm and the next are gated on
            // `sync-sender-http`, while the neighbouring HTTP arms use
            // `_sender-http` — confirm this is intended.
            #[cfg(feature = "sync-sender-http")]
            (protocol, None, None, Some(token), None, None) if protocol.is_httpx() => {
                Ok(Some(conf::AuthParams::Token(conf::TokenAuthParams {
                    token: token.to_string(),
                })))
            }
            // HTTP: a complete ECDSA set is TCP-only.
            #[cfg(feature = "sync-sender-http")]
            (protocol, Some(_username), None, Some(_token), Some(_token_x), Some(_token_y))
                if protocol.is_httpx() =>
            {
                Err(error::fmt!(
                    ConfigError,
                    "ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."
                ))
            }
            // HTTP: any remaining mix of username/password/token.
            #[cfg(feature = "_sender-http")]
            (protocol, _username, _password, _token, None, None) if protocol.is_httpx() => {
                Err(error::fmt!(
                    ConfigError,
                    r##"Inconsistent HTTP authentication parameters. Specify either "username" and "password", or just "token"."##,
                ))
            }
            // Everything not matched above.
            _ => Err(error::fmt!(
                ConfigError,
                r##"Incomplete authentication parameters. Check "username", "password", "token", "token_x" and "token_y" parameters are set correctly."##,
            )),
        }
    }
1023
    /// Builds a `Sender` from the current settings.
    ///
    /// Steps, in order: start the textual description, build the TLS
    /// settings, resolve authentication, create the protocol handler (a TCP
    /// connection or an HTTP agent), determine the protocol version, then
    /// assemble the `Sender`.
    ///
    /// The builder is borrowed, not consumed, so it can be reused.
    ///
    /// # Errors
    ///
    /// Propagates errors from TLS configuration, from `build_auth`, from
    /// connecting over TCP and from reading the server settings over HTTP.
    /// Also fails when `net_interface` is set for an HTTP protocol, or when
    /// the server advertises neither protocol version 1 nor 2.
    #[cfg(feature = "_sync-sender")]
    pub fn build(&self) -> Result<Sender> {
        // Human-readable description handed to `Sender::new`; it is completed
        // with the auth flag further down.
        let mut descr = format!("Sender[host={:?},port={:?},", self.host, self.port);

        if self.protocol.tls_enabled() {
            write!(descr, "tls=enabled,").unwrap();
        } else {
            write!(descr, "tls=disabled,").unwrap();
        }

        #[cfg(feature = "insecure-skip-verify")]
        let tls_verify = *self.tls_verify;

        let tls_settings = tls::TlsSettings::build(
            self.protocol.tls_enabled(),
            #[cfg(feature = "insecure-skip-verify")]
            tls_verify,
            *self.tls_ca,
            self.tls_roots.deref().as_deref(),
        )?;

        let auth = self.build_auth()?;

        let handler = match self.protocol {
            #[cfg(feature = "sync-sender-tcp")]
            Protocol::Tcp | Protocol::Tcps => connect_tcp(
                self.host.as_str(),
                self.port.as_str(),
                self.net_interface.deref().as_deref(),
                *self.auth_timeout,
                tls_settings,
                &auth,
            )?,
            #[cfg(feature = "sync-sender-http")]
            Protocol::Http | Protocol::Https => {
                use ureq::unversioned::transport::Connector;
                use ureq::unversioned::transport::TcpConnector;
                if self.net_interface.is_some() {
                    return Err(error::fmt!(
                        InvalidApiCall,
                        "net_interface is not supported for ILP over HTTP."
                    ));
                }

                // `new` sets `http` to `Some` for the HTTP protocols, which is
                // what this `unwrap` relies on.
                let http_config = self.http.as_ref().unwrap();
                let user_agent = http_config.user_agent.as_str();
                let connector = TcpConnector::default();

                let agent_builder = ureq::Agent::config_builder()
                    .user_agent(user_agent)
                    .no_delay(true);

                let tls_config = match tls_settings {
                    Some(tls_settings) => Some(tls::configure_tls(tls_settings)?),
                    None => None,
                };

                // The TLS connector is chained after the plain TCP connector;
                // `tls_config` is `None` when there are no TLS settings.
                let connector = connector.chain(TlsConnector::new(tls_config));

                // Convert the auth parameters into a header string.
                let auth = match auth {
                    Some(conf::AuthParams::Basic(ref auth)) => Some(auth.to_header_string()),
                    Some(conf::AuthParams::Token(ref auth)) => Some(auth.to_header_string()?),

                    #[cfg(feature = "sync-sender-tcp")]
                    Some(conf::AuthParams::Ecdsa(_)) => {
                        return Err(fmt!(
                            AuthError,
                            "ECDSA authentication is not supported for ILP over HTTP. \
                            Please use basic or token authentication instead."
                        ));
                    }
                    None => None,
                };
                // Non-2xx statuses are returned as responses, not as errors.
                let agent_builder = agent_builder
                    .timeout_connect(Some(*http_config.request_timeout.deref()))
                    .http_status_as_error(false);
                let agent = ureq::Agent::with_parts(
                    agent_builder.build(),
                    connector,
                    ureq::unversioned::resolver::DefaultResolver::default(),
                );
                let proto = self.protocol.schema();
                let url = format!(
                    "{}://{}:{}/write",
                    proto,
                    self.host.deref(),
                    self.port.deref()
                );
                SyncProtocolHandler::SyncHttp(SyncHttpHandlerState {
                    agent,
                    url,
                    auth,
                    config: self.http.as_ref().unwrap().clone(),
                })
            }
        };

        // Only the HTTP auto-detection path below reassigns this, hence the
        // `allow` for builds without it.
        #[allow(unused_mut)]
        let mut max_name_len = *self.max_name_len;

        // An explicitly configured version wins. Otherwise TCP uses V1 and
        // HTTP asks the server through its `/settings` endpoint.
        let protocol_version = match self.protocol_version.deref() {
            Some(v) => *v,
            None => match self.protocol {
                #[cfg(feature = "sync-sender-tcp")]
                Protocol::Tcp | Protocol::Tcps => ProtocolVersion::V1,
                #[cfg(feature = "sync-sender-http")]
                Protocol::Http | Protocol::Https => {
                    #[allow(irrefutable_let_patterns)]
                    if let SyncProtocolHandler::SyncHttp(http_state) = &handler {
                        let settings_url = &format!(
                            "{}://{}:{}/settings",
                            self.protocol.schema(),
                            self.host.deref(),
                            self.port.deref()
                        );
                        let (protocol_versions, server_max_name_len) =
                            read_server_settings(http_state, settings_url, max_name_len)?;
                        // The value returned by the server query replaces the
                        // configured one.
                        max_name_len = server_max_name_len;
                        // Prefer V2, then V1.
                        if protocol_versions.contains(&ProtocolVersion::V2) {
                            ProtocolVersion::V2
                        } else if protocol_versions.contains(&ProtocolVersion::V1) {
                            ProtocolVersion::V1
                        } else {
                            return Err(fmt!(
                                ProtocolVersionError,
                                "Server does not support current client"
                            ));
                        }
                    } else {
                        unreachable!("HTTP handler should be used for HTTP protocol");
                    }
                }
            },
        };

        if auth.is_some() {
            descr.push_str("auth=on]");
        } else {
            descr.push_str("auth=off]");
        }

        let sender = Sender::new(
            descr,
            handler,
            *self.max_buf_size,
            protocol_version,
            max_name_len,
        );

        Ok(sender)
    }
1182
1183 #[cfg(feature = "_sender-tcp")]
1184 fn ensure_is_tcpx(&mut self, param_name: &str) -> Result<()> {
1185 if self.protocol.is_tcpx() {
1186 Ok(())
1187 } else {
1188 Err(fmt!(
1189 ConfigError,
1190 "The {param_name:?} setting can only be used with the TCP protocol."
1191 ))
1192 }
1193 }
1194}
1195
1196fn validate_value<T: AsRef<str>>(value: T) -> Result<T> {
1199 let str_ref = value.as_ref();
1200 for (p, c) in str_ref.chars().enumerate() {
1201 if matches!(c, '\u{0}'..='\u{1f}' | '\u{7f}'..='\u{9f}') {
1202 return Err(error::fmt!(
1203 ConfigError,
1204 "Invalid character {c:?} at position {p}"
1205 ));
1206 }
1207 }
1208 Ok(value)
1209}
1210
1211fn parse_conf_value<T>(param_name: &str, str_value: &str) -> Result<T>
1212where
1213 T: FromStr,
1214 T::Err: std::fmt::Debug,
1215{
1216 str_value.parse().map_err(|e| {
1217 fmt!(
1218 ConfigError,
1219 "Could not parse {param_name:?} to number: {e:?}"
1220 )
1221 })
1222}
1223
/// Decodes `buf` with `Base64UrlUnpadded`.
///
/// `descr` names the value being decoded and is used in the error message.
///
/// # Errors
///
/// Returns an `AuthError` when `buf` cannot be decoded.
#[cfg(feature = "_sender-tcp")]
fn b64_decode(descr: &'static str, buf: &str) -> Result<Vec<u8>> {
    use base64ct::{Base64UrlUnpadded, Encoding};

    match Base64UrlUnpadded::decode_vec(buf) {
        Ok(decoded) => Ok(decoded),
        Err(b64_err) => Err(fmt!(
            AuthError,
            "Misconfigured ILP authentication keys. Could not decode {}: {}. \
            Hint: Check the keys for a possible typo.",
            descr,
            b64_err
        )),
    }
}
1237
/// Assembles a 65-byte public key from its base64-encoded coordinates.
///
/// The output is a leading `4` byte, then the X coordinate, then the Y
/// coordinate. Each coordinate is left-padded with zeros to 32 bytes.
///
/// # Errors
///
/// Returns an `AuthError` when a coordinate cannot be decoded or decodes to
/// more than 32 bytes. Both coordinates are decoded before either length is
/// checked.
#[cfg(feature = "_sender-tcp")]
fn parse_public_key(pub_key_x: &str, pub_key_y: &str) -> Result<Vec<u8>> {
    const COORD_LEN: usize = 32;

    let x = b64_decode("public key x", pub_key_x)?;
    let y = b64_decode("public key y", pub_key_y)?;

    if x.len() > COORD_LEN {
        return Err(fmt!(
            AuthError,
            "Misconfigured ILP authentication keys. Public key x is too long. \
            Hint: Check the keys for a possible typo."
        ));
    }
    if y.len() > COORD_LEN {
        return Err(fmt!(
            AuthError,
            "Misconfigured ILP authentication keys. Public key y is too long. \
            Hint: Check the keys for a possible typo."
        ));
    }

    let mut encoded = Vec::with_capacity(1 + 2 * COORD_LEN);
    encoded.push(4u8);
    for coord in [&x, &y] {
        // Left-pad with zeros so the coordinate occupies exactly 32 bytes.
        let padded_start = encoded.len() + (COORD_LEN - coord.len());
        encoded.resize(padded_start, 0u8);
        encoded.extend_from_slice(coord);
    }
    Ok(encoded)
}
1268
/// Builds the ECDSA P-256 key pair described by `auth`.
///
/// The private key is base64-decoded from `priv_key`, and the public key is
/// assembled from `pub_key_x` / `pub_key_y`. Both are then handed to the
/// enabled crypto backend (`aws-lc-crypto` or `ring-crypto`).
///
/// # Errors
///
/// Returns an `AuthError` when a key cannot be decoded or the backend
/// rejects the key material.
#[cfg(feature = "_sender-tcp")]
fn parse_key_pair(auth: &conf::EcdsaAuthParams) -> Result<EcdsaKeyPair> {
    let secret = b64_decode("private authentication key", auth.priv_key.as_str())?;
    let public = parse_public_key(auth.pub_key_x.as_str(), auth.pub_key_y.as_str())?;

    #[cfg(feature = "aws-lc-crypto")]
    let parsed = EcdsaKeyPair::from_private_key_and_public_key(
        &ECDSA_P256_SHA256_FIXED_SIGNING,
        secret.as_slice(),
        public.as_slice(),
    );

    // The `ring` variant additionally takes a random number generator.
    #[cfg(feature = "ring-crypto")]
    let parsed = {
        let rng = SystemRandom::new();
        EcdsaKeyPair::from_private_key_and_public_key(
            &ECDSA_P256_SHA256_FIXED_SIGNING,
            secret.as_slice(),
            public.as_slice(),
            &rng,
        )
    };

    match parsed {
        Ok(key_pair) => Ok(key_pair),
        Err(key_rejected) => Err(fmt!(
            AuthError,
            "Misconfigured ILP authentication keys: {}. Hint: Check the keys for a possible typo.",
            key_rejected
        )),
    }
}
1300
/// Wrapper whose `Debug` output renders a byte slice as a `b"..."` literal.
struct DebugBytes<'a>(pub &'a [u8]);

impl Debug for DebugBytes<'_> {
    /// Writes the bytes between `b"` and `"`.
    ///
    /// Printable ASCII is written as is. Backslash, double quote, `\n`, `\r`,
    /// `\t` and NUL use their short escapes, and every other byte becomes
    /// `\xNN` with two lowercase hex digits.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("b\"")?;

        for byte in self.0.iter().copied() {
            match byte {
                // Short escapes come first, so the printable range below
                // never sees `\` or `"`.
                b'\\' => f.write_str("\\\\")?,
                b'"' => f.write_str("\\\"")?,
                b'\n' => f.write_str("\\n")?,
                b'\r' => f.write_str("\\r")?,
                b'\t' => f.write_str("\\t")?,
                b'\0' => f.write_str("\\0")?,
                b' '..=b'~' => f.write_char(char::from(byte))?,
                other => write!(f, "\\x{other:02x}")?,
            }
        }

        f.write_str("\"")
    }
}
1328
1329#[cfg(test)]
1330mod tests;