1use num_traits::AsPrimitive;
5use num_traits::NumCast;
6use vortex_array::ArrayRef;
7use vortex_array::ArrayView;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::ToCanonical;
11use vortex_array::arrays::PrimitiveArray;
12use vortex_array::arrays::dict::TakeExecute;
13use vortex_array::match_each_integer_ptype;
14use vortex_array::search_sorted::SearchResult;
15use vortex_array::search_sorted::SearchSorted;
16use vortex_array::search_sorted::SearchSortedSide;
17use vortex_array::validity::Validity;
18use vortex_buffer::Buffer;
19use vortex_error::VortexResult;
20use vortex_error::vortex_bail;
21
22use crate::RunEnd;
23use crate::array::RunEndArrayExt;
24
25impl TakeExecute for RunEnd {
26 #[expect(
27 clippy::cast_possible_truncation,
28 reason = "index cast to usize inside macro"
29 )]
30 fn take(
31 array: ArrayView<'_, Self>,
32 indices: &ArrayRef,
33 ctx: &mut ExecutionCtx,
34 ) -> VortexResult<Option<ArrayRef>> {
35 let primitive_indices = indices.clone().execute::<PrimitiveArray>(ctx)?;
36
37 let checked_indices = match_each_integer_ptype!(primitive_indices.ptype(), |P| {
38 primitive_indices
39 .as_slice::<P>()
40 .iter()
41 .copied()
42 .map(|idx| {
43 let usize_idx = idx as usize;
44 if usize_idx >= array.len() {
45 vortex_bail!(OutOfBounds: usize_idx, 0, array.len());
46 }
47 Ok(usize_idx)
48 })
49 .collect::<VortexResult<Vec<_>>>()?
50 });
51
52 let indices_validity = primitive_indices.validity()?;
53 take_indices_unchecked(array, &checked_indices, &indices_validity).map(Some)
54 }
55}
56
57pub fn take_indices_unchecked<T: AsPrimitive<usize>>(
59 array: ArrayView<'_, RunEnd>,
60 indices: &[T],
61 validity: &Validity,
62) -> VortexResult<ArrayRef> {
63 let ends = array.ends().to_primitive();
64 let ends_len = ends.len();
65
66 let physical_indices = match_each_integer_ptype!(ends.ptype(), |I| {
68 let end_slices = ends.as_slice::<I>();
69 let physical_indices_vec: Vec<u64> = indices
70 .iter()
71 .map(|idx| idx.as_() + array.offset())
72 .map(|idx| {
73 match <I as NumCast>::from(idx) {
74 Some(idx) => end_slices.search_sorted(&idx, SearchSortedSide::Right),
75 None => {
76 Ok(SearchResult::NotFound(ends_len))
78 }
79 }
80 })
81 .map(|result| result.map(|r| r.to_ends_index(ends_len) as u64))
82 .collect::<VortexResult<Vec<_>>>()?;
83 let buffer = Buffer::from(physical_indices_vec);
84
85 PrimitiveArray::new(buffer, validity.clone())
86 });
87
88 array.values().take(physical_indices.into_array())
89}
90
91#[cfg(test)]
92mod test {
93 use rstest::rstest;
94 use vortex_array::ArrayRef;
95 use vortex_array::Canonical;
96 use vortex_array::IntoArray;
97 use vortex_array::LEGACY_SESSION;
98 use vortex_array::VortexSessionExecute;
99 use vortex_array::arrays::PrimitiveArray;
100 use vortex_array::assert_arrays_eq;
101 use vortex_array::compute::conformance::take::test_take_conformance;
102 use vortex_buffer::buffer;
103
104 use crate::RunEnd;
105 use crate::RunEndArray;
106
107 fn ree_array() -> RunEndArray {
108 RunEnd::encode(buffer![1, 1, 1, 4, 4, 4, 2, 2, 5, 5, 5, 5].into_array()).unwrap()
109 }
110
111 #[test]
112 fn ree_take() {
113 let taken = ree_array().take(buffer![9, 8, 1, 3].into_array()).unwrap();
114 let expected = PrimitiveArray::from_iter(vec![5i32, 5, 1, 4]).into_array();
115 assert_arrays_eq!(taken, expected);
116 }
117
118 #[test]
119 fn ree_take_end() {
120 let taken = ree_array().take(buffer![11].into_array()).unwrap();
121 let expected = PrimitiveArray::from_iter(vec![5i32]).into_array();
122 assert_arrays_eq!(taken, expected);
123 }
124
125 #[test]
126 #[should_panic]
127 fn ree_take_out_of_bounds() {
128 let _array = ree_array()
129 .take(buffer![12].into_array())
130 .unwrap()
131 .execute::<Canonical>(&mut LEGACY_SESSION.create_execution_ctx())
132 .unwrap();
133 }
134
135 #[test]
136 fn sliced_take() {
137 let sliced = ree_array().slice(4..9).unwrap();
138 let taken = sliced.take(buffer![1, 3, 4].into_array()).unwrap();
139
140 let expected = PrimitiveArray::from_iter(vec![4i32, 2, 5]).into_array();
141 assert_arrays_eq!(taken, expected);
142 }
143
144 #[test]
145 fn ree_take_nullable() {
146 let taken = ree_array()
147 .take(PrimitiveArray::from_option_iter([Some(1), None]).into_array())
148 .unwrap();
149
150 let expected = PrimitiveArray::from_option_iter([Some(1i32), None]);
151 assert_arrays_eq!(taken, expected.into_array());
152 }
153
154 #[rstest]
155 #[case(ree_array())]
156 #[case(RunEnd::encode(
157 buffer![1u8, 1, 2, 2, 2, 3, 3, 3, 3, 4].into_array(),
158 ).unwrap())]
159 #[case(RunEnd::encode(
160 PrimitiveArray::from_option_iter([
161 Some(10),
162 Some(10),
163 None,
164 None,
165 Some(20),
166 Some(20),
167 Some(20),
168 ])
169 .into_array(),
170 ).unwrap())]
171 #[case(RunEnd::encode(buffer![42i32, 42, 42, 42, 42].into_array())
172 .unwrap())]
173 #[case(RunEnd::encode(
174 buffer![1i32, 2, 3, 4, 5, 6, 7, 8, 9, 10].into_array(),
175 ).unwrap())]
176 #[case({
177 let mut values = Vec::new();
178 for i in 0..20 {
179 for _ in 0..=i {
180 values.push(i);
181 }
182 }
183 RunEnd::encode(PrimitiveArray::from_iter(values).into_array()).unwrap()
184 })]
185 fn test_take_runend_conformance(#[case] array: RunEndArray) {
186 test_take_conformance(&array.into_array());
187 }
188
189 #[rstest]
190 #[case(ree_array().slice(3..6).unwrap())]
191 #[case({
192 let array = RunEnd::encode(
193 buffer![1i32, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3].into_array(),
194 )
195 .unwrap();
196 array.slice(2..8).unwrap()
197 })]
198 fn test_take_sliced_runend_conformance(#[case] sliced: ArrayRef) {
199 test_take_conformance(&sliced);
200 }
201}