1use std::any::Any;
2
3use vortex_buffer::{Buffer, BufferMut};
4use vortex_dtype::{DType, DecimalDType, Nullability};
5use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_panic};
6use vortex_mask::Mask;
7use vortex_scalar::{BigCast, NativeDecimalType, i256, match_each_decimal_value_type};
8
9use crate::arrays::{BoolArray, DecimalArray};
10use crate::builders::ArrayBuilder;
11use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
12use crate::validity::Validity;
13use crate::{Array, ArrayRef, IntoArray, ToCanonical};
14
15enum DecimalBuffer {
21 I8(BufferMut<i8>),
22 I16(BufferMut<i16>),
23 I32(BufferMut<i32>),
24 I64(BufferMut<i64>),
25 I128(BufferMut<i128>),
26 I256(BufferMut<i256>),
27}
28
29impl Default for DecimalBuffer {
30 fn default() -> Self {
31 Self::I8(BufferMut::<i8>::empty())
32 }
33}
34
35macro_rules! impl_from_buffer {
36 ($typ:ty, $variant:ident) => {
37 impl From<BufferMut<$typ>> for DecimalBuffer {
38 fn from(buffer: BufferMut<$typ>) -> Self {
39 Self::$variant(buffer)
40 }
41 }
42 };
43}
44impl_from_buffer!(i8, I8);
45impl_from_buffer!(i16, I16);
46impl_from_buffer!(i32, I32);
47impl_from_buffer!(i64, I64);
48impl_from_buffer!(i128, I128);
49impl_from_buffer!(i256, I256);
50
/// Runs `$body` against whichever buffer a [`DecimalBuffer`] currently holds.
///
/// Inside `$body`, `$tname` is a type alias for the variant's native integer type and
/// `$buffer` is bound to the variant's payload (by value or by reference, depending on how
/// `$self` is matched). The body is expanded once per variant, so it must type-check for
/// every supported width.
macro_rules! delegate_fn {
    ($self:expr, | $tname:ident, $buffer:ident | $body:block) => {{
        // Not every body uses both the type alias and the binding.
        #[allow(unused)]
        match $self {
            DecimalBuffer::I8(inner) => {
                type $tname = i8;
                let $buffer = inner;
                $body
            }
            DecimalBuffer::I16(inner) => {
                type $tname = i16;
                let $buffer = inner;
                $body
            }
            DecimalBuffer::I32(inner) => {
                type $tname = i32;
                let $buffer = inner;
                $body
            }
            DecimalBuffer::I64(inner) => {
                type $tname = i64;
                let $buffer = inner;
                $body
            }
            DecimalBuffer::I128(inner) => {
                type $tname = i128;
                let $buffer = inner;
                $body
            }
            DecimalBuffer::I256(inner) => {
                type $tname = i256;
                let $buffer = inner;
                $body
            }
        }
    }};
}
88
89impl DecimalBuffer {
90 fn push<V: NativeDecimalType>(&mut self, value: V) {
91 delegate_fn!(self, |T, buffer| {
92 buffer.push(<T as BigCast>::from(value).vortex_expect("decimal conversion failure"))
93 });
94 }
95
96 fn push_n<V: NativeDecimalType>(&mut self, value: V, n: usize) {
97 delegate_fn!(self, |T, buffer| {
98 buffer.push_n(
99 <T as BigCast>::from(value).vortex_expect("decimal conversion failure"),
100 n,
101 )
102 });
103 }
104
105 fn reserve(&mut self, additional: usize) {
106 delegate_fn!(self, |T, buffer| { buffer.reserve(additional) })
107 }
108
109 fn capacity(&self) -> usize {
110 delegate_fn!(self, |T, buffer| { buffer.capacity() })
111 }
112
113 fn len(&self) -> usize {
114 delegate_fn!(self, |T, buffer| { buffer.len() })
115 }
116
117 pub fn extend<I, V: NativeDecimalType>(&mut self, iter: I)
118 where
119 I: Iterator<Item = V>,
120 {
121 delegate_fn!(self, |T, buffer| {
122 buffer.extend(
123 iter.map(|x| <T as BigCast>::from(x).vortex_expect("decimal conversion failure")),
124 )
125 })
126 }
127}
128
129pub struct DecimalBuilder {
135 values: DecimalBuffer,
136 nulls: LazyNullBufferBuilder,
137 dtype: DType,
138}
139
140const DEFAULT_BUILDER_CAPACITY: usize = 1024;
141
142impl DecimalBuilder {
143 pub fn new<T: NativeDecimalType>(precision: u8, scale: i8, nullability: Nullability) -> Self {
144 Self::with_capacity::<T>(
145 DEFAULT_BUILDER_CAPACITY,
146 DecimalDType::new(precision, scale),
147 nullability,
148 )
149 }
150
151 pub fn with_capacity<T: NativeDecimalType>(
152 capacity: usize,
153 decimal: DecimalDType,
154 nullability: Nullability,
155 ) -> Self {
156 Self {
157 values: match_each_decimal_value_type!(T::VALUES_TYPE, |D| {
158 DecimalBuffer::from(BufferMut::<D>::with_capacity(capacity))
159 }),
160 nulls: LazyNullBufferBuilder::new(capacity),
161 dtype: DType::Decimal(decimal, nullability),
162 }
163 }
164}
165
166impl DecimalBuilder {
167 fn extend_with_validity_mask(&mut self, validity_mask: Mask) {
168 self.nulls.append_validity_mask(validity_mask);
169 }
170
171 fn extend_from_buffer<V: NativeDecimalType>(&mut self, values: &Buffer<V>) {
174 self.values.extend(values.iter().copied());
175 }
176}
177
178impl DecimalBuilder {
179 pub fn append_value<V: NativeDecimalType>(&mut self, value: V) {
180 self.values.push(value);
181 self.nulls.append(true);
182 }
183
184 pub fn append_option<V: NativeDecimalType>(&mut self, value: Option<V>) {
185 match value {
186 Some(value) => {
187 self.values.push(value);
188 self.nulls.append(true);
189 }
190 None => self.append_null(),
191 }
192 }
193
194 pub fn append_mask(&mut self, mask: Mask) {
196 self.nulls.append_validity_mask(mask);
197 }
198}
199
200impl DecimalBuilder {
201 pub fn finish_into_decimal(&mut self) -> DecimalArray {
202 let nulls = self.nulls.finish();
203
204 if let Some(null_buf) = nulls.as_ref() {
205 assert_eq!(
206 null_buf.len(),
207 self.values.len(),
208 "null buffer length must equal value buffer length"
209 );
210 }
211
212 let validity = match (nulls, self.dtype.nullability()) {
213 (None, Nullability::NonNullable) => Validity::NonNullable,
214 (Some(_), Nullability::NonNullable) => {
215 vortex_panic!("Non-nullable builder has null values")
216 }
217 (None, Nullability::Nullable) => Validity::AllValid,
218 (Some(nulls), Nullability::Nullable) => {
219 if nulls.null_count() == nulls.len() {
220 Validity::AllInvalid
221 } else {
222 Validity::Array(BoolArray::from(nulls.into_inner()).into_array())
223 }
224 }
225 };
226
227 let DType::Decimal(decimal_dtype, _) = self.dtype else {
228 vortex_panic!("DecimalBuilder must have Decimal DType");
229 };
230
231 delegate_fn!(std::mem::take(&mut self.values), |T, values| {
232 DecimalArray::new::<T>(values.freeze(), decimal_dtype, validity)
233 })
234 }
235}
236
237impl ArrayBuilder for DecimalBuilder {
238 fn as_any(&self) -> &dyn Any {
239 self
240 }
241
242 fn as_any_mut(&mut self) -> &mut dyn Any {
243 self
244 }
245
246 fn dtype(&self) -> &DType {
247 &self.dtype
248 }
249
250 fn len(&self) -> usize {
251 self.values.len()
252 }
253
254 fn append_zeros(&mut self, n: usize) {
255 self.values.push_n(0, n);
256 self.nulls.append_n_non_nulls(n);
257 }
258
259 fn append_nulls(&mut self, n: usize) {
260 self.values.push_n(0, n);
261 self.nulls.append_n_nulls(n);
262 }
263
264 fn extend_from_array(&mut self, array: &dyn Array) -> VortexResult<()> {
265 let array = array.to_decimal()?;
266
267 let DType::Decimal(decimal_dtype, _) = self.dtype else {
268 vortex_panic!("DecimalBuilder must have Decimal DType");
269 };
270
271 if array.decimal_dtype() != decimal_dtype {
272 vortex_bail!(
273 "Cannot extend from array with different decimal type: {:?} != {:?}",
274 array.decimal_dtype(),
275 decimal_dtype
276 );
277 }
278
279 match_each_decimal_value_type!(array.values_type(), |D| {
280 self.extend_from_buffer(&array.buffer::<D>())
281 });
282
283 self.extend_with_validity_mask(array.validity_mask()?);
284
285 Ok(())
286 }
287
288 fn ensure_capacity(&mut self, capacity: usize) {
289 if capacity > self.values.capacity() {
290 self.values.reserve(capacity - self.values.len());
291 self.nulls.ensure_capacity(capacity);
292 }
293 }
294
295 fn set_validity(&mut self, validity: Mask) {
296 self.nulls = LazyNullBufferBuilder::new(validity.len());
297 self.nulls.append_validity_mask(validity);
298 }
299
300 fn finish(&mut self) -> ArrayRef {
301 self.finish_into_decimal().into_array()
302 }
303}
304
#[cfg(test)]
mod tests {
    use crate::builders::{ArrayBuilder, DecimalBuilder};

    /// An array built with `i8` storage can be appended to a builder with `i128` storage.
    #[test]
    fn test_mixed_extend() {
        let mut narrow = DecimalBuilder::new::<i8>(2, 1, false.into());
        for value in [10, 11, 12] {
            narrow.append_value(value);
        }
        let narrow = narrow.finish();

        let mut wide = DecimalBuilder::new::<i128>(2, 1, false.into());
        wide.extend_from_array(&narrow).unwrap();
        let wide = wide.finish_into_decimal();

        assert_eq!(wide.buffer::<i128>().as_slice(), &[10, 11, 12]);
    }
}