Skip to main content

lance_file/
io.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::Arc;
5
6use futures::{FutureExt, future::BoxFuture};
7use lance_encoding::EncodingsIo;
8use lance_io::scheduler::FileScheduler;
9
10use super::reader::DEFAULT_READ_CHUNK_SIZE;
11
12#[derive(Debug)]
13pub struct LanceEncodingsIo {
14    scheduler: FileScheduler,
15    /// Size of chunks when reading large pages
16    read_chunk_size: u64,
17}
18
19impl LanceEncodingsIo {
20    pub fn new(scheduler: FileScheduler) -> Self {
21        Self {
22            scheduler,
23            read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
24        }
25    }
26
27    pub fn with_read_chunk_size(mut self, read_chunk_size: u64) -> Self {
28        self.read_chunk_size = read_chunk_size;
29        self
30    }
31}
32
33impl EncodingsIo for LanceEncodingsIo {
34    fn with_bypass_backpressure(&self) -> Option<Arc<dyn EncodingsIo>> {
35        Some(Arc::new(Self {
36            scheduler: self.scheduler.with_bypass_backpressure(),
37            read_chunk_size: self.read_chunk_size,
38        }))
39    }
40
41    fn with_io_stats(
42        &self,
43        stats: Arc<dyn lance_core::utils::io_stats::IoStatsRecorder>,
44    ) -> Option<Arc<dyn EncodingsIo>> {
45        Some(Arc::new(Self {
46            scheduler: self.scheduler.with_io_stats(stats),
47            read_chunk_size: self.read_chunk_size,
48        }))
49    }
50
51    fn submit_request(
52        &self,
53        ranges: Vec<std::ops::Range<u64>>,
54        priority: u64,
55    ) -> BoxFuture<'static, lance_core::Result<Vec<bytes::Bytes>>> {
56        let mut split_ranges = Vec::new();
57        let mut split_indices = Vec::new(); // Track which original range each split came from
58
59        // Split large ranges into smaller chunks
60        //
61        // TODO: consider read_chunk_size before submitting requests.
62        for (idx, range) in ranges.iter().enumerate() {
63            let range_size = range.end - range.start;
64
65            if range_size > self.read_chunk_size {
66                let num_chunks = range_size.div_ceil(self.read_chunk_size);
67                let chunk_size = range_size / num_chunks;
68
69                for i in 0..num_chunks {
70                    let start = range.start + i * chunk_size;
71                    let end = if i == num_chunks - 1 {
72                        range.end // Last chunk gets any remaining bytes
73                    } else {
74                        start + chunk_size
75                    };
76                    split_ranges.push(start..end);
77                    split_indices.push(idx);
78                }
79            } else {
80                split_ranges.push(range.clone());
81                split_indices.push(idx);
82            }
83        }
84
85        let fut = self.scheduler.submit_request(split_ranges, priority);
86
87        async move {
88            let split_results = fut.await?;
89
90            // Fast path: if no splitting occurred, return results directly
91            if split_results.len() == ranges.len() {
92                return Ok(split_results);
93            }
94
95            // Slow path: reassemble split results
96            let mut results = vec![Vec::new(); ranges.len()];
97
98            for (split_result, &orig_idx) in split_results.iter().zip(split_indices.iter()) {
99                results[orig_idx].push(split_result.clone());
100            }
101
102            Ok(results
103                .into_iter()
104                .map(|chunks| {
105                    if chunks.len() == 1 {
106                        chunks.into_iter().next().unwrap()
107                    } else {
108                        // Concatenate multiple chunks
109                        let total_size: usize = chunks.iter().map(|c| c.len()).sum();
110                        let mut combined = Vec::with_capacity(total_size);
111                        for chunk in chunks {
112                            combined.extend_from_slice(&chunk);
113                        }
114                        bytes::Bytes::from(combined)
115                    }
116                })
117                .collect())
118        }
119        .boxed()
120    }
121}