1use std::sync::Arc;
5
6use futures::{FutureExt, future::BoxFuture};
7use lance_encoding::EncodingsIo;
8use lance_io::scheduler::FileScheduler;
9
10use super::reader::DEFAULT_READ_CHUNK_SIZE;
11
12#[derive(Debug)]
13pub struct LanceEncodingsIo {
14 scheduler: FileScheduler,
15 read_chunk_size: u64,
17}
18
19impl LanceEncodingsIo {
20 pub fn new(scheduler: FileScheduler) -> Self {
21 Self {
22 scheduler,
23 read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
24 }
25 }
26
27 pub fn with_read_chunk_size(mut self, read_chunk_size: u64) -> Self {
28 self.read_chunk_size = read_chunk_size;
29 self
30 }
31}
32
33impl EncodingsIo for LanceEncodingsIo {
34 fn with_bypass_backpressure(&self) -> Option<Arc<dyn EncodingsIo>> {
35 Some(Arc::new(Self {
36 scheduler: self.scheduler.with_bypass_backpressure(),
37 read_chunk_size: self.read_chunk_size,
38 }))
39 }
40
41 fn with_io_stats(
42 &self,
43 stats: Arc<dyn lance_core::utils::io_stats::IoStatsRecorder>,
44 ) -> Option<Arc<dyn EncodingsIo>> {
45 Some(Arc::new(Self {
46 scheduler: self.scheduler.with_io_stats(stats),
47 read_chunk_size: self.read_chunk_size,
48 }))
49 }
50
51 fn submit_request(
52 &self,
53 ranges: Vec<std::ops::Range<u64>>,
54 priority: u64,
55 ) -> BoxFuture<'static, lance_core::Result<Vec<bytes::Bytes>>> {
56 let mut split_ranges = Vec::new();
57 let mut split_indices = Vec::new(); for (idx, range) in ranges.iter().enumerate() {
63 let range_size = range.end - range.start;
64
65 if range_size > self.read_chunk_size {
66 let num_chunks = range_size.div_ceil(self.read_chunk_size);
67 let chunk_size = range_size / num_chunks;
68
69 for i in 0..num_chunks {
70 let start = range.start + i * chunk_size;
71 let end = if i == num_chunks - 1 {
72 range.end } else {
74 start + chunk_size
75 };
76 split_ranges.push(start..end);
77 split_indices.push(idx);
78 }
79 } else {
80 split_ranges.push(range.clone());
81 split_indices.push(idx);
82 }
83 }
84
85 let fut = self.scheduler.submit_request(split_ranges, priority);
86
87 async move {
88 let split_results = fut.await?;
89
90 if split_results.len() == ranges.len() {
92 return Ok(split_results);
93 }
94
95 let mut results = vec![Vec::new(); ranges.len()];
97
98 for (split_result, &orig_idx) in split_results.iter().zip(split_indices.iter()) {
99 results[orig_idx].push(split_result.clone());
100 }
101
102 Ok(results
103 .into_iter()
104 .map(|chunks| {
105 if chunks.len() == 1 {
106 chunks.into_iter().next().unwrap()
107 } else {
108 let total_size: usize = chunks.iter().map(|c| c.len()).sum();
110 let mut combined = Vec::with_capacity(total_size);
111 for chunk in chunks {
112 combined.extend_from_slice(&chunk);
113 }
114 bytes::Bytes::from(combined)
115 }
116 })
117 .collect())
118 }
119 .boxed()
120 }
121}