lance_encoding/previous/encodings/physical/
value.rs1use arrow_schema::DataType;
5use bytes::Bytes;
6use futures::{future::BoxFuture, FutureExt};
7use log::trace;
8use snafu::location;
9use std::ops::Range;
10use std::sync::{Arc, Mutex};
11
12use crate::buffer::LanceBuffer;
13use crate::data::{BlockInfo, DataBlock, FixedWidthDataBlock};
14use crate::encodings::physical::block::{
15 CompressionConfig, CompressionScheme, GeneralBufferCompressor,
16};
17use crate::encodings::physical::value::ValueEncoder;
18use crate::format::ProtobufUtils;
19use crate::{
20 decoder::{PageScheduler, PrimitivePageDecoder},
21 previous::encoder::{ArrayEncoder, EncodedArray},
22 EncodingsIo,
23};
24
25use lance_core::{Error, Result};
26
27#[derive(Debug, Clone, Copy)]
29pub struct ValuePageScheduler {
30 bytes_per_value: u64,
33 buffer_offset: u64,
34 buffer_size: u64,
35 compression_config: CompressionConfig,
36}
37
38impl ValuePageScheduler {
39 pub fn new(
40 bytes_per_value: u64,
41 buffer_offset: u64,
42 buffer_size: u64,
43 compression_config: CompressionConfig,
44 ) -> Self {
45 Self {
46 bytes_per_value,
47 buffer_offset,
48 buffer_size,
49 compression_config,
50 }
51 }
52}
53
54impl PageScheduler for ValuePageScheduler {
55 fn schedule_ranges(
56 &self,
57 ranges: &[std::ops::Range<u64>],
58 scheduler: &Arc<dyn EncodingsIo>,
59 top_level_row: u64,
60 ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
61 let (mut min, mut max) = (u64::MAX, 0);
62 let byte_ranges = if self.compression_config.scheme == CompressionScheme::None {
63 ranges
64 .iter()
65 .map(|range| {
66 let start = self.buffer_offset + (range.start * self.bytes_per_value);
67 let end = self.buffer_offset + (range.end * self.bytes_per_value);
68 min = min.min(start);
69 max = max.max(end);
70 start..end
71 })
72 .collect::<Vec<_>>()
73 } else {
74 min = self.buffer_offset;
75 max = self.buffer_offset + self.buffer_size;
76 vec![Range {
79 start: min,
80 end: max,
81 }]
82 };
83
84 trace!(
85 "Scheduling I/O for {} ranges spread across byte range {}..{}",
86 byte_ranges.len(),
87 min,
88 max
89 );
90 let bytes = scheduler.submit_request(byte_ranges, top_level_row);
91 let bytes_per_value = self.bytes_per_value;
92
93 let range_offsets = if self.compression_config.scheme != CompressionScheme::None {
94 ranges
95 .iter()
96 .map(|range| {
97 let start = (range.start * bytes_per_value) as usize;
98 let end = (range.end * bytes_per_value) as usize;
99 start..end
100 })
101 .collect::<Vec<_>>()
102 } else {
103 vec![]
104 };
105
106 let compression_config = self.compression_config;
107 async move {
108 let bytes = bytes.await?;
109
110 Ok(Box::new(ValuePageDecoder {
111 bytes_per_value,
112 data: bytes,
113 uncompressed_data: Arc::new(Mutex::new(None)),
114 uncompressed_range_offsets: range_offsets,
115 compression_config,
116 }) as Box<dyn PrimitivePageDecoder>)
117 }
118 .boxed()
119 }
120}
121
122struct ValuePageDecoder {
123 bytes_per_value: u64,
124 data: Vec<Bytes>,
125 uncompressed_data: Arc<Mutex<Option<Vec<Bytes>>>>,
126 uncompressed_range_offsets: Vec<std::ops::Range<usize>>,
127 compression_config: CompressionConfig,
128}
129
130impl ValuePageDecoder {
131 fn decompress(&self) -> Result<Vec<Bytes>> {
132 let bytes_u8: Vec<u8> = self.data[0].to_vec();
134 let buffer_compressor = GeneralBufferCompressor::get_compressor(self.compression_config)?;
135 let mut uncompressed_bytes: Vec<u8> = Vec::new();
136 buffer_compressor.decompress(&bytes_u8, &mut uncompressed_bytes)?;
137
138 let mut bytes_in_ranges: Vec<Bytes> =
139 Vec::with_capacity(self.uncompressed_range_offsets.len());
140 for range in &self.uncompressed_range_offsets {
141 let start = range.start;
142 let end = range.end;
143 bytes_in_ranges.push(Bytes::from(uncompressed_bytes[start..end].to_vec()));
144 }
145 Ok(bytes_in_ranges)
146 }
147
148 fn get_uncompressed_bytes(&self) -> Result<Arc<Mutex<Option<Vec<Bytes>>>>> {
149 let mut uncompressed_bytes = self.uncompressed_data.lock().unwrap();
150 if uncompressed_bytes.is_none() {
151 *uncompressed_bytes = Some(self.decompress()?);
152 }
153 Ok(Arc::clone(&self.uncompressed_data))
154 }
155
156 fn is_compressed(&self) -> bool {
157 !self.uncompressed_range_offsets.is_empty()
158 }
159
160 fn decode_buffers<'a>(
161 &'a self,
162 buffers: impl IntoIterator<Item = &'a Bytes>,
163 mut bytes_to_skip: u64,
164 mut bytes_to_take: u64,
165 ) -> LanceBuffer {
166 let mut dest: Option<Vec<u8>> = None;
167
168 for buf in buffers.into_iter() {
169 let buf_len = buf.len() as u64;
170 if bytes_to_skip > buf_len {
171 bytes_to_skip -= buf_len;
172 } else {
173 let bytes_to_take_here = (buf_len - bytes_to_skip).min(bytes_to_take);
174 bytes_to_take -= bytes_to_take_here;
175 let start = bytes_to_skip as usize;
176 let end = start + bytes_to_take_here as usize;
177 let slice = buf.slice(start..end);
178 match (&mut dest, bytes_to_take) {
179 (None, 0) => {
180 return LanceBuffer::from_bytes(slice, self.bytes_per_value);
183 }
184 (None, _) => {
185 dest.replace(Vec::with_capacity(bytes_to_take as usize));
186 }
187 _ => {}
188 }
189 dest.as_mut().unwrap().extend_from_slice(&slice);
190 bytes_to_skip = 0;
191 }
192 }
193 LanceBuffer::from(dest.unwrap_or_default())
194 }
195}
196
197impl PrimitivePageDecoder for ValuePageDecoder {
198 fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
199 let bytes_to_skip = rows_to_skip * self.bytes_per_value;
200 let bytes_to_take = num_rows * self.bytes_per_value;
201
202 let data_buffer = if self.is_compressed() {
203 let decoding_data = self.get_uncompressed_bytes()?;
204 let buffers = decoding_data.lock().unwrap();
205 self.decode_buffers(buffers.as_ref().unwrap(), bytes_to_skip, bytes_to_take)
206 } else {
207 self.decode_buffers(&self.data, bytes_to_skip, bytes_to_take)
208 };
209 Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
210 bits_per_value: self.bytes_per_value * 8,
211 data: data_buffer,
212 num_values: num_rows,
213 block_info: BlockInfo::new(),
214 }))
215 }
216}
217
218impl ArrayEncoder for ValueEncoder {
219 fn encode(
220 &self,
221 data: DataBlock,
222 _data_type: &DataType,
223 buffer_index: &mut u32,
224 ) -> Result<EncodedArray> {
225 let index = *buffer_index;
226 *buffer_index += 1;
227
228 let encoding = match &data {
229 DataBlock::FixedWidth(fixed_width) => Ok(ProtobufUtils::flat_encoding(
230 fixed_width.bits_per_value,
231 index,
232 None,
233 )),
234 _ => Err(Error::InvalidInput {
235 source: format!(
236 "Cannot encode a data block of type {} with ValueEncoder",
237 data.name()
238 )
239 .into(),
240 location: location!(),
241 }),
242 }?;
243 Ok(EncodedArray { data, encoding })
244 }
245}