1use std::path::Path;
2use std::sync::Arc;
3
4use polars::prelude::{
5 col, DataFrame, DataType, LazyCsvReader, LazyFileListReader, PlPath, Schema, SerReader,
6};
7
8use crate::errors::{IoError, RunError};
9use crate::io::format::{self, FileReadError, InputAdapter, InputFile, ReadInput};
10use crate::{config, FloeResult};
11
/// Input adapter shared by the delimited text formats.
///
/// `format` is the label this adapter reports from `InputAdapter::format`, the key
/// used to look up default `SourceOptions` when an entity has none, and the prefix
/// of the `<format>_read_error` rule name on column-resolution failures.
struct DelimitedInputAdapter {
    /// Format label, e.g. `"csv"` or `"tsv"`.
    format: &'static str,
}
15
/// Shared adapter instance registered under the `csv` format label.
static CSV_INPUT_ADAPTER: DelimitedInputAdapter = DelimitedInputAdapter { format: "csv" };
/// Shared adapter instance registered under the `tsv` format label.
static TSV_INPUT_ADAPTER: DelimitedInputAdapter = DelimitedInputAdapter { format: "tsv" };
18
19pub(crate) fn csv_input_adapter() -> &'static dyn InputAdapter {
20 &CSV_INPUT_ADAPTER
21}
22
23pub(crate) fn tsv_input_adapter() -> &'static dyn InputAdapter {
24 &TSV_INPUT_ADAPTER
25}
/// Describes how a delimited file should be read: the explicit schema and
/// whether parse errors are tolerated.
#[derive(Debug, Clone)]
pub struct CsvReadPlan {
    /// Schema handed to the CSV reader via `with_schema`.
    pub schema: Schema,
    /// Forwarded to `LazyCsvReader::with_ignore_errors`; `true` asks the reader
    /// to tolerate parse errors rather than fail the read.
    pub ignore_errors: bool,
}
31
32impl CsvReadPlan {
33 pub fn strict(schema: Schema) -> Self {
34 Self {
35 schema,
36 ignore_errors: false,
37 }
38 }
39}
40
41pub fn read_csv_file(
42 input_path: &Path,
43 source_options: &config::SourceOptions,
44 plan: &CsvReadPlan,
45) -> FloeResult<DataFrame> {
46 read_csv_lazy(
47 input_path,
48 source_options,
49 &plan.schema,
50 plan.ignore_errors,
51 None,
52 )
53}
54
55pub fn read_csv_header(
56 input_path: &Path,
57 source_options: &config::SourceOptions,
58 n_rows: Option<usize>,
59) -> FloeResult<Vec<String>> {
60 let read_options = source_options
61 .to_csv_read_options(input_path)?
62 .with_n_rows(n_rows);
63 let reader = read_options
64 .try_into_reader_with_file_path(None)
65 .map_err(|err| {
66 Box::new(IoError(format!(
67 "failed to open csv at {}: {err}",
68 input_path.display()
69 ))) as Box<dyn std::error::Error + Send + Sync>
70 })?;
71 let df = reader.finish().map_err(|err| {
72 Box::new(IoError(format!("csv header read failed: {err}")))
73 as Box<dyn std::error::Error + Send + Sync>
74 })?;
75 Ok(df
76 .get_column_names()
77 .iter()
78 .map(|name| name.to_string())
79 .collect())
80}
81
82impl InputAdapter for DelimitedInputAdapter {
83 fn format(&self) -> &'static str {
84 self.format
85 }
86
87 fn read_input_columns(
88 &self,
89 entity: &config::EntityConfig,
90 input_file: &InputFile,
91 columns: &[config::ColumnConfig],
92 ) -> Result<Vec<String>, FileReadError> {
93 let default_options = config::SourceOptions::defaults_for_format(self.format);
94 let source_options = entity.source.options.as_ref().unwrap_or(&default_options);
95 resolve_input_columns(&input_file.source_local_path, source_options, columns).map_err(
96 |err| FileReadError {
97 rule: format!("{}_read_error", self.format),
98 message: err.to_string(),
99 },
100 )
101 }
102
103 fn read_inputs(
104 &self,
105 entity: &config::EntityConfig,
106 files: &[InputFile],
107 columns: &[config::ColumnConfig],
108 normalize_strategy: Option<&str>,
109 collect_raw: bool,
110 ) -> FloeResult<Vec<ReadInput>> {
111 let default_options = config::SourceOptions::defaults_for_format(self.format);
112 let source_options = entity.source.options.as_ref().unwrap_or(&default_options);
113 let mut inputs = Vec::with_capacity(files.len());
114 for input_file in files {
115 let input_columns =
116 resolve_input_columns(&input_file.source_local_path, source_options, columns)?;
117 inputs.push(read_csv_input_with_columns(
118 input_file,
119 source_options,
120 columns,
121 normalize_strategy,
122 collect_raw,
123 input_columns,
124 )?);
125 }
126 Ok(inputs)
127 }
128
129 fn read_inputs_with_prechecked_columns(
130 &self,
131 entity: &config::EntityConfig,
132 files: &[InputFile],
133 columns: &[config::ColumnConfig],
134 normalize_strategy: Option<&str>,
135 collect_raw: bool,
136 prechecked_input_columns: Option<&[String]>,
137 ) -> FloeResult<Vec<ReadInput>> {
138 if let (Some(prechecked), [input_file]) = (prechecked_input_columns, files) {
139 let default_options = config::SourceOptions::defaults_for_format(self.format);
140 let source_options = entity.source.options.as_ref().unwrap_or(&default_options);
141 return Ok(vec![read_csv_input_with_columns(
142 input_file,
143 source_options,
144 columns,
145 normalize_strategy,
146 collect_raw,
147 prechecked.to_vec(),
148 )?]);
149 }
150 self.read_inputs(entity, files, columns, normalize_strategy, collect_raw)
151 }
152}
153
154fn read_csv_input_with_columns(
155 input_file: &InputFile,
156 source_options: &config::SourceOptions,
157 columns: &[config::ColumnConfig],
158 normalize_strategy: Option<&str>,
159 collect_raw: bool,
160 input_columns: Vec<String>,
161) -> FloeResult<ReadInput> {
162 let path = &input_file.source_local_path;
163 let typed_projection = if collect_raw {
164 projected_columns(&input_columns, columns)
165 } else {
166 None
167 };
168 let typed_schema =
169 format::build_typed_schema(input_columns.as_slice(), columns, normalize_strategy)?;
170 if collect_raw {
171 let raw_schema = build_raw_schema(&input_columns);
172 let raw_plan = CsvReadPlan::strict(raw_schema);
173 let raw_df = read_csv_file(path, source_options, &raw_plan)?;
174 let mut typed_df = format::cast_df_to_schema(&raw_df, &typed_schema)?;
175 if let Some(projection) = typed_projection.as_ref() {
176 typed_df = typed_df.select(projection).map_err(|err| {
177 Box::new(RunError(format!("failed to project typed columns: {err}")))
178 })?;
179 }
180 return format::finalize_read_input(input_file, Some(raw_df), typed_df, normalize_strategy);
181 }
182
183 let typed_plan = CsvReadPlan {
184 schema: typed_schema,
185 ignore_errors: true,
186 };
187 let typed_df = read_csv_lazy(path, source_options, &typed_plan.schema, true, None)?;
188 format::finalize_read_input(input_file, None, typed_df, normalize_strategy)
189}
190
191fn resolve_input_columns(
192 path: &Path,
193 source_options: &config::SourceOptions,
194 declared_columns: &[config::ColumnConfig],
195) -> FloeResult<Vec<String>> {
196 let header = source_options.header.unwrap_or(true);
197 if header {
198 return read_csv_header(path, source_options, Some(0));
199 }
200
201 let input_columns = read_csv_header(path, source_options, Some(1))?;
202 let declared_names = declared_columns
203 .iter()
204 .map(|column| column.name.clone())
205 .collect::<Vec<_>>();
206 Ok(headless_columns(&declared_names, input_columns.len()))
207}
208
/// Names the columns of a headerless file that has `input_count` columns.
///
/// Declared names are assigned positionally, and surplus declared names are
/// dropped. Every column past the declared list is named `extra_column_<n>`,
/// where `n` is its 1-based position.
fn headless_columns(declared_names: &[String], input_count: usize) -> Vec<String> {
    // An empty range when there are at least as many declared names as columns.
    let surplus = (declared_names.len()..input_count)
        .map(|position| format!("extra_column_{}", position + 1));
    declared_names
        .iter()
        .take(input_count)
        .cloned()
        .chain(surplus)
        .collect()
}
222
223fn build_raw_schema(columns: &[String]) -> Schema {
224 let mut schema = Schema::with_capacity(columns.len());
225 for name in columns {
226 schema.insert(name.as_str().into(), DataType::String);
227 }
228 schema
229}
230
231fn read_csv_lazy(
232 input_path: &Path,
233 source_options: &config::SourceOptions,
234 schema: &Schema,
235 ignore_errors: bool,
236 projection: Option<&[String]>,
237) -> FloeResult<DataFrame> {
238 let header = source_options.header.unwrap_or(true);
239 let parse_options = source_options.to_csv_parse_options()?;
240 let path_str = input_path.to_string_lossy();
241 let mut reader = LazyCsvReader::new(PlPath::new(path_str.as_ref()))
242 .with_has_header(header)
243 .with_schema(Some(Arc::new(schema.clone())))
244 .with_ignore_errors(ignore_errors)
245 .map_parse_options(|_| parse_options.clone());
246
247 if let Some(chunk_size) = csv_chunk_size_for_path(input_path) {
248 reader = reader.with_chunk_size(chunk_size);
249 }
250
251 let mut lf = reader.finish().map_err(|err| {
252 Box::new(IoError(format!(
253 "failed to scan csv at {}: {err}",
254 input_path.display()
255 ))) as Box<dyn std::error::Error + Send + Sync>
256 })?;
257
258 if let Some(columns) = projection {
259 let exprs = columns.iter().map(col).collect::<Vec<_>>();
260 lf = lf.select(exprs);
261 }
262
263 lf.collect().map_err(|err| {
264 Box::new(IoError(format!("csv read failed: {err}")))
265 as Box<dyn std::error::Error + Send + Sync>
266 })
267}
268
269fn projected_columns(
270 input_columns: &[String],
271 declared_columns: &[config::ColumnConfig],
272) -> Option<Vec<String>> {
273 let declared = declared_columns
274 .iter()
275 .map(|column| column.name.as_str())
276 .collect::<std::collections::HashSet<_>>();
277 let projected = input_columns
278 .iter()
279 .filter(|name| declared.contains(name.as_str()))
280 .cloned()
281 .collect::<Vec<_>>();
282 if projected.is_empty() || projected.len() == input_columns.len() {
283 None
284 } else {
285 Some(projected)
286 }
287}
288
/// Picks a reader chunk size (in rows) for large files.
///
/// Files of at least 64 MiB get a chunk size of 50,000. Smaller files, and paths
/// whose metadata cannot be read, get `None`, which leaves the reader's default
/// in place.
fn csv_chunk_size_for_path(path: &Path) -> Option<usize> {
    const LARGE_FILE_BYTES: u64 = 64 * 1024 * 1024;
    const LARGE_FILE_CHUNK_ROWS: usize = 50_000;
    match std::fs::metadata(path) {
        Ok(meta) if meta.len() >= LARGE_FILE_BYTES => Some(LARGE_FILE_CHUNK_ROWS),
        _ => None,
    }
}