polars_json/ndjson/
write.rs

1//! APIs to serialize and write to [NDJSON](http://ndjson.org/).
2use std::io::Write;
3
4use arrow::array::Array;
5pub use fallible_streaming_iterator::FallibleStreamingIterator;
6use polars_error::{PolarsError, PolarsResult};
7
8use super::super::json::write::new_serializer;
9
10fn serialize(array: &dyn Array, buffer: &mut Vec<u8>) {
11    let mut serializer = new_serializer(array, 0, usize::MAX);
12    (0..array.len()).for_each(|_| {
13        buffer.extend_from_slice(serializer.next().unwrap());
14        buffer.push(b'\n');
15    });
16}
17
/// [`FallibleStreamingIterator`] that serializes an [`Array`] to bytes of valid NDJSON
/// where every line is an element of the array.
///
/// Arrays are pulled one at a time from the wrapped iterator; the bytes exposed
/// by `get` are those produced by the most recent call to `advance`.
/// # Implementation
/// Advancing this iterator is CPU-bounded.
#[derive(Debug, Clone)]
pub struct Serializer<A, I>
where
    A: AsRef<dyn Array>,
    I: Iterator<Item = PolarsResult<A>>,
{
    /// Source of the arrays to serialize; an `Err` item is propagated by `advance`.
    arrays: I,
    /// Output buffer, cleared at the start of every `advance` and then filled
    /// with the NDJSON bytes of the array being serialized.
    buffer: Vec<u8>,
}
31
impl<A, I> Serializer<A, I>
where
    A: AsRef<dyn Array>,
    I: Iterator<Item = PolarsResult<A>>,
{
    /// Creates a new [`Serializer`].
    ///
    /// `arrays` supplies the arrays to serialize and `buffer` is used as the
    /// output buffer. The buffer is cleared whenever the iterator is advanced,
    /// so any bytes it already holds are discarded.
    pub fn new(arrays: I, buffer: Vec<u8>) -> Self {
        Self { arrays, buffer }
    }
}
42
43impl<A, I> FallibleStreamingIterator for Serializer<A, I>
44where
45    A: AsRef<dyn Array>,
46    I: Iterator<Item = PolarsResult<A>>,
47{
48    type Item = [u8];
49
50    type Error = PolarsError;
51
52    fn advance(&mut self) -> PolarsResult<()> {
53        self.buffer.clear();
54        self.arrays
55            .next()
56            .map(|maybe_array| maybe_array.map(|array| serialize(array.as_ref(), &mut self.buffer)))
57            .transpose()?;
58        Ok(())
59    }
60
61    fn get(&self) -> Option<&Self::Item> {
62        if !self.buffer.is_empty() {
63            Some(&self.buffer)
64        } else {
65            None
66        }
67    }
68}
69
/// An iterator adapter that receives an implementer of [`Write`] and
/// an implementer of [`FallibleStreamingIterator`] (such as [`Serializer`])
/// and writes valid NDJSON to the writer.
///
/// Each step of the iterator takes one item (a chunk of bytes) from the inner
/// iterator and writes it to the writer.
/// # Implementation
/// Advancing this iterator mixes CPU-bounded (serializing arrays) tasks and IO-bounded (write to the writer).
pub struct FileWriter<W, I>
where
    W: Write,
    I: FallibleStreamingIterator<Item = [u8], Error = PolarsError>,
{
    /// Destination of the serialized bytes.
    writer: W,
    /// Source of the byte chunks to write.
    iterator: I,
}
83
impl<W, I> FileWriter<W, I>
where
    W: Write,
    I: FallibleStreamingIterator<Item = [u8], Error = PolarsError>,
{
    /// Creates a new [`FileWriter`] that writes the items of `iterator` to `writer`.
    ///
    /// Nothing is written until the returned value is iterated.
    pub fn new(writer: W, iterator: I) -> Self {
        Self { writer, iterator }
    }

    /// Returns the inner content of this iterator
    ///
    /// There are two use-cases for this function:
    /// * to continue writing to its writer
    /// * to reuse an internal buffer of its iterator
    pub fn into_inner(self) -> (W, I) {
        (self.writer, self.iterator)
    }
}
103
104impl<W, I> Iterator for FileWriter<W, I>
105where
106    W: Write,
107    I: FallibleStreamingIterator<Item = [u8], Error = PolarsError>,
108{
109    type Item = PolarsResult<()>;
110
111    fn next(&mut self) -> Option<Self::Item> {
112        let item = self.iterator.next().transpose()?;
113        Some(item.and_then(|x| {
114            self.writer.write_all(x)?;
115            Ok(())
116        }))
117    }
118}