1use super::*;
2use crate::store::descriptor::{ChunkMetadata, ColumnDescriptor, DescriptorIterator};
3use crate::types::{LogicalFieldId, RowId};
4use arrow::array::{
5 Array, ArrayRef, BooleanArray, GenericBinaryArray, GenericBinaryBuilder, GenericStringArray,
6 GenericStringBuilder, OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder, UInt64Array,
7 new_empty_array,
8};
9use arrow::compute;
10use arrow::datatypes::{ArrowPrimitiveType, DataType, Field, Schema};
11use arrow::record_batch::RecordBatch;
12use llkv_result::{Error, Result};
13use llkv_storage::{
14 pager::{BatchGet, GetResult, Pager},
15 serialization::deserialize_array,
16 types::PhysicalKey,
17};
18use rustc_hash::{FxHashMap, FxHashSet};
19use simd_r_drive_entry_handle::EntryHandle;
20use std::borrow::Cow;
21use std::sync::Arc;
22
/// Controls how a gather treats requested row ids that are missing from the
/// gathered columns.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GatherNullPolicy {
    /// Fail the whole gather if any requested row id cannot be found.
    ErrorOnMissing,
    /// Emit a null in the output for every missing row id.
    IncludeNulls,
    /// Emit nulls for missing ids, then pass the gathered columns through
    /// `filter_rows_with_non_null` (defined elsewhere in this file), which —
    /// per its name — is expected to drop rows with no non-null values.
    DropNulls,
}
32
/// A single column selection for a gather/scan, optionally renamed.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Projection {
    /// The column to project.
    pub logical_field_id: LogicalFieldId,
    /// Optional output-name override; `None` keeps the default field name.
    pub alias: Option<String>,
}
38
39impl Projection {
40 pub fn new(logical_field_id: LogicalFieldId) -> Self {
41 Self {
42 logical_field_id,
43 alias: None,
44 }
45 }
46
47 pub fn with_alias<S: Into<String>>(logical_field_id: LogicalFieldId, alias: S) -> Self {
48 Self {
49 logical_field_id,
50 alias: Some(alias.into()),
51 }
52 }
53}
54
55impl From<LogicalFieldId> for Projection {
56 fn from(logical_field_id: LogicalFieldId) -> Self {
57 Projection::new(logical_field_id)
58 }
59}
60
61impl<S: Into<String>> From<(LogicalFieldId, S)> for Projection {
62 fn from(value: (LogicalFieldId, S)) -> Self {
63 Projection::with_alias(value.0, value.1)
64 }
65}
66
67impl GatherNullPolicy {
68 #[inline]
69 fn allow_missing(self) -> bool {
70 !matches!(self, Self::ErrorOnMissing)
71 }
72}
73
/// Reusable scratch state for multi-column gathers.
///
/// Holds the per-field chunk plans plus caches and buffers that
/// `gather_rows_with_reusable_context` recycles across calls so repeated
/// gathers avoid reallocating.
pub struct MultiGatherContext {
    // (field id, Arrow data type) per projected column, in output order.
    field_infos: Vec<(LogicalFieldId, DataType)>,
    // One chunk plan per field, parallel to `field_infos`.
    plans: Vec<FieldPlan>,
    // Deserialized chunk arrays keyed by physical key; persists across calls.
    chunk_cache: FxHashMap<PhysicalKey, ArrayRef>,
    // row_id -> output-slot map, cleared and reused per call.
    row_index: FxHashMap<u64, usize>,
    // Per-output-slot (chunk index, offset-in-chunk) scratch, reused per call.
    row_scratch: Vec<Option<(usize, usize)>>,
    // Scratch list of chunk physical keys needed by the current gather.
    chunk_keys: Vec<PhysicalKey>,
}
82
83impl MultiGatherContext {
84 fn new(field_infos: Vec<(LogicalFieldId, DataType)>, plans: Vec<FieldPlan>) -> Self {
85 Self {
86 chunk_cache: FxHashMap::default(),
87 row_index: FxHashMap::default(),
88 row_scratch: Vec::new(),
89 chunk_keys: Vec::new(),
90 field_infos,
91 plans,
92 }
93 }
94
95 #[inline]
96 pub fn is_empty(&self) -> bool {
97 self.plans.is_empty()
98 }
99
100 #[inline]
101 fn field_infos(&self) -> &[(LogicalFieldId, DataType)] {
102 &self.field_infos
103 }
104
105 #[inline]
106 fn plans(&self) -> &[FieldPlan] {
107 &self.plans
108 }
109
110 #[inline]
111 fn chunk_cache(&self) -> &FxHashMap<PhysicalKey, ArrayRef> {
112 &self.chunk_cache
113 }
114
115 #[inline]
116 fn chunk_cache_mut(&mut self) -> &mut FxHashMap<PhysicalKey, ArrayRef> {
117 &mut self.chunk_cache
118 }
119
120 #[inline]
121 fn plans_mut(&mut self) -> &mut [FieldPlan] {
122 &mut self.plans
123 }
124
125 fn take_chunk_keys(&mut self) -> Vec<PhysicalKey> {
126 std::mem::take(&mut self.chunk_keys)
127 }
128
129 fn store_chunk_keys(&mut self, keys: Vec<PhysicalKey>) {
130 self.chunk_keys = keys;
131 }
132
133 fn take_row_index(&mut self) -> FxHashMap<u64, usize> {
134 std::mem::take(&mut self.row_index)
135 }
136
137 fn store_row_index(&mut self, row_index: FxHashMap<u64, usize>) {
138 self.row_index = row_index;
139 }
140
141 fn take_row_scratch(&mut self) -> Vec<Option<(usize, usize)>> {
142 std::mem::take(&mut self.row_scratch)
143 }
144
145 fn store_row_scratch(&mut self, scratch: Vec<Option<(usize, usize)>>) {
146 self.row_scratch = scratch;
147 }
148
149 pub fn chunk_span_for_row(&self, row_id: RowId) -> Option<(usize, RowId, RowId)> {
150 let first_plan = self.plans.first()?;
151 let mut chunk_idx = None;
152 for (idx, meta) in first_plan.row_metas.iter().enumerate() {
153 if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
154 chunk_idx = Some(idx);
155 break;
156 }
157 }
158 if chunk_idx.is_none() {
159 let total_chunks = first_plan.row_metas.len();
160 'outer: for idx in 0..total_chunks {
161 for plan in &self.plans {
162 let meta = &plan.row_metas[idx];
163 if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
164 chunk_idx = Some(idx);
165 break 'outer;
166 }
167 }
168 }
169 }
170 let idx = chunk_idx?;
171
172 let mut span_min = u64::MAX;
173 let mut span_max = 0u64;
174 for plan in &self.plans {
175 let meta = plan.row_metas.get(idx)?;
176 span_min = span_min.min(meta.min_val_u64);
177 span_max = span_max.max(meta.max_val_u64);
178 }
179
180 if span_min > span_max {
181 return None;
182 }
183
184 Some((idx, span_min, span_max))
185 }
186}
187
/// Pre-resolved chunk metadata for one gathered column.
#[derive(Clone, Debug)]
struct FieldPlan {
    // Arrow type of the column's value chunks.
    dtype: DataType,
    // Metadata for each non-empty value chunk.
    value_metas: Vec<ChunkMetadata>,
    // Metadata for the paired row-id chunks; same length as `value_metas`
    // (enforced in `prepare_gather_context`).
    row_metas: Vec<ChunkMetadata>,
    // Chunk indices that may intersect the current request; rebuilt on every
    // gather call.
    candidate_indices: Vec<usize>,
}
195
/// Strategy for mapping a requested row id to its output slot.
#[derive(Clone, Copy)]
enum RowLocator<'a> {
    /// Requested ids form a contiguous ascending run starting at `base`, so
    /// the slot is computed arithmetically.
    Dense { base: RowId },
    /// Arbitrary ids; slots come from a prebuilt hash lookup.
    Sparse { index: &'a FxHashMap<RowId, usize> },
}
201
202impl<'a> RowLocator<'a> {
203 #[inline]
204 fn lookup(&self, row_id: RowId, len: usize) -> Option<usize> {
205 match self {
206 RowLocator::Dense { base } => {
207 let offset = row_id.checked_sub(*base)?;
208 if offset < len as u64 {
209 Some(offset as usize)
210 } else {
211 None
212 }
213 }
214 RowLocator::Sparse { index } => index.get(&row_id).copied(),
215 }
216 }
217}
218
219impl<P> ColumnStore<P>
220where
221 P: Pager<Blob = EntryHandle> + Send + Sync,
222{
223 pub fn gather_rows(
228 &self,
229 field_ids: &[LogicalFieldId],
230 row_ids: &[u64],
231 policy: GatherNullPolicy,
232 ) -> Result<RecordBatch> {
233 let mut ctx = self.prepare_gather_context(field_ids)?;
234 self.execute_gather_single_pass(&mut ctx, row_ids, policy)
235 }
236
    /// Core single-use gather: fetches every chunk that may contain the
    /// requested `row_ids` and materializes one output column per field plan.
    ///
    /// Chunk blobs are fetched fresh from the pager and consumed during
    /// materialization; nothing is retained on `ctx` for reuse (contrast with
    /// `gather_rows_with_reusable_context`).
    ///
    /// Errors on duplicate `row_ids`, and — under `ErrorOnMissing` — when any
    /// requested row is absent from a gathered column.
    fn execute_gather_single_pass(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        let field_infos = ctx.field_infos().to_vec();

        // Empty request: return an empty batch that still carries the correct
        // per-field schema (all fields marked nullable).
        if row_ids.is_empty() {
            let mut arrays = Vec::with_capacity(field_infos.len());
            let mut fields = Vec::with_capacity(field_infos.len());
            for (fid, dtype) in &field_infos {
                arrays.push(new_empty_array(dtype));
                let field_name = format!("field_{}", u64::from(*fid));
                fields.push(Field::new(field_name, dtype.clone(), true));
            }
            let schema = Arc::new(Schema::new(fields));
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Map row_id -> output position. Duplicate ids would make the output
        // slot ambiguous, so they are rejected.
        let mut row_index: FxHashMap<u64, usize> =
            FxHashMap::with_capacity_and_hasher(row_ids.len(), Default::default());
        for (idx, &row_id) in row_ids.iter().enumerate() {
            if row_index.insert(row_id, idx).is_some() {
                return Err(Error::Internal(
                    "duplicate row_id in gather_rows_multi".into(),
                ));
            }
        }

        // Sorted copy used by the per-chunk range-intersection test.
        let mut sorted_row_ids = row_ids.to_vec();
        sorted_row_ids.sort_unstable();

        // Mark candidate chunks per plan and collect the physical keys of
        // every value/row-id chunk that must be fetched.
        let mut chunk_keys: FxHashSet<PhysicalKey> = FxHashSet::default();
        {
            let plans_mut = ctx.plans_mut();
            for plan in plans_mut.iter_mut() {
                plan.candidate_indices.clear();
                for (idx, meta) in plan.row_metas.iter().enumerate() {
                    if Self::chunk_intersects(&sorted_row_ids, meta) {
                        plan.candidate_indices.push(idx);
                        chunk_keys.insert(plan.value_metas[idx].chunk_pk);
                        chunk_keys.insert(plan.row_metas[idx].chunk_pk);
                    }
                }
            }
        }

        let mut chunk_requests = Vec::with_capacity(chunk_keys.len());
        for &key in &chunk_keys {
            chunk_requests.push(BatchGet::Raw { key });
        }

        // Fetch all required chunk blobs in a single batched pager call.
        let mut chunk_map: FxHashMap<PhysicalKey, EntryHandle> =
            FxHashMap::with_capacity_and_hasher(chunk_requests.len(), Default::default());
        if !chunk_requests.is_empty() {
            let chunk_results = self.pager.batch_get(&chunk_requests)?;
            for result in chunk_results {
                if let GetResult::Raw { key, bytes } = result {
                    chunk_map.insert(key, bytes);
                }
            }
        }

        let allow_missing = policy.allow_missing();

        // Materialize each column, dispatching on its Arrow type; the
        // integer-like types go through the `with_integer_arrow_type!` macro.
        let mut outputs = Vec::with_capacity(ctx.plans().len());
        for plan in ctx.plans() {
            let array = match &plan.dtype {
                DataType::Utf8 => Self::gather_rows_single_shot_string::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeUtf8 => Self::gather_rows_single_shot_string::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Binary => Self::gather_rows_single_shot_binary::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeBinary => Self::gather_rows_single_shot_binary::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Boolean => Self::gather_rows_single_shot_bool(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                other => with_integer_arrow_type!(
                    other.clone(),
                    |ArrowTy| {
                        Self::gather_rows_single_shot::<ArrowTy>(
                            &row_index,
                            row_ids.len(),
                            plan,
                            &mut chunk_map,
                            allow_missing,
                        )
                    },
                    Err(Error::Internal(format!(
                        "gather_rows_multi: unsupported dtype {:?}",
                        other
                    ))),
                ),
            }?;
            outputs.push(array);
        }

        // DropNulls post-pass: filter out rows via the helper defined
        // elsewhere in this file.
        let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
            Self::filter_rows_with_non_null(outputs)?
        } else {
            outputs
        };

        // Build the schema; a field is nullable only when nulls are actually
        // present, except under IncludeNulls where it is always nullable.
        let mut fields = Vec::with_capacity(field_infos.len());
        for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
            let array = &outputs[idx];
            let field_name = format!("field_{}", u64::from(*fid));
            let nullable = match policy {
                GatherNullPolicy::IncludeNulls => true,
                _ => array.null_count() > 0,
            };
            fields.push(Field::new(field_name, dtype.clone(), nullable));
        }

        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, outputs)
            .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
    }
392
    /// Resolves `field_ids` into a reusable `MultiGatherContext`.
    ///
    /// For each field this looks up the descriptor of the value column and of
    /// its paired row-id column (via `rowid_fid`), loads the non-empty chunk
    /// metadata for both, and pairs them into a `FieldPlan`.
    ///
    /// # Errors
    /// `Error::NotFound` when a field or descriptor is missing; an internal
    /// error when a field's value/row-id chunk counts disagree.
    pub fn prepare_gather_context(
        &self,
        field_ids: &[LogicalFieldId],
    ) -> Result<MultiGatherContext> {
        let mut field_infos = Vec::with_capacity(field_ids.len());
        for &fid in field_ids {
            field_infos.push((fid, self.data_type(fid)?));
        }

        if field_infos.is_empty() {
            return Ok(MultiGatherContext::new(Vec::new(), Vec::new()));
        }

        // Resolve descriptor physical keys while holding the catalog read
        // lock; the lock is dropped before any pager I/O.
        let catalog = self.catalog.read().unwrap();
        let mut key_pairs = Vec::with_capacity(field_infos.len());
        for (fid, _) in &field_infos {
            let value_pk = *catalog.map.get(fid).ok_or(Error::NotFound)?;
            let row_pk = *catalog.map.get(&rowid_fid(*fid)).ok_or(Error::NotFound)?;
            key_pairs.push((value_pk, row_pk));
        }
        drop(catalog);

        // Fetch all descriptor blobs in one batched pager call.
        let mut descriptor_requests = Vec::with_capacity(key_pairs.len() * 2);
        for (value_pk, row_pk) in &key_pairs {
            descriptor_requests.push(BatchGet::Raw { key: *value_pk });
            descriptor_requests.push(BatchGet::Raw { key: *row_pk });
        }
        let descriptor_results = self.pager.batch_get(&descriptor_requests)?;
        let mut descriptor_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for result in descriptor_results {
            if let GetResult::Raw { key, bytes } = result {
                descriptor_map.insert(key, bytes);
            }
        }

        // Decode each descriptor pair and walk its chunk metadata.
        let mut plans = Vec::with_capacity(field_infos.len());
        for ((_, dtype), (value_pk, row_pk)) in field_infos.iter().zip(key_pairs.iter()) {
            let value_desc_blob = descriptor_map.remove(value_pk).ok_or(Error::NotFound)?;
            let value_desc = ColumnDescriptor::from_le_bytes(value_desc_blob.as_ref());
            let value_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), value_desc.head_page_pk)?;

            let row_desc_blob = descriptor_map.remove(row_pk).ok_or(Error::NotFound)?;
            let row_desc = ColumnDescriptor::from_le_bytes(row_desc_blob.as_ref());
            let row_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), row_desc.head_page_pk)?;

            // Downstream code indexes value/row chunks in lockstep, so the
            // counts must agree.
            if value_metas.len() != row_metas.len() {
                return Err(Error::Internal(
                    "gather_rows_multi: chunk count mismatch".into(),
                ));
            }

            plans.push(FieldPlan {
                dtype: dtype.clone(),
                value_metas,
                row_metas,
                candidate_indices: Vec::new(),
            });
        }

        Ok(MultiGatherContext::new(field_infos, plans))
    }
456
    /// Gathers `row_ids` using a previously prepared context, reusing its
    /// chunk cache and scratch buffers across calls.
    ///
    /// Deserialized chunk arrays are cached on `ctx`, so repeated gathers
    /// over overlapping chunks skip both pager fetch and deserialization.
    /// When the requested ids form a contiguous ascending run, a dense
    /// row locator avoids building the hash index entirely.
    pub fn gather_rows_with_reusable_context(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        // Empty request: empty batch with the correct per-field schema.
        if row_ids.is_empty() {
            let mut arrays = Vec::with_capacity(ctx.field_infos().len());
            let mut fields = Vec::with_capacity(ctx.field_infos().len());
            for (fid, dtype) in ctx.field_infos() {
                arrays.push(new_empty_array(dtype));
                let field_name = format!("field_{}", u64::from(*fid));
                fields.push(Field::new(field_name, dtype.clone(), true));
            }
            let schema = Arc::new(Schema::new(fields));
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Move the reusable buffers out of `ctx` so we can mutably borrow
        // them while still calling `ctx` accessors; they are stored back
        // after the closure runs, even on error.
        let mut row_index = ctx.take_row_index();
        let mut row_scratch = ctx.take_row_scratch();

        let field_infos = ctx.field_infos().to_vec();
        let mut chunk_keys = ctx.take_chunk_keys();

        let result: Result<RecordBatch> = (|| {
            let len = row_ids.len();
            if row_scratch.len() < len {
                row_scratch.resize(len, None);
            }

            // Borrow the input directly when already sorted; otherwise sort a
            // copy for the chunk-intersection tests.
            let is_non_decreasing = len <= 1 || row_ids.windows(2).all(|w| w[0] <= w[1]);
            let sorted_row_ids_cow: Cow<'_, [u64]> = if is_non_decreasing {
                Cow::Borrowed(row_ids)
            } else {
                let mut buf = row_ids.to_vec();
                buf.sort_unstable();
                Cow::Owned(buf)
            };
            let sorted_row_ids: &[u64] = sorted_row_ids_cow.as_ref();

            // Dense fast path: consecutive ascending ids can be located by
            // arithmetic instead of a hash map.
            let dense_base = if len == 0 {
                None
            } else if len == 1 || is_non_decreasing && row_ids.windows(2).all(|w| w[1] == w[0] + 1)
            {
                Some(row_ids[0])
            } else {
                None
            };

            if dense_base.is_none() {
                // Sparse path: rebuild the row_id -> slot index, rejecting
                // duplicates (they would make the output slot ambiguous).
                row_index.clear();
                row_index.reserve(len);
                for (idx, &row_id) in row_ids.iter().enumerate() {
                    if row_index.insert(row_id, idx).is_some() {
                        return Err(Error::Internal(
                            "duplicate row_id in gather_rows_multi".into(),
                        ));
                    }
                }
            } else {
                row_index.clear();
            }

            let row_locator = if let Some(base) = dense_base {
                RowLocator::Dense { base }
            } else {
                RowLocator::Sparse { index: &row_index }
            };

            chunk_keys.clear();

            // Mark candidate chunks per plan and record the chunk keys the
            // gather will touch.
            {
                let plans_mut = ctx.plans_mut();
                for plan in plans_mut.iter_mut() {
                    plan.candidate_indices.clear();
                    for (idx, meta) in plan.row_metas.iter().enumerate() {
                        if Self::chunk_intersects(sorted_row_ids, meta) {
                            plan.candidate_indices.push(idx);
                            chunk_keys.push(plan.value_metas[idx].chunk_pk);
                            chunk_keys.push(plan.row_metas[idx].chunk_pk);
                        }
                    }
                }
            }

            chunk_keys.sort_unstable();
            chunk_keys.dedup();

            // Fetch and deserialize only the chunks not already cached.
            {
                let mut pending: Vec<BatchGet> = Vec::new();
                {
                    let cache = ctx.chunk_cache();
                    for &key in &chunk_keys {
                        if !cache.contains_key(&key) {
                            pending.push(BatchGet::Raw { key });
                        }
                    }
                }

                if !pending.is_empty() {
                    let chunk_results = self.pager.batch_get(&pending)?;
                    let cache = ctx.chunk_cache_mut();
                    for result in chunk_results {
                        if let GetResult::Raw { key, bytes } = result {
                            let array = deserialize_array(bytes)?;
                            cache.insert(key, Arc::clone(&array));
                        }
                    }
                }
            }

            let allow_missing = policy.allow_missing();

            // Materialize each column from the cached chunk arrays.
            let mut outputs = Vec::with_capacity(ctx.plans().len());
            for plan in ctx.plans() {
                let array = match &plan.dtype {
                    DataType::Utf8 => Self::gather_rows_from_chunks_string::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeUtf8 => Self::gather_rows_from_chunks_string::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Binary => Self::gather_rows_from_chunks_binary::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeBinary => Self::gather_rows_from_chunks_binary::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Boolean => Self::gather_rows_from_chunks_bool(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    other => with_integer_arrow_type!(
                        other.clone(),
                        |ArrowTy| {
                            Self::gather_rows_from_chunks::<ArrowTy>(
                                row_ids,
                                row_locator,
                                len,
                                &plan.candidate_indices,
                                plan,
                                ctx.chunk_cache(),
                                &mut row_scratch,
                                allow_missing,
                            )
                        },
                        Err(Error::Internal(format!(
                            "gather_rows_multi: unsupported dtype {:?}",
                            other
                        ))),
                    ),
                }?;
                outputs.push(array);
            }

            // DropNulls post-pass via the helper defined elsewhere in file.
            let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
                Self::filter_rows_with_non_null(outputs)?
            } else {
                outputs
            };

            // Nullable only when nulls are present, except IncludeNulls.
            let mut fields = Vec::with_capacity(field_infos.len());
            for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
                let array = &outputs[idx];
                let field_name = format!("field_{}", u64::from(*fid));
                let nullable = match policy {
                    GatherNullPolicy::IncludeNulls => true,
                    _ => array.null_count() > 0,
                };
                fields.push(Field::new(field_name, dtype.clone(), nullable));
            }

            let schema = Arc::new(Schema::new(fields));
            RecordBatch::try_new(schema, outputs)
                .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
        })();

        // Always return the buffers to the context so the next call can
        // reuse their capacity, regardless of success or failure.
        ctx.store_row_scratch(row_scratch);
        ctx.store_row_index(row_index);
        ctx.store_chunk_keys(chunk_keys);

        result
    }
683
684 fn collect_non_empty_metas(pager: &P, head_page_pk: PhysicalKey) -> Result<Vec<ChunkMetadata>> {
685 let mut metas = Vec::new();
686 if head_page_pk == 0 {
687 return Ok(metas);
688 }
689 for meta in DescriptorIterator::new(pager, head_page_pk) {
690 let meta = meta?;
691 if meta.row_count > 0 {
692 metas.push(meta);
693 }
694 }
695 Ok(metas)
696 }
697
698 #[inline]
699 fn chunk_intersects(sorted_row_ids: &[u64], meta: &ChunkMetadata) -> bool {
700 if sorted_row_ids.is_empty() || meta.row_count == 0 {
701 return false;
702 }
703 let min = meta.min_val_u64;
704 let max = meta.max_val_u64;
705 if min == 0 && max == 0 && meta.row_count > 0 {
706 return true;
707 }
708 if min > max {
709 return true;
710 }
711 let min_req = sorted_row_ids[0];
712 let max_req = *sorted_row_ids.last().unwrap();
713 if max < min_req || min > max_req {
714 return false;
715 }
716 let idx = sorted_row_ids.partition_point(|&rid| rid < min);
717 idx < sorted_row_ids.len() && sorted_row_ids[idx] <= max
718 }
719
720 fn gather_rows_single_shot_string<O>(
721 row_index: &FxHashMap<u64, usize>,
722 len: usize,
723 plan: &FieldPlan,
724 chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
725 allow_missing: bool,
726 ) -> Result<ArrayRef>
727 where
728 O: OffsetSizeTrait,
729 {
730 if len == 0 {
731 let mut builder = GenericStringBuilder::<O>::new();
732 return Ok(Arc::new(builder.finish()) as ArrayRef);
733 }
734
735 let mut values: Vec<Option<String>> = vec![None; len];
736 let mut found: Vec<bool> = vec![false; len];
737
738 for &idx in &plan.candidate_indices {
739 let value_chunk = chunk_blobs
740 .remove(&plan.value_metas[idx].chunk_pk)
741 .ok_or(Error::NotFound)?;
742 let row_chunk = chunk_blobs
743 .remove(&plan.row_metas[idx].chunk_pk)
744 .ok_or(Error::NotFound)?;
745
746 let value_any = deserialize_array(value_chunk)?;
747 let value_arr = value_any
748 .as_any()
749 .downcast_ref::<GenericStringArray<O>>()
750 .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
751 let row_any = deserialize_array(row_chunk)?;
752 let row_arr = row_any
753 .as_any()
754 .downcast_ref::<UInt64Array>()
755 .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
756
757 for i in 0..row_arr.len() {
758 if !row_arr.is_valid(i) {
759 continue;
760 }
761 let row_id = row_arr.value(i);
762 if let Some(&out_idx) = row_index.get(&row_id) {
763 found[out_idx] = true;
764 if value_arr.is_null(i) {
765 values[out_idx] = None;
766 } else {
767 values[out_idx] = Some(value_arr.value(i).to_owned());
768 }
769 }
770 }
771 }
772
773 if !allow_missing {
774 if found.iter().any(|f| !*f) {
775 return Err(Error::Internal(
776 "gather_rows_multi: one or more requested row IDs were not found".into(),
777 ));
778 }
779 } else {
780 for (idx, was_found) in found.iter().enumerate() {
781 if !*was_found {
782 values[idx] = None;
783 }
784 }
785 }
786
787 let total_bytes: usize = values
788 .iter()
789 .filter_map(|v| v.as_ref().map(|s| s.len()))
790 .sum();
791
792 let mut builder = GenericStringBuilder::<O>::with_capacity(len, total_bytes);
793 for value in values {
794 match value {
795 Some(s) => builder.append_value(&s),
796 None => builder.append_null(),
797 }
798 }
799
800 Ok(Arc::new(builder.finish()) as ArrayRef)
801 }
802
    /// Single-pass boolean gather: scans each candidate chunk once, consuming
    /// its blob from `chunk_blobs`, and places values into their output slots
    /// via `row_index`.
    ///
    /// When `allow_missing` is false, any requested row id never observed in
    /// a chunk is an error; otherwise unfound slots (already `None`) become
    /// nulls in the output.
    #[allow(clippy::too_many_arguments)]
    fn gather_rows_single_shot_bool(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef> {
        if len == 0 {
            let empty = BooleanArray::from(Vec::<bool>::new());
            return Ok(Arc::new(empty) as ArrayRef);
        }

        let mut values: Vec<Option<bool>> = vec![None; len];
        let mut found: Vec<bool> = vec![false; len];

        for &idx in &plan.candidate_indices {
            // `remove` consumes the blob so each chunk is deserialized once.
            let value_chunk = chunk_blobs
                .remove(&plan.value_metas[idx].chunk_pk)
                .ok_or(Error::NotFound)?;
            let row_chunk = chunk_blobs
                .remove(&plan.row_metas[idx].chunk_pk)
                .ok_or(Error::NotFound)?;

            let value_any = deserialize_array(value_chunk)?;
            let value_arr = value_any
                .as_any()
                .downcast_ref::<BooleanArray>()
                .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
            let row_any = deserialize_array(row_chunk)?;
            let row_arr = row_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;

            // Route each stored row id to its output slot, if requested.
            for i in 0..row_arr.len() {
                if !row_arr.is_valid(i) {
                    continue;
                }
                let row_id = row_arr.value(i);
                if let Some(&out_idx) = row_index.get(&row_id) {
                    found[out_idx] = true;
                    if value_arr.is_null(i) {
                        values[out_idx] = None;
                    } else {
                        values[out_idx] = Some(value_arr.value(i));
                    }
                }
            }
        }

        if !allow_missing && found.iter().any(|f| !*f) {
            return Err(Error::Internal(
                "gather_rows_multi: one or more requested row IDs were not found".into(),
            ));
        }

        let array = BooleanArray::from(values);
        Ok(Arc::new(array) as ArrayRef)
    }
863
864 #[allow(clippy::too_many_arguments)]
865 fn gather_rows_single_shot<T>(
866 row_index: &FxHashMap<u64, usize>,
867 len: usize,
868 plan: &FieldPlan,
869 chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
870 allow_missing: bool,
871 ) -> Result<ArrayRef>
872 where
873 T: ArrowPrimitiveType,
874 {
875 if len == 0 {
876 let empty = PrimitiveBuilder::<T>::new().finish();
877 return Ok(Arc::new(empty) as ArrayRef);
878 }
879
880 let mut values: Vec<Option<T::Native>> = vec![None; len];
881 let mut found: Vec<bool> = vec![false; len];
882
883 for &idx in &plan.candidate_indices {
884 let value_chunk = chunk_blobs
885 .remove(&plan.value_metas[idx].chunk_pk)
886 .ok_or(Error::NotFound)?;
887 let row_chunk = chunk_blobs
888 .remove(&plan.row_metas[idx].chunk_pk)
889 .ok_or(Error::NotFound)?;
890
891 let value_any = deserialize_array(value_chunk)?;
892 let value_arr = value_any
893 .as_any()
894 .downcast_ref::<PrimitiveArray<T>>()
895 .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
896 let row_any = deserialize_array(row_chunk)?;
897 let row_arr = row_any
898 .as_any()
899 .downcast_ref::<UInt64Array>()
900 .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
901
902 for i in 0..row_arr.len() {
903 if !row_arr.is_valid(i) {
904 continue;
905 }
906 let row_id = row_arr.value(i);
907 if let Some(&out_idx) = row_index.get(&row_id) {
908 found[out_idx] = true;
909 if value_arr.is_null(i) {
910 values[out_idx] = None;
911 } else {
912 values[out_idx] = Some(value_arr.value(i));
913 }
914 }
915 }
916 }
917
918 if !allow_missing {
919 if found.iter().any(|f| !*f) {
920 return Err(Error::Internal(
921 "gather_rows_multi: one or more requested row IDs were not found".into(),
922 ));
923 }
924 } else {
925 for (idx, was_found) in found.iter().enumerate() {
926 if !*was_found {
927 values[idx] = None;
928 }
929 }
930 }
931
932 let array = PrimitiveArray::<T>::from_iter(values);
933 Ok(Arc::new(array) as ArrayRef)
934 }
935
    /// Cache-based string gather: reads candidate chunks from the shared
    /// deserialized-chunk cache (read-only) and assembles the output column.
    ///
    /// Fast path: with exactly one candidate chunk, sorted requested ids, and
    /// the request matching a contiguous run of the chunk's row ids, the
    /// value array is zero-copy sliced instead of copied through a builder.
    /// Otherwise `row_scratch[slot]` records `(chunk index, offset)` for each
    /// output slot and the result is built in two passes (byte-count, then
    /// append).
    #[allow(clippy::too_many_arguments)]
    fn gather_rows_from_chunks_string<O>(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        if len == 0 {
            let mut builder = GenericStringBuilder::<O>::new();
            return Ok(Arc::new(builder.finish()) as ArrayRef);
        }

        // Zero-copy fast path for a single candidate chunk.
        if candidate_indices.len() == 1 {
            let chunk_idx = candidate_indices[0];
            let value_any = chunk_arrays
                .get(&plan.value_metas[chunk_idx].chunk_pk)
                .ok_or(Error::NotFound)?;
            let row_any = chunk_arrays
                .get(&plan.row_metas[chunk_idx].chunk_pk)
                .ok_or(Error::NotFound)?;
            // Downcast only to verify the stored dtype; the slice below works
            // on the untyped ArrayRef.
            let _value_arr = value_any
                .as_any()
                .downcast_ref::<GenericStringArray<O>>()
                .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
            let row_arr = row_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;

            // If the request is exactly a contiguous run of this chunk's row
            // ids, slice the value array without copying.
            if row_arr.null_count() == 0 && row_ids.windows(2).all(|w| w[0] <= w[1]) {
                let values = row_arr.values();
                if let Ok(start_idx) = values.binary_search(&row_ids[0])
                    && start_idx + len <= values.len()
                    && row_ids == &values[start_idx..start_idx + len]
                {
                    return Ok(value_any.slice(start_idx, len));
                }
            }
        }

        // Reset only the first `len` scratch slots (the buffer may be larger
        // from a previous, bigger gather).
        for slot in row_scratch.iter_mut().take(len) {
            *slot = None;
        }

        // Downcast each candidate chunk once; `chunk_lookup` maps a chunk
        // index back to its position in `candidates`.
        let mut candidates: Vec<(usize, &GenericStringArray<O>, &UInt64Array)> =
            Vec::with_capacity(candidate_indices.len());
        let mut chunk_lookup: FxHashMap<usize, usize> = FxHashMap::default();

        for (slot, &chunk_idx) in candidate_indices.iter().enumerate() {
            let value_any = chunk_arrays
                .get(&plan.value_metas[chunk_idx].chunk_pk)
                .ok_or(Error::NotFound)?;
            let value_arr = value_any
                .as_any()
                .downcast_ref::<GenericStringArray<O>>()
                .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
            let row_any = chunk_arrays
                .get(&plan.row_metas[chunk_idx].chunk_pk)
                .ok_or(Error::NotFound)?;
            let row_arr = row_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;

            candidates.push((chunk_idx, value_arr, row_arr));
            chunk_lookup.insert(chunk_idx, slot);

            // Record where each requested row id's value lives.
            for i in 0..row_arr.len() {
                if !row_arr.is_valid(i) {
                    continue;
                }
                let row_id = row_arr.value(i);
                if let Some(out_idx) = row_locator.lookup(row_id, len) {
                    row_scratch[out_idx] = Some((chunk_idx, i));
                }
            }
        }

        // Pass 1: total value bytes, so the builder allocates exactly once.
        let mut total_bytes = 0usize;
        for row_scratch_item in row_scratch.iter().take(len) {
            if let Some((chunk_idx, value_idx)) = row_scratch_item {
                let slot = *chunk_lookup.get(chunk_idx).ok_or_else(|| {
                    Error::Internal("gather_rows_multi: chunk lookup missing".into())
                })?;
                let (_, value_arr, _) = candidates[slot];
                if !value_arr.is_null(*value_idx) {
                    total_bytes += value_arr.value(*value_idx).len();
                }
            } else if !allow_missing {
                return Err(Error::Internal(
                    "gather_rows_multi: one or more requested row IDs were not found".into(),
                ));
            }
        }

        // Pass 2: append values/nulls in output order.
        let mut builder = GenericStringBuilder::<O>::with_capacity(len, total_bytes);
        for row_scratch_item in row_scratch.iter().take(len) {
            match row_scratch_item {
                Some((chunk_idx, value_idx)) => {
                    let slot = *chunk_lookup.get(chunk_idx).ok_or_else(|| {
                        Error::Internal("gather_rows_multi: chunk lookup missing".into())
                    })?;
                    let (_, value_arr, _) = candidates[slot];
                    if value_arr.is_null(*value_idx) {
                        builder.append_null();
                    } else {
                        builder.append_value(value_arr.value(*value_idx));
                    }
                }
                None => {
                    if allow_missing {
                        builder.append_null();
                    } else {
                        return Err(Error::Internal(
                            "gather_rows_multi: one or more requested row IDs were not found"
                                .into(),
                        ));
                    }
                }
            }
        }

        Ok(Arc::new(builder.finish()) as ArrayRef)
    }
1067
1068 fn gather_rows_single_shot_binary<O>(
1069 row_index: &FxHashMap<u64, usize>,
1070 len: usize,
1071 plan: &FieldPlan,
1072 chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
1073 allow_missing: bool,
1074 ) -> Result<ArrayRef>
1075 where
1076 O: OffsetSizeTrait,
1077 {
1078 if len == 0 {
1079 let mut builder = GenericBinaryBuilder::<O>::new();
1080 return Ok(Arc::new(builder.finish()) as ArrayRef);
1081 }
1082
1083 let mut values: Vec<Option<Vec<u8>>> = vec![None; len];
1084 let mut found: Vec<bool> = vec![false; len];
1085
1086 for &idx in &plan.candidate_indices {
1087 let value_chunk = chunk_blobs
1088 .remove(&plan.value_metas[idx].chunk_pk)
1089 .ok_or(Error::NotFound)?;
1090 let row_chunk = chunk_blobs
1091 .remove(&plan.row_metas[idx].chunk_pk)
1092 .ok_or(Error::NotFound)?;
1093
1094 let value_any = deserialize_array(value_chunk)?;
1095 let value_arr = value_any
1096 .as_any()
1097 .downcast_ref::<GenericBinaryArray<O>>()
1098 .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
1099 let row_any = deserialize_array(row_chunk)?;
1100 let row_arr = row_any
1101 .as_any()
1102 .downcast_ref::<UInt64Array>()
1103 .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
1104
1105 for i in 0..row_arr.len() {
1106 if !row_arr.is_valid(i) {
1107 continue;
1108 }
1109 let row_id = row_arr.value(i);
1110 if let Some(&out_idx) = row_index.get(&row_id) {
1111 found[out_idx] = true;
1112 if value_arr.is_null(i) {
1113 values[out_idx] = None;
1114 } else {
1115 values[out_idx] = Some(value_arr.value(i).to_vec());
1116 }
1117 }
1118 }
1119 }
1120
1121 if !allow_missing {
1122 if found.iter().any(|f| !*f) {
1123 return Err(Error::Internal(
1124 "gather_rows_multi: one or more requested row IDs were not found".into(),
1125 ));
1126 }
1127 } else {
1128 for (idx, was_found) in found.iter().enumerate() {
1129 if !*was_found {
1130 values[idx] = None;
1131 }
1132 }
1133 }
1134
1135 let total_bytes: usize = values
1136 .iter()
1137 .filter_map(|v| v.as_ref().map(|b| b.len()))
1138 .sum();
1139
1140 let mut builder = GenericBinaryBuilder::<O>::with_capacity(len, total_bytes);
1141 for value in values {
1142 match value {
1143 Some(bytes) => builder.append_value(&bytes),
1144 None => builder.append_null(),
1145 }
1146 }
1147
1148 Ok(Arc::new(builder.finish()) as ArrayRef)
1149 }
1150
1151 #[allow(clippy::too_many_arguments)] fn gather_rows_from_chunks_binary<O>(
1153 row_ids: &[u64],
1154 row_locator: RowLocator,
1155 len: usize,
1156 candidate_indices: &[usize],
1157 plan: &FieldPlan,
1158 chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
1159 row_scratch: &mut [Option<(usize, usize)>],
1160 allow_missing: bool,
1161 ) -> Result<ArrayRef>
1162 where
1163 O: OffsetSizeTrait,
1164 {
1165 if len == 0 {
1166 let mut builder = GenericBinaryBuilder::<O>::new();
1167 return Ok(Arc::new(builder.finish()) as ArrayRef);
1168 }
1169
1170 if candidate_indices.len() == 1 {
1171 let chunk_idx = candidate_indices[0];
1172 let value_any = chunk_arrays
1173 .get(&plan.value_metas[chunk_idx].chunk_pk)
1174 .ok_or(Error::NotFound)?;
1175 let row_any = chunk_arrays
1176 .get(&plan.row_metas[chunk_idx].chunk_pk)
1177 .ok_or(Error::NotFound)?;
1178 let _value_arr = value_any
1179 .as_any()
1180 .downcast_ref::<GenericBinaryArray<O>>()
1181 .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
1182 let row_arr = row_any
1183 .as_any()
1184 .downcast_ref::<UInt64Array>()
1185 .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
1186
1187 if row_arr.null_count() == 0 && row_ids.windows(2).all(|w| w[0] <= w[1]) {
1188 let values_slice = row_arr.values();
1189 if let Ok(start_idx) = values_slice.binary_search(&row_ids[0])
1190 && start_idx + len <= values_slice.len()
1191 && row_ids == &values_slice[start_idx..start_idx + len]
1192 {
1193 return Ok(value_any.slice(start_idx, len));
1194 }
1195 }
1196 }
1197
1198 for slot in row_scratch.iter_mut().take(len) {
1199 *slot = None;
1200 }
1201
1202 let mut candidates: Vec<(usize, &GenericBinaryArray<O>, &UInt64Array)> =
1203 Vec::with_capacity(candidate_indices.len());
1204 let mut chunk_lookup: FxHashMap<usize, usize> = FxHashMap::default();
1205
1206 for (slot, &chunk_idx) in candidate_indices.iter().enumerate() {
1207 let value_any = chunk_arrays
1208 .get(&plan.value_metas[chunk_idx].chunk_pk)
1209 .ok_or(Error::NotFound)?;
1210 let value_arr = value_any
1211 .as_any()
1212 .downcast_ref::<GenericBinaryArray<O>>()
1213 .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
1214 let row_any = chunk_arrays
1215 .get(&plan.row_metas[chunk_idx].chunk_pk)
1216 .ok_or(Error::NotFound)?;
1217 let row_arr = row_any
1218 .as_any()
1219 .downcast_ref::<UInt64Array>()
1220 .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
1221
1222 candidates.push((chunk_idx, value_arr, row_arr));
1223 chunk_lookup.insert(chunk_idx, slot);
1224
1225 for i in 0..row_arr.len() {
1226 if !row_arr.is_valid(i) {
1227 continue;
1228 }
1229 let row_id = row_arr.value(i);
1230 if let Some(out_idx) = row_locator.lookup(row_id, len) {
1231 row_scratch[out_idx] = Some((chunk_idx, i));
1232 }
1233 }
1234 }
1235
1236 let mut total_bytes = 0usize;
1237 for row_scratch_item in row_scratch.iter().take(len) {
1238 if let Some((chunk_idx, value_idx)) = row_scratch_item {
1239 let slot = *chunk_lookup.get(chunk_idx).ok_or_else(|| {
1240 Error::Internal("gather_rows_multi: chunk lookup missing".into())
1241 })?;
1242 let (_, value_arr, _) = candidates[slot];
1243 if !value_arr.is_null(*value_idx) {
1244 total_bytes += value_arr.value(*value_idx).len();
1245 }
1246 } else if !allow_missing {
1247 return Err(Error::Internal(
1248 "gather_rows_multi: one or more requested row IDs were not found".into(),
1249 ));
1250 }
1251 }
1252
1253 let mut builder = GenericBinaryBuilder::<O>::with_capacity(len, total_bytes);
1254 for row_scratch_item in row_scratch.iter().take(len) {
1255 match row_scratch_item {
1256 Some((chunk_idx, value_idx)) => {
1257 let slot = *chunk_lookup.get(chunk_idx).ok_or_else(|| {
1258 Error::Internal("gather_rows_multi: chunk lookup missing".into())
1259 })?;
1260 let (_, value_arr, _) = candidates[slot];
1261 if value_arr.is_null(*value_idx) {
1262 builder.append_null();
1263 } else {
1264 builder.append_value(value_arr.value(*value_idx));
1265 }
1266 }
1267 None => {
1268 if allow_missing {
1269 builder.append_null();
1270 } else {
1271 return Err(Error::Internal(
1272 "gather_rows_multi: one or more requested row IDs were not found"
1273 .into(),
1274 ));
1275 }
1276 }
1277 }
1278 }
1279
1280 Ok(Arc::new(builder.finish()) as ArrayRef)
1281 }
1282
1283 #[allow(clippy::too_many_arguments)] fn gather_rows_from_chunks_bool(
1285 row_ids: &[u64],
1286 row_locator: RowLocator,
1287 len: usize,
1288 candidate_indices: &[usize],
1289 plan: &FieldPlan,
1290 chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
1291 row_scratch: &mut [Option<(usize, usize)>],
1292 allow_missing: bool,
1293 ) -> Result<ArrayRef> {
1294 if len == 0 {
1295 let empty = BooleanArray::from(Vec::<bool>::new());
1296 return Ok(Arc::new(empty) as ArrayRef);
1297 }
1298
1299 if candidate_indices.len() == 1 {
1300 let chunk_idx = candidate_indices[0];
1301 let value_any = chunk_arrays
1302 .get(&plan.value_metas[chunk_idx].chunk_pk)
1303 .ok_or(Error::NotFound)?;
1304 let row_any = chunk_arrays
1305 .get(&plan.row_metas[chunk_idx].chunk_pk)
1306 .ok_or(Error::NotFound)?;
1307 let _value_arr = value_any
1308 .as_any()
1309 .downcast_ref::<BooleanArray>()
1310 .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
1311 let row_arr = row_any
1312 .as_any()
1313 .downcast_ref::<UInt64Array>()
1314 .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
1315
1316 if row_arr.null_count() == 0 && row_ids.windows(2).all(|w| w[0] <= w[1]) {
1317 let values = row_arr.values();
1318 if let Ok(start_idx) = values.binary_search(&row_ids[0])
1319 && start_idx + len <= values.len()
1320 && row_ids == &values[start_idx..start_idx + len]
1321 {
1322 return Ok(value_any.slice(start_idx, len));
1323 }
1324 }
1325 }
1326
1327 for slot in row_scratch.iter_mut().take(len) {
1328 *slot = None;
1329 }
1330
1331 let mut candidates: Vec<(usize, &BooleanArray, &UInt64Array)> =
1332 Vec::with_capacity(candidate_indices.len());
1333 let mut chunk_lookup: FxHashMap<usize, usize> = FxHashMap::default();
1334
1335 for (slot, &chunk_idx) in candidate_indices.iter().enumerate() {
1336 let value_any = chunk_arrays
1337 .get(&plan.value_metas[chunk_idx].chunk_pk)
1338 .ok_or(Error::NotFound)?;
1339 let value_arr = value_any
1340 .as_any()
1341 .downcast_ref::<BooleanArray>()
1342 .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
1343 let row_any = chunk_arrays
1344 .get(&plan.row_metas[chunk_idx].chunk_pk)
1345 .ok_or(Error::NotFound)?;
1346 let row_arr = row_any
1347 .as_any()
1348 .downcast_ref::<UInt64Array>()
1349 .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
1350
1351 candidates.push((chunk_idx, value_arr, row_arr));
1352 chunk_lookup.insert(chunk_idx, slot);
1353
1354 for i in 0..row_arr.len() {
1355 if !row_arr.is_valid(i) {
1356 continue;
1357 }
1358 let row_id = row_arr.value(i);
1359 if let Some(out_idx) = row_locator.lookup(row_id, len) {
1360 row_scratch[out_idx] = Some((chunk_idx, i));
1361 }
1362 }
1363 }
1364
1365 if !allow_missing {
1366 for slot in row_scratch.iter().take(len) {
1367 if slot.is_none() {
1368 return Err(Error::Internal(
1369 "gather_rows_multi: one or more requested row IDs were not found".into(),
1370 ));
1371 }
1372 }
1373 }
1374
1375 let mut values: Vec<Option<bool>> = vec![None; len];
1376 for (out_idx, row_scratch_item) in row_scratch.iter().take(len).enumerate() {
1377 if let Some((chunk_idx, value_idx)) = row_scratch_item
1378 && let Some(&slot) = chunk_lookup.get(chunk_idx)
1379 {
1380 let (_idx, value_arr, _) = &candidates[slot];
1381 if value_arr.is_null(*value_idx) {
1382 values[out_idx] = None;
1383 } else {
1384 values[out_idx] = Some(value_arr.value(*value_idx));
1385 }
1386 }
1387 }
1388
1389 let array = BooleanArray::from(values);
1390 Ok(Arc::new(array) as ArrayRef)
1391 }
1392
1393 #[allow(clippy::too_many_arguments)] fn gather_rows_from_chunks<T>(
1395 row_ids: &[u64],
1396 row_locator: RowLocator,
1397 len: usize,
1398 candidate_indices: &[usize],
1399 plan: &FieldPlan,
1400 chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
1401 row_scratch: &mut [Option<(usize, usize)>],
1402 allow_missing: bool,
1403 ) -> Result<ArrayRef>
1404 where
1405 T: ArrowPrimitiveType,
1406 {
1407 if len == 0 {
1408 return Ok(Arc::new(PrimitiveBuilder::<T>::new().finish()) as ArrayRef);
1409 }
1410
1411 if candidate_indices.len() == 1 {
1412 let chunk_idx = candidate_indices[0];
1413 let value_any = chunk_arrays
1414 .get(&plan.value_metas[chunk_idx].chunk_pk)
1415 .ok_or(Error::NotFound)?;
1416 let row_any = chunk_arrays
1417 .get(&plan.row_metas[chunk_idx].chunk_pk)
1418 .ok_or(Error::NotFound)?;
1419 let _value_arr = value_any
1420 .as_any()
1421 .downcast_ref::<PrimitiveArray<T>>()
1422 .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
1423 let row_arr = row_any
1424 .as_any()
1425 .downcast_ref::<UInt64Array>()
1426 .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
1427
1428 if row_arr.null_count() == 0 && row_ids.windows(2).all(|w| w[0] <= w[1]) {
1429 let values = row_arr.values();
1430 if let Ok(start_idx) = values.binary_search(&row_ids[0])
1431 && start_idx + len <= values.len()
1432 && row_ids == &values[start_idx..start_idx + len]
1433 {
1434 return Ok(value_any.slice(start_idx, len));
1435 }
1436 }
1437 }
1438
1439 for slot in row_scratch.iter_mut().take(len) {
1440 *slot = None;
1441 }
1442
1443 let mut candidates: Vec<(usize, &PrimitiveArray<T>, &UInt64Array)> =
1444 Vec::with_capacity(candidate_indices.len());
1445 let mut chunk_lookup: FxHashMap<usize, usize> = FxHashMap::default();
1446
1447 for (slot, &chunk_idx) in candidate_indices.iter().enumerate() {
1448 let value_any = chunk_arrays
1449 .get(&plan.value_metas[chunk_idx].chunk_pk)
1450 .ok_or(Error::NotFound)?;
1451 let value_arr = value_any
1452 .as_any()
1453 .downcast_ref::<PrimitiveArray<T>>()
1454 .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
1455 let row_any = chunk_arrays
1456 .get(&plan.row_metas[chunk_idx].chunk_pk)
1457 .ok_or(Error::NotFound)?;
1458 let row_arr = row_any
1459 .as_any()
1460 .downcast_ref::<UInt64Array>()
1461 .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
1462
1463 candidates.push((chunk_idx, value_arr, row_arr));
1464 chunk_lookup.insert(chunk_idx, slot);
1465
1466 for i in 0..row_arr.len() {
1467 if !row_arr.is_valid(i) {
1468 continue;
1469 }
1470 let row_id = row_arr.value(i);
1471 if let Some(out_idx) = row_locator.lookup(row_id, len) {
1472 row_scratch[out_idx] = Some((chunk_idx, i));
1473 }
1474 }
1475 }
1476
1477 if !allow_missing {
1478 for slot in row_scratch.iter().take(len) {
1479 if slot.is_none() {
1480 return Err(Error::Internal(
1481 "gather_rows_multi: one or more requested row IDs were not found".into(),
1482 ));
1483 }
1484 }
1485 }
1486
1487 let mut builder = PrimitiveBuilder::<T>::with_capacity(len);
1488 for row_scratch_item in row_scratch.iter().take(len) {
1489 if let Some((chunk_idx, value_idx)) = *row_scratch_item {
1490 if let Some(&slot) = chunk_lookup.get(&chunk_idx) {
1491 let (idx, value_arr, _) = candidates[slot];
1492 debug_assert_eq!(idx, chunk_idx);
1493 if value_arr.is_null(value_idx) {
1494 builder.append_null();
1495 } else {
1496 builder.append_value(value_arr.value(value_idx));
1497 }
1498 } else {
1499 builder.append_null();
1500 }
1501 } else {
1502 builder.append_null();
1503 }
1504 }
1505
1506 Ok(Arc::new(builder.finish()) as ArrayRef)
1507 }
1508
1509 fn filter_rows_with_non_null(columns: Vec<ArrayRef>) -> Result<Vec<ArrayRef>> {
1510 if columns.is_empty() {
1511 return Ok(columns);
1512 }
1513
1514 let len = columns[0].len();
1515 if len == 0 {
1516 return Ok(columns);
1517 }
1518
1519 let mut keep = vec![false; len];
1520 for array in &columns {
1521 debug_assert_eq!(array.len(), len);
1522 if array.null_count() == 0 {
1523 keep.fill(true);
1524 break;
1525 }
1526 for (i, keep_item) in keep.iter_mut().enumerate().take(len) {
1527 if array.is_valid(i) {
1528 *keep_item = true;
1529 }
1530 }
1531 if keep.iter().all(|flag| *flag) {
1532 break;
1533 }
1534 }
1535
1536 if keep.iter().all(|flag| *flag) {
1537 return Ok(columns);
1538 }
1539
1540 let mask = BooleanArray::from(keep);
1541
1542 let mut filtered = Vec::with_capacity(columns.len());
1543 for array in columns {
1544 let filtered_column = compute::filter(array.as_ref(), &mask)
1545 .map_err(|e| Error::Internal(format!("gather_rows_multi filter: {e}")))?;
1546 filtered.push(filtered_column);
1547 }
1548 Ok(filtered)
1549 }
1550}