1use std::{
5 collections::{BinaryHeap, VecDeque},
6 ops::Range,
7 sync::Arc,
8};
9
10use crate::{
11 decoder::{
12 DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PriorityRange,
13 ScheduledScanLine, SchedulerContext,
14 },
15 previous::decoder::{DecoderReady, FieldScheduler, LogicalPageDecoder, SchedulingJob},
16};
17use arrow_array::{ArrayRef, StructArray};
18use arrow_schema::{DataType, Field, Fields};
19use futures::{FutureExt, StreamExt, TryStreamExt, future::BoxFuture, stream::FuturesUnordered};
20use lance_core::{Error, Result};
21use log::trace;
22
23#[derive(Debug)]
24struct SchedulingJobWithStatus<'a> {
25 col_idx: u32,
26 col_name: &'a str,
27 job: Box<dyn SchedulingJob + 'a>,
28 rows_scheduled: u64,
29 rows_remaining: u64,
30}
31
32impl PartialEq for SchedulingJobWithStatus<'_> {
33 fn eq(&self, other: &Self) -> bool {
34 self.col_idx == other.col_idx
35 }
36}
37
38impl Eq for SchedulingJobWithStatus<'_> {}
39
40impl PartialOrd for SchedulingJobWithStatus<'_> {
41 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
42 Some(self.cmp(other))
43 }
44}
45
46impl Ord for SchedulingJobWithStatus<'_> {
47 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
48 other.rows_scheduled.cmp(&self.rows_scheduled)
50 }
51}
52
53#[derive(Debug)]
54struct EmptyStructDecodeTask {
55 num_rows: u64,
56}
57
58impl DecodeArrayTask for EmptyStructDecodeTask {
59 fn decode(self: Box<Self>) -> Result<ArrayRef> {
60 Ok(Arc::new(StructArray::new_empty_fields(
61 self.num_rows as usize,
62 None,
63 )))
64 }
65}
66
67#[derive(Debug)]
68struct EmptyStructDecoder {
69 num_rows: u64,
70 rows_drained: u64,
71 data_type: DataType,
72}
73
74impl EmptyStructDecoder {
75 fn new(num_rows: u64) -> Self {
76 Self {
77 num_rows,
78 rows_drained: 0,
79 data_type: DataType::Struct(Fields::from(Vec::<Field>::default())),
80 }
81 }
82}
83
84impl LogicalPageDecoder for EmptyStructDecoder {
85 fn wait_for_loaded(&mut self, _loaded_need: u64) -> BoxFuture<'_, Result<()>> {
86 Box::pin(std::future::ready(Ok(())))
87 }
88 fn rows_loaded(&self) -> u64 {
89 self.num_rows
90 }
91 fn rows_unloaded(&self) -> u64 {
92 0
93 }
94 fn num_rows(&self) -> u64 {
95 self.num_rows
96 }
97 fn rows_drained(&self) -> u64 {
98 self.rows_drained
99 }
100 fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
101 self.rows_drained += num_rows;
102 Ok(NextDecodeTask {
103 num_rows,
104 task: Box::new(EmptyStructDecodeTask { num_rows }),
105 })
106 }
107 fn data_type(&self) -> &DataType {
108 &self.data_type
109 }
110}
111
112#[derive(Debug)]
113struct EmptyStructSchedulerJob {
114 num_rows: u64,
115}
116
117impl SchedulingJob for EmptyStructSchedulerJob {
118 fn schedule_next(
119 &mut self,
120 context: &mut SchedulerContext,
121 _priority: &dyn PriorityRange,
122 ) -> Result<ScheduledScanLine> {
123 let empty_decoder = Box::new(EmptyStructDecoder::new(self.num_rows));
124 #[allow(deprecated)]
125 let struct_decoder = context.locate_decoder(empty_decoder);
126 Ok(ScheduledScanLine {
127 decoders: vec![MessageType::DecoderReady(struct_decoder)],
128 rows_scheduled: self.num_rows,
129 })
130 }
131
132 fn num_rows(&self) -> u64 {
133 self.num_rows
134 }
135}
136
137#[derive(Debug)]
144struct SimpleStructSchedulerJob<'a> {
145 scheduler: &'a SimpleStructScheduler,
146 children: BinaryHeap<SchedulingJobWithStatus<'a>>,
148 rows_scheduled: u64,
149 num_rows: u64,
150 initialized: bool,
151}
152
153impl<'a> SimpleStructSchedulerJob<'a> {
154 fn new(
155 scheduler: &'a SimpleStructScheduler,
156 children: Vec<Box<dyn SchedulingJob + 'a>>,
157 num_rows: u64,
158 ) -> Self {
159 let children = children
160 .into_iter()
161 .enumerate()
162 .map(|(idx, job)| SchedulingJobWithStatus {
163 col_idx: idx as u32,
164 col_name: scheduler.child_fields[idx].name(),
165 job,
166 rows_scheduled: 0,
167 rows_remaining: num_rows,
168 })
169 .collect::<BinaryHeap<_>>();
170 Self {
171 scheduler,
172 children,
173 rows_scheduled: 0,
174 num_rows,
175 initialized: false,
176 }
177 }
178}
179
impl SchedulingJob for SimpleStructSchedulerJob<'_> {
    /// Schedules child columns until the struct has scheduled at least one more row.
    ///
    /// The first call also emits the struct decoder itself, ahead of any child
    /// decoders. After that, the child that has scheduled the fewest rows is
    /// repeatedly asked to schedule more. The struct's scheduled row count is the
    /// minimum across children, so the loop ends once that minimum advances.
    ///
    /// Returns all decoders emitted during this call and the number of struct rows
    /// newly scheduled.
    fn schedule_next(
        &mut self,
        mut context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let mut decoders = Vec::new();
        if !self.initialized {
            // Emitted before any child decoder. Presumably this lets child decoders
            // be routed into it via `accept_child` — confirm against the consumer.
            let struct_decoder = Box::new(SimpleStructDecoder::new(
                self.scheduler.child_fields.clone(),
                self.num_rows,
            ));
            // `locate_decoder` is marked deprecated but is still used by this scheduler.
            #[allow(deprecated)]
            let struct_decoder = context.locate_decoder(struct_decoder);
            decoders.push(MessageType::DecoderReady(struct_decoder));
            self.initialized = true;
        }
        let old_rows_scheduled = self.rows_scheduled;
        // Keep scheduling the lagging child until the struct-level count (the minimum
        // over all children) moves forward.
        // NOTE(review): if a child keeps reporting 0 newly scheduled rows this loop
        // never terminates.
        while old_rows_scheduled == self.rows_scheduled {
            // Never empty: `schedule_ranges` uses `EmptyStructSchedulerJob` when the
            // struct has no children.
            let mut next_child = self.children.pop().unwrap();
            trace!("Scheduling more rows for child {}", next_child.col_idx);
            // Scope the context to this child while it schedules, then restore it.
            let scoped = context.push(next_child.col_name, next_child.col_idx);
            let child_scan = next_child.job.schedule_next(scoped.context, priority)?;
            trace!(
                "Scheduled {} rows for child {}",
                child_scan.rows_scheduled, next_child.col_idx
            );
            next_child.rows_scheduled += child_scan.rows_scheduled;
            next_child.rows_remaining -= child_scan.rows_scheduled;
            decoders.extend(child_scan.decoders);
            self.children.push(next_child);
            // The heap top is the child with the fewest scheduled rows, which bounds
            // how many complete struct rows are scheduled.
            self.rows_scheduled = self.children.peek().unwrap().rows_scheduled;
            context = scoped.pop();
        }
        let struct_rows_scheduled = self.rows_scheduled - old_rows_scheduled;
        Ok(ScheduledScanLine {
            decoders,
            rows_scheduled: struct_rows_scheduled,
        })
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }
}
229
230#[derive(Debug)]
241pub struct SimpleStructScheduler {
242 children: Vec<Arc<dyn FieldScheduler>>,
243 child_fields: Fields,
244 num_rows: u64,
245}
246
247impl SimpleStructScheduler {
248 pub fn new(
249 children: Vec<Arc<dyn FieldScheduler>>,
250 child_fields: Fields,
251 num_rows: u64,
252 ) -> Self {
253 let num_rows = children
254 .first()
255 .map(|child| child.num_rows())
256 .unwrap_or(num_rows);
257 debug_assert!(children.iter().all(|child| child.num_rows() == num_rows));
258 Self {
259 children,
260 child_fields,
261 num_rows,
262 }
263 }
264}
265
266impl FieldScheduler for SimpleStructScheduler {
267 fn schedule_ranges<'a>(
268 &'a self,
269 ranges: &[Range<u64>],
270 filter: &FilterExpression,
271 ) -> Result<Box<dyn SchedulingJob + 'a>> {
272 if self.children.is_empty() {
273 return Ok(Box::new(EmptyStructSchedulerJob {
274 num_rows: ranges.iter().map(|r| r.end - r.start).sum(),
275 }));
276 }
277 let child_schedulers = self
278 .children
279 .iter()
280 .map(|child| child.schedule_ranges(ranges, filter))
281 .collect::<Result<Vec<_>>>()?;
282 let num_rows = child_schedulers[0].num_rows();
283 Ok(Box::new(SimpleStructSchedulerJob::new(
284 self,
285 child_schedulers,
286 num_rows,
287 )))
288 }
289
290 fn num_rows(&self) -> u64 {
291 self.num_rows
292 }
293
294 fn initialize<'a>(
295 &'a self,
296 _filter: &'a FilterExpression,
297 _context: &'a SchedulerContext,
298 ) -> BoxFuture<'a, Result<()>> {
299 let futures = self
300 .children
301 .iter()
302 .map(|child| child.initialize(_filter, _context))
303 .collect::<FuturesUnordered<_>>();
304 async move {
305 futures
306 .map(|res| res.map(|_| ()))
307 .try_collect::<Vec<_>>()
308 .await?;
309 Ok(())
310 }
311 .boxed()
312 }
313}
314
/// Decoding state for a single child column of a struct.
#[derive(Debug)]
struct ChildState {
    /// Page decoders scheduled for this child that have not yet been fully drained,
    /// in the order they were received
    scheduled: VecDeque<Box<dyn LogicalPageDecoder>>,
    /// Rows of this child (counted from the child's first row, including pages
    /// already popped) that were loaded as of the last `wait_for_loaded` call
    rows_loaded: u64,
    /// Rows handed out by `drain` so far
    rows_drained: u64,
    /// Rows in pages that were fully drained and removed from `scheduled`
    rows_popped: u64,
    /// Total rows in this child
    num_rows: u64,
    /// Index of this child within the parent struct (only used in trace logging here)
    field_index: u32,
}

impl ChildState {
    /// Creates empty state for child `field_index`, which has `num_rows` rows in total.
    fn new(num_rows: u64, field_index: u32) -> Self {
        Self {
            scheduled: VecDeque::new(),
            rows_loaded: 0,
            rows_drained: 0,
            rows_popped: 0,
            num_rows,
            field_index,
        }
    }

    /// Tries to get more than `loaded_need` rows of this child loaded.
    ///
    /// Walks the scheduled pages in order. Pages with no unloaded rows are counted
    /// in full. A page that still has unloaded rows is awaited only up to what is
    /// needed. The walk stops as soon as the running total exceeds `loaded_need`.
    /// If the scheduled pages run out first, `rows_loaded` records whatever total
    /// was reached.
    ///
    /// # Errors
    /// Propagates any error from a page decoder's `wait_for_loaded`.
    async fn wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
        trace!(
            "Struct child {} waiting for more than {} rows to be loaded and {} are fully loaded already",
            self.field_index, loaded_need, self.rows_loaded,
        );
        // Pages that were already popped were fully drained, hence fully loaded.
        let mut fully_loaded = self.rows_popped;
        for (page_idx, next_decoder) in self.scheduled.iter_mut().enumerate() {
            if next_decoder.rows_unloaded() > 0 {
                let mut current_need = loaded_need;
                // Convert the child-level target into one relative to this page.
                // Later iterations cannot underflow because the loop breaks once
                // `fully_loaded > loaded_need`. The first iteration assumes
                // `rows_popped <= loaded_need` (TODO confirm the caller guarantees it).
                current_need -= fully_loaded;
                let rows_in_page = next_decoder.num_rows();
                // "More than rows_in_page - 1" already means the whole page, so cap there.
                // This may wait for only part of a page, which lets decoding start
                // before a large page is completely loaded.
                let need_for_page = (rows_in_page - 1).min(current_need);
                trace!(
                    "Struct child {} page {} will wait until more than {} rows loaded from page with {} rows",
                    self.field_index, page_idx, need_for_page, rows_in_page,
                );
                next_decoder.wait_for_loaded(need_for_page).await?;
                let now_loaded = next_decoder.rows_loaded();
                fully_loaded += now_loaded;
                trace!(
                    "Struct child {} page {} await and now has {} loaded rows and we have {} fully loaded",
                    self.field_index, page_idx, now_loaded, fully_loaded
                );
            } else {
                fully_loaded += next_decoder.num_rows();
            }
            if fully_loaded > loaded_need {
                break;
            }
        }
        self.rows_loaded = fully_loaded;
        // NOTE(review): both values below are equal here (rows_loaded was just set),
        // so this message does not actually report the number of *new* rows.
        trace!(
            "Struct child {} loaded {} new rows and now {} are loaded",
            self.field_index, fully_loaded, self.rows_loaded
        );
        Ok(())
    }

    /// Takes the next `num_rows` rows from this child's scheduled pages.
    ///
    /// Pages are drained front to back. A page with no rows left afterwards is
    /// removed from `scheduled`, and its size is added to `rows_popped`. The result
    /// holds one decode task per page touched. Its `has_more` is `false` once all
    /// of this child's `num_rows` rows have been drained.
    ///
    /// # Panics
    /// Panics if `num_rows` exceeds the rows remaining in the scheduled pages
    /// (`front_mut().unwrap()` finds no page).
    fn drain(&mut self, num_rows: u64) -> Result<CompositeDecodeTask> {
        trace!("Struct draining {} rows", num_rows);

        trace!(
            "Draining {} rows from struct page with {} rows already drained",
            num_rows, self.rows_drained
        );
        let mut remaining = num_rows;
        let mut composite = CompositeDecodeTask {
            tasks: Vec::new(),
            num_rows: 0,
            has_more: true,
        };
        while remaining > 0 {
            let next = self.scheduled.front_mut().unwrap();
            // Never take more than the current page still holds; the rest comes
            // from subsequent pages.
            let rows_to_take = remaining.min(next.rows_left());
            let next_task = next.drain(rows_to_take)?;
            if next.rows_left() == 0 {
                trace!("Completely drained page");
                self.rows_popped += next.num_rows();
                self.scheduled.pop_front();
            }
            remaining -= rows_to_take;
            composite.tasks.push(next_task.task);
            composite.num_rows += next_task.num_rows;
        }
        self.rows_drained += num_rows;
        composite.has_more = self.rows_drained != self.num_rows;
        Ok(composite)
    }
}
430
431struct WaitOrder<'a>(&'a mut ChildState);
433
434impl Eq for WaitOrder<'_> {}
435impl PartialEq for WaitOrder<'_> {
436 fn eq(&self, other: &Self) -> bool {
437 self.0.rows_loaded == other.0.rows_loaded
438 }
439}
440impl Ord for WaitOrder<'_> {
441 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
442 other.0.rows_loaded.cmp(&self.0.rows_loaded)
444 }
445}
446impl PartialOrd for WaitOrder<'_> {
447 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
448 Some(self.cmp(other))
449 }
450}
451
452#[derive(Debug)]
453pub struct SimpleStructDecoder {
454 children: Vec<ChildState>,
455 child_fields: Fields,
456 data_type: DataType,
457 num_rows: u64,
458}
459
460impl SimpleStructDecoder {
461 pub fn new(child_fields: Fields, num_rows: u64) -> Self {
462 let data_type = DataType::Struct(child_fields.clone());
463 Self {
464 children: child_fields
465 .iter()
466 .enumerate()
467 .map(|(idx, _)| ChildState::new(num_rows, idx as u32))
468 .collect(),
469 child_fields,
470 data_type,
471 num_rows,
472 }
473 }
474
475 async fn do_wait_for_loaded(&mut self, loaded_need: u64) -> Result<()> {
476 let mut wait_orders = self
477 .children
478 .iter_mut()
479 .filter_map(|child| {
480 if child.rows_loaded <= loaded_need {
481 Some(WaitOrder(child))
482 } else {
483 None
484 }
485 })
486 .collect::<BinaryHeap<_>>();
487 while !wait_orders.is_empty() {
488 let next_waiter = wait_orders.pop().unwrap();
489 let next_highest = wait_orders
490 .peek()
491 .map(|w| w.0.rows_loaded)
492 .unwrap_or(u64::MAX);
493 let limit = loaded_need.min(next_highest);
496 next_waiter.0.wait_for_loaded(limit).await?;
497 log::trace!(
498 "Struct child {} finished await pass and now {} are loaded",
499 next_waiter.0.field_index,
500 next_waiter.0.rows_loaded
501 );
502 if next_waiter.0.rows_loaded <= loaded_need {
503 wait_orders.push(next_waiter);
504 }
505 }
506 Ok(())
507 }
508}
509
510impl LogicalPageDecoder for SimpleStructDecoder {
511 fn accept_child(&mut self, mut child: DecoderReady) -> Result<()> {
512 let child_idx = child.path.pop_front().unwrap();
514 if child.path.is_empty() {
515 self.children[child_idx as usize]
517 .scheduled
518 .push_back(child.decoder);
519 } else {
520 let intended = self.children[child_idx as usize].scheduled.back_mut().ok_or_else(|| Error::internal(format!("Decoder scheduled for child at index {} but we don't have any child at that index yet", child_idx)))?;
522 intended.accept_child(child)?;
523 }
524 Ok(())
525 }
526
527 fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
528 self.do_wait_for_loaded(loaded_need).boxed()
529 }
530
531 fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
532 let child_tasks = self
533 .children
534 .iter_mut()
535 .map(|child| child.drain(num_rows))
536 .collect::<Result<Vec<_>>>()?;
537 let num_rows = child_tasks[0].num_rows;
538 debug_assert!(child_tasks.iter().all(|task| task.num_rows == num_rows));
539 Ok(NextDecodeTask {
540 task: Box::new(SimpleStructDecodeTask {
541 children: child_tasks,
542 child_fields: self.child_fields.clone(),
543 }),
544 num_rows,
545 })
546 }
547
548 fn rows_loaded(&self) -> u64 {
549 self.children.iter().map(|c| c.rows_loaded).min().unwrap()
550 }
551
552 fn rows_drained(&self) -> u64 {
553 debug_assert!(
555 self.children
556 .iter()
557 .all(|c| c.rows_drained == self.children[0].rows_drained)
558 );
559 self.children[0].rows_drained
560 }
561
562 fn num_rows(&self) -> u64 {
563 self.num_rows
564 }
565
566 fn data_type(&self) -> &DataType {
567 &self.data_type
568 }
569}
570
571struct CompositeDecodeTask {
572 tasks: Vec<Box<dyn DecodeArrayTask>>,
574 num_rows: u64,
575 has_more: bool,
576}
577
578impl CompositeDecodeTask {
579 fn decode(self) -> Result<ArrayRef> {
580 let arrays = self
581 .tasks
582 .into_iter()
583 .map(|task| task.decode())
584 .collect::<Result<Vec<_>>>()?;
585 let array_refs = arrays.iter().map(|arr| arr.as_ref()).collect::<Vec<_>>();
586 Ok(arrow_select::concat::concat(&array_refs)?)
593 }
594}
595
596struct SimpleStructDecodeTask {
597 children: Vec<CompositeDecodeTask>,
598 child_fields: Fields,
599}
600
601impl DecodeArrayTask for SimpleStructDecodeTask {
602 fn decode(self: Box<Self>) -> Result<ArrayRef> {
603 let child_arrays = self
604 .children
605 .into_iter()
606 .map(|child| child.decode())
607 .collect::<Result<Vec<_>>>()?;
608 Ok(Arc::new(StructArray::try_new(
609 self.child_fields,
610 child_arrays,
611 None,
612 )?))
613 }
614}