llkv_column_map/store/
projection.rs

//! Projection planners for building Arrow batches from column chunks.
//!
//! The projection subsystem wires descriptor metadata, chunk loading helpers,
//! and gather routines to materialize user-facing result sets. It centralizes
//! null-handling policies and keeps the hot paths monomorphized per Arrow type.

use super::*;
use crate::gather::{
    RowLocator, filter_rows_with_non_null as shared_filter_rows_with_non_null,
    gather_rows_from_chunks as shared_gather_rows_from_chunks,
    gather_rows_from_chunks_binary as shared_gather_rows_from_chunks_binary,
    gather_rows_from_chunks_bool as shared_gather_rows_from_chunks_bool,
    gather_rows_from_chunks_decimal128 as shared_gather_rows_from_chunks_decimal128,
    gather_rows_from_chunks_string as shared_gather_rows_from_chunks_string,
    gather_rows_from_chunks_struct as shared_gather_rows_from_chunks_struct,
    gather_rows_single_shot as shared_gather_rows_single_shot,
    gather_rows_single_shot_binary as shared_gather_rows_single_shot_binary,
    gather_rows_single_shot_bool as shared_gather_rows_single_shot_bool,
    gather_rows_single_shot_decimal128 as shared_gather_rows_single_shot_decimal128,
    gather_rows_single_shot_string as shared_gather_rows_single_shot_string,
    gather_rows_single_shot_string_view as shared_gather_rows_single_shot_string_view,
    gather_rows_single_shot_struct as shared_gather_rows_single_shot_struct,
};
use crate::serialization::deserialize_array;
use crate::store::descriptor::{ChunkMetadata, ColumnDescriptor, DescriptorIterator};
use arrow::array::{ArrayRef, OffsetSizeTrait, new_empty_array};
use arrow::datatypes::{ArrowPrimitiveType, DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use llkv_result::{Error, Result};
use llkv_storage::{
    pager::{BatchGet, GetResult, Pager},
    types::PhysicalKey,
};
use llkv_types::ids::{LogicalFieldId, RowId};
use rustc_hash::{FxHashMap, FxHashSet};
use simd_r_drive_entry_handle::EntryHandle;
use std::borrow::Cow;
use std::sync::Arc;

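/// Policy describing how gathers treat row IDs that are missing from storage or
/// that resolve to all-null values across the projected columns.
///
/// A minimal illustration, with `store`, `field_ids`, and `row_ids` as placeholder
/// identifiers:
///
/// ```ignore
/// // Surface absent rows as nulls rather than failing the gather.
/// let batch = store.gather_rows(&field_ids, &row_ids, GatherNullPolicy::IncludeNulls)?;
/// ```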
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GatherNullPolicy {
    /// Any missing row ID results in an error.
    ErrorOnMissing,
    /// Missing rows surface as nulls in the result arrays.
    IncludeNulls,
    /// Missing rows and rows where all projected columns are null are dropped from the output.
    DropNulls,
}

/// Logical projection request describing a field and optional alias.
///
/// Used by planners to pull a column by [`LogicalFieldId`] and to rename the output slot when
/// needed.
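///
/// Construction sketch (the field ID is a placeholder value):
///
/// ```ignore
/// let plain = Projection::new(logical_field_id);
/// let aliased = Projection::with_alias(logical_field_id, "total");
/// let converted: Projection = (logical_field_id, "total").into();
/// ```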
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Projection {
    pub logical_field_id: LogicalFieldId,
    pub alias: Option<String>,
}

impl Projection {
    pub fn new(logical_field_id: LogicalFieldId) -> Self {
        Self {
            logical_field_id,
            alias: None,
        }
    }

    pub fn with_alias<S: Into<String>>(logical_field_id: LogicalFieldId, alias: S) -> Self {
        Self {
            logical_field_id,
            alias: Some(alias.into()),
        }
    }
}

impl From<LogicalFieldId> for Projection {
    fn from(logical_field_id: LogicalFieldId) -> Self {
        Projection::new(logical_field_id)
    }
}

impl<S: Into<String>> From<(LogicalFieldId, S)> for Projection {
    fn from(value: (LogicalFieldId, S)) -> Self {
        Projection::with_alias(value.0, value.1)
    }
}

impl GatherNullPolicy {
    #[inline]
    fn allow_missing(self) -> bool {
        !matches!(self, Self::ErrorOnMissing)
    }
}

/// Scratch structures shared across field plans during multi-column gathers.
///
/// Keeps decoded chunk arrays, row-index maps, and scratch buffers so repeated projection
/// passes avoid redundant pager reads and reallocation.
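///
/// Contexts are produced by `ColumnStore::prepare_gather_context` and reused across calls to
/// `ColumnStore::gather_rows_with_reusable_context`; see the sketch on the latter.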
pub struct MultiGatherContext {
    field_infos: Vec<(LogicalFieldId, DataType)>,
    fields: Vec<Field>,
    plans: Vec<FieldPlan>,
    chunk_cache: FxHashMap<PhysicalKey, ArrayRef>,
    row_index: FxHashMap<u64, usize>,
    row_scratch: Vec<Option<(usize, usize)>>,
    chunk_keys: Vec<PhysicalKey>,
}

impl MultiGatherContext {
    fn new(field_infos: Vec<(LogicalFieldId, DataType)>, plans: Vec<FieldPlan>) -> Self {
        // Build Field objects from field_infos with default nullable=true
        // This is for backward compatibility when no expected schema is provided
        let fields: Vec<Field> = field_infos
            .iter()
            .map(|(fid, dtype)| {
                let field_name = format!("field_{}", u64::from(*fid));
                Field::new(field_name, dtype.clone(), true)
            })
            .collect();

        Self {
            chunk_cache: FxHashMap::default(),
            row_index: FxHashMap::default(),
            row_scratch: Vec::new(),
            chunk_keys: Vec::new(),
            field_infos,
            fields,
            plans,
        }
    }

    #[inline]
    pub fn is_empty(&self) -> bool {
        self.plans.is_empty()
    }

    #[inline]
    fn field_infos(&self) -> &[(LogicalFieldId, DataType)] {
        &self.field_infos
    }

    #[inline]
    fn fields(&self) -> &[Field] {
        &self.fields
    }

    #[inline]
    fn plans(&self) -> &[FieldPlan] {
        &self.plans
    }

    #[inline]
    fn chunk_cache(&self) -> &FxHashMap<PhysicalKey, ArrayRef> {
        &self.chunk_cache
    }

    #[inline]
    fn chunk_cache_mut(&mut self) -> &mut FxHashMap<PhysicalKey, ArrayRef> {
        &mut self.chunk_cache
    }

    #[inline]
    fn plans_mut(&mut self) -> &mut [FieldPlan] {
        &mut self.plans
    }

    fn take_chunk_keys(&mut self) -> Vec<PhysicalKey> {
        std::mem::take(&mut self.chunk_keys)
    }

    fn store_chunk_keys(&mut self, keys: Vec<PhysicalKey>) {
        self.chunk_keys = keys;
    }

    fn take_row_index(&mut self) -> FxHashMap<u64, usize> {
        std::mem::take(&mut self.row_index)
    }

    fn store_row_index(&mut self, row_index: FxHashMap<u64, usize>) {
        self.row_index = row_index;
    }

    fn take_row_scratch(&mut self) -> Vec<Option<(usize, usize)>> {
        std::mem::take(&mut self.row_scratch)
    }

    fn store_row_scratch(&mut self, scratch: Vec<Option<(usize, usize)>>) {
        self.row_scratch = scratch;
    }

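    /// Locates the chunk that covers `row_id` and reports its row-id span.
    ///
    /// Returns the chunk index together with the minimum and maximum row IDs covered by that
    /// chunk position across all field plans, or `None` when no chunk metadata contains the row.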
    pub fn chunk_span_for_row(&self, row_id: RowId) -> Option<(usize, RowId, RowId)> {
        let first_plan = self.plans.first()?;
        let mut chunk_idx = None;
        for (idx, meta) in first_plan.row_metas.iter().enumerate() {
            if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
                chunk_idx = Some(idx);
                break;
            }
        }
        if chunk_idx.is_none() {
            let total_chunks = first_plan.row_metas.len();
            'outer: for idx in 0..total_chunks {
                for plan in &self.plans {
                    let meta = &plan.row_metas[idx];
                    if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
                        chunk_idx = Some(idx);
                        break 'outer;
                    }
                }
            }
        }
        let idx = chunk_idx?;

        let mut span_min = u64::MAX;
        let mut span_max = 0u64;
        for plan in &self.plans {
            let meta = plan.row_metas.get(idx)?;
            span_min = span_min.min(meta.min_val_u64);
            span_max = span_max.max(meta.max_val_u64);
        }

        if span_min > span_max {
            return None;
        }

        Some((idx, span_min, span_max))
    }
}

#[derive(Clone, Debug)]
struct FieldPlan {
    dtype: DataType,
    value_metas: Vec<ChunkMetadata>,
    row_metas: Vec<ChunkMetadata>,
    candidate_indices: Vec<usize>,
}

impl<P> ColumnStore<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Gathers multiple columns using a configurable null-handling policy.
    /// When [`GatherNullPolicy::DropNulls`] is selected, rows where all
    /// projected columns are null or missing are removed from the
    /// resulting batch.
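    ///
    /// Illustrative call; `store`, `field_ids`, and `row_ids` are placeholder identifiers:
    ///
    /// ```ignore
    /// // Drop rows whose projected columns are entirely null or missing.
    /// let batch = store.gather_rows(&field_ids, &row_ids, GatherNullPolicy::DropNulls)?;
    /// ```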
    pub fn gather_rows(
        &self,
        field_ids: &[LogicalFieldId],
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        self.gather_rows_with_schema(field_ids, row_ids, policy, None)
    }

    /// Gathers rows with an optional expected schema for empty result sets.
    ///
    /// When `expected_schema` is provided and `row_ids` is empty, the returned
    /// `RecordBatch` uses that schema instead of synthesizing one with all-nullable fields.
    /// This ensures that non-nullable columns (e.g., PRIMARY KEYs) are correctly represented.
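    ///
    /// Sketch of an empty gather that keeps a caller-provided schema; `expected_schema` is
    /// assumed to come from the caller's table metadata:
    ///
    /// ```ignore
    /// let empty = store.gather_rows_with_schema(&field_ids, &[], policy, Some(expected_schema))?;
    /// assert_eq!(empty.num_rows(), 0);
    /// ```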
    pub fn gather_rows_with_schema(
        &self,
        field_ids: &[LogicalFieldId],
        row_ids: &[u64],
        policy: GatherNullPolicy,
        expected_schema: Option<Arc<Schema>>,
    ) -> Result<RecordBatch> {
        let mut ctx = self.prepare_gather_context(field_ids)?;
        self.execute_gather_single_pass_with_schema(&mut ctx, row_ids, policy, expected_schema)
    }

    /// Executes a one-off gather with optional schema for empty result sets.
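    ///
    /// The pass indexes the requested row IDs, selects candidate chunks per field, batch-loads
    /// those chunks from the pager, gathers values per Arrow type, and finally applies the null
    /// policy before assembling the output batch.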
    fn execute_gather_single_pass_with_schema(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
        expected_schema: Option<Arc<Schema>>,
    ) -> Result<RecordBatch> {
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        let field_infos = ctx.field_infos().to_vec();

        if row_ids.is_empty() {
            // Use expected_schema if provided to preserve nullability
            let (schema, arrays) = if let Some(expected_schema) = expected_schema {
                let mut arrays = Vec::with_capacity(expected_schema.fields().len());
                for field in expected_schema.fields() {
                    arrays.push(new_empty_array(field.data_type()));
                }
                (expected_schema, arrays)
            } else {
                // Fallback: Use fields from context which have nullable=true by default
                // This is safe because nullable=true is always compatible
                let fields = ctx.fields();
                let mut arrays = Vec::with_capacity(fields.len());
                for field in fields {
                    arrays.push(new_empty_array(field.data_type()));
                }
                (Arc::new(Schema::new(fields.to_vec())), arrays)
            };
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        let mut row_index: FxHashMap<u64, usize> =
            FxHashMap::with_capacity_and_hasher(row_ids.len(), Default::default());
        for (idx, &row_id) in row_ids.iter().enumerate() {
            if row_index.insert(row_id, idx).is_some() {
                return Err(Error::Internal(
                    "duplicate row_id in gather_rows_multi".into(),
                ));
            }
        }

        let mut sorted_row_ids = row_ids.to_vec();
        sorted_row_ids.sort_unstable();

        let mut chunk_keys: FxHashSet<PhysicalKey> = FxHashSet::default();
        {
            let plans_mut = ctx.plans_mut();
            for plan in plans_mut.iter_mut() {
                plan.candidate_indices.clear();
                for (idx, meta) in plan.row_metas.iter().enumerate() {
                    if Self::chunk_intersects(&sorted_row_ids, meta) {
                        plan.candidate_indices.push(idx);
                        chunk_keys.insert(plan.value_metas[idx].chunk_pk);
                        chunk_keys.insert(plan.row_metas[idx].chunk_pk);
                    }
                }
            }
        }

        let mut chunk_requests = Vec::with_capacity(chunk_keys.len());
        for &key in &chunk_keys {
            chunk_requests.push(BatchGet::Raw { key });
        }

        let mut chunk_map: FxHashMap<PhysicalKey, EntryHandle> =
            FxHashMap::with_capacity_and_hasher(chunk_requests.len(), Default::default());
        if !chunk_requests.is_empty() {
            let chunk_results = self.pager.batch_get(&chunk_requests)?;
            for result in chunk_results {
                if let GetResult::Raw { key, bytes } = result {
                    chunk_map.insert(key, bytes);
                }
            }
        }

        let allow_missing = policy.allow_missing();

        let mut outputs = Vec::with_capacity(ctx.plans().len());
        for plan in ctx.plans() {
            let array = match &plan.dtype {
                DataType::Utf8 => Self::gather_rows_single_shot_string::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeUtf8 => Self::gather_rows_single_shot_string::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Binary => Self::gather_rows_single_shot_binary::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeBinary => Self::gather_rows_single_shot_binary::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Utf8View => Self::gather_rows_single_shot_string_view(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Boolean => Self::gather_rows_single_shot_bool(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Struct(_) => Self::gather_rows_single_shot_struct(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                    &plan.dtype,
                ),
                DataType::Decimal128(_, _) => Self::gather_rows_single_shot_decimal128(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                    &plan.dtype,
                ),
                other => with_integer_arrow_type!(
                    other.clone(),
                    |ArrowTy| {
                        Self::gather_rows_single_shot::<ArrowTy>(
                            &row_index,
                            row_ids.len(),
                            plan,
                            &mut chunk_map,
                            allow_missing,
                        )
                    },
                    Err(Error::Internal(format!(
                        "gather_rows_multi: unsupported dtype {:?}",
                        other
                    ))),
                ),
            }?;
            outputs.push(array);
        }

        let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
            Self::filter_rows_with_non_null(outputs)?
        } else {
            outputs
        };

        let mut fields = Vec::with_capacity(field_infos.len());
        for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
            let array = &outputs[idx];
            let field_name = format!("field_{}", u64::from(*fid));
            let nullable = match policy {
                GatherNullPolicy::IncludeNulls => true,
                _ => array.null_count() > 0,
            };
            fields.push(Field::new(field_name, dtype.clone(), nullable));
        }

        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, outputs)
            .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
    }

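    /// Resolves descriptors and chunk metadata for `field_ids` and packages them into a
    /// reusable [`MultiGatherContext`].
    ///
    /// The context pairs each field with its value and row-id chunk metadata; see
    /// `gather_rows_with_reusable_context` for the intended call pattern.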
    pub fn prepare_gather_context(
        &self,
        field_ids: &[LogicalFieldId],
    ) -> Result<MultiGatherContext> {
        let mut field_infos = Vec::with_capacity(field_ids.len());
        for &fid in field_ids {
            field_infos.push((fid, self.data_type(fid)?));
        }

        if field_infos.is_empty() {
            return Ok(MultiGatherContext::new(Vec::new(), Vec::new()));
        }

        let catalog = self.catalog.read().unwrap();
        let mut key_pairs = Vec::with_capacity(field_infos.len());
        for (fid, _) in &field_infos {
            let value_pk = *catalog.map.get(fid).ok_or(Error::NotFound)?;
            let row_pk = *catalog.map.get(&rowid_fid(*fid)).ok_or(Error::NotFound)?;
            key_pairs.push((value_pk, row_pk));
        }
        drop(catalog);

        let mut descriptor_requests = Vec::with_capacity(key_pairs.len() * 2);
        for (value_pk, row_pk) in &key_pairs {
            descriptor_requests.push(BatchGet::Raw { key: *value_pk });
            descriptor_requests.push(BatchGet::Raw { key: *row_pk });
        }
        let descriptor_results = self.pager.batch_get(&descriptor_requests)?;
        let mut descriptor_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for result in descriptor_results {
            if let GetResult::Raw { key, bytes } = result {
                descriptor_map.insert(key, bytes);
            }
        }

        let mut plans = Vec::with_capacity(field_infos.len());
        for ((_, dtype), (value_pk, row_pk)) in field_infos.iter().zip(key_pairs.iter()) {
            let value_desc_blob = descriptor_map.remove(value_pk).ok_or(Error::NotFound)?;
            let value_desc = ColumnDescriptor::from_le_bytes(value_desc_blob.as_ref());
            let value_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), value_desc.head_page_pk)?;

            let row_desc_blob = descriptor_map.remove(row_pk).ok_or(Error::NotFound)?;
            let row_desc = ColumnDescriptor::from_le_bytes(row_desc_blob.as_ref());
            let row_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), row_desc.head_page_pk)?;

            if value_metas.len() != row_metas.len() {
                return Err(Error::Internal(
                    "gather_rows_multi: chunk count mismatch".into(),
                ));
            }

            plans.push(FieldPlan {
                dtype: dtype.clone(),
                value_metas,
                row_metas,
                candidate_indices: Vec::new(),
            });
        }

        Ok(MultiGatherContext::new(field_infos, plans))
    }

    /// Gathers rows while reusing chunk caches and scratch buffers stored in the context.
    ///
    /// This path amortizes chunk fetch and decode costs across multiple calls by
    /// retaining Arrow arrays and scratch state inside the provided context.
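    ///
    /// Intended reuse pattern, sketched with placeholder identifiers:
    ///
    /// ```ignore
    /// let mut ctx = store.prepare_gather_context(&field_ids)?;
    /// for window in row_id_windows {
    ///     let batch = store.gather_rows_with_reusable_context(
    ///         &mut ctx,
    ///         &window,
    ///         GatherNullPolicy::IncludeNulls,
    ///     )?;
    ///     // ... consume `batch` ...
    /// }
    /// ```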
    pub fn gather_rows_with_reusable_context(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        if row_ids.is_empty() {
            let mut arrays = Vec::with_capacity(ctx.field_infos().len());
            let mut fields = Vec::with_capacity(ctx.field_infos().len());
            for (fid, dtype) in ctx.field_infos() {
                arrays.push(new_empty_array(dtype));
                let field_name = format!("field_{}", u64::from(*fid));
                fields.push(Field::new(field_name, dtype.clone(), true));
            }
            let schema = Arc::new(Schema::new(fields));
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        let mut row_index = ctx.take_row_index();
        let mut row_scratch = ctx.take_row_scratch();

        let field_infos = ctx.field_infos().to_vec();
        let mut chunk_keys = ctx.take_chunk_keys();

        let result: Result<RecordBatch> = (|| {
            let len = row_ids.len();
            if row_scratch.len() < len {
                row_scratch.resize(len, None);
            }

            let is_non_decreasing = len <= 1 || row_ids.windows(2).all(|w| w[0] <= w[1]);
            let sorted_row_ids_cow: Cow<'_, [u64]> = if is_non_decreasing {
                Cow::Borrowed(row_ids)
            } else {
                let mut buf = row_ids.to_vec();
                buf.sort_unstable();
                Cow::Owned(buf)
            };
            let sorted_row_ids: &[u64] = sorted_row_ids_cow.as_ref();

            let dense_base = if len == 0 {
                None
            } else if len == 1 || is_non_decreasing && row_ids.windows(2).all(|w| w[1] == w[0] + 1)
            {
                Some(row_ids[0])
            } else {
                None
            };

            if dense_base.is_none() {
                row_index.clear();
                row_index.reserve(len);
                for (idx, &row_id) in row_ids.iter().enumerate() {
                    if row_index.insert(row_id, idx).is_some() {
                        return Err(Error::Internal(
                            "duplicate row_id in gather_rows_multi".into(),
                        ));
                    }
                }
            } else {
                row_index.clear();
            }

            let row_locator = if let Some(base) = dense_base {
                RowLocator::Dense { base }
            } else {
                RowLocator::Sparse { index: &row_index }
            };

            chunk_keys.clear();

            {
                let plans_mut = ctx.plans_mut();
                for plan in plans_mut.iter_mut() {
                    plan.candidate_indices.clear();
                    for (idx, meta) in plan.row_metas.iter().enumerate() {
                        if Self::chunk_intersects(sorted_row_ids, meta) {
                            plan.candidate_indices.push(idx);
                            chunk_keys.push(plan.value_metas[idx].chunk_pk);
                            chunk_keys.push(plan.row_metas[idx].chunk_pk);
                        }
                    }
                }
            }

            chunk_keys.sort_unstable();
            chunk_keys.dedup();

            {
                let mut pending: Vec<BatchGet> = Vec::new();
                {
                    let cache = ctx.chunk_cache();
                    for &key in &chunk_keys {
                        if !cache.contains_key(&key) {
                            pending.push(BatchGet::Raw { key });
                        }
                    }
                }

                if !pending.is_empty() {
                    let chunk_results = self.pager.batch_get(&pending)?;
                    let cache = ctx.chunk_cache_mut();
                    for result in chunk_results {
                        if let GetResult::Raw { key, bytes } = result {
                            let array = deserialize_array(bytes)?;
                            cache.insert(key, Arc::clone(&array));
                        }
                    }
                }
            }

            let allow_missing = policy.allow_missing();

            let mut outputs = Vec::with_capacity(ctx.plans().len());
            for plan in ctx.plans() {
                let array = match &plan.dtype {
                    DataType::Utf8 => Self::gather_rows_from_chunks_string::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeUtf8 => Self::gather_rows_from_chunks_string::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Binary => Self::gather_rows_from_chunks_binary::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeBinary => Self::gather_rows_from_chunks_binary::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Boolean => Self::gather_rows_from_chunks_bool(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Struct(_) => Self::gather_rows_from_chunks_struct(
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                        &plan.dtype,
                    ),
                    DataType::Decimal128(_, _) => Self::gather_rows_from_chunks_decimal128(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                        &plan.dtype,
                    ),
                    other => with_integer_arrow_type!(
                        other.clone(),
                        |ArrowTy| {
                            Self::gather_rows_from_chunks::<ArrowTy>(
                                row_ids,
                                row_locator,
                                len,
                                &plan.candidate_indices,
                                plan,
                                ctx.chunk_cache(),
                                &mut row_scratch,
                                allow_missing,
                            )
                        },
                        Err(Error::Internal(format!(
                            "gather_rows_multi: unsupported dtype {:?}",
                            other
                        ))),
                    ),
                }?;
                outputs.push(array);
            }

            let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
                Self::filter_rows_with_non_null(outputs)?
            } else {
                outputs
            };

            let mut fields = Vec::with_capacity(field_infos.len());
            for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
                let array = &outputs[idx];
                let field_name = format!("field_{}", u64::from(*fid));
                let nullable = match policy {
                    GatherNullPolicy::IncludeNulls => true,
                    _ => array.null_count() > 0,
                };
                fields.push(Field::new(field_name, dtype.clone(), nullable));
            }

            let schema = Arc::new(Schema::new(fields));
            RecordBatch::try_new(schema, outputs)
                .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
        })();

        ctx.store_row_scratch(row_scratch);
        ctx.store_row_index(row_index);
        ctx.store_chunk_keys(chunk_keys);

        result
    }

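    /// Walks the descriptor chain rooted at `head_page_pk` and returns the chunk metadata
    /// entries with a non-zero row count; a zero `head_page_pk` yields an empty list.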
    fn collect_non_empty_metas(pager: &P, head_page_pk: PhysicalKey) -> Result<Vec<ChunkMetadata>> {
        let mut metas = Vec::new();
        if head_page_pk == 0 {
            return Ok(metas);
        }
        for meta in DescriptorIterator::new(pager, head_page_pk) {
            let meta = meta?;
            if meta.row_count > 0 {
                metas.push(meta);
            }
        }
        Ok(metas)
    }

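    /// Reports whether the chunk described by `meta` may contain any of `sorted_row_ids`.
    ///
    /// Chunks with degenerate row-id bounds (min and max both zero, or min greater than max)
    /// are treated conservatively as intersecting. Otherwise a partition-point search checks
    /// whether any requested row ID falls within the chunk's `[min, max]` range.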
    #[inline]
    fn chunk_intersects(sorted_row_ids: &[u64], meta: &ChunkMetadata) -> bool {
        if sorted_row_ids.is_empty() || meta.row_count == 0 {
            return false;
        }
        let min = meta.min_val_u64;
        let max = meta.max_val_u64;
        if min == 0 && max == 0 && meta.row_count > 0 {
            return true;
        }
        if min > max {
            return true;
        }
        let min_req = sorted_row_ids[0];
        let max_req = *sorted_row_ids.last().unwrap();
        if max < min_req || min > max_req {
            return false;
        }
        let idx = sorted_row_ids.partition_point(|&rid| rid < min);
        idx < sorted_row_ids.len() && sorted_row_ids[idx] <= max
    }

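    // The private helpers below are thin adapters: each unpacks the relevant `FieldPlan`
    // metadata and delegates to the matching shared gather routine in `crate::gather`,
    // keeping the per-type dispatch in the methods above compact.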
    fn gather_rows_single_shot_string<O>(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        shared_gather_rows_single_shot_string::<O>(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn gather_rows_single_shot_bool(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef> {
        shared_gather_rows_single_shot_bool(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
        )
    }

    fn gather_rows_single_shot_struct(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
        dtype: &DataType,
    ) -> Result<ArrayRef> {
        shared_gather_rows_single_shot_struct(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
            dtype,
        )
    }

    fn gather_rows_single_shot_decimal128(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
        dtype: &DataType,
    ) -> Result<ArrayRef> {
        shared_gather_rows_single_shot_decimal128(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
            dtype,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn gather_rows_single_shot<T>(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        T: ArrowPrimitiveType,
    {
        shared_gather_rows_single_shot::<T>(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
        )
    }

    fn gather_rows_single_shot_string_view(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef> {
        shared_gather_rows_single_shot_string_view(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
        )
    }

    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and avoids intermediate structs.
    fn gather_rows_from_chunks_string<O>(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        shared_gather_rows_from_chunks_string::<O>(
            row_ids,
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
        )
    }

    fn gather_rows_single_shot_binary<O>(
        row_index: &FxHashMap<u64, usize>,
        len: usize,
        plan: &FieldPlan,
        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        shared_gather_rows_single_shot_binary::<O>(
            row_index,
            len,
            &plan.value_metas,
            &plan.row_metas,
            &plan.candidate_indices,
            chunk_blobs,
            allow_missing,
        )
    }

    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and avoids intermediate structs.
    fn gather_rows_from_chunks_binary<O>(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        O: OffsetSizeTrait,
    {
        shared_gather_rows_from_chunks_binary::<O>(
            row_ids,
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
        )
    }

    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and avoids intermediate structs.
    fn gather_rows_from_chunks_bool(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef> {
        shared_gather_rows_from_chunks_bool(
            row_ids,
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
        )
    }

    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and avoids intermediate structs.
    fn gather_rows_from_chunks_struct(
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
        dtype: &DataType,
    ) -> Result<ArrayRef> {
        shared_gather_rows_from_chunks_struct(
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
            dtype,
        )
    }

    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and avoids intermediate structs.
    fn gather_rows_from_chunks_decimal128(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
        dtype: &DataType,
    ) -> Result<ArrayRef> {
        shared_gather_rows_from_chunks_decimal128(
            row_ids,
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
            dtype,
        )
    }

    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and keeps type monomorphization straightforward.
    fn gather_rows_from_chunks<T>(
        row_ids: &[u64],
        row_locator: RowLocator,
        len: usize,
        candidate_indices: &[usize],
        plan: &FieldPlan,
        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
        row_scratch: &mut [Option<(usize, usize)>],
        allow_missing: bool,
    ) -> Result<ArrayRef>
    where
        T: ArrowPrimitiveType,
    {
        shared_gather_rows_from_chunks::<T>(
            row_ids,
            row_locator,
            len,
            candidate_indices,
            &plan.value_metas,
            &plan.row_metas,
            chunk_arrays,
            row_scratch,
            allow_missing,
        )
    }

    fn filter_rows_with_non_null(columns: Vec<ArrayRef>) -> Result<Vec<ArrayRef>> {
        shared_filter_rows_with_non_null(columns)
    }
}