1use std::{
5 collections::{BinaryHeap, VecDeque},
6 ops::Range,
7 sync::Arc,
8};
9
10use crate::{
11 decoder::{
12 DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PriorityRange,
13 ScheduledScanLine, SchedulerContext,
14 },
15 previous::decoder::{DecoderReady, FieldScheduler, LogicalPageDecoder, SchedulingJob},
16};
17use arrow_array::{ArrayRef, StructArray};
18use arrow_schema::{DataType, Field, Fields};
19use futures::{FutureExt, StreamExt, TryStreamExt, future::BoxFuture, stream::FuturesUnordered};
20use lance_core::{Error, Result};
21use log::trace;
22
/// A child column's scheduling job bundled with its progress counters.
///
/// `SimpleStructSchedulerJob` keeps one of these per child in a `BinaryHeap`.
#[derive(Debug)]
struct SchedulingJobWithStatus<'a> {
    /// Position of the child within the struct (taken from the enumeration index).
    col_idx: u32,
    /// Name of the child field; handed to `context.push` when the child is scheduled.
    col_name: &'a str,
    /// The child's own scheduling job.
    job: Box<dyn SchedulingJob + 'a>,
    /// Running total of rows the child has reported as scheduled.
    rows_scheduled: u64,
    /// Starts at the job's row count and is decremented by every scheduled batch.
    rows_remaining: u64,
}
31
32impl PartialEq for SchedulingJobWithStatus<'_> {
33 fn eq(&self, other: &Self) -> bool {
34 self.col_idx == other.col_idx
35 }
36}
37
// Marker impl: `Ord` (needed to store jobs in a `BinaryHeap`) requires `Eq`.
impl Eq for SchedulingJobWithStatus<'_> {}
39
40impl PartialOrd for SchedulingJobWithStatus<'_> {
41 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
42 Some(self.cmp(other))
43 }
44}
45
46impl Ord for SchedulingJobWithStatus<'_> {
47 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
48 other.rows_scheduled.cmp(&self.rows_scheduled)
50 }
51}
52
/// Decode task that produces a struct array with no child fields.
#[derive(Debug)]
struct EmptyStructDecodeTask {
    /// Length of the struct array to produce.
    num_rows: u64,
}
57
58impl DecodeArrayTask for EmptyStructDecodeTask {
59 fn decode(self: Box<Self>) -> Result<(ArrayRef, u64)> {
60 Ok((
63 Arc::new(StructArray::new_empty_fields(self.num_rows as usize, None)),
64 0,
65 ))
66 }
67}
68
/// Logical page decoder for a struct that has no children.
#[derive(Debug)]
struct EmptyStructDecoder {
    /// Total number of rows in this decoder; also reported as the loaded row count.
    num_rows: u64,
    /// Number of rows handed out through `drain` so far.
    rows_drained: u64,
    /// Always a `DataType::Struct` with an empty field list.
    data_type: DataType,
}
75
76impl EmptyStructDecoder {
77 fn new(num_rows: u64) -> Self {
78 Self {
79 num_rows,
80 rows_drained: 0,
81 data_type: DataType::Struct(Fields::from(Vec::<Field>::default())),
82 }
83 }
84}
85
86impl LogicalPageDecoder for EmptyStructDecoder {
87 fn wait_for_loaded(&mut self, _loaded_need: u64) -> BoxFuture<'_, Result<()>> {
88 Box::pin(std::future::ready(Ok(())))
89 }
90 fn rows_loaded(&self) -> u64 {
91 self.num_rows
92 }
93 fn rows_unloaded(&self) -> u64 {
94 0
95 }
96 fn num_rows(&self) -> u64 {
97 self.num_rows
98 }
99 fn rows_drained(&self) -> u64 {
100 self.rows_drained
101 }
102 fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
103 self.rows_drained += num_rows;
104 Ok(NextDecodeTask {
105 num_rows,
106 task: Box::new(EmptyStructDecodeTask { num_rows }),
107 })
108 }
109 fn data_type(&self) -> &DataType {
110 &self.data_type
111 }
112}
113
/// Scheduling job used when a struct has no children.
#[derive(Debug)]
struct EmptyStructSchedulerJob {
    /// Number of rows requested; `schedule_next` reports all of them as scheduled.
    num_rows: u64,
}
118
119impl SchedulingJob for EmptyStructSchedulerJob {
120 fn schedule_next(
121 &mut self,
122 context: &mut SchedulerContext,
123 _priority: &dyn PriorityRange,
124 ) -> Result<ScheduledScanLine> {
125 let empty_decoder = Box::new(EmptyStructDecoder::new(self.num_rows));
126 #[allow(deprecated)]
127 let struct_decoder = context.locate_decoder(empty_decoder);
128 Ok(ScheduledScanLine {
129 decoders: vec![MessageType::DecoderReady(struct_decoder)],
130 rows_scheduled: self.num_rows,
131 })
132 }
133
134 fn num_rows(&self) -> u64 {
135 self.num_rows
136 }
137}
138
/// Scheduling job for a struct with at least one child.
///
/// Each call to `schedule_next` advances children until the minimum number of
/// rows scheduled across all children has increased.
#[derive(Debug)]
struct SimpleStructSchedulerJob<'a> {
    /// The scheduler that created this job; supplies the child fields.
    scheduler: &'a SimpleStructScheduler,
    /// Child jobs in a heap whose top is the child with the fewest scheduled rows.
    children: BinaryHeap<SchedulingJobWithStatus<'a>>,
    /// Rows scheduled by the child at the top of the heap after the last scheduling pass.
    rows_scheduled: u64,
    /// Total number of rows this job covers.
    num_rows: u64,
    /// Set once the struct's own decoder message has been emitted.
    initialized: bool,
}
154
155impl<'a> SimpleStructSchedulerJob<'a> {
156 fn new(
157 scheduler: &'a SimpleStructScheduler,
158 children: Vec<Box<dyn SchedulingJob + 'a>>,
159 num_rows: u64,
160 ) -> Self {
161 let children = children
162 .into_iter()
163 .enumerate()
164 .map(|(idx, job)| SchedulingJobWithStatus {
165 col_idx: idx as u32,
166 col_name: scheduler.child_fields[idx].name(),
167 job,
168 rows_scheduled: 0,
169 rows_remaining: num_rows,
170 })
171 .collect::<BinaryHeap<_>>();
172 Self {
173 scheduler,
174 children,
175 rows_scheduled: 0,
176 num_rows,
177 initialized: false,
178 }
179 }
180}
181
182impl SchedulingJob for SimpleStructSchedulerJob<'_> {
183 fn schedule_next(
184 &mut self,
185 mut context: &mut SchedulerContext,
186 priority: &dyn PriorityRange,
187 ) -> Result<ScheduledScanLine> {
188 let mut decoders = Vec::new();
189 if !self.initialized {
190 let struct_decoder = Box::new(SimpleStructDecoder::new(
193 self.scheduler.child_fields.clone(),
194 self.num_rows,
195 ));
196 #[allow(deprecated)]
197 let struct_decoder = context.locate_decoder(struct_decoder);
198 decoders.push(MessageType::DecoderReady(struct_decoder));
199 self.initialized = true;
200 }
201 let old_rows_scheduled = self.rows_scheduled;
202 while old_rows_scheduled == self.rows_scheduled {
205 let mut next_child = self.children.pop().unwrap();
206 trace!("Scheduling more rows for child {}", next_child.col_idx);
207 let scoped = context.push(next_child.col_name, next_child.col_idx);
208 let child_scan = next_child.job.schedule_next(scoped.context, priority)?;
209 trace!(
210 "Scheduled {} rows for child {}",
211 child_scan.rows_scheduled, next_child.col_idx
212 );
213 next_child.rows_scheduled += child_scan.rows_scheduled;
214 next_child.rows_remaining -= child_scan.rows_scheduled;
215 decoders.extend(child_scan.decoders);
216 self.children.push(next_child);
217 self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
218 context = scoped.pop();
219 }
220 let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
221 Ok(ScheduledScanLine {
222 decoders,
223 rows_scheduled: struct_rows_scheduled,
224 })
225 }
226
227 fn num_rows(&self) -> u64 {
228 self.num_rows
229 }
230}
231
/// Field scheduler for a struct column that delegates to one scheduler per child.
#[derive(Debug)]
pub struct SimpleStructScheduler {
    /// One scheduler per child field.
    children: Vec<Arc<dyn FieldScheduler>>,
    /// The child fields; cloned into each `SimpleStructDecoder` that gets created.
    child_fields: Fields,
    /// Row count: the first child's count when there are children, otherwise the
    /// value passed to `new`.
    num_rows: u64,
}
248
249impl SimpleStructScheduler {
250 pub fn new(
251 children: Vec<Arc<dyn FieldScheduler>>,
252 child_fields: Fields,
253 num_rows: u64,
254 ) -> Self {
255 let num_rows = children
256 .first()
257 .map(|child| child.num_rows())
258 .unwrap_or(num_rows);
259 debug_assert!(children.iter().all(|child| child.num_rows() == num_rows));
260 Self {
261 children,
262 child_fields,
263 num_rows,
264 }
265 }
266}
267
268impl FieldScheduler for SimpleStructScheduler {
269 fn schedule_ranges<'a>(
270 &'a self,
271 ranges: &[Range<u64>],
272 filter: &FilterExpression,
273 ) -> Result<Box<dyn SchedulingJob + 'a>> {
274 if self.children.is_empty() {
275 return Ok(Box::new(EmptyStructSchedulerJob {
276 num_rows: ranges.iter().map(|r| r.end - r.start).sum(),
277 }));
278 }
279 let child_schedulers = self
280 .children
281 .iter()
282 .map(|child| child.schedule_ranges(ranges, filter))
283 .collect::<Result<Vec<_>>>()?;
284 let num_rows = child_schedulers[0].num_rows();
285 Ok(Box::new(SimpleStructSchedulerJob::new(
286 self,
287 child_schedulers,
288 num_rows,
289 )))
290 }
291
292 fn num_rows(&self) -> u64 {
293 self.num_rows
294 }
295
296 fn initialize<'a>(
297 &'a self,
298 _filter: &'a FilterExpression,
299 _context: &'a SchedulerContext,
300 ) -> BoxFuture<'a, Result<()>> {
301 let futures = self
302 .children
303 .iter()
304 .map(|child| child.initialize(_filter, _context))
305 .collect::<FuturesUnordered<_>>();
306 async move {
307 futures
308 .map(|res| res.map(|_| ()))
309 .try_collect::<Vec<_>>()
310 .await?;
311 Ok(())
312 }
313 .boxed()
314 }
315}
316
/// Decoding progress of a single struct child.
#[derive(Debug)]
struct ChildState {
    /// Page decoders for this child in arrival order. A decoder is removed from
    /// the front once `drain` has emptied it.
    scheduled: VecDeque<Box<dyn LogicalPageDecoder>>,
    /// Loaded-row total computed by the most recent `wait_for_loaded` call
    /// (it includes `rows_popped`).
    rows_loaded: u64,
    /// Number of rows handed out through `drain` so far.
    rows_drained: u64,
    /// Sum of the row counts of all pages already removed from `scheduled`.
    rows_popped: u64,
    /// Total number of rows expected for this child; `drain` compares it with
    /// `rows_drained` to decide whether more rows remain.
    num_rows: u64,
    /// Index of the child within the struct; only used in trace messages.
    field_index: u32,
}
341
342impl ChildState {
343 fn new(num_rows: u64, field_index: u32) -> Self {
344 Self {
345 scheduled: VecDeque::new(),
346 rows_loaded: 0,
347 rows_drained: 0,
348 rows_popped: 0,
349 num_rows,
350 field_index,
351 }
352 }
353
354 async fn wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
359 trace!(
360 "Struct child {} waiting for more than {} rows to be loaded and {} are fully loaded already",
361 self.field_index, loaded_need, self.rows_loaded,
362 );
363 let mut fully_loaded = self.rows_popped;
364 for (page_idx, next_decoder) in self.scheduled.iter_mut().enumerate() {
365 if next_decoder.rows_unloaded() > 0 {
366 let mut current_need = loaded_need;
367 current_need -= fully_loaded;
368 let rows_in_page = next_decoder.num_rows();
369 let need_for_page = (rows_in_page - 1).min(current_need);
370 trace!(
371 "Struct child {} page {} will wait until more than {} rows loaded from page with {} rows",
372 self.field_index, page_idx, need_for_page, rows_in_page,
373 );
374 next_decoder.wait_for_loaded(need_for_page).await?;
380 let now_loaded = next_decoder.rows_loaded();
381 fully_loaded += now_loaded;
382 trace!(
383 "Struct child {} page {} await and now has {} loaded rows and we have {} fully loaded",
384 self.field_index, page_idx, now_loaded, fully_loaded
385 );
386 } else {
387 fully_loaded += next_decoder.num_rows();
388 }
389 if fully_loaded > loaded_need {
390 break;
391 }
392 }
393 self.rows_loaded = fully_loaded;
394 trace!(
395 "Struct child {} loaded {} new rows and now {} are loaded",
396 self.field_index, fully_loaded, self.rows_loaded
397 );
398 Ok(())
399 }
400
401 fn drain(&mut self, num_rows: u64) -> Result<CompositeDecodeTask> {
402 trace!("Struct draining {} rows", num_rows);
403
404 trace!(
405 "Draining {} rows from struct page with {} rows already drained",
406 num_rows, self.rows_drained
407 );
408 let mut remaining = num_rows;
409 let mut composite = CompositeDecodeTask {
410 tasks: Vec::new(),
411 num_rows: 0,
412 has_more: true,
413 };
414 while remaining > 0 {
415 let next = self.scheduled.front_mut().unwrap();
416 let rows_to_take = remaining.min(next.rows_left());
417 let next_task = next.drain(rows_to_take)?;
418 if next.rows_left() == 0 {
419 trace!("Completely drained page");
420 self.rows_popped += next.num_rows();
421 self.scheduled.pop_front();
422 }
423 remaining -= rows_to_take;
424 composite.tasks.push(next_task.task);
425 composite.num_rows += next_task.num_rows;
426 }
427 self.rows_drained += num_rows;
428 composite.has_more = self.rows_drained != self.num_rows;
429 Ok(composite)
430 }
431}
432
/// Newtype over a mutable `ChildState` borrow; the element type of the
/// `BinaryHeap` built in `SimpleStructDecoder::do_wait_for_loaded`.
struct WaitOrder<'a>(&'a mut ChildState);
435
// Marker impl: `Ord` (needed to store waiters in a `BinaryHeap`) requires `Eq`.
impl Eq for WaitOrder<'_> {}
437impl PartialEq for WaitOrder<'_> {
438 fn eq(&self, other: &Self) -> bool {
439 self.0.rows_loaded == other.0.rows_loaded
440 }
441}
442impl Ord for WaitOrder<'_> {
443 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
444 other.0.rows_loaded.cmp(&self.0.rows_loaded)
446 }
447}
448impl PartialOrd for WaitOrder<'_> {
449 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
450 Some(self.cmp(other))
451 }
452}
453
/// Logical page decoder for a struct column with children.
#[derive(Debug)]
pub struct SimpleStructDecoder {
    /// Per-child decoding state, in field order.
    children: Vec<ChildState>,
    /// The child fields; cloned into every decode task.
    child_fields: Fields,
    /// `DataType::Struct` built from `child_fields`.
    data_type: DataType,
    /// Total number of rows in this decoder.
    num_rows: u64,
}
461
462impl SimpleStructDecoder {
463 pub fn new(child_fields: Fields, num_rows: u64) -> Self {
464 let data_type = DataType::Struct(child_fields.clone());
465 Self {
466 children: child_fields
467 .iter()
468 .enumerate()
469 .map(|(idx, _)| ChildState::new(num_rows, idx as u32))
470 .collect(),
471 child_fields,
472 data_type,
473 num_rows,
474 }
475 }
476
477 async fn do_wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
478 let mut wait_orders = self
479 .children
480 .iter_mut()
481 .filter_map(|child| {
482 if child.rows_loaded <= loaded_need {
483 Some(WaitOrder(child))
484 } else {
485 None
486 }
487 })
488 .collect::<BinaryHeap<_>>();
489 while !wait_orders.is_empty() {
490 let next_waiter = wait_orders.pop().unwrap();
491 let next_highest = wait_orders
492 .peek()
493 .map(|w| w.0.rows_loaded)
494 .unwrap_or(u64::MAX);
495 let limit = loaded_need.min(next_highest);
498 next_waiter.0.wait_for_loaded(limit).await?;
499 log::trace!(
500 "Struct child {} finished await pass and now {} are loaded",
501 next_waiter.0.field_index,
502 next_waiter.0.rows_loaded
503 );
504 if next_waiter.0.rows_loaded <= loaded_need {
505 wait_orders.push(next_waiter);
506 }
507 }
508 Ok(())
509 }
510}
511
512impl LogicalPageDecoder for SimpleStructDecoder {
513 fn accept_child(&mut self, mut child: DecoderReady) -> Result<()> {
514 let child_idx = child.path.pop_front().unwrap();
516 if child.path.is_empty() {
517 self.children[child_idx as usize]
519 .scheduled
520 .push_back(child.decoder);
521 } else {
522 let intended = self.children[child_idx as usize].scheduled.back_mut().ok_or_else(|| Error::internal(format!("Decoder scheduled for child at index {} but we don't have any child at that index yet", child_idx)))?;
524 intended.accept_child(child)?;
525 }
526 Ok(())
527 }
528
529 fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
530 self.do_wait_for_loaded(loaded_need).boxed()
531 }
532
533 fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
534 let child_tasks = self
535 .children
536 .iter_mut()
537 .map(|child| child.drain(num_rows))
538 .collect::<Result<Vec<_>>>()?;
539 let num_rows = child_tasks[0].num_rows;
540 debug_assert!(child_tasks.iter().all(|task| task.num_rows == num_rows));
541 Ok(NextDecodeTask {
542 task: Box::new(SimpleStructDecodeTask {
543 children: child_tasks,
544 child_fields: self.child_fields.clone(),
545 }),
546 num_rows,
547 })
548 }
549
550 fn rows_loaded(&self) -> u64 {
551 self.children.iter().map(|c| c.rows_loaded).min().unwrap()
552 }
553
554 fn rows_drained(&self) -> u64 {
555 debug_assert!(
557 self.children
558 .iter()
559 .all(|c| c.rows_drained == self.children[0].rows_drained)
560 );
561 self.children[0].rows_drained
562 }
563
564 fn num_rows(&self) -> u64 {
565 self.num_rows
566 }
567
568 fn data_type(&self) -> &DataType {
569 &self.data_type
570 }
571}
572
/// The decode tasks drained from one child, possibly spanning several pages.
struct CompositeDecodeTask {
    /// One task per page that contributed rows, in page order.
    tasks: Vec<Box<dyn DecodeArrayTask>>,
    /// Sum of the row counts reported by the drained tasks.
    num_rows: u64,
    /// Whether the child still had undrained rows when this task was created.
    has_more: bool,
}
579
580impl CompositeDecodeTask {
581 fn decode(self) -> Result<ArrayRef> {
582 let arrays = self
583 .tasks
584 .into_iter()
585 .map(|task| task.decode().map(|(arr, _)| arr))
586 .collect::<Result<Vec<_>>>()?;
587 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
588 Ok(arrow_select::concat::concat(&array_refs)?)
595 }
596}
597
/// Decode task that assembles a struct array from its children's tasks.
struct SimpleStructDecodeTask {
    /// One composite task per child, in field order.
    children: Vec<CompositeDecodeTask>,
    /// Fields passed to `StructArray::try_new` together with the decoded children.
    child_fields: Fields,
}
602
603impl DecodeArrayTask for SimpleStructDecodeTask {
604 fn decode(self: Box<Self>) -> Result<(ArrayRef, u64)> {
605 let child_arrays = self
606 .children
607 .into_iter()
608 .map(|child| child.decode())
609 .collect::<Result<Vec<_>>>()?;
610 Ok((
613 Arc::new(StructArray::try_new(self.child_fields, child_arrays, None)?),
614 0,
615 ))
616 }
617}