1use std::io::Write;
2
3use parquet_format_safe::thrift::protocol::TCompactOutputProtocol;
4use parquet_format_safe::RowGroup;
5
6use crate::metadata::ThriftFileMetaData;
7use crate::{
8 error::{Error, Result},
9 metadata::SchemaDescriptor,
10 FOOTER_SIZE, PARQUET_MAGIC,
11};
12
13use super::indexes::{write_column_index, write_offset_index};
14use super::page::PageWriteSpec;
15use super::{row_group::write_row_group, RowGroupIter, WriteOptions};
16
17pub use crate::metadata::KeyValue;
18use crate::write::State;
19
20pub(super) fn start_file<W: Write>(writer: &mut W) -> Result<u64> {
21 writer.write_all(&PARQUET_MAGIC)?;
22 Ok(PARQUET_MAGIC.len() as u64)
23}
24
25pub(super) fn end_file<W: Write>(mut writer: &mut W, metadata: &ThriftFileMetaData) -> Result<u64> {
26 let mut protocol = TCompactOutputProtocol::new(&mut writer);
28 let metadata_len = metadata.write_to_out_protocol(&mut protocol)? as i32;
29
30 let metadata_bytes = metadata_len.to_le_bytes();
32 let mut footer_buffer = [0u8; FOOTER_SIZE as usize];
33 (0..4).for_each(|i| {
34 footer_buffer[i] = metadata_bytes[i];
35 });
36
37 (&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?;
38 writer.write_all(&footer_buffer)?;
39 writer.flush()?;
40 Ok(metadata_len as u64 + FOOTER_SIZE)
41}
42
43pub struct FileWriter<W: Write> {
47 writer: W,
48 schema: SchemaDescriptor,
49 options: WriteOptions,
50 created_by: Option<String>,
51
52 offset: u64,
53 row_groups: Vec<RowGroup>,
54 page_specs: Vec<Vec<Vec<PageWriteSpec>>>,
55 state: State,
57 metadata: Option<ThriftFileMetaData>,
59}
60
61pub fn write_metadata_sidecar<W: Write>(
69 writer: &mut W,
70 metadata: &ThriftFileMetaData,
71) -> Result<u64> {
72 let mut len = start_file(writer)?;
73 len += end_file(writer, metadata)?;
74 Ok(len)
75}
76
77impl<W: Write> FileWriter<W> {
79 pub fn options(&self) -> &WriteOptions {
81 &self.options
82 }
83
84 pub fn schema(&self) -> &SchemaDescriptor {
86 &self.schema
87 }
88
89 pub fn metadata(&self) -> Option<&ThriftFileMetaData> {
94 self.metadata.as_ref()
95 }
96}
97
98impl<W: Write> FileWriter<W> {
99 pub fn new(
101 writer: W,
102 schema: SchemaDescriptor,
103 options: WriteOptions,
104 created_by: Option<String>,
105 ) -> Self {
106 Self {
107 writer,
108 schema,
109 options,
110 created_by,
111 offset: 0,
112 row_groups: vec![],
113 page_specs: vec![],
114 state: State::Initialised,
115 metadata: None,
116 }
117 }
118
119 fn start(&mut self) -> Result<()> {
126 if self.offset == 0 {
127 self.offset = start_file(&mut self.writer)?;
128 self.state = State::Started;
129 Ok(())
130 } else {
131 Err(Error::InvalidParameter(
132 "Start cannot be called twice".to_string(),
133 ))
134 }
135 }
136
137 pub fn write<E>(&mut self, row_group: RowGroupIter<'_, E>) -> Result<()>
141 where
142 Error: From<E>,
143 E: std::error::Error,
144 {
145 if self.offset == 0 {
146 self.start()?;
147 }
148 let ordinal = self.row_groups.len();
149 let (group, specs, size) = write_row_group(
150 &mut self.writer,
151 self.offset,
152 self.schema.columns(),
153 row_group,
154 ordinal,
155 )?;
156 self.offset += size;
157 self.row_groups.push(group);
158 self.page_specs.push(specs);
159 Ok(())
160 }
161
162 pub fn end(&mut self, key_value_metadata: Option<Vec<KeyValue>>) -> Result<u64> {
165 if self.offset == 0 {
166 self.start()?;
167 }
168
169 if self.state != State::Started {
170 return Err(Error::InvalidParameter(
171 "End cannot be called twice".to_string(),
172 ));
173 }
174 let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum();
176
177 if self.options.write_statistics {
178 self.row_groups
180 .iter_mut()
181 .zip(self.page_specs.iter())
182 .try_for_each(|(group, pages)| {
183 group.columns.iter_mut().zip(pages.iter()).try_for_each(
184 |(column, pages)| {
185 let offset = self.offset;
186 column.column_index_offset = Some(offset as i64);
187 self.offset += write_column_index(&mut self.writer, pages)?;
188 let length = self.offset - offset;
189 column.column_index_length = Some(length as i32);
190 Result::Ok(())
191 },
192 )?;
193 Result::Ok(())
194 })?;
195 };
196
197 self.row_groups
199 .iter_mut()
200 .zip(self.page_specs.iter())
201 .try_for_each(|(group, pages)| {
202 group
203 .columns
204 .iter_mut()
205 .zip(pages.iter())
206 .try_for_each(|(column, pages)| {
207 let offset = self.offset;
208 column.offset_index_offset = Some(offset as i64);
209 self.offset += write_offset_index(&mut self.writer, pages)?;
210 column.offset_index_length = Some((self.offset - offset) as i32);
211 Result::Ok(())
212 })?;
213 Result::Ok(())
214 })?;
215
216 let metadata = ThriftFileMetaData::new(
217 self.options.version.into(),
218 self.schema.clone().into_thrift(),
219 num_rows,
220 self.row_groups.clone(),
221 key_value_metadata,
222 self.created_by.clone(),
223 None,
224 None,
225 None,
226 );
227
228 let len = end_file(&mut self.writer, &metadata)?;
229 self.state = State::Finished;
230 self.metadata = Some(metadata);
231 Ok(self.offset + len)
232 }
233
234 pub fn into_inner(self) -> W {
236 self.writer
237 }
238
239 pub fn into_inner_and_metadata(self) -> (W, ThriftFileMetaData) {
243 (self.writer, self.metadata.expect("File to have ended"))
244 }
245}
246
#[cfg(test)]
mod tests {
    use std::{fs::File, io::Cursor};

    use super::*;

    use crate::error::Result;
    use crate::read::read_metadata;
    use crate::tests::get_path;

    /// Writes a file made only of a header and a footer without row groups, and checks
    /// that its metadata can be read back.
    #[test]
    fn empty_file() -> Result<()> {
        let path = {
            let mut dir = get_path();
            dir.push("alltypes_plain.parquet");
            dir
        };
        let mut source = File::open(path).unwrap();

        // Start from the metadata of an existing file and drop all of its row groups.
        let mut metadata = read_metadata(&mut source)?;
        metadata.row_groups = vec![];
        metadata.num_rows = 0;

        // Header immediately followed by the footer.
        let mut sink = Cursor::new(vec![]);
        start_file(&mut sink)?;
        end_file(&mut sink, &metadata.into_thrift())?;

        // The bytes just written must be readable as file metadata.
        let mut written = Cursor::new(sink.into_inner());
        let outcome = read_metadata(&mut written);
        assert!(outcome.is_ok());

        Ok(())
    }
}