1use std::{collections::BinaryHeap, ops::Range, sync::Arc};
5
6use crate::{
7 decoder::{
8 DecodedArray, FilterExpression, LoadedPage, NextDecodeTask, PageEncoding,
9 ScheduledScanLine, SchedulerContext, StructuralDecodeArrayTask, StructuralFieldDecoder,
10 StructuralFieldScheduler, StructuralSchedulingJob,
11 },
12 encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
13 format::pb,
14 repdef::{CompositeRepDefUnraveler, RepDefBuilder},
15};
16use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray};
17use arrow_schema::{DataType, Fields};
18use futures::{
19 future::BoxFuture,
20 stream::{FuturesOrdered, FuturesUnordered},
21 FutureExt, StreamExt, TryStreamExt,
22};
23use itertools::Itertools;
24use lance_arrow::FieldExt;
25use lance_arrow::{deepcopy::deep_copy_nulls, r#struct::StructArrayExt};
26use lance_core::Result;
27use log::trace;
28
29use super::{list::StructuralListDecoder, primitive::StructuralPrimitiveFieldDecoder};
30
31#[derive(Debug)]
32struct StructuralSchedulingJobWithStatus<'a> {
33 col_idx: u32,
34 col_name: &'a str,
35 job: Box<dyn StructuralSchedulingJob + 'a>,
36 rows_scheduled: u64,
37 rows_remaining: u64,
38}
39
40impl PartialEq for StructuralSchedulingJobWithStatus<'_> {
41 fn eq(&self, other: &Self) -> bool {
42 self.col_idx == other.col_idx
43 }
44}
45
46impl Eq for StructuralSchedulingJobWithStatus<'_> {}
47
48impl PartialOrd for StructuralSchedulingJobWithStatus<'_> {
49 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
50 Some(self.cmp(other))
51 }
52}
53
54impl Ord for StructuralSchedulingJobWithStatus<'_> {
55 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
56 other.rows_scheduled.cmp(&self.rows_scheduled)
58 }
59}
60
61#[derive(Debug)]
68struct RepDefStructSchedulingJob<'a> {
69 children: BinaryHeap<StructuralSchedulingJobWithStatus<'a>>,
71 rows_scheduled: u64,
72 num_rows: u64,
73}
74
75impl<'a> RepDefStructSchedulingJob<'a> {
76 fn new(
77 scheduler: &'a StructuralStructScheduler,
78 children: Vec<Box<dyn StructuralSchedulingJob + 'a>>,
79 num_rows: u64,
80 ) -> Self {
81 let children = children
82 .into_iter()
83 .enumerate()
84 .map(|(idx, job)| StructuralSchedulingJobWithStatus {
85 col_idx: idx as u32,
86 col_name: scheduler.child_fields[idx].name(),
87 job,
88 rows_scheduled: 0,
89 rows_remaining: num_rows,
90 })
91 .collect::<BinaryHeap<_>>();
92 Self {
93 children,
94 rows_scheduled: 0,
95 num_rows,
96 }
97 }
98}
99
100impl StructuralSchedulingJob for RepDefStructSchedulingJob<'_> {
101 fn schedule_next(
102 &mut self,
103 mut context: &mut SchedulerContext,
104 ) -> Result<Option<ScheduledScanLine>> {
105 if self.children.is_empty() {
106 if self.rows_scheduled == self.num_rows {
108 return Ok(None);
109 }
110 self.rows_scheduled = self.num_rows;
111 return Ok(Some(ScheduledScanLine {
112 decoders: Vec::new(),
113 rows_scheduled: self.num_rows,
114 }));
115 }
116
117 let mut decoders = Vec::new();
118 let old_rows_scheduled = self.rows_scheduled;
119 while old_rows_scheduled == self.rows_scheduled {
122 let mut next_child = self.children.pop().unwrap();
123 let scoped = context.push(next_child.col_name, next_child.col_idx);
124 let child_scan = next_child.job.schedule_next(scoped.context)?;
125 if child_scan.is_none() {
128 return Ok(None);
129 }
130 let child_scan = child_scan.unwrap();
131
132 trace!(
133 "Scheduled {} rows for child {}",
134 child_scan.rows_scheduled,
135 next_child.col_idx
136 );
137 next_child.rows_scheduled += child_scan.rows_scheduled;
138 next_child.rows_remaining -= child_scan.rows_scheduled;
139 decoders.extend(child_scan.decoders);
140 self.children.push(next_child);
141 self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
142 context = scoped.pop();
143 }
144 let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
145 Ok(Some(ScheduledScanLine {
146 decoders,
147 rows_scheduled: struct_rows_scheduled,
148 }))
149 }
150}
151
152#[derive(Debug)]
163pub struct StructuralStructScheduler {
164 children: Vec<Box<dyn StructuralFieldScheduler>>,
165 child_fields: Fields,
166}
167
168impl StructuralStructScheduler {
169 pub fn new(children: Vec<Box<dyn StructuralFieldScheduler>>, child_fields: Fields) -> Self {
170 Self {
171 children,
172 child_fields,
173 }
174 }
175}
176
177impl StructuralFieldScheduler for StructuralStructScheduler {
178 fn schedule_ranges<'a>(
179 &'a self,
180 ranges: &[Range<u64>],
181 filter: &FilterExpression,
182 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
183 let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
184
185 let child_schedulers = self
186 .children
187 .iter()
188 .map(|child| child.schedule_ranges(ranges, filter))
189 .collect::<Result<Vec<_>>>()?;
190
191 Ok(Box::new(RepDefStructSchedulingJob::new(
192 self,
193 child_schedulers,
194 num_rows,
195 )))
196 }
197
198 fn initialize<'a>(
199 &'a mut self,
200 filter: &'a FilterExpression,
201 context: &'a SchedulerContext,
202 ) -> BoxFuture<'a, Result<()>> {
203 let children_initialization = self
204 .children
205 .iter_mut()
206 .map(|child| child.initialize(filter, context))
207 .collect::<FuturesUnordered<_>>();
208 async move {
209 children_initialization
210 .map(|res| res.map(|_| ()))
211 .try_collect::<Vec<_>>()
212 .await?;
213 Ok(())
214 }
215 .boxed()
216 }
217}
218
219#[derive(Debug)]
220pub struct StructuralStructDecoder {
221 children: Vec<Box<dyn StructuralFieldDecoder>>,
222 data_type: DataType,
223 child_fields: Fields,
224 is_root: bool,
226}
227
228impl StructuralStructDecoder {
229 pub fn new(fields: Fields, should_validate: bool, is_root: bool) -> Self {
230 let children = fields
231 .iter()
232 .map(|field| Self::field_to_decoder(field, should_validate))
233 .collect();
234 let data_type = DataType::Struct(fields.clone());
235 Self {
236 data_type,
237 children,
238 child_fields: fields,
239 is_root,
240 }
241 }
242
243 fn field_to_decoder(
244 field: &Arc<arrow_schema::Field>,
245 should_validate: bool,
246 ) -> Box<dyn StructuralFieldDecoder> {
247 match field.data_type() {
248 DataType::Struct(fields) => {
249 if field.is_packed_struct() {
250 let decoder =
251 StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate);
252 Box::new(decoder)
253 } else {
254 Box::new(Self::new(fields.clone(), should_validate, false))
255 }
256 }
257 DataType::List(child_field) | DataType::LargeList(child_field) => {
258 let child_decoder = Self::field_to_decoder(child_field, should_validate);
259 Box::new(StructuralListDecoder::new(
260 child_decoder,
261 field.data_type().clone(),
262 ))
263 }
264 DataType::RunEndEncoded(_, _) => todo!(),
265 DataType::ListView(_) | DataType::LargeListView(_) => todo!(),
266 DataType::Map(_, _) => todo!(),
267 DataType::Union(_, _) => todo!(),
268 _ => Box::new(StructuralPrimitiveFieldDecoder::new(field, should_validate)),
269 }
270 }
271
272 pub fn drain_batch_task(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
273 let array_drain = self.drain(num_rows)?;
274 Ok(NextDecodeTask {
275 num_rows,
276 task: Box::new(array_drain),
277 })
278 }
279}
280
281impl StructuralFieldDecoder for StructuralStructDecoder {
282 fn accept_page(&mut self, mut child: LoadedPage) -> Result<()> {
283 let child_idx = child.path.pop_front().unwrap();
285 self.children[child_idx as usize].accept_page(child)?;
287 Ok(())
288 }
289
290 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
291 let child_tasks = self
292 .children
293 .iter_mut()
294 .map(|child| child.drain(num_rows))
295 .collect::<Result<Vec<_>>>()?;
296 Ok(Box::new(RepDefStructDecodeTask {
297 children: child_tasks,
298 child_fields: self.child_fields.clone(),
299 is_root: self.is_root,
300 num_rows,
301 }))
302 }
303
304 fn data_type(&self) -> &DataType {
305 &self.data_type
306 }
307}
308
309#[derive(Debug)]
310struct RepDefStructDecodeTask {
311 children: Vec<Box<dyn StructuralDecodeArrayTask>>,
312 child_fields: Fields,
313 is_root: bool,
314 num_rows: u64,
315}
316
317impl StructuralDecodeArrayTask for RepDefStructDecodeTask {
318 fn decode(self: Box<Self>) -> Result<DecodedArray> {
319 if self.children.is_empty() {
320 return Ok(DecodedArray {
321 array: Arc::new(StructArray::new_empty_fields(self.num_rows as usize, None)),
322 repdef: CompositeRepDefUnraveler::new(vec![]),
323 });
324 }
325
326 let arrays = self
327 .children
328 .into_iter()
329 .map(|task| task.decode())
330 .collect::<Result<Vec<_>>>()?;
331 let mut children = Vec::with_capacity(arrays.len());
332 let mut arrays_iter = arrays.into_iter();
333 let first_array = arrays_iter.next().unwrap();
334 let length = first_array.array.len();
335
336 let mut repdef = first_array.repdef;
338 children.push(first_array.array);
339
340 for array in arrays_iter {
341 debug_assert_eq!(length, array.array.len());
342 children.push(array.array);
343 }
344
345 let validity = if self.is_root {
346 None
347 } else {
348 repdef.unravel_validity(length)
349 };
350 let array = StructArray::new(self.child_fields, children, validity);
351 Ok(DecodedArray {
352 array: Arc::new(array),
353 repdef,
354 })
355 }
356}
357
358pub struct StructStructuralEncoder {
363 keep_original_array: bool,
364 children: Vec<Box<dyn FieldEncoder>>,
365}
366
367impl StructStructuralEncoder {
368 pub fn new(keep_original_array: bool, children: Vec<Box<dyn FieldEncoder>>) -> Self {
369 Self {
370 keep_original_array,
371 children,
372 }
373 }
374}
375
376impl FieldEncoder for StructStructuralEncoder {
377 fn maybe_encode(
378 &mut self,
379 array: ArrayRef,
380 external_buffers: &mut OutOfLineBuffers,
381 mut repdef: RepDefBuilder,
382 row_number: u64,
383 num_rows: u64,
384 ) -> Result<Vec<EncodeTask>> {
385 let struct_array = array.as_struct();
386 let mut struct_array = struct_array.normalize_slicing()?;
387 if let Some(validity) = struct_array.nulls() {
388 if self.keep_original_array {
389 repdef.add_validity_bitmap(validity.clone())
390 } else {
391 repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap())
392 }
393 struct_array = struct_array.pushdown_nulls()?;
394 } else {
395 repdef.add_no_null(struct_array.len());
396 }
397 let child_tasks = self
398 .children
399 .iter_mut()
400 .zip(struct_array.columns().iter())
401 .map(|(encoder, arr)| {
402 encoder.maybe_encode(
403 arr.clone(),
404 external_buffers,
405 repdef.clone(),
406 row_number,
407 num_rows,
408 )
409 })
410 .collect::<Result<Vec<_>>>()?;
411 Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
412 }
413
414 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
415 self.children
416 .iter_mut()
417 .map(|encoder| encoder.flush(external_buffers))
418 .flatten_ok()
419 .collect::<Result<Vec<_>>>()
420 }
421
422 fn num_columns(&self) -> u32 {
423 self.children
424 .iter()
425 .map(|child| child.num_columns())
426 .sum::<u32>()
427 }
428
429 fn finish(
430 &mut self,
431 external_buffers: &mut OutOfLineBuffers,
432 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
433 let mut child_columns = self
434 .children
435 .iter_mut()
436 .map(|child| child.finish(external_buffers))
437 .collect::<FuturesOrdered<_>>();
438 async move {
439 let mut encoded_columns = Vec::with_capacity(child_columns.len());
440 while let Some(child_cols) = child_columns.next().await {
441 encoded_columns.extend(child_cols?);
442 }
443 Ok(encoded_columns)
444 }
445 .boxed()
446 }
447}
448
449pub struct StructFieldEncoder {
450 children: Vec<Box<dyn FieldEncoder>>,
451 column_index: u32,
452 num_rows_seen: u64,
453}
454
455impl StructFieldEncoder {
456 #[allow(dead_code)]
457 pub fn new(children: Vec<Box<dyn FieldEncoder>>, column_index: u32) -> Self {
458 Self {
459 children,
460 column_index,
461 num_rows_seen: 0,
462 }
463 }
464}
465
466impl FieldEncoder for StructFieldEncoder {
467 fn maybe_encode(
468 &mut self,
469 array: ArrayRef,
470 external_buffers: &mut OutOfLineBuffers,
471 repdef: RepDefBuilder,
472 row_number: u64,
473 num_rows: u64,
474 ) -> Result<Vec<EncodeTask>> {
475 self.num_rows_seen += array.len() as u64;
476 let struct_array = array.as_struct();
477 let child_tasks = self
478 .children
479 .iter_mut()
480 .zip(struct_array.columns().iter())
481 .map(|(encoder, arr)| {
482 encoder.maybe_encode(
483 arr.clone(),
484 external_buffers,
485 repdef.clone(),
486 row_number,
487 num_rows,
488 )
489 })
490 .collect::<Result<Vec<_>>>()?;
491 Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
492 }
493
494 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
495 let child_tasks = self
496 .children
497 .iter_mut()
498 .map(|encoder| encoder.flush(external_buffers))
499 .collect::<Result<Vec<_>>>()?;
500 Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
501 }
502
503 fn num_columns(&self) -> u32 {
504 self.children
505 .iter()
506 .map(|child| child.num_columns())
507 .sum::<u32>()
508 + 1
509 }
510
511 fn finish(
512 &mut self,
513 external_buffers: &mut OutOfLineBuffers,
514 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
515 let mut child_columns = self
516 .children
517 .iter_mut()
518 .map(|child| child.finish(external_buffers))
519 .collect::<FuturesOrdered<_>>();
520 let num_rows_seen = self.num_rows_seen;
521 let column_index = self.column_index;
522 async move {
523 let mut columns = Vec::new();
524 let mut header = EncodedColumn::default();
526 header.final_pages.push(EncodedPage {
527 data: Vec::new(),
528 description: PageEncoding::Legacy(pb::ArrayEncoding {
529 array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
530 pb::SimpleStruct {},
531 )),
532 }),
533 num_rows: num_rows_seen,
534 column_idx: column_index,
535 row_number: 0, });
537 columns.push(header);
538 while let Some(child_cols) = child_columns.next().await {
540 columns.extend(child_cols?);
541 }
542 Ok(columns)
543 }
544 .boxed()
545 }
546}
547
/// Round-trip tests for struct encoding and decoding.
#[cfg(test)]
mod tests {

    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        builder::{Int32Builder, ListBuilder},
        Array, ArrayRef, Int32Array, ListArray, StructArray,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow_schema::{DataType, Field, Fields};

    use crate::{
        testing::{check_basic_random, check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    /// Random data for a struct of two non-nullable Int32 children.
    #[test_log::test(tokio::test)]
    async fn test_simple_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]));
        let field = Field::new("", data_type, false);
        check_basic_random(field).await;
    }

    /// Nested nullable structs: `location` is null at row 3 and the outer row is
    /// null at row 4, on top of nulls in the leaf columns.
    #[test_log::test(tokio::test)]
    async fn test_nullable_struct() {
        let inner_fields = Fields::from(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, true),
        ]);
        let inner_struct = DataType::Struct(inner_fields.clone());
        let outer_fields = Fields::from(vec![
            Field::new("score", DataType::Int32, true),
            Field::new("location", inner_struct, true),
        ]);

        let x_vals = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let y_vals = Int32Array::from(vec![Some(6), None, Some(8), Some(9), Some(10)]);
        let scores = Int32Array::from(vec![None, Some(12), Some(13), Some(14), Some(15)]);

        let location_validity = NullBuffer::from(vec![true, true, true, false, true]);
        let locations = StructArray::new(
            inner_fields,
            vec![Arc::new(x_vals), Arc::new(y_vals)],
            Some(location_validity),
        );

        let rows_validity = NullBuffer::from(vec![true, true, true, true, false]);
        let rows = StructArray::new(
            outer_fields,
            vec![Arc::new(scores), Arc::new(locations)],
            Some(rows_validity),
        );

        let test_cases = TestCases::default().with_min_file_version(LanceFileVersion::V2_1);

        check_round_trip_encoding_of_data(vec![Arc::new(rows)], &test_cases, HashMap::new()).await;
    }

    /// Struct over a list where the struct is null at row 5 but the list underneath
    /// it is valid and non-empty (offsets 4..5). That list value is masked by the
    /// struct null. Row 4 is a null list with an empty offset range.
    #[test_log::test(tokio::test)]
    async fn test_simple_masked_nonempty_list() {
        let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4), Some(5), Some(6)]);
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 3, 4, 4, 4, 5]));
        let list_validity = BooleanBuffer::from(vec![true, true, true, true, false, true]);
        let list_array = ListArray::new(
            Arc::new(Field::new("item", DataType::Int32, true)),
            offsets,
            Arc::new(items),
            Some(NullBuffer::new(list_validity)),
        );
        let struct_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
        let struct_array = StructArray::new(
            Fields::from(vec![Field::new(
                "inner_list",
                list_array.data_type().clone(),
                true,
            )]),
            vec![Arc::new(list_array)],
            Some(NullBuffer::new(struct_validity)),
        );
        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_1),
            HashMap::new(),
        )
        .await;
    }

    /// Struct over a list with an empty list (row 3), a null list (row 4) and a null
    /// struct whose list is empty (row 5).
    #[test_log::test(tokio::test)]
    async fn test_simple_struct_list() {
        let items = Int32Array::from(vec![Some(1), Some(2), None, Some(4)]);
        let offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 3, 4, 4, 4, 4]));
        let list_validity = BooleanBuffer::from(vec![true, true, true, true, false, true]);
        let list_array = ListArray::new(
            Arc::new(Field::new("item", DataType::Int32, true)),
            offsets,
            Arc::new(items),
            Some(NullBuffer::new(list_validity)),
        );
        let struct_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
        let struct_array = StructArray::new(
            Fields::from(vec![Field::new(
                "inner_list",
                list_array.data_type().clone(),
                true,
            )]),
            vec![Arc::new(list_array)],
            Some(NullBuffer::new(struct_validity)),
        );
        check_round_trip_encoding_of_data(
            vec![Arc::new(struct_array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_1),
            HashMap::new(),
        )
        .await;
    }

    /// Random data for a struct mixing a list child with a primitive child.
    #[test_log::test(tokio::test)]
    async fn test_struct_list() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new(
                "inner_list",
                DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                true,
            ),
            Field::new("outer_int", DataType::Int32, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    /// A struct with no children exercises the empty-children paths in both the
    /// scheduling job and the decode task.
    #[test_log::test(tokio::test)]
    async fn test_empty_struct() {
        let data_type = DataType::Struct(Fields::from(Vec::<Field>::default()));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    /// Random data for nested structs mixing primitive, list and binary children.
    #[test_log::test(tokio::test)]
    async fn test_complicated_struct() {
        let data_type = DataType::Struct(Fields::from(vec![
            Field::new("int", DataType::Int32, true),
            Field::new(
                "inner",
                DataType::Struct(Fields::from(vec![
                    Field::new("inner_int", DataType::Int32, true),
                    Field::new(
                        "inner_list",
                        DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                        true,
                    ),
                ])),
                true,
            ),
            Field::new("outer_binary", DataType::Binary, true),
        ]));
        let field = Field::new("row", data_type, false);
        check_basic_random(field).await;
    }

    /// Two children of very different encoded sizes: an all-null list column and a
    /// dense Int32 column. The data is written as 437-row slices.
    /// NOTE(review): presumably the children end up with different page boundaries,
    /// which exercises the struct scheduler's catch-up loop — confirm.
    #[test_log::test(tokio::test)]
    async fn test_ragged_scheduling() {
        let items_builder = Int32Builder::new();
        let mut list_builder = ListBuilder::new(items_builder);
        for _ in 0..10000 {
            list_builder.append_null();
        }
        let list_array = Arc::new(list_builder.finish());
        let int_array = Arc::new(Int32Array::from_iter_values(0..10000));
        let fields = vec![
            Field::new("", list_array.data_type().clone(), true),
            Field::new("", int_array.data_type().clone(), true),
        ];
        let struct_array = Arc::new(StructArray::new(
            Fields::from(fields),
            vec![list_array, int_array],
            None,
        )) as ArrayRef;
        // Slice into 437-row batches; the final batch holds the remainder.
        let struct_arrays = (0..10000)
            .step_by(437)
            .map(|offset| struct_array.slice(offset, 437.min(10000 - offset)))
            .collect::<Vec<_>>();
        check_round_trip_encoding_of_data(struct_arrays, &TestCases::default(), HashMap::new())
            .await;
    }
}