rdf_fusion_encoding/plain_term/decoders/
default.rs1use crate::encoding::{EncodingArray, TermDecoder};
2use crate::plain_term::PlainTermEncoding;
3use crate::plain_term::encoding::PlainTermType;
4use crate::{EncodingScalar, TermEncoding};
5use datafusion::arrow::array::{
6 Array, AsArray, GenericStringArray, PrimitiveArray, StructArray,
7};
8use datafusion::arrow::datatypes::UInt8Type;
9use datafusion::common::ScalarValue;
10use rdf_fusion_model::{
11 BlankNodeRef, LiteralRef, NamedNodeRef, TermRef, ThinError, ThinResult,
12};
13
14#[derive(Debug)]
15pub struct DefaultPlainTermDecoder;
16
17impl TermDecoder<PlainTermEncoding> for DefaultPlainTermDecoder {
19 type Term<'data> = TermRef<'data>;
20
21 fn decode_terms(
22 array: &<PlainTermEncoding as TermEncoding>::Array,
23 ) -> impl Iterator<Item = ThinResult<Self::Term<'_>>> {
24 let array = array.array().as_struct();
25
26 let term_type = array.column(0).as_primitive::<UInt8Type>();
27
28 let value = array.column(1).as_string::<i32>();
29 let datatype = array.column(2).as_string::<i32>();
30 let language = array.column(3).as_string::<i32>();
31
32 (0..array.len())
33 .map(|idx| extract_term(array, term_type, value, datatype, language, idx))
34 }
35
36 fn decode_term(
37 scalar: &<PlainTermEncoding as TermEncoding>::Scalar,
38 ) -> ThinResult<Self::Term<'_>> {
39 let ScalarValue::Struct(array) = scalar.scalar_value() else {
40 panic!("Unexpected encoding. Should be ensured by the wrapping type.");
41 };
42
43 let term_type = array.column(0).as_primitive::<UInt8Type>();
44 let value = array.column(1).as_string::<i32>();
45 let datatype = array.column(2).as_string::<i32>();
46 let language = array.column(3).as_string::<i32>();
47
48 extract_term(array, term_type, value, datatype, language, 0)
49 }
50}
51
52fn extract_term<'data>(
53 array: &'data StructArray,
54 term_type: &'data PrimitiveArray<UInt8Type>,
55 value: &'data GenericStringArray<i32>,
56 datatype: &'data GenericStringArray<i32>,
57 language: &'data GenericStringArray<i32>,
58 idx: usize,
59) -> ThinResult<TermRef<'data>> {
60 array
61 .is_valid(idx)
62 .then(|| {
63 let term_type = PlainTermType::try_from(term_type.value(idx)).expect(
64 "Unexpected term type encoding. Should be ensured by the wrapping type.",
65 );
66 decode_term(value, datatype, language, idx, term_type)
67 })
68 .ok_or(ThinError::ExpectedError)
69}
70
71fn decode_term<'data>(
72 value: &'data GenericStringArray<i32>,
73 datatype: &'data GenericStringArray<i32>,
74 language: &'data GenericStringArray<i32>,
75 idx: usize,
76 term_type: PlainTermType,
77) -> TermRef<'data> {
78 match term_type {
79 PlainTermType::NamedNode => {
80 TermRef::NamedNode(NamedNodeRef::new_unchecked(value.value(idx)))
81 }
82 PlainTermType::BlankNode => {
83 TermRef::BlankNode(BlankNodeRef::new_unchecked(value.value(idx)))
84 }
85 PlainTermType::Literal => {
86 if language.is_valid(idx) {
87 TermRef::Literal(LiteralRef::new_language_tagged_literal_unchecked(
88 value.value(idx),
89 language.value(idx),
90 ))
91 } else {
92 TermRef::Literal(LiteralRef::new_typed_literal(
93 value.value(idx),
94 NamedNodeRef::new_unchecked(datatype.value(idx)),
95 ))
96 }
97 }
98 }
99}