1use std::io::Write;
2
3use arrow::datatypes::Metadata;
4use arrow::io::ipc::write::{self, EncodedData, WriteOptions};
5use polars_core::prelude::*;
6#[cfg(feature = "serde")]
7use serde::{Deserialize, Serialize};
8
9use crate::prelude::*;
10use crate::shared::schema_to_arrow_checked;
11
12#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
13#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
14#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
15pub struct IpcWriterOptions {
16 pub compression: Option<IpcCompression>,
18 pub compat_level: CompatLevel,
20 pub chunk_size: IdxSize,
22}
23
24impl Default for IpcWriterOptions {
25 fn default() -> Self {
26 Self {
27 compression: None,
28 compat_level: CompatLevel::newest(),
29 chunk_size: 1 << 18,
30 }
31 }
32}
33
34impl IpcWriterOptions {
35 pub fn to_writer<W: Write>(&self, writer: W) -> IpcWriter<W> {
36 IpcWriter::new(writer).with_compression(self.compression)
37 }
38}
39
40#[must_use]
67pub struct IpcWriter<W> {
68 pub(super) writer: W,
69 pub(super) compression: Option<IpcCompression>,
70 pub(super) compat_level: CompatLevel,
72 pub(super) parallel: bool,
73 pub(super) custom_schema_metadata: Option<Arc<Metadata>>,
74}
75
76impl<W: Write> IpcWriter<W> {
77 pub fn with_compression(mut self, compression: Option<IpcCompression>) -> Self {
79 self.compression = compression;
80 self
81 }
82
83 pub fn with_compat_level(mut self, compat_level: CompatLevel) -> Self {
84 self.compat_level = compat_level;
85 self
86 }
87
88 pub fn with_parallel(mut self, parallel: bool) -> Self {
89 self.parallel = parallel;
90 self
91 }
92
93 pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
94 let schema = schema_to_arrow_checked(schema, self.compat_level, "ipc")?;
95 let mut writer = write::FileWriter::new(
96 self.writer,
97 Arc::new(schema),
98 None,
99 WriteOptions {
100 compression: self.compression.map(|c| c.into()),
101 },
102 );
103 writer.start()?;
104
105 Ok(BatchedWriter {
106 writer,
107 compat_level: self.compat_level,
108 })
109 }
110
111 pub fn set_custom_schema_metadata(&mut self, custom_metadata: Arc<Metadata>) {
113 self.custom_schema_metadata = Some(custom_metadata);
114 }
115}
116
117impl<W> SerWriter<W> for IpcWriter<W>
118where
119 W: Write,
120{
121 fn new(writer: W) -> Self {
122 IpcWriter {
123 writer,
124 compression: None,
125 compat_level: CompatLevel::newest(),
126 parallel: true,
127 custom_schema_metadata: None,
128 }
129 }
130
131 fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
132 let schema = schema_to_arrow_checked(df.schema(), self.compat_level, "ipc")?;
133 let mut ipc_writer = write::FileWriter::try_new(
134 &mut self.writer,
135 Arc::new(schema),
136 None,
137 WriteOptions {
138 compression: self.compression.map(|c| c.into()),
139 },
140 )?;
141 if let Some(custom_metadata) = &self.custom_schema_metadata {
142 ipc_writer.set_custom_schema_metadata(Arc::clone(custom_metadata));
143 }
144
145 if self.parallel {
146 df.align_chunks_par();
147 } else {
148 df.align_chunks();
149 }
150 let iter = df.iter_chunks(self.compat_level, true);
151
152 for batch in iter {
153 ipc_writer.write(&batch, None)?
154 }
155 ipc_writer.finish()?;
156 Ok(())
157 }
158}
159
160pub struct BatchedWriter<W: Write> {
161 writer: write::FileWriter<W>,
162 compat_level: CompatLevel,
163}
164
165impl<W: Write> BatchedWriter<W> {
166 pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
171 let iter = df.iter_chunks(self.compat_level, true);
172 for batch in iter {
173 self.writer.write(&batch, None)?
174 }
175 Ok(())
176 }
177
178 pub fn write_encoded(
183 &mut self,
184 dictionaries: &[EncodedData],
185 message: &EncodedData,
186 ) -> PolarsResult<()> {
187 self.writer.write_encoded(dictionaries, message)?;
188 Ok(())
189 }
190
191 pub fn finish(&mut self) -> PolarsResult<()> {
193 self.writer.finish()?;
194 Ok(())
195 }
196}
197
/// Compression codec applied to Arrow IPC record batches.
///
/// The default is [`IpcCompression::ZSTD`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum IpcCompression {
    /// LZ4 compression.
    LZ4,
    /// Zstandard compression; the default codec.
    #[default]
    ZSTD,
}
209
210impl From<IpcCompression> for write::Compression {
211 fn from(value: IpcCompression) -> Self {
212 match value {
213 IpcCompression::LZ4 => write::Compression::LZ4,
214 IpcCompression::ZSTD => write::Compression::ZSTD,
215 }
216 }
217}