exasol/connection/http_transport/
config.rs

1//! HTTP Transport options for IMPORT and EXPORT.
2//!
3//! Defaults to 0 threads (meaning a thread will be created for all available Exasol nodes in the cluster),
4//! no compression while encryption is conditioned by the `native-tls` and `rustls` feature flags.
5
6use crossbeam::channel::Sender;
7use csv::Terminator;
8use std::fmt::{Display, Formatter};
9use std::sync::atomic::AtomicBool;
10use std::sync::{Arc, Barrier};
11use std::time::Duration;
12
/// Settings shared by the IMPORT and EXPORT HTTP transport option types.
pub trait HttpTransportOpts {
    /// Returns the configured number of threads.
    ///
    /// `0` means that a thread is created for every available Exasol node.
    fn num_threads(&self) -> usize;

    /// Returns whether encryption is requested for the transport.
    fn encryption(&self) -> bool;

    /// Returns whether compression is requested for the transport.
    fn compression(&self) -> bool;

    /// Hands out the configured socket timeout, if there is one.
    ///
    /// The receiver is `&mut self`, so an implementation is allowed to move
    /// the value out instead of copying it.
    fn take_timeout(&mut self) -> Option<Duration>;

    /// Sets the timeout of socket read and write operations.
    /// The socket will error out if the timeout is exceeded.
    fn set_timeout(&mut self, timeout: Option<Duration>);
}
26
27/// Export options
28///
29/// # Defaults
30///
31/// num_threads: 0 -> this means a thread per node will be spawned
32/// compression: false
33/// encryption: *if encryption features are enabled true, else false*
34/// comment: None
35/// encoding: None -> database default will be used
36/// null: None -> by default NULL values turn to ""
37/// row_separator: `csv` crate's special Terminator::CRLF
38/// column_separator: ','
39/// column_delimiter: '"'
40/// timeout: 120 seconds
41/// with_column_names: true
42#[derive(Clone, Debug)]
43pub struct ExportOpts {
44    num_threads: usize,
45    compression: bool,
46    encryption: bool,
47    query: Option<String>,
48    table_name: Option<String>,
49    comment: Option<String>,
50    encoding: Option<String>,
51    null: Option<String>,
52    row_separator: Terminator,
53    column_separator: u8,
54    column_delimiter: u8,
55    timeout: Option<Duration>,
56    with_column_names: bool,
57}
58
59#[allow(clippy::derivable_impls)]
60impl Default for ExportOpts {
61    fn default() -> Self {
62        Self {
63            num_threads: 0,
64            compression: false,
65            encryption: cfg!(any(feature = "native-tls-basic", feature = "rustls")),
66            query: None,
67            table_name: None,
68            comment: None,
69            encoding: None,
70            null: None,
71            row_separator: Terminator::CRLF,
72            column_separator: b',',
73            column_delimiter: b'"',
74            timeout: Some(Duration::from_secs(120)),
75            with_column_names: true,
76        }
77    }
78}
79
80impl HttpTransportOpts for ExportOpts {
81    fn num_threads(&self) -> usize {
82        self.num_threads
83    }
84
85    fn encryption(&self) -> bool {
86        self.encryption
87    }
88
89    fn compression(&self) -> bool {
90        self.compression
91    }
92
93    fn take_timeout(&mut self) -> Option<Duration> {
94        self.timeout.take()
95    }
96
97    fn set_timeout(&mut self, timeout: Option<Duration>) {
98        self.timeout = timeout
99    }
100}
101
102impl ExportOpts {
103    pub fn new() -> Self {
104        Self::default()
105    }
106
107    pub fn set_encryption(&mut self, flag: bool) {
108        Self::validate_encryption(flag);
109        self.encryption = flag
110    }
111
112    pub fn set_compression(&mut self, flag: bool) {
113        Self::validate_compression(flag);
114        self.compression = flag
115    }
116
117    pub fn set_num_threads(&mut self, num: usize) {
118        self.num_threads = num
119    }
120
121    pub fn query(&self) -> Option<&str> {
122        self.query.as_deref()
123    }
124
125    /// Setting the query clears the table name
126    pub fn set_query<T>(&mut self, query: T)
127    where
128        T: Into<String>,
129    {
130        self.query = Some(query.into());
131        self.table_name = None;
132    }
133
134    pub fn table_name(&self) -> Option<&str> {
135        self.table_name.as_deref()
136    }
137
138    /// Setting the table name clears the query
139    pub fn set_table_name<T>(&mut self, table: T)
140    where
141        T: Into<String>,
142    {
143        self.table_name = Some(table.into());
144        self.query = None;
145    }
146
147    pub fn comment(&self) -> Option<&str> {
148        self.comment.as_deref()
149    }
150
151    pub fn set_comment<T>(&mut self, comment: T)
152    where
153        T: Into<String>,
154    {
155        self.comment = Some(comment.into())
156    }
157
158    pub fn encoding(&self) -> Option<&str> {
159        self.encoding.as_deref()
160    }
161
162    pub fn set_encoding<T>(&mut self, encoding: T)
163    where
164        T: Into<String>,
165    {
166        self.encoding = Some(encoding.into())
167    }
168
169    pub fn null(&self) -> Option<&str> {
170        self.null.as_deref()
171    }
172
173    pub fn set_null<T>(&mut self, value: T)
174    where
175        T: Into<String>,
176    {
177        self.null = Some(value.into())
178    }
179
180    pub fn row_separator(&self) -> Terminator {
181        self.row_separator
182    }
183
184    pub fn set_row_separator(&mut self, sep: Terminator) {
185        self.row_separator = sep
186    }
187
188    pub fn column_separator(&self) -> u8 {
189        self.column_separator
190    }
191
192    pub fn set_column_separator(&mut self, sep: u8) {
193        self.column_separator = sep
194    }
195
196    pub fn column_delimiter(&self) -> u8 {
197        self.column_delimiter
198    }
199
200    pub fn set_column_delimiter(&mut self, delimiter: u8) {
201        self.column_delimiter = delimiter
202    }
203
204    pub fn with_column_names(&self) -> bool {
205        self.with_column_names
206    }
207
208    /// When this is `true`, which is the default, the column names header is also exported
209    /// as the first row. This is important for deserializing structs from rows, for instance.
210    pub fn set_with_column_names(&mut self, flag: bool) {
211        self.with_column_names = flag
212    }
213
214    fn validate_encryption(flag: bool) {
215        if flag && cfg!(not(any(feature = "native-tls-basic", feature = "rustls"))) {
216            panic!("native-tls or rustls features must be enabled to use encryption")
217        }
218    }
219
220    fn validate_compression(flag: bool) {
221        if flag && cfg!(not(feature = "flate2")) {
222            panic!("flate2 feature must be enabled to use compression")
223        }
224    }
225}
226
227/// HTTP Transport import options.
228///
229/// # Defaults
230///
231/// num_threads: 0 -> this means a thread per node will be spawned
232/// compression: false
233/// encryption: *if encryption features are enabled true, else false*
234/// columns: None -> all table columns will be considered
235/// comment: None
236/// encoding: None -> database default will be used
237/// null: None -> by default NULL values turn to ""
238/// row_separator: `csv` crate's special Terminator::CRLF
239/// column_separator: ','
240/// column_delimiter: '"'
241/// timeout: 120 seconds
242/// skip: 0 rows
243/// trim: None
244#[derive(Clone, Debug)]
245pub struct ImportOpts {
246    num_threads: usize,
247    compression: bool,
248    encryption: bool,
249    columns: Option<Vec<String>>,
250    table_name: Option<String>,
251    comment: Option<String>,
252    encoding: Option<String>,
253    null: Option<String>,
254    row_separator: Terminator,
255    column_separator: u8,
256    column_delimiter: u8,
257    timeout: Option<Duration>,
258    skip: usize,
259    trim: Option<TrimType>,
260}
261
262#[allow(clippy::derivable_impls)]
263impl Default for ImportOpts {
264    fn default() -> Self {
265        Self {
266            num_threads: 0,
267            compression: false,
268            encryption: cfg!(any(feature = "native-tls-basic", feature = "rustls")),
269            columns: None,
270            table_name: None,
271            comment: None,
272            encoding: None,
273            null: None,
274            row_separator: Terminator::CRLF,
275            column_separator: b',',
276            column_delimiter: b'"',
277            timeout: Some(Duration::from_secs(120)),
278            skip: 0,
279            trim: None,
280        }
281    }
282}
283
284impl HttpTransportOpts for ImportOpts {
285    fn num_threads(&self) -> usize {
286        self.num_threads
287    }
288
289    fn encryption(&self) -> bool {
290        self.encryption
291    }
292
293    fn compression(&self) -> bool {
294        self.compression
295    }
296
297    fn take_timeout(&mut self) -> Option<Duration> {
298        self.timeout.take()
299    }
300
301    fn set_timeout(&mut self, timeout: Option<Duration>) {
302        self.timeout = timeout
303    }
304}
305
306impl ImportOpts {
307    pub fn new() -> Self {
308        Self::default()
309    }
310
311    pub fn set_encryption(&mut self, flag: bool) {
312        Self::validate_encryption(flag);
313        self.encryption = flag
314    }
315
316    pub fn set_compression(&mut self, flag: bool) {
317        Self::validate_compression(flag);
318        self.compression = flag
319    }
320
321    pub fn set_num_threads(&mut self, num: usize) {
322        self.num_threads = num
323    }
324
325    pub fn columns(&self) -> Option<&[String]> {
326        self.columns.as_deref()
327    }
328
329    pub fn set_columns<I, T>(&mut self, columns: I)
330    where
331        I: IntoIterator<Item = T>,
332        T: Into<String>,
333    {
334        self.columns = Some(
335            columns
336                .into_iter()
337                .map(|s| s.into())
338                .collect::<Vec<String>>(),
339        );
340    }
341
342    pub fn table_name(&self) -> Option<&str> {
343        self.table_name.as_deref()
344    }
345
346    pub fn set_table_name<T>(&mut self, table: T)
347    where
348        T: Into<String>,
349    {
350        self.table_name = Some(table.into());
351    }
352
353    pub fn comment(&self) -> Option<&str> {
354        self.comment.as_deref()
355    }
356
357    pub fn set_comment<T>(&mut self, comment: T)
358    where
359        T: Into<String>,
360    {
361        self.comment = Some(comment.into())
362    }
363
364    pub fn encoding(&self) -> Option<&str> {
365        self.encoding.as_deref()
366    }
367
368    pub fn set_encoding<T>(&mut self, encoding: T)
369    where
370        T: Into<String>,
371    {
372        self.encoding = Some(encoding.into())
373    }
374
375    pub fn null(&self) -> Option<&str> {
376        self.null.as_deref()
377    }
378
379    pub fn set_null<T>(&mut self, value: T)
380    where
381        T: Into<String>,
382    {
383        self.null = Some(value.into())
384    }
385
386    pub fn row_separator(&self) -> Terminator {
387        self.row_separator
388    }
389
390    pub fn set_row_separator(&mut self, sep: Terminator) {
391        self.row_separator = sep
392    }
393
394    pub fn column_separator(&self) -> u8 {
395        self.column_separator
396    }
397
398    pub fn set_column_separator(&mut self, sep: u8) {
399        self.column_separator = sep
400    }
401
402    pub fn column_delimiter(&self) -> u8 {
403        self.column_delimiter
404    }
405
406    pub fn set_column_delimiter(&mut self, delimiter: u8) {
407        self.column_delimiter = delimiter
408    }
409
410    pub fn skip(&self) -> usize {
411        self.skip
412    }
413
414    /// Skipping rows could be used for skipping the header row of a file, for instance.
415    pub fn set_skip(&mut self, num: usize) {
416        self.skip = num
417    }
418
419    pub fn trim(&self) -> Option<TrimType> {
420        self.trim
421    }
422
423    pub fn set_trim(&mut self, trim: Option<TrimType>) {
424        self.trim = trim
425    }
426
427    fn validate_encryption(flag: bool) {
428        if flag && cfg!(not(any(feature = "native-tls-basic", feature = "rustls"))) {
429            panic!("native-tls or rustls features must be enabled to use encryption")
430        }
431    }
432
433    fn validate_compression(flag: bool) {
434        if flag && cfg!(not(feature = "flate2")) {
435            panic!("flate2 feature must be enabled to use compression")
436        }
437    }
438}
439
/// Trim options for IMPORT
///
/// The [`Display`] implementation renders each variant as the keyword
/// `LTRIM`, `RTRIM` or `TRIM`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TrimType {
    /// Rendered as `LTRIM`.
    Left,
    /// Rendered as `RTRIM`.
    Right,
    /// Rendered as `TRIM`.
    Both,
}

impl Display for TrimType {
    /// Writes the keyword for this variant; width and padding flags are not applied.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let keyword = match self {
            Self::Left => "LTRIM",
            Self::Right => "RTRIM",
            Self::Both => "TRIM",
        };

        f.write_str(keyword)
    }
}
457
458/// Struct that holds internal utilities and parameters for HTTP transport
459#[derive(Clone, Debug)]
460pub struct HttpTransportConfig {
461    pub barrier: Arc<Barrier>,
462    pub run: Arc<AtomicBool>,
463    pub addr_sender: Sender<String>,
464    pub server_addr: String,
465    pub encryption: bool,
466    pub compression: bool,
467    pub timeout: Option<Duration>,
468}
469
470impl HttpTransportConfig {
471    /// Generates a Vec of configs, one for each given address
472    pub fn generate(
473        hosts: Vec<String>,
474        barrier: Arc<Barrier>,
475        run: Arc<AtomicBool>,
476        addr_sender: Sender<String>,
477        use_encryption: bool,
478        use_compression: bool,
479        timeout: Option<Duration>,
480    ) -> Vec<Self> {
481        hosts
482            .into_iter()
483            .map(|server_addr| Self {
484                server_addr,
485                barrier: barrier.clone(),
486                run: run.clone(),
487                addr_sender: addr_sender.clone(),
488                encryption: use_encryption,
489                compression: use_compression,
490                timeout,
491            })
492            .collect()
493    }
494
495    pub fn take_timeout(&mut self) -> Option<Duration> {
496        self.timeout.take()
497    }
498}