Skip to main content

floe_core/io/read/
csv.rs

1use std::path::Path;
2use std::sync::Arc;
3
4use polars::prelude::{
5    col, DataFrame, DataType, LazyCsvReader, LazyFileListReader, PlPath, Schema, SerReader,
6};
7
8use crate::errors::{IoError, RunError};
9use crate::io::format::{self, FileReadError, InputAdapter, InputFile, ReadInput};
10use crate::{config, FloeResult};
11
12struct CsvInputAdapter;
13
14static CSV_INPUT_ADAPTER: CsvInputAdapter = CsvInputAdapter;
15
16pub(crate) fn csv_input_adapter() -> &'static dyn InputAdapter {
17    &CSV_INPUT_ADAPTER
18}
19
20#[derive(Debug, Clone)]
21pub struct CsvReadPlan {
22    pub schema: Schema,
23    pub ignore_errors: bool,
24}
25
26impl CsvReadPlan {
27    pub fn strict(schema: Schema) -> Self {
28        Self {
29            schema,
30            ignore_errors: false,
31        }
32    }
33}
34
35pub fn read_csv_file(
36    input_path: &Path,
37    source_options: &config::SourceOptions,
38    plan: &CsvReadPlan,
39) -> FloeResult<DataFrame> {
40    read_csv_lazy(
41        input_path,
42        source_options,
43        &plan.schema,
44        plan.ignore_errors,
45        None,
46    )
47}
48
49pub fn read_csv_header(
50    input_path: &Path,
51    source_options: &config::SourceOptions,
52    n_rows: Option<usize>,
53) -> FloeResult<Vec<String>> {
54    let read_options = source_options
55        .to_csv_read_options(input_path)?
56        .with_n_rows(n_rows);
57    let reader = read_options
58        .try_into_reader_with_file_path(None)
59        .map_err(|err| {
60            Box::new(IoError(format!(
61                "failed to open csv at {}: {err}",
62                input_path.display()
63            ))) as Box<dyn std::error::Error + Send + Sync>
64        })?;
65    let df = reader.finish().map_err(|err| {
66        Box::new(IoError(format!("csv header read failed: {err}")))
67            as Box<dyn std::error::Error + Send + Sync>
68    })?;
69    Ok(df
70        .get_column_names()
71        .iter()
72        .map(|name| name.to_string())
73        .collect())
74}
75
76impl InputAdapter for CsvInputAdapter {
77    fn format(&self) -> &'static str {
78        "csv"
79    }
80
81    fn read_input_columns(
82        &self,
83        entity: &config::EntityConfig,
84        input_file: &InputFile,
85        columns: &[config::ColumnConfig],
86    ) -> Result<Vec<String>, FileReadError> {
87        let default_options = config::SourceOptions::default();
88        let source_options = entity.source.options.as_ref().unwrap_or(&default_options);
89        resolve_input_columns(&input_file.source_local_path, source_options, columns).map_err(
90            |err| FileReadError {
91                rule: "csv_read_error".to_string(),
92                message: err.to_string(),
93            },
94        )
95    }
96
97    fn read_inputs(
98        &self,
99        entity: &config::EntityConfig,
100        files: &[InputFile],
101        columns: &[config::ColumnConfig],
102        normalize_strategy: Option<&str>,
103        collect_raw: bool,
104    ) -> FloeResult<Vec<ReadInput>> {
105        let default_options = config::SourceOptions::default();
106        let source_options = entity.source.options.as_ref().unwrap_or(&default_options);
107        let mut inputs = Vec::with_capacity(files.len());
108        for input_file in files {
109            let path = &input_file.source_local_path;
110            let input_columns = resolve_input_columns(path, source_options, columns)?;
111            let typed_projection = if collect_raw {
112                projected_columns(&input_columns, columns)
113            } else {
114                None
115            };
116            let typed_schema =
117                format::build_typed_schema(input_columns.as_slice(), columns, normalize_strategy)?;
118            let input = if collect_raw {
119                let raw_schema = build_raw_schema(&input_columns);
120                let raw_plan = CsvReadPlan::strict(raw_schema);
121                let raw_df = read_csv_file(path, source_options, &raw_plan)?;
122                let mut typed_df = format::cast_df_to_schema(&raw_df, &typed_schema)?;
123                if let Some(projection) = typed_projection.as_ref() {
124                    typed_df = typed_df.select(projection).map_err(|err| {
125                        Box::new(RunError(format!("failed to project typed columns: {err}")))
126                    })?;
127                }
128                format::finalize_read_input(input_file, Some(raw_df), typed_df, normalize_strategy)?
129            } else {
130                let typed_plan = CsvReadPlan {
131                    schema: typed_schema,
132                    ignore_errors: true,
133                };
134                let typed_df = read_csv_lazy(path, source_options, &typed_plan.schema, true, None)?;
135                format::finalize_read_input(input_file, None, typed_df, normalize_strategy)?
136            };
137            inputs.push(input);
138        }
139        Ok(inputs)
140    }
141}
142
143fn resolve_input_columns(
144    path: &Path,
145    source_options: &config::SourceOptions,
146    declared_columns: &[config::ColumnConfig],
147) -> FloeResult<Vec<String>> {
148    let header = source_options.header.unwrap_or(true);
149    if header {
150        return read_csv_header(path, source_options, Some(0));
151    }
152
153    let input_columns = read_csv_header(path, source_options, Some(1))?;
154    let declared_names = declared_columns
155        .iter()
156        .map(|column| column.name.clone())
157        .collect::<Vec<_>>();
158    Ok(headless_columns(&declared_names, input_columns.len()))
159}
160
/// Builds positional column names for a header-less file with `input_count` columns.
///
/// Position `i` takes `declared_names[i]` when such a name exists. Otherwise it becomes
/// `extra_column_{i + 1}` (1-based). Declared names beyond `input_count` are dropped.
fn headless_columns(declared_names: &[String], input_count: usize) -> Vec<String> {
    (0..input_count)
        .map(|position| match declared_names.get(position) {
            Some(name) => name.clone(),
            None => format!("extra_column_{}", position + 1),
        })
        .collect()
}
174
175fn build_raw_schema(columns: &[String]) -> Schema {
176    let mut schema = Schema::with_capacity(columns.len());
177    for name in columns {
178        schema.insert(name.as_str().into(), DataType::String);
179    }
180    schema
181}
182
183fn read_csv_lazy(
184    input_path: &Path,
185    source_options: &config::SourceOptions,
186    schema: &Schema,
187    ignore_errors: bool,
188    projection: Option<&[String]>,
189) -> FloeResult<DataFrame> {
190    let header = source_options.header.unwrap_or(true);
191    let parse_options = source_options.to_csv_parse_options()?;
192    let path_str = input_path.to_string_lossy();
193    let mut reader = LazyCsvReader::new(PlPath::new(path_str.as_ref()))
194        .with_has_header(header)
195        .with_schema(Some(Arc::new(schema.clone())))
196        .with_ignore_errors(ignore_errors)
197        .map_parse_options(|_| parse_options.clone());
198
199    if let Some(chunk_size) = csv_chunk_size_for_path(input_path) {
200        reader = reader.with_chunk_size(chunk_size);
201    }
202
203    let mut lf = reader.finish().map_err(|err| {
204        Box::new(IoError(format!(
205            "failed to scan csv at {}: {err}",
206            input_path.display()
207        ))) as Box<dyn std::error::Error + Send + Sync>
208    })?;
209
210    if let Some(columns) = projection {
211        let exprs = columns.iter().map(col).collect::<Vec<_>>();
212        lf = lf.select(exprs);
213    }
214
215    lf.collect().map_err(|err| {
216        Box::new(IoError(format!("csv read failed: {err}")))
217            as Box<dyn std::error::Error + Send + Sync>
218    })
219}
220
221fn projected_columns(
222    input_columns: &[String],
223    declared_columns: &[config::ColumnConfig],
224) -> Option<Vec<String>> {
225    let declared = declared_columns
226        .iter()
227        .map(|column| column.name.as_str())
228        .collect::<std::collections::HashSet<_>>();
229    let projected = input_columns
230        .iter()
231        .filter(|name| declared.contains(name.as_str()))
232        .cloned()
233        .collect::<Vec<_>>();
234    if projected.is_empty() || projected.len() == input_columns.len() {
235        None
236    } else {
237        Some(projected)
238    }
239}
240
/// Picks an explicit reader chunk size for large CSV files.
///
/// Returns `Some(50_000)` when the file at `path` is at least 64 MiB. Returns `None` when it
/// is smaller or its metadata cannot be read (e.g. it does not exist).
fn csv_chunk_size_for_path(path: &Path) -> Option<usize> {
    const LARGE_FILE_THRESHOLD_BYTES: u64 = 64 * 1024 * 1024;
    const LARGE_FILE_CHUNK_SIZE: usize = 50_000;

    let file_len = std::fs::metadata(path).ok()?.len();
    (file_len >= LARGE_FILE_THRESHOLD_BYTES).then_some(LARGE_FILE_CHUNK_SIZE)
}