rdf_fusion_functions/scalar/dispatch/
binary.rs

1use datafusion::logical_expr::ColumnarValue;
2use rdf_fusion_encoding::TermEncoder;
3use rdf_fusion_encoding::plain_term::decoders::DefaultPlainTermDecoder;
4use rdf_fusion_encoding::plain_term::encoders::DefaultPlainTermEncoder;
5use rdf_fusion_encoding::plain_term::{
6    PlainTermArray, PlainTermEncoding, PlainTermScalar,
7};
8use rdf_fusion_encoding::typed_value::decoders::DefaultTypedValueDecoder;
9use rdf_fusion_encoding::typed_value::encoders::DefaultTypedValueEncoder;
10use rdf_fusion_encoding::typed_value::{
11    TypedValueArray, TypedValueEncoding, TypedValueScalar,
12};
13use rdf_fusion_encoding::{EncodingArray, EncodingDatum, EncodingScalar, TermDecoder};
14use rdf_fusion_model::DFResult;
15use rdf_fusion_model::{TermRef, ThinResult, TypedValue, TypedValueRef};
16
17pub fn dispatch_binary_typed_value<'data>(
18    lhs: &'data EncodingDatum<TypedValueEncoding>,
19    rhs: &'data EncodingDatum<TypedValueEncoding>,
20    op: impl for<'a> Fn(TypedValueRef<'a>, TypedValueRef<'a>) -> ThinResult<TypedValueRef<'a>>,
21    error_op: impl for<'a> Fn(
22        ThinResult<TypedValueRef<'a>>,
23        ThinResult<TypedValueRef<'a>>,
24    ) -> ThinResult<TypedValueRef<'a>>,
25) -> DFResult<ColumnarValue> {
26    match (lhs, rhs) {
27        (EncodingDatum::Array(lhs), EncodingDatum::Array(rhs)) => {
28            dispatch_binary_typed_value_array_array(lhs, rhs, op, error_op)
29        }
30        (EncodingDatum::Scalar(lhs, _), EncodingDatum::Array(rhs)) => {
31            dispatch_binary_typed_value_scalar_array(lhs, rhs, op, error_op)
32        }
33        (EncodingDatum::Array(lhs), EncodingDatum::Scalar(rhs, _)) => {
34            dispatch_binary_typed_value_array_scalar(lhs, rhs, op, error_op)
35        }
36        (EncodingDatum::Scalar(lhs, _), EncodingDatum::Scalar(rhs, _)) => {
37            dispatch_binary_typed_value_scalar_scalar(lhs, rhs, op, error_op)
38        }
39    }
40}
41
42fn dispatch_binary_typed_value_array_array<'data>(
43    lhs: &'data TypedValueArray,
44    rhs: &'data TypedValueArray,
45    op: impl for<'a> Fn(TypedValueRef<'a>, TypedValueRef<'a>) -> ThinResult<TypedValueRef<'a>>,
46    error_op: impl for<'a> Fn(
47        ThinResult<TypedValueRef<'a>>,
48        ThinResult<TypedValueRef<'a>>,
49    ) -> ThinResult<TypedValueRef<'a>>,
50) -> DFResult<ColumnarValue> {
51    let lhs = DefaultTypedValueDecoder::decode_terms(lhs);
52    let rhs = DefaultTypedValueDecoder::decode_terms(rhs);
53
54    let results = lhs.zip(rhs).map(|(lhs_value, rhs_value)| {
55        apply_binary_op(lhs_value, rhs_value, &op, &error_op)
56    });
57    let result = DefaultTypedValueEncoder::encode_terms(results)?;
58    Ok(ColumnarValue::Array(result.into_array()))
59}
60
61fn dispatch_binary_typed_value_scalar_array<'data>(
62    lhs: &'data TypedValueScalar,
63    rhs: &'data TypedValueArray,
64    op: impl for<'a> Fn(TypedValueRef<'a>, TypedValueRef<'a>) -> ThinResult<TypedValueRef<'a>>,
65    error_op: impl for<'a> Fn(
66        ThinResult<TypedValueRef<'a>>,
67        ThinResult<TypedValueRef<'a>>,
68    ) -> ThinResult<TypedValueRef<'a>>,
69) -> DFResult<ColumnarValue> {
70    let results = DefaultTypedValueDecoder::decode_terms(rhs).map(|rhs_value| {
71        let lhs_value = DefaultTypedValueDecoder::decode_term(lhs);
72        apply_binary_op(lhs_value, rhs_value, &op, &error_op)
73    });
74    let result = DefaultTypedValueEncoder::encode_terms(results)?;
75    Ok(ColumnarValue::Array(result.into_array()))
76}
77
78fn dispatch_binary_typed_value_array_scalar<'data>(
79    lhs: &'data TypedValueArray,
80    rhs: &'data TypedValueScalar,
81    op: impl for<'a> Fn(TypedValueRef<'a>, TypedValueRef<'a>) -> ThinResult<TypedValueRef<'a>>,
82    error_op: impl for<'a> Fn(
83        ThinResult<TypedValueRef<'a>>,
84        ThinResult<TypedValueRef<'a>>,
85    ) -> ThinResult<TypedValueRef<'a>>,
86) -> DFResult<ColumnarValue> {
87    let results = DefaultTypedValueDecoder::decode_terms(lhs).map(|lhs_value| {
88        let rhs_value = DefaultTypedValueDecoder::decode_term(rhs);
89        apply_binary_op(lhs_value, rhs_value, &op, &error_op)
90    });
91    let result = DefaultTypedValueEncoder::encode_terms(results)?;
92    Ok(ColumnarValue::Array(result.into_array()))
93}
94
95fn dispatch_binary_typed_value_scalar_scalar<'data>(
96    lhs: &'data TypedValueScalar,
97    rhs: &'data TypedValueScalar,
98    op: impl for<'a> Fn(TypedValueRef<'a>, TypedValueRef<'a>) -> ThinResult<TypedValueRef<'a>>,
99    error_op: impl for<'a> Fn(
100        ThinResult<TypedValueRef<'a>>,
101        ThinResult<TypedValueRef<'a>>,
102    ) -> ThinResult<TypedValueRef<'a>>,
103) -> DFResult<ColumnarValue> {
104    let lhs = DefaultTypedValueDecoder::decode_term(lhs);
105    let rhs = DefaultTypedValueDecoder::decode_term(rhs);
106
107    let result = apply_binary_op(lhs, rhs, &op, &error_op);
108    Ok(ColumnarValue::Scalar(
109        DefaultTypedValueEncoder::encode_term(result)?.into_scalar_value(),
110    ))
111}
112
113fn apply_binary_op<'data>(
114    lhs: ThinResult<TypedValueRef<'data>>,
115    rhs: ThinResult<TypedValueRef<'data>>,
116    op: impl for<'a> Fn(TypedValueRef<'a>, TypedValueRef<'a>) -> ThinResult<TypedValueRef<'a>>,
117    error_op: impl for<'a> Fn(
118        ThinResult<TypedValueRef<'a>>,
119        ThinResult<TypedValueRef<'a>>,
120    ) -> ThinResult<TypedValueRef<'a>>,
121) -> ThinResult<TypedValueRef<'data>> {
122    match (lhs, rhs) {
123        (Ok(lhs_value), Ok(rhs_value)) => op(lhs_value, rhs_value),
124        (lhs, rhs) => error_op(lhs, rhs),
125    }
126}
127
128pub fn dispatch_binary_owned_typed_value(
129    lhs: &EncodingDatum<TypedValueEncoding>,
130    rhs: &EncodingDatum<TypedValueEncoding>,
131    op: impl Fn(TypedValueRef<'_>, TypedValueRef<'_>) -> ThinResult<TypedValue>,
132    error_op: impl Fn(
133        ThinResult<TypedValueRef<'_>>,
134        ThinResult<TypedValueRef<'_>>,
135    ) -> ThinResult<TypedValue>,
136) -> DFResult<ColumnarValue> {
137    match (lhs, rhs) {
138        (EncodingDatum::Array(lhs), EncodingDatum::Array(rhs)) => {
139            dispatch_binary_owned_array_array(lhs, rhs, op, error_op)
140        }
141        (EncodingDatum::Scalar(lhs, _), EncodingDatum::Array(rhs)) => {
142            dispatch_binary_owned_scalar_array(lhs, rhs, op, error_op)
143        }
144        (EncodingDatum::Array(lhs), EncodingDatum::Scalar(rhs, _)) => {
145            dispatch_binary_owned_array_scalar(lhs, rhs, op, error_op)
146        }
147        (EncodingDatum::Scalar(lhs, _), EncodingDatum::Scalar(rhs, _)) => {
148            dispatch_binary_owned_scalar_scalar(lhs, rhs, op, error_op)
149        }
150    }
151}
152
153fn dispatch_binary_owned_array_array(
154    lhs: &TypedValueArray,
155    rhs: &TypedValueArray,
156    op: impl Fn(TypedValueRef<'_>, TypedValueRef<'_>) -> ThinResult<TypedValue>,
157    error_op: impl Fn(
158        ThinResult<TypedValueRef<'_>>,
159        ThinResult<TypedValueRef<'_>>,
160    ) -> ThinResult<TypedValue>,
161) -> DFResult<ColumnarValue> {
162    let lhs = DefaultTypedValueDecoder::decode_terms(lhs);
163    let rhs = DefaultTypedValueDecoder::decode_terms(rhs);
164
165    let results = lhs
166        .zip(rhs)
167        .map(|(lhs_value, rhs_value)| {
168            apply_binary_owned_op(lhs_value, rhs_value, &op, &error_op)
169        })
170        .collect::<Vec<_>>();
171
172    let result_refs = results
173        .iter()
174        .map(|result| match result {
175            Ok(value) => Ok(value.as_ref()),
176            Err(err) => Err(*err),
177        })
178        .collect::<Vec<_>>();
179    let result = DefaultTypedValueEncoder::encode_terms(result_refs)?;
180    Ok(ColumnarValue::Array(result.into_array()))
181}
182
183fn dispatch_binary_owned_scalar_array(
184    lhs: &TypedValueScalar,
185    rhs: &TypedValueArray,
186    op: impl Fn(TypedValueRef<'_>, TypedValueRef<'_>) -> ThinResult<TypedValue>,
187    error_op: impl Fn(
188        ThinResult<TypedValueRef<'_>>,
189        ThinResult<TypedValueRef<'_>>,
190    ) -> ThinResult<TypedValue>,
191) -> DFResult<ColumnarValue> {
192    let results = DefaultTypedValueDecoder::decode_terms(rhs)
193        .map(|rhs_value| {
194            let lhs_value = DefaultTypedValueDecoder::decode_term(lhs);
195            apply_binary_owned_op(lhs_value, rhs_value, &op, &error_op)
196        })
197        .collect::<Vec<_>>();
198
199    let result_refs = results
200        .iter()
201        .map(|result| match result {
202            Ok(value) => Ok(value.as_ref()),
203            Err(err) => Err(*err),
204        })
205        .collect::<Vec<_>>();
206    let result = DefaultTypedValueEncoder::encode_terms(result_refs)?;
207    Ok(ColumnarValue::Array(result.into_array()))
208}
209
210fn dispatch_binary_owned_array_scalar(
211    lhs: &TypedValueArray,
212    rhs: &TypedValueScalar,
213    op: impl Fn(TypedValueRef<'_>, TypedValueRef<'_>) -> ThinResult<TypedValue>,
214    error_op: impl Fn(
215        ThinResult<TypedValueRef<'_>>,
216        ThinResult<TypedValueRef<'_>>,
217    ) -> ThinResult<TypedValue>,
218) -> DFResult<ColumnarValue> {
219    let results = DefaultTypedValueDecoder::decode_terms(lhs)
220        .map(|lhs_value| {
221            let rhs_value = DefaultTypedValueDecoder::decode_term(rhs);
222            apply_binary_owned_op(lhs_value, rhs_value, &op, &error_op)
223        })
224        .collect::<Vec<_>>();
225
226    let result_refs = results
227        .iter()
228        .map(|result| match result {
229            Ok(value) => Ok(value.as_ref()),
230            Err(err) => Err(*err),
231        })
232        .collect::<Vec<_>>();
233    let result = DefaultTypedValueEncoder::encode_terms(result_refs)?;
234    Ok(ColumnarValue::Array(result.into_array()))
235}
236
237fn dispatch_binary_owned_scalar_scalar(
238    lhs: &TypedValueScalar,
239    rhs: &TypedValueScalar,
240    op: impl Fn(TypedValueRef<'_>, TypedValueRef<'_>) -> ThinResult<TypedValue>,
241    error_op: impl Fn(
242        ThinResult<TypedValueRef<'_>>,
243        ThinResult<TypedValueRef<'_>>,
244    ) -> ThinResult<TypedValue>,
245) -> DFResult<ColumnarValue> {
246    let lhs = DefaultTypedValueDecoder::decode_term(lhs);
247    let rhs = DefaultTypedValueDecoder::decode_term(rhs);
248
249    let result = apply_binary_owned_op(lhs, rhs, &op, &error_op);
250    let result_ref = match result.as_ref() {
251        Ok(typed_value) => Ok(typed_value.as_ref()),
252        Err(err) => Err(*err),
253    };
254    Ok(ColumnarValue::Scalar(
255        DefaultTypedValueEncoder::encode_term(result_ref)?.into_scalar_value(),
256    ))
257}
258
259fn apply_binary_owned_op(
260    lhs: ThinResult<TypedValueRef<'_>>,
261    rhs: ThinResult<TypedValueRef<'_>>,
262    op: impl Fn(TypedValueRef<'_>, TypedValueRef<'_>) -> ThinResult<TypedValue>,
263    error_op: impl Fn(
264        ThinResult<TypedValueRef<'_>>,
265        ThinResult<TypedValueRef<'_>>,
266    ) -> ThinResult<TypedValue>,
267) -> ThinResult<TypedValue> {
268    match (lhs, rhs) {
269        (Ok(lhs_value), Ok(rhs_value)) => op(lhs_value, rhs_value),
270        (lhs, rhs) => error_op(lhs, rhs),
271    }
272}
273
274pub fn dispatch_binary_plain_term<'data>(
275    lhs: &'data EncodingDatum<PlainTermEncoding>,
276    rhs: &'data EncodingDatum<PlainTermEncoding>,
277    op: impl for<'a> Fn(TermRef<'a>, TermRef<'a>) -> ThinResult<TermRef<'a>>,
278    error_op: impl for<'a> Fn(
279        ThinResult<TermRef<'a>>,
280        ThinResult<TermRef<'a>>,
281    ) -> ThinResult<TermRef<'a>>,
282) -> DFResult<ColumnarValue> {
283    match (lhs, rhs) {
284        (EncodingDatum::Array(lhs), EncodingDatum::Array(rhs)) => {
285            dispatch_binary_plain_term_array_array(lhs, rhs, op, error_op)
286        }
287        (EncodingDatum::Scalar(lhs, _), EncodingDatum::Array(rhs)) => {
288            dispatch_binary_plain_term_scalar_array(lhs, rhs, op, error_op)
289        }
290        (EncodingDatum::Array(lhs), EncodingDatum::Scalar(rhs, _)) => {
291            dispatch_binary_plain_term_array_scalar(lhs, rhs, op, error_op)
292        }
293        (EncodingDatum::Scalar(lhs, _), EncodingDatum::Scalar(rhs, _)) => {
294            dispatch_binary_plain_term_scalar_scalar(lhs, rhs, op, error_op)
295        }
296    }
297}
298
299fn dispatch_binary_plain_term_array_array<'data>(
300    lhs: &'data PlainTermArray,
301    rhs: &'data PlainTermArray,
302    op: impl for<'a> Fn(TermRef<'a>, TermRef<'a>) -> ThinResult<TermRef<'a>>,
303    error_op: impl for<'a> Fn(
304        ThinResult<TermRef<'a>>,
305        ThinResult<TermRef<'a>>,
306    ) -> ThinResult<TermRef<'a>>,
307) -> DFResult<ColumnarValue> {
308    let lhs = DefaultPlainTermDecoder::decode_terms(lhs);
309    let rhs = DefaultPlainTermDecoder::decode_terms(rhs);
310
311    let results = lhs.zip(rhs).map(|(lhs_value, rhs_value)| {
312        apply_binary_op_plain_term(lhs_value, rhs_value, &op, &error_op)
313    });
314    let result = DefaultPlainTermEncoder::encode_terms(results)?;
315    Ok(ColumnarValue::Array(result.into_array()))
316}
317
318fn dispatch_binary_plain_term_scalar_array<'data>(
319    lhs: &'data PlainTermScalar,
320    rhs: &'data PlainTermArray,
321    op: impl for<'a> Fn(TermRef<'a>, TermRef<'a>) -> ThinResult<TermRef<'a>>,
322    error_op: impl for<'a> Fn(
323        ThinResult<TermRef<'a>>,
324        ThinResult<TermRef<'a>>,
325    ) -> ThinResult<TermRef<'a>>,
326) -> DFResult<ColumnarValue> {
327    let results = DefaultPlainTermDecoder::decode_terms(rhs).map(|rhs_value| {
328        let lhs_value = DefaultPlainTermDecoder::decode_term(lhs);
329        apply_binary_op_plain_term(lhs_value, rhs_value, &op, &error_op)
330    });
331    let result = DefaultPlainTermEncoder::encode_terms(results)?;
332    Ok(ColumnarValue::Array(result.into_array()))
333}
334
335fn dispatch_binary_plain_term_array_scalar<'data>(
336    lhs: &'data PlainTermArray,
337    rhs: &'data PlainTermScalar,
338    op: impl for<'a> Fn(TermRef<'a>, TermRef<'a>) -> ThinResult<TermRef<'a>>,
339    error_op: impl for<'a> Fn(
340        ThinResult<TermRef<'a>>,
341        ThinResult<TermRef<'a>>,
342    ) -> ThinResult<TermRef<'a>>,
343) -> DFResult<ColumnarValue> {
344    let results = DefaultPlainTermDecoder::decode_terms(lhs).map(|lhs_value| {
345        let rhs_value = DefaultPlainTermDecoder::decode_term(rhs);
346        apply_binary_op_plain_term(lhs_value, rhs_value, &op, &error_op)
347    });
348    let result = DefaultPlainTermEncoder::encode_terms(results)?;
349    Ok(ColumnarValue::Array(result.into_array()))
350}
351
352fn dispatch_binary_plain_term_scalar_scalar<'data>(
353    lhs: &'data PlainTermScalar,
354    rhs: &'data PlainTermScalar,
355    op: impl for<'a> Fn(TermRef<'a>, TermRef<'a>) -> ThinResult<TermRef<'a>>,
356    error_op: impl for<'a> Fn(
357        ThinResult<TermRef<'a>>,
358        ThinResult<TermRef<'a>>,
359    ) -> ThinResult<TermRef<'a>>,
360) -> DFResult<ColumnarValue> {
361    let lhs = DefaultPlainTermDecoder::decode_term(lhs);
362    let rhs = DefaultPlainTermDecoder::decode_term(rhs);
363
364    let result = apply_binary_op_plain_term(lhs, rhs, &op, &error_op);
365    Ok(ColumnarValue::Scalar(
366        DefaultPlainTermEncoder::encode_term(result)?.into_scalar_value(),
367    ))
368}
369
370fn apply_binary_op_plain_term<'data>(
371    lhs: ThinResult<TermRef<'data>>,
372    rhs: ThinResult<TermRef<'data>>,
373    op: impl for<'a> Fn(TermRef<'a>, TermRef<'a>) -> ThinResult<TermRef<'a>>,
374    error_op: impl for<'a> Fn(
375        ThinResult<TermRef<'a>>,
376        ThinResult<TermRef<'a>>,
377    ) -> ThinResult<TermRef<'a>>,
378) -> ThinResult<TermRef<'data>> {
379    match (lhs, rhs) {
380        (Ok(lhs_value), Ok(rhs_value)) => op(lhs_value, rhs_value),
381        (lhs, rhs) => error_op(lhs, rhs),
382    }
383}