pub struct Decoder { /* private fields */ }
Expand description
A low-level interface for reading JSON data from a byte stream
See Reader
for a higher-level interface for use with BufRead
The push-based interface facilitates integration with sources that yield arbitrarily
delimited byte ranges, such as BufRead
, or a chunked byte stream received from
object storage
/// Lazily decodes JSON read from `reader` into [`RecordBatch`]es described by `schema`.
///
/// Every step of the returned iterator feeds buffered bytes into a decoder until
/// either the input runs dry or the decoder stops before consuming the whole
/// buffer, and then flushes whatever has been decoded so far.
///
/// # Errors
///
/// Returns an error if the decoder cannot be built. The iterator yields an error
/// if reading from `reader`, decoding, or flushing fails.
fn read_from_json<R: BufRead>(
    mut reader: R,
    schema: SchemaRef,
) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
    let mut decoder = ReaderBuilder::new(schema).build_decoder()?;

    let mut next_batch = move || -> Result<Option<RecordBatch>, ArrowError> {
        loop {
            // The decoder does not care whether the chunk ends mid-record
            let chunk = reader.fill_buf()?;
            let available = chunk.len();
            if available == 0 {
                // Input exhausted
                break;
            }

            let consumed = decoder.decode(chunk)?;
            // Advance the reader past exactly the bytes the decoder accepted
            reader.consume(consumed);

            if consumed != available {
                // The decoder stopped before the end of the chunk: batch size reached
                break;
            }
        }
        decoder.flush()
    };

    Ok(std::iter::from_fn(move || next_batch().transpose()))
}
Implementations§
Source§impl Decoder
impl Decoder
Sourcepub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>
pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>
Read JSON objects from buf
, returning the number of bytes read
This method returns once batch_size
objects have been parsed since the
last call to Self::flush
, or buf
is exhausted. Any remaining bytes
should be included in the next call to Self::decode
There is no requirement that buf
contains a whole number of records, facilitating
integration with arbitrary byte streams, such as those yielded by BufRead
Sourcepub fn serialize<S>(&mut self, rows: &[S]) -> Result<(), ArrowError>where
S: Serialize,
pub fn serialize<S>(&mut self, rows: &[S]) -> Result<(), ArrowError>where
S: Serialize,
Serialize rows
to this Decoder
This provides a simple way to convert serde-compatible data structures into arrow
RecordBatch
.
Custom conversion logic as described in arrow_array::builder will likely outperform this, especially where the schema is known at compile-time. However, this provides a mechanism to get something up and running quickly
It can be used with serde_json::Value
// Two JSON objects, each carrying a single "float" entry
let rows = vec![json!({"float": 2.3}), json!({"float": 5.7})];

// The schema declares one Float32 column named after that entry
let float_field = Field::new("float", DataType::Float32, true);
let schema = Arc::new(Schema::new(vec![float_field]));

let mut decoder = ReaderBuilder::new(schema).build_decoder().unwrap();
decoder.serialize(&rows).unwrap();
let batch = decoder.flush().unwrap().unwrap();

assert_eq!(batch.num_rows(), 2);
assert_eq!(batch.num_columns(), 1);

let floats = batch.column(0).as_primitive::<Float32Type>();
assert_eq!(floats.values(), &[2.3, 5.7])
Or with arbitrary Serialize
types
// Any type implementing `Serialize` can act as a row
#[derive(Serialize)]
struct MyStruct {
    int32: i32,
    float: f32,
}

// One column per struct field, named after the field
let columns = vec![
    Field::new("int32", DataType::Int32, false),
    Field::new("float", DataType::Float32, false),
];
let schema = Arc::new(Schema::new(columns));

let records = vec![
    MyStruct { int32: 0, float: 3. },
    MyStruct { int32: 4, float: 67.53 },
];

let mut decoder = ReaderBuilder::new(schema).build_decoder().unwrap();
decoder.serialize(&records).unwrap();
let batch = decoder.flush().unwrap().unwrap();

// Column 0 holds the `int32` values, column 1 the `float` values
let int_column = batch.column(0).as_primitive::<Int32Type>();
assert_eq!(int_column.values(), &[0, 4]);
let float_column = batch.column(1).as_primitive::<Float32Type>();
assert_eq!(float_column.values(), &[3., 67.53]);
Or even complex nested types
// Example: serialize rows containing a list, a list of optional structs and a
// map, then check the formatted contents of the resulting batch.

// Row type. Its field names (int32, list, nested) are the same names used for
// the `Field`s declared in `MyStruct::fields` below.
#[derive(Serialize)]
struct MyStruct {
int32: i32,
list: Vec<f64>,
nested: Vec<Option<Nested>>,
}
impl MyStruct {
/// Returns the [`Fields`] for [`MyStruct`]
///
/// Declares one field per struct member: `int32`, a `list` of `Float64`
/// elements, and a `nested` list whose elements are structs built from
/// `Nested::fields()`.
fn fields() -> Fields {
// Struct type for the elements of the `nested` list
let nested = DataType::Struct(Nested::fields());
Fields::from([
Arc::new(Field::new("int32", DataType::Int32, false)),
Arc::new(Field::new_list(
"list",
Field::new("element", DataType::Float64, false),
false,
)),
// NOTE(review): the `true` flags here presumably mark the elements and the
// list itself as nullable (the rows below contain `None` elements) — confirm
// against the `Field::new` / `Field::new_list` documentation.
Arc::new(Field::new_list(
"nested",
Field::new("element", nested, true),
true,
)),
])
}
}
// Element type of `MyStruct::nested`; holds a single map from string keys to
// lists of strings.
#[derive(Serialize)]
struct Nested {
map: BTreeMap<String, Vec<String>>
}
impl Nested {
/// Returns the [`Fields`] for [`Nested`]
///
/// Declares a single `map` field with `Utf8` keys and list-of-`Utf8` values.
fn fields() -> Fields {
// Item field shared by every value list in the map
let element = Field::new("element", DataType::Utf8, false);
Fields::from([
Arc::new(Field::new_map(
"map",
"entries",
Field::new("key", DataType::Utf8, false),
Field::new_list("value", element, false),
false, // sorted
false, // nullable
))
])
}
}
// Three rows: one with populated `list`/`nested` values (including a `None`
// element), one where both lists are empty, and one whose `nested` list holds
// only `None`.
let data = vec![
MyStruct {
int32: 34,
list: vec![1., 2., 34.],
nested: vec![
None,
Some(Nested {
map: vec![
("key1".to_string(), vec!["foo".to_string(), "bar".to_string()]),
("key2".to_string(), vec!["baz".to_string()])
].into_iter().collect()
})
]
},
MyStruct {
int32: 56,
list: vec![],
nested: vec![]
},
MyStruct {
int32: 24,
list: vec![-1., 245.],
nested: vec![None]
}
];
// Build the schema from the hand-written field definitions, serialize every
// row into the decoder and flush the result into a single batch.
let schema = Schema::new(MyStruct::fields());
let mut decoder = ReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
decoder.serialize(&data).unwrap();
let batch = decoder.flush().unwrap().unwrap();
// One row per element of `data`, one column per field of `MyStruct`
assert_eq!(batch.num_rows(), 3);
assert_eq!(batch.num_columns(), 3);
// Convert to StructArray to format
let s = StructArray::from(batch);
// `with_null("null")` presumably sets the text used for null entries; the
// expected strings below show `None` elements rendered as `null`.
let options = FormatOptions::default().with_null("null");
let formatter = ArrayFormatter::try_new(&s, &options).unwrap();
// Each formatted row should mirror the corresponding entry of `data`
assert_eq!(&formatter.value(0).to_string(), "{int32: 34, list: [1.0, 2.0, 34.0], nested: [null, {map: {key1: [foo, bar], key2: [baz]}}]}");
assert_eq!(&formatter.value(1).to_string(), "{int32: 56, list: [], nested: []}");
assert_eq!(&formatter.value(2).to_string(), "{int32: 24, list: [-1.0, 245.0], nested: [null]}");
Note: this ignores any batch size setting, and always decodes all rows
Sourcepub fn has_partial_record(&self) -> bool
pub fn has_partial_record(&self) -> bool
True if the decoder is currently part way through decoding a record.
Sourcepub fn len(&self) -> usize
pub fn len(&self) -> usize
The number of unflushed records, including the partially decoded record (if any).
Sourcepub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>
Flushes the currently buffered data to a RecordBatch
Returns Ok(None)
if no buffered data, i.e. Self::is_empty
is true.
Note: This will return an error if called part way through decoding a record,
i.e. Self::has_partial_record
is true.
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Decoder
impl !RefUnwindSafe for Decoder
impl Send for Decoder
impl !Sync for Decoder
impl Unpin for Decoder
impl !UnwindSafe for Decoder
Blanket Implementations§
Source§impl<T> AlignerFor<1> for T
impl<T> AlignerFor<1> for T
Source§impl<T> AlignerFor<1024> for T
impl<T> AlignerFor<1024> for T
Source§type Aligner = AlignTo1024<T>
type Aligner = AlignTo1024<T>
AlignTo*
type which aligns Self
to ALIGNMENT
.Source§impl<T> AlignerFor<128> for T
impl<T> AlignerFor<128> for T
Source§type Aligner = AlignTo128<T>
type Aligner = AlignTo128<T>
AlignTo*
type which aligns Self
to ALIGNMENT
.Source§impl<T> AlignerFor<16> for T
impl<T> AlignerFor<16> for T
Source§impl<T> AlignerFor<16384> for T
impl<T> AlignerFor<16384> for T
Source§type Aligner = AlignTo16384<T>
type Aligner = AlignTo16384<T>
AlignTo*
type which aligns Self
to ALIGNMENT
.Source§impl<T> AlignerFor<2> for T
impl<T> AlignerFor<2> for T
Source§impl<T> AlignerFor<2048> for T
impl<T> AlignerFor<2048> for T
Source§type Aligner = AlignTo2048<T>
type Aligner = AlignTo2048<T>
AlignTo*
type which aligns Self
to ALIGNMENT
.Source§impl<T> AlignerFor<256> for T
impl<T> AlignerFor<256> for T
Source§type Aligner = AlignTo256<T>
type Aligner = AlignTo256<T>
AlignTo*
type which aligns Self
to ALIGNMENT
.Source§impl<T> AlignerFor<32> for T
impl<T> AlignerFor<32> for T
Source§impl<T> AlignerFor<32768> for T
impl<T> AlignerFor<32768> for T
Source§type Aligner = AlignTo32768<T>
type Aligner = AlignTo32768<T>
AlignTo*
type which aligns Self
to ALIGNMENT
.Source§impl<T> AlignerFor<4> for T
impl<T> AlignerFor<4> for T
Source§impl<T> AlignerFor<4096> for T
impl<T> AlignerFor<4096> for T
Source§type Aligner = AlignTo4096<T>
type Aligner = AlignTo4096<T>
AlignTo*
type which aligns Self
to ALIGNMENT
.Source§impl<T> AlignerFor<512> for T
impl<T> AlignerFor<512> for T
Source§type Aligner = AlignTo512<T>
type Aligner = AlignTo512<T>
AlignTo*
type which aligns Self
to ALIGNMENT
.Source§impl<T> AlignerFor<64> for T
impl<T> AlignerFor<64> for T
Source§impl<T> AlignerFor<8> for T
impl<T> AlignerFor<8> for T
Source§impl<T> AlignerFor<8192> for T
impl<T> AlignerFor<8192> for T
Source§type Aligner = AlignTo8192<T>
type Aligner = AlignTo8192<T>
AlignTo*
type which aligns Self
to ALIGNMENT
.Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§impl<S> ROExtAcc for S
impl<S> ROExtAcc for S
Source§fn f_get<F>(&self, offset: FieldOffset<S, F, Aligned>) -> &F
fn f_get<F>(&self, offset: FieldOffset<S, F, Aligned>) -> &F
offset
. Read moreSource§fn f_get_mut<F>(&mut self, offset: FieldOffset<S, F, Aligned>) -> &mut F
fn f_get_mut<F>(&mut self, offset: FieldOffset<S, F, Aligned>) -> &mut F
offset
. Read moreSource§fn f_get_ptr<F, A>(&self, offset: FieldOffset<S, F, A>) -> *const F
fn f_get_ptr<F, A>(&self, offset: FieldOffset<S, F, A>) -> *const F
offset
. Read moreSource§fn f_get_mut_ptr<F, A>(&mut self, offset: FieldOffset<S, F, A>) -> *mut F
fn f_get_mut_ptr<F, A>(&mut self, offset: FieldOffset<S, F, A>) -> *mut F
offset
. Read moreSource§impl<S> ROExtOps<Aligned> for S
impl<S> ROExtOps<Aligned> for S
Source§fn f_replace<F>(&mut self, offset: FieldOffset<S, F, Aligned>, value: F) -> F
fn f_replace<F>(&mut self, offset: FieldOffset<S, F, Aligned>, value: F) -> F
offset
) with value
,
returning the previous value of the field. Read moreSource§fn f_get_copy<F>(&self, offset: FieldOffset<S, F, Aligned>) -> Fwhere
F: Copy,
fn f_get_copy<F>(&self, offset: FieldOffset<S, F, Aligned>) -> Fwhere
F: Copy,
Source§impl<S> ROExtOps<Unaligned> for S
impl<S> ROExtOps<Unaligned> for S
Source§fn f_replace<F>(&mut self, offset: FieldOffset<S, F, Unaligned>, value: F) -> F
fn f_replace<F>(&mut self, offset: FieldOffset<S, F, Unaligned>, value: F) -> F
offset
) with value
,
returning the previous value of the field. Read moreSource§fn f_get_copy<F>(&self, offset: FieldOffset<S, F, Unaligned>) -> Fwhere
F: Copy,
fn f_get_copy<F>(&self, offset: FieldOffset<S, F, Unaligned>) -> Fwhere
F: Copy,
Source§impl<T> SelfOps for Twhere
T: ?Sized,
impl<T> SelfOps for Twhere
T: ?Sized,
Source§fn piped<F, U>(self, f: F) -> U
fn piped<F, U>(self, f: F) -> U
Source§fn piped_ref<'a, F, U>(&'a self, f: F) -> Uwhere
F: FnOnce(&'a Self) -> U,
fn piped_ref<'a, F, U>(&'a self, f: F) -> Uwhere
F: FnOnce(&'a Self) -> U,
piped
except that the function takes &Self
Useful for functions that take &Self
instead of Self
. Read moreSource§fn piped_mut<'a, F, U>(&'a mut self, f: F) -> Uwhere
F: FnOnce(&'a mut Self) -> U,
fn piped_mut<'a, F, U>(&'a mut self, f: F) -> Uwhere
F: FnOnce(&'a mut Self) -> U,
piped
, except that the function takes &mut Self
.
Useful for functions that take &mut Self
instead of Self
.Source§fn mutated<F>(self, f: F) -> Self
fn mutated<F>(self, f: F) -> Self
Source§fn observe<F>(self, f: F) -> Self
fn observe<F>(self, f: F) -> Self
Source§fn as_ref_<T>(&self) -> &T
fn as_ref_<T>(&self) -> &T
AsRef
,
using the turbofish .as_ref_::<_>()
syntax. Read more