1use std::sync::Arc;
5
6use futures::{FutureExt, future::BoxFuture};
7use lance_encoding::EncodingsIo;
8use lance_io::scheduler::FileScheduler;
9
10use super::reader::DEFAULT_READ_CHUNK_SIZE;
11
12#[derive(Debug)]
13pub struct LanceEncodingsIo {
14 scheduler: FileScheduler,
15 read_chunk_size: u64,
17}
18
19impl LanceEncodingsIo {
20 pub fn new(scheduler: FileScheduler) -> Self {
21 Self {
22 scheduler,
23 read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
24 }
25 }
26
27 pub fn with_read_chunk_size(mut self, read_chunk_size: u64) -> Self {
28 self.read_chunk_size = read_chunk_size;
29 self
30 }
31}
32
33impl EncodingsIo for LanceEncodingsIo {
34 fn with_bypass_backpressure(&self) -> Option<Arc<dyn EncodingsIo>> {
35 Some(Arc::new(Self {
36 scheduler: self.scheduler.with_bypass_backpressure(),
37 read_chunk_size: self.read_chunk_size,
38 }))
39 }
40
41 fn submit_request(
42 &self,
43 ranges: Vec<std::ops::Range<u64>>,
44 priority: u64,
45 ) -> BoxFuture<'static, lance_core::Result<Vec<bytes::Bytes>>> {
46 let mut split_ranges = Vec::new();
47 let mut split_indices = Vec::new(); for (idx, range) in ranges.iter().enumerate() {
53 let range_size = range.end - range.start;
54
55 if range_size > self.read_chunk_size {
56 let num_chunks = range_size.div_ceil(self.read_chunk_size);
57 let chunk_size = range_size / num_chunks;
58
59 for i in 0..num_chunks {
60 let start = range.start + i * chunk_size;
61 let end = if i == num_chunks - 1 {
62 range.end } else {
64 start + chunk_size
65 };
66 split_ranges.push(start..end);
67 split_indices.push(idx);
68 }
69 } else {
70 split_ranges.push(range.clone());
71 split_indices.push(idx);
72 }
73 }
74
75 let fut = self.scheduler.submit_request(split_ranges, priority);
76
77 async move {
78 let split_results = fut.await?;
79
80 if split_results.len() == ranges.len() {
82 return Ok(split_results);
83 }
84
85 let mut results = vec![Vec::new(); ranges.len()];
87
88 for (split_result, &orig_idx) in split_results.iter().zip(split_indices.iter()) {
89 results[orig_idx].push(split_result.clone());
90 }
91
92 Ok(results
93 .into_iter()
94 .map(|chunks| {
95 if chunks.len() == 1 {
96 chunks.into_iter().next().unwrap()
97 } else {
98 let total_size: usize = chunks.iter().map(|c| c.len()).sum();
100 let mut combined = Vec::with_capacity(total_size);
101 for chunk in chunks {
102 combined.extend_from_slice(&chunk);
103 }
104 bytes::Bytes::from(combined)
105 }
106 })
107 .collect())
108 }
109 .boxed()
110 }
111}