// llkv_column_map/store/projection.rs

1//! Projection planners for building Arrow batches from column chunks.
2//!
3//! The projection subsystem wires descriptor metadata, chunk loading helpers,
4//! and gather routines to materialize user-facing result sets. It centralizes
5//! null-handling policies and keeps the hot paths monomorphized per Arrow type.
6
7use super::*;
8use crate::gather::{
9    RowLocator, filter_rows_with_non_null as shared_filter_rows_with_non_null,
10    gather_rows_from_chunks as shared_gather_rows_from_chunks,
11    gather_rows_from_chunks_binary as shared_gather_rows_from_chunks_binary,
12    gather_rows_from_chunks_bool as shared_gather_rows_from_chunks_bool,
13    gather_rows_from_chunks_string as shared_gather_rows_from_chunks_string,
14    gather_rows_from_chunks_struct as shared_gather_rows_from_chunks_struct,
15    gather_rows_single_shot as shared_gather_rows_single_shot,
16    gather_rows_single_shot_binary as shared_gather_rows_single_shot_binary,
17    gather_rows_single_shot_bool as shared_gather_rows_single_shot_bool,
18    gather_rows_single_shot_string as shared_gather_rows_single_shot_string,
19    gather_rows_single_shot_struct as shared_gather_rows_single_shot_struct,
20};
21use crate::store::descriptor::{ChunkMetadata, ColumnDescriptor, DescriptorIterator};
22use crate::types::{LogicalFieldId, RowId};
23use arrow::array::{ArrayRef, OffsetSizeTrait, new_empty_array};
24use arrow::datatypes::{ArrowPrimitiveType, DataType, Field, Schema};
25use arrow::record_batch::RecordBatch;
26use llkv_result::{Error, Result};
27use llkv_storage::{
28    pager::{BatchGet, GetResult, Pager},
29    serialization::deserialize_array,
30    types::PhysicalKey,
31};
32use rustc_hash::{FxHashMap, FxHashSet};
33use simd_r_drive_entry_handle::EntryHandle;
34use std::borrow::Cow;
35use std::sync::Arc;
36
/// Policy controlling how gather operations treat row IDs that cannot be
/// resolved to a stored value (and, for [`GatherNullPolicy::DropNulls`],
/// rows whose projected columns are all null).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum GatherNullPolicy {
    /// Any missing row ID results in an error.
    ErrorOnMissing,
    /// Missing rows surface as nulls in the result arrays.
    IncludeNulls,
    /// Missing rows and rows where all projected columns are null are dropped from the output.
    DropNulls,
}
46
/// Logical projection request describing a field and optional alias.
///
/// Used by planners to pull a column by [`LogicalFieldId`] and to rename the output slot when
/// needed.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct Projection {
    /// Column to project, identified by its logical field id.
    pub logical_field_id: LogicalFieldId,
    /// Optional output name override for the projected column; `None` keeps
    /// the default naming chosen by the consumer.
    pub alias: Option<String>,
}
56
57impl Projection {
58    pub fn new(logical_field_id: LogicalFieldId) -> Self {
59        Self {
60            logical_field_id,
61            alias: None,
62        }
63    }
64
65    pub fn with_alias<S: Into<String>>(logical_field_id: LogicalFieldId, alias: S) -> Self {
66        Self {
67            logical_field_id,
68            alias: Some(alias.into()),
69        }
70    }
71}
72
73impl From<LogicalFieldId> for Projection {
74    fn from(logical_field_id: LogicalFieldId) -> Self {
75        Projection::new(logical_field_id)
76    }
77}
78
79impl<S: Into<String>> From<(LogicalFieldId, S)> for Projection {
80    fn from(value: (LogicalFieldId, S)) -> Self {
81        Projection::with_alias(value.0, value.1)
82    }
83}
84
85impl GatherNullPolicy {
86    #[inline]
87    fn allow_missing(self) -> bool {
88        !matches!(self, Self::ErrorOnMissing)
89    }
90}
91
/// Scratch structures shared across field plans during multi-column gathers.
///
/// Keeps cached chunk blobs and row indexes so repeated projection passes avoid redundant pager
/// reads.
pub struct MultiGatherContext {
    // (field id, Arrow type) for every projected column, in request order.
    field_infos: Vec<(LogicalFieldId, DataType)>,
    // Pre-built Arrow fields (named `field_<id>`, nullable=true) parallel to `field_infos`.
    fields: Vec<Field>,
    // Per-field chunk metadata plus candidate scratch, parallel to `field_infos`.
    plans: Vec<FieldPlan>,
    // Decoded chunk arrays keyed by physical key; retained across gather calls.
    chunk_cache: FxHashMap<PhysicalKey, ArrayRef>,
    // Reusable row-id -> output-position map for sparse requests.
    row_index: FxHashMap<u64, usize>,
    // Reusable per-row scratch passed to the shared gather helpers.
    // NOTE(review): the (usize, usize) pair's exact meaning is defined by
    // crate::gather — presumably (chunk index, offset); confirm there.
    row_scratch: Vec<Option<(usize, usize)>>,
    // Reusable buffer of chunk physical keys for the current request.
    chunk_keys: Vec<PhysicalKey>,
}
105
106impl MultiGatherContext {
107    fn new(field_infos: Vec<(LogicalFieldId, DataType)>, plans: Vec<FieldPlan>) -> Self {
108        // Build Field objects from field_infos with default nullable=true
109        // This is for backward compatibility when no expected schema is provided
110        let fields: Vec<Field> = field_infos
111            .iter()
112            .map(|(fid, dtype)| {
113                let field_name = format!("field_{}", u64::from(*fid));
114                Field::new(field_name, dtype.clone(), true)
115            })
116            .collect();
117
118        Self {
119            chunk_cache: FxHashMap::default(),
120            row_index: FxHashMap::default(),
121            row_scratch: Vec::new(),
122            chunk_keys: Vec::new(),
123            field_infos,
124            fields,
125            plans,
126        }
127    }
128
129    #[inline]
130    pub fn is_empty(&self) -> bool {
131        self.plans.is_empty()
132    }
133
134    #[inline]
135    fn field_infos(&self) -> &[(LogicalFieldId, DataType)] {
136        &self.field_infos
137    }
138
139    #[inline]
140    fn fields(&self) -> &[Field] {
141        &self.fields
142    }
143
144    #[inline]
145    fn plans(&self) -> &[FieldPlan] {
146        &self.plans
147    }
148
149    #[inline]
150    fn chunk_cache(&self) -> &FxHashMap<PhysicalKey, ArrayRef> {
151        &self.chunk_cache
152    }
153
154    #[inline]
155    fn chunk_cache_mut(&mut self) -> &mut FxHashMap<PhysicalKey, ArrayRef> {
156        &mut self.chunk_cache
157    }
158
159    #[inline]
160    fn plans_mut(&mut self) -> &mut [FieldPlan] {
161        &mut self.plans
162    }
163
164    fn take_chunk_keys(&mut self) -> Vec<PhysicalKey> {
165        std::mem::take(&mut self.chunk_keys)
166    }
167
168    fn store_chunk_keys(&mut self, keys: Vec<PhysicalKey>) {
169        self.chunk_keys = keys;
170    }
171
172    fn take_row_index(&mut self) -> FxHashMap<u64, usize> {
173        std::mem::take(&mut self.row_index)
174    }
175
176    fn store_row_index(&mut self, row_index: FxHashMap<u64, usize>) {
177        self.row_index = row_index;
178    }
179
180    fn take_row_scratch(&mut self) -> Vec<Option<(usize, usize)>> {
181        std::mem::take(&mut self.row_scratch)
182    }
183
184    fn store_row_scratch(&mut self, scratch: Vec<Option<(usize, usize)>>) {
185        self.row_scratch = scratch;
186    }
187
188    pub fn chunk_span_for_row(&self, row_id: RowId) -> Option<(usize, RowId, RowId)> {
189        let first_plan = self.plans.first()?;
190        let mut chunk_idx = None;
191        for (idx, meta) in first_plan.row_metas.iter().enumerate() {
192            if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
193                chunk_idx = Some(idx);
194                break;
195            }
196        }
197        if chunk_idx.is_none() {
198            let total_chunks = first_plan.row_metas.len();
199            'outer: for idx in 0..total_chunks {
200                for plan in &self.plans {
201                    let meta = &plan.row_metas[idx];
202                    if row_id >= meta.min_val_u64 && row_id <= meta.max_val_u64 {
203                        chunk_idx = Some(idx);
204                        break 'outer;
205                    }
206                }
207            }
208        }
209        let idx = chunk_idx?;
210
211        let mut span_min = u64::MAX;
212        let mut span_max = 0u64;
213        for plan in &self.plans {
214            let meta = plan.row_metas.get(idx)?;
215            span_min = span_min.min(meta.min_val_u64);
216            span_max = span_max.max(meta.max_val_u64);
217        }
218
219        if span_min > span_max {
220            return None;
221        }
222
223        Some((idx, span_min, span_max))
224    }
225}
226
/// Per-field gather plan: chunk metadata for a value column and its paired
/// row-id column, plus scratch tracking which chunks may hold requested rows.
#[derive(Clone, Debug)]
struct FieldPlan {
    // Arrow data type of the projected column.
    dtype: DataType,
    // Metadata for each non-empty value chunk.
    value_metas: Vec<ChunkMetadata>,
    // Metadata for each non-empty row-id chunk; kept parallel to `value_metas`.
    row_metas: Vec<ChunkMetadata>,
    // Indices into the meta vectors for chunks intersecting the current
    // request; cleared and rebuilt on every gather.
    candidate_indices: Vec<usize>,
}
234
235impl<P> ColumnStore<P>
236where
237    P: Pager<Blob = EntryHandle> + Send + Sync,
238{
239    /// Gathers multiple columns using a configurable null-handling policy.
240    /// When [`GatherNullPolicy::DropNulls`] is selected, rows where all
241    /// projected columns are null or missing are removed from the
242    /// resulting batch.
243    pub fn gather_rows(
244        &self,
245        field_ids: &[LogicalFieldId],
246        row_ids: &[u64],
247        policy: GatherNullPolicy,
248    ) -> Result<RecordBatch> {
249        self.gather_rows_with_schema(field_ids, row_ids, policy, None)
250    }
251
252    /// Gather rows with an optional expected schema for empty result sets.
253    ///
254    /// When `expected_schema` is provided and `row_ids` is empty, the returned
255    /// RecordBatch will use that schema instead of synthesizing one with all nullable fields.
256    /// This ensures that non-nullable columns (e.g., PRIMARY KEYs) are correctly represented.
257    pub fn gather_rows_with_schema(
258        &self,
259        field_ids: &[LogicalFieldId],
260        row_ids: &[u64],
261        policy: GatherNullPolicy,
262        expected_schema: Option<Arc<Schema>>,
263    ) -> Result<RecordBatch> {
264        let mut ctx = self.prepare_gather_context(field_ids)?;
265        self.execute_gather_single_pass_with_schema(&mut ctx, row_ids, policy, expected_schema)
266    }
267
    /// Executes a one-off gather with optional schema for empty result sets.
    ///
    /// Single-pass flow: index the requested row ids, mark the chunks whose
    /// row-id ranges intersect the request, fetch those chunk blobs in one
    /// batched pager read, then dispatch to a per-Arrow-type gather routine.
    /// `expected_schema` is only consulted when `row_ids` is empty.
    fn execute_gather_single_pass_with_schema(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
        expected_schema: Option<Arc<Schema>>,
    ) -> Result<RecordBatch> {
        // Nothing projected: return an empty batch with an empty schema.
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        let field_infos = ctx.field_infos().to_vec();

        if row_ids.is_empty() {
            // Use expected_schema if provided to preserve nullability
            let (schema, arrays) = if let Some(expected_schema) = expected_schema {
                let mut arrays = Vec::with_capacity(expected_schema.fields().len());
                for field in expected_schema.fields() {
                    arrays.push(new_empty_array(field.data_type()));
                }
                (expected_schema, arrays)
            } else {
                // Fallback: Use fields from context which have nullable=true by default
                // This is safe because nullable=true is always compatible
                let fields = ctx.fields();
                let mut arrays = Vec::with_capacity(fields.len());
                for field in fields {
                    arrays.push(new_empty_array(field.data_type()));
                }
                (Arc::new(Schema::new(fields.to_vec())), arrays)
            };
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Map each requested row id to its output position. Duplicates are
        // rejected because each output slot must be written exactly once.
        let mut row_index: FxHashMap<u64, usize> =
            FxHashMap::with_capacity_and_hasher(row_ids.len(), Default::default());
        for (idx, &row_id) in row_ids.iter().enumerate() {
            if row_index.insert(row_id, idx).is_some() {
                return Err(Error::Internal(
                    "duplicate row_id in gather_rows_multi".into(),
                ));
            }
        }

        // Sorted copy used for range-intersection pruning against chunk metadata.
        let mut sorted_row_ids = row_ids.to_vec();
        sorted_row_ids.sort_unstable();

        // Collect the physical keys of every value/row-id chunk that may hold
        // requested rows; the set deduplicates keys shared across plans.
        let mut chunk_keys: FxHashSet<PhysicalKey> = FxHashSet::default();
        {
            let plans_mut = ctx.plans_mut();
            for plan in plans_mut.iter_mut() {
                plan.candidate_indices.clear();
                for (idx, meta) in plan.row_metas.iter().enumerate() {
                    if Self::chunk_intersects(&sorted_row_ids, meta) {
                        plan.candidate_indices.push(idx);
                        chunk_keys.insert(plan.value_metas[idx].chunk_pk);
                        chunk_keys.insert(plan.row_metas[idx].chunk_pk);
                    }
                }
            }
        }

        let mut chunk_requests = Vec::with_capacity(chunk_keys.len());
        for &key in &chunk_keys {
            chunk_requests.push(BatchGet::Raw { key });
        }

        // Single batched pager read for all candidate chunks.
        let mut chunk_map: FxHashMap<PhysicalKey, EntryHandle> =
            FxHashMap::with_capacity_and_hasher(chunk_requests.len(), Default::default());
        if !chunk_requests.is_empty() {
            let chunk_results = self.pager.batch_get(&chunk_requests)?;
            for result in chunk_results {
                if let GetResult::Raw { key, bytes } = result {
                    chunk_map.insert(key, bytes);
                }
            }
        }

        let allow_missing = policy.allow_missing();

        // Dispatch per Arrow type so each gather path stays monomorphized.
        let mut outputs = Vec::with_capacity(ctx.plans().len());
        for plan in ctx.plans() {
            let array = match &plan.dtype {
                DataType::Utf8 => Self::gather_rows_single_shot_string::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeUtf8 => Self::gather_rows_single_shot_string::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Binary => Self::gather_rows_single_shot_binary::<i32>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::LargeBinary => Self::gather_rows_single_shot_binary::<i64>(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Boolean => Self::gather_rows_single_shot_bool(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                ),
                DataType::Struct(_) => Self::gather_rows_single_shot_struct(
                    &row_index,
                    row_ids.len(),
                    plan,
                    &mut chunk_map,
                    allow_missing,
                    &plan.dtype,
                ),
                // Remaining types go through the integer/primitive macro, which
                // monomorphizes the primitive gather per Arrow type.
                other => with_integer_arrow_type!(
                    other.clone(),
                    |ArrowTy| {
                        Self::gather_rows_single_shot::<ArrowTy>(
                            &row_index,
                            row_ids.len(),
                            plan,
                            &mut chunk_map,
                            allow_missing,
                        )
                    },
                    Err(Error::Internal(format!(
                        "gather_rows_multi: unsupported dtype {:?}",
                        other
                    ))),
                ),
            }?;
            outputs.push(array);
        }

        // DropNulls removes rows where every projected column is null/missing.
        let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
            Self::filter_rows_with_non_null(outputs)?
        } else {
            outputs
        };

        // Synthesize the output schema; nullability reflects the policy and
        // the observed null counts in the gathered arrays.
        let mut fields = Vec::with_capacity(field_infos.len());
        for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
            let array = &outputs[idx];
            let field_name = format!("field_{}", u64::from(*fid));
            let nullable = match policy {
                GatherNullPolicy::IncludeNulls => true,
                _ => array.null_count() > 0,
            };
            fields.push(Field::new(field_name, dtype.clone(), nullable));
        }

        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, outputs)
            .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
    }
437
    /// Resolves descriptors and chunk metadata for `field_ids`, producing a
    /// reusable [`MultiGatherContext`].
    ///
    /// Performs one batched pager read for all value and row-id descriptors,
    /// then walks each descriptor chain to collect non-empty chunk metadata.
    /// Returns [`Error::NotFound`] when a field (or its row-id companion
    /// column) is not registered in the catalog.
    pub fn prepare_gather_context(
        &self,
        field_ids: &[LogicalFieldId],
    ) -> Result<MultiGatherContext> {
        let mut field_infos = Vec::with_capacity(field_ids.len());
        for &fid in field_ids {
            field_infos.push((fid, self.data_type(fid)?));
        }

        if field_infos.is_empty() {
            return Ok(MultiGatherContext::new(Vec::new(), Vec::new()));
        }

        // Resolve descriptor physical keys under the catalog read lock, and
        // release the lock before any pager I/O.
        let catalog = self.catalog.read().unwrap();
        let mut key_pairs = Vec::with_capacity(field_infos.len());
        for (fid, _) in &field_infos {
            let value_pk = *catalog.map.get(fid).ok_or(Error::NotFound)?;
            let row_pk = *catalog.map.get(&rowid_fid(*fid)).ok_or(Error::NotFound)?;
            key_pairs.push((value_pk, row_pk));
        }
        drop(catalog);

        // Fetch every descriptor blob in a single pager batch.
        let mut descriptor_requests = Vec::with_capacity(key_pairs.len() * 2);
        for (value_pk, row_pk) in &key_pairs {
            descriptor_requests.push(BatchGet::Raw { key: *value_pk });
            descriptor_requests.push(BatchGet::Raw { key: *row_pk });
        }
        let descriptor_results = self.pager.batch_get(&descriptor_requests)?;
        let mut descriptor_map: FxHashMap<PhysicalKey, EntryHandle> = FxHashMap::default();
        for result in descriptor_results {
            if let GetResult::Raw { key, bytes } = result {
                descriptor_map.insert(key, bytes);
            }
        }

        // Build one FieldPlan per field from the fetched descriptors.
        let mut plans = Vec::with_capacity(field_infos.len());
        for ((_, dtype), (value_pk, row_pk)) in field_infos.iter().zip(key_pairs.iter()) {
            let value_desc_blob = descriptor_map.remove(value_pk).ok_or(Error::NotFound)?;
            let value_desc = ColumnDescriptor::from_le_bytes(value_desc_blob.as_ref());
            let value_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), value_desc.head_page_pk)?;

            let row_desc_blob = descriptor_map.remove(row_pk).ok_or(Error::NotFound)?;
            let row_desc = ColumnDescriptor::from_le_bytes(row_desc_blob.as_ref());
            let row_metas =
                Self::collect_non_empty_metas(self.pager.as_ref(), row_desc.head_page_pk)?;

            // Value and row-id chunks must stay in lockstep; a length mismatch
            // indicates corrupt or incompatible column data.
            if value_metas.len() != row_metas.len() {
                return Err(Error::Internal(
                    "gather_rows_multi: chunk count mismatch".into(),
                ));
            }

            plans.push(FieldPlan {
                dtype: dtype.clone(),
                value_metas,
                row_metas,
                candidate_indices: Vec::new(),
            });
        }

        Ok(MultiGatherContext::new(field_infos, plans))
    }
501
    /// Gathers rows while reusing chunk caches and scratch buffers stored in the context.
    ///
    /// This path amortizes chunk fetch and decode costs across multiple calls by
    /// retaining Arrow arrays and scratch state inside the provided context.
    pub fn gather_rows_with_reusable_context(
        &self,
        ctx: &mut MultiGatherContext,
        row_ids: &[u64],
        policy: GatherNullPolicy,
    ) -> Result<RecordBatch> {
        // Nothing projected: empty schema, empty batch.
        if ctx.is_empty() {
            return Ok(RecordBatch::new_empty(Arc::new(Schema::empty())));
        }

        // Empty request: synthesize an all-nullable schema with empty arrays.
        if row_ids.is_empty() {
            let mut arrays = Vec::with_capacity(ctx.field_infos().len());
            let mut fields = Vec::with_capacity(ctx.field_infos().len());
            for (fid, dtype) in ctx.field_infos() {
                arrays.push(new_empty_array(dtype));
                let field_name = format!("field_{}", u64::from(*fid));
                fields.push(Field::new(field_name, dtype.clone(), true));
            }
            let schema = Arc::new(Schema::new(fields));
            return RecordBatch::try_new(schema, arrays)
                .map_err(|e| Error::Internal(format!("gather_rows_multi empty batch: {e}")));
        }

        // Temporarily move the reusable buffers out of the context; they are
        // stored back after the closure below runs, even on the error path.
        let mut row_index = ctx.take_row_index();
        let mut row_scratch = ctx.take_row_scratch();

        let field_infos = ctx.field_infos().to_vec();
        let mut chunk_keys = ctx.take_chunk_keys();

        // The closure captures the gather proper so the buffer restore below
        // always executes regardless of early returns via `?`.
        let result: Result<RecordBatch> = (|| {
            let len = row_ids.len();
            if row_scratch.len() < len {
                row_scratch.resize(len, None);
            }

            // Avoid a sort when the caller already passed ascending row ids.
            let is_non_decreasing = len <= 1 || row_ids.windows(2).all(|w| w[0] <= w[1]);
            let sorted_row_ids_cow: Cow<'_, [u64]> = if is_non_decreasing {
                Cow::Borrowed(row_ids)
            } else {
                let mut buf = row_ids.to_vec();
                buf.sort_unstable();
                Cow::Owned(buf)
            };
            let sorted_row_ids: &[u64] = sorted_row_ids_cow.as_ref();

            // A contiguous ascending run (base, base+1, ...) can be located
            // arithmetically, skipping the hash map entirely.
            let dense_base = if len == 0 {
                None
            } else if len == 1 || is_non_decreasing && row_ids.windows(2).all(|w| w[1] == w[0] + 1)
            {
                Some(row_ids[0])
            } else {
                None
            };

            if dense_base.is_none() {
                // Sparse request: build the row-id -> output-position map,
                // rejecting duplicates (each slot must be written once).
                row_index.clear();
                row_index.reserve(len);
                for (idx, &row_id) in row_ids.iter().enumerate() {
                    if row_index.insert(row_id, idx).is_some() {
                        return Err(Error::Internal(
                            "duplicate row_id in gather_rows_multi".into(),
                        ));
                    }
                }
            } else {
                row_index.clear();
            }

            let row_locator = if let Some(base) = dense_base {
                RowLocator::Dense { base }
            } else {
                RowLocator::Sparse { index: &row_index }
            };

            chunk_keys.clear();

            // Collect candidate chunk keys for every plan, pruning by row-id
            // range intersection against the sorted request.
            {
                let plans_mut = ctx.plans_mut();
                for plan in plans_mut.iter_mut() {
                    plan.candidate_indices.clear();
                    for (idx, meta) in plan.row_metas.iter().enumerate() {
                        if Self::chunk_intersects(sorted_row_ids, meta) {
                            plan.candidate_indices.push(idx);
                            chunk_keys.push(plan.value_metas[idx].chunk_pk);
                            chunk_keys.push(plan.row_metas[idx].chunk_pk);
                        }
                    }
                }
            }

            chunk_keys.sort_unstable();
            chunk_keys.dedup();

            // Fetch and decode only the chunks not already cached in the
            // context; decoded arrays are retained for later calls.
            {
                let mut pending: Vec<BatchGet> = Vec::new();
                {
                    let cache = ctx.chunk_cache();
                    for &key in &chunk_keys {
                        if !cache.contains_key(&key) {
                            pending.push(BatchGet::Raw { key });
                        }
                    }
                }

                if !pending.is_empty() {
                    let chunk_results = self.pager.batch_get(&pending)?;
                    let cache = ctx.chunk_cache_mut();
                    for result in chunk_results {
                        if let GetResult::Raw { key, bytes } = result {
                            let array = deserialize_array(bytes)?;
                            cache.insert(key, Arc::clone(&array));
                        }
                    }
                }
            }

            let allow_missing = policy.allow_missing();

            // Dispatch per Arrow type so each gather path stays monomorphized.
            let mut outputs = Vec::with_capacity(ctx.plans().len());
            for plan in ctx.plans() {
                let array = match &plan.dtype {
                    DataType::Utf8 => Self::gather_rows_from_chunks_string::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeUtf8 => Self::gather_rows_from_chunks_string::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Binary => Self::gather_rows_from_chunks_binary::<i32>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::LargeBinary => Self::gather_rows_from_chunks_binary::<i64>(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Boolean => Self::gather_rows_from_chunks_bool(
                        row_ids,
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                    ),
                    DataType::Struct(_) => Self::gather_rows_from_chunks_struct(
                        row_locator,
                        len,
                        &plan.candidate_indices,
                        plan,
                        ctx.chunk_cache(),
                        &mut row_scratch,
                        allow_missing,
                        &plan.dtype,
                    ),
                    // Remaining types go through the integer/primitive macro.
                    other => with_integer_arrow_type!(
                        other.clone(),
                        |ArrowTy| {
                            Self::gather_rows_from_chunks::<ArrowTy>(
                                row_ids,
                                row_locator,
                                len,
                                &plan.candidate_indices,
                                plan,
                                ctx.chunk_cache(),
                                &mut row_scratch,
                                allow_missing,
                            )
                        },
                        Err(Error::Internal(format!(
                            "gather_rows_multi: unsupported dtype {:?}",
                            other
                        ))),
                    ),
                }?;
                outputs.push(array);
            }

            // DropNulls removes rows where every projected column is null/missing.
            let outputs = if matches!(policy, GatherNullPolicy::DropNulls) {
                Self::filter_rows_with_non_null(outputs)?
            } else {
                outputs
            };

            // Synthesize the output schema; nullability reflects the policy
            // and the observed null counts.
            let mut fields = Vec::with_capacity(field_infos.len());
            for (idx, (fid, dtype)) in field_infos.iter().enumerate() {
                let array = &outputs[idx];
                let field_name = format!("field_{}", u64::from(*fid));
                let nullable = match policy {
                    GatherNullPolicy::IncludeNulls => true,
                    _ => array.null_count() > 0,
                };
                fields.push(Field::new(field_name, dtype.clone(), nullable));
            }

            let schema = Arc::new(Schema::new(fields));
            RecordBatch::try_new(schema, outputs)
                .map_err(|e| Error::Internal(format!("gather_rows_multi batch: {e}")))
        })();

        // Hand the reusable buffers back to the context (success or failure).
        ctx.store_row_scratch(row_scratch);
        ctx.store_row_index(row_index);
        ctx.store_chunk_keys(chunk_keys);

        result
    }
738
739    fn collect_non_empty_metas(pager: &P, head_page_pk: PhysicalKey) -> Result<Vec<ChunkMetadata>> {
740        let mut metas = Vec::new();
741        if head_page_pk == 0 {
742            return Ok(metas);
743        }
744        for meta in DescriptorIterator::new(pager, head_page_pk) {
745            let meta = meta?;
746            if meta.row_count > 0 {
747                metas.push(meta);
748            }
749        }
750        Ok(metas)
751    }
752
753    #[inline]
754    fn chunk_intersects(sorted_row_ids: &[u64], meta: &ChunkMetadata) -> bool {
755        if sorted_row_ids.is_empty() || meta.row_count == 0 {
756            return false;
757        }
758        let min = meta.min_val_u64;
759        let max = meta.max_val_u64;
760        if min == 0 && max == 0 && meta.row_count > 0 {
761            return true;
762        }
763        if min > max {
764            return true;
765        }
766        let min_req = sorted_row_ids[0];
767        let max_req = *sorted_row_ids.last().unwrap();
768        if max < min_req || min > max_req {
769            return false;
770        }
771        let idx = sorted_row_ids.partition_point(|&rid| rid < min);
772        idx < sorted_row_ids.len() && sorted_row_ids[idx] <= max
773    }
774
775    fn gather_rows_single_shot_string<O>(
776        row_index: &FxHashMap<u64, usize>,
777        len: usize,
778        plan: &FieldPlan,
779        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
780        allow_missing: bool,
781    ) -> Result<ArrayRef>
782    where
783        O: OffsetSizeTrait,
784    {
785        shared_gather_rows_single_shot_string::<O>(
786            row_index,
787            len,
788            &plan.value_metas,
789            &plan.row_metas,
790            &plan.candidate_indices,
791            chunk_blobs,
792            allow_missing,
793        )
794    }
795
796    #[allow(clippy::too_many_arguments)]
797    fn gather_rows_single_shot_bool(
798        row_index: &FxHashMap<u64, usize>,
799        len: usize,
800        plan: &FieldPlan,
801        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
802        allow_missing: bool,
803    ) -> Result<ArrayRef> {
804        shared_gather_rows_single_shot_bool(
805            row_index,
806            len,
807            &plan.value_metas,
808            &plan.row_metas,
809            &plan.candidate_indices,
810            chunk_blobs,
811            allow_missing,
812        )
813    }
814
815    fn gather_rows_single_shot_struct(
816        row_index: &FxHashMap<u64, usize>,
817        len: usize,
818        plan: &FieldPlan,
819        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
820        allow_missing: bool,
821        dtype: &DataType,
822    ) -> Result<ArrayRef> {
823        shared_gather_rows_single_shot_struct(
824            row_index,
825            len,
826            &plan.value_metas,
827            &plan.row_metas,
828            &plan.candidate_indices,
829            chunk_blobs,
830            allow_missing,
831            dtype,
832        )
833    }
834
835    #[allow(clippy::too_many_arguments)]
836    fn gather_rows_single_shot<T>(
837        row_index: &FxHashMap<u64, usize>,
838        len: usize,
839        plan: &FieldPlan,
840        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
841        allow_missing: bool,
842    ) -> Result<ArrayRef>
843    where
844        T: ArrowPrimitiveType,
845    {
846        shared_gather_rows_single_shot::<T>(
847            row_index,
848            len,
849            &plan.value_metas,
850            &plan.row_metas,
851            &plan.candidate_indices,
852            chunk_blobs,
853            allow_missing,
854        )
855    }
856
857    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and avoids intermediate structs.
858    fn gather_rows_from_chunks_string<O>(
859        row_ids: &[u64],
860        row_locator: RowLocator,
861        len: usize,
862        candidate_indices: &[usize],
863        plan: &FieldPlan,
864        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
865        row_scratch: &mut [Option<(usize, usize)>],
866        allow_missing: bool,
867    ) -> Result<ArrayRef>
868    where
869        O: OffsetSizeTrait,
870    {
871        shared_gather_rows_from_chunks_string::<O>(
872            row_ids,
873            row_locator,
874            len,
875            candidate_indices,
876            &plan.value_metas,
877            &plan.row_metas,
878            chunk_arrays,
879            row_scratch,
880            allow_missing,
881        )
882    }
883
884    fn gather_rows_single_shot_binary<O>(
885        row_index: &FxHashMap<u64, usize>,
886        len: usize,
887        plan: &FieldPlan,
888        chunk_blobs: &mut FxHashMap<PhysicalKey, EntryHandle>,
889        allow_missing: bool,
890    ) -> Result<ArrayRef>
891    where
892        O: OffsetSizeTrait,
893    {
894        shared_gather_rows_single_shot_binary::<O>(
895            row_index,
896            len,
897            &plan.value_metas,
898            &plan.row_metas,
899            &plan.candidate_indices,
900            chunk_blobs,
901            allow_missing,
902        )
903    }
904
905    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and avoids intermediate structs.
906    fn gather_rows_from_chunks_binary<O>(
907        row_ids: &[u64],
908        row_locator: RowLocator,
909        len: usize,
910        candidate_indices: &[usize],
911        plan: &FieldPlan,
912        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
913        row_scratch: &mut [Option<(usize, usize)>],
914        allow_missing: bool,
915    ) -> Result<ArrayRef>
916    where
917        O: OffsetSizeTrait,
918    {
919        shared_gather_rows_from_chunks_binary::<O>(
920            row_ids,
921            row_locator,
922            len,
923            candidate_indices,
924            &plan.value_metas,
925            &plan.row_metas,
926            chunk_arrays,
927            row_scratch,
928            allow_missing,
929        )
930    }
931
932    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and avoids intermediate structs.
933    fn gather_rows_from_chunks_bool(
934        row_ids: &[u64],
935        row_locator: RowLocator,
936        len: usize,
937        candidate_indices: &[usize],
938        plan: &FieldPlan,
939        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
940        row_scratch: &mut [Option<(usize, usize)>],
941        allow_missing: bool,
942    ) -> Result<ArrayRef> {
943        shared_gather_rows_from_chunks_bool(
944            row_ids,
945            row_locator,
946            len,
947            candidate_indices,
948            &plan.value_metas,
949            &plan.row_metas,
950            chunk_arrays,
951            row_scratch,
952            allow_missing,
953        )
954    }
955
956    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and avoids intermediate structs.
957    fn gather_rows_from_chunks_struct(
958        row_locator: RowLocator,
959        len: usize,
960        candidate_indices: &[usize],
961        plan: &FieldPlan,
962        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
963        row_scratch: &mut [Option<(usize, usize)>],
964        allow_missing: bool,
965        dtype: &DataType,
966    ) -> Result<ArrayRef> {
967        shared_gather_rows_from_chunks_struct(
968            row_locator,
969            len,
970            candidate_indices,
971            &plan.value_metas,
972            &plan.row_metas,
973            chunk_arrays,
974            row_scratch,
975            allow_missing,
976            dtype,
977        )
978    }
979
980    #[allow(clippy::too_many_arguments)] // NOTE: Signature mirrors shared helper and keeps type monomorphization straightforward.
981    fn gather_rows_from_chunks<T>(
982        row_ids: &[u64],
983        row_locator: RowLocator,
984        len: usize,
985        candidate_indices: &[usize],
986        plan: &FieldPlan,
987        chunk_arrays: &FxHashMap<PhysicalKey, ArrayRef>,
988        row_scratch: &mut [Option<(usize, usize)>],
989        allow_missing: bool,
990    ) -> Result<ArrayRef>
991    where
992        T: ArrowPrimitiveType,
993    {
994        shared_gather_rows_from_chunks::<T>(
995            row_ids,
996            row_locator,
997            len,
998            candidate_indices,
999            &plan.value_metas,
1000            &plan.row_metas,
1001            chunk_arrays,
1002            row_scratch,
1003            allow_missing,
1004        )
1005    }
1006
1007    fn filter_rows_with_non_null(columns: Vec<ArrayRef>) -> Result<Vec<ArrayRef>> {
1008        shared_filter_rows_with_non_null(columns)
1009    }
1010}