pub struct Decoder { /* private fields */ }Expand description
A low-level interface for reading JSON data from a byte stream
See Reader for a higher-level interface for use with BufRead.
The push-based interface facilitates integration with sources that yield arbitrarily
delimited byte ranges, such as BufRead, or a chunked byte stream received from
object storage.
/// Streams [`RecordBatch`]es out of a [`BufRead`] by pushing its buffered
/// chunks through a push-based decoder built from `schema`.
///
/// Every call to the returned iterator's `next` feeds bytes to the decoder until
/// the input is exhausted or the decoder stops short of consuming a whole chunk,
/// and then flushes whatever rows are buffered.
///
/// # Errors
///
/// Returns an error if the decoder cannot be built from `schema`. The iterator
/// yields an error if reading from `reader`, decoding, or flushing fails; per the
/// decoder's documentation, flushing part way through a record is an error, so
/// input that ends mid-record surfaces there.
fn read_from_json<R: BufRead>(
mut reader: R,
schema: SchemaRef,
) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
// Produces the next batch: `Ok(Some(batch))`, `Ok(None)` once nothing is buffered, or `Err`
let mut next = move || {
loop {
// The decoder does not require `buf` to end on a record boundary
let buf = reader.fill_buf()?;
if buf.is_empty() {
break; // Input exhausted
}
let read = buf.len();
let decoded = decoder.decode(buf)?;
// Mark only the bytes the decoder accepted as consumed, so that any
// remainder is offered again by the next `fill_buf`
reader.consume(decoded);
if decoded != read {
break; // Decoder stopped before the end of `buf`: batch size reached
}
}
decoder.flush()
};
// `transpose` maps `Ok(None)` to `None`, which ends the iterator
Ok(std::iter::from_fn(move || next().transpose()))
}Implementations§
Source§impl Decoder
impl Decoder
Sourcepub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>
pub fn decode(&mut self, buf: &[u8]) -> Result<usize, ArrowError>
Read JSON objects from buf, returning the number of bytes read
This method returns once batch_size objects have been parsed since the
last call to Self::flush, or buf is exhausted. Any remaining bytes
should be included in the next call to Self::decode
There is no requirement that buf contains a whole number of records, facilitating
integration with arbitrary byte streams, such as those yielded by BufRead
Sourcepub fn serialize<S>(&mut self, rows: &[S]) -> Result<(), ArrowError>where
S: Serialize,
pub fn serialize<S>(&mut self, rows: &[S]) -> Result<(), ArrowError>where
S: Serialize,
Serialize rows to this Decoder
This provides a simple way to convert serde-compatible data structures into an arrow
RecordBatch.
Custom conversion logic as described in arrow_array::builder will likely outperform this, especially where the schema is known at compile-time. However, this provides a mechanism to get something up and running quickly.
It can be used with serde_json::Value:
// Two loosely-typed rows, each carrying a single "float" entry
let rows = vec![json!({"float": 2.3}), json!({"float": 5.7})];

// One nullable Float32 column named after the JSON key
let float_field = Field::new("float", DataType::Float32, true);
let schema = Arc::new(Schema::new(vec![float_field]));

let mut decoder = ReaderBuilder::new(schema).build_decoder().unwrap();
decoder.serialize(&rows).unwrap();

// Everything serialized so far comes back as a single batch
let batch = decoder.flush().unwrap().unwrap();
assert_eq!(batch.num_rows(), 2);
assert_eq!(batch.num_columns(), 1);

let column = batch.column(0);
let values = column.as_primitive::<Float32Type>().values();
assert_eq!(values, &[2.3, 5.7])
Or with arbitrary Serialize types:
/// Example row type; deriving `Serialize` is what lets a slice of these be
/// passed to `Decoder::serialize`.
#[derive(Serialize)]
struct MyStruct {
// Field names must match the column names declared in the schema
int32: i32,
float: f32,
}
// One non-nullable column per struct field, in declaration order
let fields = vec![
    Field::new("int32", DataType::Int32, false),
    Field::new("float", DataType::Float32, false),
];
let schema = Arc::new(Schema::new(fields));

let records = vec![
    MyStruct { int32: 0, float: 3. },
    MyStruct { int32: 4, float: 67.53 },
];

let mut decoder = ReaderBuilder::new(schema).build_decoder().unwrap();
decoder.serialize(&records).unwrap();
let batch = decoder.flush().unwrap().unwrap();

// The batch is expected to hold two columns, one per struct field
let int32 = batch.column(0).as_primitive::<Int32Type>();
assert_eq!(int32.values(), &[0, 4]);
let float = batch.column(1).as_primitive::<Float32Type>();
assert_eq!(float.values(), &[3., 67.53]);
Or even complex nested types:
/// Example row type mixing a scalar, a list of scalars, and a list of optional
/// nested structs.
#[derive(Serialize)]
struct MyStruct {
int32: i32,
list: Vec<f64>,
// `None` entries are the null elements of the "nested" list
nested: Vec<Option<Nested>>,
}
impl MyStruct {
    /// Builds the arrow [`Fields`] that mirror the layout of [`MyStruct`]
    fn fields() -> Fields {
        // Plain non-nullable scalar column
        let int32 = Field::new("int32", DataType::Int32, false);

        // Non-nullable list of non-nullable Float64 elements
        let list_item = Field::new("element", DataType::Float64, false);
        let list = Field::new_list("list", list_item, false);

        // Nullable list whose elements are nullable `Nested` structs
        let nested_type = DataType::Struct(Nested::fields());
        let nested_item = Field::new("element", nested_type, true);
        let nested = Field::new_list("nested", nested_item, true);

        Fields::from([Arc::new(int32), Arc::new(list), Arc::new(nested)])
    }
}
/// Inner struct of the nested example, holding a single string-keyed map.
#[derive(Serialize)]
struct Nested {
// Each key maps to a list of strings
map: BTreeMap<String, Vec<String>>
}
impl Nested {
    /// Builds the arrow [`Fields`] that mirror the layout of [`Nested`]
    fn fields() -> Fields {
        // Map keys are non-nullable strings
        let keys = Field::new("key", DataType::Utf8, false);

        // Map values are non-nullable lists of non-nullable strings
        let strings = Field::new("element", DataType::Utf8, false);
        let values = Field::new_list("value", strings, false);

        let sorted = false;
        let nullable = false;
        let map = Field::new_map("map", "entries", keys, values, sorted, nullable);

        Fields::from([Arc::new(map)])
    }
}
// First row: a populated list plus one null and one non-null nested entry
let populated = MyStruct {
    int32: 34,
    list: vec![1., 2., 34.],
    nested: vec![
        None,
        Some(Nested {
            map: vec![
                (String::from("key1"), vec![String::from("foo"), String::from("bar")]),
                (String::from("key2"), vec![String::from("baz")]),
            ]
            .into_iter()
            .collect(),
        }),
    ],
};
// Second row: both list columns empty
let empty = MyStruct {
    int32: 56,
    list: vec![],
    nested: vec![],
};
// Third row: the nested list holds only a null
let null_only = MyStruct {
    int32: 24,
    list: vec![-1., 245.],
    nested: vec![None],
};
let data = vec![populated, empty, null_only];

let schema = Arc::new(Schema::new(MyStruct::fields()));
let mut decoder = ReaderBuilder::new(schema).build_decoder().unwrap();
decoder.serialize(&data).unwrap();

let batch = decoder.flush().unwrap().unwrap();
assert_eq!(batch.num_rows(), 3);
assert_eq!(batch.num_columns(), 3);

// Wrap the batch in a StructArray so that each row can be rendered as text
let rows = StructArray::from(batch);
let options = FormatOptions::default().with_null("null");
let formatter = ArrayFormatter::try_new(&rows, &options).unwrap();

let first = formatter.value(0).to_string();
assert_eq!(&first, "{int32: 34, list: [1.0, 2.0, 34.0], nested: [null, {map: {key1: [foo, bar], key2: [baz]}}]}");
let second = formatter.value(1).to_string();
assert_eq!(&second, "{int32: 56, list: [], nested: []}");
assert_eq!(&formatter.value(2).to_string(), "{int32: 24, list: [-1.0, 245.0], nested: [null]}");
Note: this ignores any batch size setting, and always decodes all rows.
Sourcepub fn has_partial_record(&self) -> bool
pub fn has_partial_record(&self) -> bool
True if the decoder is currently part way through decoding a record.
Sourcepub fn len(&self) -> usize
pub fn len(&self) -> usize
The number of unflushed records, including the partially decoded record (if any).
Sourcepub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError>
Flushes the currently buffered data to a RecordBatch
Returns Ok(None) if no buffered data, i.e. Self::is_empty is true.
Note: This will return an error if called part way through decoding a record,
i.e. Self::has_partial_record is true.
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Decoder
impl !RefUnwindSafe for Decoder
impl Send for Decoder
impl !Sync for Decoder
impl Unpin for Decoder
impl !UnwindSafe for Decoder
Blanket Implementations§
Source§impl<T> AlignerFor<1> for T
impl<T> AlignerFor<1> for T
Source§impl<T> AlignerFor<1024> for T
impl<T> AlignerFor<1024> for T
Source§type Aligner = AlignTo1024<T>
type Aligner = AlignTo1024<T>
AlignTo* type which aligns Self to ALIGNMENT.Source§impl<T> AlignerFor<128> for T
impl<T> AlignerFor<128> for T
Source§type Aligner = AlignTo128<T>
type Aligner = AlignTo128<T>
AlignTo* type which aligns Self to ALIGNMENT.Source§impl<T> AlignerFor<16> for T
impl<T> AlignerFor<16> for T
Source§impl<T> AlignerFor<16384> for T
impl<T> AlignerFor<16384> for T
Source§type Aligner = AlignTo16384<T>
type Aligner = AlignTo16384<T>
AlignTo* type which aligns Self to ALIGNMENT.Source§impl<T> AlignerFor<2> for T
impl<T> AlignerFor<2> for T
Source§impl<T> AlignerFor<2048> for T
impl<T> AlignerFor<2048> for T
Source§type Aligner = AlignTo2048<T>
type Aligner = AlignTo2048<T>
AlignTo* type which aligns Self to ALIGNMENT.Source§impl<T> AlignerFor<256> for T
impl<T> AlignerFor<256> for T
Source§type Aligner = AlignTo256<T>
type Aligner = AlignTo256<T>
AlignTo* type which aligns Self to ALIGNMENT.Source§impl<T> AlignerFor<32> for T
impl<T> AlignerFor<32> for T
Source§impl<T> AlignerFor<32768> for T
impl<T> AlignerFor<32768> for T
Source§type Aligner = AlignTo32768<T>
type Aligner = AlignTo32768<T>
AlignTo* type which aligns Self to ALIGNMENT.Source§impl<T> AlignerFor<4> for T
impl<T> AlignerFor<4> for T
Source§impl<T> AlignerFor<4096> for T
impl<T> AlignerFor<4096> for T
Source§type Aligner = AlignTo4096<T>
type Aligner = AlignTo4096<T>
AlignTo* type which aligns Self to ALIGNMENT.Source§impl<T> AlignerFor<512> for T
impl<T> AlignerFor<512> for T
Source§type Aligner = AlignTo512<T>
type Aligner = AlignTo512<T>
AlignTo* type which aligns Self to ALIGNMENT.Source§impl<T> AlignerFor<64> for T
impl<T> AlignerFor<64> for T
Source§impl<T> AlignerFor<8> for T
impl<T> AlignerFor<8> for T
Source§impl<T> AlignerFor<8192> for T
impl<T> AlignerFor<8192> for T
Source§type Aligner = AlignTo8192<T>
type Aligner = AlignTo8192<T>
AlignTo* type which aligns Self to ALIGNMENT.Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T, W> HasTypeWitness<W> for Twhere
W: MakeTypeWitness<Arg = T>,
T: ?Sized,
impl<T, W> HasTypeWitness<W> for Twhere
W: MakeTypeWitness<Arg = T>,
T: ?Sized,
Source§impl<T> Identity for Twhere
T: ?Sized,
impl<T> Identity for Twhere
T: ?Sized,
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise. Read moreSource§impl<T> PolicyExt for Twhere
T: ?Sized,
impl<T> PolicyExt for Twhere
T: ?Sized,
Source§impl<S> ROExtAcc for S
impl<S> ROExtAcc for S
Source§fn f_get<F>(&self, offset: FieldOffset<S, F, Aligned>) -> &F
fn f_get<F>(&self, offset: FieldOffset<S, F, Aligned>) -> &F
offset. Read moreSource§fn f_get_mut<F>(&mut self, offset: FieldOffset<S, F, Aligned>) -> &mut F
fn f_get_mut<F>(&mut self, offset: FieldOffset<S, F, Aligned>) -> &mut F
offset. Read moreSource§fn f_get_ptr<F, A>(&self, offset: FieldOffset<S, F, A>) -> *const F
fn f_get_ptr<F, A>(&self, offset: FieldOffset<S, F, A>) -> *const F
offset. Read moreSource§fn f_get_mut_ptr<F, A>(&mut self, offset: FieldOffset<S, F, A>) -> *mut F
fn f_get_mut_ptr<F, A>(&mut self, offset: FieldOffset<S, F, A>) -> *mut F
offset. Read moreSource§impl<S> ROExtOps<Aligned> for S
impl<S> ROExtOps<Aligned> for S
Source§fn f_replace<F>(&mut self, offset: FieldOffset<S, F, Aligned>, value: F) -> F
fn f_replace<F>(&mut self, offset: FieldOffset<S, F, Aligned>, value: F) -> F
offset) with value,
returning the previous value of the field. Read moreSource§fn f_get_copy<F>(&self, offset: FieldOffset<S, F, Aligned>) -> Fwhere
F: Copy,
fn f_get_copy<F>(&self, offset: FieldOffset<S, F, Aligned>) -> Fwhere
F: Copy,
Source§impl<S> ROExtOps<Unaligned> for S
impl<S> ROExtOps<Unaligned> for S
Source§fn f_replace<F>(&mut self, offset: FieldOffset<S, F, Unaligned>, value: F) -> F
fn f_replace<F>(&mut self, offset: FieldOffset<S, F, Unaligned>, value: F) -> F
offset) with value,
returning the previous value of the field. Read moreSource§fn f_get_copy<F>(&self, offset: FieldOffset<S, F, Unaligned>) -> Fwhere
F: Copy,
fn f_get_copy<F>(&self, offset: FieldOffset<S, F, Unaligned>) -> Fwhere
F: Copy,
Source§impl<T> SelfOps for Twhere
T: ?Sized,
impl<T> SelfOps for Twhere
T: ?Sized,
Source§fn piped<F, U>(self, f: F) -> U
fn piped<F, U>(self, f: F) -> U
Source§fn piped_ref<'a, F, U>(&'a self, f: F) -> Uwhere
F: FnOnce(&'a Self) -> U,
fn piped_ref<'a, F, U>(&'a self, f: F) -> Uwhere
F: FnOnce(&'a Self) -> U,
The same as piped, except that the function takes &Self.
Useful for functions that take &Self instead of Self. Read moreSource§fn piped_mut<'a, F, U>(&'a mut self, f: F) -> Uwhere
F: FnOnce(&'a mut Self) -> U,
fn piped_mut<'a, F, U>(&'a mut self, f: F) -> Uwhere
F: FnOnce(&'a mut Self) -> U,
The same as piped, except that the function takes &mut Self.
Useful for functions that take &mut Self instead of Self.Source§fn mutated<F>(self, f: F) -> Self
fn mutated<F>(self, f: F) -> Self
Source§fn observe<F>(self, f: F) -> Self
fn observe<F>(self, f: F) -> Self
Source§fn as_ref_<T>(&self) -> &T
fn as_ref_<T>(&self) -> &T
AsRef,
using the turbofish .as_ref_::<_>() syntax. Read more