lance_encoding/previous/encodings/physical/
fsst.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{ops::Range, sync::Arc};
5
6use arrow_buffer::ScalarBuffer;
7use arrow_schema::DataType;
8use futures::{future::BoxFuture, FutureExt};
9
10use lance_core::Result;
11
12use crate::{
13    buffer::LanceBuffer,
14    data::{BlockInfo, DataBlock, NullableDataBlock, VariableWidthBlock},
15    decoder::{PageScheduler, PrimitivePageDecoder},
16    format::ProtobufUtils,
17    previous::encoder::{ArrayEncoder, EncodedArray},
18    EncodingsIo,
19};
20
/// Page scheduler for FSST-compressed variable-width data.
///
/// Scheduling is delegated to `inner_scheduler`; the decoder it yields is wrapped in a
/// decoder that decompresses the decoded bytes with `symbol_table`.
#[derive(Debug)]
pub struct FsstPageScheduler {
    /// Scheduler for the underlying (still FSST-compressed) variable-width data.
    inner_scheduler: Box<dyn PageScheduler>,
    /// Buffer holding the FSST symbol table handed to `fsst::fsst::decompress` at decode time.
    symbol_table: LanceBuffer,
}

impl FsstPageScheduler {
    /// Creates a scheduler that wraps `inner_scheduler` and decompresses its output
    /// using `symbol_table`.
    pub fn new(inner_scheduler: Box<dyn PageScheduler>, symbol_table: LanceBuffer) -> Self {
        Self {
            inner_scheduler,
            symbol_table,
        }
    }
}
35
36impl PageScheduler for FsstPageScheduler {
37    fn schedule_ranges(
38        &self,
39        ranges: &[Range<u64>],
40        scheduler: &Arc<dyn EncodingsIo>,
41        top_level_row: u64,
42    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
43        let inner_decoder = self
44            .inner_scheduler
45            .schedule_ranges(ranges, scheduler, top_level_row);
46        let symbol_table = self.symbol_table.clone();
47
48        async move {
49            let inner_decoder = inner_decoder.await?;
50            Ok(Box::new(FsstPageDecoder {
51                inner_decoder,
52                symbol_table,
53            }) as Box<dyn PrimitivePageDecoder>)
54        }
55        .boxed()
56    }
57}
58
/// Decoder produced by `FsstPageScheduler`.
///
/// Decodes rows with `inner_decoder` and then decompresses the resulting variable-width
/// bytes with `fsst::fsst::decompress` using `symbol_table`.
struct FsstPageDecoder {
    /// Decoder yielding the FSST-compressed variable-width block (optionally nullable).
    inner_decoder: Box<dyn PrimitivePageDecoder>,
    /// Buffer holding the FSST symbol table used for decompression.
    symbol_table: LanceBuffer,
}
63
64impl PrimitivePageDecoder for FsstPageDecoder {
65    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
66        let compressed_data = self.inner_decoder.decode(rows_to_skip, num_rows)?;
67        let (string_data, nulls) = match compressed_data {
68            DataBlock::Nullable(nullable) => {
69                let data = nullable.data.as_variable_width().unwrap();
70                Result::Ok((data, Some(nullable.nulls)))
71            }
72            DataBlock::VariableWidth(variable) => Ok((variable, None)),
73            _ => panic!("Received non-variable width data from inner decoder"),
74        }?;
75
76        let offsets = ScalarBuffer::<i32>::from(string_data.offsets.into_buffer());
77        let bytes = string_data.data.into_buffer();
78
79        let mut decompressed_offsets = vec![0_i32; offsets.len()];
80        let mut decompressed_bytes = vec![0_u8; bytes.len() * 8];
81        // Safety: Exposes uninitialized memory but we're about to clobber it
82        unsafe {
83            decompressed_bytes.set_len(decompressed_bytes.capacity());
84        }
85        fsst::fsst::decompress(
86            &self.symbol_table,
87            &bytes,
88            &offsets,
89            &mut decompressed_bytes,
90            &mut decompressed_offsets,
91        )?;
92
93        // TODO: Change PrimitivePageDecoder to use Vec instead of BytesMut
94        // since there is no way to get BytesMut from Vec but these copies should be avoidable
95        // This is not the first time this has happened
96        let mut offsets_as_bytes_mut = Vec::with_capacity(decompressed_offsets.len());
97        let decompressed_offsets = ScalarBuffer::<i32>::from(decompressed_offsets);
98        offsets_as_bytes_mut.extend_from_slice(decompressed_offsets.inner().as_slice());
99
100        let mut bytes_as_bytes_mut = Vec::with_capacity(decompressed_bytes.len());
101        bytes_as_bytes_mut.extend_from_slice(&decompressed_bytes);
102
103        let new_string_data = DataBlock::VariableWidth(VariableWidthBlock {
104            bits_per_offset: 32,
105            data: LanceBuffer::from(bytes_as_bytes_mut),
106            num_values: num_rows,
107            offsets: LanceBuffer::from(offsets_as_bytes_mut),
108            block_info: BlockInfo::new(),
109        });
110
111        if let Some(nulls) = nulls {
112            Ok(DataBlock::Nullable(NullableDataBlock {
113                data: Box::new(new_string_data),
114                nulls,
115                block_info: BlockInfo::new(),
116            }))
117        } else {
118            Ok(new_string_data)
119        }
120    }
121}
122
/// Array encoder that FSST-compresses variable-width data before delegating to an inner
/// encoder.
///
/// The symbol table produced by the compression is stored in the resulting encoding
/// description (via `ProtobufUtils::fsst`).
#[derive(Debug)]
pub struct FsstArrayEncoder {
    /// Encoder applied to the FSST-compressed variable-width block.
    inner_encoder: Box<dyn ArrayEncoder>,
}

impl FsstArrayEncoder {
    /// Creates an FSST encoder whose compressed output is encoded by `inner_encoder`.
    pub fn new(inner_encoder: Box<dyn ArrayEncoder>) -> Self {
        Self { inner_encoder }
    }
}
133
134impl ArrayEncoder for FsstArrayEncoder {
135    fn encode(
136        &self,
137        data: DataBlock,
138        data_type: &DataType,
139        buffer_index: &mut u32,
140    ) -> lance_core::Result<EncodedArray> {
141        let (data, nulls) = match data {
142            DataBlock::Nullable(nullable) => {
143                let data = nullable.data.as_variable_width().unwrap();
144                (data, Some(nullable.nulls))
145            }
146            DataBlock::VariableWidth(variable) => (variable, None),
147            _ => panic!("Expected variable width data block"),
148        };
149        assert_eq!(data.bits_per_offset, 32);
150        let num_values = data.num_values;
151        let offsets = data.offsets.borrow_to_typed_slice::<i32>();
152        let offsets_slice = offsets.as_ref();
153        let bytes_data = data.data.into_buffer();
154
155        let mut dest_offsets = vec![0_i32; offsets_slice.len() * 2];
156        let mut dest_values = vec![0_u8; bytes_data.len() * 2];
157        let mut symbol_table = vec![0_u8; fsst::fsst::FSST_SYMBOL_TABLE_SIZE];
158
159        fsst::fsst::compress(
160            &mut symbol_table,
161            bytes_data.as_slice(),
162            offsets_slice,
163            &mut dest_values,
164            &mut dest_offsets,
165        )?;
166
167        let dest_offset = LanceBuffer::reinterpret_vec(dest_offsets);
168        let dest_values = LanceBuffer::from(dest_values);
169        let dest_data = DataBlock::VariableWidth(VariableWidthBlock {
170            bits_per_offset: 32,
171            data: dest_values,
172            num_values,
173            offsets: dest_offset,
174            block_info: BlockInfo::new(),
175        });
176
177        let data_block = if let Some(nulls) = nulls {
178            DataBlock::Nullable(NullableDataBlock {
179                data: Box::new(dest_data),
180                nulls,
181                block_info: BlockInfo::new(),
182            })
183        } else {
184            dest_data
185        };
186
187        let inner_encoded = self
188            .inner_encoder
189            .encode(data_block, data_type, buffer_index)?;
190
191        let encoding = ProtobufUtils::fsst(inner_encoded.encoding, symbol_table);
192
193        Ok(EncodedArray {
194            data: inner_encoded.data,
195            encoding,
196        })
197    }
198}