csv_managed/
lib.rs

1pub mod append;
2pub mod cli;
3pub mod columns;
4pub mod data;
5pub mod derive;
6pub mod filter;
7pub mod frequency;
8pub mod index;
9pub mod install;
10pub mod io_utils;
11pub mod join;
12pub mod preview;
13pub mod process;
14pub mod schema;
15pub mod schema_cmd;
16pub mod stats;
17pub mod table;
18pub mod verify;
19
20use std::{env, ffi::OsString, sync::OnceLock, time::Instant};
21
22use anyhow::{Context, Result};
23use chrono::{SecondsFormat, Utc};
24use clap::Parser;
25use log::{LevelFilter, debug, error, info};
26
27use crate::cli::{Cli, Commands};
28
29static LOGGER: OnceLock<()> = OnceLock::new();
30
31fn init_logging() {
32    LOGGER.get_or_init(|| {
33        let mut builder = env_logger::Builder::from_env(env_logger::Env::default());
34        if env::var("RUST_LOG").is_err() {
35            builder.filter_module("csv_managed", LevelFilter::Info);
36        }
37        let _ = builder.format_timestamp_millis().try_init();
38    });
39}
40
41pub fn run() -> Result<()> {
42    init_logging();
43    let cli = Cli::parse_from(preprocess_cli_args(env::args_os()));
44    match cli.command {
45        Commands::Probe(args) => run_operation("probe", || handle_probe(&args)),
46        Commands::Index(args) => run_operation("index", || handle_index(&args)),
47        Commands::Schema(args) => run_operation("schema", || schema_cmd::execute(&args)),
48        Commands::Process(args) => run_operation("process", || process::execute(&args)),
49        Commands::Append(args) => run_operation("append", || append::execute(&args)),
50        Commands::Verify(args) => run_operation("verify", || verify::execute(&args)),
51        Commands::Preview(args) => run_operation("preview", || preview::execute(&args)),
52        Commands::Stats(args) => run_operation("stats", || stats::execute(&args)),
53        Commands::Frequency(args) => run_operation("frequency", || frequency::execute(&args)),
54        Commands::Join(args) => run_operation("join", || join::execute(&args)),
55        Commands::Install(args) => run_operation("install", || install::execute(&args)),
56        Commands::Columns(args) => run_operation("columns", || columns::execute(&args)),
57    }
58}
59
/// Expands the colon-joined `--report-invalid:<a>:<b>...` shorthand.
///
/// Every argument that is valid UTF-8 and starts with `--report-invalid:` is
/// replaced by `--report-invalid` followed by each non-empty `:`-separated
/// segment as its own argument (`--report-invalid:detail:5` becomes
/// `--report-invalid`, `detail`, `5`). All other arguments, including non-UTF-8
/// ones, pass through unchanged and in their original order.
fn preprocess_cli_args<I>(args: I) -> Vec<OsString>
where
    I: IntoIterator<Item = OsString>,
{
    const SHORTHAND_PREFIX: &str = "--report-invalid:";

    args.into_iter()
        .flat_map(|arg| {
            let expanded: Option<Vec<OsString>> = arg
                .to_str()
                .and_then(|text| text.strip_prefix(SHORTHAND_PREFIX))
                .map(|rest| {
                    std::iter::once(OsString::from("--report-invalid"))
                        .chain(
                            rest.split(':')
                                .filter(|segment| !segment.is_empty())
                                .map(OsString::from),
                        )
                        .collect()
                });
            expanded.unwrap_or_else(|| vec![arg])
        })
        .collect()
}
79
80fn run_operation<F>(name: &str, op: F) -> Result<()>
81where
82    F: FnOnce() -> Result<()>,
83{
84    let start_clock = Utc::now();
85    let start_instant = Instant::now();
86    let result = op();
87    let end_clock = Utc::now();
88    let duration_secs = start_instant.elapsed().as_secs_f64();
89    let start_str = start_clock.to_rfc3339_opts(SecondsFormat::Millis, true);
90    let end_str = end_clock.to_rfc3339_opts(SecondsFormat::Millis, true);
91
92    match &result {
93        Ok(_) => info!(
94            "Operation '{name}' completed (status=ok)\nstart: {start_str}\nend: {end_str}\nduration_secs: {duration_secs:.3}"
95        ),
96        Err(err) => error!(
97            "Operation '{name}' failed (status=error)\nstart: {start_str}\nend: {end_str}\nduration_secs: {duration_secs:.3}\nerror: {err:?}"
98        ),
99    }
100
101    result
102}
103
104fn handle_probe(args: &cli::ProbeArgs) -> Result<()> {
105    let delimiter = io_utils::resolve_input_delimiter(&args.input, args.delimiter);
106    let encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
107    info!(
108        "Probing '{}' with delimiter '{}'",
109        args.input.display(),
110        printable_delimiter(delimiter)
111    );
112    let mut schema = schema::infer_schema(&args.input, args.sample_rows, delimiter, encoding)
113        .with_context(|| format!("Inferring schema from {input:?}", input = args.input))?;
114    if args.mapping {
115        apply_default_name_mappings(&mut schema);
116    }
117    if args.replace_template {
118        schema
119            .save_with_replace_template(&args.schema)
120            .with_context(|| format!("Writing schema to {:?}", args.schema))?;
121    } else {
122        schema
123            .save(&args.schema)
124            .with_context(|| format!("Writing schema to {:?}", args.schema))?;
125    }
126    info!(
127        "Inferred schema for {} column(s) written to {:?}",
128        schema.columns.len(),
129        args.schema
130    );
131
132    if args.mapping {
133        emit_mappings(&schema);
134    }
135    Ok(())
136}
137
138fn apply_default_name_mappings(schema: &mut schema::Schema) {
139    for column in &mut schema.columns {
140        if column.rename.is_none() {
141            column.rename = Some(to_lower_snake_case(&column.name));
142        }
143    }
144}
145
/// Converts a column name to lower snake_case (e.g. `OrderDate` -> `order_date`).
///
/// Rules:
/// * every run of non-alphanumeric characters collapses into a single `_`,
///   and leading/trailing separators are dropped;
/// * an `_` is inserted before an uppercase letter that follows a
///   non-uppercase alphanumeric (`orderDate` -> `order_date`), or that follows
///   another uppercase letter but is itself followed by a lowercase one, i.e.
///   the end of an acronym (`APIKey` -> `api_key`);
/// * letters are lowercased.
///
/// Classification and lowercasing are Unicode-aware, so accented and other
/// non-ASCII letters are kept (`Prénom` -> `prénom`) instead of being treated
/// as separators. ASCII input converts exactly as before. If the input
/// contains no alphanumeric characters at all, it is returned unchanged.
fn to_lower_snake_case(value: &str) -> String {
    let mut result = String::with_capacity(value.len() + value.len() / 2);
    let mut chars = value.chars().peekable();
    // True at the start and right after any separator character.
    let mut after_separator = true;
    let mut prev_upper = false;

    while let Some(ch) = chars.next() {
        if !ch.is_alphanumeric() {
            // `_` is only ever pushed here (non-empty result) or before an
            // uppercase letter, so the output never starts with `_` or doubles it.
            if !result.is_empty() && !result.ends_with('_') {
                result.push('_');
            }
            after_separator = true;
            prev_upper = false;
            continue;
        }

        let is_upper = ch.is_uppercase();
        if is_upper && !after_separator {
            let next_is_lower = matches!(chars.peek(), Some(next) if next.is_lowercase());
            if !prev_upper || next_is_lower {
                result.push('_');
            }
        }
        // `to_lowercase` may yield several chars for some Unicode letters.
        result.extend(ch.to_lowercase());
        after_separator = false;
        prev_upper = is_upper;
    }

    // At most one trailing separator can remain.
    if result.ends_with('_') {
        result.pop();
    }
    if result.is_empty() {
        // No alphanumerics means no ASCII letters either, so this equals the
        // previous `to_ascii_lowercase()` fallback.
        value.to_owned()
    } else {
        result
    }
}
192
193fn emit_mappings(schema: &schema::Schema) {
194    if schema.columns.is_empty() {
195        println!("No columns found to emit mappings.");
196        return;
197    }
198    let mut rows = Vec::with_capacity(schema.columns.len());
199    for (idx, column) in schema.columns.iter().enumerate() {
200        let mapping = format!("{}:{}->", column.name, column.datatype.as_str());
201        rows.push(vec![
202            (idx + 1).to_string(),
203            column.name.clone(),
204            column.datatype.to_string(),
205            mapping,
206        ]);
207    }
208    let headers = vec![
209        "#".to_string(),
210        "name".to_string(),
211        "type".to_string(),
212        "mapping".to_string(),
213    ];
214    table::print_table(&headers, &rows);
215}
216
217fn handle_index(args: &cli::IndexArgs) -> Result<()> {
218    let delimiter = io_utils::resolve_input_delimiter(&args.input, args.delimiter);
219    let encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
220    info!(
221        "Building index for '{}' using delimiter '{}'",
222        args.input.display(),
223        printable_delimiter(delimiter)
224    );
225    let schema = match &args.schema {
226        Some(path) => Some(
227            schema::Schema::load(path).with_context(|| format!("Loading schema from {path:?}"))?,
228        ),
229        None => None,
230    };
231    let mut definitions = Vec::new();
232    for spec in &args.specs {
233        let definition = index::IndexDefinition::parse(spec)
234            .with_context(|| format!("Parsing index specification '{spec}'"))?;
235        definitions.push(definition);
236    }
237    for combo in &args.combos {
238        let expanded = index::IndexDefinition::expand_combo_spec(combo)
239            .with_context(|| format!("Parsing index combination '{combo}'"))?;
240        definitions.extend(expanded);
241    }
242    if definitions.is_empty() {
243        let columns = args
244            .columns
245            .iter()
246            .map(|c| c.trim())
247            .filter(|c| !c.is_empty())
248            .map(|c| c.to_string())
249            .collect::<Vec<_>>();
250        let definition = index::IndexDefinition::from_columns(columns)
251            .context("Parsing --columns for index build")?;
252        definitions.push(definition);
253    }
254    debug!("Index definitions: {:?}", definitions.len());
255    let index = index::CsvIndex::build(
256        &args.input,
257        &definitions,
258        schema.as_ref(),
259        args.limit,
260        delimiter,
261        encoding,
262    )
263    .with_context(|| format!("Building index for {:?}", args.input))?;
264    let row_count = index.row_count();
265    index
266        .save(&args.index)
267        .with_context(|| format!("Writing index to {:?}", args.index))?;
268    info!(
269        "Index with {} variant(s) for {} row(s) written to {:?}",
270        index.variants().len(),
271        row_count,
272        args.index
273    );
274    for variant in index.variants() {
275        info!("  • {}", variant.describe());
276    }
277    Ok(())
278}
279
/// Renders a delimiter byte for human-readable log and diagnostic output.
///
/// Tab, newline and carriage return use their familiar backslash escapes.
/// Any other control character (C0 controls, DEL, or C1 controls, since the
/// byte is widened to a `char` as Latin-1) is shown as `\xNN`, so invisible or
/// terminal-altering bytes never reach the output raw. Every other byte is
/// rendered as the character with that code point, exactly as before.
pub(crate) fn printable_delimiter(delimiter: u8) -> String {
    match delimiter {
        b'\t' => "\\t".to_string(),
        b'\n' => "\\n".to_string(),
        b'\r' => "\\r".to_string(),
        other => {
            let ch = char::from(other);
            if ch.is_control() {
                format!("\\x{other:02x}")
            } else {
                ch.to_string()
            }
        }
    }
}
288
#[cfg(test)]
mod tests {
    use super::to_lower_snake_case;

    /// Asserts that each `(input, expected)` pair converts as expected,
    /// naming the offending input on failure.
    fn assert_conversions(cases: &[(&str, &str)]) {
        for &(input, expected) in cases {
            assert_eq!(to_lower_snake_case(input), expected, "input: {input:?}");
        }
    }

    #[test]
    fn converts_camel_case_to_snake() {
        assert_conversions(&[("OrderDate", "order_date")]);
    }

    #[test]
    fn collapses_separators() {
        assert_conversions(&[
            ("customer-name", "customer_name"),
            ("customer  name", "customer_name"),
        ]);
    }

    #[test]
    fn handles_acronyms() {
        assert_conversions(&[("APIKey", "api_key"), ("HTTPStatus", "http_status")]);
    }
}