1use std::{
5 collections::{BinaryHeap, VecDeque},
6 ops::Range,
7 sync::Arc,
8};
9
10use crate::{
11 decoder::{
12 DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PriorityRange,
13 ScheduledScanLine, SchedulerContext,
14 },
15 previous::decoder::{DecoderReady, FieldScheduler, LogicalPageDecoder, SchedulingJob},
16};
17use arrow_array::{ArrayRef, StructArray};
18use arrow_schema::{DataType, Field, Fields};
19use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt};
20use lance_core::{Error, Result};
21use log::trace;
22use snafu::location;
23
24#[derive(Debug)]
25struct SchedulingJobWithStatus<'a> {
26 col_idx: u32,
27 col_name: &'a str,
28 job: Box<dyn SchedulingJob + 'a>,
29 rows_scheduled: u64,
30 rows_remaining: u64,
31}
32
33impl PartialEq for SchedulingJobWithStatus<'_> {
34 fn eq(&self, other: &Self) -> bool {
35 self.col_idx == other.col_idx
36 }
37}
38
39impl Eq for SchedulingJobWithStatus<'_> {}
40
41impl PartialOrd for SchedulingJobWithStatus<'_> {
42 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
43 Some(self.cmp(other))
44 }
45}
46
47impl Ord for SchedulingJobWithStatus<'_> {
48 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
49 other.rows_scheduled.cmp(&self.rows_scheduled)
51 }
52}
53
54#[derive(Debug)]
55struct EmptyStructDecodeTask {
56 num_rows: u64,
57}
58
59impl DecodeArrayTask for EmptyStructDecodeTask {
60 fn decode(self: Box<Self>) -> Result<ArrayRef> {
61 Ok(Arc::new(StructArray::new_empty_fields(
62 self.num_rows as usize,
63 None,
64 )))
65 }
66}
67
68#[derive(Debug)]
69struct EmptyStructDecoder {
70 num_rows: u64,
71 rows_drained: u64,
72 data_type: DataType,
73}
74
75impl EmptyStructDecoder {
76 fn new(num_rows: u64) -> Self {
77 Self {
78 num_rows,
79 rows_drained: 0,
80 data_type: DataType::Struct(Fields::from(Vec::<Field>::default())),
81 }
82 }
83}
84
85impl LogicalPageDecoder for EmptyStructDecoder {
86 fn wait_for_loaded(&mut self, _loaded_need: u64) -> BoxFuture<'_, Result<()>> {
87 Box::pin(std::future::ready(Ok(())))
88 }
89 fn rows_loaded(&self) -> u64 {
90 self.num_rows
91 }
92 fn rows_unloaded(&self) -> u64 {
93 0
94 }
95 fn num_rows(&self) -> u64 {
96 self.num_rows
97 }
98 fn rows_drained(&self) -> u64 {
99 self.rows_drained
100 }
101 fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
102 self.rows_drained += num_rows;
103 Ok(NextDecodeTask {
104 num_rows,
105 task: Box::new(EmptyStructDecodeTask { num_rows }),
106 })
107 }
108 fn data_type(&self) -> &DataType {
109 &self.data_type
110 }
111}
112
113#[derive(Debug)]
114struct EmptyStructSchedulerJob {
115 num_rows: u64,
116}
117
118impl SchedulingJob for EmptyStructSchedulerJob {
119 fn schedule_next(
120 &mut self,
121 context: &mut SchedulerContext,
122 _priority: &dyn PriorityRange,
123 ) -> Result<ScheduledScanLine> {
124 let empty_decoder = Box::new(EmptyStructDecoder::new(self.num_rows));
125 #[allow(deprecated)]
126 let struct_decoder = context.locate_decoder(empty_decoder);
127 Ok(ScheduledScanLine {
128 decoders: vec![MessageType::DecoderReady(struct_decoder)],
129 rows_scheduled: self.num_rows,
130 })
131 }
132
133 fn num_rows(&self) -> u64 {
134 self.num_rows
135 }
136}
137
138#[derive(Debug)]
145struct SimpleStructSchedulerJob<'a> {
146 scheduler: &'a SimpleStructScheduler,
147 children: BinaryHeap<SchedulingJobWithStatus<'a>>,
149 rows_scheduled: u64,
150 num_rows: u64,
151 initialized: bool,
152}
153
154impl<'a> SimpleStructSchedulerJob<'a> {
155 fn new(
156 scheduler: &'a SimpleStructScheduler,
157 children: Vec<Box<dyn SchedulingJob + 'a>>,
158 num_rows: u64,
159 ) -> Self {
160 let children = children
161 .into_iter()
162 .enumerate()
163 .map(|(idx, job)| SchedulingJobWithStatus {
164 col_idx: idx as u32,
165 col_name: scheduler.child_fields[idx].name(),
166 job,
167 rows_scheduled: 0,
168 rows_remaining: num_rows,
169 })
170 .collect::<BinaryHeap<_>>();
171 Self {
172 scheduler,
173 children,
174 rows_scheduled: 0,
175 num_rows,
176 initialized: false,
177 }
178 }
179}
180
181impl SchedulingJob for SimpleStructSchedulerJob<'_> {
182 fn schedule_next(
183 &mut self,
184 mut context: &mut SchedulerContext,
185 priority: &dyn PriorityRange,
186 ) -> Result<ScheduledScanLine> {
187 let mut decoders = Vec::new();
188 if !self.initialized {
189 let struct_decoder = Box::new(SimpleStructDecoder::new(
192 self.scheduler.child_fields.clone(),
193 self.num_rows,
194 ));
195 #[allow(deprecated)]
196 let struct_decoder = context.locate_decoder(struct_decoder);
197 decoders.push(MessageType::DecoderReady(struct_decoder));
198 self.initialized = true;
199 }
200 let old_rows_scheduled = self.rows_scheduled;
201 while old_rows_scheduled == self.rows_scheduled {
204 let mut next_child = self.children.pop().unwrap();
205 trace!("Scheduling more rows for child {}", next_child.col_idx);
206 let scoped = context.push(next_child.col_name, next_child.col_idx);
207 let child_scan = next_child.job.schedule_next(scoped.context, priority)?;
208 trace!(
209 "Scheduled {} rows for child {}",
210 child_scan.rows_scheduled,
211 next_child.col_idx
212 );
213 next_child.rows_scheduled += child_scan.rows_scheduled;
214 next_child.rows_remaining -= child_scan.rows_scheduled;
215 decoders.extend(child_scan.decoders);
216 self.children.push(next_child);
217 self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
218 context = scoped.pop();
219 }
220 let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
221 Ok(ScheduledScanLine {
222 decoders,
223 rows_scheduled: struct_rows_scheduled,
224 })
225 }
226
227 fn num_rows(&self) -> u64 {
228 self.num_rows
229 }
230}
231
232#[derive(Debug)]
243pub struct SimpleStructScheduler {
244 children: Vec<Arc<dyn FieldScheduler>>,
245 child_fields: Fields,
246 num_rows: u64,
247}
248
249impl SimpleStructScheduler {
250 pub fn new(
251 children: Vec<Arc<dyn FieldScheduler>>,
252 child_fields: Fields,
253 num_rows: u64,
254 ) -> Self {
255 let num_rows = children
256 .first()
257 .map(|child| child.num_rows())
258 .unwrap_or(num_rows);
259 debug_assert!(children.iter().all(|child| child.num_rows() == num_rows));
260 Self {
261 children,
262 child_fields,
263 num_rows,
264 }
265 }
266}
267
268impl FieldScheduler for SimpleStructScheduler {
269 fn schedule_ranges<'a>(
270 &'a self,
271 ranges: &[Range<u64>],
272 filter: &FilterExpression,
273 ) -> Result<Box<dyn SchedulingJob + 'a>> {
274 if self.children.is_empty() {
275 return Ok(Box::new(EmptyStructSchedulerJob {
276 num_rows: ranges.iter().map(|r| r.end - r.start).sum(),
277 }));
278 }
279 let child_schedulers = self
280 .children
281 .iter()
282 .map(|child| child.schedule_ranges(ranges, filter))
283 .collect::<Result<Vec<_>>>()?;
284 let num_rows = child_schedulers[0].num_rows();
285 Ok(Box::new(SimpleStructSchedulerJob::new(
286 self,
287 child_schedulers,
288 num_rows,
289 )))
290 }
291
292 fn num_rows(&self) -> u64 {
293 self.num_rows
294 }
295
296 fn initialize<'a>(
297 &'a self,
298 _filter: &'a FilterExpression,
299 _context: &'a SchedulerContext,
300 ) -> BoxFuture<'a, Result<()>> {
301 let futures = self
302 .children
303 .iter()
304 .map(|child| child.initialize(_filter, _context))
305 .collect::<FuturesUnordered<_>>();
306 async move {
307 futures
308 .map(|res| res.map(|_| ()))
309 .try_collect::<Vec<_>>()
310 .await?;
311 Ok(())
312 }
313 .boxed()
314 }
315}
316
317#[derive(Debug)]
318struct ChildState {
319 scheduled: VecDeque<Box<dyn LogicalPageDecoder>>,
330 rows_loaded: u64,
332 rows_drained: u64,
334 rows_popped: u64,
336 num_rows: u64,
338 field_index: u32,
340}
341
342impl ChildState {
343 fn new(num_rows: u64, field_index: u32) -> Self {
344 Self {
345 scheduled: VecDeque::new(),
346 rows_loaded: 0,
347 rows_drained: 0,
348 rows_popped: 0,
349 num_rows,
350 field_index,
351 }
352 }
353
354 async fn wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
359 trace!(
360 "Struct child {} waiting for more than {} rows to be loaded and {} are fully loaded already",
361 self.field_index,
362 loaded_need,
363 self.rows_loaded,
364 );
365 let mut fully_loaded = self.rows_popped;
366 for (page_idx, next_decoder) in self.scheduled.iter_mut().enumerate() {
367 if next_decoder.rows_unloaded() > 0 {
368 let mut current_need = loaded_need;
369 current_need -= fully_loaded;
370 let rows_in_page = next_decoder.num_rows();
371 let need_for_page = (rows_in_page - 1).min(current_need);
372 trace!(
373 "Struct child {} page {} will wait until more than {} rows loaded from page with {} rows",
374 self.field_index,
375 page_idx,
376 need_for_page,
377 rows_in_page,
378 );
379 next_decoder.wait_for_loaded(need_for_page).await?;
385 let now_loaded = next_decoder.rows_loaded();
386 fully_loaded += now_loaded;
387 trace!(
388 "Struct child {} page {} await and now has {} loaded rows and we have {} fully loaded",
389 self.field_index,
390 page_idx,
391 now_loaded,
392 fully_loaded
393 );
394 } else {
395 fully_loaded += next_decoder.num_rows();
396 }
397 if fully_loaded > loaded_need {
398 break;
399 }
400 }
401 self.rows_loaded = fully_loaded;
402 trace!(
403 "Struct child {} loaded {} new rows and now {} are loaded",
404 self.field_index,
405 fully_loaded,
406 self.rows_loaded
407 );
408 Ok(())
409 }
410
411 fn drain(&mut self, num_rows: u64) -> Result<CompositeDecodeTask> {
412 trace!("Struct draining {} rows", num_rows);
413
414 trace!(
415 "Draining {} rows from struct page with {} rows already drained",
416 num_rows,
417 self.rows_drained
418 );
419 let mut remaining = num_rows;
420 let mut composite = CompositeDecodeTask {
421 tasks: Vec::new(),
422 num_rows: 0,
423 has_more: true,
424 };
425 while remaining > 0 {
426 let next = self.scheduled.front_mut().unwrap();
427 let rows_to_take = remaining.min(next.rows_left());
428 let next_task = next.drain(rows_to_take)?;
429 if next.rows_left() == 0 {
430 trace!("Completely drained page");
431 self.rows_popped += next.num_rows();
432 self.scheduled.pop_front();
433 }
434 remaining -= rows_to_take;
435 composite.tasks.push(next_task.task);
436 composite.num_rows += next_task.num_rows;
437 }
438 self.rows_drained += num_rows;
439 composite.has_more = self.rows_drained != self.num_rows;
440 Ok(composite)
441 }
442}
443
444struct WaitOrder<'a>(&'a mut ChildState);
446
447impl Eq for WaitOrder<'_> {}
448impl PartialEq for WaitOrder<'_> {
449 fn eq(&self, other: &Self) -> bool {
450 self.0.rows_loaded == other.0.rows_loaded
451 }
452}
453impl Ord for WaitOrder<'_> {
454 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
455 other.0.rows_loaded.cmp(&self.0.rows_loaded)
457 }
458}
459impl PartialOrd for WaitOrder<'_> {
460 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
461 Some(self.cmp(other))
462 }
463}
464
465#[derive(Debug)]
466pub struct SimpleStructDecoder {
467 children: Vec<ChildState>,
468 child_fields: Fields,
469 data_type: DataType,
470 num_rows: u64,
471}
472
473impl SimpleStructDecoder {
474 pub fn new(child_fields: Fields, num_rows: u64) -> Self {
475 let data_type = DataType::Struct(child_fields.clone());
476 Self {
477 children: child_fields
478 .iter()
479 .enumerate()
480 .map(|(idx, _)| ChildState::new(num_rows, idx as u32))
481 .collect(),
482 child_fields,
483 data_type,
484 num_rows,
485 }
486 }
487
488 async fn do_wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
489 let mut wait_orders = self
490 .children
491 .iter_mut()
492 .filter_map(|child| {
493 if child.rows_loaded <= loaded_need {
494 Some(WaitOrder(child))
495 } else {
496 None
497 }
498 })
499 .collect::<BinaryHeap<_>>();
500 while !wait_orders.is_empty() {
501 let next_waiter = wait_orders.pop().unwrap();
502 let next_highest = wait_orders
503 .peek()
504 .map(|w| w.0.rows_loaded)
505 .unwrap_or(u64::MAX);
506 let limit = loaded_need.min(next_highest);
509 next_waiter.0.wait_for_loaded(limit).await?;
510 log::trace!(
511 "Struct child {} finished await pass and now {} are loaded",
512 next_waiter.0.field_index,
513 next_waiter.0.rows_loaded
514 );
515 if next_waiter.0.rows_loaded <= loaded_need {
516 wait_orders.push(next_waiter);
517 }
518 }
519 Ok(())
520 }
521}
522
523impl LogicalPageDecoder for SimpleStructDecoder {
524 fn accept_child(&mut self, mut child: DecoderReady) -> Result<()> {
525 let child_idx = child.path.pop_front().unwrap();
527 if child.path.is_empty() {
528 self.children[child_idx as usize]
530 .scheduled
531 .push_back(child.decoder);
532 } else {
533 let intended = self.children[child_idx as usize].scheduled.back_mut().ok_or_else(|| Error::Internal { message: format!("Decoder scheduled for child at index {} but we don't have any child at that index yet", child_idx), location: location!() })?;
535 intended.accept_child(child)?;
536 }
537 Ok(())
538 }
539
540 fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
541 self.do_wait_for_loaded(loaded_need).boxed()
542 }
543
544 fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
545 let child_tasks = self
546 .children
547 .iter_mut()
548 .map(|child| child.drain(num_rows))
549 .collect::<Result<Vec<_>>>()?;
550 let num_rows = child_tasks[0].num_rows;
551 debug_assert!(child_tasks.iter().all(|task| task.num_rows == num_rows));
552 Ok(NextDecodeTask {
553 task: Box::new(SimpleStructDecodeTask {
554 children: child_tasks,
555 child_fields: self.child_fields.clone(),
556 }),
557 num_rows,
558 })
559 }
560
561 fn rows_loaded(&self) -> u64 {
562 self.children.iter().map(|c| c.rows_loaded).min().unwrap()
563 }
564
565 fn rows_drained(&self) -> u64 {
566 debug_assert!(self
568 .children
569 .iter()
570 .all(|c| c.rows_drained == self.children[0].rows_drained));
571 self.children[0].rows_drained
572 }
573
574 fn num_rows(&self) -> u64 {
575 self.num_rows
576 }
577
578 fn data_type(&self) -> &DataType {
579 &self.data_type
580 }
581}
582
583struct CompositeDecodeTask {
584 tasks: Vec<Box<dyn DecodeArrayTask>>,
586 num_rows: u64,
587 has_more: bool,
588}
589
590impl CompositeDecodeTask {
591 fn decode(self) -> Result<ArrayRef> {
592 let arrays = self
593 .tasks
594 .into_iter()
595 .map(|task| task.decode())
596 .collect::<Result<Vec<_>>>()?;
597 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
598 Ok(arrow_select::concat::concat(&array_refs)?)
605 }
606}
607
608struct SimpleStructDecodeTask {
609 children: Vec<CompositeDecodeTask>,
610 child_fields: Fields,
611}
612
613impl DecodeArrayTask for SimpleStructDecodeTask {
614 fn decode(self: Box<Self>) -> Result<ArrayRef> {
615 let child_arrays = self
616 .children
617 .into_iter()
618 .map(|child| child.decode())
619 .collect::<Result<Vec<_>>>()?;
620 Ok(Arc::new(StructArray::try_new(
621 self.child_fields,
622 child_arrays,
623 None,
624 )?))
625 }
626}