Skip to main content

lance_io/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3#![recursion_limit = "512"]
4use std::{
5    ops::{Range, RangeFrom, RangeFull, RangeTo},
6    sync::Arc,
7};
8
9use arrow::datatypes::UInt32Type;
10use arrow_array::{PrimitiveArray, UInt32Array};
11
12use lance_core::{Error, Result};
13
14pub mod encodings;
15pub mod ffi;
16pub mod local;
17pub mod object_reader;
18pub mod object_store;
19pub mod object_writer;
20pub mod scheduler;
21pub mod stream;
22#[cfg(test)]
23pub mod testing;
24pub mod traits;
25#[cfg(target_os = "linux")]
26pub mod uring;
27pub mod utils;
28
29pub use scheduler::{bytes_read_counter, iops_counter};
30
31/// Defines a selection of rows to read from a file/batch
32#[derive(Debug, Clone, PartialEq, Default)]
33pub enum ReadBatchParams {
34    /// Select a contiguous range of rows
35    Range(Range<usize>),
36    /// Select multiple contiguous ranges of rows
37    Ranges(Arc<[Range<u64>]>),
38    /// Select all rows (this is the default)
39    #[default]
40    RangeFull,
41    /// Select all rows up to a given index
42    RangeTo(RangeTo<usize>),
43    /// Select all rows starting at a given index
44    RangeFrom(RangeFrom<usize>),
45    /// Select scattered non-contiguous rows
46    Indices(UInt32Array),
47}
48
49impl std::fmt::Display for ReadBatchParams {
50    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
51        match self {
52            Self::Range(r) => write!(f, "Range({}..{})", r.start, r.end),
53            Self::Ranges(ranges) => {
54                let mut ranges_str = ranges.iter().fold(String::new(), |mut acc, r| {
55                    acc.push_str(&format!("{}..{}", r.start, r.end));
56                    acc.push(',');
57                    acc
58                });
59                // Remove the trailing comma
60                if !ranges_str.is_empty() {
61                    ranges_str.pop();
62                }
63                write!(f, "Ranges({})", ranges_str)
64            }
65            Self::RangeFull => write!(f, "RangeFull"),
66            Self::RangeTo(r) => write!(f, "RangeTo({})", r.end),
67            Self::RangeFrom(r) => write!(f, "RangeFrom({})", r.start),
68            Self::Indices(indices) => {
69                let mut indices_str = indices.values().iter().fold(String::new(), |mut acc, v| {
70                    acc.push_str(&v.to_string());
71                    acc.push(',');
72                    acc
73                });
74                if !indices_str.is_empty() {
75                    indices_str.pop();
76                }
77                write!(f, "Indices({})", indices_str)
78            }
79        }
80    }
81}
82
83impl From<&[u32]> for ReadBatchParams {
84    fn from(value: &[u32]) -> Self {
85        Self::Indices(UInt32Array::from_iter_values(value.iter().copied()))
86    }
87}
88
89impl From<UInt32Array> for ReadBatchParams {
90    fn from(value: UInt32Array) -> Self {
91        Self::Indices(value)
92    }
93}
94
95impl From<RangeFull> for ReadBatchParams {
96    fn from(_: RangeFull) -> Self {
97        Self::RangeFull
98    }
99}
100
101impl From<Range<usize>> for ReadBatchParams {
102    fn from(r: Range<usize>) -> Self {
103        Self::Range(r)
104    }
105}
106
107impl From<RangeTo<usize>> for ReadBatchParams {
108    fn from(r: RangeTo<usize>) -> Self {
109        Self::RangeTo(r)
110    }
111}
112
113impl From<RangeFrom<usize>> for ReadBatchParams {
114    fn from(r: RangeFrom<usize>) -> Self {
115        Self::RangeFrom(r)
116    }
117}
118
119impl From<&Self> for ReadBatchParams {
120    fn from(params: &Self) -> Self {
121        params.clone()
122    }
123}
124
125impl ReadBatchParams {
126    /// Validate that the selection is valid given the length of the batch
127    pub fn valid_given_len(&self, len: usize) -> bool {
128        match self {
129            Self::Indices(indices) => indices.iter().all(|i| i.unwrap_or(0) < len as u32),
130            Self::Range(r) => r.start < len && r.end <= len,
131            Self::Ranges(ranges) => ranges.iter().all(|r| r.end <= len as u64),
132            Self::RangeFull => true,
133            Self::RangeTo(r) => r.end <= len,
134            Self::RangeFrom(r) => r.start < len,
135        }
136    }
137
138    /// Slice the selection
139    ///
140    /// For example, given ReadBatchParams::RangeFull and slice(10, 20), the output will be
141    /// ReadBatchParams::Range(10..20)
142    ///
143    /// Given ReadBatchParams::Range(10..20) and slice(5, 3), the output will be
144    /// ReadBatchParams::Range(15..18)
145    ///
146    /// Given ReadBatchParams::RangeTo(20) and slice(10, 5), the output will be
147    /// ReadBatchParams::Range(10..15)
148    ///
149    /// Given ReadBatchParams::RangeFrom(20) and slice(10, 5), the output will be
150    /// ReadBatchParams::Range(30..35)
151    ///
152    /// Given ReadBatchParams::Indices([1, 3, 5, 7, 9]) and slice(1, 3), the output will be
153    /// ReadBatchParams::Indices([3, 5, 7])
154    ///
155    /// You cannot slice beyond the bounds of the selection and an attempt to do so will
156    /// return an error.
157    pub fn slice(&self, start: usize, length: usize) -> Result<Self> {
158        let out_of_bounds = |size: usize| {
159            Err(Error::invalid_input_source(
160                format!(
161                    "Cannot slice from {} with length {} given a selection of size {}",
162                    start, length, size
163                )
164                .into(),
165            ))
166        };
167
168        match self {
169            Self::Indices(indices) => {
170                if start + length > indices.len() {
171                    return out_of_bounds(indices.len());
172                }
173                Ok(Self::Indices(indices.slice(start, length)))
174            }
175            Self::Range(r) => {
176                if (r.start + start + length) > r.end {
177                    return out_of_bounds(r.end - r.start);
178                }
179                Ok(Self::Range((r.start + start)..(r.start + start + length)))
180            }
181            Self::Ranges(ranges) => {
182                let mut new_ranges = Vec::with_capacity(ranges.len());
183                let mut to_skip = start as u64;
184                let mut to_take = length as u64;
185                let mut total_num_rows = 0;
186                for r in ranges.as_ref() {
187                    let num_rows = r.end - r.start;
188                    total_num_rows += num_rows;
189                    if to_skip > num_rows {
190                        to_skip -= num_rows;
191                        continue;
192                    }
193                    let new_start = r.start + to_skip;
194                    let to_take_this_range = (num_rows - to_skip).min(to_take);
195                    new_ranges.push(new_start..(new_start + to_take_this_range));
196                    to_skip = 0;
197                    to_take -= to_take_this_range;
198                    if to_take == 0 {
199                        break;
200                    }
201                }
202                if to_take > 0 {
203                    out_of_bounds(total_num_rows as usize)
204                } else {
205                    Ok(Self::Ranges(new_ranges.into()))
206                }
207            }
208            Self::RangeFull => Ok(Self::Range(start..(start + length))),
209            Self::RangeTo(range) => {
210                if start + length > range.end {
211                    return out_of_bounds(range.end);
212                }
213                Ok(Self::Range(start..(start + length)))
214            }
215            Self::RangeFrom(r) => {
216                // No way to validate out_of_bounds, assume caller will do so
217                Ok(Self::Range((r.start + start)..(r.start + start + length)))
218            }
219        }
220    }
221
222    /// Convert a read range into a vector of row offsets
223    ///
224    /// RangeFull and RangeFrom are unbounded and cannot be converted into row offsets
225    /// and any attempt to do so will return an error.  Call slice first
226    pub fn to_offsets(&self) -> Result<PrimitiveArray<UInt32Type>> {
227        match self {
228            Self::Indices(indices) => Ok(indices.clone()),
229            Self::Range(r) => Ok(UInt32Array::from(Vec::from_iter(
230                r.start as u32..r.end as u32,
231            ))),
232            Self::Ranges(ranges) => {
233                let num_rows = ranges
234                    .iter()
235                    .map(|r| (r.end - r.start) as usize)
236                    .sum::<usize>();
237                let mut offsets = Vec::with_capacity(num_rows);
238                for r in ranges.as_ref() {
239                    offsets.extend(r.start as u32..r.end as u32);
240                }
241                Ok(UInt32Array::from(offsets))
242            }
243            Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
244            Self::RangeTo(r) => Ok(UInt32Array::from(Vec::from_iter(0..r.end as u32))),
245            Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
246        }
247    }
248
249    pub fn iter_offset_ranges<'a>(
250        &'a self,
251    ) -> Result<Box<dyn Iterator<Item = Range<u32>> + Send + 'a>> {
252        match self {
253            Self::Indices(indices) => Ok(Box::new(indices.values().iter().map(|i| *i..(*i + 1)))),
254            Self::Range(r) => Ok(Box::new(std::iter::once(r.start as u32..r.end as u32))),
255            Self::Ranges(ranges) => Ok(Box::new(
256                ranges.iter().map(|r| r.start as u32..r.end as u32),
257            )),
258            Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
259            Self::RangeTo(r) => Ok(Box::new(std::iter::once(0..r.end as u32))),
260            Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
261        }
262    }
263
264    /// Convert a read range into a vector of row ranges
265    pub fn to_ranges(&self) -> Result<Vec<Range<u64>>> {
266        match self {
267            Self::Indices(indices) => Ok(indices
268                .values()
269                .iter()
270                .map(|i| *i as u64..(*i + 1) as u64)
271                .collect()),
272            Self::Range(r) => Ok(vec![r.start as u64..r.end as u64]),
273            Self::Ranges(ranges) => Ok(ranges.to_vec()),
274            Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
275            Self::RangeTo(r) => Ok(vec![0..r.end as u64]),
276            Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
277        }
278    }
279
280    /// Same thing as to_offsets but the caller knows the total number of rows in the file
281    ///
282    /// This makes it possible to materialize RangeFull / RangeFrom
283    pub fn to_offsets_total(&self, total: u32) -> PrimitiveArray<UInt32Type> {
284        match self {
285            Self::Indices(indices) => indices.clone(),
286            Self::Range(r) => UInt32Array::from_iter_values(r.start as u32..r.end as u32),
287            Self::Ranges(ranges) => {
288                let num_rows = ranges
289                    .iter()
290                    .map(|r| (r.end - r.start) as usize)
291                    .sum::<usize>();
292                let mut offsets = Vec::with_capacity(num_rows);
293                for r in ranges.as_ref() {
294                    offsets.extend(r.start as u32..r.end as u32);
295                }
296                UInt32Array::from(offsets)
297            }
298            Self::RangeFull => UInt32Array::from_iter_values(0_u32..total),
299            Self::RangeTo(r) => UInt32Array::from_iter_values(0..r.end as u32),
300            Self::RangeFrom(r) => UInt32Array::from_iter_values(r.start as u32..total),
301        }
302    }
303}
304
#[cfg(test)]
mod test {
    use std::ops::{RangeFrom, RangeTo};

    use arrow_array::UInt32Array;

    use crate::ReadBatchParams;

    #[test]
    fn test_params_slice() {
        let selection = ReadBatchParams::Ranges(vec![0..15, 20..40].into());
        let expected = ReadBatchParams::Ranges(vec![10..15, 20..25].into());
        assert_eq!(selection.slice(10, 10).unwrap(), expected);
    }

    #[test]
    fn test_params_to_offsets() {
        // (selection, slice start, slice length, offsets expected after slicing)
        let cases: Vec<(ReadBatchParams, usize, usize, Vec<u32>)> = vec![
            (ReadBatchParams::RangeFull, 0, 100, (0..100).collect()),
            (ReadBatchParams::RangeFull, 50, 100, (50..150).collect()),
            (
                ReadBatchParams::RangeFrom(RangeFrom { start: 500 }),
                0,
                100,
                (500..600).collect(),
            ),
            (
                ReadBatchParams::RangeFrom(RangeFrom { start: 500 }),
                100,
                100,
                (600..700).collect(),
            ),
            (
                ReadBatchParams::RangeTo(RangeTo { end: 800 }),
                0,
                100,
                (0..100).collect(),
            ),
            (
                ReadBatchParams::RangeTo(RangeTo { end: 800 }),
                200,
                100,
                (200..300).collect(),
            ),
            (
                ReadBatchParams::Indices(UInt32Array::from(vec![1, 3, 5, 7, 9])),
                0,
                2,
                vec![1, 3],
            ),
            (
                ReadBatchParams::Indices(UInt32Array::from(vec![1, 3, 5, 7, 9])),
                2,
                2,
                vec![5, 7],
            ),
        ];
        for (case, (selection, start, length, expected)) in cases.into_iter().enumerate() {
            let actual = selection.slice(start, length).unwrap().to_offsets().unwrap();
            assert_eq!(actual, UInt32Array::from(expected), "case {case}");
        }

        // Slices that run past the end of a bounded selection must be rejected.
        let out_of_bounds = [
            (ReadBatchParams::Indices(UInt32Array::from(vec![1])), 0, 2),
            (ReadBatchParams::Indices(UInt32Array::from(vec![1])), 1, 1),
            (ReadBatchParams::Range(0..10), 5, 6),
            (ReadBatchParams::RangeTo(RangeTo { end: 10 }), 5, 6),
        ];
        for (selection, start, length) in out_of_bounds {
            assert!(selection.slice(start, length).is_err());
        }

        // Unbounded selections cannot be materialized without slicing first.
        let unbounded = [
            ReadBatchParams::RangeFull,
            ReadBatchParams::RangeFrom(RangeFrom { start: 10 }),
        ];
        for selection in unbounded {
            assert!(selection.to_offsets().is_err());
        }
    }
}
383                .to_offsets()
384                .is_err()
385        );
386    }
387}