csv_managed/
lib.rs

1pub mod append;
2pub mod cli;
3pub mod columns;
4pub mod data;
5pub mod derive;
6pub mod expr;
7pub mod filter;
8pub mod frequency;
9pub mod index;
10pub mod install;
11pub mod io_utils;
12pub mod join;
13pub mod process;
14pub mod rows;
15pub mod schema;
16pub mod schema_cmd;
17pub mod stats;
18pub mod table;
19pub mod verify;
20
21use std::{env, ffi::OsString, sync::OnceLock, time::Instant};
22
23use anyhow::{Context, Result};
24use chrono::{SecondsFormat, Utc};
25use clap::Parser;
26use log::{LevelFilter, debug, error, info};
27
28use crate::cli::{Cli, Commands};
29
30static LOGGER: OnceLock<()> = OnceLock::new();
31
32fn init_logging() {
33    LOGGER.get_or_init(|| {
34        let mut builder = env_logger::Builder::from_env(env_logger::Env::default());
35        if env::var("RUST_LOG").is_err() {
36            builder.filter_module("csv_managed", LevelFilter::Info);
37        }
38        let _ = builder.format_timestamp_millis().try_init();
39    });
40}
41
42pub fn run() -> Result<()> {
43    init_logging();
44    let cli = Cli::parse_from(preprocess_cli_args(env::args_os()));
45    match cli.command {
46        Commands::Index(args) => run_operation("index", || handle_index(&args)),
47        Commands::Schema(args) => run_operation("schema", || schema_cmd::execute(&args)),
48        Commands::Process(args) => run_operation("process", || process::execute(&args)),
49        Commands::Append(args) => run_operation("append", || append::execute(&args)),
50        Commands::Stats(args) => run_operation("stats", || stats::execute(&args)),
51        // Commands::Join(args) => run_operation("join", || join::execute(&args)),
52        Commands::Install(args) => run_operation("install", || install::execute(&args)),
53    }
54}
55
/// Expands the colon-packed `--report-invalid:<a>:<b>…` shorthand into
/// separate argument tokens before the arguments are parsed.
///
/// An argument that is valid Unicode and starts with `--report-invalid:` is
/// replaced by `--report-invalid` followed by each non-empty `:`-separated
/// segment of the remainder. Every other argument is passed through untouched,
/// in its original position.
fn preprocess_cli_args<I>(args: I) -> Vec<OsString>
where
    I: IntoIterator<Item = OsString>,
{
    let mut expanded = Vec::new();
    for raw in args {
        // `None` both for non-Unicode arguments and for arguments without the prefix.
        let packed = raw
            .to_str()
            .and_then(|text| text.strip_prefix("--report-invalid:"));
        match packed {
            Some(segments) => {
                expanded.push(OsString::from("--report-invalid"));
                expanded.extend(
                    segments
                        .split(':')
                        .filter(|piece| !piece.is_empty())
                        .map(OsString::from),
                );
            }
            None => expanded.push(raw),
        }
    }
    expanded
}
75
76fn run_operation<F>(name: &str, op: F) -> Result<()>
77where
78    F: FnOnce() -> Result<()>,
79{
80    let start_clock = Utc::now();
81    let start_instant = Instant::now();
82    let result = op();
83    let end_clock = Utc::now();
84    let duration_secs = start_instant.elapsed().as_secs_f64();
85    let start_str = start_clock.to_rfc3339_opts(SecondsFormat::Millis, true);
86    let end_str = end_clock.to_rfc3339_opts(SecondsFormat::Millis, true);
87
88    match &result {
89        Ok(_) => info!(
90            "Operation '{name}' completed (status=ok)\nstart: {start_str}\nend: {end_str}\nduration_secs: {duration_secs:.3}"
91        ),
92        Err(err) => error!(
93            "Operation '{name}' failed (status=error)\nstart: {start_str}\nend: {end_str}\nduration_secs: {duration_secs:.3}\nerror: {err:?}"
94        ),
95    }
96
97    result
98}
99
100fn handle_index(args: &cli::IndexArgs) -> Result<()> {
101    let delimiter = io_utils::resolve_input_delimiter(&args.input, args.delimiter);
102    let encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
103    info!(
104        "Building index for '{}' using delimiter '{}'",
105        args.input.display(),
106        printable_delimiter(delimiter)
107    );
108    let schema = match &args.schema {
109        Some(path) => Some(
110            schema::Schema::load(path).with_context(|| format!("Loading schema from {path:?}"))?,
111        ),
112        None => None,
113    };
114    let mut definitions = Vec::new();
115    for spec in &args.specs {
116        let definition = index::IndexDefinition::parse(spec)
117            .with_context(|| format!("Parsing index specification '{spec}'"))?;
118        definitions.push(definition);
119    }
120    for covering in &args.coverings {
121        let expanded = index::IndexDefinition::expand_covering_spec(covering)
122            .with_context(|| format!("Parsing index covering '{covering}'"))?;
123        definitions.extend(expanded);
124    }
125    if definitions.is_empty() {
126        let columns = args
127            .columns
128            .iter()
129            .map(|c| c.trim())
130            .filter(|c| !c.is_empty())
131            .map(|c| c.to_string())
132            .collect::<Vec<_>>();
133        let definition = index::IndexDefinition::from_columns(columns)
134            .context("Parsing --columns for index build")?;
135        definitions.push(definition);
136    }
137    debug!("Index definitions: {:?}", definitions.len());
138    let index = index::CsvIndex::build(
139        &args.input,
140        &definitions,
141        schema.as_ref(),
142        args.limit,
143        delimiter,
144        encoding,
145    )
146    .with_context(|| format!("Building index for {:?}", args.input))?;
147    let row_count = index.row_count();
148    index
149        .save(&args.index)
150        .with_context(|| format!("Writing index to {:?}", args.index))?;
151    info!(
152        "Index with {} variant(s) for {} row(s) written to {:?}",
153        index.variants().len(),
154        row_count,
155        args.index
156    );
157    for variant in index.variants() {
158        info!("  • {}", variant.describe());
159    }
160    Ok(())
161}
162
/// Renders a delimiter byte as text that is safe to embed in a log or error
/// message.
///
/// * Tab, line feed and carriage return are shown as the two-character
///   escapes `\t`, `\n` and `\r`.
/// * Any other control byte (for example NUL or the `0x1f` unit separator) is
///   shown as a lowercase `\xNN` hex escape, so it cannot corrupt or vanish
///   from the surrounding message.
/// * Every other byte is rendered as the character with the same code point.
pub(crate) fn printable_delimiter(delimiter: u8) -> String {
    match delimiter {
        b'\t' => "\\t".to_string(),
        b'\n' => "\\n".to_string(),
        b'\r' => "\\r".to_string(),
        other if char::from(other).is_control() => format!("\\x{other:02x}"),
        other => char::from(other).to_string(),
    }
}
171
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;

    /// Converts processed arguments to owned strings (lossily) so they can be
    /// compared against string literals.
    fn to_strings(args: &[OsString]) -> Vec<String> {
        args.iter()
            .map(|value| value.to_string_lossy().into_owned())
            .collect()
    }

    /// Builds an owned argument vector from string literals.
    fn os_args(items: &[&str]) -> Vec<OsString> {
        items.iter().map(OsString::from).collect()
    }

    #[test]
    fn preprocess_cli_args_expands_report_invalid_segments() {
        let processed = preprocess_cli_args(os_args(&[
            "csv-managed",
            "--report-invalid:stats:counts",
            "--dry-run",
        ]));
        assert_eq!(
            to_strings(&processed),
            vec![
                "csv-managed",
                "--report-invalid",
                "stats",
                "counts",
                "--dry-run",
            ]
        );
    }

    #[test]
    fn preprocess_cli_args_drops_empty_report_invalid_segments() {
        // Leading, doubled and trailing colons produce empty segments, which
        // must not become argument tokens.
        let processed = preprocess_cli_args(os_args(&[
            "--report-invalid::stats::",
            "--report-invalid:",
        ]));
        assert_eq!(
            to_strings(&processed),
            vec!["--report-invalid", "stats", "--report-invalid"]
        );
    }

    #[test]
    fn preprocess_cli_args_passes_other_arguments_through() {
        // Only the exact `--report-invalid:` prefix triggers expansion.
        let input = os_args(&[
            "csv-managed",
            "--report-invalid",
            "--report-invalid=a:b",
            "--other:a:b",
        ]);
        assert_eq!(preprocess_cli_args(input.clone()), input);
    }

    proptest! {
        #[test]
        fn preprocess_cli_args_splits_report_invalid_segments_prop(
            segments in proptest::collection::vec("[A-Za-z0-9_-]{1,8}", 1..5)
        ) {
            let arg = format!("--report-invalid:{}", segments.join(":"));
            let processed = preprocess_cli_args(vec![
                OsString::from("csv-managed"),
                OsString::from(arg),
            ]);
            let tokens = to_strings(&processed);
            // Checking the count first rejects spurious extra tokens and keeps
            // the indexing below from panicking on a short result.
            prop_assert_eq!(tokens.len(), segments.len() + 2);
            prop_assert_eq!(tokens[0].as_str(), "csv-managed");
            prop_assert_eq!(tokens[1].as_str(), "--report-invalid");
            for (idx, segment) in segments.iter().enumerate() {
                prop_assert_eq!(tokens[idx + 2].as_str(), segment.as_str());
            }
        }
    }
}