lance_io/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3use std::{
4    ops::{Range, RangeFrom, RangeFull, RangeTo},
5    sync::Arc,
6};
7
8use arrow::datatypes::UInt32Type;
9use arrow_array::{PrimitiveArray, UInt32Array};
10use snafu::location;
11
12use lance_core::{Error, Result};
13
14pub mod encodings;
15pub mod ffi;
16pub mod local;
17pub mod object_reader;
18pub mod object_store;
19pub mod object_writer;
20pub mod scheduler;
21pub mod stream;
22#[cfg(test)]
23pub mod testing;
24pub mod traits;
25pub mod utils;
26
27pub use scheduler::{bytes_read_counter, iops_counter};
28
29/// Defines a selection of rows to read from a file/batch
30#[derive(Debug, Clone, PartialEq)]
31pub enum ReadBatchParams {
32    /// Select a contiguous range of rows
33    Range(Range<usize>),
34    /// Select multiple contiguous ranges of rows
35    Ranges(Arc<[Range<u64>]>),
36    /// Select all rows (this is the default)
37    RangeFull,
38    /// Select all rows up to a given index
39    RangeTo(RangeTo<usize>),
40    /// Select all rows starting at a given index
41    RangeFrom(RangeFrom<usize>),
42    /// Select scattered non-contiguous rows
43    Indices(UInt32Array),
44}
45
46impl std::fmt::Display for ReadBatchParams {
47    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
48        match self {
49            Self::Range(r) => write!(f, "Range({}..{})", r.start, r.end),
50            Self::Ranges(ranges) => {
51                let mut ranges_str = ranges.iter().fold(String::new(), |mut acc, r| {
52                    acc.push_str(&format!("{}..{}", r.start, r.end));
53                    acc.push(',');
54                    acc
55                });
56                // Remove the trailing comma
57                if !ranges_str.is_empty() {
58                    ranges_str.pop();
59                }
60                write!(f, "Ranges({})", ranges_str)
61            }
62            Self::RangeFull => write!(f, "RangeFull"),
63            Self::RangeTo(r) => write!(f, "RangeTo({})", r.end),
64            Self::RangeFrom(r) => write!(f, "RangeFrom({})", r.start),
65            Self::Indices(indices) => {
66                let mut indices_str = indices.values().iter().fold(String::new(), |mut acc, v| {
67                    acc.push_str(&v.to_string());
68                    acc.push(',');
69                    acc
70                });
71                if !indices_str.is_empty() {
72                    indices_str.pop();
73                }
74                write!(f, "Indices({})", indices_str)
75            }
76        }
77    }
78}
79
80impl Default for ReadBatchParams {
81    fn default() -> Self {
82        // Default of ReadBatchParams is reading the full batch.
83        Self::RangeFull
84    }
85}
86
87impl From<&[u32]> for ReadBatchParams {
88    fn from(value: &[u32]) -> Self {
89        Self::Indices(UInt32Array::from_iter_values(value.iter().copied()))
90    }
91}
92
93impl From<UInt32Array> for ReadBatchParams {
94    fn from(value: UInt32Array) -> Self {
95        Self::Indices(value)
96    }
97}
98
99impl From<RangeFull> for ReadBatchParams {
100    fn from(_: RangeFull) -> Self {
101        Self::RangeFull
102    }
103}
104
105impl From<Range<usize>> for ReadBatchParams {
106    fn from(r: Range<usize>) -> Self {
107        Self::Range(r)
108    }
109}
110
111impl From<RangeTo<usize>> for ReadBatchParams {
112    fn from(r: RangeTo<usize>) -> Self {
113        Self::RangeTo(r)
114    }
115}
116
117impl From<RangeFrom<usize>> for ReadBatchParams {
118    fn from(r: RangeFrom<usize>) -> Self {
119        Self::RangeFrom(r)
120    }
121}
122
123impl From<&Self> for ReadBatchParams {
124    fn from(params: &Self) -> Self {
125        params.clone()
126    }
127}
128
129impl ReadBatchParams {
130    /// Validate that the selection is valid given the length of the batch
131    pub fn valid_given_len(&self, len: usize) -> bool {
132        match self {
133            Self::Indices(indices) => indices.iter().all(|i| i.unwrap_or(0) < len as u32),
134            Self::Range(r) => r.start < len && r.end <= len,
135            Self::Ranges(ranges) => ranges.iter().all(|r| r.end <= len as u64),
136            Self::RangeFull => true,
137            Self::RangeTo(r) => r.end <= len,
138            Self::RangeFrom(r) => r.start < len,
139        }
140    }
141
142    /// Slice the selection
143    ///
144    /// For example, given ReadBatchParams::RangeFull and slice(10, 20), the output will be
145    /// ReadBatchParams::Range(10..20)
146    ///
147    /// Given ReadBatchParams::Range(10..20) and slice(5, 3), the output will be
148    /// ReadBatchParams::Range(15..18)
149    ///
150    /// Given ReadBatchParams::RangeTo(20) and slice(10, 5), the output will be
151    /// ReadBatchParams::Range(10..15)
152    ///
153    /// Given ReadBatchParams::RangeFrom(20) and slice(10, 5), the output will be
154    /// ReadBatchParams::Range(30..35)
155    ///
156    /// Given ReadBatchParams::Indices([1, 3, 5, 7, 9]) and slice(1, 3), the output will be
157    /// ReadBatchParams::Indices([3, 5, 7])
158    ///
159    /// You cannot slice beyond the bounds of the selection and an attempt to do so will
160    /// return an error.
161    pub fn slice(&self, start: usize, length: usize) -> Result<Self> {
162        let out_of_bounds = |size: usize| {
163            Err(Error::InvalidInput {
164                source: format!(
165                    "Cannot slice from {} with length {} given a selection of size {}",
166                    start, length, size
167                )
168                .into(),
169                location: location!(),
170            })
171        };
172
173        match self {
174            Self::Indices(indices) => {
175                if start + length > indices.len() {
176                    return out_of_bounds(indices.len());
177                }
178                Ok(Self::Indices(indices.slice(start, length)))
179            }
180            Self::Range(r) => {
181                if (r.start + start + length) > r.end {
182                    return out_of_bounds(r.end - r.start);
183                }
184                Ok(Self::Range((r.start + start)..(r.start + start + length)))
185            }
186            Self::Ranges(ranges) => {
187                let mut new_ranges = Vec::with_capacity(ranges.len());
188                let mut to_skip = start as u64;
189                let mut to_take = length as u64;
190                let mut total_num_rows = 0;
191                for r in ranges.as_ref() {
192                    let num_rows = r.end - r.start;
193                    total_num_rows += num_rows;
194                    if to_skip > num_rows {
195                        to_skip -= num_rows;
196                        continue;
197                    }
198                    let new_start = r.start + to_skip;
199                    let to_take_this_range = (num_rows - to_skip).min(to_take);
200                    new_ranges.push(new_start..(new_start + to_take_this_range));
201                    to_skip = 0;
202                    to_take -= to_take_this_range;
203                    if to_take == 0 {
204                        break;
205                    }
206                }
207                if to_take > 0 {
208                    out_of_bounds(total_num_rows as usize)
209                } else {
210                    Ok(Self::Ranges(new_ranges.into()))
211                }
212            }
213            Self::RangeFull => Ok(Self::Range(start..(start + length))),
214            Self::RangeTo(range) => {
215                if start + length > range.end {
216                    return out_of_bounds(range.end);
217                }
218                Ok(Self::Range(start..(start + length)))
219            }
220            Self::RangeFrom(r) => {
221                // No way to validate out_of_bounds, assume caller will do so
222                Ok(Self::Range((r.start + start)..(r.start + start + length)))
223            }
224        }
225    }
226
227    /// Convert a read range into a vector of row offsets
228    ///
229    /// RangeFull and RangeFrom are unbounded and cannot be converted into row offsets
230    /// and any attempt to do so will return an error.  Call slice first
231    pub fn to_offsets(&self) -> Result<PrimitiveArray<UInt32Type>> {
232        match self {
233            Self::Indices(indices) => Ok(indices.clone()),
234            Self::Range(r) => Ok(UInt32Array::from(Vec::from_iter(
235                r.start as u32..r.end as u32,
236            ))),
237            Self::Ranges(ranges) => {
238                let num_rows = ranges
239                    .iter()
240                    .map(|r| (r.end - r.start) as usize)
241                    .sum::<usize>();
242                let mut offsets = Vec::with_capacity(num_rows);
243                for r in ranges.as_ref() {
244                    offsets.extend(r.start as u32..r.end as u32);
245                }
246                Ok(UInt32Array::from(offsets))
247            }
248            Self::RangeFull => Err(Error::invalid_input(
249                "cannot materialize RangeFull",
250                location!(),
251            )),
252            Self::RangeTo(r) => Ok(UInt32Array::from(Vec::from_iter(0..r.end as u32))),
253            Self::RangeFrom(_) => Err(Error::invalid_input(
254                "cannot materialize RangeFrom",
255                location!(),
256            )),
257        }
258    }
259
260    pub fn iter_offset_ranges<'a>(
261        &'a self,
262    ) -> Result<Box<dyn Iterator<Item = Range<u32>> + Send + 'a>> {
263        match self {
264            Self::Indices(indices) => Ok(Box::new(indices.values().iter().map(|i| *i..(*i + 1)))),
265            Self::Range(r) => Ok(Box::new(std::iter::once(r.start as u32..r.end as u32))),
266            Self::Ranges(ranges) => Ok(Box::new(
267                ranges.iter().map(|r| r.start as u32..r.end as u32),
268            )),
269            Self::RangeFull => Err(Error::invalid_input(
270                "cannot materialize RangeFull",
271                location!(),
272            )),
273            Self::RangeTo(r) => Ok(Box::new(std::iter::once(0..r.end as u32))),
274            Self::RangeFrom(_) => Err(Error::invalid_input(
275                "cannot materialize RangeFrom",
276                location!(),
277            )),
278        }
279    }
280
281    /// Convert a read range into a vector of row ranges
282    pub fn to_ranges(&self) -> Result<Vec<Range<u64>>> {
283        match self {
284            Self::Indices(indices) => Ok(indices
285                .values()
286                .iter()
287                .map(|i| *i as u64..(*i + 1) as u64)
288                .collect()),
289            Self::Range(r) => Ok(vec![r.start as u64..r.end as u64]),
290            Self::Ranges(ranges) => Ok(ranges.to_vec()),
291            Self::RangeFull => Err(Error::invalid_input(
292                "cannot materialize RangeFull",
293                location!(),
294            )),
295            Self::RangeTo(r) => Ok(vec![0..r.end as u64]),
296            Self::RangeFrom(_) => Err(Error::invalid_input(
297                "cannot materialize RangeFrom",
298                location!(),
299            )),
300        }
301    }
302
303    /// Same thing as to_offsets but the caller knows the total number of rows in the file
304    ///
305    /// This makes it possible to materialize RangeFull / RangeFrom
306    pub fn to_offsets_total(&self, total: u32) -> PrimitiveArray<UInt32Type> {
307        match self {
308            Self::Indices(indices) => indices.clone(),
309            Self::Range(r) => UInt32Array::from_iter_values(r.start as u32..r.end as u32),
310            Self::Ranges(ranges) => {
311                let num_rows = ranges
312                    .iter()
313                    .map(|r| (r.end - r.start) as usize)
314                    .sum::<usize>();
315                let mut offsets = Vec::with_capacity(num_rows);
316                for r in ranges.as_ref() {
317                    offsets.extend(r.start as u32..r.end as u32);
318                }
319                UInt32Array::from(offsets)
320            }
321            Self::RangeFull => UInt32Array::from_iter_values(0_u32..total),
322            Self::RangeTo(r) => UInt32Array::from_iter_values(0..r.end as u32),
323            Self::RangeFrom(r) => UInt32Array::from_iter_values(r.start as u32..total),
324        }
325    }
326}
327
#[cfg(test)]
mod test {
    use std::ops::{RangeFrom, RangeTo};

    use arrow_array::UInt32Array;

    use crate::ReadBatchParams;

    /// Slice `params` by `(offset, length)`, materialize it and compare with `expected`.
    fn assert_slice_offsets(
        params: ReadBatchParams,
        offset: usize,
        length: usize,
        expected: Vec<u32>,
    ) {
        let sliced = params.slice(offset, length).unwrap();
        assert_eq!(sliced.to_offsets().unwrap(), UInt32Array::from(expected));
    }

    /// Assert that slicing `params` by `(offset, length)` is rejected.
    fn assert_slice_rejected(params: ReadBatchParams, offset: usize, length: usize) {
        assert!(params.slice(offset, length).is_err());
    }

    #[test]
    fn test_params_slice() {
        let ranges = ReadBatchParams::Ranges(vec![0..15, 20..40].into());
        let expected = ReadBatchParams::Ranges(vec![10..15, 20..25].into());
        assert_eq!(ranges.slice(10, 10).unwrap(), expected);
    }

    #[test]
    fn test_params_to_offsets() {
        // Unbounded selections become concrete ranges once sliced.
        assert_slice_offsets(ReadBatchParams::RangeFull, 0, 100, (0..100).collect());
        assert_slice_offsets(ReadBatchParams::RangeFull, 50, 100, (50..150).collect());

        let from_500 = || ReadBatchParams::RangeFrom(RangeFrom { start: 500 });
        assert_slice_offsets(from_500(), 0, 100, (500..600).collect());
        assert_slice_offsets(from_500(), 100, 100, (600..700).collect());

        let to_800 = || ReadBatchParams::RangeTo(RangeTo { end: 800 });
        assert_slice_offsets(to_800(), 0, 100, (0..100).collect());
        assert_slice_offsets(to_800(), 200, 100, (200..300).collect());

        let odd = || ReadBatchParams::Indices(UInt32Array::from(vec![1, 3, 5, 7, 9]));
        assert_slice_offsets(odd(), 0, 2, vec![1, 3]);
        assert_slice_offsets(odd(), 2, 2, vec![5, 7]);

        // Out-of-bounds slices of bounded selections are rejected.
        let single = || ReadBatchParams::Indices(UInt32Array::from(vec![1]));
        assert_slice_rejected(single(), 0, 2);
        assert_slice_rejected(single(), 1, 1);
        assert_slice_rejected(ReadBatchParams::Range(0..10), 5, 6);
        assert_slice_rejected(ReadBatchParams::RangeTo(RangeTo { end: 10 }), 5, 6);

        // Unbounded selections cannot be materialized without slicing first.
        assert!(ReadBatchParams::RangeFull.to_offsets().is_err());
        assert!(ReadBatchParams::RangeFrom(RangeFrom { start: 10 })
            .to_offsets()
            .is_err());
    }
}
408}