Skip to main content

floe_core/io/read/
csv.rs

1use std::path::Path;
2use std::sync::Arc;
3
4use polars::prelude::{
5    col, DataFrame, DataType, LazyCsvReader, LazyFileListReader, PlPath, Schema, SerReader,
6};
7
8use crate::errors::{IoError, RunError};
9use crate::io::format::{self, FileReadError, InputAdapter, InputFile, ReadInput};
10use crate::{config, FloeResult};
11
12struct DelimitedInputAdapter {
13    format: &'static str,
14}
15
16static CSV_INPUT_ADAPTER: DelimitedInputAdapter = DelimitedInputAdapter { format: "csv" };
17static TSV_INPUT_ADAPTER: DelimitedInputAdapter = DelimitedInputAdapter { format: "tsv" };
18
19pub(crate) fn csv_input_adapter() -> &'static dyn InputAdapter {
20    &CSV_INPUT_ADAPTER
21}
22
23pub(crate) fn tsv_input_adapter() -> &'static dyn InputAdapter {
24    &TSV_INPUT_ADAPTER
25}
26#[derive(Debug, Clone)]
27pub struct CsvReadPlan {
28    pub schema: Schema,
29    pub ignore_errors: bool,
30}
31
32impl CsvReadPlan {
33    pub fn strict(schema: Schema) -> Self {
34        Self {
35            schema,
36            ignore_errors: false,
37        }
38    }
39}
40
41pub fn read_csv_file(
42    input_path: &Path,
43    source_options: &config::SourceOptions,
44    plan: &CsvReadPlan,
45) -> FloeResult<DataFrame> {
46    read_csv_lazy(
47        input_path,
48        source_options,
49        &plan.schema,
50        plan.ignore_errors,
51        None,
52    )
53}
54
55pub fn read_csv_header(
56    input_path: &Path,
57    source_options: &config::SourceOptions,
58    n_rows: Option<usize>,
59) -> FloeResult<Vec<String>> {
60    let read_options = source_options
61        .to_csv_read_options(input_path)?
62        .with_n_rows(n_rows);
63    let reader = read_options
64        .try_into_reader_with_file_path(None)
65        .map_err(|err| {
66            Box::new(IoError(format!(
67                "failed to open csv at {}: {err}",
68                input_path.display()
69            ))) as Box<dyn std::error::Error + Send + Sync>
70        })?;
71    let df = reader.finish().map_err(|err| {
72        Box::new(IoError(format!("csv header read failed: {err}")))
73            as Box<dyn std::error::Error + Send + Sync>
74    })?;
75    Ok(df
76        .get_column_names()
77        .iter()
78        .map(|name| name.to_string())
79        .collect())
80}
81
82impl InputAdapter for DelimitedInputAdapter {
83    fn format(&self) -> &'static str {
84        self.format
85    }
86
87    fn read_input_columns(
88        &self,
89        entity: &config::EntityConfig,
90        input_file: &InputFile,
91        columns: &[config::ColumnConfig],
92    ) -> Result<Vec<String>, FileReadError> {
93        let default_options = config::SourceOptions::defaults_for_format(self.format);
94        let source_options = entity.source.options.as_ref().unwrap_or(&default_options);
95        resolve_input_columns(&input_file.source_local_path, source_options, columns).map_err(
96            |err| FileReadError {
97                rule: format!("{}_read_error", self.format),
98                message: err.to_string(),
99            },
100        )
101    }
102
103    fn read_inputs(
104        &self,
105        entity: &config::EntityConfig,
106        files: &[InputFile],
107        columns: &[config::ColumnConfig],
108        normalize_strategy: Option<&str>,
109        collect_raw: bool,
110    ) -> FloeResult<Vec<ReadInput>> {
111        let default_options = config::SourceOptions::defaults_for_format(self.format);
112        let source_options = entity.source.options.as_ref().unwrap_or(&default_options);
113        let mut inputs = Vec::with_capacity(files.len());
114        for input_file in files {
115            let input_columns =
116                resolve_input_columns(&input_file.source_local_path, source_options, columns)?;
117            inputs.push(read_csv_input_with_columns(
118                input_file,
119                source_options,
120                columns,
121                normalize_strategy,
122                collect_raw,
123                input_columns,
124            )?);
125        }
126        Ok(inputs)
127    }
128
129    fn read_inputs_with_prechecked_columns(
130        &self,
131        entity: &config::EntityConfig,
132        files: &[InputFile],
133        columns: &[config::ColumnConfig],
134        normalize_strategy: Option<&str>,
135        collect_raw: bool,
136        prechecked_input_columns: Option<&[String]>,
137    ) -> FloeResult<Vec<ReadInput>> {
138        if let (Some(prechecked), [input_file]) = (prechecked_input_columns, files) {
139            let default_options = config::SourceOptions::defaults_for_format(self.format);
140            let source_options = entity.source.options.as_ref().unwrap_or(&default_options);
141            return Ok(vec![read_csv_input_with_columns(
142                input_file,
143                source_options,
144                columns,
145                normalize_strategy,
146                collect_raw,
147                prechecked.to_vec(),
148            )?]);
149        }
150        self.read_inputs(entity, files, columns, normalize_strategy, collect_raw)
151    }
152}
153
154fn read_csv_input_with_columns(
155    input_file: &InputFile,
156    source_options: &config::SourceOptions,
157    columns: &[config::ColumnConfig],
158    normalize_strategy: Option<&str>,
159    collect_raw: bool,
160    input_columns: Vec<String>,
161) -> FloeResult<ReadInput> {
162    let path = &input_file.source_local_path;
163    let typed_projection = if collect_raw {
164        projected_columns(&input_columns, columns)
165    } else {
166        None
167    };
168    let typed_schema =
169        format::build_typed_schema(input_columns.as_slice(), columns, normalize_strategy)?;
170    if collect_raw {
171        let raw_schema = build_raw_schema(&input_columns);
172        let raw_plan = CsvReadPlan::strict(raw_schema);
173        let raw_df = read_csv_file(path, source_options, &raw_plan)?;
174        let mut typed_df = format::cast_df_to_schema(&raw_df, &typed_schema)?;
175        if let Some(projection) = typed_projection.as_ref() {
176            typed_df = typed_df.select(projection).map_err(|err| {
177                Box::new(RunError(format!("failed to project typed columns: {err}")))
178            })?;
179        }
180        return format::finalize_read_input(input_file, Some(raw_df), typed_df, normalize_strategy);
181    }
182
183    let typed_plan = CsvReadPlan {
184        schema: typed_schema,
185        ignore_errors: true,
186    };
187    let typed_df = read_csv_lazy(path, source_options, &typed_plan.schema, true, None)?;
188    format::finalize_read_input(input_file, None, typed_df, normalize_strategy)
189}
190
191fn resolve_input_columns(
192    path: &Path,
193    source_options: &config::SourceOptions,
194    declared_columns: &[config::ColumnConfig],
195) -> FloeResult<Vec<String>> {
196    let header = source_options.header.unwrap_or(true);
197    if header {
198        return read_csv_header(path, source_options, Some(0));
199    }
200
201    let input_columns = read_csv_header(path, source_options, Some(1))?;
202    let declared_names = declared_columns
203        .iter()
204        .map(|column| column.name.clone())
205        .collect::<Vec<_>>();
206    Ok(headless_columns(&declared_names, input_columns.len()))
207}
208
/// Names the `input_count` columns of a headerless file.
///
/// Declared names are used positionally, up to `input_count`. Any columns past
/// the declared ones are named `extra_column_<n>`, where `n` is the column's
/// 1-based position.
fn headless_columns(declared_names: &[String], input_count: usize) -> Vec<String> {
    let reused = declared_names.len().min(input_count);
    let mut names = Vec::with_capacity(input_count);
    names.extend_from_slice(&declared_names[..reused]);
    // This range is empty when there are at least as many declared names as columns.
    names.extend(
        (declared_names.len()..input_count).map(|index| format!("extra_column_{}", index + 1)),
    );
    names
}
222
223fn build_raw_schema(columns: &[String]) -> Schema {
224    let mut schema = Schema::with_capacity(columns.len());
225    for name in columns {
226        schema.insert(name.as_str().into(), DataType::String);
227    }
228    schema
229}
230
231fn read_csv_lazy(
232    input_path: &Path,
233    source_options: &config::SourceOptions,
234    schema: &Schema,
235    ignore_errors: bool,
236    projection: Option<&[String]>,
237) -> FloeResult<DataFrame> {
238    let header = source_options.header.unwrap_or(true);
239    let parse_options = source_options.to_csv_parse_options()?;
240    let path_str = input_path.to_string_lossy();
241    let mut reader = LazyCsvReader::new(PlPath::new(path_str.as_ref()))
242        .with_has_header(header)
243        .with_schema(Some(Arc::new(schema.clone())))
244        .with_ignore_errors(ignore_errors)
245        .map_parse_options(|_| parse_options.clone());
246
247    if let Some(chunk_size) = csv_chunk_size_for_path(input_path) {
248        reader = reader.with_chunk_size(chunk_size);
249    }
250
251    let mut lf = reader.finish().map_err(|err| {
252        Box::new(IoError(format!(
253            "failed to scan csv at {}: {err}",
254            input_path.display()
255        ))) as Box<dyn std::error::Error + Send + Sync>
256    })?;
257
258    if let Some(columns) = projection {
259        let exprs = columns.iter().map(col).collect::<Vec<_>>();
260        lf = lf.select(exprs);
261    }
262
263    lf.collect().map_err(|err| {
264        Box::new(IoError(format!("csv read failed: {err}")))
265            as Box<dyn std::error::Error + Send + Sync>
266    })
267}
268
269fn projected_columns(
270    input_columns: &[String],
271    declared_columns: &[config::ColumnConfig],
272) -> Option<Vec<String>> {
273    let declared = declared_columns
274        .iter()
275        .map(|column| column.name.as_str())
276        .collect::<std::collections::HashSet<_>>();
277    let projected = input_columns
278        .iter()
279        .filter(|name| declared.contains(name.as_str()))
280        .cloned()
281        .collect::<Vec<_>>();
282    if projected.is_empty() || projected.len() == input_columns.len() {
283        None
284    } else {
285        Some(projected)
286    }
287}
288
/// Picks a reader chunk size for large files.
///
/// Returns `Some(50_000)` when the file at `path` is at least 64 MiB. Returns
/// `None` (no override) for smaller files or when the file's metadata cannot be
/// read, e.g. because it does not exist.
fn csv_chunk_size_for_path(path: &Path) -> Option<usize> {
    const LARGE_FILE_THRESHOLD_BYTES: u64 = 64 * 1024 * 1024;
    const LARGE_FILE_CHUNK_SIZE: usize = 50_000;

    match std::fs::metadata(path) {
        Ok(meta) if meta.len() >= LARGE_FILE_THRESHOLD_BYTES => Some(LARGE_FILE_CHUNK_SIZE),
        _ => None,
    }
}