Skip to main content

lance_io/
lib.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors
3use std::{
4    ops::{Range, RangeFrom, RangeFull, RangeTo},
5    sync::Arc,
6};
7
8use arrow::datatypes::UInt32Type;
9use arrow_array::{PrimitiveArray, UInt32Array};
10
11use lance_core::{Error, Result};
12
13pub mod encodings;
14pub mod ffi;
15pub mod local;
16pub mod object_reader;
17pub mod object_store;
18pub mod object_writer;
19pub mod scheduler;
20pub mod stream;
21#[cfg(test)]
22pub mod testing;
23pub mod traits;
24pub mod utils;
25
26pub use scheduler::{bytes_read_counter, iops_counter};
27
28/// Defines a selection of rows to read from a file/batch
29#[derive(Debug, Clone, PartialEq, Default)]
30pub enum ReadBatchParams {
31    /// Select a contiguous range of rows
32    Range(Range<usize>),
33    /// Select multiple contiguous ranges of rows
34    Ranges(Arc<[Range<u64>]>),
35    /// Select all rows (this is the default)
36    #[default]
37    RangeFull,
38    /// Select all rows up to a given index
39    RangeTo(RangeTo<usize>),
40    /// Select all rows starting at a given index
41    RangeFrom(RangeFrom<usize>),
42    /// Select scattered non-contiguous rows
43    Indices(UInt32Array),
44}
45
46impl std::fmt::Display for ReadBatchParams {
47    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
48        match self {
49            Self::Range(r) => write!(f, "Range({}..{})", r.start, r.end),
50            Self::Ranges(ranges) => {
51                let mut ranges_str = ranges.iter().fold(String::new(), |mut acc, r| {
52                    acc.push_str(&format!("{}..{}", r.start, r.end));
53                    acc.push(',');
54                    acc
55                });
56                // Remove the trailing comma
57                if !ranges_str.is_empty() {
58                    ranges_str.pop();
59                }
60                write!(f, "Ranges({})", ranges_str)
61            }
62            Self::RangeFull => write!(f, "RangeFull"),
63            Self::RangeTo(r) => write!(f, "RangeTo({})", r.end),
64            Self::RangeFrom(r) => write!(f, "RangeFrom({})", r.start),
65            Self::Indices(indices) => {
66                let mut indices_str = indices.values().iter().fold(String::new(), |mut acc, v| {
67                    acc.push_str(&v.to_string());
68                    acc.push(',');
69                    acc
70                });
71                if !indices_str.is_empty() {
72                    indices_str.pop();
73                }
74                write!(f, "Indices({})", indices_str)
75            }
76        }
77    }
78}
79
80impl From<&[u32]> for ReadBatchParams {
81    fn from(value: &[u32]) -> Self {
82        Self::Indices(UInt32Array::from_iter_values(value.iter().copied()))
83    }
84}
85
86impl From<UInt32Array> for ReadBatchParams {
87    fn from(value: UInt32Array) -> Self {
88        Self::Indices(value)
89    }
90}
91
92impl From<RangeFull> for ReadBatchParams {
93    fn from(_: RangeFull) -> Self {
94        Self::RangeFull
95    }
96}
97
98impl From<Range<usize>> for ReadBatchParams {
99    fn from(r: Range<usize>) -> Self {
100        Self::Range(r)
101    }
102}
103
104impl From<RangeTo<usize>> for ReadBatchParams {
105    fn from(r: RangeTo<usize>) -> Self {
106        Self::RangeTo(r)
107    }
108}
109
110impl From<RangeFrom<usize>> for ReadBatchParams {
111    fn from(r: RangeFrom<usize>) -> Self {
112        Self::RangeFrom(r)
113    }
114}
115
116impl From<&Self> for ReadBatchParams {
117    fn from(params: &Self) -> Self {
118        params.clone()
119    }
120}
121
122impl ReadBatchParams {
123    /// Validate that the selection is valid given the length of the batch
124    pub fn valid_given_len(&self, len: usize) -> bool {
125        match self {
126            Self::Indices(indices) => indices.iter().all(|i| i.unwrap_or(0) < len as u32),
127            Self::Range(r) => r.start < len && r.end <= len,
128            Self::Ranges(ranges) => ranges.iter().all(|r| r.end <= len as u64),
129            Self::RangeFull => true,
130            Self::RangeTo(r) => r.end <= len,
131            Self::RangeFrom(r) => r.start < len,
132        }
133    }
134
135    /// Slice the selection
136    ///
137    /// For example, given ReadBatchParams::RangeFull and slice(10, 20), the output will be
138    /// ReadBatchParams::Range(10..20)
139    ///
140    /// Given ReadBatchParams::Range(10..20) and slice(5, 3), the output will be
141    /// ReadBatchParams::Range(15..18)
142    ///
143    /// Given ReadBatchParams::RangeTo(20) and slice(10, 5), the output will be
144    /// ReadBatchParams::Range(10..15)
145    ///
146    /// Given ReadBatchParams::RangeFrom(20) and slice(10, 5), the output will be
147    /// ReadBatchParams::Range(30..35)
148    ///
149    /// Given ReadBatchParams::Indices([1, 3, 5, 7, 9]) and slice(1, 3), the output will be
150    /// ReadBatchParams::Indices([3, 5, 7])
151    ///
152    /// You cannot slice beyond the bounds of the selection and an attempt to do so will
153    /// return an error.
154    pub fn slice(&self, start: usize, length: usize) -> Result<Self> {
155        let out_of_bounds = |size: usize| {
156            Err(Error::invalid_input_source(
157                format!(
158                    "Cannot slice from {} with length {} given a selection of size {}",
159                    start, length, size
160                )
161                .into(),
162            ))
163        };
164
165        match self {
166            Self::Indices(indices) => {
167                if start + length > indices.len() {
168                    return out_of_bounds(indices.len());
169                }
170                Ok(Self::Indices(indices.slice(start, length)))
171            }
172            Self::Range(r) => {
173                if (r.start + start + length) > r.end {
174                    return out_of_bounds(r.end - r.start);
175                }
176                Ok(Self::Range((r.start + start)..(r.start + start + length)))
177            }
178            Self::Ranges(ranges) => {
179                let mut new_ranges = Vec::with_capacity(ranges.len());
180                let mut to_skip = start as u64;
181                let mut to_take = length as u64;
182                let mut total_num_rows = 0;
183                for r in ranges.as_ref() {
184                    let num_rows = r.end - r.start;
185                    total_num_rows += num_rows;
186                    if to_skip > num_rows {
187                        to_skip -= num_rows;
188                        continue;
189                    }
190                    let new_start = r.start + to_skip;
191                    let to_take_this_range = (num_rows - to_skip).min(to_take);
192                    new_ranges.push(new_start..(new_start + to_take_this_range));
193                    to_skip = 0;
194                    to_take -= to_take_this_range;
195                    if to_take == 0 {
196                        break;
197                    }
198                }
199                if to_take > 0 {
200                    out_of_bounds(total_num_rows as usize)
201                } else {
202                    Ok(Self::Ranges(new_ranges.into()))
203                }
204            }
205            Self::RangeFull => Ok(Self::Range(start..(start + length))),
206            Self::RangeTo(range) => {
207                if start + length > range.end {
208                    return out_of_bounds(range.end);
209                }
210                Ok(Self::Range(start..(start + length)))
211            }
212            Self::RangeFrom(r) => {
213                // No way to validate out_of_bounds, assume caller will do so
214                Ok(Self::Range((r.start + start)..(r.start + start + length)))
215            }
216        }
217    }
218
219    /// Convert a read range into a vector of row offsets
220    ///
221    /// RangeFull and RangeFrom are unbounded and cannot be converted into row offsets
222    /// and any attempt to do so will return an error.  Call slice first
223    pub fn to_offsets(&self) -> Result<PrimitiveArray<UInt32Type>> {
224        match self {
225            Self::Indices(indices) => Ok(indices.clone()),
226            Self::Range(r) => Ok(UInt32Array::from(Vec::from_iter(
227                r.start as u32..r.end as u32,
228            ))),
229            Self::Ranges(ranges) => {
230                let num_rows = ranges
231                    .iter()
232                    .map(|r| (r.end - r.start) as usize)
233                    .sum::<usize>();
234                let mut offsets = Vec::with_capacity(num_rows);
235                for r in ranges.as_ref() {
236                    offsets.extend(r.start as u32..r.end as u32);
237                }
238                Ok(UInt32Array::from(offsets))
239            }
240            Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
241            Self::RangeTo(r) => Ok(UInt32Array::from(Vec::from_iter(0..r.end as u32))),
242            Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
243        }
244    }
245
246    pub fn iter_offset_ranges<'a>(
247        &'a self,
248    ) -> Result<Box<dyn Iterator<Item = Range<u32>> + Send + 'a>> {
249        match self {
250            Self::Indices(indices) => Ok(Box::new(indices.values().iter().map(|i| *i..(*i + 1)))),
251            Self::Range(r) => Ok(Box::new(std::iter::once(r.start as u32..r.end as u32))),
252            Self::Ranges(ranges) => Ok(Box::new(
253                ranges.iter().map(|r| r.start as u32..r.end as u32),
254            )),
255            Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
256            Self::RangeTo(r) => Ok(Box::new(std::iter::once(0..r.end as u32))),
257            Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
258        }
259    }
260
261    /// Convert a read range into a vector of row ranges
262    pub fn to_ranges(&self) -> Result<Vec<Range<u64>>> {
263        match self {
264            Self::Indices(indices) => Ok(indices
265                .values()
266                .iter()
267                .map(|i| *i as u64..(*i + 1) as u64)
268                .collect()),
269            Self::Range(r) => Ok(vec![r.start as u64..r.end as u64]),
270            Self::Ranges(ranges) => Ok(ranges.to_vec()),
271            Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
272            Self::RangeTo(r) => Ok(vec![0..r.end as u64]),
273            Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
274        }
275    }
276
277    /// Same thing as to_offsets but the caller knows the total number of rows in the file
278    ///
279    /// This makes it possible to materialize RangeFull / RangeFrom
280    pub fn to_offsets_total(&self, total: u32) -> PrimitiveArray<UInt32Type> {
281        match self {
282            Self::Indices(indices) => indices.clone(),
283            Self::Range(r) => UInt32Array::from_iter_values(r.start as u32..r.end as u32),
284            Self::Ranges(ranges) => {
285                let num_rows = ranges
286                    .iter()
287                    .map(|r| (r.end - r.start) as usize)
288                    .sum::<usize>();
289                let mut offsets = Vec::with_capacity(num_rows);
290                for r in ranges.as_ref() {
291                    offsets.extend(r.start as u32..r.end as u32);
292                }
293                UInt32Array::from(offsets)
294            }
295            Self::RangeFull => UInt32Array::from_iter_values(0_u32..total),
296            Self::RangeTo(r) => UInt32Array::from_iter_values(0..r.end as u32),
297            Self::RangeFrom(r) => UInt32Array::from_iter_values(r.start as u32..total),
298        }
299    }
300}
301
#[cfg(test)]
mod test {
    use std::ops::{RangeFrom, RangeTo};

    use arrow_array::UInt32Array;

    use crate::ReadBatchParams;

    /// Slicing a multi-range selection carries over from one range into the next.
    #[test]
    fn test_params_slice() {
        let selection = ReadBatchParams::Ranges(vec![0..15, 20..40].into());
        let expected = ReadBatchParams::Ranges(vec![10..15, 20..25].into());
        assert_eq!(selection.slice(10, 10).unwrap(), expected);
    }

    /// Slice-then-materialize for every variant, plus the failure cases.
    #[test]
    fn test_params_to_offsets() {
        let odd_indices = || ReadBatchParams::Indices(UInt32Array::from(vec![1, 3, 5, 7, 9]));
        let single_index = || ReadBatchParams::Indices(UInt32Array::from(vec![1]));

        // (selection, slice start, slice length, expected offsets)
        let success_cases: Vec<(ReadBatchParams, usize, usize, Vec<u32>)> = vec![
            (ReadBatchParams::RangeFull, 0, 100, (0..100).collect()),
            (ReadBatchParams::RangeFull, 50, 100, (50..150).collect()),
            (
                ReadBatchParams::RangeFrom(RangeFrom { start: 500 }),
                0,
                100,
                (500..600).collect(),
            ),
            (
                ReadBatchParams::RangeFrom(RangeFrom { start: 500 }),
                100,
                100,
                (600..700).collect(),
            ),
            (
                ReadBatchParams::RangeTo(RangeTo { end: 800 }),
                0,
                100,
                (0..100).collect(),
            ),
            (
                ReadBatchParams::RangeTo(RangeTo { end: 800 }),
                200,
                100,
                (200..300).collect(),
            ),
            (odd_indices(), 0, 2, vec![1, 3]),
            (odd_indices(), 2, 2, vec![5, 7]),
        ];
        for (selection, base_offset, length, expected) in success_cases {
            let offsets = selection
                .slice(base_offset, length)
                .unwrap()
                .to_offsets()
                .unwrap();
            assert_eq!(offsets, UInt32Array::from(expected));
        }

        // (selection, slice start, slice length) combinations that run out of bounds
        let failing_slices: Vec<(ReadBatchParams, usize, usize)> = vec![
            (single_index(), 0, 2),
            (single_index(), 1, 1),
            (ReadBatchParams::Range(0..10), 5, 6),
            (ReadBatchParams::RangeTo(RangeTo { end: 10 }), 5, 6),
        ];
        for (selection, base_offset, length) in failing_slices {
            assert!(selection.slice(base_offset, length).is_err());
        }

        // Unbounded selections cannot be materialized without slicing first
        let unbounded = [
            ReadBatchParams::RangeFull,
            ReadBatchParams::RangeFrom(RangeFrom { start: 10 }),
        ];
        for selection in unbounded {
            assert!(selection.to_offsets().is_err());
        }
    }
}