1use std::{any::Any, collections::VecDeque, ops::Range, sync::Arc};
5
6use arrow_array::{Array, ArrayRef, new_empty_array};
7use arrow_buffer::ScalarBuffer;
8use arrow_schema::DataType;
9use bytes::Bytes;
10use futures::FutureExt;
11use futures::future::BoxFuture;
12
13use lance_core::{
14 Error, Result,
15 cache::{Context, DeepSizeOf},
16};
17
18use crate::{
19 EncodingsIo,
20 buffer::LanceBuffer,
21 decoder::PageEncoding,
22 encoder::EncodedPage,
23 encodings::logical::primitive::{CachedPageData, PageLoadTask},
24 format::ProtobufUtils21,
25 repdef::{DefinitionInterpretation, RepDefUnraveler},
26};
27
28pub(crate) fn encode_constant_page(
29 column_idx: u32,
30 scalar: ArrayRef,
31 repdef: crate::repdef::SerializedRepDefs,
32 row_number: u64,
33 num_rows: u64,
34) -> Result<EncodedPage> {
35 let inline_value = lance_arrow::scalar::try_inline_value(&scalar);
36 let value_buffer = if inline_value.is_some() {
37 None
38 } else {
39 Some(LanceBuffer::from(
40 lance_arrow::scalar::encode_scalar_value_buffer(&scalar)?,
41 ))
42 };
43
44 let description = ProtobufUtils21::constant_layout(&repdef.def_meaning, inline_value);
45
46 let has_repdef = repdef.repetition_levels.is_some() || repdef.definition_levels.is_some();
47
48 let data = if !has_repdef {
49 value_buffer.into_iter().collect::<Vec<_>>()
50 } else {
51 let rep_bytes = repdef
52 .repetition_levels
53 .as_ref()
54 .map(|rep| LanceBuffer::reinterpret_slice(rep.clone()))
55 .unwrap_or_else(LanceBuffer::empty);
56 let def_bytes = repdef
57 .definition_levels
58 .as_ref()
59 .map(|def| LanceBuffer::reinterpret_slice(def.clone()))
60 .unwrap_or_else(LanceBuffer::empty);
61
62 match value_buffer {
63 Some(value_buffer) => vec![value_buffer, rep_bytes, def_bytes],
64 None => vec![rep_bytes, def_bytes],
65 }
66 };
67
68 Ok(EncodedPage {
69 column_idx,
70 data,
71 description: PageEncoding::Structural(description),
72 num_rows,
73 row_number,
74 })
75}
76
/// Decoded state of a constant page, built by `ConstantPageScheduler::initialize`.
///
/// It is returned from `initialize` as the page's `CachedPageData`. It is also kept by the
/// scheduler so that `schedule_ranges` can hand copies to each decoder.
#[derive(Debug)]
struct CachedConstantState {
    /// The scalar repeated across the page, decoded either from the inline value or
    /// from the value buffer.
    scalar: ArrayRef,
    /// Repetition levels, or `None` when the page has no non-empty repetition buffer.
    rep: Option<ScalarBuffer<u16>>,
    /// Definition levels, or `None` when the page has no non-empty definition buffer.
    def: Option<ScalarBuffer<u16>>,
}
83
84impl DeepSizeOf for CachedConstantState {
85 fn deep_size_of_children(&self, _ctx: &mut Context) -> usize {
86 self.scalar.get_buffer_memory_size()
87 + self.rep.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
88 + self.def.as_ref().map(|buf| buf.len() * 2).unwrap_or(0)
89 }
90}
91
92impl CachedPageData for CachedConstantState {
93 fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static> {
94 self
95 }
96}
97
/// Where the constant scalar of a page is stored.
#[derive(Debug, Clone)]
enum ScalarSource {
    /// The scalar's bytes came from the page's inline value (no value buffer exists).
    Inline(Vec<u8>),
    /// The scalar lives in a page buffer. The payload is that buffer's index in
    /// `buffer_offsets_and_sizes`.
    ValueBuffer(usize),
}
103
/// Schedules reads for a page encoded with the constant structural layout.
#[derive(Debug)]
pub struct ConstantPageScheduler {
    /// `(offset, size)` of each page buffer; read as the byte range `offset..offset + size`.
    buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
    /// Where the scalar value is stored (inline or in a buffer).
    scalar_source: ScalarSource,
    /// Index of the repetition-level buffer, if the layout has one.
    rep_buf_idx: Option<usize>,
    /// Index of the definition-level buffer, if the layout has one.
    def_buf_idx: Option<usize>,
    /// Arrow type used to decode the scalar.
    data_type: DataType,
    /// Interpretation of each definition layer; forwarded to the decoder and unraveler.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Repetition level that marks the first level of a top-level row.
    max_rep: u16,
    /// Definition levels at or below this value are counted as having a value.
    max_visible_def: u16,
    /// Decoded page state. `None` until `initialize` or `load` runs. Despite the name,
    /// it also holds the scalar.
    repdef: Option<Arc<CachedConstantState>>,
}
116
117impl ConstantPageScheduler {
118 pub fn try_new(
119 buffer_offsets_and_sizes: Arc<[(u64, u64)]>,
120 inline_value: Option<Bytes>,
121 data_type: DataType,
122 def_meaning: Arc<[DefinitionInterpretation]>,
123 ) -> Result<Self> {
124 let max_rep = def_meaning.iter().filter(|d| d.is_list()).count() as u16;
125 let max_visible_def = def_meaning
126 .iter()
127 .filter(|d| !d.is_list())
128 .map(|d| d.num_def_levels())
129 .sum();
130
131 let (scalar_source, rep_buf_idx, def_buf_idx) =
132 match (inline_value, buffer_offsets_and_sizes.len()) {
133 (Some(inline), 0) => (ScalarSource::Inline(inline.to_vec()), None, None),
134 (Some(inline), 2) => (ScalarSource::Inline(inline.to_vec()), Some(0), Some(1)),
135 (None, 1) => (ScalarSource::ValueBuffer(0), None, None),
136 (None, 3) => (ScalarSource::ValueBuffer(0), Some(1), Some(2)),
137 (Some(_inline), 1) => {
138 return Err(Error::invalid_input(format!(
139 "Invalid constant layout: inline_value present with {} buffers",
140 1
141 )));
142 }
143 (Some(_inline), 3) => {
144 return Err(Error::invalid_input(
145 "Invalid constant layout: inline_value present with 3 buffers",
146 ));
147 }
148 (None, 0) => {
149 return Err(Error::invalid_input(
150 "Invalid constant layout: missing scalar source",
151 ));
152 }
153 (None, 2) => {
154 return Err(Error::invalid_input(
155 "Invalid constant layout: ambiguous (2 buffers and no inline_value)",
156 ));
157 }
158 (Some(_), n) => {
159 return Err(Error::invalid_input(format!(
160 "Invalid constant layout: inline_value present with {} buffers",
161 n
162 )));
163 }
164 (None, n) => {
165 return Err(Error::invalid_input(format!(
166 "Invalid constant layout: unexpected buffer count {}",
167 n
168 )));
169 }
170 };
171
172 Ok(Self {
173 buffer_offsets_and_sizes,
174 scalar_source,
175 rep_buf_idx,
176 def_buf_idx,
177 data_type,
178 def_meaning,
179 max_rep,
180 max_visible_def,
181 repdef: None,
182 })
183 }
184}
185
186impl crate::encodings::logical::primitive::StructuralPageScheduler for ConstantPageScheduler {
187 fn initialize<'a>(
188 &'a mut self,
189 io: &Arc<dyn EncodingsIo>,
190 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
191 let rep_range = self
192 .rep_buf_idx
193 .and_then(|idx| self.buffer_offsets_and_sizes.get(idx).copied())
194 .filter(|(_, len)| *len > 0)
195 .map(|(pos, len)| pos..pos + len);
196
197 let def_range = self
198 .def_buf_idx
199 .and_then(|idx| self.buffer_offsets_and_sizes.get(idx).copied())
200 .filter(|(_, len)| *len > 0)
201 .map(|(pos, len)| pos..pos + len);
202
203 let scalar_range = match self.scalar_source {
204 ScalarSource::ValueBuffer(idx) => {
205 let (pos, len) = self.buffer_offsets_and_sizes[idx];
206 Some(pos..pos + len)
207 }
208 ScalarSource::Inline(_) => None,
209 };
210
211 let mut reads = Vec::with_capacity(3);
212 if let Some(r) = scalar_range {
213 reads.push(r);
214 }
215 if let Some(r) = rep_range.clone() {
216 reads.push(r);
217 }
218 if let Some(r) = def_range.clone() {
219 reads.push(r);
220 }
221
222 if reads.is_empty() {
223 let ScalarSource::Inline(inline) = &self.scalar_source else {
224 return std::future::ready(Err(Error::invalid_input(
225 "Invalid constant layout: missing scalar source",
226 )))
227 .boxed();
228 };
229
230 let scalar = match lance_arrow::scalar::decode_scalar_from_inline_value(
231 &self.data_type,
232 inline.as_slice(),
233 ) {
234 Ok(s) => s,
235 Err(e) => return std::future::ready(Err(e.into())).boxed(),
236 };
237 let cached = Arc::new(CachedConstantState {
238 scalar,
239 rep: None,
240 def: None,
241 });
242 self.repdef = Some(cached.clone());
243 return std::future::ready(Ok(cached as Arc<dyn CachedPageData>)).boxed();
244 }
245
246 let data = io.submit_request(reads, 0);
247 let scalar_source = self.scalar_source.clone();
248 let data_type = self.data_type.clone();
249 async move {
250 let mut data_iter = data.await?.into_iter();
251
252 let scalar = match scalar_source {
253 ScalarSource::Inline(inline) => {
254 lance_arrow::scalar::decode_scalar_from_inline_value(&data_type, &inline)?
255 }
256 ScalarSource::ValueBuffer(_) => {
257 let bytes = data_iter.next().unwrap();
258 let buf = LanceBuffer::from_bytes(bytes, 1);
259 lance_arrow::scalar::decode_scalar_from_value_buffer(&data_type, buf.as_ref())?
260 }
261 };
262
263 let rep = rep_range.map(|_| {
264 let rep = data_iter.next().unwrap();
265 let rep = LanceBuffer::from_bytes(rep, 2);
266 rep.borrow_to_typed_slice::<u16>()
267 });
268
269 let def = def_range.map(|_| {
270 let def = data_iter.next().unwrap();
271 let def = LanceBuffer::from_bytes(def, 2);
272 def.borrow_to_typed_slice::<u16>()
273 });
274
275 let cached = Arc::new(CachedConstantState { scalar, rep, def });
276 self.repdef = Some(cached.clone());
277 Ok(cached as Arc<dyn CachedPageData>)
278 }
279 .boxed()
280 }
281
282 fn load(&mut self, data: &Arc<dyn CachedPageData>) {
283 self.repdef = Some(
284 data.clone()
285 .as_arc_any()
286 .downcast::<CachedConstantState>()
287 .unwrap(),
288 );
289 }
290
291 fn schedule_ranges(
292 &self,
293 ranges: &[Range<u64>],
294 _io: &Arc<dyn EncodingsIo>,
295 ) -> Result<Vec<PageLoadTask>> {
296 let num_rows = ranges.iter().map(|r| r.end - r.start).sum::<u64>();
297 let decoder = Box::new(ConstantPageDecoder {
298 ranges: VecDeque::from_iter(ranges.iter().cloned()),
299 scalar: self.repdef.as_ref().unwrap().scalar.clone(),
300 rep: self.repdef.as_ref().unwrap().rep.clone(),
301 def: self.repdef.as_ref().unwrap().def.clone(),
302 def_meaning: self.def_meaning.clone(),
303 max_rep: self.max_rep,
304 max_visible_def: self.max_visible_def,
305 cursor_row: 0,
306 cursor_level: 0,
307 num_rows,
308 })
309 as Box<dyn crate::encodings::logical::primitive::StructuralPageDecoder>;
310 Ok(vec![PageLoadTask {
311 decoder_fut: std::future::ready(Ok(decoder)).boxed(),
312 num_rows,
313 }])
314 }
315}
316
/// Walks the levels of a constant page row by row to serve scheduled row ranges.
#[derive(Debug)]
struct ConstantPageDecoder {
    /// Scheduled row ranges that have not yet been drained. Partially drained ranges
    /// have their `start` advanced.
    ranges: VecDeque<Range<u64>>,
    /// The scalar repeated across the page.
    scalar: ArrayRef,
    /// Repetition levels, if the page has them.
    rep: Option<ScalarBuffer<u16>>,
    /// Definition levels, if the page has them.
    def: Option<ScalarBuffer<u16>>,
    /// Interpretation of each definition layer, forwarded to the decode task.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Repetition level that marks the first level of a row.
    max_rep: u16,
    /// Definition levels at or below this value are counted as having a value.
    max_visible_def: u16,
    /// Index of the next row `take_row` will consume.
    cursor_row: u64,
    /// Index into `rep`/`def` of the first level of the next row.
    cursor_level: usize,
    /// Total number of rows across all scheduled ranges.
    num_rows: u64,
}
330
331impl ConstantPageDecoder {
332 fn drain_ranges(&mut self, num_rows: u64) -> Vec<Range<u64>> {
333 let mut rows_desired = num_rows;
334 let mut ranges = Vec::with_capacity(self.ranges.len());
335 while rows_desired > 0 {
336 let front = self.ranges.front_mut().unwrap();
337 let avail = front.end - front.start;
338 if avail > rows_desired {
339 ranges.push(front.start..front.start + rows_desired);
340 front.start += rows_desired;
341 rows_desired = 0;
342 } else {
343 ranges.push(self.ranges.pop_front().unwrap());
344 rows_desired -= avail;
345 }
346 }
347 ranges
348 }
349
350 fn take_row(&mut self) -> Result<(Range<usize>, u64)> {
351 let start = self.cursor_level;
352 let end = if let Some(rep) = &self.rep {
353 if start >= rep.len() {
354 return Err(Error::internal(
355 "Invalid constant layout: repetition buffer too short",
356 ));
357 }
358 if rep[start] != self.max_rep {
359 return Err(Error::internal(
360 "Invalid constant layout: row did not start at max_rep",
361 ));
362 }
363 let mut end = start + 1;
364 while end < rep.len() && rep[end] != self.max_rep {
365 end += 1;
366 }
367 end
368 } else {
369 start + 1
370 };
371
372 let visible = if let Some(def) = &self.def {
373 def[start..end]
374 .iter()
375 .filter(|d| **d <= self.max_visible_def)
376 .count() as u64
377 } else {
378 (end - start) as u64
379 };
380
381 self.cursor_level = end;
382 self.cursor_row += 1;
383 Ok((start..end, visible))
384 }
385
386 fn skip_to_row(&mut self, target_row: u64) -> Result<()> {
387 while self.cursor_row < target_row {
388 self.take_row()?;
389 }
390 Ok(())
391 }
392}
393
394impl crate::encodings::logical::primitive::StructuralPageDecoder for ConstantPageDecoder {
395 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn crate::decoder::DecodePageTask>> {
396 let drained_ranges = self.drain_ranges(num_rows);
397
398 let mut level_slices: Vec<Range<usize>> = Vec::new();
399 let mut visible_items_total: u64 = 0;
400
401 for range in drained_ranges {
402 self.skip_to_row(range.start)?;
403 for _ in range.start..range.end {
404 let (level_range, visible) = self.take_row()?;
405 visible_items_total += visible;
406 if let Some(last) = level_slices.last_mut()
407 && last.end == level_range.start
408 {
409 last.end = level_range.end;
410 continue;
411 }
412 level_slices.push(level_range);
413 }
414 }
415
416 Ok(Box::new(DecodeConstantTask {
417 scalar: self.scalar.clone(),
418 rep: self.rep.clone(),
419 def: self.def.clone(),
420 level_slices,
421 visible_items_total,
422 def_meaning: self.def_meaning.clone(),
423 max_visible_def: self.max_visible_def,
424 }))
425 }
426
427 fn num_rows(&self) -> u64 {
428 self.num_rows
429 }
430}
431
/// Decode work for a batch of rows drained from a `ConstantPageDecoder`.
#[derive(Debug)]
struct DecodeConstantTask {
    /// The scalar to repeat once per visible item.
    scalar: ArrayRef,
    /// Full repetition-level buffer of the page, if any.
    rep: Option<ScalarBuffer<u16>>,
    /// Full definition-level buffer of the page, if any.
    def: Option<ScalarBuffer<u16>>,
    /// Level index ranges (into `rep`/`def`) covered by the drained rows, with contiguous
    /// ranges merged.
    level_slices: Vec<Range<usize>>,
    /// Visible-item count tallied while draining. `decode` uses it only when `def` is `None`.
    visible_items_total: u64,
    /// Interpretation of each definition layer, passed to `RepDefUnraveler`.
    def_meaning: Arc<[DefinitionInterpretation]>,
    /// Definition levels at or below this value are counted as having a value.
    max_visible_def: u16,
}
442
443impl DecodeConstantTask {
444 fn slice_levels(
445 levels: &Option<ScalarBuffer<u16>>,
446 slices: &[Range<usize>],
447 ) -> Option<Vec<u16>> {
448 levels.as_ref().map(|levels| {
449 let total = slices.iter().map(|r| r.end - r.start).sum();
450 let mut out = Vec::with_capacity(total);
451 for r in slices {
452 out.extend(levels[r.start..r.end].iter().copied());
453 }
454 out
455 })
456 }
457
458 fn materialize_values(&self, num_values: u64) -> Result<ArrayRef> {
459 if num_values == 0 {
460 return Ok(new_empty_array(self.scalar.data_type()));
461 }
462
463 if let DataType::Struct(fields) = self.scalar.data_type()
464 && fields.is_empty()
465 {
466 return Ok(Arc::new(arrow_array::StructArray::new_empty_fields(
467 num_values as usize,
468 None,
469 )) as ArrayRef);
470 }
471
472 let indices = arrow_array::UInt64Array::from(vec![0u64; num_values as usize]);
473 Ok(arrow_select::take::take(
474 self.scalar.as_ref(),
475 &indices,
476 None,
477 )?)
478 }
479}
480
481impl crate::decoder::DecodePageTask for DecodeConstantTask {
482 fn decode(self: Box<Self>) -> Result<crate::decoder::DecodedPage> {
483 let rep = Self::slice_levels(&self.rep, &self.level_slices);
484 let def = Self::slice_levels(&self.def, &self.level_slices);
485
486 let visible_items_total = if let Some(def) = &def {
487 def.iter().filter(|d| **d <= self.max_visible_def).count() as u64
488 } else {
489 self.visible_items_total
490 };
491
492 let values = self.materialize_values(visible_items_total)?;
493 let data = crate::data::DataBlock::from_array(values);
494 let unraveler =
495 RepDefUnraveler::new(rep, def, self.def_meaning.clone(), visible_items_total);
496
497 Ok(crate::decoder::DecodedPage {
498 data,
499 repdef: unraveler,
500 })
501 }
502}