1use std::{
4 ops::{Range, RangeFrom, RangeFull, RangeTo},
5 sync::Arc,
6};
7
8use arrow::datatypes::UInt32Type;
9use arrow_array::{PrimitiveArray, UInt32Array};
10use snafu::location;
11
12use lance_core::{Error, Result};
13
14pub mod encodings;
15pub mod ffi;
16pub mod local;
17pub mod object_reader;
18pub mod object_store;
19pub mod object_writer;
20pub mod scheduler;
21pub mod stream;
22#[cfg(test)]
23pub mod testing;
24pub mod traits;
25pub mod utils;
26
27pub use scheduler::{bytes_read_counter, iops_counter};
28
/// Parameters describing which positions (rows) of a batch to read.
///
/// A selection is either a single range, a list of ranges, everything, an
/// open-ended range, or an explicit list of indices.
#[derive(Debug, Clone)]
pub enum ReadBatchParams {
    /// A single half-open range `start..end`.
    Range(Range<usize>),
    /// Several half-open ranges, held in a shared (`Arc`) slice.
    Ranges(Arc<[Range<u64>]>),
    /// The whole input, without bounds. This is the value returned by `Default`.
    RangeFull,
    /// Everything from position 0 up to (but excluding) `end`.
    RangeTo(RangeTo<usize>),
    /// Everything from `start` onwards, with no upper bound.
    RangeFrom(RangeFrom<usize>),
    /// An explicit list of `u32` indices.
    Indices(UInt32Array),
}
45
46impl std::fmt::Display for ReadBatchParams {
47 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
48 match self {
49 Self::Range(r) => write!(f, "Range({}..{})", r.start, r.end),
50 Self::Ranges(ranges) => {
51 let mut ranges_str = ranges.iter().fold(String::new(), |mut acc, r| {
52 acc.push_str(&format!("{}..{}", r.start, r.end));
53 acc.push(',');
54 acc
55 });
56 if !ranges_str.is_empty() {
58 ranges_str.pop();
59 }
60 write!(f, "Ranges({})", ranges_str)
61 }
62 Self::RangeFull => write!(f, "RangeFull"),
63 Self::RangeTo(r) => write!(f, "RangeTo({})", r.end),
64 Self::RangeFrom(r) => write!(f, "RangeFrom({})", r.start),
65 Self::Indices(indices) => {
66 let mut indices_str = indices.values().iter().fold(String::new(), |mut acc, v| {
67 acc.push_str(&v.to_string());
68 acc.push(',');
69 acc
70 });
71 if !indices_str.is_empty() {
72 indices_str.pop();
73 }
74 write!(f, "Indices({})", indices_str)
75 }
76 }
77 }
78}
79
80impl Default for ReadBatchParams {
81 fn default() -> Self {
82 Self::RangeFull
84 }
85}
86
87impl From<&[u32]> for ReadBatchParams {
88 fn from(value: &[u32]) -> Self {
89 Self::Indices(UInt32Array::from_iter_values(value.iter().copied()))
90 }
91}
92
93impl From<UInt32Array> for ReadBatchParams {
94 fn from(value: UInt32Array) -> Self {
95 Self::Indices(value)
96 }
97}
98
99impl From<RangeFull> for ReadBatchParams {
100 fn from(_: RangeFull) -> Self {
101 Self::RangeFull
102 }
103}
104
105impl From<Range<usize>> for ReadBatchParams {
106 fn from(r: Range<usize>) -> Self {
107 Self::Range(r)
108 }
109}
110
111impl From<RangeTo<usize>> for ReadBatchParams {
112 fn from(r: RangeTo<usize>) -> Self {
113 Self::RangeTo(r)
114 }
115}
116
117impl From<RangeFrom<usize>> for ReadBatchParams {
118 fn from(r: RangeFrom<usize>) -> Self {
119 Self::RangeFrom(r)
120 }
121}
122
123impl From<&Self> for ReadBatchParams {
124 fn from(params: &Self) -> Self {
125 params.clone()
126 }
127}
128
129impl ReadBatchParams {
130 pub fn valid_given_len(&self, len: usize) -> bool {
132 match self {
133 Self::Indices(indices) => indices.iter().all(|i| i.unwrap_or(0) < len as u32),
134 Self::Range(r) => r.start < len && r.end <= len,
135 Self::Ranges(ranges) => ranges.iter().all(|r| r.end <= len as u64),
136 Self::RangeFull => true,
137 Self::RangeTo(r) => r.end <= len,
138 Self::RangeFrom(r) => r.start < len,
139 }
140 }
141
142 pub fn slice(&self, start: usize, length: usize) -> Result<Self> {
162 let out_of_bounds = |size: usize| {
163 Err(Error::InvalidInput {
164 source: format!(
165 "Cannot slice from {} with length {} given a selection of size {}",
166 start, length, size
167 )
168 .into(),
169 location: location!(),
170 })
171 };
172
173 match self {
174 Self::Indices(indices) => {
175 if start + length > indices.len() {
176 return out_of_bounds(indices.len());
177 }
178 Ok(Self::Indices(indices.slice(start, length)))
179 }
180 Self::Range(r) => {
181 if (r.start + start + length) > r.end {
182 return out_of_bounds(r.end - r.start);
183 }
184 Ok(Self::Range((r.start + start)..(r.start + start + length)))
185 }
186 Self::Ranges(ranges) => {
187 let mut new_ranges = Vec::new();
188 let mut offset = 0;
189 let mut to_skip = start as u64;
190 for r in ranges.as_ref() {
191 if offset >= (start + length) as u64 {
192 break;
193 }
194 let num_rows = r.end - r.start;
195 if to_skip > num_rows {
196 to_skip -= num_rows;
197 continue;
198 }
199 let new_start = r.start + to_skip;
200 let new_length = num_rows.min(length as u64);
201 new_ranges.push(new_start..(new_start + new_length));
202 to_skip = 0;
203 offset += new_length;
204 }
205 Ok(Self::Ranges(new_ranges.into()))
206 }
207 Self::RangeFull => Ok(Self::Range(start..(start + length))),
208 Self::RangeTo(range) => {
209 if start + length > range.end {
210 return out_of_bounds(range.end);
211 }
212 Ok(Self::Range(start..(start + length)))
213 }
214 Self::RangeFrom(r) => {
215 Ok(Self::Range((r.start + start)..(r.start + start + length)))
217 }
218 }
219 }
220
221 pub fn to_offsets(&self) -> Result<PrimitiveArray<UInt32Type>> {
226 match self {
227 Self::Indices(indices) => Ok(indices.clone()),
228 Self::Range(r) => Ok(UInt32Array::from(Vec::from_iter(
229 r.start as u32..r.end as u32,
230 ))),
231 Self::Ranges(ranges) => {
232 let num_rows = ranges
233 .iter()
234 .map(|r| (r.end - r.start) as usize)
235 .sum::<usize>();
236 let mut offsets = Vec::with_capacity(num_rows);
237 for r in ranges.as_ref() {
238 offsets.extend(r.start as u32..r.end as u32);
239 }
240 Ok(UInt32Array::from(offsets))
241 }
242 Self::RangeFull => Err(Error::invalid_input(
243 "cannot materialize RangeFull",
244 location!(),
245 )),
246 Self::RangeTo(r) => Ok(UInt32Array::from(Vec::from_iter(0..r.end as u32))),
247 Self::RangeFrom(_) => Err(Error::invalid_input(
248 "cannot materialize RangeFrom",
249 location!(),
250 )),
251 }
252 }
253
254 pub fn to_offsets_total(&self, total: u32) -> PrimitiveArray<UInt32Type> {
258 match self {
259 Self::Indices(indices) => indices.clone(),
260 Self::Range(r) => UInt32Array::from_iter_values(r.start as u32..r.end as u32),
261 Self::Ranges(ranges) => {
262 let num_rows = ranges
263 .iter()
264 .map(|r| (r.end - r.start) as usize)
265 .sum::<usize>();
266 let mut offsets = Vec::with_capacity(num_rows);
267 for r in ranges.as_ref() {
268 offsets.extend(r.start as u32..r.end as u32);
269 }
270 UInt32Array::from(offsets)
271 }
272 Self::RangeFull => UInt32Array::from_iter_values(0_u32..total),
273 Self::RangeTo(r) => UInt32Array::from_iter_values(0..r.end as u32),
274 Self::RangeFrom(r) => UInt32Array::from_iter_values(r.start as u32..total),
275 }
276 }
277}
278
#[cfg(test)]
mod test {
    use std::ops::{RangeFrom, RangeTo};

    use arrow_array::UInt32Array;

    use crate::ReadBatchParams;

    /// Slices `params` with `(start, length)` and asserts that the
    /// materialized offsets equal `expected`.
    fn assert_sliced_offsets(
        params: ReadBatchParams,
        start: usize,
        length: usize,
        expected: Vec<u32>,
    ) {
        let sliced = params.slice(start, length).unwrap();
        let offsets = sliced.to_offsets().unwrap();
        assert_eq!(offsets, UInt32Array::from(expected));
    }

    /// Asserts that slicing `params` with `(start, length)` is rejected.
    fn assert_slice_fails(params: ReadBatchParams, start: usize, length: usize) {
        assert!(params.slice(start, length).is_err());
    }

    #[test]
    fn test_params_to_offsets() {
        let odd_indices = || ReadBatchParams::Indices(UInt32Array::from(vec![1, 3, 5, 7, 9]));
        let single_index = || ReadBatchParams::Indices(UInt32Array::from(vec![1]));

        // (selection, slice start, slice length, expected offsets)
        let ok_cases: Vec<(ReadBatchParams, usize, usize, Vec<u32>)> = vec![
            (ReadBatchParams::RangeFull, 0, 100, (0..100).collect()),
            (ReadBatchParams::RangeFull, 50, 100, (50..150).collect()),
            (
                ReadBatchParams::RangeFrom(RangeFrom { start: 500 }),
                0,
                100,
                (500..600).collect(),
            ),
            (
                ReadBatchParams::RangeFrom(RangeFrom { start: 500 }),
                100,
                100,
                (600..700).collect(),
            ),
            (
                ReadBatchParams::RangeTo(RangeTo { end: 800 }),
                0,
                100,
                (0..100).collect(),
            ),
            (
                ReadBatchParams::RangeTo(RangeTo { end: 800 }),
                200,
                100,
                (200..300).collect(),
            ),
            (odd_indices(), 0, 2, vec![1, 3]),
            (odd_indices(), 2, 2, vec![5, 7]),
        ];
        for (params, start, length, expected) in ok_cases {
            assert_sliced_offsets(params, start, length, expected);
        }

        // Slices that reach past the end of a bounded selection must fail.
        let failing_cases: Vec<(ReadBatchParams, usize, usize)> = vec![
            (single_index(), 0, 2),
            (single_index(), 1, 1),
            (ReadBatchParams::Range(0..10), 5, 6),
            (ReadBatchParams::RangeTo(RangeTo { end: 10 }), 5, 6),
        ];
        for (params, start, length) in failing_cases {
            assert_slice_fails(params, start, length);
        }

        // Unbounded selections cannot be materialized without a total.
        let unbounded = [
            ReadBatchParams::RangeFull,
            ReadBatchParams::RangeFrom(RangeFrom { start: 10 }),
        ];
        for params in unbounded {
            assert!(params.to_offsets().is_err());
        }
    }
}