1use std::{
4 ops::{Range, RangeFrom, RangeFull, RangeTo},
5 sync::Arc,
6};
7
8use arrow::datatypes::UInt32Type;
9use arrow_array::{PrimitiveArray, UInt32Array};
10
11use lance_core::{Error, Result};
12
13pub mod encodings;
14pub mod ffi;
15pub mod local;
16pub mod object_reader;
17pub mod object_store;
18pub mod object_writer;
19pub mod scheduler;
20pub mod stream;
21#[cfg(test)]
22pub mod testing;
23pub mod traits;
24#[cfg(target_os = "linux")]
25pub mod uring;
26pub mod utils;
27
28pub use scheduler::{bytes_read_counter, iops_counter};
29
/// Describes which rows to read.
///
/// Each variant carries a different description of the selected row offsets:
/// a single range, several ranges, an open-ended range, or an explicit list
/// of offsets. The default value is [`ReadBatchParams::RangeFull`].
#[derive(Debug, Clone, PartialEq, Default)]
pub enum ReadBatchParams {
    /// A single half-open range of row offsets.
    Range(Range<usize>),
    /// Several half-open ranges of row offsets, stored in a shared slice.
    Ranges(Arc<[Range<u64>]>),
    /// Every row; unbounded, so it has no offsets until a total is supplied.
    #[default]
    RangeFull,
    /// Every row from offset 0 up to (but excluding) the given end.
    RangeTo(RangeTo<usize>),
    /// Every row from the given start onwards; unbounded at the end.
    RangeFrom(RangeFrom<usize>),
    /// An explicit list of row offsets.
    Indices(UInt32Array),
}
47
48impl std::fmt::Display for ReadBatchParams {
49 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
50 match self {
51 Self::Range(r) => write!(f, "Range({}..{})", r.start, r.end),
52 Self::Ranges(ranges) => {
53 let mut ranges_str = ranges.iter().fold(String::new(), |mut acc, r| {
54 acc.push_str(&format!("{}..{}", r.start, r.end));
55 acc.push(',');
56 acc
57 });
58 if !ranges_str.is_empty() {
60 ranges_str.pop();
61 }
62 write!(f, "Ranges({})", ranges_str)
63 }
64 Self::RangeFull => write!(f, "RangeFull"),
65 Self::RangeTo(r) => write!(f, "RangeTo({})", r.end),
66 Self::RangeFrom(r) => write!(f, "RangeFrom({})", r.start),
67 Self::Indices(indices) => {
68 let mut indices_str = indices.values().iter().fold(String::new(), |mut acc, v| {
69 acc.push_str(&v.to_string());
70 acc.push(',');
71 acc
72 });
73 if !indices_str.is_empty() {
74 indices_str.pop();
75 }
76 write!(f, "Indices({})", indices_str)
77 }
78 }
79 }
80}
81
82impl From<&[u32]> for ReadBatchParams {
83 fn from(value: &[u32]) -> Self {
84 Self::Indices(UInt32Array::from_iter_values(value.iter().copied()))
85 }
86}
87
88impl From<UInt32Array> for ReadBatchParams {
89 fn from(value: UInt32Array) -> Self {
90 Self::Indices(value)
91 }
92}
93
94impl From<RangeFull> for ReadBatchParams {
95 fn from(_: RangeFull) -> Self {
96 Self::RangeFull
97 }
98}
99
100impl From<Range<usize>> for ReadBatchParams {
101 fn from(r: Range<usize>) -> Self {
102 Self::Range(r)
103 }
104}
105
106impl From<RangeTo<usize>> for ReadBatchParams {
107 fn from(r: RangeTo<usize>) -> Self {
108 Self::RangeTo(r)
109 }
110}
111
112impl From<RangeFrom<usize>> for ReadBatchParams {
113 fn from(r: RangeFrom<usize>) -> Self {
114 Self::RangeFrom(r)
115 }
116}
117
118impl From<&Self> for ReadBatchParams {
119 fn from(params: &Self) -> Self {
120 params.clone()
121 }
122}
123
124impl ReadBatchParams {
125 pub fn valid_given_len(&self, len: usize) -> bool {
127 match self {
128 Self::Indices(indices) => indices.iter().all(|i| i.unwrap_or(0) < len as u32),
129 Self::Range(r) => r.start < len && r.end <= len,
130 Self::Ranges(ranges) => ranges.iter().all(|r| r.end <= len as u64),
131 Self::RangeFull => true,
132 Self::RangeTo(r) => r.end <= len,
133 Self::RangeFrom(r) => r.start < len,
134 }
135 }
136
137 pub fn slice(&self, start: usize, length: usize) -> Result<Self> {
157 let out_of_bounds = |size: usize| {
158 Err(Error::invalid_input_source(
159 format!(
160 "Cannot slice from {} with length {} given a selection of size {}",
161 start, length, size
162 )
163 .into(),
164 ))
165 };
166
167 match self {
168 Self::Indices(indices) => {
169 if start + length > indices.len() {
170 return out_of_bounds(indices.len());
171 }
172 Ok(Self::Indices(indices.slice(start, length)))
173 }
174 Self::Range(r) => {
175 if (r.start + start + length) > r.end {
176 return out_of_bounds(r.end - r.start);
177 }
178 Ok(Self::Range((r.start + start)..(r.start + start + length)))
179 }
180 Self::Ranges(ranges) => {
181 let mut new_ranges = Vec::with_capacity(ranges.len());
182 let mut to_skip = start as u64;
183 let mut to_take = length as u64;
184 let mut total_num_rows = 0;
185 for r in ranges.as_ref() {
186 let num_rows = r.end - r.start;
187 total_num_rows += num_rows;
188 if to_skip > num_rows {
189 to_skip -= num_rows;
190 continue;
191 }
192 let new_start = r.start + to_skip;
193 let to_take_this_range = (num_rows - to_skip).min(to_take);
194 new_ranges.push(new_start..(new_start + to_take_this_range));
195 to_skip = 0;
196 to_take -= to_take_this_range;
197 if to_take == 0 {
198 break;
199 }
200 }
201 if to_take > 0 {
202 out_of_bounds(total_num_rows as usize)
203 } else {
204 Ok(Self::Ranges(new_ranges.into()))
205 }
206 }
207 Self::RangeFull => Ok(Self::Range(start..(start + length))),
208 Self::RangeTo(range) => {
209 if start + length > range.end {
210 return out_of_bounds(range.end);
211 }
212 Ok(Self::Range(start..(start + length)))
213 }
214 Self::RangeFrom(r) => {
215 Ok(Self::Range((r.start + start)..(r.start + start + length)))
217 }
218 }
219 }
220
221 pub fn to_offsets(&self) -> Result<PrimitiveArray<UInt32Type>> {
226 match self {
227 Self::Indices(indices) => Ok(indices.clone()),
228 Self::Range(r) => Ok(UInt32Array::from(Vec::from_iter(
229 r.start as u32..r.end as u32,
230 ))),
231 Self::Ranges(ranges) => {
232 let num_rows = ranges
233 .iter()
234 .map(|r| (r.end - r.start) as usize)
235 .sum::<usize>();
236 let mut offsets = Vec::with_capacity(num_rows);
237 for r in ranges.as_ref() {
238 offsets.extend(r.start as u32..r.end as u32);
239 }
240 Ok(UInt32Array::from(offsets))
241 }
242 Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
243 Self::RangeTo(r) => Ok(UInt32Array::from(Vec::from_iter(0..r.end as u32))),
244 Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
245 }
246 }
247
248 pub fn iter_offset_ranges<'a>(
249 &'a self,
250 ) -> Result<Box<dyn Iterator<Item = Range<u32>> + Send + 'a>> {
251 match self {
252 Self::Indices(indices) => Ok(Box::new(indices.values().iter().map(|i| *i..(*i + 1)))),
253 Self::Range(r) => Ok(Box::new(std::iter::once(r.start as u32..r.end as u32))),
254 Self::Ranges(ranges) => Ok(Box::new(
255 ranges.iter().map(|r| r.start as u32..r.end as u32),
256 )),
257 Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
258 Self::RangeTo(r) => Ok(Box::new(std::iter::once(0..r.end as u32))),
259 Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
260 }
261 }
262
263 pub fn to_ranges(&self) -> Result<Vec<Range<u64>>> {
265 match self {
266 Self::Indices(indices) => Ok(indices
267 .values()
268 .iter()
269 .map(|i| *i as u64..(*i + 1) as u64)
270 .collect()),
271 Self::Range(r) => Ok(vec![r.start as u64..r.end as u64]),
272 Self::Ranges(ranges) => Ok(ranges.to_vec()),
273 Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
274 Self::RangeTo(r) => Ok(vec![0..r.end as u64]),
275 Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
276 }
277 }
278
279 pub fn to_offsets_total(&self, total: u32) -> PrimitiveArray<UInt32Type> {
283 match self {
284 Self::Indices(indices) => indices.clone(),
285 Self::Range(r) => UInt32Array::from_iter_values(r.start as u32..r.end as u32),
286 Self::Ranges(ranges) => {
287 let num_rows = ranges
288 .iter()
289 .map(|r| (r.end - r.start) as usize)
290 .sum::<usize>();
291 let mut offsets = Vec::with_capacity(num_rows);
292 for r in ranges.as_ref() {
293 offsets.extend(r.start as u32..r.end as u32);
294 }
295 UInt32Array::from(offsets)
296 }
297 Self::RangeFull => UInt32Array::from_iter_values(0_u32..total),
298 Self::RangeTo(r) => UInt32Array::from_iter_values(0..r.end as u32),
299 Self::RangeFrom(r) => UInt32Array::from_iter_values(r.start as u32..total),
300 }
301 }
302}
303
#[cfg(test)]
mod test {
    use std::ops::{RangeFrom, RangeTo};

    use arrow_array::UInt32Array;

    use crate::ReadBatchParams;

    /// Slices `params` by `(base_offset, length)` and asserts that the
    /// materialized offsets equal `expected`.
    fn assert_sliced_offsets(
        params: ReadBatchParams,
        base_offset: usize,
        length: usize,
        expected: impl IntoIterator<Item = u32>,
    ) {
        let sliced = params.slice(base_offset, length).unwrap();
        let actual = sliced.to_offsets().unwrap();
        let expected = UInt32Array::from(expected.into_iter().collect::<Vec<u32>>());
        assert_eq!(actual, expected);
    }

    /// Builds the `Indices` selection shared by several cases below.
    fn odd_indices() -> ReadBatchParams {
        ReadBatchParams::Indices(UInt32Array::from(vec![1, 3, 5, 7, 9]))
    }

    #[test]
    fn test_params_slice() {
        // The slice starts inside the first range and spills into the second.
        let sliced = ReadBatchParams::Ranges(vec![0..15, 20..40].into())
            .slice(10, 10)
            .unwrap();
        let expected = ReadBatchParams::Ranges(vec![10..15, 20..25].into());
        assert_eq!(sliced, expected);
    }

    #[test]
    fn test_params_to_offsets() {
        // Unbounded selections become bounded once sliced.
        assert_sliced_offsets(ReadBatchParams::RangeFull, 0, 100, 0..100);
        assert_sliced_offsets(ReadBatchParams::RangeFull, 50, 100, 50..150);
        assert_sliced_offsets(
            ReadBatchParams::RangeFrom(RangeFrom { start: 500 }),
            0,
            100,
            500..600,
        );
        assert_sliced_offsets(
            ReadBatchParams::RangeFrom(RangeFrom { start: 500 }),
            100,
            100,
            600..700,
        );

        // Bounded selections.
        assert_sliced_offsets(
            ReadBatchParams::RangeTo(RangeTo { end: 800 }),
            0,
            100,
            0..100,
        );
        assert_sliced_offsets(
            ReadBatchParams::RangeTo(RangeTo { end: 800 }),
            200,
            100,
            200..300,
        );
        assert_sliced_offsets(odd_indices(), 0, 2, vec![1, 3]);
        assert_sliced_offsets(odd_indices(), 2, 2, vec![5, 7]);

        // Slices that run past the end of the selection are rejected.
        let out_of_bounds_cases = [
            (ReadBatchParams::Indices(UInt32Array::from(vec![1])), 0, 2),
            (ReadBatchParams::Indices(UInt32Array::from(vec![1])), 1, 1),
            (ReadBatchParams::Range(0..10), 5, 6),
            (ReadBatchParams::RangeTo(RangeTo { end: 10 }), 5, 6),
        ];
        for (params, base_offset, length) in out_of_bounds_cases {
            assert!(params.slice(base_offset, length).is_err());
        }

        // Unbounded selections cannot be materialized without slicing first.
        let full = ReadBatchParams::RangeFull;
        assert!(full.to_offsets().is_err());
        let from = ReadBatchParams::RangeFrom(RangeFrom { start: 10 });
        assert!(from.to_offsets().is_err());
    }
}