Skip to main content

floe_core/io/read/
csv.rs

1use std::path::Path;
2use std::sync::Arc;
3
4use polars::prelude::{
5    col, DataFrame, DataType, LazyCsvReader, LazyFileListReader, PlPath, Schema, SerReader,
6};
7
8use crate::errors::{IoError, RunError};
9use crate::io::format::{self, FileReadError, InputAdapter, InputFile, ReadInput};
10use crate::{config, FloeResult};
11
12struct DelimitedInputAdapter {
13    format: &'static str,
14}
15
16static CSV_INPUT_ADAPTER: DelimitedInputAdapter = DelimitedInputAdapter { format: "csv" };
17static TSV_INPUT_ADAPTER: DelimitedInputAdapter = DelimitedInputAdapter { format: "tsv" };
18
19pub(crate) fn csv_input_adapter() -> &'static dyn InputAdapter {
20    &CSV_INPUT_ADAPTER
21}
22
23pub(crate) fn tsv_input_adapter() -> &'static dyn InputAdapter {
24    &TSV_INPUT_ADAPTER
25}
26#[derive(Debug, Clone)]
27pub struct CsvReadPlan {
28    pub schema: Schema,
29    pub ignore_errors: bool,
30}
31
32impl CsvReadPlan {
33    pub fn strict(schema: Schema) -> Self {
34        Self {
35            schema,
36            ignore_errors: false,
37        }
38    }
39}
40
41pub fn read_csv_file(
42    input_path: &Path,
43    source_options: &config::SourceOptions,
44    plan: &CsvReadPlan,
45) -> FloeResult<DataFrame> {
46    read_csv_lazy(
47        input_path,
48        source_options,
49        &plan.schema,
50        plan.ignore_errors,
51        None,
52    )
53}
54
55pub fn read_csv_header(
56    input_path: &Path,
57    source_options: &config::SourceOptions,
58    n_rows: Option<usize>,
59) -> FloeResult<Vec<String>> {
60    let read_options = source_options
61        .to_csv_read_options(input_path)?
62        .with_n_rows(n_rows);
63    let reader = read_options
64        .try_into_reader_with_file_path(None)
65        .map_err(|err| {
66            Box::new(IoError(format!(
67                "failed to open csv at {}: {err}",
68                input_path.display()
69            ))) as Box<dyn std::error::Error + Send + Sync>
70        })?;
71    let df = reader.finish().map_err(|err| {
72        Box::new(IoError(format!("csv header read failed: {err}")))
73            as Box<dyn std::error::Error + Send + Sync>
74    })?;
75    Ok(df
76        .get_column_names()
77        .iter()
78        .map(|name| name.to_string())
79        .collect())
80}
81
82impl InputAdapter for DelimitedInputAdapter {
83    fn format(&self) -> &'static str {
84        self.format
85    }
86
87    fn read_input_columns(
88        &self,
89        entity: &config::EntityConfig,
90        input_file: &InputFile,
91        columns: &[config::ColumnConfig],
92    ) -> Result<Vec<String>, FileReadError> {
93        let default_options = config::SourceOptions::defaults_for_format(self.format);
94        let source_options = entity.source.options.as_ref().unwrap_or(&default_options);
95        resolve_input_columns(&input_file.source_local_path, source_options, columns).map_err(
96            |err| FileReadError {
97                rule: format!("{}_read_error", self.format),
98                message: err.to_string(),
99            },
100        )
101    }
102
103    fn read_inputs(
104        &self,
105        entity: &config::EntityConfig,
106        files: &[InputFile],
107        columns: &[config::ColumnConfig],
108        normalize_strategy: Option<&str>,
109        collect_raw: bool,
110    ) -> FloeResult<Vec<ReadInput>> {
111        let default_options = config::SourceOptions::defaults_for_format(self.format);
112        let source_options = entity.source.options.as_ref().unwrap_or(&default_options);
113        let mut inputs = Vec::with_capacity(files.len());
114        for input_file in files {
115            let path = &input_file.source_local_path;
116            let input_columns = resolve_input_columns(path, source_options, columns)?;
117            let typed_projection = if collect_raw {
118                projected_columns(&input_columns, columns)
119            } else {
120                None
121            };
122            let typed_schema =
123                format::build_typed_schema(input_columns.as_slice(), columns, normalize_strategy)?;
124            let input = if collect_raw {
125                let raw_schema = build_raw_schema(&input_columns);
126                let raw_plan = CsvReadPlan::strict(raw_schema);
127                let raw_df = read_csv_file(path, source_options, &raw_plan)?;
128                let mut typed_df = format::cast_df_to_schema(&raw_df, &typed_schema)?;
129                if let Some(projection) = typed_projection.as_ref() {
130                    typed_df = typed_df.select(projection).map_err(|err| {
131                        Box::new(RunError(format!("failed to project typed columns: {err}")))
132                    })?;
133                }
134                format::finalize_read_input(input_file, Some(raw_df), typed_df, normalize_strategy)?
135            } else {
136                let typed_plan = CsvReadPlan {
137                    schema: typed_schema,
138                    ignore_errors: true,
139                };
140                let typed_df = read_csv_lazy(path, source_options, &typed_plan.schema, true, None)?;
141                format::finalize_read_input(input_file, None, typed_df, normalize_strategy)?
142            };
143            inputs.push(input);
144        }
145        Ok(inputs)
146    }
147}
148
149fn resolve_input_columns(
150    path: &Path,
151    source_options: &config::SourceOptions,
152    declared_columns: &[config::ColumnConfig],
153) -> FloeResult<Vec<String>> {
154    let header = source_options.header.unwrap_or(true);
155    if header {
156        return read_csv_header(path, source_options, Some(0));
157    }
158
159    let input_columns = read_csv_header(path, source_options, Some(1))?;
160    let declared_names = declared_columns
161        .iter()
162        .map(|column| column.name.clone())
163        .collect::<Vec<_>>();
164    Ok(headless_columns(&declared_names, input_columns.len()))
165}
166
/// Produces exactly `input_count` column names for a file without a header.
///
/// The first names are taken in order from `declared_names`. If the file has
/// more columns than were declared, the remainder are named
/// `extra_column_<n>`, where `<n>` is the 1-based column position.
fn headless_columns(declared_names: &[String], input_count: usize) -> Vec<String> {
    // The range is empty whenever enough names were declared, so no explicit
    // length comparison is needed.
    let undeclared_positions = declared_names.len()..input_count;
    declared_names
        .iter()
        .take(input_count)
        .cloned()
        .chain(undeclared_positions.map(|position| format!("extra_column_{}", position + 1)))
        .collect()
}
180
181fn build_raw_schema(columns: &[String]) -> Schema {
182    let mut schema = Schema::with_capacity(columns.len());
183    for name in columns {
184        schema.insert(name.as_str().into(), DataType::String);
185    }
186    schema
187}
188
189fn read_csv_lazy(
190    input_path: &Path,
191    source_options: &config::SourceOptions,
192    schema: &Schema,
193    ignore_errors: bool,
194    projection: Option<&[String]>,
195) -> FloeResult<DataFrame> {
196    let header = source_options.header.unwrap_or(true);
197    let parse_options = source_options.to_csv_parse_options()?;
198    let path_str = input_path.to_string_lossy();
199    let mut reader = LazyCsvReader::new(PlPath::new(path_str.as_ref()))
200        .with_has_header(header)
201        .with_schema(Some(Arc::new(schema.clone())))
202        .with_ignore_errors(ignore_errors)
203        .map_parse_options(|_| parse_options.clone());
204
205    if let Some(chunk_size) = csv_chunk_size_for_path(input_path) {
206        reader = reader.with_chunk_size(chunk_size);
207    }
208
209    let mut lf = reader.finish().map_err(|err| {
210        Box::new(IoError(format!(
211            "failed to scan csv at {}: {err}",
212            input_path.display()
213        ))) as Box<dyn std::error::Error + Send + Sync>
214    })?;
215
216    if let Some(columns) = projection {
217        let exprs = columns.iter().map(col).collect::<Vec<_>>();
218        lf = lf.select(exprs);
219    }
220
221    lf.collect().map_err(|err| {
222        Box::new(IoError(format!("csv read failed: {err}")))
223            as Box<dyn std::error::Error + Send + Sync>
224    })
225}
226
227fn projected_columns(
228    input_columns: &[String],
229    declared_columns: &[config::ColumnConfig],
230) -> Option<Vec<String>> {
231    let declared = declared_columns
232        .iter()
233        .map(|column| column.name.as_str())
234        .collect::<std::collections::HashSet<_>>();
235    let projected = input_columns
236        .iter()
237        .filter(|name| declared.contains(name.as_str()))
238        .cloned()
239        .collect::<Vec<_>>();
240    if projected.is_empty() || projected.len() == input_columns.len() {
241        None
242    } else {
243        Some(projected)
244    }
245}
246
/// Suggests a reader chunk size based on the size of the file at `path`.
///
/// Returns `Some(50_000)` for files of at least 64 MiB, and `None` for
/// smaller files or when the file's metadata cannot be read.
fn csv_chunk_size_for_path(path: &Path) -> Option<usize> {
    const LARGE_FILE_BYTES: u64 = 64 * 1024 * 1024;
    const LARGE_FILE_CHUNK_SIZE: usize = 50_000;

    match std::fs::metadata(path) {
        Ok(metadata) if metadata.len() >= LARGE_FILE_BYTES => Some(LARGE_FILE_CHUNK_SIZE),
        _ => None,
    }
}