arrow2/io/json/write/
mod.rs1mod serialize;
3mod utf8;
4
5pub use fallible_streaming_iterator::*;
6pub(crate) use serialize::new_serializer;
7use serialize::serialize;
8use std::io::Write;
9
10use crate::{
11 array::Array, chunk::Chunk, datatypes::Schema, error::Error, io::iterator::StreamingIterator,
12};
13
14#[derive(Debug, Clone)]
18pub struct Serializer<A, I>
19where
20 A: AsRef<dyn Array>,
21 I: Iterator<Item = Result<A, Error>>,
22{
23 arrays: I,
24 buffer: Vec<u8>,
25}
26
27impl<A, I> Serializer<A, I>
28where
29 A: AsRef<dyn Array>,
30 I: Iterator<Item = Result<A, Error>>,
31{
32 pub fn new(arrays: I, buffer: Vec<u8>) -> Self {
34 Self { arrays, buffer }
35 }
36}
37
38impl<A, I> FallibleStreamingIterator for Serializer<A, I>
39where
40 A: AsRef<dyn Array>,
41 I: Iterator<Item = Result<A, Error>>,
42{
43 type Item = [u8];
44
45 type Error = Error;
46
47 fn advance(&mut self) -> Result<(), Error> {
48 self.buffer.clear();
49 self.arrays
50 .next()
51 .map(|maybe_array| maybe_array.map(|array| serialize(array.as_ref(), &mut self.buffer)))
52 .transpose()?;
53 Ok(())
54 }
55
56 fn get(&self) -> Option<&Self::Item> {
57 if !self.buffer.is_empty() {
58 Some(&self.buffer)
59 } else {
60 None
61 }
62 }
63}
64
65pub struct RecordSerializer<'a> {
71 schema: Schema,
72 index: usize,
73 end: usize,
74 iterators: Vec<Box<dyn StreamingIterator<Item = [u8]> + Send + Sync + 'a>>,
75 buffer: Vec<u8>,
76}
77
78impl<'a> RecordSerializer<'a> {
79 pub fn new<A>(schema: Schema, chunk: &'a Chunk<A>, buffer: Vec<u8>) -> Self
81 where
82 A: AsRef<dyn Array>,
83 {
84 let end = chunk.len();
85 let iterators = chunk
86 .arrays()
87 .iter()
88 .map(|arr| new_serializer(arr.as_ref(), 0, usize::MAX))
89 .collect();
90
91 Self {
92 schema,
93 index: 0,
94 end,
95 iterators,
96 buffer,
97 }
98 }
99}
100
101impl<'a> FallibleStreamingIterator for RecordSerializer<'a> {
102 type Item = [u8];
103
104 type Error = Error;
105
106 fn advance(&mut self) -> Result<(), Error> {
107 self.buffer.clear();
108 if self.index == self.end {
109 return Ok(());
110 }
111
112 let mut is_first_row = true;
113 write!(&mut self.buffer, "{{")?;
114 for (f, ref mut it) in self.schema.fields.iter().zip(self.iterators.iter_mut()) {
115 if !is_first_row {
116 write!(&mut self.buffer, ",")?;
117 }
118 write!(&mut self.buffer, "\"{}\":", f.name)?;
119
120 self.buffer.extend_from_slice(it.next().unwrap());
121 is_first_row = false;
122 }
123 write!(&mut self.buffer, "}}")?;
124
125 self.index += 1;
126 Ok(())
127 }
128
129 fn get(&self) -> Option<&Self::Item> {
130 if !self.buffer.is_empty() {
131 Some(&self.buffer)
132 } else {
133 None
134 }
135 }
136}
137
138pub fn write<W, I>(writer: &mut W, mut blocks: I) -> Result<(), Error>
140where
141 W: std::io::Write,
142 I: FallibleStreamingIterator<Item = [u8], Error = Error>,
143{
144 writer.write_all(&[b'['])?;
145 let mut is_first_row = true;
146 while let Some(block) = blocks.next()? {
147 if !is_first_row {
148 writer.write_all(&[b','])?;
149 }
150 is_first_row = false;
151 writer.write_all(block)?;
152 }
153 writer.write_all(&[b']'])?;
154 Ok(())
155}