floe_core/io/write/
csv.rs1use std::path::{Path, PathBuf};
2
3use polars::prelude::{CsvWriter, DataFrame, SerWriter};
4
5use crate::errors::IoError;
6use crate::io::format::RejectedSinkAdapter;
7use crate::io::storage::Target;
8use crate::{config, io, FloeResult};
9
/// Field-less adapter type that implements [`RejectedSinkAdapter`] for CSV
/// output.
///
/// The type carries no data; all inputs are supplied per call through the
/// trait method's arguments.
struct CsvRejectedAdapter;

/// Single shared instance of [`CsvRejectedAdapter`], handed out by reference
/// so callers never need to construct the adapter themselves.
static CSV_REJECTED_ADAPTER: CsvRejectedAdapter = CsvRejectedAdapter;
13
14pub(crate) fn csv_rejected_adapter() -> &'static dyn RejectedSinkAdapter {
15 &CSV_REJECTED_ADAPTER
16}
17
18pub fn write_rejected_csv(df: &mut DataFrame, output_path: &Path) -> FloeResult<PathBuf> {
19 if let Some(parent) = output_path.parent() {
20 std::fs::create_dir_all(parent)?;
21 }
22 let file = std::fs::File::create(output_path)?;
23 CsvWriter::new(file)
24 .finish(df)
25 .map_err(|err| Box::new(IoError(format!("rejected csv write failed: {err}"))))?;
26 Ok(output_path.to_path_buf())
27}
28
29impl RejectedSinkAdapter for CsvRejectedAdapter {
30 fn write_rejected(
31 &self,
32 target: &Target,
33 df: &mut DataFrame,
34 source_stem: &str,
35 temp_dir: Option<&Path>,
36 cloud: &mut io::storage::CloudClient,
37 resolver: &config::StorageResolver,
38 entity: &config::EntityConfig,
39 ) -> FloeResult<String> {
40 let filename = io::storage::paths::build_output_filename(source_stem, "_rejected", "csv");
41 io::storage::output::write_output(
42 target,
43 io::storage::output::OutputPlacement::Output,
44 &filename,
45 temp_dir,
46 cloud,
47 resolver,
48 entity,
49 |path| {
50 write_rejected_csv(df, path)?;
51 Ok(())
52 },
53 )
54 }
55}
56
57