arrow2/io/json/write/
mod.rs

1//! APIs to write to JSON
2mod serialize;
3mod utf8;
4
5pub use fallible_streaming_iterator::*;
6pub(crate) use serialize::new_serializer;
7use serialize::serialize;
8use std::io::Write;
9
10use crate::{
11    array::Array, chunk::Chunk, datatypes::Schema, error::Error, io::iterator::StreamingIterator,
12};
13
14/// [`FallibleStreamingIterator`] that serializes an [`Array`] to bytes of valid JSON
15/// # Implementation
16/// Advancing this iterator CPU-bounded
17#[derive(Debug, Clone)]
18pub struct Serializer<A, I>
19where
20    A: AsRef<dyn Array>,
21    I: Iterator<Item = Result<A, Error>>,
22{
23    arrays: I,
24    buffer: Vec<u8>,
25}
26
27impl<A, I> Serializer<A, I>
28where
29    A: AsRef<dyn Array>,
30    I: Iterator<Item = Result<A, Error>>,
31{
32    /// Creates a new [`Serializer`].
33    pub fn new(arrays: I, buffer: Vec<u8>) -> Self {
34        Self { arrays, buffer }
35    }
36}
37
38impl<A, I> FallibleStreamingIterator for Serializer<A, I>
39where
40    A: AsRef<dyn Array>,
41    I: Iterator<Item = Result<A, Error>>,
42{
43    type Item = [u8];
44
45    type Error = Error;
46
47    fn advance(&mut self) -> Result<(), Error> {
48        self.buffer.clear();
49        self.arrays
50            .next()
51            .map(|maybe_array| maybe_array.map(|array| serialize(array.as_ref(), &mut self.buffer)))
52            .transpose()?;
53        Ok(())
54    }
55
56    fn get(&self) -> Option<&Self::Item> {
57        if !self.buffer.is_empty() {
58            Some(&self.buffer)
59        } else {
60            None
61        }
62    }
63}
64
65/// [`FallibleStreamingIterator`] that serializes a [`Chunk`] into bytes of JSON
66/// in a (pandas-compatible) record-oriented format.
67///
68/// # Implementation
69/// Advancing this iterator is CPU-bounded.
70pub struct RecordSerializer<'a> {
71    schema: Schema,
72    index: usize,
73    end: usize,
74    iterators: Vec<Box<dyn StreamingIterator<Item = [u8]> + Send + Sync + 'a>>,
75    buffer: Vec<u8>,
76}
77
78impl<'a> RecordSerializer<'a> {
79    /// Creates a new [`RecordSerializer`].
80    pub fn new<A>(schema: Schema, chunk: &'a Chunk<A>, buffer: Vec<u8>) -> Self
81    where
82        A: AsRef<dyn Array>,
83    {
84        let end = chunk.len();
85        let iterators = chunk
86            .arrays()
87            .iter()
88            .map(|arr| new_serializer(arr.as_ref(), 0, usize::MAX))
89            .collect();
90
91        Self {
92            schema,
93            index: 0,
94            end,
95            iterators,
96            buffer,
97        }
98    }
99}
100
101impl<'a> FallibleStreamingIterator for RecordSerializer<'a> {
102    type Item = [u8];
103
104    type Error = Error;
105
106    fn advance(&mut self) -> Result<(), Error> {
107        self.buffer.clear();
108        if self.index == self.end {
109            return Ok(());
110        }
111
112        let mut is_first_row = true;
113        write!(&mut self.buffer, "{{")?;
114        for (f, ref mut it) in self.schema.fields.iter().zip(self.iterators.iter_mut()) {
115            if !is_first_row {
116                write!(&mut self.buffer, ",")?;
117            }
118            write!(&mut self.buffer, "\"{}\":", f.name)?;
119
120            self.buffer.extend_from_slice(it.next().unwrap());
121            is_first_row = false;
122        }
123        write!(&mut self.buffer, "}}")?;
124
125        self.index += 1;
126        Ok(())
127    }
128
129    fn get(&self) -> Option<&Self::Item> {
130        if !self.buffer.is_empty() {
131            Some(&self.buffer)
132        } else {
133            None
134        }
135    }
136}
137
138/// Writes valid JSON from an iterator of (assumed JSON-encoded) bytes to `writer`
139pub fn write<W, I>(writer: &mut W, mut blocks: I) -> Result<(), Error>
140where
141    W: std::io::Write,
142    I: FallibleStreamingIterator<Item = [u8], Error = Error>,
143{
144    writer.write_all(&[b'['])?;
145    let mut is_first_row = true;
146    while let Some(block) = blocks.next()? {
147        if !is_first_row {
148            writer.write_all(&[b','])?;
149        }
150        is_first_row = false;
151        writer.write_all(block)?;
152    }
153    writer.write_all(&[b']'])?;
154    Ok(())
155}