1use std::{
4 ops::{Range, RangeFrom, RangeFull, RangeTo},
5 sync::Arc,
6};
7
8use arrow::datatypes::UInt32Type;
9use arrow_array::{PrimitiveArray, UInt32Array};
10
11use lance_core::{Error, Result};
12
13pub mod encodings;
14pub mod ffi;
15pub mod local;
16pub mod object_reader;
17pub mod object_store;
18pub mod object_writer;
19pub mod scheduler;
20pub mod stream;
21#[cfg(test)]
22pub mod testing;
23pub mod traits;
24pub mod utils;
25
26pub use scheduler::{bytes_read_counter, iops_counter};
27
/// Describes which rows to read, as offsets into some larger sequence of rows.
///
/// NOTE(review): offset widths differ between variants (`usize` for the single
/// range variants, `u64` for `Ranges`, `u32` for `Indices`); conversions between
/// them use `as` casts elsewhere in this file.
#[derive(Debug, Clone, PartialEq, Default)]
pub enum ReadBatchParams {
    /// A single contiguous, half-open range of offsets `start..end`.
    Range(Range<usize>),
    /// Several half-open ranges of offsets, consumed in the order given.
    Ranges(Arc<[Range<u64>]>),
    /// Select every row; this is the `Default` variant.
    #[default]
    RangeFull,
    /// Offsets `0..end`.
    RangeTo(RangeTo<usize>),
    /// Offsets from `start` up to a total length that is not known to this value.
    RangeFrom(RangeFrom<usize>),
    /// An explicit list of offsets.
    Indices(UInt32Array),
}
45
46impl std::fmt::Display for ReadBatchParams {
47 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
48 match self {
49 Self::Range(r) => write!(f, "Range({}..{})", r.start, r.end),
50 Self::Ranges(ranges) => {
51 let mut ranges_str = ranges.iter().fold(String::new(), |mut acc, r| {
52 acc.push_str(&format!("{}..{}", r.start, r.end));
53 acc.push(',');
54 acc
55 });
56 if !ranges_str.is_empty() {
58 ranges_str.pop();
59 }
60 write!(f, "Ranges({})", ranges_str)
61 }
62 Self::RangeFull => write!(f, "RangeFull"),
63 Self::RangeTo(r) => write!(f, "RangeTo({})", r.end),
64 Self::RangeFrom(r) => write!(f, "RangeFrom({})", r.start),
65 Self::Indices(indices) => {
66 let mut indices_str = indices.values().iter().fold(String::new(), |mut acc, v| {
67 acc.push_str(&v.to_string());
68 acc.push(',');
69 acc
70 });
71 if !indices_str.is_empty() {
72 indices_str.pop();
73 }
74 write!(f, "Indices({})", indices_str)
75 }
76 }
77 }
78}
79
80impl From<&[u32]> for ReadBatchParams {
81 fn from(value: &[u32]) -> Self {
82 Self::Indices(UInt32Array::from_iter_values(value.iter().copied()))
83 }
84}
85
86impl From<UInt32Array> for ReadBatchParams {
87 fn from(value: UInt32Array) -> Self {
88 Self::Indices(value)
89 }
90}
91
92impl From<RangeFull> for ReadBatchParams {
93 fn from(_: RangeFull) -> Self {
94 Self::RangeFull
95 }
96}
97
98impl From<Range<usize>> for ReadBatchParams {
99 fn from(r: Range<usize>) -> Self {
100 Self::Range(r)
101 }
102}
103
104impl From<RangeTo<usize>> for ReadBatchParams {
105 fn from(r: RangeTo<usize>) -> Self {
106 Self::RangeTo(r)
107 }
108}
109
110impl From<RangeFrom<usize>> for ReadBatchParams {
111 fn from(r: RangeFrom<usize>) -> Self {
112 Self::RangeFrom(r)
113 }
114}
115
116impl From<&Self> for ReadBatchParams {
117 fn from(params: &Self) -> Self {
118 params.clone()
119 }
120}
121
122impl ReadBatchParams {
123 pub fn valid_given_len(&self, len: usize) -> bool {
125 match self {
126 Self::Indices(indices) => indices.iter().all(|i| i.unwrap_or(0) < len as u32),
127 Self::Range(r) => r.start < len && r.end <= len,
128 Self::Ranges(ranges) => ranges.iter().all(|r| r.end <= len as u64),
129 Self::RangeFull => true,
130 Self::RangeTo(r) => r.end <= len,
131 Self::RangeFrom(r) => r.start < len,
132 }
133 }
134
135 pub fn slice(&self, start: usize, length: usize) -> Result<Self> {
155 let out_of_bounds = |size: usize| {
156 Err(Error::invalid_input_source(
157 format!(
158 "Cannot slice from {} with length {} given a selection of size {}",
159 start, length, size
160 )
161 .into(),
162 ))
163 };
164
165 match self {
166 Self::Indices(indices) => {
167 if start + length > indices.len() {
168 return out_of_bounds(indices.len());
169 }
170 Ok(Self::Indices(indices.slice(start, length)))
171 }
172 Self::Range(r) => {
173 if (r.start + start + length) > r.end {
174 return out_of_bounds(r.end - r.start);
175 }
176 Ok(Self::Range((r.start + start)..(r.start + start + length)))
177 }
178 Self::Ranges(ranges) => {
179 let mut new_ranges = Vec::with_capacity(ranges.len());
180 let mut to_skip = start as u64;
181 let mut to_take = length as u64;
182 let mut total_num_rows = 0;
183 for r in ranges.as_ref() {
184 let num_rows = r.end - r.start;
185 total_num_rows += num_rows;
186 if to_skip > num_rows {
187 to_skip -= num_rows;
188 continue;
189 }
190 let new_start = r.start + to_skip;
191 let to_take_this_range = (num_rows - to_skip).min(to_take);
192 new_ranges.push(new_start..(new_start + to_take_this_range));
193 to_skip = 0;
194 to_take -= to_take_this_range;
195 if to_take == 0 {
196 break;
197 }
198 }
199 if to_take > 0 {
200 out_of_bounds(total_num_rows as usize)
201 } else {
202 Ok(Self::Ranges(new_ranges.into()))
203 }
204 }
205 Self::RangeFull => Ok(Self::Range(start..(start + length))),
206 Self::RangeTo(range) => {
207 if start + length > range.end {
208 return out_of_bounds(range.end);
209 }
210 Ok(Self::Range(start..(start + length)))
211 }
212 Self::RangeFrom(r) => {
213 Ok(Self::Range((r.start + start)..(r.start + start + length)))
215 }
216 }
217 }
218
219 pub fn to_offsets(&self) -> Result<PrimitiveArray<UInt32Type>> {
224 match self {
225 Self::Indices(indices) => Ok(indices.clone()),
226 Self::Range(r) => Ok(UInt32Array::from(Vec::from_iter(
227 r.start as u32..r.end as u32,
228 ))),
229 Self::Ranges(ranges) => {
230 let num_rows = ranges
231 .iter()
232 .map(|r| (r.end - r.start) as usize)
233 .sum::<usize>();
234 let mut offsets = Vec::with_capacity(num_rows);
235 for r in ranges.as_ref() {
236 offsets.extend(r.start as u32..r.end as u32);
237 }
238 Ok(UInt32Array::from(offsets))
239 }
240 Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
241 Self::RangeTo(r) => Ok(UInt32Array::from(Vec::from_iter(0..r.end as u32))),
242 Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
243 }
244 }
245
246 pub fn iter_offset_ranges<'a>(
247 &'a self,
248 ) -> Result<Box<dyn Iterator<Item = Range<u32>> + Send + 'a>> {
249 match self {
250 Self::Indices(indices) => Ok(Box::new(indices.values().iter().map(|i| *i..(*i + 1)))),
251 Self::Range(r) => Ok(Box::new(std::iter::once(r.start as u32..r.end as u32))),
252 Self::Ranges(ranges) => Ok(Box::new(
253 ranges.iter().map(|r| r.start as u32..r.end as u32),
254 )),
255 Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
256 Self::RangeTo(r) => Ok(Box::new(std::iter::once(0..r.end as u32))),
257 Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
258 }
259 }
260
261 pub fn to_ranges(&self) -> Result<Vec<Range<u64>>> {
263 match self {
264 Self::Indices(indices) => Ok(indices
265 .values()
266 .iter()
267 .map(|i| *i as u64..(*i + 1) as u64)
268 .collect()),
269 Self::Range(r) => Ok(vec![r.start as u64..r.end as u64]),
270 Self::Ranges(ranges) => Ok(ranges.to_vec()),
271 Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
272 Self::RangeTo(r) => Ok(vec![0..r.end as u64]),
273 Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
274 }
275 }
276
277 pub fn to_offsets_total(&self, total: u32) -> PrimitiveArray<UInt32Type> {
281 match self {
282 Self::Indices(indices) => indices.clone(),
283 Self::Range(r) => UInt32Array::from_iter_values(r.start as u32..r.end as u32),
284 Self::Ranges(ranges) => {
285 let num_rows = ranges
286 .iter()
287 .map(|r| (r.end - r.start) as usize)
288 .sum::<usize>();
289 let mut offsets = Vec::with_capacity(num_rows);
290 for r in ranges.as_ref() {
291 offsets.extend(r.start as u32..r.end as u32);
292 }
293 UInt32Array::from(offsets)
294 }
295 Self::RangeFull => UInt32Array::from_iter_values(0_u32..total),
296 Self::RangeTo(r) => UInt32Array::from_iter_values(0..r.end as u32),
297 Self::RangeFrom(r) => UInt32Array::from_iter_values(r.start as u32..total),
298 }
299 }
300}
301
#[cfg(test)]
mod test {
    use std::ops::{RangeFrom, RangeTo};

    use arrow_array::UInt32Array;

    use crate::ReadBatchParams;

    #[test]
    fn test_params_slice() {
        // Slicing across a range boundary splits the result into two ranges.
        let source = ReadBatchParams::Ranges(vec![0..15, 20..40].into());
        let expected = ReadBatchParams::Ranges(vec![10..15, 20..25].into());
        assert_eq!(source.slice(10, 10).unwrap(), expected);
    }

    #[test]
    fn test_params_to_offsets() {
        // (params, slice start, slice length, expected offsets after slicing)
        let ok_cases: Vec<(ReadBatchParams, usize, usize, Vec<u32>)> = vec![
            (ReadBatchParams::RangeFull, 0, 100, (0..100).collect()),
            (ReadBatchParams::RangeFull, 50, 100, (50..150).collect()),
            (
                ReadBatchParams::RangeFrom(RangeFrom { start: 500 }),
                0,
                100,
                (500..600).collect(),
            ),
            (
                ReadBatchParams::RangeFrom(RangeFrom { start: 500 }),
                100,
                100,
                (600..700).collect(),
            ),
            (
                ReadBatchParams::RangeTo(RangeTo { end: 800 }),
                0,
                100,
                (0..100).collect(),
            ),
            (
                ReadBatchParams::RangeTo(RangeTo { end: 800 }),
                200,
                100,
                (200..300).collect(),
            ),
            (
                ReadBatchParams::Indices(UInt32Array::from(vec![1, 3, 5, 7, 9])),
                0,
                2,
                vec![1, 3],
            ),
            (
                ReadBatchParams::Indices(UInt32Array::from(vec![1, 3, 5, 7, 9])),
                2,
                2,
                vec![5, 7],
            ),
        ];
        for (params, start, length, expected) in ok_cases {
            let offsets = params
                .slice(start, length)
                .unwrap()
                .to_offsets()
                .unwrap();
            assert_eq!(offsets, UInt32Array::from(expected));
        }

        // Slices that run past the end of a bounded selection must be rejected.
        let err_cases: Vec<(ReadBatchParams, usize, usize)> = vec![
            (ReadBatchParams::Indices(UInt32Array::from(vec![1])), 0, 2),
            (ReadBatchParams::Indices(UInt32Array::from(vec![1])), 1, 1),
            (ReadBatchParams::Range(0..10), 5, 6),
            (ReadBatchParams::RangeTo(RangeTo { end: 10 }), 5, 6),
        ];
        for (params, start, length) in err_cases {
            assert!(params.slice(start, length).is_err());
        }

        // Open-ended selections cannot be materialized without a total length.
        assert!(ReadBatchParams::RangeFull.to_offsets().is_err());
        assert!(ReadBatchParams::RangeFrom(RangeFrom { start: 10 })
            .to_offsets()
            .is_err());
    }
}