lance_encoding/previous/encodings/physical/value.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_schema::DataType;
5use bytes::Bytes;
6use futures::{future::BoxFuture, FutureExt};
7use log::trace;
8use snafu::location;
9use std::ops::Range;
10use std::sync::{Arc, Mutex};
11
12use crate::buffer::LanceBuffer;
13use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
14use crate::encodings::physical::block::{
15    CompressionConfig, CompressionScheme, GeneralBufferCompressor,
16};
17use crate::encodings::physical::value::ValueEncoder;
18use crate::format::ProtobufUtils;
19use crate::{
20    decoder::{PageScheduler, PrimitivePageDecoder},
21    previous::encoder::{ArrayEncoder, EncodedArray},
22    EncodingsIo,
23};
24
25use lance_core::{Error, Result};
26
/// Scheduler for a simple encoding where buffers of fixed-size items are stored as-is on disk
#[derive(Debug, Clone, Copy)]
pub struct ValuePageScheduler {
    // TODO: do we really support values greater than 2^32 bytes per value?
    // I think we want to, in theory, but will need to test this case.
    /// Width, in bytes, of each fixed-size value in the page
    bytes_per_value: u64,
    /// Byte offset of the page's data buffer (added to per-range offsets when
    /// scheduling uncompressed reads; presumably an absolute file offset — confirm with caller)
    buffer_offset: u64,
    /// Total size, in bytes, of the page's data buffer (only consulted when the
    /// page is compressed, where the whole buffer must be read)
    buffer_size: u64,
    /// Compression applied to the buffer (`CompressionScheme::None` means values
    /// can be sliced directly off disk)
    compression_config: CompressionConfig,
}
37
38impl ValuePageScheduler {
39    pub fn new(
40        bytes_per_value: u64,
41        buffer_offset: u64,
42        buffer_size: u64,
43        compression_config: CompressionConfig,
44    ) -> Self {
45        Self {
46            bytes_per_value,
47            buffer_offset,
48            buffer_size,
49            compression_config,
50        }
51    }
52}
53
54impl PageScheduler for ValuePageScheduler {
55    fn schedule_ranges(
56        &self,
57        ranges: &[std::ops::Range<u64>],
58        scheduler: &Arc<dyn EncodingsIo>,
59        top_level_row: u64,
60    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
61        let (mut min, mut max) = (u64::MAX, 0);
62        let byte_ranges = if self.compression_config.scheme == CompressionScheme::None {
63            ranges
64                .iter()
65                .map(|range| {
66                    let start = self.buffer_offset + (range.start * self.bytes_per_value);
67                    let end = self.buffer_offset + (range.end * self.bytes_per_value);
68                    min = min.min(start);
69                    max = max.max(end);
70                    start..end
71                })
72                .collect::<Vec<_>>()
73        } else {
74            min = self.buffer_offset;
75            max = self.buffer_offset + self.buffer_size;
76            // for compressed page, the ranges are always the entire page,
77            // and it is guaranteed that only one range is passed
78            vec![Range {
79                start: min,
80                end: max,
81            }]
82        };
83
84        trace!(
85            "Scheduling I/O for {} ranges spread across byte range {}..{}",
86            byte_ranges.len(),
87            min,
88            max
89        );
90        let bytes = scheduler.submit_request(byte_ranges, top_level_row);
91        let bytes_per_value = self.bytes_per_value;
92
93        let range_offsets = if self.compression_config.scheme != CompressionScheme::None {
94            ranges
95                .iter()
96                .map(|range| {
97                    let start = (range.start * bytes_per_value) as usize;
98                    let end = (range.end * bytes_per_value) as usize;
99                    start..end
100                })
101                .collect::<Vec<_>>()
102        } else {
103            vec![]
104        };
105
106        let compression_config = self.compression_config;
107        async move {
108            let bytes = bytes.await?;
109
110            Ok(Box::new(ValuePageDecoder {
111                bytes_per_value,
112                data: bytes,
113                uncompressed_data: Arc::new(Mutex::new(None)),
114                uncompressed_range_offsets: range_offsets,
115                compression_config,
116            }) as Box<dyn PrimitivePageDecoder>)
117        }
118        .boxed()
119    }
120}
121
/// Decoder counterpart to [`ValuePageScheduler`]: slices fixed-width values out
/// of the buffers fetched from disk, decompressing lazily if needed.
struct ValuePageDecoder {
    /// Width, in bytes, of each fixed-size value
    bytes_per_value: u64,
    /// Raw buffers returned by the I/O scheduler (for a compressed page this is
    /// a single buffer holding the whole compressed page)
    data: Vec<Bytes>,
    /// Lazily-populated cache of decompressed, range-sliced buffers
    uncompressed_data: Arc<Mutex<Option<Vec<Bytes>>>>,
    /// Byte ranges (into the uncompressed buffer) for each requested row range;
    /// empty when the page is not compressed
    uncompressed_range_offsets: Vec<std::ops::Range<usize>>,
    /// Compression applied to the on-disk buffer
    compression_config: CompressionConfig,
}
129
impl ValuePageDecoder {
    /// Decompresses the single fetched buffer and slices out the byte range for
    /// each originally-requested row range.
    fn decompress(&self) -> Result<Vec<Bytes>> {
        // for compressed page, it is guaranteed that only one range is passed
        let bytes_u8: Vec<u8> = self.data[0].to_vec();
        let buffer_compressor = GeneralBufferCompressor::get_compressor(self.compression_config)?;
        let mut uncompressed_bytes: Vec<u8> = Vec::new();
        buffer_compressor.decompress(&bytes_u8, &mut uncompressed_bytes)?;

        // Copy each requested range out of the decompressed buffer into its own
        // `Bytes` so downstream code can treat it like the uncompressed case.
        let mut bytes_in_ranges: Vec<Bytes> =
            Vec::with_capacity(self.uncompressed_range_offsets.len());
        for range in &self.uncompressed_range_offsets {
            let start = range.start;
            let end = range.end;
            bytes_in_ranges.push(Bytes::from(uncompressed_bytes[start..end].to_vec()));
        }
        Ok(bytes_in_ranges)
    }

    /// Returns the decompressed buffers, decompressing on first use and caching
    /// the result behind the mutex for subsequent calls.
    fn get_uncompressed_bytes(&self) -> Result<Arc<Mutex<Option<Vec<Bytes>>>>> {
        let mut uncompressed_bytes = self.uncompressed_data.lock().unwrap();
        if uncompressed_bytes.is_none() {
            *uncompressed_bytes = Some(self.decompress()?);
        }
        Ok(Arc::clone(&self.uncompressed_data))
    }

    /// True when the page was compressed on disk (the scheduler only records
    /// range offsets for compressed pages).
    fn is_compressed(&self) -> bool {
        !self.uncompressed_range_offsets.is_empty()
    }

    /// Skips `bytes_to_skip` bytes across `buffers` and gathers the next
    /// `bytes_to_take` bytes into a single buffer.
    ///
    /// If the request falls entirely inside one buffer this returns a
    /// (potentially) zero-copy slice; otherwise it copies the pieces into a
    /// freshly allocated `Vec`.
    fn decode_buffers<'a>(
        &'a self,
        buffers: impl IntoIterator<Item = &'a Bytes>,
        mut bytes_to_skip: u64,
        mut bytes_to_take: u64,
    ) -> LanceBuffer {
        // Only allocated once we know the request spans more than one buffer.
        let mut dest: Option<Vec<u8>> = None;

        for buf in buffers.into_iter() {
            let buf_len = buf.len() as u64;
            if bytes_to_skip > buf_len {
                // This whole buffer is before the requested region.
                bytes_to_skip -= buf_len;
            } else {
                // Take as much as this buffer can supply (bounded by what's left).
                let bytes_to_take_here = (buf_len - bytes_to_skip).min(bytes_to_take);
                bytes_to_take -= bytes_to_take_here;
                let start = bytes_to_skip as usize;
                let end = start + bytes_to_take_here as usize;
                let slice = buf.slice(start..end);
                // NOTE: `bytes_to_take` has already been decremented, so a value
                // of 0 here means this slice satisfies the entire request.
                match (&mut dest, bytes_to_take) {
                    (None, 0) => {
                        // The entire request is contained in one buffer so we can maybe zero-copy
                        // if the slice is aligned properly
                        return LanceBuffer::from_bytes(slice, self.bytes_per_value);
                    }
                    (None, _) => {
                        dest.replace(Vec::with_capacity(bytes_to_take as usize));
                    }
                    _ => {}
                }
                dest.as_mut().unwrap().extend_from_slice(&slice);
                // After the first partial buffer there is nothing left to skip.
                bytes_to_skip = 0;
            }
        }
        LanceBuffer::from(dest.unwrap_or_default())
    }
}
196
197impl PrimitivePageDecoder for ValuePageDecoder {
198    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
199        let bytes_to_skip = rows_to_skip * self.bytes_per_value;
200        let bytes_to_take = num_rows * self.bytes_per_value;
201
202        let data_buffer = if self.is_compressed() {
203            let decoding_data = self.get_uncompressed_bytes()?;
204            let buffers = decoding_data.lock().unwrap();
205            self.decode_buffers(buffers.as_ref().unwrap(), bytes_to_skip, bytes_to_take)
206        } else {
207            self.decode_buffers(&self.data, bytes_to_skip, bytes_to_take)
208        };
209        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
210            bits_per_value: self.bytes_per_value * 8,
211            data: data_buffer,
212            num_values: num_rows,
213            block_info: BlockInfo::new(),
214        }))
215    }
216}
217
218impl ArrayEncoder for ValueEncoder {
219    fn encode(
220        &self,
221        data: DataBlock,
222        _data_type: &DataType,
223        buffer_index: &mut u32,
224    ) -> Result<EncodedArray> {
225        let index = *buffer_index;
226        *buffer_index += 1;
227
228        let encoding = match &data {
229            DataBlock::FixedWidth(fixed_width) => Ok(ProtobufUtils::flat_encoding(
230                fixed_width.bits_per_value,
231                index,
232                None,
233            )),
234            _ => Err(Error::InvalidInput {
235                source: format!(
236                    "Cannot encode a data block of type {} with ValueEncoder",
237                    data.name()
238                )
239                .into(),
240                location: location!(),
241            }),
242        }?;
243        Ok(EncodedArray { data, encoding })
244    }
245}