// File: lance_encoding/previous/encodings/physical/value.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_schema::DataType;
5use bytes::Bytes;
6use futures::{FutureExt, future::BoxFuture};
7use log::trace;
8use std::ops::Range;
9use std::sync::{Arc, Mutex};
10
11use crate::buffer::LanceBuffer;
12use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
13use crate::encodings::physical::block::{
14    CompressionConfig, CompressionScheme, GeneralBufferCompressor,
15};
16use crate::encodings::physical::value::ValueEncoder;
17use crate::format::ProtobufUtils;
18use crate::{
19    EncodingsIo,
20    decoder::{PageScheduler, PrimitivePageDecoder},
21    previous::encoder::{ArrayEncoder, EncodedArray},
22};
23
24use lance_core::{Error, Result};
25
/// Scheduler for a simple encoding where buffers of fixed-size items are stored as-is on disk
#[derive(Debug, Clone, Copy)]
pub struct ValuePageScheduler {
    // TODO: do we really support values greater than 2^32 bytes per value?
    // I think we want to, in theory, but will need to test this case.
    /// Width, in bytes, of a single value in the page
    bytes_per_value: u64,
    /// Absolute byte offset of the page's data buffer within the file
    buffer_offset: u64,
    /// Total size, in bytes, of the (possibly compressed) data buffer
    buffer_size: u64,
    /// How (or whether) the buffer is compressed on disk
    compression_config: CompressionConfig,
}
36
impl ValuePageScheduler {
    /// Creates a scheduler for a page of fixed-width values.
    ///
    /// `bytes_per_value` is the width of each item, `buffer_offset` and
    /// `buffer_size` locate the page's buffer within the file, and
    /// `compression_config` describes how the buffer is compressed
    /// (`CompressionScheme::None` means values are stored as-is).
    pub fn new(
        bytes_per_value: u64,
        buffer_offset: u64,
        buffer_size: u64,
        compression_config: CompressionConfig,
    ) -> Self {
        Self {
            bytes_per_value,
            buffer_offset,
            buffer_size,
            compression_config,
        }
    }
}
52
impl PageScheduler for ValuePageScheduler {
    /// Schedules the I/O needed to decode the given row ranges and returns a
    /// future that resolves to a decoder over the fetched bytes.
    fn schedule_ranges(
        &self,
        ranges: &[std::ops::Range<u64>],
        scheduler: &Arc<dyn EncodingsIo>,
        top_level_row: u64,
    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
        // Track the overall byte span touched, for the trace log below
        let (mut min, mut max) = (u64::MAX, 0);
        let byte_ranges = if self.compression_config.scheme == CompressionScheme::None {
            // Uncompressed page: each row range maps directly to a byte range
            // within the buffer (rows are fixed width)
            ranges
                .iter()
                .map(|range| {
                    let start = self.buffer_offset + (range.start * self.bytes_per_value);
                    let end = self.buffer_offset + (range.end * self.bytes_per_value);
                    min = min.min(start);
                    max = max.max(end);
                    start..end
                })
                .collect::<Vec<_>>()
        } else {
            min = self.buffer_offset;
            max = self.buffer_offset + self.buffer_size;
            // for compressed page, the ranges are always the entire page,
            // and it is guaranteed that only one range is passed
            vec![Range {
                start: min,
                end: max,
            }]
        };

        trace!(
            "Scheduling I/O for {} ranges spread across byte range {}..{}",
            byte_ranges.len(),
            min,
            max
        );
        let bytes = scheduler.submit_request(byte_ranges, top_level_row);
        let bytes_per_value = self.bytes_per_value;

        // For a compressed page we fetch the whole buffer, so remember the
        // requested sub-ranges (as offsets into the *uncompressed* data) so
        // the decoder can slice them out after decompression
        let range_offsets = if self.compression_config.scheme != CompressionScheme::None {
            ranges
                .iter()
                .map(|range| {
                    // NOTE(review): these casts truncate on 32-bit targets if the
                    // uncompressed page exceeds usize::MAX — presumed not to occur
                    let start = (range.start * bytes_per_value) as usize;
                    let end = (range.end * bytes_per_value) as usize;
                    start..end
                })
                .collect::<Vec<_>>()
        } else {
            vec![]
        };

        let compression_config = self.compression_config;
        async move {
            let bytes = bytes.await?;

            Ok(Box::new(ValuePageDecoder {
                bytes_per_value,
                data: bytes,
                uncompressed_data: Arc::new(Mutex::new(None)),
                uncompressed_range_offsets: range_offsets,
                compression_config,
            }) as Box<dyn PrimitivePageDecoder>)
        }
        .boxed()
    }
}
120
/// Decoder for a page of fixed-width values fetched by [`ValuePageScheduler`]
struct ValuePageDecoder {
    /// Width, in bytes, of a single value
    bytes_per_value: u64,
    /// The buffers fetched from disk (one per scheduled byte range; a single
    /// buffer holding the entire page when the page is compressed)
    data: Vec<Bytes>,
    /// Lazily-populated cache of the decompressed range slices
    uncompressed_data: Arc<Mutex<Option<Vec<Bytes>>>>,
    /// Requested ranges as offsets into the uncompressed data; empty for
    /// uncompressed pages
    uncompressed_range_offsets: Vec<std::ops::Range<usize>>,
    /// How the page's buffer is compressed on disk
    compression_config: CompressionConfig,
}
128
129impl ValuePageDecoder {
130    fn decompress(&self) -> Result<Vec<Bytes>> {
131        // for compressed page, it is guaranteed that only one range is passed
132        let bytes_u8: Vec<u8> = self.data[0].to_vec();
133        let buffer_compressor = GeneralBufferCompressor::get_compressor(self.compression_config)?;
134        let mut uncompressed_bytes: Vec<u8> = Vec::new();
135        buffer_compressor.decompress(&bytes_u8, &mut uncompressed_bytes)?;
136
137        let mut bytes_in_ranges: Vec<Bytes> =
138            Vec::with_capacity(self.uncompressed_range_offsets.len());
139        for range in &self.uncompressed_range_offsets {
140            let start = range.start;
141            let end = range.end;
142            bytes_in_ranges.push(Bytes::from(uncompressed_bytes[start..end].to_vec()));
143        }
144        Ok(bytes_in_ranges)
145    }
146
147    fn get_uncompressed_bytes(&self) -> Result<Arc<Mutex<Option<Vec<Bytes>>>>> {
148        let mut uncompressed_bytes = self.uncompressed_data.lock().unwrap();
149        if uncompressed_bytes.is_none() {
150            *uncompressed_bytes = Some(self.decompress()?);
151        }
152        Ok(Arc::clone(&self.uncompressed_data))
153    }
154
155    fn is_compressed(&self) -> bool {
156        !self.uncompressed_range_offsets.is_empty()
157    }
158
159    fn decode_buffers<'a>(
160        &'a self,
161        buffers: impl IntoIterator<Item = &'a Bytes>,
162        mut bytes_to_skip: u64,
163        mut bytes_to_take: u64,
164    ) -> LanceBuffer {
165        let mut dest: Option<Vec<u8>> = None;
166
167        for buf in buffers.into_iter() {
168            let buf_len = buf.len() as u64;
169            if bytes_to_skip > buf_len {
170                bytes_to_skip -= buf_len;
171            } else {
172                let bytes_to_take_here = (buf_len - bytes_to_skip).min(bytes_to_take);
173                bytes_to_take -= bytes_to_take_here;
174                let start = bytes_to_skip as usize;
175                let end = start + bytes_to_take_here as usize;
176                let slice = buf.slice(start..end);
177                match (&mut dest, bytes_to_take) {
178                    (None, 0) => {
179                        // The entire request is contained in one buffer so we can maybe zero-copy
180                        // if the slice is aligned properly
181                        return LanceBuffer::from_bytes(slice, self.bytes_per_value);
182                    }
183                    (None, _) => {
184                        dest.replace(Vec::with_capacity(bytes_to_take as usize));
185                    }
186                    _ => {}
187                }
188                dest.as_mut().unwrap().extend_from_slice(&slice);
189                bytes_to_skip = 0;
190            }
191        }
192        LanceBuffer::from(dest.unwrap_or_default())
193    }
194}
195
196impl PrimitivePageDecoder for ValuePageDecoder {
197    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
198        let bytes_to_skip = rows_to_skip * self.bytes_per_value;
199        let bytes_to_take = num_rows * self.bytes_per_value;
200
201        let data_buffer = if self.is_compressed() {
202            let decoding_data = self.get_uncompressed_bytes()?;
203            let buffers = decoding_data.lock().unwrap();
204            self.decode_buffers(buffers.as_ref().unwrap(), bytes_to_skip, bytes_to_take)
205        } else {
206            self.decode_buffers(&self.data, bytes_to_skip, bytes_to_take)
207        };
208        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
209            bits_per_value: self.bytes_per_value * 8,
210            data: data_buffer,
211            num_values: num_rows,
212            block_info: BlockInfo::new(),
213        }))
214    }
215}
216
217impl ArrayEncoder for ValueEncoder {
218    fn encode(
219        &self,
220        data: DataBlock,
221        _data_type: &DataType,
222        buffer_index: &mut u32,
223    ) -> Result<EncodedArray> {
224        let index = *buffer_index;
225        *buffer_index += 1;
226
227        let encoding = match &data {
228            DataBlock::FixedWidth(fixed_width) => Ok(ProtobufUtils::flat_encoding(
229                fixed_width.bits_per_value,
230                index,
231                None,
232            )),
233            _ => Err(Error::invalid_input_source(
234                format!(
235                    "Cannot encode a data block of type {} with ValueEncoder",
236                    data.name()
237                )
238                .into(),
239            )),
240        }?;
241        Ok(EncodedArray { data, encoding })
242    }
243}