polars_io/ipc/
write.rs

1use std::io::Write;
2
3use arrow::datatypes::Metadata;
4use arrow::io::ipc::write::{self, EncodedData, WriteOptions};
5use polars_core::prelude::*;
6#[cfg(feature = "serde")]
7use serde::{Deserialize, Serialize};
8
9use crate::prelude::*;
10use crate::shared::schema_to_arrow_checked;
11
12#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
13#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
14#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
15pub struct IpcWriterOptions {
16    /// Data page compression
17    pub compression: Option<IpcCompression>,
18    /// Compatibility level
19    pub compat_level: CompatLevel,
20    /// Size of each written chunk.
21    pub chunk_size: IdxSize,
22}
23
24impl Default for IpcWriterOptions {
25    fn default() -> Self {
26        Self {
27            compression: None,
28            compat_level: CompatLevel::newest(),
29            chunk_size: 1 << 18,
30        }
31    }
32}
33
34impl IpcWriterOptions {
35    pub fn to_writer<W: Write>(&self, writer: W) -> IpcWriter<W> {
36        IpcWriter::new(writer).with_compression(self.compression)
37    }
38}
39
40/// Write a DataFrame to Arrow's IPC format
41///
42/// # Example
43///
44/// ```
45/// use polars_core::prelude::*;
46/// use polars_io::ipc::IpcWriter;
47/// use std::fs::File;
48/// use polars_io::SerWriter;
49///
50/// fn example(df: &mut DataFrame) -> PolarsResult<()> {
51///     let mut file = File::create("file.ipc").expect("could not create file");
52///
53///     let mut writer = IpcWriter::new(&mut file);
54///
55///     let custom_metadata = [
56///         ("first_name".into(), "John".into()),
57///         ("last_name".into(), "Doe".into()),
58///     ]
59///     .into_iter()
60///     .collect();
61///     writer.set_custom_schema_metadata(Arc::new(custom_metadata));
62///     writer.finish(df)
63/// }
64///
65/// ```
66#[must_use]
67pub struct IpcWriter<W> {
68    pub(super) writer: W,
69    pub(super) compression: Option<IpcCompression>,
70    /// Polars' flavor of arrow. This might be temporary.
71    pub(super) compat_level: CompatLevel,
72    pub(super) parallel: bool,
73    pub(super) custom_schema_metadata: Option<Arc<Metadata>>,
74}
75
76impl<W: Write> IpcWriter<W> {
77    /// Set the compression used. Defaults to None.
78    pub fn with_compression(mut self, compression: Option<IpcCompression>) -> Self {
79        self.compression = compression;
80        self
81    }
82
83    pub fn with_compat_level(mut self, compat_level: CompatLevel) -> Self {
84        self.compat_level = compat_level;
85        self
86    }
87
88    pub fn with_parallel(mut self, parallel: bool) -> Self {
89        self.parallel = parallel;
90        self
91    }
92
93    pub fn batched(self, schema: &Schema) -> PolarsResult<BatchedWriter<W>> {
94        let schema = schema_to_arrow_checked(schema, self.compat_level, "ipc")?;
95        let mut writer = write::FileWriter::new(
96            self.writer,
97            Arc::new(schema),
98            None,
99            WriteOptions {
100                compression: self.compression.map(|c| c.into()),
101            },
102        );
103        writer.start()?;
104
105        Ok(BatchedWriter {
106            writer,
107            compat_level: self.compat_level,
108        })
109    }
110
111    /// Sets custom schema metadata. Must be called before `start` is called
112    pub fn set_custom_schema_metadata(&mut self, custom_metadata: Arc<Metadata>) {
113        self.custom_schema_metadata = Some(custom_metadata);
114    }
115}
116
117impl<W> SerWriter<W> for IpcWriter<W>
118where
119    W: Write,
120{
121    fn new(writer: W) -> Self {
122        IpcWriter {
123            writer,
124            compression: None,
125            compat_level: CompatLevel::newest(),
126            parallel: true,
127            custom_schema_metadata: None,
128        }
129    }
130
131    fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
132        let schema = schema_to_arrow_checked(df.schema(), self.compat_level, "ipc")?;
133        let mut ipc_writer = write::FileWriter::try_new(
134            &mut self.writer,
135            Arc::new(schema),
136            None,
137            WriteOptions {
138                compression: self.compression.map(|c| c.into()),
139            },
140        )?;
141        if let Some(custom_metadata) = &self.custom_schema_metadata {
142            ipc_writer.set_custom_schema_metadata(Arc::clone(custom_metadata));
143        }
144
145        if self.parallel {
146            df.align_chunks_par();
147        } else {
148            df.align_chunks();
149        }
150        let iter = df.iter_chunks(self.compat_level, true);
151
152        for batch in iter {
153            ipc_writer.write(&batch, None)?
154        }
155        ipc_writer.finish()?;
156        Ok(())
157    }
158}
159
160pub struct BatchedWriter<W: Write> {
161    writer: write::FileWriter<W>,
162    compat_level: CompatLevel,
163}
164
165impl<W: Write> BatchedWriter<W> {
166    /// Write a batch to the ipc writer.
167    ///
168    /// # Panics
169    /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
170    pub fn write_batch(&mut self, df: &DataFrame) -> PolarsResult<()> {
171        let iter = df.iter_chunks(self.compat_level, true);
172        for batch in iter {
173            self.writer.write(&batch, None)?
174        }
175        Ok(())
176    }
177
178    /// Write a encoded data to the ipc writer.
179    ///
180    /// # Panics
181    /// The caller must ensure the chunks in the given [`DataFrame`] are aligned.
182    pub fn write_encoded(
183        &mut self,
184        dictionaries: &[EncodedData],
185        message: &EncodedData,
186    ) -> PolarsResult<()> {
187        self.writer.write_encoded(dictionaries, message)?;
188        Ok(())
189    }
190
191    /// Writes the footer of the IPC file.
192    pub fn finish(&mut self) -> PolarsResult<()> {
193        self.writer.finish()?;
194        Ok(())
195    }
196}
197
/// Compression codec available for IPC output.
///
/// The default codec is [`IpcCompression::ZSTD`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum IpcCompression {
    /// LZ4 (framed)
    LZ4,
    /// ZSTD (the default)
    #[default]
    ZSTD,
}
209
210impl From<IpcCompression> for write::Compression {
211    fn from(value: IpcCompression) -> Self {
212        match value {
213            IpcCompression::LZ4 => write::Compression::LZ4,
214            IpcCompression::ZSTD => write::Compression::ZSTD,
215        }
216    }
217}