lance_file/v2/
io.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use futures::{future::BoxFuture, FutureExt};
5use lance_encoding::EncodingsIo;
6use lance_io::scheduler::FileScheduler;
7
8use super::reader::DEFAULT_READ_CHUNK_SIZE;
9
/// [`EncodingsIo`] implementation that forwards read requests to a
/// [`FileScheduler`], breaking ranges larger than `read_chunk_size` into
/// several smaller requests and stitching the results back together.
#[derive(Debug)]
pub struct LanceEncodingsIo {
    /// Scheduler that the (possibly split) read requests are submitted to
    scheduler: FileScheduler,
    /// Size of chunks when reading large pages
    ///
    /// Requested ranges longer than this are split into multiple reads.
    /// Initialised to `DEFAULT_READ_CHUNK_SIZE` by `new`.
    read_chunk_size: u64,
}
16
17impl LanceEncodingsIo {
18    pub fn new(scheduler: FileScheduler) -> Self {
19        Self {
20            scheduler,
21            read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
22        }
23    }
24
25    pub fn with_read_chunk_size(mut self, read_chunk_size: u64) -> Self {
26        self.read_chunk_size = read_chunk_size;
27        self
28    }
29}
30
31impl EncodingsIo for LanceEncodingsIo {
32    fn submit_request(
33        &self,
34        ranges: Vec<std::ops::Range<u64>>,
35        priority: u64,
36    ) -> BoxFuture<'static, lance_core::Result<Vec<bytes::Bytes>>> {
37        let mut split_ranges = Vec::new();
38        let mut split_indices = Vec::new(); // Track which original range each split came from
39
40        // Split large ranges into smaller chunks
41        //
42        // TODO: consider read_chunk_size before submitting requests.
43        for (idx, range) in ranges.iter().enumerate() {
44            let range_size = range.end - range.start;
45
46            if range_size > self.read_chunk_size {
47                let num_chunks = range_size.div_ceil(self.read_chunk_size);
48                let chunk_size = range_size / num_chunks;
49
50                for i in 0..num_chunks {
51                    let start = range.start + i * chunk_size;
52                    let end = if i == num_chunks - 1 {
53                        range.end // Last chunk gets any remaining bytes
54                    } else {
55                        start + chunk_size
56                    };
57                    split_ranges.push(start..end);
58                    split_indices.push(idx);
59                }
60            } else {
61                split_ranges.push(range.clone());
62                split_indices.push(idx);
63            }
64        }
65
66        let fut = self.scheduler.submit_request(split_ranges, priority);
67
68        async move {
69            let split_results = fut.await?;
70
71            // Fast path: if no splitting occurred, return results directly
72            if split_results.len() == ranges.len() {
73                return Ok(split_results);
74            }
75
76            // Slow path: reassemble split results
77            let mut results = vec![Vec::new(); ranges.len()];
78
79            for (split_result, &orig_idx) in split_results.iter().zip(split_indices.iter()) {
80                results[orig_idx].push(split_result.clone());
81            }
82
83            Ok(results
84                .into_iter()
85                .map(|chunks| {
86                    if chunks.len() == 1 {
87                        chunks.into_iter().next().unwrap()
88                    } else {
89                        // Concatenate multiple chunks
90                        let total_size: usize = chunks.iter().map(|c| c.len()).sum();
91                        let mut combined = Vec::with_capacity(total_size);
92                        for chunk in chunks {
93                            combined.extend_from_slice(&chunk);
94                        }
95                        bytes::Bytes::from(combined)
96                    }
97                })
98                .collect())
99        }
100        .boxed()
101    }
102}