arrow-buffer 58.0.0

Buffer abstractions for Apache Arrow
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::ArrowNativeType;
use crate::buffer::ScalarBuffer;

/// A buffer of monotonically increasing, positive integers used to store run-ends.
///
/// Used to compactly represent runs of the same value. Values being represented
/// are stored in a separate buffer from this struct. See [`RunArray`] for an example
/// of how this is used with a companion array to represent the values.
///
/// # Logical vs Physical
///
/// Physically, each value in the `run_ends` buffer is the cumulative length of
/// all runs in the logical representation, up to that physical index. Consider
/// the following example:
///
/// ```text
///           physical                        logical
///     ┌─────────┬─────────┐           ┌─────────┬─────────┐
///     │    3    │    0    │ ◄──────┬─ │    A    │    0    │
///     ├─────────┼─────────┤        │  ├─────────┼─────────┤
///     │    4    │    1    │ ◄────┐ ├─ │    A    │    1    │
///     ├─────────┼─────────┤      │ │  ├─────────┼─────────┤
///     │    6    │    2    │ ◄──┐ │ └─ │    A    │    2    │
///     └─────────┴─────────┘    │ │    ├─────────┼─────────┤
///      run-ends    index       │ └─── │    B    │    3    │
///                              │      ├─────────┼─────────┤
///      logical_offset = 0      ├───── │    C    │    4    │
///      logical_length = 6      │      ├─────────┼─────────┤
///                              └───── │    C    │    5    │
///                                     └─────────┴─────────┘
///                                       values     index
/// ```
///
/// A [`RunEndBuffer`] is physically the buffer and offset with length on the left.
/// In this case, the offset and length represent the whole buffer, so it is essentially
/// unsliced. See the section below on slicing for more details on how this buffer
/// handles slicing.
///
/// This means that multiple logical values are represented in the same physical index,
/// and multiple logical indices map to the same physical index. The [`RunEndBuffer`]
/// containing `[3, 4, 6]` is essentially the physical indices `[0, 0, 0, 1, 2, 2]`,
/// and having a separately stored buffer of values such as `[A, B, C]` can turn
/// this into a representation of `[A, A, A, B, C, C]`.
///
/// # Slicing
///
/// In order to provide zero-copy slicing, this struct stores a separate **logical**
/// offset and length. Consider the following example:
///
/// ```text
///           physical                        logical
///     ┌─────────┬─────────┐           ┌ ─ ─ ─ ─ ┬ ─ ─ ─ ─ ┐
///     │    3    │    0    │ ◄──────┐       A         0
///     ├─────────┼─────────┤        │  ├── ─ ─ ─ ┼ ─ ─ ─ ─ ┤
///     │    4    │    1    │ ◄────┐ │       A         1
///     ├─────────┼─────────┤      │ │  ├─────────┼─────────┤
///     │    6    │    2    │ ◄──┐ │ └─ │    A    │    2    │◄─── logical_offset
///     └─────────┴─────────┘    │ │    ├─────────┼─────────┤
///      run-ends    index       │ └─── │    B    │    3    │
///                              │      ├─────────┼─────────┤
///      logical_offset = 2      └───── │    C    │    4    │
///      logical_length = 3             ├─────────┼─────────┤
///                                          C         5     ◄─── logical_offset + logical_length
///                                     └ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ┘
///                                       values     index
/// ```
///
/// The physical `run_ends` [`ScalarBuffer`] remains unchanged, in order to facilitate
/// zero-copy. However, we now offset into the **logical** representation with an
/// accompanying length. This allows us to represent values `[A, B, C]` using physical
/// indices `0, 1, 2` with the same underlying physical buffer, at the cost of two
/// extra `usize`s to represent the logical slice that was taken.
///
/// (A [`RunEndBuffer`] is considered unsliced when `logical_offset` is `0` and
/// `logical_length` is equal to the last value in `run_ends`)
///
/// [`RunArray`]: https://docs.rs/arrow/latest/arrow/array/struct.RunArray.html
/// [Run-End encoded layout]: https://arrow.apache.org/docs/format/Columnar.html#run-end-encoded-layout
#[derive(Debug, Clone)]
pub struct RunEndBuffer<E: ArrowNativeType> {
    run_ends: ScalarBuffer<E>,
    logical_length: usize,
    logical_offset: usize,
}

impl<E> RunEndBuffer<E>
where
    E: ArrowNativeType,
{
    /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], `logical_offset`
    /// and `logical_length`.
    ///
    /// # Panics
    ///
    /// - `run_ends` does not contain strictly increasing values greater than zero
    /// - The last value of `run_ends` is less than `logical_offset + logical_length`
    pub fn new(run_ends: ScalarBuffer<E>, logical_offset: usize, logical_length: usize) -> Self {
        assert!(
            run_ends.windows(2).all(|w| w[0] < w[1]),
            "run-ends not strictly increasing"
        );

        if logical_length != 0 {
            assert!(!run_ends.is_empty(), "non-empty slice but empty run-ends");
            let end = E::from_usize(logical_offset.saturating_add(logical_length)).unwrap();
            assert!(
                *run_ends.first().unwrap() > E::usize_as(0),
                "run-ends not greater than 0"
            );
            assert!(
                *run_ends.last().unwrap() >= end,
                "slice beyond bounds of run-ends"
            );
        }

        Self {
            run_ends,
            logical_offset,
            logical_length,
        }
    }

    /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`], `logical_offset`
    /// and `logical_length`.
    ///
    /// # Safety
    ///
    /// - `run_ends` must contain strictly increasing values greater than zero
    /// - The last value of `run_ends` must be greater than or equal to `logical_offset + logical_len`
    pub unsafe fn new_unchecked(
        run_ends: ScalarBuffer<E>,
        logical_offset: usize,
        logical_length: usize,
    ) -> Self {
        Self {
            run_ends,
            logical_offset,
            logical_length,
        }
    }

    /// Returns the logical offset into the run-ends stored by this buffer.
    #[inline]
    pub fn offset(&self) -> usize {
        self.logical_offset
    }

    /// Returns the logical length of the run-ends stored by this buffer.
    #[inline]
    pub fn len(&self) -> usize {
        self.logical_length
    }

    /// Returns true if this buffer is logically empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.logical_length == 0
    }

    /// Free up unused memory.
    pub fn shrink_to_fit(&mut self) {
        // TODO(emilk): we could shrink even more in the case where we are a small sub-slice of the full buffer
        self.run_ends.shrink_to_fit();
    }

    /// Returns the physical (**unsliced**) run ends of this buffer.
    ///
    /// Take care when operating on these values as it doesn't take into account
    /// any logical slicing that may have occurred.
    #[inline]
    pub fn values(&self) -> &[E] {
        &self.run_ends
    }

    /// Returns an iterator yielding run ends adjusted for the logical slice.
    ///
    /// Each yielded value is subtracted by the [`logical_offset`] and capped
    /// at the [`logical_length`].
    ///
    /// [`logical_offset`]: Self::offset
    /// [`logical_length`]: Self::len
    pub fn sliced_values(&self) -> impl Iterator<Item = E> + '_ {
        let offset = self.logical_offset;
        let len = self.logical_length;
        // Doing this roundabout way since the iterator type we return must be
        // the same (i.e. cannot use std::iter::empty())
        let physical_slice = if self.is_empty() {
            &self.run_ends[0..0]
        } else {
            let start = self.get_start_physical_index();
            let end = self.get_end_physical_index();
            &self.run_ends[start..=end]
        };
        physical_slice.iter().map(move |&val| {
            let val = val.as_usize().saturating_sub(offset).min(len);
            E::from_usize(val).unwrap()
        })
    }

    /// Returns the maximum run-end encoded in the underlying buffer; that is, the
    /// last physical run of the buffer. This does not take into account any logical
    /// slicing that may have occurred.
    #[inline]
    pub fn max_value(&self) -> usize {
        self.values().last().copied().unwrap_or_default().as_usize()
    }

    /// Performs a binary search to find the physical index for the given logical
    /// index.
    ///
    /// Useful for extracting the corresponding physical `run_ends` when this buffer
    /// is logically sliced.
    ///
    /// The result is arbitrary if `logical_index >= self.len()`.
    pub fn get_physical_index(&self, logical_index: usize) -> usize {
        let logical_index = E::usize_as(self.logical_offset + logical_index);
        let cmp = |p: &E| p.partial_cmp(&logical_index).unwrap();

        match self.run_ends.binary_search_by(cmp) {
            Ok(idx) => idx + 1,
            Err(idx) => idx,
        }
    }

    /// Returns the physical index at which the logical array starts.
    ///
    /// The same as calling `get_physical_index(0)` but with a fast path if the
    /// buffer is not logically sliced, in which case it always returns `0`.
    pub fn get_start_physical_index(&self) -> usize {
        if self.logical_offset == 0 || self.logical_length == 0 {
            return 0;
        }
        // Fallback to binary search
        self.get_physical_index(0)
    }

    /// Returns the physical index at which the logical array ends.
    ///
    /// The same as calling `get_physical_index(length - 1)` but with a fast path
    /// if the buffer is not logically sliced, in which case it returns `length - 1`.
    pub fn get_end_physical_index(&self) -> usize {
        if self.logical_length == 0 {
            return 0;
        }
        if self.max_value() == self.logical_offset + self.logical_length {
            return self.values().len() - 1;
        }
        // Fallback to binary search
        self.get_physical_index(self.logical_length - 1)
    }

    /// Slices this [`RunEndBuffer`] by the provided `logical_offset` and `logical_length`.
    ///
    /// # Panics
    ///
    /// - Specified slice (`logical_offset` + `logical_length`) exceeds existing
    ///   logical length
    pub fn slice(&self, logical_offset: usize, logical_length: usize) -> Self {
        assert!(
            logical_offset.saturating_add(logical_length) <= self.logical_length,
            "the length + offset of the sliced RunEndBuffer cannot exceed the existing length"
        );
        Self {
            run_ends: self.run_ends.clone(),
            logical_offset: self.logical_offset + logical_offset,
            logical_length,
        }
    }

    /// Returns the inner [`ScalarBuffer`].
    pub fn inner(&self) -> &ScalarBuffer<E> {
        &self.run_ends
    }

    /// Returns the inner [`ScalarBuffer`], consuming self.
    pub fn into_inner(self) -> ScalarBuffer<E> {
        self.run_ends
    }

    /// Returns the physical indices corresponding to the provided logical indices.
    ///
    /// Given a slice of logical indices, this method returns a `Vec` containing the
    /// corresponding physical indices into the run-ends buffer.
    ///
    /// This method operates by iterating the logical indices in sorted order, instead of
    /// finding the physical index for each logical index using binary search via
    /// the function [`RunEndBuffer::get_physical_index`].
    ///
    /// Running benchmarks on both approaches showed that the approach used here
    /// scaled well for larger inputs.
    ///
    /// See <https://github.com/apache/arrow-rs/pull/3622#issuecomment-1407753727> for more details.
    ///
    /// # Errors
    ///
    /// If any logical index is out of bounds (>= self.len()), returns an error containing the invalid index.
    #[inline]
    pub fn get_physical_indices<I>(&self, logical_indices: &[I]) -> Result<Vec<usize>, I>
    where
        I: ArrowNativeType,
    {
        let len = self.len();
        let offset = self.offset();

        let indices_len = logical_indices.len();

        if indices_len == 0 {
            return Ok(vec![]);
        }

        // `ordered_indices` store index into `logical_indices` and can be used
        // to iterate `logical_indices` in sorted order.
        let mut ordered_indices: Vec<usize> = (0..indices_len).collect();

        // Instead of sorting `logical_indices` directly, sort the `ordered_indices`
        // whose values are index of `logical_indices`
        ordered_indices.sort_unstable_by(|lhs, rhs| {
            logical_indices[*lhs]
                .partial_cmp(&logical_indices[*rhs])
                .unwrap()
        });

        // Return early if all the logical indices cannot be converted to physical indices.
        let largest_logical_index = logical_indices[*ordered_indices.last().unwrap()].as_usize();
        if largest_logical_index >= len {
            return Err(logical_indices[*ordered_indices.last().unwrap()]);
        }

        // Skip some physical indices based on offset.
        let skip_value = self.get_start_physical_index();

        let mut physical_indices = vec![0; indices_len];

        let mut ordered_index = 0_usize;
        for (physical_index, run_end) in self.values().iter().enumerate().skip(skip_value) {
            // Get the run end index (relative to offset) of current physical index
            let run_end_value = run_end.as_usize() - offset;

            // All the `logical_indices` that are less than current run end index
            // belongs to current physical index.
            while ordered_index < indices_len
                && logical_indices[ordered_indices[ordered_index]].as_usize() < run_end_value
            {
                physical_indices[ordered_indices[ordered_index]] = physical_index;
                ordered_index += 1;
            }
        }

        // If there are input values >= run_ends.last_value then we'll not be able to convert
        // all logical indices to physical indices.
        if ordered_index < logical_indices.len() {
            return Err(logical_indices[ordered_indices[ordered_index]]);
        }
        Ok(physical_indices)
    }
}

#[cfg(test)]
mod tests {
    use crate::buffer::RunEndBuffer;

    #[test]
    fn test_zero_length_slice() {
        let buffer = RunEndBuffer::new(vec![1_i32, 4_i32].into(), 0, 4);
        assert_eq!(buffer.get_start_physical_index(), 0);
        assert_eq!(buffer.get_end_physical_index(), 1);
        assert_eq!(buffer.get_physical_index(3), 1);

        for offset in 0..4 {
            let sliced = buffer.slice(offset, 0);
            assert_eq!(sliced.get_start_physical_index(), 0);
            assert_eq!(sliced.get_end_physical_index(), 0);
        }

        let buffer = RunEndBuffer::new(Vec::<i32>::new().into(), 0, 0);
        assert_eq!(buffer.get_start_physical_index(), 0);
        assert_eq!(buffer.get_end_physical_index(), 0);
    }

    #[test]
    fn test_sliced_values() {
        // [0, 0, 1, 2, 2, 2]
        let buffer = RunEndBuffer::new(vec![2i32, 3, 6].into(), 0, 6);

        // Slice: [0, 1, 2, 2] start: 1, len: 4
        // Logical indices: 1, 2, 3, 4
        // Original run ends: [2, 3, 6]
        // Adjusted: [2-1, 3-1, 6-1] capped at 4 -> [1, 2, 4]
        let sliced = buffer.slice(1, 4);
        let sliced_values: Vec<i32> = sliced.sliced_values().collect();
        assert_eq!(sliced_values, &[1, 2, 4]);

        // Slice: [2, 2] start: 4, len: 2
        // Original run ends: [2, 3, 6]
        // Slicing at 4 means we only have the last run (physical index 2, which ends at 6)
        // Adjusted: [6-4] capped at 2 -> [2]
        let sliced = buffer.slice(4, 2);
        let sliced_values: Vec<i32> = sliced.sliced_values().collect();
        assert_eq!(sliced_values, &[2]);
    }
}