1#![doc = include_str!("mod.md")]
26
27pub use self::timestamp::*;
28
29use crate::error::{self, Error, Result};
30use crate::gai;
31use crate::ingress::conf::ConfigSetting;
32use core::time::Duration;
33use std::collections::HashMap;
34use std::convert::Infallible;
35use std::fmt::{Debug, Display, Formatter, Write};
36use std::io::{self, BufRead, BufReader, ErrorKind, Write as IoWrite};
37use std::num::NonZeroUsize;
38use std::ops::Deref;
39use std::path::PathBuf;
40use std::str::FromStr;
41use std::sync::Arc;
42
43use base64ct::{Base64, Base64UrlUnpadded, Encoding};
44use ring::rand::SystemRandom;
45use ring::signature::{EcdsaKeyPair, ECDSA_P256_SHA256_FIXED_SIGNING};
46use rustls::{ClientConnection, RootCertStore, StreamOwned};
47use rustls_pki_types::ServerName;
48use socket2::{Domain, Protocol as SockProtocol, SockAddr, Socket, Type};
49
/// Bit flags naming each kind of call that can be made while building rows.
///
/// Every variant occupies its own bit, so a set of operations can be stored as
/// the bitwise OR of several variants (see `OpCase`).
#[derive(Debug, Copy, Clone)]
enum Op {
    Table = 1,
    Symbol = 1 << 1,
    Column = 1 << 2,
    At = 1 << 3,
    Flush = 1 << 4,
}

impl Op {
    /// Returns the lowercase method name of this operation, for use in
    /// error messages.
    fn descr(self) -> &'static str {
        match self {
            Self::Table => "table",
            Self::Symbol => "symbol",
            Self::Column => "column",
            Self::At => "at",
            Self::Flush => "flush",
        }
    }
}
70
71fn map_io_to_socket_err(prefix: &str, io_err: io::Error) -> Error {
72 error::fmt!(SocketError, "{}{}", prefix, io_err)
73}
74
75#[derive(Clone, Copy)]
82pub struct TableName<'a> {
83 name: &'a str,
84}
85
86impl<'a> TableName<'a> {
87 pub fn new(name: &'a str) -> Result<Self> {
89 if name.is_empty() {
90 return Err(error::fmt!(
91 InvalidName,
92 "Table names must have a non-zero length."
93 ));
94 }
95
96 let mut prev = '\0';
97 for (index, c) in name.chars().enumerate() {
98 match c {
99 '.' => {
100 if index == 0 || index == name.len() - 1 || prev == '.' {
101 return Err(error::fmt!(
102 InvalidName,
103 concat!("Bad string {:?}: ", "Found invalid dot `.` at position {}."),
104 name,
105 index
106 ));
107 }
108 }
109 '?' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '*' | '%' | '~'
110 | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}' | '\u{0004}'
111 | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}' | '\u{000b}'
112 | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
113 return Err(error::fmt!(
114 InvalidName,
115 concat!(
116 "Bad string {:?}: ",
117 "Table names can't contain ",
118 "a {:?} character, which was found at ",
119 "byte position {}."
120 ),
121 name,
122 c,
123 index
124 ));
125 }
126 '\u{feff}' => {
127 return Err(error::fmt!(
130 InvalidName,
131 concat!(
132 "Bad string {:?}: ",
133 "Table names can't contain ",
134 "a UTF-8 BOM character, which was found at ",
135 "byte position {}."
136 ),
137 name,
138 index
139 ));
140 }
141 _ => (),
142 }
143 prev = c;
144 }
145
146 Ok(Self { name })
147 }
148
149 pub fn new_unchecked(name: &'a str) -> Self {
156 Self { name }
157 }
158}
159
160#[derive(Clone, Copy)]
167pub struct ColumnName<'a> {
168 name: &'a str,
169}
170
171impl<'a> ColumnName<'a> {
172 pub fn new(name: &'a str) -> Result<Self> {
174 if name.is_empty() {
175 return Err(error::fmt!(
176 InvalidName,
177 "Column names must have a non-zero length."
178 ));
179 }
180
181 for (index, c) in name.chars().enumerate() {
182 match c {
183 '?' | '.' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '-' | '*'
184 | '%' | '~' | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}'
185 | '\u{0004}' | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}'
186 | '\u{000b}' | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
187 return Err(error::fmt!(
188 InvalidName,
189 concat!(
190 "Bad string {:?}: ",
191 "Column names can't contain ",
192 "a {:?} character, which was found at ",
193 "byte position {}."
194 ),
195 name,
196 c,
197 index
198 ));
199 }
200 '\u{FEFF}' => {
201 return Err(error::fmt!(
204 InvalidName,
205 concat!(
206 "Bad string {:?}: ",
207 "Column names can't contain ",
208 "a UTF-8 BOM character, which was found at ",
209 "byte position {}."
210 ),
211 name,
212 index
213 ));
214 }
215 _ => (),
216 }
217 }
218
219 Ok(Self { name })
220 }
221
222 pub fn new_unchecked(name: &'a str) -> Self {
229 Self { name }
230 }
231}
232
233impl<'a> TryFrom<&'a str> for TableName<'a> {
234 type Error = self::Error;
235
236 fn try_from(name: &'a str) -> Result<Self> {
237 Self::new(name)
238 }
239}
240
241impl<'a> TryFrom<&'a str> for ColumnName<'a> {
242 type Error = self::Error;
243
244 fn try_from(name: &'a str) -> Result<Self> {
245 Self::new(name)
246 }
247}
248
249impl From<Infallible> for Error {
250 fn from(_: Infallible) -> Self {
251 unreachable!()
252 }
253}
254
/// Appends `s` to `output`, inserting a backslash before every byte for which
/// `check_escape_fn` returns `true`.
///
/// `quoting_fn` is invoked once before and once after the escaped text so the
/// caller can emit surrounding quote characters (or nothing).
///
/// Both callbacks operate on raw bytes: `check_escape_fn` must only return
/// `true` for ASCII bytes and `quoting_fn` must only append valid UTF-8,
/// otherwise `output` would no longer hold valid UTF-8.
fn write_escaped_impl<Q, C>(check_escape_fn: C, quoting_fn: Q, output: &mut String, s: &str)
where
    C: Fn(u8) -> bool,
    Q: Fn(&mut Vec<u8>),
{
    // SAFETY: the only bytes appended are the bytes of `s` (valid UTF-8), an
    // ASCII backslash placed directly before a byte selected by
    // `check_escape_fn`, and whatever `quoting_fn` appends. Under the contract
    // documented above the vector therefore stays valid UTF-8.
    let output_vec = unsafe { output.as_mut_vec() };

    // Count first so the no-escape case can be a single bulk copy and the
    // escaping case can reserve exactly once.
    let to_escape = s.bytes().filter(|&b| check_escape_fn(b)).count();

    quoting_fn(output_vec);

    if to_escape == 0 {
        output_vec.extend_from_slice(s.as_bytes());
    } else {
        // Grow once, then append through the safe API so the vector's length
        // never covers uninitialised bytes.
        output_vec.reserve(s.len() + to_escape);
        for b in s.bytes() {
            if check_escape_fn(b) {
                output_vec.push(b'\\');
            }
            output_vec.push(b);
        }
    }

    quoting_fn(output_vec);
}
295
/// Returns `true` when byte `c` needs a preceding backslash in unquoted text
/// (space, comma, `=`, line feed, carriage return or backslash).
fn must_escape_unquoted(c: u8) -> bool {
    b" ,=\n\r\\".contains(&c)
}
299
/// Returns `true` when byte `c` needs a preceding backslash inside a quoted
/// string (line feed, carriage return, double quote or backslash).
fn must_escape_quoted(c: u8) -> bool {
    b"\n\r\"\\".contains(&c)
}
303
304fn write_escaped_unquoted(output: &mut String, s: &str) {
305 write_escaped_impl(must_escape_unquoted, |_output| (), output, s);
306}
307
308fn write_escaped_quoted(output: &mut String, s: &str) {
309 write_escaped_impl(must_escape_quoted, |output| output.push(b'"'), output, s)
310}
311
312enum Connection {
313 Direct(Socket),
314 Tls(Box<StreamOwned<ClientConnection, Socket>>),
315}
316
317impl Connection {
318 fn send_key_id(&mut self, key_id: &str) -> Result<()> {
319 writeln!(self, "{}", key_id)
320 .map_err(|io_err| map_io_to_socket_err("Failed to send key_id: ", io_err))?;
321 Ok(())
322 }
323
324 fn read_challenge(&mut self) -> Result<Vec<u8>> {
325 let mut buf = Vec::new();
326 let mut reader = BufReader::new(self);
327 reader.read_until(b'\n', &mut buf).map_err(|io_err| {
328 map_io_to_socket_err(
329 "Failed to read authentication challenge (timed out?): ",
330 io_err,
331 )
332 })?;
333 if buf.last().copied().unwrap_or(b'\0') != b'\n' {
334 return Err(if buf.is_empty() {
335 error::fmt!(
336 AuthError,
337 concat!(
338 "Did not receive auth challenge. ",
339 "Is the database configured to require ",
340 "authentication?"
341 )
342 )
343 } else {
344 error::fmt!(AuthError, "Received incomplete auth challenge: {:?}", buf)
345 });
346 }
347 buf.pop(); Ok(buf)
349 }
350
351 fn authenticate(&mut self, auth: &EcdsaAuthParams) -> Result<()> {
352 if auth.key_id.contains('\n') {
353 return Err(error::fmt!(
354 AuthError,
355 "Bad key id {:?}: Should not contain new-line char.",
356 auth.key_id
357 ));
358 }
359 let key_pair = parse_key_pair(auth)?;
360 self.send_key_id(auth.key_id.as_str())?;
361 let challenge = self.read_challenge()?;
362 let rng = SystemRandom::new();
363 let signature = key_pair
364 .sign(&rng, &challenge[..])
365 .map_err(|unspecified_err| {
366 error::fmt!(AuthError, "Failed to sign challenge: {}", unspecified_err)
367 })?;
368 let mut encoded_sig = Base64::encode_string(signature.as_ref());
369 encoded_sig.push('\n');
370 let buf = encoded_sig.as_bytes();
371 if let Err(io_err) = self.write_all(buf) {
372 return Err(map_io_to_socket_err(
373 "Could not send signed challenge: ",
374 io_err,
375 ));
376 }
377 Ok(())
378 }
379}
380
381enum ProtocolHandler {
382 Socket(Connection),
383
384 #[cfg(feature = "ilp-over-http")]
385 Http(HttpHandlerState),
386}
387
388impl io::Read for Connection {
389 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
390 match self {
391 Self::Direct(sock) => sock.read(buf),
392 Self::Tls(stream) => stream.read(buf),
393 }
394 }
395}
396
397impl io::Write for Connection {
398 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
399 match self {
400 Self::Direct(sock) => sock.write(buf),
401 Self::Tls(stream) => stream.write(buf),
402 }
403 }
404
405 fn flush(&mut self) -> io::Result<()> {
406 match self {
407 Self::Direct(sock) => sock.flush(),
408 Self::Tls(stream) => stream.flush(),
409 }
410 }
411}
412
413#[derive(Debug, Copy, Clone, PartialEq)]
414enum OpCase {
415 Init = Op::Table as isize,
416 TableWritten = Op::Symbol as isize | Op::Column as isize,
417 SymbolWritten = Op::Symbol as isize | Op::Column as isize | Op::At as isize,
418 ColumnWritten = Op::Column as isize | Op::At as isize,
419 MayFlushOrTable = Op::Flush as isize | Op::Table as isize,
420}
421
422impl OpCase {
423 fn next_op_descr(self) -> &'static str {
424 match self {
425 OpCase::Init => "should have called `table` instead",
426 OpCase::TableWritten => "should have called `symbol` or `column` instead",
427 OpCase::SymbolWritten => "should have called `symbol`, `column` or `at` instead",
428 OpCase::ColumnWritten => "should have called `column` or `at` instead",
429 OpCase::MayFlushOrTable => "should have called `flush` or `table` instead",
430 }
431 }
432}
433
434#[derive(Debug, Clone, Copy)]
437struct BufferState {
438 op_case: OpCase,
439 row_count: usize,
440 first_table_len: Option<NonZeroUsize>,
441 transactional: bool,
442}
443
444impl BufferState {
445 fn new() -> Self {
446 Self {
447 op_case: OpCase::Init,
448 row_count: 0,
449 first_table_len: None,
450 transactional: true,
451 }
452 }
453}
454
455#[derive(Debug, Clone)]
541pub struct Buffer {
542 output: String,
543 state: BufferState,
544 marker: Option<(usize, BufferState)>,
545 max_name_len: usize,
546}
547
548impl Buffer {
549 pub fn new() -> Self {
552 Self {
553 output: String::new(),
554 state: BufferState::new(),
555 marker: None,
556 max_name_len: 127,
557 }
558 }
559
560 pub fn with_max_name_len(max_name_len: usize) -> Self {
568 let mut buf = Self::new();
569 buf.max_name_len = max_name_len;
570 buf
571 }
572
573 pub fn reserve(&mut self, additional: usize) {
578 self.output.reserve(additional);
579 }
580
581 pub fn len(&self) -> usize {
583 self.output.len()
584 }
585
586 pub fn row_count(&self) -> usize {
588 self.state.row_count
589 }
590
591 pub fn transactional(&self) -> bool {
595 self.state.transactional
596 }
597
598 pub fn is_empty(&self) -> bool {
599 self.output.is_empty()
600 }
601
602 pub fn capacity(&self) -> usize {
604 self.output.capacity()
605 }
606
607 pub fn as_str(&self) -> &str {
609 &self.output
610 }
611
612 pub fn set_marker(&mut self) -> Result<()> {
619 if (self.state.op_case as isize & Op::Table as isize) == 0 {
620 return Err(error::fmt!(
621 InvalidApiCall,
622 concat!(
623 "Can't set the marker whilst constructing a line. ",
624 "A marker may only be set on an empty buffer or after ",
625 "`at` or `at_now` is called."
626 )
627 ));
628 }
629 self.marker = Some((self.output.len(), self.state));
630 Ok(())
631 }
632
633 pub fn rewind_to_marker(&mut self) -> Result<()> {
638 if let Some((position, state)) = self.marker.take() {
639 self.output.truncate(position);
640 self.state = state;
641 Ok(())
642 } else {
643 Err(error::fmt!(
644 InvalidApiCall,
645 "Can't rewind to the marker: No marker set."
646 ))
647 }
648 }
649
650 pub fn clear_marker(&mut self) {
655 self.marker = None;
656 }
657
658 pub fn clear(&mut self) {
661 self.output.clear();
662 self.state = BufferState::new();
663 self.marker = None;
664 }
665
666 #[inline(always)]
668 fn check_op(&self, op: Op) -> Result<()> {
669 if (self.state.op_case as isize & op as isize) > 0 {
670 Ok(())
671 } else {
672 Err(error::fmt!(
673 InvalidApiCall,
674 "State error: Bad call to `{}`, {}.",
675 op.descr(),
676 self.state.op_case.next_op_descr()
677 ))
678 }
679 }
680
681 #[inline(always)]
682 fn validate_max_name_len(&self, name: &str) -> Result<()> {
683 if name.len() > self.max_name_len {
684 return Err(error::fmt!(
685 InvalidName,
686 "Bad name: {:?}: Too long (max {} characters)",
687 name,
688 self.max_name_len
689 ));
690 }
691 Ok(())
692 }
693
694 pub fn table<'a, N>(&mut self, name: N) -> Result<&mut Self>
721 where
722 N: TryInto<TableName<'a>>,
723 Error: From<N::Error>,
724 {
725 let name: TableName<'a> = name.try_into()?;
726 self.validate_max_name_len(name.name)?;
727 self.check_op(Op::Table)?;
728 let table_begin = self.output.len();
729 write_escaped_unquoted(&mut self.output, name.name);
730 let table_end = self.output.len();
731 self.state.op_case = OpCase::TableWritten;
732
733 if let Some(first_table_len) = &self.state.first_table_len {
735 let first_table = &self.output[0..(first_table_len.get())];
736 let this_table = &self.output[table_begin..table_end];
737 if first_table != this_table {
738 self.state.transactional = false;
739 }
740 } else {
741 debug_assert!(table_begin == 0);
742
743 let first_table_len = NonZeroUsize::new(table_end);
748
749 debug_assert!(first_table_len.is_some());
751
752 self.state.first_table_len = first_table_len;
753 }
754 Ok(self)
755 }
756
757 pub fn symbol<'a, N, S>(&mut self, name: N, value: S) -> Result<&mut Self>
802 where
803 N: TryInto<ColumnName<'a>>,
804 S: AsRef<str>,
805 Error: From<N::Error>,
806 {
807 let name: ColumnName<'a> = name.try_into()?;
808 self.validate_max_name_len(name.name)?;
809 self.check_op(Op::Symbol)?;
810 self.output.push(',');
811 write_escaped_unquoted(&mut self.output, name.name);
812 self.output.push('=');
813 write_escaped_unquoted(&mut self.output, value.as_ref());
814 self.state.op_case = OpCase::SymbolWritten;
815 Ok(self)
816 }
817
818 fn write_column_key<'a, N>(&mut self, name: N) -> Result<&mut Self>
819 where
820 N: TryInto<ColumnName<'a>>,
821 Error: From<N::Error>,
822 {
823 let name: ColumnName<'a> = name.try_into()?;
824 self.validate_max_name_len(name.name)?;
825 self.check_op(Op::Column)?;
826 self.output
827 .push(if (self.state.op_case as isize & Op::Symbol as isize) > 0 {
828 ' '
829 } else {
830 ','
831 });
832 write_escaped_unquoted(&mut self.output, name.name);
833 self.output.push('=');
834 self.state.op_case = OpCase::ColumnWritten;
835 Ok(self)
836 }
837
838 pub fn column_bool<'a, N>(&mut self, name: N, value: bool) -> Result<&mut Self>
867 where
868 N: TryInto<ColumnName<'a>>,
869 Error: From<N::Error>,
870 {
871 self.write_column_key(name)?;
872 self.output.push(if value { 't' } else { 'f' });
873 Ok(self)
874 }
875
876 pub fn column_i64<'a, N>(&mut self, name: N, value: i64) -> Result<&mut Self>
905 where
906 N: TryInto<ColumnName<'a>>,
907 Error: From<N::Error>,
908 {
909 self.write_column_key(name)?;
910 let mut buf = itoa::Buffer::new();
911 let printed = buf.format(value);
912 self.output.push_str(printed);
913 self.output.push('i');
914 Ok(self)
915 }
916
917 pub fn column_f64<'a, N>(&mut self, name: N, value: f64) -> Result<&mut Self>
946 where
947 N: TryInto<ColumnName<'a>>,
948 Error: From<N::Error>,
949 {
950 self.write_column_key(name)?;
951 let mut ser = F64Serializer::new(value);
952 self.output.push_str(ser.as_str());
953 Ok(self)
954 }
955
956 pub fn column_str<'a, N, S>(&mut self, name: N, value: S) -> Result<&mut Self>
999 where
1000 N: TryInto<ColumnName<'a>>,
1001 S: AsRef<str>,
1002 Error: From<N::Error>,
1003 {
1004 self.write_column_key(name)?;
1005 write_escaped_quoted(&mut self.output, value.as_ref());
1006 Ok(self)
1007 }
1008
1009 pub fn column_ts<'a, N, T>(&mut self, name: N, value: T) -> Result<&mut Self>
1062 where
1063 N: TryInto<ColumnName<'a>>,
1064 T: TryInto<Timestamp>,
1065 Error: From<N::Error>,
1066 Error: From<T::Error>,
1067 {
1068 self.write_column_key(name)?;
1069 let timestamp: Timestamp = value.try_into()?;
1070 let timestamp: TimestampMicros = timestamp.try_into()?;
1071 let mut buf = itoa::Buffer::new();
1072 let printed = buf.format(timestamp.as_i64());
1073 self.output.push_str(printed);
1074 self.output.push('t');
1075 Ok(self)
1076 }
1077
1078 pub fn at<T>(&mut self, timestamp: T) -> Result<()>
1115 where
1116 T: TryInto<Timestamp>,
1117 Error: From<T::Error>,
1118 {
1119 self.check_op(Op::At)?;
1120 let timestamp: Timestamp = timestamp.try_into()?;
1121
1122 let timestamp: Result<TimestampNanos> = timestamp.try_into();
1124 let timestamp: TimestampNanos = timestamp?;
1125
1126 let epoch_nanos = timestamp.as_i64();
1127 if epoch_nanos < 0 {
1128 return Err(error::fmt!(
1129 InvalidTimestamp,
1130 "Timestamp {} is negative. It must be >= 0.",
1131 epoch_nanos
1132 ));
1133 }
1134 let mut buf = itoa::Buffer::new();
1135 let printed = buf.format(epoch_nanos);
1136 self.output.push(' ');
1137 self.output.push_str(printed);
1138 self.output.push('\n');
1139 self.state.op_case = OpCase::MayFlushOrTable;
1140 self.state.row_count += 1;
1141 Ok(())
1142 }
1143
1144 pub fn at_now(&mut self) -> Result<()> {
1173 self.check_op(Op::At)?;
1174 self.output.push('\n');
1175 self.state.op_case = OpCase::MayFlushOrTable;
1176 self.state.row_count += 1;
1177 Ok(())
1178 }
1179}
1180
1181impl Default for Buffer {
1182 fn default() -> Self {
1183 Self::new()
1184 }
1185}
1186
1187pub struct Sender {
1193 descr: String,
1194 handler: ProtocolHandler,
1195 connected: bool,
1196 max_buf_size: usize,
1197}
1198
1199impl std::fmt::Debug for Sender {
1200 fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
1201 f.write_str(self.descr.as_str())
1202 }
1203}
1204
/// Credentials for the ECDSA challenge/response authentication performed by
/// `Connection::authenticate`.
#[derive(PartialEq, Debug, Clone)]
struct EcdsaAuthParams {
    /// Identifier sent to the server as the first line of the exchange.
    key_id: String,
    /// Private key material, consumed by `parse_key_pair`.
    priv_key: String,
    /// Public key X coordinate, consumed by `parse_key_pair`.
    pub_key_x: String,
    /// Public key Y coordinate, consumed by `parse_key_pair`.
    pub_key_y: String,
}
1212
1213#[derive(PartialEq, Debug, Clone)]
1214enum AuthParams {
1215 Ecdsa(EcdsaAuthParams),
1216
1217 #[cfg(feature = "ilp-over-http")]
1218 Basic(BasicAuthParams),
1219
1220 #[cfg(feature = "ilp-over-http")]
1221 Token(TokenAuthParams),
1222}
1223
/// Where the root certificates used to verify the server's TLS certificate
/// come from. Which variants exist depends on the enabled cargo features.
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum CertificateAuthority {
    /// The roots bundled in the `webpki-roots` crate.
    #[cfg(feature = "tls-webpki-certs")]
    WebpkiRoots,

    /// The roots loaded from the operating system's certificate store.
    #[cfg(feature = "tls-native-certs")]
    OsRoots,

    /// Both the `webpki-roots` bundle and the operating system's roots.
    #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
    WebpkiAndOsRoots,

    /// The certificates in the PEM file named by the `tls_roots` setting.
    PemFile,
}
1244
/// A port (or service name), held as a string so that it can be created
/// from text or from a numeric port.
pub struct Port(String);

/// An owned string is stored unchanged.
impl From<String> for Port {
    fn from(s: String) -> Self {
        Self(s)
    }
}

/// A string slice is copied into an owned string.
impl From<&str> for Port {
    fn from(s: &str) -> Self {
        Self(String::from(s))
    }
}

/// A numeric port is stored as its decimal text.
impl From<u16> for Port {
    fn from(p: u16) -> Self {
        Self(p.to_string())
    }
}
1283
/// Support for the `insecure-skip-verify` feature.
#[cfg(feature = "insecure-skip-verify")]
mod danger {
    use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
    use rustls::{DigitallySignedStruct, Error, SignatureScheme};
    use rustls_pki_types::{CertificateDer, ServerName, UnixTime};

    /// A certificate verifier that accepts every server certificate and every
    /// handshake signature without checking anything.
    #[derive(Debug)]
    pub struct NoCertificateVerification {}

    impl ServerCertVerifier for NoCertificateVerification {
        /// Unconditionally reports the certificate as verified.
        fn verify_server_cert(
            &self,
            _end_entity: &CertificateDer<'_>,
            _intermediates: &[CertificateDer<'_>],
            _server_name: &ServerName<'_>,
            _ocsp_response: &[u8],
            _now: UnixTime,
        ) -> Result<ServerCertVerified, Error> {
            let verified = ServerCertVerified::assertion();
            Ok(verified)
        }

        /// Unconditionally reports the TLS 1.2 signature as valid.
        fn verify_tls12_signature(
            &self,
            _message: &[u8],
            _cert: &CertificateDer<'_>,
            _dss: &DigitallySignedStruct,
        ) -> Result<HandshakeSignatureValid, Error> {
            let valid = HandshakeSignatureValid::assertion();
            Ok(valid)
        }

        /// Unconditionally reports the TLS 1.3 signature as valid.
        fn verify_tls13_signature(
            &self,
            _message: &[u8],
            _cert: &CertificateDer<'_>,
            _dss: &DigitallySignedStruct,
        ) -> Result<HandshakeSignatureValid, Error> {
            let valid = HandshakeSignatureValid::assertion();
            Ok(valid)
        }

        /// Advertises the signature schemes of the default `ring` provider.
        fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
            let provider = rustls::crypto::ring::default_provider();
            provider
                .signature_verification_algorithms
                .supported_schemes()
        }
    }
}
1330
/// Adds every root certificate bundled in `webpki_roots` to `root_store`.
#[cfg(feature = "tls-webpki-certs")]
fn add_webpki_roots(root_store: &mut RootCertStore) {
    let bundled = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
    root_store.roots.extend(bundled)
}
1337
/// Adds the operating system's root certificates to `root_store`.
///
/// # Errors
///
/// Returns `TlsError` if the native certificates cannot be loaded, or if
/// certificates were found but none of them could be parsed.
#[cfg(feature = "tls-native-certs")]
fn add_os_roots(root_store: &mut RootCertStore) -> Result<()> {
    let os_certs = rustls_native_certs::load_native_certs().map_err(|io_err| {
        error::fmt!(
            TlsError,
            "Could not load OS native TLS certificates: {}",
            io_err
        )
    })?;

    let (valid_count, invalid_count) = root_store.add_parsable_certificates(os_certs);
    // An empty OS store is tolerated; a store where every entry is unusable
    // is not.
    let all_invalid = valid_count == 0 && invalid_count > 0;
    if all_invalid {
        Err(error::fmt!(
            TlsError,
            "No valid certificates found in native root store ({} found but were invalid)",
            invalid_count
        ))
    } else {
        Ok(())
    }
}
1358
1359fn configure_tls(
1360 tls_enabled: bool,
1361 tls_verify: bool,
1362 tls_ca: CertificateAuthority,
1363 tls_roots: &Option<PathBuf>,
1364) -> Result<Option<Arc<rustls::ClientConfig>>> {
1365 if !tls_enabled {
1366 return Ok(None);
1367 }
1368
1369 let mut root_store = RootCertStore::empty();
1370 if tls_verify {
1371 match (tls_ca, tls_roots) {
1372 #[cfg(feature = "tls-webpki-certs")]
1373 (CertificateAuthority::WebpkiRoots, None) => {
1374 add_webpki_roots(&mut root_store);
1375 }
1376
1377 #[cfg(feature = "tls-webpki-certs")]
1378 (CertificateAuthority::WebpkiRoots, Some(_)) => {
1379 return Err(error::fmt!(ConfigError, "Config parameter \"tls_roots\" must be unset when \"tls_ca\" is set to \"webpki_roots\"."));
1380 }
1381
1382 #[cfg(feature = "tls-native-certs")]
1383 (CertificateAuthority::OsRoots, None) => {
1384 add_os_roots(&mut root_store)?;
1385 }
1386
1387 #[cfg(feature = "tls-native-certs")]
1388 (CertificateAuthority::OsRoots, Some(_)) => {
1389 return Err(error::fmt!(ConfigError, "Config parameter \"tls_roots\" must be unset when \"tls_ca\" is set to \"os_roots\"."));
1390 }
1391
1392 #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
1393 (CertificateAuthority::WebpkiAndOsRoots, None) => {
1394 add_webpki_roots(&mut root_store);
1395 add_os_roots(&mut root_store)?;
1396 }
1397
1398 #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
1399 (CertificateAuthority::WebpkiAndOsRoots, Some(_)) => {
1400 return Err(error::fmt!(ConfigError, "Config parameter \"tls_roots\" must be unset when \"tls_ca\" is set to \"webpki_and_os_roots\"."));
1401 }
1402
1403 (CertificateAuthority::PemFile, Some(ca_file)) => {
1404 let certfile = std::fs::File::open(ca_file).map_err(|io_err| {
1405 error::fmt!(
1406 TlsError,
1407 concat!(
1408 "Could not open tls_roots certificate authority ",
1409 "file from path {:?}: {}"
1410 ),
1411 ca_file,
1412 io_err
1413 )
1414 })?;
1415 let mut reader = BufReader::new(certfile);
1416 let der_certs = rustls_pemfile::certs(&mut reader)
1417 .collect::<std::result::Result<Vec<_>, _>>()
1418 .map_err(|io_err| {
1419 error::fmt!(
1420 TlsError,
1421 concat!(
1422 "Could not read certificate authority ",
1423 "file from path {:?}: {}"
1424 ),
1425 ca_file,
1426 io_err
1427 )
1428 })?;
1429 root_store.add_parsable_certificates(der_certs);
1430 }
1431
1432 (CertificateAuthority::PemFile, None) => {
1433 return Err(error::fmt!(ConfigError, "Config parameter \"tls_roots\" is required when \"tls_ca\" is set to \"pem_file\"."));
1434 }
1435 }
1436 }
1437
1438 let mut config = rustls::ClientConfig::builder()
1439 .with_root_certificates(root_store)
1440 .with_no_client_auth();
1441
1442 config.key_log = Arc::new(rustls::KeyLogFile::new());
1445
1446 #[cfg(feature = "insecure-skip-verify")]
1447 if !tls_verify {
1448 config
1449 .dangerous()
1450 .set_certificate_verifier(Arc::new(danger::NoCertificateVerification {}));
1451 }
1452
1453 Ok(Some(Arc::new(config)))
1454}
1455
1456fn validate_auto_flush_params(params: &HashMap<String, String>) -> Result<()> {
1457 if let Some(auto_flush) = params.get("auto_flush") {
1458 if auto_flush.as_str() != "off" {
1459 return Err(error::fmt!(
1460 ConfigError,
1461 "Invalid auto_flush value '{auto_flush}'. This client does not \
1462 support auto-flush, so the only accepted value is 'off'"
1463 ));
1464 }
1465 }
1466
1467 for ¶m in ["auto_flush_rows", "auto_flush_bytes"].iter() {
1468 if params.contains_key(param) {
1469 return Err(error::fmt!(
1470 ConfigError,
1471 "Invalid configuration parameter {:?}. This client does not support auto-flush",
1472 param
1473 ));
1474 }
1475 }
1476 Ok(())
1477}
1478
/// The wire protocol a sender uses. The HTTP variants exist only with the
/// `ilp-over-http` feature.
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum Protocol {
    /// Plain TCP (`tcp` schema).
    Tcp,

    /// TCP with TLS (`tcps` schema).
    Tcps,

    /// Plain HTTP (`http` schema).
    #[cfg(feature = "ilp-over-http")]
    Http,

    /// HTTP with TLS (`https` schema).
    #[cfg(feature = "ilp-over-http")]
    Https,
}
1496
1497impl Display for Protocol {
1498 fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
1499 f.write_str(self.schema())
1500 }
1501}
1502
1503impl Protocol {
1504 fn default_port(&self) -> &str {
1505 match self {
1506 Protocol::Tcp | Protocol::Tcps => "9009",
1507 #[cfg(feature = "ilp-over-http")]
1508 Protocol::Http | Protocol::Https => "9000",
1509 }
1510 }
1511
1512 fn tls_enabled(&self) -> bool {
1513 match self {
1514 Protocol::Tcp => false,
1515 Protocol::Tcps => true,
1516 #[cfg(feature = "ilp-over-http")]
1517 Protocol::Http => false,
1518 #[cfg(feature = "ilp-over-http")]
1519 Protocol::Https => true,
1520 }
1521 }
1522
1523 fn is_tcpx(&self) -> bool {
1524 match self {
1525 Protocol::Tcp => true,
1526 Protocol::Tcps => true,
1527 #[cfg(feature = "ilp-over-http")]
1528 Protocol::Http => false,
1529 #[cfg(feature = "ilp-over-http")]
1530 Protocol::Https => false,
1531 }
1532 }
1533
1534 #[cfg(feature = "ilp-over-http")]
1535 fn is_httpx(&self) -> bool {
1536 match self {
1537 Protocol::Tcp => false,
1538 Protocol::Tcps => false,
1539 Protocol::Http => true,
1540 Protocol::Https => true,
1541 }
1542 }
1543
1544 fn schema(&self) -> &str {
1545 match self {
1546 Protocol::Tcp => "tcp",
1547 Protocol::Tcps => "tcps",
1548 #[cfg(feature = "ilp-over-http")]
1549 Protocol::Http => "http",
1550 #[cfg(feature = "ilp-over-http")]
1551 Protocol::Https => "https",
1552 }
1553 }
1554
1555 fn from_schema(schema: &str) -> Result<Self> {
1556 match schema {
1557 "tcp" => Ok(Protocol::Tcp),
1558 "tcps" => Ok(Protocol::Tcps),
1559 #[cfg(feature = "ilp-over-http")]
1560 "http" => Ok(Protocol::Http),
1561 #[cfg(feature = "ilp-over-http")]
1562 "https" => Ok(Protocol::Https),
1563 _ => Err(error::fmt!(ConfigError, "Unsupported protocol: {}", schema)),
1564 }
1565 }
1566}
1567
1568#[cfg_attr(
1574 feature = "ilp-over-http",
1575 doc = r##"
1576```no_run
1577# use questdb::Result;
1578use questdb::ingress::{Protocol, SenderBuilder};
1579# fn main() -> Result<()> {
1580let mut sender = SenderBuilder::new(Protocol::Http, "localhost", 9009).build()?;
1581# Ok(())
1582# }
1583```
1584"##
1585)]
1586#[derive(Debug, Clone)]
1619pub struct SenderBuilder {
1620 protocol: Protocol,
1621 host: ConfigSetting<String>,
1622 port: ConfigSetting<String>,
1623 net_interface: ConfigSetting<Option<String>>,
1624 max_buf_size: ConfigSetting<usize>,
1625 auth_timeout: ConfigSetting<Duration>,
1626 username: ConfigSetting<Option<String>>,
1627 password: ConfigSetting<Option<String>>,
1628 token: ConfigSetting<Option<String>>,
1629 token_x: ConfigSetting<Option<String>>,
1630 token_y: ConfigSetting<Option<String>>,
1631
1632 #[cfg(feature = "insecure-skip-verify")]
1633 tls_verify: ConfigSetting<bool>,
1634
1635 tls_ca: ConfigSetting<CertificateAuthority>,
1636 tls_roots: ConfigSetting<Option<PathBuf>>,
1637
1638 #[cfg(feature = "ilp-over-http")]
1639 http: Option<HttpConfig>,
1640}
1641
1642impl SenderBuilder {
1643 pub fn from_conf<T: AsRef<str>>(conf: T) -> Result<Self> {
1670 let conf = conf.as_ref();
1671 let conf = questdb_confstr::parse_conf_str(conf)
1672 .map_err(|e| error::fmt!(ConfigError, "Config parse error: {}", e))?;
1673 let service = conf.service();
1674 let params = conf.params();
1675
1676 let protocol = Protocol::from_schema(service)?;
1677
1678 let Some(addr) = params.get("addr") else {
1679 return Err(error::fmt!(
1680 ConfigError,
1681 "Missing \"addr\" parameter in config string"
1682 ));
1683 };
1684 let (host, port) = match addr.split_once(':') {
1685 Some((h, p)) => (h, p),
1686 None => (addr.as_str(), protocol.default_port()),
1687 };
1688 let mut builder = SenderBuilder::new(protocol, host, port);
1689
1690 validate_auto_flush_params(params)?;
1691
1692 for (key, val) in params.iter().map(|(k, v)| (k.as_str(), v.as_str())) {
1693 builder = match key {
1694 "username" => builder.username(val)?,
1695 "password" => builder.password(val)?,
1696 "token" => builder.token(val)?,
1697 "token_x" => builder.token_x(val)?,
1698 "token_y" => builder.token_y(val)?,
1699 "bind_interface" => builder.bind_interface(val)?,
1700
1701 "init_buf_size" => {
1702 return Err(error::fmt!(
1703 ConfigError,
1704 "\"init_buf_size\" is not supported in config string"
1705 ))
1706 }
1707
1708 "max_buf_size" => builder.max_buf_size(parse_conf_value(key, val)?)?,
1709
1710 "auth_timeout" => {
1711 builder.auth_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
1712 }
1713
1714 "tls_verify" => {
1715 let verify = match val {
1716 "on" => true,
1717 "unsafe_off" => false,
1718 _ => {
1719 return Err(error::fmt!(
1720 ConfigError,
1721 r##"Config parameter "tls_verify" must be either "on" or "unsafe_off".'"##,
1722 ))
1723 }
1724 };
1725
1726 #[cfg(not(feature = "insecure-skip-verify"))]
1727 {
1728 if !verify {
1729 return Err(error::fmt!(
1730 ConfigError,
1731 r##"The "insecure-skip-verify" feature is not enabled, so "tls_verify=unsafe_off" is not supported"##,
1732 ));
1733 }
1734 builder
1735 }
1736
1737 #[cfg(feature = "insecure-skip-verify")]
1738 builder.tls_verify(verify)?
1739 }
1740
1741 "tls_ca" => {
1742 let ca = match val {
1743 #[cfg(feature = "tls-webpki-certs")]
1744 "webpki_roots" => CertificateAuthority::WebpkiRoots,
1745
1746 #[cfg(not(feature = "tls-webpki-certs"))]
1747 "webpki_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=webpki_roots\" requires the \"tls-webpki-certs\" feature")),
1748
1749 #[cfg(feature = "tls-native-certs")]
1750 "os_roots" => CertificateAuthority::OsRoots,
1751
1752 #[cfg(not(feature = "tls-native-certs"))]
1753 "os_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=os_roots\" requires the \"tls-native-certs\" feature")),
1754
1755 #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
1756 "webpki_and_os_roots" => CertificateAuthority::WebpkiAndOsRoots,
1757
1758 #[cfg(not(all(feature = "tls-webpki-certs", feature = "tls-native-certs")))]
1759 "webpki_and_os_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=webpki_and_os_roots\" requires both the \"tls-webpki-certs\" and \"tls-native-certs\" features")),
1760
1761 _ => return Err(error::fmt!(ConfigError, "Invalid value {val:?} for \"tls_ca\"")),
1762 };
1763 builder.tls_ca(ca)?
1764 }
1765
1766 "tls_roots" => {
1767 let path = PathBuf::from_str(val).map_err(|e| {
1768 error::fmt!(
1769 ConfigError,
1770 "Invalid path {:?} for \"tls_roots\": {}",
1771 val,
1772 e
1773 )
1774 })?;
1775 builder.tls_roots(path)?
1776 }
1777
1778 "tls_roots_password" => {
1779 return Err(error::fmt!(
1780 ConfigError,
1781 "\"tls_roots_password\" is not supported."
1782 ))
1783 }
1784
1785 #[cfg(feature = "ilp-over-http")]
1786 "request_min_throughput" => {
1787 builder.request_min_throughput(parse_conf_value(key, val)?)?
1788 }
1789
1790 #[cfg(feature = "ilp-over-http")]
1791 "request_timeout" => {
1792 builder.request_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
1793 }
1794
1795 #[cfg(feature = "ilp-over-http")]
1796 "retry_timeout" => {
1797 builder.retry_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
1798 }
1799 _ => builder,
1804 };
1805 }
1806
1807 Ok(builder)
1808 }
1809
1810 pub fn from_env() -> Result<Self> {
1815 let conf = std::env::var("QDB_CLIENT_CONF").map_err(|_| {
1816 error::fmt!(ConfigError, "Environment variable QDB_CLIENT_CONF not set.")
1817 })?;
1818 Self::from_conf(conf)
1819 }
1820
1821 pub fn new<H: Into<String>, P: Into<Port>>(protocol: Protocol, host: H, port: P) -> Self {
1835 let host = host.into();
1836 let port: Port = port.into();
1837 let port = port.0;
1838
1839 #[cfg(feature = "tls-webpki-certs")]
1840 let tls_ca = CertificateAuthority::WebpkiRoots;
1841
1842 #[cfg(all(not(feature = "tls-webpki-certs"), feature = "tls-native-certs"))]
1843 let tls_ca = CertificateAuthority::OsRoots;
1844
1845 #[cfg(not(any(feature = "tls-webpki-certs", feature = "tls-native-certs")))]
1846 let tls_ca = CertificateAuthority::PemFile;
1847
1848 Self {
1849 protocol,
1850 host: ConfigSetting::new_specified(host),
1851 port: ConfigSetting::new_specified(port),
1852 net_interface: ConfigSetting::new_default(None),
1853 max_buf_size: ConfigSetting::new_default(100 * 1024 * 1024),
1854 auth_timeout: ConfigSetting::new_default(Duration::from_secs(15)),
1855 username: ConfigSetting::new_default(None),
1856 password: ConfigSetting::new_default(None),
1857 token: ConfigSetting::new_default(None),
1858 token_x: ConfigSetting::new_default(None),
1859 token_y: ConfigSetting::new_default(None),
1860
1861 #[cfg(feature = "insecure-skip-verify")]
1862 tls_verify: ConfigSetting::new_default(true),
1863
1864 tls_ca: ConfigSetting::new_default(tls_ca),
1865 tls_roots: ConfigSetting::new_default(None),
1866
1867 #[cfg(feature = "ilp-over-http")]
1868 http: if protocol.is_httpx() {
1869 Some(HttpConfig::default())
1870 } else {
1871 None
1872 },
1873 }
1874 }
1875
1876 pub fn bind_interface<I: Into<String>>(mut self, addr: I) -> Result<Self> {
1882 self.ensure_is_tcpx("bind_interface")?;
1883 self.net_interface
1884 .set_specified("bind_interface", Some(validate_value(addr.into())?))?;
1885 Ok(self)
1886 }
1887
1888 pub fn username(mut self, username: &str) -> Result<Self> {
1897 self.username
1898 .set_specified("username", Some(validate_value(username.to_string())?))?;
1899 Ok(self)
1900 }
1901
1902 pub fn password(mut self, password: &str) -> Result<Self> {
1905 self.password
1906 .set_specified("password", Some(validate_value(password.to_string())?))?;
1907 Ok(self)
1908 }
1909
1910 pub fn token(mut self, token: &str) -> Result<Self> {
1913 self.token
1914 .set_specified("token", Some(validate_value(token.to_string())?))?;
1915 Ok(self)
1916 }
1917
1918 pub fn token_x(mut self, token_x: &str) -> Result<Self> {
1920 self.token_x
1921 .set_specified("token_x", Some(validate_value(token_x.to_string())?))?;
1922 Ok(self)
1923 }
1924
1925 pub fn token_y(mut self, token_y: &str) -> Result<Self> {
1927 self.token_y
1928 .set_specified("token_y", Some(validate_value(token_y.to_string())?))?;
1929 Ok(self)
1930 }
1931
1932 pub fn auth_timeout(mut self, value: Duration) -> Result<Self> {
1936 self.auth_timeout.set_specified("auth_timeout", value)?;
1937 Ok(self)
1938 }
1939
1940 pub fn ensure_tls_enabled(&self, property: &str) -> Result<()> {
1942 if !self.protocol.tls_enabled() {
1943 return Err(error::fmt!(
1944 ConfigError,
1945 "Cannot set {property:?}: TLS is not supported for protocol {}",
1946 self.protocol
1947 ));
1948 }
1949 Ok(())
1950 }
1951
1952 #[cfg(feature = "insecure-skip-verify")]
1958 pub fn tls_verify(mut self, verify: bool) -> Result<Self> {
1959 self.ensure_tls_enabled("tls_verify")?;
1960 self.tls_verify.set_specified("tls_verify", verify)?;
1961 Ok(self)
1962 }
1963
1964 pub fn tls_ca(mut self, ca: CertificateAuthority) -> Result<Self> {
1967 self.ensure_tls_enabled("tls_ca")?;
1968 self.tls_ca.set_specified("tls_ca", ca)?;
1969 Ok(self)
1970 }
1971
1972 pub fn tls_roots<P: Into<PathBuf>>(self, path: P) -> Result<Self> {
1978 let mut builder = self.tls_ca(CertificateAuthority::PemFile)?;
1979 let path = path.into();
1980 let _file = std::fs::File::open(&path).map_err(|io_err| {
1982 error::fmt!(
1983 ConfigError,
1984 "Could not open root certificate file from path {:?}: {}",
1985 path,
1986 io_err
1987 )
1988 })?;
1989 builder.tls_roots.set_specified("tls_roots", Some(path))?;
1990 Ok(builder)
1991 }
1992
1993 pub fn max_buf_size(mut self, value: usize) -> Result<Self> {
1996 let min = 1024;
1997 if value < min {
1998 return Err(error::fmt!(
1999 ConfigError,
2000 "max_buf_size\" must be at least {min} bytes."
2001 ));
2002 }
2003 self.max_buf_size.set_specified("max_buf_size", value)?;
2004 Ok(self)
2005 }
2006
2007 #[cfg(feature = "ilp-over-http")]
2008 pub fn retry_timeout(mut self, value: Duration) -> Result<Self> {
2011 if let Some(http) = &mut self.http {
2012 http.retry_timeout.set_specified("retry_timeout", value)?;
2013 } else {
2014 return Err(error::fmt!(
2015 ConfigError,
2016 "retry_timeout is supported only in ILP over HTTP."
2017 ));
2018 }
2019 Ok(self)
2020 }
2021
2022 #[cfg(feature = "ilp-over-http")]
2023 pub fn request_min_throughput(mut self, value: u64) -> Result<Self> {
2033 if let Some(http) = &mut self.http {
2034 http.request_min_throughput
2035 .set_specified("request_min_throughput", value)?;
2036 } else {
2037 return Err(error::fmt!(
2038 ConfigError,
2039 "\"request_min_throughput\" is supported only in ILP over HTTP."
2040 ));
2041 }
2042 Ok(self)
2043 }
2044
2045 #[cfg(feature = "ilp-over-http")]
2046 pub fn request_timeout(mut self, value: Duration) -> Result<Self> {
2051 if let Some(http) = &mut self.http {
2052 if value.is_zero() {
2053 return Err(error::fmt!(
2054 ConfigError,
2055 "\"request_timeout\" must be greater than 0."
2056 ));
2057 }
2058 http.request_timeout
2059 .set_specified("request_timeout", value)?;
2060 } else {
2061 return Err(error::fmt!(
2062 ConfigError,
2063 "\"request_timeout\" is supported only in ILP over HTTP."
2064 ));
2065 }
2066 Ok(self)
2067 }
2068
2069 #[cfg(feature = "ilp-over-http")]
2070 #[doc(hidden)]
2074 pub fn user_agent(mut self, value: &str) -> Result<Self> {
2075 let value = validate_value(value)?;
2076 if let Some(http) = &mut self.http {
2077 http.user_agent = value.to_string();
2078 }
2079 Ok(self)
2080 }
2081
    /// Open the TCP connection described by this builder and wrap it in a `ProtocolHandler::Socket`.
    ///
    /// Steps, in order: resolve the host/port, create and configure the
    /// socket, optionally bind it to the configured interface, connect, set
    /// the read timeout, optionally complete a TLS handshake, and finally
    /// authenticate if `auth` holds ECDSA parameters.
    ///
    /// # Errors
    ///
    /// Socket-level failures are mapped to `SocketError`, TLS failures to
    /// `TlsError`; errors from address resolution, `configure_tls` and
    /// `authenticate` are propagated as returned by those helpers.
    fn connect_tcp(&self, auth: &Option<AuthParams>) -> Result<ProtocolHandler> {
        let addr: SockAddr = gai::resolve_host_port(self.host.as_str(), self.port.as_str())?;
        // NOTE(review): the socket is always created as IPv4; presumably
        // `gai::resolve_host_port` only yields IPv4 addresses — confirm.
        let mut sock = Socket::new(Domain::IPV4, Type::STREAM, Some(SockProtocol::TCP))
            .map_err(|io_err| map_io_to_socket_err("Could not open TCP socket: ", io_err))?;

        // Socket options are applied before the optional bind and the connect below.
        sock.set_reuse_address(true)
            .map_err(|io_err| map_io_to_socket_err("Could not set SO_REUSEADDR: ", io_err))?;

        sock.set_linger(Some(Duration::from_secs(120)))
            .map_err(|io_err| map_io_to_socket_err("Could not set socket linger: ", io_err))?;
        sock.set_keepalive(true)
            .map_err(|io_err| map_io_to_socket_err("Could not set SO_KEEPALIVE: ", io_err))?;
        sock.set_nodelay(true)
            .map_err(|io_err| map_io_to_socket_err("Could not set TCP_NODELAY: ", io_err))?;
        // Bind to a specific local interface only when one was configured.
        if let Some(ref host) = self.net_interface.deref() {
            let bind_addr = gai::resolve_host(host.as_str())?;
            sock.bind(&bind_addr).map_err(|io_err| {
                map_io_to_socket_err(
                    &format!("Could not bind to interface address {:?}: ", host),
                    io_err,
                )
            })?;
        }
        sock.connect(&addr).map_err(|io_err| {
            let host_port = format!("{}:{}", self.host.deref(), *self.port);
            let prefix = format!("Could not connect to {:?}: ", host_port);
            map_io_to_socket_err(&prefix, io_err)
        })?;

        // The read timeout is installed before the TLS handshake and the
        // authentication exchange below, so both are bounded by `auth_timeout`.
        sock.set_read_timeout(Some(*self.auth_timeout))
            .map_err(|io_err| {
                map_io_to_socket_err("Failed to set read timeout on socket: ", io_err)
            })?;

        // Without the `insecure-skip-verify` feature, verification is always on.
        #[cfg(feature = "insecure-skip-verify")]
        let tls_verify = *self.tls_verify;

        #[cfg(not(feature = "insecure-skip-verify"))]
        let tls_verify = true;

        // `configure_tls` yields `Some(config)` when a TLS session is to be
        // established and `None` for a plain connection.
        let mut conn = match configure_tls(
            self.protocol.tls_enabled(),
            tls_verify,
            *self.tls_ca,
            self.tls_roots.deref(),
        )? {
            Some(tls_config) => {
                let server_name: ServerName = ServerName::try_from(self.host.as_str())
                    .map_err(|inv_dns_err| error::fmt!(TlsError, "Bad host: {}", inv_dns_err))?
                    .to_owned();
                let mut tls_conn =
                    ClientConnection::new(tls_config, server_name).map_err(|rustls_err| {
                        error::fmt!(TlsError, "Could not create TLS client: {}", rustls_err)
                    })?;
                // Drive the handshake until nothing is left to write and
                // handshaking has finished.
                while tls_conn.wants_write() || tls_conn.is_handshaking() {
                    tls_conn.complete_io(&mut sock).map_err(|io_err| {
                        // Both error kinds are reported as a handshake
                        // timeout; the read timeout set above is the likely source.
                        if (io_err.kind() == ErrorKind::TimedOut)
                            || (io_err.kind() == ErrorKind::WouldBlock)
                        {
                            error::fmt!(
                                TlsError,
                                concat!(
                                    "Failed to complete TLS handshake:",
                                    " Timed out waiting for server ",
                                    "response after {:?}."
                                ),
                                *self.auth_timeout
                            )
                        } else {
                            error::fmt!(TlsError, "Failed to complete TLS handshake: {}", io_err)
                        }
                    })?;
                }
                Connection::Tls(StreamOwned::new(tls_conn, sock).into())
            }
            None => Connection::Direct(sock),
        };

        // Only ECDSA parameters trigger an authentication exchange on the socket.
        if let Some(AuthParams::Ecdsa(auth)) = auth {
            conn.authenticate(auth)?;
        }

        Ok(ProtocolHandler::Socket(conn))
    }
2173
2174 fn build_auth(&self) -> Result<Option<AuthParams>> {
2175 match (
2176 self.protocol,
2177 self.username.deref(),
2178 self.password.deref(),
2179 self.token.deref(),
2180 self.token_x.deref(),
2181 self.token_y.deref(),
2182 ) {
2183 (_, None, None, None, None, None) => Ok(None),
2184 (
2185 protocol,
2186 Some(username),
2187 None,
2188 Some(token),
2189 Some(token_x),
2190 Some(token_y),
2191 ) if protocol.is_tcpx() => Ok(Some(AuthParams::Ecdsa(EcdsaAuthParams {
2192 key_id: username.to_string(),
2193 priv_key: token.to_string(),
2194 pub_key_x: token_x.to_string(),
2195 pub_key_y: token_y.to_string(),
2196 }))),
2197 (protocol, Some(_username), Some(_password), None, None, None)
2198 if protocol.is_tcpx() => {
2199 Err(error::fmt!(ConfigError,
2200 r##"The "basic_auth" setting can only be used with the ILP/HTTP protocol."##,
2201 ))
2202 }
2203 (protocol, None, None, Some(_token), None, None)
2204 if protocol.is_tcpx() => {
2205 Err(error::fmt!(ConfigError, "Token authentication only be used with the ILP/HTTP protocol."))
2206 }
2207 (protocol, _username, None, _token, _token_x, _token_y)
2208 if protocol.is_tcpx() => {
2209 Err(error::fmt!(ConfigError,
2210 r##"Incomplete ECDSA authentication parameters. Specify either all or none of: "username", "token", "token_x", "token_y"."##,
2211 ))
2212 }
2213 #[cfg(feature = "ilp-over-http")]
2214 (protocol, Some(username), Some(password), None, None, None)
2215 if protocol.is_httpx() => {
2216 Ok(Some(AuthParams::Basic(BasicAuthParams {
2217 username: username.to_string(),
2218 password: password.to_string(),
2219 })))
2220 }
2221 #[cfg(feature = "ilp-over-http")]
2222 (protocol, Some(_username), None, None, None, None)
2223 if protocol.is_httpx() => {
2224 Err(error::fmt!(ConfigError,
2225 r##"Basic authentication parameter "username" is present, but "password" is missing."##,
2226 ))
2227 }
2228 #[cfg(feature = "ilp-over-http")]
2229 (protocol, None, Some(_password), None, None, None)
2230 if protocol.is_httpx() => {
2231 Err(error::fmt!(ConfigError,
2232 r##"Basic authentication parameter "password" is present, but "username" is missing."##,
2233 ))
2234 }
2235 #[cfg(feature = "ilp-over-http")]
2236 (protocol, None, None, Some(token), None, None)
2237 if protocol.is_httpx() => {
2238 Ok(Some(AuthParams::Token(TokenAuthParams {
2239 token: token.to_string(),
2240 })))
2241 }
2242 #[cfg(feature = "ilp-over-http")]
2243 (
2244 protocol,
2245 Some(_username),
2246 None,
2247 Some(_token),
2248 Some(_token_x),
2249 Some(_token_y),
2250 ) if protocol.is_httpx() => {
2251 Err(error::fmt!(ConfigError, "ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."))
2252 }
2253 #[cfg(feature = "ilp-over-http")]
2254 (protocol, _username, _password, _token, None, None)
2255 if protocol.is_httpx() => {
2256 Err(error::fmt!(ConfigError,
2257 r##"Inconsistent HTTP authentication parameters. Specify either "username" and "password", or just "token"."##,
2258 ))
2259 }
2260 _ => {
2261 Err(error::fmt!(ConfigError,
2262 r##"Incomplete authentication parameters. Check "username", "password", "token", "token_x" and "token_y" parameters are set correctly."##,
2263 ))
2264 }
2265 }
2266 }
2267
2268 pub fn build(&self) -> Result<Sender> {
2275 let mut descr = format!("Sender[host={:?},port={:?},", self.host, self.port);
2276
2277 if self.protocol.tls_enabled() {
2278 write!(descr, "tls=enabled,").unwrap();
2279 } else {
2280 write!(descr, "tls=disabled,").unwrap();
2281 }
2282
2283 let auth = self.build_auth()?;
2284
2285 let handler = match self.protocol {
2286 Protocol::Tcp | Protocol::Tcps => self.connect_tcp(&auth)?,
2287 #[cfg(feature = "ilp-over-http")]
2288 Protocol::Http | Protocol::Https => {
2289 if self.net_interface.is_some() {
2290 return Err(error::fmt!(
2292 InvalidApiCall,
2293 "net_interface is not supported for ILP over HTTP."
2294 ));
2295 }
2296
2297 let http_config = self.http.as_ref().unwrap();
2298 let user_agent = http_config.user_agent.as_str();
2299 let agent_builder = ureq::AgentBuilder::new()
2300 .user_agent(user_agent)
2301 .no_delay(true);
2302
2303 #[cfg(feature = "insecure-skip-verify")]
2304 let tls_verify = *self.tls_verify;
2305
2306 #[cfg(not(feature = "insecure-skip-verify"))]
2307 let tls_verify = true;
2308
2309 let agent_builder = match configure_tls(
2310 self.protocol.tls_enabled(),
2311 tls_verify,
2312 *self.tls_ca,
2313 self.tls_roots.deref(),
2314 )? {
2315 Some(tls_config) => agent_builder.tls_config(tls_config),
2316 None => agent_builder,
2317 };
2318 let auth = match auth {
2319 Some(AuthParams::Basic(ref auth)) => Some(auth.to_header_string()),
2320 Some(AuthParams::Token(ref auth)) => Some(auth.to_header_string()?),
2321 Some(AuthParams::Ecdsa(_)) => {
2322 return Err(error::fmt!(
2323 AuthError,
2324 "ECDSA authentication is not supported for ILP over HTTP. \
2325 Please use basic or token authentication instead."
2326 ));
2327 }
2328 None => None,
2329 };
2330 let agent_builder =
2331 agent_builder.timeout_connect(*http_config.request_timeout.deref());
2332 let agent = agent_builder.build();
2333 let proto = self.protocol.schema();
2334 let url = format!(
2335 "{}://{}:{}/write",
2336 proto,
2337 self.host.deref(),
2338 self.port.deref()
2339 );
2340 ProtocolHandler::Http(HttpHandlerState {
2341 agent,
2342 url,
2343 auth,
2344
2345 config: self.http.as_ref().unwrap().clone(),
2346 })
2347 }
2348 };
2349
2350 if auth.is_some() {
2351 descr.push_str("auth=on]");
2352 } else {
2353 descr.push_str("auth=off]");
2354 }
2355
2356 let sender = Sender {
2357 descr,
2358 handler,
2359 connected: true,
2360 max_buf_size: *self.max_buf_size,
2361 };
2362
2363 Ok(sender)
2364 }
2365
2366 fn ensure_is_tcpx(&mut self, param_name: &str) -> Result<()> {
2367 if self.protocol.is_tcpx() {
2368 Ok(())
2369 } else {
2370 Err(error::fmt!(
2371 ConfigError,
2372 "The {param_name:?} setting can only be used with the TCP protocol."
2373 ))
2374 }
2375 }
2376}
2377
2378fn validate_value<T: AsRef<str>>(value: T) -> Result<T> {
2381 let str_ref = value.as_ref();
2382 for (p, c) in str_ref.chars().enumerate() {
2383 if matches!(c, '\u{0}'..='\u{1f}' | '\u{7f}'..='\u{9f}') {
2384 return Err(error::fmt!(
2385 ConfigError,
2386 "Invalid character {c:?} at position {p}"
2387 ));
2388 }
2389 }
2390 Ok(value)
2391}
2392
2393fn parse_conf_value<T>(param_name: &str, str_value: &str) -> Result<T>
2394where
2395 T: FromStr,
2396 T::Err: std::fmt::Debug,
2397{
2398 str_value.parse().map_err(|e| {
2399 error::fmt!(
2400 ConfigError,
2401 "Could not parse {param_name:?} to number: {e:?}"
2402 )
2403 })
2404}
2405
2406fn b64_decode(descr: &'static str, buf: &str) -> Result<Vec<u8>> {
2407 Base64UrlUnpadded::decode_vec(buf).map_err(|b64_err| {
2408 error::fmt!(
2409 AuthError,
2410 "Misconfigured ILP authentication keys. Could not decode {}: {}. \
2411 Hint: Check the keys for a possible typo.",
2412 descr,
2413 b64_err
2414 )
2415 })
2416}
2417
2418fn parse_public_key(pub_key_x: &str, pub_key_y: &str) -> Result<Vec<u8>> {
2419 let mut pub_key_x = b64_decode("public key x", pub_key_x)?;
2420 let mut pub_key_y = b64_decode("public key y", pub_key_y)?;
2421
2422 let mut encoded = Vec::new();
2424 encoded.push(4u8); let pub_key_x_ken = pub_key_x.len();
2426 if pub_key_x_ken > 32 {
2427 return Err(error::fmt!(
2428 AuthError,
2429 "Misconfigured ILP authentication keys. Public key x is too long. \
2430 Hint: Check the keys for a possible typo."
2431 ));
2432 }
2433 let pub_key_y_len = pub_key_y.len();
2434 if pub_key_y_len > 32 {
2435 return Err(error::fmt!(
2436 AuthError,
2437 "Misconfigured ILP authentication keys. Public key y is too long. \
2438 Hint: Check the keys for a possible typo."
2439 ));
2440 }
2441 encoded.resize((32 - pub_key_x_ken) + 1, 0u8);
2442 encoded.append(&mut pub_key_x);
2443 encoded.resize((32 - pub_key_y_len) + 1 + 32, 0u8);
2444 encoded.append(&mut pub_key_y);
2445 Ok(encoded)
2446}
2447
2448fn parse_key_pair(auth: &EcdsaAuthParams) -> Result<EcdsaKeyPair> {
2449 let private_key = b64_decode("private authentication key", auth.priv_key.as_str())?;
2450 let public_key = parse_public_key(auth.pub_key_x.as_str(), auth.pub_key_y.as_str())?;
2451 let system_random = SystemRandom::new();
2452 EcdsaKeyPair::from_private_key_and_public_key(
2453 &ECDSA_P256_SHA256_FIXED_SIGNING,
2454 &private_key[..],
2455 &public_key[..],
2456 &system_random,
2457 )
2458 .map_err(|key_rejected| {
2459 error::fmt!(
2460 AuthError,
2461 "Misconfigured ILP authentication keys: {}. Hint: Check the keys for a possible typo.",
2462 key_rejected
2463 )
2464 })
2465}
2466
/// Pairs an `f64` with a `ryu::Buffer` so the value can be rendered as text.
pub(crate) struct F64Serializer {
    /// Scratch buffer owned by this serializer; used when formatting finite values.
    buf: ryu::Buffer,
    /// The value to be rendered.
    n: f64,
}
2471
2472impl F64Serializer {
2473 pub(crate) fn new(n: f64) -> Self {
2474 F64Serializer {
2475 buf: ryu::Buffer::new(),
2476 n,
2477 }
2478 }
2479
2480 #[cold]
2482 #[cfg_attr(feature = "no-panic", inline)]
2483 fn format_nonfinite(&self) -> &'static str {
2484 const MANTISSA_MASK: u64 = 0x000fffffffffffff;
2485 const SIGN_MASK: u64 = 0x8000000000000000;
2486 let bits = self.n.to_bits();
2487 if bits & MANTISSA_MASK != 0 {
2488 "NaN"
2489 } else if bits & SIGN_MASK != 0 {
2490 "-Infinity"
2491 } else {
2492 "Infinity"
2493 }
2494 }
2495
2496 pub(crate) fn as_str(&mut self) -> &str {
2497 if self.n.is_finite() {
2498 self.buf.format_finite(self.n)
2499 } else {
2500 self.format_nonfinite()
2501 }
2502 }
2503}
2504
2505impl Sender {
2506 pub fn from_conf<T: AsRef<str>>(conf: T) -> Result<Self> {
2529 SenderBuilder::from_conf(conf)?.build()
2530 }
2531
2532 pub fn from_env() -> Result<Self> {
2541 SenderBuilder::from_env()?.build()
2542 }
2543
2544 #[allow(unused_variables)]
2545 fn flush_impl(&mut self, buf: &Buffer, transactional: bool) -> Result<()> {
2546 if !self.connected {
2547 return Err(error::fmt!(
2548 SocketError,
2549 "Could not flush buffer: not connected to database."
2550 ));
2551 }
2552 buf.check_op(Op::Flush)?;
2553
2554 if buf.len() > self.max_buf_size {
2555 return Err(error::fmt!(
2556 InvalidApiCall,
2557 "Could not flush buffer: Buffer size of {} exceeds maximum configured allowed size of {} bytes.",
2558 buf.len(),
2559 self.max_buf_size
2560 ));
2561 }
2562
2563 let bytes = buf.as_str().as_bytes();
2564 if bytes.is_empty() {
2565 return Ok(());
2566 }
2567 match self.handler {
2568 ProtocolHandler::Socket(ref mut conn) => {
2569 if transactional {
2570 return Err(error::fmt!(
2571 InvalidApiCall,
2572 "Transactional flushes are not supported for ILP over TCP."
2573 ));
2574 }
2575 conn.write_all(bytes).map_err(|io_err| {
2576 self.connected = false;
2577 map_io_to_socket_err("Could not flush buffer: ", io_err)
2578 })?;
2579 }
2580 #[cfg(feature = "ilp-over-http")]
2581 ProtocolHandler::Http(ref state) => {
2582 if transactional && !buf.transactional() {
2583 return Err(error::fmt!(
2584 InvalidApiCall,
2585 "Buffer contains lines for multiple tables. \
2586 Transactional flushes are only supported for buffers containing lines for a single table."
2587 ));
2588 }
2589 let request_min_throughput = *state.config.request_min_throughput;
2590 let extra_time = if request_min_throughput > 0 {
2591 (bytes.len() as f64) / (request_min_throughput as f64)
2592 } else {
2593 0.0f64
2594 };
2595 let timeout = *state.config.request_timeout + Duration::from_secs_f64(extra_time);
2596 let request = state
2597 .agent
2598 .post(&state.url)
2599 .query_pairs([("precision", "n")])
2600 .timeout(timeout)
2601 .set("Content-Type", "text/plain; charset=utf-8");
2602 let request = match state.auth.as_ref() {
2603 Some(auth) => request.set("Authorization", auth),
2604 None => request,
2605 };
2606 let response_or_err =
2607 http_send_with_retries(request, bytes, *state.config.retry_timeout);
2608 match response_or_err {
2609 Ok(_response) => {
2610 }
2612 Err(ureq::Error::Status(http_status_code, response)) => {
2613 return Err(parse_http_error(http_status_code, response));
2614 }
2615 Err(ureq::Error::Transport(transport)) => {
2616 return Err(error::fmt!(
2617 SocketError,
2618 "Could not flush buffer: {}",
2619 transport
2620 ));
2621 }
2622 }
2623 }
2624 }
2625 Ok(())
2626 }
2627
2628 #[cfg(feature = "ilp-over-http")]
2645 pub fn flush_and_keep_with_flags(&mut self, buf: &Buffer, transactional: bool) -> Result<()> {
2646 self.flush_impl(buf, transactional)
2647 }
2648
2649 pub fn flush_and_keep(&mut self, buf: &Buffer) -> Result<()> {
2655 self.flush_impl(buf, false)
2656 }
2657
2658 pub fn flush(&mut self, buf: &mut Buffer) -> Result<()> {
2681 self.flush_impl(buf, false)?;
2682 buf.clear();
2683 Ok(())
2684 }
2685
2686 pub fn must_close(&self) -> bool {
2692 !self.connected
2693 }
2694}
2695
2696mod conf;
2697mod timestamp;
2698
2699#[cfg(feature = "ilp-over-http")]
2700mod http;
2701
2702#[cfg(feature = "ilp-over-http")]
2703use http::*;
2704
2705#[cfg(test)]
2706mod tests;