polars_json/ndjson/
write.rs1use std::io::Write;
3
4use arrow::array::Array;
5pub use fallible_streaming_iterator::FallibleStreamingIterator;
6use polars_error::{PolarsError, PolarsResult};
7
8use super::super::json::write::new_serializer;
9
10fn serialize(array: &dyn Array, buffer: &mut Vec<u8>) {
11 let mut serializer = new_serializer(array, 0, usize::MAX);
12 (0..array.len()).for_each(|_| {
13 buffer.extend_from_slice(serializer.next().unwrap());
14 buffer.push(b'\n');
15 });
16}
17
18#[derive(Debug, Clone)]
23pub struct Serializer<A, I>
24where
25 A: AsRef<dyn Array>,
26 I: Iterator<Item = PolarsResult<A>>,
27{
28 arrays: I,
29 buffer: Vec<u8>,
30}
31
32impl<A, I> Serializer<A, I>
33where
34 A: AsRef<dyn Array>,
35 I: Iterator<Item = PolarsResult<A>>,
36{
37 pub fn new(arrays: I, buffer: Vec<u8>) -> Self {
39 Self { arrays, buffer }
40 }
41}
42
43impl<A, I> FallibleStreamingIterator for Serializer<A, I>
44where
45 A: AsRef<dyn Array>,
46 I: Iterator<Item = PolarsResult<A>>,
47{
48 type Item = [u8];
49
50 type Error = PolarsError;
51
52 fn advance(&mut self) -> PolarsResult<()> {
53 self.buffer.clear();
54 self.arrays
55 .next()
56 .map(|maybe_array| maybe_array.map(|array| serialize(array.as_ref(), &mut self.buffer)))
57 .transpose()?;
58 Ok(())
59 }
60
61 fn get(&self) -> Option<&Self::Item> {
62 if !self.buffer.is_empty() {
63 Some(&self.buffer)
64 } else {
65 None
66 }
67 }
68}
69
70pub struct FileWriter<W, I>
76where
77 W: Write,
78 I: FallibleStreamingIterator<Item = [u8], Error = PolarsError>,
79{
80 writer: W,
81 iterator: I,
82}
83
84impl<W, I> FileWriter<W, I>
85where
86 W: Write,
87 I: FallibleStreamingIterator<Item = [u8], Error = PolarsError>,
88{
89 pub fn new(writer: W, iterator: I) -> Self {
91 Self { writer, iterator }
92 }
93
94 pub fn into_inner(self) -> (W, I) {
100 (self.writer, self.iterator)
101 }
102}
103
104impl<W, I> Iterator for FileWriter<W, I>
105where
106 W: Write,
107 I: FallibleStreamingIterator<Item = [u8], Error = PolarsError>,
108{
109 type Item = PolarsResult<()>;
110
111 fn next(&mut self) -> Option<Self::Item> {
112 let item = self.iterator.next().transpose()?;
113 Some(item.and_then(|x| {
114 self.writer.write_all(x)?;
115 Ok(())
116 }))
117 }
118}