1#![doc = include_str!("mod.md")]
26
27pub use self::timestamp::*;
28
29use crate::error::{self, Error, Result};
30use crate::gai;
31use crate::ingress::conf::ConfigSetting;
32use core::time::Duration;
33use std::collections::HashMap;
34use std::convert::Infallible;
35use std::fmt::{Debug, Display, Formatter, Write};
36use std::io::{self, BufRead, BufReader, ErrorKind, Write as IoWrite};
37use std::ops::Deref;
38use std::path::PathBuf;
39use std::str::FromStr;
40use std::sync::Arc;
41
42use base64ct::{Base64, Base64UrlUnpadded, Encoding};
43use ring::rand::SystemRandom;
44use ring::signature::{EcdsaKeyPair, ECDSA_P256_SHA256_FIXED_SIGNING};
45use rustls::{ClientConnection, RootCertStore, StreamOwned};
46use rustls_pki_types::ServerName;
47use socket2::{Domain, Protocol as SockProtocol, SockAddr, Socket, Type};
48
/// Operations that can be requested on a buffer.
///
/// Every variant owns a distinct bit, so a set of permitted operations can be
/// expressed as the bitwise OR of several discriminants.
#[derive(Debug, Copy, Clone)]
enum Op {
    Table = 0b0_0001,
    Symbol = 0b0_0010,
    Column = 0b0_0100,
    At = 0b0_1000,
    Flush = 0b1_0000,
}

impl Op {
    /// Returns the lowercase name of the operation, as used in error messages.
    fn descr(self) -> &'static str {
        match self {
            Self::Table => "table",
            Self::Symbol => "symbol",
            Self::Column => "column",
            Self::At => "at",
            Self::Flush => "flush",
        }
    }
}
69
70fn map_io_to_socket_err(prefix: &str, io_err: io::Error) -> Error {
71 error::fmt!(SocketError, "{}{}", prefix, io_err)
72}
73
74#[derive(Clone, Copy)]
81pub struct TableName<'a> {
82 name: &'a str,
83}
84
85impl<'a> TableName<'a> {
86 pub fn new(name: &'a str) -> Result<Self> {
88 if name.is_empty() {
89 return Err(error::fmt!(
90 InvalidName,
91 "Table names must have a non-zero length."
92 ));
93 }
94
95 let mut prev = '\0';
96 for (index, c) in name.chars().enumerate() {
97 match c {
98 '.' => {
99 if index == 0 || index == name.len() - 1 || prev == '.' {
100 return Err(error::fmt!(
101 InvalidName,
102 concat!("Bad string {:?}: ", "Found invalid dot `.` at position {}."),
103 name,
104 index
105 ));
106 }
107 }
108 '?' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '*' | '%' | '~'
109 | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}' | '\u{0004}'
110 | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}' | '\u{000b}'
111 | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
112 return Err(error::fmt!(
113 InvalidName,
114 concat!(
115 "Bad string {:?}: ",
116 "Table names can't contain ",
117 "a {:?} character, which was found at ",
118 "byte position {}."
119 ),
120 name,
121 c,
122 index
123 ));
124 }
125 '\u{feff}' => {
126 return Err(error::fmt!(
129 InvalidName,
130 concat!(
131 "Bad string {:?}: ",
132 "Table names can't contain ",
133 "a UTF-8 BOM character, which was found at ",
134 "byte position {}."
135 ),
136 name,
137 index
138 ));
139 }
140 _ => (),
141 }
142 prev = c;
143 }
144
145 Ok(Self { name })
146 }
147
148 pub fn new_unchecked(name: &'a str) -> Self {
155 Self { name }
156 }
157}
158
159#[derive(Clone, Copy)]
166pub struct ColumnName<'a> {
167 name: &'a str,
168}
169
170impl<'a> ColumnName<'a> {
171 pub fn new(name: &'a str) -> Result<Self> {
173 if name.is_empty() {
174 return Err(error::fmt!(
175 InvalidName,
176 "Column names must have a non-zero length."
177 ));
178 }
179
180 for (index, c) in name.chars().enumerate() {
181 match c {
182 '?' | '.' | ',' | '\'' | '\"' | '\\' | '/' | ':' | ')' | '(' | '+' | '-' | '*'
183 | '%' | '~' | '\r' | '\n' | '\0' | '\u{0001}' | '\u{0002}' | '\u{0003}'
184 | '\u{0004}' | '\u{0005}' | '\u{0006}' | '\u{0007}' | '\u{0008}' | '\u{0009}'
185 | '\u{000b}' | '\u{000c}' | '\u{000e}' | '\u{000f}' | '\u{007f}' => {
186 return Err(error::fmt!(
187 InvalidName,
188 concat!(
189 "Bad string {:?}: ",
190 "Column names can't contain ",
191 "a {:?} character, which was found at ",
192 "byte position {}."
193 ),
194 name,
195 c,
196 index
197 ));
198 }
199 '\u{FEFF}' => {
200 return Err(error::fmt!(
203 InvalidName,
204 concat!(
205 "Bad string {:?}: ",
206 "Column names can't contain ",
207 "a UTF-8 BOM character, which was found at ",
208 "byte position {}."
209 ),
210 name,
211 index
212 ));
213 }
214 _ => (),
215 }
216 }
217
218 Ok(Self { name })
219 }
220
221 pub fn new_unchecked(name: &'a str) -> Self {
228 Self { name }
229 }
230}
231
232impl<'a> TryFrom<&'a str> for TableName<'a> {
233 type Error = self::Error;
234
235 fn try_from(name: &'a str) -> Result<Self> {
236 Self::new(name)
237 }
238}
239
240impl<'a> TryFrom<&'a str> for ColumnName<'a> {
241 type Error = self::Error;
242
243 fn try_from(name: &'a str) -> Result<Self> {
244 Self::new(name)
245 }
246}
247
248impl From<Infallible> for Error {
249 fn from(_: Infallible) -> Self {
250 unreachable!()
251 }
252}
253
/// Appends `s` to `output`, inserting a backslash in front of every byte for
/// which `check_escape_fn` returns `true`.
///
/// `quoting_fn` is called on the raw output bytes twice: once before and once
/// after the (escaped) text is written, so it can add an opening and a
/// closing delimiter.
///
/// Callers must ensure that `check_escape_fn` only selects ASCII bytes and
/// that `quoting_fn` only appends valid UTF-8; otherwise `output` would end
/// up holding invalid UTF-8.
fn write_escaped_impl<Q, C>(check_escape_fn: C, quoting_fn: Q, output: &mut String, s: &str)
where
    C: Fn(u8) -> bool,
    Q: Fn(&mut Vec<u8>),
{
    // SAFETY: `s` is valid UTF-8 and is copied in full; the only bytes added
    // are ASCII backslashes placed in front of bytes chosen by
    // `check_escape_fn`, plus whatever `quoting_fn` appends. Under the
    // caller contract documented above the vector stays valid UTF-8.
    let output_vec = unsafe { output.as_mut_vec() };

    // First pass: count the escapes so the fast path can do a single bulk
    // copy and the slow path can reserve exactly once.
    let to_escape = s.bytes().filter(|&b| check_escape_fn(b)).count();

    quoting_fn(output_vec);

    if to_escape == 0 {
        output_vec.extend_from_slice(s.as_bytes());
    } else {
        // Safe pushes replace the former `set_len` + unchecked writes, which
        // exposed uninitialised bytes as initialised `Vec` contents.
        output_vec.reserve(s.len() + to_escape);
        for b in s.bytes() {
            if check_escape_fn(b) {
                output_vec.push(b'\\');
            }
            output_vec.push(b);
        }
    }

    quoting_fn(output_vec);
}
294
/// Returns `true` if byte `c` needs a backslash in front of it when written
/// as unquoted text: space, comma, `=`, line feed, carriage return or
/// backslash.
fn must_escape_unquoted(c: u8) -> bool {
    const ESCAPED: &[u8] = b" ,=\n\r\\";
    ESCAPED.contains(&c)
}
298
/// Returns `true` if byte `c` needs a backslash in front of it when written
/// inside double quotes: line feed, carriage return, `"` or backslash.
fn must_escape_quoted(c: u8) -> bool {
    const ESCAPED: &[u8] = b"\n\r\"\\";
    ESCAPED.contains(&c)
}
302
303fn write_escaped_unquoted(output: &mut String, s: &str) {
304 write_escaped_impl(must_escape_unquoted, |_output| (), output, s);
305}
306
307fn write_escaped_quoted(output: &mut String, s: &str) {
308 write_escaped_impl(must_escape_quoted, |output| output.push(b'"'), output, s)
309}
310
311enum Connection {
312 Direct(Socket),
313 Tls(Box<StreamOwned<ClientConnection, Socket>>),
314}
315
316impl Connection {
317 fn send_key_id(&mut self, key_id: &str) -> Result<()> {
318 writeln!(self, "{}", key_id)
319 .map_err(|io_err| map_io_to_socket_err("Failed to send key_id: ", io_err))?;
320 Ok(())
321 }
322
323 fn read_challenge(&mut self) -> Result<Vec<u8>> {
324 let mut buf = Vec::new();
325 let mut reader = BufReader::new(self);
326 reader.read_until(b'\n', &mut buf).map_err(|io_err| {
327 map_io_to_socket_err(
328 "Failed to read authentication challenge (timed out?): ",
329 io_err,
330 )
331 })?;
332 if buf.last().copied().unwrap_or(b'\0') != b'\n' {
333 return Err(if buf.is_empty() {
334 error::fmt!(
335 AuthError,
336 concat!(
337 "Did not receive auth challenge. ",
338 "Is the database configured to require ",
339 "authentication?"
340 )
341 )
342 } else {
343 error::fmt!(AuthError, "Received incomplete auth challenge: {:?}", buf)
344 });
345 }
346 buf.pop(); Ok(buf)
348 }
349
350 fn authenticate(&mut self, auth: &EcdsaAuthParams) -> Result<()> {
351 if auth.key_id.contains('\n') {
352 return Err(error::fmt!(
353 AuthError,
354 "Bad key id {:?}: Should not contain new-line char.",
355 auth.key_id
356 ));
357 }
358 let key_pair = parse_key_pair(auth)?;
359 self.send_key_id(auth.key_id.as_str())?;
360 let challenge = self.read_challenge()?;
361 let rng = SystemRandom::new();
362 let signature = key_pair
363 .sign(&rng, &challenge[..])
364 .map_err(|unspecified_err| {
365 error::fmt!(AuthError, "Failed to sign challenge: {}", unspecified_err)
366 })?;
367 let mut encoded_sig = Base64::encode_string(signature.as_ref());
368 encoded_sig.push('\n');
369 let buf = encoded_sig.as_bytes();
370 if let Err(io_err) = self.write_all(buf) {
371 return Err(map_io_to_socket_err(
372 "Could not send signed challenge: ",
373 io_err,
374 ));
375 }
376 Ok(())
377 }
378}
379
380enum ProtocolHandler {
381 Socket(Connection),
382
383 #[cfg(feature = "ilp-over-http")]
384 Http(HttpHandlerState),
385}
386
387impl io::Read for Connection {
388 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
389 match self {
390 Self::Direct(sock) => sock.read(buf),
391 Self::Tls(stream) => stream.read(buf),
392 }
393 }
394}
395
396impl io::Write for Connection {
397 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
398 match self {
399 Self::Direct(sock) => sock.write(buf),
400 Self::Tls(stream) => stream.write(buf),
401 }
402 }
403
404 fn flush(&mut self) -> io::Result<()> {
405 match self {
406 Self::Direct(sock) => sock.flush(),
407 Self::Tls(stream) => stream.flush(),
408 }
409 }
410}
411
412#[derive(Debug, Copy, Clone, PartialEq)]
413enum OpCase {
414 Init = Op::Table as isize,
415 TableWritten = Op::Symbol as isize | Op::Column as isize,
416 SymbolWritten = Op::Symbol as isize | Op::Column as isize | Op::At as isize,
417 ColumnWritten = Op::Column as isize | Op::At as isize,
418 MayFlushOrTable = Op::Flush as isize | Op::Table as isize,
419}
420
421impl OpCase {
422 fn next_op_descr(self) -> &'static str {
423 match self {
424 OpCase::Init => "should have called `table` instead",
425 OpCase::TableWritten => "should have called `symbol` or `column` instead",
426 OpCase::SymbolWritten => "should have called `symbol`, `column` or `at` instead",
427 OpCase::ColumnWritten => "should have called `column` or `at` instead",
428 OpCase::MayFlushOrTable => "should have called `flush` or `table` instead",
429 }
430 }
431}
432
433#[derive(Debug, Clone)]
434struct BufferState {
435 op_case: OpCase,
436 row_count: usize,
437 first_table: Option<String>,
438 transactional: bool,
439}
440
441impl BufferState {
442 fn new() -> Self {
443 Self {
444 op_case: OpCase::Init,
445 row_count: 0,
446 first_table: None,
447 transactional: true,
448 }
449 }
450
451 fn clear(&mut self) {
452 self.op_case = OpCase::Init;
453 self.row_count = 0;
454 self.first_table = None;
455 self.transactional = true;
456 }
457}
458
459#[derive(Debug, Clone)]
545pub struct Buffer {
546 output: String,
547 state: BufferState,
548 marker: Option<(usize, BufferState)>,
549 max_name_len: usize,
550}
551
552impl Buffer {
553 pub fn new() -> Self {
556 Self {
557 output: String::new(),
558 state: BufferState::new(),
559 marker: None,
560 max_name_len: 127,
561 }
562 }
563
564 pub fn with_max_name_len(max_name_len: usize) -> Self {
572 let mut buf = Self::new();
573 buf.max_name_len = max_name_len;
574 buf
575 }
576
577 pub fn reserve(&mut self, additional: usize) {
582 self.output.reserve(additional);
583 }
584
585 pub fn len(&self) -> usize {
587 self.output.len()
588 }
589
590 pub fn row_count(&self) -> usize {
592 self.state.row_count
593 }
594
595 pub fn transactional(&self) -> bool {
599 self.state.transactional
600 }
601
602 pub fn is_empty(&self) -> bool {
603 self.output.is_empty()
604 }
605
606 pub fn capacity(&self) -> usize {
608 self.output.capacity()
609 }
610
611 pub fn as_str(&self) -> &str {
613 &self.output
614 }
615
616 pub fn set_marker(&mut self) -> Result<()> {
623 if (self.state.op_case as isize & Op::Table as isize) == 0 {
624 return Err(error::fmt!(
625 InvalidApiCall,
626 concat!(
627 "Can't set the marker whilst constructing a line. ",
628 "A marker may only be set on an empty buffer or after ",
629 "`at` or `at_now` is called."
630 )
631 ));
632 }
633 self.marker = Some((self.output.len(), self.state.clone()));
634 Ok(())
635 }
636
637 pub fn rewind_to_marker(&mut self) -> Result<()> {
642 if let Some((position, state)) = self.marker.take() {
643 self.output.truncate(position);
644 self.state = state;
645 Ok(())
646 } else {
647 Err(error::fmt!(
648 InvalidApiCall,
649 "Can't rewind to the marker: No marker set."
650 ))
651 }
652 }
653
654 pub fn clear_marker(&mut self) {
659 self.marker = None;
660 }
661
662 pub fn clear(&mut self) {
665 self.output.clear();
666 self.state.clear();
667 self.marker = None;
668 }
669
670 #[inline(always)]
672 fn check_op(&self, op: Op) -> Result<()> {
673 if (self.state.op_case as isize & op as isize) > 0 {
674 Ok(())
675 } else {
676 Err(error::fmt!(
677 InvalidApiCall,
678 "State error: Bad call to `{}`, {}.",
679 op.descr(),
680 self.state.op_case.next_op_descr()
681 ))
682 }
683 }
684
685 #[inline(always)]
686 fn validate_max_name_len(&self, name: &str) -> Result<()> {
687 if name.len() > self.max_name_len {
688 return Err(error::fmt!(
689 InvalidName,
690 "Bad name: {:?}: Too long (max {} characters)",
691 name,
692 self.max_name_len
693 ));
694 }
695 Ok(())
696 }
697
698 pub fn table<'a, N>(&mut self, name: N) -> Result<&mut Self>
725 where
726 N: TryInto<TableName<'a>>,
727 Error: From<N::Error>,
728 {
729 let name: TableName<'a> = name.try_into()?;
730 self.validate_max_name_len(name.name)?;
731 self.check_op(Op::Table)?;
732 write_escaped_unquoted(&mut self.output, name.name);
733 self.state.op_case = OpCase::TableWritten;
734
735 if let Some(first_table) = &self.state.first_table {
737 if first_table != name.name {
738 self.state.transactional = false;
739 }
740 } else {
741 self.state.first_table = Some(name.name.to_owned());
742 }
743 Ok(self)
744 }
745
746 pub fn symbol<'a, N, S>(&mut self, name: N, value: S) -> Result<&mut Self>
791 where
792 N: TryInto<ColumnName<'a>>,
793 S: AsRef<str>,
794 Error: From<N::Error>,
795 {
796 let name: ColumnName<'a> = name.try_into()?;
797 self.validate_max_name_len(name.name)?;
798 self.check_op(Op::Symbol)?;
799 self.output.push(',');
800 write_escaped_unquoted(&mut self.output, name.name);
801 self.output.push('=');
802 write_escaped_unquoted(&mut self.output, value.as_ref());
803 self.state.op_case = OpCase::SymbolWritten;
804 Ok(self)
805 }
806
807 fn write_column_key<'a, N>(&mut self, name: N) -> Result<&mut Self>
808 where
809 N: TryInto<ColumnName<'a>>,
810 Error: From<N::Error>,
811 {
812 let name: ColumnName<'a> = name.try_into()?;
813 self.validate_max_name_len(name.name)?;
814 self.check_op(Op::Column)?;
815 self.output
816 .push(if (self.state.op_case as isize & Op::Symbol as isize) > 0 {
817 ' '
818 } else {
819 ','
820 });
821 write_escaped_unquoted(&mut self.output, name.name);
822 self.output.push('=');
823 self.state.op_case = OpCase::ColumnWritten;
824 Ok(self)
825 }
826
827 pub fn column_bool<'a, N>(&mut self, name: N, value: bool) -> Result<&mut Self>
856 where
857 N: TryInto<ColumnName<'a>>,
858 Error: From<N::Error>,
859 {
860 self.write_column_key(name)?;
861 self.output.push(if value { 't' } else { 'f' });
862 Ok(self)
863 }
864
865 pub fn column_i64<'a, N>(&mut self, name: N, value: i64) -> Result<&mut Self>
894 where
895 N: TryInto<ColumnName<'a>>,
896 Error: From<N::Error>,
897 {
898 self.write_column_key(name)?;
899 let mut buf = itoa::Buffer::new();
900 let printed = buf.format(value);
901 self.output.push_str(printed);
902 self.output.push('i');
903 Ok(self)
904 }
905
906 pub fn column_f64<'a, N>(&mut self, name: N, value: f64) -> Result<&mut Self>
935 where
936 N: TryInto<ColumnName<'a>>,
937 Error: From<N::Error>,
938 {
939 self.write_column_key(name)?;
940 let mut ser = F64Serializer::new(value);
941 self.output.push_str(ser.as_str());
942 Ok(self)
943 }
944
945 pub fn column_str<'a, N, S>(&mut self, name: N, value: S) -> Result<&mut Self>
988 where
989 N: TryInto<ColumnName<'a>>,
990 S: AsRef<str>,
991 Error: From<N::Error>,
992 {
993 self.write_column_key(name)?;
994 write_escaped_quoted(&mut self.output, value.as_ref());
995 Ok(self)
996 }
997
998 pub fn column_ts<'a, N, T>(&mut self, name: N, value: T) -> Result<&mut Self>
1051 where
1052 N: TryInto<ColumnName<'a>>,
1053 T: TryInto<Timestamp>,
1054 Error: From<N::Error>,
1055 Error: From<T::Error>,
1056 {
1057 self.write_column_key(name)?;
1058 let timestamp: Timestamp = value.try_into()?;
1059 let timestamp: TimestampMicros = timestamp.try_into()?;
1060 let mut buf = itoa::Buffer::new();
1061 let printed = buf.format(timestamp.as_i64());
1062 self.output.push_str(printed);
1063 self.output.push('t');
1064 Ok(self)
1065 }
1066
1067 pub fn at<T>(&mut self, timestamp: T) -> Result<()>
1104 where
1105 T: TryInto<Timestamp>,
1106 Error: From<T::Error>,
1107 {
1108 self.check_op(Op::At)?;
1109 let timestamp: Timestamp = timestamp.try_into()?;
1110
1111 let timestamp: Result<TimestampNanos> = timestamp.try_into();
1113 let timestamp: TimestampNanos = timestamp?;
1114
1115 let epoch_nanos = timestamp.as_i64();
1116 if epoch_nanos < 0 {
1117 return Err(error::fmt!(
1118 InvalidTimestamp,
1119 "Timestamp {} is negative. It must be >= 0.",
1120 epoch_nanos
1121 ));
1122 }
1123 let mut buf = itoa::Buffer::new();
1124 let printed = buf.format(epoch_nanos);
1125 self.output.push(' ');
1126 self.output.push_str(printed);
1127 self.output.push('\n');
1128 self.state.op_case = OpCase::MayFlushOrTable;
1129 self.state.row_count += 1;
1130 Ok(())
1131 }
1132
1133 pub fn at_now(&mut self) -> Result<()> {
1162 self.check_op(Op::At)?;
1163 self.output.push('\n');
1164 self.state.op_case = OpCase::MayFlushOrTable;
1165 self.state.row_count += 1;
1166 Ok(())
1167 }
1168}
1169
1170impl Default for Buffer {
1171 fn default() -> Self {
1172 Self::new()
1173 }
1174}
1175
1176pub struct Sender {
1182 descr: String,
1183 handler: ProtocolHandler,
1184 connected: bool,
1185 max_buf_size: usize,
1186}
1187
1188impl std::fmt::Debug for Sender {
1189 fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
1190 f.write_str(self.descr.as_str())
1191 }
1192}
1193
/// Credentials used for the ECDSA challenge/response handshake.
#[derive(PartialEq, Debug, Clone)]
struct EcdsaAuthParams {
    /// Key identifier sent to the server; must not contain a newline.
    key_id: String,

    /// Private key material, in the textual form `parse_key_pair` expects.
    priv_key: String,

    /// X coordinate of the public key, in textual form.
    pub_key_x: String,

    /// Y coordinate of the public key, in textual form.
    pub_key_y: String,
}
1201
1202#[derive(PartialEq, Debug, Clone)]
1203enum AuthParams {
1204 Ecdsa(EcdsaAuthParams),
1205
1206 #[cfg(feature = "ilp-over-http")]
1207 Basic(BasicAuthParams),
1208
1209 #[cfg(feature = "ilp-over-http")]
1210 Token(TokenAuthParams),
1211}
1212
/// Where the root certificates used to verify the server come from.
#[derive(PartialEq, Debug, Copy, Clone)]
pub enum CertificateAuthority {
    /// Use the root certificates bundled in the `webpki_roots` crate.
    #[cfg(feature = "tls-webpki-certs")]
    WebpkiRoots,

    /// Use the root certificates of the operating system.
    #[cfg(feature = "tls-native-certs")]
    OsRoots,

    /// Use both the `webpki_roots` certificates and the OS certificates.
    #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
    WebpkiAndOsRoots,

    /// Use the certificates found in the PEM file configured as `tls_roots`.
    PemFile,
}
1233
/// A port, stored in its textual form.
///
/// Can be created from a `String`, a `&str` or a `u16`.
pub struct Port(String);

impl From<String> for Port {
    /// Takes ownership of `s` as-is.
    fn from(s: String) -> Self {
        Self(s)
    }
}

impl From<&str> for Port {
    /// Copies `s` into an owned string.
    fn from(s: &str) -> Self {
        Self(String::from(s))
    }
}

impl From<u16> for Port {
    /// Stores the decimal representation of `p`.
    fn from(p: u16) -> Self {
        Self(p.to_string())
    }
}
1272
/// Support for turning TLS certificate verification off.
///
/// Compiled only with the `insecure-skip-verify` feature.
#[cfg(feature = "insecure-skip-verify")]
mod danger {
    use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier};
    use rustls::{DigitallySignedStruct, Error, SignatureScheme};
    use rustls_pki_types::{CertificateDer, ServerName, UnixTime};

    /// A certificate verifier that accepts everything it is shown.
    #[derive(Debug)]
    pub struct NoCertificateVerification {}

    impl ServerCertVerifier for NoCertificateVerification {
        /// Accepts any server certificate without inspecting it.
        fn verify_server_cert(
            &self,
            _end_entity: &CertificateDer<'_>,
            _intermediates: &[CertificateDer<'_>],
            _server_name: &ServerName<'_>,
            _ocsp_response: &[u8],
            _now: UnixTime,
        ) -> Result<ServerCertVerified, Error> {
            let accepted = ServerCertVerified::assertion();
            Ok(accepted)
        }

        /// Accepts any TLS 1.2 handshake signature without inspecting it.
        fn verify_tls12_signature(
            &self,
            _message: &[u8],
            _cert: &CertificateDer<'_>,
            _dss: &DigitallySignedStruct,
        ) -> Result<HandshakeSignatureValid, Error> {
            let accepted = HandshakeSignatureValid::assertion();
            Ok(accepted)
        }

        /// Accepts any TLS 1.3 handshake signature without inspecting it.
        fn verify_tls13_signature(
            &self,
            _message: &[u8],
            _cert: &CertificateDer<'_>,
            _dss: &DigitallySignedStruct,
        ) -> Result<HandshakeSignatureValid, Error> {
            let accepted = HandshakeSignatureValid::assertion();
            Ok(accepted)
        }

        /// Reports the signature schemes of the default `ring` provider.
        fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
            let provider = rustls::crypto::ring::default_provider();
            provider
                .signature_verification_algorithms
                .supported_schemes()
        }
    }
}
1319
/// Adds every root bundled in the `webpki_roots` crate to `root_store`.
#[cfg(feature = "tls-webpki-certs")]
fn add_webpki_roots(root_store: &mut RootCertStore) {
    let bundled = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
    root_store.roots.extend(bundled)
}
1326
/// Adds the operating system's root certificates to `root_store`.
///
/// # Errors
///
/// Returns a `TlsError` if the native certificates cannot be loaded, or if
/// some were found but none of them could be parsed.
#[cfg(feature = "tls-native-certs")]
fn add_os_roots(root_store: &mut RootCertStore) -> Result<()> {
    let os_certs = rustls_native_certs::load_native_certs().map_err(|io_err| {
        error::fmt!(
            TlsError,
            "Could not load OS native TLS certificates: {}",
            io_err
        )
    })?;

    let (valid_count, invalid_count) = root_store.add_parsable_certificates(os_certs);
    let nothing_usable = valid_count == 0 && invalid_count > 0;
    if !nothing_usable {
        return Ok(());
    }
    Err(error::fmt!(
        TlsError,
        "No valid certificates found in native root store ({} found but were invalid)",
        invalid_count
    ))
}
1347
1348fn configure_tls(
1349 tls_enabled: bool,
1350 tls_verify: bool,
1351 tls_ca: CertificateAuthority,
1352 tls_roots: &Option<PathBuf>,
1353) -> Result<Option<Arc<rustls::ClientConfig>>> {
1354 if !tls_enabled {
1355 return Ok(None);
1356 }
1357
1358 let mut root_store = RootCertStore::empty();
1359 if tls_verify {
1360 match (tls_ca, tls_roots) {
1361 #[cfg(feature = "tls-webpki-certs")]
1362 (CertificateAuthority::WebpkiRoots, None) => {
1363 add_webpki_roots(&mut root_store);
1364 }
1365
1366 #[cfg(feature = "tls-webpki-certs")]
1367 (CertificateAuthority::WebpkiRoots, Some(_)) => {
1368 return Err(error::fmt!(ConfigError, "Config parameter \"tls_roots\" must be unset when \"tls_ca\" is set to \"webpki_roots\"."));
1369 }
1370
1371 #[cfg(feature = "tls-native-certs")]
1372 (CertificateAuthority::OsRoots, None) => {
1373 add_os_roots(&mut root_store)?;
1374 }
1375
1376 #[cfg(feature = "tls-native-certs")]
1377 (CertificateAuthority::OsRoots, Some(_)) => {
1378 return Err(error::fmt!(ConfigError, "Config parameter \"tls_roots\" must be unset when \"tls_ca\" is set to \"os_roots\"."));
1379 }
1380
1381 #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
1382 (CertificateAuthority::WebpkiAndOsRoots, None) => {
1383 add_webpki_roots(&mut root_store);
1384 add_os_roots(&mut root_store)?;
1385 }
1386
1387 #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
1388 (CertificateAuthority::WebpkiAndOsRoots, Some(_)) => {
1389 return Err(error::fmt!(ConfigError, "Config parameter \"tls_roots\" must be unset when \"tls_ca\" is set to \"webpki_and_os_roots\"."));
1390 }
1391
1392 (CertificateAuthority::PemFile, Some(ca_file)) => {
1393 let certfile = std::fs::File::open(ca_file).map_err(|io_err| {
1394 error::fmt!(
1395 TlsError,
1396 concat!(
1397 "Could not open tls_roots certificate authority ",
1398 "file from path {:?}: {}"
1399 ),
1400 ca_file,
1401 io_err
1402 )
1403 })?;
1404 let mut reader = BufReader::new(certfile);
1405 let der_certs = rustls_pemfile::certs(&mut reader)
1406 .collect::<std::result::Result<Vec<_>, _>>()
1407 .map_err(|io_err| {
1408 error::fmt!(
1409 TlsError,
1410 concat!(
1411 "Could not read certificate authority ",
1412 "file from path {:?}: {}"
1413 ),
1414 ca_file,
1415 io_err
1416 )
1417 })?;
1418 root_store.add_parsable_certificates(der_certs);
1419 }
1420
1421 (CertificateAuthority::PemFile, None) => {
1422 return Err(error::fmt!(ConfigError, "Config parameter \"tls_roots\" is required when \"tls_ca\" is set to \"pem_file\"."));
1423 }
1424 }
1425 }
1426
1427 let mut config = rustls::ClientConfig::builder()
1428 .with_root_certificates(root_store)
1429 .with_no_client_auth();
1430
1431 config.key_log = Arc::new(rustls::KeyLogFile::new());
1434
1435 #[cfg(feature = "insecure-skip-verify")]
1436 if !tls_verify {
1437 config
1438 .dangerous()
1439 .set_certificate_verifier(Arc::new(danger::NoCertificateVerification {}));
1440 }
1441
1442 Ok(Some(Arc::new(config)))
1443}
1444
1445fn validate_auto_flush_params(params: &HashMap<String, String>) -> Result<()> {
1446 if let Some(auto_flush) = params.get("auto_flush") {
1447 if auto_flush.as_str() != "off" {
1448 return Err(error::fmt!(
1449 ConfigError,
1450 "Invalid auto_flush value '{auto_flush}'. This client does not \
1451 support auto-flush, so the only accepted value is 'off'"
1452 ));
1453 }
1454 }
1455
1456 for ¶m in ["auto_flush_rows", "auto_flush_bytes"].iter() {
1457 if params.contains_key(param) {
1458 return Err(error::fmt!(
1459 ConfigError,
1460 "Invalid configuration parameter {:?}. This client does not support auto-flush",
1461 param
1462 ));
1463 }
1464 }
1465 Ok(())
1466}
1467
/// The transport a sender uses to reach the database.
#[derive(PartialEq, Debug, Copy, Clone)]
pub enum Protocol {
    /// Plain TCP (`tcp`).
    Tcp,

    /// TCP with TLS (`tcps`).
    Tcps,

    /// Plain HTTP (`http`); only with the `ilp-over-http` feature.
    #[cfg(feature = "ilp-over-http")]
    Http,

    /// HTTP with TLS (`https`); only with the `ilp-over-http` feature.
    #[cfg(feature = "ilp-over-http")]
    Https,
}
1485
1486impl Display for Protocol {
1487 fn fmt(&self, f: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
1488 f.write_str(self.schema())
1489 }
1490}
1491
1492impl Protocol {
1493 fn default_port(&self) -> &str {
1494 match self {
1495 Protocol::Tcp | Protocol::Tcps => "9009",
1496 #[cfg(feature = "ilp-over-http")]
1497 Protocol::Http | Protocol::Https => "9000",
1498 }
1499 }
1500
1501 fn tls_enabled(&self) -> bool {
1502 match self {
1503 Protocol::Tcp => false,
1504 Protocol::Tcps => true,
1505 #[cfg(feature = "ilp-over-http")]
1506 Protocol::Http => false,
1507 #[cfg(feature = "ilp-over-http")]
1508 Protocol::Https => true,
1509 }
1510 }
1511
1512 fn is_tcpx(&self) -> bool {
1513 match self {
1514 Protocol::Tcp => true,
1515 Protocol::Tcps => true,
1516 #[cfg(feature = "ilp-over-http")]
1517 Protocol::Http => false,
1518 #[cfg(feature = "ilp-over-http")]
1519 Protocol::Https => false,
1520 }
1521 }
1522
1523 #[cfg(feature = "ilp-over-http")]
1524 fn is_httpx(&self) -> bool {
1525 match self {
1526 Protocol::Tcp => false,
1527 Protocol::Tcps => false,
1528 Protocol::Http => true,
1529 Protocol::Https => true,
1530 }
1531 }
1532
1533 fn schema(&self) -> &str {
1534 match self {
1535 Protocol::Tcp => "tcp",
1536 Protocol::Tcps => "tcps",
1537 #[cfg(feature = "ilp-over-http")]
1538 Protocol::Http => "http",
1539 #[cfg(feature = "ilp-over-http")]
1540 Protocol::Https => "https",
1541 }
1542 }
1543
1544 fn from_schema(schema: &str) -> Result<Self> {
1545 match schema {
1546 "tcp" => Ok(Protocol::Tcp),
1547 "tcps" => Ok(Protocol::Tcps),
1548 #[cfg(feature = "ilp-over-http")]
1549 "http" => Ok(Protocol::Http),
1550 #[cfg(feature = "ilp-over-http")]
1551 "https" => Ok(Protocol::Https),
1552 _ => Err(error::fmt!(ConfigError, "Unsupported protocol: {}", schema)),
1553 }
1554 }
1555}
1556
1557#[cfg_attr(
1563 feature = "ilp-over-http",
1564 doc = r##"
1565```no_run
1566# use questdb::Result;
1567use questdb::ingress::{Protocol, SenderBuilder};
1568# fn main() -> Result<()> {
1569let mut sender = SenderBuilder::new(Protocol::Http, "localhost", 9009).build()?;
1570# Ok(())
1571# }
1572```
1573"##
1574)]
1575#[derive(Debug, Clone)]
1608pub struct SenderBuilder {
1609 protocol: Protocol,
1610 host: ConfigSetting<String>,
1611 port: ConfigSetting<String>,
1612 net_interface: ConfigSetting<Option<String>>,
1613 max_buf_size: ConfigSetting<usize>,
1614 auth_timeout: ConfigSetting<Duration>,
1615 username: ConfigSetting<Option<String>>,
1616 password: ConfigSetting<Option<String>>,
1617 token: ConfigSetting<Option<String>>,
1618 token_x: ConfigSetting<Option<String>>,
1619 token_y: ConfigSetting<Option<String>>,
1620
1621 #[cfg(feature = "insecure-skip-verify")]
1622 tls_verify: ConfigSetting<bool>,
1623
1624 tls_ca: ConfigSetting<CertificateAuthority>,
1625 tls_roots: ConfigSetting<Option<PathBuf>>,
1626
1627 #[cfg(feature = "ilp-over-http")]
1628 http: Option<HttpConfig>,
1629}
1630
1631impl SenderBuilder {
1632 pub fn from_conf<T: AsRef<str>>(conf: T) -> Result<Self> {
1659 let conf = conf.as_ref();
1660 let conf = questdb_confstr::parse_conf_str(conf)
1661 .map_err(|e| error::fmt!(ConfigError, "Config parse error: {}", e))?;
1662 let service = conf.service();
1663 let params = conf.params();
1664
1665 let protocol = Protocol::from_schema(service)?;
1666
1667 let Some(addr) = params.get("addr") else {
1668 return Err(error::fmt!(
1669 ConfigError,
1670 "Missing \"addr\" parameter in config string"
1671 ));
1672 };
1673 let (host, port) = match addr.split_once(':') {
1674 Some((h, p)) => (h, p),
1675 None => (addr.as_str(), protocol.default_port()),
1676 };
1677 let mut builder = SenderBuilder::new(protocol, host, port);
1678
1679 validate_auto_flush_params(params)?;
1680
1681 for (key, val) in params.iter().map(|(k, v)| (k.as_str(), v.as_str())) {
1682 builder = match key {
1683 "username" => builder.username(val)?,
1684 "password" => builder.password(val)?,
1685 "token" => builder.token(val)?,
1686 "token_x" => builder.token_x(val)?,
1687 "token_y" => builder.token_y(val)?,
1688 "bind_interface" => builder.bind_interface(val)?,
1689
1690 "init_buf_size" => {
1691 return Err(error::fmt!(
1692 ConfigError,
1693 "\"init_buf_size\" is not supported in config string"
1694 ))
1695 }
1696
1697 "max_buf_size" => builder.max_buf_size(parse_conf_value(key, val)?)?,
1698
1699 "auth_timeout" => {
1700 builder.auth_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
1701 }
1702
1703 "tls_verify" => {
1704 let verify = match val {
1705 "on" => true,
1706 "unsafe_off" => false,
1707 _ => {
1708 return Err(error::fmt!(
1709 ConfigError,
1710 r##"Config parameter "tls_verify" must be either "on" or "unsafe_off".'"##,
1711 ))
1712 }
1713 };
1714
1715 #[cfg(not(feature = "insecure-skip-verify"))]
1716 {
1717 if !verify {
1718 return Err(error::fmt!(
1719 ConfigError,
1720 r##"The "insecure-skip-verify" feature is not enabled, so "tls_verify=unsafe_off" is not supported"##,
1721 ));
1722 }
1723 builder
1724 }
1725
1726 #[cfg(feature = "insecure-skip-verify")]
1727 builder.tls_verify(verify)?
1728 }
1729
1730 "tls_ca" => {
1731 let ca = match val {
1732 #[cfg(feature = "tls-webpki-certs")]
1733 "webpki_roots" => CertificateAuthority::WebpkiRoots,
1734
1735 #[cfg(not(feature = "tls-webpki-certs"))]
1736 "webpki_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=webpki_roots\" requires the \"tls-webpki-certs\" feature")),
1737
1738 #[cfg(feature = "tls-native-certs")]
1739 "os_roots" => CertificateAuthority::OsRoots,
1740
1741 #[cfg(not(feature = "tls-native-certs"))]
1742 "os_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=os_roots\" requires the \"tls-native-certs\" feature")),
1743
1744 #[cfg(all(feature = "tls-webpki-certs", feature = "tls-native-certs"))]
1745 "webpki_and_os_roots" => CertificateAuthority::WebpkiAndOsRoots,
1746
1747 #[cfg(not(all(feature = "tls-webpki-certs", feature = "tls-native-certs")))]
1748 "webpki_and_os_roots" => return Err(error::fmt!(ConfigError, "Config parameter \"tls_ca=webpki_and_os_roots\" requires both the \"tls-webpki-certs\" and \"tls-native-certs\" features")),
1749
1750 _ => return Err(error::fmt!(ConfigError, "Invalid value {val:?} for \"tls_ca\"")),
1751 };
1752 builder.tls_ca(ca)?
1753 }
1754
1755 "tls_roots" => {
1756 let path = PathBuf::from_str(val).map_err(|e| {
1757 error::fmt!(
1758 ConfigError,
1759 "Invalid path {:?} for \"tls_roots\": {}",
1760 val,
1761 e
1762 )
1763 })?;
1764 builder.tls_roots(path)?
1765 }
1766
1767 "tls_roots_password" => {
1768 return Err(error::fmt!(
1769 ConfigError,
1770 "\"tls_roots_password\" is not supported."
1771 ))
1772 }
1773
1774 #[cfg(feature = "ilp-over-http")]
1775 "request_min_throughput" => {
1776 builder.request_min_throughput(parse_conf_value(key, val)?)?
1777 }
1778
1779 #[cfg(feature = "ilp-over-http")]
1780 "request_timeout" => {
1781 builder.request_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
1782 }
1783
1784 #[cfg(feature = "ilp-over-http")]
1785 "retry_timeout" => {
1786 builder.retry_timeout(Duration::from_millis(parse_conf_value(key, val)?))?
1787 }
1788 _ => builder,
1793 };
1794 }
1795
1796 Ok(builder)
1797 }
1798
1799 pub fn from_env() -> Result<Self> {
1804 let conf = std::env::var("QDB_CLIENT_CONF").map_err(|_| {
1805 error::fmt!(ConfigError, "Environment variable QDB_CLIENT_CONF not set.")
1806 })?;
1807 Self::from_conf(conf)
1808 }
1809
1810 pub fn new<H: Into<String>, P: Into<Port>>(protocol: Protocol, host: H, port: P) -> Self {
1824 let host = host.into();
1825 let port: Port = port.into();
1826 let port = port.0;
1827
1828 #[cfg(feature = "tls-webpki-certs")]
1829 let tls_ca = CertificateAuthority::WebpkiRoots;
1830
1831 #[cfg(all(not(feature = "tls-webpki-certs"), feature = "tls-native-certs"))]
1832 let tls_ca = CertificateAuthority::OsRoots;
1833
1834 #[cfg(not(any(feature = "tls-webpki-certs", feature = "tls-native-certs")))]
1835 let tls_ca = CertificateAuthority::PemFile;
1836
1837 Self {
1838 protocol,
1839 host: ConfigSetting::new_specified(host),
1840 port: ConfigSetting::new_specified(port),
1841 net_interface: ConfigSetting::new_default(None),
1842 max_buf_size: ConfigSetting::new_default(100 * 1024 * 1024),
1843 auth_timeout: ConfigSetting::new_default(Duration::from_secs(15)),
1844 username: ConfigSetting::new_default(None),
1845 password: ConfigSetting::new_default(None),
1846 token: ConfigSetting::new_default(None),
1847 token_x: ConfigSetting::new_default(None),
1848 token_y: ConfigSetting::new_default(None),
1849
1850 #[cfg(feature = "insecure-skip-verify")]
1851 tls_verify: ConfigSetting::new_default(true),
1852
1853 tls_ca: ConfigSetting::new_default(tls_ca),
1854 tls_roots: ConfigSetting::new_default(None),
1855
1856 #[cfg(feature = "ilp-over-http")]
1857 http: if protocol.is_httpx() {
1858 Some(HttpConfig::default())
1859 } else {
1860 None
1861 },
1862 }
1863 }
1864
1865 pub fn bind_interface<I: Into<String>>(mut self, addr: I) -> Result<Self> {
1871 self.ensure_is_tcpx("bind_interface")?;
1872 self.net_interface
1873 .set_specified("bind_interface", Some(validate_value(addr.into())?))?;
1874 Ok(self)
1875 }
1876
1877 pub fn username(mut self, username: &str) -> Result<Self> {
1886 self.username
1887 .set_specified("username", Some(validate_value(username.to_string())?))?;
1888 Ok(self)
1889 }
1890
1891 pub fn password(mut self, password: &str) -> Result<Self> {
1894 self.password
1895 .set_specified("password", Some(validate_value(password.to_string())?))?;
1896 Ok(self)
1897 }
1898
1899 pub fn token(mut self, token: &str) -> Result<Self> {
1902 self.token
1903 .set_specified("token", Some(validate_value(token.to_string())?))?;
1904 Ok(self)
1905 }
1906
1907 pub fn token_x(mut self, token_x: &str) -> Result<Self> {
1909 self.token_x
1910 .set_specified("token_x", Some(validate_value(token_x.to_string())?))?;
1911 Ok(self)
1912 }
1913
1914 pub fn token_y(mut self, token_y: &str) -> Result<Self> {
1916 self.token_y
1917 .set_specified("token_y", Some(validate_value(token_y.to_string())?))?;
1918 Ok(self)
1919 }
1920
1921 pub fn auth_timeout(mut self, value: Duration) -> Result<Self> {
1925 self.auth_timeout.set_specified("auth_timeout", value)?;
1926 Ok(self)
1927 }
1928
1929 pub fn ensure_tls_enabled(&self, property: &str) -> Result<()> {
1931 if !self.protocol.tls_enabled() {
1932 return Err(error::fmt!(
1933 ConfigError,
1934 "Cannot set {property:?}: TLS is not supported for protocol {}",
1935 self.protocol
1936 ));
1937 }
1938 Ok(())
1939 }
1940
/// Enable or disable verification of the server's TLS certificate.
///
/// Only available with the `insecure-skip-verify` feature; the builder
/// default set in `new` is `true`.
///
/// # Errors
/// Fails when the protocol does not have TLS enabled or when
/// `set_specified` rejects the update.
#[cfg(feature = "insecure-skip-verify")]
pub fn tls_verify(mut self, verify: bool) -> Result<Self> {
    self.ensure_tls_enabled("tls_verify")?;
    let setting = &mut self.tls_verify;
    setting.set_specified("tls_verify", verify)?;
    Ok(self)
}
1952
1953 pub fn tls_ca(mut self, ca: CertificateAuthority) -> Result<Self> {
1956 self.ensure_tls_enabled("tls_ca")?;
1957 self.tls_ca.set_specified("tls_ca", ca)?;
1958 Ok(self)
1959 }
1960
1961 pub fn tls_roots<P: Into<PathBuf>>(self, path: P) -> Result<Self> {
1967 let mut builder = self.tls_ca(CertificateAuthority::PemFile)?;
1968 let path = path.into();
1969 let _file = std::fs::File::open(&path).map_err(|io_err| {
1971 error::fmt!(
1972 ConfigError,
1973 "Could not open root certificate file from path {:?}: {}",
1974 path,
1975 io_err
1976 )
1977 })?;
1978 builder.tls_roots.set_specified("tls_roots", Some(path))?;
1979 Ok(builder)
1980 }
1981
1982 pub fn max_buf_size(mut self, value: usize) -> Result<Self> {
1985 let min = 1024;
1986 if value < min {
1987 return Err(error::fmt!(
1988 ConfigError,
1989 "max_buf_size\" must be at least {min} bytes."
1990 ));
1991 }
1992 self.max_buf_size.set_specified("max_buf_size", value)?;
1993 Ok(self)
1994 }
1995
/// Set the cumulative duration spent retrying failed HTTP requests.
///
/// The value is handed to `http_send_with_retries` on every flush.
///
/// # Errors
/// Returns a `ConfigError` when the builder has no HTTP configuration
/// (i.e. the protocol is not HTTP-based), and fails when `set_specified`
/// rejects the update.
#[cfg(feature = "ilp-over-http")]
pub fn retry_timeout(mut self, value: Duration) -> Result<Self> {
    match self.http.as_mut() {
        Some(http) => {
            http.retry_timeout.set_specified("retry_timeout", value)?;
        }
        None => {
            return Err(error::fmt!(
                ConfigError,
                "retry_timeout is supported only in ILP over HTTP."
            ))
        }
    }
    Ok(self)
}
2010
/// Set the minimum expected throughput, in bytes per second, for HTTP
/// requests.
///
/// On flush, the buffer length divided by this value is added (as
/// seconds) to `request_timeout`; a value of `0` adds no extra time.
///
/// # Errors
/// Returns a `ConfigError` when the builder has no HTTP configuration
/// (i.e. the protocol is not HTTP-based), and fails when `set_specified`
/// rejects the update.
#[cfg(feature = "ilp-over-http")]
pub fn request_min_throughput(mut self, value: u64) -> Result<Self> {
    match self.http.as_mut() {
        Some(http) => {
            http.request_min_throughput
                .set_specified("request_min_throughput", value)?;
        }
        None => {
            return Err(error::fmt!(
                ConfigError,
                "\"request_min_throughput\" is supported only in ILP over HTTP."
            ))
        }
    }
    Ok(self)
}
2033
/// Set the base timeout for HTTP requests.
///
/// The value is used as the connect timeout of the HTTP agent and as the
/// base of the per-request timeout computed on flush.
///
/// # Errors
/// Returns a `ConfigError` when the builder has no HTTP configuration
/// (i.e. the protocol is not HTTP-based) or when `value` is zero, and
/// fails when `set_specified` rejects the update.
#[cfg(feature = "ilp-over-http")]
pub fn request_timeout(mut self, value: Duration) -> Result<Self> {
    match self.http.as_mut() {
        // The protocol check takes precedence over the value check.
        None => {
            return Err(error::fmt!(
                ConfigError,
                "\"request_timeout\" is supported only in ILP over HTTP."
            ))
        }
        Some(_) if value.is_zero() => {
            return Err(error::fmt!(
                ConfigError,
                "\"request_timeout\" must be greater than 0."
            ))
        }
        Some(http) => {
            http.request_timeout
                .set_specified("request_timeout", value)?;
        }
    }
    Ok(self)
}
2057
/// Override the HTTP user agent string.
///
/// The value is validated first; it is then stored only when the builder
/// has an HTTP configuration and is silently dropped otherwise.
///
/// # Errors
/// Fails when the value contains a control character rejected by
/// `validate_value`.
#[cfg(feature = "ilp-over-http")]
#[doc(hidden)]
pub fn user_agent(mut self, value: &str) -> Result<Self> {
    let checked = validate_value(value)?;
    if let Some(http) = self.http.as_mut() {
        http.user_agent = String::from(checked);
    }
    Ok(self)
}
2070
2071 fn connect_tcp(&self, auth: &Option<AuthParams>) -> Result<ProtocolHandler> {
2072 let addr: SockAddr = gai::resolve_host_port(self.host.as_str(), self.port.as_str())?;
2073 let mut sock = Socket::new(Domain::IPV4, Type::STREAM, Some(SockProtocol::TCP))
2074 .map_err(|io_err| map_io_to_socket_err("Could not open TCP socket: ", io_err))?;
2075
2076 sock.set_reuse_address(true)
2080 .map_err(|io_err| map_io_to_socket_err("Could not set SO_REUSEADDR: ", io_err))?;
2081
2082 sock.set_linger(Some(Duration::from_secs(120)))
2083 .map_err(|io_err| map_io_to_socket_err("Could not set socket linger: ", io_err))?;
2084 sock.set_keepalive(true)
2085 .map_err(|io_err| map_io_to_socket_err("Could not set SO_KEEPALIVE: ", io_err))?;
2086 sock.set_nodelay(true)
2087 .map_err(|io_err| map_io_to_socket_err("Could not set TCP_NODELAY: ", io_err))?;
2088 if let Some(ref host) = self.net_interface.deref() {
2089 let bind_addr = gai::resolve_host(host.as_str())?;
2090 sock.bind(&bind_addr).map_err(|io_err| {
2091 map_io_to_socket_err(
2092 &format!("Could not bind to interface address {:?}: ", host),
2093 io_err,
2094 )
2095 })?;
2096 }
2097 sock.connect(&addr).map_err(|io_err| {
2098 let host_port = format!("{}:{}", self.host.deref(), *self.port);
2099 let prefix = format!("Could not connect to {:?}: ", host_port);
2100 map_io_to_socket_err(&prefix, io_err)
2101 })?;
2102
2103 sock.set_read_timeout(Some(*self.auth_timeout))
2108 .map_err(|io_err| {
2109 map_io_to_socket_err("Failed to set read timeout on socket: ", io_err)
2110 })?;
2111
2112 #[cfg(feature = "insecure-skip-verify")]
2113 let tls_verify = *self.tls_verify;
2114
2115 #[cfg(not(feature = "insecure-skip-verify"))]
2116 let tls_verify = true;
2117
2118 let mut conn = match configure_tls(
2119 self.protocol.tls_enabled(),
2120 tls_verify,
2121 *self.tls_ca,
2122 self.tls_roots.deref(),
2123 )? {
2124 Some(tls_config) => {
2125 let server_name: ServerName = ServerName::try_from(self.host.as_str())
2126 .map_err(|inv_dns_err| error::fmt!(TlsError, "Bad host: {}", inv_dns_err))?
2127 .to_owned();
2128 let mut tls_conn =
2129 ClientConnection::new(tls_config, server_name).map_err(|rustls_err| {
2130 error::fmt!(TlsError, "Could not create TLS client: {}", rustls_err)
2131 })?;
2132 while tls_conn.wants_write() || tls_conn.is_handshaking() {
2133 tls_conn.complete_io(&mut sock).map_err(|io_err| {
2134 if (io_err.kind() == ErrorKind::TimedOut)
2135 || (io_err.kind() == ErrorKind::WouldBlock)
2136 {
2137 error::fmt!(
2138 TlsError,
2139 concat!(
2140 "Failed to complete TLS handshake:",
2141 " Timed out waiting for server ",
2142 "response after {:?}."
2143 ),
2144 *self.auth_timeout
2145 )
2146 } else {
2147 error::fmt!(TlsError, "Failed to complete TLS handshake: {}", io_err)
2148 }
2149 })?;
2150 }
2151 Connection::Tls(StreamOwned::new(tls_conn, sock).into())
2152 }
2153 None => Connection::Direct(sock),
2154 };
2155
2156 if let Some(AuthParams::Ecdsa(auth)) = auth {
2157 conn.authenticate(auth)?;
2158 }
2159
2160 Ok(ProtocolHandler::Socket(conn))
2161 }
2162
2163 fn build_auth(&self) -> Result<Option<AuthParams>> {
2164 match (
2165 self.protocol,
2166 self.username.deref(),
2167 self.password.deref(),
2168 self.token.deref(),
2169 self.token_x.deref(),
2170 self.token_y.deref(),
2171 ) {
2172 (_, None, None, None, None, None) => Ok(None),
2173 (
2174 protocol,
2175 Some(username),
2176 None,
2177 Some(token),
2178 Some(token_x),
2179 Some(token_y),
2180 ) if protocol.is_tcpx() => Ok(Some(AuthParams::Ecdsa(EcdsaAuthParams {
2181 key_id: username.to_string(),
2182 priv_key: token.to_string(),
2183 pub_key_x: token_x.to_string(),
2184 pub_key_y: token_y.to_string(),
2185 }))),
2186 (protocol, Some(_username), Some(_password), None, None, None)
2187 if protocol.is_tcpx() => {
2188 Err(error::fmt!(ConfigError,
2189 r##"The "basic_auth" setting can only be used with the ILP/HTTP protocol."##,
2190 ))
2191 }
2192 (protocol, None, None, Some(_token), None, None)
2193 if protocol.is_tcpx() => {
2194 Err(error::fmt!(ConfigError, "Token authentication only be used with the ILP/HTTP protocol."))
2195 }
2196 (protocol, _username, None, _token, _token_x, _token_y)
2197 if protocol.is_tcpx() => {
2198 Err(error::fmt!(ConfigError,
2199 r##"Incomplete ECDSA authentication parameters. Specify either all or none of: "username", "token", "token_x", "token_y"."##,
2200 ))
2201 }
2202 #[cfg(feature = "ilp-over-http")]
2203 (protocol, Some(username), Some(password), None, None, None)
2204 if protocol.is_httpx() => {
2205 Ok(Some(AuthParams::Basic(BasicAuthParams {
2206 username: username.to_string(),
2207 password: password.to_string(),
2208 })))
2209 }
2210 #[cfg(feature = "ilp-over-http")]
2211 (protocol, Some(_username), None, None, None, None)
2212 if protocol.is_httpx() => {
2213 Err(error::fmt!(ConfigError,
2214 r##"Basic authentication parameter "username" is present, but "password" is missing."##,
2215 ))
2216 }
2217 #[cfg(feature = "ilp-over-http")]
2218 (protocol, None, Some(_password), None, None, None)
2219 if protocol.is_httpx() => {
2220 Err(error::fmt!(ConfigError,
2221 r##"Basic authentication parameter "password" is present, but "username" is missing."##,
2222 ))
2223 }
2224 #[cfg(feature = "ilp-over-http")]
2225 (protocol, None, None, Some(token), None, None)
2226 if protocol.is_httpx() => {
2227 Ok(Some(AuthParams::Token(TokenAuthParams {
2228 token: token.to_string(),
2229 })))
2230 }
2231 #[cfg(feature = "ilp-over-http")]
2232 (
2233 protocol,
2234 Some(_username),
2235 None,
2236 Some(_token),
2237 Some(_token_x),
2238 Some(_token_y),
2239 ) if protocol.is_httpx() => {
2240 Err(error::fmt!(ConfigError, "ECDSA authentication is only available with ILP/TCP and not available with ILP/HTTP."))
2241 }
2242 #[cfg(feature = "ilp-over-http")]
2243 (protocol, _username, _password, _token, None, None)
2244 if protocol.is_httpx() => {
2245 Err(error::fmt!(ConfigError,
2246 r##"Inconsistent HTTP authentication parameters. Specify either "username" and "password", or just "token"."##,
2247 ))
2248 }
2249 _ => {
2250 Err(error::fmt!(ConfigError,
2251 r##"Incomplete authentication parameters. Check "username", "password", "token", "token_x" and "token_y" parameters are set correctly."##,
2252 ))
2253 }
2254 }
2255 }
2256
2257 pub fn build(&self) -> Result<Sender> {
2264 let mut descr = format!("Sender[host={:?},port={:?},", self.host, self.port);
2265
2266 if self.protocol.tls_enabled() {
2267 write!(descr, "tls=enabled,").unwrap();
2268 } else {
2269 write!(descr, "tls=disabled,").unwrap();
2270 }
2271
2272 let auth = self.build_auth()?;
2273
2274 let handler = match self.protocol {
2275 Protocol::Tcp | Protocol::Tcps => self.connect_tcp(&auth)?,
2276 #[cfg(feature = "ilp-over-http")]
2277 Protocol::Http | Protocol::Https => {
2278 if self.net_interface.is_some() {
2279 return Err(error::fmt!(
2281 InvalidApiCall,
2282 "net_interface is not supported for ILP over HTTP."
2283 ));
2284 }
2285
2286 let http_config = self.http.as_ref().unwrap();
2287 let user_agent = http_config.user_agent.as_str();
2288 let agent_builder = ureq::AgentBuilder::new()
2289 .user_agent(user_agent)
2290 .no_delay(true);
2291
2292 #[cfg(feature = "insecure-skip-verify")]
2293 let tls_verify = *self.tls_verify;
2294
2295 #[cfg(not(feature = "insecure-skip-verify"))]
2296 let tls_verify = true;
2297
2298 let agent_builder = match configure_tls(
2299 self.protocol.tls_enabled(),
2300 tls_verify,
2301 *self.tls_ca,
2302 self.tls_roots.deref(),
2303 )? {
2304 Some(tls_config) => agent_builder.tls_config(tls_config),
2305 None => agent_builder,
2306 };
2307 let auth = match auth {
2308 Some(AuthParams::Basic(ref auth)) => Some(auth.to_header_string()),
2309 Some(AuthParams::Token(ref auth)) => Some(auth.to_header_string()?),
2310 Some(AuthParams::Ecdsa(_)) => {
2311 return Err(error::fmt!(
2312 AuthError,
2313 "ECDSA authentication is not supported for ILP over HTTP. \
2314 Please use basic or token authentication instead."
2315 ));
2316 }
2317 None => None,
2318 };
2319 let agent_builder =
2320 agent_builder.timeout_connect(*http_config.request_timeout.deref());
2321 let agent = agent_builder.build();
2322 let proto = self.protocol.schema();
2323 let url = format!(
2324 "{}://{}:{}/write",
2325 proto,
2326 self.host.deref(),
2327 self.port.deref()
2328 );
2329 ProtocolHandler::Http(HttpHandlerState {
2330 agent,
2331 url,
2332 auth,
2333
2334 config: self.http.as_ref().unwrap().clone(),
2335 })
2336 }
2337 };
2338
2339 if auth.is_some() {
2340 descr.push_str("auth=on]");
2341 } else {
2342 descr.push_str("auth=off]");
2343 }
2344
2345 let sender = Sender {
2346 descr,
2347 handler,
2348 connected: true,
2349 max_buf_size: *self.max_buf_size,
2350 };
2351
2352 Ok(sender)
2353 }
2354
2355 fn ensure_is_tcpx(&mut self, param_name: &str) -> Result<()> {
2356 if self.protocol.is_tcpx() {
2357 Ok(())
2358 } else {
2359 Err(error::fmt!(
2360 ConfigError,
2361 "The {param_name:?} setting can only be used with the TCP protocol."
2362 ))
2363 }
2364 }
2365}
2366
2367fn validate_value<T: AsRef<str>>(value: T) -> Result<T> {
2370 let str_ref = value.as_ref();
2371 for (p, c) in str_ref.chars().enumerate() {
2372 if matches!(c, '\u{0}'..='\u{1f}' | '\u{7f}'..='\u{9f}') {
2373 return Err(error::fmt!(
2374 ConfigError,
2375 "Invalid character {c:?} at position {p}"
2376 ));
2377 }
2378 }
2379 Ok(value)
2380}
2381
2382fn parse_conf_value<T>(param_name: &str, str_value: &str) -> Result<T>
2383where
2384 T: FromStr,
2385 T::Err: std::fmt::Debug,
2386{
2387 str_value.parse().map_err(|e| {
2388 error::fmt!(
2389 ConfigError,
2390 "Could not parse {param_name:?} to number: {e:?}"
2391 )
2392 })
2393}
2394
2395fn b64_decode(descr: &'static str, buf: &str) -> Result<Vec<u8>> {
2396 Base64UrlUnpadded::decode_vec(buf).map_err(|b64_err| {
2397 error::fmt!(
2398 AuthError,
2399 "Misconfigured ILP authentication keys. Could not decode {}: {}. \
2400 Hint: Check the keys for a possible typo.",
2401 descr,
2402 b64_err
2403 )
2404 })
2405}
2406
2407fn parse_public_key(pub_key_x: &str, pub_key_y: &str) -> Result<Vec<u8>> {
2408 let mut pub_key_x = b64_decode("public key x", pub_key_x)?;
2409 let mut pub_key_y = b64_decode("public key y", pub_key_y)?;
2410
2411 let mut encoded = Vec::new();
2413 encoded.push(4u8); let pub_key_x_ken = pub_key_x.len();
2415 if pub_key_x_ken > 32 {
2416 return Err(error::fmt!(
2417 AuthError,
2418 "Misconfigured ILP authentication keys. Public key x is too long. \
2419 Hint: Check the keys for a possible typo."
2420 ));
2421 }
2422 let pub_key_y_len = pub_key_y.len();
2423 if pub_key_y_len > 32 {
2424 return Err(error::fmt!(
2425 AuthError,
2426 "Misconfigured ILP authentication keys. Public key y is too long. \
2427 Hint: Check the keys for a possible typo."
2428 ));
2429 }
2430 encoded.resize((32 - pub_key_x_ken) + 1, 0u8);
2431 encoded.append(&mut pub_key_x);
2432 encoded.resize((32 - pub_key_y_len) + 1 + 32, 0u8);
2433 encoded.append(&mut pub_key_y);
2434 Ok(encoded)
2435}
2436
2437fn parse_key_pair(auth: &EcdsaAuthParams) -> Result<EcdsaKeyPair> {
2438 let private_key = b64_decode("private authentication key", auth.priv_key.as_str())?;
2439 let public_key = parse_public_key(auth.pub_key_x.as_str(), auth.pub_key_y.as_str())?;
2440 let system_random = SystemRandom::new();
2441 EcdsaKeyPair::from_private_key_and_public_key(
2442 &ECDSA_P256_SHA256_FIXED_SIGNING,
2443 &private_key[..],
2444 &public_key[..],
2445 &system_random,
2446 )
2447 .map_err(|key_rejected| {
2448 error::fmt!(
2449 AuthError,
2450 "Misconfigured ILP authentication keys: {}. Hint: Check the keys for a possible typo.",
2451 key_rejected
2452 )
2453 })
2454}
2455
2456pub(crate) struct F64Serializer {
2457 buf: ryu::Buffer,
2458 n: f64,
2459}
2460
2461impl F64Serializer {
2462 pub(crate) fn new(n: f64) -> Self {
2463 F64Serializer {
2464 buf: ryu::Buffer::new(),
2465 n,
2466 }
2467 }
2468
2469 #[cold]
2471 #[cfg_attr(feature = "no-panic", inline)]
2472 fn format_nonfinite(&self) -> &'static str {
2473 const MANTISSA_MASK: u64 = 0x000fffffffffffff;
2474 const SIGN_MASK: u64 = 0x8000000000000000;
2475 let bits = self.n.to_bits();
2476 if bits & MANTISSA_MASK != 0 {
2477 "NaN"
2478 } else if bits & SIGN_MASK != 0 {
2479 "-Infinity"
2480 } else {
2481 "Infinity"
2482 }
2483 }
2484
2485 pub(crate) fn as_str(&mut self) -> &str {
2486 if self.n.is_finite() {
2487 self.buf.format_finite(self.n)
2488 } else {
2489 self.format_nonfinite()
2490 }
2491 }
2492}
2493
2494impl Sender {
2495 pub fn from_conf<T: AsRef<str>>(conf: T) -> Result<Self> {
2518 SenderBuilder::from_conf(conf)?.build()
2519 }
2520
2521 pub fn from_env() -> Result<Self> {
2530 SenderBuilder::from_env()?.build()
2531 }
2532
2533 #[allow(unused_variables)]
2534 fn flush_impl(&mut self, buf: &Buffer, transactional: bool) -> Result<()> {
2535 if !self.connected {
2536 return Err(error::fmt!(
2537 SocketError,
2538 "Could not flush buffer: not connected to database."
2539 ));
2540 }
2541 buf.check_op(Op::Flush)?;
2542
2543 if buf.len() > self.max_buf_size {
2544 return Err(error::fmt!(
2545 InvalidApiCall,
2546 "Could not flush buffer: Buffer size of {} exceeds maximum configured allowed size of {} bytes.",
2547 buf.len(),
2548 self.max_buf_size
2549 ));
2550 }
2551
2552 let bytes = buf.as_str().as_bytes();
2553 if bytes.is_empty() {
2554 return Ok(());
2555 }
2556 match self.handler {
2557 ProtocolHandler::Socket(ref mut conn) => {
2558 if transactional {
2559 return Err(error::fmt!(
2560 InvalidApiCall,
2561 "Transactional flushes are not supported for ILP over TCP."
2562 ));
2563 }
2564 conn.write_all(bytes).map_err(|io_err| {
2565 self.connected = false;
2566 map_io_to_socket_err("Could not flush buffer: ", io_err)
2567 })?;
2568 }
2569 #[cfg(feature = "ilp-over-http")]
2570 ProtocolHandler::Http(ref state) => {
2571 if transactional && !buf.transactional() {
2572 return Err(error::fmt!(
2573 InvalidApiCall,
2574 "Buffer contains lines for multiple tables. \
2575 Transactional flushes are only supported for buffers containing lines for a single table."
2576 ));
2577 }
2578 let request_min_throughput = *state.config.request_min_throughput;
2579 let extra_time = if request_min_throughput > 0 {
2580 (bytes.len() as f64) / (request_min_throughput as f64)
2581 } else {
2582 0.0f64
2583 };
2584 let timeout = *state.config.request_timeout + Duration::from_secs_f64(extra_time);
2585 let request = state
2586 .agent
2587 .post(&state.url)
2588 .query_pairs([("precision", "n")])
2589 .timeout(timeout)
2590 .set("Content-Type", "text/plain; charset=utf-8");
2591 let request = match state.auth.as_ref() {
2592 Some(auth) => request.set("Authorization", auth),
2593 None => request,
2594 };
2595 let response_or_err =
2596 http_send_with_retries(request, bytes, *state.config.retry_timeout);
2597 match response_or_err {
2598 Ok(_response) => {
2599 }
2601 Err(ureq::Error::Status(http_status_code, response)) => {
2602 return Err(parse_http_error(http_status_code, response));
2603 }
2604 Err(ureq::Error::Transport(transport)) => {
2605 return Err(error::fmt!(
2606 SocketError,
2607 "Could not flush buffer: {}",
2608 transport
2609 ));
2610 }
2611 }
2612 }
2613 }
2614 Ok(())
2615 }
2616
/// Send the buffer without clearing it, optionally as a transactional
/// flush.
///
/// Delegates to `flush_impl` with the given `transactional` flag.
///
/// # Errors
/// Returns whatever `flush_impl` returns.
#[cfg(feature = "ilp-over-http")]
pub fn flush_and_keep_with_flags(&mut self, buf: &Buffer, transactional: bool) -> Result<()> {
    Self::flush_impl(self, buf, transactional)
}
2637
2638 pub fn flush_and_keep(&mut self, buf: &Buffer) -> Result<()> {
2644 self.flush_impl(buf, false)
2645 }
2646
2647 pub fn flush(&mut self, buf: &mut Buffer) -> Result<()> {
2670 self.flush_impl(buf, false)?;
2671 buf.clear();
2672 Ok(())
2673 }
2674
2675 pub fn must_close(&self) -> bool {
2681 !self.connected
2682 }
2683}
2684
2685mod conf;
2686mod timestamp;
2687
2688#[cfg(feature = "ilp-over-http")]
2689mod http;
2690
2691#[cfg(feature = "ilp-over-http")]
2692use http::*;
2693
2694#[cfg(test)]
2695mod tests;