llkv_column_map/store/
projection.rs

1use super::*;
2use crate::store::descriptor::{ChunkMetadata, ColumnDescriptor, DescriptorIterator};
3use crate::types::LogicalFieldId;
4use arrow::array::{
5    Array, ArrayRef, BooleanArray, GenericBinaryArray, GenericBinaryBuilder, GenericStringArray,
6    GenericStringBuilder, OffsetSizeTrait, PrimitiveArray, PrimitiveBuilder, UInt64Array,
7    new_empty_array,
8};
9use arrow::compute;
10use arrow::datatypes::{ArrowPrimitiveType, DataType, Field, Schema};
11use arrow::record_batch::RecordBatch;
12use llkv_result::{Error, Result};
13use llkv_storage::{
14    pager::{BatchGet, GetResult, Pager},
15    serialization::deserialize_array,
16    types::PhysicalKey,
17};
18use rustc_hash::{FxHashMap, FxHashSet};
19use simd_r_drive_entry_handle::EntryHandle;
20use std::borrow::Cow;
21use std::sync::Arc;
22
/// Controls how a multi-column gather treats requested row IDs that are not
/// present in storage, and (for `DropNulls`) rows that are entirely null.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GatherNullPolicy {
    /// Any missing row ID results in an error.
    ErrorOnMissing,
    /// Missing rows surface as nulls in the result arrays; every output field
    /// is declared nullable under this policy.
    IncludeNulls,
    /// Missing rows and rows where all projected columns are null are dropped from the output.
    DropNulls,
}
32
33#[derive(Clone, Debug, Default, Eq, PartialEq)]
34pub struct Projection {
35    pub logical_field_id: LogicalFieldId,
36    pub alias: Option<String>,
37}
38
39impl Projection {
40    pub fn new(logical_field_id: LogicalFieldId) -> Self {
41        Self {
42            logical_field_id,
43            alias: None,
44        }
45    }
46
47    pub fn with_alias<S: Into<String>>(logical_field_id: LogicalFieldId, alias: S) -> Self {
48        Self {
49            logical_field_id,
50            alias: Some(alias.into()),
51        }
52    }
53}
54
55impl From<LogicalFieldId> for Projection {
56    fn from(logical_field_id: LogicalFieldId) -> Self {
57        Projection::new(logical_field_id)
58    }
59}
60
61impl<S: Into<String>> From<(LogicalFieldId, S)> for Projection {
62    fn from(value: (LogicalFieldId, S)) -> Self {
63        Projection::with_alias(value.0, value.1)
64    }
65}
66
67impl GatherNullPolicy {
68    #[inline]
69    fn allow_missing(self) -> bool {
70        !matches!(self, Self::ErrorOnMissing)
71    }
72}
73
74pub struct MultiGatherContext {
75    field_infos: Vec<(LogicalFieldId, DataType)>,
76    plans: Vec<FieldPlan>,
77    chunk_cache: FxHashMap<PhysicalKey, ArrayRef>,
78    row_index: FxHashMap<u64, usize>,
79    row_scratch: Vec<Option<(usize, usize)>>,
80    chunk_keys: Vec<PhysicalKey>,
81}
82
83impl MultiGatherContext {
84    fn new(field_infos: Vec<(LogicalFieldId, DataType)>, plans: Vec<FieldPlan>) -> Self {
85        Self {
86            chunk_cache: FxHashMap::default(),
87            row_index: FxHashMap::default(),
88            row_scratch: Vec::new(),
89            chunk_keys: Vec::new(),
90            field_infos,
91            plans,
92        }
93    }
94
95    #[inline]
96    pub fn is_empty(&self) -> bool {
97        self.plans.is_empty()
98    }
99
100    #[inline]
101    fn field_infos(&self) -> &[(LogicalFieldId, DataType)] {
102        &self.field_infos
103    }
104
105    #[inline]
106    fn plans(&self) -> &[FieldPlan] {
107        &self.plans
108    }
109
110    #[inline]
111    fn chunk_cache(&self) -> &FxHashMap<PhysicalKey, ArrayRef> {
112        &self.chunk_cache
113    }
114
115    #[inline]
116    fn chunk_cache_mut(&mut self) -> &mut FxHashMap<PhysicalKey, ArrayRef> {
117        &mut self.chunk_cache
118    }
119
120    #[inline]
121    fn plans_mut(&mut self) -> &mut [FieldPlan] {
122        &mut self.plans
123    }
124
125    fn take_chunk_keys(&mut self) -> Vec<PhysicalKey> {
126        std::mem::take(&mut self.chunk_keys)
127    }
128
129    fn store_chunk_keys(&mut self, keys: Vec<PhysicalKey>) {
130        self.chunk_keys = keys;
131    }
132
133    fn take_row_index(&mut self) -> FxHashMap<u64, usize> {
134        std::mem::take(&mut self.row_index)
135    }
136
137    fn store_row_index(&mut self, row_index: FxHashMap<u64, usize>) {
138        self.row_index = row_index;
139    }
140
141    fn take_row_scratch(&mut self) -> Vec<Option<(usize, usize)>> {
142        std::mem::take(&mut self.row_scratch)
143    }
144
145    fn store_row_scratch(&mut self, scratch: Vec<Option<(usize, usize)>>) {
146        self.row_scratch = scratch;
147    }
148
149    pub fn chunk_span_for_row(&self, row_id: u64) -> Option<(usize, u64, u64)> {
150        let first_plan = self.plans.first()?;
151        let mut chunk_idx = None;
152        for (idx, meta) in first_plan.row_metas.iter().enumerate() {
153            if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
154                chunk_idx = Some(idx);
155                break;
156            }
157        }
158        if chunk_idx.is_none() {
159            let total_chunks = first_plan.row_metas.len();
160            'outer: for idx in 0..total_chunks {
161                for plan in &self.plans {
162                    let meta = &plan.row_metas[idx];
163                    if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
164                        chunk_idx = Some(idx);
165                        break 'outer;
166                    }
167                }
168            }
169        }
170        let idx = chunk_idx?;
171
172        let mut span_min = u64::MAX;
173        let mut span_max = 0u64;
174        for plan in &self.plans {
175            let meta = plan.row_metas.get(idx)?;
176            span_min = span_min.min(meta.min_val_u64);
177            span_max = span_max.max(meta.max_val_u64);
178        }
179
180        if span_min > span_max {
181            return None;
182        }
183
184        Some((idx, span_min, span_max))
185    }
186}
187
/// Per-field gather plan: chunk metadata for a value column and for its
/// row-id column.
#[derive(Clone, Debug)]
struct FieldPlan {
    /// Arrow data type of the value column; selects the per-type gather routine.
    dtype: DataType,
    /// Metadata of the value column's non-empty chunks.
    value_metas: Vec<ChunkMetadata>,
    /// Metadata of the row-id column's non-empty chunks. The same index is used
    /// into both this and `value_metas` (their lengths are checked to be equal
    /// when the plan is built).
    row_metas: Vec<ChunkMetadata>,
    /// Indices into `value_metas`/`row_metas` of the chunks whose row-id range
    /// may contain a requested row; cleared and recomputed on every gather.
    candidate_indices: Vec<usize>,
}
195
196#[derive(Clone, Copy)]
197enum RowLocator<'a> {
198    Dense { base: u64 },
199    Sparse { index: &'a FxHashMap<u64, usize> },
200}
201
202impl<'a> RowLocator<'a> {
203    #[inline]
204    fn lookup(&self, row_id: u64, len: usize) -> Option<usize> {
205        match self {
206            RowLocator::Dense { base } => {
207                let offset = row_id.checked_sub(*base)?;
208                if offset < len as u64 {
209                    Some(offset as usize)
210                } else {
211                    None
212                }
213            }
214            RowLocator::Sparse { index } => index.get(&row_id).copied(),
215        }
216    }
217}
218
219impl<P> ColumnStore<P>
220where
221    P: Pager<Blob = EntryHandle> + Send + Sync,
222{
223    /// Gathers multiple columns using a configurable null-handling policy.
224    /// When [`GatherNullPolicy::DropNulls`] is selected, rows where all
225    /// projected columns are null or missing are removed from the
226    /// resulting batch.
227    pub fn gather_rows(
228        &self,
229        field_ids: &[LogicalFieldId],
230        row_ids: &[u64],
231        policy: GatherNullPolicy,
232    ) -> Result<RecordBatch> {
233        let mut ctx = self.prepare_gather_context(field_ids)?;
234        self.execute_gather_single_pass(&mut ctx, row_ids, policy)
235    }
236
    /// Executes a one-off gather using a freshly prepared context.
    ///
    /// This path reuses the planning metadata but fetches and decodes
    /// required chunks for this call only, avoiding the reusable caches
    /// maintained by [`Self::gather_rows_with_reusable_context`].
    ///
    /// Row ids may arrive in any order but must be unique. Before any
    /// `DropNulls` filtering, output row `i` corresponds to `row_ids[i]`.
    /// Output columns are named `field_<id>`.
    ///
    /// # Errors
    /// Returns `Error::Internal` for duplicate row ids, unsupported column
    /// types, or a batch that fails to assemble. Propagates pager errors and
    /// per-type gather errors, including `Error::NotFound` when a needed chunk
    /// was not returned by the pager.
    fn execute_gather_single_pass(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        // Cloned so `ctx` can be borrowed mutably (plans_mut) further down.
        let field_infos = ctx.field_infos().to_vec();

        if row_ids.is_empty() {
            let mut arrays = Vec::with_capacity(field_infos.len());
            let mut fields = Vec::with_capacity(field_infos.len());
            for (fid, dtype) in &field_infos {
                arrays.push(new_empty_array(dtype));
                let field_name = format!("field_{}", u64::from(*fid));
                fields.push(Field::new(field_name, dtype.clone(), true));
            }
            let schema = Arc::new(Schema::new(fields));
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Map each requested row id to its output position; duplicates are rejected.
        let mut row_index: FxHashMap<u64, usize> =
            FxHashMap::with_capacity_and_hasher(row_ids.len(), Default::default());
        for (idx, &row_id) in row_ids.iter().enumerate() {
            if row_index.insert(row_id, idx).is_some() {
                return Err(Error::Internal(
                    "duplicate row_id in gather_rows_multi".into(),
                ));
            }
        }

        // Sorted copy used only for chunk pruning in `chunk_intersects`.
        let mut sorted_row_ids = row_ids.to_vec();
        sorted_row_ids.sort_unstable();

        // For each field, record the chunks whose row-id range may contain a
        // requested row, and collect the value and row-id chunk keys to fetch
        // (deduplicated by the set).
        let mut chunk_keys: FxHashSet<PhysicalKey> = FxHashSet::default();
        {
            let plans_mut = ctx.plans_mut();
            for plan in plans_mut.iter_mut() {
                plan.candidate_indices.clear();
                for (idx, meta) in plan.row_metas.iter().enumerate() {
                    if Self::chunk_intersects(&sorted_row_ids, meta) {
                        plan.candidate_indices.push(idx);
                        chunk_keys.insert(plan.value_metas[idx].chunk_pk);
                        chunk_keys.insert(plan.row_metas[idx].chunk_pk);
                    }
                }
            }
        }

        let mut chunk_requests = Vec::with_capacity(chunk_keys.len());
        for &key in &chunk_keys {
            chunk_requests.push(BatchGet::Raw { key });
        }

        // Fetch every needed chunk in a single batch. Non-`Raw` results are
        // skipped here; the per-type helpers report an absent chunk as
        // `Error::NotFound` when they try to take it from `chunk_map`.
        let mut chunk_map: FxHashMap<PhysicalKey, EntryHandle> =
            FxHashMap::with_capacity_and_hasher(chunk_requests.len(), Default::default());
        if !chunk_requests.is_empty() {
            let chunk_results = self.pager.batch_get(&chunk_requests)?;
            for result in chunk_results {
                if let GetResult::Raw { key, bytes } = result {
                    chunk_map.insert(key, bytes);
                }
            }
        }

        let allow_missing = policy.allow_missing();

        // Dispatch on the column type. Each helper removes the chunks it uses
        // from `chunk_map`.
        let mut outputs = Vec::with_capacity(ctx.plans().len());
        for plan in ctx.plans() {
            let array = match &plan.dtype {
                DataType::Utf8 => Self::gather_rows_single_shot_string::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeUtf8 => Self::gather_rows_single_shot_string::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Binary => Self::gather_rows_single_shot_binary::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeBinary => Self::gather_rows_single_shot_binary::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Boolean => Self::gather_rows_single_shot_bool(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                other => with_integer_arrow_type!(
                    other.clone(),
                    |ArrowTy| {
                        Self::gather_rows_single_shot::<ArrowTy>(
                            &row_index,
                            row_ids.len(),
                            plan,
                            &mut chunk_map,
                            allow_missing,
                        )
                    },
                    Err(Error::Internal(format!(
                        "gather_rows_multi: unsupported dtype {:?}",
                        other
                    ))),
                ),
            }?;
            outputs.push(array);
        }

        let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
            Self::filter_rows_with_non_null(outputs)?
        } else {
            outputs
        };

        // Under `IncludeNulls` every field is nullable; otherwise nullability
        // reflects whether the gathered array actually contains nulls.
        let mut fields = Vec::with_capacity(field_infos.len());
        for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
            let array = &outputs[idx];
            let field_name = format!("field_{}", u64::from(*fid));
            let nullable = match policy {
                GatherNullPolicy::IncludeNulls => true,
                _ => array.null_count() > 0,
            };
            fields.push(Field::new(field_name, dtype.clone(), nullable));
        }

        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, outputs)
            .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
    }
392
393    pub fn prepare_gather_context(
394        &self,
395        field_ids: &[LogicalFieldId],
396    ) -> Result<MultiGatherContext> {
397        let mut field_infos = Vec::with_capacity(field_ids.len());
398        for &fid in field_ids {
399            field_infos.push((fid, self.data_type(fid)?));
400        }
401
402        if field_infos.is_empty() {
403            return Ok(MultiGatherContext::new(Vec::new(), Vec::new()));
404        }
405
406        let catalog = self.catalog.read().unwrap();
407        let mut key_pairs = Vec::with_capacity(field_infos.len());
408        for (fid, _) in &field_infos {
409            let value_pk = *catalog.map.get(fid).ok_or(Error::NotFound)?;
410            let row_pk = *catalog.map.get(&rowid_fid(*fid)).ok_or(Error::NotFound)?;
411            key_pairs.push((value_pk, row_pk));
412        }
413        drop(catalog);
414
415        let mut descriptor_requests = Vec::with_capacity(key_pairs.len() * 2);
416        for (value_pk, row_pk) in &key_pairs {
417            descriptor_requests.push(BatchGet::Raw { key: *value_pk });
418            descriptor_requests.push(BatchGet::Raw { key: *row_pk });
419        }
420        let descriptor_results = self.pager.batch_get(&descriptor_requests)?;
421        let mut descriptor_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
422        for result in descriptor_results {
423            if let GetResult::Raw { key, bytes } = result {
424                descriptor_map.insert(key, bytes);
425            }
426        }
427
428        let mut plans = Vec::with_capacity(field_infos.len());
429        for ((_, dtype), (value_pk, row_pk)) in field_infos.iter().zip(key_pairs.iter()) {
430            let value_desc_blob = descriptor_map.remove(value_pk).ok_or(Error::NotFound)?;
431            let value_desc = ColumnDescriptor::from_le_bytes(value_desc_blob.as_ref());
432            let value_metas =
433                Self::collect_non_empty_metas(self.pager.as_ref(), value_desc.head_page_pk)?;
434
435            let row_desc_blob = descriptor_map.remove(row_pk).ok_or(Error::NotFound)?;
436            let row_desc = ColumnDescriptor::from_le_bytes(row_desc_blob.as_ref());
437            let row_metas =
438                Self::collect_non_empty_metas(self.pager.as_ref(), row_desc.head_page_pk)?;
439
440            if value_metas.len() != row_metas.len() {
441                return Err(Error::Internal(
442                    "gather_rows_multi: chunk count mismatch".into(),
443                ));
444            }
445
446            plans.push(FieldPlan {
447                dtype: dtype.clone(),
448                value_metas,
449                row_metas,
450                candidate_indices: Vec::new(),
451            });
452        }
453
454        Ok(MultiGatherContext::new(field_infos, plans))
455    }
456
457    /// Gathers rows while reusing chunk caches and scratch buffers stored in the context.
458    ///
459    /// This path amortizes chunk fetch and decode costs across multiple calls by
460    /// retaining Arrow arrays and scratch state inside the provided context.
461    pub fn gather_rows_with_reusable_context(
462        &self,
463        ctx: &mut MultiGatherContext,
464        row_ids: &[u64],
465        policy: GatherNullPolicy,
466    ) -> Result<RecordBatch> {
467        if ctx.is_empty() {
468            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
469        }
470
471        if row_ids.is_empty() {
472            let mut arrays = Vec::with_capacity(ctx.field_infos().len());
473            let mut fields = Vec::with_capacity(ctx.field_infos().len());
474            for (fid, dtype) in ctx.field_infos() {
475                arrays.push(new_empty_array(dtype));
476                let field_name = format!("field_{}", u64::from(*fid));
477                fields.push(Field::new(field_name, dtype.clone(), true));
478            }
479            let schema = Arc::new(Schema::new(fields));
480            return RecordBatch::try_new(schema, arrays)
481                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
482        }
483
484        let mut row_index = ctx.take_row_index();
485        let mut row_scratch = ctx.take_row_scratch();
486
487        let field_infos = ctx.field_infos().to_vec();
488        let mut chunk_keys = ctx.take_chunk_keys();
489
490        let result: Result<RecordBatch> = (|| {
491            let len = row_ids.len();
492            if row_scratch.len() < len {
493                row_scratch.resize(len, None);
494            }
495
496            let is_non_decreasing = len <= 1 || row_ids.windows(2).all(|w| w[0] <= w[1]);
497            let sorted_row_ids_cow: Cow<'_, [u64]> = if is_non_decreasing {
498                Cow::Borrowed(row_ids)
499            } else {
500                let mut buf = row_ids.to_vec();
501                buf.sort_unstable();
502                Cow::Owned(buf)
503            };
504            let sorted_row_ids: &[u64] = sorted_row_ids_cow.as_ref();
505
506            let dense_base = if len == 0 {
507                None
508            } else if len == 1 || is_non_decreasing && row_ids.windows(2).all(|w| w[1] == w[0] + 1)
509            {
510                Some(row_ids[0])
511            } else {
512                None
513            };
514
515            if dense_base.is_none() {
516                row_index.clear();
517                row_index.reserve(len);
518                for (idx, &row_id) in row_ids.iter().enumerate() {
519                    if row_index.insert(row_id, idx).is_some() {
520                        return Err(Error::Internal(
521                            "duplicate row_id in gather_rows_multi".into(),
522                        ));
523                    }
524                }
525            } else {
526                row_index.clear();
527            }
528
529            let row_locator = if let Some(base) = dense_base {
530                RowLocator::Dense { base }
531            } else {
532                RowLocator::Sparse { index: &row_index }
533            };
534
535            chunk_keys.clear();
536
537            {
538                let plans_mut = ctx.plans_mut();
539                for plan in plans_mut.iter_mut() {
540                    plan.candidate_indices.clear();
541                    for (idx, meta) in plan.row_metas.iter().enumerate() {
542                        if Self::chunk_intersects(sorted_row_ids, meta) {
543                            plan.candidate_indices.push(idx);
544                            chunk_keys.push(plan.value_metas[idx].chunk_pk);
545                            chunk_keys.push(plan.row_metas[idx].chunk_pk);
546                        }
547                    }
548                }
549            }
550
551            chunk_keys.sort_unstable();
552            chunk_keys.dedup();
553
554            {
555                let mut pending: Vec<BatchGet> = Vec::new();
556                {
557                    let cache = ctx.chunk_cache();
558                    for &key in &chunk_keys {
559                        if !cache.contains_key(&key) {
560                            pending.push(BatchGet::Raw { key });
561                        }
562                    }
563                }
564
565                if !pending.is_empty() {
566                    let chunk_results = self.pager.batch_get(&pending)?;
567                    let cache = ctx.chunk_cache_mut();
568                    for result in chunk_results {
569                        if let GetResult::Raw { key, bytes } = result {
570                            let array = deserialize_array(bytes)?;
571                            cache.insert(key, Arc::clone(&array));
572                        }
573                    }
574                }
575            }
576
577            let allow_missing = policy.allow_missing();
578
579            let mut outputs = Vec::with_capacity(ctx.plans().len());
580            for plan in ctx.plans() {
581                let array = match &plan.dtype {
582                    DataType::Utf8 => Self::gather_rows_from_chunks_string::<i32>(
583                        row_ids,
584                        row_locator,
585                        len,
586                        &plan.candidate_indices,
587                        plan,
588                        ctx.chunk_cache(),
589                        &mut row_scratch,
590                        allow_missing,
591                    ),
592                    DataType::LargeUtf8 => Self::gather_rows_from_chunks_string::<i64>(
593                        row_ids,
594                        row_locator,
595                        len,
596                        &plan.candidate_indices,
597                        plan,
598                        ctx.chunk_cache(),
599                        &mut row_scratch,
600                        allow_missing,
601                    ),
602                    DataType::Binary => Self::gather_rows_from_chunks_binary::<i32>(
603                        row_ids,
604                        row_locator,
605                        len,
606                        &plan.candidate_indices,
607                        plan,
608                        ctx.chunk_cache(),
609                        &mut row_scratch,
610                        allow_missing,
611                    ),
612                    DataType::LargeBinary => Self::gather_rows_from_chunks_binary::<i64>(
613                        row_ids,
614                        row_locator,
615                        len,
616                        &plan.candidate_indices,
617                        plan,
618                        ctx.chunk_cache(),
619                        &mut row_scratch,
620                        allow_missing,
621                    ),
622                    DataType::Boolean => Self::gather_rows_from_chunks_bool(
623                        row_ids,
624                        row_locator,
625                        len,
626                        &plan.candidate_indices,
627                        plan,
628                        ctx.chunk_cache(),
629                        &mut row_scratch,
630                        allow_missing,
631                    ),
632                    other => with_integer_arrow_type!(
633                        other.clone(),
634                        |ArrowTy| {
635                            Self::gather_rows_from_chunks::<ArrowTy>(
636                                row_ids,
637                                row_locator,
638                                len,
639                                &plan.candidate_indices,
640                                plan,
641                                ctx.chunk_cache(),
642                                &mut row_scratch,
643                                allow_missing,
644                            )
645                        },
646                        Err(Error::Internal(format!(
647                            "gather_rows_multi: unsupported dtype {:?}",
648                            other
649                        ))),
650                    ),
651                }?;
652                outputs.push(array);
653            }
654
655            let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
656                Self::filter_rows_with_non_null(outputs)?
657            } else {
658                outputs
659            };
660
661            let mut fields = Vec::with_capacity(field_infos.len());
662            for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
663                let array = &outputs[idx];
664                let field_name = format!("field_{}", u64::from(*fid));
665                let nullable = match policy {
666                    GatherNullPolicy::IncludeNulls => true,
667                    _ => array.null_count() > 0,
668                };
669                fields.push(Field::new(field_name, dtype.clone(), nullable));
670            }
671
672            let schema = Arc::new(Schema::new(fields));
673            RecordBatch::try_new(schema, outputs)
674                .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
675        })();
676
677        ctx.store_row_scratch(row_scratch);
678        ctx.store_row_index(row_index);
679        ctx.store_chunk_keys(chunk_keys);
680
681        result
682    }
683
684    fn collect_non_empty_metas(pager: &P, head_page_pk: PhysicalKey) -> Result<Vec<ChunkMetadata>> {
685        let mut metas = Vec::new();
686        if head_page_pk == 0 {
687            return Ok(metas);
688        }
689        for meta in DescriptorIterator::new(pager, head_page_pk) {
690            let meta = meta?;
691            if meta.row_count > 0 {
692                metas.push(meta);
693            }
694        }
695        Ok(metas)
696    }
697
698    #[inline]
699    fn chunk_intersects(sorted_row_ids: &[u64], meta: &ChunkMetadata) -> bool {
700        if sorted_row_ids.is_empty() || meta.row_count == 0 {
701            return false;
702        }
703        let min = meta.min_val_u64;
704        let max = meta.max_val_u64;
705        if min == 0 && max == 0 && meta.row_count > 0 {
706            return true;
707        }
708        if min > max {
709            return true;
710        }
711        let min_req = sorted_row_ids[0];
712        let max_req = *sorted_row_ids.last().unwrap();
713        if max < min_req || min > max_req {
714            return false;
715        }
716        let idx = sorted_row_ids.partition_point(|&rid| rid < min);
717        idx < sorted_row_ids.len() && sorted_row_ids[idx] <= max
718    }
719
720    fn gather_rows_single_shot_string<O>(
721        row_index: &FxHashMap<u64, usize>,
722        len: usize,
723        plan: &FieldPlan,
724        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
725        allow_missing: bool,
726    ) -> Result<ArrayRef>
727    where
728        O: OffsetSizeTrait,
729    {
730        if len == 0 {
731            let mut builder = GenericStringBuilder::<O>::new();
732            return Ok(Arc::new(builder.finish()) as ArrayRef);
733        }
734
735        let mut values: Vec<Option<String>> = vec![None; len];
736        let mut found: Vec<bool> = vec![false; len];
737
738        for &idx in &plan.candidate_indices {
739            let value_chunk = chunk_blobs
740                .remove(&plan.value_metas[idx].chunk_pk)
741                .ok_or(Error::NotFound)?;
742            let row_chunk = chunk_blobs
743                .remove(&plan.row_metas[idx].chunk_pk)
744                .ok_or(Error::NotFound)?;
745
746            let value_any = deserialize_array(value_chunk)?;
747            let value_arr = value_any
748                .as_any()
749                .downcast_ref::<GenericStringArray<O>>()
750                .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
751            let row_any = deserialize_array(row_chunk)?;
752            let row_arr = row_any
753                .as_any()
754                .downcast_ref::<UInt64Array>()
755                .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
756
757            for i in 0..row_arr.len() {
758                if !row_arr.is_valid(i) {
759                    continue;
760                }
761                let row_id = row_arr.value(i);
762                if let Some(&out_idx) = row_index.get(&row_id) {
763                    found[out_idx] = true;
764                    if value_arr.is_null(i) {
765                        values[out_idx] = None;
766                    } else {
767                        values[out_idx] = Some(value_arr.value(i).to_owned());
768                    }
769                }
770            }
771        }
772
773        if !allow_missing {
774            if found.iter().any(|f| !*f) {
775                return Err(Error::Internal(
776                    "gather_rows_multi: one or more requested row IDs were not found".into(),
777                ));
778            }
779        } else {
780            for (idx, was_found) in found.iter().enumerate() {
781                if !*was_found {
782                    values[idx] = None;
783                }
784            }
785        }
786
787        let total_bytes: usize = values
788            .iter()
789            .filter_map(|v| v.as_ref().map(|s| s.len()))
790            .sum();
791
792        let mut builder = GenericStringBuilder::<O>::with_capacity(len, total_bytes);
793        for value in values {
794            match value {
795                Some(s) => builder.append_value(&s),
796                None => builder.append_null(),
797            }
798        }
799
800        Ok(Arc::new(builder.finish()) as ArrayRef)
801    }
802
803    #[allow(clippy::too_many_arguments)]
804    fn gather_rows_single_shot_bool(
805        row_index: &FxHashMap<u64, usize>,
806        len: usize,
807        plan: &FieldPlan,
808        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
809        allow_missing: bool,
810    ) -> Result<ArrayRef> {
811        if len == 0 {
812            let empty = BooleanArray::from(Vec::<bool>::new());
813            return Ok(Arc::new(empty) as ArrayRef);
814        }
815
816        let mut values: Vec<Option<bool>> = vec![None; len];
817        let mut found: Vec<bool> = vec![false; len];
818
819        for &idx in &plan.candidate_indices {
820            let value_chunk = chunk_blobs
821                .remove(&plan.value_metas[idx].chunk_pk)
822                .ok_or(Error::NotFound)?;
823            let row_chunk = chunk_blobs
824                .remove(&plan.row_metas[idx].chunk_pk)
825                .ok_or(Error::NotFound)?;
826
827            let value_any = deserialize_array(value_chunk)?;
828            let value_arr = value_any
829                .as_any()
830                .downcast_ref::<BooleanArray>()
831                .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
832            let row_any = deserialize_array(row_chunk)?;
833            let row_arr = row_any
834                .as_any()
835                .downcast_ref::<UInt64Array>()
836                .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
837
838            for i in 0..row_arr.len() {
839                if !row_arr.is_valid(i) {
840                    continue;
841                }
842                let row_id = row_arr.value(i);
843                if let Some(&out_idx) = row_index.get(&row_id) {
844                    found[out_idx] = true;
845                    if value_arr.is_null(i) {
846                        values[out_idx] = None;
847                    } else {
848                        values[out_idx] = Some(value_arr.value(i));
849                    }
850                }
851            }
852        }
853
854        if !allow_missing && found.iter().any(|f| !*f) {
855            return Err(Error::Internal(
856                "gather_rows_multi: one or more requested row IDs were not found".into(),
857            ));
858        }
859
860        let array = BooleanArray::from(values);
861        Ok(Arc::new(array) as ArrayRef)
862    }
863
864    #[allow(clippy::too_many_arguments)]
865    fn gather_rows_single_shot<T>(
866        row_index: &FxHashMap<u64, usize>,
867        len: usize,
868        plan: &FieldPlan,
869        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
870        allow_missing: bool,
871    ) -> Result<ArrayRef>
872    where
873        T: ArrowPrimitiveType,
874    {
875        if len == 0 {
876            let empty = PrimitiveBuilder::<T>::new().finish();
877            return Ok(Arc::new(empty) as ArrayRef);
878        }
879
880        let mut values: Vec<Option<T::Native>> = vec![None; len];
881        let mut found: Vec<bool> = vec![false; len];
882
883        for &idx in &plan.candidate_indices {
884            let value_chunk = chunk_blobs
885                .remove(&plan.value_metas[idx].chunk_pk)
886                .ok_or(Error::NotFound)?;
887            let row_chunk = chunk_blobs
888                .remove(&plan.row_metas[idx].chunk_pk)
889                .ok_or(Error::NotFound)?;
890
891            let value_any = deserialize_array(value_chunk)?;
892            let value_arr = value_any
893                .as_any()
894                .downcast_ref::<PrimitiveArray<T>>()
895                .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
896            let row_any = deserialize_array(row_chunk)?;
897            let row_arr = row_any
898                .as_any()
899                .downcast_ref::<UInt64Array>()
900                .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
901
902            for i in 0..row_arr.len() {
903                if !row_arr.is_valid(i) {
904                    continue;
905                }
906                let row_id = row_arr.value(i);
907                if let Some(&out_idx) = row_index.get(&row_id) {
908                    found[out_idx] = true;
909                    if value_arr.is_null(i) {
910                        values[out_idx] = None;
911                    } else {
912                        values[out_idx] = Some(value_arr.value(i));
913                    }
914                }
915            }
916        }
917
918        if !allow_missing {
919            if found.iter().any(|f| !*f) {
920                return Err(Error::Internal(
921                    "gather_rows_multi: one or more requested row IDs were not found".into(),
922                ));
923            }
924        } else {
925            for (idx, was_found) in found.iter().enumerate() {
926                if !*was_found {
927                    values[idx] = None;
928                }
929            }
930        }
931
932        let array = PrimitiveArray::<T>::from_iter(values);
933        Ok(Arc::new(array) as ArrayRef)
934    }
935
    /// Gathers UTF-8 strings (offset width `O`) for `len` requested rows from
    /// chunks already deserialized into `chunk_arrays`.
    ///
    /// `row_scratch` is caller-provided working space. Its first `len` entries
    /// are cleared, then filled with `(chunk_idx, index_within_chunk)` for each
    /// requested row located via `row_locator`.
    ///
    /// # Errors
    /// * `Error::NotFound` if a candidate chunk is absent from `chunk_arrays`.
    /// * `Error::Internal` on any of:
    ///   - an unexpected chunk array type;
    ///   - an internal chunk lookup miss;
    ///   - a requested row not located in any candidate chunk, when
    ///     `allow_missing` is `false`.
    #[allow(clippy::too_many_arguments)] // TODO: Refactor
    fn gather_rows_from_chunks_string<O>(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        if len == 0 {
            let mut builder = GenericStringBuilder::<O>::new();
            return Ok(Arc::new(builder.finish()) as ArrayRef);
        }

        // Fast path: with a single candidate chunk whose row ids contain no
        // nulls, a non-decreasing `row_ids` that appears as one contiguous run
        // inside the chunk is answered with a zero-copy slice of the values.
        if candidate_indices.len() == 1 {
            let chunk_idx = candidate_indices[0];
            let value_any = chunk_arrays
                .get(&plan.value_metas[chunk_idx].chunk_pk)
                .ok_or(Error::NotFound)?;
            let row_any = chunk_arrays
                .get(&plan.row_metas[chunk_idx].chunk_pk)
                .ok_or(Error::NotFound)?;
            // Downcast only to validate the value chunk's type; the slice below
            // is taken from the type-erased array.
            let _value_arr = value_any
                .as_any()
                .downcast_ref::<GenericStringArray<O>>()
                .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
            let row_arr = row_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;

            if row_arr.null_count() == 0 && row_ids.windows(2).all(|w| w[0] <= w[1]) {
                let values = row_arr.values();
                // `binary_search` presumes the chunk's row ids are sorted. The
                // exact slice comparison guarantees a correct result regardless;
                // a failed match just falls through to the general path.
                if let Ok(start_idx) = values.binary_search(&row_ids[0])
                    && start_idx + len <= values.len()
                    && row_ids == &values[start_idx..start_idx + len]
                {
                    return Ok(value_any.slice(start_idx, len));
                }
            }
        }

        // General path: reset the scratch slots this call will use.
        for slot in row_scratch.iter_mut().take(len) {
            *slot = None;
        }

        // `candidates[slot]` holds the typed arrays of `candidate_indices[slot]`;
        // `chunk_lookup` maps a chunk index back to that slot.
        let mut candidates: Vec<(usize, &GenericStringArray<O>, &UInt64Array)> =
            Vec::with_capacity(candidate_indices.len());
        let mut chunk_lookup: FxHashMap<usize, usize> = FxHashMap::default();

        for (slot, &chunk_idx) in candidate_indices.iter().enumerate() {
            let value_any = chunk_arrays
                .get(&plan.value_metas[chunk_idx].chunk_pk)
                .ok_or(Error::NotFound)?;
            let value_arr = value_any
                .as_any()
                .downcast_ref::<GenericStringArray<O>>()
                .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
            let row_any = chunk_arrays
                .get(&plan.row_metas[chunk_idx].chunk_pk)
                .ok_or(Error::NotFound)?;
            let row_arr = row_any
                .as_any()
                .downcast_ref::<UInt64Array>()
                .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;

            candidates.push((chunk_idx, value_arr, row_arr));
            chunk_lookup.insert(chunk_idx, slot);

            // Record where each requested row lives. A later occurrence of the
            // same row id overwrites an earlier one.
            for i in 0..row_arr.len() {
                if !row_arr.is_valid(i) {
                    continue;
                }
                let row_id = row_arr.value(i);
                if let Some(out_idx) = row_locator.lookup(row_id, len) {
                    row_scratch[out_idx] = Some((chunk_idx, i));
                }
            }
        }

        // First pass: size the value buffer and reject missing rows early.
        let mut total_bytes = 0usize;
        for row_scratch_item in row_scratch.iter().take(len) {
            if let Some((chunk_idx, value_idx)) = row_scratch_item {
                let slot = *chunk_lookup.get(chunk_idx).ok_or_else(|| {
                    Error::Internal("gather_rows_multi: chunk lookup missing".into())
                })?;
                let (_, value_arr, _) = candidates[slot];
                if !value_arr.is_null(*value_idx) {
                    total_bytes += value_arr.value(*value_idx).len();
                }
            } else if !allow_missing {
                return Err(Error::Internal(
                    "gather_rows_multi: one or more requested row IDs were not found".into(),
                ));
            }
        }

        // Second pass: materialize the output; rows that were not located
        // become nulls when `allow_missing` is set.
        let mut builder = GenericStringBuilder::<O>::with_capacity(len, total_bytes);
        for row_scratch_item in row_scratch.iter().take(len) {
            match row_scratch_item {
                Some((chunk_idx, value_idx)) => {
                    let slot = *chunk_lookup.get(chunk_idx).ok_or_else(|| {
                        Error::Internal("gather_rows_multi: chunk lookup missing".into())
                    })?;
                    let (_, value_arr, _) = candidates[slot];
                    if value_arr.is_null(*value_idx) {
                        builder.append_null();
                    } else {
                        builder.append_value(value_arr.value(*value_idx));
                    }
                }
                None => {
                    if allow_missing {
                        builder.append_null();
                    } else {
                        return Err(Error::Internal(
                            "gather_rows_multi: one or more requested row IDs were not found"
                                .into(),
                        ));
                    }
                }
            }
        }

        Ok(Arc::new(builder.finish()) as ArrayRef)
    }
1067
1068    fn gather_rows_single_shot_binary<O>(
1069        row_index: &FxHashMap<u64, usize>,
1070        len: usize,
1071        plan: &FieldPlan,
1072        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
1073        allow_missing: bool,
1074    ) -> Result<ArrayRef>
1075    where
1076        O: OffsetSizeTrait,
1077    {
1078        if len == 0 {
1079            let mut builder = GenericBinaryBuilder::<O>::new();
1080            return Ok(Arc::new(builder.finish()) as ArrayRef);
1081        }
1082
1083        let mut values: Vec<Option<Vec<u8>>> = vec![None; len];
1084        let mut found: Vec<bool> = vec![false; len];
1085
1086        for &idx in &plan.candidate_indices {
1087            let value_chunk = chunk_blobs
1088                .remove(&plan.value_metas[idx].chunk_pk)
1089                .ok_or(Error::NotFound)?;
1090            let row_chunk = chunk_blobs
1091                .remove(&plan.row_metas[idx].chunk_pk)
1092                .ok_or(Error::NotFound)?;
1093
1094            let value_any = deserialize_array(value_chunk)?;
1095            let value_arr = value_any
1096                .as_any()
1097                .downcast_ref::<GenericBinaryArray<O>>()
1098                .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
1099            let row_any = deserialize_array(row_chunk)?;
1100            let row_arr = row_any
1101                .as_any()
1102                .downcast_ref::<UInt64Array>()
1103                .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
1104
1105            for i in 0..row_arr.len() {
1106                if !row_arr.is_valid(i) {
1107                    continue;
1108                }
1109                let row_id = row_arr.value(i);
1110                if let Some(&out_idx) = row_index.get(&row_id) {
1111                    found[out_idx] = true;
1112                    if value_arr.is_null(i) {
1113                        values[out_idx] = None;
1114                    } else {
1115                        values[out_idx] = Some(value_arr.value(i).to_vec());
1116                    }
1117                }
1118            }
1119        }
1120
1121        if !allow_missing {
1122            if found.iter().any(|f| !*f) {
1123                return Err(Error::Internal(
1124                    "gather_rows_multi: one or more requested row IDs were not found".into(),
1125                ));
1126            }
1127        } else {
1128            for (idx, was_found) in found.iter().enumerate() {
1129                if !*was_found {
1130                    values[idx] = None;
1131                }
1132            }
1133        }
1134
1135        let total_bytes: usize = values
1136            .iter()
1137            .filter_map(|v| v.as_ref().map(|b| b.len()))
1138            .sum();
1139
1140        let mut builder = GenericBinaryBuilder::<O>::with_capacity(len, total_bytes);
1141        for value in values {
1142            match value {
1143                Some(bytes) => builder.append_value(&bytes),
1144                None => builder.append_null(),
1145            }
1146        }
1147
1148        Ok(Arc::new(builder.finish()) as ArrayRef)
1149    }
1150
1151    #[allow(clippy::too_many_arguments)] // TODO: Refactor
1152    fn gather_rows_from_chunks_binary<O>(
1153        row_ids: &[u64],
1154        row_locator: RowLocator,
1155        len: usize,
1156        candidate_indices: &[usize],
1157        plan: &FieldPlan,
1158        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
1159        row_scratch: &mut [Option<(usize, usize)>],
1160        allow_missing: bool,
1161    ) -> Result<ArrayRef>
1162    where
1163        O: OffsetSizeTrait,
1164    {
1165        if len == 0 {
1166            let mut builder = GenericBinaryBuilder::<O>::new();
1167            return Ok(Arc::new(builder.finish()) as ArrayRef);
1168        }
1169
1170        if candidate_indices.len() == 1 {
1171            let chunk_idx = candidate_indices[0];
1172            let value_any = chunk_arrays
1173                .get(&plan.value_metas[chunk_idx].chunk_pk)
1174                .ok_or(Error::NotFound)?;
1175            let row_any = chunk_arrays
1176                .get(&plan.row_metas[chunk_idx].chunk_pk)
1177                .ok_or(Error::NotFound)?;
1178            let _value_arr = value_any
1179                .as_any()
1180                .downcast_ref::<GenericBinaryArray<O>>()
1181                .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
1182            let row_arr = row_any
1183                .as_any()
1184                .downcast_ref::<UInt64Array>()
1185                .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
1186
1187            if row_arr.null_count() == 0 && row_ids.windows(2).all(|w| w[0] <= w[1]) {
1188                let values_slice = row_arr.values();
1189                if let Ok(start_idx) = values_slice.binary_search(&row_ids[0])
1190                    && start_idx + len <= values_slice.len()
1191                    && row_ids == &values_slice[start_idx..start_idx + len]
1192                {
1193                    return Ok(value_any.slice(start_idx, len));
1194                }
1195            }
1196        }
1197
1198        for slot in row_scratch.iter_mut().take(len) {
1199            *slot = None;
1200        }
1201
1202        let mut candidates: Vec<(usize, &GenericBinaryArray<O>, &UInt64Array)> =
1203            Vec::with_capacity(candidate_indices.len());
1204        let mut chunk_lookup: FxHashMap<usize, usize> = FxHashMap::default();
1205
1206        for (slot, &chunk_idx) in candidate_indices.iter().enumerate() {
1207            let value_any = chunk_arrays
1208                .get(&plan.value_metas[chunk_idx].chunk_pk)
1209                .ok_or(Error::NotFound)?;
1210            let value_arr = value_any
1211                .as_any()
1212                .downcast_ref::<GenericBinaryArray<O>>()
1213                .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
1214            let row_any = chunk_arrays
1215                .get(&plan.row_metas[chunk_idx].chunk_pk)
1216                .ok_or(Error::NotFound)?;
1217            let row_arr = row_any
1218                .as_any()
1219                .downcast_ref::<UInt64Array>()
1220                .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
1221
1222            candidates.push((chunk_idx, value_arr, row_arr));
1223            chunk_lookup.insert(chunk_idx, slot);
1224
1225            for i in 0..row_arr.len() {
1226                if !row_arr.is_valid(i) {
1227                    continue;
1228                }
1229                let row_id = row_arr.value(i);
1230                if let Some(out_idx) = row_locator.lookup(row_id, len) {
1231                    row_scratch[out_idx] = Some((chunk_idx, i));
1232                }
1233            }
1234        }
1235
1236        let mut total_bytes = 0usize;
1237        for row_scratch_item in row_scratch.iter().take(len) {
1238            if let Some((chunk_idx, value_idx)) = row_scratch_item {
1239                let slot = *chunk_lookup.get(chunk_idx).ok_or_else(|| {
1240                    Error::Internal("gather_rows_multi: chunk lookup missing".into())
1241                })?;
1242                let (_, value_arr, _) = candidates[slot];
1243                if !value_arr.is_null(*value_idx) {
1244                    total_bytes += value_arr.value(*value_idx).len();
1245                }
1246            } else if !allow_missing {
1247                return Err(Error::Internal(
1248                    "gather_rows_multi: one or more requested row IDs were not found".into(),
1249                ));
1250            }
1251        }
1252
1253        let mut builder = GenericBinaryBuilder::<O>::with_capacity(len, total_bytes);
1254        for row_scratch_item in row_scratch.iter().take(len) {
1255            match row_scratch_item {
1256                Some((chunk_idx, value_idx)) => {
1257                    let slot = *chunk_lookup.get(chunk_idx).ok_or_else(|| {
1258                        Error::Internal("gather_rows_multi: chunk lookup missing".into())
1259                    })?;
1260                    let (_, value_arr, _) = candidates[slot];
1261                    if value_arr.is_null(*value_idx) {
1262                        builder.append_null();
1263                    } else {
1264                        builder.append_value(value_arr.value(*value_idx));
1265                    }
1266                }
1267                None => {
1268                    if allow_missing {
1269                        builder.append_null();
1270                    } else {
1271                        return Err(Error::Internal(
1272                            "gather_rows_multi: one or more requested row IDs were not found"
1273                                .into(),
1274                        ));
1275                    }
1276                }
1277            }
1278        }
1279
1280        Ok(Arc::new(builder.finish()) as ArrayRef)
1281    }
1282
1283    #[allow(clippy::too_many_arguments)] // TODO: Refactor
1284    fn gather_rows_from_chunks_bool(
1285        row_ids: &[u64],
1286        row_locator: RowLocator,
1287        len: usize,
1288        candidate_indices: &[usize],
1289        plan: &FieldPlan,
1290        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
1291        row_scratch: &mut [Option<(usize, usize)>],
1292        allow_missing: bool,
1293    ) -> Result<ArrayRef> {
1294        if len == 0 {
1295            let empty = BooleanArray::from(Vec::<bool>::new());
1296            return Ok(Arc::new(empty) as ArrayRef);
1297        }
1298
1299        if candidate_indices.len() == 1 {
1300            let chunk_idx = candidate_indices[0];
1301            let value_any = chunk_arrays
1302                .get(&plan.value_metas[chunk_idx].chunk_pk)
1303                .ok_or(Error::NotFound)?;
1304            let row_any = chunk_arrays
1305                .get(&plan.row_metas[chunk_idx].chunk_pk)
1306                .ok_or(Error::NotFound)?;
1307            let _value_arr = value_any
1308                .as_any()
1309                .downcast_ref::<BooleanArray>()
1310                .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
1311            let row_arr = row_any
1312                .as_any()
1313                .downcast_ref::<UInt64Array>()
1314                .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
1315
1316            if row_arr.null_count() == 0 && row_ids.windows(2).all(|w| w[0] <= w[1]) {
1317                let values = row_arr.values();
1318                if let Ok(start_idx) = values.binary_search(&row_ids[0])
1319                    && start_idx + len <= values.len()
1320                    && row_ids == &values[start_idx..start_idx + len]
1321                {
1322                    return Ok(value_any.slice(start_idx, len));
1323                }
1324            }
1325        }
1326
1327        for slot in row_scratch.iter_mut().take(len) {
1328            *slot = None;
1329        }
1330
1331        let mut candidates: Vec<(usize, &BooleanArray, &UInt64Array)> =
1332            Vec::with_capacity(candidate_indices.len());
1333        let mut chunk_lookup: FxHashMap<usize, usize> = FxHashMap::default();
1334
1335        for (slot, &chunk_idx) in candidate_indices.iter().enumerate() {
1336            let value_any = chunk_arrays
1337                .get(&plan.value_metas[chunk_idx].chunk_pk)
1338                .ok_or(Error::NotFound)?;
1339            let value_arr = value_any
1340                .as_any()
1341                .downcast_ref::<BooleanArray>()
1342                .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
1343            let row_any = chunk_arrays
1344                .get(&plan.row_metas[chunk_idx].chunk_pk)
1345                .ok_or(Error::NotFound)?;
1346            let row_arr = row_any
1347                .as_any()
1348                .downcast_ref::<UInt64Array>()
1349                .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
1350
1351            candidates.push((chunk_idx, value_arr, row_arr));
1352            chunk_lookup.insert(chunk_idx, slot);
1353
1354            for i in 0..row_arr.len() {
1355                if !row_arr.is_valid(i) {
1356                    continue;
1357                }
1358                let row_id = row_arr.value(i);
1359                if let Some(out_idx) = row_locator.lookup(row_id, len) {
1360                    row_scratch[out_idx] = Some((chunk_idx, i));
1361                }
1362            }
1363        }
1364
1365        if !allow_missing {
1366            for slot in row_scratch.iter().take(len) {
1367                if slot.is_none() {
1368                    return Err(Error::Internal(
1369                        "gather_rows_multi: one or more requested row IDs were not found".into(),
1370                    ));
1371                }
1372            }
1373        }
1374
1375        let mut values: Vec<Option<bool>> = vec![None; len];
1376        for (out_idx, row_scratch_item) in row_scratch.iter().take(len).enumerate() {
1377            if let Some((chunk_idx, value_idx)) = row_scratch_item
1378                && let Some(&slot) = chunk_lookup.get(chunk_idx)
1379            {
1380                let (_idx, value_arr, _) = &candidates[slot];
1381                if value_arr.is_null(*value_idx) {
1382                    values[out_idx] = None;
1383                } else {
1384                    values[out_idx] = Some(value_arr.value(*value_idx));
1385                }
1386            }
1387        }
1388
1389        let array = BooleanArray::from(values);
1390        Ok(Arc::new(array) as ArrayRef)
1391    }
1392
1393    #[allow(clippy::too_many_arguments)] // TODO: Refactor
1394    fn gather_rows_from_chunks<T>(
1395        row_ids: &[u64],
1396        row_locator: RowLocator,
1397        len: usize,
1398        candidate_indices: &[usize],
1399        plan: &FieldPlan,
1400        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
1401        row_scratch: &mut [Option<(usize, usize)>],
1402        allow_missing: bool,
1403    ) -> Result<ArrayRef>
1404    where
1405        T: ArrowPrimitiveType,
1406    {
1407        if len == 0 {
1408            return Ok(Arc::new(PrimitiveBuilder::<T>::new().finish()) as ArrayRef);
1409        }
1410
1411        if candidate_indices.len() == 1 {
1412            let chunk_idx = candidate_indices[0];
1413            let value_any = chunk_arrays
1414                .get(&plan.value_metas[chunk_idx].chunk_pk)
1415                .ok_or(Error::NotFound)?;
1416            let row_any = chunk_arrays
1417                .get(&plan.row_metas[chunk_idx].chunk_pk)
1418                .ok_or(Error::NotFound)?;
1419            let _value_arr = value_any
1420                .as_any()
1421                .downcast_ref::<PrimitiveArray<T>>()
1422                .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
1423            let row_arr = row_any
1424                .as_any()
1425                .downcast_ref::<UInt64Array>()
1426                .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
1427
1428            if row_arr.null_count() == 0 && row_ids.windows(2).all(|w| w[0] <= w[1]) {
1429                let values = row_arr.values();
1430                if let Ok(start_idx) = values.binary_search(&row_ids[0])
1431                    && start_idx + len <= values.len()
1432                    && row_ids == &values[start_idx..start_idx + len]
1433                {
1434                    return Ok(value_any.slice(start_idx, len));
1435                }
1436            }
1437        }
1438
1439        for slot in row_scratch.iter_mut().take(len) {
1440            *slot = None;
1441        }
1442
1443        let mut candidates: Vec<(usize, &PrimitiveArray<T>, &UInt64Array)> =
1444            Vec::with_capacity(candidate_indices.len());
1445        let mut chunk_lookup: FxHashMap<usize, usize> = FxHashMap::default();
1446
1447        for (slot, &chunk_idx) in candidate_indices.iter().enumerate() {
1448            let value_any = chunk_arrays
1449                .get(&plan.value_metas[chunk_idx].chunk_pk)
1450                .ok_or(Error::NotFound)?;
1451            let value_arr = value_any
1452                .as_any()
1453                .downcast_ref::<PrimitiveArray<T>>()
1454                .ok_or_else(|| Error::Internal("gather_rows_multi: dtype mismatch".into()))?;
1455            let row_any = chunk_arrays
1456                .get(&plan.row_metas[chunk_idx].chunk_pk)
1457                .ok_or(Error::NotFound)?;
1458            let row_arr = row_any
1459                .as_any()
1460                .downcast_ref::<UInt64Array>()
1461                .ok_or_else(|| Error::Internal("gather_rows_multi: row_id downcast".into()))?;
1462
1463            candidates.push((chunk_idx, value_arr, row_arr));
1464            chunk_lookup.insert(chunk_idx, slot);
1465
1466            for i in 0..row_arr.len() {
1467                if !row_arr.is_valid(i) {
1468                    continue;
1469                }
1470                let row_id = row_arr.value(i);
1471                if let Some(out_idx) = row_locator.lookup(row_id, len) {
1472                    row_scratch[out_idx] = Some((chunk_idx, i));
1473                }
1474            }
1475        }
1476
1477        if !allow_missing {
1478            for slot in row_scratch.iter().take(len) {
1479                if slot.is_none() {
1480                    return Err(Error::Internal(
1481                        "gather_rows_multi: one or more requested row IDs were not found".into(),
1482                    ));
1483                }
1484            }
1485        }
1486
1487        let mut builder = PrimitiveBuilder::<T>::with_capacity(len);
1488        for row_scratch_item in row_scratch.iter().take(len) {
1489            if let Some((chunk_idx, value_idx)) = *row_scratch_item {
1490                if let Some(&slot) = chunk_lookup.get(&chunk_idx) {
1491                    let (idx, value_arr, _) = candidates[slot];
1492                    debug_assert_eq!(idx, chunk_idx);
1493                    if value_arr.is_null(value_idx) {
1494                        builder.append_null();
1495                    } else {
1496                        builder.append_value(value_arr.value(value_idx));
1497                    }
1498                } else {
1499                    builder.append_null();
1500                }
1501            } else {
1502                builder.append_null();
1503            }
1504        }
1505
1506        Ok(Arc::new(builder.finish()) as ArrayRef)
1507    }
1508
1509    fn filter_rows_with_non_null(columns: Vec<ArrayRef>) -> Result<Vec<ArrayRef>> {
1510        if columns.is_empty() {
1511            return Ok(columns);
1512        }
1513
1514        let len = columns[0].len();
1515        if len == 0 {
1516            return Ok(columns);
1517        }
1518
1519        let mut keep = vec![false; len];
1520        for array in &columns {
1521            debug_assert_eq!(array.len(), len);
1522            if array.null_count() == 0 {
1523                keep.fill(true);
1524                break;
1525            }
1526            for (i, keep_item) in keep.iter_mut().enumerate().take(len) {
1527                if array.is_valid(i) {
1528                    *keep_item = true;
1529                }
1530            }
1531            if keep.iter().all(|flag| *flag) {
1532                break;
1533            }
1534        }
1535
1536        if keep.iter().all(|flag| *flag) {
1537            return Ok(columns);
1538        }
1539
1540        let mask = BooleanArray::from(keep);
1541
1542        let mut filtered = Vec::with_capacity(columns.len());
1543        for array in columns {
1544            let filtered_column = compute::filter(array.as_ref(), &mask)
1545                .map_err(|e| Error::Internal(format!("gather_rows_multi filter: {e}")))?;
1546            filtered.push(filtered_column);
1547        }
1548        Ok(filtered)
1549    }
1550}