1pub mod append;
2pub mod cli;
3pub mod columns;
4pub mod data;
5pub mod derive;
6pub mod filter;
7pub mod frequency;
8pub mod index;
9pub mod install;
10pub mod io_utils;
11pub mod join;
12pub mod preview;
13pub mod process;
14pub mod schema;
15pub mod schema_cmd;
16pub mod stats;
17pub mod table;
18pub mod verify;
19
20use std::{env, ffi::OsString, sync::OnceLock, time::Instant};
21
22use anyhow::{Context, Result};
23use chrono::{SecondsFormat, Utc};
24use clap::Parser;
25use log::{LevelFilter, debug, error, info};
26
27use crate::cli::{Cli, Commands};
28
29static LOGGER: OnceLock<()> = OnceLock::new();
30
31fn init_logging() {
32 LOGGER.get_or_init(|| {
33 let mut builder = env_logger::Builder::from_env(env_logger::Env::default());
34 if env::var("RUST_LOG").is_err() {
35 builder.filter_module("csv_managed", LevelFilter::Info);
36 }
37 let _ = builder.format_timestamp_millis().try_init();
38 });
39}
40
41pub fn run() -> Result<()> {
42 init_logging();
43 let cli = Cli::parse_from(preprocess_cli_args(env::args_os()));
44 match cli.command {
45 Commands::Probe(args) => run_operation("probe", || handle_probe(&args)),
46 Commands::Index(args) => run_operation("index", || handle_index(&args)),
47 Commands::Schema(args) => run_operation("schema", || schema_cmd::execute(&args)),
48 Commands::Process(args) => run_operation("process", || process::execute(&args)),
49 Commands::Append(args) => run_operation("append", || append::execute(&args)),
50 Commands::Verify(args) => run_operation("verify", || verify::execute(&args)),
51 Commands::Preview(args) => run_operation("preview", || preview::execute(&args)),
52 Commands::Stats(args) => run_operation("stats", || stats::execute(&args)),
53 Commands::Frequency(args) => run_operation("frequency", || frequency::execute(&args)),
54 Commands::Join(args) => run_operation("join", || join::execute(&args)),
55 Commands::Install(args) => run_operation("install", || install::execute(&args)),
56 Commands::Columns(args) => run_operation("columns", || columns::execute(&args)),
57 }
58}
59
/// Rewrites the `--report-invalid:a:b` shorthand into separate `--report-invalid a b`
/// arguments so clap can parse them as ordinary values.
///
/// Empty `:`-segments are dropped (so `--report-invalid:` becomes a bare flag). Arguments
/// that are not valid UTF-8, or that do not start with the shorthand prefix, are passed
/// through unchanged and in their original order.
fn preprocess_cli_args<I>(args: I) -> Vec<OsString>
where
    I: IntoIterator<Item = OsString>,
{
    const SHORTHAND_PREFIX: &str = "--report-invalid:";
    let mut expanded = Vec::new();
    for raw in args {
        match raw
            .to_str()
            .and_then(|text| text.strip_prefix(SHORTHAND_PREFIX))
        {
            Some(values) => {
                expanded.push(OsString::from("--report-invalid"));
                expanded.extend(
                    values
                        .split(':')
                        .filter(|piece| !piece.is_empty())
                        .map(OsString::from),
                );
            }
            None => expanded.push(raw),
        }
    }
    expanded
}
79
80fn run_operation<F>(name: &str, op: F) -> Result<()>
81where
82 F: FnOnce() -> Result<()>,
83{
84 let start_clock = Utc::now();
85 let start_instant = Instant::now();
86 let result = op();
87 let end_clock = Utc::now();
88 let duration_secs = start_instant.elapsed().as_secs_f64();
89 let start_str = start_clock.to_rfc3339_opts(SecondsFormat::Millis, true);
90 let end_str = end_clock.to_rfc3339_opts(SecondsFormat::Millis, true);
91
92 match &result {
93 Ok(_) => info!(
94 "Operation '{name}' completed (status=ok)\nstart: {start_str}\nend: {end_str}\nduration_secs: {duration_secs:.3}"
95 ),
96 Err(err) => error!(
97 "Operation '{name}' failed (status=error)\nstart: {start_str}\nend: {end_str}\nduration_secs: {duration_secs:.3}\nerror: {err:?}"
98 ),
99 }
100
101 result
102}
103
104fn handle_probe(args: &cli::ProbeArgs) -> Result<()> {
105 let delimiter = io_utils::resolve_input_delimiter(&args.input, args.delimiter);
106 let encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
107 info!(
108 "Probing '{}' with delimiter '{}'",
109 args.input.display(),
110 printable_delimiter(delimiter)
111 );
112 let mut schema = schema::infer_schema(&args.input, args.sample_rows, delimiter, encoding)
113 .with_context(|| format!("Inferring schema from {input:?}", input = args.input))?;
114 if args.mapping {
115 apply_default_name_mappings(&mut schema);
116 }
117 if args.replace_template {
118 schema
119 .save_with_replace_template(&args.schema)
120 .with_context(|| format!("Writing schema to {:?}", args.schema))?;
121 } else {
122 schema
123 .save(&args.schema)
124 .with_context(|| format!("Writing schema to {:?}", args.schema))?;
125 }
126 info!(
127 "Inferred schema for {} column(s) written to {:?}",
128 schema.columns.len(),
129 args.schema
130 );
131
132 if args.mapping {
133 emit_mappings(&schema);
134 }
135 Ok(())
136}
137
138fn apply_default_name_mappings(schema: &mut schema::Schema) {
139 for column in &mut schema.columns {
140 if column.rename.is_none() {
141 column.rename = Some(to_lower_snake_case(&column.name));
142 }
143 }
144}
145
/// Converts a column header into a lower `snake_case` identifier.
///
/// Word boundaries are inserted at lower→upper transitions (`OrderDate` → `order_date`)
/// and before the last capital of an acronym that is followed by a lowercase letter
/// (`HTTPStatus` → `http_status`). Every run of non-alphanumeric characters collapses to a
/// single `_`, and leading/trailing underscores are dropped. Digits stay attached to the
/// preceding word (`Address2Line` → `address2_line`).
///
/// Letters and digits are classified with Unicode rules, so headers such as `Größe` keep
/// their letters (`größe`) instead of having them turned into separators. ASCII input
/// converts exactly as before.
///
/// If the input contains no alphanumeric characters at all, it is returned ASCII
/// lower-cased, so a non-empty header never maps to an empty name.
fn to_lower_snake_case(value: &str) -> String {
    let mut result = String::with_capacity(value.len());
    let mut chars = value.chars().peekable();
    // `true` at the start and right after a separator run: no boundary `_` is needed there.
    let mut last_was_separator = true;
    // Tracks acronym runs such as `HTTP` so their letters stay together.
    let mut last_was_upper = false;
    while let Some(ch) = chars.next() {
        if !ch.is_alphanumeric() {
            // Collapse separator runs into one `_` and never start the result with one.
            if !result.is_empty() && !result.ends_with('_') {
                result.push('_');
            }
            last_was_separator = true;
            last_was_upper = false;
            continue;
        }
        if ch.is_uppercase() {
            let next_is_lowercase = chars.peek().is_some_and(|next| next.is_lowercase());
            // Split on `aB` (camelCase) and on `ABc` (acronym end followed by a new word).
            if !last_was_separator
                && (!last_was_upper || next_is_lowercase)
                && !result.ends_with('_')
            {
                result.push('_');
            }
            last_was_upper = true;
        } else {
            last_was_upper = false;
        }
        // `to_lowercase` can expand to several chars (e.g. `İ`), hence `extend`.
        result.extend(ch.to_lowercase());
        last_was_separator = false;
    }
    while result.ends_with('_') {
        result.pop();
    }
    if result.is_empty() {
        value.to_ascii_lowercase()
    } else {
        result
    }
}
192
193fn emit_mappings(schema: &schema::Schema) {
194 if schema.columns.is_empty() {
195 println!("No columns found to emit mappings.");
196 return;
197 }
198 let mut rows = Vec::with_capacity(schema.columns.len());
199 for (idx, column) in schema.columns.iter().enumerate() {
200 let mapping = format!("{}:{}->", column.name, column.datatype.as_str());
201 rows.push(vec![
202 (idx + 1).to_string(),
203 column.name.clone(),
204 column.datatype.to_string(),
205 mapping,
206 ]);
207 }
208 let headers = vec![
209 "#".to_string(),
210 "name".to_string(),
211 "type".to_string(),
212 "mapping".to_string(),
213 ];
214 table::print_table(&headers, &rows);
215}
216
217fn handle_index(args: &cli::IndexArgs) -> Result<()> {
218 let delimiter = io_utils::resolve_input_delimiter(&args.input, args.delimiter);
219 let encoding = io_utils::resolve_encoding(args.input_encoding.as_deref())?;
220 info!(
221 "Building index for '{}' using delimiter '{}'",
222 args.input.display(),
223 printable_delimiter(delimiter)
224 );
225 let schema = match &args.schema {
226 Some(path) => Some(
227 schema::Schema::load(path).with_context(|| format!("Loading schema from {path:?}"))?,
228 ),
229 None => None,
230 };
231 let mut definitions = Vec::new();
232 for spec in &args.specs {
233 let definition = index::IndexDefinition::parse(spec)
234 .with_context(|| format!("Parsing index specification '{spec}'"))?;
235 definitions.push(definition);
236 }
237 for combo in &args.combos {
238 let expanded = index::IndexDefinition::expand_combo_spec(combo)
239 .with_context(|| format!("Parsing index combination '{combo}'"))?;
240 definitions.extend(expanded);
241 }
242 if definitions.is_empty() {
243 let columns = args
244 .columns
245 .iter()
246 .map(|c| c.trim())
247 .filter(|c| !c.is_empty())
248 .map(|c| c.to_string())
249 .collect::<Vec<_>>();
250 let definition = index::IndexDefinition::from_columns(columns)
251 .context("Parsing --columns for index build")?;
252 definitions.push(definition);
253 }
254 debug!("Index definitions: {:?}", definitions.len());
255 let index = index::CsvIndex::build(
256 &args.input,
257 &definitions,
258 schema.as_ref(),
259 args.limit,
260 delimiter,
261 encoding,
262 )
263 .with_context(|| format!("Building index for {:?}", args.input))?;
264 let row_count = index.row_count();
265 index
266 .save(&args.index)
267 .with_context(|| format!("Writing index to {:?}", args.index))?;
268 info!(
269 "Index with {} variant(s) for {} row(s) written to {:?}",
270 index.variants().len(),
271 row_count,
272 args.index
273 );
274 for variant in index.variants() {
275 info!(" • {}", variant.describe());
276 }
277 Ok(())
278}
279
/// Renders a delimiter byte for log and diagnostic messages.
///
/// Printable bytes (and bytes >= 0x80, shown as their Latin-1 character) are rendered
/// verbatim. Every ASCII control byte is escaped (`\t`, `\n`, `\r`, otherwise `\xNN`), so
/// invisible or cursor-moving characters such as `\r` or `\x1f` never reach the log raw.
pub(crate) fn printable_delimiter(delimiter: u8) -> String {
    if delimiter.is_ascii_control() {
        // Only control bytes are escaped: `escape_ascii` would also escape quotes and
        // backslashes, which are perfectly readable as delimiters.
        delimiter.escape_ascii().to_string()
    } else {
        char::from(delimiter).to_string()
    }
}
288
#[cfg(test)]
mod tests {
    use std::ffi::OsString;

    use super::{preprocess_cli_args, printable_delimiter, to_lower_snake_case};

    #[test]
    fn converts_camel_case_to_snake() {
        assert_eq!(to_lower_snake_case("OrderDate"), "order_date");
    }

    #[test]
    fn collapses_separators() {
        assert_eq!(to_lower_snake_case("customer-name"), "customer_name");
        assert_eq!(to_lower_snake_case("customer name"), "customer_name");
        assert_eq!(to_lower_snake_case("customer__name"), "customer_name");
        assert_eq!(to_lower_snake_case(" Customer Name "), "customer_name");
    }

    #[test]
    fn handles_acronyms() {
        assert_eq!(to_lower_snake_case("APIKey"), "api_key");
        assert_eq!(to_lower_snake_case("HTTPStatus"), "http_status");
    }

    #[test]
    fn keeps_digits_attached() {
        assert_eq!(to_lower_snake_case("Address2Line"), "address2_line");
        assert_eq!(to_lower_snake_case("Column 2"), "column_2");
    }

    #[test]
    fn falls_back_when_no_alphanumerics() {
        assert_eq!(to_lower_snake_case("---"), "---");
        assert_eq!(to_lower_snake_case(""), "");
    }

    #[test]
    fn expands_report_invalid_shorthand() {
        let to_os = |items: &[&str]| items.iter().map(|s| OsString::from(*s)).collect::<Vec<_>>();
        assert_eq!(
            preprocess_cli_args(to_os(&["prog", "--report-invalid:detail:summary"])),
            to_os(&["prog", "--report-invalid", "detail", "summary"])
        );
        assert_eq!(
            preprocess_cli_args(to_os(&["--report-invalid:"])),
            to_os(&["--report-invalid"])
        );
        assert_eq!(
            preprocess_cli_args(to_os(&["--report-invalid", "detail"])),
            to_os(&["--report-invalid", "detail"])
        );
    }

    #[test]
    fn renders_common_delimiters() {
        assert_eq!(printable_delimiter(b','), ",");
        assert_eq!(printable_delimiter(b';'), ";");
        assert_eq!(printable_delimiter(b'\t'), "\\t");
        assert_eq!(printable_delimiter(b'\n'), "\\n");
    }
}