1use std::io::Write;
2
3use futures::{AsyncWrite, AsyncWriteExt};
4
5use parquet_format_safe::{thrift::protocol::TCompactOutputStreamProtocol, FileMetaData, RowGroup};
6
7use crate::write::indexes::{write_column_index_async, write_offset_index_async};
8use crate::write::page::PageWriteSpec;
9use crate::write::State;
10use crate::{
11 error::{Error, Result},
12 metadata::{KeyValue, SchemaDescriptor},
13 FOOTER_SIZE, PARQUET_MAGIC,
14};
15
16use super::{row_group::write_row_group_async, RowGroupIter, WriteOptions};
17
18async fn start_file<W: AsyncWrite + Unpin>(writer: &mut W) -> Result<u64> {
19 writer.write_all(&PARQUET_MAGIC).await?;
20 Ok(PARQUET_MAGIC.len() as u64)
21}
22
23async fn end_file<W: AsyncWrite + Unpin + Send>(
24 mut writer: &mut W,
25 metadata: FileMetaData,
26) -> Result<u64> {
27 let mut protocol = TCompactOutputStreamProtocol::new(&mut writer);
29 let metadata_len = metadata.write_to_out_stream_protocol(&mut protocol).await? as i32;
30
31 let metadata_bytes = metadata_len.to_le_bytes();
33 let mut footer_buffer = [0u8; FOOTER_SIZE as usize];
34 (0..4).for_each(|i| {
35 footer_buffer[i] = metadata_bytes[i];
36 });
37
38 (&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?;
39 writer.write_all(&footer_buffer).await?;
40 writer.flush().await?;
41 Ok(metadata_len as u64 + FOOTER_SIZE)
42}
43
44pub struct FileStreamer<W: AsyncWrite + Unpin + Send> {
48 writer: W,
49 schema: SchemaDescriptor,
50 options: WriteOptions,
51 created_by: Option<String>,
52
53 offset: u64,
54 row_groups: Vec<RowGroup>,
55 page_specs: Vec<Vec<Vec<PageWriteSpec>>>,
56 state: State,
58}
59
60impl<W: AsyncWrite + Unpin + Send> FileStreamer<W> {
62 pub fn options(&self) -> &WriteOptions {
64 &self.options
65 }
66
67 pub fn schema(&self) -> &SchemaDescriptor {
69 &self.schema
70 }
71}
72
73impl<W: AsyncWrite + Unpin + Send> FileStreamer<W> {
74 pub fn new(
76 writer: W,
77 schema: SchemaDescriptor,
78 options: WriteOptions,
79 created_by: Option<String>,
80 ) -> Self {
81 Self {
82 writer,
83 schema,
84 options,
85 created_by,
86 offset: 0,
87 row_groups: vec![],
88 page_specs: vec![],
89 state: State::Initialised,
90 }
91 }
92
93 async fn start(&mut self) -> Result<()> {
100 if self.offset == 0 {
101 self.offset = start_file(&mut self.writer).await? as u64;
102 self.state = State::Started;
103 Ok(())
104 } else {
105 Err(Error::InvalidParameter(
106 "Start cannot be called twice".to_string(),
107 ))
108 }
109 }
110
111 pub async fn write<E>(&mut self, row_group: RowGroupIter<'_, E>) -> Result<()>
113 where
114 Error: From<E>,
115 E: std::error::Error,
116 {
117 if self.offset == 0 {
118 self.start().await?;
119 }
120
121 let ordinal = self.row_groups.len();
122 let (group, specs, size) = write_row_group_async(
123 &mut self.writer,
124 self.offset,
125 self.schema.columns(),
126 row_group,
127 ordinal,
128 )
129 .await?;
130 self.offset += size;
131 self.row_groups.push(group);
132 self.page_specs.push(specs);
133 Ok(())
134 }
135
136 pub async fn end(&mut self, key_value_metadata: Option<Vec<KeyValue>>) -> Result<u64> {
139 if self.offset == 0 {
140 self.start().await?;
141 }
142
143 if self.state != State::Started {
144 return Err(Error::InvalidParameter(
145 "End cannot be called twice".to_string(),
146 ));
147 }
148 let num_rows = self.row_groups.iter().map(|group| group.num_rows).sum();
150
151 if self.options.write_statistics {
152 for (group, pages) in self.row_groups.iter_mut().zip(self.page_specs.iter()) {
154 for (column, pages) in group.columns.iter_mut().zip(pages.iter()) {
155 let offset = self.offset;
156 column.column_index_offset = Some(offset as i64);
157 self.offset += write_column_index_async(&mut self.writer, pages).await?;
158 let length = self.offset - offset;
159 column.column_index_length = Some(length as i32);
160 }
161 }
162 };
163
164 for (group, pages) in self.row_groups.iter_mut().zip(self.page_specs.iter()) {
166 for (column, pages) in group.columns.iter_mut().zip(pages.iter()) {
167 let offset = self.offset;
168 column.offset_index_offset = Some(offset as i64);
169 self.offset += write_offset_index_async(&mut self.writer, pages).await?;
170 column.offset_index_length = Some((self.offset - offset) as i32);
171 }
172 }
173
174 let metadata = FileMetaData::new(
175 self.options.version.into(),
176 self.schema.clone().into_thrift(),
177 num_rows,
178 self.row_groups.clone(),
179 key_value_metadata,
180 self.created_by.clone(),
181 None,
182 None,
183 None,
184 );
185
186 let len = end_file(&mut self.writer, metadata).await?;
187 Ok(self.offset + len)
188 }
189
190 pub fn into_inner(self) -> W {
192 self.writer
193 }
194}