Skip to main content

lance_io/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3use std::{
4    ops::{Range, RangeFrom, RangeFull, RangeTo},
5    sync::Arc,
6};
7
8use arrow::datatypes::UInt32Type;
9use arrow_array::{PrimitiveArray, UInt32Array};
10
11use lance_core::{Error, Result};
12
13pub mod encodings;
14pub mod ffi;
15pub mod local;
16pub mod object_reader;
17pub mod object_store;
18pub mod object_writer;
19pub mod scheduler;
20pub mod stream;
21#[cfg(test)]
22pub mod testing;
23pub mod traits;
24#[cfg(target_os = "linux")]
25pub mod uring;
26pub mod utils;
27
28pub use scheduler::{bytes_read_counter, iops_counter};
29
30/// Defines a selection of rows to read from a file/batch
31#[derive(Debug, Clone, PartialEq, Default)]
32pub enum ReadBatchParams {
33    /// Select a contiguous range of rows
34    Range(Range<usize>),
35    /// Select multiple contiguous ranges of rows
36    Ranges(Arc<[Range<u64>]>),
37    /// Select all rows (this is the default)
38    #[default]
39    RangeFull,
40    /// Select all rows up to a given index
41    RangeTo(RangeTo<usize>),
42    /// Select all rows starting at a given index
43    RangeFrom(RangeFrom<usize>),
44    /// Select scattered non-contiguous rows
45    Indices(UInt32Array),
46}
47
48impl std::fmt::Display for ReadBatchParams {
49    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
50        match self {
51            Self::Range(r) => write!(f, "Range({}..{})", r.start, r.end),
52            Self::Ranges(ranges) => {
53                let mut ranges_str = ranges.iter().fold(String::new(), |mut acc, r| {
54                    acc.push_str(&format!("{}..{}", r.start, r.end));
55                    acc.push(',');
56                    acc
57                });
58                // Remove the trailing comma
59                if !ranges_str.is_empty() {
60                    ranges_str.pop();
61                }
62                write!(f, "Ranges({})", ranges_str)
63            }
64            Self::RangeFull => write!(f, "RangeFull"),
65            Self::RangeTo(r) => write!(f, "RangeTo({})", r.end),
66            Self::RangeFrom(r) => write!(f, "RangeFrom({})", r.start),
67            Self::Indices(indices) => {
68                let mut indices_str = indices.values().iter().fold(String::new(), |mut acc, v| {
69                    acc.push_str(&v.to_string());
70                    acc.push(',');
71                    acc
72                });
73                if !indices_str.is_empty() {
74                    indices_str.pop();
75                }
76                write!(f, "Indices({})", indices_str)
77            }
78        }
79    }
80}
81
82impl From<&[u32]> for ReadBatchParams {
83    fn from(value: &[u32]) -> Self {
84        Self::Indices(UInt32Array::from_iter_values(value.iter().copied()))
85    }
86}
87
88impl From<UInt32Array> for ReadBatchParams {
89    fn from(value: UInt32Array) -> Self {
90        Self::Indices(value)
91    }
92}
93
94impl From<RangeFull> for ReadBatchParams {
95    fn from(_: RangeFull) -> Self {
96        Self::RangeFull
97    }
98}
99
100impl From<Range<usize>> for ReadBatchParams {
101    fn from(r: Range<usize>) -> Self {
102        Self::Range(r)
103    }
104}
105
106impl From<RangeTo<usize>> for ReadBatchParams {
107    fn from(r: RangeTo<usize>) -> Self {
108        Self::RangeTo(r)
109    }
110}
111
112impl From<RangeFrom<usize>> for ReadBatchParams {
113    fn from(r: RangeFrom<usize>) -> Self {
114        Self::RangeFrom(r)
115    }
116}
117
118impl From<&Self> for ReadBatchParams {
119    fn from(params: &Self) -> Self {
120        params.clone()
121    }
122}
123
124impl ReadBatchParams {
125    /// Validate that the selection is valid given the length of the batch
126    pub fn valid_given_len(&self, len: usize) -> bool {
127        match self {
128            Self::Indices(indices) => indices.iter().all(|i| i.unwrap_or(0) < len as u32),
129            Self::Range(r) => r.start < len && r.end <= len,
130            Self::Ranges(ranges) => ranges.iter().all(|r| r.end <= len as u64),
131            Self::RangeFull => true,
132            Self::RangeTo(r) => r.end <= len,
133            Self::RangeFrom(r) => r.start < len,
134        }
135    }
136
137    /// Slice the selection
138    ///
139    /// For example, given ReadBatchParams::RangeFull and slice(10, 20), the output will be
140    /// ReadBatchParams::Range(10..20)
141    ///
142    /// Given ReadBatchParams::Range(10..20) and slice(5, 3), the output will be
143    /// ReadBatchParams::Range(15..18)
144    ///
145    /// Given ReadBatchParams::RangeTo(20) and slice(10, 5), the output will be
146    /// ReadBatchParams::Range(10..15)
147    ///
148    /// Given ReadBatchParams::RangeFrom(20) and slice(10, 5), the output will be
149    /// ReadBatchParams::Range(30..35)
150    ///
151    /// Given ReadBatchParams::Indices([1, 3, 5, 7, 9]) and slice(1, 3), the output will be
152    /// ReadBatchParams::Indices([3, 5, 7])
153    ///
154    /// You cannot slice beyond the bounds of the selection and an attempt to do so will
155    /// return an error.
156    pub fn slice(&self, start: usize, length: usize) -> Result<Self> {
157        let out_of_bounds = |size: usize| {
158            Err(Error::invalid_input_source(
159                format!(
160                    "Cannot slice from {} with length {} given a selection of size {}",
161                    start, length, size
162                )
163                .into(),
164            ))
165        };
166
167        match self {
168            Self::Indices(indices) => {
169                if start + length > indices.len() {
170                    return out_of_bounds(indices.len());
171                }
172                Ok(Self::Indices(indices.slice(start, length)))
173            }
174            Self::Range(r) => {
175                if (r.start + start + length) > r.end {
176                    return out_of_bounds(r.end - r.start);
177                }
178                Ok(Self::Range((r.start + start)..(r.start + start + length)))
179            }
180            Self::Ranges(ranges) => {
181                let mut new_ranges = Vec::with_capacity(ranges.len());
182                let mut to_skip = start as u64;
183                let mut to_take = length as u64;
184                let mut total_num_rows = 0;
185                for r in ranges.as_ref() {
186                    let num_rows = r.end - r.start;
187                    total_num_rows += num_rows;
188                    if to_skip > num_rows {
189                        to_skip -= num_rows;
190                        continue;
191                    }
192                    let new_start = r.start + to_skip;
193                    let to_take_this_range = (num_rows - to_skip).min(to_take);
194                    new_ranges.push(new_start..(new_start + to_take_this_range));
195                    to_skip = 0;
196                    to_take -= to_take_this_range;
197                    if to_take == 0 {
198                        break;
199                    }
200                }
201                if to_take > 0 {
202                    out_of_bounds(total_num_rows as usize)
203                } else {
204                    Ok(Self::Ranges(new_ranges.into()))
205                }
206            }
207            Self::RangeFull => Ok(Self::Range(start..(start + length))),
208            Self::RangeTo(range) => {
209                if start + length > range.end {
210                    return out_of_bounds(range.end);
211                }
212                Ok(Self::Range(start..(start + length)))
213            }
214            Self::RangeFrom(r) => {
215                // No way to validate out_of_bounds, assume caller will do so
216                Ok(Self::Range((r.start + start)..(r.start + start + length)))
217            }
218        }
219    }
220
221    /// Convert a read range into a vector of row offsets
222    ///
223    /// RangeFull and RangeFrom are unbounded and cannot be converted into row offsets
224    /// and any attempt to do so will return an error.  Call slice first
225    pub fn to_offsets(&self) -> Result<PrimitiveArray<UInt32Type>> {
226        match self {
227            Self::Indices(indices) => Ok(indices.clone()),
228            Self::Range(r) => Ok(UInt32Array::from(Vec::from_iter(
229                r.start as u32..r.end as u32,
230            ))),
231            Self::Ranges(ranges) => {
232                let num_rows = ranges
233                    .iter()
234                    .map(|r| (r.end - r.start) as usize)
235                    .sum::<usize>();
236                let mut offsets = Vec::with_capacity(num_rows);
237                for r in ranges.as_ref() {
238                    offsets.extend(r.start as u32..r.end as u32);
239                }
240                Ok(UInt32Array::from(offsets))
241            }
242            Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
243            Self::RangeTo(r) => Ok(UInt32Array::from(Vec::from_iter(0..r.end as u32))),
244            Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
245        }
246    }
247
248    pub fn iter_offset_ranges<'a>(
249        &'a self,
250    ) -> Result<Box<dyn Iterator<Item = Range<u32>> + Send + 'a>> {
251        match self {
252            Self::Indices(indices) => Ok(Box::new(indices.values().iter().map(|i| *i..(*i + 1)))),
253            Self::Range(r) => Ok(Box::new(std::iter::once(r.start as u32..r.end as u32))),
254            Self::Ranges(ranges) => Ok(Box::new(
255                ranges.iter().map(|r| r.start as u32..r.end as u32),
256            )),
257            Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
258            Self::RangeTo(r) => Ok(Box::new(std::iter::once(0..r.end as u32))),
259            Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
260        }
261    }
262
263    /// Convert a read range into a vector of row ranges
264    pub fn to_ranges(&self) -> Result<Vec<Range<u64>>> {
265        match self {
266            Self::Indices(indices) => Ok(indices
267                .values()
268                .iter()
269                .map(|i| *i as u64..(*i + 1) as u64)
270                .collect()),
271            Self::Range(r) => Ok(vec![r.start as u64..r.end as u64]),
272            Self::Ranges(ranges) => Ok(ranges.to_vec()),
273            Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
274            Self::RangeTo(r) => Ok(vec![0..r.end as u64]),
275            Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
276        }
277    }
278
279    /// Same thing as to_offsets but the caller knows the total number of rows in the file
280    ///
281    /// This makes it possible to materialize RangeFull / RangeFrom
282    pub fn to_offsets_total(&self, total: u32) -> PrimitiveArray<UInt32Type> {
283        match self {
284            Self::Indices(indices) => indices.clone(),
285            Self::Range(r) => UInt32Array::from_iter_values(r.start as u32..r.end as u32),
286            Self::Ranges(ranges) => {
287                let num_rows = ranges
288                    .iter()
289                    .map(|r| (r.end - r.start) as usize)
290                    .sum::<usize>();
291                let mut offsets = Vec::with_capacity(num_rows);
292                for r in ranges.as_ref() {
293                    offsets.extend(r.start as u32..r.end as u32);
294                }
295                UInt32Array::from(offsets)
296            }
297            Self::RangeFull => UInt32Array::from_iter_values(0_u32..total),
298            Self::RangeTo(r) => UInt32Array::from_iter_values(0..r.end as u32),
299            Self::RangeFrom(r) => UInt32Array::from_iter_values(r.start as u32..total),
300        }
301    }
302}
303
#[cfg(test)]
mod test {
    use arrow_array::UInt32Array;

    use crate::ReadBatchParams;

    /// Slices `params` and checks that materializing the slice yields `expected`.
    fn assert_sliced_offsets(
        params: ReadBatchParams,
        base_offset: usize,
        length: usize,
        expected: Vec<u32>,
    ) {
        let sliced = params.slice(base_offset, length).unwrap();
        let offsets = sliced.to_offsets().unwrap();
        assert_eq!(offsets, UInt32Array::from(expected));
    }

    /// Checks that slicing `params` with the given window is rejected.
    fn assert_slice_fails(params: ReadBatchParams, base_offset: usize, length: usize) {
        assert!(params.slice(base_offset, length).is_err());
    }

    /// The five-element index selection shared by several cases below.
    fn odd_indices() -> ReadBatchParams {
        ReadBatchParams::Indices(UInt32Array::from(vec![1, 3, 5, 7, 9]))
    }

    #[test]
    fn test_params_slice() {
        // A slice that starts inside the first range and spills into the second.
        let sliced = ReadBatchParams::Ranges(vec![0..15, 20..40].into())
            .slice(10, 10)
            .unwrap();
        let expected = ReadBatchParams::Ranges(vec![10..15, 20..25].into());
        assert_eq!(sliced, expected);
    }

    #[test]
    fn test_params_to_offsets() {
        // Unbounded selection: the slice window is used directly.
        assert_sliced_offsets(ReadBatchParams::RangeFull, 0, 100, (0..100).collect());
        assert_sliced_offsets(ReadBatchParams::RangeFull, 50, 100, (50..150).collect());

        // Open-ended selection: the window is shifted by the selection start.
        assert_sliced_offsets(
            ReadBatchParams::RangeFrom(500..),
            0,
            100,
            (500..600).collect(),
        );
        assert_sliced_offsets(
            ReadBatchParams::RangeFrom(500..),
            100,
            100,
            (600..700).collect(),
        );

        // Selection bounded above: the window starts from row zero.
        assert_sliced_offsets(ReadBatchParams::RangeTo(..800), 0, 100, (0..100).collect());
        assert_sliced_offsets(
            ReadBatchParams::RangeTo(..800),
            200,
            100,
            (200..300).collect(),
        );

        // Explicit indices: the window picks positions within the index list.
        assert_sliced_offsets(odd_indices(), 0, 2, vec![1, 3]);
        assert_sliced_offsets(odd_indices(), 2, 2, vec![5, 7]);

        // Windows that reach past the end of a bounded selection are rejected.
        assert_slice_fails(ReadBatchParams::Indices(UInt32Array::from(vec![1])), 0, 2);
        assert_slice_fails(ReadBatchParams::Indices(UInt32Array::from(vec![1])), 1, 1);
        assert_slice_fails(ReadBatchParams::Range(0..10), 5, 6);
        assert_slice_fails(ReadBatchParams::RangeTo(..10), 5, 6);

        // Unbounded selections cannot be materialized without slicing first.
        assert!(ReadBatchParams::RangeFull.to_offsets().is_err());
        assert!(ReadBatchParams::RangeFrom(10..).to_offsets().is_err());
    }
}
386}