Skip to main content

reifydb_engine/expression/arith/
add.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::value::column::{ColumnWithName, buffer::ColumnBuffer, push::Push};
5use reifydb_type::{
6	error::{BinaryOp, TypeError},
7	fragment::{Fragment, LazyFragment},
8	value::{
9		container::{number::NumberContainer, temporal::TemporalContainer, utf8::Utf8Container},
10		is::IsNumber,
11		number::{promote::Promote, safe::add::SafeAdd},
12		r#type::{Type, get::GetType},
13	},
14};
15
16use crate::{
17	Result,
18	expression::{context::EvalContext, option::binary_op_unwrap_option},
19};
20
21pub(crate) fn add_columns(
22	ctx: &EvalContext,
23	left: &ColumnWithName,
24	right: &ColumnWithName,
25	fragment: impl LazyFragment + Copy,
26) -> Result<ColumnWithName> {
27	binary_op_unwrap_option(left, right, fragment.fragment(), |left, right| {
28		let target = Type::promote(left.get_type(), right.get_type());
29
30		dispatch_arith!(
31			&left.data(), &right.data();
32			fixed: add_numeric, arb: add_numeric_clone (ctx, target, fragment);
33
34			// Duration + Duration
35			(ColumnBuffer::Duration(l), ColumnBuffer::Duration(r)) => {
36				let mut container = TemporalContainer::with_capacity(l.len());
37				for i in 0..l.len() {
38					match (l.get(i), r.get(i)) {
39						(Some(lv), Some(rv)) => container.push(*lv + *rv),
40						_ => container.push_default(),
41					}
42				}
43				Ok(ColumnWithName::new(fragment.fragment(), ColumnBuffer::Duration(container)))
44			}
45
46			// String concatenation
47			(
48				ColumnBuffer::Utf8 {
49					container: l,
50					..
51				},
52				ColumnBuffer::Utf8 {
53					container: r,
54					..
55				},
56			) => concat_strings(l, r, target, fragment.fragment()),
57
58			// String + Other types (auto-promote to string)
59			(
60				ColumnBuffer::Utf8 {
61					container: l,
62					..
63				},
64				r,
65			) if can_promote_to_string(r) => concat_string_with_other(l, r, true, target, fragment.fragment()),
66
67			// Other types + String (auto-promote to string)
68			(
69				l,
70				ColumnBuffer::Utf8 {
71					container: r,
72					..
73				},
74			) if can_promote_to_string(l) => concat_string_with_other(r, l, false, target, fragment.fragment()),
75
76			_ => Err(TypeError::BinaryOperatorNotApplicable {
77				operator: BinaryOp::Add,
78				left: left.get_type(),
79				right: right.get_type(),
80				fragment: fragment.fragment(),
81			}.into()),
82		)
83	})
84}
85
86fn add_numeric<L, R>(
87	ctx: &EvalContext,
88	l: &NumberContainer<L>,
89	r: &NumberContainer<R>,
90	target: Type,
91	fragment: impl LazyFragment + Copy,
92) -> Result<ColumnWithName>
93where
94	L: GetType + Promote<R> + IsNumber,
95	R: GetType + IsNumber,
96	<L as Promote<R>>::Output: IsNumber,
97	<L as Promote<R>>::Output: SafeAdd,
98	ColumnBuffer: Push<<L as Promote<R>>::Output>,
99{
100	debug_assert_eq!(l.len(), r.len());
101
102	let mut data = ColumnBuffer::with_capacity(target, l.len());
103	let l_data = l.data();
104	let r_data = r.data();
105	for i in 0..l.len() {
106		if let Some(value) = ctx.add(&l_data[i], &r_data[i], fragment)? {
107			data.push(value);
108		} else {
109			data.push_none()
110		}
111	}
112	Ok(ColumnWithName {
113		name: fragment.fragment(),
114		data,
115	})
116}
117
118fn add_numeric_clone<L, R>(
119	ctx: &EvalContext,
120	l: &NumberContainer<L>,
121	r: &NumberContainer<R>,
122	target: Type,
123	fragment: impl LazyFragment + Copy,
124) -> Result<ColumnWithName>
125where
126	L: Clone + GetType + Promote<R> + IsNumber,
127	R: Clone + GetType + IsNumber,
128	<L as Promote<R>>::Output: IsNumber,
129	<L as Promote<R>>::Output: SafeAdd,
130	ColumnBuffer: Push<<L as Promote<R>>::Output>,
131{
132	debug_assert_eq!(l.len(), r.len());
133
134	let mut data = ColumnBuffer::with_capacity(target, l.len());
135	for i in 0..l.len() {
136		match (l.get(i), r.get(i)) {
137			(Some(l_val), Some(r_val)) => {
138				let l_clone = l_val.clone();
139				let r_clone = r_val.clone();
140				if let Some(value) = ctx.add(&l_clone, &r_clone, fragment)? {
141					data.push(value);
142				} else {
143					data.push_none()
144				}
145			}
146			_ => data.push_none(),
147		}
148	}
149	Ok(ColumnWithName {
150		name: fragment.fragment(),
151		data,
152	})
153}
154
155fn can_promote_to_string(data: &ColumnBuffer) -> bool {
156	matches!(
157		data,
158		ColumnBuffer::Bool(_)
159			| ColumnBuffer::Float4(_)
160			| ColumnBuffer::Float8(_)
161			| ColumnBuffer::Int1(_)
162			| ColumnBuffer::Int2(_)
163			| ColumnBuffer::Int4(_)
164			| ColumnBuffer::Int8(_)
165			| ColumnBuffer::Int16(_)
166			| ColumnBuffer::Uint1(_)
167			| ColumnBuffer::Uint2(_)
168			| ColumnBuffer::Uint4(_)
169			| ColumnBuffer::Uint8(_)
170			| ColumnBuffer::Uint16(_)
171			| ColumnBuffer::Date(_)
172			| ColumnBuffer::DateTime(_)
173			| ColumnBuffer::Time(_)
174			| ColumnBuffer::Duration(_)
175			| ColumnBuffer::Uuid4(_)
176			| ColumnBuffer::Uuid7(_)
177			| ColumnBuffer::Blob { .. }
178			| ColumnBuffer::Int { .. }
179			| ColumnBuffer::Uint { .. }
180			| ColumnBuffer::Decimal { .. }
181	)
182}
183
184fn concat_strings(l: &Utf8Container, r: &Utf8Container, target: Type, fragment: Fragment) -> Result<ColumnWithName> {
185	debug_assert_eq!(l.len(), r.len());
186
187	let mut data = ColumnBuffer::with_capacity(target, l.len());
188	for i in 0..l.len() {
189		match (l.get(i), r.get(i)) {
190			(Some(l_str), Some(r_str)) => {
191				let concatenated = format!("{}{}", l_str, r_str);
192				data.push(concatenated);
193			}
194			_ => data.push_none(),
195		}
196	}
197	Ok(ColumnWithName {
198		name: fragment,
199		data,
200	})
201}
202
203fn concat_string_with_other(
204	string_data: &Utf8Container,
205	other_data: &ColumnBuffer,
206	string_is_left: bool,
207	target: Type,
208	fragment: Fragment,
209) -> Result<ColumnWithName> {
210	debug_assert_eq!(string_data.len(), other_data.len());
211
212	let mut data = ColumnBuffer::with_capacity(target, string_data.len());
213	for i in 0..string_data.len() {
214		match (string_data.get(i), other_data.is_defined(i)) {
215			(Some(str_val), true) => {
216				let other_str = other_data.as_string(i);
217				let concatenated = if string_is_left {
218					format!("{}{}", str_val, other_str)
219				} else {
220					format!("{}{}", other_str, str_val)
221				};
222				data.push(concatenated);
223			}
224			_ => data.push_none(),
225		}
226	}
227	Ok(ColumnWithName {
228		name: fragment,
229		data,
230	})
231}