1use std::path::Path;
2use std::sync::Arc;
3
4use polars::prelude::{
5 col, DataFrame, DataType, LazyCsvReader, LazyFileListReader, PlPath, Schema, SerReader,
6};
7
8use crate::errors::{IoError, RunError};
9use crate::io::format::{self, FileReadError, InputAdapter, InputFile, ReadInput};
10use crate::{config, FloeResult};
11
/// Stateless adapter implementing `InputAdapter` for CSV sources.
struct CsvInputAdapter;

/// Single shared instance; the adapter carries no state, so one static suffices.
static CSV_INPUT_ADAPTER: CsvInputAdapter = CsvInputAdapter;

/// Returns the process-wide CSV input adapter as a trait object.
pub(crate) fn csv_input_adapter() -> &'static dyn InputAdapter {
    &CSV_INPUT_ADAPTER
}
19
/// Options controlling how a single CSV file is read into a `DataFrame`.
#[derive(Debug, Clone)]
pub struct CsvReadPlan {
    /// Schema applied to the scan (supplied up-front, not inferred from the file).
    pub schema: Schema,
    /// When true, rows that fail to parse under `schema` are skipped instead
    /// of failing the whole read.
    pub ignore_errors: bool,
}
25
26impl CsvReadPlan {
27 pub fn strict(schema: Schema) -> Self {
28 Self {
29 schema,
30 ignore_errors: false,
31 }
32 }
33}
34
35pub fn read_csv_file(
36 input_path: &Path,
37 source_options: &config::SourceOptions,
38 plan: &CsvReadPlan,
39) -> FloeResult<DataFrame> {
40 read_csv_lazy(
41 input_path,
42 source_options,
43 &plan.schema,
44 plan.ignore_errors,
45 None,
46 )
47}
48
49pub fn read_csv_header(
50 input_path: &Path,
51 source_options: &config::SourceOptions,
52 n_rows: Option<usize>,
53) -> FloeResult<Vec<String>> {
54 let read_options = source_options
55 .to_csv_read_options(input_path)?
56 .with_n_rows(n_rows);
57 let reader = read_options
58 .try_into_reader_with_file_path(None)
59 .map_err(|err| {
60 Box::new(IoError(format!(
61 "failed to open csv at {}: {err}",
62 input_path.display()
63 ))) as Box<dyn std::error::Error + Send + Sync>
64 })?;
65 let df = reader.finish().map_err(|err| {
66 Box::new(IoError(format!("csv header read failed: {err}")))
67 as Box<dyn std::error::Error + Send + Sync>
68 })?;
69 Ok(df
70 .get_column_names()
71 .iter()
72 .map(|name| name.to_string())
73 .collect())
74}
75
76impl InputAdapter for CsvInputAdapter {
77 fn format(&self) -> &'static str {
78 "csv"
79 }
80
81 fn read_input_columns(
82 &self,
83 entity: &config::EntityConfig,
84 input_file: &InputFile,
85 columns: &[config::ColumnConfig],
86 ) -> Result<Vec<String>, FileReadError> {
87 let default_options = config::SourceOptions::default();
88 let source_options = entity.source.options.as_ref().unwrap_or(&default_options);
89 resolve_input_columns(&input_file.source_local_path, source_options, columns).map_err(
90 |err| FileReadError {
91 rule: "csv_read_error".to_string(),
92 message: err.to_string(),
93 },
94 )
95 }
96
97 fn read_inputs(
98 &self,
99 entity: &config::EntityConfig,
100 files: &[InputFile],
101 columns: &[config::ColumnConfig],
102 normalize_strategy: Option<&str>,
103 collect_raw: bool,
104 ) -> FloeResult<Vec<ReadInput>> {
105 let default_options = config::SourceOptions::default();
106 let source_options = entity.source.options.as_ref().unwrap_or(&default_options);
107 let mut inputs = Vec::with_capacity(files.len());
108 for input_file in files {
109 let path = &input_file.source_local_path;
110 let input_columns = resolve_input_columns(path, source_options, columns)?;
111 let typed_projection = if collect_raw {
112 projected_columns(&input_columns, columns)
113 } else {
114 None
115 };
116 let typed_schema =
117 format::build_typed_schema(input_columns.as_slice(), columns, normalize_strategy)?;
118 let input = if collect_raw {
119 let raw_schema = build_raw_schema(&input_columns);
120 let raw_plan = CsvReadPlan::strict(raw_schema);
121 let raw_df = read_csv_file(path, source_options, &raw_plan)?;
122 let mut typed_df = format::cast_df_to_schema(&raw_df, &typed_schema)?;
123 if let Some(projection) = typed_projection.as_ref() {
124 typed_df = typed_df.select(projection).map_err(|err| {
125 Box::new(RunError(format!("failed to project typed columns: {err}")))
126 })?;
127 }
128 format::finalize_read_input(input_file, Some(raw_df), typed_df, normalize_strategy)?
129 } else {
130 let typed_plan = CsvReadPlan {
131 schema: typed_schema,
132 ignore_errors: true,
133 };
134 let typed_df = read_csv_lazy(path, source_options, &typed_plan.schema, true, None)?;
135 format::finalize_read_input(input_file, None, typed_df, normalize_strategy)?
136 };
137 inputs.push(input);
138 }
139 Ok(inputs)
140 }
141}
142
143fn resolve_input_columns(
144 path: &Path,
145 source_options: &config::SourceOptions,
146 declared_columns: &[config::ColumnConfig],
147) -> FloeResult<Vec<String>> {
148 let header = source_options.header.unwrap_or(true);
149 if header {
150 return read_csv_header(path, source_options, Some(0));
151 }
152
153 let input_columns = read_csv_header(path, source_options, Some(1))?;
154 let declared_names = declared_columns
155 .iter()
156 .map(|column| column.name.clone())
157 .collect::<Vec<_>>();
158 Ok(headless_columns(&declared_names, input_columns.len()))
159}
160
/// Assigns names to the columns of a headless CSV: declared names are used
/// positionally, and any surplus input columns beyond the declared list get
/// `extra_column_{n}` placeholders (n is the 1-based column position).
fn headless_columns(declared_names: &[String], input_count: usize) -> Vec<String> {
    (0..input_count)
        .map(|index| match declared_names.get(index) {
            Some(name) => name.clone(),
            None => format!("extra_column_{}", index + 1),
        })
        .collect()
}
174
175fn build_raw_schema(columns: &[String]) -> Schema {
176 let mut schema = Schema::with_capacity(columns.len());
177 for name in columns {
178 schema.insert(name.as_str().into(), DataType::String);
179 }
180 schema
181}
182
183fn read_csv_lazy(
184 input_path: &Path,
185 source_options: &config::SourceOptions,
186 schema: &Schema,
187 ignore_errors: bool,
188 projection: Option<&[String]>,
189) -> FloeResult<DataFrame> {
190 let header = source_options.header.unwrap_or(true);
191 let parse_options = source_options.to_csv_parse_options()?;
192 let path_str = input_path.to_string_lossy();
193 let mut reader = LazyCsvReader::new(PlPath::new(path_str.as_ref()))
194 .with_has_header(header)
195 .with_schema(Some(Arc::new(schema.clone())))
196 .with_ignore_errors(ignore_errors)
197 .map_parse_options(|_| parse_options.clone());
198
199 if let Some(chunk_size) = csv_chunk_size_for_path(input_path) {
200 reader = reader.with_chunk_size(chunk_size);
201 }
202
203 let mut lf = reader.finish().map_err(|err| {
204 Box::new(IoError(format!(
205 "failed to scan csv at {}: {err}",
206 input_path.display()
207 ))) as Box<dyn std::error::Error + Send + Sync>
208 })?;
209
210 if let Some(columns) = projection {
211 let exprs = columns.iter().map(col).collect::<Vec<_>>();
212 lf = lf.select(exprs);
213 }
214
215 lf.collect().map_err(|err| {
216 Box::new(IoError(format!("csv read failed: {err}")))
217 as Box<dyn std::error::Error + Send + Sync>
218 })
219}
220
221fn projected_columns(
222 input_columns: &[String],
223 declared_columns: &[config::ColumnConfig],
224) -> Option<Vec<String>> {
225 let declared = declared_columns
226 .iter()
227 .map(|column| column.name.as_str())
228 .collect::<std::collections::HashSet<_>>();
229 let projected = input_columns
230 .iter()
231 .filter(|name| declared.contains(name.as_str()))
232 .cloned()
233 .collect::<Vec<_>>();
234 if projected.is_empty() || projected.len() == input_columns.len() {
235 None
236 } else {
237 Some(projected)
238 }
239}
240
/// Picks an explicit scan chunk size based on file size: files of 64 MiB or
/// more are read in 50k-row chunks to bound peak memory. Smaller files — and
/// paths whose metadata cannot be read — return `None`, leaving the reader's
/// default in place.
fn csv_chunk_size_for_path(path: &Path) -> Option<usize> {
    const LARGE_FILE_THRESHOLD_BYTES: u64 = 64 * 1024 * 1024;
    const LARGE_FILE_CHUNK_ROWS: usize = 50_000;
    let file_size = std::fs::metadata(path).ok()?.len();
    (file_size >= LARGE_FILE_THRESHOLD_BYTES).then_some(LARGE_FILE_CHUNK_ROWS)
}