use super::*;
use crate::gather::{
    RowLocator, filter_rows_with_non_null as shared_filter_rows_with_non_null,
    gather_rows_from_chunks as shared_gather_rows_from_chunks,
    gather_rows_from_chunks_binary as shared_gather_rows_from_chunks_binary,
    gather_rows_from_chunks_bool as shared_gather_rows_from_chunks_bool,
    gather_rows_from_chunks_string as shared_gather_rows_from_chunks_string,
    gather_rows_from_chunks_struct as shared_gather_rows_from_chunks_struct,
    gather_rows_single_shot as shared_gather_rows_single_shot,
    gather_rows_single_shot_binary as shared_gather_rows_single_shot_binary,
    gather_rows_single_shot_bool as shared_gather_rows_single_shot_bool,
    gather_rows_single_shot_string as shared_gather_rows_single_shot_string,
    gather_rows_single_shot_struct as shared_gather_rows_single_shot_struct,
};
use crate::store::descriptor::{ChunkMetadata, ColumnDescriptor, DescriptorIterator};
use crate::types::{LogicalFieldId, RowId};
use arrow::array::{ArrayRef, OffsetSizeTrait, new_empty_array};
use arrow::datatypes::{ArrowPrimitiveType, DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use llkv_result::{Error, Result};
use llkv_storage::{
    pager::{BatchGet, GetResult, Pager},
    serialization::deserialize_array,
    types::PhysicalKey,
};
use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::borrow::Cow;
use std::sync::Arc;

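/// Controls how row ids that are missing from a column are handled during a
/// gather.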
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GatherNullPolicy {
    /// Fail the gather if any requested row id cannot be found in a column.
    ErrorOnMissing,
    /// Emit a null for any row id that is missing from a column.
    IncludeNulls,
    /// Gather as with `IncludeNulls`, then filter the result down to rows
    /// with non-null values.
    DropNulls,
}

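/// A single column selected for a gather, optionally renamed via an alias.
///
/// A minimal construction sketch (the field-id values here are hypothetical):
///
/// ```ignore
/// let plain = Projection::new(some_field_id);
/// let named = Projection::with_alias(other_field_id, "price");
/// assert_eq!(named.alias.as_deref(), Some("price"));
/// ```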
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Projection {
    pub logical_field_id: LogicalFieldId,
    pub alias: Option<String>,
}

impl Projection {
    pub fn new(logical_field_id: LogicalFieldId) -> Self {
        Self {
            logical_field_id,
            alias: None,
        }
    }

    pub fn with_alias<S: Into<String>>(logical_field_id: LogicalFieldId, alias: S) -> Self {
        Self {
            logical_field_id,
            alias: Some(alias.into()),
        }
    }
}

impl From<LogicalFieldId> for Projection {
    fn from(logical_field_id: LogicalFieldId) -> Self {
        Projection::new(logical_field_id)
    }
}

impl<S: Into<String>> From<(LogicalFieldId, S)> for Projection {
    fn from(value: (LogicalFieldId, S)) -> Self {
        Projection::with_alias(value.0, value.1)
    }
}

impl GatherNullPolicy {
    #[inline]
    fn allow_missing(self) -> bool {
        !matches!(self, Self::ErrorOnMissing)
    }
}

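/// Reusable state for repeated gathers over the same set of fields.
///
/// Holds per-field chunk plans plus caches (decoded chunks, a row-id index,
/// and scratch buffers) so that back-to-back calls avoid re-reading
/// descriptors and re-decoding hot chunks.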
pub struct MultiGatherContext {
    field_infos: Vec<(LogicalFieldId, DataType)>,
    fields: Vec<Field>,
    plans: Vec<FieldPlan>,
    chunk_cache: FxHashMap<PhysicalKey, ArrayRef>,
    row_index: FxHashMap<u64, usize>,
    row_scratch: Vec<Option<(usize, usize)>>,
    chunk_keys: Vec<PhysicalKey>,
}

impl MultiGatherContext {
    fn new(field_infos: Vec<(LogicalFieldId, DataType)>, plans: Vec<FieldPlan>) -> Self {
        let fields: Vec<Field> = field_infos
            .iter()
            .map(|(fid, dtype)| {
                let field_name = format!("field_{}", u64::from(*fid));
                Field::new(field_name, dtype.clone(), true)
            })
            .collect();

        Self {
            chunk_cache: FxHashMap::default(),
            row_index: FxHashMap::default(),
            row_scratch: Vec::new(),
            chunk_keys: Vec::new(),
            field_infos,
            fields,
            plans,
        }
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.plans.is_empty()
    }

    #[inline]
    fn field_infos(&self) -> &[(LogicalFieldId, DataType)] {
        &self.field_infos
    }

    #[inline]
    fn fields(&self) -> &[Field] {
        &self.fields
    }

    #[inline]
    fn plans(&self) -> &[FieldPlan] {
        &self.plans
    }

    #[inline]
    fn chunk_cache(&self) -> &FxHashMap<PhysicalKey, ArrayRef> {
        &self.chunk_cache
    }

    #[inline]
    fn chunk_cache_mut(&mut self) -> &mut FxHashMap<PhysicalKey, ArrayRef> {
        &mut self.chunk_cache
    }

    #[inline]
    fn plans_mut(&mut self) -> &mut [FieldPlan] {
        &mut self.plans
    }

    fn take_chunk_keys(&mut self) -> Vec<PhysicalKey> {
        std::mem::take(&mut self.chunk_keys)
    }

    fn store_chunk_keys(&mut self, keys: Vec<PhysicalKey>) {
        self.chunk_keys = keys;
    }

    fn take_row_index(&mut self) -> FxHashMap<u64, usize> {
        std::mem::take(&mut self.row_index)
    }

    fn store_row_index(&mut self, row_index: FxHashMap<u64, usize>) {
        self.row_index = row_index;
    }

    fn take_row_scratch(&mut self) -> Vec<Option<(usize, usize)>> {
        std::mem::take(&mut self.row_scratch)
    }

    fn store_row_scratch(&mut self, scratch: Vec<Option<(usize, usize)>>) {
        self.row_scratch = scratch;
    }

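    /// Finds the chunk index containing `row_id` and returns that chunk's
    /// row-id span as `(chunk_index, span_min, span_max)`, widened across all
    /// fields in the context. Returns `None` if no chunk covers the row id.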
    pub fn chunk_span_for_row(&self, row_id: RowId) -> Option<(usize, RowId, RowId)> {
        let first_plan = self.plans.first()?;
        let mut chunk_idx = None;
        for (idx, meta) in first_plan.row_metas.iter().enumerate() {
            if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
                chunk_idx = Some(idx);
                break;
            }
        }
        if chunk_idx.is_none() {
            let total_chunks = first_plan.row_metas.len();
            'outer: for idx in 0..total_chunks {
                for plan in &self.plans {
                    let meta = &plan.row_metas[idx];
                    if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
                        chunk_idx = Some(idx);
                        break 'outer;
                    }
                }
            }
        }
        let idx = chunk_idx?;

        let mut span_min = u64::MAX;
        let mut span_max = 0u64;
        for plan in &self.plans {
            let meta = plan.row_metas.get(idx)?;
            span_min = span_min.min(meta.min_val_u64);
            span_max = span_max.max(meta.max_val_u64);
        }

        if span_min > span_max {
            return None;
        }

        Some((idx, span_min, span_max))
    }
}

#[derive(Clone, Debug)]
struct FieldPlan {
    dtype: DataType,
    value_metas: Vec<ChunkMetadata>,
    row_metas: Vec<ChunkMetadata>,
    candidate_indices: Vec<usize>,
}

impl<P> ColumnStore<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
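    /// Gathers `row_ids` for each field in `field_ids` into a single
    /// [`RecordBatch`], with missing rows handled according to `policy`.
    ///
    /// A minimal call-site sketch (the store handle and field ids are assumed
    /// to exist; `row_ids` must not contain duplicates):
    ///
    /// ```ignore
    /// let batch =
    ///     store.gather_rows(&field_ids, &[10, 11, 12], GatherNullPolicy::ErrorOnMissing)?;
    /// assert_eq!(batch.num_rows(), 3);
    /// ```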
    pub fn gather_rows(
        &self,
        field_ids: &[LogicalFieldId],
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        self.gather_rows_with_schema(field_ids, row_ids, policy, None)
    }

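    /// Like [`Self::gather_rows`], but lets the caller supply an
    /// `expected_schema` that is used verbatim when `row_ids` is empty, so an
    /// empty batch still carries the schema downstream consumers expect.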
    pub fn gather_rows_with_schema(
        &self,
        field_ids: &[LogicalFieldId],
        row_ids: &[u64],
        policy: GatherNullPolicy,
        expected_schema: Option<Arc<Schema>>,
    ) -> Result<RecordBatch> {
        let mut ctx = self.prepare_gather_context(field_ids)?;
        self.execute_gather_single_pass_with_schema(&mut ctx, row_ids, policy, expected_schema)
    }

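    /// Single-pass gather: plans candidate chunks, fetches them in one pager
    /// batch, and decodes each field directly into its output array.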
    fn execute_gather_single_pass_with_schema(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
        expected_schema: Option<Arc<Schema>>,
    ) -> Result<RecordBatch> {
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        let field_infos = ctx.field_infos().to_vec();

        if row_ids.is_empty() {
            let (schema, arrays) = if let Some(expected_schema) = expected_schema {
                // Honor the caller-provided schema for empty results.
                let mut arrays = Vec::with_capacity(expected_schema.fields().len());
                for field in expected_schema.fields() {
                    arrays.push(new_empty_array(field.data_type()));
                }
                (expected_schema, arrays)
            } else {
                // Otherwise derive the schema from the context's cached fields.
                let fields = ctx.fields();
                let mut arrays = Vec::with_capacity(fields.len());
                for field in fields {
                    arrays.push(new_empty_array(field.data_type()));
                }
                (Arc::new(Schema::new(fields.to_vec())), arrays)
            };
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        let mut row_index: FxHashMap<u64, usize> =
            FxHashMap::with_capacity_and_hasher(row_ids.len(), Default::default());
        for (idx, &row_id) in row_ids.iter().enumerate() {
            if row_index.insert(row_id, idx).is_some() {
                return Err(Error::Internal(
                    "duplicate row_id in gather_rows_multi".into(),
                ));
            }
        }

        let mut sorted_row_ids = row_ids.to_vec();
        sorted_row_ids.sort_unstable();

        // Collect the physical keys of every value/row chunk that may hold a
        // requested row id.
        let mut chunk_keys: FxHashSet<PhysicalKey> = FxHashSet::default();
        {
            let plans_mut = ctx.plans_mut();
            for plan in plans_mut.iter_mut() {
                plan.candidate_indices.clear();
                for (idx, meta) in plan.row_metas.iter().enumerate() {
                    if Self::chunk_intersects(&sorted_row_ids, meta) {
                        plan.candidate_indices.push(idx);
                        chunk_keys.insert(plan.value_metas[idx].chunk_pk);
                        chunk_keys.insert(plan.row_metas[idx].chunk_pk);
                    }
                }
            }
        }

        let mut chunk_requests = Vec::with_capacity(chunk_keys.len());
        for &key in &chunk_keys {
            chunk_requests.push(BatchGet::Raw { key });
        }

        let mut chunk_map: FxHashMap<PhysicalKey, EntryHandle> =
            FxHashMap::with_capacity_and_hasher(chunk_requests.len(), Default::default());
        if !chunk_requests.is_empty() {
            let chunk_results = self.pager.batch_get(&chunk_requests)?;
            for result in chunk_results {
                if let GetResult::Raw { key, bytes } = result {
                    chunk_map.insert(key, bytes);
                }
            }
        }

        let allow_missing = policy.allow_missing();

        let mut outputs = Vec::with_capacity(ctx.plans().len());
        for plan in ctx.plans() {
            let array = match &plan.dtype {
                DataType::Utf8 => Self::gather_rows_single_shot_string::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeUtf8 => Self::gather_rows_single_shot_string::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Binary => Self::gather_rows_single_shot_binary::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeBinary => Self::gather_rows_single_shot_binary::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Boolean => Self::gather_rows_single_shot_bool(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Struct(_) => Self::gather_rows_single_shot_struct(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                    &plan.dtype,
                ),
                other => with_integer_arrow_type!(
                    other.clone(),
                    |ArrowTy| {
                        Self::gather_rows_single_shot::<ArrowTy>(
                            &row_index,
                            row_ids.len(),
                            plan,
                            &mut chunk_map,
                            allow_missing,
                        )
                    },
                    Err(Error::Internal(format!(
                        "gather_rows_multi: unsupported dtype {:?}",
                        other
                    ))),
                ),
            }?;
            outputs.push(array);
        }

        let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
            Self::filter_rows_with_non_null(outputs)?
        } else {
            outputs
        };

        let mut fields = Vec::with_capacity(field_infos.len());
        for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
            let array = &outputs[idx];
            let field_name = format!("field_{}", u64::from(*fid));
            let nullable = match policy {
                GatherNullPolicy::IncludeNulls => true,
                _ => array.null_count() > 0,
            };
            fields.push(Field::new(field_name, dtype.clone(), nullable));
        }

        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, outputs)
            .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
    }

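    /// Resolves descriptors and chunk metadata for `field_ids` into a
    /// [`MultiGatherContext`] that can be reused across many gather calls.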
    pub fn prepare_gather_context(
        &self,
        field_ids: &[LogicalFieldId],
    ) -> Result<MultiGatherContext> {
        let mut field_infos = Vec::with_capacity(field_ids.len());
        for &fid in field_ids {
            field_infos.push((fid, self.data_type(fid)?));
        }

        if field_infos.is_empty() {
            return Ok(MultiGatherContext::new(Vec::new(), Vec::new()));
        }

        // Resolve the physical keys of each field's value and row-id columns.
        let catalog = self.catalog.read().unwrap();
        let mut key_pairs = Vec::with_capacity(field_infos.len());
        for (fid, _) in &field_infos {
            let value_pk = *catalog.map.get(fid).ok_or(Error::NotFound)?;
            let row_pk = *catalog.map.get(&rowid_fid(*fid)).ok_or(Error::NotFound)?;
            key_pairs.push((value_pk, row_pk));
        }
        drop(catalog);

        let mut descriptor_requests = Vec::with_capacity(key_pairs.len() * 2);
        for (value_pk, row_pk) in &key_pairs {
            descriptor_requests.push(BatchGet::Raw { key: *value_pk });
            descriptor_requests.push(BatchGet::Raw { key: *row_pk });
        }
        let descriptor_results = self.pager.batch_get(&descriptor_requests)?;
        let mut descriptor_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for result in descriptor_results {
            if let GetResult::Raw { key, bytes } = result {
                descriptor_map.insert(key, bytes);
            }
        }

        let mut plans = Vec::with_capacity(field_infos.len());
        for ((_, dtype), (value_pk, row_pk)) in field_infos.iter().zip(key_pairs.iter()) {
            let value_desc_blob = descriptor_map.remove(value_pk).ok_or(Error::NotFound)?;
            let value_desc = ColumnDescriptor::from_le_bytes(value_desc_blob.as_ref());
            let value_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), value_desc.head_page_pk)?;

            let row_desc_blob = descriptor_map.remove(row_pk).ok_or(Error::NotFound)?;
            let row_desc = ColumnDescriptor::from_le_bytes(row_desc_blob.as_ref());
            let row_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), row_desc.head_page_pk)?;

            if value_metas.len() != row_metas.len() {
                return Err(Error::Internal(
                    "gather_rows_multi: chunk count mismatch".into(),
                ));
            }

            plans.push(FieldPlan {
                dtype: dtype.clone(),
                value_metas,
                row_metas,
                candidate_indices: Vec::new(),
            });
        }

        Ok(MultiGatherContext::new(field_infos, plans))
    }

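    /// Like [`Self::gather_rows`], but reuses a prepared [`MultiGatherContext`]
    /// so repeated gathers over the same fields skip descriptor lookups and can
    /// hit the context's decoded-chunk cache and scratch buffers.
    ///
    /// A minimal reuse sketch (store, field ids, and row-id batches assumed):
    ///
    /// ```ignore
    /// let mut ctx = store.prepare_gather_context(&field_ids)?;
    /// for window in row_id_batches {
    ///     let batch = store.gather_rows_with_reusable_context(
    ///         &mut ctx,
    ///         &window,
    ///         GatherNullPolicy::IncludeNulls,
    ///     )?;
    ///     // ... consume `batch` ...
    /// }
    /// ```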
    pub fn gather_rows_with_reusable_context(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        if row_ids.is_empty() {
            let mut arrays = Vec::with_capacity(ctx.field_infos().len());
            let mut fields = Vec::with_capacity(ctx.field_infos().len());
            for (fid, dtype) in ctx.field_infos() {
                arrays.push(new_empty_array(dtype));
                let field_name = format!("field_{}", u64::from(*fid));
                fields.push(Field::new(field_name, dtype.clone(), true));
            }
            let schema = Arc::new(Schema::new(fields));
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Borrow the context's reusable buffers for the duration of the call;
        // they are handed back via `store_*` before this function returns.
        let mut row_index = ctx.take_row_index();
        let mut row_scratch = ctx.take_row_scratch();

        let field_infos = ctx.field_infos().to_vec();
        let mut chunk_keys = ctx.take_chunk_keys();

        let result: Result<RecordBatch> = (|| {
            let len = row_ids.len();
            if row_scratch.len() < len {
                row_scratch.resize(len, None);
            }

            let is_non_decreasing = len <= 1 || row_ids.windows(2).all(|w| w[0] <= w[1]);
            let sorted_row_ids_cow: Cow<'_, [u64]> = if is_non_decreasing {
                Cow::Borrowed(row_ids)
            } else {
                let mut buf = row_ids.to_vec();
                buf.sort_unstable();
                Cow::Owned(buf)
            };
            let sorted_row_ids: &[u64] = sorted_row_ids_cow.as_ref();

            // A dense request (consecutive ascending row ids) can be located
            // with arithmetic instead of a hash lookup per row.
            let dense_base = if len == 0 {
                None
            } else if len == 1
                || (is_non_decreasing && row_ids.windows(2).all(|w| w[1] == w[0] + 1))
            {
                Some(row_ids[0])
            } else {
                None
            };

            if dense_base.is_none() {
                row_index.clear();
                row_index.reserve(len);
                for (idx, &row_id) in row_ids.iter().enumerate() {
                    if row_index.insert(row_id, idx).is_some() {
                        return Err(Error::Internal(
                            "duplicate row_id in gather_rows_multi".into(),
                        ));
                    }
                }
            } else {
                row_index.clear();
            }

            let row_locator = if let Some(base) = dense_base {
                RowLocator::Dense { base }
            } else {
                RowLocator::Sparse { index: &row_index }
            };

            chunk_keys.clear();

            {
                let plans_mut = ctx.plans_mut();
                for plan in plans_mut.iter_mut() {
                    plan.candidate_indices.clear();
                    for (idx, meta) in plan.row_metas.iter().enumerate() {
                        if Self::chunk_intersects(sorted_row_ids, meta) {
                            plan.candidate_indices.push(idx);
                            chunk_keys.push(plan.value_metas[idx].chunk_pk);
                            chunk_keys.push(plan.row_metas[idx].chunk_pk);
                        }
                    }
                }
            }

            chunk_keys.sort_unstable();
            chunk_keys.dedup();

            // Fetch only the chunks that are not already decoded in the cache.
            {
                let mut pending: Vec<BatchGet> = Vec::new();
                {
                    let cache = ctx.chunk_cache();
                    for &key in &chunk_keys {
                        if !cache.contains_key(&key) {
                            pending.push(BatchGet::Raw { key });
                        }
                    }
                }

                if !pending.is_empty() {
                    let chunk_results = self.pager.batch_get(&pending)?;
                    let cache = ctx.chunk_cache_mut();
                    for result in chunk_results {
                        if let GetResult::Raw { key, bytes } = result {
                            let array = deserialize_array(bytes)?;
                            cache.insert(key, Arc::clone(&array));
                        }
                    }
                }
            }

            let allow_missing = policy.allow_missing();

            let mut outputs = Vec::with_capacity(ctx.plans().len());
            for plan in ctx.plans() {
                let array = match &plan.dtype {
                    DataType::Utf8 => Self::gather_rows_from_chunks_string::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeUtf8 => Self::gather_rows_from_chunks_string::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Binary => Self::gather_rows_from_chunks_binary::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeBinary => Self::gather_rows_from_chunks_binary::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Boolean => Self::gather_rows_from_chunks_bool(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Struct(_) => Self::gather_rows_from_chunks_struct(
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                        &plan.dtype,
                    ),
                    other => with_integer_arrow_type!(
                        other.clone(),
                        |ArrowTy| {
                            Self::gather_rows_from_chunks::<ArrowTy>(
                                row_ids,
                                row_locator,
                                len,
                                &plan.candidate_indices,
                                plan,
                                ctx.chunk_cache(),
                                &mut row_scratch,
                                allow_missing,
                            )
                        },
                        Err(Error::Internal(format!(
                            "gather_rows_multi: unsupported dtype {:?}",
                            other
                        ))),
                    ),
                }?;
                outputs.push(array);
            }

            let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
                Self::filter_rows_with_non_null(outputs)?
            } else {
                outputs
            };

            let mut fields = Vec::with_capacity(field_infos.len());
            for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
                let array = &outputs[idx];
                let field_name = format!("field_{}", u64::from(*fid));
                let nullable = match policy {
                    GatherNullPolicy::IncludeNulls => true,
                    _ => array.null_count() > 0,
                };
                fields.push(Field::new(field_name, dtype.clone(), nullable));
            }

            let schema = Arc::new(Schema::new(fields));
            RecordBatch::try_new(schema, outputs)
                .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
        })();

        // Hand the scratch structures back to the context for the next call.
        ctx.store_row_scratch(row_scratch);
        ctx.store_row_index(row_index);
        ctx.store_chunk_keys(chunk_keys);

        result
    }

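    /// Walks a descriptor chain and returns metadata for every non-empty
    /// chunk; a zero `head_page_pk` is treated as an empty column.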
    fn collect_non_empty_metas(pager: &P, head_page_pk: PhysicalKey) -> Result<Vec<ChunkMetadata>> {
        let mut metas = Vec::new();
        if head_page_pk == 0 {
            return Ok(metas);
        }
        for meta in DescriptorIterator::new(pager, head_page_pk) {
            let meta = meta?;
            if meta.row_count > 0 {
                metas.push(meta);
            }
        }
        Ok(metas)
    }

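    /// Returns whether any id in `sorted_row_ids` (ascending) can fall inside
    /// the chunk's `[min_val_u64, max_val_u64]` row-id range. Chunks whose
    /// bounds look unset (`0..=0` or inverted) are conservatively treated as
    /// intersecting; otherwise a binary search locates the first candidate id.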
    #[inline]
    fn chunk_intersects(sorted_row_ids: &[u64], meta: &ChunkMetadata) -> bool {
        if sorted_row_ids.is_empty() || meta.row_count == 0 {
            return false;
        }
        let min = meta.min_val_u64;
        let max = meta.max_val_u64;
        if min == 0 && max == 0 && meta.row_count > 0 {
            return true;
        }
        if min > max {
            return true;
        }
        let min_req = sorted_row_ids[0];
        let max_req = *sorted_row_ids.last().unwrap();
        if max < min_req || min > max_req {
            return false;
        }
        let idx = sorted_row_ids.partition_point(|&rid| rid < min);
        idx < sorted_row_ids.len() && sorted_row_ids[idx] <= max
    }

    // Thin wrappers that adapt a `FieldPlan` to the shared gather kernels in
    // `crate::gather`.

    fn gather_rows_single_shot_string<O>(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        shared_gather_rows_single_shot_string::<O>(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn gather_rows_single_shot_bool(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef> {
        shared_gather_rows_single_shot_bool(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
        )
    }

    fn gather_rows_single_shot_struct(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
        dtype: &DataType,
    ) -> Result<ArrayRef> {
        shared_gather_rows_single_shot_struct(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
            dtype,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn gather_rows_single_shot<T>(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        T: ArrowPrimitiveType,
    {
        shared_gather_rows_single_shot::<T>(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn gather_rows_from_chunks_string<O>(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        shared_gather_rows_from_chunks_string::<O>(
            row_ids,
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
        )
    }

    fn gather_rows_single_shot_binary<O>(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        shared_gather_rows_single_shot_binary::<O>(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn gather_rows_from_chunks_binary<O>(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        shared_gather_rows_from_chunks_binary::<O>(
            row_ids,
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn gather_rows_from_chunks_bool(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef> {
        shared_gather_rows_from_chunks_bool(
            row_ids,
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn gather_rows_from_chunks_struct(
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
        dtype: &DataType,
    ) -> Result<ArrayRef> {
        shared_gather_rows_from_chunks_struct(
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
            dtype,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn gather_rows_from_chunks<T>(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        T: ArrowPrimitiveType,
    {
        shared_gather_rows_from_chunks::<T>(
            row_ids,
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
        )
    }

    fn filter_rows_with_non_null(columns: Vec<ArrayRef>) -> Result<Vec<ArrayRef>> {
        shared_filter_rows_with_non_null(columns)
    }
}