use futures::{future::BoxFuture, FutureExt};
use lance_encoding::EncodingsIo;
use lance_io::scheduler::FileScheduler;

use super::reader::DEFAULT_READ_CHUNK_SIZE;

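/// An implementation of [`EncodingsIo`] backed by a [`FileScheduler`].
///
/// Requested ranges larger than `read_chunk_size` are split into smaller
/// chunks before they are handed to the scheduler; the chunk results are
/// stitched back together once the reads complete.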
#[derive(Debug)]
pub struct LanceEncodingsIo {
    scheduler: FileScheduler,
    read_chunk_size: u64,
}

impl LanceEncodingsIo {
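    /// Create a new `LanceEncodingsIo` that uses [`DEFAULT_READ_CHUNK_SIZE`].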
    pub fn new(scheduler: FileScheduler) -> Self {
        Self {
            scheduler,
            read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
        }
    }

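    /// Override the read chunk size.  Ranges larger than this are split
    /// into roughly `read_chunk_size`-sized pieces before being scheduled.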
    pub fn with_read_chunk_size(mut self, read_chunk_size: u64) -> Self {
        self.read_chunk_size = read_chunk_size;
        self
    }
}

impl EncodingsIo for LanceEncodingsIo {
    fn submit_request(
        &self,
        ranges: Vec<std::ops::Range<u64>>,
        priority: u64,
    ) -> BoxFuture<'static, lance_core::Result<Vec<bytes::Bytes>>> {
        let mut split_ranges = Vec::new();
        let mut split_indices = Vec::new();

        // Split any range larger than `read_chunk_size` into roughly
        // equal-sized chunks, remembering which original range each
        // chunk belongs to.
        for (idx, range) in ranges.iter().enumerate() {
            let range_size = range.end - range.start;

            if range_size > self.read_chunk_size {
                let num_chunks = range_size.div_ceil(self.read_chunk_size);
                let chunk_size = range_size / num_chunks;

                for i in 0..num_chunks {
                    let start = range.start + i * chunk_size;
                    // The last chunk absorbs any remainder so the chunks
                    // exactly cover the original range.
                    let end = if i == num_chunks - 1 {
                        range.end
                    } else {
                        start + chunk_size
                    };
                    split_ranges.push(start..end);
                    split_indices.push(idx);
                }
            } else {
                split_ranges.push(range.clone());
                split_indices.push(idx);
            }
        }

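        // Submit all (possibly split) ranges to the scheduler as one request.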
        let fut = self.scheduler.submit_request(split_ranges, priority);

        async move {
            let split_results = fut.await?;

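            // Fast path: nothing was split, so the results already map
            // one-to-one onto the requested ranges.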
            if split_results.len() == ranges.len() {
                return Ok(split_results);
            }

            let mut results = vec![Vec::new(); ranges.len()];

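            // Group each split chunk under the index of the range it was
            // split from.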
            for (split_result, &orig_idx) in split_results.iter().zip(split_indices.iter()) {
                results[orig_idx].push(split_result.clone());
            }

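            // Reassemble each group of chunks into a single contiguous
            // buffer, avoiding a copy when a range was never split.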
            Ok(results
                .into_iter()
                .map(|chunks| {
                    if chunks.len() == 1 {
                        chunks.into_iter().next().unwrap()
                    } else {
                        let total_size: usize = chunks.iter().map(|c| c.len()).sum();
                        let mut combined = Vec::with_capacity(total_size);
                        for chunk in chunks {
                            combined.extend_from_slice(&chunk);
                        }
                        bytes::Bytes::from(combined)
                    }
                })
                .collect())
        }
        .boxed()
    }
}