reifydb_engine/expression/arith/
add.rs1use reifydb_core::value::column::{Column, data::ColumnData, push::Push};
5use reifydb_type::{
6 error::{BinaryOp, TypeError},
7 fragment::{Fragment, LazyFragment},
8 value::{
9 container::{number::NumberContainer, temporal::TemporalContainer, utf8::Utf8Container},
10 is::IsNumber,
11 number::{promote::Promote, safe::add::SafeAdd},
12 r#type::{Type, get::GetType},
13 },
14};
15
16use crate::{
17 Result,
18 expression::{context::EvalContext, option::binary_op_unwrap_option},
19};
20
21pub(crate) fn add_columns(
22 ctx: &EvalContext,
23 left: &Column,
24 right: &Column,
25 fragment: impl LazyFragment + Copy,
26) -> Result<Column> {
27 binary_op_unwrap_option(left, right, fragment.fragment(), |left, right| {
28 let target = Type::promote(left.get_type(), right.get_type());
29
30 dispatch_arith!(
31 &left.data(), &right.data();
32 fixed: add_numeric, arb: add_numeric_clone (ctx, target, fragment);
33
34 (ColumnData::Duration(l), ColumnData::Duration(r)) => {
36 let mut container = TemporalContainer::with_capacity(l.len());
37 for i in 0..l.len() {
38 match (l.get(i), r.get(i)) {
39 (Some(lv), Some(rv)) => container.push(*lv + *rv),
40 _ => container.push_default(),
41 }
42 }
43 Ok(Column {
44 name: fragment.fragment(),
45 data: ColumnData::Duration(container),
46 })
47 }
48
49 (
51 ColumnData::Utf8 {
52 container: l,
53 ..
54 },
55 ColumnData::Utf8 {
56 container: r,
57 ..
58 },
59 ) => concat_strings(l, r, target, fragment.fragment()),
60
61 (
63 ColumnData::Utf8 {
64 container: l,
65 ..
66 },
67 r,
68 ) if can_promote_to_string(r) => concat_string_with_other(l, r, true, target, fragment.fragment()),
69
70 (
72 l,
73 ColumnData::Utf8 {
74 container: r,
75 ..
76 },
77 ) if can_promote_to_string(l) => concat_string_with_other(r, l, false, target, fragment.fragment()),
78
79 _ => return Err(TypeError::BinaryOperatorNotApplicable {
80 operator: BinaryOp::Add,
81 left: left.get_type(),
82 right: right.get_type(),
83 fragment: fragment.fragment(),
84 }.into()),
85 )
86 })
87}
88
89fn add_numeric<L, R>(
90 ctx: &EvalContext,
91 l: &NumberContainer<L>,
92 r: &NumberContainer<R>,
93 target: Type,
94 fragment: impl LazyFragment + Copy,
95) -> Result<Column>
96where
97 L: GetType + Promote<R> + IsNumber,
98 R: GetType + IsNumber,
99 <L as Promote<R>>::Output: IsNumber,
100 <L as Promote<R>>::Output: SafeAdd,
101 ColumnData: Push<<L as Promote<R>>::Output>,
102{
103 debug_assert_eq!(l.len(), r.len());
104
105 let mut data = ColumnData::with_capacity(target, l.len());
106 let l_data = l.data();
107 let r_data = r.data();
108 for i in 0..l.len() {
109 if let Some(value) = ctx.add(&l_data[i], &r_data[i], fragment)? {
110 data.push(value);
111 } else {
112 data.push_none()
113 }
114 }
115 Ok(Column {
116 name: fragment.fragment(),
117 data,
118 })
119}
120
121fn add_numeric_clone<L, R>(
122 ctx: &EvalContext,
123 l: &NumberContainer<L>,
124 r: &NumberContainer<R>,
125 target: Type,
126 fragment: impl LazyFragment + Copy,
127) -> Result<Column>
128where
129 L: Clone + GetType + Promote<R> + IsNumber,
130 R: Clone + GetType + IsNumber,
131 <L as Promote<R>>::Output: IsNumber,
132 <L as Promote<R>>::Output: SafeAdd,
133 ColumnData: Push<<L as Promote<R>>::Output>,
134{
135 debug_assert_eq!(l.len(), r.len());
136
137 let mut data = ColumnData::with_capacity(target, l.len());
138 for i in 0..l.len() {
139 match (l.get(i), r.get(i)) {
140 (Some(l_val), Some(r_val)) => {
141 let l_clone = l_val.clone();
142 let r_clone = r_val.clone();
143 if let Some(value) = ctx.add(&l_clone, &r_clone, fragment)? {
144 data.push(value);
145 } else {
146 data.push_none()
147 }
148 }
149 _ => data.push_none(),
150 }
151 }
152 Ok(Column {
153 name: fragment.fragment(),
154 data,
155 })
156}
157
158fn can_promote_to_string(data: &ColumnData) -> bool {
159 matches!(
160 data,
161 ColumnData::Bool(_)
162 | ColumnData::Float4(_)
163 | ColumnData::Float8(_)
164 | ColumnData::Int1(_) | ColumnData::Int2(_)
165 | ColumnData::Int4(_) | ColumnData::Int8(_)
166 | ColumnData::Int16(_)
167 | ColumnData::Uint1(_)
168 | ColumnData::Uint2(_)
169 | ColumnData::Uint4(_)
170 | ColumnData::Uint8(_)
171 | ColumnData::Uint16(_)
172 | ColumnData::Date(_) | ColumnData::DateTime(_)
173 | ColumnData::Time(_) | ColumnData::Duration(_)
174 | ColumnData::Uuid4(_)
175 | ColumnData::Uuid7(_)
176 | ColumnData::Blob { .. }
177 | ColumnData::Int { .. }
178 | ColumnData::Uint { .. }
179 | ColumnData::Decimal { .. }
180 )
181}
182
183fn concat_strings(l: &Utf8Container, r: &Utf8Container, target: Type, fragment: Fragment) -> Result<Column> {
184 debug_assert_eq!(l.len(), r.len());
185
186 let mut data = ColumnData::with_capacity(target, l.len());
187 for i in 0..l.len() {
188 match (l.get(i), r.get(i)) {
189 (Some(l_str), Some(r_str)) => {
190 let concatenated = format!("{}{}", l_str, r_str);
191 data.push(concatenated);
192 }
193 _ => data.push_none(),
194 }
195 }
196 Ok(Column {
197 name: fragment,
198 data,
199 })
200}
201
202fn concat_string_with_other(
203 string_data: &Utf8Container,
204 other_data: &ColumnData,
205 string_is_left: bool,
206 target: Type,
207 fragment: Fragment,
208) -> Result<Column> {
209 debug_assert_eq!(string_data.len(), other_data.len());
210
211 let mut data = ColumnData::with_capacity(target, string_data.len());
212 for i in 0..string_data.len() {
213 match (string_data.get(i), other_data.is_defined(i)) {
214 (Some(str_val), true) => {
215 let other_str = other_data.as_string(i);
216 let concatenated = if string_is_left {
217 format!("{}{}", str_val, other_str)
218 } else {
219 format!("{}{}", other_str, str_val)
220 };
221 data.push(concatenated);
222 }
223 _ => data.push_none(),
224 }
225 }
226 Ok(Column {
227 name: fragment,
228 data,
229 })
230}