lance_encoding/previous/encodings/physical/bitmap.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{ops::Range, sync::Arc};
5
6use arrow_buffer::BooleanBufferBuilder;
7use bytes::Bytes;
8
9use futures::{future::BoxFuture, FutureExt};
10use lance_core::Result;
11use log::trace;
12
13use crate::{
14    buffer::LanceBuffer,
15    data::{BlockInfo, DataBlock, FixedWidthDataBlock},
16    decoder::{PageScheduler, PrimitivePageDecoder},
17    EncodingsIo,
18};
19
/// Physical scheduler for bitmaps stored densely at one bit per value, using
/// bit-endianness (LSB-first numbering within each byte), which is the layout
/// Arrow uses for validity bitmaps and boolean arrays.
///
/// Each requested row range is served by reading one span of on-disk bytes,
/// which the resulting decoder turns into one in-memory buffer.
#[derive(Clone, Copy, Debug)]
pub struct DenseBitmapScheduler {
    /// Byte offset of the bitmap's first byte; it is added to every byte range
    /// handed to the I/O scheduler.
    buffer_offset: u64,
}

impl DenseBitmapScheduler {
    /// Creates a scheduler for a bitmap whose first byte sits at `buffer_offset`.
    pub fn new(buffer_offset: u64) -> Self {
        DenseBitmapScheduler { buffer_offset }
    }
}
34
35impl PageScheduler for DenseBitmapScheduler {
36    fn schedule_ranges(
37        &self,
38        ranges: &[Range<u64>],
39        scheduler: &Arc<dyn EncodingsIo>,
40        top_level_row: u64,
41    ) -> BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>> {
42        let mut min = u64::MAX;
43        let mut max = 0;
44        let chunk_reqs = ranges
45            .iter()
46            .map(|range| {
47                debug_assert_ne!(range.start, range.end);
48                let start = self.buffer_offset + range.start / 8;
49                let bit_offset = range.start % 8;
50                let end = self.buffer_offset + range.end.div_ceil(8);
51                let byte_range = start..end;
52                min = min.min(start);
53                max = max.max(end);
54                (byte_range, bit_offset, range.end - range.start)
55            })
56            .collect::<Vec<_>>();
57
58        let byte_ranges = chunk_reqs
59            .iter()
60            .map(|(range, _, _)| range.clone())
61            .collect::<Vec<_>>();
62        trace!(
63            "Scheduling I/O for {} ranges across byte range {}..{}",
64            byte_ranges.len(),
65            min,
66            max
67        );
68        let bytes = scheduler.submit_request(byte_ranges, top_level_row);
69
70        async move {
71            let bytes = bytes.await?;
72            let chunks = bytes
73                .into_iter()
74                .zip(chunk_reqs)
75                .map(|(bytes, (_, bit_offset, length))| BitmapData {
76                    data: bytes,
77                    bit_offset,
78                    length,
79                })
80                .collect::<Vec<_>>();
81            Ok(Box::new(BitmapDecoder { chunks }) as Box<dyn PrimitivePageDecoder>)
82        }
83        .boxed()
84    }
85}
86
87struct BitmapData {
88    data: Bytes,
89    bit_offset: u64,
90    length: u64,
91}
92
93struct BitmapDecoder {
94    chunks: Vec<BitmapData>,
95}
96
97impl PrimitivePageDecoder for BitmapDecoder {
98    fn decode(&self, rows_to_skip: u64, num_rows: u64) -> Result<DataBlock> {
99        let mut rows_to_skip = rows_to_skip;
100        let mut dest_builder = BooleanBufferBuilder::new(num_rows as usize);
101
102        let mut rows_remaining = num_rows;
103        for chunk in &self.chunks {
104            if chunk.length <= rows_to_skip {
105                rows_to_skip -= chunk.length;
106            } else {
107                let start = rows_to_skip + chunk.bit_offset;
108                let num_vals_to_take = rows_remaining.min(chunk.length - rows_to_skip);
109                let end = start + num_vals_to_take;
110                dest_builder.append_packed_range(start as usize..end as usize, &chunk.data);
111                rows_to_skip = 0;
112                rows_remaining -= num_vals_to_take;
113            }
114        }
115
116        let bool_buffer = dest_builder.finish().into_inner();
117        Ok(DataBlock::FixedWidth(FixedWidthDataBlock {
118            data: LanceBuffer::from(bool_buffer),
119            bits_per_value: 1,
120            num_values: num_rows,
121            block_info: BlockInfo::new(),
122        }))
123    }
124}
125
#[cfg(test)]
mod tests {

    use arrow_array::BooleanArray;
    use arrow_schema::{DataType, Field};
    use bytes::Bytes;
    use rstest::rstest;
    use std::{collections::HashMap, sync::Arc};

    use crate::data::{DataBlock, FixedWidthDataBlock};
    use crate::decoder::PrimitivePageDecoder;
    use crate::previous::encodings::physical::bitmap::BitmapData;
    use crate::testing::{
        check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases,
    };
    use crate::version::LanceFileVersion;

    use super::BitmapDecoder;

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_bitmap_boolean(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let field = Field::new("", DataType::Boolean, false);
        check_round_trip_encoding_random(field, version).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_fsl_bitmap_boolean() {
        let field = Field::new("", DataType::Boolean, true);
        let field = Field::new("", DataType::FixedSizeList(Arc::new(field), 3), true);
        check_round_trip_encoding_random(field, LanceFileVersion::V2_1).await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_simple_boolean(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        let array = BooleanArray::from(vec![
            Some(false),
            Some(true),
            None,
            Some(false),
            Some(true),
            None,
            Some(false),
            None,
            None,
        ]);

        let test_cases = TestCases::default()
            .with_range(0..2)
            .with_range(0..3)
            .with_range(1..9)
            .with_indices(vec![0, 1, 3, 4])
            .with_file_version(version);
        check_round_trip_encoding_of_data(vec![Arc::new(array)], &test_cases, HashMap::default())
            .await;
    }

    #[rstest]
    #[test_log::test(tokio::test)]
    async fn test_tiny_boolean(
        #[values(LanceFileVersion::V2_0, LanceFileVersion::V2_1)] version: LanceFileVersion,
    ) {
        // Test case for a tiny boolean array that is technically smaller than 1 byte
        let array = BooleanArray::from(vec![Some(false), Some(true), None]);

        let test_cases = TestCases::default()
            .with_range(0..1)
            .with_range(1..3)
            .with_indices(vec![0, 2])
            .with_file_version(version);
        check_round_trip_encoding_of_data(vec![Arc::new(array)], &test_cases, HashMap::default())
            .await;
    }

    /// Builds a decoder over two 4-value chunks that both start at bit 4 of
    /// their byte: the first chunk is all ones, the second all zeros.
    fn split_chunk_decoder() -> BitmapDecoder {
        BitmapDecoder {
            chunks: vec![
                BitmapData {
                    data: Bytes::from_static(&[0b11111111]),
                    bit_offset: 4,
                    length: 4,
                },
                BitmapData {
                    data: Bytes::from_static(&[0b00000000]),
                    bit_offset: 4,
                    length: 4,
                },
            ],
        }
    }

    /// Decodes `num_rows` values after skipping `rows_to_skip`, and returns the
    /// raw bitmap bytes of the resulting fixed-width block.
    fn decode_bytes(decoder: &BitmapDecoder, rows_to_skip: u64, num_rows: u64) -> Vec<u8> {
        let result = decoder.decode(rows_to_skip, num_rows).unwrap();
        let DataBlock::FixedWidth(FixedWidthDataBlock { data, .. }) = result else {
            panic!("expected fixed width data block");
        };
        let bytes: &[u8] = data.as_ref();
        bytes.to_vec()
    }

    #[test]
    fn test_bitmap_decoder_edge_cases() {
        // Regression for a case where the row skip and the bit offset
        // require us to read from the second Bytes instead of the first
        let decoder = split_chunk_decoder();

        // Read from first and second chunk
        assert_eq!(decode_bytes(&decoder, 2, 4), vec![0b00000011]);

        // Read from second chunk
        assert_eq!(decode_bytes(&decoder, 5, 1), vec![0b00000000]);
    }

    #[test]
    fn test_bitmap_decoder_full_and_empty_reads() {
        let decoder = split_chunk_decoder();

        // Every value: four ones from the first chunk, then four zeros.
        assert_eq!(decode_bytes(&decoder, 0, 8), vec![0b00001111]);

        // Only the tail of the second chunk.
        assert_eq!(decode_bytes(&decoder, 6, 2), vec![0b00000000]);

        // A zero-row request produces an empty bitmap.
        assert!(decode_bytes(&decoder, 0, 0).is_empty());
    }
}