parquet2/write/
stream.rs

1use std::io::Write;
2
3use futures::{AsyncWrite, AsyncWriteExt};
4
5use parquet_format_safe::{thrift::protocol::TCompactOutputStreamProtocol, FileMetaData, RowGroup};
6
7use crate::write::indexes::{write_column_index_async, write_offset_index_async};
8use crate::write::page::PageWriteSpec;
9use crate::write::State;
10use crate::{
11    error::{Error, Result},
12    metadata::{KeyValue, SchemaDescriptor},
13    FOOTER_SIZE, PARQUET_MAGIC,
14};
15
16use super::{row_group::write_row_group_async, RowGroupIter, WriteOptions};
17
18async fn start_file<W: AsyncWrite + Unpin>(writer: &mut W) -> Result<u64> {
19    writer.write_all(&PARQUET_MAGIC).await?;
20    Ok(PARQUET_MAGIC.len() as u64)
21}
22
23async fn end_file<W: AsyncWrite + Unpin + Send>(
24    mut writer: &mut W,
25    metadata: FileMetaData,
26) -> Result<u64> {
27    // Write file metadata
28    let mut protocol = TCompactOutputStreamProtocol::new(&mut writer);
29    let metadata_len = metadata.write_to_out_stream_protocol(&mut protocol).await? as i32;
30
31    // Write footer
32    let metadata_bytes = metadata_len.to_le_bytes();
33    let mut footer_buffer = [0u8; FOOTER_SIZE as usize];
34    (0..4).for_each(|i| {
35        footer_buffer[i] = metadata_bytes[i];
36    });
37
38    (&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?;
39    writer.write_all(&footer_buffer).await?;
40    writer.flush().await?;
41    Ok(metadata_len as u64 + FOOTER_SIZE)
42}
43
/// An interface to write a parquet file asynchronously.
///
/// Use [`FileStreamer::write`] to write each row group and [`FileStreamer::end`] to
/// write the footer. The file header is written automatically by the first call to
/// either of them, so there is no separate public "start" step.
pub struct FileStreamer<W: AsyncWrite + Unpin + Send> {
    /// Destination that receives the bytes of the file.
    writer: W,
    /// Schema of the file; its columns drive how each row group is written.
    schema: SchemaDescriptor,
    /// Options assigned at construction (e.g. whether statistics are written).
    options: WriteOptions,
    /// Optional `created_by` string stored in the file metadata.
    created_by: Option<String>,

    /// Number of bytes written so far; `0` means the header has not been written yet.
    offset: u64,
    /// Thrift metadata of every row group written so far, in write order.
    row_groups: Vec<RowGroup>,
    /// Page write specs collected per row group, then per column, then per page.
    /// They are consumed when the column and offset indexes are written.
    page_specs: Vec<Vec<Vec<PageWriteSpec>>>,
    /// Used to store the current state for writing the file
    state: State,
}
59
60// Accessors
61impl<W: AsyncWrite + Unpin + Send> FileStreamer<W> {
62    /// The options assigned to the file
63    pub fn options(&self) -> &WriteOptions {
64        &self.options
65    }
66
67    /// The [`SchemaDescriptor`] assigned to this file
68    pub fn schema(&self) -> &SchemaDescriptor {
69        &self.schema
70    }
71}
72
73impl<W: AsyncWrite + Unpin + Send> FileStreamer<W> {
74    /// Returns a new [`FileStreamer`].
75    pub fn new(
76        writer: W,
77        schema: SchemaDescriptor,
78        options: WriteOptions,
79        created_by: Option<String>,
80    ) -> Self {
81        Self {
82            writer,
83            schema,
84            options,
85            created_by,
86            offset: 0,
87            row_groups: vec![],
88            page_specs: vec![],
89            state: State::Initialised,
90        }
91    }
92
93    /// Writes the header of the file.
94    ///
95    /// This is automatically called by [`Self::write`] if not called following [`Self::new`].
96    ///
97    /// # Errors
98    /// Returns an error if data has been written to the file.
99    async fn start(&mut self) -> Result<()> {
100        if self.offset == 0 {
101            self.offset = start_file(&mut self.writer).await? as u64;
102            self.state = State::Started;
103            Ok(())
104        } else {
105            Err(Error::InvalidParameter(
106                "Start cannot be called twice".to_string(),
107            ))
108        }
109    }
110
111    /// Writes a row group to the file.
112    pub async fn write<E>(&mut self, row_group: RowGroupIter<'_, E>) -> Result<()>
113    where
114        Error: From<E>,
115        E: std::error::Error,
116    {
117        if self.offset == 0 {
118            self.start().await?;
119        }
120
121        let ordinal = self.row_groups.len();
122        let (group, specs, size) = write_row_group_async(
123            &mut self.writer,
124            self.offset,
125            self.schema.columns(),
126            row_group,
127            ordinal,
128        )
129        .await?;
130        self.offset += size;
131        self.row_groups.push(group);
132        self.page_specs.push(specs);
133        Ok(())
134    }
135
136    /// Writes the footer of the parquet file. Returns the total size of the file and the
137    /// underlying writer.
138    pub async fn end(&mut self, key_value_metadata: Option<Vec<KeyValue>>) -> Result<u64> {
139        if self.offset == 0 {
140            self.start().await?;
141        }
142
143        if self.state != State::Started {
144            return Err(Error::InvalidParameter(
145                "End cannot be called twice".to_string(),
146            ));
147        }
148        // compute file stats
149        let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum();
150
151        if self.options.write_statistics {
152            // write column indexes (require page statistics)
153            for (group, pages) in self.row_groups.iter_mut().zip(self.page_specs.iter()) {
154                for (column, pages) in group.columns.iter_mut().zip(pages.iter()) {
155                    let offset = self.offset;
156                    column.column_index_offset = Some(offset as i64);
157                    self.offset += write_column_index_async(&mut self.writer, pages).await?;
158                    let length = self.offset - offset;
159                    column.column_index_length = Some(length as i32);
160                }
161            }
162        };
163
164        // write offset index
165        for (group, pages) in self.row_groups.iter_mut().zip(self.page_specs.iter()) {
166            for (column, pages) in group.columns.iter_mut().zip(pages.iter()) {
167                let offset = self.offset;
168                column.offset_index_offset = Some(offset as i64);
169                self.offset += write_offset_index_async(&mut self.writer, pages).await?;
170                column.offset_index_length = Some((self.offset - offset) as i32);
171            }
172        }
173
174        let metadata = FileMetaData::new(
175            self.options.version.into(),
176            self.schema.clone().into_thrift(),
177            num_rows,
178            self.row_groups.clone(),
179            key_value_metadata,
180            self.created_by.clone(),
181            None,
182            None,
183            None,
184        );
185
186        let len = end_file(&mut self.writer, metadata).await?;
187        Ok(self.offset + len)
188    }
189
190    /// Returns the underlying writer.
191    pub fn into_inner(self) -> W {
192        self.writer
193    }
194}