parquet2/write/
file.rs

1use std::io::Write;
2
3use parquet_format_safe::thrift::protocol::TCompactOutputProtocol;
4use parquet_format_safe::RowGroup;
5
6use crate::metadata::ThriftFileMetaData;
7use crate::{
8    error::{Error, Result},
9    metadata::SchemaDescriptor,
10    FOOTER_SIZE, PARQUET_MAGIC,
11};
12
13use super::indexes::{write_column_index, write_offset_index};
14use super::page::PageWriteSpec;
15use super::{row_group::write_row_group, RowGroupIter, WriteOptions};
16
17pub use crate::metadata::KeyValue;
18use crate::write::State;
19
20pub(super) fn start_file<W: Write>(writer: &mut W) -> Result<u64> {
21    writer.write_all(&PARQUET_MAGIC)?;
22    Ok(PARQUET_MAGIC.len() as u64)
23}
24
25pub(super) fn end_file<W: Write>(mut writer: &mut W, metadata: &ThriftFileMetaData) -> Result<u64> {
26    // Write metadata
27    let mut protocol = TCompactOutputProtocol::new(&mut writer);
28    let metadata_len = metadata.write_to_out_protocol(&mut protocol)? as i32;
29
30    // Write footer
31    let metadata_bytes = metadata_len.to_le_bytes();
32    let mut footer_buffer = [0u8; FOOTER_SIZE as usize];
33    (0..4).for_each(|i| {
34        footer_buffer[i] = metadata_bytes[i];
35    });
36
37    (&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?;
38    writer.write_all(&footer_buffer)?;
39    writer.flush()?;
40    Ok(metadata_len as u64 + FOOTER_SIZE)
41}
42
43/// An interface to write a parquet file.
44/// Use `start` to write the header, `write` to write a row group,
45/// and `end` to write the footer.
46pub struct FileWriter<W: Write> {
47    writer: W,
48    schema: SchemaDescriptor,
49    options: WriteOptions,
50    created_by: Option<String>,
51
52    offset: u64,
53    row_groups: Vec<RowGroup>,
54    page_specs: Vec<Vec<Vec<PageWriteSpec>>>,
55    /// Used to store the current state for writing the file
56    state: State,
57    // when the file is written, metadata becomes available
58    metadata: Option<ThriftFileMetaData>,
59}
60
61/// Writes a parquet file containing only the header and footer
62///
63/// This is used to write the metadata as a separate Parquet file, usually when data
64/// is partitioned across multiple files.
65///
66/// Note: Recall that when combining row groups from [`ThriftFileMetaData`], the `file_path` on each
67/// of their column chunks must be updated with their path relative to where they are written to.
68pub fn write_metadata_sidecar<W: Write>(
69    writer: &mut W,
70    metadata: &ThriftFileMetaData,
71) -> Result<u64> {
72    let mut len = start_file(writer)?;
73    len += end_file(writer, metadata)?;
74    Ok(len)
75}
76
77// Accessors
78impl<W: Write> FileWriter<W> {
79    /// The options assigned to the file
80    pub fn options(&self) -> &WriteOptions {
81        &self.options
82    }
83
84    /// The [`SchemaDescriptor`] assigned to this file
85    pub fn schema(&self) -> &SchemaDescriptor {
86        &self.schema
87    }
88
89    /// Returns the [`ThriftFileMetaData`]. This is Some iff the [`Self::end`] has been called.
90    ///
91    /// This is used to write the metadata as a separate Parquet file, usually when data
92    /// is partitioned across multiple files
93    pub fn metadata(&self) -> Option<&ThriftFileMetaData> {
94        self.metadata.as_ref()
95    }
96}
97
98impl<W: Write> FileWriter<W> {
99    /// Returns a new [`FileWriter`].
100    pub fn new(
101        writer: W,
102        schema: SchemaDescriptor,
103        options: WriteOptions,
104        created_by: Option<String>,
105    ) -> Self {
106        Self {
107            writer,
108            schema,
109            options,
110            created_by,
111            offset: 0,
112            row_groups: vec![],
113            page_specs: vec![],
114            state: State::Initialised,
115            metadata: None,
116        }
117    }
118
119    /// Writes the header of the file.
120    ///
121    /// This is automatically called by [`Self::write`] if not called following [`Self::new`].
122    ///
123    /// # Errors
124    /// Returns an error if data has been written to the file.
125    fn start(&mut self) -> Result<()> {
126        if self.offset == 0 {
127            self.offset = start_file(&mut self.writer)?;
128            self.state = State::Started;
129            Ok(())
130        } else {
131            Err(Error::InvalidParameter(
132                "Start cannot be called twice".to_string(),
133            ))
134        }
135    }
136
137    /// Writes a row group to the file.
138    ///
139    /// This call is IO-bounded
140    pub fn write<E>(&mut self, row_group: RowGroupIter<'_, E>) -> Result<()>
141    where
142        Error: From<E>,
143        E: std::error::Error,
144    {
145        if self.offset == 0 {
146            self.start()?;
147        }
148        let ordinal = self.row_groups.len();
149        let (group, specs, size) = write_row_group(
150            &mut self.writer,
151            self.offset,
152            self.schema.columns(),
153            row_group,
154            ordinal,
155        )?;
156        self.offset += size;
157        self.row_groups.push(group);
158        self.page_specs.push(specs);
159        Ok(())
160    }
161
162    /// Writes the footer of the parquet file. Returns the total size of the file and the
163    /// underlying writer.
164    pub fn end(&mut self, key_value_metadata: Option<Vec<KeyValue>>) -> Result<u64> {
165        if self.offset == 0 {
166            self.start()?;
167        }
168
169        if self.state != State::Started {
170            return Err(Error::InvalidParameter(
171                "End cannot be called twice".to_string(),
172            ));
173        }
174        // compute file stats
175        let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum();
176
177        if self.options.write_statistics {
178            // write column indexes (require page statistics)
179            self.row_groups
180                .iter_mut()
181                .zip(self.page_specs.iter())
182                .try_for_each(|(group, pages)| {
183                    group.columns.iter_mut().zip(pages.iter()).try_for_each(
184                        |(column, pages)| {
185                            let offset = self.offset;
186                            column.column_index_offset = Some(offset as i64);
187                            self.offset += write_column_index(&mut self.writer, pages)?;
188                            let length = self.offset - offset;
189                            column.column_index_length = Some(length as i32);
190                            Result::Ok(())
191                        },
192                    )?;
193                    Result::Ok(())
194                })?;
195        };
196
197        // write offset index
198        self.row_groups
199            .iter_mut()
200            .zip(self.page_specs.iter())
201            .try_for_each(|(group, pages)| {
202                group
203                    .columns
204                    .iter_mut()
205                    .zip(pages.iter())
206                    .try_for_each(|(column, pages)| {
207                        let offset = self.offset;
208                        column.offset_index_offset = Some(offset as i64);
209                        self.offset += write_offset_index(&mut self.writer, pages)?;
210                        column.offset_index_length = Some((self.offset - offset) as i32);
211                        Result::Ok(())
212                    })?;
213                Result::Ok(())
214            })?;
215
216        let metadata = ThriftFileMetaData::new(
217            self.options.version.into(),
218            self.schema.clone().into_thrift(),
219            num_rows,
220            self.row_groups.clone(),
221            key_value_metadata,
222            self.created_by.clone(),
223            None,
224            None,
225            None,
226        );
227
228        let len = end_file(&mut self.writer, &metadata)?;
229        self.state = State::Finished;
230        self.metadata = Some(metadata);
231        Ok(self.offset + len)
232    }
233
234    /// Returns the underlying writer.
235    pub fn into_inner(self) -> W {
236        self.writer
237    }
238
239    /// Returns the underlying writer and [`ThriftFileMetaData`]
240    /// # Panics
241    /// This function panics if [`Self::end`] has not yet been called
242    pub fn into_inner_and_metadata(self) -> (W, ThriftFileMetaData) {
243        (self.writer, self.metadata.expect("File to have ended"))
244    }
245}
246
#[cfg(test)]
mod tests {
    use std::{fs::File, io::Cursor};

    use super::*;

    use crate::error::Result;
    use crate::read::read_metadata;
    use crate::tests::get_path;

    /// A file made of a header and a footer only can be read back.
    #[test]
    fn empty_file() -> Result<()> {
        // borrow the metadata of an existing file
        let mut path = get_path();
        path.push("alltypes_plain.parquet");
        let mut source = File::open(path).unwrap();
        let mut metadata = read_metadata(&mut source)?;

        // drop every row group so that the metadata describes an empty file
        metadata.row_groups.clear();
        metadata.num_rows = 0;

        // write the header immediately followed by the footer
        let mut sink = Cursor::new(vec![]);
        start_file(&mut sink)?;
        end_file(&mut sink, &metadata.into_thrift())?;
        let bytes = sink.into_inner();

        // what was written must be readable
        let roundtrip = read_metadata(&mut Cursor::new(bytes));
        assert!(roundtrip.is_ok());

        Ok(())
    }
}