1use std::{fmt::Debug, ops::Range, sync::Arc, vec};
5
6use arrow::array::AsArray;
7use arrow_array::{make_array, Array, ArrayRef};
8use arrow_buffer::bit_util;
9use arrow_schema::DataType;
10use futures::{future::BoxFuture, FutureExt};
11use log::trace;
12use snafu::location;
13
14use crate::decoder::{ColumnBuffers, PageBuffers};
15use crate::utils::accumulation::AccumulationQueue;
16use crate::v2::decoder::{FieldScheduler, LogicalPageDecoder, SchedulingJob};
17use crate::v2::encoder::ArrayEncodingStrategy;
18use crate::{data::DataBlock, v2::encodings::physical::decoder_from_array_encoding};
19use lance_core::{datatypes::Field, Error, Result};
20
21use crate::{
22 decoder::{
23 DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PageEncoding, PageInfo,
24 PageScheduler, PrimitivePageDecoder, PriorityRange, ScheduledScanLine, SchedulerContext,
25 },
26 encoder::{
27 EncodeTask, EncodedColumn, EncodedPage, EncodingOptions, FieldEncoder, OutOfLineBuffers,
28 },
29 repdef::RepDefBuilder,
30};
31
/// One non-empty page of a primitive column, paired with the physical
/// scheduler that knows how to read its encoded buffers.
#[derive(Debug)]
struct PrimitivePage {
    // Physical scheduler built from this page's array encoding.
    scheduler: Box<dyn PageScheduler>,
    // Rows stored in this page (> 0; empty pages are filtered out when the
    // field scheduler is constructed).
    num_rows: u64,
    // Index of this page within the column's original page list (kept for
    // trace logging even though empty pages were dropped).
    page_index: u32,
}
38
/// Field scheduler for a primitive (non-nested) column.
///
/// Holds one [`PrimitivePage`] per non-empty page and maps requested row
/// ranges onto per-page scheduling jobs.
#[derive(Debug)]
pub struct PrimitiveFieldScheduler {
    data_type: DataType,
    // Page schedulers in page order (empty pages excluded).
    page_schedulers: Vec<PrimitivePage>,
    // Total rows across all retained pages.
    num_rows: u64,
    // Whether decoded buffers should be validated when converted to Arrow.
    should_validate: bool,
    // Column index, used for trace logging and error messages.
    column_index: u32,
}
56
57impl PrimitiveFieldScheduler {
58 pub fn new(
59 column_index: u32,
60 data_type: DataType,
61 pages: Arc<[PageInfo]>,
62 buffers: ColumnBuffers,
63 should_validate: bool,
64 ) -> Self {
65 let page_schedulers = pages
66 .iter()
67 .enumerate()
68 .filter(|(page_index, page)| {
70 log::trace!("Skipping empty page with index {}", page_index);
71 page.num_rows > 0
72 })
73 .map(|(page_index, page)| {
74 let page_buffers = PageBuffers {
75 column_buffers: buffers,
76 positions_and_sizes: &page.buffer_offsets_and_sizes,
77 };
78 let scheduler = decoder_from_array_encoding(
79 page.encoding.as_legacy(),
80 &page_buffers,
81 &data_type,
82 );
83 PrimitivePage {
84 scheduler,
85 num_rows: page.num_rows,
86 page_index: page_index as u32,
87 }
88 })
89 .collect::<Vec<_>>();
90 let num_rows = page_schedulers.iter().map(|p| p.num_rows).sum();
91 Self {
92 data_type,
93 page_schedulers,
94 num_rows,
95 should_validate,
96 column_index,
97 }
98 }
99}
100
/// Cursor-style job that walks the requested `ranges` across the scheduler's
/// pages, scheduling one page per `schedule_next` call.
#[derive(Debug)]
struct PrimitiveFieldSchedulingJob<'a> {
    scheduler: &'a PrimitiveFieldScheduler,
    // Requested row ranges, in column-relative row offsets.
    ranges: Vec<Range<u64>>,
    // Index of the next page to schedule.
    page_idx: usize,
    // Index of the range currently being serviced.
    range_idx: usize,
    // Offset applied to the current range's start (rows of it already
    // handled); initialized to 0 by `new`.
    range_offset: u64,
    // Column-relative row offset of the start of page `page_idx`.
    global_row_offset: u64,
}
110
111impl<'a> PrimitiveFieldSchedulingJob<'a> {
112 pub fn new(scheduler: &'a PrimitiveFieldScheduler, ranges: Vec<Range<u64>>) -> Self {
113 Self {
114 scheduler,
115 ranges,
116 page_idx: 0,
117 range_idx: 0,
118 range_offset: 0,
119 global_row_offset: 0,
120 }
121 }
122}
123
impl SchedulingJob for PrimitiveFieldSchedulingJob<'_> {
    /// Schedules I/O for the next page worth of requested rows.
    ///
    /// Each call consumes exactly one page: it gathers every requested range
    /// (or the fragment of a range) that overlaps the current page, hands the
    /// page-relative ranges to the page's physical scheduler, and returns a
    /// scan line carrying a logical decoder for those rows.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        debug_assert!(self.range_idx < self.ranges.len());
        // Start from the current range, skipping any prefix already handled.
        let mut range = self.ranges[self.range_idx].clone();
        range.start += self.range_offset;

        let mut cur_page = &self.scheduler.page_schedulers[self.page_idx];
        trace!(
            "Current range is {:?} and current page has {} rows",
            range,
            cur_page.num_rows
        );
        // Advance past pages that end at or before the range's start.
        while cur_page.num_rows + self.global_row_offset <= range.start {
            self.global_row_offset += cur_page.num_rows;
            self.page_idx += 1;
            trace!("Skipping entire page of {} rows", cur_page.num_rows);
            cur_page = &self.scheduler.page_schedulers[self.page_idx];
        }

        // Collect every range fragment that lands in `cur_page`, converted to
        // page-relative row offsets.
        let mut ranges_in_page = Vec::new();
        while cur_page.num_rows + self.global_row_offset > range.start {
            // Clamp to the page start (a range may have begun in an earlier page).
            range.start = range.start.max(self.global_row_offset);
            let start_in_page = range.start - self.global_row_offset;
            let end_in_page = start_in_page + (range.end - range.start);
            // Clamp to the page end; any remainder is scheduled on a later call.
            let end_in_page = end_in_page.min(cur_page.num_rows);
            let last_in_range = (end_in_page + self.global_row_offset) >= range.end;

            ranges_in_page.push(start_in_page..end_in_page);
            if last_in_range {
                // Range fully satisfied; move to the next range (it may also
                // overlap this same page).
                self.range_idx += 1;
                if self.range_idx == self.ranges.len() {
                    break;
                }
                range = self.ranges[self.range_idx].clone();
            } else {
                // Range spills into the next page; finish this page first.
                break;
            }
        }

        let num_rows_in_next = ranges_in_page.iter().map(|r| r.end - r.start).sum();
        trace!(
            "Scheduling {} rows across {} ranges from page with {} rows (priority={}, column_index={}, page_index={})",
            num_rows_in_next,
            ranges_in_page.len(),
            cur_page.num_rows,
            priority.current_priority(),
            self.scheduler.column_index,
            cur_page.page_index,
        );

        // This page is done; the next call starts at the following page.
        self.global_row_offset += cur_page.num_rows;
        self.page_idx += 1;

        // Kick off the physical reads for the selected page-relative ranges.
        let physical_decoder = cur_page.scheduler.schedule_ranges(
            &ranges_in_page,
            context.io(),
            priority.current_priority(),
        );

        // Wrap the (not yet awaited) physical decoder in a logical decoder
        // sized to exactly the rows scheduled from this page.
        let logical_decoder = PrimitiveFieldDecoder {
            data_type: self.scheduler.data_type.clone(),
            column_index: self.scheduler.column_index,
            unloaded_physical_decoder: Some(physical_decoder),
            physical_decoder: None,
            rows_drained: 0,
            num_rows: num_rows_in_next,
            should_validate: self.scheduler.should_validate,
            page_index: cur_page.page_index,
        };

        let decoder = Box::new(logical_decoder);
        #[allow(deprecated)]
        let decoder_ready = context.locate_decoder(decoder);
        Ok(ScheduledScanLine {
            decoders: vec![MessageType::DecoderReady(decoder_ready)],
            rows_scheduled: num_rows_in_next,
        })
    }

    /// Total rows requested across all ranges of this job.
    fn num_rows(&self) -> u64 {
        self.ranges.iter().map(|r| r.end - r.start).sum()
    }
}
216
217impl FieldScheduler for PrimitiveFieldScheduler {
218 fn num_rows(&self) -> u64 {
219 self.num_rows
220 }
221
222 fn schedule_ranges<'a>(
223 &'a self,
224 ranges: &[std::ops::Range<u64>],
225 _filter: &FilterExpression,
227 ) -> Result<Box<dyn SchedulingJob + 'a>> {
228 Ok(Box::new(PrimitiveFieldSchedulingJob::new(
229 self,
230 ranges.to_vec(),
231 )))
232 }
233
234 fn initialize<'a>(
235 &'a self,
236 _filter: &'a FilterExpression,
237 _context: &'a SchedulerContext,
238 ) -> BoxFuture<'a, Result<()>> {
239 std::future::ready(Ok(())).boxed()
241 }
242}
243
/// Logical decoder for one page (or one pre-loaded block) of a primitive
/// column.
///
/// Starts with a pending future in `unloaded_physical_decoder`;
/// `wait_for_loaded` resolves it into `physical_decoder`, after which rows
/// can be drained.
pub struct PrimitiveFieldDecoder {
    data_type: DataType,
    // Pending I/O: resolves to the physical decoder once scheduled reads land.
    unloaded_physical_decoder: Option<BoxFuture<'static, Result<Box<dyn PrimitivePageDecoder>>>>,
    // Set once the future above is awaited (or supplied directly by
    // `new_from_data`).
    physical_decoder: Option<Arc<dyn PrimitivePageDecoder>>,
    // Whether to validate buffers when converting to Arrow.
    should_validate: bool,
    // Total rows this decoder covers.
    num_rows: u64,
    // Rows already handed out via `drain`.
    rows_drained: u64,
    // u32::MAX when constructed via `new_from_data` (no real column/page).
    column_index: u32,
    page_index: u32,
}
254
255impl PrimitiveFieldDecoder {
256 pub fn new_from_data(
257 physical_decoder: Arc<dyn PrimitivePageDecoder>,
258 data_type: DataType,
259 num_rows: u64,
260 should_validate: bool,
261 ) -> Self {
262 Self {
263 data_type,
264 unloaded_physical_decoder: None,
265 physical_decoder: Some(physical_decoder),
266 should_validate,
267 num_rows,
268 rows_drained: 0,
269 column_index: u32::MAX,
270 page_index: u32::MAX,
271 }
272 }
273}
274
275impl Debug for PrimitiveFieldDecoder {
276 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
277 f.debug_struct("PrimitiveFieldDecoder")
278 .field("data_type", &self.data_type)
279 .field("num_rows", &self.num_rows)
280 .field("rows_drained", &self.rows_drained)
281 .finish()
282 }
283}
284
/// One-shot task that decodes a contiguous slice of rows from a physical
/// decoder into an Arrow array.
struct PrimitiveFieldDecodeTask {
    // Rows consumed by earlier drains of the same page.
    rows_to_skip: u64,
    // Rows this task will decode.
    rows_to_take: u64,
    // Whether to validate the decoded buffers when building the Arrow array.
    should_validate: bool,
    physical_decoder: Arc<dyn PrimitivePageDecoder>,
    data_type: DataType,
}
292
impl DecodeArrayTask for PrimitiveFieldDecodeTask {
    /// Decodes the task's row slice and converts it into an Arrow array.
    fn decode(self: Box<Self>) -> Result<ArrayRef> {
        // Decode the requested slice of rows into a data block...
        let block = self
            .physical_decoder
            .decode(self.rows_to_skip, self.rows_to_take)?;

        // ...and convert it to an Arrow array (validating if requested).
        let array = make_array(block.into_arrow(self.data_type.clone(), self.should_validate)?);

        // For dictionary arrays, rebuild the array so its logical nulls are
        // attached directly to the key (indices) data; the conversion above
        // does not guarantee validity lives on the keys.
        if let DataType::Dictionary(_, _) = self.data_type {
            let dict = array.as_any_dictionary();
            if let Some(nulls) = array.logical_nulls() {
                let new_indices = dict.keys().to_data();
                let new_array = make_array(
                    new_indices
                        .into_builder()
                        .nulls(Some(nulls))
                        .add_child_data(dict.values().to_data())
                        .data_type(dict.data_type().clone())
                        .build()?,
                );
                return Ok(new_array);
            }
        }
        Ok(array)
    }
}
325
326impl LogicalPageDecoder for PrimitiveFieldDecoder {
327 fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<Result<()>> {
330 log::trace!(
331 "primitive wait for more than {} rows on column {} and page {} (page has {} rows)",
332 loaded_need,
333 self.column_index,
334 self.page_index,
335 self.num_rows
336 );
337 async move {
338 let physical_decoder = self.unloaded_physical_decoder.take().unwrap().await?;
339 self.physical_decoder = Some(Arc::from(physical_decoder));
340 Ok(())
341 }
342 .boxed()
343 }
344
345 fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
346 if self.physical_decoder.as_ref().is_none() {
347 return Err(lance_core::Error::Internal {
348 message: format!("drain was called on primitive field decoder for data type {} on column {} but the decoder was never awaited", self.data_type, self.column_index),
349 location: location!(),
350 });
351 }
352
353 let rows_to_skip = self.rows_drained;
354 let rows_to_take = num_rows;
355
356 self.rows_drained += rows_to_take;
357
358 let task = Box::new(PrimitiveFieldDecodeTask {
359 rows_to_skip,
360 rows_to_take,
361 should_validate: self.should_validate,
362 physical_decoder: self.physical_decoder.as_ref().unwrap().clone(),
363 data_type: self.data_type.clone(),
364 });
365
366 Ok(NextDecodeTask {
367 task,
368 num_rows: rows_to_take,
369 })
370 }
371
372 fn rows_loaded(&self) -> u64 {
373 if self.unloaded_physical_decoder.is_some() {
374 0
375 } else {
376 self.num_rows
377 }
378 }
379
380 fn rows_drained(&self) -> u64 {
381 if self.unloaded_physical_decoder.is_some() {
382 0
383 } else {
384 self.rows_drained
385 }
386 }
387
388 fn num_rows(&self) -> u64 {
389 self.num_rows
390 }
391
392 fn data_type(&self) -> &DataType {
393 &self.data_type
394 }
395}
396
/// Field encoder for a primitive (non-nested) column.
///
/// Buffers incoming arrays in an accumulation queue and flushes them as
/// background encode tasks, splitting oversized arrays into multiple pages.
pub struct PrimitiveFieldEncoder {
    // Buffers arrays until enough bytes accumulate to flush a page.
    accumulation_queue: AccumulationQueue,
    // Chooses the concrete array encoding for each flushed batch.
    array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
    column_index: u32,
    field: Field,
    // Upper bound (in bytes) for a single encoded page.
    max_page_bytes: u64,
}
404
impl PrimitiveFieldEncoder {
    /// Creates an encoder for one primitive column.
    ///
    /// `options.cache_bytes_per_column` controls how much data is buffered
    /// before a flush; `options.max_page_bytes` caps the size of each page.
    pub fn try_new(
        options: &EncodingOptions,
        array_encoding_strategy: Arc<dyn ArrayEncodingStrategy>,
        column_index: u32,
        field: Field,
    ) -> Result<Self> {
        Ok(Self {
            accumulation_queue: AccumulationQueue::new(
                options.cache_bytes_per_column,
                column_index,
                options.keep_original_array,
            ),
            column_index,
            max_page_bytes: options.max_page_bytes,
            array_encoding_strategy,
            field,
        })
    }

    // Spawns a background task that encodes `arrays` into a single page.
    fn create_encode_task(&mut self, arrays: Vec<ArrayRef>) -> Result<EncodeTask> {
        // Choose the encoding on the caller's thread so strategy errors
        // surface immediately rather than inside the spawned task.
        let encoder = self
            .array_encoding_strategy
            .create_array_encoder(&arrays, &self.field)?;
        let column_idx = self.column_index;
        let data_type = self.field.data_type();

        Ok(tokio::task::spawn(async move {
            let num_values = arrays.iter().map(|arr| arr.len() as u64).sum();
            let data = DataBlock::from_arrays(&arrays, num_values);
            let mut buffer_index = 0;
            let array = encoder.encode(data, &data_type, &mut buffer_index)?;
            let (data, description) = array.into_buffers();
            Ok(EncodedPage {
                data,
                description: PageEncoding::Legacy(description),
                num_rows: num_values,
                column_idx,
                // NOTE(review): row_number appears unused by this legacy
                // encoding path — confirm 0 is the intended placeholder.
                row_number: 0, })
        })
        .map(|res_res| {
            // Flatten the JoinError (task panic or cancellation) into our
            // internal error type so callers see a single Result level.
            res_res.unwrap_or_else(|err| {
                Err(Error::Internal {
                    message: format!("Encoding task failed with error: {:?}", err),
                    location: location!(),
                })
            })
        })
        .boxed())
    }

    // Turns buffered arrays into one or more encode tasks, splitting a single
    // large array into multiple pages when it exceeds `max_page_bytes`.
    fn do_flush(&mut self, arrays: Vec<ArrayRef>) -> Result<Vec<EncodeTask>> {
        if arrays.len() == 1 {
            let array = arrays.into_iter().next().unwrap();
            let size_bytes = array.get_buffer_memory_size();
            // Pages needed to keep each under max_page_bytes, capped so every
            // page holds at least one row.
            let num_parts = bit_util::ceil(size_bytes, self.max_page_bytes as usize);
            let num_parts = num_parts.min(array.len());
            if num_parts <= 1 {
                // Fits in one page (or is empty): encode as-is.
                Ok(vec![self.create_encode_task(vec![array])?])
            } else {
                // Slice into roughly equal chunks, one encode task each.
                let mut tasks = Vec::with_capacity(num_parts);
                let mut offset = 0;
                let part_size = bit_util::ceil(array.len(), num_parts);
                for _ in 0..num_parts {
                    let avail = array.len() - offset;
                    if avail == 0 {
                        break;
                    }
                    let chunk_size = avail.min(part_size);
                    let part = array.slice(offset, chunk_size);
                    let task = self.create_encode_task(vec![part])?;
                    tasks.push(task);
                    offset += chunk_size;
                }
                Ok(tasks)
            }
        } else {
            // Multiple buffered arrays are encoded together as one page.
            Ok(vec![self.create_encode_task(arrays)?])
        }
    }
}
497
498impl FieldEncoder for PrimitiveFieldEncoder {
499 fn maybe_encode(
501 &mut self,
502 array: ArrayRef,
503 _external_buffers: &mut OutOfLineBuffers,
504 _repdef: RepDefBuilder,
505 row_number: u64,
506 num_rows: u64,
507 ) -> Result<Vec<EncodeTask>> {
508 if let Some(arrays) = self.accumulation_queue.insert(array, row_number, num_rows) {
509 Ok(self.do_flush(arrays.0)?)
510 } else {
511 Ok(vec![])
512 }
513 }
514
515 fn flush(&mut self, _external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
517 if let Some(arrays) = self.accumulation_queue.flush() {
518 Ok(self.do_flush(arrays.0)?)
519 } else {
520 Ok(vec![])
521 }
522 }
523
524 fn num_columns(&self) -> u32 {
525 1
526 }
527
528 fn finish(
529 &mut self,
530 _external_buffers: &mut OutOfLineBuffers,
531 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
532 std::future::ready(Ok(vec![EncodedColumn::default()])).boxed()
533 }
534}