1use std::any::Any;
5
6use vortex_buffer::{Buffer, BufferMut};
7use vortex_dtype::{DType, DecimalDType, Nullability};
8use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_panic};
9use vortex_mask::Mask;
10use vortex_scalar::{BigCast, NativeDecimalType, i256, match_each_decimal_value_type};
11
12use crate::arrays::DecimalArray;
13use crate::builders::ArrayBuilder;
14use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
15use crate::{Array, ArrayRef, IntoArray, ToCanonical};
16
17enum DecimalBuffer {
23 I8(BufferMut<i8>),
24 I16(BufferMut<i16>),
25 I32(BufferMut<i32>),
26 I64(BufferMut<i64>),
27 I128(BufferMut<i128>),
28 I256(BufferMut<i256>),
29}
30
31impl Default for DecimalBuffer {
32 fn default() -> Self {
33 Self::I8(BufferMut::<i8>::empty())
34 }
35}
36
37macro_rules! impl_from_buffer {
38 ($typ:ty, $variant:ident) => {
39 impl From<BufferMut<$typ>> for DecimalBuffer {
40 fn from(buffer: BufferMut<$typ>) -> Self {
41 Self::$variant(buffer)
42 }
43 }
44 };
45}
46impl_from_buffer!(i8, I8);
47impl_from_buffer!(i16, I16);
48impl_from_buffer!(i32, I32);
49impl_from_buffer!(i64, I64);
50impl_from_buffer!(i128, I128);
51impl_from_buffer!(i256, I256);
52
// Runs `$body` against whichever variant of a `DecimalBuffer` is active.
//
// Inside `$body`, `$tname` is a type alias for the variant's element type and
// `$buffer` is bound to the variant's inner buffer. How it is bound (by value,
// by shared reference or by mutable reference) follows from the `$self`
// expression being matched.
macro_rules! delegate_fn {
    ($self:expr, | $tname:ident, $buffer:ident | $body:block) => {{
        // Not every caller uses both the type alias and the buffer binding.
        #[allow(unused)]
        match $self {
            DecimalBuffer::I8($buffer) => {
                type $tname = i8;
                $body
            }
            DecimalBuffer::I16($buffer) => {
                type $tname = i16;
                $body
            }
            DecimalBuffer::I32($buffer) => {
                type $tname = i32;
                $body
            }
            DecimalBuffer::I64($buffer) => {
                type $tname = i64;
                $body
            }
            DecimalBuffer::I128($buffer) => {
                type $tname = i128;
                $body
            }
            DecimalBuffer::I256($buffer) => {
                type $tname = i256;
                $body
            }
        }
    }};
}
90
91impl DecimalBuffer {
92 fn push<V: NativeDecimalType>(&mut self, value: V) {
93 delegate_fn!(self, |T, buffer| {
94 buffer.push(<T as BigCast>::from(value).vortex_expect("decimal conversion failure"))
95 });
96 }
97
98 fn push_n<V: NativeDecimalType>(&mut self, value: V, n: usize) {
99 delegate_fn!(self, |T, buffer| {
100 buffer.push_n(
101 <T as BigCast>::from(value).vortex_expect("decimal conversion failure"),
102 n,
103 )
104 });
105 }
106
107 fn reserve(&mut self, additional: usize) {
108 delegate_fn!(self, |T, buffer| { buffer.reserve(additional) })
109 }
110
111 fn capacity(&self) -> usize {
112 delegate_fn!(self, |T, buffer| { buffer.capacity() })
113 }
114
115 fn len(&self) -> usize {
116 delegate_fn!(self, |T, buffer| { buffer.len() })
117 }
118
119 pub fn extend<I, V: NativeDecimalType>(&mut self, iter: I)
120 where
121 I: Iterator<Item = V>,
122 {
123 delegate_fn!(self, |T, buffer| {
124 buffer.extend(
125 iter.map(|x| <T as BigCast>::from(x).vortex_expect("decimal conversion failure")),
126 )
127 })
128 }
129}
130
131pub struct DecimalBuilder {
137 values: DecimalBuffer,
138 nulls: LazyNullBufferBuilder,
139 dtype: DType,
140}
141
/// Capacity passed to `with_capacity` by [`DecimalBuilder::new`].
const DEFAULT_BUILDER_CAPACITY: usize = 1024;
143
144impl DecimalBuilder {
145 pub fn new<T: NativeDecimalType>(precision: u8, scale: i8, nullability: Nullability) -> Self {
146 Self::with_capacity::<T>(
147 DEFAULT_BUILDER_CAPACITY,
148 DecimalDType::new(precision, scale),
149 nullability,
150 )
151 }
152
153 pub fn with_capacity<T: NativeDecimalType>(
154 capacity: usize,
155 decimal: DecimalDType,
156 nullability: Nullability,
157 ) -> Self {
158 Self {
159 values: match_each_decimal_value_type!(T::VALUES_TYPE, |D| {
160 DecimalBuffer::from(BufferMut::<D>::with_capacity(capacity))
161 }),
162 nulls: LazyNullBufferBuilder::new(capacity),
163 dtype: DType::Decimal(decimal, nullability),
164 }
165 }
166}
167
168impl DecimalBuilder {
169 fn extend_with_validity_mask(&mut self, validity_mask: Mask) {
170 self.nulls.append_validity_mask(validity_mask);
171 }
172
173 fn extend_from_buffer<V: NativeDecimalType>(&mut self, values: &Buffer<V>) {
176 self.values.extend(values.iter().copied());
177 }
178}
179
180impl DecimalBuilder {
181 pub fn append_value<V: NativeDecimalType>(&mut self, value: V) {
182 self.values.push(value);
183 self.nulls.append(true);
184 }
185
186 pub fn append_option<V: NativeDecimalType>(&mut self, value: Option<V>) {
187 match value {
188 Some(value) => {
189 self.values.push(value);
190 self.nulls.append(true);
191 }
192 None => self.append_null(),
193 }
194 }
195
196 pub fn append_mask(&mut self, mask: Mask) {
198 self.nulls.append_validity_mask(mask);
199 }
200}
201
202impl DecimalBuilder {
203 pub fn finish_into_decimal(&mut self) -> DecimalArray {
204 let validity = self.nulls.finish_with_nullability(self.dtype.nullability());
205
206 let DType::Decimal(decimal_dtype, _) = self.dtype else {
207 vortex_panic!("DecimalBuilder must have Decimal DType");
208 };
209
210 delegate_fn!(std::mem::take(&mut self.values), |T, values| {
211 DecimalArray::new::<T>(values.freeze(), decimal_dtype, validity)
212 })
213 }
214}
215
216impl ArrayBuilder for DecimalBuilder {
217 fn as_any(&self) -> &dyn Any {
218 self
219 }
220
221 fn as_any_mut(&mut self) -> &mut dyn Any {
222 self
223 }
224
225 fn dtype(&self) -> &DType {
226 &self.dtype
227 }
228
229 fn len(&self) -> usize {
230 self.values.len()
231 }
232
233 fn append_zeros(&mut self, n: usize) {
234 self.values.push_n(0, n);
235 self.nulls.append_n_non_nulls(n);
236 }
237
238 fn append_nulls(&mut self, n: usize) {
239 self.values.push_n(0, n);
240 self.nulls.append_n_nulls(n);
241 }
242
243 fn extend_from_array(&mut self, array: &dyn Array) -> VortexResult<()> {
244 let array = array.to_decimal()?;
245
246 let DType::Decimal(decimal_dtype, _) = self.dtype else {
247 vortex_panic!("DecimalBuilder must have Decimal DType");
248 };
249
250 if array.decimal_dtype() != decimal_dtype {
251 vortex_bail!(
252 "Cannot extend from array with different decimal type: {:?} != {:?}",
253 array.decimal_dtype(),
254 decimal_dtype
255 );
256 }
257
258 match_each_decimal_value_type!(array.values_type(), |D| {
259 self.extend_from_buffer(&array.buffer::<D>())
260 });
261
262 self.extend_with_validity_mask(array.validity_mask());
263
264 Ok(())
265 }
266
267 fn ensure_capacity(&mut self, capacity: usize) {
268 if capacity > self.values.capacity() {
269 self.values.reserve(capacity - self.values.len());
270 self.nulls.ensure_capacity(capacity);
271 }
272 }
273
274 fn set_validity(&mut self, validity: Mask) {
275 self.nulls = LazyNullBufferBuilder::new(validity.len());
276 self.nulls.append_validity_mask(validity);
277 }
278
279 fn finish(&mut self) -> ArrayRef {
280 self.finish_into_decimal().into_array()
281 }
282}
283
#[cfg(test)]
mod tests {
    use crate::builders::{ArrayBuilder, DecimalBuilder};

    /// An array built with `i8` storage can be appended to a builder that uses
    /// `i128` storage, and the values come out unchanged.
    #[test]
    fn test_mixed_extend() {
        let mut narrow_builder = DecimalBuilder::new::<i8>(2, 1, false.into());
        for value in [10, 11, 12] {
            narrow_builder.append_value(value);
        }
        let narrow = narrow_builder.finish();

        let mut wide_builder = DecimalBuilder::new::<i128>(2, 1, false.into());
        wide_builder.extend_from_array(&narrow).unwrap();
        let wide = wide_builder.finish_into_decimal();

        assert_eq!(wide.buffer::<i128>().as_slice(), &[10, 11, 12]);
    }
}