rdf_fusion_encoding/plain_term/decoders/
default.rs

1use crate::encoding::{EncodingArray, TermDecoder};
2use crate::plain_term::PlainTermEncoding;
3use crate::plain_term::encoding::PlainTermType;
4use crate::{EncodingScalar, TermEncoding};
5use datafusion::arrow::array::{
6    Array, AsArray, GenericStringArray, PrimitiveArray, StructArray,
7};
8use datafusion::arrow::datatypes::UInt8Type;
9use datafusion::common::ScalarValue;
10use rdf_fusion_model::{
11    BlankNodeRef, LiteralRef, NamedNodeRef, TermRef, ThinError, ThinResult,
12};
13
/// Stateless decoder for the plain term encoding.
///
/// The type carries no data; it only serves as the carrier of the
/// [`TermDecoder`] implementation for [`PlainTermEncoding`], which yields
/// borrowed [`TermRef`] values.
#[derive(Debug)]
pub struct DefaultPlainTermDecoder;
16
17/// Extracts a sequence of term references from the given array.
18impl TermDecoder<PlainTermEncoding> for DefaultPlainTermDecoder {
19    type Term<'data> = TermRef<'data>;
20
21    fn decode_terms(
22        array: &<PlainTermEncoding as TermEncoding>::Array,
23    ) -> impl Iterator<Item = ThinResult<Self::Term<'_>>> {
24        let array = array.array().as_struct();
25
26        let term_type = array.column(0).as_primitive::<UInt8Type>();
27
28        let value = array.column(1).as_string::<i32>();
29        let datatype = array.column(2).as_string::<i32>();
30        let language = array.column(3).as_string::<i32>();
31
32        (0..array.len())
33            .map(|idx| extract_term(array, term_type, value, datatype, language, idx))
34    }
35
36    fn decode_term(
37        scalar: &<PlainTermEncoding as TermEncoding>::Scalar,
38    ) -> ThinResult<Self::Term<'_>> {
39        let ScalarValue::Struct(array) = scalar.scalar_value() else {
40            panic!("Unexpected encoding. Should be ensured by the wrapping type.");
41        };
42
43        let term_type = array.column(0).as_primitive::<UInt8Type>();
44        let value = array.column(1).as_string::<i32>();
45        let datatype = array.column(2).as_string::<i32>();
46        let language = array.column(3).as_string::<i32>();
47
48        extract_term(array, term_type, value, datatype, language, 0)
49    }
50}
51
52fn extract_term<'data>(
53    array: &'data StructArray,
54    term_type: &'data PrimitiveArray<UInt8Type>,
55    value: &'data GenericStringArray<i32>,
56    datatype: &'data GenericStringArray<i32>,
57    language: &'data GenericStringArray<i32>,
58    idx: usize,
59) -> ThinResult<TermRef<'data>> {
60    array
61        .is_valid(idx)
62        .then(|| {
63            let term_type = PlainTermType::try_from(term_type.value(idx)).expect(
64                "Unexpected term type encoding. Should be ensured by the wrapping type.",
65            );
66            decode_term(value, datatype, language, idx, term_type)
67        })
68        .ok_or(ThinError::ExpectedError)
69}
70
71fn decode_term<'data>(
72    value: &'data GenericStringArray<i32>,
73    datatype: &'data GenericStringArray<i32>,
74    language: &'data GenericStringArray<i32>,
75    idx: usize,
76    term_type: PlainTermType,
77) -> TermRef<'data> {
78    match term_type {
79        PlainTermType::NamedNode => {
80            TermRef::NamedNode(NamedNodeRef::new_unchecked(value.value(idx)))
81        }
82        PlainTermType::BlankNode => {
83            TermRef::BlankNode(BlankNodeRef::new_unchecked(value.value(idx)))
84        }
85        PlainTermType::Literal => {
86            if language.is_valid(idx) {
87                TermRef::Literal(LiteralRef::new_language_tagged_literal_unchecked(
88                    value.value(idx),
89                    language.value(idx),
90                ))
91            } else {
92                TermRef::Literal(LiteralRef::new_typed_literal(
93                    value.value(idx),
94                    NamedNodeRef::new_unchecked(datatype.value(idx)),
95                ))
96            }
97        }
98    }
99}