1use std::{fmt::Debug, ops::Range, sync::Arc, vec};
5
6use arrow_array::{cast::AsArray, make_array, Array, ArrayRef};
7use arrow_buffer::bit_util;
8use arrow_schema::DataType;
9use futures::{future::BoxFuture, FutureExt};
10use log::trace;
11use snafu::location;
12
13use crate::decoder::{ColumnBuffers, PageBuffers};
14use crate::previous::decoder::{FieldScheduler, LogicalPageDecoder, SchedulingJob};
15use crate::previous::encoder::ArrayEncodingStrategy;
16use crate::utils::accumulation::AccumulationQueue;
17use crate::{data::DataBlock, previous::encodings::physical::decoder_from_array_encoding};
18use lance_core::{datatypes::Field, Error, Result};
19
20use crate::{
21 decoder::{
22 DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PageEncoding, PageInfo,
23 PageScheduler, PrimitivePageDecoder, PriorityRange, ScheduledScanLine, SchedulerContext,
24 },
25 encoder::{
26 EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
27 },
28 repdef::RepDefBuilder,
29};
30
/// Scheduling state for a single page of a primitive column.
#[derive(Debug)]
struct PrimitivePage {
    // Schedules the I/O needed to decode this page's buffers
    scheduler: Box<dyn PageScheduler>,
    // Number of rows stored in this page
    num_rows: u64,
    // Index of this page within the column's original page list
    page_index: u32,
}
37
/// A field scheduler for a column of primitive (leaf) values, from the
/// `previous` (legacy) decoder path.  Each page is scheduled and decoded
/// independently by its own page scheduler.
#[derive(Debug)]
pub struct PrimitiveFieldScheduler {
    // Arrow data type of the column being decoded
    data_type: DataType,
    // One entry per non-empty page, in page order
    page_schedulers: Vec<PrimitivePage>,
    // Total number of rows across all pages in `page_schedulers`
    num_rows: u64,
    // Whether decoded buffers should be validated when turned into arrays
    should_validate: bool,
    // Index of this column in the file (used for tracing)
    column_index: u32,
}
55
56impl PrimitiveFieldScheduler {
57 pub fn new(
58 column_index: u32,
59 data_type: DataType,
60 pages: Arc<[PageInfo]>,
61 buffers: ColumnBuffers,
62 should_validate: bool,
63 ) -> Self {
64 let page_schedulers = pages
65 .iter()
66 .enumerate()
67 .filter(|(page_index, page)| {
69 log::trace!("Skipping empty page with index {}", page_index);
70 page.num_rows > 0
71 })
72 .map(|(page_index, page)| {
73 let page_buffers = PageBuffers {
74 column_buffers: buffers,
75 positions_and_sizes: &page.buffer_offsets_and_sizes,
76 };
77 let scheduler = decoder_from_array_encoding(
78 page.encoding.as_legacy(),
79 &page_buffers,
80 &data_type,
81 );
82 PrimitivePage {
83 scheduler,
84 num_rows: page.num_rows,
85 page_index: page_index as u32,
86 }
87 })
88 .collect::<Vec<_>>();
89 let num_rows = page_schedulers.iter().map(|p| p.num_rows).sum();
90 Self {
91 data_type,
92 page_schedulers,
93 num_rows,
94 should_validate,
95 column_index,
96 }
97 }
98}
99
/// An in-progress scheduling pass over a [`PrimitiveFieldScheduler`].
///
/// Tracks how far through `ranges` and the scheduler's pages the job has
/// advanced; each call to `schedule_next` emits the work for one page.
#[derive(Debug)]
struct PrimitiveFieldSchedulingJob<'a> {
    scheduler: &'a PrimitiveFieldScheduler,
    // Requested row ranges, in column-wide row coordinates
    ranges: Vec<Range<u64>>,
    // Index of the next page to schedule
    page_idx: usize,
    // Index of the range currently being processed
    range_idx: usize,
    // Offset into the current range already handled by earlier calls
    range_offset: u64,
    // Column-wide row offset of the start of the current page
    global_row_offset: u64,
}
109
110impl<'a> PrimitiveFieldSchedulingJob<'a> {
111 pub fn new(scheduler: &'a PrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
112 Self {
113 scheduler,
114 ranges,
115 page_idx: 0,
116 range_idx: 0,
117 range_offset: 0,
118 global_row_offset: 0,
119 }
120 }
121}
122
impl SchedulingJob for PrimitiveFieldSchedulingJob<'_> {
    /// Schedules the I/O for the next page and returns a decoder for it.
    ///
    /// Consumes every requested range (or part of a range) that intersects
    /// the current page, then advances past that page.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        debug_assert!(self.range_idx < self.ranges.len());
        // Resume the current range.  NOTE(review): `range_offset` is never
        // advanced anywhere in this file — partially-consumed ranges appear
        // to be resumed via the `max()` clamp below instead; confirm.
        let mut range = self.ranges[self.range_idx].clone();
        range.start += self.range_offset;

        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range,
            cur_page.num_rows
        );
        // Skip whole pages that end before the current range starts
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }

        // Collect the page-relative sub-range of every requested range that
        // overlaps the current page
        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            // Clamp the range start to the start of this page, then translate
            // column-wide coordinates into page-relative coordinates
            range.start = range.start.max(self.global_row_offset);
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            let end_in_page = end_in_page.min(cur_page.num_rows);
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;

            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                // This range ends inside the current page; move to the next
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                // The range extends past this page; a later call to
                // schedule_next will pick up the remainder
                break;
            }
        }

        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            num_rows_in_next,
            ranges_in_page.len(),
            cur_page.num_rows,
            priority.current_priority(),
            self.scheduler.column_index,
            cur_page.page_index,
        );

        // This page is now fully handled; advance past it
        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;

        // Kick off the (asynchronous) I/O for the selected ranges
        let physical_decoder = cur_page.scheduler.schedule_ranges(
            &ranges_in_page,
            context.io(),
            priority.current_priority(),
        );

        let logical_decoder = PrimitiveFieldDecoder {
            data_type: self.scheduler.data_type.clone(),
            column_index: self.scheduler.column_index,
            unloaded_physical_decoder: Some(physical_decoder),
            physical_decoder: None,
            rows_drained: 0,
            num_rows: num_rows_in_next,
            should_validate: self.scheduler.should_validate,
            page_index: cur_page.page_index,
        };

        let decoder = Box::new(logical_decoder);
        #[allow(deprecated)]
        let decoder_ready = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder_ready)],
            rows_scheduled: num_rows_in_next,
        })
    }

    /// Total number of rows requested by this job across all ranges
    fn num_rows(&self) -> u64 {
        self.ranges.iter().map(|r| r.end - r.start).sum()
    }
}
215
216impl FieldScheduler for PrimitiveFieldScheduler {
217 fn num_rows(&self) -> u64 {
218 self.num_rows
219 }
220
221 fn schedule_ranges<'a>(
222 &'a self,
223 ranges: &[std::ops::Range<u64>],
224 _filter: &FilterExpression,
226 ) -> Result<Box<dyn SchedulingJob + 'a>> {
227 Ok(Box::new(PrimitiveFieldSchedulingJob::new(
228 self,
229 ranges.to_vec(),
230 )))
231 }
232
233 fn initialize<'a>(
234 &'a self,
235 _filter: &'a FilterExpression,
236 _context: &'a SchedulerContext,
237 ) -> BoxFuture<'a, Result<()>> {
238 std::future::ready(Ok(())).boxed()
240 }
241}
242
/// A logical decoder for one scheduled page of a primitive column.
pub struct PrimitiveFieldDecoder {
    // Arrow data type of the decoded values
    data_type: DataType,
    // Pending I/O that resolves to the physical decoder; `None` once awaited
    // (or when created directly via `new_from_data`)
    unloaded_physical_decoder: Option<BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>>,
    // The loaded physical decoder; `None` until `wait_for_loaded` completes
    physical_decoder: Option<Arc<dyn PrimitivePageDecoder>>,
    // Whether decoded buffers should be validated when turned into arrays
    should_validate: bool,
    // Total rows scheduled for this decoder
    num_rows: u64,
    // Rows already handed out via `drain`
    rows_drained: u64,
    // Column index, for tracing (u32::MAX when unknown)
    column_index: u32,
    // Page index, for tracing (u32::MAX when unknown)
    page_index: u32,
}
253
254impl PrimitiveFieldDecoder {
255 pub fn new_from_data(
256 physical_decoder: Arc<dyn PrimitivePageDecoder>,
257 data_type: DataType,
258 num_rows: u64,
259 should_validate: bool,
260 ) -> Self {
261 Self {
262 data_type,
263 unloaded_physical_decoder: None,
264 physical_decoder: Some(physical_decoder),
265 should_validate,
266 num_rows,
267 rows_drained: 0,
268 column_index: u32::MAX,
269 page_index: u32::MAX,
270 }
271 }
272}
273
274impl Debug for PrimitiveFieldDecoder {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 f.debug_struct("PrimitiveFieldDecoder")
277 .field("data_type", &self.data_type)
278 .field("num_rows", &self.num_rows)
279 .field("rows_drained", &self.rows_drained)
280 .finish()
281 }
282}
283
/// A unit of decode work: turns a slice of rows from a physical decoder
/// into an Arrow array.
struct PrimitiveFieldDecodeTask {
    // Rows to skip from the start of the page's decoded data
    rows_to_skip: u64,
    // Number of rows to decode after the skip
    rows_to_take: u64,
    // Whether the decoded buffers should be validated
    should_validate: bool,
    physical_decoder: Arc<dyn PrimitivePageDecoder>,
    // Arrow data type of the output array
    data_type: DataType,
}
291
impl DecodeArrayTask for PrimitiveFieldDecodeTask {
    /// Decodes the configured slice of rows and converts it into an Arrow
    /// array, validating the buffers if `should_validate` is set.
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        // Decode the requested rows into a data block
        let block = self
            .physical_decoder
            .decode(self.rows_to_skip, self.rows_to_take)?;

        let array = make_array(block.into_arrow(self.data_type.clone(), self.should_validate)?);

        // For dictionary arrays, rebuild the array so that any logical nulls
        // are attached directly to the key (indices) buffer.
        // NOTE(review): presumably `into_arrow` can produce a dictionary
        // whose nulls are only reachable via `logical_nulls` (e.g. stored on
        // the values) — confirm against the physical decoders.
        if let DataType::Dictionary(_, _) = self.data_type {
            let dict = array.as_any_dictionary();
            if let Some(nulls) = array.logical_nulls() {
                let new_indices = dict.keys().to_data();
                let new_array = make_array(
                    new_indices
                        .into_builder()
                        .nulls(Some(nulls))
                        .add_child_data(dict.values().to_data())
                        .data_type(dict.data_type().clone())
                        .build()?,
                );
                return Ok(new_array);
            }
        }
        Ok(array)
    }
}
324
325impl LogicalPageDecoder for PrimitiveFieldDecoder {
326 fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
329 log::trace!(
330 "primitive wait for more than {} rows on column {} and page {} (page has {} rows)",
331 loaded_need,
332 self.column_index,
333 self.page_index,
334 self.num_rows
335 );
336 async move {
337 let physical_decoder = self.unloaded_physical_decoder.take().unwrap().await?;
338 self.physical_decoder = Some(Arc::from(physical_decoder));
339 Ok(())
340 }
341 .boxed()
342 }
343
344 fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
345 if self.physical_decoder.as_ref().is_none() {
346 return Err(lance_core::Error::Internal {
347 message: format!("drain was called on primitive field decoder for data type {} on column {} but the decoder was never awaited", self.data_type, self.column_index),
348 location: location!(),
349 });
350 }
351
352 let rows_to_skip = self.rows_drained;
353 let rows_to_take = num_rows;
354
355 self.rows_drained += rows_to_take;
356
357 let task = Box::new(PrimitiveFieldDecodeTask {
358 rows_to_skip,
359 rows_to_take,
360 should_validate: self.should_validate,
361 physical_decoder: self.physical_decoder.as_ref().unwrap().clone(),
362 data_type: self.data_type.clone(),
363 });
364
365 Ok(NextDecodeTask {
366 task,
367 num_rows: rows_to_take,
368 })
369 }
370
371 fn rows_loaded(&self) -> u64 {
372 if self.unloaded_physical_decoder.is_some() {
373 0
374 } else {
375 self.num_rows
376 }
377 }
378
379 fn rows_drained(&self) -> u64 {
380 if self.unloaded_physical_decoder.is_some() {
381 0
382 } else {
383 self.rows_drained
384 }
385 }
386
387 fn num_rows(&self) -> u64 {
388 self.num_rows
389 }
390
391 fn data_type(&self) -> &DataType {
392 &self.data_type
393 }
394}
395
/// A field encoder for a single column of primitive values.
pub struct PrimitiveFieldEncoder {
    // Buffers incoming arrays until enough bytes accumulate to flush a page
    accumulation_queue: AccumulationQueue,
    // Strategy used to pick an array encoder for each batch of arrays
    array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
    // Index of this column in the file
    column_index: u32,
    field: Field,
    // Upper bound (in bytes) for a single encoded page
    max_page_bytes: u64,
}
403
impl PrimitiveFieldEncoder {
    /// Creates a new encoder for `field` at `column_index`.
    ///
    /// Queue capacity and array-retention behavior come from `options`.
    pub fn try_new(
        options: &EncodingOptions,
        array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
        column_index: u32,
        field: Field,
    ) -> Result<Self> {
        Ok(Self {
            accumulation_queue: AccumulationQueue::new(
                options.cache_bytes_per_column,
                column_index,
                options.keep_original_array,
            ),
            column_index,
            max_page_bytes: options.max_page_bytes,
            array_encoding_strategy,
            field,
        })
    }

    /// Spawns a background task that encodes `arrays` into a single page.
    ///
    /// The encoder is chosen synchronously (so errors in strategy selection
    /// surface immediately); the actual encoding runs on a tokio task whose
    /// panics are converted to `Error::Internal`.
    fn create_encode_task(&mut self, arrays: Vec<ArrayRef>) -> Result<EncodeTask> {
        let encoder = self
            .array_encoding_strategy
            .create_array_encoder(&arrays, &self.field)?;
        let column_idx = self.column_index;
        let data_type = self.field.data_type();

        Ok(tokio::task::spawn(async move {
            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
            let data = DataBlock::from_arrays(&arrays, num_values);
            let mut buffer_index = 0;
            let array = encoder.encode(data, &data_type, &mut buffer_index)?;
            let (data, description) = array.into_buffers();
            Ok(EncodedPage {
                data,
                description: PageEncoding::Legacy(description),
                num_rows: num_values,
                column_idx,
                // Row numbers are not tracked by this (legacy) encoder
                row_number: 0, })
        })
        // JoinError (task panic/cancel) is mapped to an internal error so the
        // outer future always yields a Result
        .map(|res_res| {
            res_res.unwrap_or_else(|err| {
                Err(Error::Internal {
                    message: format!("Encoding task failed with error: {:?}", err),
                    location: location!(),
                })
            })
        })
        .boxed())
    }

    /// Turns the buffered arrays into one or more encode tasks.
    ///
    /// A single array larger than `max_page_bytes` is sliced into roughly
    /// equal parts (at most one part per row) so that each part fits within
    /// the page-size budget.  Multiple buffered arrays are encoded together
    /// as a single page.
    fn do_flush(&mut self, arrays: Vec<ArrayRef>) -> Result<Vec<EncodeTask>> {
        if arrays.len() == 1 {
            let array = arrays.into_iter().next().unwrap();
            let size_bytes = array.get_buffer_memory_size();
            let num_parts = bit_util::ceil(size_bytes, self.max_page_bytes as usize);
            // Can't slice finer than one row per part
            let num_parts = num_parts.min(array.len());
            if num_parts <= 1 {
                Ok(vec![self.create_encode_task(vec![array])?])
            } else {
                let mut tasks = Vec::with_capacity(num_parts);
                let mut offset = 0;
                let part_size = bit_util::ceil(array.len(), num_parts);
                for _ in 0..num_parts {
                    let avail = array.len() - offset;
                    // Rounding in part_size may exhaust the array early
                    if avail == 0 {
                        break;
                    }
                    let chunk_size = avail.min(part_size);
                    let part = array.slice(offset, chunk_size);
                    let task = self.create_encode_task(vec![part])?;
                    tasks.push(task);
                    offset += chunk_size;
                }
                Ok(tasks)
            }
        } else {
            Ok(vec![self.create_encode_task(arrays)?])
        }
    }
}
496
497impl FieldEncoder for PrimitiveFieldEncoder {
498 fn maybe_encode(
500 &mut self,
501 array: ArrayRef,
502 _external_buffers: &mut OutOfLineBuffers,
503 _repdef: RepDefBuilder,
504 row_number: u64,
505 num_rows: u64,
506 ) -> Result<Vec<EncodeTask>> {
507 if let Some(arrays) = self.accumulation_queue.insert(array, row_number, num_rows) {
508 Ok(self.do_flush(arrays.0)?)
509 } else {
510 Ok(vec![])
511 }
512 }
513
514 fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
516 if let Some(arrays) = self.accumulation_queue.flush() {
517 Ok(self.do_flush(arrays.0)?)
518 } else {
519 Ok(vec![])
520 }
521 }
522
523 fn num_columns(&self) -> u32 {
524 1
525 }
526
527 fn finish(
528 &mut self,
529 _external_buffers: &mut OutOfLineBuffers,
530 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
531 std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
532 }
533}