/// Row (record) terminator emitted in the CSV `ROW SEPARATOR` clause.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RowSeparator {
    /// Unix line endings — the default.
    #[default]
    LF,
    /// Classic Mac line endings.
    CR,
    /// Windows line endings.
    CRLF,
}
impl RowSeparator {
pub fn to_sql(&self) -> &'static str {
match self {
RowSeparator::LF => "LF",
RowSeparator::CR => "CR",
RowSeparator::CRLF => "CRLF",
}
}
}
/// Compression codec of the CSV source files; selects the generated
/// file extension (see [`Compression::extension`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Compression {
    /// Uncompressed ".csv" — the default.
    #[default]
    None,
    /// Gzip-compressed ".csv.gz".
    Gzip,
    /// Bzip2-compressed ".csv.bz2".
    Bzip2,
}
impl Compression {
pub fn extension(&self) -> &'static str {
match self {
Compression::None => ".csv",
Compression::Gzip => ".csv.gz",
Compression::Bzip2 => ".csv.bz2",
}
}
}
/// Data format of the imported files.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ImportFormat {
    /// Delimited text (default); all CSV format clauses apply.
    #[default]
    Csv,
    /// Apache Parquet; CSV-only clauses are omitted from the statement.
    Parquet,
}
/// Whitespace trimming applied to CSV fields via the `TRIM` clause.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TrimMode {
    /// No TRIM clause is emitted — the default.
    #[default]
    None,
    /// Trim leading whitespace (`LTRIM`).
    LTrim,
    /// Trim trailing whitespace (`RTRIM`).
    RTrim,
    /// Trim both sides (`TRIM`).
    Trim,
}
impl TrimMode {
pub fn to_sql(&self) -> Option<&'static str> {
match self {
TrimMode::None => None,
TrimMode::LTrim => Some("LTRIM"),
TrimMode::RTrim => Some("RTRIM"),
TrimMode::Trim => Some("TRIM"),
}
}
}
/// One source file of a multi-file import: where to fetch it from and,
/// optionally, the TLS fingerprint used for that host.
#[derive(Debug, Clone)]
pub struct ImportFileEntry {
    // Source address ("host:port") this file is served from.
    pub address: String,
    // File name; its extension is normalized during SQL generation.
    pub file_name: String,
    // TLS public-key fingerprint; presence switches the URL to https.
    pub public_key: Option<String>,
}
impl ImportFileEntry {
pub fn new(address: String, file_name: String, public_key: Option<String>) -> Self {
Self {
address,
file_name,
public_key,
}
}
}
/// Builder for an `IMPORT INTO ... FROM CSV/PARQUET AT ...` statement.
///
/// Construct with [`ImportQuery::new`], configure via the chainable
/// builder methods, then render the SQL with [`ImportQuery::build`].
#[derive(Debug, Clone)]
pub struct ImportQuery {
    // Target table name (unqualified).
    table: String,
    // Optional schema used to qualify the table.
    schema: Option<String>,
    // Optional explicit column list for the import target.
    columns: Option<Vec<String>>,
    // Source address ("host:port") for single-file imports.
    address: Option<String>,
    // TLS public-key fingerprint; presence switches the URL to https.
    public_key: Option<String>,
    // Base file name for single-file imports; extension is normalized.
    file_name: String,
    // Per-file sources; when set, overrides `address`/`file_name`.
    file_entries: Option<Vec<ImportFileEntry>>,
    // CSV column separator character.
    column_separator: char,
    // CSV column delimiter (quote) character.
    column_delimiter: char,
    // CSV row separator.
    row_separator: RowSeparator,
    // CSV text encoding name.
    encoding: String,
    // Number of leading rows to skip; 0 emits no SKIP clause.
    skip: u32,
    // Literal treated as NULL in the CSV data, if any.
    null_value: Option<String>,
    // Whitespace trimming mode for CSV fields.
    trim: TrimMode,
    // CSV compression; only affects the generated file extension.
    compression: Compression,
    // Optional REJECT LIMIT row count.
    reject_limit: Option<u32>,
    // Source data format (CSV or Parquet).
    format: ImportFormat,
}
impl ImportQuery {
    /// Starts a builder targeting `table` with CSV defaults: file
    /// "001.csv", ',' separator, '"' delimiter, LF rows, UTF-8 encoding.
    pub fn new(table: &str) -> Self {
        Self {
            table: table.to_string(),
            schema: None,
            columns: None,
            address: None,
            public_key: None,
            file_name: "001.csv".to_string(),
            file_entries: None,
            column_separator: ',',
            column_delimiter: '"',
            row_separator: RowSeparator::default(),
            encoding: "UTF-8".to_string(),
            skip: 0,
            null_value: None,
            trim: TrimMode::default(),
            compression: Compression::default(),
            reject_limit: None,
            format: ImportFormat::default(),
        }
    }

    /// Qualifies the target table with `schema`.
    pub fn schema(mut self, schema: &str) -> Self {
        self.schema = Some(schema.to_string());
        self
    }

    /// Restricts the import to an explicit column list.
    pub fn columns(mut self, cols: Vec<&str>) -> Self {
        self.columns = Some(cols.into_iter().map(String::from).collect());
        self
    }

    /// Sets the source address ("host:port") for a single-file import.
    pub fn at_address(mut self, addr: &str) -> Self {
        self.address = Some(addr.to_string());
        self
    }

    /// Sets the TLS public-key fingerprint; this also switches the
    /// generated URL scheme from http to https.
    pub fn with_public_key(mut self, fingerprint: &str) -> Self {
        self.public_key = Some(fingerprint.to_string());
        self
    }

    /// Sets the base file name; any known extension is replaced to match
    /// the configured format/compression when the SQL is built.
    pub fn file_name(mut self, name: &str) -> Self {
        self.file_name = name.to_string();
        self
    }

    /// Sets the CSV column separator character.
    pub fn column_separator(mut self, sep: char) -> Self {
        self.column_separator = sep;
        self
    }

    /// Sets the CSV column delimiter (quote) character.
    pub fn column_delimiter(mut self, delim: char) -> Self {
        self.column_delimiter = delim;
        self
    }

    /// Sets the CSV row separator.
    pub fn row_separator(mut self, sep: RowSeparator) -> Self {
        self.row_separator = sep;
        self
    }

    /// Sets the CSV text encoding (e.g. "UTF-8", "ISO-8859-1").
    pub fn encoding(mut self, enc: &str) -> Self {
        self.encoding = enc.to_string();
        self
    }

    /// Skips the first `rows` rows (e.g. a header line). 0 omits the clause.
    pub fn skip(mut self, rows: u32) -> Self {
        self.skip = rows;
        self
    }

    /// Sets the literal that represents NULL in the CSV data.
    pub fn null_value(mut self, val: &str) -> Self {
        self.null_value = Some(val.to_string());
        self
    }

    /// Sets the field-trimming mode.
    pub fn trim(mut self, trim: TrimMode) -> Self {
        self.trim = trim;
        self
    }

    /// Declares the CSV source compression; this only affects the file
    /// extension and is ignored entirely for Parquet imports.
    pub fn compressed(mut self, compression: Compression) -> Self {
        self.compression = compression;
        self
    }

    /// Allows up to `limit` rejected rows before the import fails.
    pub fn reject_limit(mut self, limit: u32) -> Self {
        self.reject_limit = Some(limit);
        self
    }

    /// Selects the source data format (CSV or Parquet).
    pub fn with_format(mut self, format: ImportFormat) -> Self {
        self.format = format;
        self
    }

    /// Switches to a multi-file import with one AT/FILE pair per entry;
    /// takes precedence over `at_address`/`file_name`.
    pub fn with_files(mut self, entries: Vec<ImportFileEntry>) -> Self {
        self.file_entries = Some(entries);
        self
    }

    /// File name for the single-file variant. Delegates to
    /// `get_file_name_for` so single- and multi-file imports share one
    /// naming rule (this logic was previously duplicated).
    fn get_file_name(&self) -> String {
        self.get_file_name_for(&self.file_name)
    }

    /// Normalizes `base_name`: strips any known extension, then appends
    /// ".parquet" or the CSV/compression extension per the configured format.
    fn get_file_name_for(&self, base_name: &str) -> String {
        let base = strip_known_extensions(base_name);
        match self.format {
            ImportFormat::Parquet => format!("{}.parquet", base),
            ImportFormat::Csv => format!("{}{}", base, self.compression.extension()),
        }
    }

    /// Appends ` AT '<scheme>://<address>'` plus an optional PUBLIC KEY
    /// clause. https is used exactly when a fingerprint is present, and
    /// Parquet sources get ";MaxConcurrentReads=1" inside the URL.
    /// Shared by the single- and multi-file branches of `build`.
    fn push_at_clause(sql: &mut String, address: &str, public_key: Option<&str>, is_parquet: bool) {
        sql.push_str(" AT '");
        sql.push_str(if public_key.is_some() { "https://" } else { "http://" });
        sql.push_str(address);
        if is_parquet {
            sql.push_str(";MaxConcurrentReads=1");
        }
        sql.push('\'');
        if let Some(pk) = public_key {
            sql.push_str(" PUBLIC KEY '");
            sql.push_str(pk);
            sql.push('\'');
        }
    }

    /// Appends the CSV-only format clauses. SKIP, NULL, TRIM and
    /// REJECT LIMIT are emitted only when configured.
    fn push_csv_options(&self, sql: &mut String) {
        sql.push_str("\nENCODING = '");
        sql.push_str(&self.encoding);
        sql.push('\'');
        sql.push_str("\nCOLUMN SEPARATOR = '");
        sql.push(self.column_separator);
        sql.push('\'');
        sql.push_str("\nCOLUMN DELIMITER = '");
        sql.push(self.column_delimiter);
        sql.push('\'');
        sql.push_str("\nROW SEPARATOR = '");
        sql.push_str(self.row_separator.to_sql());
        sql.push('\'');
        if self.skip > 0 {
            sql.push_str("\nSKIP = ");
            sql.push_str(&self.skip.to_string());
        }
        if let Some(ref null_val) = self.null_value {
            sql.push_str("\nNULL = '");
            sql.push_str(null_val);
            sql.push('\'');
        }
        if let Some(trim_sql) = self.trim.to_sql() {
            sql.push_str("\nTRIM = '");
            sql.push_str(trim_sql);
            sql.push('\'');
        }
        if let Some(limit) = self.reject_limit {
            sql.push_str("\nREJECT LIMIT ");
            sql.push_str(&limit.to_string());
        }
    }

    /// Renders the complete IMPORT statement.
    ///
    /// NOTE(review): user-supplied values (table, columns, file names,
    /// separators, NULL literal, addresses) are interpolated without any
    /// SQL escaping — a single quote in any of them breaks the statement.
    /// Confirm inputs are trusted before exposing this to external callers.
    pub fn build(&self) -> String {
        let mut sql = String::with_capacity(512);
        sql.push_str("IMPORT INTO ");
        if let Some(ref schema) = self.schema {
            sql.push_str(schema);
            sql.push('.');
        }
        sql.push_str(&self.table);
        if let Some(ref cols) = self.columns {
            sql.push_str(" (");
            sql.push_str(&cols.join(", "));
            sql.push(')');
        }
        let is_parquet = matches!(self.format, ImportFormat::Parquet);
        sql.push_str("\nFROM ");
        sql.push_str(if is_parquet { "PARQUET" } else { "CSV" });
        if let Some(ref entries) = self.file_entries {
            // Multi-file: the AT/FILE pairs stay on the FROM line.
            for entry in entries {
                Self::push_at_clause(&mut sql, &entry.address, entry.public_key.as_deref(), is_parquet);
                sql.push_str(" FILE '");
                sql.push_str(&self.get_file_name_for(&entry.file_name));
                sql.push('\'');
            }
        } else {
            // Single-file: a missing address renders as an empty host,
            // matching the original behavior.
            Self::push_at_clause(
                &mut sql,
                self.address.as_deref().unwrap_or(""),
                self.public_key.as_deref(),
                is_parquet,
            );
            sql.push_str("\nFILE '");
            sql.push_str(&self.get_file_name());
            sql.push('\'');
        }
        if !is_parquet {
            self.push_csv_options(&mut sql);
        }
        sql
    }
}
/// Repeatedly strips any known data/compression extension from the end
/// of `name`, so "data.csv.gz" and "data" both normalize to "data".
fn strip_known_extensions(name: &str) -> &str {
    // Order matters for compound names: compression suffixes are listed
    // first so "x.csv.gz" peels ".gz" and then ".csv".
    const KNOWN_EXTENSIONS: [&str; 4] = [".gz", ".bz2", ".csv", ".parquet"];
    let mut rest = name;
    while let Some(shorter) = KNOWN_EXTENSIONS
        .iter()
        .find_map(|ext| rest.strip_suffix(ext))
    {
        rest = shorter;
    }
    rest
}
// Unit tests. Assertions check substrings of the generated statement
// rather than exact equality, so clause ordering can evolve as long as
// each individual clause stays well-formed.
#[cfg(test)]
mod tests {
    use super::*;

    // --- Single-file CSV imports ---

    #[test]
    fn test_basic_import_statement() {
        let sql = ImportQuery::new("users")
            .at_address("192.168.1.1:8080")
            .build();
        assert!(sql.contains("IMPORT INTO users"));
        assert!(sql.contains("FROM CSV AT 'http://192.168.1.1:8080'"));
        assert!(sql.contains("FILE '001.csv'"));
        assert!(sql.contains("ENCODING = 'UTF-8'"));
        assert!(sql.contains("COLUMN SEPARATOR = ','"));
        assert!(sql.contains("COLUMN DELIMITER = '\"'"));
        assert!(sql.contains("ROW SEPARATOR = 'LF'"));
    }

    #[test]
    fn test_import_with_schema() {
        let sql = ImportQuery::new("users")
            .schema("myschema")
            .at_address("192.168.1.1:8080")
            .build();
        assert!(sql.contains("IMPORT INTO myschema.users"));
    }

    #[test]
    fn test_import_with_columns() {
        let sql = ImportQuery::new("users")
            .columns(vec!["id", "name", "email"])
            .at_address("192.168.1.1:8080")
            .build();
        assert!(sql.contains("IMPORT INTO users (id, name, email)"));
    }

    #[test]
    fn test_import_with_all_format_options() {
        let sql = ImportQuery::new("data")
            .at_address("10.0.0.1:9000")
            .column_separator(';')
            .column_delimiter('\'')
            .row_separator(RowSeparator::CRLF)
            .encoding("ISO-8859-1")
            .skip(2)
            .null_value("NULL")
            .trim(TrimMode::Trim)
            .reject_limit(100)
            .build();
        assert!(sql.contains("COLUMN SEPARATOR = ';'"));
        // NOTE: the quote delimiter is emitted unescaped ('''), which
        // documents the builder's current no-escaping behavior.
        assert!(sql.contains("COLUMN DELIMITER = '''"));
        assert!(sql.contains("ROW SEPARATOR = 'CRLF'"));
        assert!(sql.contains("ENCODING = 'ISO-8859-1'"));
        assert!(sql.contains("SKIP = 2"));
        assert!(sql.contains("NULL = 'NULL'"));
        assert!(sql.contains("TRIM = 'TRIM'"));
        assert!(sql.contains("REJECT LIMIT 100"));
    }

    #[test]
    fn test_import_with_use_tls() {
        // A public key switches the URL scheme to https.
        let fingerprint = "SHA256:abc123def456";
        let sql = ImportQuery::new("secure_data")
            .at_address("192.168.1.1:8443")
            .with_public_key(fingerprint)
            .build();
        assert!(sql.contains("FROM CSV AT 'https://192.168.1.1:8443'"));
        assert!(sql.contains(&format!("PUBLIC KEY '{}'", fingerprint)));
    }

    // --- Compression and file-name normalization ---

    #[test]
    fn test_import_with_gzip_compression() {
        let sql = ImportQuery::new("compressed_data")
            .at_address("192.168.1.1:8080")
            .compressed(Compression::Gzip)
            .build();
        assert!(sql.contains("FILE '001.csv.gz'"));
    }

    #[test]
    fn test_import_with_bzip2_compression() {
        let sql = ImportQuery::new("compressed_data")
            .at_address("192.168.1.1:8080")
            .compressed(Compression::Bzip2)
            .build();
        assert!(sql.contains("FILE '001.csv.bz2'"));
    }

    #[test]
    fn test_import_custom_file_name() {
        let sql = ImportQuery::new("data")
            .at_address("192.168.1.1:8080")
            .file_name("custom_file")
            .build();
        assert!(sql.contains("FILE 'custom_file.csv'"));
    }

    #[test]
    fn test_import_custom_file_name_with_compression() {
        let sql = ImportQuery::new("data")
            .at_address("192.168.1.1:8080")
            .file_name("custom_file")
            .compressed(Compression::Gzip)
            .build();
        assert!(sql.contains("FILE 'custom_file.csv.gz'"));
    }

    // --- Enum helpers and defaults ---

    #[test]
    fn test_row_separator_to_sql() {
        assert_eq!(RowSeparator::LF.to_sql(), "LF");
        assert_eq!(RowSeparator::CR.to_sql(), "CR");
        assert_eq!(RowSeparator::CRLF.to_sql(), "CRLF");
    }

    #[test]
    fn test_compression_extension() {
        assert_eq!(Compression::None.extension(), ".csv");
        assert_eq!(Compression::Gzip.extension(), ".csv.gz");
        assert_eq!(Compression::Bzip2.extension(), ".csv.bz2");
    }

    #[test]
    fn test_trim_mode_to_sql() {
        assert_eq!(TrimMode::None.to_sql(), None);
        assert_eq!(TrimMode::LTrim.to_sql(), Some("LTRIM"));
        assert_eq!(TrimMode::RTrim.to_sql(), Some("RTRIM"));
        assert_eq!(TrimMode::Trim.to_sql(), Some("TRIM"));
    }

    #[test]
    fn test_defaults() {
        assert_eq!(RowSeparator::default(), RowSeparator::LF);
        assert_eq!(Compression::default(), Compression::None);
        assert_eq!(TrimMode::default(), TrimMode::None);
    }

    // --- Multi-file imports ---

    #[test]
    fn test_import_file_entry() {
        let entry = ImportFileEntry::new(
            "10.0.0.5:8563".to_string(),
            "001.csv".to_string(),
            Some("sha256//abc123".to_string()),
        );
        assert_eq!(entry.address, "10.0.0.5:8563");
        assert_eq!(entry.file_name, "001.csv");
        assert_eq!(entry.public_key, Some("sha256//abc123".to_string()));
    }

    #[test]
    fn test_multi_file_import_basic() {
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
        ];
        let sql = ImportQuery::new("my_table").with_files(entries).build();
        assert!(sql.contains("IMPORT INTO my_table"));
        assert!(sql.contains("FROM CSV"));
        assert!(sql.contains("AT 'http://10.0.0.5:8563' FILE '001.csv'"));
        assert!(sql.contains("AT 'http://10.0.0.6:8564' FILE '002.csv'"));
    }

    #[test]
    fn test_multi_file_import_with_tls() {
        let entries = vec![
            ImportFileEntry::new(
                "10.0.0.5:8563".to_string(),
                "001.csv".to_string(),
                Some("sha256//fingerprint1".to_string()),
            ),
            ImportFileEntry::new(
                "10.0.0.6:8564".to_string(),
                "002.csv".to_string(),
                Some("sha256//fingerprint2".to_string()),
            ),
        ];
        let sql = ImportQuery::new("secure_table").with_files(entries).build();
        assert!(sql.contains("AT 'https://10.0.0.5:8563' PUBLIC KEY 'sha256//fingerprint1'"));
        assert!(sql.contains("AT 'https://10.0.0.6:8564' PUBLIC KEY 'sha256//fingerprint2'"));
    }

    #[test]
    fn test_multi_file_import_with_compression() {
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
        ];
        let sql = ImportQuery::new("compressed_table")
            .with_files(entries)
            .compressed(Compression::Gzip)
            .build();
        assert!(sql.contains("FILE '001.csv.gz'"));
        assert!(sql.contains("FILE '002.csv.gz'"));
    }

    #[test]
    fn test_multi_file_import_three_files() {
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.7:8565".to_string(), "003.csv".to_string(), None),
        ];
        let sql = ImportQuery::new("data").with_files(entries).build();
        assert!(sql.contains("AT 'http://10.0.0.5:8563' FILE '001.csv'"));
        assert!(sql.contains("AT 'http://10.0.0.6:8564' FILE '002.csv'"));
        assert!(sql.contains("AT 'http://10.0.0.7:8565' FILE '003.csv'"));
    }

    #[test]
    fn test_multi_file_import_with_schema_and_columns() {
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
        ];
        let sql = ImportQuery::new("my_table")
            .schema("my_schema")
            .columns(vec!["id", "name", "value"])
            .with_files(entries)
            .build();
        assert!(sql.contains("IMPORT INTO my_schema.my_table (id, name, value)"));
    }

    #[test]
    fn test_multi_file_import_with_all_options() {
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.csv".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.csv".to_string(), None),
        ];
        let sql = ImportQuery::new("data")
            .with_files(entries)
            .encoding("ISO-8859-1")
            .column_separator(';')
            .skip(1)
            .null_value("NULL")
            .reject_limit(100)
            .build();
        assert!(sql.contains("ENCODING = 'ISO-8859-1'"));
        assert!(sql.contains("COLUMN SEPARATOR = ';'"));
        assert!(sql.contains("SKIP = 1"));
        assert!(sql.contains("NULL = 'NULL'"));
        assert!(sql.contains("REJECT LIMIT 100"));
    }

    // --- Optional clauses ---

    #[test]
    fn test_import_no_skip_when_zero() {
        let sql = ImportQuery::new("data")
            .at_address("192.168.1.1:8080")
            .build();
        assert!(!sql.contains("SKIP"));
    }

    #[test]
    fn test_import_skip_header_row() {
        let sql = ImportQuery::new("data")
            .at_address("192.168.1.1:8080")
            .skip(1)
            .build();
        assert!(sql.contains("SKIP = 1"));
    }

    #[test]
    fn test_complete_import_statement_format() {
        let sql = ImportQuery::new("employees")
            .schema("hr")
            .columns(vec!["id", "first_name", "last_name", "department"])
            .at_address("10.20.30.40:8080")
            .with_public_key("SHA256:fingerprint123")
            .skip(1)
            .reject_limit(10)
            .build();
        let expected_parts = [
            "IMPORT INTO hr.employees (id, first_name, last_name, department)",
            "FROM CSV AT 'https://10.20.30.40:8080' PUBLIC KEY 'SHA256:fingerprint123'",
            "FILE '001.csv'",
            "ENCODING = 'UTF-8'",
            "COLUMN SEPARATOR = ','",
            "COLUMN DELIMITER = '\"'",
            "ROW SEPARATOR = 'LF'",
            "SKIP = 1",
            "REJECT LIMIT 10",
        ];
        for part in expected_parts {
            assert!(sql.contains(part), "SQL should contain: {}", part);
        }
    }

    // --- Parquet format ---

    #[test]
    fn test_import_query_format_parquet_single_file() {
        let sql = ImportQuery::new("my_table")
            .at_address("10.0.0.5:8563")
            .with_format(ImportFormat::Parquet)
            .build();
        assert!(sql.contains("FROM PARQUET AT 'http://10.0.0.5:8563;MaxConcurrentReads=1'"));
        assert!(sql.contains("FILE '001.parquet'"));
        assert!(!sql.contains("ENCODING"));
    }

    #[test]
    fn test_import_query_format_parquet_multi_file() {
        let entries = vec![
            ImportFileEntry::new("10.0.0.5:8563".to_string(), "001.parquet".to_string(), None),
            ImportFileEntry::new("10.0.0.6:8564".to_string(), "002.parquet".to_string(), None),
        ];
        let sql = ImportQuery::new("my_table")
            .with_files(entries)
            .with_format(ImportFormat::Parquet)
            .build();
        assert!(sql.contains("FROM PARQUET"));
        assert!(sql.contains("AT 'http://10.0.0.5:8563;MaxConcurrentReads=1'"));
        assert!(sql.contains("AT 'http://10.0.0.6:8564;MaxConcurrentReads=1'"));
        assert!(sql.contains("FILE '001.parquet'"));
        assert!(sql.contains("FILE '002.parquet'"));
        assert!(!sql.contains("MULTIPLE LOCAL FILES"));
    }

    #[test]
    fn test_import_query_format_parquet_with_tls_emits_public_key() {
        let sql = ImportQuery::new("t")
            .at_address("10.0.0.5:8563")
            .with_public_key("sha256//abc")
            .with_format(ImportFormat::Parquet)
            .build();
        assert!(sql
            .contains("AT 'https://10.0.0.5:8563;MaxConcurrentReads=1' PUBLIC KEY 'sha256//abc'"));
    }

    #[test]
    fn test_import_query_format_parquet_omits_csv_format_options() {
        let sql = ImportQuery::new("t")
            .at_address("addr:1234")
            .with_format(ImportFormat::Parquet)
            .skip(2)
            .null_value("NULL")
            .trim(TrimMode::Trim)
            .reject_limit(100)
            .build();
        for keyword in &[
            "ENCODING",
            "COLUMN SEPARATOR",
            "COLUMN DELIMITER",
            "ROW SEPARATOR",
            "SKIP",
            "NULL",
            "TRIM",
            "REJECT LIMIT",
        ] {
            assert!(
                !sql.contains(keyword),
                "SQL should NOT contain {} for Parquet format",
                keyword
            );
        }
    }

    #[test]
    fn test_import_query_parquet_ignores_compression() {
        let sql = ImportQuery::new("t")
            .at_address("addr:1234")
            .with_format(ImportFormat::Parquet)
            .compressed(Compression::Gzip)
            .build();
        assert!(sql.contains("FILE '001.parquet'"));
        assert!(!sql.contains(".gz"));
    }

    #[test]
    fn test_import_query_format_csv_unchanged() {
        let sql = ImportQuery::new("t").at_address("192.168.1.1:8080").build();
        assert!(sql.contains("FROM CSV AT 'http://192.168.1.1:8080'"));
        assert!(sql.contains("ENCODING = 'UTF-8'"));
        assert!(sql.contains("COLUMN SEPARATOR = ','"));
    }
}