Skip to main content

floe_core/io/write/
csv.rs

1use std::path::{Path, PathBuf};
2
3use polars::prelude::{CsvWriter, DataFrame, SerWriter};
4
5use crate::errors::IoError;
6use crate::io::format::RejectedSinkAdapter;
7use crate::io::storage::Target;
8use crate::{config, io, FloeResult};
9
/// Unit type whose `RejectedSinkAdapter` implementation writes its output as CSV
/// (it delegates the actual file write to `write_rejected_csv`).
struct CsvRejectedAdapter;
11
/// The single shared adapter instance; `csv_rejected_adapter` hands out a
/// `'static` reference to it.
static CSV_REJECTED_ADAPTER: CsvRejectedAdapter = CsvRejectedAdapter;
13
14pub(crate) fn csv_rejected_adapter() -> &'static dyn RejectedSinkAdapter {
15    &CSV_REJECTED_ADAPTER
16}
17
18pub fn write_rejected_csv(df: &mut DataFrame, output_path: &Path) -> FloeResult<PathBuf> {
19    if let Some(parent) = output_path.parent() {
20        std::fs::create_dir_all(parent)?;
21    }
22    let file = std::fs::File::create(output_path)?;
23    CsvWriter::new(file)
24        .finish(df)
25        .map_err(|err| Box::new(IoError(format!("rejected csv write failed: {err}"))))?;
26    Ok(output_path.to_path_buf())
27}
28
29impl RejectedSinkAdapter for CsvRejectedAdapter {
30    fn write_rejected(
31        &self,
32        target: &Target,
33        df: &mut DataFrame,
34        source_stem: &str,
35        temp_dir: Option<&Path>,
36        cloud: &mut io::storage::CloudClient,
37        resolver: &config::StorageResolver,
38        entity: &config::EntityConfig,
39    ) -> FloeResult<String> {
40        let filename = io::storage::paths::build_output_filename(source_stem, "_rejected", "csv");
41        io::storage::output::write_output(
42            target,
43            io::storage::output::OutputPlacement::Output,
44            &filename,
45            temp_dir,
46            cloud,
47            resolver,
48            entity,
49            |path| {
50                write_rejected_csv(df, path)?;
51                Ok(())
52            },
53        )
54    }
55}
56
57// Filename construction is shared via io::storage::paths helpers.