Skip to main content

lance_encoding/encodings/logical/primitive/constant.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{any::Any, collections::VecDeque, ops::Range, sync::Arc};
5
6use arrow_array::{Array, ArrayRef, new_empty_array};
7use arrow_buffer::ScalarBuffer;
8use arrow_schema::DataType;
9use bytes::Bytes;
10use futures::FutureExt;
11use futures::future::BoxFuture;
12
13use lance_core::{
14    Error, Result,
15    cache::{Context, DeepSizeOf},
16};
17
18use crate::{
19    EncodingsIo,
20    buffer::LanceBuffer,
21    decoder::PageEncoding,
22    encoder::EncodedPage,
23    encodings::logical::primitive::{CachedPageData, PageLoadTask},
24    format::ProtobufUtils21,
25    repdef::{DefinitionInterpretation, RepDefUnraveler},
26};
27
28pub(crate) fn encode_constant_page(
29    column_idx: u32,
30    scalar: ArrayRef,
31    repdef: crate::repdef::SerializedRepDefs,
32    row_number: u64,
33    num_rows: u64,
34) -> Result<EncodedPage> {
35    let inline_value = lance_arrow::scalar::try_inline_value(&scalar);
36    let value_buffer = if inline_value.is_some() {
37        None
38    } else {
39        Some(LanceBuffer::from(
40            lance_arrow::scalar::encode_scalar_value_buffer(&scalar)?,
41        ))
42    };
43
44    let description = ProtobufUtils21::constant_layout(&repdef.def_meaning, inline_value);
45
46    let has_repdef = repdef.repetition_levels.is_some() || repdef.definition_levels.is_some();
47
48    let data = if !has_repdef {
49        value_buffer.into_iter().collect::<Vec<_>>()
50    } else {
51        let rep_bytes = repdef
52            .repetition_levels
53            .as_ref()
54            .map(|rep| LanceBuffer::reinterpret_slice(rep.clone()))
55            .unwrap_or_else(LanceBuffer::empty);
56        let def_bytes = repdef
57            .definition_levels
58            .as_ref()
59            .map(|def| LanceBuffer::reinterpret_slice(def.clone()))
60            .unwrap_or_else(LanceBuffer::empty);
61
62        match value_buffer {
63            Some(value_buffer) => vec![value_buffer, rep_bytes, def_bytes],
64            None => vec![rep_bytes, def_bytes],
65        }
66    };
67
68    Ok(EncodedPage {
69        column_idx,
70        data,
71        description: PageEncoding::Structural(description),
72        num_rows,
73        row_number,
74    })
75}
76
77#[derive(Debug)]
78struct CachedConstantState {
79    scalar: ArrayRef,
80    rep: Option<ScalarBuffer<u16>>,
81    def: Option<ScalarBuffer<u16>>,
82}
83
84impl DeepSizeOf for CachedConstantState {
85    fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
86        self.scalar.get_buffer_memory_size()
87            + self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
88            + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
89    }
90}
91
92impl CachedPageData for CachedConstantState {
93    fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
94        self
95    }
96}
97
98#[derive(Debug, Clone)]
99enum ScalarSource {
100    Inline(Vec<u8>),
101    ValueBuffer(usize),
102}
103
104#[derive(Debug)]
105pub struct ConstantPageScheduler {
106    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
107    scalar_source: ScalarSource,
108    rep_buf_idx: Option<usize>,
109    def_buf_idx: Option<usize>,
110    data_type: DataType,
111    def_meaning: Arc<[DefinitionInterpretation]>,
112    max_rep: u16,
113    max_visible_def: u16,
114    repdef: Option<Arc<CachedConstantState>>,
115}
116
117impl ConstantPageScheduler {
118    pub fn try_new(
119        buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
120        inline_value: Option<Bytes>,
121        data_type: DataType,
122        def_meaning: Arc<[DefinitionInterpretation]>,
123    ) -> Result<Self> {
124        let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
125        let max_visible_def = def_meaning
126            .iter()
127            .filter(|d| !d.is_list())
128            .map(|d| d.num_def_levels())
129            .sum();
130
131        let (scalar_source, rep_buf_idx, def_buf_idx) =
132            match (inline_value, buffer_offsets_and_sizes.len()) {
133                (Some(inline), 0) => (ScalarSource::Inline(inline.to_vec()), None, None),
134                (Some(inline), 2) => (ScalarSource::Inline(inline.to_vec()), Some(0), Some(1)),
135                (None, 1) => (ScalarSource::ValueBuffer(0), None, None),
136                (None, 3) => (ScalarSource::ValueBuffer(0), Some(1), Some(2)),
137                (Some(_inline), 1) => {
138                    return Err(Error::invalid_input(format!(
139                        "Invalid constant layout: inline_value present with {} buffers",
140                        1
141                    )));
142                }
143                (Some(_inline), 3) => {
144                    return Err(Error::invalid_input(
145                        "Invalid constant layout: inline_value present with 3 buffers",
146                    ));
147                }
148                (None, 0) => {
149                    return Err(Error::invalid_input(
150                        "Invalid constant layout: missing scalar source",
151                    ));
152                }
153                (None, 2) => {
154                    return Err(Error::invalid_input(
155                        "Invalid constant layout: ambiguous (2 buffers and no inline_value)",
156                    ));
157                }
158                (Some(_), n) => {
159                    return Err(Error::invalid_input(format!(
160                        "Invalid constant layout: inline_value present with {} buffers",
161                        n
162                    )));
163                }
164                (None, n) => {
165                    return Err(Error::invalid_input(format!(
166                        "Invalid constant layout: unexpected buffer count {}",
167                        n
168                    )));
169                }
170            };
171
172        Ok(Self {
173            buffer_offsets_and_sizes,
174            scalar_source,
175            rep_buf_idx,
176            def_buf_idx,
177            data_type,
178            def_meaning,
179            max_rep,
180            max_visible_def,
181            repdef: None,
182        })
183    }
184}
185
186impl crate::encodings::logical::primitive::StructuralPageScheduler for ConstantPageScheduler {
187    fn initialize<'a>(
188        &'a mut self,
189        io: &Arc<dyn EncodingsIo>,
190    ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
191        let rep_range = self
192            .rep_buf_idx
193            .and_then(|idx| self.buffer_offsets_and_sizes.get(idx).copied())
194            .filter(|(_, len)| *len > 0)
195            .map(|(pos, len)| pos..pos + len);
196
197        let def_range = self
198            .def_buf_idx
199            .and_then(|idx| self.buffer_offsets_and_sizes.get(idx).copied())
200            .filter(|(_, len)| *len > 0)
201            .map(|(pos, len)| pos..pos + len);
202
203        let scalar_range = match self.scalar_source {
204            ScalarSource::ValueBuffer(idx) => {
205                let (pos, len) = self.buffer_offsets_and_sizes[idx];
206                Some(pos..pos + len)
207            }
208            ScalarSource::Inline(_) => None,
209        };
210
211        let mut reads = Vec::with_capacity(3);
212        if let Some(r) = scalar_range {
213            reads.push(r);
214        }
215        if let Some(r) = rep_range.clone() {
216            reads.push(r);
217        }
218        if let Some(r) = def_range.clone() {
219            reads.push(r);
220        }
221
222        if reads.is_empty() {
223            let ScalarSource::Inline(inline) = &self.scalar_source else {
224                return std::future::ready(Err(Error::invalid_input(
225                    "Invalid constant layout: missing scalar source",
226                )))
227                .boxed();
228            };
229
230            let scalar = match lance_arrow::scalar::decode_scalar_from_inline_value(
231                &self.data_type,
232                inline.as_slice(),
233            ) {
234                Ok(s) => s,
235                Err(e) => return std::future::ready(Err(e.into())).boxed(),
236            };
237            let cached = Arc::new(CachedConstantState {
238                scalar,
239                rep: None,
240                def: None,
241            });
242            self.repdef = Some(cached.clone());
243            return std::future::ready(Ok(cached as Arc<dyn CachedPageData>)).boxed();
244        }
245
246        let data = io.submit_request(reads, 0);
247        let scalar_source = self.scalar_source.clone();
248        let data_type = self.data_type.clone();
249        async move {
250            let mut data_iter = data.await?.into_iter();
251
252            let scalar = match scalar_source {
253                ScalarSource::Inline(inline) => {
254                    lance_arrow::scalar::decode_scalar_from_inline_value(&data_type, &inline)?
255                }
256                ScalarSource::ValueBuffer(_) => {
257                    let bytes = data_iter.next().unwrap();
258                    let buf = LanceBuffer::from_bytes(bytes, 1);
259                    lance_arrow::scalar::decode_scalar_from_value_buffer(&data_type, buf.as_ref())?
260                }
261            };
262
263            let rep = rep_range.map(|_| {
264                let rep = data_iter.next().unwrap();
265                let rep = LanceBuffer::from_bytes(rep, 2);
266                rep.borrow_to_typed_slice::<u16>()
267            });
268
269            let def = def_range.map(|_| {
270                let def = data_iter.next().unwrap();
271                let def = LanceBuffer::from_bytes(def, 2);
272                def.borrow_to_typed_slice::<u16>()
273            });
274
275            let cached = Arc::new(CachedConstantState { scalar, rep, def });
276            self.repdef = Some(cached.clone());
277            Ok(cached as Arc<dyn CachedPageData>)
278        }
279        .boxed()
280    }
281
282    fn load(&mut self, data: &Arc<dyn CachedPageData>) {
283        self.repdef = Some(
284            data.clone()
285                .as_arc_any()
286                .downcast::<CachedConstantState>()
287                .unwrap(),
288        );
289    }
290
291    fn schedule_ranges(
292        &self,
293        ranges: &[Range<u64>],
294        _io: &Arc<dyn EncodingsIo>,
295    ) -> Result<Vec<PageLoadTask>> {
296        let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
297        let decoder = Box::new(ConstantPageDecoder {
298            ranges: VecDeque::from_iter(ranges.iter().cloned()),
299            scalar: self.repdef.as_ref().unwrap().scalar.clone(),
300            rep: self.repdef.as_ref().unwrap().rep.clone(),
301            def: self.repdef.as_ref().unwrap().def.clone(),
302            def_meaning: self.def_meaning.clone(),
303            max_rep: self.max_rep,
304            max_visible_def: self.max_visible_def,
305            cursor_row: 0,
306            cursor_level: 0,
307            num_rows,
308        })
309            as Box<dyn crate::encodings::logical::primitive::StructuralPageDecoder>;
310        Ok(vec![PageLoadTask {
311            decoder_fut: std::future::ready(Ok(decoder)).boxed(),
312            num_rows,
313        }])
314    }
315}
316
317#[derive(Debug)]
318struct ConstantPageDecoder {
319    ranges: VecDeque<Range<u64>>,
320    scalar: ArrayRef,
321    rep: Option<ScalarBuffer<u16>>,
322    def: Option<ScalarBuffer<u16>>,
323    def_meaning: Arc<[DefinitionInterpretation]>,
324    max_rep: u16,
325    max_visible_def: u16,
326    cursor_row: u64,
327    cursor_level: usize,
328    num_rows: u64,
329}
330
331impl ConstantPageDecoder {
332    fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
333        let mut rows_desired = num_rows;
334        let mut ranges = Vec::with_capacity(self.ranges.len());
335        while rows_desired > 0 {
336            let front = self.ranges.front_mut().unwrap();
337            let avail = front.end - front.start;
338            if avail > rows_desired {
339                ranges.push(front.start..front.start + rows_desired);
340                front.start += rows_desired;
341                rows_desired = 0;
342            } else {
343                ranges.push(self.ranges.pop_front().unwrap());
344                rows_desired -= avail;
345            }
346        }
347        ranges
348    }
349
350    fn take_row(&mut self) -> Result<(Range<usize>, u64)> {
351        let start = self.cursor_level;
352        let end = if let Some(rep) = &self.rep {
353            if start >= rep.len() {
354                return Err(Error::internal(
355                    "Invalid constant layout: repetition buffer too short",
356                ));
357            }
358            if rep[start] != self.max_rep {
359                return Err(Error::internal(
360                    "Invalid constant layout: row did not start at max_rep",
361                ));
362            }
363            let mut end = start + 1;
364            while end < rep.len() && rep[end] != self.max_rep {
365                end += 1;
366            }
367            end
368        } else {
369            start + 1
370        };
371
372        let visible = if let Some(def) = &self.def {
373            def[start..end]
374                .iter()
375                .filter(|d| **d <= self.max_visible_def)
376                .count() as u64
377        } else {
378            (end - start) as u64
379        };
380
381        self.cursor_level = end;
382        self.cursor_row += 1;
383        Ok((start..end, visible))
384    }
385
386    fn skip_to_row(&mut self, target_row: u64) -> Result<()> {
387        while self.cursor_row < target_row {
388            self.take_row()?;
389        }
390        Ok(())
391    }
392}
393
394impl crate::encodings::logical::primitive::StructuralPageDecoder for ConstantPageDecoder {
395    fn drain(&mut self, num_rows: u64) -> Result<Box<dyn crate::decoder::DecodePageTask>> {
396        let drained_ranges = self.drain_ranges(num_rows);
397
398        let mut level_slices: Vec<Range<usize>> = Vec::new();
399        let mut visible_items_total: u64 = 0;
400
401        for range in drained_ranges {
402            self.skip_to_row(range.start)?;
403            for _ in range.start..range.end {
404                let (level_range, visible) = self.take_row()?;
405                visible_items_total += visible;
406                if let Some(last) = level_slices.last_mut()
407                    && last.end == level_range.start
408                {
409                    last.end = level_range.end;
410                    continue;
411                }
412                level_slices.push(level_range);
413            }
414        }
415
416        Ok(Box::new(DecodeConstantTask {
417            scalar: self.scalar.clone(),
418            rep: self.rep.clone(),
419            def: self.def.clone(),
420            level_slices,
421            visible_items_total,
422            def_meaning: self.def_meaning.clone(),
423            max_visible_def: self.max_visible_def,
424        }))
425    }
426
427    fn num_rows(&self) -> u64 {
428        self.num_rows
429    }
430}
431
432#[derive(Debug)]
433struct DecodeConstantTask {
434    scalar: ArrayRef,
435    rep: Option<ScalarBuffer<u16>>,
436    def: Option<ScalarBuffer<u16>>,
437    level_slices: Vec<Range<usize>>,
438    visible_items_total: u64,
439    def_meaning: Arc<[DefinitionInterpretation]>,
440    max_visible_def: u16,
441}
442
443impl DecodeConstantTask {
444    fn slice_levels(
445        levels: &Option<ScalarBuffer<u16>>,
446        slices: &[Range<usize>],
447    ) -> Option<Vec<u16>> {
448        levels.as_ref().map(|levels| {
449            let total = slices.iter().map(|r| r.end - r.start).sum();
450            let mut out = Vec::with_capacity(total);
451            for r in slices {
452                out.extend(levels[r.start..r.end].iter().copied());
453            }
454            out
455        })
456    }
457
458    fn materialize_values(&self, num_values: u64) -> Result<ArrayRef> {
459        if num_values == 0 {
460            return Ok(new_empty_array(self.scalar.data_type()));
461        }
462
463        if let DataType::Struct(fields) = self.scalar.data_type()
464            && fields.is_empty()
465        {
466            return Ok(Arc::new(arrow_array::StructArray::new_empty_fields(
467                num_values as usize,
468                None,
469            )) as ArrayRef);
470        }
471
472        let indices = arrow_array::UInt64Array::from(vec![0u64; num_values as usize]);
473        Ok(arrow_select::take::take(
474            self.scalar.as_ref(),
475            &indices,
476            None,
477        )?)
478    }
479}
480
481impl crate::decoder::DecodePageTask for DecodeConstantTask {
482    fn decode(self: Box<Self>) -> Result<crate::decoder::DecodedPage> {
483        let rep = Self::slice_levels(&self.rep, &self.level_slices);
484        let def = Self::slice_levels(&self.def, &self.level_slices);
485
486        let visible_items_total = if let Some(def) = &def {
487            def.iter().filter(|d| **d <= self.max_visible_def).count() as u64
488        } else {
489            self.visible_items_total
490        };
491
492        let values = self.materialize_values(visible_items_total)?;
493        let data = crate::data::DataBlock::from_array(values);
494        let unraveler =
495            RepDefUnraveler::new(rep, def, self.def_meaning.clone(), visible_items_total);
496
497        Ok(crate::decoder::DecodedPage {
498            data,
499            repdef: unraveler,
500        })
501    }
502}