1use datafusion::logical_expr::ColumnarValue;
2use rdf_fusion_encoding::TermEncoder;
3use rdf_fusion_encoding::plain_term::decoders::DefaultPlainTermDecoder;
4use rdf_fusion_encoding::plain_term::encoders::DefaultPlainTermEncoder;
5use rdf_fusion_encoding::plain_term::{
6 PlainTermArray, PlainTermEncoding, PlainTermScalar,
7};
8use rdf_fusion_encoding::typed_value::decoders::DefaultTypedValueDecoder;
9use rdf_fusion_encoding::typed_value::encoders::DefaultTypedValueEncoder;
10use rdf_fusion_encoding::typed_value::{
11 TypedValueArray, TypedValueEncoding, TypedValueScalar,
12};
13use rdf_fusion_encoding::{EncodingArray, EncodingDatum, EncodingScalar, TermDecoder};
14use rdf_fusion_model::DFResult;
15use rdf_fusion_model::{TermRef, ThinResult, TypedValue, TypedValueRef};
16
17pub fn dispatch_binary_typed_value<'data>(
18 lhs: &'data EncodingDatum<TypedValueEncoding>,
19 rhs: &'data EncodingDatum<TypedValueEncoding>,
20 op: impl for<'a> Fn(TypedValueRef<'a>, TypedValueRef<'a>) -> ThinResult<TypedValueRef<'a>>,
21 error_op: impl for<'a> Fn(
22 ThinResult<TypedValueRef<'a>>,
23 ThinResult<TypedValueRef<'a>>,
24 ) -> ThinResult<TypedValueRef<'a>>,
25) -> DFResult<ColumnarValue> {
26 match (lhs, rhs) {
27 (EncodingDatum::Array(lhs), EncodingDatum::Array(rhs)) => {
28 dispatch_binary_typed_value_array_array(lhs, rhs, op, error_op)
29 }
30 (EncodingDatum::Scalar(lhs, _), EncodingDatum::Array(rhs)) => {
31 dispatch_binary_typed_value_scalar_array(lhs, rhs, op, error_op)
32 }
33 (EncodingDatum::Array(lhs), EncodingDatum::Scalar(rhs, _)) => {
34 dispatch_binary_typed_value_array_scalar(lhs, rhs, op, error_op)
35 }
36 (EncodingDatum::Scalar(lhs, _), EncodingDatum::Scalar(rhs, _)) => {
37 dispatch_binary_typed_value_scalar_scalar(lhs, rhs, op, error_op)
38 }
39 }
40}
41
42fn dispatch_binary_typed_value_array_array<'data>(
43 lhs: &'data TypedValueArray,
44 rhs: &'data TypedValueArray,
45 op: impl for<'a> Fn(TypedValueRef<'a>, TypedValueRef<'a>) -> ThinResult<TypedValueRef<'a>>,
46 error_op: impl for<'a> Fn(
47 ThinResult<TypedValueRef<'a>>,
48 ThinResult<TypedValueRef<'a>>,
49 ) -> ThinResult<TypedValueRef<'a>>,
50) -> DFResult<ColumnarValue> {
51 let lhs = DefaultTypedValueDecoder::decode_terms(lhs);
52 let rhs = DefaultTypedValueDecoder::decode_terms(rhs);
53
54 let results = lhs.zip(rhs).map(|(lhs_value, rhs_value)| {
55 apply_binary_op(lhs_value, rhs_value, &op, &error_op)
56 });
57 let result = DefaultTypedValueEncoder::encode_terms(results)?;
58 Ok(ColumnarValue::Array(result.into_array()))
59}
60
61fn dispatch_binary_typed_value_scalar_array<'data>(
62 lhs: &'data TypedValueScalar,
63 rhs: &'data TypedValueArray,
64 op: impl for<'a> Fn(TypedValueRef<'a>, TypedValueRef<'a>) -> ThinResult<TypedValueRef<'a>>,
65 error_op: impl for<'a> Fn(
66 ThinResult<TypedValueRef<'a>>,
67 ThinResult<TypedValueRef<'a>>,
68 ) -> ThinResult<TypedValueRef<'a>>,
69) -> DFResult<ColumnarValue> {
70 let results = DefaultTypedValueDecoder::decode_terms(rhs).map(|rhs_value| {
71 let lhs_value = DefaultTypedValueDecoder::decode_term(lhs);
72 apply_binary_op(lhs_value, rhs_value, &op, &error_op)
73 });
74 let result = DefaultTypedValueEncoder::encode_terms(results)?;
75 Ok(ColumnarValue::Array(result.into_array()))
76}
77
78fn dispatch_binary_typed_value_array_scalar<'data>(
79 lhs: &'data TypedValueArray,
80 rhs: &'data TypedValueScalar,
81 op: impl for<'a> Fn(TypedValueRef<'a>, TypedValueRef<'a>) -> ThinResult<TypedValueRef<'a>>,
82 error_op: impl for<'a> Fn(
83 ThinResult<TypedValueRef<'a>>,
84 ThinResult<TypedValueRef<'a>>,
85 ) -> ThinResult<TypedValueRef<'a>>,
86) -> DFResult<ColumnarValue> {
87 let results = DefaultTypedValueDecoder::decode_terms(lhs).map(|lhs_value| {
88 let rhs_value = DefaultTypedValueDecoder::decode_term(rhs);
89 apply_binary_op(lhs_value, rhs_value, &op, &error_op)
90 });
91 let result = DefaultTypedValueEncoder::encode_terms(results)?;
92 Ok(ColumnarValue::Array(result.into_array()))
93}
94
95fn dispatch_binary_typed_value_scalar_scalar<'data>(
96 lhs: &'data TypedValueScalar,
97 rhs: &'data TypedValueScalar,
98 op: impl for<'a> Fn(TypedValueRef<'a>, TypedValueRef<'a>) -> ThinResult<TypedValueRef<'a>>,
99 error_op: impl for<'a> Fn(
100 ThinResult<TypedValueRef<'a>>,
101 ThinResult<TypedValueRef<'a>>,
102 ) -> ThinResult<TypedValueRef<'a>>,
103) -> DFResult<ColumnarValue> {
104 let lhs = DefaultTypedValueDecoder::decode_term(lhs);
105 let rhs = DefaultTypedValueDecoder::decode_term(rhs);
106
107 let result = apply_binary_op(lhs, rhs, &op, &error_op);
108 Ok(ColumnarValue::Scalar(
109 DefaultTypedValueEncoder::encode_term(result)?.into_scalar_value(),
110 ))
111}
112
113fn apply_binary_op<'data>(
114 lhs: ThinResult<TypedValueRef<'data>>,
115 rhs: ThinResult<TypedValueRef<'data>>,
116 op: impl for<'a> Fn(TypedValueRef<'a>, TypedValueRef<'a>) -> ThinResult<TypedValueRef<'a>>,
117 error_op: impl for<'a> Fn(
118 ThinResult<TypedValueRef<'a>>,
119 ThinResult<TypedValueRef<'a>>,
120 ) -> ThinResult<TypedValueRef<'a>>,
121) -> ThinResult<TypedValueRef<'data>> {
122 match (lhs, rhs) {
123 (Ok(lhs_value), Ok(rhs_value)) => op(lhs_value, rhs_value),
124 (lhs, rhs) => error_op(lhs, rhs),
125 }
126}
127
128pub fn dispatch_binary_owned_typed_value(
129 lhs: &EncodingDatum<TypedValueEncoding>,
130 rhs: &EncodingDatum<TypedValueEncoding>,
131 op: impl Fn(TypedValueRef<'_>, TypedValueRef<'_>) -> ThinResult<TypedValue>,
132 error_op: impl Fn(
133 ThinResult<TypedValueRef<'_>>,
134 ThinResult<TypedValueRef<'_>>,
135 ) -> ThinResult<TypedValue>,
136) -> DFResult<ColumnarValue> {
137 match (lhs, rhs) {
138 (EncodingDatum::Array(lhs), EncodingDatum::Array(rhs)) => {
139 dispatch_binary_owned_array_array(lhs, rhs, op, error_op)
140 }
141 (EncodingDatum::Scalar(lhs, _), EncodingDatum::Array(rhs)) => {
142 dispatch_binary_owned_scalar_array(lhs, rhs, op, error_op)
143 }
144 (EncodingDatum::Array(lhs), EncodingDatum::Scalar(rhs, _)) => {
145 dispatch_binary_owned_array_scalar(lhs, rhs, op, error_op)
146 }
147 (EncodingDatum::Scalar(lhs, _), EncodingDatum::Scalar(rhs, _)) => {
148 dispatch_binary_owned_scalar_scalar(lhs, rhs, op, error_op)
149 }
150 }
151}
152
153fn dispatch_binary_owned_array_array(
154 lhs: &TypedValueArray,
155 rhs: &TypedValueArray,
156 op: impl Fn(TypedValueRef<'_>, TypedValueRef<'_>) -> ThinResult<TypedValue>,
157 error_op: impl Fn(
158 ThinResult<TypedValueRef<'_>>,
159 ThinResult<TypedValueRef<'_>>,
160 ) -> ThinResult<TypedValue>,
161) -> DFResult<ColumnarValue> {
162 let lhs = DefaultTypedValueDecoder::decode_terms(lhs);
163 let rhs = DefaultTypedValueDecoder::decode_terms(rhs);
164
165 let results = lhs
166 .zip(rhs)
167 .map(|(lhs_value, rhs_value)| {
168 apply_binary_owned_op(lhs_value, rhs_value, &op, &error_op)
169 })
170 .collect::<Vec<_>>();
171
172 let result_refs = results
173 .iter()
174 .map(|result| match result {
175 Ok(value) => Ok(value.as_ref()),
176 Err(err) => Err(*err),
177 })
178 .collect::<Vec<_>>();
179 let result = DefaultTypedValueEncoder::encode_terms(result_refs)?;
180 Ok(ColumnarValue::Array(result.into_array()))
181}
182
183fn dispatch_binary_owned_scalar_array(
184 lhs: &TypedValueScalar,
185 rhs: &TypedValueArray,
186 op: impl Fn(TypedValueRef<'_>, TypedValueRef<'_>) -> ThinResult<TypedValue>,
187 error_op: impl Fn(
188 ThinResult<TypedValueRef<'_>>,
189 ThinResult<TypedValueRef<'_>>,
190 ) -> ThinResult<TypedValue>,
191) -> DFResult<ColumnarValue> {
192 let results = DefaultTypedValueDecoder::decode_terms(rhs)
193 .map(|rhs_value| {
194 let lhs_value = DefaultTypedValueDecoder::decode_term(lhs);
195 apply_binary_owned_op(lhs_value, rhs_value, &op, &error_op)
196 })
197 .collect::<Vec<_>>();
198
199 let result_refs = results
200 .iter()
201 .map(|result| match result {
202 Ok(value) => Ok(value.as_ref()),
203 Err(err) => Err(*err),
204 })
205 .collect::<Vec<_>>();
206 let result = DefaultTypedValueEncoder::encode_terms(result_refs)?;
207 Ok(ColumnarValue::Array(result.into_array()))
208}
209
210fn dispatch_binary_owned_array_scalar(
211 lhs: &TypedValueArray,
212 rhs: &TypedValueScalar,
213 op: impl Fn(TypedValueRef<'_>, TypedValueRef<'_>) -> ThinResult<TypedValue>,
214 error_op: impl Fn(
215 ThinResult<TypedValueRef<'_>>,
216 ThinResult<TypedValueRef<'_>>,
217 ) -> ThinResult<TypedValue>,
218) -> DFResult<ColumnarValue> {
219 let results = DefaultTypedValueDecoder::decode_terms(lhs)
220 .map(|lhs_value| {
221 let rhs_value = DefaultTypedValueDecoder::decode_term(rhs);
222 apply_binary_owned_op(lhs_value, rhs_value, &op, &error_op)
223 })
224 .collect::<Vec<_>>();
225
226 let result_refs = results
227 .iter()
228 .map(|result| match result {
229 Ok(value) => Ok(value.as_ref()),
230 Err(err) => Err(*err),
231 })
232 .collect::<Vec<_>>();
233 let result = DefaultTypedValueEncoder::encode_terms(result_refs)?;
234 Ok(ColumnarValue::Array(result.into_array()))
235}
236
237fn dispatch_binary_owned_scalar_scalar(
238 lhs: &TypedValueScalar,
239 rhs: &TypedValueScalar,
240 op: impl Fn(TypedValueRef<'_>, TypedValueRef<'_>) -> ThinResult<TypedValue>,
241 error_op: impl Fn(
242 ThinResult<TypedValueRef<'_>>,
243 ThinResult<TypedValueRef<'_>>,
244 ) -> ThinResult<TypedValue>,
245) -> DFResult<ColumnarValue> {
246 let lhs = DefaultTypedValueDecoder::decode_term(lhs);
247 let rhs = DefaultTypedValueDecoder::decode_term(rhs);
248
249 let result = apply_binary_owned_op(lhs, rhs, &op, &error_op);
250 let result_ref = match result.as_ref() {
251 Ok(typed_value) => Ok(typed_value.as_ref()),
252 Err(err) => Err(*err),
253 };
254 Ok(ColumnarValue::Scalar(
255 DefaultTypedValueEncoder::encode_term(result_ref)?.into_scalar_value(),
256 ))
257}
258
259fn apply_binary_owned_op(
260 lhs: ThinResult<TypedValueRef<'_>>,
261 rhs: ThinResult<TypedValueRef<'_>>,
262 op: impl Fn(TypedValueRef<'_>, TypedValueRef<'_>) -> ThinResult<TypedValue>,
263 error_op: impl Fn(
264 ThinResult<TypedValueRef<'_>>,
265 ThinResult<TypedValueRef<'_>>,
266 ) -> ThinResult<TypedValue>,
267) -> ThinResult<TypedValue> {
268 match (lhs, rhs) {
269 (Ok(lhs_value), Ok(rhs_value)) => op(lhs_value, rhs_value),
270 (lhs, rhs) => error_op(lhs, rhs),
271 }
272}
273
274pub fn dispatch_binary_plain_term<'data>(
275 lhs: &'data EncodingDatum<PlainTermEncoding>,
276 rhs: &'data EncodingDatum<PlainTermEncoding>,
277 op: impl for<'a> Fn(TermRef<'a>, TermRef<'a>) -> ThinResult<TermRef<'a>>,
278 error_op: impl for<'a> Fn(
279 ThinResult<TermRef<'a>>,
280 ThinResult<TermRef<'a>>,
281 ) -> ThinResult<TermRef<'a>>,
282) -> DFResult<ColumnarValue> {
283 match (lhs, rhs) {
284 (EncodingDatum::Array(lhs), EncodingDatum::Array(rhs)) => {
285 dispatch_binary_plain_term_array_array(lhs, rhs, op, error_op)
286 }
287 (EncodingDatum::Scalar(lhs, _), EncodingDatum::Array(rhs)) => {
288 dispatch_binary_plain_term_scalar_array(lhs, rhs, op, error_op)
289 }
290 (EncodingDatum::Array(lhs), EncodingDatum::Scalar(rhs, _)) => {
291 dispatch_binary_plain_term_array_scalar(lhs, rhs, op, error_op)
292 }
293 (EncodingDatum::Scalar(lhs, _), EncodingDatum::Scalar(rhs, _)) => {
294 dispatch_binary_plain_term_scalar_scalar(lhs, rhs, op, error_op)
295 }
296 }
297}
298
299fn dispatch_binary_plain_term_array_array<'data>(
300 lhs: &'data PlainTermArray,
301 rhs: &'data PlainTermArray,
302 op: impl for<'a> Fn(TermRef<'a>, TermRef<'a>) -> ThinResult<TermRef<'a>>,
303 error_op: impl for<'a> Fn(
304 ThinResult<TermRef<'a>>,
305 ThinResult<TermRef<'a>>,
306 ) -> ThinResult<TermRef<'a>>,
307) -> DFResult<ColumnarValue> {
308 let lhs = DefaultPlainTermDecoder::decode_terms(lhs);
309 let rhs = DefaultPlainTermDecoder::decode_terms(rhs);
310
311 let results = lhs.zip(rhs).map(|(lhs_value, rhs_value)| {
312 apply_binary_op_plain_term(lhs_value, rhs_value, &op, &error_op)
313 });
314 let result = DefaultPlainTermEncoder::encode_terms(results)?;
315 Ok(ColumnarValue::Array(result.into_array()))
316}
317
318fn dispatch_binary_plain_term_scalar_array<'data>(
319 lhs: &'data PlainTermScalar,
320 rhs: &'data PlainTermArray,
321 op: impl for<'a> Fn(TermRef<'a>, TermRef<'a>) -> ThinResult<TermRef<'a>>,
322 error_op: impl for<'a> Fn(
323 ThinResult<TermRef<'a>>,
324 ThinResult<TermRef<'a>>,
325 ) -> ThinResult<TermRef<'a>>,
326) -> DFResult<ColumnarValue> {
327 let results = DefaultPlainTermDecoder::decode_terms(rhs).map(|rhs_value| {
328 let lhs_value = DefaultPlainTermDecoder::decode_term(lhs);
329 apply_binary_op_plain_term(lhs_value, rhs_value, &op, &error_op)
330 });
331 let result = DefaultPlainTermEncoder::encode_terms(results)?;
332 Ok(ColumnarValue::Array(result.into_array()))
333}
334
335fn dispatch_binary_plain_term_array_scalar<'data>(
336 lhs: &'data PlainTermArray,
337 rhs: &'data PlainTermScalar,
338 op: impl for<'a> Fn(TermRef<'a>, TermRef<'a>) -> ThinResult<TermRef<'a>>,
339 error_op: impl for<'a> Fn(
340 ThinResult<TermRef<'a>>,
341 ThinResult<TermRef<'a>>,
342 ) -> ThinResult<TermRef<'a>>,
343) -> DFResult<ColumnarValue> {
344 let results = DefaultPlainTermDecoder::decode_terms(lhs).map(|lhs_value| {
345 let rhs_value = DefaultPlainTermDecoder::decode_term(rhs);
346 apply_binary_op_plain_term(lhs_value, rhs_value, &op, &error_op)
347 });
348 let result = DefaultPlainTermEncoder::encode_terms(results)?;
349 Ok(ColumnarValue::Array(result.into_array()))
350}
351
352fn dispatch_binary_plain_term_scalar_scalar<'data>(
353 lhs: &'data PlainTermScalar,
354 rhs: &'data PlainTermScalar,
355 op: impl for<'a> Fn(TermRef<'a>, TermRef<'a>) -> ThinResult<TermRef<'a>>,
356 error_op: impl for<'a> Fn(
357 ThinResult<TermRef<'a>>,
358 ThinResult<TermRef<'a>>,
359 ) -> ThinResult<TermRef<'a>>,
360) -> DFResult<ColumnarValue> {
361 let lhs = DefaultPlainTermDecoder::decode_term(lhs);
362 let rhs = DefaultPlainTermDecoder::decode_term(rhs);
363
364 let result = apply_binary_op_plain_term(lhs, rhs, &op, &error_op);
365 Ok(ColumnarValue::Scalar(
366 DefaultPlainTermEncoder::encode_term(result)?.into_scalar_value(),
367 ))
368}
369
370fn apply_binary_op_plain_term<'data>(
371 lhs: ThinResult<TermRef<'data>>,
372 rhs: ThinResult<TermRef<'data>>,
373 op: impl for<'a> Fn(TermRef<'a>, TermRef<'a>) -> ThinResult<TermRef<'a>>,
374 error_op: impl for<'a> Fn(
375 ThinResult<TermRef<'a>>,
376 ThinResult<TermRef<'a>>,
377 ) -> ThinResult<TermRef<'a>>,
378) -> ThinResult<TermRef<'data>> {
379 match (lhs, rhs) {
380 (Ok(lhs_value), Ok(rhs_value)) => op(lhs_value, rhs_value),
381 (lhs, rhs) => error_op(lhs, rhs),
382 }
383}