// lance_file/io.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::sync::Arc;

use futures::{FutureExt, future::BoxFuture};
use lance_encoding::EncodingsIo;
use lance_io::scheduler::FileScheduler;

use super::reader::DEFAULT_READ_CHUNK_SIZE;

/// An [`EncodingsIo`] implementation backed by a [`FileScheduler`].
///
/// Read requests larger than `read_chunk_size` are split into roughly
/// `read_chunk_size`-sized chunks before being handed to the scheduler,
/// and the chunk results are reassembled before being returned to the
/// caller.
#[derive(Debug)]
pub struct LanceEncodingsIo {
    // Scheduler that performs the actual file I/O
    scheduler: FileScheduler,
    /// Size of chunks when reading large pages
    read_chunk_size: u64,
}
19impl LanceEncodingsIo {
20    pub fn new(scheduler: FileScheduler) -> Self {
21        Self {
22            scheduler,
23            read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
24        }
25    }
26
27    pub fn with_read_chunk_size(mut self, read_chunk_size: u64) -> Self {
28        self.read_chunk_size = read_chunk_size;
29        self
30    }
31}
33impl EncodingsIo for LanceEncodingsIo {
34    fn with_bypass_backpressure(&self) -> Option<Arc<dyn EncodingsIo>> {
35        Some(Arc::new(Self {
36            scheduler: self.scheduler.with_bypass_backpressure(),
37            read_chunk_size: self.read_chunk_size,
38        }))
39    }
40
41    fn submit_request(
42        &self,
43        ranges: Vec<std::ops::Range<u64>>,
44        priority: u64,
45    ) -> BoxFuture<'static, lance_core::Result<Vec<bytes::Bytes>>> {
46        let mut split_ranges = Vec::new();
47        let mut split_indices = Vec::new(); // Track which original range each split came from
48
49        // Split large ranges into smaller chunks
50        //
51        // TODO: consider read_chunk_size before submitting requests.
52        for (idx, range) in ranges.iter().enumerate() {
53            let range_size = range.end - range.start;
54
55            if range_size > self.read_chunk_size {
56                let num_chunks = range_size.div_ceil(self.read_chunk_size);
57                let chunk_size = range_size / num_chunks;
58
59                for i in 0..num_chunks {
60                    let start = range.start + i * chunk_size;
61                    let end = if i == num_chunks - 1 {
62                        range.end // Last chunk gets any remaining bytes
63                    } else {
64                        start + chunk_size
65                    };
66                    split_ranges.push(start..end);
67                    split_indices.push(idx);
68                }
69            } else {
70                split_ranges.push(range.clone());
71                split_indices.push(idx);
72            }
73        }
74
75        let fut = self.scheduler.submit_request(split_ranges, priority);
76
77        async move {
78            let split_results = fut.await?;
79
80            // Fast path: if no splitting occurred, return results directly
81            if split_results.len() == ranges.len() {
82                return Ok(split_results);
83            }
84
85            // Slow path: reassemble split results
86            let mut results = vec![Vec::new(); ranges.len()];
87
88            for (split_result, &orig_idx) in split_results.iter().zip(split_indices.iter()) {
89                results[orig_idx].push(split_result.clone());
90            }
91
92            Ok(results
93                .into_iter()
94                .map(|chunks| {
95                    if chunks.len() == 1 {
96                        chunks.into_iter().next().unwrap()
97                    } else {
98                        // Concatenate multiple chunks
99                        let total_size: usize = chunks.iter().map(|c| c.len()).sum();
100                        let mut combined = Vec::with_capacity(total_size);
101                        for chunk in chunks {
102                            combined.extend_from_slice(&chunk);
103                        }
104                        bytes::Bytes::from(combined)
105                    }
106                })
107                .collect())
108        }
109        .boxed()
110    }
111}