use super::*;

use crate::gather::{
    RowLocator, filter_rows_with_non_null as shared_filter_rows_with_non_null,
    gather_rows_from_chunks as shared_gather_rows_from_chunks,
    gather_rows_from_chunks_binary as shared_gather_rows_from_chunks_binary,
    gather_rows_from_chunks_bool as shared_gather_rows_from_chunks_bool,
    gather_rows_from_chunks_string as shared_gather_rows_from_chunks_string,
    gather_rows_from_chunks_struct as shared_gather_rows_from_chunks_struct,
    gather_rows_single_shot as shared_gather_rows_single_shot,
    gather_rows_single_shot_binary as shared_gather_rows_single_shot_binary,
    gather_rows_single_shot_bool as shared_gather_rows_single_shot_bool,
    gather_rows_single_shot_string as shared_gather_rows_single_shot_string,
    gather_rows_single_shot_struct as shared_gather_rows_single_shot_struct,
};
use crate::store::descriptor::{ChunkMetadata, ColumnDescriptor, DescriptorIterator};
use crate::types::{LogicalFieldId, RowId};
use arrow::array::{ArrayRef, OffsetSizeTrait, new_empty_array};
use arrow::datatypes::{ArrowPrimitiveType, DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use llkv_result::{Error, Result};
use llkv_storage::{
    pager::{BatchGet, GetResult, Pager},
    serialization::deserialize_array,
    types::PhysicalKey,
};
use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::borrow::Cow;
use std::sync::Arc;

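/// How a gather treats requested row ids that have no stored value.
///
/// A minimal sketch of selecting a policy (assuming `store` is a
/// `ColumnStore` and `fid` is a `LogicalFieldId` with data present):
///
/// ```ignore
/// let batch = store.gather_rows(&[fid], &[1, 2, 3], GatherNullPolicy::IncludeNulls)?;
/// ```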
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GatherNullPolicy {
    /// Fail the gather if any requested row id is missing from a column.
    ErrorOnMissing,
    /// Emit a null for each missing value.
    IncludeNulls,
    /// Emit nulls while gathering, then filter null rows out of the final batch.
    DropNulls,
}

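/// A column to gather: a `LogicalFieldId` plus an optional output alias.
///
/// A minimal sketch of the supported construction forms (assuming `fid` is a
/// `LogicalFieldId`):
///
/// ```ignore
/// let plain = Projection::new(fid);
/// let aliased = Projection::with_alias(fid, "total");
/// let from_fid: Projection = fid.into();
/// let from_pair: Projection = (fid, "total").into();
/// ```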
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Projection {
    pub logical_field_id: LogicalFieldId,
    pub alias: Option<String>,
}

impl Projection {
    pub fn new(logical_field_id: LogicalFieldId) -> Self {
        Self {
            logical_field_id,
            alias: None,
        }
    }

    pub fn with_alias<S: Into<String>>(logical_field_id: LogicalFieldId, alias: S) -> Self {
        Self {
            logical_field_id,
            alias: Some(alias.into()),
        }
    }
}

impl From<LogicalFieldId> for Projection {
    fn from(logical_field_id: LogicalFieldId) -> Self {
        Projection::new(logical_field_id)
    }
}

impl<S: Into<String>> From<(LogicalFieldId, S)> for Projection {
    fn from(value: (LogicalFieldId, S)) -> Self {
        Projection::with_alias(value.0, value.1)
    }
}

impl GatherNullPolicy {
    /// Whether missing row ids are tolerated (as nulls) instead of an error.
    #[inline]
    fn allow_missing(self) -> bool {
        !matches!(self, Self::ErrorOnMissing)
    }
}

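/// Reusable state for repeated multi-column gathers over the same field set.
///
/// Caches per-field chunk plans, deserialized chunks, and scratch buffers so
/// that repeated calls to `gather_rows_with_reusable_context` skip descriptor
/// reads and avoid re-fetching chunks already seen.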
pub struct MultiGatherContext {
    field_infos: Vec<(LogicalFieldId, DataType)>,
    fields: Vec<Field>,
    plans: Vec<FieldPlan>,
    /// Deserialized chunks keyed by their physical storage key.
    chunk_cache: FxHashMap<PhysicalKey, ArrayRef>,
    /// Maps a row id to its position in the caller's `row_ids` slice.
    row_index: FxHashMap<u64, usize>,
    /// Per-row scratch reused by the gather kernels.
    row_scratch: Vec<Option<(usize, usize)>>,
    chunk_keys: Vec<PhysicalKey>,
}

impl MultiGatherContext {
    fn new(field_infos: Vec<(LogicalFieldId, DataType)>, plans: Vec<FieldPlan>) -> Self {
        let fields: Vec<Field> = field_infos
            .iter()
            .map(|(fid, dtype)| {
                let field_name = format!("field_{}", u64::from(*fid));
                Field::new(field_name, dtype.clone(), true)
            })
            .collect();

        Self {
            chunk_cache: FxHashMap::default(),
            row_index: FxHashMap::default(),
            row_scratch: Vec::new(),
            chunk_keys: Vec::new(),
            field_infos,
            fields,
            plans,
        }
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.plans.is_empty()
    }

    #[inline]
    fn field_infos(&self) -> &[(LogicalFieldId, DataType)] {
        &self.field_infos
    }

    #[inline]
    fn fields(&self) -> &[Field] {
        &self.fields
    }

    #[inline]
    fn plans(&self) -> &[FieldPlan] {
        &self.plans
    }

    #[inline]
    fn chunk_cache(&self) -> &FxHashMap<PhysicalKey, ArrayRef> {
        &self.chunk_cache
    }

    #[inline]
    fn chunk_cache_mut(&mut self) -> &mut FxHashMap<PhysicalKey, ArrayRef> {
        &mut self.chunk_cache
    }

    #[inline]
    fn plans_mut(&mut self) -> &mut [FieldPlan] {
        &mut self.plans
    }

    fn take_chunk_keys(&mut self) -> Vec<PhysicalKey> {
        std::mem::take(&mut self.chunk_keys)
    }

    fn store_chunk_keys(&mut self, keys: Vec<PhysicalKey>) {
        self.chunk_keys = keys;
    }

    fn take_row_index(&mut self) -> FxHashMap<u64, usize> {
        std::mem::take(&mut self.row_index)
    }

    fn store_row_index(&mut self, row_index: FxHashMap<u64, usize>) {
        self.row_index = row_index;
    }

    fn take_row_scratch(&mut self) -> Vec<Option<(usize, usize)>> {
        std::mem::take(&mut self.row_scratch)
    }

    fn store_row_scratch(&mut self, scratch: Vec<Option<(usize, usize)>>) {
        self.row_scratch = scratch;
    }

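    /// Find the chunk that contains `row_id` and return `(chunk_index,
    /// span_min, span_max)`, where the span is the union of the row-id ranges
    /// covered by chunk `chunk_index` across all fields in this context.
    /// Returns `None` when no chunk contains `row_id`.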
    pub fn chunk_span_for_row(&self, row_id: RowId) -> Option<(usize, RowId, RowId)> {
        let first_plan = self.plans.first()?;
        // Fast path: scan the first field's chunk metadata for a range that
        // contains `row_id`.
        let mut chunk_idx = None;
        for (idx, meta) in first_plan.row_metas.iter().enumerate() {
            if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
                chunk_idx = Some(idx);
                break;
            }
        }
        // Fallback: check the same chunk position across every plan.
        if chunk_idx.is_none() {
            let total_chunks = first_plan.row_metas.len();
            'outer: for idx in 0..total_chunks {
                for plan in &self.plans {
                    let meta = &plan.row_metas[idx];
                    if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
                        chunk_idx = Some(idx);
                        break 'outer;
                    }
                }
            }
        }
        let idx = chunk_idx?;

        // Widen the span to cover chunk `idx`'s row-id range in every plan.
        let mut span_min = u64::MAX;
        let mut span_max = 0u64;
        for plan in &self.plans {
            let meta = plan.row_metas.get(idx)?;
            span_min = span_min.min(meta.min_val_u64);
            span_max = span_max.max(meta.max_val_u64);
        }

        if span_min > span_max {
            return None;
        }

        Some((idx, span_min, span_max))
    }
}

/// Per-field gather plan: the column's dtype plus aligned chunk metadata for
/// its value column and paired row-id column.
#[derive(Clone, Debug)]
struct FieldPlan {
    dtype: DataType,
    value_metas: Vec<ChunkMetadata>,
    /// Indexes align with `value_metas`; a mismatch is rejected at plan time.
    row_metas: Vec<ChunkMetadata>,
    /// Positions of chunks whose row-id range intersects the current request.
    candidate_indices: Vec<usize>,
}

impl<P> ColumnStore<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    pub fn gather_rows(
        &self,
        field_ids: &[LogicalFieldId],
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        self.gather_rows_with_schema(field_ids, row_ids, policy, None)
    }

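    /// Like [`Self::gather_rows`], but an `expected_schema`, when provided,
    /// is used verbatim for the zero-row result instead of a schema derived
    /// from the projected fields.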
    pub fn gather_rows_with_schema(
        &self,
        field_ids: &[LogicalFieldId],
        row_ids: &[u64],
        policy: GatherNullPolicy,
        expected_schema: Option<Arc<Schema>>,
    ) -> Result<RecordBatch> {
        let mut ctx = self.prepare_gather_context(field_ids)?;
        self.execute_gather_single_pass_with_schema(&mut ctx, row_ids, policy, expected_schema)
    }

    fn execute_gather_single_pass_with_schema(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
        expected_schema: Option<Arc<Schema>>,
    ) -> Result<RecordBatch> {
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        let field_infos = ctx.field_infos().to_vec();

        // Zero requested rows: return an empty batch, honoring the caller's
        // expected schema when one was supplied.
        if row_ids.is_empty() {
            let (schema, arrays) = if let Some(expected_schema) = expected_schema {
                let mut arrays = Vec::with_capacity(expected_schema.fields().len());
                for field in expected_schema.fields() {
                    arrays.push(new_empty_array(field.data_type()));
                }
                (expected_schema, arrays)
            } else {
                let fields = ctx.fields();
                let mut arrays = Vec::with_capacity(fields.len());
                for field in fields {
                    arrays.push(new_empty_array(field.data_type()));
                }
                (Arc::new(Schema::new(fields.to_vec())), arrays)
            };
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Map each row id to its output position; duplicates are rejected.
        let mut row_index: FxHashMap<u64, usize> =
            FxHashMap::with_capacity_and_hasher(row_ids.len(), Default::default());
        for (idx, &row_id) in row_ids.iter().enumerate() {
            if row_index.insert(row_id, idx).is_some() {
                return Err(Error::Internal(
                    "duplicate row_id in gather_rows_multi".into(),
                ));
            }
        }

        let mut sorted_row_ids = row_ids.to_vec();
        sorted_row_ids.sort_unstable();

        // Collect the physical keys of every chunk whose row-id range
        // intersects the request.
        let mut chunk_keys: FxHashSet<PhysicalKey> = FxHashSet::default();
        {
            let plans_mut = ctx.plans_mut();
            for plan in plans_mut.iter_mut() {
                plan.candidate_indices.clear();
                for (idx, meta) in plan.row_metas.iter().enumerate() {
                    if Self::chunk_intersects(&sorted_row_ids, meta) {
                        plan.candidate_indices.push(idx);
                        chunk_keys.insert(plan.value_metas[idx].chunk_pk);
                        chunk_keys.insert(plan.row_metas[idx].chunk_pk);
                    }
                }
            }
        }

        let mut chunk_requests = Vec::with_capacity(chunk_keys.len());
        for &key in &chunk_keys {
            chunk_requests.push(BatchGet::Raw { key });
        }

        // Fetch all candidate chunks in one batched pager read.
        let mut chunk_map: FxHashMap<PhysicalKey, EntryHandle> =
            FxHashMap::with_capacity_and_hasher(chunk_requests.len(), Default::default());
        if !chunk_requests.is_empty() {
            let chunk_results = self.pager.batch_get(&chunk_requests)?;
            for result in chunk_results {
                if let GetResult::Raw { key, bytes } = result {
                    chunk_map.insert(key, bytes);
                }
            }
        }

        let allow_missing = policy.allow_missing();

        // Gather each field with the dtype-appropriate kernel.
        let mut outputs = Vec::with_capacity(ctx.plans().len());
        for plan in ctx.plans() {
            let array = match &plan.dtype {
                DataType::Utf8 => Self::gather_rows_single_shot_string::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeUtf8 => Self::gather_rows_single_shot_string::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Binary => Self::gather_rows_single_shot_binary::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeBinary => Self::gather_rows_single_shot_binary::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Boolean => Self::gather_rows_single_shot_bool(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Struct(_) => Self::gather_rows_single_shot_struct(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                    &plan.dtype,
                ),
                other => with_integer_arrow_type!(
                    other.clone(),
                    |ArrowTy| {
                        Self::gather_rows_single_shot::<ArrowTy>(
                            &row_index,
                            row_ids.len(),
                            plan,
                            &mut chunk_map,
                            allow_missing,
                        )
                    },
                    Err(Error::Internal(format!(
                        "gather_rows_multi: unsupported dtype {:?}",
                        other
                    ))),
                ),
            }?;
            outputs.push(array);
        }

        let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
            Self::filter_rows_with_non_null(outputs)?
        } else {
            outputs
        };

        let mut fields = Vec::with_capacity(field_infos.len());
        for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
            let array = &outputs[idx];
            let field_name = format!("field_{}", u64::from(*fid));
            let nullable = match policy {
                GatherNullPolicy::IncludeNulls => true,
                _ => array.null_count() > 0,
            };
            fields.push(Field::new(field_name, dtype.clone(), nullable));
        }

        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, outputs)
            .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
    }

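    /// Build a [`MultiGatherContext`] for `field_ids`: resolve each field's
    /// value and row-id descriptor keys from the catalog, fetch the
    /// descriptors in one batched pager read, and record per-field chunk
    /// metadata for planning subsequent gathers.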
    pub fn prepare_gather_context(
        &self,
        field_ids: &[LogicalFieldId],
    ) -> Result<MultiGatherContext> {
        let mut field_infos = Vec::with_capacity(field_ids.len());
        for &fid in field_ids {
            field_infos.push((fid, self.data_type(fid)?));
        }

        if field_infos.is_empty() {
            return Ok(MultiGatherContext::new(Vec::new(), Vec::new()));
        }

        // Resolve the descriptor keys for each value column and its paired
        // row-id column while holding the catalog lock.
        let catalog = self.catalog.read().unwrap();
        let mut key_pairs = Vec::with_capacity(field_infos.len());
        for (fid, _) in &field_infos {
            let value_pk = *catalog.map.get(fid).ok_or(Error::NotFound)?;
            let row_pk = *catalog.map.get(&rowid_fid(*fid)).ok_or(Error::NotFound)?;
            key_pairs.push((value_pk, row_pk));
        }
        drop(catalog);

        let mut descriptor_requests = Vec::with_capacity(key_pairs.len() * 2);
        for (value_pk, row_pk) in &key_pairs {
            descriptor_requests.push(BatchGet::Raw { key: *value_pk });
            descriptor_requests.push(BatchGet::Raw { key: *row_pk });
        }
        let descriptor_results = self.pager.batch_get(&descriptor_requests)?;
        let mut descriptor_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for result in descriptor_results {
            if let GetResult::Raw { key, bytes } = result {
                descriptor_map.insert(key, bytes);
            }
        }

        let mut plans = Vec::with_capacity(field_infos.len());
        for ((_, dtype), (value_pk, row_pk)) in field_infos.iter().zip(key_pairs.iter()) {
            let value_desc_blob = descriptor_map.remove(value_pk).ok_or(Error::NotFound)?;
            let value_desc = ColumnDescriptor::from_le_bytes(value_desc_blob.as_ref());
            let value_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), value_desc.head_page_pk)?;

            let row_desc_blob = descriptor_map.remove(row_pk).ok_or(Error::NotFound)?;
            let row_desc = ColumnDescriptor::from_le_bytes(row_desc_blob.as_ref());
            let row_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), row_desc.head_page_pk)?;

            if value_metas.len() != row_metas.len() {
                return Err(Error::Internal(
                    "gather_rows_multi: chunk count mismatch".into(),
                ));
            }

            plans.push(FieldPlan {
                dtype: dtype.clone(),
                value_metas,
                row_metas,
                candidate_indices: Vec::new(),
            });
        }

        Ok(MultiGatherContext::new(field_infos, plans))
    }

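    /// Gather rows using a context prepared by
    /// [`Self::prepare_gather_context`], reusing its chunk cache and scratch
    /// buffers across calls.
    ///
    /// A minimal reuse loop, as a sketch (assuming `store`, `field_ids`, and
    /// `batches` of row ids exist):
    ///
    /// ```ignore
    /// let mut ctx = store.prepare_gather_context(&field_ids)?;
    /// for row_ids in &batches {
    ///     let batch = store.gather_rows_with_reusable_context(
    ///         &mut ctx,
    ///         row_ids,
    ///         GatherNullPolicy::IncludeNulls,
    ///     )?;
    ///     // ... consume `batch` ...
    /// }
    /// ```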
    pub fn gather_rows_with_reusable_context(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        if row_ids.is_empty() {
            let mut arrays = Vec::with_capacity(ctx.field_infos().len());
            let mut fields = Vec::with_capacity(ctx.field_infos().len());
            for (fid, dtype) in ctx.field_infos() {
                arrays.push(new_empty_array(dtype));
                let field_name = format!("field_{}", u64::from(*fid));
                fields.push(Field::new(field_name, dtype.clone(), true));
            }
            let schema = Arc::new(Schema::new(fields));
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Move the reusable buffers out of the context so they can be
        // mutated alongside `ctx` inside the closure below; they are stored
        // back unconditionally before returning.
        let mut row_index = ctx.take_row_index();
        let mut row_scratch = ctx.take_row_scratch();

        let field_infos = ctx.field_infos().to_vec();
        let mut chunk_keys = ctx.take_chunk_keys();

        let result: Result<RecordBatch> = (|| {
            let len = row_ids.len();
            if row_scratch.len() < len {
                row_scratch.resize(len, None);
            }

            // Skip the sort when the caller already passed row ids in order.
            let is_non_decreasing = len <= 1 || row_ids.windows(2).all(|w| w[0] <= w[1]);
            let sorted_row_ids_cow: Cow<'_, [u64]> = if is_non_decreasing {
                Cow::Borrowed(row_ids)
            } else {
                let mut buf = row_ids.to_vec();
                buf.sort_unstable();
                Cow::Owned(buf)
            };
            let sorted_row_ids: &[u64] = sorted_row_ids_cow.as_ref();

            // A dense run of consecutive row ids can be located with offset
            // arithmetic instead of a hash lookup per row.
            let dense_base = if len == 0 {
                None
            } else if len == 1
                || (is_non_decreasing && row_ids.windows(2).all(|w| w[1] == w[0] + 1))
            {
                Some(row_ids[0])
            } else {
                None
            };

            if dense_base.is_none() {
                row_index.clear();
                row_index.reserve(len);
                for (idx, &row_id) in row_ids.iter().enumerate() {
                    if row_index.insert(row_id, idx).is_some() {
                        return Err(Error::Internal(
                            "duplicate row_id in gather_rows_multi".into(),
                        ));
                    }
                }
            } else {
                row_index.clear();
            }

            let row_locator = if let Some(base) = dense_base {
                RowLocator::Dense { base }
            } else {
                RowLocator::Sparse { index: &row_index }
            };

            chunk_keys.clear();

            {
                let plans_mut = ctx.plans_mut();
                for plan in plans_mut.iter_mut() {
                    plan.candidate_indices.clear();
                    for (idx, meta) in plan.row_metas.iter().enumerate() {
                        if Self::chunk_intersects(sorted_row_ids, meta) {
                            plan.candidate_indices.push(idx);
                            chunk_keys.push(plan.value_metas[idx].chunk_pk);
                            chunk_keys.push(plan.row_metas[idx].chunk_pk);
                        }
                    }
                }
            }

            chunk_keys.sort_unstable();
            chunk_keys.dedup();

            // Fetch only chunks that are not already cached, then deserialize
            // and cache them for later calls.
            {
                let mut pending: Vec<BatchGet> = Vec::new();
                {
                    let cache = ctx.chunk_cache();
                    for &key in &chunk_keys {
                        if !cache.contains_key(&key) {
                            pending.push(BatchGet::Raw { key });
                        }
                    }
                }

                if !pending.is_empty() {
                    let chunk_results = self.pager.batch_get(&pending)?;
                    let cache = ctx.chunk_cache_mut();
                    for result in chunk_results {
                        if let GetResult::Raw { key, bytes } = result {
                            let array = deserialize_array(bytes)?;
                            cache.insert(key, Arc::clone(&array));
                        }
                    }
                }
            }

            let allow_missing = policy.allow_missing();

            let mut outputs = Vec::with_capacity(ctx.plans().len());
            for plan in ctx.plans() {
                let array = match &plan.dtype {
                    DataType::Utf8 => Self::gather_rows_from_chunks_string::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeUtf8 => Self::gather_rows_from_chunks_string::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Binary => Self::gather_rows_from_chunks_binary::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeBinary => Self::gather_rows_from_chunks_binary::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Boolean => Self::gather_rows_from_chunks_bool(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Struct(_) => Self::gather_rows_from_chunks_struct(
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                        &plan.dtype,
                    ),
                    other => with_integer_arrow_type!(
                        other.clone(),
                        |ArrowTy| {
                            Self::gather_rows_from_chunks::<ArrowTy>(
                                row_ids,
                                row_locator,
                                len,
                                &plan.candidate_indices,
                                plan,
                                ctx.chunk_cache(),
                                &mut row_scratch,
                                allow_missing,
                            )
                        },
                        Err(Error::Internal(format!(
                            "gather_rows_multi: unsupported dtype {:?}",
                            other
                        ))),
                    ),
                }?;
                outputs.push(array);
            }

            let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
                Self::filter_rows_with_non_null(outputs)?
            } else {
                outputs
            };

            let mut fields = Vec::with_capacity(field_infos.len());
            for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
                let array = &outputs[idx];
                let field_name = format!("field_{}", u64::from(*fid));
                let nullable = match policy {
                    GatherNullPolicy::IncludeNulls => true,
                    _ => array.null_count() > 0,
                };
                fields.push(Field::new(field_name, dtype.clone(), nullable));
            }

            let schema = Arc::new(Schema::new(fields));
            RecordBatch::try_new(schema, outputs)
                .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
        })();

        // Return the reusable buffers to the context regardless of outcome.
        ctx.store_row_scratch(row_scratch);
        ctx.store_row_index(row_index);
        ctx.store_chunk_keys(chunk_keys);

        result
    }

    /// Collect chunk metadata from a descriptor chain, skipping empty chunks.
    fn collect_non_empty_metas(pager: &P, head_page_pk: PhysicalKey) -> Result<Vec<ChunkMetadata>> {
        let mut metas = Vec::new();
        if head_page_pk == 0 {
            return Ok(metas);
        }
        for meta in DescriptorIterator::new(pager, head_page_pk) {
            let meta = meta?;
            if meta.row_count > 0 {
                metas.push(meta);
            }
        }
        Ok(metas)
    }

    #[inline]
    fn chunk_intersects(sorted_row_ids: &[u64], meta: &ChunkMetadata) -> bool {
        if sorted_row_ids.is_empty() || meta.row_count == 0 {
            return false;
        }
        let min = meta.min_val_u64;
        let max = meta.max_val_u64;
        // A non-empty chunk with zeroed or inverted bounds has no usable
        // range metadata; conservatively treat it as a potential match.
        if min == 0 && max == 0 && meta.row_count > 0 {
            return true;
        }
        if min > max {
            return true;
        }
        let min_req = sorted_row_ids[0];
        let max_req = *sorted_row_ids.last().unwrap();
        if max < min_req || min > max_req {
            return false;
        }
        // Binary-search for the first requested id >= min and check that it
        // falls within the chunk's range.
        let idx = sorted_row_ids.partition_point(|&rid| rid < min);
        idx < sorted_row_ids.len() && sorted_row_ids[idx] <= max
    }

    fn gather_rows_single_shot_string<O>(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        shared_gather_rows_single_shot_string::<O>(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn gather_rows_single_shot_bool(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef> {
        shared_gather_rows_single_shot_bool(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
        )
    }

    fn gather_rows_single_shot_struct(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
        dtype: &DataType,
    ) -> Result<ArrayRef> {
        shared_gather_rows_single_shot_struct(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
            dtype,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn gather_rows_single_shot<T>(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        T: ArrowPrimitiveType,
    {
        shared_gather_rows_single_shot::<T>(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn gather_rows_from_chunks_string<O>(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        shared_gather_rows_from_chunks_string::<O>(
            row_ids,
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
        )
    }

    fn gather_rows_single_shot_binary<O>(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        shared_gather_rows_single_shot_binary::<O>(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn gather_rows_from_chunks_binary<O>(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        shared_gather_rows_from_chunks_binary::<O>(
            row_ids,
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn gather_rows_from_chunks_bool(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef> {
        shared_gather_rows_from_chunks_bool(
            row_ids,
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn gather_rows_from_chunks_struct(
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
        dtype: &DataType,
    ) -> Result<ArrayRef> {
        shared_gather_rows_from_chunks_struct(
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
            dtype,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn gather_rows_from_chunks<T>(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        T: ArrowPrimitiveType,
    {
        shared_gather_rows_from_chunks::<T>(
            row_ids,
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
        )
    }

    fn filter_rows_with_non_null(columns: Vec<ArrayRef>) -> Result<Vec<ArrayRef>> {
        shared_filter_rows_with_non_null(columns)
    }
}