/// A low-level, push-based interface for decoding JSON data from a byte stream into
/// `RecordBatch`es: feed bytes with `decode` (or rows with `serialize`) and retrieve
/// the buffered result with `flush`.
pub struct Decoder { /* private fields */ }
Expand description
A low-level interface for reading JSON data from a byte stream.
See `Reader` for a higher-level interface for use with `BufRead`.
The push-based interface facilitates integration with sources that yield arbitrarily
delimited byte ranges, such as `BufRead` or a chunked byte stream received from
object storage.
/// Wraps `reader` in an iterator of `RecordBatch`es decoded against `schema`.
///
/// Each call to `next` on the returned iterator feeds buffered bytes from `reader`
/// into a `Decoder`. It stops when the reader is exhausted or when the decoder
/// consumes less than the chunk it was offered, which means a full batch is ready.
/// It then flushes whatever has been decoded. The iterator ends once a flush yields
/// no data.
///
/// # Errors
///
/// Returns an error if the decoder cannot be built. Each iterator item carries any
/// I/O or decoding error encountered while producing that batch.
fn read_from_json<R: BufRead>(
    mut reader: R,
    schema: SchemaRef,
) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
    let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
    let mut pull_batch = move || {
        loop {
            // A chunk may end in the middle of a record. The decoder keeps the
            // partial state and resumes on the next call, so chunk boundaries
            // do not need to line up with records.
            let chunk = reader.fill_buf()?;
            let available = chunk.len();
            if available == 0 {
                break; // Nothing left to read.
            }
            let consumed = decoder.decode(chunk)?;
            // Only mark the bytes the decoder actually used as read.
            reader.consume(consumed);
            if consumed != available {
                break; // Decoder stopped early: a batch-size worth of rows is buffered.
            }
        }
        decoder.flush()
    };
    Ok(std::iter::from_fn(move || pull_batch().transpose()))
}
Implementations§
source§impl Decoder
impl Decoder
sourcepub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>
pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>
Read JSON objects from `buf`, returning the number of bytes read.

This method returns once `batch_size` objects have been parsed since the
last call to `Self::flush`, or once `buf` is exhausted. Any remaining bytes
should be included in the next call to `Self::decode`.

There is no requirement that `buf` contain a whole number of records. This
facilitates integration with arbitrary byte streams, such as those yielded by `BufRead`.
sourcepub fn serialize<S>(&mut self, rows: &[S]) -> Result<(), ArrowError>where
S: Serialize,
pub fn serialize<S>(&mut self, rows: &[S]) -> Result<(), ArrowError>where
S: Serialize,
Serialize `rows` to this `Decoder`.

This provides a simple way to convert serde-compatible data structures into arrow
`RecordBatch`es.

Custom conversion logic, as described in `arrow_array::builder`, will likely outperform this, especially where the schema is known at compile time. However, this provides a mechanism to get something up and running quickly.
It can be used with `serde_json::Value`:
// Two JSON objects, each carrying a single `float` field.
let rows = vec![json!({"float": 2.3}), json!({"float": 5.7})];
let schema = Arc::new(Schema::new(vec![Field::new("float", DataType::Float32, true)]));
let mut decoder = ReaderBuilder::new(schema).build_decoder().unwrap();
decoder.serialize(&rows).unwrap();
// Rows were buffered, so `flush` yields `Ok(Some(batch))`.
let batch = decoder.flush().unwrap().unwrap();
assert_eq!(batch.num_rows(), 2);
assert_eq!(batch.num_columns(), 1);
let floats = batch.column(0).as_primitive::<Float32Type>();
assert_eq!(floats.values(), &[2.3, 5.7]);
Or with arbitrary `Serialize` types:
#[derive(Serialize)]
struct MyStruct {
    int32: i32,
    float: f32,
}

// Schema columns mirror the struct's fields, in declaration order.
let fields = vec![
    Field::new("int32", DataType::Int32, false),
    Field::new("float", DataType::Float32, false),
];
let schema = Arc::new(Schema::new(fields));
let rows = vec![
    MyStruct { int32: 0, float: 3. },
    MyStruct { int32: 4, float: 67.53 },
];
let mut decoder = ReaderBuilder::new(schema).build_decoder().unwrap();
decoder.serialize(&rows).unwrap();
let batch = decoder.flush().unwrap().unwrap();
// The batch holds one column per schema field.
let ints = batch.column(0).as_primitive::<Int32Type>();
let floats = batch.column(1).as_primitive::<Float32Type>();
assert_eq!(ints.values(), &[0, 4]);
assert_eq!(floats.values(), &[3., 67.53]);
Or even complex nested types
// Row type combining a primitive column, a list of floats, and a list of
// optional nested structs.
#[derive(Serialize)]
struct MyStruct {
int32: i32,
list: Vec<f64>,
nested: Vec<Option<Nested>>,
}
impl MyStruct {
/// Returns the [`Fields`] for [`MyStruct`]
// Field names mirror MyStruct's fields; column order follows this list.
fn fields() -> Fields {
let nested = DataType::Struct(Nested::fields());
Fields::from([
Arc::new(Field::new("int32", DataType::Int32, false)),
// Non-nullable list of non-nullable Float64 elements, matching `Vec<f64>`.
Arc::new(Field::new_list(
"list",
Field::new("element", DataType::Float64, false),
false,
)),
// Elements are nullable because `nested` holds `Option<Nested>` values.
Arc::new(Field::new_list(
"nested",
Field::new("element", nested, true),
true,
)),
])
}
}
// Nested struct whose only field is a map from strings to lists of strings.
#[derive(Serialize)]
struct Nested {
map: BTreeMap<String, Vec<String>>
}
impl Nested {
/// Returns the [`Fields`] for [`Nested`]
fn fields() -> Fields {
let element = Field::new("element", DataType::Utf8, false);
Fields::from([
// Map column: Utf8 keys, each mapped to a non-nullable list of Utf8 values.
Arc::new(Field::new_map(
"map",
"entries",
Field::new("key", DataType::Utf8, false),
Field::new_list("value", element, false),
false, // sorted
false, // nullable
))
])
}
}
// Three rows: one with a mixed null/populated `nested` list, one with empty
// lists, and one whose `nested` list contains only a null.
let data = vec![
MyStruct {
int32: 34,
list: vec![1., 2., 34.],
nested: vec![
None,
Some(Nested {
// Collect the (key, values) pairs into the BTreeMap.
map: vec![
("key1".to_string(), vec!["foo".to_string(), "bar".to_string()]),
("key2".to_string(), vec!["baz".to_string()])
].into_iter().collect()
})
]
},
MyStruct {
int32: 56,
list: vec![],
nested: vec![]
},
MyStruct {
int32: 24,
list: vec![-1., 245.],
nested: vec![None]
}
];
let schema = Schema::new(MyStruct::fields());
let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
decoder.serialize(&data).unwrap();
let batch = decoder.flush().unwrap().unwrap();
assert_eq!(batch.num_rows(), 3);
assert_eq!(batch.num_columns(), 3);
// Convert to StructArray to format
let s = StructArray::from(batch);
// Render nulls as the literal text "null" in the formatted output below.
let options = FormatOptions::default().with_null("null");
let formatter = ArrayFormatter::try_new(&s, &options).unwrap();
assert_eq!(&formatter.value(0).to_string(), "{int32: 34, list: [1.0, 2.0, 34.0], nested: [null, {map: {key1: [foo, bar], key2: [baz]}}]}");
assert_eq!(&formatter.value(1).to_string(), "{int32: 56, list: [], nested: []}");
assert_eq!(&formatter.value(2).to_string(), "{int32: 24, list: [-1.0, 245.0], nested: [null]}");
Note: this ignores any batch size setting and always decodes all rows.
sourcepub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>
Flushes the currently buffered data to a `RecordBatch`.

Returns `Ok(None)` if there is no buffered data.

Note: if called partway through decoding a record, this will return an error.
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Decoder
impl !RefUnwindSafe for Decoder
impl Send for Decoder
impl !Sync for Decoder
impl Unpin for Decoder
impl !UnwindSafe for Decoder
Blanket Implementations§
source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
source§impl<T> Instrument for T
impl<T> Instrument for T
source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
source§impl<T> IntoEither for T
impl<T> IntoEither for T
source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts `self` into a `Left` variant of `Either<Self, Self>` if `into_left` is `true`.
Converts `self` into a `Right` variant of `Either<Self, Self>`
otherwise. Read moresource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts `self` into a `Left` variant of `Either<Self, Self>` if `into_left(&self)` returns `true`.
Converts `self` into a `Right` variant of `Either<Self, Self>`
otherwise. Read more