1#![recursion_limit = "512"]
4use std::{
5 ops::{Range, RangeFrom, RangeFull, RangeTo},
6 sync::Arc,
7};
8
9use arrow::datatypes::UInt32Type;
10use arrow_array::{PrimitiveArray, UInt32Array};
11
12use lance_core::{Error, Result};
13
14pub mod encodings;
15pub mod ffi;
16pub mod local;
17pub mod object_reader;
18pub mod object_store;
19pub mod object_writer;
20pub mod scheduler;
21pub mod stream;
22#[cfg(test)]
23pub mod testing;
24pub mod traits;
25#[cfg(target_os = "linux")]
26pub mod uring;
27pub mod utils;
28
29pub use scheduler::{bytes_read_counter, iops_counter};
30
31#[derive(Debug, Clone, PartialEq, Default)]
33pub enum ReadBatchParams {
34 Range(Range<usize>),
36 Ranges(Arc<[Range<u64>]>),
38 #[default]
40 RangeFull,
41 RangeTo(RangeTo<usize>),
43 RangeFrom(RangeFrom<usize>),
45 Indices(UInt32Array),
47}
48
49impl std::fmt::Display for ReadBatchParams {
50 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
51 match self {
52 Self::Range(r) => write!(f, "Range({}..{})", r.start, r.end),
53 Self::Ranges(ranges) => {
54 let mut ranges_str = ranges.iter().fold(String::new(), |mut acc, r| {
55 acc.push_str(&format!("{}..{}", r.start, r.end));
56 acc.push(',');
57 acc
58 });
59 if !ranges_str.is_empty() {
61 ranges_str.pop();
62 }
63 write!(f, "Ranges({})", ranges_str)
64 }
65 Self::RangeFull => write!(f, "RangeFull"),
66 Self::RangeTo(r) => write!(f, "RangeTo({})", r.end),
67 Self::RangeFrom(r) => write!(f, "RangeFrom({})", r.start),
68 Self::Indices(indices) => {
69 let mut indices_str = indices.values().iter().fold(String::new(), |mut acc, v| {
70 acc.push_str(&v.to_string());
71 acc.push(',');
72 acc
73 });
74 if !indices_str.is_empty() {
75 indices_str.pop();
76 }
77 write!(f, "Indices({})", indices_str)
78 }
79 }
80 }
81}
82
83impl From<&[u32]> for ReadBatchParams {
84 fn from(value: &[u32]) -> Self {
85 Self::Indices(UInt32Array::from_iter_values(value.iter().copied()))
86 }
87}
88
89impl From<UInt32Array> for ReadBatchParams {
90 fn from(value: UInt32Array) -> Self {
91 Self::Indices(value)
92 }
93}
94
95impl From<RangeFull> for ReadBatchParams {
96 fn from(_: RangeFull) -> Self {
97 Self::RangeFull
98 }
99}
100
101impl From<Range<usize>> for ReadBatchParams {
102 fn from(r: Range<usize>) -> Self {
103 Self::Range(r)
104 }
105}
106
107impl From<RangeTo<usize>> for ReadBatchParams {
108 fn from(r: RangeTo<usize>) -> Self {
109 Self::RangeTo(r)
110 }
111}
112
113impl From<RangeFrom<usize>> for ReadBatchParams {
114 fn from(r: RangeFrom<usize>) -> Self {
115 Self::RangeFrom(r)
116 }
117}
118
119impl From<&Self> for ReadBatchParams {
120 fn from(params: &Self) -> Self {
121 params.clone()
122 }
123}
124
125impl ReadBatchParams {
126 pub fn valid_given_len(&self, len: usize) -> bool {
128 match self {
129 Self::Indices(indices) => indices.iter().all(|i| i.unwrap_or(0) < len as u32),
130 Self::Range(r) => r.start < len && r.end <= len,
131 Self::Ranges(ranges) => ranges.iter().all(|r| r.end <= len as u64),
132 Self::RangeFull => true,
133 Self::RangeTo(r) => r.end <= len,
134 Self::RangeFrom(r) => r.start < len,
135 }
136 }
137
138 pub fn slice(&self, start: usize, length: usize) -> Result<Self> {
158 let out_of_bounds = |size: usize| {
159 Err(Error::invalid_input_source(
160 format!(
161 "Cannot slice from {} with length {} given a selection of size {}",
162 start, length, size
163 )
164 .into(),
165 ))
166 };
167
168 match self {
169 Self::Indices(indices) => {
170 if start + length > indices.len() {
171 return out_of_bounds(indices.len());
172 }
173 Ok(Self::Indices(indices.slice(start, length)))
174 }
175 Self::Range(r) => {
176 if (r.start + start + length) > r.end {
177 return out_of_bounds(r.end - r.start);
178 }
179 Ok(Self::Range((r.start + start)..(r.start + start + length)))
180 }
181 Self::Ranges(ranges) => {
182 let mut new_ranges = Vec::with_capacity(ranges.len());
183 let mut to_skip = start as u64;
184 let mut to_take = length as u64;
185 let mut total_num_rows = 0;
186 for r in ranges.as_ref() {
187 let num_rows = r.end - r.start;
188 total_num_rows += num_rows;
189 if to_skip > num_rows {
190 to_skip -= num_rows;
191 continue;
192 }
193 let new_start = r.start + to_skip;
194 let to_take_this_range = (num_rows - to_skip).min(to_take);
195 new_ranges.push(new_start..(new_start + to_take_this_range));
196 to_skip = 0;
197 to_take -= to_take_this_range;
198 if to_take == 0 {
199 break;
200 }
201 }
202 if to_take > 0 {
203 out_of_bounds(total_num_rows as usize)
204 } else {
205 Ok(Self::Ranges(new_ranges.into()))
206 }
207 }
208 Self::RangeFull => Ok(Self::Range(start..(start + length))),
209 Self::RangeTo(range) => {
210 if start + length > range.end {
211 return out_of_bounds(range.end);
212 }
213 Ok(Self::Range(start..(start + length)))
214 }
215 Self::RangeFrom(r) => {
216 Ok(Self::Range((r.start + start)..(r.start + start + length)))
218 }
219 }
220 }
221
222 pub fn to_offsets(&self) -> Result<PrimitiveArray<UInt32Type>> {
227 match self {
228 Self::Indices(indices) => Ok(indices.clone()),
229 Self::Range(r) => Ok(UInt32Array::from(Vec::from_iter(
230 r.start as u32..r.end as u32,
231 ))),
232 Self::Ranges(ranges) => {
233 let num_rows = ranges
234 .iter()
235 .map(|r| (r.end - r.start) as usize)
236 .sum::<usize>();
237 let mut offsets = Vec::with_capacity(num_rows);
238 for r in ranges.as_ref() {
239 offsets.extend(r.start as u32..r.end as u32);
240 }
241 Ok(UInt32Array::from(offsets))
242 }
243 Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
244 Self::RangeTo(r) => Ok(UInt32Array::from(Vec::from_iter(0..r.end as u32))),
245 Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
246 }
247 }
248
249 pub fn iter_offset_ranges<'a>(
250 &'a self,
251 ) -> Result<Box<dyn Iterator<Item = Range<u32>> + Send + 'a>> {
252 match self {
253 Self::Indices(indices) => Ok(Box::new(indices.values().iter().map(|i| *i..(*i + 1)))),
254 Self::Range(r) => Ok(Box::new(std::iter::once(r.start as u32..r.end as u32))),
255 Self::Ranges(ranges) => Ok(Box::new(
256 ranges.iter().map(|r| r.start as u32..r.end as u32),
257 )),
258 Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
259 Self::RangeTo(r) => Ok(Box::new(std::iter::once(0..r.end as u32))),
260 Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
261 }
262 }
263
264 pub fn to_ranges(&self) -> Result<Vec<Range<u64>>> {
266 match self {
267 Self::Indices(indices) => Ok(indices
268 .values()
269 .iter()
270 .map(|i| *i as u64..(*i + 1) as u64)
271 .collect()),
272 Self::Range(r) => Ok(vec![r.start as u64..r.end as u64]),
273 Self::Ranges(ranges) => Ok(ranges.to_vec()),
274 Self::RangeFull => Err(Error::invalid_input("cannot materialize RangeFull")),
275 Self::RangeTo(r) => Ok(vec![0..r.end as u64]),
276 Self::RangeFrom(_) => Err(Error::invalid_input("cannot materialize RangeFrom")),
277 }
278 }
279
280 pub fn to_offsets_total(&self, total: u32) -> PrimitiveArray<UInt32Type> {
284 match self {
285 Self::Indices(indices) => indices.clone(),
286 Self::Range(r) => UInt32Array::from_iter_values(r.start as u32..r.end as u32),
287 Self::Ranges(ranges) => {
288 let num_rows = ranges
289 .iter()
290 .map(|r| (r.end - r.start) as usize)
291 .sum::<usize>();
292 let mut offsets = Vec::with_capacity(num_rows);
293 for r in ranges.as_ref() {
294 offsets.extend(r.start as u32..r.end as u32);
295 }
296 UInt32Array::from(offsets)
297 }
298 Self::RangeFull => UInt32Array::from_iter_values(0_u32..total),
299 Self::RangeTo(r) => UInt32Array::from_iter_values(0..r.end as u32),
300 Self::RangeFrom(r) => UInt32Array::from_iter_values(r.start as u32..total),
301 }
302 }
303}
304
#[cfg(test)]
mod test {
    use std::ops::{RangeFrom, RangeTo};

    use arrow_array::UInt32Array;

    use crate::ReadBatchParams;

    /// Slices `params` by (`start`, `len`) and checks the materialized offsets.
    fn assert_sliced_offsets(params: ReadBatchParams, start: usize, len: usize, want: Vec<u32>) {
        let sliced = params.slice(start, len).unwrap();
        let got = sliced.to_offsets().unwrap();
        assert_eq!(got, UInt32Array::from(want));
    }

    /// Checks that slicing `params` by (`start`, `len`) is rejected.
    fn assert_slice_rejected(params: ReadBatchParams, start: usize, len: usize) {
        assert!(params.slice(start, len).is_err());
    }

    /// The five odd positions used by the `Indices` cases.
    fn odd_indices() -> ReadBatchParams {
        ReadBatchParams::Indices(UInt32Array::from(vec![1, 3, 5, 7, 9]))
    }

    /// A one-element `Indices` selection.
    fn single_index() -> ReadBatchParams {
        ReadBatchParams::Indices(UInt32Array::from(vec![1]))
    }

    #[test]
    fn test_params_slice() {
        // The slice window straddles the gap between the two ranges.
        let params = ReadBatchParams::Ranges(vec![0..15, 20..40].into());
        let expected = ReadBatchParams::Ranges(vec![10..15, 20..25].into());
        assert_eq!(params.slice(10, 10).unwrap(), expected);
    }

    #[test]
    fn test_params_to_offsets() {
        // Unbounded selections become concrete once sliced.
        assert_sliced_offsets(ReadBatchParams::RangeFull, 0, 100, (0..100).collect());
        assert_sliced_offsets(ReadBatchParams::RangeFull, 50, 100, (50..150).collect());

        let from_500 = || ReadBatchParams::RangeFrom(RangeFrom { start: 500 });
        assert_sliced_offsets(from_500(), 0, 100, (500..600).collect());
        assert_sliced_offsets(from_500(), 100, 100, (600..700).collect());

        let to_800 = || ReadBatchParams::RangeTo(RangeTo { end: 800 });
        assert_sliced_offsets(to_800(), 0, 100, (0..100).collect());
        assert_sliced_offsets(to_800(), 200, 100, (200..300).collect());

        assert_sliced_offsets(odd_indices(), 0, 2, vec![1, 3]);
        assert_sliced_offsets(odd_indices(), 2, 2, vec![5, 7]);

        // Windows that do not fit in the selection are errors.
        assert_slice_rejected(single_index(), 0, 2);
        assert_slice_rejected(single_index(), 1, 1);
        assert_slice_rejected(ReadBatchParams::Range(0..10), 5, 6);
        assert_slice_rejected(ReadBatchParams::RangeTo(RangeTo { end: 10 }), 5, 6);

        // Unbounded selections cannot be materialized directly.
        assert!(ReadBatchParams::RangeFull.to_offsets().is_err());
        let unbounded = ReadBatchParams::RangeFrom(RangeFrom { start: 10 });
        assert!(unbounded.to_offsets().is_err());
    }
}
387}