1#![doc = include_str!("mod.md")]
26
27pub use self::ndarr::{ArrayElement, NdArrayView};
28pub use self::timestamp::*;
29use crate::error::{self, Result, fmt};
30use crate::ingress::conf::ConfigSetting;
31use core::time::Duration;
32use std::collections::HashMap;
33use std::fmt::{Debug, Display, Formatter, Write};
34
35use std::ops::Deref;
36use std::path::PathBuf;
37use std::str::FromStr;
38
39mod tls;
40
41#[cfg(all(feature = "_sender-tcp", feature = "aws-lc-crypto"))]
42use aws_lc_rs::{
43 rand::SystemRandom,
44 signature::{ECDSA_P256_SHA256_FIXED_SIGNING, EcdsaKeyPair},
45};
46
47#[cfg(all(feature = "_sender-tcp", feature = "ring-crypto"))]
48use ring::{
49 rand::SystemRandom,
50 signature::{ECDSA_P256_SHA256_FIXED_SIGNING, EcdsaKeyPair},
51};
52
53mod conf;
54
55pub(crate) mod ndarr;
56
57mod timestamp;
58
59mod buffer;
60pub use buffer::*;
61
62mod sender;
63pub use sender::*;
64
65mod decimal;
66pub use decimal::DecimalView;
67
/// Default for the builder's `max_name_len` setting (see `SenderBuilder::new`).
const MAX_NAME_LEN_DEFAULT: usize = 127;

/// Upper bound on the number of dimensions of an array value.
pub const MAX_ARRAY_DIMS: usize = 32;

/// Upper bound, in bytes, on the buffer holding one array value (512 MiB).
pub const MAX_ARRAY_BUFFER_SIZE: usize = 512 * 1024 * 1024;

/// Upper bound on the length of a single array dimension (2^28 - 1).
pub const MAX_ARRAY_DIM_LEN: usize = 0x0FFF_FFFF;

// Type tags for binary-encoded values.
// NOTE(review): presumably written by the buffer serialization code, which is
// not visible in this file — confirm before changing any of these values.

/// Type tag for a binary-encoded array.
pub(crate) const ARRAY_BINARY_FORMAT_TYPE: u8 = 14;

/// Type tag for a binary-encoded double.
pub(crate) const DOUBLE_BINARY_FORMAT_TYPE: u8 = 16;

/// Type tag for a binary-encoded decimal.
#[allow(dead_code)]
pub const DECIMAL_BINARY_FORMAT_TYPE: u8 = 23;
79
/// Version of the ingestion wire protocol spoken with the server.
///
/// The discriminant of each variant is its numeric version, which is also the
/// value accepted for `protocol_version` in a configuration string.
#[derive(Debug, Copy, Clone, PartialEq, PartialOrd)]
pub enum ProtocolVersion {
    /// Protocol version 1.
    V1 = 1,

    /// Protocol version 2.
    V2 = 2,

    /// Protocol version 3.
    V3 = 3,
}

/// Versions this client can speak, ordered from most to least preferred.
///
/// During HTTP auto-negotiation the first entry that the server also reports
/// is the one selected, so the order here is significant.
const SUPPORTED_PROTOCOL_VERSIONS: [ProtocolVersion; 3] = [
    ProtocolVersion::V3,
    ProtocolVersion::V2,
    ProtocolVersion::V1,
];

impl Display for ProtocolVersion {
    /// Renders the version as `v1`, `v2` or `v3`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            ProtocolVersion::V1 => "v1",
            ProtocolVersion::V2 => "v2",
            ProtocolVersion::V3 => "v3",
        };
        f.write_str(label)
    }
}
117
/// Converts an I/O error into a `SocketError` whose message is `prefix`
/// immediately followed by the I/O error's display text.
#[cfg(feature = "_sender-tcp")]
fn map_io_to_socket_err(prefix: &str, io_err: std::io::Error) -> error::Error {
    fmt!(SocketError, "{prefix}{io_err}")
}
122
/// Selects which root certificates are used when verifying the server's TLS
/// certificate.
///
/// Which variants exist depends on the enabled cargo features. The builder's
/// default is `WebpkiRoots` when `tls-webpki-certs` is enabled, otherwise
/// `OsRoots` when `tls-native-certs` is enabled, otherwise `PemFile`.
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum CertificateAuthority {
    /// Root certificates selected by `tls_ca=webpki_roots`.
    /// NOTE(review): presumably the bundled `webpki-roots` set — confirm in `tls`.
    #[cfg(feature = "tls-webpki-certs")]
    WebpkiRoots,

    /// Root certificates selected by `tls_ca=os_roots`.
    /// NOTE(review): presumably the operating system's trust store — confirm in `tls`.
    #[cfg(feature = "tls-native-certs")]
    OsRoots,

    /// Selected by `tls_ca=webpki_and_os_roots`; requires both TLS root features.
    #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
    WebpkiAndOsRoots,

    /// Root certificates read from a file; this is what
    /// `SenderBuilder::tls_roots` selects before recording the path.
    PemFile,
}
143
/// A port (or service name) held as text.
///
/// Exists so that `SenderBuilder::new` can accept a `String`, a `&str` or a
/// numeric `u16` for its `port` argument through `Into<Port>`.
///
/// Derives the common traits so that the type can be logged, cloned and
/// compared like any other public value type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Port(String);

impl From<String> for Port {
    /// Wraps an owned string without copying it.
    fn from(s: String) -> Self {
        Port(s)
    }
}

impl From<&str> for Port {
    /// Copies the borrowed text into a new `Port`.
    fn from(s: &str) -> Self {
        Port(s.to_owned())
    }
}

impl From<u16> for Port {
    /// Stores the decimal representation of the port number.
    fn from(p: u16) -> Self {
        Port(p.to_string())
    }
}
182
183fn validate_auto_flush_params(params: &HashMap<String, String>) -> Result<()> {
184 if let Some(auto_flush) = params.get("auto_flush")
185 && auto_flush.as_str() != "off"
186 {
187 return Err(error::fmt!(
188 ConfigError,
189 "Invalid auto_flush value '{auto_flush}'. This client does not \
190 support auto-flush, so the only accepted value is 'off'"
191 ));
192 }
193
194 for ¶m in ["auto_flush_rows", "auto_flush_bytes"].iter() {
195 if params.contains_key(param) {
196 return Err(error::fmt!(
197 ConfigError,
198 "Invalid configuration parameter {:?}. This client does not support auto-flush",
199 param
200 ));
201 }
202 }
203 Ok(())
204}
205
/// Transport used to reach the server.
///
/// The TCP variants exist only with the `_sender-tcp` feature and the HTTP
/// variants only with the `_sender-http` feature.
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum Protocol {
    /// Plain TCP (`tcp` schema), no TLS.
    #[cfg(feature = "_sender-tcp")]
    Tcp,

    /// TCP with TLS enabled (`tcps` schema).
    #[cfg(feature = "_sender-tcp")]
    Tcps,

    /// Plain HTTP (`http` schema), no TLS.
    #[cfg(feature = "_sender-http")]
    Http,

    /// HTTP with TLS enabled (`https` schema).
    #[cfg(feature = "_sender-http")]
    Https,
}
226
227impl Display for Protocol {
228 fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
229 f.write_str(self.schema())
230 }
231}
232
233impl Protocol {
234 fn default_port(&self) -> &str {
235 match self {
236 #[cfg(feature = "_sender-tcp")]
237 Protocol::Tcp | Protocol::Tcps => "9009",
238 #[cfg(feature = "_sender-http")]
239 Protocol::Http | Protocol::Https => "9000",
240 }
241 }
242
243 fn tls_enabled(&self) -> bool {
244 match self {
245 #[cfg(feature = "_sender-tcp")]
246 Protocol::Tcp => false,
247 #[cfg(feature = "_sender-tcp")]
248 Protocol::Tcps => true,
249 #[cfg(feature = "_sender-http")]
250 Protocol::Http => false,
251 #[cfg(feature = "_sender-http")]
252 Protocol::Https => true,
253 }
254 }
255
256 #[cfg(feature = "_sender-tcp")]
257 fn is_tcpx(&self) -> bool {
258 match self {
259 Protocol::Tcp | Protocol::Tcps => true,
260 #[cfg(feature = "_sender-http")]
261 Protocol::Http | Protocol::Https => false,
262 }
263 }
264
265 #[cfg(feature = "_sender-http")]
266 fn is_httpx(&self) -> bool {
267 match self {
268 #[cfg(feature = "_sender-tcp")]
269 Protocol::Tcp | Protocol::Tcps => false,
270 Protocol::Http | Protocol::Https => true,
271 }
272 }
273
274 fn schema(&self) -> &str {
275 match self {
276 #[cfg(feature = "_sender-tcp")]
277 Protocol::Tcp => "tcp",
278 #[cfg(feature = "_sender-tcp")]
279 Protocol::Tcps => "tcps",
280 #[cfg(feature = "_sender-http")]
281 Protocol::Http => "http",
282 #[cfg(feature = "_sender-http")]
283 Protocol::Https => "https",
284 }
285 }
286
287 fn from_schema(schema: &str) -> Result<Self> {
288 match schema {
289 #[cfg(feature = "_sender-tcp")]
290 "tcp" => Ok(Protocol::Tcp),
291 #[cfg(feature = "_sender-tcp")]
292 "tcps" => Ok(Protocol::Tcps),
293 #[cfg(feature = "_sender-http")]
294 "http" => Ok(Protocol::Http),
295 #[cfg(feature = "_sender-http")]
296 "https" => Ok(Protocol::Https),
297 _ => Err(error::fmt!(ConfigError, "Unsupported protocol: {}", schema)),
298 }
299 }
300}
301
/// Accumulates connection settings and creates a `Sender` from them.
///
/// Each setting is wrapped in `ConfigSetting`, which is created either as a
/// default (`new_default`) or as an explicitly specified value
/// (`new_specified` / `set_specified`).
/// NOTE(review): `set_specified` presumably rejects a second explicit
/// assignment of the same setting — confirm in `conf::ConfigSetting`.
#[derive(Debug, Clone)]
pub struct SenderBuilder {
    // Transport chosen at construction time; never changed afterwards.
    protocol: Protocol,
    // Server host and port, both given to `new`.
    host: ConfigSetting<String>,
    port: ConfigSetting<String>,
    // Local interface to bind to; only settable for the TCP variants.
    net_interface: ConfigSetting<Option<String>>,
    // Maximum buffer size in bytes; defaults to 100 MiB, minimum 1024.
    max_buf_size: ConfigSetting<usize>,
    // Maximum name length; defaults to `MAX_NAME_LEN_DEFAULT`, minimum 16.
    max_name_len: ConfigSetting<usize>,
    // Authentication timeout; defaults to 15 seconds.
    auth_timeout: ConfigSetting<Duration>,
    // Credentials; how they combine is decided in `build_auth`.
    username: ConfigSetting<Option<String>>,
    password: ConfigSetting<Option<String>>,
    token: ConfigSetting<Option<String>>,

    // ECDSA public key coordinates (TCP only).
    #[cfg(feature = "_sender-tcp")]
    token_x: ConfigSetting<Option<String>>,

    #[cfg(feature = "_sender-tcp")]
    token_y: ConfigSetting<Option<String>>,

    // `None` means "choose automatically" when the sender is built.
    protocol_version: ConfigSetting<Option<ProtocolVersion>>,

    // Whether to verify the server certificate; defaults to `true`.
    #[cfg(feature = "insecure-skip-verify")]
    tls_verify: ConfigSetting<bool>,

    // Root certificate source and, for `PemFile`, the path to the file.
    tls_ca: ConfigSetting<CertificateAuthority>,
    tls_roots: ConfigSetting<Option<PathBuf>>,

    // HTTP-only settings; `Some` only when `protocol` is an HTTP variant.
    #[cfg(feature = "_sender-http")]
    http: Option<conf::HttpConfig>,
}
377
378impl SenderBuilder {
379 pub fn from_conf<T: AsRef<str>>(conf: T) -> Result<Self> {
406 let conf = conf.as_ref();
407 let conf = questdb_confstr::parse_conf_str(conf)
408 .map_err(|e| error::fmt!(ConfigError, "Config parse error: {}", e))?;
409 let service = conf.service();
410 let params = conf.params();
411
412 let protocol = Protocol::from_schema(service)?;
413
414 let Some(addr) = params.get("addr") else {
415 return Err(error::fmt!(
416 ConfigError,
417 "Missing \"addr\" parameter in config string"
418 ));
419 };
420 let (host, port) = match addr.split_once(':') {
421 Some((h, p)) => (h, p),
422 None => (addr.as_str(), protocol.default_port()),
423 };
424 let mut builder = SenderBuilder::new(protocol, host, port);
425
426 validate_auto_flush_params(params)?;
427
428 for (key, val) in params.iter().map(|(k, v)| (k.as_str(), v.as_str())) {
429 builder = match key {
430 "username" => builder.username(val)?,
431 "password" => builder.password(val)?,
432 "token" => builder.token(val)?,
433 "token_x" => builder.token_x(val)?,
434 "token_y" => builder.token_y(val)?,
435 "bind_interface" => builder.bind_interface(val)?,
436 "protocol_version" => match val {
437 "1" => builder.protocol_version(ProtocolVersion::V1)?,
438 "2" => builder.protocol_version(ProtocolVersion::V2)?,
439 "3" => builder.protocol_version(ProtocolVersion::V3)?,
440 "auto" => builder,
441 invalid => {
442 return Err(error::fmt!(
443 ConfigError,
444 "invalid \"protocol_version\" [value={invalid}, allowed-values=[auto, 1, 2, 3]]"
445 ));
446 }
447 },
448 "max_name_len" => builder.max_name_len(parse_conf_value(key, val)?)?,
449
450 "init_buf_size" => {
451 return Err(error::fmt!(
452 ConfigError,
453 "\"init_buf_size\" is not supported in config string"
454 ));
455 }
456
457 "max_buf_size" => builder.max_buf_size(parse_conf_value(key, val)?)?,
458
459 "auth_timeout" => {
460 builder.auth_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
461 }
462
463 "tls_verify" => {
464 let verify = match val {
465 "on" => true,
466 "unsafe_off" => false,
467 _ => {
468 return Err(fmt!(
469 ConfigError,
470 r##"Config parameter "tls_verify" must be either "on" or "unsafe_off".'"##,
471 ));
472 }
473 };
474
475 #[cfg(not(feature = "insecure-skip-verify"))]
476 {
477 if !verify {
478 return Err(fmt!(
479 ConfigError,
480 r##"The "insecure-skip-verify" feature is not enabled, so "tls_verify=unsafe_off" is not supported"##,
481 ));
482 }
483 builder
484 }
485
486 #[cfg(feature = "insecure-skip-verify")]
487 builder.tls_verify(verify)?
488 }
489
490 "tls_ca" => {
491 let ca = match val {
492 #[cfg(feature = "tls-webpki-certs")]
493 "webpki_roots" => CertificateAuthority::WebpkiRoots,
494
495 #[cfg(not(feature = "tls-webpki-certs"))]
496 "webpki_roots" => {
497 return Err(error::fmt!(
498 ConfigError,
499 "Config parameter \"tls_ca=webpki_roots\" requires the \"tls-webpki-certs\" feature"
500 ));
501 }
502
503 #[cfg(feature = "tls-native-certs")]
504 "os_roots" => CertificateAuthority::OsRoots,
505
506 #[cfg(not(feature = "tls-native-certs"))]
507 "os_roots" => {
508 return Err(error::fmt!(
509 ConfigError,
510 "Config parameter \"tls_ca=os_roots\" requires the \"tls-native-certs\" feature"
511 ));
512 }
513
514 #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
515 "webpki_and_os_roots" => CertificateAuthority::WebpkiAndOsRoots,
516
517 #[cfg(not(all(
518 feature = "tls-webpki-certs",
519 feature = "tls-native-certs"
520 )))]
521 "webpki_and_os_roots" => {
522 return Err(error::fmt!(
523 ConfigError,
524 "Config parameter \"tls_ca=webpki_and_os_roots\" requires both the \"tls-webpki-certs\" and \"tls-native-certs\" features"
525 ));
526 }
527
528 _ => {
529 return Err(error::fmt!(
530 ConfigError,
531 "Invalid value {val:?} for \"tls_ca\""
532 ));
533 }
534 };
535 builder.tls_ca(ca)?
536 }
537
538 "tls_roots" => {
539 let path = PathBuf::from_str(val).map_err(|e| {
540 error::fmt!(
541 ConfigError,
542 "Invalid path {:?} for \"tls_roots\": {}",
543 val,
544 e
545 )
546 })?;
547 builder.tls_roots(path)?
548 }
549
550 "tls_roots_password" => {
551 return Err(error::fmt!(
552 ConfigError,
553 "\"tls_roots_password\" is not supported."
554 ));
555 }
556
557 #[cfg(feature = "sync-sender-http")]
558 "request_min_throughput" => {
559 builder.request_min_throughput(parse_conf_value(key, val)?)?
560 }
561
562 #[cfg(feature = "sync-sender-http")]
563 "request_timeout" => {
564 builder.request_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
565 }
566
567 #[cfg(feature = "sync-sender-http")]
568 "retry_timeout" => {
569 builder.retry_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
570 }
571
572 _ => builder,
577 };
578 }
579
580 Ok(builder)
581 }
582
583 pub fn from_env() -> Result<Self> {
588 let conf = std::env::var("QDB_CLIENT_CONF").map_err(|_| {
589 error::fmt!(ConfigError, "Environment variable QDB_CLIENT_CONF not set.")
590 })?;
591 Self::from_conf(conf)
592 }
593
594 pub fn new<H: Into<String>, P: Into<Port>>(protocol: Protocol, host: H, port: P) -> Self {
614 let host = host.into();
615 let port: Port = port.into();
616 let port = port.0;
617
618 #[cfg(feature = "tls-webpki-certs")]
619 let tls_ca = CertificateAuthority::WebpkiRoots;
620
621 #[cfg(all(not(feature = "tls-webpki-certs"), feature = "tls-native-certs"))]
622 let tls_ca = CertificateAuthority::OsRoots;
623
624 #[cfg(not(any(feature = "tls-webpki-certs", feature = "tls-native-certs")))]
625 let tls_ca = CertificateAuthority::PemFile;
626
627 Self {
628 protocol,
629 host: ConfigSetting::new_specified(host),
630 port: ConfigSetting::new_specified(port),
631 net_interface: ConfigSetting::new_default(None),
632 max_buf_size: ConfigSetting::new_default(100 * 1024 * 1024),
633 max_name_len: ConfigSetting::new_default(MAX_NAME_LEN_DEFAULT),
634 auth_timeout: ConfigSetting::new_default(Duration::from_secs(15)),
635 username: ConfigSetting::new_default(None),
636 password: ConfigSetting::new_default(None),
637 token: ConfigSetting::new_default(None),
638
639 #[cfg(feature = "_sender-tcp")]
640 token_x: ConfigSetting::new_default(None),
641
642 #[cfg(feature = "_sender-tcp")]
643 token_y: ConfigSetting::new_default(None),
644
645 protocol_version: ConfigSetting::new_default(None),
646
647 #[cfg(feature = "insecure-skip-verify")]
648 tls_verify: ConfigSetting::new_default(true),
649
650 tls_ca: ConfigSetting::new_default(tls_ca),
651 tls_roots: ConfigSetting::new_default(None),
652
653 #[cfg(feature = "sync-sender-http")]
654 http: if protocol.is_httpx() {
655 Some(conf::HttpConfig::default())
656 } else {
657 None
658 },
659 }
660 }
661
662 pub fn bind_interface<I: Into<String>>(self, addr: I) -> Result<Self> {
668 #[cfg(feature = "_sender-tcp")]
669 {
670 let mut builder = self;
671 builder.ensure_is_tcpx("bind_interface")?;
672 builder
673 .net_interface
674 .set_specified("bind_interface", Some(validate_value(addr.into())?))?;
675 Ok(builder)
676 }
677
678 #[cfg(not(feature = "_sender-tcp"))]
679 {
680 let _ = addr;
681 Err(error::fmt!(
682 ConfigError,
683 "The \"bind_interface\" setting can only be used with the TCP protocol."
684 ))
685 }
686 }
687
688 pub fn username(mut self, username: &str) -> Result<Self> {
697 self.username
698 .set_specified("username", Some(validate_value(username.to_string())?))?;
699 Ok(self)
700 }
701
702 pub fn password(mut self, password: &str) -> Result<Self> {
705 self.password
706 .set_specified("password", Some(validate_value(password.to_string())?))?;
707 Ok(self)
708 }
709
710 pub fn token(mut self, token: &str) -> Result<Self> {
713 self.token
714 .set_specified("token", Some(validate_value(token.to_string())?))?;
715 Ok(self)
716 }
717
718 pub fn token_x(self, token_x: &str) -> Result<Self> {
720 #[cfg(feature = "_sender-tcp")]
721 {
722 let mut builder = self;
723 builder
724 .token_x
725 .set_specified("token_x", Some(validate_value(token_x.to_string())?))?;
726 Ok(builder)
727 }
728
729 #[cfg(not(feature = "_sender-tcp"))]
730 {
731 let _ = token_x;
732 Err(error::fmt!(
733 ConfigError,
734 "cannot specify \"token_x\": ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."
735 ))
736 }
737 }
738
739 pub fn token_y(self, token_y: &str) -> Result<Self> {
741 #[cfg(feature = "_sender-tcp")]
742 {
743 let mut builder = self;
744 builder
745 .token_y
746 .set_specified("token_y", Some(validate_value(token_y.to_string())?))?;
747 Ok(builder)
748 }
749
750 #[cfg(not(feature = "_sender-tcp"))]
751 {
752 let _ = token_y;
753 Err(error::fmt!(
754 ConfigError,
755 "cannot specify \"token_y\": ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."
756 ))
757 }
758 }
759
760 pub fn protocol_version(mut self, protocol_version: ProtocolVersion) -> Result<Self> {
769 self.protocol_version
770 .set_specified("protocol_version", Some(protocol_version))?;
771 Ok(self)
772 }
773
774 pub fn auth_timeout(mut self, value: Duration) -> Result<Self> {
778 self.auth_timeout.set_specified("auth_timeout", value)?;
779 Ok(self)
780 }
781
782 pub fn ensure_tls_enabled(&self, property: &str) -> Result<()> {
784 if !self.protocol.tls_enabled() {
785 return Err(error::fmt!(
786 ConfigError,
787 "Cannot set {property:?}: TLS is not supported for protocol {}",
788 self.protocol
789 ));
790 }
791 Ok(())
792 }
793
794 #[cfg(feature = "insecure-skip-verify")]
800 pub fn tls_verify(mut self, verify: bool) -> Result<Self> {
801 self.ensure_tls_enabled("tls_verify")?;
802 self.tls_verify.set_specified("tls_verify", verify)?;
803 Ok(self)
804 }
805
806 pub fn tls_ca(mut self, ca: CertificateAuthority) -> Result<Self> {
809 self.ensure_tls_enabled("tls_ca")?;
810 self.tls_ca.set_specified("tls_ca", ca)?;
811 Ok(self)
812 }
813
814 pub fn tls_roots<P: Into<PathBuf>>(self, path: P) -> Result<Self> {
820 let mut builder = self.tls_ca(CertificateAuthority::PemFile)?;
821 let path = path.into();
822 let _file = std::fs::File::open(&path).map_err(|io_err| {
824 error::fmt!(
825 ConfigError,
826 "Could not open root certificate file from path {:?}: {}",
827 path,
828 io_err
829 )
830 })?;
831 builder.tls_roots.set_specified("tls_roots", Some(path))?;
832 Ok(builder)
833 }
834
835 pub fn max_buf_size(mut self, value: usize) -> Result<Self> {
838 let min = 1024;
839 if value < min {
840 return Err(error::fmt!(
841 ConfigError,
842 "max_buf_size\" must be at least {min} bytes."
843 ));
844 }
845 self.max_buf_size.set_specified("max_buf_size", value)?;
846 Ok(self)
847 }
848
849 pub fn max_name_len(mut self, value: usize) -> Result<Self> {
855 if value < 16 {
856 return Err(error::fmt!(
857 ConfigError,
858 "max_name_len must be at least 16 bytes."
859 ));
860 }
861 self.max_name_len.set_specified("max_name_len", value)?;
862 Ok(self)
863 }
864
865 #[cfg(feature = "sync-sender-http")]
866 pub fn retry_timeout(mut self, value: Duration) -> Result<Self> {
869 if let Some(http) = &mut self.http {
870 http.retry_timeout.set_specified("retry_timeout", value)?;
871 } else {
872 return Err(error::fmt!(
873 ConfigError,
874 "retry_timeout is supported only in ILP over HTTP."
875 ));
876 }
877 Ok(self)
878 }
879
880 #[cfg(feature = "sync-sender-http")]
881 pub fn request_min_throughput(mut self, value: u64) -> Result<Self> {
891 if let Some(http) = &mut self.http {
892 http.request_min_throughput
893 .set_specified("request_min_throughput", value)?;
894 } else {
895 return Err(error::fmt!(
896 ConfigError,
897 "\"request_min_throughput\" is supported only in ILP over HTTP."
898 ));
899 }
900 Ok(self)
901 }
902
903 #[cfg(feature = "sync-sender-http")]
904 pub fn request_timeout(mut self, value: Duration) -> Result<Self> {
909 if let Some(http) = &mut self.http {
910 if value.is_zero() {
911 return Err(error::fmt!(
912 ConfigError,
913 "\"request_timeout\" must be greater than 0."
914 ));
915 }
916 http.request_timeout
917 .set_specified("request_timeout", value)?;
918 } else {
919 return Err(error::fmt!(
920 ConfigError,
921 "\"request_timeout\" is supported only in ILP over HTTP."
922 ));
923 }
924 Ok(self)
925 }
926
927 #[cfg(feature = "sync-sender-http")]
928 #[doc(hidden)]
932 pub fn user_agent(mut self, value: &str) -> Result<Self> {
933 let value = validate_value(value)?;
934 if let Some(http) = &mut self.http {
935 http.user_agent = value.to_string();
936 }
937 Ok(self)
938 }
939
940 fn build_auth(&self) -> Result<Option<conf::AuthParams>> {
941 match (
942 self.protocol,
943 self.username.deref(),
944 self.password.deref(),
945 self.token.deref(),
946 #[cfg(feature = "_sender-tcp")]
947 self.token_x.deref(),
948 #[cfg(not(feature = "_sender-tcp"))]
949 None::<String>,
950 #[cfg(feature = "_sender-tcp")]
951 self.token_y.deref(),
952 #[cfg(not(feature = "_sender-tcp"))]
953 None::<String>,
954 ) {
955 (_, None, None, None, None, None) => Ok(None),
956
957 #[cfg(feature = "_sender-tcp")]
958 (protocol, Some(username), None, Some(token), Some(token_x), Some(token_y))
959 if protocol.is_tcpx() =>
960 {
961 Ok(Some(conf::AuthParams::Ecdsa(conf::EcdsaAuthParams {
962 key_id: username.to_string(),
963 priv_key: token.to_string(),
964 pub_key_x: token_x.to_string(),
965 pub_key_y: token_y.to_string(),
966 })))
967 }
968
969 #[cfg(feature = "_sender-tcp")]
970 (protocol, Some(_username), Some(_password), None, None, None)
971 if protocol.is_tcpx() =>
972 {
973 Err(error::fmt!(
974 ConfigError,
975 r##"The "basic_auth" setting can only be used with the ILP/HTTP protocol."##,
976 ))
977 }
978
979 #[cfg(feature = "_sender-tcp")]
980 (protocol, None, None, Some(_token), None, None) if protocol.is_tcpx() => {
981 Err(error::fmt!(
982 ConfigError,
983 "Token authentication only be used with the ILP/HTTP protocol."
984 ))
985 }
986
987 #[cfg(feature = "_sender-tcp")]
988 (protocol, _username, None, _token, _token_x, _token_y) if protocol.is_tcpx() => {
989 Err(error::fmt!(
990 ConfigError,
991 r##"Incomplete ECDSA authentication parameters. Specify either all or none of: "username", "token", "token_x", "token_y"."##,
992 ))
993 }
994 #[cfg(feature = "_sender-http")]
995 (protocol, Some(username), Some(password), None, None, None) if protocol.is_httpx() => {
996 Ok(Some(conf::AuthParams::Basic(conf::BasicAuthParams {
997 username: username.to_string(),
998 password: password.to_string(),
999 })))
1000 }
1001 #[cfg(feature = "_sender-http")]
1002 (protocol, Some(_username), None, None, None, None) if protocol.is_httpx() => {
1003 Err(error::fmt!(
1004 ConfigError,
1005 r##"Basic authentication parameter "username" is present, but "password" is missing."##,
1006 ))
1007 }
1008 #[cfg(feature = "_sender-http")]
1009 (protocol, None, Some(_password), None, None, None) if protocol.is_httpx() => {
1010 Err(error::fmt!(
1011 ConfigError,
1012 r##"Basic authentication parameter "password" is present, but "username" is missing."##,
1013 ))
1014 }
1015 #[cfg(feature = "sync-sender-http")]
1016 (protocol, None, None, Some(token), None, None) if protocol.is_httpx() => {
1017 Ok(Some(conf::AuthParams::Token(conf::TokenAuthParams {
1018 token: token.to_string(),
1019 })))
1020 }
1021 #[cfg(feature = "sync-sender-http")]
1022 (protocol, Some(_username), None, Some(_token), Some(_token_x), Some(_token_y))
1023 if protocol.is_httpx() =>
1024 {
1025 Err(error::fmt!(
1026 ConfigError,
1027 "ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."
1028 ))
1029 }
1030 #[cfg(feature = "_sender-http")]
1031 (protocol, _username, _password, _token, None, None) if protocol.is_httpx() => {
1032 Err(error::fmt!(
1033 ConfigError,
1034 r##"Inconsistent HTTP authentication parameters. Specify either "username" and "password", or just "token"."##,
1035 ))
1036 }
1037 _ => Err(error::fmt!(
1038 ConfigError,
1039 r##"Incomplete authentication parameters. Check "username", "password", "token", "token_x" and "token_y" parameters are set correctly."##,
1040 )),
1041 }
1042 }
1043
    /// Creates a [`Sender`] from the current settings.
    ///
    /// The steps run in this order: build the TLS settings, resolve the
    /// authentication parameters, create the transport handler (a TCP
    /// connection or an HTTP agent), determine the protocol version, and
    /// finally assemble the sender.
    ///
    /// When no protocol version was specified, TCP uses
    /// [`ProtocolVersion::V1`], while HTTP reads the server's `/settings`
    /// endpoint and picks the most preferred version both sides support. In
    /// the HTTP case the maximum name length is replaced by the value returned
    /// from that settings lookup.
    ///
    /// # Errors
    /// Propagates any error from TLS setup, authentication parameter
    /// validation, connecting, or reading the server settings. Returns a
    /// `ProtocolVersionError` when the server supports none of the client's
    /// protocol versions.
    #[cfg(feature = "_sync-sender")]
    pub fn build(&self) -> Result<Sender> {
        // Human-readable description of the sender, completed further below.
        let mut descr = format!("Sender[host={:?},port={:?},", self.host, self.port);

        if self.protocol.tls_enabled() {
            write!(descr, "tls=enabled,").unwrap();
        } else {
            write!(descr, "tls=disabled,").unwrap();
        }

        #[cfg(feature = "insecure-skip-verify")]
        let tls_verify = *self.tls_verify;

        // The argument list itself is feature-dependent: `tls_verify` is only
        // passed when the "insecure-skip-verify" feature is enabled.
        let tls_settings = tls::TlsSettings::build(
            self.protocol.tls_enabled(),
            #[cfg(feature = "insecure-skip-verify")]
            tls_verify,
            *self.tls_ca,
            self.tls_roots.deref().as_deref(),
        )?;

        let auth = self.build_auth()?;

        let handler = match self.protocol {
            #[cfg(feature = "sync-sender-tcp")]
            Protocol::Tcp | Protocol::Tcps => connect_tcp(
                self.host.as_str(),
                self.port.as_str(),
                self.net_interface.deref().as_deref(),
                *self.auth_timeout,
                tls_settings,
                &auth,
            )?,
            #[cfg(feature = "sync-sender-http")]
            Protocol::Http | Protocol::Https => {
                use ureq::unversioned::transport::Connector;
                use ureq::unversioned::transport::TcpConnector;
                // Binding to an interface is a TCP-only setting.
                if self.net_interface.is_some() {
                    return Err(error::fmt!(
                        InvalidApiCall,
                        "net_interface is not supported for ILP over HTTP."
                    ));
                }

                // `http` is `Some` for the HTTP variants (set up in `new`).
                let http_config = self.http.as_ref().unwrap();
                let user_agent = http_config.user_agent.as_str();
                let connector = TcpConnector::default();

                let agent_builder = ureq::Agent::config_builder()
                    .user_agent(user_agent)
                    .no_delay(true);

                let tls_config = match tls_settings {
                    Some(tls_settings) => Some(tls::configure_tls(tls_settings)?),
                    None => None,
                };

                let connector = connector.chain(TlsConnector::new(tls_config));

                // Pre-render the credentials as a header string.
                let auth = match auth {
                    Some(conf::AuthParams::Basic(ref auth)) => Some(auth.to_header_string()),
                    Some(conf::AuthParams::Token(ref auth)) => Some(auth.to_header_string()?),

                    #[cfg(feature = "sync-sender-tcp")]
                    Some(conf::AuthParams::Ecdsa(_)) => {
                        return Err(fmt!(
                            AuthError,
                            "ECDSA authentication is not supported for ILP over HTTP. \
                            Please use basic or token authentication instead."
                        ));
                    }
                    None => None,
                };
                // Non-success HTTP statuses are returned as responses rather
                // than as transport errors.
                let agent_builder = agent_builder
                    .timeout_connect(Some(*http_config.request_timeout.deref()))
                    .http_status_as_error(false);
                let agent = ureq::Agent::with_parts(
                    agent_builder.build(),
                    connector,
                    ureq::unversioned::resolver::DefaultResolver::default(),
                );
                let proto = self.protocol.schema();
                let url = format!(
                    "{}://{}:{}/write",
                    proto,
                    self.host.deref(),
                    self.port.deref()
                );
                SyncProtocolHandler::SyncHttp(SyncHttpHandlerState {
                    agent,
                    url,
                    auth,
                    config: self.http.as_ref().unwrap().clone(),
                })
            }
        };

        // Only reassigned in the HTTP auto-negotiation branch below.
        #[allow(unused_mut)]
        let mut max_name_len = *self.max_name_len;

        let protocol_version = match self.protocol_version.deref() {
            // An explicitly specified version is used as is.
            Some(v) => *v,
            None => match self.protocol {
                #[cfg(feature = "sync-sender-tcp")]
                Protocol::Tcp | Protocol::Tcps => ProtocolVersion::V1,
                #[cfg(feature = "sync-sender-http")]
                Protocol::Http | Protocol::Https => {
                    // Irrefutable when the TCP handler variant is compiled out.
                    #[allow(irrefutable_let_patterns)]
                    if let SyncProtocolHandler::SyncHttp(http_state) = &handler {
                        let settings_url = &format!(
                            "{}://{}:{}/settings",
                            self.protocol.schema(),
                            self.host.deref(),
                            self.port.deref()
                        );
                        let (protocol_versions, server_max_name_len) =
                            read_server_settings(http_state, settings_url, max_name_len)?;
                        max_name_len = server_max_name_len;
                        // `SUPPORTED_PROTOCOL_VERSIONS` is in preference order,
                        // so the first match is the best common version.
                        SUPPORTED_PROTOCOL_VERSIONS
                            .iter()
                            .find(|version| protocol_versions.contains(version))
                            .copied()
                            .ok_or_else(|| {
                                fmt!(
                                    ProtocolVersionError,
                                    "Server does not support any of the client protocol versions: {:?}",
                                    SUPPORTED_PROTOCOL_VERSIONS
                                )
                            })?
                    } else {
                        unreachable!("HTTP handler should be used for HTTP protocol");
                    }
                }
            },
        };

        if auth.is_some() {
            descr.push_str("auth=on]");
        } else {
            descr.push_str("auth=off]");
        }

        let sender = Sender::new(
            descr,
            handler,
            *self.max_buf_size,
            protocol_version,
            max_name_len,
        );

        Ok(sender)
    }
1203
1204 #[cfg(feature = "_sender-tcp")]
1205 fn ensure_is_tcpx(&mut self, param_name: &str) -> Result<()> {
1206 if self.protocol.is_tcpx() {
1207 Ok(())
1208 } else {
1209 Err(fmt!(
1210 ConfigError,
1211 "The {param_name:?} setting can only be used with the TCP protocol."
1212 ))
1213 }
1214 }
1215}
1216
1217fn validate_value<T: AsRef<str>>(value: T) -> Result<T> {
1220 let str_ref = value.as_ref();
1221 for (p, c) in str_ref.chars().enumerate() {
1222 if matches!(c, '\u{0}'..='\u{1f}' | '\u{7f}'..='\u{9f}') {
1223 return Err(error::fmt!(
1224 ConfigError,
1225 "Invalid character {c:?} at position {p}"
1226 ));
1227 }
1228 }
1229 Ok(value)
1230}
1231
1232fn parse_conf_value<T>(param_name: &str, str_value: &str) -> Result<T>
1233where
1234 T: FromStr,
1235 T::Err: std::fmt::Debug,
1236{
1237 str_value.parse().map_err(|e| {
1238 fmt!(
1239 ConfigError,
1240 "Could not parse {param_name:?} to number: {e:?}"
1241 )
1242 })
1243}
1244
/// Decodes an unpadded URL-safe Base64 string into bytes.
///
/// `descr` names the value being decoded and only appears in the error
/// message.
///
/// # Errors
/// Returns an `AuthError` when `buf` is not valid unpadded URL-safe Base64.
#[cfg(feature = "_sender-tcp")]
fn b64_decode(descr: &'static str, buf: &str) -> Result<Vec<u8>> {
    use base64ct::{Base64UrlUnpadded, Encoding};
    match Base64UrlUnpadded::decode_vec(buf) {
        Ok(bytes) => Ok(bytes),
        Err(b64_err) => Err(fmt!(
            AuthError,
            "Misconfigured ILP authentication keys. Could not decode {descr}: {b64_err}. \
            Hint: Check the keys for a possible typo."
        )),
    }
}
1258
/// Builds the 65-byte public key encoding from its Base64 coordinates.
///
/// The result is the byte `4` followed by the X and then the Y coordinate,
/// each left-padded with zeros to 32 bytes.
/// NOTE(review): this matches the uncompressed elliptic-curve point format
/// (`0x04 || X || Y`) — confirm against the crypto library's expectations.
///
/// # Errors
/// Returns an `AuthError` when a coordinate is not valid Base64 or decodes to
/// more than 32 bytes. Both coordinates are decoded before either length is
/// checked.
#[cfg(feature = "_sender-tcp")]
fn parse_public_key(pub_key_x: &str, pub_key_y: &str) -> Result<Vec<u8>> {
    const COORD_LEN: usize = 32;

    let x_bytes = b64_decode("public key x", pub_key_x)?;
    let y_bytes = b64_decode("public key y", pub_key_y)?;

    if x_bytes.len() > COORD_LEN {
        return Err(fmt!(
            AuthError,
            "Misconfigured ILP authentication keys. Public key x is too long. \
            Hint: Check the keys for a possible typo."
        ));
    }
    if y_bytes.len() > COORD_LEN {
        return Err(fmt!(
            AuthError,
            "Misconfigured ILP authentication keys. Public key y is too long. \
            Hint: Check the keys for a possible typo."
        ));
    }

    let mut encoded = Vec::with_capacity(1 + 2 * COORD_LEN);
    encoded.push(4u8);
    for coord in [&x_bytes, &y_bytes] {
        // Left-pad with zeros so every coordinate occupies exactly 32 bytes.
        let padded_start = encoded.len() + (COORD_LEN - coord.len());
        encoded.resize(padded_start, 0u8);
        encoded.extend_from_slice(coord);
    }
    Ok(encoded)
}
1289
/// Builds the ECDSA P-256 signing key pair from the configured credentials.
///
/// The private key is decoded from Base64 and the public key is assembled by
/// `parse_public_key`. Which crypto backend performs the construction depends
/// on the enabled feature (`aws-lc-crypto` or `ring-crypto`).
///
/// # Errors
/// Returns an `AuthError` when a key cannot be decoded or when the backend
/// rejects the key pair.
#[cfg(feature = "_sender-tcp")]
fn parse_key_pair(auth: &conf::EcdsaAuthParams) -> Result<EcdsaKeyPair> {
    let private_key = b64_decode("private authentication key", auth.priv_key.as_str())?;
    let public_key = parse_public_key(auth.pub_key_x.as_str(), auth.pub_key_y.as_str())?;

    #[cfg(feature = "aws-lc-crypto")]
    let key_pair = EcdsaKeyPair::from_private_key_and_public_key(
        &ECDSA_P256_SHA256_FIXED_SIGNING,
        private_key.as_slice(),
        public_key.as_slice(),
    );

    // The `ring` constructor additionally takes a random number generator.
    #[cfg(feature = "ring-crypto")]
    let key_pair = {
        let rng = SystemRandom::new();
        EcdsaKeyPair::from_private_key_and_public_key(
            &ECDSA_P256_SHA256_FIXED_SIGNING,
            private_key.as_slice(),
            public_key.as_slice(),
            &rng,
        )
    };

    key_pair.map_err(|key_rejected| {
        fmt!(
            AuthError,
            "Misconfigured ILP authentication keys: {key_rejected}. Hint: Check the keys for a possible typo."
        )
    })
}
1321
/// Wrapper that debug-prints a byte slice as a byte-string literal.
///
/// Printable ASCII is written as is; `"` and `\` are backslash-escaped; line
/// feed, carriage return, tab and NUL use their short escapes; every other
/// byte is written as `\xNN` in lowercase hexadecimal.
struct DebugBytes<'a>(pub &'a [u8]);

impl Debug for DebugBytes<'_> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("b\"")?;

        for &byte in self.0 {
            match byte {
                // Escapes come first so the printable range below can be
                // written as one contiguous span.
                b'"' => f.write_str("\\\"")?,
                b'\\' => f.write_str("\\\\")?,
                b'\n' => f.write_str("\\n")?,
                b'\r' => f.write_str("\\r")?,
                b'\t' => f.write_str("\\t")?,
                b'\0' => f.write_str("\\0")?,
                0x20..=0x7E => f.write_char(char::from(byte))?,
                _ => write!(f, "\\x{byte:02x}")?,
            }
        }

        f.write_str("\"")
    }
}
1349
1350#[cfg(test)]
1351mod tests;