1use std::path::Path;
2use std::sync::Arc;
3
4use polars::prelude::{
5 col, DataFrame, DataType, LazyCsvReader, LazyFileListReader, PlPath, Schema, SerReader,
6};
7
8use crate::errors::{IoError, RunError};
9use crate::io::format::{self, FileReadError, InputAdapter, InputFile, ReadInput};
10use crate::{config, FloeResult};
11
/// Input adapter for delimiter-separated files. `format` distinguishes the
/// concrete flavor ("csv" or "tsv") and is echoed into error rule names.
struct DelimitedInputAdapter {
    format: &'static str,
}

// Shared, zero-state adapter instances handed out by the accessors below.
static CSV_INPUT_ADAPTER: DelimitedInputAdapter = DelimitedInputAdapter { format: "csv" };
static TSV_INPUT_ADAPTER: DelimitedInputAdapter = DelimitedInputAdapter { format: "tsv" };
18
/// Returns the process-wide adapter for CSV inputs.
pub(crate) fn csv_input_adapter() -> &'static dyn InputAdapter {
    &CSV_INPUT_ADAPTER
}

/// Returns the process-wide adapter for TSV inputs.
pub(crate) fn tsv_input_adapter() -> &'static dyn InputAdapter {
    &TSV_INPUT_ADAPTER
}
26#[derive(Debug, Clone)]
27pub struct CsvReadPlan {
28 pub schema: Schema,
29 pub ignore_errors: bool,
30}
31
32impl CsvReadPlan {
33 pub fn strict(schema: Schema) -> Self {
34 Self {
35 schema,
36 ignore_errors: false,
37 }
38 }
39}
40
41pub fn read_csv_file(
42 input_path: &Path,
43 source_options: &config::SourceOptions,
44 plan: &CsvReadPlan,
45) -> FloeResult<DataFrame> {
46 read_csv_lazy(
47 input_path,
48 source_options,
49 &plan.schema,
50 plan.ignore_errors,
51 None,
52 )
53}
54
55pub fn read_csv_header(
56 input_path: &Path,
57 source_options: &config::SourceOptions,
58 n_rows: Option<usize>,
59) -> FloeResult<Vec<String>> {
60 let read_options = source_options
61 .to_csv_read_options(input_path)?
62 .with_n_rows(n_rows);
63 let reader = read_options
64 .try_into_reader_with_file_path(None)
65 .map_err(|err| {
66 Box::new(IoError(format!(
67 "failed to open csv at {}: {err}",
68 input_path.display()
69 ))) as Box<dyn std::error::Error + Send + Sync>
70 })?;
71 let df = reader.finish().map_err(|err| {
72 Box::new(IoError(format!("csv header read failed: {err}")))
73 as Box<dyn std::error::Error + Send + Sync>
74 })?;
75 Ok(df
76 .get_column_names()
77 .iter()
78 .map(|name| name.to_string())
79 .collect())
80}
81
82impl InputAdapter for DelimitedInputAdapter {
83 fn format(&self) -> &'static str {
84 self.format
85 }
86
87 fn read_input_columns(
88 &self,
89 entity: &config::EntityConfig,
90 input_file: &InputFile,
91 columns: &[config::ColumnConfig],
92 ) -> Result<Vec<String>, FileReadError> {
93 let default_options = config::SourceOptions::defaults_for_format(self.format);
94 let source_options = entity.source.options.as_ref().unwrap_or(&default_options);
95 resolve_input_columns(&input_file.source_local_path, source_options, columns).map_err(
96 |err| FileReadError {
97 rule: format!("{}_read_error", self.format),
98 message: err.to_string(),
99 },
100 )
101 }
102
103 fn read_inputs(
104 &self,
105 entity: &config::EntityConfig,
106 files: &[InputFile],
107 columns: &[config::ColumnConfig],
108 normalize_strategy: Option<&str>,
109 collect_raw: bool,
110 ) -> FloeResult<Vec<ReadInput>> {
111 let default_options = config::SourceOptions::defaults_for_format(self.format);
112 let source_options = entity.source.options.as_ref().unwrap_or(&default_options);
113 let mut inputs = Vec::with_capacity(files.len());
114 for input_file in files {
115 let path = &input_file.source_local_path;
116 let input_columns = resolve_input_columns(path, source_options, columns)?;
117 let typed_projection = if collect_raw {
118 projected_columns(&input_columns, columns)
119 } else {
120 None
121 };
122 let typed_schema =
123 format::build_typed_schema(input_columns.as_slice(), columns, normalize_strategy)?;
124 let input = if collect_raw {
125 let raw_schema = build_raw_schema(&input_columns);
126 let raw_plan = CsvReadPlan::strict(raw_schema);
127 let raw_df = read_csv_file(path, source_options, &raw_plan)?;
128 let mut typed_df = format::cast_df_to_schema(&raw_df, &typed_schema)?;
129 if let Some(projection) = typed_projection.as_ref() {
130 typed_df = typed_df.select(projection).map_err(|err| {
131 Box::new(RunError(format!("failed to project typed columns: {err}")))
132 })?;
133 }
134 format::finalize_read_input(input_file, Some(raw_df), typed_df, normalize_strategy)?
135 } else {
136 let typed_plan = CsvReadPlan {
137 schema: typed_schema,
138 ignore_errors: true,
139 };
140 let typed_df = read_csv_lazy(path, source_options, &typed_plan.schema, true, None)?;
141 format::finalize_read_input(input_file, None, typed_df, normalize_strategy)?
142 };
143 inputs.push(input);
144 }
145 Ok(inputs)
146 }
147}
148
149fn resolve_input_columns(
150 path: &Path,
151 source_options: &config::SourceOptions,
152 declared_columns: &[config::ColumnConfig],
153) -> FloeResult<Vec<String>> {
154 let header = source_options.header.unwrap_or(true);
155 if header {
156 return read_csv_header(path, source_options, Some(0));
157 }
158
159 let input_columns = read_csv_header(path, source_options, Some(1))?;
160 let declared_names = declared_columns
161 .iter()
162 .map(|column| column.name.clone())
163 .collect::<Vec<_>>();
164 Ok(headless_columns(&declared_names, input_columns.len()))
165}
166
/// Assigns names to the columns of a headerless file.
///
/// The first `input_count` declared names are used positionally; if the file
/// has more columns than were declared, the surplus columns are named
/// `extra_column_N` (1-based position).
fn headless_columns(declared_names: &[String], input_count: usize) -> Vec<String> {
    let mut names: Vec<String> = declared_names.iter().take(input_count).cloned().collect();
    // An empty range is a no-op, so no explicit guard is needed when the
    // declaration already covers every input column.
    let covered = names.len();
    names.extend((covered..input_count).map(|index| format!("extra_column_{}", index + 1)));
    names
}
180
181fn build_raw_schema(columns: &[String]) -> Schema {
182 let mut schema = Schema::with_capacity(columns.len());
183 for name in columns {
184 schema.insert(name.as_str().into(), DataType::String);
185 }
186 schema
187}
188
189fn read_csv_lazy(
190 input_path: &Path,
191 source_options: &config::SourceOptions,
192 schema: &Schema,
193 ignore_errors: bool,
194 projection: Option<&[String]>,
195) -> FloeResult<DataFrame> {
196 let header = source_options.header.unwrap_or(true);
197 let parse_options = source_options.to_csv_parse_options()?;
198 let path_str = input_path.to_string_lossy();
199 let mut reader = LazyCsvReader::new(PlPath::new(path_str.as_ref()))
200 .with_has_header(header)
201 .with_schema(Some(Arc::new(schema.clone())))
202 .with_ignore_errors(ignore_errors)
203 .map_parse_options(|_| parse_options.clone());
204
205 if let Some(chunk_size) = csv_chunk_size_for_path(input_path) {
206 reader = reader.with_chunk_size(chunk_size);
207 }
208
209 let mut lf = reader.finish().map_err(|err| {
210 Box::new(IoError(format!(
211 "failed to scan csv at {}: {err}",
212 input_path.display()
213 ))) as Box<dyn std::error::Error + Send + Sync>
214 })?;
215
216 if let Some(columns) = projection {
217 let exprs = columns.iter().map(col).collect::<Vec<_>>();
218 lf = lf.select(exprs);
219 }
220
221 lf.collect().map_err(|err| {
222 Box::new(IoError(format!("csv read failed: {err}")))
223 as Box<dyn std::error::Error + Send + Sync>
224 })
225}
226
227fn projected_columns(
228 input_columns: &[String],
229 declared_columns: &[config::ColumnConfig],
230) -> Option<Vec<String>> {
231 let declared = declared_columns
232 .iter()
233 .map(|column| column.name.as_str())
234 .collect::<std::collections::HashSet<_>>();
235 let projected = input_columns
236 .iter()
237 .filter(|name| declared.contains(name.as_str()))
238 .cloned()
239 .collect::<Vec<_>>();
240 if projected.is_empty() || projected.len() == input_columns.len() {
241 None
242 } else {
243 Some(projected)
244 }
245}
246
/// Picks an explicit CSV chunk size for large files.
///
/// Files of 64 MiB or more are read in 50k-row chunks; smaller files (and
/// paths whose metadata cannot be read) get `None`, leaving the reader's
/// default chunking in place.
fn csv_chunk_size_for_path(path: &Path) -> Option<usize> {
    const LARGE_FILE_THRESHOLD: u64 = 64 * 1024 * 1024;
    std::fs::metadata(path)
        .ok()
        .filter(|meta| meta.len() >= LARGE_FILE_THRESHOLD)
        .map(|_| 50_000)
}