Skip to main content

floe_core/io/write/
csv.rs

1use std::path::{Path, PathBuf};
2
3use polars::prelude::{CsvWriter, DataFrame, SerWriter};
4
5use crate::errors::IoError;
6use crate::io::format::{RejectedSinkAdapter, RejectedWriteRequest};
7use crate::{io, FloeResult};
8
9use super::strategy;
10
11struct CsvRejectedAdapter;
12
13static CSV_REJECTED_ADAPTER: CsvRejectedAdapter = CsvRejectedAdapter;
14
15pub(crate) fn csv_rejected_adapter() -> &'static dyn RejectedSinkAdapter {
16    &CSV_REJECTED_ADAPTER
17}
18
19pub fn write_rejected_csv(df: &mut DataFrame, output_path: &Path) -> FloeResult<PathBuf> {
20    if let Some(parent) = output_path.parent() {
21        std::fs::create_dir_all(parent)?;
22    }
23    let file = std::fs::File::create(output_path)?;
24    CsvWriter::new(file)
25        .finish(df)
26        .map_err(|err| Box::new(IoError(format!("rejected csv write failed: {err}"))))?;
27    Ok(output_path.to_path_buf())
28}
29
30impl RejectedSinkAdapter for CsvRejectedAdapter {
31    fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String> {
32        let RejectedWriteRequest {
33            target,
34            df,
35            source_stem: _,
36            temp_dir,
37            cloud,
38            resolver,
39            entity,
40            mode,
41        } = request;
42        let mut ctx = strategy::WriteContext {
43            target,
44            cloud,
45            resolver,
46            entity,
47        };
48        let spec = strategy::rejected_csv_spec();
49        let mut part_allocator = strategy::strategy_for(mode).part_allocator(&mut ctx, spec)?;
50        let part_filename = part_allocator.allocate_next();
51        io::storage::output::write_output(
52            target,
53            io::storage::output::OutputPlacement::Directory,
54            &part_filename,
55            temp_dir,
56            cloud,
57            resolver,
58            entity,
59            |path| {
60                write_rejected_csv(df, path)?;
61                Ok(())
62            },
63        )
64    }
65}
66
67// Filename construction is shared via io::storage::paths helpers.