1use crate::types::bytes::ByteArrayNativeType;
19use std::{any::Any, sync::Arc};
20
21use crate::{
22    ArrayRef, ArrowPrimitiveType, RunArray,
23    types::{BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, RunEndIndexType, Utf8Type},
24};
25
26use super::{ArrayBuilder, GenericByteBuilder, PrimitiveBuilder};
27
28use arrow_buffer::ArrowNativeType;
29
/// Builder for a [`RunArray`] whose values are byte arrays of type `V` and
/// whose run ends are of type `R`.
///
/// Values are appended one logical element at a time; consecutive equal
/// values (or consecutive nulls) are collapsed into a single run. A run is
/// only written to the child builders once a different value arrives or the
/// builder is finished.
#[derive(Debug)]
pub struct GenericByteRunBuilder<R, V>
where
    R: ArrowPrimitiveType,
    V: ByteArrayType,
{
    /// Collects the (exclusive) logical end index of every completed run.
    run_ends_builder: PrimitiveBuilder<R>,
    /// Collects one value (or null) per completed run.
    values_builder: GenericByteBuilder<V>,
    /// Raw bytes of the value of the run currently being accumulated.
    /// Empty whenever `has_current_value` is `false`.
    current_value: Vec<u8>,
    /// `true` while the run being accumulated holds a non-null value;
    /// `false` when it is a null run or nothing has been appended yet.
    has_current_value: bool,
    /// Number of logical elements appended so far; also the end index of the
    /// run currently being accumulated.
    current_run_end_index: usize,
    /// Value of `current_run_end_index` at the moment the last run was written
    /// to the child builders. Equal to `current_run_end_index` when there is
    /// no pending run.
    prev_run_end_index: usize,
}
77
78impl<R, V> Default for GenericByteRunBuilder<R, V>
79where
80    R: ArrowPrimitiveType,
81    V: ByteArrayType,
82{
83    fn default() -> Self {
84        Self::new()
85    }
86}
87
88impl<R, V> GenericByteRunBuilder<R, V>
89where
90    R: ArrowPrimitiveType,
91    V: ByteArrayType,
92{
93    pub fn new() -> Self {
95        Self {
96            run_ends_builder: PrimitiveBuilder::new(),
97            values_builder: GenericByteBuilder::<V>::new(),
98            current_value: Vec::new(),
99            has_current_value: false,
100            current_run_end_index: 0,
101            prev_run_end_index: 0,
102        }
103    }
104
105    pub fn with_capacity(capacity: usize, data_capacity: usize) -> Self {
110        Self {
111            run_ends_builder: PrimitiveBuilder::with_capacity(capacity),
112            values_builder: GenericByteBuilder::<V>::with_capacity(capacity, data_capacity),
113            current_value: Vec::new(),
114            has_current_value: false,
115            current_run_end_index: 0,
116            prev_run_end_index: 0,
117        }
118    }
119}
120
121impl<R, V> ArrayBuilder for GenericByteRunBuilder<R, V>
122where
123    R: RunEndIndexType,
124    V: ByteArrayType,
125{
126    fn as_any(&self) -> &dyn Any {
128        self
129    }
130
131    fn as_any_mut(&mut self) -> &mut dyn Any {
133        self
134    }
135
136    fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
138        self
139    }
140
141    fn len(&self) -> usize {
144        self.current_run_end_index
145    }
146
147    fn finish(&mut self) -> ArrayRef {
149        Arc::new(self.finish())
150    }
151
152    fn finish_cloned(&self) -> ArrayRef {
154        Arc::new(self.finish_cloned())
155    }
156}
157
158impl<R, V> GenericByteRunBuilder<R, V>
159where
160    R: RunEndIndexType,
161    V: ByteArrayType,
162{
163    pub fn append_option(&mut self, input_value: Option<impl AsRef<V::Native>>) {
165        match input_value {
166            Some(value) => self.append_value(value),
167            None => self.append_null(),
168        }
169    }
170
171    pub fn append_value(&mut self, input_value: impl AsRef<V::Native>) {
173        let value: &[u8] = input_value.as_ref().as_ref();
174        if !self.has_current_value {
175            self.append_run_end();
176            self.current_value.extend_from_slice(value);
177            self.has_current_value = true;
178        } else if self.current_value.as_slice() != value {
179            self.append_run_end();
180            self.current_value.clear();
181            self.current_value.extend_from_slice(value);
182        }
183        self.current_run_end_index += 1;
184    }
185
186    pub fn append_null(&mut self) {
188        if self.has_current_value {
189            self.append_run_end();
190            self.current_value.clear();
191            self.has_current_value = false;
192        }
193        self.current_run_end_index += 1;
194    }
195
196    pub fn finish(&mut self) -> RunArray<R> {
199        self.append_run_end();
201
202        self.current_value.clear();
204        self.has_current_value = false;
205        self.current_run_end_index = 0;
206        self.prev_run_end_index = 0;
207
208        let run_ends_array = self.run_ends_builder.finish();
210        let values_array = self.values_builder.finish();
211        RunArray::<R>::try_new(&run_ends_array, &values_array).unwrap()
212    }
213
214    pub fn finish_cloned(&self) -> RunArray<R> {
217        let mut run_ends_array = self.run_ends_builder.finish_cloned();
218        let mut values_array = self.values_builder.finish_cloned();
219
220        if self.prev_run_end_index != self.current_run_end_index {
222            let mut run_end_builder = run_ends_array.into_builder().unwrap();
223            let mut values_builder = values_array.into_builder().unwrap();
224            self.append_run_end_with_builders(&mut run_end_builder, &mut values_builder);
225            run_ends_array = run_end_builder.finish();
226            values_array = values_builder.finish();
227        }
228
229        RunArray::<R>::try_new(&run_ends_array, &values_array).unwrap()
230    }
231
232    fn append_run_end(&mut self) {
234        if self.prev_run_end_index == self.current_run_end_index {
236            return;
237        }
238        let run_end_index = self.run_end_index_as_native();
239        self.run_ends_builder.append_value(run_end_index);
240        if self.has_current_value {
241            let slice = self.current_value.as_slice();
242            let native = unsafe {
243                V::Native::from_bytes_unchecked(slice)
247            };
248            self.values_builder.append_value(native);
249        } else {
250            self.values_builder.append_null();
251        }
252        self.prev_run_end_index = self.current_run_end_index;
253    }
254
255    fn append_run_end_with_builders(
258        &self,
259        run_ends_builder: &mut PrimitiveBuilder<R>,
260        values_builder: &mut GenericByteBuilder<V>,
261    ) {
262        let run_end_index = self.run_end_index_as_native();
263        run_ends_builder.append_value(run_end_index);
264        if self.has_current_value {
265            let slice = self.current_value.as_slice();
266            let native = unsafe {
267                V::Native::from_bytes_unchecked(slice)
271            };
272            values_builder.append_value(native);
273        } else {
274            values_builder.append_null();
275        }
276    }
277
278    fn run_end_index_as_native(&self) -> R::Native {
279        R::Native::from_usize(self.current_run_end_index).unwrap_or_else(|| {
280            panic!(
281                "Cannot convert the value {} from `usize` to native form of arrow datatype {}",
282                self.current_run_end_index,
283                R::DATA_TYPE
284            )
285        })
286    }
287}
288
289impl<R, V, S> Extend<Option<S>> for GenericByteRunBuilder<R, V>
290where
291    R: RunEndIndexType,
292    V: ByteArrayType,
293    S: AsRef<V::Native>,
294{
295    fn extend<T: IntoIterator<Item = Option<S>>>(&mut self, iter: T) {
296        for elem in iter {
297            self.append_option(elem);
298        }
299    }
300}
301
/// A [`GenericByteRunBuilder`] whose values are of type [`Utf8Type`];
/// `K` is the run-end index type.
pub type StringRunBuilder<K> = GenericByteRunBuilder<K, Utf8Type>;

/// A [`GenericByteRunBuilder`] whose values are of type [`LargeUtf8Type`];
/// `K` is the run-end index type.
pub type LargeStringRunBuilder<K> = GenericByteRunBuilder<K, LargeUtf8Type>;

/// A [`GenericByteRunBuilder`] whose values are of type [`BinaryType`];
/// `K` is the run-end index type.
pub type BinaryRunBuilder<K> = GenericByteRunBuilder<K, BinaryType>;

/// A [`GenericByteRunBuilder`] whose values are of type [`LargeBinaryType`];
/// `K` is the run-end index type.
pub type LargeBinaryRunBuilder<K> = GenericByteRunBuilder<K, LargeBinaryType>;
373
#[cfg(test)]
mod tests {
    use super::*;

    use crate::GenericByteArray;
    use crate::Int16RunArray;
    use crate::array::Array;
    use crate::cast::AsArray;
    use crate::types::{Int16Type, Int32Type};

    /// Downcasts the values child of `array` to a byte array of type `T`.
    fn typed_values<T: ByteArrayType>(array: &Int16RunArray) -> &GenericByteArray<T> {
        array
            .values()
            .as_any()
            .downcast_ref::<GenericByteArray<T>>()
            .unwrap()
    }

    /// Builds `[v0 x3, null x2, v1 x2, v2 x4]` and checks the resulting runs.
    fn test_bytes_run_builder<T>(values: Vec<&T::Native>)
    where
        T: ByteArrayType,
        <T as ByteArrayType>::Native: PartialEq,
        <T as ByteArrayType>::Native: AsRef<<T as ByteArrayType>::Native>,
    {
        let mut builder = GenericByteRunBuilder::<Int16Type, T>::new();
        for _ in 0..3 {
            builder.append_value(values[0]);
        }
        for _ in 0..2 {
            builder.append_null();
        }
        for _ in 0..2 {
            builder.append_value(values[1]);
        }
        for _ in 0..4 {
            builder.append_value(values[2]);
        }
        let array = builder.finish();

        assert_eq!(array.len(), 11);
        assert_eq!(array.null_count(), 0);
        assert_eq!(array.logical_null_count(), 2);

        assert_eq!(array.run_ends().values(), &[3, 5, 7, 11]);

        // One physical value per run; the second run is the null run.
        let run_values = typed_values::<T>(&array);
        assert_eq!(*run_values.value(0), *values[0]);
        assert!(run_values.is_null(1));
        for (run, expected) in [(2, values[1]), (3, values[2])] {
            assert_eq!(*run_values.value(run), *expected);
        }
    }

    #[test]
    fn test_string_run_builder() {
        test_bytes_run_builder::<Utf8Type>(vec!["abc", "def", "ghi"]);
    }

    #[test]
    fn test_string_run_builder_with_empty_strings() {
        test_bytes_run_builder::<Utf8Type>(vec!["abc", "", "ghi"]);
    }

    #[test]
    fn test_binary_run_builder() {
        test_bytes_run_builder::<BinaryType>(vec![b"abc", b"def", b"ghi"]);
    }

    /// Checks that `finish_cloned` snapshots the builder without resetting it
    /// and that later appends continue from the same state.
    fn test_bytes_run_builder_finish_cloned<T>(values: Vec<&T::Native>)
    where
        T: ByteArrayType,
        <T as ByteArrayType>::Native: PartialEq,
        <T as ByteArrayType>::Native: AsRef<<T as ByteArrayType>::Native>,
    {
        let mut builder = GenericByteRunBuilder::<Int16Type, T>::new();

        builder.append_value(values[0]);
        builder.append_null();
        for _ in 0..2 {
            builder.append_value(values[1]);
        }
        builder.append_value(values[0]);
        let snapshot: Int16RunArray = builder.finish_cloned();

        assert_eq!(snapshot.len(), 5);
        assert_eq!(snapshot.null_count(), 0);
        assert_eq!(snapshot.logical_null_count(), 1);

        assert_eq!(snapshot.run_ends().values(), &[1, 2, 4, 5]);

        let snapshot_values = typed_values::<T>(&snapshot);
        assert_eq!(snapshot_values.value(0), values[0]);
        assert!(snapshot_values.is_null(1));
        for (run, expected) in [(2, values[1]), (3, values[0])] {
            assert_eq!(snapshot_values.value(run), expected);
        }

        // The first two appends extend the still-pending `values[0]` run.
        for _ in 0..2 {
            builder.append_value(values[0]);
        }
        builder.append_value(values[1]);
        let finished: Int16RunArray = builder.finish();

        assert_eq!(finished.len(), 8);
        assert_eq!(finished.null_count(), 0);
        assert_eq!(finished.logical_null_count(), 1);

        assert_eq!(finished.run_ends().values(), &[1, 2, 4, 7, 8]);

        let finished_values = typed_values::<T>(&finished);
        assert_eq!(finished_values.value(0), values[0]);
        assert!(finished_values.is_null(1));
        for (run, expected) in [(2, values[1]), (3, values[0]), (4, values[1])] {
            assert_eq!(finished_values.value(run), expected);
        }
    }

    #[test]
    fn test_string_run_builder_finish_cloned() {
        test_bytes_run_builder_finish_cloned::<Utf8Type>(vec!["abc", "def", "ghi"]);
    }

    #[test]
    fn test_binary_run_builder_finish_cloned() {
        test_bytes_run_builder_finish_cloned::<BinaryType>(vec![b"abc", b"def", b"ghi"]);
    }

    #[test]
    fn test_extend() {
        let mut builder = StringRunBuilder::<Int32Type>::new();
        // The "b" run straddles the two `extend` calls and must stay one run.
        builder.extend(["a", "a", "a", "", "", "b", "b"].into_iter().map(Some));
        builder.extend(["b", "cupcakes", "cupcakes"].into_iter().map(Some));
        let array = builder.finish();

        assert_eq!(array.len(), 10);
        assert_eq!(array.run_ends().values(), &[3, 5, 8, 10]);

        let str_array = array.values().as_string::<i32>();
        let expected_runs = ["a", "", "b", "cupcakes"];
        for (run, expected) in expected_runs.iter().enumerate() {
            assert_eq!(str_array.value(run), *expected);
        }
    }
}