1use std::{
5 collections::{BinaryHeap, VecDeque},
6 ops::Range,
7 sync::Arc,
8};
9
10use crate::{
11 decoder::{
12 DecodedArray, FilterExpression, LoadedPageShard, NextDecodeTask, PageEncoding,
13 ScheduledScanLine, SchedulerContext, StructuralDecodeArrayTask, StructuralFieldDecoder,
14 StructuralFieldScheduler, StructuralSchedulingJob,
15 },
16 encoder::{EncodeTask, EncodedColumn, EncodedPage, FieldEncoder, OutOfLineBuffers},
17 format::pb,
18 repdef::{CompositeRepDefUnraveler, RepDefBuilder},
19};
20use arrow_array::{cast::AsArray, Array, ArrayRef, StructArray};
21use arrow_schema::{DataType, Fields};
22use futures::{
23 future::BoxFuture,
24 stream::{FuturesOrdered, FuturesUnordered},
25 FutureExt, StreamExt, TryStreamExt,
26};
27use itertools::Itertools;
28use lance_arrow::FieldExt;
29use lance_arrow::{deepcopy::deep_copy_nulls, r#struct::StructArrayExt};
30use lance_core::Result;
31use log::trace;
32
33use super::{list::StructuralListDecoder, primitive::StructuralPrimitiveFieldDecoder};
34
35#[derive(Debug)]
36struct StructuralSchedulingJobWithStatus<'a> {
37 col_idx: u32,
38 col_name: &'a str,
39 job: Box<dyn StructuralSchedulingJob + 'a>,
40 rows_scheduled: u64,
41 rows_remaining: u64,
42 ready_scan_lines: VecDeque<ScheduledScanLine>,
43}
44
45impl PartialEq for StructuralSchedulingJobWithStatus<'_> {
46 fn eq(&self, other: &Self) -> bool {
47 self.col_idx == other.col_idx
48 }
49}
50
51impl Eq for StructuralSchedulingJobWithStatus<'_> {}
52
53impl PartialOrd for StructuralSchedulingJobWithStatus<'_> {
54 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
55 Some(self.cmp(other))
56 }
57}
58
59impl Ord for StructuralSchedulingJobWithStatus<'_> {
60 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
61 other.rows_scheduled.cmp(&self.rows_scheduled)
63 }
64}
65
66#[derive(Debug)]
73struct RepDefStructSchedulingJob<'a> {
74 children: BinaryHeap<StructuralSchedulingJobWithStatus<'a>>,
76 rows_scheduled: u64,
77 num_rows: u64,
78}
79
80impl<'a> RepDefStructSchedulingJob<'a> {
81 fn new(
82 scheduler: &'a StructuralStructScheduler,
83 children: Vec<Box<dyn StructuralSchedulingJob + 'a>>,
84 num_rows: u64,
85 ) -> Self {
86 let children = children
87 .into_iter()
88 .enumerate()
89 .map(|(idx, job)| StructuralSchedulingJobWithStatus {
90 col_idx: idx as u32,
91 col_name: scheduler.child_fields[idx].name(),
92 job,
93 rows_scheduled: 0,
94 rows_remaining: num_rows,
95 ready_scan_lines: VecDeque::new(),
96 })
97 .collect::<BinaryHeap<_>>();
98 Self {
99 children,
100 rows_scheduled: 0,
101 num_rows,
102 }
103 }
104}
105
106impl StructuralSchedulingJob for RepDefStructSchedulingJob<'_> {
107 fn schedule_next(
108 &mut self,
109 mut context: &mut SchedulerContext,
110 ) -> Result<Vec<ScheduledScanLine>> {
111 if self.children.is_empty() {
112 if self.rows_scheduled == self.num_rows {
114 return Ok(Vec::new());
115 }
116 self.rows_scheduled = self.num_rows;
117 return Ok(vec![ScheduledScanLine {
118 decoders: Vec::new(),
119 rows_scheduled: self.num_rows,
120 }]);
121 }
122
123 let mut decoders = Vec::new();
124 let old_rows_scheduled = self.rows_scheduled;
125 while old_rows_scheduled == self.rows_scheduled {
128 if self.children.is_empty() {
129 return Ok(Vec::new());
131 }
132 let mut next_child = self.children.pop().unwrap();
133 if next_child.ready_scan_lines.is_empty() {
134 let scoped = context.push(next_child.col_name, next_child.col_idx);
135 let child_scans = next_child.job.schedule_next(scoped.context)?;
136 context = scoped.pop();
137 if child_scans.is_empty() {
138 continue;
140 }
141 next_child.ready_scan_lines.extend(child_scans);
142 }
143 let child_scan = next_child.ready_scan_lines.pop_front().unwrap();
144 trace!(
145 "Scheduled {} rows for child {}",
146 child_scan.rows_scheduled,
147 next_child.col_idx
148 );
149 next_child.rows_scheduled += child_scan.rows_scheduled;
150 next_child.rows_remaining -= child_scan.rows_scheduled;
151 decoders.extend(child_scan.decoders);
152 self.children.push(next_child);
153 self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
154 }
155 let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
156 Ok(vec![ScheduledScanLine {
157 decoders,
158 rows_scheduled: struct_rows_scheduled,
159 }])
160 }
161}
162
163#[derive(Debug)]
174pub struct StructuralStructScheduler {
175 children: Vec<Box<dyn StructuralFieldScheduler>>,
176 child_fields: Fields,
177}
178
179impl StructuralStructScheduler {
180 pub fn new(children: Vec<Box<dyn StructuralFieldScheduler>>, child_fields: Fields) -> Self {
181 Self {
182 children,
183 child_fields,
184 }
185 }
186}
187
188impl StructuralFieldScheduler for StructuralStructScheduler {
189 fn schedule_ranges<'a>(
190 &'a self,
191 ranges: &[Range<u64>],
192 filter: &FilterExpression,
193 ) -> Result<Box<dyn StructuralSchedulingJob + 'a>> {
194 let num_rows = ranges.iter().map(|r| r.end - r.start).sum();
195
196 let child_schedulers = self
197 .children
198 .iter()
199 .map(|child| child.schedule_ranges(ranges, filter))
200 .collect::<Result<Vec<_>>>()?;
201
202 Ok(Box::new(RepDefStructSchedulingJob::new(
203 self,
204 child_schedulers,
205 num_rows,
206 )))
207 }
208
209 fn initialize<'a>(
210 &'a mut self,
211 filter: &'a FilterExpression,
212 context: &'a SchedulerContext,
213 ) -> BoxFuture<'a, Result<()>> {
214 let children_initialization = self
215 .children
216 .iter_mut()
217 .map(|child| child.initialize(filter, context))
218 .collect::<FuturesUnordered<_>>();
219 async move {
220 children_initialization
221 .map(|res| res.map(|_| ()))
222 .try_collect::<Vec<_>>()
223 .await?;
224 Ok(())
225 }
226 .boxed()
227 }
228}
229
230#[derive(Debug)]
231pub struct StructuralStructDecoder {
232 children: Vec<Box<dyn StructuralFieldDecoder>>,
233 data_type: DataType,
234 child_fields: Fields,
235 is_root: bool,
237}
238
239impl StructuralStructDecoder {
240 pub fn new(fields: Fields, should_validate: bool, is_root: bool) -> Self {
241 let children = fields
242 .iter()
243 .map(|field| Self::field_to_decoder(field, should_validate))
244 .collect();
245 let data_type = DataType::Struct(fields.clone());
246 Self {
247 data_type,
248 children,
249 child_fields: fields,
250 is_root,
251 }
252 }
253
254 fn field_to_decoder(
255 field: &Arc<arrow_schema::Field>,
256 should_validate: bool,
257 ) -> Box<dyn StructuralFieldDecoder> {
258 match field.data_type() {
259 DataType::Struct(fields) => {
260 if field.is_packed_struct() || field.is_blob() {
261 let decoder =
262 StructuralPrimitiveFieldDecoder::new(&field.clone(), should_validate);
263 Box::new(decoder)
264 } else {
265 Box::new(Self::new(fields.clone(), should_validate, false))
266 }
267 }
268 DataType::List(child_field) | DataType::LargeList(child_field) => {
269 let child_decoder = Self::field_to_decoder(child_field, should_validate);
270 Box::new(StructuralListDecoder::new(
271 child_decoder,
272 field.data_type().clone(),
273 ))
274 }
275 DataType::RunEndEncoded(_, _) => todo!(),
276 DataType::ListView(_) | DataType::LargeListView(_) => todo!(),
277 DataType::Map(_, _) => todo!(),
278 DataType::Union(_, _) => todo!(),
279 _ => Box::new(StructuralPrimitiveFieldDecoder::new(field, should_validate)),
280 }
281 }
282
283 pub fn drain_batch_task(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
284 let array_drain = self.drain(num_rows)?;
285 Ok(NextDecodeTask {
286 num_rows,
287 task: Box::new(array_drain),
288 })
289 }
290}
291
292impl StructuralFieldDecoder for StructuralStructDecoder {
293 fn accept_page(&mut self, mut child: LoadedPageShard) -> Result<()> {
294 let child_idx = child.path.pop_front().unwrap();
296 self.children[child_idx as usize].accept_page(child)?;
298 Ok(())
299 }
300
301 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn StructuralDecodeArrayTask>> {
302 let child_tasks = self
303 .children
304 .iter_mut()
305 .map(|child| child.drain(num_rows))
306 .collect::<Result<Vec<_>>>()?;
307 Ok(Box::new(RepDefStructDecodeTask {
308 children: child_tasks,
309 child_fields: self.child_fields.clone(),
310 is_root: self.is_root,
311 num_rows,
312 }))
313 }
314
315 fn data_type(&self) -> &DataType {
316 &self.data_type
317 }
318}
319
320#[derive(Debug)]
321struct RepDefStructDecodeTask {
322 children: Vec<Box<dyn StructuralDecodeArrayTask>>,
323 child_fields: Fields,
324 is_root: bool,
325 num_rows: u64,
326}
327
328impl StructuralDecodeArrayTask for RepDefStructDecodeTask {
329 fn decode(self: Box<Self>) -> Result<DecodedArray> {
330 if self.children.is_empty() {
331 return Ok(DecodedArray {
332 array: Arc::new(StructArray::new_empty_fields(self.num_rows as usize, None)),
333 repdef: CompositeRepDefUnraveler::new(vec![]),
334 });
335 }
336
337 let arrays = self
338 .children
339 .into_iter()
340 .map(|task| task.decode())
341 .collect::<Result<Vec<_>>>()?;
342 let mut children = Vec::with_capacity(arrays.len());
343 let mut arrays_iter = arrays.into_iter();
344 let first_array = arrays_iter.next().unwrap();
345 let length = first_array.array.len();
346
347 let mut repdef = first_array.repdef;
349 children.push(first_array.array);
350
351 for array in arrays_iter {
352 debug_assert_eq!(length, array.array.len());
353 children.push(array.array);
354 }
355
356 let validity = if self.is_root {
357 None
358 } else {
359 repdef.unravel_validity(length)
360 };
361
362 let array = StructArray::new(self.child_fields, children, validity);
363 Ok(DecodedArray {
364 array: Arc::new(array),
365 repdef,
366 })
367 }
368}
369
370pub struct StructStructuralEncoder {
375 keep_original_array: bool,
376 children: Vec<Box<dyn FieldEncoder>>,
377}
378
379impl StructStructuralEncoder {
380 pub fn new(keep_original_array: bool, children: Vec<Box<dyn FieldEncoder>>) -> Self {
381 Self {
382 keep_original_array,
383 children,
384 }
385 }
386}
387
388impl FieldEncoder for StructStructuralEncoder {
389 fn maybe_encode(
390 &mut self,
391 array: ArrayRef,
392 external_buffers: &mut OutOfLineBuffers,
393 mut repdef: RepDefBuilder,
394 row_number: u64,
395 num_rows: u64,
396 ) -> Result<Vec<EncodeTask>> {
397 let struct_array = array.as_struct();
398 let mut struct_array = struct_array.normalize_slicing()?;
399 if let Some(validity) = struct_array.nulls() {
400 if self.keep_original_array {
401 repdef.add_validity_bitmap(validity.clone())
402 } else {
403 repdef.add_validity_bitmap(deep_copy_nulls(Some(validity)).unwrap())
404 }
405 struct_array = struct_array.pushdown_nulls()?;
406 } else {
407 repdef.add_no_null(struct_array.len());
408 }
409 let child_tasks = self
410 .children
411 .iter_mut()
412 .zip(struct_array.columns().iter())
413 .map(|(encoder, arr)| {
414 encoder.maybe_encode(
415 arr.clone(),
416 external_buffers,
417 repdef.clone(),
418 row_number,
419 num_rows,
420 )
421 })
422 .collect::<Result<Vec<_>>>()?;
423 Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
424 }
425
426 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
427 self.children
428 .iter_mut()
429 .map(|encoder| encoder.flush(external_buffers))
430 .flatten_ok()
431 .collect::<Result<Vec<_>>>()
432 }
433
434 fn num_columns(&self) -> u32 {
435 self.children
436 .iter()
437 .map(|child| child.num_columns())
438 .sum::<u32>()
439 }
440
441 fn finish(
442 &mut self,
443 external_buffers: &mut OutOfLineBuffers,
444 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
445 let mut child_columns = self
446 .children
447 .iter_mut()
448 .map(|child| child.finish(external_buffers))
449 .collect::<FuturesOrdered<_>>();
450 async move {
451 let mut encoded_columns = Vec::with_capacity(child_columns.len());
452 while let Some(child_cols) = child_columns.next().await {
453 encoded_columns.extend(child_cols?);
454 }
455 Ok(encoded_columns)
456 }
457 .boxed()
458 }
459}
460
461pub struct StructFieldEncoder {
462 children: Vec<Box<dyn FieldEncoder>>,
463 column_index: u32,
464 num_rows_seen: u64,
465}
466
467impl StructFieldEncoder {
468 #[allow(dead_code)]
469 pub fn new(children: Vec<Box<dyn FieldEncoder>>, column_index: u32) -> Self {
470 Self {
471 children,
472 column_index,
473 num_rows_seen: 0,
474 }
475 }
476}
477
478impl FieldEncoder for StructFieldEncoder {
479 fn maybe_encode(
480 &mut self,
481 array: ArrayRef,
482 external_buffers: &mut OutOfLineBuffers,
483 repdef: RepDefBuilder,
484 row_number: u64,
485 num_rows: u64,
486 ) -> Result<Vec<EncodeTask>> {
487 self.num_rows_seen += array.len() as u64;
488 let struct_array = array.as_struct();
489 let child_tasks = self
490 .children
491 .iter_mut()
492 .zip(struct_array.columns().iter())
493 .map(|(encoder, arr)| {
494 encoder.maybe_encode(
495 arr.clone(),
496 external_buffers,
497 repdef.clone(),
498 row_number,
499 num_rows,
500 )
501 })
502 .collect::<Result<Vec<_>>>()?;
503 Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
504 }
505
506 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
507 let child_tasks = self
508 .children
509 .iter_mut()
510 .map(|encoder| encoder.flush(external_buffers))
511 .collect::<Result<Vec<_>>>()?;
512 Ok(child_tasks.into_iter().flatten().collect::<Vec<_>>())
513 }
514
515 fn num_columns(&self) -> u32 {
516 self.children
517 .iter()
518 .map(|child| child.num_columns())
519 .sum::<u32>()
520 + 1
521 }
522
523 fn finish(
524 &mut self,
525 external_buffers: &mut OutOfLineBuffers,
526 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
527 let mut child_columns = self
528 .children
529 .iter_mut()
530 .map(|child| child.finish(external_buffers))
531 .collect::<FuturesOrdered<_>>();
532 let num_rows_seen = self.num_rows_seen;
533 let column_index = self.column_index;
534 async move {
535 let mut columns = Vec::new();
536 let mut header = EncodedColumn::default();
538 header.final_pages.push(EncodedPage {
539 data: Vec::new(),
540 description: PageEncoding::Legacy(pb::ArrayEncoding {
541 array_encoding: Some(pb::array_encoding::ArrayEncoding::Struct(
542 pb::SimpleStruct {},
543 )),
544 }),
545 num_rows: num_rows_seen,
546 column_idx: column_index,
547 row_number: 0, });
549 columns.push(header);
550 while let Some(child_cols) = child_columns.next().await {
552 columns.extend(child_cols?);
553 }
554 Ok(columns)
555 }
556 .boxed()
557 }
558}
559
#[cfg(test)]
mod tests {

    use std::{collections::HashMap, sync::Arc};

    use arrow_array::{
        builder::{Int32Builder, ListBuilder},
        Array, ArrayRef, Int32Array, ListArray, StructArray,
    };
    use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
    use arrow_schema::{DataType, Field, Fields};

    use crate::{
        testing::{check_basic_random, check_round_trip_encoding_of_data, TestCases},
        version::LanceFileVersion,
    };

    /// Builds a six-row struct with a single nullable `inner_list: List<Int32>`
    /// column. The list at row 4 is null and the struct at row 5 is null; the
    /// list contents are given by `items` and `offsets`.
    fn struct_with_inner_list(items: Vec<Option<i32>>, offsets: Vec<i32>) -> StructArray {
        let list_validity = BooleanBuffer::from(vec![true, true, true, true, false, true]);
        let list_array = ListArray::new(
            Arc::new(Field::new("item", DataType::Int32, true)),
            OffsetBuffer::new(ScalarBuffer::<i32>::from(offsets)),
            Arc::new(Int32Array::from(items)),
            Some(NullBuffer::new(list_validity)),
        );
        let struct_validity = BooleanBuffer::from(vec![true, true, true, true, true, false]);
        let inner_list_field = Field::new("inner_list", list_array.data_type().clone(), true);
        StructArray::new(
            Fields::from(vec![inner_list_field]),
            vec![Arc::new(list_array)],
            Some(NullBuffer::new(struct_validity)),
        )
    }

    /// Round-trips a single struct array with the default test cases,
    /// restricted to file version 2.1 and newer.
    async fn round_trip_min_v2_1(array: StructArray) {
        check_round_trip_encoding_of_data(
            vec![Arc::new(array)],
            &TestCases::default().with_min_file_version(LanceFileVersion::V2_1),
            HashMap::new(),
        )
        .await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_struct() {
        let children = vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ];
        let field = Field::new("", DataType::Struct(Fields::from(children)), false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_nullable_struct() {
        // Nullable struct nested in a nullable struct, with nulls at the
        // primitive, inner-struct and outer-struct levels.
        let inner_fields = Fields::from(vec![
            Field::new("x", DataType::Int32, false),
            Field::new("y", DataType::Int32, true),
        ]);
        let outer_fields = Fields::from(vec![
            Field::new("score", DataType::Int32, true),
            Field::new("location", DataType::Struct(inner_fields.clone()), true),
        ]);

        let xs = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
        let ys = Int32Array::from(vec![Some(6), None, Some(8), Some(9), Some(10)]);
        let scores = Int32Array::from(vec![None, Some(12), Some(13), Some(14), Some(15)]);

        let locations = StructArray::new(
            inner_fields,
            vec![Arc::new(xs), Arc::new(ys)],
            Some(NullBuffer::from(vec![true, true, true, false, true])),
        );
        let rows = StructArray::new(
            outer_fields,
            vec![Arc::new(scores), Arc::new(locations)],
            Some(NullBuffer::from(vec![true, true, true, true, false])),
        );

        round_trip_min_v2_1(rows).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_masked_nonempty_list() {
        // Lists: [1, 2], [null], [4], [], null, [5]; the last row's struct is
        // null, so its non-empty list is masked.
        let rows = struct_with_inner_list(
            vec![Some(1), Some(2), None, Some(4), Some(5), Some(6)],
            vec![0, 2, 3, 4, 4, 4, 5],
        );
        round_trip_min_v2_1(rows).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_struct_list() {
        // Lists: [1, 2], [null], [4], [], null, []; the last row's struct is null.
        let rows = struct_with_inner_list(
            vec![Some(1), Some(2), None, Some(4)],
            vec![0, 2, 3, 4, 4, 4, 4],
        );
        round_trip_min_v2_1(rows).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_struct_list() {
        let item = Arc::new(Field::new("item", DataType::Int32, true));
        let children = vec![
            Field::new("inner_list", DataType::List(item), true),
            Field::new("outer_int", DataType::Int32, true),
        ];
        let field = Field::new("row", DataType::Struct(Fields::from(children)), false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_empty_struct() {
        // A struct with no child fields at all.
        let no_fields = Fields::from(Vec::<Field>::default());
        let field = Field::new("row", DataType::Struct(no_fields), false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_complicated_struct() {
        let inner_list = DataType::List(Arc::new(Field::new("item", DataType::Int32, true)));
        let inner = DataType::Struct(Fields::from(vec![
            Field::new("inner_int", DataType::Int32, true),
            Field::new("inner_list", inner_list, true),
        ]));
        let outer = DataType::Struct(Fields::from(vec![
            Field::new("int", DataType::Int32, true),
            Field::new("inner", inner, true),
            Field::new("outer_binary", DataType::Binary, true),
        ]));
        let field = Field::new("row", outer, false);
        check_basic_random(field).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_ragged_scheduling() {
        // One child is an all-null list column and the other a plain integer
        // column; the data is written in many slices of uneven total coverage.
        const TOTAL_ROWS: usize = 10000;
        const SLICE_LEN: usize = 437;

        let mut list_builder = ListBuilder::new(Int32Builder::new());
        for _ in 0..TOTAL_ROWS {
            list_builder.append_null();
        }
        let list_array = Arc::new(list_builder.finish());
        let int_array = Arc::new(Int32Array::from_iter_values(0..TOTAL_ROWS as i32));

        let fields = vec![
            Field::new("", list_array.data_type().clone(), true),
            Field::new("", int_array.data_type().clone(), true),
        ];
        let struct_array = Arc::new(StructArray::new(
            Fields::from(fields),
            vec![list_array, int_array],
            None,
        )) as ArrayRef;

        let mut struct_arrays = Vec::new();
        for offset in (0..TOTAL_ROWS).step_by(SLICE_LEN) {
            let len = SLICE_LEN.min(TOTAL_ROWS - offset);
            struct_arrays.push(struct_array.slice(offset, len));
        }

        check_round_trip_encoding_of_data(struct_arrays, &TestCases::default(), HashMap::new())
            .await;
    }
}