1pub mod append;
2pub mod cli;
3pub mod columns;
4pub mod data;
5pub mod derive;
6pub mod expr;
7pub mod filter;
8pub mod frequency;
9pub mod index;
10pub mod install;
11pub mod io_utils;
12pub mod join;
13pub mod process;
14pub mod rows;
15pub mod schema;
16pub mod schema_cmd;
17pub mod stats;
18pub mod table;
19pub mod verify;
20
21use std::{env, ffi::OsString, sync::OnceLock, time::Instant};
22
23use anyhow::{Context, Result};
24use chrono::{SecondsFormat, Utc};
25use clap::Parser;
26use log::{LevelFilter, debug, error, info};
27
28use crate::cli::{Cli, Commands};
29
30static LOGGER: OnceLock<()> = OnceLock::new();
31
32fn init_logging() {
33 LOGGER.get_or_init(|| {
34 let mut builder = env_logger::Builder::from_env(env_logger::Env::default());
35 if env::var("RUST_LOG").is_err() {
36 builder.filter_module("csv_managed", LevelFilter::Info);
37 }
38 let _ = builder.format_timestamp_millis().try_init();
39 });
40}
41
42pub fn run() -> Result<()> {
43 init_logging();
44 let cli = Cli::parse_from(preprocess_cli_args(env::args_os()));
45 match cli.command {
46 Commands::Index(args) => run_operation("index", || handle_index(&args)),
47 Commands::Schema(args) => run_operation("schema", || schema_cmd::execute(&args)),
48 Commands::Process(args) => run_operation("process", || process::execute(&args)),
49 Commands::Append(args) => run_operation("append", || append::execute(&args)),
50 Commands::Stats(args) => run_operation("stats", || stats::execute(&args)),
51 Commands::Install(args) => run_operation("install", || install::execute(&args)),
53 }
54}
55
/// Expands the colon-joined `--report-invalid:<a>:<b>` shorthand into separate tokens.
///
/// An argument that is valid Unicode and starts with `--report-invalid:` is
/// replaced by `--report-invalid` followed by each non-empty colon-separated
/// segment of the remainder. Every other argument (including non-Unicode
/// ones) is passed through untouched, and the relative order is preserved.
fn preprocess_cli_args<I>(args: I) -> Vec<OsString>
where
    I: IntoIterator<Item = OsString>,
{
    let mut expanded = Vec::new();
    for arg in args {
        let shorthand_tail = arg
            .to_str()
            .and_then(|text| text.strip_prefix("--report-invalid:"));
        match shorthand_tail {
            Some(tail) => {
                expanded.push(OsString::from("--report-invalid"));
                // Empty segments (e.g. from `::` or a trailing `:`) are dropped.
                expanded.extend(
                    tail.split(':')
                        .filter(|part| !part.is_empty())
                        .map(OsString::from),
                );
            }
            None => expanded.push(arg),
        }
    }
    expanded
}
75
76fn run_operation<F>(name: &str, op: F) -> Result<()>
77where
78 F: FnOnce() -> Result<()>,
79{
80 let start_clock = Utc::now();
81 let start_instant = Instant::now();
82 let result = op();
83 let end_clock = Utc::now();
84 let duration_secs = start_instant.elapsed().as_secs_f64();
85 let start_str = start_clock.to_rfc3339_opts(SecondsFormat::Millis, true);
86 let end_str = end_clock.to_rfc3339_opts(SecondsFormat::Millis, true);
87
88 match &result {
89 Ok(_) => info!(
90 "Operation '{name}' completed (status=ok)\nstart: {start_str}\nend: {end_str}\nduration_secs: {duration_secs:.3}"
91 ),
92 Err(err) => error!(
93 "Operation '{name}' failed (status=error)\nstart: {start_str}\nend: {end_str}\nduration_secs: {duration_secs:.3}\nerror: {err:?}"
94 ),
95 }
96
97 result
98}
99
100fn handle_index(args: &cli::IndexArgs) -> Result<()> {
101 let delimiter = io_utils::resolve_input_delimiter(&args.input, args.delimiter);
102 let encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
103 info!(
104 "Building index for '{}' using delimiter '{}'",
105 args.input.display(),
106 printable_delimiter(delimiter)
107 );
108 let schema = match &args.schema {
109 Some(path) => Some(
110 schema::Schema::load(path).with_context(|| format!("Loading schema from {path:?}"))?,
111 ),
112 None => None,
113 };
114 let mut definitions = Vec::new();
115 for spec in &args.specs {
116 let definition = index::IndexDefinition::parse(spec)
117 .with_context(|| format!("Parsing index specification '{spec}'"))?;
118 definitions.push(definition);
119 }
120 for covering in &args.coverings {
121 let expanded = index::IndexDefinition::expand_covering_spec(covering)
122 .with_context(|| format!("Parsing index covering '{covering}'"))?;
123 definitions.extend(expanded);
124 }
125 if definitions.is_empty() {
126 let columns = args
127 .columns
128 .iter()
129 .map(|c| c.trim())
130 .filter(|c| !c.is_empty())
131 .map(|c| c.to_string())
132 .collect::<Vec<_>>();
133 let definition = index::IndexDefinition::from_columns(columns)
134 .context("Parsing --columns for index build")?;
135 definitions.push(definition);
136 }
137 debug!("Index definitions: {:?}", definitions.len());
138 let index = index::CsvIndex::build(
139 &args.input,
140 &definitions,
141 schema.as_ref(),
142 args.limit,
143 delimiter,
144 encoding,
145 )
146 .with_context(|| format!("Building index for {:?}", args.input))?;
147 let row_count = index.row_count();
148 index
149 .save(&args.index)
150 .with_context(|| format!("Writing index to {:?}", args.index))?;
151 info!(
152 "Index with {} variant(s) for {} row(s) written to {:?}",
153 index.variants().len(),
154 row_count,
155 args.index
156 );
157 for variant in index.variants() {
158 info!(" • {}", variant.describe());
159 }
160 Ok(())
161}
162
/// Renders a delimiter byte as text that is safe to embed in a log message.
///
/// * Tab, line feed and carriage return are shown as the escape sequences
///   `\t`, `\n` and `\r`.
/// * Any other ASCII control byte (0x00–0x1F and 0x7F) is shown as `\xNN`
///   with two lowercase hex digits, so separators such as 0x01 or 0x1F stay
///   visible instead of being written raw into the output.
/// * Every other byte is shown as the character with the same code point
///   (so `b','` yields `","`).
pub(crate) fn printable_delimiter(delimiter: u8) -> String {
    match delimiter {
        b'\t' => "\\t".to_string(),
        b'\n' => "\\n".to_string(),
        b'\r' => "\\r".to_string(),
        other if other.is_ascii_control() => format!("\\x{other:02x}"),
        other => char::from(other).to_string(),
    }
}
171
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::prelude::*;

    /// Converts processed arguments to owned strings (lossily) for comparison with literals.
    fn to_strings(args: &[OsString]) -> Vec<String> {
        let mut rendered = Vec::with_capacity(args.len());
        for value in args {
            rendered.push(value.to_string_lossy().into_owned());
        }
        rendered
    }

    /// A colon-joined `--report-invalid:` argument is split into the flag plus
    /// one token per segment, while neighbouring arguments are left alone.
    #[test]
    fn preprocess_cli_args_expands_report_invalid_segments() {
        let raw = ["csv-managed", "--report-invalid:stats:counts", "--dry-run"];
        let processed = preprocess_cli_args(raw.map(OsString::from));
        assert_eq!(
            to_strings(&processed),
            [
                "csv-managed",
                "--report-invalid",
                "stats",
                "counts",
                "--dry-run",
            ]
        );
    }

    proptest! {
        // For any 1 to 4 non-empty alphanumeric/underscore/dash segments, the
        // expansion yields the bare flag followed by the segments in order.
        #[test]
        fn preprocess_cli_args_splits_report_invalid_segments_prop(
            segments in proptest::collection::vec("[A-Za-z0-9_-]{1,8}", 1..5)
        ) {
            // At least one segment is always generated, so joining with ':'
            // after the prefix produces `--report-invalid:<s1>:<s2>...`.
            let arg = format!("--report-invalid:{}", segments.join(":"));
            let processed = preprocess_cli_args([
                OsString::from("csv-managed"),
                OsString::from(arg),
            ]);
            let tokens = to_strings(&processed);
            prop_assert_eq!(tokens[0].as_str(), "csv-managed");
            prop_assert_eq!(tokens[1].as_str(), "--report-invalid");
            for (offset, segment) in segments.iter().enumerate() {
                prop_assert_eq!(tokens[offset + 2].as_str(), segment.as_str());
            }
        }
    }
}