1use std::{fmt::Debug, ops::Range, sync::Arc, vec};
5
6use arrow_array::{Array, ArrayRef, cast::AsArray, make_array};
7use arrow_buffer::bit_util;
8use arrow_schema::DataType;
9use futures::{FutureExt, future::BoxFuture};
10use log::trace;
11
12use crate::decoder::{ColumnBuffers, PageBuffers};
13use crate::previous::decoder::{FieldScheduler, LogicalPageDecoder, SchedulingJob};
14use crate::previous::encoder::ArrayEncodingStrategy;
15use crate::utils::accumulation::AccumulationQueue;
16use crate::{data::DataBlock, previous::encodings::physical::decoder_from_array_encoding};
17use lance_core::{Error, Result, datatypes::Field};
18
19use crate::{
20 decoder::{
21 DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PageEncoding, PageInfo,
22 PageScheduler, PrimitivePageDecoder, PriorityRange, ScheduledScanLine, SchedulerContext,
23 },
24 encoder::{
25 EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
26 },
27 repdef::RepDefBuilder,
28};
29
/// A single (non-empty) page of a primitive column paired with the physical
/// scheduler that knows how to issue reads for it.
#[derive(Debug)]
struct PrimitivePage {
    // Physical decoder/scheduler built from the page's legacy encoding
    scheduler: Box<dyn PageScheduler>,
    // Number of rows stored in this page (always > 0; empty pages are filtered out)
    num_rows: u64,
    // Index of the page within the column, used for trace logging
    page_index: u32,
}
36
/// Scheduler for a primitive (non-nested) field in the legacy format.
///
/// Holds one [`PrimitivePage`] per non-empty page of the column; scheduling
/// jobs walk these pages in order.
#[derive(Debug)]
pub struct PrimitiveFieldScheduler {
    data_type: DataType,
    page_schedulers: Vec<PrimitivePage>,
    // Total row count across all retained (non-empty) pages
    num_rows: u64,
    // Whether decoded buffers should be validated when converted to arrow
    should_validate: bool,
    // Index of this column, used for trace logging
    column_index: u32,
}
54
55impl PrimitiveFieldScheduler {
56 pub fn new(
57 column_index: u32,
58 data_type: DataType,
59 pages: Arc<[PageInfo]>,
60 buffers: ColumnBuffers,
61 should_validate: bool,
62 ) -> Self {
63 let page_schedulers = pages
64 .iter()
65 .enumerate()
66 .filter(|(page_index, page)| {
68 log::trace!("Skipping empty page with index {}", page_index);
69 page.num_rows > 0
70 })
71 .map(|(page_index, page)| {
72 let page_buffers = PageBuffers {
73 column_buffers: buffers,
74 positions_and_sizes: &page.buffer_offsets_and_sizes,
75 };
76 let scheduler = decoder_from_array_encoding(
77 page.encoding.as_legacy(),
78 &page_buffers,
79 &data_type,
80 );
81 PrimitivePage {
82 scheduler,
83 num_rows: page.num_rows,
84 page_index: page_index as u32,
85 }
86 })
87 .collect::<Vec<_>>();
88 let num_rows = page_schedulers.iter().map(|p| p.num_rows).sum();
89 Self {
90 data_type,
91 page_schedulers,
92 num_rows,
93 should_validate,
94 column_index,
95 }
96 }
97}
98
/// Cursor state for scheduling a set of row ranges across the pages of a
/// [`PrimitiveFieldScheduler`].  Each call to `schedule_next` consumes the
/// portion of the requested ranges that falls inside the current page.
#[derive(Debug)]
struct PrimitiveFieldSchedulingJob<'a> {
    scheduler: &'a PrimitiveFieldScheduler,
    // Requested ranges, expressed in column-global row coordinates
    ranges: Vec<Range<u64>>,
    // Next page to schedule from
    page_idx: usize,
    // Range currently being consumed
    range_idx: usize,
    // NOTE(review): this offset is initialized to 0 and never advanced in
    // `schedule_next`; partially-consumed ranges appear to be handled via
    // `global_row_offset` instead — looks vestigial, confirm before removing.
    range_offset: u64,
    // Column-global row offset of the start of the current page
    global_row_offset: u64,
}
108
109impl<'a> PrimitiveFieldSchedulingJob<'a> {
110 pub fn new(scheduler: &'a PrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
111 Self {
112 scheduler,
113 ranges,
114 page_idx: 0,
115 range_idx: 0,
116 range_offset: 0,
117 global_row_offset: 0,
118 }
119 }
120}
121
impl SchedulingJob for PrimitiveFieldSchedulingJob<'_> {
    /// Schedules every requested range (or portion of a range) that lands in
    /// the current page, then advances past that page.  Returns one decoder
    /// message covering the rows scheduled by this call.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        debug_assert!(self.range_idx < self.ranges.len());
        // Trim off the part of the current range already scheduled
        let mut range = self.ranges[self.range_idx].clone();
        range.start += self.range_offset;

        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range, cur_page.num_rows
        );
        // Skip any pages that end before the current range starts
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }

        // Accumulate page-relative ranges until we either run past the end of
        // the current page or run out of requested ranges
        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            range.start = range.start.max(self.global_row_offset);
            // Translate column-global coordinates into page-relative ones
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            let end_in_page = end_in_page.min(cur_page.num_rows);
            // True if the whole remainder of this range fits in this page
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;

            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                // Range fully consumed; move on to the next one (if any)
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                // Range spills into the next page; resume it on the next call
                break;
            }
        }

        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            num_rows_in_next,
            ranges_in_page.len(),
            cur_page.num_rows,
            priority.current_priority(),
            self.scheduler.column_index,
            cur_page.page_index,
        );

        // We are always done with the current page after this call
        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;

        // Kick off the I/O for the selected ranges
        let physical_decoder = cur_page.scheduler.schedule_ranges(
            &ranges_in_page,
            context.io(),
            priority.current_priority(),
        );

        let logical_decoder = PrimitiveFieldDecoder {
            data_type: self.scheduler.data_type.clone(),
            column_index: self.scheduler.column_index,
            unloaded_physical_decoder: Some(physical_decoder),
            physical_decoder: None,
            rows_drained: 0,
            num_rows: num_rows_in_next,
            should_validate: self.scheduler.should_validate,
            page_index: cur_page.page_index,
        };

        let decoder = Box::new(logical_decoder);
        #[allow(deprecated)]
        let decoder_ready = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder_ready)],
            rows_scheduled: num_rows_in_next,
        })
    }

    /// Total number of rows requested by this job across all ranges
    fn num_rows(&self) -> u64 {
        self.ranges.iter().map(|r| r.end - r.start).sum()
    }
}
213
214impl FieldScheduler for PrimitiveFieldScheduler {
215 fn num_rows(&self) -> u64 {
216 self.num_rows
217 }
218
219 fn schedule_ranges<'a>(
220 &'a self,
221 ranges: &[std::ops::Range<u64>],
222 _filter: &FilterExpression,
224 ) -> Result<Box<dyn SchedulingJob + 'a>> {
225 Ok(Box::new(PrimitiveFieldSchedulingJob::new(
226 self,
227 ranges.to_vec(),
228 )))
229 }
230
231 fn initialize<'a>(
232 &'a self,
233 _filter: &'a FilterExpression,
234 _context: &'a SchedulerContext,
235 ) -> BoxFuture<'a, Result<()>> {
236 std::future::ready(Ok(())).boxed()
238 }
239}
240
/// Logical decoder for one page's worth of a primitive column.
///
/// Starts with a future (`unloaded_physical_decoder`) that resolves when the
/// page's I/O completes; `wait_for_loaded` swaps it for the concrete
/// `physical_decoder`, after which rows can be drained.
pub struct PrimitiveFieldDecoder {
    data_type: DataType,
    // Pending I/O; `None` once awaited (or when constructed from loaded data)
    unloaded_physical_decoder: Option<BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>>,
    // Available after `wait_for_loaded` (or immediately via `new_from_data`)
    physical_decoder: Option<Arc<dyn PrimitivePageDecoder>>,
    should_validate: bool,
    num_rows: u64,
    // Number of rows already handed out via `drain`
    rows_drained: u64,
    // Column/page indices are used for trace logging and error messages only
    column_index: u32,
    page_index: u32,
}
251
252impl PrimitiveFieldDecoder {
253 pub fn new_from_data(
254 physical_decoder: Arc<dyn PrimitivePageDecoder>,
255 data_type: DataType,
256 num_rows: u64,
257 should_validate: bool,
258 ) -> Self {
259 Self {
260 data_type,
261 unloaded_physical_decoder: None,
262 physical_decoder: Some(physical_decoder),
263 should_validate,
264 num_rows,
265 rows_drained: 0,
266 column_index: u32::MAX,
267 page_index: u32::MAX,
268 }
269 }
270}
271
// Manual `Debug` impl: the decoder future/trait-object fields are not
// `Debug`, so only the summary fields are shown.
impl Debug for PrimitiveFieldDecoder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PrimitiveFieldDecoder")
            .field("data_type", &self.data_type)
            .field("num_rows", &self.num_rows)
            .field("rows_drained", &self.rows_drained)
            .finish()
    }
}
281
/// A self-contained unit of decode work: materializes `rows_to_take` rows,
/// starting `rows_to_skip` rows into the page, as an arrow array.
struct PrimitiveFieldDecodeTask {
    rows_to_skip: u64,
    rows_to_take: u64,
    // Whether to validate the decoded buffers during arrow conversion
    should_validate: bool,
    physical_decoder: Arc<dyn PrimitivePageDecoder>,
    data_type: DataType,
}
289
impl DecodeArrayTask for PrimitiveFieldDecodeTask {
    /// Decodes the requested slice of rows and converts it to an arrow array.
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        // Pull the raw data block for the requested row window
        let block = self
            .physical_decoder
            .decode(self.rows_to_skip, self.rows_to_take)?;

        let array = make_array(block.into_arrow(self.data_type.clone(), self.should_validate)?);

        // Dictionary special case: if the array has logical nulls, rebuild it
        // so those nulls are attached to the key (indices) buffer of the
        // dictionary array data.
        if let DataType::Dictionary(_, _) = self.data_type {
            let dict = array.as_any_dictionary();
            if let Some(nulls) = array.logical_nulls() {
                let new_indices = dict.keys().to_data();
                let new_array = make_array(
                    new_indices
                        .into_builder()
                        .nulls(Some(nulls))
                        .add_child_data(dict.values().to_data())
                        .data_type(dict.data_type().clone())
                        .build()?,
                );
                return Ok(new_array);
            }
        }
        Ok(array)
    }
}
322
323impl LogicalPageDecoder for PrimitiveFieldDecoder {
324 fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
327 log::trace!(
328 "primitive wait for more than {} rows on column {} and page {} (page has {} rows)",
329 loaded_need,
330 self.column_index,
331 self.page_index,
332 self.num_rows
333 );
334 async move {
335 let physical_decoder = self.unloaded_physical_decoder.take().unwrap().await?;
336 self.physical_decoder = Some(Arc::from(physical_decoder));
337 Ok(())
338 }
339 .boxed()
340 }
341
342 fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
343 if self.physical_decoder.as_ref().is_none() {
344 return Err(lance_core::Error::internal(format!(
345 "drain was called on primitive field decoder for data type {} on column {} but the decoder was never awaited",
346 self.data_type, self.column_index
347 )));
348 }
349
350 let rows_to_skip = self.rows_drained;
351 let rows_to_take = num_rows;
352
353 self.rows_drained += rows_to_take;
354
355 let task = Box::new(PrimitiveFieldDecodeTask {
356 rows_to_skip,
357 rows_to_take,
358 should_validate: self.should_validate,
359 physical_decoder: self.physical_decoder.as_ref().unwrap().clone(),
360 data_type: self.data_type.clone(),
361 });
362
363 Ok(NextDecodeTask {
364 task,
365 num_rows: rows_to_take,
366 })
367 }
368
369 fn rows_loaded(&self) -> u64 {
370 if self.unloaded_physical_decoder.is_some() {
371 0
372 } else {
373 self.num_rows
374 }
375 }
376
377 fn rows_drained(&self) -> u64 {
378 if self.unloaded_physical_decoder.is_some() {
379 0
380 } else {
381 self.rows_drained
382 }
383 }
384
385 fn num_rows(&self) -> u64 {
386 self.num_rows
387 }
388
389 fn data_type(&self) -> &DataType {
390 &self.data_type
391 }
392}
393
/// Encoder for a primitive (non-nested) field in the legacy format.
///
/// Arrays are buffered in `accumulation_queue` until enough bytes are cached,
/// then flushed into one or more encoded pages.
pub struct PrimitiveFieldEncoder {
    accumulation_queue: AccumulationQueue,
    // Strategy that picks a concrete array encoding for each flush
    array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
    column_index: u32,
    field: Field,
    // Upper bound used to split a large flush into multiple pages
    max_page_bytes: u64,
}
401
impl PrimitiveFieldEncoder {
    /// Creates an encoder that accumulates arrays up to
    /// `options.cache_bytes_per_column` before encoding them into pages.
    pub fn try_new(
        options: &EncodingOptions,
        array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
        column_index: u32,
        field: Field,
    ) -> Result<Self> {
        Ok(Self {
            accumulation_queue: AccumulationQueue::new(
                options.cache_bytes_per_column,
                column_index,
                options.keep_original_array,
            ),
            column_index,
            max_page_bytes: options.max_page_bytes,
            array_encoding_strategy,
            field,
        })
    }

    /// Spawns a background task that encodes `arrays` into a single page.
    fn create_encode_task(&mut self, arrays: Vec<ArrayRef>) -> Result<EncodeTask> {
        // Pick the encoding up front (on the caller's thread); only the actual
        // encoding work runs on the spawned task.
        let encoder = self
            .array_encoding_strategy
            .create_array_encoder(&arrays, &self.field)?;
        let column_idx = self.column_index;
        let data_type = self.field.data_type();

        Ok(tokio::task::spawn(async move {
            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
            let data = DataBlock::from_arrays(&arrays, num_values);
            let mut buffer_index = 0;
            let array = encoder.encode(data, &data_type, &mut buffer_index)?;
            let (data, description) = array.into_buffers();
            Ok(EncodedPage {
                data,
                description: PageEncoding::Legacy(description),
                num_rows: num_values,
                column_idx,
                // NOTE(review): row_number is hard-coded to 0 — presumably
                // unused by this legacy path; confirm against the page writer.
                row_number: 0, })
        })
        // Flatten the tokio JoinError (panic/cancellation) into our Result
        .map(|res_res| {
            res_res.unwrap_or_else(|err| {
                Err(Error::internal(format!(
                    "Encoding task failed with error: {:?}",
                    err
                )))
            })
        })
        .boxed())
    }

    /// Encodes the accumulated arrays.  A single large array is split into
    /// multiple parts so that no page exceeds `max_page_bytes` (approximately,
    /// based on current buffer memory size).
    fn do_flush(&mut self, arrays: Vec<ArrayRef>) -> Result<Vec<EncodeTask>> {
        if arrays.len() == 1 {
            let array = arrays.into_iter().next().unwrap();
            let size_bytes = array.get_buffer_memory_size();
            let num_parts = bit_util::ceil(size_bytes, self.max_page_bytes as usize);
            // Can't slice finer than one row per part
            let num_parts = num_parts.min(array.len());
            if num_parts <= 1 {
                // Fits in one page (or is empty); encode as-is
                Ok(vec![self.create_encode_task(vec![array])?])
            } else {
                // Slice the array into roughly equal row chunks, one task each
                let mut tasks = Vec::with_capacity(num_parts);
                let mut offset = 0;
                let part_size = bit_util::ceil(array.len(), num_parts);
                for _ in 0..num_parts {
                    let avail = array.len() - offset;
                    if avail == 0 {
                        break;
                    }
                    let chunk_size = avail.min(part_size);
                    let part = array.slice(offset, chunk_size);
                    let task = self.create_encode_task(vec![part])?;
                    tasks.push(task);
                    offset += chunk_size;
                }
                Ok(tasks)
            }
        } else {
            // Multiple arrays are always encoded together into one page
            Ok(vec![self.create_encode_task(arrays)?])
        }
    }
}
494
495impl FieldEncoder for PrimitiveFieldEncoder {
496 fn maybe_encode(
498 &mut self,
499 array: ArrayRef,
500 _external_buffers: &mut OutOfLineBuffers,
501 _repdef: RepDefBuilder,
502 row_number: u64,
503 num_rows: u64,
504 ) -> Result<Vec<EncodeTask>> {
505 if let Some(arrays) = self.accumulation_queue.insert(array, row_number, num_rows) {
506 Ok(self.do_flush(arrays.0)?)
507 } else {
508 Ok(vec![])
509 }
510 }
511
512 fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
514 if let Some(arrays) = self.accumulation_queue.flush() {
515 Ok(self.do_flush(arrays.0)?)
516 } else {
517 Ok(vec![])
518 }
519 }
520
521 fn num_columns(&self) -> u32 {
522 1
523 }
524
525 fn finish(
526 &mut self,
527 _external_buffers: &mut OutOfLineBuffers,
528 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
529 std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
530 }
531}