floe_core/io/write/
csv.rs1use std::path::{Path, PathBuf};
2
3use polars::prelude::{CsvWriter, DataFrame, SerWriter};
4
5use crate::errors::IoError;
6use crate::io::format::{RejectedSinkAdapter, RejectedWriteRequest};
7use crate::{io, FloeResult};
8
9use super::strategy;
10
11struct CsvRejectedAdapter;
12
13static CSV_REJECTED_ADAPTER: CsvRejectedAdapter = CsvRejectedAdapter;
14
15pub(crate) fn csv_rejected_adapter() -> &'static dyn RejectedSinkAdapter {
16 &CSV_REJECTED_ADAPTER
17}
18
19pub fn write_rejected_csv(df: &mut DataFrame, output_path: &Path) -> FloeResult<PathBuf> {
20 if let Some(parent) = output_path.parent() {
21 std::fs::create_dir_all(parent)?;
22 }
23 let file = std::fs::File::create(output_path)?;
24 CsvWriter::new(file)
25 .finish(df)
26 .map_err(|err| Box::new(IoError(format!("rejected csv write failed: {err}"))))?;
27 Ok(output_path.to_path_buf())
28}
29
30impl RejectedSinkAdapter for CsvRejectedAdapter {
31 fn write_rejected(&self, request: RejectedWriteRequest<'_>) -> FloeResult<String> {
32 let RejectedWriteRequest {
33 target,
34 df,
35 source_stem: _,
36 temp_dir,
37 cloud,
38 resolver,
39 entity,
40 mode,
41 } = request;
42 let mut ctx = strategy::WriteContext {
43 target,
44 cloud,
45 resolver,
46 entity,
47 };
48 let spec = strategy::rejected_csv_spec();
49 let mut part_allocator = strategy::strategy_for(mode).part_allocator(&mut ctx, spec)?;
50 let part_filename = part_allocator.allocate_next();
51 io::storage::output::write_output(
52 target,
53 io::storage::OutputPlacement::Directory,
54 &part_filename,
55 temp_dir,
56 cloud,
57 resolver,
58 entity,
59 |path| {
60 write_rejected_csv(df, path)?;
61 Ok(())
62 },
63 )
64 }
65}
66
67