lance_encoding/previous/encodings/physical/
value.rs1use arrow_schema::DataType;
5use bytes::Bytes;
6use futures::{FutureExt, future::BoxFuture};
7use log::trace;
8use std::ops::Range;
9use std::sync::{Arc, Mutex};
10
11use crate::buffer::LanceBuffer;
12use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
13use crate::encodings::physical::block::{
14 CompressionConfig, CompressionScheme, GeneralBufferCompressor,
15};
16use crate::encodings::physical::value::ValueEncoder;
17use crate::format::ProtobufUtils;
18use crate::{
19 EncodingsIo,
20 decoder::{PageScheduler, PrimitivePageDecoder},
21 previous::encoder::{ArrayEncoder, EncodedArray},
22};
23
24use lance_core::{Error, Result};
25
26#[derive(Debug, Clone, Copy)]
28pub struct ValuePageScheduler {
29 bytes_per_value: u64,
32 buffer_offset: u64,
33 buffer_size: u64,
34 compression_config: CompressionConfig,
35}
36
37impl ValuePageScheduler {
38 pub fn new(
39 bytes_per_value: u64,
40 buffer_offset: u64,
41 buffer_size: u64,
42 compression_config: CompressionConfig,
43 ) -> Self {
44 Self {
45 bytes_per_value,
46 buffer_offset,
47 buffer_size,
48 compression_config,
49 }
50 }
51}
52
53impl PageScheduler for ValuePageScheduler {
54 fn schedule_ranges(
55 &self,
56 ranges: &[std::ops::Range<u64>],
57 scheduler: &Arc<dyn EncodingsIo>,
58 top_level_row: u64,
59 ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
60 let (mut min, mut max) = (u64::MAX, 0);
61 let byte_ranges = if self.compression_config.scheme == CompressionScheme::None {
62 ranges
63 .iter()
64 .map(|range| {
65 let start = self.buffer_offset + (range.start * self.bytes_per_value);
66 let end = self.buffer_offset + (range.end * self.bytes_per_value);
67 min = min.min(start);
68 max = max.max(end);
69 start..end
70 })
71 .collect::<Vec<_>>()
72 } else {
73 min = self.buffer_offset;
74 max = self.buffer_offset + self.buffer_size;
75 vec![Range {
78 start: min,
79 end: max,
80 }]
81 };
82
83 trace!(
84 "Scheduling I/O for {} ranges spread across byte range {}..{}",
85 byte_ranges.len(),
86 min,
87 max
88 );
89 let bytes = scheduler.submit_request(byte_ranges, top_level_row);
90 let bytes_per_value = self.bytes_per_value;
91
92 let range_offsets = if self.compression_config.scheme != CompressionScheme::None {
93 ranges
94 .iter()
95 .map(|range| {
96 let start = (range.start * bytes_per_value) as usize;
97 let end = (range.end * bytes_per_value) as usize;
98 start..end
99 })
100 .collect::<Vec<_>>()
101 } else {
102 vec![]
103 };
104
105 let compression_config = self.compression_config;
106 async move {
107 let bytes = bytes.await?;
108
109 Ok(Box::new(ValuePageDecoder {
110 bytes_per_value,
111 data: bytes,
112 uncompressed_data: Arc::new(Mutex::new(None)),
113 uncompressed_range_offsets: range_offsets,
114 compression_config,
115 }) as Box<dyn PrimitivePageDecoder>)
116 }
117 .boxed()
118 }
119}
120
121struct ValuePageDecoder {
122 bytes_per_value: u64,
123 data: Vec<Bytes>,
124 uncompressed_data: Arc<Mutex<Option<Vec<Bytes>>>>,
125 uncompressed_range_offsets: Vec<std::ops::Range<usize>>,
126 compression_config: CompressionConfig,
127}
128
129impl ValuePageDecoder {
130 fn decompress(&self) -> Result<Vec<Bytes>> {
131 let bytes_u8: Vec<u8> = self.data[0].to_vec();
133 let buffer_compressor = GeneralBufferCompressor::get_compressor(self.compression_config)?;
134 let mut uncompressed_bytes: Vec<u8> = Vec::new();
135 buffer_compressor.decompress(&bytes_u8, &mut uncompressed_bytes)?;
136
137 let mut bytes_in_ranges: Vec<Bytes> =
138 Vec::with_capacity(self.uncompressed_range_offsets.len());
139 for range in &self.uncompressed_range_offsets {
140 let start = range.start;
141 let end = range.end;
142 bytes_in_ranges.push(Bytes::from(uncompressed_bytes[start..end].to_vec()));
143 }
144 Ok(bytes_in_ranges)
145 }
146
147 fn get_uncompressed_bytes(&self) -> Result<Arc<Mutex<Option<Vec<Bytes>>>>> {
148 let mut uncompressed_bytes = self.uncompressed_data.lock().unwrap();
149 if uncompressed_bytes.is_none() {
150 *uncompressed_bytes = Some(self.decompress()?);
151 }
152 Ok(Arc::clone(&self.uncompressed_data))
153 }
154
155 fn is_compressed(&self) -> bool {
156 !self.uncompressed_range_offsets.is_empty()
157 }
158
159 fn decode_buffers<'a>(
160 &'a self,
161 buffers: impl IntoIterator<Item = &'a Bytes>,
162 mut bytes_to_skip: u64,
163 mut bytes_to_take: u64,
164 ) -> LanceBuffer {
165 let mut dest: Option<Vec<u8>> = None;
166
167 for buf in buffers.into_iter() {
168 let buf_len = buf.len() as u64;
169 if bytes_to_skip > buf_len {
170 bytes_to_skip -= buf_len;
171 } else {
172 let bytes_to_take_here = (buf_len - bytes_to_skip).min(bytes_to_take);
173 bytes_to_take -= bytes_to_take_here;
174 let start = bytes_to_skip as usize;
175 let end = start + bytes_to_take_here as usize;
176 let slice = buf.slice(start..end);
177 match (&mut dest, bytes_to_take) {
178 (None, 0) => {
179 return LanceBuffer::from_bytes(slice, self.bytes_per_value);
182 }
183 (None, _) => {
184 dest.replace(Vec::with_capacity(bytes_to_take as usize));
185 }
186 _ => {}
187 }
188 dest.as_mut().unwrap().extend_from_slice(&slice);
189 bytes_to_skip = 0;
190 }
191 }
192 LanceBuffer::from(dest.unwrap_or_default())
193 }
194}
195
196impl PrimitivePageDecoder for ValuePageDecoder {
197 fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
198 let bytes_to_skip = rows_to_skip * self.bytes_per_value;
199 let bytes_to_take = num_rows * self.bytes_per_value;
200
201 let data_buffer = if self.is_compressed() {
202 let decoding_data = self.get_uncompressed_bytes()?;
203 let buffers = decoding_data.lock().unwrap();
204 self.decode_buffers(buffers.as_ref().unwrap(), bytes_to_skip, bytes_to_take)
205 } else {
206 self.decode_buffers(&self.data, bytes_to_skip, bytes_to_take)
207 };
208 Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
209 bits_per_value: self.bytes_per_value * 8,
210 data: data_buffer,
211 num_values: num_rows,
212 block_info: BlockInfo::new(),
213 }))
214 }
215}
216
217impl ArrayEncoder for ValueEncoder {
218 fn encode(
219 &self,
220 data: DataBlock,
221 _data_type: &DataType,
222 buffer_index: &mut u32,
223 ) -> Result<EncodedArray> {
224 let index = *buffer_index;
225 *buffer_index += 1;
226
227 let encoding = match &data {
228 DataBlock::FixedWidth(fixed_width) => Ok(ProtobufUtils::flat_encoding(
229 fixed_width.bits_per_value,
230 index,
231 None,
232 )),
233 _ => Err(Error::invalid_input_source(
234 format!(
235 "Cannot encode a data block of type {} with ValueEncoder",
236 data.name()
237 )
238 .into(),
239 )),
240 }?;
241 Ok(EncodedArray { data, encoding })
242 }
243}