lance_encoding/previous/encodings/physical/
fsst.rs1use std::{ops::Range, sync::Arc};
5
6use arrow_buffer::ScalarBuffer;
7use arrow_schema::DataType;
8use futures::{future::BoxFuture, FutureExt};
9
10use lance_core::Result;
11
12use crate::{
13 buffer::LanceBuffer,
14 data::{BlockInfo, DataBlock, NullableDataBlock, VariableWidthBlock},
15 decoder::{PageScheduler, PrimitivePageDecoder},
16 format::ProtobufUtils,
17 previous::encoder::{ArrayEncoder, EncodedArray},
18 EncodingsIo,
19};
20
21#[derive(Debug)]
22pub struct FsstPageScheduler {
23 inner_scheduler: Box<dyn PageScheduler>,
24 symbol_table: LanceBuffer,
25}
26
27impl FsstPageScheduler {
28 pub fn new(inner_scheduler: Box<dyn PageScheduler>, symbol_table: LanceBuffer) -> Self {
29 Self {
30 inner_scheduler,
31 symbol_table,
32 }
33 }
34}
35
36impl PageScheduler for FsstPageScheduler {
37 fn schedule_ranges(
38 &self,
39 ranges: &[Range<u64>],
40 scheduler: &Arc<dyn EncodingsIo>,
41 top_level_row: u64,
42 ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
43 let inner_decoder = self
44 .inner_scheduler
45 .schedule_ranges(ranges, scheduler, top_level_row);
46 let symbol_table = self.symbol_table.clone();
47
48 async move {
49 let inner_decoder = inner_decoder.await?;
50 Ok(Box::new(FsstPageDecoder {
51 inner_decoder,
52 symbol_table,
53 }) as Box<dyn PrimitivePageDecoder>)
54 }
55 .boxed()
56 }
57}
58
59struct FsstPageDecoder {
60 inner_decoder: Box<dyn PrimitivePageDecoder>,
61 symbol_table: LanceBuffer,
62}
63
64impl PrimitivePageDecoder for FsstPageDecoder {
65 fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
66 let compressed_data = self.inner_decoder.decode(rows_to_skip, num_rows)?;
67 let (string_data, nulls) = match compressed_data {
68 DataBlock::Nullable(nullable) => {
69 let data = nullable.data.as_variable_width().unwrap();
70 Result::Ok((data, Some(nullable.nulls)))
71 }
72 DataBlock::VariableWidth(variable) => Ok((variable, None)),
73 _ => panic!("Received non-variable width data from inner decoder"),
74 }?;
75
76 let offsets = ScalarBuffer::<i32>::from(string_data.offsets.into_buffer());
77 let bytes = string_data.data.into_buffer();
78
79 let mut decompressed_offsets = vec![0_i32; offsets.len()];
80 let mut decompressed_bytes = vec![0_u8; bytes.len() * 8];
81 unsafe {
83 decompressed_bytes.set_len(decompressed_bytes.capacity());
84 }
85 fsst::fsst::decompress(
86 &self.symbol_table,
87 &bytes,
88 &offsets,
89 &mut decompressed_bytes,
90 &mut decompressed_offsets,
91 )?;
92
93 let mut offsets_as_bytes_mut = Vec::with_capacity(decompressed_offsets.len());
97 let decompressed_offsets = ScalarBuffer::<i32>::from(decompressed_offsets);
98 offsets_as_bytes_mut.extend_from_slice(decompressed_offsets.inner().as_slice());
99
100 let mut bytes_as_bytes_mut = Vec::with_capacity(decompressed_bytes.len());
101 bytes_as_bytes_mut.extend_from_slice(&decompressed_bytes);
102
103 let new_string_data = DataBlock::VariableWidth(VariableWidthBlock {
104 bits_per_offset: 32,
105 data: LanceBuffer::from(bytes_as_bytes_mut),
106 num_values: num_rows,
107 offsets: LanceBuffer::from(offsets_as_bytes_mut),
108 block_info: BlockInfo::new(),
109 });
110
111 if let Some(nulls) = nulls {
112 Ok(DataBlock::Nullable(NullableDataBlock {
113 data: Box::new(new_string_data),
114 nulls,
115 block_info: BlockInfo::new(),
116 }))
117 } else {
118 Ok(new_string_data)
119 }
120 }
121}
122
123#[derive(Debug)]
124pub struct FsstArrayEncoder {
125 inner_encoder: Box<dyn ArrayEncoder>,
126}
127
128impl FsstArrayEncoder {
129 pub fn new(inner_encoder: Box<dyn ArrayEncoder>) -> Self {
130 Self { inner_encoder }
131 }
132}
133
134impl ArrayEncoder for FsstArrayEncoder {
135 fn encode(
136 &self,
137 data: DataBlock,
138 data_type: &DataType,
139 buffer_index: &mut u32,
140 ) -> lance_core::Result<EncodedArray> {
141 let (data, nulls) = match data {
142 DataBlock::Nullable(nullable) => {
143 let data = nullable.data.as_variable_width().unwrap();
144 (data, Some(nullable.nulls))
145 }
146 DataBlock::VariableWidth(variable) => (variable, None),
147 _ => panic!("Expected variable width data block"),
148 };
149 assert_eq!(data.bits_per_offset, 32);
150 let num_values = data.num_values;
151 let offsets = data.offsets.borrow_to_typed_slice::<i32>();
152 let offsets_slice = offsets.as_ref();
153 let bytes_data = data.data.into_buffer();
154
155 let mut dest_offsets = vec![0_i32; offsets_slice.len() * 2];
156 let mut dest_values = vec![0_u8; bytes_data.len() * 2];
157 let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
158
159 fsst::fsst::compress(
160 &mut symbol_table,
161 bytes_data.as_slice(),
162 offsets_slice,
163 &mut dest_values,
164 &mut dest_offsets,
165 )?;
166
167 let dest_offset = LanceBuffer::reinterpret_vec(dest_offsets);
168 let dest_values = LanceBuffer::from(dest_values);
169 let dest_data = DataBlock::VariableWidth(VariableWidthBlock {
170 bits_per_offset: 32,
171 data: dest_values,
172 num_values,
173 offsets: dest_offset,
174 block_info: BlockInfo::new(),
175 });
176
177 let data_block = if let Some(nulls) = nulls {
178 DataBlock::Nullable(NullableDataBlock {
179 data: Box::new(dest_data),
180 nulls,
181 block_info: BlockInfo::new(),
182 })
183 } else {
184 dest_data
185 };
186
187 let inner_encoded = self
188 .inner_encoder
189 .encode(data_block, data_type, buffer_index)?;
190
191 let encoding = ProtobufUtils::fsst(inner_encoded.encoding, symbol_table);
192
193 Ok(EncodedArray {
194 data: inner_encoded.data,
195 encoding,
196 })
197 }
198}