1use std::ops::Range;
5
6use vortex_array::arrays::ConstantArray;
7use vortex_array::search_sorted::{SearchResult, SearchSorted, SearchSortedSide};
8use vortex_array::vtable::OperationsVTable;
9use vortex_array::{Array, ArrayRef, IntoArray};
10use vortex_scalar::{PValue, Scalar};
11
12use crate::{RunEndArray, RunEndVTable};
13
14impl OperationsVTable<RunEndVTable> for RunEndVTable {
15 fn slice(array: &RunEndArray, range: Range<usize>) -> ArrayRef {
16 let new_length = range.len();
17
18 let slice_begin = array.find_physical_index(range.start);
19 let slice_end = find_slice_end_index(array.ends(), range.end + array.offset());
20
21 if slice_begin + 1 == slice_end {
23 let value = array.values().scalar_at(slice_begin);
24 return ConstantArray::new(value, new_length).into_array();
25 }
26
27 unsafe {
29 RunEndArray::new_unchecked(
30 array.ends().slice(slice_begin..slice_end),
31 array.values().slice(slice_begin..slice_end),
32 range.start + array.offset(),
33 new_length,
34 )
35 .into_array()
36 }
37 }
38
39 fn scalar_at(array: &RunEndArray, index: usize) -> Scalar {
40 array.values().scalar_at(array.find_physical_index(index))
41 }
42}
43
44pub(crate) fn find_slice_end_index(array: &dyn Array, index: usize) -> usize {
49 let result = array
50 .as_primitive_typed()
51 .search_sorted(&PValue::from(index), SearchSortedSide::Right);
52 match result {
53 SearchResult::Found(i) => i,
54 SearchResult::NotFound(i) => {
55 if i == array.len() {
56 i
57 } else {
58 i + 1
59 }
60 }
61 }
62}
63
64#[cfg(test)]
65mod tests {
66
67 use vortex_array::arrays::PrimitiveArray;
68 use vortex_array::{Array, IntoArray, assert_arrays_eq};
69 use vortex_buffer::buffer;
70 use vortex_dtype::{DType, Nullability, PType};
71
72 use crate::RunEndArray;
73
74 #[test]
75 fn slice_array() {
76 let arr = RunEndArray::try_new(
77 buffer![2u32, 5, 10].into_array(),
78 buffer![1i32, 2, 3].into_array(),
79 )
80 .unwrap()
81 .slice(3..8);
82 assert_eq!(
83 arr.dtype(),
84 &DType::Primitive(PType::I32, Nullability::NonNullable)
85 );
86 assert_eq!(arr.len(), 5);
87
88 let expected = PrimitiveArray::from_iter(vec![2i32, 2, 3, 3, 3]).into_array();
89 assert_arrays_eq!(arr, expected);
90 }
91
92 #[test]
93 fn double_slice() {
94 let arr = RunEndArray::try_new(
95 buffer![2u32, 5, 10].into_array(),
96 buffer![1i32, 2, 3].into_array(),
97 )
98 .unwrap()
99 .slice(3..8);
100 assert_eq!(arr.len(), 5);
101
102 let doubly_sliced = arr.slice(0..3);
103
104 let expected = PrimitiveArray::from_iter(vec![2i32, 2, 3]).into_array();
105 assert_arrays_eq!(doubly_sliced, expected);
106 }
107
108 #[test]
109 fn slice_end_inclusive() {
110 let arr = RunEndArray::try_new(
111 buffer![2u32, 5, 10].into_array(),
112 buffer![1i32, 2, 3].into_array(),
113 )
114 .unwrap()
115 .slice(4..10);
116 assert_eq!(
117 arr.dtype(),
118 &DType::Primitive(PType::I32, Nullability::NonNullable)
119 );
120 assert_eq!(arr.len(), 6);
121
122 let expected = PrimitiveArray::from_iter(vec![2i32, 3, 3, 3, 3, 3]).into_array();
123 assert_arrays_eq!(arr, expected);
124 }
125
126 #[test]
127 fn slice_at_end() {
128 let re_array = RunEndArray::try_new(
129 buffer![7_u64, 10].into_array(),
130 buffer![2_u64, 3].into_array(),
131 )
132 .unwrap();
133
134 assert_eq!(re_array.len(), 10);
135
136 let sliced_array = re_array.slice(re_array.len()..re_array.len());
137 assert!(sliced_array.is_empty());
138 }
139
140 #[test]
141 fn slice_single_end() {
142 let re_array = RunEndArray::try_new(
143 buffer![7_u64, 10].into_array(),
144 buffer![2_u64, 3].into_array(),
145 )
146 .unwrap();
147
148 assert_eq!(re_array.len(), 10);
149
150 let sliced_array = re_array.slice(2..5);
151
152 assert!(sliced_array.is_constant())
153 }
154
155 #[test]
156 fn ree_scalar_at_end() {
157 let scalar = RunEndArray::encode(buffer![1, 1, 1, 4, 4, 4, 2, 2, 5, 5, 5, 5].into_array())
158 .unwrap()
159 .scalar_at(11);
160 assert_eq!(scalar, 5.into());
161 }
162
163 #[test]
164 #[allow(clippy::cognitive_complexity)]
165 fn slice_along_run_boundaries() {
166 let arr = RunEndArray::try_new(
169 buffer![3u32, 6, 8, 12].into_array(),
170 buffer![1i32, 4, 2, 5].into_array(),
171 )
172 .unwrap();
173
174 let slice1 = arr.slice(0..3);
176 assert_eq!(slice1.len(), 3);
177 let expected = PrimitiveArray::from_iter(vec![1i32, 1, 1]).into_array();
178 assert_arrays_eq!(slice1, expected);
179
180 let slice2 = arr.slice(3..6);
182 assert_eq!(slice2.len(), 3);
183 let expected = PrimitiveArray::from_iter(vec![4i32, 4, 4]).into_array();
184 assert_arrays_eq!(slice2, expected);
185
186 let slice3 = arr.slice(6..8);
188 assert_eq!(slice3.len(), 2);
189 let expected = PrimitiveArray::from_iter(vec![2i32, 2]).into_array();
190 assert_arrays_eq!(slice3, expected);
191
192 let slice4 = arr.slice(8..12);
194 assert_eq!(slice4.len(), 4);
195 let expected = PrimitiveArray::from_iter(vec![5i32, 5, 5, 5]).into_array();
196 assert_arrays_eq!(slice4, expected);
197
198 let slice5 = arr.slice(3..8);
200 assert_eq!(slice5.len(), 5);
201 let expected = PrimitiveArray::from_iter(vec![4i32, 4, 4, 2, 2]).into_array();
202 assert_arrays_eq!(slice5, expected);
203
204 let slice6 = arr.slice(1..6);
206 assert_eq!(slice6.len(), 5);
207 let expected = PrimitiveArray::from_iter(vec![1i32, 1, 4, 4, 4]).into_array();
208 assert_arrays_eq!(slice6, expected);
209
210 let slice7 = arr.slice(3..7);
212 assert_eq!(slice7.len(), 4);
213 let expected = PrimitiveArray::from_iter(vec![4i32, 4, 4, 2]).into_array();
214 assert_arrays_eq!(slice7, expected);
215 }
216}