1use std::path::Path;
2
3use polars::prelude::{DataFrame, ParquetCompression, ParquetWriter};
4
5use crate::errors::IoError;
6use crate::io::format::{AcceptedSinkAdapter, AcceptedWriteOutput};
7use crate::io::storage::Target;
8use crate::{config, io, ConfigError, FloeResult};
9
10use super::{metrics, strategy};
11
/// Marker type carrying the Parquet implementation of [`AcceptedSinkAdapter`].
struct ParquetAcceptedAdapter;

/// Shared zero-sized adapter instance handed out by [`parquet_accepted_adapter`].
static PARQUET_ACCEPTED_ADAPTER: ParquetAcceptedAdapter = ParquetAcceptedAdapter;
/// Fallback per-part-file size cap (256 MiB) used when the sink target's
/// options do not set `max_size_per_file`.
const DEFAULT_MAX_SIZE_PER_FILE_BYTES: u64 = 256 * 1024 * 1024;
16
/// Resolved runtime options governing how accepted Parquet output is chunked.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParquetWriteRuntimeOptions {
    // Upper bound on the estimated size of a single part file, in bytes.
    pub max_size_per_file_bytes: u64,
    // Threshold fed to `metrics::summarize_written_file_sizes`; derived from
    // the per-file cap via `metrics::default_small_file_threshold_bytes`.
    // Presumably files under this size are flagged as "small" — see `metrics`.
    pub small_file_threshold_bytes: u64,
}
22
/// Returns the process-wide Parquet [`AcceptedSinkAdapter`] instance.
pub(crate) fn parquet_accepted_adapter() -> &'static dyn AcceptedSinkAdapter {
    &PARQUET_ACCEPTED_ADAPTER
}
26
27pub fn parquet_write_runtime_options(target: &config::SinkTarget) -> ParquetWriteRuntimeOptions {
28 let max_size_per_file_bytes = target
29 .options
30 .as_ref()
31 .and_then(|options| options.max_size_per_file)
32 .unwrap_or(DEFAULT_MAX_SIZE_PER_FILE_BYTES);
33 ParquetWriteRuntimeOptions {
34 max_size_per_file_bytes,
35 small_file_threshold_bytes: metrics::default_small_file_threshold_bytes(Some(
36 max_size_per_file_bytes,
37 )),
38 }
39}
40
41pub fn write_parquet_to_path(
42 df: &mut DataFrame,
43 output_path: &Path,
44 options: Option<&config::SinkOptions>,
45) -> FloeResult<()> {
46 if let Some(parent) = output_path.parent() {
47 std::fs::create_dir_all(parent)?;
48 }
49 let file = std::fs::File::create(output_path)?;
50 let mut writer = ParquetWriter::new(file);
51 if let Some(options) = options {
52 if let Some(compression) = &options.compression {
53 writer = writer.with_compression(parse_parquet_compression(compression)?);
54 }
55 if let Some(row_group_size) = options.row_group_size {
56 let row_group_size = usize::try_from(row_group_size).map_err(|_| {
57 Box::new(ConfigError(format!(
58 "parquet row_group_size is too large: {row_group_size}"
59 )))
60 })?;
61 writer = writer.with_row_group_size(Some(row_group_size));
62 }
63 }
64 writer
65 .finish(df)
66 .map_err(|err| Box::new(IoError(format!("parquet write failed: {err}"))))?;
67 Ok(())
68}
69
impl AcceptedSinkAdapter for ParquetAcceptedAdapter {
    /// Writes the accepted rows of `df` to the sink as one or more Parquet
    /// part files.
    ///
    /// A non-empty frame is split into chunks whose estimated size stays
    /// under the configured per-file cap; an empty frame still emits exactly
    /// one part file. After each write, the file's on-disk size is stat'ed
    /// best-effort and folded into the returned write metrics.
    fn write_accepted(
        &self,
        target: &Target,
        df: &mut DataFrame,
        mode: config::WriteMode,
        _output_stem: &str,
        temp_dir: Option<&Path>,
        cloud: &mut io::storage::CloudClient,
        resolver: &config::StorageResolver,
        _catalogs: &config::CatalogResolver,
        entity: &config::EntityConfig,
    ) -> FloeResult<AcceptedWriteOutput> {
        let mut ctx = strategy::WriteContext {
            target,
            cloud,
            resolver,
            entity,
        };
        let spec = strategy::accepted_parquet_spec();
        // Part-file naming is delegated to the write-mode-specific strategy.
        let mut part_allocator = strategy::strategy_for(mode).part_allocator(&mut ctx, spec)?;
        let options = entity.sink.accepted.options.as_ref();
        let runtime_options = parquet_write_runtime_options(&entity.sink.accepted);
        let max_size_per_file = runtime_options.max_size_per_file_bytes;
        let mut parts_written = 0;
        let mut part_files = Vec::new();
        let mut file_sizes = Vec::new();
        let total_rows = df.height();
        if total_rows > 0 {
            // Derive a per-chunk row budget from the frame's in-memory size
            // estimate: rows-per-file ≈ byte cap / average row size.
            let estimated_size = df.estimated_size() as u64;
            let avg_row_size = if estimated_size == 0 {
                1 // guard against division by zero below
            } else {
                estimated_size.div_ceil(total_rows as u64)
            };
            let max_rows = std::cmp::max(1, max_size_per_file / avg_row_size) as usize;
            let mut offset = 0usize;
            while offset < total_rows {
                let len = std::cmp::min(max_rows, total_rows - offset);
                let mut chunk = df.slice(offset as i64, len);
                let part_filename = part_allocator.allocate_next();
                io::storage::output::write_output(
                    target,
                    io::storage::OutputPlacement::Directory,
                    &part_filename,
                    temp_dir,
                    cloud,
                    resolver,
                    entity,
                    |path| write_parquet_to_path(&mut chunk, path, options),
                )?;
                // Best-effort size stat: a miss (e.g. cloud target without a
                // temp dir) just leaves this part out of the size metrics.
                if let Some(size) = stat_written_output_file_size(target, temp_dir, &part_filename)
                {
                    file_sizes.push(size);
                }
                // Only the first 50 part names are echoed back to the caller;
                // presumably this bounds the report size — TODO confirm.
                if part_files.len() < 50 {
                    part_files.push(part_filename);
                }
                parts_written += 1;
                offset += len;
            }
        } else {
            // Empty frame: still materialize a single part file.
            let part_filename = part_allocator.allocate_next();
            io::storage::output::write_output(
                target,
                io::storage::OutputPlacement::Directory,
                &part_filename,
                temp_dir,
                cloud,
                resolver,
                entity,
                |path| write_parquet_to_path(df, path, options),
            )?;
            if let Some(size) = stat_written_output_file_size(target, temp_dir, &part_filename) {
                file_sizes.push(size);
            }
            parts_written = 1;
            part_files.push(part_filename);
        }

        let metrics = metrics::summarize_written_file_sizes(
            &file_sizes,
            parts_written,
            runtime_options.small_file_threshold_bytes,
        );

        // Plain Parquet output carries no table/catalog metadata and this
        // adapter never applies schema evolution, so those fields are
        // reported as absent/disabled.
        Ok(AcceptedWriteOutput {
            files_written: Some(parts_written),
            parts_written,
            part_files,
            table_version: None,
            snapshot_id: None,
            table_root_uri: None,
            iceberg_catalog_name: None,
            iceberg_database: None,
            iceberg_namespace: None,
            iceberg_table: None,
            metrics,
            merge: None,
            schema_evolution: io::format::AcceptedSchemaEvolution {
                enabled: false,
                // The configured evolution mode is echoed for reporting even
                // though it is not applied here.
                mode: entity
                    .schema
                    .resolved_schema_evolution()
                    .mode
                    .as_str()
                    .to_string(),
                applied: false,
                added_columns: Vec::new(),
                incompatible_changes_detected: false,
            },
            perf: None,
        })
    }
}
185
186fn parse_parquet_compression(value: &str) -> FloeResult<ParquetCompression> {
187 match value {
188 "snappy" => Ok(ParquetCompression::Snappy),
189 "gzip" => Ok(ParquetCompression::Gzip(None)),
190 "zstd" => Ok(ParquetCompression::Zstd(None)),
191 "uncompressed" => Ok(ParquetCompression::Uncompressed),
192 _ => Err(Box::new(ConfigError(format!(
193 "unsupported parquet compression: {value}"
194 )))),
195 }
196}
197
198fn stat_written_output_file_size(
199 target: &Target,
200 temp_dir: Option<&Path>,
201 filename: &str,
202) -> Option<u64> {
203 let path = match target {
204 Target::Local { base_path, .. } => {
205 crate::io::storage::paths::resolve_output_dir_path(base_path, filename)
206 }
207 Target::S3 { .. } | Target::Gcs { .. } | Target::Adls { .. } => temp_dir?.join(filename),
208 };
209 std::fs::metadata(path).ok().map(|meta| meta.len())
210}