floe_core/io/write/parquet.rs
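//! Parquet writer for the accepted sink: splits a DataFrame into
//! size-bounded part files and reports write metrics.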

use std::path::Path;

use polars::prelude::{DataFrame, ParquetCompression, ParquetWriter};

use crate::errors::IoError;
use crate::io::format::{AcceptedSinkAdapter, AcceptedWriteOutput};
use crate::io::storage::Target;
use crate::{config, io, ConfigError, FloeResult};

use super::{metrics, strategy};

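/// Stateless sink adapter that writes accepted data as Parquet part files.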
struct ParquetAcceptedAdapter;

static PARQUET_ACCEPTED_ADAPTER: ParquetAcceptedAdapter = ParquetAcceptedAdapter;
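/// Default upper bound on the size of a single Parquet part file (256 MiB).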
const DEFAULT_MAX_SIZE_PER_FILE_BYTES: u64 = 256 * 1024 * 1024;

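/// Runtime Parquet write options resolved from the sink target configuration.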
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParquetWriteRuntimeOptions {
    pub max_size_per_file_bytes: u64,
    pub small_file_threshold_bytes: u64,
}

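/// Returns the shared adapter used to write accepted output as Parquet.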
pub(crate) fn parquet_accepted_adapter() -> &'static dyn AcceptedSinkAdapter {
    &PARQUET_ACCEPTED_ADAPTER
}

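/// Resolves runtime write options for a sink target, falling back to the
/// 256 MiB default when `max_size_per_file` is not configured and deriving
/// the small-file threshold from the chosen limit.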
pub fn parquet_write_runtime_options(target: &config::SinkTarget) -> ParquetWriteRuntimeOptions {
    let max_size_per_file_bytes = target
        .options
        .as_ref()
        .and_then(|options| options.max_size_per_file)
        .unwrap_or(DEFAULT_MAX_SIZE_PER_FILE_BYTES);
    ParquetWriteRuntimeOptions {
        max_size_per_file_bytes,
        small_file_threshold_bytes: metrics::default_small_file_threshold_bytes(Some(
            max_size_per_file_bytes,
        )),
    }
}

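/// Writes a DataFrame to a single Parquet file at `output_path`, creating
/// parent directories as needed and applying any configured compression and
/// row-group size.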
pub fn write_parquet_to_path(
    df: &mut DataFrame,
    output_path: &Path,
    options: Option<&config::SinkOptions>,
) -> FloeResult<()> {
    if let Some(parent) = output_path.parent() {
        std::fs::create_dir_all(parent)?;
    }
    let file = std::fs::File::create(output_path)?;
    let mut writer = ParquetWriter::new(file);
    if let Some(options) = options {
        if let Some(compression) = &options.compression {
            writer = writer.with_compression(parse_parquet_compression(compression)?);
        }
        if let Some(row_group_size) = options.row_group_size {
            let row_group_size = usize::try_from(row_group_size).map_err(|_| {
                Box::new(ConfigError(format!(
                    "parquet row_group_size is too large: {row_group_size}"
                )))
            })?;
            writer = writer.with_row_group_size(Some(row_group_size));
        }
    }
    writer
        .finish(df)
        .map_err(|err| Box::new(IoError(format!("parquet write failed: {err}"))))?;
    Ok(())
}

impl AcceptedSinkAdapter for ParquetAcceptedAdapter {
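    /// Writes the accepted DataFrame as one or more Parquet part files under
    /// the sink target, slicing rows so each part stays near the configured
    /// size limit, and returns the part names together with size metrics.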
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        _output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        _catalogs: &config::CatalogResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput> {
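        // Build the write context and obtain a part-file name allocator for
        // the requested write mode.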
        let mut ctx = strategy::WriteContext {
            target,
            cloud,
            resolver,
            entity,
        };
        let spec = strategy::accepted_parquet_spec();
        let mut part_allocator = strategy::strategy_for(mode).part_allocator(&mut ctx, spec)?;
        let options = entity.sink.accepted.options.as_ref();
        let runtime_options = parquet_write_runtime_options(&entity.sink.accepted);
        let max_size_per_file = runtime_options.max_size_per_file_bytes;
        let mut parts_written = 0;
        let mut part_files = Vec::new();
        let mut file_sizes = Vec::new();
        let total_rows = df.height();
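        // Estimate how many rows fit within the per-file size limit from the
        // DataFrame's in-memory size, then write the rows in slices of at
        // most that many rows, one part file per slice.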
        if total_rows > 0 {
            let estimated_size = df.estimated_size() as u64;
            let avg_row_size = if estimated_size == 0 {
                1
            } else {
                estimated_size.div_ceil(total_rows as u64)
            };
            let max_rows = std::cmp::max(1, max_size_per_file / avg_row_size) as usize;
            let mut offset = 0usize;
            while offset < total_rows {
                let len = std::cmp::min(max_rows, total_rows - offset);
                let mut chunk = df.slice(offset as i64, len);
                let part_filename = part_allocator.allocate_next();
                io::storage::output::write_output(
                    target,
                    io::storage::OutputPlacement::Directory,
                    &part_filename,
                    temp_dir,
                    cloud,
                    resolver,
                    entity,
                    |path| write_parquet_to_path(&mut chunk, path, options),
                )?;
                if let Some(size) = stat_written_output_file_size(target, temp_dir, &part_filename)
                {
                    file_sizes.push(size);
                }
                if part_files.len() < 50 {
                    part_files.push(part_filename);
                }
                parts_written += 1;
                offset += len;
            }
        } else {
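            // No rows: still emit a single empty Parquet part file.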
            let part_filename = part_allocator.allocate_next();
            io::storage::output::write_output(
                target,
                io::storage::OutputPlacement::Directory,
                &part_filename,
                temp_dir,
                cloud,
                resolver,
                entity,
                |path| write_parquet_to_path(df, path, options),
            )?;
            if let Some(size) = stat_written_output_file_size(target, temp_dir, &part_filename) {
                file_sizes.push(size);
            }
            parts_written = 1;
            part_files.push(part_filename);
        }

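        // Summarize the written file sizes against the small-file threshold.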
        let metrics = metrics::summarize_written_file_sizes(
            &file_sizes,
            parts_written,
            runtime_options.small_file_threshold_bytes,
        );

        Ok(AcceptedWriteOutput {
            files_written: Some(parts_written),
            parts_written,
            part_files,
            table_version: None,
            snapshot_id: None,
            table_root_uri: None,
            iceberg_catalog_name: None,
            iceberg_database: None,
            iceberg_namespace: None,
            iceberg_table: None,
            metrics,
            merge: None,
            schema_evolution: io::format::AcceptedSchemaEvolution {
                enabled: false,
                mode: entity
                    .schema
                    .resolved_schema_evolution()
                    .mode
                    .as_str()
                    .to_string(),
                applied: false,
                added_columns: Vec::new(),
                incompatible_changes_detected: false,
            },
            perf: None,
        })
    }
}

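/// Maps a configured compression name to a Polars `ParquetCompression`;
/// unknown names are rejected as a configuration error.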
fn parse_parquet_compression(value: &str) -> FloeResult<ParquetCompression> {
    match value {
        "snappy" => Ok(ParquetCompression::Snappy),
        "gzip" => Ok(ParquetCompression::Gzip(None)),
        "zstd" => Ok(ParquetCompression::Zstd(None)),
        "uncompressed" => Ok(ParquetCompression::Uncompressed),
        _ => Err(Box::new(ConfigError(format!(
            "unsupported parquet compression: {value}"
        )))),
    }
}

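/// Returns the on-disk size of a freshly written part file: resolved under
/// the local base path for local targets, or under the temp directory for
/// cloud targets.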
fn stat_written_output_file_size(
    target: &Target,
    temp_dir: Option<&Path>,
    filename: &str,
) -> Option<u64> {
    let path = match target {
        Target::Local { base_path, .. } => {
            crate::io::storage::paths::resolve_output_dir_path(base_path, filename)
        }
        Target::S3 { .. } | Target::Gcs { .. } | Target::Adls { .. } => temp_dir?.join(filename),
    };
    std::fs::metadata(path).ok().map(|meta| meta.len())
}